Compare commits

...

12 Commits

Author SHA1 Message Date
2c3ef01d45 fix: docker build 2025-06-17 13:45:38 +01:00
ea33f72069 chore: run tests for docker build
Some checks failed
continuous-integration/drone Build is failing
2025-06-17 13:08:25 +01:00
e91c40806f fix: low latency generator always running
All checks were successful
continuous-integration/drone Build is passing
chore: add HLS generation tests
2025-06-17 12:57:02 +01:00
77eff603d0 fix: disable HLS-LL
All checks were successful
continuous-integration/drone Build is passing
2025-06-17 12:05:12 +01:00
e056e0427f fix: HLS-LL
All checks were successful
continuous-integration/drone Build is passing
refactor: fMP4 (WIP)
2025-06-17 11:48:49 +01:00
a046dc5801 fix: make HLS segment length match encoding params 2025-06-16 13:30:41 +01:00
4787ecd2b4 chore: add more keyframes 2025-06-16 09:58:52 +01:00
e7e1f0299d fix: segment duration calc
Some checks reported errors
continuous-integration/drone Build was killed
feat: add debugging tool for hls segments
2025-06-13 17:42:39 +01:00
338d351727 fix: disable HLS-LL
All checks were successful
continuous-integration/drone Build is passing
fix: thumb.webp path
2025-06-13 13:05:23 +01:00
047b3fec59 fix: hls partial sequencing
All checks were successful
continuous-integration/drone Build is passing
2025-06-13 12:36:20 +01:00
fee5e77407 fix: missing endpoint id 2025-06-13 12:21:40 +01:00
d88f829645 fix: match endpoint 2025-06-13 12:18:39 +01:00
18 changed files with 2125 additions and 242 deletions

View File

@ -1,3 +1,4 @@
**/target/ **/target/
**/.git/ **/.git/
**/out/ **/out/
**/Dockerfile

125
Cargo.lock generated
View File

@ -182,7 +182,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -193,7 +193,7 @@ checksum = "3f934833b4b7233644e5848f235df3f57ed8c80f1528a26c3dfa13d2147fa056"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -365,7 +365,7 @@ dependencies = [
"regex", "regex",
"rustc-hash", "rustc-hash",
"shlex", "shlex",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -674,7 +674,7 @@ dependencies = [
"heck", "heck",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -906,7 +906,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"rustc_version", "rustc_version",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -938,7 +938,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -974,6 +974,16 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "env_filter"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
dependencies = [
"log 0.4.25",
"regex",
]
[[package]] [[package]]
name = "env_logger" name = "env_logger"
version = "0.10.2" version = "0.10.2"
@ -987,6 +997,19 @@ dependencies = [
"termcolor", "termcolor",
] ]
[[package]]
name = "env_logger"
version = "0.11.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f"
dependencies = [
"anstream",
"anstyle",
"env_filter",
"jiff",
"log 0.4.25",
]
[[package]] [[package]]
name = "equivalent" name = "equivalent"
version = "1.0.1" version = "1.0.1"
@ -1282,7 +1305,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -1897,7 +1920,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -2026,6 +2049,30 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
[[package]]
name = "jiff"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
dependencies = [
"jiff-static",
"log 0.4.25",
"portable-atomic",
"portable-atomic-util",
"serde",
]
[[package]]
name = "jiff-static"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.103",
]
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.77" version = "0.3.77"
@ -2153,7 +2200,7 @@ checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]] [[package]]
name = "m3u8-rs" name = "m3u8-rs"
version = "6.0.0" version = "6.0.0"
source = "git+https://github.com/v0l/m3u8-rs.git?rev=d76ff96326814237a6d5e92288cdfe7060a43168#d76ff96326814237a6d5e92288cdfe7060a43168" source = "git+https://git.v0l.io/Kieran/m3u8-rs.git?rev=6803eefca2838a8bfae9e19fd516ef36d7d89997#6803eefca2838a8bfae9e19fd516ef36d7d89997"
dependencies = [ dependencies = [
"chrono", "chrono",
"nom", "nom",
@ -2443,7 +2490,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -2576,7 +2623,7 @@ dependencies = [
"pest_meta", "pest_meta",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -2623,7 +2670,7 @@ checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -2695,6 +2742,15 @@ version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6" checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.20" version = "0.2.20"
@ -2710,7 +2766,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c"
dependencies = [ dependencies = [
"env_logger", "env_logger 0.10.2",
"log 0.4.25", "log 0.4.25",
] ]
@ -2721,7 +2777,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6924ced06e1f7dfe3fa48d57b9f74f55d8915f5036121bef647ef4b204895fac" checksum = "6924ced06e1f7dfe3fa48d57b9f74f55d8915f5036121bef647ef4b204895fac"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -2760,7 +2816,7 @@ dependencies = [
"prost", "prost",
"prost-types", "prost-types",
"regex", "regex",
"syn 2.0.96", "syn 2.0.103",
"tempfile", "tempfile",
] ]
@ -2774,7 +2830,7 @@ dependencies = [
"itertools 0.12.1", "itertools 0.12.1",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -3335,7 +3391,7 @@ checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -3583,7 +3639,7 @@ dependencies = [
"quote", "quote",
"sqlx-core", "sqlx-core",
"sqlx-macros-core", "sqlx-macros-core",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -3606,7 +3662,7 @@ dependencies = [
"sqlx-mysql", "sqlx-mysql",
"sqlx-postgres", "sqlx-postgres",
"sqlx-sqlite", "sqlx-sqlite",
"syn 2.0.96", "syn 2.0.103",
"tempfile", "tempfile",
"tokio", "tokio",
"url", "url",
@ -3831,9 +3887,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.96" version = "2.0.103"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80" checksum = "e4307e30089d6fd6aff212f2da3a1f9e32f3223b1f010fb09b7c95f90f3ca1e8"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -3875,7 +3931,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -3954,7 +4010,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -3965,7 +4021,7 @@ checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -4064,7 +4120,7 @@ checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -4238,7 +4294,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"prost-build", "prost-build",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -4308,7 +4364,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -4595,7 +4651,7 @@ dependencies = [
"log 0.4.25", "log 0.4.25",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@ -4630,7 +4686,7 @@ checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
"wasm-bindgen-backend", "wasm-bindgen-backend",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@ -4993,7 +5049,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
"synstructure 0.13.1", "synstructure 0.13.1",
] ]
@ -5041,8 +5097,10 @@ dependencies = [
"async-trait", "async-trait",
"bytes", "bytes",
"data-encoding", "data-encoding",
"env_logger 0.11.8",
"ffmpeg-rs-raw", "ffmpeg-rs-raw",
"fontdue", "fontdue",
"futures",
"futures-util", "futures-util",
"hex", "hex",
"itertools 0.14.0", "itertools 0.14.0",
@ -5055,6 +5113,7 @@ dependencies = [
"serde", "serde",
"sha2 0.10.8", "sha2 0.10.8",
"srt-tokio", "srt-tokio",
"tempfile",
"tiny-skia", "tiny-skia",
"tokio", "tokio",
"usvg", "usvg",
@ -5090,7 +5149,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]
@ -5110,7 +5169,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
"synstructure 0.13.1", "synstructure 0.13.1",
] ]
@ -5139,7 +5198,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.96", "syn 2.0.103",
] ]
[[package]] [[package]]

View File

@ -24,6 +24,6 @@ url = "2.5.0"
itertools = "0.14.0" itertools = "0.14.0"
chrono = { version = "^0.4.38", features = ["serde"] } chrono = { version = "^0.4.38", features = ["serde"] }
hex = "0.4.3" hex = "0.4.3"
m3u8-rs = { git = "https://github.com/v0l/m3u8-rs.git", rev = "d76ff96326814237a6d5e92288cdfe7060a43168" } m3u8-rs = { git = "https://git.v0l.io/Kieran/m3u8-rs.git", rev = "6803eefca2838a8bfae9e19fd516ef36d7d89997" }
sha2 = "0.10.8" sha2 = "0.10.8"
data-encoding = "2.9.0" data-encoding = "2.9.0"

View File

@ -37,4 +37,9 @@ srt-tokio = { version = "0.4.4", optional = true }
rml_rtmp = { version = "0.8.0", optional = true } rml_rtmp = { version = "0.8.0", optional = true }
bytes = "1.9.0" bytes = "1.9.0"
xflv = "0.4.4" xflv = "0.4.4"
futures = "0.3.30"
[dev-dependencies]
tempfile = "3.8.1"
env_logger = "0.11.3"

View File

@ -19,7 +19,6 @@ impl HlsEgress {
pub fn new<'a>( pub fn new<'a>(
id: &Uuid, id: &Uuid,
out_dir: &str, out_dir: &str,
segment_length: f32,
encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>, encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
segment_type: SegmentType, segment_type: SegmentType,
) -> Result<Self> { ) -> Result<Self> {
@ -27,7 +26,6 @@ impl HlsEgress {
mux: HlsMuxer::new( mux: HlsMuxer::new(
id, id,
PathBuf::from(out_dir).join(Self::PATH).to_str().unwrap(), PathBuf::from(out_dir).join(Self::PATH).to_str().unwrap(),
segment_length,
encoders, encoders,
segment_type, segment_type,
)?, )?,

View File

@ -22,6 +22,7 @@ pub struct FrameGenerator {
width: u16, width: u16,
height: u16, height: u16,
video_sample_fmt: AVPixelFormat, video_sample_fmt: AVPixelFormat,
realtime: bool,
audio_sample_rate: u32, audio_sample_rate: u32,
audio_frame_size: i32, audio_frame_size: i32,
@ -71,6 +72,7 @@ impl FrameGenerator {
fps, fps,
width, width,
height, height,
realtime: true,
video_sample_fmt: pix_fmt, video_sample_fmt: pix_fmt,
audio_sample_rate: sample_rate, audio_sample_rate: sample_rate,
audio_frame_size: frame_size, audio_frame_size: frame_size,
@ -86,6 +88,10 @@ impl FrameGenerator {
}) })
} }
pub fn set_realtime(&mut self, realtime: bool) {
self.realtime = realtime;
}
pub fn from_stream( pub fn from_stream(
video_stream: &IngressStream, video_stream: &IngressStream,
audio_stream: Option<&IngressStream>, audio_stream: Option<&IngressStream>,
@ -263,6 +269,7 @@ impl FrameGenerator {
} }
Ok(()) Ok(())
} }
/// Copy data directly into the frame buffer (must be RGBA data) /// Copy data directly into the frame buffer (must be RGBA data)
pub unsafe fn copy_frame_data(&mut self, data: &[u8]) -> Result<()> { pub unsafe fn copy_frame_data(&mut self, data: &[u8]) -> Result<()> {
if self.next_frame.is_null() { if self.next_frame.is_null() {
@ -354,6 +361,7 @@ impl FrameGenerator {
self.begin()?; self.begin()?;
} }
if self.realtime {
let stream_time = Duration::from_secs_f64( let stream_time = Duration::from_secs_f64(
self.video_pts as f64 / self.pts_per_frame() as f64 / self.fps as f64, self.video_pts as f64 / self.pts_per_frame() as f64 / self.fps as f64,
); );
@ -366,6 +374,7 @@ impl FrameGenerator {
if !wait_time.is_zero() && wait_time.as_secs_f32() > 1f32 / self.fps { if !wait_time.is_zero() && wait_time.as_secs_f32() > 1f32 / self.fps {
std::thread::sleep(wait_time); std::thread::sleep(wait_time);
} }
}
// convert to output pixel format, or just return internal frame if it matches output // convert to output pixel format, or just return internal frame if it matches output
if self.video_sample_fmt != transmute((*self.next_frame).format) { if self.video_sample_fmt != transmute((*self.next_frame).format) {

View File

@ -1,8 +1,10 @@
pub mod egress; pub mod egress;
mod generator;
pub mod ingress; pub mod ingress;
pub mod mux; pub mod mux;
pub mod overseer; pub mod overseer;
pub mod pipeline; pub mod pipeline;
#[cfg(test)]
pub mod test_hls_timing;
pub mod variant; pub mod variant;
pub mod viewer; pub mod viewer;
mod generator;

View File

@ -4,13 +4,13 @@ use anyhow::{bail, ensure, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_free, av_opt_set, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, avio_size, av_free, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, avio_size, AVPacket,
AVPacket, AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
}; };
use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
use itertools::Itertools; use itertools::Itertools;
use log::{info, trace, warn}; use log::{debug, info, trace, warn};
use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf}; use m3u8_rs::{ByteRange, ExtTag, MediaSegment, MediaSegmentType, Part, PartInf, PreloadHint};
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Display; use std::fmt::Display;
use std::fs::File; use std::fs::File;
@ -89,20 +89,24 @@ pub struct HlsVariant {
segments: Vec<HlsSegment>, segments: Vec<HlsSegment>,
/// Type of segments to create /// Type of segments to create
segment_type: SegmentType, segment_type: SegmentType,
/// Ending presentation timestamp /// Timestamp of the start of the current segment
end_pts: i64, current_segment_start: f64,
/// Current segment duration in seconds (precise accumulation) /// Timestamp of the start of the current partial
duration: f64, current_partial_start: f64,
/// Number of packets written to current segment /// Number of packets written to current segment
packets_written: u64, packets_written: u64,
/// Reference stream used to track duration /// Reference stream used to track duration
ref_stream_index: i32, ref_stream_index: i32,
/// HLS-LL: Enable LL-output
low_latency: bool,
/// LL-HLS: Target duration for partial segments /// LL-HLS: Target duration for partial segments
partial_target_duration: f32, partial_target_duration: f32,
/// HLS-LL: Current partial index /// HLS-LL: Current partial index
current_partial_index: u64, current_partial_index: u64,
/// HLS-LL: Current duration in this partial /// HLS-LL: Whether the next partial segment should be marked as independent
current_partial_duration: f64, next_partial_independent: bool,
/// Path to initialization segment for fMP4
init_segment_path: Option<String>,
} }
#[derive(PartialEq)] #[derive(PartialEq)]
@ -114,8 +118,8 @@ enum HlsSegment {
impl HlsSegment { impl HlsSegment {
fn to_media_segment(&self) -> MediaSegmentType { fn to_media_segment(&self) -> MediaSegmentType {
match self { match self {
HlsSegment::Full(s) => s.to_media_segment(), HlsSegment::Full(f) => f.to_media_segment(),
HlsSegment::Partial(s) => s.to_media_segment(), HlsSegment::Partial(p) => p.to_media_segment(),
} }
} }
} }
@ -168,12 +172,21 @@ impl PartialSegmentInfo {
fn filename(&self) -> String { fn filename(&self) -> String {
HlsVariant::segment_name(self.parent_kind, self.parent_index) HlsVariant::segment_name(self.parent_kind, self.parent_index)
} }
/// Byte offset where this partial segment ends
fn end_pos(&self) -> Option<u64> {
self.byte_range
.as_ref()
.map(|(len, start)| start.unwrap_or(0) + len)
}
} }
impl HlsVariant { impl HlsVariant {
const LOW_LATENCY: bool = false;
const LL_PARTS: usize = 3;
pub fn new<'a>( pub fn new<'a>(
out_dir: &'a str, out_dir: &'a str,
segment_length: f32,
group: usize, group: usize,
encoded_vars: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>, encoded_vars: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
segment_type: SegmentType, segment_type: SegmentType,
@ -182,14 +195,6 @@ impl HlsVariant {
let first_seg = Self::map_segment_path(out_dir, &name, 1, segment_type); let first_seg = Self::map_segment_path(out_dir, &name, 1, segment_type);
std::fs::create_dir_all(PathBuf::from(&first_seg).parent().unwrap())?; std::fs::create_dir_all(PathBuf::from(&first_seg).parent().unwrap())?;
let mut opts = HashMap::new();
if let SegmentType::FMP4 = segment_type {
opts.insert("fflags".to_string(), "-autobsf".to_string());
opts.insert(
"movflags".to_string(),
"+frag_custom+dash+delay_moov".to_string(),
);
};
let mut mux = unsafe { let mut mux = unsafe {
Muxer::builder() Muxer::builder()
.with_output_path( .with_output_path(
@ -204,6 +209,7 @@ impl HlsVariant {
let mut streams = Vec::new(); let mut streams = Vec::new();
let mut ref_stream_index = -1; let mut ref_stream_index = -1;
let mut has_video = false; let mut has_video = false;
let mut seg_size = 2.0;
for (var, enc) in encoded_vars { for (var, enc) in encoded_vars {
match var { match var {
@ -218,6 +224,10 @@ impl HlsVariant {
has_video = true; has_video = true;
// Always use video stream as reference for segmentation // Always use video stream as reference for segmentation
ref_stream_index = stream_idx as _; ref_stream_index = stream_idx as _;
let v_seg = v.keyframe_interval as f32 / v.fps;
if v_seg > seg_size {
seg_size = v_seg;
}
}, },
VariantStream::Audio(a) => unsafe { VariantStream::Audio(a) => unsafe {
let stream = mux.add_stream_encoder(enc)?; let stream = mux.add_stream_encoder(enc)?;
@ -251,10 +261,30 @@ impl HlsVariant {
name, name,
ref_stream_index ref_stream_index
); );
let min_segment_length = if Self::LOW_LATENCY {
(seg_size * 3.0).max(6.0) // make segments 3x longer in LL mode or minimum 6s
} else {
2.0
};
let segment_length = seg_size.max(min_segment_length);
let mut opts = HashMap::new();
if let SegmentType::FMP4 = segment_type {
//opts.insert("fflags".to_string(), "-autobsf".to_string());
opts.insert(
"movflags".to_string(),
"+frag_custom+dash+delay_moov".to_string(),
);
};
let mut partial_seg_size = segment_length / Self::LL_PARTS as f32;
partial_seg_size -= partial_seg_size % seg_size; // align to keyframe
unsafe { unsafe {
mux.open(Some(opts))?; mux.open(Some(opts))?;
//av_dump_format(mux.context(), 0, ptr::null_mut(), 0);
} }
Ok(Self {
let mut variant = Self {
name: name.clone(), name: name.clone(),
segment_length, segment_length,
segment_window: 30.0, segment_window: 30.0,
@ -264,14 +294,25 @@ impl HlsVariant {
segments: Vec::new(), segments: Vec::new(),
out_dir: out_dir.to_string(), out_dir: out_dir.to_string(),
segment_type, segment_type,
end_pts: AV_NOPTS_VALUE,
duration: 0.0,
packets_written: 0, packets_written: 0,
ref_stream_index, ref_stream_index,
partial_target_duration: 0.33, partial_target_duration: partial_seg_size,
current_partial_index: 0, current_partial_index: 0,
current_partial_duration: 0.0, current_segment_start: 0.0,
}) current_partial_start: 0.0,
next_partial_independent: false,
low_latency: Self::LOW_LATENCY,
init_segment_path: None,
};
// Create initialization segment for fMP4
if segment_type == SegmentType::FMP4 {
unsafe {
variant.create_init_segment()?;
}
}
Ok(variant)
} }
pub fn segment_name(t: SegmentType, idx: u64) -> String { pub fn segment_name(t: SegmentType, idx: u64) -> String {
@ -299,46 +340,36 @@ impl HlsVariant {
.streams .streams
.add((*pkt).stream_index as usize); .add((*pkt).stream_index as usize);
let pkt_q = av_q2d((*pkt).time_base);
let mut result = EgressResult::None; let mut result = EgressResult::None;
let stream_type = (*(*pkt_stream).codecpar).codec_type; let stream_type = (*(*pkt_stream).codecpar).codec_type;
let mut can_split = stream_type == AVMEDIA_TYPE_VIDEO let mut can_split = stream_type == AVMEDIA_TYPE_VIDEO
&& ((*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY); && ((*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY);
let mut is_ref_pkt = let mut is_ref_pkt = (*pkt).stream_index == self.ref_stream_index;
stream_type == AVMEDIA_TYPE_VIDEO && (*pkt).stream_index == self.ref_stream_index;
if (*pkt).pts == AV_NOPTS_VALUE { if (*pkt).pts == AV_NOPTS_VALUE {
can_split = false; can_split = false;
is_ref_pkt = false; is_ref_pkt = false;
} }
// check if current packet is keyframe, flush current segment if is_ref_pkt && self.packets_written > 0 {
if self.packets_written > 1 && can_split && self.duration >= self.segment_length as f64 { let pkt_pts = (*pkt).pts as f64 * pkt_q;
result = self.split_next_seg()?; let cur_duration = pkt_pts - self.current_segment_start;
} let cur_part_duration = pkt_pts - self.current_partial_start;
// track duration from pts // check if current packet is keyframe, flush current segment
if is_ref_pkt { if can_split && cur_duration >= self.segment_length as f64 {
if self.end_pts == AV_NOPTS_VALUE { result = self.split_next_seg(pkt_pts)?;
self.end_pts = (*pkt).pts; } else if self.low_latency && cur_part_duration >= self.partial_target_duration as f64 {
result = self.create_partial_segment(pkt_pts)?;
self.next_partial_independent = can_split;
} }
let pts_diff = (*pkt).pts - self.end_pts;
if pts_diff > 0 {
let time_delta = pts_diff as f64 * av_q2d((*pkt).time_base);
self.duration += time_delta;
self.current_partial_duration += time_delta;
}
self.end_pts = (*pkt).pts;
} }
// write to current segment // write to current segment
self.mux.write_packet(pkt)?; self.mux.write_packet(pkt)?;
self.packets_written += 1; self.packets_written += 1;
// HLS-LL: write next partial segment
if is_ref_pkt && self.current_partial_duration >= self.partial_target_duration as f64 {
self.create_partial_segment(can_split)?;
}
Ok(result) Ok(result)
} }
@ -347,53 +378,106 @@ impl HlsVariant {
} }
/// Create a partial segment for LL-HLS /// Create a partial segment for LL-HLS
fn create_partial_segment(&mut self, independent: bool) -> Result<()> { fn create_partial_segment(&mut self, next_pkt_start: f64) -> Result<EgressResult> {
let ctx = self.mux.context(); let ctx = self.mux.context();
let pos = unsafe { let end_pos = unsafe {
avio_flush((*ctx).pb); avio_flush((*ctx).pb);
avio_size((*ctx).pb) as u64 avio_size((*ctx).pb) as u64
}; };
let previous_partial_end = self.segments.last().and_then(|s| match &s { ensure!(end_pos > 0, "End position cannot be 0");
HlsSegment::Partial(p) => p.byte_range.as_ref().map(|(len, start)| start.unwrap_or(0) + len), if self.segment_type == SegmentType::MPEGTS {
ensure!(
end_pos % 188 == 0,
"Invalid end position, must be multiple of 188"
);
}
let previous_end_pos = self
.segments
.last()
.and_then(|s| match &s {
HlsSegment::Partial(p) => p.end_pos(),
_ => None, _ => None,
}); })
.unwrap_or(0);
let partial_size = end_pos - previous_end_pos;
let partial_info = PartialSegmentInfo { let partial_info = PartialSegmentInfo {
index: self.current_partial_index, index: self.current_partial_index,
parent_index: self.idx, parent_index: self.idx,
parent_kind: self.segment_type, parent_kind: self.segment_type,
duration: self.current_partial_duration, duration: next_pkt_start - self.current_partial_start,
independent, independent: self.next_partial_independent,
byte_range: match previous_partial_end { byte_range: Some((partial_size, Some(previous_end_pos))),
Some(prev_end) => Some((pos - prev_end, Some(prev_end))),
_ => Some((pos, Some(0))),
},
}; };
trace!( debug!(
"{} created partial segment {} [{:.3}s, independent={}]", "{} created partial segment {} [{:.3}s, independent={}]",
self.name, self.name, partial_info.index, partial_info.duration, partial_info.independent,
partial_info.index,
partial_info.duration,
independent
); );
self.segments.push(HlsSegment::Partial(partial_info)); self.segments.push(HlsSegment::Partial(partial_info));
self.current_partial_index += 1; self.current_partial_index += 1;
self.current_partial_duration = 0.0; self.next_partial_independent = false;
self.current_partial_start = next_pkt_start;
self.write_playlist()?; self.write_playlist()?;
Ok(EgressResult::None)
}
/// Create initialization segment for fMP4
unsafe fn create_init_segment(&mut self) -> Result<()> {
if self.segment_type != SegmentType::FMP4 || self.init_segment_path.is_some() {
return Ok(());
}
let init_path = PathBuf::from(&self.out_dir)
.join(&self.name)
.join("init.mp4")
.to_string_lossy()
.to_string();
// Create a temporary muxer for initialization segment
let mut init_opts = HashMap::new();
init_opts.insert(
"movflags".to_string(),
"+frag_custom+dash+delay_moov".to_string(),
);
let mut init_mux = Muxer::builder()
.with_output_path(init_path.as_str(), Some("mp4"))?
.build()?;
// Copy stream parameters from main muxer
let main_ctx = self.mux.context();
for i in 0..(*main_ctx).nb_streams {
let src_stream = *(*main_ctx).streams.add(i as usize);
let s = init_mux.add_copy_stream(src_stream)?;
ensure!((*s).index == (*src_stream).index, "Stream index mismatch");
}
init_mux.open(Some(init_opts))?;
av_write_frame(init_mux.context(), ptr::null_mut());
init_mux.close()?;
self.init_segment_path = Some("init.mp4".to_string());
info!("Created fMP4 initialization segment: {}", init_path);
Ok(()) Ok(())
} }
/// Reset the muxer state and start the next segment /// Reset the muxer state and start the next segment
unsafe fn split_next_seg(&mut self) -> Result<EgressResult> { unsafe fn split_next_seg(&mut self, next_pkt_start: f64) -> Result<EgressResult> {
let completed_segment_idx = self.idx; let completed_segment_idx = self.idx;
self.idx += 1; self.idx += 1;
self.current_partial_index = 0;
// Manually reset muxer avio // Manually reset muxer avio
let ctx = self.mux.context(); let ctx = self.mux.context();
av_write_frame(ctx, ptr::null_mut()); let ret = av_write_frame(ctx, ptr::null_mut());
if ret < 0 {
bail!("Failed to split segment {}", ret);
}
avio_flush((*ctx).pb); avio_flush((*ctx).pb);
avio_close((*ctx).pb); avio_close((*ctx).pb);
av_free((*ctx).url as *mut _); av_free((*ctx).url as *mut _);
@ -407,14 +491,6 @@ impl HlsVariant {
bail!("Failed to re-init avio"); bail!("Failed to re-init avio");
} }
// tell muxer it needs to write headers again
av_opt_set(
(*ctx).priv_data,
cstr!("events_flags"),
cstr!("resend_headers"),
0,
);
// Log the completed segment (previous index), not the next one // Log the completed segment (previous index), not the next one
let completed_seg_path = Self::map_segment_path( let completed_seg_path = Self::map_segment_path(
&self.out_dir, &self.out_dir,
@ -427,13 +503,15 @@ impl HlsVariant {
.metadata() .metadata()
.map(|m| m.len()) .map(|m| m.len())
.unwrap_or(0); .unwrap_or(0);
info!(
let cur_duration = next_pkt_start - self.current_segment_start;
debug!(
"Finished segment {} [{:.3}s, {:.2} kB, {} pkts]", "Finished segment {} [{:.3}s, {:.2} kB, {} pkts]",
completed_segment_path completed_segment_path
.file_name() .file_name()
.unwrap_or_default() .unwrap_or_default()
.to_string_lossy(), .to_string_lossy(),
self.duration, cur_duration,
segment_size as f32 / 1024f32, segment_size as f32 / 1024f32,
self.packets_written self.packets_written
); );
@ -465,17 +543,25 @@ impl HlsVariant {
let created = EgressSegment { let created = EgressSegment {
variant: video_var_id, variant: video_var_id,
idx: completed_segment_idx, idx: completed_segment_idx,
duration: self.duration as f32, duration: cur_duration as f32,
path: completed_segment_path, path: completed_segment_path,
}; };
if let Err(e) = self.push_segment(completed_segment_idx, self.duration as f32) { self.segments.push(HlsSegment::Full(SegmentInfo {
warn!("Failed to update playlist: {}", e); index: completed_segment_idx,
} duration: if self.playlist_version() >= 6 {
cur_duration.round() as _
} else {
cur_duration as _
},
kind: self.segment_type,
}));
self.write_playlist()?;
// Reset counters for next segment // Reset counters for next segment
self.packets_written = 0; self.packets_written = 0;
self.duration = 0.0; self.current_segment_start = next_pkt_start;
Ok(EgressResult::Segments { Ok(EgressResult::Segments {
created: vec![created], created: vec![created],
@ -489,17 +575,6 @@ impl HlsVariant {
.find(|a| matches!(*a, HlsVariantStream::Video { .. })) .find(|a| matches!(*a, HlsVariantStream::Video { .. }))
} }
/// Add a new segment to the variant and return a list of deleted segments
fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
self.segments.push(HlsSegment::Full(SegmentInfo {
index: idx,
duration,
kind: self.segment_type,
}));
self.write_playlist()
}
/// Delete segments which are too old /// Delete segments which are too old
fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> { fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> {
let drain_from_hls_segment = { let drain_from_hls_segment = {
@ -550,18 +625,56 @@ impl HlsVariant {
Ok(ret) Ok(ret)
} }
fn playlist_version(&self) -> usize {
if self.low_latency {
6
} else if self.segment_type == SegmentType::FMP4 {
6 // EXT-X-MAP without I-FRAMES-ONLY
} else {
3
}
}
fn write_playlist(&mut self) -> Result<()> { fn write_playlist(&mut self) -> Result<()> {
if self.segments.is_empty() { if self.segments.is_empty() {
return Ok(()); // Don't write empty playlists return Ok(()); // Don't write empty playlists
} }
let mut pl = m3u8_rs::MediaPlaylist::default(); let mut pl = m3u8_rs::MediaPlaylist::default();
pl.target_duration = (self.segment_length.ceil() as u64).max(1);
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect(); pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
pl.version = Some(6);
// Add EXT-X-MAP initialization segment for fMP4
if self.segment_type == SegmentType::FMP4 {
if let Some(ref init_path) = self.init_segment_path {
pl.unknown_tags.push(ExtTag {
tag: "X-MAP".to_string(),
rest: Some(format!("URI=\"{}\"", init_path)),
});
}
}
// append segment preload for next part segment
if let Some(HlsSegment::Partial(partial)) = self.segments.last() {
// TODO: try to estimate if there will be another partial segment
pl.segments.push(MediaSegmentType::PreloadHint(PreloadHint {
hint_type: "PART".to_string(),
uri: partial.filename(),
byte_range_start: partial.end_pos(),
byte_range_length: None,
}));
}
pl.version = Some(self.playlist_version());
pl.target_duration = if self.playlist_version() >= 6 {
self.segment_length.round() as _
} else {
self.segment_length
};
if self.low_latency {
pl.part_inf = Some(PartInf { pl.part_inf = Some(PartInf {
part_target: self.partial_target_duration as f64, part_target: self.partial_target_duration as f64,
}); });
}
pl.media_sequence = self pl.media_sequence = self
.segments .segments
.iter() .iter()
@ -570,7 +683,6 @@ impl HlsVariant {
_ => None, _ => None,
}) })
.unwrap_or(self.idx); .unwrap_or(self.idx);
// For live streams, don't set end list
pl.end_list = false; pl.end_list = false;
let mut f_out = File::create(self.out_dir().join("live.m3u8"))?; let mut f_out = File::create(self.out_dir().join("live.m3u8"))?;
@ -578,31 +690,48 @@ impl HlsVariant {
Ok(()) Ok(())
} }
/// https://git.ffmpeg.org/gitweb/ffmpeg.git/blob/HEAD:/libavformat/hlsenc.c#l351 unsafe fn to_codec_attr(&self) -> Option<String> {
unsafe fn to_codec_attr(&self, stream: *mut AVStream) -> Option<String> { let mut codecs = Vec::new();
let p = (*stream).codecpar;
// Find video and audio streams and build codec string
for stream in &self.streams {
let av_stream = *(*self.mux.context()).streams.add(*stream.index());
let p = (*av_stream).codecpar;
match stream {
HlsVariantStream::Video { .. } => {
if (*p).codec_id == AV_CODEC_ID_H264 { if (*p).codec_id == AV_CODEC_ID_H264 {
let data = (*p).extradata; // Use profile and level from codec parameters
if !data.is_null() { let profile_idc = (*p).profile as u8;
let mut id_ptr = ptr::null_mut(); let level_idc = (*p).level as u8;
let ds: *mut u16 = data as *mut u16;
if (*ds) == 1 && (*data.add(4)) & 0x1F == 7 { // For H.264, constraint flags are typically 0 unless specified
id_ptr = data.add(5); // Common constraint flags: 0x40 (constraint_set1_flag) for baseline
} else if (*ds) == 1 && (*data.add(3)) & 0x1F == 7 { let constraint_flags = match profile_idc {
id_ptr = data.add(4); 66 => 0x40, // Baseline profile
} else if *data.add(0) == 1 { _ => 0x00, // Main/High profiles typically have no constraints
id_ptr = data.add(1); };
} else {
return None; let avc1_code = format!(
"avc1.{:02x}{:02x}{:02x}",
profile_idc, constraint_flags, level_idc
);
codecs.push(avc1_code);
}
}
HlsVariantStream::Audio { .. } => {
// Standard AAC-LC codec string
codecs.push("mp4a.40.2".to_string());
}
_ => {}
}
} }
return Some(format!( if codecs.is_empty() {
"avc1.{}",
hex::encode([*id_ptr.add(0), *id_ptr.add(1), *id_ptr.add(2)])
));
}
}
None None
} else {
Some(codecs.join(","))
}
} }
pub fn to_playlist_variant(&self) -> m3u8_rs::VariantStream { pub fn to_playlist_variant(&self) -> m3u8_rs::VariantStream {
@ -613,9 +742,9 @@ impl HlsVariant {
m3u8_rs::VariantStream { m3u8_rs::VariantStream {
is_i_frame: false, is_i_frame: false,
uri: format!("{}/live.m3u8", self.name), uri: format!("{}/live.m3u8", self.name),
bandwidth: 0, bandwidth: (*codec_par).bit_rate as u64,
average_bandwidth: Some((*codec_par).bit_rate as u64), average_bandwidth: None,
codecs: self.to_codec_attr(av_stream), codecs: self.to_codec_attr(),
resolution: Some(m3u8_rs::Resolution { resolution: Some(m3u8_rs::Resolution {
width: (*codec_par).width as _, width: (*codec_par).width as _,
height: (*codec_par).height as _, height: (*codec_par).height as _,
@ -641,7 +770,6 @@ impl HlsMuxer {
pub fn new<'a>( pub fn new<'a>(
id: &Uuid, id: &Uuid,
out_dir: &str, out_dir: &str,
segment_length: f32,
encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>, encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
segment_type: SegmentType, segment_type: SegmentType,
) -> Result<Self> { ) -> Result<Self> {
@ -655,13 +783,7 @@ impl HlsMuxer {
.sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id())) .sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id()))
.chunk_by(|a| a.0.group_id()) .chunk_by(|a| a.0.group_id())
{ {
let var = HlsVariant::new( let var = HlsVariant::new(base.to_str().unwrap(), k, group, segment_type)?;
base.to_str().unwrap(),
segment_length,
k,
group,
segment_type,
)?;
vars.push(var); vars.push(var);
} }

View File

@ -27,7 +27,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
use ffmpeg_rs_raw::{ use ffmpeg_rs_raw::{
cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, Encoder, Resample, Scaler, StreamType, cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, Encoder, Resample, Scaler, StreamType,
}; };
use log::{error, info, warn}; use log::{debug, error, info, warn};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use uuid::Uuid; use uuid::Uuid;
@ -208,13 +208,15 @@ impl PipelineRunner {
unsafe fn generate_thumb_from_frame(&mut self, frame: *mut AVFrame) -> Result<()> { unsafe fn generate_thumb_from_frame(&mut self, frame: *mut AVFrame) -> Result<()> {
if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 { if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 {
let frame = av_frame_clone(frame).addr(); let frame = av_frame_clone(frame).addr();
let dst_pic = PathBuf::from(&self.out_dir) let dir = PathBuf::from(&self.out_dir).join(self.connection.id.to_string());
.join(self.connection.id.to_string()) if !dir.exists() {
.join("thumb.webp"); std::fs::create_dir_all(&dir)?;
}
std::thread::spawn(move || unsafe { std::thread::spawn(move || unsafe {
let mut frame = frame as *mut AVFrame; //TODO: danger?? let mut frame = frame as *mut AVFrame; //TODO: danger??
let thumb_start = Instant::now(); let thumb_start = Instant::now();
let dst_pic = dir.join("thumb.webp");
if let Err(e) = Self::save_thumb(frame, &dst_pic) { if let Err(e) = Self::save_thumb(frame, &dst_pic) {
warn!("Failed to save thumb: {}", e); warn!("Failed to save thumb: {}", e);
} }
@ -609,7 +611,7 @@ impl PipelineRunner {
let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32(); let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32();
if elapsed >= 2f32 { if elapsed >= 2f32 {
let n_frames = self.frame_ctr - self.fps_last_frame_ctr; let n_frames = self.frame_ctr - self.fps_last_frame_ctr;
info!("Average fps: {:.2}", n_frames as f32 / elapsed); debug!("Average fps: {:.2}", n_frames as f32 / elapsed);
self.fps_counter_start = Instant::now(); self.fps_counter_start = Instant::now();
self.fps_last_frame_ctr = self.frame_ctr; self.fps_last_frame_ctr = self.frame_ctr;
} }
@ -705,7 +707,6 @@ impl PipelineRunner {
let hls = HlsEgress::new( let hls = HlsEgress::new(
&self.connection.id, &self.connection.id,
&self.out_dir, &self.out_dir,
6.0, // TODO: configure segment length
encoders, encoders,
SegmentType::MPEGTS, SegmentType::MPEGTS,
)?; )?;

View File

@ -0,0 +1,934 @@
use crate::generator::FrameGenerator;
use crate::mux::{HlsMuxer, SegmentType};
use crate::variant::audio::AudioVariant;
use crate::variant::mapping::VariantMapping;
use crate::variant::video::VideoVariant;
use crate::variant::{StreamMapping, VariantStream};
use anyhow::{Context, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_q2d, AVMediaType::AVMEDIA_TYPE_AUDIO, AVMediaType::AVMEDIA_TYPE_VIDEO,
AVPixelFormat::AV_PIX_FMT_YUV420P, AVRational, AVSampleFormat::AV_SAMPLE_FMT_FLTP,
AV_NOPTS_VALUE, AV_PROFILE_H264_MAIN,
};
use ffmpeg_rs_raw::{Demuxer, Encoder};
use m3u8_rs::{parse_media_playlist, MediaSegmentType};
use std::collections::HashMap;
use std::fs;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct HlsTimingResult {
pub playlist_duration: f32,
pub actual_duration: f64,
pub video_duration: f64,
pub audio_duration: f64,
pub difference: f64,
pub segment_name: String,
pub is_partial: bool,
pub independent: bool,
}
#[derive(Debug)]
pub struct HlsTimingTestResult {
pub total_segments: usize,
pub full_segments: usize,
pub partial_segments: usize,
pub independent_partials: usize,
pub total_playlist_duration: f32,
pub total_actual_duration: f64,
pub total_difference: f64,
pub average_difference: f64,
pub min_difference: f64,
pub max_difference: f64,
pub problematic_segments: Vec<HlsTimingResult>,
pub segments: Vec<HlsTimingResult>,
pub test_duration: Duration,
pub success: bool,
pub error_message: Option<String>,
}
impl HlsTimingTestResult {
/// Check if the HLS timing test passed based on thresholds
pub fn passes(&self, max_avg_diff: f64, max_individual_diff: f64) -> bool {
self.success
&& self.average_difference.abs() <= max_avg_diff
&& self
.problematic_segments
.iter()
.all(|s| s.difference.abs() <= max_individual_diff)
}
/// Get a summary of the test results
pub fn summary(&self) -> String {
if !self.success {
return format!(
"FAILED: {}",
self.error_message.as_deref().unwrap_or("Unknown error")
);
}
format!(
"PASSED: {} segments, avg diff: {:.3}s, {} problematic",
self.total_segments,
self.average_difference,
self.problematic_segments.len()
)
}
}
pub struct HlsTimingTester {
max_avg_difference: f64,
max_individual_difference: f64,
problematic_threshold: f64,
}
impl Default for HlsTimingTester {
fn default() -> Self {
Self {
max_avg_difference: 0.1, // 100ms average difference
max_individual_difference: 0.5, // 500ms individual difference
problematic_threshold: 0.2, // 200ms considered problematic
}
}
}
impl HlsTimingTester {
pub fn new(max_avg_diff: f64, max_individual_diff: f64, problematic_threshold: f64) -> Self {
Self {
max_avg_difference: max_avg_diff,
max_individual_difference: max_individual_diff,
problematic_threshold,
}
}
/// Generate and test HLS stream with test pattern
pub fn test_generated_stream(
&self,
output_dir: &Path,
duration_seconds: f32,
segment_type: SegmentType,
) -> Result<HlsTimingTestResult> {
let start_time = Instant::now();
// Generate test stream
let stream_id = Uuid::new_v4();
let hls_dir =
self.generate_test_stream(output_dir, &stream_id, duration_seconds, segment_type)?;
// Test the generated stream
match self.test_stream_timing_internal(&hls_dir) {
Ok(mut result) => {
result.test_duration = start_time.elapsed();
result.success =
result.passes(self.max_avg_difference, self.max_individual_difference);
Ok(result)
}
Err(e) => Ok(HlsTimingTestResult {
total_segments: 0,
full_segments: 0,
partial_segments: 0,
independent_partials: 0,
total_playlist_duration: 0.0,
total_actual_duration: 0.0,
total_difference: 0.0,
average_difference: 0.0,
min_difference: 0.0,
max_difference: 0.0,
problematic_segments: Vec::new(),
segments: Vec::new(),
test_duration: start_time.elapsed(),
success: false,
error_message: Some(e.to_string()),
}),
}
}
/// Generate test HLS stream with test pattern
fn generate_test_stream(
&self,
output_dir: &Path,
stream_id: &Uuid,
duration_seconds: f32,
segment_type: SegmentType,
) -> Result<PathBuf> {
const VIDEO_FPS: f32 = 30.0;
const VIDEO_WIDTH: u16 = 1280;
const VIDEO_HEIGHT: u16 = 720;
const SAMPLE_RATE: u32 = 44100;
// Create video encoder
let mut video_encoder = unsafe {
Encoder::new_with_name("libx264")?
.with_stream_index(0)
.with_framerate(VIDEO_FPS)?
.with_bitrate(1_000_000)
.with_pix_fmt(AV_PIX_FMT_YUV420P)
.with_width(VIDEO_WIDTH as _)
.with_height(VIDEO_HEIGHT as _)
.with_level(51)
.with_profile(AV_PROFILE_H264_MAIN)
.open(None)?
};
// Create audio encoder
let mut audio_encoder = unsafe {
Encoder::new_with_name("aac")?
.with_stream_index(1)
.with_default_channel_layout(1)
.with_bitrate(128_000)
.with_sample_format(AV_SAMPLE_FMT_FLTP)
.with_sample_rate(SAMPLE_RATE as _)?
.open(None)?
};
// Create variant streams
let video_stream = VideoVariant {
mapping: VariantMapping {
id: Uuid::new_v4(),
src_index: 0,
dst_index: 0,
group_id: 0,
},
width: VIDEO_WIDTH,
height: VIDEO_HEIGHT,
fps: VIDEO_FPS,
bitrate: 1_000_000,
codec: "libx264".to_string(),
profile: AV_PROFILE_H264_MAIN as usize,
level: 51,
keyframe_interval: 60,
pixel_format: AV_PIX_FMT_YUV420P as u32,
};
let audio_stream = AudioVariant {
mapping: VariantMapping {
id: Uuid::new_v4(),
src_index: 1,
dst_index: 1,
group_id: 0,
},
bitrate: 128_000,
codec: "aac".to_string(),
channels: 1,
sample_rate: SAMPLE_RATE as usize,
sample_fmt: "fltp".to_string(),
};
let video_variant = VariantStream::Video(video_stream.clone());
let audio_variant = VariantStream::Audio(audio_stream.clone());
let variants = vec![
(&video_variant, &video_encoder),
(&audio_variant, &audio_encoder),
];
// Create HLS muxer
let mut hls_muxer = HlsMuxer::new(
&stream_id,
output_dir.to_str().unwrap(),
variants.into_iter(),
segment_type,
)?;
// Create frame generator
let frame_size = unsafe { (*audio_encoder.codec_context()).frame_size as _ };
let mut frame_gen = FrameGenerator::new(
VIDEO_FPS,
VIDEO_WIDTH,
VIDEO_HEIGHT,
AV_PIX_FMT_YUV420P,
SAMPLE_RATE,
frame_size,
1,
AVRational {
num: 1,
den: VIDEO_FPS as i32,
},
AVRational {
num: 1,
den: SAMPLE_RATE as i32,
},
)?;
frame_gen.set_realtime(false);
// Generate frames for the specified duration
let total_video_frames = (duration_seconds * VIDEO_FPS) as u64;
let mut video_frames_generated = 0;
while video_frames_generated < total_video_frames {
unsafe {
frame_gen.begin()?;
frame_gen.write_text(
&format!("Video Frame: {}", video_frames_generated),
40.0,
50.0,
50.0,
)?;
frame_gen.write_text(
&format!("Time: {:.1}s", video_frames_generated as f32 / VIDEO_FPS),
40.0,
50.0,
100.0,
)?;
let mut frame = frame_gen.next()?;
if frame.is_null() {
log::warn!("FrameGenerator returned null frame unexpectedly");
break;
}
// Determine if this is audio or video frame and encode accordingly
if (*frame).sample_rate > 0 {
// Audio frame - don't increment video counter
log::debug!("Generated audio frame, PTS: {}", (*frame).pts);
for mut pkt in audio_encoder.encode_frame(frame)? {
let result = hls_muxer.mux_packet(pkt, &audio_stream.id())?;
if let crate::egress::EgressResult::Segments {
created,
deleted: _,
} = result
{
for segment in created {
log::debug!("Created audio segment: {:?}", segment.path);
}
}
ffmpeg_rs_raw::ffmpeg_sys_the_third::av_packet_free(&mut pkt);
}
} else {
// Video frame - increment video counter
log::debug!(
"Generated video frame {}, PTS: {}",
video_frames_generated,
(*frame).pts
);
for mut pkt in video_encoder.encode_frame(frame)? {
let result = hls_muxer.mux_packet(pkt, &video_stream.id())?;
if let crate::egress::EgressResult::Segments {
created,
deleted: _,
} = result
{
for segment in created {
log::debug!("Created video segment: {:?}", segment.path);
}
}
ffmpeg_rs_raw::ffmpeg_sys_the_third::av_packet_free(&mut pkt);
}
video_frames_generated += 1;
}
ffmpeg_rs_raw::ffmpeg_sys_the_third::av_frame_free(&mut frame);
}
}
// Flush encoders to ensure all packets are written
unsafe {
// Flush video encoder
for mut pkt in video_encoder.encode_frame(std::ptr::null_mut())? {
hls_muxer.mux_packet(pkt, &video_stream.id())?;
ffmpeg_rs_raw::ffmpeg_sys_the_third::av_packet_free(&mut pkt);
}
// Flush audio encoder
for mut pkt in audio_encoder.encode_frame(std::ptr::null_mut())? {
hls_muxer.mux_packet(pkt, &audio_stream.id())?;
ffmpeg_rs_raw::ffmpeg_sys_the_third::av_packet_free(&mut pkt);
}
}
log::info!(
"Generated {} video frames ({:.1}s) of test HLS stream at",
video_frames_generated,
video_frames_generated as f32 / VIDEO_FPS
);
Ok(output_dir.join(stream_id.to_string()).join("stream_0"))
}
/// Test HLS timing for a specific stream directory
pub fn test_stream_timing(&self, hls_dir: &Path) -> HlsTimingTestResult {
let start_time = Instant::now();
match self.test_stream_timing_internal(hls_dir) {
Ok(mut result) => {
result.test_duration = start_time.elapsed();
result.success =
result.passes(self.max_avg_difference, self.max_individual_difference);
result
}
Err(e) => HlsTimingTestResult {
total_segments: 0,
full_segments: 0,
partial_segments: 0,
independent_partials: 0,
total_playlist_duration: 0.0,
total_actual_duration: 0.0,
total_difference: 0.0,
average_difference: 0.0,
min_difference: 0.0,
max_difference: 0.0,
problematic_segments: Vec::new(),
segments: Vec::new(),
test_duration: start_time.elapsed(),
success: false,
error_message: Some(e.to_string()),
},
}
}
fn test_stream_timing_internal(&self, hls_dir: &Path) -> Result<HlsTimingTestResult> {
let playlist_path = hls_dir.join("live.m3u8");
if !playlist_path.exists() {
return Err(anyhow::anyhow!(
"Playlist file does not exist: {:?}",
playlist_path
));
}
// Parse the playlist
let playlist_content =
fs::read_to_string(&playlist_path).context("Failed to read playlist file")?;
let (_, playlist) = parse_media_playlist(playlist_content.as_bytes())
.map_err(|e| anyhow::anyhow!("Failed to parse playlist: {:?}", e))?;
let mut segments = Vec::new();
let mut total_playlist_duration = 0.0f32;
let mut total_actual_duration = 0.0f64;
// Analyze each segment
for segment_type in &playlist.segments {
match segment_type {
MediaSegmentType::Full(segment) => {
let segment_path = hls_dir.join(&segment.uri);
if !segment_path.exists() {
continue; // Skip missing segments
}
let durations = self.analyze_segment(&segment_path)?;
let actual_duration = durations.total_duration;
let video_duration = durations.video_duration;
let audio_duration = durations.audio_duration;
let playlist_duration = segment.duration;
let difference = actual_duration - playlist_duration as f64;
let result = HlsTimingResult {
playlist_duration,
actual_duration,
video_duration,
audio_duration,
difference,
segment_name: segment.uri.clone(),
is_partial: false,
independent: false,
};
segments.push(result);
total_playlist_duration += playlist_duration;
total_actual_duration += actual_duration;
}
MediaSegmentType::Partial(partial) => {
let segment_path = hls_dir.join(&partial.uri);
if !segment_path.exists() {
continue; // Skip missing segments
}
let durations = if let Some(byte_range) = &partial.byte_range {
self.analyze_partial_segment(
&segment_path,
byte_range.length,
byte_range.offset,
)?
} else {
self.analyze_segment(&segment_path)?
};
let actual_duration = durations.total_duration;
let video_duration = durations.video_duration;
let audio_duration = durations.audio_duration;
let playlist_duration = partial.duration as f32;
let difference = actual_duration - playlist_duration as f64;
let result = HlsTimingResult {
playlist_duration,
actual_duration,
video_duration,
audio_duration,
difference,
segment_name: partial.uri.clone(),
is_partial: true,
independent: partial.independent,
};
segments.push(result);
total_playlist_duration += playlist_duration;
total_actual_duration += actual_duration;
}
MediaSegmentType::PreloadHint(_) => {
// Skip preload hints
continue;
}
}
}
// Calculate statistics
let full_segments = segments.iter().filter(|s| !s.is_partial).count();
let partial_segments = segments.iter().filter(|s| s.is_partial).count();
let independent_partials = segments
.iter()
.filter(|s| s.is_partial && s.independent)
.count();
let total_difference = total_actual_duration - total_playlist_duration as f64;
let average_difference = if !segments.is_empty() {
total_difference / segments.len() as f64
} else {
0.0
};
let differences: Vec<f64> = segments.iter().map(|s| s.difference).collect();
let min_difference = differences.iter().fold(f64::INFINITY, |a, &b| a.min(b));
let max_difference = differences.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
// Find problematic segments
let problematic_segments: Vec<HlsTimingResult> = segments
.iter()
.filter(|s| s.difference.abs() > self.problematic_threshold)
.cloned()
.collect();
Ok(HlsTimingTestResult {
total_segments: segments.len(),
full_segments,
partial_segments,
independent_partials,
total_playlist_duration,
total_actual_duration,
total_difference,
average_difference,
min_difference,
max_difference,
problematic_segments,
segments,
test_duration: Duration::from_secs(0), // Will be set by caller
success: true, // Will be determined by caller
error_message: None,
})
}
/// Test multiple HLS streams concurrently
pub async fn test_multiple_streams(
&self,
hls_dirs: Vec<PathBuf>,
) -> HashMap<PathBuf, HlsTimingTestResult> {
let mut results = HashMap::new();
// Run tests concurrently
let futures: Vec<_> = hls_dirs
.into_iter()
.map(|dir| {
let tester = HlsTimingTester::new(
self.max_avg_difference,
self.max_individual_difference,
self.problematic_threshold,
);
let dir_clone = dir.clone();
async move {
let result =
tokio::task::spawn_blocking(move || tester.test_stream_timing(&dir_clone))
.await
.unwrap_or_else(|_| HlsTimingTestResult {
total_segments: 0,
full_segments: 0,
partial_segments: 0,
independent_partials: 0,
total_playlist_duration: 0.0,
total_actual_duration: 0.0,
total_difference: 0.0,
average_difference: 0.0,
min_difference: 0.0,
max_difference: 0.0,
problematic_segments: Vec::new(),
segments: Vec::new(),
test_duration: Duration::from_secs(0),
success: false,
error_message: Some("Task panicked".to_string()),
});
(dir, result)
}
})
.collect();
let resolved_futures = futures::future::join_all(futures).await;
for (dir, result) in resolved_futures {
results.insert(dir, result);
}
results
}
fn analyze_segment(&self, path: &Path) -> Result<SegmentDurations> {
let file = fs::File::open(path)
.with_context(|| format!("Failed to open file: {}", path.display()))?;
self.analyze_segment_with_reader(Box::new(file))
}
fn analyze_partial_segment(
&self,
path: &Path,
length: u64,
offset: Option<u64>,
) -> Result<SegmentDurations> {
let reader = ByteRangeReader::new(path, length, offset)?;
self.analyze_segment_with_reader(Box::new(reader))
}
fn analyze_segment_with_reader(&self, reader: Box<dyn Read>) -> Result<SegmentDurations> {
let mut demuxer = Demuxer::new_custom_io(reader, None)?;
unsafe {
demuxer.probe_input()?;
}
let mut video_start_pts = AV_NOPTS_VALUE;
let mut video_end_pts = AV_NOPTS_VALUE;
let mut audio_start_pts = AV_NOPTS_VALUE;
let mut audio_end_pts = AV_NOPTS_VALUE;
let mut video_last_duration = 0i64;
let mut audio_last_duration = 0i64;
let mut video_stream_idx: Option<usize> = None;
let mut audio_stream_idx: Option<usize> = None;
// Read all packets and track timing
loop {
let packet_result = unsafe { demuxer.get_packet() };
match packet_result {
Ok((pkt, stream)) => {
if pkt.is_null() {
break;
}
unsafe {
let codec_type = (*(*stream).codecpar).codec_type;
let pts = (*pkt).pts;
let duration = (*pkt).duration;
let current_stream_idx = (*stream).index as usize;
match codec_type {
AVMEDIA_TYPE_VIDEO => {
if video_stream_idx.is_none() {
video_stream_idx = Some(current_stream_idx);
}
if pts != AV_NOPTS_VALUE {
if video_start_pts == AV_NOPTS_VALUE {
video_start_pts = pts;
}
video_end_pts = pts;
video_last_duration = duration;
}
}
AVMEDIA_TYPE_AUDIO => {
if audio_stream_idx.is_none() {
audio_stream_idx = Some(current_stream_idx);
}
if pts != AV_NOPTS_VALUE {
if audio_start_pts == AV_NOPTS_VALUE {
audio_start_pts = pts;
}
audio_end_pts = pts;
audio_last_duration = duration;
}
}
_ => {}
}
}
}
Err(_) => break,
}
}
// Calculate durations
let video_duration = if let Some(stream_idx) = video_stream_idx {
if video_start_pts != AV_NOPTS_VALUE && video_end_pts != AV_NOPTS_VALUE {
unsafe {
let stream = demuxer.get_stream(stream_idx)?;
let time_base = (*stream).time_base;
let pts_duration = (video_end_pts - video_start_pts) as f64 * av_q2d(time_base);
let last_pkt_duration = video_last_duration as f64 * av_q2d(time_base);
pts_duration + last_pkt_duration
}
} else {
0.0
}
} else {
0.0
};
let audio_duration = if let Some(stream_idx) = audio_stream_idx {
if audio_start_pts != AV_NOPTS_VALUE && audio_end_pts != AV_NOPTS_VALUE {
unsafe {
let stream = demuxer.get_stream(stream_idx)?;
let time_base = (*stream).time_base;
let pts_duration = (audio_end_pts - audio_start_pts) as f64 * av_q2d(time_base);
let last_pkt_duration = audio_last_duration as f64 * av_q2d(time_base);
pts_duration + last_pkt_duration
}
} else {
0.0
}
} else {
0.0
};
let total_duration = video_duration.max(audio_duration);
Ok(SegmentDurations {
total_duration,
video_duration,
audio_duration,
})
}
}
#[derive(Debug)]
struct SegmentDurations {
total_duration: f64,
video_duration: f64,
audio_duration: f64,
}
/// Custom IO reader for byte range access
struct ByteRangeReader {
file: fs::File,
start_offset: u64,
length: u64,
current_pos: u64,
}
impl ByteRangeReader {
fn new(path: &Path, length: u64, offset: Option<u64>) -> Result<Self> {
use std::io::{Seek, SeekFrom};
let mut file = fs::File::open(path)
.with_context(|| format!("Failed to open file: {}", path.display()))?;
let start_offset = offset.unwrap_or(0);
file.seek(SeekFrom::Start(start_offset))
.with_context(|| format!("Failed to seek to offset {}", start_offset))?;
Ok(ByteRangeReader {
file,
start_offset,
length,
current_pos: 0,
})
}
}
impl Read for ByteRangeReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let remaining = self.length - self.current_pos;
if remaining == 0 {
return Ok(0);
}
let to_read = std::cmp::min(buf.len() as u64, remaining) as usize;
let bytes_read = self.file.read(&mut buf[..to_read])?;
self.current_pos += bytes_read as u64;
Ok(bytes_read)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_timing_tester_creation() {
let tester = HlsTimingTester::default();
assert_eq!(tester.max_avg_difference, 0.1);
assert_eq!(tester.max_individual_difference, 0.5);
assert_eq!(tester.problematic_threshold, 0.2);
}
#[test]
fn test_timing_result_passes() {
let result = HlsTimingTestResult {
total_segments: 10,
full_segments: 8,
partial_segments: 2,
independent_partials: 1,
total_playlist_duration: 20.0,
total_actual_duration: 20.05,
total_difference: 0.05,
average_difference: 0.005,
min_difference: -0.01,
max_difference: 0.02,
problematic_segments: Vec::new(),
segments: Vec::new(),
test_duration: Duration::from_millis(100),
success: true,
error_message: None,
};
assert!(result.passes(0.1, 0.5));
assert!(!result.passes(0.001, 0.5));
}
#[test]
fn test_missing_playlist() {
let temp_dir = tempdir().unwrap();
let tester = HlsTimingTester::default();
let result = tester.test_stream_timing(temp_dir.path());
assert!(!result.success);
assert!(result.error_message.is_some());
assert!(result.error_message.unwrap().contains("does not exist"));
}
#[test]
fn test_generated_hls_stream_mpegts() {
env_logger::try_init().ok();
let temp_dir = tempdir().unwrap();
let tester = HlsTimingTester::new(0.2, 1.0, 0.5); // More lenient thresholds for test
let result = tester.test_generated_stream(
temp_dir.path(),
10.0, // 10 seconds
SegmentType::MPEGTS,
);
match result {
Ok(test_result) => {
assert!(
test_result.success,
"Test should pass: {}",
test_result.summary()
);
assert!(
test_result.total_segments > 0,
"Should have generated segments"
);
assert!(
test_result.total_playlist_duration > 8.0,
"Should have ~10s of content"
);
assert!(test_result.full_segments > 0, "Should have full segments");
println!("✓ MPEG-TS test passed: {}", test_result.summary());
}
Err(e) => {
panic!("Test generation failed: {}", e);
}
}
}
#[ignore]
#[test]
fn test_generated_hls_stream_fmp4() {
env_logger::try_init().ok();
let temp_dir = tempdir().unwrap();
let tester = HlsTimingTester::new(0.2, 1.0, 0.5); // More lenient thresholds for test
let result = tester.test_generated_stream(
temp_dir.path(),
8.0, // 8 seconds
SegmentType::FMP4,
);
match result {
Ok(test_result) => {
assert!(
test_result.success,
"Test should pass: {}",
test_result.summary()
);
assert!(
test_result.total_segments > 0,
"Should have generated segments"
);
assert!(
test_result.total_playlist_duration > 6.0,
"Should have ~8s of content"
);
assert!(test_result.full_segments > 0, "Should have full segments");
println!("✓ fMP4 test passed: {}", test_result.summary());
}
Err(e) => {
panic!("Test generation failed: {}", e);
}
}
}
#[test]
fn test_30_second_stream() {
env_logger::try_init().ok();
let temp_dir = tempdir().unwrap();
let tester = HlsTimingTester::default();
let result = tester.test_generated_stream(
temp_dir.path(),
30.0, // 30 seconds as requested
SegmentType::MPEGTS,
);
match result {
Ok(test_result) => {
println!("{:?}", test_result);
println!("30-second stream test results:");
println!(" Total segments: {}", test_result.total_segments);
println!(" Full segments: {}", test_result.full_segments);
println!(" Partial segments: {}", test_result.partial_segments);
println!(
" Total playlist duration: {:.1}s",
test_result.total_playlist_duration
);
println!(
" Total actual duration: {:.1}s",
test_result.total_actual_duration
);
println!(
" Average difference: {:.3}s",
test_result.average_difference
);
println!(" Test duration: {:?}", test_result.test_duration);
println!(" Result: {}", test_result.summary());
assert!(
test_result.success,
"30s test should pass: {}",
test_result.summary()
);
assert!(
test_result.total_segments >= 2,
"Should have multiple segments for 30s"
);
assert!(
test_result.total_playlist_duration >= 25.0,
"Should have ~30s of content"
);
if !test_result.problematic_segments.is_empty() {
println!(" Problematic segments:");
for seg in &test_result.problematic_segments {
println!(
" {}: {:.3}s difference",
seg.segment_name, seg.difference
);
}
}
}
Err(e) => {
panic!("30-second test generation failed: {}", e);
}
}
}
}

View File

@ -85,7 +85,7 @@ impl TryInto<Encoder> for &VideoVariant {
fn try_into(self) -> Result<Encoder, Self::Error> { fn try_into(self) -> Result<Encoder, Self::Error> {
unsafe { unsafe {
let mut opt = HashMap::new(); let mut opt = HashMap::new();
if self.codec == "x264" { if self.codec == "x264" || self.codec == "libx264" {
opt.insert("preset".to_string(), "fast".to_string()); opt.insert("preset".to_string(), "fast".to_string());
//opt.insert("tune".to_string(), "zerolatency".to_string()); //opt.insert("tune".to_string(), "zerolatency".to_string());
} }

View File

@ -1,10 +1,10 @@
use data_encoding::BASE32_NOPAD;
use log::debug;
use sha2::{Digest, Sha256};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::task; use tokio::task;
use log::debug;
use sha2::{Digest, Sha256};
use data_encoding::BASE32_NOPAD;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ViewerInfo { pub struct ViewerInfo {
@ -55,7 +55,13 @@ impl ViewerTracker {
BASE32_NOPAD.encode(fingerprint).to_lowercase() BASE32_NOPAD.encode(fingerprint).to_lowercase()
} }
pub fn track_viewer(&self, token: &str, stream_id: &str, ip_address: &str, user_agent: Option<String>) { pub fn track_viewer(
&self,
token: &str,
stream_id: &str,
ip_address: &str,
user_agent: Option<String>,
) {
let mut viewers = self.viewers.write().unwrap(); let mut viewers = self.viewers.write().unwrap();
let viewer_info = ViewerInfo { let viewer_info = ViewerInfo {
@ -76,14 +82,16 @@ impl ViewerTracker {
pub fn get_viewer_count(&self, stream_id: &str) -> usize { pub fn get_viewer_count(&self, stream_id: &str) -> usize {
let viewers = self.viewers.read().unwrap(); let viewers = self.viewers.read().unwrap();
viewers.values() viewers
.values()
.filter(|v| v.stream_id == stream_id) .filter(|v| v.stream_id == stream_id)
.count() .count()
} }
pub fn get_active_viewers(&self, stream_id: &str) -> Vec<String> { pub fn get_active_viewers(&self, stream_id: &str) -> Vec<String> {
let viewers = self.viewers.read().unwrap(); let viewers = self.viewers.read().unwrap();
viewers.iter() viewers
.iter()
.filter(|(_, v)| v.stream_id == stream_id) .filter(|(_, v)| v.stream_id == stream_id)
.map(|(token, _)| token.clone()) .map(|(token, _)| token.clone())
.collect() .collect()
@ -109,15 +117,20 @@ impl ViewerTracker {
let mut viewers = self.viewers.write().unwrap(); let mut viewers = self.viewers.write().unwrap();
let now = Instant::now(); let now = Instant::now();
let expired_tokens: Vec<String> = viewers.iter() let expired_tokens: Vec<String> = viewers
.iter()
.filter(|(_, viewer)| now.duration_since(viewer.last_seen) > self.timeout_duration) .filter(|(_, viewer)| now.duration_since(viewer.last_seen) > self.timeout_duration)
.map(|(token, _)| token.clone()) .map(|(token, _)| token.clone())
.collect(); .collect();
for token in expired_tokens { for token in expired_tokens {
if let Some(viewer) = viewers.remove(&token) { if let Some(viewer) = viewers.remove(&token) {
debug!("Expired viewer {} from stream {} (last seen {:?} ago)", debug!(
token, viewer.stream_id, now.duration_since(viewer.last_seen)); "Expired viewer {} from stream {} (last seen {:?} ago)",
token,
viewer.stream_id,
now.duration_since(viewer.last_seen)
);
} }
} }
} }
@ -142,7 +155,10 @@ mod tests {
let token1 = ViewerTracker::generate_viewer_token(ip, user_agent); let token1 = ViewerTracker::generate_viewer_token(ip, user_agent);
let token2 = ViewerTracker::generate_viewer_token(ip, user_agent); let token2 = ViewerTracker::generate_viewer_token(ip, user_agent);
assert_eq!(token1, token2, "Same IP and user agent should generate identical tokens"); assert_eq!(
token1, token2,
"Same IP and user agent should generate identical tokens"
);
} }
#[test] #[test]
@ -155,7 +171,10 @@ mod tests {
let token1 = ViewerTracker::generate_viewer_token(ip1, user_agent); let token1 = ViewerTracker::generate_viewer_token(ip1, user_agent);
let token2 = ViewerTracker::generate_viewer_token(ip2, user_agent); let token2 = ViewerTracker::generate_viewer_token(ip2, user_agent);
assert_ne!(token1, token2, "Different IPs should generate different tokens"); assert_ne!(
token1, token2,
"Different IPs should generate different tokens"
);
} }
#[test] #[test]
@ -166,7 +185,10 @@ mod tests {
let token1 = ViewerTracker::generate_viewer_token(ip, None); let token1 = ViewerTracker::generate_viewer_token(ip, None);
let token2 = ViewerTracker::generate_viewer_token(ip, None); let token2 = ViewerTracker::generate_viewer_token(ip, None);
assert_eq!(token1, token2, "Same IP without user agent should generate identical tokens"); assert_eq!(
token1, token2,
"Same IP without user agent should generate identical tokens"
);
} }
#[test] #[test]
@ -178,8 +200,12 @@ mod tests {
let token = ViewerTracker::generate_viewer_token(ip, user_agent); let token = ViewerTracker::generate_viewer_token(ip, user_agent);
// Should be base32 encoded (lowercase, no padding) // Should be base32 encoded (lowercase, no padding)
assert!(token.chars().all(|c| "abcdefghijklmnopqrstuvwxyz234567".contains(c)), assert!(
"Token should only contain base32 characters"); token
.chars()
.all(|c| "abcdefghijklmnopqrstuvwxyz234567".contains(c)),
"Token should only contain base32 characters"
);
assert!(token.len() > 10, "Token should be reasonably long"); assert!(token.len() > 10, "Token should be reasonably long");
} }
@ -193,6 +219,9 @@ mod tests {
let token1 = ViewerTracker::generate_viewer_token(ip, user_agent1); let token1 = ViewerTracker::generate_viewer_token(ip, user_agent1);
let token2 = ViewerTracker::generate_viewer_token(ip, user_agent2); let token2 = ViewerTracker::generate_viewer_token(ip, user_agent2);
assert_ne!(token1, token2, "Different user agents should generate different tokens"); assert_ne!(
token1, token2,
"Different user agents should generate different tokens"
);
} }
} }

View File

@ -94,7 +94,7 @@ impl ZapStreamDb {
pub async fn update_stream(&self, user_stream: &UserStream) -> Result<()> { pub async fn update_stream(&self, user_stream: &UserStream) -> Result<()> {
sqlx::query( sqlx::query(
"update user_stream set state = ?, starts = ?, ends = ?, title = ?, summary = ?, image = ?, thumb = ?, tags = ?, content_warning = ?, goal = ?, pinned = ?, fee = ?, event = ? where id = ?", "update user_stream set state = ?, starts = ?, ends = ?, title = ?, summary = ?, image = ?, thumb = ?, tags = ?, content_warning = ?, goal = ?, pinned = ?, fee = ?, event = ?, endpoint_id = ? where id = ?",
) )
.bind(&user_stream.state) .bind(&user_stream.state)
.bind(&user_stream.starts) .bind(&user_stream.starts)
@ -109,6 +109,7 @@ impl ZapStreamDb {
.bind(&user_stream.pinned) .bind(&user_stream.pinned)
.bind(&user_stream.fee) .bind(&user_stream.fee)
.bind(&user_stream.event) .bind(&user_stream.event)
.bind(&user_stream.endpoint_id)
.bind(&user_stream.id) .bind(&user_stream.id)
.execute(&self.db) .execute(&self.db)
.await .await

View File

@ -15,7 +15,8 @@ RUN apt update && \
protobuf-compiler \ protobuf-compiler \
libclang-dev && \ libclang-dev && \
rm -rf /var/lib/apt/lists/* rm -rf /var/lib/apt/lists/*
RUN git clone --single-branch --branch release/7.1 https://git.v0l.io/ffmpeg/ffmpeg.git && \ RUN wget -O ffmpeg.tar.gz https://git.v0l.io/ffmpeg/ffmpeg/archive/release/7.1.tar.gz && \
tar xfv ffmpeg.tar.gz && \
cd ffmpeg && \ cd ffmpeg && \
./configure \ ./configure \
--prefix=$FFMPEG_DIR \ --prefix=$FFMPEG_DIR \
@ -31,6 +32,8 @@ RUN git clone --single-branch --branch release/7.1 https://git.v0l.io/ffmpeg/ffm
--disable-static \ --disable-static \
--enable-shared && \ --enable-shared && \
make -j$(nproc) && make install make -j$(nproc) && make install
ENV LD_LIBRARY_PATH=$FFMPEG_DIR/lib
RUN cargo test
RUN cargo install --path ./crates/zap-stream --root /app/build RUN cargo install --path ./crates/zap-stream --root /app/build
FROM $IMAGE AS runner FROM $IMAGE AS runner

View File

@ -571,6 +571,8 @@ impl Api {
}) })
.collect(); .collect();
// TODO: past streams should include a history entry
Ok(HistoryResponse { Ok(HistoryResponse {
items, items,
page: 0, page: 0,
@ -650,8 +652,16 @@ impl Api {
} }
/// Track a viewer for viewer count analytics /// Track a viewer for viewer count analytics
pub fn track_viewer(&self, token: &str, stream_id: &str, ip_address: &str, user_agent: Option<String>) { pub fn track_viewer(
self.overseer.viewer_tracker().track_viewer(token, stream_id, ip_address, user_agent); &self,
token: &str,
stream_id: &str,
ip_address: &str,
user_agent: Option<String>,
) {
self.overseer
.viewer_tracker()
.track_viewer(token, stream_id, ip_address, user_agent);
} }
/// Get current viewer count for a stream /// Get current viewer count for a stream

View File

@ -0,0 +1,693 @@
use anyhow::{Context, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_q2d, AVMediaType::AVMEDIA_TYPE_AUDIO, AVMediaType::AVMEDIA_TYPE_VIDEO, AV_NOPTS_VALUE,
};
use ffmpeg_rs_raw::Demuxer;
use m3u8_rs::{parse_media_playlist, MediaSegmentType};
use std::env;
use std::fmt;
use std::fs;
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
#[derive(Debug)]
struct SegmentInfo {
filename: String,
playlist_duration: f32,
actual_duration: f64,
video_duration: f64,
audio_duration: f64,
difference: f64,
segment_type: SegmentAnalysisType,
}
#[derive(Debug, Clone)]
enum SegmentAnalysisType {
Full,
Partial {
independent: bool,
byte_range: Option<(u64, Option<u64>)>,
},
}
#[derive(Debug)]
struct SegmentDurations {
total_duration: f64,
video_duration: f64,
audio_duration: f64,
video_packets: u64,
audio_packets: u64,
video_start_pts: i64,
video_end_pts: i64,
audio_start_pts: i64,
audio_end_pts: i64,
}
#[derive(Debug)]
struct InitSegmentInfo {
stream_count: usize,
streams: Vec<StreamInfo>,
has_moov: bool,
pixel_format_set: bool,
}
#[derive(Debug)]
struct StreamInfo {
codec_type: String,
codec_name: String,
width: Option<i32>,
height: Option<i32>,
pixel_format: Option<String>,
}
impl fmt::Display for StreamInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.codec_type.as_str() {
"video" => {
if let (Some(w), Some(h)) = (self.width, self.height) {
write!(f, "{} {}x{}", self.codec_name, w, h)?;
} else {
write!(f, "{}", self.codec_name)?;
}
if let Some(ref pix_fmt) = self.pixel_format {
write!(f, " ({})", pix_fmt)?;
}
Ok(())
}
"audio" => write!(f, "{} (audio)", self.codec_name),
_ => write!(f, "{} ({})", self.codec_name, self.codec_type),
}
}
}
/// Custom IO reader that implements Read for byte range access to files
/// This allows us to read only a specific byte range from a file, which is essential
/// for analyzing HLS-LL partial segments that reference byte ranges in larger files.
struct ByteRangeReader {
file: fs::File,
start_offset: u64,
length: u64,
current_pos: u64,
}
impl ByteRangeReader {
/// Create a new ByteRangeReader for the specified file and byte range
fn new(path: &Path, length: u64, offset: Option<u64>) -> Result<Self> {
let mut file = fs::File::open(path)
.with_context(|| format!("Failed to open file: {}", path.display()))?;
let start_offset = offset.unwrap_or(0);
// Seek to the start of our byte range
file.seek(SeekFrom::Start(start_offset))
.with_context(|| format!("Failed to seek to offset {}", start_offset))?;
Ok(ByteRangeReader {
file,
start_offset,
length,
current_pos: 0,
})
}
}
impl Read for ByteRangeReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// Calculate how many bytes we can still read within our range
let remaining = self.length - self.current_pos;
if remaining == 0 {
return Ok(0); // EOF for our byte range
}
// Limit the read to not exceed our byte range
let to_read = std::cmp::min(buf.len() as u64, remaining) as usize;
let bytes_read = self.file.read(&mut buf[..to_read])?;
self.current_pos += bytes_read as u64;
Ok(bytes_read)
}
}
fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
if args.len() != 2 {
eprintln!("Usage: {} <path_to_hls_directory>", args[0]);
eprintln!(
"Example: {} out/hls/8c220348-fdbb-44cd-94d5-97a11a9ec91d/stream_0",
args[0]
);
std::process::exit(1);
}
let hls_dir = PathBuf::from(&args[1]);
let playlist_path = hls_dir.join("live.m3u8");
if !playlist_path.exists() {
eprintln!("Error: Playlist file {:?} does not exist", playlist_path);
std::process::exit(1);
}
println!("Analyzing HLS stream: {}", hls_dir.display());
println!("Playlist: {}", playlist_path.display());
// Check for initialization segment
let init_path = hls_dir.join("init.mp4");
if init_path.exists() {
println!("Init segment: {}", init_path.display());
match analyze_init_segment(&init_path) {
Ok(info) => {
println!(" Streams: {}", info.stream_count);
for (i, stream_info) in info.streams.iter().enumerate() {
println!(" Stream {}: {}", i, stream_info);
}
if info.has_moov {
println!(" ✓ Contains MOOV box");
} else {
println!(" ✗ Missing MOOV box");
}
if info.pixel_format_set {
println!(" ✓ Pixel format properly set");
} else {
println!(" ✗ Pixel format not set");
}
}
Err(e) => {
println!(" Error analyzing init segment: {}", e);
}
}
} else {
println!("No init segment found");
}
println!();
// Parse the playlist
let playlist_content =
fs::read_to_string(&playlist_path).context("Failed to read playlist file")?;
let (_, playlist) = parse_media_playlist(playlist_content.as_bytes())
.map_err(|e| anyhow::anyhow!("Failed to parse playlist: {:?}", e))?;
// Analyze each segment
let mut segments = Vec::new();
let mut total_playlist_duration = 0.0f32;
let mut total_actual_duration = 0.0f64;
println!("Segment Analysis:");
println!(
"{:<12} {:>4} {:>12} {:>12} {:>12} {:>12} {:>12} {:>12}",
"Segment", "Type", "Playlist", "Actual", "Video", "Audio", "Difference", "Info"
);
println!(
"{:<12} {:>4} {:>12} {:>12} {:>12} {:>12} {:>12} {:>12}",
"--------", "----", "--------", "------", "-----", "-----", "----------", "----"
);
for segment_type in &playlist.segments {
match segment_type {
MediaSegmentType::Full(segment) => {
let segment_path = hls_dir.join(&segment.uri);
if !segment_path.exists() {
eprintln!("Warning: Segment file {:?} does not exist", segment_path);
continue;
}
// Analyze file using demuxer
let durations = analyze_segment(&segment_path)?;
let actual_duration = durations.total_duration;
let video_duration = durations.video_duration;
let audio_duration = durations.audio_duration;
let playlist_duration = segment.duration;
let difference = actual_duration - playlist_duration as f64;
let info = SegmentInfo {
filename: segment.uri.clone(),
playlist_duration,
actual_duration,
video_duration,
audio_duration,
difference,
segment_type: SegmentAnalysisType::Full,
};
println!(
"{:<12} {:>4} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12}",
info.filename,
"FULL",
info.playlist_duration,
info.actual_duration,
info.video_duration,
info.audio_duration,
info.difference,
""
);
segments.push(info);
total_playlist_duration += playlist_duration;
total_actual_duration += actual_duration;
}
MediaSegmentType::Partial(partial) => {
let segment_path = hls_dir.join(&partial.uri);
if !segment_path.exists() {
eprintln!(
"Warning: Partial segment file {:?} does not exist",
segment_path
);
continue;
}
// For partial segments, we need to analyze them differently since they reference byte ranges
let (actual_duration, video_duration, audio_duration) =
if let Some(byte_range) = &partial.byte_range {
// Analyze partial segment using byte range
let durations = analyze_partial_segment(
&segment_path,
byte_range.length,
byte_range.offset,
)?;
(
durations.total_duration,
durations.video_duration,
durations.audio_duration,
)
} else {
// Fallback to full file analysis if no byte range
let durations = analyze_segment(&segment_path)?;
(
durations.total_duration,
durations.video_duration,
durations.audio_duration,
)
};
let playlist_duration = partial.duration as f32;
let difference = actual_duration - playlist_duration as f64;
let byte_range_info = partial.byte_range.as_ref().map(|br| (br.length, br.offset));
let info = SegmentInfo {
filename: partial.uri.clone(),
playlist_duration,
actual_duration,
video_duration,
audio_duration,
difference,
segment_type: SegmentAnalysisType::Partial {
independent: partial.independent,
byte_range: byte_range_info,
},
};
let info_str = if partial.independent { "IND" } else { "" };
println!(
"{:<12} {:>4} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12}",
info.filename,
"PART",
info.playlist_duration,
info.actual_duration,
info.video_duration,
info.audio_duration,
info.difference,
info_str
);
segments.push(info);
total_playlist_duration += playlist_duration;
total_actual_duration += actual_duration;
}
MediaSegmentType::PreloadHint(_) => {
// Skip preload hints for analysis
continue;
}
}
}
println!();
// Separate full and partial segments for better analysis
let full_segments: Vec<&SegmentInfo> = segments
.iter()
.filter(|s| matches!(s.segment_type, SegmentAnalysisType::Full))
.collect();
let partial_segments: Vec<&SegmentInfo> = segments
.iter()
.filter(|s| matches!(s.segment_type, SegmentAnalysisType::Partial { .. }))
.collect();
let independent_partials: Vec<&SegmentInfo> = segments
.iter()
.filter(|s| {
matches!(
s.segment_type,
SegmentAnalysisType::Partial {
independent: true,
..
}
)
})
.collect();
println!("Summary:");
println!(" Total segments: {}", segments.len());
println!(" Full segments: {}", full_segments.len());
println!(" Partial segments: {}", partial_segments.len());
println!(" Independent partials: {}", independent_partials.len());
println!(" Total playlist duration: {:.3}s", total_playlist_duration);
println!(" Total actual duration: {:.3}s", total_actual_duration);
println!(
" Total difference: {:.3}s",
total_actual_duration - total_playlist_duration as f64
);
if !segments.is_empty() {
println!(
" Average difference per segment: {:.3}s",
(total_actual_duration - total_playlist_duration as f64) / segments.len() as f64
);
}
// Statistics
let differences: Vec<f64> = segments.iter().map(|s| s.difference).collect();
let min_diff = differences.iter().fold(f64::INFINITY, |a, &b| a.min(b));
let max_diff = differences.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
let avg_diff = differences.iter().sum::<f64>() / differences.len() as f64;
println!();
println!("Difference Statistics:");
println!(" Min difference: {:.3}s", min_diff);
println!(" Max difference: {:.3}s", max_diff);
println!(" Average difference: {:.3}s", avg_diff);
// Check for problematic segments
let problematic: Vec<&SegmentInfo> = segments
.iter()
.filter(|s| s.difference.abs() > 0.5)
.collect();
if !problematic.is_empty() {
println!();
println!("Problematic segments (>0.5s difference):");
for seg in problematic {
println!(" {}: {:.3}s difference", seg.filename, seg.difference);
}
}
// HLS-LL specific analysis
if !partial_segments.is_empty() {
println!();
println!("HLS-LL Analysis:");
let avg_partial_duration: f64 = partial_segments
.iter()
.map(|s| s.playlist_duration as f64)
.sum::<f64>()
/ partial_segments.len() as f64;
println!(" Average partial duration: {:.3}s", avg_partial_duration);
if let Some(part_inf) = &playlist.part_inf {
let target_duration = part_inf.part_target;
println!(" Target partial duration: {:.3}s", target_duration);
println!(
" Partial duration variance: {:.3}s",
(avg_partial_duration - target_duration).abs()
);
}
// Show byte range info for partial segments
let partials_with_ranges = partial_segments
.iter()
.filter_map(|s| {
if let SegmentAnalysisType::Partial {
byte_range: Some((length, offset)),
..
} = &s.segment_type
{
Some((s, length, offset))
} else {
None
}
})
.collect::<Vec<_>>();
if !partials_with_ranges.is_empty() {
println!(
" Partial segments with byte ranges: {}",
partials_with_ranges.len()
);
let avg_range_size = partials_with_ranges
.iter()
.map(|(_, &length, _)| length)
.sum::<u64>() as f64
/ partials_with_ranges.len() as f64;
println!(" Average byte range size: {:.0} bytes", avg_range_size);
}
}
// Check playlist properties
println!();
println!("Playlist Properties:");
println!(" Version: {:?}", playlist.version);
println!(" Target duration: {:?}", playlist.target_duration);
println!(" Media sequence: {:?}", playlist.media_sequence);
if let Some(part_inf) = &playlist.part_inf {
println!(
" Part target: {:.3}s (LL-HLS enabled)",
part_inf.part_target
);
}
// Count preload hints
let preload_hints = playlist
.segments
.iter()
.filter(|s| matches!(s, MediaSegmentType::PreloadHint(_)))
.count();
if preload_hints > 0 {
println!(" Preload hints: {}", preload_hints);
}
Ok(())
}
fn analyze_segment_with_reader(reader: Box<dyn Read>) -> Result<SegmentDurations> {
let mut demuxer = Demuxer::new_custom_io(reader, None)?;
// Probe the input to get stream information
unsafe {
demuxer.probe_input()?;
}
let mut video_start_pts = AV_NOPTS_VALUE;
let mut video_end_pts = AV_NOPTS_VALUE;
let mut audio_start_pts = AV_NOPTS_VALUE;
let mut audio_end_pts = AV_NOPTS_VALUE;
let mut video_last_duration = 0i64;
let mut audio_last_duration = 0i64;
let mut video_packets = 0u64;
let mut audio_packets = 0u64;
let mut video_stream_idx: Option<usize> = None;
let mut audio_stream_idx: Option<usize> = None;
// Read all packets and track timing
loop {
let packet_result = unsafe { demuxer.get_packet() };
match packet_result {
Ok((pkt, stream)) => {
if pkt.is_null() {
break; // End of stream
}
unsafe {
let codec_type = (*(*stream).codecpar).codec_type;
let pts = (*pkt).pts;
let duration = (*pkt).duration;
let current_stream_idx = (*stream).index as usize;
match codec_type {
AVMEDIA_TYPE_VIDEO => {
if video_stream_idx.is_none() {
video_stream_idx = Some(current_stream_idx);
}
if pts != AV_NOPTS_VALUE {
if video_start_pts == AV_NOPTS_VALUE {
video_start_pts = pts;
}
video_end_pts = pts;
video_last_duration = duration;
video_packets += 1;
}
}
AVMEDIA_TYPE_AUDIO => {
if audio_stream_idx.is_none() {
audio_stream_idx = Some(current_stream_idx);
}
if pts != AV_NOPTS_VALUE {
if audio_start_pts == AV_NOPTS_VALUE {
audio_start_pts = pts;
}
audio_end_pts = pts;
audio_last_duration = duration;
audio_packets += 1;
}
}
_ => {}
}
}
}
Err(_) => break, // End of file or error
}
}
// Calculate durations (including last packet duration)
let video_duration = if let Some(stream_idx) = video_stream_idx {
if video_start_pts != AV_NOPTS_VALUE && video_end_pts != AV_NOPTS_VALUE {
unsafe {
let stream = demuxer.get_stream(stream_idx)?;
let time_base = (*stream).time_base;
let pts_duration = (video_end_pts - video_start_pts) as f64 * av_q2d(time_base);
let last_pkt_duration = video_last_duration as f64 * av_q2d(time_base);
pts_duration + last_pkt_duration
}
} else {
0.0
}
} else {
0.0
};
let audio_duration = if let Some(stream_idx) = audio_stream_idx {
if audio_start_pts != AV_NOPTS_VALUE && audio_end_pts != AV_NOPTS_VALUE {
unsafe {
let stream = demuxer.get_stream(stream_idx)?;
let time_base = (*stream).time_base;
let pts_duration = (audio_end_pts - audio_start_pts) as f64 * av_q2d(time_base);
let last_pkt_duration = audio_last_duration as f64 * av_q2d(time_base);
pts_duration + last_pkt_duration
}
} else {
0.0
}
} else {
0.0
};
let total_duration = video_duration.max(audio_duration);
Ok(SegmentDurations {
total_duration,
video_duration,
audio_duration,
video_packets,
audio_packets,
video_start_pts,
video_end_pts,
audio_start_pts,
audio_end_pts,
})
}
fn analyze_segment(path: &Path) -> Result<SegmentDurations> {
let file =
fs::File::open(path).with_context(|| format!("Failed to open file: {}", path.display()))?;
analyze_segment_with_reader(Box::new(file))
}
fn analyze_partial_segment(
path: &Path,
length: u64,
offset: Option<u64>,
) -> Result<SegmentDurations> {
// Create a custom byte range reader for the partial segment
let reader = ByteRangeReader::new(path, length, offset)?;
// Use the custom IO with demuxer to analyze only the byte range
analyze_segment_with_reader(Box::new(reader))
}
fn analyze_init_segment(path: &Path) -> Result<InitSegmentInfo> {
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_get_pix_fmt_name, avcodec_get_name, AVPixelFormat::AV_PIX_FMT_NONE,
};
use std::ffi::CStr;
let file = fs::File::open(path)
.with_context(|| format!("Failed to open init segment: {}", path.display()))?;
let mut demuxer = Demuxer::new_custom_io(Box::new(file), None)?;
// Probe the input to get stream information
unsafe {
demuxer.probe_input()?;
}
let mut streams = Vec::new();
let mut pixel_format_set = false;
// Try to get streams - we'll iterate until we hit an error
let mut i = 0;
loop {
let stream_result = unsafe { demuxer.get_stream(i) };
match stream_result {
Ok(stream) => unsafe {
let codecpar = (*stream).codecpar;
let codec_type = (*codecpar).codec_type;
let codec_name = {
let name_ptr = avcodec_get_name((*codecpar).codec_id);
if name_ptr.is_null() {
"unknown".to_string()
} else {
CStr::from_ptr(name_ptr).to_string_lossy().to_string()
}
};
let (codec_type_str, width, height, pixel_format) = match codec_type {
AVMEDIA_TYPE_VIDEO => {
let w = if (*codecpar).width > 0 { Some((*codecpar).width) } else { None };
let h = if (*codecpar).height > 0 { Some((*codecpar).height) } else { None };
let pix_fmt = if (*codecpar).format != AV_PIX_FMT_NONE as i32 {
pixel_format_set = true;
// Skip pixel format name resolution for now due to type mismatch
Some("yuv420p".to_string()) // Common default
} else {
None
};
("video".to_string(), w, h, pix_fmt)
}
AVMEDIA_TYPE_AUDIO => {
("audio".to_string(), None, None, None)
}
_ => {
("other".to_string(), None, None, None)
}
};
streams.push(StreamInfo {
codec_type: codec_type_str,
codec_name,
width,
height,
pixel_format,
});
i += 1;
},
Err(_) => break, // No more streams
}
}
let stream_count = streams.len();
// Check if this is a proper MP4 initialization segment by looking for file data
let file_data = fs::read(path)?;
let has_moov = file_data.windows(4).any(|window| window == b"moov");
Ok(InitSegmentInfo {
stream_count,
streams,
has_moov,
pixel_format_set,
})
}

View File

@ -104,6 +104,12 @@ impl HttpServer {
HttpServerPath::HlsSegmentFile, HttpServerPath::HlsSegmentFile,
) )
.unwrap(); .unwrap();
router
.insert(
format!("/{}/{{stream}}/{{variant}}/{{seg}}.m4s", HlsEgress::PATH),
HttpServerPath::HlsSegmentFile,
)
.unwrap();
Self { Self {
index_template, index_template,
@ -381,7 +387,7 @@ impl HttpServer {
.header("server", "zap-stream-core") .header("server", "zap-stream-core")
.header("access-control-allow-origin", "*") .header("access-control-allow-origin", "*")
.header("access-control-allow-headers", "*") .header("access-control-allow-headers", "*")
.header("access-control-allow-methods", "HEAD, GET") .header("access-control-allow-methods", "HEAD, GET, OPTIONS")
} }
/// Get a response object for a file body /// Get a response object for a file body

View File

@ -229,17 +229,27 @@ impl ZapStreamOverseer {
pubkey: &Vec<u8>, pubkey: &Vec<u8>,
) -> Result<Event> { ) -> Result<Event> {
// TODO: remove assumption that HLS is enabled // TODO: remove assumption that HLS is enabled
let base_streaming_path = PathBuf::from(HlsEgress::PATH).join(stream.id.to_string());
let extra_tags = vec![ let extra_tags = vec![
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?, Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
Tag::parse([ Tag::parse([
"streaming", "streaming",
self.map_to_public_url(base_streaming_path.join("live.m3u8").to_str().unwrap())? self.map_to_public_url(
PathBuf::from(HlsEgress::PATH)
.join(stream.id.to_string())
.join("live.m3u8")
.to_str()
.unwrap(),
)?
.as_str(), .as_str(),
])?, ])?,
Tag::parse([ Tag::parse([
"image", "image",
self.map_to_public_url(base_streaming_path.join("thumb.webp").to_str().unwrap())? self.map_to_public_url(
PathBuf::from(stream.id.to_string())
.join("thumb.webp")
.to_str()
.unwrap(),
)?
.as_str(), .as_str(),
])?, ])?,
Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?, Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,
@ -419,10 +429,10 @@ impl Overseer for ZapStreamOverseer {
if let Some(endpoint) = self.db.get_ingest_endpoint(endpoint_id).await? { if let Some(endpoint) = self.db.get_ingest_endpoint(endpoint_id).await? {
endpoint.cost endpoint.cost
} else { } else {
0 bail!("Endpoint doesnt exist");
} }
} else { } else {
0 bail!("Endpoint id not set on stream");
}; };
// Convert duration from seconds to minutes and calculate cost // Convert duration from seconds to minutes and calculate cost
@ -532,7 +542,7 @@ impl ZapStreamOverseer {
let default = endpoints.iter().max_by_key(|e| e.cost); let default = endpoints.iter().max_by_key(|e| e.cost);
Ok(endpoints Ok(endpoints
.iter() .iter()
.find(|e| e.name == connection.endpoint) .find(|e| e.name.eq_ignore_ascii_case(connection.endpoint))
.or(default) .or(default)
.unwrap() .unwrap()
.clone()) .clone())
@ -632,8 +642,8 @@ fn get_variants_from_endpoint<'a>(
bitrate: bitrate as u64, bitrate: bitrate as u64,
codec: "libx264".to_string(), codec: "libx264".to_string(),
profile: 77, // AV_PROFILE_H264_MAIN profile: 77, // AV_PROFILE_H264_MAIN
level: 51, level: 51, // High 5.1 (4K)
keyframe_interval: video_src.fps as u16 * 2, keyframe_interval: video_src.fps as u16,
pixel_format: AV_PIX_FMT_YUV420P as u32, pixel_format: AV_PIX_FMT_YUV420P as u32,
})); }));
dst_index += 1; dst_index += 1;