feat: media report

This commit is contained in:
kieran 2024-10-07 20:41:23 +01:00
parent 01ad11b0ee
commit 491f2d3482
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
6 changed files with 261 additions and 36 deletions

106
Cargo.lock generated
View File

@ -17,6 +17,15 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "aho-corasick"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "anstream" name = "anstream"
version = "0.6.15" version = "0.6.15"
@ -253,6 +262,15 @@ dependencies = [
"miniz_oxide", "miniz_oxide",
] ]
[[package]]
name = "form_urlencoded"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
dependencies = [
"percent-encoding",
]
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.30" version = "0.3.30"
@ -277,6 +295,16 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "idna"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]] [[package]]
name = "indicatif" name = "indicatif"
version = "0.17.8" version = "0.17.8"
@ -357,10 +385,12 @@ dependencies = [
"clap", "clap",
"hex", "hex",
"indicatif", "indicatif",
"regex",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"url",
] ]
[[package]] [[package]]
@ -378,6 +408,12 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "percent-encoding"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.14" version = "0.2.14"
@ -414,6 +450,35 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "regex"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]] [[package]]
name = "rustc-demangle" name = "rustc-demangle"
version = "0.1.24" version = "0.1.24"
@ -481,6 +546,21 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "tinyvec"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.40.0" version = "1.40.0"
@ -515,18 +595,44 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "unicode-bidi"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ab17db44d7388991a428b2ee655ce0c212e862eff1768a455c58f9aad6e7893"
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.13" version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
[[package]]
name = "unicode-normalization"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956"
dependencies = [
"tinyvec",
]
[[package]] [[package]]
name = "unicode-width" name = "unicode-width"
version = "0.1.14" version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af"
[[package]]
name = "url"
version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c"
dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
]
[[package]] [[package]]
name = "utf8parse" name = "utf8parse"
version = "0.2.2" version = "0.2.2"

View File

@ -21,3 +21,5 @@ async-compression = { version = "0.4.12", features = ["tokio", "gzip", "zstd", "
indicatif = "0.17.8" indicatif = "0.17.8"
tokio-stream = "0.1.16" tokio-stream = "0.1.16"
async-stream = "0.3.5" async-stream = "0.3.5"
regex = "1.11.0"
url = "2.5.2"

View File

@ -1,17 +1,29 @@
use async_compression::tokio::write::ZstdEncoder; use async_compression::tokio::write::ZstdEncoder;
use clap::Parser; use clap::{Parser, ValueEnum};
use nostr_archive_utils::cursor::NostrCursor;
use regex::Regex;
use serde::Serialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use nostr_archive_utils::cursor::NostrCursor; use url::Url;
// Operation selected through the `--operation` command-line argument of `Args`.
//
// NOTE(review): plain `//` comments are used here on purpose. `///` doc comments on a clap
// `ValueEnum` are, as far as clap's derive documentation describes, picked up as help text
// and would change the CLI output — confirm before converting these to doc comments.
#[derive(ValueEnum, Debug, Clone)]
enum ArgsOperation {
    // Dispatched by `main` to `combine(dir)`.
    Combine,
    // Dispatched by `main` to `media_report(dir)`.
    MediaReport,
}
#[derive(Parser)] #[derive(Parser)]
#[command(about, version)] #[command(about, version)]
struct Args { struct Args {
#[arg(long)] #[arg(long)]
pub dir: String, pub dir: String,
#[arg(long)]
pub operation: ArgsOperation,
} }
#[tokio::main] #[tokio::main]
@ -20,7 +32,19 @@ async fn main() -> Result<(), anyhow::Error> {
let dir: PathBuf = args.dir.parse()?; let dir: PathBuf = args.dir.parse()?;
println!("Reading data from: {}", dir.to_str().unwrap()); println!("Reading data from: {}", dir.to_str().unwrap());
match args.operation {
ArgsOperation::Combine => {
combine(dir).await?;
}
ArgsOperation::MediaReport => {
media_report(dir).await?;
}
}
Ok(())
}
async fn combine(dir: PathBuf) -> Result<(), anyhow::Error> {
let out_dir = &dir.join("out"); let out_dir = &dir.join("out");
tokio::fs::create_dir_all(out_dir).await?; tokio::fs::create_dir_all(out_dir).await?;
@ -41,9 +65,9 @@ async fn main() -> Result<(), anyhow::Error> {
} else { } else {
event_kinds.insert(e.kind, 1); event_kinds.insert(e.kind, 1);
} }
let json = serde_json::to_vec(&e)?; let json = serde_json::to_string(&e)?;
fout.write(json.as_slice()).await?; fout.write_all(json.as_bytes()).await?;
fout.write("\n".as_bytes()).await?; fout.write_all(b"\n").await?;
} }
fout.flush().await?; fout.flush().await?;
@ -52,6 +76,69 @@ async fn main() -> Result<(), anyhow::Error> {
Ok(()) Ok(())
} }
/// Scans every kind-1 note found under `dir` for media links and writes per-host
/// statistics to `<dir>/media_report.json`.
///
/// A link counts as media when it parses as a URL with a host and its text ends (ignoring
/// ASCII case) with one of the known media file extensions. For each such link the host and
/// extension counters are incremented, and the link is attributed to either the "has
/// `imeta`" or "no `imeta`" bucket depending on whether the note carries an `imeta` tag.
/// When that tag has a parseable `size <bytes>` entry, the size is added to the host total.
///
/// # Errors
///
/// Returns an error if the URL pattern fails to compile, or if the report cannot be
/// serialized, created, written or flushed. A failure while reading events is logged and
/// ends the scan early; the report for the events read so far is still written.
async fn media_report(dir: PathBuf) -> Result<(), anyhow::Error> {
    // Lower-case suffixes that classify a link as media.
    const MEDIA_EXTENSIONS: [&str; 10] = [
        ".webp", ".jpg", ".jpeg", ".bmp", ".png", ".gif", ".webm", ".mp4", ".mov", ".mkv",
    ];

    let mut report = MediaReport::default();
    let mut binding = NostrCursor::new(dir.clone());
    let mut cursor = Box::pin(binding.walk());

    // The `regex` crate has no `/pattern/flags` literal syntax: the surrounding slashes and
    // the trailing `iu` would be matched literally. Case-insensitivity is requested with the
    // inline `(?i)` flag instead; Unicode mode is already the default.
    let media_regex = Regex::new(
        r"(?i)(?:http|ftp|https|nostr|web\+nostr|magnet|lnurl[pw]?):/?/?[\w+?.]+(?:[\p{L}\p{N}~!@#$%^&*()_\-=+\\/?.:;',]*)?[-a-z0-9+&@#/%=~()_|]",
    )?;

    let mut notes = 0u64;
    while let Some(item) = cursor.next().await {
        let e = match item {
            Ok(e) => e,
            Err(err) => {
                // Surface the failure instead of silently treating it as end-of-data.
                eprintln!("Stopping early, failed to read event: {err}");
                break;
            }
        };
        if e.kind != 1 {
            continue;
        }
        notes += 1;

        // The `imeta` lookup only depends on the note, not on the individual link.
        // `first()` avoids a panic on an empty tag.
        // NOTE(review): the first `imeta` tag is applied to every link of the note, exactly
        // as before; matching the tag's `url` entry against the link may be more accurate.
        let imeta = e
            .tags
            .iter()
            .find(|t| t.first().map_or(false, |k| k == "imeta"));
        // Event data is untrusted: a missing or malformed size is skipped rather than
        // aborting the whole report.
        let imeta_size = imeta.and_then(|t| {
            t.iter()
                .find_map(|a| a.strip_prefix("size ")?.trim().parse::<u64>().ok())
        });

        // `find_iter` yields the matched links themselves (`split` would yield the text
        // between them).
        for m in media_regex.find_iter(e.content.as_str()) {
            let text = m.as_str();
            let u = match Url::parse(text) {
                Ok(u) => u,
                Err(_) => continue,
            };

            // Lower-case once per link rather than once per candidate extension.
            let lower = text.to_ascii_lowercase();
            let ext = match MEDIA_EXTENSIONS
                .iter()
                .copied()
                .find(|ext| lower.ends_with(*ext))
            {
                Some(ext) => ext,
                None => continue,
            };
            let host = match u.host_str() {
                Some(host) => host,
                None => continue,
            };

            inc_map(&mut report.hosts_count, host, 1);
            inc_map(&mut report.extensions, ext, 1);
            if imeta.is_some() {
                if let Some(size_n) = imeta_size {
                    inc_map(&mut report.hosts_size, host, size_n);
                }
                inc_map(&mut report.hosts_imeta, host, 1);
            } else {
                inc_map(&mut report.hosts_no_imeta, host, 1);
            }
        }
    }

    println!("Processed {notes} notes, writing report!");

    let mut fout = File::create(dir.join("media_report.json")).await?;
    fout.write_all(serde_json::to_vec(&report)?.as_slice())
        .await?;
    // Flush explicitly so buffered data reaches the file before the handle is dropped.
    fout.flush().await?;
    Ok(())
}
/// Adds `n` to the counter stored under `key`.
///
/// If `key` is not present yet, it is inserted with `n` as its initial value. The key is
/// only copied into an owned `String` on that first insertion.
fn inc_map(map: &mut HashMap<String, u64>, key: &str, n: u64) {
    match map.get_mut(key) {
        Some(total) => *total += n,
        None => {
            map.insert(key.to_string(), n);
        }
    }
}
async fn write_csv<K, V>(dst: &PathBuf, data: &HashMap<K, V>) -> Result<(), anyhow::Error> async fn write_csv<K, V>(dst: &PathBuf, data: &HashMap<K, V>) -> Result<(), anyhow::Error>
where where
K: ToString, K: ToString,
@ -59,8 +146,18 @@ where
{ {
let mut fout = File::create(dst).await?; let mut fout = File::create(dst).await?;
for (k, v) in data { for (k, v) in data {
fout.write_all(format!("\"{}\",\"{}\"\n", k.to_string(), v.to_string()).as_bytes()).await?; fout.write_all(format!("\"{}\",\"{}\"\n", k.to_string(), v.to_string()).as_bytes())
.await?;
} }
fout.flush().await?; fout.flush().await?;
Ok(()) Ok(())
} }
/// Aggregated media statistics collected by `media_report` and serialized to JSON.
///
/// Every map is keyed by a string (a URL host, or a file extension for `extensions`) and
/// holds a running `u64` total.
#[derive(Serialize, Default)]
struct MediaReport {
    /// Number of media links counted per host.
    pub hosts_count: HashMap<String, u64>,
    /// Sum, per host, of the `size` values read from `imeta` tags.
    pub hosts_size: HashMap<String, u64>,
    /// Number of media links per host found in notes that carry an `imeta` tag.
    pub hosts_imeta: HashMap<String, u64>,
    /// Number of media links per host found in notes without an `imeta` tag.
    pub hosts_no_imeta: HashMap<String, u64>,
    /// Number of media links counted per matched file extension (e.g. `.jpg`).
    pub extensions: HashMap<String, u64>,
}

View File

@ -29,8 +29,7 @@ impl NostrCursor {
} }
} }
pub fn walk(&mut self) -> impl Stream<Item=Result<NostrEvent, anyhow::Error>> + '_ pub fn walk(&mut self) -> impl Stream<Item = Result<NostrEvent, anyhow::Error>> + '_ {
{
try_stream! { try_stream! {
let mut dir_reader = tokio::fs::read_dir(&self.dir).await?; let mut dir_reader = tokio::fs::read_dir(&self.dir).await?;
while let Ok(Some(path)) = dir_reader.next_entry().await { while let Ok(Some(path)) = dir_reader.next_entry().await {
@ -39,42 +38,63 @@ impl NostrCursor {
} }
let path = path.path(); let path = path.path();
println!("Reading: {}", path.to_str().unwrap()); println!("Reading: {}", path.to_str().unwrap());
let file = self.open_file(path).await?; if let Ok(file) = self.open_file(path).await {
let mut file = BufReader::new(file);
let mut line = Vec::new();
let mut lines = 0u64;
let mut duplicates = 0u64;
let mut file = BufReader::new(file); loop {
let mut line = String::new(); match file.read_until(10, &mut line).await {
while let Ok(size) = file.read_line(&mut line).await { Ok(size) => {
if size == 0 { if size == 0 {
break; println!("EOF. lines={lines}, duplicates={duplicates}");
} break;
}
lines += 1;
if let Ok(event) = serde_json::from_str::<NostrEvent>(&line[..size]) { let line_json = &line[..size];
let ev_id = EventId(hex::decode(&event.id)?.as_slice().try_into()?); match serde_json::from_slice::<NostrEvent>(line_json) {
if self.ids.insert(ev_id) { Ok(event) => {
yield event let ev_id = EventId(hex::decode(&event.id)?.as_slice().try_into()?);
if self.ids.insert(ev_id) {
yield event
} else {
duplicates += 1;
}
},
Err(e) => {
println!("Invalid json on {} {e}", String::from_utf8_lossy(line_json))
}
}
line.clear();
}
Err(e) => {
println!("Error reading file: {}", e);
break;
}
} }
} }
line.clear(); } else {
println!("Could not open");
} }
} }
} }
} }
async fn open_file(&self, path: PathBuf) -> Result<Pin<Box<dyn AsyncRead>>, anyhow::Error> async fn open_file(&self, path: PathBuf) -> Result<Pin<Box<dyn AsyncRead>>, anyhow::Error> {
{
let f = BufReader::new(File::open(path.clone()).await?); let f = BufReader::new(File::open(path.clone()).await?);
match path.extension() { match path.extension() {
Some(ext) => { Some(ext) => match ext.to_str().unwrap() {
match ext.to_str().unwrap() { "json" => Ok(Box::pin(f)),
"json" => Ok(Box::pin(f)), "jsonl" => Ok(Box::pin(f)),
"jsonl" => Ok(Box::pin(f)), "gz" => Ok(Box::pin(GzipDecoder::new(f))),
"gz" => Ok(Box::pin(GzipDecoder::new(f))), "zst" => Ok(Box::pin(ZstdDecoder::new(f))),
"zst" => Ok(Box::pin(ZstdDecoder::new(f))), "bz2" => Ok(Box::pin(BzDecoder::new(f))),
"bz2" => Ok(Box::pin(BzDecoder::new(f))), _ => anyhow::bail!("Unknown extension"),
_ => anyhow::bail!("Unknown extension") },
} None => anyhow::bail!("Could not determine archive format"),
}
None => anyhow::bail!("Could not determine archive format")
} }
} }
} }

View File

@ -1,2 +1,2 @@
pub mod event;
pub mod cursor; pub mod cursor;
pub mod event;