feat: emit updates from relay-worker

This commit is contained in:
2024-01-18 12:26:47 +00:00
parent 6d8c0325e4
commit 2d4c323cf7
7 changed files with 135 additions and 46 deletions

View File

@ -8,34 +8,47 @@ import useSubscribe from "@/Hooks/useSubscribe";
import { Relay } from "@/system"; import { Relay } from "@/system";
import { Day } from "@/Utils/Const"; import { Day } from "@/Utils/Const";
export function useWorkerRelayView(id: string, filters: Array<ReqFilter>, maxWindow?: number) { export function useWorkerRelayView(id: string, filters: Array<ReqFilter>, leaveOpen?: boolean, maxWindow?: number) {
const [events, setEvents] = useState<Array<NostrEvent>>([]); const [events, setEvents] = useState<Array<NostrEvent>>([]);
const [rb, setRb] = useState<RequestBuilder>(); const [rb, setRb] = useState<RequestBuilder>();
useRequestBuilder(rb); useRequestBuilder(rb);
useEffect(() => { useEffect(() => {
Relay.req([ Relay.req({
"REQ", id: `${id}+latest`,
`${id}+latest`, filters: filters.map(f => ({
...filters.map(f => ({
...f, ...f,
until: undefined, until: undefined,
since: undefined, since: undefined,
limit: 1, limit: 1,
})), })),
]).then(latest => { }).then(latest => {
const rb = new RequestBuilder(id); const rb = new RequestBuilder(id);
filters filters
.map((f, i) => ({ .map((f, i) => ({
...f, ...f,
limit: undefined, limit: undefined,
until: undefined, until: undefined,
since: latest?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined), since: latest.results?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined),
})) }))
.forEach(f => rb.withBareFilter(f)); .forEach(f => rb.withBareFilter(f));
setRb(rb); setRb(rb);
}); });
Relay.req(["REQ", id, ...filters]).then(setEvents); Relay.req({ id, filters, leaveOpen }).then(res => {
setEvents(res.results);
if (res.port) {
res.port.addEventListener("message", ev => {
const evs = ev.data as Array<NostrEvent>;
if (evs.length > 0) {
setEvents(x => [...x, ...evs]);
}
});
res.port.start();
}
});
return () => {
Relay.close(id);
};
}, [id, filters, maxWindow]); }, [id, filters, maxWindow]);
return events as Array<TaggedNostrEvent>; return events as Array<TaggedNostrEvent>;
@ -47,28 +60,27 @@ export function useWorkerRelayViewCount(id: string, filters: Array<ReqFilter>, m
useRequestBuilder(rb); useRequestBuilder(rb);
useEffect(() => { useEffect(() => {
Relay.req([ Relay.req({
"REQ", id: `${id}+latest`,
`${id}+latest`, filters: filters.map(f => ({
...filters.map(f => ({
...f, ...f,
until: undefined, until: undefined,
since: undefined, since: undefined,
limit: 1, limit: 1,
})), })),
]).then(latest => { }).then(latest => {
const rb = new RequestBuilder(id); const rb = new RequestBuilder(id);
filters filters
.map((f, i) => ({ .map((f, i) => ({
...f, ...f,
limit: undefined, limit: undefined,
until: undefined, until: undefined,
since: latest?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined), since: latest.results?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined),
})) }))
.forEach(f => rb.withBareFilter(f)); .forEach(f => rb.withBareFilter(f));
setRb(rb); setRb(rb);
}); });
Relay.count(["REQ", id, ...filters]).then(setCount); Relay.count({ id, filters }).then(setCount);
}, [id, filters, maxWindow]); }, [id, filters, maxWindow]);
return count; return count;
@ -78,15 +90,16 @@ export function useFollowsTimelineView(limit = 20) {
const follows = useLogin(s => s.follows.item); const follows = useLogin(s => s.follows.item);
const kinds = [EventKind.TextNote, EventKind.Repost, EventKind.Polls]; const kinds = [EventKind.TextNote, EventKind.Repost, EventKind.Polls];
const filter = useMemo( const filter = useMemo(() => {
() => ({ return [
authors: follows, {
kinds, authors: follows,
limit, kinds,
}), limit,
[follows, limit], },
); ];
return useSubscribe("follows-timeline", filter); }, [follows, limit]);
return useWorkerRelayView("follows-timeline", filter, true, Day * 7);
} }
export function useNotificationsView() { export function useNotificationsView() {
@ -101,7 +114,7 @@ export function useNotificationsView() {
}, },
]; ];
}, [publicKey]); }, [publicKey]);
return useSubscribe("notifications", req[0]); return useWorkerRelayView("notifications", req, true, Day * 30);
} }
export function useReactionsView(ids: Array<NostrLink>, leaveOpen = true) { export function useReactionsView(ids: Array<NostrLink>, leaveOpen = true) {
@ -123,7 +136,7 @@ export function useReactionsView(ids: Array<NostrLink>, leaveOpen = true) {
return rb.buildRaw(); return rb.buildRaw();
}, [ids]); }, [ids]);
return useSubscribe("reactions", req[0]); return useWorkerRelayView("reactions", req, leaveOpen, undefined);
} }
export function useReactionsViewCount(ids: Array<NostrLink>, leaveOpen = true) { export function useReactionsViewCount(ids: Array<NostrLink>, leaveOpen = true) {
@ -160,5 +173,5 @@ export function useFollowsContactListView() {
}, },
]; ];
}, [follows]); }, [follows]);
return useSubscribe("follows-contacts-relays", filter[0]); return useWorkerRelayView("follows-contacts-relays", filter, undefined, undefined);
} }

View File

@ -18,6 +18,7 @@
"dependencies": { "dependencies": {
"@sqlite.org/sqlite-wasm": "^3.44.2-build3", "@sqlite.org/sqlite-wasm": "^3.44.2-build3",
"debug": "^4.3.4", "debug": "^4.3.4",
"eventemitter3": "^5.0.1",
"uuid": "^9.0.1" "uuid": "^9.0.1"
}, },
"devDependencies": { "devDependencies": {

View File

@ -5,7 +5,7 @@ import { v4 as uuid } from "uuid";
export class WorkerRelayInterface { export class WorkerRelayInterface {
#worker: Worker; #worker: Worker;
#log = (msg: any) => console.debug(msg); #log = (msg: any) => console.debug(msg);
#commandQueue: Map<string, (v: unknown) => void> = new Map(); #commandQueue: Map<string, (v: unknown, ports: ReadonlyArray<MessagePort>) => void> = new Map();
constructor(path: string) { constructor(path: string) {
this.#log(`Module path: ${path}`); this.#log(`Module path: ${path}`);
@ -14,7 +14,7 @@ export class WorkerRelayInterface {
const cmd = e.data as WorkerMessage<any>; const cmd = e.data as WorkerMessage<any>;
if (cmd.cmd === "reply") { if (cmd.cmd === "reply") {
const q = this.#commandQueue.get(cmd.id); const q = this.#commandQueue.get(cmd.id);
q?.(cmd); q?.(cmd, e.ports);
this.#commandQueue.delete(cmd.id); this.#commandQueue.delete(cmd.id);
} }
}; };
@ -37,7 +37,7 @@ export class WorkerRelayInterface {
} }
async req(req: ReqCommand) { async req(req: ReqCommand) {
return await this.#workerRpc<ReqCommand, Array<NostrEvent>>("req", req); return await this.#workerRpc<ReqCommand, { results: Array<NostrEvent>; port?: Readonly<MessagePort> }>("req", req);
} }
async count(req: ReqCommand) { async count(req: ReqCommand) {
@ -48,6 +48,10 @@ export class WorkerRelayInterface {
return await this.#workerRpc<void, Record<string, number>>("summary"); return await this.#workerRpc<void, Record<string, number>>("summary");
} }
async close(id: string) {
return await this.#workerRpc<string, boolean>("close", id);
}
#workerRpc<T, R>(cmd: string, args?: T, timeout = 30_000) { #workerRpc<T, R>(cmd: string, args?: T, timeout = 30_000) {
const id = uuid(); const id = uuid();
const msg = { const msg = {
@ -58,13 +62,14 @@ export class WorkerRelayInterface {
this.#worker.postMessage(msg); this.#worker.postMessage(msg);
return new Promise<R>((resolve, reject) => { return new Promise<R>((resolve, reject) => {
let t: ReturnType<typeof setTimeout>; let t: ReturnType<typeof setTimeout>;
this.#commandQueue.set(id, v => { this.#commandQueue.set(id, (v, ports) => {
clearTimeout(t); clearTimeout(t);
const cmdReply = v as WorkerMessage<R>; const cmdReply = v as WorkerMessage<R>;
resolve(cmdReply.args); resolve({ ...cmdReply.args, port: ports.length > 0 ? ports[0] : undefined });
}); });
t = setTimeout(() => { t = setTimeout(() => {
reject("timeout"); reject("timeout");
this.#commandQueue.delete(id);
}, timeout); }, timeout);
}); });
} }

View File

@ -1,8 +1,13 @@
import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-wasm"; import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-wasm";
import debug from "debug"; import debug from "debug";
import { EventEmitter } from "eventemitter3";
import { NostrEvent, ReqFilter, unixNowMs } from "./types"; import { NostrEvent, ReqFilter, unixNowMs } from "./types";
export class WorkerRelay { export interface WorkerRelayEvents {
event: (evs: Array<NostrEvent>) => void;
}
export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
#sqlite?: Sqlite3Static; #sqlite?: Sqlite3Static;
#log = debug("WorkerRelay"); #log = debug("WorkerRelay");
#db?: Database; #db?: Database;
@ -72,6 +77,7 @@ export class WorkerRelay {
}); });
if (eventInserted) { if (eventInserted) {
this.#log(`Inserted: kind=${ev.kind},authors=${ev.pubkey},id=${ev.id}`); this.#log(`Inserted: kind=${ev.kind},authors=${ev.pubkey},id=${ev.id}`);
this.emit("event", [ev]);
} }
return eventInserted; return eventInserted;
} }
@ -90,6 +96,7 @@ export class WorkerRelay {
}); });
if (eventsInserted.length > 0) { if (eventsInserted.length > 0) {
this.#log(`Inserted Batch: ${eventsInserted.length}/${evs.length}`); this.#log(`Inserted Batch: ${eventsInserted.length}/${evs.length}`);
this.emit("event", eventsInserted);
} }
return eventsInserted.length > 0; return eventsInserted.length > 0;
} }

View File

@ -1,6 +1,6 @@
export interface WorkerMessage<T> { export interface WorkerMessage<T> {
id: string; id: string;
cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary"; cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary" | "close";
args: T; args: T;
} }
@ -14,7 +14,11 @@ export interface NostrEvent {
sig: string; sig: string;
} }
export type ReqCommand = [cmd: "REQ", id: string, ...filters: Array<ReqFilter>]; export interface ReqCommand {
id: string;
filters: Array<ReqFilter>;
leaveOpen?: boolean;
}
export interface ReqFilter { export interface ReqFilter {
ids?: string[]; ids?: string[];

View File

@ -4,14 +4,41 @@ import { WorkQueueItem, barrierQueue, processWorkQueue } from "./queue";
import { WorkerRelay } from "./relay"; import { WorkerRelay } from "./relay";
import { NostrEvent, ReqCommand, ReqFilter, WorkerMessage } from "./types"; import { NostrEvent, ReqCommand, ReqFilter, WorkerMessage } from "./types";
const relay = new WorkerRelay(); interface PortedFilter {
filters: Array<ReqFilter>;
port: MessagePort;
}
async function reply<T>(id: string, obj?: T) { // Active open subscriptions awaiting new events
globalThis.postMessage({ const ActiveSubscriptions = new Map<string, PortedFilter>();
id,
cmd: "reply", const relay = new WorkerRelay();
args: obj, relay.on("event", evs => {
} as WorkerMessage<T>); for (const pf of ActiveSubscriptions.values()) {
const pfSend = [];
for (const ev of evs) {
for (const fx of pf.filters) {
if (eventMatchesFilter(ev, fx)) {
pfSend.push(ev);
continue;
}
}
}
if (pfSend.length > 0) {
pf.port.postMessage(pfSend);
}
}
});
async function reply<T>(id: string, obj?: T, transferables?: Transferable[]) {
globalThis.postMessage(
{
id,
cmd: "reply",
args: obj,
} as WorkerMessage<T>,
transferables ?? [],
);
} }
// Event inserter queue // Event inserter queue
@ -25,7 +52,7 @@ async function insertBatch() {
} }
setTimeout(() => insertBatch(), 100); setTimeout(() => insertBatch(), 100);
let cmdQueue: Array<WorkQueueItem> = []; const cmdQueue: Array<WorkQueueItem> = [];
processWorkQueue(cmdQueue, 50); processWorkQueue(cmdQueue, 50);
globalThis.onclose = () => { globalThis.onclose = () => {
@ -64,14 +91,26 @@ globalThis.onmessage = ev => {
reply(msg.id, true); reply(msg.id, true);
break; break;
} }
case "close": {
ActiveSubscriptions.delete(msg.args as string);
reply(msg.id, true);
break;
}
case "req": { case "req": {
barrierQueue(cmdQueue, async () => { barrierQueue(cmdQueue, async () => {
const req = msg.args as ReqCommand; const req = msg.args as ReqCommand;
const chan = new MessageChannel();
if (req.leaveOpen) {
ActiveSubscriptions.set(req.id, {
filters: req.filters,
port: chan.port1,
});
}
const results = []; const results = [];
for (const r of req.slice(2)) { for (const r of req.filters) {
results.push(...relay.req(r as ReqFilter)); results.push(...relay.req(r as ReqFilter));
} }
reply(msg.id, results); reply(msg.id, { results }, req.leaveOpen ? [chan.port2] : undefined);
}); });
break; break;
} }
@ -79,7 +118,7 @@ globalThis.onmessage = ev => {
barrierQueue(cmdQueue, async () => { barrierQueue(cmdQueue, async () => {
const req = msg.args as ReqCommand; const req = msg.args as ReqCommand;
let results = 0; let results = 0;
for (const r of req.slice(2)) { for (const r of req.filters) {
const c = relay.count(r as ReqFilter); const c = relay.count(r as ReqFilter);
results += c; results += c;
} }
@ -103,3 +142,22 @@ globalThis.onmessage = ev => {
reply(msg.id, { error: e }); reply(msg.id, { error: e });
} }
}; };
export function eventMatchesFilter(ev: NostrEvent, filter: ReqFilter) {
if (filter.since && ev.created_at < filter.since) {
return false;
}
if (filter.until && ev.created_at > filter.until) {
return false;
}
if (!(filter.ids?.includes(ev.id) ?? true)) {
return false;
}
if (!(filter.authors?.includes(ev.pubkey) ?? true)) {
return false;
}
if (!(filter.kinds?.includes(ev.kind) ?? true)) {
return false;
}
return true;
}

View File

@ -3107,6 +3107,7 @@ __metadata:
"@types/debug": ^4.1.12 "@types/debug": ^4.1.12
"@types/uuid": ^9.0.7 "@types/uuid": ^9.0.7
debug: ^4.3.4 debug: ^4.3.4
eventemitter3: ^5.0.1
typescript: ^5.2.2 typescript: ^5.2.2
uuid: ^9.0.1 uuid: ^9.0.1
languageName: unknown languageName: unknown