feat: NoteStore event-emitter

This commit is contained in:
Kieran 2024-01-08 15:09:55 +00:00
parent ca2cb76380
commit 5d3abc553a
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
9 changed files with 48 additions and 78 deletions

View File

@ -28,7 +28,7 @@ export function useRefreshFeedCache<T>(c: RefreshFeedCache<T>, leaveOpen = false
const q = system.Query(NoopStore, sub); const q = system.Query(NoopStore, sub);
let t: ReturnType<typeof setTimeout> | undefined; let t: ReturnType<typeof setTimeout> | undefined;
let tBuf: Array<TaggedNostrEvent> = []; let tBuf: Array<TaggedNostrEvent> = [];
const releaseOnEvent = q.feed.onEvent(evs => { q.feed.on("event", evs => {
if (!t) { if (!t) {
tBuf = [...evs]; tBuf = [...evs];
t = setTimeout(() => { t = setTimeout(() => {
@ -41,9 +41,9 @@ export function useRefreshFeedCache<T>(c: RefreshFeedCache<T>, leaveOpen = false
}); });
q.uncancel(); q.uncancel();
return () => { return () => {
q.feed.off("event");
q.cancel(); q.cancel();
q.sendClose(); q.sendClose();
releaseOnEvent();
}; };
} }
}, [sub]); }, [sub]);

View File

@ -14,11 +14,11 @@ const useRequestBuilder = <TStore extends NoteStore, TSnapshot = ReturnType<TSto
const subscribe = (onChanged: () => void) => { const subscribe = (onChanged: () => void) => {
if (rb) { if (rb) {
const q = system.Query<TStore>(type, rb); const q = system.Query<TStore>(type, rb);
const release = q.feed.hook(onChanged); q.feed.on("event", onChanged);
q.uncancel(); q.uncancel();
return () => { return () => {
q.feed.off("event", onChanged);
q.cancel(); q.cancel();
release();
}; };
} }
return () => { return () => {

View File

@ -43,7 +43,7 @@ export * from "./cache/user-relays";
export * from "./cache/user-metadata"; export * from "./cache/user-metadata";
export * from "./cache/relay-metric"; export * from "./cache/relay-metric";
export * from "./worker/system-worker"; export * from "./worker";
export interface SystemInterface { export interface SystemInterface {
/** /**
@ -79,7 +79,7 @@ export interface SystemInterface {
* @param req Request to send to relays * @param req Request to send to relays
* @param cb A callback which will fire every 100ms when new data is received * @param cb A callback which will fire every 100ms when new data is received
*/ */
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void): Promise<Array<TaggedNostrEvent>>; Fetch(req: RequestBuilder, cb?: (evs: ReadonlyArray<TaggedNostrEvent>) => void): Promise<Array<TaggedNostrEvent>>;
/** /**
* Create a new permanent connection to a relay * Create a new permanent connection to a relay

View File

@ -17,10 +17,19 @@ export interface NostrConnectionPoolEvents {
notice: (address: string, msg: string) => void; notice: (address: string, msg: string) => void;
} }
export type ConnectionPool = {
getState(): ConnectionStateSnapshot[];
getConnection(id: string): Connection | undefined;
connect(address: string, options: RelaySettings, ephemeral: boolean): Promise<Connection | undefined>;
disconnect(address: string): void;
broadcast(system: SystemInterface, ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise<OkResponse[]>;
broadcastTo(address: string, ev: NostrEvent): Promise<OkResponse>;
} & EventEmitter<NostrConnectionPoolEvents>;
/** /**
* Simple connection pool containing connections to multiple nostr relays * Simple connection pool containing connections to multiple nostr relays
*/ */
export class NostrConnectionPool extends EventEmitter<NostrConnectionPoolEvents> { export class NostrConnectionPool extends EventEmitter<NostrConnectionPoolEvents> implements ConnectionPool {
#log = debug("NostrConnectionPool"); #log = debug("NostrConnectionPool");
/** /**

View File

@ -73,28 +73,17 @@ export class NostrQueryManager extends EventEmitter<NostrQueryManagerEvents> {
/** /**
* Async fetch results * Async fetch results
*/ */
fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void) { fetch(req: RequestBuilder, cb?: (evs: ReadonlyArray<TaggedNostrEvent>) => void) {
const q = this.query(NoteCollection, req); const q = this.query(NoteCollection, req);
return new Promise<Array<TaggedNostrEvent>>(resolve => { return new Promise<Array<TaggedNostrEvent>>(resolve => {
let t: ReturnType<typeof setTimeout> | undefined; let t: ReturnType<typeof setTimeout> | undefined;
let tBuf: Array<TaggedNostrEvent> = []; let tBuf: Array<TaggedNostrEvent> = [];
const releaseOnEvent = cb if (cb) {
? q.feed.onEvent(evs => { q.feed.on("event", cb);
if (!t) { }
tBuf = [...evs]; q.feed.on("progress", loading => {
t = setTimeout(() => { if (!loading) {
t = undefined; q.feed.off("event");
cb(tBuf);
}, 100);
} else {
tBuf.push(...evs);
}
})
: undefined;
const releaseFeedHook = q.feed.hook(() => {
if (q.progress === 1) {
releaseOnEvent?.();
releaseFeedHook();
q.cancel(); q.cancel();
resolve(unwrap((q.feed as NoteCollection).snapshot.data)); resolve(unwrap((q.feed as NoteCollection).snapshot.data));
} }

View File

@ -228,7 +228,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
return this.#queryManager.get(id); return this.#queryManager.get(id);
} }
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void) { Fetch(req: RequestBuilder, cb?: (evs: ReadonlyArray<TaggedNostrEvent>) => void) {
return this.#queryManager.fetch(req, cb); return this.#queryManager.fetch(req, cb);
} }

View File

@ -1,6 +1,7 @@
import { appendDedupe, SortedMap } from "@snort/shared"; import { appendDedupe, SortedMap } from "@snort/shared";
import { EventExt, EventType, TaggedNostrEvent, u256 } from "."; import { EventExt, EventType, TaggedNostrEvent, u256 } from ".";
import { findTag } from "./utils"; import { findTag } from "./utils";
import EventEmitter from "eventemitter3";
export interface StoreSnapshot<TSnapshot> { export interface StoreSnapshot<TSnapshot> {
data: TSnapshot | undefined; data: TSnapshot | undefined;
@ -28,28 +29,25 @@ export type OnEventCallbackRelease = () => void;
export type OnEoseCallback = (c: string) => void; export type OnEoseCallback = (c: string) => void;
export type OnEoseCallbackRelease = () => void; export type OnEoseCallbackRelease = () => void;
export interface NostrStoreEvents {
progress: (loading: boolean) => void;
event: (evs: Readonly<Array<TaggedNostrEvent>>) => void;
}
/** /**
* Generic note store interface * Generic note store interface
*/ */
export abstract class NoteStore { export abstract class NoteStore extends EventEmitter<NostrStoreEvents> {
abstract add(ev: Readonly<TaggedNostrEvent> | Readonly<Array<TaggedNostrEvent>>): void; abstract add(ev: Readonly<TaggedNostrEvent> | Readonly<Array<TaggedNostrEvent>>): void;
abstract clear(): void; abstract clear(): void;
// react hooks
abstract hook(cb: NoteStoreHook): NoteStoreHookRelease;
abstract getSnapshotData(): NoteStoreSnapshotData | undefined; abstract getSnapshotData(): NoteStoreSnapshotData | undefined;
// events
abstract onEvent(cb: OnEventCallback): OnEventCallbackRelease;
abstract get snapshot(): StoreSnapshot<NoteStoreSnapshotData>; abstract get snapshot(): StoreSnapshot<NoteStoreSnapshotData>;
abstract get loading(): boolean; abstract get loading(): boolean;
abstract set loading(v: boolean); abstract set loading(v: boolean);
} }
export abstract class HookedNoteStore<TSnapshot extends NoteStoreSnapshotData> implements NoteStore { export abstract class HookedNoteStore<TSnapshot extends NoteStoreSnapshotData> extends NoteStore {
#hooks: Array<NoteStoreHook> = [];
#eventHooks: Array<OnEventCallback> = [];
#loading = true; #loading = true;
#storeSnapshot: StoreSnapshot<TSnapshot> = { #storeSnapshot: StoreSnapshot<TSnapshot> = {
clear: () => this.clear(), clear: () => this.clear(),
@ -59,6 +57,7 @@ export abstract class HookedNoteStore<TSnapshot extends NoteStoreSnapshotData> i
}; };
#needsSnapshot = true; #needsSnapshot = true;
#nextNotifyTimer?: ReturnType<typeof setTimeout>; #nextNotifyTimer?: ReturnType<typeof setTimeout>;
#bufEmit: Array<TaggedNostrEvent> = [];
get snapshot() { get snapshot() {
this.#updateSnapshot(); this.#updateSnapshot();
@ -71,56 +70,29 @@ export abstract class HookedNoteStore<TSnapshot extends NoteStoreSnapshotData> i
set loading(v: boolean) { set loading(v: boolean) {
this.#loading = v; this.#loading = v;
this.onChange([]); this.emit("progress", v);
} }
abstract add(ev: Readonly<TaggedNostrEvent> | Readonly<Array<TaggedNostrEvent>>): void; abstract override add(ev: Readonly<TaggedNostrEvent> | Readonly<Array<TaggedNostrEvent>>): void;
abstract clear(): void; abstract override clear(): void;
hook(cb: NoteStoreHook): NoteStoreHookRelease {
this.#hooks.push(cb);
return () => {
const idx = this.#hooks.findIndex(a => a === cb);
this.#hooks.splice(idx, 1);
};
}
getSnapshotData() { getSnapshotData() {
this.#updateSnapshot(); this.#updateSnapshot();
return this.#storeSnapshot.data; return this.#storeSnapshot.data;
} }
onEvent(cb: OnEventCallback): OnEventCallbackRelease {
const existing = this.#eventHooks.find(a => a === cb);
if (!existing) {
this.#eventHooks.push(cb);
return () => {
const idx = this.#eventHooks.findIndex(a => a === cb);
this.#eventHooks.splice(idx, 1);
};
}
return () => {
//noop
};
}
protected abstract takeSnapshot(): TSnapshot | undefined; protected abstract takeSnapshot(): TSnapshot | undefined;
protected onChange(changes: Readonly<Array<TaggedNostrEvent>>): void { protected onChange(changes: Readonly<Array<TaggedNostrEvent>>): void {
this.#needsSnapshot = true; this.#needsSnapshot = true;
this.#bufEmit.push(...changes);
if (!this.#nextNotifyTimer) { if (!this.#nextNotifyTimer) {
this.#nextNotifyTimer = setTimeout(() => { this.#nextNotifyTimer = setTimeout(() => {
this.#nextNotifyTimer = undefined; this.#nextNotifyTimer = undefined;
for (const hk of this.#hooks) { this.emit("event", this.#bufEmit);
hk(); this.#bufEmit = [];
}
}, 500); }, 500);
} }
if (changes.length > 0) {
for (const hkE of this.#eventHooks) {
hkE(changes);
}
}
} }
#updateSnapshot() { #updateSnapshot() {

View File

@ -1,4 +1,4 @@
export const enum NostrSystemCommand { export const enum WorkerCommand {
OkResponse, OkResponse,
ErrorResponse, ErrorResponse,
Init, Init,
@ -6,8 +6,8 @@ export const enum NostrSystemCommand {
DisconnectRelay, DisconnectRelay,
} }
export interface NostrSystemMessage<T> { export interface WorkerMessage<T> {
id: string; id: string;
type: NostrSystemCommand; type: WorkerCommand;
data: T; data: T;
} }

View File

@ -15,7 +15,7 @@ import {
} from ".."; } from "..";
import { NostrSystemEvents, NostrsystemProps } from "../nostr-system"; import { NostrSystemEvents, NostrsystemProps } from "../nostr-system";
import { Query } from "../query"; import { Query } from "../query";
import { NostrSystemCommand, NostrSystemMessage } from "."; import { WorkerCommand, WorkerMessage } from ".";
export class SystemWorker extends EventEmitter<NostrSystemEvents> implements SystemInterface { export class SystemWorker extends EventEmitter<NostrSystemEvents> implements SystemInterface {
#worker: Worker; #worker: Worker;
@ -36,7 +36,7 @@ export class SystemWorker extends EventEmitter<NostrSystemEvents> implements Sys
} }
async Init() { async Init() {
await this.#workerRpc<void, string>(NostrSystemCommand.Init, undefined); await this.#workerRpc<void, string>(WorkerCommand.Init, undefined);
} }
GetQuery(id: string): Query | undefined { GetQuery(id: string): Query | undefined {
@ -83,19 +83,19 @@ export class SystemWorker extends EventEmitter<NostrSystemEvents> implements Sys
throw new Error("Method not implemented."); throw new Error("Method not implemented.");
} }
#workerRpc<T, R>(type: NostrSystemCommand, data: T, timeout = 5_000) { #workerRpc<T, R>(type: WorkerCommand, data: T, timeout = 5_000) {
const id = uuid(); const id = uuid();
this.#worker.postMessage({ this.#worker.postMessage({
id, id,
type, type,
data, data,
} as NostrSystemMessage<T>); } as WorkerMessage<T>);
return new Promise<R>((resolve, reject) => { return new Promise<R>((resolve, reject) => {
let t: ReturnType<typeof setTimeout>; let t: ReturnType<typeof setTimeout>;
this.#commandQueue.set(id, v => { this.#commandQueue.set(id, v => {
clearTimeout(t); clearTimeout(t);
const cmdReply = v as NostrSystemMessage<R>; const cmdReply = v as WorkerMessage<R>;
if (cmdReply.type === NostrSystemCommand.OkResponse) { if (cmdReply.type === WorkerCommand.OkResponse) {
resolve(cmdReply.data); resolve(cmdReply.data);
} else { } else {
reject(cmdReply.data); reject(cmdReply.data);