From 79d484b0ca2db889bd5f9b1a6c757aae90cad4ef Mon Sep 17 00:00:00 2001 From: Jonathan Staab Date: Tue, 21 Feb 2023 10:04:10 -0600 Subject: [PATCH] Rework alerts --- ROADMAP.md | 70 ++-- src/agent/database.ts | 16 +- src/agent/network.ts | 151 ++++++--- src/agent/pool.ts | 362 ++++++++------------- src/agent/relays.ts | 12 +- src/agent/sync.ts | 2 +- src/app/alerts.js | 68 ++-- src/app/connection.js | 6 +- src/app/messages.js | 16 +- src/partials/{Like.svelte => Alert.svelte} | 18 +- src/partials/Channel.svelte | 6 +- src/partials/Notes.svelte | 4 +- src/partials/RelayCard.svelte | 2 +- src/partials/RelayCardSimple.svelte | 5 +- src/routes/Alerts.svelte | 32 +- src/routes/Chat.svelte | 13 +- src/routes/ChatRoom.svelte | 35 +- src/routes/Messages.svelte | 20 +- src/routes/Person.svelte | 20 +- src/routes/RelayList.svelte | 2 +- src/util/nostr.js | 2 +- src/views/ConnectUser.svelte | 82 +++-- src/views/NoteDetail.svelte | 8 +- src/views/SearchPeople.svelte | 5 +- 24 files changed, 500 insertions(+), 457 deletions(-) rename src/partials/{Like.svelte => Alert.svelte} (78%) diff --git a/ROADMAP.md b/ROADMAP.md index 219aba96..17c540ba 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -1,34 +1,51 @@ # Current -- [ ] Try lumping tables into a single key each to reduce load/save contention and time - [ ] Keep track of relays that fail to connect and don't use them -- [ ] Do round robin of user read relays batched by 10 on global feed - - Try paginating again, keep track of last time the feed was visited to ensure fresh content -- [ ] Put feed state and scroll positionn outside component so you can go back to it. -- [ ] Trim feeds once the user scrolls way down to save on memory -- [ ] Make main page for notes a list of editable custom view cards - [ ] Fix loading routes speed, index by pubkey to avoid filtering - [ ] Include everyone in person lists, re-fetch missing people - [ ] Fix initial relay loading, don't nuke people's relay lists +- [ ] Don't waste space caching rooms, load those lazily -# Snacks +# Next +- [ ] Relay detail page, with more information about the relay + tabbed feeds (notes, more?) +- [ ] Add customize icon and route with editable custom view cards using "lists" nip + - nevent1qqspjcqw2hu5gfcpkrjhs0aqvxuzjgtp50l375mcqjfpmk48cg5hevgpr3mhxue69uhkummnw3ez6un9d3shjtnhd3m8xtnnwpskxegpzamhxue69uhkummnw3ezuendwsh8w6t69e3xj7spramhxue69uhkummnw3ez6un9d3shjtnwdahxxefwv93kzer9d4usz9rhwden5te0wfjkccte9ejxzmt4wvhxjmcpr9mhxue69uhkummnw3ezuer9d3hjuum0ve68wctjv5n8hwfg + - [ ] Custom views should combine pubkeys, relays, event ids, and topics + +# Lightning + +- [ ] Linkify invoices +- [ ] Linkify bech32 entities w/ NIP 21 https://github.com/nostr-protocol/nips/blob/master/21.md +- [ ] Support invoices, tips, zaps https://twitter.com/jb55/status/1604131336247476224 + - nevent1qqsd0x0xzfwtppu0n52ngw0zhynlwv0sjsr77aflcpufms2wrl3v8mspr9mhxue69uhhyetvv9ujuumwdae8gtnnda3kjctv9uqs7amnwvaz7tmwdaehgu3wd4hk6d7ewgp + +# Groups + +- [ ] Groups - may need a new NIP, or maybe use topics + +# More + +- [ ] Support relay auth - [ ] Following indicator on person info -- [ ] Change feed tabs to follows/network -- [ ] Don't lose feeds when navigating, persist modals. 
Remember scroll state - [ ] Share button for notes, shows qr code and nevent - [ ] If a user has no write relays (or is not logged in), open a modal - [ ] open web+nostr links like snort -- [ ] DM/chat read status in encrypted note +- [ ] Channels + - [ ] Separate chat and DMs + - [ ] Damus has chats divided into DMs and requests + - [ ] Ability to leave/mute DM conversation + - [ ] Add petnames for channels + - [ ] Add notifications for chat messages +- [ ] Add encrypted settings storage using nostr events + - [ ] Save DM/chat read status in encrypted note - [ ] Relay recommendations based on follows/followers - [ ] Pinned posts ala snort - [ ] Likes list on note detail. Maybe a sidebar or header for note detail page? - [ ] Support key delegation - https://github.com/nbd-wtf/nostr-tools/blob/master/nip26.ts - [ ] Add keyword mutes -- [ ] Add encrypted settings storage using nostr events - [ ] Attachments (a tag w/content type and url) -- [ ] Linkify bech32 entities w/ NIP 21 https://github.com/nostr-protocol/nips/blob/master/21.md - [ ] Sign in as user with one click to view things from their pubkey's perspective - do this with multiple accounts - nevent1qqsyyxtrhpsqeqaqgucd6uzpyh8eq2hkfgr0yzr7ku7tgyl5cn9jw5qpz3mhxue69uhhyetvv9ujumn0wd68ytnzvuq3gamnwvaz7tmjv4kxz7fwv3sk6atn9e5k7l564wx - [ ] Search by nip05 alias @@ -36,17 +53,7 @@ - [ ] Show options on note detail for retrieving replies - Replies from user's network - All replies from author's + user's read relays, including spam - -# Missions - -- [ ] Make feeds page customizable. This could potentially use the "lists" NIP - - nevent1qqspjcqw2hu5gfcpkrjhs0aqvxuzjgtp50l375mcqjfpmk48cg5hevgpr3mhxue69uhkummnw3ez6un9d3shjtnhd3m8xtnnwpskxegpzamhxue69uhkummnw3ezuendwsh8w6t69e3xj7spramhxue69uhkummnw3ez6un9d3shjtnwdahxxefwv93kzer9d4usz9rhwden5te0wfjkccte9ejxzmt4wvhxjmcpr9mhxue69uhkummnw3ezuer9d3hjuum0ve68wctjv5n8hwfg - - [ ] Click through on relays page to view a feed for only that relay. - - [ ] Custom views should combine pubkeys, relays, event ids, and topics - [ ] Topics/hashtag views -- [ ] Support paid relays - - atlas.nostr.land - - eden.nostr.land - [ ] Re-license using https://polyformproject.org/ - [ ] Image uploads - Default will charge via lightning and have a tos, others can self-host and skip that. 
@@ -55,9 +62,6 @@
   - https://github.com/brandonsavage/Upload
   - https://github.com/seaweedfs/seaweedfs
   - https://github.com/cubefs/cubefs
-- [ ] Support relay auth
-- [ ] Support invoices, tips, zaps https://twitter.com/jb55/status/1604131336247476224
-  - nevent1qqsd0x0xzfwtppu0n52ngw0zhynlwv0sjsr77aflcpufms2wrl3v8mspr9mhxue69uhhyetvv9ujuumwdae8gtnnda3kjctv9uqs7amnwvaz7tmwdaehgu3wd4hk6d7ewgp
 - [ ] Separate settings for read, write, and broadcast relays based on NIP 65
 - [ ] Release to android
   - https://svelte-native.technology/docs
@@ -66,20 +70,14 @@
   - Capture certain events in a local db
   - File import/export from db, NFC transfer
   - Save user notes to db
-
-# Maintenance
-
+  - Fix the case where you hide something, but the event doesn't get retrieved, so it gets un-hidden
 - [ ] Keep track of all relays an event was seen on
-- [ ] Don't waste space caching rooms, load those lazily
-- [ ] Normalize relay urls (lowercase, strip trailing slash)
 - [ ] Use nip 56 for reporting
   - https://github.com/nostr-protocol/nips/pull/205#issuecomment-1419234230
 - [ ] Sync mentions box and in-reply mentions
-- [ ] Channels
-  - [ ] Damus has chats divided into DMs and requests
-  - [ ] Ability to leave/mute DM conversation
-  - [ ] Add petnames for channels
-  - [ ] Add notifications for chat messages
 - [ ] Compress events
   - https://github.com/nostr-protocol/nips/issues/265#issuecomment-1434250263
-- [ ] If you hide something, but the event doesn't get retrived, it gets un-hidden
+- [ ] Refine feeds
+  - [ ] Trim feeds once the user scrolls way down to save on memory
+  - [ ] Don't lose feeds when navigating, persist modals. Remember scroll state
+- [ ] Offline-first
diff --git a/src/agent/database.ts b/src/agent/database.ts
index 84ab22b2..6084121a 100644
--- a/src/agent/database.ts
+++ b/src/agent/database.ts
@@ -184,7 +184,19 @@ class Table {
 const people = new Table('people', 'pubkey')
 const rooms = new Table('rooms', 'id')
 const messages = new Table('messages', 'id')
-const alerts = new Table('alerts', 'id')
+
+const alerts = new Table('alerts', 'id', {
+  initialize: async table => {
+    // We changed our alerts format, clear out the old version
+    const isValid = alert => typeof alert.isMention === 'boolean'
+    const [valid, invalid] = partition(isValid, Object.values(await table.dump() || {}))
+
+    table.bulkRemove(pluck('id', invalid))
+
+    return valid
+  },
+})
+
 const relays = new Table('relays', 'url')
 
 const routes = new Table('routes', 'id', {
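The `initialize` hook above lets a table migrate stored records before first use: alerts written in the old format lack the new `isMention` flag, so they are deleted and re-fetched later in the new shape. A minimal sketch of the contract this assumes — the `dump`/`bulkRemove` signatures are inferred from usage here, not taken from the actual `Table` class:

```ts
// Sketch of the initialize() contract assumed above: dump() yields stored
// records keyed by id, bulkRemove() deletes records by id, and whatever the
// hook returns becomes the table's starting data set.
import {partition, pluck} from 'ramda'

type StoredAlert = {id: string; isMention?: boolean}

const initializeAlerts = async (table: {
  dump: () => Promise<Record<string, StoredAlert>>
  bulkRemove: (ids: string[]) => void
}) => {
  // Records written before this patch never set isMention
  const isValid = (alert: StoredAlert) => typeof alert.isMention === 'boolean'
  const [valid, invalid] = partition(isValid, Object.values((await table.dump()) || {}))

  // Drop old-format records; app/alerts.js rebuilds them from the network
  table.bulkRemove(pluck('id', invalid))

  return valid
}
```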
diff --git a/src/agent/network.ts b/src/agent/network.ts
index 8807ac10..01d38c58 100644
--- a/src/agent/network.ts
+++ b/src/agent/network.ts
@@ -1,10 +1,12 @@
-import {uniq, uniqBy, prop, map, propEq, indexBy, pluck} from 'ramda'
+import type {MyEvent} from 'src/util/types'
+import {uniq, uniqBy, prop, map, propEq, without, indexBy, pluck} from 'ramda'
 import {personKinds, findReplyId} from 'src/util/nostr'
+import {log} from 'src/util/logger'
 import {chunk} from 'hurdak/lib/hurdak'
 import {batch, timedelta, now} from 'src/util/misc'
 import {
   getRelaysForEventParent, getAllPubkeyWriteRelays, aggregateScores,
-  getRelaysForEventChildren, sampleRelays,
+  getRelaysForEventChildren, sampleRelays, normalizeRelays,
 } from 'src/agent/relays'
 import database from 'src/agent/database'
 import pool from 'src/agent/pool'
@@ -31,46 +33,93 @@ const publish = async (relays, event) => {
   return signedEvent
 }
 
-const load = async (relays, filter, opts?): Promise<MyEvent[]> => {
-  const events = await pool.request(sampleRelays(relays), filter, opts)
+const listen = ({relays, filter, onChunk, shouldProcess = true}) => {
+  relays = normalizeRelays(relays)
 
-  await sync.processEvents(events)
-
-  return events
-}
-
-const listen = (relays, filter, onEvents, {shouldProcess = true}: any = {}) => {
-  return pool.subscribe(sampleRelays(relays), filter, {
+  return pool.subscribe({
+    filter,
+    relays,
     onEvent: batch(300, events => {
       if (shouldProcess) {
         sync.processEvents(events)
       }
 
-      if (onEvents) {
-        onEvents(events)
+      if (onChunk) {
+        onChunk(events)
       }
     }),
   })
 }
 
-const listenUntilEose = (relays, filter, onEvents, {shouldProcess = true}: any = {}) => {
+const load = ({relays, filter, onChunk = null, shouldProcess = true, timeout = 10_000}) => {
   return new Promise(resolve => {
-    pool.subscribeUntilEose(sampleRelays(relays), filter, {
-      onClose: () => resolve(),
-      onEvent: batch(300, events => {
-        if (shouldProcess) {
-          sync.processEvents(events)
-        }
-
-        if (onEvents) {
-          onEvents(events)
-        }
-      }),
+    relays = normalizeRelays(relays)
+
+    const now = Date.now()
+    const done = new Set()
+    const events = []
+
+    const attemptToComplete = async () => {
+      const sub = await subPromise
+
+      // If we've already unsubscribed we're good
+      if (!sub.isActive()) {
+        return
+      }
+
+      const isDone = done.size === relays.length
+      const isTimeout = Date.now() - now >= timeout
+
+      if (isTimeout) {
+        const timedOutRelays = without(Array.from(done), relays)
+
+        log(`Timing out ${timedOutRelays.length} relays after ${timeout}ms`, timedOutRelays)
+
+        timedOutRelays.forEach(url => {
+          const conn = pool.getConnection(url)
+
+          if (conn) {
+            conn.stats.timeouts += 1
+          }
+        })
+      }
+
+      if (isDone || isTimeout) {
+        sub.unsub()
+        resolve(events)
+      }
+    }
+
+    // If a relay takes too long, give up
+    setTimeout(attemptToComplete, timeout)
+
+    const subPromise = pool.subscribe({
+      relays,
+      filter,
+      onEvent: batch(300, chunk => {
+        if (shouldProcess) {
+          sync.processEvents(chunk)
+        }
+
+        if (onChunk) {
+          onChunk(chunk)
+        }
+
+        events.push(...chunk)
+      }),
+      onEose: url => {
+        done.add(url)
+        attemptToComplete()
+      },
+      onError: url => {
+        done.add(url)
+        attemptToComplete()
+      },
     })
-  }) as Promise<void>
+  }) as Promise<MyEvent[]>
 }
 
-const loadPeople = async (pubkeys, {relays = null, kinds = personKinds, force = false, ...opts} = {}) => {
+const loadPeople = async (pubkeys, {relays = null, kinds = personKinds, force = false} = {}) => {
   pubkeys = uniq(pubkeys)
 
   // If we're not reloading, only get pubkeys we don't already know about
@@ -80,11 +129,10 @@ const loadPeople = async (pubkeys, {relays = null, kinds = personKinds, force =
 
   await Promise.all(
     chunk(256, pubkeys).map(async chunk => {
-      await load(
-        sampleRelays(relays || getAllPubkeyWriteRelays(chunk), 0.5),
-        {kinds, authors: chunk},
-        opts
-      )
+      await load({
+        relays: sampleRelays(relays || getAllPubkeyWriteRelays(chunk), 0.5),
+        filter: {kinds, authors: chunk},
+      })
     })
   )
 }
@@ -92,10 +140,10 @@ const loadPeople = async (pubkeys, {relays = null, kinds = personKinds, force =
 const loadParents = notes => {
   const notesWithParent = notes.filter(findReplyId)
 
-  return load(
-    sampleRelays(aggregateScores(notesWithParent.map(getRelaysForEventParent)), 0.3),
-    {kinds: [1], ids: notesWithParent.map(findReplyId)}
-  )
+  return load({
+    relays: sampleRelays(aggregateScores(notesWithParent.map(getRelaysForEventParent)), 0.3),
+    filter: {kinds: [1], ids: notesWithParent.map(findReplyId)}
+  })
 }
 
 const streamContext = ({notes, updateNotes, depth = 0}) => {
@@ -110,35 +158,38 @@ const streamContext = ({notes, updateNotes, depth = 0}) => {
   }
 
   // Load authors and reactions in one subscription
-  listenUntilEose(relays, filter, events => {
-    const repliesByParentId = indexBy(findReplyId, events.filter(propEq('kind', 1)))
-    const reactionsByParentId = indexBy(findReplyId, events.filter(propEq('kind', 7)))
+  load({
+    relays,
+    filter,
+    onChunk: events => {
+      const repliesByParentId = indexBy(findReplyId, events.filter(propEq('kind', 1)))
+      const reactionsByParentId = indexBy(findReplyId, events.filter(propEq('kind', 7)))
 
-    // Recur if we need to
-    if (depth > 0) {
-      streamContext({notes: events, updateNotes, depth: depth - 1})
-    }
-
-    const annotate = ({replies = [], reactions = [], children = [], ...note}) => {
+      // Recur if we need to
       if (depth > 0) {
-        children = uniqBy(prop('id'), children.concat(replies))
+        streamContext({notes: events, updateNotes, depth: depth - 1})
       }
 
-      return {
-        ...note,
-        replies: uniqBy(prop('id'), replies.concat(repliesByParentId[note.id] || [])),
-        reactions: uniqBy(prop('id'), reactions.concat(reactionsByParentId[note.id] || [])),
-        children: children.map(annotate),
-      }
-    }
+      const annotate = ({replies = [], reactions = [], children = [], ...note}) => {
+        if (depth > 0) {
+          children = uniqBy(prop('id'), children.concat(replies))
+        }
 
-    updateNotes(map(annotate))
+        return {
+          ...note,
+          replies: uniqBy(prop('id'), replies.concat(repliesByParentId[note.id] || [])),
+          reactions: uniqBy(prop('id'), reactions.concat(reactionsByParentId[note.id] || [])),
+          children: children.map(annotate),
+        }
+      }
+
+      updateNotes(map(annotate))
+    },
   })
 })
 }
 
 export default {
-  publish, load, listen, listenUntilEose, loadPeople, personKinds,
-  loadParents, streamContext,
+  publish, listen, load, loadPeople, personKinds, loadParents, streamContext,
 }
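`load` now replaces both the old `request` and `listenUntilEose`: chunks stream through `onChunk` as they arrive, and the promise resolves with everything collected once every relay has sent EOSE or errored, or the timeout has elapsed. A hypothetical call site (relay URL and filter values are illustrative):

```ts
// Hypothetical caller of the reworked load(); option names match the diff above
const events = await network.load({
  relays: [{url: 'wss://relay.example.com'}],
  filter: {kinds: [1], limit: 100},
  timeout: 10_000,
  // Fires for each ~300ms batch of events as it arrives
  onChunk: chunk => console.log(`got ${chunk.length} events`),
})

// The resolved value is the full deduplicated set collected before EOSE/timeout
console.log(`loaded ${events.length} events total`)
```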
diff --git a/src/agent/pool.ts b/src/agent/pool.ts
index 6fe7494f..636d4fdf 100644
--- a/src/agent/pool.ts
+++ b/src/agent/pool.ts
@@ -1,14 +1,14 @@
-import type {Relay} from 'nostr-tools'
+import type {Relay, Filter} from 'nostr-tools'
 import type {MyEvent} from 'src/util/types'
 import {relayInit} from 'nostr-tools'
-import {uniqBy, without, prop, find, is} from 'ramda'
+import {is} from 'ramda'
 import {ensurePlural} from 'hurdak/lib/hurdak'
 import {warn, log, error} from 'src/util/logger'
-import {isRelay} from 'src/util/nostr'
-import {sleep} from 'src/util/misc'
-import database from 'src/agent/database'
+import {isRelay, normalizeRelayUrl} from 'src/util/nostr'
 
-const connections = []
+// Connection management
+
+const connections = {}
 
 const CONNECTION_STATUS = {
   NEW: 'new',
@@ -37,7 +37,7 @@ class Connection {
     activeSubsCount: 0,
   }
 
-    connections.push(this)
+    connections[url] = this
   }
   async connect() {
     const shouldConnect = (
@@ -59,29 +59,21 @@ class Connection {
     this.nostr.on('error', () => {
       this.status = CONNECTION_STATUS.ERROR
     })
-  }
 
-    if (this.status === CONNECTION_STATUS.PENDING) {
-      try {
-        await this.promise
-        this.status = CONNECTION_STATUS.READY
-      } catch (e) {
-        this.status = CONNECTION_STATUS.ERROR
-      }
+      this.nostr.on('disconnect', () => {
+        this.status = CONNECTION_STATUS.CLOSED
+      })
     }
 
     this.lastConnectionAttempt = Date.now()
 
-    return this
-  }
-
-  async disconnect() {
-    this.status = CONNECTION_STATUS.CLOSED
-
     try {
-      await this.nostr.close()
+      await this.promise
     } catch (e) {
-      // For some reason bugsnag is saying this.nostr is undefined, even if we check it
+      // This is already handled in the on error handler above
    }
+
+    return this
   }
   getQuality() {
     if (this.status === CONNECTION_STATUS.ERROR) {
@@ -120,31 +112,138 @@ class Connection {
 
 const getConnections = () => connections
 
-const findConnection = url => find(c => c.nostr.url === url, connections)
+const getConnection = url => connections[url]
 
-const connect = async url => {
-  const conn = findConnection(url) || new Connection(url)
-
-  await database.relays.patch({url})
-  await Promise.race([conn.connect(), sleep(5000)])
-
-  if (conn.status === 'ready') {
-    return conn
+const connect = url => {
+  if (!isRelay(url)) {
+    throw new Error(`Invalid relay url ${url}`)
   }
+
+  url = normalizeRelayUrl(url)
+
+  if (!connections[url]) {
+    connections[url] = new Connection(url)
+  }
+
+  return connections[url].connect()
 }
 
+// Public api - publish/subscribe
+
 const publish = async (relays, event) => {
+  if (relays.length === 0) {
+    error(`Attempted to publish to zero relays`, event)
+  } else {
+    log(`Publishing to ${relays.length} relays`, event, relays)
+  }
+
   return Promise.all(
-    relays.filter(r => r.write !== '!' && isRelay(r.url)).map(async relay => {
+    relays.map(async relay => {
       const conn = await connect(relay.url)
 
-      if (conn) {
+      if (conn.status === CONNECTION_STATUS.READY) {
         return conn.nostr.publish(event)
       }
     })
   )
 }
 
+type SubscribeOpts = {
+  relays: Relay[]
+  filter: Filter[] | Filter
+  onEvent: (event: MyEvent) => void
+  onError?: (url: string) => void
+  onEose?: (url: string) => void
+}
+
+const subscribe = async (
+  {relays, filter, onEvent, onEose, onError}: SubscribeOpts
+) => {
+  filter = ensurePlural(filter)
+
+  const id = createFilterId(filter)
+  const now = Date.now()
+  const seen = new Set()
+  const eose = new Set()
+
+  let active = true
+
+  if (relays.length === 0) {
+    error(`Attempted to start subscription ${id} with zero relays`, filter)
+  } else {
+    log(`Starting subscription ${id} with ${relays.length} relays`, filter, relays)
+  }
+
+  const promises = relays.map(async relay => {
+    const conn = await connect(relay.url)
+
+    if (conn.status !== CONNECTION_STATUS.READY) {
+      if (onError) {
+        onError(relay.url)
+      }
+
+      return
+    }
+
+    const sub = conn.nostr.sub(filter, {id})
+
+    sub.on('event', e => {
+      conn.stats.eventsCount += 1
+
+      if (!seen.has(e.id)) {
+        seen.add(e.id)
+
+        onEvent({...e, seen_on: relay.url})
+      }
+    })
+
+    sub.on('eose', () => {
+      if (onEose) {
+        onEose(conn.nostr.url)
+      }
+
+      // Keep track of relay timing stats, but only for the first eose we get
+      if (!eose.has(conn.nostr.url)) {
+        eose.add(conn.nostr.url)
+
+        conn.stats.eoseCount += 1
+        conn.stats.eoseTimer += Date.now() - now
+      }
+    })
+
+    conn.stats.subsCount += 1
+    conn.stats.activeSubsCount += 1
+
+    if (conn.stats.activeSubsCount > 10) {
+      warn(`Relay ${conn.nostr.url} has >10 active subscriptions`)
+    }
+
+    return Object.assign(sub, {conn})
+  })
+
+  return {
+    isActive: () => active,
+    unsub: () => {
+      log(`Closing subscription ${id}`)
+
+      active = false
+
+      promises.forEach(async promise => {
+        const sub = await promise
+
+        if (sub) {
+          sub.unsub()
+          sub.conn.stats.activeSubsCount -= 1
+        }
+      })
+    },
+  }
+}
+
+// Utils
+
+const createFilterId = filters =>
+  [Math.random().toString().slice(2, 6), filters.map(describeFilter).join(':')].join('-')
+
 const describeFilter = ({kinds = [], ...filter}) => {
   const parts = []
 
@@ -161,201 +260,6 @@ const describeFilter = ({kinds = [], ...filter}) => {
   return '(' + parts.join(',') + ')'
 }
 
-const normalizeRelays = relays => uniqBy(prop('url'), relays.filter(r => isRelay(r.url)))
-
-const subscribe = async (relays, filters, {onEvent, onEose}: Record<string, (...args: any[]) => void>) => {
-  relays = normalizeRelays(relays)
-  filters = ensurePlural(filters)
-
-  // Create a human readable subscription id for debugging
-  const id = [
-    Math.random().toString().slice(2, 6),
-    filters.map(describeFilter).join(':'),
-  ].join('-')
-
-  // Deduplicate events, track eose stats
-  const now = Date.now()
-  const seen = new Set()
-  const eose = new Set()
-
-  if (relays.length === 0) {
-    error(`Attempted to start subscription ${id} with zero relays`, filters)
-  } else {
-    log(`Starting subscription ${id} with ${relays.length} relays`, filters, relays)
-  }
-
-  // Don't await before returning so we're not blocking on slow connects
-  const promises = relays.map(async relay => {
-    const conn = await connect(relay.url)
-
-    // If the relay failed to connect, give up
-    if (!conn || conn.status === 'closed') {
-      return null
-    }
-
-    const sub = conn.nostr.sub(filters, {id})
-
-    if (onEvent) {
-      sub.on('event', e => {
-        if (!seen.has(e.id)) {
-          seen.add(e.id)
-
-          conn.stats.eventsCount += 1
-          e.seen_on = conn.nostr.url
-
-          onEvent(e as MyEvent)
-        }
-      })
-    }
-
-    if (onEose) {
-      sub.on('eose', () => {
-        onEose(conn.nostr.url)
-
-        // Keep track of relay timing stats, but only for the first eose we get
-        if (!eose.has(conn.nostr.url)) {
-          eose.add(conn.nostr.url)
-
-          conn.stats.eoseCount += 1
-          conn.stats.eoseTimer += Date.now() - now
-        }
-      })
-    }
-
-    conn.stats.subsCount += 1
-    conn.stats.activeSubsCount += 1
-
-    if (conn.stats.activeSubsCount > 10) {
-      warn(`Relay ${conn.nostr.url} has >10 active subscriptions`)
-    }
-
-    return Object.assign(sub, {conn})
-  })
-
-  let active = true
-
-  return {
-    isActive: () => active,
-    unsub: () => {
-      log(`Closing subscription ${id}`)
-
-      promises.forEach(async promise => {
-        const sub = await promise
-
-        if (sub) {
-          if (sub.conn.status === 'ready') {
-            sub.unsub()
-          }
-
-          active = false
-          sub.conn.stats.activeSubsCount -= 1
-        }
-      })
-    },
-  }
-}
-
-const subscribeUntilEose = async (
-  relays,
-  filters,
-  {onEvent, onEose, onClose, timeout = 10_000}: {
-    onEvent: (events: Array<MyEvent>) => void,
-    onEose?: (url: string) => void,
-    onClose?: () => void,
-    timeout?: number
-  }
-) => {
-  relays = normalizeRelays(relays)
-
-  const now = Date.now()
-  const eose = new Set()
-
-  const attemptToComplete = () => {
-    // If we've already unsubscribed we're good
-    if (!agg.isActive()) {
-      return
-    }
-
-    const isComplete = eose.size === relays.length
-    const isTimeout = Date.now() - now >= timeout
-
-    if (isTimeout) {
-      const timedOutRelays = without(Array.from(eose), relays)
-
-      log(`Timing out ${timedOutRelays.length} relays after ${timeout}ms`, timedOutRelays)
-
-      timedOutRelays.forEach(url => {
-        const conn = findConnection(url)
-
-        if (conn) {
-          conn.stats.timeouts += 1
-        }
-      })
-    }
-
-    if (isComplete || isTimeout) {
-      onClose?.()
-      agg.unsub()
-    }
-  }
-
-  // If a relay takes too long, give up
-  setTimeout(attemptToComplete, timeout)
-
-  const agg = await subscribe(relays, filters, {
-    onEvent,
-    onEose: url => {
-      onEose?.(url)
-      attemptToComplete()
-    },
-  })
-
-  return agg
-}
-
-const request = (relays, filters, {threshold = 0.5} = {}): Promise<MyEvent[]> => {
-  return new Promise(async resolve => {
-    relays = normalizeRelays(relays)
-    threshold = relays.length * threshold
-
-    const now = Date.now()
-    const relaysWithEvents = new Set()
-    const events = []
-    const eose = new Set()
-
-    const attemptToComplete = () => {
-      const allEose = eose.size === relays.length
-      const atThreshold = Array.from(eose)
-        .filter(url => relaysWithEvents.has(url)).length >= threshold
-
-      const hardTimeout = Date.now() - now >= 5000
-      const softTimeout = (
-        Date.now() - now >= 1000
-        && eose.size > relays.length - Math.round(relays.length / 10)
-      )
-
-      if (allEose || atThreshold || hardTimeout || softTimeout) {
-        agg.unsub()
-        resolve(events)
-      }
-    }
-
-    // If a relay takes too long, give up
-    setTimeout(attemptToComplete, 5000)
-
-    const agg = await subscribe(relays, filters, {
-      onEvent: e => {
-        relaysWithEvents.add(e.seen_on)
-        events.push(e)
-      },
-      onEose: async url => {
-        eose.add(url)
-        attemptToComplete()
-      },
-    })
-  })
-}
-
 export default {
-  getConnections, findConnection, connect, publish, subscribe, subscribeUntilEose, request,
+  getConnections, getConnection, connect, publish, subscribe,
 }
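With `subscribeUntilEose` and `request` deleted, `pool.subscribe` is the single remaining subscription primitive, and completion policy moves up into `network.load`. A sketch of the new contract (relay URLs illustrative):

```ts
// Sketch of the reworked pool.subscribe(): one logical subscription fanned
// out to every relay, events deduplicated by id across relays, and per-relay
// onEose/onError callbacks replacing the old onClose.
const sub = await pool.subscribe({
  relays: [{url: 'wss://relay.example.com'}, {url: 'wss://relay.example.org'}],
  filter: {kinds: [1], limit: 10},
  onEvent: event => console.log('event', event.id, 'seen on', event.seen_on),
  onEose: url => console.log('eose from', url),
  onError: url => console.log('could not connect to', url),
})

// isActive() lets timeout logic like network.load's check whether the
// subscription has already been torn down before unsubscribing
if (sub.isActive()) {
  sub.unsub()
}
```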
diff --git a/src/agent/relays.ts b/src/agent/relays.ts
index 43e1c536..c84373bf 100644
--- a/src/agent/relays.ts
+++ b/src/agent/relays.ts
@@ -1,8 +1,8 @@
 import type {Relay} from 'src/util/types'
 import {warn} from 'src/util/logger'
 import {pick, objOf, map, assoc, sortBy, uniqBy, prop} from 'ramda'
-import {first, createMap} from 'hurdak/lib/hurdak'
-import {Tags, isRelay, findReplyId} from 'src/util/nostr'
+import {first, createMap, updateIn} from 'hurdak/lib/hurdak'
+import {Tags, normalizeRelayUrl, isRelay, findReplyId} from 'src/util/nostr'
 import {shuffle} from 'src/util/misc'
 import database from 'src/agent/database'
 import user from 'src/agent/user'
@@ -126,6 +126,14 @@
 export const uniqByUrl = uniqBy(prop('url'))
 
 export const sortByScore = sortBy(r => -r.score)
 
+export const normalizeRelays = (relays: Relay[]): Relay[] =>
+  uniqBy(
+    prop('url'),
+    relays
+      .filter(r => isRelay(r.url))
+      .map(updateIn('url', normalizeRelayUrl))
+  )
+
 export const sampleRelays = (relays, scale = 1) => {
   let limit = user.getSetting('relayLimit')
 
diff --git a/src/agent/sync.ts b/src/agent/sync.ts
index cd58b250..a8581928 100644
--- a/src/agent/sync.ts
+++ b/src/agent/sync.ts
@@ -54,8 +54,8 @@ const processProfileEvents = async events => {
           const {relays = []} = database.getPersonWithFallback(e.pubkey)
 
           return {
-            relays: relays.concat({url: e.content}),
             relays_updated_at: e.created_at,
+            relays: relays.concat({url: e.content}),
           }
         }
       },
diff --git a/src/app/alerts.js b/src/app/alerts.js
index 059f92f4..fb7a2b03 100644
--- a/src/app/alerts.js
+++ b/src/app/alerts.js
@@ -1,48 +1,66 @@
 import {get} from 'svelte/store'
-import {groupBy, pluck, partition, propEq} from 'ramda'
+import {uniq, partition, propEq} from 'ramda'
 import {createMap} from 'hurdak/lib/hurdak'
 import {synced, timedelta, now} from 'src/util/misc'
 import {isAlert, findReplyId} from 'src/util/nostr'
 import database from 'src/agent/database'
 import network from 'src/agent/network'
 import {getUserReadRelays} from 'src/agent/relays'
-import {asDisplayEvent, mergeParents} from 'src/app'
 
 let listener
 
 const mostRecentAlert = synced("app/alerts/mostRecentAlert", 0)
 const lastCheckedAlerts = synced("app/alerts/lastCheckedAlerts", 0)
 
+const asAlert = e => ({...e, replies: [], likedBy: [], isMention: false})
+
 const onChunk = async (pubkey, events) => {
   events = events.filter(e => isAlert(e, pubkey))
 
-  if (events.length > 0) {
-    const parents = await network.loadParents(events)
-    const [likes, notes] = partition(propEq('kind', 7), events)
-    const annotatedNotes = mergeParents(notes.concat(parents).map(asDisplayEvent))
-    const likesByParent = groupBy(findReplyId, likes)
-    const likedNotes = parents
-      .filter(e => likesByParent[e.id])
-      .map(e => asDisplayEvent({...e, likedBy: pluck('pubkey', likesByParent[e.id])}))
+  const parents = createMap('id', await network.loadParents(events))
 
-    await database.alerts.bulkPut(createMap('id', annotatedNotes.concat(likedNotes)))
+  const isPubkeyChild = e => {
+    const parentId = findReplyId(e)
 
-    mostRecentAlert.update($t => events.reduce((t, e) => Math.max(t, e.created_at), $t))
+    return parents[parentId]?.pubkey === pubkey
   }
+
+  const [likes, notes] = partition(propEq('kind', 7), events)
+  const [replies, mentions] = partition(isPubkeyChild, notes)
+
+  likes.filter(isPubkeyChild).forEach(e => {
+    const parent = parents[findReplyId(e)]
+    const note = database.alerts.get(parent.id) || asAlert(parent)
+
+    database.alerts.put({...note, likedBy: uniq(note.likedBy.concat(e.pubkey))})
+  })
+
+  replies.forEach(e => {
+    const parent = parents[findReplyId(e)]
+    const note = database.alerts.get(parent.id) || asAlert(parent)
+
+    database.alerts.put({...note, replies: uniq(note.replies.concat(e.pubkey))})
+  })
+
+  mentions.forEach(e => {
+    const note = database.alerts.get(e.id) || asAlert(e)
+
+    database.alerts.put({...note, isMention: true})
+  })
+
+  mostRecentAlert.update($t => events.reduce((t, e) => Math.max(t, 
e.created_at), $t)) + return parents[parentId]?.pubkey === pubkey } + + const [likes, notes] = partition(propEq('kind', 7), events) + const [replies, mentions] = partition(isPubkeyChild, notes) + + likes.filter(isPubkeyChild).forEach(e => { + const parent = parents[findReplyId(e)] + const note = database.alerts.get(parent.id) || asAlert(parent) + + database.alerts.put({...note, likedBy: uniq(note.likedBy.concat(e.pubkey))}) + }) + + replies.forEach(e => { + const parent = parents[findReplyId(e)] + const note = database.alerts.get(parent.id) || asAlert(parent) + + database.alerts.put({...note, replies: uniq(note.replies.concat(e.pubkey))}) + }) + + mentions.forEach(e => { + const note = database.alerts.get(e.id) || asAlert(e) + + database.alerts.put({...note, isMention: true}) + }) + + mostRecentAlert.update($t => events.reduce((t, e) => Math.max(t, e.created_at), $t)) } -const load = async pubkey => { +const load = pubkey => { // Include an offset so we don't miss alerts on one relay but not another const since = get(mostRecentAlert) - timedelta(30, 'days') // Crank the threshold up since we can afford for this to be slow - const events = await network.load( - getUserReadRelays(), - {kinds: [1, 7], '#p': [pubkey], since, limit: 1000}, - {threshold: 0.9} - ) - - onChunk(pubkey, events) + network.load({ + relays: getUserReadRelays(), + filter: {kinds: [1, 7], '#p': [pubkey], since, limit: 1000}, + onChunk: events => onChunk(pubkey, events) + }) } const listen = async pubkey => { @@ -50,11 +68,11 @@ const listen = async pubkey => { listener.unsub() } - listener = await network.listen( - getUserReadRelays(), - {kinds: [1, 7], '#p': [pubkey], since: now()}, - events => onChunk(pubkey, events) - ) + listener = await network.listen({ + relays: getUserReadRelays(), + filter: {kinds: [1, 7], '#p': [pubkey], since: now()}, + onChunk: events => onChunk(pubkey, events) + }) } export default {load, listen, mostRecentAlert, lastCheckedAlerts} diff --git a/src/app/connection.js b/src/app/connection.js index 939f8a4e..b78e35f7 100644 --- a/src/app/connection.js +++ b/src/app/connection.js @@ -11,13 +11,13 @@ setInterval(() => { const relayUrls = new Set(pluck('url', getUserRelays())) // Prune connections we haven't used in a while - pool.getConnections() + Object.values(pool.getConnections()) .filter(conn => conn.lastRequest < Date.now() - 60_000) - .forEach(conn => conn.disconnect()) + .forEach(conn => conn.nostr.close()) // Alert the user to any heinously slow connections slowConnections.set( - pool.getConnections() + Object.values(pool.getConnections()) .filter(c => relayUrls.has(c.nostr.url) && first(c.getQuality()) < 0.3) ) }, 30_000) diff --git a/src/app/messages.js b/src/app/messages.js index 57eaa93a..4ba642e6 100644 --- a/src/app/messages.js +++ b/src/app/messages.js @@ -29,11 +29,13 @@ const listen = async pubkey => { listener.unsub() } - listener = await network.listen( - getUserReadRelays(), - [{kinds: [4], authors: [pubkey], since}, - {kinds: [4], '#p': [pubkey], since}], - async events => { + listener = await network.listen({ + relays: getUserReadRelays(), + filter: [ + {kinds: [4], authors: [pubkey], since}, + {kinds: [4], '#p': [pubkey], since}, + ], + onChunk: async events => { // Reload annotated messages, don't alert about messages to self const messages = reject(e => e.pubkey === e.recipient, await database.messages.all()) @@ -50,8 +52,8 @@ const listen = async pubkey => { return o }) } - } - ) + }, + }) } export default {listen, mostRecentByPubkey, lastCheckedByPubkey, hasNewMessages} 
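For reference, the reworked `onChunk` above folds three event classes into one stored record per note: replies to the user's note, likes of it, and plain mentions. The record shape implied by `asAlert` (base nostr event fields abbreviated):

```ts
// Alert record shape implied by asAlert()/onChunk() in app/alerts.js above;
// the isMention flag is also what the database.ts migration keys on.
type AlertRecord = {
  id: string
  pubkey: string
  created_at: number
  content: string
  replies: string[]   // pubkeys that replied to the user's note
  likedBy: string[]   // pubkeys that liked the user's note
  isMention: boolean  // true when the event merely tags the user
}
```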
diff --git a/src/partials/Like.svelte b/src/partials/Alert.svelte similarity index 78% rename from src/partials/Like.svelte rename to src/partials/Alert.svelte index 08a0fbdb..ec89771b 100644 --- a/src/partials/Like.svelte +++ b/src/partials/Alert.svelte @@ -1,7 +1,6 @@
diff --git a/src/routes/Alerts.svelte b/src/routes/Alerts.svelte index d581ff71..a0d1f7d4 100644 --- a/src/routes/Alerts.svelte +++ b/src/routes/Alerts.svelte @@ -2,14 +2,16 @@ import {sortBy} from 'ramda' import {onMount} from 'svelte' import {fly} from 'svelte/transition' - import {now, createScroller} from 'src/util/misc' - import Note from 'src/partials/Note.svelte' + import {ellipsize} from 'hurdak/lib/hurdak' + import {now, formatTimestamp, createScroller} from 'src/util/misc' import Spinner from 'src/partials/Spinner.svelte' import Content from 'src/partials/Content.svelte' - import Like from 'src/partials/Like.svelte' + import Badge from "src/partials/Badge.svelte" + import Alert from 'src/partials/Alert.svelte' import database from 'src/agent/database' import alerts from 'src/app/alerts' import {asDisplayEvent} from 'src/app' + import {modal} from 'src/app/ui' let limit = 0 let notes = null @@ -29,12 +31,28 @@ {#if notes} - {#each notes as e (e.id)} + {#each notes as note (note.id)}
-      {#if e.likedBy}
-        <Like note={e} />
+      {#if note.replies.length > 0}
+        <Alert type="replies" {note} />
+      {:else if note.likedBy.length > 0}
+        <Alert type="likes" {note} />
       {:else}
-        <Note note={e} />
+        <Alert type="mention" {note} />
       {/if}
{:else} diff --git a/src/routes/Chat.svelte b/src/routes/Chat.svelte index 0a815aab..4919eb8d 100644 --- a/src/routes/Chat.svelte +++ b/src/routes/Chat.svelte @@ -1,6 +1,5 @@ diff --git a/src/routes/ChatRoom.svelte b/src/routes/ChatRoom.svelte index d9addf91..0cc82810 100644 --- a/src/routes/ChatRoom.svelte +++ b/src/routes/ChatRoom.svelte @@ -16,28 +16,31 @@ const room = database.watch('rooms', rooms => rooms.get(roomId)) const listenForMessages = async cb => { - const relays = getRelaysForEventChildren($room) - - return network.listen( - relays, - // Listen for updates to the room in case we didn't get them before - [{kinds: [40, 41], ids: [roomId]}, - {kinds: [42], '#e': [roomId], since: now()}], - events => { + // Listen for updates to the room in case we didn't get them before + return network.listen({ + relays: getRelaysForEventChildren($room), + filter: [ + {kinds: [40, 41], ids: [roomId]}, + {kinds: [42], '#e': [roomId], since: now()}, + ], + onChunk: events => { network.loadPeople(pluck('pubkey', events)) cb(events.filter(e => e.kind === 42)) - } - ) + }, + }) } - const loadMessages = async ({until, limit}) => { - const relays = getRelaysForEventChildren($room) - const events = await network.load(relays, {kinds: [42], '#e': [roomId], until, limit}) + const loadMessages = ({until, limit}, onChunk) => { + network.load({ + relays: getRelaysForEventChildren($room), + filter: {kinds: [42], '#e': [roomId], until, limit}, + onChunk: events => { + network.loadPeople(pluck('pubkey', events)) - network.loadPeople(pluck('pubkey', events)) - - return events + onChunk(events) + }, + }) } const editRoom = () => { diff --git a/src/routes/Messages.svelte b/src/routes/Messages.svelte index 641c189f..4801d1ef 100644 --- a/src/routes/Messages.svelte +++ b/src/routes/Messages.svelte @@ -35,19 +35,21 @@ return events.map(renameProp('decryptedContent', 'content')) } - const listenForMessages = cb => network.listen( - getRelays(), - [{kinds: personKinds, authors: [pubkey]}, - {kinds: [4], authors: [user.getPubkey()], '#p': [pubkey]}, - {kinds: [4], authors: [pubkey], '#p': [user.getPubkey()]}], - async events => { + const listenForMessages = cb => network.listen({ + relays: getRelays(), + filter: [ + {kinds: personKinds, authors: [pubkey]}, + {kinds: [4], authors: [user.getPubkey()], '#p': [pubkey]}, + {kinds: [4], authors: [pubkey], '#p': [user.getPubkey()]}, + ], + onChunk: async events => { // Reload from db since we annotate messages there const messageIds = pluck('id', events.filter(e => e.kind === 4)) const messages = await database.messages.all({id: messageIds}) cb(await decryptMessages(messages)) - } - ) + }, + }) const loadMessages = async ({until, limit}) => { const fromThem = await database.messages.all({pubkey}) @@ -55,7 +57,7 @@ const events = fromThem.concat(toThem).filter(e => e.created_at < until) const messages = sortBy(e => -e.created_at, events).slice(0, limit) - return await decryptMessages(messages) + return await decryptMessages(messages) } const sendMessage = async content => { diff --git a/src/routes/Person.svelte b/src/routes/Person.svelte index bf0419de..db3849a1 100644 --- a/src/routes/Person.svelte +++ b/src/routes/Person.svelte @@ -18,7 +18,7 @@ import Likes from "src/views/person/Likes.svelte" import Relays from "src/views/person/Relays.svelte" import user from "src/agent/user" - import {getUserReadRelays, getPubkeyWriteRelays} from "src/agent/relays" + import {sampleRelays, getPubkeyWriteRelays} from "src/agent/relays" import network from "src/agent/network" import keys 
from "src/agent/keys" import database from "src/agent/database" @@ -43,12 +43,6 @@ onMount(async () => { log('Person', npub, person) - // Add all the relays we know the person uses, as well as our own - // in case we don't have much information - relays = relays - .concat(getPubkeyWriteRelays(pubkey).slice(0, 3)) - .concat(getUserReadRelays().slice(0, 3)) - // Refresh our person network.loadPeople([pubkey], {force: true}).then(() => { person = database.getPersonWithFallback(pubkey) @@ -64,18 +58,18 @@ }) // Round out our followers count - await network.listenUntilEose( - relays, - [{kinds: [3], '#p': [pubkey]}], - events => { + await network.load({ + shouldProcess: false, + relays: sampleRelays(getPubkeyWriteRelays(pubkey)), + filter: [{kinds: [3], '#p': [pubkey]}], + onChunk: events => { for (const e of events) { followers.add(e.pubkey) } followersCount.set(followers.size) }, - {shouldProcess: false}, - ) + }) }) const setActiveTab = tab => navigate(routes.person(pubkey, tab)) diff --git a/src/routes/RelayList.svelte b/src/routes/RelayList.svelte index 1a799000..5878a25f 100644 --- a/src/routes/RelayList.svelte +++ b/src/routes/RelayList.svelte @@ -26,7 +26,7 @@ interact with the network, but you can join as many as you like.

{#if $relays.length === 0} -
+
No relays connected
diff --git a/src/util/nostr.js b/src/util/nostr.js index 1e25d3d3..451b6626 100644 --- a/src/util/nostr.js +++ b/src/util/nostr.js @@ -94,6 +94,6 @@ export const isRelay = url => ( && !url.slice(6).match(/\/npub/) ) -export const normalizeRelayUrl = url => url.replace(/\/+$/, '') +export const normalizeRelayUrl = url => url.replace(/\/+$/, '').toLowerCase() export const roomAttrs = ['name', 'about', 'picture'] diff --git a/src/views/ConnectUser.svelte b/src/views/ConnectUser.svelte index f047f9c1..d51826ca 100644 --- a/src/views/ConnectUser.svelte +++ b/src/views/ConnectUser.svelte @@ -1,11 +1,12 @@ - - Connect to Nostr + + Connect to Nostr

We're searching for your profile on the network. If you'd like to select your relays manually instead, click here.

{#if currentRelays.length > 0} -

- Currently searching: -

-
    - {#each currentRelays as relay} -
  • {last(relay.url.split('://'))}
  • - {/each} -
+

Currently searching:

+ {#each currentRelays as relay} +
+ {#if relay} + + {/if} +
+ {/each} {/if}
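Because `normalizeRelayUrl` in src/util/nostr.js now lowercases as well as stripping trailing slashes, differently spelled URLs for the same relay collapse to a single key in the connection pool. Expected behavior of the one-liner:

```ts
// Expected behavior of the updated normalizeRelayUrl (src/util/nostr.js)
const normalizeRelayUrl = (url: string) => url.replace(/\/+$/, '').toLowerCase()

normalizeRelayUrl('wss://Relay.Example.com//') // -> 'wss://relay.example.com'
normalizeRelayUrl('wss://relay.example.com')   // -> 'wss://relay.example.com'
```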
diff --git a/src/views/NoteDetail.svelte b/src/views/NoteDetail.svelte index b89e262f..21f33505 100644 --- a/src/views/NoteDetail.svelte +++ b/src/views/NoteDetail.svelte @@ -17,7 +17,13 @@ onMount(async () => { if (!note.pubkey) { - note = first(await network.load(relays, {ids: [note.id]})) + await network.load({ + relays, + filter: {kinds: [1], ids: [note.id]}, + onChunk: events => { + note = first(events) + }, + }) } if (note) { diff --git a/src/views/SearchPeople.svelte b/src/views/SearchPeople.svelte index 58105402..061dedd2 100644 --- a/src/views/SearchPeople.svelte +++ b/src/views/SearchPeople.svelte @@ -24,7 +24,10 @@ }) // Prime our database, in case we don't have any people stored yet - network.listenUntilEose(getUserReadRelays(), {kinds: personKinds, limit: 50}) + network.load({ + relays: getUserReadRelays(), + filter: {kinds: personKinds, limit: 10}, + })
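Taken together, the view changes above all follow the same mechanical migration: positional arguments to `network.load`/`network.listen` become a single options object, with results delivered through `onChunk`. A before/after sketch of the pattern (filter values illustrative):

```ts
declare const relays: {url: string}[]

// Before: positional args, separate entry points per completion style
//   const events = await network.load(relays, {kinds: [1], limit: 10}, {threshold: 0.9})
//   network.listenUntilEose(relays, filter, onEvents, {shouldProcess: false})

// After: one options object; EOSE and timeout bookkeeping live inside load()
const events = await network.load({
  relays,
  filter: {kinds: [1], limit: 10},
  shouldProcess: false,
  onChunk: chunk => console.log(`rendered ${chunk.length} notes`),
})
```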