feat: remove buildDiff
This commit is contained in:
parent
8e84dc6a89
commit
4a48f4f340
@ -21,7 +21,7 @@ export default function NoteFooter(props: NoteFooterProps) {
|
|||||||
const link = useMemo(() => NostrLink.fromEvent(ev), [ev.id]);
|
const link = useMemo(() => NostrLink.fromEvent(ev), [ev.id]);
|
||||||
const [showReactions, setShowReactions] = useState(false);
|
const [showReactions, setShowReactions] = useState(false);
|
||||||
|
|
||||||
const related = useReactions(`reactions:${link.tagKey}`, link);
|
const related = useReactions("reactions", link);
|
||||||
const { replies, reactions, zaps, reposts } = useEventReactions(link, related);
|
const { replies, reactions, zaps, reposts } = useEventReactions(link, related);
|
||||||
const { positive } = reactions;
|
const { positive } = reactions;
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ const ReactionsModal = ({ onClose, event, initialTab = 0 }: ReactionsModalProps)
|
|||||||
|
|
||||||
const link = NostrLink.fromEvent(event);
|
const link = NostrLink.fromEvent(event);
|
||||||
|
|
||||||
const related = useReactions(`reactions:${link.tagKey}`, link, undefined, false);
|
const related = useReactions("reactions", link, undefined, false);
|
||||||
const { reactions, zaps, reposts } = useEventReactions(link, related);
|
const { reactions, zaps, reposts } = useEventReactions(link, related);
|
||||||
const { positive, negative } = reactions;
|
const { positive, negative } = reactions;
|
||||||
|
|
||||||
|
@ -27,7 +27,6 @@ export class RelayMetadataLoader extends BackgroundLoader<UsersRelays> {
|
|||||||
protected override buildSub(missing: string[]): RequestBuilder {
|
protected override buildSub(missing: string[]): RequestBuilder {
|
||||||
const rb = new RequestBuilder("relay-loader");
|
const rb = new RequestBuilder("relay-loader");
|
||||||
rb.withOptions({
|
rb.withOptions({
|
||||||
skipDiff: true,
|
|
||||||
timeout: 10000,
|
timeout: 10000,
|
||||||
outboxPickN: 4,
|
outboxPickN: 4,
|
||||||
});
|
});
|
||||||
|
@ -76,7 +76,7 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
|||||||
* Async fetch results
|
* Async fetch results
|
||||||
*/
|
*/
|
||||||
async fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void) {
|
async fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void) {
|
||||||
const filters = req.buildRaw(this.#system);
|
const filters = req.buildRaw();
|
||||||
const q = this.query(req);
|
const q = this.query(req);
|
||||||
if (cb) {
|
if (cb) {
|
||||||
q.on("event", cb);
|
q.on("event", cb);
|
||||||
@ -101,15 +101,6 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
|||||||
for (const qfl of this.#queryCacheLayers) {
|
for (const qfl of this.#queryCacheLayers) {
|
||||||
qSend = await qfl.processFilter(q, qSend);
|
qSend = await qfl.processFilter(q, qSend);
|
||||||
}
|
}
|
||||||
if (this.#system.cacheRelay) {
|
|
||||||
// fetch results from cache first, flag qSend for sync
|
|
||||||
const data = await this.#system.cacheRelay.query(["REQ", q.id, ...qSend.filters]);
|
|
||||||
if (data.length > 0) {
|
|
||||||
qSend.syncFrom = data as Array<TaggedNostrEvent>;
|
|
||||||
this.#log("Adding from cache: %O", data);
|
|
||||||
q.feed.add(data.map(a => ({ ...a, relays: [] })));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// automated outbox model, load relays for queried authors
|
// automated outbox model, load relays for queried authors
|
||||||
for (const f of qSend.filters) {
|
for (const f of qSend.filters) {
|
||||||
@ -119,19 +110,42 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check for empty filters
|
// check for empty filters
|
||||||
const fNew = trimFilters(qSend.filters);
|
qSend.filters = trimFilters(qSend.filters);
|
||||||
if (fNew.length === 0) {
|
|
||||||
|
// fetch results from cache first, flag qSend for sync
|
||||||
|
if (this.#system.cacheRelay) {
|
||||||
|
const data = await this.#system.cacheRelay.query(["REQ", q.id, ...qSend.filters]);
|
||||||
|
if (data.length > 0) {
|
||||||
|
qSend.syncFrom = data as Array<TaggedNostrEvent>;
|
||||||
|
this.#log("Adding from cache: %O", data);
|
||||||
|
q.feed.add(data.map(a => ({ ...a, relays: [] })));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove satisfied filters
|
||||||
|
if (qSend.syncFrom && qSend.syncFrom.length > 0) {
|
||||||
|
// only remove the "ids" filters
|
||||||
|
const newFilters = qSend.filters.filter(
|
||||||
|
a => !a.ids || (a.ids && !qSend.syncFrom?.some(b => eventMatchesFilter(b, a))),
|
||||||
|
);
|
||||||
|
if (newFilters.length !== qSend.filters.length) {
|
||||||
|
this.#log("Removing satisfied filters %o %o", newFilters, qSend.filters);
|
||||||
|
qSend.filters = newFilters;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// nothing left to send
|
||||||
|
if (qSend.filters.length === 0) {
|
||||||
this.#log("Dropping %s %o", q.id, qSend);
|
this.#log("Dropping %s %o", q.id, qSend);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
qSend.filters = fNew;
|
|
||||||
|
|
||||||
if (qSend.relay) {
|
if (qSend.relay) {
|
||||||
this.#log("Sending query to %s %s %O", qSend.relay, q.id, qSend);
|
|
||||||
const nc = await this.#system.pool.connect(qSend.relay, { read: true, write: true }, true);
|
const nc = await this.#system.pool.connect(qSend.relay, { read: true, write: true }, true);
|
||||||
if (nc) {
|
if (nc) {
|
||||||
const qt = q.sendToRelay(nc, qSend);
|
const qt = q.sendToRelay(nc, qSend);
|
||||||
if (qt) {
|
if (qt) {
|
||||||
|
this.#log("Sent query %s to %s %s %O", qt.id, qSend.relay, q.id, qSend);
|
||||||
return [qt];
|
return [qt];
|
||||||
} else {
|
} else {
|
||||||
this.#log("Query not sent to %s: %O", qSend.relay, qSend);
|
this.#log("Query not sent to %s: %O", qSend.relay, qSend);
|
||||||
@ -143,9 +157,9 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
|
|||||||
const ret = [];
|
const ret = [];
|
||||||
for (const [a, s] of this.#system.pool) {
|
for (const [a, s] of this.#system.pool) {
|
||||||
if (!s.ephemeral) {
|
if (!s.ephemeral) {
|
||||||
this.#log("Sending query to %s %s %O", a, q.id, qSend);
|
|
||||||
const qt = q.sendToRelay(s, qSend);
|
const qt = q.sendToRelay(s, qSend);
|
||||||
if (qt) {
|
if (qt) {
|
||||||
|
this.#log("Sent query %s to %s %s %O", qt.id, qSend.relay, q.id, qSend);
|
||||||
ret.push(qt);
|
ret.push(qt);
|
||||||
} else {
|
} else {
|
||||||
this.#log("Query not sent to %s: %O", a, qSend);
|
this.#log("Query not sent to %s: %O", a, qSend);
|
||||||
|
@ -3,11 +3,10 @@ import debug from "debug";
|
|||||||
import { EventEmitter } from "eventemitter3";
|
import { EventEmitter } from "eventemitter3";
|
||||||
import { unixNowMs, unwrap } from "@snort/shared";
|
import { unixNowMs, unwrap } from "@snort/shared";
|
||||||
|
|
||||||
import { ReqFilter, Nips, TaggedNostrEvent, SystemInterface, ParsedFragment } from ".";
|
import { ReqFilter, Nips, TaggedNostrEvent, SystemInterface, ParsedFragment, FlatReqFilter } from ".";
|
||||||
import { NoteCollection } from "./note-collection";
|
import { NoteCollection } from "./note-collection";
|
||||||
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
|
import { BuiltRawReqFilter, RequestBuilder } from "./request-builder";
|
||||||
import { eventMatchesFilter } from "./request-matcher";
|
import { eventMatchesFilter } from "./request-matcher";
|
||||||
import { LRUCache } from "lru-cache";
|
|
||||||
import { ConnectionType } from "./connection-pool";
|
import { ConnectionType } from "./connection-pool";
|
||||||
|
|
||||||
interface QueryTraceEvents {
|
interface QueryTraceEvents {
|
||||||
@ -103,23 +102,16 @@ export interface QueryEvents {
|
|||||||
done: () => void;
|
done: () => void;
|
||||||
}
|
}
|
||||||
|
|
||||||
const QueryCache = new LRUCache<string, Array<TaggedNostrEvent>>({
|
|
||||||
ttl: 60_000 * 3,
|
|
||||||
ttlAutopurge: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Active or queued query on the system
|
* Active or queued query on the system
|
||||||
*/
|
*/
|
||||||
export class Query extends EventEmitter<QueryEvents> {
|
export class Query extends EventEmitter<QueryEvents> {
|
||||||
get id() {
|
id: string;
|
||||||
return this.request.id;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RequestBuilder instance
|
* RequestBuilder instance
|
||||||
*/
|
*/
|
||||||
request: RequestBuilder;
|
requests: Array<ReqFilter> = [];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Nostr system interface
|
* Nostr system interface
|
||||||
@ -175,7 +167,7 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
|
|
||||||
constructor(system: SystemInterface, req: RequestBuilder) {
|
constructor(system: SystemInterface, req: RequestBuilder) {
|
||||||
super();
|
super();
|
||||||
this.request = req;
|
this.id = req.id;
|
||||||
this.#system = system;
|
this.#system = system;
|
||||||
this.#feed = new NoteCollection();
|
this.#feed = new NoteCollection();
|
||||||
this.#leaveOpen = req.options?.leaveOpen ?? false;
|
this.#leaveOpen = req.options?.leaveOpen ?? false;
|
||||||
@ -183,11 +175,7 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
this.#groupingDelay = req.options?.groupingDelay ?? 100;
|
this.#groupingDelay = req.options?.groupingDelay ?? 100;
|
||||||
this.#checkTraces();
|
this.#checkTraces();
|
||||||
|
|
||||||
const cached = QueryCache.get(this.request.id);
|
this.requests.push(...req.buildRaw());
|
||||||
if (cached) {
|
|
||||||
this.#log("Restored %o for %s", cached, this.request.id);
|
|
||||||
this.feed.add(cached);
|
|
||||||
}
|
|
||||||
this.feed.on("event", evs => this.emit("event", evs));
|
this.feed.on("event", evs => this.emit("event", evs));
|
||||||
this.#start();
|
this.#start();
|
||||||
}
|
}
|
||||||
@ -196,12 +184,8 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
* Adds another request to this one
|
* Adds another request to this one
|
||||||
*/
|
*/
|
||||||
addRequest(req: RequestBuilder) {
|
addRequest(req: RequestBuilder) {
|
||||||
if (req.instance === this.request.instance) {
|
|
||||||
// same requst, do nothing
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.#log("Add query %O to %s", req, this.id);
|
this.#log("Add query %O to %s", req, this.id);
|
||||||
this.request.add(req);
|
this.requests.push(...req.buildRaw());
|
||||||
this.#start();
|
this.#start();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -263,8 +247,6 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
}
|
}
|
||||||
this.#stopCheckTraces();
|
this.#stopCheckTraces();
|
||||||
this.emit("end");
|
this.emit("end");
|
||||||
QueryCache.set(this.request.id, this.feed.snapshot);
|
|
||||||
this.#log("Saved %O for %s", this.feed.snapshot, this.request.id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -345,16 +327,39 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
|
|
||||||
async #emitFilters() {
|
async #emitFilters() {
|
||||||
this.#log("Starting emit of %s", this.id);
|
this.#log("Starting emit of %s", this.id);
|
||||||
const existing = this.filters;
|
let rawFilters = [...this.requests];
|
||||||
if (!(this.request.options?.skipDiff ?? false) && existing.length > 0) {
|
this.requests = [];
|
||||||
const filters = this.request.buildDiff(this.#system, existing);
|
if (this.#system.requestRouter) {
|
||||||
this.#log("Build %s %O", this.id, filters);
|
rawFilters = this.#system.requestRouter.forAllRequest(rawFilters);
|
||||||
filters.forEach(f => this.emit("request", this.id, f));
|
|
||||||
} else {
|
|
||||||
const filters = this.request.build(this.#system);
|
|
||||||
this.#log("Build %s %O", this.id, filters);
|
|
||||||
filters.forEach(f => this.emit("request", this.id, f));
|
|
||||||
}
|
}
|
||||||
|
const expanded = rawFilters.flatMap(a => this.#system.optimizer.expandFilter(a));
|
||||||
|
const fx = this.#groupFlatByRelay(expanded);
|
||||||
|
fx.forEach(a => this.emit("request", this.id, a));
|
||||||
|
}
|
||||||
|
|
||||||
|
#groupFlatByRelay(filters: Array<FlatReqFilter>) {
|
||||||
|
const relayMerged = filters.reduce((acc, v) => {
|
||||||
|
const relay = v.relay ?? "";
|
||||||
|
// delete relay from filter
|
||||||
|
delete v.relay;
|
||||||
|
const existing = acc.get(relay);
|
||||||
|
if (existing) {
|
||||||
|
existing.push(v);
|
||||||
|
} else {
|
||||||
|
acc.set(relay, [v]);
|
||||||
|
}
|
||||||
|
return acc;
|
||||||
|
}, new Map<string, Array<FlatReqFilter>>());
|
||||||
|
|
||||||
|
const ret = [];
|
||||||
|
for (const [k, v] of relayMerged.entries()) {
|
||||||
|
const filters = this.#system.optimizer.flatMerge(v);
|
||||||
|
ret.push({
|
||||||
|
relay: k,
|
||||||
|
filters,
|
||||||
|
} as BuiltRawReqFilter);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
#stopCheckTraces() {
|
#stopCheckTraces() {
|
||||||
@ -419,7 +424,7 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
const eventHandler = (sub: string, ev: TaggedNostrEvent) => {
|
const eventHandler = (sub: string, ev: TaggedNostrEvent) => {
|
||||||
if ((this.request.options?.fillStore ?? true) && qt.id === sub) {
|
if (qt.id === sub) {
|
||||||
if (qt.filters.some(v => eventMatchesFilter(ev, v))) {
|
if (qt.filters.some(v => eventMatchesFilter(ev, v))) {
|
||||||
this.feed.add(ev);
|
this.feed.add(ev);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1,9 +1,8 @@
|
|||||||
import debug from "debug";
|
|
||||||
import { v4 as uuid } from "uuid";
|
import { v4 as uuid } from "uuid";
|
||||||
import { appendDedupe, dedupe, removeUndefined, sanitizeRelayUrl, unixNowMs, unwrap } from "@snort/shared";
|
import { appendDedupe, dedupe, removeUndefined, sanitizeRelayUrl, unwrap } from "@snort/shared";
|
||||||
|
|
||||||
import EventKind from "./event-kind";
|
import EventKind from "./event-kind";
|
||||||
import { FlatReqFilter, NostrLink, NostrPrefix, SystemInterface, ToNostrEventTag } from ".";
|
import { NostrLink, NostrPrefix, ToNostrEventTag } from ".";
|
||||||
import { ReqFilter, u256, HexKey, TaggedNostrEvent } from "./nostr";
|
import { ReqFilter, u256, HexKey, TaggedNostrEvent } from "./nostr";
|
||||||
import { RequestRouter } from "./request-router";
|
import { RequestRouter } from "./request-router";
|
||||||
|
|
||||||
@ -23,11 +22,6 @@ export interface RequestBuilderOptions {
|
|||||||
*/
|
*/
|
||||||
leaveOpen?: boolean;
|
leaveOpen?: boolean;
|
||||||
|
|
||||||
/**
|
|
||||||
* Do not apply diff logic and always use full filters for query
|
|
||||||
*/
|
|
||||||
skipDiff?: boolean;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pick N relays per pubkey when using outbox strategy
|
* Pick N relays per pubkey when using outbox strategy
|
||||||
*/
|
*/
|
||||||
@ -42,12 +36,6 @@ export interface RequestBuilderOptions {
|
|||||||
* How many milli-seconds to wait to allow grouping
|
* How many milli-seconds to wait to allow grouping
|
||||||
*/
|
*/
|
||||||
groupingDelay?: number;
|
groupingDelay?: number;
|
||||||
|
|
||||||
/**
|
|
||||||
* If events should be added automatically to the internal NoteCollection
|
|
||||||
* default=true
|
|
||||||
*/
|
|
||||||
fillStore?: boolean;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -58,8 +46,6 @@ export class RequestBuilder {
|
|||||||
instance: string;
|
instance: string;
|
||||||
#builders: Array<RequestFilterBuilder>;
|
#builders: Array<RequestFilterBuilder>;
|
||||||
#options?: RequestBuilderOptions;
|
#options?: RequestBuilderOptions;
|
||||||
#log = debug("RequestBuilder");
|
|
||||||
#rawCached?: Array<ReqFilter>;
|
|
||||||
|
|
||||||
constructor(id: string) {
|
constructor(id: string) {
|
||||||
this.instance = uuid();
|
this.instance = uuid();
|
||||||
@ -84,20 +70,17 @@ export class RequestBuilder {
|
|||||||
*/
|
*/
|
||||||
add(other: RequestBuilder) {
|
add(other: RequestBuilder) {
|
||||||
this.#builders.push(...other.#builders);
|
this.#builders.push(...other.#builders);
|
||||||
this.#rawCached = undefined;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
withFilter() {
|
withFilter() {
|
||||||
const ret = new RequestFilterBuilder();
|
const ret = new RequestFilterBuilder();
|
||||||
this.#builders.push(ret);
|
this.#builders.push(ret);
|
||||||
this.#rawCached = undefined;
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
withBareFilter(f: ReqFilter) {
|
withBareFilter(f: ReqFilter) {
|
||||||
const ret = new RequestFilterBuilder(f);
|
const ret = new RequestFilterBuilder(f);
|
||||||
this.#builders.push(ret);
|
this.#builders.push(ret);
|
||||||
this.#rawCached = undefined;
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -109,67 +92,8 @@ export class RequestBuilder {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
buildRaw(system?: SystemInterface): Array<ReqFilter> {
|
buildRaw(): Array<ReqFilter> {
|
||||||
if (!this.#rawCached && system) {
|
return this.#builders.map(f => f.filter);
|
||||||
this.#rawCached = system.optimizer.compress(this.#builders.map(f => f.filter));
|
|
||||||
}
|
|
||||||
return this.#rawCached ?? this.#builders.map(f => f.filter);
|
|
||||||
}
|
|
||||||
|
|
||||||
build(system: SystemInterface): Array<BuiltRawReqFilter> {
|
|
||||||
let rawFilters = this.buildRaw(system);
|
|
||||||
if (system.requestRouter) {
|
|
||||||
rawFilters = system.requestRouter.forAllRequest(rawFilters);
|
|
||||||
}
|
|
||||||
const expanded = rawFilters.flatMap(a => system.optimizer.expandFilter(a));
|
|
||||||
return this.#groupFlatByRelay(system, expanded);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Detects a change in request from a previous set of filters
|
|
||||||
*/
|
|
||||||
buildDiff(system: SystemInterface, prev: Array<ReqFilter>): Array<BuiltRawReqFilter> {
|
|
||||||
const start = unixNowMs();
|
|
||||||
|
|
||||||
let rawFilters = this.buildRaw(system);
|
|
||||||
if (system.requestRouter) {
|
|
||||||
rawFilters = system.requestRouter.forAllRequest(rawFilters);
|
|
||||||
}
|
|
||||||
const diff = system.optimizer.getDiff(prev, rawFilters);
|
|
||||||
if (diff.length > 0) {
|
|
||||||
const ret = this.#groupFlatByRelay(system, diff);
|
|
||||||
const ts = unixNowMs() - start;
|
|
||||||
if (ts >= 100) {
|
|
||||||
this.#log("slow diff %s %d ms, consider separate query ids, or use skipDiff: %O", this.id, ts, prev);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
|
|
||||||
#groupFlatByRelay(system: SystemInterface, filters: Array<FlatReqFilter>) {
|
|
||||||
const relayMerged = filters.reduce((acc, v) => {
|
|
||||||
const relay = v.relay ?? "";
|
|
||||||
// delete relay from filter
|
|
||||||
delete v.relay;
|
|
||||||
const existing = acc.get(relay);
|
|
||||||
if (existing) {
|
|
||||||
existing.push(v);
|
|
||||||
} else {
|
|
||||||
acc.set(relay, [v]);
|
|
||||||
}
|
|
||||||
return acc;
|
|
||||||
}, new Map<string, Array<FlatReqFilter>>());
|
|
||||||
|
|
||||||
const ret = [];
|
|
||||||
for (const [k, v] of relayMerged.entries()) {
|
|
||||||
const filters = system.optimizer.flatMerge(v);
|
|
||||||
ret.push({
|
|
||||||
relay: k,
|
|
||||||
filters,
|
|
||||||
} as BuiltRawReqFilter);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user