diff --git a/packages/nostr/package.json b/packages/nostr/package.json index caf7b6e..bc8b9d3 100644 --- a/packages/nostr/package.json +++ b/packages/nostr/package.json @@ -12,5 +12,9 @@ }, "prettier": { "semi": false + }, + "dependencies": { + "isomorphic-ws": "^5.0.0", + "ws": "^8.12.1" } } diff --git a/packages/nostr/src/client/conn.ts b/packages/nostr/src/client/conn.ts new file mode 100644 index 0000000..a18138c --- /dev/null +++ b/packages/nostr/src/client/conn.ts @@ -0,0 +1,161 @@ +import { ProtocolError } from "../error" +import { Filters, SubscriptionId } from "." +import { formatOutgoingMessage, parseIncomingMessage, RawEvent } from "../raw" +import { Event } from "../event" + +/** + * The connection to a relay. This is the lowest layer of the nostr protocol. + * The only responsibility of this type is to send and receive + * well-formatted nostr messages on the underlying websocket. All other details of the protocol + * are handled by `Nostr`. + * + * @see Nostr + */ +export class Conn { + readonly #socket: WebSocket + + /** + * Messages which were requested to be sent before the websocket was ready. + * Once the websocket becomes ready, these messages will be sent and cleared. + */ + // TODO Another reason why pending messages might be required is when the user tries to send a message + // before NIP-44 auth. The legacy code reuses the same array for these two but I think they should be + // different, and the NIP-44 stuff should be handled by Nostr. + #pending: OutgoingMessage[] = [] + + readonly #msgCallbacks: IncomingMessageCallback[] = [] + readonly #errorCallbacks: ConnErrorCallback[] = [] + + get url(): string { + return this.#socket.url + } + + constructor(endpoint: string | URL) { + this.#socket = new WebSocket(endpoint) + + // Handle incoming messages. + this.#socket.addEventListener("message", (msgData) => { + const value = msgData.data.valueOf() + // Validate and parse the message. + if (typeof value !== "string") { + const err = new ProtocolError(`invalid message data: ${value}`) + for (const cb of this.#errorCallbacks) { + cb(err) + } + return + } + try { + const msg = parseIncomingMessage(value) + for (const cb of this.#msgCallbacks) { + cb(msg) + } + } catch (err) { + if (err instanceof ProtocolError) { + for (const cb of this.#errorCallbacks) { + cb(err) + } + } else { + throw err + } + } + }) + + // When the connection is ready, send any outstanding messages. + this.#socket.addEventListener("open", () => { + for (const msg of this.#pending) { + this.send(msg) + } + this.#pending = [] + }) + } + + onMessage(cb: IncomingMessageCallback): void { + this.#msgCallbacks.push(cb) + } + + onError(cb: ConnErrorCallback): void { + this.#errorCallbacks.push(cb) + } + + send(msg: OutgoingMessage): void { + if (this.#socket.readyState < WebSocket.OPEN) { + this.#pending.push(msg) + return + } + this.#socket.send(formatOutgoingMessage(msg)) + } + + close(): void { + this.#socket.close() + } +} + +/** + * A message sent from a relay to the client. + */ +export type IncomingMessage = IncomingEvent | IncomingNotice + +export const enum IncomingKind { + Event, + Notice, +} + +/** + * Incoming "EVENT" message. + */ +export interface IncomingEvent { + kind: IncomingKind.Event + subscriptionId: SubscriptionId + event: Event + raw: RawEvent +} + +/** + * Incoming "NOTICE" message. + */ +export interface IncomingNotice { + kind: IncomingKind.Notice + notice: string +} + +/** + * A message sent from the client to a relay. + */ +export type OutgoingMessage = + | OutgoingEvent + | OutgoingSubscription + | OutgoingUnsubscription + +export const enum OutgoingKind { + Event, + Subscription, + Unsubscription, +} + +/** + * Outgoing "EVENT" message. + */ +export interface OutgoingEvent { + kind: OutgoingKind.Event + event: Event +} + +/** + * Outgoing "REQ" message, representing a subscription. + */ +export interface OutgoingSubscription { + kind: OutgoingKind.Subscription + id: SubscriptionId + filters: Filters[] +} + +/** + * Outgoing "CLOSE" message, representing an unsubscription. + */ +export interface OutgoingUnsubscription { + kind: OutgoingKind.Unsubscription + id: SubscriptionId +} + +type IncomingMessageCallback = (message: IncomingMessage) => unknown +type ConnErrorCallback = (error: ProtocolError) => unknown diff --git a/packages/nostr/src/client/index.ts b/packages/nostr/src/client/index.ts new file mode 100644 index 0000000..5ac5b31 --- /dev/null +++ b/packages/nostr/src/client/index.ts @@ -0,0 +1,297 @@ +import { ProtocolError } from "../error" +import { + EventId, + Event, + serializeId as serializeEventId, + EventKind, +} from "../event" +import { PublicKey } from "../keypair" +import { Conn, IncomingKind, OutgoingKind } from "./conn" +import * as secp from "@noble/secp256k1" +import { RawEvent } from "../raw" + +/** + * A nostr client. + */ +export class Nostr { + // TODO NIP-44 AUTH, leave this for later + /** + * Open connections to relays. + */ + readonly #conns: Map< + string, + { + conn: Conn + /** + * Has this connection been authenticated via NIP-44 AUTH? + */ + auth: boolean + /** + * Should this connection be used for receiving messages? + */ + read: boolean + /** + * Should this connection be used for publishing events? + */ + write: boolean + } + > + + /** + * Mapping of subscription IDs to corresponding filters. + */ + readonly #subscriptions: Map = new Map() + + readonly #eventCallbacks: EventCallback[] = [] + readonly #noticeCallbacks: NoticeCallback[] = [] + readonly #errorCallbacks: ErrorCallback[] = [] + + /** + * Add a new callback for received events. + */ + onEvent(cb: EventCallback): void { + this.#eventCallbacks.push(cb) + } + + /** + * Add a new callback for received notices. + */ + onNotice(cb: NoticeCallback): void { + this.#noticeCallbacks.push(cb) + } + + /** + * Add a new callback for errors. + */ + onError(cb: ErrorCallback): void { + this.#errorCallbacks.push(cb) + } + + /** + * Connect and start communicating with a relay. This method recreates all existing + * subscriptions on the new relay as well. If there is already an existing connection, + * this method will only update it with the new options, and an exception will be thrown + * if no options are specified. + */ + connect(url: URL | string, opts?: { read?: boolean; write?: boolean }): void { + // If the connection already exists, update the options. + const existingConn = this.#conns.get(url.toString()) + if (existingConn !== undefined) { + if (opts === undefined) { + throw new Error( + `called connect with existing connection ${url}, but options were not specified` + ) + } + if (opts.read !== undefined) { + existingConn.read = opts.read + } + if (opts.write !== undefined) { + existingConn.write = opts.write + } + return + } + + // If there is no existing connection, open a new one. + const conn = new Conn(url) + + // Handle messages on this connection. + conn.onMessage(async (msg) => { + if (msg.kind === IncomingKind.Event) { + for (const cb of this.#eventCallbacks) { + cb( + { + event: msg.event, + eventId: await serializeEventId(msg.raw), + subscriptionId: msg.subscriptionId, + raw: msg.raw, + }, + this + ) + } + } else if (msg.kind === IncomingKind.Notice) { + for (const cb of this.#noticeCallbacks) { + cb(msg.notice, this) + } + } else { + const err = new ProtocolError(`invalid message ${msg}`) + for (const cb of this.#errorCallbacks) { + cb(err, this) + } + } + }) + + // Forward connection errors to the error callbacks. + conn.onError((err) => { + for (const cb of this.#errorCallbacks) { + cb(err, this) + } + }) + + // Resend existing subscriptions to this connection. + for (const [key, filters] of this.#subscriptions.entries()) { + const subscriptionId = new SubscriptionId(key) + conn.send({ + kind: OutgoingKind.Subscription, + id: subscriptionId, + filters, + }) + } + + this.#conns.set(url.toString(), { + conn, + auth: false, + read: opts?.read ?? true, + write: opts?.write ?? true, + }) + } + + /** + * Disconnect from a relay. If there is no open connection to this relay, an exception is thrown. + * + * TODO There needs to be a way to check connection state. isOpen(), isReady(), isClosing() maybe? + * Because of how WebSocket states work this isn't as simple as it seems. + */ + disconnect(url: URL | string): void { + const c = this.#conns.get(url.toString()) + if (c === undefined) { + throw new Error(`connection to ${url} doesn't exist`) + } + this.#conns.delete(url.toString()) + c.conn.close() + } + + /** + * Check if a subscription exists. + */ + subscribed(subscriptionId: SubscriptionId): boolean { + return this.#subscriptions.has(subscriptionId.toString()) + } + + /** + * Create a new subscription. If the subscription already exists, it will be overwritten (as per NIP-01). + * + * @param filters The filters to apply to this message. If any filter passes, the message is let through. + * @param subscriptionId An optional subscription ID, otherwise a random subscription ID will be used. + * @returns The subscription ID. + */ + subscribe( + filters: Filters[], + subscriptionId?: SubscriptionId + ): SubscriptionId { + subscriptionId ??= SubscriptionId.random() + this.#subscriptions.set(subscriptionId.toString(), filters) + for (const { conn, read } of this.#conns.values()) { + if (!read) { + continue + } + conn.send({ + kind: OutgoingKind.Subscription, + id: subscriptionId, + filters, + }) + } + return subscriptionId + } + + /** + * Remove a subscription. If the subscription does not exist, an exception is thrown. + * + * TODO Reference subscribed() + */ + async unsubscribe(subscriptionId: SubscriptionId): Promise { + if (!this.#subscriptions.delete(subscriptionId.toString())) { + throw new Error(`subscription ${subscriptionId} does not exist`) + } + for (const { conn, read } of this.#conns.values()) { + if (!read) { + continue + } + conn.send({ + kind: OutgoingKind.Unsubscription, + id: subscriptionId, + }) + } + } + + /** + * Publish an event. + */ + async publish(event: Event): Promise { + for (const { conn, write } of this.#conns.values()) { + if (!write) { + continue + } + conn.send({ + kind: OutgoingKind.Event, + event, + }) + } + } +} + +/** + * A string uniquely identifying a client subscription. + */ +export class SubscriptionId { + #id: string + + constructor(subscriptionId: string) { + this.#id = subscriptionId + } + + static random(): SubscriptionId { + return new SubscriptionId(secp.utils.randomBytes(32).toString()) + } + + toString() { + return this.#id + } +} + +/** + * A prefix filter. These filters match events which have the appropriate prefix. + * This also means that exact matches pass the filters. No special syntax is required. + */ +export class Prefix { + #prefix: T + + constructor(prefix: T) { + this.#prefix = prefix + } + + toString(): string { + return this.#prefix.toString() + } +} + +/** + * Subscription filters. All filters from the fields must pass for a message to get through. + */ +export interface Filters { + // TODO Document the filters, document that for the arrays only one is enough for the message to pass + ids?: Prefix[] + authors?: Prefix[] + kinds?: EventKind[] + /** + * Filters for the "#e" tags. + */ + eventTags?: EventId[] + /** + * Filters for the "#p" tags. + */ + pubkeyTags?: PublicKey[] + since?: Date + until?: Date + limit?: number +} + +export type EventCallback = (params: EventParams, nostr: Nostr) => unknown +export type NoticeCallback = (notice: string, nostr: Nostr) => unknown +export type ErrorCallback = (error: ProtocolError, nostr: Nostr) => unknown + +export interface EventParams { + event: Event + eventId: EventId + subscriptionId: SubscriptionId + raw: RawEvent +} diff --git a/packages/nostr/src/event.ts b/packages/nostr/src/event.ts index 9636844..c41178c 100644 --- a/packages/nostr/src/event.ts +++ b/packages/nostr/src/event.ts @@ -137,7 +137,7 @@ async function checkSignature( } } -async function serializeId(raw: RawEvent): Promise { +export async function serializeId(raw: RawEvent): Promise { // It's not defined whether JSON.stringify produces a string with whitespace stripped. // Building the JSON string manually this way ensures that there's no whitespace. // In hindsight using JSON as a data format for hashing and signing is not the best diff --git a/packages/nostr/src/nostr.ts b/packages/nostr/src/nostr.ts deleted file mode 100644 index 0a9d814..0000000 --- a/packages/nostr/src/nostr.ts +++ /dev/null @@ -1,134 +0,0 @@ -import { ProtocolError } from "./error" -import { EventId, Event } from "./event" -import { RawEvent } from "./raw" - -/** - * A nostr client. - */ -export class Nostr { - /** - * Open connections to relays. - */ - #conns: Conn[] = [] - /** - * Is this client closed? - */ - #closed: boolean = false - /** - * Mapping of subscription IDs to corresponding filters. - */ - #subscriptions: Map = new Map() - - #eventCallbacks: EventCallback[] = [] - #noticeCallbacks: NoticeCallback[] = [] - #errorCallbacks: ErrorCallback[] = [] - - /** - * Add a new callback for received events. - */ - onEvent(cb: EventCallback): void { - this.#eventCallbacks.push(cb) - } - - /** - * Add a new callback for received notices. - */ - onNotice(cb: NoticeCallback): void { - this.#noticeCallbacks.push(cb) - } - - /** - * Add a new callback for errors. - */ - onError(cb: ErrorCallback): void { - this.#errorCallbacks.push(cb) - } - - /** - * Connect and start communicating with a relay. This method recreates all existing - * subscriptions on the new relay as well. - */ - async connect(relay: URL | string): Promise { - this.#checkClosed() - throw new Error("todo try to connect and send subscriptions") - } - - /** - * Create a new subscription. - * - * @param subscriptionId An optional subscription ID, otherwise a random subscription ID will be used. - * @returns The subscription ID. - */ - async subscribe( - filters: Filters, - subscriptionId?: SubscriptionId | string - ): Promise { - this.#checkClosed() - throw new Error("todo subscribe to the relays and add the subscription") - } - - /** - * Remove a subscription. - */ - async unsubscribe(subscriptionId: SubscriptionId): Promise { - this.#checkClosed() - throw new Error( - "todo unsubscribe from the relays and remove the subscription" - ) - } - - /** - * Publish an event. - */ - async publish(event: Event): Promise { - this.#checkClosed() - throw new Error("todo") - } - - /** - * Close connections to all relays. This method can only be called once. After the - * connections have been closed, no other methods can be called. - */ - async close(): Promise {} - - #checkClosed() { - if (this.#closed) { - throw new Error("the client has been closed") - } - } -} - -/** - * A string uniquely identifying a client subscription. - */ -export class SubscriptionId { - #id: string - - constructor(subscriptionId: string) { - this.#id = subscriptionId - } - - toString() { - return this.#id - } -} - -/** - * Subscription filters. - */ -export interface Filters {} - -export type EventCallback = (params: EventParams, nostr: Nostr) => unknown -export type NoticeCallback = (notice: string, nostr: Nostr) => unknown -export type ErrorCallback = (error: ProtocolError, nostr: Nostr) => unknown - -export interface EventParams { - event: Event - id: EventId - raw: RawEvent -} - -/** - * The connection to a relay. - */ -class Conn {} diff --git a/packages/nostr/src/raw.ts b/packages/nostr/src/raw.ts index 66df762..d6c8a7b 100644 --- a/packages/nostr/src/raw.ts +++ b/packages/nostr/src/raw.ts @@ -1,8 +1,10 @@ -import { ProtocolError } from "./error" - /** - * Raw event to be transferred over the wire. + * Types defining data in the format sent over the wire. */ + +import { ProtocolError } from "./error" +import { IncomingMessage, OutgoingMessage } from "./client/conn" + export interface RawEvent { id: string pubkey: string @@ -13,7 +15,33 @@ export interface RawEvent { sig: string } -export function parseRawEvent(data: string): RawEvent { +interface RawFilters { + ids: string[] + authors: string[] + kinds: number[] + ["#e"]: string[] + ["#p"]: string[] + since: number + until: number + limit: number +} + +type RawIncomingMessage = ["EVENT", string, RawEvent] | ["NOTICE", string] + +type RawOutgoingMessage = + | ["EVENT", RawEvent] + | ["REQ", string, RawFilters] + | ["CLOSE", string] + +export function parseIncomingMessage(msg: string): IncomingMessage { + throw new Error("todo") +} + +export function formatOutgoingMessage(msg: OutgoingMessage): string { + throw new Error("todo") +} + +function parseRawEvent(data: string): RawEvent { const json = parseJson(data) if ( typeof json["id"] !== "string" || diff --git a/yarn.lock b/yarn.lock index ba19f06..54ac31b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5698,6 +5698,11 @@ isexe@^2.0.0: resolved "https://registry.yarnpkg.com/isexe/-/isexe-2.0.0.tgz#e8fbf374dc556ff8947a10dcb0572d633f2cfa10" integrity sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw== +isomorphic-ws@^5.0.0: + version "5.0.0" + resolved "https://registry.yarnpkg.com/isomorphic-ws/-/isomorphic-ws-5.0.0.tgz#e5529148912ecb9b451b46ed44d53dae1ce04bbf" + integrity sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw== + istanbul-lib-coverage@^3.0.0, istanbul-lib-coverage@^3.2.0: version "3.2.0" resolved "https://registry.yarnpkg.com/istanbul-lib-coverage/-/istanbul-lib-coverage-3.2.0.tgz#189e7909d0a39fa5a3dfad5b03f71947770191d3" @@ -10400,6 +10405,11 @@ ws@^7.4.6: resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.9.tgz#54fa7db29f4c7cec68b1ddd3a89de099942bb591" integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q== +ws@^8.12.1: + version "8.12.1" + resolved "https://registry.yarnpkg.com/ws/-/ws-8.12.1.tgz#c51e583d79140b5e42e39be48c934131942d4a8f" + integrity sha512-1qo+M9Ba+xNhPB+YTWUlK6M17brTut5EXbcBaMRN5pH5dFrXz7lzz1ChFSUq3bOUl8yEvSenhHmYUNJxFzdJew== + ws@^8.4.2: version "8.12.0" resolved "https://registry.yarnpkg.com/ws/-/ws-8.12.0.tgz#485074cc392689da78e1828a9ff23585e06cddd8"