Add download of profiles not being followed

This commit is contained in:
SondreB 2022-12-31 02:25:20 +01:00
parent a22aeddf14
commit 21da4b5cf1
No known key found for this signature in database
GPG Key ID: D6CC44C75005FDBF
6 changed files with 400 additions and 233 deletions

View File

@ -15,6 +15,7 @@ import { FeedService } from './services/feed.service';
import { RelayService } from './services/relay.service';
import { RelayStorageService } from './services/relay.storage.service';
import { DataService } from './services/data.service';
import { ProfileService } from './services/profile.service';
@Component({
selector: 'app-root',
@ -39,7 +40,8 @@ export class AppComponent {
private relayStorage: RelayStorageService,
private feedService: FeedService,
private relayService: RelayService,
private dataService: DataService
private dataService: DataService,
private profileService: ProfileService
) {
// appState.title = 'Blockcore Notes';
console.log('CONSTRUCTOR FOR APP!');
@ -69,6 +71,7 @@ export class AppComponent {
await this.storage.open();
await this.storage.initialize();
await this.profileService.populate();
await this.relayStorage.initialize();
await this.relayService.initialize();
await this.relayService.connect();

View File

@ -8,10 +8,10 @@ import { OptionsService } from './options.service';
providedIn: 'root',
})
export class DataValidation {
contentLimit = 560;
contentLimit = 1024;
tagsLimit = 10;
profileLimit = 1024;
profileLimit = 2048;
profileTagsLimit = 10;
constructor(private options: OptionsService) {}
@ -83,10 +83,18 @@ export class DataValidation {
return null;
}
// Reduce the content length to reduce system resource usage and improve UI experience.
if (event.content.length > this.contentLimit) {
event.content = event.content.substring(0, this.contentLimit);
event.contentCut = true;
if (event.kind === 0) {
// Reduce the content length to reduce system resource usage and improve UI experience.
if (event.content.length > this.profileLimit) {
event.content = event.content.substring(0, this.profileLimit);
event.contentCut = true;
}
} else {
// Reduce the content length to reduce system resource usage and improve UI experience.
if (event.content.length > this.contentLimit) {
event.content = event.content.substring(0, this.contentLimit);
event.contentCut = true;
}
}
// TODO: Do we need more validation for tags? Probably limited length?

View File

@ -1,32 +1,175 @@
import { Injectable } from '@angular/core';
import { NostrProfileDocument } from './interfaces';
import { NostrEvent, NostrProfileDocument } from './interfaces';
import { StorageService } from './storage.service';
import { ProfileService } from './profile.service';
import * as moment from 'moment';
import { FeedService } from './feed.service';
import { EventService } from './event.service';
import { RelayService } from './relay.service';
import { Relay } from 'nostr-tools';
import { DataValidation } from './data-validation.service';
@Injectable({
providedIn: 'root',
})
export class DataService {
daysToKeepProfiles = 14;
cleanProfileInterval = 1000 * 60 * 60; // Every hour
//downloadProfileInterval = 1000 * 3; // Every 3 seconds
downloadProfileInterval = 250; // Every 250 ms
constructor(private storage: StorageService, private profile: ProfileService) {}
constructor(private storage: StorageService, private profileService: ProfileService, private feedService: FeedService, private validator: DataValidation, private eventService: EventService, private relayService: RelayService) {
// Whenever the profile service needs to get a profile from the network, this event is triggered.
this.profileService.profileRequested$.subscribe(async (pubkey) => {
if (!pubkey) {
return;
}
async initialize() {
// Don't start the data processing until an initial timeout.
setTimeout(() => {
this.process();
}, 5000);
await this.downloadProfile(pubkey);
});
}
async process() {
console.log('Data Service Process Interval...');
await this.cleanProfiles();
async initialize() {
setTimeout(async () => {
await this.cleanProfiles();
}, this.cleanProfileInterval);
setTimeout(async () => {
await this.process();
}, 5000);
await this.downloadProfiles();
}, this.downloadProfileInterval);
}
async downloadProfiles() {
// this.profile.profileDownloadQueue
// this.profile.downloadProfile()
this.processProfilesQueue();
setTimeout(async () => {
await this.downloadProfiles();
}, this.downloadProfileInterval);
}
// scheduleProfileDownload() {
// setTimeout(() => {
// this.processProfilesQueue();
// this.scheduleProfileDownload();
// }, 5000);
// }
isFetching = false;
profileQueue: string[] = [];
processProfilesQueue() {
// console.log('processProfilesQueue', this.isFetching);
// If currently fetching, just skip until next interval.
if (this.isFetching) {
return;
}
// Grab all queued up profiles and ask for them, or should we have a maximum item?
// For now, let us grab 10 and process those until next interval.
const pubkeys = this.profileQueue.splice(0, 10);
this.fetchProfiles(this.relayService.relays[0], pubkeys);
}
async downloadProfile(pubkey: string) {
console.log('ADD DOWNLOAD PROFILE:', pubkey);
if (!pubkey) {
debugger;
return;
}
this.profileQueue.push(pubkey);
// Wait some CPU cycles for potentially more profiles before we process.
setTimeout(() => {
this.processProfilesQueue();
}, 500);
// TODO: Loop all relays until we find the profile.
// return this.fetchProfiles(this.relays[0], [pubkey]);
}
fetchProfiles(relay: Relay, authors: string[]) {
if (!authors || authors.length === 0) {
return;
}
console.log('FETCHING PROFILE!', authors);
// Add a protection timeout if we never receive the profiles. After 30 seconds, cancel and allow query to continue.
setTimeout(() => {
this.isFetching = false;
try {
profileSub.unsub();
} catch (err) {
console.warn('Error during automatic failover for profile fetch.', err);
}
}, 30000);
this.isFetching = true;
let profileSub = relay.sub([{ kinds: [0], authors: authors }], {});
profileSub.on('event', async (originalEvent: NostrEvent) => {
console.log('EVENT ON PROFILE:', originalEvent);
const prossedEvent = this.eventService.processEvent(originalEvent);
if (!prossedEvent) {
return;
}
try {
const jsonParsed = JSON.parse(prossedEvent.content) as NostrProfileDocument;
const profile = this.validator.sanitizeProfile(jsonParsed) as NostrProfileDocument;
console.log('GOT PROFILE:;', profile);
// Persist the profile.
await this.profileService.updateProfile(prossedEvent.pubkey, profile);
// TODO: Add NIP-05 and nostr.directory verification.
// const displayName = encodeURIComponent(profile.name);
// const url = `https://www.nostr.directory/.well-known/nostr.json?name=${displayName}`;
// const rawResponse = await fetch(url, {
// method: 'GET',
// mode: 'cors',
// });
// if (rawResponse.status === 200) {
// const content = await rawResponse.json();
// const directoryPublicKey = content.names[displayName];
// if (event.pubkey === directoryPublicKey) {
// if (!profile.verifications) {
// profile.verifications = [];
// }
// profile.verifications.push('@nostr.directory');
// // Update the profile with verification data.
// await this.profile.putProfile(event.pubkey, profile);
// } else {
// // profile.verified = false;
// console.warn('Nickname reuse:', url);
// }
// } else {
// // profile.verified = false;
// }
} catch (err) {
console.warn('This profile event was not parsed due to errors:', prossedEvent);
}
});
profileSub.on('eose', () => {
console.log('eose for profile', authors);
profileSub.unsub();
this.isFetching = false;
});
}
async cleanProfiles() {
@ -49,5 +192,9 @@ export class DataService {
await profileTable.del(key);
}
}
setTimeout(async () => {
await this.cleanProfiles();
}, this.cleanProfileInterval);
}
}

View File

@ -236,12 +236,12 @@ export class FeedService {
});
}
scheduleProfileDownload() {
setTimeout(() => {
this.processProfilesQueue();
this.scheduleProfileDownload();
}, 5000);
}
// scheduleProfileDownload() {
// setTimeout(() => {
// this.processProfilesQueue();
// this.scheduleProfileDownload();
// }, 5000);
// }
// public subscribe<T>(key: string): Observable<EventBusMetaData> {
// return this.eventBus.asObservable().pipe(
@ -551,135 +551,135 @@ export class FeedService {
});
}
async downloadProfile(pubkey: string) {
console.log('ADD DOWNLOAD PROFILE:', pubkey);
if (!pubkey) {
debugger;
return;
}
// async downloadProfile(pubkey: string) {
// console.log('ADD DOWNLOAD PROFILE:', pubkey);
// if (!pubkey) {
// debugger;
// return;
// }
this.profileQueue.push(pubkey);
// this.profileQueue.push(pubkey);
// Wait some CPU cycles for potentially more profiles before we process.
setTimeout(() => {
this.processProfilesQueue();
}, 500);
// // Wait some CPU cycles for potentially more profiles before we process.
// setTimeout(() => {
// this.processProfilesQueue();
// }, 500);
// TODO: Loop all relays until we find the profile.
// return this.fetchProfiles(this.relays[0], [pubkey]);
}
// // TODO: Loop all relays until we find the profile.
// // return this.fetchProfiles(this.relays[0], [pubkey]);
// }
profileQueue: string[] = [];
// profileQueue: string[] = [];
processProfilesQueue() {
// console.log('processProfilesQueue', this.isFetching);
// processProfilesQueue() {
// // console.log('processProfilesQueue', this.isFetching);
// If currently fetching, just skip until next interval.
if (this.isFetching) {
return;
}
// // If currently fetching, just skip until next interval.
// if (this.isFetching) {
// return;
// }
// Grab all queued up profiles and ask for them, or should we have a maximum item?
// For now, let us grab 10 and process those until next interval.
const pubkeys = this.profileQueue.splice(0, 10);
this.fetchProfiles(this.relayService.relays[0], pubkeys);
}
// // Grab all queued up profiles and ask for them, or should we have a maximum item?
// // For now, let us grab 10 and process those until next interval.
// const pubkeys = this.profileQueue.splice(0, 10);
// this.fetchProfiles(this.relayService.relays[0], pubkeys);
// }
isFetching = false;
// isFetching = false;
fetchProfiles(relay: Relay, authors: string[]) {
if (!authors || authors.length === 0) {
return;
}
// fetchProfiles(relay: Relay, authors: string[]) {
// if (!authors || authors.length === 0) {
// return;
// }
console.log('FETCHING PROFILE!', authors);
// console.log('FETCHING PROFILE!', authors);
// Add a protection timeout if we never receive the profiles. After 30 seconds, cancel and allow query to continue.
setTimeout(() => {
this.isFetching = false;
// // Add a protection timeout if we never receive the profiles. After 30 seconds, cancel and allow query to continue.
// setTimeout(() => {
// this.isFetching = false;
try {
profileSub.unsub();
} catch (err) {
console.warn('Error during automatic failover for profile fetch.', err);
}
}, 30000);
// try {
// profileSub.unsub();
// } catch (err) {
// console.warn('Error during automatic failover for profile fetch.', err);
// }
// }, 30000);
this.isFetching = true;
let profileSub = relay.sub([{ kinds: [0], authors: authors }], {});
// this.isFetching = true;
// let profileSub = relay.sub([{ kinds: [0], authors: authors }], {});
profileSub.on('event', async (originalEvent: NostrEvent) => {
console.log('EVENT ON PROFILE:', originalEvent);
const event = this.eventService.processEvent(originalEvent);
// profileSub.on('event', async (originalEvent: NostrEvent) => {
// console.log('EVENT ON PROFILE:', originalEvent);
// const event = this.eventService.processEvent(originalEvent);
if (!event) {
return;
}
// if (!event) {
// return;
// }
try {
const profile = this.validator.sanitizeProfile(JSON.parse(event.content) as NostrProfileDocument) as NostrProfileDocument;
// try {
// const profile = this.validator.sanitizeProfile(JSON.parse(event.content) as NostrProfileDocument) as NostrProfileDocument;
console.log('GOT PROFILE:;', profile);
// console.log('GOT PROFILE:;', profile);
// Persist the profile.
await this.profileService.updateProfile(event.pubkey, profile);
// // Persist the profile.
// await this.profileService.updateProfile(event.pubkey, profile);
// TODO: Add NIP-05 and nostr.directory verification.
// const displayName = encodeURIComponent(profile.name);
// const url = `https://www.nostr.directory/.well-known/nostr.json?name=${displayName}`;
// // TODO: Add NIP-05 and nostr.directory verification.
// // const displayName = encodeURIComponent(profile.name);
// // const url = `https://www.nostr.directory/.well-known/nostr.json?name=${displayName}`;
// const rawResponse = await fetch(url, {
// method: 'GET',
// mode: 'cors',
// });
// // const rawResponse = await fetch(url, {
// // method: 'GET',
// // mode: 'cors',
// // });
// if (rawResponse.status === 200) {
// const content = await rawResponse.json();
// const directoryPublicKey = content.names[displayName];
// // if (rawResponse.status === 200) {
// // const content = await rawResponse.json();
// // const directoryPublicKey = content.names[displayName];
// if (event.pubkey === directoryPublicKey) {
// if (!profile.verifications) {
// profile.verifications = [];
// }
// // if (event.pubkey === directoryPublicKey) {
// // if (!profile.verifications) {
// // profile.verifications = [];
// // }
// profile.verifications.push('@nostr.directory');
// // profile.verifications.push('@nostr.directory');
// // Update the profile with verification data.
// await this.profile.putProfile(event.pubkey, profile);
// } else {
// // profile.verified = false;
// console.warn('Nickname reuse:', url);
// }
// } else {
// // profile.verified = false;
// }
} catch (err) {
console.warn('This profile event was not parsed due to errors:', event);
}
});
// // // Update the profile with verification data.
// // await this.profile.putProfile(event.pubkey, profile);
// // } else {
// // // profile.verified = false;
// // console.warn('Nickname reuse:', url);
// // }
// // } else {
// // // profile.verified = false;
// // }
// } catch (err) {
// console.warn('This profile event was not parsed due to errors:', event);
// }
// });
profileSub.on('eose', () => {
console.log('eose for profile', authors);
profileSub.unsub();
this.isFetching = false;
});
}
// profileSub.on('eose', () => {
// console.log('eose for profile', authors);
// profileSub.unsub();
// this.isFetching = false;
// });
// }
async initialize() {
// Whenever the profile service needs to get a profile from the network, this event is triggered.
this.profileService.profileRequested$.subscribe(async (pubkey) => {
if (!pubkey) {
return;
}
// this.profileService.profileRequested$.subscribe(async (pubkey) => {
// if (!pubkey) {
// return;
// }
await this.downloadProfile(pubkey);
});
// await this.downloadProfile(pubkey);
// });
// TODO: Use rxjs to trigger the queue to process and then complete, don't do this setInterval.
this.scheduleProfileDownload();
// this.scheduleProfileDownload();
// Populate the profile observable.
await this.profileService.populate();
// await this.profileService.populate();
// Load all persisted events. This will of course be too many as user get more and more... so
// this must be changed into a filter ASAP. Two filters are needed: "Current View" which allows scrolling back in time,
@ -689,9 +689,9 @@ export class FeedService {
this.#eventsUpdated();
// Every time profiles are updated, we must change our profile subscription.
this.profileService.profiles$.subscribe((profiles) => {
console.log('Profiles changed:', profiles);
});
// this.profileService.profiles$.subscribe((profiles) => {
// console.log('Profiles changed:', profiles);
// });
// this.openConnection('wss://nostr-pub.wellorder.net');
// this.openConnection('wss://nostr-pub2.wellorder.net');

View File

@ -65,11 +65,20 @@ export class ProfileService {
this.#profileRequested.next(pubkey);
}
// profileDownloadQueue: string[] = [];
/** Will attempt to get the profile from local storage, if not available will attempt to get from relays. */
async getProfile(pubkey: string) {
const profile = await this.#get<NostrProfileDocument>(pubkey);
if (!profile) {
await this.downloadProfile(pubkey);
// if (!this.profileDownloadQueue.find((p) => p === pubkey)) {
// // Register this profile in queue for downloading
// this.profileDownloadQueue.unshift(pubkey);
// }
return;
}

View File

@ -302,12 +302,12 @@ export class RelayService {
});
}
scheduleProfileDownload() {
setTimeout(() => {
this.processProfilesQueue();
this.scheduleProfileDownload();
}, 5000);
}
// scheduleProfileDownload() {
// setTimeout(() => {
// this.processProfilesQueue();
// this.scheduleProfileDownload();
// }, 5000);
// }
async downloadRecent(pubkeys: string[]) {
console.log('DOWNLOAD RECENT FOR:', pubkeys);
@ -380,119 +380,119 @@ export class RelayService {
});
}
async downloadProfile(pubkey: string) {
console.log('ADD DOWNLOAD PROFILE:', pubkey);
if (!pubkey) {
debugger;
return;
}
// async downloadProfile(pubkey: string) {
// console.log('ADD DOWNLOAD PROFILE:', pubkey);
// if (!pubkey) {
// debugger;
// return;
// }
this.profileQueue.push(pubkey);
// this.profileQueue.push(pubkey);
// Wait some CPU cycles for potentially more profiles before we process.
setTimeout(() => {
this.processProfilesQueue();
}, 500);
// // Wait some CPU cycles for potentially more profiles before we process.
// setTimeout(() => {
// this.processProfilesQueue();
// }, 500);
// TODO: Loop all relays until we find the profile.
// return this.fetchProfiles(this.relays[0], [pubkey]);
}
// // TODO: Loop all relays until we find the profile.
// // return this.fetchProfiles(this.relays[0], [pubkey]);
// }
profileQueue: string[] = [];
// profileQueue: string[] = [];
processProfilesQueue() {
// console.log('processProfilesQueue', this.isFetching);
// processProfilesQueue() {
// // console.log('processProfilesQueue', this.isFetching);
// If currently fetching, just skip until next interval.
if (this.isFetching) {
return;
}
// // If currently fetching, just skip until next interval.
// if (this.isFetching) {
// return;
// }
// Grab all queued up profiles and ask for them, or should we have a maximum item?
// For now, let us grab 10 and process those until next interval.
const pubkeys = this.profileQueue.splice(0, 10);
this.fetchProfiles(this.relays[0], pubkeys);
}
// // Grab all queued up profiles and ask for them, or should we have a maximum item?
// // For now, let us grab 10 and process those until next interval.
// const pubkeys = this.profileQueue.splice(0, 10);
// this.fetchProfiles(this.relays[0], pubkeys);
// }
isFetching = false;
// isFetching = false;
fetchProfiles(relay: Relay, authors: string[]) {
if (!authors || authors.length === 0) {
return;
}
// fetchProfiles(relay: Relay, authors: string[]) {
// if (!authors || authors.length === 0) {
// return;
// }
console.log('FETCHING PROFILE!', authors);
// console.log('FETCHING PROFILE!', authors);
// Add a protection timeout if we never receive the profiles. After 30 seconds, cancel and allow query to continue.
setTimeout(() => {
this.isFetching = false;
// // Add a protection timeout if we never receive the profiles. After 30 seconds, cancel and allow query to continue.
// setTimeout(() => {
// this.isFetching = false;
try {
profileSub.unsub();
} catch (err) {
console.warn('Error during automatic failover for profile fetch.', err);
}
}, 30000);
// try {
// profileSub.unsub();
// } catch (err) {
// console.warn('Error during automatic failover for profile fetch.', err);
// }
// }, 30000);
this.isFetching = true;
let profileSub = relay.sub([{ kinds: [0], authors: authors }], {});
// this.isFetching = true;
// let profileSub = relay.sub([{ kinds: [0], authors: authors }], {});
profileSub.on('event', async (originalEvent: NostrEvent) => {
console.log('EVENT ON PROFILE:', originalEvent);
const event = this.eventService.processEvent(originalEvent);
// profileSub.on('event', async (originalEvent: NostrEvent) => {
// console.log('EVENT ON PROFILE:', originalEvent);
// const event = this.eventService.processEvent(originalEvent);
if (!event) {
return;
}
// if (!event) {
// return;
// }
try {
const profile = this.validator.sanitizeProfile(JSON.parse(event.content) as NostrProfileDocument) as NostrProfileDocument;
// try {
// const profile = this.validator.sanitizeProfile(JSON.parse(event.content) as NostrProfileDocument) as NostrProfileDocument;
console.log('GOT PROFILE:;', profile);
// console.log('GOT PROFILE:;', profile);
// Persist the profile.
await this.profileService.updateProfile(event.pubkey, profile);
// // Persist the profile.
// await this.profileService.updateProfile(event.pubkey, profile);
// TODO: Add NIP-05 and nostr.directory verification.
// const displayName = encodeURIComponent(profile.name);
// const url = `https://www.nostr.directory/.well-known/nostr.json?name=${displayName}`;
// // TODO: Add NIP-05 and nostr.directory verification.
// // const displayName = encodeURIComponent(profile.name);
// // const url = `https://www.nostr.directory/.well-known/nostr.json?name=${displayName}`;
// const rawResponse = await fetch(url, {
// method: 'GET',
// mode: 'cors',
// });
// // const rawResponse = await fetch(url, {
// // method: 'GET',
// // mode: 'cors',
// // });
// if (rawResponse.status === 200) {
// const content = await rawResponse.json();
// const directoryPublicKey = content.names[displayName];
// // if (rawResponse.status === 200) {
// // const content = await rawResponse.json();
// // const directoryPublicKey = content.names[displayName];
// if (event.pubkey === directoryPublicKey) {
// if (!profile.verifications) {
// profile.verifications = [];
// }
// // if (event.pubkey === directoryPublicKey) {
// // if (!profile.verifications) {
// // profile.verifications = [];
// // }
// profile.verifications.push('@nostr.directory');
// // profile.verifications.push('@nostr.directory');
// // Update the profile with verification data.
// await this.profile.putProfile(event.pubkey, profile);
// } else {
// // profile.verified = false;
// console.warn('Nickname reuse:', url);
// }
// } else {
// // profile.verified = false;
// }
} catch (err) {
console.warn('This profile event was not parsed due to errors:', event);
}
});
// // // Update the profile with verification data.
// // await this.profile.putProfile(event.pubkey, profile);
// // } else {
// // // profile.verified = false;
// // console.warn('Nickname reuse:', url);
// // }
// // } else {
// // // profile.verified = false;
// // }
// } catch (err) {
// console.warn('This profile event was not parsed due to errors:', event);
// }
// });
profileSub.on('eose', () => {
console.log('eose for profile', authors);
profileSub.unsub();
this.isFetching = false;
});
}
// profileSub.on('eose', () => {
// console.log('eose for profile', authors);
// profileSub.unsub();
// this.isFetching = false;
// });
// }
/** Takes relay in the format used for extensions and adds to persistent storage. This method does not connect to relays. */
async appendRelays(relays: any) {
@ -592,31 +592,31 @@ export class RelayService {
async initialize() {
// Whenever the profile service needs to get a profile from the network, this event is triggered.
this.profileService.profileRequested$.subscribe(async (pubkey) => {
if (!pubkey) {
return;
}
// this.profileService.profileRequested$.subscribe(async (pubkey) => {
// if (!pubkey) {
// return;
// }
await this.downloadProfile(pubkey);
});
// await this.downloadProfile(pubkey);
// });
// TODO: Use rxjs to trigger the queue to process and then complete, don't do this setInterval.
this.scheduleProfileDownload();
// // TODO: Use rxjs to trigger the queue to process and then complete, don't do this setInterval.
// this.scheduleProfileDownload();
// Populate the profile observable.
await this.profileService.populate();
// await this.profileService.populate();
// Load all persisted events. This will of course be too many as user get more and more... so
// this must be changed into a filter ASAP. Two filters are needed: "Current View" which allows scrolling back in time,
// and an initial load which should likely just return top 100?
this.events = await this.followEvents(50);
// this.events = await this.followEvents(50);
this.#updated();
// this.#updated();
// Every time profiles are updated, we must change our profile subscription.
this.profileService.profiles$.subscribe((profiles) => {
console.log('Profiles changed:', profiles);
});
// this.profileService.profiles$.subscribe((profiles) => {
// console.log('Profiles changed:', profiles);
// });
// this.openConnection('wss://relay.damus.io');
// this.openConnection('wss://nostr-pub.wellorder.net');