From e111e5019944a78588a377b6b5b660cd6dd53b39 Mon Sep 17 00:00:00 2001 From: kieran Date: Mon, 18 Nov 2024 16:05:25 +0000 Subject: [PATCH] feat: hls progress --- Cargo.lock | 2 +- Cargo.toml | 5 +- TODO.md | 2 + config.yaml | 6 ++ src/bin/zap_stream_core.rs | 74 +++++++++++++------- src/blossom.rs | 9 ++- src/egress/mod.rs | 15 ---- src/egress/recorder.rs | 34 +++++++--- src/ingress/file.rs | 8 +-- src/ingress/mod.rs | 4 +- src/ingress/srt.rs | 11 +-- src/ingress/tcp.rs | 8 +-- src/ingress/test.rs | 27 +++++--- src/mux/hls.rs | 136 ++++++++++++++++++++++++++++++------- src/overseer/mod.rs | 40 +++++++++-- src/overseer/webhook.rs | 10 +++ src/overseer/zap_stream.rs | 120 +++++++++++++++++++++++++------- src/pipeline/mod.rs | 14 ++-- src/pipeline/runner.rs | 111 ++++++++++++++++++++---------- src/settings.rs | 12 ++++ src/variant/audio.rs | 15 ++-- src/variant/video.rs | 16 ++--- zap-stream-db/src/db.rs | 11 ++- 23 files changed, 489 insertions(+), 201 deletions(-) create mode 100644 TODO.md diff --git a/Cargo.lock b/Cargo.lock index 0d01e8b..c37fc8f 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -1022,7 +1022,7 @@ dependencies = [ [[package]] name = "ffmpeg-rs-raw" version = "0.1.0" -source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=0abe0c5229adeb64b013d1895c7eba3d917f05a4#0abe0c5229adeb64b013d1895c7eba3d917f05a4" +source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=c2ae78acbcbe315137aea94c77b0db7ea538a709#c2ae78acbcbe315137aea94c77b0db7ea538a709" dependencies = [ "anyhow", "ffmpeg-sys-the-third", diff --git a/Cargo.toml b/Cargo.toml index b57e1a5..4c9b262 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,12 +18,11 @@ zap-stream = [ "tokio/fs", "dep:base64", "dep:sha2", - "dep:hex" ] test-pattern = ["dep:resvg", "dep:usvg", "dep:tiny-skia", "dep:fontdue", "dep:ringbuf", "zap-stream-db/test-pattern"] [dependencies] -ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "0abe0c5229adeb64b013d1895c7eba3d917f05a4" } +ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "c2ae78acbcbe315137aea94c77b0db7ea538a709" } tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } anyhow = { version = "^1.0.91", features = ["backtrace"] } pretty_env_logger = "0.5.0" @@ -42,6 +41,7 @@ warp = "0.3.7" libc = "0.2.162" m3u8-rs = "6.0.0" chrono = "^0.4.38" +hex = "0.4.3" # test-pattern srt-tokio = { version = "0.4.3", optional = true } @@ -58,4 +58,3 @@ fedimint-tonic-lnd = { version = "0.2.0", optional = true, default-features = fa reqwest = { version = "0.12.9", optional = true, features = ["stream"] } base64 = { version = "0.22.1", optional = true } sha2 = { version = "0.10.8", optional = true } -hex = { version = "0.4.3", optional = true } diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..cbafb29 --- /dev/null +++ b/TODO.md @@ -0,0 +1,2 @@ +- Setup multi-variant output +- Manage event lifecycle (close stream) \ No newline at end of file diff --git a/config.yaml b/config.yaml index 081dd57..86817bc 100755 --- a/config.yaml +++ b/config.yaml @@ -9,6 +9,12 @@ endpoints: # Output directory for recording / hls output_dir: "./out" +# Public URL for serving files for [output_dir] +public_url: "http://localhost:8080" + +# Bind address for http server serving files from [output_dir] +listen_http: "127.0.0.1:8080" + # Overseer is the main control structure which controls access to the service # # ** ONLY 1 OVERSEER CAN BE CONFIGURED AT A TIME ** diff --git a/src/bin/zap_stream_core.rs b/src/bin/zap_stream_core.rs index 14951ae..019ab94 100644 --- a/src/bin/zap_stream_core.rs +++ b/src/bin/zap_stream_core.rs @@ -1,8 +1,12 @@ +use anyhow::{bail, Result}; use clap::Parser; use config::Config; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::av_version_info; -use ffmpeg_rs_raw::rstr; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_log_set_callback, av_version_info}; +use ffmpeg_rs_raw::{av_log_redirect, rstr}; use log::{error, info}; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::task::JoinHandle; use url::Url; use zap_stream_core::egress::http::listen_out_dir; @@ -12,19 +16,20 @@ use zap_stream_core::ingress::srt; use zap_stream_core::ingress::test; use zap_stream_core::ingress::{file, tcp}; +use zap_stream_core::overseer::Overseer; use zap_stream_core::settings::Settings; #[derive(Parser, Debug)] struct Args {} #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> Result<()> { pretty_env_logger::init(); let _args = Args::parse(); unsafe { - //ffmpeg_sys_next::av_log_set_level(ffmpeg_sys_next::AV_LOG_DEBUG); + av_log_set_callback(Some(av_log_redirect)); info!("FFMPEG version={}", rstr!(av_version_info())); } @@ -38,38 +43,55 @@ async fn main() -> anyhow::Result<()> { let mut listeners = vec![]; for e in &settings.endpoints { - let u: Url = e.parse()?; - match u.scheme() { - #[cfg(feature = "srt")] - "srt" => listeners.push(tokio::spawn(srt::listen( - u.host().unwrap().to_string(), - overseer.clone(), - ))), - "tcp" => listeners.push(tokio::spawn(tcp::listen( - u.host().unwrap().to_string(), - overseer.clone(), - ))), - "file" => listeners.push(tokio::spawn(file::listen( - u.path().parse()?, - overseer.clone(), - ))), - #[cfg(feature = "test-pattern")] - "test-pattern" => listeners.push(tokio::spawn(test::listen(overseer.clone()))), - _ => { - error!("Unknown endpoint config: {e}"); - } + match try_create_listener(e, &settings.output_dir, &overseer) { + Ok(l) => listeners.push(l), + Err(e) => error!("{}", e), } } listeners.push(tokio::spawn(listen_out_dir( - "0.0.0.0:8080".to_owned(), + settings.listen_http, settings.output_dir, ))); for handle in listeners { - if let Err(e) = handle.await { + if let Err(e) = handle.await? { error!("{e}"); } } info!("Server closed"); Ok(()) } + +fn try_create_listener( + u: &str, + out_dir: &str, + overseer: &Arc, +) -> Result>> { + let url: Url = u.parse()?; + match url.scheme() { + #[cfg(feature = "srt")] + "srt" => Ok(tokio::spawn(srt::listen( + out_dir.to_string(), + format!("{}:{}", url.host().unwrap(), url.port().unwrap()), + overseer.clone(), + ))), + "tcp" => Ok(tokio::spawn(tcp::listen( + out_dir.to_string(), + format!("{}:{}", url.host().unwrap(), url.port().unwrap()), + overseer.clone(), + ))), + "file" => Ok(tokio::spawn(file::listen( + out_dir.to_string(), + PathBuf::from(url.path()), + overseer.clone(), + ))), + #[cfg(feature = "test-pattern")] + "test-pattern" => Ok(tokio::spawn(test::listen( + out_dir.to_string(), + overseer.clone(), + ))), + _ => { + bail!("Unknown endpoint config: {u}"); + } + } +} diff --git a/src/blossom.rs b/src/blossom.rs index 0d9c148..6c9f016 100644 --- a/src/blossom.rs +++ b/src/blossom.rs @@ -51,7 +51,12 @@ impl Blossom { Ok(hex::encode(hash)) } - pub async fn upload(&self, from_file: &PathBuf, keys: &Keys) -> Result { + pub async fn upload( + &self, + from_file: &PathBuf, + keys: &Keys, + mime: Option<&str>, + ) -> Result { let mut f = File::open(from_file).await?; let hash = Self::hash_file(&mut f).await?; let auth_event = EventBuilder::new( @@ -69,7 +74,7 @@ impl Blossom { let rsp: BlobDescriptor = self .client .put(self.url.join("/upload").unwrap()) - .header("Content-Type", "application/octet-stream") + .header("Content-Type", mime.unwrap_or("application/octet-stream")) .header( "Authorization", &format!( diff --git a/src/egress/mod.rs b/src/egress/mod.rs index a503203..9d27a0d 100644 --- a/src/egress/mod.rs +++ b/src/egress/mod.rs @@ -2,7 +2,6 @@ use anyhow::Result; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; use serde::{Deserialize, Serialize}; use std::collections::HashSet; -use std::fmt::{Display, Formatter}; use std::path::PathBuf; use uuid::Uuid; @@ -13,24 +12,10 @@ pub mod recorder; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct EgressConfig { pub name: String, - pub out_dir: String, /// Which variants will be used in this muxer pub variants: HashSet, } -impl Display for EgressConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}: out_dir={}", self.name, self.out_dir)?; - if !self.variants.is_empty() { - write!(f, "\n\tStreams: ")?; - for v in &self.variants { - write!(f, "\n\t\t{}", v)?; - } - } - Ok(()) - } -} - pub trait Egress { unsafe fn process_pkt(&mut self, packet: *mut AVPacket, variant: &Uuid) -> Result; diff --git a/src/egress/recorder.rs b/src/egress/recorder.rs index 5291008..861e62e 100644 --- a/src/egress/recorder.rs +++ b/src/egress/recorder.rs @@ -1,40 +1,51 @@ use anyhow::Result; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; use ffmpeg_rs_raw::{Encoder, Muxer}; +use std::collections::HashMap; use std::fs; use std::path::PathBuf; use uuid::Uuid; -use crate::egress::{Egress, EgressConfig, EgressResult}; +use crate::egress::{Egress, EgressResult}; +use crate::variant::{StreamMapping, VariantStream}; pub struct RecorderEgress { + /// Pipeline ID id: Uuid, - config: EgressConfig, + /// Internal muxer writing the output packets muxer: Muxer, + /// Mapping from Variant ID to stream index + var_map: HashMap, } impl RecorderEgress { pub fn new<'a>( - config: EgressConfig, - variants: impl Iterator, + id: &Uuid, + out_dir: &str, + variants: impl Iterator, ) -> Result { - let id = Uuid::new_v4(); - let base = PathBuf::from(&config.out_dir).join(id.to_string()); + let base = PathBuf::from(out_dir).join(id.to_string()); let out_file = base.join("recording.ts"); fs::create_dir_all(&base)?; + let mut var_map = HashMap::new(); let muxer = unsafe { let mut m = Muxer::builder() .with_output_path(out_file.to_str().unwrap(), None)? .build()?; - for var in variants { - m.add_stream_encoder(var)?; + for (var, enc) in variants { + let stream = m.add_stream_encoder(enc)?; + var_map.insert(var.id(), (*stream).index); } m.open(None)?; m }; - Ok(Self { id, config, muxer }) + Ok(Self { + id: id.clone(), + muxer, + var_map, + }) } } @@ -44,7 +55,10 @@ impl Egress for RecorderEgress { packet: *mut AVPacket, variant: &Uuid, ) -> Result { - if self.config.variants.contains(variant) { + if let Some(stream) = self.var_map.get(variant) { + // very important for muxer to know which stream this pkt belongs to + (*packet).stream_index = *stream; + self.muxer.write_packet(packet)?; } Ok(EgressResult::None) diff --git a/src/ingress/file.rs b/src/ingress/file.rs index 2ba8108..ed4e836 100644 --- a/src/ingress/file.rs +++ b/src/ingress/file.rs @@ -5,16 +5,16 @@ use log::info; use std::path::PathBuf; use std::sync::Arc; -pub async fn listen(path: PathBuf, overseer: Arc) -> Result<()> { - info!("Sending file {}", path.to_str().unwrap()); +pub async fn listen(out_dir: String, path: PathBuf, overseer: Arc) -> Result<()> { + info!("Sending file: {}", path.display()); let info = ConnectionInfo { ip_addr: "127.0.0.1:6969".to_string(), endpoint: "file-input".to_owned(), - key: "".to_string(), + key: "test".to_string(), }; let file = std::fs::File::open(path)?; - spawn_pipeline(info, overseer.clone(), Box::new(file)).await; + spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(file)).await; Ok(()) } diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index bf23c94..f8500ad 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -27,14 +27,16 @@ pub struct ConnectionInfo { pub async fn spawn_pipeline( info: ConnectionInfo, + out_dir: String, seer: Arc, reader: Box, ) { info!("New client connected: {}", &info.ip_addr); let handle = Handle::current(); let seer = seer.clone(); + let out_dir = out_dir.to_string(); std::thread::spawn(move || unsafe { - match PipelineRunner::new(handle, seer, info, reader) { + match PipelineRunner::new(handle, out_dir, seer, info, reader) { Ok(mut pl) => loop { if let Err(e) = pl.run() { error!("Pipeline run failed: {}", e); diff --git a/src/ingress/srt.rs b/src/ingress/srt.rs index 774f845..6a5ab9a 100644 --- a/src/ingress/srt.rs +++ b/src/ingress/srt.rs @@ -9,17 +9,18 @@ use srt_tokio::{SrtListener, SrtSocket}; use std::sync::Arc; use tokio::sync::mpsc::unbounded_channel; -pub async fn listen(listen_addr: String, overseer: Arc) -> Result<()> { - let (_binding, mut packets) = SrtListener::builder().bind(listen_addr.clone()).await?; +pub async fn listen(out_dir: String, addr: String, overseer: Arc) -> Result<()> { + let (_binding, mut packets) = SrtListener::builder().bind(&addr).await?; - info!("SRT listening on: {}", listen_addr.clone()); + info!("SRT listening on: {}", &addr); while let Some(request) = packets.incoming().next().await { let mut socket = request.accept(None).await?; let info = ConnectionInfo { - endpoint: listen_addr.clone(), + endpoint: addr.clone(), ip_addr: socket.settings().remote.to_string(), + key: "".to_string(), }; - spawn_pipeline(info, overseer.clone(), Box::new(socket)).await; + spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(socket)).await; } Ok(()) } diff --git a/src/ingress/tcp.rs b/src/ingress/tcp.rs index c96473e..d3f0ed8 100644 --- a/src/ingress/tcp.rs +++ b/src/ingress/tcp.rs @@ -6,10 +6,10 @@ use tokio::net::TcpListener; use crate::ingress::{spawn_pipeline, ConnectionInfo}; use crate::overseer::Overseer; -pub async fn listen(addr: String, overseer: Arc) -> Result<()> { - let listener = TcpListener::bind(addr.clone()).await?; +pub async fn listen(out_dir: String, addr: String, overseer: Arc) -> Result<()> { + let listener = TcpListener::bind(&addr).await?; - info!("TCP listening on: {}", addr.clone()); + info!("TCP listening on: {}", &addr); while let Ok((socket, ip)) = listener.accept().await { let info = ConnectionInfo { ip_addr: ip.to_string(), @@ -17,7 +17,7 @@ pub async fn listen(addr: String, overseer: Arc) -> Result<()> { key: "".to_string(), }; let socket = socket.into_std()?; - spawn_pipeline(info, overseer.clone(), Box::new(socket)).await; + spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(socket)).await; } Ok(()) } diff --git a/src/ingress/test.rs b/src/ingress/test.rs index 2aee756..9502a3c 100644 --- a/src/ingress/test.rs +++ b/src/ingress/test.rs @@ -1,12 +1,11 @@ use crate::ingress::{spawn_pipeline, ConnectionInfo}; use crate::overseer::Overseer; use anyhow::Result; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVColorSpace::AVCOL_SPC_RGB; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::{AV_PIX_FMT_RGBA, AV_PIX_FMT_YUV420P}; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_frame_alloc, av_frame_get_buffer, AV_PROFILE_H264_MAIN, + av_frame_alloc, av_frame_free, av_frame_get_buffer, av_packet_free, AV_PROFILE_H264_MAIN, }; use ffmpeg_rs_raw::{Encoder, Muxer, Scaler}; use fontdue::layout::{CoordinateSystem, Layout, TextStyle}; @@ -19,16 +18,16 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tiny_skia::Pixmap; -pub async fn listen(overseer: Arc) -> Result<()> { +pub async fn listen(out_dir: String, overseer: Arc) -> Result<()> { info!("Test pattern enabled"); let info = ConnectionInfo { endpoint: "test-pattern".to_string(), ip_addr: "test-pattern".to_string(), - key: "test-pattern".to_string(), + key: "test".to_string(), }; let src = TestPatternSrc::new()?; - spawn_pipeline(info, overseer.clone(), Box::new(src)).await; + spawn_pipeline(info, out_dir.clone(), overseer.clone(), Box::new(src)).await; Ok(()) } @@ -49,9 +48,9 @@ impl TestPatternSrc { pub fn new() -> Result { let scaler = Scaler::new(); let encoder = unsafe { - Encoder::new(AV_CODEC_ID_H264)? + Encoder::new_with_name("libx264")? .with_stream_index(0) - .with_framerate(30.0) + .with_framerate(30.0)? .with_bitrate(1_000_000) .with_pix_fmt(AV_PIX_FMT_YUV420P) .with_width(1280) @@ -64,7 +63,10 @@ impl TestPatternSrc { let svg_data = include_bytes!("../../test.svg"); let tree = usvg::Tree::from_data(svg_data, &Default::default())?; let mut pixmap = Pixmap::new(1280, 720).unwrap(); - let render_ts = tiny_skia::Transform::from_scale(1f32, 1f32); + let render_ts = tiny_skia::Transform::from_scale( + pixmap.width() as f32 / tree.size().width(), + pixmap.height() as f32 / tree.size().height(), + ); resvg::render(&tree, render_ts, &mut pixmap.as_mut()); let font = include_bytes!("../../SourceCodePro-Regular.ttf") as &[u8]; @@ -108,7 +110,7 @@ impl TestPatternSrc { self.frame_no += 1; - let src_frame = unsafe { + let mut src_frame = unsafe { let src_frame = av_frame_alloc(); (*src_frame).width = 1280; @@ -152,12 +154,15 @@ impl TestPatternSrc { } // scale/encode - let frame = self + let mut frame = self .scaler .process_frame(src_frame, 1280, 720, AV_PIX_FMT_YUV420P)?; - for pkt in self.encoder.encode_frame(frame)? { + for mut pkt in self.encoder.encode_frame(frame)? { self.muxer.write_packet(pkt)?; + av_packet_free(&mut pkt); } + av_frame_free(&mut frame); + av_frame_free(&mut src_frame); Ok(()) } } diff --git a/src/mux/hls.rs b/src/mux/hls.rs index bd8e897..989d31b 100644 --- a/src/mux/hls.rs +++ b/src/mux/hls.rs @@ -1,9 +1,10 @@ use crate::egress::NewSegment; use crate::variant::{StreamMapping, VariantStream}; use anyhow::{bail, Result}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_free, av_opt_set, av_q2d, av_write_frame, avio_flush, avio_open, AVPacket, AVIO_FLAG_WRITE, - AV_PKT_FLAG_KEY, + av_free, av_opt_set, av_q2d, av_write_frame, avio_flush, avio_open, AVPacket, AVStream, + AVIO_FLAG_WRITE, AV_PKT_FLAG_KEY, }; use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use itertools::Itertools; @@ -41,6 +42,14 @@ impl HlsVariantStream { HlsVariantStream::Subtitle { id, .. } => id, } } + + pub fn index(&self) -> &usize { + match self { + HlsVariantStream::Video { index, .. } => index, + HlsVariantStream::Audio { index, .. } => index, + HlsVariantStream::Subtitle { index, .. } => index, + } + } } impl Display for HlsVariantStream { @@ -64,6 +73,8 @@ pub struct HlsVariant { pub segment_length: f32, /// Current segment index pub idx: u64, + /// Current segment start time in seconds (duration) + pub pkt_start: f32, /// Output directory (base) pub out_dir: String, /// List of segments to be included in the playlist @@ -142,6 +153,7 @@ impl HlsVariant { mux, streams, idx: 1, + pkt_start: 0.0, segments: Vec::from([SegmentInfo(1, segment_length)]), out_dir: out_dir.to_string(), }) @@ -165,21 +177,22 @@ impl HlsVariant { /// Mux a packet created by the encoder for this variant pub unsafe fn mux_packet(&mut self, pkt: *mut AVPacket) -> Result> { + let pkt_q = av_q2d((*pkt).time_base); // time of this packet in seconds - let pkt_time = (*pkt).pts as f32 * av_q2d((*pkt).time_base) as f32; + let pkt_time = (*pkt).pts as f32 * pkt_q as f32; // what segment this pkt should be in (index) let pkt_seg = 1 + (pkt_time / self.segment_length).floor() as u64; let mut result = None; let can_split = (*pkt).flags & AV_PKT_FLAG_KEY == AV_PKT_FLAG_KEY; if pkt_seg != self.idx && can_split { - result = Some(self.split_next_seg()?); + result = Some(self.split_next_seg(pkt_time)?); } self.mux.write_packet(pkt)?; Ok(result) } - unsafe fn split_next_seg(&mut self) -> Result { + unsafe fn split_next_seg(&mut self, pkt_time: f32) -> Result { self.idx += 1; // Manually reset muxer avio @@ -204,9 +217,8 @@ impl HlsVariant { 0, ); - // TODO: calc actual duration - let duration = 2.0; - info!("Writing segment {}", &next_seg_url); + let duration = pkt_time - self.pkt_start; + info!("Writing segment {} [{}s]", &next_seg_url, duration); if let Err(e) = self.add_segment(self.idx, duration) { warn!("Failed to update playlist: {}", e); } @@ -214,20 +226,24 @@ impl HlsVariant { /// Get the video variant for this group /// since this could actually be audio which would not be useful for /// [Overseer] impl - let video_var = self - .streams - .iter() - .find(|a| matches!(*a, HlsVariantStream::Video { .. })) - .map_or(Default::default(), |v| v.id().clone()); + let video_var = self.video_stream().unwrap_or(self.streams.first().unwrap()); // emit result of the previously completed segment, let prev_seg = self.idx - 1; - Ok(NewSegment { - variant: video_var, + let ret = NewSegment { + variant: *video_var.id(), idx: prev_seg, duration, path: PathBuf::from(Self::map_segment_path(&*self.out_dir, &self.name, prev_seg)), - }) + }; + self.pkt_start = pkt_time; + Ok(ret) + } + + fn video_stream(&self) -> Option<&HlsVariantStream> { + self.streams + .iter() + .find(|a| matches!(*a, HlsVariantStream::Video { .. })) } fn add_segment(&mut self, idx: u64, duration: f32) -> Result<()> { @@ -258,34 +274,104 @@ impl HlsVariant { pl.write_to(&mut f_out)?; Ok(()) } + + /// https://git.ffmpeg.org/gitweb/ffmpeg.git/blob/HEAD:/libavformat/hlsenc.c#l351 + unsafe fn to_codec_attr(&self, stream: *mut AVStream) -> Option { + let p = (*stream).codecpar; + if (*p).codec_id == AV_CODEC_ID_H264 { + let data = (*p).extradata; + if !data.is_null() { + let mut id_ptr = ptr::null_mut(); + let ds: *mut u16 = data as *mut u16; + if (*ds) == 1 && (*data.add(4)) & 0x1F == 7 { + id_ptr = data.add(5); + } else if (*ds) == 1 && (*data.add(3)) & 0x1F == 7 { + id_ptr = data.add(4); + } else if *data.add(0) == 1 { + id_ptr = data.add(1); + } else { + return None; + } + + return Some(format!( + "avc1.{}", + hex::encode([*id_ptr.add(0), *id_ptr.add(1), *id_ptr.add(2)]) + )); + } + } + None + } + + pub fn to_playlist_variant(&self) -> m3u8_rs::VariantStream { + unsafe { + let pes = self.video_stream().unwrap_or(self.streams.first().unwrap()); + let av_stream = *(*self.mux.context()).streams.add(*pes.index()); + let codec_par = (*av_stream).codecpar; + m3u8_rs::VariantStream { + is_i_frame: false, + uri: format!("{}/live.m3u8", self.name), + bandwidth: 0, + average_bandwidth: Some((*codec_par).bit_rate as u64), + codecs: self.to_codec_attr(av_stream), + resolution: Some(m3u8_rs::Resolution { + width: (*codec_par).width as _, + height: (*codec_par).height as _, + }), + frame_rate: Some(av_q2d((*codec_par).framerate)), + hdcp_level: None, + audio: None, + video: None, + subtitles: None, + closed_captions: None, + other_attributes: None, + } + } + } } pub struct HlsMuxer { + out_dir: PathBuf, variants: Vec, } impl HlsMuxer { pub fn new<'a>( + id: &Uuid, out_dir: &str, segment_length: f32, encoders: impl Iterator, ) -> Result { - let id = Uuid::new_v4(); - let base = PathBuf::from(out_dir) - .join(id.to_string()) - .to_string_lossy() - .to_string(); + let base = PathBuf::from(out_dir).join(id.to_string()); let mut vars = Vec::new(); for (k, group) in &encoders .sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id())) .chunk_by(|a| a.0.group_id()) { - let var = HlsVariant::new(&base, segment_length, k, group)?; + let var = HlsVariant::new(base.to_str().unwrap(), segment_length, k, group)?; vars.push(var); } - Ok(Self { variants: vars }) + let ret = Self { + out_dir: base, + variants: vars, + }; + ret.write_master_playlist()?; + Ok(ret) + } + + fn write_master_playlist(&self) -> Result<()> { + let mut pl = m3u8_rs::MasterPlaylist::default(); + pl.version = Some(3); + pl.variants = self + .variants + .iter() + .map(|v| v.to_playlist_variant()) + .collect(); + + let mut f_out = File::create(self.out_dir.join("live.m3u8"))?; + pl.write_to(&mut f_out)?; + Ok(()) } /// Mux an encoded packet from [Encoder] @@ -295,7 +381,9 @@ impl HlsMuxer { variant: &Uuid, ) -> Result> { for var in self.variants.iter_mut() { - if var.streams.iter().any(|s| s.id() == variant) { + if let Some(vs) = var.streams.iter().find(|s| s.id() == variant) { + // very important for muxer to know which stream this pkt belongs to + (*pkt).stream_index = *vs.index() as _; return var.mux_packet(pkt); } } diff --git a/src/overseer/mod.rs b/src/overseer/mod.rs index 3048514..e3b7674 100644 --- a/src/overseer/mod.rs +++ b/src/overseer/mod.rs @@ -71,6 +71,16 @@ pub trait Overseer: Send + Sync { duration: f32, path: &PathBuf, ) -> Result<()>; + + /// At a regular interval, pipeline will emit one of the frames for processing as a + /// thumbnail + async fn on_thumbnail( + &self, + pipeline_id: &Uuid, + width: usize, + height: usize, + path: &PathBuf, + ) -> Result<()>; } impl Settings { @@ -86,13 +96,23 @@ impl Settings { database, lnd, relays, + blossom, } => { #[cfg(not(feature = "zap-stream"))] panic!("zap.stream overseer is not enabled"); #[cfg(feature = "zap-stream")] Ok(Arc::new( - ZapStreamOverseer::new(private_key, database, lnd, relays).await?, + ZapStreamOverseer::new( + &self.output_dir, + &self.public_url, + private_key, + database, + lnd, + relays, + blossom, + ) + .await?, )) } } @@ -123,7 +143,7 @@ pub(crate) fn get_default_variants(info: &IngressInfo) -> Result Result Result<()> { todo!() } + + async fn on_thumbnail( + &self, + pipeline_id: &Uuid, + width: usize, + height: usize, + path: &PathBuf, + ) -> Result<()> { + todo!() + } } diff --git a/src/overseer/webhook.rs b/src/overseer/webhook.rs index dfe58f8..0956ce1 100644 --- a/src/overseer/webhook.rs +++ b/src/overseer/webhook.rs @@ -39,4 +39,14 @@ impl Overseer for WebhookOverseer { ) -> Result<()> { todo!() } + + async fn on_thumbnail( + &self, + pipeline_id: &Uuid, + width: usize, + height: usize, + path: &PathBuf, + ) -> Result<()> { + todo!() + } } diff --git a/src/overseer/zap_stream.rs b/src/overseer/zap_stream.rs index 12d33a8..58b3f76 100644 --- a/src/overseer/zap_stream.rs +++ b/src/overseer/zap_stream.rs @@ -10,33 +10,52 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use chrono::Utc; use fedimint_tonic_lnd::verrpc::VersionRequest; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_MJPEG; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVFrame; +use ffmpeg_rs_raw::Encoder; use futures_util::FutureExt; use log::info; use nostr_sdk::bitcoin::PrivateKey; -use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag}; +use nostr_sdk::prelude::Coordinate; +use nostr_sdk::{Client, Event, EventBuilder, JsonUtil, Keys, Kind, Tag, ToBech32}; use std::env::temp_dir; use std::fs::create_dir_all; use std::path::PathBuf; use std::str::FromStr; +use url::Url; use uuid::Uuid; +use zap_stream_db::sqlx::Encode; use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb}; -const STREAM_EVENT_KIND: u16 = 30_313; +const STREAM_EVENT_KIND: u16 = 30_311; /// zap.stream NIP-53 overseer pub struct ZapStreamOverseer { + /// Dir where HTTP server serves files from + out_dir: String, + /// Database instance for accounts/streams db: ZapStreamDb, + /// LND node connection lnd: fedimint_tonic_lnd::Client, + /// Nostr client for publishing events client: Client, + /// Nostr keys used to sign events keys: Keys, + /// List of blossom servers to upload segments to + blossom_servers: Vec, + /// Public facing URL pointing to [out_dir] + public_url: String, } impl ZapStreamOverseer { pub async fn new( + out_dir: &String, + public_url: &String, private_key: &str, db: &str, lnd: &LndSettings, relays: &Vec, + blossom_servers: &Option>, ) -> Result { let db = ZapStreamDb::new(db).await?; db.migrate().await?; @@ -62,10 +81,18 @@ impl ZapStreamOverseer { client.connect().await; Ok(Self { + out_dir: out_dir.clone(), db, lnd, client, keys, + blossom_servers: blossom_servers + .as_ref() + .unwrap_or(&Vec::new()) + .into_iter() + .map(|b| Blossom::new(b)) + .collect(), + public_url: public_url.clone(), }) } } @@ -83,15 +110,11 @@ impl Overseer for ZapStreamOverseer { .await? .ok_or_else(|| anyhow::anyhow!("User not found"))?; - let out_dir = temp_dir().join("zap-stream"); - create_dir_all(&out_dir)?; - let variants = get_default_variants(&stream_info)?; let mut egress = vec![]; egress.push(EgressType::HLS(EgressConfig { - name: "nip94-hls".to_string(), - out_dir: out_dir.to_string_lossy().to_string(), + name: "hls".to_string(), variants: variants.iter().map(|v| v.id()).collect(), })); @@ -103,7 +126,8 @@ impl Overseer for ZapStreamOverseer { state: UserStreamState::Live, ..Default::default() }; - let stream_event = publish_stream_event(&new_stream, &self.client).await?; + let stream_event = + publish_stream_event(&new_stream, &self.client, &self.keys, &self.public_url).await?; new_stream.event = Some(stream_event.as_json()); self.db.insert_stream(&new_stream).await?; @@ -122,27 +146,43 @@ impl Overseer for ZapStreamOverseer { duration: f32, path: &PathBuf, ) -> Result<()> { - let blossom = Blossom::new("http://localhost:8881/"); - let blob = blossom.upload(path, &self.keys).await?; + // Upload to blossom servers if configured + let mut blobs = vec![]; + for b in &self.blossom_servers { + blobs.push(b.upload(path, &self.keys, Some("video/mp2t")).await?); + } + if let Some(blob) = blobs.first() { + let a_tag = format!( + "{}:{}:{}", + STREAM_EVENT_KIND, + self.keys.public_key.to_hex(), + pipeline_id + ); + let mut n94 = blob_to_event_builder(blob)?.add_tags(Tag::parse(&["a", &a_tag])); + for b in blobs.iter().skip(1) { + n94 = n94.add_tags(Tag::parse(&["url", &b.url])); + } + let n94 = n94.sign_with_keys(&self.keys)?; + self.client.send_event(n94).await?; + info!("Published N94 segment for {}", a_tag); + } - let a_tag = format!( - "{}:{}:{}", - pipeline_id, - self.keys.public_key.to_hex(), - STREAM_EVENT_KIND - ); - // publish nip94 tagged to stream - let n96 = blob_to_event_builder(&blob)? - .add_tags(Tag::parse(&["a", &a_tag])) - .sign_with_keys(&self.keys)?; - self.client.send_event(n96).await?; - info!("Published N96 segment for {}", a_tag); + Ok(()) + } + async fn on_thumbnail( + &self, + pipeline_id: &Uuid, + width: usize, + height: usize, + pixels: &PathBuf, + ) -> Result<()> { + // nothing to do Ok(()) } } -fn stream_to_event_builder(this: &UserStream) -> Result { +fn stream_to_event_builder(this: &UserStream, keys: &Keys) -> Result { let mut tags = vec![ Tag::parse(&["d".to_string(), this.id.to_string()])?, Tag::parse(&["status".to_string(), this.state.to_string()])?, @@ -183,11 +223,39 @@ fn stream_to_event_builder(this: &UserStream) -> Result { tags.push(Tag::parse(&["t".to_string(), tag.to_string()])?); } } - Ok(EventBuilder::new(Kind::from(STREAM_EVENT_KIND), "", tags)) + + let kind = Kind::from(STREAM_EVENT_KIND); + let coord = Coordinate::new(kind, keys.public_key).identifier(this.id); + tags.push(Tag::parse(&[ + "alt", + &format!("Watch live on https://zap.stream/{}", coord.to_bech32()?), + ])?); + Ok(EventBuilder::new(kind, "", tags)) } -async fn publish_stream_event(this: &UserStream, client: &Client) -> Result { - let ev = stream_to_event_builder(this)? +fn stream_url_mapping(this: &UserStream, public_url: &str) -> Result { + let u: Url = public_url.parse()?; + // hls muxer always writes the master playlist like this + Ok(u.join(&format!("/{}/live.m3u8", this.id))?.to_string()) +} + +fn image_url_mapping(this: &UserStream, public_url: &str) -> Result { + let u: Url = public_url.parse()?; + // pipeline always writes a thumbnail like this + Ok(u.join(&format!("/{}/thumb.webp", this.id))?.to_string()) +} + +async fn publish_stream_event( + this: &UserStream, + client: &Client, + keys: &Keys, + public_url: &str, +) -> Result { + let ev = stream_to_event_builder(this, keys)? + .add_tags([ + Tag::parse(&["streaming", stream_url_mapping(this, public_url)?.as_str()])?, + Tag::parse(&["image", image_url_mapping(this, public_url)?.as_str()])?, + ]) .sign(&client.signer().await?) .await?; client.send_event(ev.clone()).await?; diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 7f822b3..fde15c4 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -31,15 +31,11 @@ impl EgressType { impl Display for EgressType { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}", - match self { - EgressType::HLS(c) => format!("{}", c), - EgressType::Recorder(c) => format!("{}", c), - EgressType::RTMPForwarder(c) => format!("{}", c), - } - ) + match self { + EgressType::HLS(_) => write!(f, "HLS"), + EgressType::Recorder(_) => write!(f, "Recorder"), + EgressType::RTMPForwarder(_) => write!(f, "RTMPForwarder"), + } } } diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs index 1df4357..a53c264 100644 --- a/src/pipeline/runner.rs +++ b/src/pipeline/runner.rs @@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet}; use std::io::Read; use std::mem::transmute; use std::ops::Sub; +use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; @@ -13,11 +14,15 @@ use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer}; use crate::pipeline::{EgressType, PipelineConfig}; use crate::variant::{StreamMapping, VariantStream}; use anyhow::{bail, Result}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_WEBP; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ - av_frame_free, av_get_sample_fmt, av_packet_free, av_rescale_q, + av_frame_free, av_get_sample_fmt, av_packet_free, av_q2d, av_rescale_q, AVMediaType, }; use ffmpeg_rs_raw::{ - cstr, get_frame_from_hw, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, StreamType, + cstr, get_frame_from_hw, AudioFifo, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, + StreamType, }; use itertools::Itertools; use log::{error, info, warn}; @@ -47,8 +52,8 @@ pub struct PipelineRunner { /// Scaler for a variant (variant_id, Scaler) scalers: HashMap, - /// Resampler for a variant (variant_id, Resample) - resampler: HashMap, + /// Resampler for a variant (variant_id, Resample+FIFO) + resampler: HashMap, /// Encoder for a variant (variant_id, Encoder) encoders: HashMap, @@ -59,25 +64,28 @@ pub struct PipelineRunner { /// All configured egress' egress: Vec>, - fps_counter_start: Instant, - frame_ctr: u64, - /// Info about the input stream info: Option, /// Overseer managing this pipeline overseer: Arc, + + fps_counter_start: Instant, + frame_ctr: u64, + out_dir: String, } impl PipelineRunner { pub fn new( handle: Handle, + out_dir: String, overseer: Arc, connection: ConnectionInfo, recv: Box, ) -> Result { Ok(Self { handle, + out_dir, overseer, connection, config: Default::default(), @@ -118,12 +126,39 @@ impl PipelineRunner { let mut egress_results = vec![]; for frame in frames { - self.frame_ctr += 1; - // Copy frame from GPU if using hwaccel decoding let mut frame = get_frame_from_hw(frame)?; (*frame).time_base = (*stream).time_base; + let p = (*stream).codecpar; + if (*p).codec_type == AVMediaType::AVMEDIA_TYPE_VIDEO { + let pts_sec = ((*frame).pts as f64 * av_q2d((*stream).time_base)).floor() as u64; + // write thumbnail every 1min + if pts_sec % 60 == 0 && pts_sec != 0 { + let dst_pic = PathBuf::from(&self.out_dir) + .join(config.id.to_string()) + .join("thumb.webp"); + let mut sw = Scaler::new(); + let mut frame = sw.process_frame( + frame, + (*frame).width as _, + (*frame).height as _, + AV_PIX_FMT_YUV420P, + )?; + Encoder::new(AV_CODEC_ID_WEBP)? + .with_height((*frame).height) + .with_width((*frame).width) + .with_pix_fmt(transmute((*frame).format)) + .open(None)? + .save_picture(frame, dst_pic.to_str().unwrap())?; + info!("Saved thumb to: {}", dst_pic.display()); + av_frame_free(&mut frame); + } + + // TODO: fix this, multiple video streams in + self.frame_ctr += 1; + } + // Get the variants which want this pkt let pkt_vars = config .variants @@ -148,11 +183,21 @@ impl PipelineRunner { } } VariantStream::Audio(a) => { - if let Some(r) = self.resampler.get_mut(&a.id()) { + if let Some((r, f)) = self.resampler.get_mut(&a.id()) { let frame_size = (*enc.codec_context()).frame_size; - // TODO: resample audio fifo new_frame = true; - r.process_frame(frame, frame_size)? + let mut resampled_frame = r.process_frame(frame, frame_size)?; + if let Some(ret) = + f.buffer_frame(resampled_frame, frame_size as usize)? + { + av_frame_free(&mut resampled_frame); + // assume timebase of the encoder + //(*ret).time_base = (*enc.codec_context()).time_base; + ret + } else { + av_frame_free(&mut resampled_frame); + continue; + } } else { frame } @@ -163,6 +208,7 @@ impl PipelineRunner { // before encoding frame, rescale timestamps if !frame.is_null() { let enc_ctx = enc.codec_context(); + (*frame).pict_type = AV_PICTURE_TYPE_NONE; (*frame).pts = av_rescale_q((*frame).pts, (*frame).time_base, (*enc_ctx).time_base); (*frame).pkt_dts = @@ -288,12 +334,10 @@ impl PipelineRunner { } VariantStream::Audio(a) => { let enc = a.try_into()?; - let rs = Resample::new( - av_get_sample_fmt(cstr!(a.sample_fmt.as_str())), - a.sample_rate as _, - a.channels as _, - ); - self.resampler.insert(out_stream.id(), rs); + let fmt = av_get_sample_fmt(cstr!(a.sample_fmt.as_str())); + let rs = Resample::new(fmt, a.sample_rate as _, a.channels as _); + let f = AudioFifo::new(fmt, a.channels as _)?; + self.resampler.insert(out_stream.id(), (rs, f)); self.encoders.insert(out_stream.id(), enc); } _ => continue, @@ -304,27 +348,22 @@ impl PipelineRunner { // Setup egress for e in &cfg.egress { + let c = e.config(); + let encoders = self.encoders.iter().filter_map(|(k, v)| { + if c.variants.contains(k) { + let var = cfg.variants.iter().find(|x| x.id() == *k)?; + Some((var, v)) + } else { + None + } + }); match e { - EgressType::HLS(ref c) => { - let encoders = self.encoders.iter().filter_map(|(k, v)| { - if c.variants.contains(k) { - let var = cfg.variants.iter().find(|x| x.id() == *k)?; - Some((var, v)) - } else { - None - } - }); - - let hls = HlsEgress::new(&c.out_dir, 2.0, encoders)?; + EgressType::HLS(_) => { + let hls = HlsEgress::new(&cfg.id, &self.out_dir, 2.0, encoders)?; self.egress.push(Box::new(hls)); } - EgressType::Recorder(ref c) => { - let encoders = self - .encoders - .iter() - .filter(|(k, _v)| c.variants.contains(k)) - .map(|(_, v)| v); - let rec = RecorderEgress::new(c.clone(), encoders)?; + EgressType::Recorder(_) => { + let rec = RecorderEgress::new(&cfg.id, &self.out_dir, encoders)?; self.egress.push(Box::new(rec)); } _ => warn!("{} is not implemented", e), diff --git a/src/settings.rs b/src/settings.rs index e9ea14d..e1d9173 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -12,6 +12,12 @@ pub struct Settings { /// Where to store output (static files) pub output_dir: String, + /// Public facing URL that maps to [output_dir] + pub public_url: String, + + /// Binding address for http server serving files from [output_dir] + pub listen_http: String, + /// Overseer service see [crate::overseer::Overseer] for more info pub overseer: OverseerConfig, } @@ -31,10 +37,16 @@ pub enum OverseerConfig { }, /// NIP-53 service (i.e. zap.stream backend) ZapStream { + /// MYSQL database connection string database: String, + /// LND node connection details lnd: LndSettings, + /// Relays to publish events to relays: Vec, + /// Nsec to sign nostr events nsec: String, + /// Blossom servers + blossom: Option>, }, } diff --git a/src/variant/audio.rs b/src/variant/audio.rs index a8c0845..fda6c58 100644 --- a/src/variant/audio.rs +++ b/src/variant/audio.rs @@ -1,8 +1,7 @@ -use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_get_sample_fmt, avcodec_get_name}; -use ffmpeg_rs_raw::{cstr, rstr, Encoder}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::av_get_sample_fmt; +use ffmpeg_rs_raw::{cstr, Encoder}; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; -use std::intrinsics::transmute; use uuid::Uuid; use crate::variant::{StreamMapping, VariantMapping}; @@ -16,8 +15,8 @@ pub struct AudioVariant { /// Bitrate of this stream pub bitrate: u64, - /// AVCodecID - pub codec: usize, + /// Codec name + pub codec: String, /// Number of channels pub channels: u16, @@ -36,7 +35,7 @@ impl Display for AudioVariant { "Audio #{}->{}: {}, {}kbps", self.mapping.src_index, self.mapping.dst_index, - unsafe { rstr!(avcodec_get_name(transmute(self.codec as i32))) }, + self.codec, self.bitrate / 1000 ) } @@ -67,8 +66,8 @@ impl TryInto for &AudioVariant { fn try_into(self) -> Result { unsafe { - let enc = Encoder::new(transmute(self.codec as u32))? - .with_sample_rate(self.sample_rate as _) + let enc = Encoder::new_with_name(&self.codec)? + .with_sample_rate(self.sample_rate as _)? .with_bitrate(self.bitrate as _) .with_default_channel_layout(self.channels as _) .with_sample_format(av_get_sample_fmt(cstr!(self.sample_fmt.as_bytes()))) diff --git a/src/variant/video.rs b/src/variant/video.rs index ab0400b..9052ce7 100644 --- a/src/variant/video.rs +++ b/src/variant/video.rs @@ -1,7 +1,5 @@ -use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVColorSpace::AVCOL_SPC_BT709; -use ffmpeg_rs_raw::ffmpeg_sys_the_third::{avcodec_get_name, AVCodecID}; -use ffmpeg_rs_raw::{rstr, Encoder}; +use ffmpeg_rs_raw::Encoder; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt::{Display, Formatter}; @@ -28,8 +26,8 @@ pub struct VideoVariant { /// Bitrate of this stream pub bitrate: u64, - /// AVCodecID - pub codec: usize, + /// Codec name + pub codec: String, /// Codec profile pub profile: usize, @@ -51,7 +49,7 @@ impl Display for VideoVariant { "Video #{}->{}: {}, {}x{}, {}fps, {}kbps", self.mapping.src_index, self.mapping.dst_index, - unsafe { rstr!(avcodec_get_name(transmute(self.codec as i32))) }, + self.codec, self.width, self.height, self.fps, @@ -87,18 +85,18 @@ impl TryInto for &VideoVariant { fn try_into(self) -> Result { unsafe { let mut opt = HashMap::new(); - if self.codec == transmute::(AV_CODEC_ID_H264) as usize { + if self.codec == "x264" { opt.insert("preset".to_string(), "fast".to_string()); //opt.insert("tune".to_string(), "zerolatency".to_string()); } - let enc = Encoder::new(transmute(self.codec as u32))? + let enc = Encoder::new_with_name(&self.codec)? .with_bitrate(self.bitrate as _) .with_width(self.width as _) .with_height(self.height as _) .with_pix_fmt(transmute(self.pixel_format)) .with_profile(transmute(self.profile as i32)) .with_level(transmute(self.level as i32)) - .with_framerate(self.fps) + .with_framerate(self.fps)? .with_options(|ctx| { (*ctx).gop_size = self.keyframe_interval as _; (*ctx).keyint_min = self.keyframe_interval as _; diff --git a/zap-stream-db/src/db.rs b/zap-stream-db/src/db.rs index dc77c26..65cc886 100644 --- a/zap-stream-db/src/db.rs +++ b/zap-stream-db/src/db.rs @@ -1,6 +1,7 @@ use crate::UserStream; use anyhow::Result; use sqlx::{MySqlPool, Row}; +use uuid::Uuid; pub struct ZapStreamDb { db: MySqlPool, @@ -20,7 +21,7 @@ impl ZapStreamDb { /// Find user by stream key, typical first lookup from ingress pub async fn find_user_stream_key(&self, key: &str) -> Result> { #[cfg(feature = "test-pattern")] - if key == "test-pattern" { + if key == "test" { // use the 00 pubkey for test sources return Ok(Some(self.upsert_user(&[0; 32]).await?)); } @@ -83,4 +84,12 @@ impl ZapStreamDb { .map_err(anyhow::Error::new)?; Ok(()) } + + pub async fn get_stream(&self, id: &Uuid) -> Result { + Ok(sqlx::query_as("select * from user_stream where id = ?") + .bind(id) + .fetch_one(&self.db) + .await + .map_err(anyhow::Error::new)?) + } }