From 6ed9088aaadcd2e2c621ae565010ff08b2b4c12a Mon Sep 17 00:00:00 2001 From: kieran Date: Tue, 30 Apr 2024 16:07:15 +0100 Subject: [PATCH] Upload/list progress --- .gitignore | 3 +- Cargo.lock | 74 ++++++++++ Cargo.toml | 3 +- migrations/20240428220811_init.sql | 1 + src/auth.rs | 18 ++- src/blob.rs | 14 ++ src/cors.rs | 15 +- src/db.rs | 44 ++++-- src/filesystem.rs | 40 ++--- src/routes.rs | 229 ++++++++++++++++++++++------- 10 files changed, 349 insertions(+), 92 deletions(-) diff --git a/.gitignore b/.gitignore index ea8c4bf..b143b9c 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -/target +target/ +data/ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 2de1025..55a14e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,6 +66,21 @@ version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anyhow" version = "1.0.82" @@ -334,6 +349,21 @@ dependencies = [ "zeroize", ] +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "serde", + "wasm-bindgen", + "windows-targets 0.52.5", +] + [[package]] name = "cipher" version = "0.4.4" @@ -411,6 +441,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "cpufeatures" version = "0.2.12" @@ -1056,6 +1092,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.5.0" @@ -2333,6 +2392,7 @@ dependencies = [ "atoi", "byteorder", "bytes", + "chrono", "crc", "crossbeam-queue", "either", @@ -2393,6 +2453,7 @@ dependencies = [ "sha2", "sqlx-core", "sqlx-mysql", + "sqlx-postgres", "sqlx-sqlite", "syn 1.0.109", "tempfile", @@ -2411,6 +2472,7 @@ dependencies = [ "bitflags 2.5.0", "byteorder", "bytes", + "chrono", "crc", "digest", "dotenvy", @@ -2452,6 +2514,7 @@ dependencies = [ "base64 0.21.7", "bitflags 2.5.0", "byteorder", + "chrono", "crc", "dotenvy", "etcetera", @@ -2487,6 +2550,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b244ef0a8414da0bed4bb1910426e890b19e5e9bccc27ada6b797d05c55ae0aa" dependencies = [ "atoi", + "chrono", "flume", "futures-channel", "futures-core", @@ -3016,6 +3080,7 @@ version = "0.1.0" dependencies = [ "anyhow", "base64 0.21.7", + "chrono", "config", "hex", "log", @@ -3185,6 +3250,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.5", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index a910c3e..ac1afa7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,5 +17,6 @@ serde = { version = "1.0.198", features = ["derive"] } uuid = { version = "1.8.0", features = ["v4"] } anyhow = "1.0.82" sha2 = "0.10.8" -sqlx = { version = "0.7.4", features = ["mysql", "runtime-tokio"] } +sqlx = { version = "0.7.4", features = ["mysql", "runtime-tokio", "chrono"] } config = { version = "0.14.0", features = ["toml"] } +chrono = { version = "0.4.38", features = ["serde"] } diff --git a/migrations/20240428220811_init.sql b/migrations/20240428220811_init.sql index 2f2f7ba..35dd423 100644 --- a/migrations/20240428220811_init.sql +++ b/migrations/20240428220811_init.sql @@ -11,6 +11,7 @@ create table uploads user_id integer unsigned not null, name varchar(256) not null, size integer unsigned not null, + mime_type varchar(128) not null, created timestamp default current_timestamp, constraint fk_uploads_user diff --git a/src/auth.rs b/src/auth.rs index 72e5e63..8a89ee0 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -1,12 +1,12 @@ +use base64::prelude::*; +use log::info; +use nostr::{Event, JsonUtil, Kind, Tag, Timestamp}; +use rocket::{async_trait, Request}; use rocket::http::Status; use rocket::request::{FromRequest, Outcome}; -use rocket::{async_trait, Request}; - -use base64::prelude::*; -use nostr::{Event, JsonUtil, Kind, Tag, TagKind, Timestamp}; pub struct BlossomAuth { - pub pubkey: String, + pub content_type: Option, pub event: Event, } @@ -52,9 +52,15 @@ impl<'r> FromRequest<'r> for BlossomAuth { if let Err(_) = event.verify() { return Outcome::Error((Status::new(401), "Event signature invalid")); } + + info!("{}", event.as_json()); Outcome::Success(BlossomAuth { - pubkey: event.pubkey.to_string(), event, + content_type: request.headers().iter().find_map(|h| if h.name == "content-type" { + Some(h.value.to_string()) + } else { + None + }), }) } else { Outcome::Error((Status::new(403), "Auth scheme must be Nostr")) diff --git a/src/blob.rs b/src/blob.rs index 27cfe1c..22e55ba 100644 --- a/src/blob.rs +++ b/src/blob.rs @@ -1,5 +1,7 @@ use serde::{Deserialize, Serialize}; +use crate::db::FileUpload; + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(crate = "rocket::serde")] pub struct BlobDescriptor { @@ -9,4 +11,16 @@ pub struct BlobDescriptor { #[serde(rename = "type", skip_serializing_if = "Option::is_none")] pub mime_type: Option, pub created: u64, +} + +impl From<&FileUpload> for BlobDescriptor { + fn from(value: &FileUpload) -> Self { + Self { + url: "".to_string(), + sha256: hex::encode(&value.id), + size: value.size, + mime_type: Some(value.mime_type.clone()), + created: value.created.timestamp() as u64, + } + } } \ No newline at end of file diff --git a/src/cors.rs b/src/cors.rs index 409b4fb..b54fec1 100644 --- a/src/cors.rs +++ b/src/cors.rs @@ -1,6 +1,7 @@ +use std::io::Cursor; use rocket::fairing::{Fairing, Info, Kind}; -use rocket::http::Header; -use rocket::{Request, Response}; +use rocket::http::{Header, Method, Status}; +use rocket::{Data, Request, Response}; pub struct CORS; @@ -13,13 +14,19 @@ impl Fairing for CORS { } } - async fn on_response<'r>(&self, _req: &'r Request<'_>, response: &mut Response<'r>) { + async fn on_response<'r>(&self, req: &'r Request<'_>, response: &mut Response<'r>) { response.set_header(Header::new("Access-Control-Allow-Origin", "*")); response.set_header(Header::new( "Access-Control-Allow-Methods", - "POST, GET, HEAD, DELETE, OPTIONS", + "PUT, GET, HEAD, DELETE, OPTIONS", )); response.set_header(Header::new("Access-Control-Allow-Headers", "*")); response.set_header(Header::new("Access-Control-Allow-Credentials", "true")); + + // force status 200 for options requests + if req.method() == Method::Options { + response.set_status(Status::Ok); + response.set_sized_body(None, Cursor::new("")) + } } } diff --git a/src/db.rs b/src/db.rs index a147cfc..0d3eaeb 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,13 +1,15 @@ +use chrono::{DateTime, Utc}; +use sqlx::{Error, FromRow, Row}; use sqlx::migrate::MigrateError; -use sqlx::{Error, Row}; -#[derive(Clone, sqlx::FromRow)] +#[derive(Clone, FromRow)] pub struct FileUpload { pub id: Vec, pub user_id: u64, pub name: String, pub size: u64, - pub created: u64, + pub mime_type: String, + pub created: DateTime, } #[derive(Clone)] @@ -25,27 +27,49 @@ impl Database { sqlx::migrate!("./migrations/").run(&self.pool).await } - pub async fn add_user(&self, pubkey: Vec) -> Result { - let res = sqlx::query("insert into users(pubkey) values(?) returning id") + pub async fn upsert_user(&self, pubkey: &Vec) -> Result { + let res = sqlx::query("insert ignore into users(pubkey) values(?) returning id") .bind(pubkey) - .fetch_one(&self.pool) + .fetch_optional(&self.pool) .await?; - res.try_get(0) + match res { + None => sqlx::query("select id from users where pubkey = ?") + .bind(pubkey) + .fetch_one(&self.pool) + .await? + .try_get(0), + Some(res) => res.try_get(0) + } } - pub async fn add_file(&self, file: FileUpload) -> Result<(), Error> { - sqlx::query("insert into uploads(id,user_id,name,size) values(?,?,?,?)") + pub async fn add_file(&self, file: &FileUpload) -> Result<(), Error> { + sqlx::query("insert into uploads(id,user_id,name,size,mime_type) values(?,?,?,?,?)") .bind(&file.id) .bind(&file.user_id) .bind(&file.name) .bind(&file.size) + .bind(&file.mime_type) .execute(&self.pool) .await?; Ok(()) } + pub async fn get_file(&self, file: &Vec) -> Result, Error> { + sqlx::query_as("select * from uploads where id = ?") + .bind(&file) + .fetch_optional(&self.pool) + .await + } + + pub async fn delete_file(&self, file: &Vec) -> Result<(), Error> { + sqlx::query_as("delete from uploads where id = ?") + .bind(&file) + .execute(&self.pool) + .await? + } + pub async fn list_files(&self, pubkey: &Vec) -> Result, Error> { - let results: Vec = sqlx::query_as("select * from uploads where user_id = ?") + let results: Vec = sqlx::query_as("select * from uploads where user_id = (select id from users where pubkey = ?)") .bind(&pubkey) .fetch_all(&self.pool) .await?; diff --git a/src/filesystem.rs b/src/filesystem.rs index 913292b..e390060 100644 --- a/src/filesystem.rs +++ b/src/filesystem.rs @@ -4,10 +4,11 @@ use std::io::SeekFrom; use std::path::{Path, PathBuf}; use anyhow::Error; +use log::info; use rocket::data::DataStream; use sha2::{Digest, Sha256}; use tokio::fs::File; -use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use tokio::io::{AsyncReadExt, AsyncSeekExt, BufWriter}; use crate::db::Database; use crate::settings::Settings; @@ -40,25 +41,24 @@ impl FileStore { let random_id = uuid::Uuid::new_v4(); let tmp_path = FileStore::map_temp(random_id); - let file = stream.into_file(&tmp_path).await; - match file { - Err(e) => Err(Error::from(e)), - Ok(file) => { - let size = file.n.written; - let mut file = file.value; - let hash = FileStore::hash_file(&mut file).await?; - let dst_path = self.map_path(&hash); - if let Err(e) = fs::rename(&tmp_path, &dst_path) { - fs::remove_file(&tmp_path)?; - Err(Error::from(e)) - } else { - Ok(FileSystemResult { - size, - sha256: hash, - path: dst_path, - }) - } - } + let mut file = File::options() + .create(true).write(true).read(true) + .open(tmp_path.clone()).await?; + let n = stream.stream_to(&mut BufWriter::new(&mut file)).await?; + + info!("File saved to temp path: {}", tmp_path.to_str().unwrap()); + let hash = FileStore::hash_file(&mut file).await?; + let dst_path = self.map_path(&hash); + fs::create_dir_all(dst_path.parent().unwrap())?; + if let Err(e) = fs::rename(&tmp_path, &dst_path) { + fs::remove_file(&tmp_path)?; + Err(Error::from(e)) + } else { + Ok(FileSystemResult { + size: n.written, + sha256: hash, + path: dst_path, + }) } } diff --git a/src/routes.rs b/src/routes.rs index 54f6641..72a5f42 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -1,19 +1,85 @@ -use crate::auth::BlossomAuth; -use crate::blob::BlobDescriptor; -use crate::db::Database; -use crate::filesystem::FileStore; +use std::fs; +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::time::{SystemTime, UNIX_EPOCH}; + +use chrono::Utc; +use log::{error, info}; +use nostr::{JsonUtil, Tag, TagKind}; use nostr::prelude::hex; -use nostr::Tag; +use rocket::{async_trait, Data, Request, Route, routes, State, uri}; use rocket::data::ToByteUnit; use rocket::fs::NamedFile; -use rocket::http::Status; +use rocket::http::{ContentType, Header, Status}; +use rocket::http::hyper::header::CONTENT_DISPOSITION; use rocket::request::{FromRequest, Outcome}; +use rocket::response::Responder; use rocket::response::status::NotFound; use rocket::serde::json::Json; -use rocket::{async_trait, routes, Data, Request, Route, State}; +use serde::{Deserialize, Serialize}; + +use crate::auth::BlossomAuth; +use crate::blob::BlobDescriptor; +use crate::db::{Database, FileUpload}; +use crate::filesystem::FileStore; +use crate::routes::BlossomResponse::BlobDescriptorList; pub fn all() -> Vec { - routes![root, get_blob, get_blob_check, upload, list_files] + routes![root, get_blob, head_blob, delete_blob, upload, list_files] +} + +#[derive(Serialize, Deserialize)] +struct BlossomError { + pub message: String, +} + +impl BlossomError { + pub fn new(msg: String) -> Self { + Self { message: msg } + } +} + +struct BlossomFile { + pub file: File, + pub info: FileUpload, +} + +impl<'r> Responder<'r, 'static> for BlossomFile { + fn respond_to(self, request: &'r Request<'_>) -> rocket::response::Result<'static> { + let mut response = self.file.respond_to(request)?; + if let Ok(ct) = ContentType::from_str(&self.info.mime_type) { + response.set_header(ct); + } + response.set_header(Header::new("content-disposition", format!("inline; filename=\"{}\"", self.info.name))); + Ok(response) + } +} + +#[derive(Responder)] +enum BlossomResponse { + #[response(status = 403)] + Unauthorized(Json), + + #[response(status = 500)] + GenericError(Json), + + #[response(status = 200)] + File(BlossomFile), + + #[response(status = 200)] + BlobDescriptor(Json), + + #[response(status = 200)] + BlobDescriptorList(Json>), + + StatusOnly(Status), +} + +impl BlossomResponse { + pub fn error(msg: impl Into) -> Self { + Self::GenericError(Json(BlossomError::new(msg.into()))) + } } fn check_method(event: &nostr::Event, method: &str) -> bool { @@ -21,9 +87,7 @@ fn check_method(event: &nostr::Event, method: &str) -> bool { Tag::Hashtag(tag) => Some(tag), _ => None, }) { - if t == method { - return false; - } + return t == method; } false } @@ -34,81 +98,146 @@ async fn root() -> &'static str { } #[rocket::get("/")] -async fn get_blob(sha256: &str, fs: &State) -> Result { +async fn get_blob(sha256: &str, fs: &State, db: &State) -> BlossomResponse { + let sha256 = if sha256.contains(".") { + sha256.split('.').next().unwrap() + } else { + sha256 + }; let id = if let Ok(i) = hex::decode(sha256) { i } else { - return Err(Status::NotFound); + return BlossomResponse::error("Invalid file id"); }; if id.len() != 32 { - return Err(Status::NotFound); + return BlossomResponse::error("Invalid file id"); } - if let Ok(f) = NamedFile::open(fs.get(&id)).await { - Ok(f) - } else { - Err(Status::NotFound) + if let Ok(Some(info)) = db.get_file(&id).await { + if let Ok(f) = File::open(fs.get(&id)) { + return BlossomResponse::File(BlossomFile { + file: f, + info, + }); + } } + BlossomResponse::StatusOnly(Status::NotFound) } #[rocket::head("/")] -async fn get_blob_check(sha256: &str, fs: &State) -> Status { +async fn head_blob(sha256: &str, fs: &State) -> BlossomResponse { + let sha256 = if sha256.contains(".") { + sha256.split('.').next().unwrap() + } else { + sha256 + }; let id = if let Ok(i) = hex::decode(sha256) { i } else { - return Status::NotFound; + return BlossomResponse::error("Invalid file id"); }; if id.len() != 32 { - return Status::NotFound; + return BlossomResponse::error("Invalid file id"); } if fs.get(&id).exists() { - Status::Ok + BlossomResponse::StatusOnly(Status::Ok) } else { - Status::NotFound + BlossomResponse::StatusOnly(Status::NotFound) + } +} + +#[rocket::delete("/")] +async fn delete_blob(sha256: &str, fs: &State, db: &State) -> BlossomResponse { + let sha256 = if sha256.contains(".") { + sha256.split('.').next().unwrap() + } else { + sha256 + }; + let id = if let Ok(i) = hex::decode(sha256) { + i + } else { + return BlossomResponse::error("Invalid file id"); + }; + + if id.len() != 32 { + return BlossomResponse::error("Invalid file id"); + } + if let Ok(Some(_info)) = db.get_file(&id).await { + db.delete_file(&id).await?; + fs::remove_file(fs.get(&id))?; + BlossomResponse::StatusOnly(Status::Ok) + } else { + BlossomResponse::StatusOnly(Status::NotFound) } } #[rocket::put("/upload", data = "")] -async fn upload(auth: BlossomAuth, fs: &State, data: Data<'_>) -> Status { +async fn upload(auth: BlossomAuth, fs: &State, db: &State, data: Data<'_>) + -> BlossomResponse { if !check_method(&auth.event, "upload") { - return Status::NotFound; + return BlossomResponse::error("Invalid request method tag"); } + let name = auth.event.tags.iter().find_map(|t| match t { + Tag::Name(s) => Some(s.clone()), + _ => None + }); + let size = auth.event.tags.iter().find_map(|t| { + let values = t.as_vec(); + if values.len() == 2 && values[0] == "size" { + Some(values[1].parse::().unwrap()) + } else { + None + } + }); + if size.is_none() { + return BlossomResponse::error("Invalid request, no size tag"); + } match fs.put(data.open(8.gigabytes())).await { - Ok(blob) => Status::Ok, - Err(e) => Status::InternalServerError, + Ok(blob) => { + let pubkey_vec = auth.event.pubkey.to_bytes().to_vec(); + let user_id = match db.upsert_user(&pubkey_vec).await { + Ok(u) => u, + Err(e) => return BlossomResponse::error(format!("Failed to save file (db): {}", e)) + }; + let f = FileUpload { + id: blob.sha256, + user_id: user_id as u64, + name: name.unwrap_or("".to_string()), + size: blob.size, + mime_type: auth.content_type.unwrap_or("application/octet-stream".to_string()), + created: Utc::now(), + }; + if let Err(e) = db.add_file(&f).await { + error!("{:?}", e); + BlossomResponse::error(format!("Error saving file (db): {}", e)) + } else { + BlossomResponse::BlobDescriptor(Json(BlobDescriptor::from(&f))) + } + } + Err(e) => { + error!("{:?}", e); + BlossomResponse::error(format!("Error saving file (disk): {}", e)) + } } } #[rocket::get("/list/")] async fn list_files( - auth: BlossomAuth, db: &State, - pubkey: String, -) -> Result>, Status> { - if !check_method(&auth.event, "list") { - return Err(Status::NotFound); - } + pubkey: &str, +) -> BlossomResponse { let id = if let Ok(i) = hex::decode(pubkey) { i } else { - return Err(Status::NotFound); + return BlossomResponse::error("invalid pubkey"); }; - if let Ok(files) = db.list_files(&id).await { - Ok(Json( - files - .iter() - .map(|f| BlobDescriptor { - url: "".to_string(), - sha256: hex::encode(&f.id), - size: f.size, - mime_type: None, - created: f.created, - }) - .collect(), - )) - } else { - Err(Status::InternalServerError) + match db.list_files(&id).await { + Ok(files) => BlobDescriptorList(Json(files.iter() + .map(|f| BlobDescriptor::from(f)) + .collect()) + ), + Err(e) => BlossomResponse::error(format!("Could not list files: {}", e)) } -} +} \ No newline at end of file