mirror of
https://github.com/nostrlabs-io/notepush.git
synced 2025-06-16 11:48:51 +00:00
Initial draft commit
This commit is contained in:
9
.gitignore
vendored
Normal file
9
.gitignore
vendored
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
target/
|
||||||
|
.direnv/
|
||||||
|
.buildcmd
|
||||||
|
.build-result
|
||||||
|
shell.nix
|
||||||
|
.envrc
|
||||||
|
tags
|
||||||
|
.env
|
||||||
|
.DS_Store
|
19
COPYING
Normal file
19
COPYING
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
Copyright 2024 Damus Nostr, Inc.
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
|
this software and associated documentation files (the “Software”), to deal in
|
||||||
|
the Software without restriction, including without limitation the rights to
|
||||||
|
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
|
||||||
|
of the Software, and to permit persons to whom the Software is furnished to do
|
||||||
|
so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
SOFTWARE.
|
2166
Cargo.lock
generated
Normal file
2166
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
27
Cargo.toml
Normal file
27
Cargo.toml
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
[package]
|
||||||
|
name = "damus-push-notification-relay"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = "1.0"
|
||||||
|
toml = "0.5"
|
||||||
|
tracing = "0.1.40"
|
||||||
|
# `bundled` causes us to automatically compile and link in an up to date
|
||||||
|
# version of SQLite for you. This avoids many common build issues, and
|
||||||
|
# avoids depending on the version of SQLite on the users system (or your
|
||||||
|
# system), which may be old or missing. It's the right choice for most
|
||||||
|
# programs that control their own SQLite databases.
|
||||||
|
#
|
||||||
|
# That said, it's not ideal for all scenarios and in particular, generic
|
||||||
|
# libraries built around `rusqlite` should probably not enable it, which
|
||||||
|
# is why it is not a default feature -- it could become hard to disable.
|
||||||
|
rusqlite = { version = "0.31.0", features = ["bundled"] }
|
||||||
|
chrono = { version = "0.4.38" }
|
||||||
|
a2 = { version = "0.10.0" }
|
||||||
|
tokio = { version = "1.38.0", features = ["full"] }
|
||||||
|
tungstenite = "0.23.0"
|
||||||
|
nostr = "0.32.1"
|
||||||
|
log = "0.4"
|
||||||
|
env_logger = "0.11.3"
|
6
README.md
Normal file
6
README.md
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
|
||||||
|
# Damus Push notification relay
|
||||||
|
|
||||||
|
A high performance Nostr relay for sending out push notifications
|
||||||
|
|
||||||
|
⚠️🔥WIP! Experimental!⚠️🔥
|
1
src/lib.rs
Normal file
1
src/lib.rs
Normal file
@ -0,0 +1 @@
|
|||||||
|
pub mod notification_manager;
|
42
src/main.rs
Normal file
42
src/main.rs
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
use std::io::Read;
|
||||||
|
use std::net::TcpListener;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::thread;
|
||||||
|
mod notification_manager;
|
||||||
|
use log;
|
||||||
|
use env_logger;
|
||||||
|
|
||||||
|
mod relay_connection;
|
||||||
|
use relay_connection::RelayConnection;
|
||||||
|
|
||||||
|
|
||||||
|
fn main () {
|
||||||
|
env_logger::init();
|
||||||
|
|
||||||
|
let host = std::env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string());
|
||||||
|
let port = std::env::var("PORT").unwrap_or_else(|_| "9001".to_string());
|
||||||
|
let address = format!("{}:{}", host, port);
|
||||||
|
let server = TcpListener::bind(&address).unwrap();
|
||||||
|
|
||||||
|
let notification_manager = Arc::new(Mutex::new(notification_manager::NotificationManager::new(None, None).unwrap()));
|
||||||
|
|
||||||
|
log::info!("Server listening on {}", address);
|
||||||
|
|
||||||
|
for stream in server.incoming() {
|
||||||
|
if let Ok(stream) = stream {
|
||||||
|
log::info!("New connection: {:?}", stream.peer_addr().map_or("unknown".to_string(), |addr| addr.to_string()));
|
||||||
|
} else if let Err(e) = stream {
|
||||||
|
log::error!("Error: {:?}", e);
|
||||||
|
}
|
||||||
|
thread::spawn (move || {
|
||||||
|
let notification_manager = notification_manager.clone();
|
||||||
|
let websocket_connection = RelayConnection::new(stream, notification_manager);
|
||||||
|
match websocket_connection.start() {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Error in websocket connection: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
7
src/notification_manager/mod.rs
Normal file
7
src/notification_manager/mod.rs
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
pub mod notification_manager;
|
||||||
|
pub mod mute_manager;
|
||||||
|
mod nostr_event_extensions;
|
||||||
|
|
||||||
|
pub use notification_manager::NotificationManager;
|
||||||
|
pub use mute_manager::MuteManager;
|
||||||
|
use nostr_event_extensions::{ExtendedEvent, SqlStringConvertible};
|
96
src/notification_manager/mute_manager.rs
Normal file
96
src/notification_manager/mute_manager.rs
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
use std::collections::HashSet;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
pub struct MuteManager {
|
||||||
|
relay_url: String,
|
||||||
|
client: Option<Client>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MuteManager {
|
||||||
|
pub fn new(relay_url: String) -> Self {
|
||||||
|
let mut manager = MuteManager {
|
||||||
|
relay_url,
|
||||||
|
client: None,
|
||||||
|
};
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let client = Client::new(&Keys::generate());
|
||||||
|
client.add_relay(manager.relay_url.clone()).await.unwrap();
|
||||||
|
client.connect().await;
|
||||||
|
manager.client = Some(client);
|
||||||
|
});
|
||||||
|
manager
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn should_mute_notification_for_pubkey(&self, event: &Event, pubkey: &str) -> bool {
|
||||||
|
if let Some(mute_list) = self.get_public_mute_list(pubkey).await {
|
||||||
|
for tag in mute_list.tags() {
|
||||||
|
match tag.as_slice() {
|
||||||
|
["p", muted_pubkey] => {
|
||||||
|
if event.pubkey == *muted_pubkey {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
["e", muted_event_id] => {
|
||||||
|
if event.id == *muted_event_id || event.referenced_event_ids().contains(muted_event_id) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
["t", muted_hashtag] => {
|
||||||
|
if event.tags.iter().any(|t| t.to_vec().as_slice() == ["t", muted_hashtag]) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
["word", muted_word] => {
|
||||||
|
if event.content.to_lowercase().contains(&muted_word.to_lowercase()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_public_mute_list(&self, pubkey: &str) -> Option<Event> {
|
||||||
|
if let Some(client) = &self.client {
|
||||||
|
let (tx, mut rx) = mpsc::channel(100);
|
||||||
|
|
||||||
|
let subscription = Filter::new()
|
||||||
|
.kinds(vec![Kind::MuteList])
|
||||||
|
.authors(vec![pubkey.to_string()])
|
||||||
|
.limit(1);
|
||||||
|
|
||||||
|
client.subscribe(vec![subscription]).await;
|
||||||
|
|
||||||
|
let mut mute_lists = Vec::new();
|
||||||
|
|
||||||
|
while let Some(notification) = rx.recv().await {
|
||||||
|
if let RelayPoolNotification::Event(_url, event) = notification {
|
||||||
|
mute_lists.push(event);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
client.unsubscribe().await;
|
||||||
|
|
||||||
|
mute_lists.into_iter().next()
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trait EventExt {
|
||||||
|
fn referenced_event_ids(&self) -> HashSet<String>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventExt for Event {
|
||||||
|
fn referenced_event_ids(&self) -> HashSet<String> {
|
||||||
|
self.tags
|
||||||
|
.iter()
|
||||||
|
.filter(|tag| tag.first() == Some(&"e".to_string()))
|
||||||
|
.filter_map(|tag| tag.get(1).cloned())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
}
|
89
src/notification_manager/nostr_event_extensions.rs
Normal file
89
src/notification_manager/nostr_event_extensions.rs
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
use nostr::{self, key::PublicKey, TagKind::SingleLetter, Alphabet, SingleLetterTag};
|
||||||
|
|
||||||
|
/// Temporary scaffolding of old methods that have not been ported to use native Event methods
|
||||||
|
pub trait ExtendedEvent {
|
||||||
|
/// Checks if the note references a given pubkey
|
||||||
|
fn references_pubkey(&self, pubkey: &PublicKey) -> bool;
|
||||||
|
|
||||||
|
/// Retrieves a set of pubkeys referenced by the note
|
||||||
|
fn referenced_pubkeys(&self) -> std::collections::HashSet<nostr::PublicKey>;
|
||||||
|
|
||||||
|
/// Retrieves a set of pubkeys relevant to the note
|
||||||
|
fn relevant_pubkeys(&self) -> std::collections::HashSet<nostr::PublicKey>;
|
||||||
|
|
||||||
|
/// Retrieves a set of event IDs referenced by the note
|
||||||
|
fn referenced_event_ids(&self) -> std::collections::HashSet<nostr::EventId>;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is a wrapper around the Event type from strfry-policies, which adds some useful methods
|
||||||
|
impl ExtendedEvent for nostr::Event {
|
||||||
|
/// Checks if the note references a given pubkey
|
||||||
|
fn references_pubkey(&self, pubkey: &PublicKey) -> bool {
|
||||||
|
self.referenced_pubkeys().contains(pubkey)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Retrieves a set of pubkeys referenced by the note
|
||||||
|
fn referenced_pubkeys(&self) -> std::collections::HashSet<nostr::PublicKey> {
|
||||||
|
self.get_tags_content(SingleLetter(SingleLetterTag::lowercase(Alphabet::P)))
|
||||||
|
.iter()
|
||||||
|
.filter_map(|tag| {
|
||||||
|
PublicKey::from_hex(tag).ok()
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Retrieves a set of pubkeys relevant to the note
|
||||||
|
fn relevant_pubkeys(&self) -> std::collections::HashSet<nostr::PublicKey> {
|
||||||
|
let mut pubkeys = self.referenced_pubkeys();
|
||||||
|
pubkeys.insert(self.pubkey.clone());
|
||||||
|
pubkeys
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Retrieves a set of event IDs referenced by the note
|
||||||
|
fn referenced_event_ids(&self) -> std::collections::HashSet<nostr::EventId> {
|
||||||
|
self.get_tag_content(SingleLetter(SingleLetterTag::lowercase(Alphabet::E)))
|
||||||
|
.iter()
|
||||||
|
.filter_map(|tag| {
|
||||||
|
nostr::EventId::from_hex(tag).ok()
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - SQL String Convertible
|
||||||
|
|
||||||
|
pub trait SqlStringConvertible {
|
||||||
|
fn to_sql_string(&self) -> String;
|
||||||
|
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error>> where Self: Sized;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SqlStringConvertible for nostr::EventId {
|
||||||
|
fn to_sql_string(&self) -> String {
|
||||||
|
self.to_hex()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
|
nostr::EventId::from_hex(s).map_err(|e| e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SqlStringConvertible for nostr::PublicKey {
|
||||||
|
fn to_sql_string(&self) -> String {
|
||||||
|
self.to_hex()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
|
nostr::PublicKey::from_hex(s).map_err(|e| e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SqlStringConvertible for nostr::Timestamp {
|
||||||
|
fn to_sql_string(&self) -> String {
|
||||||
|
self.as_u64().to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
|
let u64_timestamp: u64 = s.parse()?;
|
||||||
|
Ok(nostr::Timestamp::from(u64_timestamp))
|
||||||
|
}
|
||||||
|
}
|
297
src/notification_manager/notification_manager.rs
Normal file
297
src/notification_manager/notification_manager.rs
Normal file
@ -0,0 +1,297 @@
|
|||||||
|
use a2::{Client, ClientConfig, DefaultNotificationBuilder, NotificationBuilder};
|
||||||
|
use nostr::event::EventId;
|
||||||
|
use nostr::key::PublicKey;
|
||||||
|
use nostr::types::Timestamp;
|
||||||
|
use rusqlite;
|
||||||
|
use rusqlite::params;
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::env;
|
||||||
|
use std::fs::File;
|
||||||
|
use super::mute_manager::MuteManager;
|
||||||
|
use nostr::Event;
|
||||||
|
use super::SqlStringConvertible;
|
||||||
|
use super::ExtendedEvent;
|
||||||
|
|
||||||
|
// MARK: - Constants
|
||||||
|
|
||||||
|
const DEFAULT_DB_PATH: &str = "./apns_notifications.db";
|
||||||
|
const DEFAULT_RELAY_URL: &str = "ws://localhost:7777";
|
||||||
|
|
||||||
|
// MARK: - NotificationManager
|
||||||
|
|
||||||
|
pub struct NotificationManager {
|
||||||
|
db_path: String,
|
||||||
|
relay_url: String,
|
||||||
|
apns_private_key_path: String,
|
||||||
|
apns_private_key_id: String,
|
||||||
|
apns_team_id: String,
|
||||||
|
|
||||||
|
db: rusqlite::Connection,
|
||||||
|
mute_manager: MuteManager,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NotificationManager {
|
||||||
|
|
||||||
|
// MARK: - Initialization
|
||||||
|
|
||||||
|
pub fn new(db_path: Option<String>, relay_url: Option<String>) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
|
let db_path = db_path.unwrap_or(env::var("DB_PATH").unwrap_or(DEFAULT_DB_PATH.to_string()));
|
||||||
|
let relay_url = relay_url.unwrap_or(env::var("RELAY_URL").unwrap_or(DEFAULT_RELAY_URL.to_string()));
|
||||||
|
let apns_private_key_path = env::var("APNS_PRIVATE_KEY_PATH")?;
|
||||||
|
let apns_private_key_id = env::var("APNS_PRIVATE_KEY_ID")?;
|
||||||
|
let apns_team_id = env::var("APNS_TEAM_ID")?;
|
||||||
|
|
||||||
|
let db = rusqlite::Connection::open(&db_path)?;
|
||||||
|
let mute_manager = MuteManager::new(relay_url.clone());
|
||||||
|
|
||||||
|
Self::setup_database(&db)?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
db_path,
|
||||||
|
relay_url,
|
||||||
|
apns_private_key_path,
|
||||||
|
apns_private_key_id,
|
||||||
|
apns_team_id,
|
||||||
|
db,
|
||||||
|
mute_manager,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - Database setup operations
|
||||||
|
|
||||||
|
pub fn setup_database(db: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
|
||||||
|
db.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS notifications (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
event_id TEXT,
|
||||||
|
pubkey TEXT,
|
||||||
|
received_notification BOOLEAN
|
||||||
|
)",
|
||||||
|
[],
|
||||||
|
)?;
|
||||||
|
|
||||||
|
db.execute(
|
||||||
|
"CREATE INDEX IF NOT EXISTS notification_event_id_index ON notifications (event_id)",
|
||||||
|
[],
|
||||||
|
)?;
|
||||||
|
|
||||||
|
db.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS user_info (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
device_token TEXT,
|
||||||
|
pubkey TEXT
|
||||||
|
)",
|
||||||
|
[],
|
||||||
|
)?;
|
||||||
|
|
||||||
|
db.execute(
|
||||||
|
"CREATE INDEX IF NOT EXISTS user_info_pubkey_index ON user_info (pubkey)",
|
||||||
|
[],
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Self::add_column_if_not_exists(&db, "notifications", "sent_at", "INTEGER")?;
|
||||||
|
Self::add_column_if_not_exists(&db, "user_info", "added_at", "INTEGER")?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_column_if_not_exists(db: &rusqlite::Connection, table_name: &str, column_name: &str, column_type: &str) -> Result<(), rusqlite::Error> {
|
||||||
|
let query = format!("PRAGMA table_info({})", table_name);
|
||||||
|
let mut stmt = db.prepare(&query)?;
|
||||||
|
let column_names: Vec<String> = stmt.query_map([], |row| row.get(1))?
|
||||||
|
.filter_map(|r| r.ok())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if !column_names.contains(&column_name.to_string()) {
|
||||||
|
let query = format!("ALTER TABLE {} ADD COLUMN {} {}", table_name, column_name, column_type);
|
||||||
|
db.execute(&query, [])?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - Business logic
|
||||||
|
|
||||||
|
pub async fn send_notifications_if_needed(&self, event: &Event) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let one_week_ago = nostr::Timestamp::now() - 7 * 24 * 60 * 60;
|
||||||
|
if event.created_at < one_week_ago {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let pubkeys_to_notify = self.pubkeys_to_notify_for_event(event)?;
|
||||||
|
|
||||||
|
for pubkey in pubkeys_to_notify {
|
||||||
|
self.send_event_notifications_to_pubkey(event, &pubkey).await?;
|
||||||
|
self.db.execute(
|
||||||
|
"INSERT OR REPLACE INTO notifications (id, event_id, pubkey, received_notification, sent_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?)",
|
||||||
|
params![
|
||||||
|
format!("{}:{}", event.id, pubkey),
|
||||||
|
event.id.to_sql_string(),
|
||||||
|
pubkey.to_sql_string(),
|
||||||
|
true,
|
||||||
|
nostr::Timestamp::now().to_sql_string(),
|
||||||
|
],
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pubkeys_to_notify_for_event(&self, event: &Event) -> Result<HashSet<nostr::PublicKey>, Box<dyn std::error::Error>> {
|
||||||
|
let notification_status = self.get_notification_status(event)?;
|
||||||
|
let relevant_pubkeys = self.pubkeys_relevant_to_event(event)?;
|
||||||
|
let pubkeys_that_received_notification = notification_status.pubkeys_that_received_notification();
|
||||||
|
let relevant_pubkeys_yet_to_receive: HashSet<PublicKey> = relevant_pubkeys
|
||||||
|
.difference(&pubkeys_that_received_notification)
|
||||||
|
.filter(|&x| *x != event.pubkey)
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut pubkeys_to_notify = HashSet::new();
|
||||||
|
for pubkey in relevant_pubkeys_yet_to_receive {
|
||||||
|
if !self.mute_manager.should_mute_notification_for_pubkey(event, &pubkey)? {
|
||||||
|
pubkeys_to_notify.insert(pubkey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(pubkeys_to_notify)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pubkeys_relevant_to_event(&self, event: &Event) -> Result<HashSet<PublicKey>, Box<dyn std::error::Error>> {
|
||||||
|
let mut relevant_pubkeys = event.relevant_pubkeys();
|
||||||
|
let referenced_event_ids = event.referenced_event_ids();
|
||||||
|
for referenced_event_id in referenced_event_ids {
|
||||||
|
let pubkeys_relevant_to_referenced_event = self.pubkeys_subscribed_to_event_id(&referenced_event_id)?;
|
||||||
|
relevant_pubkeys.extend(pubkeys_relevant_to_referenced_event);
|
||||||
|
}
|
||||||
|
Ok(relevant_pubkeys)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pubkeys_subscribed_to_event(&self, event: &Event) -> Result<HashSet<PublicKey>, Box<dyn std::error::Error>> {
|
||||||
|
self.pubkeys_subscribed_to_event_id(&event.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pubkeys_subscribed_to_event_id(&self, event_id: &EventId) -> Result<HashSet<PublicKey>, Box<dyn std::error::Error>> {
|
||||||
|
let mut stmt = self.db.prepare("SELECT pubkey FROM notifications WHERE event_id = ?")?;
|
||||||
|
let pubkeys = stmt.query_map([event_id.to_sql_string()], |row| row.get(0))?
|
||||||
|
.filter_map(|r| r.ok())
|
||||||
|
.filter_map(|r: String| PublicKey::from_sql_string(r).ok())
|
||||||
|
.collect();
|
||||||
|
Ok(pubkeys)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_event_notifications_to_pubkey(&self, event: &Event, pubkey: &PublicKey) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let user_device_tokens = self.get_user_device_tokens(pubkey)?;
|
||||||
|
for device_token in user_device_tokens {
|
||||||
|
self.send_event_notification_to_device_token(event, &device_token).await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_user_device_tokens(&self, pubkey: &PublicKey) -> Result<Vec<String>, Box<dyn std::error::Error>> {
|
||||||
|
let mut stmt = self.db.prepare("SELECT device_token FROM user_info WHERE pubkey = ?")?;
|
||||||
|
let device_tokens = stmt.query_map([pubkey.to_sql_string()], |row| row.get(0))?
|
||||||
|
.filter_map(|r| r.ok())
|
||||||
|
.collect();
|
||||||
|
Ok(device_tokens)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_notification_status(&self, event: &Event) -> Result<NotificationStatus, Box<dyn std::error::Error>> {
|
||||||
|
let mut stmt = self.db.prepare("SELECT pubkey, received_notification FROM notifications WHERE event_id = ?")?;
|
||||||
|
let rows: std::collections::HashMap<PublicKey, bool> = stmt.query_map([event.id.to_sql_string()], |row| {
|
||||||
|
Ok((row.get(0)?, row.get(1)?))
|
||||||
|
})?
|
||||||
|
.filter_map(|r: Result<(String, bool), rusqlite::Error>| r.ok())
|
||||||
|
.filter_map(|r: (String, bool)| {
|
||||||
|
let pubkey = PublicKey::from_sql_string(r.0).ok()?;
|
||||||
|
let received_notification = r.1;
|
||||||
|
Some((pubkey, received_notification))
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut status_info = std::collections::HashMap::new();
|
||||||
|
for row in rows {
|
||||||
|
let (pubkey, received_notification) = row;
|
||||||
|
status_info.insert(pubkey, received_notification);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(NotificationStatus { status_info })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_event_notification_to_device_token(&self, event: &Event, device_token: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let (title, subtitle, body) = self.format_notification_message(event);
|
||||||
|
|
||||||
|
let builder = DefaultNotificationBuilder::new()
|
||||||
|
.set_title(&title)
|
||||||
|
.set_subtitle(&subtitle)
|
||||||
|
.set_body(&body)
|
||||||
|
.set_mutable_content()
|
||||||
|
.set_content_available();
|
||||||
|
|
||||||
|
let mut payload = builder.build(
|
||||||
|
device_token,
|
||||||
|
Default::default()
|
||||||
|
);
|
||||||
|
payload.add_custom_data("nostr_event", event);
|
||||||
|
|
||||||
|
let mut file = File::open(&self.apns_private_key_path)?;
|
||||||
|
|
||||||
|
let client = Client::token(
|
||||||
|
&mut file,
|
||||||
|
&self.apns_private_key_id,
|
||||||
|
&self.apns_team_id,
|
||||||
|
ClientConfig::default())?;
|
||||||
|
|
||||||
|
let _response = client.send(payload).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn format_notification_message(&self, event: &Event) -> (String, String, String) {
|
||||||
|
let title = "New activity".to_string();
|
||||||
|
let subtitle = format!("From: {}", event.pubkey);
|
||||||
|
let body = event.content.clone();
|
||||||
|
(title, subtitle, body)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn save_user_device_info(&self, pubkey: &str, device_token: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let current_time_unix = Timestamp::now();
|
||||||
|
self.db.execute(
|
||||||
|
"INSERT OR REPLACE INTO user_info (id, pubkey, device_token, added_at) VALUES (?, ?, ?, ?)",
|
||||||
|
params![
|
||||||
|
format!("{}:{}", pubkey, device_token),
|
||||||
|
pubkey,
|
||||||
|
device_token,
|
||||||
|
current_time_unix.to_sql_string()
|
||||||
|
],
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_user_device_info(&self, pubkey: &str, device_token: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
self.db.execute(
|
||||||
|
"DELETE FROM user_info WHERE pubkey = ? AND device_token = ?",
|
||||||
|
params![pubkey, device_token],
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct NotificationStatus {
|
||||||
|
status_info: std::collections::HashMap<PublicKey, bool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NotificationStatus {
|
||||||
|
fn pubkeys_that_received_notification(&self) -> HashSet<PublicKey> {
|
||||||
|
self.status_info
|
||||||
|
.iter()
|
||||||
|
.filter(|&(_, &received_notification)| received_notification)
|
||||||
|
.map(|(pubkey, _)| pubkey.clone())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for NotificationManager {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Err(e) = self.db.close() {
|
||||||
|
eprintln!("Error closing database: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
74
src/relay_connection.rs
Normal file
74
src/relay_connection.rs
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
use damus_push_notification_relay::log_describable::LogDescribable;
|
||||||
|
use nostr::util::JsonUtil;
|
||||||
|
use nostr::{RelayMessage, ClientMessage};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use serde_json::Value;
|
||||||
|
use notification_manager::NotificationManager;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use std::net::TcpStream;
|
||||||
|
use tungstenite::accept;
|
||||||
|
use log;
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
pub struct RelayConnection {
|
||||||
|
stream: TcpStream,
|
||||||
|
notification_manager: Arc<Mutex<NotificationManager>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RelayConnection {
|
||||||
|
pub fn new(stream: TcpStream, notification_manager: Arc<Mutex<NotificationManager>>) -> Self {
|
||||||
|
RelayConnection {
|
||||||
|
stream,
|
||||||
|
notification_manager
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let notification_manager = NotificationManager::new(None, None)?;
|
||||||
|
let mut websocket = accept(self.stream)?;
|
||||||
|
loop {
|
||||||
|
match self.run_loop_iteration() {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Error in {}: {:?}", self, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_loop_iteration(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let raw_message = self.stream.read()?;
|
||||||
|
if raw_message.is_text() {
|
||||||
|
let message: ClientMessage = ClientMessage::from_value(Value::from_str(raw_msg.to_text()?)?)?;
|
||||||
|
let response = self.handle_client_message(message)?;
|
||||||
|
self.stream.send(response.try_as_json()?)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_client_message(&self, message: ClientMessage) -> Result<RelayMessage, Box<dyn std::error::Error>> {
|
||||||
|
match message {
|
||||||
|
ClientMessage::Event(event) => {
|
||||||
|
log::info("Received event: {:?}", event);
|
||||||
|
self.notification_manager.lock()?.send_notification_if_needed(&event)?;
|
||||||
|
let notice_message = format!("blocked: This relay does not store events");
|
||||||
|
let response = RelayMessage::Ok { event_id: event.id, status: false, message: notice_message };
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
log::info("Received unsupported message: {:?}", message);
|
||||||
|
let notice_message = format!("Unsupported message: {:?}", message);
|
||||||
|
let response = RelayMessage::Notice { message: notice_message };
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for RelayConnection {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
let peer_address = self.stream.peer_addr().map_or("unknown".to_string(), |addr| addr.to_string());
|
||||||
|
let description = format!("relay connection with {}", peer_address);
|
||||||
|
write!(f, "{}", description)
|
||||||
|
}
|
||||||
|
}
|
5
test/test-inputs
Normal file
5
test/test-inputs
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.1","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}}
|
||||||
|
{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.1","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}}
|
||||||
|
{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.1","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}}
|
||||||
|
{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.2","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}}
|
||||||
|
{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.2","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}}
|
Reference in New Issue
Block a user