commit 30c60e3932
2025-02-27 12:50:04 +00:00
7 changed files with 3533 additions and 0 deletions

.gitignore (vendored, new file, +2 lines)
@@ -0,0 +1,2 @@
/target
.idea/

Cargo.lock (generated, new file, +2930 lines)
File diff suppressed because it is too large.

Cargo.toml (new file, +19 lines)
@@ -0,0 +1,19 @@
[package]
name = "rust_dvm"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.96"
clap = { version = "4.5.31", features = ["derive"] }
env_logger = "0.11.6"
log = "0.4.26"
nostr-sdk = { version = "0.39.0" }
tokio = { version = "1.43.0", features = ["macros", "rt", "rt-multi-thread"] }
yt-dlp = "1.2.3"
serde = { version = "1.0.218", features = ["derive"] }
base64 = "0.22.1"
sha2 = "0.10.8"
url = "2.5.4"
reqwest = "0.12.12"
hex = "0.4.3"

src/blossom.rs (new file, +97 lines)
@@ -0,0 +1,97 @@
use anyhow::Result;
use base64::Engine;
use log::info;
use nostr_sdk::{serde_json, EventBuilder, JsonUtil, Kind, NostrSigner, Tag, Timestamp};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::io::SeekFrom;
use std::ops::Add;
use std::path::PathBuf;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use url::Url;

#[derive(Debug, Clone)]
pub struct Blossom {
url: Url,
client: reqwest::Client,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDescriptor {
pub url: String,
pub sha256: String,
pub size: u64,
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub mime_type: Option<String>,
pub uploaded: u64,
#[serde(rename = "nip94", skip_serializing_if = "Option::is_none")]
pub nip94: Option<HashMap<String, String>>,
}
impl Blossom {
pub fn new(url: &str) -> Self {
Self {
url: url.parse().unwrap(),
client: reqwest::Client::new(),
}
}
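/// Compute the SHA-256 of the file contents, rewinding the file to the start afterwards.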
async fn hash_file(f: &mut File) -> Result<String> {
let mut hash = Sha256::new();
let mut buf: [u8; 1024] = [0; 1024];
f.seek(SeekFrom::Start(0)).await?;
loop {
let data = f.read(&mut buf[..]).await?;
if data == 0 {
break;
}
hash.update(&buf[..data]);
}
let hash = hash.finalize();
f.seek(SeekFrom::Start(0)).await?;
Ok(hex::encode(hash))
}
pub async fn upload<S>(
&self,
from_file: &PathBuf,
keys: &S,
mime: Option<&str>,
) -> Result<BlobDescriptor>
where
S: NostrSigner,
{
let mut f = File::open(from_file).await?;
let hash = Self::hash_file(&mut f).await?;
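// Build the Blossom authorization event: kind 24242 with the action ("t" tag),
// the blob's sha256 ("x" tag) and a short expiration, signed by the provided signer.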
let auth_event = EventBuilder::new(Kind::Custom(24242), "Upload blob").tags([
Tag::hashtag("upload"),
Tag::parse(["x", &hash])?,
Tag::expiration(Timestamp::now().add(60)),
]);
let auth_event = auth_event.sign(keys).await?;
let rsp = self
.client
.put(self.url.join("/upload").unwrap())
.header("Content-Type", mime.unwrap_or("application/octet-stream"))
.header(
"Authorization",
&format!(
"Nostr {}",
base64::engine::general_purpose::STANDARD
.encode(auth_event.as_json().as_bytes())
),
)
.body(f)
.send()
.await?
.text()
.await?;
info!("Upload response: {}", rsp);
Ok(serde_json::from_str::<BlobDescriptor>(&rsp)?)
}
}
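The upload above authenticates by base64-encoding the signed kind 24242 event into the Authorization header and PUT-ing the file to /upload, expecting a BlobDescriptor in response. A minimal caller sketch, assuming the module layout in this commit; the server URL and file path are placeholders:

// Illustrative driver for the Blossom client above (not part of the commit).
use crate::blossom::Blossom;
use anyhow::Result;
use nostr_sdk::Keys;
use std::path::PathBuf;

async fn upload_example() -> Result<()> {
    // Keys implements NostrSigner, so it can sign the authorization event.
    let keys = Keys::generate();
    let blossom = Blossom::new("https://blossom.example.com"); // placeholder server
    let file = PathBuf::from("/tmp/example.mp4"); // placeholder file
    let blob = blossom.upload(&file, &keys, Some("video/mp4")).await?;
    println!("stored {} ({} bytes) at {}", blob.sha256, blob.size, blob.url);
    Ok(())
}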

src/dvm/mod.rs (new file, +81 lines)
@@ -0,0 +1,81 @@
use crate::blossom::Blossom;
use crate::dvm::tiktok::TikTokDvm;
use anyhow::Result;
use nostr_sdk::prelude::DataVendingMachineStatus;
use nostr_sdk::{Client, Event, EventBuilder, EventId, Kind, Tag, Timestamp, Url};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
mod tiktok;

#[derive(Clone)]
pub struct DVMJobRequest {
/// The source event
pub event: Event,
/// Input data for the job (zero or more inputs)
pub inputs: Vec<DVMInput>,
/// Expected output format. Different job request kinds define this more precisely.
pub output_type: Option<String>,
/// Optional parameters for the job as key (first argument) / value (second argument) pairs.
/// Different job request kinds define this more precisely (e.g. [ "param", "lang", "es" ]).
pub params: HashMap<String, String>,
/// Customer MAY specify a maximum amount (in millisats) they are willing to pay
pub bid: Option<u64>,
/// List of relays where Service Providers SHOULD publish responses to
pub relays: Vec<String>,
}
#[derive(Clone)]
pub enum DVMInput {
Url {
url: Url,
relay: Option<String>,
marker: Option<String>,
},
Event {
event: EventId,
relay: Option<String>,
marker: Option<String>,
},
Job {
event: EventId,
relay: Option<String>,
marker: Option<String>,
},
Text {
data: String,
relay: Option<String>,
marker: Option<String>,
},
}
/// Basic DVM handler that accepts a job request
pub trait DVMHandler: Send + Sync {
fn handle_request(
&mut self,
request: DVMJobRequest,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
}
/// Return list of all enabled DVMs
pub fn create_dvms(client: &Client, blossom: &Blossom) -> Vec<Box<dyn DVMHandler>> {
let ret: Vec<Box<dyn DVMHandler>> =
vec![Box::new(TikTokDvm::new(client.clone(), blossom.clone()))];
ret
}
pub fn build_status_for_job(
req: &DVMJobRequest,
status: DataVendingMachineStatus,
extra: Option<&str>,
content: Option<&str>,
) -> EventBuilder {
EventBuilder::new(Kind::JobFeedback, content.unwrap_or("")).tags([
Tag::parse(["status", status.to_string().as_str(), extra.unwrap_or("")]).unwrap(),
Tag::expiration(Timestamp::now() + Duration::from_secs(30)),
Tag::event(req.event.id),
Tag::public_key(req.event.pubkey),
])
}
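Because handle_request returns a boxed future, implementations can be stored as Box<dyn DVMHandler> in the list returned by create_dvms. A hypothetical sketch of the smallest possible handler, reusing this module's imports; EchoDvm is an illustrative name, not part of the commit:

// Hypothetical example: a DVM that does no work and immediately reports success.
struct EchoDvm {
    client: Client,
}

impl DVMHandler for EchoDvm {
    fn handle_request(
        &mut self,
        request: DVMJobRequest,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
        let client = self.client.clone();
        Box::pin(async move {
            let status =
                build_status_for_job(&request, DataVendingMachineStatus::Success, None, None);
            client.send_event_builder(status).await?;
            Ok(())
        })
    }
}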

src/dvm/tiktok.rs (new file, +169 lines)
@@ -0,0 +1,169 @@
use crate::blossom::Blossom;
use crate::dvm::{build_status_for_job, DVMHandler, DVMInput, DVMJobRequest};
use anyhow::{bail, Result};
use log::info;
use nostr_sdk::prelude::DataVendingMachineStatus;
use nostr_sdk::{serde_json, Client, EventBuilder, Kind, Tag};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;
use std::fmt::format;
use std::fs::create_dir_all;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::time::Duration;
use yt_dlp::fetcher::deps::Libraries;
use yt_dlp::utils::executor::Executor;
use yt_dlp::Youtube;

/// Basic DVM which clones videos from TikTok and posts them as kind 22 shorts
pub struct TikTokDvm {
client: Client,
blossom: Blossom,
}
impl TikTokDvm {
pub fn new(client: Client, blossom: Blossom) -> Self {
Self { client, blossom }
}
}
impl DVMHandler for TikTokDvm {
fn handle_request(
&mut self,
request: DVMJobRequest,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
let client = self.client.clone();
let blossom = self.blossom.clone();
Box::pin(async move {
// TODO: process multiple inputs
let input = if let Some(i) = request.inputs.first() {
i
} else {
bail!("No job inputs found")
};
let input = if let DVMInput::Url { url, .. } = input {
url
} else {
bail!("Only URL inputs are accepted")
};
if input.host_str() != Some("www.tiktok.com") {
bail!("Only tiktok urls are accepted");
}
// Send starting status
let starting =
build_status_for_job(&request, DataVendingMachineStatus::Processing, None, None);
client.send_event_builder(starting).await?;
// Prepare the yt-dlp/ffmpeg libraries in a temp directory
let tmp_dir = temp_dir().join("tiktok_dvm");
if !tmp_dir.exists() {
create_dir_all(&tmp_dir)?;
}
let lib_dir = tmp_dir.join("libs");
let libs = Libraries::new(lib_dir.join("yt-dlp"), lib_dir.join("ffmpeg"));
libs.install_youtube().await?;
let dlp = TikTok::new(libs);
let job_id = request.event.id;
let output_file = temp_dir().join(&format!("{}.mp4", request.event.id.to_hex()));
info!("[{}] Starting download of: {}", job_id, input);
dlp.download_video_from_url(input.as_str(), &output_file)
.await?;
info!(
"[{}] Uploading video from: {}",
job_id,
output_file.display()
);
let signer = client.signer().await?;
let res = blossom
.upload(&output_file, &signer, Some("video/mp4"))
.await?;
let info = dlp.get_video_info(input.as_str()).await?;
let mut imeta = vec!["imeta".to_string()];
let Some(nip94) = res.nip94 else {
bail!("Blossom server did not return nip94 metadata")
};
for (k, v) in nip94 {
imeta.push(format!("{} {}", k, v));
}
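// Publish the mirrored video as a kind 22 (short video) event carrying the imeta metadata.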
let ev = EventBuilder::new(Kind::Custom(22), "")
.tag(Tag::parse(imeta)?)
.tags([
Tag::parse(["title", &info.full_title])?,
Tag::parse(["r", input.as_str()])?,
]);
let event_posted = client.send_event_builder(ev).await?;
let job_data = NoviaVideoData {
event_id: event_posted.val.to_hex(),
video: res.sha256,
};
let status = build_status_for_job(
&request,
DataVendingMachineStatus::Success,
None,
Some(&serde_json::to_string(&job_data)?),
);
client.send_event_builder(status).await?;
info!("[{}] Job finished", job_id);
Ok(())
})
}
}
struct TikTok {
libs: Libraries,
}
impl TikTok {
pub fn new(libs: Libraries) -> Self {
Self { libs }
}
pub async fn get_video_info(&self, url: &str) -> Result<TikTokVideo> {
let exe = Executor {
executable_path: self.libs.youtube.clone(),
timeout: Duration::from_secs(60),
args: vec![url.to_string(), "--dump-json".to_string()],
};
let output = exe.execute().await?;
Ok(serde_json::from_str(&output.stdout)?)
}
pub async fn download_video_from_url(&self, url: &str, output: &Path) -> Result<()> {
let exe = Executor {
executable_path: self.libs.youtube.clone(),
timeout: Duration::from_secs(60),
args: vec![
url.to_string(),
"-o".to_string(),
output.to_string_lossy().into_owned(),
],
};
exe.execute().await?;
Ok(())
}
}
#[derive(Deserialize, Serialize)]
struct TikTokVideo {
#[serde(rename = "fulltitle")]
pub full_title: String,
}
#[derive(Deserialize, Serialize)]
struct NoviaVideoData {
event_id: String,
video: String,
}
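On success the handler answers with a Kind::JobFeedback event whose content is the serialized NoviaVideoData. A hedged test sketch (not in the commit) pinning down that payload shape; the field values are placeholders:

#[cfg(test)]
mod tests {
    use super::*;
    use nostr_sdk::serde_json;

    // Hypothetical test: the success payload is plain JSON with the posted
    // event id and the sha256 of the uploaded blob.
    #[test]
    fn novia_payload_shape() {
        let data = NoviaVideoData {
            event_id: "abc123".to_string(), // placeholder event id (hex)
            video: "deadbeef".to_string(),  // placeholder blob sha256
        };
        let json = serde_json::to_string(&data).unwrap();
        assert_eq!(json, r#"{"event_id":"abc123","video":"deadbeef"}"#);
    }
}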

src/main.rs (new file, +235 lines)
@@ -0,0 +1,235 @@
use crate::blossom::Blossom;
use crate::dvm::{build_status_for_job, create_dvms, DVMInput, DVMJobRequest};
use anyhow::{anyhow, Result};
use clap::builder::TypedValueParser;
use clap::Parser;
use log::{error, info, warn};
use nostr_sdk::async_utility::futures_util::SinkExt;
use nostr_sdk::prelude::{DataVendingMachineStatus, JobFeedbackData, StreamExt};
use nostr_sdk::{
ClientBuilder, Event, EventBuilder, EventId, Filter, Keys, Kind, RelayPoolNotification, Tag,
Timestamp,
};
use std::collections::HashMap;

mod blossom;
mod dvm;

#[derive(clap::Parser)]
struct Args {
#[arg(long)]
nsec: Option<String>,
/// One or more relays to listen for events on
#[arg(long)]
relay: Option<Vec<String>>,
/// One or more blossom servers to upload content to
#[arg(long)]
blossom: Vec<String>,
}
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
let args = Args::parse();
let keys = if let Some(keys) = args.nsec {
Keys::parse(&keys)?
} else {
Keys::generate()
};
let client = ClientBuilder::new().signer(keys).build();
if let Some(relays) = &args.relay {
for relay in relays {
info!("Connecting relay {}", &relay);
client.add_relay(relay).await?;
}
} else {
info!("Using default relays");
client.add_relay("wss://relay.damus.io").await?;
client.add_relay("wss://relay.primal.net").await?;
client.add_relay("wss://relay.snort.social").await?;
client.add_relay("wss://relay.nostr.band").await?;
client.add_relay("wss://nos.lol").await?;
}
client.connect().await;
// TODO: support multiple servers
let blossom = Blossom::new(args.blossom.first().ok_or_else(|| anyhow!("at least one --blossom server is required"))?);
let mut dvms = create_dvms(&client, &blossom);
const JOB_REQUEST_KIND: u16 = 5_205;
// https://github.com/teamnovia/novia?tab=readme-ov-file#dvm-archive-aka-download-request-kind-5205
let sub = client
.subscribe(
Filter::new()
.kind(Kind::from(JOB_REQUEST_KIND))
.since(Timestamp::now()),
None,
)
.await?;
info!("Listening for jobs..");
let mut rx = client.notifications();
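// Main loop: dispatch every incoming kind 5205 job request to each registered DVM,
// reporting failures back to the requester as an Error status.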
while let Ok(e) = rx.recv().await {
match e {
RelayPoolNotification::Event { event, .. } => match event.kind.as_u16() {
JOB_REQUEST_KIND => match parse_job_request(&event) {
Ok(req) => {
for dvm in &mut dvms {
if let Err(e) = dvm.handle_request(req.clone()).await {
error!("Error handling job request: {}", e);
let data = build_status_for_job(
&req,
DataVendingMachineStatus::Error,
Some(e.to_string().as_str()),
None,
);
client.send_event_builder(data).await?;
}
}
}
Err(e) => warn!("Invalid job request: {:?}", e),
},
_ => {
warn!("Unknown event kind {}", event.kind);
}
},
RelayPoolNotification::Message { .. } => {}
RelayPoolNotification::Shutdown => {}
}
}
client.unsubscribe(sub.val).await;
Ok(())
}
fn parse_job_request(event: &Event) -> Result<DVMJobRequest> {
let mut inputs = vec![];
for i_tag in event
.tags
.iter()
.filter(|t| t.kind().as_str() == "i")
.map(|t| t.as_slice())
{
if i_tag.len() < 3 {
warn!("Invalid input tag: {}", i_tag.join(", "));
continue;
}
let input = match i_tag[2].as_str() {
"url" => DVMInput::Url {
url: if let Ok(u) = i_tag[1].parse() {
u
} else {
warn!("Invalid url: {}", i_tag[1]);
continue;
},
relay: if i_tag.len() > 3 {
Some(i_tag[3].to_string())
} else {
None
},
marker: if i_tag.len() > 4 {
Some(i_tag[4].to_string())
} else {
None
},
},
"event" => DVMInput::Event {
event: if let Ok(t) = EventId::parse(&i_tag[1]) {
t
} else {
warn!("Invalid event id: {}", i_tag[1]);
continue;
},
relay: if i_tag.len() > 3 {
Some(i_tag[3].to_string())
} else {
None
},
marker: if i_tag.len() > 4 {
Some(i_tag[4].to_string())
} else {
None
},
},
"job" => DVMInput::Job {
event: if let Ok(t) = EventId::parse(&i_tag[1]) {
t
} else {
warn!("Invalid event id in job: {}", i_tag[1]);
continue;
},
relay: if i_tag.len() > 3 {
Some(i_tag[3].to_string())
} else {
None
},
marker: if i_tag.len() > 4 {
Some(i_tag[4].to_string())
} else {
None
},
},
"text" => DVMInput::Text {
data: i_tag[1].to_string(),
relay: if i_tag.len() > 3 {
Some(i_tag[3].to_string())
} else {
None
},
marker: if i_tag.len() > 4 {
Some(i_tag[4].to_string())
} else {
None
},
},
t => {
warn!("unknown tag: {}", t);
continue;
}
};
inputs.push(input);
}
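// Collect ["param", "<key>", "<value>"] tags into the job's key/value parameter map.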
let params: HashMap<String, String> = event
.tags
.iter()
.filter(|t| t.kind().as_str() == "param")
.filter_map(|p| {
let p = p.as_slice();
if p.len() == 3 {
Some((p[1].clone(), p[2].clone()))
} else {
warn!("Invalid param: {}", p.join(", "));
None
}
})
.collect();
Ok(DVMJobRequest {
event: event.clone(),
inputs,
output_type: event
.tags
.iter()
.find(|t| t.kind().as_str() == "output")
.and_then(|t| t.content())
.map(|s| s.to_string()),
params,
bid: event
.tags
.iter()
.find(|t| t.kind().as_str() == "bid")
.and_then(|t| t.content())
.and_then(|t| t.parse::<u64>().ok()),
relays: event
.tags
.iter()
.filter(|t| t.kind().as_str() == "relay")
.map(|c| &c.as_slice()[1..])
.flatten()
.map(|s| s.to_string())
.collect(),
})
}
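For context, a hypothetical requester-side sketch (not part of the commit) of the kind 5205 job this DVM listens for; the relay and TikTok URL are placeholders. The DVM itself would be started along the lines of cargo run -- --blossom <server> [--relay <relay>] [--nsec <key>], per the Args struct above:

// Hypothetical requester: publishes a kind 5205 job with a single URL input,
// shaped the way parse_job_request expects (["i", "<data>", "url", ...]).
use anyhow::Result;
use nostr_sdk::{ClientBuilder, EventBuilder, Keys, Kind, Tag};

async fn request_archive() -> Result<()> {
    let keys = Keys::generate();
    let client = ClientBuilder::new().signer(keys).build();
    client.add_relay("wss://relay.damus.io").await?; // placeholder relay
    client.connect().await;

    let req = EventBuilder::new(Kind::from(5205u16), "").tags([
        Tag::parse(["i", "https://www.tiktok.com/@user/video/123", "url"])?,
        Tag::parse(["output", "video/mp4"])?,
    ]);
    client.send_event_builder(req).await?;
    Ok(())
}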