From c4f30f3d0d7a23e70e0d225b7c3124de08c46a71 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Sun, 1 Jan 2023 07:01:16 +1300 Subject: [PATCH] Fetcher: changed to interior mutability --- src/fetcher.rs | 52 +++++++++++++++++++++++------------------------ src/globals.rs | 4 ++-- src/people/mod.rs | 6 ++++-- src/syncer.rs | 2 +- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/fetcher.rs b/src/fetcher.rs index 50ff7433..86f2d0cc 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ -7,14 +7,9 @@ use std::collections::{HashMap, HashSet}; use std::fs; use std::io::ErrorKind; use std::path::PathBuf; +use std::sync::RwLock; use tokio::task; -pub enum FetcherResult { - Success(Vec), - Pending, - Error(String), -} - pub struct Fetcher { // we don't want new() to fail in lazy_static init, so we just mark it dead if there was an error // on creation @@ -22,8 +17,10 @@ pub struct Fetcher { cache_dir: PathBuf, client: Client, - pending: HashSet, - failed: HashMap, + + // We use std::sync::RwLock because this isn't used in async code + pending: RwLock>, + failed: RwLock>, } impl Fetcher { @@ -32,8 +29,8 @@ impl Fetcher { dead: None, cache_dir: PathBuf::new(), client: Client::new(), - pending: HashSet::new(), - failed: HashMap::new(), + pending: RwLock::new(HashSet::new()), + failed: RwLock::new(HashMap::new()), }; // Setup the cache directory @@ -71,68 +68,69 @@ impl Fetcher { cache_file } - pub fn try_get(&mut self, url: Url) -> FetcherResult { + #[allow(dead_code)] + pub fn try_get(&self, url: Url) -> Result>, Error> { // Error if we are dead if let Some(reason) = &self.dead { - return FetcherResult::Error(format!("Fetcher is dead: {}", reason)); + return Err(Error::General(format!("Fetcher is dead: {}", reason))); } // Error if we couldn't fetch this item - if let Some(error) = self.failed.get(&url) { - return FetcherResult::Error(format!("{}", error)); + if let Some(error) = self.failed.read().unwrap().get(&url) { + return Err(Error::General(format!("{}", error))); } // Pending if we are trying to fetch this item - if self.pending.contains(&url) { - return FetcherResult::Pending; + if self.pending.read().unwrap().contains(&url) { + return Ok(None); } // Try to get it from the cache file // FIXME - even this can be time consuming and should be synced instead of tried // directly, especially on spinning hard drives. let cache_file = self.cache_file(&url); - match fs::read(&cache_file) { + match fs::read(cache_file) { Ok(contents) => { - tracing::debug!("Found web content at {}", cache_file.display()); - return FetcherResult::Success(contents); + tracing::debug!("cache hit"); + return Ok(Some(contents)); } Err(e) => { // Any error other than this falls through if e.kind() != ErrorKind::NotFound { - return FetcherResult::Error(format!("{}", e)); + return Err(e.into()); } } } // We can't fetch as we are not async and we don't want to block the caller. // So we save this request as pending, and ask the syncer to sync us. - self.pending.insert(url); + self.pending.write().unwrap().insert(url); let _ = GLOBALS.to_syncer.send("sync_fetcher".to_owned()); - FetcherResult::Pending + Ok(None) } - pub async fn sync(&mut self) -> Result<(), Error> { + pub async fn sync(&self) -> Result<(), Error> { // Error if we are dead if let Some(reason) = &self.dead { return Err(Error::General(format!("Fetcher is dead: {}", reason))); } // FIXME: do these in parallel - let urls = self.pending.clone(); + let urls = self.pending.read().unwrap().clone(); for url in urls.iter() { if let Err(e) = self.sync_inner(url.clone()).await { tracing::error!("{}", e); - self.failed.insert(url.clone(), e); + self.failed.write().unwrap().insert(url.clone(), e); // leave it in pending too, it won't matter } else { // Remove it from pending - self.pending.remove(url); + self.pending.write().unwrap().remove(url); } } Ok(()) } - async fn sync_inner(&mut self, url: Url) -> Result<(), Error> { + async fn sync_inner(&self, url: Url) -> Result<(), Error> { let timeout = std::time::Duration::new(10, 0); let client = self.client.clone(); diff --git a/src/globals.rs b/src/globals.rs index 660c50f4..ed7c059f 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -83,7 +83,7 @@ pub struct Globals { pub feed: Mutex, /// Fetcher - pub fetcher: RwLock, + pub fetcher: Fetcher, } lazy_static! { @@ -118,7 +118,7 @@ lazy_static! { dismissed: RwLock::new(Vec::new()), event_is_new: RwLock::new(Vec::new()), feed: Mutex::new(Feed::new()), - fetcher: RwLock::new(Fetcher::new()), + fetcher: Fetcher::new(), } }; } diff --git a/src/people/mod.rs b/src/people/mod.rs index e2daa999..d9626735 100644 --- a/src/people/mod.rs +++ b/src/people/mod.rs @@ -23,8 +23,10 @@ impl People { pub async fn get_followed_pubkeys(&self) -> Vec { let mut output: Vec = Vec::new(); - for person in self.people.iter() - .filter_map(|(_,p)| if p.followed==1 { Some(p) } else { None }) + for person in self + .people + .iter() + .filter_map(|(_, p)| if p.followed == 1 { Some(p) } else { None }) { output.push(person.pubkey.clone()); } diff --git a/src/syncer.rs b/src/syncer.rs index f3c3a53b..c2691c73 100644 --- a/src/syncer.rs +++ b/src/syncer.rs @@ -27,7 +27,7 @@ impl Syncer { } } "sync_fetcher" => { - if let Err(e) = GLOBALS.fetcher.write().await.sync().await { + if let Err(e) = GLOBALS.fetcher.sync().await { tracing::error!("Problem fetching from web: {}", e); } }