From 867b66531bde0ed598fa999ddb45fdcb12c4885b Mon Sep 17 00:00:00 2001 From: SondreB Date: Fri, 3 Feb 2023 16:32:14 +0100 Subject: [PATCH] Fix issue with .close on relay never returning --- package-lock.json | 14 +- package.json | 5 +- src/app/queue/queue.ts | 3 +- src/app/services/interfaces.ts | 1 + src/app/services/relay.ts | 100 +++---- src/app/settings/settings.ts | 8 - src/app/shared/relay/relay.ts | 1 + src/app/types/relay.ts | 1 + src/app/workers/relay.ts | 448 +++++++++++++++++++++++++--- src/app/workers/relay.worker.ts | 508 +------------------------------- 10 files changed, 470 insertions(+), 619 deletions(-) diff --git a/package-lock.json b/package-lock.json index 71bd86a..e6c7df6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -33,7 +33,7 @@ "moment": "^2.29.4", "ngx-colors": "^3.1.4", "ngx-loading-buttons": "^15.0.0", - "nostr-tools": "1.2.0", + "nostr-tools": "1.2.1", "qrcode": "^1.5.1", "qs": "^6.11.0", "rxjs": "~7.8.0", @@ -9318,9 +9318,9 @@ } }, "node_modules/nostr-tools": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.2.0.tgz", - "integrity": "sha512-5bISmkAsFP2ZgSWlWo7Ra0PHpkDRDKylOxQQAfWs3I8OpuQBUmOKuQAAO2jYSFa3FGRvcCFmF+tO7+m533dtLw==", + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.2.1.tgz", + "integrity": "sha512-SL0sst29mrQ7oUPGQn+NMH4yuTe69a8S4bliNpYB2IG0fDl3Cx+xSLnuCTb4nZiNalatYsA5l+751wQiDGA3+A==", "dependencies": { "@noble/hashes": "^0.5.7", "@noble/secp256k1": "^1.7.0", @@ -19605,9 +19605,9 @@ "dev": true }, "nostr-tools": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.2.0.tgz", - "integrity": "sha512-5bISmkAsFP2ZgSWlWo7Ra0PHpkDRDKylOxQQAfWs3I8OpuQBUmOKuQAAO2jYSFa3FGRvcCFmF+tO7+m533dtLw==", + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.2.1.tgz", + "integrity": "sha512-SL0sst29mrQ7oUPGQn+NMH4yuTe69a8S4bliNpYB2IG0fDl3Cx+xSLnuCTb4nZiNalatYsA5l+751wQiDGA3+A==", "requires": { "@noble/hashes": "^0.5.7", "@noble/secp256k1": "^1.7.0", diff --git a/package.json b/package.json index ea06330..0b4b688 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,8 @@ "version": "0.0.0", "scripts": { "ng": "ng", - "start": "ng serve --hmr", + "start": "ng serve", + "hot": "ng serve --hmr", "build": "ng build", "watch": "ng build --watch --configuration development", "test": "ng test", @@ -37,7 +38,7 @@ "moment": "^2.29.4", "ngx-colors": "^3.1.4", "ngx-loading-buttons": "^15.0.0", - "nostr-tools": "1.2.0", + "nostr-tools": "1.2.1", "qrcode": "^1.5.1", "qs": "^6.11.0", "rxjs": "~7.8.0", diff --git a/src/app/queue/queue.ts b/src/app/queue/queue.ts index 075784e..178b41e 100644 --- a/src/app/queue/queue.ts +++ b/src/app/queue/queue.ts @@ -15,10 +15,9 @@ export class QueueComponent { ngOnInit() { this.appState.updateTitle('Media Queue'); - } remove(item: MediaItem) { - this.media.dequeue (item); + this.media.dequeue(item); } } diff --git a/src/app/services/interfaces.ts b/src/app/services/interfaces.ts index 2fd5ea4..d1365fe 100644 --- a/src/app/services/interfaces.ts +++ b/src/app/services/interfaces.ts @@ -93,6 +93,7 @@ export interface NostrRelayDocument { modified?: number; type: number; timeouts?: number; + eventcount?: number; } /** OBSOLETE */ diff --git a/src/app/services/relay.ts b/src/app/services/relay.ts index d0ddca7..a05f1c7 100644 --- a/src/app/services/relay.ts +++ b/src/app/services/relay.ts @@ -77,7 +77,7 @@ export class RelayService { // Whenever the visibility becomes visible, run connect to ensure we're connected to the relays. this.appState.visibility$.subscribe((visible) => { if (visible) { - this.connect(); + // this.connect(); } }); @@ -144,6 +144,18 @@ export class RelayService { } } + setRelayCounter(url: string) { + const item = this.items2.find((r) => r.url == url); + + if (item) { + if (item.eventcount == null) { + item.eventcount = 0; + } + + item.eventcount++; + } + } + async setRelayNIP11(url: string, data: any) { console.log('setRelayNIP11:', data); @@ -202,11 +214,17 @@ export class RelayService { // Relays to remove const relaysToRemove = this.items2.filter((r) => keepRelays.indexOf(r.url) == -1); + console.log('relaysToRemove:', relaysToRemove); + for (let index = 0; index < relaysToRemove.length; index++) { const relay = relaysToRemove[index]; await this.db.storage.deleteRelay(relay.url); + console.log(`${relay.url}: Deleted from database.`); + const worker = this.workers.find((w) => w.url == relay.url); + console.log(`${relay.url}: Terminating this Web Worker!`, worker?.url); + worker?.terminate(); } @@ -366,13 +384,14 @@ export class RelayService { break; case 'terminated': // When being terminated, we'll remove this worker from the array. - console.log('WE HAVE TERMINATED:', url); + console.log(`${url}: WE HAVE TERMINATED`); const index = this.workers.findIndex((v) => v.url == url); // Set the status and then terminate this instance. const worker = this.workers[index]; worker.status = 'terminated'; + console.log(`${url}: Calling actually TERMINATE on Web Worker!`); // Perform the actual termination of the Web Worker. worker.worker?.terminate(); @@ -386,6 +405,7 @@ export class RelayService { break; case 'event': console.log('EVENT FROM:', url); + this.setRelayCounter(url); await this.processEvent(response); break; case 'nip11': @@ -414,44 +434,32 @@ export class RelayService { // Avoid adding duplicate workers, but make sure we initiate a connect action. if (index > -1) { + console.log(`${url}: This relay already exists, calling connect on it.`); this.workers[index].connect(undefined, event); - - // TODO: Make sure we also re-create subscriptions. return; } const relayType = new RelayType(url); + console.log(`${url}: Creating this web worker.`); + + // Append the worker immediately to the array. + this.workers.push(relayType); + const worker = relayType.start(); worker.onmessage = async (ev) => { + console.log(`${relayType.url}: onmessage`, ev.data); await this.handleRelayMessage(ev, relayType.url); }; worker.onerror = async (ev) => { + console.log(`${relayType.url}: onerror`, ev.error); await this.handleRelayError(ev, relayType.url); }; - this.workers.push(relayType); - relayType.connect(this.subs2, event); - // if (typeof Worker !== 'undefined') { - // // Create a new - // const worker = new Worker(new URL('../workers/relay.worker', import.meta.url)); - - // worker.onmessage = ({ data }) => { - // console.log(`page got message: ${JSON.stringify(data)}`); - // }; - - // // worker.postMessage({ url: url, message: 'hello world' }); - // return worker; - // } else { - // // Web Workers are not supported in this environment. - // // You should add a fallback so that your program still executes correctly. - // alert('Your browser does not support Web Workers and the app cannot continue to work.'); - // } - - // return undefined; + console.table(this.workers); } getActiveRelay(url: string) { @@ -596,45 +604,15 @@ export class RelayService { return this.relays.filter((r) => r.status === 1); } - async connect() { - const enabledRelays = this.items2.filter((r) => r.type == 1); + // async connect() { + // debugger; + // const enabledRelays = this.items2.filter((r) => r.type == 1); - for (let index = 0; index < enabledRelays.length; index++) { - const relay = enabledRelays[index]; - - this.createRelayWorker(relay.url); - } - - // const items = await this.table.toArray(); - // let relayCountCountdown = items.filter((i: { enabled: boolean }) => i.enabled !== false).length; - // const observables = []; - // for (var i = 0; i < items.length; i++) { - // const entry = items[i]; - // const existingConnection = this.relays.find((r) => r.url == entry.url); - // // If we are already connected, skip opening connection again. - // if (existingConnection && (existingConnection.status == 1 || existingConnection.metadata.enabled === false)) { - // continue; - // } - // observables.push(this.openConnection(entry)); - // } - // let timer: any; - // merge(...observables).subscribe(() => { - // // As we receive an initial connection, let's create a new observable that will trigger the connection status - // // update either when everything is connected or a timeout is reached. - // relayCountCountdown--; - // // If we reach zero, update the status immediately. - // if (relayCountCountdown == 0) { - // clearTimeout(timer); - // this.appState.updateConnectionStatus(true); - // } - // if (!timer) { - // // Wait a maximum of 3 seconds for all connections to finish. - // timer = setTimeout(() => { - // this.appState.updateConnectionStatus(true); - // }, 3000); - // } - // }); - } + // for (let index = 0; index < enabledRelays.length; index++) { + // const relay = enabledRelays[index]; + // this.createRelayWorker(relay.url); + // } + // } // async reset() { // console.log('RESET RUNNING!'); diff --git a/src/app/settings/settings.ts b/src/app/settings/settings.ts index e1f9674..f360976 100644 --- a/src/app/settings/settings.ts +++ b/src/app/settings/settings.ts @@ -93,9 +93,6 @@ export class SettingsComponent { async getDefaultRelays() { // Append the default relays. await this.relayService.appendRelays(this.relayService.defaultRelays); - - // Initiate connection to the updated relay list. - await this.relayService.connect(); } // private getPublicPublicKeys() { @@ -121,9 +118,6 @@ export class SettingsComponent { // Append the default relays. await this.relayService.appendRelays(relays); - - // Initiate connection to the updated relay list. - await this.relayService.connect(); } ngOnInit() { @@ -163,8 +157,6 @@ export class SettingsComponent { } await this.relayService.appendRelay(result.url, result.read, result.write); - - await this.relayService.connect(); }); } } diff --git a/src/app/shared/relay/relay.ts b/src/app/shared/relay/relay.ts index 902a266..adee913 100644 --- a/src/app/shared/relay/relay.ts +++ b/src/app/shared/relay/relay.ts @@ -64,6 +64,7 @@ export class RelayComponent { this.relayService.terminate(this.relay.url); } else { this.relayService.createRelayWorker(this.relay.url); + } } diff --git a/src/app/types/relay.ts b/src/app/types/relay.ts index f648a6d..a738f4c 100644 --- a/src/app/types/relay.ts +++ b/src/app/types/relay.ts @@ -9,6 +9,7 @@ export class RelayType { status = 'ok'; start() { + console.log(`${this.url}: start method new Worker called.`); this.worker = new Worker(new URL('../workers/relay.worker', import.meta.url)); return this.worker; } diff --git a/src/app/workers/relay.ts b/src/app/workers/relay.ts index 859b9ec..08191d1 100644 --- a/src/app/workers/relay.ts +++ b/src/app/workers/relay.ts @@ -1,51 +1,421 @@ -// import { Relay, relayInit, Sub } from 'nostr-tools'; -// import { NostrRelay } from '../services/interfaces'; +import { Event, Filter, relayInit } from 'nostr-tools'; +import { NostrRelay, NostrRelaySubscription, NostrSub, QueryJob } from '../services/interfaces'; +import { RelayResponse } from '../services/messages'; +import { Queue } from '../services/queue'; -// export class RelayWorker { -// subs: Sub[] = []; +export class RelayWorker { + relay!: NostrRelay; -// relays: NostrRelay[] = []; + /** These are the subscription instances connected to the relay. */ + // subs: NostrSub[] = []; + /** These are the subscriptions the app has requested and manages. */ + subscriptions: NostrRelaySubscription[] = []; -// constructor() {} + queue: Queue; -// async connect(url: string) { -// // const relay = relayInit('wss://relay.nostr.info'); -// const relay = relayInit(url) as NostrRelay; -// relay.subscriptions = []; + constructor(public url: string) { + this.queue = new Queue(); + } -// relay.on('connect', () => { -// console.log(`connected to ${relay?.url}`); -// // onConnected(relay); -// //this.onConnected(relay); -// }); + async publish(event: Event) { + let pub = this.relay.publish(event); + pub.on('ok', () => { + console.log(`${this.relay.url} has accepted our event`); + }); + pub.on('seen', () => { + console.log(`we saw the event on ${this.relay.url}`); + }); + pub.on('failed', (reason: any) => { + console.log(`failed to publish to ${this.relay.url}: ${reason}`); + }); + } -// relay.on('disconnect', () => { -// console.log(`DISCONNECTED! ${relay?.url}`); -// relay.subscriptions = []; -// }); + /** Enques a job to be processed against connected relays. */ + enque(job: QueryJob) { + // It is way more optimal to just delegate jobs into separate queues when enquing than querying later. + if (job.type == 'Profile') { + this.queue.queues.profile.jobs.push(job); + } else if (job.type == 'Contacts') { + this.queue.queues.contacts.jobs.push(job); + } else if (job.type == 'Event') { + this.queue.queues.event.jobs.push(job); + } else { + throw Error(`This type of job (${job.type}) is currently not supported.`); + } -// relay.on('notice', (msg: any) => { -// console.log(`NOTICE FROM ${relay?.url}: ${msg}`); -// }); + console.log(`${this.url}: Job enqued...processing...`); -// // Keep a reference of the metadata on the relay instance. -// // relay.metadata = server; + // We always delay the processing in case we receive more. + setTimeout(() => { + this.process(); + }, 150); + } -// if (relay.metadata.enabled == undefined) { -// relay.metadata.enabled = true; -// } + process() { + this.processProfiles(); + this.processContacts(); + this.processSubscriptions(); + } -// try { -// if (relay.metadata.enabled) { -// await relay.connect(); -// } -// } catch (err) { -// console.log(err); -// relay.metadata.error = 'Unable to connect.'; -// } + processSubscriptions() { + if (!this.relay || this.relay.status != 1) { + return; + } -// // await this.addRelay(relay); + if (this.queue.queues.subscriptions.jobs.length == 0) { + return; + } -// return relay; -// } -// } + while (this.queue.queues.subscriptions.jobs.length) { + const job = this.queue.queues.subscriptions.jobs.shift(); + + if (job) { + this.subscribe(job.filters, job.id); + } + } + } + + processProfiles() { + if (!this.relay || this.relay.status != 1 || this.queue.queues.profile.active) { + console.log(`${this.url}: processProfiles: Relay not ready or currently active: ${this.queue.queues.profile.active}.`, this.relay); + return; + } + + console.log(`${this.url}: processProfiles: Processing with downloading... Count: ` + this.queue.queues.profile.jobs.length); + + if (this.queue.queues.profile.jobs.length == 0) { + this.queue.queues.profile.active = false; + return; + } + + this.queue.queues.profile.active = true; + + console.log(this.relay); + + const profilesToDownload = this.queue.queues.profile.jobs + .splice(0, 500) + .map((j) => j.identifier) + .filter((v, i, a) => a.indexOf(v) === i); // Unique, it can happen that multiple of same is added. + + console.log('profilesToDownload:', profilesToDownload); + + this.downloadProfile(profilesToDownload, profilesToDownload.length * 3); + + // const job = this.queue.queues.profile.jobs.shift(); + // console.log(`${this.url}: processProfiles: Job: `, job); + // this.downloadProfile(job!.identifier); + } + + processContacts() { + if (!this.relay || this.relay.status != 1 || this.queue.queues.contacts.active) { + return; + } + + if (this.queue.queues.contacts.jobs.length == 0) { + this.queue.queues.contacts.active = false; + return; + } + + this.queue.queues.contacts.active = true; + const job = this.queue.queues.contacts.jobs.shift(); + + this.downloadContacts(job!.identifier, () => { + this.queue.queues.contacts.active = false; + this.processContacts(); + }); + } + + processEvents() {} + + /** Provide event to publish and terminate immediately. */ + async connect(event?: any) { + // const relay = relayInit('wss://relay.nostr.info'); + const relay = relayInit(this.url) as NostrRelay; + // relay.subscriptions = []; + + this.relay = relay; + + relay.on('connect', async () => { + console.log(`${this.url}: Connected.`); + postMessage({ type: 'status', data: 1, url: relay.url } as RelayResponse); + + // If there was an event provided, publish it and then disconnect. + if (event) { + await this.publish(event); + await this.disconnect(); + postMessage({ type: 'terminated', url: this.url } as RelayResponse); + } else { + // Make sure we set the relay as well before processing. + // this.relay = relay; + + // Upon connection, make sure we process anything that is in the queue immediately: + this.process(); + // onConnected(relay); + //this.onConnected(relay); + } + }); + + relay.on('disconnect', () => { + console.log(`${this.url}: DISCONNECTED!`); + this.subscriptions = []; + postMessage({ type: 'status', data: 0, url: relay.url } as RelayResponse); + }); + + relay.on('notice', (msg: any) => { + console.log(`${this.url}: NOTICE: ${msg}`); + postMessage({ type: 'notice', data: msg, url: relay.url } as RelayResponse); + }); + + try { + await relay.connect(); + } catch (err) { + postMessage({ type: 'error', relay: this.url, error: 'Unable to connect.' }); + console.error('Unable to connect.'); + } + } + + async disconnect() { + if (this.relay.status == 1) { + console.log(`${this.url}: relay.status: ${this.relay.status}, calling close!`); + return this.relay.close(); + } + } + + unsubscribe(id: string) { + const index = this.subscriptions.findIndex((s) => s.id === id); + + if (index == -1) { + return; + } + + const sub = this.subscriptions[index]; + this.subscriptions.splice(index, 1); + + // Unsub from the relay. + sub.sub?.unsub(); + console.log('Unsubscribed: ', id); + } + + // subscribeAll(subscriptions: NostrRelaySubscription[]) { + // debugger; + + // if (!subscriptions) { + // return; + // } + + // for (let index = 0; index < subscriptions.length; index++) { + // const sub = subscriptions[index]; + // this.subscribe(sub.filters, sub.id); + // } + // } + + profileSub?: NostrSub; + profileTimer?: any; + + contactsSub?: NostrSub; + contactsTimer?: any; + + clearProfileSub() { + this.profileSub?.unsub(); + this.profileSub = undefined; + } + + clearContactsSub() { + this.contactsSub?.unsub(); + this.contactsSub = undefined; + } + + downloadProfile(pubkeys: string[], timeoutSeconds: number = 12) { + console.log('DOWNLOAD PROFILE....'); + let finalizedCalled = false; + + if (!this.relay) { + debugger; + console.warn('This relay does not have active connection and download cannot be executed at this time.'); + return; + } + + // If the profilesub already exists, unsub and remove. + if (this.profileSub) { + console.log('Profile sub already existed, unsub before continue.'); + this.clearProfileSub(); + } + + // Skip if the subscription is already added. + // if (this.subscriptions.findIndex((s) => s.id == id) > -1) { + // debugger; + // console.log('This subscription is already added!'); + // return; + // } + + const sub = this.relay.sub([{ kinds: [0], authors: pubkeys }]) as NostrSub; + this.profileSub = sub; + // sub.id = id; + // console.log('SUBSCRIPTION:', sub); + // this.subscriptions.push({ id: id, filters: filters, sub: sub }); + + // const sub = relay.sub(filters, {}) as NostrSubscription; + // relay.subscriptions.push(sub); + + sub.on('event', (originalEvent: any) => { + console.log('POST MESSAGE BACK TO MAIN'); + postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse); + console.log('FINISHED POST MESSAGE BACK TO MAIN'); + // console.log('CLEAR PROFILE SUBSCRIPTION....'); + + // this.clearProfileSub(); + // clearTimeout(this.profileTimer); + // console.log('FINISHED CLEAR PROFILE SUBSCRIPTION....'); + + // this.queue.queues.profile.active = false; + // this.processProfiles(); + + // if (!finalizedCalled) { + // finalizedCalled = true; + // console.log('Calling finalized!!!'); + // finalized(); + // console.log('Called finalized!!!'); + // } + + // console.log('Profile event received, finalized called.'); + + // const event = this.eventService.processEvent(originalEvent); + // if (!event) { + // return; + // } + // observer.next(event); + }); + + sub.on('eose', () => { + console.log('eose on profile, profile likely not found.'); + clearTimeout(this.profileTimer); + this.clearProfileSub(); + this.queue.queues.profile.active = false; + this.processProfiles(); + }); + + console.log('REGISTER TIMEOUT!!', timeoutSeconds * 1000); + + this.profileTimer = setTimeout(() => { + console.warn(`${this.url}: Profile download timeout reached.`); + this.clearProfileSub(); + this.queue.queues.profile.active = false; + this.processProfiles(); + + postMessage({ url: this.url, type: 'timeout', data: { type: 'Profile', identifier: pubkeys } } as RelayResponse); + + // if (!finalizedCalled) { + // finalizedCalled = true; + // finalized(); + // } + }, timeoutSeconds * 1000); + } + + downloadContacts(pubkey: string, finalized: any, timeoutSeconds: number = 3000) { + console.log('DOWNLOAD CONTACTS....'); + let finalizedCalled = false; + + if (!this.relay) { + console.warn('This relay does not have active connection and download cannot be executed at this time.'); + return; + } + + // If the profilesub already exists, unsub and remove. + if (this.contactsSub) { + this.clearContactsSub(); + } + + const sub = this.relay.sub([{ kinds: [3], authors: [pubkey] }]) as NostrSub; + this.contactsSub = sub; + + sub.on('event', (originalEvent: any) => { + postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse); + this.clearContactsSub(); + clearTimeout(this.contactsTimer); + if (!finalizedCalled) { + finalizedCalled = true; + finalized(); + } + }); + + this.contactsTimer = setTimeout(() => { + this.clearContactsSub(); + if (!finalizedCalled) { + finalizedCalled = true; + finalized(); + } + }, timeoutSeconds * 1000); + } + + subscribe(filters: Filter[], id: string) { + console.log('SUBSCRIBE....'); + + if (!this.relay || this.relay.status != 1) { + // If we don't have a connection yet, schedule the subscription to be added later. + this.queue.queues.subscriptions.jobs.push({ id: id, filters: filters }); + console.warn('This relay does not have active connection and subscription cannot be created at this time. Subscription has been scheduled for adding later.'); + return; + } + + // Skip if the subscription is already added. + if (this.subscriptions.findIndex((s) => s.id == id) > -1) { + console.log('This subscription is already added!'); + return; + } + + const sub = this.relay.sub(filters) as NostrSub; + // sub.id = id; + + console.log('SUBSCRIPTION:', sub); + this.subscriptions.push({ id: id, filters: filters, sub: sub }); + + // const sub = relay.sub(filters, {}) as NostrSubscription; + // relay.subscriptions.push(sub); + + sub.on('event', (originalEvent: any) => { + postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse); + // const event = this.eventService.processEvent(originalEvent); + // if (!event) { + // return; + // } + // observer.next(event); + }); + + sub.on('eose', () => { + console.log('eose on:', this.url); + }); + + // return () => { + // console.log('subscribeToRelay:finished:unsub'); + // // When the observable is finished, this return function is called. + // sub.unsub(); + // const subIndex = relay.subscriptions.findIndex((s) => s == sub); + // if (subIndex > -1) { + // relay.subscriptions.splice(subIndex, 1); + // } + // }; + } + + async info() { + try { + const url = new URL(this.url); + const infoUrl = `https://${url.hostname}`; + const rawResponse = await fetch(infoUrl, { + method: 'GET', + mode: 'cors', + headers: { + Accept: 'application/nostr+json', + }, + }); + if (rawResponse.status === 200) { + const content = await rawResponse.json(); + postMessage({ type: 'nip11', data: content, url: this.url } as RelayResponse); + } else { + postMessage({ type: 'nip11', data: { error: `Unable to get NIP-11 data. Status: ${rawResponse.statusText}` }, url: this.url } as RelayResponse); + } + } catch (err) { + console.warn(err); + postMessage({ type: 'nip11', data: { error: `Unable to get NIP-11 data. Status: ${err}` }, url: this.url } as RelayResponse); + } + } +} diff --git a/src/app/workers/relay.worker.ts b/src/app/workers/relay.worker.ts index fa9f9ac..4302936 100644 --- a/src/app/workers/relay.worker.ts +++ b/src/app/workers/relay.worker.ts @@ -1,59 +1,12 @@ /// -import { Event, relayInit, Filter, Sub } from 'nostr-tools'; -import { NostrRelay, NostrRelaySubscription, NostrSub, QueryJob } from '../services/interfaces'; import { RelayRequest, RelayResponse } from '../services/messages'; -import { Queue } from '../services/queue'; -import { Storage } from '../types/storage'; +import { RelayWorker } from './relay'; let relayWorker: RelayWorker; -let relay = undefined; -let storage = undefined; addEventListener('message', async (ev: MessageEvent) => { - console.log('MESSAGE RECEIVED IN RELAY WORKER!!', JSON.stringify(ev.data)); - - // postMessage(ev.data); - - // storage = new Storage('blockcore-notes-' + '123', 1); - // await storage.open(); - - // // await this.storage.putCircle({ id: 1, name: 'Circle 1' }); - - // // const circle = await storage.getCircle(1); - // // console.log(circle); - - // // // await this.storage.putCircle({ id: 1, name: 'Circle 2' }); - - // // const circle2 = await storage.getCircle(1); - // // console.log(circle2); - - // await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 50, pubkey: '123', kind: 1, id: '1', tags: [] }); - // await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 100, pubkey: '123', kind: 1, id: '2', tags: [] }); - // await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 101, pubkey: '123', kind: 1, id: '3', tags: [] }); - // await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 199, pubkey: '123', kind: 1, id: '4', tags: [] }); - // await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 200, pubkey: '123', kind: 1, id: '5', tags: [] }); - // await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 201, pubkey: '123', kind: 1, id: '6', tags: [] }); - - // const start = performance.now(); - - // for (let index = 1; index < 10000; index++) { - // await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: index, pubkey: index.toString(), kind: 1, id: index.toString(), tags: [] }); - // } - - // const end = performance.now(); - // console.log(`Execution time: ${end - start} ms`); - - // const start2 = performance.now(); - - // const events = await storage.getEventsByCreated('123', IDBKeyRange.bound(100, 200)); - // console.log('FOUND EVENTS:', events); - - // const end2 = performance.now(); - // console.log(`Execution time 2: ${end2 - start2} ms`); - - // const events2 = await storage.getEventsByCreated('123', IDBKeyRange.bound(0, 100)); - // console.log('FOUND EVENTS2:', events2); + console.log(`${relayWorker?.url}: MESSAGE RECEIVED IN RELAY WORKER!!`, JSON.stringify(ev.data)); const request = ev.data as RelayRequest; @@ -67,22 +20,14 @@ addEventListener('message', async (ev: MessageEvent) => { relayWorker = new RelayWorker(request.data.url); await relayWorker.connect(request.data.event); await relayWorker.info(); - // debugger; - // relayWorker.subscribeAll(request.data.subscriptions); break; } case 'disconnect': await relayWorker.disconnect(); - // postMessage({ type: 'disconnect', result: true } as RelayResponse); break; case 'publish': await relayWorker.publish(request.data); break; - // case 'connect-and-publish': - // await relayWorker.publish(request.data); - // await relayWorker.disconnect(); - // postMessage({ type: 'terminated', url: relayWorker.url } as RelayResponse); - // break; case 'enque': await relayWorker.enque(request.data); break; @@ -93,18 +38,15 @@ addEventListener('message', async (ev: MessageEvent) => { await relayWorker.unsubscribe(request.data); break; case 'terminate': - await relayWorker.disconnect(); + try { + await relayWorker.disconnect(); + } catch (err) { + console.error('Error during disconnect.', err); + } + console.log(`${relayWorker.url}: Sending 'terminated' event.`); postMessage({ type: 'terminated', url: relayWorker.url } as RelayResponse); break; - // case 'initialize': - // relayWorker = new RelayWorker(request.data); - // await relayWorker.connect(); - // // postMessage({ type: 'disconnect', result: true } as RelayResponse); - // break; } - - // const response = `worker response to ${data}`; - // postMessage(response); }); function yieldToMain() { @@ -112,437 +54,3 @@ function yieldToMain() { setTimeout(resolve, 0); }); } - -export class RelayWorker { - relay!: NostrRelay; - - /** These are the subscription instances connected to the relay. */ - // subs: NostrSub[] = []; - /** These are the subscriptions the app has requested and manages. */ - subscriptions: NostrRelaySubscription[] = []; - - queue: Queue; - - constructor(public url: string) { - this.queue = new Queue(); - } - - async publish(event: Event) { - let pub = this.relay.publish(event); - pub.on('ok', () => { - console.log(`${this.relay.url} has accepted our event`); - }); - pub.on('seen', () => { - console.log(`we saw the event on ${this.relay.url}`); - }); - pub.on('failed', (reason: any) => { - console.log(`failed to publish to ${this.relay.url}: ${reason}`); - }); - } - - /** Enques a job to be processed against connected relays. */ - enque(job: QueryJob) { - // It is way more optimal to just delegate jobs into separate queues when enquing than querying later. - if (job.type == 'Profile') { - this.queue.queues.profile.jobs.push(job); - } else if (job.type == 'Contacts') { - this.queue.queues.contacts.jobs.push(job); - } else if (job.type == 'Event') { - this.queue.queues.event.jobs.push(job); - } else { - throw Error(`This type of job (${job.type}) is currently not supported.`); - } - - console.log(`${this.url}: Job enqued...processing...`); - - // We always delay the processing in case we receive more. - setTimeout(() => { - this.process(); - }, 150); - } - - process() { - this.processProfiles(); - this.processContacts(); - this.processSubscriptions(); - } - - processSubscriptions() { - if (!this.relay || this.relay.status != 1) { - return; - } - - if (this.queue.queues.subscriptions.jobs.length == 0) { - return; - } - - while (this.queue.queues.subscriptions.jobs.length) { - const job = this.queue.queues.subscriptions.jobs.shift(); - - if (job) { - this.subscribe(job.filters, job.id); - } - } - } - - processProfiles() { - if (!this.relay || this.relay.status != 1 || this.queue.queues.profile.active) { - console.log(`${this.url}: processProfiles: Relay not ready or currently active: ${this.queue.queues.profile.active}.`, this.relay); - return; - } - - console.log(`${this.url}: processProfiles: Processing with downloading... Count: ` + this.queue.queues.profile.jobs.length); - - if (this.queue.queues.profile.jobs.length == 0) { - this.queue.queues.profile.active = false; - return; - } - - this.queue.queues.profile.active = true; - - console.log(this.relay); - - debugger; - - const profilesToDownload = this.queue.queues.profile.jobs.splice(0, 500); - - console.log('profilesToDownload:', profilesToDownload); - - this.downloadProfile( - profilesToDownload.map((j) => j.identifier), - profilesToDownload.length * 3 - ); - - // const job = this.queue.queues.profile.jobs.shift(); - // console.log(`${this.url}: processProfiles: Job: `, job); - // this.downloadProfile(job!.identifier); - } - - processContacts() { - if (!this.relay || this.relay.status != 1 || this.queue.queues.contacts.active) { - return; - } - - if (this.queue.queues.contacts.jobs.length == 0) { - this.queue.queues.contacts.active = false; - return; - } - - this.queue.queues.contacts.active = true; - const job = this.queue.queues.contacts.jobs.shift(); - - this.downloadContacts(job!.identifier, () => { - this.queue.queues.contacts.active = false; - this.processContacts(); - }); - } - - processEvents() {} - - /** Provide event to publish and terminate immediately. */ - async connect(event?: any) { - // const relay = relayInit('wss://relay.nostr.info'); - const relay = relayInit(this.url) as NostrRelay; - // relay.subscriptions = []; - - this.relay = relay; - - relay.on('connect', async () => { - console.log(`${this.url}: Connected.`); - postMessage({ type: 'status', data: 1, url: relay.url } as RelayResponse); - - // If there was an event provided, publish it and then disconnect. - if (event) { - await this.publish(event); - await this.disconnect(); - postMessage({ type: 'terminated', url: relayWorker.url } as RelayResponse); - } else { - // Make sure we set the relay as well before processing. - // this.relay = relay; - - // Upon connection, make sure we process anything that is in the queue immediately: - this.process(); - // onConnected(relay); - //this.onConnected(relay); - } - }); - - relay.on('disconnect', () => { - console.log(`${this.url}: DISCONNECTED!`); - this.subscriptions = []; - postMessage({ type: 'status', data: 0, url: relay.url } as RelayResponse); - }); - - relay.on('notice', (msg: any) => { - console.log(`${this.url}: NOTICE: ${msg}`); - postMessage({ type: 'notice', data: msg, url: relay.url } as RelayResponse); - }); - - // Keep a reference of the metadata on the relay instance. - // relay.metadata = server; - - // if (relay.metadata.enabled == undefined) { - // relay.metadata.enabled = true; - // } - - try { - // if (relay.metadata.enabled) { - await relay.connect(); - // } - } catch (err) { - postMessage({ relay: this.url, error: 'Unable to connect.' }); - console.log(err); - return; - // relay.metadata.error = 'Unable to connect.'; - } - - // this.relay = relay; - // console.log(`${this.url}: THIS.RELAY WAS SET:`, this.relay); - // await this.addRelay(relay); - } - - async disconnect() { - // this.subscriptions = []; - return this.relay.close(); - } - - unsubscribe(id: string) { - const index = this.subscriptions.findIndex((s) => s.id === id); - - if (index == -1) { - return; - } - - const sub = this.subscriptions[index]; - this.subscriptions.splice(index, 1); - - // Unsub from the relay. - sub.sub?.unsub(); - console.log('Unsubscribed: ', id); - } - - // subscribeAll(subscriptions: NostrRelaySubscription[]) { - // debugger; - - // if (!subscriptions) { - // return; - // } - - // for (let index = 0; index < subscriptions.length; index++) { - // const sub = subscriptions[index]; - // this.subscribe(sub.filters, sub.id); - // } - // } - - profileSub?: NostrSub; - profileTimer?: any; - - contactsSub?: NostrSub; - contactsTimer?: any; - - clearProfileSub() { - this.profileSub?.unsub(); - this.profileSub = undefined; - } - - clearContactsSub() { - this.contactsSub?.unsub(); - this.contactsSub = undefined; - } - - downloadProfile(pubkeys: string[], timeoutSeconds: number = 12) { - console.log('DOWNLOAD PROFILE....'); - let finalizedCalled = false; - - if (!this.relay) { - debugger; - console.warn('This relay does not have active connection and download cannot be executed at this time.'); - return; - } - - // If the profilesub already exists, unsub and remove. - if (this.profileSub) { - console.log('Profile sub already existed, unsub before continue.'); - this.clearProfileSub(); - } - - // Skip if the subscription is already added. - // if (this.subscriptions.findIndex((s) => s.id == id) > -1) { - // debugger; - // console.log('This subscription is already added!'); - // return; - // } - - const sub = this.relay.sub([{ kinds: [0], authors: pubkeys }]) as NostrSub; - this.profileSub = sub; - // sub.id = id; - // console.log('SUBSCRIPTION:', sub); - // this.subscriptions.push({ id: id, filters: filters, sub: sub }); - - // const sub = relay.sub(filters, {}) as NostrSubscription; - // relay.subscriptions.push(sub); - - sub.on('event', (originalEvent: any) => { - console.log('POST MESSAGE BACK TO MAIN'); - postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse); - console.log('FINISHED POST MESSAGE BACK TO MAIN'); - // console.log('CLEAR PROFILE SUBSCRIPTION....'); - - // this.clearProfileSub(); - // clearTimeout(this.profileTimer); - // console.log('FINISHED CLEAR PROFILE SUBSCRIPTION....'); - - // this.queue.queues.profile.active = false; - // this.processProfiles(); - - // if (!finalizedCalled) { - // finalizedCalled = true; - // console.log('Calling finalized!!!'); - // finalized(); - // console.log('Called finalized!!!'); - // } - - // console.log('Profile event received, finalized called.'); - - // const event = this.eventService.processEvent(originalEvent); - // if (!event) { - // return; - // } - // observer.next(event); - }); - - sub.on('eose', () => { - console.log('eose on profile, profile likely not found.'); - clearTimeout(this.profileTimer); - this.clearProfileSub(); - this.queue.queues.profile.active = false; - this.processProfiles(); - }); - - console.log('REGISTER TIMEOUT!!', timeoutSeconds * 1000); - - this.profileTimer = setTimeout(() => { - console.warn(`${this.url}: Profile download timeout reached.`); - this.clearProfileSub(); - this.queue.queues.profile.active = false; - this.processProfiles(); - - postMessage({ url: this.url, type: 'timeout', data: { type: 'Profile', identifier: pubkeys } } as RelayResponse); - - // if (!finalizedCalled) { - // finalizedCalled = true; - // finalized(); - // } - }, timeoutSeconds * 1000); - } - - downloadContacts(pubkey: string, finalized: any, timeoutSeconds: number = 3000) { - console.log('DOWNLOAD CONTACTS....'); - let finalizedCalled = false; - - if (!this.relay) { - console.warn('This relay does not have active connection and download cannot be executed at this time.'); - return; - } - - // If the profilesub already exists, unsub and remove. - if (this.contactsSub) { - this.clearContactsSub(); - } - - const sub = this.relay.sub([{ kinds: [3], authors: [pubkey] }]) as NostrSub; - this.contactsSub = sub; - - sub.on('event', (originalEvent: any) => { - postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse); - this.clearContactsSub(); - clearTimeout(this.contactsTimer); - if (!finalizedCalled) { - finalizedCalled = true; - finalized(); - } - }); - - this.contactsTimer = setTimeout(() => { - this.clearContactsSub(); - if (!finalizedCalled) { - finalizedCalled = true; - finalized(); - } - }, timeoutSeconds * 1000); - } - - subscribe(filters: Filter[], id: string) { - console.log('SUBSCRIBE....'); - - if (!this.relay || this.relay.status != 1) { - // If we don't have a connection yet, schedule the subscription to be added later. - this.queue.queues.subscriptions.jobs.push({ id: id, filters: filters }); - console.warn('This relay does not have active connection and subscription cannot be created at this time. Subscription has been scheduled for adding later.'); - return; - } - - // Skip if the subscription is already added. - if (this.subscriptions.findIndex((s) => s.id == id) > -1) { - console.log('This subscription is already added!'); - return; - } - - const sub = this.relay.sub(filters) as NostrSub; - // sub.id = id; - - console.log('SUBSCRIPTION:', sub); - this.subscriptions.push({ id: id, filters: filters, sub: sub }); - - // const sub = relay.sub(filters, {}) as NostrSubscription; - // relay.subscriptions.push(sub); - - sub.on('event', (originalEvent: any) => { - postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse); - // const event = this.eventService.processEvent(originalEvent); - // if (!event) { - // return; - // } - // observer.next(event); - }); - - sub.on('eose', () => { - console.log('eose on:', this.url); - }); - - // return () => { - // console.log('subscribeToRelay:finished:unsub'); - // // When the observable is finished, this return function is called. - // sub.unsub(); - // const subIndex = relay.subscriptions.findIndex((s) => s == sub); - // if (subIndex > -1) { - // relay.subscriptions.splice(subIndex, 1); - // } - // }; - } - - async info() { - try { - const url = new URL(this.url); - const infoUrl = `https://${url.hostname}`; - - const rawResponse = await fetch(infoUrl, { - method: 'GET', - mode: 'cors', - headers: { - Accept: 'application/nostr+json', - }, - }); - - if (rawResponse.status === 200) { - const content = await rawResponse.json(); - postMessage({ type: 'nip11', data: content, url: this.url } as RelayResponse); - } else { - postMessage({ type: 'nip11', data: { error: `Unable to get NIP-11 data. Status: ${rawResponse.statusText}` }, url: this.url } as RelayResponse); - } - } catch (err) { - console.warn(err); - postMessage({ type: 'nip11', data: { error: `Unable to get NIP-11 data. Status: ${err}` }, url: this.url } as RelayResponse); - } - } -}