Fetcher fixed, plus a requests-in-flight tracker

This commit is contained in:
Mike Dilger 2023-01-02 21:13:38 +13:00
parent 61597e841b
commit 01b1defa7b
2 changed files with 53 additions and 32 deletions

View File

@ -7,6 +7,7 @@ use std::collections::{HashMap, HashSet};
use std::fs; use std::fs;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock; use std::sync::RwLock;
use tokio::task; use tokio::task;
@ -21,6 +22,8 @@ pub struct Fetcher {
// We use std::sync::RwLock because this isn't used in async code // We use std::sync::RwLock because this isn't used in async code
pending: RwLock<HashSet<Url>>, pending: RwLock<HashSet<Url>>,
failed: RwLock<HashMap<Url, Error>>, failed: RwLock<HashMap<Url, Error>>,
pub requests_in_flight: AtomicUsize,
} }
impl Fetcher { impl Fetcher {
@ -31,6 +34,7 @@ impl Fetcher {
client: Client::new(), client: Client::new(),
pending: RwLock::new(HashSet::new()), pending: RwLock::new(HashSet::new()),
failed: RwLock::new(HashMap::new()), failed: RwLock::new(HashMap::new()),
requests_in_flight: AtomicUsize::new(0),
}; };
// Setup the cache directory // Setup the cache directory
@ -107,51 +111,61 @@ impl Fetcher {
} }
// We can't fetch as we are not async and we don't want to block the caller. // We can't fetch as we are not async and we don't want to block the caller.
// So we save this request as pending, and ask the syncer to sync us. // So we save this request as pending, and then spawn a task to get it.
self.pending.write().unwrap().insert(url); self.pending.write().unwrap().insert(url.clone());
tokio::spawn(async move {
if let Err(e) = GLOBALS.fetcher.sync().await { task::spawn(async move {
tracing::error!("Problem fetching from web: {}", e); if let Err(e) = Fetcher::fetch(url.clone()).await {
tracing::error!("Problem fetching from web: {}: {}", e, &url);
// Add to errors
GLOBALS
.fetcher
.failed
.write()
.unwrap()
.insert(url.clone(), e);
} }
// Remove from pending
GLOBALS.fetcher.pending.write().unwrap().remove(&url);
}); });
Ok(None) Ok(None)
} }
pub async fn sync(&self) -> Result<(), Error> { pub async fn fetch(url: Url) -> Result<(), Error> {
// Error if we are dead // Error if we are dead
if let Some(reason) = &self.dead { if let Some(reason) = &GLOBALS.fetcher.dead {
return Err(Error::General(format!("Fetcher is dead: {}", reason))); return Err(Error::General(format!("Fetcher is dead: {}", reason)));
} }
// FIXME: do these in parallel let timeout = std::time::Duration::new(60, 0);
let urls = self.pending.read().unwrap().clone();
for url in urls.iter() {
if let Err(e) = self.sync_inner(url.clone()).await {
tracing::error!("{}", e);
self.failed.write().unwrap().insert(url.clone(), e);
// leave it in pending too, it won't matter
} else {
// Remove it from pending
self.pending.write().unwrap().remove(url);
}
}
Ok(())
}
async fn sync_inner(&self, url: Url) -> Result<(), Error> { let client = GLOBALS.fetcher.client.clone();
let timeout = std::time::Duration::new(10, 0);
let client = self.client.clone(); GLOBALS
let url2 = url.clone(); .fetcher
let bytes = .requests_in_flight
task::spawn(async move { client.get(url2.inner()).timeout(timeout).send().await }) .fetch_add(1, Ordering::SeqCst);
.await??
.bytes() // Fetch the resource
.await?; let maybe_response = client.get(url.inner()).timeout(timeout).send().await;
let cache_file = self.cache_file(&url);
// Deal with response errors
let response = maybe_response?;
// Convert to bytes
let bytes = response.bytes().await?;
GLOBALS
.fetcher
.requests_in_flight
.fetch_sub(1, Ordering::SeqCst);
let cache_file = GLOBALS.fetcher.cache_file(&url);
// Write to the file
fs::write(cache_file, bytes)?; fs::write(cache_file, bytes)?;
self.pending.write().unwrap().remove(&url);
Ok(()) Ok(())
} }
} }

View File

@ -63,6 +63,13 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram
if ui.button("open all").clicked() { if ui.button("open all").clicked() {
app.hides.clear(); app.hides.clear();
} }
ui.label(&format!(
"RIF={}",
GLOBALS
.fetcher
.requests_in_flight
.load(std::sync::atomic::Ordering::Relaxed)
));
}); });
ui.vertical(|ui| { ui.vertical(|ui| {