mirror of
https://github.com/block-core/blockcore-notes.git
synced 2024-09-29 06:20:42 +00:00
Add initial connection wait and add queue processing for relays
This commit is contained in:
parent
be15b12ba7
commit
84390c9bcc
@ -183,16 +183,18 @@ export class AppComponent {
|
||||
await this.dataService.initialize();
|
||||
|
||||
// Download the profile of the user.
|
||||
await this.dataService.downloadNewestProfiles([this.appState.getPublicKey()]).subscribe(async (profile) => {
|
||||
// TODO: Figure out why we get promises from this observable.
|
||||
const p = await profile;
|
||||
this.dataService.enque({ identifier: this.appState.getPublicKey(), type: 'Profile' });
|
||||
|
||||
if (!p) {
|
||||
return;
|
||||
}
|
||||
// .subscribe(async (profile) => {
|
||||
// // TODO: Figure out why we get promises from this observable.
|
||||
// const p = await profile;
|
||||
|
||||
await this.profileService.updateProfile(p.pubkey, p);
|
||||
});
|
||||
// if (!p) {
|
||||
// return;
|
||||
// }
|
||||
|
||||
// await this.profileService.updateProfile(p.pubkey, p);
|
||||
// });
|
||||
}
|
||||
|
||||
async ngOnInit() {
|
||||
|
@ -30,7 +30,7 @@ export class ConnectComponent {
|
||||
localStorage.setItem('blockcore:notes:nostr:consent', this.consent.toString());
|
||||
}
|
||||
|
||||
async connect() {
|
||||
async connect() {
|
||||
if (!this.consent) {
|
||||
const element = document.getElementById('consent-card');
|
||||
// document.body.scroll(0, 5000);
|
||||
|
@ -6,7 +6,9 @@
|
||||
<!-- <div *ngIf="profileService.item$ | async as profile">
|
||||
{{ profile.name }}
|
||||
</div> -->
|
||||
<div class="page">
|
||||
<app-profile-widget *ngFor="let pubkey of pubkeys" [pubkey]="pubkey"></app-profile-widget>
|
||||
<div class="page" *ngIf="ui.profile$ | async as profile">
|
||||
{{profile.name}}
|
||||
|
||||
<app-profile-widget *ngFor="let pubkey of profile.following" [pubkey]="pubkey"></app-profile-widget>
|
||||
</div>
|
||||
</mat-tab-nav-panel>
|
||||
|
@ -6,6 +6,8 @@ import { ApplicationState } from '../services/applicationstate';
|
||||
import { DataService } from '../services/data';
|
||||
import { NostrEventDocument, NostrProfileDocument } from '../services/interfaces';
|
||||
import { ProfileService } from '../services/profile';
|
||||
import { QueueService } from '../services/queue';
|
||||
import { UIService } from '../services/ui';
|
||||
|
||||
@Component({
|
||||
selector: 'app-following',
|
||||
@ -15,35 +17,37 @@ import { ProfileService } from '../services/profile';
|
||||
})
|
||||
export class FollowingComponent {
|
||||
pubkey?: string;
|
||||
|
||||
subscriptions: Subscription[] = [];
|
||||
|
||||
pubkeys?: string[] = [];
|
||||
|
||||
constructor(private appState: ApplicationState, private dataService: DataService, public profileService: ProfileService, private activatedRoute: ActivatedRoute, private router: Router) {}
|
||||
constructor(public ui: UIService, private appState: ApplicationState, private dataService: DataService, public profileService: ProfileService, private activatedRoute: ActivatedRoute, private router: Router) {}
|
||||
|
||||
ngOnInit() {
|
||||
this.appState.showBackButton = true;
|
||||
|
||||
this.subscriptions.push(
|
||||
this.profileService.item$.subscribe((profile) => {
|
||||
this.ui.profile$.subscribe((profile) => {
|
||||
if (!profile) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.appState.title = `@${profile.name}`;
|
||||
this.appState.updateTitle(profile.name);
|
||||
|
||||
if (profile.following) {
|
||||
this.pubkeys = profile.following;
|
||||
} else {
|
||||
this.downloadFollowingAndRelays(profile);
|
||||
}
|
||||
// If there are no following calculated on the profile, attempt to get it.
|
||||
// if (profile.following) {
|
||||
this.dataService.enque({ type: 'Contacts', identifier: profile.pubkey });
|
||||
// this.downloadFollowingAndRelays(profile);
|
||||
// }
|
||||
})
|
||||
);
|
||||
|
||||
this.subscriptions.push(
|
||||
this.activatedRoute.paramMap.subscribe(async (params) => {
|
||||
const pubkey: any = params.get('id');
|
||||
|
||||
if (pubkey) {
|
||||
this.ui.setPubKey(pubkey);
|
||||
}
|
||||
|
||||
this.pubkey = pubkey;
|
||||
this.profileService.setItemByPubKey(pubkey);
|
||||
this.appState.backUrl = '/p/' + this.pubkey;
|
||||
@ -57,15 +61,15 @@ export class FollowingComponent {
|
||||
}
|
||||
}
|
||||
|
||||
downloadFollowingAndRelays(profile: NostrProfileDocument) {
|
||||
this.subscriptions.push(
|
||||
this.dataService.downloadNewestContactsEvents([profile.pubkey]).subscribe((event) => {
|
||||
const nostrEvent = event as NostrEventDocument;
|
||||
const publicKeys = nostrEvent.tags.map((t) => t[1]);
|
||||
// downloadFollowingAndRelays(profile: NostrProfileDocument) {
|
||||
// this.subscriptions.push(
|
||||
// this.dataService.downloadNewestContactsEvents([profile.pubkey]).subscribe((event) => {
|
||||
// const nostrEvent = event as NostrEventDocument;
|
||||
// const publicKeys = nostrEvent.tags.map((t) => t[1]);
|
||||
|
||||
this.profileService.following(profile.pubkey, publicKeys);
|
||||
this.pubkeys = publicKeys;
|
||||
})
|
||||
);
|
||||
}
|
||||
// this.profileService.following(profile.pubkey, publicKeys);
|
||||
// this.pubkeys = publicKeys;
|
||||
// })
|
||||
// );
|
||||
// }
|
||||
}
|
||||
|
@ -150,24 +150,30 @@ export class HomeComponent {
|
||||
'edcd20558f17d99327d841e4582f9b006331ac4010806efa020ef0d40078e6da',
|
||||
];
|
||||
|
||||
const observable = this.dataService.downloadNewestProfiles(array).subscribe((profile) => {
|
||||
console.log('PROFILE RECEIVED:', profile);
|
||||
|
||||
// let doc = profile as NostrEventDocument;
|
||||
// const index = array.findIndex((a) => a == doc.pubkey);
|
||||
|
||||
// if (index > -1) {
|
||||
// array.splice(index, 1);
|
||||
// }
|
||||
|
||||
// if (array.length === 0) {
|
||||
// console.log('FOUND ALL!!!!');
|
||||
// }
|
||||
array.map((pubkey) => {
|
||||
this.dataService.enque({ identifier: pubkey, type: 'Profile' });
|
||||
});
|
||||
|
||||
setInterval(() => {
|
||||
console.log('observable.closed:', observable.closed);
|
||||
}, 250);
|
||||
// const observable = this.dataService.downloadNewestProfiles(array);
|
||||
|
||||
// .subscribe((profile) => {
|
||||
// console.log('PROFILE RECEIVED:', profile);
|
||||
|
||||
// // let doc = profile as NostrEventDocument;
|
||||
// // const index = array.findIndex((a) => a == doc.pubkey);
|
||||
|
||||
// // if (index > -1) {
|
||||
// // array.splice(index, 1);
|
||||
// // }
|
||||
|
||||
// // if (array.length === 0) {
|
||||
// // console.log('FOUND ALL!!!!');
|
||||
// // }
|
||||
// });
|
||||
|
||||
// setInterval(() => {
|
||||
// console.log('observable.closed:', observable.closed);
|
||||
// }, 250);
|
||||
}
|
||||
|
||||
downloadProfiles2() {
|
||||
|
@ -71,10 +71,12 @@ export class ApplicationState {
|
||||
|
||||
connectedChanged: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(this.connected);
|
||||
|
||||
/** This will check if status has changed and trigger. If status is the same, the observable is not triggered. */
|
||||
updateConnectionStatus(status: boolean) {
|
||||
this.connected = status;
|
||||
this.connectedChanged.next(status);
|
||||
// }
|
||||
if (this.connected != status) {
|
||||
this.connected = status;
|
||||
this.connectedChanged.next(status);
|
||||
}
|
||||
}
|
||||
|
||||
visibility$: Observable<boolean>;
|
||||
|
@ -43,7 +43,6 @@ export class AuthenticationService {
|
||||
}
|
||||
|
||||
const publicKey = readOnlyKey || '354faab36ca511a7956f0bfc2b64e06fe5395cd7208d9b65d6665270298743d8';
|
||||
debugger;
|
||||
const user = this.createUser(publicKey);
|
||||
localStorage.setItem('blockcore:notes:nostr:pubkey', publicKey);
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { NostrEvent, NostrEventDocument, NostrProfileDocument, NostrRelay, NostrSubscription } from './interfaces';
|
||||
import { NostrEvent, NostrEventDocument, NostrProfileDocument, NostrRelay, NostrSubscription, QueryJob } from './interfaces';
|
||||
import { ProfileService } from './profile';
|
||||
import * as moment from 'moment';
|
||||
import { EventService } from './event';
|
||||
@ -7,8 +7,10 @@ import { RelayService } from './relay';
|
||||
import { Filter, Relay, Event, getEventHash, validateEvent, verifySignature } from 'nostr-tools';
|
||||
import { DataValidation } from './data-validation';
|
||||
import { ApplicationState } from './applicationstate';
|
||||
import { timeout, map, merge, Observable, Observer, race, take, switchMap, mergeMap, tap, finalize, concatMap, mergeAll, exhaustMap, catchError, of, combineAll, combineLatestAll, filter } from 'rxjs';
|
||||
import { timeout, map, merge, Observable, delay, Observer, race, take, switchMap, mergeMap, tap, finalize, concatMap, mergeAll, exhaustMap, catchError, of, combineAll, combineLatestAll, filter, from } from 'rxjs';
|
||||
import { Utilities } from './utilities';
|
||||
import { StorageService } from './storage';
|
||||
import { QueueService } from './queue';
|
||||
|
||||
@Injectable({
|
||||
providedIn: 'root',
|
||||
@ -20,8 +22,31 @@ export class DataService {
|
||||
// downloadProfileInterval = 500;
|
||||
profileBatchSize = 20;
|
||||
refreshUserProfile = 1000 * 60 * 60 * 2; // Every second hour
|
||||
connected = false;
|
||||
|
||||
// Observable that can be merged with to avoid performing calls unless we have connected to relays.
|
||||
connected$ = this.appState.connected$.pipe(map((status) => status === true));
|
||||
|
||||
constructor(
|
||||
private queueService: QueueService,
|
||||
private profileService: ProfileService,
|
||||
private appState: ApplicationState,
|
||||
private utilities: Utilities,
|
||||
private validator: DataValidation,
|
||||
private eventService: EventService,
|
||||
private relayService: RelayService
|
||||
) {
|
||||
// We use a global observable for the connected state to avoid having many subscriptions and we'll skip processing until this is true.
|
||||
this.appState.connected$.subscribe((connected) => {
|
||||
console.log('Connection state changed: ', connected);
|
||||
this.connected = connected;
|
||||
|
||||
if (this.connected) {
|
||||
console.log('Connection established, start processing queues.');
|
||||
this.processQueues();
|
||||
}
|
||||
});
|
||||
|
||||
constructor(private appState: ApplicationState, private utilities: Utilities, private validator: DataValidation, private eventService: EventService, private relayService: RelayService) {
|
||||
// Whenever the profile service needs to get a profile from the network, this event is triggered.
|
||||
// this.profileService.profileRequested$.subscribe(async (pubkey) => {
|
||||
// if (!pubkey) {
|
||||
@ -65,6 +90,128 @@ export class DataService {
|
||||
|
||||
isFetching = false;
|
||||
profileQueue: string[] = [];
|
||||
queue: QueryJob[] = [];
|
||||
|
||||
/** Enques a job to be processed against connected relays. */
|
||||
enque(job: QueryJob) {
|
||||
// It is way more optimal to just delegate jobs into separate queues when enquing than querying later.
|
||||
if (job.type == 'Profile') {
|
||||
this.queueService.queues.profile.jobs.push(job);
|
||||
} else if (job.type == 'Contacts') {
|
||||
this.queueService.queues.contacts.jobs.push(job);
|
||||
} else if (job.type == 'Event') {
|
||||
this.queueService.queues.event.jobs.push(job);
|
||||
} else {
|
||||
throw Error(`This type of job (${job.type}) is currently not supported.`);
|
||||
}
|
||||
|
||||
// We always delay the processing in case we receive
|
||||
setTimeout(() => {
|
||||
this.processQueues();
|
||||
}, 100);
|
||||
}
|
||||
|
||||
/**
|
||||
* Since most relays limits at 10 we must ensure we don't go above that.
|
||||
* 1 reserved for profile retrieval and we queue up maximum 50 on each in batches.
|
||||
* 1 reserved for live public feed.
|
||||
* 1 reserved for current profile feed.
|
||||
* 1 reserved for logged on user feed.
|
||||
* ...
|
||||
*/
|
||||
|
||||
processQueues() {
|
||||
if (!this.connected) {
|
||||
console.warn('Cannot process queues, no connection to relays.');
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.queueService.queues.profile.jobs.length > 0) {
|
||||
this.processProfileQueue();
|
||||
}
|
||||
|
||||
if (this.queueService.queues.contacts.jobs.length > 0) {
|
||||
this.processContactsQueue();
|
||||
}
|
||||
|
||||
if (this.queueService.queues.event.jobs.length > 0) {
|
||||
this.processEventQueue();
|
||||
}
|
||||
}
|
||||
|
||||
processEventQueue() {}
|
||||
|
||||
processProfileQueue() {
|
||||
console.log('processProfileQueue');
|
||||
// If already active, just skip processing for now.
|
||||
if (this.queueService.queues.profile.active) {
|
||||
console.log('processProfileQueue: skip');
|
||||
return;
|
||||
}
|
||||
|
||||
// Grab a batch of jobs.
|
||||
const jobs = this.queueService.queues.profile.jobs.splice(0, 50);
|
||||
const pubkeys = jobs.map((j) => j.identifier);
|
||||
|
||||
console.log('processProfileQueue: pubkeys', pubkeys);
|
||||
|
||||
// Download the profiles that was queued up.
|
||||
this.downloadNewestProfiles(pubkeys, 10000, pubkeys.length).subscribe(async (event) => {
|
||||
// const e = await event;
|
||||
console.log('processProfileQueue: event', event);
|
||||
|
||||
if (!event) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Make sure we run update and not put whenever we download the latest profile.
|
||||
await this.profileService.updateProfile(event.pubkey, event);
|
||||
});
|
||||
}
|
||||
|
||||
processContactsQueue() {
|
||||
console.log('processContactsQueue');
|
||||
// If already active, just skip processing for now.
|
||||
if (this.queueService.queues.contacts.active) {
|
||||
console.log('processContactsQueue: skip');
|
||||
return;
|
||||
}
|
||||
|
||||
this.queueService.queues.contacts.active = true;
|
||||
|
||||
// Grab a batch of jobs.
|
||||
const jobs = this.queueService.queues.contacts.jobs.splice(0, 50);
|
||||
const pubkeys = jobs.map((j) => j.identifier);
|
||||
|
||||
console.log('processContactsQueue: pubkeys', pubkeys);
|
||||
|
||||
// Use a dynamic timeout depending on the number of pubkeys requested.
|
||||
// const timeout = pubkeys.length * 1000;
|
||||
const timeout = pubkeys.length < 10 ? 10000 : 20000;
|
||||
|
||||
// Download the profiles that was queued up.
|
||||
this.downloadNewestContactsEvents(pubkeys, timeout)
|
||||
.pipe(
|
||||
finalize(() => {
|
||||
this.queueService.queues.contacts.active = false;
|
||||
console.log('processContactsQueue: completed');
|
||||
})
|
||||
)
|
||||
.subscribe(async (event) => {
|
||||
console.log('processContactsQueue: event', event);
|
||||
|
||||
if (!event) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Whenever we download the contacts document, we'll refresh the RELAYS and FOLLOWING
|
||||
// on the profile in question.
|
||||
const following = event.tags.map((t) => t[1]);
|
||||
|
||||
// Make sure we run update and not put whenever we download the latest profile.
|
||||
this.profileService.followingAndRelays(event.pubkey, following, event.content);
|
||||
});
|
||||
}
|
||||
|
||||
processProfilesQueue() {
|
||||
// console.log('processProfilesQueue', this.isFetching);
|
||||
@ -86,13 +233,14 @@ export class DataService {
|
||||
}
|
||||
}
|
||||
|
||||
// Observable that can be merged with to avoid performing calls unless we have connected to relays.
|
||||
connected$ = this.appState.connected$.pipe(map((status) => status === true));
|
||||
|
||||
/** Creates an observable that will attempt to get newest profile entry across all relays and perform multiple callbacks if newer is found. */
|
||||
downloadNewestProfiles(pubkeys: string[], requestTimeout = 10000) {
|
||||
return this.downloadNewestProfileEvents(pubkeys).pipe(
|
||||
map(async (event: any) => {
|
||||
downloadNewestProfiles(pubkeys: string[], requestTimeout = 10000, expectedCount = -1) {
|
||||
return this.downloadNewestProfileEvents(pubkeys, requestTimeout, expectedCount).pipe(
|
||||
map((event: any) => {
|
||||
if (!event) {
|
||||
return;
|
||||
}
|
||||
|
||||
const profile = this.utilities.mapProfileEvent(event);
|
||||
return profile;
|
||||
})
|
||||
@ -100,16 +248,16 @@ export class DataService {
|
||||
}
|
||||
|
||||
/** Creates an observable that will attempt to get newest profile events across all relays and perform multiple callbacks if newer is found. */
|
||||
downloadNewestProfileEvents(pubkeys: string[], requestTimeout = 10000) {
|
||||
return this.downloadNewestEvents(pubkeys, [0], requestTimeout);
|
||||
downloadNewestProfileEvents(pubkeys: string[], requestTimeout = 10000, expectedCount = -1) {
|
||||
return this.downloadNewestEvents(pubkeys, [0], requestTimeout, expectedCount);
|
||||
}
|
||||
|
||||
downloadNewestContactsEvents(pubkeys: string[], requestTimeout = 10000) {
|
||||
return this.downloadNewestEvents(pubkeys, [3], requestTimeout);
|
||||
downloadNewestContactsEvents(pubkeys: string[], requestTimeout = 10000, expectedEventCount = -1) {
|
||||
return this.downloadNewestEvents(pubkeys, [3], requestTimeout, expectedEventCount);
|
||||
}
|
||||
|
||||
downloadNewestEvents(pubkeys: string[], kinds: number[], requestTimeout = 10000) {
|
||||
return this.downloadNewestEventsByQuery([{ kinds: kinds, authors: pubkeys }]);
|
||||
downloadNewestEvents(pubkeys: string[], kinds: number[], requestTimeout = 10000, expectedEventCount = -1) {
|
||||
return this.downloadNewestEventsByQuery([{ kinds: kinds, authors: pubkeys }], requestTimeout, expectedEventCount);
|
||||
}
|
||||
|
||||
downloadEventsByTags(query: any[], requestTimeout = 10000) {
|
||||
@ -124,23 +272,20 @@ export class DataService {
|
||||
downloadEventByQuery(query: any, requestTimeout = 10000) {
|
||||
let event: any;
|
||||
|
||||
return (
|
||||
this.connected$
|
||||
// .pipe(take(1))
|
||||
.pipe(mergeMap(() => this.relayService.connectedRelays())) // TODO: Time this, it appears to take a lot of time??
|
||||
.pipe(mergeMap((relay) => this.downloadFromRelay(query, relay)))
|
||||
.pipe(
|
||||
filter((data) => {
|
||||
// Only trigger the reply once.
|
||||
if (!event) {
|
||||
event = data;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
})
|
||||
)
|
||||
);
|
||||
return this.connected$
|
||||
.pipe(mergeMap(() => this.relayService.connectedRelays())) // TODO: Time this, it appears to take a lot of time??
|
||||
.pipe(mergeMap((relay) => this.downloadFromRelay(query, relay)))
|
||||
.pipe(
|
||||
filter((data) => {
|
||||
// Only trigger the reply once.
|
||||
if (!event) {
|
||||
event = data;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
})
|
||||
);
|
||||
// .pipe(
|
||||
// timeout(requestTimeout),
|
||||
// catchError((error) => of(`The query timed out before it could complete: ${JSON.stringify(query)}.`))
|
||||
@ -148,41 +293,51 @@ export class DataService {
|
||||
}
|
||||
|
||||
/** Creates an observable that will attempt to get newest events across all relays and perform multiple callbacks if newer is found. */
|
||||
downloadNewestEventsByQuery(query: any, requestTimeout = 10000) {
|
||||
downloadNewestEventsByQuery(query: any, requestTimeout = 10000, expectedEventCount = -1) {
|
||||
// TODO: Tune the timeout. There is no point waiting for too long if the relay is overwhelmed with requests as we will simply build up massive backpressure in the client.
|
||||
const totalEvents: NostrEventDocument[] = [];
|
||||
// TODO: Figure out if we end up having memory leak with this totalEvents array.
|
||||
const observables = this.relayService.connectedRelays().map((relay) => this.downloadFromRelay(query, relay));
|
||||
|
||||
return (
|
||||
this.connected$
|
||||
// .pipe(take(1))
|
||||
.pipe(mergeMap(() => this.relayService.connectedRelays())) // TODO: Time this, it appears to take a lot of time??
|
||||
.pipe(mergeMap((relay) => this.downloadFromRelay(query, relay)))
|
||||
.pipe(
|
||||
filter((data) => {
|
||||
// This logic is to ensure we don't care about receiving the same data more than once, unless the data is newer.
|
||||
const existingEventIndex = totalEvents.findIndex((e) => e.id === data.id);
|
||||
if (existingEventIndex > -1) {
|
||||
const existingEvent = totalEvents[existingEventIndex];
|
||||
return merge(...observables)
|
||||
.pipe(
|
||||
finalize(() => {
|
||||
console.log('FINALIZED THE DOWNLOAD FROM ALL RELAYS!!');
|
||||
})
|
||||
)
|
||||
.pipe(
|
||||
filter((data, index) => {
|
||||
let result = false;
|
||||
|
||||
// Verify if newer, then replace
|
||||
if (existingEvent.created_at < data.created_at) {
|
||||
totalEvents[existingEventIndex] = data;
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
totalEvents.push(data);
|
||||
return true;
|
||||
// This logic is to ensure we don't care about receiving the same data more than once, unless the data is newer.
|
||||
const existingEventIndex = totalEvents.findIndex((e) => e.id === data.id);
|
||||
if (existingEventIndex > -1) {
|
||||
const existingEvent = totalEvents[existingEventIndex];
|
||||
|
||||
// Verify if newer, then replace
|
||||
if (existingEvent.created_at < data.created_at) {
|
||||
totalEvents[existingEventIndex] = data;
|
||||
result = true;
|
||||
}
|
||||
} else {
|
||||
totalEvents.push(data);
|
||||
result = true;
|
||||
}
|
||||
|
||||
return false;
|
||||
})
|
||||
)
|
||||
);
|
||||
// .pipe(
|
||||
// timeout(requestTimeout),
|
||||
// catchError((error) => of(`The query timed out before it could complete: ${JSON.stringify(query)}.`))
|
||||
// );
|
||||
if (expectedEventCount > -1 && expectedEventCount != 0) {
|
||||
expectedEventCount--;
|
||||
}
|
||||
|
||||
return result;
|
||||
})
|
||||
)
|
||||
.pipe(
|
||||
timeout(requestTimeout),
|
||||
catchError((error) => {
|
||||
console.warn('The observable was timed out.');
|
||||
return of(undefined);
|
||||
}) // Simply return undefined when the timeout is reached.
|
||||
);
|
||||
}
|
||||
|
||||
downloadEventsByQuery(query: any[], requestTimeout = 10000) {
|
||||
@ -225,9 +380,9 @@ export class DataService {
|
||||
console.log('subscribeToRelay:finished:unsub');
|
||||
// When the observable is finished, this return function is called.
|
||||
sub.unsub();
|
||||
const subIndex = relay.subscriptions.findIndex(s => s==sub)
|
||||
if (subIndex > -1){
|
||||
relay.subscriptions.splice (subIndex, 1)
|
||||
const subIndex = relay.subscriptions.findIndex((s) => s == sub);
|
||||
if (subIndex > -1) {
|
||||
relay.subscriptions.splice(subIndex, 1);
|
||||
}
|
||||
};
|
||||
});
|
||||
|
@ -16,6 +16,14 @@ export interface Contact {
|
||||
name?: string;
|
||||
}
|
||||
|
||||
export interface QueryJob {
|
||||
type: 'Profile' | 'Event' | 'Contacts';
|
||||
identifier: string;
|
||||
// id?: string;
|
||||
// pubkeys: string[]; // Using array instead of singular job makes everything harder.
|
||||
// ids: string[];
|
||||
}
|
||||
|
||||
export interface Action {
|
||||
tooltip: string;
|
||||
icon: string;
|
||||
@ -46,7 +54,7 @@ export interface NostrRelay extends Relay {
|
||||
// nip11: any;
|
||||
// error: string;
|
||||
metadata: NostrRelayDocument;
|
||||
subscriptions: Sub [];
|
||||
subscriptions: Sub[];
|
||||
}
|
||||
|
||||
export interface NostrRelayDocument {
|
||||
@ -150,6 +158,8 @@ export interface NostrProfileDocument extends NostrProfile {
|
||||
created_at?: number;
|
||||
|
||||
following?: string[];
|
||||
|
||||
relays?: any;
|
||||
}
|
||||
|
||||
export interface CircleStyle {
|
||||
|
@ -8,7 +8,6 @@ import { StorageService } from './storage';
|
||||
import { liveQuery } from 'dexie';
|
||||
import { CacheService } from './cache';
|
||||
import { FetchService } from './fetch';
|
||||
import { DataService } from './data';
|
||||
import { dexieToRx } from '../shared/utilities';
|
||||
|
||||
@Injectable({
|
||||
@ -192,7 +191,7 @@ export class ProfileService {
|
||||
this.#profilesChangedSubject.next(undefined);
|
||||
}
|
||||
|
||||
constructor(private db: StorageService, private dataService: DataService, private fetchService: FetchService, private appState: ApplicationState, private utilities: Utilities) {}
|
||||
constructor(private db: StorageService, private fetchService: FetchService, private appState: ApplicationState, private utilities: Utilities) {}
|
||||
|
||||
// async downloadProfile(pubkey: string) {
|
||||
// this.#profileRequested.next(pubkey);
|
||||
@ -231,17 +230,17 @@ export class ProfileService {
|
||||
return;
|
||||
}
|
||||
|
||||
this.dataService.downloadNewestProfiles([pubkey]).subscribe(async (profile) => {
|
||||
// TODO: Figure out why we get Promise back here and not the time. No time to debug anymore!
|
||||
const p = await profile;
|
||||
// this.dataService.downloadNewestProfiles([pubkey]).subscribe(async (profile) => {
|
||||
// // TODO: Figure out why we get Promise back here and not the time. No time to debug anymore!
|
||||
// const p = await profile;
|
||||
|
||||
if (p) {
|
||||
this.updateProfile(p.pubkey, p);
|
||||
} else {
|
||||
console.log('NULL PROFILE!!');
|
||||
debugger;
|
||||
}
|
||||
});
|
||||
// if (p) {
|
||||
// this.updateProfile(p.pubkey, p);
|
||||
// } else {
|
||||
// console.log('NULL PROFILE!!');
|
||||
// debugger;
|
||||
// }
|
||||
// });
|
||||
})
|
||||
.catch((err) => {
|
||||
debugger;
|
||||
@ -460,19 +459,19 @@ export class ProfileService {
|
||||
await this.table.put(existingProfile);
|
||||
|
||||
// Now retrieve this profile
|
||||
this.dataService.downloadNewestProfiles([pubkey]).subscribe(async (profile) => {
|
||||
// TODO: Figure out why we get Promise back here and not the time. No time to debug anymore!
|
||||
const p = await profile;
|
||||
// this.dataService.downloadNewestProfiles([pubkey]).subscribe(async (profile) => {
|
||||
// // TODO: Figure out why we get Promise back here and not the time. No time to debug anymore!
|
||||
// const p = await profile;
|
||||
|
||||
console.log('Downloaded profile: ', p);
|
||||
// console.log('Downloaded profile: ', p);
|
||||
|
||||
if (p) {
|
||||
this.updateProfile(p.pubkey, p);
|
||||
} else {
|
||||
console.log('NULL PROFILE!!');
|
||||
debugger;
|
||||
}
|
||||
});
|
||||
// if (p) {
|
||||
// this.updateProfile(p.pubkey, p);
|
||||
// } else {
|
||||
// console.log('NULL PROFILE!!');
|
||||
// debugger;
|
||||
// }
|
||||
// });
|
||||
} else {
|
||||
profile.status = ProfileStatus.Follow;
|
||||
profile.modified = now;
|
||||
@ -491,6 +490,14 @@ export class ProfileService {
|
||||
});
|
||||
}
|
||||
|
||||
async followingAndRelays(pubkey: string, following: string[], relays: any) {
|
||||
return this.#updateProfileValues(pubkey, (profile) => {
|
||||
profile.following = following;
|
||||
profile.relays = relays;
|
||||
return profile;
|
||||
});
|
||||
}
|
||||
|
||||
async setCircle(pubkey: string, circle?: number) {
|
||||
return this.#updateProfileValues(pubkey, (profile) => {
|
||||
profile.circle = circle;
|
||||
|
21
src/app/services/queue.ts
Normal file
21
src/app/services/queue.ts
Normal file
@ -0,0 +1,21 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { QueryJob } from './interfaces';
|
||||
@Injectable({
|
||||
providedIn: 'root',
|
||||
})
|
||||
export class QueueService {
|
||||
queues = {
|
||||
profile: {
|
||||
active: false,
|
||||
jobs: [] as QueryJob[],
|
||||
},
|
||||
event: {
|
||||
active: false,
|
||||
jobs: [] as QueryJob[],
|
||||
},
|
||||
contacts: {
|
||||
active: false,
|
||||
jobs: [] as QueryJob[],
|
||||
},
|
||||
};
|
||||
}
|
@ -1,14 +1,15 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { NostrEventDocument, NostrRelay, NostrRelayDocument } from './interfaces';
|
||||
import { Observable, BehaviorSubject, from } from 'rxjs';
|
||||
import { Observable, BehaviorSubject, from, merge, timeout, catchError, of, finalize, tap } from 'rxjs';
|
||||
import { Relay, relayInit, Sub } from 'nostr-tools';
|
||||
import { EventService } from './event';
|
||||
import { OptionsService } from './options';
|
||||
import { ApplicationState } from './applicationstate';
|
||||
import { CacheService } from './cache';
|
||||
import { liveQuery } from 'dexie';
|
||||
import { liveQuery, Subscription } from 'dexie';
|
||||
import { StorageService } from './storage';
|
||||
import { dexieToRx } from '../shared/utilities';
|
||||
import { mergeNsAndName } from '@angular/compiler';
|
||||
|
||||
@Injectable({
|
||||
providedIn: 'root',
|
||||
@ -384,6 +385,9 @@ export class RelayService {
|
||||
|
||||
async connect() {
|
||||
const items = await this.table.toArray();
|
||||
let relayCountCountdown = items.length;
|
||||
|
||||
const observables = [];
|
||||
|
||||
for (var i = 0; i < items.length; i++) {
|
||||
const entry = items[i];
|
||||
@ -394,8 +398,29 @@ export class RelayService {
|
||||
continue;
|
||||
}
|
||||
|
||||
this.openConnection(entry);
|
||||
observables.push(this.openConnection(entry));
|
||||
}
|
||||
|
||||
let timer: any;
|
||||
|
||||
merge(...observables).subscribe(() => {
|
||||
// As we receive an initial connection, let's create a new observable that will trigger the connection status
|
||||
// update either when everything is connected or a timeout is reached.
|
||||
relayCountCountdown--;
|
||||
|
||||
// If we reach zero, update the status immediately.
|
||||
if (relayCountCountdown == 0) {
|
||||
clearTimeout(timer);
|
||||
this.appState.updateConnectionStatus(true);
|
||||
}
|
||||
|
||||
if (!timer) {
|
||||
// Wait a maximum of 3 seconds for all connections to finish.
|
||||
timer = setTimeout(() => {
|
||||
this.appState.updateConnectionStatus(true);
|
||||
}, 3000);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async reset() {
|
||||
@ -425,7 +450,7 @@ export class RelayService {
|
||||
|
||||
// const relay = relayInit('wss://relay.nostr.info');
|
||||
const relay = relayInit(server.url) as NostrRelay;
|
||||
relay.subscriptions=[];
|
||||
relay.subscriptions = [];
|
||||
|
||||
relay.on('connect', () => {
|
||||
// console.log(`connected to ${relay?.url}`);
|
||||
@ -435,7 +460,7 @@ export class RelayService {
|
||||
|
||||
relay.on('disconnect', () => {
|
||||
console.log(`DISCONNECTED! ${relay?.url}`);
|
||||
relay.subscriptions=[];
|
||||
relay.subscriptions = [];
|
||||
});
|
||||
|
||||
relay.on('notice', (msg: any) => {
|
||||
@ -458,13 +483,18 @@ export class RelayService {
|
||||
}
|
||||
|
||||
openConnection(server: NostrRelayDocument) {
|
||||
this.#connectToRelay(server, (relay: Relay) => {
|
||||
console.log('Connected to:', relay.url);
|
||||
return new Observable((observer) => {
|
||||
this.#connectToRelay(server, (relay: Relay) => {
|
||||
console.log('Connected to:', relay.url);
|
||||
|
||||
// When finished, trigger an observable that we are connected.
|
||||
this.appState.updateConnectionStatus(true);
|
||||
// Put the connected relay into the array together with the metadata.
|
||||
this.relays.push(relay as NostrRelay);
|
||||
|
||||
// this.subscribeToFollowing(relay);
|
||||
observer.next(true);
|
||||
observer.complete();
|
||||
|
||||
// this.subscribeToFollowing(relay);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
62
src/app/services/ui.ts
Normal file
62
src/app/services/ui.ts
Normal file
@ -0,0 +1,62 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { ActivatedRoute } from '@angular/router';
|
||||
import { BehaviorSubject, Observable } from 'rxjs';
|
||||
import { NostrProfileDocument } from './interfaces';
|
||||
import { ProfileService } from './profile';
|
||||
|
||||
@Injectable({
|
||||
providedIn: 'root',
|
||||
})
|
||||
export class UIService {
|
||||
constructor(private profileService: ProfileService) {
|
||||
this.pubkey$.subscribe((pubkey) => {
|
||||
if (!pubkey) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.profileService.getProfile(pubkey).subscribe((profile) => {
|
||||
this.#profile = profile;
|
||||
this.#profileChanged.next(this.#profile);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#pubkey: string | undefined = undefined;
|
||||
|
||||
get pubkey() {
|
||||
return this.#pubkey;
|
||||
}
|
||||
|
||||
#pubkeyChanged: BehaviorSubject<string | undefined> = new BehaviorSubject<string | undefined>(this.pubkey);
|
||||
|
||||
get pubkey$(): Observable<string | undefined> {
|
||||
return this.#pubkeyChanged.asObservable();
|
||||
}
|
||||
|
||||
setPubKey(pubkey: string) {
|
||||
this.#pubkey = pubkey;
|
||||
this.#pubkeyChanged.next(this.#pubkey);
|
||||
}
|
||||
|
||||
#profile: NostrProfileDocument | undefined = undefined;
|
||||
|
||||
get profile() {
|
||||
return this.#profile;
|
||||
}
|
||||
|
||||
#profileChanged: BehaviorSubject<NostrProfileDocument | undefined> = new BehaviorSubject<NostrProfileDocument | undefined>(this.profile);
|
||||
|
||||
get profile$(): Observable<NostrProfileDocument | undefined> {
|
||||
return this.#profileChanged.asObservable();
|
||||
}
|
||||
|
||||
// get profile$(): Observable<NostrProfileDocument | undefined> {
|
||||
// return this.#pubkeyChanged.pipe((pubkey) => {
|
||||
// if (!pubkey) {
|
||||
// return null;
|
||||
// }
|
||||
|
||||
// return this.profileService.getProfile(pubkey);
|
||||
// });
|
||||
// }
|
||||
}
|
@ -73,6 +73,7 @@ export class Utilities {
|
||||
const jsonParsed = JSON.parse(event.content) as NostrProfileDocument;
|
||||
const profile = this.validator.sanitizeProfile(jsonParsed) as NostrProfileDocument;
|
||||
profile.pubkey = event.pubkey;
|
||||
profile.created_at = event.created_at;
|
||||
return profile;
|
||||
} catch (err) {
|
||||
debugger;
|
||||
|
@ -90,4 +90,12 @@
|
||||
<mat-icon>content_copy</mat-icon>
|
||||
<span>Profile Data</span>
|
||||
</button>
|
||||
<button mat-menu-item *ngIf="this.profile?.following" (click)="copyFollowing()">
|
||||
<mat-icon>content_copy</mat-icon>
|
||||
<span>Following List</span>
|
||||
</button>
|
||||
<button mat-menu-item *ngIf="this.profile?.relays" (click)="copyRelays()">
|
||||
<mat-icon>content_copy</mat-icon>
|
||||
<span>Relay List</span>
|
||||
</button>
|
||||
</mat-menu>
|
||||
|
@ -106,6 +106,14 @@ export class ProfileActionsComponent {
|
||||
this.copy(JSON.stringify(profile));
|
||||
}
|
||||
|
||||
copyFollowing() {
|
||||
this.copy(JSON.stringify(this.profile!.following));
|
||||
}
|
||||
|
||||
copyRelays() {
|
||||
this.copy(this.profile!.relays);
|
||||
}
|
||||
|
||||
copy(text: string) {
|
||||
copyToClipboard(text);
|
||||
|
||||
|
@ -247,6 +247,10 @@ export class UserComponent {
|
||||
});
|
||||
|
||||
this.feedSubscription = this.dataService.downloadNewestEventsByQuery([{ kinds: [1], authors: [this.pubkey], limit: 100 }]).subscribe((event) => {
|
||||
if (!event) {
|
||||
return;
|
||||
}
|
||||
|
||||
const existingIndex = this.events.findIndex((e) => e.id == event.id);
|
||||
|
||||
if (existingIndex !== -1) {
|
||||
|
Loading…
Reference in New Issue
Block a user