Improve subscription object management

This commit is contained in:
SondreB 2023-01-29 14:38:36 +01:00
parent ee2914d140
commit 54074601f0
No known key found for this signature in database
GPG Key ID: D6CC44C75005FDBF
6 changed files with 97 additions and 25 deletions

View File

@ -253,6 +253,20 @@ export class AppComponent {
},
});
// Create the listeners (filters) for relays:
// TODO: There is limit on maximum following, we need a strategy to handle that.
// potentially subscribing and unsubscribing on intervals with a .since field between each interval.
const pubKeys = this.profileService.profiles.map((p) => p.pubkey);
// Add self to the top of listening list:
pubKeys.unshift(this.appState.getPublicKey());
console.log('PUB KEYS:', pubKeys);
this.relayService.queueSubscription([{ authors: pubKeys }]);
// this.relayService.
// .subscribe(async (profile) => {
// // TODO: Figure out why we get promises from this observable.
// const p = await profile;

View File

@ -362,7 +362,7 @@ export class DataService {
subscribeToRelay(filters: Filter[], relay: NostrRelay): Observable<NostrEventDocument> {
return new Observable<NostrEventDocument>((observer: Observer<NostrEventDocument>) => {
const sub = relay.sub(filters, {}) as NostrSubscription;
relay.subscriptions.push(sub);
// relay.subscriptions.push(sub);
sub.on('event', (originalEvent: any) => {
const event = this.eventService.processEvent(originalEvent);
@ -380,10 +380,10 @@ export class DataService {
console.log('subscribeToRelay:finished:unsub');
// When the observable is finished, this return function is called.
sub.unsub();
const subIndex = relay.subscriptions.findIndex((s) => s == sub);
if (subIndex > -1) {
relay.subscriptions.splice(subIndex, 1);
}
// const subIndex = relay.subscriptions.findIndex((s) => s == sub);
// if (subIndex > -1) {
// relay.subscriptions.splice(subIndex, 1);
// }
};
});
}
@ -391,7 +391,7 @@ export class DataService {
downloadFromRelay(filters: Filter[], relay: NostrRelay, requestTimeout = 10000): Observable<NostrEventDocument> {
return new Observable<NostrEventDocument>((observer: Observer<NostrEventDocument>) => {
const sub = relay.sub([...filters], {}) as NostrSubscription;
relay.subscriptions.push(sub);
// relay.subscriptions.push(sub);
sub.on('event', (originalEvent: any) => {
const event = this.eventService.processEvent(originalEvent);

View File

@ -1,4 +1,4 @@
import { Event, Relay, Sub } from 'nostr-tools';
import { Event, Filter, Relay, Sub } from 'nostr-tools';
export interface Circle {
id?: number;
@ -64,7 +64,14 @@ export interface NostrRelay extends Relay {
// nip11: any;
// error: string;
metadata: NostrRelayDocument;
subscriptions: Sub[];
// subscriptions: Sub[];
}
export interface NostrRelaySubscription {
id: string;
// subid: string;
sub?: NostrSub;
filters: Filter[];
}
export interface NostrRelayDocument {
@ -88,7 +95,7 @@ export interface NostrEvent extends Event {
}
export interface NostrSub extends Sub {
id: string;
// id: string;
}
export interface NostrEventDocument extends Event {
@ -225,4 +232,3 @@ export class ChatModel {
'lastMessageLength': string | number;
'chat': Array<MessageModel>;
}

View File

@ -1,5 +1,5 @@
import { Injectable } from '@angular/core';
import { NostrEventDocument, NostrRelay, NostrRelayDocument } from './interfaces';
import { NostrEventDocument, NostrRelay, NostrRelayDocument, NostrRelaySubscription } from './interfaces';
import { Observable, BehaviorSubject, from, merge, timeout, catchError, of, finalize, tap } from 'rxjs';
import { Filter, Relay, relayInit, Sub } from 'nostr-tools';
import { EventService } from './event';
@ -199,8 +199,19 @@ export class RelayService {
await this.setRelayStatus(url, response.data);
// Upon first successful connection, we'll set the status to online.
// Upon status set to 1, make sure we subscribe to the existing subscriptions.
if (response.data === 1) {
this.appState.updateConnectionStatus(true);
const index = this.workers.findIndex((v) => v.url == url);
const worker = this.workers[index];
debugger;
for (let index = 0; index < this.subs2.length; index++) {
const sub = this.subs2[index];
worker.subscribe(sub.filters, sub.id);
}
}
break;
@ -254,6 +265,8 @@ export class RelayService {
// Avoid adding duplicate workers, but make sure we initiate a connect action.
if (index > -1) {
this.workers[index].connect();
// TODO: Make sure we also re-create subscriptions.
return;
}
@ -270,7 +283,7 @@ export class RelayService {
this.workers.push(relayType);
relayType.connect();
relayType.connect(this.subs2);
// if (typeof Worker !== 'undefined') {
// // Create a new
@ -488,7 +501,7 @@ export class RelayService {
// const relay = relayInit('wss://relay.nostr.info');
const relay = relayInit(server.url) as NostrRelay;
relay.subscriptions = [];
// relay.subscriptions = [];
relay.on('connect', () => {
// console.log(`connected to ${relay?.url}`);
@ -498,7 +511,7 @@ export class RelayService {
relay.on('disconnect', () => {
console.log(`DISCONNECTED! ${relay?.url}`);
relay.subscriptions = [];
// relay.subscriptions = [];
});
relay.on('notice', (msg: any) => {
@ -551,7 +564,14 @@ export class RelayService {
subscriptions: any = {};
subs2: any[] = [];
subs2: NostrRelaySubscription[] = [];
/** Queues up subscription that will be activated whenever the relay is connected. */
queueSubscription(filters: Filter[]) {
const id = uuidv4();
this.subs2.push({ id: id, filters: filters });
return id;
}
subscribe(filters: Filter[]) {
const id = uuidv4();

View File

@ -1,4 +1,5 @@
import { Filter } from 'nostr-tools';
import { NostrRelaySubscription } from '../services/interfaces';
import { RelayRequest, RelayResponse } from '../services/messages';
/** Relay type that holds a connection to the Web Worker and abstracts calling different actions to the Web Worker. */
@ -12,8 +13,8 @@ export class RelayType {
return this.worker;
}
connect() {
this.action('connect', this.url);
connect(subscriptions?: NostrRelaySubscription[]) {
this.action('connect', { url: this.url, subscriptions });
}
disconnect() {

View File

@ -1,7 +1,7 @@
/// <reference lib="webworker" />
import { Event, relayInit, Filter, Sub } from 'nostr-tools';
import { NostrRelay, NostrSub } from '../services/interfaces';
import { NostrRelay, NostrRelaySubscription, NostrSub } from '../services/interfaces';
import { RelayRequest, RelayResponse } from '../services/messages';
import { Storage } from '../types/storage';
@ -63,9 +63,11 @@ addEventListener('message', async (ev: MessageEvent) => {
console.log('Already connected...');
break;
} else {
relayWorker = new RelayWorker(request.data);
relayWorker = new RelayWorker(request.data.url);
await relayWorker.connect();
await relayWorker.info();
// debugger;
// relayWorker.subscribeAll(request.data.subscriptions);
break;
}
case 'disconnect':
@ -76,6 +78,7 @@ addEventListener('message', async (ev: MessageEvent) => {
await relayWorker.publish(request.data);
break;
case 'subscribe':
debugger;
await relayWorker.subscribe(request.data.filters, request.data.id);
break;
case 'unsubscribe':
@ -98,7 +101,11 @@ addEventListener('message', async (ev: MessageEvent) => {
export class RelayWorker {
relay!: NostrRelay;
subscriptions: NostrSub[] = [];
/** These are the subscription instances connected to the relay. */
// subs: NostrSub[] = [];
/** These are the subscriptions the app has requested and manages. */
subscriptions: NostrRelaySubscription[] = [];
constructor(public url: string) {}
@ -129,7 +136,8 @@ export class RelayWorker {
relay.on('disconnect', () => {
console.log(`DISCONNECTED! ${relay?.url}`);
relay.subscriptions = [];
debugger;
this.subscriptions = [];
postMessage({ type: 'status', data: 0, url: relay.url } as RelayResponse);
});
@ -161,6 +169,7 @@ export class RelayWorker {
}
async disconnect() {
// this.subscriptions = [];
return this.relay.close();
}
@ -173,23 +182,45 @@ export class RelayWorker {
const sub = this.subscriptions[index];
this.subscriptions.splice(index, 1);
sub.unsub();
// Unsub from the relay.
sub.sub?.unsub();
console.log('Unsubscribed: ', id);
}
// subscribeAll(subscriptions: NostrRelaySubscription[]) {
// debugger;
// if (!subscriptions) {
// return;
// }
// for (let index = 0; index < subscriptions.length; index++) {
// const sub = subscriptions[index];
// this.subscribe(sub.filters, sub.id);
// }
// }
subscribe(filters: Filter[], id: string) {
console.log('SUBSCRIBE....');
if (!this.relay) {
console.warn('This relay does not have active connection and subscription cannot be created at this time.');
return;
}
const sub = this.relay.sub(filters) as NostrSub;
// Skip if the subscription is already added.
if (this.subscriptions.findIndex((s) => s.id == id) > -1) {
debugger;
console.log('This subscription is already added!');
return;
}
sub.id = id;
const sub = this.relay.sub(filters) as NostrSub;
// sub.id = id;
console.log('SUBSCRIPTION:', sub);
this.subscriptions.push(sub);
this.subscriptions.push({ id: id, filters: filters, sub: sub });
// const sub = relay.sub(filters, {}) as NostrSubscription;
// relay.subscriptions.push(sub);