Improve relay draft

This commit brings a compilable, runnable relay draft, with logging
setup, basic README instructions, Mutex protection around shared
resources, implemented with code safety in mind

Notes:
- This is not fully tested
- The mutex design is not the most efficient, and could cause some contention on high traffic
- The REST API is not yet integrated
This commit is contained in:
Daniel D’Aquino
2024-07-16 11:00:36 -07:00
parent e11c636ce3
commit c0cded7255
11 changed files with 674 additions and 185 deletions

3
.gitignore vendored
View File

@ -2,8 +2,9 @@ target/
.direnv/
.buildcmd
.build-result
shell.nix
.envrc
tags
.env
.DS_Store
*.p8
apns_notifications.db

393
Cargo.lock generated
View File

@ -82,6 +82,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "allocator-api2"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f"
[[package]]
name = "android-tzdata"
version = "0.1.1"
@ -146,6 +152,67 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "async-trait"
version = "0.1.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-utility"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a349201d80b4aa18d17a34a182bdd7f8ddf845e9e57d2ea130a12e10ef1e3a47"
dependencies = [
"futures-util",
"gloo-timers",
"tokio",
"wasm-bindgen-futures",
]
[[package]]
name = "async-wsocket"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3445f8f330db8e5f3be7912f170f32e43fec90d995c71ced1ec3b8394b4a873c"
dependencies = [
"async-utility",
"futures-util",
"thiserror",
"tokio",
"tokio-rustls 0.26.0",
"tokio-socks",
"tokio-tungstenite",
"url",
"wasm-ws",
"webpki-roots",
]
[[package]]
name = "async_io_stream"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c"
dependencies = [
"futures",
"pharos",
"rustc_version",
]
[[package]]
name = "atomic-destructor"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d919cb60ba95c87ba42777e9e246c4e8d658057299b437b7512531ce0a09a23"
dependencies = [
"tracing",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@ -393,24 +460,6 @@ dependencies = [
"typenum",
]
[[package]]
name = "damus-push-notification-relay"
version = "0.1.0"
dependencies = [
"a2",
"chrono",
"env_logger",
"log",
"nostr",
"rusqlite",
"serde",
"serde_json",
"tokio",
"toml",
"tracing",
"tungstenite",
]
[[package]]
name = "data-encoding"
version = "2.6.0"
@ -428,6 +477,12 @@ dependencies = [
"subtle",
]
[[package]]
name = "dotenv"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]]
name = "either"
version = "1.13.0"
@ -514,6 +569,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
@ -521,6 +591,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@ -529,6 +600,34 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
@ -547,10 +646,16 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
@ -582,6 +687,18 @@ version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
[[package]]
name = "gloo-timers"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "h2"
version = "0.4.5"
@ -608,6 +725,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash",
"allocator-api2",
]
[[package]]
@ -877,6 +995,18 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "lnurl-pay"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02c042191c2e3f27147decfad8182eea2c7dd1c6c1733562e25d3d401369669d"
dependencies = [
"bech32",
"reqwest",
"serde",
"serde_json",
]
[[package]]
name = "lock_api"
version = "0.4.12"
@ -893,6 +1023,15 @@ version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "lru"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc"
dependencies = [
"hashbrown",
]
[[package]]
name = "memchr"
version = "2.7.4"
@ -961,6 +1100,103 @@ dependencies = [
"web-sys",
]
[[package]]
name = "nostr-database"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a88a72f92fbd5d2514db36e07a864646f1c1f44931c4a5ea195f6961029af4b3"
dependencies = [
"async-trait",
"lru",
"nostr",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "nostr-relay-pool"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7b7bf72b02a24ccc7cf87033fa5ddfd57001c7d8c2e757321a7ca7a6df39876"
dependencies = [
"async-utility",
"async-wsocket",
"atomic-destructor",
"nostr",
"nostr-database",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "nostr-sdk"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "005915a59ee6401f23ba510c3a9ac4a07b457f80dfe1dc05cd2c8fdbde439246"
dependencies = [
"async-utility",
"atomic-destructor",
"lnurl-pay",
"nostr",
"nostr-database",
"nostr-relay-pool",
"nostr-signer",
"nostr-zapper",
"nwc",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "nostr-signer"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f99449c2077bef43c02c8f9a9386d01c87e7ad8ece70d7de87a2c59771b4c0fe"
dependencies = [
"async-utility",
"nostr",
"nostr-relay-pool",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "nostr-zapper"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "430c2527c0efd2e7f1a421b0c7df01a03b334a79c60c39cc7a1ca684f720f2bf"
dependencies = [
"async-trait",
"nostr",
"thiserror",
]
[[package]]
name = "notepush"
version = "0.1.0"
dependencies = [
"a2",
"chrono",
"dotenv",
"env_logger",
"log",
"nostr",
"nostr-sdk",
"r2d2",
"r2d2_sqlite",
"rusqlite",
"serde",
"serde_json",
"tokio",
"toml",
"tracing",
"tungstenite",
]
[[package]]
name = "num-traits"
version = "0.2.19"
@ -980,6 +1216,20 @@ dependencies = [
"libc",
]
[[package]]
name = "nwc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6fb91e4be3f6b872fc23c7714bbb500a58a1d59f458eb6eb9dd249fbec42fc2"
dependencies = [
"async-utility",
"nostr",
"nostr-relay-pool",
"nostr-zapper",
"thiserror",
"tracing",
]
[[package]]
name = "object"
version = "0.36.1"
@ -1089,6 +1339,16 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "pharos"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414"
dependencies = [
"futures",
"rustc_version",
]
[[package]]
name = "pin-project"
version = "1.1.5"
@ -1209,6 +1469,28 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot",
"scheduled-thread-pool",
]
[[package]]
name = "r2d2_sqlite"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a982edf65c129796dba72f8775b292ef482b40d035e827a9825b3bc07ccc5f2"
dependencies = [
"r2d2",
"rusqlite",
"uuid",
]
[[package]]
name = "rand"
version = "0.8.5"
@ -1361,6 +1643,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]]
name = "rustls"
version = "0.22.4"
@ -1431,6 +1722,15 @@ dependencies = [
"cipher",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -1470,6 +1770,18 @@ dependencies = [
"cc",
]
[[package]]
name = "semver"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
[[package]]
name = "send_wrapper"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
[[package]]
name = "serde"
version = "1.0.204"
@ -1698,6 +2010,22 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd"
dependencies = [
"futures-util",
"log",
"rustls 0.23.11",
"rustls-pki-types",
"tokio",
"tokio-rustls 0.26.0",
"tungstenite",
"webpki-roots",
]
[[package]]
name = "tokio-util"
version = "0.7.11"
@ -1797,6 +2125,8 @@ dependencies = [
"httparse",
"log",
"rand",
"rustls 0.23.11",
"rustls-pki-types",
"sha1",
"thiserror",
"utf-8",
@ -1869,6 +2199,16 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
dependencies = [
"getrandom",
"rand",
]
[[package]]
name = "vcpkg"
version = "0.2.15"
@ -1962,6 +2302,23 @@ version = "0.2.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96"
[[package]]
name = "wasm-ws"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "688c5806d1b06b4f3d90d015e23364dc5d3af412ee64abba6dde8fdc01637e33"
dependencies = [
"async_io_stream",
"futures",
"js-sys",
"pharos",
"send_wrapper",
"thiserror",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.69"

View File

@ -1,5 +1,5 @@
[package]
name = "damus-push-notification-relay"
name = "notepush"
version = "0.1.0"
edition = "2021"
@ -25,3 +25,7 @@ tungstenite = "0.23.0"
nostr = "0.32.1"
log = "0.4"
env_logger = "0.11.3"
nostr-sdk = "0.32.0"
r2d2_sqlite = "0.24.0"
r2d2 = "0.8.10"
dotenv = "0.15.0"

View File

@ -1,6 +1,53 @@
Notepush
========
# Damus Push notification relay
A high performance Nostr relay for sending out push notifications
A high performance Nostr relay for sending out push notifications using the Apple Push Notification Service (APNS).
🔥WIP! Experimental!⚠️🔥
## Installation
1. Get a build or build it yourself using `cargo build --release`
2. On the working directory from which you start this relay, create an `.env` file with the following contents:
```env
APNS_TOPIC="com.your_org.your_app" # Your app's bundle ID
APNS_AUTH_PRIVATE_KEY_FILE_PATH=./AuthKey_1234567890.p8 # Path to the private key file used to generate JWT tokens with the Apple APNS server. You can obtain this from https://developer.apple.com/account/resources/authkeys/list
APNS_AUTH_PRIVATE_KEY_ID=1234567890 # The ID of the private key used to generate JWT tokens with the Apple APNS server. You can obtain this from https://developer.apple.com/account/resources/authkeys/list
APNS_ENVIRONMENT="development" # The environment to use with the APNS server. Can be "development" or "production"
APPLE_TEAM_ID=1248163264 # The ID of the team. Can be found in AppStore Connect.
DB_PATH=./apns_notifications.db # Path to the SQLite database file that will be used to store data about sent notifications, relative to the working directory
RELAY_URL=ws://localhost:7777 # URL to the relay server which will be consulted to get information such as mute lists.
API_BASE_URL=http://localhost:8000 # Base URL from the API is allowed access (used by the server to perform NIP-98 authentication)
HOST="0.0.0.0" # The host to bind the server to (Defaults to 0.0.0.0 to bind to all interfaces)
PORT=9001 # The port to bind the server to. Defaults to 9001
```
6. Run this relay using the built binary or the `cargo run` command. If you want to change the log level, you can set the `RUST_LOG` environment variable to `DEBUG` or `INFO` before running the relay.
Example:
```sh
$ RUST_LOG=DEBUG cargo run
```
## Contributions
For contribution guidelines, please check [this](https://github.com/damus-io/damus/blob/master/docs/CONTRIBUTING.md) document.
## Development setup
1. Install the Rust toolchain
2. Clone this repository
3. Run `cargo build` to build the project
4. Run `cargo test` to run the tests
5. Run `cargo run` to run the project
## Testing utilities
You can use `test/test-inputs` with a websockets test tool such as `websocat` to play around with the relay. If you have Nix installed, you can run:
```sh
$ nix-shell
[nix-shell] $ websocat ws://localhost:9001
<ENTER_FULL_JSON_PAYLOAD_HERE_AND_PRESS_ENTER>
```

5
shell.nix Normal file
View File

@ -0,0 +1,5 @@
{ pkgs ? import <nixpkgs> {} }:
pkgs.mkShell {
buildInputs = [ pkgs.websocat ];
}

View File

@ -1,42 +1,59 @@
use std::io::Read;
use std::net::TcpListener;
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::Arc;
use tokio::sync::Mutex;
mod notification_manager;
use log;
use env_logger;
use r2d2_sqlite::SqliteConnectionManager;
mod relay_connection;
use relay_connection::RelayConnection;
use r2d2;
mod notepush_env;
use notepush_env::NotePushEnv;
fn main () {
#[tokio::main]
async fn main () {
// MARK: - Setup basics
env_logger::init();
let host = std::env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = std::env::var("PORT").unwrap_or_else(|_| "9001".to_string());
let address = format!("{}:{}", host, port);
let server = TcpListener::bind(&address).unwrap();
let env = NotePushEnv::load_env().expect("Failed to load environment variables");
let server = TcpListener::bind(&env.address()).expect("Failed to bind to address");
let notification_manager = Arc::new(Mutex::new(notification_manager::NotificationManager::new(None, None).unwrap()));
let manager = SqliteConnectionManager::file(env.db_path.clone());
let pool: r2d2::Pool<SqliteConnectionManager> = r2d2::Pool::new(manager).expect("Failed to create SQLite connection pool");
// Notification manager is a shared resource that will be used by all connections via a mutex and an atomic reference counter.
// This is shared to avoid data races when reading/writing to the sqlite database, and reduce outgoing relay connections.
let notification_manager = Arc::new(Mutex::new(notification_manager::NotificationManager::new(
pool,
env.relay_url.clone(),
env.apns_private_key_path.clone(),
env.apns_private_key_id.clone(),
env.apns_team_id.clone(),
env.apns_environment.clone(),
env.apns_topic.clone(),
).await.expect("Failed to create notification manager")));
log::info!("Server listening on {}", address);
log::info!("Server listening on {}", env.address().clone());
// MARK: - Start handling incoming connections
for stream in server.incoming() {
if let Ok(stream) = stream {
log::info!("New connection: {:?}", stream.peer_addr().map_or("unknown".to_string(), |addr| addr.to_string()));
} else if let Err(e) = stream {
log::error!("Error: {:?}", e);
}
thread::spawn (move || {
let peer_address_string = stream.peer_addr().map_or("unknown".to_string(), |addr| addr.to_string());
log::info!("New connection from {}", peer_address_string);
let notification_manager = notification_manager.clone();
let websocket_connection = RelayConnection::new(stream, notification_manager);
match websocket_connection.start() {
Ok(_) => {}
Err(e) => {
log::error!("Error in websocket connection: {:?}", e);
tokio::spawn(async move {
match RelayConnection::run(stream, notification_manager).await {
Ok(_) => {}
Err(e) => {
log::error!("Error with websocket connection from {}: {:?}", peer_address_string, e);
}
}
}
});
});
} else if let Err(e) = stream {
log::error!("Error in incoming connection stream: {:?}", e);
}
}
}

56
src/notepush_env.rs Normal file
View File

@ -0,0 +1,56 @@
use std::env;
use dotenv::dotenv;
use a2;
const DEFAULT_DB_PATH: &str = "./apns_notifications.db";
const DEFAULT_HOST: &str = "0.0.0.0";
const DEFAULT_PORT: &str = "9001";
const DEFAULT_RELAY_URL: &str = "ws://localhost:7777";
pub struct NotePushEnv {
pub apns_private_key_path: String,
pub apns_private_key_id: String,
pub apns_team_id: String,
pub apns_environment: a2::client::Endpoint,
pub apns_topic: String,
pub db_path: String,
pub host: String,
pub port: String,
pub relay_url: String,
}
impl NotePushEnv {
pub fn load_env() -> Result<NotePushEnv, env::VarError> {
dotenv().ok();
let apns_private_key_path = env::var("APNS_AUTH_PRIVATE_KEY_FILE_PATH")?;
let apns_private_key_id = env::var("APNS_AUTH_PRIVATE_KEY_ID")?;
let apns_team_id = env::var("APPLE_TEAM_ID")?;
let db_path = env::var("DB_PATH").unwrap_or(DEFAULT_DB_PATH.to_string());
let host = env::var("HOST").unwrap_or(DEFAULT_HOST.to_string());
let port = env::var("PORT").unwrap_or(DEFAULT_PORT.to_string());
let relay_url = env::var("RELAY_URL").unwrap_or(DEFAULT_RELAY_URL.to_string());
let apns_environment_string = env::var("APNS_ENVIRONMENT").unwrap_or("development".to_string());
let apns_environment = match apns_environment_string.as_str() {
"development" => a2::client::Endpoint::Sandbox,
"production" => a2::client::Endpoint::Production,
_ => a2::client::Endpoint::Sandbox,
};
let apns_topic = env::var("APNS_TOPIC")?;
Ok(NotePushEnv {
apns_private_key_path,
apns_private_key_id,
apns_team_id,
apns_environment,
apns_topic,
db_path,
host,
port,
relay_url,
})
}
pub fn address(&self) -> String {
format!("{}:{}", self.host, self.port)
}
}

View File

@ -1,48 +1,56 @@
use std::collections::HashSet;
use tokio::sync::mpsc;
use nostr_sdk::prelude::*;
use super::ExtendedEvent;
pub struct MuteManager {
relay_url: String,
client: Option<Client>,
client: Client,
}
impl MuteManager {
pub fn new(relay_url: String) -> Self {
let mut manager = MuteManager {
pub async fn new(relay_url: String) -> Result<Self, Box<dyn std::error::Error>> {
let client = Client::new(&Keys::generate());
client.add_relay(relay_url.clone()).await?;
client.connect().await;
Ok(MuteManager {
relay_url,
client: None,
};
tokio::spawn(async move {
let client = Client::new(&Keys::generate());
client.add_relay(manager.relay_url.clone()).await.unwrap();
client.connect().await;
manager.client = Some(client);
});
manager
client
})
}
pub async fn should_mute_notification_for_pubkey(&self, event: &Event, pubkey: &str) -> bool {
pub async fn should_mute_notification_for_pubkey(&self, event: &Event, pubkey: &PublicKey) -> bool {
if let Some(mute_list) = self.get_public_mute_list(pubkey).await {
for tag in mute_list.tags() {
match tag.as_slice() {
["p", muted_pubkey] => {
if event.pubkey == *muted_pubkey {
return true;
match tag.kind() {
TagKind::SingleLetter(SingleLetterTag { character: Alphabet::P, uppercase: false }) => {
let tagged_pubkey: Option<PublicKey> = tag.content().and_then(|h| { PublicKey::from_hex(h).ok() });
if let Some(tagged_pubkey) = tagged_pubkey {
if event.pubkey == tagged_pubkey {
return true
}
}
}
["e", muted_event_id] => {
if event.id == *muted_event_id || event.referenced_event_ids().contains(muted_event_id) {
return true;
TagKind::SingleLetter(SingleLetterTag { character: Alphabet::E, uppercase: false }) => {
let tagged_event_id: Option<EventId> = tag.content().and_then(|h| { EventId::from_hex(h).ok() });
if let Some(tagged_event_id) = tagged_event_id {
if event.id == tagged_event_id || event.referenced_event_ids().contains(&tagged_event_id) {
return true
}
}
}
["t", muted_hashtag] => {
if event.tags.iter().any(|t| t.to_vec().as_slice() == ["t", muted_hashtag]) {
return true;
TagKind::SingleLetter(SingleLetterTag { character: Alphabet::T, uppercase: false }) => {
let tagged_hashtag: Option<String> = tag.content().map(|h| h.to_string());
if let Some(tagged_hashtag) = tagged_hashtag {
let tags_content = event.get_tags_content(TagKind::SingleLetter(SingleLetterTag { character: Alphabet::T, uppercase: false }));
let should_mute = tags_content.iter().any(|t| t == &tagged_hashtag);
return should_mute
}
}
["word", muted_word] => {
if event.content.to_lowercase().contains(&muted_word.to_lowercase()) {
return true;
TagKind::Word => {
let tagged_word: Option<String> = tag.content().map(|h| h.to_string());
if let Some(tagged_word) = tagged_word {
if event.content.to_lowercase().contains(&tagged_word.to_lowercase()) {
return true
}
}
}
_ => {}
@ -52,45 +60,26 @@ impl MuteManager {
false
}
pub async fn get_public_mute_list(&self, pubkey: &str) -> Option<Event> {
if let Some(client) = &self.client {
let (tx, mut rx) = mpsc::channel(100);
pub async fn get_public_mute_list(&self, pubkey: &PublicKey) -> Option<Event> {
let subscription_filter = Filter::new()
.kinds(vec![Kind::MuteList])
.authors(vec![pubkey.clone()])
.limit(1);
let subscription = Filter::new()
.kinds(vec![Kind::MuteList])
.authors(vec![pubkey.to_string()])
.limit(1);
client.subscribe(vec![subscription]).await;
let mut mute_lists = Vec::new();
while let Some(notification) = rx.recv().await {
if let RelayPoolNotification::Event(_url, event) = notification {
mute_lists.push(event);
break;
let this_subscription_id = self.client.subscribe(Vec::from([subscription_filter]), None).await;
let mut mute_list: Option<Event> = None;
let mut notifications = self.client.notifications();
while let Ok(notification) = notifications.recv().await {
if let RelayPoolNotification::Event { subscription_id, event, .. } = notification {
if this_subscription_id == subscription_id && event.kind == Kind::MuteList {
mute_list = Some((*event).clone());
break
}
}
client.unsubscribe().await;
mute_lists.into_iter().next()
} else {
None
}
}
}
trait EventExt {
fn referenced_event_ids(&self) -> HashSet<String>;
}
impl EventExt for Event {
fn referenced_event_ids(&self) -> HashSet<String> {
self.tags
.iter()
.filter(|tag| tag.first() == Some(&"e".to_string()))
.filter_map(|tag| tag.get(1).cloned())
.collect()
self.client.unsubscribe(this_subscription_id).await;
mute_list
}
}

View File

@ -5,53 +5,45 @@ use nostr::types::Timestamp;
use rusqlite;
use rusqlite::params;
use std::collections::HashSet;
use std::env;
use std::fs::File;
use super::mute_manager::MuteManager;
use nostr::Event;
use super::SqlStringConvertible;
use super::ExtendedEvent;
// MARK: - Constants
const DEFAULT_DB_PATH: &str = "./apns_notifications.db";
const DEFAULT_RELAY_URL: &str = "ws://localhost:7777";
use r2d2_sqlite::SqliteConnectionManager;
use r2d2;
// MARK: - NotificationManager
pub struct NotificationManager {
db_path: String,
db: r2d2::Pool<SqliteConnectionManager>,
relay_url: String,
apns_private_key_path: String,
apns_private_key_id: String,
apns_team_id: String,
db: rusqlite::Connection,
apns_environment: a2::client::Endpoint,
apns_topic: String,
mute_manager: MuteManager,
}
impl NotificationManager {
// MARK: - Initialization
pub fn new(db_path: Option<String>, relay_url: Option<String>) -> Result<Self, Box<dyn std::error::Error>> {
let db_path = db_path.unwrap_or(env::var("DB_PATH").unwrap_or(DEFAULT_DB_PATH.to_string()));
let relay_url = relay_url.unwrap_or(env::var("RELAY_URL").unwrap_or(DEFAULT_RELAY_URL.to_string()));
let apns_private_key_path = env::var("APNS_PRIVATE_KEY_PATH")?;
let apns_private_key_id = env::var("APNS_PRIVATE_KEY_ID")?;
let apns_team_id = env::var("APNS_TEAM_ID")?;
pub async fn new(db: r2d2::Pool<SqliteConnectionManager>, relay_url: String, apns_private_key_path: String, apns_private_key_id: String, apns_team_id: String, apns_environment: a2::client::Endpoint, apns_topic: String) -> Result<Self, Box<dyn std::error::Error>> {
let mute_manager = MuteManager::new(relay_url.clone()).await?;
let db = rusqlite::Connection::open(&db_path)?;
let mute_manager = MuteManager::new(relay_url.clone());
Self::setup_database(&db)?;
let connection = db.get()?;
Self::setup_database(&connection)?;
Ok(Self {
db_path,
relay_url,
apns_private_key_path,
apns_private_key_id,
apns_team_id,
apns_environment,
apns_topic,
db,
mute_manager,
})
@ -117,11 +109,11 @@ impl NotificationManager {
return Ok(());
}
let pubkeys_to_notify = self.pubkeys_to_notify_for_event(event)?;
let pubkeys_to_notify = self.pubkeys_to_notify_for_event(event).await?;
for pubkey in pubkeys_to_notify {
self.send_event_notifications_to_pubkey(event, &pubkey).await?;
self.db.execute(
self.db.get()?.execute(
"INSERT OR REPLACE INTO notifications (id, event_id, pubkey, received_notification, sent_at)
VALUES (?, ?, ?, ?, ?)",
params![
@ -136,7 +128,7 @@ impl NotificationManager {
Ok(())
}
fn pubkeys_to_notify_for_event(&self, event: &Event) -> Result<HashSet<nostr::PublicKey>, Box<dyn std::error::Error>> {
async fn pubkeys_to_notify_for_event(&self, event: &Event) -> Result<HashSet<nostr::PublicKey>, Box<dyn std::error::Error>> {
let notification_status = self.get_notification_status(event)?;
let relevant_pubkeys = self.pubkeys_relevant_to_event(event)?;
let pubkeys_that_received_notification = notification_status.pubkeys_that_received_notification();
@ -148,7 +140,8 @@ impl NotificationManager {
let mut pubkeys_to_notify = HashSet::new();
for pubkey in relevant_pubkeys_yet_to_receive {
if !self.mute_manager.should_mute_notification_for_pubkey(event, &pubkey)? {
let should_mute: bool = self.mute_manager.should_mute_notification_for_pubkey(event, &pubkey).await;
if !should_mute {
pubkeys_to_notify.insert(pubkey);
}
}
@ -170,7 +163,8 @@ impl NotificationManager {
}
fn pubkeys_subscribed_to_event_id(&self, event_id: &EventId) -> Result<HashSet<PublicKey>, Box<dyn std::error::Error>> {
let mut stmt = self.db.prepare("SELECT pubkey FROM notifications WHERE event_id = ?")?;
let connection = self.db.get()?;
let mut stmt = connection.prepare("SELECT pubkey FROM notifications WHERE event_id = ?")?;
let pubkeys = stmt.query_map([event_id.to_sql_string()], |row| row.get(0))?
.filter_map(|r| r.ok())
.filter_map(|r: String| PublicKey::from_sql_string(r).ok())
@ -187,7 +181,8 @@ impl NotificationManager {
}
fn get_user_device_tokens(&self, pubkey: &PublicKey) -> Result<Vec<String>, Box<dyn std::error::Error>> {
let mut stmt = self.db.prepare("SELECT device_token FROM user_info WHERE pubkey = ?")?;
let connection = self.db.get()?;
let mut stmt = connection.prepare("SELECT device_token FROM user_info WHERE pubkey = ?")?;
let device_tokens = stmt.query_map([pubkey.to_sql_string()], |row| row.get(0))?
.filter_map(|r| r.ok())
.collect();
@ -195,7 +190,8 @@ impl NotificationManager {
}
fn get_notification_status(&self, event: &Event) -> Result<NotificationStatus, Box<dyn std::error::Error>> {
let mut stmt = self.db.prepare("SELECT pubkey, received_notification FROM notifications WHERE event_id = ?")?;
let connection = self.db.get()?;
let mut stmt = connection.prepare("SELECT pubkey, received_notification FROM notifications WHERE event_id = ?")?;
let rows: std::collections::HashMap<PublicKey, bool> = stmt.query_map([event.id.to_sql_string()], |row| {
Ok((row.get(0)?, row.get(1)?))
})?
@ -231,6 +227,7 @@ impl NotificationManager {
Default::default()
);
payload.add_custom_data("nostr_event", event);
payload.options.apns_topic = Some(self.apns_topic.as_str());
let mut file = File::open(&self.apns_private_key_path)?;
@ -238,7 +235,8 @@ impl NotificationManager {
&mut file,
&self.apns_private_key_id,
&self.apns_team_id,
ClientConfig::default())?;
ClientConfig::new(self.apns_environment.clone())
)?;
let _response = client.send(payload).await?;
Ok(())
@ -253,7 +251,7 @@ impl NotificationManager {
pub fn save_user_device_info(&self, pubkey: &str, device_token: &str) -> Result<(), Box<dyn std::error::Error>> {
let current_time_unix = Timestamp::now();
self.db.execute(
self.db.get()?.execute(
"INSERT OR REPLACE INTO user_info (id, pubkey, device_token, added_at) VALUES (?, ?, ?, ?)",
params![
format!("{}:{}", pubkey, device_token),
@ -266,7 +264,7 @@ impl NotificationManager {
}
pub fn remove_user_device_info(&self, pubkey: &str, device_token: &str) -> Result<(), Box<dyn std::error::Error>> {
self.db.execute(
self.db.get()?.execute(
"DELETE FROM user_info WHERE pubkey = ? AND device_token = ?",
params![pubkey, device_token],
)?;
@ -287,11 +285,3 @@ impl NotificationStatus {
.collect()
}
}
impl Drop for NotificationManager {
fn drop(&mut self) {
if let Err(e) = self.db.close() {
eprintln!("Error closing database: {:?}", e);
}
}
}

View File

@ -1,62 +1,91 @@
use damus_push_notification_relay::log_describable::LogDescribable;
use nostr::util::JsonUtil;
use nostr::{RelayMessage, ClientMessage};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::sync::Mutex;
use serde_json::Value;
use notification_manager::NotificationManager;
use crate::notification_manager::NotificationManager;
use std::str::FromStr;
use std::net::TcpStream;
use tungstenite::accept;
use tungstenite::{accept, WebSocket};
use log;
use std::fmt;
use std::fmt::{self, Debug};
const MAX_CONSECUTIVE_ERRORS: u32 = 10;
pub struct RelayConnection {
stream: TcpStream,
websocket: WebSocket<TcpStream>,
notification_manager: Arc<Mutex<NotificationManager>>
}
impl RelayConnection {
pub fn new(stream: TcpStream, notification_manager: Arc<Mutex<NotificationManager>>) -> Self {
RelayConnection {
stream,
// MARK: - Initializers
pub fn new(stream: TcpStream, notification_manager: Arc<Mutex<NotificationManager>>) -> Result<Self, Box<dyn std::error::Error>> {
let address = stream.peer_addr()?;
let websocket = accept(stream)?;
log::info!("Accepted connection from {:?}", address);
Ok(RelayConnection {
websocket,
notification_manager
}
})
}
pub fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
let notification_manager = NotificationManager::new(None, None)?;
let mut websocket = accept(self.stream)?;
pub async fn run(stream: TcpStream, notification_manager: Arc<Mutex<NotificationManager>>) -> Result<(), Box<dyn std::error::Error>> {
let mut connection = RelayConnection::new(stream, notification_manager)?;
Ok(connection.run_loop().await?)
}
// MARK: - Connection Runtime management
pub async fn run_loop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let mut consecutive_errors = 0;
log::debug!("Starting run loop for connection with {:?}", self.websocket);
loop {
match self.run_loop_iteration() {
Ok(_) => {}
match self.run_loop_iteration().await {
Ok(_) => {
consecutive_errors = 0;
}
Err(e) => {
log::error!("Error in {}: {:?}", self, e);
log::error!("Error in websocket connection with {:?}: {:?}", self.websocket, e);
consecutive_errors += 1;
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
log::error!("Too many consecutive errors, closing connection with {:?}", self.websocket);
return Err(e);
}
}
}
}
}
fn run_loop_iteration(&self) -> Result<(), Box<dyn std::error::Error>> {
let raw_message = self.stream.read()?;
pub async fn run_loop_iteration<'a>(&'a mut self) -> Result<(), Box<dyn std::error::Error>> {
let websocket = &mut self.websocket;
let raw_message = websocket.read()?;
if raw_message.is_text() {
let message: ClientMessage = ClientMessage::from_value(Value::from_str(raw_msg.to_text()?)?)?;
let response = self.handle_client_message(message)?;
self.stream.send(response.try_as_json()?)?;
let message: ClientMessage = ClientMessage::from_value(Value::from_str(raw_message.to_text()?)?)?;
let response = self.handle_client_message(message).await?;
self.websocket.send(tungstenite::Message::text(response.try_as_json()?))?;
}
Ok(())
}
fn handle_client_message(&self, message: ClientMessage) -> Result<RelayMessage, Box<dyn std::error::Error>> {
// MARK: - Message handling
async fn handle_client_message<'b>(&'b self, message: ClientMessage) -> Result<RelayMessage, Box<dyn std::error::Error>> {
match message {
ClientMessage::Event(event) => {
log::info("Received event: {:?}", event);
self.notification_manager.lock()?.send_notification_if_needed(&event)?;
log::info!("Received event: {:?}", event);
{
// TODO: Reduce resource contention by reducing the scope of the mutex into NotificationManager logic.
let mutex_guard = self.notification_manager.lock().await;
mutex_guard.send_notifications_if_needed(&event).await?;
}; // Only hold the mutex for as little time as possible.
let notice_message = format!("blocked: This relay does not store events");
let response = RelayMessage::Ok { event_id: event.id, status: false, message: notice_message };
Ok(response)
}
_ => {
log::info("Received unsupported message: {:?}", message);
log::info!("Received unsupported message: {:?}", message);
let notice_message = format!("Unsupported message: {:?}", message);
let response = RelayMessage::Notice { message: notice_message };
Ok(response)
@ -65,10 +94,8 @@ impl RelayConnection {
}
}
impl fmt::Debug for RelayConnection {
impl Debug for RelayConnection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let peer_address = self.stream.peer_addr().map_or("unknown".to_string(), |addr| addr.to_string());
let description = format!("relay connection with {}", peer_address);
write!(f, "{}", description)
write!(f, "RelayConnection with websocket: {:?}", self.websocket)
}
}

View File

@ -1,5 +1 @@
{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.1","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}}
{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.1","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}}
{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.1","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}}
{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.2","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}}
{"type": "new","receivedAt":12345,"sourceType":"IP4","sourceInfo": "127.0.0.2","event":{"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}}
["EVENT", {"id": "68421a122cef086512b2c5bd29ca6285ced8bd8e302e347e3c5d90466c860a76","pubkey": "16c21558762108afc34e4ff19e4ed51d9a48f79e0c34531efc423d21ab435e93","created_at": 1720408658,"kind": 1,"tags": [],"content": "hi","sig": "7b76471744ded2b720ca832cdc89e670f6093ce38aeef55a5c6a4e077883d7d80dda1e9051032fb1faa1c3c212c517e93ee42b3ceac8e8e9b04bad46a361de90"}]