Add persistent subscription for new notifications

This commit is contained in:
Jonathan Staab 2023-09-15 11:09:32 -07:00
parent dab1dc1061
commit 358809fe22
5 changed files with 58 additions and 56 deletions

View File

@ -2,26 +2,22 @@ import Bugsnag from "@bugsnag/js"
import {nip19} from "nostr-tools" import {nip19} from "nostr-tools"
import {navigate} from "svelte-routing" import {navigate} from "svelte-routing"
import {writable} from "svelte/store" import {writable} from "svelte/store"
import {path, filter, pluck, sortBy, slice} from "ramda" import {hash, union, sleep} from "hurdak"
import {hash, union, sleep, doPipe} from "hurdak"
import {warn} from "src/util/logger" import {warn} from "src/util/logger"
import {now} from "src/util/misc" import {now} from "src/util/misc"
import {userKinds, noteKinds} from "src/util/nostr" import {userKinds} from "src/util/nostr"
import {modal, toast} from "src/partials/state" import {modal, toast} from "src/partials/state"
import type {Event} from "src/engine2"
import { import {
env, env,
pool, pool,
session, session,
follows,
loadDeletes, loadDeletes,
loadPubkeys, loadPubkeys,
channels,
follows,
subscribe,
getUserRelayUrls, getUserRelayUrls,
listenForNotifications,
getSetting, getSetting,
dufflepud, dufflepud,
events,
} from "src/engine2" } from "src/engine2"
// Routing // Routing
@ -111,43 +107,6 @@ setInterval(() => {
// Synchronization from events to state // Synchronization from events to state
let listener
let timeout
export const listenForNotifications = async () => {
const {pubkey} = session.get()
const channelIds = pluck("id", channels.get().filter(path(["nip28", "joined"])))
const eventIds: string[] = doPipe(events.get(), [
filter((e: Event) => noteKinds.includes(e.kind)),
sortBy((e: Event) => -e.created_at),
slice(0, 256),
pluck("id"),
])
// Only grab one event from each category/relay so we have enough to show
// the notification badges, but load the details lazily
listener?.close()
listener = subscribe({
relays: getUserRelayUrls("read"),
filters: [
// Messages
{kinds: [4], authors: [pubkey], limit: 1},
{kinds: [4], "#p": [pubkey], limit: 1},
// {kinds: [1059], "#p": [pubkey], limit: 1},
// Chat
{kinds: [42], "#e": channelIds, limit: 1},
// Mentions/replies
{kinds: noteKinds, "#p": [pubkey], limit: 1},
{kinds: noteKinds, "#e": eventIds, limit: 1},
],
})
clearTimeout(timeout)
timeout = setTimeout(listenForNotifications, 3 * 60_000)
}
export const loadAppData = async () => { export const loadAppData = async () => {
const {pubkey} = session.get() const {pubkey} = session.get()

View File

@ -21,8 +21,8 @@
publishProfile, publishProfile,
publishRelays, publishRelays,
loginWithPrivateKey, loginWithPrivateKey,
listenForNotifications,
} from "src/engine2" } from "src/engine2"
import {listenForNotifications} from "src/app/state"
import {modal} from "src/partials/state" import {modal} from "src/partials/state"
export let stage export let stage

View File

@ -1,4 +1,4 @@
import {sortBy, identity, find, pipe, filter, path, whereEq} from "ramda" import {sortBy, identity, find, filter, path, whereEq} from "ramda"
import {fuzzy} from "src/util/misc" import {fuzzy} from "src/util/misc"
import type {Channel} from "src/engine2/model" import type {Channel} from "src/engine2/model"
import {channels} from "src/engine2/state" import {channels} from "src/engine2/state"
@ -37,8 +37,8 @@ export const nip28ChannelsWithMeta = channels
.throttle(300) .throttle(300)
.derived(filter((c: Channel) => c.meta && c.type === "nip28")) .derived(filter((c: Channel) => c.meta && c.type === "nip28"))
export const nip28ChannelsForUser = nip28ChannelsWithMeta.derived(filter(path(["nip28", "joined"])))
export const searchNip28Channels = nip28ChannelsWithMeta.derived(getChannelSearch) export const searchNip28Channels = nip28ChannelsWithMeta.derived(getChannelSearch)
export const hasNewNip28Messages = nip28ChannelsWithMeta.derived( export const hasNewNip28Messages = nip28ChannelsForUser.derived(find(hasNewMessages))
pipe(filter(path(["nip28", "joined"])), find(hasNewMessages))
)

View File

@ -1,12 +1,12 @@
import {pluck, without, sortBy} from "ramda" import {pluck, slice, filter, without, sortBy} from "ramda"
import {seconds, batch} from "hurdak" import {seconds, batch, doPipe} from "hurdak"
import {now} from "src/util/misc" import {now} from "src/util/misc"
import type {Event} from "src/engine2/model" import type {Event} from "src/engine2/model"
import {EventKind} from "src/engine2/model" import {EventKind} from "src/engine2/model"
import {noteKinds, reactionKinds} from "src/util/nostr" import {noteKinds, reactionKinds} from "src/util/nostr"
import {env, sessions, events, notificationsLastChecked} from "src/engine2/state" import {env, sessions, events, notificationsLastChecked} from "src/engine2/state"
import {mergeHints, getPubkeyHints} from "src/engine2/queries" import {mergeHints, getPubkeyHints, nip28ChannelsForUser} from "src/engine2/queries"
import {subscribe} from "./subscription" import {subscribe, subscribePersistent} from "./subscription"
import {ContextLoader} from "./context" import {ContextLoader} from "./context"
export const loadNotifications = () => { export const loadNotifications = () => {
@ -54,3 +54,31 @@ export const loadNotifications = () => {
}), }),
}) })
} }
export const listenForNotifications = async () => {
const pubkeys = Object.keys(sessions.get())
const channelIds = pluck("id", nip28ChannelsForUser.get())
const eventIds: string[] = doPipe(events.get(), [
filter((e: Event) => noteKinds.includes(e.kind)),
sortBy((e: Event) => -e.created_at),
slice(0, 256),
pluck("id"),
])
// Only grab one event from each category/relay so we have enough to show
// the notification badges, but load the details lazily
subscribePersistent({
relays: mergeHints(pubkeys.map(pk => getPubkeyHints(pk, "read"))),
filters: [
// Messages
{kinds: [4], "#p": pubkeys, limit: 1},
{kinds: [1059], "#p": pubkeys, limit: 1},
// Chat
{kinds: [42], "#e": channelIds, limit: 1},
// Mentions/replies
{kinds: noteKinds, "#p": pubkeys, limit: 1},
{kinds: noteKinds, "#e": eventIds, limit: 1},
],
})
}

View File

@ -1,9 +1,11 @@
import {verifySignature, matchFilters} from "nostr-tools" import {verifySignature, matchFilters} from "nostr-tools"
import type {Executor} from "paravel" import type {Executor} from "paravel"
import EventEmitter from "events" import EventEmitter from "events"
import {defer, tryFunc} from "hurdak" import {assoc, map} from "ramda"
import {defer, tryFunc, updateIn} from "hurdak"
import {warn, info} from "src/util/logger" import {warn, info} from "src/util/logger"
import {isShareableRelay} from "src/util/nostr" import {isShareableRelay} from "src/util/nostr"
import {now} from "src/util/misc"
import type {Event, Filter} from "src/engine2/model" import type {Event, Filter} from "src/engine2/model"
import {getUrls, getExecutor} from "src/engine2/queries" import {getUrls, getExecutor} from "src/engine2/queries"
import {projections} from "src/engine2/projections" import {projections} from "src/engine2/projections"
@ -29,7 +31,11 @@ export class Subscription extends EventEmitter {
constructor(readonly opts: SubscriptionOpts) { constructor(readonly opts: SubscriptionOpts) {
super() super()
const {timeout, relays, filters} = opts this.start()
}
start = () => {
const {timeout, relays, filters} = this.opts
if (timeout) { if (timeout) {
setTimeout(this.close, timeout) setTimeout(this.close, timeout)
@ -122,3 +128,12 @@ export const subscribe = (opts: SubscribeOpts) => {
return sub return sub
} }
export const subscribePersistent = async (opts: SubscribeOpts) => {
/* eslint no-constant-condition: 0 */
while (true) {
await subscribe(opts).result
opts = updateIn("filters", map(assoc("since", now())), opts)
}
}