Batch requests

This commit is contained in:
kieran 2024-09-05 10:29:30 +01:00
parent d828a4b0f0
commit ae107978f7
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941

View File

@ -2,10 +2,10 @@ use std::collections::VecDeque;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use nostr::{Event, EventId, Filter, Kind, Url};
use nostr::prelude::Nip19; use nostr::prelude::Nip19;
use nostr::{Event, EventId, Filter, Kind, Url};
use nostr_sdk::{FilterOptions, RelayOptions, RelayPool}; use nostr_sdk::{FilterOptions, RelayOptions, RelayPool};
use tokio::sync::{Mutex, oneshot}; use tokio::sync::{oneshot, Mutex};
struct QueueItem { struct QueueItem {
pub handler: oneshot::Sender<Option<Event>>, pub handler: oneshot::Sender<Option<Event>>,
@ -30,8 +30,7 @@ impl FetchQueue {
let pool_lock = self.pool.lock().await; let pool_lock = self.pool.lock().await;
pool_lock pool_lock
.add_relay(relay.clone(), RelayOptions::default()) .add_relay(relay.clone(), RelayOptions::default())
.await .await?;
.unwrap();
if let Err(e) = pool_lock.connect_relay(relay, None).await { if let Err(e) = pool_lock.connect_relay(relay, None).await {
Err(anyhow::Error::new(e)) Err(anyhow::Error::new(e))
} else { } else {
@ -52,21 +51,23 @@ impl FetchQueue {
pub async fn process_queue(&self) { pub async fn process_queue(&self) {
let mut q_lock = self.queue.lock().await; let mut q_lock = self.queue.lock().await;
if let Some(q) = q_lock.pop_front() { let mut batch = Vec::new();
while let Some(q) = q_lock.pop_front() {
batch.push(q);
}
let filters: Vec<Filter> =
batch.iter().map(move |x| Self::nip19_to_filter(&x.request).unwrap()).collect();
let pool_lock = self.pool.lock().await; let pool_lock = self.pool.lock().await;
let filters = vec![Self::nip19_to_filter(&q.request).unwrap()]; info!("Sending filters: {:?}", &filters);
//info!("Sending filters: {:?}", filters);
if let Ok(evs) = pool_lock if let Ok(evs) = pool_lock
.get_events_of(filters, Duration::from_secs(5), FilterOptions::ExitOnEOSE) .get_events_of(filters, Duration::from_secs(2), FilterOptions::ExitOnEOSE)
.await .await
{ {
if let Some(e) = evs.first() { for b in batch {
q.handler.send(Some(e.clone())).unwrap(); let f = Self::nip19_to_filter(&b.request).unwrap();
} else { let ev = evs.iter().find(|e| f.match_event(e));
q.handler.send(None).unwrap(); b.handler.send(ev.cloned()).unwrap()
}
} else {
q.handler.send(None).unwrap();
} }
} }
} }