Cork metadata and subscribe to it only after previous subscriptions complete

This commit is contained in:
Mike Dilger 2023-12-11 08:28:21 +13:00
parent 5774bcfab8
commit c33700c91b

View File

@ -52,6 +52,7 @@ pub struct Minion {
last_message_sent: String,
waiting_for_auth: Option<Id>,
corked_subscriptions: Vec<(String, Unixtime)>,
corked_metadata: Vec<(u64, Vec<PublicKey>)>,
}
impl Minion {
@ -82,6 +83,7 @@ impl Minion {
last_message_sent: String::new(),
waiting_for_auth: None,
corked_subscriptions: Vec::new(),
corked_metadata: Vec::new(),
})
}
}
@ -300,7 +302,7 @@ impl Minion {
// Update subscription for sought events
self.get_events().await?;
// Try to subscribe to corked subscriptions
// Try to subscribe to corked subscriptions and metadata
self.try_resubscribe_to_corked().await?;
},
to_minion_message = self.from_overlord.recv() => {
@ -832,11 +834,33 @@ impl Minion {
return Ok(());
}
// Subscribe to metadata
if !self.subscription_map.has("temp_subscribe_metadata") && !self.corked_metadata.is_empty()
{
let mut corked_metadata = std::mem::take(&mut self.corked_metadata);
let mut combined_job_id: Option<u64> = None;
let mut combined_pubkeys: Vec<PublicKey> = Vec::new();
for (job_id, pubkeys) in corked_metadata.drain(..) {
if combined_job_id.is_none() {
combined_job_id = Some(job_id)
} else {
// Tell the overlord this job id is over (it got combined into
// another job_id)
self.to_overlord.send(ToOverlordMessage::MinionJobComplete(
self.url.clone(),
job_id,
))?;
}
combined_pubkeys.extend(pubkeys);
}
self.temp_subscribe_metadata(combined_job_id.unwrap(), combined_pubkeys)
.await?;
}
// Apply subscriptions that were waiting for auth
let mut handles = std::mem::take(&mut self.corked_subscriptions);
let now = Unixtime::now().unwrap();
for (handle, when) in handles.drain(..) {
// Do not try if we just inserted it within the last second
if when - now < Duration::from_secs(1) {
@ -872,6 +896,12 @@ impl Minion {
job_id: u64,
mut pubkeys: Vec<PublicKey>,
) -> Result<(), Error> {
if self.subscription_map.has("temp_subscribe_metadata") {
// Save for later
self.corked_metadata.push((job_id, pubkeys));
return Ok(());
}
let pkhp: Vec<PublicKeyHex> = pubkeys.drain(..).map(|pk| pk.into()).collect();
tracing::trace!("Temporarily subscribing to metadata on {}", &self.url);