Compare commits

...

11 Commits

SHA1        Message                                                         Date                        CI (drone/push)
6998f0ffac  feat: r96util probe media before importing                      2025-02-12 14:52:06 +00:00  passing
317b0708e0  feat: r96util parallel                                          2025-02-12 14:24:13 +00:00  passing
069aa30d52  feat: r96util progress                                          2025-02-12 13:50:59 +00:00  passing
4dad339c09  feat: r96util import                                            2025-02-12 12:11:58 +00:00  passing
f5b206dad3  fix: walkdir                                                    2025-02-12 11:23:31 +00:00  passing
b6bd190252  feat: r96util                                                   2025-02-10 20:48:40 +00:00  passing
3b4bb866ab  fix: uploaded timestamp blossom                                 2025-02-07 14:36:01 +00:00
c885a71295  feat: return thumbnail url in meta                              2025-02-07 09:50:46 +00:00
e1fca9a604  feat: filter list by mime                                       2025-02-06 22:33:26 +00:00
16a14de5d6  fix: dont patch video metadata for image files (always empty)  2025-02-04 13:30:22 +00:00
314d0c68af  feat: accept void-cat uuid                                      2025-02-04 13:09:02 +00:00
13 changed files with 374 additions and 37 deletions

Cargo.lock (generated)

@ -737,6 +737,19 @@ dependencies = [
"yaml-rust2",
]
[[package]]
name = "console"
version = "0.15.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea3c6ecd8059b57859df5c69830340ed3c41d30e3da0c1cbed90a96ac853041b"
dependencies = [
"encode_unicode",
"libc",
"once_cell",
"unicode-width",
"windows-sys 0.59.0",
]
[[package]]
name = "const-oid"
version = "0.9.6"
@ -1040,6 +1053,12 @@ dependencies = [
"serde",
]
[[package]]
name = "encode_unicode"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0"
[[package]]
name = "encoding_rs"
version = "0.8.35"
@ -2017,6 +2036,19 @@ dependencies = [
"serde",
]
[[package]]
name = "indicatif"
version = "0.17.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"
dependencies = [
"console",
"number_prefix",
"portable-atomic",
"unicode-width",
"web-time",
]
[[package]]
name = "infer"
version = "0.16.0"
@ -2525,6 +2557,12 @@ dependencies = [
"syn",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "object"
version = "0.36.5"
@ -2801,6 +2839,12 @@ dependencies = [
"universal-hash",
]
[[package]]
name = "portable-atomic"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
[[package]]
name = "powerfmt"
version = "0.2.0"
@ -3195,6 +3239,7 @@ dependencies = [
"ffmpeg-rs-raw",
"hex",
"http-range-header",
"indicatif",
"infer",
"libc",
"log",
@ -3213,6 +3258,7 @@ dependencies = [
"tokio",
"tokio-util",
"uuid",
"walkdir",
]
[[package]]
@ -4353,6 +4399,12 @@ version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493"
[[package]]
name = "unicode-width"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
[[package]]
name = "unicode-xid"
version = "0.2.6"
@ -4560,6 +4612,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "web-time"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "whoami"
version = "1.5.2"

Cargo.toml

@ -15,11 +15,16 @@ required-features = ["bin-void-cat-force-migrate"]
name = "route96"
path = "src/bin/main.rs"
[[bin]]
name = "r96util"
path = "src/bin/r96util.rs"
required-features = ["r96util"]
[lib]
name = "route96"
[features]
default = ["nip96", "blossom", "analytics", "ranges", "react-ui"]
default = ["nip96", "blossom", "analytics", "ranges", "react-ui", "r96util"]
media-compression = ["dep:ffmpeg-rs-raw", "dep:libc"]
labels = ["nip96", "dep:candle-core", "dep:candle-nn", "dep:candle-transformers"]
nip96 = ["media-compression"]
@ -31,6 +36,7 @@ analytics = []
void-cat-redirects = ["dep:sqlx-postgres"]
ranges = ["dep:http-range-header"]
react-ui = []
r96util = ["dep:walkdir", "dep:indicatif"]
[dependencies]
log = "0.4.21"
@ -54,6 +60,7 @@ mime2ext = "0.1.53"
infer = "0.16.0"
tokio-util = { version = "0.7.13", features = ["io", "io-util"] }
libc = { version = "0.2.153", optional = true }
ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385", optional = true }
candle-core = { git = "https://git.v0l.io/huggingface/candle.git", tag = "0.8.1", optional = true }
@ -63,3 +70,5 @@ sqlx-postgres = { version = "0.8.2", optional = true, features = ["chrono", "uui
http-range-header = { version = "0.4.2", optional = true }
nostr-cursor = { git = "https://git.v0l.io/Kieran/nostr_backup_proc.git", branch = "main", optional = true }
regex = { version = "1.11.1", optional = true }
walkdir = { version = "2.5.0", optional = true }
indicatif = { version = "0.17.11", optional = true }


@ -68,7 +68,9 @@ impl MediaMetadata {
impl Database {
pub async fn get_missing_media_metadata(&mut self) -> Result<Vec<FileUpload>> {
let results: Vec<FileUpload> = sqlx::query_as("select * from uploads where (width is null or height is null or bitrate is null or duration is null) and (mime_type like 'image/%' or mime_type like 'video/%')")
let results: Vec<FileUpload> = sqlx::query_as("select * from uploads where \
(mime_type like 'image/%' and (width is null or height is null)) or \
(mime_type like 'video/%' and (width is null or height is null or bitrate is null or duration is null))")
.fetch_all(&self.pool)
.await?;


@ -73,7 +73,13 @@ async fn main() -> Result<(), Error> {
.attach(Shield::new()) // disable
.mount(
"/",
routes![root, get_blob, head_blob, routes::void_cat_redirect, routes::void_cat_redirect_head],
routes![
root,
get_blob,
head_blob,
routes::void_cat_redirect,
routes::void_cat_redirect_head
],
)
.mount("/admin", routes::admin_routes());

src/bin/r96util.rs (new file)

@ -0,0 +1,228 @@
use anyhow::{Context, Error, Result};
use clap::{Parser, Subcommand};
use config::Config;
use indicatif::{ProgressBar, ProgressStyle};
use log::{error, info};
use route96::db::{Database, FileUpload};
use route96::filesystem::{FileStore, FileSystemResult};
use route96::processing::probe_file;
use route96::settings::Settings;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::Semaphore;
#[derive(Parser, Debug)]
#[command(version, about)]
struct Args {
#[arg(long)]
pub config: Option<String>,
#[clap(subcommand)]
pub command: Commands,
}
#[derive(Debug, Subcommand)]
enum Commands {
/// Check file hash matches filename / path
Check {
#[arg(long)]
delete: Option<bool>,
},
/// Import a directory into the filesystem
/// (does NOT import files into the database, use database-import command for that)
Import {
#[arg(long)]
from: PathBuf,
#[arg(long, default_missing_value = "true", num_args = 0..=1)]
probe_media: Option<bool>,
},
/// Import files from filesystem into database
DatabaseImport {
/// Don't actually import data and just print which files WOULD be imported
#[arg(long, default_missing_value = "true", num_args = 0..=1)]
dry_run: Option<bool>,
},
}
#[tokio::main]
async fn main() -> Result<(), Error> {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
pretty_env_logger::init();
let args: Args = Args::parse();
let builder = Config::builder()
.add_source(config::File::with_name(if let Some(ref c) = args.config {
c.as_str()
} else {
"config.yaml"
}))
.add_source(config::Environment::with_prefix("APP"))
.build()?;
let settings: Settings = builder.try_deserialize()?;
match args.command {
Commands::Check { delete } => {
info!("Checking files in: {}", settings.storage_dir);
let fs = FileStore::new(settings.clone());
iter_files(&fs.storage_dir(), 4, |entry, p| {
let p = p.clone();
Box::pin(async move {
let id = if let Some(i) = id_from_path(&entry) {
i
} else {
p.set_message(format!("Skipping invalid file: {}", &entry.display()));
return Ok(());
};
let hash = FileStore::hash_file(&entry).await?;
if hash != id {
if delete.unwrap_or(false) {
p.set_message(format!("Deleting corrupt file: {}", &entry.display()));
tokio::fs::remove_file(&entry).await?;
} else {
p.set_message(format!("File is corrupted: {}", &entry.display()));
}
}
Ok(())
})
})
.await?;
}
Commands::Import { from, probe_media } => {
let fs = FileStore::new(settings.clone());
let db = Database::new(&settings.database).await?;
db.migrate().await?;
info!("Importing from: {}", fs.storage_dir().display());
iter_files(&from, 4, |entry, p| {
let fs = fs.clone();
let p = p.clone();
Box::pin(async move {
let mime = infer::get_from_path(&entry)?
.map(|m| m.mime_type())
.unwrap_or("application/octet-stream");
// test media is not corrupt
if probe_media.unwrap_or(true)
&& (mime.starts_with("image/") || mime.starts_with("video/"))
&& probe_file(&entry).is_err()
{
p.set_message(format!("Skipping media invalid file: {}", &entry.display()));
return Ok(());
}
let file = tokio::fs::File::open(&entry).await?;
let dst = fs.put(file, mime, false).await?;
match dst {
FileSystemResult::AlreadyExists(_) => {
p.set_message(format!("Duplicate file: {}", &entry.display()));
}
FileSystemResult::NewFile(_) => {
p.set_message(format!("Imported: {}", &entry.display()));
}
}
Ok(())
})
})
.await?;
}
Commands::DatabaseImport { dry_run } => {
let fs = FileStore::new(settings.clone());
let db = Database::new(&settings.database).await?;
db.migrate().await?;
info!("Importing to DB from: {}", fs.storage_dir().display());
iter_files(&fs.storage_dir(), 4, |entry, p| {
let db = db.clone();
let p = p.clone();
Box::pin(async move {
let id = if let Some(i) = id_from_path(&entry) {
i
} else {
p.set_message(format!("Skipping invalid file: {}", &entry.display()));
return Ok(());
};
let u = db.get_file(&id).await.context("db get_file")?;
if u.is_none() {
if !dry_run.unwrap_or(false) {
p.set_message(format!("Importing file: {}", &entry.display()));
let mime = infer::get_from_path(&entry)
.context("infer")?
.map(|m| m.mime_type())
.unwrap_or("application/octet-stream")
.to_string();
let meta = entry.metadata().context("file metadata")?;
let entry = FileUpload {
id,
name: None,
size: meta.len(),
mime_type: mime,
created: meta.created().unwrap_or(SystemTime::now()).into(),
width: None,
height: None,
blur_hash: None,
alt: None,
duration: None,
bitrate: None,
};
db.add_file(&entry, None).await.context("db add_file")?;
} else {
p.set_message(format!(
"[DRY-RUN] Importing file: {}",
&entry.display()
));
}
}
Ok(())
})
})
.await?;
}
}
Ok(())
}
fn id_from_path(path: &Path) -> Option<Vec<u8>> {
hex::decode(path.file_name()?.to_str()?).ok()
}
async fn iter_files<F>(p: &Path, threads: usize, mut op: F) -> Result<()>
where
F: FnMut(PathBuf, ProgressBar) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>,
{
let semaphore = Arc::new(Semaphore::new(threads));
info!("Scanning files: {}", p.display());
let entries = walkdir::WalkDir::new(p);
let dir = entries
.into_iter()
.filter_map(Result::ok)
.filter(|e| e.file_type().is_file())
.collect::<Vec<_>>();
let p = ProgressBar::new(dir.len() as u64).with_style(ProgressStyle::with_template(
"{spinner} [{pos}/{len}] {msg}",
)?);
let mut all_tasks = vec![];
for entry in dir {
let _lock = semaphore.clone().acquire_owned().await?;
p.inc(1);
let fut = op(entry.path().to_path_buf(), p.clone());
all_tasks.push(tokio::spawn(async move {
if let Err(e) = fut.await {
error!("Error processing file: {} {}", entry.path().display(), e);
}
drop(_lock);
}));
}
for task in all_tasks {
task.await?;
}
p.finish_with_message("Done!");
Ok(())
}
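
For review context, a minimal sketch of how this new binary is driven, assuming clap's default kebab-case naming for the DatabaseImport variant; the paths shown are illustrative and not taken from project docs:

fn parse_example() {
    // Relies on the Args/Commands derives above; clap maps DatabaseImport to "database-import".
    //   r96util check --delete true
    //   r96util import --from /mnt/old-media --probe-media
    //   r96util database-import --dry-run
    let args = Args::parse_from(["r96util", "import", "--from", "/mnt/old-media", "--probe-media"]);
    assert!(matches!(
        args.command,
        Commands::Import { probe_media: Some(true), .. }
    ));
}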


@ -103,7 +103,7 @@ async fn migrate_file(
let src_path = PathBuf::new()
.join(&args.data_path)
.join(VoidFile::map_to_path(&f.id));
let dst_path = fs.map_path(&id_vec);
let dst_path = fs.get(&id_vec);
if src_path.exists() && !dst_path.exists() {
info!(
"Copying file: {} from {} => {}",
@ -142,6 +142,6 @@ async fn migrate_file(
duration: None,
bitrate: None,
};
db.add_file(&fu, uid).await?;
db.add_file(&fu, Some(uid)).await?;
Ok(())
}


@ -148,7 +148,7 @@ impl Database {
.try_get(0)
}
pub async fn add_file(&self, file: &FileUpload, user_id: u64) -> Result<(), Error> {
pub async fn add_file(&self, file: &FileUpload, user_id: Option<u64>) -> Result<(), Error> {
let mut tx = self.pool.begin().await?;
let q = sqlx::query("insert ignore into \
uploads(id,name,size,mime_type,blur_hash,width,height,alt,created,duration,bitrate) values(?,?,?,?,?,?,?,?,?,?,?)")
@ -165,10 +165,13 @@ impl Database {
.bind(file.bitrate);
tx.execute(q).await?;
let q2 = sqlx::query("insert ignore into user_uploads(file,user_id) values(?,?)")
.bind(&file.id)
.bind(user_id);
tx.execute(q2).await?;
if let Some(user_id) = user_id {
let q2 = sqlx::query("insert ignore into user_uploads(file,user_id) values(?,?)")
.bind(&file.id)
.bind(user_id);
tx.execute(q2).await?;
}
#[cfg(feature = "labels")]
for lbl in &file.labels {
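
With user_id now optional, the two call shapes used in this change set are sketched below; the user_uploads link row is only written when an owner is known:

// Authenticated upload (blossom / nip96 routes): link the file to its owner.
db.add_file(&upload, Some(user_id)).await?;
// Filesystem import with no known owner (r96util database-import): upload row only.
db.add_file(&entry, None).await?;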


@ -14,7 +14,7 @@ use ffmpeg_rs_raw::DemuxerInfo;
use rocket::form::validate::Contains;
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt};
use uuid::Uuid;
@ -61,7 +61,7 @@ impl FileStore {
/// Store a new file
pub async fn put<'r, S>(
&self,
stream: S,
path: S,
mime_type: &str,
compress: bool,
) -> Result<FileSystemResult>
@ -69,7 +69,7 @@ impl FileStore {
S: AsyncRead + Unpin + 'r,
{
// store file in temp path and hash the file
let (temp_file, size, hash) = self.store_hash_temp_file(stream).await?;
let (temp_file, size, hash) = self.store_hash_temp_file(path).await?;
let dst_path = self.map_path(&hash);
// check if file hash already exists
@ -232,7 +232,7 @@ impl FileStore {
Ok((out_path, n, hash))
}
async fn hash_file(p: &PathBuf) -> Result<Vec<u8>, Error> {
pub async fn hash_file(p: &Path) -> Result<Vec<u8>, Error> {
let mut file = File::open(p).await?;
let mut hasher = Sha256::new();
let mut buf = [0; 4096];
@ -247,7 +247,7 @@ impl FileStore {
Ok(res.to_vec())
}
pub fn map_path(&self, id: &Vec<u8>) -> PathBuf {
fn map_path(&self, id: &Vec<u8>) -> PathBuf {
let id = hex::encode(id);
self.storage_dir().join(&id[0..2]).join(&id[2..4]).join(id)
}
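
Making hash_file public (while map_path becomes private) lets code outside the store re-verify a blob against its hex-encoded file name, which is what the new check subcommand depends on; a minimal sketch under those assumptions:

async fn verify_blob(path: &std::path::Path) -> anyhow::Result<bool> {
    // Stored blobs are named by their hex-encoded SHA-256; a mismatch means corruption.
    let name = path.file_name().and_then(|n| n.to_str()).unwrap_or_default();
    let expected = hex::decode(name)?;
    let actual = route96::filesystem::FileStore::hash_file(path).await?;
    Ok(expected == actual)
}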


@ -25,7 +25,7 @@ pub struct BlobDescriptor {
pub size: u64,
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub mime_type: Option<String>,
pub created: u64,
pub uploaded: u64,
#[serde(rename = "nip94", skip_serializing_if = "Option::is_none")]
pub nip94: Option<HashMap<String, String>>,
}
@ -45,7 +45,7 @@ impl BlobDescriptor {
sha256: id_hex,
size: value.size,
mime_type: Some(value.mime_type.clone()),
created: value.created.timestamp() as u64,
uploaded: value.created.timestamp() as u64,
nip94: Some(
Nip94Event::from_upload(settings, value)
.tags
@ -415,7 +415,7 @@ where
return BlossomResponse::error(format!("Failed to save file (db): {}", e));
}
};
if let Err(e) = db.add_file(&upload, user_id).await {
if let Err(e) = db.add_file(&upload, Some(user_id)).await {
error!("{}", e.to_string());
BlossomResponse::error(format!("Error saving file (db): {}", e))
} else {
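
The created to uploaded rename matches the field name Blossom blob descriptors use on the wire; an illustrative serialized descriptor, assuming serde_json is available and using placeholder values, limited to the fields visible in this hunk:

fn example_descriptor() -> serde_json::Value {
    // Placeholder values; only the fields shown in this diff are included.
    serde_json::json!({
        "sha256": "<hex-sha256>",
        "size": 1048576,
        "type": "image/webp",
        "uploaded": 1738938961
    })
}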


@ -59,22 +59,27 @@ struct PagedResult<T> {
impl Nip94Event {
pub fn from_upload(settings: &Settings, upload: &FileUpload) -> Self {
let hex_id = hex::encode(&upload.id);
let ext = if upload.mime_type != "application/octet-stream" {
mime2ext::mime2ext(&upload.mime_type)
} else {
None
};
let mut tags = vec![
vec![
"url".to_string(),
format!(
"{}/{}{}",
&settings.public_url,
&hex_id,
mime2ext::mime2ext(&upload.mime_type)
.map(|m| format!(".{m}"))
.unwrap_or("".to_string())
),
format!("{}/{}.{}", &settings.public_url, &hex_id, ext.unwrap_or("")),
],
vec!["x".to_string(), hex_id],
vec!["x".to_string(), hex_id.clone()],
vec!["m".to_string(), upload.mime_type.clone()],
vec!["size".to_string(), upload.size.to_string()],
];
if upload.mime_type.starts_with("image/") || upload.mime_type.starts_with("video/") {
tags.push(vec![
"thumb".to_string(),
format!("{}/thumb/{}.webp", &settings.public_url, &hex_id),
]);
}
if let Some(bh) = &upload.blur_hash {
tags.push(vec!["blurhash".to_string(), bh.clone()]);
}
@ -402,6 +407,10 @@ pub async fn get_blob_thumb(
return Err(Status::NotFound);
};
if !(info.mime_type.starts_with("image/") || info.mime_type.starts_with("video/")) {
return Err(Status::NotFound);
}
let file_path = fs.get(&id);
let mut thumb_file = temp_dir().join(format!("thumb_{}", sha256));
@ -437,10 +446,15 @@ pub async fn void_cat_redirect(id: &str, settings: &State<Settings>) -> Option<N
id
};
if let Some(base) = &settings.void_cat_files {
let uuid =
uuid::Uuid::from_slice_le(nostr::bitcoin::base58::decode(id).unwrap().as_slice())
.unwrap();
let f = base.join(VoidFile::map_to_path(&uuid));
let uuid = if let Ok(b58) = nostr::bitcoin::base58::decode(id) {
uuid::Uuid::from_slice_le(b58.as_slice())
} else {
uuid::Uuid::parse_str(id)
};
if uuid.is_err() {
return None;
}
let f = base.join(VoidFile::map_to_path(&uuid.unwrap()));
debug!("Legacy file map: {} => {}", id, f.display());
if let Ok(f) = NamedFile::open(f).await {
Some(f)
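
With the extension handling reworked and the new thumb tag, from_upload for an image upload yields tags roughly as sketched below (placeholder host and hash; thumb is only added for image/* and video/* mime types):

fn example_tags() -> Vec<Vec<String>> {
    // Placeholder host and hash; mirrors the tag layout built in from_upload above.
    vec![
        vec!["url".into(), "https://files.example.com/<hex>.webp".into()],
        vec!["x".into(), "<hex>".into()],
        vec!["m".into(), "image/webp".into()],
        vec!["size".into(), "1048576".into()],
        vec!["thumb".into(), "https://files.example.com/thumb/<hex>.webp".into()],
    ]
}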


@ -232,7 +232,7 @@ async fn upload(
Err(e) => return Nip96Response::error(&format!("Could not save user: {}", e)),
};
if let Err(e) = db.add_file(&upload, user_id).await {
if let Err(e) = db.add_file(&upload, Some(user_id)).await {
error!("{}", e.to_string());
return Nip96Response::error(&format!("Could not save file (db): {}", e));
}


@ -22,9 +22,9 @@ export class Route96 {
return data;
}
async listFiles(page = 0, count = 10) {
async listFiles(page = 0, count = 10, mime: string | undefined) {
const rsp = await this.#req(
`admin/files?page=${page}&count=${count}`,
`admin/files?page=${page}&count=${count}${mime ? `&mime_type=${mime}` : ""}`,
"GET",
);
const data = await this.#handleResponse<AdminResponseFileList>(rsp);


@ -23,6 +23,7 @@ export default function Upload() {
const [adminListedFiles, setAdminListedFiles] = useState<Nip96FileList>();
const [listedPage, setListedPage] = useState(0);
const [adminListedPage, setAdminListedPage] = useState(0);
const [mimeFilter, setMimeFilter] = useState<string>();
const login = useLogin();
const pub = usePublisher();
@ -85,7 +86,7 @@ export default function Upload() {
try {
setError(undefined);
const uploader = new Route96(url, pub);
const result = await uploader.listFiles(n, 50);
const result = await uploader.listFiles(n, 50, mimeFilter);
setAdminListedFiles(result);
} catch (e) {
if (e instanceof Error) {
@ -135,7 +136,7 @@ export default function Upload() {
useEffect(() => {
listAllUploads(adminListedPage);
}, [adminListedPage]);
}, [adminListedPage, mimeFilter]);
useEffect(() => {
if (pub && !self) {
@ -249,6 +250,18 @@ export default function Upload() {
<hr />
<h3>Admin File List:</h3>
<Button onClick={() => listAllUploads(0)}>List All Uploads</Button>
<div>
<select value={mimeFilter} onChange={e => setMimeFilter(e.target.value)}>
<option value={""}>All</option>
<option>image/webp</option>
<option>image/jpeg</option>
<option>image/jpg</option>
<option>image/png</option>
<option>image/gif</option>
<option>video/mp4</option>
<option>video/mov</option>
</select>
</div>
{adminListedFiles && (
<FileList
files={adminListedFiles.files}