implement nip20 ok at the connection level

This commit is contained in:
ennmichael 2023-03-02 22:36:31 +01:00
parent 459f3b98de
commit 3c78f740e0
No known key found for this signature in database
GPG Key ID: 6E6E183431A26AF7
4 changed files with 129 additions and 100 deletions

View File

@ -1,6 +1,6 @@
import { ProtocolError } from "../error" import { ProtocolError } from "../error"
import { Filters, SubscriptionId } from "." import { Filters, SubscriptionId } from "."
import { RawEvent, SignedEvent } from "../event" import { EventId, RawEvent, SignedEvent } from "../event"
import WebSocket from "ws" import WebSocket from "ws"
import { unixTimestamp } from "../util" import { unixTimestamp } from "../util"
@ -14,7 +14,6 @@ import { unixTimestamp } from "../util"
*/ */
export class Conn { export class Conn {
readonly #socket: WebSocket readonly #socket: WebSocket
/** /**
* Messages which were requested to be sent before the websocket was ready. * Messages which were requested to be sent before the websocket was ready.
* Once the websocket becomes ready, these messages will be sent and cleared. * Once the websocket becomes ready, these messages will be sent and cleared.
@ -23,15 +22,25 @@ export class Conn {
// before NIP-44 auth. The legacy code reuses the same array for these two but I think they should be // before NIP-44 auth. The legacy code reuses the same array for these two but I think they should be
// different, and the NIP-44 stuff should be handled by Nostr. // different, and the NIP-44 stuff should be handled by Nostr.
#pending: OutgoingMessage[] = [] #pending: OutgoingMessage[] = []
/**
#msgCallback?: IncomingMessageCallback * Callback for errors.
#errorCallback?: ErrorCallback */
readonly #onError: (err: unknown) => void
get url(): string { get url(): string {
return this.#socket.url return this.#socket.url
} }
constructor(endpoint: string | URL) { constructor({
endpoint,
onMessage,
onError,
}: {
endpoint: string | URL
onMessage: (msg: IncomingMessage) => void
onError: (err: unknown) => void
}) {
this.#onError = onError
this.#socket = new WebSocket(endpoint) this.#socket = new WebSocket(endpoint)
// Handle incoming messages. // Handle incoming messages.
@ -40,14 +49,14 @@ export class Conn {
// Validate and parse the message. // Validate and parse the message.
if (typeof value !== "string") { if (typeof value !== "string") {
const err = new ProtocolError(`invalid message data: ${value}`) const err = new ProtocolError(`invalid message data: ${value}`)
this.#errorCallback?.(err) onError(err)
return return
} }
try { try {
const msg = await parseIncomingMessage(value) const msg = await Conn.#parseIncomingMessage(value)
this.#msgCallback?.(msg) onMessage(msg)
} catch (err) { } catch (err) {
this.#errorCallback?.(err) onError(err)
} }
}) })
@ -60,22 +69,10 @@ export class Conn {
}) })
this.#socket.addEventListener("error", (err) => { this.#socket.addEventListener("error", (err) => {
this.#errorCallback?.(err) onError(err)
}) })
} }
on(on: "message", cb: IncomingMessageCallback): void
on(on: "error", cb: ErrorCallback): void
on(on: "message" | "error", cb: IncomingMessageCallback | ErrorCallback) {
if (on === "message") {
this.#msgCallback = cb as IncomingMessageCallback
} else if (on === "error") {
this.#errorCallback = cb as ErrorCallback
} else {
throw new Error(`unexpected input: ${on}`)
}
}
send(msg: OutgoingMessage): void { send(msg: OutgoingMessage): void {
if (this.#socket.readyState < WebSocket.OPEN) { if (this.#socket.readyState < WebSocket.OPEN) {
this.#pending.push(msg) this.#pending.push(msg)
@ -84,11 +81,11 @@ export class Conn {
try { try {
this.#socket.send(serializeOutgoingMessage(msg), (err) => { this.#socket.send(serializeOutgoingMessage(msg), (err) => {
if (err !== undefined && err !== null) { if (err !== undefined && err !== null) {
this.#errorCallback?.(err) this.#onError?.(err)
} }
}) })
} catch (err) { } catch (err) {
this.#errorCallback?.(err) this.#onError?.(err)
} }
} }
@ -96,86 +93,11 @@ export class Conn {
try { try {
this.#socket.close() this.#socket.close()
} catch (err) { } catch (err) {
this.#errorCallback?.(err) this.#onError?.(err)
}
} }
} }
/** static async #parseIncomingMessage(data: string): Promise<IncomingMessage> {
* A message sent from a relay to the client.
*/
export type IncomingMessage = IncomingEvent | IncomingNotice
export type IncomingKind = "event" | "notice"
/**
* Incoming "EVENT" message.
*/
export interface IncomingEvent {
kind: "event"
subscriptionId: SubscriptionId
signed: SignedEvent
raw: RawEvent
}
/**
* Incoming "NOTICE" message.
*/
export interface IncomingNotice {
kind: "notice"
notice: string
}
/**
* A message sent from the client to a relay.
*/
export type OutgoingMessage =
| OutgoingEvent
| OutgoingOpenSubscription
| OutgoingCloseSubscription
export type OutgoingKind = "event" | "openSubscription" | "closeSubscription"
/**
* Outgoing "EVENT" message.
*/
export interface OutgoingEvent {
kind: "event"
event: SignedEvent | RawEvent
}
/**
* Outgoing "REQ" message, which opens a subscription.
*/
export interface OutgoingOpenSubscription {
kind: "openSubscription"
id: SubscriptionId
filters: Filters[]
}
/**
* Outgoing "CLOSE" message, which closes a subscription.
*/
export interface OutgoingCloseSubscription {
kind: "closeSubscription"
id: SubscriptionId
}
type IncomingMessageCallback = (message: IncomingMessage) => unknown
type ErrorCallback = (error: unknown) => unknown
interface RawFilters {
ids?: string[]
authors?: string[]
kinds?: number[]
["#e"]?: string[]
["#p"]?: string[]
since?: number
until?: number
limit?: number
}
async function parseIncomingMessage(data: string): Promise<IncomingMessage> {
const json = parseJson(data) const json = parseJson(data)
if (!(json instanceof Array)) { if (!(json instanceof Array)) {
throw new ProtocolError(`incoming message is not an array: ${data}`) throw new ProtocolError(`incoming message is not an array: ${data}`)
@ -213,8 +135,113 @@ async function parseIncomingMessage(data: string): Promise<IncomingMessage> {
notice: json[1], notice: json[1],
} }
} }
if (json[0] === "OK") {
if (typeof json[1] !== "string") {
throw new ProtocolError(
`second element of "OK" should be a string, but wasn't: ${data}`
)
}
if (typeof json[2] !== "boolean") {
throw new ProtocolError(
`third element of "OK" should be a boolean, but wasn't: ${data}`
)
}
if (typeof json[3] !== "string") {
throw new ProtocolError(
`fourth element of "OK" should be a string, but wasn't: ${data}`
)
}
return {
kind: "ok",
eventId: new EventId(json[1]),
ok: json[2],
message: json[3],
}
}
throw new ProtocolError(`unknown incoming message: ${data}`) throw new ProtocolError(`unknown incoming message: ${data}`)
} }
}
/**
* A message sent from a relay to the client.
*/
export type IncomingMessage = IncomingEvent | IncomingNotice | IncomingOk
export type IncomingKind = "event" | "notice" | "ok"
/**
* Incoming "EVENT" message.
*/
export interface IncomingEvent {
kind: "event"
subscriptionId: SubscriptionId
signed: SignedEvent
raw: RawEvent
}
/**
* Incoming "NOTICE" message.
*/
export interface IncomingNotice {
kind: "notice"
notice: string
}
/**
* Incoming "OK" message.
*/
export interface IncomingOk {
kind: "ok"
eventId: EventId
ok: boolean
message: string
}
/**
* A message sent from the client to a relay.
*/
export type OutgoingMessage =
| OutgoingEvent
| OutgoingOpenSubscription
| OutgoingCloseSubscription
export type OutgoingKind = "event" | "openSubscription" | "closeSubscription"
/**
* Outgoing "EVENT" message.
*/
export interface OutgoingEvent {
kind: "event"
event: SignedEvent | RawEvent
}
/**
* Outgoing "REQ" message, which opens a subscription.
*/
export interface OutgoingOpenSubscription {
kind: "openSubscription"
id: SubscriptionId
filters: Filters[]
}
/**
* Outgoing "CLOSE" message, which closes a subscription.
*/
export interface OutgoingCloseSubscription {
kind: "closeSubscription"
id: SubscriptionId
}
interface RawFilters {
ids?: string[]
authors?: string[]
kinds?: number[]
["#e"]?: string[]
["#p"]?: string[]
since?: number
until?: number
limit?: number
}
function serializeOutgoingMessage(msg: OutgoingMessage): string { function serializeOutgoingMessage(msg: OutgoingMessage): string {
if (msg.kind === "event") { if (msg.kind === "event") {

View File

@ -47,10 +47,10 @@ export class Nostr extends EventEmitter {
} }
// If there is no existing connection, open a new one. // If there is no existing connection, open a new one.
const conn = new Conn(url) const conn = new Conn({
endpoint: url,
// Handle messages on this connection. // Handle messages on this connection.
conn.on("message", async (msg) => { onMessage: (msg) => {
try { try {
if (msg.kind === "event") { if (msg.kind === "event") {
this.emit( this.emit(
@ -70,11 +70,9 @@ export class Nostr extends EventEmitter {
} catch (err) { } catch (err) {
this.emit("error", err, this) this.emit("error", err, this)
} }
}) },
// Forward errors on this connection.
// Forward connection errors to the error callbacks. onError: (err) => this.emit("error", err, this),
conn.on("error", (err) => {
this.emit("error", err, this)
}) })
// Resend existing subscriptions to this connection. // Resend existing subscriptions to this connection.

View File

@ -87,7 +87,8 @@ export class EventId {
} }
} }
private constructor(hex: string) { constructor(hex: string) {
// TODO Validate that this is 32-byte hex
this.#hex = hex this.#hex = hex
} }

View File

@ -34,6 +34,9 @@ describe("single event communication", function () {
// for future events. // for future events.
subscriber.off("event", listener) subscriber.off("event", listener)
subscriber.on("error", done)
publisher.on("error", done)
publisher.close() publisher.close()
subscriber.close() subscriber.close()