mirror of
https://github.com/v0l/route96.git
synced 2025-06-17 08:38:49 +00:00
feat: infer mime
feat: store in tmp dir next to output dir
This commit is contained in:
@ -1,32 +1,43 @@
|
||||
use std::env::temp_dir;
|
||||
use std::fs;
|
||||
use std::io::SeekFrom;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::SystemTime;
|
||||
|
||||
use anyhow::Error;
|
||||
use chrono::Utc;
|
||||
use ffmpeg_rs_raw::DemuxerInfo;
|
||||
use log::info;
|
||||
use rocket::form::validate::Contains;
|
||||
use serde::Serialize;
|
||||
use sha2::{Digest, Sha256};
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt};
|
||||
|
||||
#[cfg(feature = "labels")]
|
||||
use crate::db::FileLabel;
|
||||
use crate::db::FileUpload;
|
||||
#[cfg(feature = "labels")]
|
||||
use crate::processing::labeling::label_frame;
|
||||
#[cfg(feature = "media-compression")]
|
||||
use crate::processing::{compress_file, probe_file, FileProcessorResult};
|
||||
use crate::processing::{compress_file, probe_file};
|
||||
use crate::settings::Settings;
|
||||
use anyhow::Error;
|
||||
use anyhow::Result;
|
||||
#[cfg(feature = "media-compression")]
|
||||
use ffmpeg_rs_raw::DemuxerInfo;
|
||||
#[cfg(feature = "media-compression")]
|
||||
use rocket::form::validate::Contains;
|
||||
use serde::Serialize;
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Clone, Default, Serialize)]
|
||||
pub struct FileSystemResult {
|
||||
#[derive(Clone)]
|
||||
pub enum FileSystemResult {
|
||||
/// File hash already exists
|
||||
AlreadyExists(Vec<u8>),
|
||||
/// New file created on disk and is stored
|
||||
NewFile(NewFileResult),
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
pub struct NewFileResult {
|
||||
pub path: PathBuf,
|
||||
pub upload: FileUpload,
|
||||
#[serde(with = "hex")]
|
||||
pub id: Vec<u8>,
|
||||
pub size: u64,
|
||||
pub mime_type: String,
|
||||
pub width: Option<u32>,
|
||||
pub height: Option<u32>,
|
||||
pub blur_hash: Option<String>,
|
||||
#[cfg(feature = "labels")]
|
||||
pub labels: Vec<FileLabel>,
|
||||
}
|
||||
|
||||
pub struct FileStore {
|
||||
@ -44,178 +55,168 @@ impl FileStore {
|
||||
}
|
||||
|
||||
/// Store a new file
|
||||
pub async fn put<S>(
|
||||
pub async fn put<'r, S>(
|
||||
&self,
|
||||
stream: S,
|
||||
mime_type: &str,
|
||||
compress: bool,
|
||||
) -> Result<FileSystemResult, Error>
|
||||
) -> Result<FileSystemResult>
|
||||
where
|
||||
S: AsyncRead + Unpin,
|
||||
S: AsyncRead + Unpin + 'r,
|
||||
{
|
||||
let result = self
|
||||
.store_compress_file(stream, mime_type, compress)
|
||||
.await?;
|
||||
let dst_path = self.map_path(&result.upload.id);
|
||||
// store file in temp path and hash the file
|
||||
let (temp_file, size, hash) = self.store_hash_temp_file(stream).await?;
|
||||
let dst_path = self.map_path(&hash);
|
||||
|
||||
// check if file hash already exists
|
||||
if dst_path.exists() {
|
||||
fs::remove_file(result.path)?;
|
||||
return Ok(FileSystemResult {
|
||||
path: dst_path,
|
||||
..result
|
||||
});
|
||||
tokio::fs::remove_file(temp_file).await?;
|
||||
return Ok(FileSystemResult::AlreadyExists(hash));
|
||||
}
|
||||
fs::create_dir_all(dst_path.parent().unwrap())?;
|
||||
if let Err(e) = fs::copy(&result.path, &dst_path) {
|
||||
fs::remove_file(&result.path)?;
|
||||
Err(Error::from(e))
|
||||
|
||||
let mut res = if compress {
|
||||
#[cfg(feature = "media-compression")]
|
||||
{
|
||||
let res = match self.compress_file(&temp_file, mime_type).await {
|
||||
Err(e) => {
|
||||
tokio::fs::remove_file(&temp_file).await?;
|
||||
return Err(e);
|
||||
}
|
||||
Ok(res) => res,
|
||||
};
|
||||
tokio::fs::remove_file(temp_file).await?;
|
||||
res
|
||||
}
|
||||
#[cfg(not(feature = "media-compression"))]
|
||||
{
|
||||
anyhow::bail!("Compression not supported!");
|
||||
}
|
||||
} else {
|
||||
fs::remove_file(result.path)?;
|
||||
Ok(FileSystemResult {
|
||||
path: dst_path,
|
||||
..result
|
||||
})
|
||||
let (width, height, mime_type) = {
|
||||
#[cfg(feature = "media-compression")]
|
||||
{
|
||||
let probe = probe_file(&temp_file).ok();
|
||||
let v_stream = probe.as_ref().and_then(|p| p.best_video());
|
||||
let mime = Self::hack_mime_type(mime_type, &probe, &temp_file);
|
||||
(
|
||||
v_stream.map(|v| v.width as u32),
|
||||
v_stream.map(|v| v.height as u32),
|
||||
mime,
|
||||
)
|
||||
}
|
||||
#[cfg(not(feature = "media-compression"))]
|
||||
(None, None, Self::infer_mime_type(mime_type, &temp_file))
|
||||
};
|
||||
NewFileResult {
|
||||
path: temp_file,
|
||||
id: hash,
|
||||
size,
|
||||
mime_type,
|
||||
width,
|
||||
height,
|
||||
blur_hash: None,
|
||||
}
|
||||
};
|
||||
|
||||
// copy temp file to final destination
|
||||
let final_dest = self.map_path(&res.id);
|
||||
|
||||
// Compressed file already exists
|
||||
if final_dest.exists() {
|
||||
tokio::fs::remove_file(&res.path).await?;
|
||||
Ok(FileSystemResult::AlreadyExists(res.id))
|
||||
} else {
|
||||
tokio::fs::create_dir_all(final_dest.parent().unwrap()).await?;
|
||||
tokio::fs::rename(&res.path, &final_dest).await?;
|
||||
|
||||
res.path = final_dest;
|
||||
Ok(FileSystemResult::NewFile(res))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "media-compression")]
|
||||
/// Try to replace the mime-type when unknown using ffmpeg probe result
|
||||
fn hack_mime_type(mime_type: &str, p: &DemuxerInfo) -> String {
|
||||
if mime_type == "application/octet-stream" {
|
||||
fn hack_mime_type(mime_type: &str, p: &Option<DemuxerInfo>, out_path: &PathBuf) -> String {
|
||||
if let Some(p) = p {
|
||||
if p.format.contains("mp4") {
|
||||
"video/mp4".to_string()
|
||||
return "video/mp4".to_string();
|
||||
} else if p.format.contains("webp") {
|
||||
"image/webp".to_string()
|
||||
return "image/webp".to_string();
|
||||
} else if p.format.contains("jpeg") {
|
||||
"image/jpeg".to_string()
|
||||
return "image/jpeg".to_string();
|
||||
} else if p.format.contains("png") {
|
||||
"image/png".to_string()
|
||||
return "image/png".to_string();
|
||||
} else if p.format.contains("gif") {
|
||||
"image/gif".to_string()
|
||||
} else {
|
||||
mime_type.to_string()
|
||||
return "image/gif".to_string();
|
||||
}
|
||||
}
|
||||
|
||||
// infer mime type
|
||||
Self::infer_mime_type(mime_type, out_path)
|
||||
}
|
||||
|
||||
fn infer_mime_type(mime_type: &str, out_path: &PathBuf) -> String {
|
||||
// infer mime type
|
||||
if let Ok(Some(i)) = infer::get_from_path(out_path) {
|
||||
i.mime_type().to_string()
|
||||
} else {
|
||||
mime_type.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
async fn store_compress_file<S>(
|
||||
&self,
|
||||
mut stream: S,
|
||||
mime_type: &str,
|
||||
compress: bool,
|
||||
) -> Result<FileSystemResult, Error>
|
||||
where
|
||||
S: AsyncRead + Unpin,
|
||||
{
|
||||
let random_id = uuid::Uuid::new_v4();
|
||||
let tmp_path = FileStore::map_temp(random_id);
|
||||
let mut file = File::options()
|
||||
.create(true)
|
||||
.truncate(false)
|
||||
.write(true)
|
||||
.read(true)
|
||||
.open(tmp_path.clone())
|
||||
.await?;
|
||||
tokio::io::copy(&mut stream, &mut file).await?;
|
||||
async fn compress_file(&self, input: &PathBuf, mime_type: &str) -> Result<NewFileResult> {
|
||||
let compressed_result = compress_file(input, mime_type, &self.temp_dir())?;
|
||||
#[cfg(feature = "labels")]
|
||||
let labels = if let Some(mp) = &self.settings.vit_model {
|
||||
label_frame(
|
||||
&compressed_result.result,
|
||||
mp.model.clone(),
|
||||
mp.config.clone(),
|
||||
)?
|
||||
.iter()
|
||||
.map(|l| FileLabel::new(l.0.clone(), "vit224".to_string()))
|
||||
.collect()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
let hash = FileStore::hash_file(&compressed_result.result).await?;
|
||||
|
||||
info!("File saved to temp path: {}", tmp_path.to_str().unwrap());
|
||||
|
||||
#[cfg(feature = "media-compression")]
|
||||
if compress {
|
||||
let start = SystemTime::now();
|
||||
let proc_result = compress_file(tmp_path.clone(), mime_type)?;
|
||||
if let FileProcessorResult::NewFile(new_temp) = proc_result {
|
||||
let old_size = tmp_path.metadata()?.len();
|
||||
let new_size = new_temp.result.metadata()?.len();
|
||||
let time_compress = SystemTime::now().duration_since(start)?;
|
||||
let start = SystemTime::now();
|
||||
|
||||
#[cfg(feature = "labels")]
|
||||
let labels = if let Some(mp) = &self.settings.vit_model {
|
||||
label_frame(&new_temp.result, mp.model.clone(), mp.config.clone())?
|
||||
.iter()
|
||||
.map(|l| FileLabel::new(l.0.clone(), "vit224".to_string()))
|
||||
.collect()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
|
||||
let time_labels = SystemTime::now().duration_since(start)?;
|
||||
|
||||
// delete old temp
|
||||
fs::remove_file(tmp_path)?;
|
||||
file = File::options()
|
||||
.create(true)
|
||||
.truncate(false)
|
||||
.write(true)
|
||||
.read(true)
|
||||
.open(new_temp.result.clone())
|
||||
.await?;
|
||||
let n = file.metadata().await?.len();
|
||||
let hash = FileStore::hash_file(&mut file).await?;
|
||||
|
||||
info!("Processed media: ratio={:.2}x, old_size={:.3}kb, new_size={:.3}kb, duration_compress={:.2}ms, duration_labels={:.2}ms",
|
||||
old_size as f32 / new_size as f32,
|
||||
old_size as f32 / 1024.0,
|
||||
new_size as f32 / 1024.0,
|
||||
time_compress.as_micros() as f64 / 1000.0,
|
||||
time_labels.as_micros() as f64 / 1000.0
|
||||
);
|
||||
|
||||
return Ok(FileSystemResult {
|
||||
path: new_temp.result,
|
||||
upload: FileUpload {
|
||||
id: hash,
|
||||
name: "".to_string(),
|
||||
size: n,
|
||||
width: Some(new_temp.width as u32),
|
||||
height: Some(new_temp.height as u32),
|
||||
blur_hash: None,
|
||||
mime_type: new_temp.mime_type,
|
||||
#[cfg(feature = "labels")]
|
||||
labels,
|
||||
created: Utc::now(),
|
||||
..Default::default()
|
||||
},
|
||||
});
|
||||
}
|
||||
} else if let Ok(p) = probe_file(tmp_path.clone()) {
|
||||
let n = file.metadata().await?.len();
|
||||
let hash = FileStore::hash_file(&mut file).await?;
|
||||
let v_stream = p.best_video();
|
||||
return Ok(FileSystemResult {
|
||||
path: tmp_path,
|
||||
upload: FileUpload {
|
||||
id: hash,
|
||||
name: "".to_string(),
|
||||
size: n,
|
||||
created: Utc::now(),
|
||||
mime_type: Self::hack_mime_type(mime_type, &p),
|
||||
width: v_stream.map(|v| v.width as u32),
|
||||
height: v_stream.map(|v| v.height as u32),
|
||||
..Default::default()
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
let n = file.metadata().await?.len();
|
||||
let hash = FileStore::hash_file(&mut file).await?;
|
||||
Ok(FileSystemResult {
|
||||
path: tmp_path,
|
||||
upload: FileUpload {
|
||||
id: hash,
|
||||
name: "".to_string(),
|
||||
size: n,
|
||||
created: Utc::now(),
|
||||
mime_type: mime_type.to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
let n = File::open(&compressed_result.result)
|
||||
.await?
|
||||
.metadata()
|
||||
.await?
|
||||
.len();
|
||||
Ok(NewFileResult {
|
||||
path: compressed_result.result,
|
||||
id: hash,
|
||||
size: n,
|
||||
width: Some(compressed_result.width as u32),
|
||||
height: Some(compressed_result.height as u32),
|
||||
blur_hash: None,
|
||||
mime_type: compressed_result.mime_type,
|
||||
#[cfg(feature = "labels")]
|
||||
labels,
|
||||
})
|
||||
}
|
||||
|
||||
async fn hash_file(file: &mut File) -> Result<Vec<u8>, Error> {
|
||||
async fn store_hash_temp_file<S>(&self, mut stream: S) -> Result<(PathBuf, u64, Vec<u8>)>
|
||||
where
|
||||
S: AsyncRead + Unpin,
|
||||
{
|
||||
let uid = Uuid::new_v4();
|
||||
let out_path = self.temp_dir().join(uid.to_string());
|
||||
tokio::fs::create_dir_all(&out_path.parent().unwrap()).await?;
|
||||
|
||||
let mut file = File::create(&out_path).await?;
|
||||
let n = tokio::io::copy(&mut stream, &mut file).await?;
|
||||
|
||||
let hash = FileStore::hash_file(&out_path).await?;
|
||||
Ok((out_path, n, hash))
|
||||
}
|
||||
|
||||
async fn hash_file(p: &PathBuf) -> Result<Vec<u8>, Error> {
|
||||
let mut file = File::open(p).await?;
|
||||
let mut hasher = Sha256::new();
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
let mut buf = [0; 4096];
|
||||
loop {
|
||||
let n = file.read(&mut buf).await?;
|
||||
@ -228,15 +229,16 @@ impl FileStore {
|
||||
Ok(res.to_vec())
|
||||
}
|
||||
|
||||
fn map_temp(id: uuid::Uuid) -> PathBuf {
|
||||
temp_dir().join(id.to_string())
|
||||
}
|
||||
|
||||
pub fn map_path(&self, id: &Vec<u8>) -> PathBuf {
|
||||
let id = hex::encode(id);
|
||||
Path::new(&self.settings.storage_dir)
|
||||
.join(&id[0..2])
|
||||
.join(&id[2..4])
|
||||
.join(id)
|
||||
self.storage_dir().join(&id[0..2]).join(&id[2..4]).join(id)
|
||||
}
|
||||
|
||||
pub fn temp_dir(&self) -> PathBuf {
|
||||
self.storage_dir().join("tmp")
|
||||
}
|
||||
|
||||
pub fn storage_dir(&self) -> PathBuf {
|
||||
PathBuf::from(&self.settings.storage_dir)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user