diff --git a/src/fetch.rs b/src/fetch.rs index ab1cba0..fed62ff 100644 --- a/src/fetch.rs +++ b/src/fetch.rs @@ -2,10 +2,10 @@ use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; -use nostr::{Event, EventId, Filter, Kind, Url}; use nostr::prelude::Nip19; +use nostr::{Event, EventId, Filter, Kind, Url}; use nostr_sdk::{FilterOptions, RelayOptions, RelayPool}; -use tokio::sync::{Mutex, oneshot}; +use tokio::sync::{oneshot, Mutex}; struct QueueItem { pub handler: oneshot::Sender>, @@ -30,8 +30,7 @@ impl FetchQueue { let pool_lock = self.pool.lock().await; pool_lock .add_relay(relay.clone(), RelayOptions::default()) - .await - .unwrap(); + .await?; if let Err(e) = pool_lock.connect_relay(relay, None).await { Err(anyhow::Error::new(e)) } else { @@ -52,21 +51,23 @@ impl FetchQueue { pub async fn process_queue(&self) { let mut q_lock = self.queue.lock().await; - if let Some(q) = q_lock.pop_front() { - let pool_lock = self.pool.lock().await; - let filters = vec![Self::nip19_to_filter(&q.request).unwrap()]; - //info!("Sending filters: {:?}", filters); - if let Ok(evs) = pool_lock - .get_events_of(filters, Duration::from_secs(5), FilterOptions::ExitOnEOSE) - .await - { - if let Some(e) = evs.first() { - q.handler.send(Some(e.clone())).unwrap(); - } else { - q.handler.send(None).unwrap(); - } - } else { - q.handler.send(None).unwrap(); + let mut batch = Vec::new(); + while let Some(q) = q_lock.pop_front() { + batch.push(q); + } + let filters: Vec = + batch.iter().map(move |x| Self::nip19_to_filter(&x.request).unwrap()).collect(); + + let pool_lock = self.pool.lock().await; + info!("Sending filters: {:?}", &filters); + if let Ok(evs) = pool_lock + .get_events_of(filters, Duration::from_secs(2), FilterOptions::ExitOnEOSE) + .await + { + for b in batch { + let f = Self::nip19_to_filter(&b.request).unwrap(); + let ev = evs.iter().find(|e| f.match_event(e)); + b.handler.send(ev.cloned()).unwrap() } } }