mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-16 08:59:35 +00:00
Compare commits
3 Commits
09577cc2c8
...
ca70bf964c
Author | SHA1 | Date | |
---|---|---|---|
ca70bf964c
|
|||
cc973f0d9b
|
|||
a7ff18b34c
|
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -1612,6 +1612,12 @@ dependencies = [
|
|||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "http-range-header"
|
||||||
|
version = "0.4.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9171a2ea8a68358193d15dd5d70c1c10a2afc3e7e4c5bc92bc9f025cebd7359c"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "httparse"
|
name = "httparse"
|
||||||
version = "1.10.0"
|
version = "1.10.0"
|
||||||
@ -2147,8 +2153,7 @@ checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "m3u8-rs"
|
name = "m3u8-rs"
|
||||||
version = "6.0.0"
|
version = "6.0.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "git+https://github.com/v0l/m3u8-rs.git?rev=d76ff96326814237a6d5e92288cdfe7060a43168#d76ff96326814237a6d5e92288cdfe7060a43168"
|
||||||
checksum = "f03cd3335fb5f2447755d45cda9c70f76013626a9db44374973791b0926a86c3"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"nom",
|
"nom",
|
||||||
@ -5008,6 +5013,7 @@ dependencies = [
|
|||||||
"futures-util",
|
"futures-util",
|
||||||
"hex",
|
"hex",
|
||||||
"http-body-util",
|
"http-body-util",
|
||||||
|
"http-range-header",
|
||||||
"hyper 1.6.0",
|
"hyper 1.6.0",
|
||||||
"hyper-util",
|
"hyper-util",
|
||||||
"log 0.4.25",
|
"log 0.4.25",
|
||||||
|
@ -24,6 +24,6 @@ url = "2.5.0"
|
|||||||
itertools = "0.14.0"
|
itertools = "0.14.0"
|
||||||
chrono = { version = "^0.4.38", features = ["serde"] }
|
chrono = { version = "^0.4.38", features = ["serde"] }
|
||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
m3u8-rs = "6.0.0"
|
m3u8-rs = { git = "https://github.com/v0l/m3u8-rs.git", rev = "d76ff96326814237a6d5e92288cdfe7060a43168" }
|
||||||
sha2 = "0.10.8"
|
sha2 = "0.10.8"
|
||||||
data-encoding = "2.9.0"
|
data-encoding = "2.9.0"
|
@ -1,24 +1,51 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket;
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket;
|
||||||
|
use ffmpeg_rs_raw::Encoder;
|
||||||
|
use std::path::PathBuf;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::egress::{Egress, EgressResult};
|
use crate::egress::{Egress, EgressResult};
|
||||||
use crate::mux::HlsMuxer;
|
use crate::mux::{HlsMuxer, SegmentType};
|
||||||
|
use crate::variant::VariantStream;
|
||||||
|
|
||||||
/// Alias the muxer directly
|
/// Alias the muxer directly
|
||||||
pub type HlsEgress = HlsMuxer;
|
pub struct HlsEgress {
|
||||||
|
mux: HlsMuxer,
|
||||||
|
}
|
||||||
|
|
||||||
impl Egress for HlsMuxer {
|
impl HlsEgress {
|
||||||
|
pub const PATH: &'static str = "hls";
|
||||||
|
|
||||||
|
pub fn new<'a>(
|
||||||
|
id: &Uuid,
|
||||||
|
out_dir: &str,
|
||||||
|
segment_length: f32,
|
||||||
|
encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
|
||||||
|
segment_type: SegmentType,
|
||||||
|
) -> Result<Self> {
|
||||||
|
Ok(Self {
|
||||||
|
mux: HlsMuxer::new(
|
||||||
|
id,
|
||||||
|
PathBuf::from(out_dir).join(Self::PATH).to_str().unwrap(),
|
||||||
|
segment_length,
|
||||||
|
encoders,
|
||||||
|
segment_type,
|
||||||
|
)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Egress for HlsEgress {
|
||||||
unsafe fn process_pkt(
|
unsafe fn process_pkt(
|
||||||
&mut self,
|
&mut self,
|
||||||
packet: *mut AVPacket,
|
packet: *mut AVPacket,
|
||||||
variant: &Uuid,
|
variant: &Uuid,
|
||||||
) -> Result<EgressResult> {
|
) -> Result<EgressResult> {
|
||||||
self.mux_packet(packet, variant)
|
self.mux.mux_packet(packet, variant)
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn reset(&mut self) -> Result<()> {
|
unsafe fn reset(&mut self) -> Result<()> {
|
||||||
for var in &mut self.variants {
|
for var in &mut self.mux.variants {
|
||||||
var.reset()?
|
var.reset()?
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -13,7 +13,7 @@ pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc<dyn Overseer>)
|
|||||||
let info = ConnectionInfo {
|
let info = ConnectionInfo {
|
||||||
id: Uuid::new_v4(),
|
id: Uuid::new_v4(),
|
||||||
ip_addr: "127.0.0.1:6969".to_string(),
|
ip_addr: "127.0.0.1:6969".to_string(),
|
||||||
endpoint: "file-input".to_owned(),
|
endpoint: "file-input",
|
||||||
app_name: "".to_string(),
|
app_name: "".to_string(),
|
||||||
key: "test".to_string(),
|
key: "test".to_string(),
|
||||||
};
|
};
|
||||||
|
@ -21,8 +21,8 @@ pub struct ConnectionInfo {
|
|||||||
/// Unique ID of this connection / pipeline
|
/// Unique ID of this connection / pipeline
|
||||||
pub id: Uuid,
|
pub id: Uuid,
|
||||||
|
|
||||||
/// Endpoint of the ingress
|
/// Name of the ingest point
|
||||||
pub endpoint: String,
|
pub endpoint: &'static str,
|
||||||
|
|
||||||
/// IP address of the connection
|
/// IP address of the connection
|
||||||
pub ip_addr: String,
|
pub ip_addr: String,
|
||||||
@ -58,7 +58,10 @@ pub fn run_pipeline(mut pl: PipelineRunner) -> anyhow::Result<()> {
|
|||||||
info!("New client connected: {}", &pl.connection.ip_addr);
|
info!("New client connected: {}", &pl.connection.ip_addr);
|
||||||
|
|
||||||
std::thread::Builder::new()
|
std::thread::Builder::new()
|
||||||
.name(format!("pipeline-{}", pl.connection.id))
|
.name(format!(
|
||||||
|
"client:{}:{}",
|
||||||
|
pl.connection.endpoint, pl.connection.id
|
||||||
|
))
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
pl.run();
|
pl.run();
|
||||||
})?;
|
})?;
|
||||||
|
@ -201,7 +201,9 @@ impl RtmpClient {
|
|||||||
self.published_stream = Some(RtmpPublishedStream(app_name, stream_key));
|
self.published_stream = Some(RtmpPublishedStream(app_name, stream_key));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ServerSessionEvent::PublishStreamFinished { .. } => {}
|
ServerSessionEvent::PublishStreamFinished { .. } => {
|
||||||
|
// TODO: shutdown pipeline
|
||||||
|
}
|
||||||
ServerSessionEvent::StreamMetadataChanged {
|
ServerSessionEvent::StreamMetadataChanged {
|
||||||
app_name,
|
app_name,
|
||||||
stream_key,
|
stream_key,
|
||||||
@ -270,12 +272,12 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
|
|||||||
info!("RTMP listening on: {}", &addr);
|
info!("RTMP listening on: {}", &addr);
|
||||||
while let Ok((socket, ip)) = listener.accept().await {
|
while let Ok((socket, ip)) = listener.accept().await {
|
||||||
let mut cc = RtmpClient::new(socket.into_std()?)?;
|
let mut cc = RtmpClient::new(socket.into_std()?)?;
|
||||||
let addr = addr.clone();
|
|
||||||
let overseer = overseer.clone();
|
let overseer = overseer.clone();
|
||||||
let out_dir = out_dir.clone();
|
let out_dir = out_dir.clone();
|
||||||
let handle = Handle::current();
|
let handle = Handle::current();
|
||||||
|
let new_id = Uuid::new_v4();
|
||||||
std::thread::Builder::new()
|
std::thread::Builder::new()
|
||||||
.name("rtmp-client".to_string())
|
.name(format!("client:rtmp:{}", new_id))
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
if let Err(e) = cc.handshake() {
|
if let Err(e) = cc.handshake() {
|
||||||
bail!("Error during handshake: {}", e)
|
bail!("Error during handshake: {}", e)
|
||||||
@ -286,9 +288,9 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
|
|||||||
|
|
||||||
let pr = cc.published_stream.as_ref().unwrap();
|
let pr = cc.published_stream.as_ref().unwrap();
|
||||||
let info = ConnectionInfo {
|
let info = ConnectionInfo {
|
||||||
id: Uuid::new_v4(),
|
id: new_id,
|
||||||
ip_addr: ip.to_string(),
|
ip_addr: ip.to_string(),
|
||||||
endpoint: addr.clone(),
|
endpoint: "rtmp",
|
||||||
app_name: pr.0.clone(),
|
app_name: pr.0.clone(),
|
||||||
key: pr.1.clone(),
|
key: pr.1.clone(),
|
||||||
};
|
};
|
||||||
@ -305,8 +307,6 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
|
|||||||
bail!("Failed to create PipelineRunner {}", e)
|
bail!("Failed to create PipelineRunner {}", e)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
//pl.set_demuxer_format("flv");
|
|
||||||
//pl.set_demuxer_buffer_size(1024 * 64);
|
|
||||||
pl.run();
|
pl.run();
|
||||||
Ok(())
|
Ok(())
|
||||||
})?;
|
})?;
|
||||||
|
@ -22,7 +22,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
|
|||||||
let socket = request.accept(None).await?;
|
let socket = request.accept(None).await?;
|
||||||
let info = ConnectionInfo {
|
let info = ConnectionInfo {
|
||||||
id: Uuid::new_v4(),
|
id: Uuid::new_v4(),
|
||||||
endpoint: addr.clone(),
|
endpoint: "srt",
|
||||||
ip_addr: socket.settings().remote.to_string(),
|
ip_addr: socket.settings().remote.to_string(),
|
||||||
app_name: "".to_string(),
|
app_name: "".to_string(),
|
||||||
key: socket
|
key: socket
|
||||||
|
@ -15,7 +15,7 @@ pub async fn listen(out_dir: String, addr: String, overseer: Arc<dyn Overseer>)
|
|||||||
let info = ConnectionInfo {
|
let info = ConnectionInfo {
|
||||||
id: Uuid::new_v4(),
|
id: Uuid::new_v4(),
|
||||||
ip_addr: ip.to_string(),
|
ip_addr: ip.to_string(),
|
||||||
endpoint: addr.clone(),
|
endpoint: "tcp",
|
||||||
app_name: "".to_string(),
|
app_name: "".to_string(),
|
||||||
key: "test".to_string(),
|
key: "test".to_string(),
|
||||||
};
|
};
|
||||||
|
@ -4,7 +4,9 @@ use crate::overseer::Overseer;
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVSampleFormat::AV_SAMPLE_FMT_FLTP;
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVSampleFormat::AV_SAMPLE_FMT_FLTP;
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_frame_free, av_packet_free, AV_PROFILE_H264_MAIN, AVRational};
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
|
||||||
|
av_frame_free, av_packet_free, AVRational, AV_PROFILE_H264_MAIN,
|
||||||
|
};
|
||||||
use ffmpeg_rs_raw::{Encoder, Muxer};
|
use ffmpeg_rs_raw::{Encoder, Muxer};
|
||||||
use log::info;
|
use log::info;
|
||||||
use ringbuf::traits::{Observer, Split};
|
use ringbuf::traits::{Observer, Split};
|
||||||
@ -25,7 +27,7 @@ pub async fn listen(out_dir: String, overseer: Arc<dyn Overseer>) -> Result<()>
|
|||||||
|
|
||||||
let info = ConnectionInfo {
|
let info = ConnectionInfo {
|
||||||
id: Uuid::new_v4(),
|
id: Uuid::new_v4(),
|
||||||
endpoint: "test-pattern".to_string(),
|
endpoint: "test-pattern",
|
||||||
ip_addr: "test-pattern".to_string(),
|
ip_addr: "test-pattern".to_string(),
|
||||||
app_name: "".to_string(),
|
app_name: "".to_string(),
|
||||||
key: "test".to_string(),
|
key: "test".to_string(),
|
||||||
@ -115,8 +117,14 @@ impl TestPatternSrc {
|
|||||||
SAMPLE_RATE,
|
SAMPLE_RATE,
|
||||||
frame_size,
|
frame_size,
|
||||||
1,
|
1,
|
||||||
AVRational { num: 1, den: VIDEO_FPS as i32 },
|
AVRational {
|
||||||
AVRational { num: 1, den: SAMPLE_RATE as i32 },
|
num: 1,
|
||||||
|
den: VIDEO_FPS as i32,
|
||||||
|
},
|
||||||
|
AVRational {
|
||||||
|
num: 1,
|
||||||
|
den: SAMPLE_RATE as i32,
|
||||||
|
},
|
||||||
)?,
|
)?,
|
||||||
video_encoder,
|
video_encoder,
|
||||||
audio_encoder,
|
audio_encoder,
|
||||||
|
@ -4,13 +4,13 @@ use anyhow::{bail, ensure, Result};
|
|||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
|
||||||
av_free, av_opt_set, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, AVPacket,
|
av_free, av_opt_set, av_q2d, av_write_frame, avio_close, avio_flush, avio_open, avio_size,
|
||||||
AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
|
AVPacket, AVStream, AVIO_FLAG_WRITE, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY,
|
||||||
};
|
};
|
||||||
use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
|
use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use log::{info, trace, warn};
|
use log::{info, trace, warn};
|
||||||
use m3u8_rs::MediaSegment;
|
use m3u8_rs::{ByteRange, MediaSegment, MediaSegmentType, Part, PartInf};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
@ -18,7 +18,7 @@ use std::path::PathBuf;
|
|||||||
use std::ptr;
|
use std::ptr;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy, PartialEq)]
|
||||||
pub enum SegmentType {
|
pub enum SegmentType {
|
||||||
MPEGTS,
|
MPEGTS,
|
||||||
FMP4,
|
FMP4,
|
||||||
@ -79,14 +79,14 @@ pub struct HlsVariant {
|
|||||||
streams: Vec<HlsVariantStream>,
|
streams: Vec<HlsVariantStream>,
|
||||||
/// Segment length in seconds
|
/// Segment length in seconds
|
||||||
segment_length: f32,
|
segment_length: f32,
|
||||||
/// Total number of segments to store for this variant
|
/// Total number of seconds of video to store
|
||||||
segment_window: Option<u16>,
|
segment_window: f32,
|
||||||
/// Current segment index
|
/// Current segment index
|
||||||
idx: u64,
|
idx: u64,
|
||||||
/// Output directory (base)
|
/// Output directory (base)
|
||||||
out_dir: String,
|
out_dir: String,
|
||||||
/// List of segments to be included in the playlist
|
/// List of segments to be included in the playlist
|
||||||
segments: Vec<SegmentInfo>,
|
segments: Vec<HlsSegment>,
|
||||||
/// Type of segments to create
|
/// Type of segments to create
|
||||||
segment_type: SegmentType,
|
segment_type: SegmentType,
|
||||||
/// Ending presentation timestamp
|
/// Ending presentation timestamp
|
||||||
@ -97,8 +97,30 @@ pub struct HlsVariant {
|
|||||||
packets_written: u64,
|
packets_written: u64,
|
||||||
/// Reference stream used to track duration
|
/// Reference stream used to track duration
|
||||||
ref_stream_index: i32,
|
ref_stream_index: i32,
|
||||||
|
/// LL-HLS: Target duration for partial segments
|
||||||
|
partial_target_duration: f32,
|
||||||
|
/// HLS-LL: Current partial index
|
||||||
|
current_partial_index: u64,
|
||||||
|
/// HLS-LL: Current duration in this partial
|
||||||
|
current_partial_duration: f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq)]
|
||||||
|
enum HlsSegment {
|
||||||
|
Full(SegmentInfo),
|
||||||
|
Partial(PartialSegmentInfo),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HlsSegment {
|
||||||
|
fn to_media_segment(&self) -> MediaSegmentType {
|
||||||
|
match self {
|
||||||
|
HlsSegment::Full(s) => s.to_media_segment(),
|
||||||
|
HlsSegment::Partial(s) => s.to_media_segment(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq)]
|
||||||
struct SegmentInfo {
|
struct SegmentInfo {
|
||||||
index: u64,
|
index: u64,
|
||||||
duration: f32,
|
duration: f32,
|
||||||
@ -106,13 +128,12 @@ struct SegmentInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl SegmentInfo {
|
impl SegmentInfo {
|
||||||
fn to_media_segment(&self) -> MediaSegment {
|
fn to_media_segment(&self) -> MediaSegmentType {
|
||||||
MediaSegment {
|
MediaSegmentType::Full(MediaSegment {
|
||||||
uri: self.filename(),
|
uri: self.filename(),
|
||||||
duration: self.duration,
|
duration: self.duration,
|
||||||
title: None,
|
|
||||||
..MediaSegment::default()
|
..MediaSegment::default()
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn filename(&self) -> String {
|
fn filename(&self) -> String {
|
||||||
@ -120,6 +141,35 @@ impl SegmentInfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq)]
|
||||||
|
struct PartialSegmentInfo {
|
||||||
|
index: u64,
|
||||||
|
parent_index: u64,
|
||||||
|
parent_kind: SegmentType,
|
||||||
|
duration: f64,
|
||||||
|
independent: bool,
|
||||||
|
byte_range: Option<(u64, Option<u64>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialSegmentInfo {
|
||||||
|
fn to_media_segment(&self) -> MediaSegmentType {
|
||||||
|
MediaSegmentType::Partial(Part {
|
||||||
|
uri: self.filename(),
|
||||||
|
duration: self.duration,
|
||||||
|
independent: self.independent,
|
||||||
|
gap: false,
|
||||||
|
byte_range: self.byte_range.map(|r| ByteRange {
|
||||||
|
length: r.0,
|
||||||
|
offset: r.1,
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn filename(&self) -> String {
|
||||||
|
HlsVariant::segment_name(self.parent_kind, self.parent_index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl HlsVariant {
|
impl HlsVariant {
|
||||||
pub fn new<'a>(
|
pub fn new<'a>(
|
||||||
out_dir: &'a str,
|
out_dir: &'a str,
|
||||||
@ -207,17 +257,20 @@ impl HlsVariant {
|
|||||||
Ok(Self {
|
Ok(Self {
|
||||||
name: name.clone(),
|
name: name.clone(),
|
||||||
segment_length,
|
segment_length,
|
||||||
segment_window: Some(10), //TODO: configure window
|
segment_window: 30.0,
|
||||||
mux,
|
mux,
|
||||||
streams,
|
streams,
|
||||||
idx: 1,
|
idx: 1,
|
||||||
segments: Vec::new(), // Start with empty segments list
|
segments: Vec::new(),
|
||||||
out_dir: out_dir.to_string(),
|
out_dir: out_dir.to_string(),
|
||||||
segment_type,
|
segment_type,
|
||||||
end_pts: AV_NOPTS_VALUE,
|
end_pts: AV_NOPTS_VALUE,
|
||||||
duration: 0.0,
|
duration: 0.0,
|
||||||
packets_written: 0,
|
packets_written: 0,
|
||||||
ref_stream_index,
|
ref_stream_index,
|
||||||
|
partial_target_duration: 0.33,
|
||||||
|
current_partial_index: 0,
|
||||||
|
current_partial_duration: 0.0,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -259,21 +312,9 @@ impl HlsVariant {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check if current packet is keyframe, flush current segment
|
// check if current packet is keyframe, flush current segment
|
||||||
if self.packets_written > 0 && can_split {
|
if self.packets_written > 1 && can_split && self.duration >= self.segment_length as f64 {
|
||||||
trace!(
|
|
||||||
"{} segmentation check: pts={}, duration={:.3}, timebase={}/{}, target={:.3}",
|
|
||||||
self.name,
|
|
||||||
(*pkt).pts,
|
|
||||||
self.duration,
|
|
||||||
(*pkt).time_base.num,
|
|
||||||
(*pkt).time_base.den,
|
|
||||||
self.segment_length
|
|
||||||
);
|
|
||||||
|
|
||||||
if self.duration >= self.segment_length as f64 {
|
|
||||||
result = self.split_next_seg()?;
|
result = self.split_next_seg()?;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// track duration from pts
|
// track duration from pts
|
||||||
if is_ref_pkt {
|
if is_ref_pkt {
|
||||||
@ -282,13 +323,22 @@ impl HlsVariant {
|
|||||||
}
|
}
|
||||||
let pts_diff = (*pkt).pts - self.end_pts;
|
let pts_diff = (*pkt).pts - self.end_pts;
|
||||||
if pts_diff > 0 {
|
if pts_diff > 0 {
|
||||||
self.duration += pts_diff as f64 * av_q2d((*pkt).time_base);
|
let time_delta = pts_diff as f64 * av_q2d((*pkt).time_base);
|
||||||
|
self.duration += time_delta;
|
||||||
|
self.current_partial_duration += time_delta;
|
||||||
}
|
}
|
||||||
self.end_pts = (*pkt).pts;
|
self.end_pts = (*pkt).pts;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// write to current segment
|
||||||
self.mux.write_packet(pkt)?;
|
self.mux.write_packet(pkt)?;
|
||||||
self.packets_written += 1;
|
self.packets_written += 1;
|
||||||
|
|
||||||
|
// HLS-LL: write next partial segment
|
||||||
|
if is_ref_pkt && self.current_partial_duration >= self.partial_target_duration as f64 {
|
||||||
|
self.create_partial_segment(can_split)?;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -296,6 +346,46 @@ impl HlsVariant {
|
|||||||
self.mux.close()
|
self.mux.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a partial segment for LL-HLS
|
||||||
|
fn create_partial_segment(&mut self, independent: bool) -> Result<()> {
|
||||||
|
let ctx = self.mux.context();
|
||||||
|
let pos = unsafe {
|
||||||
|
avio_flush((*ctx).pb);
|
||||||
|
avio_size((*ctx).pb) as u64
|
||||||
|
};
|
||||||
|
|
||||||
|
let previous_partial_end = self.segments.last().and_then(|s| match &s {
|
||||||
|
HlsSegment::Partial(p) => p.byte_range.as_ref().map(|(len, start)| start.unwrap_or(0) + len),
|
||||||
|
_ => None,
|
||||||
|
});
|
||||||
|
let partial_info = PartialSegmentInfo {
|
||||||
|
index: self.current_partial_index,
|
||||||
|
parent_index: self.idx,
|
||||||
|
parent_kind: self.segment_type,
|
||||||
|
duration: self.current_partial_duration,
|
||||||
|
independent,
|
||||||
|
byte_range: match previous_partial_end {
|
||||||
|
Some(prev_end) => Some((pos - prev_end, Some(prev_end))),
|
||||||
|
_ => Some((pos, Some(0))),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
trace!(
|
||||||
|
"{} created partial segment {} [{:.3}s, independent={}]",
|
||||||
|
self.name,
|
||||||
|
partial_info.index,
|
||||||
|
partial_info.duration,
|
||||||
|
independent
|
||||||
|
);
|
||||||
|
self.segments.push(HlsSegment::Partial(partial_info));
|
||||||
|
self.current_partial_index += 1;
|
||||||
|
self.current_partial_duration = 0.0;
|
||||||
|
|
||||||
|
self.write_playlist()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Reset the muxer state and start the next segment
|
/// Reset the muxer state and start the next segment
|
||||||
unsafe fn split_next_seg(&mut self) -> Result<EgressResult> {
|
unsafe fn split_next_seg(&mut self) -> Result<EgressResult> {
|
||||||
let completed_segment_idx = self.idx;
|
let completed_segment_idx = self.idx;
|
||||||
@ -383,6 +473,7 @@ impl HlsVariant {
|
|||||||
warn!("Failed to update playlist: {}", e);
|
warn!("Failed to update playlist: {}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reset counters for next segment
|
||||||
self.packets_written = 0;
|
self.packets_written = 0;
|
||||||
self.duration = 0.0;
|
self.duration = 0.0;
|
||||||
|
|
||||||
@ -400,25 +491,44 @@ impl HlsVariant {
|
|||||||
|
|
||||||
/// Add a new segment to the variant and return a list of deleted segments
|
/// Add a new segment to the variant and return a list of deleted segments
|
||||||
fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
|
fn push_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
|
||||||
self.segments.push(SegmentInfo {
|
self.segments.push(HlsSegment::Full(SegmentInfo {
|
||||||
index: idx,
|
index: idx,
|
||||||
duration,
|
duration,
|
||||||
kind: self.segment_type,
|
kind: self.segment_type,
|
||||||
});
|
}));
|
||||||
|
|
||||||
self.write_playlist()
|
self.write_playlist()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete segments which are too old
|
/// Delete segments which are too old
|
||||||
fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> {
|
fn clean_segments(&mut self) -> Result<Vec<SegmentInfo>> {
|
||||||
const MAX_SEGMENTS: usize = 10;
|
let drain_from_hls_segment = {
|
||||||
|
let mut acc = 0.0;
|
||||||
|
let mut seg_match = None;
|
||||||
|
for seg in self
|
||||||
|
.segments
|
||||||
|
.iter()
|
||||||
|
.filter(|e| matches!(e, HlsSegment::Full(_)))
|
||||||
|
.rev()
|
||||||
|
{
|
||||||
|
if acc >= self.segment_window {
|
||||||
|
seg_match = Some(seg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
acc += match seg {
|
||||||
|
HlsSegment::Full(seg) => seg.duration,
|
||||||
|
_ => 0.0,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
seg_match
|
||||||
|
};
|
||||||
let mut ret = vec![];
|
let mut ret = vec![];
|
||||||
if self.segments.len() > MAX_SEGMENTS {
|
if let Some(seg_match) = drain_from_hls_segment {
|
||||||
let n_drain = self.segments.len() - MAX_SEGMENTS;
|
if let Some(drain_pos) = self.segments.iter().position(|e| e == seg_match) {
|
||||||
let seg_dir = self.out_dir();
|
let seg_dir = self.out_dir();
|
||||||
for seg in self.segments.drain(..n_drain) {
|
for seg in self.segments.drain(..drain_pos) {
|
||||||
// delete file
|
match seg {
|
||||||
|
HlsSegment::Full(seg) => {
|
||||||
let seg_path = seg_dir.join(seg.filename());
|
let seg_path = seg_dir.join(seg.filename());
|
||||||
if let Err(e) = std::fs::remove_file(&seg_path) {
|
if let Err(e) = std::fs::remove_file(&seg_path) {
|
||||||
warn!(
|
warn!(
|
||||||
@ -428,9 +538,15 @@ impl HlsVariant {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
trace!("Removed segment file: {}", seg_path.display());
|
trace!("Removed segment file: {}", seg_path.display());
|
||||||
|
|
||||||
ret.push(seg);
|
ret.push(seg);
|
||||||
}
|
}
|
||||||
|
_ => {}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -440,11 +556,20 @@ impl HlsVariant {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut pl = m3u8_rs::MediaPlaylist::default();
|
let mut pl = m3u8_rs::MediaPlaylist::default();
|
||||||
// Round up target duration to ensure compliance
|
|
||||||
pl.target_duration = (self.segment_length.ceil() as u64).max(1);
|
pl.target_duration = (self.segment_length.ceil() as u64).max(1);
|
||||||
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
|
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
|
||||||
pl.version = Some(3);
|
pl.version = Some(6);
|
||||||
pl.media_sequence = self.segments.first().map(|s| s.index).unwrap_or(0);
|
pl.part_inf = Some(PartInf {
|
||||||
|
part_target: self.partial_target_duration as f64,
|
||||||
|
});
|
||||||
|
pl.media_sequence = self
|
||||||
|
.segments
|
||||||
|
.iter()
|
||||||
|
.find_map(|s| match s {
|
||||||
|
HlsSegment::Full(ss) => Some(ss.index),
|
||||||
|
_ => None,
|
||||||
|
})
|
||||||
|
.unwrap_or(self.idx);
|
||||||
// For live streams, don't set end list
|
// For live streams, don't set end list
|
||||||
pl.end_list = false;
|
pl.end_list = false;
|
||||||
|
|
||||||
@ -522,6 +647,9 @@ impl HlsMuxer {
|
|||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let base = PathBuf::from(out_dir).join(id.to_string());
|
let base = PathBuf::from(out_dir).join(id.to_string());
|
||||||
|
|
||||||
|
if !base.exists() {
|
||||||
|
std::fs::create_dir_all(&base)?;
|
||||||
|
}
|
||||||
let mut vars = Vec::new();
|
let mut vars = Vec::new();
|
||||||
for (k, group) in &encoders
|
for (k, group) in &encoders
|
||||||
.sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id()))
|
.sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id()))
|
||||||
|
@ -32,7 +32,7 @@ use tokio::runtime::Handle;
|
|||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
/// Idle mode timeout in seconds
|
/// Idle mode timeout in seconds
|
||||||
const IDLE_TIMEOUT_SECS: u64 = 600;
|
const IDLE_TIMEOUT_SECS: u64 = 60;
|
||||||
|
|
||||||
/// Circuit breaker threshold for consecutive decode failures
|
/// Circuit breaker threshold for consecutive decode failures
|
||||||
const DEFAULT_MAX_CONSECUTIVE_FAILURES: u32 = 50;
|
const DEFAULT_MAX_CONSECUTIVE_FAILURES: u32 = 50;
|
||||||
@ -705,7 +705,7 @@ impl PipelineRunner {
|
|||||||
let hls = HlsEgress::new(
|
let hls = HlsEgress::new(
|
||||||
&self.connection.id,
|
&self.connection.id,
|
||||||
&self.out_dir,
|
&self.out_dir,
|
||||||
2.0, // TODO: configure segment length
|
6.0, // TODO: configure segment length
|
||||||
encoders,
|
encoders,
|
||||||
SegmentType::MPEGTS,
|
SegmentType::MPEGTS,
|
||||||
)?;
|
)?;
|
||||||
|
@ -44,3 +44,4 @@ clap = { version = "4.5.16", features = ["derive"] }
|
|||||||
futures-util = "0.3.31"
|
futures-util = "0.3.31"
|
||||||
matchit = "0.8.4"
|
matchit = "0.8.4"
|
||||||
mustache = "0.9.0"
|
mustache = "0.9.0"
|
||||||
|
http-range-header = "0.4.2"
|
||||||
|
@ -5,6 +5,7 @@ endpoints:
|
|||||||
- "rtmp://127.0.0.1:3336"
|
- "rtmp://127.0.0.1:3336"
|
||||||
- "srt://127.0.0.1:3335"
|
- "srt://127.0.0.1:3335"
|
||||||
- "tcp://127.0.0.1:3334"
|
- "tcp://127.0.0.1:3334"
|
||||||
|
- "test-pattern://"
|
||||||
|
|
||||||
# Public hostname which points to the IP address used to listen for all [endpoints]
|
# Public hostname which points to the IP address used to listen for all [endpoints]
|
||||||
endpoints_public_hostname: "localhost"
|
endpoints_public_hostname: "localhost"
|
||||||
|
@ -1,25 +1,36 @@
|
|||||||
use crate::api::Api;
|
use crate::api::Api;
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{bail, ensure, Context, Result};
|
||||||
use base64::Engine;
|
use base64::Engine;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use futures_util::TryStreamExt;
|
use futures_util::TryStreamExt;
|
||||||
use http_body_util::combinators::BoxBody;
|
use http_body_util::combinators::BoxBody;
|
||||||
use http_body_util::{BodyExt, Full, StreamBody};
|
use http_body_util::{BodyExt, Full, StreamBody};
|
||||||
|
use http_range_header::{
|
||||||
|
parse_range_header, EndPosition, StartPosition, SyntacticallyCorrectRange,
|
||||||
|
};
|
||||||
use hyper::body::{Frame, Incoming};
|
use hyper::body::{Frame, Incoming};
|
||||||
|
use hyper::http::response::Builder;
|
||||||
use hyper::service::Service;
|
use hyper::service::Service;
|
||||||
use hyper::{Method, Request, Response};
|
use hyper::{Request, Response, StatusCode};
|
||||||
use log::error;
|
use log::{error, warn};
|
||||||
|
use matchit::Router;
|
||||||
use nostr_sdk::{serde_json, Alphabet, Event, Kind, PublicKey, SingleLetterTag, TagKind};
|
use nostr_sdk::{serde_json, Alphabet, Event, Kind, PublicKey, SingleLetterTag, TagKind};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
|
use std::io::SeekFrom;
|
||||||
|
use std::ops::Range;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::pin::Pin;
|
use std::pin::{pin, Pin};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::task::Poll;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
|
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use tokio_util::io::ReaderStream;
|
use tokio_util::io::ReaderStream;
|
||||||
|
use uuid::Uuid;
|
||||||
|
use zap_stream_core::egress::hls::HlsEgress;
|
||||||
use zap_stream_core::viewer::ViewerTracker;
|
use zap_stream_core::viewer::ViewerTracker;
|
||||||
|
|
||||||
#[derive(Serialize, Clone)]
|
#[derive(Serialize, Clone)]
|
||||||
@ -46,6 +57,14 @@ pub struct CachedStreams {
|
|||||||
cached_at: Instant,
|
cached_at: Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum HttpServerPath {
|
||||||
|
Index,
|
||||||
|
HlsMasterPlaylist,
|
||||||
|
HlsVariantPlaylist,
|
||||||
|
HlsSegmentFile,
|
||||||
|
}
|
||||||
|
|
||||||
pub type StreamCache = Arc<RwLock<Option<CachedStreams>>>;
|
pub type StreamCache = Arc<RwLock<Option<CachedStreams>>>;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@ -54,6 +73,7 @@ pub struct HttpServer {
|
|||||||
files_dir: PathBuf,
|
files_dir: PathBuf,
|
||||||
api: Api,
|
api: Api,
|
||||||
stream_cache: StreamCache,
|
stream_cache: StreamCache,
|
||||||
|
router: Router<HttpServerPath>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HttpServer {
|
impl HttpServer {
|
||||||
@ -63,18 +83,37 @@ impl HttpServer {
|
|||||||
api: Api,
|
api: Api,
|
||||||
stream_cache: StreamCache,
|
stream_cache: StreamCache,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
let mut router = Router::new();
|
||||||
|
router.insert("/", HttpServerPath::Index).unwrap();
|
||||||
|
router.insert("/index.html", HttpServerPath::Index).unwrap();
|
||||||
|
router
|
||||||
|
.insert(
|
||||||
|
format!("/{}/{{stream}}/live.m3u8", HlsEgress::PATH),
|
||||||
|
HttpServerPath::HlsMasterPlaylist,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
router
|
||||||
|
.insert(
|
||||||
|
format!("/{}/{{stream}}/{{variant}}/live.m3u8", HlsEgress::PATH),
|
||||||
|
HttpServerPath::HlsVariantPlaylist,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
router
|
||||||
|
.insert(
|
||||||
|
format!("/{}/{{stream}}/{{variant}}/{{seg}}.ts", HlsEgress::PATH),
|
||||||
|
HttpServerPath::HlsSegmentFile,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
index_template,
|
index_template,
|
||||||
files_dir,
|
files_dir,
|
||||||
api,
|
api,
|
||||||
stream_cache,
|
stream_cache,
|
||||||
|
router,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_cached_or_fetch_streams(&self) -> Result<IndexTemplateData> {
|
|
||||||
Self::get_cached_or_fetch_streams_static(&self.stream_cache, &self.api).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_cached_or_fetch_streams_static(
|
async fn get_cached_or_fetch_streams_static(
|
||||||
stream_cache: &StreamCache,
|
stream_cache: &StreamCache,
|
||||||
api: &Api,
|
api: &Api,
|
||||||
@ -100,13 +139,14 @@ impl HttpServer {
|
|||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|stream| {
|
.map(|stream| {
|
||||||
let viewer_count = api.get_viewer_count(&stream.id);
|
let viewer_count = api.get_viewer_count(&stream.id);
|
||||||
|
// TODO: remove HLS assumption
|
||||||
StreamData {
|
StreamData {
|
||||||
id: stream.id.clone(),
|
id: stream.id.clone(),
|
||||||
title: stream
|
title: stream
|
||||||
.title
|
.title
|
||||||
.unwrap_or_else(|| format!("Stream {}", &stream.id[..8])),
|
.unwrap_or_else(|| format!("Stream {}", &stream.id[..8])),
|
||||||
summary: stream.summary,
|
summary: stream.summary,
|
||||||
live_url: format!("/{}/live.m3u8", stream.id),
|
live_url: format!("/{}/{}/live.m3u8", HlsEgress::PATH, stream.id),
|
||||||
viewer_count: if viewer_count > 0 {
|
viewer_count: if viewer_count > 0 {
|
||||||
Some(viewer_count as _)
|
Some(viewer_count as _)
|
||||||
} else {
|
} else {
|
||||||
@ -141,31 +181,97 @@ impl HttpServer {
|
|||||||
Ok(template_data)
|
Ok(template_data)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn render_index(&self) -> Result<String> {
|
async fn handle_index(
|
||||||
let template_data = self.get_cached_or_fetch_streams().await?;
|
api: Api,
|
||||||
let template = mustache::compile_str(&self.index_template)?;
|
stream_cache: StreamCache,
|
||||||
let rendered = template.render_to_string(&template_data)?;
|
template: String,
|
||||||
Ok(rendered)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_hls_playlist(
|
|
||||||
api: &Api,
|
|
||||||
req: &Request<Incoming>,
|
|
||||||
playlist_path: &PathBuf,
|
|
||||||
) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
|
) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
|
||||||
// Extract stream ID from path (e.g., /uuid/live.m3u8 -> uuid)
|
// Compile template outside async move for better performance
|
||||||
let path_parts: Vec<&str> = req
|
let template = match mustache::compile_str(&template) {
|
||||||
.uri()
|
Ok(t) => t,
|
||||||
.path()
|
Err(e) => {
|
||||||
.trim_start_matches('/')
|
error!("Failed to compile template: {}", e);
|
||||||
.split('/')
|
return Ok(Self::base_response().status(500).body(BoxBody::default())?);
|
||||||
.collect();
|
}
|
||||||
if path_parts.len() < 2 {
|
};
|
||||||
return Ok(Response::builder().status(404).body(BoxBody::default())?);
|
|
||||||
|
let template_data = Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await;
|
||||||
|
|
||||||
|
match template_data {
|
||||||
|
Ok(data) => match template.render_to_string(&data) {
|
||||||
|
Ok(index_html) => Ok(Self::base_response()
|
||||||
|
.header("content-type", "text/html")
|
||||||
|
.body(
|
||||||
|
Full::new(Bytes::from(index_html))
|
||||||
|
.map_err(|e| match e {})
|
||||||
|
.boxed(),
|
||||||
|
)?),
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to render template: {}", e);
|
||||||
|
Ok(Self::base_response().status(500).body(BoxBody::default())?)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to fetch template data: {}", e);
|
||||||
|
Ok(Self::base_response().status(500).body(BoxBody::default())?)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let stream_id = path_parts[0];
|
async fn handle_hls_segment(
|
||||||
|
req: &Request<Incoming>,
|
||||||
|
segment_path: PathBuf,
|
||||||
|
) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
|
||||||
|
let mut response = Self::base_response().header("accept-ranges", "bytes");
|
||||||
|
|
||||||
|
if let Some(r) = req.headers().get("range") {
|
||||||
|
if let Ok(ranges) = parse_range_header(r.to_str()?) {
|
||||||
|
if ranges.ranges.len() > 1 {
|
||||||
|
warn!("Multipart ranges are not supported, fallback to non-range request");
|
||||||
|
Self::path_to_response(segment_path).await
|
||||||
|
} else {
|
||||||
|
let file = File::open(&segment_path).await?;
|
||||||
|
let metadata = file.metadata().await?;
|
||||||
|
let single_range = ranges.ranges.first().unwrap();
|
||||||
|
let range = match RangeBody::get_range(metadata.len(), single_range) {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to get range: {}", e);
|
||||||
|
return Ok(response
|
||||||
|
.status(StatusCode::RANGE_NOT_SATISFIABLE)
|
||||||
|
.body(BoxBody::default())?);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let r_body = RangeBody::new(file, metadata.len(), range.clone());
|
||||||
|
|
||||||
|
response = response.status(StatusCode::PARTIAL_CONTENT);
|
||||||
|
let headers = r_body.get_headers();
|
||||||
|
for (k, v) in headers {
|
||||||
|
response = response.header(k, v);
|
||||||
|
}
|
||||||
|
let f_stream = ReaderStream::new(r_body);
|
||||||
|
let body = StreamBody::new(
|
||||||
|
f_stream
|
||||||
|
.map_ok(Frame::data)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to read body: {}", e)),
|
||||||
|
)
|
||||||
|
.boxed();
|
||||||
|
Ok(response.body(body)?)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(Self::base_response().status(400).body(BoxBody::default())?)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Self::path_to_response(segment_path).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_hls_master_playlist(
|
||||||
|
api: Api,
|
||||||
|
req: &Request<Incoming>,
|
||||||
|
stream_id: &str,
|
||||||
|
playlist_path: PathBuf,
|
||||||
|
) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
|
||||||
// Get client IP and User-Agent for tracking
|
// Get client IP and User-Agent for tracking
|
||||||
let client_ip = Self::get_client_ip(req);
|
let client_ip = Self::get_client_ip(req);
|
||||||
let user_agent = req
|
let user_agent = req
|
||||||
@ -203,17 +309,15 @@ impl HttpServer {
|
|||||||
let modified_content =
|
let modified_content =
|
||||||
Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?;
|
Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
let response = Self::base_response()
|
||||||
.header("content-type", "application/vnd.apple.mpegurl")
|
.header("content-type", "application/vnd.apple.mpegurl")
|
||||||
.header("server", "zap-stream-core")
|
|
||||||
.header("access-control-allow-origin", "*")
|
|
||||||
.header("access-control-allow-headers", "*")
|
|
||||||
.header("access-control-allow-methods", "HEAD, GET")
|
|
||||||
.body(
|
.body(
|
||||||
Full::new(Bytes::from(modified_content))
|
Full::new(Bytes::from(modified_content))
|
||||||
.map_err(|e| match e {})
|
.map_err(|e| match e {})
|
||||||
.boxed(),
|
.boxed(),
|
||||||
)?)
|
)?;
|
||||||
|
|
||||||
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_client_ip(req: &Request<Incoming>) -> String {
|
fn get_client_ip(req: &Request<Incoming>) -> String {
|
||||||
@ -232,8 +336,8 @@ impl HttpServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fallback to connection IP (note: in real deployment this might be a proxy)
|
// use random string as IP to avoid broken view tracker due to proxying
|
||||||
"unknown".to_string()
|
Uuid::new_v4().to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_viewer_token_to_playlist(content: &[u8], viewer_token: &str) -> Result<String> {
|
fn add_viewer_token_to_playlist(content: &[u8], viewer_token: &str) -> Result<String> {
|
||||||
@ -271,6 +375,27 @@ impl HttpServer {
|
|||||||
format!("{}?vt={}", url, viewer_token)
|
format!("{}?vt={}", url, viewer_token)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn base_response() -> Builder {
|
||||||
|
Response::builder()
|
||||||
|
.header("server", "zap-stream-core")
|
||||||
|
.header("access-control-allow-origin", "*")
|
||||||
|
.header("access-control-allow-headers", "*")
|
||||||
|
.header("access-control-allow-methods", "HEAD, GET")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a response object for a file body
|
||||||
|
async fn path_to_response(path: PathBuf) -> Result<Response<BoxBody<Bytes, anyhow::Error>>> {
|
||||||
|
let f = File::open(&path).await?;
|
||||||
|
let f_stream = ReaderStream::new(f);
|
||||||
|
let body = StreamBody::new(
|
||||||
|
f_stream
|
||||||
|
.map_ok(Frame::data)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to read body: {}", e)),
|
||||||
|
)
|
||||||
|
.boxed();
|
||||||
|
Ok(Self::base_response().body(body)?)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service<Request<Incoming>> for HttpServer {
|
impl Service<Request<Incoming>> for HttpServer {
|
||||||
@ -279,89 +404,50 @@ impl Service<Request<Incoming>> for HttpServer {
|
|||||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
||||||
|
|
||||||
fn call(&self, req: Request<Incoming>) -> Self::Future {
|
fn call(&self, req: Request<Incoming>) -> Self::Future {
|
||||||
// check is index.html
|
let path = req.uri().path().to_owned();
|
||||||
if req.method() == Method::GET && req.uri().path() == "/"
|
// request path as a file path pointing to the output directory
|
||||||
|| req.uri().path() == "/index.html"
|
|
||||||
{
|
|
||||||
let stream_cache = self.stream_cache.clone();
|
|
||||||
let api = self.api.clone();
|
|
||||||
|
|
||||||
// Compile template outside async move for better performance
|
|
||||||
let template = match mustache::compile_str(&self.index_template) {
|
|
||||||
Ok(t) => t,
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to compile template: {}", e);
|
|
||||||
return Box::pin(async move {
|
|
||||||
Ok(Response::builder()
|
|
||||||
.status(500)
|
|
||||||
.body(BoxBody::default())
|
|
||||||
.unwrap())
|
|
||||||
});
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
return Box::pin(async move {
|
|
||||||
// Use the existing method to get cached template data
|
|
||||||
let template_data =
|
|
||||||
Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await;
|
|
||||||
|
|
||||||
match template_data {
|
|
||||||
Ok(data) => match template.render_to_string(&data) {
|
|
||||||
Ok(index_html) => Ok(Response::builder()
|
|
||||||
.header("content-type", "text/html")
|
|
||||||
.header("server", "zap-stream-core")
|
|
||||||
.body(
|
|
||||||
Full::new(Bytes::from(index_html))
|
|
||||||
.map_err(|e| match e {})
|
|
||||||
.boxed(),
|
|
||||||
)?),
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to render template: {}", e);
|
|
||||||
Ok(Response::builder().status(500).body(BoxBody::default())?)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to fetch template data: {}", e);
|
|
||||||
Ok(Response::builder().status(500).body(BoxBody::default())?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if mapped to file
|
|
||||||
let dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
|
let dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
|
||||||
if dst_path.exists() {
|
|
||||||
let api_clone = self.api.clone();
|
if let Ok(m) = self.router.at(&path) {
|
||||||
|
match m.value {
|
||||||
|
HttpServerPath::Index => {
|
||||||
|
let api = self.api.clone();
|
||||||
|
let cache = self.stream_cache.clone();
|
||||||
|
let template = self.index_template.clone();
|
||||||
|
return Box::pin(async move { Self::handle_index(api, cache, template).await });
|
||||||
|
}
|
||||||
|
HttpServerPath::HlsMasterPlaylist => {
|
||||||
|
let api = self.api.clone();
|
||||||
|
let stream_id = m.params.get("stream").map(|s| s.to_string());
|
||||||
|
let file_path = dst_path.clone();
|
||||||
return Box::pin(async move {
|
return Box::pin(async move {
|
||||||
let rsp = Response::builder()
|
let stream_id = stream_id.context("stream id missing")?;
|
||||||
.header("server", "zap-stream-core")
|
Ok(
|
||||||
.header("access-control-allow-origin", "*")
|
Self::handle_hls_master_playlist(api, &req, &stream_id, file_path)
|
||||||
.header("access-control-allow-headers", "*")
|
.await?,
|
||||||
.header("access-control-allow-methods", "HEAD, GET");
|
|
||||||
|
|
||||||
if req.method() == Method::HEAD {
|
|
||||||
return Ok(rsp.body(BoxBody::default())?);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle HLS playlists with viewer tracking
|
|
||||||
if req.uri().path().ends_with("/live.m3u8") {
|
|
||||||
return Self::handle_hls_playlist(&api_clone, &req, &dst_path).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle regular files
|
|
||||||
let f = File::open(&dst_path).await?;
|
|
||||||
let f_stream = ReaderStream::new(f);
|
|
||||||
let body = StreamBody::new(
|
|
||||||
f_stream
|
|
||||||
.map_ok(Frame::data)
|
|
||||||
.map_err(|e| Self::Error::new(e)),
|
|
||||||
)
|
)
|
||||||
.boxed();
|
|
||||||
Ok(rsp.body(body)?)
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
HttpServerPath::HlsVariantPlaylist => {
|
||||||
|
// let file handler handle this one, may be used later for HLS-LL to create
|
||||||
|
// delta updates
|
||||||
|
}
|
||||||
|
HttpServerPath::HlsSegmentFile => {
|
||||||
|
// handle segment file (range requests)
|
||||||
|
let file_path = dst_path.clone();
|
||||||
|
return Box::pin(async move {
|
||||||
|
Ok(Self::handle_hls_segment(&req, file_path).await?)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// otherwise handle in overseer
|
// check if mapped to file (not handled route)
|
||||||
|
if dst_path.exists() {
|
||||||
|
return Box::pin(async move { Self::path_to_response(dst_path).await });
|
||||||
|
}
|
||||||
|
|
||||||
|
// fallback to api handler
|
||||||
let api = self.api.clone();
|
let api = self.api.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
match api.handler(req).await {
|
match api.handler(req).await {
|
||||||
@ -466,3 +552,110 @@ pub fn check_nip98_auth(req: &Request<Incoming>, public_url: &str) -> Result<Aut
|
|||||||
event,
|
event,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Range request handler over file handle
|
||||||
|
struct RangeBody {
|
||||||
|
file: File,
|
||||||
|
range_start: u64,
|
||||||
|
range_end: u64,
|
||||||
|
current_offset: u64,
|
||||||
|
poll_complete: bool,
|
||||||
|
file_size: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
const MAX_UNBOUNDED_RANGE: u64 = 1024 * 1024;
|
||||||
|
impl RangeBody {
|
||||||
|
pub fn new(file: File, file_size: u64, range: Range<u64>) -> Self {
|
||||||
|
Self {
|
||||||
|
file,
|
||||||
|
file_size,
|
||||||
|
range_start: range.start,
|
||||||
|
range_end: range.end,
|
||||||
|
current_offset: 0,
|
||||||
|
poll_complete: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_range(file_size: u64, header: &SyntacticallyCorrectRange) -> Result<Range<u64>> {
|
||||||
|
let range_start = match header.start {
|
||||||
|
StartPosition::Index(i) => {
|
||||||
|
ensure!(i < file_size, "Range start out of range");
|
||||||
|
i
|
||||||
|
}
|
||||||
|
StartPosition::FromLast(i) => file_size.saturating_sub(i),
|
||||||
|
};
|
||||||
|
let range_end = match header.end {
|
||||||
|
EndPosition::Index(i) => {
|
||||||
|
ensure!(i <= file_size, "Range end out of range");
|
||||||
|
i
|
||||||
|
}
|
||||||
|
EndPosition::LastByte => {
|
||||||
|
(file_size.saturating_sub(1)).min(range_start + MAX_UNBOUNDED_RANGE)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(range_start..range_end)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_headers(&self) -> Vec<(&'static str, String)> {
|
||||||
|
let r_len = (self.range_end - self.range_start) + 1;
|
||||||
|
vec![
|
||||||
|
("content-length", r_len.to_string()),
|
||||||
|
(
|
||||||
|
"content-range",
|
||||||
|
format!(
|
||||||
|
"bytes {}-{}/{}",
|
||||||
|
self.range_start, self.range_end, self.file_size
|
||||||
|
),
|
||||||
|
),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncRead for RangeBody {
|
||||||
|
fn poll_read(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
buf: &mut ReadBuf<'_>,
|
||||||
|
) -> Poll<std::io::Result<()>> {
|
||||||
|
let range_start = self.range_start + self.current_offset;
|
||||||
|
let range_len = self.range_end.saturating_sub(range_start) + 1;
|
||||||
|
let bytes_to_read = buf.remaining().min(range_len as usize) as u64;
|
||||||
|
|
||||||
|
if bytes_to_read == 0 {
|
||||||
|
return Poll::Ready(Ok(()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// when no pending poll, seek to starting position
|
||||||
|
if !self.poll_complete {
|
||||||
|
let pinned = pin!(&mut self.file);
|
||||||
|
pinned.start_seek(SeekFrom::Start(range_start))?;
|
||||||
|
self.poll_complete = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check poll completion
|
||||||
|
if self.poll_complete {
|
||||||
|
let pinned = pin!(&mut self.file);
|
||||||
|
match pinned.poll_complete(cx) {
|
||||||
|
Poll::Ready(Ok(_)) => {
|
||||||
|
self.poll_complete = false;
|
||||||
|
}
|
||||||
|
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
|
||||||
|
Poll::Pending => return Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read data from the file
|
||||||
|
let pinned = pin!(&mut self.file);
|
||||||
|
match pinned.poll_read(cx, buf) {
|
||||||
|
Poll::Ready(Ok(_)) => {
|
||||||
|
self.current_offset += bytes_to_read;
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||||
|
Poll::Pending => {
|
||||||
|
self.poll_complete = true;
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -15,6 +15,7 @@ use std::sync::Arc;
|
|||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
use zap_stream_core::egress::hls::HlsEgress;
|
||||||
use zap_stream_core::egress::{EgressConfig, EgressSegment};
|
use zap_stream_core::egress::{EgressConfig, EgressSegment};
|
||||||
use zap_stream_core::ingress::ConnectionInfo;
|
use zap_stream_core::ingress::ConnectionInfo;
|
||||||
use zap_stream_core::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};
|
use zap_stream_core::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};
|
||||||
@ -227,15 +228,18 @@ impl ZapStreamOverseer {
|
|||||||
stream: &UserStream,
|
stream: &UserStream,
|
||||||
pubkey: &Vec<u8>,
|
pubkey: &Vec<u8>,
|
||||||
) -> Result<Event> {
|
) -> Result<Event> {
|
||||||
|
// TODO: remove assumption that HLS is enabled
|
||||||
|
let base_streaming_path = PathBuf::from(HlsEgress::PATH).join(stream.id.to_string());
|
||||||
let extra_tags = vec![
|
let extra_tags = vec![
|
||||||
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
|
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
|
||||||
Tag::parse([
|
Tag::parse([
|
||||||
"streaming",
|
"streaming",
|
||||||
self.map_to_stream_public_url(stream, "live.m3u8")?.as_str(),
|
self.map_to_public_url(base_streaming_path.join("live.m3u8").to_str().unwrap())?
|
||||||
|
.as_str(),
|
||||||
])?,
|
])?,
|
||||||
Tag::parse([
|
Tag::parse([
|
||||||
"image",
|
"image",
|
||||||
self.map_to_stream_public_url(stream, "thumb.webp")?
|
self.map_to_public_url(base_streaming_path.join("thumb.webp").to_str().unwrap())?
|
||||||
.as_str(),
|
.as_str(),
|
||||||
])?,
|
])?,
|
||||||
Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,
|
Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,
|
||||||
@ -248,10 +252,6 @@ impl ZapStreamOverseer {
|
|||||||
Ok(ev)
|
Ok(ev)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn map_to_stream_public_url(&self, stream: &UserStream, path: &str) -> Result<String> {
|
|
||||||
self.map_to_public_url(&format!("{}/{}", stream.id, path))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn map_to_public_url(&self, path: &str) -> Result<String> {
|
fn map_to_public_url(&self, path: &str) -> Result<String> {
|
||||||
let u: Url = self.public_url.parse()?;
|
let u: Url = self.public_url.parse()?;
|
||||||
Ok(u.join(path)?.to_string())
|
Ok(u.join(path)?.to_string())
|
||||||
@ -371,6 +371,12 @@ impl Overseer for ZapStreamOverseer {
|
|||||||
starts: Utc::now(),
|
starts: Utc::now(),
|
||||||
state: UserStreamState::Live,
|
state: UserStreamState::Live,
|
||||||
endpoint_id: Some(endpoint.id),
|
endpoint_id: Some(endpoint.id),
|
||||||
|
title: user.title.clone(),
|
||||||
|
summary: user.summary.clone(),
|
||||||
|
thumb: user.image.clone(),
|
||||||
|
content_warning: user.content_warning.clone(),
|
||||||
|
goal: user.goal.clone(),
|
||||||
|
tags: user.tags.clone(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
|
let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
|
||||||
@ -427,7 +433,7 @@ impl Overseer for ZapStreamOverseer {
|
|||||||
.tick_stream(pipeline_id, stream.user_id, duration, cost)
|
.tick_stream(pipeline_id, stream.user_id, duration, cost)
|
||||||
.await?;
|
.await?;
|
||||||
if bal <= 0 {
|
if bal <= 0 {
|
||||||
bail!("Not enough balance");
|
bail!("Balance has run out");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update last segment time for this stream
|
// Update last segment time for this stream
|
||||||
@ -508,6 +514,7 @@ impl Overseer for ZapStreamOverseer {
|
|||||||
viewer_states.remove(&stream.id);
|
viewer_states.remove(&stream.id);
|
||||||
|
|
||||||
stream.state = UserStreamState::Ended;
|
stream.state = UserStreamState::Ended;
|
||||||
|
stream.ends = Some(Utc::now());
|
||||||
let event = self.publish_stream_event(&stream, &user.pubkey).await?;
|
let event = self.publish_stream_event(&stream, &user.pubkey).await?;
|
||||||
stream.event = Some(event.as_json());
|
stream.event = Some(event.as_json());
|
||||||
self.db.update_stream(&stream).await?;
|
self.db.update_stream(&stream).await?;
|
||||||
|
Reference in New Issue
Block a user