diff --git a/src/agent/state.ts b/src/agent/state.ts index 66f641cd..e12b333f 100644 --- a/src/agent/state.ts +++ b/src/agent/state.ts @@ -22,4 +22,4 @@ export const onReady = cb => { }) } -window.state = {people, contacts, rooms, alerts, relays, routes} +(window as any).t = {people, contacts, rooms, alerts, relays, routes} diff --git a/src/agent/database.ts b/src/agent/storage.ts similarity index 100% rename from src/agent/database.ts rename to src/agent/storage.ts diff --git a/src/agent/sync.ts b/src/agent/sync.ts index e7daef7f..96031b5f 100644 --- a/src/agent/sync.ts +++ b/src/agent/sync.ts @@ -1,289 +1,49 @@ -import {uniq, pick, identity, isEmpty} from 'ramda' -import {nip05} from 'nostr-tools' -import {noop, ensurePlural, chunk, switcherFn} from 'hurdak/lib/hurdak' -import {log} from 'src/util/logger' -import {lnurlEncode, tryFunc, lnurlDecode, tryFetch, now, sleep, tryJson, timedelta, shuffle, hash} from 'src/util/misc' -import {Tags, roomAttrs, personKinds, isRelay, isShareableRelay, normalizeRelayUrl} from 'src/util/nostr' -import {getPersonWithFallback, people, relays, rooms, routes} from 'src/agent/state' +import {uniq, pick, identity} from "ramda" +import {nip05} from "nostr-tools" +import {noop, ensurePlural, chunk} from "hurdak/lib/hurdak" +import { + lnurlEncode, + tryFunc, + lnurlDecode, + tryFetch, + now, + sleep, + tryJson, + timedelta, + hash, +} from "src/util/misc" +import { + Tags, + roomAttrs, + isRelay, + isShareableRelay, + normalizeRelayUrl, +} from "src/util/nostr" +import {getPersonWithFallback, people, relays, rooms, routes} from "src/agent/state" +import {uniqByUrl} from "src/agent/relays" + +const handlers = {} + +const addHandler = (kind, f) => (handlers[kind] || []).push(f) const processEvents = async events => { - await Promise.all([ - batchProcess(processProfileEvents, events), - batchProcess(processRoomEvents, events), - batchProcess(processRoutes, events), - ]) -} - -const batchProcess = async (processChunk, events) => { const chunks = chunk(100, ensurePlural(events)) - // Don't lock everything up when processing a lot of events for (let i = 0; i < chunks.length; i++) { - processChunk(chunks[i]) + for (const event of chunks[i]) { + for (const handler of handlers[event.kind] || []) { + handler(event) + } + } + // Don't lock up the ui when processing a lot of events if (i < chunks.length - 1) { await sleep(30) } } } -const processProfileEvents = async events => { - const profileEvents = events.filter(e => personKinds.includes(e.kind)) - - for (const e of profileEvents) { - const person = getPersonWithFallback(e.pubkey) - - people.put({ - ...person, - pubkey: e.pubkey, - ...switcherFn(e.kind, { - 0: () => tryJson(() => { - const kind0 = JSON.parse(e.content) - - if (e.created_at >= (person.kind0_updated_at || 0)) { - if (kind0.nip05) { - verifyNip05(e.pubkey, kind0.nip05) - } - - const address = kind0.lud16 || kind0.lud06 - - if (address) { - verifyZapper(e.pubkey, address.toLowerCase()) - } - - return { - kind0: {...person?.kind0, ...kind0}, - kind0_updated_at: e.created_at, - } - } - }), - 2: () => { - if (e.created_at > (person.relays_updated_at || 0)) { - const {relays = []} = getPersonWithFallback(e.pubkey) - - return { - relays_updated_at: e.created_at, - relays: relays.concat({url: e.content}), - } - } - }, - 3: () => { - const data = {petnames: e.tags} - - if (e.content && e.created_at > (person.relays_updated_at || 0)) { - tryJson(() => { - Object.assign(data, { - relays_updated_at: e.created_at, - relays: Object.entries(JSON.parse(e.content)) - .map(([url, conditions]) => { - const {write, read} = conditions as Record - - return { - url, - write: [false, '!'].includes(write) ? false : true, - read: [false, '!'].includes(read) ? false : true, - } - }) - .filter(r => isRelay(r.url)), - }) - }) - } - - return data - }, - 10000: () => { - if (e.created_at > (person.mutes_updated_at || 0)) { - return { - mutes_updated_at: e.created_at, - mutes: e.tags, - } - } - }, - // DEPRECATED - 12165: () => { - if (e.created_at > (person.mutes_updated_at || 0)) { - return { - mutes_updated_at: e.created_at, - mutes: e.tags, - } - } - }, - // DEPRECATED - 10001: () => { - if (e.created_at > (person.relays_updated_at || 0)) { - return { - relays_updated_at: e.created_at, - relays: e.tags.map(([url, read, write]) => - ({url, read: read !== '!', write: write !== '!'})), - } - } - }, - 10002: () => { - if (e.created_at > (person.relays_updated_at || 0)) { - return { - relays_updated_at: e.created_at, - relays: e.tags.map(([_, url, mode]) => { - const read = (mode || 'read') === 'read' - const write = (mode || 'write') === 'write' - - return {url, read, write} - }), - } - } - }, - default: () => { - log(`Received unsupported event type ${e.kind}`) - }, - }), - updated_at: now(), - }) - } -} - -// Chat rooms - -const processRoomEvents = events => { - const roomEvents = events.filter(e => [40, 41].includes(e.kind)) - - for (const e of roomEvents) { - const content = tryJson(() => pick(roomAttrs, JSON.parse(e.content))) - const roomId = e.kind === 40 ? e.id : Tags.from(e).type("e").values().first() - - // Ignore non-standard rooms that don't have a name - if (!roomId || !content?.name) { - continue - } - - const room = rooms.get(roomId) - - // Don't let old edits override new ones - if (room?.updated_at >= e.created_at) { - continue - } - - rooms.put({ - ...room, - ...content, - id: roomId, - pubkey: e.pubkey, - updated_at: e.created_at, - }) - } -} - -// Routes - -const getWeight = type => { - if (type === 'nip05') return 1 - if (type === 'kind:10002') return 1 - if (type === 'kind:3') return 0.8 - if (type === 'kind:2') return 0.5 - if (type === 'seen') return 0.2 -} - -const calculateRoute = (pubkey, rawUrl, type, mode, created_at) => { - if (!isShareableRelay(rawUrl)) { - return - } - - const url = normalizeRelayUrl(rawUrl) - const id = hash([pubkey, url, mode].join('')).toString() - const score = getWeight(type) * (1 - (now() - created_at) / timedelta(30, 'days')) - const defaults = {id, pubkey, url, mode, score: 0, count: 0, types: []} - const route = routes.get(id) || defaults - - const newTotalScore = route.score * route.count + score - const newCount = route.count + 1 - - if (score > 0) { - return { - ...route, - count: newCount, - score: newTotalScore / newCount, - types: uniq(route.types.concat(type)), - last_seen: Math.max(created_at, route.last_seen || 0), - } - } -} - -const processRoutes = async events => { - let updates = [] - - // Sample events so we're not burning too many resources - for (const e of shuffle(events).slice(0, 10)) { - switcherFn(e.kind, { - 0: () => { - updates.push( - calculateRoute(e.pubkey, e.content, 'seen', 'write', e.created_at) - ) - }, - 2: () => { - updates.push( - calculateRoute(e.pubkey, e.content, 'kind:2', 'read', e.created_at) - ) - updates.push( - calculateRoute(e.pubkey, e.content, 'kind:2', 'write', e.created_at) - ) - }, - 3: () => { - if (e.content) { - tryJson(() => { - Object.entries(JSON.parse(e.content)) - .forEach(([url, conditions]) => { - const {write, read} = conditions as Record - - if (![false, '!'].includes(write)) { - updates.push( - calculateRoute(e.pubkey, url, 'kind:3', 'write', e.created_at) - ) - } - - if (![false, '!'].includes(read)) { - updates.push( - calculateRoute(e.pubkey, url, 'kind:3', 'read', e.created_at) - ) - } - }) - }) - } - }, - // DEPRECATED - 10001: () => { - e.tags - .forEach(([url, read, write]) => { - if (write !== '!') { - calculateRoute(e.pubkey, url, 'kind:10002', 'write', e.created_at) - } - - if (read !== '!') { - calculateRoute(e.pubkey, url, 'kind:10002', 'read', e.created_at) - } - }) - }, - 10002: () => { - e.tags - .forEach(([_, url, mode]) => { - if (mode) { - calculateRoute(e.pubkey, url, 'kind:10002', mode, e.created_at) - } else { - calculateRoute(e.pubkey, url, 'kind:10002', 'read', e.created_at) - calculateRoute(e.pubkey, url, 'kind:10002', 'write', e.created_at) - } - }) - }, - default: noop, - }) - } - - updates = updates.filter(identity) - - if (!isEmpty(updates)) { - await relays.bulkPatch(updates.map(pick(['url']))) - await routes.bulkPut(updates) - } -} - -// Utils +// People const verifyNip05 = (pubkey, as) => nip05.queryProfile(as).then(result => { @@ -298,10 +58,12 @@ const verifyNip05 = (pubkey, as) => relays.bulkPatch(urls.map(url => ({url: normalizeRelayUrl(url)}))) routes.bulkPut( - urls.flatMap(url => [ - calculateRoute(pubkey, url, 'nip05', 'write', now()), - calculateRoute(pubkey, url, 'nip05', 'read', now()), - ]).filter(identity) + urls + .flatMap(url => [ + addRoute(pubkey, url, "nip05", "write", now()), + addRoute(pubkey, url, "nip05", "read", now()), + ]) + .filter(identity) ) } } @@ -311,10 +73,10 @@ const verifyZapper = async (pubkey, address) => { let url // Try to parse it as a lud06 LNURL or as a lud16 address - if (address.startsWith('lnurl1')) { + if (address.startsWith("lnurl1")) { url = tryFunc(() => lnurlDecode(address)) - } else if (address.includes('@')) { - const [name, domain] = address.split('@') + } else if (address.includes("@")) { + const [name, domain] = address.split("@") if (domain && name) { url = `https://${domain}/.well-known/lnurlp/${name}` @@ -327,11 +89,259 @@ const verifyZapper = async (pubkey, address) => { const res = await tryFetch(() => fetch(url)) const zapper = await tryJson(() => res?.json()) - const lnurl = lnurlEncode('lnurl', url) + const lnurl = lnurlEncode("lnurl", url) if (zapper?.allowsNostr && zapper?.nostrPubkey) { people.patch({pubkey, zapper, lnurl}) } } +addHandler(0, e => { + tryJson(() => { + const kind0 = JSON.parse(e.content) + + const person = people.get(e.pubkey) + + if (e.created_at < person?.kind0_updated_at) { + return + } + + if (kind0.nip05) { + verifyNip05(e.pubkey, kind0.nip05) + } + + const address = kind0.lud16 || kind0.lud06 + + if (address) { + verifyZapper(e.pubkey, address.toLowerCase()) + } + + people.patch({ + pubkey: e.pubkey, + updated_at: now(), + kind0: {...person?.kind0, ...kind0}, + kind0_updated_at: e.created_at, + }) + }) +}) + +addHandler(2, e => { + const person = people.get(e.pubkey) + + if (e.created_at < person?.relays_updated_at) { + return + } + + people.patch({ + pubkey: e.pubkey, + updated_at: now(), + relays_updated_at: e.created_at, + relays: uniqByUrl((person?.relays || []).concat({url: e.content})), + }) +}) + +addHandler(3, e => { + const person = people.get(e.pubkey) + + if (e.created_at > (person?.petnames_updated_at || 0)) { + people.patch({ + pubkey: e.pubkey, + updated_at: now(), + petnames_updated_at: e.created_at, + petnames: e.tags.filter(t => t[0] === "p"), + }) + } + + if (e.created_at > (person.relays_updated_at || 0)) { + tryJson(() => { + people.patch({ + pubkey: e.pubkey, + relays_updated_at: e.created_at, + relays: Object.entries(JSON.parse(e.content)) + .map(([url, conditions]) => { + const {write, read} = conditions as Record + + return { + url: normalizeRelayUrl(url), + write: [false, "!"].includes(write) ? false : true, + read: [false, "!"].includes(read) ? false : true, + } + }) + .filter(r => isRelay(r.url)), + }) + }) + } +}) + +addHandler(10000, e => { + const person = people.get(e.pubkey) + + if (e.created_at < person?.mutes_updated_at) { + return + } + + people.patch({ + pubkey: e.pubkey, + updated_at: now(), + mutes_updated_at: e.created_at, + mutes: e.tags, + }) +}) + +// DEPRECATED +addHandler(12165, e => { + const person = people.get(e.pubkey) + + if (e.created_at < person?.mutes_updated_at) { + return + } + + people.patch({ + pubkey: e.pubkey, + updated_at: now(), + mutes_updated_at: e.created_at, + mutes: e.tags, + }) +}) + +addHandler(10002, e => { + const person = people.get(e.pubkey) + + if (e.created_at < person?.relays_updated_at) { + return + } + + people.patch({ + pubkey: e.pubkey, + updated_at: now(), + relays_updated_at: e.created_at, + relays: e.tags.map(([_, url, mode]) => { + const read = (mode || "read") === "read" + const write = (mode || "write") === "write" + + return {url, read, write} + }), + }) +}) + +// Rooms + +addHandler(40, e => { + const room = rooms.get(e.id) + + if (e.created_at < room?.updated_at) { + return + } + + const content = tryJson(() => pick(roomAttrs, JSON.parse(e.content))) + + if (!content?.name) { + return + } + + rooms.patch({ + id: e.id, + pubkey: e.pubkey, + updated_at: e.created_at, + ...content, + }) +}) + +addHandler(41, e => { + const roomId = Tags.from(e).type("e").values().first() + + if (!roomId) { + return + } + + const room = rooms.get(roomId) + + if (e.created_at < room?.updated_at) { + return + } + + const content = tryJson(() => pick(roomAttrs, JSON.parse(e.content))) + + if (!content?.name) { + return + } + + rooms.patch({ + id: roomId, + pubkey: e.pubkey, + updated_at: e.created_at, + ...content, + }) +}) + +// Routes + +const getWeight = type => { + if (type === "nip05") return 1 + if (type === "kind:10002") return 1 + if (type === "kind:3") return 0.8 + if (type === "kind:2") return 0.5 +} + +const addRoute = (pubkey, rawUrl, type, mode, created_at) => { + if (!isShareableRelay(rawUrl)) { + return + } + + const url = normalizeRelayUrl(rawUrl) + const id = hash([pubkey, url, mode].join("")).toString() + const score = getWeight(type) * (1 - (now() - created_at) / timedelta(30, "days")) + const defaults = {id, pubkey, url, mode, score: 0, count: 0, types: []} + const route = routes.get(id) || defaults + + const newTotalScore = route.score * route.count + score + const newCount = route.count + 1 + + if (score > 0) { + relays.patch({ + url: route.url, + }) + + routes.put({ + ...route, + count: newCount, + score: newTotalScore / newCount, + types: uniq(route.types.concat(type)), + last_seen: Math.max(created_at, route.last_seen || 0), + }) + } +} + +addHandler(2, e => { + addRoute(e.pubkey, e.content, "kind:2", "read", e.created_at) + addRoute(e.pubkey, e.content, "kind:2", "write", e.created_at) +}) + +addHandler(3, e => { + tryJson(() => { + Object.entries(JSON.parse(e.content || "")).forEach(([url, conditions]) => { + const {write, read} = conditions as Record + + if (![false, "!"].includes(write)) { + addRoute(e.pubkey, url, "kind:3", "write", e.created_at) + } + + if (![false, "!"].includes(read)) { + addRoute(e.pubkey, url, "kind:3", "read", e.created_at) + } + }) + }) +}) + +addHandler(10002, e => { + e.tags.forEach(([_, url, mode]) => { + if (mode) { + addRoute(e.pubkey, url, "kind:10002", mode, e.created_at) + } else { + addRoute(e.pubkey, url, "kind:10002", "read", e.created_at) + addRoute(e.pubkey, url, "kind:10002", "write", e.created_at) + } + }) +}) + export default {processEvents} diff --git a/src/agent/table.ts b/src/agent/table.ts index ed586982..55870a91 100644 --- a/src/agent/table.ts +++ b/src/agent/table.ts @@ -6,7 +6,7 @@ import {writable} from 'svelte/store' import {isObject, mapValues, ensurePlural} from 'hurdak/lib/hurdak' import {log} from 'src/util/logger' import {where} from 'src/util/misc' -import {lf} from 'src/agent/database' +import {lf} from 'src/agent/storage' // Local copy of data so we can provide a sync observable interface. The worker // is just for storing data and processing expensive queries