mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-16 08:59:35 +00:00
fix: audio sync
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -1044,7 +1044,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "ffmpeg-rs-raw"
|
name = "ffmpeg-rs-raw"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385#a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385"
|
source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=29ab0547478256c574766b4acc6fcda8ebf4cae6#29ab0547478256c574766b4acc6fcda8ebf4cae6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"ffmpeg-sys-the-third",
|
"ffmpeg-sys-the-third",
|
||||||
|
@ -7,7 +7,7 @@ members = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "a63b88ef3c8f58c7c0ac57d361d06ff0bb3ed385" }
|
ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "29ab0547478256c574766b4acc6fcda8ebf4cae6" }
|
||||||
tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }
|
tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] }
|
||||||
anyhow = { version = "^1.0.91", features = ["backtrace"] }
|
anyhow = { version = "^1.0.91", features = ["backtrace"] }
|
||||||
async-trait = "0.1.77"
|
async-trait = "0.1.77"
|
||||||
|
@ -26,7 +26,6 @@ use ffmpeg_rs_raw::{
|
|||||||
cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler,
|
cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler,
|
||||||
StreamType,
|
StreamType,
|
||||||
};
|
};
|
||||||
use itertools::Itertools;
|
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use tokio::runtime::Handle;
|
use tokio::runtime::Handle;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@ -203,19 +202,8 @@ impl PipelineRunner {
|
|||||||
//warn!("Frame had nowhere to go in {} :/", var.id());
|
//warn!("Frame had nowhere to go in {} :/", var.id());
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
// before encoding frame, rescale timestamps
|
|
||||||
if !frame.is_null() {
|
|
||||||
let enc_ctx = enc.codec_context();
|
|
||||||
(*frame).pict_type = AV_PICTURE_TYPE_NONE;
|
|
||||||
(*frame).pts =
|
|
||||||
av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base);
|
|
||||||
(*frame).pkt_dts =
|
|
||||||
av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base);
|
|
||||||
(*frame).duration =
|
|
||||||
av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base);
|
|
||||||
(*frame).time_base = (*enc_ctx).time_base;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// scaling / resampling
|
||||||
let mut new_frame = false;
|
let mut new_frame = false;
|
||||||
let mut frame = match var {
|
let mut frame = match var {
|
||||||
VariantStream::Video(v) => {
|
VariantStream::Video(v) => {
|
||||||
@ -234,6 +222,9 @@ impl PipelineRunner {
|
|||||||
if let Some(ret) =
|
if let Some(ret) =
|
||||||
f.buffer_frame(resampled_frame, frame_size as usize)?
|
f.buffer_frame(resampled_frame, frame_size as usize)?
|
||||||
{
|
{
|
||||||
|
// Set correct timebase for audio (1/sample_rate)
|
||||||
|
(*ret).time_base.num = 1;
|
||||||
|
(*ret).time_base.den = a.sample_rate as i32;
|
||||||
av_frame_free(&mut resampled_frame);
|
av_frame_free(&mut resampled_frame);
|
||||||
ret
|
ret
|
||||||
} else {
|
} else {
|
||||||
@ -247,6 +238,19 @@ impl PipelineRunner {
|
|||||||
_ => frame,
|
_ => frame,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// before encoding frame, rescale timestamps
|
||||||
|
if !frame.is_null() {
|
||||||
|
let enc_ctx = enc.codec_context();
|
||||||
|
(*frame).pict_type = AV_PICTURE_TYPE_NONE;
|
||||||
|
(*frame).pts =
|
||||||
|
av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base);
|
||||||
|
(*frame).pkt_dts =
|
||||||
|
av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base);
|
||||||
|
(*frame).duration =
|
||||||
|
av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base);
|
||||||
|
(*frame).time_base = (*enc_ctx).time_base;
|
||||||
|
}
|
||||||
|
|
||||||
let packets = enc.encode_frame(frame)?;
|
let packets = enc.encode_frame(frame)?;
|
||||||
// pass new packets to egress
|
// pass new packets to egress
|
||||||
for mut pkt in packets {
|
for mut pkt in packets {
|
||||||
|
@ -60,10 +60,10 @@ impl Service<Request<Incoming>> for HttpServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check if mapped to file
|
// check if mapped to file
|
||||||
let mut dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
|
let dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
|
||||||
if dst_path.exists() {
|
if dst_path.exists() {
|
||||||
return Box::pin(async move {
|
return Box::pin(async move {
|
||||||
let mut rsp = Response::builder()
|
let rsp = Response::builder()
|
||||||
.header("server", "zap-stream-core")
|
.header("server", "zap-stream-core")
|
||||||
.header("access-control-allow-origin", "*")
|
.header("access-control-allow-origin", "*")
|
||||||
.header("access-control-allow-headers", "*")
|
.header("access-control-allow-headers", "*")
|
||||||
@ -85,7 +85,7 @@ impl Service<Request<Incoming>> for HttpServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// otherwise handle in overseer
|
// otherwise handle in overseer
|
||||||
let mut api = self.api.clone();
|
let api = self.api.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
match api.handler(req).await {
|
match api.handler(req).await {
|
||||||
Ok(res) => Ok(res),
|
Ok(res) => Ok(res),
|
||||||
|
@ -1,36 +1,21 @@
|
|||||||
use crate::blossom::{BlobDescriptor, Blossom};
|
use crate::blossom::{BlobDescriptor, Blossom};
|
||||||
use crate::settings::LndSettings;
|
use crate::settings::LndSettings;
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{bail, Result};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use base64::alphabet::STANDARD;
|
|
||||||
use base64::Engine;
|
|
||||||
use bytes::Bytes;
|
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use fedimint_tonic_lnd::verrpc::VersionRequest;
|
use fedimint_tonic_lnd::verrpc::VersionRequest;
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG;
|
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame;
|
|
||||||
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
|
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
|
||||||
use ffmpeg_rs_raw::Encoder;
|
|
||||||
use futures_util::FutureExt;
|
|
||||||
use http_body_util::combinators::BoxBody;
|
|
||||||
use http_body_util::{BodyExt, Full};
|
|
||||||
use hyper::body::Incoming;
|
|
||||||
use hyper::{Method, Request, Response};
|
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use nostr_sdk::bitcoin::PrivateKey;
|
|
||||||
use nostr_sdk::prelude::Coordinate;
|
use nostr_sdk::prelude::Coordinate;
|
||||||
use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
|
use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::env::temp_dir;
|
|
||||||
use std::fs::create_dir_all;
|
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use zap_stream_core::egress::hls::HlsEgress;
|
|
||||||
use zap_stream_core::egress::{EgressConfig, EgressSegment};
|
use zap_stream_core::egress::{EgressConfig, EgressSegment};
|
||||||
use zap_stream_core::ingress::ConnectionInfo;
|
use zap_stream_core::ingress::ConnectionInfo;
|
||||||
use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer};
|
use zap_stream_core::overseer::{IngressInfo, IngressStreamType, Overseer};
|
||||||
@ -39,7 +24,6 @@ use zap_stream_core::variant::audio::AudioVariant;
|
|||||||
use zap_stream_core::variant::mapping::VariantMapping;
|
use zap_stream_core::variant::mapping::VariantMapping;
|
||||||
use zap_stream_core::variant::video::VideoVariant;
|
use zap_stream_core::variant::video::VideoVariant;
|
||||||
use zap_stream_core::variant::{StreamMapping, VariantStream};
|
use zap_stream_core::variant::{StreamMapping, VariantStream};
|
||||||
use zap_stream_db::sqlx::Encode;
|
|
||||||
use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
|
use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
|
||||||
|
|
||||||
const STREAM_EVENT_KIND: u16 = 30_311;
|
const STREAM_EVENT_KIND: u16 = 30_311;
|
||||||
@ -436,7 +420,7 @@ fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStream>> {
|
|||||||
fps: video_src.fps,
|
fps: video_src.fps,
|
||||||
bitrate: 3_000_000,
|
bitrate: 3_000_000,
|
||||||
codec: "libx264".to_string(),
|
codec: "libx264".to_string(),
|
||||||
profile: 100,
|
profile: 77, // AV_PROFILE_H264_MAIN
|
||||||
level: 51,
|
level: 51,
|
||||||
keyframe_interval: video_src.fps as u16 * 2,
|
keyframe_interval: video_src.fps as u16 * 2,
|
||||||
pixel_format: AV_PIX_FMT_YUV420P as u32,
|
pixel_format: AV_PIX_FMT_YUV420P as u32,
|
||||||
|
Reference in New Issue
Block a user