Fetcher: changed to interior mutability

This commit is contained in:
Mike Dilger 2023-01-01 07:01:16 +13:00
parent cef0dcfcb1
commit c4f30f3d0d
4 changed files with 32 additions and 32 deletions

View File

@ -7,14 +7,9 @@ use std::collections::{HashMap, HashSet};
use std::fs; use std::fs;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::RwLock;
use tokio::task; use tokio::task;
pub enum FetcherResult {
Success(Vec<u8>),
Pending,
Error(String),
}
pub struct Fetcher { pub struct Fetcher {
// we don't want new() to fail in lazy_static init, so we just mark it dead if there was an error // we don't want new() to fail in lazy_static init, so we just mark it dead if there was an error
// on creation // on creation
@ -22,8 +17,10 @@ pub struct Fetcher {
cache_dir: PathBuf, cache_dir: PathBuf,
client: Client, client: Client,
pending: HashSet<Url>,
failed: HashMap<Url, Error>, // We use std::sync::RwLock because this isn't used in async code
pending: RwLock<HashSet<Url>>,
failed: RwLock<HashMap<Url, Error>>,
} }
impl Fetcher { impl Fetcher {
@ -32,8 +29,8 @@ impl Fetcher {
dead: None, dead: None,
cache_dir: PathBuf::new(), cache_dir: PathBuf::new(),
client: Client::new(), client: Client::new(),
pending: HashSet::new(), pending: RwLock::new(HashSet::new()),
failed: HashMap::new(), failed: RwLock::new(HashMap::new()),
}; };
// Setup the cache directory // Setup the cache directory
@ -71,68 +68,69 @@ impl Fetcher {
cache_file cache_file
} }
pub fn try_get(&mut self, url: Url) -> FetcherResult { #[allow(dead_code)]
pub fn try_get(&self, url: Url) -> Result<Option<Vec<u8>>, Error> {
// Error if we are dead // Error if we are dead
if let Some(reason) = &self.dead { if let Some(reason) = &self.dead {
return FetcherResult::Error(format!("Fetcher is dead: {}", reason)); return Err(Error::General(format!("Fetcher is dead: {}", reason)));
} }
// Error if we couldn't fetch this item // Error if we couldn't fetch this item
if let Some(error) = self.failed.get(&url) { if let Some(error) = self.failed.read().unwrap().get(&url) {
return FetcherResult::Error(format!("{}", error)); return Err(Error::General(format!("{}", error)));
} }
// Pending if we are trying to fetch this item // Pending if we are trying to fetch this item
if self.pending.contains(&url) { if self.pending.read().unwrap().contains(&url) {
return FetcherResult::Pending; return Ok(None);
} }
// Try to get it from the cache file // Try to get it from the cache file
// FIXME - even this can be time consuming and should be synced instead of tried // FIXME - even this can be time consuming and should be synced instead of tried
// directly, especially on spinning hard drives. // directly, especially on spinning hard drives.
let cache_file = self.cache_file(&url); let cache_file = self.cache_file(&url);
match fs::read(&cache_file) { match fs::read(cache_file) {
Ok(contents) => { Ok(contents) => {
tracing::debug!("Found web content at {}", cache_file.display()); tracing::debug!("cache hit");
return FetcherResult::Success(contents); return Ok(Some(contents));
} }
Err(e) => { Err(e) => {
// Any error other than this falls through // Any error other than this falls through
if e.kind() != ErrorKind::NotFound { if e.kind() != ErrorKind::NotFound {
return FetcherResult::Error(format!("{}", e)); return Err(e.into());
} }
} }
} }
// We can't fetch as we are not async and we don't want to block the caller. // We can't fetch as we are not async and we don't want to block the caller.
// So we save this request as pending, and ask the syncer to sync us. // So we save this request as pending, and ask the syncer to sync us.
self.pending.insert(url); self.pending.write().unwrap().insert(url);
let _ = GLOBALS.to_syncer.send("sync_fetcher".to_owned()); let _ = GLOBALS.to_syncer.send("sync_fetcher".to_owned());
FetcherResult::Pending Ok(None)
} }
pub async fn sync(&mut self) -> Result<(), Error> { pub async fn sync(&self) -> Result<(), Error> {
// Error if we are dead // Error if we are dead
if let Some(reason) = &self.dead { if let Some(reason) = &self.dead {
return Err(Error::General(format!("Fetcher is dead: {}", reason))); return Err(Error::General(format!("Fetcher is dead: {}", reason)));
} }
// FIXME: do these in parallel // FIXME: do these in parallel
let urls = self.pending.clone(); let urls = self.pending.read().unwrap().clone();
for url in urls.iter() { for url in urls.iter() {
if let Err(e) = self.sync_inner(url.clone()).await { if let Err(e) = self.sync_inner(url.clone()).await {
tracing::error!("{}", e); tracing::error!("{}", e);
self.failed.insert(url.clone(), e); self.failed.write().unwrap().insert(url.clone(), e);
// leave it in pending too, it won't matter // leave it in pending too, it won't matter
} else { } else {
// Remove it from pending // Remove it from pending
self.pending.remove(url); self.pending.write().unwrap().remove(url);
} }
} }
Ok(()) Ok(())
} }
async fn sync_inner(&mut self, url: Url) -> Result<(), Error> { async fn sync_inner(&self, url: Url) -> Result<(), Error> {
let timeout = std::time::Duration::new(10, 0); let timeout = std::time::Duration::new(10, 0);
let client = self.client.clone(); let client = self.client.clone();

View File

@ -83,7 +83,7 @@ pub struct Globals {
pub feed: Mutex<Feed>, pub feed: Mutex<Feed>,
/// Fetcher /// Fetcher
pub fetcher: RwLock<Fetcher>, pub fetcher: Fetcher,
} }
lazy_static! { lazy_static! {
@ -118,7 +118,7 @@ lazy_static! {
dismissed: RwLock::new(Vec::new()), dismissed: RwLock::new(Vec::new()),
event_is_new: RwLock::new(Vec::new()), event_is_new: RwLock::new(Vec::new()),
feed: Mutex::new(Feed::new()), feed: Mutex::new(Feed::new()),
fetcher: RwLock::new(Fetcher::new()), fetcher: Fetcher::new(),
} }
}; };
} }

View File

@ -23,8 +23,10 @@ impl People {
pub async fn get_followed_pubkeys(&self) -> Vec<PublicKeyHex> { pub async fn get_followed_pubkeys(&self) -> Vec<PublicKeyHex> {
let mut output: Vec<PublicKeyHex> = Vec::new(); let mut output: Vec<PublicKeyHex> = Vec::new();
for person in self.people.iter() for person in self
.filter_map(|(_,p)| if p.followed==1 { Some(p) } else { None }) .people
.iter()
.filter_map(|(_, p)| if p.followed == 1 { Some(p) } else { None })
{ {
output.push(person.pubkey.clone()); output.push(person.pubkey.clone());
} }

View File

@ -27,7 +27,7 @@ impl Syncer {
} }
} }
"sync_fetcher" => { "sync_fetcher" => {
if let Err(e) = GLOBALS.fetcher.write().await.sync().await { if let Err(e) = GLOBALS.fetcher.sync().await {
tracing::error!("Problem fetching from web: {}", e); tracing::error!("Problem fetching from web: {}", e);
} }
} }