From c0cded7255149dc1bba79d85fc7c7e78d310eb59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20D=E2=80=99Aquino?= Date: Tue, 16 Jul 2024 11:00:36 -0700 Subject: [PATCH] Improve relay draft This commit brings a compilable, runnable relay draft, with logging setup, basic README instructions, Mutex protection around shared resources, implemented with code safety in mind Notes: - This is not fully tested - The mutex design is not the most efficient, and could cause some contention on high traffic - The REST API is not yet integrated --- .gitignore | 3 +- Cargo.lock | 393 +++++++++++++++++- Cargo.toml | 6 +- README.md | 53 ++- shell.nix | 5 + src/main.rs | 65 +-- src/notepush_env.rs | 56 +++ src/notification_manager/mute_manager.rs | 117 +++--- .../notification_manager.rs | 70 ++-- src/relay_connection.rs | 85 ++-- test/test-inputs | 6 +- 11 files changed, 674 insertions(+), 185 deletions(-) create mode 100644 shell.nix create mode 100644 src/notepush_env.rs diff --git a/.gitignore b/.gitignore index 23ea83d..d789a71 100644 --- a/.gitignore +++ b/.gitignore @@ -2,8 +2,9 @@ target/ .direnv/ .buildcmd .build-result -shell.nix .envrc tags .env .DS_Store +*.p8 +apns_notifications.db diff --git a/Cargo.lock b/Cargo.lock index 5dc4c35..8c49ba7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,6 +82,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -146,6 +152,67 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "async-trait" +version = "0.1.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "async-utility" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a349201d80b4aa18d17a34a182bdd7f8ddf845e9e57d2ea130a12e10ef1e3a47" +dependencies = [ + "futures-util", + "gloo-timers", + "tokio", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-wsocket" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3445f8f330db8e5f3be7912f170f32e43fec90d995c71ced1ec3b8394b4a873c" +dependencies = [ + "async-utility", + "futures-util", + "thiserror", + "tokio", + "tokio-rustls 0.26.0", + "tokio-socks", + "tokio-tungstenite", + "url", + "wasm-ws", + "webpki-roots", +] + +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version", +] + +[[package]] +name = "atomic-destructor" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d919cb60ba95c87ba42777e9e246c4e8d658057299b437b7512531ce0a09a23" +dependencies = [ + "tracing", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -393,24 +460,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "damus-push-notification-relay" -version = "0.1.0" -dependencies = [ - "a2", - "chrono", - "env_logger", - "log", - "nostr", - "rusqlite", - "serde", - "serde_json", - "tokio", - "toml", - "tracing", - "tungstenite", -] - [[package]] name = "data-encoding" version = "2.6.0" @@ -428,6 +477,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "either" version = "1.13.0" @@ -514,6 +569,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -521,6 +591,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -529,6 +600,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -547,10 +646,16 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -582,6 +687,18 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.4.5" @@ -608,6 +725,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash", + "allocator-api2", ] [[package]] @@ -877,6 +995,18 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "lnurl-pay" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02c042191c2e3f27147decfad8182eea2c7dd1c6c1733562e25d3d401369669d" +dependencies = [ + "bech32", + "reqwest", + "serde", + "serde_json", +] + [[package]] name = "lock_api" version = "0.4.12" @@ -893,6 +1023,15 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lru" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +dependencies = [ + "hashbrown", +] + [[package]] name = "memchr" version = "2.7.4" @@ -961,6 +1100,103 @@ dependencies = [ "web-sys", ] +[[package]] +name = "nostr-database" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a88a72f92fbd5d2514db36e07a864646f1c1f44931c4a5ea195f6961029af4b3" +dependencies = [ + "async-trait", + "lru", + "nostr", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "nostr-relay-pool" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7b7bf72b02a24ccc7cf87033fa5ddfd57001c7d8c2e757321a7ca7a6df39876" +dependencies = [ + "async-utility", + "async-wsocket", + "atomic-destructor", + "nostr", + "nostr-database", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "nostr-sdk" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "005915a59ee6401f23ba510c3a9ac4a07b457f80dfe1dc05cd2c8fdbde439246" +dependencies = [ + "async-utility", + "atomic-destructor", + "lnurl-pay", + "nostr", + "nostr-database", + "nostr-relay-pool", + "nostr-signer", + "nostr-zapper", + "nwc", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "nostr-signer" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f99449c2077bef43c02c8f9a9386d01c87e7ad8ece70d7de87a2c59771b4c0fe" +dependencies = [ + "async-utility", + "nostr", + "nostr-relay-pool", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "nostr-zapper" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "430c2527c0efd2e7f1a421b0c7df01a03b334a79c60c39cc7a1ca684f720f2bf" +dependencies = [ + "async-trait", + "nostr", + "thiserror", +] + +[[package]] +name = "notepush" +version = "0.1.0" +dependencies = [ + "a2", + "chrono", + "dotenv", + "env_logger", + "log", + "nostr", + "nostr-sdk", + "r2d2", + "r2d2_sqlite", + "rusqlite", + "serde", + "serde_json", + "tokio", + "toml", + "tracing", + "tungstenite", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -980,6 +1216,20 @@ dependencies = [ "libc", ] +[[package]] +name = "nwc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6fb91e4be3f6b872fc23c7714bbb500a58a1d59f458eb6eb9dd249fbec42fc2" +dependencies = [ + "async-utility", + "nostr", + "nostr-relay-pool", + "nostr-zapper", + "thiserror", + "tracing", +] + [[package]] name = "object" version = "0.36.1" @@ -1089,6 +1339,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -1209,6 +1469,28 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + +[[package]] +name = "r2d2_sqlite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a982edf65c129796dba72f8775b292ef482b40d035e827a9825b3bc07ccc5f2" +dependencies = [ + "r2d2", + "rusqlite", + "uuid", +] + [[package]] name = "rand" version = "0.8.5" @@ -1361,6 +1643,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustls" version = "0.22.4" @@ -1431,6 +1722,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1470,6 +1770,18 @@ dependencies = [ "cc", ] +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" + [[package]] name = "serde" version = "1.0.204" @@ -1698,6 +2010,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" +dependencies = [ + "futures-util", + "log", + "rustls 0.23.11", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.0", + "tungstenite", + "webpki-roots", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -1797,6 +2125,8 @@ dependencies = [ "httparse", "log", "rand", + "rustls 0.23.11", + "rustls-pki-types", "sha1", "thiserror", "utf-8", @@ -1869,6 +2199,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", + "rand", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -1962,6 +2302,23 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +[[package]] +name = "wasm-ws" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c5806d1b06b4f3d90d015e23364dc5d3af412ee64abba6dde8fdc01637e33" +dependencies = [ + "async_io_stream", + "futures", + "js-sys", + "pharos", + "send_wrapper", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.69" diff --git a/Cargo.toml b/Cargo.toml index 4eac052..ad72e60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "damus-push-notification-relay" +name = "notepush" version = "0.1.0" edition = "2021" @@ -25,3 +25,7 @@ tungstenite = "0.23.0" nostr = "0.32.1" log = "0.4" env_logger = "0.11.3" +nostr-sdk = "0.32.0" +r2d2_sqlite = "0.24.0" +r2d2 = "0.8.10" +dotenv = "0.15.0" diff --git a/README.md b/README.md index 452256c..3e3d73e 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,53 @@ +Notepush +======== -# Damus Push notification relay - -A high performance Nostr relay for sending out push notifications +A high performance Nostr relay for sending out push notifications using the Apple Push Notification Service (APNS). ⚠️🔥WIP! Experimental!⚠️🔥 + +## Installation + +1. Get a build or build it yourself using `cargo build --release` +2. On the working directory from which you start this relay, create an `.env` file with the following contents: + +```env +APNS_TOPIC="com.your_org.your_app" # Your app's bundle ID +APNS_AUTH_PRIVATE_KEY_FILE_PATH=./AuthKey_1234567890.p8 # Path to the private key file used to generate JWT tokens with the Apple APNS server. You can obtain this from https://developer.apple.com/account/resources/authkeys/list +APNS_AUTH_PRIVATE_KEY_ID=1234567890 # The ID of the private key used to generate JWT tokens with the Apple APNS server. You can obtain this from https://developer.apple.com/account/resources/authkeys/list +APNS_ENVIRONMENT="development" # The environment to use with the APNS server. Can be "development" or "production" +APPLE_TEAM_ID=1248163264 # The ID of the team. Can be found in AppStore Connect. +DB_PATH=./apns_notifications.db # Path to the SQLite database file that will be used to store data about sent notifications, relative to the working directory +RELAY_URL=ws://localhost:7777 # URL to the relay server which will be consulted to get information such as mute lists. +API_BASE_URL=http://localhost:8000 # Base URL from the API is allowed access (used by the server to perform NIP-98 authentication) +HOST="0.0.0.0" # The host to bind the server to (Defaults to 0.0.0.0 to bind to all interfaces) +PORT=9001 # The port to bind the server to. Defaults to 9001 +``` + +6. Run this relay using the built binary or the `cargo run` command. If you want to change the log level, you can set the `RUST_LOG` environment variable to `DEBUG` or `INFO` before running the relay. + +Example: +```sh +$ RUST_LOG=DEBUG cargo run +``` + +## Contributions + +For contribution guidelines, please check [this](https://github.com/damus-io/damus/blob/master/docs/CONTRIBUTING.md) document. + +## Development setup + +1. Install the Rust toolchain +2. Clone this repository +3. Run `cargo build` to build the project +4. Run `cargo test` to run the tests +5. Run `cargo run` to run the project + +## Testing utilities + +You can use `test/test-inputs` with a websockets test tool such as `websocat` to play around with the relay. If you have Nix installed, you can run: + +```sh +$ nix-shell +[nix-shell] $ websocat ws://localhost:9001 + +``` diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..56c703b --- /dev/null +++ b/shell.nix @@ -0,0 +1,5 @@ +{ pkgs ? import {} }: + +pkgs.mkShell { + buildInputs = [ pkgs.websocat ]; +} diff --git a/src/main.rs b/src/main.rs index 6f55224..94d34ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,42 +1,59 @@ -use std::io::Read; use std::net::TcpListener; -use std::sync::{Arc, Mutex}; -use std::thread; +use std::sync::Arc; +use tokio::sync::Mutex; mod notification_manager; use log; use env_logger; - +use r2d2_sqlite::SqliteConnectionManager; mod relay_connection; use relay_connection::RelayConnection; +use r2d2; +mod notepush_env; +use notepush_env::NotePushEnv; - -fn main () { +#[tokio::main] +async fn main () { + + // MARK: - Setup basics + env_logger::init(); - let host = std::env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string()); - let port = std::env::var("PORT").unwrap_or_else(|_| "9001".to_string()); - let address = format!("{}:{}", host, port); - let server = TcpListener::bind(&address).unwrap(); + let env = NotePushEnv::load_env().expect("Failed to load environment variables"); + let server = TcpListener::bind(&env.address()).expect("Failed to bind to address"); - let notification_manager = Arc::new(Mutex::new(notification_manager::NotificationManager::new(None, None).unwrap())); + let manager = SqliteConnectionManager::file(env.db_path.clone()); + let pool: r2d2::Pool = r2d2::Pool::new(manager).expect("Failed to create SQLite connection pool"); + // Notification manager is a shared resource that will be used by all connections via a mutex and an atomic reference counter. + // This is shared to avoid data races when reading/writing to the sqlite database, and reduce outgoing relay connections. + let notification_manager = Arc::new(Mutex::new(notification_manager::NotificationManager::new( + pool, + env.relay_url.clone(), + env.apns_private_key_path.clone(), + env.apns_private_key_id.clone(), + env.apns_team_id.clone(), + env.apns_environment.clone(), + env.apns_topic.clone(), + ).await.expect("Failed to create notification manager"))); - log::info!("Server listening on {}", address); + log::info!("Server listening on {}", env.address().clone()); + + // MARK: - Start handling incoming connections for stream in server.incoming() { if let Ok(stream) = stream { - log::info!("New connection: {:?}", stream.peer_addr().map_or("unknown".to_string(), |addr| addr.to_string())); - } else if let Err(e) = stream { - log::error!("Error: {:?}", e); - } - thread::spawn (move || { + let peer_address_string = stream.peer_addr().map_or("unknown".to_string(), |addr| addr.to_string()); + log::info!("New connection from {}", peer_address_string); let notification_manager = notification_manager.clone(); - let websocket_connection = RelayConnection::new(stream, notification_manager); - match websocket_connection.start() { - Ok(_) => {} - Err(e) => { - log::error!("Error in websocket connection: {:?}", e); + tokio::spawn(async move { + match RelayConnection::run(stream, notification_manager).await { + Ok(_) => {} + Err(e) => { + log::error!("Error with websocket connection from {}: {:?}", peer_address_string, e); + } } - } - }); + }); + } else if let Err(e) = stream { + log::error!("Error in incoming connection stream: {:?}", e); + } } } diff --git a/src/notepush_env.rs b/src/notepush_env.rs new file mode 100644 index 0000000..f28972b --- /dev/null +++ b/src/notepush_env.rs @@ -0,0 +1,56 @@ +use std::env; +use dotenv::dotenv; +use a2; + +const DEFAULT_DB_PATH: &str = "./apns_notifications.db"; +const DEFAULT_HOST: &str = "0.0.0.0"; +const DEFAULT_PORT: &str = "9001"; +const DEFAULT_RELAY_URL: &str = "ws://localhost:7777"; + +pub struct NotePushEnv { + pub apns_private_key_path: String, + pub apns_private_key_id: String, + pub apns_team_id: String, + pub apns_environment: a2::client::Endpoint, + pub apns_topic: String, + pub db_path: String, + pub host: String, + pub port: String, + pub relay_url: String, +} + +impl NotePushEnv { + pub fn load_env() -> Result { + dotenv().ok(); + let apns_private_key_path = env::var("APNS_AUTH_PRIVATE_KEY_FILE_PATH")?; + let apns_private_key_id = env::var("APNS_AUTH_PRIVATE_KEY_ID")?; + let apns_team_id = env::var("APPLE_TEAM_ID")?; + let db_path = env::var("DB_PATH").unwrap_or(DEFAULT_DB_PATH.to_string()); + let host = env::var("HOST").unwrap_or(DEFAULT_HOST.to_string()); + let port = env::var("PORT").unwrap_or(DEFAULT_PORT.to_string()); + let relay_url = env::var("RELAY_URL").unwrap_or(DEFAULT_RELAY_URL.to_string()); + let apns_environment_string = env::var("APNS_ENVIRONMENT").unwrap_or("development".to_string()); + let apns_environment = match apns_environment_string.as_str() { + "development" => a2::client::Endpoint::Sandbox, + "production" => a2::client::Endpoint::Production, + _ => a2::client::Endpoint::Sandbox, + }; + let apns_topic = env::var("APNS_TOPIC")?; + + Ok(NotePushEnv { + apns_private_key_path, + apns_private_key_id, + apns_team_id, + apns_environment, + apns_topic, + db_path, + host, + port, + relay_url, + }) + } + + pub fn address(&self) -> String { + format!("{}:{}", self.host, self.port) + } +} diff --git a/src/notification_manager/mute_manager.rs b/src/notification_manager/mute_manager.rs index 518fc67..8ec9e7f 100644 --- a/src/notification_manager/mute_manager.rs +++ b/src/notification_manager/mute_manager.rs @@ -1,48 +1,56 @@ -use std::collections::HashSet; -use tokio::sync::mpsc; +use nostr_sdk::prelude::*; +use super::ExtendedEvent; pub struct MuteManager { relay_url: String, - client: Option, + client: Client, } impl MuteManager { - pub fn new(relay_url: String) -> Self { - let mut manager = MuteManager { + pub async fn new(relay_url: String) -> Result> { + let client = Client::new(&Keys::generate()); + client.add_relay(relay_url.clone()).await?; + client.connect().await; + Ok(MuteManager { relay_url, - client: None, - }; - tokio::spawn(async move { - let client = Client::new(&Keys::generate()); - client.add_relay(manager.relay_url.clone()).await.unwrap(); - client.connect().await; - manager.client = Some(client); - }); - manager + client + }) } - pub async fn should_mute_notification_for_pubkey(&self, event: &Event, pubkey: &str) -> bool { + pub async fn should_mute_notification_for_pubkey(&self, event: &Event, pubkey: &PublicKey) -> bool { if let Some(mute_list) = self.get_public_mute_list(pubkey).await { for tag in mute_list.tags() { - match tag.as_slice() { - ["p", muted_pubkey] => { - if event.pubkey == *muted_pubkey { - return true; + match tag.kind() { + TagKind::SingleLetter(SingleLetterTag { character: Alphabet::P, uppercase: false }) => { + let tagged_pubkey: Option = tag.content().and_then(|h| { PublicKey::from_hex(h).ok() }); + if let Some(tagged_pubkey) = tagged_pubkey { + if event.pubkey == tagged_pubkey { + return true + } } } - ["e", muted_event_id] => { - if event.id == *muted_event_id || event.referenced_event_ids().contains(muted_event_id) { - return true; + TagKind::SingleLetter(SingleLetterTag { character: Alphabet::E, uppercase: false }) => { + let tagged_event_id: Option = tag.content().and_then(|h| { EventId::from_hex(h).ok() }); + if let Some(tagged_event_id) = tagged_event_id { + if event.id == tagged_event_id || event.referenced_event_ids().contains(&tagged_event_id) { + return true + } } } - ["t", muted_hashtag] => { - if event.tags.iter().any(|t| t.to_vec().as_slice() == ["t", muted_hashtag]) { - return true; + TagKind::SingleLetter(SingleLetterTag { character: Alphabet::T, uppercase: false }) => { + let tagged_hashtag: Option = tag.content().map(|h| h.to_string()); + if let Some(tagged_hashtag) = tagged_hashtag { + let tags_content = event.get_tags_content(TagKind::SingleLetter(SingleLetterTag { character: Alphabet::T, uppercase: false })); + let should_mute = tags_content.iter().any(|t| t == &tagged_hashtag); + return should_mute } } - ["word", muted_word] => { - if event.content.to_lowercase().contains(&muted_word.to_lowercase()) { - return true; + TagKind::Word => { + let tagged_word: Option = tag.content().map(|h| h.to_string()); + if let Some(tagged_word) = tagged_word { + if event.content.to_lowercase().contains(&tagged_word.to_lowercase()) { + return true + } } } _ => {} @@ -52,45 +60,26 @@ impl MuteManager { false } - pub async fn get_public_mute_list(&self, pubkey: &str) -> Option { - if let Some(client) = &self.client { - let (tx, mut rx) = mpsc::channel(100); + pub async fn get_public_mute_list(&self, pubkey: &PublicKey) -> Option { + let subscription_filter = Filter::new() + .kinds(vec![Kind::MuteList]) + .authors(vec![pubkey.clone()]) + .limit(1); - let subscription = Filter::new() - .kinds(vec![Kind::MuteList]) - .authors(vec![pubkey.to_string()]) - .limit(1); - - client.subscribe(vec![subscription]).await; - - let mut mute_lists = Vec::new(); - - while let Some(notification) = rx.recv().await { - if let RelayPoolNotification::Event(_url, event) = notification { - mute_lists.push(event); - break; + let this_subscription_id = self.client.subscribe(Vec::from([subscription_filter]), None).await; + + let mut mute_list: Option = None; + let mut notifications = self.client.notifications(); + while let Ok(notification) = notifications.recv().await { + if let RelayPoolNotification::Event { subscription_id, event, .. } = notification { + if this_subscription_id == subscription_id && event.kind == Kind::MuteList { + mute_list = Some((*event).clone()); + break } } - - client.unsubscribe().await; - - mute_lists.into_iter().next() - } else { - None } - } -} - -trait EventExt { - fn referenced_event_ids(&self) -> HashSet; -} - -impl EventExt for Event { - fn referenced_event_ids(&self) -> HashSet { - self.tags - .iter() - .filter(|tag| tag.first() == Some(&"e".to_string())) - .filter_map(|tag| tag.get(1).cloned()) - .collect() + + self.client.unsubscribe(this_subscription_id).await; + mute_list } } diff --git a/src/notification_manager/notification_manager.rs b/src/notification_manager/notification_manager.rs index 8570a71..943158a 100644 --- a/src/notification_manager/notification_manager.rs +++ b/src/notification_manager/notification_manager.rs @@ -5,53 +5,45 @@ use nostr::types::Timestamp; use rusqlite; use rusqlite::params; use std::collections::HashSet; -use std::env; + use std::fs::File; use super::mute_manager::MuteManager; use nostr::Event; use super::SqlStringConvertible; use super::ExtendedEvent; - -// MARK: - Constants - -const DEFAULT_DB_PATH: &str = "./apns_notifications.db"; -const DEFAULT_RELAY_URL: &str = "ws://localhost:7777"; +use r2d2_sqlite::SqliteConnectionManager; +use r2d2; // MARK: - NotificationManager pub struct NotificationManager { - db_path: String, + db: r2d2::Pool, relay_url: String, apns_private_key_path: String, apns_private_key_id: String, apns_team_id: String, - - db: rusqlite::Connection, + apns_environment: a2::client::Endpoint, + apns_topic: String, + mute_manager: MuteManager, } impl NotificationManager { - // MARK: - Initialization - - pub fn new(db_path: Option, relay_url: Option) -> Result> { - let db_path = db_path.unwrap_or(env::var("DB_PATH").unwrap_or(DEFAULT_DB_PATH.to_string())); - let relay_url = relay_url.unwrap_or(env::var("RELAY_URL").unwrap_or(DEFAULT_RELAY_URL.to_string())); - let apns_private_key_path = env::var("APNS_PRIVATE_KEY_PATH")?; - let apns_private_key_id = env::var("APNS_PRIVATE_KEY_ID")?; - let apns_team_id = env::var("APNS_TEAM_ID")?; + + pub async fn new(db: r2d2::Pool, relay_url: String, apns_private_key_path: String, apns_private_key_id: String, apns_team_id: String, apns_environment: a2::client::Endpoint, apns_topic: String) -> Result> { + let mute_manager = MuteManager::new(relay_url.clone()).await?; - let db = rusqlite::Connection::open(&db_path)?; - let mute_manager = MuteManager::new(relay_url.clone()); - - Self::setup_database(&db)?; + let connection = db.get()?; + Self::setup_database(&connection)?; Ok(Self { - db_path, relay_url, apns_private_key_path, apns_private_key_id, apns_team_id, + apns_environment, + apns_topic, db, mute_manager, }) @@ -117,11 +109,11 @@ impl NotificationManager { return Ok(()); } - let pubkeys_to_notify = self.pubkeys_to_notify_for_event(event)?; + let pubkeys_to_notify = self.pubkeys_to_notify_for_event(event).await?; for pubkey in pubkeys_to_notify { self.send_event_notifications_to_pubkey(event, &pubkey).await?; - self.db.execute( + self.db.get()?.execute( "INSERT OR REPLACE INTO notifications (id, event_id, pubkey, received_notification, sent_at) VALUES (?, ?, ?, ?, ?)", params![ @@ -136,7 +128,7 @@ impl NotificationManager { Ok(()) } - fn pubkeys_to_notify_for_event(&self, event: &Event) -> Result, Box> { + async fn pubkeys_to_notify_for_event(&self, event: &Event) -> Result, Box> { let notification_status = self.get_notification_status(event)?; let relevant_pubkeys = self.pubkeys_relevant_to_event(event)?; let pubkeys_that_received_notification = notification_status.pubkeys_that_received_notification(); @@ -148,7 +140,8 @@ impl NotificationManager { let mut pubkeys_to_notify = HashSet::new(); for pubkey in relevant_pubkeys_yet_to_receive { - if !self.mute_manager.should_mute_notification_for_pubkey(event, &pubkey)? { + let should_mute: bool = self.mute_manager.should_mute_notification_for_pubkey(event, &pubkey).await; + if !should_mute { pubkeys_to_notify.insert(pubkey); } } @@ -170,7 +163,8 @@ impl NotificationManager { } fn pubkeys_subscribed_to_event_id(&self, event_id: &EventId) -> Result, Box> { - let mut stmt = self.db.prepare("SELECT pubkey FROM notifications WHERE event_id = ?")?; + let connection = self.db.get()?; + let mut stmt = connection.prepare("SELECT pubkey FROM notifications WHERE event_id = ?")?; let pubkeys = stmt.query_map([event_id.to_sql_string()], |row| row.get(0))? .filter_map(|r| r.ok()) .filter_map(|r: String| PublicKey::from_sql_string(r).ok()) @@ -187,7 +181,8 @@ impl NotificationManager { } fn get_user_device_tokens(&self, pubkey: &PublicKey) -> Result, Box> { - let mut stmt = self.db.prepare("SELECT device_token FROM user_info WHERE pubkey = ?")?; + let connection = self.db.get()?; + let mut stmt = connection.prepare("SELECT device_token FROM user_info WHERE pubkey = ?")?; let device_tokens = stmt.query_map([pubkey.to_sql_string()], |row| row.get(0))? .filter_map(|r| r.ok()) .collect(); @@ -195,7 +190,8 @@ impl NotificationManager { } fn get_notification_status(&self, event: &Event) -> Result> { - let mut stmt = self.db.prepare("SELECT pubkey, received_notification FROM notifications WHERE event_id = ?")?; + let connection = self.db.get()?; + let mut stmt = connection.prepare("SELECT pubkey, received_notification FROM notifications WHERE event_id = ?")?; let rows: std::collections::HashMap = stmt.query_map([event.id.to_sql_string()], |row| { Ok((row.get(0)?, row.get(1)?)) })? @@ -231,6 +227,7 @@ impl NotificationManager { Default::default() ); payload.add_custom_data("nostr_event", event); + payload.options.apns_topic = Some(self.apns_topic.as_str()); let mut file = File::open(&self.apns_private_key_path)?; @@ -238,7 +235,8 @@ impl NotificationManager { &mut file, &self.apns_private_key_id, &self.apns_team_id, - ClientConfig::default())?; + ClientConfig::new(self.apns_environment.clone()) + )?; let _response = client.send(payload).await?; Ok(()) @@ -253,7 +251,7 @@ impl NotificationManager { pub fn save_user_device_info(&self, pubkey: &str, device_token: &str) -> Result<(), Box> { let current_time_unix = Timestamp::now(); - self.db.execute( + self.db.get()?.execute( "INSERT OR REPLACE INTO user_info (id, pubkey, device_token, added_at) VALUES (?, ?, ?, ?)", params![ format!("{}:{}", pubkey, device_token), @@ -266,7 +264,7 @@ impl NotificationManager { } pub fn remove_user_device_info(&self, pubkey: &str, device_token: &str) -> Result<(), Box> { - self.db.execute( + self.db.get()?.execute( "DELETE FROM user_info WHERE pubkey = ? AND device_token = ?", params![pubkey, device_token], )?; @@ -287,11 +285,3 @@ impl NotificationStatus { .collect() } } - -impl Drop for NotificationManager { - fn drop(&mut self) { - if let Err(e) = self.db.close() { - eprintln!("Error closing database: {:?}", e); - } - } -} diff --git a/src/relay_connection.rs b/src/relay_connection.rs index 1f8ccce..9c41fbe 100644 --- a/src/relay_connection.rs +++ b/src/relay_connection.rs @@ -1,62 +1,91 @@ -use damus_push_notification_relay::log_describable::LogDescribable; use nostr::util::JsonUtil; use nostr::{RelayMessage, ClientMessage}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use tokio::sync::Mutex; use serde_json::Value; -use notification_manager::NotificationManager; +use crate::notification_manager::NotificationManager; use std::str::FromStr; use std::net::TcpStream; -use tungstenite::accept; +use tungstenite::{accept, WebSocket}; use log; -use std::fmt; +use std::fmt::{self, Debug}; + +const MAX_CONSECUTIVE_ERRORS: u32 = 10; pub struct RelayConnection { - stream: TcpStream, + websocket: WebSocket, notification_manager: Arc> } impl RelayConnection { - pub fn new(stream: TcpStream, notification_manager: Arc>) -> Self { - RelayConnection { - stream, + + // MARK: - Initializers + + pub fn new(stream: TcpStream, notification_manager: Arc>) -> Result> { + let address = stream.peer_addr()?; + let websocket = accept(stream)?; + log::info!("Accepted connection from {:?}", address); + Ok(RelayConnection { + websocket, notification_manager - } + }) } - pub fn start(&self) -> Result<(), Box> { - let notification_manager = NotificationManager::new(None, None)?; - let mut websocket = accept(self.stream)?; + pub async fn run(stream: TcpStream, notification_manager: Arc>) -> Result<(), Box> { + let mut connection = RelayConnection::new(stream, notification_manager)?; + Ok(connection.run_loop().await?) + } + + // MARK: - Connection Runtime management + + pub async fn run_loop(&mut self) -> Result<(), Box> { + let mut consecutive_errors = 0; + log::debug!("Starting run loop for connection with {:?}", self.websocket); loop { - match self.run_loop_iteration() { - Ok(_) => {} + match self.run_loop_iteration().await { + Ok(_) => { + consecutive_errors = 0; + } Err(e) => { - log::error!("Error in {}: {:?}", self, e); + log::error!("Error in websocket connection with {:?}: {:?}", self.websocket, e); + consecutive_errors += 1; + if consecutive_errors >= MAX_CONSECUTIVE_ERRORS { + log::error!("Too many consecutive errors, closing connection with {:?}", self.websocket); + return Err(e); + } } } } } - fn run_loop_iteration(&self) -> Result<(), Box> { - let raw_message = self.stream.read()?; + pub async fn run_loop_iteration<'a>(&'a mut self) -> Result<(), Box> { + let websocket = &mut self.websocket; + let raw_message = websocket.read()?; if raw_message.is_text() { - let message: ClientMessage = ClientMessage::from_value(Value::from_str(raw_msg.to_text()?)?)?; - let response = self.handle_client_message(message)?; - self.stream.send(response.try_as_json()?)?; + let message: ClientMessage = ClientMessage::from_value(Value::from_str(raw_message.to_text()?)?)?; + let response = self.handle_client_message(message).await?; + self.websocket.send(tungstenite::Message::text(response.try_as_json()?))?; } Ok(()) } - fn handle_client_message(&self, message: ClientMessage) -> Result> { + // MARK: - Message handling + + async fn handle_client_message<'b>(&'b self, message: ClientMessage) -> Result> { match message { ClientMessage::Event(event) => { - log::info("Received event: {:?}", event); - self.notification_manager.lock()?.send_notification_if_needed(&event)?; + log::info!("Received event: {:?}", event); + { + // TODO: Reduce resource contention by reducing the scope of the mutex into NotificationManager logic. + let mutex_guard = self.notification_manager.lock().await; + mutex_guard.send_notifications_if_needed(&event).await?; + }; // Only hold the mutex for as little time as possible. let notice_message = format!("blocked: This relay does not store events"); let response = RelayMessage::Ok { event_id: event.id, status: false, message: notice_message }; Ok(response) } _ => { - log::info("Received unsupported message: {:?}", message); + log::info!("Received unsupported message: {:?}", message); let notice_message = format!("Unsupported message: {:?}", message); let response = RelayMessage::Notice { message: notice_message }; Ok(response) @@ -65,10 +94,8 @@ impl RelayConnection { } } -impl fmt::Debug for RelayConnection { +impl Debug for RelayConnection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let peer_address = self.stream.peer_addr().map_or("unknown".to_string(), |addr| addr.to_string()); - let description = format!("relay connection with {}", peer_address); - write!(f, "{}", description) + write!(f, "RelayConnection with websocket: {:?}", self.websocket) } } diff --git a/test/test-inputs b/test/test-inputs index 16f713a..d8e13aa 100644 --- a/test/test-inputs +++ b/test/test-inputs @@ -1,5 +1 @@ -{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.1","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}} -{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.1","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}} -{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.1","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}} -{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.2","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}} -{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.2","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}} +["EVENT", {"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}]