fix: relay-worker insert replacable events

This commit is contained in:
Kieran 2024-01-18 16:00:20 +00:00
parent e3f8d48ddb
commit f147edd03c
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
8 changed files with 155 additions and 91 deletions

View File

@ -41,11 +41,7 @@ export default function useLoginFeed() {
const { publicKey: pubKey, follows } = login; const { publicKey: pubKey, follows } = login;
const { publisher, system } = useEventPublisher(); const { publisher, system } = useEventPublisher();
const followLists = useFollowsContactListView(); useFollowsContactListView();
useEffect(() => {
followLists.forEach(e => socialGraphInstance.handleEvent(e));
}, followLists);
useEffect(() => { useEffect(() => {
system.checkSigs = login.appData.item.preferences.checkSigs; system.checkSigs = login.appData.item.preferences.checkSigs;
}, [login]); }, [login]);

View File

@ -36,13 +36,13 @@ export function useWorkerRelayView(id: string, filters: Array<ReqFilter>, leaveO
...f, ...f,
limit: undefined, limit: undefined,
until: undefined, until: undefined,
since: latest.results?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined), since: latest.result?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined),
})) }))
.forEach(f => rb.withBareFilter(f)); .forEach(f => rb.withBareFilter(f));
setRb(rb); setRb(rb);
}); });
Relay.req({ id, filters, leaveOpen }).then(res => { Relay.req({ id, filters, leaveOpen }).then(res => {
setEvents(res.results); setEvents(res.result);
if (res.port) { if (res.port) {
res.port.addEventListener("message", ev => { res.port.addEventListener("message", ev => {
const evs = ev.data as Array<NostrEvent>; const evs = ev.data as Array<NostrEvent>;
@ -82,7 +82,7 @@ export function useWorkerRelayViewCount(id: string, filters: Array<ReqFilter>, m
...f, ...f,
limit: undefined, limit: undefined,
until: undefined, until: undefined,
since: latest.results?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined), since: latest.result?.at(i)?.created_at ?? (maxWindow ? unixNow() - maxWindow : undefined),
})) }))
.forEach(f => rb.withBareFilter(f)); .forEach(f => rb.withBareFilter(f));
setRb(rb); setRb(rb);

View File

@ -74,25 +74,33 @@ function RelayCacheStats() {
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
{Object.entries(counts) {Object.entries(counts).sort(([, a], [, b]) => a > b ? -1 : 1).map(([k, v]) => {
.sort(([, a], [, b]) => (a > b ? -1 : 1))
.map(([k, v]) => {
return ( return (
<tr key={k}> <tr key={k}>
<td>{k}</td> <td><FormattedNumber value={Number(k)} /></td>
<td> <td><FormattedNumber value={v} /></td>
<FormattedNumber value={v} />
</td>
</tr> </tr>
); );
})} })}
</tbody> </tbody>
</table> </table>
</div> </div>
<div> <div className="flex flex-col gap-2">
<AsyncButton onClick={() => { }}> <AsyncButton onClick={() => { }}>
<FormattedMessage defaultMessage="Clear" id="/GCoTA" /> <FormattedMessage defaultMessage="Clear" id="/GCoTA" />
</AsyncButton> </AsyncButton>
<AsyncButton onClick={async () => {
const data = await Relay.dump();
const url = URL.createObjectURL(new File([data], "snort.db", {
type: "application/octet-stream"
}));
const a = document.createElement("a");
a.href = url;
a.download = "snort.db";
a.click();
}}>
<FormattedMessage defaultMessage="Dump" id="f2CAxA" />
</AsyncButton>
</div> </div>
</div > </div >
); );

View File

@ -1,5 +1,4 @@
import { removeUndefined, throwIfOffline } from "@snort/shared"; import { removeUndefined, throwIfOffline } from "@snort/shared";
import LRUSet from "@snort/shared/src/LRUSet";
import { mapEventToProfile, NostrEvent, NostrSystem, ProfileLoaderService, socialGraphInstance } from "@snort/system"; import { mapEventToProfile, NostrEvent, NostrSystem, ProfileLoaderService, socialGraphInstance } from "@snort/system";
import { WorkerRelayInterface } from "@snort/worker-relay"; import { WorkerRelayInterface } from "@snort/worker-relay";
import WorkerRelayPath from "@snort/worker-relay/dist/worker?worker&url"; import WorkerRelayPath from "@snort/worker-relay/dist/worker?worker&url";
@ -63,28 +62,13 @@ export async function fetchProfile(key: string) {
} }
export const Relay = new WorkerRelayInterface(WorkerRelayPath); export const Relay = new WorkerRelayInterface(WorkerRelayPath);
let relayInitStarted = false;
export async function initRelayWorker() { export async function initRelayWorker() {
if (relayInitStarted) return;
relayInitStarted = true;
try { try {
if (await Relay.init()) { if (await Relay.init()) {
if (await Relay.open()) { if (await Relay.open()) {
await Relay.migrate(); await Relay.migrate();
const seen = new LRUSet<string>(100); System.on("event", (_, ev) => {
System.on("event", async (_, ev) => { Relay.event(ev);
if (seen.has(ev.id)) return;
seen.add(ev.id);
await Relay.event(ev);
});
System.on("request", async (subId, f) => {
const evs = await Relay.req(["REQ", "", ...f.filters]);
evs.forEach(ev => {
seen.add(ev.id);
queueMicrotask(() => {
System.HandleEvent(subId, { ...ev, relays: [] });
});
});
}); });
} }
} }

View File

@ -21,35 +21,39 @@ export class WorkerRelayInterface {
} }
async init() { async init() {
return await this.#workerRpc<void, boolean>("init"); return (await this.#workerRpc<void, boolean>("init")).result;
} }
async open() { async open() {
return await this.#workerRpc<void, boolean>("open"); return (await this.#workerRpc<void, boolean>("open")).result;
} }
async migrate() { async migrate() {
return await this.#workerRpc<void, boolean>("migrate"); return (await this.#workerRpc<void, boolean>("migrate")).result;
} }
async event(ev: NostrEvent) { async event(ev: NostrEvent) {
return await this.#workerRpc<NostrEvent, boolean>("event", ev); return (await this.#workerRpc<NostrEvent, boolean>("event", ev)).result;
} }
async req(req: ReqCommand) { async req(req: ReqCommand) {
return await this.#workerRpc<ReqCommand, { results: Array<NostrEvent>; port?: Readonly<MessagePort> }>("req", req); return await this.#workerRpc<ReqCommand, Array<NostrEvent>>("req", req);
} }
async count(req: ReqCommand) { async count(req: ReqCommand) {
return await this.#workerRpc<ReqCommand, number>("count", req); return (await this.#workerRpc<ReqCommand, number>("count", req)).result;
} }
async summary() { async summary() {
return await this.#workerRpc<void, Record<string, number>>("summary"); return (await this.#workerRpc<void, Record<string, number>>("summary")).result;
} }
async close(id: string) { async close(id: string) {
return await this.#workerRpc<string, boolean>("close", id); return (await this.#workerRpc<string, boolean>("close", id)).result;
}
async dump() {
return (await this.#workerRpc<void, Uint8Array>("dumpDb")).result;
} }
#workerRpc<T, R>(cmd: string, args?: T, timeout = 30_000) { #workerRpc<T, R>(cmd: string, args?: T, timeout = 30_000) {
@ -60,12 +64,18 @@ export class WorkerRelayInterface {
args, args,
} as WorkerMessage<T>; } as WorkerMessage<T>;
this.#worker.postMessage(msg); this.#worker.postMessage(msg);
return new Promise<R>((resolve, reject) => { return new Promise<{
result: R;
port: MessagePort | undefined;
}>((resolve, reject) => {
let t: ReturnType<typeof setTimeout>; let t: ReturnType<typeof setTimeout>;
this.#commandQueue.set(id, (v, ports) => { this.#commandQueue.set(id, (v, port) => {
clearTimeout(t); clearTimeout(t);
const cmdReply = v as WorkerMessage<R>; const cmdReply = v as WorkerMessage<R>;
resolve({ ...cmdReply.args, port: ports.length > 0 ? ports[0] : undefined }); resolve({
result: cmdReply.args,
port: port.length > 0 ? port[0] : undefined,
});
}); });
t = setTimeout(() => { t = setTimeout(() => {
reject("timeout"); reject("timeout");

View File

@ -9,8 +9,9 @@ export interface WorkerRelayEvents {
export class WorkerRelay extends EventEmitter<WorkerRelayEvents> { export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
#sqlite?: Sqlite3Static; #sqlite?: Sqlite3Static;
#log = debug("WorkerRelay"); #log = (...args: any[]) => console.debug(...args);
#db?: Database; #db?: Database;
#seenInserts = new Set<string>();
/** /**
* Initialize the SQLite driver * Initialize the SQLite driver
@ -101,27 +102,45 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
return eventsInserted.length > 0; return eventsInserted.length > 0;
} }
#deleteById(db: Database, ids: Array<string>) {
db.exec(`delete from events where id in (${this.#repeatParams(ids.length)})`, {
bind: ids,
});
}
#insertEvent(db: Database, ev: NostrEvent) { #insertEvent(db: Database, ev: NostrEvent) {
if (this.#seenInserts.has(ev.id)) return false;
const legacyReplacable = [0, 3, 41]; const legacyReplacable = [0, 3, 41];
if (legacyReplacable.includes(ev.kind) || (ev.kind >= 10_000 && ev.kind < 20_000)) { if (legacyReplacable.includes(ev.kind) || (ev.kind >= 10_000 && ev.kind < 20_000)) {
db.exec("delete from events where kind = ? and pubkey = ? and created < ?", { const oldEvents = db.selectValues("select id from events where kind = ? and pubkey = ? and created <= ?", [
bind: [ev.kind, ev.pubkey, ev.created_at], ev.kind,
}); ev.pubkey,
const oldDeleted = db.changes(); ev.created_at,
if (oldDeleted === 0) { ]) as Array<string>;
if (oldEvents.includes(ev.id)) {
// we already have this event, return
this.#seenInserts.add(ev.id);
if (oldEvents.length > 1) {
const toDelete = oldEvents.filter(a => a !== ev.id);
this.#deleteById(db, toDelete);
}
return false; return false;
} }
} }
if (ev.kind >= 30_000 && ev.kind < 40_000) { if (ev.kind >= 30_000 && ev.kind < 40_000) {
const dTag = ev.tags.find(a => a[0] === "d")![1]; const dTag = ev.tags.find(a => a[0] === "d")![1];
db.exec( const oldEvents = db.selectValues(
"delete from events where id in (select id from events, tags where events.id = tags.event_id and tags.key = ? and tags.value = ?)", "select id from events where id in (select id from events, tags where events.id = tags.event_id and tags.key = ? and tags.value = ?)",
{ ["d", dTag],
bind: ["d", dTag], ) as Array<string>;
}, if (oldEvents.includes(ev.id)) {
); // we have this version
const oldDeleted = db.changes(); this.#seenInserts.add(ev.id);
if (oldDeleted === 0) { if (oldEvents.length > 1) {
const toDelete = oldEvents.filter(a => a !== ev.id);
this.#deleteById(db, toDelete);
}
return false; return false;
} }
} }
@ -136,23 +155,21 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
}); });
} }
} }
this.#seenInserts.add(ev.id);
return eventInserted; return eventInserted;
} }
/** /**
* Query relay by nostr filter * Query relay by nostr filter
*/ */
req(req: ReqFilter) { req(id: string, req: ReqFilter) {
const start = unixNowMs(); const start = unixNowMs();
const [sql, params] = this.#buildQuery(req); const [sql, params] = this.#buildQuery(req);
const rows = this.#db?.exec(sql, { const res = this.#db?.selectArrays(sql, params);
bind: params, const results = res?.map(a => JSON.parse(a[0] as string) as NostrEvent) ?? [];
returnValue: "resultRows",
});
const results = rows?.map(a => JSON.parse(a[0] as string) as NostrEvent) ?? [];
const time = unixNowMs() - start; const time = unixNowMs() - start;
this.#log(`Query results took ${time.toLocaleString()}ms`); //this.#log(`Query ${id} results took ${time.toLocaleString()}ms`);
return results; return results;
} }
@ -182,38 +199,55 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
return Object.fromEntries(res?.map(a => [String(a[0]), a[1] as number]) ?? []); return Object.fromEntries(res?.map(a => [String(a[0]), a[1] as number]) ?? []);
} }
#buildQuery(req: ReqFilter, count = false) { /**
* Dump the database file
*/
async dump() {
const filePath = String(this.#db?.filename ?? "");
try {
this.#db?.close();
this.#db = undefined;
const dir = await navigator.storage.getDirectory();
// @ts-expect-error
for await (const [name, file] of dir) {
if (`/${name}` === filePath) {
const fh = await (file as FileSystemFileHandle).getFile();
const ret = new Uint8Array(await fh.arrayBuffer());
return ret;
}
}
} catch (e) {
console.error(e);
} finally {
this.open(filePath);
}
return new Uint8Array();
}
#buildQuery(req: ReqFilter, count = false): [string, Array<any>] {
const conditions: Array<string> = []; const conditions: Array<string> = [];
const params: Array<any> = []; const params: Array<any> = [];
const repeatParams = (n: number) => {
const ret: Array<string> = [];
for (let x = 0; x < n; x++) {
ret.push("?");
}
return ret.join(", ");
};
let sql = `select ${count ? "count(json)" : "json"} from events`; let sql = `select ${count ? "count(json)" : "json"} from events`;
const tags = Object.entries(req).filter(([k]) => k.startsWith("#")); const tags = Object.entries(req).filter(([k]) => k.startsWith("#"));
for (const [key, values] of tags) { for (const [key, values] of tags) {
const vArray = values as Array<string>; const vArray = values as Array<string>;
sql += ` inner join tags on events.id = tags.event_id and tags.key = ? and tags.value in (${repeatParams( sql += ` inner join tags on events.id = tags.event_id and tags.key = ? and tags.value in (${this.#repeatParams(
vArray.length, vArray.length,
)})`; )})`;
params.push(key.slice(1)); params.push(key.slice(1));
params.push(...vArray); params.push(...vArray);
} }
if (req.ids) { if (req.ids) {
conditions.push(`id in (${repeatParams(req.ids.length)})`); conditions.push(`id in (${this.#repeatParams(req.ids.length)})`);
params.push(...req.ids); params.push(...req.ids);
} }
if (req.authors) { if (req.authors) {
conditions.push(`pubkey in (${repeatParams(req.authors.length)})`); conditions.push(`pubkey in (${this.#repeatParams(req.authors.length)})`);
params.push(...req.authors); params.push(...req.authors);
} }
if (req.kinds) { if (req.kinds) {
conditions.push(`kind in (${repeatParams(req.kinds.length)})`); conditions.push(`kind in (${this.#repeatParams(req.kinds.length)})`);
params.push(...req.kinds); params.push(...req.kinds);
} }
if (req.since) { if (req.since) {
@ -233,6 +267,32 @@ export class WorkerRelay extends EventEmitter<WorkerRelayEvents> {
return [sql, params]; return [sql, params];
} }
#repeatParams(n: number) {
const ret: Array<string> = [];
for (let x = 0; x < n; x++) {
ret.push("?");
}
return ret.join(", ");
}
#replaceParamsDebug(sql: string, params: Array<number | string>) {
let res = "";
let cIdx = 0;
for (const chr of sql) {
if (chr === "?") {
const px = params[cIdx++];
if (typeof px === "number") {
res += px.toString();
} else if (typeof px === "string") {
res += `'${px}'`;
}
} else {
res += chr;
}
}
return res;
}
#migrate_v1() { #migrate_v1() {
this.#db?.transaction(db => { this.#db?.transaction(db => {
db.exec( db.exec(

View File

@ -1,6 +1,6 @@
export interface WorkerMessage<T> { export interface WorkerMessage<T> {
id: string; id: string;
cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary" | "close"; cmd: "reply" | "init" | "open" | "migrate" | "event" | "req" | "count" | "summary" | "close" | "dumpDb";
args: T; args: T;
} }

View File

@ -60,8 +60,6 @@ globalThis.onclose = () => {
}; };
globalThis.onmessage = ev => { globalThis.onmessage = ev => {
//console.debug(ev);
const msg = ev.data as WorkerMessage<any>; const msg = ev.data as WorkerMessage<any>;
try { try {
switch (msg.cmd) { switch (msg.cmd) {
@ -108,9 +106,9 @@ globalThis.onmessage = ev => {
} }
const results = []; const results = [];
for (const r of req.filters) { for (const r of req.filters) {
results.push(...relay.req(r as ReqFilter)); results.push(...relay.req(req.id, r as ReqFilter));
} }
reply(msg.id, { results }, req.leaveOpen ? [chan.port2] : undefined); reply(msg.id, results, req.leaveOpen ? [chan.port2] : undefined);
}); });
break; break;
} }
@ -133,12 +131,20 @@ globalThis.onmessage = ev => {
}); });
break; break;
} }
case "dumpDb": {
barrierQueue(cmdQueue, async () => {
const res = await relay.dump();
reply(msg.id, res);
});
break;
}
default: { default: {
reply(msg.id, { error: "Unknown command" }); reply(msg.id, { error: "Unknown command" });
break; break;
} }
} }
} catch (e) { } catch (e) {
console.error(e);
reply(msg.id, { error: e }); reply(msg.id, { error: e });
} }
}; };