Fix issue with .close on relay never returning

This commit is contained in:
SondreB 2023-02-03 16:32:14 +01:00
parent 5f80ad4e06
commit 867b66531b
No known key found for this signature in database
GPG Key ID: D6CC44C75005FDBF
10 changed files with 470 additions and 619 deletions

14
package-lock.json generated
View File

@ -33,7 +33,7 @@
"moment": "^2.29.4", "moment": "^2.29.4",
"ngx-colors": "^3.1.4", "ngx-colors": "^3.1.4",
"ngx-loading-buttons": "^15.0.0", "ngx-loading-buttons": "^15.0.0",
"nostr-tools": "1.2.0", "nostr-tools": "1.2.1",
"qrcode": "^1.5.1", "qrcode": "^1.5.1",
"qs": "^6.11.0", "qs": "^6.11.0",
"rxjs": "~7.8.0", "rxjs": "~7.8.0",
@ -9318,9 +9318,9 @@
} }
}, },
"node_modules/nostr-tools": { "node_modules/nostr-tools": {
"version": "1.2.0", "version": "1.2.1",
"resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.2.0.tgz", "resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.2.1.tgz",
"integrity": "sha512-5bISmkAsFP2ZgSWlWo7Ra0PHpkDRDKylOxQQAfWs3I8OpuQBUmOKuQAAO2jYSFa3FGRvcCFmF+tO7+m533dtLw==", "integrity": "sha512-SL0sst29mrQ7oUPGQn+NMH4yuTe69a8S4bliNpYB2IG0fDl3Cx+xSLnuCTb4nZiNalatYsA5l+751wQiDGA3+A==",
"dependencies": { "dependencies": {
"@noble/hashes": "^0.5.7", "@noble/hashes": "^0.5.7",
"@noble/secp256k1": "^1.7.0", "@noble/secp256k1": "^1.7.0",
@ -19605,9 +19605,9 @@
"dev": true "dev": true
}, },
"nostr-tools": { "nostr-tools": {
"version": "1.2.0", "version": "1.2.1",
"resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.2.0.tgz", "resolved": "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.2.1.tgz",
"integrity": "sha512-5bISmkAsFP2ZgSWlWo7Ra0PHpkDRDKylOxQQAfWs3I8OpuQBUmOKuQAAO2jYSFa3FGRvcCFmF+tO7+m533dtLw==", "integrity": "sha512-SL0sst29mrQ7oUPGQn+NMH4yuTe69a8S4bliNpYB2IG0fDl3Cx+xSLnuCTb4nZiNalatYsA5l+751wQiDGA3+A==",
"requires": { "requires": {
"@noble/hashes": "^0.5.7", "@noble/hashes": "^0.5.7",
"@noble/secp256k1": "^1.7.0", "@noble/secp256k1": "^1.7.0",

View File

@ -3,7 +3,8 @@
"version": "0.0.0", "version": "0.0.0",
"scripts": { "scripts": {
"ng": "ng", "ng": "ng",
"start": "ng serve --hmr", "start": "ng serve",
"hot": "ng serve --hmr",
"build": "ng build", "build": "ng build",
"watch": "ng build --watch --configuration development", "watch": "ng build --watch --configuration development",
"test": "ng test", "test": "ng test",
@ -37,7 +38,7 @@
"moment": "^2.29.4", "moment": "^2.29.4",
"ngx-colors": "^3.1.4", "ngx-colors": "^3.1.4",
"ngx-loading-buttons": "^15.0.0", "ngx-loading-buttons": "^15.0.0",
"nostr-tools": "1.2.0", "nostr-tools": "1.2.1",
"qrcode": "^1.5.1", "qrcode": "^1.5.1",
"qs": "^6.11.0", "qs": "^6.11.0",
"rxjs": "~7.8.0", "rxjs": "~7.8.0",

View File

@ -15,10 +15,9 @@ export class QueueComponent {
ngOnInit() { ngOnInit() {
this.appState.updateTitle('Media Queue'); this.appState.updateTitle('Media Queue');
} }
remove(item: MediaItem) { remove(item: MediaItem) {
this.media.dequeue (item); this.media.dequeue(item);
} }
} }

View File

@ -93,6 +93,7 @@ export interface NostrRelayDocument {
modified?: number; modified?: number;
type: number; type: number;
timeouts?: number; timeouts?: number;
eventcount?: number;
} }
/** OBSOLETE */ /** OBSOLETE */

View File

@ -77,7 +77,7 @@ export class RelayService {
// Whenever the visibility becomes visible, run connect to ensure we're connected to the relays. // Whenever the visibility becomes visible, run connect to ensure we're connected to the relays.
this.appState.visibility$.subscribe((visible) => { this.appState.visibility$.subscribe((visible) => {
if (visible) { if (visible) {
this.connect(); // this.connect();
} }
}); });
@ -144,6 +144,18 @@ export class RelayService {
} }
} }
setRelayCounter(url: string) {
const item = this.items2.find((r) => r.url == url);
if (item) {
if (item.eventcount == null) {
item.eventcount = 0;
}
item.eventcount++;
}
}
async setRelayNIP11(url: string, data: any) { async setRelayNIP11(url: string, data: any) {
console.log('setRelayNIP11:', data); console.log('setRelayNIP11:', data);
@ -202,11 +214,17 @@ export class RelayService {
// Relays to remove // Relays to remove
const relaysToRemove = this.items2.filter((r) => keepRelays.indexOf(r.url) == -1); const relaysToRemove = this.items2.filter((r) => keepRelays.indexOf(r.url) == -1);
console.log('relaysToRemove:', relaysToRemove);
for (let index = 0; index < relaysToRemove.length; index++) { for (let index = 0; index < relaysToRemove.length; index++) {
const relay = relaysToRemove[index]; const relay = relaysToRemove[index];
await this.db.storage.deleteRelay(relay.url); await this.db.storage.deleteRelay(relay.url);
console.log(`${relay.url}: Deleted from database.`);
const worker = this.workers.find((w) => w.url == relay.url); const worker = this.workers.find((w) => w.url == relay.url);
console.log(`${relay.url}: Terminating this Web Worker!`, worker?.url);
worker?.terminate(); worker?.terminate();
} }
@ -366,13 +384,14 @@ export class RelayService {
break; break;
case 'terminated': case 'terminated':
// When being terminated, we'll remove this worker from the array. // When being terminated, we'll remove this worker from the array.
console.log('WE HAVE TERMINATED:', url); console.log(`${url}: WE HAVE TERMINATED`);
const index = this.workers.findIndex((v) => v.url == url); const index = this.workers.findIndex((v) => v.url == url);
// Set the status and then terminate this instance. // Set the status and then terminate this instance.
const worker = this.workers[index]; const worker = this.workers[index];
worker.status = 'terminated'; worker.status = 'terminated';
console.log(`${url}: Calling actually TERMINATE on Web Worker!`);
// Perform the actual termination of the Web Worker. // Perform the actual termination of the Web Worker.
worker.worker?.terminate(); worker.worker?.terminate();
@ -386,6 +405,7 @@ export class RelayService {
break; break;
case 'event': case 'event':
console.log('EVENT FROM:', url); console.log('EVENT FROM:', url);
this.setRelayCounter(url);
await this.processEvent(response); await this.processEvent(response);
break; break;
case 'nip11': case 'nip11':
@ -414,44 +434,32 @@ export class RelayService {
// Avoid adding duplicate workers, but make sure we initiate a connect action. // Avoid adding duplicate workers, but make sure we initiate a connect action.
if (index > -1) { if (index > -1) {
console.log(`${url}: This relay already exists, calling connect on it.`);
this.workers[index].connect(undefined, event); this.workers[index].connect(undefined, event);
// TODO: Make sure we also re-create subscriptions.
return; return;
} }
const relayType = new RelayType(url); const relayType = new RelayType(url);
console.log(`${url}: Creating this web worker.`);
// Append the worker immediately to the array.
this.workers.push(relayType);
const worker = relayType.start(); const worker = relayType.start();
worker.onmessage = async (ev) => { worker.onmessage = async (ev) => {
console.log(`${relayType.url}: onmessage`, ev.data);
await this.handleRelayMessage(ev, relayType.url); await this.handleRelayMessage(ev, relayType.url);
}; };
worker.onerror = async (ev) => { worker.onerror = async (ev) => {
console.log(`${relayType.url}: onerror`, ev.error);
await this.handleRelayError(ev, relayType.url); await this.handleRelayError(ev, relayType.url);
}; };
this.workers.push(relayType);
relayType.connect(this.subs2, event); relayType.connect(this.subs2, event);
// if (typeof Worker !== 'undefined') { console.table(this.workers);
// // Create a new
// const worker = new Worker(new URL('../workers/relay.worker', import.meta.url));
// worker.onmessage = ({ data }) => {
// console.log(`page got message: ${JSON.stringify(data)}`);
// };
// // worker.postMessage({ url: url, message: 'hello world' });
// return worker;
// } else {
// // Web Workers are not supported in this environment.
// // You should add a fallback so that your program still executes correctly.
// alert('Your browser does not support Web Workers and the app cannot continue to work.');
// }
// return undefined;
} }
getActiveRelay(url: string) { getActiveRelay(url: string) {
@ -596,45 +604,15 @@ export class RelayService {
return this.relays.filter((r) => r.status === 1); return this.relays.filter((r) => r.status === 1);
} }
async connect() { // async connect() {
const enabledRelays = this.items2.filter((r) => r.type == 1); // debugger;
// const enabledRelays = this.items2.filter((r) => r.type == 1);
for (let index = 0; index < enabledRelays.length; index++) { // for (let index = 0; index < enabledRelays.length; index++) {
const relay = enabledRelays[index]; // const relay = enabledRelays[index];
// this.createRelayWorker(relay.url);
this.createRelayWorker(relay.url);
}
// const items = await this.table.toArray();
// let relayCountCountdown = items.filter((i: { enabled: boolean }) => i.enabled !== false).length;
// const observables = [];
// for (var i = 0; i < items.length; i++) {
// const entry = items[i];
// const existingConnection = this.relays.find((r) => r.url == entry.url);
// // If we are already connected, skip opening connection again.
// if (existingConnection && (existingConnection.status == 1 || existingConnection.metadata.enabled === false)) {
// continue;
// } // }
// observables.push(this.openConnection(entry));
// } // }
// let timer: any;
// merge(...observables).subscribe(() => {
// // As we receive an initial connection, let's create a new observable that will trigger the connection status
// // update either when everything is connected or a timeout is reached.
// relayCountCountdown--;
// // If we reach zero, update the status immediately.
// if (relayCountCountdown == 0) {
// clearTimeout(timer);
// this.appState.updateConnectionStatus(true);
// }
// if (!timer) {
// // Wait a maximum of 3 seconds for all connections to finish.
// timer = setTimeout(() => {
// this.appState.updateConnectionStatus(true);
// }, 3000);
// }
// });
}
// async reset() { // async reset() {
// console.log('RESET RUNNING!'); // console.log('RESET RUNNING!');

View File

@ -93,9 +93,6 @@ export class SettingsComponent {
async getDefaultRelays() { async getDefaultRelays() {
// Append the default relays. // Append the default relays.
await this.relayService.appendRelays(this.relayService.defaultRelays); await this.relayService.appendRelays(this.relayService.defaultRelays);
// Initiate connection to the updated relay list.
await this.relayService.connect();
} }
// private getPublicPublicKeys() { // private getPublicPublicKeys() {
@ -121,9 +118,6 @@ export class SettingsComponent {
// Append the default relays. // Append the default relays.
await this.relayService.appendRelays(relays); await this.relayService.appendRelays(relays);
// Initiate connection to the updated relay list.
await this.relayService.connect();
} }
ngOnInit() { ngOnInit() {
@ -163,8 +157,6 @@ export class SettingsComponent {
} }
await this.relayService.appendRelay(result.url, result.read, result.write); await this.relayService.appendRelay(result.url, result.read, result.write);
await this.relayService.connect();
}); });
} }
} }

View File

@ -64,6 +64,7 @@ export class RelayComponent {
this.relayService.terminate(this.relay.url); this.relayService.terminate(this.relay.url);
} else { } else {
this.relayService.createRelayWorker(this.relay.url); this.relayService.createRelayWorker(this.relay.url);
} }
} }

View File

@ -9,6 +9,7 @@ export class RelayType {
status = 'ok'; status = 'ok';
start() { start() {
console.log(`${this.url}: start method new Worker called.`);
this.worker = new Worker(new URL('../workers/relay.worker', import.meta.url)); this.worker = new Worker(new URL('../workers/relay.worker', import.meta.url));
return this.worker; return this.worker;
} }

View File

@ -1,51 +1,421 @@
// import { Relay, relayInit, Sub } from 'nostr-tools'; import { Event, Filter, relayInit } from 'nostr-tools';
// import { NostrRelay } from '../services/interfaces'; import { NostrRelay, NostrRelaySubscription, NostrSub, QueryJob } from '../services/interfaces';
import { RelayResponse } from '../services/messages';
import { Queue } from '../services/queue';
// export class RelayWorker { export class RelayWorker {
// subs: Sub[] = []; relay!: NostrRelay;
// relays: NostrRelay[] = []; /** These are the subscription instances connected to the relay. */
// subs: NostrSub[] = [];
/** These are the subscriptions the app has requested and manages. */
subscriptions: NostrRelaySubscription[] = [];
// constructor() {} queue: Queue;
// async connect(url: string) { constructor(public url: string) {
// // const relay = relayInit('wss://relay.nostr.info'); this.queue = new Queue();
// const relay = relayInit(url) as NostrRelay; }
// relay.subscriptions = [];
// relay.on('connect', () => { async publish(event: Event) {
// console.log(`connected to ${relay?.url}`); let pub = this.relay.publish(event);
// // onConnected(relay); pub.on('ok', () => {
// //this.onConnected(relay); console.log(`${this.relay.url} has accepted our event`);
// }); });
pub.on('seen', () => {
console.log(`we saw the event on ${this.relay.url}`);
});
pub.on('failed', (reason: any) => {
console.log(`failed to publish to ${this.relay.url}: ${reason}`);
});
}
// relay.on('disconnect', () => { /** Enques a job to be processed against connected relays. */
// console.log(`DISCONNECTED! ${relay?.url}`); enque(job: QueryJob) {
// relay.subscriptions = []; // It is way more optimal to just delegate jobs into separate queues when enquing than querying later.
// }); if (job.type == 'Profile') {
this.queue.queues.profile.jobs.push(job);
} else if (job.type == 'Contacts') {
this.queue.queues.contacts.jobs.push(job);
} else if (job.type == 'Event') {
this.queue.queues.event.jobs.push(job);
} else {
throw Error(`This type of job (${job.type}) is currently not supported.`);
}
// relay.on('notice', (msg: any) => { console.log(`${this.url}: Job enqued...processing...`);
// console.log(`NOTICE FROM ${relay?.url}: ${msg}`);
// });
// // Keep a reference of the metadata on the relay instance. // We always delay the processing in case we receive more.
// // relay.metadata = server; setTimeout(() => {
this.process();
}, 150);
}
// if (relay.metadata.enabled == undefined) { process() {
// relay.metadata.enabled = true; this.processProfiles();
// } this.processContacts();
this.processSubscriptions();
}
// try { processSubscriptions() {
// if (relay.metadata.enabled) { if (!this.relay || this.relay.status != 1) {
// await relay.connect(); return;
// } }
// } catch (err) {
// console.log(err);
// relay.metadata.error = 'Unable to connect.';
// }
// // await this.addRelay(relay); if (this.queue.queues.subscriptions.jobs.length == 0) {
return;
}
// return relay; while (this.queue.queues.subscriptions.jobs.length) {
// } const job = this.queue.queues.subscriptions.jobs.shift();
// }
if (job) {
this.subscribe(job.filters, job.id);
}
}
}
processProfiles() {
if (!this.relay || this.relay.status != 1 || this.queue.queues.profile.active) {
console.log(`${this.url}: processProfiles: Relay not ready or currently active: ${this.queue.queues.profile.active}.`, this.relay);
return;
}
console.log(`${this.url}: processProfiles: Processing with downloading... Count: ` + this.queue.queues.profile.jobs.length);
if (this.queue.queues.profile.jobs.length == 0) {
this.queue.queues.profile.active = false;
return;
}
this.queue.queues.profile.active = true;
console.log(this.relay);
const profilesToDownload = this.queue.queues.profile.jobs
.splice(0, 500)
.map((j) => j.identifier)
.filter((v, i, a) => a.indexOf(v) === i); // Unique, it can happen that multiple of same is added.
console.log('profilesToDownload:', profilesToDownload);
this.downloadProfile(profilesToDownload, profilesToDownload.length * 3);
// const job = this.queue.queues.profile.jobs.shift();
// console.log(`${this.url}: processProfiles: Job: `, job);
// this.downloadProfile(job!.identifier);
}
processContacts() {
if (!this.relay || this.relay.status != 1 || this.queue.queues.contacts.active) {
return;
}
if (this.queue.queues.contacts.jobs.length == 0) {
this.queue.queues.contacts.active = false;
return;
}
this.queue.queues.contacts.active = true;
const job = this.queue.queues.contacts.jobs.shift();
this.downloadContacts(job!.identifier, () => {
this.queue.queues.contacts.active = false;
this.processContacts();
});
}
processEvents() {}
/** Provide event to publish and terminate immediately. */
async connect(event?: any) {
// const relay = relayInit('wss://relay.nostr.info');
const relay = relayInit(this.url) as NostrRelay;
// relay.subscriptions = [];
this.relay = relay;
relay.on('connect', async () => {
console.log(`${this.url}: Connected.`);
postMessage({ type: 'status', data: 1, url: relay.url } as RelayResponse);
// If there was an event provided, publish it and then disconnect.
if (event) {
await this.publish(event);
await this.disconnect();
postMessage({ type: 'terminated', url: this.url } as RelayResponse);
} else {
// Make sure we set the relay as well before processing.
// this.relay = relay;
// Upon connection, make sure we process anything that is in the queue immediately:
this.process();
// onConnected(relay);
//this.onConnected(relay);
}
});
relay.on('disconnect', () => {
console.log(`${this.url}: DISCONNECTED!`);
this.subscriptions = [];
postMessage({ type: 'status', data: 0, url: relay.url } as RelayResponse);
});
relay.on('notice', (msg: any) => {
console.log(`${this.url}: NOTICE: ${msg}`);
postMessage({ type: 'notice', data: msg, url: relay.url } as RelayResponse);
});
try {
await relay.connect();
} catch (err) {
postMessage({ type: 'error', relay: this.url, error: 'Unable to connect.' });
console.error('Unable to connect.');
}
}
async disconnect() {
if (this.relay.status == 1) {
console.log(`${this.url}: relay.status: ${this.relay.status}, calling close!`);
return this.relay.close();
}
}
unsubscribe(id: string) {
const index = this.subscriptions.findIndex((s) => s.id === id);
if (index == -1) {
return;
}
const sub = this.subscriptions[index];
this.subscriptions.splice(index, 1);
// Unsub from the relay.
sub.sub?.unsub();
console.log('Unsubscribed: ', id);
}
// subscribeAll(subscriptions: NostrRelaySubscription[]) {
// debugger;
// if (!subscriptions) {
// return;
// }
// for (let index = 0; index < subscriptions.length; index++) {
// const sub = subscriptions[index];
// this.subscribe(sub.filters, sub.id);
// }
// }
profileSub?: NostrSub;
profileTimer?: any;
contactsSub?: NostrSub;
contactsTimer?: any;
clearProfileSub() {
this.profileSub?.unsub();
this.profileSub = undefined;
}
clearContactsSub() {
this.contactsSub?.unsub();
this.contactsSub = undefined;
}
downloadProfile(pubkeys: string[], timeoutSeconds: number = 12) {
console.log('DOWNLOAD PROFILE....');
let finalizedCalled = false;
if (!this.relay) {
debugger;
console.warn('This relay does not have active connection and download cannot be executed at this time.');
return;
}
// If the profilesub already exists, unsub and remove.
if (this.profileSub) {
console.log('Profile sub already existed, unsub before continue.');
this.clearProfileSub();
}
// Skip if the subscription is already added.
// if (this.subscriptions.findIndex((s) => s.id == id) > -1) {
// debugger;
// console.log('This subscription is already added!');
// return;
// }
const sub = this.relay.sub([{ kinds: [0], authors: pubkeys }]) as NostrSub;
this.profileSub = sub;
// sub.id = id;
// console.log('SUBSCRIPTION:', sub);
// this.subscriptions.push({ id: id, filters: filters, sub: sub });
// const sub = relay.sub(filters, {}) as NostrSubscription;
// relay.subscriptions.push(sub);
sub.on('event', (originalEvent: any) => {
console.log('POST MESSAGE BACK TO MAIN');
postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse);
console.log('FINISHED POST MESSAGE BACK TO MAIN');
// console.log('CLEAR PROFILE SUBSCRIPTION....');
// this.clearProfileSub();
// clearTimeout(this.profileTimer);
// console.log('FINISHED CLEAR PROFILE SUBSCRIPTION....');
// this.queue.queues.profile.active = false;
// this.processProfiles();
// if (!finalizedCalled) {
// finalizedCalled = true;
// console.log('Calling finalized!!!');
// finalized();
// console.log('Called finalized!!!');
// }
// console.log('Profile event received, finalized called.');
// const event = this.eventService.processEvent(originalEvent);
// if (!event) {
// return;
// }
// observer.next(event);
});
sub.on('eose', () => {
console.log('eose on profile, profile likely not found.');
clearTimeout(this.profileTimer);
this.clearProfileSub();
this.queue.queues.profile.active = false;
this.processProfiles();
});
console.log('REGISTER TIMEOUT!!', timeoutSeconds * 1000);
this.profileTimer = setTimeout(() => {
console.warn(`${this.url}: Profile download timeout reached.`);
this.clearProfileSub();
this.queue.queues.profile.active = false;
this.processProfiles();
postMessage({ url: this.url, type: 'timeout', data: { type: 'Profile', identifier: pubkeys } } as RelayResponse);
// if (!finalizedCalled) {
// finalizedCalled = true;
// finalized();
// }
}, timeoutSeconds * 1000);
}
downloadContacts(pubkey: string, finalized: any, timeoutSeconds: number = 3000) {
console.log('DOWNLOAD CONTACTS....');
let finalizedCalled = false;
if (!this.relay) {
console.warn('This relay does not have active connection and download cannot be executed at this time.');
return;
}
// If the profilesub already exists, unsub and remove.
if (this.contactsSub) {
this.clearContactsSub();
}
const sub = this.relay.sub([{ kinds: [3], authors: [pubkey] }]) as NostrSub;
this.contactsSub = sub;
sub.on('event', (originalEvent: any) => {
postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse);
this.clearContactsSub();
clearTimeout(this.contactsTimer);
if (!finalizedCalled) {
finalizedCalled = true;
finalized();
}
});
this.contactsTimer = setTimeout(() => {
this.clearContactsSub();
if (!finalizedCalled) {
finalizedCalled = true;
finalized();
}
}, timeoutSeconds * 1000);
}
subscribe(filters: Filter[], id: string) {
console.log('SUBSCRIBE....');
if (!this.relay || this.relay.status != 1) {
// If we don't have a connection yet, schedule the subscription to be added later.
this.queue.queues.subscriptions.jobs.push({ id: id, filters: filters });
console.warn('This relay does not have active connection and subscription cannot be created at this time. Subscription has been scheduled for adding later.');
return;
}
// Skip if the subscription is already added.
if (this.subscriptions.findIndex((s) => s.id == id) > -1) {
console.log('This subscription is already added!');
return;
}
const sub = this.relay.sub(filters) as NostrSub;
// sub.id = id;
console.log('SUBSCRIPTION:', sub);
this.subscriptions.push({ id: id, filters: filters, sub: sub });
// const sub = relay.sub(filters, {}) as NostrSubscription;
// relay.subscriptions.push(sub);
sub.on('event', (originalEvent: any) => {
postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse);
// const event = this.eventService.processEvent(originalEvent);
// if (!event) {
// return;
// }
// observer.next(event);
});
sub.on('eose', () => {
console.log('eose on:', this.url);
});
// return () => {
// console.log('subscribeToRelay:finished:unsub');
// // When the observable is finished, this return function is called.
// sub.unsub();
// const subIndex = relay.subscriptions.findIndex((s) => s == sub);
// if (subIndex > -1) {
// relay.subscriptions.splice(subIndex, 1);
// }
// };
}
async info() {
try {
const url = new URL(this.url);
const infoUrl = `https://${url.hostname}`;
const rawResponse = await fetch(infoUrl, {
method: 'GET',
mode: 'cors',
headers: {
Accept: 'application/nostr+json',
},
});
if (rawResponse.status === 200) {
const content = await rawResponse.json();
postMessage({ type: 'nip11', data: content, url: this.url } as RelayResponse);
} else {
postMessage({ type: 'nip11', data: { error: `Unable to get NIP-11 data. Status: ${rawResponse.statusText}` }, url: this.url } as RelayResponse);
}
} catch (err) {
console.warn(err);
postMessage({ type: 'nip11', data: { error: `Unable to get NIP-11 data. Status: ${err}` }, url: this.url } as RelayResponse);
}
}
}

View File

@ -1,59 +1,12 @@
/// <reference lib="webworker" /> /// <reference lib="webworker" />
import { Event, relayInit, Filter, Sub } from 'nostr-tools';
import { NostrRelay, NostrRelaySubscription, NostrSub, QueryJob } from '../services/interfaces';
import { RelayRequest, RelayResponse } from '../services/messages'; import { RelayRequest, RelayResponse } from '../services/messages';
import { Queue } from '../services/queue'; import { RelayWorker } from './relay';
import { Storage } from '../types/storage';
let relayWorker: RelayWorker; let relayWorker: RelayWorker;
let relay = undefined;
let storage = undefined;
addEventListener('message', async (ev: MessageEvent) => { addEventListener('message', async (ev: MessageEvent) => {
console.log('MESSAGE RECEIVED IN RELAY WORKER!!', JSON.stringify(ev.data)); console.log(`${relayWorker?.url}: MESSAGE RECEIVED IN RELAY WORKER!!`, JSON.stringify(ev.data));
// postMessage(ev.data);
// storage = new Storage('blockcore-notes-' + '123', 1);
// await storage.open();
// // await this.storage.putCircle({ id: 1, name: 'Circle 1' });
// // const circle = await storage.getCircle(1);
// // console.log(circle);
// // // await this.storage.putCircle({ id: 1, name: 'Circle 2' });
// // const circle2 = await storage.getCircle(1);
// // console.log(circle2);
// await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 50, pubkey: '123', kind: 1, id: '1', tags: [] });
// await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 100, pubkey: '123', kind: 1, id: '2', tags: [] });
// await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 101, pubkey: '123', kind: 1, id: '3', tags: [] });
// await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 199, pubkey: '123', kind: 1, id: '4', tags: [] });
// await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 200, pubkey: '123', kind: 1, id: '5', tags: [] });
// await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: 201, pubkey: '123', kind: 1, id: '6', tags: [] });
// const start = performance.now();
// for (let index = 1; index < 10000; index++) {
// await storage.putEvents({ contentCut: false, tagsCut: false, content: '', created_at: index, pubkey: index.toString(), kind: 1, id: index.toString(), tags: [] });
// }
// const end = performance.now();
// console.log(`Execution time: ${end - start} ms`);
// const start2 = performance.now();
// const events = await storage.getEventsByCreated('123', IDBKeyRange.bound(100, 200));
// console.log('FOUND EVENTS:', events);
// const end2 = performance.now();
// console.log(`Execution time 2: ${end2 - start2} ms`);
// const events2 = await storage.getEventsByCreated('123', IDBKeyRange.bound(0, 100));
// console.log('FOUND EVENTS2:', events2);
const request = ev.data as RelayRequest; const request = ev.data as RelayRequest;
@ -67,22 +20,14 @@ addEventListener('message', async (ev: MessageEvent) => {
relayWorker = new RelayWorker(request.data.url); relayWorker = new RelayWorker(request.data.url);
await relayWorker.connect(request.data.event); await relayWorker.connect(request.data.event);
await relayWorker.info(); await relayWorker.info();
// debugger;
// relayWorker.subscribeAll(request.data.subscriptions);
break; break;
} }
case 'disconnect': case 'disconnect':
await relayWorker.disconnect(); await relayWorker.disconnect();
// postMessage({ type: 'disconnect', result: true } as RelayResponse);
break; break;
case 'publish': case 'publish':
await relayWorker.publish(request.data); await relayWorker.publish(request.data);
break; break;
// case 'connect-and-publish':
// await relayWorker.publish(request.data);
// await relayWorker.disconnect();
// postMessage({ type: 'terminated', url: relayWorker.url } as RelayResponse);
// break;
case 'enque': case 'enque':
await relayWorker.enque(request.data); await relayWorker.enque(request.data);
break; break;
@ -93,18 +38,15 @@ addEventListener('message', async (ev: MessageEvent) => {
await relayWorker.unsubscribe(request.data); await relayWorker.unsubscribe(request.data);
break; break;
case 'terminate': case 'terminate':
try {
await relayWorker.disconnect(); await relayWorker.disconnect();
} catch (err) {
console.error('Error during disconnect.', err);
}
console.log(`${relayWorker.url}: Sending 'terminated' event.`);
postMessage({ type: 'terminated', url: relayWorker.url } as RelayResponse); postMessage({ type: 'terminated', url: relayWorker.url } as RelayResponse);
break; break;
// case 'initialize':
// relayWorker = new RelayWorker(request.data);
// await relayWorker.connect();
// // postMessage({ type: 'disconnect', result: true } as RelayResponse);
// break;
} }
// const response = `worker response to ${data}`;
// postMessage(response);
}); });
function yieldToMain() { function yieldToMain() {
@ -112,437 +54,3 @@ function yieldToMain() {
setTimeout(resolve, 0); setTimeout(resolve, 0);
}); });
} }
export class RelayWorker {
relay!: NostrRelay;
/** These are the subscription instances connected to the relay. */
// subs: NostrSub[] = [];
/** These are the subscriptions the app has requested and manages. */
subscriptions: NostrRelaySubscription[] = [];
queue: Queue;
constructor(public url: string) {
this.queue = new Queue();
}
async publish(event: Event) {
let pub = this.relay.publish(event);
pub.on('ok', () => {
console.log(`${this.relay.url} has accepted our event`);
});
pub.on('seen', () => {
console.log(`we saw the event on ${this.relay.url}`);
});
pub.on('failed', (reason: any) => {
console.log(`failed to publish to ${this.relay.url}: ${reason}`);
});
}
/** Enques a job to be processed against connected relays. */
enque(job: QueryJob) {
// It is way more optimal to just delegate jobs into separate queues when enquing than querying later.
if (job.type == 'Profile') {
this.queue.queues.profile.jobs.push(job);
} else if (job.type == 'Contacts') {
this.queue.queues.contacts.jobs.push(job);
} else if (job.type == 'Event') {
this.queue.queues.event.jobs.push(job);
} else {
throw Error(`This type of job (${job.type}) is currently not supported.`);
}
console.log(`${this.url}: Job enqued...processing...`);
// We always delay the processing in case we receive more.
setTimeout(() => {
this.process();
}, 150);
}
process() {
this.processProfiles();
this.processContacts();
this.processSubscriptions();
}
processSubscriptions() {
if (!this.relay || this.relay.status != 1) {
return;
}
if (this.queue.queues.subscriptions.jobs.length == 0) {
return;
}
while (this.queue.queues.subscriptions.jobs.length) {
const job = this.queue.queues.subscriptions.jobs.shift();
if (job) {
this.subscribe(job.filters, job.id);
}
}
}
processProfiles() {
if (!this.relay || this.relay.status != 1 || this.queue.queues.profile.active) {
console.log(`${this.url}: processProfiles: Relay not ready or currently active: ${this.queue.queues.profile.active}.`, this.relay);
return;
}
console.log(`${this.url}: processProfiles: Processing with downloading... Count: ` + this.queue.queues.profile.jobs.length);
if (this.queue.queues.profile.jobs.length == 0) {
this.queue.queues.profile.active = false;
return;
}
this.queue.queues.profile.active = true;
console.log(this.relay);
debugger;
const profilesToDownload = this.queue.queues.profile.jobs.splice(0, 500);
console.log('profilesToDownload:', profilesToDownload);
this.downloadProfile(
profilesToDownload.map((j) => j.identifier),
profilesToDownload.length * 3
);
// const job = this.queue.queues.profile.jobs.shift();
// console.log(`${this.url}: processProfiles: Job: `, job);
// this.downloadProfile(job!.identifier);
}
processContacts() {
if (!this.relay || this.relay.status != 1 || this.queue.queues.contacts.active) {
return;
}
if (this.queue.queues.contacts.jobs.length == 0) {
this.queue.queues.contacts.active = false;
return;
}
this.queue.queues.contacts.active = true;
const job = this.queue.queues.contacts.jobs.shift();
this.downloadContacts(job!.identifier, () => {
this.queue.queues.contacts.active = false;
this.processContacts();
});
}
processEvents() {}
/** Provide event to publish and terminate immediately. */
async connect(event?: any) {
// const relay = relayInit('wss://relay.nostr.info');
const relay = relayInit(this.url) as NostrRelay;
// relay.subscriptions = [];
this.relay = relay;
relay.on('connect', async () => {
console.log(`${this.url}: Connected.`);
postMessage({ type: 'status', data: 1, url: relay.url } as RelayResponse);
// If there was an event provided, publish it and then disconnect.
if (event) {
await this.publish(event);
await this.disconnect();
postMessage({ type: 'terminated', url: relayWorker.url } as RelayResponse);
} else {
// Make sure we set the relay as well before processing.
// this.relay = relay;
// Upon connection, make sure we process anything that is in the queue immediately:
this.process();
// onConnected(relay);
//this.onConnected(relay);
}
});
relay.on('disconnect', () => {
console.log(`${this.url}: DISCONNECTED!`);
this.subscriptions = [];
postMessage({ type: 'status', data: 0, url: relay.url } as RelayResponse);
});
relay.on('notice', (msg: any) => {
console.log(`${this.url}: NOTICE: ${msg}`);
postMessage({ type: 'notice', data: msg, url: relay.url } as RelayResponse);
});
// Keep a reference of the metadata on the relay instance.
// relay.metadata = server;
// if (relay.metadata.enabled == undefined) {
// relay.metadata.enabled = true;
// }
try {
// if (relay.metadata.enabled) {
await relay.connect();
// }
} catch (err) {
postMessage({ relay: this.url, error: 'Unable to connect.' });
console.log(err);
return;
// relay.metadata.error = 'Unable to connect.';
}
// this.relay = relay;
// console.log(`${this.url}: THIS.RELAY WAS SET:`, this.relay);
// await this.addRelay(relay);
}
async disconnect() {
// this.subscriptions = [];
return this.relay.close();
}
unsubscribe(id: string) {
const index = this.subscriptions.findIndex((s) => s.id === id);
if (index == -1) {
return;
}
const sub = this.subscriptions[index];
this.subscriptions.splice(index, 1);
// Unsub from the relay.
sub.sub?.unsub();
console.log('Unsubscribed: ', id);
}
// subscribeAll(subscriptions: NostrRelaySubscription[]) {
// debugger;
// if (!subscriptions) {
// return;
// }
// for (let index = 0; index < subscriptions.length; index++) {
// const sub = subscriptions[index];
// this.subscribe(sub.filters, sub.id);
// }
// }
profileSub?: NostrSub;
profileTimer?: any;
contactsSub?: NostrSub;
contactsTimer?: any;
clearProfileSub() {
this.profileSub?.unsub();
this.profileSub = undefined;
}
clearContactsSub() {
this.contactsSub?.unsub();
this.contactsSub = undefined;
}
downloadProfile(pubkeys: string[], timeoutSeconds: number = 12) {
console.log('DOWNLOAD PROFILE....');
let finalizedCalled = false;
if (!this.relay) {
debugger;
console.warn('This relay does not have active connection and download cannot be executed at this time.');
return;
}
// If the profilesub already exists, unsub and remove.
if (this.profileSub) {
console.log('Profile sub already existed, unsub before continue.');
this.clearProfileSub();
}
// Skip if the subscription is already added.
// if (this.subscriptions.findIndex((s) => s.id == id) > -1) {
// debugger;
// console.log('This subscription is already added!');
// return;
// }
const sub = this.relay.sub([{ kinds: [0], authors: pubkeys }]) as NostrSub;
this.profileSub = sub;
// sub.id = id;
// console.log('SUBSCRIPTION:', sub);
// this.subscriptions.push({ id: id, filters: filters, sub: sub });
// const sub = relay.sub(filters, {}) as NostrSubscription;
// relay.subscriptions.push(sub);
sub.on('event', (originalEvent: any) => {
console.log('POST MESSAGE BACK TO MAIN');
postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse);
console.log('FINISHED POST MESSAGE BACK TO MAIN');
// console.log('CLEAR PROFILE SUBSCRIPTION....');
// this.clearProfileSub();
// clearTimeout(this.profileTimer);
// console.log('FINISHED CLEAR PROFILE SUBSCRIPTION....');
// this.queue.queues.profile.active = false;
// this.processProfiles();
// if (!finalizedCalled) {
// finalizedCalled = true;
// console.log('Calling finalized!!!');
// finalized();
// console.log('Called finalized!!!');
// }
// console.log('Profile event received, finalized called.');
// const event = this.eventService.processEvent(originalEvent);
// if (!event) {
// return;
// }
// observer.next(event);
});
sub.on('eose', () => {
console.log('eose on profile, profile likely not found.');
clearTimeout(this.profileTimer);
this.clearProfileSub();
this.queue.queues.profile.active = false;
this.processProfiles();
});
console.log('REGISTER TIMEOUT!!', timeoutSeconds * 1000);
this.profileTimer = setTimeout(() => {
console.warn(`${this.url}: Profile download timeout reached.`);
this.clearProfileSub();
this.queue.queues.profile.active = false;
this.processProfiles();
postMessage({ url: this.url, type: 'timeout', data: { type: 'Profile', identifier: pubkeys } } as RelayResponse);
// if (!finalizedCalled) {
// finalizedCalled = true;
// finalized();
// }
}, timeoutSeconds * 1000);
}
downloadContacts(pubkey: string, finalized: any, timeoutSeconds: number = 3000) {
console.log('DOWNLOAD CONTACTS....');
let finalizedCalled = false;
if (!this.relay) {
console.warn('This relay does not have active connection and download cannot be executed at this time.');
return;
}
// If the profilesub already exists, unsub and remove.
if (this.contactsSub) {
this.clearContactsSub();
}
const sub = this.relay.sub([{ kinds: [3], authors: [pubkey] }]) as NostrSub;
this.contactsSub = sub;
sub.on('event', (originalEvent: any) => {
postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse);
this.clearContactsSub();
clearTimeout(this.contactsTimer);
if (!finalizedCalled) {
finalizedCalled = true;
finalized();
}
});
this.contactsTimer = setTimeout(() => {
this.clearContactsSub();
if (!finalizedCalled) {
finalizedCalled = true;
finalized();
}
}, timeoutSeconds * 1000);
}
subscribe(filters: Filter[], id: string) {
console.log('SUBSCRIBE....');
if (!this.relay || this.relay.status != 1) {
// If we don't have a connection yet, schedule the subscription to be added later.
this.queue.queues.subscriptions.jobs.push({ id: id, filters: filters });
console.warn('This relay does not have active connection and subscription cannot be created at this time. Subscription has been scheduled for adding later.');
return;
}
// Skip if the subscription is already added.
if (this.subscriptions.findIndex((s) => s.id == id) > -1) {
console.log('This subscription is already added!');
return;
}
const sub = this.relay.sub(filters) as NostrSub;
// sub.id = id;
console.log('SUBSCRIPTION:', sub);
this.subscriptions.push({ id: id, filters: filters, sub: sub });
// const sub = relay.sub(filters, {}) as NostrSubscription;
// relay.subscriptions.push(sub);
sub.on('event', (originalEvent: any) => {
postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse);
// const event = this.eventService.processEvent(originalEvent);
// if (!event) {
// return;
// }
// observer.next(event);
});
sub.on('eose', () => {
console.log('eose on:', this.url);
});
// return () => {
// console.log('subscribeToRelay:finished:unsub');
// // When the observable is finished, this return function is called.
// sub.unsub();
// const subIndex = relay.subscriptions.findIndex((s) => s == sub);
// if (subIndex > -1) {
// relay.subscriptions.splice(subIndex, 1);
// }
// };
}
async info() {
try {
const url = new URL(this.url);
const infoUrl = `https://${url.hostname}`;
const rawResponse = await fetch(infoUrl, {
method: 'GET',
mode: 'cors',
headers: {
Accept: 'application/nostr+json',
},
});
if (rawResponse.status === 200) {
const content = await rawResponse.json();
postMessage({ type: 'nip11', data: content, url: this.url } as RelayResponse);
} else {
postMessage({ type: 'nip11', data: { error: `Unable to get NIP-11 data. Status: ${rawResponse.statusText}` }, url: this.url } as RelayResponse);
}
} catch (err) {
console.warn(err);
postMessage({ type: 'nip11', data: { error: `Unable to get NIP-11 data. Status: ${err}` }, url: this.url } as RelayResponse);
}
}
}