feat: r96util import
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
kieran 2025-02-12 12:11:58 +00:00
parent f5b206dad3
commit 4dad339c09
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
6 changed files with 152 additions and 39 deletions

View File

@ -1,11 +1,14 @@
use anyhow::Error;
use anyhow::{Error, Result};
use clap::{Parser, Subcommand};
use config::Config;
use log::{info, warn};
use route96::db::Database;
use route96::filesystem::FileStore;
use log::{debug, error, info, warn};
use route96::db::{Database, FileUpload};
use route96::filesystem::{FileStore, FileSystemResult};
use route96::settings::Settings;
use std::path::PathBuf;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::time::SystemTime;
#[derive(Parser, Debug)]
#[command(version, about)]
@ -20,11 +23,24 @@ struct Args {
#[derive(Debug, Subcommand)]
enum Commands {
/// Check file hash matches filename / path
Check { delete: Option<bool> },
Check {
#[arg(long)]
delete: Option<bool>,
},
/// Import a directory into the filesystem
/// (does NOT import files into the database)
Import { from: PathBuf },
/// (does NOT import files into the database, use database-import command for that)
Import {
#[arg(long)]
from: PathBuf,
},
/// Import files from filesystem into database
DatabaseImport {
/// Don't actually import data and just print which files WOULD be imported
#[arg(long, default_missing_value = "true", num_args = 0..=1)]
dry_run: Option<bool>,
},
}
#[tokio::main]
@ -51,30 +67,124 @@ async fn main() -> Result<(), Error> {
Commands::Check { delete } => {
info!("Checking files in: {}", settings.storage_dir);
let fs = FileStore::new(settings.clone());
let dir = walkdir::WalkDir::new(fs.storage_dir());
let dir = dir.into_iter().filter_map(Result::ok).filter(|f| f.file_type().is_file());
for entry in dir {
let id = if let Ok(f) = hex::decode(entry.file_name().to_str().unwrap()) {
f
} else {
warn!("Skipping invalid filename: {}", entry.path().display());
continue;
};
let hash = FileStore::hash_file(entry.path()).await?;
if hash != id {
if delete.unwrap_or(false) {
warn!("Deleting corrupt file: {}", entry.path().display());
tokio::fs::remove_file(entry.path()).await?;
iter_files(&fs.storage_dir(), |entry| {
Box::pin(async move {
let id = if let Some(i) = id_from_path(&entry) {
i
} else {
warn!("File is corrupted: {}", entry.path().display());
warn!("Skipping invalid file: {}", &entry.display());
return Ok(());
};
let hash = FileStore::hash_file(&entry).await?;
if hash != id {
if delete.unwrap_or(false) {
warn!("Deleting corrupt file: {}", &entry.display());
tokio::fs::remove_file(&entry).await?;
} else {
warn!("File is corrupted: {}", &entry.display());
}
}
}
}
Ok(())
})
})
.await?;
}
Commands::Import { from } => {
info!("Importing from: {}", from.display());
let fs = FileStore::new(settings.clone());
let db = Database::new(&settings.database).await?;
db.migrate().await?;
info!("Importing from: {}", fs.storage_dir().display());
iter_files(&from, |entry| {
let fs = fs.clone();
Box::pin(async move {
let mime = infer::get_from_path(&entry)?
.map(|m| m.mime_type())
.unwrap_or("application/octet-stream");
let file = tokio::fs::File::open(&entry).await?;
let dst = fs.put(file, mime, false).await?;
match dst {
FileSystemResult::AlreadyExists(_) => {
info!("Duplicate file: {}", &entry.display())
}
FileSystemResult::NewFile(_) => info!("Imported: {}", &entry.display()),
}
Ok(())
})
})
.await?;
}
Commands::DatabaseImport { dry_run } => {
let fs = FileStore::new(settings.clone());
let db = Database::new(&settings.database).await?;
db.migrate().await?;
info!("Importing to DB from: {}", fs.storage_dir().display());
iter_files(&fs.storage_dir(), |entry| {
let db = db.clone();
Box::pin(async move {
let id = if let Some(i) = id_from_path(&entry) {
i
} else {
warn!("Skipping invalid file: {}", &entry.display());
return Ok(());
};
let u = db.get_file(&id).await?;
if u.is_none() {
if !dry_run.unwrap_or(false) {
info!("Importing file: {}", &entry.display());
let mime = infer::get_from_path(&entry)?
.map(|m| m.mime_type())
.unwrap_or("application/octet-stream")
.to_string();
let entry = FileUpload {
id,
name: None,
size: entry.metadata()?.len(),
mime_type: mime,
created: entry
.metadata()?
.created()
.unwrap_or(SystemTime::now())
.into(),
width: None,
height: None,
blur_hash: None,
alt: None,
duration: None,
bitrate: None,
};
db.add_file(&entry, None).await?;
} else {
info!("[DRY-RUN] Importing file: {}", &entry.display());
}
}
Ok(())
})
})
.await?;
}
}
Ok(())
}
/// Extract a file id from a storage path: the file name is expected to be
/// the hex-encoded hash of the file's contents. Returns `None` when the
/// path has no file name, the name is not valid UTF-8, or it is not valid hex.
fn id_from_path(path: &Path) -> Option<Vec<u8>> {
    path.file_name()
        .and_then(|name| name.to_str())
        .and_then(|hex_name| hex::decode(hex_name).ok())
}
async fn iter_files<F>(p: &Path, mut op: F) -> Result<()>
where
F: FnMut(PathBuf) -> Pin<Box<dyn Future<Output = Result<()>>>>,
{
info!("Scanning files: {}", p.display());
let entries = walkdir::WalkDir::new(p);
for entry in entries
.into_iter()
.filter_map(Result::ok)
.filter(|e| e.file_type().is_file())
{
debug!("Checking file: {}", entry.path().display());
if let Err(e) = op(entry.path().to_path_buf()).await {
error!("Error processing file: {} {}", entry.path().display(), e);
}
}
Ok(())

View File

@ -103,7 +103,7 @@ async fn migrate_file(
let src_path = PathBuf::new()
.join(&args.data_path)
.join(VoidFile::map_to_path(&f.id));
let dst_path = fs.map_path(&id_vec);
let dst_path = fs.get(&id_vec);
if src_path.exists() && !dst_path.exists() {
info!(
"Copying file: {} from {} => {}",
@ -142,6 +142,6 @@ async fn migrate_file(
duration: None,
bitrate: None,
};
db.add_file(&fu, uid).await?;
db.add_file(&fu, Some(uid)).await?;
Ok(())
}

View File

@ -148,7 +148,7 @@ impl Database {
.try_get(0)
}
pub async fn add_file(&self, file: &FileUpload, user_id: u64) -> Result<(), Error> {
pub async fn add_file(&self, file: &FileUpload, user_id: Option<u64>) -> Result<(), Error> {
let mut tx = self.pool.begin().await?;
let q = sqlx::query("insert ignore into \
uploads(id,name,size,mime_type,blur_hash,width,height,alt,created,duration,bitrate) values(?,?,?,?,?,?,?,?,?,?,?)")
@ -165,10 +165,13 @@ impl Database {
.bind(file.bitrate);
tx.execute(q).await?;
let q2 = sqlx::query("insert ignore into user_uploads(file,user_id) values(?,?)")
.bind(&file.id)
.bind(user_id);
tx.execute(q2).await?;
if let Some(user_id) = user_id {
let q2 = sqlx::query("insert ignore into user_uploads(file,user_id) values(?,?)")
.bind(&file.id)
.bind(user_id);
tx.execute(q2).await?;
}
#[cfg(feature = "labels")]
for lbl in &file.labels {

View File

@ -61,7 +61,7 @@ impl FileStore {
/// Store a new file
pub async fn put<'r, S>(
&self,
stream: S,
path: S,
mime_type: &str,
compress: bool,
) -> Result<FileSystemResult>
@ -69,7 +69,7 @@ impl FileStore {
S: AsyncRead + Unpin + 'r,
{
// store file in temp path and hash the file
let (temp_file, size, hash) = self.store_hash_temp_file(stream).await?;
let (temp_file, size, hash) = self.store_hash_temp_file(path).await?;
let dst_path = self.map_path(&hash);
// check if file hash already exists
@ -247,7 +247,7 @@ impl FileStore {
Ok(res.to_vec())
}
pub fn map_path(&self, id: &Vec<u8>) -> PathBuf {
fn map_path(&self, id: &Vec<u8>) -> PathBuf {
let id = hex::encode(id);
self.storage_dir().join(&id[0..2]).join(&id[2..4]).join(id)
}

View File

@ -415,7 +415,7 @@ where
return BlossomResponse::error(format!("Failed to save file (db): {}", e));
}
};
if let Err(e) = db.add_file(&upload, user_id).await {
if let Err(e) = db.add_file(&upload, Some(user_id)).await {
error!("{}", e.to_string());
BlossomResponse::error(format!("Error saving file (db): {}", e))
} else {

View File

@ -232,7 +232,7 @@ async fn upload(
Err(e) => return Nip96Response::error(&format!("Could not save user: {}", e)),
};
if let Err(e) = db.add_file(&upload, user_id).await {
if let Err(e) = db.add_file(&upload, Some(user_id)).await {
error!("{}", e.to_string());
return Nip96Response::error(&format!("Could not save file (db): {}", e));
}