chore: adjust sync method
This commit is contained in:
@ -8,9 +8,11 @@ import { DefaultConnectTimeout } from "./const";
|
||||
import { NostrEvent, OkResponse, ReqCommand, ReqFilter, TaggedNostrEvent, u256 } from "./nostr";
|
||||
import { RelayInfo } from "./relay-info";
|
||||
import EventKind from "./event-kind";
|
||||
import { EventExt } from "./event-ext";
|
||||
import { EventExt, EventType } from "./event-ext";
|
||||
import { NegentropyFlow } from "./negentropy/negentropy-flow";
|
||||
import { ConnectionType, ConnectionTypeEvents } from "./connection-pool";
|
||||
import { RangeSync } from "./sync";
|
||||
import { NoteCollection } from "./note-collection";
|
||||
|
||||
/**
|
||||
* Relay settings
|
||||
@ -395,15 +397,36 @@ export class Connection extends EventEmitter<ConnectionTypeEvents> implements Co
|
||||
} else if (cmd[0] === "SYNC") {
|
||||
const [_, id, eventSet, ...filters] = cmd;
|
||||
const lastResortSync = () => {
|
||||
if (filters.some(a => a.since || a.until || a.ids)) {
|
||||
const isReplacableSync = filters.every(a => a.kinds?.every(b => EventExt.getType(b) === EventType.Replaceable || EventExt.getType(b) === EventType.ParameterizedReplaceable) ?? false);
|
||||
if (filters.some(a => a.since || a.until || a.ids || a.limit) || isReplacableSync) {
|
||||
this.request(["REQ", id, ...filters], item.cb);
|
||||
} else {
|
||||
const rs = RangeSync.forFetcher(async (rb, cb) => {
|
||||
return await new Promise((resolve, reject) => {
|
||||
const results = new NoteCollection();
|
||||
const f = rb.buildRaw();
|
||||
this.on("event", (c, e) => {
|
||||
if (rb.id === c) {
|
||||
cb?.([e]);
|
||||
results.add(e);
|
||||
}
|
||||
});
|
||||
this.on("eose", s => {
|
||||
if (s === rb.id) {
|
||||
resolve(results.takeSnapshot());
|
||||
}
|
||||
});
|
||||
this.request(["REQ", rb.id, ...f], undefined);
|
||||
});
|
||||
});
|
||||
const latest = eventSet.reduce((acc, v) => (acc = v.created_at > acc ? v.created_at : acc), 0);
|
||||
const newFilters = filters.map(a => ({
|
||||
...a,
|
||||
since: latest + 1,
|
||||
}));
|
||||
this.request(["REQ", id, ...newFilters], item.cb);
|
||||
rs.setStartPoint(latest + 1);
|
||||
rs.on("event", ev => {
|
||||
ev.forEach(e => this.emit("event", id, e));
|
||||
});
|
||||
for (const f of filters) {
|
||||
rs.sync(f);
|
||||
}
|
||||
}
|
||||
};
|
||||
if (this.info?.negentropy === "v1") {
|
||||
|
@ -1,6 +1,7 @@
|
||||
import { unixNow } from "@snort/shared";
|
||||
import EventEmitter from "eventemitter3";
|
||||
import { ReqFilter, RequestBuilder, SystemInterface, TaggedNostrEvent } from "..";
|
||||
import { v4 as uuid } from "uuid";
|
||||
|
||||
/**
|
||||
* When nostr was created
|
||||
@ -16,13 +17,27 @@ interface RangeSyncEvents {
|
||||
* A simple time based sync for pulling lots of data from nostr
|
||||
*/
|
||||
export class RangeSync extends EventEmitter<RangeSyncEvents> {
|
||||
#id = uuid();
|
||||
#start: number = NostrBirthday;
|
||||
#windowSize: number = 60 * 60 * 12;
|
||||
#fetcher!: SystemInterface["Fetch"];
|
||||
|
||||
constructor(readonly system: SystemInterface) {
|
||||
private constructor() {
|
||||
super();
|
||||
}
|
||||
|
||||
static forSystem(system: SystemInterface) {
|
||||
const rs = new RangeSync();
|
||||
rs.#fetcher = (r, c) => system.Fetch(r, c);
|
||||
return rs;
|
||||
}
|
||||
|
||||
static forFetcher(fn: SystemInterface["Fetch"]) {
|
||||
const rs = new RangeSync();
|
||||
rs.#fetcher = fn;
|
||||
return rs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set window size in seconds
|
||||
*/
|
||||
@ -52,18 +67,15 @@ export class RangeSync extends EventEmitter<RangeSyncEvents> {
|
||||
throw new Error("Filter must not contain since/until/limit");
|
||||
}
|
||||
|
||||
if (!this.system.requestRouter) {
|
||||
throw new Error("RangeSync cannot work without request router!");
|
||||
}
|
||||
|
||||
const now = unixNow();
|
||||
let ctr = 1;
|
||||
for (let end = now; end > this.#start; end -= this.#windowSize) {
|
||||
const rb = new RequestBuilder(`range-query:${end}`);
|
||||
const rb = new RequestBuilder(`${this.#id}+${ctr++}`);
|
||||
rb.withBareFilter(filter)
|
||||
.since(end - this.#windowSize)
|
||||
.until(end);
|
||||
this.emit("scan", end);
|
||||
const results = await this.system.Fetch(rb);
|
||||
const results = await this.#fetcher(rb);
|
||||
this.emit("event", results);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user