Handle profile updates

This commit is contained in:
SondreB 2023-01-30 00:05:38 +01:00
parent f69c0f1072
commit 7de8748442
No known key found for this signature in database
GPG Key ID: D6CC44C75005FDBF
14 changed files with 372 additions and 95 deletions

View File

@ -204,73 +204,11 @@ export class AppComponent {
// This service will perform data cleanup, etc.
await this.dataService.initialize();
// Download the profile of the user.
this.dataService.enque({
identifier: this.appState.getPublicKey(),
type: 'Profile',
// callback: (data: any) => {
// // This call back is only called if we found a newer profile than already exists.
// // So when this happens, we'll show the import sheet.
// console.log(data);
// debugger;
// this.openImportSheet();
this.appState.connected$.subscribe(() => {
// // if (!this.profileService.profile?.following || this.profileService.profile?.following.length === 0) {
// // }
// },
});
// Download the following of the user.
this.dataService.enque({
identifier: this.appState.getPublicKey(),
type: 'Contacts',
callback: (data: any) => {
// The callback is called for all contacts lists, not just the one we call for.
if (data.pubkey !== this.appState.getPublicKey()) {
return;
}
// Sometimes we might discover newer or older profiles, make sure we only update UI dialog if newer.
if (this.discoveredProfileDate < data.created_at) {
this.discoveredProfileDate = data.created_at;
const following = this.profileService.profile?.following;
const pubkeys = data.tags.map((t: any[]) => t[1]);
console.log('FOLLOWING:' + JSON.stringify(following));
if (!following) {
const dialogData: any = { pubkeys: pubkeys, pubkey: data.pubkey };
if (data.content) {
dialogData.relays = JSON.parse(data.content);
dialogData.relaysCount = Object.keys(dialogData.relays).length;
}
this.openImportSheet(dialogData);
}
}
},
});
// Create the listeners (filters) for relays:
// TODO: There is limit on maximum following, we need a strategy to handle that.
// potentially subscribing and unsubscribing on intervals with a .since field between each interval.
const pubKeys = this.profileService.following.map((p) => p.pubkey);
// Add self to the top of listening list:
pubKeys.unshift(this.appState.getPublicKey());
console.log('PUB KEYS:', pubKeys);
// Subscribe to new events but don't get any history (limit: 0).
console.log('queueSubscription:', { authors: pubKeys, since: this.db.state.since });
this.relayService.queueSubscription([{ authors: pubKeys, since: this.db.state.since }]);
// this.relayService.
// .subscribe(async (profile) => {

View File

@ -1,6 +1,8 @@
<div class="page">
<p>This page act as examples for more specialized implementation details of the app.</p>
<button mat-stroked-button (click)="downloadProfile()">Enque Profile Download</button>
<button mat-stroked-button (click)="database()">Create Database</button>
<button mat-stroked-button (click)="databaseWorker()">Create Database in Worker</button>

View File

@ -88,6 +88,10 @@ export class DevelopmentComponent {
this.relayService.appendRelays(this.relayService.defaultRelays);
}
downloadProfile() {
this.relayService.enque({ identifier: this.appState.getPublicKey(), type: 'Profile' });
}
sub?: string;
subscription() {

View File

@ -6,7 +6,7 @@ import { ApplicationState } from '../services/applicationstate';
import { DataService } from '../services/data';
import { NostrEventDocument, NostrProfileDocument } from '../services/interfaces';
import { ProfileService } from '../services/profile';
import { QueueService } from '../services/queue';
import { QueueService } from '../services/queue.service';
import { UIService } from '../services/ui';
@Component({

View File

@ -1,7 +1,7 @@
import { Injectable } from '@angular/core';
import { BehaviorSubject, finalize, distinct, flatMap, from, groupBy, map, Observable, of, Subscription, switchMap } from 'rxjs';
import { ChatModel, NostrEventDocument, UserModel } from './interfaces';
import { QueueService } from './queue';
import { QueueService } from './queue.service';
import { DataService } from './data';
import { ApplicationState } from './applicationstate';

View File

@ -9,7 +9,7 @@ import { ApplicationState } from './applicationstate';
import { timeout, map, merge, Observable, delay, Observer, race, take, switchMap, mergeMap, tap, finalize, concatMap, mergeAll, exhaustMap, catchError, of, combineAll, combineLatestAll, filter, from } from 'rxjs';
import { Utilities } from './utilities';
import { StorageService } from './storage';
import { QueueService } from './queue';
import { QueueService } from './queue.service';
import { UIService } from './ui';
@Injectable({
@ -42,6 +42,8 @@ export class DataService {
this.connected = connected;
if (this.connected) {
this.initialDataLoad();
console.log('Connection established, start processing queues.');
this.processQueues();
}
@ -55,6 +57,71 @@ export class DataService {
});
}
async initialDataLoad() {
// Download the profile of the user.
this.enque({
identifier: this.appState.getPublicKey(),
type: 'Profile',
// callback: (data: any) => {
// // This call back is only called if we found a newer profile than already exists.
// // So when this happens, we'll show the import sheet.
// console.log(data);
// debugger;
// this.openImportSheet();
// // if (!this.profileService.profile?.following || this.profileService.profile?.following.length === 0) {
// // }
// },
});
// Download the following of the user.
this.enque({
identifier: this.appState.getPublicKey(),
type: 'Contacts',
callback: (data: any) => {
// TODO: MIGRATE THIS LOGIC!!
// The callback is called for all contacts lists, not just the one we call for.
// if (data.pubkey !== this.appState.getPublicKey()) {
// return;
// }
// // Sometimes we might discover newer or older profiles, make sure we only update UI dialog if newer.
// if (this.discoveredProfileDate < data.created_at) {
// this.discoveredProfileDate = data.created_at;
// const following = this.profileService.profile?.following;
// const pubkeys = data.tags.map((t: any[]) => t[1]);
// console.log('FOLLOWING:' + JSON.stringify(following));
// if (!following) {
// const dialogData: any = { pubkeys: pubkeys, pubkey: data.pubkey };
// if (data.content) {
// dialogData.relays = JSON.parse(data.content);
// dialogData.relaysCount = Object.keys(dialogData.relays).length;
// }
// this.openImportSheet(dialogData);
// }
// }
},
});
// Create the listeners (filters) for relays:
// TODO: There is limit on maximum following, we need a strategy to handle that.
// potentially subscribing and unsubscribing on intervals with a .since field between each interval.
const pubKeys = this.profileService.following.map((p) => p.pubkey);
// Add self to the top of listening list:
pubKeys.unshift(this.appState.getPublicKey());
console.log('PUB KEYS:', pubKeys);
// Subscribe to new events but don't get any history (limit: 0).
console.log('queueSubscription:', { authors: pubKeys, since: this.storage.state.since });
// this.relayService.queueSubscription([{ authors: pubKeys, since: this.storage.state.since }]);
this.relayService.subscribe([{ authors: pubKeys, since: this.storage.state.since }]);
}
async initialize() {
setTimeout(async () => {
await this.cleanProfiles();
@ -78,6 +145,9 @@ export class DataService {
throw Error(`This type of job (${job.type}) is currently not supported.`);
}
// Enque the job on all web workers.
this.relayService.action('enque', job);
// We always delay the processing in case we receive
setTimeout(() => {
this.processQueues();
@ -99,6 +169,7 @@ export class DataService {
return;
}
// Processing queues basically just copies the jobs from data service to the individual web workers.
if (this.queueService.queues.profile.jobs.length > 0) {
this.processProfileQueue();
}

View File

@ -6,7 +6,7 @@ import { Utilities } from './utilities';
import { StorageService } from './storage';
import { CacheService } from './cache';
import { dexieToRx } from '../shared/utilities';
import { QueueService } from './queue';
import { QueueService } from './queue.service';
import { UIService } from './ui';
@Injectable({

View File

@ -0,0 +1,57 @@
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { QueryJob } from './interfaces';
@Injectable({
providedIn: 'root',
})
export class QueueService {
#queuesChanged: BehaviorSubject<void> = new BehaviorSubject<void>(undefined);
get queues$(): Observable<void> {
return this.#queuesChanged.asObservable();
}
trigger() {
this.#queuesChanged.next();
}
enqueProfile(identifier: string, callback?: any) {
this.queues.profile.jobs.push({ identifier: identifier, type: 'Profile', callback: callback });
this.trigger();
}
enqueEvent(identifier: string, callback?: any, limit?: number) {
this.queues.event.jobs.push({ identifier: identifier, type: 'Event', callback: callback, limit: limit });
this.trigger();
}
enqueContacts(identifier: string, callback?: any) {
this.queues.contacts.jobs.push({ identifier: identifier, type: 'Contacts', callback: callback });
this.trigger();
}
enque(identifier: string, type: 'Profile' | 'Event' | 'Contacts', limit?: number) {
if (type === 'Profile') {
this.enqueProfile(identifier);
} else if (type === 'Event') {
this.enqueEvent(identifier, undefined, limit);
} else if (type === 'Contacts') {
this.enqueContacts(identifier);
}
}
queues = {
profile: {
active: false,
jobs: [] as QueryJob[],
},
event: {
active: false,
jobs: [] as QueryJob[],
},
contacts: {
active: false,
jobs: [] as QueryJob[],
},
};
}

View File

@ -1,33 +1,16 @@
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { QueryJob } from './interfaces';
@Injectable({
providedIn: 'root',
})
export class QueueService {
#queuesChanged: BehaviorSubject<void> = new BehaviorSubject<void>(undefined);
get queues$(): Observable<void> {
return this.#queuesChanged.asObservable();
}
trigger() {
this.#queuesChanged.next();
}
export class Queue {
enqueProfile(identifier: string, callback?: any) {
this.queues.profile.jobs.push({ identifier: identifier, type: 'Profile', callback: callback });
this.trigger();
}
enqueEvent(identifier: string, callback?: any, limit?: number) {
this.queues.event.jobs.push({ identifier: identifier, type: 'Event', callback: callback, limit: limit });
this.trigger();
}
enqueContacts(identifier: string, callback?: any) {
this.queues.contacts.jobs.push({ identifier: identifier, type: 'Contacts', callback: callback });
this.trigger();
}
enque(identifier: string, type: 'Profile' | 'Event' | 'Contacts', limit?: number) {

View File

@ -1,7 +1,7 @@
import { Injectable } from '@angular/core';
import { NostrEventDocument, NostrRelay, NostrRelayDocument, NostrRelaySubscription } from './interfaces';
import { NostrEventDocument, NostrRelay, NostrRelayDocument, NostrRelaySubscription, QueryJob } from './interfaces';
import { Observable, BehaviorSubject, from, merge, timeout, catchError, of, finalize, tap } from 'rxjs';
import { Filter, Relay, relayInit, Sub } from 'nostr-tools';
import { Filter, Kind, Relay, relayInit, Sub } from 'nostr-tools';
import { EventService } from './event';
import { OptionsService } from './options';
import { ApplicationState } from './applicationstate';
@ -9,6 +9,8 @@ import { CacheService } from './cache';
import { StorageService } from './storage';
import { RelayType } from '../types/relay';
import { RelayResponse } from './messages';
import { ProfileService } from './profile';
import { Utilities } from './utilities';
import { v4 as uuidv4 } from 'uuid';
@Injectable({
@ -59,7 +61,7 @@ export class RelayService {
return this.#relaysChanged.asObservable();
}
constructor(private db: StorageService, private options: OptionsService, private eventService: EventService, private appState: ApplicationState) {
constructor(private utilities: Utilities, private profileService: ProfileService, private db: StorageService, private options: OptionsService, private eventService: EventService, private appState: ApplicationState) {
// Whenever the visibility becomes visible, run connect to ensure we're connected to the relays.
this.appState.visibility$.subscribe((visible) => {
if (visible) {
@ -173,12 +175,28 @@ export class RelayService {
console.log('SAVE EVENT?:', event);
// If the event we received is from someone the user is following, always persist it if not already persisted.
if (event.pubkey === this.appState.getPublicKey()) {
await this.db.storage.putEvents(event);
if (event.kind == Kind.Metadata) {
// This is a profile event, store it.
const nostrProfileDocument = this.utilities.mapProfileEvent(event);
if (nostrProfileDocument) {
await this.profileService.updateProfile(nostrProfileDocument.pubkey, nostrProfileDocument);
}
} else if (event.kind == Kind.Contacts) {
// TODO: Implement the contacts handling.
} else {
// If the event we received is from someone the user is following, always persist it if not already persisted.
if (event.pubkey === this.appState.getPublicKey()) {
await this.db.storage.putEvents(event);
}
}
}
enque(job: QueryJob) {
// Enque the job on all web workers.
this.action('enque', job);
}
async handleRelayMessage(ev: MessageEvent, url: string) {
const response = ev.data as RelayResponse;
@ -572,6 +590,13 @@ export class RelayService {
return id;
}
action(action: string, data: any) {
for (let index = 0; index < this.workers.length; index++) {
const worker = this.workers[index];
worker.action(action, data);
}
}
unsubscribe(id: string) {
for (let index = 0; index < this.workers.length; index++) {
const worker = this.workers[index];

View File

@ -15,6 +15,8 @@ export class ImportSheet {
this.bottomSheetRef.dismiss();
event.preventDefault();
debugger;
if (this.data.relaysCount > 0) {
// Reset all existing default connections.
await this.relayService.deleteRelays();

View File

@ -1,5 +1,5 @@
import { Filter } from 'nostr-tools';
import { NostrRelaySubscription } from '../services/interfaces';
import { NostrRelaySubscription, QueryJob } from '../services/interfaces';
import { RelayRequest, RelayResponse } from '../services/messages';
/** Relay type that holds a connection to the Web Worker and abstracts calling different actions to the Web Worker. */
@ -30,6 +30,10 @@ export class RelayType {
this.action('terminate');
}
enque(data: QueryJob) {
this.action('enque', data);
}
publish(data: any) {
this.action('publish', data);
}

View File

@ -14,7 +14,7 @@ import { MatTabChangeEvent } from '@angular/material/tabs';
import { map, Observable, of, Subscription, tap, BehaviorSubject, finalize } from 'rxjs';
import { DataService } from '../services/data';
import { NotesService } from '../services/notes';
import { QueueService } from '../services/queue';
import { QueueService } from '../services/queue.service';
import { UIService } from '../services/ui';
import { StorageService } from '../services/storage';

View File

@ -1,8 +1,9 @@
/// <reference lib="webworker" />
import { Event, relayInit, Filter, Sub } from 'nostr-tools';
import { NostrRelay, NostrRelaySubscription, NostrSub } from '../services/interfaces';
import { NostrRelay, NostrRelaySubscription, NostrSub, QueryJob } from '../services/interfaces';
import { RelayRequest, RelayResponse } from '../services/messages';
import { Queue } from '../services/queue';
import { Storage } from '../types/storage';
let relayWorker: RelayWorker;
@ -77,6 +78,9 @@ addEventListener('message', async (ev: MessageEvent) => {
case 'publish':
await relayWorker.publish(request.data);
break;
case 'enque':
await relayWorker.enque(request.data);
break;
case 'subscribe':
await relayWorker.subscribe(request.data.filters, request.data.id);
break;
@ -98,6 +102,12 @@ addEventListener('message', async (ev: MessageEvent) => {
// postMessage(response);
});
function yieldToMain() {
return new Promise((resolve) => {
setTimeout(resolve, 0);
});
}
export class RelayWorker {
relay!: NostrRelay;
@ -106,7 +116,11 @@ export class RelayWorker {
/** These are the subscriptions the app has requested and manages. */
subscriptions: NostrRelaySubscription[] = [];
constructor(public url: string) {}
queue: Queue;
constructor(public url: string) {
this.queue = new Queue();
}
async publish(event: Event) {
let pub = this.relay.publish(event);
@ -121,6 +135,68 @@ export class RelayWorker {
});
}
/** Enques a job to be processed against connected relays. */
enque(job: QueryJob) {
// It is way more optimal to just delegate jobs into separate queues when enquing than querying later.
if (job.type == 'Profile') {
this.queue.queues.profile.jobs.push(job);
} else if (job.type == 'Contacts') {
this.queue.queues.contacts.jobs.push(job);
} else if (job.type == 'Event') {
this.queue.queues.event.jobs.push(job);
} else {
throw Error(`This type of job (${job.type}) is currently not supported.`);
}
this.processProfiles();
this.processContacts();
// We always delay the processing in case we receive more.
// setTimeout(() => {
// this.processQueues();
// }, 100);
}
processProfiles() {
if (this.queue.queues.profile.active) {
return;
}
if (this.queue.queues.profile.jobs.length == 0) {
this.queue.queues.profile.active = false;
return;
}
this.queue.queues.profile.active = true;
const job = this.queue.queues.profile.jobs.shift();
this.downloadProfile(job!.identifier, () => {
this.queue.queues.profile.active = false;
this.processProfiles();
});
}
processContacts() {
if (this.queue.queues.contacts.active) {
return;
}
if (this.queue.queues.contacts.jobs.length == 0) {
this.queue.queues.contacts.active = false;
return;
}
this.queue.queues.contacts.active = true;
const job = this.queue.queues.contacts.jobs.shift();
this.downloadContacts(job!.identifier, () => {
this.queue.queues.contacts.active = false;
this.processContacts();
});
}
processEvents() {}
async connect() {
// const relay = relayInit('wss://relay.nostr.info');
const relay = relayInit(this.url) as NostrRelay;
@ -199,6 +275,121 @@ export class RelayWorker {
// }
// }
profileSub?: NostrSub;
profileTimer?: any;
contactsSub?: NostrSub;
contactsTimer?: any;
clearProfileSub() {
this.profileSub?.unsub();
this.profileSub = undefined;
}
clearContactsSub() {
this.contactsSub?.unsub();
this.contactsSub = undefined;
}
downloadProfile(pubkey: string, finalized: any, timeoutSeconds: number = 3000) {
console.log('DOWNLOAD PROFILE....');
let finalizedCalled = false;
if (!this.relay) {
console.warn('This relay does not have active connection and download cannot be executed at this time.');
return;
}
// If the profilesub already exists, unsub and remove.
if (this.profileSub) {
this.clearProfileSub();
}
// Skip if the subscription is already added.
// if (this.subscriptions.findIndex((s) => s.id == id) > -1) {
// debugger;
// console.log('This subscription is already added!');
// return;
// }
const sub = this.relay.sub([{ kinds: [0], authors: [pubkey] }]) as NostrSub;
this.profileSub = sub;
// sub.id = id;
// console.log('SUBSCRIPTION:', sub);
// this.subscriptions.push({ id: id, filters: filters, sub: sub });
// const sub = relay.sub(filters, {}) as NostrSubscription;
// relay.subscriptions.push(sub);
sub.on('event', (originalEvent: any) => {
postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse);
this.clearProfileSub();
clearTimeout(this.profileTimer);
if (!finalizedCalled) {
finalizedCalled = true;
finalized();
}
// const event = this.eventService.processEvent(originalEvent);
// if (!event) {
// return;
// }
// observer.next(event);
});
// sub.on('eose', () => {
// clearTimeout(this.profileTimer);
// this.profileSub?.unsub();
// this.profileSub = undefined;
// });
this.profileTimer = setTimeout(() => {
this.clearProfileSub();
if (!finalizedCalled) {
finalizedCalled = true;
finalized();
}
}, timeoutSeconds * 1000);
}
downloadContacts(pubkey: string, finalized: any, timeoutSeconds: number = 3000) {
console.log('DOWNLOAD CONTACTS....');
let finalizedCalled = false;
if (!this.relay) {
console.warn('This relay does not have active connection and download cannot be executed at this time.');
return;
}
// If the profilesub already exists, unsub and remove.
if (this.contactsSub) {
this.clearContactsSub();
}
const sub = this.relay.sub([{ kinds: [3], authors: [pubkey] }]) as NostrSub;
this.contactsSub = sub;
sub.on('event', (originalEvent: any) => {
postMessage({ url: this.url, type: 'event', data: originalEvent } as RelayResponse);
this.clearContactsSub();
clearTimeout(this.contactsTimer);
if (!finalizedCalled) {
finalizedCalled = true;
finalized();
}
});
this.contactsTimer = setTimeout(() => {
this.clearContactsSub();
if (!finalizedCalled) {
finalizedCalled = true;
finalized();
}
}, timeoutSeconds * 1000);
}
subscribe(filters: Filter[], id: string) {
console.log('SUBSCRIBE....');