feat: r96util parallel
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
kieran 2025-02-12 14:24:13 +00:00
parent 069aa30d52
commit 317b0708e0
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
2 changed files with 31 additions and 24 deletions

View File

@ -24,7 +24,7 @@ required-features = ["r96util"]
name = "route96" name = "route96"
[features] [features]
default = ["nip96", "blossom", "analytics", "ranges", "react-ui"] default = ["nip96", "blossom", "analytics", "ranges", "react-ui", "r96util"]
media-compression = ["dep:ffmpeg-rs-raw", "dep:libc"] media-compression = ["dep:ffmpeg-rs-raw", "dep:libc"]
labels = ["nip96", "dep:candle-core", "dep:candle-nn", "dep:candle-transformers"] labels = ["nip96", "dep:candle-core", "dep:candle-nn", "dep:candle-transformers"]
nip96 = ["media-compression"] nip96 = ["media-compression"]

View File

@ -1,15 +1,17 @@
use anyhow::{Error, Result}; use anyhow::{Context, Error, Result};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use config::Config; use config::Config;
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{ProgressBar, ProgressStyle};
use log::info; use log::{error, info};
use route96::db::{Database, FileUpload}; use route96::db::{Database, FileUpload};
use route96::filesystem::{FileStore, FileSystemResult}; use route96::filesystem::{FileStore, FileSystemResult};
use route96::settings::Settings; use route96::settings::Settings;
use std::future::Future; use std::future::Future;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::sync::Semaphore;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(version, about)] #[command(version, about)]
@ -68,7 +70,7 @@ async fn main() -> Result<(), Error> {
Commands::Check { delete } => { Commands::Check { delete } => {
info!("Checking files in: {}", settings.storage_dir); info!("Checking files in: {}", settings.storage_dir);
let fs = FileStore::new(settings.clone()); let fs = FileStore::new(settings.clone());
iter_files(&fs.storage_dir(), |entry, p| { iter_files(&fs.storage_dir(), 4, |entry, p| {
let p = p.clone(); let p = p.clone();
Box::pin(async move { Box::pin(async move {
let id = if let Some(i) = id_from_path(&entry) { let id = if let Some(i) = id_from_path(&entry) {
@ -97,7 +99,7 @@ async fn main() -> Result<(), Error> {
let db = Database::new(&settings.database).await?; let db = Database::new(&settings.database).await?;
db.migrate().await?; db.migrate().await?;
info!("Importing from: {}", fs.storage_dir().display()); info!("Importing from: {}", fs.storage_dir().display());
iter_files(&from, |entry, p| { iter_files(&from, 4, |entry, p| {
let fs = fs.clone(); let fs = fs.clone();
let p = p.clone(); let p = p.clone();
Box::pin(async move { Box::pin(async move {
@ -124,7 +126,7 @@ async fn main() -> Result<(), Error> {
let db = Database::new(&settings.database).await?; let db = Database::new(&settings.database).await?;
db.migrate().await?; db.migrate().await?;
info!("Importing to DB from: {}", fs.storage_dir().display()); info!("Importing to DB from: {}", fs.storage_dir().display());
iter_files(&fs.storage_dir(), |entry, p| { iter_files(&fs.storage_dir(), 4, |entry, p| {
let db = db.clone(); let db = db.clone();
let p = p.clone(); let p = p.clone();
Box::pin(async move { Box::pin(async move {
@ -134,24 +136,22 @@ async fn main() -> Result<(), Error> {
p.set_message(format!("Skipping invalid file: {}", &entry.display())); p.set_message(format!("Skipping invalid file: {}", &entry.display()));
return Ok(()); return Ok(());
}; };
let u = db.get_file(&id).await?; let u = db.get_file(&id).await.context("db get_file")?;
if u.is_none() { if u.is_none() {
if !dry_run.unwrap_or(false) { if !dry_run.unwrap_or(false) {
p.set_message(format!("Importing file: {}", &entry.display())); p.set_message(format!("Importing file: {}", &entry.display()));
let mime = infer::get_from_path(&entry)? let mime = infer::get_from_path(&entry)
.context("infer")?
.map(|m| m.mime_type()) .map(|m| m.mime_type())
.unwrap_or("application/octet-stream") .unwrap_or("application/octet-stream")
.to_string(); .to_string();
let meta = entry.metadata().context("file metadata")?;
let entry = FileUpload { let entry = FileUpload {
id, id,
name: None, name: None,
size: entry.metadata()?.len(), size: meta.len(),
mime_type: mime, mime_type: mime,
created: entry created: meta.created().unwrap_or(SystemTime::now()).into(),
.metadata()?
.created()
.unwrap_or(SystemTime::now())
.into(),
width: None, width: None,
height: None, height: None,
blur_hash: None, blur_hash: None,
@ -159,7 +159,7 @@ async fn main() -> Result<(), Error> {
duration: None, duration: None,
bitrate: None, bitrate: None,
}; };
db.add_file(&entry, None).await?; db.add_file(&entry, None).await.context("db add_file")?;
} else { } else {
p.set_message(format!( p.set_message(format!(
"[DRY-RUN] Importing file: {}", "[DRY-RUN] Importing file: {}",
@ -180,10 +180,11 @@ fn id_from_path(path: &Path) -> Option<Vec<u8>> {
hex::decode(path.file_name()?.to_str()?).ok() hex::decode(path.file_name()?.to_str()?).ok()
} }
async fn iter_files<F>(p: &Path, mut op: F) -> Result<()> async fn iter_files<F>(p: &Path, threads: usize, mut op: F) -> Result<()>
where where
F: FnMut(PathBuf, &ProgressBar) -> Pin<Box<dyn Future<Output = Result<()>>>>, F: FnMut(PathBuf, ProgressBar) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>,
{ {
let semaphore = Arc::new(Semaphore::new(threads));
info!("Scanning files: {}", p.display()); info!("Scanning files: {}", p.display());
let entries = walkdir::WalkDir::new(p); let entries = walkdir::WalkDir::new(p);
let dir = entries let dir = entries
@ -194,15 +195,21 @@ where
let p = ProgressBar::new(dir.len() as u64).with_style(ProgressStyle::with_template( let p = ProgressBar::new(dir.len() as u64).with_style(ProgressStyle::with_template(
"{spinner} [{pos}/{len}] {msg}", "{spinner} [{pos}/{len}] {msg}",
)?); )?);
let mut all_tasks = vec![];
for entry in dir { for entry in dir {
let _lock = semaphore.clone().acquire_owned().await?;
p.inc(1); p.inc(1);
if let Err(e) = op(entry.path().to_path_buf(), &p).await { let fut = op(entry.path().to_path_buf(), p.clone());
p.set_message(format!( all_tasks.push(tokio::spawn(async move {
"Error processing file: {} {}", if let Err(e) = fut.await {
entry.path().display(), error!("Error processing file: {} {}", entry.path().display(), e);
e }
)); drop(_lock);
} }));
} }
for task in all_tasks {
task.await?;
}
p.finish_with_message("Done!");
Ok(()) Ok(())
} }