system-worker progress
continuous-integration/drone/push Build is failing Details

This commit is contained in:
Kieran 2024-01-09 09:28:48 +00:00
parent 24e145a0a0
commit bbdfb43834
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
10 changed files with 124 additions and 53 deletions

View File

@ -4,7 +4,7 @@ import { FormattedMessage } from "react-intl";
import { injectIntl } from "react-intl";
import messages from "@/Components/messages";
import { ProfileLoader } from "@/system";
import { System } from "@/system";
import { LoginStore } from "@/Utils/Login";
import AccountName from "./AccountName";
@ -285,8 +285,8 @@ class IrisAccount extends Component<Props> {
componentDidMount() {
const session = LoginStore.snapshot();
const myPub = session.publicKey;
ProfileLoader.Cache.hook(() => {
const profile = ProfileLoader.Cache.getFromCache(myPub);
System.ProfileLoader.Cache.hook(() => {
const profile = System.ProfileLoader.Cache.getFromCache(myPub);
const irisToActive = profile && profile.nip05 && profile.nip05.endsWith("@iris.to");
this.setState({ profile, irisToActive });
if (profile && !irisToActive) {

View File

@ -28,7 +28,7 @@ export function useRefreshFeedCache<T>(c: RefreshFeedCache<T>, leaveOpen = false
const q = system.Query(NoopStore, sub);
let t: ReturnType<typeof setTimeout> | undefined;
let tBuf: Array<TaggedNostrEvent> = [];
q.feed.on("event", evs => {
q.on("event", evs => {
if (!t) {
tBuf = [...evs];
t = setTimeout(() => {
@ -41,9 +41,8 @@ export function useRefreshFeedCache<T>(c: RefreshFeedCache<T>, leaveOpen = false
});
q.uncancel();
return () => {
q.feed.off("event");
q.off("event");
q.cancel();
q.sendClose();
};
}
}, [sub]);

View File

@ -34,6 +34,7 @@ export default defineConfig({
assetsInclude: ["**/*.md", "**/*.wasm"],
build: {
outDir: "build",
commonjsOptions: { transformMixedEsModules: true },
},
clearScreen: false,
publicDir: appConfig.get("publicDir"),
@ -51,4 +52,7 @@ export default defineConfig({
globals: true,
environment: "jsdom",
},
worker: {
format: "es",
},
});

View File

@ -14,10 +14,10 @@ const useRequestBuilder = <TStore extends NoteStore, TSnapshot = ReturnType<TSto
const subscribe = (onChanged: () => void) => {
if (rb) {
const q = system.Query<TStore>(type, rb);
q.feed.on("event", onChanged);
q.on("event", onChanged);
q.uncancel();
return () => {
q.feed.off("event", onChanged);
q.off("event", onChanged);
q.cancel();
};
}
@ -28,7 +28,7 @@ const useRequestBuilder = <TStore extends NoteStore, TSnapshot = ReturnType<TSto
const getState = (): StoreSnapshot<TSnapshot> => {
const q = system.GetQuery(rb?.id ?? "");
if (q) {
return unwrap(q).feed?.snapshot as StoreSnapshot<TSnapshot>;
return q.snapshot as StoreSnapshot<TSnapshot>;
}
return EmptySnapshot as StoreSnapshot<TSnapshot>;
};

View File

@ -1,7 +1,6 @@
import { RelaySettings, ConnectionStateSnapshot, OkResponse } from "./connection";
import { RequestBuilder } from "./request-builder";
import { NoteStore, NoteStoreSnapshotData } from "./note-collection";
import { Query } from "./query";
import { NoteStore, NoteStoreSnapshotData, StoreSnapshot } from "./note-collection";
import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr";
import { ProfileLoaderService } from "./profile-cache";
import { RelayCache } from "./outbox-model";
@ -43,7 +42,15 @@ export * from "./cache/user-relays";
export * from "./cache/user-metadata";
export * from "./cache/relay-metric";
export * from "./worker";
export * from "./worker/system-worker";
export interface QueryLike {
on: (event: "event", fn?: (evs: Array<TaggedNostrEvent>) => void) => void;
off: (event: "event", fn?: (evs: Array<TaggedNostrEvent>) => void) => void;
cancel: () => void;
uncancel: () => void;
get snapshot(): StoreSnapshot<NoteStoreSnapshotData>;
}
export interface SystemInterface {
/**
@ -65,14 +72,14 @@ export interface SystemInterface {
* Get an active query by ID
* @param id Query ID
*/
GetQuery(id: string): Query | undefined;
GetQuery(id: string): QueryLike | undefined;
/**
* Open a new query to relays
* @param type Store type
* @param req Request to send to relays
*/
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): Query;
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): QueryLike;
/**
* Fetch data from nostr relays asynchronously

View File

@ -20,6 +20,7 @@ import {
UsersRelays,
SnortSystemDb,
EventExt,
QueryLike,
} from ".";
import { EventsCache } from "./cache/events";
import { RelayCache, RelayMetadataLoader } from "./outbox-model";
@ -224,16 +225,16 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
this.#pool.disconnect(address);
}
GetQuery(id: string): Query | undefined {
return this.#queryManager.get(id);
GetQuery(id: string): QueryLike | undefined {
return this.#queryManager.get(id) as QueryLike;
}
Fetch(req: RequestBuilder, cb?: (evs: ReadonlyArray<TaggedNostrEvent>) => void) {
return this.#queryManager.fetch(req, cb);
}
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): Query {
return this.#queryManager.query(type, req);
Query<T extends NoteStore>(type: { new (): T }, req: RequestBuilder): QueryLike {
return this.#queryManager.query(type, req) as QueryLike;
}
async #sendQuery(q: Query, qSend: BuiltRawReqFilter) {

View File

@ -116,6 +116,7 @@ export interface TraceReport {
interface QueryEvents {
trace: (report: TraceReport) => void;
event: (evs: ReadonlyArray<TaggedNostrEvent>) => void;
}
/**
@ -172,6 +173,8 @@ export class Query extends EventEmitter<QueryEvents> implements QueryBase {
this.#leaveOpen = leaveOpen ?? false;
this.#timeout = timeout ?? 5_000;
this.#checkTraces();
this.feed.on("event", evs => this.emit("event", evs));
}
isOpen() {
@ -193,6 +196,10 @@ export class Query extends EventEmitter<QueryEvents> implements QueryBase {
return this.#feed;
}
get snapshot() {
return this.#feed.snapshot;
}
handleEvent(sub: string, e: TaggedNostrEvent) {
for (const t of this.#tracing) {
if (t.id === sub || sub === "*") {

View File

@ -4,6 +4,8 @@ export const enum WorkerCommand {
Init,
ConnectRelay,
DisconnectRelay,
Query,
QueryResult,
}
export interface WorkerMessage<T> {

View File

@ -1,9 +1,11 @@
/// <reference lib="webworker" />
import { NostrSystem, NostrsystemProps } from "../nostr-system";
import { NostrSystem } from "../nostr-system";
import { WorkerMessage, WorkerCommand } from ".";
let system: NostrSystem | undefined;
const system = new NostrSystem({
checkSigs: true,
});
function reply<T>(id: string, type: WorkerCommand, data: T) {
globalThis.postMessage({
@ -18,31 +20,19 @@ function okReply(id: string, message?: string) {
function errorReply(id: string, message: string) {
reply<string>(id, WorkerCommand.ErrorResponse, message);
}
function checkInitialized() {
if (system === undefined) {
throw new Error("System not initialized");
}
}
globalThis.onmessage = async ev => {
console.debug(ev);
const data = ev.data as { id: string; type: WorkerCommand };
try {
switch (data.type) {
case WorkerCommand.Init: {
const cmd = ev.data as WorkerMessage<NostrsystemProps>;
if (system === undefined) {
system = new NostrSystem(cmd.data);
await system.Init();
okReply(data.id);
} else {
errorReply(data.id, "System is already initialized");
}
await system.Init();
okReply(data.id);
break;
}
case WorkerCommand.ConnectRelay: {
checkInitialized();
const cmd = ev.data as WorkerMessage<[string, { read: boolean; write: boolean }]>;
await system?.ConnectToRelay(cmd.data[0], cmd.data[1]);
await system.ConnectToRelay(cmd.data[0], cmd.data[1]);
okReply(data.id, "Connected");
break;
}

View File

@ -12,51 +12,110 @@ import {
RequestBuilder,
SystemInterface,
TaggedNostrEvent,
CachedMetadata,
DefaultOptimizer,
RelayMetadataLoader,
RelayMetricCache,
RelayMetrics,
UserProfileCache,
UserRelaysCache,
UsersRelays,
QueryLike,
} from "..";
import { NostrSystemEvents, NostrsystemProps } from "../nostr-system";
import { Query } from "../query";
import { WorkerCommand, WorkerMessage } from ".";
import { FeedCache } from "@snort/shared";
import { EventsCache } from "../cache/events";
import { RelayMetricHandler } from "../relay-metric-handler";
import debug from "debug";
export class SystemWorker extends EventEmitter<NostrSystemEvents> implements SystemInterface {
#log = debug("SystemWorker");
#worker: Worker;
#commandQueue: Map<string, (v: unknown) => void> = new Map();
checkSigs: boolean;
#relayCache: FeedCache<UsersRelays>;
#profileCache: FeedCache<CachedMetadata>;
#relayMetricsCache: FeedCache<RelayMetrics>;
#profileLoader: ProfileLoaderService;
#relayMetrics: RelayMetricHandler;
#eventsCache: FeedCache<NostrEvent>;
#relayLoader: RelayMetadataLoader;
get checkSigs() {
return true;
}
set checkSigs(v: boolean) {
// not used
}
constructor(scriptPath: string, props: NostrsystemProps) {
super();
this.checkSigs = props.checkSigs ?? false;
this.#relayCache = props.relayCache ?? new UserRelaysCache(props.db?.userRelays);
this.#profileCache = props.profileCache ?? new UserProfileCache(props.db?.users);
this.#relayMetricsCache = props.relayMetrics ?? new RelayMetricCache(props.db?.relayMetrics);
this.#eventsCache = props.eventsCache ?? new EventsCache(props.db?.events);
this.#profileLoader = new ProfileLoaderService(this, this.#profileCache);
this.#relayMetrics = new RelayMetricHandler(this.#relayMetricsCache);
this.#relayLoader = new RelayMetadataLoader(this, this.#relayCache);
this.#worker = new Worker(scriptPath, {
name: "SystemWorker",
type: "module",
});
this.#worker.onmessage = async e => {
const cmd = e.data as { id: string; type: WorkerCommand; data?: unknown };
if (cmd.type === WorkerCommand.OkResponse) {
const q = this.#commandQueue.get(cmd.id);
q?.(cmd.data);
this.#commandQueue.delete(cmd.id);
}
};
}
get Sockets(): ConnectionStateSnapshot[] {
throw new Error("Method not implemented.");
return [];
}
async Init() {
await this.#workerRpc<void, string>(WorkerCommand.Init, undefined);
await this.#workerRpc(WorkerCommand.Init);
}
GetQuery(id: string): Query | undefined {
GetQuery(id: string): QueryLike | undefined {
return undefined;
}
Query<T extends NoteStore>(type: new () => T, req: RequestBuilder): Query {
throw new Error("Method not implemented.");
Query<T extends NoteStore>(type: new () => T, req: RequestBuilder): QueryLike {
const chan = this.#workerRpc<[RequestBuilder], { id: string; port: MessagePort }>(WorkerCommand.Query, [req]);
return {
on: (_: "event", cb) => {
chan.then(c => {
c.port.onmessage = e => {
cb?.(e.data as Array<TaggedNostrEvent>);
};
});
},
off: (_: "event", cb) => {
chan.then(c => {
c.port.close();
});
},
cancel: () => {},
uncancel: () => {},
} as QueryLike;
}
Fetch(req: RequestBuilder, cb?: ((evs: TaggedNostrEvent[]) => void) | undefined): Promise<TaggedNostrEvent[]> {
throw new Error("Method not implemented.");
}
ConnectToRelay(address: string, options: RelaySettings): Promise<void> {
throw new Error("Method not implemented.");
async ConnectToRelay(address: string, options: RelaySettings) {
await this.#workerRpc(WorkerCommand.ConnectRelay, [address, options, false]);
}
DisconnectRelay(address: string): void {
throw new Error("Method not implemented.");
this.#workerRpc(WorkerCommand.DisconnectRelay, address);
}
HandleEvent(ev: TaggedNostrEvent): void {
@ -72,24 +131,26 @@ export class SystemWorker extends EventEmitter<NostrSystemEvents> implements Sys
}
get ProfileLoader(): ProfileLoaderService {
throw new Error("Method not implemented.");
return this.#profileLoader;
}
get RelayCache(): RelayCache {
throw new Error("Method not implemented.");
return this.#relayCache;
}
get Optimizer(): Optimizer {
throw new Error("Method not implemented.");
return DefaultOptimizer;
}
#workerRpc<T, R>(type: WorkerCommand, data: T, timeout = 5_000) {
#workerRpc<T, R>(type: WorkerCommand, data?: T, timeout = 5_000) {
const id = uuid();
this.#worker.postMessage({
const msg = {
id,
type,
data,
} as WorkerMessage<T>);
} as WorkerMessage<T>;
this.#log(msg);
this.#worker.postMessage(msg);
return new Promise<R>((resolve, reject) => {
let t: ReturnType<typeof setTimeout>;
this.#commandQueue.set(id, v => {