feat: note publishing progress

This commit is contained in:
2023-10-11 15:41:36 +01:00
parent c239fba3df
commit 0e4a040750
22 changed files with 438 additions and 351 deletions

View File

@ -18,6 +18,13 @@ export interface RelaySettings {
write: boolean;
}
export interface OkResponse {
ok: boolean;
id: string;
relay: string;
message?: string;
}
/**
* Snapshot of connection stats
*/
@ -61,7 +68,7 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
HasStateChange: boolean = true;
IsClosed: boolean;
ReconnectTimer?: ReturnType<typeof setTimeout>;
EventsCallback: Map<u256, (msg: boolean[]) => void>;
EventsCallback: Map<u256, (msg: Array<string | boolean>) => void>;
OnConnected?: (wasReconnect: boolean) => void;
OnEvent?: (sub: string, e: TaggedNostrEvent) => void;
OnEose?: (sub: string) => void;
@ -175,11 +182,11 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
OnMessage(e: WebSocket.MessageEvent) {
this.#activity = unixNowMs();
if ((e.data as string).length > 0) {
const msg = JSON.parse(e.data as string);
const tag = msg[0];
const msg = JSON.parse(e.data as string) as Array<string | NostrEvent | boolean>;
const tag = msg[0] as string;
switch (tag) {
case "AUTH": {
this.#onAuthAsync(msg[1])
this.#onAuthAsync(msg[1] as string)
.then(() => this.#sendPendingRaw())
.catch(this.#log);
this.Stats.EventsReceived++;
@ -187,8 +194,8 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
break;
}
case "EVENT": {
this.OnEvent?.(msg[1], {
...msg[2],
this.OnEvent?.(msg[1] as string, {
...(msg[2] as NostrEvent),
relays: [this.Address],
});
this.Stats.EventsReceived++;
@ -196,17 +203,17 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
break;
}
case "EOSE": {
this.OnEose?.(msg[1]);
this.OnEose?.(msg[1] as string);
break;
}
case "OK": {
// feedback to broadcast call
this.#log(`${this.Address} OK: %O`, msg);
const id = msg[1];
if (this.EventsCallback.has(id)) {
const cb = unwrap(this.EventsCallback.get(id));
const id = msg[1] as string;
const cb = this.EventsCallback.get(id);
if (cb) {
this.EventsCallback.delete(id);
cb(msg);
cb(msg as Array<string | boolean>);
}
break;
}
@ -244,17 +251,40 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
* Send event on this connection and wait for OK response
*/
async SendAsync(e: NostrEvent, timeout = 5000) {
return new Promise<void>(resolve => {
return await new Promise<OkResponse>((resolve, reject) => {
if (!this.Settings.write) {
resolve();
reject(new Error("Not a write relay"));
return;
}
if (this.EventsCallback.has(e.id)) {
resolve({
ok: false,
id: e.id,
relay: this.Address,
message: "Duplicate request",
});
return;
}
const t = setTimeout(() => {
resolve();
this.EventsCallback.delete(e.id);
resolve({
ok: false,
id: e.id,
relay: this.Address,
message: "Timout waiting for OK response",
});
}, timeout);
this.EventsCallback.set(e.id, () => {
this.EventsCallback.set(e.id, msg => {
clearTimeout(t);
resolve();
const [_, id, accepted, message] = msg;
resolve({
ok: accepted as boolean,
id: id as string,
relay: this.Address,
message: message as string | undefined,
});
});
const req = ["EVENT", e];
@ -395,7 +425,7 @@ export class Connection extends ExternalStore<ConnectionStateSnapshot> {
resolve();
}, 10_000);
this.EventsCallback.set(authEvent.id, (msg: boolean[]) => {
this.EventsCallback.set(authEvent.id, msg => {
clearTimeout(t);
authCleanup();
if (msg.length > 3 && msg[2] === true) {

View File

@ -1,4 +1,4 @@
import { AuthHandler, RelaySettings, ConnectionStateSnapshot } from "./connection";
import { AuthHandler, RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection";
import { RequestBuilder } from "./request-builder";
import { NoteStore, NoteStoreSnapshotData } from "./note-collection";
import { Query } from "./query";
@ -87,15 +87,16 @@ export interface SystemInterface {
/**
* Send an event to all permanent connections
* @param ev Event to broadcast
* @param cb Callback to handle OkResponse as they arrive
*/
BroadcastEvent(ev: NostrEvent): void;
BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise<Array<OkResponse>>;
/**
* Connect to a specific relay and send an event and wait for the response
* @param relay Relay URL
* @param ev Event to send
*/
WriteOnceToRelay(relay: string, ev: NostrEvent): Promise<void>;
WriteOnceToRelay(relay: string, ev: NostrEvent): Promise<OkResponse>;
/**
* Profile cache/loader

View File

@ -1,8 +1,8 @@
import debug from "debug";
import { unwrap, sanitizeRelayUrl, ExternalStore, FeedCache } from "@snort/shared";
import { unwrap, sanitizeRelayUrl, ExternalStore, FeedCache, removeUndefined } from "@snort/shared";
import { NostrEvent, TaggedNostrEvent } from "./nostr";
import { AuthHandler, Connection, RelaySettings, ConnectionStateSnapshot } from "./connection";
import { AuthHandler, Connection, RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection";
import { Query } from "./query";
import { NoteCollection, NoteStore, NoteStoreSnapshotData } from "./note-collection";
import { BuiltRawReqFilter, RequestBuilder, RequestStrategy } from "./request-builder";
@ -354,35 +354,45 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
/**
* Send events to writable relays
*/
BroadcastEvent(ev: NostrEvent) {
for (const [, s] of this.#sockets) {
if (!s.Ephemeral) {
s.SendEvent(ev);
}
}
async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void) {
const socks = [...this.#sockets.values()].filter(a => !a.Ephemeral && a.Settings.write);
const oks = await Promise.all(
socks.map(async s => {
try {
const rsp = await s.SendAsync(ev);
cb?.(rsp);
return rsp;
} catch (e) {
console.error(e);
}
return;
}),
);
return removeUndefined(oks);
}
/**
* Write an event to a relay then disconnect
*/
async WriteOnceToRelay(address: string, ev: NostrEvent) {
async WriteOnceToRelay(address: string, ev: NostrEvent): Promise<OkResponse> {
const addrClean = sanitizeRelayUrl(address);
if (!addrClean) {
throw new Error("Invalid relay address");
}
if (this.#sockets.has(addrClean)) {
await this.#sockets.get(addrClean)?.SendAsync(ev);
const existing = this.#sockets.get(addrClean);
if (existing) {
return await existing.SendAsync(ev);
} else {
return await new Promise<void>((resolve, reject) => {
return await new Promise<OkResponse>((resolve, reject) => {
const c = new Connection(address, { write: true, read: true }, this.#handleAuth?.bind(this), true);
const t = setTimeout(reject, 5_000);
const t = setTimeout(reject, 10_000);
c.OnConnected = async () => {
clearTimeout(t);
await c.SendAsync(ev);
const rsp = await c.SendAsync(ev);
c.Close();
resolve();
resolve(rsp);
};
c.Connect();
});

View File

@ -1,82 +0,0 @@
import { ExternalStore } from "@snort/shared";
import { SystemSnapshot, SystemInterface, ProfileLoaderService } from ".";
import { AuthHandler, ConnectionStateSnapshot, RelaySettings } from "./connection";
import { NostrEvent, TaggedNostrEvent } from "./nostr";
import { NoteStore, NoteStoreSnapshotData } from "./note-collection";
import { Query } from "./query";
import { RequestBuilder } from "./request-builder";
import { RelayCache } from "./gossip-model";
import { QueryOptimizer } from "./query-optimizer";
export class SystemWorker extends ExternalStore<SystemSnapshot> implements SystemInterface {
#port: MessagePort;
constructor() {
super();
if ("SharedWorker" in window) {
const worker = new SharedWorker("/system.js");
this.#port = worker.port;
this.#port.onmessage = m => this.#onMessage(m);
} else {
throw new Error("SharedWorker is not supported");
}
}
Fetch(req: RequestBuilder, cb?: (evs: Array<TaggedNostrEvent>) => void): Promise<NoteStoreSnapshotData> {
throw new Error("Method not implemented.");
}
get ProfileLoader(): ProfileLoaderService {
throw new Error("Method not implemented.");
}
get RelayCache(): RelayCache {
throw new Error("Method not implemented.");
}
get QueryOptimizer(): QueryOptimizer {
throw new Error("Method not implemented.");
}
HandleAuth?: AuthHandler;
get Sockets(): ConnectionStateSnapshot[] {
throw new Error("Method not implemented.");
}
Query<T extends NoteStore>(type: new () => T, req: RequestBuilder | null): Query {
throw new Error("Method not implemented.");
}
CancelQuery(sub: string): void {
throw new Error("Method not implemented.");
}
GetQuery(sub: string): Query | undefined {
throw new Error("Method not implemented.");
}
ConnectToRelay(address: string, options: RelaySettings): Promise<void> {
throw new Error("Method not implemented.");
}
DisconnectRelay(address: string): void {
throw new Error("Method not implemented.");
}
BroadcastEvent(ev: NostrEvent): void {
throw new Error("Method not implemented.");
}
WriteOnceToRelay(relay: string, ev: NostrEvent): Promise<void> {
throw new Error("Method not implemented.");
}
takeSnapshot(): SystemSnapshot {
throw new Error("Method not implemented.");
}
#onMessage(e: MessageEvent<any>) {
console.debug(e);
}
}

View File

@ -1,4 +1,4 @@
import { unwrap } from "@snort/shared";
import { removeUndefined } from "@snort/shared";
import {
CashuRegex,
@ -230,8 +230,8 @@ export function transformText(body: string, tags: Array<Array<string>>) {
fragments = extractCashuTokens(fragments);
fragments = extractCustomEmoji(fragments, tags);
fragments = extractMarkdownCode(fragments);
fragments = fragments
.map(a => {
fragments = removeUndefined(
fragments.map(a => {
if (typeof a === "string") {
if (a.length > 0) {
return { type: "text", content: a } as ParsedFragment;
@ -239,8 +239,7 @@ export function transformText(body: string, tags: Array<Array<string>>) {
} else {
return a;
}
})
.filter(a => a)
.map(a => unwrap(a));
}),
);
return fragments as Array<ParsedFragment>;
}