feat: write reply events to recipients relays
Some checks are pending
continuous-integration/drone/push Build is running

This commit is contained in:
Kieran 2023-11-22 11:40:28 +00:00
parent 56db2f652d
commit 92c26ca609
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
8 changed files with 91 additions and 34 deletions

View File

@ -1,7 +1,6 @@
import { useMemo } from "react";
import { HexKey, EventKind, RequestBuilder, ReplaceableNoteStore } from "@snort/system";
import { HexKey, EventKind, RequestBuilder, ReplaceableNoteStore, parseRelayTags } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import { parseRelayTag } from "./RelaysFeedFollows";
export default function useRelaysFeed(pubkey?: HexKey) {
const sub = useMemo(() => {
@ -12,5 +11,5 @@ export default function useRelaysFeed(pubkey?: HexKey) {
}, [pubkey]);
const relays = useRequestBuilder(ReplaceableNoteStore, sub);
return relays.data?.tags.filter(a => a[0] === "r").map(parseRelayTag) ?? [];
return parseRelayTags(relays.data?.tags.filter(a => a[0] === "r") ?? []);
}

View File

@ -1,9 +1,8 @@
import { useMemo } from "react";
import { HexKey, FullRelaySettings, TaggedNostrEvent, EventKind, NoteCollection, RequestBuilder } from "@snort/system";
import { HexKey, FullRelaySettings, TaggedNostrEvent, EventKind, NoteCollection, RequestBuilder, parseRelayTags } from "@snort/system";
import { useRequestBuilder } from "@snort/system-react";
import debug from "debug";
import { sanitizeRelayUrl } from "@/SnortUtils";
import { UserRelays } from "@/Cache";
interface RelayList {
@ -26,7 +25,7 @@ export default function useRelaysFeedFollows(pubkeys: HexKey[]): Array<RelayList
return {
pubkey: ev.pubkey,
created_at: ev.created_at,
relays: ev.tags.map(parseRelayTag).filter(a => a.url !== undefined),
relays: parseRelayTags(ev.tags),
};
});
}
@ -36,14 +35,4 @@ export default function useRelaysFeedFollows(pubkeys: HexKey[]): Array<RelayList
return useMemo(() => {
return mapFromRelays(notesRelays);
}, [relays]);
}
export function parseRelayTag(tag: Array<string>) {
return {
url: sanitizeRelayUrl(tag[1]),
settings: {
read: tag[2] === "read" || tag[2] === undefined,
write: tag[2] === "write" || tag[2] === undefined,
},
} as FullRelaySettings;
}
}

View File

@ -46,6 +46,7 @@ export interface UsersRelays {
pubkey: string;
created_at: number;
relays: FullRelaySettings[];
loaded: number;
}
export function mapEventToProfile(ev: NostrEvent) {

View File

@ -19,6 +19,11 @@ export const TagRefRegex = /(#\[\d+\])/gm;
*/
export const ProfileCacheExpire = 1_000 * 60 * 60 * 6;
/**
* How long before relay lists should be refreshed
*/
export const RelayListCacheExpire = 1_000 * 60 * 60 * 12;
/**
* Extract file extensions regex
*/

View File

@ -4,7 +4,7 @@ import { NoteStore, NoteStoreSnapshotData } from "./note-collection";
import { Query } from "./query";
import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr";
import { ProfileLoaderService } from "./profile-cache";
import { RelayCache } from "./gossip-model";
import { RelayCache } from "./outbox-model";
import { QueryOptimizer } from "./query-optimizer";
import { base64 } from "@scure/base";
@ -31,6 +31,7 @@ export * from "./pow";
export * from "./pow-util";
export * from "./query-optimizer";
export * from "./encrypted";
export * from "./outbox-model";
export * from "./impl/nip4";
export * from "./impl/nip44";
@ -71,7 +72,7 @@ export interface SystemInterface {
* @param req Request to send to relays
* @param cb A callback which will fire every 100ms when new data is received
*/
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void): Promise<NoteStoreSnapshotData>;
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void): Promise<Array<TaggedNostrEvent>>;
/**
* Create a new permanent connection to a relay

View File

@ -22,7 +22,7 @@ import {
EventExt,
} from ".";
import { EventsCache } from "./cache/events";
import { RelayCache } from "./gossip-model";
import { RelayCache, pickRelaysForReply } from "./outbox-model";
import { QueryOptimizer, DefaultQueryOptimizer } from "./query-optimizer";
import { trimFilters } from "./request-trim";
@ -246,7 +246,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void) {
const q = this.Query(NoteCollection, req);
return new Promise<NoteStoreSnapshotData>(resolve => {
return new Promise<Array<TaggedNostrEvent>>(resolve => {
let t: ReturnType<typeof setTimeout> | undefined;
let tBuf: Array<TaggedNostrEvent> = [];
const releaseOnEvent = cb
@ -267,7 +267,7 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
releaseOnEvent?.();
releaseFeedHook();
q.cancel();
resolve(unwrap(q.feed.snapshot.data));
resolve(unwrap((q.feed as NoteCollection).snapshot.data));
}
});
});
@ -382,8 +382,9 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
*/
async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void) {
const socks = [...this.#sockets.values()].filter(a => !a.Ephemeral && a.Settings.write);
const oks = await Promise.all(
socks.map(async s => {
const replyRelays = await pickRelaysForReply(ev, this);
const oks = await Promise.all([
...socks.map(async s => {
try {
const rsp = await s.SendAsync(ev);
cb?.(rsp);
@ -393,7 +394,8 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
}
return;
}),
);
...replyRelays.filter(a => !socks.some(b => b.Address === a)).map(a => this.WriteOnceToRelay(a, ev)),
]);
return removeUndefined(oks);
}

View File

@ -1,7 +1,16 @@
import { ReqFilter, UsersRelays } from ".";
import { dedupe, unwrap } from "@snort/shared";
import {
EventKind,
FullRelaySettings,
NostrEvent,
ReqFilter,
RequestBuilder,
SystemInterface,
UsersRelays,
} from ".";
import { dedupe, sanitizeRelayUrl, unixNowMs, unwrap } from "@snort/shared";
import debug from "debug";
import { FlatReqFilter } from "./query-optimizer";
import { RelayListCacheExpire } from "./const";
const PickNRelays = 2;
@ -20,8 +29,13 @@ export interface RelayTaggedFilters {
filters: Array<ReqFilter>;
}
const logger = debug("OutboxModel");
export interface RelayCache {
getFromCache(pubkey?: string): UsersRelays | undefined;
update(obj: UsersRelays): Promise<"new" | "updated" | "refresh" | "no_change">;
buffer(keys: Array<string>): Promise<Array<string>>;
bulkSet(objs: Array<UsersRelays>): Promise<void>;
}
export function splitAllByWriteRelays(cache: RelayCache, filters: Array<ReqFilter>) {
@ -61,7 +75,7 @@ export function splitByWriteRelays(cache: RelayCache, filter: ReqFilter): Array<
];
}
const topRelays = pickTopRelays(cache, unwrap(authors), PickNRelays);
const topRelays = pickTopRelays(cache, unwrap(authors), PickNRelays, "write");
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
const picked = pickedRelays.map(a => {
@ -84,7 +98,7 @@ export function splitByWriteRelays(cache: RelayCache, filter: ReqFilter): Array<
},
});
}
debug("GOSSIP")("Picked %O => %O", filter, picked);
logger("Picked %O => %O", filter, picked);
return picked;
}
@ -101,7 +115,7 @@ export function splitFlatByWriteRelays(cache: RelayCache, input: Array<FlatReqFi
},
];
}
const topRelays = pickTopRelays(cache, authors, PickNRelays);
const topRelays = pickTopRelays(cache, authors, PickNRelays, "write");
const pickedRelays = dedupe(topRelays.flatMap(a => a.relays));
const picked = pickedRelays.map(a => {
@ -119,21 +133,21 @@ export function splitFlatByWriteRelays(cache: RelayCache, input: Array<FlatReqFi
} as RelayTaggedFlatFilters);
}
debug("GOSSIP")("Picked %d relays from %d filters", picked.length, input.length);
logger("Picked %d relays from %d filters", picked.length, input.length);
return picked;
}
/**
* Pick most popular relays for each authors
*/
function pickTopRelays(cache: RelayCache, authors: Array<string>, n: number) {
function pickTopRelays(cache: RelayCache, authors: Array<string>, n: number, type: "write" | "read") {
// map of pubkey -> [write relays]
const allRelays = authors.map(a => {
return {
key: a,
relays: cache
.getFromCache(a)
?.relays?.filter(a => a.settings.write)
?.relays?.filter(a => (type === "write" ? a.settings.write : a.settings.read))
.sort(() => (Math.random() < 0.5 ? 1 : -1)),
};
});
@ -178,3 +192,49 @@ function pickTopRelays(cache: RelayCache, authors: Array<string>, n: number) {
}),
);
}
/**
* Pick read relays for sending reply events
*/
export async function pickRelaysForReply(ev: NostrEvent, system: SystemInterface) {
const recipients = dedupe(ev.tags.filter(a => a[0] === "p").map(a => a[1]));
await updateRelayLists(recipients, system);
const relays = pickTopRelays(system.RelayCache, recipients, 2, "read");
const ret = dedupe(relays.map(a => a.relays).flat());
logger("Picked %O from authors %O", ret, recipients);
return ret;
}
export function parseRelayTag(tag: Array<string>) {
return {
url: sanitizeRelayUrl(tag[1]),
settings: {
read: tag[2] === "read" || tag[2] === undefined,
write: tag[2] === "write" || tag[2] === undefined,
},
} as FullRelaySettings;
}
export function parseRelayTags(tag: Array<Array<string>>) {
return tag.map(parseRelayTag).filter(a => a !== null);
}
export async function updateRelayLists(authors: Array<string>, system: SystemInterface) {
await system.RelayCache.buffer(authors);
const expire = unixNowMs() - RelayListCacheExpire;
const expired = authors.filter(a => (system.RelayCache.getFromCache(a)?.loaded ?? 0) < expire);
if (expired.length > 0) {
logger("Updating relays for authors: %O", expired);
const rb = new RequestBuilder("system-update-relays-for-outbox");
rb.withFilter().authors(expired).kinds([EventKind.Relays]);
const relayLists = await system.Fetch(rb);
await system.RelayCache.bulkSet(
relayLists.map(a => ({
relays: parseRelayTags(a.tags),
pubkey: a.pubkey,
created_at: a.created_at,
loaded: unixNowMs(),
})),
);
}
}

View File

@ -5,7 +5,7 @@ import { appendDedupe, dedupe, sanitizeRelayUrl, unixNowMs, unwrap } from "@snor
import EventKind from "./event-kind";
import { NostrLink, NostrPrefix, SystemInterface } from ".";
import { ReqFilter, u256, HexKey } from "./nostr";
import { RelayCache, splitByWriteRelays, splitFlatByWriteRelays } from "./gossip-model";
import { RelayCache, splitByWriteRelays, splitFlatByWriteRelays } from "./outbox-model";
/**
* Which strategy is used when building REQ filters