fix: try improve hls playback

This commit is contained in:
2025-06-06 15:19:36 +01:00
parent b79eb3b0f7
commit 56f72b129d
7 changed files with 102 additions and 37 deletions

View File

@ -6,6 +6,12 @@ members = [
"crates/zap-stream-db"
]
[profile.release]
opt-level = 3
lto = true
codegen-units = 1
panic = "abort"
[workspace.dependencies]
ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "29ab0547478256c574766b4acc6fcda8ebf4cae6" }
tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }

View File

@ -113,11 +113,15 @@ impl RtmpClient {
}
ServerSessionResult::RaisedEvent(ev) => self.handle_event(ev)?,
ServerSessionResult::UnhandleableMessageReceived(m) => {
// treat any non-flv streams as raw media stream in rtmp
// Log unhandleable messages for debugging
error!("Received unhandleable message with {} bytes", m.data.len());
// Only append data if it looks like valid media data
if !m.data.is_empty() && m.data.len() > 4 {
self.media_buf.extend(&m.data);
}
}
}
}
Ok(())
}
@ -164,10 +168,20 @@ impl RtmpClient {
);
}
ServerSessionEvent::AudioDataReceived { data, .. } => {
// Validate audio data before adding to buffer
if !data.is_empty() {
self.media_buf.extend(data);
} else {
error!("Received empty audio data");
}
}
ServerSessionEvent::VideoDataReceived { data, .. } => {
// Validate video data before adding to buffer
if !data.is_empty() {
self.media_buf.extend(data);
} else {
error!("Received empty video data");
}
}
ServerSessionEvent::UnhandleableAmf0Command { .. } => {}
ServerSessionEvent::PlayStreamRequested { request_id, .. } => {

View File

@ -186,11 +186,7 @@ impl HlsVariant {
streams,
idx: 1,
pkt_start: 0.0,
segments: Vec::from([SegmentInfo {
index: 1,
duration: segment_length,
kind: segment_type,
}]),
segments: Vec::new(), // Start with empty segments list
out_dir: out_dir.to_string(),
segment_type,
})
@ -220,8 +216,9 @@ impl HlsVariant {
let pkt_q = av_q2d((*pkt).time_base);
// time of this packet in seconds
let pkt_time = (*pkt).pts as f32 * pkt_q as f32;
// what segment this pkt should be in (index)
let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64;
// what segment this pkt should be in (index) - use relative time from start
let relative_time = pkt_time - self.pkt_start;
let pkt_seg = self.idx + (relative_time / self.segment_length).floor() as u64;
let mut result = EgressResult::None;
let pkt_stream = *(*self.mux.context())
@ -361,11 +358,18 @@ impl HlsVariant {
}
fn write_playlist(&mut self) -> Result<()> {
if self.segments.is_empty() {
return Ok(()); // Don't write empty playlists
}
let mut pl = m3u8_rs::MediaPlaylist::default();
pl.target_duration = self.segment_length as u64;
// Round up target duration to ensure compliance
pl.target_duration = (self.segment_length.ceil() as u64).max(1);
pl.segments = self.segments.iter().map(|s| s.to_media_segment()).collect();
pl.version = Some(3);
pl.media_sequence = self.segments.first().map(|s| s.index).unwrap_or(0);
// For live streams, don't set end list
pl.end_list = false;
let mut f_out = File::create(self.out_dir().join("live.m3u8"))?;
pl.write_to(&mut f_out)?;

View File

@ -77,6 +77,9 @@ pub struct PipelineRunner {
/// Total number of frames produced
frame_ctr: u64,
out_dir: String,
/// Thumbnail generation interval (0 = disabled)
thumb_interval: u64,
}
impl PipelineRunner {
@ -104,6 +107,7 @@ impl PipelineRunner {
frame_ctr: 0,
fps_last_frame_ctr: 0,
info: None,
thumb_interval: 1800, // Disable thumbnails by default for performance
})
}
@ -165,7 +169,9 @@ impl PipelineRunner {
let p = (*stream).codecpar;
if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO {
if (self.frame_ctr % 1800) == 0 {
// Conditionally generate thumbnails based on interval (0 = disabled)
if self.thumb_interval > 0 && (self.frame_ctr % self.thumb_interval) == 0 {
let thumb_start = Instant::now();
let dst_pic = PathBuf::from(&self.out_dir)
.join(config.id.to_string())
.join("thumb.webp");
@ -182,11 +188,15 @@ impl PipelineRunner {
.with_pix_fmt(transmute((*frame).format))
.open(None)?
.save_picture(frame, dst_pic.to_str().unwrap())?;
info!("Saved thumb to: {}", dst_pic.display());
let thumb_duration = thumb_start.elapsed();
info!(
"Saved thumb ({:.2}ms) to: {}",
thumb_duration.as_millis() as f32 / 1000.0,
dst_pic.display(),
);
av_frame_free(&mut frame);
}
// TODO: fix this, multiple video streams in
self.frame_ctr += 1;
}
@ -271,7 +281,8 @@ impl PipelineRunner {
av_packet_free(&mut pkt);
// egress results
// egress results - process async operations without blocking if possible
if !egress_results.is_empty() {
self.handle.block_on(async {
for er in egress_results {
if let EgressResult::Segments { created, deleted } = er {
@ -286,6 +297,7 @@ impl PipelineRunner {
}
Ok(())
})?;
}
let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32();
if elapsed >= 2f32 {
let n_frames = self.frame_ctr - self.fps_last_frame_ctr;

View File

@ -1,4 +1,5 @@
use crate::http::check_nip98_auth;
use crate::overseer::ZapStreamOverseer;
use crate::settings::Settings;
use crate::ListenerEndpoint;
use anyhow::{anyhow, bail, Result};
@ -8,14 +9,16 @@ use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::{Method, Request, Response};
use log::warn;
use matchit::Router;
use nostr_sdk::{serde_json, PublicKey};
use nostr_sdk::{serde_json, JsonUtil, PublicKey};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use url::Url;
use uuid::Uuid;
use zap_stream_db::ZapStreamDb;
use zap_stream_db::{UserStream, ZapStreamDb};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Route {
@ -35,10 +38,11 @@ pub struct Api {
settings: Settings,
lnd: fedimint_tonic_lnd::Client,
router: Router<Route>,
overseer: Arc<ZapStreamOverseer>,
}
impl Api {
pub fn new(db: ZapStreamDb, settings: Settings, lnd: fedimint_tonic_lnd::Client) -> Self {
pub fn new(overseer: Arc<ZapStreamOverseer>, settings: Settings) -> Self {
let mut router = Router::new();
// Define routes (path only, method will be matched separately)
@ -54,10 +58,11 @@ impl Api {
router.insert("/api/v1/keys", Route::Keys).unwrap();
Self {
db,
db: overseer.database(),
settings,
lnd,
lnd: overseer.lnd_client(),
router,
overseer,
}
}
@ -410,7 +415,16 @@ impl Api {
self.db.update_stream(&stream).await?;
// TODO: Update the nostr event and republish like C# version
// Update the nostr event and republish like C# version
if let Err(e) = self
.republish_stream_event(&stream, pubkey.to_bytes())
.await
{
warn!(
"Failed to republish nostr event for stream {}: {}",
stream.id, e
);
}
} else {
// Update user default stream info
self.db
@ -619,6 +633,21 @@ impl Api {
event: None, // TODO: Build proper nostr event like C# version
})
}
/// Republish stream event to nostr relays using the same code as overseer
async fn republish_stream_event(&self, stream: &UserStream, pubkey: [u8; 32]) -> Result<()> {
let event = self
.overseer
.publish_stream_event(stream, &pubkey.to_vec())
.await?;
// Update the stream with the new event JSON
let mut updated_stream = stream.clone();
updated_stream.event = Some(event.as_json());
self.db.update_stream(&updated_stream).await?;
Ok(())
}
}
#[derive(Deserialize, Serialize)]

View File

@ -71,7 +71,7 @@ async fn main() -> Result<()> {
let http_addr: SocketAddr = settings.listen_http.parse()?;
let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);
let api = Api::new(overseer.database(), settings.clone(), overseer.lnd_client());
let api = Api::new(overseer.clone(), settings.clone());
// HTTP server
let server = HttpServer::new(index_html, PathBuf::from(settings.output_dir), api);
tasks.push(tokio::spawn(async move {

View File

@ -191,7 +191,7 @@ impl ZapStreamOverseer {
Ok(EventBuilder::new(Kind::FileMetadata, "").tags(tags))
}
async fn publish_stream_event(&self, stream: &UserStream, pubkey: &Vec<u8>) -> Result<Event> {
pub async fn publish_stream_event(&self, stream: &UserStream, pubkey: &Vec<u8>) -> Result<Event> {
let extra_tags = vec![
Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
Tag::parse([