Compare commits

...

3 Commits

Author SHA1 Message Date
ca70bf964c feat: HLS-LL
Some checks reported errors
continuous-integration/drone Build was killed
2025-06-13 11:30:52 +01:00
cc973f0d9b chore: format thread names 2025-06-12 17:29:22 +01:00
a7ff18b34c fix: add default stream info to stream 2025-06-12 17:25:28 +01:00
15 changed files with 568 additions and 194 deletions

10
Cargo.lock generated
View File

@ -1612,6 +1612,12 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "http-range-header"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9171a2ea8a68358193d15dd5d70c1c10a2afc3e7e4c5bc92bc9f025cebd7359c"
[[package]] [[package]]
name = "httparse" name = "httparse"
version = "1.10.0" version = "1.10.0"
@ -2147,8 +2153,7 @@ checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]] [[package]]
name = "m3u8-rs" name = "m3u8-rs"
version = "6.0.0" version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/v0l/m3u8-rs.git?rev=d76ff96326814237a6d5e92288cdfe7060a43168#d76ff96326814237a6d5e92288cdfe7060a43168"
checksum = "f03cd3335fb5f2447755d45cda9c70f76013626a9db44374973791b0926a86c3"
dependencies = [ dependencies = [
"chrono", "chrono",
"nom", "nom",
@ -5008,6 +5013,7 @@ dependencies = [
"futures-util", "futures-util",
"hex", "hex",
"http-body-util", "http-body-util",
"http-range-header",
"hyper 1.6.0", "hyper 1.6.0",
"hyper-util", "hyper-util",
"log 0.4.25", "log 0.4.25",

View File

@ -24,6 +24,6 @@ url = "2.5.0"
itertools = "0.14.0" itertools = "0.14.0"
chrono = { version = "^0.4.38", features = ["serde"] } chrono = { version = "^0.4.38", features = ["serde"] }
hex = "0.4.3" hex = "0.4.3"
m3u8-rs = "6.0.0" m3u8-rs = { git = "https://github.com/v0l/m3u8-rs.git", rev = "d76ff96326814237a6d5e92288cdfe7060a43168" }
sha2 = "0.10.8" sha2 = "0.10.8"
data-encoding = "2.9.0" data-encoding = "2.9.0"

View File

@ -1,24 +1,51 @@
use anyhow::Result; use anyhow::Result;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket;
use ffmpeg_rs_raw::Encoder;
use std::path::PathBuf;
use uuid::Uuid; use uuid::Uuid;
use crate::egress::{Egress, EgressResult}; use crate::egress::{Egress, EgressResult};
use crate::mux::HlsMuxer; use crate::mux::{HlsMuxer, SegmentType};
use crate::variant::VariantStream;
/// Alias the muxer directly /// Alias the muxer directly
pub type HlsEgress = HlsMuxer; pub struct HlsEgress {
mux: HlsMuxer,
}
impl Egress for HlsMuxer { impl HlsEgress {
pub const PATH: &'static str = "hls";
pub fn new<'a>(
id: &Uuid,
out_dir: &str,
segment_length: f32,
encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
segment_type: SegmentType,
) -> Result<Self> {
Ok(Self {
mux: HlsMuxer::new(
id,
PathBuf::from(out_dir).join(Self::PATH).to_str().unwrap(),
segment_length,
encoders,
segment_type,
)?,
})
}
}
impl Egress for HlsEgress {
unsafe fn process_pkt( unsafe fn process_pkt(
&mut self, &mut self,
packet: *mut AVPacket, packet: *mut AVPacket,
variant: &Uuid, variant: &Uuid,
) -> Result<EgressResult> { ) -> Result<EgressResult> {
self.mux_packet(packet, variant) self.mux.mux_packet(packet, variant)
} }
unsafe fn reset(&mut self) -> Result<()> { unsafe fn reset(&mut self) -> Result<()> {
for var in &mut self.variants { for var in &mut self.mux.variants {
var.reset()? var.reset()?
} }
Ok(()) Ok(())

View File

@ -13,7 +13,7 @@ pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc<dyn Overseer>)
let info = ConnectionInfo { let info = ConnectionInfo {
id: Uuid::new_v4(), id: Uuid::new_v4(),
ip_addr: "127.0.0.1:6969".to_string(), ip_addr: "127.0.0.1:6969".to_string(),
endpoint: "file-input".to_owned(), endpoint: "file-input",
app_name: "".to_string(), app_name: "".to_string(),
key: "test".to_string(), key: "test".to_string(),
}; };

View File

@ -21,8 +21,8 @@ pub struct ConnectionInfo {
/// Unique ID of this connection / pipeline /// Unique ID of this connection / pipeline
pub id: Uuid, pub id: Uuid,
/// Endpoint of the ingress /// Name of the ingest point
pub endpoint: String, pub endpoint: &'static str,
/// IP address of the connection /// IP address of the connection
pub ip_addr: String, pub ip_addr: String,
@ -58,7 +58,10 @@ pub fn run_pipeline(mut pl: PipelineRunner) -> anyhow::Result<()> {
info!("New client connected: {}", &pl.connection.ip_addr); info!("New client connected: {}", &pl.connection.ip_addr);
std::thread::Builder::new() std::thread::Builder::new()
.name(format!("pipeline-{}", pl.connection.id)) .name(format!(
"client:{}:{}",
pl.connection.endpoint, pl.connection.id
))
.spawn(move || { .spawn(move || {
pl.run(); pl.run();
})?; })?;

View File

@ -201,7 +201,9 @@ impl RtmpClient {
self.published_stream = Some(RtmpPublishedStream(app_name, stream_key)); self.published_stream = Some(RtmpPublishedStream(app_name, stream_key));
} }
} }
ServerSessionEvent::PublishStreamFinished { .. } => {} ServerSessionEvent::PublishStreamFinished { .. } => {
// TODO: shutdown pipeline
}
ServerSessionEvent::StreamMetadataChanged { ServerSessionEvent::StreamMetadataChanged {
app_name, app_name,
stream_key, stream_key,
@ -270,12 +272,12 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
info!("RTMP listening on: {}", &addr); info!("RTMP listening on: {}", &addr);
while let Ok((socket, ip)) = listener.accept().await { while let Ok((socket, ip)) = listener.accept().await {
let mut cc = RtmpClient::new(socket.into_std()?)?; let mut cc = RtmpClient::new(socket.into_std()?)?;
let addr = addr.clone();
let overseer = overseer.clone(); let overseer = overseer.clone();
let out_dir = out_dir.clone(); let out_dir = out_dir.clone();
let handle = Handle::current(); let handle = Handle::current();
let new_id = Uuid::new_v4();
std::thread::Builder::new() std::thread::Builder::new()
.name("rtmp-client".to_string()) .name(format!("client:rtmp:{}", new_id))
.spawn(move || { .spawn(move || {
if let Err(e) = cc.handshake() { if let Err(e) = cc.handshake() {
bail!("Error during handshake: {}", e) bail!("Error during handshake: {}", e)
@ -286,9 +288,9 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
let pr = cc.published_stream.as_ref().unwrap(); let pr = cc.published_stream.as_ref().unwrap();
let info = ConnectionInfo { let info = ConnectionInfo {
id: Uuid::new_v4(), id: new_id,
ip_addr: ip.to_string(), ip_addr: ip.to_string(),
endpoint: addr.clone(), endpoint: "rtmp",
app_name: pr.0.clone(), app_name: pr.0.clone(),
key: pr.1.clone(), key: pr.1.clone(),
}; };
@ -305,8 +307,6 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
bail!("Failed to create PipelineRunner {}", e) bail!("Failed to create PipelineRunner {}", e)
} }
}; };
//pl.set_demuxer_format("flv");
//pl.set_demuxer_buffer_size(1024 * 64);
pl.run(); pl.run();
Ok(()) Ok(())
})?; })?;

View File

@ -22,7 +22,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
let socket = request.accept(None).await?; let socket = request.accept(None).await?;
let info = ConnectionInfo { let info = ConnectionInfo {
id: Uuid::new_v4(), id: Uuid::new_v4(),
endpoint: addr.clone(), endpoint: "srt",
ip_addr: socket.settings().remote.to_string(), ip_addr: socket.settings().remote.to_string(),
app_name: "".to_string(), app_name: "".to_string(),
key: socket key: socket

View File

@ -15,7 +15,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
let info = ConnectionInfo { let info = ConnectionInfo {
id: Uuid::new_v4(), id: Uuid::new_v4(),
ip_addr: ip.to_string(), ip_addr: ip.to_string(),
endpoint: addr.clone(), endpoint: "tcp",
app_name: "".to_string(), app_name: "".to_string(),
key: "test".to_string(), key: "test".to_string(),
}; };

View File

@ -4,7 +4,9 @@ use crate::overseer::Overseer;
use anyhow::Result; use anyhow::Result;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVSampleFormat::AV_SAMPLE_FMT_FLTP; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVSampleFormat::AV_SAMPLE_FMT_FLTP;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_frame_free, av_packet_free, AV_PROFILE_H264_MAIN, AVRational}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_frame_free, av_packet_free, AVRational, AV_PROFILE_H264_MAIN,
};
use ffmpeg_rs_raw::{Encoder, Muxer}; use ffmpeg_rs_raw::{Encoder, Muxer};
use log::info; use log::info;
use ringbuf::traits::{Observer, Split}; use ringbuf::traits::{Observer, Split};
@ -25,7 +27,7 @@ pub async fn listen(out_dir: String, overseer: Arc<dyn Overseer>) -> Result<()>
let info = ConnectionInfo { let info = ConnectionInfo {
id: Uuid::new_v4(), id: Uuid::new_v4(),
endpoint: "test-pattern".to_string(), endpoint: "test-pattern",
ip_addr: "test-pattern".to_string(), ip_addr: "test-pattern".to_string(),
app_name: "".to_string(), app_name: "".to_string(),
key: "test".to_string(), key: "test".to_string(),
@ -115,8 +117,14 @@ impl TestPatternSrc {
SAMPLE_RATE, SAMPLE_RATE,
frame_size, frame_size,
1, 1,
AVRational { num: 1, den: VIDEO_FPS as i32 }, AVRational {
AVRational { num: 1, den: SAMPLE_RATE as i32 }, num: 1,
den: VIDEO_FPS as i32,
},
AVRational {
num: 1,
den: SAMPLE_RATE as i32,
},
)?, )?,
video_encoder, video_encoder,
audio_encoder, audio_encoder,

View File

@ -4,13 +4,13 @@ use anyhow::{bail, ensure, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_free, av_opt_set, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, AVPacket, av_free, av_opt_set, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, avio_size,
AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, AVPacket, AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
}; };
use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
use itertools::Itertools; use itertools::Itertools;
use log::{info, trace, warn}; use log::{info, trace, warn};
use m3u8_rs::MediaSegment; use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf};
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Display; use std::fmt::Display;
use std::fs::File; use std::fs::File;
@ -18,7 +18,7 @@ use std::path::PathBuf;
use std::ptr; use std::ptr;
use uuid::Uuid; use uuid::Uuid;
#[derive(Clone, Copy)] #[derive(Clone, Copy, PartialEq)]
pub enum SegmentType { pub enum SegmentType {
MPEGTS, MPEGTS,
FMP4, FMP4,
@ -79,14 +79,14 @@ pub struct HlsVariant {
streams: Vec<HlsVariantStream>, streams: Vec<HlsVariantStream>,
/// Segment length in seconds /// Segment length in seconds
segment_length: f32, segment_length: f32,
/// Total number of segments to store for this variant /// Total number of seconds of video to store
segment_window: Option<u16>, segment_window: f32,
/// Current segment index /// Current segment index
idx: u64, idx: u64,
/// Output directory (base) /// Output directory (base)
out_dir: String, out_dir: String,
/// List of segments to be included in the playlist /// List of segments to be included in the playlist
segments: Vec<SegmentInfo>, segments: Vec<HlsSegment>,
/// Type of segments to create /// Type of segments to create
segment_type: SegmentType, segment_type: SegmentType,
/// Ending presentation timestamp /// Ending presentation timestamp
@ -97,8 +97,30 @@ pub struct HlsVariant {
packets_written: u64, packets_written: u64,
/// Reference stream used to track duration /// Reference stream used to track duration
ref_stream_index: i32, ref_stream_index: i32,
/// LL-HLS: Target duration for partial segments
partial_target_duration: f32,
/// HLS-LL: Current partial index
current_partial_index: u64,
/// HLS-LL: Current duration in this partial
current_partial_duration: f64,
} }
#[derive(PartialEq)]
enum HlsSegment {
Full(SegmentInfo),
Partial(PartialSegmentInfo),
}
impl HlsSegment {
fn to_media_segment(&self) -> MediaSegmentType {
match self {
HlsSegment::Full(s) => s.to_media_segment(),
HlsSegment::Partial(s) => s.to_media_segment(),
}
}
}
#[derive(PartialEq)]
struct SegmentInfo { struct SegmentInfo {
index: u64, index: u64,
duration: f32, duration: f32,
@ -106,13 +128,12 @@ struct SegmentInfo {
} }
impl SegmentInfo { impl SegmentInfo {
fn to_media_segment(&self) -> MediaSegment { fn to_media_segment(&self) -> MediaSegmentType {
MediaSegment { MediaSegmentType::Full(MediaSegment {
uri: self.filename(), uri: self.filename(),
duration: self.duration, duration: self.duration,
title: None,
..MediaSegment::default() ..MediaSegment::default()
} })
} }
fn filename(&self) -> String { fn filename(&self) -> String {
@ -120,6 +141,35 @@ impl SegmentInfo {
} }
} }
#[derive(PartialEq)]
struct PartialSegmentInfo {
index: u64,
parent_index: u64,
parent_kind: SegmentType,
duration: f64,
independent: bool,
byte_range: Option<(u64, Option<u64>)>,
}
impl PartialSegmentInfo {
fn to_media_segment(&self) -> MediaSegmentType {
MediaSegmentType::Partial(Part {
uri: self.filename(),
duration: self.duration,
independent: self.independent,
gap: false,
byte_range: self.byte_range.map(|r| ByteRange {
length: r.0,
offset: r.1,
}),
})
}
fn filename(&self) -> String {
HlsVariant::segment_name(self.parent_kind, self.parent_index)
}
}
impl HlsVariant { impl HlsVariant {
pub fn new<'a>( pub fn new<'a>(
out_dir: &'a str, out_dir: &'a str,
@ -207,17 +257,20 @@ impl HlsVariant {
Ok(Self { Ok(Self {
name: name.clone(), name: name.clone(),
segment_length, segment_length,
segment_window: Some(10), //TODO: configure window segment_window: 30.0,
mux, mux,
streams, streams,
idx: 1, idx: 1,
segments: Vec::new(), // Start with empty segments list segments: Vec::new(),
out_dir: out_dir.to_string(), out_dir: out_dir.to_string(),
segment_type, segment_type,
end_pts: AV_NOPTS_VALUE, end_pts: AV_NOPTS_VALUE,
duration: 0.0, duration: 0.0,
packets_written: 0, packets_written: 0,
ref_stream_index, ref_stream_index,
partial_target_duration: 0.33,
current_partial_index: 0,
current_partial_duration: 0.0,
}) })
} }
@ -259,20 +312,8 @@ impl HlsVariant {
} }
// check if current packet is keyframe, flush current segment // check if current packet is keyframe, flush current segment
if self.packets_written > 0 && can_split { if self.packets_written > 1 && can_split && self.duration >= self.segment_length as f64 {
trace!( result = self.split_next_seg()?;
"{} segmentation check: pts={}, duration={:.3}, timebase={}/{}, target={:.3}",
self.name,
(*pkt).pts,
self.duration,
(*pkt).time_base.num,
(*pkt).time_base.den,
self.segment_length
);
if self.duration >= self.segment_length as f64 {
result = self.split_next_seg()?;
}
} }
// track duration from pts // track duration from pts
@ -282,13 +323,22 @@ impl HlsVariant {
} }
let pts_diff = (*pkt).pts - self.end_pts; let pts_diff = (*pkt).pts - self.end_pts;
if pts_diff > 0 { if pts_diff > 0 {
self.duration += pts_diff as f64 * av_q2d((*pkt).time_base); let time_delta = pts_diff as f64 * av_q2d((*pkt).time_base);
self.duration += time_delta;
self.current_partial_duration += time_delta;
} }
self.end_pts = (*pkt).pts; self.end_pts = (*pkt).pts;
} }
// write to current segment
self.mux.write_packet(pkt)?; self.mux.write_packet(pkt)?;
self.packets_written += 1; self.packets_written += 1;
// HLS-LL: write next partial segment
if is_ref_pkt && self.current_partial_duration >= self.partial_target_duration as f64 {
self.create_partial_segment(can_split)?;
}
Ok(result) Ok(result)
} }
@ -296,6 +346,46 @@ impl HlsVariant {
self.mux.close() self.mux.close()
} }
/// Create a partial segment for LL-HLS
fn create_partial_segment(&mut self, independent: bool) -> Result<()> {
let ctx = self.mux.context();
let pos = unsafe {
avio_flush((*ctx).pb);
avio_size((*ctx).pb) as u64
};
let previous_partial_end = self.segments.last().and_then(|s| match &s {
HlsSegment::Partial(p) => p.byte_range.as_ref().map(|(len, start)| start.unwrap_or(0) + len),
_ => None,
});
let partial_info = PartialSegmentInfo {
index: self.current_partial_index,
parent_index: self.idx,
parent_kind: self.segment_type,
duration: self.current_partial_duration,
independent,
byte_range: match previous_partial_end {
Some(prev_end) => Some((pos - prev_end, Some(prev_end))),
_ => Some((pos, Some(0))),
},
};
trace!(
"{} created partial segment {} [{:.3}s, independent={}]",
self.name,
partial_info.index,
partial_info.duration,
independent
);
self.segments.push(HlsSegment::Partial(partial_info));
self.current_partial_index += 1;
self.current_partial_duration = 0.0;
self.write_playlist()?;
Ok(())
}
/// Reset the muxer state and start the next segment /// Reset the muxer state and start the next segment
unsafe fn split_next_seg(&mut self) -> Result<EgressResult> { unsafe fn split_next_seg(&mut self) -> Result<EgressResult> {
let completed_segment_idx = self.idx; let completed_segment_idx = self.idx;
@ -383,6 +473,7 @@ impl HlsVariant {
warn!("Failed to update playlist: {}", e); warn!("Failed to update playlist: {}", e);
} }
// Reset counters for next segment
self.packets_written = 0; self.packets_written = 0;
self.duration = 0.0; self.duration = 0.0;
@ -400,37 +491,62 @@ impl HlsVariant {
/// Add a new segment to the variant and return a list of deleted segments /// Add a new segment to the variant and return a list of deleted segments
fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> { fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
self.segments.push(SegmentInfo { self.segments.push(HlsSegment::Full(SegmentInfo {
index: idx, index: idx,
duration, duration,
kind: self.segment_type, kind: self.segment_type,
}); }));
self.write_playlist() self.write_playlist()
} }
/// Delete segments which are too old /// Delete segments which are too old
fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> { fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> {
const MAX_SEGMENTS: usize = 10; let drain_from_hls_segment = {
let mut acc = 0.0;
let mut ret = vec![]; let mut seg_match = None;
if self.segments.len() > MAX_SEGMENTS { for seg in self
let n_drain = self.segments.len() - MAX_SEGMENTS; .segments
let seg_dir = self.out_dir(); .iter()
for seg in self.segments.drain(..n_drain) { .filter(|e| matches!(e, HlsSegment::Full(_)))
// delete file .rev()
let seg_path = seg_dir.join(seg.filename()); {
if let Err(e) = std::fs::remove_file(&seg_path) { if acc >= self.segment_window {
warn!( seg_match = Some(seg);
"Failed to remove segment file: {} {}", break;
seg_path.display(), }
e acc += match seg {
); HlsSegment::Full(seg) => seg.duration,
_ => 0.0,
};
}
seg_match
};
let mut ret = vec![];
if let Some(seg_match) = drain_from_hls_segment {
if let Some(drain_pos) = self.segments.iter().position(|e| e == seg_match) {
let seg_dir = self.out_dir();
for seg in self.segments.drain(..drain_pos) {
match seg {
HlsSegment::Full(seg) => {
let seg_path = seg_dir.join(seg.filename());
if let Err(e) = std::fs::remove_file(&seg_path) {
warn!(
"Failed to remove segment file: {} {}",
seg_path.display(),
e
);
}
trace!("Removed segment file: {}", seg_path.display());
ret.push(seg);
}
_ => {}
}
} }
trace!("Removed segment file: {}", seg_path.display());
ret.push(seg);
} }
} }
Ok(ret) Ok(ret)
} }
@ -440,11 +556,20 @@ impl HlsVariant {
} }
let mut pl = m3u8_rs::MediaPlaylist::default(); let mut pl = m3u8_rs::MediaPlaylist::default();
// Round up target duration to ensure compliance
pl.target_duration = (self.segment_length.ceil() as u64).max(1); pl.target_duration = (self.segment_length.ceil() as u64).max(1);
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect(); pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
pl.version = Some(3); pl.version = Some(6);
pl.media_sequence = self.segments.first().map(|s| s.index).unwrap_or(0); pl.part_inf = Some(PartInf {
part_target: self.partial_target_duration as f64,
});
pl.media_sequence = self
.segments
.iter()
.find_map(|s| match s {
HlsSegment::Full(ss) => Some(ss.index),
_ => None,
})
.unwrap_or(self.idx);
// For live streams, don't set end list // For live streams, don't set end list
pl.end_list = false; pl.end_list = false;
@ -522,6 +647,9 @@ impl HlsMuxer {
) -> Result<Self> { ) -> Result<Self> {
let base = PathBuf::from(out_dir).join(id.to_string()); let base = PathBuf::from(out_dir).join(id.to_string());
if !base.exists() {
std::fs::create_dir_all(&base)?;
}
let mut vars = Vec::new(); let mut vars = Vec::new();
for (k, group) in &encoders for (k, group) in &encoders
.sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id())) .sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id()))

View File

@ -32,7 +32,7 @@ use tokio::runtime::Handle;
use uuid::Uuid; use uuid::Uuid;
/// Idle mode timeout in seconds /// Idle mode timeout in seconds
const IDLE_TIMEOUT_SECS: u64 = 600; const IDLE_TIMEOUT_SECS: u64 = 60;
/// Circuit breaker threshold for consecutive decode failures /// Circuit breaker threshold for consecutive decode failures
const DEFAULT_MAX_CONSECUTIVE_FAILURES: u32 = 50; const DEFAULT_MAX_CONSECUTIVE_FAILURES: u32 = 50;
@ -705,7 +705,7 @@ impl PipelineRunner {
let hls = HlsEgress::new( let hls = HlsEgress::new(
&self.connection.id, &self.connection.id,
&self.out_dir, &self.out_dir,
2.0, // TODO: configure segment length 6.0, // TODO: configure segment length
encoders, encoders,
SegmentType::MPEGTS, SegmentType::MPEGTS,
)?; )?;

View File

@ -43,4 +43,5 @@ pretty_env_logger = "0.5.0"
clap = { version = "4.5.16", features = ["derive"] } clap = { version = "4.5.16", features = ["derive"] }
futures-util = "0.3.31" futures-util = "0.3.31"
matchit = "0.8.4" matchit = "0.8.4"
mustache = "0.9.0" mustache = "0.9.0"
http-range-header = "0.4.2"

View File

@ -5,6 +5,7 @@ endpoints:
- "rtmp://127.0.0.1:3336" - "rtmp://127.0.0.1:3336"
- "srt://127.0.0.1:3335" - "srt://127.0.0.1:3335"
- "tcp://127.0.0.1:3334" - "tcp://127.0.0.1:3334"
- "test-pattern://"
# Public hostname which points to the IP address used to listen for all [endpoints] # Public hostname which points to the IP address used to listen for all [endpoints]
endpoints_public_hostname: "localhost" endpoints_public_hostname: "localhost"

View File

@ -1,25 +1,36 @@
use crate::api::Api; use crate::api::Api;
use anyhow::{bail, Result}; use anyhow::{bail, ensure, Context, Result};
use base64::Engine; use base64::Engine;
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
use http_body_util::combinators::BoxBody; use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody}; use http_body_util::{BodyExt, Full, StreamBody};
use http_range_header::{
parse_range_header, EndPosition, StartPosition, SyntacticallyCorrectRange,
};
use hyper::body::{Frame, Incoming}; use hyper::body::{Frame, Incoming};
use hyper::http::response::Builder;
use hyper::service::Service; use hyper::service::Service;
use hyper::{Method, Request, Response}; use hyper::{Request, Response, StatusCode};
use log::error; use log::{error, warn};
use matchit::Router;
use nostr_sdk::{serde_json, Alphabet, Event, Kind, PublicKey, SingleLetterTag, TagKind}; use nostr_sdk::{serde_json, Alphabet, Event, Kind, PublicKey, SingleLetterTag, TagKind};
use serde::Serialize; use serde::Serialize;
use std::future::Future; use std::future::Future;
use std::io::SeekFrom;
use std::ops::Range;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin; use std::pin::{pin, Pin};
use std::sync::Arc; use std::sync::Arc;
use std::task::Poll;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::fs::File; use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio_util::io::ReaderStream; use tokio_util::io::ReaderStream;
use uuid::Uuid;
use zap_stream_core::egress::hls::HlsEgress;
use zap_stream_core::viewer::ViewerTracker; use zap_stream_core::viewer::ViewerTracker;
#[derive(Serialize, Clone)] #[derive(Serialize, Clone)]
@ -46,6 +57,14 @@ pub struct CachedStreams {
cached_at: Instant, cached_at: Instant,
} }
#[derive(Clone)]
pub enum HttpServerPath {
Index,
HlsMasterPlaylist,
HlsVariantPlaylist,
HlsSegmentFile,
}
pub type StreamCache = Arc<RwLock<Option<CachedStreams>>>; pub type StreamCache = Arc<RwLock<Option<CachedStreams>>>;
#[derive(Clone)] #[derive(Clone)]
@ -54,6 +73,7 @@ pub struct HttpServer {
files_dir: PathBuf, files_dir: PathBuf,
api: Api, api: Api,
stream_cache: StreamCache, stream_cache: StreamCache,
router: Router<HttpServerPath>,
} }
impl HttpServer { impl HttpServer {
@ -63,18 +83,37 @@ impl HttpServer {
api: Api, api: Api,
stream_cache: StreamCache, stream_cache: StreamCache,
) -> Self { ) -> Self {
let mut router = Router::new();
router.insert("/", HttpServerPath::Index).unwrap();
router.insert("/index.html", HttpServerPath::Index).unwrap();
router
.insert(
format!("/{}/{{stream}}/live.m3u8", HlsEgress::PATH),
HttpServerPath::HlsMasterPlaylist,
)
.unwrap();
router
.insert(
format!("/{}/{{stream}}/{{variant}}/live.m3u8", HlsEgress::PATH),
HttpServerPath::HlsVariantPlaylist,
)
.unwrap();
router
.insert(
format!("/{}/{{stream}}/{{variant}}/{{seg}}.ts", HlsEgress::PATH),
HttpServerPath::HlsSegmentFile,
)
.unwrap();
Self { Self {
index_template, index_template,
files_dir, files_dir,
api, api,
stream_cache, stream_cache,
router,
} }
} }
async fn get_cached_or_fetch_streams(&self) -> Result<IndexTemplateData> {
Self::get_cached_or_fetch_streams_static(&self.stream_cache, &self.api).await
}
async fn get_cached_or_fetch_streams_static( async fn get_cached_or_fetch_streams_static(
stream_cache: &StreamCache, stream_cache: &StreamCache,
api: &Api, api: &Api,
@ -100,13 +139,14 @@ impl HttpServer {
.into_iter() .into_iter()
.map(|stream| { .map(|stream| {
let viewer_count = api.get_viewer_count(&stream.id); let viewer_count = api.get_viewer_count(&stream.id);
// TODO: remove HLS assumption
StreamData { StreamData {
id: stream.id.clone(), id: stream.id.clone(),
title: stream title: stream
.title .title
.unwrap_or_else(|| format!("Stream {}", &stream.id[..8])), .unwrap_or_else(|| format!("Stream {}", &stream.id[..8])),
summary: stream.summary, summary: stream.summary,
live_url: format!("/{}/live.m3u8", stream.id), live_url: format!("/{}/{}/live.m3u8", HlsEgress::PATH, stream.id),
viewer_count: if viewer_count > 0 { viewer_count: if viewer_count > 0 {
Some(viewer_count as _) Some(viewer_count as _)
} else { } else {
@ -141,31 +181,97 @@ impl HttpServer {
Ok(template_data) Ok(template_data)
} }
async fn render_index(&self) -> Result<String> { async fn handle_index(
let template_data = self.get_cached_or_fetch_streams().await?; api: Api,
let template = mustache::compile_str(&self.index_template)?; stream_cache: StreamCache,
let rendered = template.render_to_string(&template_data)?; template: String,
Ok(rendered) ) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
// Compile template outside async move for better performance
let template = match mustache::compile_str(&template) {
Ok(t) => t,
Err(e) => {
error!("Failed to compile template: {}", e);
return Ok(Self::base_response().status(500).body(BoxBody::default())?);
}
};
let template_data = Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await;
match template_data {
Ok(data) => match template.render_to_string(&data) {
Ok(index_html) => Ok(Self::base_response()
.header("content-type", "text/html")
.body(
Full::new(Bytes::from(index_html))
.map_err(|e| match e {})
.boxed(),
)?),
Err(e) => {
error!("Failed to render template: {}", e);
Ok(Self::base_response().status(500).body(BoxBody::default())?)
}
},
Err(e) => {
error!("Failed to fetch template data: {}", e);
Ok(Self::base_response().status(500).body(BoxBody::default())?)
}
}
} }
async fn handle_hls_playlist( async fn handle_hls_segment(
api: &Api,
req: &Request<Incoming>, req: &Request<Incoming>,
playlist_path: &PathBuf, segment_path: PathBuf,
) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> { ) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
// Extract stream ID from path (e.g., /uuid/live.m3u8 -> uuid) let mut response = Self::base_response().header("accept-ranges", "bytes");
let path_parts: Vec<&str> = req
.uri() if let Some(r) = req.headers().get("range") {
.path() if let Ok(ranges) = parse_range_header(r.to_str()?) {
.trim_start_matches('/') if ranges.ranges.len() > 1 {
.split('/') warn!("Multipart ranges are not supported, fallback to non-range request");
.collect(); Self::path_to_response(segment_path).await
if path_parts.len() < 2 { } else {
return Ok(Response::builder().status(404).body(BoxBody::default())?); let file = File::open(&segment_path).await?;
let metadata = file.metadata().await?;
let single_range = ranges.ranges.first().unwrap();
let range = match RangeBody::get_range(metadata.len(), single_range) {
Ok(r) => r,
Err(e) => {
warn!("Failed to get range: {}", e);
return Ok(response
.status(StatusCode::RANGE_NOT_SATISFIABLE)
.body(BoxBody::default())?);
}
};
let r_body = RangeBody::new(file, metadata.len(), range.clone());
response = response.status(StatusCode::PARTIAL_CONTENT);
let headers = r_body.get_headers();
for (k, v) in headers {
response = response.header(k, v);
}
let f_stream = ReaderStream::new(r_body);
let body = StreamBody::new(
f_stream
.map_ok(Frame::data)
.map_err(|e| anyhow::anyhow!("Failed to read body: {}", e)),
)
.boxed();
Ok(response.body(body)?)
}
} else {
Ok(Self::base_response().status(400).body(BoxBody::default())?)
}
} else {
Self::path_to_response(segment_path).await
} }
}
let stream_id = path_parts[0]; async fn handle_hls_master_playlist(
api: Api,
req: &Request<Incoming>,
stream_id: &str,
playlist_path: PathBuf,
) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
// Get client IP and User-Agent for tracking // Get client IP and User-Agent for tracking
let client_ip = Self::get_client_ip(req); let client_ip = Self::get_client_ip(req);
let user_agent = req let user_agent = req
@ -203,17 +309,15 @@ impl HttpServer {
let modified_content = let modified_content =
Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?; Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?;
Ok(Response::builder() let response = Self::base_response()
.header("content-type", "application/vnd.apple.mpegurl") .header("content-type", "application/vnd.apple.mpegurl")
.header("server", "zap-stream-core")
.header("access-control-allow-origin", "*")
.header("access-control-allow-headers", "*")
.header("access-control-allow-methods", "HEAD, GET")
.body( .body(
Full::new(Bytes::from(modified_content)) Full::new(Bytes::from(modified_content))
.map_err(|e| match e {}) .map_err(|e| match e {})
.boxed(), .boxed(),
)?) )?;
Ok(response)
} }
fn get_client_ip(req: &Request<Incoming>) -> String { fn get_client_ip(req: &Request<Incoming>) -> String {
@ -232,8 +336,8 @@ impl HttpServer {
} }
} }
// Fallback to connection IP (note: in real deployment this might be a proxy) // use random string as IP to avoid broken view tracker due to proxying
"unknown".to_string() Uuid::new_v4().to_string()
} }
fn add_viewer_token_to_playlist(content: &[u8], viewer_token: &str) -> Result<String> { fn add_viewer_token_to_playlist(content: &[u8], viewer_token: &str) -> Result<String> {
@ -271,6 +375,27 @@ impl HttpServer {
format!("{}?vt={}", url, viewer_token) format!("{}?vt={}", url, viewer_token)
} }
} }
fn base_response() -> Builder {
Response::builder()
.header("server", "zap-stream-core")
.header("access-control-allow-origin", "*")
.header("access-control-allow-headers", "*")
.header("access-control-allow-methods", "HEAD, GET")
}
/// Get a response object for a file body
async fn path_to_response(path: PathBuf) -> Result<Response<BoxBody<Bytes, anyhow::Error>>> {
let f = File::open(&path).await?;
let f_stream = ReaderStream::new(f);
let body = StreamBody::new(
f_stream
.map_ok(Frame::data)
.map_err(|e| anyhow::anyhow!("Failed to read body: {}", e)),
)
.boxed();
Ok(Self::base_response().body(body)?)
}
} }
impl Service<Request<Incoming>> for HttpServer { impl Service<Request<Incoming>> for HttpServer {
@ -279,89 +404,50 @@ impl Service<Request<Incoming>> for HttpServer {
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn call(&self, req: Request<Incoming>) -> Self::Future { fn call(&self, req: Request<Incoming>) -> Self::Future {
// check is index.html let path = req.uri().path().to_owned();
if req.method() == Method::GET && req.uri().path() == "/" // request path as a file path pointing to the output directory
|| req.uri().path() == "/index.html" let dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
{
let stream_cache = self.stream_cache.clone();
let api = self.api.clone();
// Compile template outside async move for better performance if let Ok(m) = self.router.at(&path) {
let template = match mustache::compile_str(&self.index_template) { match m.value {
Ok(t) => t, HttpServerPath::Index => {
Err(e) => { let api = self.api.clone();
error!("Failed to compile template: {}", e); let cache = self.stream_cache.clone();
let template = self.index_template.clone();
return Box::pin(async move { Self::handle_index(api, cache, template).await });
}
HttpServerPath::HlsMasterPlaylist => {
let api = self.api.clone();
let stream_id = m.params.get("stream").map(|s| s.to_string());
let file_path = dst_path.clone();
return Box::pin(async move { return Box::pin(async move {
Ok(Response::builder() let stream_id = stream_id.context("stream id missing")?;
.status(500) Ok(
.body(BoxBody::default()) Self::handle_hls_master_playlist(api, &req, &stream_id, file_path)
.unwrap()) .await?,
)
}); });
} }
}; HttpServerPath::HlsVariantPlaylist => {
// let file handler handle this one, may be used later for HLS-LL to create
return Box::pin(async move { // delta updates
// Use the existing method to get cached template data
let template_data =
Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await;
match template_data {
Ok(data) => match template.render_to_string(&data) {
Ok(index_html) => Ok(Response::builder()
.header("content-type", "text/html")
.header("server", "zap-stream-core")
.body(
Full::new(Bytes::from(index_html))
.map_err(|e| match e {})
.boxed(),
)?),
Err(e) => {
error!("Failed to render template: {}", e);
Ok(Response::builder().status(500).body(BoxBody::default())?)
}
},
Err(e) => {
error!("Failed to fetch template data: {}", e);
Ok(Response::builder().status(500).body(BoxBody::default())?)
}
} }
}); HttpServerPath::HlsSegmentFile => {
// handle segment file (range requests)
let file_path = dst_path.clone();
return Box::pin(async move {
Ok(Self::handle_hls_segment(&req, file_path).await?)
});
}
}
} }
// check if mapped to file // check if mapped to file (not handled route)
let dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
if dst_path.exists() { if dst_path.exists() {
let api_clone = self.api.clone(); return Box::pin(async move { Self::path_to_response(dst_path).await });
return Box::pin(async move {
let rsp = Response::builder()
.header("server", "zap-stream-core")
.header("access-control-allow-origin", "*")
.header("access-control-allow-headers", "*")
.header("access-control-allow-methods", "HEAD, GET");
if req.method() == Method::HEAD {
return Ok(rsp.body(BoxBody::default())?);
}
// Handle HLS playlists with viewer tracking
if req.uri().path().ends_with("/live.m3u8") {
return Self::handle_hls_playlist(&api_clone, &req, &dst_path).await;
}
// Handle regular files
let f = File::open(&dst_path).await?;
let f_stream = ReaderStream::new(f);
let body = StreamBody::new(
f_stream
.map_ok(Frame::data)
.map_err(|e| Self::Error::new(e)),
)
.boxed();
Ok(rsp.body(body)?)
});
} }
// otherwise handle in overseer // fallback to api handler
let api = self.api.clone(); let api = self.api.clone();
Box::pin(async move { Box::pin(async move {
match api.handler(req).await { match api.handler(req).await {
@ -466,3 +552,110 @@ pub fn check_nip98_auth(req: &Request<Incoming>, public_url: &str) -> Result<Aut
event, event,
}) })
} }
/// Range request handler over file handle
///
/// Serves an inclusive byte range (`range_start..=range_end`) of an open
/// file, seeking lazily as the body is polled via [`AsyncRead`].
struct RangeBody {
    // open file handle the range is read from
    file: File,
    // first byte of the range (inclusive, absolute file offset)
    range_start: u64,
    // last byte of the range (inclusive, absolute file offset)
    range_end: u64,
    // bytes already served, relative to range_start
    current_offset: u64,
    // true while a seek on `file` is in flight and must be completed
    // before the next read
    poll_complete: bool,
    // total size of the file, used for the content-range header
    file_size: u64,
}

// Cap for open-ended ranges ("bytes=N-"): at most 1 MiB past the start
// is served in a single response.
const MAX_UNBOUNDED_RANGE: u64 = 1024 * 1024;
impl RangeBody {
    /// Create a body serving the inclusive byte range `range` of `file`.
    ///
    /// `file_size` is the total size of the file and is only used to
    /// build the `content-range` header.
    pub fn new(file: File, file_size: u64, range: Range<u64>) -> Self {
        Self {
            file,
            file_size,
            range_start: range.start,
            range_end: range.end,
            current_offset: 0,
            poll_complete: false,
        }
    }

    /// Resolve a syntactically valid `Range` header into an in-bounds,
    /// inclusive byte range for a file of `file_size` bytes.
    ///
    /// Open-ended ranges ("bytes=N-") are capped at `MAX_UNBOUNDED_RANGE`
    /// bytes past the start so a single request cannot demand an
    /// arbitrarily large response.
    ///
    /// # Errors
    /// Returns an error when the start or end index lies outside the file,
    /// or when the range is inverted (start after end).
    pub fn get_range(file_size: u64, header: &SyntacticallyCorrectRange) -> Result<Range<u64>> {
        let range_start = match header.start {
            StartPosition::Index(i) => {
                ensure!(i < file_size, "Range start out of range");
                i
            }
            StartPosition::FromLast(i) => file_size.saturating_sub(i),
        };
        let range_end = match header.end {
            EndPosition::Index(i) => {
                // last-byte-pos is INCLUSIVE (RFC 7233), so it must be
                // strictly less than the file size; `i == file_size` would
                // yield an invalid content-range and over-count the length
                ensure!(i < file_size, "Range end out of range");
                i
            }
            EndPosition::LastByte => {
                (file_size.saturating_sub(1)).min(range_start + MAX_UNBOUNDED_RANGE)
            }
        };
        // reject inverted requests such as "bytes=500-100"
        ensure!(range_start <= range_end, "Range start after range end");
        Ok(range_start..range_end)
    }

    /// Headers describing the inclusive range this body will serve.
    pub fn get_headers(&self) -> Vec<(&'static str, String)> {
        // +1 because range_end is inclusive
        let r_len = (self.range_end - self.range_start) + 1;
        vec![
            ("content-length", r_len.to_string()),
            (
                "content-range",
                format!(
                    "bytes {}-{}/{}",
                    self.range_start, self.range_end, self.file_size
                ),
            ),
        ]
    }
}
impl AsyncRead for RangeBody {
    /// Read the next chunk of the configured byte range from the file,
    /// seeking lazily before each read and returning EOF once the range
    /// is exhausted.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // absolute file offset of the next unread byte of the range
        let range_start = self.range_start + self.current_offset;
        // bytes remaining in the range (+1 because range_end is inclusive)
        let range_len = self.range_end.saturating_sub(range_start) + 1;
        let bytes_to_read = buf.remaining().min(range_len as usize) as u64;

        if bytes_to_read == 0 {
            // range fully served: signal EOF by leaving `buf` unfilled
            return Poll::Ready(Ok(()));
        }

        // when no pending poll, seek to starting position
        if !self.poll_complete {
            let pinned = pin!(&mut self.file);
            pinned.start_seek(SeekFrom::Start(range_start))?;
            self.poll_complete = true;
        }

        // check poll completion
        if self.poll_complete {
            let pinned = pin!(&mut self.file);
            match pinned.poll_complete(cx) {
                Poll::Ready(Ok(_)) => {
                    self.poll_complete = false;
                }
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => return Poll::Pending,
            }
        }

        // Read data from the file
        // NOTE(review): the full `buf` is handed to the inner read, so it
        // is not capped at `bytes_to_read`; if `buf.remaining()` exceeds
        // the bytes left in the range this can read past `range_end` —
        // confirm callers size `buf` accordingly, or limit via `buf.take`.
        let pinned = pin!(&mut self.file);
        match pinned.poll_read(cx, buf) {
            Poll::Ready(Ok(_)) => {
                // NOTE(review): assumes exactly `bytes_to_read` bytes were
                // filled; a short read would advance the offset past data
                // not yet served — consider measuring the change in
                // `buf.filled().len()` instead.
                self.current_offset += bytes_to_read;
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
            Poll::Pending => {
                // remember that the seek/read cycle must restart cleanly
                self.poll_complete = true;
                Poll::Pending
            }
        }
    }
}

View File

@ -15,6 +15,7 @@ use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use url::Url; use url::Url;
use uuid::Uuid; use uuid::Uuid;
use zap_stream_core::egress::hls::HlsEgress;
use zap_stream_core::egress::{EgressConfig, EgressSegment}; use zap_stream_core::egress::{EgressConfig, EgressSegment};
use zap_stream_core::ingress::ConnectionInfo; use zap_stream_core::ingress::ConnectionInfo;
use zap_stream_core::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer}; use zap_stream_core::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};
@ -227,15 +228,18 @@ impl ZapStreamOverseer {
stream: &UserStream, stream: &UserStream,
pubkey: &Vec<u8>, pubkey: &Vec<u8>,
) -> Result<Event> { ) -> Result<Event> {
// TODO: remove assumption that HLS is enabled
let base_streaming_path = PathBuf::from(HlsEgress::PATH).join(stream.id.to_string());
let extra_tags = vec![ let extra_tags = vec![
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?, Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
Tag::parse([ Tag::parse([
"streaming", "streaming",
self.map_to_stream_public_url(stream, "live.m3u8")?.as_str(), self.map_to_public_url(base_streaming_path.join("live.m3u8").to_str().unwrap())?
.as_str(),
])?, ])?,
Tag::parse([ Tag::parse([
"image", "image",
self.map_to_stream_public_url(stream, "thumb.webp")? self.map_to_public_url(base_streaming_path.join("thumb.webp").to_str().unwrap())?
.as_str(), .as_str(),
])?, ])?,
Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?, Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,
@ -248,10 +252,6 @@ impl ZapStreamOverseer {
Ok(ev) Ok(ev)
} }
fn map_to_stream_public_url(&self, stream: &UserStream, path: &str) -> Result<String> {
self.map_to_public_url(&format!("{}/{}", stream.id, path))
}
fn map_to_public_url(&self, path: &str) -> Result<String> { fn map_to_public_url(&self, path: &str) -> Result<String> {
let u: Url = self.public_url.parse()?; let u: Url = self.public_url.parse()?;
Ok(u.join(path)?.to_string()) Ok(u.join(path)?.to_string())
@ -371,6 +371,12 @@ impl Overseer for ZapStreamOverseer {
starts: Utc::now(), starts: Utc::now(),
state: UserStreamState::Live, state: UserStreamState::Live,
endpoint_id: Some(endpoint.id), endpoint_id: Some(endpoint.id),
title: user.title.clone(),
summary: user.summary.clone(),
thumb: user.image.clone(),
content_warning: user.content_warning.clone(),
goal: user.goal.clone(),
tags: user.tags.clone(),
..Default::default() ..Default::default()
}; };
let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?; let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
@ -427,7 +433,7 @@ impl Overseer for ZapStreamOverseer {
.tick_stream(pipeline_id, stream.user_id, duration, cost) .tick_stream(pipeline_id, stream.user_id, duration, cost)
.await?; .await?;
if bal <= 0 { if bal <= 0 {
bail!("Not enough balance"); bail!("Balance has run out");
} }
// Update last segment time for this stream // Update last segment time for this stream
@ -508,6 +514,7 @@ impl Overseer for ZapStreamOverseer {
viewer_states.remove(&stream.id); viewer_states.remove(&stream.id);
stream.state = UserStreamState::Ended; stream.state = UserStreamState::Ended;
stream.ends = Some(Utc::now());
let event = self.publish_stream_event(&stream, &user.pubkey).await?; let event = self.publish_stream_event(&stream, &user.pubkey).await?;
stream.event = Some(event.as_json()); stream.event = Some(event.as_json());
self.db.update_stream(&stream).await?; self.db.update_stream(&stream).await?;