mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-16 08:59:35 +00:00
Compare commits
5 Commits
ca70bf964c
...
main
Author | SHA1 | Date | |
---|---|---|---|
e7e1f0299d
|
|||
338d351727
|
|||
047b3fec59
|
|||
fee5e77407
|
|||
d88f829645
|
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -2153,7 +2153,7 @@ checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "m3u8-rs"
|
name = "m3u8-rs"
|
||||||
version = "6.0.0"
|
version = "6.0.0"
|
||||||
source = "git+https://github.com/v0l/m3u8-rs.git?rev=d76ff96326814237a6d5e92288cdfe7060a43168#d76ff96326814237a6d5e92288cdfe7060a43168"
|
source = "git+https://git.v0l.io/Kieran/m3u8-rs.git?rev=5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f#5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"nom",
|
"nom",
|
||||||
|
@ -24,6 +24,6 @@ url = "2.5.0"
|
|||||||
itertools = "0.14.0"
|
itertools = "0.14.0"
|
||||||
chrono = { version = "^0.4.38", features = ["serde"] }
|
chrono = { version = "^0.4.38", features = ["serde"] }
|
||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
m3u8-rs = { git = "https://github.com/v0l/m3u8-rs.git", rev = "d76ff96326814237a6d5e92288cdfe7060a43168" }
|
m3u8-rs = { git = "https://git.v0l.io/Kieran/m3u8-rs.git", rev = "5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f" }
|
||||||
sha2 = "0.10.8"
|
sha2 = "0.10.8"
|
||||||
data-encoding = "2.9.0"
|
data-encoding = "2.9.0"
|
@ -4,13 +4,13 @@ use anyhow::{bail, ensure, Result};
|
|||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
|
||||||
av_free, av_opt_set, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, avio_size,
|
av_free, av_interleaved_write_frame, av_opt_set, av_q2d, avio_close, avio_flush, avio_open,
|
||||||
AVPacket, AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
|
avio_size, AVPacket, AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
|
||||||
};
|
};
|
||||||
use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
|
use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use log::{info, trace, warn};
|
use log::{info, trace, warn};
|
||||||
use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf};
|
use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf, PreloadHint};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
@ -89,20 +89,26 @@ pub struct HlsVariant {
|
|||||||
segments: Vec<HlsSegment>,
|
segments: Vec<HlsSegment>,
|
||||||
/// Type of segments to create
|
/// Type of segments to create
|
||||||
segment_type: SegmentType,
|
segment_type: SegmentType,
|
||||||
/// Ending presentation timestamp
|
/// Timestamp of the previous packet
|
||||||
end_pts: i64,
|
last_pkt_pts: i64,
|
||||||
|
/// Timestamp of the start of the current segment
|
||||||
|
current_segment_start: f64,
|
||||||
/// Current segment duration in seconds (precise accumulation)
|
/// Current segment duration in seconds (precise accumulation)
|
||||||
duration: f64,
|
duration: f64,
|
||||||
/// Number of packets written to current segment
|
/// Number of packets written to current segment
|
||||||
packets_written: u64,
|
packets_written: u64,
|
||||||
/// Reference stream used to track duration
|
/// Reference stream used to track duration
|
||||||
ref_stream_index: i32,
|
ref_stream_index: i32,
|
||||||
|
/// HLS-LL: Enable LL-output
|
||||||
|
low_latency: bool,
|
||||||
/// LL-HLS: Target duration for partial segments
|
/// LL-HLS: Target duration for partial segments
|
||||||
partial_target_duration: f32,
|
partial_target_duration: f32,
|
||||||
/// HLS-LL: Current partial index
|
/// HLS-LL: Current partial index
|
||||||
current_partial_index: u64,
|
current_partial_index: u64,
|
||||||
/// HLS-LL: Current duration in this partial
|
/// HLS-LL: Current duration in this partial
|
||||||
current_partial_duration: f64,
|
current_partial_duration: f64,
|
||||||
|
/// HLS-LL: Whether the next partial segment should be marked as independent
|
||||||
|
next_partial_independent: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq)]
|
#[derive(PartialEq)]
|
||||||
@ -114,8 +120,8 @@ enum HlsSegment {
|
|||||||
impl HlsSegment {
|
impl HlsSegment {
|
||||||
fn to_media_segment(&self) -> MediaSegmentType {
|
fn to_media_segment(&self) -> MediaSegmentType {
|
||||||
match self {
|
match self {
|
||||||
HlsSegment::Full(s) => s.to_media_segment(),
|
HlsSegment::Full(f) => f.to_media_segment(),
|
||||||
HlsSegment::Partial(s) => s.to_media_segment(),
|
HlsSegment::Partial(p) => p.to_media_segment(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -168,6 +174,13 @@ impl PartialSegmentInfo {
|
|||||||
fn filename(&self) -> String {
|
fn filename(&self) -> String {
|
||||||
HlsVariant::segment_name(self.parent_kind, self.parent_index)
|
HlsVariant::segment_name(self.parent_kind, self.parent_index)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Byte offset where this partial segment ends
|
||||||
|
fn end_pos(&self) -> Option<u64> {
|
||||||
|
self.byte_range
|
||||||
|
.as_ref()
|
||||||
|
.map(|(len, start)| start.unwrap_or(0) + len)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HlsVariant {
|
impl HlsVariant {
|
||||||
@ -264,13 +277,16 @@ impl HlsVariant {
|
|||||||
segments: Vec::new(),
|
segments: Vec::new(),
|
||||||
out_dir: out_dir.to_string(),
|
out_dir: out_dir.to_string(),
|
||||||
segment_type,
|
segment_type,
|
||||||
end_pts: AV_NOPTS_VALUE,
|
last_pkt_pts: AV_NOPTS_VALUE,
|
||||||
duration: 0.0,
|
duration: 0.0,
|
||||||
packets_written: 0,
|
packets_written: 0,
|
||||||
ref_stream_index,
|
ref_stream_index,
|
||||||
partial_target_duration: 0.33,
|
partial_target_duration: 0.33,
|
||||||
current_partial_index: 0,
|
current_partial_index: 0,
|
||||||
current_partial_duration: 0.0,
|
current_partial_duration: 0.0,
|
||||||
|
current_segment_start: 0.0,
|
||||||
|
next_partial_independent: false,
|
||||||
|
low_latency: false,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -299,6 +315,7 @@ impl HlsVariant {
|
|||||||
.streams
|
.streams
|
||||||
.add((*pkt).stream_index as usize);
|
.add((*pkt).stream_index as usize);
|
||||||
|
|
||||||
|
let pkt_q = av_q2d((*pkt).time_base);
|
||||||
let mut result = EgressResult::None;
|
let mut result = EgressResult::None;
|
||||||
let stream_type = (*(*pkt_stream).codecpar).codec_type;
|
let stream_type = (*(*pkt_stream).codecpar).codec_type;
|
||||||
let mut can_split = stream_type == AVMEDIA_TYPE_VIDEO
|
let mut can_split = stream_type == AVMEDIA_TYPE_VIDEO
|
||||||
@ -311,34 +328,44 @@ impl HlsVariant {
|
|||||||
is_ref_pkt = false;
|
is_ref_pkt = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HLS-LL: write prev partial segment
|
||||||
|
if self.low_latency && self.current_partial_duration >= self.partial_target_duration as f64
|
||||||
|
{
|
||||||
|
self.create_partial_segment()?;
|
||||||
|
|
||||||
|
// HLS-LL: Mark next partial as independent if this packet is a keyframe
|
||||||
|
if can_split {
|
||||||
|
self.next_partial_independent = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
// check if current packet is keyframe, flush current segment
|
// check if current packet is keyframe, flush current segment
|
||||||
if self.packets_written > 1 && can_split && self.duration >= self.segment_length as f64 {
|
if self.packets_written > 1 && can_split && self.duration >= self.segment_length as f64 {
|
||||||
result = self.split_next_seg()?;
|
result = self.split_next_seg((*pkt).pts as f64 * pkt_q)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// track duration from pts
|
// track duration from pts
|
||||||
if is_ref_pkt {
|
if is_ref_pkt {
|
||||||
if self.end_pts == AV_NOPTS_VALUE {
|
if self.last_pkt_pts == AV_NOPTS_VALUE {
|
||||||
self.end_pts = (*pkt).pts;
|
self.last_pkt_pts = (*pkt).pts;
|
||||||
}
|
}
|
||||||
let pts_diff = (*pkt).pts - self.end_pts;
|
let time_delta = if (*pkt).duration != 0 {
|
||||||
if pts_diff > 0 {
|
(*pkt).duration as f64 * pkt_q
|
||||||
let time_delta = pts_diff as f64 * av_q2d((*pkt).time_base);
|
} else {
|
||||||
|
((*pkt).pts - self.last_pkt_pts) as f64 * pkt_q
|
||||||
|
};
|
||||||
|
if time_delta > 0.0 {
|
||||||
self.duration += time_delta;
|
self.duration += time_delta;
|
||||||
self.current_partial_duration += time_delta;
|
if self.low_latency {
|
||||||
|
self.current_partial_duration += time_delta;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
self.end_pts = (*pkt).pts;
|
self.last_pkt_pts = (*pkt).pts;
|
||||||
}
|
}
|
||||||
|
|
||||||
// write to current segment
|
// write to current segment
|
||||||
self.mux.write_packet(pkt)?;
|
self.mux.write_packet(pkt)?;
|
||||||
self.packets_written += 1;
|
self.packets_written += 1;
|
||||||
|
|
||||||
// HLS-LL: write next partial segment
|
|
||||||
if is_ref_pkt && self.current_partial_duration >= self.partial_target_duration as f64 {
|
|
||||||
self.create_partial_segment(can_split)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -347,27 +374,30 @@ impl HlsVariant {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create a partial segment for LL-HLS
|
/// Create a partial segment for LL-HLS
|
||||||
fn create_partial_segment(&mut self, independent: bool) -> Result<()> {
|
fn create_partial_segment(&mut self) -> Result<()> {
|
||||||
let ctx = self.mux.context();
|
let ctx = self.mux.context();
|
||||||
let pos = unsafe {
|
let end_pos = unsafe {
|
||||||
avio_flush((*ctx).pb);
|
avio_flush((*ctx).pb);
|
||||||
avio_size((*ctx).pb) as u64
|
avio_size((*ctx).pb) as u64
|
||||||
};
|
};
|
||||||
|
|
||||||
let previous_partial_end = self.segments.last().and_then(|s| match &s {
|
let previous_end_pos = self
|
||||||
HlsSegment::Partial(p) => p.byte_range.as_ref().map(|(len, start)| start.unwrap_or(0) + len),
|
.segments
|
||||||
_ => None,
|
.last()
|
||||||
});
|
.and_then(|s| match &s {
|
||||||
|
HlsSegment::Partial(p) => p.end_pos(),
|
||||||
|
_ => None,
|
||||||
|
})
|
||||||
|
.unwrap_or(0);
|
||||||
|
let independent = self.next_partial_independent;
|
||||||
|
let partial_size = end_pos - previous_end_pos;
|
||||||
let partial_info = PartialSegmentInfo {
|
let partial_info = PartialSegmentInfo {
|
||||||
index: self.current_partial_index,
|
index: self.current_partial_index,
|
||||||
parent_index: self.idx,
|
parent_index: self.idx,
|
||||||
parent_kind: self.segment_type,
|
parent_kind: self.segment_type,
|
||||||
duration: self.current_partial_duration,
|
duration: self.current_partial_duration,
|
||||||
independent,
|
independent,
|
||||||
byte_range: match previous_partial_end {
|
byte_range: Some((partial_size, Some(previous_end_pos))),
|
||||||
Some(prev_end) => Some((pos - prev_end, Some(prev_end))),
|
|
||||||
_ => Some((pos, Some(0))),
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
@ -380,6 +410,7 @@ impl HlsVariant {
|
|||||||
self.segments.push(HlsSegment::Partial(partial_info));
|
self.segments.push(HlsSegment::Partial(partial_info));
|
||||||
self.current_partial_index += 1;
|
self.current_partial_index += 1;
|
||||||
self.current_partial_duration = 0.0;
|
self.current_partial_duration = 0.0;
|
||||||
|
self.next_partial_independent = false;
|
||||||
|
|
||||||
self.write_playlist()?;
|
self.write_playlist()?;
|
||||||
|
|
||||||
@ -387,13 +418,16 @@ impl HlsVariant {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Reset the muxer state and start the next segment
|
/// Reset the muxer state and start the next segment
|
||||||
unsafe fn split_next_seg(&mut self) -> Result<EgressResult> {
|
unsafe fn split_next_seg(&mut self, next_pkt_start: f64) -> Result<EgressResult> {
|
||||||
let completed_segment_idx = self.idx;
|
let completed_segment_idx = self.idx;
|
||||||
self.idx += 1;
|
self.idx += 1;
|
||||||
|
|
||||||
// Manually reset muxer avio
|
// Manually reset muxer avio
|
||||||
let ctx = self.mux.context();
|
let ctx = self.mux.context();
|
||||||
av_write_frame(ctx, ptr::null_mut());
|
let ret = av_interleaved_write_frame(ctx, ptr::null_mut());
|
||||||
|
if ret < 0 {
|
||||||
|
bail!("Failed to split segment {}", ret);
|
||||||
|
}
|
||||||
avio_flush((*ctx).pb);
|
avio_flush((*ctx).pb);
|
||||||
avio_close((*ctx).pb);
|
avio_close((*ctx).pb);
|
||||||
av_free((*ctx).url as *mut _);
|
av_free((*ctx).url as *mut _);
|
||||||
@ -427,13 +461,15 @@ impl HlsVariant {
|
|||||||
.metadata()
|
.metadata()
|
||||||
.map(|m| m.len())
|
.map(|m| m.len())
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
let cur_duration = next_pkt_start - self.current_segment_start;
|
||||||
info!(
|
info!(
|
||||||
"Finished segment {} [{:.3}s, {:.2} kB, {} pkts]",
|
"Finished segment {} [{:.3}s, {:.2} kB, {} pkts]",
|
||||||
completed_segment_path
|
completed_segment_path
|
||||||
.file_name()
|
.file_name()
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.to_string_lossy(),
|
.to_string_lossy(),
|
||||||
self.duration,
|
cur_duration,
|
||||||
segment_size as f32 / 1024f32,
|
segment_size as f32 / 1024f32,
|
||||||
self.packets_written
|
self.packets_written
|
||||||
);
|
);
|
||||||
@ -465,17 +501,22 @@ impl HlsVariant {
|
|||||||
let created = EgressSegment {
|
let created = EgressSegment {
|
||||||
variant: video_var_id,
|
variant: video_var_id,
|
||||||
idx: completed_segment_idx,
|
idx: completed_segment_idx,
|
||||||
duration: self.duration as f32,
|
duration: cur_duration as f32,
|
||||||
path: completed_segment_path,
|
path: completed_segment_path,
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(e) = self.push_segment(completed_segment_idx, self.duration as f32) {
|
self.segments.push(HlsSegment::Full(SegmentInfo {
|
||||||
warn!("Failed to update playlist: {}", e);
|
index: completed_segment_idx,
|
||||||
}
|
duration: cur_duration as f32,
|
||||||
|
kind: self.segment_type,
|
||||||
|
}));
|
||||||
|
|
||||||
|
self.write_playlist()?;
|
||||||
|
|
||||||
// Reset counters for next segment
|
// Reset counters for next segment
|
||||||
self.packets_written = 0;
|
self.packets_written = 0;
|
||||||
self.duration = 0.0;
|
self.duration = 0.0;
|
||||||
|
self.current_segment_start = next_pkt_start;
|
||||||
|
|
||||||
Ok(EgressResult::Segments {
|
Ok(EgressResult::Segments {
|
||||||
created: vec![created],
|
created: vec![created],
|
||||||
@ -489,17 +530,6 @@ impl HlsVariant {
|
|||||||
.find(|a| matches!(*a, HlsVariantStream::Video { .. }))
|
.find(|a| matches!(*a, HlsVariantStream::Video { .. }))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a new segment to the variant and return a list of deleted segments
|
|
||||||
fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
|
|
||||||
self.segments.push(HlsSegment::Full(SegmentInfo {
|
|
||||||
index: idx,
|
|
||||||
duration,
|
|
||||||
kind: self.segment_type,
|
|
||||||
}));
|
|
||||||
|
|
||||||
self.write_playlist()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Delete segments which are too old
|
/// Delete segments which are too old
|
||||||
fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> {
|
fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> {
|
||||||
let drain_from_hls_segment = {
|
let drain_from_hls_segment = {
|
||||||
@ -558,10 +588,23 @@ impl HlsVariant {
|
|||||||
let mut pl = m3u8_rs::MediaPlaylist::default();
|
let mut pl = m3u8_rs::MediaPlaylist::default();
|
||||||
pl.target_duration = (self.segment_length.ceil() as u64).max(1);
|
pl.target_duration = (self.segment_length.ceil() as u64).max(1);
|
||||||
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
|
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
|
||||||
pl.version = Some(6);
|
|
||||||
pl.part_inf = Some(PartInf {
|
// append segment preload for next part segment
|
||||||
part_target: self.partial_target_duration as f64,
|
if let Some(HlsSegment::Partial(partial)) = self.segments.last() {
|
||||||
});
|
// TODO: try to estimate if there will be another partial segment
|
||||||
|
pl.segments.push(MediaSegmentType::PreloadHint(PreloadHint {
|
||||||
|
hint_type: "PART".to_string(),
|
||||||
|
uri: partial.filename(),
|
||||||
|
byte_range_start: partial.end_pos(),
|
||||||
|
byte_range_length: None,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
pl.version = Some(if self.low_latency { 6 } else { 3 });
|
||||||
|
if self.low_latency {
|
||||||
|
pl.part_inf = Some(PartInf {
|
||||||
|
part_target: self.partial_target_duration as f64,
|
||||||
|
});
|
||||||
|
}
|
||||||
pl.media_sequence = self
|
pl.media_sequence = self
|
||||||
.segments
|
.segments
|
||||||
.iter()
|
.iter()
|
||||||
@ -570,7 +613,6 @@ impl HlsVariant {
|
|||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.unwrap_or(self.idx);
|
.unwrap_or(self.idx);
|
||||||
// For live streams, don't set end list
|
|
||||||
pl.end_list = false;
|
pl.end_list = false;
|
||||||
|
|
||||||
let mut f_out = File::create(self.out_dir().join("live.m3u8"))?;
|
let mut f_out = File::create(self.out_dir().join("live.m3u8"))?;
|
||||||
|
@ -208,13 +208,15 @@ impl PipelineRunner {
|
|||||||
unsafe fn generate_thumb_from_frame(&mut self, frame: *mut AVFrame) -> Result<()> {
|
unsafe fn generate_thumb_from_frame(&mut self, frame: *mut AVFrame) -> Result<()> {
|
||||||
if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 {
|
if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 {
|
||||||
let frame = av_frame_clone(frame).addr();
|
let frame = av_frame_clone(frame).addr();
|
||||||
let dst_pic = PathBuf::from(&self.out_dir)
|
let dir = PathBuf::from(&self.out_dir).join(self.connection.id.to_string());
|
||||||
.join(self.connection.id.to_string())
|
if !dir.exists() {
|
||||||
.join("thumb.webp");
|
std::fs::create_dir_all(&dir)?;
|
||||||
|
}
|
||||||
std::thread::spawn(move || unsafe {
|
std::thread::spawn(move || unsafe {
|
||||||
let mut frame = frame as *mut AVFrame; //TODO: danger??
|
let mut frame = frame as *mut AVFrame; //TODO: danger??
|
||||||
let thumb_start = Instant::now();
|
let thumb_start = Instant::now();
|
||||||
|
|
||||||
|
let dst_pic = dir.join("thumb.webp");
|
||||||
if let Err(e) = Self::save_thumb(frame, &dst_pic) {
|
if let Err(e) = Self::save_thumb(frame, &dst_pic) {
|
||||||
warn!("Failed to save thumb: {}", e);
|
warn!("Failed to save thumb: {}", e);
|
||||||
}
|
}
|
||||||
|
@ -94,7 +94,7 @@ impl ZapStreamDb {
|
|||||||
|
|
||||||
pub async fn update_stream(&self, user_stream: &UserStream) -> Result<()> {
|
pub async fn update_stream(&self, user_stream: &UserStream) -> Result<()> {
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
"update user_stream set state = ?, starts = ?, ends = ?, title = ?, summary = ?, image = ?, thumb = ?, tags = ?, content_warning = ?, goal = ?, pinned = ?, fee = ?, event = ? where id = ?",
|
"update user_stream set state = ?, starts = ?, ends = ?, title = ?, summary = ?, image = ?, thumb = ?, tags = ?, content_warning = ?, goal = ?, pinned = ?, fee = ?, event = ?, endpoint_id = ? where id = ?",
|
||||||
)
|
)
|
||||||
.bind(&user_stream.state)
|
.bind(&user_stream.state)
|
||||||
.bind(&user_stream.starts)
|
.bind(&user_stream.starts)
|
||||||
@ -109,6 +109,7 @@ impl ZapStreamDb {
|
|||||||
.bind(&user_stream.pinned)
|
.bind(&user_stream.pinned)
|
||||||
.bind(&user_stream.fee)
|
.bind(&user_stream.fee)
|
||||||
.bind(&user_stream.event)
|
.bind(&user_stream.event)
|
||||||
|
.bind(&user_stream.endpoint_id)
|
||||||
.bind(&user_stream.id)
|
.bind(&user_stream.id)
|
||||||
.execute(&self.db)
|
.execute(&self.db)
|
||||||
.await
|
.await
|
||||||
|
275
crates/zap-stream/src/bin/hls_debug.rs
Normal file
275
crates/zap-stream/src/bin/hls_debug.rs
Normal file
@ -0,0 +1,275 @@
|
|||||||
|
use anyhow::{Context, Result};
|
||||||
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
|
||||||
|
av_q2d, AV_NOPTS_VALUE, AVMediaType::AVMEDIA_TYPE_VIDEO, AVMediaType::AVMEDIA_TYPE_AUDIO,
|
||||||
|
};
|
||||||
|
use ffmpeg_rs_raw::Demuxer;
|
||||||
|
use m3u8_rs::{parse_media_playlist, MediaSegmentType};
|
||||||
|
use std::env;
|
||||||
|
use std::fs;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct SegmentInfo {
|
||||||
|
filename: String,
|
||||||
|
playlist_duration: f32,
|
||||||
|
actual_duration: f64,
|
||||||
|
video_duration: f64,
|
||||||
|
audio_duration: f64,
|
||||||
|
difference: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct SegmentDurations {
|
||||||
|
total_duration: f64,
|
||||||
|
video_duration: f64,
|
||||||
|
audio_duration: f64,
|
||||||
|
video_packets: u64,
|
||||||
|
audio_packets: u64,
|
||||||
|
video_start_pts: i64,
|
||||||
|
video_end_pts: i64,
|
||||||
|
audio_start_pts: i64,
|
||||||
|
audio_end_pts: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() -> Result<()> {
|
||||||
|
let args: Vec<String> = env::args().collect();
|
||||||
|
if args.len() != 2 {
|
||||||
|
eprintln!("Usage: {} <path_to_hls_directory>", args[0]);
|
||||||
|
eprintln!("Example: {} out/hls/8c220348-fdbb-44cd-94d5-97a11a9ec91d/stream_0", args[0]);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
let hls_dir = PathBuf::from(&args[1]);
|
||||||
|
let playlist_path = hls_dir.join("live.m3u8");
|
||||||
|
|
||||||
|
if !playlist_path.exists() {
|
||||||
|
eprintln!("Error: Playlist file {:?} does not exist", playlist_path);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Analyzing HLS stream: {}", hls_dir.display());
|
||||||
|
println!("Playlist: {}", playlist_path.display());
|
||||||
|
println!();
|
||||||
|
|
||||||
|
// Parse the playlist
|
||||||
|
let playlist_content = fs::read_to_string(&playlist_path)
|
||||||
|
.context("Failed to read playlist file")?;
|
||||||
|
|
||||||
|
let (_, playlist) = parse_media_playlist(playlist_content.as_bytes())
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to parse playlist: {:?}", e))?;
|
||||||
|
|
||||||
|
// Analyze each segment
|
||||||
|
let mut segments = Vec::new();
|
||||||
|
let mut total_playlist_duration = 0.0f32;
|
||||||
|
let mut total_actual_duration = 0.0f64;
|
||||||
|
|
||||||
|
println!("Segment Analysis:");
|
||||||
|
println!("{:<12} {:>12} {:>12} {:>12} {:>12} {:>12}",
|
||||||
|
"Segment", "Playlist", "Actual", "Video", "Audio", "Difference");
|
||||||
|
println!("{:<12} {:>12} {:>12} {:>12} {:>12} {:>12}",
|
||||||
|
"--------", "--------", "------", "-----", "-----", "----------");
|
||||||
|
|
||||||
|
for segment_type in &playlist.segments {
|
||||||
|
if let MediaSegmentType::Full(segment) = segment_type {
|
||||||
|
let segment_path = hls_dir.join(&segment.uri);
|
||||||
|
|
||||||
|
if !segment_path.exists() {
|
||||||
|
eprintln!("Warning: Segment file {:?} does not exist", segment_path);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Analyze file using demuxer
|
||||||
|
let durations = analyze_segment(&segment_path)?;
|
||||||
|
let actual_duration = durations.total_duration;
|
||||||
|
let video_duration = durations.video_duration;
|
||||||
|
let audio_duration = durations.audio_duration;
|
||||||
|
|
||||||
|
let playlist_duration = segment.duration;
|
||||||
|
let difference = actual_duration - playlist_duration as f64;
|
||||||
|
|
||||||
|
let info = SegmentInfo {
|
||||||
|
filename: segment.uri.clone(),
|
||||||
|
playlist_duration,
|
||||||
|
actual_duration,
|
||||||
|
video_duration,
|
||||||
|
audio_duration,
|
||||||
|
difference,
|
||||||
|
};
|
||||||
|
|
||||||
|
println!("{:<12} {:>12.3} {:>12.3} {:>12.3} {:>12.3} {:>12.3}",
|
||||||
|
info.filename,
|
||||||
|
info.playlist_duration,
|
||||||
|
info.actual_duration,
|
||||||
|
info.video_duration,
|
||||||
|
info.audio_duration,
|
||||||
|
info.difference);
|
||||||
|
|
||||||
|
segments.push(info);
|
||||||
|
total_playlist_duration += playlist_duration;
|
||||||
|
total_actual_duration += actual_duration;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!();
|
||||||
|
println!("Summary:");
|
||||||
|
println!(" Total segments: {}", segments.len());
|
||||||
|
println!(" Total playlist duration: {:.3}s", total_playlist_duration);
|
||||||
|
println!(" Total actual duration: {:.3}s", total_actual_duration);
|
||||||
|
println!(" Total difference: {:.3}s", total_actual_duration - total_playlist_duration as f64);
|
||||||
|
println!(" Average difference per segment: {:.3}s",
|
||||||
|
(total_actual_duration - total_playlist_duration as f64) / segments.len() as f64);
|
||||||
|
|
||||||
|
// Statistics
|
||||||
|
let differences: Vec<f64> = segments.iter().map(|s| s.difference).collect();
|
||||||
|
let min_diff = differences.iter().fold(f64::INFINITY, |a, &b| a.min(b));
|
||||||
|
let max_diff = differences.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
|
||||||
|
let avg_diff = differences.iter().sum::<f64>() / differences.len() as f64;
|
||||||
|
|
||||||
|
println!();
|
||||||
|
println!("Difference Statistics:");
|
||||||
|
println!(" Min difference: {:.3}s", min_diff);
|
||||||
|
println!(" Max difference: {:.3}s", max_diff);
|
||||||
|
println!(" Average difference: {:.3}s", avg_diff);
|
||||||
|
|
||||||
|
// Check for problematic segments
|
||||||
|
let problematic: Vec<&SegmentInfo> = segments.iter()
|
||||||
|
.filter(|s| s.difference.abs() > 0.5)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if !problematic.is_empty() {
|
||||||
|
println!();
|
||||||
|
println!("Problematic segments (>0.5s difference):");
|
||||||
|
for seg in problematic {
|
||||||
|
println!(" {}: {:.3}s difference", seg.filename, seg.difference);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check playlist properties
|
||||||
|
println!();
|
||||||
|
println!("Playlist Properties:");
|
||||||
|
println!(" Version: {:?}", playlist.version);
|
||||||
|
println!(" Target duration: {:?}", playlist.target_duration);
|
||||||
|
println!(" Media sequence: {:?}", playlist.media_sequence);
|
||||||
|
if let Some(part_inf) = &playlist.part_inf {
|
||||||
|
println!(" Part target: {:.3}s (LL-HLS enabled)", part_inf.part_target);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn analyze_segment(path: &Path) -> Result<SegmentDurations> {
|
||||||
|
let mut demuxer = Demuxer::new(path.to_str().unwrap())?;
|
||||||
|
|
||||||
|
// Probe the input to get stream information
|
||||||
|
unsafe {
|
||||||
|
demuxer.probe_input()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut video_start_pts = AV_NOPTS_VALUE;
|
||||||
|
let mut video_end_pts = AV_NOPTS_VALUE;
|
||||||
|
let mut audio_start_pts = AV_NOPTS_VALUE;
|
||||||
|
let mut audio_end_pts = AV_NOPTS_VALUE;
|
||||||
|
let mut video_last_duration = 0i64;
|
||||||
|
let mut audio_last_duration = 0i64;
|
||||||
|
let mut video_packets = 0u64;
|
||||||
|
let mut audio_packets = 0u64;
|
||||||
|
let mut video_stream_idx: Option<usize> = None;
|
||||||
|
let mut audio_stream_idx: Option<usize> = None;
|
||||||
|
|
||||||
|
// Read all packets and track timing
|
||||||
|
loop {
|
||||||
|
let packet_result = unsafe { demuxer.get_packet() };
|
||||||
|
match packet_result {
|
||||||
|
Ok((pkt, stream)) => {
|
||||||
|
if pkt.is_null() {
|
||||||
|
break; // End of stream
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
let codec_type = (*(*stream).codecpar).codec_type;
|
||||||
|
let pts = (*pkt).pts;
|
||||||
|
let duration = (*pkt).duration;
|
||||||
|
let current_stream_idx = (*stream).index as usize;
|
||||||
|
|
||||||
|
match codec_type {
|
||||||
|
AVMEDIA_TYPE_VIDEO => {
|
||||||
|
if video_stream_idx.is_none() {
|
||||||
|
video_stream_idx = Some(current_stream_idx);
|
||||||
|
}
|
||||||
|
if pts != AV_NOPTS_VALUE {
|
||||||
|
if video_start_pts == AV_NOPTS_VALUE {
|
||||||
|
video_start_pts = pts;
|
||||||
|
}
|
||||||
|
video_end_pts = pts;
|
||||||
|
video_last_duration = duration;
|
||||||
|
video_packets += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
AVMEDIA_TYPE_AUDIO => {
|
||||||
|
if audio_stream_idx.is_none() {
|
||||||
|
audio_stream_idx = Some(current_stream_idx);
|
||||||
|
}
|
||||||
|
if pts != AV_NOPTS_VALUE {
|
||||||
|
if audio_start_pts == AV_NOPTS_VALUE {
|
||||||
|
audio_start_pts = pts;
|
||||||
|
}
|
||||||
|
audio_end_pts = pts;
|
||||||
|
audio_last_duration = duration;
|
||||||
|
audio_packets += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => break, // End of file or error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate durations (including last packet duration)
|
||||||
|
let video_duration = if let Some(stream_idx) = video_stream_idx {
|
||||||
|
if video_start_pts != AV_NOPTS_VALUE && video_end_pts != AV_NOPTS_VALUE {
|
||||||
|
unsafe {
|
||||||
|
let stream = demuxer.get_stream(stream_idx)?;
|
||||||
|
let time_base = (*stream).time_base;
|
||||||
|
let pts_duration = (video_end_pts - video_start_pts) as f64 * av_q2d(time_base);
|
||||||
|
let last_pkt_duration = video_last_duration as f64 * av_q2d(time_base);
|
||||||
|
pts_duration + last_pkt_duration
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
0.0
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
0.0
|
||||||
|
};
|
||||||
|
|
||||||
|
let audio_duration = if let Some(stream_idx) = audio_stream_idx {
|
||||||
|
if audio_start_pts != AV_NOPTS_VALUE && audio_end_pts != AV_NOPTS_VALUE {
|
||||||
|
unsafe {
|
||||||
|
let stream = demuxer.get_stream(stream_idx)?;
|
||||||
|
let time_base = (*stream).time_base;
|
||||||
|
let pts_duration = (audio_end_pts - audio_start_pts) as f64 * av_q2d(time_base);
|
||||||
|
let last_pkt_duration = audio_last_duration as f64 * av_q2d(time_base);
|
||||||
|
pts_duration + last_pkt_duration
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
0.0
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
0.0
|
||||||
|
};
|
||||||
|
|
||||||
|
let total_duration = video_duration.max(audio_duration);
|
||||||
|
|
||||||
|
Ok(SegmentDurations {
|
||||||
|
total_duration,
|
||||||
|
video_duration,
|
||||||
|
audio_duration,
|
||||||
|
video_packets,
|
||||||
|
audio_packets,
|
||||||
|
video_start_pts,
|
||||||
|
video_end_pts,
|
||||||
|
audio_start_pts,
|
||||||
|
audio_end_pts,
|
||||||
|
})
|
||||||
|
}
|
@ -381,7 +381,7 @@ impl HttpServer {
|
|||||||
.header("server", "zap-stream-core")
|
.header("server", "zap-stream-core")
|
||||||
.header("access-control-allow-origin", "*")
|
.header("access-control-allow-origin", "*")
|
||||||
.header("access-control-allow-headers", "*")
|
.header("access-control-allow-headers", "*")
|
||||||
.header("access-control-allow-methods", "HEAD, GET")
|
.header("access-control-allow-methods", "HEAD, GET, OPTIONS")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a response object for a file body
|
/// Get a response object for a file body
|
||||||
|
@ -229,18 +229,28 @@ impl ZapStreamOverseer {
|
|||||||
pubkey: &Vec<u8>,
|
pubkey: &Vec<u8>,
|
||||||
) -> Result<Event> {
|
) -> Result<Event> {
|
||||||
// TODO: remove assumption that HLS is enabled
|
// TODO: remove assumption that HLS is enabled
|
||||||
let base_streaming_path = PathBuf::from(HlsEgress::PATH).join(stream.id.to_string());
|
|
||||||
let extra_tags = vec![
|
let extra_tags = vec![
|
||||||
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
|
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
|
||||||
Tag::parse([
|
Tag::parse([
|
||||||
"streaming",
|
"streaming",
|
||||||
self.map_to_public_url(base_streaming_path.join("live.m3u8").to_str().unwrap())?
|
self.map_to_public_url(
|
||||||
.as_str(),
|
PathBuf::from(HlsEgress::PATH)
|
||||||
|
.join(stream.id.to_string())
|
||||||
|
.join("live.m3u8")
|
||||||
|
.to_str()
|
||||||
|
.unwrap(),
|
||||||
|
)?
|
||||||
|
.as_str(),
|
||||||
])?,
|
])?,
|
||||||
Tag::parse([
|
Tag::parse([
|
||||||
"image",
|
"image",
|
||||||
self.map_to_public_url(base_streaming_path.join("thumb.webp").to_str().unwrap())?
|
self.map_to_public_url(
|
||||||
.as_str(),
|
PathBuf::from(stream.id.to_string())
|
||||||
|
.join("thumb.webp")
|
||||||
|
.to_str()
|
||||||
|
.unwrap(),
|
||||||
|
)?
|
||||||
|
.as_str(),
|
||||||
])?,
|
])?,
|
||||||
Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,
|
Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,
|
||||||
];
|
];
|
||||||
@ -419,10 +429,10 @@ impl Overseer for ZapStreamOverseer {
|
|||||||
if let Some(endpoint) = self.db.get_ingest_endpoint(endpoint_id).await? {
|
if let Some(endpoint) = self.db.get_ingest_endpoint(endpoint_id).await? {
|
||||||
endpoint.cost
|
endpoint.cost
|
||||||
} else {
|
} else {
|
||||||
0
|
bail!("Endpoint doesnt exist");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
0
|
bail!("Endpoint id not set on stream");
|
||||||
};
|
};
|
||||||
|
|
||||||
// Convert duration from seconds to minutes and calculate cost
|
// Convert duration from seconds to minutes and calculate cost
|
||||||
@ -532,7 +542,7 @@ impl ZapStreamOverseer {
|
|||||||
let default = endpoints.iter().max_by_key(|e| e.cost);
|
let default = endpoints.iter().max_by_key(|e| e.cost);
|
||||||
Ok(endpoints
|
Ok(endpoints
|
||||||
.iter()
|
.iter()
|
||||||
.find(|e| e.name == connection.endpoint)
|
.find(|e| e.name.eq_ignore_ascii_case(connection.endpoint))
|
||||||
.or(default)
|
.or(default)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.clone())
|
.clone())
|
||||||
|
Reference in New Issue
Block a user