From ac91e30db71e7c6f830921d66f87696f6e0c4a78 Mon Sep 17 00:00:00 2001 From: kieran Date: Fri, 16 Aug 2024 22:55:46 +0100 Subject: [PATCH] Basic fetch --- Cargo.lock | 266 ++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/events.rs | 12 ++- src/fetch.rs | 78 +++++++++++++++ src/main.rs | 16 +++ 5 files changed, 371 insertions(+), 2 deletions(-) create mode 100644 src/fetch.rs diff --git a/Cargo.lock b/Cargo.lock index 96459c0..fe1f146 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -104,6 +104,47 @@ dependencies = [ "syn", ] +[[package]] +name = "async-utility" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a349201d80b4aa18d17a34a182bdd7f8ddf845e9e57d2ea130a12e10ef1e3a47" +dependencies = [ + "futures-util", + "gloo-timers", + "tokio", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-wsocket" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3445f8f330db8e5f3be7912f170f32e43fec90d995c71ced1ec3b8394b4a873c" +dependencies = [ + "async-utility", + "futures-util", + "thiserror", + "tokio", + "tokio-rustls", + "tokio-socks", + "tokio-tungstenite", + "url", + "wasm-ws", + "webpki-roots", +] + +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version", +] + [[package]] name = "atomic" version = "0.5.3" @@ -119,6 +160,15 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "atomic-destructor" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d919cb60ba95c87ba42777e9e246c4e8d658057299b437b7512531ce0a09a23" +dependencies = [ + "tracing", +] + [[package]] name = "autocfg" version = "1.3.0" @@ -387,6 +437,12 @@ dependencies = [ "typenum", ] +[[package]] +name = "data-encoding" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" + [[package]] name = "deranged" version = "0.3.11" @@ -547,6 +603,7 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -569,12 +626,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -596,6 +675,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -661,6 +741,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.26" @@ -971,6 +1063,18 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "lnurl-pay" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02c042191c2e3f27147decfad8182eea2c7dd1c6c1733562e25d3d401369669d" +dependencies = [ + "bech32", + "reqwest", + "serde", + "serde_json", +] + [[package]] name = "lock_api" version = "0.4.12" @@ -1122,6 +1226,67 @@ dependencies = [ "tracing", ] +[[package]] +name = "nostr-relay-pool" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afa5502a3df456790ca16d90cc688a677117d57ab56b079dcfa091390ac9f202" +dependencies = [ + "async-utility", + "async-wsocket", + "atomic-destructor", + "nostr", + "nostr-database", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "nostr-sdk" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b427dceefbbb49a9dd98abb8c4e40d25fdd467e99821aaad88615252bdb915bd" +dependencies = [ + "async-utility", + "atomic-destructor", + "lnurl-pay", + "nostr", + "nostr-database", + "nostr-relay-pool", + "nostr-signer", + "nostr-zapper", + "nwc", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "nostr-signer" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "665268b316f41cd8fa791be54b6c7935c5a239461708c380a699d6677be9af38" +dependencies = [ + "async-utility", + "nostr", + "nostr-relay-pool", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "nostr-zapper" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69922e74f8eab1f9d287008c0c06acdec87277a2d8f44bd9d38e003422aea0ab" +dependencies = [ + "async-trait", + "nostr", + "thiserror", +] + [[package]] name = "nostr_services_rs" version = "0.1.0" @@ -1130,6 +1295,7 @@ dependencies = [ "log", "nostr", "nostr-database", + "nostr-sdk", "pretty_env_logger", "rocket", "serde", @@ -1163,6 +1329,20 @@ dependencies = [ "libc", ] +[[package]] +name = "nwc" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb2e04b3edb5e9572e95b62842430625f1718e8a4a3596a30aeb04e6734764ea" +dependencies = [ + "async-utility", + "nostr", + "nostr-relay-pool", + "nostr-zapper", + "thiserror", + "tracing", +] + [[package]] name = "object" version = "0.36.1" @@ -1288,6 +1468,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -1830,6 +2020,12 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" + [[package]] name = "serde" version = "1.0.204" @@ -1883,6 +2079,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.8" @@ -2160,6 +2367,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" +dependencies = [ + "futures-util", + "log", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tungstenite", + "webpki-roots", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -2301,6 +2524,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "rustls", + "rustls-pki-types", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -2381,6 +2624,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "valuable" version = "0.1.0" @@ -2474,6 +2723,23 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +[[package]] +name = "wasm-ws" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c5806d1b06b4f3d90d015e23364dc5d3af412ee64abba6dde8fdc01637e33" +dependencies = [ + "async_io_stream", + "futures", + "js-sys", + "pharos", + "send_wrapper", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.69" diff --git a/Cargo.toml b/Cargo.toml index d6650ba..9bf4295 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] tokio = { version = "1.38.1", features = ["rt", "rt-multi-thread", "macros"] } nostr = "0.33.0" +nostr-sdk = { version = "0.33.0" } rocket = { version = "0.5.1", features = ["json"] } serde = { version = "1.0.204", features = ["derive"] } log = "0.4.22" diff --git a/src/events.rs b/src/events.rs index 61df29b..ff6f80a 100644 --- a/src/events.rs +++ b/src/events.rs @@ -4,6 +4,7 @@ use rocket::{Route, State}; use rocket::http::Status; use rocket::serde::json::Json; +use crate::fetch::FetchQueue; use crate::store::SledDatabase; pub fn routes() -> Vec { @@ -29,8 +30,9 @@ async fn import_event( } #[rocket::get("/event/")] -fn get_event( +async fn get_event( db: &State, + fetch: &State, id: &str, ) -> Option> { let id = match Nip19::from_bech32(id) { @@ -39,7 +41,13 @@ fn get_event( }; match db.event_by_id(&id) { Ok(ev) => Some(Json::from(ev)), - _ => None + _ => { + let mut fetch = fetch.inner().clone(); + match fetch.demand(&id).await.await { + Ok(Some(ev)) => Some(Json::from(ev)), + _ => None + } + } } } diff --git a/src/fetch.rs b/src/fetch.rs new file mode 100644 index 0000000..e78d8c8 --- /dev/null +++ b/src/fetch.rs @@ -0,0 +1,78 @@ +use std::collections::VecDeque; +use std::sync::Arc; +use std::time::Duration; + +use nostr::{Event, Filter, Url}; +use nostr::prelude::Nip19; +use nostr_sdk::{FilterOptions, RelayOptions, RelayPool}; +use tokio::sync::{Mutex, oneshot}; + +struct QueueItem { + pub handler: oneshot::Sender>, + pub request: Nip19, +} + +#[derive(Clone)] +pub struct FetchQueue { + queue: Arc>>, + pool: Arc>, +} + +impl FetchQueue { + pub fn new() -> Self { + Self { + queue: Arc::new(Mutex::new(VecDeque::new())), + pool: Default::default(), + } + } + + pub async fn add_relay(&mut self, relay: Url) -> Result { + let pool_lock = self.pool.lock().await; + pool_lock.add_relay(relay.clone(), RelayOptions::default()).await.unwrap(); + if let Err(e) = pool_lock.connect_relay(relay, None).await { + Err(anyhow::Error::new(e)) + } else { + Ok(true) + } + } + + pub async fn demand(&mut self, ent: &Nip19) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + + let mut q_lock = self.queue.lock().await; + q_lock.push_back(QueueItem { + handler: tx, + request: ent.clone(), + }); + rx + } + + pub async fn process_queue(&self) { + let mut q_lock = self.queue.lock().await; + if let Some(q) = q_lock.pop_front() { + let pool_lock = self.pool.lock().await; + let filters = vec![Self::nip19_to_filter(&q.request).unwrap()]; + //info!("Sending filters: {:?}", filters); + if let Ok(evs) = pool_lock.get_events_of(filters, Duration::from_secs(5), FilterOptions::ExitOnEOSE).await { + if let Some(e) = evs.first() { + q.handler.send(Some(e.clone())).unwrap(); + } else { + q.handler.send(None).unwrap(); + } + } else { + q.handler.send(None).unwrap(); + } + } + } + + fn nip19_to_filter(filter: &Nip19) -> Option { + match filter { + Nip19::Coordinate(c) => Some(Filter::from(c)), + Nip19::Event(e) => Some(Filter::new() + .id(e.event_id) + ), + Nip19::EventId(e) => Some(Filter::new().id(*e)), + _ => None, + } + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index c5ba093..53684da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,17 @@ #[macro_use] extern crate rocket; +use std::time::Duration; + use anyhow::Error; use rocket::shield::Shield; +use crate::fetch::FetchQueue; use crate::store::SledDatabase; mod events; mod store; +mod fetch; #[rocket::main] async fn main() -> Result<(), Error> { @@ -15,8 +19,20 @@ async fn main() -> Result<(), Error> { let db = SledDatabase::new("nostr.db"); + let mut fetch = FetchQueue::new(); + fetch.add_relay("wss://relay.snort.social".parse().unwrap()).await.unwrap(); + + let fetch2 = fetch.clone(); + tokio::spawn(async move { + loop { + fetch2.process_queue().await; + tokio::time::sleep(Duration::from_millis(100)).await; + } + }); + let rocket = rocket::Rocket::build() .manage(db) + .manage(fetch) .attach(Shield::new()) // disable .mount("/", events::routes()) .launch()