Re-work database class

This commit is contained in:
Jonathan Staab 2023-02-16 14:19:53 -06:00
parent 130773a90c
commit b106e4b52f
3 changed files with 116 additions and 128 deletions

View File

@ -1,5 +1,6 @@
import type {Writable} from 'svelte/store'
import {debounce} from 'throttle-debounce'
import {filter, always, is, prop, find, without, pluck, all, identity} from 'ramda'
import {partition, is, prop, find, without, pluck, all, identity} from 'ramda'
import {writable, derived} from 'svelte/store'
import {switcherFn, createMap, ensurePlural} from 'hurdak/lib/hurdak'
import {log, error} from 'src/util/logger'
@ -12,18 +13,6 @@ type Message = {
payload: object
}
type Table = {
name: string
subscribe: (subscription: (value: any) => void) => (() => void)
put: (data: object) => void
patch: (data: object) => void
bulkPut: (data: object) => void
bulkPatch: (data: object) => void
iter: (spec?: object) => Promise<Array<Record<string, any>>>
all: (spec?: object) => Array<Record<string, any>>
get: (key: string) => Record<string, any>
}
// Plumbing
const worker = new Worker(
@ -80,17 +69,6 @@ const callLocalforage = async (storeName, method, ...args) => {
// Methods that proxy localforage
const getItem = (storeName, ...args) => callLocalforage(storeName, 'getItem', ...args)
const setItem = (storeName, ...args) => callLocalforage(storeName, 'setItem', ...args)
const setItems = (storeName, ...args) => callLocalforage(storeName, 'setItems', ...args)
const removeItem = (storeName, ...args) => callLocalforage(storeName, 'removeItem', ...args)
const removeItems = (storeName, ...args) => callLocalforage(storeName, 'removeItems', ...args)
const length = storeName => callLocalforage(storeName, 'length')
const clear = storeName => callLocalforage(storeName, 'clear')
const keys = storeName => callLocalforage(storeName, 'keys')
const dump = storeName => callLocalforage(storeName, 'dump')
const iterate = (storeName, where = {}) => ({
[Symbol.asyncIterator]() {
let done = false
@ -140,111 +118,149 @@ const iterate = (storeName, where = {}) => ({
// Local copy of data so we can provide a sync observable interface. The worker
// is just for storing data and processing expensive queries
const registry = {}
const registry = {} as Record<string, Table>
type TableOpts = {
isValid?: (x: any) => boolean
resetOnInit?: boolean
initialize?: (table: Table) => Promise<object>
}
const defineTable = (name: string, pk: string, opts: TableOpts = {}): Table => {
const {isValid = always(true), resetOnInit = false} = opts
let p = Promise.resolve()
let listeners = []
let data = {}
class Table {
name: string
pk: string
opts: TableOpts
listeners: Array<(data: Record<string, any>) => void>
data: Record<string, any>
ready: Writable<boolean>
constructor(name, pk, opts: TableOpts = {}) {
this.name = name
this.pk = pk
this.opts = {initialize: t => this.dump(), ...opts}
this.listeners = []
this.data = {}
this.ready = writable(false)
const ready = writable(false)
registry[name] = this
const subscribe = f => {
listeners.push(f)
// Sync from storage initially
;(async () => {
const t = Date.now()
f(data)
this._setAndNotify(await this.opts.initialize(this))
return () => {
listeners = without([f], listeners)
}
const {length: recordsCount} = Object.keys(this.data)
const timeElapsed = Date.now() - t
log(`Table ${name} ready in ${timeElapsed}ms (${recordsCount} records)`)
this.ready.set(true)
})()
}
const setAndNotify = newData => {
_setAndNotify(newData) {
// Update our local copy
data = newData
this.data = newData
// Notify subscribers
for (const f of listeners) {
f(data)
for (const cb of this.listeners) {
cb(this.data)
}
}
subscribe(cb) {
this.listeners.push(cb)
const bulkPut = (newData: Record<string, object>): void => {
cb(this.data)
return () => {
this.listeners = without([cb], this.listeners)
}
}
async bulkPut(newData: Record<string, object>): Promise<void> {
if (is(Array, newData)) {
throw new Error(`Updates must be an object, not an array`)
}
newData = filter(isValid, newData)
setAndNotify({...data, ...newData})
this._setAndNotify({...this.data, ...newData})
// Sync to storage, keeping updates in order
p = p.then(() => setItems(name, newData)) as Promise<void>
callLocalforage(this.name, 'setItems', newData)
}
const bulkPatch = (updates: Record<string, object>): void => {
async bulkPatch(updates: Record<string, object>): Promise<void> {
if (is(Array, updates)) {
throw new Error(`Updates must be an object, not an array`)
}
const newData = {}
for (const [k, v] of Object.entries(updates)) {
newData[k] = {...data[k], ...v}
newData[k] = {...this.data[k], ...v}
}
bulkPut(newData)
this.bulkPut({...this.data, ...newData})
}
const put = item => bulkPut(createMap(pk, [item]))
const patch = item => bulkPatch(createMap(pk, [item]))
const toArray = () => Object.values(data)
const iter = (spec = {}) => asyncIterableToArray(iterate(name, spec), prop('v'))
const all = (spec = {}) => toArray().filter(where(spec))
const one = (spec = {}) => find(where(spec), toArray())
const get = k => data[k]
// Sync from storage initially
;(async () => {
const t = Date.now()
const initialData = filter(isValid, await dump(name))
if (resetOnInit) {
await clear(name)
await bulkPut(initialData)
} else {
setAndNotify(initialData)
}
const recordsCount = Object.keys(initialData).length
log(`Table ${name} ready in ${Date.now() - t}ms (${recordsCount} records)`)
ready.set(true)
})()
registry[name] = {
name, subscribe, bulkPut, bulkPatch, put, patch, toArray, iter, all, one, get,
ready,
async bulkRemove(keys) {
await callLocalforage(this.name, 'removeItems', keys)
}
put(item) {
return this.bulkPut(createMap(this.pk, [item]))
}
patch(item) {
return this.bulkPatch(createMap(this.pk, [item]))
}
remove(k) {
return this.bulkRemove([k])
}
clear() {
return callLocalforage(this.name, 'clear')
}
dump() {
return callLocalforage(this.name, 'dump')
}
toArray() {
return Object.values(this.data)
}
iter(spec = {}) {
return asyncIterableToArray(iterate(name, spec), prop('v'))
}
all(spec = {}) {
return this.toArray().filter(where(spec))
}
one(spec = {}) {
return find(where(spec), this.toArray())
}
get(k) {
return this.data[k]
}
return registry[name]
}
const people = defineTable('people', 'pubkey')
const rooms = defineTable('rooms', 'id')
const messages = defineTable('messages', 'id')
const alerts = defineTable('alerts', 'id')
const relays = defineTable('relays', 'url')
const routes = defineTable('routes', 'id', {
resetOnInit: true,
isValid: route =>
route.last_seen > now() - timedelta(7, 'days'),
const people = new Table('people', 'pubkey')
const rooms = new Table('rooms', 'id')
const messages = new Table('messages', 'id')
const alerts = new Table('alerts', 'id')
const relays = new Table('relays', 'url', {
initialize: async table => {
const data = await table.dump()
const defaults = createMap('url', [
{url: 'wss://brb.io'},
{url: 'wss://nostr.zebedee.cloud'},
{url: 'wss://nostr-pub.wellorder.net'},
{url: 'wss://relay.nostr.band'},
{url: 'nostr.pleb.network'},
{url: 'relay.nostrich.de'},
{url: 'relay.damus.io'},
])
return Object.assign(data, defaults)
},
})
const routes = new Table('routes', 'id', {
initialize: async table => {
const isValid = r => r.last_seen > now() - timedelta(7, 'days')
const [valid, invalid] = partition(isValid, Object.values(await table.dump()))
// Delete stale routes asynchronously
table.bulkRemove(pluck('id', invalid))
return valid
},
})
// Helper to allow us to listen to changes of any given table
@ -300,7 +316,7 @@ const watch = (names, f) => {
const getPersonWithFallback = pubkey => people.get(pubkey) || {pubkey}
const clearAll = () => Promise.all(Object.keys(registry).map(clear))
const clearAll = () => Promise.all(Object.values(registry).map(t => t.clear()))
const ready = derived(pluck('ready', Object.values(registry)), all(identity))
@ -314,7 +330,6 @@ const onReady = cb => {
}
export default {
getItem, setItem, setItems, removeItem, removeItems, length, clear, keys,
dump, iterate, watch, getPersonWithFallback, clearAll, people, rooms, messages,
watch, getPersonWithFallback, clearAll, people, rooms, messages,
alerts, relays, routes, ready, onReady,
}

View File

@ -1,15 +0,0 @@
export default {
petnames: [
["p", "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d", "fiatjaf", "wss://nostr-pub.wellorder.net"],
["p", "32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245", "jb55", "wss://relay.damus.io"],
["p", "97c70a44366a6535c145b333f973ea86dfdc2d7a99da618c40c64705ad98e322", "hodlbod", "wss://nostr-pub.wellorder.net"],
["p", "472f440f29ef996e92a186b8d320ff180c855903882e59d50de1b8bd5669301e", "MartyBent", "wss://relay.damus.io"],
["p", "82341f882b6eabcd2ba7f1ef90aad961cf074af15b9ef44a09f9d2a8fbfbe6a2", "jack", "wss://brb.io"],
["p", "85080d3bad70ccdcd7f74c29a44f55bb85cbcd3dd0cbb957da1d215bdb931204", "preston", "wss://relay.damus.io"],
],
relays: [
{url: 'wss://brb.io'},
{url: 'wss://nostr.zebedee.cloud'},
{url: 'wss://nostr-pub.wellorder.net'},
],
}

View File

@ -1,7 +1,5 @@
<script>
import {pluck, objOf} from 'ramda'
import {noop, createMap} from 'hurdak/lib/hurdak'
import {get} from 'svelte/store'
import {pluck} from 'ramda'
import {fly} from 'svelte/transition'
import {fuzzy} from "src/util/misc"
import Input from "src/partials/Input.svelte"
@ -10,22 +8,12 @@
import RelayCard from "src/partials/RelayCard.svelte"
import {relays} from "src/agent/relays"
import database from 'src/agent/database'
import {modal, settings} from "src/app"
import defaults from "src/agent/defaults"
import {modal} from "src/app"
let q = ""
let search
let knownRelays = database.watch('relays', t => t.all())
fetch(get(settings).dufflepudUrl + '/relay')
.then(async res => {
const json = await res.json()
database.relays.bulkPatch(createMap('url', json.relays.map(objOf('url'))))
}).catch(noop)
database.relays.bulkPatch(createMap('url', defaults.relays))
$: {
const joined = new Set(pluck('url', $relays))