diff --git a/Cargo.lock b/Cargo.lock index d66e637..03e2fd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1070,7 +1070,7 @@ dependencies = [ [[package]] name = "ffmpeg-rs-raw" version = "0.1.0" -source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385#a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385" +source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=928ab9664ff47c1b0bd8313ebc73d13b1ab43fc5#928ab9664ff47c1b0bd8313ebc73d13b1ab43fc5" dependencies = [ "anyhow", "ffmpeg-sys-the-third", diff --git a/Cargo.toml b/Cargo.toml index 3a0a7c0..1644ddf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ http-range-header = { version = "0.4.2" } base58 = "0.2.0" libc = { version = "0.2.153", optional = true } -ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385", optional = true } +ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "928ab9664ff47c1b0bd8313ebc73d13b1ab43fc5", optional = true } candle-core = { git = "https://git.v0l.io/huggingface/candle.git", tag = "0.8.1", optional = true } candle-nn = { git = "https://git.v0l.io/huggingface/candle.git", tag = "0.8.1", optional = true } candle-transformers = { git = "https://git.v0l.io/huggingface/candle.git", tag = "0.8.1", optional = true } diff --git a/config.yaml b/config.yaml index 14104ef..2d9ac3d 100644 --- a/config.yaml +++ b/config.yaml @@ -38,8 +38,8 @@ payments: # LND node config lnd: endpoint: "https://127.0.0.1:10001" - tls: "/home/kieran/.polar/networks/3/volumes/lnd/alice/tls.cert" - macaroon: "/home/kieran/.polar/networks/3/volumes/lnd/alice/data/chain/bitcoin/regtest/admin.macaroon" + tls: "/home/kieran/.polar/networks/1/volumes/lnd/alice/tls.cert" + macaroon: "/home/kieran/.polar/networks/1/volumes/lnd/alice/data/chain/bitcoin/regtest/admin.macaroon" # Cost per unit (BTC/USD/EUR/AUD/CAD/JPY/GBP) cost: currency: "BTC" diff --git a/src/background/media_metadata.rs b/src/background/media_metadata.rs index f946224..4f71ec4 100644 --- a/src/background/media_metadata.rs +++ b/src/background/media_metadata.rs @@ -3,6 +3,7 @@ use crate::filesystem::FileStore; use crate::processing::probe_file; use anyhow::Result; use log::{error, info, warn}; +use tokio::sync::broadcast::Receiver; pub struct MediaMetadata { db: Database, @@ -14,12 +15,15 @@ impl MediaMetadata { Self { db, fs } } - pub async fn process(&mut self) -> Result<()> { + pub async fn process(&mut self, mut shutdown: Receiver<()>) -> Result<()> { let to_migrate = self.db.get_missing_media_metadata().await?; info!("{} files are missing metadata", to_migrate.len()); for file in to_migrate { + if shutdown.try_recv().is_ok() { + break; + } // probe file and update metadata let path = self.fs.get(&file.id); match probe_file(&path) { diff --git a/src/bin/main.rs b/src/bin/main.rs index d047766..7bdba07 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -79,9 +79,7 @@ async fn main() -> Result<(), Error> { routes![ root, get_blob, - head_blob, - routes::void_cat_redirect, - routes::void_cat_redirect_head + head_blob ], ) .mount("/admin", routes::admin_routes()); diff --git a/src/bin/r96util.rs b/src/bin/r96util.rs deleted file mode 100644 index f936299..0000000 --- a/src/bin/r96util.rs +++ /dev/null @@ -1,228 +0,0 @@ -use anyhow::{Context, Error, Result}; -use clap::{Parser, Subcommand}; -use config::Config; -use indicatif::{ProgressBar, ProgressStyle}; -use log::{error, info}; -use route96::db::{Database, FileUpload}; -use route96::filesystem::{FileStore, FileSystemResult}; -use route96::processing::probe_file; -use route96::settings::Settings; -use std::future::Future; -use std::path::{Path, PathBuf}; -use std::pin::Pin; -use std::sync::Arc; -use std::time::SystemTime; -use tokio::sync::Semaphore; - -#[derive(Parser, Debug)] -#[command(version, about)] -struct Args { - #[arg(long)] - pub config: Option, - - #[clap(subcommand)] - pub command: Commands, -} - -#[derive(Debug, Subcommand)] -enum Commands { - /// Check file hash matches filename / path - Check { - #[arg(long)] - delete: Option, - }, - - /// Import a directory into the filesystem - /// (does NOT import files into the database, use database-import command for that) - Import { - #[arg(long)] - from: PathBuf, - #[arg(long, default_missing_value = "true", num_args = 0..=1)] - probe_media: Option, - }, - - /// Import files from filesystem into database - DatabaseImport { - /// Don't actually import data and just print which files WOULD be imported - #[arg(long, default_missing_value = "true", num_args = 0..=1)] - dry_run: Option, - }, -} - -#[tokio::main] -async fn main() -> Result<(), Error> { - if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "info"); - } - env_logger::init(); - - let args: Args = Args::parse(); - - let builder = Config::builder() - .add_source(config::File::with_name(if let Some(ref c) = args.config { - c.as_str() - } else { - "config.yaml" - })) - .add_source(config::Environment::with_prefix("APP")) - .build()?; - - let settings: Settings = builder.try_deserialize()?; - - match args.command { - Commands::Check { delete } => { - info!("Checking files in: {}", settings.storage_dir); - let fs = FileStore::new(settings.clone()); - iter_files(&fs.storage_dir(), 4, |entry, p| { - let p = p.clone(); - Box::pin(async move { - let id = if let Some(i) = id_from_path(&entry) { - i - } else { - p.set_message(format!("Skipping invalid file: {}", &entry.display())); - return Ok(()); - }; - - let hash = FileStore::hash_file(&entry).await?; - if hash != id { - if delete.unwrap_or(false) { - p.set_message(format!("Deleting corrupt file: {}", &entry.display())); - tokio::fs::remove_file(&entry).await?; - } else { - p.set_message(format!("File is corrupted: {}", &entry.display())); - } - } - Ok(()) - }) - }) - .await?; - } - Commands::Import { from, probe_media } => { - let fs = FileStore::new(settings.clone()); - let db = Database::new(&settings.database).await?; - db.migrate().await?; - info!("Importing from: {}", fs.storage_dir().display()); - iter_files(&from, 4, |entry, p| { - let fs = fs.clone(); - let p = p.clone(); - Box::pin(async move { - let mime = infer::get_from_path(&entry)? - .map(|m| m.mime_type()) - .unwrap_or("application/octet-stream"); - - // test media is not corrupt - if probe_media.unwrap_or(true) - && (mime.starts_with("image/") || mime.starts_with("video/")) - && probe_file(&entry).is_err() - { - p.set_message(format!("Skipping media invalid file: {}", &entry.display())); - return Ok(()); - } - - let file = tokio::fs::File::open(&entry).await?; - let dst = fs.put(file, mime, false).await?; - match dst { - FileSystemResult::AlreadyExists(_) => { - p.set_message(format!("Duplicate file: {}", &entry.display())); - } - FileSystemResult::NewFile(_) => { - p.set_message(format!("Imported: {}", &entry.display())); - } - } - Ok(()) - }) - }) - .await?; - } - Commands::DatabaseImport { dry_run } => { - let fs = FileStore::new(settings.clone()); - let db = Database::new(&settings.database).await?; - db.migrate().await?; - info!("Importing to DB from: {}", fs.storage_dir().display()); - iter_files(&fs.storage_dir(), 4, |entry, p| { - let db = db.clone(); - let p = p.clone(); - Box::pin(async move { - let id = if let Some(i) = id_from_path(&entry) { - i - } else { - p.set_message(format!("Skipping invalid file: {}", &entry.display())); - return Ok(()); - }; - let u = db.get_file(&id).await.context("db get_file")?; - if u.is_none() { - if !dry_run.unwrap_or(false) { - p.set_message(format!("Importing file: {}", &entry.display())); - let mime = infer::get_from_path(&entry) - .context("infer")? - .map(|m| m.mime_type()) - .unwrap_or("application/octet-stream") - .to_string(); - let meta = entry.metadata().context("file metadata")?; - let entry = FileUpload { - id, - name: None, - size: meta.len(), - mime_type: mime, - created: meta.created().unwrap_or(SystemTime::now()).into(), - width: None, - height: None, - blur_hash: None, - alt: None, - duration: None, - bitrate: None, - }; - db.add_file(&entry, None).await.context("db add_file")?; - } else { - p.set_message(format!( - "[DRY-RUN] Importing file: {}", - &entry.display() - )); - } - } - Ok(()) - }) - }) - .await?; - } - } - Ok(()) -} - -fn id_from_path(path: &Path) -> Option> { - hex::decode(path.file_name()?.to_str()?).ok() -} - -async fn iter_files(p: &Path, threads: usize, mut op: F) -> Result<()> -where - F: FnMut(PathBuf, ProgressBar) -> Pin> + Send>>, -{ - let semaphore = Arc::new(Semaphore::new(threads)); - info!("Scanning files: {}", p.display()); - let entries = walkdir::WalkDir::new(p); - let dir = entries - .into_iter() - .filter_map(Result::ok) - .filter(|e| e.file_type().is_file()) - .collect::>(); - let p = ProgressBar::new(dir.len() as u64).with_style(ProgressStyle::with_template( - "{spinner} [{pos}/{len}] {msg}", - )?); - let mut all_tasks = vec![]; - for entry in dir { - let _lock = semaphore.clone().acquire_owned().await?; - p.inc(1); - let fut = op(entry.path().to_path_buf(), p.clone()); - all_tasks.push(tokio::spawn(async move { - if let Err(e) = fut.await { - error!("Error processing file: {} {}", entry.path().display(), e); - } - drop(_lock); - })); - } - for task in all_tasks { - task.await?; - } - p.finish_with_message("Done!"); - Ok(()) -} diff --git a/src/bin/void_cat_forced_migrate.rs b/src/bin/void_cat_forced_migrate.rs deleted file mode 100644 index 86ab37a..0000000 --- a/src/bin/void_cat_forced_migrate.rs +++ /dev/null @@ -1,70 +0,0 @@ -use clap::Parser; -use log::{info, warn}; -use nostr::serde_json; -use nostr_cursor::cursor::NostrCursor; -use regex::Regex; -use rocket::futures::StreamExt; -use std::collections::{HashMap, HashSet}; -use std::path::PathBuf; -use tokio::fs::File; -use tokio::io::AsyncWriteExt; -use uuid::Uuid; - -#[derive(Parser, Debug)] -#[command(version, about)] -struct ProgramArgs { - /// Directory pointing to archives to scan - #[arg(short, long)] - pub archive: PathBuf, - - /// Output path .csv - #[arg(short, long)] - pub output: PathBuf, -} - -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - env_logger::init(); - - let args: ProgramArgs = ProgramArgs::parse(); - - let mut report: HashMap> = HashMap::new(); - - let mut binding = NostrCursor::new(args.archive); - let mut cursor = Box::pin(binding.walk()); - let matcher = Regex::new(r"void\.cat/d/(\w+)")?; - while let Some(Ok(e)) = cursor.next().await { - if e.content.contains("void.cat") { - let links = matcher.captures_iter(&e.content).collect::>(); - for link in links { - let g = link.get(1).unwrap().as_str(); - let base58 = if let Ok(b) = nostr::bitcoin::base58::decode(g) { - b - } else { - warn!("Invalid base58 id {}", g); - continue; - }; - let _uuid = if let Ok(u) = Uuid::from_slice_le(base58.as_slice()) { - u - } else { - warn!("Invalid uuid {}", g); - continue; - }; - info!("Got link: {} => {}", g, e.pubkey); - if let Some(ur) = report.get_mut(&e.pubkey) { - ur.insert(g.to_string()); - } else { - report.insert(e.pubkey.clone(), HashSet::from([g.to_string()])); - } - } - } - } - - let json = serde_json::to_string(&report)?; - File::create(args.output) - .await? - .write_all(json.as_bytes()) - .await?; - - Ok(()) -} diff --git a/src/bin/void_cat_migrate.rs b/src/bin/void_cat_migrate.rs deleted file mode 100644 index a8146aa..0000000 --- a/src/bin/void_cat_migrate.rs +++ /dev/null @@ -1,147 +0,0 @@ -use anyhow::Error; -use clap::Parser; -use config::Config; -use log::{info, warn}; -use nostr::bitcoin::base58; -use route96::db::{Database, FileUpload}; -use route96::filesystem::FileStore; -use route96::settings::Settings; -use route96::void_db::VoidCatDb; -use route96::void_file::VoidFile; -use std::path::PathBuf; -use tokio::io::{AsyncWriteExt, BufWriter}; - -#[derive(Debug, Clone, clap::ValueEnum)] -enum ArgOperation { - Migrate, - ExportNginxRedirects, -} - -#[derive(Parser, Debug)] -#[command(version, about)] -struct Args { - /// Database connection string for void.cat DB - #[arg(long)] - pub database: String, - - /// Path to filestore on void.cat - #[arg(long)] - pub data_path: String, - - #[arg(long)] - pub operation: ArgOperation, -} - -#[tokio::main] -async fn main() -> Result<(), Error> { - env_logger::init(); - - let builder = Config::builder() - .add_source(config::File::with_name("config.yaml")) - .add_source(config::Environment::with_prefix("APP")) - .build()?; - - let settings: Settings = builder.try_deserialize()?; - - let db = Database::new(&settings.database).await?; - let fs = FileStore::new(settings.clone()); - - let args: Args = Args::parse(); - - let db_void = VoidCatDb::connect(&args.database).await?; - - match args.operation { - ArgOperation::Migrate => { - let mut page = 0; - loop { - let files = db_void.list_files(page).await?; - if files.is_empty() { - break; - } - for f in files { - if let Err(e) = migrate_file(&f, &db, &fs, &args).await { - warn!("Failed to migrate file: {}, {}", &f.id, e); - } - } - page += 1; - } - } - ArgOperation::ExportNginxRedirects => { - let path: PathBuf = args.data_path.parse()?; - let conf_path = &path.join("nginx.conf"); - info!("Writing redirects to {}", conf_path.to_str().unwrap()); - let mut fout = BufWriter::new(tokio::fs::File::create(conf_path).await?); - let mut page = 0; - loop { - let files = db_void.list_files(page).await?; - if files.is_empty() { - break; - } - for f in files { - let legacy_id = base58::encode(f.id.to_bytes_le().as_slice()); - let redirect = format!("location ^\\/d\\/{}(?:\\.\\w+)?$ {{\n\treturn 301 https://nostr.download/{};\n}}\n", &legacy_id, &f.digest); - fout.write_all(redirect.as_bytes()).await?; - } - page += 1; - } - } - } - - Ok(()) -} - -async fn migrate_file( - f: &VoidFile, - db: &Database, - fs: &FileStore, - args: &Args, -) -> Result<(), Error> { - let pubkey_vec = hex::decode(&f.email)?; - let id_vec = hex::decode(&f.digest)?; - - // copy file - let src_path = PathBuf::new() - .join(&args.data_path) - .join(VoidFile::map_to_path(&f.id)); - let dst_path = fs.get(&id_vec); - if src_path.exists() && !dst_path.exists() { - info!( - "Copying file: {} from {} => {}", - &f.id, - src_path.to_str().unwrap(), - dst_path.to_str().unwrap() - ); - - tokio::fs::create_dir_all(dst_path.parent().unwrap()).await?; - tokio::fs::copy(src_path, dst_path).await?; - } else if dst_path.exists() { - info!("File already exists {}, continuing...", &f.id); - } else { - anyhow::bail!("Source file not found {}", src_path.to_str().unwrap()); - } - let uid = db.upsert_user(&pubkey_vec).await?; - info!("Mapped user {} => {}", &f.email, uid); - - let md: Option> = f.media_dimensions.as_ref().map(|s| s.split("x").collect()); - let fu = FileUpload { - id: id_vec, - name: f.name.clone(), - size: f.size as u64, - mime_type: f.mime_type.clone(), - created: f.uploaded, - width: match &md { - Some(s) => Some(s[0].parse::()?), - None => None, - }, - height: match &md { - Some(s) => Some(s[1].parse::()?), - None => None, - }, - blur_hash: None, - alt: f.description.clone(), - duration: None, - bitrate: None, - }; - db.add_file(&fu, Some(uid)).await?; - Ok(()) -} diff --git a/src/routes/blossom.rs b/src/routes/blossom.rs index 301966a..f59b4ea 100644 --- a/src/routes/blossom.rs +++ b/src/routes/blossom.rs @@ -5,7 +5,7 @@ use crate::routes::{delete_file, Nip94Event}; use crate::settings::Settings; use log::error; use nostr::prelude::hex; -use nostr::{Alphabet, SingleLetterTag, TagKind, JsonUtil}; +use nostr::{Alphabet, JsonUtil, SingleLetterTag, TagKind}; use rocket::data::ByteUnit; use rocket::futures::StreamExt; use rocket::http::{Header, Status}; @@ -46,6 +46,7 @@ impl BlobDescriptor { mime_type: Some(value.mime_type.clone()), created: value.created.timestamp() as u64, nip94: Some(Nip94Event::from_upload(settings, value).tags), + } } } @@ -70,7 +71,14 @@ pub fn blossom_routes() -> Vec { #[cfg(not(feature = "media-compression"))] pub fn blossom_routes() -> Vec { - routes![delete_blob, upload, list_files, upload_head, mirror, report_file] + routes![ + delete_blob, + upload, + list_files, + upload_head, + mirror, + report_file + ] } /// Generic holder response, mostly for errors @@ -358,12 +366,17 @@ async fn process_upload( // check quota #[cfg(feature = "payments")] if let Some(upload_size) = size { - let free_quota = settings.payments.as_ref() + let free_quota = settings + .payments + .as_ref() .and_then(|p| p.free_quota_bytes) .unwrap_or(104857600); // Default to 100MB let pubkey_vec = auth.event.pubkey.to_bytes().to_vec(); - - match db.check_user_quota(&pubkey_vec, upload_size, free_quota).await { + + match db + .check_user_quota(&pubkey_vec, upload_size, free_quota) + .await + { Ok(false) => return BlossomResponse::error("Upload would exceed quota"), Err(_) => return BlossomResponse::error("Failed to check quota"), Ok(true) => {} // Quota check passed @@ -466,7 +479,7 @@ async fn report_file( // Verify the reported file exists match db.get_file(&file_sha256).await { - Ok(Some(_)) => {}, // File exists, continue + Ok(Some(_)) => {} // File exists, continue Ok(None) => return BlossomResponse::error("File not found"), Err(e) => return BlossomResponse::error(format!("Failed to check file: {}", e)), } @@ -478,7 +491,10 @@ async fn report_file( }; // Store the report (the database will handle duplicate prevention via unique index) - match db.add_report(&file_sha256, reporter_id, &data.as_json()).await { + match db + .add_report(&file_sha256, reporter_id, &data.as_json()) + .await + { Ok(()) => BlossomResponse::Generic(BlossomGenericResponse { status: Status::Ok, message: Some("Report submitted successfully".to_string()), diff --git a/src/routes/mod.rs b/src/routes/mod.rs index f0cf377..e474ec7 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -8,12 +8,11 @@ pub use crate::routes::blossom::blossom_routes; #[cfg(feature = "nip96")] pub use crate::routes::nip96::nip96_routes; use crate::settings::Settings; -use crate::void_file::VoidFile; use anyhow::{Error, Result}; use http_range_header::{ parse_range_header, EndPosition, StartPosition, SyntacticallyCorrectRange, }; -use log::{debug, warn}; +use log::warn; use nostr::Event; use rocket::fs::NamedFile; use rocket::http::{ContentType, Header, Status}; @@ -29,12 +28,13 @@ use std::task::{Context, Poll}; use tokio::fs::File; use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; +mod admin; #[cfg(feature = "blossom")] mod blossom; #[cfg(feature = "nip96")] mod nip96; - -mod admin; +#[cfg(feature = "payments")] +pub mod payment; pub struct FilePayload { pub file: File, @@ -442,56 +442,6 @@ pub async fn get_blob_thumb( } } -/// Legacy URL redirect for void.cat uploads -#[rocket::get("/d/")] -pub async fn void_cat_redirect(id: &str, settings: &State) -> Option { - let id = if id.contains(".") { - id.split('.').next().unwrap() - } else { - id - }; - if let Some(base) = &settings.void_cat_files { - let uuid = if let Ok(b58) = nostr::bitcoin::base58::decode(id) { - uuid::Uuid::from_slice_le(b58.as_slice()) - } else { - uuid::Uuid::parse_str(id) - }; - if uuid.is_err() { - return None; - } - let f = base.join(VoidFile::map_to_path(&uuid.unwrap())); - debug!("Legacy file map: {} => {}", id, f.display()); - if let Ok(f) = NamedFile::open(f).await { - Some(f) - } else { - None - } - } else { - None - } -} - -#[rocket::head("/d/")] -pub async fn void_cat_redirect_head(id: &str) -> VoidCatFile { - let id = if id.contains(".") { - id.split('.').next().unwrap() - } else { - id - }; - let uuid = - uuid::Uuid::from_slice_le(nostr::bitcoin::base58::decode(id).unwrap().as_slice()).unwrap(); - VoidCatFile { - status: Status::Ok, - uuid: Header::new("X-UUID", uuid.to_string()), - } -} - -#[derive(Responder)] -pub struct VoidCatFile { - pub status: Status, - pub uuid: Header<'static>, -} - #[cfg(test)] mod tests { use super::*;