refactor: frame gen

This commit is contained in:
2025-06-09 13:08:03 +01:00
parent e400e969fd
commit 5d7da09801
15 changed files with 865 additions and 775 deletions

View File

@ -11,18 +11,18 @@ use hyper::service::Service;
use hyper::{Method, Request, Response};
use log::error;
use nostr_sdk::{serde_json, Alphabet, Event, Kind, PublicKey, SingleLetterTag, TagKind};
use serde::{Serialize, Deserialize};
use serde::Serialize;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::fs::File;
use tokio::fs::File;
use tokio::sync::RwLock;
use tokio_util::io::ReaderStream;
use zap_stream_core::viewer::ViewerTracker;
#[derive(Serialize)]
#[derive(Serialize, Clone)]
struct StreamData {
id: String,
title: String,
@ -33,7 +33,7 @@ struct StreamData {
viewer_count: Option<u64>,
}
#[derive(Serialize)]
#[derive(Serialize, Clone)]
struct IndexTemplateData {
public_url: String,
has_streams: bool,
@ -41,7 +41,7 @@ struct IndexTemplateData {
streams: Vec<StreamData>,
}
struct CachedStreams {
pub struct CachedStreams {
data: IndexTemplateData,
cached_at: Instant,
}
@ -57,7 +57,12 @@ pub struct HttpServer {
}
impl HttpServer {
pub fn new(index_template: String, files_dir: PathBuf, api: Api, stream_cache: StreamCache) -> Self {
pub fn new(
index_template: String,
files_dir: PathBuf,
api: Api,
stream_cache: StreamCache,
) -> Self {
Self {
index_template,
files_dir,
@ -70,8 +75,11 @@ impl HttpServer {
Self::get_cached_or_fetch_streams_static(&self.stream_cache, &self.api).await
}
async fn get_cached_or_fetch_streams_static(stream_cache: &StreamCache, api: &Api) -> Result<IndexTemplateData> {
const CACHE_DURATION: Duration = Duration::from_secs(60); // 1 minute
async fn get_cached_or_fetch_streams_static(
stream_cache: &StreamCache,
api: &Api,
) -> Result<IndexTemplateData> {
const CACHE_DURATION: Duration = Duration::from_secs(10);
// Check if we have valid cached data
{
@ -86,7 +94,7 @@ impl HttpServer {
// Cache is expired or missing, fetch new data
let active_streams = api.get_active_streams().await?;
let public_url = api.get_public_url();
let template_data = if !active_streams.is_empty() {
let streams: Vec<StreamData> = active_streams
.into_iter()
@ -94,10 +102,16 @@ impl HttpServer {
let viewer_count = api.get_viewer_count(&stream.id);
StreamData {
id: stream.id.clone(),
title: stream.title.unwrap_or_else(|| format!("Stream {}", &stream.id[..8])),
title: stream
.title
.unwrap_or_else(|| format!("Stream {}", &stream.id[..8])),
summary: stream.summary,
live_url: format!("/{}/live.m3u8", stream.id),
viewer_count: if viewer_count > 0 { Some(viewer_count) } else { None },
viewer_count: if viewer_count > 0 {
Some(viewer_count as _)
} else {
None
},
}
})
.collect();
@ -140,13 +154,18 @@ impl HttpServer {
playlist_path: &PathBuf,
) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
// Extract stream ID from path (e.g., /uuid/live.m3u8 -> uuid)
let path_parts: Vec<&str> = req.uri().path().trim_start_matches('/').split('/').collect();
let path_parts: Vec<&str> = req
.uri()
.path()
.trim_start_matches('/')
.split('/')
.collect();
if path_parts.len() < 2 {
return Ok(Response::builder().status(404).body(BoxBody::default())?);
}
let stream_id = path_parts[0];
// Get client IP and User-Agent for tracking
let client_ip = Self::get_client_ip(req);
let user_agent = req
@ -179,9 +198,10 @@ impl HttpServer {
// Read the playlist file
let playlist_content = tokio::fs::read(playlist_path).await?;
// Parse and modify playlist to add viewer token to URLs
let modified_content = Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?;
let modified_content =
Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?;
Ok(Response::builder()
.header("content-type", "application/vnd.apple.mpegurl")
@ -205,7 +225,7 @@ impl HttpServer {
}
}
}
if let Some(real_ip) = req.headers().get("x-real-ip") {
if let Ok(ip_str) = real_ip.to_str() {
return ip_str.to_string();
@ -220,17 +240,18 @@ impl HttpServer {
// Parse the M3U8 playlist using the m3u8-rs crate
let (_, playlist) = m3u8_rs::parse_playlist(content)
.map_err(|e| anyhow::anyhow!("Failed to parse M3U8 playlist: {}", e))?;
match playlist {
m3u8_rs::Playlist::MasterPlaylist(mut master) => {
// For master playlists, add viewer token to variant streams
for variant in &mut master.variants {
variant.uri = Self::add_token_to_url(&variant.uri, viewer_token);
}
// Write the modified playlist back to string
let mut output = Vec::new();
master.write_to(&mut output)
master
.write_to(&mut output)
.map_err(|e| anyhow::anyhow!("Failed to write master playlist: {}", e))?;
String::from_utf8(output)
.map_err(|e| anyhow::anyhow!("Failed to convert playlist to string: {}", e))
@ -242,7 +263,7 @@ impl HttpServer {
}
}
}
fn add_token_to_url(url: &str, viewer_token: &str) -> String {
if url.contains('?') {
format!("{}&vt={}", url, viewer_token)
@ -264,7 +285,7 @@ impl Service<Request<Incoming>> for HttpServer {
{
let stream_cache = self.stream_cache.clone();
let api = self.api.clone();
// Compile template outside async move for better performance
let template = match mustache::compile_str(&self.index_template) {
Ok(t) => t,
@ -272,40 +293,36 @@ impl Service<Request<Incoming>> for HttpServer {
error!("Failed to compile template: {}", e);
return Box::pin(async move {
Ok(Response::builder()
.status(500)
.body(BoxBody::default()).unwrap())
.status(500)
.body(BoxBody::default())
.unwrap())
});
}
};
return Box::pin(async move {
// Use the existing method to get cached template data
let template_data = Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await;
let template_data =
Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await;
match template_data {
Ok(data) => {
match template.render_to_string(&data) {
Ok(index_html) => Ok(Response::builder()
.header("content-type", "text/html")
.header("server", "zap-stream-core")
.body(
Full::new(Bytes::from(index_html))
.map_err(|e| match e {})
.boxed(),
)?),
Err(e) => {
error!("Failed to render template: {}", e);
Ok(Response::builder()
.status(500)
.body(BoxBody::default())?)
}
Ok(data) => match template.render_to_string(&data) {
Ok(index_html) => Ok(Response::builder()
.header("content-type", "text/html")
.header("server", "zap-stream-core")
.body(
Full::new(Bytes::from(index_html))
.map_err(|e| match e {})
.boxed(),
)?),
Err(e) => {
error!("Failed to render template: {}", e);
Ok(Response::builder().status(500).body(BoxBody::default())?)
}
}
},
Err(e) => {
error!("Failed to fetch template data: {}", e);
Ok(Response::builder()
.status(500)
.body(BoxBody::default())?)
Ok(Response::builder().status(500).body(BoxBody::default())?)
}
}
});
@ -415,12 +432,21 @@ pub fn check_nip98_auth(req: &Request<Incoming>, public_url: &str) -> Result<Aut
// Construct full URI using public_url + path + query
let request_uri = match req.uri().query() {
Some(query) => format!("{}{}?{}", public_url.trim_end_matches('/'), req.uri().path(), query),
Some(query) => format!(
"{}{}?{}",
public_url.trim_end_matches('/'),
req.uri().path(),
query
),
None => format!("{}{}", public_url.trim_end_matches('/'), req.uri().path()),
};
if !url_tag.eq_ignore_ascii_case(&request_uri) {
bail!("Invalid nostr event, URL tag invalid. Expected: {}, Got: {}", request_uri, url_tag);
bail!(
"Invalid nostr event, URL tag invalid. Expected: {}, Got: {}",
request_uri,
url_tag
);
}
// Check method tag

View File

@ -17,14 +17,14 @@ use url::Url;
use uuid::Uuid;
use zap_stream_core::egress::{EgressConfig, EgressSegment};
use zap_stream_core::ingress::ConnectionInfo;
use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer};
use zap_stream_core::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};
use zap_stream_core::pipeline::{EgressType, PipelineConfig};
use zap_stream_core::variant::audio::AudioVariant;
use zap_stream_core::variant::mapping::VariantMapping;
use zap_stream_core::variant::video::VideoVariant;
use zap_stream_core::variant::{StreamMapping, VariantStream};
use zap_stream_core::viewer::ViewerTracker;
use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
use zap_stream_db::{IngestEndpoint, UserStream, UserStreamState, ZapStreamDb};
const STREAM_EVENT_KIND: u16 = 30_311;
@ -353,22 +353,18 @@ impl Overseer for ZapStreamOverseer {
}
// Get ingest endpoint configuration based on connection type
let endpoint_id = self
.detect_endpoint(&connection)
.await?
.ok_or_else(|| anyhow::anyhow!("No ingest endpoints configured"))?;
let endpoint = self
.db
.get_ingest_endpoint(endpoint_id)
.await?
.ok_or_else(|| anyhow::anyhow!("Ingest endpoint not found"))?;
let endpoint = self.detect_endpoint(&connection).await?;
let variants = get_variants_from_endpoint(&stream_info, &endpoint)?;
let cfg = get_variants_from_endpoint(&stream_info, &endpoint)?;
if cfg.video_src.is_none() || cfg.variants.is_empty() {
bail!("No video src found");
}
let mut egress = vec![];
egress.push(EgressType::HLS(EgressConfig {
name: "hls".to_string(),
variants: variants.iter().map(|v| v.id()).collect(),
variants: cfg.variants.iter().map(|v| v.id()).collect(),
}));
let stream_id = Uuid::new_v4();
@ -378,7 +374,7 @@ impl Overseer for ZapStreamOverseer {
user_id: uid,
starts: Utc::now(),
state: UserStreamState::Live,
endpoint_id: Some(endpoint_id),
endpoint_id: Some(endpoint.id),
..Default::default()
};
let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
@ -399,8 +395,11 @@ impl Overseer for ZapStreamOverseer {
Ok(PipelineConfig {
id: stream_id,
variants,
variants: cfg.variants,
egress,
ingress_info: stream_info.clone(),
video_src: cfg.video_src.unwrap().index,
audio_src: cfg.audio_src.map(|s| s.index),
})
}
@ -525,25 +524,29 @@ impl Overseer for ZapStreamOverseer {
impl ZapStreamOverseer {
/// Detect which ingest endpoint should be used based on connection info
async fn detect_endpoint(&self, connection: &ConnectionInfo) -> Result<Option<u64>> {
// Get all ingest endpoints and match by name against connection endpoint
async fn detect_endpoint(&self, connection: &ConnectionInfo) -> Result<IngestEndpoint> {
let endpoints = self.db.get_ingest_endpoints().await?;
for endpoint in &endpoints {
if endpoint.name == connection.endpoint {
return Ok(Some(endpoint.id));
}
}
// No matching endpoint found, use the most expensive one
Ok(endpoints.into_iter().max_by_key(|e| e.cost).map(|e| e.id))
let default = endpoints.iter().max_by_key(|e| e.cost);
Ok(endpoints
.iter()
.find(|e| e.name == connection.endpoint)
.or(default)
.unwrap()
.clone())
}
}
fn get_variants_from_endpoint(
info: &IngressInfo,
struct EndpointConfig<'a> {
video_src: Option<&'a IngressStream>,
audio_src: Option<&'a IngressStream>,
variants: Vec<VariantStream>,
}
fn get_variants_from_endpoint<'a>(
info: &'a IngressInfo,
endpoint: &zap_stream_db::IngestEndpoint,
) -> Result<Vec<VariantStream>> {
) -> Result<EndpointConfig<'a>> {
let capabilities_str = endpoint.capabilities.as_deref().unwrap_or("");
let capabilities: Vec<&str> = capabilities_str.split(',').collect();
@ -658,5 +661,9 @@ fn get_variants_from_endpoint(
// Handle other capabilities like dvr:720h here if needed
}
Ok(vars)
Ok(EndpointConfig {
audio_src,
video_src,
variants: vars,
})
}