diff --git a/src/api_request_handler.rs b/src/api_request_handler.rs index 60c1ef9..c77ba8c 100644 --- a/src/api_request_handler.rs +++ b/src/api_request_handler.rs @@ -1,20 +1,17 @@ use crate::nip98_auth; -use crate::notification_manager::notification_manager::UserNotificationSettings; +use crate::notification_manager::UserNotificationSettings; use crate::relay_connection::RelayConnection; use http_body_util::Full; use hyper::body::Buf; use hyper::body::Bytes; use hyper::body::Incoming; use hyper::{Request, Response, StatusCode}; -use hyper_tungstenite; use http_body_util::BodyExt; -use nostr; use serde_json::from_value; use crate::notification_manager::NotificationManager; use hyper::Method; -use log; use serde_json::{json, Value}; use std::collections::HashMap; use std::sync::Arc; @@ -85,13 +82,13 @@ impl APIHandler { } }; - Ok(Response::builder() + Response::builder() .header("Content-Type", "application/json") .header("Access-Control-Allow-Origin", "*") .status(final_api_response.status) .body(http_body_util::Full::new(Bytes::from( final_api_response.body.to_string(), - )))?) + ))) } async fn handle_websocket_upgrade( @@ -144,7 +141,7 @@ impl APIHandler { }; // 2. NIP-98 authentication - let authorized_pubkey = match self.authenticate(&req, body_bytes).await? { + let authorized_pubkey = match self.authenticate(req, body_bytes).await? { Ok(pubkey) => pubkey, Err(auth_error) => { return Err(Box::new(APIError::AuthenticationError(auth_error))); @@ -169,7 +166,7 @@ impl APIHandler { if let Some(url_params) = route_match( &Method::PUT, "/user-info/:pubkey/:deviceToken", - &parsed_request, + parsed_request, ) { return self.handle_user_info(parsed_request, &url_params).await; } @@ -177,7 +174,7 @@ impl APIHandler { if let Some(url_params) = route_match( &Method::DELETE, "/user-info/:pubkey/:deviceToken", - &parsed_request, + parsed_request, ) { return self .handle_user_info_remove(parsed_request, &url_params) @@ -187,7 +184,7 @@ impl APIHandler { if let Some(url_params) = route_match( &Method::GET, "/user-info/:pubkey/:deviceToken/preferences", - &parsed_request, + parsed_request, ) { return self.get_user_settings(parsed_request, &url_params).await; } @@ -195,7 +192,7 @@ impl APIHandler { if let Some(url_params) = route_match( &Method::PUT, "/user-info/:pubkey/:deviceToken/preferences", - &parsed_request, + parsed_request, ) { return self.set_user_settings(parsed_request, &url_params).await; } @@ -408,10 +405,11 @@ impl APIHandler { settings, ) .await?; - return Ok(APIResponse { + + Ok(APIResponse { status: StatusCode::OK, body: json!({ "message": "User settings saved successfully" }), - }); + }) } async fn get_user_settings( @@ -536,8 +534,7 @@ fn route_match<'a>( } for (i, segment) in path_segments.iter().enumerate() { - if segment.starts_with(':') { - let key = &segment[1..]; + if let Some(key) = segment.strip_prefix(':') { let value = req_segments[i].to_string(); params.insert(key, value); } else if segment != &req_segments[i] { diff --git a/src/main.rs b/src/main.rs index a147e89..ea57cdd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,12 +3,9 @@ use hyper_util::rt::TokioIo; use std::sync::Arc; use tokio::net::TcpListener; mod notification_manager; -use env_logger; -use log; use r2d2_sqlite::SqliteConnectionManager; -mod relay_connection; -use r2d2; mod notepush_env; +mod relay_connection; use notepush_env::NotePushEnv; mod api_request_handler; mod nip98_auth; diff --git a/src/nip98_auth.rs b/src/nip98_auth.rs index e7a0847..51e9b9c 100644 --- a/src/nip98_auth.rs +++ b/src/nip98_auth.rs @@ -1,6 +1,5 @@ use super::utils::time_delta::TimeDelta; use base64::prelude::*; -use nostr; use nostr::bitcoin::hashes::sha256::Hash as Sha256Hash; use nostr::bitcoin::hashes::Hash; use nostr::util::hex; @@ -33,14 +32,14 @@ pub async fn nip98_verify_auth_header( let decoded_note_json = BASE64_STANDARD .decode(base64_encoded_note.as_bytes()) .map_err(|_| { - format!("Failed to decode base64 encoded note from Nostr authorization header") + "Failed to decode base64 encoded note from Nostr authorization header".to_string() })?; let note_value: Value = serde_json::from_slice(&decoded_note_json) - .map_err(|_| format!("Could not parse JSON note from authorization header"))?; + .map_err(|_| "Could not parse JSON note from authorization header".to_string())?; let note: nostr::Event = nostr::Event::from_value(note_value) - .map_err(|_| format!("Could not parse Nostr note from JSON"))?; + .map_err(|_| "Could not parse Nostr note from JSON".to_string())?; if note.kind != nostr::Kind::HttpAuth { return Err("Nostr note kind in authorization header is incorrect".to_string()); @@ -81,12 +80,12 @@ pub async fn nip98_verify_auth_header( .ok_or("Missing 'payload' tag from Nostr authorization header")?, ) .map_err(|_| { - format!("Failed to decode hex encoded payload from Nostr authorization header") + "Failed to decode hex encoded payload from Nostr authorization header".to_string() })?; let authorized_content_hash: Sha256Hash = Sha256Hash::from_slice(&authorized_content_hash_bytes) - .map_err(|_| format!("Failed to convert hex encoded payload to Sha256Hash"))?; + .map_err(|_| "Failed to convert hex encoded payload to Sha256Hash".to_string())?; let body_hash = Sha256Hash::hash(body_data); if authorized_content_hash != body_hash { diff --git a/src/notepush_env.rs b/src/notepush_env.rs index 9d78804..5784c64 100644 --- a/src/notepush_env.rs +++ b/src/notepush_env.rs @@ -1,4 +1,3 @@ -use a2; use dotenv::dotenv; use std::env; @@ -53,7 +52,7 @@ impl NotePushEnv { let nostr_event_cache_max_age = env::var("NOSTR_EVENT_CACHE_MAX_AGE") .unwrap_or(DEFAULT_NOSTR_EVENT_CACHE_MAX_AGE.to_string()) .parse::() - .map(|s| std::time::Duration::from_secs(s)) + .map(std::time::Duration::from_secs) .unwrap_or(std::time::Duration::from_secs( DEFAULT_NOSTR_EVENT_CACHE_MAX_AGE, )); diff --git a/src/notification_manager/mod.rs b/src/notification_manager/mod.rs index ea72b0a..70f5bde 100644 --- a/src/notification_manager/mod.rs +++ b/src/notification_manager/mod.rs @@ -1,8 +1,753 @@ mod nostr_event_cache; mod nostr_event_extensions; pub mod nostr_network_helper; -pub mod notification_manager; pub mod utils; use nostr_event_extensions::{ExtendedEvent, SqlStringConvertible}; -pub use notification_manager::NotificationManager; + +use a2::{Client, ClientConfig, DefaultNotificationBuilder, NotificationBuilder}; +use nostr::key::PublicKey; +use nostr::nips::nip51::MuteList; +use nostr::types::Timestamp; +use nostr_sdk::JsonUtil; +use nostr_sdk::Kind; +use rusqlite::params; +use serde::Deserialize; +use serde::Serialize; +use std::collections::HashSet; +use std::sync::Arc; +use tokio::sync::Mutex; + +use nostr::Event; +use nostr_event_extensions::Codable; +use nostr_event_extensions::MaybeConvertibleToMuteList; +use nostr_event_extensions::TimestampedMuteList; +use nostr_network_helper::NostrNetworkHelper; +use r2d2_sqlite::SqliteConnectionManager; +use std::fs::File; +use utils::should_mute_notification_for_mutelist; + +// MARK: - NotificationManager + +pub struct NotificationManager { + db: Arc>>, + apns_topic: String, + apns_client: Mutex, + nostr_network_helper: NostrNetworkHelper, + pub event_saver: EventSaver, +} + +#[derive(Clone)] +pub struct EventSaver { + db: Arc>>, +} + +impl EventSaver { + pub fn new(db: Arc>>) -> Self { + Self { db } + } + + pub async fn save_if_needed( + &self, + event: &nostr::Event, + ) -> Result> { + match event.to_mute_list() { + Some(mute_list) => { + match self + .get_saved_mute_list_for(event.author()) + .await + .ok() + .flatten() + { + Some(saved_timestamped_mute_list) => { + let saved_mute_list_timestamp = saved_timestamped_mute_list.timestamp; + if saved_mute_list_timestamp < event.created_at() { + self.save_mute_list(event.author(), mute_list, event.created_at) + .await?; + } else { + return Ok(false); + } + } + None => { + self.save_mute_list(event.author(), mute_list, event.created_at) + .await?; + } + } + Ok(true) + } + None => Ok(false), + } + } + + // MARK: - Muting preferences + + pub async fn save_mute_list( + &self, + pubkey: PublicKey, + mute_list: MuteList, + created_at: Timestamp, + ) -> Result<(), Box> { + let mute_list_json = mute_list.to_json()?; + let db_mutex_guard = self.db.lock().await; + let connection = db_mutex_guard.get()?; + + connection.execute( + "INSERT OR REPLACE INTO muting_preferences (user_pubkey, mute_list, created_at) VALUES (?, ?, ?)", + params![ + pubkey.to_sql_string(), + mute_list_json, + created_at.to_sql_string() + ], + )?; + + log::debug!("Mute list saved for pubkey {}", pubkey.to_hex()); + log::debug!("Mute list: {:?}", mute_list); + + Ok(()) + } + + pub async fn get_saved_mute_list_for( + &self, + pubkey: PublicKey, + ) -> Result, Box> { + let db_mutex_guard = self.db.lock().await; + let connection = db_mutex_guard.get()?; + + let mut stmt = connection.prepare( + "SELECT mute_list, created_at FROM muting_preferences WHERE user_pubkey = ?", + )?; + + let mute_list_info: (serde_json::Value, nostr::Timestamp) = match stmt + .query_row([pubkey.to_sql_string()], |row| { + Ok((row.get(0)?, row.get(1)?)) + }) { + Ok(info) => (info.0, nostr::Timestamp::from_sql_string(info.1)?), + Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None), + Err(e) => return Err(e.into()), + }; + + let mute_list = MuteList::from_json(mute_list_info.0)?; + let timestamped_mute_list = TimestampedMuteList { + mute_list, + timestamp: mute_list_info.1, + }; + + Ok(Some(timestamped_mute_list)) + } +} + +impl NotificationManager { + // MARK: - Initialization + + #[allow(clippy::too_many_arguments)] + pub async fn new( + db: r2d2::Pool, + relay_url: String, + apns_private_key_path: String, + apns_private_key_id: String, + apns_team_id: String, + apns_environment: a2::client::Endpoint, + apns_topic: String, + cache_max_age: std::time::Duration, + ) -> Result> { + let connection = db.get()?; + Self::setup_database(&connection)?; + + let mut file = File::open(&apns_private_key_path)?; + + let client = Client::token( + &mut file, + &apns_private_key_id, + &apns_team_id, + ClientConfig::new(apns_environment.clone()), + )?; + + let db = Arc::new(Mutex::new(db)); + let event_saver = EventSaver::new(db.clone()); + + let manager = NotificationManager { + db, + apns_topic, + apns_client: Mutex::new(client), + nostr_network_helper: NostrNetworkHelper::new( + relay_url.clone(), + cache_max_age, + event_saver.clone(), + ) + .await?, + event_saver, + }; + + Ok(manager) + } + + // MARK: - Database setup operations + + pub fn setup_database(db: &rusqlite::Connection) -> Result<(), rusqlite::Error> { + // Initial schema setup + + db.execute( + "CREATE TABLE IF NOT EXISTS notifications ( + id TEXT PRIMARY KEY, + event_id TEXT, + pubkey TEXT, + received_notification BOOLEAN + )", + [], + )?; + + db.execute( + "CREATE INDEX IF NOT EXISTS notification_event_id_index ON notifications (event_id)", + [], + )?; + + db.execute( + "CREATE TABLE IF NOT EXISTS user_info ( + id TEXT PRIMARY KEY, + device_token TEXT, + pubkey TEXT + )", + [], + )?; + + db.execute( + "CREATE INDEX IF NOT EXISTS user_info_pubkey_index ON user_info (pubkey)", + [], + )?; + + Self::add_column_if_not_exists(db, "notifications", "sent_at", "INTEGER", None)?; + Self::add_column_if_not_exists(db, "user_info", "added_at", "INTEGER", None)?; + + // Notification settings migration (https://github.com/damus-io/damus/issues/2360) + + Self::add_column_if_not_exists( + db, + "user_info", + "zap_notifications_enabled", + "BOOLEAN", + Some("true"), + )?; + Self::add_column_if_not_exists( + db, + "user_info", + "mention_notifications_enabled", + "BOOLEAN", + Some("true"), + )?; + Self::add_column_if_not_exists( + db, + "user_info", + "repost_notifications_enabled", + "BOOLEAN", + Some("true"), + )?; + Self::add_column_if_not_exists( + db, + "user_info", + "reaction_notifications_enabled", + "BOOLEAN", + Some("true"), + )?; + Self::add_column_if_not_exists( + db, + "user_info", + "dm_notifications_enabled", + "BOOLEAN", + Some("true"), + )?; + Self::add_column_if_not_exists( + db, + "user_info", + "only_notifications_from_following_enabled", + "BOOLEAN", + Some("false"), + )?; + + // Migration related to mute list improvements (https://github.com/damus-io/damus/issues/2118) + + db.execute( + "CREATE TABLE IF NOT EXISTS muting_preferences ( + user_pubkey TEXT PRIMARY KEY, + mute_list JSON NOT NULL, + created_at TEXT NOT NULL + )", + [], + )?; + + Ok(()) + } + + fn add_column_if_not_exists( + db: &rusqlite::Connection, + table_name: &str, + column_name: &str, + column_type: &str, + default_value: Option<&str>, + ) -> Result<(), rusqlite::Error> { + let query = format!("PRAGMA table_info({})", table_name); + let mut stmt = db.prepare(&query)?; + let column_names: Vec = stmt + .query_map([], |row| row.get(1))? + .filter_map(|r| r.ok()) + .collect(); + + if !column_names.contains(&column_name.to_string()) { + let query = format!( + "ALTER TABLE {} ADD COLUMN {} {} {}", + table_name, + column_name, + column_type, + match default_value { + Some(value) => format!("DEFAULT {}", value), + None => "".to_string(), + }, + ); + db.execute(&query, [])?; + } + Ok(()) + } + + // MARK: - Business logic + + pub async fn send_notifications_if_needed( + &self, + event: &Event, + ) -> Result<(), Box> { + log::debug!( + "Checking if notifications need to be sent for event: {}", + event.id + ); + let one_week_ago = nostr::Timestamp::now() - 7 * 24 * 60 * 60; + if event.created_at < one_week_ago { + log::debug!("Event is older than a week, not sending notifications"); + return Ok(()); + } + + if !Self::is_event_kind_supported(event.kind) { + log::debug!("Event kind is not supported, not sending notifications"); + return Ok(()); + } + + let pubkeys_to_notify = self.pubkeys_to_notify_for_event(event).await?; + + log::debug!( + "Sending notifications to {} pubkeys", + pubkeys_to_notify.len() + ); + + for pubkey in pubkeys_to_notify { + self.send_event_notifications_to_pubkey(event, &pubkey) + .await?; + { + let db_mutex_guard = self.db.lock().await; + db_mutex_guard.get()?.execute( + "INSERT OR REPLACE INTO notifications (id, event_id, pubkey, received_notification, sent_at) + VALUES (?, ?, ?, ?, ?)", + params![ + format!("{}:{}", event.id, pubkey), + event.id.to_sql_string(), + pubkey.to_sql_string(), + true, + nostr::Timestamp::now().to_sql_string(), + ], + )?; + } + } + Ok(()) + } + + fn is_event_kind_supported(event_kind: nostr::Kind) -> bool { + match event_kind { + nostr_sdk::Kind::TextNote => true, + nostr_sdk::Kind::EncryptedDirectMessage => true, + nostr_sdk::Kind::Repost => true, + nostr_sdk::Kind::GenericRepost => true, + nostr_sdk::Kind::Reaction => true, + nostr_sdk::Kind::ZapPrivateMessage => true, + nostr_sdk::Kind::ZapRequest => false, + nostr_sdk::Kind::ZapReceipt => true, + _ => false, + } + } + + async fn pubkeys_to_notify_for_event( + &self, + event: &Event, + ) -> Result, Box> { + let notification_status = self.get_notification_status(event).await?; + let relevant_pubkeys = self.pubkeys_relevant_to_event(event); + let mut relevant_pubkeys_that_are_registered = HashSet::new(); + for pubkey in relevant_pubkeys { + if self.is_pubkey_registered(&pubkey).await? { + relevant_pubkeys_that_are_registered.insert(pubkey); + } + } + let pubkeys_that_received_notification = + notification_status.pubkeys_that_received_notification(); + let relevant_pubkeys_yet_to_receive: HashSet = + relevant_pubkeys_that_are_registered + .difference(&pubkeys_that_received_notification) + .filter(|&x| *x != event.pubkey) + .cloned() + .collect(); + + let mut pubkeys_to_notify = HashSet::new(); + for pubkey in relevant_pubkeys_yet_to_receive { + let should_mute: bool = { + self.should_mute_notification_for_pubkey(event, &pubkey) + .await + }; + if !should_mute { + pubkeys_to_notify.insert(pubkey); + } + } + Ok(pubkeys_to_notify) + } + + async fn should_mute_notification_for_pubkey(&self, event: &Event, pubkey: &PublicKey) -> bool { + let latest_mute_list = self + .get_newest_mute_list_available(pubkey) + .await + .ok() + .flatten(); + if let Some(latest_mute_list) = latest_mute_list { + return should_mute_notification_for_mutelist(event, &latest_mute_list); + } + false + } + + async fn get_newest_mute_list_available( + &self, + pubkey: &PublicKey, + ) -> Result, Box> { + let timestamped_saved_mute_list = self.event_saver.get_saved_mute_list_for(*pubkey).await?; + let timestamped_network_mute_list = + self.nostr_network_helper.get_public_mute_list(pubkey).await; + Ok( + match (timestamped_saved_mute_list, timestamped_network_mute_list) { + (Some(local_mute), Some(network_mute)) => { + if local_mute.timestamp > network_mute.timestamp { + log::debug!("Mute lists available in both database and from the network for pubkey {}. Using local mute list since it's newer.", pubkey.to_hex()); + Some(local_mute.mute_list) + } else { + log::debug!("Mute lists available in both database and from the network for pubkey {}. Using network mute list since it's newer.", pubkey.to_hex()); + Some(network_mute.mute_list) + } + } + (Some(local_mute), None) => { + log::debug!("Mute list available in database for pubkey {}, but not from the network. Using local mute list.", pubkey.to_hex()); + Some(local_mute.mute_list) + } + (None, Some(network_mute)) => { + log::debug!("Mute list for pubkey {} available from the network, but not in the database. Using network mute list.", pubkey.to_hex()); + Some(network_mute.mute_list) + } + (None, None) => { + log::debug!("No mute list available for pubkey {}", pubkey.to_hex()); + None + } + }, + ) + } + + fn pubkeys_relevant_to_event(&self, event: &Event) -> HashSet { + event.relevant_pubkeys() + } + + async fn send_event_notifications_to_pubkey( + &self, + event: &Event, + pubkey: &PublicKey, + ) -> Result<(), Box> { + let user_device_tokens = self.get_user_device_tokens(pubkey).await?; + for device_token in user_device_tokens { + if !self + .user_wants_notification(pubkey, device_token.clone(), event) + .await? + { + continue; + } + self.send_event_notification_to_device_token(event, &device_token) + .await?; + } + Ok(()) + } + + async fn user_wants_notification( + &self, + pubkey: &PublicKey, + device_token: String, + event: &Event, + ) -> Result> { + let notification_preferences = self + .get_user_notification_settings(pubkey, device_token) + .await?; + if notification_preferences.only_notifications_from_following_enabled + && !self + .nostr_network_helper + .does_pubkey_follow_pubkey(pubkey, &event.author()) + .await + { + return Ok(false); + } + match event.kind { + Kind::TextNote => Ok(notification_preferences.mention_notifications_enabled), // TODO: Not 100% accurate + Kind::EncryptedDirectMessage => Ok(notification_preferences.dm_notifications_enabled), + Kind::Repost => Ok(notification_preferences.repost_notifications_enabled), + Kind::GenericRepost => Ok(notification_preferences.repost_notifications_enabled), + Kind::Reaction => Ok(notification_preferences.reaction_notifications_enabled), + Kind::ZapPrivateMessage => Ok(notification_preferences.zap_notifications_enabled), + Kind::ZapRequest => Ok(notification_preferences.zap_notifications_enabled), + Kind::ZapReceipt => Ok(notification_preferences.zap_notifications_enabled), + _ => Ok(false), + } + } + + async fn is_pubkey_token_pair_registered( + &self, + pubkey: &PublicKey, + device_token: &str, + ) -> Result> { + let current_device_tokens = self.get_user_device_tokens(pubkey).await?; + Ok(current_device_tokens.contains(&device_token.to_string())) + } + + async fn is_pubkey_registered( + &self, + pubkey: &PublicKey, + ) -> Result> { + Ok(!self.get_user_device_tokens(pubkey).await?.is_empty()) + } + + async fn get_user_device_tokens( + &self, + pubkey: &PublicKey, + ) -> Result, Box> { + let db_mutex_guard = self.db.lock().await; + let connection = db_mutex_guard.get()?; + let mut stmt = connection.prepare("SELECT device_token FROM user_info WHERE pubkey = ?")?; + let device_tokens = stmt + .query_map([pubkey.to_sql_string()], |row| row.get(0))? + .filter_map(|r| r.ok()) + .collect(); + Ok(device_tokens) + } + + async fn get_notification_status( + &self, + event: &Event, + ) -> Result> { + let db_mutex_guard = self.db.lock().await; + let connection = db_mutex_guard.get()?; + let mut stmt = connection.prepare( + "SELECT pubkey, received_notification FROM notifications WHERE event_id = ?", + )?; + let rows: std::collections::HashMap = stmt + .query_map([event.id.to_sql_string()], |row| { + Ok((row.get(0)?, row.get(1)?)) + })? + .filter_map(|r: Result<(String, bool), rusqlite::Error>| r.ok()) + .filter_map(|r: (String, bool)| { + let pubkey = PublicKey::from_sql_string(r.0).ok()?; + let received_notification = r.1; + Some((pubkey, received_notification)) + }) + .collect(); + + let mut status_info = std::collections::HashMap::new(); + for row in rows { + let (pubkey, received_notification) = row; + status_info.insert(pubkey, received_notification); + } + + Ok(NotificationStatus { status_info }) + } + + async fn send_event_notification_to_device_token( + &self, + event: &Event, + device_token: &str, + ) -> Result<(), Box> { + let (title, subtitle, body) = self.format_notification_message(event); + + log::debug!("Sending notification to device token: {}", device_token); + + let mut payload = DefaultNotificationBuilder::new() + .set_title(&title) + .set_subtitle(&subtitle) + .set_body(&body) + .set_mutable_content() + .set_content_available() + .build(device_token, Default::default()); + + payload.options.apns_topic = Some(self.apns_topic.as_str()); + payload.data.insert( + "nostr_event", + serde_json::Value::String(event.try_as_json()?), + ); + + let apns_client_mutex_guard = self.apns_client.lock().await; + + match apns_client_mutex_guard.send(payload).await { + Ok(_response) => {} + Err(e) => log::error!( + "Failed to send notification to device token '{}': {}", + device_token, + e + ), + } + + log::info!("Notification sent to device token: {}", device_token); + + Ok(()) + } + + fn format_notification_message(&self, event: &Event) -> (String, String, String) { + // NOTE: This is simple because the client will handle formatting. These are just fallbacks. + let (title, body) = match event.kind { + nostr_sdk::Kind::TextNote => ("New activity".to_string(), event.content.clone()), + nostr_sdk::Kind::EncryptedDirectMessage => ( + "New direct message".to_string(), + "Contents are encrypted".to_string(), + ), + nostr_sdk::Kind::Repost => ("Someone reposted".to_string(), event.content.clone()), + nostr_sdk::Kind::Reaction => { + let content_text = event.content.clone(); + let formatted_text = match content_text.as_str() { + "" => "❤️", + "+" => "❤️", + "-" => "👎", + _ => content_text.as_str(), + }; + ("New reaction".to_string(), formatted_text.to_string()) + } + nostr_sdk::Kind::ZapPrivateMessage => ( + "New zap private message".to_string(), + "Contents are encrypted".to_string(), + ), + nostr_sdk::Kind::ZapReceipt => ("Someone zapped you".to_string(), "".to_string()), + _ => ("New activity".to_string(), "".to_string()), + }; + (title, "".to_string(), body) + } + + // MARK: - User device info and settings + + pub async fn save_user_device_info_if_not_present( + &self, + pubkey: nostr::PublicKey, + device_token: &str, + ) -> Result<(), Box> { + if self + .is_pubkey_token_pair_registered(&pubkey, device_token) + .await? + { + return Ok(()); + } + self.save_user_device_info(pubkey, device_token).await + } + + pub async fn save_user_device_info( + &self, + pubkey: nostr::PublicKey, + device_token: &str, + ) -> Result<(), Box> { + let current_time_unix = Timestamp::now(); + let db_mutex_guard = self.db.lock().await; + db_mutex_guard.get()?.execute( + "INSERT OR REPLACE INTO user_info (id, pubkey, device_token, added_at) VALUES (?, ?, ?, ?)", + params![ + format!("{}:{}", pubkey.to_sql_string(), device_token), + pubkey.to_sql_string(), + device_token, + current_time_unix.to_sql_string() + ], + )?; + Ok(()) + } + + pub async fn remove_user_device_info( + &self, + pubkey: nostr::PublicKey, + device_token: &str, + ) -> Result<(), Box> { + let db_mutex_guard = self.db.lock().await; + db_mutex_guard.get()?.execute( + "DELETE FROM user_info WHERE pubkey = ? AND device_token = ?", + params![pubkey.to_sql_string(), device_token], + )?; + Ok(()) + } + + pub async fn get_user_notification_settings( + &self, + pubkey: &PublicKey, + device_token: String, + ) -> Result> { + let db_mutex_guard = self.db.lock().await; + let connection = db_mutex_guard.get()?; + let mut stmt = connection.prepare( + "SELECT zap_notifications_enabled, mention_notifications_enabled, repost_notifications_enabled, reaction_notifications_enabled, dm_notifications_enabled, only_notifications_from_following_enabled FROM user_info WHERE pubkey = ? AND device_token = ?", + )?; + let settings = stmt.query_row([pubkey.to_sql_string(), device_token], |row| { + Ok(UserNotificationSettings { + zap_notifications_enabled: row.get(0)?, + mention_notifications_enabled: row.get(1)?, + repost_notifications_enabled: row.get(2)?, + reaction_notifications_enabled: row.get(3)?, + dm_notifications_enabled: row.get(4)?, + only_notifications_from_following_enabled: row.get(5)?, + }) + })?; + + Ok(settings) + } + + pub async fn save_user_notification_settings( + &self, + pubkey: &PublicKey, + device_token: String, + settings: UserNotificationSettings, + ) -> Result<(), Box> { + let db_mutex_guard = self.db.lock().await; + let connection = db_mutex_guard.get()?; + connection.execute( + "UPDATE user_info SET zap_notifications_enabled = ?, mention_notifications_enabled = ?, repost_notifications_enabled = ?, reaction_notifications_enabled = ?, dm_notifications_enabled = ?, only_notifications_from_following_enabled = ? WHERE pubkey = ? AND device_token = ?", + params![ + settings.zap_notifications_enabled, + settings.mention_notifications_enabled, + settings.repost_notifications_enabled, + settings.reaction_notifications_enabled, + settings.dm_notifications_enabled, + settings.only_notifications_from_following_enabled, + pubkey.to_sql_string(), + device_token, + ], + )?; + Ok(()) + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct UserNotificationSettings { + zap_notifications_enabled: bool, + mention_notifications_enabled: bool, + repost_notifications_enabled: bool, + reaction_notifications_enabled: bool, + dm_notifications_enabled: bool, + only_notifications_from_following_enabled: bool, +} + +struct NotificationStatus { + status_info: std::collections::HashMap, +} + +impl NotificationStatus { + fn pubkeys_that_received_notification(&self) -> HashSet { + self.status_info + .iter() + .filter(|&(_, &received_notification)| received_notification) + .map(|(pubkey, _)| *pubkey) + .collect() + } +} diff --git a/src/notification_manager/nostr_event_cache.rs b/src/notification_manager/nostr_event_cache.rs index c4bc531..f11cabc 100644 --- a/src/notification_manager/nostr_event_cache.rs +++ b/src/notification_manager/nostr_event_cache.rs @@ -1,6 +1,5 @@ use super::nostr_event_extensions::{MaybeConvertibleToRelayList, RelayList, TimestampedMuteList}; use crate::notification_manager::nostr_event_extensions::MaybeConvertibleToTimestampedMuteList; -use log; use nostr_sdk::prelude::*; use std::collections::HashMap; use tokio::time::{Duration, Instant}; @@ -92,8 +91,8 @@ impl Cache { // MARK: - Adding items to the cache - pub fn add_optional_mute_list_with_author<'a>( - &'a mut self, + pub fn add_optional_mute_list_with_author( + &mut self, author: &PublicKey, mute_list: Option<&Event>, ) { @@ -105,8 +104,8 @@ impl Cache { } } - pub fn add_optional_relay_list_with_author<'a>( - &'a mut self, + pub fn add_optional_relay_list_with_author( + &mut self, author: &PublicKey, relay_list_event: Option<&Event>, ) { @@ -118,8 +117,8 @@ impl Cache { } } - pub fn add_optional_contact_list_with_author<'a>( - &'a mut self, + pub fn add_optional_contact_list_with_author( + &mut self, author: &PublicKey, contact_list: Option<&Event>, ) { @@ -135,7 +134,7 @@ impl Cache { match event.kind { Kind::MuteList => { self.mute_lists.insert( - event.pubkey.clone(), + event.pubkey, CacheEntry::maybe(event.to_timestamped_mute_list()), ); log::debug!( @@ -149,17 +148,15 @@ impl Cache { event.id.to_hex() ); self.contact_lists - .insert(event.pubkey.clone(), CacheEntry::new(event.to_owned())); + .insert(event.pubkey, CacheEntry::new(event.to_owned())); } Kind::RelayList => { log::debug!( "Added relay list to the cache. Event ID: {}", event.id.to_hex() ); - self.relay_lists.insert( - event.pubkey.clone(), - CacheEntry::maybe(event.to_relay_list()), - ); + self.relay_lists + .insert(event.pubkey, CacheEntry::maybe(event.to_relay_list())); } _ => { log::debug!( diff --git a/src/notification_manager/nostr_event_extensions.rs b/src/notification_manager/nostr_event_extensions.rs index 9885471..4f45b17 100644 --- a/src/notification_manager/nostr_event_extensions.rs +++ b/src/notification_manager/nostr_event_extensions.rs @@ -35,7 +35,7 @@ impl ExtendedEvent for nostr::Event { /// Retrieves a set of pubkeys relevant to the note fn relevant_pubkeys(&self) -> std::collections::HashSet { let mut pubkeys = self.referenced_pubkeys(); - pubkeys.insert(self.pubkey.clone()); + pubkeys.insert(self.pubkey); pubkeys } @@ -110,21 +110,9 @@ impl MaybeConvertibleToMuteList for nostr::Event { return None; } Some(MuteList { - public_keys: self - .referenced_pubkeys() - .iter() - .map(|pk| pk.clone()) - .collect(), - hashtags: self - .referenced_hashtags() - .iter() - .map(|tag| tag.clone()) - .collect(), - event_ids: self - .referenced_event_ids() - .iter() - .map(|id| id.clone()) - .collect(), + public_keys: self.referenced_pubkeys().iter().copied().collect(), + hashtags: self.referenced_hashtags().iter().cloned().collect(), + event_ids: self.referenced_event_ids().iter().copied().collect(), words: self .get_tags_content(TagKind::Word) .iter() @@ -142,7 +130,7 @@ impl MaybeConvertibleToTimestampedMuteList for nostr::Event { let mute_list = self.to_mute_list()?; Some(TimestampedMuteList { mute_list, - timestamp: self.created_at.clone(), + timestamp: self.created_at, }) } } @@ -158,11 +146,11 @@ impl MaybeConvertibleToRelayList for nostr::Event { if self.kind != Kind::RelayList { return None; } - let extracted_relay_list = nip65::extract_relay_list(&self); + let extracted_relay_list = nip65::extract_relay_list(self); // Convert the extracted relay list data fully into owned data that can be returned let extracted_relay_list_owned = extracted_relay_list .into_iter() - .map(|(url, metadata)| (url.clone(), metadata.as_ref().map(|m| m.clone()))) + .map(|(url, metadata)| (url.clone(), metadata.clone())) .collect(); Some(extracted_relay_list_owned) diff --git a/src/notification_manager/nostr_network_helper.rs b/src/notification_manager/nostr_network_helper.rs index 94534cd..da84cbc 100644 --- a/src/notification_manager/nostr_network_helper.rs +++ b/src/notification_manager/nostr_network_helper.rs @@ -1,6 +1,6 @@ use super::nostr_event_cache::Cache; use super::nostr_event_extensions::{RelayList, TimestampedMuteList}; -use super::notification_manager::EventSaver; +use super::EventSaver; use super::ExtendedEvent; use nostr_sdk::prelude::*; use tokio::sync::Mutex; @@ -22,7 +22,7 @@ impl NostrNetworkHelper { cache_max_age: Duration, event_saver: EventSaver, ) -> Result> { - let client = Client::new(&Keys::generate()); + let client = Client::new(Keys::generate()); client.add_relay(relay_url.clone()).await?; client.connect().await; Ok(NostrNetworkHelper { @@ -133,7 +133,7 @@ impl NostrNetworkHelper { } async fn make_client_for(&self, author: &PublicKey) -> Option { - let client = Client::new(&Keys::generate()); + let client = Client::new(Keys::generate()); let relay_list = self.get_relay_list(author).await?; for (url, metadata) in relay_list { @@ -157,7 +157,7 @@ impl NostrNetworkHelper { ) -> Option { let subscription_filter = Filter::new() .kinds(vec![kind]) - .authors(vec![author.clone()]) + .authors(vec![*author]) .limit(1); let mut notifications = client.notifications(); @@ -168,17 +168,15 @@ impl NostrNetworkHelper { let mut event: Option = None; while let Ok(result) = timeout(NOTE_FETCH_TIMEOUT, notifications.recv()).await { - if let Ok(notification) = result { - if let RelayPoolNotification::Event { - subscription_id, - event: event_option, - .. - } = notification - { - if this_subscription_id == subscription_id && event_option.kind == kind { - event = Some((*event_option).clone()); - break; - } + if let Ok(RelayPoolNotification::Event { + subscription_id, + event: event_option, + .. + }) = result + { + if this_subscription_id == subscription_id && event_option.kind == kind { + event = Some((*event_option).clone()); + break; } } } diff --git a/src/notification_manager/notification_manager.rs b/src/notification_manager/notification_manager.rs deleted file mode 100644 index 8b97af5..0000000 --- a/src/notification_manager/notification_manager.rs +++ /dev/null @@ -1,752 +0,0 @@ -use a2::{Client, ClientConfig, DefaultNotificationBuilder, NotificationBuilder}; -use log; -use nostr::key::PublicKey; -use nostr::nips::nip51::MuteList; -use nostr::types::Timestamp; -use nostr_sdk::JsonUtil; -use nostr_sdk::Kind; -use rusqlite; -use rusqlite::params; -use serde::Deserialize; -use serde::Serialize; -use std::collections::HashSet; -use std::sync::Arc; -use tokio; -use tokio::sync::Mutex; - -use super::nostr_event_extensions::Codable; -use super::nostr_event_extensions::MaybeConvertibleToMuteList; -use super::nostr_event_extensions::TimestampedMuteList; -use super::nostr_network_helper::NostrNetworkHelper; -use super::utils::should_mute_notification_for_mutelist; -use super::ExtendedEvent; -use super::SqlStringConvertible; -use nostr::Event; -use r2d2; -use r2d2_sqlite::SqliteConnectionManager; -use std::fs::File; - -// MARK: - NotificationManager - -pub struct NotificationManager { - db: Arc>>, - apns_topic: String, - apns_client: Mutex, - nostr_network_helper: NostrNetworkHelper, - pub event_saver: EventSaver, -} - -#[derive(Clone)] -pub struct EventSaver { - db: Arc>>, -} - -impl EventSaver { - pub fn new(db: Arc>>) -> Self { - Self { db } - } - - pub async fn save_if_needed( - &self, - event: &nostr::Event, - ) -> Result> { - match event.to_mute_list() { - Some(mute_list) => { - match self - .get_saved_mute_list_for(event.author()) - .await - .ok() - .flatten() - { - Some(saved_timestamped_mute_list) => { - let saved_mute_list_timestamp = saved_timestamped_mute_list.timestamp; - if saved_mute_list_timestamp < event.created_at() { - self.save_mute_list(event.author(), mute_list, event.created_at) - .await?; - } else { - return Ok(false); - } - } - None => { - self.save_mute_list(event.author(), mute_list, event.created_at) - .await?; - } - } - Ok(true) - } - None => Ok(false), - } - } - - // MARK: - Muting preferences - - pub async fn save_mute_list( - &self, - pubkey: PublicKey, - mute_list: MuteList, - created_at: Timestamp, - ) -> Result<(), Box> { - let mute_list_json = mute_list.to_json()?; - let db_mutex_guard = self.db.lock().await; - let connection = db_mutex_guard.get()?; - - connection.execute( - "INSERT OR REPLACE INTO muting_preferences (user_pubkey, mute_list, created_at) VALUES (?, ?, ?)", - params![ - pubkey.to_sql_string(), - mute_list_json, - created_at.to_sql_string() - ], - )?; - - log::debug!("Mute list saved for pubkey {}", pubkey.to_hex()); - log::debug!("Mute list: {:?}", mute_list); - - Ok(()) - } - - pub async fn get_saved_mute_list_for( - &self, - pubkey: PublicKey, - ) -> Result, Box> { - let db_mutex_guard = self.db.lock().await; - let connection = db_mutex_guard.get()?; - - let mut stmt = connection.prepare( - "SELECT mute_list, created_at FROM muting_preferences WHERE user_pubkey = ?", - )?; - - let mute_list_info: (serde_json::Value, nostr::Timestamp) = match stmt - .query_row([pubkey.to_sql_string()], |row| { - Ok((row.get(0)?, row.get(1)?)) - }) { - Ok(info) => (info.0, nostr::Timestamp::from_sql_string(info.1)?), - Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None), - Err(e) => return Err(e.into()), - }; - - let mute_list = MuteList::from_json(mute_list_info.0)?; - let timestamped_mute_list = TimestampedMuteList { - mute_list, - timestamp: mute_list_info.1, - }; - - Ok(Some(timestamped_mute_list)) - } -} - -impl NotificationManager { - // MARK: - Initialization - - pub async fn new( - db: r2d2::Pool, - relay_url: String, - apns_private_key_path: String, - apns_private_key_id: String, - apns_team_id: String, - apns_environment: a2::client::Endpoint, - apns_topic: String, - cache_max_age: std::time::Duration, - ) -> Result> { - let connection = db.get()?; - Self::setup_database(&connection)?; - - let mut file = File::open(&apns_private_key_path)?; - - let client = Client::token( - &mut file, - &apns_private_key_id, - &apns_team_id, - ClientConfig::new(apns_environment.clone()), - )?; - - let db = Arc::new(Mutex::new(db)); - let event_saver = EventSaver::new(db.clone()); - - let manager = NotificationManager { - db, - apns_topic, - apns_client: Mutex::new(client), - nostr_network_helper: NostrNetworkHelper::new( - relay_url.clone(), - cache_max_age, - event_saver.clone(), - ) - .await?, - event_saver, - }; - - Ok(manager) - } - - // MARK: - Database setup operations - - pub fn setup_database(db: &rusqlite::Connection) -> Result<(), rusqlite::Error> { - // Initial schema setup - - db.execute( - "CREATE TABLE IF NOT EXISTS notifications ( - id TEXT PRIMARY KEY, - event_id TEXT, - pubkey TEXT, - received_notification BOOLEAN - )", - [], - )?; - - db.execute( - "CREATE INDEX IF NOT EXISTS notification_event_id_index ON notifications (event_id)", - [], - )?; - - db.execute( - "CREATE TABLE IF NOT EXISTS user_info ( - id TEXT PRIMARY KEY, - device_token TEXT, - pubkey TEXT - )", - [], - )?; - - db.execute( - "CREATE INDEX IF NOT EXISTS user_info_pubkey_index ON user_info (pubkey)", - [], - )?; - - Self::add_column_if_not_exists(&db, "notifications", "sent_at", "INTEGER", None)?; - Self::add_column_if_not_exists(&db, "user_info", "added_at", "INTEGER", None)?; - - // Notification settings migration (https://github.com/damus-io/damus/issues/2360) - - Self::add_column_if_not_exists( - &db, - "user_info", - "zap_notifications_enabled", - "BOOLEAN", - Some("true"), - )?; - Self::add_column_if_not_exists( - &db, - "user_info", - "mention_notifications_enabled", - "BOOLEAN", - Some("true"), - )?; - Self::add_column_if_not_exists( - &db, - "user_info", - "repost_notifications_enabled", - "BOOLEAN", - Some("true"), - )?; - Self::add_column_if_not_exists( - &db, - "user_info", - "reaction_notifications_enabled", - "BOOLEAN", - Some("true"), - )?; - Self::add_column_if_not_exists( - &db, - "user_info", - "dm_notifications_enabled", - "BOOLEAN", - Some("true"), - )?; - Self::add_column_if_not_exists( - &db, - "user_info", - "only_notifications_from_following_enabled", - "BOOLEAN", - Some("false"), - )?; - - // Migration related to mute list improvements (https://github.com/damus-io/damus/issues/2118) - - db.execute( - "CREATE TABLE IF NOT EXISTS muting_preferences ( - user_pubkey TEXT PRIMARY KEY, - mute_list JSON NOT NULL, - created_at TEXT NOT NULL - )", - [], - )?; - - Ok(()) - } - - fn add_column_if_not_exists( - db: &rusqlite::Connection, - table_name: &str, - column_name: &str, - column_type: &str, - default_value: Option<&str>, - ) -> Result<(), rusqlite::Error> { - let query = format!("PRAGMA table_info({})", table_name); - let mut stmt = db.prepare(&query)?; - let column_names: Vec = stmt - .query_map([], |row| row.get(1))? - .filter_map(|r| r.ok()) - .collect(); - - if !column_names.contains(&column_name.to_string()) { - let query = format!( - "ALTER TABLE {} ADD COLUMN {} {} {}", - table_name, - column_name, - column_type, - match default_value { - Some(value) => format!("DEFAULT {}", value), - None => "".to_string(), - }, - ); - db.execute(&query, [])?; - } - Ok(()) - } - - // MARK: - Business logic - - pub async fn send_notifications_if_needed( - &self, - event: &Event, - ) -> Result<(), Box> { - log::debug!( - "Checking if notifications need to be sent for event: {}", - event.id - ); - let one_week_ago = nostr::Timestamp::now() - 7 * 24 * 60 * 60; - if event.created_at < one_week_ago { - log::debug!("Event is older than a week, not sending notifications"); - return Ok(()); - } - - if !Self::is_event_kind_supported(event.kind) { - log::debug!("Event kind is not supported, not sending notifications"); - return Ok(()); - } - - let pubkeys_to_notify = self.pubkeys_to_notify_for_event(event).await?; - - log::debug!( - "Sending notifications to {} pubkeys", - pubkeys_to_notify.len() - ); - - for pubkey in pubkeys_to_notify { - self.send_event_notifications_to_pubkey(event, &pubkey) - .await?; - { - let db_mutex_guard = self.db.lock().await; - db_mutex_guard.get()?.execute( - "INSERT OR REPLACE INTO notifications (id, event_id, pubkey, received_notification, sent_at) - VALUES (?, ?, ?, ?, ?)", - params![ - format!("{}:{}", event.id, pubkey), - event.id.to_sql_string(), - pubkey.to_sql_string(), - true, - nostr::Timestamp::now().to_sql_string(), - ], - )?; - } - } - Ok(()) - } - - fn is_event_kind_supported(event_kind: nostr::Kind) -> bool { - match event_kind { - nostr_sdk::Kind::TextNote => true, - nostr_sdk::Kind::EncryptedDirectMessage => true, - nostr_sdk::Kind::Repost => true, - nostr_sdk::Kind::GenericRepost => true, - nostr_sdk::Kind::Reaction => true, - nostr_sdk::Kind::ZapPrivateMessage => true, - nostr_sdk::Kind::ZapRequest => false, - nostr_sdk::Kind::ZapReceipt => true, - _ => false, - } - } - - async fn pubkeys_to_notify_for_event( - &self, - event: &Event, - ) -> Result, Box> { - let notification_status = self.get_notification_status(event).await?; - let relevant_pubkeys = self.pubkeys_relevant_to_event(event); - let mut relevant_pubkeys_that_are_registered = HashSet::new(); - for pubkey in relevant_pubkeys { - if self.is_pubkey_registered(&pubkey).await? { - relevant_pubkeys_that_are_registered.insert(pubkey); - } - } - let pubkeys_that_received_notification = - notification_status.pubkeys_that_received_notification(); - let relevant_pubkeys_yet_to_receive: HashSet = - relevant_pubkeys_that_are_registered - .difference(&pubkeys_that_received_notification) - .filter(|&x| *x != event.pubkey) - .cloned() - .collect(); - - let mut pubkeys_to_notify = HashSet::new(); - for pubkey in relevant_pubkeys_yet_to_receive { - let should_mute: bool = { - self.should_mute_notification_for_pubkey(event, &pubkey) - .await - }; - if !should_mute { - pubkeys_to_notify.insert(pubkey); - } - } - Ok(pubkeys_to_notify) - } - - async fn should_mute_notification_for_pubkey(&self, event: &Event, pubkey: &PublicKey) -> bool { - let latest_mute_list = self - .get_newest_mute_list_available(pubkey) - .await - .ok() - .flatten(); - if let Some(latest_mute_list) = latest_mute_list { - return should_mute_notification_for_mutelist(event, &latest_mute_list); - } - return false; - } - - async fn get_newest_mute_list_available( - &self, - pubkey: &PublicKey, - ) -> Result, Box> { - let timestamped_saved_mute_list = self.event_saver.get_saved_mute_list_for(*pubkey).await?; - let timestamped_network_mute_list = - self.nostr_network_helper.get_public_mute_list(pubkey).await; - Ok( - match (timestamped_saved_mute_list, timestamped_network_mute_list) { - (Some(local_mute), Some(network_mute)) => { - if local_mute.timestamp > network_mute.timestamp { - log::debug!("Mute lists available in both database and from the network for pubkey {}. Using local mute list since it's newer.", pubkey.to_hex()); - Some(local_mute.mute_list) - } else { - log::debug!("Mute lists available in both database and from the network for pubkey {}. Using network mute list since it's newer.", pubkey.to_hex()); - Some(network_mute.mute_list) - } - } - (Some(local_mute), None) => { - log::debug!("Mute list available in database for pubkey {}, but not from the network. Using local mute list.", pubkey.to_hex()); - Some(local_mute.mute_list) - } - (None, Some(network_mute)) => { - log::debug!("Mute list for pubkey {} available from the network, but not in the database. Using network mute list.", pubkey.to_hex()); - Some(network_mute.mute_list) - } - (None, None) => { - log::debug!("No mute list available for pubkey {}", pubkey.to_hex()); - None - } - }, - ) - } - - fn pubkeys_relevant_to_event(&self, event: &Event) -> HashSet { - event.relevant_pubkeys() - } - - async fn send_event_notifications_to_pubkey( - &self, - event: &Event, - pubkey: &PublicKey, - ) -> Result<(), Box> { - let user_device_tokens = self.get_user_device_tokens(pubkey).await?; - for device_token in user_device_tokens { - if !self - .user_wants_notification(pubkey, device_token.clone(), event) - .await? - { - continue; - } - self.send_event_notification_to_device_token(event, &device_token) - .await?; - } - Ok(()) - } - - async fn user_wants_notification( - &self, - pubkey: &PublicKey, - device_token: String, - event: &Event, - ) -> Result> { - let notification_preferences = self - .get_user_notification_settings(pubkey, device_token) - .await?; - if notification_preferences.only_notifications_from_following_enabled { - if !self - .nostr_network_helper - .does_pubkey_follow_pubkey(pubkey, &event.author()) - .await - { - return Ok(false); - } - } - match event.kind { - Kind::TextNote => Ok(notification_preferences.mention_notifications_enabled), // TODO: Not 100% accurate - Kind::EncryptedDirectMessage => Ok(notification_preferences.dm_notifications_enabled), - Kind::Repost => Ok(notification_preferences.repost_notifications_enabled), - Kind::GenericRepost => Ok(notification_preferences.repost_notifications_enabled), - Kind::Reaction => Ok(notification_preferences.reaction_notifications_enabled), - Kind::ZapPrivateMessage => Ok(notification_preferences.zap_notifications_enabled), - Kind::ZapRequest => Ok(notification_preferences.zap_notifications_enabled), - Kind::ZapReceipt => Ok(notification_preferences.zap_notifications_enabled), - _ => Ok(false), - } - } - - async fn is_pubkey_token_pair_registered( - &self, - pubkey: &PublicKey, - device_token: &str, - ) -> Result> { - let current_device_tokens = self.get_user_device_tokens(pubkey).await?; - Ok(current_device_tokens.contains(&device_token.to_string())) - } - - async fn is_pubkey_registered( - &self, - pubkey: &PublicKey, - ) -> Result> { - Ok(!self.get_user_device_tokens(pubkey).await?.is_empty()) - } - - async fn get_user_device_tokens( - &self, - pubkey: &PublicKey, - ) -> Result, Box> { - let db_mutex_guard = self.db.lock().await; - let connection = db_mutex_guard.get()?; - let mut stmt = connection.prepare("SELECT device_token FROM user_info WHERE pubkey = ?")?; - let device_tokens = stmt - .query_map([pubkey.to_sql_string()], |row| row.get(0))? - .filter_map(|r| r.ok()) - .collect(); - Ok(device_tokens) - } - - async fn get_notification_status( - &self, - event: &Event, - ) -> Result> { - let db_mutex_guard = self.db.lock().await; - let connection = db_mutex_guard.get()?; - let mut stmt = connection.prepare( - "SELECT pubkey, received_notification FROM notifications WHERE event_id = ?", - )?; - let rows: std::collections::HashMap = stmt - .query_map([event.id.to_sql_string()], |row| { - Ok((row.get(0)?, row.get(1)?)) - })? - .filter_map(|r: Result<(String, bool), rusqlite::Error>| r.ok()) - .filter_map(|r: (String, bool)| { - let pubkey = PublicKey::from_sql_string(r.0).ok()?; - let received_notification = r.1; - Some((pubkey, received_notification)) - }) - .collect(); - - let mut status_info = std::collections::HashMap::new(); - for row in rows { - let (pubkey, received_notification) = row; - status_info.insert(pubkey, received_notification); - } - - Ok(NotificationStatus { status_info }) - } - - async fn send_event_notification_to_device_token( - &self, - event: &Event, - device_token: &str, - ) -> Result<(), Box> { - let (title, subtitle, body) = self.format_notification_message(event); - - log::debug!("Sending notification to device token: {}", device_token); - - let mut payload = DefaultNotificationBuilder::new() - .set_title(&title) - .set_subtitle(&subtitle) - .set_body(&body) - .set_mutable_content() - .set_content_available() - .build(device_token, Default::default()); - - payload.options.apns_topic = Some(self.apns_topic.as_str()); - payload.data.insert( - "nostr_event", - serde_json::Value::String(event.try_as_json()?), - ); - - let apns_client_mutex_guard = self.apns_client.lock().await; - - match apns_client_mutex_guard.send(payload).await { - Ok(_response) => {} - Err(e) => log::error!( - "Failed to send notification to device token '{}': {}", - device_token, - e - ), - } - - log::info!("Notification sent to device token: {}", device_token); - - Ok(()) - } - - fn format_notification_message(&self, event: &Event) -> (String, String, String) { - // NOTE: This is simple because the client will handle formatting. These are just fallbacks. - let (title, body) = match event.kind { - nostr_sdk::Kind::TextNote => ("New activity".to_string(), event.content.clone()), - nostr_sdk::Kind::EncryptedDirectMessage => ( - "New direct message".to_string(), - "Contents are encrypted".to_string(), - ), - nostr_sdk::Kind::Repost => ("Someone reposted".to_string(), event.content.clone()), - nostr_sdk::Kind::Reaction => { - let content_text = event.content.clone(); - let formatted_text = match content_text.as_str() { - "" => "❤️", - "+" => "❤️", - "-" => "👎", - _ => content_text.as_str(), - }; - ("New reaction".to_string(), formatted_text.to_string()) - } - nostr_sdk::Kind::ZapPrivateMessage => ( - "New zap private message".to_string(), - "Contents are encrypted".to_string(), - ), - nostr_sdk::Kind::ZapReceipt => ("Someone zapped you".to_string(), "".to_string()), - _ => ("New activity".to_string(), "".to_string()), - }; - (title, "".to_string(), body) - } - - // MARK: - User device info and settings - - pub async fn save_user_device_info_if_not_present( - &self, - pubkey: nostr::PublicKey, - device_token: &str, - ) -> Result<(), Box> { - if self - .is_pubkey_token_pair_registered(&pubkey, &device_token) - .await? - { - return Ok(()); - } - self.save_user_device_info(pubkey, device_token).await - } - - pub async fn save_user_device_info( - &self, - pubkey: nostr::PublicKey, - device_token: &str, - ) -> Result<(), Box> { - let current_time_unix = Timestamp::now(); - let db_mutex_guard = self.db.lock().await; - db_mutex_guard.get()?.execute( - "INSERT OR REPLACE INTO user_info (id, pubkey, device_token, added_at) VALUES (?, ?, ?, ?)", - params![ - format!("{}:{}", pubkey.to_sql_string(), device_token), - pubkey.to_sql_string(), - device_token, - current_time_unix.to_sql_string() - ], - )?; - Ok(()) - } - - pub async fn remove_user_device_info( - &self, - pubkey: nostr::PublicKey, - device_token: &str, - ) -> Result<(), Box> { - let db_mutex_guard = self.db.lock().await; - db_mutex_guard.get()?.execute( - "DELETE FROM user_info WHERE pubkey = ? AND device_token = ?", - params![pubkey.to_sql_string(), device_token], - )?; - Ok(()) - } - - pub async fn get_user_notification_settings( - &self, - pubkey: &PublicKey, - device_token: String, - ) -> Result> { - let db_mutex_guard = self.db.lock().await; - let connection = db_mutex_guard.get()?; - let mut stmt = connection.prepare( - "SELECT zap_notifications_enabled, mention_notifications_enabled, repost_notifications_enabled, reaction_notifications_enabled, dm_notifications_enabled, only_notifications_from_following_enabled FROM user_info WHERE pubkey = ? AND device_token = ?", - )?; - let settings = stmt.query_row([pubkey.to_sql_string(), device_token], |row| { - Ok(UserNotificationSettings { - zap_notifications_enabled: row.get(0)?, - mention_notifications_enabled: row.get(1)?, - repost_notifications_enabled: row.get(2)?, - reaction_notifications_enabled: row.get(3)?, - dm_notifications_enabled: row.get(4)?, - only_notifications_from_following_enabled: row.get(5)?, - }) - })?; - - Ok(settings) - } - - pub async fn save_user_notification_settings( - &self, - pubkey: &PublicKey, - device_token: String, - settings: UserNotificationSettings, - ) -> Result<(), Box> { - let db_mutex_guard = self.db.lock().await; - let connection = db_mutex_guard.get()?; - connection.execute( - "UPDATE user_info SET zap_notifications_enabled = ?, mention_notifications_enabled = ?, repost_notifications_enabled = ?, reaction_notifications_enabled = ?, dm_notifications_enabled = ?, only_notifications_from_following_enabled = ? WHERE pubkey = ? AND device_token = ?", - params![ - settings.zap_notifications_enabled, - settings.mention_notifications_enabled, - settings.repost_notifications_enabled, - settings.reaction_notifications_enabled, - settings.dm_notifications_enabled, - settings.only_notifications_from_following_enabled, - pubkey.to_sql_string(), - device_token, - ], - )?; - Ok(()) - } -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct UserNotificationSettings { - zap_notifications_enabled: bool, - mention_notifications_enabled: bool, - repost_notifications_enabled: bool, - reaction_notifications_enabled: bool, - dm_notifications_enabled: bool, - only_notifications_from_following_enabled: bool, -} - -struct NotificationStatus { - status_info: std::collections::HashMap, -} - -impl NotificationStatus { - fn pubkeys_that_received_notification(&self) -> HashSet { - self.status_info - .iter() - .filter(|&(_, &received_notification)| received_notification) - .map(|(pubkey, _)| pubkey.clone()) - .collect() - } -} diff --git a/src/notification_manager/utils.rs b/src/notification_manager/utils.rs index e52b313..821d823 100644 --- a/src/notification_manager/utils.rs +++ b/src/notification_manager/utils.rs @@ -9,7 +9,7 @@ pub fn should_mute_notification_for_mutelist(event: &Event, mute_list: &MuteList } } for muted_event_id in &mute_list.event_ids { - if event.id == *muted_event_id || event.referenced_event_ids().contains(&muted_event_id) { + if event.id == *muted_event_id || event.referenced_event_ids().contains(muted_event_id) { return true; } } diff --git a/src/relay_connection.rs b/src/relay_connection.rs index 8e64c87..4c6ee6f 100644 --- a/src/relay_connection.rs +++ b/src/relay_connection.rs @@ -4,7 +4,6 @@ use futures::StreamExt; use hyper::upgrade::Upgraded; use hyper_tungstenite::{HyperWebsocket, WebSocketStream}; use hyper_util::rt::TokioIo; -use log; use nostr::util::JsonUtil; use nostr::{ClientMessage, RelayMessage}; use serde_json::Value; @@ -36,7 +35,7 @@ impl RelayConnection { notification_manager: Arc, ) -> Result<(), Box> { let mut connection = RelayConnection::new(notification_manager).await?; - Ok(connection.run_loop(websocket).await?) + connection.run_loop(websocket).await } // MARK: - Connection Runtime management @@ -111,7 +110,7 @@ impl RelayConnection { self.notification_manager .send_notifications_if_needed(&event) .await?; - let notice_message = format!("blocked: This relay does not store events"); + let notice_message = "blocked: This relay does not store events".to_string(); let response = RelayMessage::Ok { event_id: event.id, status: false, @@ -122,7 +121,7 @@ impl RelayConnection { _ => { log::info!("Received unsupported Nostr client message"); log::debug!("Unsupported Nostr client message: {:?}", message); - let notice_message = format!("Unsupported message."); + let notice_message = "Unsupported message.".to_string(); let response = RelayMessage::Notice { message: notice_message, };