Merge branch 'nip65'

This commit is contained in:
Mike Dilger 2023-02-05 19:25:25 +13:00
commit 376dc968a0
13 changed files with 607 additions and 167 deletions

View File

@ -4,6 +4,7 @@ use nostr_types::{Event, Id, IdHex, Metadata, PublicKey, PublicKeyHex, RelayUrl,
#[derive(Debug, Clone)]
pub enum ToOverlordMessage {
AddRelay(RelayUrl),
AdvertiseRelayList,
DeletePub,
DeletePriv(String),
FollowPubkeyAndRelay(String, RelayUrl),
@ -25,7 +26,8 @@ pub enum ToOverlordMessage {
RefreshFollowedMetadata,
RankRelay(RelayUrl, u8),
SaveSettings,
SetRelayPost(RelayUrl, bool),
SetRelayReadWrite(RelayUrl, bool, bool),
SetRelayAdvertise(RelayUrl, bool),
SetThreadFeed(Id, Id, Option<Id>),
Shutdown,
UnlockKey(String),
@ -49,6 +51,7 @@ pub enum ToMinionPayload {
PullFollowing,
Shutdown,
SubscribeGeneralFeed(Vec<PublicKeyHex>),
SubscribeMentions,
SubscribePersonFeed(PublicKeyHex),
SubscribeThreadFeed(IdHex, Vec<IdHex>),
TempSubscribeMetadata(Vec<PublicKeyHex>),

View File

@ -75,6 +75,29 @@ impl DbEvent {
Ok(output?.drain(..).next())
}
pub async fn fetch_relay_lists() -> Result<Vec<Event>, Error> {
// FIXME, only get the last per pubkey
let sql = "SELECT raw FROM event WHERE event.kind=10002";
let output: Result<Vec<Event>, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
let mut rows = stmt.raw_query();
let mut events: Vec<Event> = Vec::new();
while let Some(row) = rows.next()? {
let raw: String = row.get(0)?;
let event: Event = serde_json::from_str(&raw)?;
events.push(event);
}
Ok(events)
})
.await?;
output
}
pub async fn fetch_reply_related(since: i64) -> Result<Vec<DbEvent>, Error> {
let public_key: PublicKeyHex = match GLOBALS.signer.public_key() {
None => return Ok(vec![]),

View File

@ -103,7 +103,7 @@ macro_rules! apply_sql {
}
fn upgrade(db: &Connection, mut version: u16) -> Result<(), Error> {
let current_version = 20;
let current_version = 21;
if version > current_version {
panic!(
"Database version {} is newer than this binary which expects version {}.",
@ -133,6 +133,7 @@ fn upgrade(db: &Connection, mut version: u16) -> Result<(), Error> {
apply_sql!(db, version, 18, "schema18.sql");
apply_sql!(db, version, 19, "schema19.sql");
apply_sql!(db, version, 20, "schema20.sql");
apply_sql!(db, version, 21, "schema21.sql");
tracing::info!("Database is at version {}", version);
Ok(())
}

View File

@ -13,6 +13,8 @@ pub struct DbPersonRelay {
pub last_suggested_nip23: Option<u64>,
pub last_suggested_nip05: Option<u64>,
pub last_suggested_bytag: Option<u64>,
pub read: bool,
pub write: bool,
}
impl DbPersonRelay {
@ -25,10 +27,12 @@ impl DbPersonRelay {
let sql = format!(
"SELECT person, relay, person_relay.last_fetched, \
last_suggested_kind2, last_suggested_kind3, last_suggested_nip23, \
last_suggested_nip05, last_suggested_bytag \
last_suggested_nip05, last_suggested_bytag, \
person_relay.read, person_relay.write \
FROM person_relay \
INNER JOIN relay ON person_relay.relay=relay.url \
WHERE person IN ({}) ORDER BY person, relay.rank DESC, \
WHERE person IN ({}) ORDER BY person, \
person_relay.write DESC, relay.rank DESC, \
last_suggested_nip23 DESC, last_suggested_kind3 DESC, \
last_suggested_nip05 DESC, last_suggested_kind2 DESC, \
last_fetched DESC, last_suggested_bytag DESC",
@ -57,6 +61,8 @@ impl DbPersonRelay {
last_suggested_nip23: row.get(5)?,
last_suggested_nip05: row.get(6)?,
last_suggested_bytag: row.get(7)?,
read: row.get(8)?,
write: row.get(9)?,
});
}
}
@ -71,8 +77,8 @@ impl DbPersonRelay {
pub async fn insert(person_relay: DbPersonRelay) -> Result<(), Error> {
let sql = "INSERT OR IGNORE INTO person_relay (person, relay, last_fetched, \
last_suggested_kind2, last_suggested_kind3, last_suggested_nip23, \
last_suggested_nip05, last_suggested_bytag) \
VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
last_suggested_nip05, last_suggested_bytag, read, write) \
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
@ -88,6 +94,8 @@ impl DbPersonRelay {
&person_relay.last_suggested_nip23,
&person_relay.last_suggested_nip05,
&person_relay.last_suggested_bytag,
&person_relay.read,
&person_relay.write,
))?;
Ok::<(), Error>(())
})
@ -225,11 +233,76 @@ impl DbPersonRelay {
Ok(())
}
pub async fn set_relay_list(
person: PublicKeyHex,
read_relays: Vec<RelayUrl>,
write_relays: Vec<RelayUrl>,
) -> Result<(), Error> {
// Clear the current ones
let sql1 = "UPDATE person_relay SET read=0, write=0 WHERE person=?";
// Set the reads
let sql2 = format!(
"UPDATE person_relay SET read=1 WHERE person=? AND relay IN ({})",
repeat_vars(read_relays.len())
);
let mut params2: Vec<String> = vec![person.to_string()];
for relay in read_relays.iter() {
params2.push(relay.to_string());
}
// Set the writes
let sql3 = format!(
"UPDATE person_relay SET write=1 WHERE person=? AND relay IN ({})",
repeat_vars(write_relays.len())
);
let mut params3: Vec<String> = vec![person.to_string()];
for relay in write_relays.iter() {
params3.push(relay.to_string());
}
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let inner = || -> Result<(), Error> {
let mut stmt = db.prepare("BEGIN TRANSACTION")?;
stmt.execute(())?;
let mut stmt = db.prepare(sql1)?;
stmt.execute((person.as_str(),))?;
let mut stmt = db.prepare(&sql2)?;
stmt.execute(rusqlite::params_from_iter(params2))?;
let mut stmt = db.prepare(&sql3)?;
stmt.execute(rusqlite::params_from_iter(params3))?;
let mut stmt = db.prepare("COMMIT TRANSACTION")?;
stmt.execute(())?;
Ok(())
};
if let Err(e) = inner() {
tracing::error!("{}", e);
let mut stmt = db.prepare("ROLLBACK TRANSACTION")?;
stmt.execute(())?;
}
Ok::<(), Error>(())
})
.await??;
Ok(())
}
/// This returns the best relays for the person along with a score, in order of score
pub async fn get_best_relays(pubkey: PublicKeyHex) -> Result<Vec<(RelayUrl, u64)>, Error> {
let sql = "SELECT person, relay, last_suggested_nip23, last_suggested_kind3, \
last_suggested_nip05, last_fetched, last_suggested_kind2, \
last_suggested_bytag \
last_suggested_bytag, read, write \
FROM person_relay WHERE person=?";
let ranked_relays: Result<Vec<(RelayUrl, u64)>, Error> = spawn_blocking(move || {
@ -253,6 +326,8 @@ impl DbPersonRelay {
last_suggested_nip23: row.get(5)?,
last_suggested_nip05: row.get(6)?,
last_suggested_bytag: row.get(7)?,
read: row.get(8)?,
write: row.get(9)?,
};
dbprs.push(dbpr);
}
@ -268,9 +343,14 @@ impl DbPersonRelay {
// This ranks the relays
pub fn rank(mut dbprs: Vec<DbPersonRelay>) -> Vec<(RelayUrl, u64)> {
// This is the ranking we are using. There might be reasons
// for ranking differently:
// nip23 (score=10) > kind3 (score=8) > nip05 (score=6) > fetched (score=4)
// > kind2 (score=2) > bytag (score=1)
// for ranking differently.
// write (score=20) [ they claim (to us) ]
// nip23 (score=10) [ they say (to themselves) ]
// kind3 tag (score=5) [ we say ]
// nip05 (score=4) [ they claim, unsigned ]
// fetched (score=3) [ we found ]
// kind2 (score=2) [ they mention ]
// bytag (score=1) [ someone else mentions ]
let now = Unixtime::now().unwrap().0 as u64;
let mut output: Vec<(RelayUrl, u64)> = Vec::new();
@ -283,30 +363,43 @@ impl DbPersonRelay {
for dbpr in dbprs.drain(..) {
let mut score = 0;
// nip23 is an author-signed explicit claim of using this relay
// 'write' is an author-signed explicit claim of where they write
if dbpr.write {
score += 20;
}
// nip23 is an author-signed statement to themselves
// kind-3 content also substitutes for nip23, this comes from either.
if let Some(when) = dbpr.last_suggested_nip23 {
score += scorefn(when, 60 * 60 * 24 * 30, 15);
}
// kind3 is a temporary (not NIPped) author-signed explicit claim of using this relay
if let Some(when) = dbpr.last_suggested_kind3 {
score += scorefn(when, 60 * 60 * 24 * 30, 15);
}
// kind2 is an author-signed recommended relay list
if let Some(when) = dbpr.last_suggested_kind2 {
score += scorefn(when, 60 * 60 * 24 * 30, 10);
}
// nip05 is an unsigned dns claim of using this relay
if let Some(when) = dbpr.last_suggested_nip05 {
score += scorefn(when, 60 * 60 * 24 * 15, 6);
// kind3 is our memory of where we are following someone
if let Some(when) = dbpr.last_suggested_kind3 {
score += scorefn(when, 60 * 60 * 24 * 30, 7);
}
// nip05 is an unsigned dns-based author claim of using this relay
if let Some(when) = dbpr.last_suggested_nip05 {
score += scorefn(when, 60 * 60 * 24 * 15, 4);
}
// last_fetched is gossip verified happened-to-work-before
if let Some(when) = dbpr.last_fetched {
score += scorefn(when, 60 * 60 * 24 * 3, 6);
score += scorefn(when, 60 * 60 * 24 * 3, 3);
}
// kind2 is an author-signed relay recommendation
if let Some(when) = dbpr.last_suggested_kind2 {
score += scorefn(when, 60 * 60 * 24 * 30, 2);
}
// last_suggested_bytag is an anybody-signed suggestion
if let Some(when) = dbpr.last_suggested_bytag {
score += scorefn(when, 60 * 60 * 24 * 2, 1);
}
output.push((dbpr.relay, score));
}
@ -314,22 +407,6 @@ impl DbPersonRelay {
output
}
/*
pub async fn delete(criteria: &str) -> Result<(), Error> {
let sql = format!("DELETE FROM person_relay WHERE {}", criteria);
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
db.execute(&sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
*/
}
fn repeat_vars(count: usize) -> String {

View File

@ -11,7 +11,9 @@ pub struct DbRelay {
pub rank: u64,
pub last_connected_at: Option<u64>,
pub last_general_eose_at: Option<u64>,
pub post: bool,
pub read: bool,
pub write: bool,
pub advertise: bool,
}
impl DbRelay {
@ -23,7 +25,9 @@ impl DbRelay {
rank: 3,
last_connected_at: None,
last_general_eose_at: None,
post: false,
read: false,
write: false,
advertise: false,
}
}
@ -41,7 +45,7 @@ impl DbRelay {
pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbRelay>, Error> {
let sql = "SELECT url, success_count, failure_count, rank, last_connected_at, \
last_general_eose_at, post FROM relay"
last_general_eose_at, read, write, advertise FROM relay"
.to_owned();
let sql = match criteria {
None => sql,
@ -56,7 +60,6 @@ impl DbRelay {
let mut rows = stmt.query([])?;
let mut output: Vec<DbRelay> = Vec::new();
while let Some(row) = rows.next()? {
let postint: u32 = row.get(6)?;
let s: String = row.get(0)?;
// just skip over invalid relay URLs
if let Ok(url) = RelayUrl::try_from_str(&s) {
@ -67,7 +70,9 @@ impl DbRelay {
rank: row.get(3)?,
last_connected_at: row.get(4)?,
last_general_eose_at: row.get(5)?,
post: (postint > 0),
read: row.get(6)?,
write: row.get(7)?,
advertise: row.get(8)?,
});
}
}
@ -90,15 +95,14 @@ impl DbRelay {
pub async fn insert(relay: DbRelay) -> Result<(), Error> {
let sql = "INSERT OR IGNORE INTO relay (url, success_count, failure_count, rank, \
last_connected_at, last_general_eose_at, post) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)";
last_connected_at, last_general_eose_at, read, write, advertise) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
let postint = i32::from(relay.post);
stmt.execute((
&relay.url.0,
&relay.success_count,
@ -106,7 +110,9 @@ impl DbRelay {
&relay.rank,
&relay.last_connected_at,
&relay.last_general_eose_at,
&postint,
&relay.read,
&relay.write,
&relay.advertise,
))?;
Ok::<(), Error>(())
})
@ -117,21 +123,22 @@ impl DbRelay {
pub async fn update(relay: DbRelay) -> Result<(), Error> {
let sql = "UPDATE relay SET success_count=?, failure_count=?, rank=?, \
last_connected_at=?, last_general_eose_at=?, post=? WHERE url=?";
last_connected_at=?, last_general_eose_at=?, read=?, write=?, advertise=? WHERE url=?";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
let postint = i32::from(relay.post);
stmt.execute((
&relay.success_count,
&relay.failure_count,
&relay.rank,
&relay.last_connected_at,
&relay.last_general_eose_at,
&postint,
&relay.read,
&relay.write,
&relay.advertise,
&relay.url.0,
))?;
Ok::<(), Error>(())
@ -162,13 +169,31 @@ impl DbRelay {
Ok(())
}
pub async fn update_post(url: RelayUrl, post: bool) -> Result<(), Error> {
let sql = "UPDATE relay SET post = ? WHERE url = ?";
pub async fn update_read_and_write(
url: RelayUrl,
read: bool,
write: bool,
) -> Result<(), Error> {
let sql = "UPDATE relay SET read = ?, write = ? WHERE url = ?";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((&post, &url.0))?;
stmt.execute((&read, &write, &url.0))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
pub async fn update_advertise(url: RelayUrl, advertise: bool) -> Result<(), Error> {
let sql = "UPDATE relay SET advertise = ? WHERE url = ?";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((&advertise, &url.0))?;
Ok::<(), Error>(())
})
.await??;
@ -241,10 +266,10 @@ impl DbRelay {
}
pub async fn recommended_relay_for_reply(reply_to: Id) -> Result<Option<RelayUrl>, Error> {
// Try to find a relay where the event was seen AND that I post to which
// Try to find a relay where the event was seen AND that I write to which
// has a rank>1
let sql = "SELECT url FROM relay INNER JOIN event_seen ON relay.url=event_seen.relay \
WHERE event_seen.event=? AND relay.post=1 AND relay.rank>1";
WHERE event_seen.event=? AND relay.write=1 AND relay.rank>1";
let output: Option<RelayUrl> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();

8
src/db/schema21.sql Normal file
View File

@ -0,0 +1,8 @@
ALTER TABLE person_relay ADD COLUMN read INTEGER NOT NULL DEFAULT 0;
ALTER TABLE person_relay ADD COLUMN write INTEGER NOT NULL DEFAULT 0;
ALTER TABLE person RENAME COLUMN contact_list_last_received TO relay_list_last_received;
ALTER TABLE person ADD COLUMN relay_list_created_at INTEGER NOT NULL DEFAULT 0;
ALTER TABLE relay RENAME COLUMN post TO write;
ALTER TABLE relay ADD COLUMN read INTEGER NOT NULL DEFAULT 0;
ALTER TABLE relay ADD COLUMN advertise INTEGER NOT NULL DEFAULT 0;

View File

@ -219,7 +219,7 @@ impl Globals {
.relays
.blocking_read()
.iter()
.filter(|(_, r)| r.post)
.filter(|(_, r)| r.write)
{
profile.relays.push(url.to_unchecked_url())
}

View File

@ -296,6 +296,9 @@ impl Minion {
ToMinionPayload::SubscribeGeneralFeed(pubkeys) => {
self.subscribe_general_feed(pubkeys).await?;
}
ToMinionPayload::SubscribeMentions => {
self.subscribe_mentions().await?;
}
ToMinionPayload::SubscribePersonFeed(pubkeyhex) => {
self.subscribe_person_feed(pubkeyhex).await?;
}
@ -329,17 +332,17 @@ impl Minion {
Ok(())
}
// Subscribe to the user's followers on the relays they write to
async fn subscribe_general_feed(
&mut self,
followed_pubkeys: Vec<PublicKeyHex>,
) -> Result<(), Error> {
let mut filters: Vec<Filter> = Vec::new();
let (overlap, feed_chunk, replies_chunk) = {
let (overlap, feed_chunk) = {
let settings = GLOBALS.settings.read().await.clone();
(
Duration::from_secs(settings.overlap),
Duration::from_secs(settings.feed_chunk),
Duration::from_secs(settings.replies_chunk),
)
};
@ -350,30 +353,25 @@ impl Minion {
);
// Compute how far to look back
let (feed_since, replies_since) = {
let feed_since = {
// Start with where we left off, the time we last got something from
// this relay.
let mut replies_since: Unixtime = match self.dbrelay.last_general_eose_at {
let mut feed_since: Unixtime = match self.dbrelay.last_general_eose_at {
Some(u) => Unixtime(u as i64),
None => Unixtime(0),
};
// Subtract overlap to avoid gaps due to clock sync and event
// propagation delay
replies_since = replies_since - overlap;
feed_since = feed_since - overlap;
// Some relays don't like dates before 1970. Hell, we don't need anything before 2020:
if replies_since.0 < 1577836800 {
replies_since.0 = 1577836800;
if feed_since.0 < 1577836800 {
feed_since.0 = 1577836800;
}
let one_replieschunk_ago = Unixtime::now().unwrap() - replies_chunk;
let replies_since = replies_since.max(one_replieschunk_ago);
let one_feedchunk_ago = Unixtime::now().unwrap() - feed_chunk;
let feed_since = replies_since.max(one_feedchunk_ago);
(feed_since, replies_since)
feed_since.max(one_feedchunk_ago)
};
let enable_reactions = GLOBALS.settings.read().await.reactions;
@ -389,53 +387,24 @@ impl Minion {
}
// feed related by me
// FIXME copy this to listening to my write relays
let pkh: PublicKeyHex = pubkey.into();
filters.push(Filter {
authors: vec![pkh.clone().into()],
authors: vec![pkh.into()],
kinds,
since: Some(feed_since),
..Default::default()
});
// Any mentions of me
// (but not in peoples contact lists, for example)
let mut kinds = vec![EventKind::TextNote];
if enable_reactions {
kinds.push(EventKind::Reaction);
}
if enable_reposts {
kinds.push(EventKind::Repost);
}
filters.push(Filter {
p: vec![pkh.clone()],
kinds,
since: Some(replies_since),
..Default::default()
});
// Listen for my metadata and similar kinds of posts
filters.push(Filter {
authors: vec![pkh.into()],
kinds: vec![
EventKind::Metadata,
EventKind::RecommendRelay,
EventKind::ContactList,
EventKind::RelaysListNip23,
],
since: Some(replies_since),
..Default::default()
});
}
if !followed_pubkeys.is_empty() {
let mut kinds = vec![
EventKind::TextNote,
EventKind::Repost,
EventKind::EventDeletion,
];
let mut kinds = vec![EventKind::TextNote, EventKind::EventDeletion];
if enable_reactions {
kinds.push(EventKind::Reaction);
}
if enable_reposts {
kinds.push(EventKind::Repost);
}
let pkp: Vec<PublicKeyHexPrefix> = followed_pubkeys
.iter()
@ -450,41 +419,35 @@ impl Minion {
..Default::default()
});
// Try to find where people post.
// Subscribe to kind-10002 `RelayList`s to see where people post.
// Subscribe to ContactLists so we can look at the contents and
// divine relays people write to (if using a client that does that).
//
// BUT ONLY for people whose contact list has not been received in the last
// 24 hours.
let contact_list_keys: Vec<PublicKeyHexPrefix> = GLOBALS
// divine relays people write to (if using a client that does that).
// BUT ONLY for people where this kind of data hasn't been received
// in the last 8 hours (so we don't do it every client restart).
let keys_needing_relay_lists: Vec<PublicKeyHexPrefix> = GLOBALS
.people
.get_followed_pubkeys_needing_contact_lists(&followed_pubkeys)
.get_followed_pubkeys_needing_relay_lists(&followed_pubkeys)
.drain(..)
.map(|pk| pk.into())
.collect();
if !contact_list_keys.is_empty() {
if !keys_needing_relay_lists.is_empty() {
tracing::debug!(
"Need contact lists from {} people on {}",
contact_list_keys.len(),
"Looking to update relay lists from {} people on {}",
keys_needing_relay_lists.len(),
&self.url
);
// TBD: EventKind::RelaysList from nip23
filters.push(Filter {
authors: contact_list_keys,
kinds: vec![EventKind::ContactList],
authors: keys_needing_relay_lists,
kinds: vec![EventKind::RelayList, EventKind::ContactList],
// No since. These are replaceable events, we should only get 1 per person.
..Default::default()
});
}
}
// reactions to posts by me
// FIXME TBD
// reactions to posts by people followed
// FIXME TBD
// NO REPLIES OR ANCESTORS
if filters.is_empty() {
@ -508,6 +471,97 @@ impl Minion {
Ok(())
}
// Subscribe to anybody mentioning the user on the relays the user reads from
// (and any other relay for the time being until nip65 is in widespread use)
async fn subscribe_mentions(&mut self) -> Result<(), Error> {
let mut filters: Vec<Filter> = Vec::new();
let (overlap, replies_chunk) = {
let settings = GLOBALS.settings.read().await.clone();
(
Duration::from_secs(settings.overlap),
Duration::from_secs(settings.replies_chunk),
)
};
// Compute how far to look back
let replies_since = {
// Start with where we left off, the time we last got something from
// this relay.
let mut replies_since: Unixtime = match self.dbrelay.last_general_eose_at {
Some(u) => Unixtime(u as i64),
None => Unixtime(0),
};
// Subtract overlap to avoid gaps due to clock sync and event
// propagation delay
replies_since = replies_since - overlap;
// Some relays don't like dates before 1970. Hell, we don't need anything before 2020:
if replies_since.0 < 1577836800 {
replies_since.0 = 1577836800;
}
let one_replieschunk_ago = Unixtime::now().unwrap() - replies_chunk;
replies_since.max(one_replieschunk_ago)
};
let enable_reactions = GLOBALS.settings.read().await.reactions;
let enable_reposts = GLOBALS.settings.read().await.reposts;
if let Some(pubkey) = GLOBALS.signer.public_key() {
// Any mentions of me
// (but not in peoples contact lists, for example)
let mut kinds = vec![EventKind::TextNote];
if enable_reactions {
kinds.push(EventKind::Reaction);
}
if enable_reposts {
kinds.push(EventKind::Repost);
}
let pkh: PublicKeyHex = pubkey.into();
filters.push(Filter {
p: vec![pkh.clone()],
kinds,
since: Some(replies_since),
..Default::default()
});
// Listen for my metadata and similar kinds of posts
// FIXME - move this to listening to my WRITE relays
filters.push(Filter {
authors: vec![pkh.into()],
kinds: vec![
EventKind::Metadata,
EventKind::RecommendRelay,
EventKind::ContactList,
EventKind::RelayList,
],
since: Some(replies_since),
..Default::default()
});
}
self.subscribe(filters, "mentions_feed").await?;
if let Some(sub) = self.subscriptions.get_mut("mentions_feed") {
if let Some(nip11) = &self.nip11 {
if !nip11.supports_nip(15) {
// Does not support EOSE. Set subscription to EOSE now.
sub.set_eose();
}
} else {
// Does not support EOSE. Set subscription to EOSE now.
sub.set_eose();
}
}
Ok(())
}
// Subscribe to the posts a person generates on the relays they write to
async fn subscribe_person_feed(&mut self, pubkey: PublicKeyHex) -> Result<(), Error> {
// NOTE we do not unsubscribe to the general feed

View File

@ -196,6 +196,19 @@ impl Overlord {
tracing::info!("Loaded {} feed related events from the database", count);
}
// Load relay lists from the database and process
{
let events: Vec<Event> = DbEvent::fetch_relay_lists().await?;
// Process these events
let mut count = 0;
for event in events.iter() {
count += 1;
crate::process::process_new_event(event, false, None, None).await?;
}
tracing::info!("Loaded {} relay list events from the database", count);
}
// Pick Relays and start Minions
if !GLOBALS.settings.read().await.offline {
// Create a new RelayPicker
@ -205,6 +218,33 @@ impl Overlord {
self.pick_relays().await;
}
// For NIP-65, separately subscribe to our mentions on our read relays
let read_relay_urls: Vec<RelayUrl> = GLOBALS
.relays
.read()
.await
.iter()
.filter_map(|(url, dbrelay)| {
if dbrelay.read {
Some(url.clone())
} else {
None
}
})
.collect();
for relay_url in read_relay_urls.iter() {
// Start a minion for this relay if there is none
if !GLOBALS.relays_watching.read().await.contains(relay_url) {
self.start_minion(relay_url.clone()).await?;
}
// Subscribe to our mentions
let _ = self.to_minions.send(ToMinionMessage {
target: relay_url.to_string(),
payload: ToMinionPayload::SubscribeMentions,
});
}
'mainloop: loop {
match self.loop_handler().await {
Ok(keepgoing) => {
@ -251,6 +291,13 @@ impl Overlord {
),
});
// Until NIP-65 is in widespread use, we should listen for mentions
// of us on all these relays too
let _ = self.to_minions.send(ToMinionMessage {
target: relay_assignment.relay.url.0.clone(),
payload: ToMinionPayload::SubscribeMentions,
});
tracing::info!(
"Picked relay {} covering {} people.",
&relay_assignment.relay.url,
@ -441,6 +488,9 @@ impl Overlord {
let dbrelay = DbRelay::new(relay_str);
DbRelay::insert(dbrelay).await?;
}
ToOverlordMessage::AdvertiseRelayList => {
self.advertise_relay_list().await?;
}
ToOverlordMessage::DeletePub => {
GLOBALS.signer.clear_public_key();
GLOBALS.signer.save_through_settings().await?;
@ -565,11 +615,18 @@ impl Overlord {
GLOBALS.settings.read().await.save().await?;
tracing::debug!("Settings saved.");
}
ToOverlordMessage::SetRelayPost(relay_url, post) => {
ToOverlordMessage::SetRelayReadWrite(relay_url, read, write) => {
if let Some(relay) = GLOBALS.relays.write().await.get_mut(&relay_url) {
relay.post = post;
relay.read = read;
relay.write = write;
}
DbRelay::update_post(relay_url, post).await?;
DbRelay::update_read_and_write(relay_url, read, write).await?;
}
ToOverlordMessage::SetRelayAdvertise(relay_url, advertise) => {
if let Some(relay) = GLOBALS.relays.write().await.get_mut(&relay_url) {
relay.advertise = advertise;
}
DbRelay::update_advertise(relay_url, advertise).await?;
}
ToOverlordMessage::SetThreadFeed(id, referenced_by, previous_thread_parent) => {
self.set_thread_feed(id, referenced_by, previous_thread_parent)
@ -639,16 +696,20 @@ impl Overlord {
let db_relay = DbRelay::new(relay.clone());
DbRelay::insert(db_relay).await?;
let now = Unixtime::now().unwrap().0 as u64;
// Save person_relay
DbPersonRelay::insert(DbPersonRelay {
person: pkhex.to_string(),
relay,
last_fetched: None,
last_suggested_kind2: None,
last_suggested_kind3: None,
last_suggested_kind3: Some(now), // consider it our claim in our contact list
last_suggested_nip23: None,
last_suggested_nip05: None,
last_suggested_bytag: None,
read: true,
write: true,
})
.await?;
@ -710,7 +771,7 @@ impl Overlord {
.read()
.await
.iter()
.filter_map(|(_, r)| if r.post { Some(r.to_owned()) } else { None })
.filter_map(|(_, r)| if r.write { Some(r.to_owned()) } else { None })
.collect();
for relay in relays {
@ -734,6 +795,88 @@ impl Overlord {
Ok(())
}
async fn advertise_relay_list(&mut self) -> Result<(), Error> {
let public_key = match GLOBALS.signer.public_key() {
Some(pk) => pk,
None => {
tracing::warn!("No public key! Not posting");
return Ok(());
}
};
let read_or_write_relays: Vec<DbRelay> = GLOBALS
.relays
.read()
.await
.iter()
.filter_map(|(_url, r)| {
if r.read || r.write {
Some(r.to_owned())
} else {
None
}
})
.collect();
let mut tags: Vec<Tag> = Vec::new();
for relay in read_or_write_relays.iter() {
tags.push(Tag::Reference {
url: relay.url.to_unchecked_url(),
marker: if relay.read && relay.write {
None
} else if relay.read {
Some("read".to_owned())
} else if relay.write {
Some("write".to_owned())
} else {
unreachable!()
},
});
}
let pre_event = PreEvent {
pubkey: public_key,
created_at: Unixtime::now().unwrap(),
kind: EventKind::RelayList,
tags,
content: "".to_string(),
ots: None,
};
let event = GLOBALS.signer.sign_preevent(pre_event, None)?;
let advertise_to_relay_urls: Vec<RelayUrl> = GLOBALS
.relays
.read()
.await
.iter()
.filter_map(|(url, r)| {
if r.advertise {
Some(url.to_owned())
} else {
None
}
})
.collect();
for relay_url in advertise_to_relay_urls {
// Start a minion for it, if there is none
if !GLOBALS.relays_watching.read().await.contains(&relay_url) {
self.start_minion(relay_url.clone()).await?;
}
// Send it the event to post
tracing::debug!("Asking {} to post", &relay_url);
let _ = self.to_minions.send(ToMinionMessage {
target: relay_url.0.clone(),
payload: ToMinionPayload::PostEvent(Box::new(event.clone())),
});
}
Ok(())
}
async fn post_reply(
&mut self,
mut content: String,
@ -838,7 +981,7 @@ impl Overlord {
.read()
.await
.iter()
.filter_map(|(_, r)| if r.post { Some(r.to_owned()) } else { None })
.filter_map(|(_, r)| if r.write { Some(r.to_owned()) } else { None })
.collect();
for relay in relays {
@ -913,7 +1056,7 @@ impl Overlord {
.read()
.await
.iter()
.filter_map(|(_, r)| if r.post { Some(r.to_owned()) } else { None })
.filter_map(|(_, r)| if r.write { Some(r.to_owned()) } else { None })
.collect();
for relay in relays {
@ -948,7 +1091,7 @@ impl Overlord {
.read()
.await
.iter()
.filter_map(|(_, r)| if r.post { Some(r.to_owned()) } else { None })
.filter_map(|(_, r)| if r.write { Some(r.to_owned()) } else { None })
.collect();
for relay in relays {
@ -981,7 +1124,7 @@ impl Overlord {
.read()
.await
.iter()
.filter_map(|(_, r)| if r.post { Some(r.to_owned()) } else { None })
.filter_map(|(_, r)| if r.write { Some(r.to_owned()) } else { None })
.collect();
for relay in relays {
@ -1025,7 +1168,7 @@ impl Overlord {
.read()
.await
.iter()
.filter_map(|(_, r)| if r.post { Some(r.to_owned()) } else { None })
.filter_map(|(_, r)| if r.write { Some(r.to_owned()) } else { None })
.collect();
for relay in relays {

View File

@ -24,7 +24,8 @@ pub struct DbPerson {
pub followed: u8,
pub followed_last_updated: i64,
pub muted: u8,
pub contact_list_last_received: i64,
pub relay_list_last_received: i64,
pub relay_list_created_at: i64,
}
impl DbPerson {
@ -38,7 +39,8 @@ impl DbPerson {
followed: 0,
followed_last_updated: 0,
muted: 0,
contact_list_last_received: 0,
relay_list_last_received: 0,
relay_list_created_at: 0,
}
}
@ -90,7 +92,7 @@ pub struct People {
// the person's NIP-05 when that metadata come in. We remember this here.
recheck_nip05: DashSet<PublicKeyHex>,
// Date of the last self-owend contact list we processed
// Date of the last self-owned contact list we processed
pub last_contact_list_asof: AtomicI64,
}
@ -118,15 +120,16 @@ impl People {
output
}
pub fn get_followed_pubkeys_needing_contact_lists(
pub fn get_followed_pubkeys_needing_relay_lists(
&self,
among_these: &[PublicKeyHex],
) -> Vec<PublicKeyHex> {
let one_day_ago = Unixtime::now().unwrap().0 - (60 * 60 * 24);
// FIXME make this a setting (8 hours)
let one_day_ago = Unixtime::now().unwrap().0 - (60 * 60 * 8);
let mut output: Vec<PublicKeyHex> = Vec::new();
for person in self.people.iter().filter_map(|p| {
if p.followed == 1
&& p.contact_list_last_received < one_day_ago
&& p.relay_list_last_received < one_day_ago
&& among_these.contains(&p.pubkey)
{
Some(p)
@ -306,7 +309,8 @@ impl People {
// who are muted, so they can be found and unmuted as necessary.
let sql = "SELECT pubkey, metadata, metadata_at, nip05_valid, nip05_last_checked, \
followed, followed_last_updated, muted, contact_list_last_received \
followed, followed_last_updated, muted, relay_list_last_received, \
relay_list_created_at \
FROM person WHERE followed=1 OR muted=1"
.to_owned();
@ -333,7 +337,8 @@ impl People {
followed: row.get(5)?,
followed_last_updated: row.get(6)?,
muted: row.get(7)?,
contact_list_last_received: row.get(8)?,
relay_list_last_received: row.get(8)?,
relay_list_created_at: row.get(9)?,
});
}
Ok(output)
@ -814,25 +819,42 @@ impl People {
Ok(())
}
pub async fn update_contact_list_last_received(
// Returns true if the date passed in is newer than what we already had
pub async fn update_relay_list_stamps(
&self,
pubkeyhex: PublicKeyHex,
) -> Result<(), Error> {
mut created_at: i64,
) -> Result<bool, Error> {
let now = Unixtime::now().unwrap().0;
let mut retval = false;
if let Some(mut person) = self.people.get_mut(&pubkeyhex) {
person.contact_list_last_received = now;
person.relay_list_last_received = now;
if created_at > person.relay_list_created_at {
retval = true;
person.relay_list_created_at = created_at;
} else {
created_at = person.relay_list_created_at; // for the update below
}
} else {
tracing::warn!("FIXME: RelayList for person we don't have. We should create them.");
return Ok(false);
}
task::spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt =
db.prepare("UPDATE person SET contact_list_last_received=? WHERE pubkey=?")?;
stmt.execute((&now, pubkeyhex.as_str()))?;
Ok(())
let mut stmt = db.prepare(
"UPDATE person SET relay_list_last_received=?, \
relay_list_created_at=? WHERE pubkey=?",
)?;
stmt.execute((&now, &created_at, pubkeyhex.as_str()))?;
Ok::<(), Error>(())
})
.await?
.await??;
Ok(retval)
}
pub async fn update_nip05_last_checked(&self, pubkeyhex: PublicKeyHex) -> Result<(), Error> {
@ -910,7 +932,8 @@ impl People {
let sql = "SELECT pubkey, metadata, metadata_at, \
nip05_valid, nip05_last_checked, \
followed, followed_last_updated, muted, \
contact_list_last_received FROM person"
relay_list_last_received, relay_list_created_at \
FROM person"
.to_owned();
let sql = match criteria {
None => sql,
@ -940,7 +963,8 @@ impl People {
followed: row.get(5)?,
followed_last_updated: row.get(6)?,
muted: row.get(7)?,
contact_list_last_received: row.get(8)?,
relay_list_last_received: row.get(8)?,
relay_list_created_at: row.get(9)?,
});
}
Ok(output)
@ -963,8 +987,8 @@ impl People {
async fn fetch_many(pubkeys: &[&PublicKeyHex]) -> Result<Vec<DbPerson>, Error> {
let sql = format!(
"SELECT pubkey, metadata, metadata_at, nip05_valid, nip05_last_checked, \
followed, followed_last_updated, muted, contact_list_last_received \
FROM person WHERE pubkey IN ({})",
followed, followed_last_updated, muted, relay_list_last_received, \
relay_list_created_at FROM person WHERE pubkey IN ({})",
repeat_vars(pubkeys.len())
);
@ -1000,7 +1024,8 @@ impl People {
followed: row.get(5)?,
followed_last_updated: row.get(6)?,
muted: row.get(7)?,
contact_list_last_received: row.get(8)?,
relay_list_last_received: row.get(8)?,
relay_list_created_at: row.get(9)?,
});
}

View File

@ -213,11 +213,6 @@ pub async fn process_new_event(
}
if event.kind == EventKind::ContactList {
GLOBALS
.people
.update_contact_list_last_received(event.pubkey.into())
.await?;
if let Some(pubkey) = GLOBALS.signer.public_key() {
if event.pubkey == pubkey {
process_your_contact_list(event).await?;
@ -229,6 +224,10 @@ pub async fn process_new_event(
}
}
if event.kind == EventKind::RelayList {
process_relay_list(event).await?;
}
// TBD (have to parse runes language for this)
//if event.kind == EventKind::RelayList {
// process_somebody_elses_relay_list(event.pubkey.clone(), &event.contents).await?;
@ -239,6 +238,42 @@ pub async fn process_new_event(
Ok(())
}
async fn process_relay_list(event: &Event) -> Result<(), Error> {
// Update that we received the relay list (and optionally bump forward the date
// if this relay list happens to be newer)
let newer = GLOBALS
.people
.update_relay_list_stamps(event.pubkey.into(), event.created_at.0)
.await?;
if !newer {
return Ok(());
}
let mut read_relays: Vec<RelayUrl> = Vec::new();
let mut write_relays: Vec<RelayUrl> = Vec::new();
for tag in event.tags.iter() {
if let Tag::Reference { url, marker } = tag {
if let Ok(relay_url) = RelayUrl::try_from_unchecked_url(url) {
if let Some(m) = marker {
match &*m.trim().to_lowercase() {
"read" => read_relays.push(relay_url.clone()),
"write" => write_relays.push(relay_url.clone()),
_ => {} // ignore unknown marker
}
} else {
read_relays.push(relay_url.clone());
write_relays.push(relay_url.clone());
}
}
}
}
DbPersonRelay::set_relay_list(event.pubkey.into(), read_relays, write_relays).await?;
Ok(())
}
async fn process_somebody_elses_contact_list(
pubkey: PublicKey,
event: &Event,
@ -253,6 +288,13 @@ async fn process_somebody_elses_contact_list(
// person_relay.last_suggested_kind3 is updated based on the p-tag, not the contents,
// of kind3.
// This counts as a relay list for now, but never clobbers the actual relay list
// relay_list_created_at field. So would nip23 events.
let _ = GLOBALS
.people
.update_relay_list_stamps(pubkey.into(), 0)
.await?;
for (url, simple_relay_usage) in srl.0.iter() {
// Only if they write there (we don't care where they read from)
if simple_relay_usage.write {

View File

@ -23,7 +23,7 @@ pub(super) fn posting_area(
}
ui.label(" to post.");
});
} else if !GLOBALS.relays.blocking_read().iter().any(|(_, r)| r.post) {
} else if !GLOBALS.relays.blocking_read().iter().any(|(_, r)| r.write) {
ui.horizontal_wrapped(|ui| {
ui.label("You need to ");
if ui.link("choose relays").clicked() {

View File

@ -31,6 +31,12 @@ pub(super) fn update(app: &mut GossipUi, _ctx: &Context, _frame: &mut eframe::Fr
"That's not a valid relay URL.".to_owned();
}
}
ui.separator();
if ui.button("↑ Advertise Relay List ↑").clicked() {
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::AdvertiseRelayList);
}
});
ui.add_space(10.0);
@ -40,7 +46,7 @@ pub(super) fn update(app: &mut GossipUi, _ctx: &Context, _frame: &mut eframe::Fr
// TBD time how long this takes. We don't want expensive code in the UI
let mut relays = GLOBALS.relays.blocking_read().clone();
let mut relays: Vec<DbRelay> = relays.drain().map(|(_, relay)| relay).collect();
relays.sort_by(|a, b| b.post.cmp(&a.post).then(a.url.cmp(&b.url)));
relays.sort_by(|a, b| b.write.cmp(&a.write).then(a.url.cmp(&b.url)));
ui.with_layout(Layout::bottom_up(Align::Center), |ui| {
ui.add_space(18.0);
@ -62,6 +68,8 @@ fn relay_table(ui: &mut Ui, relays: &mut [DbRelay], id: &'static str) {
.column(Column::auto().resizable(true))
.column(Column::auto().resizable(true))
.column(Column::auto().resizable(true))
.column(Column::auto().resizable(true))
.column(Column::auto().resizable(true))
.column(Column::remainder())
.header(20.0, |mut header| {
header.col(|ui| {
@ -81,11 +89,20 @@ fn relay_table(ui: &mut Ui, relays: &mut [DbRelay], id: &'static str) {
.on_hover_text("This only counts events served after EOSE, as they mark where we can pick up from next time.");
});
header.col(|ui| {
ui.heading("Write");
ui.heading("Read")
.on_hover_text("Read for events with mentions of you on these relays. It is recommended to have a few." );
});
header.col(|ui| {
ui.heading("Write")
.on_hover_text("Write your events to these relays. It is recommended to have a few." );
});
header.col(|ui| {
ui.heading("Advertise")
.on_hover_text("Advertise your read/write settings to this relay. It is recommended to advertise to many relays so that you can be found.");
});
header.col(|ui| {
ui.heading("Read rank")
.on_hover_text("0-9: 0 disables, 3 is default, 9 is highest rank".to_string());
.on_hover_text("How likely we will connect to relays to read other people's posts, from 0 (never) to 9 (highly). Default is 3.".to_string());
});
}).body(|body| {
body.rows(24.0, relays.len(), |row_index, mut row| {
@ -112,14 +129,36 @@ fn relay_table(ui: &mut Ui, relays: &mut [DbRelay], id: &'static str) {
}
});
row.col(|ui| {
let mut post = relay.post; // checkbox needs a mutable state variable.
if ui.checkbox(&mut post, "")
.on_hover_text("If selected, posts you create will be sent to this relay. But you have to press [SAVE CHANGES] at the bottom of this page.")
let mut read = relay.read; // checkbox needs a mutable state variable.
if ui.checkbox(&mut read, "")
.on_hover_text("If selected, we will search for posts mentioning you on this relay.")
.clicked()
{
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::SetRelayPost(relay.url.clone(), post));
.send(ToOverlordMessage::SetRelayReadWrite(relay.url.clone(), read, relay.write));
}
});
row.col(|ui| {
let mut write = relay.write; // checkbox needs a mutable state variable.
if ui.checkbox(&mut write, "")
.on_hover_text("If selected, posts you create will be sent to this relay.")
.clicked()
{
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::SetRelayReadWrite(relay.url.clone(), relay.read, write));
}
});
row.col(|ui| {
let mut advertise = relay.advertise; // checkbox needs a mutable state variable.
if ui.checkbox(&mut advertise, "")
.on_hover_text("If selected, when you send out your relay list advertisements, one of them will go to this relay.")
.clicked()
{
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::SetRelayAdvertise(relay.url.clone(), advertise));
}
});
row.col(|ui| {