feat: cleanup stream on end

fix: audio codecs
fix: hls segmenter
This commit is contained in:
kieran 2024-11-21 22:08:47 +00:00
parent f192a915e0
commit 9937f9a6f9
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
16 changed files with 131 additions and 54 deletions

4
Cargo.lock generated
View File

@ -3430,7 +3430,6 @@ dependencies = [
"tokio-stream",
"tracing",
"url",
"uuid",
]
[[package]]
@ -3512,7 +3511,6 @@ dependencies = [
"stringprep",
"thiserror 1.0.57",
"tracing",
"uuid",
"whoami",
]
@ -3552,7 +3550,6 @@ dependencies = [
"stringprep",
"thiserror 1.0.57",
"tracing",
"uuid",
"whoami",
]
@ -3578,7 +3575,6 @@ dependencies = [
"sqlx-core",
"tracing",
"url",
"uuid",
]
[[package]]

View File

@ -20,4 +20,11 @@ impl Egress for HlsMuxer {
Ok(EgressResult::None)
}
}
/// Reset every HLS variant muxer, e.g. when the input stream ends.
/// Stops on the first variant that fails; later variants stay un-reset.
///
/// # Safety
/// Calls into FFmpeg via each variant's mux context; the contexts must
/// still be valid (same contract as the other `Egress` methods).
unsafe fn reset(&mut self) -> Result<()> {
    for var in &mut self.variants {
        var.reset()?
    }
    Ok(())
}
}

View File

@ -19,6 +19,7 @@ pub struct EgressConfig {
pub trait Egress {
unsafe fn process_pkt(&mut self, packet: *mut AVPacket, variant: &Uuid)
-> Result<EgressResult>;
unsafe fn reset(&mut self) -> Result<()>;
}
#[derive(Debug, Clone)]

View File

@ -63,4 +63,8 @@ impl Egress for RecorderEgress {
}
Ok(EgressResult::None)
}
/// Reset the underlying muxer when the stream ends.
///
/// # Safety
/// Delegates to the FFmpeg-backed muxer; its context must still be valid.
unsafe fn reset(&mut self) -> Result<()> {
    self.muxer.reset()
}
}

View File

@ -38,9 +38,16 @@ pub async fn spawn_pipeline(
std::thread::spawn(move || unsafe {
match PipelineRunner::new(handle, out_dir, seer, info, reader) {
Ok(mut pl) => loop {
if let Err(e) = pl.run() {
error!("Pipeline run failed: {}", e);
break;
match pl.run() {
Ok(c) => {
if !c {
break;
}
}
Err(e) => {
error!("Pipeline run failed: {}", e);
break;
}
}
},
Err(e) => {

View File

@ -3,6 +3,7 @@ use crate::overseer::Overseer;
use crate::pipeline::runner::PipelineRunner;
use crate::settings::Settings;
use anyhow::Result;
use futures_util::stream::FusedStream;
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use log::{error, info, warn};
use srt_tokio::{SrtListener, SrtSocket};
@ -54,6 +55,9 @@ impl Read for SrtReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let (mut rx, _) = self.socket.split_mut();
while self.buf.len() < buf.len() {
if rx.is_terminated() {
return Ok(0);
}
if let Some((_, mut data)) = self.handle.block_on(rx.next()) {
self.buf.extend(data.iter().as_slice());
}

View File

@ -2,6 +2,7 @@ use crate::egress::NewSegment;
use crate::variant::{StreamMapping, VariantStream};
use anyhow::{bail, Result};
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVMediaType::AVMEDIA_TYPE_VIDEO;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_free, av_opt_set, av_q2d, av_write_frame, avio_flush, avio_open, AVPacket, AVStream,
AVIO_FLAG_WRITE, AV_PKT_FLAG_KEY,
@ -215,7 +216,11 @@ impl HlsVariant {
let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64;
let mut result = None;
let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY;
let pkt_stream = *(*self.mux.context())
.streams
.add((*pkt).stream_index as usize);
let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY
&& (*(*pkt_stream).codecpar).codec_type == AVMEDIA_TYPE_VIDEO;
if pkt_seg != self.idx && can_split {
result = Some(self.split_next_seg(pkt_time)?);
}
@ -223,6 +228,10 @@ impl HlsVariant {
Ok(result)
}
/// Reset this variant's mux context (called when the stream ends).
///
/// # Safety
/// Invokes FFmpeg state teardown on `self.mux`; the context must not be
/// in use by a concurrent writer.
pub unsafe fn reset(&mut self) -> Result<()> {
    self.mux.reset()
}
unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result<NewSegment> {
self.idx += 1;
@ -368,8 +377,8 @@ impl HlsVariant {
}
pub struct HlsMuxer {
out_dir: PathBuf,
variants: Vec<HlsVariant>,
pub out_dir: PathBuf,
pub variants: Vec<HlsVariant>,
}
impl HlsMuxer {

View File

@ -82,6 +82,9 @@ pub trait Overseer: Send + Sync {
height: usize,
path: &PathBuf,
) -> Result<()>;
/// Stream is finished
async fn on_end(&self, pipeline_id: &Uuid) -> Result<()>;
}
impl Settings {
@ -171,10 +174,10 @@ pub(crate) fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStre
group_id: 1,
},
bitrate: 192_000,
codec: "libfdk_aac".to_string(),
codec: "aac".to_string(),
channels: 2,
sample_rate: 48_000,
sample_fmt: "s16".to_owned(),
sample_fmt: "fltp".to_owned(),
}));
}
@ -231,4 +234,10 @@ impl Overseer for StaticOverseer {
// nothing to do here
Ok(())
}
/// Called when a stream ends; the static overseer keeps no per-stream
/// state, so this is a no-op. The parameter is underscore-prefixed to
/// silence the unused-variable warning while keeping the trait signature.
async fn on_end(&self, _pipeline_id: &Uuid) -> Result<()> {
    // nothing to do here
    Ok(())
}
}

View File

@ -49,4 +49,8 @@ impl Overseer for WebhookOverseer {
) -> Result<()> {
todo!()
}
/// Called when a stream ends.
/// Not yet implemented for the webhook overseer — presumably this should
/// POST an "ended" notification to the configured webhook; confirm intent.
async fn on_end(&self, pipeline_id: &Uuid) -> Result<()> {
    todo!()
}
}

View File

@ -6,7 +6,7 @@ use crate::overseer::{get_default_variants, IngressInfo, Overseer};
use crate::pipeline::{EgressType, PipelineConfig};
use crate::settings::LndSettings;
use crate::variant::StreamMapping;
use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use chrono::Utc;
use fedimint_tonic_lnd::verrpc::VersionRequest;
@ -140,7 +140,7 @@ impl ZapStreamOverseer {
}
let kind = Kind::from(STREAM_EVENT_KIND);
let coord = Coordinate::new(kind, self.keys.public_key).identifier(stream.id);
let coord = Coordinate::new(kind, self.keys.public_key).identifier(&stream.id);
tags.push(Tag::parse(&[
"alt",
&format!("Watch live on https://zap.stream/{}", coord.to_bech32()?),
@ -226,9 +226,10 @@ impl Overseer for ZapStreamOverseer {
}));
let user = self.db.get_user(uid).await?;
let stream_id = Uuid::new_v4();
// insert new stream record
let mut new_stream = UserStream {
id: Uuid::new_v4(),
id: stream_id.to_string(),
user_id: uid,
starts: Utc::now(),
state: UserStreamState::Live,
@ -238,8 +239,9 @@ impl Overseer for ZapStreamOverseer {
new_stream.event = Some(stream_event.as_json());
self.db.insert_stream(&new_stream).await?;
self.db.update_stream(&new_stream).await?;
Ok(PipelineConfig {
id: new_stream.id,
id: stream_id,
variants,
egress,
})
@ -291,4 +293,17 @@ impl Overseer for ZapStreamOverseer {
// nothing to do
Ok(())
}
/// Mark the stream record as ended and re-publish its nostr event.
///
/// Loads the stream row by `pipeline_id`, flips its state to `Ended`,
/// publishes an updated stream event signed for the owning user, stores
/// the new event JSON back on the row, and persists the update.
async fn on_end(&self, pipeline_id: &Uuid) -> Result<()> {
    let mut stream = self.db.get_stream(pipeline_id).await?;
    let user = self.db.get_user(stream.user_id).await?;
    stream.state = UserStreamState::Ended;
    // Publish first, then persist: the stored row should reference the
    // event that was actually broadcast.
    let event = self.publish_stream_event(&stream, &user.pubkey).await?;
    stream.event = Some(event.as_json());
    self.db.update_stream(&stream).await?;
    info!("Stream ended {}", stream.id);
    Ok(())
}
}

View File

@ -3,6 +3,7 @@ use std::io::Read;
use std::mem::transmute;
use std::ops::Sub;
use std::path::PathBuf;
use std::ptr;
use std::sync::Arc;
use std::time::Instant;
@ -19,7 +20,8 @@ use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_rs_raw::ffmpeg_sys_the_third::{
av_frame_free, av_get_sample_fmt, av_packet_free, av_q2d, av_rescale_q, AVMediaType,
av_frame_free, av_get_sample_fmt, av_packet_free, av_pkt_dump_log2, av_q2d, av_rescale_q,
AVMediaType,
};
use ffmpeg_rs_raw::{
cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler,
@ -103,11 +105,28 @@ impl PipelineRunner {
})
}
/// EOF, cleanup: drain all encoders and reset every egress sink.
///
/// Passing a null frame to `encode_frame` signals FFmpeg to flush any
/// packets still buffered inside the encoder; each flushed packet is
/// forwarded to every egress before being freed.
///
/// # Safety
/// Operates on raw FFmpeg packets; encoder and egress contexts must be
/// valid and no other thread may touch them during the flush.
unsafe fn flush(&mut self) -> Result<()> {
    for (var, enc) in &mut self.encoders {
        // null frame => drain the encoder's internal buffer
        for mut pkt in enc.encode_frame(ptr::null_mut())? {
            for eg in self.egress.iter_mut() {
                eg.process_pkt(pkt, &var)?;
            }
            av_packet_free(&mut pkt);
        }
    }
    for eg in self.egress.iter_mut() {
        eg.reset()?;
    }
    Ok(())
}
/// Main processor, should be called in a loop
pub unsafe fn run(&mut self) -> Result<()> {
/// Returns false when stream data ended (EOF)
pub unsafe fn run(&mut self) -> Result<bool> {
self.setup()?;
let config = if let Some(ref config) = self.config {
let config = if let Some(config) = &self.config {
config
} else {
bail!("Pipeline not configured, cannot run")
@ -115,14 +134,23 @@ impl PipelineRunner {
// run transcoder pipeline
let (mut pkt, stream) = self.demuxer.get_packet()?;
let src_index = (*stream).index;
if pkt.is_null() {
self.handle.block_on(async {
if let Err(e) = self.overseer.on_end(&config.id).await {
error!("Failed to end stream: {e}");
}
});
self.flush()?;
return Ok(false);
}
// TODO: For copy streams, skip decoder
let frames = if let Ok(frames) = self.decoder.decode_pkt(pkt) {
frames
} else {
warn!("Error decoding frames");
return Ok(());
let frames = match self.decoder.decode_pkt(pkt) {
Ok(f) => f,
Err(e) => {
warn!("Error decoding frames, {e}");
return Ok(true);
}
};
let mut egress_results = vec![];
@ -164,7 +192,7 @@ impl PipelineRunner {
let pkt_vars = config
.variants
.iter()
.filter(|v| v.src_index() == src_index as usize);
.filter(|v| v.src_index() == (*stream).index as usize);
for var in pkt_vars {
let enc = if let Some(enc) = self.encoders.get_mut(&var.id()) {
enc
@ -172,6 +200,18 @@ impl PipelineRunner {
//warn!("Frame had nowhere to go in {} :/", var.id());
continue;
};
// before encoding frame, rescale timestamps
if !frame.is_null() {
let enc_ctx = enc.codec_context();
(*frame).pict_type = AV_PICTURE_TYPE_NONE;
(*frame).pts =
av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base);
(*frame).pkt_dts =
av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base);
(*frame).duration =
av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base);
(*frame).time_base = (*enc_ctx).time_base;
}
let mut new_frame = false;
let mut frame = match var {
@ -192,8 +232,6 @@ impl PipelineRunner {
f.buffer_frame(resampled_frame, frame_size as usize)?
{
av_frame_free(&mut resampled_frame);
// assume timebase of the encoder
//(*ret).time_base = (*enc.codec_context()).time_base;
ret
} else {
av_frame_free(&mut resampled_frame);
@ -206,19 +244,6 @@ impl PipelineRunner {
_ => frame,
};
// before encoding frame, rescale timestamps
if !frame.is_null() {
let enc_ctx = enc.codec_context();
(*frame).pict_type = AV_PICTURE_TYPE_NONE;
(*frame).pts =
av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base);
(*frame).pkt_dts =
av_rescale_q((*frame).pkt_dts, (*frame).time_base, (*enc_ctx).time_base);
(*frame).duration =
av_rescale_q((*frame).duration, (*frame).time_base, (*enc_ctx).time_base);
(*frame).time_base = (*enc_ctx).time_base;
}
let packets = enc.encode_frame(frame)?;
// pass new packets to egress
for mut pkt in packets {
@ -259,7 +284,7 @@ impl PipelineRunner {
self.fps_counter_start = Instant::now();
self.frame_ctr = 0;
}
Ok(())
Ok(true)
}
unsafe fn setup(&mut self) -> Result<()> {

View File

@ -1274,7 +1274,6 @@ dependencies = [
"tokio-stream",
"tracing",
"url",
"uuid",
]
[[package]]
@ -1356,7 +1355,6 @@ dependencies = [
"stringprep",
"thiserror",
"tracing",
"uuid",
"whoami",
]
@ -1396,7 +1394,6 @@ dependencies = [
"stringprep",
"thiserror",
"tracing",
"uuid",
"whoami",
]
@ -1422,7 +1419,6 @@ dependencies = [
"sqlx-core",
"tracing",
"url",
"uuid",
]
[[package]]

View File

@ -10,6 +10,6 @@ test-pattern = []
[dependencies]
anyhow = "^1.0.70"
chrono = { version = "0.4.38", features = ["serde"] }
sqlx = { version = "0.8.1", features = ["runtime-tokio", "migrate", "mysql", "chrono", "uuid"] }
sqlx = { version = "0.8.1", features = ["runtime-tokio", "migrate", "mysql", "chrono"] }
log = "0.4.22"
uuid = { version = "1.11.0", features = ["v4"] }

View File

@ -14,11 +14,11 @@ create table user
create unique index ix_user_pubkey on user (pubkey);
create table user_stream
(
id UUID not null primary key,
id varchar(50) not null primary key,
user_id integer unsigned not null,
starts timestamp not null,
starts timestamp not null,
ends timestamp,
state smallint not null,
state tinyint unsigned not null,
title text,
summary text,
image text,
@ -28,9 +28,9 @@ create table user_stream
goal text,
pinned text,
-- milli-sats paid for this stream
cost bigint not null default 0,
cost bigint unsigned not null default 0,
-- duration in seconds
duration float not null default 0,
duration float not null default 0,
-- admission fee
fee integer unsigned,
-- current nostr event json

View File

@ -96,7 +96,7 @@ impl ZapStreamDb {
pub async fn get_stream(&self, id: &Uuid) -> Result<UserStream> {
Ok(sqlx::query_as("select * from user_stream where id = ?")
.bind(id)
.bind(id.to_string())
.fetch_one(&self.db)
.await
.map_err(anyhow::Error::new)?)

View File

@ -48,7 +48,7 @@ impl Display for UserStreamState {
#[derive(Debug, Clone, Default, FromRow)]
pub struct UserStream {
pub id: Uuid,
pub id: String,
pub user_id: u64,
pub starts: DateTime<Utc>,
pub ends: Option<DateTime<Utc>>,