useSubscribe, handle emitted requests in sqlite
This commit is contained in:
@ -131,6 +131,10 @@ class InMemoryDB {
|
||||
}
|
||||
}
|
||||
|
||||
count(filter: Filter): number {
|
||||
return this.findArray(filter).length;
|
||||
}
|
||||
|
||||
find(filter: Filter, callback: (event: TaggedNostrEvent) => void): void {
|
||||
this.findArray(filter).forEach(event => {
|
||||
callback(event);
|
||||
|
@ -10,6 +10,8 @@ import { NostrEvent, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nos
|
||||
import { RelayInfo } from "./relay-info";
|
||||
import EventKind from "./event-kind";
|
||||
import { EventExt } from "./event-ext";
|
||||
import { getHex64 } from "./utils";
|
||||
import inMemoryDB from "./InMemoryDB";
|
||||
|
||||
/**
|
||||
* Relay settings
|
||||
@ -210,6 +212,12 @@ export class Connection extends EventEmitter<ConnectionEvents> {
|
||||
}
|
||||
*/
|
||||
|
||||
const id = getHex64(e.data as string, "id");
|
||||
if (inMemoryDB.has(id)) {
|
||||
this.#log("Already have, skip processing %s", id);
|
||||
return;
|
||||
}
|
||||
|
||||
const msg = JSON.parse(e.data as string) as Array<string | NostrEvent | boolean>;
|
||||
const tag = msg[0] as string;
|
||||
switch (tag) {
|
||||
@ -232,7 +240,7 @@ export class Connection extends EventEmitter<ConnectionEvents> {
|
||||
} as TaggedNostrEvent;
|
||||
|
||||
if (!EventExt.isValid(ev)) {
|
||||
//this.#log("Rejecting invalid event %O", ev);
|
||||
this.#log("Rejecting invalid event %O", ev);
|
||||
return;
|
||||
}
|
||||
this.emit("event", msg[1] as string, ev);
|
||||
|
@ -109,7 +109,7 @@ export interface SystemInterface {
|
||||
/**
|
||||
* Push an event into the system from external source
|
||||
*/
|
||||
HandleEvent(ev: TaggedNostrEvent): void;
|
||||
HandleEvent(subId: string, ev: TaggedNostrEvent): void;
|
||||
|
||||
/**
|
||||
* Send an event to all permanent connections
|
||||
|
@ -24,12 +24,13 @@ import { RelayMetadataLoader } from "./outbox-model";
|
||||
import { Optimizer, DefaultOptimizer } from "./query-optimizer";
|
||||
import { ConnectionPool, DefaultConnectionPool } from "./connection-pool";
|
||||
import { QueryManager } from "./query-manager";
|
||||
import inMemoryDB from "./InMemoryDB";
|
||||
|
||||
export interface NostrSystemEvents {
|
||||
change: (state: SystemSnapshot) => void;
|
||||
auth: (challenge: string, relay: string, cb: (ev: NostrEvent) => void) => void;
|
||||
event: (subId: string, ev: TaggedNostrEvent) => void;
|
||||
filters: (filter: BuiltRawReqFilter) => void;
|
||||
request: (subId: string, filter: BuiltRawReqFilter) => void;
|
||||
}
|
||||
|
||||
export interface NostrsystemProps {
|
||||
@ -122,6 +123,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
|
||||
this.pool.on("event", (_, sub, ev) => {
|
||||
ev.relays?.length && this.relayMetricsHandler.onEvent(ev.relays[0]);
|
||||
this.emit("event", sub, ev);
|
||||
inMemoryDB.handleEvent(ev);
|
||||
});
|
||||
this.pool.on("disconnect", (id, code) => {
|
||||
const c = this.pool.getConnection(id);
|
||||
@ -148,7 +150,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
|
||||
this.#queryManager.on("trace", t => {
|
||||
this.relayMetricsHandler.onTraceReport(t);
|
||||
});
|
||||
this.#queryManager.on("filters", (f: BuiltRawReqFilter) => this.emit("filters", f));
|
||||
this.#queryManager.on("request", (subId: string, f: BuiltRawReqFilter) => this.emit("request", subId, f));
|
||||
}
|
||||
|
||||
get Sockets(): ConnectionStateSnapshot[] {
|
||||
@ -189,13 +191,14 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
|
||||
return this.#queryManager.query(req);
|
||||
}
|
||||
|
||||
HandleEvent(ev: TaggedNostrEvent) {
|
||||
this.emit("event", "*", ev);
|
||||
HandleEvent(subId: string, ev: TaggedNostrEvent) {
|
||||
inMemoryDB.handleEvent(ev);
|
||||
this.emit("event", subId, ev);
|
||||
this.#queryManager.handleEvent(ev);
|
||||
}
|
||||
|
||||
async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise<OkResponse[]> {
|
||||
this.HandleEvent({ ...ev, relays: [] });
|
||||
this.HandleEvent("*", { ...ev, relays: [] });
|
||||
return await this.pool.broadcast(this, ev, cb);
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,7 @@ import { trimFilters } from "./request-trim";
|
||||
interface QueryManagerEvents {
|
||||
change: () => void;
|
||||
trace: (report: TraceReport) => void;
|
||||
filters: (req: BuiltRawReqFilter) => void;
|
||||
request: (subId: string, req: BuiltRawReqFilter) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -57,9 +57,9 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
||||
} else {
|
||||
const q = new Query(this.#system, req);
|
||||
q.on("trace", r => this.emit("trace", r));
|
||||
q.on("filters", fx => {
|
||||
q.on("request", (id, fx) => {
|
||||
this.#send(q, fx);
|
||||
this.emit("filters", fx);
|
||||
this.emit("request", id, fx);
|
||||
});
|
||||
|
||||
this.#queries.set(req.id, q);
|
||||
@ -78,7 +78,7 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
||||
async fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void) {
|
||||
const q = new Query(this.#system, req);
|
||||
q.on("trace", r => this.emit("trace", r));
|
||||
q.on("filters", fx => {
|
||||
q.on("request", (subId, fx) => {
|
||||
this.#send(q, fx);
|
||||
});
|
||||
if (cb) {
|
||||
|
@ -100,7 +100,7 @@ export interface TraceReport {
|
||||
export interface QueryEvents {
|
||||
loading: (v: boolean) => void;
|
||||
trace: (report: TraceReport) => void;
|
||||
filters: (req: BuiltRawReqFilter) => void;
|
||||
request: (subId: string, req: BuiltRawReqFilter) => void;
|
||||
event: (evs: Array<TaggedNostrEvent>) => void;
|
||||
end: () => void;
|
||||
}
|
||||
@ -329,11 +329,11 @@ export class Query extends EventEmitter<QueryEvents> {
|
||||
if (!(this.request.options?.skipDiff ?? false) && existing.length > 0) {
|
||||
const filters = this.request.buildDiff(this.#system, existing);
|
||||
this.#log("Build %s %O", this.id, filters);
|
||||
filters.forEach(f => this.emit("filters", f));
|
||||
filters.forEach(f => this.emit("request", this.id, f));
|
||||
} else {
|
||||
const filters = this.request.build(this.#system);
|
||||
this.#log("Build %s %O", this.id, filters);
|
||||
filters.forEach(f => this.emit("filters", f));
|
||||
filters.forEach(f => this.emit("request", this.id, f));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -124,7 +124,7 @@ export class SystemWorker extends EventEmitter<NostrSystemEvents> implements Sys
|
||||
this.#workerRpc(WorkerCommand.DisconnectRelay, address);
|
||||
}
|
||||
|
||||
HandleEvent(ev: TaggedNostrEvent): void {
|
||||
HandleEvent(subId: string, ev: TaggedNostrEvent): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user