fix: various

This commit is contained in:
kieran 2024-04-23 15:43:07 +01:00
parent 9ddd8fc6c2
commit a3299ab29a
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
8 changed files with 59 additions and 104 deletions

View File

@ -6,6 +6,7 @@ export abstract class BackgroundLoader<T extends { loaded: number; created: numb
#system: SystemInterface;
readonly cache: CachedTable<T>;
#log = debug(this.name());
#blacklist = new Set();
/**
* List of pubkeys to fetch metadata for
@ -43,11 +44,6 @@ export abstract class BackgroundLoader<T extends { loaded: number; created: numb
*/
protected abstract buildSub(missing: Array<string>): RequestBuilder;
/**
* Create a placeholder value when no data can be found
*/
protected abstract makePlaceholder(key: string): T | undefined;
/**
* Start requesting a set of keys to be loaded
*/
@ -92,7 +88,7 @@ export abstract class BackgroundLoader<T extends { loaded: number; created: numb
}
async #FetchMetadata() {
const loading = [...this.#wantsKeys];
const loading = [...this.#wantsKeys].filter(a => !this.#blacklist.has(a));
await this.cache.buffer(loading);
const missing = loading.filter(a => (this.cache.getFromCache(a)?.loaded ?? 0) < this.getExpireCutoff());
@ -100,12 +96,9 @@ export abstract class BackgroundLoader<T extends { loaded: number; created: numb
this.#log("Fetching keys: %O", missing);
try {
const found = await this.#loadData(missing);
const noResult = removeUndefined(
missing.filter(a => !found.some(b => a === this.cache.key(b))).map(a => this.makePlaceholder(a)),
);
const noResult = removeUndefined(missing.filter(a => !found.some(b => a === this.cache.key(b))));
if (noResult.length > 0) {
this.#log("Adding placeholders for %O", noResult);
await Promise.all(noResult.map(a => this.cache.update(a)));
noResult.forEach(a => this.#blacklist.add(a));
}
} catch (e) {
this.#log("Error: %O", e);
@ -122,19 +115,11 @@ export abstract class BackgroundLoader<T extends { loaded: number; created: numb
await Promise.all(results.map(a => this.cache.update(a)));
return results;
} else {
const hookHandled = new Set<string>();
const v = await this.#system.Fetch(this.buildSub(missing), async e => {
this.#log("Callback handled %o", e);
for (const pe of e) {
const m = this.onEvent(pe);
if (m) {
await this.cache.update(m);
hookHandled.add(pe.id);
}
}
});
const v = await this.#system.Fetch(this.buildSub(missing));
this.#log("Got data", v);
return removeUndefined(v.map(this.onEvent));
const results = removeUndefined(v.map(this.onEvent));
await Promise.all(results.map(a => this.cache.update(a)));
return results;
}
}
}

View File

@ -121,9 +121,8 @@ export class DefaultConnectionPool<T extends ConnectionType = Connection>
if (builder) {
this.#connectionBuilder = builder;
} else {
this.#connectionBuilder = async (addr, options, ephemeral) => {
const c = new Connection(addr, options, ephemeral);
return c as unknown as T;
this.#connectionBuilder = (addr, options, ephemeral) => {
return Promise.resolve<T>(new Connection(addr, options, ephemeral) as unknown as T);
};
}
}
@ -167,6 +166,10 @@ export class DefaultConnectionPool<T extends ConnectionType = Connection>
if (existing.ephemeral && !ephemeral) {
existing.ephemeral = ephemeral;
}
// re-open if closed
if (existing.ephemeral && !existing.isOpen) {
await existing.connect();
}
return existing;
}
} catch (e) {

View File

@ -120,6 +120,9 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
const wasReconnect = this.Socket !== null;
if (this.Socket) {
this.id = uuid();
if (this.isOpen) {
this.Socket.close();
}
this.Socket.onopen = null;
this.Socket.onmessage = null;
this.Socket.onerror = null;
@ -133,10 +136,8 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
this.Socket.onclose = e => this.#onClose(e);
}
close(final = true) {
if (final) {
this.#closing = true;
}
close() {
this.#closing = true;
this.Socket?.close();
}
@ -326,14 +327,10 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
* @param cmd The REQ to send to the server
*/
request(cmd: ReqCommand | SyncCommand, cbSent?: () => void) {
const requestKinds = dedupe(
cmd
.slice(2)
.map(a => (a as ReqFilter).kinds ?? [])
.flat(),
);
const filters = (cmd[0] === "REQ" ? cmd.slice(2) : cmd.slice(3)) as Array<ReqFilter>;
const requestKinds = new Set(filters.flatMap(a => a.kinds ?? []));
const ExpectAuth = [EventKind.DirectMessage, EventKind.GiftWrap];
if (ExpectAuth.some(a => requestKinds.includes(a)) && !this.#expectAuth) {
if (ExpectAuth.some(a => requestKinds.has(a)) && !this.#expectAuth) {
this.#expectAuth = true;
this.#log("Setting expectAuth flag %o", requestKinds);
}
@ -518,7 +515,7 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
}
get #maxSubscriptions() {
return this.info?.limitation?.max_subscriptions ?? 25;
return this.info?.limitation?.max_subscriptions ?? 20;
}
#setupEphemeral() {
@ -529,7 +526,7 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
if (this.ephemeral) {
this.#ephemeralCheck = setInterval(() => {
const lastActivity = unixNowMs() - this.#activity;
if (lastActivity > 30_000 && !this.#closing) {
if (lastActivity > 10_000 && !this.#closing) {
if (this.#activeRequests.size > 0) {
this.#log(
"Inactive connection has %d active requests! %O",
@ -537,7 +534,7 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
this.#activeRequests,
);
} else {
this.close(false);
this.close();
}
}
}, 5_000);

View File

@ -21,13 +21,16 @@ export type EventFetcher = {
export function parseRelayTag(tag: Array<string>) {
if (tag[0] !== "r") return;
return {
url: sanitizeRelayUrl(tag[1]),
settings: {
read: tag[2] === "read" || tag[2] === undefined,
write: tag[2] === "write" || tag[2] === undefined,
},
} as FullRelaySettings;
const url = sanitizeRelayUrl(tag[1]);
if (url) {
return {
url,
settings: {
read: tag[2] === "read" || tag[2] === undefined,
write: tag[2] === "write" || tag[2] === undefined,
},
} as FullRelaySettings;
}
}
export function parseRelayTags(tag: Array<Array<string>>) {
@ -39,15 +42,19 @@ export function parseRelaysFromKind(ev: NostrEvent) {
const relaysInContent =
ev.content.length > 0 ? (JSON.parse(ev.content) as Record<string, { read: boolean; write: boolean }>) : undefined;
if (relaysInContent) {
return Object.entries(relaysInContent).map(
([k, v]) =>
({
url: sanitizeRelayUrl(k),
settings: {
read: v.read,
write: v.write,
},
}) as FullRelaySettings,
return removeUndefined(
Object.entries(relaysInContent).map(([k, v]) => {
const url = sanitizeRelayUrl(k);
if (url) {
return {
url,
settings: {
read: v.read,
write: v.write,
},
} as FullRelaySettings;
}
}),
);
}
} else if (ev.kind === EventKind.Relays) {

View File

@ -34,13 +34,4 @@ export class RelayMetadataLoader extends BackgroundLoader<UsersRelays> {
rb.withFilter().authors(missing).kinds([EventKind.Relays, EventKind.ContactList]);
return rb;
}
protected override makePlaceholder(key: string): UsersRelays | undefined {
return {
relays: [],
pubkey: key,
created: 0,
loaded: this.getExpireCutoff() + 300000,
};
}
}

View File

@ -22,12 +22,4 @@ export class ProfileLoaderService extends BackgroundLoader<CachedMetadata> {
sub.withFilter().kinds([EventKind.SetMetadata]).authors(missing).relay(MetadataRelays);
return sub;
}
protected override makePlaceholder(key: string): CachedMetadata | undefined {
return {
pubkey: key,
loaded: unixNowMs() - ProfileCacheExpire + 30_000,
created: 0,
} as CachedMetadata;
}
}

View File

@ -107,7 +107,7 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
if (data.length > 0) {
qSend.syncFrom = data as Array<TaggedNostrEvent>;
this.#log("Adding from cache: %O", data);
q.feed.add(data as Array<TaggedNostrEvent>);
q.feed.add(data.map(a => ({ ...a, relays: [] })));
}
}
@ -128,26 +128,16 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
if (qSend.relay) {
this.#log("Sending query to %s %s %O", qSend.relay, q.id, qSend);
const s = this.#system.pool.getConnection(qSend.relay);
if (s) {
const qt = q.sendToRelay(s, qSend);
const nc = await this.#system.pool.connect(qSend.relay, { read: true, write: true }, true);
if (nc) {
const qt = q.sendToRelay(nc, qSend);
if (qt) {
return [qt];
} else {
this.#log("Query not sent to %s: %O", qSend.relay, qSend);
}
} else {
const nc = await this.#system.pool.connect(qSend.relay, { read: true, write: true }, true);
if (nc) {
const qt = q.sendToRelay(nc, qSend);
if (qt) {
return [qt];
} else {
this.#log("Query not sent to %s: %O", qSend.relay, qSend);
}
} else {
console.warn("Failed to connect to new relay for:", qSend.relay, q);
}
console.warn("Failed to connect to new relay for:", qSend.relay, q);
}
} else {
const ret = [];

View File

@ -11,7 +11,6 @@ import { LRUCache } from "lru-cache";
import { ConnectionType } from "./connection-pool";
interface QueryTraceEvents {
change: () => void;
close: (id: string) => void;
eose: (id: string, connId: string, wasForced: boolean) => void;
}
@ -39,13 +38,11 @@ export class QueryTrace extends EventEmitter<QueryTraceEvents> {
sentToRelay() {
this.sent = unixNowMs();
this.emit("change");
}
gotEose() {
this.eose = unixNowMs();
this.emit("eose", this.id, this.connId, false);
this.emit("change");
}
forceEose() {
@ -59,7 +56,6 @@ export class QueryTrace extends EventEmitter<QueryTraceEvents> {
sendClose() {
this.close = unixNowMs();
this.emit("close", this.id);
this.emit("change");
}
/**
@ -354,14 +350,6 @@ export class Query extends EventEmitter<QueryEvents> {
}
}
#onProgress() {
const isFinished = this.progress === 1;
if (isFinished) {
this.#log("%s loading=%s, progress=%d, traces=%O", this.id, !isFinished, this.progress, this.#tracing);
this.emit("done");
}
}
#stopCheckTraces() {
if (this.#checkTrace) {
clearInterval(this.#checkTrace);
@ -411,16 +399,18 @@ export class Query extends EventEmitter<QueryEvents> {
let filters = q.filters;
const qt = new QueryTrace(c.address, filters, c.id);
qt.on("close", x => c.closeRequest(x));
qt.on("change", () => this.#onProgress());
qt.on("eose", (id, connId, forced) =>
qt.on("eose", (id, connId, forced) => {
this.emit("trace", {
id,
conn: c,
wasForced: forced,
queued: qt.queued,
responseTime: qt.responseTime,
} as TraceReport),
);
} as TraceReport);
if (this.progress === 1) {
this.emit("done");
}
});
const eventHandler = (sub: string, ev: TaggedNostrEvent) => {
if (this.request.options?.fillStore ?? true) {
this.handleEvent(sub, ev);