diff --git a/packages/app/src/Feed/WorkerRelayView.ts b/packages/app/src/Feed/WorkerRelayView.ts index 7f3e6125..32e219ec 100644 --- a/packages/app/src/Feed/WorkerRelayView.ts +++ b/packages/app/src/Feed/WorkerRelayView.ts @@ -13,7 +13,7 @@ export function useNotificationsView() { rb.withOptions({ leaveOpen: true, }); - rb.withFilter().kinds(kinds).tag("p", [publicKey]).limit(100); + rb.withFilter().kinds(kinds).tag("p", [publicKey]); return rb; } }, [publicKey]); diff --git a/packages/app/src/Pages/Notifications/Notifications.tsx b/packages/app/src/Pages/Notifications/Notifications.tsx index c6d7e3e1..fe6cf4f0 100644 --- a/packages/app/src/Pages/Notifications/Notifications.tsx +++ b/packages/app/src/Pages/Notifications/Notifications.tsx @@ -2,7 +2,7 @@ import "./Notifications.css"; import { unwrap } from "@snort/shared"; import { NostrEvent, NostrLink, TaggedNostrEvent } from "@snort/system"; -import { lazy, Suspense, useEffect, useMemo } from "react"; +import { lazy, Suspense, useEffect, useMemo, useState } from "react"; import { AutoLoadMore } from "@/Components/Event/LoadMore"; import PageSpinner from "@/Components/PageSpinner"; @@ -19,6 +19,7 @@ export default function NotificationsPage({ onClick }: { onClick?: (link: NostrL const login = useLogin(); const { isMuted } = useModeration(); const groupInterval = 3600 * 6; + const [limit, setLimit] = useState(100); useEffect(() => { markNotificationsRead(login); @@ -32,14 +33,16 @@ export default function NotificationsPage({ onClick }: { onClick?: (link: NostrL }; const myNotifications = useMemo(() => { - return notifications.filter(a => !isMuted(a.pubkey) && a.tags.some(b => b[0] === "p" && b[1] === login.publicKey)); - }, [notifications, login.publicKey]); + return notifications + .sort((a, b) => a.created_at > b.created_at ? -1 : 1) + .slice(0, limit) + .filter(a => !isMuted(a.pubkey) && a.tags.some(b => b[0] === "p" && b[1] === login.publicKey)); + }, [notifications, login.publicKey, limit]); const timeGrouped = useMemo(() => { return myNotifications.reduce((acc, v) => { - const key = `${timeKey(v)}:${getNotificationContext(v as TaggedNostrEvent)?.encode(CONFIG.eventLinkPrefix)}:${ - v.kind - }`; + const key = `${timeKey(v)}:${getNotificationContext(v as TaggedNostrEvent)?.encode(CONFIG.eventLinkPrefix)}:${v.kind + }`; if (acc.has(key)) { unwrap(acc.get(key)).push(v as TaggedNostrEvent); } else { @@ -54,13 +57,13 @@ export default function NotificationsPage({ onClick }: { onClick?: (link: NostrL
{CONFIG.features.notificationGraph && ( }> - + )} {login.publicKey && [...timeGrouped.entries()].map(([k, g]) => )} - {}} /> + { setLimit(l => l + 100) }} />
); diff --git a/packages/app/src/Pages/settings/tools/sync-account.tsx b/packages/app/src/Pages/settings/tools/sync-account.tsx index 47f8eeef..ee627b3f 100644 --- a/packages/app/src/Pages/settings/tools/sync-account.tsx +++ b/packages/app/src/Pages/settings/tools/sync-account.tsx @@ -20,7 +20,7 @@ export default function SyncAccountTool() { const relays = Object.entries(myRelays) .filter(([, v]) => v.write) .map(([k]) => k); - const sync = new RangeSync(system); + const sync = RangeSync.forSystem(system); sync.on("event", evs => { setResults(r => [...r, ...evs]); }); diff --git a/packages/system/src/connection.ts b/packages/system/src/connection.ts index 93570609..2abf8d73 100644 --- a/packages/system/src/connection.ts +++ b/packages/system/src/connection.ts @@ -8,9 +8,11 @@ import { DefaultConnectTimeout } from "./const"; import { NostrEvent, OkResponse, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nostr"; import { RelayInfo } from "./relay-info"; import EventKind from "./event-kind"; -import { EventExt } from "./event-ext"; +import { EventExt, EventType } from "./event-ext"; import { NegentropyFlow } from "./negentropy/negentropy-flow"; import { ConnectionType, ConnectionTypeEvents } from "./connection-pool"; +import { RangeSync } from "./sync"; +import { NoteCollection } from "./note-collection"; /** * Relay settings @@ -395,15 +397,36 @@ export class Connection extends EventEmitter implements Co } else if (cmd[0] === "SYNC") { const [_, id, eventSet, ...filters] = cmd; const lastResortSync = () => { - if (filters.some(a => a.since || a.until || a.ids)) { + const isReplacableSync = filters.every(a => a.kinds?.every(b => EventExt.getType(b) === EventType.Replaceable || EventExt.getType(b) === EventType.ParameterizedReplaceable) ?? false); + if (filters.some(a => a.since || a.until || a.ids || a.limit) || isReplacableSync) { this.request(["REQ", id, ...filters], item.cb); } else { + const rs = RangeSync.forFetcher(async (rb, cb) => { + return await new Promise((resolve, reject) => { + const results = new NoteCollection(); + const f = rb.buildRaw(); + this.on("event", (c, e) => { + if (rb.id === c) { + cb?.([e]); + results.add(e); + } + }); + this.on("eose", s => { + if (s === rb.id) { + resolve(results.takeSnapshot()); + } + }); + this.request(["REQ", rb.id, ...f], undefined); + }); + }); const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0); - const newFilters = filters.map(a => ({ - ...a, - since: latest + 1, - })); - this.request(["REQ", id, ...newFilters], item.cb); + rs.setStartPoint(latest + 1); + rs.on("event", ev => { + ev.forEach(e => this.emit("event", id, e)); + }); + for (const f of filters) { + rs.sync(f); + } } }; if (this.info?.negentropy === "v1") { diff --git a/packages/system/src/sync/range-sync.ts b/packages/system/src/sync/range-sync.ts index 45d1ab6b..c109487e 100644 --- a/packages/system/src/sync/range-sync.ts +++ b/packages/system/src/sync/range-sync.ts @@ -1,6 +1,7 @@ import { unixNow } from "@snort/shared"; import EventEmitter from "eventemitter3"; import { ReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from ".."; +import { v4 as uuid } from "uuid"; /** * When nostr was created @@ -16,13 +17,27 @@ interface RangeSyncEvents { * A simple time based sync for pulling lots of data from nostr */ export class RangeSync extends EventEmitter { + #id = uuid(); #start: number = NostrBirthday; #windowSize: number = 60 * 60 * 12; + #fetcher!: SystemInterface["Fetch"]; - constructor(readonly system: SystemInterface) { + private constructor() { super(); } + static forSystem(system: SystemInterface) { + const rs = new RangeSync(); + rs.#fetcher = (r, c) => system.Fetch(r, c); + return rs; + } + + static forFetcher(fn: SystemInterface["Fetch"]) { + const rs = new RangeSync(); + rs.#fetcher = fn; + return rs; + } + /** * Set window size in seconds */ @@ -52,18 +67,15 @@ export class RangeSync extends EventEmitter { throw new Error("Filter must not contain since/until/limit"); } - if (!this.system.requestRouter) { - throw new Error("RangeSync cannot work without request router!"); - } - const now = unixNow(); + let ctr = 1; for (let end = now; end > this.#start; end -= this.#windowSize) { - const rb = new RequestBuilder(`range-query:${end}`); + const rb = new RequestBuilder(`${this.#id}+${ctr++}`); rb.withBareFilter(filter) .since(end - this.#windowSize) .until(end); this.emit("scan", end); - const results = await this.system.Fetch(rb); + const results = await this.#fetcher(rb); this.emit("event", results); } }