From 01ad11b0ee2ddb4bdd305d20a217b43e1b3e5a7a Mon Sep 17 00:00:00 2001 From: kieran Date: Mon, 30 Sep 2024 18:16:43 +0100 Subject: [PATCH] refactor: stream api --- Cargo.lock | 103 +++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 12 +++++- src/bin/main.rs | 66 +++++++++++++++++++++++++++++++ src/cursor.rs | 72 ++++++++++++++++++--------------- src/lib.rs | 2 + src/main.rs | 37 ----------------- 6 files changed, 222 insertions(+), 70 deletions(-) create mode 100644 src/bin/main.rs create mode 100644 src/lib.rs delete mode 100644 src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 343e75a..aead4b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,6 +88,28 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -193,6 +215,19 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" +[[package]] +name = "console" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "unicode-width", + "windows-sys", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -202,6 +237,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "flate2" version = "1.0.34" @@ -236,6 +277,28 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "indicatif" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3" +dependencies = [ + "console", + "instant", + "number_prefix", + "portable-atomic", + "unicode-width", +] + +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -257,6 +320,12 @@ dependencies = [ "libc", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.159" @@ -279,18 +348,27 @@ dependencies = [ ] [[package]] -name = "nostr_stat" +name = "nostr_archive" version = "0.1.0" dependencies = [ "anyhow", "async-compression", + "async-stream", "clap", "hex", + "indicatif", "serde", "serde_json", "tokio", + "tokio-stream", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "object" version = "0.36.4" @@ -312,6 +390,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "proc-macro2" version = "1.0.86" @@ -420,12 +504,29 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "unicode-ident" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + [[package]] name = "utf8parse" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index c8240eb..de5c33c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,15 @@ [package] -name = "nostr_stat" +name = "nostr_archive" version = "0.1.0" edition = "2021" +[lib] +name = "nostr_archive_utils" + +[[bin]] +name = "nostr_archive" +path = "src/bin/main.rs" + [dependencies] anyhow = "1.0.89" clap = { version = "4.5.18", features = ["derive"] } @@ -11,3 +18,6 @@ tokio = { version = "1.40.0", features = ["rt", "rt-multi-thread", "macros", "fs serde_json = "1.0.128" hex = "0.4.3" async-compression = { version = "0.4.12", features = ["tokio", "gzip", "zstd", "bzip2"] } +indicatif = "0.17.8" +tokio-stream = "0.1.16" +async-stream = "0.3.5" diff --git a/src/bin/main.rs b/src/bin/main.rs new file mode 100644 index 0000000..89cc9bd --- /dev/null +++ b/src/bin/main.rs @@ -0,0 +1,66 @@ +use async_compression::tokio::write::ZstdEncoder; +use clap::Parser; +use std::collections::HashMap; +use std::path::PathBuf; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; +use tokio_stream::StreamExt; +use nostr_archive_utils::cursor::NostrCursor; + +#[derive(Parser)] +#[command(about, version)] +struct Args { + #[arg(long)] + pub dir: String, +} + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + let args = Args::parse(); + + let dir: PathBuf = args.dir.parse()?; + println!("Reading data from: {}", dir.to_str().unwrap()); + + let out_dir = &dir.join("out"); + tokio::fs::create_dir_all(out_dir).await?; + + let mut fout = ZstdEncoder::new(File::create(out_dir.join("combined.jsonl.zst")).await?); + let mut event_dates: HashMap = HashMap::new(); + let mut event_kinds: HashMap = HashMap::new(); + let mut binding = NostrCursor::new(dir); + let mut cursor = Box::pin(binding.walk()); + while let Some(Ok(e)) = cursor.next().await { + let day = e.created_at / (60 * 60 * 24); + if let Some(x) = event_dates.get_mut(&day) { + *x += 1u64; + } else { + event_dates.insert(day, 1); + } + if let Some(x) = event_kinds.get_mut(&e.kind) { + *x += 1u64; + } else { + event_kinds.insert(e.kind, 1); + } + let json = serde_json::to_vec(&e)?; + fout.write(json.as_slice()).await?; + fout.write("\n".as_bytes()).await?; + } + fout.flush().await?; + + write_csv(&out_dir.join("kinds.csv"), &event_kinds).await?; + write_csv(&out_dir.join("days.csv"), &event_dates).await?; + Ok(()) +} + +async fn write_csv(dst: &PathBuf, data: &HashMap) -> Result<(), anyhow::Error> +where + K: ToString, + V: ToString, +{ + let mut fout = File::create(dst).await?; + for (k, v) in data { + fout.write_all(format!("\"{}\",\"{}\"\n", k.to_string(), v.to_string()).as_bytes()).await?; + } + fout.flush().await?; + Ok(()) +} diff --git a/src/cursor.rs b/src/cursor.rs index 108dd18..e9b0c7e 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -1,10 +1,12 @@ use crate::event::NostrEvent; use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, ZstdDecoder}; +use async_stream::try_stream; use std::collections::HashSet; use std::path::PathBuf; use std::pin::Pin; use tokio::fs::File; use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; +use tokio_stream::Stream; #[derive(Ord, PartialOrd, Eq, PartialEq, Hash)] struct EventId([u8; 32]); @@ -27,44 +29,52 @@ impl NostrCursor { } } - pub async fn walk(&mut self, mut fx: T) -> Result<(), anyhow::Error> - where - T: FnMut(&NostrEvent), + pub fn walk(&mut self) -> impl Stream> + '_ { - let mut dir_reader = tokio::fs::read_dir(&self.dir).await?; - while let Ok(Some(path)) = dir_reader.next_entry().await { - let path = path.path(); - println!("Reading: {}", path.to_str().unwrap()); - let file: Pin> = match path.extension() { - Some(ext) => { - let buf_reader = BufReader::new(File::open(path.clone()).await?); - match ext.to_str().unwrap() { - "json" => Box::pin(buf_reader), - "gz" => Box::pin(GzipDecoder::new(buf_reader)), - "zstd" => Box::pin(ZstdDecoder::new(buf_reader)), - "bz2" => Box::pin(BzDecoder::new(buf_reader)), - _ => anyhow::bail!("Unknown extension") - } - } - None => anyhow::bail!("Could not determine archive format") - }; - let mut file = BufReader::new(file); - let mut line = String::new(); - while let Ok(size) = file.read_line(&mut line).await { - if size == 0 { - break; + try_stream! { + let mut dir_reader = tokio::fs::read_dir(&self.dir).await?; + while let Ok(Some(path)) = dir_reader.next_entry().await { + if path.file_type().await?.is_dir() { + continue; } + let path = path.path(); + println!("Reading: {}", path.to_str().unwrap()); + let file = self.open_file(path).await?; - if let Ok(event) = serde_json::from_str::(&line[..size]) { - let ev_id = EventId(hex::decode(&event.id)?.as_slice().try_into()?); - if self.ids.insert(ev_id) { - fx(&event); + let mut file = BufReader::new(file); + let mut line = String::new(); + while let Ok(size) = file.read_line(&mut line).await { + if size == 0 { + break; } + + if let Ok(event) = serde_json::from_str::(&line[..size]) { + let ev_id = EventId(hex::decode(&event.id)?.as_slice().try_into()?); + if self.ids.insert(ev_id) { + yield event + } + } + line.clear(); } - line.clear(); } } + } - Ok(()) + async fn open_file(&self, path: PathBuf) -> Result>, anyhow::Error> + { + let f = BufReader::new(File::open(path.clone()).await?); + match path.extension() { + Some(ext) => { + match ext.to_str().unwrap() { + "json" => Ok(Box::pin(f)), + "jsonl" => Ok(Box::pin(f)), + "gz" => Ok(Box::pin(GzipDecoder::new(f))), + "zst" => Ok(Box::pin(ZstdDecoder::new(f))), + "bz2" => Ok(Box::pin(BzDecoder::new(f))), + _ => anyhow::bail!("Unknown extension") + } + } + None => anyhow::bail!("Could not determine archive format") + } } } \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..aecb31d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,2 @@ +pub mod event; +pub mod cursor; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index f0f1a6c..0000000 --- a/src/main.rs +++ /dev/null @@ -1,37 +0,0 @@ -mod event; -mod cursor; - -use std::collections::HashMap; -use std::path::PathBuf; -use clap::Parser; -use crate::cursor::NostrCursor; - -#[derive(Parser)] -#[command(about, version)] -struct Args { - #[arg(long)] - pub dir: String, -} - -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - let args = Args::parse(); - - let dir: PathBuf = args.dir.parse()?; - println!("Reading data from: {}", dir.to_str().unwrap()); - - let mut event_dates: HashMap = HashMap::new(); - let mut cursor = NostrCursor::new(dir); - cursor.walk(|e| { - let day = e.created_at / (60 * 60 * 24); - if let Some(x) = event_dates.get_mut(&day) { - *x += 1u64; - } else { - event_dates.insert(day, 1); - } - }).await?; - for (day, count) in event_dates { - println!("{}: {}", day, count); - } - Ok(()) -}