refactor: move connection sync module
Some checks reported errors
continuous-integration/drone/push Build encountered an error

This commit is contained in:
kieran 2024-06-05 13:08:55 +01:00
parent 4185f117cb
commit 57bf51c41c
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
10 changed files with 181 additions and 152 deletions

View File

@ -34,15 +34,16 @@ export default function NotificationsPage({ onClick }: { onClick?: (link: NostrL
const myNotifications = useMemo(() => {
return notifications
.sort((a, b) => a.created_at > b.created_at ? -1 : 1)
.sort((a, b) => (a.created_at > b.created_at ? -1 : 1))
.slice(0, limit)
.filter(a => !isMuted(a.pubkey) && a.tags.some(b => b[0] === "p" && b[1] === login.publicKey));
}, [notifications, login.publicKey, limit]);
const timeGrouped = useMemo(() => {
return myNotifications.reduce((acc, v) => {
const key = `${timeKey(v)}:${getNotificationContext(v as TaggedNostrEvent)?.encode(CONFIG.eventLinkPrefix)}:${v.kind
}`;
const key = `${timeKey(v)}:${getNotificationContext(v as TaggedNostrEvent)?.encode(CONFIG.eventLinkPrefix)}:${
v.kind
}`;
if (acc.has(key)) {
unwrap(acc.get(key)).push(v as TaggedNostrEvent);
} else {
@ -63,7 +64,11 @@ export default function NotificationsPage({ onClick }: { onClick?: (link: NostrL
{login.publicKey &&
[...timeGrouped.entries()].map(([k, g]) => <NotificationGroup key={k} evs={g} onClick={onClick} />)}
<AutoLoadMore onClick={() => { setLimit(l => l + 100) }} />
<AutoLoadMore
onClick={() => {
setLimit(l => l + 100);
}}
/>
</div>
</>
);

View File

@ -1778,6 +1778,9 @@
"yCLnBC": {
"defaultMessage": "LNURL or Lightning Address"
},
"z3UjXR": {
"defaultMessage": "Debug"
},
"zCb8fX": {
"defaultMessage": "Weight"
},

View File

@ -590,6 +590,7 @@
"y1Z3or": "Language",
"yAztTU": "{n} eSats",
"yCLnBC": "LNURL or Lightning Address",
"z3UjXR": "Debug",
"zCb8fX": "Weight",
"zFegDD": "Contact",
"zINlao": "Owner",

View File

@ -5,6 +5,7 @@ import { EventEmitter } from "eventemitter3";
import { Connection, RelaySettings, SyncCommand } from "./connection";
import { NostrEvent, OkResponse, ReqCommand, TaggedNostrEvent } from "./nostr";
import { RelayInfo, SystemInterface } from ".";
import { ConnectionSyncModule, DefaultSyncModule } from "./sync/connection";
/**
* Events which the ConnectionType must emit
@ -93,6 +94,7 @@ export type ConnectionBuilder<T extends ConnectionType> = (
address: string,
options: RelaySettings,
ephemeral: boolean,
syncModule?: ConnectionSyncModule,
) => Promise<T> | T;
/**
@ -105,6 +107,11 @@ export class DefaultConnectionPool<T extends ConnectionType = Connection>
#system: SystemInterface;
#log = debug("ConnectionPool");
/**
* Track if a connection request has started
*/
#connectStarted = new Set<string>();
/**
* All currently connected websockets
*/
@ -122,7 +129,8 @@ export class DefaultConnectionPool<T extends ConnectionType = Connection>
this.#connectionBuilder = builder;
} else {
this.#connectionBuilder = (addr, options, ephemeral) => {
return new Connection(addr, options, ephemeral) as unknown as T;
const sync = new DefaultSyncModule(this.#system.config.fallbackSync);
return new Connection(addr, options, ephemeral, sync) as unknown as T;
};
}
}
@ -140,12 +148,14 @@ export class DefaultConnectionPool<T extends ConnectionType = Connection>
*/
async connect(address: string, options: RelaySettings, ephemeral: boolean) {
const addr = unwrap(sanitizeRelayUrl(address));
if (this.#connectStarted.has(addr)) return;
this.#connectStarted.add(addr);
try {
const existing = this.#sockets.get(addr);
if (!existing) {
const c = await this.#connectionBuilder(addr, options, ephemeral);
this.#sockets.set(addr, c);
c.on("event", (s, e) => {
if (this.#system.checkSigs && !this.#system.optimizer.schnorrVerify(e)) {
this.#log("Reject invalid event %o", e);
@ -177,6 +187,8 @@ export class DefaultConnectionPool<T extends ConnectionType = Connection>
this.#log("%O", e);
this.emit("connectFailed", addr);
this.#sockets.delete(addr);
} finally {
this.#connectStarted.delete(addr);
}
}

View File

@ -1,18 +1,16 @@
import { v4 as uuid } from "uuid";
import debug from "debug";
import WebSocket from "isomorphic-ws";
import { unixNowMs, dedupe } from "@snort/shared";
import { unixNowMs } from "@snort/shared";
import { EventEmitter } from "eventemitter3";
import { DefaultConnectTimeout } from "./const";
import { NostrEvent, OkResponse, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nostr";
import { RelayInfo } from "./relay-info";
import EventKind from "./event-kind";
import { EventExt, EventType } from "./event-ext";
import { NegentropyFlow } from "./negentropy/negentropy-flow";
import { EventExt } from "./event-ext";
import { ConnectionType, ConnectionTypeEvents } from "./connection-pool";
import { RangeSync } from "./sync";
import { NoteCollection } from "./note-collection";
import { ConnectionSyncModule } from "./sync/connection";
/**
* Relay settings
@ -46,6 +44,7 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
#downCount = 0;
#activeRequests = new Set<string>();
#connectStarted = false;
#syncModule?: ConnectionSyncModule;
id: string;
readonly address: string;
@ -64,7 +63,7 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
AwaitingAuth: Map<string, boolean>;
Authed = false;
constructor(addr: string, options: RelaySettings, ephemeral: boolean = false) {
constructor(addr: string, options: RelaySettings, ephemeral: boolean = false, syncModule?: ConnectionSyncModule) {
super();
this.id = uuid();
this.address = addr;
@ -72,6 +71,7 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
this.EventsCallback = new Map();
this.AwaitingAuth = new Map();
this.#ephemeral = ephemeral;
this.#syncModule = syncModule;
this.#log = debug("Connection").extend(addr);
}
@ -395,58 +395,10 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
this.#activeRequests.add(cmd[1]);
this.#send(cmd);
} else if (cmd[0] === "SYNC") {
const [_, id, eventSet, ...filters] = cmd;
const lastResortSync = () => {
const isReplacableSync = filters.every(a => a.kinds?.every(b => EventExt.getType(b) === EventType.Replaceable || EventExt.getType(b) === EventType.ParameterizedReplaceable) ?? false);
if (filters.some(a => a.since || a.until || a.ids || a.limit) || isReplacableSync) {
this.request(["REQ", id, ...filters], item.cb);
} else {
const rs = RangeSync.forFetcher(async (rb, cb) => {
return await new Promise((resolve, reject) => {
const results = new NoteCollection();
const f = rb.buildRaw();
this.on("event", (c, e) => {
if (rb.id === c) {
cb?.([e]);
results.add(e);
}
});
this.on("eose", s => {
if (s === rb.id) {
resolve(results.takeSnapshot());
}
});
this.request(["REQ", rb.id, ...f], undefined);
});
});
const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
rs.setStartPoint(latest + 1);
rs.on("event", ev => {
ev.forEach(e => this.emit("event", id, e));
});
for (const f of filters) {
rs.sync(f);
}
}
};
if (this.info?.negentropy === "v1") {
const newFilters = filters;
const neg = new NegentropyFlow(id, this, eventSet, newFilters);
neg.once("finish", filters => {
if (filters.length > 0) {
this.request(["REQ", cmd[1], ...filters], item.cb);
} else {
// no results to query, emulate closed
this.emit("closed", id, "Nothing to sync");
}
});
neg.once("error", () => {
lastResortSync();
});
neg.start();
} else {
lastResortSync();
if (!this.#syncModule) {
throw new Error("no sync module");
}
this.#syncModule.sync(this, cmd, item.cb);
}
} catch (e) {
console.error(e);

View File

@ -1,97 +1,33 @@
import debug from "debug";
import { EventEmitter } from "eventemitter3";
import { CachedTable, unixNowMs } from "@snort/shared";
import { unixNowMs } from "@snort/shared";
import { NostrEvent, TaggedNostrEvent, OkResponse } from "./nostr";
import { RelaySettings } from "./connection";
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
import { RelayMetricHandler } from "./relay-metric-handler";
import {
CachedMetadata,
ProfileLoaderService,
RelayMetrics,
SystemInterface,
SystemSnapshot,
UserProfileCache,
UserRelaysCache,
RelayMetricCache,
UsersRelays,
QueryLike,
OutboxModel,
socialGraphInstance,
EventKind,
UsersFollows,
ID,
NostrSystemEvents,
SystemConfig,
} from ".";
import { EventsCache } from "./cache/events";
import { RelayMetadataLoader } from "./outbox";
import { Optimizer, DefaultOptimizer } from "./query-optimizer";
import { ConnectionPool, DefaultConnectionPool } from "./connection-pool";
import { QueryManager } from "./query-manager";
import { CacheRelay } from "./cache-relay";
import { RequestRouter } from "./request-router";
import { UserFollowsCache } from "./cache/user-follows-lists";
import { SystemBase } from "./system-base";
/**
* Manages nostr content retrieval system
*/
export class NostrSystem extends EventEmitter<NostrSystemEvents> implements SystemInterface {
export class NostrSystem extends SystemBase implements SystemInterface {
#log = debug("System");
#queryManager: QueryManager;
#config: SystemConfig;
/**
* Storage class for user relay lists
*/
get relayCache(): CachedTable<UsersRelays> {
return this.#config.relays;
}
/**
* Storage class for user profiles
*/
get profileCache(): CachedTable<CachedMetadata> {
return this.#config.profiles;
}
/**
* Storage class for relay metrics (connects/disconnects)
*/
get relayMetricsCache(): CachedTable<RelayMetrics> {
return this.#config.relayMetrics;
}
/**
* Optimizer instance, contains optimized functions for processing data
*/
get optimizer(): Optimizer {
return this.#config.optimizer;
}
get eventsCache(): CachedTable<NostrEvent> {
return this.#config.events;
}
get userFollowsCache(): CachedTable<UsersFollows> {
return this.#config.contactLists;
}
get cacheRelay(): CacheRelay | undefined {
return this.#config.cachingRelay;
}
/**
* Check event signatures (recommended)
*/
get checkSigs(): boolean {
return this.#config.checkSigs;
}
set checkSigs(v: boolean) {
this.#config.checkSigs = v;
}
readonly profileLoader: ProfileLoaderService;
readonly relayMetricsHandler: RelayMetricHandler;
@ -100,39 +36,26 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
readonly requestRouter: RequestRouter | undefined;
constructor(props: Partial<SystemConfig>) {
super();
this.#config = {
relays: props.relays ?? new UserRelaysCache(props.db?.userRelays),
profiles: props.profiles ?? new UserProfileCache(props.db?.users),
relayMetrics: props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics),
events: props.events ?? new EventsCache(props.db?.events),
contactLists: props.contactLists ?? new UserFollowsCache(props.db?.contacts),
optimizer: props.optimizer ?? DefaultOptimizer,
checkSigs: props.checkSigs ?? false,
cachingRelay: props.cachingRelay,
db: props.db,
automaticOutboxModel: props.automaticOutboxModel ?? true,
buildFollowGraph: props.buildFollowGraph ?? false,
};
super(props);
this.profileLoader = new ProfileLoaderService(this, this.profileCache);
this.relayMetricsHandler = new RelayMetricHandler(this.relayMetricsCache);
this.relayLoader = new RelayMetadataLoader(this, this.relayCache);
// if automatic outbox model, setup request router as OutboxModel
if (this.#config.automaticOutboxModel) {
if (this.config.automaticOutboxModel) {
this.requestRouter = OutboxModel.fromSystem(this);
}
// Cache everything
if (this.#config.cachingRelay) {
if (this.config.cachingRelay) {
this.on("event", async (_, ev) => {
await this.#config.cachingRelay?.event(ev);
await this.config.cachingRelay?.event(ev);
});
}
// Hook on-event when building follow graph
if (this.#config.buildFollowGraph) {
if (this.config.buildFollowGraph) {
let evBuf: Array<TaggedNostrEvent> = [];
let t: ReturnType<typeof setTimeout> | undefined;
this.on("event", (_, ev) => {
@ -213,7 +136,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
async PreloadSocialGraph() {
// Insert data to socialGraph from cache
if (this.#config.buildFollowGraph) {
if (this.config.buildFollowGraph) {
for (const list of this.userFollowsCache.snapshot()) {
const user = ID(list.pubkey);
for (const fx of list.follows) {

View File

@ -412,8 +412,12 @@ export class Query extends EventEmitter<QueryEvents> {
}
});
const eventHandler = (sub: string, ev: TaggedNostrEvent) => {
if (this.request.options?.fillStore ?? true) {
this.handleEvent(sub, ev);
if ((this.request.options?.fillStore ?? true) && qt.id === sub) {
if (qt.filters.some(v => eventMatchesFilter(ev, v))) {
this.feed.add(ev);
} else {
this.#log("Event did not match filter, rejecting %O %O", ev, qt);
}
}
};
const eoseHandler = (sub: string) => {

View File

@ -0,0 +1,111 @@
import { Connection, SyncCommand } from "../connection";
import { FallbackSyncMethod } from "../system";
import { EventExt, EventType } from "../event-ext";
import { NoteCollection } from "../note-collection";
import { RangeSync } from "./range-sync";
import { NegentropyFlow } from "../negentropy/negentropy-flow";
export interface ConnectionSyncModule {
sync: (c: Connection, item: SyncCommand, cb?: () => void) => void;
}
export class DefaultSyncModule implements ConnectionSyncModule {
constructor(readonly method: FallbackSyncMethod) {}
sync(c: Connection, item: SyncCommand, cb?: () => void) {
const [_, id, eventSet, ...filters] = item;
if (c.info?.negentropy === "v1") {
const newFilters = filters;
const neg = new NegentropyFlow(id, c, eventSet, newFilters);
neg.once("finish", filters => {
if (filters.length > 0) {
c.request(["REQ", id, ...filters], cb);
} else {
// no results to query, emulate closed
c.emit("closed", id, "Nothing to sync");
}
});
neg.once("error", () => {
this.#fallbackSync(c, item, cb);
});
neg.start();
} else {
this.#fallbackSync(c, item, cb);
}
}
#fallbackSync(c: Connection, item: SyncCommand, cb?: () => void) {
const [type, id, eventSet, ...filters] = item;
if (type !== "SYNC") throw new Error("Must be a SYNC command");
// if the event is replaceable there is no need to use any special sync query,
// just send the filters directly
const isReplaceableSync = filters.every(
a =>
a.kinds?.every(
b =>
EventExt.getType(b) === EventType.Replaceable || EventExt.getType(b) === EventType.ParameterizedReplaceable,
) ?? false,
);
if (filters.some(a => a.since || a.until || a.ids || a.limit) || isReplaceableSync) {
c.request(["REQ", id, ...filters], cb);
} else if (this.method === FallbackSyncMethod.Since) {
this.#syncSince(c, item, cb);
} else if (this.method === FallbackSyncMethod.RangeSync) {
this.#syncRangeSync(c, item, cb);
} else {
throw new Error("No fallback sync method");
}
}
/**
* Using the latest data, fetch only newer items
*
* The downfall of this method is when the dataset is truncated by the relay (ie. limit results to 1000 items)
*/
#syncSince(c: Connection, item: SyncCommand, cb?: () => void) {
const [type, id, eventSet, ...filters] = item;
if (type !== "SYNC") throw new Error("Must be a SYNC command");
const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
const newFilters = filters.map(a => ({
...a,
since: latest + 1,
}));
c.request(["REQ", id, ...newFilters], cb);
}
/**
* Using the RangeSync class, sync data using fixed window size
*/
#syncRangeSync(c: Connection, item: SyncCommand, cb?: () => void) {
const [type, id, eventSet, ...filters] = item;
if (type !== "SYNC") throw new Error("Must be a SYNC command");
const rs = RangeSync.forFetcher(async (rb, cb) => {
return await new Promise((resolve, reject) => {
const results = new NoteCollection();
const f = rb.buildRaw();
c.on("event", (c, e) => {
if (rb.id === c) {
cb?.([e]);
results.add(e);
}
});
c.on("eose", s => {
if (s === rb.id) {
resolve(results.takeSnapshot());
}
});
c.request(["REQ", rb.id, ...f], undefined);
});
});
const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
rs.setStartPoint(latest + 1);
rs.on("event", ev => {
ev.forEach(e => c.emit("event", id, e));
});
for (const f of filters) {
rs.sync(f);
}
}
}

View File

@ -5,14 +5,19 @@ import { EventsCache } from "./cache/events";
import { UserFollowsCache } from "./cache/user-follows-lists";
import { UserRelaysCache, UserProfileCache, RelayMetricCache, NostrEvent } from "./index";
import { DefaultOptimizer, Optimizer } from "./query-optimizer";
import { NostrSystemEvents, SystemConfig } from "./system";
import { FallbackSyncMethod, NostrSystemEvents, SystemConfig } from "./system";
import { EventEmitter } from "eventemitter3";
export abstract class SystemBase extends EventEmitter<NostrSystemEvents> {
#config: SystemConfig;
get config() {
return this.#config;
}
constructor(props: Partial<SystemConfig>) {
super();
this.#config = {
relays: props.relays ?? new UserRelaysCache(props.db?.userRelays),
profiles: props.profiles ?? new UserProfileCache(props.db?.users),
@ -25,6 +30,7 @@ export abstract class SystemBase extends EventEmitter<NostrSystemEvents> {
db: props.db,
automaticOutboxModel: props.automaticOutboxModel ?? true,
buildFollowGraph: props.buildFollowGraph ?? false,
fallbackSync: props.fallbackSync ?? FallbackSyncMethod.Since,
};
}

View File

@ -91,6 +91,16 @@ export interface SystemConfig {
* for users when fetching by author.
*/
buildFollowGraph: boolean;
/**
* Pick a fallback sync method when negentropy is not available
*/
fallbackSync: FallbackSyncMethod;
}
export enum FallbackSyncMethod {
Since = "since",
RangeSync = "range-sync",
}
export interface SystemInterface {
@ -200,6 +210,8 @@ export interface SystemInterface {
* Request router instance
*/
get requestRouter(): RequestRouter | undefined;
get config(): SystemConfig;
}
export interface SystemSnapshot {