refactor: live streams

This commit is contained in:
2024-09-12 19:46:04 +01:00
parent 9049f337b0
commit c74af0159c
9 changed files with 99 additions and 123 deletions

View File

@ -25,7 +25,7 @@ async function tryUseCacheRelay(url: string) {
localStorage.setItem("cache-relay", url); localStorage.setItem("cache-relay", url);
return conn; return conn;
} catch (e) { } catch (e) {
console.error(e); console.warn(e);
} }
} }
@ -53,7 +53,7 @@ export async function initRelayWorker() {
} }
try { try {
await workerRelay.debug(""); await workerRelay.debug("*");
await workerRelay.init({ await workerRelay.init({
databasePath: "relay.db", databasePath: "relay.db",
insertBatchSize: 100, insertBatchSize: 100,

View File

@ -5,7 +5,6 @@ import { CSSProperties, useMemo } from "react";
import { FormattedMessage } from "react-intl"; import { FormattedMessage } from "react-intl";
import { Link } from "react-router-dom"; import { Link } from "react-router-dom";
import useFollowsControls from "@/Hooks/useFollowControls";
import useImgProxy from "@/Hooks/useImgProxy"; import useImgProxy from "@/Hooks/useImgProxy";
import { findTag } from "@/Utils"; import { findTag } from "@/Utils";
import { Hour } from "@/Utils/Const"; import { Hour } from "@/Utils/Const";
@ -13,21 +12,16 @@ import { Hour } from "@/Utils/Const";
import Avatar from "../User/Avatar"; import Avatar from "../User/Avatar";
export function LiveStreams() { export function LiveStreams() {
const { followList } = useFollowsControls();
const sub = useMemo(() => { const sub = useMemo(() => {
const rb = new RequestBuilder("follows:streams"); const rb = new RequestBuilder("streams");
if (followList.length > 0) { rb.withFilter()
rb.withFilter() .kinds([EventKind.LiveEvent])
.kinds([EventKind.LiveEvent]) .since(unixNow() - Hour);
.authors(followList) rb.withFilter()
.since(unixNow() - Hour); .kinds([EventKind.LiveEvent])
rb.withFilter() .since(unixNow() - Hour);
.kinds([EventKind.LiveEvent])
.tag("p", followList)
.since(unixNow() - Hour);
}
return rb; return rb;
}, [followList.length]); }, []);
const streams = useRequestBuilder(sub); const streams = useRequestBuilder(sub);
if (streams.length === 0) return null; if (streams.length === 0) return null;
@ -72,16 +66,22 @@ function LiveStreamEvent({ ev }: { ev: NostrEvent }) {
backgroundImage: `url(${imageProxy})`, backgroundImage: `url(${imageProxy})`,
} as CSSProperties } as CSSProperties
}></div> }></div>
<div className="absolute left-0 top-7 w-full overflow-hidden"> <div className="absolute left-0 top-0 w-full overflow-hidden">
<div className="whitespace-nowrap px-2 text-ellipsis overflow-hidden text-xs">{title}</div> <div
className="whitespace-nowrap px-1 text-ellipsis overflow-hidden text-xs font-medium bg-background opacity-70 text-center"
title={title}>
{title}
</div>
</div> </div>
<div className="absolute top-1 left-1 bg-heart rounded-md px-2 uppercase font-bold">{status}</div> <div className="absolute bottom-1 left-1 bg-heart rounded-md px-2 uppercase font-bold">{status}</div>
<div className="absolute right-1 top-1"> <div className="absolute right-1 bottom-1">
<Avatar pubkey={host} user={hostProfile} size={25} className="outline outline-2 outline-highlight" /> <Avatar pubkey={host} user={hostProfile} size={25} className="outline outline-2 outline-highlight" />
</div> </div>
<div className="absolute left-1 bottom-1 rounded-md px-2 py-1 text-xs bg-gray font-medium"> {viewers && (
<FormattedMessage defaultMessage="{n} viewers" values={{ n: viewers }} /> <div className="absolute left-1 bottom-7 rounded-md px-2 py-1 text-xs bg-gray font-medium">
</div> <FormattedMessage defaultMessage="{n} viewers" values={{ n: viewers }} />
</div>
)}
</div> </div>
</Link> </Link>
); );

View File

@ -28,11 +28,11 @@ export default function useLoginFeed() {
}, [login, publisher, system]); }, [login, publisher, system]);
const subLogin = useMemo(() => { const subLogin = useMemo(() => {
const b = new RequestBuilder(`login:sub`);
b.withOptions({
leaveOpen: true,
});
if (CONFIG.features.subscriptions && !login.readonly) { if (CONFIG.features.subscriptions && !login.readonly) {
const b = new RequestBuilder(`login`);
b.withOptions({
leaveOpen: true,
});
if (pubKey) { if (pubKey) {
b.withFilter() b.withFilter()
.relay("wss://relay.snort.social/") .relay("wss://relay.snort.social/")
@ -41,8 +41,8 @@ export default function useLoginFeed() {
.tag("p", [pubKey]) .tag("p", [pubKey])
.limit(10); .limit(10);
} }
return b;
} }
return b;
}, [pubKey, login]); }, [pubKey, login]);
const loginFeed = useRequestBuilder(subLogin); const loginFeed = useRequestBuilder(subLogin);

View File

@ -18,7 +18,7 @@
"dist" "dist"
], ],
"dependencies": { "dependencies": {
"@sqlite.org/sqlite-wasm": "^3.45.3-build3", "@sqlite.org/sqlite-wasm": "^3.46.1-build3",
"eventemitter3": "^5.0.1", "eventemitter3": "^5.0.1",
"uuid": "^9.0.1" "uuid": "^9.0.1"
}, },

View File

@ -64,7 +64,6 @@ export class WorkerRelayInterface {
} }
async delete(req: ReqCommand) { async delete(req: ReqCommand) {
console.debug("DELETE", req);
return await this.#workerRpc<ReqCommand, Array<string>>("delete", req); return await this.#workerRpc<ReqCommand, Array<string>>("delete", req);
} }

View File

@ -4,7 +4,7 @@ export interface WorkQueueItem {
reject(e: unknown): void; reject(e: unknown): void;
} }
export async function processWorkQueue(queue?: Array<WorkQueueItem>, queueDelay = 200) { export async function processWorkQueue(queue?: Array<WorkQueueItem>) {
while (queue && queue.length > 0) { while (queue && queue.length > 0) {
const v = queue.shift(); const v = queue.shift();
if (v) { if (v) {
@ -16,7 +16,6 @@ export async function processWorkQueue(queue?: Array<WorkQueueItem>, queueDelay
} }
} }
} }
setTimeout(() => processWorkQueue(queue, queueDelay), queueDelay);
} }
export const barrierQueue = async <T>(queue: Array<WorkQueueItem>, then: () => Promise<T>): Promise<T> => { export const barrierQueue = async <T>(queue: Array<WorkQueueItem>, then: () => Promise<T>): Promise<T> => {

View File

@ -24,22 +24,20 @@ let eventWriteQueue: Array<NostrEvent> = [];
async function insertBatch() { async function insertBatch() {
// Only insert event batches when the command queue is empty // Only insert event batches when the command queue is empty
// This is to make req's execute first and not block them // This is to make req's execute first and not block them
if (eventWriteQueue.length > 0 && cmdQueue.length === 0) { if (eventWriteQueue.length > 0) {
await barrierQueue(cmdQueue, async () => { const start = unixNowMs();
const start = unixNowMs(); const timeLimit = 1000;
const timeLimit = 1000; if (relay) {
if (relay) { while (eventWriteQueue.length > 0) {
while (eventWriteQueue.length > 0) { if (unixNowMs() - start >= timeLimit) {
if (unixNowMs() - start >= timeLimit) { //console.debug("Yield insert, queue length: ", eventWriteQueue.length, ", cmds: ", cmdQueue.length);
//console.debug("Yield insert, queue length: ", eventWriteQueue.length, ", cmds: ", cmdQueue.length); break;
break;
}
const batch = eventWriteQueue.splice(0, insertBatchSize);
eventWriteQueue = eventWriteQueue.slice(batch.length);
relay.eventBatch(batch);
} }
const batch = eventWriteQueue.splice(0, insertBatchSize);
eventWriteQueue = eventWriteQueue.slice(batch.length);
relay.eventBatch(batch);
} }
}); }
} }
setTimeout(() => insertBatch(), 100); setTimeout(() => insertBatch(), 100);
} }
@ -47,7 +45,6 @@ async function insertBatch() {
const cmdQueue: Array<WorkQueueItem> = []; const cmdQueue: Array<WorkQueueItem> = [];
try { try {
setTimeout(() => insertBatch(), 100); setTimeout(() => insertBatch(), 100);
processWorkQueue(cmdQueue, 50);
} catch (e) { } catch (e) {
console.error(e); console.error(e);
} }
@ -75,23 +72,21 @@ const handleMsg = async (port: MessagePort | DedicatedWorkerGlobalScope, ev: Mes
break; break;
} }
case "init": { case "init": {
await barrierQueue(cmdQueue, async () => { const args = msg.args as InitAargs;
const args = msg.args as InitAargs; insertBatchSize = args.insertBatchSize ?? 10;
insertBatchSize = args.insertBatchSize ?? 10; try {
try { if ("WebAssembly" in self) {
if ("WebAssembly" in self) { relay = new SqliteRelay();
relay = new SqliteRelay(); } else {
} else {
relay = new InMemoryRelay();
}
await relay.init(args.databasePath);
} catch (e) {
console.error("Fallback to InMemoryRelay", e);
relay = new InMemoryRelay(); relay = new InMemoryRelay();
await relay.init(args.databasePath);
} }
reply(msg.id, true); await relay.init(args.databasePath);
}); } catch (e) {
console.error("Fallback to InMemoryRelay", e);
relay = new InMemoryRelay();
await relay.init(args.databasePath);
}
reply(msg.id, true);
break; break;
} }
case "event": { case "event": {
@ -105,85 +100,68 @@ const handleMsg = async (port: MessagePort | DedicatedWorkerGlobalScope, ev: Mes
break; break;
} }
case "close": { case "close": {
await barrierQueue(cmdQueue, async () => { const res = relay!.close();
const res = relay!.close(); reply(msg.id, res);
reply(msg.id, res);
});
break; break;
} }
case "req": { case "req": {
await barrierQueue(cmdQueue, async () => { const req = msg.args as ReqCommand;
const req = msg.args as ReqCommand; const filters = req.slice(2) as Array<ReqFilter>;
const filters = req.slice(2) as Array<ReqFilter>; const results: Array<string | NostrEvent> = [];
const results: Array<string | NostrEvent> = []; const ids = new Set<string>();
const ids = new Set<string>(); for (const r of filters) {
for (const r of filters) { const rx = relay!.req(req[1], r);
const rx = relay!.req(req[1], r); for (const x of rx) {
for (const x of rx) { if ((typeof x === "string" && ids.has(x)) || ids.has((x as NostrEvent).id)) {
if ((typeof x === "string" && ids.has(x)) || ids.has((x as NostrEvent).id)) { continue;
continue;
}
ids.add(typeof x === "string" ? x : (x as NostrEvent).id);
results.push(x);
} }
ids.add(typeof x === "string" ? x : (x as NostrEvent).id);
results.push(x);
} }
reply(msg.id, results); }
}); reply(msg.id, results);
break; break;
} }
case "count": { case "count": {
await barrierQueue(cmdQueue, async () => { const req = msg.args as ReqCommand;
const req = msg.args as ReqCommand; let results = 0;
let results = 0; const filters = req.slice(2) as Array<ReqFilter>;
const filters = req.slice(2) as Array<ReqFilter>; for (const r of filters) {
for (const r of filters) { const c = relay!.count(r);
const c = relay!.count(r); results += c;
results += c; }
} reply(msg.id, results);
reply(msg.id, results);
});
break; break;
} }
case "delete": { case "delete": {
console.debug("DELETE", msg.args); const req = msg.args as ReqCommand;
await barrierQueue(cmdQueue, async () => { let results = [];
const req = msg.args as ReqCommand; const filters = req.slice(2) as Array<ReqFilter>;
let results = []; for (const r of filters) {
const filters = req.slice(2) as Array<ReqFilter>; const c = relay!.delete(r);
for (const r of filters) { results.push(...c);
const c = relay!.delete(r); }
results.push(...c); reply(msg.id, results);
}
reply(msg.id, results);
});
break; break;
} }
case "summary": { case "summary": {
await barrierQueue(cmdQueue, async () => { const res = relay!.summary();
const res = relay!.summary(); reply(msg.id, res);
reply(msg.id, res);
});
break; break;
} }
case "dumpDb": { case "dumpDb": {
await barrierQueue(cmdQueue, async () => { const res = await relay!.dump();
const res = await relay!.dump(); reply(msg.id, res);
reply(msg.id, res);
});
break; break;
} }
case "forYouFeed": { case "forYouFeed": {
await barrierQueue(cmdQueue, async () => { const res = await getForYouFeed(relay!, msg.args as string);
const res = await getForYouFeed(relay!, msg.args as string); reply(msg.id, res);
reply(msg.id, res);
});
break; break;
} }
case "setEventMetadata": { case "setEventMetadata": {
await barrierQueue(cmdQueue, async () => { const [id, metadata] = msg.args as [string, EventMetadata];
const [id, metadata] = msg.args as [string, EventMetadata]; relay!.setEventMetadata(id, metadata);
relay!.setEventMetadata(id, metadata);
});
break; break;
} }
default: { default: {

View File

@ -4842,7 +4842,7 @@ __metadata:
version: 0.0.0-use.local version: 0.0.0-use.local
resolution: "@snort/worker-relay@workspace:packages/worker-relay" resolution: "@snort/worker-relay@workspace:packages/worker-relay"
dependencies: dependencies:
"@sqlite.org/sqlite-wasm": "npm:^3.45.3-build3" "@sqlite.org/sqlite-wasm": "npm:^3.46.1-build3"
"@types/debug": "npm:^4.1.12" "@types/debug": "npm:^4.1.12"
"@types/sharedworker": "npm:^0.0.112" "@types/sharedworker": "npm:^0.0.112"
"@types/uuid": "npm:^9.0.7" "@types/uuid": "npm:^9.0.7"
@ -4860,12 +4860,12 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"@sqlite.org/sqlite-wasm@npm:^3.45.3-build3": "@sqlite.org/sqlite-wasm@npm:^3.46.1-build3":
version: 3.45.3-build3 version: 3.46.1-build3
resolution: "@sqlite.org/sqlite-wasm@npm:3.45.3-build3" resolution: "@sqlite.org/sqlite-wasm@npm:3.46.1-build3"
bin: bin:
sqlite-wasm: bin/index.js sqlite-wasm: bin/index.js
checksum: 10/58dd96936973c8bb3989a7d23d6d6021b05e4b35ae997d617d28b5faf5f370196aaf71d1a060eb1822fed3636acb4b5e0d444807d86687bff2a6efa637f9f1dc checksum: 10/a64225fd784ed2ee8c8bf82f042d05b567b4707fba22a9508fbe3ac42cf1cf7e722d2ede0e1d91556bfec66b140aeb3cf3967546249693f39cc81475d7be90ff
languageName: node languageName: node
linkType: hard linkType: hard