diff --git a/Cargo.lock b/Cargo.lock index ff81b83..0485f62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -691,6 +691,19 @@ dependencies = [ "yaml-rust2", ] +[[package]] +name = "console" +version = "0.15.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "054ccb5b10f9f2cbf51eb355ca1d05c2d279ce1804688d0db74b4733a5aeafd8" +dependencies = [ + "encode_unicode", + "libc", + "once_cell", + "unicode-width", + "windows-sys 0.59.0", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -958,6 +971,12 @@ dependencies = [ "serde", ] +[[package]] +name = "encode_unicode" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" + [[package]] name = "encoding_rs" version = "0.8.35" @@ -1975,6 +1994,19 @@ dependencies = [ "serde", ] +[[package]] +name = "indicatif" +version = "0.17.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235" +dependencies = [ + "console", + "number_prefix", + "portable-atomic", + "unicode-width", + "web-time", +] + [[package]] name = "infer" version = "0.19.0" @@ -2454,6 +2486,12 @@ dependencies = [ "syn", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "object" version = "0.36.7" @@ -2760,6 +2798,12 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + [[package]] name = "powerfmt" version = "0.2.0" @@ -3265,6 +3309,7 @@ dependencies = [ "ffmpeg-rs-raw", "hex", "http-range-header", + "indicatif", "infer", "libc", "log", @@ -3279,6 +3324,7 @@ dependencies = [ "tokio", "tokio-util", "uuid", + "walkdir", ] [[package]] @@ -4528,6 +4574,12 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +[[package]] +name = "unicode-width" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a1a07cc7db3810833284e8d372ccdc6da29741639ecc70c9ec107df0fa6154c" + [[package]] name = "unicode-xid" version = "0.2.6" @@ -4742,6 +4794,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "whoami" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index e77979f..da191b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ path = "src/bin/main.rs" name = "route96" [features] -default = ["nip96", "blossom", "analytics", "react-ui", "payments"] +default = ["nip96", "blossom", "analytics", "react-ui", "payments", "r96util"] media-compression = ["dep:ffmpeg-rs-raw", "dep:libc"] labels = ["media-compression", "dep:candle-core", "dep:candle-nn", "dep:candle-transformers"] nip96 = ["media-compression"] @@ -19,6 +19,7 @@ blossom = [] analytics = [] react-ui = [] payments = ["dep:fedimint-tonic-lnd"] +r96util = ["dep:walkdir", "dep:indicatif"] [dependencies] log = "0.4.21" @@ -48,4 +49,6 @@ ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "aa candle-core = { git = "https://git.v0l.io/huggingface/candle.git", tag = "0.8.1", optional = true } candle-nn = { git = "https://git.v0l.io/huggingface/candle.git", tag = "0.8.1", optional = true } candle-transformers = { git = "https://git.v0l.io/huggingface/candle.git", tag = "0.8.1", optional = true } -fedimint-tonic-lnd = { version = "0.2.0", optional = true, default-features = false, features = ["invoicesrpc", "lightningrpc"] } \ No newline at end of file +fedimint-tonic-lnd = { version = "0.2.0", optional = true, default-features = false, features = ["invoicesrpc", "lightningrpc"] } +walkdir = { version = "2.5.0", optional = true } +indicatif = { version = "0.17.11", optional = true } \ No newline at end of file diff --git a/src/bin/r96util.rs b/src/bin/r96util.rs new file mode 100644 index 0000000..5df0e34 --- /dev/null +++ b/src/bin/r96util.rs @@ -0,0 +1,229 @@ +use anyhow::{Context, Error, Result}; +use clap::{Parser, Subcommand}; +use config::Config; +use indicatif::{ProgressBar, ProgressStyle}; +use log::{error, info}; +use route96::db::{Database, FileUpload}; +use route96::filesystem::{FileStore, FileSystemResult}; +use route96::processing::probe_file; +use route96::settings::Settings; +use std::future::Future; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::sync::Arc; +use std::time::SystemTime; +use pretty_env_logger::env_logger; +use tokio::sync::Semaphore; + +#[derive(Parser, Debug)] +#[command(version, about)] +struct Args { + #[arg(long)] + pub config: Option, + + #[clap(subcommand)] + pub command: Commands, +} + +#[derive(Debug, Subcommand)] +enum Commands { + /// Check file hash matches filename / path + Check { + #[arg(long)] + delete: Option, + }, + + /// Import a directory into the filesystem + /// (does NOT import files into the database, use database-import command for that) + Import { + #[arg(long)] + from: PathBuf, + #[arg(long, default_missing_value = "true", num_args = 0..=1)] + probe_media: Option, + }, + + /// Import files from filesystem into database + DatabaseImport { + /// Don't actually import data and just print which files WOULD be imported + #[arg(long, default_missing_value = "true", num_args = 0..=1)] + dry_run: Option, + }, +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + if std::env::var("RUST_LOG").is_err() { + unsafe { std::env::set_var("RUST_LOG", "info"); } + } + env_logger::init(); + + let args: Args = Args::parse(); + + let builder = Config::builder() + .add_source(config::File::with_name(if let Some(ref c) = args.config { + c.as_str() + } else { + "config.yaml" + })) + .add_source(config::Environment::with_prefix("APP")) + .build()?; + + let settings: Settings = builder.try_deserialize()?; + + match args.command { + Commands::Check { delete } => { + info!("Checking files in: {}", settings.storage_dir); + let fs = FileStore::new(settings.clone()); + iter_files(&fs.storage_dir(), 4, |entry, p| { + let p = p.clone(); + Box::pin(async move { + let id = if let Some(i) = id_from_path(&entry) { + i + } else { + p.set_message(format!("Skipping invalid file: {}", &entry.display())); + return Ok(()); + }; + + let hash = FileStore::hash_file(&entry).await?; + if hash != id { + if delete.unwrap_or(false) { + p.set_message(format!("Deleting corrupt file: {}", &entry.display())); + tokio::fs::remove_file(&entry).await?; + } else { + p.set_message(format!("File is corrupted: {}", &entry.display())); + } + } + Ok(()) + }) + }) + .await?; + } + Commands::Import { from, probe_media } => { + let fs = FileStore::new(settings.clone()); + let db = Database::new(&settings.database).await?; + db.migrate().await?; + info!("Importing from: {}", fs.storage_dir().display()); + iter_files(&from, 4, |entry, p| { + let fs = fs.clone(); + let p = p.clone(); + Box::pin(async move { + let mime = infer::get_from_path(&entry)? + .map(|m| m.mime_type()) + .unwrap_or("application/octet-stream"); + + // test media is not corrupt + if probe_media.unwrap_or(true) + && (mime.starts_with("image/") || mime.starts_with("video/")) + && probe_file(&entry).is_err() + { + p.set_message(format!("Skipping media invalid file: {}", &entry.display())); + return Ok(()); + } + + let file = tokio::fs::File::open(&entry).await?; + let dst = fs.put(file, mime, false).await?; + match dst { + FileSystemResult::AlreadyExists(_) => { + p.set_message(format!("Duplicate file: {}", &entry.display())); + } + FileSystemResult::NewFile(_) => { + p.set_message(format!("Imported: {}", &entry.display())); + } + } + Ok(()) + }) + }) + .await?; + } + Commands::DatabaseImport { dry_run } => { + let fs = FileStore::new(settings.clone()); + let db = Database::new(&settings.database).await?; + db.migrate().await?; + info!("Importing to DB from: {}", fs.storage_dir().display()); + iter_files(&fs.storage_dir(), 4, |entry, p| { + let db = db.clone(); + let p = p.clone(); + Box::pin(async move { + let id = if let Some(i) = id_from_path(&entry) { + i + } else { + p.set_message(format!("Skipping invalid file: {}", &entry.display())); + return Ok(()); + }; + let u = db.get_file(&id).await.context("db get_file")?; + if u.is_none() { + if !dry_run.unwrap_or(false) { + p.set_message(format!("Importing file: {}", &entry.display())); + let mime = infer::get_from_path(&entry) + .context("infer")? + .map(|m| m.mime_type()) + .unwrap_or("application/octet-stream") + .to_string(); + let meta = entry.metadata().context("file metadata")?; + let entry = FileUpload { + id, + name: None, + size: meta.len(), + mime_type: mime, + created: meta.created().unwrap_or(SystemTime::now()).into(), + width: None, + height: None, + blur_hash: None, + alt: None, + duration: None, + bitrate: None, + }; + db.add_file(&entry, None).await.context("db add_file")?; + } else { + p.set_message(format!( + "[DRY-RUN] Importing file: {}", + &entry.display() + )); + } + } + Ok(()) + }) + }) + .await?; + } + } + Ok(()) +} + +fn id_from_path(path: &Path) -> Option> { + hex::decode(path.file_name()?.to_str()?).ok() +} + +async fn iter_files(p: &Path, threads: usize, mut op: F) -> Result<()> +where + F: FnMut(PathBuf, ProgressBar) -> Pin> + Send>>, +{ + let semaphore = Arc::new(Semaphore::new(threads)); + info!("Scanning files: {}", p.display()); + let entries = walkdir::WalkDir::new(p); + let dir = entries + .into_iter() + .filter_map(Result::ok) + .filter(|e| e.file_type().is_file()) + .collect::>(); + let p = ProgressBar::new(dir.len() as u64).with_style(ProgressStyle::with_template( + "{spinner} [{pos}/{len}] {msg}", + )?); + let mut all_tasks = vec![]; + for entry in dir { + let _lock = semaphore.clone().acquire_owned().await?; + p.inc(1); + let fut = op(entry.path().to_path_buf(), p.clone()); + all_tasks.push(tokio::spawn(async move { + if let Err(e) = fut.await { + error!("Error processing file: {} {}", entry.path().display(), e); + } + drop(_lock); + })); + } + for task in all_tasks { + task.await?; + } + p.finish_with_message("Done!"); + Ok(()) +} \ No newline at end of file diff --git a/src/db.rs b/src/db.rs index 2957f0f..5334cb1 100644 --- a/src/db.rs +++ b/src/db.rs @@ -193,7 +193,7 @@ impl Database { .try_get(0) } - pub async fn add_file(&self, file: &FileUpload, user_id: u64) -> Result<(), Error> { + pub async fn add_file(&self, file: &FileUpload, user_id: Option) -> Result<(), Error> { let mut tx = self.pool.begin().await?; let q = sqlx::query("insert ignore into \ uploads(id,name,size,mime_type,blur_hash,width,height,alt,created,duration,bitrate) values(?,?,?,?,?,?,?,?,?,?,?)") @@ -210,10 +210,12 @@ impl Database { .bind(file.bitrate); tx.execute(q).await?; - let q2 = sqlx::query("insert ignore into user_uploads(file,user_id) values(?,?)") - .bind(&file.id) - .bind(user_id); - tx.execute(q2).await?; + if let Some(uid) = user_id { + let q2 = sqlx::query("insert ignore into user_uploads(file,user_id) values(?,?)") + .bind(&file.id) + .bind(uid); + tx.execute(q2).await?; + } #[cfg(feature = "labels")] for lbl in &file.labels { diff --git a/src/routes/blossom.rs b/src/routes/blossom.rs index e9e9db2..cd33f35 100644 --- a/src/routes/blossom.rs +++ b/src/routes/blossom.rs @@ -465,7 +465,7 @@ where } } } - if let Err(e) = db.add_file(&upload, user_id).await { + if let Err(e) = db.add_file(&upload, Some(user_id)).await { error!("{}", e); BlossomResponse::error(format!("Error saving file (db): {}", e)) } else { diff --git a/src/routes/nip96.rs b/src/routes/nip96.rs index 7a22f4e..f1f5ba3 100644 --- a/src/routes/nip96.rs +++ b/src/routes/nip96.rs @@ -281,7 +281,7 @@ async fn upload( } } - if let Err(e) = db.add_file(&upload, user_id).await { + if let Err(e) = db.add_file(&upload, Some(user_id)).await { error!("{}", e); return Nip96Response::error(&format!("Could not save file (db): {}", e)); }