nostr package part 3
This commit is contained in:
@ -1,7 +1,7 @@
|
||||
import { ProtocolError } from "../error"
|
||||
import { Filters, SubscriptionId } from "."
|
||||
import { formatOutgoingMessage, parseIncomingMessage, RawEvent } from "../raw"
|
||||
import { Event } from "../event"
|
||||
import { RawEvent, SignedEvent } from "../event"
|
||||
import WebSocket from "ws"
|
||||
|
||||
/**
|
||||
* The connection to a relay. This is the lowest layer of the nostr protocol.
|
||||
@ -23,8 +23,8 @@ export class Conn {
|
||||
// different, and the NIP-44 stuff should be handled by Nostr.
|
||||
#pending: OutgoingMessage[] = []
|
||||
|
||||
readonly #msgCallbacks: IncomingMessageCallback[] = []
|
||||
readonly #errorCallbacks: ConnErrorCallback[] = []
|
||||
#msgCallback?: IncomingMessageCallback
|
||||
#errorCallback?: ErrorCallback
|
||||
|
||||
get url(): string {
|
||||
return this.#socket.url
|
||||
@ -34,27 +34,22 @@ export class Conn {
|
||||
this.#socket = new WebSocket(endpoint)
|
||||
|
||||
// Handle incoming messages.
|
||||
this.#socket.addEventListener("message", (msgData) => {
|
||||
this.#socket.addEventListener("message", async (msgData) => {
|
||||
const value = msgData.data.valueOf()
|
||||
// Validate and parse the message.
|
||||
if (typeof value !== "string") {
|
||||
const err = new ProtocolError(`invalid message data: ${value}`)
|
||||
for (const cb of this.#errorCallbacks) {
|
||||
cb(err)
|
||||
}
|
||||
this.#errorCallback?.(err)
|
||||
return
|
||||
}
|
||||
try {
|
||||
const msg = parseIncomingMessage(value)
|
||||
for (const cb of this.#msgCallbacks) {
|
||||
cb(msg)
|
||||
}
|
||||
const msg = await parseIncomingMessage(value)
|
||||
this.#msgCallback?.(msg)
|
||||
} catch (err) {
|
||||
if (err instanceof ProtocolError) {
|
||||
for (const cb of this.#errorCallbacks) {
|
||||
cb(err)
|
||||
}
|
||||
this.#errorCallback?.(err)
|
||||
} else {
|
||||
// TODO Not sure if this is the case?
|
||||
throw err
|
||||
}
|
||||
}
|
||||
@ -69,12 +64,16 @@ export class Conn {
|
||||
})
|
||||
}
|
||||
|
||||
onMessage(cb: IncomingMessageCallback): void {
|
||||
this.#msgCallbacks.push(cb)
|
||||
}
|
||||
|
||||
onError(cb: ConnErrorCallback): void {
|
||||
this.#errorCallbacks.push(cb)
|
||||
on(on: "message", cb: IncomingMessageCallback): void
|
||||
on(on: "error", cb: ErrorCallback): void
|
||||
on(on: "message" | "error", cb: IncomingMessageCallback | ErrorCallback) {
|
||||
if (on === "message") {
|
||||
this.#msgCallback = cb as IncomingMessageCallback
|
||||
} else if (on === "error") {
|
||||
this.#errorCallback = cb as ErrorCallback
|
||||
} else {
|
||||
throw new Error(`unexpected input: ${on}`)
|
||||
}
|
||||
}
|
||||
|
||||
send(msg: OutgoingMessage): void {
|
||||
@ -82,7 +81,7 @@ export class Conn {
|
||||
this.#pending.push(msg)
|
||||
return
|
||||
}
|
||||
this.#socket.send(formatOutgoingMessage(msg))
|
||||
this.#socket.send(serializeOutgoingMessage(msg))
|
||||
}
|
||||
|
||||
close(): void {
|
||||
@ -106,7 +105,7 @@ export const enum IncomingKind {
|
||||
export interface IncomingEvent {
|
||||
kind: IncomingKind.Event
|
||||
subscriptionId: SubscriptionId
|
||||
event: Event
|
||||
signed: SignedEvent
|
||||
raw: RawEvent
|
||||
}
|
||||
|
||||
@ -137,7 +136,7 @@ export const enum OutgoingKind {
|
||||
*/
|
||||
export interface OutgoingEvent {
|
||||
kind: OutgoingKind.Event
|
||||
event: Event
|
||||
signed: SignedEvent
|
||||
}
|
||||
|
||||
/**
|
||||
@ -146,7 +145,7 @@ export interface OutgoingEvent {
|
||||
export interface OutgoingSubscription {
|
||||
kind: OutgoingKind.Subscription
|
||||
id: SubscriptionId
|
||||
filters: Filters[]
|
||||
filters: Filters
|
||||
}
|
||||
|
||||
/**
|
||||
@ -158,4 +157,123 @@ export interface OutgoingUnsubscription {
|
||||
}
|
||||
|
||||
type IncomingMessageCallback = (message: IncomingMessage) => unknown
|
||||
type ConnErrorCallback = (error: ProtocolError) => unknown
|
||||
type ErrorCallback = (error: ProtocolError) => unknown
|
||||
|
||||
interface RawFilters {
|
||||
ids?: string[]
|
||||
authors?: string[]
|
||||
kinds?: number[]
|
||||
["#e"]?: string[]
|
||||
["#p"]?: string[]
|
||||
since?: number
|
||||
until?: number
|
||||
limit?: number
|
||||
}
|
||||
|
||||
async function parseIncomingMessage(data: string): Promise<IncomingMessage> {
|
||||
const json = parseJson(data)
|
||||
if (!(json instanceof Array)) {
|
||||
throw new ProtocolError(`incoming message is not an array: ${data}`)
|
||||
}
|
||||
if (json.length === 3) {
|
||||
if (json[0] !== "EVENT") {
|
||||
throw new ProtocolError(`expected "EVENT" message, but got: ${data}`)
|
||||
}
|
||||
if (typeof json[1] !== "string") {
|
||||
throw new ProtocolError(
|
||||
`second element of "EVENT" should be a string, but wasn't: ${data}`
|
||||
)
|
||||
}
|
||||
if (typeof json[2] !== "object") {
|
||||
throw new ProtocolError(
|
||||
`second element of "EVENT" should be an object, but wasn't: ${data}`
|
||||
)
|
||||
}
|
||||
const raw = parseEventData(json[2])
|
||||
return {
|
||||
kind: IncomingKind.Event,
|
||||
subscriptionId: new SubscriptionId(json[1]),
|
||||
signed: await SignedEvent.verify(raw),
|
||||
raw,
|
||||
}
|
||||
} else if (json.length === 2) {
|
||||
if (json[0] !== "NOTICE") {
|
||||
throw new ProtocolError(`expected "NOTICE" message, but got: ${data}`)
|
||||
}
|
||||
if (typeof json[1] !== "string") {
|
||||
throw new ProtocolError(
|
||||
`second element of "NOTICE" should be a string, but wasn't: ${data}`
|
||||
)
|
||||
}
|
||||
return {
|
||||
kind: IncomingKind.Notice,
|
||||
notice: json[1],
|
||||
}
|
||||
} else {
|
||||
throw new ProtocolError(
|
||||
`incoming message has unexpected number of elements: ${data}`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
function serializeOutgoingMessage(msg: OutgoingMessage): string {
|
||||
if (msg.kind === OutgoingKind.Event) {
|
||||
return JSON.stringify(["EVENT", msg.signed.serialize()])
|
||||
} else if (msg.kind === OutgoingKind.Subscription) {
|
||||
return JSON.stringify([
|
||||
"REQ",
|
||||
msg.id.toString(),
|
||||
serializeFilters(msg.filters),
|
||||
])
|
||||
} else if (msg.kind === OutgoingKind.Unsubscription) {
|
||||
return JSON.stringify(["CLOSE", msg.id.toString()])
|
||||
} else {
|
||||
throw new Error(`invalid message: ${JSON.stringify(msg)}`)
|
||||
}
|
||||
}
|
||||
|
||||
function serializeFilters(filters: Filters): RawFilters {
|
||||
return {
|
||||
ids: filters.ids?.map((id) => id.toString()),
|
||||
authors: filters.authors?.map((author) => author.toString()),
|
||||
kinds: filters.kinds?.map((kind) => kind),
|
||||
["#e"]: filters.eventTags?.map((e) => e.toString()),
|
||||
["#p"]: filters.pubkeyTags?.map((p) => p.toString()),
|
||||
// TODO The Math.floor has been repeated too many times at this point, have a unix timestamp function in event.ts
|
||||
since:
|
||||
filters.since !== undefined
|
||||
? Math.floor(filters.since.getTime() / 1000)
|
||||
: undefined,
|
||||
until:
|
||||
filters.until !== undefined
|
||||
? Math.floor(filters.until.getTime() / 1000)
|
||||
: undefined,
|
||||
limit: filters.limit,
|
||||
}
|
||||
}
|
||||
|
||||
function parseEventData(json: object): RawEvent {
|
||||
if (
|
||||
typeof json["id"] !== "string" ||
|
||||
typeof json["pubkey"] !== "string" ||
|
||||
typeof json["created_at"] !== "number" ||
|
||||
typeof json["kind"] !== "number" ||
|
||||
!(json["tags"] instanceof Array) ||
|
||||
!json["tags"].every(
|
||||
(x) => x instanceof Array && x.every((y) => typeof y === "string")
|
||||
) ||
|
||||
typeof json["content"] !== "string" ||
|
||||
typeof json["sig"] !== "string"
|
||||
) {
|
||||
throw new ProtocolError(`invalid event: ${JSON.stringify(json)}`)
|
||||
}
|
||||
return json as RawEvent
|
||||
}
|
||||
|
||||
function parseJson(data: string) {
|
||||
try {
|
||||
return JSON.parse(data)
|
||||
} catch (e) {
|
||||
throw new ProtocolError(`invalid event json: ${data}`)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user