overlord: refactor handle_tasknextjoined()

This commit is contained in:
Mike Dilger 2022-12-26 13:17:22 +13:00
parent bd9f57cb44
commit eb8df21e2e

View File

@ -73,49 +73,9 @@ impl Overlord {
// Listen on self.minions until it is empty // Listen on self.minions until it is empty
while !self.minions.is_empty() { while !self.minions.is_empty() {
let task_next_joined = self.minions.join_next_with_id().await; let task_nextjoined = self.minions.join_next_with_id().await;
if task_next_joined.is_none() {
continue;
} // rare but possible
match task_next_joined.unwrap() {
Err(join_error) => {
let id = join_error.id();
let maybe_url = self.minions_task_url.get(&id);
match maybe_url {
Some(url) => {
// JoinError also has is_cancelled, is_panic, into_panic, try_into_panic
// Minion probably alreaedy logged, this may be redundant.
warn!("Minion {} completed with error: {}", &url, join_error);
// Minion probably already logged failure in relay table self.handle_task_nextjoined(task_nextjoined).await;
// Remove from our urls_watching vec
self.urls_watching.retain(|value| value != url);
// Remove from our hashmap
self.minions_task_url.remove(&id);
}
None => {
warn!("Minion UNKNOWN completed with error: {}", join_error);
}
}
}
Ok((id, _)) => {
let maybe_url = self.minions_task_url.get(&id);
match maybe_url {
Some(url) => {
info!("Relay Task {} completed", &url);
// Remove from our urls_watching vec
self.urls_watching.retain(|value| value != url);
// Remove from our hashmap
self.minions_task_url.remove(&id);
}
None => warn!("Relay Task UNKNOWN completed"),
}
}
}
} }
info!("Overlord confirms all minions have shutdown"); info!("Overlord confirms all minions have shutdown");
@ -288,55 +248,62 @@ impl Overlord {
}; };
keepgoing = self.handle_bus_message(bus_message).await?; keepgoing = self.handle_bus_message(bus_message).await?;
}, },
task_next_joined = self.minions.join_next_with_id() => { task_nextjoined = self.minions.join_next_with_id() => {
if task_next_joined.is_none() { self.handle_task_nextjoined(task_nextjoined).await;
return Ok(true); // rare but possible
}
match task_next_joined.unwrap() {
Err(join_error) => {
let id = join_error.id();
let maybe_url = self.minions_task_url.get(&id);
match maybe_url {
Some(url) => {
// JoinError also has is_cancelled, is_panic, into_panic, try_into_panic
// Minion probably alreaedy logged, this may be redundant.
warn!("Minion {} completed with error: {}", &url, join_error);
// Minion probably already logged failure in relay table
// Remove from our urls_watching vec
self.urls_watching.retain(|value| value != url);
// Remove from our hashmap
self.minions_task_url.remove(&id);
},
None => {
warn!("Minion UNKNOWN completed with error: {}", join_error);
}
}
},
Ok((id, _)) => {
let maybe_url = self.minions_task_url.get(&id);
match maybe_url {
Some(url) => {
warn!("Relay Task {} completed", &url);
// Remove from our urls_watching vec
self.urls_watching.retain(|value| value != url);
// Remove from our hashmap
self.minions_task_url.remove(&id);
},
None => warn!("Relay Task UNKNOWN completed"),
}
}
}
} }
} }
Ok(keepgoing) Ok(keepgoing)
} }
async fn handle_task_nextjoined(
&mut self,
task_nextjoined: Option<Result<(task::Id, ()), task::JoinError>>,
) {
if task_nextjoined.is_none() {
return; // rare but possible
}
match task_nextjoined.unwrap() {
Err(join_error) => {
let id = join_error.id();
let maybe_url = self.minions_task_url.get(&id);
match maybe_url {
Some(url) => {
// JoinError also has is_cancelled, is_panic, into_panic, try_into_panic
// Minion probably alreaedy logged, this may be redundant.
warn!("Minion {} completed with error: {}", &url, join_error);
// Minion probably already logged failure in relay table
// Remove from our urls_watching vec
self.urls_watching.retain(|value| value != url);
// Remove from our hashmap
self.minions_task_url.remove(&id);
}
None => {
warn!("Minion UNKNOWN completed with error: {}", join_error);
}
}
}
Ok((id, _)) => {
let maybe_url = self.minions_task_url.get(&id);
match maybe_url {
Some(url) => {
warn!("Relay Task {} completed", &url);
// Remove from our urls_watching vec
self.urls_watching.retain(|value| value != url);
// Remove from our hashmap
self.minions_task_url.remove(&id);
}
None => warn!("Relay Task UNKNOWN completed"),
}
}
}
}
async fn handle_bus_message(&mut self, bus_message: BusMessage) -> Result<bool, Error> { async fn handle_bus_message(&mut self, bus_message: BusMessage) -> Result<bool, Error> {
#[allow(clippy::single_match)] // because temporarily so #[allow(clippy::single_match)] // because temporarily so
match &*bus_message.target { match &*bus_message.target {