Make query optimizer pluggable
This commit is contained in:
parent
a4c1ba8450
commit
e2e1bb90ca
@ -2,14 +2,14 @@ import "./index.css";
|
||||
import "@szhsin/react-menu/dist/index.css";
|
||||
import "./fonts/inter.css";
|
||||
|
||||
import {default as wasmInit} from "@snort/system-query";
|
||||
import { compress, expand_filter, flat_merge, get_diff, default as wasmInit } from "@snort/system-query";
|
||||
import WasmPath from "@snort/system-query/pkg/system_query_bg.wasm";
|
||||
|
||||
import { StrictMode } from "react";
|
||||
import * as ReactDOM from "react-dom/client";
|
||||
import { Provider } from "react-redux";
|
||||
import { createBrowserRouter, RouterProvider } from "react-router-dom";
|
||||
import { EventPublisher, NostrSystem, ProfileLoaderService, Nip7Signer, PowWorker } from "@snort/system";
|
||||
import { EventPublisher, NostrSystem, ProfileLoaderService, Nip7Signer, PowWorker, QueryOptimizer, FlatReqFilter, ReqFilter } from "@snort/system";
|
||||
import { SnortContext } from "@snort/system-react";
|
||||
|
||||
import * as serviceWorkerRegistration from "serviceWorkerRegistration";
|
||||
@ -39,6 +39,21 @@ import { db } from "Db";
|
||||
import { preload, RelayMetrics, UserCache, UserRelays } from "Cache";
|
||||
import { LoginStore } from "Login";
|
||||
|
||||
const WasmQueryOptimizer = {
|
||||
expandFilter: (f: ReqFilter) => {
|
||||
return expand_filter(f) as Array<FlatReqFilter>;
|
||||
},
|
||||
getDiff: (prev: Array<ReqFilter>, next: Array<ReqFilter>) => {
|
||||
return get_diff(prev, next) as Array<FlatReqFilter>;
|
||||
},
|
||||
flatMerge: (all: Array<FlatReqFilter>) => {
|
||||
return flat_merge(all) as Array<ReqFilter>;
|
||||
},
|
||||
compress: (all: Array<ReqFilter>) => {
|
||||
return compress(all) as Array<ReqFilter>;
|
||||
}
|
||||
} as QueryOptimizer;
|
||||
|
||||
/**
|
||||
* Singleton nostr system
|
||||
*/
|
||||
@ -46,6 +61,7 @@ export const System = new NostrSystem({
|
||||
relayCache: UserRelays,
|
||||
profileCache: UserCache,
|
||||
relayMetrics: RelayMetrics,
|
||||
queryOptimizer: WasmQueryOptimizer,
|
||||
authHandler: async (c, r) => {
|
||||
const { publicKey, privateKey } = LoginStore.snapshot();
|
||||
if (privateKey) {
|
||||
|
6
packages/system-query/pkg/system_query.d.ts
vendored
6
packages/system-query/pkg/system_query.d.ts
vendored
@ -22,6 +22,11 @@ export function get_diff(prev: any, next: any): any;
|
||||
* @returns {any}
|
||||
*/
|
||||
export function flat_merge(val: any): any;
|
||||
/**
|
||||
* @param {any} val
|
||||
* @returns {any}
|
||||
*/
|
||||
export function compress(val: any): any;
|
||||
|
||||
export type InitInput = RequestInfo | URL | Response | BufferSource | WebAssembly.Module;
|
||||
|
||||
@ -31,6 +36,7 @@ export interface InitOutput {
|
||||
readonly expand_filter: (a: number, b: number) => void;
|
||||
readonly get_diff: (a: number, b: number, c: number) => void;
|
||||
readonly flat_merge: (a: number, b: number) => void;
|
||||
readonly compress: (a: number, b: number) => void;
|
||||
readonly __wbindgen_malloc: (a: number, b: number) => number;
|
||||
readonly __wbindgen_realloc: (a: number, b: number, c: number, d: number) => number;
|
||||
readonly __wbindgen_add_to_stack_pointer: (a: number) => number;
|
||||
|
@ -270,6 +270,26 @@ export function flat_merge(val) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {any} val
|
||||
* @returns {any}
|
||||
*/
|
||||
export function compress(val) {
|
||||
try {
|
||||
const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
|
||||
wasm.compress(retptr, addHeapObject(val));
|
||||
var r0 = getInt32Memory0()[retptr / 4 + 0];
|
||||
var r1 = getInt32Memory0()[retptr / 4 + 1];
|
||||
var r2 = getInt32Memory0()[retptr / 4 + 2];
|
||||
if (r2) {
|
||||
throw takeObject(r1);
|
||||
}
|
||||
return takeObject(r0);
|
||||
} finally {
|
||||
wasm.__wbindgen_add_to_stack_pointer(16);
|
||||
}
|
||||
}
|
||||
|
||||
function handleError(f, args) {
|
||||
try {
|
||||
return f.apply(this, args);
|
||||
|
Binary file not shown.
@ -5,6 +5,7 @@ export function diff_filters(a: number, b: number, c: number): void;
|
||||
export function expand_filter(a: number, b: number): void;
|
||||
export function get_diff(a: number, b: number, c: number): void;
|
||||
export function flat_merge(a: number, b: number): void;
|
||||
export function compress(a: number, b: number): void;
|
||||
export function __wbindgen_malloc(a: number, b: number): number;
|
||||
export function __wbindgen_realloc(a: number, b: number, c: number, d: number): number;
|
||||
export function __wbindgen_add_to_stack_pointer(a: number): number;
|
||||
|
@ -44,6 +44,13 @@ pub fn flat_merge(val: JsValue) -> Result<JsValue, JsValue> {
|
||||
Ok(serde_wasm_bindgen::to_value(&result)?)
|
||||
}
|
||||
|
||||
#[wasm_bindgen]
|
||||
pub fn compress(val: JsValue) -> Result<JsValue, JsValue> {
|
||||
let val_parsed: Vec<ReqFilter> = serde_wasm_bindgen::from_value(val)?;
|
||||
let result = merge::merge::<ReqFilter, ReqFilter>(val_parsed.iter().collect());
|
||||
Ok(serde_wasm_bindgen::to_value(&result)?)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -1,7 +1,7 @@
|
||||
import { ReqFilter, UsersRelays } from ".";
|
||||
import { dedupe, unwrap } from "@snort/shared";
|
||||
import debug from "debug";
|
||||
import { FlatReqFilter } from "request-expander";
|
||||
import { FlatReqFilter } from "./query-optimizer";
|
||||
|
||||
const PickNRelays = 2;
|
||||
|
||||
|
@ -1,9 +1,11 @@
|
||||
import { AuthHandler, RelaySettings, ConnectionStateSnapshot } from "./connection";
|
||||
import { RequestBuilder } from "./request-builder";
|
||||
import { NoteStore, NoteStoreHook, NoteStoreSnapshotData } from "./note-collection";
|
||||
import { NoteStore, NoteStoreSnapshotData } from "./note-collection";
|
||||
import { Query } from "./query";
|
||||
import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr";
|
||||
import { ProfileLoaderService } from "./profile-cache";
|
||||
import { RelayCache } from "./gossip-model";
|
||||
import { QueryOptimizer } from "./query-optimizer";
|
||||
|
||||
export * from "./nostr-system";
|
||||
export { default as EventKind } from "./event-kind";
|
||||
@ -24,6 +26,7 @@ export * from "./signer";
|
||||
export * from "./text";
|
||||
export * from "./pow";
|
||||
export * from "./pow-util";
|
||||
export * from "./query-optimizer";
|
||||
|
||||
export * from "./impl/nip4";
|
||||
export * from "./impl/nip44";
|
||||
@ -96,6 +99,16 @@ export interface SystemInterface {
|
||||
* Profile cache/loader
|
||||
*/
|
||||
get ProfileLoader(): ProfileLoaderService;
|
||||
|
||||
/**
|
||||
* Relay cache for "Gossip" model
|
||||
*/
|
||||
get RelayCache(): RelayCache;
|
||||
|
||||
/**
|
||||
* Query optimizer
|
||||
*/
|
||||
get QueryOptimizer(): QueryOptimizer;
|
||||
}
|
||||
|
||||
export interface SystemSnapshot {
|
||||
@ -121,4 +134,4 @@ export interface MessageEncryptor {
|
||||
getSharedSecret(privateKey: string, publicKey: string): Promise<Uint8Array> | Uint8Array;
|
||||
encryptData(plaintext: string, sharedSecet: Uint8Array): Promise<MessageEncryptorPayload> | MessageEncryptorPayload;
|
||||
decryptData(payload: MessageEncryptorPayload, sharedSecet: Uint8Array): Promise<string> | string;
|
||||
}
|
||||
}
|
@ -20,6 +20,8 @@ import {
|
||||
UsersRelays,
|
||||
} from ".";
|
||||
import { EventsCache } from "./cache/events";
|
||||
import { RelayCache } from "./gossip-model";
|
||||
import { QueryOptimizer, DefaultQueryOptimizer } from "./query-optimizer";
|
||||
|
||||
/**
|
||||
* Manages nostr content retrieval system
|
||||
@ -72,12 +74,18 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
*/
|
||||
#eventsCache: FeedCache<NostrEvent>;
|
||||
|
||||
/**
|
||||
* Query optimizer instance
|
||||
*/
|
||||
#queryOptimizer: QueryOptimizer;
|
||||
|
||||
constructor(props: {
|
||||
authHandler?: AuthHandler;
|
||||
relayCache?: FeedCache<UsersRelays>;
|
||||
profileCache?: FeedCache<MetadataCache>;
|
||||
relayMetrics?: FeedCache<RelayMetrics>;
|
||||
eventsCache?: FeedCache<NostrEvent>;
|
||||
queryOptimizer?: QueryOptimizer;
|
||||
}) {
|
||||
super();
|
||||
this.#handleAuth = props.authHandler;
|
||||
@ -85,6 +93,7 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
this.#profileCache = props.profileCache ?? new UserProfileCache();
|
||||
this.#relayMetricsCache = props.relayMetrics ?? new RelayMetricCache();
|
||||
this.#eventsCache = props.eventsCache ?? new EventsCache();
|
||||
this.#queryOptimizer = props.queryOptimizer ?? DefaultQueryOptimizer;
|
||||
|
||||
this.#profileLoader = new ProfileLoaderService(this, this.#profileCache);
|
||||
this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache);
|
||||
@ -92,9 +101,6 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
}
|
||||
HandleAuth?: AuthHandler | undefined;
|
||||
|
||||
/**
|
||||
* Profile loader service allows you to request profiles
|
||||
*/
|
||||
get ProfileLoader() {
|
||||
return this.#profileLoader;
|
||||
}
|
||||
@ -103,6 +109,14 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
return [...this.#sockets.values()].map(a => a.snapshot());
|
||||
}
|
||||
|
||||
get RelayCache(): RelayCache {
|
||||
return this.#relayCache;
|
||||
}
|
||||
|
||||
get QueryOptimizer(): QueryOptimizer {
|
||||
return this.#queryOptimizer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup caches
|
||||
*/
|
||||
@ -241,8 +255,8 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
return existing;
|
||||
}
|
||||
const filters = !req.options?.skipDiff
|
||||
? req.buildDiff(this.#relayCache, existing.filters)
|
||||
: req.build(this.#relayCache);
|
||||
? req.buildDiff(this, existing.filters)
|
||||
: req.build(this);
|
||||
if (filters.length === 0 && !!req.options?.skipDiff) {
|
||||
return existing;
|
||||
} else {
|
||||
@ -255,7 +269,7 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
} else {
|
||||
const store = new type();
|
||||
|
||||
const filters = req.build(this.#relayCache);
|
||||
const filters = req.build(this);
|
||||
const q = new Query(req.id, req.instance, store, req.options?.leaveOpen);
|
||||
if (filters.some(a => a.filters.some(b => b.ids))) {
|
||||
const expectIds = new Set(filters.flatMap(a => a.filters).flatMap(a => a.ids ?? []));
|
||||
@ -397,4 +411,4 @@ export class NostrSystem extends ExternalStore<SystemSnapshot> implements System
|
||||
}
|
||||
setTimeout(() => this.#cleanup(), 1_000);
|
||||
}
|
||||
}
|
||||
}
|
43
packages/system/src/query-optimizer/index.ts
Normal file
43
packages/system/src/query-optimizer/index.ts
Normal file
@ -0,0 +1,43 @@
|
||||
import { ReqFilter } from "../nostr"
|
||||
import { expandFilter } from "./request-expander"
|
||||
import { flatMerge, mergeSimilar } from "./request-merger"
|
||||
import { diffFilters } from "./request-splitter"
|
||||
|
||||
export interface FlatReqFilter {
|
||||
keys: number;
|
||||
ids?: string;
|
||||
authors?: string;
|
||||
kinds?: number;
|
||||
"#e"?: string;
|
||||
"#p"?: string;
|
||||
"#t"?: string;
|
||||
"#d"?: string;
|
||||
"#r"?: string;
|
||||
search?: string;
|
||||
since?: number;
|
||||
until?: number;
|
||||
limit?: number;
|
||||
}
|
||||
|
||||
export interface QueryOptimizer {
|
||||
expandFilter(f: ReqFilter): Array<FlatReqFilter>
|
||||
getDiff(prev: Array<ReqFilter>, next: Array<ReqFilter>): Array<FlatReqFilter>
|
||||
flatMerge(all: Array<FlatReqFilter>): Array<ReqFilter>
|
||||
compress(all: Array<ReqFilter>): Array<ReqFilter>
|
||||
}
|
||||
|
||||
export const DefaultQueryOptimizer = {
|
||||
expandFilter: (f: ReqFilter) => {
|
||||
return expandFilter(f);
|
||||
},
|
||||
getDiff: (prev: Array<ReqFilter>, next: Array<ReqFilter>) => {
|
||||
const diff = diffFilters(prev.flatMap(a => expandFilter(a)), next.flatMap(a => expandFilter(a)));
|
||||
return diff.added;
|
||||
},
|
||||
flatMerge: (all: Array<FlatReqFilter>) => {
|
||||
return flatMerge(all);
|
||||
},
|
||||
compress: (all: Array<ReqFilter>) => {
|
||||
return mergeSimilar(all);
|
||||
}
|
||||
} as QueryOptimizer;
|
@ -1,27 +1,11 @@
|
||||
import { ReqFilter } from "./nostr";
|
||||
import {expand_filter} from "@snort/system-query";
|
||||
|
||||
export interface FlatReqFilter {
|
||||
keys: number;
|
||||
ids?: string;
|
||||
authors?: string;
|
||||
kinds?: number;
|
||||
"#e"?: string;
|
||||
"#p"?: string;
|
||||
"#t"?: string;
|
||||
"#d"?: string;
|
||||
"#r"?: string;
|
||||
search?: string;
|
||||
since?: number;
|
||||
until?: number;
|
||||
limit?: number;
|
||||
}
|
||||
import { FlatReqFilter } from ".";
|
||||
import { ReqFilter } from "../nostr";
|
||||
|
||||
/**
|
||||
* Expand a filter into its most fine grained form
|
||||
*/
|
||||
export function expandFilter(f: ReqFilter): Array<FlatReqFilter> {
|
||||
/*const ret: Array<FlatReqFilter> = [];
|
||||
const ret: Array<FlatReqFilter> = [];
|
||||
const src = Object.entries(f);
|
||||
const keys = src.filter(([, v]) => Array.isArray(v)).map(a => a[0]);
|
||||
const props = src.filter(([, v]) => !Array.isArray(v));
|
||||
@ -47,8 +31,5 @@ export function expandFilter(f: ReqFilter): Array<FlatReqFilter> {
|
||||
...Object.fromEntries(props),
|
||||
});
|
||||
|
||||
return ret;*/
|
||||
|
||||
const ret = expand_filter(f);
|
||||
return ret as Array<FlatReqFilter>;
|
||||
return ret;
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
import { distance } from "@snort/shared";
|
||||
import { ReqFilter } from ".";
|
||||
import { FlatReqFilter } from "./request-expander";
|
||||
import { ReqFilter } from "..";
|
||||
import { FlatReqFilter } from ".";
|
||||
|
||||
/**
|
||||
* Keys which can change the entire meaning of the filter outside the array types
|
@ -1,9 +1,8 @@
|
||||
import { flatFilterEq } from "./utils";
|
||||
import { FlatReqFilter } from "./request-expander";
|
||||
import { diff_filters } from "@snort/system-query";
|
||||
import { flatFilterEq } from "../utils";
|
||||
import { FlatReqFilter } from ".";
|
||||
|
||||
export function diffFilters(prev: Array<FlatReqFilter>, next: Array<FlatReqFilter>, calcRemoved?: boolean) {
|
||||
/*const added = [];
|
||||
const added = [];
|
||||
const removed = [];
|
||||
|
||||
for (const n of next) {
|
||||
@ -29,12 +28,5 @@ export function diffFilters(prev: Array<FlatReqFilter>, next: Array<FlatReqFilte
|
||||
added: changed ? added : [],
|
||||
removed: changed ? removed : [],
|
||||
changed,
|
||||
};*/
|
||||
|
||||
const added = diff_filters(prev, next);
|
||||
return {
|
||||
changed: added.length > 0,
|
||||
added: (added as Array<FlatReqFilter>),
|
||||
removed: []
|
||||
}
|
||||
};
|
||||
}
|
@ -1,12 +1,11 @@
|
||||
import debug from "debug";
|
||||
import { v4 as uuid } from "uuid";
|
||||
import { appendDedupe, sanitizeRelayUrl, unixNowMs } from "@snort/shared";
|
||||
import { flat_merge, get_diff }from "@snort/system-query";
|
||||
|
||||
import { ReqFilter, u256, HexKey, EventKind } from ".";
|
||||
import EventKind from "./event-kind";
|
||||
import { SystemInterface } from "index";
|
||||
import { ReqFilter, u256, HexKey } from "./nostr";
|
||||
import { RelayCache, splitByWriteRelays, splitFlatByWriteRelays } from "./gossip-model";
|
||||
import { flatMerge, mergeSimilar } from "./request-merger";
|
||||
import { FlatReqFilter } from "./request-expander";
|
||||
|
||||
/**
|
||||
* Which strategy is used when building REQ filters
|
||||
@ -95,27 +94,25 @@ export class RequestBuilder {
|
||||
return this.#builders.map(f => f.filter);
|
||||
}
|
||||
|
||||
build(relays: RelayCache): Array<BuiltRawReqFilter> {
|
||||
const expanded = this.#builders.flatMap(a => a.build(relays, this.id));
|
||||
return this.#groupByRelay(expanded);
|
||||
build(system: SystemInterface): Array<BuiltRawReqFilter> {
|
||||
const expanded = this.#builders.flatMap(a => a.build(system.RelayCache, this.id));
|
||||
return this.#groupByRelay(system, expanded);
|
||||
}
|
||||
|
||||
/**
|
||||
* Detects a change in request from a previous set of filters
|
||||
*/
|
||||
buildDiff(relays: RelayCache, prev: Array<ReqFilter>): Array<BuiltRawReqFilter> {
|
||||
buildDiff(system: SystemInterface, prev: Array<ReqFilter>): Array<BuiltRawReqFilter> {
|
||||
const start = unixNowMs();
|
||||
|
||||
//const next = this.#builders.flatMap(f => expandFilter(f.filter));
|
||||
//const diff = diffFilters(prev, next);
|
||||
const diff = get_diff(prev, this.buildRaw()) as Array<FlatReqFilter>;
|
||||
const diff = system.QueryOptimizer.getDiff(prev, this.buildRaw());
|
||||
const ts = unixNowMs() - start;
|
||||
this.#log("buildDiff %s %d ms", this.id, ts);
|
||||
if (diff.length > 0) {
|
||||
return splitFlatByWriteRelays(relays, diff).map(a => {
|
||||
return splitFlatByWriteRelays(system.RelayCache, diff).map(a => {
|
||||
return {
|
||||
strategy: RequestStrategy.AuthorsRelays,
|
||||
filters: flat_merge(a.filters) as Array<ReqFilter>,
|
||||
filters: system.QueryOptimizer.flatMerge(a.filters),
|
||||
relay: a.relay,
|
||||
};
|
||||
});
|
||||
@ -130,7 +127,7 @@ export class RequestBuilder {
|
||||
* @param expanded
|
||||
* @returns
|
||||
*/
|
||||
#groupByRelay(expanded: Array<BuiltRawReqFilter>) {
|
||||
#groupByRelay(system: SystemInterface, expanded: Array<BuiltRawReqFilter>) {
|
||||
const relayMerged = expanded.reduce((acc, v) => {
|
||||
const existing = acc.get(v.relay);
|
||||
if (existing) {
|
||||
@ -143,7 +140,7 @@ export class RequestBuilder {
|
||||
|
||||
const filtersSquashed = [...relayMerged.values()].map(a => {
|
||||
return {
|
||||
filters: mergeSimilar(a.flatMap(b => b.filters)),
|
||||
filters: system.QueryOptimizer.compress(a.flatMap(b => b.filters)),
|
||||
relay: a[0].relay,
|
||||
strategy: a[0].strategy,
|
||||
} as BuiltRawReqFilter;
|
||||
|
@ -6,6 +6,8 @@ import { NostrEvent, TaggedNostrEvent } from "./nostr";
|
||||
import { NoteStore, NoteStoreSnapshotData } from "./note-collection";
|
||||
import { Query } from "./query";
|
||||
import { RequestBuilder } from "./request-builder";
|
||||
import { RelayCache } from "./gossip-model";
|
||||
import { QueryOptimizer } from "./query-optimizer";
|
||||
|
||||
export class SystemWorker extends ExternalStore<SystemSnapshot> implements SystemInterface {
|
||||
#port: MessagePort;
|
||||
@ -29,6 +31,13 @@ export class SystemWorker extends ExternalStore<SystemSnapshot> implements Syste
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
get RelayCache(): RelayCache {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
get QueryOptimizer(): QueryOptimizer {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
HandleAuth?: AuthHandler;
|
||||
|
||||
get Sockets(): ConnectionStateSnapshot[] {
|
||||
|
@ -1,5 +1,5 @@
|
||||
import { equalProp } from "@snort/shared";
|
||||
import { FlatReqFilter } from "./request-expander";
|
||||
import { FlatReqFilter } from "./query-optimizer";
|
||||
import { NostrEvent, ReqFilter } from "./nostr";
|
||||
|
||||
export function findTag(e: NostrEvent, tag: string) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user