From 1e7eb30d6b8358ac440245a00b69a9533ed39de8 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Wed, 2 Aug 2023 22:11:52 +1200 Subject: [PATCH] Fix even more fetcher problems --- src/fetcher.rs | 121 ++++++++++++++++++++++++++++++------------------- 1 file changed, 75 insertions(+), 46 deletions(-) diff --git a/src/fetcher.rs b/src/fetcher.rs index b1784af2..c466f206 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ -215,7 +215,7 @@ impl Fetcher { // We had a bug that put empty cache files in place (maybe we still have it). // In any case, if the file is empty, don't honor it and wipe any etag if md.len() == 0 { - let etag_file = GLOBALS.fetcher.etag_file(&url); + let etag_file = GLOBALS.fetcher.etag_file(url); let _ = fs::remove_file(etag_file); } else { if let Ok(modified) = md.modified() { @@ -321,16 +321,29 @@ impl Fetcher { // closure to run when finished (if we didn't succeed) let cache_file2 = cache_file.clone(); - let finish = |outcome, message, e: Error, sinbin_secs| { + let finish = |outcome, message, err: Option, sinbin_secs| { match outcome { FailOutcome::Fail => { if stale { - tracing::info!("FETCH {url}: Failed (using stale cache): {message}: {e}"); + if let Some(e) = err { + tracing::info!( + "FETCH {url}: Failed (using stale cache): {message}: {e}" + ); + } else { + tracing::info!("FETCH {url}: Failed (using stale cache): {message}"); + } // FIXME: bumping the mtime might not be the best way to do this. - let _ = filetime::set_file_mtime(cache_file2, filetime::FileTime::now()); + let _ = filetime::set_file_mtime( + cache_file2.as_path(), + filetime::FileTime::now(), + ); self.urls.write().unwrap().remove(&url); } else { - tracing::info!("FETCH {url}: Failed: {message}: {e}"); + if let Some(e) = err { + tracing::info!("FETCH {url}: Failed: {message}: {e}"); + } else { + tracing::info!("FETCH {url}: Failed: {message}"); + } self.urls .write() .unwrap() @@ -339,11 +352,16 @@ impl Fetcher { } FailOutcome::NotModified => { tracing::info!("FETCH {url}: Succeeded: {message}"); - let _ = filetime::set_file_mtime(cache_file2, filetime::FileTime::now()); + let _ = + filetime::set_file_mtime(cache_file2.as_path(), filetime::FileTime::now()); self.urls.write().unwrap().remove(&url); } FailOutcome::Requeue => { - tracing::info!("FETCH {url}: Re-Queued: {message}: {e}"); + if let Some(e) = err { + tracing::info!("FETCH {url}: Re-Queued: {message}: {e}"); + } else { + tracing::info!("FETCH {url}: Re-Queued: {message}"); + } self.urls .write() .unwrap() @@ -363,54 +381,54 @@ impl Fetcher { Ok(r) => r, Err(e) => { if e.is_builder() { - finish(FailOutcome::Fail, "builder error", e.into(), 0); + finish(FailOutcome::Fail, "builder error", Some(e.into()), 0); } else if e.is_timeout() { - finish(FailOutcome::Requeue, "timeout", e.into(), 0); + finish(FailOutcome::Requeue, "timeout", Some(e.into()), 0); } else if e.is_request() { - finish(FailOutcome::Fail, "request error", e.into(), 0); + finish(FailOutcome::Fail, "request error", Some(e.into()), 0); } else if e.is_connect() { - finish(FailOutcome::Fail, "connect error", e.into(), 15); + finish(FailOutcome::Fail, "connect error", Some(e.into()), 15); } else if e.is_body() { - finish(FailOutcome::Fail, "body error", e.into(), 0); + finish(FailOutcome::Fail, "body error", Some(e.into()), 0); } else if e.is_decode() { - finish(FailOutcome::Fail, "decode error", e.into(), 0); - } else if let Some(status) = e.status() { - if status.is_informational() { - finish(FailOutcome::Requeue, "informational error", e.into(), 0); - } else if status.is_success() { - finish(FailOutcome::Requeue, "success error", e.into(), 0); - } else if status.is_redirection() { - if status == StatusCode::NOT_MODIFIED { - finish(FailOutcome::NotModified, "not modified", e.into(), 0); - } else { - // Our client follows up to 10 redirects. This is a failure. - finish(FailOutcome::Fail, "redirection error", e.into(), 0); - } - } else if status.is_server_error() { - finish(FailOutcome::Requeue, "server error", e.into(), 300); - // give them 5 minutes, maybe the server will recover - } else { - match status { - StatusCode::REQUEST_TIMEOUT => { - finish(FailOutcome::Requeue, "request timeout", e.into(), 20); - // give 30 seconds and try again - } - StatusCode::TOO_MANY_REQUESTS => { - finish(FailOutcome::Requeue, "too many requests", e.into(), 10); - // give 15 seconds and try again - } - _ => { - finish(FailOutcome::Fail, "other", e.into(), 0); - } - } - } + finish(FailOutcome::Fail, "decode error", Some(e.into()), 0); } else { - finish(FailOutcome::Fail, "other", e.into(), 0); + finish(FailOutcome::Fail, "other", Some(e.into()), 0); } return; } }; + // Deal with status codes + let status = response.status(); + if status.is_informational() { + finish(FailOutcome::Requeue, "informational error", None, 0); + } else if status.is_redirection() { + if status == StatusCode::NOT_MODIFIED { + finish(FailOutcome::NotModified, "not modified", None, 0); + } else { + // Our client follows up to 10 redirects. This is a failure. + finish(FailOutcome::Fail, "redirection error", None, 0); + } + } else if status.is_server_error() { + finish(FailOutcome::Requeue, "server error", None, 300); + // give them 5 minutes, maybe the server will recover + } else { + match status { + StatusCode::REQUEST_TIMEOUT => { + finish(FailOutcome::Requeue, "request timeout", None, 20); + // give 30 seconds and try again + } + StatusCode::TOO_MANY_REQUESTS => { + finish(FailOutcome::Requeue, "too many requests", None, 10); + // give 15 seconds and try again + } + _ => { + finish(FailOutcome::Fail, "other", None, 0); + } + } + } + let maybe_etag = response .headers() .get(ETAG) @@ -421,16 +439,27 @@ impl Fetcher { let bytes = match maybe_bytes { Ok(bytes) => bytes, Err(e) => { - finish(FailOutcome::Fail, "response bytes", e.into(), 0); + finish(FailOutcome::Fail, "response bytes", Some(e.into()), 0); return; } }; + // Do not accept zero-length files, and don't try again + if bytes.is_empty() { + finish(FailOutcome::Fail, "zero length file", None, 10); + return; + } + GLOBALS.bytes_read.fetch_add(bytes.len(), Ordering::Relaxed); // Write to the file if let Err(e) = tokio::fs::write(cache_file.as_path(), bytes).await { - finish(FailOutcome::Fail, "writing to cache file", e.into(), 0); + finish( + FailOutcome::Fail, + "writing to cache file", + Some(e.into()), + 0, + ); return; }