diff --git a/packages/app/src/Pages/Notifications/Notifications.tsx b/packages/app/src/Pages/Notifications/Notifications.tsx
index fe6cf4f0..75c596bc 100644
--- a/packages/app/src/Pages/Notifications/Notifications.tsx
+++ b/packages/app/src/Pages/Notifications/Notifications.tsx
@@ -34,15 +34,16 @@ export default function NotificationsPage({ onClick }: { onClick?: (link: NostrL
const myNotifications = useMemo(() => {
return notifications
- .sort((a, b) => a.created_at > b.created_at ? -1 : 1)
+ .sort((a, b) => (a.created_at > b.created_at ? -1 : 1))
.slice(0, limit)
.filter(a => !isMuted(a.pubkey) && a.tags.some(b => b[0] === "p" && b[1] === login.publicKey));
}, [notifications, login.publicKey, limit]);
const timeGrouped = useMemo(() => {
return myNotifications.reduce((acc, v) => {
- const key = `${timeKey(v)}:${getNotificationContext(v as TaggedNostrEvent)?.encode(CONFIG.eventLinkPrefix)}:${v.kind
- }`;
+ const key = `${timeKey(v)}:${getNotificationContext(v as TaggedNostrEvent)?.encode(CONFIG.eventLinkPrefix)}:${
+ v.kind
+ }`;
if (acc.has(key)) {
unwrap(acc.get(key)).push(v as TaggedNostrEvent);
} else {
@@ -63,7 +64,11 @@ export default function NotificationsPage({ onClick }: { onClick?: (link: NostrL
{login.publicKey &&
[...timeGrouped.entries()].map(([k, g]) => )}
- { setLimit(l => l + 100) }} />
+ {
+ setLimit(l => l + 100);
+ }}
+ />
>
);
diff --git a/packages/app/src/lang.json b/packages/app/src/lang.json
index cf36ef3b..2ab3d24b 100644
--- a/packages/app/src/lang.json
+++ b/packages/app/src/lang.json
@@ -1778,6 +1778,9 @@
"yCLnBC": {
"defaultMessage": "LNURL or Lightning Address"
},
+ "z3UjXR": {
+ "defaultMessage": "Debug"
+ },
"zCb8fX": {
"defaultMessage": "Weight"
},
diff --git a/packages/app/src/translations/en.json b/packages/app/src/translations/en.json
index e78935c6..24b6395f 100644
--- a/packages/app/src/translations/en.json
+++ b/packages/app/src/translations/en.json
@@ -590,6 +590,7 @@
"y1Z3or": "Language",
"yAztTU": "{n} eSats",
"yCLnBC": "LNURL or Lightning Address",
+ "z3UjXR": "Debug",
"zCb8fX": "Weight",
"zFegDD": "Contact",
"zINlao": "Owner",
diff --git a/packages/system/src/connection-pool.ts b/packages/system/src/connection-pool.ts
index 2922d885..dad5eace 100644
--- a/packages/system/src/connection-pool.ts
+++ b/packages/system/src/connection-pool.ts
@@ -5,6 +5,7 @@ import { EventEmitter } from "eventemitter3";
import { Connection, RelaySettings, SyncCommand } from "./connection";
import { NostrEvent, OkResponse, ReqCommand, TaggedNostrEvent } from "./nostr";
import { RelayInfo, SystemInterface } from ".";
+import { ConnectionSyncModule, DefaultSyncModule } from "./sync/connection";
/**
* Events which the ConnectionType must emit
@@ -93,6 +94,7 @@ export type ConnectionBuilder = (
address: string,
options: RelaySettings,
ephemeral: boolean,
+ syncModule?: ConnectionSyncModule,
) => Promise | T;
/**
@@ -105,6 +107,11 @@ export class DefaultConnectionPool
#system: SystemInterface;
#log = debug("ConnectionPool");
+ /**
+ * Track if a connection request has started
+ */
+ #connectStarted = new Set();
+
/**
* All currently connected websockets
*/
@@ -122,7 +129,8 @@ export class DefaultConnectionPool
this.#connectionBuilder = builder;
} else {
this.#connectionBuilder = (addr, options, ephemeral) => {
- return new Connection(addr, options, ephemeral) as unknown as T;
+ const sync = new DefaultSyncModule(this.#system.config.fallbackSync);
+ return new Connection(addr, options, ephemeral, sync) as unknown as T;
};
}
}
@@ -140,12 +148,14 @@ export class DefaultConnectionPool
*/
async connect(address: string, options: RelaySettings, ephemeral: boolean) {
const addr = unwrap(sanitizeRelayUrl(address));
+ if (this.#connectStarted.has(addr)) return;
+ this.#connectStarted.add(addr);
+
try {
const existing = this.#sockets.get(addr);
if (!existing) {
const c = await this.#connectionBuilder(addr, options, ephemeral);
this.#sockets.set(addr, c);
-
c.on("event", (s, e) => {
if (this.#system.checkSigs && !this.#system.optimizer.schnorrVerify(e)) {
this.#log("Reject invalid event %o", e);
@@ -177,6 +187,8 @@ export class DefaultConnectionPool
this.#log("%O", e);
this.emit("connectFailed", addr);
this.#sockets.delete(addr);
+ } finally {
+ this.#connectStarted.delete(addr);
}
}
diff --git a/packages/system/src/connection.ts b/packages/system/src/connection.ts
index 2abf8d73..18cc7df7 100644
--- a/packages/system/src/connection.ts
+++ b/packages/system/src/connection.ts
@@ -1,18 +1,16 @@
import { v4 as uuid } from "uuid";
import debug from "debug";
import WebSocket from "isomorphic-ws";
-import { unixNowMs, dedupe } from "@snort/shared";
+import { unixNowMs } from "@snort/shared";
import { EventEmitter } from "eventemitter3";
import { DefaultConnectTimeout } from "./const";
import { NostrEvent, OkResponse, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nostr";
import { RelayInfo } from "./relay-info";
import EventKind from "./event-kind";
-import { EventExt, EventType } from "./event-ext";
-import { NegentropyFlow } from "./negentropy/negentropy-flow";
+import { EventExt } from "./event-ext";
import { ConnectionType, ConnectionTypeEvents } from "./connection-pool";
-import { RangeSync } from "./sync";
-import { NoteCollection } from "./note-collection";
+import { ConnectionSyncModule } from "./sync/connection";
/**
* Relay settings
@@ -46,6 +44,7 @@ export class Connection extends EventEmitter implements Co
#downCount = 0;
#activeRequests = new Set();
#connectStarted = false;
+ #syncModule?: ConnectionSyncModule;
id: string;
readonly address: string;
@@ -64,7 +63,7 @@ export class Connection extends EventEmitter implements Co
AwaitingAuth: Map;
Authed = false;
- constructor(addr: string, options: RelaySettings, ephemeral: boolean = false) {
+ constructor(addr: string, options: RelaySettings, ephemeral: boolean = false, syncModule?: ConnectionSyncModule) {
super();
this.id = uuid();
this.address = addr;
@@ -72,6 +71,7 @@ export class Connection extends EventEmitter implements Co
this.EventsCallback = new Map();
this.AwaitingAuth = new Map();
this.#ephemeral = ephemeral;
+ this.#syncModule = syncModule;
this.#log = debug("Connection").extend(addr);
}
@@ -395,58 +395,10 @@ export class Connection extends EventEmitter implements Co
this.#activeRequests.add(cmd[1]);
this.#send(cmd);
} else if (cmd[0] === "SYNC") {
- const [_, id, eventSet, ...filters] = cmd;
- const lastResortSync = () => {
- const isReplacableSync = filters.every(a => a.kinds?.every(b => EventExt.getType(b) === EventType.Replaceable || EventExt.getType(b) === EventType.ParameterizedReplaceable) ?? false);
- if (filters.some(a => a.since || a.until || a.ids || a.limit) || isReplacableSync) {
- this.request(["REQ", id, ...filters], item.cb);
- } else {
- const rs = RangeSync.forFetcher(async (rb, cb) => {
- return await new Promise((resolve, reject) => {
- const results = new NoteCollection();
- const f = rb.buildRaw();
- this.on("event", (c, e) => {
- if (rb.id === c) {
- cb?.([e]);
- results.add(e);
- }
- });
- this.on("eose", s => {
- if (s === rb.id) {
- resolve(results.takeSnapshot());
- }
- });
- this.request(["REQ", rb.id, ...f], undefined);
- });
- });
- const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
- rs.setStartPoint(latest + 1);
- rs.on("event", ev => {
- ev.forEach(e => this.emit("event", id, e));
- });
- for (const f of filters) {
- rs.sync(f);
- }
- }
- };
- if (this.info?.negentropy === "v1") {
- const newFilters = filters;
- const neg = new NegentropyFlow(id, this, eventSet, newFilters);
- neg.once("finish", filters => {
- if (filters.length > 0) {
- this.request(["REQ", cmd[1], ...filters], item.cb);
- } else {
- // no results to query, emulate closed
- this.emit("closed", id, "Nothing to sync");
- }
- });
- neg.once("error", () => {
- lastResortSync();
- });
- neg.start();
- } else {
- lastResortSync();
+ if (!this.#syncModule) {
+ throw new Error("no sync module");
}
+ this.#syncModule.sync(this, cmd, item.cb);
}
} catch (e) {
console.error(e);
diff --git a/packages/system/src/nostr-system.ts b/packages/system/src/nostr-system.ts
index 83f15a4e..f38aaba1 100644
--- a/packages/system/src/nostr-system.ts
+++ b/packages/system/src/nostr-system.ts
@@ -1,97 +1,33 @@
import debug from "debug";
-import { EventEmitter } from "eventemitter3";
-import { CachedTable, unixNowMs } from "@snort/shared";
+import { unixNowMs } from "@snort/shared";
import { NostrEvent, TaggedNostrEvent, OkResponse } from "./nostr";
import { RelaySettings } from "./connection";
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
import { RelayMetricHandler } from "./relay-metric-handler";
import {
- CachedMetadata,
ProfileLoaderService,
- RelayMetrics,
SystemInterface,
SystemSnapshot,
- UserProfileCache,
- UserRelaysCache,
- RelayMetricCache,
- UsersRelays,
QueryLike,
OutboxModel,
socialGraphInstance,
EventKind,
- UsersFollows,
ID,
- NostrSystemEvents,
SystemConfig,
} from ".";
-import { EventsCache } from "./cache/events";
import { RelayMetadataLoader } from "./outbox";
-import { Optimizer, DefaultOptimizer } from "./query-optimizer";
import { ConnectionPool, DefaultConnectionPool } from "./connection-pool";
import { QueryManager } from "./query-manager";
-import { CacheRelay } from "./cache-relay";
import { RequestRouter } from "./request-router";
-import { UserFollowsCache } from "./cache/user-follows-lists";
+import { SystemBase } from "./system-base";
/**
* Manages nostr content retrieval system
*/
-export class NostrSystem extends EventEmitter implements SystemInterface {
+export class NostrSystem extends SystemBase implements SystemInterface {
#log = debug("System");
#queryManager: QueryManager;
- #config: SystemConfig;
-
- /**
- * Storage class for user relay lists
- */
- get relayCache(): CachedTable {
- return this.#config.relays;
- }
-
- /**
- * Storage class for user profiles
- */
- get profileCache(): CachedTable {
- return this.#config.profiles;
- }
-
- /**
- * Storage class for relay metrics (connects/disconnects)
- */
- get relayMetricsCache(): CachedTable {
- return this.#config.relayMetrics;
- }
-
- /**
- * Optimizer instance, contains optimized functions for processing data
- */
- get optimizer(): Optimizer {
- return this.#config.optimizer;
- }
-
- get eventsCache(): CachedTable {
- return this.#config.events;
- }
-
- get userFollowsCache(): CachedTable {
- return this.#config.contactLists;
- }
-
- get cacheRelay(): CacheRelay | undefined {
- return this.#config.cachingRelay;
- }
-
- /**
- * Check event signatures (recommended)
- */
- get checkSigs(): boolean {
- return this.#config.checkSigs;
- }
-
- set checkSigs(v: boolean) {
- this.#config.checkSigs = v;
- }
readonly profileLoader: ProfileLoaderService;
readonly relayMetricsHandler: RelayMetricHandler;
@@ -100,39 +36,26 @@ export class NostrSystem extends EventEmitter implements Syst
readonly requestRouter: RequestRouter | undefined;
constructor(props: Partial) {
- super();
- this.#config = {
- relays: props.relays ?? new UserRelaysCache(props.db?.userRelays),
- profiles: props.profiles ?? new UserProfileCache(props.db?.users),
- relayMetrics: props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics),
- events: props.events ?? new EventsCache(props.db?.events),
- contactLists: props.contactLists ?? new UserFollowsCache(props.db?.contacts),
- optimizer: props.optimizer ?? DefaultOptimizer,
- checkSigs: props.checkSigs ?? false,
- cachingRelay: props.cachingRelay,
- db: props.db,
- automaticOutboxModel: props.automaticOutboxModel ?? true,
- buildFollowGraph: props.buildFollowGraph ?? false,
- };
+ super(props);
this.profileLoader = new ProfileLoaderService(this, this.profileCache);
this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache);
this.relayLoader = new RelayMetadataLoader(this, this.relayCache);
// if automatic outbox model, setup request router as OutboxModel
- if (this.#config.automaticOutboxModel) {
+ if (this.config.automaticOutboxModel) {
this.requestRouter = OutboxModel.fromSystem(this);
}
// Cache everything
- if (this.#config.cachingRelay) {
+ if (this.config.cachingRelay) {
this.on("event", async (_, ev) => {
- await this.#config.cachingRelay?.event(ev);
+ await this.config.cachingRelay?.event(ev);
});
}
// Hook on-event when building follow graph
- if (this.#config.buildFollowGraph) {
+ if (this.config.buildFollowGraph) {
let evBuf: Array = [];
let t: ReturnType | undefined;
this.on("event", (_, ev) => {
@@ -213,7 +136,7 @@ export class NostrSystem extends EventEmitter implements Syst
async PreloadSocialGraph() {
// Insert data to socialGraph from cache
- if (this.#config.buildFollowGraph) {
+ if (this.config.buildFollowGraph) {
for (const list of this.userFollowsCache.snapshot()) {
const user = ID(list.pubkey);
for (const fx of list.follows) {
diff --git a/packages/system/src/query.ts b/packages/system/src/query.ts
index 5fa4d33f..7e3058d5 100644
--- a/packages/system/src/query.ts
+++ b/packages/system/src/query.ts
@@ -412,8 +412,12 @@ export class Query extends EventEmitter {
}
});
const eventHandler = (sub: string, ev: TaggedNostrEvent) => {
- if (this.request.options?.fillStore ?? true) {
- this.handleEvent(sub, ev);
+ if ((this.request.options?.fillStore ?? true) && qt.id === sub) {
+ if (qt.filters.some(v => eventMatchesFilter(ev, v))) {
+ this.feed.add(ev);
+ } else {
+ this.#log("Event did not match filter, rejecting %O %O", ev, qt);
+ }
}
};
const eoseHandler = (sub: string) => {
diff --git a/packages/system/src/sync/connection.ts b/packages/system/src/sync/connection.ts
new file mode 100644
index 00000000..35e97710
--- /dev/null
+++ b/packages/system/src/sync/connection.ts
@@ -0,0 +1,111 @@
+import { Connection, SyncCommand } from "../connection";
+import { FallbackSyncMethod } from "../system";
+import { EventExt, EventType } from "../event-ext";
+import { NoteCollection } from "../note-collection";
+import { RangeSync } from "./range-sync";
+import { NegentropyFlow } from "../negentropy/negentropy-flow";
+
+export interface ConnectionSyncModule {
+ sync: (c: Connection, item: SyncCommand, cb?: () => void) => void;
+}
+
+export class DefaultSyncModule implements ConnectionSyncModule {
+ constructor(readonly method: FallbackSyncMethod) {}
+
+ sync(c: Connection, item: SyncCommand, cb?: () => void) {
+ const [_, id, eventSet, ...filters] = item;
+ if (c.info?.negentropy === "v1") {
+ const newFilters = filters;
+ const neg = new NegentropyFlow(id, c, eventSet, newFilters);
+ neg.once("finish", filters => {
+ if (filters.length > 0) {
+ c.request(["REQ", id, ...filters], cb);
+ } else {
+ // no results to query, emulate closed
+ c.emit("closed", id, "Nothing to sync");
+ }
+ });
+ neg.once("error", () => {
+ this.#fallbackSync(c, item, cb);
+ });
+ neg.start();
+ } else {
+ this.#fallbackSync(c, item, cb);
+ }
+ }
+
+ #fallbackSync(c: Connection, item: SyncCommand, cb?: () => void) {
+ const [type, id, eventSet, ...filters] = item;
+ if (type !== "SYNC") throw new Error("Must be a SYNC command");
+
+ // if the event is replaceable there is no need to use any special sync query,
+ // just send the filters directly
+ const isReplaceableSync = filters.every(
+ a =>
+ a.kinds?.every(
+ b =>
+ EventExt.getType(b) === EventType.Replaceable || EventExt.getType(b) === EventType.ParameterizedReplaceable,
+ ) ?? false,
+ );
+ if (filters.some(a => a.since || a.until || a.ids || a.limit) || isReplaceableSync) {
+ c.request(["REQ", id, ...filters], cb);
+ } else if (this.method === FallbackSyncMethod.Since) {
+ this.#syncSince(c, item, cb);
+ } else if (this.method === FallbackSyncMethod.RangeSync) {
+ this.#syncRangeSync(c, item, cb);
+ } else {
+ throw new Error("No fallback sync method");
+ }
+ }
+
+ /**
+ * Using the latest data, fetch only newer items
+ *
+ * The downfall of this method is when the dataset is truncated by the relay (ie. limit results to 1000 items)
+ */
+ #syncSince(c: Connection, item: SyncCommand, cb?: () => void) {
+ const [type, id, eventSet, ...filters] = item;
+ if (type !== "SYNC") throw new Error("Must be a SYNC command");
+ const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
+ const newFilters = filters.map(a => ({
+ ...a,
+ since: latest + 1,
+ }));
+ c.request(["REQ", id, ...newFilters], cb);
+ }
+
+ /**
+ * Using the RangeSync class, sync data using fixed window size
+ */
+ #syncRangeSync(c: Connection, item: SyncCommand, cb?: () => void) {
+ const [type, id, eventSet, ...filters] = item;
+ if (type !== "SYNC") throw new Error("Must be a SYNC command");
+
+ const rs = RangeSync.forFetcher(async (rb, cb) => {
+ return await new Promise((resolve, reject) => {
+ const results = new NoteCollection();
+ const f = rb.buildRaw();
+ c.on("event", (c, e) => {
+ if (rb.id === c) {
+ cb?.([e]);
+ results.add(e);
+ }
+ });
+ c.on("eose", s => {
+ if (s === rb.id) {
+ resolve(results.takeSnapshot());
+ }
+ });
+ c.request(["REQ", rb.id, ...f], undefined);
+ });
+ });
+ const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
+ rs.setStartPoint(latest + 1);
+ rs.on("event", ev => {
+ ev.forEach(e => c.emit("event", id, e));
+ });
+ for (const f of filters) {
+ rs.sync(f);
+ }
+ }
+}
diff --git a/packages/system/src/system-base.ts b/packages/system/src/system-base.ts
index a150c1e5..2cc1e908 100644
--- a/packages/system/src/system-base.ts
+++ b/packages/system/src/system-base.ts
@@ -5,14 +5,19 @@ import { EventsCache } from "./cache/events";
import { UserFollowsCache } from "./cache/user-follows-lists";
import { UserRelaysCache, UserProfileCache, RelayMetricCache, NostrEvent } from "./index";
import { DefaultOptimizer, Optimizer } from "./query-optimizer";
-import { NostrSystemEvents, SystemConfig } from "./system";
+import { FallbackSyncMethod, NostrSystemEvents, SystemConfig } from "./system";
import { EventEmitter } from "eventemitter3";
export abstract class SystemBase extends EventEmitter {
#config: SystemConfig;
+ get config() {
+ return this.#config;
+ }
+
constructor(props: Partial) {
super();
+
this.#config = {
relays: props.relays ?? new UserRelaysCache(props.db?.userRelays),
profiles: props.profiles ?? new UserProfileCache(props.db?.users),
@@ -25,6 +30,7 @@ export abstract class SystemBase extends EventEmitter {
db: props.db,
automaticOutboxModel: props.automaticOutboxModel ?? true,
buildFollowGraph: props.buildFollowGraph ?? false,
+ fallbackSync: props.fallbackSync ?? FallbackSyncMethod.Since,
};
}
diff --git a/packages/system/src/system.ts b/packages/system/src/system.ts
index 427e8685..760fe480 100644
--- a/packages/system/src/system.ts
+++ b/packages/system/src/system.ts
@@ -91,6 +91,16 @@ export interface SystemConfig {
* for users when fetching by author.
*/
buildFollowGraph: boolean;
+
+ /**
+ * Pick a fallback sync method when negentropy is not available
+ */
+ fallbackSync: FallbackSyncMethod;
+}
+
+export enum FallbackSyncMethod {
+ Since = "since",
+ RangeSync = "range-sync",
}
export interface SystemInterface {
@@ -200,6 +210,8 @@ export interface SystemInterface {
* Request router instance
*/
get requestRouter(): RequestRouter | undefined;
+
+ get config(): SystemConfig;
}
export interface SystemSnapshot {