feat: list files (nip96)
continuous-integration/drone/push: build failing

kieran 2024-05-29 14:31:44 +01:00
parent a9a9ba6328
commit 5547673331
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
6 changed files with 262 additions and 102 deletions

@@ -1,14 +1,21 @@
use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::{Error, Executor, FromRow, Row};
use sqlx::migrate::MigrateError;
#[derive(Clone, FromRow)]
#[derive(Clone, FromRow, Default, Serialize)]
pub struct FileUpload {
pub id: Vec<u8>,
pub name: String,
pub size: u64,
pub mime_type: String,
pub created: DateTime<Utc>,
pub width: Option<u32>,
pub height: Option<u32>,
pub blur_hash: Option<String>,
#[sqlx(skip)]
pub labels: Vec<FileLabel>,
}
#[derive(Clone, FromRow)]
@@ -18,6 +25,25 @@ pub struct User {
pub created: DateTime<Utc>,
}
#[derive(Clone, FromRow, Serialize)]
pub struct FileLabel {
pub file: Vec<u8>,
pub label: String,
pub created: DateTime<Utc>,
pub model: String,
}
impl FileLabel {
pub fn new(label: String, model: String) -> Self {
Self {
file: vec![],
label,
created: Utc::now(),
model,
}
}
}
#[derive(Clone)]
pub struct Database {
pool: sqlx::pool::Pool<sqlx::mysql::MySql>,
@@ -58,16 +84,28 @@ impl Database {
pub async fn add_file(&self, file: &FileUpload, user_id: u64) -> Result<(), Error> {
let mut tx = self.pool.begin().await?;
let q = sqlx::query("insert ignore into uploads(id,name,size,mime_type) values(?,?,?,?)")
let q = sqlx::query("insert ignore into uploads(id,name,size,mime_type,blur_hash,width,height) values(?,?,?,?,?,?,?)")
.bind(&file.id)
.bind(&file.name)
.bind(file.size)
.bind(&file.mime_type);
.bind(&file.mime_type)
.bind(&file.blur_hash)
.bind(file.width)
.bind(file.height);
tx.execute(q).await?;
let q2 = sqlx::query("insert into user_uploads(file,user_id) values(?,?)")
.bind(&file.id)
.bind(user_id);
tx.execute(q).await?;
tx.execute(q2).await?;
for lbl in &file.labels {
let q3 = sqlx::query("insert into upload_labels(file,label,model) values(?,?,?)")
.bind(&file.id)
.bind(&lbl.label)
.bind(&lbl.model);
tx.execute(q3).await?;
}
tx.commit().await?;
Ok(())
}
@@ -86,6 +124,13 @@ impl Database {
.await
}
pub async fn get_file_labels(&self, file: &Vec<u8>) -> Result<Vec<FileLabel>, Error> {
sqlx::query_as("select upload_labels.* from uploads, upload_labels where uploads.id = ? and uploads.id = upload_labels.file")
.bind(file)
.fetch_all(&self.pool)
.await
}
pub async fn delete_file_owner(&self, file: &Vec<u8>, owner: u64) -> Result<(), Error> {
sqlx::query("delete from user_uploads where file = ? and user_id = ?")
.bind(file)
@@ -94,7 +139,7 @@ impl Database {
.await?;
Ok(())
}
pub async fn delete_file(&self, file: &Vec<u8>) -> Result<(), Error> {
sqlx::query("delete from uploads where id = ?")
.bind(file)
@@ -103,13 +148,31 @@ impl Database {
Ok(())
}
pub async fn list_files(&self, pubkey: &Vec<u8>) -> Result<Vec<FileUpload>, Error> {
pub async fn list_files(&self, pubkey: &Vec<u8>, offset: u32, limit: u32) -> Result<(Vec<FileUpload>, i64), Error> {
let results: Vec<FileUpload> = sqlx::query_as(
"select * from uploads where user_id = (select id from users where pubkey = ?)",
"select uploads.* from uploads, users, user_uploads \
where users.pubkey = ? \
and users.id = user_uploads.user_id \
and user_uploads.file = uploads.id \
order by uploads.created desc \
limit ? offset ?",
)
.bind(pubkey)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await?;
Ok(results)
let count: i64 = sqlx::query(
"select count(uploads.id) from uploads, users, user_uploads \
where users.pubkey = ? \
and users.id = user_uploads.user_id \
and user_uploads.file = uploads.id \
order by uploads.created desc")
.bind(pubkey)
.fetch_one(&self.pool)
.await?
.try_get(0)?;
Ok((results, count))
}
}

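The new list_files takes a raw (offset, limit) window and returns the total row count alongside the page, so callers do the page arithmetic themselves (the nip96 route below passes page * server_count). A minimal sketch of that translation follows; the 5_000 cap is borrowed from that route and is illustrative, not a fixed contract.

// Sketch: turn a 0-based page index and requested page size into the
// (offset, limit) pair Database::list_files expects. The cap value is
// an assumption mirroring the nip96 route below.
fn page_window(page: u32, count: u32, cap: u32) -> (u32, u32) {
    let limit = count.clamp(1, cap);          // never 0, never above cap
    let offset = page.saturating_mul(limit);  // defensive against overflow
    (offset, limit)
}

fn main() {
    assert_eq!(page_window(0, 100, 5_000), (0, 100));
    assert_eq!(page_window(1, 9_999, 5_000), (5_000, 5_000)); // clamped down
    assert_eq!(page_window(3, 0, 5_000), (3, 1));             // clamped up to 1
}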
@@ -5,29 +5,22 @@ use std::path::{Path, PathBuf};
use std::time::SystemTime;
use anyhow::Error;
use chrono::Utc;
use log::info;
use serde::Serialize;
use serde_with::serde_as;
use sha2::{Digest, Sha256};
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt};
use crate::db::{FileLabel, FileUpload};
use crate::processing::{compress_file, FileProcessorResult};
use crate::processing::labeling::label_frame;
use crate::settings::Settings;
#[serde_as]
#[derive(Clone, Default, Serialize)]
pub struct FileSystemResult {
pub path: PathBuf,
#[serde_as(as = "serde_with::hex::Hex")]
pub sha256: Vec<u8>,
pub size: u64,
pub mime_type: String,
pub width: Option<usize>,
pub height: Option<usize>,
pub blur_hash: Option<String>,
pub labels: Option<Vec<String>>,
pub upload: FileUpload,
}
pub struct FileStore {
@@ -52,7 +45,7 @@ impl FileStore {
TStream: AsyncRead + Unpin,
{
let result = self.store_compress_file(stream, mime_type, compress).await?;
let dst_path = self.map_path(&result.sha256);
let dst_path = self.map_path(&result.upload.id);
if dst_path.exists() {
fs::remove_file(result.path)?;
return Ok(FileSystemResult {
@@ -104,7 +97,7 @@ impl FileStore {
new_temp.height as u32,
new_temp.image.as_slice(),
)?;
let time_blurhash = SystemTime::now().duration_since(start).unwrap();
let time_blur_hash = SystemTime::now().duration_since(start).unwrap();
let start = SystemTime::now();
let labels = if let Some(mp) = &self.settings.vit_model_path {
label_frame(
@@ -112,6 +105,8 @@
new_temp.width,
new_temp.height,
mp.clone())?
.iter().map(|l| FileLabel::new(l.clone(), "vit224".to_string()))
.collect()
} else {
vec![]
};
@@ -129,24 +124,28 @@
let n = file.metadata().await?.len();
let hash = FileStore::hash_file(&mut file).await?;
info!("Processed media: ratio={:.2}x, old_size={:.3}kb, new_size={:.3}kb, duration_compress={:.2}ms, duration_blurhash={:.2}ms, duration_labels={:.2}ms",
info!("Processed media: ratio={:.2}x, old_size={:.3}kb, new_size={:.3}kb, duration_compress={:.2}ms, duration_blur_hash={:.2}ms, duration_labels={:.2}ms",
old_size as f32 / new_size as f32,
old_size as f32 / 1024.0,
new_size as f32 / 1024.0,
time_compress.as_micros() as f64 / 1000.0,
time_blurhash.as_micros() as f64 / 1000.0,
time_blur_hash.as_micros() as f64 / 1000.0,
time_labels.as_micros() as f64 / 1000.0
);
return Ok(FileSystemResult {
size: n,
sha256: hash,
path: new_temp.result,
width: Some(new_temp.width),
height: Some(new_temp.height),
blur_hash: Some(blur_hash),
mime_type: new_temp.mime_type,
labels: Some(labels),
upload: FileUpload {
id: hash,
name: "".to_string(),
size: n,
width: Some(new_temp.width as u32),
height: Some(new_temp.height as u32),
blur_hash: Some(blur_hash),
mime_type: new_temp.mime_type,
labels,
created: Utc::now(),
},
});
}
}
@@ -154,10 +153,13 @@ impl FileStore {
let hash = FileStore::hash_file(&mut file).await?;
Ok(FileSystemResult {
path: tmp_path,
sha256: hash,
size: n,
mime_type: mime_type.to_string(),
..Default::default()
upload: FileUpload {
id: hash,
name: "".to_string(),
size: n,
created: Utc::now(),
..Default::default()
},
})
}

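store_compress_file now wraps its output in a FileUpload and turns the raw classifier strings into FileLabel rows tagged with the model name ("vit224"). A self-contained sketch of that mapping, with FileLabel stubbed locally so it compiles on its own:

use chrono::{DateTime, Utc};

// Local stand-in for crate::db::FileLabel, just so the sketch is runnable.
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct FileLabel {
    file: Vec<u8>,
    label: String,
    created: DateTime<Utc>,
    model: String,
}

impl FileLabel {
    fn new(label: String, model: String) -> Self {
        Self { file: vec![], label, created: Utc::now(), model }
    }
}

fn main() {
    // label_frame yields plain strings; ViT class names are often
    // comma-separated synonym lists like "tabby, tabby cat".
    let raw = vec!["tabby, tabby cat".to_string(), "window screen".to_string()];
    let labels: Vec<FileLabel> = raw
        .iter()
        .map(|l| FileLabel::new(l.clone(), "vit224".to_string()))
        .collect();
    assert_eq!(labels[0].model, "vit224");
}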
@@ -9,9 +9,27 @@ use crate::processing::webp::WebpProcessor;
mod webp;
pub mod labeling;
mod probe;
pub struct ProbeResult {
pub streams: Vec<ProbeStream>,
}
pub enum ProbeStream {
Video {
width: u32,
height: u32,
codec: String,
},
Audio {
sample_rate: u32,
codec: String,
},
}
pub(crate) enum FileProcessorResult {
NewFile(NewFileProcessorResult),
Probe(ProbeResult),
Skip,
}

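ProbeResult and ProbeStream give downstream code a typed view of stream metadata from the probe step. A hedged sketch of a caller pulling the first video stream's dimensions, using the enum exactly as declared above:

// Sketch only; assumes the ProbeResult/ProbeStream definitions above.
fn video_dimensions(probe: &ProbeResult) -> Option<(u32, u32)> {
    probe.streams.iter().find_map(|s| match s {
        ProbeStream::Video { width, height, .. } => Some((*width, *height)),
        ProbeStream::Audio { .. } => None,
    })
}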
@@ -122,7 +122,9 @@ async fn upload(
.put(data.open(ByteUnit::from(settings.max_upload_bytes)), &mime_type, false)
.await
{
Ok(blob) => {
Ok(mut blob) => {
blob.upload.name = name.unwrap_or("".to_string());
let pubkey_vec = auth.event.pubkey.to_bytes().to_vec();
if let Some(wh) = webhook.as_ref() {
match wh.store_file(&pubkey_vec, blob.clone()) {
@@ -142,14 +144,7 @@ async fn upload(
return BlossomResponse::error(format!("Failed to save file (db): {}", e));
}
};
let f = FileUpload {
id: blob.sha256,
name: name.unwrap_or("".to_string()),
size: blob.size,
mime_type: blob.mime_type,
created: Utc::now(),
};
if let Err(e) = db.add_file(&f, user_id).await {
if let Err(e) = db.add_file(&blob.upload, user_id).await {
error!("{}", e.to_string());
let _ = fs::remove_file(blob.path);
if let Some(dbe) = e.as_database_error() {
@@ -162,7 +157,7 @@ async fn upload(
BlossomResponse::error(format!("Error saving file (db): {}", e))
} else {
BlossomResponse::BlobDescriptor(Json(BlobDescriptor::from_upload(
&f,
&blob.upload,
&settings.public_url,
)))
}
@@ -185,11 +180,11 @@ async fn list_files(
} else {
return BlossomResponse::error("invalid pubkey");
};
match db.list_files(&id).await {
Ok(files) => BlossomResponse::BlobDescriptorList(Json(
match db.list_files(&id, 0, 10_000).await {
Ok((files, _count)) => BlossomResponse::BlobDescriptorList(Json(
files
.iter()
.map(|f| BlobDescriptor::from_upload(&f, &settings.public_url))
.map(|f| BlobDescriptor::from_upload(f, &settings.public_url))
.collect(),
)),
Err(e) => BlossomResponse::error(format!("Could not list files: {}", e)),

@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::fs;
use chrono::Utc;
use log::error;
use rocket::{FromForm, Responder, Route, routes, State};
use rocket::form::Form;
@@ -63,6 +62,15 @@ struct Nip96MediaTransformations {
pub video: Option<Vec<String>>,
}
#[derive(Serialize, Default)]
#[serde(crate = "rocket::serde")]
struct Nip96FileListResults {
pub count: u32,
pub page: u32,
pub total: u32,
pub files: Vec<Nip94Event>,
}
#[derive(Responder)]
enum Nip96Response {
#[response(status = 500)]
@@ -70,6 +78,9 @@ enum Nip96Response {
#[response(status = 200)]
UploadResult(Json<Nip96UploadResult>),
#[response(status = 200)]
FileList(Json<Nip96FileListResults>),
}
impl Nip96Response {
@@ -102,9 +113,50 @@ struct Nip96UploadResult {
pub nip94_event: Option<Nip94Event>,
}
impl Nip96UploadResult {
pub fn from_upload(settings: &Settings, upload: &FileUpload) -> Self {
let hex_id = hex::encode(&upload.id);
let mut tags = vec![
vec![
"url".to_string(),
format!("{}/{}", &settings.public_url, &hex_id),
],
vec!["x".to_string(), hex_id],
vec!["m".to_string(), upload.mime_type.clone()],
];
if let Some(bh) = &upload.blur_hash {
tags.push(vec!["blurhash".to_string(), bh.clone()]);
}
if let (Some(w), Some(h)) = (upload.width, upload.height) {
tags.push(vec!["dim".to_string(), format!("{}x{}", w, h)])
}
for l in &upload.labels {
let val = if l.label.contains(',') {
let split_val: Vec<&str> = l.label.split(',').collect();
split_val[0].to_string()
} else {
l.label.clone()
};
tags.push(vec!["t".to_string(), val])
}
Self {
status: "success".to_string(),
nip94_event: Some(Nip94Event {
content: upload.name.clone(),
created_at: upload.created.timestamp(),
tags,
}),
..Default::default()
}
}
}
#[derive(Serialize, Default)]
#[serde(crate = "rocket::serde")]
struct Nip94Event {
pub created_at: i64,
pub content: String,
pub tags: Vec<Vec<String>>,
}
@@ -121,7 +173,7 @@ struct Nip96Form<'r> {
}
pub fn nip96_routes() -> Vec<Route> {
routes![get_info_doc, upload, delete]
routes![get_info_doc, upload, delete, list_files]
}
#[rocket::get("/.well-known/nostr/nip96.json")]
@@ -172,6 +224,13 @@ async fn upload(
let mime_type = form.media_type
.unwrap_or("application/octet-stream");
if form.expiration.is_some() {
return Nip96Response::error("Expiration not supported");
}
if form.alt.is_some() {
return Nip96Response::error("\"alt\" is not supported");
}
// check whitelist
if let Some(wl) = &settings.whitelist {
if !wl.contains(&auth.event.pubkey.to_hex()) {
@@ -182,7 +241,11 @@
.put(file, mime_type, !form.no_transform.unwrap_or(false))
.await
{
Ok(blob) => {
Ok(mut blob) => {
blob.upload.name = match &form.caption {
Some(c) => c.to_string(),
None => "".to_string(),
};
let pubkey_vec = auth.event.pubkey.to_bytes().to_vec();
if let Some(wh) = webhook.as_ref() {
match wh.store_file(&pubkey_vec, blob.clone()) {
@@ -200,19 +263,10 @@
Ok(u) => u,
Err(e) => return Nip96Response::error(&format!("Could not save user: {}", e)),
};
let file_upload = FileUpload {
id: blob.sha256,
name: match &form.caption {
Some(c) => c.to_string(),
None => "".to_string(),
},
size: blob.size,
mime_type: blob.mime_type,
created: Utc::now(),
};
if let Err(e) = db.add_file(&file_upload, user_id).await {
let tmp_file = blob.path.clone();
if let Err(e) = db.add_file(&blob.upload, user_id).await {
error!("{}", e.to_string());
let _ = fs::remove_file(blob.path);
let _ = fs::remove_file(tmp_file);
if let Some(dbe) = e.as_database_error() {
if let Some(c) = dbe.code() {
if c == "23000" {
@@ -223,39 +277,7 @@
return Nip96Response::error(&format!("Could not save file (db): {}", e));
}
let hex_id = hex::encode(&file_upload.id);
let mut tags = vec![
vec![
"url".to_string(),
format!("{}/{}", &settings.public_url, &hex_id),
],
vec!["x".to_string(), hex_id],
vec!["m".to_string(), file_upload.mime_type],
];
if let Some(bh) = blob.blur_hash {
tags.push(vec!["blurhash".to_string(), bh]);
}
if let (Some(w), Some(h)) = (blob.width, blob.height) {
tags.push(vec!["dim".to_string(), format!("{}x{}", w, h)])
}
if let Some(lbls) = blob.labels {
for l in lbls {
let val = if l.contains(',') {
let split_val: Vec<&str> = l.split(',').collect();
split_val[0].to_string()
} else {
l
};
tags.push(vec!["t".to_string(), val])
}
}
Nip96Response::UploadResult(Json(Nip96UploadResult {
status: "success".to_string(),
nip94_event: Some(Nip94Event {
tags,
}),
..Default::default()
}))
Nip96Response::UploadResult(Json(Nip96UploadResult::from_upload(settings, &blob.upload)))
}
Err(e) => {
error!("{}", e.to_string());
@@ -276,3 +298,29 @@ async fn delete(
Err(e) => Nip96Response::error(&format!("Failed to delete file: {}", e)),
}
}
#[rocket::get("/n96?<page>&<count>")]
async fn list_files(
auth: Nip98Auth,
page: u32,
count: u32,
db: &State<Database>,
settings: &State<Settings>,
) -> Nip96Response {
let pubkey_vec = auth.event.pubkey.to_bytes().to_vec();
let server_count = count.min(5_000).max(1);
match db.list_files(&pubkey_vec, page * server_count, server_count).await {
Ok((files, total)) => Nip96Response::FileList(Json(Nip96FileListResults {
count: server_count,
page,
total: total as u32,
files: files
.iter()
.map(|f| Nip96UploadResult::from_upload(settings, f).nip94_event.unwrap())
.collect(),
}
)),
Err(e) => Nip96Response::error(&format!("Could not list files: {}", e)),
}
}

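The JSON returned by GET /n96?page=&lt;p&gt;&amp;count=&lt;n&gt; mirrors Nip96FileListResults above. A client-side deserialization sketch; the field layout is read off the server struct, and the files entries (NIP-94 events) are left untyped:

use serde::Deserialize;

// Client-side mirror of Nip96FileListResults; a sketch, not a published spec.
#[derive(Deserialize)]
struct FileListPage {
    count: u32,                    // page size the server actually applied
    page: u32,                     // 0-based page index echoed back
    total: u32,                    // total uploads for this pubkey
    files: Vec<serde_json::Value>, // NIP-94 events, untyped here
}

fn main() -> Result<(), serde_json::Error> {
    let body = r#"{"count":1,"page":0,"total":1,
                   "files":[{"created_at":0,"content":"","tags":[]}]}"#;
    let page: FileListPage = serde_json::from_str(body)?;
    assert_eq!(page.total, 1);
    Ok(())
}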
@@ -25,6 +25,41 @@
}
</style>
<script>
async function dumpToLog(rsp) {
console.debug(rsp);
const text = await rsp.text();
if (rsp.ok) {
document.querySelector("#log").append(JSON.stringify(JSON.parse(text), undefined, 2));
} else {
document.querySelector("#log").append(text);
}
document.querySelector("#log").append("\n");
}
async function listFiles() {
try {
const auth_event = await window.nostr.signEvent({
kind: 27235,
created_at: Math.floor(new Date().getTime() / 1000),
content: "",
tags: [
["u", `${window.location.protocol}//${window.location.host}/n96`],
["method", "GET"]
]
});
const rsp = await fetch("/n96?page=0&count=100", {
method: "GET",
headers: {
accept: "application/json",
authorization: `Nostr ${btoa(JSON.stringify(auth_event))}`,
},
});
await dumpToLog(rsp);
} catch (e) {
if (e instanceof Error) {
alert(e.message);
}
}
}
async function uploadFiles(e) {
try {
const input = document.querySelector("#file");
@@ -55,14 +90,7 @@
authorization: `Nostr ${btoa(JSON.stringify(auth_event))}`,
},
});
console.debug(rsp);
const text = await rsp.text();
if (rsp.ok) {
document.querySelector("#log").append(JSON.stringify(JSON.parse(text), undefined, 2));
} else {
document.querySelector("#log").append(text);
}
document.querySelector("#log").append("\n");
await dumpToLog(rsp);
} catch (ex) {
if (ex instanceof Error) {
alert(ex.message);
@@ -94,6 +122,12 @@
Upload
</button>
</div>
<div>
<button type="submit" onclick="listFiles()">
List Uploads
</button>
</div>
</div>
<pre id="log"></pre>
</body>