diff --git a/packages/app/src/Cache/UserRelayCache.ts b/packages/app/src/Cache/UserRelayCache.ts index 37e38036..c0c9fb5a 100644 --- a/packages/app/src/Cache/UserRelayCache.ts +++ b/packages/app/src/Cache/UserRelayCache.ts @@ -1,7 +1,7 @@ import { db, UsersRelays } from "Db"; import FeedCache from "./FeedCache"; -class UsersRelaysCache extends FeedCache { +export class UsersRelaysCache extends FeedCache { constructor() { super("UserRelays", db.userRelays); } diff --git a/packages/app/src/Element/Relay.tsx b/packages/app/src/Element/Relay.tsx index e4779802..e79ed1ab 100644 --- a/packages/app/src/Element/Relay.tsx +++ b/packages/app/src/Element/Relay.tsx @@ -20,7 +20,9 @@ export interface RelayProps { export default function Relay(props: RelayProps) { const navigate = useNavigate(); const login = useLogin(); - const relaySettings = unwrap(login.relays.item[props.addr] ?? System.Sockets.get(props.addr)?.Settings ?? {}); + const relaySettings = unwrap( + login.relays.item[props.addr] ?? System.Sockets.find(a => a.address === props.addr)?.settings ?? {} + ); const state = useRelayState(props.addr); const name = useMemo(() => getRelayName(props.addr), [props.addr]); diff --git a/packages/app/src/Element/SubDebug.tsx b/packages/app/src/Element/SubDebug.tsx index 66982fba..acf16e47 100644 --- a/packages/app/src/Element/SubDebug.tsx +++ b/packages/app/src/Element/SubDebug.tsx @@ -66,8 +66,8 @@ const SubDebug = () => { return ( <> Connections: - {[...System.Sockets.keys()].map(k => ( - + {System.Sockets.map(k => ( + ))} ); diff --git a/packages/app/src/ExternalStore.ts b/packages/app/src/ExternalStore.ts index f9b77a88..4b1dedea 100644 --- a/packages/app/src/ExternalStore.ts +++ b/packages/app/src/ExternalStore.ts @@ -1,18 +1,18 @@ -type HookFn = () => void; +type HookFn = (e?: TSnapshot) => void; -interface HookFilter { - fn: HookFn; +interface HookFilter { + fn: HookFn; } /** * Simple React hookable store with manual change notifications */ export default abstract class ExternalStore { - #hooks: Array = []; + #hooks: Array> = []; #snapshot: Readonly = {} as Readonly; #changed = true; - hook(fn: HookFn) { + hook(fn: HookFn) { this.#hooks.push({ fn, }); @@ -32,9 +32,9 @@ export default abstract class ExternalStore { return this.#snapshot; } - protected notifyChange() { + protected notifyChange(sn?: TSnapshot) { this.#changed = true; - this.#hooks.forEach(h => h.fn()); + this.#hooks.forEach(h => h.fn(sn)); } abstract takeSnapshot(): TSnapshot; diff --git a/packages/app/src/Feed/RelayState.ts b/packages/app/src/Feed/RelayState.ts index 830ccb66..c5c5c5a1 100644 --- a/packages/app/src/Feed/RelayState.ts +++ b/packages/app/src/Feed/RelayState.ts @@ -1,18 +1,6 @@ -import { useSyncExternalStore } from "react"; -import { StateSnapshot } from "System"; import { System } from "index"; -const noop = () => { - return () => undefined; -}; -const noopState = (): StateSnapshot | undefined => { - return undefined; -}; - export default function useRelayState(addr: string) { - const c = System.Sockets.get(addr); - return useSyncExternalStore( - c?.StatusHook.bind(c) ?? noop, - c?.GetState.bind(c) ?? noopState - ); + const c = System.Sockets.find(a => a.address === addr); + return c; } diff --git a/packages/app/src/Hooks/useRequestBuilder.tsx b/packages/app/src/Hooks/useRequestBuilder.tsx index 70d28030..5f7a37f7 100644 --- a/packages/app/src/Hooks/useRequestBuilder.tsx +++ b/packages/app/src/Hooks/useRequestBuilder.tsx @@ -10,7 +10,7 @@ const useRequestBuilder = { const subscribe = (onChanged: () => void) => { - const store = System.Query(type, rb); + const store = (System.Query(type, rb)?.feed as TStore) ?? new type(); let t: ReturnType | undefined; const release = store.hook(() => { if (!t) { @@ -24,7 +24,7 @@ const useRequestBuilder = { if (rb?.id) { - System.CancelQuery(rb.id); + System.GetQuery(rb.id)?.cancel(); } release(); }; diff --git a/packages/app/src/Pages/Layout.tsx b/packages/app/src/Pages/Layout.tsx index 76dc35a7..6c450958 100644 --- a/packages/app/src/Pages/Layout.tsx +++ b/packages/app/src/Pages/Layout.tsx @@ -75,9 +75,9 @@ export default function Layout() { for (const [k, v] of Object.entries(relays.item)) { await System.ConnectToRelay(k, v); } - for (const [k, c] of System.Sockets) { - if (!relays.item[k] && !c.Ephemeral) { - System.DisconnectRelay(k); + for (const v of System.Sockets) { + if (!relays.item[v.address] && !v.ephemeral) { + System.DisconnectRelay(v.address); } } })(); diff --git a/packages/app/src/Pages/NostrLinkHandler.tsx b/packages/app/src/Pages/NostrLinkHandler.tsx index bc2397e0..3a4d966d 100644 --- a/packages/app/src/Pages/NostrLinkHandler.tsx +++ b/packages/app/src/Pages/NostrLinkHandler.tsx @@ -6,7 +6,6 @@ import { useNavigate, useParams } from "react-router-dom"; import Spinner from "Icons/Spinner"; import { parseNostrLink, profileLink } from "SnortUtils"; import { getNip05PubKey } from "Pages/LoginPage"; -import { System } from "index"; export default function NostrLinkHandler() { const params = useParams(); @@ -18,9 +17,6 @@ export default function NostrLinkHandler() { async function handleLink(link: string) { const nav = parseNostrLink(link); if (nav) { - if ((nav.relays?.length ?? 0) > 0) { - nav.relays?.map(a => System.ConnectEphemeralRelay(a)); - } if (nav.type === NostrPrefix.Event || nav.type === NostrPrefix.Note || nav.type === NostrPrefix.Address) { navigate(`/e/${nav.encode()}`); } else if (nav.type === NostrPrefix.PublicKey || nav.type === NostrPrefix.Profile) { diff --git a/packages/app/src/Pages/Root.tsx b/packages/app/src/Pages/Root.tsx index 10de36e0..74c63627 100644 --- a/packages/app/src/Pages/Root.tsx +++ b/packages/app/src/Pages/Root.tsx @@ -170,10 +170,10 @@ const GlobalTab = () => { useEffect(() => { return debounce(500, () => { const ret: RelayOption[] = []; - System.Sockets.forEach((v, k) => { + System.Sockets.forEach(v => { ret.push({ - url: k, - paid: v.Info?.limitation?.payment_required ?? false, + url: v.address, + paid: v.info?.limitation?.payment_required ?? false, }); }); ret.sort(a => (a.paid ? -1 : 1)); diff --git a/packages/app/src/Pages/SearchPage.tsx b/packages/app/src/Pages/SearchPage.tsx index f7268402..28ddd82b 100644 --- a/packages/app/src/Pages/SearchPage.tsx +++ b/packages/app/src/Pages/SearchPage.tsx @@ -42,7 +42,7 @@ const SearchPage = () => { useEffect(() => { const addedRelays: string[] = []; for (const [k, v] of SearchRelays) { - if (!System.Sockets.has(k)) { + if (!System.Sockets.some(v => v.address === k)) { System.ConnectToRelay(k, v); addedRelays.push(k); } diff --git a/packages/app/src/Pages/ZapPool.tsx b/packages/app/src/Pages/ZapPool.tsx index 431d0af3..44614edd 100644 --- a/packages/app/src/Pages/ZapPool.tsx +++ b/packages/app/src/Pages/ZapPool.tsx @@ -78,15 +78,14 @@ export default function ZapPoolPage() { const { wallet } = useWallet(); const relayConnections = useMemo(() => { - return [...System.Sockets.values()] - .map(a => { - if (a.Info?.pubkey && !a.Ephemeral) { - return { - address: a.Address, - pubkey: a.Info.pubkey, - }; - } - }) + return System.Sockets.map(a => { + if (a.info?.pubkey && !a.ephemeral) { + return { + address: a.address, + pubkey: a.info.pubkey, + }; + } + }) .filter(a => a !== undefined) .map(unwrap); }, [login.relays]); diff --git a/packages/app/src/Pages/settings/RelayInfo.tsx b/packages/app/src/Pages/settings/RelayInfo.tsx index fc6d9a2f..cf292887 100644 --- a/packages/app/src/Pages/settings/RelayInfo.tsx +++ b/packages/app/src/Pages/settings/RelayInfo.tsx @@ -14,8 +14,8 @@ const RelayInfo = () => { const navigate = useNavigate(); const login = useLogin(); - const conn = Array.from(System.Sockets.values()).find(a => a.Id === params.id); - const stats = useRelayState(conn?.Address ?? ""); + const conn = System.Sockets.find(a => a.id === params.id); + const stats = useRelayState(conn?.address ?? ""); return ( <> @@ -105,7 +105,7 @@ const RelayInfo = () => {
{ - removeRelay(login, unwrap(conn).Address); + removeRelay(login, unwrap(conn).address); navigate("/settings/relays"); }}> diff --git a/packages/app/src/Pages/settings/Relays.tsx b/packages/app/src/Pages/settings/Relays.tsx index 52f992bf..78b490c7 100644 --- a/packages/app/src/Pages/settings/Relays.tsx +++ b/packages/app/src/Pages/settings/Relays.tsx @@ -16,7 +16,7 @@ const RelaySettingsPage = () => { const [newRelay, setNewRelay] = useState(); const otherConnections = useMemo(() => { - return [...System.Sockets.keys()].filter(a => relays.item[a] === undefined); + return System.Sockets.filter(a => relays.item[a.address] === undefined); }, [relays]); async function saveRelays() { @@ -98,7 +98,7 @@ const RelaySettingsPage = () => {
{otherConnections.map(a => ( - + ))}
diff --git a/packages/app/src/System/Connection.ts b/packages/app/src/System/Connection.ts index 56727d58..75e2832b 100644 --- a/packages/app/src/System/Connection.ts +++ b/packages/app/src/System/Connection.ts @@ -5,8 +5,8 @@ import { ConnectionStats } from "./ConnectionStats"; import { RawEvent, ReqCommand, TaggedRawEvent, u256 } from "./Nostr"; import { RelayInfo } from "./RelayInfo"; import { unwrap } from "./Util"; +import ExternalStore from "ExternalStore"; -export type CustomHook = (state: Readonly) => void; export type AuthHandler = (challenge: string, relay: string) => Promise; /** @@ -20,7 +20,7 @@ export interface RelaySettings { /** * Snapshot of connection stats */ -export interface StateSnapshot { +export interface ConnectionStateSnapshot { connected: boolean; disconnects: number; avgLatency: number; @@ -28,13 +28,16 @@ export interface StateSnapshot { received: number; send: number; }; + settings?: RelaySettings; info?: RelayInfo; pendingRequests: Array; activeRequests: Array; id: string; + ephemeral: boolean; + address: string; } -export class Connection { +export class Connection extends ExternalStore { Id: string; Address: string; Socket: WebSocket | null = null; @@ -50,10 +53,7 @@ export class Connection { Info?: RelayInfo; ConnectTimeout: number = DefaultConnectTimeout; Stats: ConnectionStats = new ConnectionStats(); - StateHooks: Map = new Map(); HasStateChange: boolean = true; - CurrentState: StateSnapshot; - LastState: Readonly; IsClosed: boolean; ReconnectTimer: ReturnType | null; EventsCallback: Map void>; @@ -69,19 +69,10 @@ export class Connection { Down = true; constructor(addr: string, options: RelaySettings, auth?: AuthHandler, ephemeral: boolean = false) { + super(); this.Id = uuid(); this.Address = addr; this.Settings = options; - this.CurrentState = { - connected: false, - disconnects: 0, - avgLatency: 0, - events: { - received: 0, - send: 0, - }, - } as StateSnapshot; - this.LastState = Object.freeze({ ...this.CurrentState }); this.IsClosed = false; this.ReconnectTimer = null; this.EventsCallback = new Map(); @@ -146,7 +137,7 @@ export class Connection { this.ReconnectTimer = null; } this.Socket?.close(); - this.#UpdateState(); + this.notifyChange(); } OnOpen() { @@ -181,7 +172,7 @@ export class Connection { this.#ResetQueues(); // reset connection Id on disconnect, for query-tracking this.Id = uuid(); - this.#UpdateState(); + this.notifyChange(); } OnMessage(e: MessageEvent) { @@ -194,7 +185,7 @@ export class Connection { .then(() => this.#sendPendingRaw()) .catch(console.error); this.Stats.EventsReceived++; - this.#UpdateState(); + this.notifyChange(); break; } case "EVENT": { @@ -203,7 +194,7 @@ export class Connection { relays: [this.Address], }); this.Stats.EventsReceived++; - this.#UpdateState(); + this.notifyChange(); break; } case "EOSE": { @@ -235,7 +226,7 @@ export class Connection { OnError(e: Event) { console.error(e); - this.#UpdateState(); + this.notifyChange(); } /** @@ -248,7 +239,7 @@ export class Connection { const req = ["EVENT", e]; this.#SendJson(req); this.Stats.EventsSent++; - this.#UpdateState(); + this.notifyChange(); } /** @@ -271,32 +262,10 @@ export class Connection { const req = ["EVENT", e]; this.#SendJson(req); this.Stats.EventsSent++; - this.#UpdateState(); + this.notifyChange(); }); } - /** - * Hook status for connection - */ - StatusHook(fnHook: CustomHook) { - const id = uuid(); - this.StateHooks.set(id, fnHook); - return () => { - this.StateHooks.delete(id); - }; - } - - /** - * Returns the current state of this connection - */ - GetState() { - if (this.HasStateChange) { - this.LastState = Object.freeze({ ...this.CurrentState }); - this.HasStateChange = false; - } - return this.LastState; - } - /** * Using relay document to determine if this relay supports a feature */ @@ -320,7 +289,7 @@ export class Connection { this.#SendJson(cmd); cbSent(); } - this.#UpdateState(); + this.notifyChange(); } CloseReq(id: string) { @@ -329,7 +298,28 @@ export class Connection { this.OnEose?.(id); this.#SendQueuedRequests(); } - this.#UpdateState(); + this.notifyChange(); + } + + takeSnapshot(): ConnectionStateSnapshot { + return { + connected: this.Socket?.readyState === WebSocket.OPEN, + events: { + received: this.Stats.EventsReceived, + send: this.Stats.EventsSent, + }, + avgLatency: + this.Stats.Latency.length > 0 + ? this.Stats.Latency.reduce((acc, v) => acc + v, 0) / this.Stats.Latency.length + : 0, + disconnects: this.Stats.Disconnects, + info: this.Info, + id: this.Id, + pendingRequests: [...this.PendingRequests.map(a => a.cmd[1])], + activeRequests: [...this.ActiveRequests], + ephemeral: this.Ephemeral, + address: this.Address, + }; } #SendQueuedRequests() { @@ -351,30 +341,7 @@ export class Connection { this.ActiveRequests.clear(); this.PendingRequests = []; this.PendingRaw = []; - this.#UpdateState(); - } - - #UpdateState() { - this.CurrentState.connected = this.Socket?.readyState === WebSocket.OPEN; - this.CurrentState.events.received = this.Stats.EventsReceived; - this.CurrentState.events.send = this.Stats.EventsSent; - this.CurrentState.avgLatency = - this.Stats.Latency.length > 0 ? this.Stats.Latency.reduce((acc, v) => acc + v, 0) / this.Stats.Latency.length : 0; - this.CurrentState.disconnects = this.Stats.Disconnects; - this.CurrentState.info = this.Info; - this.CurrentState.id = this.Id; - this.CurrentState.pendingRequests = [...this.PendingRequests.map(a => a.cmd[1])]; - this.CurrentState.activeRequests = [...this.ActiveRequests]; - this.Stats.Latency = this.Stats.Latency.slice(-20); // trim - this.HasStateChange = true; - this.#NotifyState(); - } - - #NotifyState() { - const state = this.GetState(); - for (const [, h] of this.StateHooks) { - h(state); - } + this.notifyChange(); } #SendJson(obj: object) { diff --git a/packages/app/src/System/EventPublisher.ts b/packages/app/src/System/EventPublisher.ts index e77f4533..5fecf80f 100644 --- a/packages/app/src/System/EventPublisher.ts +++ b/packages/app/src/System/EventPublisher.ts @@ -7,6 +7,7 @@ import { Lists, RawEvent, RelaySettings, + SystemInterface, TaggedRawEvent, u256, UserMetadata, @@ -38,11 +39,6 @@ declare global { } } -interface SystemInterface { - BroadcastEvent(ev: RawEvent): void; - WriteOnceToRelay(relay: string, ev: RawEvent): Promise; -} - export class EventPublisher { #system: SystemInterface; #pubKey: string; diff --git a/packages/app/src/System/NostrSystem.ts b/packages/app/src/System/NostrSystem.ts new file mode 100644 index 00000000..9af2e9b2 --- /dev/null +++ b/packages/app/src/System/NostrSystem.ts @@ -0,0 +1,249 @@ +import debug from "debug"; +import { v4 as uuid } from "uuid"; + +import ExternalStore from "ExternalStore"; +import { RawEvent, RawReqFilter, TaggedRawEvent } from "./Nostr"; +import { AuthHandler, Connection, RelaySettings, ConnectionStateSnapshot } from "./Connection"; +import { Query, QueryBase } from "./Query"; +import { RelayCache } from "./GossipModel"; +import { NoteStore } from "./NoteCollection"; +import { BuiltRawReqFilter, RequestBuilder } from "./RequestBuilder"; +import { unwrap, sanitizeRelayUrl, unixNowMs } from "./Util"; +import { SystemInterface, SystemSnapshot } from "System"; + +/** + * Manages nostr content retrieval system + */ +export class NostrSystem extends ExternalStore implements SystemInterface { + /** + * All currently connected websockets + */ + #sockets = new Map(); + + /** + * All active queries + */ + Queries: Map = new Map(); + + /** + * Handler function for NIP-42 + */ + HandleAuth?: AuthHandler; + + #log = debug("System"); + #relayCache: RelayCache; + + constructor(relayCache: RelayCache) { + super(); + this.#relayCache = relayCache; + this.#cleanup(); + } + + get Sockets(): ConnectionStateSnapshot[] { + return [...this.#sockets.values()].map(a => a.snapshot()); + } + + /** + * Connect to a NOSTR relay if not already connected + */ + async ConnectToRelay(address: string, options: RelaySettings) { + try { + const addr = unwrap(sanitizeRelayUrl(address)); + if (!this.#sockets.has(addr)) { + const c = new Connection(addr, options, this.HandleAuth?.bind(this)); + this.#sockets.set(addr, c); + c.OnEvent = (s, e) => this.OnEvent(s, e); + c.OnEose = s => this.OnEndOfStoredEvents(c, s); + c.OnDisconnect = id => this.OnRelayDisconnect(id); + await c.Connect(); + } else { + // update settings if already connected + unwrap(this.#sockets.get(addr)).Settings = options; + } + } catch (e) { + console.error(e); + } + } + + OnRelayDisconnect(id: string) { + for (const [, q] of this.Queries) { + q.connectionLost(id); + } + } + + OnEndOfStoredEvents(c: Readonly, sub: string) { + for (const [, v] of this.Queries) { + v.eose(sub, c); + } + } + + OnEvent(sub: string, ev: TaggedRawEvent) { + for (const [, v] of this.Queries) { + v.onEvent(sub, ev); + } + } + + /** + * + * @param address Relay address URL + */ + async ConnectEphemeralRelay(address: string): Promise { + try { + const addr = unwrap(sanitizeRelayUrl(address)); + if (!this.#sockets.has(addr)) { + const c = new Connection(addr, { read: true, write: false }, this.HandleAuth?.bind(this), true); + this.#sockets.set(addr, c); + c.OnEvent = (s, e) => this.OnEvent(s, e); + c.OnEose = s => this.OnEndOfStoredEvents(c, s); + c.OnDisconnect = id => this.OnRelayDisconnect(id); + await c.Connect(); + return c; + } + } catch (e) { + console.error(e); + } + } + + /** + * Disconnect from a relay + */ + DisconnectRelay(address: string) { + const c = this.#sockets.get(address); + if (c) { + this.#sockets.delete(address); + c.Close(); + } + } + + GetQuery(id: string): Query | undefined { + return this.Queries.get(id); + } + + Query(type: { new (): T }, req: RequestBuilder | null): Query | undefined { + if (!req) return; + + const existing = this.Queries.get(req.id); + if (existing) { + const filters = req.buildDiff(this.#relayCache, existing.filters); + if (filters.length === 0 && !req.options?.skipDiff) { + return existing; + } else { + for (const subQ of filters) { + this.SendQuery(existing, subQ).then(qta => + qta.forEach(v => this.#log("New QT from diff %s %s %O from: %O", req.id, v.id, v.filters, existing.filters)) + ); + } + this.notifyChange(); + return existing; + } + } else { + const store = new type(); + + const filters = req.build(this.#relayCache); + const q = new Query(req.id, store); + if (req.options?.leaveOpen) { + q.leaveOpen = req.options.leaveOpen; + } + + this.Queries.set(req.id, q); + for (const subQ of filters) { + this.SendQuery(q, subQ).then(qta => + qta.forEach(v => this.#log("New QT from diff %s %s %O", req.id, v.id, v.filters)) + ); + } + this.notifyChange(); + return q; + } + } + + async SendQuery(q: Query, qSend: BuiltRawReqFilter) { + if (qSend.relay) { + this.#log("Sending query to %s %O", qSend.relay, qSend); + const s = this.#sockets.get(qSend.relay); + if (s) { + const qt = q.sendToRelay(s, qSend); + if (qt) { + return [qt]; + } + } else { + const nc = await this.ConnectEphemeralRelay(qSend.relay); + if (nc) { + const qt = q.sendToRelay(nc, qSend); + if (qt) { + return [qt]; + } + } else { + console.warn("Failed to connect to new relay for:", qSend.relay, q); + } + } + } else { + const ret = []; + for (const [, s] of this.#sockets) { + if (!s.Ephemeral) { + const qt = q.sendToRelay(s, qSend); + if (qt) { + ret.push(qt); + } + } + } + return ret; + } + return []; + } + + /** + * Send events to writable relays + */ + BroadcastEvent(ev: RawEvent) { + for (const [, s] of this.#sockets) { + s.SendEvent(ev); + } + } + + /** + * Write an event to a relay then disconnect + */ + async WriteOnceToRelay(address: string, ev: RawEvent) { + return new Promise((resolve, reject) => { + const c = new Connection(address, { write: true, read: false }, this.HandleAuth, true); + + const t = setTimeout(reject, 5_000); + c.OnConnected = async () => { + clearTimeout(t); + await c.SendAsync(ev); + c.Close(); + resolve(); + }; + c.Connect(); + }); + } + + takeSnapshot(): SystemSnapshot { + return { + queries: [...this.Queries.values()].map(a => { + return { + id: a.id, + filters: a.filters, + closing: a.closing, + subFilters: [], + }; + }), + }; + } + + #cleanup() { + const now = unixNowMs(); + let changed = false; + for (const [k, v] of this.Queries) { + if (v.closingAt && v.closingAt < now) { + v.sendClose(); + this.Queries.delete(k); + changed = true; + } + } + if (changed) { + this.notifyChange(); + } + setTimeout(() => this.#cleanup(), 1_000); + } +} diff --git a/packages/app/src/System/ProfileCache.ts b/packages/app/src/System/ProfileCache.ts index da5fe5e8..fe49f371 100644 --- a/packages/app/src/System/ProfileCache.ts +++ b/packages/app/src/System/ProfileCache.ts @@ -1,4 +1,4 @@ -import { EventKind, HexKey, NostrSystem, TaggedRawEvent } from "System"; +import { EventKind, HexKey, SystemInterface, TaggedRawEvent } from "System"; import { ProfileCacheExpire } from "Const"; import { mapEventToProfile, MetadataCache } from "Cache"; import { UserCache } from "Cache/UserCache"; @@ -7,7 +7,7 @@ import { unixNowMs } from "SnortUtils"; import debug from "debug"; export class ProfileLoaderService { - #system: NostrSystem; + #system: SystemInterface; /** * List of pubkeys to fetch metadata for @@ -16,7 +16,7 @@ export class ProfileLoaderService { readonly #log = debug("ProfileCache"); - constructor(system: NostrSystem) { + constructor(system: SystemInterface) { this.#system = system; this.#FetchMetadata(); } @@ -74,8 +74,9 @@ export class ProfileLoaderService { const newProfiles = new Set(); const q = this.#system.Query(PubkeyReplaceableNoteStore, sub); + const feed = (q?.feed as PubkeyReplaceableNoteStore) ?? new PubkeyReplaceableNoteStore(); // never release this callback, it will stop firing anyway after eose - const releaseOnEvent = q.onEvent(async e => { + const releaseOnEvent = feed.onEvent(async e => { for (const pe of e) { newProfiles.add(pe.id); await this.onProfileEvent(pe); @@ -83,17 +84,17 @@ export class ProfileLoaderService { }); const results = await new Promise>>(resolve => { let timeout: ReturnType | undefined = undefined; - const release = q.hook(() => { - if (!q.loading) { + const release = feed.hook(() => { + if (!feed.loading) { clearTimeout(timeout); - resolve(q.getSnapshotData() ?? []); + resolve(feed.getSnapshotData() ?? []); this.#log("Profiles finished: %s", sub.id); release(); } }); timeout = setTimeout(() => { release(); - resolve(q.getSnapshotData() ?? []); + resolve(feed.getSnapshotData() ?? []); this.#log("Profiles timeout: %s", sub.id); }, 5_000); }); diff --git a/packages/app/src/System/Query.test.ts b/packages/app/src/System/Query.test.ts index d6617896..29a7477c 100644 --- a/packages/app/src/System/Query.test.ts +++ b/packages/app/src/System/Query.test.ts @@ -3,6 +3,7 @@ import { describe, expect } from "@jest/globals"; import { Query, QueryBase } from "./Query"; import { getRandomValues } from "crypto"; import { FlatNoteStore } from "./NoteCollection"; +import { RequestStrategy } from "./RequestBuilder"; window.crypto = {} as any; window.crypto.getRandomValues = getRandomValues as any; @@ -21,43 +22,88 @@ describe("query", () => { const c3 = new Connection("wss://three.com", opt); c3.Down = false; - q.sendToRelay(c1, { - id: "test", + const f = { + relay: "", + strategy: RequestStrategy.DefaultRelays, filters: [ { kinds: [1], authors: ["test"], }, ], - }); - q.sendToRelay(c2); - q.sendToRelay(c3); + }; + const qt1 = q.sendToRelay(c1, f); + const qt2 = q.sendToRelay(c2, f); + const qt3 = q.sendToRelay(c3, f); expect(q.progress).toBe(0); - q.eose(q.id, c1); + q.eose(qt1!.id, c1); expect(q.progress).toBe(1 / 3); - q.eose(q.id, c1); + q.eose(qt1!.id, c1); expect(q.progress).toBe(1 / 3); - q.eose(q.id, c2); + q.eose(qt2!.id, c2); expect(q.progress).toBe(2 / 3); - q.eose(q.id, c3); + q.eose(qt3!.id, c3); expect(q.progress).toBe(1); const qs = { - id: "test-1", + relay: "", + strategy: RequestStrategy.DefaultRelays, filters: [ { kinds: [1], authors: ["test-sub"], }, ], - } as QueryBase; - q.sendToRelay(c1, qs); + }; + const qt = q.sendToRelay(c1, qs); expect(q.progress).toBe(3 / 4); - q.eose(qs.id, c1); + q.eose(qt!.id, c1); expect(q.progress).toBe(1); q.sendToRelay(c2, qs); expect(q.progress).toBe(4 / 5); }); + + it("should merge all sub-query filters", () => { + const q = new Query("test", new FlatNoteStore()); + const c0 = new Connection("wss://test.com", { read: true, write: true }); + q.sendToRelay(c0, { + filters: [ + { + authors: ["a"], + kinds: [1], + }, + ], + relay: "", + strategy: RequestStrategy.DefaultRelays, + }); + q.sendToRelay(c0, { + filters: [ + { + authors: ["b"], + kinds: [1, 2], + }, + ], + relay: "", + strategy: RequestStrategy.DefaultRelays, + }); + q.sendToRelay(c0, { + filters: [ + { + authors: ["c"], + kinds: [2], + }, + ], + relay: "", + strategy: RequestStrategy.DefaultRelays, + }); + + expect(q.filters).toEqual([ + { + authors: ["a", "b", "c"], + kinds: [1, 2], + }, + ]); + }); }); diff --git a/packages/app/src/System/Query.ts b/packages/app/src/System/Query.ts index 4521d1b6..305ebe77 100644 --- a/packages/app/src/System/Query.ts +++ b/packages/app/src/System/Query.ts @@ -1,9 +1,11 @@ import { v4 as uuid } from "uuid"; import debug from "debug"; -import { Connection, RawReqFilter, Nips } from "System"; +import { Connection, RawReqFilter, Nips, TaggedRawEvent } from "System"; import { unixNowMs, unwrap } from "SnortUtils"; import { NoteStore } from "./NoteCollection"; -import { mergeSimilar } from "./RequestMerger"; +import { simpleMerge } from "./RequestMerger"; +import { eventMatchesFilter } from "./RequestMatcher"; +import { BuiltRawReqFilter } from "./RequestBuilder"; /** * Tracing for relay query status @@ -19,7 +21,6 @@ class QueryTrace { readonly #fnProgress: () => void; constructor( - readonly subId: string, readonly relay: string, readonly filters: Array, readonly connId: string, @@ -51,7 +52,7 @@ class QueryTrace { sendClose() { this.close = unixNowMs(); - this.#fnClose(this.subId); + this.#fnClose(this.id); this.#fnProgress(); } @@ -135,7 +136,6 @@ export class Query implements QueryBase { */ #feed: NoteStore; - subQueryCounter = 0; #log = debug("Query"); constructor(id: string, feed: NoteStore) { @@ -152,32 +152,37 @@ export class Query implements QueryBase { return this.#cancelTimeout; } + get filters() { + const filters = this.#tracing.flatMap(a => a.filters); + return [simpleMerge(filters)]; + } + get feed() { return this.#feed; } - get filters() { - const filters = this.#tracing.flatMap(a => a.filters); - return mergeSimilar(filters); + onEvent(sub: string, e: TaggedRawEvent) { + for (const t of this.#tracing) { + if (t.id === sub) { + this.feed.add(e); + break; + } + } } cancel() { this.#cancelTimeout = unixNowMs() + 5_000; } - unCancel() { - this.#cancelTimeout = undefined; - } - cleanup() { this.#stopCheckTraces(); } - sendToRelay(c: Connection, subq?: QueryBase) { - if (!this.#canSendQuery(c, subq ?? this)) { + sendToRelay(c: Connection, subq: BuiltRawReqFilter) { + if (!this.#canSendQuery(c, subq)) { return; } - this.#sendQueryInternal(c, subq ?? this); + return this.#sendQueryInternal(c, subq); } connectionLost(id: string) { @@ -192,7 +197,7 @@ export class Query implements QueryBase { } eose(sub: string, conn: Readonly) { - const qt = this.#tracing.find(a => a.subId === sub && a.connId === conn.Id); + const qt = this.#tracing.find(a => a.id === sub && a.connId === conn.Id); qt?.gotEose(); if (!this.leaveOpen) { qt?.sendClose(); @@ -235,12 +240,12 @@ export class Query implements QueryBase { }, 500); } - #canSendQuery(c: Connection, q: QueryBase) { - if (q.relays && !q.relays.includes(c.Address)) { + #canSendQuery(c: Connection, q: BuiltRawReqFilter) { + if (q.relay && q.relay !== c.Address) { return false; } - if ((q.relays?.length ?? 0) === 0 && c.Ephemeral) { - this.#log("Cant send non-specific REQ to ephemeral connection %O %O %O", q, q.relays, c); + if (!q.relay && c.Ephemeral) { + this.#log("Cant send non-specific REQ to ephemeral connection %O %O %O", q, q.relay, c); return false; } if (q.filters.some(a => a.search) && !c.SupportsNip(Nips.Search)) { @@ -250,9 +255,8 @@ export class Query implements QueryBase { return true; } - #sendQueryInternal(c: Connection, q: QueryBase) { + #sendQueryInternal(c: Connection, q: BuiltRawReqFilter) { const qt = new QueryTrace( - q.id, c.Address, q.filters, c.Id, @@ -260,6 +264,7 @@ export class Query implements QueryBase { () => this.#onProgress() ); this.#tracing.push(qt); - c.QueueReq(["REQ", q.id, ...q.filters], () => qt.sentToRelay()); + c.QueueReq(["REQ", qt.id, ...q.filters], () => qt.sentToRelay()); + return qt; } } diff --git a/packages/app/src/System/RequestBuilder.test.ts b/packages/app/src/System/RequestBuilder.test.ts index 079d36c5..e2546cda 100644 --- a/packages/app/src/System/RequestBuilder.test.ts +++ b/packages/app/src/System/RequestBuilder.test.ts @@ -88,10 +88,7 @@ describe("RequestBuilder", () => { f0.authors(["a"]); expect(a).toEqual([{}]); - const b = rb.buildDiff(DummyCache, { - filters: a, - id: "test", - }); + const b = rb.buildDiff(DummyCache, a); expect(b).toMatchObject([ { filters: [{ authors: ["a"] }], diff --git a/packages/app/src/System/RequestBuilder.ts b/packages/app/src/System/RequestBuilder.ts index 56a49e85..2ee6e7ef 100644 --- a/packages/app/src/System/RequestBuilder.ts +++ b/packages/app/src/System/RequestBuilder.ts @@ -1,9 +1,7 @@ import { RawReqFilter, u256, HexKey, EventKind } from "System"; import { appendDedupe, dedupe } from "SnortUtils"; -import { QueryBase } from "./Query"; import { diffFilters } from "./RequestSplitter"; import { RelayCache, splitAllByWriteRelays, splitByWriteRelays } from "./GossipModel"; -import { mergeSimilar } from "./RequestMerger"; /** * Which strategy is used when building REQ filters @@ -84,8 +82,8 @@ export class RequestBuilder { } build(relays: RelayCache): Array { - const expanded = this.#builders.map(a => a.build(relays)).flat(); - return this.#mergeSimilar(expanded); + const expanded = this.#builders.flatMap(a => a.build(relays, this.id)); + return this.#groupByRelay(expanded); } /** @@ -93,11 +91,10 @@ export class RequestBuilder { * @param q All previous filters merged * @returns */ - buildDiff(relays: RelayCache, q: QueryBase): Array { + buildDiff(relays: RelayCache, filters: Array): Array { const next = this.buildRaw(); - const diff = diffFilters(q.filters, next); + const diff = diffFilters(filters, next); if (diff.changed) { - console.debug("DIFF", q.filters, next, diff); return splitAllByWriteRelays(relays, diff.filters).map(a => { return { strategy: RequestStrategy.AuthorsRelays, @@ -114,7 +111,7 @@ export class RequestBuilder { * @param expanded * @returns */ - #mergeSimilar(expanded: Array) { + #groupByRelay(expanded: Array) { const relayMerged = expanded.reduce((acc, v) => { const existing = acc.get(v.relay); if (existing) { @@ -125,14 +122,12 @@ export class RequestBuilder { return acc; }, new Map>()); - const filtersSquashed = [...relayMerged.values()].flatMap(a => { - return mergeSimilar(a.flatMap(b => b.filters)).map(b => { - return { - filters: [b], - relay: a[0].relay, - strategy: a[0].strategy, - } as BuiltRawReqFilter; - }); + const filtersSquashed = [...relayMerged.values()].map(a => { + return { + filters: a.flatMap(b => b.filters), + relay: a[0].relay, + strategy: a[0].strategy, + } as BuiltRawReqFilter; }); return filtersSquashed; @@ -211,7 +206,7 @@ export class RequestFilterBuilder { /** * Build/expand this filter into a set of relay specific queries */ - build(relays: RelayCache): Array { + build(relays: RelayCache, id: string): Array { // when querying for specific event ids with relay hints // take the first approach which is to split the filter by relay if (this.#filter.ids && this.#relayHints.size > 0) { diff --git a/packages/app/src/System/RequestMatcher.test.ts b/packages/app/src/System/RequestMatcher.test.ts new file mode 100644 index 00000000..49754ff5 --- /dev/null +++ b/packages/app/src/System/RequestMatcher.test.ts @@ -0,0 +1,23 @@ +import { eventMatchesFilter } from "./RequestMatcher"; + +describe("RequestMatcher", () => { + it("should match simple filter", () => { + const ev = { + id: "test", + kind: 1, + pubkey: "pubkey", + created_at: 99, + tags: [], + content: "test", + sig: "", + }; + const filter = { + ids: ["test"], + authors: ["pubkey", "other"], + kinds: [1, 2, 3], + since: 1, + before: 100, + }; + expect(eventMatchesFilter(ev, filter)).toBe(true); + }); +}); diff --git a/packages/app/src/System/RequestMatcher.ts b/packages/app/src/System/RequestMatcher.ts new file mode 100644 index 00000000..e8ffc450 --- /dev/null +++ b/packages/app/src/System/RequestMatcher.ts @@ -0,0 +1,20 @@ +import { RawEvent, RawReqFilter } from "./Nostr"; + +export function eventMatchesFilter(ev: RawEvent, filter: RawReqFilter) { + if (!(filter.ids?.includes(ev.id) ?? false)) { + return false; + } + if (!(filter.authors?.includes(ev.pubkey) ?? false)) { + return false; + } + if (!(filter.kinds?.includes(ev.kind) ?? false)) { + return false; + } + if (filter.since && ev.created_at < filter.since) { + return false; + } + if (filter.until && ev.created_at > filter.until) { + return false; + } + return true; +} diff --git a/packages/app/src/System/RequestMerger.test.ts b/packages/app/src/System/RequestMerger.test.ts index 4eac9ab7..2de15e7d 100644 --- a/packages/app/src/System/RequestMerger.test.ts +++ b/packages/app/src/System/RequestMerger.test.ts @@ -1,5 +1,5 @@ import { RawReqFilter } from "System"; -import { filterIncludes, mergeSimilar } from "./RequestMerger"; +import { filterIncludes, mergeSimilar, simpleMerge } from "./RequestMerger"; describe("RequestMerger", () => { it("should simple merge authors", () => { @@ -53,4 +53,19 @@ describe("RequestMerger", () => { } as RawReqFilter; expect(filterIncludes(bigger, smaller)).toBe(true); }); + + it("simpleMerge", () => { + const a = { + authors: ["a", "b", "c"], + since: 99, + } as RawReqFilter; + const b = { + authors: ["c", "d", "e"], + since: 100, + } as RawReqFilter; + expect(simpleMerge([a, b])).toEqual({ + authors: ["a", "b", "c", "d", "e"], + since: 100, + }); + }); }); diff --git a/packages/app/src/System/RequestMerger.ts b/packages/app/src/System/RequestMerger.ts index 79cfe1e8..a33f0226 100644 --- a/packages/app/src/System/RequestMerger.ts +++ b/packages/app/src/System/RequestMerger.ts @@ -9,7 +9,12 @@ export function mergeSimilar(filters: Array): Array return [...(canEasilyMerge.length > 0 ? [simpleMerge(canEasilyMerge)] : []), ...cannotMerge]; } -function simpleMerge(filters: Array) { +/** + * Simply flatten all filters into one + * @param filters + * @returns + */ +export function simpleMerge(filters: Array) { const result: any = {}; filters.forEach(filter => { @@ -21,7 +26,7 @@ function simpleMerge(filters: Array) { result[key] = [...new Set([...result[key], ...value])]; } } else { - throw new Error("Cannot simple merge with non-array filter properties"); + result[key] = value; } }); }); diff --git a/packages/app/src/System/RequestSplitter.test.ts b/packages/app/src/System/RequestSplitter.test.ts index 979a0330..10e78a6b 100644 --- a/packages/app/src/System/RequestSplitter.test.ts +++ b/packages/app/src/System/RequestSplitter.test.ts @@ -1,6 +1,6 @@ import { RawReqFilter } from "System"; import { describe, expect } from "@jest/globals"; -import { diffFilters } from "./RequestSplitter"; +import { diffFilters, expandFilter } from "./RequestSplitter"; describe("RequestSplitter", () => { test("single filter add value", () => { @@ -72,4 +72,33 @@ describe("RequestSplitter", () => { changed: true, }); }); + test("expand filter", () => { + const a = { + authors: ["a", "b", "c"], + kinds: [1, 2, 3], + ids: ["x", "y"], + since: 99, + limit: 10, + }; + expect(expandFilter(a)).toEqual([ + { authors: ["a"], kinds: [1], ids: ["x"], since: 99, limit: 10 }, + { authors: ["a"], kinds: [1], ids: ["y"], since: 99, limit: 10 }, + { authors: ["a"], kinds: [2], ids: ["x"], since: 99, limit: 10 }, + { authors: ["a"], kinds: [2], ids: ["y"], since: 99, limit: 10 }, + { authors: ["a"], kinds: [3], ids: ["x"], since: 99, limit: 10 }, + { authors: ["a"], kinds: [3], ids: ["y"], since: 99, limit: 10 }, + { authors: ["b"], kinds: [1], ids: ["x"], since: 99, limit: 10 }, + { authors: ["b"], kinds: [1], ids: ["y"], since: 99, limit: 10 }, + { authors: ["b"], kinds: [2], ids: ["x"], since: 99, limit: 10 }, + { authors: ["b"], kinds: [2], ids: ["y"], since: 99, limit: 10 }, + { authors: ["b"], kinds: [3], ids: ["x"], since: 99, limit: 10 }, + { authors: ["b"], kinds: [3], ids: ["y"], since: 99, limit: 10 }, + { authors: ["c"], kinds: [1], ids: ["x"], since: 99, limit: 10 }, + { authors: ["c"], kinds: [1], ids: ["y"], since: 99, limit: 10 }, + { authors: ["c"], kinds: [2], ids: ["x"], since: 99, limit: 10 }, + { authors: ["c"], kinds: [2], ids: ["y"], since: 99, limit: 10 }, + { authors: ["c"], kinds: [3], ids: ["x"], since: 99, limit: 10 }, + { authors: ["c"], kinds: [3], ids: ["y"], since: 99, limit: 10 }, + ]); + }); }); diff --git a/packages/app/src/System/RequestSplitter.ts b/packages/app/src/System/RequestSplitter.ts index 44fc2f91..2c942ff9 100644 --- a/packages/app/src/System/RequestSplitter.ts +++ b/packages/app/src/System/RequestSplitter.ts @@ -42,3 +42,35 @@ export function diffFilters(a: Array, b: Array) { changed: anyChanged, }; } + +/** + * Expand a filter into its most fine grained form + */ +export function expandFilter(f: RawReqFilter): Array { + const ret: Array = []; + const src = Object.entries(f); + const keys = src.filter(([, v]) => Array.isArray(v)).map(a => a[0]); + const props = src.filter(([, v]) => !Array.isArray(v)); + + function generateCombinations(index: number, currentCombination: RawReqFilter) { + if (index === keys.length) { + ret.push(currentCombination); + return; + } + + const key = keys[index]; + const values = (f as Record>)[key]; + + for (let i = 0; i < values.length; i++) { + const value = values[i]; + const updatedCombination = { ...currentCombination, [key]: [value] }; + generateCombinations(index + 1, updatedCombination); + } + } + + generateCombinations(0, { + ...Object.fromEntries(props), + }); + + return ret; +} diff --git a/packages/app/src/System/SystemWorker.ts b/packages/app/src/System/SystemWorker.ts new file mode 100644 index 00000000..eae78c98 --- /dev/null +++ b/packages/app/src/System/SystemWorker.ts @@ -0,0 +1,69 @@ +import ExternalStore from "ExternalStore"; +import { + NoteStore, + Query, + RawEvent, + RelaySettings, + RequestBuilder, + SystemSnapshot, + SystemInterface, + ConnectionStateSnapshot, + AuthHandler, +} from "System"; + +export class SystemWorker extends ExternalStore implements SystemInterface { + #port: MessagePort; + + constructor() { + super(); + if ("SharedWorker" in window) { + const worker = new SharedWorker("/system.js"); + this.#port = worker.port; + this.#port.onmessage = m => this.#onMessage(m); + } else { + throw new Error("SharedWorker is not supported"); + } + } + + HandleAuth?: AuthHandler; + + get Sockets(): ConnectionStateSnapshot[] { + throw new Error("Method not implemented."); + } + + Query(type: new () => T, req: RequestBuilder | null): Query | undefined { + throw new Error("Method not implemented."); + } + + CancelQuery(sub: string): void { + throw new Error("Method not implemented."); + } + + GetQuery(sub: string): Query | undefined { + throw new Error("Method not implemented."); + } + + ConnectToRelay(address: string, options: RelaySettings): Promise { + throw new Error("Method not implemented."); + } + + DisconnectRelay(address: string): void { + throw new Error("Method not implemented."); + } + + BroadcastEvent(ev: RawEvent): void { + throw new Error("Method not implemented."); + } + + WriteOnceToRelay(relay: string, ev: RawEvent): Promise { + throw new Error("Method not implemented."); + } + + takeSnapshot(): SystemSnapshot { + throw new Error("Method not implemented."); + } + + #onMessage(e: MessageEvent) { + console.debug(e); + } +} diff --git a/packages/app/src/System/Util.ts b/packages/app/src/System/Util.ts index 6d50c763..0e417814 100644 --- a/packages/app/src/System/Util.ts +++ b/packages/app/src/System/Util.ts @@ -32,3 +32,11 @@ export function sanitizeRelayUrl(url: string) { // ignore } } + +export function unixNow() { + return Math.floor(unixNowMs() / 1000); +} + +export function unixNowMs() { + return new Date().getTime(); +} diff --git a/packages/app/src/System/index.ts b/packages/app/src/System/index.ts index f6e75275..1bb1380e 100644 --- a/packages/app/src/System/index.ts +++ b/packages/app/src/System/index.ts @@ -1,8 +1,4 @@ -import debug from "debug"; - -import { sanitizeRelayUrl, unixNowMs, unwrap } from "SnortUtils"; -import { RawEvent, RawReqFilter, TaggedRawEvent } from "./Nostr"; -import { AuthHandler, Connection, RelaySettings, StateSnapshot } from "./Connection"; +import { AuthHandler, Connection, RelaySettings, ConnectionStateSnapshot } from "./Connection"; import { RequestBuilder } from "./RequestBuilder"; import { EventBuilder } from "./EventBuilder"; import { @@ -12,10 +8,10 @@ import { ParameterizedReplaceableNoteStore, ReplaceableNoteStore, } from "./NoteCollection"; -import { Query, QueryBase } from "./Query"; -import ExternalStore from "ExternalStore"; -import { RelayCache } from "./GossipModel"; +import { Query } from "./Query"; +import { RawEvent, RawReqFilter } from "./Nostr"; +export * from "./NostrSystem"; export { default as EventKind } from "./EventKind"; export * from "./Nostr"; export * from "./Links"; @@ -23,6 +19,29 @@ export { default as Tag } from "./Tag"; export * from "./Nips"; export * from "./RelayInfo"; +export interface SystemInterface { + /** + * Handler function for NIP-42 + */ + HandleAuth?: AuthHandler; + get Sockets(): Array; + GetQuery(id: string): Query | undefined; + Query(type: { new (): T }, req: RequestBuilder | null): Query | undefined; + ConnectToRelay(address: string, options: RelaySettings): Promise; + DisconnectRelay(address: string): void; + BroadcastEvent(ev: RawEvent): void; + WriteOnceToRelay(relay: string, ev: RawEvent): Promise; +} + +export interface SystemSnapshot { + queries: Array<{ + id: string; + filters: Array; + subFilters: Array; + closing: boolean; + }>; +} + export { NoteStore, RequestBuilder, @@ -35,298 +54,5 @@ export { AuthHandler, Connection, RelaySettings, - StateSnapshot, + ConnectionStateSnapshot, }; - -export interface SystemSnapshot { - queries: Array<{ - id: string; - filters: Array; - subFilters: Array; - closing: boolean; - }>; -} - -export type HookSystemSnapshotRelease = () => void; -export type HookSystemSnapshot = () => void; - -/** - * Manages nostr content retrieval system - */ -export class NostrSystem extends ExternalStore { - /** - * All currently connected websockets - */ - Sockets: Map; - - /** - * All active queries - */ - Queries: Map = new Map(); - - /** - * Handler function for NIP-42 - */ - HandleAuth?: AuthHandler; - - #log = debug("System"); - #relayCache: RelayCache; - - constructor(relayCache: RelayCache) { - super(); - this.Sockets = new Map(); - this.#relayCache = relayCache; - this.#cleanup(); - } - - /** - * Connect to a NOSTR relay if not already connected - */ - async ConnectToRelay(address: string, options: RelaySettings) { - try { - const addr = unwrap(sanitizeRelayUrl(address)); - if (!this.Sockets.has(addr)) { - const c = new Connection(addr, options, this.HandleAuth?.bind(this)); - this.Sockets.set(addr, c); - c.OnEvent = (s, e) => this.OnEvent(s, e); - c.OnEose = s => this.OnEndOfStoredEvents(c, s); - c.OnDisconnect = id => this.OnRelayDisconnect(id); - c.OnConnected = () => { - for (const [, q] of this.Queries) { - q.sendToRelay(c, q); - } - }; - await c.Connect(); - } else { - // update settings if already connected - unwrap(this.Sockets.get(addr)).Settings = options; - } - } catch (e) { - console.error(e); - } - } - - OnRelayDisconnect(id: string) { - for (const [, q] of this.Queries) { - q.connectionLost(id); - } - } - - OnEndOfStoredEvents(c: Readonly, sub: string) { - const q = this.GetQuery(sub); - if (q) { - q.eose(sub, c); - } - } - - OnEvent(sub: string, ev: TaggedRawEvent) { - const q = this.GetQuery(sub); - if (q?.feed) { - q.feed.add(ev); - } - } - - GetQuery(sub: string) { - const subFilterId = /-\d+$/i; - if (sub.match(subFilterId)) { - // feed events back into parent query - sub = sub.split(subFilterId)[0]; - } - return this.Queries.get(sub); - } - - /** - * - * @param address Relay address URL - */ - async ConnectEphemeralRelay(address: string): Promise { - try { - const addr = unwrap(sanitizeRelayUrl(address)); - if (!this.Sockets.has(addr)) { - const c = new Connection(addr, { read: true, write: false }, this.HandleAuth?.bind(this), true); - this.Sockets.set(addr, c); - c.OnEvent = (s, e) => this.OnEvent(s, e); - c.OnEose = s => this.OnEndOfStoredEvents(c, s); - c.OnDisconnect = id => this.OnRelayDisconnect(id); - c.OnConnected = () => { - for (const [, q] of this.Queries) { - if (q.progress !== 1) { - q.sendToRelay(c, q); - } - } - }; - await c.Connect(); - return c; - } - } catch (e) { - console.error(e); - } - } - - /** - * Disconnect from a relay - */ - DisconnectRelay(address: string) { - const c = this.Sockets.get(address); - if (c) { - this.Sockets.delete(address); - c.Close(); - } - } - - Query(type: { new (): T }, req: RequestBuilder | null): Readonly { - /** - * ## Notes - * - * Given a set of existing filters: - * ["REQ", "1", { kinds: [0, 7], authors: [...], since: now()-1hr, until: now() }] - * ["REQ", "2", { kinds: [0, 7], authors: [...], since: now(), limit: 0 }] - * - * ## Problem 1: - * Assume we now want to update sub "1" with a new set of authors, - * what should we do, should we close sub "1" and send the new set or create another - * subscription with the new pubkeys (diff) - * - * Creating a new subscription sounds great but also is a problem when relays limit - * active subscriptions, maybe we should instead queue the new - * subscription (assuming that we expect to close at EOSE) - * - * ## Problem 2: - * When multiple filters a specifid in a single filter but only 1 filter changes, - * ~~same as above~~ - * - * Seems reasonable to do "Queue Diff", should also be possible to collapse multiple - * pending filters for the same subscription - */ - - if (!req) return new type(); - - const existing = this.Queries.get(req.id); - if (existing) { - const filters = req.buildDiff(this.#relayCache, existing); - existing.unCancel(); - - if (filters.length === 0 && !req.options?.skipDiff) { - this.notifyChange(); - return existing.feed as Readonly; - } else { - for (const subQ of filters) { - this.SendQuery(existing, { - id: `${existing.id}-${existing.subQueryCounter++}`, - filters: subQ.filters, - relays: subQ.relay ? [subQ.relay] : undefined, - }); - } - this.notifyChange(); - return existing.feed as Readonly; - } - } else { - const store = new type(); - - const filters = req.build(this.#relayCache); - const q = new Query(req.id, store); - if (req.options?.leaveOpen) { - q.leaveOpen = req.options.leaveOpen; - } - - this.Queries.set(req.id, q); - for (const subQ of filters) { - this.SendQuery(q, { - id: `${q.id}-${q.subQueryCounter++}`, - filters: subQ.filters, - relays: subQ.relay ? [subQ.relay] : undefined, - }); - } - this.notifyChange(); - return q.feed as Readonly; - } - } - - CancelQuery(sub: string) { - const q = this.Queries.get(sub); - if (q) { - q.cancel(); - } - } - - async SendQuery(q: Query, qSend: QueryBase) { - if (qSend.relays && qSend.relays.length > 0) { - for (const r of qSend.relays) { - this.#log("Sending query to %s %O", r, qSend); - const s = this.Sockets.get(r); - if (s) { - q.sendToRelay(s, qSend); - } else { - const nc = await this.ConnectEphemeralRelay(r); - if (nc) { - q.sendToRelay(nc, qSend); - } else { - console.warn("Failed to connect to new relay for:", r, q); - } - } - } - } else { - for (const [, s] of this.Sockets) { - if (!s.Ephemeral) { - q.sendToRelay(s, qSend); - } - } - } - } - - /** - * Send events to writable relays - */ - BroadcastEvent(ev: RawEvent) { - for (const [, s] of this.Sockets) { - s.SendEvent(ev); - } - } - - /** - * Write an event to a relay then disconnect - */ - async WriteOnceToRelay(address: string, ev: RawEvent) { - return new Promise((resolve, reject) => { - const c = new Connection(address, { write: true, read: false }, this.HandleAuth, true); - - const t = setTimeout(reject, 5_000); - c.OnConnected = async () => { - clearTimeout(t); - await c.SendAsync(ev); - c.Close(); - resolve(); - }; - c.Connect(); - }); - } - - takeSnapshot(): SystemSnapshot { - return { - queries: [...this.Queries.values()].map(a => { - return { - id: a.id, - filters: a.filters, - closing: a.closing, - subFilters: [], - }; - }), - }; - } - - #cleanup() { - const now = unixNowMs(); - let changed = false; - for (const [k, v] of this.Queries) { - if (v.closingAt && v.closingAt < now) { - v.sendClose(); - this.Queries.delete(k); - changed = true; - } - } - if (changed) { - this.notifyChange(); - } - setTimeout(() => this.#cleanup(), 1_000); - } -} diff --git a/packages/app/src/System/worker.ts b/packages/app/src/System/worker.ts new file mode 100644 index 00000000..4f47dad8 --- /dev/null +++ b/packages/app/src/System/worker.ts @@ -0,0 +1,21 @@ +/// +import { UsersRelaysCache } from "Cache/UserRelayCache"; +import { NostrSystem } from "."; +declare const self: SharedWorkerGlobalScope; + +const RelayCache = new UsersRelaysCache(); +const System = new NostrSystem({ + get: pk => RelayCache.getFromCache(pk)?.relays, +}); + +self.onconnect = e => { + const port = e.ports[0]; + + port.addEventListener("message", async e1 => { + console.debug(e1); + const [cmd, ...others] = e1.data; + switch (cmd) { + } + }); + port.start(); +}; diff --git a/packages/app/src/index.tsx b/packages/app/src/index.tsx index 3561d7e8..8dfb2a8a 100644 --- a/packages/app/src/index.tsx +++ b/packages/app/src/index.tsx @@ -35,9 +35,9 @@ import DebugPage from "Pages/Debug"; import { db } from "Db"; import { preload } from "Cache"; import { LoginStore } from "Login"; -import { UserRelays } from "Cache/UserRelayCache"; -import { NostrSystem } from "System"; import { ProfileLoaderService } from "System/ProfileCache"; +import { NostrSystem } from "System"; +import { UserRelays } from "Cache/UserRelayCache"; /** * Singleton nostr system