Merge branch 'master' into tag-unique

This commit is contained in:
Michael Dilger 2023-01-15 14:35:57 +13:00 committed by GitHub
commit 6d1f516ad8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 242 additions and 104 deletions

3
Cargo.lock generated
View File

@ -1698,6 +1698,7 @@ dependencies = [
name = "gossip"
version = "0.3.1-unstable"
dependencies = [
"async-recursion",
"base64 0.20.0",
"dashmap",
"dirs",
@ -2352,7 +2353,7 @@ dependencies = [
[[package]]
name = "nostr-types"
version = "0.2.0-unstable"
source = "git+https://github.com/mikedilger/nostr-types#1dbae78600763d3ccee6b8e8dc7e724279fb800e"
source = "git+https://github.com/mikedilger/nostr-types#9ee82ea432fac7aab6c1d949c1941495a98eaeac"
dependencies = [
"aes",
"base64 0.13.1",

View File

@ -12,6 +12,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-recursion = "1.0"
base64 = "0.20"
dashmap = "5.4"
dirs = "4.0"

View File

@ -21,6 +21,7 @@ pub enum ToOverlordMessage {
PushFollow,
SaveRelays,
SaveSettings,
SetThreadFeed(Id),
Shutdown,
UnlockKey(String),
UpdateMetadata(PublicKeyHex),
@ -44,6 +45,7 @@ pub enum ToMinionPayload {
Shutdown,
SubscribeGeneralFeed,
SubscribePersonFeed(PublicKeyHex),
SubscribeThreadFeed(Id),
SubscribeThreadFeed(IdHex, Vec<IdHex>),
TempSubscribeMetadata(PublicKeyHex),
UnsubscribeThreadFeed,
}

View File

@ -1,5 +1,6 @@
use crate::error::Error;
use crate::globals::GLOBALS;
use nostr_types::{Id, Url};
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
@ -44,6 +45,28 @@ impl DbEventSeen {
}
*/
#[allow(dead_code)]
pub async fn get_relays_for_event(id: Id) -> Result<Vec<Url>, Error> {
let sql = "SELECT relay FROM event_seen WHERE event=?";
let relays: Result<Vec<Url>, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.raw_bind_parameter(1, id.as_hex_string())?;
let mut rows = stmt.raw_query();
let mut relays: Vec<Url> = Vec::new();
while let Some(row) = rows.next()? {
let s: String = row.get(0)?;
relays.push(Url::new(&s));
}
Ok(relays)
})
.await?;
relays
}
pub async fn replace(event_seen: DbEventSeen) -> Result<(), Error> {
let sql = "REPLACE INTO event_seen (event, relay, when_seen) \
VALUES (?1, ?2, ?3)";

View File

@ -1,5 +1,6 @@
use crate::error::Error;
use crate::globals::GLOBALS;
use async_recursion::async_recursion;
use dashmap::{DashMap, DashSet};
use nostr_types::{Event, Id};
use tokio::task;
@ -22,6 +23,7 @@ impl Events {
let _ = self.events.insert(event.id, event);
}
#[allow(dead_code)]
pub fn contains_key(&self, id: &Id) -> bool {
self.events.contains_key(id)
}
@ -59,6 +61,23 @@ impl Events {
}
}
#[allow(dead_code)]
#[async_recursion]
pub async fn get_highest_local_parent(&self, id: &Id) -> Result<Option<Id>, Error> {
if let Some(event) = self.get_local(*id).await? {
if let Some((parent_id, _opturl)) = event.replies_to() {
match self.get_highest_local_parent(&parent_id).await? {
Some(top_id) => Ok(Some(top_id)), // went higher
None => Ok(Some(*id)), // couldn't go higher, stay here
}
} else {
Ok(Some(*id)) // is a root
}
} else {
Ok(None) // not present locally
}
}
pub fn is_new(&self, id: &Id) -> bool {
self.new_events.contains(id)
}

View File

@ -1,9 +1,11 @@
use crate::comms::{ToMinionMessage, ToMinionPayload};
use crate::comms::{ToMinionMessage, ToMinionPayload, ToOverlordMessage};
use crate::error::Error;
use crate::globals::GLOBALS;
use nostr_types::{Event, EventKind, Id, PublicKeyHex, Unixtime};
use parking_lot::RwLock;
use std::collections::HashSet;
use std::time::{Duration, Instant};
use tokio::task;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FeedKind {
@ -26,6 +28,8 @@ pub struct Feed {
// We track these to update subscriptions on them
my_event_ids: RwLock<Vec<Id>>,
followed_event_ids: RwLock<Vec<Id>>,
thread_parent: RwLock<Option<Id>>,
}
impl Feed {
@ -38,6 +42,7 @@ impl Feed {
last_computed: RwLock::new(Instant::now()),
my_event_ids: RwLock::new(Vec::new()),
followed_event_ids: RwLock::new(Vec::new()),
thread_parent: RwLock::new(None),
}
}
@ -46,26 +51,43 @@ impl Feed {
// because it won't have changed, but the relays will shower you with
// all those events again.
*self.current_feed_kind.write() = FeedKind::General;
*self.thread_parent.write() = None;
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribeThreadFeed,
});
}
pub fn set_feed_to_replies(&self) {
*self.current_feed_kind.write() = FeedKind::Replies;
*self.thread_parent.write() = None;
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribeThreadFeed,
});
}
pub fn set_feed_to_thread(&self, id: Id) {
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::SubscribeThreadFeed(id),
});
*self.current_feed_kind.write() = FeedKind::Thread(id);
// Parent starts with the post itself
// Overlord will climb it, and recompute will climb it
*self.thread_parent.write() = Some(id);
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::SetThreadFeed(id));
}
pub fn set_feed_to_person(&self, pubkey: PublicKeyHex) {
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribeThreadFeed,
});
let _ = GLOBALS.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::SubscribePersonFeed(pubkey.clone()),
});
*self.current_feed_kind.write() = FeedKind::Person(pubkey);
*self.thread_parent.write() = None;
}
pub fn get_feed_kind(&self) -> FeedKind {
@ -76,10 +98,14 @@ impl Feed {
let now = Instant::now();
if *self.last_computed.read() + Duration::from_millis(*self.interval_ms.read() as u64) < now
{
self.recompute();
*self.last_computed.write() = now;
let now = now;
task::spawn(async move {
if let Err(e) = GLOBALS.feed.recompute().await {
tracing::error!("{}", e);
}
*GLOBALS.feed.last_computed.write() = now;
});
}
self.general_feed.read().clone()
}
@ -87,39 +113,17 @@ impl Feed {
let now = Instant::now();
if *self.last_computed.read() + Duration::from_millis(*self.interval_ms.read() as u64) < now
{
self.recompute();
*self.last_computed.write() = now;
let now = now;
task::spawn(async move {
if let Err(e) = GLOBALS.feed.recompute().await {
tracing::error!("{}", e);
}
*GLOBALS.feed.last_computed.write() = now;
});
}
self.replies_feed.read().clone()
}
pub fn get_thread_parent(&self, id: Id) -> Id {
let mut event = match GLOBALS.events.get(&id) {
None => return id,
Some(e) => e,
};
// Try for root
if let Some((root, _)) = event.replies_to_root() {
if GLOBALS.events.contains_key(&root) {
return root;
}
}
// Climb parents as high as we can
while let Some((parent, _)) = event.replies_to() {
if let Some(e) = GLOBALS.events.get(&parent) {
event = e.to_owned();
} else {
break;
}
}
// The highest event id we have
event.id
}
pub fn get_person_feed(&self, person: PublicKeyHex) -> Vec<Id> {
let mut events: Vec<Event> = GLOBALS
.events
@ -147,8 +151,17 @@ impl Feed {
self.followed_event_ids.read().clone()
}
fn recompute(&self) {
let settings = GLOBALS.settings.blocking_read().clone();
pub fn get_thread_parent(&self) -> Option<Id> {
*self.thread_parent.read()
}
// Overlord climbs and sets this
pub fn set_thread_parent(&self, id: Id) {
*self.thread_parent.write() = Some(id);
}
pub async fn recompute(&self) -> Result<(), Error> {
let settings = GLOBALS.settings.read().await.clone();
*self.interval_ms.write() = settings.feed_recompute_interval_ms;
let events: Vec<Event> = GLOBALS
@ -159,12 +172,12 @@ impl Feed {
.collect();
let mut pubkeys = GLOBALS.people.get_followed_pubkeys();
if let Some(pubkey) = GLOBALS.signer.blocking_read().public_key() {
if let Some(pubkey) = GLOBALS.signer.read().await.public_key() {
pubkeys.push(pubkey.into()); // add the user
}
// My event ids
if let Some(pubkey) = GLOBALS.signer.blocking_read().public_key() {
if let Some(pubkey) = GLOBALS.signer.read().await.public_key() {
*self.my_event_ids.write() = events
.iter()
.filter_map(|e| if e.pubkey == pubkey { Some(e.id) } else { None })
@ -187,9 +200,11 @@ impl Feed {
// Filter further for the general feed
let now = Unixtime::now().unwrap();
let dismissed = GLOBALS.dismissed.read().await.clone();
let mut fevents: Vec<Event> = events
.iter()
.filter(|e| !GLOBALS.dismissed.blocking_read().contains(&e.id))
.filter(|e| !dismissed.contains(&e.id))
.filter(|e| pubkeys.contains(&e.pubkey.into())) // something we follow
.filter(|e| e.created_at <= now)
.cloned()
@ -201,7 +216,7 @@ impl Feed {
let my_events: HashSet<Id> = self.my_event_ids.read().iter().copied().collect();
let mut revents: Vec<Event> = events
.iter()
.filter(|e| !GLOBALS.dismissed.blocking_read().contains(&e.id))
.filter(|e| !dismissed.contains(&e.id))
.filter(|e| {
// FIXME: maybe try replies_to_ancestors to go deeper
if let Some((id, _)) = e.replies_to() {
@ -215,5 +230,17 @@ impl Feed {
.collect();
revents.sort_by(|a, b| b.created_at.cmp(&a.created_at));
*self.replies_feed.write() = revents.iter().map(|e| e.id).collect();
// Potentially update thread parent to a higher parent
let maybe_tp = *self.thread_parent.read();
if let Some(tp) = maybe_tp {
if let Some(new_tp) = GLOBALS.events.get_highest_local_parent(&tp).await? {
if new_tp != tp {
*self.thread_parent.write() = Some(new_tp);
}
}
}
Ok(())
}
}

View File

@ -38,7 +38,8 @@ impl Minion {
.subscriptions
.get_handle_by_id(&subid.0)
.unwrap_or_else(|| "_".to_owned());
tracing::debug!("{}: {}: NEW EVENT", &self.url, handle);
tracing::debug!("{}: {}: New Event: {:?}", &self.url, handle, event.kind);
// Events that come in after EOSE on the general feed bump the last_general_eose
// timestamp for that relay, so we don't query before them next time we run.

View File

@ -9,8 +9,7 @@ use futures::{SinkExt, StreamExt};
use futures_util::stream::{SplitSink, SplitStream};
use http::Uri;
use nostr_types::{
ClientMessage, EventKind, Filter, Id, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime,
Url,
ClientMessage, EventKind, Filter, IdHex, PublicKeyHex, RelayInformationDocument, Unixtime, Url,
};
use std::time::Duration;
use subscription::Subscriptions;
@ -254,12 +253,15 @@ impl Minion {
ToMinionPayload::SubscribePersonFeed(pubkeyhex) => {
self.subscribe_person_feed(pubkeyhex).await?;
}
ToMinionPayload::SubscribeThreadFeed(id) => {
self.subscribe_thread_feed(id).await?;
ToMinionPayload::SubscribeThreadFeed(main, parents) => {
self.subscribe_thread_feed(main, parents).await?;
}
ToMinionPayload::TempSubscribeMetadata(pubkeyhex) => {
self.temp_subscribe_metadata(pubkeyhex).await?;
}
ToMinionPayload::UnsubscribeThreadFeed => {
self.unsubscribe_thread_feed().await?;
}
}
Ok(true)
}
@ -327,6 +329,7 @@ impl Minion {
authors: vec![pubkey.into()],
kinds: vec![
EventKind::TextNote,
EventKind::Repost,
EventKind::Reaction,
EventKind::EventDeletion,
],
@ -335,8 +338,10 @@ impl Minion {
});
// Any mentions of me
// (but not in peoples contact lists, for example)
filters.push(Filter {
p: vec![pubkey.into()],
kinds: vec![EventKind::TextNote, EventKind::Repost, EventKind::Reaction],
since: Some(special_since),
..Default::default()
});
@ -359,6 +364,7 @@ impl Minion {
authors: followed_pubkeys.clone(),
kinds: vec![
EventKind::TextNote,
EventKind::Repost,
EventKind::Reaction,
EventKind::EventDeletion,
],
@ -458,56 +464,49 @@ impl Minion {
Ok(())
}
async fn subscribe_thread_feed(&mut self, id: Id) -> Result<(), Error> {
async fn subscribe_thread_feed(
&mut self,
main: IdHex,
vec_ids: Vec<IdHex>,
) -> Result<(), Error> {
// NOTE we do not unsubscribe to the general feed
let mut filters: Vec<Filter> = Vec::new();
let feed_chunk = GLOBALS.settings.read().await.feed_chunk;
// This post and ancestors
let mut ids: Vec<IdHex> = vec![id.into()];
// FIXME - We could have this precalculated like GLOBALS.relationships
// in reverse. It would be potentially more complete having
// iteratively climbed the chain.
if let Some(event) = GLOBALS.events.get(&id) {
for (id, url) in &event.replies_to_ancestors() {
if let Some(url) = url {
if url == &self.url {
ids.push((*id).into());
}
} else {
ids.push((*id).into());
}
}
if !vec_ids.is_empty() {
// Get ancestors we know of so far
filters.push(Filter {
ids: vec_ids.clone(),
..Default::default()
});
// Get reactions to ancestors, but not replies
filters.push(Filter {
e: vec_ids,
kinds: vec![EventKind::Reaction, EventKind::EventDeletion],
..Default::default()
});
}
// Get ancestors we know of
// Get replies to main event
filters.push(Filter {
ids: ids.clone(),
..Default::default()
});
// Replies and reactions to ancestors
filters.push(Filter {
e: ids,
e: vec![main],
kinds: vec![
EventKind::TextNote,
EventKind::Repost,
EventKind::Reaction,
EventKind::EventDeletion,
],
since: Some(Unixtime::now().unwrap() - Duration::from_secs(feed_chunk)),
..Default::default()
});
// Metadata for people in those events
// TBD
self.subscribe(filters, "thread_feed").await?;
if filters.is_empty() {
self.unsubscribe("thread_feed").await?;
} else {
self.subscribe(filters, "thread_feed").await?;
}
Ok(())
}
async fn unsubscribe_thread_feed(&mut self) -> Result<(), Error> {
self.unsubscribe("thread_feed").await?;
Ok(())
}

View File

@ -2,13 +2,13 @@ mod minion;
mod relay_picker;
use crate::comms::{ToMinionMessage, ToMinionPayload, ToOverlordMessage};
use crate::db::{DbEvent, DbPersonRelay, DbRelay};
use crate::db::{DbEvent, DbEventSeen, DbPersonRelay, DbRelay};
use crate::error::Error;
use crate::globals::GLOBALS;
use crate::people::People;
use minion::Minion;
use nostr_types::{
Event, EventKind, Id, PreEvent, PrivateKey, PublicKey, PublicKeyHex, Tag, Unixtime, Url,
Event, EventKind, Id, IdHex, PreEvent, PrivateKey, PublicKey, PublicKeyHex, Tag, Unixtime, Url,
};
use relay_picker::{BestRelay, RelayPicker};
use std::collections::HashMap;
@ -485,6 +485,9 @@ impl Overlord {
GLOBALS.settings.read().await.save().await?;
tracing::debug!("Settings saved.");
}
ToOverlordMessage::SetThreadFeed(id) => {
self.set_thread_feed(id).await?;
}
ToOverlordMessage::Shutdown => {
tracing::info!("Overlord shutting down");
return Ok(false);
@ -906,4 +909,65 @@ impl Overlord {
Ok(())
}
async fn set_thread_feed(&mut self, id: Id) -> Result<(), Error> {
// Cancel current thread subscriptions, if any
let _ = self.to_minions.send(ToMinionMessage {
target: "all".to_string(),
payload: ToMinionPayload::UnsubscribeThreadFeed,
});
// Climb the tree as high as we can, and if there are higher events,
// we will ask for those in the initial subscription
let highest_parent_id = match GLOBALS.events.get_highest_local_parent(&id).await? {
Some(id) => id,
None => return Ok(()), // can't do anything
};
// Set that in the feed
GLOBALS.feed.set_thread_parent(highest_parent_id);
// get that highest event
let highest_parent = match GLOBALS.events.get_local(highest_parent_id).await? {
Some(event) => event,
None => return Ok(()), // can't do anything
};
// strictly speaking, we are only certainly missing the next parent up, we might have
// parents further above. But this isn't asking for much extra.
let mut missing_ancestors: Vec<(Id, Option<Url>)> = highest_parent.replies_to_ancestors();
let missing_ids: Vec<Id> = missing_ancestors.iter().map(|(id, _)| *id).collect();
let missing_ids_hex: Vec<IdHex> = missing_ids.iter().map(|id| (*id).into()).collect();
tracing::debug!("Seeking ancestors {:?}", missing_ids_hex);
// Determine which relays to subscribe on
// (everywhere the main event was seen, and all relays suggested in the 'e' tags)
let mut relay_urls = DbEventSeen::get_relays_for_event(id).await?;
let suggested_urls: Vec<Url> = missing_ancestors
.drain(..)
.filter_map(|(_, opturl)| opturl)
.collect();
relay_urls.extend(suggested_urls);
relay_urls = relay_urls
.drain(..)
.filter(|u| u.is_valid_relay_url())
.collect();
relay_urls.sort();
relay_urls.dedup();
for url in relay_urls.iter() {
// Start minion if needed
if !GLOBALS.relays_watching.read().await.contains(url) {
self.start_minion(url.inner().to_string()).await?;
}
// Subscribe
let _ = self.to_minions.send(ToMinionMessage {
target: url.inner().to_string(),
payload: ToMinionPayload::SubscribeThreadFeed(id.into(), missing_ids_hex.clone()),
});
}
Ok(())
}
}

View File

@ -107,11 +107,11 @@ impl People {
if doit {
// Process fresh metadata
person.name = metadata.get("name");
person.about = metadata.get("about");
person.picture = metadata.get("picture");
if person.dns_id != metadata.get("nip05") {
person.dns_id = metadata.get("nip05");
person.name = metadata.name;
person.about = metadata.about;
person.picture = metadata.picture;
if person.dns_id != metadata.nip05 {
person.dns_id = metadata.nip05;
person.dns_id_valid = 0; // changed, so reset to invalid
person.dns_id_last_checked = None; // we haven't checked this one yet
}

View File

@ -6,7 +6,7 @@ use crate::ui::widgets::{CopyButton, LikeButton, ReplyButton};
use eframe::egui;
use egui::{
Align, Color32, Context, Frame, Image, Layout, RichText, ScrollArea, SelectableLabel, Sense,
Separator, Stroke, TextEdit, Ui, Vec2,
Separator, Stroke, TextEdit, TextStyle, Ui, Vec2,
};
use linkify::{LinkFinder, LinkKind};
use nostr_types::{Event, EventKind, Id, IdHex, PublicKeyHex, Tag};
@ -89,9 +89,10 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram
let feed = GLOBALS.feed.get_replies();
render_a_feed(app, ctx, frame, ui, feed, true);
}
FeedKind::Thread(id) => {
let parent = GLOBALS.feed.get_thread_parent(id);
render_a_feed(app, ctx, frame, ui, vec![parent], true);
FeedKind::Thread(_id) => {
if let Some(parent) = GLOBALS.feed.get_thread_parent() {
render_a_feed(app, ctx, frame, ui, vec![parent], true);
}
}
FeedKind::Person(pubkeyhex) => {
let feed = GLOBALS.feed.get_person_feed(pubkeyhex);
@ -473,17 +474,17 @@ fn render_post_actual(
ui.horizontal(|ui| {
GossipUi::render_person_name_line(ui, maybe_person.as_ref());
if app.page == Page::FeedGeneral || app.page == Page::FeedPerson {
if let Some((irt, _)) = event.replies_to() {
ui.add_space(8.0);
if let Some((irt, _)) = event.replies_to() {
ui.add_space(8.0);
let idhex: IdHex = irt.into();
let nam = format!("replies to #{}", GossipUi::hex_id_short(&idhex));
if ui.link(&nam).clicked() {
GLOBALS.feed.set_feed_to_thread(irt);
app.page = Page::FeedThread;
};
}
ui.style_mut().override_text_style = Some(TextStyle::Small);
let idhex: IdHex = irt.into();
let nam = format!("replies to #{}", GossipUi::hex_id_short(&idhex));
if ui.link(&nam).clicked() {
GLOBALS.feed.set_feed_to_thread(irt);
app.page = Page::FeedThread;
};
ui.reset_style();
}
ui.add_space(8.0);