refactor: stream api

This commit is contained in:
kieran 2024-09-30 18:16:43 +01:00
parent 4ebaa31250
commit 01ad11b0ee
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
6 changed files with 222 additions and 70 deletions

103
Cargo.lock generated
View File

@ -88,6 +88,28 @@ dependencies = [
"zstd-safe", "zstd-safe",
] ]
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "backtrace" name = "backtrace"
version = "0.3.74" version = "0.3.74"
@ -193,6 +215,19 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
[[package]]
name = "console"
version = "0.15.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb"
dependencies = [
"encode_unicode",
"lazy_static",
"libc",
"unicode-width",
"windows-sys",
]
[[package]] [[package]]
name = "crc32fast" name = "crc32fast"
version = "1.4.2" version = "1.4.2"
@ -202,6 +237,12 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "encode_unicode"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.0.34" version = "1.0.34"
@ -236,6 +277,28 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "indicatif"
version = "0.17.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3"
dependencies = [
"console",
"instant",
"number_prefix",
"portable-atomic",
"unicode-width",
]
[[package]]
name = "instant"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "is_terminal_polyfill" name = "is_terminal_polyfill"
version = "1.70.1" version = "1.70.1"
@ -257,6 +320,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.159" version = "0.2.159"
@ -279,18 +348,27 @@ dependencies = [
] ]
[[package]] [[package]]
name = "nostr_stat" name = "nostr_archive"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-compression", "async-compression",
"async-stream",
"clap", "clap",
"hex", "hex",
"indicatif",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-stream",
] ]
[[package]]
name = "number_prefix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]] [[package]]
name = "object" name = "object"
version = "0.36.4" version = "0.36.4"
@ -312,6 +390,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
[[package]]
name = "portable-atomic"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.86" version = "1.0.86"
@ -420,12 +504,29 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.13" version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
[[package]]
name = "unicode-width"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af"
[[package]] [[package]]
name = "utf8parse" name = "utf8parse"
version = "0.2.2" version = "0.2.2"

View File

@ -1,8 +1,15 @@
[package] [package]
name = "nostr_stat" name = "nostr_archive"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
[lib]
name = "nostr_archive_utils"
[[bin]]
name = "nostr_archive"
path = "src/bin/main.rs"
[dependencies] [dependencies]
anyhow = "1.0.89" anyhow = "1.0.89"
clap = { version = "4.5.18", features = ["derive"] } clap = { version = "4.5.18", features = ["derive"] }
@ -11,3 +18,6 @@ tokio = { version = "1.40.0", features = ["rt", "rt-multi-thread", "macros", "fs
serde_json = "1.0.128" serde_json = "1.0.128"
hex = "0.4.3" hex = "0.4.3"
async-compression = { version = "0.4.12", features = ["tokio", "gzip", "zstd", "bzip2"] } async-compression = { version = "0.4.12", features = ["tokio", "gzip", "zstd", "bzip2"] }
indicatif = "0.17.8"
tokio-stream = "0.1.16"
async-stream = "0.3.5"

66
src/bin/main.rs Normal file
View File

@ -0,0 +1,66 @@
use async_compression::tokio::write::ZstdEncoder;
use clap::Parser;
use std::collections::HashMap;
use std::path::PathBuf;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt;
use nostr_archive_utils::cursor::NostrCursor;
#[derive(Parser)]
#[command(about, version)]
struct Args {
#[arg(long)]
pub dir: String,
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let args = Args::parse();
let dir: PathBuf = args.dir.parse()?;
println!("Reading data from: {}", dir.to_str().unwrap());
let out_dir = &dir.join("out");
tokio::fs::create_dir_all(out_dir).await?;
let mut fout = ZstdEncoder::new(File::create(out_dir.join("combined.jsonl.zst")).await?);
let mut event_dates: HashMap<u64, u64> = HashMap::new();
let mut event_kinds: HashMap<u32, u64> = HashMap::new();
let mut binding = NostrCursor::new(dir);
let mut cursor = Box::pin(binding.walk());
while let Some(Ok(e)) = cursor.next().await {
let day = e.created_at / (60 * 60 * 24);
if let Some(x) = event_dates.get_mut(&day) {
*x += 1u64;
} else {
event_dates.insert(day, 1);
}
if let Some(x) = event_kinds.get_mut(&e.kind) {
*x += 1u64;
} else {
event_kinds.insert(e.kind, 1);
}
let json = serde_json::to_vec(&e)?;
fout.write(json.as_slice()).await?;
fout.write("\n".as_bytes()).await?;
}
fout.flush().await?;
write_csv(&out_dir.join("kinds.csv"), &event_kinds).await?;
write_csv(&out_dir.join("days.csv"), &event_dates).await?;
Ok(())
}
async fn write_csv<K, V>(dst: &PathBuf, data: &HashMap<K, V>) -> Result<(), anyhow::Error>
where
K: ToString,
V: ToString,
{
let mut fout = File::create(dst).await?;
for (k, v) in data {
fout.write_all(format!("\"{}\",\"{}\"\n", k.to_string(), v.to_string()).as_bytes()).await?;
}
fout.flush().await?;
Ok(())
}

View File

@ -1,10 +1,12 @@
use crate::event::NostrEvent; use crate::event::NostrEvent;
use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, ZstdDecoder}; use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, ZstdDecoder};
use async_stream::try_stream;
use std::collections::HashSet; use std::collections::HashSet;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin; use std::pin::Pin;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio_stream::Stream;
#[derive(Ord, PartialOrd, Eq, PartialEq, Hash)] #[derive(Ord, PartialOrd, Eq, PartialEq, Hash)]
struct EventId([u8; 32]); struct EventId([u8; 32]);
@ -27,44 +29,52 @@ impl NostrCursor {
} }
} }
pub async fn walk<T>(&mut self, mut fx: T) -> Result<(), anyhow::Error> pub fn walk(&mut self) -> impl Stream<Item=Result<NostrEvent, anyhow::Error>> + '_
where
T: FnMut(&NostrEvent),
{ {
let mut dir_reader = tokio::fs::read_dir(&self.dir).await?; try_stream! {
while let Ok(Some(path)) = dir_reader.next_entry().await { let mut dir_reader = tokio::fs::read_dir(&self.dir).await?;
let path = path.path(); while let Ok(Some(path)) = dir_reader.next_entry().await {
println!("Reading: {}", path.to_str().unwrap()); if path.file_type().await?.is_dir() {
let file: Pin<Box<dyn AsyncRead>> = match path.extension() { continue;
Some(ext) => {
let buf_reader = BufReader::new(File::open(path.clone()).await?);
match ext.to_str().unwrap() {
"json" => Box::pin(buf_reader),
"gz" => Box::pin(GzipDecoder::new(buf_reader)),
"zstd" => Box::pin(ZstdDecoder::new(buf_reader)),
"bz2" => Box::pin(BzDecoder::new(buf_reader)),
_ => anyhow::bail!("Unknown extension")
}
}
None => anyhow::bail!("Could not determine archive format")
};
let mut file = BufReader::new(file);
let mut line = String::new();
while let Ok(size) = file.read_line(&mut line).await {
if size == 0 {
break;
} }
let path = path.path();
println!("Reading: {}", path.to_str().unwrap());
let file = self.open_file(path).await?;
if let Ok(event) = serde_json::from_str::<NostrEvent>(&line[..size]) { let mut file = BufReader::new(file);
let ev_id = EventId(hex::decode(&event.id)?.as_slice().try_into()?); let mut line = String::new();
if self.ids.insert(ev_id) { while let Ok(size) = file.read_line(&mut line).await {
fx(&event); if size == 0 {
break;
} }
if let Ok(event) = serde_json::from_str::<NostrEvent>(&line[..size]) {
let ev_id = EventId(hex::decode(&event.id)?.as_slice().try_into()?);
if self.ids.insert(ev_id) {
yield event
}
}
line.clear();
} }
line.clear();
} }
} }
}
Ok(()) async fn open_file(&self, path: PathBuf) -> Result<Pin<Box<dyn AsyncRead>>, anyhow::Error>
{
let f = BufReader::new(File::open(path.clone()).await?);
match path.extension() {
Some(ext) => {
match ext.to_str().unwrap() {
"json" => Ok(Box::pin(f)),
"jsonl" => Ok(Box::pin(f)),
"gz" => Ok(Box::pin(GzipDecoder::new(f))),
"zst" => Ok(Box::pin(ZstdDecoder::new(f))),
"bz2" => Ok(Box::pin(BzDecoder::new(f))),
_ => anyhow::bail!("Unknown extension")
}
}
None => anyhow::bail!("Could not determine archive format")
}
} }
} }

2
src/lib.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod event;
pub mod cursor;

View File

@ -1,37 +0,0 @@
mod event;
mod cursor;
use std::collections::HashMap;
use std::path::PathBuf;
use clap::Parser;
use crate::cursor::NostrCursor;
#[derive(Parser)]
#[command(about, version)]
struct Args {
#[arg(long)]
pub dir: String,
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let args = Args::parse();
let dir: PathBuf = args.dir.parse()?;
println!("Reading data from: {}", dir.to_str().unwrap());
let mut event_dates: HashMap<u64, u64> = HashMap::new();
let mut cursor = NostrCursor::new(dir);
cursor.walk(|e| {
let day = e.created_at / (60 * 60 * 24);
if let Some(x) = event_dates.get_mut(&day) {
*x += 1u64;
} else {
event_dates.insert(day, 1);
}
}).await?;
for (day, count) in event_dates {
println!("{}: {}", day, count);
}
Ok(())
}