diff --git a/Cargo.lock b/Cargo.lock index aead4b8..8de7312 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,15 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "anstream" version = "0.6.15" @@ -253,6 +262,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "futures-core" version = "0.3.30" @@ -277,6 +295,16 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indicatif" version = "0.17.8" @@ -357,10 +385,12 @@ dependencies = [ "clap", "hex", "indicatif", + "regex", "serde", "serde_json", "tokio", "tokio-stream", + "url", ] [[package]] @@ -378,6 +408,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -414,6 +450,35 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "regex" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -481,6 +546,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tinyvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.40.0" @@ -515,18 +595,44 @@ dependencies = [ "tokio", ] +[[package]] +name = "unicode-bidi" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ab17db44d7388991a428b2ee655ce0c212e862eff1768a455c58f9aad6e7893" + [[package]] name = "unicode-ident" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" +[[package]] +name = "unicode-normalization" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-width" version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" +[[package]] +name = "url" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + [[package]] name = "utf8parse" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index de5c33c..832615d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,3 +21,5 @@ async-compression = { version = "0.4.12", features = ["tokio", "gzip", "zstd", " indicatif = "0.17.8" tokio-stream = "0.1.16" async-stream = "0.3.5" +regex = "1.11.0" +url = "2.5.2" diff --git a/src/bin/main.rs b/src/bin/main.rs index 89cc9bd..8eeb7f5 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -1,17 +1,29 @@ use async_compression::tokio::write::ZstdEncoder; -use clap::Parser; +use clap::{Parser, ValueEnum}; +use nostr_archive_utils::cursor::NostrCursor; +use regex::Regex; +use serde::Serialize; use std::collections::HashMap; use std::path::PathBuf; use tokio::fs::File; use tokio::io::AsyncWriteExt; use tokio_stream::StreamExt; -use nostr_archive_utils::cursor::NostrCursor; +use url::Url; + +#[derive(ValueEnum, Debug, Clone)] +enum ArgsOperation { + Combine, + MediaReport, +} #[derive(Parser)] #[command(about, version)] struct Args { #[arg(long)] pub dir: String, + + #[arg(long)] + pub operation: ArgsOperation, } #[tokio::main] @@ -20,7 +32,19 @@ async fn main() -> Result<(), anyhow::Error> { let dir: PathBuf = args.dir.parse()?; println!("Reading data from: {}", dir.to_str().unwrap()); + match args.operation { + ArgsOperation::Combine => { + combine(dir).await?; + } + ArgsOperation::MediaReport => { + media_report(dir).await?; + } + } + Ok(()) +} + +async fn combine(dir: PathBuf) -> Result<(), anyhow::Error> { let out_dir = &dir.join("out"); tokio::fs::create_dir_all(out_dir).await?; @@ -41,9 +65,9 @@ async fn main() -> Result<(), anyhow::Error> { } else { event_kinds.insert(e.kind, 1); } - let json = serde_json::to_vec(&e)?; - fout.write(json.as_slice()).await?; - fout.write("\n".as_bytes()).await?; + let json = serde_json::to_string(&e)?; + fout.write_all(json.as_bytes()).await?; + fout.write_all(b"\n").await?; } fout.flush().await?; @@ -52,6 +76,69 @@ async fn main() -> Result<(), anyhow::Error> { Ok(()) } +async fn media_report(dir: PathBuf) -> Result<(), anyhow::Error> { + let mut report = MediaReport::default(); + + let mut binding = NostrCursor::new(dir.clone()); + let mut cursor = Box::pin(binding.walk()); + let media_regex = Regex::new( + r"/((?:http|ftp|https|nostr|web\+nostr|magnet|lnurl[p|w]?):/?/?[\w+?.]+(?:[\p{L}\p{N}~!@#$%^&*()_\-=+\\/?.:;',]*)?[-a-z0-9+&@#/%=~()_|])/iu", + )?; + let file_exts = vec![ + ".webp", ".jpg", ".jpeg", ".bmp", ".png", ".gif", ".webm", ".mp4", ".mov", ".mkv", + ]; + let mut notes = 0u64; + while let Some(Ok(e)) = cursor.next().await { + if e.kind != 1 { + continue; + } + + notes += 1; + for text in media_regex.split(e.content.as_str()) { + if let Ok(u) = Url::parse(text) { + let ext = match file_exts + .iter() + .find(|e| text.to_ascii_lowercase().ends_with(*e)) + { + Some(ext) => ext, + None => continue, + }; + let host = match u.host_str() { + Some(host) => host, + None => continue, + }; + inc_map(&mut report.hosts_count, host, 1); + inc_map(&mut report.extensions, ext, 1); + + if let Some(imeta) = e.tags.iter().find(|e| e[0] == "imeta") { + if let Some(size) = imeta.iter().find(|a| a.starts_with("size")) { + let size_n = size.split(" ").last().unwrap().parse::()?; + inc_map(&mut report.hosts_size, host, size_n); + } + inc_map(&mut report.hosts_imeta, host, 1); + } else { + inc_map(&mut report.hosts_no_imeta, host, 1); + } + } + } + } + + println!("Processed {notes} notes, writing report!"); + let mut fout = File::create(dir.join("media_report.json")).await?; + fout.write_all(serde_json::to_vec(&report)?.as_slice()) + .await?; + + Ok(()) +} + +fn inc_map(map: &mut HashMap, key: &str, n: u64) { + if let Some(v) = map.get_mut(key) { + *v += n; + } else { + map.insert(key.to_string(), n); + } +} + async fn write_csv(dst: &PathBuf, data: &HashMap) -> Result<(), anyhow::Error> where K: ToString, @@ -59,8 +146,18 @@ where { let mut fout = File::create(dst).await?; for (k, v) in data { - fout.write_all(format!("\"{}\",\"{}\"\n", k.to_string(), v.to_string()).as_bytes()).await?; + fout.write_all(format!("\"{}\",\"{}\"\n", k.to_string(), v.to_string()).as_bytes()) + .await?; } fout.flush().await?; Ok(()) } + +#[derive(Serialize, Default)] +struct MediaReport { + pub hosts_count: HashMap, + pub hosts_size: HashMap, + pub hosts_imeta: HashMap, + pub hosts_no_imeta: HashMap, + pub extensions: HashMap, +} diff --git a/src/cursor.rs b/src/cursor.rs index e9b0c7e..37c4a6f 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -29,8 +29,7 @@ impl NostrCursor { } } - pub fn walk(&mut self) -> impl Stream> + '_ - { + pub fn walk(&mut self) -> impl Stream> + '_ { try_stream! { let mut dir_reader = tokio::fs::read_dir(&self.dir).await?; while let Ok(Some(path)) = dir_reader.next_entry().await { @@ -39,42 +38,63 @@ impl NostrCursor { } let path = path.path(); println!("Reading: {}", path.to_str().unwrap()); - let file = self.open_file(path).await?; + if let Ok(file) = self.open_file(path).await { + let mut file = BufReader::new(file); + let mut line = Vec::new(); + let mut lines = 0u64; + let mut duplicates = 0u64; - let mut file = BufReader::new(file); - let mut line = String::new(); - while let Ok(size) = file.read_line(&mut line).await { - if size == 0 { - break; - } + loop { + match file.read_until(10, &mut line).await { + Ok(size) => { + if size == 0 { + println!("EOF. lines={lines}, duplicates={duplicates}"); + break; + } + lines += 1; - if let Ok(event) = serde_json::from_str::(&line[..size]) { - let ev_id = EventId(hex::decode(&event.id)?.as_slice().try_into()?); - if self.ids.insert(ev_id) { - yield event + let line_json = &line[..size]; + match serde_json::from_slice::(line_json) { + Ok(event) => { + let ev_id = EventId(hex::decode(&event.id)?.as_slice().try_into()?); + if self.ids.insert(ev_id) { + yield event + } else { + duplicates += 1; + } + }, + Err(e) => { + println!("Invalid json on {} {e}", String::from_utf8_lossy(line_json)) + } + } + + line.clear(); + } + Err(e) => { + println!("Error reading file: {}", e); + break; + } } } - line.clear(); + } else { + println!("Could not open"); } } } } - async fn open_file(&self, path: PathBuf) -> Result>, anyhow::Error> - { + async fn open_file(&self, path: PathBuf) -> Result>, anyhow::Error> { let f = BufReader::new(File::open(path.clone()).await?); match path.extension() { - Some(ext) => { - match ext.to_str().unwrap() { - "json" => Ok(Box::pin(f)), - "jsonl" => Ok(Box::pin(f)), - "gz" => Ok(Box::pin(GzipDecoder::new(f))), - "zst" => Ok(Box::pin(ZstdDecoder::new(f))), - "bz2" => Ok(Box::pin(BzDecoder::new(f))), - _ => anyhow::bail!("Unknown extension") - } - } - None => anyhow::bail!("Could not determine archive format") + Some(ext) => match ext.to_str().unwrap() { + "json" => Ok(Box::pin(f)), + "jsonl" => Ok(Box::pin(f)), + "gz" => Ok(Box::pin(GzipDecoder::new(f))), + "zst" => Ok(Box::pin(ZstdDecoder::new(f))), + "bz2" => Ok(Box::pin(BzDecoder::new(f))), + _ => anyhow::bail!("Unknown extension"), + }, + None => anyhow::bail!("Could not determine archive format"), } } -} \ No newline at end of file +} diff --git a/src/event.rs b/src/event.rs index ef41892..c859c2d 100644 --- a/src/event.rs +++ b/src/event.rs @@ -9,4 +9,4 @@ pub struct NostrEvent { pub sig: String, pub content: String, pub tags: Vec>, -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index aecb31d..e14a99f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,2 +1,2 @@ +pub mod cursor; pub mod event; -pub mod cursor; \ No newline at end of file