feat: gossip model

This commit is contained in:
2023-05-08 17:55:46 +01:00
parent 78bfa57628
commit 185e089ffd
11 changed files with 275 additions and 250 deletions

View File

@ -12,6 +12,7 @@ import {
} from "./NoteCollection";
import { diffFilters } from "./RequestSplitter";
import { Query } from "./Query";
import { splitAllByWriteRelays } from "./GossipModel";
export {
NoteStore,
@ -89,7 +90,7 @@ export class NostrSystem {
try {
const addr = unwrap(sanitizeRelayUrl(address));
if (!this.Sockets.has(addr)) {
const c = new Connection(addr, options, this.HandleAuth);
const c = new Connection(addr, options, this.HandleAuth?.bind(this));
this.Sockets.set(addr, c);
c.OnEvent = (s, e) => this.OnEvent(s, e);
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
@ -146,7 +147,7 @@ export class NostrSystem {
try {
const addr = unwrap(sanitizeRelayUrl(address));
if (!this.Sockets.has(addr)) {
const c = new Connection(addr, { read: true, write: false }, this.HandleAuth, true);
const c = new Connection(addr, { read: true, write: false }, this.HandleAuth?.bind(this), true);
this.Sockets.set(addr, c);
c.OnEvent = (s, e) => this.OnEvent(s, e);
c.OnEose = s => this.OnEndOfStoredEvents(c, s);
@ -212,11 +213,15 @@ export class NostrSystem {
this.#changed();
return unwrap(q.feed) as Readonly<T>;
} else {
const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, filters, q.feed);
q.subQueries.push(subQ);
const splitFilters = splitAllByWriteRelays(filters);
for (const sf of splitFilters) {
const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, sf.filters, q.feed);
subQ.relays = sf.relay ? [sf.relay] : [];
q.subQueries.push(subQ);
this.SendQuery(subQ);
}
q.filters = filters;
q.feed.loading = true;
this.SendQuery(subQ);
this.#changed();
return q.feed as Readonly<T>;
}
@ -227,7 +232,9 @@ export class NostrSystem {
AddQuery<T extends NoteStore>(type: { new (): T }, rb: RequestBuilder): T {
const store = new type();
const q = new Query(rb.id, rb.build(), store);
const filters = rb.build();
const q = new Query(rb.id, filters, store);
if (rb.options?.leaveOpen) {
q.leaveOpen = rb.options.leaveOpen;
}
@ -236,7 +243,17 @@ export class NostrSystem {
}
this.Queries.set(rb.id, q);
this.SendQuery(q);
const splitFilters = splitAllByWriteRelays(filters);
if (splitFilters.length > 1) {
for (const sf of splitFilters) {
const subQ = new Query(`${q.id}-${q.subQueries.length + 1}`, sf.filters, q.feed);
subQ.relays = sf.relay ? [sf.relay] : [];
q.subQueries.push(subQ);
this.SendQuery(subQ);
}
} else {
this.SendQuery(q);
}
this.#changed();
return store;
}
@ -248,9 +265,27 @@ export class NostrSystem {
}
}
SendQuery(q: Query) {
for (const [, s] of this.Sockets) {
q.sendToRelay(s);
async SendQuery(q: Query) {
if (q.relays.length > 0) {
for (const r of q.relays) {
const s = this.Sockets.get(r);
if (s) {
q.sendToRelay(s);
} else {
const nc = await this.ConnectEphemeralRelay(r);
if (nc) {
q.sendToRelay(nc);
} else {
console.warn("Failed to connect to new relay for:", r, q);
}
}
}
} else {
for (const [, s] of this.Sockets) {
if (!s.Ephemeral) {
q.sendToRelay(s);
}
}
}
}
@ -304,7 +339,6 @@ export class NostrSystem {
if (v.closingAt && v.closingAt < now) {
v.sendClose();
this.Queries.delete(k);
console.debug("Removed:", k);
changed = true;
}
}