diff --git a/Cargo.lock b/Cargo.lock index b2cb1bc..3e3502e 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,6 +140,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.5.0" @@ -277,6 +283,12 @@ dependencies = [ "cipher", ] +[[package]] +name = "data-encoding" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" + [[package]] name = "derive_more" version = "0.99.17" @@ -316,6 +328,15 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_logger" version = "0.10.2" @@ -474,6 +495,25 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "h2" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fbd2820c5e49886948654ab546d0688ff24530286bdcf8fca3cefb16d4618eb" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.13.2" @@ -486,6 +526,30 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + [[package]] name = "hermit-abi" version = "0.3.9" @@ -509,9 +573,9 @@ dependencies = [ [[package]] name = "http" -version = "1.1.0" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" dependencies = [ "bytes", "fnv", @@ -520,14 +584,27 @@ dependencies = [ [[package]] name = "http-body" -version = "1.0.0" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", "http", + "pin-project-lite", ] +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "humantime" version = "2.1.0" @@ -536,14 +613,26 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "1.2.0" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", "http", "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", "tokio", + "tower-service", + "tracing", + "want", ] [[package]] @@ -667,6 +756,22 @@ version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -693,6 +798,24 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http", + "httparse", + "log", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "nom" version = "7.1.3" @@ -819,6 +942,26 @@ dependencies = [ "sha2", ] +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -979,12 +1122,27 @@ dependencies = [ "semver", ] +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64", +] + [[package]] name = "ryu" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "semver" version = "1.0.22" @@ -1031,6 +1189,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha-1" version = "0.10.1" @@ -1042,6 +1212,17 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.8" @@ -1078,6 +1259,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "srt-protocol" version = "0.4.3" @@ -1132,7 +1319,6 @@ dependencies = [ "config", "ffmpeg-sys-next", "futures-util", - "hyper", "itertools", "libc", "log", @@ -1144,6 +1330,7 @@ dependencies = [ "tokio-stream", "url", "uuid", + "warp", ] [[package]] @@ -1282,6 +1469,18 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -1293,6 +1492,7 @@ dependencies = [ "futures-sink", "pin-project-lite", "tokio", + "tracing", ] [[package]] @@ -1329,6 +1529,57 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "log", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", +] + +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -1341,6 +1592,15 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -1379,6 +1639,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "uuid" version = "1.8.0" @@ -1401,6 +1667,46 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + +[[package]] +name = "warp" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e92e22e03ff1230c03a1a8ee37d2f89cd489e2e541b7550d6afad96faed169" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http", + "hyper", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + "pin-project", + "rustls-pemfile", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-stream", + "tokio-tungstenite", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 82525db..a4781e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,4 +21,4 @@ serde = { version = "1.0.197", features = ["derive"] } config = { version = "0.14.0", features = ["toml"] } url = "2.5.0" itertools = "0.12.1" -hyper = "1.2.0" +warp = "0.3.6" diff --git a/src/decode/mod.rs b/src/decode/mod.rs index eeb1d15..4577a96 100644 --- a/src/decode/mod.rs +++ b/src/decode/mod.rs @@ -7,6 +7,7 @@ use ffmpeg_sys_next::{ avcodec_find_decoder, avcodec_free_context, avcodec_open2, avcodec_parameters_to_context, avcodec_receive_frame, avcodec_send_packet, AVCodecContext, AVERROR, AVERROR_EOF, AVPacket, AVStream, }; +use ffmpeg_sys_next::AVPictureType::AV_PICTURE_TYPE_NONE; use tokio::sync::broadcast; use tokio::sync::mpsc::UnboundedReceiver; @@ -99,7 +100,9 @@ impl Decoder { } return Err(Error::msg(format!("Failed to decode {}", ret))); } - (*frame).time_base = (*stream).time_base; + // reset picture type, not to confuse the encoder + (*frame).pict_type = AV_PICTURE_TYPE_NONE; + (*frame).opaque = stream as *mut libc::c_void; self.chan_out.send(PipelinePayload::AvFrame( "Decoder frame".to_owned(), frame, diff --git a/src/egress/hls.rs b/src/egress/hls.rs index e69fc28..f90efbf 100644 --- a/src/egress/hls.rs +++ b/src/egress/hls.rs @@ -4,7 +4,7 @@ use std::mem::transmute; use std::ptr; use anyhow::Error; -use ffmpeg_sys_next::{AV_CH_LAYOUT_STEREO, av_dump_format, av_get_sample_fmt, av_interleaved_write_frame, av_opt_set, AVChannelLayout, AVChannelLayout__bindgen_ty_1, avcodec_parameters_from_context, AVCodecContext, avformat_alloc_output_context2, avformat_free_context, avformat_new_stream, avformat_write_header, AVFormatContext, AVPacket, AVRational}; +use ffmpeg_sys_next::{AV_CH_LAYOUT_STEREO, av_dump_format, av_get_sample_fmt, av_interleaved_write_frame, av_opt_set, AVChannelLayout, AVChannelLayout__bindgen_ty_1, avcodec_find_encoder, avcodec_parameters_from_context, AVCodecContext, avformat_alloc_output_context2, avformat_free_context, avformat_new_stream, avformat_write_header, AVFormatContext, AVPacket, AVRational}; use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE; use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709; use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO}; @@ -145,7 +145,6 @@ impl HlsEgress { ); for var in &mut self.config.variants { - let tb = var.time_base(); match var { VariantStream::Video(vs) => { let stream = avformat_new_stream(ctx, ptr::null()); @@ -155,22 +154,8 @@ impl HlsEgress { // overwrite dst_index to match output stream vs.dst_index = (*stream).index as usize; - (*stream).time_base = tb; - - let params = (*stream).codecpar; - (*params).height = vs.height as libc::c_int; - (*params).width = vs.width as libc::c_int; - (*params).codec_id = transmute(vs.codec as i32); - (*params).codec_type = AVMEDIA_TYPE_VIDEO; - (*params).format = AV_PIX_FMT_YUV420P as i32; - (*params).framerate = AVRational { - num: 1, - den: vs.fps as libc::c_int, - }; - (*params).bit_rate = vs.bitrate as i64; - (*params).color_space = AVCOL_SPC_BT709; - (*params).level = vs.level as libc::c_int; - (*params).profile = vs.profile as libc::c_int; + vs.to_stream(stream); + vs.to_codec_params((*stream).codecpar); } VariantStream::Audio(va) => { let stream = avformat_new_stream(ctx, ptr::null()); @@ -180,25 +165,8 @@ impl HlsEgress { // overwrite dst_index to match output stream va.dst_index = (*stream).index as usize; - (*stream).time_base = tb; - - let params = (*stream).codecpar; - - (*params).codec_id = transmute(va.codec as i32); - (*params).codec_type = AVMEDIA_TYPE_AUDIO; - (*params).format = av_get_sample_fmt( - format!("{}\0", va.sample_fmt).as_ptr() as *const libc::c_char - ) as libc::c_int; - (*params).bit_rate = va.bitrate as i64; - (*params).sample_rate = va.sample_rate as libc::c_int; - (*params).ch_layout = AVChannelLayout { - order: AV_CHANNEL_ORDER_NATIVE, - nb_channels: 2, - u: AVChannelLayout__bindgen_ty_1 { - mask: AV_CH_LAYOUT_STEREO, - }, - opaque: ptr::null_mut(), - }; + va.to_stream(stream); + va.to_codec_params((*stream).codecpar); } } } diff --git a/src/egress/http.rs b/src/egress/http.rs new file mode 100644 index 0000000..a1e2f19 --- /dev/null +++ b/src/egress/http.rs @@ -0,0 +1,18 @@ +use std::net::SocketAddr; + +use anyhow::Error; +use warp::{cors, Filter}; + +use crate::settings::Settings; + +pub async fn listen_out_dir(addr: String, settings: Settings) -> Result<(), Error> { + let addr: SocketAddr = addr.parse()?; + let cors = cors().allow_any_origin().allow_methods(vec!["GET"]); + + let warp_out = warp::get() + .and(warp::fs::dir(settings.output_dir.clone())) + .with(cors); + + warp::serve(warp_out).run(addr).await; + Ok(()) +} diff --git a/src/egress/mod.rs b/src/egress/mod.rs index 02d457b..52d31a2 100644 --- a/src/egress/mod.rs +++ b/src/egress/mod.rs @@ -1 +1,2 @@ -pub mod hls; \ No newline at end of file +pub mod hls; +pub mod http; \ No newline at end of file diff --git a/src/encode/audio.rs b/src/encode/audio.rs index 9d8ce7d..66783a8 100644 --- a/src/encode/audio.rs +++ b/src/encode/audio.rs @@ -5,18 +5,16 @@ use anyhow::Error; use ffmpeg_sys_next::{ av_audio_fifo_alloc, av_audio_fifo_free, av_audio_fifo_read, av_audio_fifo_realloc, av_audio_fifo_size, av_audio_fifo_write, av_buffer_ref, av_buffer_unref, - AV_CH_LAYOUT_STEREO, av_channel_layout_copy, av_frame_alloc, av_frame_free, av_frame_get_buffer, - av_freep, av_get_sample_fmt, av_packet_alloc, av_packet_free, - av_packet_rescale_ts, av_samples_alloc_array_and_samples, AVAudioFifo, - AVBufferRef, AVChannelLayout, AVChannelLayout__bindgen_ty_1, AVCodec, - avcodec_alloc_context3, avcodec_find_encoder, avcodec_free_context, avcodec_open2, avcodec_receive_packet, avcodec_send_frame, - AVCodecContext, AVERROR, AVFrame, swr_alloc_set_opts2, swr_convert, swr_free, - swr_init, SwrContext, + av_channel_layout_copy, av_frame_alloc, av_frame_free, av_frame_get_buffer, av_freep, + av_packet_alloc, av_packet_free, av_samples_alloc_array_and_samples, AVAudioFifo, + AVBufferRef, AVCodec, avcodec_alloc_context3, avcodec_free_context, + avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, swr_alloc_set_opts2, + swr_convert, swr_free, swr_init, SwrContext, }; -use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE; use libc::EAGAIN; use tokio::sync::mpsc::UnboundedSender; +use crate::encode::set_encoded_pkt_timing; use crate::ipc::Rx; use crate::pipeline::{PipelinePayload, PipelineProcessor}; use crate::utils::{audio_variant_id_ref, get_ffmpeg_error_msg, id_ref_to_uuid}; @@ -49,8 +47,8 @@ impl Drop for AudioEncoder { } impl AudioEncoder - where - TRecv: Rx, +where + TRecv: Rx, { pub fn new( chan_in: TRecv, @@ -72,8 +70,7 @@ impl AudioEncoder unsafe fn setup_encoder(&mut self, frame: *mut AVFrame) -> Result<(), Error> { if self.ctx.is_null() { - let codec = self.variant.codec; - let encoder = avcodec_find_encoder(transmute(codec as i32)); + let encoder = self.variant.get_codec(); if encoder.is_null() { return Err(Error::msg("Encoder not found")); } @@ -83,20 +80,7 @@ impl AudioEncoder return Err(Error::msg("Failed to allocate encoder context")); } - (*ctx).time_base = self.variant.time_base(); - (*ctx).sample_fmt = av_get_sample_fmt( - format!("{}\0", self.variant.sample_fmt).as_ptr() as *const libc::c_char, - ); - (*ctx).bit_rate = self.variant.bitrate as i64; - (*ctx).sample_rate = self.variant.sample_rate as libc::c_int; - (*ctx).ch_layout = AVChannelLayout { - order: AV_CHANNEL_ORDER_NATIVE, - nb_channels: 2, - u: AVChannelLayout__bindgen_ty_1 { - mask: AV_CH_LAYOUT_STEREO, - }, - opaque: ptr::null_mut(), - }; + self.variant.to_codec_context(ctx); // setup audio FIFO let fifo = av_audio_fifo_alloc((*ctx).sample_fmt, 2, 1); @@ -220,13 +204,17 @@ impl AudioEncoder assert_eq!(var_id, self.variant.id); self.setup_encoder(frame)?; - if !self.process_audio_frame(frame)? { return Ok(()); } // read audio from FIFO - let frame = self.get_fifo_frame()?; + let fifo_frame = self.get_fifo_frame()?; + + // copy pointer to input stream + (*fifo_frame).opaque = (*frame).opaque; + let frame = fifo_frame; + let mut ret = avcodec_send_frame(self.ctx, frame); if ret < 0 && ret != AVERROR(EAGAIN) { return Err(Error::msg(get_ffmpeg_error_msg(ret))); @@ -243,9 +231,7 @@ impl AudioEncoder return Err(Error::msg(get_ffmpeg_error_msg(ret))); } - (*pkt).time_base = (*self.ctx).time_base; - (*pkt).duration = (*frame).duration; - av_packet_rescale_ts(pkt, (*frame).time_base, (*self.ctx).time_base); + set_encoded_pkt_timing(self.ctx, pkt, frame); (*pkt).opaque = self.ctx as *mut libc::c_void; (*pkt).opaque_ref = av_buffer_ref(self.var_id_ref); self.chan_out @@ -257,8 +243,8 @@ impl AudioEncoder } impl PipelineProcessor for AudioEncoder - where - TRecv: Rx, +where + TRecv: Rx, { fn process(&mut self) -> Result<(), Error> { while let Ok(pkg) = self.chan_in.try_recv_next() { diff --git a/src/encode/mod.rs b/src/encode/mod.rs index 071517d..e64440d 100644 --- a/src/encode/mod.rs +++ b/src/encode/mod.rs @@ -1,2 +1,22 @@ +use ffmpeg_sys_next::{av_packet_rescale_ts, AVCodecContext, AVFrame, AVPacket, AVStream}; +use ffmpeg_sys_next::AVMediaType::AVMEDIA_TYPE_VIDEO; + pub mod audio; -pub mod video; \ No newline at end of file +pub mod video; + +/// Set packet details based on decoded frame +pub unsafe fn set_encoded_pkt_timing( + ctx: *mut AVCodecContext, + pkt: *mut AVPacket, + in_frame: *mut AVFrame, +) { + assert!(!(*in_frame).opaque.is_null()); + let in_stream = (*in_frame).opaque as *mut AVStream; + let tb = (*ctx).time_base; + (*pkt).stream_index = (*in_stream).index; + if (*ctx).codec_type == AVMEDIA_TYPE_VIDEO { + (*pkt).duration = tb.den as i64 / tb.num as i64 / (*in_stream).avg_frame_rate.num as i64 + * (*in_stream).avg_frame_rate.den as i64; + } + av_packet_rescale_ts(pkt, (*in_stream).time_base, (*ctx).time_base); +} diff --git a/src/encode/video.rs b/src/encode/video.rs index 4086e02..19b3aa0 100644 --- a/src/encode/video.rs +++ b/src/encode/video.rs @@ -3,14 +3,13 @@ use std::ptr; use anyhow::Error; use ffmpeg_sys_next::{ - av_buffer_ref, av_opt_set, av_packet_alloc, av_packet_free, av_packet_rescale_ts, - AVBufferRef, AVCodec, avcodec_alloc_context3, avcodec_find_encoder, - avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVRational, + av_buffer_ref, av_packet_alloc, av_packet_free, av_packet_rescale_ts, avcodec_alloc_context3, + avcodec_find_encoder, avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVBufferRef, + AVCodec, AVCodecContext, AVFrame, AVStream, AVERROR, }; -use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709; -use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P; use libc::EAGAIN; use tokio::sync::mpsc::UnboundedSender; +use crate::encode::set_encoded_pkt_timing; use crate::ipc::Rx; use crate::pipeline::{PipelinePayload, PipelineProcessor}; @@ -31,8 +30,8 @@ unsafe impl Send for VideoEncoder {} unsafe impl Sync for VideoEncoder {} impl VideoEncoder - where - TRecv: Rx, +where + TRecv: Rx, { pub fn new( chan_in: TRecv, @@ -63,28 +62,7 @@ impl VideoEncoder return Err(Error::msg("Failed to allocate encoder context")); } - (*ctx).time_base = self.variant.time_base(); - (*ctx).bit_rate = self.variant.bitrate as i64; - (*ctx).width = (*frame).width; - (*ctx).height = (*frame).height; - (*ctx).level = self.variant.level as libc::c_int; - (*ctx).profile = self.variant.profile as libc::c_int; - (*ctx).framerate = AVRational { - num: 1, - den: self.variant.fps as libc::c_int, - }; - - let key_frames = self.variant.fps * self.variant.keyframe_interval; - (*ctx).gop_size = key_frames as libc::c_int; - (*ctx).max_b_frames = 1; - (*ctx).pix_fmt = AV_PIX_FMT_YUV420P; - (*ctx).colorspace = AVCOL_SPC_BT709; - av_opt_set( - (*ctx).priv_data, - "preset\0".as_ptr() as *const libc::c_char, - "fast\0".as_ptr() as *const libc::c_char, - 0, - ); + self.variant.to_codec_context(ctx); let ret = avcodec_open2(ctx, encoder, ptr::null_mut()); if ret < 0 { @@ -119,9 +97,7 @@ impl VideoEncoder return Err(Error::msg(get_ffmpeg_error_msg(ret))); } - (*pkt).time_base = (*self.ctx).time_base; - (*pkt).duration = (*frame).duration; - av_packet_rescale_ts(pkt, (*frame).time_base, (*self.ctx).time_base); + set_encoded_pkt_timing(self.ctx, pkt, frame); (*pkt).opaque = self.ctx as *mut libc::c_void; (*pkt).opaque_ref = av_buffer_ref(self.var_id_ref); self.chan_out @@ -133,8 +109,8 @@ impl VideoEncoder } impl PipelineProcessor for VideoEncoder - where - TRecv: Rx, +where + TRecv: Rx, { fn process(&mut self) -> Result<(), Error> { while let Ok(pkg) = self.chan_in.try_recv_next() { diff --git a/src/main.rs b/src/main.rs index f33c799..841d088 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use config::Config; use log::{error, info}; use url::Url; +use crate::egress::http::listen_out_dir; use crate::pipeline::builder::PipelineBuilder; use crate::settings::Settings; use crate::webhook::Webhook; @@ -14,14 +15,14 @@ mod egress; mod encode; mod fraction; mod ingress; +mod ipc; mod pipeline; mod scale; mod settings; +mod tag_frame; mod utils; mod variant; mod webhook; -mod ipc; -mod tag_frame; /// Test: ffmpeg -re -f lavfi -i testsrc -g 2 -r 30 -pix_fmt yuv420p -s 1280x720 -c:v h264 -b:v 2000k -f mpegts srt://localhost:3333 #[tokio::main] @@ -48,7 +49,7 @@ async fn main() -> anyhow::Result<()> { let webhook = Webhook::new(settings.clone()); let builder = PipelineBuilder::new(webhook); let mut listeners = vec![]; - for e in settings.endpoints { + for e in &settings.endpoints { let u: Url = e.parse()?; let addr = format!("{}:{}", u.host_str().unwrap(), u.port().unwrap()); match u.scheme() { @@ -59,6 +60,11 @@ async fn main() -> anyhow::Result<()> { } } } + listeners.push(tokio::spawn(listen_out_dir( + "0.0.0.0:8080".to_owned(), + settings.clone(), + ))); + for handle in listeners { if let Err(e) = handle.await { error!("{e}"); diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs index 189086e..41ea4b2 100644 --- a/src/pipeline/runner.rs +++ b/src/pipeline/runner.rs @@ -2,7 +2,7 @@ use std::ops::Add; use std::time::{Duration, Instant}; use anyhow::Error; -use log::info; +use log::{info, warn}; use tokio::sync::broadcast; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; @@ -59,7 +59,7 @@ impl PipelineRunner { } pub fn run(&mut self) -> Result<(), Error> { - if let Some(info) = &self.stream_info { + /*if let Some(info) = &self.stream_info { if let Some(v_stream) = info .channels .iter() @@ -73,7 +73,7 @@ impl PipelineRunner { std::thread::sleep(poll_sleep); } } - } + }*/ if let Some(cfg) = self.demuxer.process()? { self.configure_pipeline(cfg)?; } diff --git a/src/scale/mod.rs b/src/scale/mod.rs index f8b81c3..fcb5379 100644 --- a/src/scale/mod.rs +++ b/src/scale/mod.rs @@ -57,7 +57,7 @@ impl Scaler { let ctx = sws_getContext( (*frame).width, (*frame).height, - dst_fmt, + transmute((*frame).format), self.variant.width as libc::c_int, self.variant.height as libc::c_int, dst_fmt, @@ -83,6 +83,7 @@ impl Scaler { return Err(Error::msg(get_ffmpeg_error_msg(ret))); } + (*dst_frame).opaque = (*frame).opaque; (*dst_frame).opaque_ref = av_buffer_ref(self.var_id_ref); self.chan_out.send(PipelinePayload::AvFrame( diff --git a/src/tag_frame.rs b/src/tag_frame.rs index 2bec1f7..0da6b68 100644 --- a/src/tag_frame.rs +++ b/src/tag_frame.rs @@ -48,6 +48,7 @@ impl PipelineProcessor for TagFrame if idx == self.variant.src_index() { let new_frame = av_frame_clone(frm); av_frame_copy_props(new_frame, frm); + (*new_frame).opaque = (*frm).opaque; (*new_frame).opaque_ref = av_buffer_ref(self.var_id_ref); self.chan_out .send(PipelinePayload::AvFrame(tag.clone(), new_frame, idx))?; diff --git a/src/variant.rs b/src/variant.rs index 15b05fe..bae01ef 100644 --- a/src/variant.rs +++ b/src/variant.rs @@ -1,8 +1,18 @@ use std::ffi::CStr; use std::fmt::{Display, Formatter}; use std::mem::transmute; +use std::ptr; -use ffmpeg_sys_next::{avcodec_get_name, AVRational}; +use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE; +use ffmpeg_sys_next::AVCodecID::{AV_CODEC_ID_AAC, AV_CODEC_ID_H264}; +use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709; +use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P; +use ffmpeg_sys_next::{ + av_get_sample_fmt, av_opt_set, avcodec_find_encoder, avcodec_find_encoder_by_name, + avcodec_get_name, AVChannelLayout, AVChannelLayout__bindgen_ty_1, AVCodec, AVCodecContext, + AVCodecParameters, AVRational, AVStream, AV_CH_LAYOUT_STEREO, +}; +use ffmpeg_sys_next::AVColorRange::{AVCOL_RANGE_JPEG, AVCOL_RANGE_MPEG}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -162,6 +172,77 @@ impl VideoVariant { den: 90_000, } } + + pub fn get_codec(&self) -> *const AVCodec { + unsafe { avcodec_find_encoder(transmute(self.codec as u32)) } + } + + pub unsafe fn to_codec_context(&self, ctx: *mut AVCodecContext) { + let codec = self.get_codec(); + (*ctx).codec_id = (*codec).id; + (*ctx).codec_type = (*codec).type_; + (*ctx).time_base = self.time_base(); + (*ctx).bit_rate = self.bitrate as i64; + (*ctx).width = self.width as libc::c_int; + (*ctx).height = self.height as libc::c_int; + (*ctx).level = self.level as libc::c_int; + (*ctx).profile = self.profile as libc::c_int; + (*ctx).framerate = AVRational { + num: self.fps as libc::c_int, + den: 1, + }; + + let key_frames = self.fps * self.keyframe_interval; + (*ctx).gop_size = key_frames as libc::c_int; + (*ctx).keyint_min = key_frames as libc::c_int; + (*ctx).max_b_frames = 1; + (*ctx).pix_fmt = AV_PIX_FMT_YUV420P; + (*ctx).colorspace = AVCOL_SPC_BT709; + (*ctx).color_range = AVCOL_RANGE_MPEG; + if (*codec).id == AV_CODEC_ID_H264 { + av_opt_set( + (*ctx).priv_data, + "preset\0".as_ptr() as *const libc::c_char, + "fast\0".as_ptr() as *const libc::c_char, + 0, + ); + av_opt_set( + (*ctx).priv_data, + "tune\0".as_ptr() as *const libc::c_char, + "zerolatency\0".as_ptr() as *const libc::c_char, + 0, + ); + } + } + + pub unsafe fn to_codec_params(&self, params: *mut AVCodecParameters) { + let codec = self.get_codec(); + (*params).codec_id = (*codec).id; + (*params).codec_type = (*codec).type_; + (*params).height = self.height as libc::c_int; + (*params).width = self.width as libc::c_int; + (*params).format = AV_PIX_FMT_YUV420P as i32; + (*params).framerate = AVRational { + num: self.fps as libc::c_int, + den: 1, + }; + (*params).bit_rate = self.bitrate as i64; + (*params).color_space = AVCOL_SPC_BT709; + (*params).level = self.level as libc::c_int; + (*params).profile = self.profile as libc::c_int; + } + + pub unsafe fn to_stream(&self, stream: *mut AVStream) { + (*stream).time_base = self.time_base(); + (*stream).avg_frame_rate = AVRational { + num: self.fps as libc::c_int, + den: 1, + }; + (*stream).r_frame_rate = AVRational { + num: self.fps as libc::c_int, + den: 1, + }; + } } impl AudioVariant { @@ -171,4 +252,57 @@ impl AudioVariant { den: self.sample_rate as libc::c_int, } } + + pub fn get_codec(&self) -> *const AVCodec { + unsafe { + if self.codec == AV_CODEC_ID_AAC as usize { + avcodec_find_encoder_by_name("libfdk_aac\0".as_ptr() as *const libc::c_char) + } else { + avcodec_find_encoder(transmute(self.codec as u32)) + } + } + } + + pub unsafe fn to_codec_context(&self, ctx: *mut AVCodecContext) { + let codec = self.get_codec(); + (*ctx).codec_id = (*codec).id; + (*ctx).codec_type = (*codec).type_; + (*ctx).time_base = self.time_base(); + (*ctx).sample_fmt = + av_get_sample_fmt(format!("{}\0", self.sample_fmt).as_ptr() as *const libc::c_char); + (*ctx).bit_rate = self.bitrate as i64; + (*ctx).sample_rate = self.sample_rate as libc::c_int; + (*ctx).ch_layout = self.channel_layout(); + } + + pub unsafe fn to_codec_params(&self, params: *mut AVCodecParameters) { + let codec = self.get_codec(); + (*params).codec_id = (*codec).id; + (*params).codec_type = (*codec).type_; + (*params).format = + av_get_sample_fmt(format!("{}\0", self.sample_fmt).as_ptr() as *const libc::c_char) + as libc::c_int; + (*params).bit_rate = self.bitrate as i64; + (*params).sample_rate = self.sample_rate as libc::c_int; + (*params).ch_layout = self.channel_layout(); + } + + pub unsafe fn to_stream(&self, stream: *mut AVStream) { + (*stream).time_base = self.time_base(); + (*stream).r_frame_rate = AVRational { + num: (*stream).time_base.den, + den: (*stream).time_base.num, + }; + } + + pub fn channel_layout(&self) -> AVChannelLayout { + AVChannelLayout { + order: AV_CHANNEL_ORDER_NATIVE, + nb_channels: 2, + u: AVChannelLayout__bindgen_ty_1 { + mask: AV_CH_LAYOUT_STEREO, + }, + opaque: ptr::null_mut(), + } + } } diff --git a/src/webhook.rs b/src/webhook.rs index 7195c75..72159b6 100644 --- a/src/webhook.rs +++ b/src/webhook.rs @@ -1,3 +1,4 @@ +use ffmpeg_sys_next::AVCodecID::{AV_CODEC_ID_AAC, AV_CODEC_ID_AAC_LATM}; use uuid::Uuid; use crate::demux::info::{DemuxStreamInfo, StreamChannelType}; @@ -62,7 +63,7 @@ impl Webhook { codec: 86018, channels: 2, sample_rate: 44_100, - sample_fmt: "fltp".to_owned(), + sample_fmt: "s16".to_owned(), })); vars.push(VariantStream::Audio(AudioVariant { id: Uuid::new_v4(), @@ -72,7 +73,7 @@ impl Webhook { codec: 86018, channels: 2, sample_rate: 44_100, - sample_fmt: "fltp".to_owned(), + sample_fmt: "s16".to_owned(), })); } diff --git a/test.sh b/test.sh index 0ce955e..0de6df7 100755 --- a/test.sh +++ b/test.sh @@ -1,5 +1,6 @@ #!/bin/bash ffmpeg \ - -re -f lavfi -i testsrc -g 300 -r 60 -pix_fmt yuv420p -s 1280x720 -c:v h264 -b:v 2000k -c:a aac -b:a 192k \ - -f mpegts srt://localhost:3333 + -f lavfi -i "sine=frequency=1000:sample_rate=48000" \ + -re -f lavfi -i testsrc -g 300 -r 60 -pix_fmt yuv420p -s 1280x720 \ + -c:v h264 -b:v 2000k -c:a aac -b:a 192k $@ -f mpegts srt://localhost:3333