Fix even more fetcher problems

This commit is contained in:
Mike Dilger 2023-08-02 22:11:52 +12:00
parent 460565fdd8
commit 1e7eb30d6b

View File

@ -215,7 +215,7 @@ impl Fetcher {
// We had a bug that put empty cache files in place (maybe we still have it).
// In any case, if the file is empty, don't honor it and wipe any etag
if md.len() == 0 {
let etag_file = GLOBALS.fetcher.etag_file(&url);
let etag_file = GLOBALS.fetcher.etag_file(url);
let _ = fs::remove_file(etag_file);
} else {
if let Ok(modified) = md.modified() {
@ -321,16 +321,29 @@ impl Fetcher {
// closure to run when finished (if we didn't succeed)
let cache_file2 = cache_file.clone();
let finish = |outcome, message, e: Error, sinbin_secs| {
let finish = |outcome, message, err: Option<Error>, sinbin_secs| {
match outcome {
FailOutcome::Fail => {
if stale {
tracing::info!("FETCH {url}: Failed (using stale cache): {message}: {e}");
if let Some(e) = err {
tracing::info!(
"FETCH {url}: Failed (using stale cache): {message}: {e}"
);
} else {
tracing::info!("FETCH {url}: Failed (using stale cache): {message}");
}
// FIXME: bumping the mtime might not be the best way to do this.
let _ = filetime::set_file_mtime(cache_file2, filetime::FileTime::now());
let _ = filetime::set_file_mtime(
cache_file2.as_path(),
filetime::FileTime::now(),
);
self.urls.write().unwrap().remove(&url);
} else {
tracing::info!("FETCH {url}: Failed: {message}: {e}");
if let Some(e) = err {
tracing::info!("FETCH {url}: Failed: {message}: {e}");
} else {
tracing::info!("FETCH {url}: Failed: {message}");
}
self.urls
.write()
.unwrap()
@ -339,11 +352,16 @@ impl Fetcher {
}
FailOutcome::NotModified => {
tracing::info!("FETCH {url}: Succeeded: {message}");
let _ = filetime::set_file_mtime(cache_file2, filetime::FileTime::now());
let _ =
filetime::set_file_mtime(cache_file2.as_path(), filetime::FileTime::now());
self.urls.write().unwrap().remove(&url);
}
FailOutcome::Requeue => {
tracing::info!("FETCH {url}: Re-Queued: {message}: {e}");
if let Some(e) = err {
tracing::info!("FETCH {url}: Re-Queued: {message}: {e}");
} else {
tracing::info!("FETCH {url}: Re-Queued: {message}");
}
self.urls
.write()
.unwrap()
@ -363,54 +381,54 @@ impl Fetcher {
Ok(r) => r,
Err(e) => {
if e.is_builder() {
finish(FailOutcome::Fail, "builder error", e.into(), 0);
finish(FailOutcome::Fail, "builder error", Some(e.into()), 0);
} else if e.is_timeout() {
finish(FailOutcome::Requeue, "timeout", e.into(), 0);
finish(FailOutcome::Requeue, "timeout", Some(e.into()), 0);
} else if e.is_request() {
finish(FailOutcome::Fail, "request error", e.into(), 0);
finish(FailOutcome::Fail, "request error", Some(e.into()), 0);
} else if e.is_connect() {
finish(FailOutcome::Fail, "connect error", e.into(), 15);
finish(FailOutcome::Fail, "connect error", Some(e.into()), 15);
} else if e.is_body() {
finish(FailOutcome::Fail, "body error", e.into(), 0);
finish(FailOutcome::Fail, "body error", Some(e.into()), 0);
} else if e.is_decode() {
finish(FailOutcome::Fail, "decode error", e.into(), 0);
} else if let Some(status) = e.status() {
if status.is_informational() {
finish(FailOutcome::Requeue, "informational error", e.into(), 0);
} else if status.is_success() {
finish(FailOutcome::Requeue, "success error", e.into(), 0);
} else if status.is_redirection() {
if status == StatusCode::NOT_MODIFIED {
finish(FailOutcome::NotModified, "not modified", e.into(), 0);
} else {
// Our client follows up to 10 redirects. This is a failure.
finish(FailOutcome::Fail, "redirection error", e.into(), 0);
}
} else if status.is_server_error() {
finish(FailOutcome::Requeue, "server error", e.into(), 300);
// give them 5 minutes, maybe the server will recover
} else {
match status {
StatusCode::REQUEST_TIMEOUT => {
finish(FailOutcome::Requeue, "request timeout", e.into(), 20);
// give 30 seconds and try again
}
StatusCode::TOO_MANY_REQUESTS => {
finish(FailOutcome::Requeue, "too many requests", e.into(), 10);
// give 15 seconds and try again
}
_ => {
finish(FailOutcome::Fail, "other", e.into(), 0);
}
}
}
finish(FailOutcome::Fail, "decode error", Some(e.into()), 0);
} else {
finish(FailOutcome::Fail, "other", e.into(), 0);
finish(FailOutcome::Fail, "other", Some(e.into()), 0);
}
return;
}
};
// Deal with status codes
let status = response.status();
if status.is_informational() {
finish(FailOutcome::Requeue, "informational error", None, 0);
} else if status.is_redirection() {
if status == StatusCode::NOT_MODIFIED {
finish(FailOutcome::NotModified, "not modified", None, 0);
} else {
// Our client follows up to 10 redirects. This is a failure.
finish(FailOutcome::Fail, "redirection error", None, 0);
}
} else if status.is_server_error() {
finish(FailOutcome::Requeue, "server error", None, 300);
// give them 5 minutes, maybe the server will recover
} else {
match status {
StatusCode::REQUEST_TIMEOUT => {
finish(FailOutcome::Requeue, "request timeout", None, 20);
// give 30 seconds and try again
}
StatusCode::TOO_MANY_REQUESTS => {
finish(FailOutcome::Requeue, "too many requests", None, 10);
// give 15 seconds and try again
}
_ => {
finish(FailOutcome::Fail, "other", None, 0);
}
}
}
let maybe_etag = response
.headers()
.get(ETAG)
@ -421,16 +439,27 @@ impl Fetcher {
let bytes = match maybe_bytes {
Ok(bytes) => bytes,
Err(e) => {
finish(FailOutcome::Fail, "response bytes", e.into(), 0);
finish(FailOutcome::Fail, "response bytes", Some(e.into()), 0);
return;
}
};
// Do not accept zero-length files, and don't try again
if bytes.is_empty() {
finish(FailOutcome::Fail, "zero length file", None, 10);
return;
}
GLOBALS.bytes_read.fetch_add(bytes.len(), Ordering::Relaxed);
// Write to the file
if let Err(e) = tokio::fs::write(cache_file.as_path(), bytes).await {
finish(FailOutcome::Fail, "writing to cache file", e.into(), 0);
finish(
FailOutcome::Fail,
"writing to cache file",
Some(e.into()),
0,
);
return;
}