Database code copied in

This commit is contained in:
Mike Dilger 2022-12-20 18:49:07 +13:00
parent 6449757005
commit b31d1f4334
12 changed files with 1278 additions and 0 deletions

85
src/db/contact.rs Normal file
View File

@ -0,0 +1,85 @@
use crate::error::Error;
use crate::globals::GLOBALS;
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
#[derive(Debug, Serialize, Deserialize)]
pub struct DbContact {
pub source: String,
pub contact: String,
pub relay: Option<String>,
pub petname: Option<String>,
}
impl DbContact {
#[allow(dead_code)]
pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbContact>, Error> {
let sql = "SELECT source, contact, relay, petname FROM contact".to_owned();
let sql = match criteria {
None => sql,
Some(crit) => format!("{} WHERE {}", sql, crit),
};
let output: Result<Vec<DbContact>, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok(DbContact {
source: row.get(0)?,
contact: row.get(1)?,
relay: row.get(2)?,
petname: row.get(3)?,
})
})?;
let mut output: Vec<DbContact> = Vec::new();
for row in rows {
output.push(row?);
}
Ok(output)
})
.await?;
output
}
#[allow(dead_code)]
pub async fn insert(contact: DbContact) -> Result<(), Error> {
let sql = "INSERT OR IGNORE INTO contact (source, contact, relay, petname) \
VALUES (?1, ?2, ?3, ?4)";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((
&contact.source,
&contact.contact,
&contact.relay,
&contact.petname,
))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn delete(criteria: &str) -> Result<(), Error> {
let sql = format!("DELETE FROM contact WHERE {}", criteria);
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
db.execute(&sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
}

159
src/db/event.rs Normal file
View File

@ -0,0 +1,159 @@
use super::{DbEventSeen, DbEventTag};
use crate::error::Error;
use crate::globals::GLOBALS;
use nostr_proto::{Event, IdHex, PublicKeyHex, Unixtime, Url};
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DbEvent {
pub id: IdHex,
pub raw: String,
pub pubkey: PublicKeyHex,
pub created_at: i64,
pub kind: u64,
pub content: String,
pub ots: Option<String>,
}
impl DbEvent {
#[allow(dead_code)]
pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbEvent>, Error> {
let sql = "SELECT id, raw, pubkey, created_at, kind, content, ots FROM event".to_owned();
let sql = match criteria {
None => sql,
Some(crit) => format!("{} WHERE {}", sql, crit),
};
let output: Result<Vec<DbEvent>, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok(DbEvent {
id: IdHex(row.get(0)?),
raw: row.get(1)?,
pubkey: PublicKeyHex(row.get(2)?),
created_at: row.get(3)?,
kind: row.get(4)?,
content: row.get(5)?,
ots: row.get(6)?,
})
})?;
let mut output: Vec<DbEvent> = Vec::new();
for row in rows {
output.push(row?);
}
Ok(output)
})
.await?;
output
}
#[allow(dead_code)]
pub async fn insert(event: DbEvent) -> Result<(), Error> {
let sql = "INSERT OR IGNORE INTO event (id, raw, pubkey, created_at, kind, content, ots) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((
&event.id.0,
&event.raw,
&event.pubkey.0,
&event.created_at,
&event.kind,
&event.content,
&event.ots,
))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn delete(criteria: &str) -> Result<(), Error> {
let sql = format!("DELETE FROM event WHERE {}", criteria);
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
db.execute(&sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn get_author(id: IdHex) -> Result<Option<PublicKeyHex>, Error> {
let sql = "SELECT pubkey FROM event WHERE id=?";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
let mut rows = stmt.query_map([id.0], |row| row.get(0))?;
if let Some(row) = rows.next() {
return Ok(Some(PublicKeyHex(row?)));
}
Ok(None)
})
.await?
}
#[allow(dead_code)]
pub async fn save_nostr_event(event: &Event, seen_on: Option<Url>) -> Result<(), Error> {
// Convert a nostr Event into a DbEvent
let db_event = DbEvent {
id: event.id.into(),
raw: serde_json::to_string(&event)?,
pubkey: event.pubkey.into(),
created_at: event.created_at.0,
kind: event.kind.into(),
content: event.content.clone(),
ots: event.ots.clone(),
};
// Save into event table
DbEvent::insert(db_event).await?;
// Save the tags into event_tag table
for (seq, tag) in event.tags.iter().enumerate() {
// convert to vec of strings
let v: Vec<String> = serde_json::from_str(&serde_json::to_string(&tag)?)?;
let db_event_tag = DbEventTag {
event: event.id.as_hex_string(),
seq: seq as u64,
label: v.get(0).cloned(),
field0: v.get(1).cloned(),
field1: v.get(2).cloned(),
field2: v.get(3).cloned(),
field3: v.get(4).cloned(),
};
DbEventTag::insert(db_event_tag).await?;
}
// Save the event into event_seen table
if let Some(url) = seen_on {
let db_event_seen = DbEventSeen {
event: event.id.as_hex_string(),
relay: url.0,
when_seen: Unixtime::now()?.0 as u64,
};
DbEventSeen::replace(db_event_seen).await?;
}
Ok(())
}
}

78
src/db/event_seen.rs Normal file
View File

@ -0,0 +1,78 @@
use crate::error::Error;
use crate::globals::GLOBALS;
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
#[derive(Debug, Serialize, Deserialize)]
pub struct DbEventSeen {
pub event: String,
pub relay: String,
pub when_seen: u64,
}
impl DbEventSeen {
#[allow(dead_code)]
pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbEventSeen>, Error> {
let sql = "SELECT event, relay, when_seen FROM event_seen".to_owned();
let sql = match criteria {
None => sql,
Some(crit) => format!("{} WHERE {}", sql, crit),
};
let output: Result<Vec<DbEventSeen>, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok(DbEventSeen {
event: row.get(0)?,
relay: row.get(1)?,
when_seen: row.get(2)?,
})
})?;
let mut output: Vec<DbEventSeen> = Vec::new();
for row in rows {
output.push(row?);
}
Ok(output)
})
.await?;
output
}
#[allow(dead_code)]
pub async fn replace(event_seen: DbEventSeen) -> Result<(), Error> {
let sql = "REPLACE INTO event_seen (event, relay, when_seen) \
VALUES (?1, ?2, ?3)";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((&event_seen.event, &event_seen.relay, &event_seen.when_seen))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn delete(criteria: &str) -> Result<(), Error> {
let sql = format!("DELETE FROM event_seen WHERE {}", criteria);
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
db.execute(&sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
}

96
src/db/event_tag.rs Normal file
View File

@ -0,0 +1,96 @@
use crate::error::Error;
use crate::globals::GLOBALS;
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
#[derive(Debug, Serialize, Deserialize)]
pub struct DbEventTag {
pub event: String,
pub seq: u64,
pub label: Option<String>,
pub field0: Option<String>,
pub field1: Option<String>,
pub field2: Option<String>,
pub field3: Option<String>,
}
impl DbEventTag {
#[allow(dead_code)]
pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbEventTag>, Error> {
let sql =
"SELECT event, seq, label, field0, field1, field2, field3 FROM event_tag".to_owned();
let sql = match criteria {
None => sql,
Some(crit) => format!("{} WHERE {}", sql, crit),
};
let output: Result<Vec<DbEventTag>, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok(DbEventTag {
event: row.get(0)?,
seq: row.get(1)?,
label: row.get(2)?,
field0: row.get(3)?,
field1: row.get(4)?,
field2: row.get(5)?,
field3: row.get(6)?,
})
})?;
let mut output: Vec<DbEventTag> = Vec::new();
for row in rows {
output.push(row?);
}
Ok(output)
})
.await?;
output
}
#[allow(dead_code)]
pub async fn insert(event_tag: DbEventTag) -> Result<(), Error> {
let sql =
"INSERT OR IGNORE INTO event_tag (event, seq, label, field0, field1, field2, field3) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((
&event_tag.event,
&event_tag.seq,
&event_tag.label,
&event_tag.field0,
&event_tag.field1,
&event_tag.field2,
&event_tag.field3,
))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn delete(criteria: &str) -> Result<(), Error> {
let sql = format!("DELETE FROM event_tag WHERE {}", criteria);
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
db.execute(&sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
}

108
src/db/mod.rs Normal file
View File

@ -0,0 +1,108 @@
mod event;
pub use event::DbEvent;
mod event_seen;
pub use event_seen::DbEventSeen;
mod event_tag;
pub use event_tag::DbEventTag;
mod relay;
pub use relay::DbRelay;
mod person;
pub use person::DbPerson;
mod contact;
pub use contact::DbContact;
mod person_relay;
pub use person_relay::DbPersonRelay;
mod setting;
pub use setting::DbSetting;
use crate::error::Error;
use crate::globals::GLOBALS;
use rusqlite::Connection;
use std::fs;
use tracing::info;
// This sets up the database
#[allow(dead_code)]
#[allow(clippy::or_fun_call)]
pub async fn setup_database() -> Result<(), Error> {
let mut data_dir = dirs::data_dir()
.ok_or::<Error>("Cannot find a directory to store application data.".into())?;
data_dir.push("gossip");
// Create our data directory only if it doesn't exist
fs::create_dir_all(&data_dir)?;
// Connect to (or create) our database
let mut db_path = data_dir.clone();
db_path.push("gossip.sqlite");
let connection = Connection::open_with_flags(
&db_path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
| rusqlite::OpenFlags::SQLITE_OPEN_CREATE
| rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
| rusqlite::OpenFlags::SQLITE_OPEN_NOFOLLOW,
)?;
// Save the connection globally
{
let mut db = GLOBALS.db.lock().await;
*db = Some(connection);
}
// Check and upgrade our data schema
check_and_upgrade().await?;
Ok(())
}
#[allow(dead_code)]
async fn check_and_upgrade() -> Result<(), Error> {
let maybe_db = GLOBALS.db.lock().await;
let db = maybe_db.as_ref().unwrap();
// Load the current version
match db.query_row(
"SELECT value FROM settings WHERE key=?",
["version"],
|row| row.get::<usize, String>(0),
) {
Ok(v) => upgrade(db, v.parse::<u16>().unwrap()),
Err(_e) => {
// Check the error first!
upgrade(db, 0)
}
}
}
macro_rules! apply_sql {
($db:ident, $version:ident, $thisversion:expr, $file:expr) => {{
if $version < $thisversion {
info!("Upgrading database to version {}", $thisversion);
$db.execute_batch(include_str!($file))?;
$db.execute(
&format!(
"UPDATE settings SET value='{}' WHERE key='version'",
$thisversion
),
(),
)?;
$version = $thisversion;
}
}};
}
#[allow(dead_code)]
fn upgrade(db: &Connection, mut version: u16) -> Result<(), Error> {
apply_sql!(db, version, 1, "schema1.sql");
info!("Database is at version {}", version);
Ok(())
}

176
src/db/person.rs Normal file
View File

@ -0,0 +1,176 @@
use crate::error::Error;
use crate::globals::GLOBALS;
use nostr_proto::PublicKeyHex;
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbPerson {
pub pubkey: PublicKeyHex,
pub name: Option<String>,
pub about: Option<String>,
pub picture: Option<String>,
pub dns_id: Option<String>,
pub dns_id_valid: u8,
pub dns_id_last_checked: Option<u64>,
pub metadata_at: Option<i64>,
pub followed: u8,
}
impl DbPerson {
#[allow(dead_code)]
pub fn new(pubkey: PublicKeyHex) -> DbPerson {
DbPerson {
pubkey,
name: None,
about: None,
picture: None,
dns_id: None,
dns_id_valid: 0,
dns_id_last_checked: None,
metadata_at: None,
followed: 0,
}
}
#[allow(dead_code)]
pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbPerson>, Error> {
let sql =
"SELECT pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, metadata_at, followed FROM person".to_owned();
let sql = match criteria {
None => sql,
Some(crit) => format!("{} WHERE {}", sql, crit),
};
let output: Result<Vec<DbPerson>, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok(DbPerson {
pubkey: PublicKeyHex(row.get(0)?),
name: row.get(1)?,
about: row.get(2)?,
picture: row.get(3)?,
dns_id: row.get(4)?,
dns_id_valid: row.get(5)?,
dns_id_last_checked: row.get(6)?,
metadata_at: row.get(7)?,
followed: row.get(8)?,
})
})?;
let mut output: Vec<DbPerson> = Vec::new();
for row in rows {
output.push(row?);
}
Ok(output)
})
.await?;
output
}
#[allow(dead_code)]
pub async fn fetch_one(pubkey: PublicKeyHex) -> Result<Option<DbPerson>, Error> {
let people = DbPerson::fetch(Some(&format!("pubkey='{}'", pubkey))).await?;
if people.is_empty() {
Ok(None)
} else {
Ok(Some(people[0].clone()))
}
}
#[allow(dead_code)]
pub async fn insert(person: DbPerson) -> Result<(), Error> {
let sql =
"INSERT OR IGNORE INTO person (pubkey, name, about, picture, dns_id, dns_id_valid, dns_id_last_checked, metadata_at, followed) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((
&person.pubkey.0,
&person.name,
&person.about,
&person.picture,
&person.dns_id,
&person.dns_id_valid,
&person.dns_id_last_checked,
&person.metadata_at,
&person.followed,
))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn update(person: DbPerson) -> Result<(), Error> {
let sql =
"UPDATE person SET name=?, about=?, picture=?, dns_id=?, dns_id_valid=?, dns_id_last_checked=?, metadata_at=?, followed=? WHERE pubkey=?";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((
&person.name,
&person.about,
&person.picture,
&person.dns_id,
&person.dns_id_valid,
&person.dns_id_last_checked,
&person.metadata_at,
&person.followed,
&person.pubkey.0,
))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn delete(criteria: &str) -> Result<(), Error> {
let sql = format!("DELETE FROM person WHERE {}", criteria);
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
db.execute(&sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn populate_new_people(follow_everybody: bool) -> Result<(), Error> {
let sql = if follow_everybody {
"INSERT or IGNORE INTO person (pubkey, followed) SELECT DISTINCT pubkey, 1 FROM EVENT"
} else {
"INSERT or IGNORE INTO person (pubkey) SELECT DISTINCT pubkey FROM EVENT"
};
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
db.execute(sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
}

189
src/db/person_relay.rs Normal file
View File

@ -0,0 +1,189 @@
use crate::error::Error;
use crate::globals::GLOBALS;
use nostr_proto::PublicKeyHex;
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
#[derive(Debug, Serialize, Deserialize)]
pub struct DbPersonRelay {
pub person: String,
pub relay: String,
pub recommended: u8,
pub last_fetched: Option<u64>,
}
impl DbPersonRelay {
#[allow(dead_code)]
pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbPersonRelay>, Error> {
let sql = "SELECT person, relay, recommended, last_fetched FROM person_relay".to_owned();
let sql = match criteria {
None => sql,
Some(crit) => format!("{} WHERE {}", sql, crit),
};
let output: Result<Vec<DbPersonRelay>, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok(DbPersonRelay {
person: row.get(0)?,
relay: row.get(1)?,
recommended: row.get(2)?,
last_fetched: row.get(3)?,
})
})?;
let mut output: Vec<DbPersonRelay> = Vec::new();
for row in rows {
output.push(row?);
}
Ok(output)
})
.await?;
output
}
/// Fetch records matching the given public keys, ordered from highest to lowest rank
#[allow(dead_code)]
pub async fn fetch_for_pubkeys(pubkeys: &[PublicKeyHex]) -> Result<Vec<DbPersonRelay>, Error> {
if pubkeys.is_empty() {
return Ok(vec![]);
}
let sql = format!(
"SELECT person, relay, recommended, person_relay.last_fetched \
FROM person_relay \
INNER JOIN relay ON person_relay.relay=relay.url \
WHERE person IN ({}) ORDER BY person, relay.rank DESC",
repeat_vars(pubkeys.len())
);
let pubkey_strings: Vec<String> = pubkeys.iter().map(|p| p.0.clone()).collect();
let output: Result<Vec<DbPersonRelay>, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
let rows = stmt.query_map(rusqlite::params_from_iter(pubkey_strings), |row| {
Ok(DbPersonRelay {
person: row.get(0)?,
relay: row.get(1)?,
recommended: row.get(2)?,
last_fetched: row.get(3)?,
})
})?;
let mut output: Vec<DbPersonRelay> = Vec::new();
for row in rows {
output.push(row?);
}
Ok(output)
})
.await?;
output
}
/// Fetch oldest last_fetched among a set of public keys for a relay
#[allow(dead_code)]
pub async fn fetch_oldest_last_fetched(
pubkeys: &[PublicKeyHex],
relay: &str,
) -> Result<u64, Error> {
if pubkeys.is_empty() {
return Ok(0);
}
let sql = format!(
"SELECT min(coalesce(last_fetched,0)) FROM person_relay
WHERE relay=? AND person in ({})",
repeat_vars(pubkeys.len())
);
let mut params: Vec<String> = vec![relay.to_string()];
params.extend(pubkeys.iter().map(|p| p.0.clone()));
let output: Result<u64, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
let mut rows = stmt.query_map(rusqlite::params_from_iter(params), |row| row.get(0))?;
if let Some(result) = rows.next() {
Ok(result?)
} else {
Ok(0)
}
})
.await?;
output
}
#[allow(dead_code)]
pub async fn insert(person_relay: DbPersonRelay) -> Result<(), Error> {
let sql = "INSERT OR IGNORE INTO person_relay (person, relay, recommended, last_fetched) \
VALUES (?1, ?2, ?3, ?4)";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((
&person_relay.person,
&person_relay.relay,
&person_relay.recommended,
&person_relay.last_fetched,
))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn update_last_fetched(relay: String, last_fetched: u64) -> Result<(), Error> {
let sql = "UPDATE person_relay SET last_fetched=? where relay=?";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((&last_fetched, &*relay))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn delete(criteria: &str) -> Result<(), Error> {
let sql = format!("DELETE FROM person_relay WHERE {}", criteria);
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
db.execute(&sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
}
fn repeat_vars(count: usize) -> String {
assert_ne!(count, 0);
let mut s = "?,".repeat(count);
// Remove trailing comma
s.pop();
s
}

145
src/db/relay.rs Normal file
View File

@ -0,0 +1,145 @@
use crate::error::Error;
use crate::globals::GLOBALS;
use nostr_proto::Url;
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbRelay {
pub url: String,
pub success_count: u64,
pub failure_count: u64,
pub rank: Option<u64>,
}
impl DbRelay {
#[allow(dead_code)]
pub fn new(url: String) -> DbRelay {
DbRelay {
url,
success_count: 0,
failure_count: 0,
rank: Some(3),
}
}
#[allow(dead_code)]
pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbRelay>, Error> {
let sql = "SELECT url, success_count, failure_count, rank FROM relay".to_owned();
let sql = match criteria {
None => sql,
Some(crit) => format!("{} WHERE {}", sql, crit),
};
let output: Result<Vec<DbRelay>, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok(DbRelay {
url: row.get(0)?,
success_count: row.get(1)?,
failure_count: row.get(2)?,
rank: row.get(3)?,
})
})?;
let mut output: Vec<DbRelay> = Vec::new();
for row in rows {
output.push(row?);
}
Ok(output)
})
.await?;
output
}
#[allow(dead_code)]
pub async fn fetch_one(url: &Url) -> Result<Option<DbRelay>, Error> {
let relays = DbRelay::fetch(Some(&format!("url='{}'", url))).await?;
if relays.is_empty() {
Ok(None)
} else {
Ok(Some(relays[0].clone()))
}
}
#[allow(dead_code)]
pub async fn insert(relay: DbRelay) -> Result<(), Error> {
let sql = "INSERT OR IGNORE INTO relay (url, success_count, failure_count, rank) \
VALUES (?1, ?2, ?3, ?4)";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((
&relay.url,
&relay.success_count,
&relay.failure_count,
&relay.rank,
))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn update(relay: DbRelay) -> Result<(), Error> {
let sql = "UPDATE relay SET success_count=?, failure_count=?, rank=? WHERE url=?";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((
&relay.success_count,
&relay.failure_count,
&relay.rank,
&relay.url,
))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn delete(criteria: &str) -> Result<(), Error> {
let sql = format!("DELETE FROM relay WHERE {}", criteria);
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
db.execute(&sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn populate_new_relays() -> Result<(), Error> {
let sql =
"INSERT OR IGNORE INTO relay (url, rank) SELECT DISTINCT relay, 3 FROM person_relay";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
db.execute(sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
}

92
src/db/schema1.sql Normal file
View File

@ -0,0 +1,92 @@
CREATE TABLE settings (
key TEXT PRIMARY KEY NOT NULL,
value TEXT NOT NULL
) WITHOUT ROWID;
CREATE TABLE person (
pubkey TEXT PRIMARY KEY NOT NULL,
name TEXT DEFAULT NULL,
about TEXT DEFAULT NULL,
picture TEXT DEFAULT NULL,
dns_id TEXT DEFAULT NULL,
dns_id_valid INTEGER DEFAULT 0,
dns_id_last_checked INTEGER DEFAULT NULL,
metadata_at INTEGER DEFAULT NULL,
followed INTEGER DEFAULT 0
) WITHOUT ROWID;
CREATE TABLE relay (
url TEXT PRIMARY KEY NOT NULL,
success_count INTEGER NOT NULL DEFAULT 0,
failure_count INTEGER NOT NULL DEFAULT 0,
rank INTEGER DEFAULT 3
) WITHOUT ROWID;
CREATE TABLE person_relay (
person TEXT NOT NULL,
relay TEXT NOT NULL,
recommended INTEGER DEFAULT 0,
last_fetched INTEGER DEFAULT NULL,
UNIQUE(person, relay)
);
CREATE TABLE contact (
source TEXT NOT NULL,
contact TEXT NOT NULL,
relay TEXT DEFAULT NULL,
petname TEXT DEFAULT NULL,
UNIQUE(source, contact)
);
CREATE TABLE event (
id TEXT PRIMARY KEY NOT NULL,
raw TEXT NOT NULL,
pubkey TEXT NOT NULL,
created_at INTEGER NOT NULL,
kind INTEGER NOT NULL,
content TEXT NOT NULL,
ots TEXT DEFAULT NULL
) WITHOUT ROWID;
CREATE TABLE event_tag (
event TEXT NOT NULL,
seq INTEGER NOT NULL,
label TEXT DEFAULT NULL,
field0 TEXT DEFAULT NULL,
field1 TEXT DEFAULT NULL,
field2 TEXT DEFAULT NULL,
field3 TEXT DEFAULT NULL,
CONSTRAINT fk_event
FOREIGN KEY (event) REFERENCES event (id)
ON DELETE CASCADE
);
CREATE TABLE event_seen (
event TEXT NOT NULL,
relay TEXT NOT NULL,
when_seen INTEGER NOT NULL,
UNIQUE (event, relay),
CONSTRAINT fk_event
FOREIGN KEY (event) REFERENCES event (id)
ON DELETE CASCADE
);
INSERT INTO settings (key, value) values ('version', '1');
-- INSERT INTO settings (key, value) values ('user_private_key', ''); -- no setting if no key
INSERT INTO settings (key, value) values ('overlap', '600');
INSERT INTO settings (key, value) values ('feed_chunk', '43200');
INSERT INTO settings (key, value) values ('autofollow', '0');
INSERT OR IGNORE INTO relay (url) values
('wss://nostr-pub.wellorder.net'),
('wss://nostr.bitcoiner.social'),
('wss://nostr-relay.wlvs.space'),
('wss://nostr-pub.semisol.dev'),
('wss://relay.damus.io'),
('wss://nostr.openchain.fr'),
('wss://nostr.delo.software'),
('wss://relay.nostr.info'),
('wss://nostr.oxtr.dev'),
('wss://nostr.ono.re'),
('wss://relay.grunch.dev'),
('wss://nostr.sandwich.farm');

144
src/db/setting.rs Normal file
View File

@ -0,0 +1,144 @@
use crate::error::Error;
use crate::globals::GLOBALS;
use rusqlite::ToSql;
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
#[derive(Debug, Serialize, Deserialize)]
pub struct DbSetting {
pub key: String,
pub value: String,
}
impl DbSetting {
#[allow(dead_code)]
pub async fn fetch(criteria: Option<&str>) -> Result<Vec<DbSetting>, Error> {
let sql = "SELECT key, value FROM settings".to_owned();
let sql = match criteria {
None => sql,
Some(crit) => format!("{} WHERE {}", sql, crit),
};
let output: Result<Vec<DbSetting>, Error> = spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok(DbSetting {
key: row.get(0)?,
value: row.get(1)?,
})
})?;
let mut output: Vec<DbSetting> = Vec::new();
for row in rows {
output.push(row?);
}
Ok(output)
})
.await?;
output
}
#[allow(dead_code)]
pub async fn fetch_setting(key: &str) -> Result<Option<String>, Error> {
let db_settings = DbSetting::fetch(Some(&format!("key='{}'", key))).await?;
if db_settings.is_empty() {
Ok(None)
} else {
Ok(Some(db_settings[0].value.clone()))
}
}
#[allow(dead_code)]
pub async fn fetch_setting_or_default(key: &str, default: &str) -> Result<String, Error> {
let db_settings = DbSetting::fetch(Some(&format!("key='{}'", key))).await?;
if db_settings.is_empty() {
Ok(default.to_string())
} else {
Ok(db_settings[0].value.clone())
}
}
#[allow(dead_code)]
pub async fn fetch_setting_u64_or_default(key: &str, default: u64) -> Result<u64, Error> {
let db_settings = DbSetting::fetch(Some(&format!("key='{}'", key))).await?;
if db_settings.is_empty() {
Ok(default)
} else {
Ok(db_settings[0].value.parse::<u64>().unwrap_or(default))
}
}
#[allow(dead_code)]
pub async fn set(setting: DbSetting) -> Result<(), Error> {
let sql = "INSERT OR REPLACE INTO settings (key, value) VALUES (?1, ?2)";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((&setting.key, &setting.value))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn insert(setting: DbSetting) -> Result<(), Error> {
let sql = "INSERT OR IGNORE INTO settings (key, value) \
VALUES (?1, ?2)";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((&setting.key, &setting.value))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn update<T: ToSql + Send + 'static>(key: String, value: T) -> Result<(), Error> {
let sql = "UPDATE settings SET value=? WHERE key=?";
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
let mut stmt = db.prepare(sql)?;
stmt.execute((&value, &key))?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
#[allow(dead_code)]
pub async fn delete(criteria: &str) -> Result<(), Error> {
let sql = format!("DELETE FROM settings WHERE {}", criteria);
spawn_blocking(move || {
let maybe_db = GLOBALS.db.blocking_lock();
let db = maybe_db.as_ref().unwrap();
db.execute(&sql, [])?;
Ok::<(), Error>(())
})
.await??;
Ok(())
}
}

View File

@ -1,9 +1,13 @@
use crate::comms::BusMessage;
use rusqlite::Connection;
use tokio::sync::{broadcast, mpsc, Mutex};
/// Only one of these is ever created, via lazy_static!, and represents
/// global state for the rust application
pub struct Globals {
/// This is our connection to SQLite. Only one thread at a time.
pub db: Mutex<Option<Connection>>,
/// This is a broadcast channel. All Minions should listen on it.
/// To create a receiver, just run .subscribe() on it.
pub to_minions: broadcast::Sender<BusMessage>,
@ -27,6 +31,7 @@ lazy_static! {
let (to_overlord, from_minions) = mpsc::unbounded_channel();
Globals {
db: Mutex::new(None),
to_minions,
to_overlord,
from_minions: Mutex::new(Some(from_minions)),

View File

@ -2,6 +2,7 @@
extern crate lazy_static;
mod comms;
mod db;
mod error;
mod globals;