send help

parent 86ec7f41d7
commit 25e7f68dce

@@ -1,7 +1,7 @@
 import { db, UsersRelays } from "Db";
 import FeedCache from "./FeedCache";
 
-class UsersRelaysCache extends FeedCache<UsersRelays> {
+export class UsersRelaysCache extends FeedCache<UsersRelays> {
   constructor() {
     super("UserRelays", db.userRelays);
   }
@@ -20,7 +20,9 @@ export interface RelayProps {
 export default function Relay(props: RelayProps) {
   const navigate = useNavigate();
   const login = useLogin();
-  const relaySettings = unwrap(login.relays.item[props.addr] ?? System.Sockets.get(props.addr)?.Settings ?? {});
+  const relaySettings = unwrap(
+    login.relays.item[props.addr] ?? System.Sockets.find(a => a.address === props.addr)?.settings ?? {}
+  );
   const state = useRelayState(props.addr);
   const name = useMemo(() => getRelayName(props.addr), [props.addr]);
@@ -66,8 +66,8 @@ const SubDebug = () => {
   return (
     <>
       <b>Connections:</b>
-      {[...System.Sockets.keys()].map(k => (
-        <RelayInfo id={k} />
+      {System.Sockets.map(k => (
+        <RelayInfo id={k.address} />
       ))}
     </>
   );
@@ -1,18 +1,18 @@
-type HookFn = () => void;
+type HookFn<TSnapshot> = (e?: TSnapshot) => void;
 
-interface HookFilter {
-  fn: HookFn;
+interface HookFilter<TSnapshot> {
+  fn: HookFn<TSnapshot>;
 }
 
 /**
  * Simple React hookable store with manual change notifications
  */
 export default abstract class ExternalStore<TSnapshot> {
-  #hooks: Array<HookFilter> = [];
+  #hooks: Array<HookFilter<TSnapshot>> = [];
   #snapshot: Readonly<TSnapshot> = {} as Readonly<TSnapshot>;
   #changed = true;
 
-  hook(fn: HookFn) {
+  hook(fn: HookFn<TSnapshot>) {
    this.#hooks.push({
      fn,
    });
@@ -32,9 +32,9 @@ export default abstract class ExternalStore<TSnapshot> {
    return this.#snapshot;
  }
 
-  protected notifyChange() {
+  protected notifyChange(sn?: TSnapshot) {
    this.#changed = true;
-    this.#hooks.forEach(h => h.fn());
+    this.#hooks.forEach(h => h.fn(sn));
  }
 
  abstract takeSnapshot(): TSnapshot;
@@ -1,18 +1,6 @@
-import { useSyncExternalStore } from "react";
-import { StateSnapshot } from "System";
 import { System } from "index";
 
-const noop = () => {
-  return () => undefined;
-};
-const noopState = (): StateSnapshot | undefined => {
-  return undefined;
-};
-
 export default function useRelayState(addr: string) {
-  const c = System.Sockets.get(addr);
-  return useSyncExternalStore<StateSnapshot | undefined>(
-    c?.StatusHook.bind(c) ?? noop,
-    c?.GetState.bind(c) ?? noopState
-  );
+  const c = System.Sockets.find(a => a.address === addr);
+  return c;
 }
@@ -10,7 +10,7 @@ const useRequestBuilder = <TStore extends NoteStore, TSnapshot = ReturnType<TSto
   debounced?: number
 ) => {
   const subscribe = (onChanged: () => void) => {
-    const store = System.Query<TStore>(type, rb);
+    const store = (System.Query<TStore>(type, rb)?.feed as TStore) ?? new type();
     let t: ReturnType<typeof setTimeout> | undefined;
     const release = store.hook(() => {
       if (!t) {
@@ -24,7 +24,7 @@ const useRequestBuilder = <TStore extends NoteStore, TSnapshot = ReturnType<TSto
 
     return () => {
       if (rb?.id) {
-        System.CancelQuery(rb.id);
+        System.GetQuery(rb.id)?.cancel();
       }
       release();
     };
@@ -75,9 +75,9 @@ export default function Layout() {
       for (const [k, v] of Object.entries(relays.item)) {
         await System.ConnectToRelay(k, v);
       }
-      for (const [k, c] of System.Sockets) {
-        if (!relays.item[k] && !c.Ephemeral) {
-          System.DisconnectRelay(k);
+      for (const v of System.Sockets) {
+        if (!relays.item[v.address] && !v.ephemeral) {
+          System.DisconnectRelay(v.address);
         }
       }
     })();
@@ -6,7 +6,6 @@ import { useNavigate, useParams } from "react-router-dom";
 import Spinner from "Icons/Spinner";
 import { parseNostrLink, profileLink } from "SnortUtils";
 import { getNip05PubKey } from "Pages/LoginPage";
-import { System } from "index";
 
 export default function NostrLinkHandler() {
   const params = useParams();
@@ -18,9 +17,6 @@ export default function NostrLinkHandler() {
   async function handleLink(link: string) {
     const nav = parseNostrLink(link);
     if (nav) {
-      if ((nav.relays?.length ?? 0) > 0) {
-        nav.relays?.map(a => System.ConnectEphemeralRelay(a));
-      }
       if (nav.type === NostrPrefix.Event || nav.type === NostrPrefix.Note || nav.type === NostrPrefix.Address) {
         navigate(`/e/${nav.encode()}`);
       } else if (nav.type === NostrPrefix.PublicKey || nav.type === NostrPrefix.Profile) {
@@ -170,10 +170,10 @@ const GlobalTab = () => {
   useEffect(() => {
     return debounce(500, () => {
       const ret: RelayOption[] = [];
-      System.Sockets.forEach((v, k) => {
+      System.Sockets.forEach(v => {
        ret.push({
-          url: k,
-          paid: v.Info?.limitation?.payment_required ?? false,
+          url: v.address,
+          paid: v.info?.limitation?.payment_required ?? false,
        });
      });
      ret.sort(a => (a.paid ? -1 : 1));
@@ -42,7 +42,7 @@ const SearchPage = () => {
   useEffect(() => {
     const addedRelays: string[] = [];
     for (const [k, v] of SearchRelays) {
-      if (!System.Sockets.has(k)) {
+      if (!System.Sockets.some(v => v.address === k)) {
         System.ConnectToRelay(k, v);
         addedRelays.push(k);
       }
@@ -78,15 +78,14 @@ export default function ZapPoolPage() {
   const { wallet } = useWallet();
 
   const relayConnections = useMemo(() => {
-    return [...System.Sockets.values()]
-      .map(a => {
-        if (a.Info?.pubkey && !a.Ephemeral) {
-          return {
-            address: a.Address,
-            pubkey: a.Info.pubkey,
-          };
-        }
-      })
+    return System.Sockets.map(a => {
+      if (a.info?.pubkey && !a.ephemeral) {
+        return {
+          address: a.address,
+          pubkey: a.info.pubkey,
+        };
+      }
+    })
       .filter(a => a !== undefined)
       .map(unwrap);
   }, [login.relays]);
@@ -14,8 +14,8 @@ const RelayInfo = () => {
   const navigate = useNavigate();
   const login = useLogin();
 
-  const conn = Array.from(System.Sockets.values()).find(a => a.Id === params.id);
-  const stats = useRelayState(conn?.Address ?? "");
+  const conn = System.Sockets.find(a => a.id === params.id);
+  const stats = useRelayState(conn?.address ?? "");
 
   return (
     <>
@@ -105,7 +105,7 @@ const RelayInfo = () => {
         <div
           className="btn error"
           onClick={() => {
-            removeRelay(login, unwrap(conn).Address);
+            removeRelay(login, unwrap(conn).address);
            navigate("/settings/relays");
          }}>
          <FormattedMessage {...messages.Remove} />
@@ -16,7 +16,7 @@ const RelaySettingsPage = () => {
   const [newRelay, setNewRelay] = useState<string>();
 
   const otherConnections = useMemo(() => {
-    return [...System.Sockets.keys()].filter(a => relays.item[a] === undefined);
+    return System.Sockets.filter(a => relays.item[a.address] === undefined);
   }, [relays]);
 
   async function saveRelays() {
@@ -98,7 +98,7 @@ const RelaySettingsPage = () => {
       </h3>
       <div className="flex f-col mb10">
         {otherConnections.map(a => (
-          <Relay addr={a} key={a} />
+          <Relay addr={a.address} key={a.id} />
         ))}
       </div>
     </>
@ -5,8 +5,8 @@ import { ConnectionStats } from "./ConnectionStats";
|
||||
import { RawEvent, ReqCommand, TaggedRawEvent, u256 } from "./Nostr";
|
||||
import { RelayInfo } from "./RelayInfo";
|
||||
import { unwrap } from "./Util";
|
||||
import ExternalStore from "ExternalStore";
|
||||
|
||||
export type CustomHook = (state: Readonly<StateSnapshot>) => void;
|
||||
export type AuthHandler = (challenge: string, relay: string) => Promise<RawEvent | undefined>;
|
||||
|
||||
/**
|
||||
@ -20,7 +20,7 @@ export interface RelaySettings {
|
||||
/**
|
||||
* Snapshot of connection stats
|
||||
*/
|
||||
export interface StateSnapshot {
|
||||
export interface ConnectionStateSnapshot {
|
||||
connected: boolean;
|
||||
disconnects: number;
|
||||
avgLatency: number;
|
||||
@ -28,13 +28,16 @@ export interface StateSnapshot {
|
||||
received: number;
|
||||
send: number;
|
||||
};
|
||||
settings?: RelaySettings;
|
||||
info?: RelayInfo;
|
||||
pendingRequests: Array<string>;
|
||||
activeRequests: Array<string>;
|
||||
id: string;
|
||||
ephemeral: boolean;
|
||||
address: string;
|
||||
}
|
||||
|
||||
export class Connection {
|
||||
export class Connection extends ExternalStore<ConnectionStateSnapshot> {
|
||||
Id: string;
|
||||
Address: string;
|
||||
Socket: WebSocket | null = null;
|
||||
@ -50,10 +53,7 @@ export class Connection {
|
||||
Info?: RelayInfo;
|
||||
ConnectTimeout: number = DefaultConnectTimeout;
|
||||
Stats: ConnectionStats = new ConnectionStats();
|
||||
StateHooks: Map<string, CustomHook> = new Map();
|
||||
HasStateChange: boolean = true;
|
||||
CurrentState: StateSnapshot;
|
||||
LastState: Readonly<StateSnapshot>;
|
||||
IsClosed: boolean;
|
||||
ReconnectTimer: ReturnType<typeof setTimeout> | null;
|
||||
EventsCallback: Map<u256, (msg: boolean[]) => void>;
|
||||
@ -69,19 +69,10 @@ export class Connection {
|
||||
Down = true;
|
||||
|
||||
constructor(addr: string, options: RelaySettings, auth?: AuthHandler, ephemeral: boolean = false) {
|
||||
super();
|
||||
this.Id = uuid();
|
||||
this.Address = addr;
|
||||
this.Settings = options;
|
||||
this.CurrentState = {
|
||||
connected: false,
|
||||
disconnects: 0,
|
||||
avgLatency: 0,
|
||||
events: {
|
||||
received: 0,
|
||||
send: 0,
|
||||
},
|
||||
} as StateSnapshot;
|
||||
this.LastState = Object.freeze({ ...this.CurrentState });
|
||||
this.IsClosed = false;
|
||||
this.ReconnectTimer = null;
|
||||
this.EventsCallback = new Map();
|
||||
@ -146,7 +137,7 @@ export class Connection {
|
||||
this.ReconnectTimer = null;
|
||||
}
|
||||
this.Socket?.close();
|
||||
this.#UpdateState();
|
||||
this.notifyChange();
|
||||
}
|
||||
|
||||
OnOpen() {
|
||||
@ -181,7 +172,7 @@ export class Connection {
|
||||
this.#ResetQueues();
|
||||
// reset connection Id on disconnect, for query-tracking
|
||||
this.Id = uuid();
|
||||
this.#UpdateState();
|
||||
this.notifyChange();
|
||||
}
|
||||
|
||||
OnMessage(e: MessageEvent) {
|
||||
@ -194,7 +185,7 @@ export class Connection {
|
||||
.then(() => this.#sendPendingRaw())
|
||||
.catch(console.error);
|
||||
this.Stats.EventsReceived++;
|
||||
this.#UpdateState();
|
||||
this.notifyChange();
|
||||
break;
|
||||
}
|
||||
case "EVENT": {
|
||||
@ -203,7 +194,7 @@ export class Connection {
|
||||
relays: [this.Address],
|
||||
});
|
||||
this.Stats.EventsReceived++;
|
||||
this.#UpdateState();
|
||||
this.notifyChange();
|
||||
break;
|
||||
}
|
||||
case "EOSE": {
|
||||
@ -235,7 +226,7 @@ export class Connection {
|
||||
|
||||
OnError(e: Event) {
|
||||
console.error(e);
|
||||
this.#UpdateState();
|
||||
this.notifyChange();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -248,7 +239,7 @@ export class Connection {
|
||||
const req = ["EVENT", e];
|
||||
this.#SendJson(req);
|
||||
this.Stats.EventsSent++;
|
||||
this.#UpdateState();
|
||||
this.notifyChange();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -271,32 +262,10 @@ export class Connection {
|
||||
const req = ["EVENT", e];
|
||||
this.#SendJson(req);
|
||||
this.Stats.EventsSent++;
|
||||
this.#UpdateState();
|
||||
this.notifyChange();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook status for connection
|
||||
*/
|
||||
StatusHook(fnHook: CustomHook) {
|
||||
const id = uuid();
|
||||
this.StateHooks.set(id, fnHook);
|
||||
return () => {
|
||||
this.StateHooks.delete(id);
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current state of this connection
|
||||
*/
|
||||
GetState() {
|
||||
if (this.HasStateChange) {
|
||||
this.LastState = Object.freeze({ ...this.CurrentState });
|
||||
this.HasStateChange = false;
|
||||
}
|
||||
return this.LastState;
|
||||
}
|
||||
|
||||
/**
|
||||
* Using relay document to determine if this relay supports a feature
|
||||
*/
|
||||
@ -320,7 +289,7 @@ export class Connection {
|
||||
this.#SendJson(cmd);
|
||||
cbSent();
|
||||
}
|
||||
this.#UpdateState();
|
||||
this.notifyChange();
|
||||
}
|
||||
|
||||
CloseReq(id: string) {
|
||||
@ -329,7 +298,28 @@ export class Connection {
|
||||
this.OnEose?.(id);
|
||||
this.#SendQueuedRequests();
|
||||
}
|
||||
this.#UpdateState();
|
||||
this.notifyChange();
|
||||
}
|
||||
|
||||
takeSnapshot(): ConnectionStateSnapshot {
|
||||
return {
|
||||
connected: this.Socket?.readyState === WebSocket.OPEN,
|
||||
events: {
|
||||
received: this.Stats.EventsReceived,
|
||||
send: this.Stats.EventsSent,
|
||||
},
|
||||
avgLatency:
|
||||
this.Stats.Latency.length > 0
|
||||
? this.Stats.Latency.reduce((acc, v) => acc + v, 0) / this.Stats.Latency.length
|
||||
: 0,
|
||||
disconnects: this.Stats.Disconnects,
|
||||
info: this.Info,
|
||||
id: this.Id,
|
||||
pendingRequests: [...this.PendingRequests.map(a => a.cmd[1])],
|
||||
activeRequests: [...this.ActiveRequests],
|
||||
ephemeral: this.Ephemeral,
|
||||
address: this.Address,
|
||||
};
|
||||
}
|
||||
|
||||
#SendQueuedRequests() {
|
||||
@ -351,30 +341,7 @@ export class Connection {
|
||||
this.ActiveRequests.clear();
|
||||
this.PendingRequests = [];
|
||||
this.PendingRaw = [];
|
||||
this.#UpdateState();
|
||||
}
|
||||
|
||||
#UpdateState() {
|
||||
this.CurrentState.connected = this.Socket?.readyState === WebSocket.OPEN;
|
||||
this.CurrentState.events.received = this.Stats.EventsReceived;
|
||||
this.CurrentState.events.send = this.Stats.EventsSent;
|
||||
this.CurrentState.avgLatency =
|
||||
this.Stats.Latency.length > 0 ? this.Stats.Latency.reduce((acc, v) => acc + v, 0) / this.Stats.Latency.length : 0;
|
||||
this.CurrentState.disconnects = this.Stats.Disconnects;
|
||||
this.CurrentState.info = this.Info;
|
||||
this.CurrentState.id = this.Id;
|
||||
this.CurrentState.pendingRequests = [...this.PendingRequests.map(a => a.cmd[1])];
|
||||
this.CurrentState.activeRequests = [...this.ActiveRequests];
|
||||
this.Stats.Latency = this.Stats.Latency.slice(-20); // trim
|
||||
this.HasStateChange = true;
|
||||
this.#NotifyState();
|
||||
}
|
||||
|
||||
#NotifyState() {
|
||||
const state = this.GetState();
|
||||
for (const [, h] of this.StateHooks) {
|
||||
h(state);
|
||||
}
|
||||
this.notifyChange();
|
||||
}
|
||||
|
||||
#SendJson(obj: object) {
|
||||
|
@@ -7,6 +7,7 @@ import {
   Lists,
   RawEvent,
   RelaySettings,
+  SystemInterface,
   TaggedRawEvent,
   u256,
   UserMetadata,
@@ -38,11 +39,6 @@ declare global {
   }
 }
 
-interface SystemInterface {
-  BroadcastEvent(ev: RawEvent): void;
-  WriteOnceToRelay(relay: string, ev: RawEvent): Promise<void>;
-}
-
 export class EventPublisher {
   #system: SystemInterface;
   #pubKey: string;
packages/app/src/System/NostrSystem.ts (new file, 249 lines)
@@ -0,0 +1,249 @@
|
||||
import debug from "debug";
|
||||
import { v4 as uuid } from "uuid";
|
||||
|
||||
import ExternalStore from "ExternalStore";
|
||||
import { RawEvent, RawReqFilter, TaggedRawEvent } from "./Nostr";
|
||||
import { AuthHandler, Connection, RelaySettings, ConnectionStateSnapshot } from "./Connection";
|
||||
import { Query, QueryBase } from "./Query";
|
||||
import { RelayCache } from "./GossipModel";
|
||||
import { NoteStore } from "./NoteCollection";
|
||||
import { BuiltRawReqFilter, RequestBuilder } from "./RequestBuilder";
|
||||
import { unwrap, sanitizeRelayUrl, unixNowMs } from "./Util";
|
||||
import { SystemInterface, SystemSnapshot } from "System";
|
||||
|
||||
/**
|
||||
* Manages nostr content retrieval system
|
||||
*/
|
||||
export class NostrSystem extends ExternalStore<SystemSnapshot> implements SystemInterface {
|
||||
/**
|
||||
* All currently connected websockets
|
||||
*/
|
||||
#sockets = new Map<string, Connection>();
|
||||
|
||||
/**
|
||||
* All active queries
|
||||
*/
|
||||
Queries: Map<string, Query> = new Map();
|
||||
|
||||
/**
|
||||
* Handler function for NIP-42
|
||||
*/
|
||||
HandleAuth?: AuthHandler;
|
||||
|
||||
#log = debug("System");
|
||||
#relayCache: RelayCache;
|
||||
|
||||
constructor(relayCache: RelayCache) {
|
||||
super();
|
||||
this.#relayCache = relayCache;
|
||||
this.#cleanup();
|
||||
}
|
||||
|
||||
get Sockets(): ConnectionStateSnapshot[] {
|
||||
return [...this.#sockets.values()].map(a => a.snapshot());
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to a NOSTR relay if not already connected
|
||||
*/
|
||||
async ConnectToRelay(address: string, options: RelaySettings) {
|
||||
try {
|
||||
const addr = unwrap(sanitizeRelayUrl(address));
|
||||
if (!this.#sockets.has(addr)) {
|
||||
const c = new Connection(addr, options, this.HandleAuth?.bind(this));
|
||||
this.#sockets.set(addr, c);
|
||||
c.OnEvent = (s, e) => this.OnEvent(s, e);
|
||||
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
|
||||
c.OnDisconnect = id => this.OnRelayDisconnect(id);
|
||||
await c.Connect();
|
||||
} else {
|
||||
// update settings if already connected
|
||||
unwrap(this.#sockets.get(addr)).Settings = options;
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
OnRelayDisconnect(id: string) {
|
||||
for (const [, q] of this.Queries) {
|
||||
q.connectionLost(id);
|
||||
}
|
||||
}
|
||||
|
||||
OnEndOfStoredEvents(c: Readonly<Connection>, sub: string) {
|
||||
for (const [, v] of this.Queries) {
|
||||
v.eose(sub, c);
|
||||
}
|
||||
}
|
||||
|
||||
OnEvent(sub: string, ev: TaggedRawEvent) {
|
||||
for (const [, v] of this.Queries) {
|
||||
v.onEvent(sub, ev);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param address Relay address URL
|
||||
*/
|
||||
async ConnectEphemeralRelay(address: string): Promise<Connection | undefined> {
|
||||
try {
|
||||
const addr = unwrap(sanitizeRelayUrl(address));
|
||||
if (!this.#sockets.has(addr)) {
|
||||
const c = new Connection(addr, { read: true, write: false }, this.HandleAuth?.bind(this), true);
|
||||
this.#sockets.set(addr, c);
|
||||
c.OnEvent = (s, e) => this.OnEvent(s, e);
|
||||
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
|
||||
c.OnDisconnect = id => this.OnRelayDisconnect(id);
|
||||
await c.Connect();
|
||||
return c;
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from a relay
|
||||
*/
|
||||
DisconnectRelay(address: string) {
|
||||
const c = this.#sockets.get(address);
|
||||
if (c) {
|
||||
this.#sockets.delete(address);
|
||||
c.Close();
|
||||
}
|
||||
}
|
||||
|
||||
GetQuery(id: string): Query | undefined {
|
||||
return this.Queries.get(id);
|
||||
}
|
||||
|
||||
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder | null): Query | undefined {
|
||||
if (!req) return;
|
||||
|
||||
const existing = this.Queries.get(req.id);
|
||||
if (existing) {
|
||||
const filters = req.buildDiff(this.#relayCache, existing.filters);
|
||||
if (filters.length === 0 && !req.options?.skipDiff) {
|
||||
return existing;
|
||||
} else {
|
||||
for (const subQ of filters) {
|
||||
this.SendQuery(existing, subQ).then(qta =>
|
||||
qta.forEach(v => this.#log("New QT from diff %s %s %O from: %O", req.id, v.id, v.filters, existing.filters))
|
||||
);
|
||||
}
|
||||
this.notifyChange();
|
||||
return existing;
|
||||
}
|
||||
} else {
|
||||
const store = new type();
|
||||
|
||||
const filters = req.build(this.#relayCache);
|
||||
const q = new Query(req.id, store);
|
||||
if (req.options?.leaveOpen) {
|
||||
q.leaveOpen = req.options.leaveOpen;
|
||||
}
|
||||
|
||||
this.Queries.set(req.id, q);
|
||||
for (const subQ of filters) {
|
||||
this.SendQuery(q, subQ).then(qta =>
|
||||
qta.forEach(v => this.#log("New QT from diff %s %s %O", req.id, v.id, v.filters))
|
||||
);
|
||||
}
|
||||
this.notifyChange();
|
||||
return q;
|
||||
}
|
||||
}
|
||||
|
||||
async SendQuery(q: Query, qSend: BuiltRawReqFilter) {
|
||||
if (qSend.relay) {
|
||||
this.#log("Sending query to %s %O", qSend.relay, qSend);
|
||||
const s = this.#sockets.get(qSend.relay);
|
||||
if (s) {
|
||||
const qt = q.sendToRelay(s, qSend);
|
||||
if (qt) {
|
||||
return [qt];
|
||||
}
|
||||
} else {
|
||||
const nc = await this.ConnectEphemeralRelay(qSend.relay);
|
||||
if (nc) {
|
||||
const qt = q.sendToRelay(nc, qSend);
|
||||
if (qt) {
|
||||
return [qt];
|
||||
}
|
||||
} else {
|
||||
console.warn("Failed to connect to new relay for:", qSend.relay, q);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const ret = [];
|
||||
for (const [, s] of this.#sockets) {
|
||||
if (!s.Ephemeral) {
|
||||
const qt = q.sendToRelay(s, qSend);
|
||||
if (qt) {
|
||||
ret.push(qt);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Send events to writable relays
|
||||
*/
|
||||
BroadcastEvent(ev: RawEvent) {
|
||||
for (const [, s] of this.#sockets) {
|
||||
s.SendEvent(ev);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write an event to a relay then disconnect
|
||||
*/
|
||||
async WriteOnceToRelay(address: string, ev: RawEvent) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const c = new Connection(address, { write: true, read: false }, this.HandleAuth, true);
|
||||
|
||||
const t = setTimeout(reject, 5_000);
|
||||
c.OnConnected = async () => {
|
||||
clearTimeout(t);
|
||||
await c.SendAsync(ev);
|
||||
c.Close();
|
||||
resolve();
|
||||
};
|
||||
c.Connect();
|
||||
});
|
||||
}
|
||||
|
||||
takeSnapshot(): SystemSnapshot {
|
||||
return {
|
||||
queries: [...this.Queries.values()].map(a => {
|
||||
return {
|
||||
id: a.id,
|
||||
filters: a.filters,
|
||||
closing: a.closing,
|
||||
subFilters: [],
|
||||
};
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
#cleanup() {
|
||||
const now = unixNowMs();
|
||||
let changed = false;
|
||||
for (const [k, v] of this.Queries) {
|
||||
if (v.closingAt && v.closingAt < now) {
|
||||
v.sendClose();
|
||||
this.Queries.delete(k);
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
if (changed) {
|
||||
this.notifyChange();
|
||||
}
|
||||
setTimeout(() => this.#cleanup(), 1_000);
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
import { EventKind, HexKey, NostrSystem, TaggedRawEvent } from "System";
|
||||
import { EventKind, HexKey, SystemInterface, TaggedRawEvent } from "System";
|
||||
import { ProfileCacheExpire } from "Const";
|
||||
import { mapEventToProfile, MetadataCache } from "Cache";
|
||||
import { UserCache } from "Cache/UserCache";
|
||||
@ -7,7 +7,7 @@ import { unixNowMs } from "SnortUtils";
|
||||
import debug from "debug";
|
||||
|
||||
export class ProfileLoaderService {
|
||||
#system: NostrSystem;
|
||||
#system: SystemInterface;
|
||||
|
||||
/**
|
||||
* List of pubkeys to fetch metadata for
|
||||
@ -16,7 +16,7 @@ export class ProfileLoaderService {
|
||||
|
||||
readonly #log = debug("ProfileCache");
|
||||
|
||||
constructor(system: NostrSystem) {
|
||||
constructor(system: SystemInterface) {
|
||||
this.#system = system;
|
||||
this.#FetchMetadata();
|
||||
}
|
||||
@ -74,8 +74,9 @@ export class ProfileLoaderService {
|
||||
|
||||
const newProfiles = new Set<string>();
|
||||
const q = this.#system.Query<PubkeyReplaceableNoteStore>(PubkeyReplaceableNoteStore, sub);
|
||||
const feed = (q?.feed as PubkeyReplaceableNoteStore) ?? new PubkeyReplaceableNoteStore();
|
||||
// never release this callback, it will stop firing anyway after eose
|
||||
const releaseOnEvent = q.onEvent(async e => {
|
||||
const releaseOnEvent = feed.onEvent(async e => {
|
||||
for (const pe of e) {
|
||||
newProfiles.add(pe.id);
|
||||
await this.onProfileEvent(pe);
|
||||
@ -83,17 +84,17 @@ export class ProfileLoaderService {
|
||||
});
|
||||
const results = await new Promise<Readonly<Array<TaggedRawEvent>>>(resolve => {
|
||||
let timeout: ReturnType<typeof setTimeout> | undefined = undefined;
|
||||
const release = q.hook(() => {
|
||||
if (!q.loading) {
|
||||
const release = feed.hook(() => {
|
||||
if (!feed.loading) {
|
||||
clearTimeout(timeout);
|
||||
resolve(q.getSnapshotData() ?? []);
|
||||
resolve(feed.getSnapshotData() ?? []);
|
||||
this.#log("Profiles finished: %s", sub.id);
|
||||
release();
|
||||
}
|
||||
});
|
||||
timeout = setTimeout(() => {
|
||||
release();
|
||||
resolve(q.getSnapshotData() ?? []);
|
||||
resolve(feed.getSnapshotData() ?? []);
|
||||
this.#log("Profiles timeout: %s", sub.id);
|
||||
}, 5_000);
|
||||
});
|
||||
|
@ -3,6 +3,7 @@ import { describe, expect } from "@jest/globals";
|
||||
import { Query, QueryBase } from "./Query";
|
||||
import { getRandomValues } from "crypto";
|
||||
import { FlatNoteStore } from "./NoteCollection";
|
||||
import { RequestStrategy } from "./RequestBuilder";
|
||||
|
||||
window.crypto = {} as any;
|
||||
window.crypto.getRandomValues = getRandomValues as any;
|
||||
@ -21,43 +22,88 @@ describe("query", () => {
|
||||
const c3 = new Connection("wss://three.com", opt);
|
||||
c3.Down = false;
|
||||
|
||||
q.sendToRelay(c1, {
|
||||
id: "test",
|
||||
const f = {
|
||||
relay: "",
|
||||
strategy: RequestStrategy.DefaultRelays,
|
||||
filters: [
|
||||
{
|
||||
kinds: [1],
|
||||
authors: ["test"],
|
||||
},
|
||||
],
|
||||
});
|
||||
q.sendToRelay(c2);
|
||||
q.sendToRelay(c3);
|
||||
};
|
||||
const qt1 = q.sendToRelay(c1, f);
|
||||
const qt2 = q.sendToRelay(c2, f);
|
||||
const qt3 = q.sendToRelay(c3, f);
|
||||
|
||||
expect(q.progress).toBe(0);
|
||||
q.eose(q.id, c1);
|
||||
q.eose(qt1!.id, c1);
|
||||
expect(q.progress).toBe(1 / 3);
|
||||
q.eose(q.id, c1);
|
||||
q.eose(qt1!.id, c1);
|
||||
expect(q.progress).toBe(1 / 3);
|
||||
q.eose(q.id, c2);
|
||||
q.eose(qt2!.id, c2);
|
||||
expect(q.progress).toBe(2 / 3);
|
||||
q.eose(q.id, c3);
|
||||
q.eose(qt3!.id, c3);
|
||||
expect(q.progress).toBe(1);
|
||||
|
||||
const qs = {
|
||||
id: "test-1",
|
||||
relay: "",
|
||||
strategy: RequestStrategy.DefaultRelays,
|
||||
filters: [
|
||||
{
|
||||
kinds: [1],
|
||||
authors: ["test-sub"],
|
||||
},
|
||||
],
|
||||
} as QueryBase;
|
||||
q.sendToRelay(c1, qs);
|
||||
};
|
||||
const qt = q.sendToRelay(c1, qs);
|
||||
|
||||
expect(q.progress).toBe(3 / 4);
|
||||
q.eose(qs.id, c1);
|
||||
q.eose(qt!.id, c1);
|
||||
expect(q.progress).toBe(1);
|
||||
q.sendToRelay(c2, qs);
|
||||
expect(q.progress).toBe(4 / 5);
|
||||
});
|
||||
|
||||
it("should merge all sub-query filters", () => {
|
||||
const q = new Query("test", new FlatNoteStore());
|
||||
const c0 = new Connection("wss://test.com", { read: true, write: true });
|
||||
q.sendToRelay(c0, {
|
||||
filters: [
|
||||
{
|
||||
authors: ["a"],
|
||||
kinds: [1],
|
||||
},
|
||||
],
|
||||
relay: "",
|
||||
strategy: RequestStrategy.DefaultRelays,
|
||||
});
|
||||
q.sendToRelay(c0, {
|
||||
filters: [
|
||||
{
|
||||
authors: ["b"],
|
||||
kinds: [1, 2],
|
||||
},
|
||||
],
|
||||
relay: "",
|
||||
strategy: RequestStrategy.DefaultRelays,
|
||||
});
|
||||
q.sendToRelay(c0, {
|
||||
filters: [
|
||||
{
|
||||
authors: ["c"],
|
||||
kinds: [2],
|
||||
},
|
||||
],
|
||||
relay: "",
|
||||
strategy: RequestStrategy.DefaultRelays,
|
||||
});
|
||||
|
||||
expect(q.filters).toEqual([
|
||||
{
|
||||
authors: ["a", "b", "c"],
|
||||
kinds: [1, 2],
|
||||
},
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
@ -1,9 +1,11 @@
|
||||
import { v4 as uuid } from "uuid";
|
||||
import debug from "debug";
|
||||
import { Connection, RawReqFilter, Nips } from "System";
|
||||
import { Connection, RawReqFilter, Nips, TaggedRawEvent } from "System";
|
||||
import { unixNowMs, unwrap } from "SnortUtils";
|
||||
import { NoteStore } from "./NoteCollection";
|
||||
import { mergeSimilar } from "./RequestMerger";
|
||||
import { simpleMerge } from "./RequestMerger";
|
||||
import { eventMatchesFilter } from "./RequestMatcher";
|
||||
import { BuiltRawReqFilter } from "./RequestBuilder";
|
||||
|
||||
/**
|
||||
* Tracing for relay query status
|
||||
@ -19,7 +21,6 @@ class QueryTrace {
|
||||
readonly #fnProgress: () => void;
|
||||
|
||||
constructor(
|
||||
readonly subId: string,
|
||||
readonly relay: string,
|
||||
readonly filters: Array<RawReqFilter>,
|
||||
readonly connId: string,
|
||||
@ -51,7 +52,7 @@ class QueryTrace {
|
||||
|
||||
sendClose() {
|
||||
this.close = unixNowMs();
|
||||
this.#fnClose(this.subId);
|
||||
this.#fnClose(this.id);
|
||||
this.#fnProgress();
|
||||
}
|
||||
|
||||
@ -135,7 +136,6 @@ export class Query implements QueryBase {
|
||||
*/
|
||||
#feed: NoteStore;
|
||||
|
||||
subQueryCounter = 0;
|
||||
#log = debug("Query");
|
||||
|
||||
constructor(id: string, feed: NoteStore) {
|
||||
@ -152,32 +152,37 @@ export class Query implements QueryBase {
|
||||
return this.#cancelTimeout;
|
||||
}
|
||||
|
||||
get filters() {
|
||||
const filters = this.#tracing.flatMap(a => a.filters);
|
||||
return [simpleMerge(filters)];
|
||||
}
|
||||
|
||||
get feed() {
|
||||
return this.#feed;
|
||||
}
|
||||
|
||||
get filters() {
|
||||
const filters = this.#tracing.flatMap(a => a.filters);
|
||||
return mergeSimilar(filters);
|
||||
onEvent(sub: string, e: TaggedRawEvent) {
|
||||
for (const t of this.#tracing) {
|
||||
if (t.id === sub) {
|
||||
this.feed.add(e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cancel() {
|
||||
this.#cancelTimeout = unixNowMs() + 5_000;
|
||||
}
|
||||
|
||||
unCancel() {
|
||||
this.#cancelTimeout = undefined;
|
||||
}
|
||||
|
||||
cleanup() {
|
||||
this.#stopCheckTraces();
|
||||
}
|
||||
|
||||
sendToRelay(c: Connection, subq?: QueryBase) {
|
||||
if (!this.#canSendQuery(c, subq ?? this)) {
|
||||
sendToRelay(c: Connection, subq: BuiltRawReqFilter) {
|
||||
if (!this.#canSendQuery(c, subq)) {
|
||||
return;
|
||||
}
|
||||
this.#sendQueryInternal(c, subq ?? this);
|
||||
return this.#sendQueryInternal(c, subq);
|
||||
}
|
||||
|
||||
connectionLost(id: string) {
|
||||
@ -192,7 +197,7 @@ export class Query implements QueryBase {
|
||||
}
|
||||
|
||||
eose(sub: string, conn: Readonly<Connection>) {
|
||||
const qt = this.#tracing.find(a => a.subId === sub && a.connId === conn.Id);
|
||||
const qt = this.#tracing.find(a => a.id === sub && a.connId === conn.Id);
|
||||
qt?.gotEose();
|
||||
if (!this.leaveOpen) {
|
||||
qt?.sendClose();
|
||||
@ -235,12 +240,12 @@ export class Query implements QueryBase {
|
||||
}, 500);
|
||||
}
|
||||
|
||||
#canSendQuery(c: Connection, q: QueryBase) {
|
||||
if (q.relays && !q.relays.includes(c.Address)) {
|
||||
#canSendQuery(c: Connection, q: BuiltRawReqFilter) {
|
||||
if (q.relay && q.relay !== c.Address) {
|
||||
return false;
|
||||
}
|
||||
if ((q.relays?.length ?? 0) === 0 && c.Ephemeral) {
|
||||
this.#log("Cant send non-specific REQ to ephemeral connection %O %O %O", q, q.relays, c);
|
||||
if (!q.relay && c.Ephemeral) {
|
||||
this.#log("Cant send non-specific REQ to ephemeral connection %O %O %O", q, q.relay, c);
|
||||
return false;
|
||||
}
|
||||
if (q.filters.some(a => a.search) && !c.SupportsNip(Nips.Search)) {
|
||||
@ -250,9 +255,8 @@ export class Query implements QueryBase {
|
||||
return true;
|
||||
}
|
||||
|
||||
#sendQueryInternal(c: Connection, q: QueryBase) {
|
||||
#sendQueryInternal(c: Connection, q: BuiltRawReqFilter) {
|
||||
const qt = new QueryTrace(
|
||||
q.id,
|
||||
c.Address,
|
||||
q.filters,
|
||||
c.Id,
|
||||
@ -260,6 +264,7 @@ export class Query implements QueryBase {
|
||||
() => this.#onProgress()
|
||||
);
|
||||
this.#tracing.push(qt);
|
||||
c.QueueReq(["REQ", q.id, ...q.filters], () => qt.sentToRelay());
|
||||
c.QueueReq(["REQ", qt.id, ...q.filters], () => qt.sentToRelay());
|
||||
return qt;
|
||||
}
|
||||
}
|
||||
|
@ -88,10 +88,7 @@ describe("RequestBuilder", () => {
|
||||
f0.authors(["a"]);
|
||||
expect(a).toEqual([{}]);
|
||||
|
||||
const b = rb.buildDiff(DummyCache, {
|
||||
filters: a,
|
||||
id: "test",
|
||||
});
|
||||
const b = rb.buildDiff(DummyCache, a);
|
||||
expect(b).toMatchObject([
|
||||
{
|
||||
filters: [{ authors: ["a"] }],
|
||||
|
@ -1,9 +1,7 @@
|
||||
import { RawReqFilter, u256, HexKey, EventKind } from "System";
|
||||
import { appendDedupe, dedupe } from "SnortUtils";
|
||||
import { QueryBase } from "./Query";
|
||||
import { diffFilters } from "./RequestSplitter";
|
||||
import { RelayCache, splitAllByWriteRelays, splitByWriteRelays } from "./GossipModel";
|
||||
import { mergeSimilar } from "./RequestMerger";
|
||||
|
||||
/**
|
||||
* Which strategy is used when building REQ filters
|
||||
@ -84,8 +82,8 @@ export class RequestBuilder {
|
||||
}
|
||||
|
||||
build(relays: RelayCache): Array<BuiltRawReqFilter> {
|
||||
const expanded = this.#builders.map(a => a.build(relays)).flat();
|
||||
return this.#mergeSimilar(expanded);
|
||||
const expanded = this.#builders.flatMap(a => a.build(relays, this.id));
|
||||
return this.#groupByRelay(expanded);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -93,11 +91,10 @@ export class RequestBuilder {
|
||||
* @param q All previous filters merged
|
||||
* @returns
|
||||
*/
|
||||
buildDiff(relays: RelayCache, q: QueryBase): Array<BuiltRawReqFilter> {
|
||||
buildDiff(relays: RelayCache, filters: Array<RawReqFilter>): Array<BuiltRawReqFilter> {
|
||||
const next = this.buildRaw();
|
||||
const diff = diffFilters(q.filters, next);
|
||||
const diff = diffFilters(filters, next);
|
||||
if (diff.changed) {
|
||||
console.debug("DIFF", q.filters, next, diff);
|
||||
return splitAllByWriteRelays(relays, diff.filters).map(a => {
|
||||
return {
|
||||
strategy: RequestStrategy.AuthorsRelays,
|
||||
@ -114,7 +111,7 @@ export class RequestBuilder {
|
||||
* @param expanded
|
||||
* @returns
|
||||
*/
|
||||
#mergeSimilar(expanded: Array<BuiltRawReqFilter>) {
|
||||
#groupByRelay(expanded: Array<BuiltRawReqFilter>) {
|
||||
const relayMerged = expanded.reduce((acc, v) => {
|
||||
const existing = acc.get(v.relay);
|
||||
if (existing) {
|
||||
@ -125,14 +122,12 @@ export class RequestBuilder {
|
||||
return acc;
|
||||
}, new Map<string, Array<BuiltRawReqFilter>>());
|
||||
|
||||
const filtersSquashed = [...relayMerged.values()].flatMap(a => {
|
||||
return mergeSimilar(a.flatMap(b => b.filters)).map(b => {
|
||||
return {
|
||||
filters: [b],
|
||||
relay: a[0].relay,
|
||||
strategy: a[0].strategy,
|
||||
} as BuiltRawReqFilter;
|
||||
});
|
||||
const filtersSquashed = [...relayMerged.values()].map(a => {
|
||||
return {
|
||||
filters: a.flatMap(b => b.filters),
|
||||
relay: a[0].relay,
|
||||
strategy: a[0].strategy,
|
||||
} as BuiltRawReqFilter;
|
||||
});
|
||||
|
||||
return filtersSquashed;
|
||||
@ -211,7 +206,7 @@ export class RequestFilterBuilder {
|
||||
/**
|
||||
* Build/expand this filter into a set of relay specific queries
|
||||
*/
|
||||
build(relays: RelayCache): Array<BuiltRawReqFilter> {
|
||||
build(relays: RelayCache, id: string): Array<BuiltRawReqFilter> {
|
||||
// when querying for specific event ids with relay hints
|
||||
// take the first approach which is to split the filter by relay
|
||||
if (this.#filter.ids && this.#relayHints.size > 0) {
|
||||
|
packages/app/src/System/RequestMatcher.test.ts (new file, 23 lines)
@@ -0,0 +1,23 @@
+import { eventMatchesFilter } from "./RequestMatcher";
+
+describe("RequestMatcher", () => {
+  it("should match simple filter", () => {
+    const ev = {
+      id: "test",
+      kind: 1,
+      pubkey: "pubkey",
+      created_at: 99,
+      tags: [],
+      content: "test",
+      sig: "",
+    };
+    const filter = {
+      ids: ["test"],
+      authors: ["pubkey", "other"],
+      kinds: [1, 2, 3],
+      since: 1,
+      before: 100,
+    };
+    expect(eventMatchesFilter(ev, filter)).toBe(true);
+  });
+});
|
packages/app/src/System/RequestMatcher.ts (new file, 20 lines)
@@ -0,0 +1,20 @@
+import { RawEvent, RawReqFilter } from "./Nostr";
+
+export function eventMatchesFilter(ev: RawEvent, filter: RawReqFilter) {
+  if (!(filter.ids?.includes(ev.id) ?? false)) {
+    return false;
+  }
+  if (!(filter.authors?.includes(ev.pubkey) ?? false)) {
+    return false;
+  }
+  if (!(filter.kinds?.includes(ev.kind) ?? false)) {
+    return false;
+  }
+  if (filter.since && ev.created_at < filter.since) {
+    return false;
+  }
+  if (filter.until && ev.created_at > filter.until) {
+    return false;
+  }
+  return true;
+}
@ -1,5 +1,5 @@
|
||||
import { RawReqFilter } from "System";
|
||||
import { filterIncludes, mergeSimilar } from "./RequestMerger";
|
||||
import { filterIncludes, mergeSimilar, simpleMerge } from "./RequestMerger";
|
||||
|
||||
describe("RequestMerger", () => {
|
||||
it("should simple merge authors", () => {
|
||||
@ -53,4 +53,19 @@ describe("RequestMerger", () => {
|
||||
} as RawReqFilter;
|
||||
expect(filterIncludes(bigger, smaller)).toBe(true);
|
||||
});
|
||||
|
||||
it("simpleMerge", () => {
|
||||
const a = {
|
||||
authors: ["a", "b", "c"],
|
||||
since: 99,
|
||||
} as RawReqFilter;
|
||||
const b = {
|
||||
authors: ["c", "d", "e"],
|
||||
since: 100,
|
||||
} as RawReqFilter;
|
||||
expect(simpleMerge([a, b])).toEqual({
|
||||
authors: ["a", "b", "c", "d", "e"],
|
||||
since: 100,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -9,7 +9,12 @@ export function mergeSimilar(filters: Array<RawReqFilter>): Array<RawReqFilter>
|
||||
return [...(canEasilyMerge.length > 0 ? [simpleMerge(canEasilyMerge)] : []), ...cannotMerge];
|
||||
}
|
||||
|
||||
function simpleMerge(filters: Array<RawReqFilter>) {
|
||||
/**
|
||||
* Simply flatten all filters into one
|
||||
* @param filters
|
||||
* @returns
|
||||
*/
|
||||
export function simpleMerge(filters: Array<RawReqFilter>) {
|
||||
const result: any = {};
|
||||
|
||||
filters.forEach(filter => {
|
||||
@ -21,7 +26,7 @@ function simpleMerge(filters: Array<RawReqFilter>) {
|
||||
result[key] = [...new Set([...result[key], ...value])];
|
||||
}
|
||||
} else {
|
||||
throw new Error("Cannot simple merge with non-array filter properties");
|
||||
result[key] = value;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
@ -1,6 +1,6 @@
|
||||
import { RawReqFilter } from "System";
|
||||
import { describe, expect } from "@jest/globals";
|
||||
import { diffFilters } from "./RequestSplitter";
|
||||
import { diffFilters, expandFilter } from "./RequestSplitter";
|
||||
|
||||
describe("RequestSplitter", () => {
|
||||
test("single filter add value", () => {
|
||||
@ -72,4 +72,33 @@ describe("RequestSplitter", () => {
|
||||
changed: true,
|
||||
});
|
||||
});
|
||||
test("expand filter", () => {
|
||||
const a = {
|
||||
authors: ["a", "b", "c"],
|
||||
kinds: [1, 2, 3],
|
||||
ids: ["x", "y"],
|
||||
since: 99,
|
||||
limit: 10,
|
||||
};
|
||||
expect(expandFilter(a)).toEqual([
|
||||
{ authors: ["a"], kinds: [1], ids: ["x"], since: 99, limit: 10 },
|
||||
{ authors: ["a"], kinds: [1], ids: ["y"], since: 99, limit: 10 },
|
||||
{ authors: ["a"], kinds: [2], ids: ["x"], since: 99, limit: 10 },
|
||||
{ authors: ["a"], kinds: [2], ids: ["y"], since: 99, limit: 10 },
|
||||
{ authors: ["a"], kinds: [3], ids: ["x"], since: 99, limit: 10 },
|
||||
{ authors: ["a"], kinds: [3], ids: ["y"], since: 99, limit: 10 },
|
||||
{ authors: ["b"], kinds: [1], ids: ["x"], since: 99, limit: 10 },
|
||||
{ authors: ["b"], kinds: [1], ids: ["y"], since: 99, limit: 10 },
|
||||
{ authors: ["b"], kinds: [2], ids: ["x"], since: 99, limit: 10 },
|
||||
{ authors: ["b"], kinds: [2], ids: ["y"], since: 99, limit: 10 },
|
||||
{ authors: ["b"], kinds: [3], ids: ["x"], since: 99, limit: 10 },
|
||||
{ authors: ["b"], kinds: [3], ids: ["y"], since: 99, limit: 10 },
|
||||
{ authors: ["c"], kinds: [1], ids: ["x"], since: 99, limit: 10 },
|
||||
{ authors: ["c"], kinds: [1], ids: ["y"], since: 99, limit: 10 },
|
||||
{ authors: ["c"], kinds: [2], ids: ["x"], since: 99, limit: 10 },
|
||||
{ authors: ["c"], kinds: [2], ids: ["y"], since: 99, limit: 10 },
|
||||
{ authors: ["c"], kinds: [3], ids: ["x"], since: 99, limit: 10 },
|
||||
{ authors: ["c"], kinds: [3], ids: ["y"], since: 99, limit: 10 },
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
@ -42,3 +42,35 @@ export function diffFilters(a: Array<RawReqFilter>, b: Array<RawReqFilter>) {
|
||||
changed: anyChanged,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Expand a filter into its most fine grained form
|
||||
*/
|
||||
export function expandFilter(f: RawReqFilter): Array<RawReqFilter> {
|
||||
const ret: Array<RawReqFilter> = [];
|
||||
const src = Object.entries(f);
|
||||
const keys = src.filter(([, v]) => Array.isArray(v)).map(a => a[0]);
|
||||
const props = src.filter(([, v]) => !Array.isArray(v));
|
||||
|
||||
function generateCombinations(index: number, currentCombination: RawReqFilter) {
|
||||
if (index === keys.length) {
|
||||
ret.push(currentCombination);
|
||||
return;
|
||||
}
|
||||
|
||||
const key = keys[index];
|
||||
const values = (f as Record<string, Array<string | number>>)[key];
|
||||
|
||||
for (let i = 0; i < values.length; i++) {
|
||||
const value = values[i];
|
||||
const updatedCombination = { ...currentCombination, [key]: [value] };
|
||||
generateCombinations(index + 1, updatedCombination);
|
||||
}
|
||||
}
|
||||
|
||||
generateCombinations(0, {
|
||||
...Object.fromEntries(props),
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
packages/app/src/System/SystemWorker.ts (new file, 69 lines)
@@ -0,0 +1,69 @@
|
||||
import ExternalStore from "ExternalStore";
|
||||
import {
|
||||
NoteStore,
|
||||
Query,
|
||||
RawEvent,
|
||||
RelaySettings,
|
||||
RequestBuilder,
|
||||
SystemSnapshot,
|
||||
SystemInterface,
|
||||
ConnectionStateSnapshot,
|
||||
AuthHandler,
|
||||
} from "System";
|
||||
|
||||
export class SystemWorker extends ExternalStore<SystemSnapshot> implements SystemInterface {
|
||||
#port: MessagePort;
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
if ("SharedWorker" in window) {
|
||||
const worker = new SharedWorker("/system.js");
|
||||
this.#port = worker.port;
|
||||
this.#port.onmessage = m => this.#onMessage(m);
|
||||
} else {
|
||||
throw new Error("SharedWorker is not supported");
|
||||
}
|
||||
}
|
||||
|
||||
HandleAuth?: AuthHandler;
|
||||
|
||||
get Sockets(): ConnectionStateSnapshot[] {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
Query<T extends NoteStore>(type: new () => T, req: RequestBuilder | null): Query | undefined {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
CancelQuery(sub: string): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
GetQuery(sub: string): Query | undefined {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
ConnectToRelay(address: string, options: RelaySettings): Promise<void> {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
DisconnectRelay(address: string): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
BroadcastEvent(ev: RawEvent): void {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
WriteOnceToRelay(relay: string, ev: RawEvent): Promise<void> {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
takeSnapshot(): SystemSnapshot {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
#onMessage(e: MessageEvent<any>) {
|
||||
console.debug(e);
|
||||
}
|
||||
}
|
@@ -32,3 +32,11 @@ export function sanitizeRelayUrl(url: string) {
     // ignore
   }
 }
+
+export function unixNow() {
+  return Math.floor(unixNowMs() / 1000);
+}
+
+export function unixNowMs() {
+  return new Date().getTime();
+}
@ -1,8 +1,4 @@
|
||||
import debug from "debug";
|
||||
|
||||
import { sanitizeRelayUrl, unixNowMs, unwrap } from "SnortUtils";
|
||||
import { RawEvent, RawReqFilter, TaggedRawEvent } from "./Nostr";
|
||||
import { AuthHandler, Connection, RelaySettings, StateSnapshot } from "./Connection";
|
||||
import { AuthHandler, Connection, RelaySettings, ConnectionStateSnapshot } from "./Connection";
|
||||
import { RequestBuilder } from "./RequestBuilder";
|
||||
import { EventBuilder } from "./EventBuilder";
|
||||
import {
|
||||
@ -12,10 +8,10 @@ import {
|
||||
ParameterizedReplaceableNoteStore,
|
||||
ReplaceableNoteStore,
|
||||
} from "./NoteCollection";
|
||||
import { Query, QueryBase } from "./Query";
|
||||
import ExternalStore from "ExternalStore";
|
||||
import { RelayCache } from "./GossipModel";
|
||||
import { Query } from "./Query";
|
||||
import { RawEvent, RawReqFilter } from "./Nostr";
|
||||
|
||||
export * from "./NostrSystem";
|
||||
export { default as EventKind } from "./EventKind";
|
||||
export * from "./Nostr";
|
||||
export * from "./Links";
|
||||
@ -23,6 +19,29 @@ export { default as Tag } from "./Tag";
|
||||
export * from "./Nips";
|
||||
export * from "./RelayInfo";
|
||||
|
||||
export interface SystemInterface {
|
||||
/**
|
||||
* Handler function for NIP-42
|
||||
*/
|
||||
HandleAuth?: AuthHandler;
|
||||
get Sockets(): Array<ConnectionStateSnapshot>;
|
||||
GetQuery(id: string): Query | undefined;
|
||||
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder | null): Query | undefined;
|
||||
ConnectToRelay(address: string, options: RelaySettings): Promise<void>;
|
||||
DisconnectRelay(address: string): void;
|
||||
BroadcastEvent(ev: RawEvent): void;
|
||||
WriteOnceToRelay(relay: string, ev: RawEvent): Promise<void>;
|
||||
}
|
||||
|
||||
export interface SystemSnapshot {
|
||||
queries: Array<{
|
||||
id: string;
|
||||
filters: Array<RawReqFilter>;
|
||||
subFilters: Array<RawReqFilter>;
|
||||
closing: boolean;
|
||||
}>;
|
||||
}
|
||||
|
||||
export {
|
||||
NoteStore,
|
||||
RequestBuilder,
|
||||
@ -35,298 +54,5 @@ export {
|
||||
AuthHandler,
|
||||
Connection,
|
||||
RelaySettings,
|
||||
StateSnapshot,
|
||||
ConnectionStateSnapshot,
|
||||
};
|
||||
|
||||
export interface SystemSnapshot {
|
||||
queries: Array<{
|
||||
id: string;
|
||||
filters: Array<RawReqFilter>;
|
||||
subFilters: Array<RawReqFilter>;
|
||||
closing: boolean;
|
||||
}>;
|
||||
}
|
||||
|
||||
export type HookSystemSnapshotRelease = () => void;
|
||||
export type HookSystemSnapshot = () => void;
|
||||
|
||||
/**
|
||||
* Manages nostr content retrieval system
|
||||
*/
|
||||
export class NostrSystem extends ExternalStore<SystemSnapshot> {
|
||||
/**
|
||||
* All currently connected websockets
|
||||
*/
|
||||
Sockets: Map<string, Connection>;
|
||||
|
||||
/**
|
||||
* All active queries
|
||||
*/
|
||||
Queries: Map<string, Query> = new Map();
|
||||
|
||||
/**
|
||||
* Handler function for NIP-42
|
||||
*/
|
||||
HandleAuth?: AuthHandler;
|
||||
|
||||
#log = debug("System");
|
||||
#relayCache: RelayCache;
|
||||
|
||||
constructor(relayCache: RelayCache) {
|
||||
super();
|
||||
this.Sockets = new Map();
|
||||
this.#relayCache = relayCache;
|
||||
this.#cleanup();
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to a NOSTR relay if not already connected
|
||||
*/
|
||||
async ConnectToRelay(address: string, options: RelaySettings) {
|
||||
try {
|
||||
const addr = unwrap(sanitizeRelayUrl(address));
|
||||
if (!this.Sockets.has(addr)) {
|
||||
const c = new Connection(addr, options, this.HandleAuth?.bind(this));
|
||||
this.Sockets.set(addr, c);
|
||||
c.OnEvent = (s, e) => this.OnEvent(s, e);
|
||||
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
|
||||
c.OnDisconnect = id => this.OnRelayDisconnect(id);
|
||||
c.OnConnected = () => {
|
||||
for (const [, q] of this.Queries) {
|
||||
q.sendToRelay(c, q);
|
||||
}
|
||||
};
|
||||
await c.Connect();
|
||||
} else {
|
||||
// update settings if already connected
|
||||
unwrap(this.Sockets.get(addr)).Settings = options;
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
OnRelayDisconnect(id: string) {
|
||||
for (const [, q] of this.Queries) {
|
||||
q.connectionLost(id);
|
||||
}
|
||||
}
|
||||
|
||||
OnEndOfStoredEvents(c: Readonly<Connection>, sub: string) {
|
||||
const q = this.GetQuery(sub);
|
||||
if (q) {
|
||||
q.eose(sub, c);
|
||||
}
|
||||
}
|
||||
|
||||
OnEvent(sub: string, ev: TaggedRawEvent) {
|
||||
const q = this.GetQuery(sub);
|
||||
if (q?.feed) {
|
||||
q.feed.add(ev);
|
||||
}
|
||||
}
|
||||
|
||||
GetQuery(sub: string) {
|
||||
const subFilterId = /-\d+$/i;
|
||||
if (sub.match(subFilterId)) {
|
||||
// feed events back into parent query
|
||||
sub = sub.split(subFilterId)[0];
|
||||
}
|
||||
return this.Queries.get(sub);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param address Relay address URL
|
||||
*/
|
||||
async ConnectEphemeralRelay(address: string): Promise<Connection | undefined> {
|
||||
try {
|
||||
const addr = unwrap(sanitizeRelayUrl(address));
|
||||
if (!this.Sockets.has(addr)) {
|
||||
const c = new Connection(addr, { read: true, write: false }, this.HandleAuth?.bind(this), true);
|
||||
this.Sockets.set(addr, c);
|
||||
c.OnEvent = (s, e) => this.OnEvent(s, e);
|
||||
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
|
||||
c.OnDisconnect = id => this.OnRelayDisconnect(id);
|
||||
c.OnConnected = () => {
|
||||
for (const [, q] of this.Queries) {
|
||||
if (q.progress !== 1) {
|
||||
q.sendToRelay(c, q);
|
||||
}
|
||||
}
|
||||
};
|
||||
await c.Connect();
|
||||
return c;
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from a relay
|
||||
*/
|
||||
DisconnectRelay(address: string) {
|
||||
const c = this.Sockets.get(address);
|
||||
if (c) {
|
||||
this.Sockets.delete(address);
|
||||
c.Close();
|
||||
}
|
||||
}
|
||||
|
||||
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder | null): Readonly<T> {
|
||||
/**
|
||||
* ## Notes
|
||||
*
|
||||
* Given a set of existing filters:
|
||||
* ["REQ", "1", { kinds: [0, 7], authors: [...], since: now()-1hr, until: now() }]
|
||||
* ["REQ", "2", { kinds: [0, 7], authors: [...], since: now(), limit: 0 }]
|
||||
*
|
||||
* ## Problem 1:
|
||||
* Assume we now want to update sub "1" with a new set of authors,
|
||||
* what should we do, should we close sub "1" and send the new set or create another
|
||||
* subscription with the new pubkeys (diff)
|
||||
*
|
||||
* Creating a new subscription sounds great but also is a problem when relays limit
|
||||
* active subscriptions, maybe we should instead queue the new
|
||||
* subscription (assuming that we expect to close at EOSE)
|
||||
*
|
||||
* ## Problem 2:
|
||||
* When multiple filters a specifid in a single filter but only 1 filter changes,
|
||||
* ~~same as above~~
|
||||
*
|
||||
* Seems reasonable to do "Queue Diff", should also be possible to collapse multiple
|
||||
* pending filters for the same subscription
|
||||
*/
|
||||
|
||||
if (!req) return new type();
|
||||
|
||||
const existing = this.Queries.get(req.id);
|
||||
if (existing) {
|
||||
const filters = req.buildDiff(this.#relayCache, existing);
|
||||
existing.unCancel();
|
||||
|
||||
if (filters.length === 0 && !req.options?.skipDiff) {
|
||||
this.notifyChange();
|
||||
return existing.feed as Readonly<T>;
|
||||
} else {
|
||||
for (const subQ of filters) {
|
||||
this.SendQuery(existing, {
|
||||
id: `${existing.id}-${existing.subQueryCounter++}`,
|
||||
filters: subQ.filters,
|
||||
relays: subQ.relay ? [subQ.relay] : undefined,
|
||||
});
|
||||
}
|
||||
this.notifyChange();
|
||||
return existing.feed as Readonly<T>;
|
||||
}
|
||||
} else {
|
||||
const store = new type();
|
||||
|
||||
const filters = req.build(this.#relayCache);
|
||||
const q = new Query(req.id, store);
|
||||
if (req.options?.leaveOpen) {
|
||||
q.leaveOpen = req.options.leaveOpen;
|
||||
}
|
||||
|
||||
this.Queries.set(req.id, q);
|
||||
for (const subQ of filters) {
|
||||
this.SendQuery(q, {
|
||||
id: `${q.id}-${q.subQueryCounter++}`,
|
||||
filters: subQ.filters,
|
||||
relays: subQ.relay ? [subQ.relay] : undefined,
|
||||
});
|
||||
}
|
||||
this.notifyChange();
|
||||
return q.feed as Readonly<T>;
|
||||
}
|
||||
}
|
||||
|
||||
CancelQuery(sub: string) {
|
||||
const q = this.Queries.get(sub);
|
||||
if (q) {
|
||||
q.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
async SendQuery(q: Query, qSend: QueryBase) {
|
||||
if (qSend.relays && qSend.relays.length > 0) {
|
||||
for (const r of qSend.relays) {
|
||||
this.#log("Sending query to %s %O", r, qSend);
|
||||
const s = this.Sockets.get(r);
|
||||
if (s) {
|
||||
q.sendToRelay(s, qSend);
|
||||
} else {
|
||||
const nc = await this.ConnectEphemeralRelay(r);
|
||||
if (nc) {
|
||||
q.sendToRelay(nc, qSend);
|
||||
} else {
|
||||
console.warn("Failed to connect to new relay for:", r, q);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (const [, s] of this.Sockets) {
|
||||
if (!s.Ephemeral) {
|
||||
q.sendToRelay(s, qSend);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send events to writable relays
|
||||
*/
|
||||
BroadcastEvent(ev: RawEvent) {
|
||||
for (const [, s] of this.Sockets) {
|
||||
s.SendEvent(ev);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write an event to a relay then disconnect
|
||||
*/
|
||||
async WriteOnceToRelay(address: string, ev: RawEvent) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const c = new Connection(address, { write: true, read: false }, this.HandleAuth, true);
|
||||
|
||||
const t = setTimeout(reject, 5_000);
|
||||
c.OnConnected = async () => {
|
||||
clearTimeout(t);
|
||||
await c.SendAsync(ev);
|
||||
c.Close();
|
||||
resolve();
|
||||
};
|
||||
c.Connect();
|
||||
});
|
||||
}
|
||||
|
||||
takeSnapshot(): SystemSnapshot {
|
||||
return {
|
||||
queries: [...this.Queries.values()].map(a => {
|
||||
return {
|
||||
id: a.id,
|
||||
filters: a.filters,
|
||||
closing: a.closing,
|
||||
subFilters: [],
|
||||
};
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
#cleanup() {
|
||||
const now = unixNowMs();
|
||||
let changed = false;
|
||||
for (const [k, v] of this.Queries) {
|
||||
if (v.closingAt && v.closingAt < now) {
|
||||
v.sendClose();
|
||||
this.Queries.delete(k);
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
if (changed) {
|
||||
this.notifyChange();
|
||||
}
|
||||
setTimeout(() => this.#cleanup(), 1_000);
|
||||
}
|
||||
}
|
||||
|
packages/app/src/System/worker.ts (new file, 21 lines)
@@ -0,0 +1,21 @@
+/// <reference lib="webworker" />
+import { UsersRelaysCache } from "Cache/UserRelayCache";
+import { NostrSystem } from ".";
+declare const self: SharedWorkerGlobalScope;
+
+const RelayCache = new UsersRelaysCache();
+const System = new NostrSystem({
+  get: pk => RelayCache.getFromCache(pk)?.relays,
+});
+
+self.onconnect = e => {
+  const port = e.ports[0];
+
+  port.addEventListener("message", async e1 => {
+    console.debug(e1);
+    const [cmd, ...others] = e1.data;
+    switch (cmd) {
+    }
+  });
+  port.start();
+};
@@ -35,9 +35,9 @@ import DebugPage from "Pages/Debug";
 import { db } from "Db";
 import { preload } from "Cache";
 import { LoginStore } from "Login";
-import { UserRelays } from "Cache/UserRelayCache";
-import { NostrSystem } from "System";
 import { ProfileLoaderService } from "System/ProfileCache";
+import { NostrSystem } from "System";
+import { UserRelays } from "Cache/UserRelayCache";
 
 /**
  * Singleton nostr system