mirror of
https://github.com/mikedilger/gossip.git
synced 2024-09-19 19:46:50 +00:00
Strip out desired events handling
This commit is contained in:
parent
c581d45855
commit
9336ece7db
@ -9,7 +9,6 @@ pub enum ToOverlordMessage {
|
|||||||
FollowHex(String, String),
|
FollowHex(String, String),
|
||||||
FollowNip05(String),
|
FollowNip05(String),
|
||||||
GeneratePrivateKey(String),
|
GeneratePrivateKey(String),
|
||||||
GetMissingEvents,
|
|
||||||
ImportPriv(String, String),
|
ImportPriv(String, String),
|
||||||
ImportPub(String),
|
ImportPub(String),
|
||||||
Like(Id, PublicKey),
|
Like(Id, PublicKey),
|
||||||
|
@ -51,6 +51,7 @@ impl DbEvent {
|
|||||||
output
|
output
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
pub async fn fetch_by_ids(ids: Vec<IdHex>) -> Result<Vec<DbEvent>, Error> {
|
pub async fn fetch_by_ids(ids: Vec<IdHex>) -> Result<Vec<DbEvent>, Error> {
|
||||||
if ids.is_empty() {
|
if ids.is_empty() {
|
||||||
return Ok(vec![]);
|
return Ok(vec![]);
|
||||||
@ -183,6 +184,7 @@ impl DbEvent {
|
|||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
fn repeat_vars(count: usize) -> String {
|
fn repeat_vars(count: usize) -> String {
|
||||||
assert_ne!(count, 0);
|
assert_ne!(count, 0);
|
||||||
let mut s = "?,".repeat(count);
|
let mut s = "?,".repeat(count);
|
||||||
|
104
src/globals.rs
104
src/globals.rs
@ -1,6 +1,5 @@
|
|||||||
use crate::comms::{ToMinionMessage, ToOverlordMessage};
|
use crate::comms::{ToMinionMessage, ToOverlordMessage};
|
||||||
use crate::db::{DbEvent, DbRelay};
|
use crate::db::DbRelay;
|
||||||
use crate::error::Error;
|
|
||||||
use crate::feed::Feed;
|
use crate::feed::Feed;
|
||||||
use crate::fetcher::Fetcher;
|
use crate::fetcher::Fetcher;
|
||||||
use crate::people::People;
|
use crate::people::People;
|
||||||
@ -8,7 +7,7 @@ use crate::relationship::Relationship;
|
|||||||
use crate::settings::Settings;
|
use crate::settings::Settings;
|
||||||
use crate::signer::Signer;
|
use crate::signer::Signer;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use nostr_types::{Event, Id, IdHex, PublicKeyHex, Url};
|
use nostr_types::{Event, Id, PublicKeyHex, Url};
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
@ -42,10 +41,6 @@ pub struct Globals {
|
|||||||
/// All relationships between events
|
/// All relationships between events
|
||||||
pub relationships: RwLock<HashMap<Id, Vec<(Id, Relationship)>>>,
|
pub relationships: RwLock<HashMap<Id, Vec<(Id, Relationship)>>>,
|
||||||
|
|
||||||
/// Desired events, referred to by others, with possible URLs where we can
|
|
||||||
/// get them. We may already have these, but if not we should ask for them.
|
|
||||||
pub desired_events: RwLock<HashMap<Id, Vec<Url>>>,
|
|
||||||
|
|
||||||
/// All nostr people records currently loaded into memory, keyed by pubkey
|
/// All nostr people records currently loaded into memory, keyed by pubkey
|
||||||
pub people: People,
|
pub people: People,
|
||||||
|
|
||||||
@ -103,7 +98,6 @@ lazy_static! {
|
|||||||
events: DashMap::new(),
|
events: DashMap::new(),
|
||||||
incoming_events: RwLock::new(Vec::new()),
|
incoming_events: RwLock::new(Vec::new()),
|
||||||
relationships: RwLock::new(HashMap::new()),
|
relationships: RwLock::new(HashMap::new()),
|
||||||
desired_events: RwLock::new(HashMap::new()),
|
|
||||||
people: People::new(),
|
people: People::new(),
|
||||||
relays: RwLock::new(HashMap::new()),
|
relays: RwLock::new(HashMap::new()),
|
||||||
relays_watching: RwLock::new(Vec::new()),
|
relays_watching: RwLock::new(Vec::new()),
|
||||||
@ -122,100 +116,6 @@ lazy_static! {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Globals {
|
impl Globals {
|
||||||
pub async fn store_desired_event(id: Id, url: Option<Url>) {
|
|
||||||
let mut desired_events = GLOBALS.desired_events.write().await;
|
|
||||||
desired_events
|
|
||||||
.entry(id)
|
|
||||||
.and_modify(|urls| {
|
|
||||||
if let Some(ref u) = url {
|
|
||||||
let n = Url::new(u);
|
|
||||||
if n.is_valid_relay_url() {
|
|
||||||
urls.push(n);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.or_insert_with(|| if let Some(u) = url { vec![u] } else { vec![] });
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn trim_desired_events_sync() {
|
|
||||||
let mut desired_events = GLOBALS.desired_events.blocking_write();
|
|
||||||
desired_events.retain(|&id, _| !GLOBALS.events.contains_key(&id));
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn trim_desired_events() {
|
|
||||||
let mut desired_events = GLOBALS.desired_events.write().await;
|
|
||||||
desired_events.retain(|&id, _| !GLOBALS.events.contains_key(&id));
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_local_desired_events() -> Result<(), Error> {
|
|
||||||
Self::trim_desired_events().await;
|
|
||||||
|
|
||||||
// Load from database
|
|
||||||
{
|
|
||||||
let ids: Vec<IdHex> = GLOBALS
|
|
||||||
.desired_events
|
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.iter()
|
|
||||||
.map(|(id, _)| Into::<IdHex>::into(*id))
|
|
||||||
.collect();
|
|
||||||
let db_events = DbEvent::fetch_by_ids(ids).await?;
|
|
||||||
let mut events: Vec<Event> = Vec::with_capacity(db_events.len());
|
|
||||||
for dbevent in db_events.iter() {
|
|
||||||
let e = serde_json::from_str(&dbevent.raw)?;
|
|
||||||
events.push(e);
|
|
||||||
}
|
|
||||||
let mut count = 0;
|
|
||||||
for event in events.iter() {
|
|
||||||
count += 1;
|
|
||||||
crate::process::process_new_event(event, false, None, None).await?;
|
|
||||||
}
|
|
||||||
tracing::info!("Loaded {} desired events from the database", count);
|
|
||||||
}
|
|
||||||
|
|
||||||
Self::trim_desired_events().await; // again
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_desired_events() -> Result<(HashMap<Url, Vec<Id>>, Vec<Id>), Error> {
|
|
||||||
Globals::get_local_desired_events().await?;
|
|
||||||
|
|
||||||
let desired_events = GLOBALS.desired_events.read().await;
|
|
||||||
let mut output: HashMap<Url, Vec<Id>> = HashMap::new();
|
|
||||||
let mut orphans: Vec<Id> = Vec::new();
|
|
||||||
for (id, vec_url) in desired_events.iter() {
|
|
||||||
if vec_url.is_empty() {
|
|
||||||
orphans.push(*id);
|
|
||||||
} else {
|
|
||||||
for url in vec_url.iter() {
|
|
||||||
output
|
|
||||||
.entry(url.to_owned())
|
|
||||||
.and_modify(|vec| vec.push(*id))
|
|
||||||
.or_insert_with(|| vec![*id]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((output, orphans))
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
pub async fn get_desired_events_for_url(url: Url) -> Result<Vec<Id>, Error> {
|
|
||||||
Globals::get_desired_events_prelude().await?;
|
|
||||||
|
|
||||||
let desired_events = GLOBALS.desired_events.read().await;
|
|
||||||
let mut output: Vec<Id> = Vec::new();
|
|
||||||
for (id, vec_url) in desired_events.iter() {
|
|
||||||
if vec_url.is_empty() || vec_url.contains(&url) {
|
|
||||||
output.push(*id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(output)
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
pub async fn add_relationship(id: Id, related: Id, relationship: Relationship) {
|
pub async fn add_relationship(id: Id, related: Id, relationship: Relationship) {
|
||||||
let r = (related, relationship);
|
let r = (related, relationship);
|
||||||
let mut relationships = GLOBALS.relationships.write().await;
|
let mut relationships = GLOBALS.relationships.write().await;
|
||||||
|
@ -4,11 +4,11 @@ mod relay_picker;
|
|||||||
use crate::comms::{ToMinionMessage, ToMinionPayload, ToOverlordMessage};
|
use crate::comms::{ToMinionMessage, ToMinionPayload, ToOverlordMessage};
|
||||||
use crate::db::{DbEvent, DbPersonRelay, DbRelay};
|
use crate::db::{DbEvent, DbPersonRelay, DbRelay};
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::globals::{Globals, GLOBALS};
|
use crate::globals::GLOBALS;
|
||||||
use crate::people::People;
|
use crate::people::People;
|
||||||
use minion::Minion;
|
use minion::Minion;
|
||||||
use nostr_types::{
|
use nostr_types::{
|
||||||
Event, EventKind, Id, IdHex, PreEvent, PrivateKey, PublicKey, PublicKeyHex, Tag, Unixtime, Url,
|
Event, EventKind, Id, PreEvent, PrivateKey, PublicKey, PublicKeyHex, Tag, Unixtime, Url,
|
||||||
};
|
};
|
||||||
use relay_picker::{BestRelay, RelayPicker};
|
use relay_picker::{BestRelay, RelayPicker};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@ -401,9 +401,6 @@ impl Overlord {
|
|||||||
password.zeroize();
|
password.zeroize();
|
||||||
GLOBALS.signer.read().await.save_through_settings().await?;
|
GLOBALS.signer.read().await.save_through_settings().await?;
|
||||||
}
|
}
|
||||||
ToOverlordMessage::GetMissingEvents => {
|
|
||||||
self.get_missing_events().await?;
|
|
||||||
}
|
|
||||||
ToOverlordMessage::ImportPriv(mut import_priv, mut password) => {
|
ToOverlordMessage::ImportPriv(mut import_priv, mut password) => {
|
||||||
let maybe_pk1 = PrivateKey::try_from_bech32_string(&import_priv);
|
let maybe_pk1 = PrivateKey::try_from_bech32_string(&import_priv);
|
||||||
let maybe_pk2 = PrivateKey::try_from_hex_string(&import_priv);
|
let maybe_pk2 = PrivateKey::try_from_hex_string(&import_priv);
|
||||||
@ -534,54 +531,6 @@ impl Overlord {
|
|||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_missing_events(&mut self) -> Result<(), Error> {
|
|
||||||
let (desired_events_map, orphans): (HashMap<Url, Vec<Id>>, Vec<Id>) =
|
|
||||||
Globals::get_desired_events().await?;
|
|
||||||
|
|
||||||
let desired_count = GLOBALS.desired_events.read().await.len();
|
|
||||||
|
|
||||||
if desired_count == 0 {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
tracing::info!("Seeking {} events", desired_count);
|
|
||||||
|
|
||||||
let urls: Vec<Url> = desired_events_map
|
|
||||||
.keys()
|
|
||||||
.map(|u| u.to_owned())
|
|
||||||
.filter(|u| u.is_valid_relay_url())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
for url in urls.iter() {
|
|
||||||
// Get all the ones slated for this relay
|
|
||||||
let mut ids = desired_events_map.get(url).cloned().unwrap_or_default();
|
|
||||||
|
|
||||||
// Add the orphans
|
|
||||||
ids.extend(&orphans);
|
|
||||||
|
|
||||||
if ids.is_empty() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we don't have such a minion, start one
|
|
||||||
if !GLOBALS.relays_watching.read().await.contains(url) {
|
|
||||||
// Start a minion
|
|
||||||
self.start_minion(url.inner().to_owned()).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
tracing::debug!("{}: Asking to fetch {} events", url.inner(), ids.len());
|
|
||||||
|
|
||||||
let ids: Vec<IdHex> = ids.iter().map(|id| (*id).into()).collect();
|
|
||||||
// Tell it to get these events
|
|
||||||
let _ = self.to_minions.send(ToMinionMessage {
|
|
||||||
target: url.inner().to_owned(),
|
|
||||||
payload: ToMinionPayload::FetchEvents(ids),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn follow_bech32(bech32: String, relay: String) -> Result<(), Error> {
|
async fn follow_bech32(bech32: String, relay: String) -> Result<(), Error> {
|
||||||
let pk = PublicKey::try_from_bech32_string(&bech32)?;
|
let pk = PublicKey::try_from_bech32_string(&bech32)?;
|
||||||
let pkhex: PublicKeyHex = pk.into();
|
let pkhex: PublicKeyHex = pk.into();
|
||||||
|
@ -140,16 +140,8 @@ pub async fn process_new_event(
|
|||||||
Globals::add_relationship(id, event.id, Relationship::Reply).await;
|
Globals::add_relationship(id, event.id, Relationship::Reply).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// We desire all ancestors
|
|
||||||
for (id, maybe_url) in event.replies_to_ancestors() {
|
|
||||||
// Insert desired event if relevant
|
|
||||||
if !GLOBALS.events.contains_key(&id) {
|
|
||||||
Globals::store_desired_event(id, maybe_url).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// reacts to
|
// reacts to
|
||||||
if let Some((id, reaction, maybe_url)) = event.reacts_to() {
|
if let Some((id, reaction, _maybe_url)) = event.reacts_to() {
|
||||||
if from_relay {
|
if from_relay {
|
||||||
let db_event_relationship = DbEventRelationship {
|
let db_event_relationship = DbEventRelationship {
|
||||||
original: event.id.as_hex_string(),
|
original: event.id.as_hex_string(),
|
||||||
@ -160,11 +152,6 @@ pub async fn process_new_event(
|
|||||||
db_event_relationship.insert().await?;
|
db_event_relationship.insert().await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert desired event if relevant
|
|
||||||
if !GLOBALS.events.contains_key(&id) {
|
|
||||||
Globals::store_desired_event(id, maybe_url).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insert into relationships
|
// Insert into relationships
|
||||||
Globals::add_relationship(id, event.id, Relationship::Reaction(reaction)).await;
|
Globals::add_relationship(id, event.id, Relationship::Reaction(reaction)).await;
|
||||||
}
|
}
|
||||||
|
@ -71,51 +71,6 @@ pub(super) fn update(app: &mut GossipUi, ctx: &Context, frame: &mut eframe::Fram
|
|||||||
|
|
||||||
ui.separator();
|
ui.separator();
|
||||||
|
|
||||||
// Top Buttons
|
|
||||||
Globals::trim_desired_events_sync();
|
|
||||||
let desired_count: isize = match GLOBALS.desired_events.try_read() {
|
|
||||||
Ok(v) => v.len() as isize,
|
|
||||||
Err(_) => -1,
|
|
||||||
};
|
|
||||||
/*
|
|
||||||
let incoming_count: isize = match GLOBALS.incoming_events.try_read() {
|
|
||||||
Ok(v) => v.len() as isize,
|
|
||||||
Err(_) => -1,
|
|
||||||
};
|
|
||||||
*/
|
|
||||||
ui.with_layout(Layout::right_to_left(Align::TOP), |ui| {
|
|
||||||
if ui
|
|
||||||
.button(&format!("QM {}", desired_count))
|
|
||||||
.on_hover_text("Query Relays for Missing Events")
|
|
||||||
.clicked()
|
|
||||||
{
|
|
||||||
let _ = GLOBALS
|
|
||||||
.to_overlord
|
|
||||||
.send(ToOverlordMessage::GetMissingEvents);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Hide for now, as they are processed automatically at present
|
|
||||||
if ui
|
|
||||||
.button(&format!("PQ {}", incoming_count))
|
|
||||||
.on_hover_text("Process Queue of Incoming Events")
|
|
||||||
.clicked()
|
|
||||||
{
|
|
||||||
let _ = GLOBALS.to_overlord.send(ToOverlordMessage::ProcessIncomingEvents);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
ui.label(&format!(
|
|
||||||
"RIF={}",
|
|
||||||
GLOBALS
|
|
||||||
.fetcher
|
|
||||||
.requests_in_flight
|
|
||||||
.load(std::sync::atomic::Ordering::Relaxed)
|
|
||||||
))
|
|
||||||
.on_hover_text("Requests In Flight (http, not wss)");
|
|
||||||
});
|
|
||||||
|
|
||||||
ui.separator();
|
|
||||||
|
|
||||||
match feed_kind {
|
match feed_kind {
|
||||||
FeedKind::General => {
|
FeedKind::General => {
|
||||||
let feed = GLOBALS.feed.get_general();
|
let feed = GLOBALS.feed.get_general();
|
||||||
|
Loading…
Reference in New Issue
Block a user