Compare commits
2 Commits
7c1f2c539f
...
0de93a0a53
Author | SHA1 | Date | |
---|---|---|---|
0de93a0a53 | |||
579589f635 |
@ -6,6 +6,7 @@ import { Connection, RelaySettings } from "./connection";
|
|||||||
import { NostrEvent, OkResponse, TaggedNostrEvent } from "./nostr";
|
import { NostrEvent, OkResponse, TaggedNostrEvent } from "./nostr";
|
||||||
import { pickRelaysForReply } from "./outbox-model";
|
import { pickRelaysForReply } from "./outbox-model";
|
||||||
import { SystemInterface } from ".";
|
import { SystemInterface } from ".";
|
||||||
|
import LRUSet from "@snort/shared/src/LRUSet";
|
||||||
|
|
||||||
export interface NostrConnectionPoolEvents {
|
export interface NostrConnectionPoolEvents {
|
||||||
connected: (address: string, wasReconnect: boolean) => void;
|
connected: (address: string, wasReconnect: boolean) => void;
|
||||||
@ -38,6 +39,7 @@ export class DefaultConnectionPool extends EventEmitter<NostrConnectionPoolEvent
|
|||||||
* All currently connected websockets
|
* All currently connected websockets
|
||||||
*/
|
*/
|
||||||
#sockets = new Map<string, Connection>();
|
#sockets = new Map<string, Connection>();
|
||||||
|
#requestedIds = new LRUSet<string>(1000);
|
||||||
|
|
||||||
constructor(system: SystemInterface) {
|
constructor(system: SystemInterface) {
|
||||||
super();
|
super();
|
||||||
@ -69,6 +71,21 @@ export class DefaultConnectionPool extends EventEmitter<NostrConnectionPoolEvent
|
|||||||
}
|
}
|
||||||
this.emit("event", addr, s, e);
|
this.emit("event", addr, s, e);
|
||||||
});
|
});
|
||||||
|
c.on("have", async (s, id) => {
|
||||||
|
this.#log("%s have: %s %o", c.Address, s, id);
|
||||||
|
if (this.#requestedIds.has(id)) {
|
||||||
|
this.#log("HAVE: Already requested from another relay %s", id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.#requestedIds.add(id);
|
||||||
|
if (await this.#system.eventsCache.get(id)) {
|
||||||
|
// TODO better local cache / db check
|
||||||
|
this.#log("HAVE: Already have %s", id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.#log("HAVE: GET requesting %s", id);
|
||||||
|
c.queueReq(["GET", id], () => {});
|
||||||
|
});
|
||||||
c.on("eose", s => this.emit("eose", addr, s));
|
c.on("eose", s => this.emit("eose", addr, s));
|
||||||
c.on("disconnect", code => this.emit("disconnect", addr, code));
|
c.on("disconnect", code => this.emit("disconnect", addr, code));
|
||||||
c.on("connected", r => this.emit("connected", addr, r));
|
c.on("connected", r => this.emit("connected", addr, r));
|
||||||
|
@ -28,6 +28,7 @@ interface ConnectionEvents {
|
|||||||
disconnect: (code: number) => void;
|
disconnect: (code: number) => void;
|
||||||
auth: (challenge: string, relay: string, cb: (ev: NostrEvent) => void) => void;
|
auth: (challenge: string, relay: string, cb: (ev: NostrEvent) => void) => void;
|
||||||
notice: (msg: string) => void;
|
notice: (msg: string) => void;
|
||||||
|
have: (sub: string, ids: u256) => void; // NIP-114
|
||||||
unknownMessage: (obj: Array<any>) => void;
|
unknownMessage: (obj: Array<any>) => void;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,6 +211,11 @@ export class Connection extends EventEmitter<ConnectionEvents> {
|
|||||||
// todo: stats events received
|
// todo: stats events received
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
// NIP-114: GetMatchingEventIds
|
||||||
|
case "HAVE": {
|
||||||
|
this.emit("have", msg[1] as string, msg[2] as u256);
|
||||||
|
break;
|
||||||
|
}
|
||||||
case "EOSE": {
|
case "EOSE": {
|
||||||
this.emit("eose", msg[1] as string);
|
this.emit("eose", msg[1] as string);
|
||||||
break;
|
break;
|
||||||
@ -374,7 +380,7 @@ export class Connection extends EventEmitter<ConnectionEvents> {
|
|||||||
#sendRequestCommand(item: ConnectionQueueItem) {
|
#sendRequestCommand(item: ConnectionQueueItem) {
|
||||||
try {
|
try {
|
||||||
const cmd = item.obj;
|
const cmd = item.obj;
|
||||||
if (cmd[0] === "REQ") {
|
if (cmd[0] === "REQ" || cmd[0] === "GET") {
|
||||||
this.ActiveRequests.add(cmd[1]);
|
this.ActiveRequests.add(cmd[1]);
|
||||||
this.send(cmd);
|
this.send(cmd);
|
||||||
} else if (cmd[0] === "SYNC") {
|
} else if (cmd[0] === "SYNC") {
|
||||||
@ -392,10 +398,18 @@ export class Connection extends EventEmitter<ConnectionEvents> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (this.Info?.software?.includes("strfry")) {
|
if (this.Info?.software?.includes("strfry")) {
|
||||||
const neg = new NegentropyFlow(id, this, eventSet, filters);
|
const newFilters = filters.map(a => {
|
||||||
|
if (a.ids_only) {
|
||||||
|
const copy = { ...a };
|
||||||
|
delete copy.ids_only;
|
||||||
|
return copy;
|
||||||
|
}
|
||||||
|
return a;
|
||||||
|
});
|
||||||
|
const neg = new NegentropyFlow(id, this, eventSet, newFilters);
|
||||||
neg.once("finish", filters => {
|
neg.once("finish", filters => {
|
||||||
if (filters.length > 0) {
|
if (filters.length > 0) {
|
||||||
this.queueReq(["REQ", cmd[1], ...filters], item.cb);
|
this.queueReq(["REQ", cmd[1], ...newFilters], item.cb);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
neg.once("error", () => {
|
neg.once("error", () => {
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
export enum Nips {
|
export enum Nips {
|
||||||
Search = 50,
|
Search = 50,
|
||||||
|
GetMatchingEventIds = 114,
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,7 @@ export type MaybeHexKey = HexKey | undefined;
|
|||||||
*/
|
*/
|
||||||
export type u256 = string;
|
export type u256 = string;
|
||||||
|
|
||||||
export type ReqCommand = [cmd: "REQ", id: string, ...filters: Array<ReqFilter>];
|
export type ReqCommand = [cmd: "REQ" | "IDS" | "GET", id: string, ...filters: Array<ReqFilter>];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Raw REQ filter object
|
* Raw REQ filter object
|
||||||
|
@ -394,6 +394,9 @@ export class Query extends EventEmitter<QueryEvents> {
|
|||||||
|
|
||||||
#sendQueryInternal(c: Connection, q: BuiltRawReqFilter) {
|
#sendQueryInternal(c: Connection, q: BuiltRawReqFilter) {
|
||||||
let filters = q.filters;
|
let filters = q.filters;
|
||||||
|
if (c.supportsNip(Nips.GetMatchingEventIds)) {
|
||||||
|
filters = filters.map(f => ({ ...f, ids_only: true }));
|
||||||
|
}
|
||||||
|
|
||||||
const qt = new QueryTrace(c.Address, filters, c.Id);
|
const qt = new QueryTrace(c.Address, filters, c.Id);
|
||||||
qt.on("close", x => c.closeReq(x));
|
qt.on("close", x => c.closeReq(x));
|
||||||
|
Loading…
Reference in New Issue
Block a user