From 01b1defa7bd9f27045521d48b527109c25e45cfb Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Mon, 2 Jan 2023 21:13:38 +1300 Subject: [PATCH] Fetcher fixed, plus a requests-in-flight tracker --- src/fetcher.rs | 78 +++++++++++++++++++++++++++++--------------------- src/ui/feed.rs | 7 +++++ 2 files changed, 53 insertions(+), 32 deletions(-) diff --git a/src/fetcher.rs b/src/fetcher.rs index c0fa2074..85ff6113 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ -7,6 +7,7 @@ use std::collections::{HashMap, HashSet}; use std::fs; use std::io::ErrorKind; use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::RwLock; use tokio::task; @@ -21,6 +22,8 @@ pub struct Fetcher { // We use std::sync::RwLock because this isn't used in async code pending: RwLock>, failed: RwLock>, + + pub requests_in_flight: AtomicUsize, } impl Fetcher { @@ -31,6 +34,7 @@ impl Fetcher { client: Client::new(), pending: RwLock::new(HashSet::new()), failed: RwLock::new(HashMap::new()), + requests_in_flight: AtomicUsize::new(0), }; // Setup the cache directory @@ -107,51 +111,61 @@ impl Fetcher { } // We can't fetch as we are not async and we don't want to block the caller. - // So we save this request as pending, and ask the syncer to sync us. - self.pending.write().unwrap().insert(url); - tokio::spawn(async move { - if let Err(e) = GLOBALS.fetcher.sync().await { - tracing::error!("Problem fetching from web: {}", e); + // So we save this request as pending, and then spawn a task to get it. + self.pending.write().unwrap().insert(url.clone()); + + task::spawn(async move { + if let Err(e) = Fetcher::fetch(url.clone()).await { + tracing::error!("Problem fetching from web: {}: {}", e, &url); + // Add to errors + GLOBALS + .fetcher + .failed + .write() + .unwrap() + .insert(url.clone(), e); } + // Remove from pending + GLOBALS.fetcher.pending.write().unwrap().remove(&url); }); Ok(None) } - pub async fn sync(&self) -> Result<(), Error> { + pub async fn fetch(url: Url) -> Result<(), Error> { // Error if we are dead - if let Some(reason) = &self.dead { + if let Some(reason) = &GLOBALS.fetcher.dead { return Err(Error::General(format!("Fetcher is dead: {}", reason))); } - // FIXME: do these in parallel - let urls = self.pending.read().unwrap().clone(); - for url in urls.iter() { - if let Err(e) = self.sync_inner(url.clone()).await { - tracing::error!("{}", e); - self.failed.write().unwrap().insert(url.clone(), e); - // leave it in pending too, it won't matter - } else { - // Remove it from pending - self.pending.write().unwrap().remove(url); - } - } - Ok(()) - } + let timeout = std::time::Duration::new(60, 0); - async fn sync_inner(&self, url: Url) -> Result<(), Error> { - let timeout = std::time::Duration::new(10, 0); + let client = GLOBALS.fetcher.client.clone(); - let client = self.client.clone(); - let url2 = url.clone(); - let bytes = - task::spawn(async move { client.get(url2.inner()).timeout(timeout).send().await }) - .await?? - .bytes() - .await?; - let cache_file = self.cache_file(&url); + GLOBALS + .fetcher + .requests_in_flight + .fetch_add(1, Ordering::SeqCst); + + // Fetch the resource + let maybe_response = client.get(url.inner()).timeout(timeout).send().await; + + // Deal with response errors + let response = maybe_response?; + + // Convert to bytes + let bytes = response.bytes().await?; + + GLOBALS + .fetcher + .requests_in_flight + .fetch_sub(1, Ordering::SeqCst); + + let cache_file = GLOBALS.fetcher.cache_file(&url); + + // Write to the file fs::write(cache_file, bytes)?; - self.pending.write().unwrap().remove(&url); + Ok(()) } } diff --git a/src/ui/feed.rs b/src/ui/feed.rs index 2abdef3d..c747f4a3 100644 --- a/src/ui/feed.rs +++ b/src/ui/feed.rs @@ -63,6 +63,13 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram if ui.button("open all").clicked() { app.hides.clear(); } + ui.label(&format!( + "RIF={}", + GLOBALS + .fetcher + .requests_in_flight + .load(std::sync::atomic::Ordering::Relaxed) + )); }); ui.vertical(|ui| {