diff --git a/gossip-lib/src/manager.rs b/gossip-lib/src/manager.rs index af4b2ee1..6fe9d18a 100644 --- a/gossip-lib/src/manager.rs +++ b/gossip-lib/src/manager.rs @@ -3,7 +3,6 @@ use crate::error::{Error, ErrorKind}; use crate::globals::GLOBALS; use crate::minion::Minion; use crate::pending::PendingItem; -use crate::relay::Relay; use dashmap::mapref::entry::Entry; use nostr_types::RelayUrl; @@ -16,13 +15,13 @@ use nostr_types::RelayUrl; /// /// This function returns quickly, as it spawns a separate task to do the engagement /// so you won't get any feedback. -pub(crate) async fn run_job_on_relays(urls: Vec, jobs: Vec, count: usize) { +pub(crate) fn run_jobs_on_some_relays(urls: Vec, count: usize, jobs: Vec) { // Keep engaging relays until `count` engagements were successful // Do from a spawned task so that we don't hold up the overlord let _join_handle = tokio::spawn(async move { let mut successes: usize = 0; for url in urls.iter() { - if let Ok(_) = engage_minion(url.to_owned(), jobs.clone()).await { + if let Ok(_) = engage_minion_inner(url.to_owned(), jobs.clone()).await { successes += 1; if successes >= count { break; @@ -32,7 +31,30 @@ pub(crate) async fn run_job_on_relays(urls: Vec, jobs: Vec, }); } -pub(crate) async fn engage_minion(url: RelayUrl, jobs: Vec) -> Result<(), Error> { +/// This runs the job on all relays. +/// +/// This function returns quickly, as it spawns a separate task to do the engagement +/// so you won't get any feedback. +pub(crate) fn run_jobs_on_all_relays(urls: Vec, jobs: Vec) { + // Keep engaging relays until `count` engagements were successful + // Do from a spawned task so that we don't hold up the overlord + let _ = tokio::spawn(async move { + let mut futures = Vec::new(); + for url in urls.iter() { + futures.push(engage_minion_inner(url.to_owned(), jobs.clone())); + } + futures::future::join_all(futures).await; + }); +} + +/// This will try to engage the minion and give no feedback, returning immediately. +pub(crate) fn engage_minion(url: RelayUrl, jobs: Vec) { + let _ = tokio::spawn(async move { + let _ = engage_minion_inner(url, jobs).await; + }); +} + +async fn engage_minion_inner(url: RelayUrl, mut jobs: Vec) -> Result<(), Error> { let relay = GLOBALS.storage.read_or_create_relay(&url, None)?; if GLOBALS @@ -53,14 +75,6 @@ pub(crate) async fn engage_minion(url: RelayUrl, jobs: Vec) -> Result< } } // else fall through - engage_minion_inner(relay, url, jobs).await -} - -pub(crate) async fn engage_minion_inner( - relay: Relay, - url: RelayUrl, - mut jobs: Vec, -) -> Result<(), Error> { // Do not connect if we are offline if GLOBALS.storage.read_setting_offline() { return Err(ErrorKind::Offline.into()); diff --git a/gossip-lib/src/overlord.rs b/gossip-lib/src/overlord.rs index 88d9325e..193ae7a9 100644 --- a/gossip-lib/src/overlord.rs +++ b/gossip-lib/src/overlord.rs @@ -358,7 +358,7 @@ impl Overlord { } // Subscribe to the general feed - manager::engage_minion(assignment.relay_url.clone(), jobs).await?; + manager::engage_minion(assignment.relay_url.clone(), jobs); Ok(()) } @@ -694,7 +694,7 @@ impl Overlord { Self::rank_relay(relay_url, rank)?; } ToOverlordMessage::ReengageMinion(url, jobs) => { - manager::engage_minion(url, jobs).await?; + manager::engage_minion(url, jobs); } ToOverlordMessage::RefreshSubscribedMetadata => { self.refresh_subscribed_metadata().await?; @@ -873,7 +873,7 @@ impl Overlord { // Send it the event to post tracing::debug!("Asking {} to advertise relay list", &relay_url); - if let Err(e) = manager::engage_minion( + manager::engage_minion( relay_url, vec![RelayJob { reason: RelayConnectionReason::Advertising, @@ -882,11 +882,7 @@ impl Overlord { detail: ToMinionPayloadDetail::AdvertiseRelayList(event, dmevent), }, }], - ) - .await - { - tracing::error!("{}", e); - } + ); let _ = GLOBALS .advertise_jobs_remaining @@ -971,19 +967,16 @@ impl Overlord { let config_relays: Vec = Relay::choose_relay_urls(Relay::WRITE, |_| true)?; - for relay_url in config_relays.iter() { - manager::engage_minion( - relay_url.to_owned(), - vec![RelayJob { - reason: RelayConnectionReason::PostEvent, - payload: ToMinionPayload { - job_id: rand::random::(), - detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), - }, - }], - ) - .await?; - } + manager::run_job_on_all_relays( + config_relays, + vec![RelayJob { + reason: RelayConnectionReason::PostEvent, + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), + }, + }], + ); Ok(()) } @@ -1049,8 +1042,7 @@ impl Overlord { // Start the job if let Some((url, jobs)) = GLOBALS.pending.take_relay_connection_request(&relay_url) { - let relay = GLOBALS.storage.read_or_create_relay(&url, None)?; - manager::engage_minion_inner(relay, url, jobs).await?; + manager::engage_minion(url, jobs); } Ok(()) @@ -1210,19 +1202,16 @@ impl Overlord { } // Send event to all these relays - for url in relay_urls { - manager::engage_minion( - url.to_owned(), - vec![RelayJob { - reason: RelayConnectionReason::PostEvent, - payload: ToMinionPayload { - job_id: rand::random::(), - detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), - }, - }], - ) - .await?; - } + manager::run_job_on_all_relays( + relay_urls, + vec![RelayJob { + reason: RelayConnectionReason::PostEvent, + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), + }, + }], + ); Ok(()) } @@ -1271,7 +1260,6 @@ impl Overlord { .storage .get_event_seen_on_relay(id)? .iter() - .take(6) // doesn't have to be everywhere .map(|(url, _time)| url.to_owned()) .collect(); relay_urls.extend(seen_on); @@ -1280,22 +1268,16 @@ impl Overlord { relay_urls.dedup(); } - for url in relay_urls { - // Send it the event to post - tracing::debug!("Asking {} to delete", &url); - - manager::engage_minion( - url.to_owned(), - vec![RelayJob { - reason: RelayConnectionReason::PostEvent, - payload: ToMinionPayload { - job_id: rand::random::(), - detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), - }, - }], - ) - .await?; - } + manager::run_job_on_all_relays( + relay_urls, + vec![RelayJob { + reason: RelayConnectionReason::PostEvent, + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), + }, + }], + ); Ok(()) } @@ -1344,23 +1326,24 @@ impl Overlord { } // Don't do this if we already have the event - if !GLOBALS.storage.has_event(id)? { - // Note: minions will remember if they get the same id multiple times - // not to fetch it multiple times. + if GLOBALS.storage.has_event(id)? { + return Ok(()); + } - for url in relay_urls.iter() { - manager::engage_minion( - url.to_owned(), - vec![RelayJob { - reason: RelayConnectionReason::FetchEvent, - payload: ToMinionPayload { - job_id: rand::random::(), - detail: ToMinionPayloadDetail::FetchEvent(id), - }, - }], - ) - .await?; - } + // Note: minions will remember if they get the same id multiple times + // not to fetch it multiple times. + + for url in relay_urls.iter() { + manager::engage_minion( + url.to_owned(), + vec![RelayJob { + reason: RelayConnectionReason::FetchEvent, + payload: ToMinionPayload { + job_id: rand::random::(), + detail: ToMinionPayloadDetail::FetchEvent(id), + }, + }], + ); } Ok(()) @@ -1379,8 +1362,7 @@ impl Overlord { detail: ToMinionPayloadDetail::FetchNAddr(ea.clone()), }, }], - ) - .await?; + ); } } @@ -1585,8 +1567,7 @@ impl Overlord { detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), }, }], - ) - .await?; + ); } // Process the message for ourself @@ -1628,8 +1609,7 @@ impl Overlord { detail: ToMinionPayloadDetail::TempSubscribeInboxFeedChunk(anchor), }, }], - ) - .await?; + ); } } FeedKind::Person(pubkey) => { @@ -1650,8 +1630,7 @@ impl Overlord { }, }, }], - ) - .await?; + ); } } _ => (), // other feeds can't load more @@ -1782,8 +1761,7 @@ impl Overlord { detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), }, }], - ) - .await?; + ); } } @@ -1806,8 +1784,7 @@ impl Overlord { detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), }, }], - ) - .await?; + ); } Ok(()) @@ -1831,8 +1808,7 @@ impl Overlord { detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), }, }], - ) - .await?; + ); } Ok(()) @@ -1910,8 +1886,7 @@ impl Overlord { detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), }, }], - ) - .await?; + ); } Ok(()) @@ -1949,8 +1924,7 @@ impl Overlord { detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), }, }], - ) - .await?; + ); } Ok(()) @@ -1998,8 +1972,7 @@ impl Overlord { detail: ToMinionPayloadDetail::TempSubscribeMetadata(pubkeys), }, }], - ) - .await?; + ); } Ok(()) @@ -2118,8 +2091,7 @@ impl Overlord { detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]), }, }], - ) - .await?; + ); } Ok(()) @@ -2277,8 +2249,7 @@ impl Overlord { detail: ToMinionPayloadDetail::SubscribeDmChannel(dmchannel.clone()), }, }], - ) - .await?; + ); } Ok(()) @@ -2298,8 +2269,7 @@ impl Overlord { detail: ToMinionPayloadDetail::SubscribePersonFeed(pubkey, anchor), }, }], - ) - .await?; + ); } Ok(()) @@ -2459,7 +2429,7 @@ impl Overlord { }, }]; - manager::engage_minion(url.to_owned(), jobs).await?; + manager::engage_minion(url.to_owned(), jobs); } } @@ -2504,7 +2474,7 @@ impl Overlord { }, }]; - manager::engage_minion(url.to_owned(), jobs).await?; + manager::engage_minion(url.to_owned(), jobs); } } @@ -2571,8 +2541,7 @@ impl Overlord { detail: ToMinionPayloadDetail::SubscribeConfig, }, }], - ) - .await?; + ); } Ok(()) @@ -2618,8 +2587,7 @@ impl Overlord { detail: ToMinionPayloadDetail::SubscribeDiscover(pubkeys.clone()), }, }], - ) - .await?; + ); } Ok(()) @@ -2642,8 +2610,7 @@ impl Overlord { detail: ToMinionPayloadDetail::SubscribeInbox(now), }, }], - ) - .await?; + ); } Ok(()) @@ -2668,8 +2635,7 @@ impl Overlord { detail: ToMinionPayloadDetail::SubscribeGiftwraps(after), }, }], - ) - .await?; + ); } Ok(()) @@ -2687,8 +2653,7 @@ impl Overlord { detail: ToMinionPayloadDetail::SubscribeNip46, }, }], - ) - .await?; + ); } Ok(()) @@ -2736,8 +2701,7 @@ impl Overlord { detail: ToMinionPayloadDetail::TempSubscribeMetadata(vec![pubkey]), }, }], - ) - .await?; + ); } // Mark in globals that we want to recheck their nip-05 when that metadata @@ -2775,8 +2739,7 @@ impl Overlord { detail: ToMinionPayloadDetail::TempSubscribeMetadata(pubkeys), }, }], - ) - .await?; + ); } Ok(()) @@ -3109,8 +3072,7 @@ impl Overlord { detail: ToMinionPayloadDetail::SubscribeAugments(ids_hex), }, }], - ) - .await?; + ); } Ok(())