1
0
forked from Kieran/snort

Compare commits

...

9 Commits

11 changed files with 258 additions and 60 deletions

View File

@ -47,5 +47,6 @@
"alby": {
"clientId": "pohiJjPhQR",
"clientSecret": "GAl1YKLA3FveK1gLBYok"
}
},
"useIndexedDBWorker": false
}

View File

@ -45,5 +45,6 @@
"wss://eden.nostr.land/": { "read": true, "write": false },
"wss://relay.nostr.band/": { "read": true, "write": true },
"wss://relay.damus.io/": { "read": true, "write": true }
}
},
"useIndexedDBEvents": true
}

View File

@ -96,6 +96,8 @@ declare const CONFIG: {
clientId: string;
clientSecret: string;
};
useIndexedDBEvents: boolean;
};
/**

View File

@ -27,6 +27,17 @@ export function rootTabItems(base: string, pubKey: string | undefined, tags: New
</>
),
},
{
tab: "for-you",
path: `${base}/for-you`,
show: Boolean(pubKey) && CONFIG.useIndexedDBEvents,
element: (
<>
<Icon name="user-v2" />
<FormattedMessage defaultMessage="For you" id="xEjBS7" />
</>
),
},
{
tab: "trending-notes",
path: `${base}/trending/notes`,

View File

@ -3,20 +3,23 @@ import { ReqFilter as Filter, TaggedNostrEvent } from "@snort/system";
import * as Comlink from "comlink";
import Dexie, { Table } from "dexie";
type Tag = {
type PackedNostrEvent = {
id: string;
eventId: string;
type: string;
value: string;
pubkey: string;
kind: number;
tags: Array<Array<string>>;
flatTags: string[];
sig: string;
created_at: number;
content: string;
relays: string[];
};
type SaveQueueEntry = { event: TaggedNostrEvent; tags: Tag[] };
type Task = () => Promise<void>;
class IndexedDB extends Dexie {
events!: Table<TaggedNostrEvent>;
tags!: Table<Tag>;
private saveQueue: SaveQueueEntry[] = [];
events!: Table<PackedNostrEvent>;
private saveQueue: PackedNostrEvent[] = [];
private subscribedEventIds = new Set<string>();
private subscribedAuthors = new Set<string>();
private subscribedTags = new Set<string>();
@ -28,27 +31,79 @@ class IndexedDB extends Dexie {
constructor() {
super("EventDB");
this.version(6).stores({
// TODO use multientry index for *tags
events: "++id, pubkey, kind, created_at, [pubkey+kind]",
tags: "&[type+value+eventId], [type+value], eventId",
this.version(7).stores({
events: "++id, pubkey, kind, created_at, [pubkey+kind], *flatTags",
});
this.startInterval();
}
async getForYouFeed(pubkey: string): Promise<TaggedNostrEvent[]> {
// get ids of events where pubkey is pubkey and kind is 7
const myReactedEvents = new Set<string>();
await this.events
.where("pubkey")
.equals(pubkey)
.each(e => {
e.tags.forEach(tag => {
if (tag[0] === "e") {
myReactedEvents.add(tag[1]);
}
});
});
console.log("myReactedEvents", myReactedEvents);
const othersWhoReacted = new Set<string>();
for (const id of myReactedEvents) {
await this.events
.where("flatTags")
.equals("e_" + id)
.each(e => {
if (e.pubkey !== pubkey) {
othersWhoReacted.add(e.pubkey);
}
});
}
console.log("othersWhoReacted.length", othersWhoReacted.size);
const reactedByOthers = new Set<string>();
for (const pubkey of othersWhoReacted) {
await this.events
.where("pubkey")
.equals(pubkey)
.each(e => {
e.tags.forEach(tag => {
if (tag[0] === "e") {
reactedByOthers.add(tag[1]);
}
});
});
}
const ids = [...reactedByOthers].filter(id => !myReactedEvents.has(id));
const events: TaggedNostrEvent[] = [];
for (const id of ids) {
await this.events
.where("id")
.equals(id)
.each(e => {
if (e.tags.some(t => t[0] === "e")) {
return; // no replies
}
events.push(this.unpack(e));
this.seenEvents.add(e.id);
});
}
return events.sort((a, b) => b.created_at - a.created_at);
}
private startInterval() {
const processQueue = async () => {
if (this.saveQueue.length > 0) {
try {
const eventsToSave: TaggedNostrEvent[] = [];
const tagsToSave: Tag[] = [];
for (const item of this.saveQueue) {
eventsToSave.push(item.event);
tagsToSave.push(...item.tags);
const eventsToSave: PackedNostrEvent[] = [];
for (const event of this.saveQueue) {
eventsToSave.push(event);
}
await this.events.bulkPut(eventsToSave);
await this.tags.bulkPut(tagsToSave);
} catch (e) {
console.error(e);
} finally {
@ -61,39 +116,46 @@ class IndexedDB extends Dexie {
setTimeout(() => processQueue(), 3000);
}
pack(event: TaggedNostrEvent): PackedNostrEvent {
const flatTags =
event.kind === 3
? []
: event.tags.filter(tag => ["e", "p", "d"].includes(tag[0])).map(tag => `${tag[0]}_${tag[1]}`);
return {
id: event.id,
pubkey: event.pubkey,
kind: event.kind,
tags: event.tags,
flatTags,
sig: event.sig,
created_at: event.created_at,
content: event.content,
relays: event.relays,
};
}
unpack(event: PackedNostrEvent): TaggedNostrEvent {
return {
id: event.id,
pubkey: event.pubkey,
kind: event.kind,
tags: event.tags,
sig: event.sig,
created_at: event.created_at,
content: event.content,
relays: event.relays,
};
}
handleEvent(event: TaggedNostrEvent) {
if (this.seenEvents.has(event.id)) {
return;
}
this.seenEvents.add(event.id);
// maybe we don't want event.kind 3 tags
const tags =
event.kind === 3
? []
: event.tags
?.filter(tag => {
if (tag[0] === "d") {
return true;
}
if (tag[0] === "e") {
return true;
}
// we're only interested in p tags where we are mentioned
/*
if (tag[0] === "p") {
Key.isMine(tag[1])) { // TODO
return true;
}*/
return false;
})
.map(tag => ({
eventId: event.id,
type: tag[0],
value: tag[1],
})) || [];
const packedEvent = this.pack(event);
this.saveQueue.push({ event, tags });
this.saveQueue.push(packedEvent);
}
private async startReadQueue() {
@ -124,7 +186,7 @@ class IndexedDB extends Dexie {
.where("pubkey")
.anyOf(authors)
.limit(limit || 1000)
.each(callback);
.each(e => callback(this.unpack(e)));
});
};
@ -132,21 +194,25 @@ class IndexedDB extends Dexie {
this.enqueueRead("getByEventIds", async () => {
const ids = [...this.subscribedEventIds];
this.subscribedEventIds.clear();
await this.events.where("id").anyOf(ids).each(callback);
await this.events
.where("id")
.anyOf(ids)
.each(e => callback(this.unpack(e)));
});
};
getByTags = async (callback: (event: TaggedNostrEvent) => void) => {
this.enqueueRead("getByTags", async () => {
const tagPairs = [...this.subscribedTags].map(tag => tag.split("|"));
const tags = [...this.subscribedTags];
this.subscribedTags.clear();
await this.tags
.where("[type+value]")
.anyOf(tagPairs)
.each(tag => this.subscribedEventIds.add(tag.eventId));
await this.getByEventIds(callback);
const flatTags = tags.map(tag => {
const [type, value] = tag.split("_");
return [type, value];
});
await this.events
.where("flatTags")
.anyOf(flatTags)
.each(e => callback(this.unpack(e)));
});
};
@ -188,7 +254,7 @@ class IndexedDB extends Dexie {
const values = filter[key];
if (Array.isArray(values)) {
for (const value of values) {
this.subscribedTags.add(tagName + "|" + value);
this.subscribedTags.add(`${tagName}_${value}`);
}
}
}

View File

@ -0,0 +1,65 @@
import { TaggedNostrEvent } from "@snort/system";
import { useContext, useEffect, useMemo, useState } from "react";
import { FormattedMessage } from "react-intl";
import { Link } from "react-router-dom";
import { TimelineRenderer } from "@/Components/Feed/TimelineRenderer";
import { TaskList } from "@/Components/Tasks/TaskList";
import useLogin from "@/Hooks/useLogin";
import { DeckContext } from "@/Pages/DeckLayout";
import messages from "@/Pages/messages";
import { indexedDBWorker, System } from "@/system";
const FollowsHint = () => {
const { publicKey: pubKey, follows } = useLogin();
if (follows.item?.length === 0 && pubKey) {
return (
<FormattedMessage
{...messages.NoFollows}
values={{
newUsersPage: (
<Link to={"/discover"}>
<FormattedMessage {...messages.NewUsers} />
</Link>
),
}}
/>
);
}
return null;
};
export const ForYouTab = () => {
const [notes, setNotes] = useState<TaggedNostrEvent[]>([]);
const { publicKey } = useLogin();
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const deckContext = useContext(DeckContext);
useEffect(() => {
indexedDBWorker.getForYouFeed(publicKey).then(notes => {
setNotes(notes);
notes.forEach(note => {
queueMicrotask(() => {
System.HandleEvent(note);
});
});
});
}, []);
const frags = useMemo(() => {
return [
{
events: notes,
refTime: Date.now(),
},
];
}, [notes]);
return (
<>
<FollowsHint />
<TaskList />
<TimelineRenderer frags={frags} latest={[]} />
</>
);
};

View File

@ -6,6 +6,7 @@ import HashTagsPage from "@/Pages/HashTagsPage";
import { ConversationsTab } from "@/Pages/Root/ConversationsTab";
import { DefaultTab } from "@/Pages/Root/DefaultTab";
import { FollowedByFriendsTab } from "@/Pages/Root/FollowedByFriendsTab";
import { ForYouTab } from "@/Pages/Root/ForYouTab";
import { GlobalTab } from "@/Pages/Root/GlobalTab";
import { NotesTab } from "@/Pages/Root/NotesTab";
import { TagsTab } from "@/Pages/Root/TagsTab";
@ -16,6 +17,10 @@ export const RootTabRoutes = [
path: "",
element: <DefaultTab />,
},
{
path: "for-you",
element: <ForYouTab />,
},
{
path: "global",
element: <GlobalTab />,

View File

@ -1700,6 +1700,9 @@
"x82IOl": {
"defaultMessage": "Mute"
},
"xEjBS7": {
"defaultMessage": "For you"
},
"xIcAOU": {
"defaultMessage": "Votes by {type}"
},

View File

@ -1,11 +1,15 @@
import { removeUndefined, throwIfOffline } from "@snort/shared";
import { mapEventToProfile, NostrEvent, NostrSystem, ProfileLoaderService, socialGraphInstance } from "@snort/system";
import * as Comlink from "comlink";
import { RelayMetrics, SystemDb, UserCache, UserRelays } from "@/Cache";
import { addCachedMetadataToFuzzySearch, addEventToFuzzySearch } from "@/Db/FuzzySearch";
import IndexedDBWorker from "@/Db/IndexedDB?worker";
import { LoginStore } from "@/Utils/Login";
import { hasWasm, WasmOptimizer } from "@/Utils/wasm";
export const indexedDBWorker = Comlink.wrap(new IndexedDBWorker());
/**
* Singleton nostr system
*/
@ -28,8 +32,30 @@ System.on("auth", async (c, r, cb) => {
System.on("event", (_, ev) => {
addEventToFuzzySearch(ev);
socialGraphInstance.handleEvent(ev);
if (CONFIG.useIndexedDBEvents && socialGraphInstance.getFollowDistance(ev.pubkey) <= 2) {
queueMicrotask(() => {
indexedDBWorker.handleEvent(ev);
});
}
});
/* disabled idb querying for now
System.on("filters", (req: BuiltRawReqFilter) => {
if (CONFIG.useIndexedDBEvents) {
req.filters.forEach(filter => {
indexedDB.find(
filter,
Comlink.proxy((e: TaggedNostrEvent) => {
queueMicrotask(() => {
System.HandleEvent(e);
});
}),
);
});
}
});
*/
System.profileCache.on("change", keys => {
const changed = removeUndefined(keys.map(a => System.profileCache.getFromCache(a)));
changed.forEach(addCachedMetadataToFuzzySearch);

View File

@ -561,6 +561,7 @@
"wtLjP6": "Copy ID",
"x/Fx2P": "Fund the services that you use by splitting a portion of all your zaps into a pool of funds!",
"x82IOl": "Mute",
"xEjBS7": "For you",
"xIcAOU": "Votes by {type}",
"xIoGG9": "Go to",
"xQtL3v": "Unlock",

View File

@ -1,5 +1,6 @@
import debug from "debug";
import EventEmitter from "eventemitter3";
import inMemoryDB from "./InMemoryDB";
import { FeedCache } from "@snort/shared";
import { NostrEvent, ReqFilter, TaggedNostrEvent } from "./nostr";
@ -122,6 +123,9 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
});
this.pool.on("event", (_, sub, ev) => {
ev.relays?.length && this.relayMetricsHandler.onEvent(ev.relays[0]);
queueMicrotask(() => {
inMemoryDB.handleEvent(ev);
});
this.emit("event", sub, ev);
});
this.pool.on("disconnect", (id, code) => {
@ -149,7 +153,17 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
this.#queryManager.on("trace", t => {
this.relayMetricsHandler.onTraceReport(t);
});
this.#queryManager.on("filters", (f: BuiltRawReqFilter) => this.emit("filters", f));
this.#queryManager.on("filters", (f: BuiltRawReqFilter) => {
f.filters.forEach(filter => {
queueMicrotask(() => {
inMemoryDB.find(filter, e => {
console.log("got from inmemorydb", e);
this.HandleEvent(e);
});
});
});
this.emit("filters", f);
});
}
get Sockets(): ConnectionStateSnapshot[] {
@ -191,8 +205,11 @@ export class NostrSystem extends EventEmitter<NostrSystemEvents> implements Syst
}
HandleEvent(ev: TaggedNostrEvent) {
this.emit("event", "*", ev);
this.#queryManager.handleEvent(ev);
queueMicrotask(() => {
inMemoryDB.handleEvent(ev);
this.emit("event", "*", ev);
this.#queryManager.handleEvent(ev);
});
}
async BroadcastEvent(ev: NostrEvent, cb?: (rsp: OkResponse) => void): Promise<OkResponse[]> {