Basic fetch

This commit is contained in:
kieran 2024-08-16 22:55:46 +01:00
parent 70e0d70d8b
commit ac91e30db7
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
5 changed files with 371 additions and 2 deletions

266
Cargo.lock generated
View File

@ -104,6 +104,47 @@ dependencies = [
"syn",
]
[[package]]
name = "async-utility"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a349201d80b4aa18d17a34a182bdd7f8ddf845e9e57d2ea130a12e10ef1e3a47"
dependencies = [
"futures-util",
"gloo-timers",
"tokio",
"wasm-bindgen-futures",
]
[[package]]
name = "async-wsocket"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3445f8f330db8e5f3be7912f170f32e43fec90d995c71ced1ec3b8394b4a873c"
dependencies = [
"async-utility",
"futures-util",
"thiserror",
"tokio",
"tokio-rustls",
"tokio-socks",
"tokio-tungstenite",
"url",
"wasm-ws",
"webpki-roots",
]
[[package]]
name = "async_io_stream"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c"
dependencies = [
"futures",
"pharos",
"rustc_version",
]
[[package]]
name = "atomic"
version = "0.5.3"
@ -119,6 +160,15 @@ dependencies = [
"bytemuck",
]
[[package]]
name = "atomic-destructor"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d919cb60ba95c87ba42777e9e246c4e8d658057299b437b7512531ce0a09a23"
dependencies = [
"tracing",
]
[[package]]
name = "autocfg"
version = "1.3.0"
@ -387,6 +437,12 @@ dependencies = [
"typenum",
]
[[package]]
name = "data-encoding"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2"
[[package]]
name = "deranged"
version = "0.3.11"
@ -547,6 +603,7 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -569,12 +626,34 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
@ -596,6 +675,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
@ -661,6 +741,18 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "gloo-timers"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "h2"
version = "0.3.26"
@ -971,6 +1063,18 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "lnurl-pay"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02c042191c2e3f27147decfad8182eea2c7dd1c6c1733562e25d3d401369669d"
dependencies = [
"bech32",
"reqwest",
"serde",
"serde_json",
]
[[package]]
name = "lock_api"
version = "0.4.12"
@ -1122,6 +1226,67 @@ dependencies = [
"tracing",
]
[[package]]
name = "nostr-relay-pool"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afa5502a3df456790ca16d90cc688a677117d57ab56b079dcfa091390ac9f202"
dependencies = [
"async-utility",
"async-wsocket",
"atomic-destructor",
"nostr",
"nostr-database",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "nostr-sdk"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b427dceefbbb49a9dd98abb8c4e40d25fdd467e99821aaad88615252bdb915bd"
dependencies = [
"async-utility",
"atomic-destructor",
"lnurl-pay",
"nostr",
"nostr-database",
"nostr-relay-pool",
"nostr-signer",
"nostr-zapper",
"nwc",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "nostr-signer"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "665268b316f41cd8fa791be54b6c7935c5a239461708c380a699d6677be9af38"
dependencies = [
"async-utility",
"nostr",
"nostr-relay-pool",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "nostr-zapper"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69922e74f8eab1f9d287008c0c06acdec87277a2d8f44bd9d38e003422aea0ab"
dependencies = [
"async-trait",
"nostr",
"thiserror",
]
[[package]]
name = "nostr_services_rs"
version = "0.1.0"
@ -1130,6 +1295,7 @@ dependencies = [
"log",
"nostr",
"nostr-database",
"nostr-sdk",
"pretty_env_logger",
"rocket",
"serde",
@ -1163,6 +1329,20 @@ dependencies = [
"libc",
]
[[package]]
name = "nwc"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb2e04b3edb5e9572e95b62842430625f1718e8a4a3596a30aeb04e6734764ea"
dependencies = [
"async-utility",
"nostr",
"nostr-relay-pool",
"nostr-zapper",
"thiserror",
"tracing",
]
[[package]]
name = "object"
version = "0.36.1"
@ -1288,6 +1468,16 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "pharos"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414"
dependencies = [
"futures",
"rustc_version",
]
[[package]]
name = "pin-project"
version = "1.1.5"
@ -1830,6 +2020,12 @@ version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
[[package]]
name = "send_wrapper"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
[[package]]
name = "serde"
version = "1.0.204"
@ -1883,6 +2079,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "sha2"
version = "0.10.8"
@ -2160,6 +2367,22 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd"
dependencies = [
"futures-util",
"log",
"rustls",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tungstenite",
"webpki-roots",
]
[[package]]
name = "tokio-util"
version = "0.7.11"
@ -2301,6 +2524,26 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.1.0",
"httparse",
"log",
"rand",
"rustls",
"rustls-pki-types",
"sha1",
"thiserror",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.17.0"
@ -2381,6 +2624,12 @@ dependencies = [
"serde",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "valuable"
version = "0.1.0"
@ -2474,6 +2723,23 @@ version = "0.2.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96"
[[package]]
name = "wasm-ws"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "688c5806d1b06b4f3d90d015e23364dc5d3af412ee64abba6dde8fdc01637e33"
dependencies = [
"async_io_stream",
"futures",
"js-sys",
"pharos",
"send_wrapper",
"thiserror",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.69"

View File

@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
tokio = { version = "1.38.1", features = ["rt", "rt-multi-thread", "macros"] }
nostr = "0.33.0"
nostr-sdk = { version = "0.33.0" }
rocket = { version = "0.5.1", features = ["json"] }
serde = { version = "1.0.204", features = ["derive"] }
log = "0.4.22"

View File

@ -4,6 +4,7 @@ use rocket::{Route, State};
use rocket::http::Status;
use rocket::serde::json::Json;
use crate::fetch::FetchQueue;
use crate::store::SledDatabase;
pub fn routes() -> Vec<Route> {
@ -29,8 +30,9 @@ async fn import_event(
}
#[rocket::get("/event/<id>")]
fn get_event(
async fn get_event(
db: &State<SledDatabase>,
fetch: &State<FetchQueue>,
id: &str,
) -> Option<Json<Event>> {
let id = match Nip19::from_bech32(id) {
@ -39,9 +41,15 @@ fn get_event(
};
match db.event_by_id(&id) {
Ok(ev) => Some(Json::from(ev)),
_ => {
let mut fetch = fetch.inner().clone();
match fetch.demand(&id).await.await {
Ok(Some(ev)) => Some(Json::from(ev)),
_ => None
}
}
}
}
#[rocket::get("/event/<kind>/<pubkey>")]
fn get_event_by_kind(

78
src/fetch.rs Normal file
View File

@ -0,0 +1,78 @@
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use nostr::{Event, Filter, Url};
use nostr::prelude::Nip19;
use nostr_sdk::{FilterOptions, RelayOptions, RelayPool};
use tokio::sync::{Mutex, oneshot};
struct QueueItem {
pub handler: oneshot::Sender<Option<Event>>,
pub request: Nip19,
}
#[derive(Clone)]
pub struct FetchQueue {
queue: Arc<Mutex<VecDeque<QueueItem>>>,
pool: Arc<Mutex<RelayPool>>,
}
impl FetchQueue {
pub fn new() -> Self {
Self {
queue: Arc::new(Mutex::new(VecDeque::new())),
pool: Default::default(),
}
}
pub async fn add_relay(&mut self, relay: Url) -> Result<bool, anyhow::Error> {
let pool_lock = self.pool.lock().await;
pool_lock.add_relay(relay.clone(), RelayOptions::default()).await.unwrap();
if let Err(e) = pool_lock.connect_relay(relay, None).await {
Err(anyhow::Error::new(e))
} else {
Ok(true)
}
}
pub async fn demand(&mut self, ent: &Nip19) -> oneshot::Receiver<Option<Event>> {
let (tx, rx) = oneshot::channel();
let mut q_lock = self.queue.lock().await;
q_lock.push_back(QueueItem {
handler: tx,
request: ent.clone(),
});
rx
}
pub async fn process_queue(&self) {
let mut q_lock = self.queue.lock().await;
if let Some(q) = q_lock.pop_front() {
let pool_lock = self.pool.lock().await;
let filters = vec![Self::nip19_to_filter(&q.request).unwrap()];
//info!("Sending filters: {:?}", filters);
if let Ok(evs) = pool_lock.get_events_of(filters, Duration::from_secs(5), FilterOptions::ExitOnEOSE).await {
if let Some(e) = evs.first() {
q.handler.send(Some(e.clone())).unwrap();
} else {
q.handler.send(None).unwrap();
}
} else {
q.handler.send(None).unwrap();
}
}
}
fn nip19_to_filter(filter: &Nip19) -> Option<Filter> {
match filter {
Nip19::Coordinate(c) => Some(Filter::from(c)),
Nip19::Event(e) => Some(Filter::new()
.id(e.event_id)
),
Nip19::EventId(e) => Some(Filter::new().id(*e)),
_ => None,
}
}
}

View File

@ -1,13 +1,17 @@
#[macro_use]
extern crate rocket;
use std::time::Duration;
use anyhow::Error;
use rocket::shield::Shield;
use crate::fetch::FetchQueue;
use crate::store::SledDatabase;
mod events;
mod store;
mod fetch;
#[rocket::main]
async fn main() -> Result<(), Error> {
@ -15,8 +19,20 @@ async fn main() -> Result<(), Error> {
let db = SledDatabase::new("nostr.db");
let mut fetch = FetchQueue::new();
fetch.add_relay("wss://relay.snort.social".parse().unwrap()).await.unwrap();
let fetch2 = fetch.clone();
tokio::spawn(async move {
loop {
fetch2.process_queue().await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
let rocket = rocket::Rocket::build()
.manage(db)
.manage(fetch)
.attach(Shield::new()) // disable
.mount("/", events::routes())
.launch()