Re-work sync to be a bit more elegant

This commit is contained in:
Jonathan Staab 2023-03-14 12:09:35 -05:00
parent de91a06806
commit bd2bceaecc
4 changed files with 294 additions and 284 deletions

View File

@@ -22,4 +22,4 @@ export const onReady = cb => {
})
}
window.state = {people, contacts, rooms, alerts, relays, routes}
(window as any).t = {people, contacts, rooms, alerts, relays, routes}

View File

@@ -1,289 +1,49 @@
import {uniq, pick, identity, isEmpty} from 'ramda'
import {nip05} from 'nostr-tools'
import {noop, ensurePlural, chunk, switcherFn} from 'hurdak/lib/hurdak'
import {log} from 'src/util/logger'
import {lnurlEncode, tryFunc, lnurlDecode, tryFetch, now, sleep, tryJson, timedelta, shuffle, hash} from 'src/util/misc'
import {Tags, roomAttrs, personKinds, isRelay, isShareableRelay, normalizeRelayUrl} from 'src/util/nostr'
import {getPersonWithFallback, people, relays, rooms, routes} from 'src/agent/state'
import {uniq, pick, identity} from "ramda"
import {nip05} from "nostr-tools"
import {noop, ensurePlural, chunk} from "hurdak/lib/hurdak"
import {
lnurlEncode,
tryFunc,
lnurlDecode,
tryFetch,
now,
sleep,
tryJson,
timedelta,
hash,
} from "src/util/misc"
import {
Tags,
roomAttrs,
isRelay,
isShareableRelay,
normalizeRelayUrl,
} from "src/util/nostr"
import {getPersonWithFallback, people, relays, rooms, routes} from "src/agent/state"
import {uniqByUrl} from "src/agent/relays"
// Registered event handlers, keyed by event kind. Each kind may have
// several handlers; they all run for every matching event.
const handlers = {}

// Register `f` to run for every incoming event of the given kind.
// BUG FIX: the original `(handlers[kind] || []).push(f)` pushed into a
// brand-new array that was never stored back into `handlers`, so the first
// handler registered for any kind was silently dropped.
const addHandler = (kind, f) => (handlers[kind] = handlers[kind] || []).push(f)
const processEvents = async events => {
await Promise.all([
batchProcess(processProfileEvents, events),
batchProcess(processRoomEvents, events),
batchProcess(processRoutes, events),
])
}
const batchProcess = async (processChunk, events) => {
const chunks = chunk(100, ensurePlural(events))
// Don't lock everything up when processing a lot of events
for (let i = 0; i < chunks.length; i++) {
processChunk(chunks[i])
for (const event of chunks[i]) {
for (const handler of handlers[event.kind] || []) {
handler(event)
}
}
// Don't lock up the ui when processing a lot of events
if (i < chunks.length - 1) {
await sleep(30)
}
}
}
// Update cached person records from profile-related events. Each supported
// kind contributes a different slice of the person (kind0 metadata, relay
// selections, petnames, mutes), guarded by per-slice `*_updated_at`
// timestamps so stale events can't clobber newer data.
const processProfileEvents = async events => {
  const profileEvents = events.filter(e => personKinds.includes(e.kind))
  for (const e of profileEvents) {
    const person = getPersonWithFallback(e.pubkey)
    people.put({
      ...person,
      pubkey: e.pubkey,
      // switcherFn dispatches on kind; the handler's return value is spread
      // into the record, so returning undefined leaves the person unchanged
      ...switcherFn(e.kind, {
        // kind 0: profile metadata (name, nip05 identifier, lightning address)
        0: () => tryJson(() => {
          const kind0 = JSON.parse(e.content)
          if (e.created_at >= (person.kind0_updated_at || 0)) {
            // Kick off async verification; results get patched in later
            if (kind0.nip05) {
              verifyNip05(e.pubkey, kind0.nip05)
            }
            const address = kind0.lud16 || kind0.lud06
            if (address) {
              verifyZapper(e.pubkey, address.toLowerCase())
            }
            return {
              kind0: {...person?.kind0, ...kind0},
              kind0_updated_at: e.created_at,
            }
          }
        }),
        // kind 2: recommend a single relay url (content is the url)
        2: () => {
          if (e.created_at > (person.relays_updated_at || 0)) {
            const {relays = []} = getPersonWithFallback(e.pubkey)
            return {
              relays_updated_at: e.created_at,
              relays: relays.concat({url: e.content}),
            }
          }
        },
        // kind 3: contact list; tags are petnames, content may carry relay
        // preferences as {url: {read, write}}
        3: () => {
          const data = {petnames: e.tags}
          if (e.content && e.created_at > (person.relays_updated_at || 0)) {
            tryJson(() => {
              Object.assign(data, {
                relays_updated_at: e.created_at,
                relays: Object.entries(JSON.parse(e.content))
                  .map(([url, conditions]) => {
                    const {write, read} = conditions as Record<string, boolean|string>
                    // false or "!" disables the respective mode
                    return {
                      url,
                      write: [false, '!'].includes(write) ? false : true,
                      read: [false, '!'].includes(read) ? false : true,
                    }
                  })
                  .filter(r => isRelay(r.url)),
              })
            })
          }
          return data
        },
        // kind 10000: mute list
        10000: () => {
          if (e.created_at > (person.mutes_updated_at || 0)) {
            return {
              mutes_updated_at: e.created_at,
              mutes: e.tags,
            }
          }
        },
        // DEPRECATED
        12165: () => {
          if (e.created_at > (person.mutes_updated_at || 0)) {
            return {
              mutes_updated_at: e.created_at,
              mutes: e.tags,
            }
          }
        },
        // DEPRECATED
        10001: () => {
          if (e.created_at > (person.relays_updated_at || 0)) {
            return {
              relays_updated_at: e.created_at,
              // NOTE(review): tag layout here is [url, read, write] — differs
              // from kind 10002's ["r", url, mode]; confirm against producers
              relays: e.tags.map(([url, read, write]) =>
                ({url, read: read !== '!', write: write !== '!'})),
            }
          }
        },
        // kind 10002: NIP-65 relay list; tags are ["r", url, mode?] and a
        // missing mode means both read and write
        10002: () => {
          if (e.created_at > (person.relays_updated_at || 0)) {
            return {
              relays_updated_at: e.created_at,
              relays: e.tags.map(([_, url, mode]) => {
                const read = (mode || 'read') === 'read'
                const write = (mode || 'write') === 'write'
                return {url, read, write}
              }),
            }
          }
        },
        default: () => {
          log(`Received unsupported event type ${e.kind}`)
        },
      }),
      updated_at: now(),
    })
  }
}
// Chat rooms
const processRoomEvents = events => {
const roomEvents = events.filter(e => [40, 41].includes(e.kind))
for (const e of roomEvents) {
const content = tryJson(() => pick(roomAttrs, JSON.parse(e.content)))
const roomId = e.kind === 40 ? e.id : Tags.from(e).type("e").values().first()
// Ignore non-standard rooms that don't have a name
if (!roomId || !content?.name) {
continue
}
const room = rooms.get(roomId)
// Don't let old edits override new ones
if (room?.updated_at >= e.created_at) {
continue
}
rooms.put({
...room,
...content,
id: roomId,
pubkey: e.pubkey,
updated_at: e.created_at,
})
}
}
// Routes
// Confidence weight for each source of relay information; unknown
// types yield undefined.
const getWeight = type => {
  switch (type) {
    case 'nip05':
    case 'kind:10002':
      return 1
    case 'kind:3':
      return 0.8
    case 'kind:2':
      return 0.5
    case 'seen':
      return 0.2
  }
}
// Build an updated route record scoring how likely `pubkey` is to use
// relay `rawUrl` in the given mode. Returns undefined for unshareable
// relays or non-positive scores; does not persist anything itself.
const calculateRoute = (pubkey, rawUrl, type, mode, created_at) => {
  if (!isShareableRelay(rawUrl)) {
    return
  }

  const url = normalizeRelayUrl(rawUrl)

  // Route identity is pubkey + url + mode; type does not factor in
  const id = hash([pubkey, url, mode].join('')).toString()

  // Weight decays linearly to zero over 30 days
  const age = now() - created_at
  const score = getWeight(type) * (1 - age / timedelta(30, 'days'))

  // Negation (rather than score <= 0) also rejects NaN from unknown types
  if (!(score > 0)) {
    return
  }

  const route = routes.get(id) || {id, pubkey, url, mode, score: 0, count: 0, types: []}

  // Fold the new observation into a running average
  return {
    ...route,
    count: route.count + 1,
    score: (route.score * route.count + score) / (route.count + 1),
    types: uniq(route.types.concat(type)),
    last_seen: Math.max(created_at, route.last_seen || 0),
  }
}
// Derive relay routing hints from a sample of events and persist them.
// Routes are scored by source type (see getWeight) via calculateRoute.
const processRoutes = async events => {
  let updates = []
  // Sample events so we're not burning too many resources
  for (const e of shuffle(events).slice(0, 10)) {
    switcherFn(e.kind, {
      0: () => {
        updates.push(
          calculateRoute(e.pubkey, e.content, 'seen', 'write', e.created_at)
        )
      },
      2: () => {
        updates.push(
          calculateRoute(e.pubkey, e.content, 'kind:2', 'read', e.created_at)
        )
        updates.push(
          calculateRoute(e.pubkey, e.content, 'kind:2', 'write', e.created_at)
        )
      },
      3: () => {
        if (e.content) {
          tryJson(() => {
            Object.entries(JSON.parse(e.content))
              .forEach(([url, conditions]) => {
                const {write, read} = conditions as Record<string, boolean|string>
                // false or '!' means the relay is excluded for that mode
                if (![false, '!'].includes(write)) {
                  updates.push(
                    calculateRoute(e.pubkey, url, 'kind:3', 'write', e.created_at)
                  )
                }
                if (![false, '!'].includes(read)) {
                  updates.push(
                    calculateRoute(e.pubkey, url, 'kind:3', 'read', e.created_at)
                  )
                }
              })
          })
        }
      },
      // DEPRECATED
      10001: () => {
        e.tags
          .forEach(([url, read, write]) => {
            // BUG FIX: results were previously computed and discarded; they
            // must be collected into `updates` like the other kinds above
            if (write !== '!') {
              updates.push(
                calculateRoute(e.pubkey, url, 'kind:10002', 'write', e.created_at)
              )
            }
            if (read !== '!') {
              updates.push(
                calculateRoute(e.pubkey, url, 'kind:10002', 'read', e.created_at)
              )
            }
          })
      },
      10002: () => {
        e.tags
          .forEach(([_, url, mode]) => {
            // BUG FIX: as with 10001, routes were computed but never saved
            if (mode) {
              updates.push(
                calculateRoute(e.pubkey, url, 'kind:10002', mode, e.created_at)
              )
            } else {
              // No mode means the relay is used for both reading and writing
              updates.push(
                calculateRoute(e.pubkey, url, 'kind:10002', 'read', e.created_at)
              )
              updates.push(
                calculateRoute(e.pubkey, url, 'kind:10002', 'write', e.created_at)
              )
            }
          })
      },
      default: noop,
    })
  }
  // calculateRoute returns undefined for unshareable relays or zero scores
  updates = updates.filter(identity)
  if (!isEmpty(updates)) {
    await relays.bulkPatch(updates.map(pick(['url'])))
    await routes.bulkPut(updates)
  }
}
// Utils
// People
const verifyNip05 = (pubkey, as) =>
nip05.queryProfile(as).then(result => {
@@ -298,10 +58,12 @@ const verifyNip05 = (pubkey, as) =>
relays.bulkPatch(urls.map(url => ({url: normalizeRelayUrl(url)})))
routes.bulkPut(
urls.flatMap(url => [
calculateRoute(pubkey, url, 'nip05', 'write', now()),
calculateRoute(pubkey, url, 'nip05', 'read', now()),
]).filter(identity)
urls
.flatMap(url => [
addRoute(pubkey, url, "nip05", "write", now()),
addRoute(pubkey, url, "nip05", "read", now()),
])
.filter(identity)
)
}
}
@@ -311,10 +73,10 @@ const verifyZapper = async (pubkey, address) => {
let url
// Try to parse it as a lud06 LNURL or as a lud16 address
if (address.startsWith('lnurl1')) {
if (address.startsWith("lnurl1")) {
url = tryFunc(() => lnurlDecode(address))
} else if (address.includes('@')) {
const [name, domain] = address.split('@')
} else if (address.includes("@")) {
const [name, domain] = address.split("@")
if (domain && name) {
url = `https://${domain}/.well-known/lnurlp/${name}`
@@ -327,11 +89,259 @@ const verifyZapper = async (pubkey, address) => {
const res = await tryFetch(() => fetch(url))
const zapper = await tryJson(() => res?.json())
const lnurl = lnurlEncode('lnurl', url)
const lnurl = lnurlEncode("lnurl", url)
if (zapper?.allowsNostr && zapper?.nostrPubkey) {
people.patch({pubkey, zapper, lnurl})
}
}
// Kind 0: profile metadata. Merge into the cached person and kick off
// async verification of nip05 and zap info.
addHandler(0, e => {
  tryJson(() => {
    const profile = JSON.parse(e.content)
    const prev = people.get(e.pubkey)

    // Ignore anything older than what we already have
    if (e.created_at < prev?.kind0_updated_at) {
      return
    }

    // Fire off async verification of the nip05 identifier
    if (profile.nip05) {
      verifyNip05(e.pubkey, profile.nip05)
    }

    // Same for the lightning address (lud16 takes precedence over lud06)
    const zapAddress = profile.lud16 || profile.lud06
    if (zapAddress) {
      verifyZapper(e.pubkey, zapAddress.toLowerCase())
    }

    people.patch({
      pubkey: e.pubkey,
      updated_at: now(),
      kind0: {...prev?.kind0, ...profile},
      kind0_updated_at: e.created_at,
    })
  })
})
// Kind 2: a single recommended relay url in the event content.
addHandler(2, e => {
  const prev = people.get(e.pubkey)

  // Skip events older than the relay list we already have
  if (e.created_at < prev?.relays_updated_at) {
    return
  }

  const merged = uniqByUrl((prev?.relays || []).concat({url: e.content}))

  people.patch({
    pubkey: e.pubkey,
    updated_at: now(),
    relays_updated_at: e.created_at,
    relays: merged,
  })
})
// Kind 3: contact list. Tags hold petnames; content may hold relay
// preferences as {url: {read, write}}.
addHandler(3, e => {
  const person = people.get(e.pubkey)
  if (e.created_at > (person?.petnames_updated_at || 0)) {
    people.patch({
      pubkey: e.pubkey,
      updated_at: now(),
      petnames_updated_at: e.created_at,
      petnames: e.tags.filter(t => t[0] === "p"),
    })
  }
  // BUG FIX: `person` may be undefined for someone we haven't seen before;
  // the original `person.relays_updated_at` threw a TypeError in that case
  if (e.created_at > (person?.relays_updated_at || 0)) {
    tryJson(() => {
      people.patch({
        pubkey: e.pubkey,
        // Keep updated_at consistent with the other profile handlers
        updated_at: now(),
        relays_updated_at: e.created_at,
        relays: Object.entries(JSON.parse(e.content))
          .map(([url, conditions]) => {
            const {write, read} = conditions as Record<string, boolean | string>
            // false or "!" disables the respective mode
            return {
              url: normalizeRelayUrl(url),
              write: [false, "!"].includes(write) ? false : true,
              read: [false, "!"].includes(read) ? false : true,
            }
          })
          .filter(r => isRelay(r.url)),
      })
    })
  }
})
// Kind 10000: mute list (tags are the muted entries).
addHandler(10000, e => {
  const prev = people.get(e.pubkey)

  // Skip events older than the mute list we already have
  if (e.created_at < prev?.mutes_updated_at) {
    return
  }

  people.patch({
    pubkey: e.pubkey,
    updated_at: now(),
    mutes_updated_at: e.created_at,
    mutes: e.tags,
  })
})
// DEPRECATED — legacy mute-list kind, handled the same way as kind 10000
addHandler(12165, e => {
  const prev = people.get(e.pubkey)

  if (e.created_at < prev?.mutes_updated_at) {
    return
  }

  people.patch({
    pubkey: e.pubkey,
    updated_at: now(),
    mutes_updated_at: e.created_at,
    mutes: e.tags,
  })
})
// Kind 10002: relay selections. Tags are ["r", url, mode?]; a missing
// mode enables both read and write.
addHandler(10002, e => {
  const prev = people.get(e.pubkey)

  if (e.created_at < prev?.relays_updated_at) {
    return
  }

  const relayList = e.tags.map(([_, url, mode]) => ({
    url,
    read: (mode || "read") === "read",
    write: (mode || "write") === "write",
  }))

  people.patch({
    pubkey: e.pubkey,
    updated_at: now(),
    relays_updated_at: e.created_at,
    relays: relayList,
  })
})
// Rooms
addHandler(40, e => {
const room = rooms.get(e.id)
if (e.created_at < room?.updated_at) {
return
}
const content = tryJson(() => pick(roomAttrs, JSON.parse(e.content)))
if (!content?.name) {
return
}
rooms.patch({
id: e.id,
pubkey: e.pubkey,
updated_at: e.created_at,
...content,
})
})
addHandler(41, e => {
const roomId = Tags.from(e).type("e").values().first()
if (!roomId) {
return
}
const room = rooms.get(roomId)
if (e.created_at < room?.updated_at) {
return
}
const content = tryJson(() => pick(roomAttrs, JSON.parse(e.content)))
if (!content?.name) {
return
}
rooms.patch({
id: roomId,
pubkey: e.pubkey,
updated_at: e.created_at,
...content,
})
})
// Routes
// Confidence weight for each source of relay selection info; unknown
// types yield undefined.
const weightByType = new Map([
  ["nip05", 1],
  ["kind:10002", 1],
  ["kind:3", 0.8],
  ["kind:2", 0.5],
])

const getWeight = type => weightByType.get(type)
// Record that `pubkey` appears to use relay `rawUrl` in the given mode,
// based on information source `type` (see getWeight). Maintains a running
// average score per (pubkey, url, mode) and persists to relays/routes.
const addRoute = (pubkey, rawUrl, type, mode, created_at) => {
  // Skip relay urls we wouldn't want to share (local, malformed, etc)
  if (!isShareableRelay(rawUrl)) {
    return
  }
  const url = normalizeRelayUrl(rawUrl)
  // Route identity is pubkey + url + mode; type does not factor in
  const id = hash([pubkey, url, mode].join("")).toString()
  // Weight decays linearly to zero over 30 days
  const score = getWeight(type) * (1 - (now() - created_at) / timedelta(30, "days"))
  const defaults = {id, pubkey, url, mode, score: 0, count: 0, types: []}
  const route = routes.get(id) || defaults
  const newTotalScore = route.score * route.count + score
  const newCount = route.count + 1
  if (score > 0) {
    // Make sure the relay itself exists in the relays table
    relays.patch({
      url: route.url,
    })
    // Fold the new observation into the stored running average
    routes.put({
      ...route,
      count: newCount,
      score: newTotalScore / newCount,
      types: uniq(route.types.concat(type)),
      last_seen: Math.max(created_at, route.last_seen || 0),
    })
  }
}
// Kind 2: a recommended relay counts as both a read and a write route.
addHandler(2, e => {
  for (const mode of ["read", "write"]) {
    addRoute(e.pubkey, e.content, "kind:2", mode, e.created_at)
  }
})
// Kind 3: derive routes from relay preferences in the contact list content.
addHandler(3, e => {
  tryJson(() => {
    const conditionsByUrl = JSON.parse(e.content || "")

    for (const [url, conditions] of Object.entries(conditionsByUrl)) {
      const {write, read} = conditions as Record<string, boolean | string>

      // A value of false or "!" excludes the relay for that mode
      if (write !== false && write !== "!") {
        addRoute(e.pubkey, url, "kind:3", "write", e.created_at)
      }

      if (read !== false && read !== "!") {
        addRoute(e.pubkey, url, "kind:3", "read", e.created_at)
      }
    }
  })
})
// Kind 10002: derive routes from relay-list tags ["r", url, mode?];
// a missing mode means the relay is used for both reading and writing.
addHandler(10002, e => {
  for (const [_, url, mode] of e.tags) {
    const modes = mode ? [mode] : ["read", "write"]

    for (const m of modes) {
      addRoute(e.pubkey, url, "kind:10002", m, e.created_at)
    }
  }
})
// Module entrypoint: callers feed batches of raw events through processEvents
export default {processEvents}

View File

@@ -6,7 +6,7 @@ import {writable} from 'svelte/store'
import {isObject, mapValues, ensurePlural} from 'hurdak/lib/hurdak'
import {log} from 'src/util/logger'
import {where} from 'src/util/misc'
import {lf} from 'src/agent/database'
import {lf} from 'src/agent/storage'
// Local copy of data so we can provide a sync observable interface. The worker
// is just for storing data and processing expensive queries