diff --git a/src/db/contact.rs b/src/db/contact.rs new file mode 100644 index 00000000..18d77f39 --- /dev/null +++ b/src/db/contact.rs @@ -0,0 +1,85 @@ +use crate::error::Error; +use crate::globals::GLOBALS; +use serde::{Deserialize, Serialize}; +use tokio::task::spawn_blocking; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DbContact { + pub source: String, + pub contact: String, + pub relay: Option, + pub petname: Option, +} + +impl DbContact { + #[allow(dead_code)] + pub async fn fetch(criteria: Option<&str>) -> Result, Error> { + let sql = "SELECT source, contact, relay, petname FROM contact".to_owned(); + let sql = match criteria { + None => sql, + Some(crit) => format!("{} WHERE {}", sql, crit), + }; + + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(&sql)?; + let rows = stmt.query_map([], |row| { + Ok(DbContact { + source: row.get(0)?, + contact: row.get(1)?, + relay: row.get(2)?, + petname: row.get(3)?, + }) + })?; + + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + + output + } + + #[allow(dead_code)] + pub async fn insert(contact: DbContact) -> Result<(), Error> { + let sql = "INSERT OR IGNORE INTO contact (source, contact, relay, petname) \ + VALUES (?1, ?2, ?3, ?4)"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute(( + &contact.source, + &contact.contact, + &contact.relay, + &contact.petname, + ))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM contact WHERE {}", criteria); + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } +} diff --git a/src/db/event.rs b/src/db/event.rs new file mode 100644 index 00000000..953e24f5 --- /dev/null +++ b/src/db/event.rs @@ -0,0 +1,159 @@ +use super::{DbEventSeen, DbEventTag}; +use crate::error::Error; +use crate::globals::GLOBALS; +use nostr_proto::{Event, IdHex, PublicKeyHex, Unixtime, Url}; +use serde::{Deserialize, Serialize}; +use tokio::task::spawn_blocking; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DbEvent { + pub id: IdHex, + pub raw: String, + pub pubkey: PublicKeyHex, + pub created_at: i64, + pub kind: u64, + pub content: String, + pub ots: Option, +} + +impl DbEvent { + #[allow(dead_code)] + pub async fn fetch(criteria: Option<&str>) -> Result, Error> { + let sql = "SELECT id, raw, pubkey, created_at, kind, content, ots FROM event".to_owned(); + let sql = match criteria { + None => sql, + Some(crit) => format!("{} WHERE {}", sql, crit), + }; + + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(&sql)?; + let rows = stmt.query_map([], |row| { + Ok(DbEvent { + id: IdHex(row.get(0)?), + raw: row.get(1)?, + pubkey: PublicKeyHex(row.get(2)?), + created_at: row.get(3)?, + kind: row.get(4)?, + content: row.get(5)?, + ots: row.get(6)?, + }) + })?; + + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + + output + } + + #[allow(dead_code)] + pub async fn insert(event: DbEvent) -> Result<(), Error> { + let sql = "INSERT OR IGNORE INTO event (id, raw, pubkey, created_at, kind, content, ots) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute(( + &event.id.0, + &event.raw, + &event.pubkey.0, + &event.created_at, + &event.kind, + &event.content, + &event.ots, + ))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM event WHERE {}", criteria); + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn get_author(id: IdHex) -> Result, Error> { + let sql = "SELECT pubkey FROM event WHERE id=?"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + let mut stmt = db.prepare(sql)?; + let mut rows = stmt.query_map([id.0], |row| row.get(0))?; + if let Some(row) = rows.next() { + return Ok(Some(PublicKeyHex(row?))); + } + Ok(None) + }) + .await? + } + + #[allow(dead_code)] + pub async fn save_nostr_event(event: &Event, seen_on: Option) -> Result<(), Error> { + // Convert a nostr Event into a DbEvent + let db_event = DbEvent { + id: event.id.into(), + raw: serde_json::to_string(&event)?, + pubkey: event.pubkey.into(), + created_at: event.created_at.0, + kind: event.kind.into(), + content: event.content.clone(), + ots: event.ots.clone(), + }; + + // Save into event table + DbEvent::insert(db_event).await?; + + // Save the tags into event_tag table + for (seq, tag) in event.tags.iter().enumerate() { + // convert to vec of strings + let v: Vec = serde_json::from_str(&serde_json::to_string(&tag)?)?; + + let db_event_tag = DbEventTag { + event: event.id.as_hex_string(), + seq: seq as u64, + label: v.get(0).cloned(), + field0: v.get(1).cloned(), + field1: v.get(2).cloned(), + field2: v.get(3).cloned(), + field3: v.get(4).cloned(), + }; + DbEventTag::insert(db_event_tag).await?; + } + + // Save the event into event_seen table + if let Some(url) = seen_on { + let db_event_seen = DbEventSeen { + event: event.id.as_hex_string(), + relay: url.0, + when_seen: Unixtime::now()?.0 as u64, + }; + DbEventSeen::replace(db_event_seen).await?; + } + + Ok(()) + } +} diff --git a/src/db/event_seen.rs b/src/db/event_seen.rs new file mode 100644 index 00000000..fcdd4d81 --- /dev/null +++ b/src/db/event_seen.rs @@ -0,0 +1,78 @@ +use crate::error::Error; +use crate::globals::GLOBALS; +use serde::{Deserialize, Serialize}; +use tokio::task::spawn_blocking; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DbEventSeen { + pub event: String, + pub relay: String, + pub when_seen: u64, +} + +impl DbEventSeen { + #[allow(dead_code)] + pub async fn fetch(criteria: Option<&str>) -> Result, Error> { + let sql = "SELECT event, relay, when_seen FROM event_seen".to_owned(); + let sql = match criteria { + None => sql, + Some(crit) => format!("{} WHERE {}", sql, crit), + }; + + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(&sql)?; + let rows = stmt.query_map([], |row| { + Ok(DbEventSeen { + event: row.get(0)?, + relay: row.get(1)?, + when_seen: row.get(2)?, + }) + })?; + + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + + output + } + + #[allow(dead_code)] + pub async fn replace(event_seen: DbEventSeen) -> Result<(), Error> { + let sql = "REPLACE INTO event_seen (event, relay, when_seen) \ + VALUES (?1, ?2, ?3)"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute((&event_seen.event, &event_seen.relay, &event_seen.when_seen))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM event_seen WHERE {}", criteria); + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } +} diff --git a/src/db/event_tag.rs b/src/db/event_tag.rs new file mode 100644 index 00000000..16fa495e --- /dev/null +++ b/src/db/event_tag.rs @@ -0,0 +1,96 @@ +use crate::error::Error; +use crate::globals::GLOBALS; +use serde::{Deserialize, Serialize}; +use tokio::task::spawn_blocking; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DbEventTag { + pub event: String, + pub seq: u64, + pub label: Option, + pub field0: Option, + pub field1: Option, + pub field2: Option, + pub field3: Option, +} + +impl DbEventTag { + #[allow(dead_code)] + pub async fn fetch(criteria: Option<&str>) -> Result, Error> { + let sql = + "SELECT event, seq, label, field0, field1, field2, field3 FROM event_tag".to_owned(); + let sql = match criteria { + None => sql, + Some(crit) => format!("{} WHERE {}", sql, crit), + }; + + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(&sql)?; + let rows = stmt.query_map([], |row| { + Ok(DbEventTag { + event: row.get(0)?, + seq: row.get(1)?, + label: row.get(2)?, + field0: row.get(3)?, + field1: row.get(4)?, + field2: row.get(5)?, + field3: row.get(6)?, + }) + })?; + + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + + output + } + + #[allow(dead_code)] + pub async fn insert(event_tag: DbEventTag) -> Result<(), Error> { + let sql = + "INSERT OR IGNORE INTO event_tag (event, seq, label, field0, field1, field2, field3) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute(( + &event_tag.event, + &event_tag.seq, + &event_tag.label, + &event_tag.field0, + &event_tag.field1, + &event_tag.field2, + &event_tag.field3, + ))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM event_tag WHERE {}", criteria); + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } +} diff --git a/src/db/mod.rs b/src/db/mod.rs new file mode 100644 index 00000000..a2419868 --- /dev/null +++ b/src/db/mod.rs @@ -0,0 +1,108 @@ +mod event; +pub use event::DbEvent; + +mod event_seen; +pub use event_seen::DbEventSeen; + +mod event_tag; +pub use event_tag::DbEventTag; + +mod relay; +pub use relay::DbRelay; + +mod person; +pub use person::DbPerson; + +mod contact; +pub use contact::DbContact; + +mod person_relay; +pub use person_relay::DbPersonRelay; + +mod setting; +pub use setting::DbSetting; + +use crate::error::Error; +use crate::globals::GLOBALS; +use rusqlite::Connection; +use std::fs; +use tracing::info; + +// This sets up the database +#[allow(dead_code)] +#[allow(clippy::or_fun_call)] +pub async fn setup_database() -> Result<(), Error> { + let mut data_dir = dirs::data_dir() + .ok_or::("Cannot find a directory to store application data.".into())?; + data_dir.push("gossip"); + + // Create our data directory only if it doesn't exist + fs::create_dir_all(&data_dir)?; + + // Connect to (or create) our database + let mut db_path = data_dir.clone(); + db_path.push("gossip.sqlite"); + let connection = Connection::open_with_flags( + &db_path, + rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE + | rusqlite::OpenFlags::SQLITE_OPEN_CREATE + | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX + | rusqlite::OpenFlags::SQLITE_OPEN_NOFOLLOW, + )?; + + // Save the connection globally + { + let mut db = GLOBALS.db.lock().await; + *db = Some(connection); + } + + // Check and upgrade our data schema + check_and_upgrade().await?; + + Ok(()) +} + +#[allow(dead_code)] +async fn check_and_upgrade() -> Result<(), Error> { + let maybe_db = GLOBALS.db.lock().await; + let db = maybe_db.as_ref().unwrap(); + + // Load the current version + match db.query_row( + "SELECT value FROM settings WHERE key=?", + ["version"], + |row| row.get::(0), + ) { + Ok(v) => upgrade(db, v.parse::().unwrap()), + Err(_e) => { + // Check the error first! + upgrade(db, 0) + } + } +} + +macro_rules! apply_sql { + ($db:ident, $version:ident, $thisversion:expr, $file:expr) => {{ + if $version < $thisversion { + info!("Upgrading database to version {}", $thisversion); + $db.execute_batch(include_str!($file))?; + $db.execute( + &format!( + "UPDATE settings SET value='{}' WHERE key='version'", + $thisversion + ), + (), + )?; + $version = $thisversion; + } + }}; +} + +#[allow(dead_code)] +fn upgrade(db: &Connection, mut version: u16) -> Result<(), Error> { + apply_sql!(db, version, 1, "schema1.sql"); + + info!("Database is at version {}", version); + + Ok(()) +} diff --git a/src/db/person.rs b/src/db/person.rs new file mode 100644 index 00000000..3525dadb --- /dev/null +++ b/src/db/person.rs @@ -0,0 +1,176 @@ +use crate::error::Error; +use crate::globals::GLOBALS; +use nostr_proto::PublicKeyHex; +use serde::{Deserialize, Serialize}; +use tokio::task::spawn_blocking; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DbPerson { + pub pubkey: PublicKeyHex, + pub name: Option, + pub about: Option, + pub picture: Option, + pub dns_id: Option, + pub dns_id_valid: u8, + pub dns_id_last_checked: Option, + pub metadata_at: Option, + pub followed: u8, +} + +impl DbPerson { + #[allow(dead_code)] + pub fn new(pubkey: PublicKeyHex) -> DbPerson { + DbPerson { + pubkey, + name: None, + about: None, + picture: None, + dns_id: None, + dns_id_valid: 0, + dns_id_last_checked: None, + metadata_at: None, + followed: 0, + } + } + + #[allow(dead_code)] + pub async fn fetch(criteria: Option<&str>) -> Result, Error> { + let sql = + "SELECT pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, metadata_at, followed FROM person".to_owned(); + let sql = match criteria { + None => sql, + Some(crit) => format!("{} WHERE {}", sql, crit), + }; + + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(&sql)?; + let rows = stmt.query_map([], |row| { + Ok(DbPerson { + pubkey: PublicKeyHex(row.get(0)?), + name: row.get(1)?, + about: row.get(2)?, + picture: row.get(3)?, + dns_id: row.get(4)?, + dns_id_valid: row.get(5)?, + dns_id_last_checked: row.get(6)?, + metadata_at: row.get(7)?, + followed: row.get(8)?, + }) + })?; + + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + + output + } + + #[allow(dead_code)] + pub async fn fetch_one(pubkey: PublicKeyHex) -> Result, Error> { + let people = DbPerson::fetch(Some(&format!("pubkey='{}'", pubkey))).await?; + + if people.is_empty() { + Ok(None) + } else { + Ok(Some(people[0].clone())) + } + } + + #[allow(dead_code)] + pub async fn insert(person: DbPerson) -> Result<(), Error> { + let sql = + "INSERT OR IGNORE INTO person (pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, metadata_at, followed) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute(( + &person.pubkey.0, + &person.name, + &person.about, + &person.picture, + &person.dns_id, + &person.dns_id_valid, + &person.dns_id_last_checked, + &person.metadata_at, + &person.followed, + ))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn update(person: DbPerson) -> Result<(), Error> { + let sql = + "UPDATE person SET name=?, about=?, picture=?, dns_id=?, dns_id_valid=?, dns_id_last_checked=?, metadata_at=?, followed=? WHERE pubkey=?"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute(( + &person.name, + &person.about, + &person.picture, + &person.dns_id, + &person.dns_id_valid, + &person.dns_id_last_checked, + &person.metadata_at, + &person.followed, + &person.pubkey.0, + ))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM person WHERE {}", criteria); + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn populate_new_people(follow_everybody: bool) -> Result<(), Error> { + let sql = if follow_everybody { + "INSERT or IGNORE INTO person (pubkey, followed) SELECT DISTINCT pubkey, 1 FROM EVENT" + } else { + "INSERT or IGNORE INTO person (pubkey) SELECT DISTINCT pubkey FROM EVENT" + }; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(sql, [])?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } +} diff --git a/src/db/person_relay.rs b/src/db/person_relay.rs new file mode 100644 index 00000000..03b335ed --- /dev/null +++ b/src/db/person_relay.rs @@ -0,0 +1,189 @@ +use crate::error::Error; +use crate::globals::GLOBALS; +use nostr_proto::PublicKeyHex; +use serde::{Deserialize, Serialize}; +use tokio::task::spawn_blocking; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DbPersonRelay { + pub person: String, + pub relay: String, + pub recommended: u8, + pub last_fetched: Option, +} + +impl DbPersonRelay { + #[allow(dead_code)] + pub async fn fetch(criteria: Option<&str>) -> Result, Error> { + let sql = "SELECT person, relay, recommended, last_fetched FROM person_relay".to_owned(); + let sql = match criteria { + None => sql, + Some(crit) => format!("{} WHERE {}", sql, crit), + }; + + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(&sql)?; + let rows = stmt.query_map([], |row| { + Ok(DbPersonRelay { + person: row.get(0)?, + relay: row.get(1)?, + recommended: row.get(2)?, + last_fetched: row.get(3)?, + }) + })?; + + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + + output + } + + /// Fetch records matching the given public keys, ordered from highest to lowest rank + #[allow(dead_code)] + pub async fn fetch_for_pubkeys(pubkeys: &[PublicKeyHex]) -> Result, Error> { + if pubkeys.is_empty() { + return Ok(vec![]); + } + + let sql = format!( + "SELECT person, relay, recommended, person_relay.last_fetched \ + FROM person_relay \ + INNER JOIN relay ON person_relay.relay=relay.url \ + WHERE person IN ({}) ORDER BY person, relay.rank DESC", + repeat_vars(pubkeys.len()) + ); + + let pubkey_strings: Vec = pubkeys.iter().map(|p| p.0.clone()).collect(); + + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(&sql)?; + let rows = stmt.query_map(rusqlite::params_from_iter(pubkey_strings), |row| { + Ok(DbPersonRelay { + person: row.get(0)?, + relay: row.get(1)?, + recommended: row.get(2)?, + last_fetched: row.get(3)?, + }) + })?; + + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + + output + } + + /// Fetch oldest last_fetched among a set of public keys for a relay + #[allow(dead_code)] + pub async fn fetch_oldest_last_fetched( + pubkeys: &[PublicKeyHex], + relay: &str, + ) -> Result { + if pubkeys.is_empty() { + return Ok(0); + } + + let sql = format!( + "SELECT min(coalesce(last_fetched,0)) FROM person_relay + WHERE relay=? AND person in ({})", + repeat_vars(pubkeys.len()) + ); + + let mut params: Vec = vec![relay.to_string()]; + params.extend(pubkeys.iter().map(|p| p.0.clone())); + + let output: Result = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(&sql)?; + let mut rows = stmt.query_map(rusqlite::params_from_iter(params), |row| row.get(0))?; + if let Some(result) = rows.next() { + Ok(result?) + } else { + Ok(0) + } + }) + .await?; + + output + } + + #[allow(dead_code)] + pub async fn insert(person_relay: DbPersonRelay) -> Result<(), Error> { + let sql = "INSERT OR IGNORE INTO person_relay (person, relay, recommended, last_fetched) \ + VALUES (?1, ?2, ?3, ?4)"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute(( + &person_relay.person, + &person_relay.relay, + &person_relay.recommended, + &person_relay.last_fetched, + ))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn update_last_fetched(relay: String, last_fetched: u64) -> Result<(), Error> { + let sql = "UPDATE person_relay SET last_fetched=? where relay=?"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute((&last_fetched, &*relay))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM person_relay WHERE {}", criteria); + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } +} + +fn repeat_vars(count: usize) -> String { + assert_ne!(count, 0); + let mut s = "?,".repeat(count); + // Remove trailing comma + s.pop(); + s +} diff --git a/src/db/relay.rs b/src/db/relay.rs new file mode 100644 index 00000000..f40eef0a --- /dev/null +++ b/src/db/relay.rs @@ -0,0 +1,145 @@ +use crate::error::Error; +use crate::globals::GLOBALS; +use nostr_proto::Url; +use serde::{Deserialize, Serialize}; +use tokio::task::spawn_blocking; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DbRelay { + pub url: String, + pub success_count: u64, + pub failure_count: u64, + pub rank: Option, +} + +impl DbRelay { + #[allow(dead_code)] + pub fn new(url: String) -> DbRelay { + DbRelay { + url, + success_count: 0, + failure_count: 0, + rank: Some(3), + } + } + + #[allow(dead_code)] + pub async fn fetch(criteria: Option<&str>) -> Result, Error> { + let sql = "SELECT url, success_count, failure_count, rank FROM relay".to_owned(); + let sql = match criteria { + None => sql, + Some(crit) => format!("{} WHERE {}", sql, crit), + }; + + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(&sql)?; + let rows = stmt.query_map([], |row| { + Ok(DbRelay { + url: row.get(0)?, + success_count: row.get(1)?, + failure_count: row.get(2)?, + rank: row.get(3)?, + }) + })?; + + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + + output + } + + #[allow(dead_code)] + pub async fn fetch_one(url: &Url) -> Result, Error> { + let relays = DbRelay::fetch(Some(&format!("url='{}'", url))).await?; + + if relays.is_empty() { + Ok(None) + } else { + Ok(Some(relays[0].clone())) + } + } + + #[allow(dead_code)] + pub async fn insert(relay: DbRelay) -> Result<(), Error> { + let sql = "INSERT OR IGNORE INTO relay (url, success_count, failure_count, rank) \ + VALUES (?1, ?2, ?3, ?4)"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute(( + &relay.url, + &relay.success_count, + &relay.failure_count, + &relay.rank, + ))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn update(relay: DbRelay) -> Result<(), Error> { + let sql = "UPDATE relay SET success_count=?, failure_count=?, rank=? WHERE url=?"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute(( + &relay.success_count, + &relay.failure_count, + &relay.rank, + &relay.url, + ))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM relay WHERE {}", criteria); + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn populate_new_relays() -> Result<(), Error> { + let sql = + "INSERT OR IGNORE INTO relay (url, rank) SELECT DISTINCT relay, 3 FROM person_relay"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(sql, [])?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } +} diff --git a/src/db/schema1.sql b/src/db/schema1.sql new file mode 100644 index 00000000..d5e30768 --- /dev/null +++ b/src/db/schema1.sql @@ -0,0 +1,92 @@ +CREATE TABLE settings ( + key TEXT PRIMARY KEY NOT NULL, + value TEXT NOT NULL +) WITHOUT ROWID; + +CREATE TABLE person ( + pubkey TEXT PRIMARY KEY NOT NULL, + name TEXT DEFAULT NULL, + about TEXT DEFAULT NULL, + picture TEXT DEFAULT NULL, + dns_id TEXT DEFAULT NULL, + dns_id_valid INTEGER DEFAULT 0, + dns_id_last_checked INTEGER DEFAULT NULL, + metadata_at INTEGER DEFAULT NULL, + followed INTEGER DEFAULT 0 +) WITHOUT ROWID; + +CREATE TABLE relay ( + url TEXT PRIMARY KEY NOT NULL, + success_count INTEGER NOT NULL DEFAULT 0, + failure_count INTEGER NOT NULL DEFAULT 0, + rank INTEGER DEFAULT 3 +) WITHOUT ROWID; + +CREATE TABLE person_relay ( + person TEXT NOT NULL, + relay TEXT NOT NULL, + recommended INTEGER DEFAULT 0, + last_fetched INTEGER DEFAULT NULL, + UNIQUE(person, relay) +); + +CREATE TABLE contact ( + source TEXT NOT NULL, + contact TEXT NOT NULL, + relay TEXT DEFAULT NULL, + petname TEXT DEFAULT NULL, + UNIQUE(source, contact) +); + +CREATE TABLE event ( + id TEXT PRIMARY KEY NOT NULL, + raw TEXT NOT NULL, + pubkey TEXT NOT NULL, + created_at INTEGER NOT NULL, + kind INTEGER NOT NULL, + content TEXT NOT NULL, + ots TEXT DEFAULT NULL +) WITHOUT ROWID; + +CREATE TABLE event_tag ( + event TEXT NOT NULL, + seq INTEGER NOT NULL, + label TEXT DEFAULT NULL, + field0 TEXT DEFAULT NULL, + field1 TEXT DEFAULT NULL, + field2 TEXT DEFAULT NULL, + field3 TEXT DEFAULT NULL, + CONSTRAINT fk_event + FOREIGN KEY (event) REFERENCES event (id) + ON DELETE CASCADE +); + +CREATE TABLE event_seen ( + event TEXT NOT NULL, + relay TEXT NOT NULL, + when_seen INTEGER NOT NULL, + UNIQUE (event, relay), + CONSTRAINT fk_event + FOREIGN KEY (event) REFERENCES event (id) + ON DELETE CASCADE +); + +INSERT INTO settings (key, value) values ('version', '1'); +-- INSERT INTO settings (key, value) values ('user_private_key', ''); -- no setting if no key +INSERT INTO settings (key, value) values ('overlap', '600'); +INSERT INTO settings (key, value) values ('feed_chunk', '43200'); +INSERT INTO settings (key, value) values ('autofollow', '0'); + +INSERT OR IGNORE INTO relay (url) values +('wss://nostr-pub.wellorder.net'), +('wss://nostr.bitcoiner.social'), +('wss://nostr-relay.wlvs.space'), +('wss://nostr-pub.semisol.dev'), +('wss://relay.damus.io'), +('wss://nostr.openchain.fr'), +('wss://nostr.delo.software'), +('wss://relay.nostr.info'), +('wss://nostr.oxtr.dev'), +('wss://nostr.ono.re'), +('wss://relay.grunch.dev'), +('wss://nostr.sandwich.farm'); diff --git a/src/db/setting.rs b/src/db/setting.rs new file mode 100644 index 00000000..6e65d2f6 --- /dev/null +++ b/src/db/setting.rs @@ -0,0 +1,144 @@ +use crate::error::Error; +use crate::globals::GLOBALS; +use rusqlite::ToSql; +use serde::{Deserialize, Serialize}; +use tokio::task::spawn_blocking; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DbSetting { + pub key: String, + pub value: String, +} + +impl DbSetting { + #[allow(dead_code)] + pub async fn fetch(criteria: Option<&str>) -> Result, Error> { + let sql = "SELECT key, value FROM settings".to_owned(); + let sql = match criteria { + None => sql, + Some(crit) => format!("{} WHERE {}", sql, crit), + }; + + let output: Result, Error> = spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(&sql)?; + let rows = stmt.query_map([], |row| { + Ok(DbSetting { + key: row.get(0)?, + value: row.get(1)?, + }) + })?; + + let mut output: Vec = Vec::new(); + for row in rows { + output.push(row?); + } + Ok(output) + }) + .await?; + + output + } + + #[allow(dead_code)] + pub async fn fetch_setting(key: &str) -> Result, Error> { + let db_settings = DbSetting::fetch(Some(&format!("key='{}'", key))).await?; + + if db_settings.is_empty() { + Ok(None) + } else { + Ok(Some(db_settings[0].value.clone())) + } + } + + #[allow(dead_code)] + pub async fn fetch_setting_or_default(key: &str, default: &str) -> Result { + let db_settings = DbSetting::fetch(Some(&format!("key='{}'", key))).await?; + + if db_settings.is_empty() { + Ok(default.to_string()) + } else { + Ok(db_settings[0].value.clone()) + } + } + + #[allow(dead_code)] + pub async fn fetch_setting_u64_or_default(key: &str, default: u64) -> Result { + let db_settings = DbSetting::fetch(Some(&format!("key='{}'", key))).await?; + + if db_settings.is_empty() { + Ok(default) + } else { + Ok(db_settings[0].value.parse::().unwrap_or(default)) + } + } + + #[allow(dead_code)] + pub async fn set(setting: DbSetting) -> Result<(), Error> { + let sql = "INSERT OR REPLACE INTO settings (key, value) VALUES (?1, ?2)"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute((&setting.key, &setting.value))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn insert(setting: DbSetting) -> Result<(), Error> { + let sql = "INSERT OR IGNORE INTO settings (key, value) \ + VALUES (?1, ?2)"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute((&setting.key, &setting.value))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn update(key: String, value: T) -> Result<(), Error> { + let sql = "UPDATE settings SET value=? WHERE key=?"; + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + + let mut stmt = db.prepare(sql)?; + stmt.execute((&value, &key))?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } + + #[allow(dead_code)] + pub async fn delete(criteria: &str) -> Result<(), Error> { + let sql = format!("DELETE FROM settings WHERE {}", criteria); + + spawn_blocking(move || { + let maybe_db = GLOBALS.db.blocking_lock(); + let db = maybe_db.as_ref().unwrap(); + db.execute(&sql, [])?; + Ok::<(), Error>(()) + }) + .await??; + + Ok(()) + } +} diff --git a/src/globals.rs b/src/globals.rs index 884f9f21..d04ca580 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -1,9 +1,13 @@ use crate::comms::BusMessage; +use rusqlite::Connection; use tokio::sync::{broadcast, mpsc, Mutex}; /// Only one of these is ever created, via lazy_static!, and represents /// global state for the rust application pub struct Globals { + /// This is our connection to SQLite. Only one thread at a time. + pub db: Mutex>, + /// This is a broadcast channel. All Minions should listen on it. /// To create a receiver, just run .subscribe() on it. pub to_minions: broadcast::Sender, @@ -27,6 +31,7 @@ lazy_static! { let (to_overlord, from_minions) = mpsc::unbounded_channel(); Globals { + db: Mutex::new(None), to_minions, to_overlord, from_minions: Mutex::new(Some(from_minions)), diff --git a/src/main.rs b/src/main.rs index 7fb960e8..92b909a6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ extern crate lazy_static; mod comms; +mod db; mod error; mod globals;