diff --git a/Cargo.toml b/Cargo.toml index 1bf252f..8cf3af8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ required-features = ["r96util"] name = "route96" [features] -default = ["nip96", "blossom", "analytics", "ranges", "react-ui"] +default = ["nip96", "blossom", "analytics", "ranges", "react-ui", "r96util"] media-compression = ["dep:ffmpeg-rs-raw", "dep:libc"] labels = ["nip96", "dep:candle-core", "dep:candle-nn", "dep:candle-transformers"] nip96 = ["media-compression"] diff --git a/src/bin/r96util.rs b/src/bin/r96util.rs index ba82790..a8ceb9b 100644 --- a/src/bin/r96util.rs +++ b/src/bin/r96util.rs @@ -1,15 +1,17 @@ -use anyhow::{Error, Result}; +use anyhow::{Context, Error, Result}; use clap::{Parser, Subcommand}; use config::Config; use indicatif::{ProgressBar, ProgressStyle}; -use log::info; +use log::{error, info}; use route96::db::{Database, FileUpload}; use route96::filesystem::{FileStore, FileSystemResult}; use route96::settings::Settings; use std::future::Future; use std::path::{Path, PathBuf}; use std::pin::Pin; +use std::sync::Arc; use std::time::SystemTime; +use tokio::sync::Semaphore; #[derive(Parser, Debug)] #[command(version, about)] @@ -68,7 +70,7 @@ async fn main() -> Result<(), Error> { Commands::Check { delete } => { info!("Checking files in: {}", settings.storage_dir); let fs = FileStore::new(settings.clone()); - iter_files(&fs.storage_dir(), |entry, p| { + iter_files(&fs.storage_dir(), 4, |entry, p| { let p = p.clone(); Box::pin(async move { let id = if let Some(i) = id_from_path(&entry) { @@ -97,7 +99,7 @@ async fn main() -> Result<(), Error> { let db = Database::new(&settings.database).await?; db.migrate().await?; info!("Importing from: {}", fs.storage_dir().display()); - iter_files(&from, |entry, p| { + iter_files(&from, 4, |entry, p| { let fs = fs.clone(); let p = p.clone(); Box::pin(async move { @@ -124,7 +126,7 @@ async fn main() -> Result<(), Error> { let db = Database::new(&settings.database).await?; db.migrate().await?; info!("Importing to DB from: {}", fs.storage_dir().display()); - iter_files(&fs.storage_dir(), |entry, p| { + iter_files(&fs.storage_dir(), 4, |entry, p| { let db = db.clone(); let p = p.clone(); Box::pin(async move { @@ -134,24 +136,22 @@ async fn main() -> Result<(), Error> { p.set_message(format!("Skipping invalid file: {}", &entry.display())); return Ok(()); }; - let u = db.get_file(&id).await?; + let u = db.get_file(&id).await.context("db get_file")?; if u.is_none() { if !dry_run.unwrap_or(false) { p.set_message(format!("Importing file: {}", &entry.display())); - let mime = infer::get_from_path(&entry)? + let mime = infer::get_from_path(&entry) + .context("infer")? .map(|m| m.mime_type()) .unwrap_or("application/octet-stream") .to_string(); + let meta = entry.metadata().context("file metadata")?; let entry = FileUpload { id, name: None, - size: entry.metadata()?.len(), + size: meta.len(), mime_type: mime, - created: entry - .metadata()? - .created() - .unwrap_or(SystemTime::now()) - .into(), + created: meta.created().unwrap_or(SystemTime::now()).into(), width: None, height: None, blur_hash: None, @@ -159,7 +159,7 @@ async fn main() -> Result<(), Error> { duration: None, bitrate: None, }; - db.add_file(&entry, None).await?; + db.add_file(&entry, None).await.context("db add_file")?; } else { p.set_message(format!( "[DRY-RUN] Importing file: {}", @@ -180,10 +180,11 @@ fn id_from_path(path: &Path) -> Option> { hex::decode(path.file_name()?.to_str()?).ok() } -async fn iter_files(p: &Path, mut op: F) -> Result<()> +async fn iter_files(p: &Path, threads: usize, mut op: F) -> Result<()> where - F: FnMut(PathBuf, &ProgressBar) -> Pin>>>, + F: FnMut(PathBuf, ProgressBar) -> Pin> + Send>>, { + let semaphore = Arc::new(Semaphore::new(threads)); info!("Scanning files: {}", p.display()); let entries = walkdir::WalkDir::new(p); let dir = entries @@ -194,15 +195,21 @@ where let p = ProgressBar::new(dir.len() as u64).with_style(ProgressStyle::with_template( "{spinner} [{pos}/{len}] {msg}", )?); + let mut all_tasks = vec![]; for entry in dir { + let _lock = semaphore.clone().acquire_owned().await?; p.inc(1); - if let Err(e) = op(entry.path().to_path_buf(), &p).await { - p.set_message(format!( - "Error processing file: {} {}", - entry.path().display(), - e - )); - } + let fut = op(entry.path().to_path_buf(), p.clone()); + all_tasks.push(tokio::spawn(async move { + if let Err(e) = fut.await { + error!("Error processing file: {} {}", entry.path().display(), e); + } + drop(_lock); + })); } + for task in all_tasks { + task.await?; + } + p.finish_with_message("Done!"); Ok(()) }