feat: query system

This commit is contained in:
kieran 2024-10-16 10:59:28 +01:00
parent 9a8bb54e08
commit f0a50918a8
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
25 changed files with 536 additions and 317 deletions

3
.gitignore vendored
View File

@ -1 +1,4 @@
/target
/lock.mdb
/data.mdb
/.idea

21
Cargo.lock generated
View File

@ -138,9 +138,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.88"
version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e1496f8fb1fbf272686b8d37f523dab3e4a7443300055e74cdaa449f3114356"
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
[[package]]
name = "arbitrary"
@ -203,9 +203,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.82"
version = "0.1.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1"
checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd"
dependencies = [
"proc-macro2",
"quote",
@ -4403,6 +4403,15 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "uuid"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
dependencies = [
"getrandom",
]
[[package]]
name = "v_frame"
version = "0.3.8"
@ -5237,7 +5246,10 @@ checksum = "ec7a2a501ed189703dba8b08142f057e887dfc4b2cc4db2d343ac6376ba3e0b9"
name = "zap_stream_app"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bech32",
"chrono",
"eframe",
"egui",
"egui-video",
@ -5250,6 +5262,7 @@ dependencies = [
"nostrdb",
"pretty_env_logger",
"tokio",
"uuid",
]
[[package]]

View File

@ -18,3 +18,7 @@ egui_inbox = "0.6.0"
bech32 = "0.11.0"
libc = "0.2.158"
egui-video = { git = "https://github.com/v0l/egui-video.git", rev = "4766d939ce4d34b5a3a57b2fbe750ea10f389f39" }
uuid = { version = "1.11.0", features = ["v4"] }
chrono = "0.4.38"
anyhow = "1.0.89"
async-trait = "0.1.83"

View File

@ -14,13 +14,18 @@ pub struct ZapStreamApp {
impl ZapStreamApp {
pub fn new(cc: &CreationContext) -> Self {
let client = Client::builder().database(MemoryDatabase::with_opts(Default::default())).build();
let client = Client::builder()
.database(MemoryDatabase::with_opts(Default::default()))
.build();
let notifications = client.notifications();
let ctx_clone = cc.egui_ctx.clone();
let client_clone = client.clone();
tokio::spawn(async move {
client_clone.add_relay("wss://nos.lol").await.expect("Failed to add relay");
client_clone
.add_relay("wss://nos.lol")
.await
.expect("Failed to add relay");
client_clone.connect().await;
let mut notifications = client_clone.notifications();
while let Ok(_) = notifications.recv().await {
@ -48,8 +53,6 @@ impl App for ZapStreamApp {
egui::CentralPanel::default()
.frame(app_frame)
.show(ctx, |ui| {
self.router.show(ui)
});
.show(ctx, |ui| self.router.show(ui));
}
}

View File

@ -42,7 +42,13 @@ pub enum NostrLinkType {
}
impl NostrLink {
pub fn new(hrp: NostrLinkType, id: IdOrStr, kind: Option<u32>, author: Option<[u8; 32]>, relays: Vec<String>) -> Self {
pub fn new(
hrp: NostrLinkType,
id: IdOrStr,
kind: Option<u32>,
author: Option<[u8; 32]>,
relays: Vec<String>,
) -> Self {
Self {
hrp,
id,
@ -56,7 +62,14 @@ impl NostrLink {
if note.kind() >= 30_000 && note.kind() < 40_000 {
Self {
hrp: NostrLinkType::Coordinate,
id: IdOrStr::Str(note.get_tag_value("d").unwrap().variant().str().unwrap().to_string()),
id: IdOrStr::Str(
note.get_tag_value("d")
.unwrap()
.variant()
.str()
.unwrap()
.to_string(),
),
kind: Some(note.kind()),
author: Some(note.pubkey().clone()),
relays: vec![],
@ -82,7 +95,12 @@ impl NostrLink {
pub fn to_tag_value(&self) -> String {
if self.hrp == NostrLinkType::Coordinate {
format!("{}:{}:{}", self.kind.unwrap(), hex::encode(self.author.unwrap()), self.id)
format!(
"{}:{}:{}",
self.kind.unwrap(),
hex::encode(self.author.unwrap()),
self.id
)
} else {
self.id.to_string()
}
@ -93,18 +111,17 @@ impl TryInto<Filter> for &NostrLink {
type Error = ();
fn try_into(self) -> Result<Filter, Self::Error> {
match self.hrp {
NostrLinkType::Coordinate => {
Ok(Filter::new()
.tags([match self.id {
NostrLinkType::Coordinate => Ok(Filter::new()
.tags(
[match self.id {
IdOrStr::Str(ref s) => s.to_owned(),
IdOrStr::Id(ref i) => hex::encode(i)
}], 'd')
.build())
}
NostrLinkType::Event | NostrLinkType::Note => {
Ok(Filter::new().build())
}
_ => Err(())
IdOrStr::Id(ref i) => hex::encode(i),
}],
'd',
)
.build()),
NostrLinkType::Event | NostrLinkType::Note => Ok(Filter::new().build()),
_ => Err(()),
}
}
}
@ -133,12 +150,17 @@ impl Display for NostrLink {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self.hrp {
NostrLinkType::Note | NostrLinkType::PrivateKey | NostrLinkType::PublicKey => {
Ok(bech32::encode_to_fmt::<NoChecksum, Formatter>(f, self.hrp.to_hrp(), match &self.id {
Ok(bech32::encode_to_fmt::<NoChecksum, Formatter>(
f,
self.hrp.to_hrp(),
match &self.id {
IdOrStr::Str(s) => s.as_bytes(),
IdOrStr::Id(i) => i
}).map_err(|e| std::fmt::Error)?)
IdOrStr::Id(i) => i,
},
)
.map_err(|e| std::fmt::Error)?)
}
NostrLinkType::Event | NostrLinkType::Profile | NostrLinkType::Coordinate => todo!()
NostrLinkType::Event | NostrLinkType::Profile | NostrLinkType::Coordinate => todo!(),
}
}
}

View File

@ -1,24 +1,24 @@
use eframe::Renderer;
use crate::app::ZapStreamApp;
use eframe::Renderer;
use egui::Vec2;
use nostrdb::Note;
mod app;
pub mod widgets;
mod services;
mod route;
mod note_util;
mod link;
mod note_util;
mod route;
mod services;
mod stream_info;
pub mod widgets;
#[tokio::main]
async fn main() {
pretty_env_logger::init();
// TODO: redirect FFMPEG logs to log file (noisy)
let mut options = eframe::NativeOptions::default();
options.renderer = Renderer::Glow;
options.viewport = options.viewport
.with_inner_size(Vec2::new(360., 720.));
options.viewport = options.viewport.with_inner_size(Vec2::new(360., 720.));
let _res = eframe::run_native(
"zap.stream",

View File

@ -35,7 +35,6 @@ impl<'a> NoteUtil for Note<'a> {
}
}
#[derive(Debug, Clone)]
pub struct TagIterBorrow<'a> {
tag: &'a Tag<'a>,

View File

@ -1,47 +1,41 @@
use crate::note_util::OwnedNote;
use crate::route::RouteServices;
use crate::services::ndb_wrapper::NDBWrapper;
use crate::services::ndb_wrapper::{NDBWrapper, SubWrapper};
use crate::widgets;
use crate::widgets::NostrWidget;
use egui::{Response, Ui, Widget};
use log::{error, info};
use nostrdb::{Filter, Ndb, Note, NoteKey, Subscription, Transaction};
use nostrdb::{Filter, Ndb, Note, NoteKey, Transaction};
pub struct HomePage {
sub: Subscription,
sub: SubWrapper,
events: Vec<OwnedNote>,
ndb: NDBWrapper,
}
impl HomePage {
pub fn new(ndb: &NDBWrapper, tx: &Transaction) -> Self {
let filter = [
Filter::new()
.kinds([30_311])
.limit(10)
.build()
];
let (sub, events) = ndb.subscribe_with_results(&filter, tx, 100);
let filter = [Filter::new().kinds([30_311]).limit(10).build()];
let (sub, events) = ndb.subscribe_with_results("home-page", &filter, tx, 100);
Self {
sub,
events: events.iter().map(|e| OwnedNote(e.note_key.as_u64())).collect(),
ndb: ndb.clone(),
events: events
.iter()
.map(|e| OwnedNote(e.note_key.as_u64()))
.collect(),
}
}
}
impl Drop for HomePage {
fn drop(&mut self) {
self.ndb.unsubscribe(self.sub);
}
}
impl NostrWidget for HomePage {
fn render(&mut self, ui: &mut Ui, services: &RouteServices<'_>) -> Response {
let new_notes = services.ndb.poll(self.sub, 100);
new_notes.iter().for_each(|n| self.events.push(OwnedNote(n.as_u64())));
let new_notes = services.ndb.poll(&self.sub, 100);
new_notes
.iter()
.for_each(|n| self.events.push(OwnedNote(n.as_u64())));
let events: Vec<Note<'_>> = self.events.iter()
let events: Vec<Note<'_>> = self
.events
.iter()
.map(|n| services.ndb.get_note_by_key(services.tx, NoteKey::new(n.0)))
.map_while(|f| f.map_or(None, |f| Some(f)))
.collect();

View File

@ -4,7 +4,6 @@ use crate::route;
use crate::route::home::HomePage;
use crate::route::stream::StreamPage;
use crate::services::ndb_wrapper::NDBWrapper;
use crate::services::profile::ProfileService;
use crate::widgets::{Header, NostrWidget, StreamList};
use egui::{Context, Response, ScrollArea, Ui, Widget};
use egui_inbox::{UiInbox, UiInboxSender};
@ -15,8 +14,8 @@ use nostr_sdk::{Client, Kind, PublicKey};
use nostrdb::{Filter, Ndb, Note, Transaction};
use std::borrow::Borrow;
mod stream;
mod home;
mod stream;
#[derive(PartialEq)]
pub enum Routes {
@ -45,7 +44,6 @@ pub struct Router {
router: UiInbox<Routes>,
ctx: Context,
profile_service: ProfileService,
ndb: NDBWrapper,
login: Option<[u8; 32]>,
client: Client,
@ -58,7 +56,6 @@ impl Router {
current_widget: None,
router: UiInbox::new(),
ctx: ctx.clone(),
profile_service: ProfileService::new(client.clone(), ctx.clone()),
ndb: NDBWrapper::new(ctx.clone(), ndb.clone(), client.clone()),
client,
login: None,
@ -75,7 +72,7 @@ impl Router {
let w = StreamPage::new_from_link(&self.ndb, tx, link.clone());
self.current_widget = Some(Box::new(w));
}
_ => warn!("Not implemented")
_ => warn!("Not implemented"),
}
self.current = route;
}
@ -87,10 +84,8 @@ impl Router {
while let Some(r) = self.router.read(ui).next() {
if let Routes::Action(a) = &r {
match a {
RouteAction::Login(k) => {
self.login = Some(k.clone())
}
_ => info!("Not implemented")
RouteAction::Login(k) => self.login = Some(k.clone()),
_ => info!("Not implemented"),
}
} else {
self.load_widget(r, &tx);
@ -104,31 +99,31 @@ impl Router {
let svc = RouteServices {
context: self.ctx.clone(),
profile: &self.profile_service,
router: self.router.sender(),
ndb: self.ndb.clone(),
ndb: &self.ndb,
tx: &tx,
login: &self.login,
};
// display app
ScrollArea::vertical().show(ui, |ui| {
ui.add(Header::new(&svc));
ScrollArea::vertical()
.show(ui, |ui| {
Header::new().render(ui, &svc);
if let Some(w) = self.current_widget.as_mut() {
w.render(ui, &svc)
} else {
ui.label("No widget")
}
}).inner
})
.inner
}
}
pub struct RouteServices<'a> {
pub context: Context, //cloned
pub router: UiInboxSender<Routes>, //cloned
pub ndb: NDBWrapper, //cloned
pub profile: &'a ProfileService, //ref
pub ndb: &'a NDBWrapper, //ref
pub tx: &'a Transaction, //ref
pub login: &'a Option<[u8; 32]>, //ref
}

View File

@ -1,7 +1,7 @@
use crate::link::NostrLink;
use crate::note_util::{NoteUtil, OwnedNote};
use crate::route::RouteServices;
use crate::services::ndb_wrapper::NDBWrapper;
use crate::services::ndb_wrapper::{NDBWrapper, SubWrapper};
use crate::stream_info::StreamInfo;
use crate::widgets::{Chat, NostrWidget, StreamPlayer};
use egui::{Color32, Label, Response, RichText, TextWrapMode, Ui, Widget};
@ -13,20 +13,20 @@ pub struct StreamPage {
event: Option<OwnedNote>,
player: Option<StreamPlayer>,
chat: Option<Chat>,
sub: Subscription,
sub: SubWrapper,
}
impl StreamPage {
pub fn new_from_link(ndb: &NDBWrapper, tx: &Transaction, link: NostrLink) -> Self {
let f: Filter = link.borrow().try_into().unwrap();
let f = [
f.limit_mut(1)
];
let (sub, events) = ndb.subscribe_with_results(&f, tx, 1);
let f = [f.limit_mut(1)];
let (sub, events) = ndb.subscribe_with_results("streams", &f, tx, 1);
Self {
link,
sub,
event: events.first().map_or(None, |n| Some(OwnedNote(n.note_key.as_u64()))),
event: events
.first()
.map_or(None, |n| Some(OwnedNote(n.note_key.as_u64()))),
chat: None,
player: None,
}
@ -35,13 +35,15 @@ impl StreamPage {
impl NostrWidget for StreamPage {
fn render(&mut self, ui: &mut Ui, services: &RouteServices<'_>) -> Response {
let poll = services.ndb.poll(self.sub, 1);
let poll = services.ndb.poll(&self.sub, 1);
if let Some(k) = poll.first() {
self.event = Some(OwnedNote(k.as_u64()))
}
let event = if let Some(k) = &self.event {
services.ndb.get_note_by_key(services.tx, NoteKey::new(k.0))
services
.ndb
.get_note_by_key(services.tx, NoteKey::new(k.0))
.map_or(None, |f| Some(f))
} else {
None

View File

@ -1,2 +1,2 @@
pub mod profile;
pub mod ndb_wrapper;
pub mod query;

View File

@ -1,15 +1,44 @@
use crate::services::query::QueryManager;
use egui::CursorIcon::Default;
use log::{info, warn};
use nostr_sdk::secp256k1::Context;
use nostr_sdk::{Client, JsonUtil, Kind, RelayPoolNotification};
use nostrdb::{Error, Filter, Ndb, Note, NoteKey, ProfileRecord, QueryResult, Subscription, Transaction};
use nostrdb::{
Error, Filter, Ndb, NdbProfile, Note, NoteKey, ProfileRecord, QueryResult, Subscription,
Transaction,
};
use std::sync::{Arc, RwLock};
use tokio::sync::mpsc::UnboundedSender;
#[derive(Debug, Clone)]
pub struct NDBWrapper {
ctx: egui::Context,
ndb: Ndb,
client: Client,
query_manager: QueryManager<Client>,
}
/// Automatic cleanup for subscriptions
pub struct SubWrapper {
ndb: Ndb,
subscription: Subscription,
}
impl SubWrapper {
pub fn new(ndb: Ndb, subscription: Subscription) -> Self {
Self { ndb, subscription }
}
}
impl Into<u64> for &SubWrapper {
fn into(self) -> u64 {
self.subscription.id()
}
}
impl Drop for SubWrapper {
fn drop(&mut self) {
self.ndb.unsubscribe(self.subscription).unwrap()
}
}
impl NDBWrapper {
@ -34,49 +63,93 @@ impl NDBWrapper {
}
}
});
Self { ctx, ndb, client }
let qm = QueryManager::new(client.clone());
Self {
ctx,
ndb,
client,
query_manager: qm,
}
}
pub fn start_transaction(&self) -> Transaction {
Transaction::new(&self.ndb).unwrap()
}
pub fn subscribe(&self, filters: &[Filter]) -> Subscription {
pub fn subscribe(&self, id: &str, filters: &[Filter]) -> SubWrapper {
let sub = self.ndb.subscribe(filters).unwrap();
let c_clone = self.client.clone();
let filters = filters.iter().map(|f| nostr_sdk::Filter::from_json(f.json().unwrap()).unwrap()).collect();
let id_clone = sub.id();
tokio::spawn(async move {
let nostr_sub = c_clone.subscribe(filters, None).await.unwrap();
info!("Sub mapping {}->{}", id_clone, nostr_sub.id())
});
sub
// very lazy conversion
let filters: Vec<nostr_sdk::Filter> = filters
.iter()
.map(|f| nostr_sdk::Filter::from_json(f.json().unwrap()).unwrap())
.collect();
self.query_manager.queue_query(id, filters);
SubWrapper::new(self.ndb.clone(), sub)
}
pub fn unsubscribe(&self, sub: Subscription) {
self.ndb.unsubscribe(sub).unwrap()
pub fn unsubscribe(&self, sub: &SubWrapper) {
self.ndb.unsubscribe(sub.subscription).unwrap()
}
pub fn subscribe_with_results<'a>(&self, filters: &[Filter], tx: &'a Transaction, max_results: i32) -> (Subscription, Vec<QueryResult<'a>>) {
let sub = self.subscribe(filters);
pub fn subscribe_with_results<'a>(
&self,
id: &str,
filters: &[Filter],
tx: &'a Transaction,
max_results: i32,
) -> (SubWrapper, Vec<QueryResult<'a>>) {
let sub = self.subscribe(id, filters);
let q = self.query(tx, filters, max_results);
(sub, q)
}
pub fn query<'a>(&self, tx: &'a Transaction, filters: &[Filter], max_results: i32) -> Vec<QueryResult<'a>> {
pub fn query<'a>(
&self,
tx: &'a Transaction,
filters: &[Filter],
max_results: i32,
) -> Vec<QueryResult<'a>> {
self.ndb.query(tx, filters, max_results).unwrap()
}
pub fn poll(&self, sub: Subscription, max_results: u32) -> Vec<NoteKey> {
self.ndb.poll_for_notes(sub, max_results)
pub fn poll(&self, sub: &SubWrapper, max_results: u32) -> Vec<NoteKey> {
self.ndb.poll_for_notes(sub.subscription, max_results)
}
pub fn get_note_by_key<'a>(&self, tx: &'a Transaction, key: NoteKey) -> Result<Note<'a>, Error> {
pub fn get_note_by_key<'a>(
&self,
tx: &'a Transaction,
key: NoteKey,
) -> Result<Note<'a>, Error> {
self.ndb.get_note_by_key(tx, key)
}
pub fn get_profile_by_pubkey<'a>(&self, tx: &'a Transaction, pubkey: &[u8; 32]) -> Result<ProfileRecord<'a>, Error> {
pub fn get_profile_by_pubkey<'a>(
&self,
tx: &'a Transaction,
pubkey: &[u8; 32],
) -> Result<ProfileRecord<'a>, Error> {
self.ndb.get_profile_by_pubkey(tx, pubkey)
}
pub fn fetch_profile<'a>(
&self,
tx: &'a Transaction,
pubkey: &[u8; 32],
) -> (Option<NdbProfile<'a>>, Option<SubWrapper>) {
let p = self
.get_profile_by_pubkey(tx, pubkey)
.map_or(None, |p| p.record().profile());
let sub = if p.is_none() {
Some(self.subscribe(
"profile",
&[Filter::new().kinds([0]).authors([pubkey]).build()],
))
} else {
None
};
(p, sub)
}
}

View File

@ -1,85 +0,0 @@
use egui::load::BytesLoader;
use log::{info, warn};
use nostr_sdk::prelude::hex;
use nostr_sdk::{Client, Metadata, PublicKey};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::Mutex;
pub struct ProfileService {
client: Client,
pub profiles: Arc<Mutex<HashMap<[u8; 32], Option<Metadata>>>>,
ctx: egui::Context,
request_profile: UnboundedSender<[u8; 32]>,
}
struct Loader {
client: Client,
ctx: egui::Context,
profiles: Arc<Mutex<HashMap<[u8; 32], Option<Metadata>>>>,
queue: UnboundedReceiver<[u8; 32]>,
}
impl Loader {
pub async fn run(&mut self) {
while let Some(key) = self.queue.recv().await {
let mut profiles = self.profiles.lock().await;
if !profiles.contains_key(&key) {
info!("Requesting profile {}", hex::encode(key));
match self.client.fetch_metadata(PublicKey::from_slice(&key).unwrap(),
Some(Duration::from_secs(3))).await {
Ok(meta) => {
profiles.insert(key, Some(meta));
self.ctx.request_repaint();
}
Err(e) => {
warn!("Error getting metadata: {}", e);
}
}
}
}
}
}
impl ProfileService {
pub fn new(client: Client, ctx: egui::Context) -> ProfileService
{
let profiles = Arc::new(Mutex::new(HashMap::new()));
let (tx, rx) = unbounded_channel();
let mut loader = Loader {
client: client.clone(),
ctx: ctx.clone(),
profiles: profiles.clone(),
queue: rx,
};
tokio::spawn(async move {
loader.run().await;
});
Self {
client,
ctx,
profiles,
request_profile: tx,
}
}
pub fn get_profile(&self, public_key: &[u8; 32]) -> Option<Metadata> {
if let Ok(profiles) = self.profiles.try_lock() {
return if let Some(p) = profiles.get(public_key) {
if let Some(p) = p {
Some(p.clone())
} else {
None
}
} else {
self.request_profile.send(*public_key).expect("Failed request");
None
};
}
None
}
}

158
src/services/query.rs Normal file
View File

@ -0,0 +1,158 @@
use anyhow::Error;
use chrono::Utc;
use log::{error, info};
use nostr_sdk::prelude::StreamExt;
use nostr_sdk::{Client, Filter, SubscriptionId};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use uuid::Uuid;
#[async_trait::async_trait]
pub trait QueryClient {
async fn subscribe(&self, id: &str, filters: &[QueryFilter]) -> Result<(), Error>;
}
pub type QueryFilter = Filter;
pub struct Query {
pub id: String,
queue: HashSet<QueryFilter>,
traces: HashSet<QueryTrace>,
}
#[derive(Hash, Eq, PartialEq, Debug)]
pub struct QueryTrace {
/// Subscription id on the relay
pub id: Uuid,
/// Filters associated with this subscription
pub filters: Vec<QueryFilter>,
/// When the query was created
pub queued: u64,
/// When the query was sent to the relay
pub sent: Option<u64>,
/// When EOSE was received
pub eose: Option<u64>,
}
impl Query {
pub fn new(id: &str) -> Self {
Self {
id: id.to_string(),
queue: HashSet::new(),
traces: HashSet::new(),
}
}
/// Add filters to query
pub fn add(&mut self, filter: Vec<QueryFilter>) {
for f in filter {
self.queue.insert(f);
}
}
/// Return next query batch
pub fn next(&mut self) -> Option<QueryTrace> {
let next: Vec<QueryFilter> = self.queue.drain().collect();
if next.len() == 0 {
return None;
}
let now = Utc::now();
let id = Uuid::new_v4();
Some(QueryTrace {
id,
filters: next,
queued: now.timestamp() as u64,
sent: None,
eose: None,
})
}
}
struct QueueDefer {
id: String,
filters: Vec<QueryFilter>,
}
pub struct QueryManager<C> {
client: C,
queries: Arc<RwLock<HashMap<String, Query>>>,
queue_into_queries: UnboundedSender<QueueDefer>,
sender: JoinHandle<()>,
}
impl<C> QueryManager<C>
where
C: QueryClient + Clone + Send + Sync + 'static,
{
pub(crate) fn new(client: C) -> Self {
let queries = Arc::new(RwLock::new(HashMap::new()));
let (tx, mut rx) = unbounded_channel::<QueueDefer>();
Self {
client: client.clone(),
queries: queries.clone(),
queue_into_queries: tx,
sender: tokio::spawn(async move {
loop {
{
let mut q = queries.write().await;
while let Ok(x) = rx.try_recv() {
Self::push_filters(&mut q, &x.id, x.filters);
}
for (k, v) in q.iter_mut() {
if let Some(qt) = v.next() {
info!("Sending trace: {:?}", qt);
match client
.subscribe(&qt.id.to_string(), qt.filters.as_slice())
.await
{
Ok(_) => {}
Err(e) => {
error!("Failed to subscribe to query filters: {}", e);
}
}
}
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}),
}
}
pub async fn query<F>(&mut self, id: &str, filters: F)
where
F: Into<Vec<QueryFilter>>,
{
let mut qq = self.queries.write().await;
Self::push_filters(&mut qq, id, filters.into());
}
fn push_filters(qq: &mut HashMap<String, Query>, id: &str, filters: Vec<QueryFilter>) {
if let Some(q) = qq.get_mut(id) {
q.add(filters.into());
} else {
let mut q = Query::new(id);
q.add(filters.into());
qq.insert(id.to_string(), q);
}
}
pub fn queue_query<F>(&self, id: &str, filters: F)
where
F: Into<Vec<QueryFilter>>,
{
}
}
#[async_trait::async_trait]
impl QueryClient for Client {
async fn subscribe(&self, id: &str, filters: &[QueryFilter]) -> Result<(), Error> {
self.subscribe_with_id(SubscriptionId::new(id), filters.into(), None)
.await?;
Ok(())
}
}

View File

@ -1,27 +1,36 @@
use crate::services::profile::ProfileService;
use crate::route::RouteServices;
use crate::services::ndb_wrapper::SubWrapper;
use egui::{Color32, Image, Rect, Response, Rounding, Sense, Ui, Vec2, Widget};
pub struct Avatar<'a> {
image: Option<Image<'a>>,
sub: Option<SubWrapper>,
}
impl<'a> Avatar<'a> {
pub fn new(img: Image<'a>) -> Self {
Self { image: Some(img) }
Self {
image: Some(img),
sub: None,
}
}
pub fn new_optional(img: Option<Image<'a>>) -> Self {
Self { image: img }
Self {
image: img,
sub: None,
}
}
pub fn public_key(svc: &'a ProfileService, pk: &[u8; 32]) -> Self {
if let Some(meta) = svc.get_profile(pk) {
if let Some(img) = &meta.picture {
return Self { image: Some(Image::from_uri(img.clone())) };
pub fn pubkey(pk: &[u8; 32], svc: &RouteServices<'a>) -> Self {
let (img, sub) = svc.ndb.fetch_profile(svc.tx, pk);
Self {
image: img
.map_or(None, |p| p.picture())
.map(|p| Image::from_uri(p)),
sub,
}
}
Self { image: None }
}
pub fn max_size(mut self, size: f32) -> Self {
self.image = if let Some(i) = self.image {
@ -45,9 +54,7 @@ impl<'a> Avatar<'a> {
impl<'a> Widget for Avatar<'a> {
fn ui(self, ui: &mut Ui) -> Response {
match self.image {
Some(img) => {
img.rounding(Rounding::same(ui.available_height())).ui(ui)
}
Some(img) => img.rounding(Rounding::same(ui.available_height())).ui(ui),
None => {
let h = ui.available_height();
let rnd = Rounding::same(h);

View File

@ -1,17 +1,16 @@
use crate::link::NostrLink;
use crate::note_util::OwnedNote;
use crate::route::RouteServices;
use crate::services::ndb_wrapper::NDBWrapper;
use crate::services::ndb_wrapper::{NDBWrapper, SubWrapper};
use crate::widgets::chat_message::ChatMessage;
use crate::widgets::NostrWidget;
use egui::{Response, ScrollArea, Ui, Widget};
use nostrdb::{Filter, Note, NoteKey, Subscription, Transaction};
use std::borrow::Borrow;
use nostrdb::{Filter, Note, NoteKey, Transaction};
pub struct Chat {
link: NostrLink,
events: Vec<OwnedNote>,
sub: Subscription,
sub: SubWrapper,
}
impl Chat {
@ -22,33 +21,45 @@ impl Chat {
.build();
let filter = [filter];
let (sub, events) = ndb.subscribe_with_results(&filter, tx, 500);
let (sub, events) = ndb.subscribe_with_results("live-chat", &filter, tx, 500);
Self {
link,
sub,
events: events.iter().map(|n| OwnedNote(n.note_key.as_u64())).collect(),
events: events
.iter()
.map(|n| OwnedNote(n.note_key.as_u64()))
.collect(),
}
}
}
impl NostrWidget for Chat {
fn render(&mut self, ui: &mut Ui, services: &RouteServices<'_>) -> Response {
let poll = services.ndb.poll(self.sub, 500);
poll.iter().for_each(|n| self.events.push(OwnedNote(n.as_u64())));
let poll = services.ndb.poll(&self.sub, 500);
poll.iter()
.for_each(|n| self.events.push(OwnedNote(n.as_u64())));
let events: Vec<Note> = self.events.iter().map_while(|n|
services.ndb
let events: Vec<Note> = self
.events
.iter()
.map_while(|n| {
services
.ndb
.get_note_by_key(services.tx, NoteKey::new(n.0))
.map_or(None, |n| Some(n))
).collect();
})
.collect();
ScrollArea::vertical().show(ui, |ui| {
ScrollArea::vertical()
.show(ui, |ui| {
ui.vertical(|ui| {
for ev in events {
ChatMessage::new(&ev, services).ui(ui);
}
}).response
}).inner
})
.response
})
.inner
}
}

View File

@ -20,12 +20,11 @@ impl<'a> Widget for ChatMessage<'a> {
ui.horizontal(|ui| {
ui.spacing_mut().item_spacing = Vec2::new(8., 2.);
let author = self.ev.pubkey();
Profile::new(author, self.services)
.size(24.)
.ui(ui);
Profile::new(author, self.services).size(24.).ui(ui);
let content = self.ev.content();
ui.label(content);
}).response
})
.response
}
}

View File

@ -1,38 +1,43 @@
use crate::route::{RouteServices, Routes};
use crate::widgets::avatar::Avatar;
use crate::widgets::NostrWidget;
use eframe::emath::Align;
use eframe::epaint::Vec2;
use egui::{Frame, Image, Layout, Margin, Response, Sense, Ui, Widget};
use nostr_sdk::util::hex;
pub struct Header<'a> {
services: &'a RouteServices<'a>,
}
pub struct Header;
impl<'a> Header<'a> {
pub fn new(services: &'a RouteServices) -> Self {
Self { services }
impl Header {
pub fn new() -> Self {
Self {}
}
}
impl Widget for Header<'_> {
fn ui(self, ui: &mut Ui) -> Response {
let login: [u8; 32] = hex::decode("63fe6318dc58583cfe16810f86dd09e18bfd76aabc24a0081ce2856f330504ed").unwrap().try_into().unwrap();
impl NostrWidget for Header {
fn render(&mut self, ui: &mut Ui, services: &RouteServices<'_>) -> Response {
let logo_bytes = include_bytes!("../logo.svg");
Frame::none()
.outer_margin(Margin::symmetric(16., 8.))
.show(ui, |ui| {
ui.allocate_ui_with_layout(Vec2::new(ui.available_width(), 32.), Layout::left_to_right(Align::Center), |ui| {
ui.style_mut()
.spacing.item_spacing.x = 16.;
ui.allocate_ui_with_layout(
Vec2::new(ui.available_width(), 32.),
Layout::left_to_right(Align::Center),
|ui| {
ui.style_mut().spacing.item_spacing.x = 16.;
if Image::from_bytes("logo.svg", logo_bytes)
.max_height(22.62)
.sense(Sense::click())
.ui(ui).clicked() {
self.services.navigate(Routes::HomePage);
.ui(ui)
.clicked()
{
services.navigate(Routes::HomePage);
}
ui.add(Avatar::public_key(&self.services.profile, &login));
if let Some(pk) = services.login {
ui.add(Avatar::pubkey(pk, services));
}
},
)
})
}).response
.response
}
}

View File

@ -1,24 +1,24 @@
mod header;
mod stream;
mod stream_list;
mod avatar;
mod stream_player;
mod video_placeholder;
mod chat;
mod chat_message;
mod header;
mod profile;
mod stream;
mod stream_list;
mod stream_player;
mod video_placeholder;
use egui::{Response, Ui};
use crate::route::RouteServices;
use egui::{Response, Ui};
pub trait NostrWidget {
fn render(&mut self, ui: &mut Ui, services: &RouteServices<'_>) -> Response;
}
pub use self::avatar::Avatar;
pub use self::header::Header;
pub use self::stream_list::StreamList;
pub use self::video_placeholder::VideoPlaceholder;
pub use self::stream_player::StreamPlayer;
pub use self::profile::Profile;
pub use self::chat::Chat;
pub use self::header::Header;
pub use self::profile::Profile;
pub use self::stream_list::StreamList;
pub use self::stream_player::StreamPlayer;
pub use self::video_placeholder::VideoPlaceholder;

View File

@ -1,4 +1,5 @@
use crate::route::RouteServices;
use crate::services::ndb_wrapper::SubWrapper;
use crate::widgets::Avatar;
use egui::{Color32, Image, Label, Response, RichText, TextWrapMode, Ui, Widget};
use nostrdb::NdbProfile;
@ -7,15 +8,18 @@ pub struct Profile<'a> {
size: f32,
pubkey: &'a [u8; 32],
profile: Option<NdbProfile<'a>>,
sub: Option<SubWrapper>,
}
impl<'a> Profile<'a> {
pub fn new(pubkey: &'a [u8; 32], services: &'a RouteServices<'a>) -> Self {
let p = services.ndb.get_profile_by_pubkey(services.tx, &pubkey)
.map(|f| f.record().profile())
.map_or(None, |r| r);
Self { pubkey, size: 40., profile: p }
let (p, sub) = services.ndb.fetch_profile(services.tx, pubkey);
Self {
pubkey,
size: 40.,
profile: p,
sub,
}
}
pub fn size(self, size: f32) -> Self {
@ -28,14 +32,17 @@ impl<'a> Widget for Profile<'a> {
ui.horizontal(|ui| {
ui.spacing_mut().item_spacing.x = 8.;
let img = self.profile.map_or(None, |f| f.picture().map(|f| Image::from_uri(f)));
let img = self
.profile
.map_or(None, |f| f.picture().map(|f| Image::from_uri(f)));
ui.add(Avatar::new_optional(img).size(self.size));
let name = self.profile.map_or("Nostrich", |f| f.name().map_or("Nostrich", |f| f));
let name = RichText::new(name)
.size(13.)
.color(Color32::WHITE);
let name = self
.profile
.map_or("Nostrich", |f| f.name().map_or("Nostrich", |f| f));
let name = RichText::new(name).size(13.).color(Color32::WHITE);
ui.add(Label::new(name).wrap_mode(TextWrapMode::Truncate));
}).response
})
.response
}
}

View File

@ -20,24 +20,29 @@ impl<'a> StreamEvent<'a> {
Some(i) => match i.variant().str() {
Some(i) => Some(Image::from_uri(i)),
None => None,
}
},
None => None,
};
Self { event, picture: cover, services }
Self {
event,
picture: cover,
services,
}
}
}
impl Widget for StreamEvent<'_> {
fn ui(self, ui: &mut Ui) -> Response {
ui.vertical(|ui| {
ui.style_mut()
.spacing.item_spacing = Vec2::new(12., 16.);
ui.style_mut().spacing.item_spacing = Vec2::new(12., 16.);
let host = match self.event.find_tag_value(|t| t[0].variant().str() == Some("p") && t[3].variant().str() == Some("host")) {
let host = match self.event.find_tag_value(|t| {
t[0].variant().str() == Some("p") && t[3].variant().str() == Some("host")
}) {
Some(t) => match t.variant() {
NdbStrVariant::Id(i) => i,
NdbStrVariant::Str(s) => self.event.pubkey(),
}
None => self.event.pubkey()
},
None => self.event.pubkey(),
};
let w = ui.available_width();
let h = (w / 16.0) * 9.0;
@ -49,9 +54,7 @@ impl Widget for StreamEvent<'_> {
.rounding(Rounding::same(12.))
.sense(Sense::click())
.ui(ui),
None => {
VideoPlaceholder.ui(ui)
}
None => VideoPlaceholder.ui(ui),
};
if img.clicked() {
self.services.navigate(Routes::Event {
@ -60,7 +63,7 @@ impl Widget for StreamEvent<'_> {
});
}
ui.horizontal(|ui| {
ui.add(Avatar::public_key(self.services.profile, &host).size(40.));
ui.add(Avatar::pubkey(&host, self.services).size(40.));
let title = RichText::new(match self.event.get_tag_value("title") {
Some(s) => s.variant().str().unwrap_or("Unknown"),
None => "Unknown",
@ -69,6 +72,7 @@ impl Widget for StreamEvent<'_> {
.color(Color32::WHITE);
ui.add(Label::new(title).wrap_mode(TextWrapMode::Truncate));
})
}).response
})
.response
}
}

View File

@ -25,6 +25,7 @@ impl Widget for StreamList<'_> {
ui.add(StreamEvent::new(event, self.services));
}
})
}).response
})
.response
}
}

View File

@ -12,7 +12,7 @@ impl StreamPlayer {
player: Player::new(ctx, url).map_or(None, |mut f| {
f.start();
Some(f)
})
}),
}
}
}

View File

@ -9,7 +9,11 @@ impl Widget for VideoPlaceholder {
let img_size = Vec2::new(w, h);
let (response, painter) = ui.allocate_painter(img_size, Sense::click());
painter.rect_filled(Rect::EVERYTHING, Rounding::same(12.), Color32::from_rgb(200, 200, 200));
painter.rect_filled(
Rect::EVERYTHING,
Rounding::same(12.),
Color32::from_rgb(200, 200, 200),
);
response
}
}