diff --git a/Cargo.lock b/Cargo.lock index 4e0898b..c38f299 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1612,6 +1612,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9171a2ea8a68358193d15dd5d70c1c10a2afc3e7e4c5bc92bc9f025cebd7359c" + [[package]] name = "httparse" version = "1.10.0" @@ -2147,8 +2153,7 @@ checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" [[package]] name = "m3u8-rs" version = "6.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f03cd3335fb5f2447755d45cda9c70f76013626a9db44374973791b0926a86c3" +source = "git+https://github.com/v0l/m3u8-rs.git?rev=d76ff96326814237a6d5e92288cdfe7060a43168#d76ff96326814237a6d5e92288cdfe7060a43168" dependencies = [ "chrono", "nom", @@ -5008,6 +5013,7 @@ dependencies = [ "futures-util", "hex", "http-body-util", + "http-range-header", "hyper 1.6.0", "hyper-util", "log 0.4.25", diff --git a/Cargo.toml b/Cargo.toml index 7f09cc9..47e3baa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,6 @@ url = "2.5.0" itertools = "0.14.0" chrono = { version = "^0.4.38", features = ["serde"] } hex = "0.4.3" -m3u8-rs = "6.0.0" +m3u8-rs = { git = "https://github.com/v0l/m3u8-rs.git", rev = "d76ff96326814237a6d5e92288cdfe7060a43168" } sha2 = "0.10.8" data-encoding = "2.9.0" \ No newline at end of file diff --git a/crates/core/src/egress/hls.rs b/crates/core/src/egress/hls.rs index 0653e2f..613f5cc 100644 --- a/crates/core/src/egress/hls.rs +++ b/crates/core/src/egress/hls.rs @@ -1,24 +1,51 @@ use anyhow::Result; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; +use ffmpeg_rs_raw::Encoder; +use std::path::PathBuf; use uuid::Uuid; use crate::egress::{Egress, EgressResult}; -use crate::mux::HlsMuxer; +use crate::mux::{HlsMuxer, SegmentType}; +use crate::variant::VariantStream; /// Alias the muxer directly -pub type HlsEgress = HlsMuxer; +pub struct HlsEgress { + mux: HlsMuxer, +} -impl Egress for HlsMuxer { +impl HlsEgress { + pub const PATH: &'static str = "hls"; + + pub fn new<'a>( + id: &Uuid, + out_dir: &str, + segment_length: f32, + encoders: impl Iterator, + segment_type: SegmentType, + ) -> Result { + Ok(Self { + mux: HlsMuxer::new( + id, + PathBuf::from(out_dir).join(Self::PATH).to_str().unwrap(), + segment_length, + encoders, + segment_type, + )?, + }) + } +} + +impl Egress for HlsEgress { unsafe fn process_pkt( &mut self, packet: *mut AVPacket, variant: &Uuid, ) -> Result { - self.mux_packet(packet, variant) + self.mux.mux_packet(packet, variant) } unsafe fn reset(&mut self) -> Result<()> { - for var in &mut self.variants { + for var in &mut self.mux.variants { var.reset()? } Ok(()) diff --git a/crates/core/src/mux/hls.rs b/crates/core/src/mux/hls.rs index 00bc272..f3e974c 100644 --- a/crates/core/src/mux/hls.rs +++ b/crates/core/src/mux/hls.rs @@ -4,13 +4,13 @@ use anyhow::{bail, ensure, Result}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_free, av_opt_set, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, AVPacket, - AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, + av_free, av_opt_set, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, avio_size, + AVPacket, AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, }; use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use itertools::Itertools; use log::{info, trace, warn}; -use m3u8_rs::MediaSegment; +use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf}; use std::collections::HashMap; use std::fmt::Display; use std::fs::File; @@ -18,7 +18,7 @@ use std::path::PathBuf; use std::ptr; use uuid::Uuid; -#[derive(Clone, Copy)] +#[derive(Clone, Copy, PartialEq)] pub enum SegmentType { MPEGTS, FMP4, @@ -79,14 +79,14 @@ pub struct HlsVariant { streams: Vec, /// Segment length in seconds segment_length: f32, - /// Total number of segments to store for this variant - segment_window: Option, + /// Total number of seconds of video to store + segment_window: f32, /// Current segment index idx: u64, /// Output directory (base) out_dir: String, /// List of segments to be included in the playlist - segments: Vec, + segments: Vec, /// Type of segments to create segment_type: SegmentType, /// Ending presentation timestamp @@ -97,8 +97,30 @@ pub struct HlsVariant { packets_written: u64, /// Reference stream used to track duration ref_stream_index: i32, + /// LL-HLS: Target duration for partial segments + partial_target_duration: f32, + /// HLS-LL: Current partial index + current_partial_index: u64, + /// HLS-LL: Current duration in this partial + current_partial_duration: f64, } +#[derive(PartialEq)] +enum HlsSegment { + Full(SegmentInfo), + Partial(PartialSegmentInfo), +} + +impl HlsSegment { + fn to_media_segment(&self) -> MediaSegmentType { + match self { + HlsSegment::Full(s) => s.to_media_segment(), + HlsSegment::Partial(s) => s.to_media_segment(), + } + } +} + +#[derive(PartialEq)] struct SegmentInfo { index: u64, duration: f32, @@ -106,13 +128,12 @@ struct SegmentInfo { } impl SegmentInfo { - fn to_media_segment(&self) -> MediaSegment { - MediaSegment { + fn to_media_segment(&self) -> MediaSegmentType { + MediaSegmentType::Full(MediaSegment { uri: self.filename(), duration: self.duration, - title: None, ..MediaSegment::default() - } + }) } fn filename(&self) -> String { @@ -120,6 +141,35 @@ impl SegmentInfo { } } +#[derive(PartialEq)] +struct PartialSegmentInfo { + index: u64, + parent_index: u64, + parent_kind: SegmentType, + duration: f64, + independent: bool, + byte_range: Option<(u64, Option)>, +} + +impl PartialSegmentInfo { + fn to_media_segment(&self) -> MediaSegmentType { + MediaSegmentType::Partial(Part { + uri: self.filename(), + duration: self.duration, + independent: self.independent, + gap: false, + byte_range: self.byte_range.map(|r| ByteRange { + length: r.0, + offset: r.1, + }), + }) + } + + fn filename(&self) -> String { + HlsVariant::segment_name(self.parent_kind, self.parent_index) + } +} + impl HlsVariant { pub fn new<'a>( out_dir: &'a str, @@ -207,17 +257,20 @@ impl HlsVariant { Ok(Self { name: name.clone(), segment_length, - segment_window: Some(10), //TODO: configure window + segment_window: 30.0, mux, streams, idx: 1, - segments: Vec::new(), // Start with empty segments list + segments: Vec::new(), out_dir: out_dir.to_string(), segment_type, end_pts: AV_NOPTS_VALUE, duration: 0.0, packets_written: 0, ref_stream_index, + partial_target_duration: 0.33, + current_partial_index: 0, + current_partial_duration: 0.0, }) } @@ -259,20 +312,8 @@ impl HlsVariant { } // check if current packet is keyframe, flush current segment - if self.packets_written > 0 && can_split { - trace!( - "{} segmentation check: pts={}, duration={:.3}, timebase={}/{}, target={:.3}", - self.name, - (*pkt).pts, - self.duration, - (*pkt).time_base.num, - (*pkt).time_base.den, - self.segment_length - ); - - if self.duration >= self.segment_length as f64 { - result = self.split_next_seg()?; - } + if self.packets_written > 1 && can_split && self.duration >= self.segment_length as f64 { + result = self.split_next_seg()?; } // track duration from pts @@ -282,13 +323,22 @@ impl HlsVariant { } let pts_diff = (*pkt).pts - self.end_pts; if pts_diff > 0 { - self.duration += pts_diff as f64 * av_q2d((*pkt).time_base); + let time_delta = pts_diff as f64 * av_q2d((*pkt).time_base); + self.duration += time_delta; + self.current_partial_duration += time_delta; } self.end_pts = (*pkt).pts; } + // write to current segment self.mux.write_packet(pkt)?; self.packets_written += 1; + + // HLS-LL: write next partial segment + if is_ref_pkt && self.current_partial_duration >= self.partial_target_duration as f64 { + self.create_partial_segment(can_split)?; + } + Ok(result) } @@ -296,6 +346,46 @@ impl HlsVariant { self.mux.close() } + /// Create a partial segment for LL-HLS + fn create_partial_segment(&mut self, independent: bool) -> Result<()> { + let ctx = self.mux.context(); + let pos = unsafe { + avio_flush((*ctx).pb); + avio_size((*ctx).pb) as u64 + }; + + let previous_partial_end = self.segments.last().and_then(|s| match &s { + HlsSegment::Partial(p) => p.byte_range.as_ref().map(|(len, start)| start.unwrap_or(0) + len), + _ => None, + }); + let partial_info = PartialSegmentInfo { + index: self.current_partial_index, + parent_index: self.idx, + parent_kind: self.segment_type, + duration: self.current_partial_duration, + independent, + byte_range: match previous_partial_end { + Some(prev_end) => Some((pos - prev_end, Some(prev_end))), + _ => Some((pos, Some(0))), + }, + }; + + trace!( + "{} created partial segment {} [{:.3}s, independent={}]", + self.name, + partial_info.index, + partial_info.duration, + independent + ); + self.segments.push(HlsSegment::Partial(partial_info)); + self.current_partial_index += 1; + self.current_partial_duration = 0.0; + + self.write_playlist()?; + + Ok(()) + } + /// Reset the muxer state and start the next segment unsafe fn split_next_seg(&mut self) -> Result { let completed_segment_idx = self.idx; @@ -383,6 +473,7 @@ impl HlsVariant { warn!("Failed to update playlist: {}", e); } + // Reset counters for next segment self.packets_written = 0; self.duration = 0.0; @@ -400,37 +491,62 @@ impl HlsVariant { /// Add a new segment to the variant and return a list of deleted segments fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> { - self.segments.push(SegmentInfo { + self.segments.push(HlsSegment::Full(SegmentInfo { index: idx, duration, kind: self.segment_type, - }); + })); self.write_playlist() } /// Delete segments which are too old fn clean_segments(&mut self) -> Result> { - const MAX_SEGMENTS: usize = 10; - - let mut ret = vec![]; - if self.segments.len() > MAX_SEGMENTS { - let n_drain = self.segments.len() - MAX_SEGMENTS; - let seg_dir = self.out_dir(); - for seg in self.segments.drain(..n_drain) { - // delete file - let seg_path = seg_dir.join(seg.filename()); - if let Err(e) = std::fs::remove_file(&seg_path) { - warn!( - "Failed to remove segment file: {} {}", - seg_path.display(), - e - ); + let drain_from_hls_segment = { + let mut acc = 0.0; + let mut seg_match = None; + for seg in self + .segments + .iter() + .filter(|e| matches!(e, HlsSegment::Full(_))) + .rev() + { + if acc >= self.segment_window { + seg_match = Some(seg); + break; + } + acc += match seg { + HlsSegment::Full(seg) => seg.duration, + _ => 0.0, + }; + } + seg_match + }; + let mut ret = vec![]; + if let Some(seg_match) = drain_from_hls_segment { + if let Some(drain_pos) = self.segments.iter().position(|e| e == seg_match) { + let seg_dir = self.out_dir(); + for seg in self.segments.drain(..drain_pos) { + match seg { + HlsSegment::Full(seg) => { + let seg_path = seg_dir.join(seg.filename()); + if let Err(e) = std::fs::remove_file(&seg_path) { + warn!( + "Failed to remove segment file: {} {}", + seg_path.display(), + e + ); + } + trace!("Removed segment file: {}", seg_path.display()); + + ret.push(seg); + } + _ => {} + } } - trace!("Removed segment file: {}", seg_path.display()); - ret.push(seg); } } + Ok(ret) } @@ -440,11 +556,20 @@ impl HlsVariant { } let mut pl = m3u8_rs::MediaPlaylist::default(); - // Round up target duration to ensure compliance pl.target_duration = (self.segment_length.ceil() as u64).max(1); pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect(); - pl.version = Some(3); - pl.media_sequence = self.segments.first().map(|s| s.index).unwrap_or(0); + pl.version = Some(6); + pl.part_inf = Some(PartInf { + part_target: self.partial_target_duration as f64, + }); + pl.media_sequence = self + .segments + .iter() + .find_map(|s| match s { + HlsSegment::Full(ss) => Some(ss.index), + _ => None, + }) + .unwrap_or(self.idx); // For live streams, don't set end list pl.end_list = false; @@ -522,6 +647,9 @@ impl HlsMuxer { ) -> Result { let base = PathBuf::from(out_dir).join(id.to_string()); + if !base.exists() { + std::fs::create_dir_all(&base)?; + } let mut vars = Vec::new(); for (k, group) in &encoders .sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id())) diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index 1adff31..f1d0973 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -705,7 +705,7 @@ impl PipelineRunner { let hls = HlsEgress::new( &self.connection.id, &self.out_dir, - 2.0, // TODO: configure segment length + 6.0, // TODO: configure segment length encoders, SegmentType::MPEGTS, )?; diff --git a/crates/zap-stream/Cargo.toml b/crates/zap-stream/Cargo.toml index fe0c9e3..371d109 100644 --- a/crates/zap-stream/Cargo.toml +++ b/crates/zap-stream/Cargo.toml @@ -43,4 +43,5 @@ pretty_env_logger = "0.5.0" clap = { version = "4.5.16", features = ["derive"] } futures-util = "0.3.31" matchit = "0.8.4" -mustache = "0.9.0" \ No newline at end of file +mustache = "0.9.0" +http-range-header = "0.4.2" diff --git a/crates/zap-stream/config.yaml b/crates/zap-stream/config.yaml index f24b6ef..7f6f8af 100755 --- a/crates/zap-stream/config.yaml +++ b/crates/zap-stream/config.yaml @@ -5,6 +5,7 @@ endpoints: - "rtmp://127.0.0.1:3336" - "srt://127.0.0.1:3335" - "tcp://127.0.0.1:3334" + - "test-pattern://" # Public hostname which points to the IP address used to listen for all [endpoints] endpoints_public_hostname: "localhost" diff --git a/crates/zap-stream/src/http.rs b/crates/zap-stream/src/http.rs index c369bda..64020fb 100644 --- a/crates/zap-stream/src/http.rs +++ b/crates/zap-stream/src/http.rs @@ -1,25 +1,36 @@ use crate::api::Api; -use anyhow::{bail, Result}; +use anyhow::{bail, ensure, Context, Result}; use base64::Engine; use bytes::Bytes; use chrono::{DateTime, Utc}; use futures_util::TryStreamExt; use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full, StreamBody}; +use http_range_header::{ + parse_range_header, EndPosition, StartPosition, SyntacticallyCorrectRange, +}; use hyper::body::{Frame, Incoming}; +use hyper::http::response::Builder; use hyper::service::Service; -use hyper::{Method, Request, Response}; -use log::error; +use hyper::{Request, Response, StatusCode}; +use log::{error, warn}; +use matchit::Router; use nostr_sdk::{serde_json, Alphabet, Event, Kind, PublicKey, SingleLetterTag, TagKind}; use serde::Serialize; use std::future::Future; +use std::io::SeekFrom; +use std::ops::Range; use std::path::PathBuf; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::Arc; +use std::task::Poll; use std::time::{Duration, Instant}; use tokio::fs::File; +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; use tokio::sync::RwLock; use tokio_util::io::ReaderStream; +use uuid::Uuid; +use zap_stream_core::egress::hls::HlsEgress; use zap_stream_core::viewer::ViewerTracker; #[derive(Serialize, Clone)] @@ -46,6 +57,14 @@ pub struct CachedStreams { cached_at: Instant, } +#[derive(Clone)] +pub enum HttpServerPath { + Index, + HlsMasterPlaylist, + HlsVariantPlaylist, + HlsSegmentFile, +} + pub type StreamCache = Arc>>; #[derive(Clone)] @@ -54,6 +73,7 @@ pub struct HttpServer { files_dir: PathBuf, api: Api, stream_cache: StreamCache, + router: Router, } impl HttpServer { @@ -63,18 +83,37 @@ impl HttpServer { api: Api, stream_cache: StreamCache, ) -> Self { + let mut router = Router::new(); + router.insert("/", HttpServerPath::Index).unwrap(); + router.insert("/index.html", HttpServerPath::Index).unwrap(); + router + .insert( + format!("/{}/{{stream}}/live.m3u8", HlsEgress::PATH), + HttpServerPath::HlsMasterPlaylist, + ) + .unwrap(); + router + .insert( + format!("/{}/{{stream}}/{{variant}}/live.m3u8", HlsEgress::PATH), + HttpServerPath::HlsVariantPlaylist, + ) + .unwrap(); + router + .insert( + format!("/{}/{{stream}}/{{variant}}/{{seg}}.ts", HlsEgress::PATH), + HttpServerPath::HlsSegmentFile, + ) + .unwrap(); + Self { index_template, files_dir, api, stream_cache, + router, } } - async fn get_cached_or_fetch_streams(&self) -> Result { - Self::get_cached_or_fetch_streams_static(&self.stream_cache, &self.api).await - } - async fn get_cached_or_fetch_streams_static( stream_cache: &StreamCache, api: &Api, @@ -100,13 +139,14 @@ impl HttpServer { .into_iter() .map(|stream| { let viewer_count = api.get_viewer_count(&stream.id); + // TODO: remove HLS assumption StreamData { id: stream.id.clone(), title: stream .title .unwrap_or_else(|| format!("Stream {}", &stream.id[..8])), summary: stream.summary, - live_url: format!("/{}/live.m3u8", stream.id), + live_url: format!("/{}/{}/live.m3u8", HlsEgress::PATH, stream.id), viewer_count: if viewer_count > 0 { Some(viewer_count as _) } else { @@ -141,31 +181,97 @@ impl HttpServer { Ok(template_data) } - async fn render_index(&self) -> Result { - let template_data = self.get_cached_or_fetch_streams().await?; - let template = mustache::compile_str(&self.index_template)?; - let rendered = template.render_to_string(&template_data)?; - Ok(rendered) + async fn handle_index( + api: Api, + stream_cache: StreamCache, + template: String, + ) -> Result>, anyhow::Error> { + // Compile template outside async move for better performance + let template = match mustache::compile_str(&template) { + Ok(t) => t, + Err(e) => { + error!("Failed to compile template: {}", e); + return Ok(Self::base_response().status(500).body(BoxBody::default())?); + } + }; + + let template_data = Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await; + + match template_data { + Ok(data) => match template.render_to_string(&data) { + Ok(index_html) => Ok(Self::base_response() + .header("content-type", "text/html") + .body( + Full::new(Bytes::from(index_html)) + .map_err(|e| match e {}) + .boxed(), + )?), + Err(e) => { + error!("Failed to render template: {}", e); + Ok(Self::base_response().status(500).body(BoxBody::default())?) + } + }, + Err(e) => { + error!("Failed to fetch template data: {}", e); + Ok(Self::base_response().status(500).body(BoxBody::default())?) + } + } } - async fn handle_hls_playlist( - api: &Api, + async fn handle_hls_segment( req: &Request, - playlist_path: &PathBuf, + segment_path: PathBuf, ) -> Result>, anyhow::Error> { - // Extract stream ID from path (e.g., /uuid/live.m3u8 -> uuid) - let path_parts: Vec<&str> = req - .uri() - .path() - .trim_start_matches('/') - .split('/') - .collect(); - if path_parts.len() < 2 { - return Ok(Response::builder().status(404).body(BoxBody::default())?); + let mut response = Self::base_response().header("accept-ranges", "bytes"); + + if let Some(r) = req.headers().get("range") { + if let Ok(ranges) = parse_range_header(r.to_str()?) { + if ranges.ranges.len() > 1 { + warn!("Multipart ranges are not supported, fallback to non-range request"); + Self::path_to_response(segment_path).await + } else { + let file = File::open(&segment_path).await?; + let metadata = file.metadata().await?; + let single_range = ranges.ranges.first().unwrap(); + let range = match RangeBody::get_range(metadata.len(), single_range) { + Ok(r) => r, + Err(e) => { + warn!("Failed to get range: {}", e); + return Ok(response + .status(StatusCode::RANGE_NOT_SATISFIABLE) + .body(BoxBody::default())?); + } + }; + let r_body = RangeBody::new(file, metadata.len(), range.clone()); + + response = response.status(StatusCode::PARTIAL_CONTENT); + let headers = r_body.get_headers(); + for (k, v) in headers { + response = response.header(k, v); + } + let f_stream = ReaderStream::new(r_body); + let body = StreamBody::new( + f_stream + .map_ok(Frame::data) + .map_err(|e| anyhow::anyhow!("Failed to read body: {}", e)), + ) + .boxed(); + Ok(response.body(body)?) + } + } else { + Ok(Self::base_response().status(400).body(BoxBody::default())?) + } + } else { + Self::path_to_response(segment_path).await } + } - let stream_id = path_parts[0]; - + async fn handle_hls_master_playlist( + api: Api, + req: &Request, + stream_id: &str, + playlist_path: PathBuf, + ) -> Result>, anyhow::Error> { // Get client IP and User-Agent for tracking let client_ip = Self::get_client_ip(req); let user_agent = req @@ -203,17 +309,15 @@ impl HttpServer { let modified_content = Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?; - Ok(Response::builder() + let response = Self::base_response() .header("content-type", "application/vnd.apple.mpegurl") - .header("server", "zap-stream-core") - .header("access-control-allow-origin", "*") - .header("access-control-allow-headers", "*") - .header("access-control-allow-methods", "HEAD, GET") .body( Full::new(Bytes::from(modified_content)) .map_err(|e| match e {}) .boxed(), - )?) + )?; + + Ok(response) } fn get_client_ip(req: &Request) -> String { @@ -232,8 +336,8 @@ impl HttpServer { } } - // Fallback to connection IP (note: in real deployment this might be a proxy) - "unknown".to_string() + // use random string as IP to avoid broken view tracker due to proxying + Uuid::new_v4().to_string() } fn add_viewer_token_to_playlist(content: &[u8], viewer_token: &str) -> Result { @@ -271,6 +375,27 @@ impl HttpServer { format!("{}?vt={}", url, viewer_token) } } + + fn base_response() -> Builder { + Response::builder() + .header("server", "zap-stream-core") + .header("access-control-allow-origin", "*") + .header("access-control-allow-headers", "*") + .header("access-control-allow-methods", "HEAD, GET") + } + + /// Get a response object for a file body + async fn path_to_response(path: PathBuf) -> Result>> { + let f = File::open(&path).await?; + let f_stream = ReaderStream::new(f); + let body = StreamBody::new( + f_stream + .map_ok(Frame::data) + .map_err(|e| anyhow::anyhow!("Failed to read body: {}", e)), + ) + .boxed(); + Ok(Self::base_response().body(body)?) + } } impl Service> for HttpServer { @@ -279,89 +404,50 @@ impl Service> for HttpServer { type Future = Pin> + Send>>; fn call(&self, req: Request) -> Self::Future { - // check is index.html - if req.method() == Method::GET && req.uri().path() == "/" - || req.uri().path() == "/index.html" - { - let stream_cache = self.stream_cache.clone(); - let api = self.api.clone(); + let path = req.uri().path().to_owned(); + // request path as a file path pointing to the output directory + let dst_path = self.files_dir.join(req.uri().path()[1..].to_string()); - // Compile template outside async move for better performance - let template = match mustache::compile_str(&self.index_template) { - Ok(t) => t, - Err(e) => { - error!("Failed to compile template: {}", e); + if let Ok(m) = self.router.at(&path) { + match m.value { + HttpServerPath::Index => { + let api = self.api.clone(); + let cache = self.stream_cache.clone(); + let template = self.index_template.clone(); + return Box::pin(async move { Self::handle_index(api, cache, template).await }); + } + HttpServerPath::HlsMasterPlaylist => { + let api = self.api.clone(); + let stream_id = m.params.get("stream").map(|s| s.to_string()); + let file_path = dst_path.clone(); return Box::pin(async move { - Ok(Response::builder() - .status(500) - .body(BoxBody::default()) - .unwrap()) + let stream_id = stream_id.context("stream id missing")?; + Ok( + Self::handle_hls_master_playlist(api, &req, &stream_id, file_path) + .await?, + ) }); } - }; - - return Box::pin(async move { - // Use the existing method to get cached template data - let template_data = - Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await; - - match template_data { - Ok(data) => match template.render_to_string(&data) { - Ok(index_html) => Ok(Response::builder() - .header("content-type", "text/html") - .header("server", "zap-stream-core") - .body( - Full::new(Bytes::from(index_html)) - .map_err(|e| match e {}) - .boxed(), - )?), - Err(e) => { - error!("Failed to render template: {}", e); - Ok(Response::builder().status(500).body(BoxBody::default())?) - } - }, - Err(e) => { - error!("Failed to fetch template data: {}", e); - Ok(Response::builder().status(500).body(BoxBody::default())?) - } + HttpServerPath::HlsVariantPlaylist => { + // let file handler handle this one, may be used later for HLS-LL to create + // delta updates } - }); + HttpServerPath::HlsSegmentFile => { + // handle segment file (range requests) + let file_path = dst_path.clone(); + return Box::pin(async move { + Ok(Self::handle_hls_segment(&req, file_path).await?) + }); + } + } } - // check if mapped to file - let dst_path = self.files_dir.join(req.uri().path()[1..].to_string()); + // check if mapped to file (not handled route) if dst_path.exists() { - let api_clone = self.api.clone(); - return Box::pin(async move { - let rsp = Response::builder() - .header("server", "zap-stream-core") - .header("access-control-allow-origin", "*") - .header("access-control-allow-headers", "*") - .header("access-control-allow-methods", "HEAD, GET"); - - if req.method() == Method::HEAD { - return Ok(rsp.body(BoxBody::default())?); - } - - // Handle HLS playlists with viewer tracking - if req.uri().path().ends_with("/live.m3u8") { - return Self::handle_hls_playlist(&api_clone, &req, &dst_path).await; - } - - // Handle regular files - let f = File::open(&dst_path).await?; - let f_stream = ReaderStream::new(f); - let body = StreamBody::new( - f_stream - .map_ok(Frame::data) - .map_err(|e| Self::Error::new(e)), - ) - .boxed(); - Ok(rsp.body(body)?) - }); + return Box::pin(async move { Self::path_to_response(dst_path).await }); } - // otherwise handle in overseer + // fallback to api handler let api = self.api.clone(); Box::pin(async move { match api.handler(req).await { @@ -466,3 +552,110 @@ pub fn check_nip98_auth(req: &Request, public_url: &str) -> Result) -> Self { + Self { + file, + file_size, + range_start: range.start, + range_end: range.end, + current_offset: 0, + poll_complete: false, + } + } + + pub fn get_range(file_size: u64, header: &SyntacticallyCorrectRange) -> Result> { + let range_start = match header.start { + StartPosition::Index(i) => { + ensure!(i < file_size, "Range start out of range"); + i + } + StartPosition::FromLast(i) => file_size.saturating_sub(i), + }; + let range_end = match header.end { + EndPosition::Index(i) => { + ensure!(i <= file_size, "Range end out of range"); + i + } + EndPosition::LastByte => { + (file_size.saturating_sub(1)).min(range_start + MAX_UNBOUNDED_RANGE) + } + }; + Ok(range_start..range_end) + } + + pub fn get_headers(&self) -> Vec<(&'static str, String)> { + let r_len = (self.range_end - self.range_start) + 1; + vec![ + ("content-length", r_len.to_string()), + ( + "content-range", + format!( + "bytes {}-{}/{}", + self.range_start, self.range_end, self.file_size + ), + ), + ] + } +} + +impl AsyncRead for RangeBody { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let range_start = self.range_start + self.current_offset; + let range_len = self.range_end.saturating_sub(range_start) + 1; + let bytes_to_read = buf.remaining().min(range_len as usize) as u64; + + if bytes_to_read == 0 { + return Poll::Ready(Ok(())); + } + + // when no pending poll, seek to starting position + if !self.poll_complete { + let pinned = pin!(&mut self.file); + pinned.start_seek(SeekFrom::Start(range_start))?; + self.poll_complete = true; + } + + // check poll completion + if self.poll_complete { + let pinned = pin!(&mut self.file); + match pinned.poll_complete(cx) { + Poll::Ready(Ok(_)) => { + self.poll_complete = false; + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, + } + } + + // Read data from the file + let pinned = pin!(&mut self.file); + match pinned.poll_read(cx, buf) { + Poll::Ready(Ok(_)) => { + self.current_offset += bytes_to_read; + Poll::Ready(Ok(())) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => { + self.poll_complete = true; + Poll::Pending + } + } + } +} diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index 8e90270..1b7c225 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use tokio::sync::RwLock; use url::Url; use uuid::Uuid; +use zap_stream_core::egress::hls::HlsEgress; use zap_stream_core::egress::{EgressConfig, EgressSegment}; use zap_stream_core::ingress::ConnectionInfo; use zap_stream_core::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer}; @@ -227,15 +228,18 @@ impl ZapStreamOverseer { stream: &UserStream, pubkey: &Vec, ) -> Result { + // TODO: remove assumption that HLS is enabled + let base_streaming_path = PathBuf::from(HlsEgress::PATH).join(stream.id.to_string()); let extra_tags = vec![ Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?, Tag::parse([ "streaming", - self.map_to_stream_public_url(stream, "live.m3u8")?.as_str(), + self.map_to_public_url(base_streaming_path.join("live.m3u8").to_str().unwrap())? + .as_str(), ])?, Tag::parse([ "image", - self.map_to_stream_public_url(stream, "thumb.webp")? + self.map_to_public_url(base_streaming_path.join("thumb.webp").to_str().unwrap())? .as_str(), ])?, Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?, @@ -248,10 +252,6 @@ impl ZapStreamOverseer { Ok(ev) } - fn map_to_stream_public_url(&self, stream: &UserStream, path: &str) -> Result { - self.map_to_public_url(&format!("{}/{}", stream.id, path)) - } - fn map_to_public_url(&self, path: &str) -> Result { let u: Url = self.public_url.parse()?; Ok(u.join(path)?.to_string()) @@ -433,7 +433,7 @@ impl Overseer for ZapStreamOverseer { .tick_stream(pipeline_id, stream.user_id, duration, cost) .await?; if bal <= 0 { - bail!("Not enough balance"); + bail!("Balance has run out"); } // Update last segment time for this stream @@ -514,6 +514,7 @@ impl Overseer for ZapStreamOverseer { viewer_states.remove(&stream.id); stream.state = UserStreamState::Ended; + stream.ends = Some(Utc::now()); let event = self.publish_stream_event(&stream, &user.pubkey).await?; stream.event = Some(event.as_json()); self.db.update_stream(&stream).await?;