mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-16 08:59:35 +00:00
Compare commits
3 Commits
ca70bf964c
...
047b3fec59
Author | SHA1 | Date | |
---|---|---|---|
047b3fec59
|
|||
fee5e77407
|
|||
d88f829645
|
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -2153,7 +2153,7 @@ checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
|
||||
[[package]]
|
||||
name = "m3u8-rs"
|
||||
version = "6.0.0"
|
||||
source = "git+https://github.com/v0l/m3u8-rs.git?rev=d76ff96326814237a6d5e92288cdfe7060a43168#d76ff96326814237a6d5e92288cdfe7060a43168"
|
||||
source = "git+https://git.v0l.io/Kieran/m3u8-rs.git?rev=5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f#5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"nom",
|
||||
|
@ -24,6 +24,6 @@ url = "2.5.0"
|
||||
itertools = "0.14.0"
|
||||
chrono = { version = "^0.4.38", features = ["serde"] }
|
||||
hex = "0.4.3"
|
||||
m3u8-rs = { git = "https://github.com/v0l/m3u8-rs.git", rev = "d76ff96326814237a6d5e92288cdfe7060a43168" }
|
||||
m3u8-rs = { git = "https://git.v0l.io/Kieran/m3u8-rs.git", rev = "5b7aa0c65994b5ab2780b7ed27d84c03bc32d19f" }
|
||||
sha2 = "0.10.8"
|
||||
data-encoding = "2.9.0"
|
@ -10,7 +10,7 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
|
||||
use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
|
||||
use itertools::Itertools;
|
||||
use log::{info, trace, warn};
|
||||
use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf};
|
||||
use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf, PreloadHint};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Display;
|
||||
use std::fs::File;
|
||||
@ -103,6 +103,8 @@ pub struct HlsVariant {
|
||||
current_partial_index: u64,
|
||||
/// HLS-LL: Current duration in this partial
|
||||
current_partial_duration: f64,
|
||||
/// HLS-LL: Whether the next partial segment should be marked as independent
|
||||
next_partial_independent: bool,
|
||||
}
|
||||
|
||||
#[derive(PartialEq)]
|
||||
@ -114,8 +116,8 @@ enum HlsSegment {
|
||||
impl HlsSegment {
|
||||
fn to_media_segment(&self) -> MediaSegmentType {
|
||||
match self {
|
||||
HlsSegment::Full(s) => s.to_media_segment(),
|
||||
HlsSegment::Partial(s) => s.to_media_segment(),
|
||||
HlsSegment::Full(f) => f.to_media_segment(),
|
||||
HlsSegment::Partial(p) => p.to_media_segment(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -168,6 +170,13 @@ impl PartialSegmentInfo {
|
||||
fn filename(&self) -> String {
|
||||
HlsVariant::segment_name(self.parent_kind, self.parent_index)
|
||||
}
|
||||
|
||||
/// Byte offset where this partial segment ends
|
||||
fn end_pos(&self) -> Option<u64> {
|
||||
self.byte_range
|
||||
.as_ref()
|
||||
.map(|(len, start)| start.unwrap_or(0) + len)
|
||||
}
|
||||
}
|
||||
|
||||
impl HlsVariant {
|
||||
@ -271,6 +280,7 @@ impl HlsVariant {
|
||||
partial_target_duration: 0.33,
|
||||
current_partial_index: 0,
|
||||
current_partial_duration: 0.0,
|
||||
next_partial_independent: false,
|
||||
})
|
||||
}
|
||||
|
||||
@ -311,6 +321,16 @@ impl HlsVariant {
|
||||
is_ref_pkt = false;
|
||||
}
|
||||
|
||||
// HLS-LL: write prev partial segment
|
||||
if self.current_partial_duration >= self.partial_target_duration as f64 {
|
||||
self.create_partial_segment()?;
|
||||
|
||||
// HLS-LL: Mark next partial as independent if this packet is a keyframe
|
||||
if can_split {
|
||||
self.next_partial_independent = true;
|
||||
}
|
||||
}
|
||||
|
||||
// check if current packet is keyframe, flush current segment
|
||||
if self.packets_written > 1 && can_split && self.duration >= self.segment_length as f64 {
|
||||
result = self.split_next_seg()?;
|
||||
@ -334,11 +354,6 @@ impl HlsVariant {
|
||||
self.mux.write_packet(pkt)?;
|
||||
self.packets_written += 1;
|
||||
|
||||
// HLS-LL: write next partial segment
|
||||
if is_ref_pkt && self.current_partial_duration >= self.partial_target_duration as f64 {
|
||||
self.create_partial_segment(can_split)?;
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
@ -347,27 +362,30 @@ impl HlsVariant {
|
||||
}
|
||||
|
||||
/// Create a partial segment for LL-HLS
|
||||
fn create_partial_segment(&mut self, independent: bool) -> Result<()> {
|
||||
fn create_partial_segment(&mut self) -> Result<()> {
|
||||
let ctx = self.mux.context();
|
||||
let pos = unsafe {
|
||||
let end_pos = unsafe {
|
||||
avio_flush((*ctx).pb);
|
||||
avio_size((*ctx).pb) as u64
|
||||
};
|
||||
|
||||
let previous_partial_end = self.segments.last().and_then(|s| match &s {
|
||||
HlsSegment::Partial(p) => p.byte_range.as_ref().map(|(len, start)| start.unwrap_or(0) + len),
|
||||
_ => None,
|
||||
});
|
||||
let previous_end_pos = self
|
||||
.segments
|
||||
.last()
|
||||
.and_then(|s| match &s {
|
||||
HlsSegment::Partial(p) => p.end_pos(),
|
||||
_ => None,
|
||||
})
|
||||
.unwrap_or(0);
|
||||
let independent = self.next_partial_independent;
|
||||
let partial_size = end_pos - previous_end_pos;
|
||||
let partial_info = PartialSegmentInfo {
|
||||
index: self.current_partial_index,
|
||||
parent_index: self.idx,
|
||||
parent_kind: self.segment_type,
|
||||
duration: self.current_partial_duration,
|
||||
independent,
|
||||
byte_range: match previous_partial_end {
|
||||
Some(prev_end) => Some((pos - prev_end, Some(prev_end))),
|
||||
_ => Some((pos, Some(0))),
|
||||
},
|
||||
byte_range: Some((partial_size, Some(previous_end_pos))),
|
||||
};
|
||||
|
||||
trace!(
|
||||
@ -380,6 +398,7 @@ impl HlsVariant {
|
||||
self.segments.push(HlsSegment::Partial(partial_info));
|
||||
self.current_partial_index += 1;
|
||||
self.current_partial_duration = 0.0;
|
||||
self.next_partial_independent = false;
|
||||
|
||||
self.write_playlist()?;
|
||||
|
||||
@ -558,6 +577,17 @@ impl HlsVariant {
|
||||
let mut pl = m3u8_rs::MediaPlaylist::default();
|
||||
pl.target_duration = (self.segment_length.ceil() as u64).max(1);
|
||||
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
|
||||
|
||||
// append segment preload for next part segment
|
||||
if let Some(HlsSegment::Partial(partial)) = self.segments.last() {
|
||||
// TODO: try to estimate if there will be another partial segment
|
||||
pl.segments.push(MediaSegmentType::PreloadHint(PreloadHint {
|
||||
hint_type: "PART".to_string(),
|
||||
uri: partial.filename(),
|
||||
byte_range_start: partial.end_pos(),
|
||||
byte_range_length: None,
|
||||
}));
|
||||
}
|
||||
pl.version = Some(6);
|
||||
pl.part_inf = Some(PartInf {
|
||||
part_target: self.partial_target_duration as f64,
|
||||
|
@ -208,13 +208,15 @@ impl PipelineRunner {
|
||||
unsafe fn generate_thumb_from_frame(&mut self, frame: *mut AVFrame) -> Result<()> {
|
||||
if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 {
|
||||
let frame = av_frame_clone(frame).addr();
|
||||
let dst_pic = PathBuf::from(&self.out_dir)
|
||||
.join(self.connection.id.to_string())
|
||||
.join("thumb.webp");
|
||||
let dir = PathBuf::from(&self.out_dir).join(self.connection.id.to_string());
|
||||
if !dir.exists() {
|
||||
std::fs::create_dir_all(&dir)?;
|
||||
}
|
||||
std::thread::spawn(move || unsafe {
|
||||
let mut frame = frame as *mut AVFrame; //TODO: danger??
|
||||
let thumb_start = Instant::now();
|
||||
|
||||
let dst_pic = dir.join("thumb.webp");
|
||||
if let Err(e) = Self::save_thumb(frame, &dst_pic) {
|
||||
warn!("Failed to save thumb: {}", e);
|
||||
}
|
||||
|
@ -94,7 +94,7 @@ impl ZapStreamDb {
|
||||
|
||||
pub async fn update_stream(&self, user_stream: &UserStream) -> Result<()> {
|
||||
sqlx::query(
|
||||
"update user_stream set state = ?, starts = ?, ends = ?, title = ?, summary = ?, image = ?, thumb = ?, tags = ?, content_warning = ?, goal = ?, pinned = ?, fee = ?, event = ? where id = ?",
|
||||
"update user_stream set state = ?, starts = ?, ends = ?, title = ?, summary = ?, image = ?, thumb = ?, tags = ?, content_warning = ?, goal = ?, pinned = ?, fee = ?, event = ?, endpoint_id = ? where id = ?",
|
||||
)
|
||||
.bind(&user_stream.state)
|
||||
.bind(&user_stream.starts)
|
||||
@ -109,6 +109,7 @@ impl ZapStreamDb {
|
||||
.bind(&user_stream.pinned)
|
||||
.bind(&user_stream.fee)
|
||||
.bind(&user_stream.event)
|
||||
.bind(&user_stream.endpoint_id)
|
||||
.bind(&user_stream.id)
|
||||
.execute(&self.db)
|
||||
.await
|
||||
|
@ -419,10 +419,10 @@ impl Overseer for ZapStreamOverseer {
|
||||
if let Some(endpoint) = self.db.get_ingest_endpoint(endpoint_id).await? {
|
||||
endpoint.cost
|
||||
} else {
|
||||
0
|
||||
bail!("Endpoint doesnt exist");
|
||||
}
|
||||
} else {
|
||||
0
|
||||
bail!("Endpoint id not set on stream");
|
||||
};
|
||||
|
||||
// Convert duration from seconds to minutes and calculate cost
|
||||
@ -532,7 +532,7 @@ impl ZapStreamOverseer {
|
||||
let default = endpoints.iter().max_by_key(|e| e.cost);
|
||||
Ok(endpoints
|
||||
.iter()
|
||||
.find(|e| e.name == connection.endpoint)
|
||||
.find(|e| e.name.eq_ignore_ascii_case(connection.endpoint))
|
||||
.or(default)
|
||||
.unwrap()
|
||||
.clone())
|
||||
|
Reference in New Issue
Block a user