mirror of
https://github.com/mikedilger/gossip.git
synced 2024-09-29 16:31:18 +00:00
Merge branch 'unstable' into lmdb
This commit is contained in:
commit
4c67016675
121
src/fetcher.rs
121
src/fetcher.rs
@ -215,7 +215,7 @@ impl Fetcher {
|
||||
// We had a bug that put empty cache files in place (maybe we still have it).
|
||||
// In any case, if the file is empty, don't honor it and wipe any etag
|
||||
if md.len() == 0 {
|
||||
let etag_file = GLOBALS.fetcher.etag_file(&url);
|
||||
let etag_file = GLOBALS.fetcher.etag_file(url);
|
||||
let _ = fs::remove_file(etag_file);
|
||||
} else {
|
||||
if let Ok(modified) = md.modified() {
|
||||
@ -321,16 +321,29 @@ impl Fetcher {
|
||||
|
||||
// closure to run when finished (if we didn't succeed)
|
||||
let cache_file2 = cache_file.clone();
|
||||
let finish = |outcome, message, e: Error, sinbin_secs| {
|
||||
let finish = |outcome, message, err: Option<Error>, sinbin_secs| {
|
||||
match outcome {
|
||||
FailOutcome::Fail => {
|
||||
if stale {
|
||||
tracing::info!("FETCH {url}: Failed (using stale cache): {message}: {e}");
|
||||
if let Some(e) = err {
|
||||
tracing::info!(
|
||||
"FETCH {url}: Failed (using stale cache): {message}: {e}"
|
||||
);
|
||||
} else {
|
||||
tracing::info!("FETCH {url}: Failed (using stale cache): {message}");
|
||||
}
|
||||
// FIXME: bumping the mtime might not be the best way to do this.
|
||||
let _ = filetime::set_file_mtime(cache_file2, filetime::FileTime::now());
|
||||
let _ = filetime::set_file_mtime(
|
||||
cache_file2.as_path(),
|
||||
filetime::FileTime::now(),
|
||||
);
|
||||
self.urls.write().unwrap().remove(&url);
|
||||
} else {
|
||||
tracing::info!("FETCH {url}: Failed: {message}: {e}");
|
||||
if let Some(e) = err {
|
||||
tracing::info!("FETCH {url}: Failed: {message}: {e}");
|
||||
} else {
|
||||
tracing::info!("FETCH {url}: Failed: {message}");
|
||||
}
|
||||
self.urls
|
||||
.write()
|
||||
.unwrap()
|
||||
@ -339,11 +352,16 @@ impl Fetcher {
|
||||
}
|
||||
FailOutcome::NotModified => {
|
||||
tracing::info!("FETCH {url}: Succeeded: {message}");
|
||||
let _ = filetime::set_file_mtime(cache_file2, filetime::FileTime::now());
|
||||
let _ =
|
||||
filetime::set_file_mtime(cache_file2.as_path(), filetime::FileTime::now());
|
||||
self.urls.write().unwrap().remove(&url);
|
||||
}
|
||||
FailOutcome::Requeue => {
|
||||
tracing::info!("FETCH {url}: Re-Queued: {message}: {e}");
|
||||
if let Some(e) = err {
|
||||
tracing::info!("FETCH {url}: Re-Queued: {message}: {e}");
|
||||
} else {
|
||||
tracing::info!("FETCH {url}: Re-Queued: {message}");
|
||||
}
|
||||
self.urls
|
||||
.write()
|
||||
.unwrap()
|
||||
@ -363,54 +381,54 @@ impl Fetcher {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
if e.is_builder() {
|
||||
finish(FailOutcome::Fail, "builder error", e.into(), 0);
|
||||
finish(FailOutcome::Fail, "builder error", Some(e.into()), 0);
|
||||
} else if e.is_timeout() {
|
||||
finish(FailOutcome::Requeue, "timeout", e.into(), 0);
|
||||
finish(FailOutcome::Requeue, "timeout", Some(e.into()), 0);
|
||||
} else if e.is_request() {
|
||||
finish(FailOutcome::Fail, "request error", e.into(), 0);
|
||||
finish(FailOutcome::Fail, "request error", Some(e.into()), 0);
|
||||
} else if e.is_connect() {
|
||||
finish(FailOutcome::Fail, "connect error", e.into(), 15);
|
||||
finish(FailOutcome::Fail, "connect error", Some(e.into()), 15);
|
||||
} else if e.is_body() {
|
||||
finish(FailOutcome::Fail, "body error", e.into(), 0);
|
||||
finish(FailOutcome::Fail, "body error", Some(e.into()), 0);
|
||||
} else if e.is_decode() {
|
||||
finish(FailOutcome::Fail, "decode error", e.into(), 0);
|
||||
} else if let Some(status) = e.status() {
|
||||
if status.is_informational() {
|
||||
finish(FailOutcome::Requeue, "informational error", e.into(), 0);
|
||||
} else if status.is_success() {
|
||||
finish(FailOutcome::Requeue, "success error", e.into(), 0);
|
||||
} else if status.is_redirection() {
|
||||
if status == StatusCode::NOT_MODIFIED {
|
||||
finish(FailOutcome::NotModified, "not modified", e.into(), 0);
|
||||
} else {
|
||||
// Our client follows up to 10 redirects. This is a failure.
|
||||
finish(FailOutcome::Fail, "redirection error", e.into(), 0);
|
||||
}
|
||||
} else if status.is_server_error() {
|
||||
finish(FailOutcome::Requeue, "server error", e.into(), 300);
|
||||
// give them 5 minutes, maybe the server will recover
|
||||
} else {
|
||||
match status {
|
||||
StatusCode::REQUEST_TIMEOUT => {
|
||||
finish(FailOutcome::Requeue, "request timeout", e.into(), 20);
|
||||
// give 30 seconds and try again
|
||||
}
|
||||
StatusCode::TOO_MANY_REQUESTS => {
|
||||
finish(FailOutcome::Requeue, "too many requests", e.into(), 10);
|
||||
// give 15 seconds and try again
|
||||
}
|
||||
_ => {
|
||||
finish(FailOutcome::Fail, "other", e.into(), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
finish(FailOutcome::Fail, "decode error", Some(e.into()), 0);
|
||||
} else {
|
||||
finish(FailOutcome::Fail, "other", e.into(), 0);
|
||||
finish(FailOutcome::Fail, "other", Some(e.into()), 0);
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Deal with status codes
|
||||
let status = response.status();
|
||||
if status.is_informational() {
|
||||
finish(FailOutcome::Requeue, "informational error", None, 0);
|
||||
} else if status.is_redirection() {
|
||||
if status == StatusCode::NOT_MODIFIED {
|
||||
finish(FailOutcome::NotModified, "not modified", None, 0);
|
||||
} else {
|
||||
// Our client follows up to 10 redirects. This is a failure.
|
||||
finish(FailOutcome::Fail, "redirection error", None, 0);
|
||||
}
|
||||
} else if status.is_server_error() {
|
||||
finish(FailOutcome::Requeue, "server error", None, 300);
|
||||
// give them 5 minutes, maybe the server will recover
|
||||
} else {
|
||||
match status {
|
||||
StatusCode::REQUEST_TIMEOUT => {
|
||||
finish(FailOutcome::Requeue, "request timeout", None, 20);
|
||||
// give 30 seconds and try again
|
||||
}
|
||||
StatusCode::TOO_MANY_REQUESTS => {
|
||||
finish(FailOutcome::Requeue, "too many requests", None, 10);
|
||||
// give 15 seconds and try again
|
||||
}
|
||||
_ => {
|
||||
finish(FailOutcome::Fail, "other", None, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let maybe_etag = response
|
||||
.headers()
|
||||
.get(ETAG)
|
||||
@ -421,16 +439,27 @@ impl Fetcher {
|
||||
let bytes = match maybe_bytes {
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
finish(FailOutcome::Fail, "response bytes", e.into(), 0);
|
||||
finish(FailOutcome::Fail, "response bytes", Some(e.into()), 0);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Do not accept zero-length files, and don't try again
|
||||
if bytes.is_empty() {
|
||||
finish(FailOutcome::Fail, "zero length file", None, 10);
|
||||
return;
|
||||
}
|
||||
|
||||
GLOBALS.bytes_read.fetch_add(bytes.len(), Ordering::Relaxed);
|
||||
|
||||
// Write to the file
|
||||
if let Err(e) = tokio::fs::write(cache_file.as_path(), bytes).await {
|
||||
finish(FailOutcome::Fail, "writing to cache file", e.into(), 0);
|
||||
finish(
|
||||
FailOutcome::Fail,
|
||||
"writing to cache file",
|
||||
Some(e.into()),
|
||||
0,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user