This commit is contained in:
Martti Malmi 2023-08-29 23:16:08 +03:00
parent 1e3480899f
commit 811e81ea61
3 changed files with 65 additions and 35 deletions

View File

@ -3,7 +3,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest';
import MemoryAdapter from '@/state/MemoryAdapter.ts'; import MemoryAdapter from '@/state/MemoryAdapter.ts';
import { Callback, Unsubscribe } from '@/state/types.ts'; import { Callback, Unsubscribe } from '@/state/types.ts';
import Node from './Node'; import Node, { DIR_VALUE } from './Node';
describe('Node', () => { describe('Node', () => {
let node; let node;
@ -100,10 +100,10 @@ describe('Node', () => {
const node = new Node({ id: 'root', adapters: [new MemoryAdapter()] }); const node = new Node({ id: 'root', adapters: [new MemoryAdapter()] });
const mockCallback: Callback = vi.fn(); const mockCallback: Callback = vi.fn();
const unsubscribe = node.get('chats').map(mockCallback); const unsubscribe = node.get('chats').map(mockCallback);
await node.get('chats').get('someChatId').put({ id: 'someChatId' }); await node.get('chats').get('someChatId').get('latest').put({ id: 'messageId', text: 'hi' });
expect(mockCallback).toHaveBeenCalledWith( expect(mockCallback).toHaveBeenCalledWith(
{ id: 'someChatId' }, DIR_VALUE,
'root/chats/someChatId', 'root/chats/someChatId',
expect.any(Number), expect.any(Number),
expect.any(Function), expect.any(Function),

View File

@ -12,8 +12,13 @@ type NodeProps = {
parent?: Node | null; parent?: Node | null;
}; };
const DIR_VALUE = '__DIR__'; export const DIR_VALUE = '__DIR__';
/**
* Nodes represent queries into the tree rather than the tree itself. The actual tree data is stored by Adapters.
*
* Node can be a branch node or a leaf node. Branch nodes have children, leaf nodes have a value (stored in an adapter).
*/
export default class Node { export default class Node {
id: string; id: string;
parent: Node | null; parent: Node | null;
@ -21,7 +26,7 @@ export default class Node {
on_subscriptions = new Map<number, Callback>(); on_subscriptions = new Map<number, Callback>();
map_subscriptions = new Map<number, Callback>(); map_subscriptions = new Map<number, Callback>();
adapters: Adapter[]; adapters: Adapter[];
counter = 0; private counter = 0;
constructor({ id = '', adapters, parent = null }: NodeProps = {}) { constructor({ id = '', adapters, parent = null }: NodeProps = {}) {
this.id = id; this.id = id;
@ -29,6 +34,10 @@ export default class Node {
this.adapters = adapters ?? parent?.adapters ?? [new MemoryAdapter(), new LocalForageAdapter()]; this.adapters = adapters ?? parent?.adapters ?? [new MemoryAdapter(), new LocalForageAdapter()];
} }
isBranchNode() {
return this.children.size > 0;
}
/** /**
* *
* @param key * @param key
@ -45,8 +54,10 @@ export default class Node {
return new_node; return new_node;
} }
private async putLeaf(value: any, updatedAt: number) { private async putValue(value: any, updatedAt: number) {
this.children = new Map(); if (value !== DIR_VALUE) {
this.children = new Map();
}
const nodeValue: NodeValue = { const nodeValue: NodeValue = {
updatedAt, updatedAt,
value, value,
@ -58,11 +69,12 @@ export default class Node {
await Promise.all(promises); await Promise.all(promises);
} }
private async putBranch(value: Record<string, any>, updatedAt: number) { private async putChildValues(value: Record<string, any>, updatedAt: number) {
const promises = this.adapters.map((adapter) => const promises = this.adapters.map((adapter) =>
adapter.set(this.id, { value: DIR_VALUE, updatedAt }), adapter.set(this.id, { value: DIR_VALUE, updatedAt }),
); );
const children = Object.keys(value); const children = Object.keys(value);
// the following probably causes the same callbacks to be fired too many times
const childPromises = children.map((key) => this.get(key).put(value[key], updatedAt)); const childPromises = children.map((key) => this.get(key).put(value[key], updatedAt));
await Promise.all([...promises, ...childPromises]); await Promise.all([...promises, ...childPromises]);
} }
@ -74,51 +86,68 @@ export default class Node {
*/ */
async put(value: any, updatedAt = Date.now()) { async put(value: any, updatedAt = Date.now()) {
if (typeof value === 'object' && value !== null) { if (typeof value === 'object' && value !== null) {
await this.putBranch(value, updatedAt); await this.putChildValues(value, updatedAt);
} else { } else {
await this.putLeaf(value, updatedAt); await this.putValue(value, updatedAt);
} }
if (this.parent) { if (this.parent) {
this.parent.map_subscriptions.forEach((callback) => { await this.parent.put(DIR_VALUE, updatedAt);
callback(value, this.id, updatedAt, () => {}); const childName = this.id.split('/').pop()!;
}); if (!this.parent.children.has(childName)) {
this.parent.children.set(childName, this);
}
for (const [id, callback] of this.parent.map_subscriptions) {
console.log('calling map callback of ', this.parent.id, ' with ', this.id, value);
callback(value, this.id, updatedAt, () => {
this.parent?.map_subscriptions.delete(id);
});
}
} }
} }
doBranchNodeCallback(callback: Callback) {
const aggregated: Record<string, any> = {};
const keys = Array.from(this.children.keys());
const total = keys.length;
let count = 0;
keys.forEach((key) => {
this.children.get(key)?.once((childValue) => {
aggregated[key] = childValue;
count++;
if (count === total) {
callback(aggregated, this.id, Date.now(), () => {});
}
});
});
}
// note to self: may be problematic that on behaves differently for leaf and branch nodes
/** /**
* Subscribe to a value * Subscribe to a value
* @param callback * @param callback
*/ */
on(callback: Callback): Unsubscribe { on(callback: Callback, returnIfUndefined: boolean = false): Unsubscribe {
let latest: NodeValue | null = null; let latest: NodeValue | null = null;
const cb = (value, path, updatedAt, unsubscribe) => { const cb = (value, path, updatedAt, unsubscribe) => {
if (value !== DIR_VALUE && (latest === null || latest.updatedAt < value.updatedAt)) { if (value !== DIR_VALUE && (latest === null || latest.updatedAt < value.updatedAt)) {
latest = { value, updatedAt }; if (value !== undefined || returnIfUndefined) {
callback(value, path, updatedAt, unsubscribe); latest = { value, updatedAt };
callback(value, path, updatedAt, unsubscribe);
}
// TODO send to other adapters? or PubSub which decides where to send? // TODO send to other adapters? or PubSub which decides where to send?
} }
}; };
const subId = this.counter++; const subId = this.counter++;
this.on_subscriptions.set(subId, cb); this.on_subscriptions.set(subId, cb);
// if it's not a dir, adapters will call the callback directly
const adapterSubs = this.adapters.map((adapter) => adapter.get(this.id, cb)); const adapterSubs = this.adapters.map((adapter) => adapter.get(this.id, cb));
if (this.children.size > 0) { if (this.isBranchNode()) {
const aggregated: Record<string, any> = {}; this.doBranchNodeCallback(callback);
const keys = Array.from(this.children.keys());
const total = keys.length;
let count = 0;
keys.forEach((key) => {
this.children.get(key)?.once((childValue) => {
aggregated[key] = childValue;
count++;
if (count === total) {
callback(aggregated, this.id, Date.now(), () => {});
}
});
});
} }
const unsubscribe = () => { const unsubscribe = () => {
@ -137,7 +166,7 @@ export default class Node {
this.map_subscriptions.set(id, callback); this.map_subscriptions.set(id, callback);
const unsubscribe = () => this.map_subscriptions.delete(id); const unsubscribe = () => this.map_subscriptions.delete(id);
for (const child of this.children.values()) { for (const child of this.children.values()) {
child.once(callback, unsubscribe); child.once(callback, false, unsubscribe);
} }
return unsubscribe; return unsubscribe;
} }
@ -147,7 +176,7 @@ export default class Node {
* @param callback * @param callback
* @param unsubscribe * @param unsubscribe
*/ */
once(callback?: Callback, unsubscribe?: Unsubscribe): Promise<any> { once(callback?: Callback, returnIfUndefined = false, unsubscribe?: Unsubscribe): Promise<any> {
return new Promise((resolve) => { return new Promise((resolve) => {
const cb = (value, updatedAt, path, unsub) => { const cb = (value, updatedAt, path, unsub) => {
if (unsubscribe) { if (unsubscribe) {
@ -157,7 +186,7 @@ export default class Node {
callback?.(value, updatedAt, path, unsub); callback?.(value, updatedAt, path, unsub);
unsub(); unsub();
}; };
this.on(cb); this.on(cb, returnIfUndefined);
}); });
} }
} }

View File

@ -12,4 +12,5 @@ export type Callback = (
export abstract class Adapter { export abstract class Adapter {
abstract get(path: string, callback: Callback): Unsubscribe; abstract get(path: string, callback: Callback): Unsubscribe;
abstract set(path: string, data: NodeValue): Promise<void>; abstract set(path: string, data: NodeValue): Promise<void>;
// abstract list(path: string, callback: Callback): Unsubscribe; ?
} }