From 41773cc3a0993639df024de19937371330c8e93f Mon Sep 17 00:00:00 2001 From: Kieran Date: Thu, 5 Jun 2025 16:18:03 +0100 Subject: [PATCH] fix: audio sync --- Cargo.lock | 2 +- Cargo.toml | 2 +- crates/core/src/pipeline/runner.rs | 30 +++++++++++++++++------------- crates/zap-stream/src/http.rs | 6 +++--- crates/zap-stream/src/overseer.rs | 20 ++------------------ 5 files changed, 24 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e5e0f4..1ebd2b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1044,7 +1044,7 @@ dependencies = [ [[package]] name = "ffmpeg-rs-raw" version = "0.1.0" -source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385#a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385" +source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=29ab0547478256c574766b4acc6fcda8ebf4cae6#29ab0547478256c574766b4acc6fcda8ebf4cae6" dependencies = [ "anyhow", "ffmpeg-sys-the-third", diff --git a/Cargo.toml b/Cargo.toml index 8695427..9acab4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ ] [workspace.dependencies] -ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385" } +ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "29ab0547478256c574766b4acc6fcda8ebf4cae6" } tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } anyhow = { version = "^1.0.91", features = ["backtrace"] } async-trait = "0.1.77" diff --git a/crates/core/src/pipeline/runner.rs b/crates/core/src/pipeline/runner.rs index 03639fd..afb2059 100644 --- a/crates/core/src/pipeline/runner.rs +++ b/crates/core/src/pipeline/runner.rs @@ -26,7 +26,6 @@ use ffmpeg_rs_raw::{ cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, StreamType, }; -use itertools::Itertools; use log::{error, info, warn}; use tokio::runtime::Handle; use uuid::Uuid; @@ -203,19 +202,8 @@ impl PipelineRunner { //warn!("Frame had nowhere to go in {} :/", var.id()); continue; }; - // before encoding frame, rescale timestamps - if !frame.is_null() { - let enc_ctx = enc.codec_context(); - (*frame).pict_type = AV_PICTURE_TYPE_NONE; - (*frame).pts = - av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base); - (*frame).pkt_dts = - av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base); - (*frame).duration = - av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base); - (*frame).time_base = (*enc_ctx).time_base; - } + // scaling / resampling let mut new_frame = false; let mut frame = match var { VariantStream::Video(v) => { @@ -234,6 +222,9 @@ impl PipelineRunner { if let Some(ret) = f.buffer_frame(resampled_frame, frame_size as usize)? { + // Set correct timebase for audio (1/sample_rate) + (*ret).time_base.num = 1; + (*ret).time_base.den = a.sample_rate as i32; av_frame_free(&mut resampled_frame); ret } else { @@ -247,6 +238,19 @@ impl PipelineRunner { _ => frame, }; + // before encoding frame, rescale timestamps + if !frame.is_null() { + let enc_ctx = enc.codec_context(); + (*frame).pict_type = AV_PICTURE_TYPE_NONE; + (*frame).pts = + av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base); + (*frame).pkt_dts = + av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base); + (*frame).duration = + av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base); + (*frame).time_base = (*enc_ctx).time_base; + } + let packets = enc.encode_frame(frame)?; // pass new packets to egress for mut pkt in packets { diff --git a/crates/zap-stream/src/http.rs b/crates/zap-stream/src/http.rs index b949596..7c67bff 100644 --- a/crates/zap-stream/src/http.rs +++ b/crates/zap-stream/src/http.rs @@ -60,10 +60,10 @@ impl Service> for HttpServer { } // check if mapped to file - let mut dst_path = self.files_dir.join(req.uri().path()[1..].to_string()); + let dst_path = self.files_dir.join(req.uri().path()[1..].to_string()); if dst_path.exists() { return Box::pin(async move { - let mut rsp = Response::builder() + let rsp = Response::builder() .header("server", "zap-stream-core") .header("access-control-allow-origin", "*") .header("access-control-allow-headers", "*") @@ -85,7 +85,7 @@ impl Service> for HttpServer { } // otherwise handle in overseer - let mut api = self.api.clone(); + let api = self.api.clone(); Box::pin(async move { match api.handler(req).await { Ok(res) => Ok(res), diff --git a/crates/zap-stream/src/overseer.rs b/crates/zap-stream/src/overseer.rs index ad4f893..b880aad 100644 --- a/crates/zap-stream/src/overseer.rs +++ b/crates/zap-stream/src/overseer.rs @@ -1,36 +1,21 @@ use crate::blossom::{BlobDescriptor, Blossom}; use crate::settings::LndSettings; -use anyhow::{anyhow, bail, Result}; +use anyhow::{bail, Result}; use async_trait::async_trait; -use base64::alphabet::STANDARD; -use base64::Engine; -use bytes::Bytes; use chrono::Utc; use fedimint_tonic_lnd::verrpc::VersionRequest; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; -use ffmpeg_rs_raw::Encoder; -use futures_util::FutureExt; -use http_body_util::combinators::BoxBody; -use http_body_util::{BodyExt, Full}; -use hyper::body::Incoming; -use hyper::{Method, Request, Response}; use log::{error, info, warn}; -use nostr_sdk::bitcoin::PrivateKey; use nostr_sdk::prelude::Coordinate; use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32}; use serde::Serialize; use std::collections::HashSet; -use std::env::temp_dir; -use std::fs::create_dir_all; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use tokio::sync::RwLock; use url::Url; use uuid::Uuid; -use zap_stream_core::egress::hls::HlsEgress; use zap_stream_core::egress::{EgressConfig, EgressSegment}; use zap_stream_core::ingress::ConnectionInfo; use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer}; @@ -39,7 +24,6 @@ use zap_stream_core::variant::audio::AudioVariant; use zap_stream_core::variant::mapping::VariantMapping; use zap_stream_core::variant::video::VideoVariant; use zap_stream_core::variant::{StreamMapping, VariantStream}; -use zap_stream_db::sqlx::Encode; use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb}; const STREAM_EVENT_KIND: u16 = 30_311; @@ -436,7 +420,7 @@ fn get_default_variants(info: &IngressInfo) -> Result> { fps: video_src.fps, bitrate: 3_000_000, codec: "libx264".to_string(), - profile: 100, + profile: 77, // AV_PROFILE_H264_MAIN level: 51, keyframe_interval: video_src.fps as u16 * 2, pixel_format: AV_PIX_FMT_YUV420P as u32,