Pass subscription (handle) to process_new_events

This commit is contained in:
Mike Dilger 2023-01-06 06:56:23 +13:00
parent 9fd668bb82
commit dbba43c87d
4 changed files with 20 additions and 12 deletions

View File

@ -35,7 +35,8 @@ pub struct Globals {
pub events: RwLock<HashMap<Id, Event>>,
/// Events coming in from relays that are not processed yet
pub incoming_events: RwLock<Vec<(Event, Url)>>,
/// stored with Url they came from and Subscription they came in on
pub incoming_events: RwLock<Vec<(Event, Url, Option<String>)>>,
/// All relationships between events
pub relationships: RwLock<HashMap<Id, Vec<(Id, Relationship)>>>,
@ -165,7 +166,7 @@ impl Globals {
let mut count = 0;
for event in events.iter() {
count += 1;
crate::process::process_new_event(event, false, None).await?;
crate::process::process_new_event(event, false, None, None).await?;
}
tracing::info!("Loaded {} desired events from the database", count);
}

View File

@ -31,7 +31,13 @@ impl Minion {
tracing::debug!("{}: {}: NEW EVENT", &self.url, handle);
// Try processing everything immediately
crate::process::process_new_event(&event, true, Some(self.url.clone())).await?;
crate::process::process_new_event(
&event,
true,
Some(self.url.clone()),
Some(handle),
)
.await?;
/*
if event.kind == EventKind::TextNote {
@ -40,7 +46,7 @@ impl Minion {
.incoming_events
.write()
.await
.push((*event, self.url.clone()));
.push((*event, self.url.clone(), handle));
} else {
// Process everything else immediately
crate::process::process_new_event(&event, true, Some(self.url.clone()))

View File

@ -121,7 +121,7 @@ impl Overlord {
};
// Process this metadata event to update people
crate::process::process_new_event(&e, false, None).await?;
crate::process::process_new_event(&e, false, None, None).await?;
}
}
@ -147,7 +147,7 @@ impl Overlord {
let mut count = 0;
for event in events.iter() {
count += 1;
crate::process::process_new_event(event, false, None).await?;
crate::process::process_new_event(event, false, None, None).await?;
}
tracing::info!("Loaded {} events from the database", count);
}
@ -492,9 +492,9 @@ impl Overlord {
GLOBALS.event_is_new.write().await.clear();
let _ = tokio::spawn(async move {
for (event, url) in GLOBALS.incoming_events.write().await.drain(..) {
let _ =
crate::process::process_new_event(&event, true, Some(url)).await;
for (event, url, sub) in GLOBALS.incoming_events.write().await.drain(..) {
let _ = crate::process::process_new_event(&event, true, Some(url), sub)
.await;
}
});
}
@ -774,7 +774,7 @@ impl Overlord {
}
// Process the message for ourself
crate::process::process_new_event(&event, false, None).await?;
crate::process::process_new_event(&event, false, None, None).await?;
Ok(())
}
@ -832,7 +832,7 @@ impl Overlord {
}
// Process the message for ourself
crate::process::process_new_event(&event, false, None).await?;
crate::process::process_new_event(&event, false, None, None).await?;
Ok(())
}
@ -897,7 +897,7 @@ impl Overlord {
}
// Process the message for ourself
crate::process::process_new_event(&event, false, None).await?;
crate::process::process_new_event(&event, false, None, None).await?;
Ok(())
}

View File

@ -12,6 +12,7 @@ pub async fn process_new_event(
event: &Event,
from_relay: bool,
seen_on: Option<Url>,
_subscription: Option<String>,
) -> Result<(), Error> {
// Save the event into the database
if from_relay {