feat: Added caching, subscription for drives

This commit is contained in:
florian 2024-03-18 10:05:40 +01:00
parent 7322c4781d
commit d0185042c4
2 changed files with 127 additions and 30 deletions

View File

@ -1,12 +1,14 @@
import NDK from "@nostr-dev-kit/ndk";
import NDK, { NDKEvent, NDKKind } from "@nostr-dev-kit/ndk";
import debug from "debug";
import NodeCache from "node-cache";
import { Drive } from "./types.js";
import uniqBy from "lodash/uniqBy.js";
const driveCache = new NodeCache({ stdTTL: 30 }); // 30s for development
const driveCache = new NodeCache({ stdTTL: 60 * 60 }); // 1h cache TTL
const log = debug("web:drive:nostr");
// TODO use relays from naddr
const ndk = new NDK({
explicitRelayUrls: [
"wss://nostrue.com",
@ -19,6 +21,57 @@ const ndk = new NDK({
ndk.connect();
export const DRIVE_KIND = 30563;
/**
 * Convert a drive event into a `Drive` and store it in the cache.
 *
 * Cache key is `${kind}-${pubkey}-${dIdentifier}` (replaceable-event identity).
 * Returns `undefined` when `event` is null.
 */
const handleEvent = (event: NDKEvent | null): Drive | undefined => {
  if (!event) return;
  const kind = event.kind;
  const pubkey = event.pubkey;
  const driveIdentifier = event.tags.find((t) => t[0] === "d")?.[1];
  const driveKey = `${kind}-${pubkey}-${driveIdentifier}`;
  log(driveKey);
  // "x" and "folder" tags carry the tree entries: [tag, hash, path, size, mimeType]
  const treeTags = event.tags.filter((t) => t[0] === "x" || t[0] === "folder");
  const files = treeTags.map((t) => ({
    hash: t[1],
    path: t[2],
    size: parseInt(t[3], 10) || 0, // NaN (missing/garbled size) falls back to 0
    mimeType: t[4],
  }));
  // Normalize each "r" tag to its server origin. A malformed URL must not
  // throw away the whole event, so collect with a per-tag try/catch instead
  // of a bare .map() (which also made the old `|| []` fallback dead code —
  // .map() always returns an array).
  const servers: string[] = [];
  for (const t of event.tags) {
    if (t[0] === "r" && t[1]) {
      try {
        servers.push(new URL("/", t[1]).toString());
      } catch {
        log("ignoring invalid server url:", t[1]);
      }
    }
  }
  const drive = { files, servers };
  driveCache.set(driveKey, drive);
  return drive;
};
/**
 * Fetch every drive event from the relays and warm the drive cache.
 *
 * Events are collected as they arrive; sorting and deduplication happen
 * ONCE when the subscription closes, instead of re-sorting the whole
 * accumulated array on every single event (the old code was O(n² log n)
 * across a backfill). Newest-first sort + `uniqBy(tagId)` keeps only the
 * latest version of each replaceable drive event.
 */
const fetchAllDrives = async () => {
  // TODO incremental update with a `since` date
  const events: NDKEvent[] = [];
  // Note: ndk.subscribe() returns the subscription synchronously; no await needed.
  const sub = ndk.subscribe(
    {
      kinds: [DRIVE_KIND as NDKKind],
    },
    { closeOnEose: true },
  );
  sub.on("event", (event: NDKEvent) => {
    events.push(event);
  });
  sub.on("close", () => {
    const newestFirst = [...events].sort(
      (a: NDKEvent, b: NDKEvent) => (b.created_at ?? 0) - (a.created_at ?? 0),
    );
    for (const event of uniqBy(newestFirst, (e: NDKEvent) => e.tagId())) {
      handleEvent(event);
    }
  });
  sub.start();
  // Safety net: force-stop after 10s in case EOSE never arrives.
  setTimeout(() => sub.stop(), 10000);
};
export const readDrive = async (kind: number, pubkey: string, driveIdentifier: string): Promise<Drive | undefined> => {
const driveKey = `${kind}-${pubkey}-${driveIdentifier}`;
if (driveCache.has(driveKey)) {
@ -38,25 +91,13 @@ export const readDrive = async (kind: number, pubkey: string, driveIdentifier: s
log("fetch finsihed");
if (event) {
const treeTags = event.tags.filter((t) => t[0] === "x" || t[0] === "folder");
const files = treeTags.map((t) => ({
hash: t[1],
path: t[2],
size: parseInt(t[3], 10) || 0,
mimeType: t[4],
}));
// log(files);
const servers = event.tags.filter((t) => t[0] === "r" && t[1]).map((t) => new URL("/", t[1]).toString()) || [];
// log(servers);
const drive = { files, servers };
driveCache.set(driveKey, drive);
return drive;
return handleEvent(event);
} else {
log("no drive event found.");
}
};
// TODO track a last-change date so subsequent fetches only pull changes
// Load all drives into cache and refresh periodically
fetchAllDrives();
setInterval(() => fetchAllDrives(), 60000);

View File

@ -7,6 +7,9 @@ import { AddressPointer } from "nostr-tools/nip19";
import { Drive, FileMeta } from "./types.js";
import { readDrive } from "./drive.js";
import { searchCdn } from "./cdn.js";
import { PassThrough } from "stream";
import fs from "node:fs";
import nodePath from "path";
const log = debug("web");
const app = new Koa();
@ -24,6 +27,11 @@ const findFolderContents = (drive: Drive, filePathToSearch: string): FileMeta[]
return drive.files.filter((f) => f.path.startsWith("/" + filePathToSearch));
};
const cacheDir = "./cache";
if (!fs.existsSync(cacheDir)) {
fs.mkdirSync(cacheDir, { recursive: true });
}
// handle errors
app.use(async (ctx, next) => {
try {
@ -41,6 +49,27 @@ app.use(async (ctx, next) => {
}
});
// Liveness probe: always answers 200 with an empty body (no relay/cache checks).
router.get("/health", async (ctx, next) => {
ctx.status = 200;
});
/**
 * Pipe `src` into `cacheFile` and resolve once the file is fully flushed.
 *
 * Resolves on the WRITE stream's "finish" — not the source's "end" — so the
 * caller can safely read the file back immediately (the old code raced the
 * final write). Rejects on either stream erroring, removing the partial
 * cache file so a truncated blob is never served later.
 */
const storeInCache = (src: NodeJS.ReadableStream, cacheFile: string): Promise<void> => {
  log("storing cache file:", cacheFile);
  const cacheFileStream = fs.createWriteStream(cacheFile);
  src.pipe(cacheFileStream);
  return new Promise<void>((resolve, reject) => {
    cacheFileStream.on("finish", () => {
      log("cache file stored:", cacheFile);
      resolve();
    });
    const fail = (err: Error) => {
      cacheFileStream.destroy();
      // Best-effort cleanup of the truncated file before rejecting.
      fs.unlink(cacheFile, () => reject(err));
    };
    src.on("error", fail);
    cacheFileStream.on("error", fail);
  });
};
router.get("(.*)", async (ctx, next) => {
log(`serving: host:${ctx.hostname} path:${ctx.path}`);
@ -76,7 +105,7 @@ router.get("(.*)", async (ctx, next) => {
// fetch drive descriptor -> ndk
const drive = await readDrive(kind, pubkey, identifier);
log(`drive: ${drive}`);
// log(`drive: ${drive}`);
if (drive) {
// lookup file in file-list
@ -84,22 +113,49 @@ router.get("(.*)", async (ctx, next) => {
// log(`fileMeta: ${fileMeta}`);
if (fileMeta) {
const { hash, mimeType } = fileMeta;
const { hash, mimeType, size } = fileMeta;
// lookup media servers for user -> ndk (optional)
const cdnSource = await searchCdn([...drive.servers, ...additionalServers], hash);
//log(cdnSource);
if (cdnSource) {
const cacheFile = nodePath.join(cacheDir, hash);
if (fs.existsSync(cacheFile)) {
log(`returning cached data for ${hash}`);
ctx.set({
"Content-Type": mimeType,
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
ctx.body = cdnSource;
const src = fs.createReadStream(cacheFile);
ctx.body = src;
} else {
log("no CDN server found for blob: " + hash);
// lookup media servers for user -> ndk (optional)
const cdnSource = await searchCdn([...drive.servers, ...additionalServers], hash);
//log(cdnSource);
if (cdnSource) {
if (size < 100000) {
// if small file < 100KB, download and serve downloaded file
await storeInCache(cdnSource, cacheFile);
ctx.set({
"Content-Type": mimeType,
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
ctx.body = fs.createReadStream(cacheFile);
} else {
// else (>100KB) stream the content straight from the backend.
// TODO or maybe redirect????
ctx.set({
"Content-Type": mimeType,
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
ctx.body = cdnSource;
}
} else {
log("no CDN server found for blob: " + hash);
}
}
} else {
const folder = findFolderContents(drive, searchPath);