Run job on relays no longer async; use new manager funcs more in overlord

Mike Dilger 2024-08-03 06:33:26 +12:00
parent bcadb73170
commit 51f36e36e7
2 changed files with 101 additions and 125 deletions


@@ -3,7 +3,6 @@ use crate::error::{Error, ErrorKind};
 use crate::globals::GLOBALS;
 use crate::minion::Minion;
 use crate::pending::PendingItem;
-use crate::relay::Relay;
 use dashmap::mapref::entry::Entry;
 use nostr_types::RelayUrl;
@@ -16,13 +15,13 @@ use nostr_types::RelayUrl;
 ///
 /// This function returns quickly, as it spawns a separate task to do the engagement
 /// so you won't get any feedback.
-pub(crate) async fn run_job_on_relays(urls: Vec<RelayUrl>, jobs: Vec<RelayJob>, count: usize) {
+pub(crate) fn run_jobs_on_some_relays(urls: Vec<RelayUrl>, count: usize, jobs: Vec<RelayJob>) {
     // Keep engaging relays until `count` engagements were successful
     // Do from a spawned task so that we don't hold up the overlord
     let _join_handle = tokio::spawn(async move {
         let mut successes: usize = 0;
         for url in urls.iter() {
-            if let Ok(_) = engage_minion(url.to_owned(), jobs.clone()).await {
+            if let Ok(_) = engage_minion_inner(url.to_owned(), jobs.clone()).await {
                 successes += 1;
                 if successes >= count {
                     break;
@@ -32,7 +31,30 @@ pub(crate) async fn run_job_on_relays(urls: Vec<RelayUrl>, jobs: Vec<RelayJob>,
     });
 }
 
-pub(crate) async fn engage_minion(url: RelayUrl, jobs: Vec<RelayJob>) -> Result<(), Error> {
+/// This runs the job on all relays.
+///
+/// This function returns quickly, as it spawns a separate task to do the engagement
+/// so you won't get any feedback.
+pub(crate) fn run_jobs_on_all_relays(urls: Vec<RelayUrl>, jobs: Vec<RelayJob>) {
+    // Engage all of the relays in parallel
+    // Do from a spawned task so that we don't hold up the overlord
+    let _ = tokio::spawn(async move {
+        let mut futures = Vec::new();
+        for url in urls.iter() {
+            futures.push(engage_minion_inner(url.to_owned(), jobs.clone()));
+        }
+        futures::future::join_all(futures).await;
+    });
+}
+
+/// This will try to engage the minion and give no feedback, returning immediately.
+pub(crate) fn engage_minion(url: RelayUrl, jobs: Vec<RelayJob>) {
+    let _ = tokio::spawn(async move {
+        let _ = engage_minion_inner(url, jobs).await;
+    });
+}
+
+async fn engage_minion_inner(url: RelayUrl, mut jobs: Vec<RelayJob>) -> Result<(), Error> {
     let relay = GLOBALS.storage.read_or_create_relay(&url, None)?;
 
     if GLOBALS
@@ -53,14 +75,6 @@ pub(crate) async fn engage_minion(url: RelayUrl, jobs: Vec<RelayJob>) -> Result<
         }
     } // else fall through
 
-    engage_minion_inner(relay, url, jobs).await
-}
-
-pub(crate) async fn engage_minion_inner(
-    relay: Relay,
-    url: RelayUrl,
-    mut jobs: Vec<RelayJob>,
-) -> Result<(), Error> {
     // Do not connect if we are offline
     if GLOBALS.storage.read_setting_offline() {
         return Err(ErrorKind::Offline.into());
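After this hunk, the manager exposes three non-async entry points (run_jobs_on_some_relays, run_jobs_on_all_relays, engage_minion) that share one fire-and-forget pattern: spawn a Tokio task and return immediately, giving the caller no feedback. Below is a minimal, self-contained sketch of that pattern, not the real gossip code: Url, Job, and engage_inner are hypothetical stand-ins for RelayUrl, RelayJob, and engage_minion_inner, and it assumes only the tokio and futures crates.

    use std::time::Duration;

    type Url = String; // stand-in for nostr_types::RelayUrl
    #[derive(Clone)]
    struct Job; // stand-in for RelayJob

    // Stand-in for engage_minion_inner: connect and hand the jobs over.
    async fn engage_inner(url: Url, jobs: Vec<Job>) -> Result<(), String> {
        println!("engaging {url} with {} job(s)", jobs.len());
        Ok(())
    }

    // Not async: spawn, return immediately, engage sequentially until `count` successes.
    fn run_jobs_on_some(urls: Vec<Url>, count: usize, jobs: Vec<Job>) {
        let _ = tokio::spawn(async move {
            let mut successes: usize = 0;
            for url in urls {
                if engage_inner(url, jobs.clone()).await.is_ok() {
                    successes += 1;
                    if successes >= count {
                        break;
                    }
                }
            }
        });
    }

    // Not async: spawn, return immediately, engage every url concurrently.
    fn run_jobs_on_all(urls: Vec<Url>, jobs: Vec<Job>) {
        let _ = tokio::spawn(async move {
            let futures: Vec<_> = urls
                .into_iter()
                .map(|url| engage_inner(url, jobs.clone()))
                .collect();
            futures::future::join_all(futures).await;
        });
    }

    #[tokio::main]
    async fn main() {
        run_jobs_on_some(vec!["wss://a.example".into(), "wss://b.example".into()], 1, vec![Job]);
        run_jobs_on_all(vec!["wss://c.example".into(), "wss://d.example".into()], vec![Job]);
        // The calls above never block; sleep so the spawned tasks get a chance to run.
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

The two batch functions differ only in scheduling: the "some" variant engages relays one at a time and stops after `count` successes, while the "all" variant builds every future up front and drives them concurrently with join_all.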


@@ -358,7 +358,7 @@ impl Overlord {
         }
 
         // Subscribe to the general feed
-        manager::engage_minion(assignment.relay_url.clone(), jobs).await?;
+        manager::engage_minion(assignment.relay_url.clone(), jobs);
 
         Ok(())
     }
@@ -694,7 +694,7 @@ impl Overlord {
                 Self::rank_relay(relay_url, rank)?;
             }
             ToOverlordMessage::ReengageMinion(url, jobs) => {
-                manager::engage_minion(url, jobs).await?;
+                manager::engage_minion(url, jobs);
             }
             ToOverlordMessage::RefreshSubscribedMetadata => {
                 self.refresh_subscribed_metadata().await?;
@@ -873,7 +873,7 @@ impl Overlord {
         // Send it the event to post
         tracing::debug!("Asking {} to advertise relay list", &relay_url);
 
-        if let Err(e) = manager::engage_minion(
+        manager::engage_minion(
             relay_url,
             vec![RelayJob {
                 reason: RelayConnectionReason::Advertising,
@@ -882,11 +882,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::AdvertiseRelayList(event, dmevent),
                 },
             }],
-        )
-        .await
-        {
-            tracing::error!("{}", e);
-        }
+        );
 
         let _ = GLOBALS
             .advertise_jobs_remaining
@@ -971,19 +967,16 @@ impl Overlord {
         let config_relays: Vec<RelayUrl> = Relay::choose_relay_urls(Relay::WRITE, |_| true)?;
 
-        for relay_url in config_relays.iter() {
-            manager::engage_minion(
-                relay_url.to_owned(),
-                vec![RelayJob {
-                    reason: RelayConnectionReason::PostEvent,
-                    payload: ToMinionPayload {
-                        job_id: rand::random::<u64>(),
-                        detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
-                    },
-                }],
-            )
-            .await?;
-        }
+        manager::run_jobs_on_all_relays(
+            config_relays,
+            vec![RelayJob {
+                reason: RelayConnectionReason::PostEvent,
+                payload: ToMinionPayload {
+                    job_id: rand::random::<u64>(),
+                    detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
+                },
+            }],
+        );
 
         Ok(())
     }
@@ -1049,8 +1042,7 @@ impl Overlord {
         // Start the job
         if let Some((url, jobs)) = GLOBALS.pending.take_relay_connection_request(&relay_url) {
-            let relay = GLOBALS.storage.read_or_create_relay(&url, None)?;
-            manager::engage_minion_inner(relay, url, jobs).await?;
+            manager::engage_minion(url, jobs);
         }
 
         Ok(())
@@ -1210,19 +1202,16 @@ impl Overlord {
         }
 
         // Send event to all these relays
-        for url in relay_urls {
-            manager::engage_minion(
-                url.to_owned(),
-                vec![RelayJob {
-                    reason: RelayConnectionReason::PostEvent,
-                    payload: ToMinionPayload {
-                        job_id: rand::random::<u64>(),
-                        detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
-                    },
-                }],
-            )
-            .await?;
-        }
+        manager::run_jobs_on_all_relays(
+            relay_urls,
+            vec![RelayJob {
+                reason: RelayConnectionReason::PostEvent,
+                payload: ToMinionPayload {
+                    job_id: rand::random::<u64>(),
+                    detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
+                },
+            }],
+        );
 
         Ok(())
     }
@@ -1271,7 +1260,6 @@ impl Overlord {
             .storage
             .get_event_seen_on_relay(id)?
             .iter()
-            .take(6) // doesn't have to be everywhere
             .map(|(url, _time)| url.to_owned())
             .collect();
         relay_urls.extend(seen_on);
@@ -1280,22 +1268,16 @@ impl Overlord {
             relay_urls.dedup();
         }
 
-        for url in relay_urls {
-            // Send it the event to post
-            tracing::debug!("Asking {} to delete", &url);
-            manager::engage_minion(
-                url.to_owned(),
-                vec![RelayJob {
-                    reason: RelayConnectionReason::PostEvent,
-                    payload: ToMinionPayload {
-                        job_id: rand::random::<u64>(),
-                        detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
-                    },
-                }],
-            )
-            .await?;
-        }
+        manager::run_jobs_on_all_relays(
+            relay_urls,
+            vec![RelayJob {
+                reason: RelayConnectionReason::PostEvent,
+                payload: ToMinionPayload {
+                    job_id: rand::random::<u64>(),
+                    detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
+                },
+            }],
+        );
 
         Ok(())
     }
@@ -1344,23 +1326,24 @@ impl Overlord {
         }
 
         // Don't do this if we already have the event
-        if !GLOBALS.storage.has_event(id)? {
-            // Note: minions will remember if they get the same id multiple times
-            // not to fetch it multiple times.
+        if GLOBALS.storage.has_event(id)? {
+            return Ok(());
+        }
 
-            for url in relay_urls.iter() {
-                manager::engage_minion(
-                    url.to_owned(),
-                    vec![RelayJob {
-                        reason: RelayConnectionReason::FetchEvent,
-                        payload: ToMinionPayload {
-                            job_id: rand::random::<u64>(),
-                            detail: ToMinionPayloadDetail::FetchEvent(id),
-                        },
-                    }],
-                )
-                .await?;
-            }
-        }
+        // Note: minions will remember if they get the same id multiple times
+        // not to fetch it multiple times.
+        for url in relay_urls.iter() {
+            manager::engage_minion(
+                url.to_owned(),
+                vec![RelayJob {
+                    reason: RelayConnectionReason::FetchEvent,
+                    payload: ToMinionPayload {
+                        job_id: rand::random::<u64>(),
+                        detail: ToMinionPayloadDetail::FetchEvent(id),
+                    },
+                }],
+            );
+        }
 
         Ok(())
@@ -1379,8 +1362,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::FetchNAddr(ea.clone()),
                 },
             }],
-        )
-        .await?;
+        );
     }
 }
@@ -1585,8 +1567,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     // Process the message for ourself
@@ -1628,8 +1609,7 @@ impl Overlord {
                                 detail: ToMinionPayloadDetail::TempSubscribeInboxFeedChunk(anchor),
                             },
                         }],
-                    )
-                    .await?;
+                    );
                 }
             }
             FeedKind::Person(pubkey) => {
@@ -1650,8 +1630,7 @@ impl Overlord {
                                 },
                             },
                         }],
-                    )
-                    .await?;
+                    );
                 }
             }
             _ => (), // other feeds can't load more
@@ -1782,8 +1761,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
                 },
             }],
-        )
-        .await?;
+        );
     }
 }
@@ -1806,8 +1784,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -1831,8 +1808,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -1910,8 +1886,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -1949,8 +1924,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -1998,8 +1972,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::TempSubscribeMetadata(pubkeys),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -2118,8 +2091,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::PostEvents(vec![event.clone()]),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -2277,8 +2249,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::SubscribeDmChannel(dmchannel.clone()),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -2298,8 +2269,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::SubscribePersonFeed(pubkey, anchor),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -2459,7 +2429,7 @@ impl Overlord {
                 },
             }];
 
-            manager::engage_minion(url.to_owned(), jobs).await?;
+            manager::engage_minion(url.to_owned(), jobs);
         }
     }
@@ -2504,7 +2474,7 @@ impl Overlord {
                 },
             }];
 
-            manager::engage_minion(url.to_owned(), jobs).await?;
+            manager::engage_minion(url.to_owned(), jobs);
        }
    }
@@ -2571,8 +2541,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::SubscribeConfig,
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -2618,8 +2587,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::SubscribeDiscover(pubkeys.clone()),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -2642,8 +2610,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::SubscribeInbox(now),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -2668,8 +2635,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::SubscribeGiftwraps(after),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -2687,8 +2653,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::SubscribeNip46,
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -2736,8 +2701,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::TempSubscribeMetadata(vec![pubkey]),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     // Mark in globals that we want to recheck their nip-05 when that metadata
@@ -2775,8 +2739,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::TempSubscribeMetadata(pubkeys),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
@@ -3109,8 +3072,7 @@ impl Overlord {
                     detail: ToMinionPayloadDetail::SubscribeAugments(ids_hex),
                 },
             }],
-        )
-        .await?;
+        );
     }
 
     Ok(())
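Every overlord change above is the same mechanical rewrite: an async per-relay loop whose `?` aborted the handler on the first failure becomes a single synchronous dispatch. Below is a sketch of that before/after shape at a typical call site, reusing the hypothetical Url and Job stand-ins from the manager sketch (not the real gossip types); note that the handler's Ok(()) now means "dispatched", not "delivered".

    type Url = String; // stand-in for nostr_types::RelayUrl
    #[derive(Clone)]
    struct Job; // stand-in for RelayJob

    fn run_jobs_on_all(urls: Vec<Url>, jobs: Vec<Job>) {
        // Simplified: engage each relay from a spawned task (see the manager sketch).
        let _ = tokio::spawn(async move {
            for url in urls {
                // stand-in for: let _ = engage_minion_inner(url, jobs.clone()).await;
                let _ = (url, jobs.clone());
            }
        });
    }

    // Before: the handler awaited each engagement, and `?` aborted on failure:
    //
    //     for url in relay_urls {
    //         manager::engage_minion(url.to_owned(), jobs.clone()).await?;
    //     }
    //
    // After: one non-async call; failures are swallowed inside the spawned task,
    // and Ok(()) is returned before any relay has actually been contacted.
    fn post_to_relays(relay_urls: Vec<Url>, jobs: Vec<Job>) -> Result<(), ()> {
        run_jobs_on_all(relay_urls, jobs);
        Ok(())
    }

    #[tokio::main]
    async fn main() {
        let _ = post_to_relays(vec!["wss://relay.example".into()], vec![Job]);
    }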