diff --git a/src/bin/r96util.rs b/src/bin/r96util.rs index 7603dcb..36e2f07 100644 --- a/src/bin/r96util.rs +++ b/src/bin/r96util.rs @@ -1,11 +1,14 @@ -use anyhow::Error; +use anyhow::{Error, Result}; use clap::{Parser, Subcommand}; use config::Config; -use log::{info, warn}; -use route96::db::Database; -use route96::filesystem::FileStore; +use log::{debug, error, info, warn}; +use route96::db::{Database, FileUpload}; +use route96::filesystem::{FileStore, FileSystemResult}; use route96::settings::Settings; -use std::path::PathBuf; +use std::future::Future; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::time::SystemTime; #[derive(Parser, Debug)] #[command(version, about)] @@ -20,11 +23,24 @@ struct Args { #[derive(Debug, Subcommand)] enum Commands { /// Check file hash matches filename / path - Check { delete: Option }, + Check { + #[arg(long)] + delete: Option, + }, /// Import a directory into the filesystem - /// (does NOT import files into the database) - Import { from: PathBuf }, + /// (does NOT import files into the database, use database-import command for that) + Import { + #[arg(long)] + from: PathBuf, + }, + + /// Import files from filesystem into database + DatabaseImport { + /// Don't actually import data and just print which files WOULD be imported + #[arg(long, default_missing_value = "true", num_args = 0..=1)] + dry_run: Option, + }, } #[tokio::main] @@ -51,30 +67,124 @@ async fn main() -> Result<(), Error> { Commands::Check { delete } => { info!("Checking files in: {}", settings.storage_dir); let fs = FileStore::new(settings.clone()); - let dir = walkdir::WalkDir::new(fs.storage_dir()); - let dir = dir.into_iter().filter_map(Result::ok).filter(|f| f.file_type().is_file()); - for entry in dir { - let id = if let Ok(f) = hex::decode(entry.file_name().to_str().unwrap()) { - f - } else { - warn!("Skipping invalid filename: {}", entry.path().display()); - continue; - }; - - let hash = FileStore::hash_file(entry.path()).await?; - if hash != id { - if delete.unwrap_or(false) { - warn!("Deleting corrupt file: {}", entry.path().display()); - tokio::fs::remove_file(entry.path()).await?; + iter_files(&fs.storage_dir(), |entry| { + Box::pin(async move { + let id = if let Some(i) = id_from_path(&entry) { + i } else { - warn!("File is corrupted: {}", entry.path().display()); + warn!("Skipping invalid file: {}", &entry.display()); + return Ok(()); + }; + + let hash = FileStore::hash_file(&entry).await?; + if hash != id { + if delete.unwrap_or(false) { + warn!("Deleting corrupt file: {}", &entry.display()); + tokio::fs::remove_file(&entry).await?; + } else { + warn!("File is corrupted: {}", &entry.display()); + } } - } - } + Ok(()) + }) + }) + .await?; } Commands::Import { from } => { - info!("Importing from: {}", from.display()); + let fs = FileStore::new(settings.clone()); let db = Database::new(&settings.database).await?; + db.migrate().await?; + info!("Importing from: {}", fs.storage_dir().display()); + iter_files(&from, |entry| { + let fs = fs.clone(); + Box::pin(async move { + let mime = infer::get_from_path(&entry)? + .map(|m| m.mime_type()) + .unwrap_or("application/octet-stream"); + let file = tokio::fs::File::open(&entry).await?; + let dst = fs.put(file, mime, false).await?; + match dst { + FileSystemResult::AlreadyExists(_) => { + info!("Duplicate file: {}", &entry.display()) + } + FileSystemResult::NewFile(_) => info!("Imported: {}", &entry.display()), + } + Ok(()) + }) + }) + .await?; + } + Commands::DatabaseImport { dry_run } => { + let fs = FileStore::new(settings.clone()); + let db = Database::new(&settings.database).await?; + db.migrate().await?; + info!("Importing to DB from: {}", fs.storage_dir().display()); + iter_files(&fs.storage_dir(), |entry| { + let db = db.clone(); + Box::pin(async move { + let id = if let Some(i) = id_from_path(&entry) { + i + } else { + warn!("Skipping invalid file: {}", &entry.display()); + return Ok(()); + }; + let u = db.get_file(&id).await?; + if u.is_none() { + if !dry_run.unwrap_or(false) { + info!("Importing file: {}", &entry.display()); + let mime = infer::get_from_path(&entry)? + .map(|m| m.mime_type()) + .unwrap_or("application/octet-stream") + .to_string(); + let entry = FileUpload { + id, + name: None, + size: entry.metadata()?.len(), + mime_type: mime, + created: entry + .metadata()? + .created() + .unwrap_or(SystemTime::now()) + .into(), + width: None, + height: None, + blur_hash: None, + alt: None, + duration: None, + bitrate: None, + }; + db.add_file(&entry, None).await?; + } else { + info!("[DRY-RUN] Importing file: {}", &entry.display()); + } + } + Ok(()) + }) + }) + .await?; + } + } + Ok(()) +} + +fn id_from_path(path: &Path) -> Option> { + hex::decode(path.file_name()?.to_str()?).ok() +} + +async fn iter_files(p: &Path, mut op: F) -> Result<()> +where + F: FnMut(PathBuf) -> Pin>>>, +{ + info!("Scanning files: {}", p.display()); + let entries = walkdir::WalkDir::new(p); + for entry in entries + .into_iter() + .filter_map(Result::ok) + .filter(|e| e.file_type().is_file()) + { + debug!("Checking file: {}", entry.path().display()); + if let Err(e) = op(entry.path().to_path_buf()).await { + error!("Error processing file: {} {}", entry.path().display(), e); } } Ok(()) diff --git a/src/bin/void_cat_migrate.rs b/src/bin/void_cat_migrate.rs index 2c08fa1..a468e0b 100644 --- a/src/bin/void_cat_migrate.rs +++ b/src/bin/void_cat_migrate.rs @@ -103,7 +103,7 @@ async fn migrate_file( let src_path = PathBuf::new() .join(&args.data_path) .join(VoidFile::map_to_path(&f.id)); - let dst_path = fs.map_path(&id_vec); + let dst_path = fs.get(&id_vec); if src_path.exists() && !dst_path.exists() { info!( "Copying file: {} from {} => {}", @@ -142,6 +142,6 @@ async fn migrate_file( duration: None, bitrate: None, }; - db.add_file(&fu, uid).await?; + db.add_file(&fu, Some(uid)).await?; Ok(()) } diff --git a/src/db.rs b/src/db.rs index b678057..cf3afbb 100644 --- a/src/db.rs +++ b/src/db.rs @@ -148,7 +148,7 @@ impl Database { .try_get(0) } - pub async fn add_file(&self, file: &FileUpload, user_id: u64) -> Result<(), Error> { + pub async fn add_file(&self, file: &FileUpload, user_id: Option) -> Result<(), Error> { let mut tx = self.pool.begin().await?; let q = sqlx::query("insert ignore into \ uploads(id,name,size,mime_type,blur_hash,width,height,alt,created,duration,bitrate) values(?,?,?,?,?,?,?,?,?,?,?)") @@ -165,10 +165,13 @@ impl Database { .bind(file.bitrate); tx.execute(q).await?; - let q2 = sqlx::query("insert ignore into user_uploads(file,user_id) values(?,?)") - .bind(&file.id) - .bind(user_id); - tx.execute(q2).await?; + if let Some(user_id) = user_id { + let q2 = sqlx::query("insert ignore into user_uploads(file,user_id) values(?,?)") + .bind(&file.id) + .bind(user_id); + + tx.execute(q2).await?; + } #[cfg(feature = "labels")] for lbl in &file.labels { diff --git a/src/filesystem.rs b/src/filesystem.rs index f55a0d6..9e91876 100644 --- a/src/filesystem.rs +++ b/src/filesystem.rs @@ -61,7 +61,7 @@ impl FileStore { /// Store a new file pub async fn put<'r, S>( &self, - stream: S, + path: S, mime_type: &str, compress: bool, ) -> Result @@ -69,7 +69,7 @@ impl FileStore { S: AsyncRead + Unpin + 'r, { // store file in temp path and hash the file - let (temp_file, size, hash) = self.store_hash_temp_file(stream).await?; + let (temp_file, size, hash) = self.store_hash_temp_file(path).await?; let dst_path = self.map_path(&hash); // check if file hash already exists @@ -247,7 +247,7 @@ impl FileStore { Ok(res.to_vec()) } - pub fn map_path(&self, id: &Vec) -> PathBuf { + fn map_path(&self, id: &Vec) -> PathBuf { let id = hex::encode(id); self.storage_dir().join(&id[0..2]).join(&id[2..4]).join(id) } diff --git a/src/routes/blossom.rs b/src/routes/blossom.rs index 0072f39..4af6675 100644 --- a/src/routes/blossom.rs +++ b/src/routes/blossom.rs @@ -415,7 +415,7 @@ where return BlossomResponse::error(format!("Failed to save file (db): {}", e)); } }; - if let Err(e) = db.add_file(&upload, user_id).await { + if let Err(e) = db.add_file(&upload, Some(user_id)).await { error!("{}", e.to_string()); BlossomResponse::error(format!("Error saving file (db): {}", e)) } else { diff --git a/src/routes/nip96.rs b/src/routes/nip96.rs index 1993d1a..ea35ee1 100644 --- a/src/routes/nip96.rs +++ b/src/routes/nip96.rs @@ -232,7 +232,7 @@ async fn upload( Err(e) => return Nip96Response::error(&format!("Could not save user: {}", e)), }; - if let Err(e) = db.add_file(&upload, user_id).await { + if let Err(e) = db.add_file(&upload, Some(user_id)).await { error!("{}", e.to_string()); return Nip96Response::error(&format!("Could not save file (db): {}", e)); }