New subscription work, not yet in use

This commit is contained in:
Mike Dilger 2023-01-02 11:14:54 +13:00
parent 5e1b234a2d
commit 56dac88404
2 changed files with 158 additions and 8 deletions

View File

@ -19,9 +19,6 @@ impl Minion {
let v: Vec<IdHex> = serde_json::from_str(&bus_message.json_payload)?; let v: Vec<IdHex> = serde_json::from_str(&bus_message.json_payload)?;
self.get_events(v).await?; self.get_events(v).await?;
} }
"follow_event_reactions" => {
warn!("{}: follow event reactions unimplemented", &self.url);
}
"post_event" => { "post_event" => {
let event: Event = serde_json::from_str(&bus_message.json_payload)?; let event: Event = serde_json::from_str(&bus_message.json_payload)?;
let msg = ClientMessage::Event(Box::new(event)); let msg = ClientMessage::Event(Box::new(event));
@ -30,6 +27,41 @@ impl Minion {
ws_sink.send(WsMessage::Text(wire)).await?; ws_sink.send(WsMessage::Text(wire)).await?;
info!("Posted event to {}", &self.url); info!("Posted event to {}", &self.url);
} }
//
// NEW handling
//
"subscribe_ephemeral_for_all" => {
let data: Vec<PublicKeyHex> = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_ephemeral_for_all(data).await?;
}
"subscribe_posts_by_me" => {
let data: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_posts_by_me(data).await?;
}
"subscribe_posts_by_followed" => {
let data: Vec<PublicKeyHex> = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_posts_by_followed(data).await?;
}
"subscribe_ancestors" => {
let data: Vec<IdHex> = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_ancestors(data).await?;
}
"subscribe_my_descendants" => {
let data: Vec<IdHex> = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_my_descendants(data).await?;
}
"subscribe_follower_descendants" => {
let data: Vec<IdHex> = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_follower_descendants(data).await?;
}
"subscribe_my_mentions" => {
let data: PublicKeyHex = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_my_mentions(data).await?;
}
"subscribe_follower_mentions" => {
let data: Vec<PublicKeyHex> = serde_json::from_str(&bus_message.json_payload)?;
self.subscribe_follower_mentions(data).await?;
}
_ => { _ => {
warn!( warn!(
"{} Unrecognized bus message kind received by minion: {}", "{} Unrecognized bus message kind received by minion: {}",

View File

@ -12,6 +12,7 @@ use http::Uri;
use nostr_types::{ use nostr_types::{
EventKind, Filter, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url, EventKind, Filter, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url,
}; };
use std::time::Duration;
use subscription::Subscriptions; use subscription::Subscriptions;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::select; use tokio::select;
@ -386,10 +387,127 @@ impl Minion {
Ok(()) Ok(())
} }
/* async fn subscribe_ephemeral_for_all(
async fn follow_event_reactions(&mut self, _ids: Vec<IdHex>) -> Result<(), Error> { &mut self,
// Create or extend the "reactions" subscription people: Vec<PublicKeyHex>,
unimplemented!() ) -> Result<(), Error> {
let filter = Filter {
authors: people,
kinds: vec![EventKind::Metadata, EventKind::ContactList],
// No since. The list of people changes so we can't even use 'last checked' here.
..Default::default()
};
self.subscribe(vec![filter], "ephemeral_for_all").await
}
async fn subscribe_posts_by_me(&mut self, me: PublicKeyHex) -> Result<(), Error> {
let feed_chunk = GLOBALS.settings.read().await.feed_chunk;
let filter = Filter {
authors: vec![me],
// leave ALL kinds
since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)),
..Default::default()
};
self.subscribe(vec![filter], "posts_by_me").await
}
async fn subscribe_posts_by_followed(
&mut self,
followed: Vec<PublicKeyHex>,
) -> Result<(), Error> {
let feed_chunk = GLOBALS.settings.read().await.feed_chunk;
let filter = Filter {
authors: followed,
// leave ALL kinds
since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)),
..Default::default()
};
self.subscribe(vec![filter], "posts_by_followed").await
}
async fn subscribe_ancestors(&mut self, ancestors: Vec<IdHex>) -> Result<(), Error> {
let filter = Filter {
ids: ancestors,
// leave ALL kinds
// no since filter
..Default::default()
};
self.subscribe(vec![filter], "ancestors").await
}
async fn subscribe_my_descendants(&mut self, my_posts: Vec<IdHex>) -> Result<(), Error> {
let filter = Filter {
e: my_posts,
// leave ALL kinds
// no since filter
..Default::default()
};
self.subscribe(vec![filter], "my_descendants").await
}
async fn subscribe_follower_descendants(
&mut self,
follower_posts: Vec<IdHex>,
) -> Result<(), Error> {
let filter = Filter {
e: follower_posts,
// leave ALL kinds
// no since filter
..Default::default()
};
self.subscribe(vec![filter], "follower_descendants").await
}
async fn subscribe_my_mentions(&mut self, me: PublicKeyHex) -> Result<(), Error> {
let feed_chunk = GLOBALS.settings.read().await.feed_chunk;
let filter = Filter {
p: vec![me],
// leave ALL kinds
since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)),
..Default::default()
};
self.subscribe(vec![filter], "my_mentions").await
}
async fn subscribe_follower_mentions(
&mut self,
followers: Vec<PublicKeyHex>,
) -> Result<(), Error> {
let feed_chunk = GLOBALS.settings.read().await.feed_chunk;
let filter = Filter {
p: followers,
// leave ALL kinds
since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)),
..Default::default()
};
self.subscribe(vec![filter], "follower_mentions").await
}
async fn subscribe(&mut self, filters: Vec<Filter>, handle: &str) -> Result<(), Error> {
let req_message = if self.subscriptions.has(handle) {
let sub = self.subscriptions.get_mut(handle).unwrap();
*sub.get_mut() = filters;
sub.req_message()
} else {
self.subscriptions.add(handle, filters);
self.subscriptions.get(handle).unwrap().req_message()
};
let wire = serde_json::to_string(&req_message)?;
let websocket_sink = self.sink.as_mut().unwrap();
websocket_sink.send(WsMessage::Text(wire.clone())).await?;
trace!("{}: Sent {}", &self.url, &wire);
Ok(())
} }
*/
} }