From 2a82e2c00bbfdf3a735053b4d1c490dc85e48033 Mon Sep 17 00:00:00 2001 From: kieran Date: Mon, 25 Mar 2024 19:19:31 +0000 Subject: [PATCH] fix packet timestamps --- .gitignore | 3 +- Cargo.lock | 620 +++++++++++++++++++++------------------- Cargo.toml | 1 + src/decode/mod.rs | 8 +- src/demux/mod.rs | 2 +- src/egress/hls.rs | 82 +++--- src/encode/mod.rs | 44 ++- src/main.rs | 4 +- src/pipeline/builder.rs | 14 +- src/pipeline/mod.rs | 45 ++- src/pipeline/runner.rs | 22 +- src/scale/mod.rs | 10 +- src/utils.rs | 8 +- src/variant.rs | 34 ++- src/webhook.rs | 92 +++--- 15 files changed, 520 insertions(+), 469 deletions(-) mode change 100755 => 100644 Cargo.toml diff --git a/.gitignore b/.gitignore index 6b39d31..bf1546b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target -.idea/ \ No newline at end of file +.idea/ +out/ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 12be4a3..ea23732 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,7 +8,7 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" dependencies = [ - "gimli", + "gimli", ] [[package]] @@ -23,9 +23,9 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" dependencies = [ - "cfg-if", - "cipher", - "cpufeatures", + "cfg-if", + "cipher", + "cpufeatures", ] [[package]] @@ -34,7 +34,7 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ - "memchr", + "memchr", ] [[package]] @@ -43,7 +43,7 @@ version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" dependencies = [ - "backtrace", + "backtrace", ] [[package]] @@ -64,9 +64,9 @@ version = "0.1.77" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.52", + "proc-macro2", + "quote", + "syn 2.0.52", ] [[package]] @@ -81,13 +81,13 @@ version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" dependencies = [ - "addr2line", - "cc", - "cfg-if", - "libc", - "miniz_oxide", - "object", - "rustc-demangle", + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", ] [[package]] @@ -102,18 +102,18 @@ version = "0.64.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4" dependencies = [ - "bitflags 1.3.2", - "cexpr", - "clang-sys", - "lazy_static", - "lazycell", - "peeking_take_while", - "proc-macro2", - "quote", - "regex", - "rustc-hash", - "shlex", - "syn 1.0.109", + "bitflags 1.3.2", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 1.0.109", ] [[package]] @@ -128,7 +128,7 @@ version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" dependencies = [ - "serde", + "serde", ] [[package]] @@ -137,7 +137,7 @@ version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" dependencies = [ - "generic-array", + "generic-array", ] [[package]] @@ -158,7 +158,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" dependencies = [ - "nom", + "nom", ] [[package]] @@ -173,8 +173,8 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" dependencies = [ - "crypto-common", - "inout", + "crypto-common", + "inout", ] [[package]] @@ -183,9 +183,9 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1" dependencies = [ - "glob", - "libc", - "libloading", + "glob", + "libc", + "libloading", ] [[package]] @@ -194,18 +194,18 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7328b20597b53c2454f0b1919720c25c7339051c02b72b7e05409e00b14132be" dependencies = [ - "async-trait", - "convert_case 0.6.0", - "json5", - "lazy_static", - "nom", - "pathdiff", - "ron", - "rust-ini", - "serde", - "serde_json", - "toml", - "yaml-rust", + "async-trait", + "convert_case 0.6.0", + "json5", + "lazy_static", + "nom", + "pathdiff", + "ron", + "rust-ini", + "serde", + "serde_json", + "toml", + "yaml-rust", ] [[package]] @@ -214,7 +214,7 @@ version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" dependencies = [ - "const-random-macro", + "const-random-macro", ] [[package]] @@ -223,9 +223,9 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" dependencies = [ - "getrandom", - "once_cell", - "tiny-keccak", + "getrandom", + "once_cell", + "tiny-keccak", ] [[package]] @@ -240,7 +240,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" dependencies = [ - "unicode-segmentation", + "unicode-segmentation", ] [[package]] @@ -249,7 +249,7 @@ version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" dependencies = [ - "libc", + "libc", ] [[package]] @@ -264,8 +264,8 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ - "generic-array", - "typenum", + "generic-array", + "typenum", ] [[package]] @@ -274,7 +274,7 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" dependencies = [ - "cipher", + "cipher", ] [[package]] @@ -283,11 +283,11 @@ version = "0.99.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" dependencies = [ - "convert_case 0.4.0", - "proc-macro2", - "quote", - "rustc_version", - "syn 1.0.109", + "convert_case 0.4.0", + "proc-macro2", + "quote", + "rustc_version", + "syn 1.0.109", ] [[package]] @@ -296,9 +296,9 @@ version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer", - "crypto-common", - "subtle", + "block-buffer", + "crypto-common", + "subtle", ] [[package]] @@ -307,20 +307,26 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" dependencies = [ - "const-random", + "const-random", ] +[[package]] +name = "either" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" + [[package]] name = "env_logger" version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" dependencies = [ - "humantime", - "is-terminal", - "log", - "regex", - "termcolor", + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", ] [[package]] @@ -335,12 +341,12 @@ version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2529ad916d08c3562c754c21bc9b17a26c7882c0f5706cc2cd69472175f1620" dependencies = [ - "bindgen", - "cc", - "libc", - "num_cpus", - "pkg-config", - "vcpkg", + "bindgen", + "cc", + "libc", + "num_cpus", + "pkg-config", + "vcpkg", ] [[package]] @@ -349,7 +355,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ - "percent-encoding", + "percent-encoding", ] [[package]] @@ -358,12 +364,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ - "futures-channel", - "futures-core", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", ] [[package]] @@ -372,8 +378,8 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ - "futures-core", - "futures-sink", + "futures-core", + "futures-sink", ] [[package]] @@ -394,9 +400,9 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.52", + "proc-macro2", + "quote", + "syn 2.0.52", ] [[package]] @@ -417,16 +423,16 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ - "futures-channel", - "futures-core", - "futures-io", - "futures-macro", - "futures-sink", - "futures-task", - "memchr", - "pin-project-lite", - "pin-utils", - "slab", + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", ] [[package]] @@ -435,8 +441,8 @@ version = "0.14.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ - "typenum", - "version_check", + "typenum", + "version_check", ] [[package]] @@ -445,9 +451,9 @@ version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ - "cfg-if", - "libc", - "wasi", + "cfg-if", + "libc", + "wasi", ] [[package]] @@ -492,7 +498,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest", + "digest", ] [[package]] @@ -507,8 +513,8 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "unicode-bidi", + "unicode-normalization", ] [[package]] @@ -517,8 +523,8 @@ version = "2.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" dependencies = [ - "equivalent", - "hashbrown 0.14.3", + "equivalent", + "hashbrown 0.14.3", ] [[package]] @@ -527,7 +533,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" dependencies = [ - "generic-array", + "generic-array", ] [[package]] @@ -536,9 +542,18 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" dependencies = [ - "hermit-abi", - "libc", - "windows-sys 0.52.0", + "hermit-abi", + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", ] [[package]] @@ -553,9 +568,9 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" dependencies = [ - "pest", - "pest_derive", - "serde", + "pest", + "pest_derive", + "serde", ] [[package]] @@ -564,7 +579,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d" dependencies = [ - "indexmap", + "indexmap", ] [[package]] @@ -591,8 +606,8 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ - "cfg-if", - "windows-targets 0.52.4", + "cfg-if", + "windows-targets 0.52.4", ] [[package]] @@ -625,7 +640,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" dependencies = [ - "adler", + "adler", ] [[package]] @@ -634,9 +649,9 @@ version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ - "libc", - "wasi", - "windows-sys 0.48.0", + "libc", + "wasi", + "windows-sys 0.48.0", ] [[package]] @@ -645,8 +660,8 @@ version = "7.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" dependencies = [ - "memchr", - "minimal-lexical", + "memchr", + "minimal-lexical", ] [[package]] @@ -655,7 +670,7 @@ version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" dependencies = [ - "autocfg", + "autocfg", ] [[package]] @@ -664,8 +679,8 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi", - "libc", + "hermit-abi", + "libc", ] [[package]] @@ -674,7 +689,7 @@ version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" dependencies = [ - "memchr", + "memchr", ] [[package]] @@ -689,8 +704,8 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e" dependencies = [ - "dlv-list", - "hashbrown 0.13.2", + "dlv-list", + "hashbrown 0.13.2", ] [[package]] @@ -705,7 +720,7 @@ version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" dependencies = [ - "digest", + "digest", ] [[package]] @@ -726,9 +741,9 @@ version = "2.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f8023d0fb78c8e03784ea1c7f3fa36e68a723138990b8d5a47d916b651e7a8" dependencies = [ - "memchr", - "thiserror", - "ucd-trie", + "memchr", + "thiserror", + "ucd-trie", ] [[package]] @@ -737,8 +752,8 @@ version = "2.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0d24f72393fd16ab6ac5738bc33cdb6a9aa73f8b902e8fe29cf4e67d7dd1026" dependencies = [ - "pest", - "pest_generator", + "pest", + "pest_generator", ] [[package]] @@ -747,11 +762,11 @@ version = "2.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdc17e2a6c7d0a492f0158d7a4bd66cc17280308bbaff78d5bef566dca35ab80" dependencies = [ - "pest", - "pest_meta", - "proc-macro2", - "quote", - "syn 2.0.52", + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.52", ] [[package]] @@ -760,9 +775,9 @@ version = "2.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "934cd7631c050f4674352a6e835d5f6711ffbfb9345c2fc0107155ac495ae293" dependencies = [ - "once_cell", - "pest", - "sha2", + "once_cell", + "pest", + "sha2", ] [[package]] @@ -801,8 +816,8 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" dependencies = [ - "env_logger", - "log", + "env_logger", + "log", ] [[package]] @@ -811,7 +826,7 @@ version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ - "unicode-ident", + "unicode-ident", ] [[package]] @@ -820,7 +835,7 @@ version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ - "proc-macro2", + "proc-macro2", ] [[package]] @@ -829,9 +844,9 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ - "libc", - "rand_chacha", - "rand_core", + "libc", + "rand_chacha", + "rand_core", ] [[package]] @@ -840,8 +855,8 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ - "ppv-lite86", - "rand_core", + "ppv-lite86", + "rand_core", ] [[package]] @@ -850,7 +865,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom", ] [[package]] @@ -859,10 +874,10 @@ version = "1.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ - "aho-corasick", - "memchr", - "regex-automata", - "regex-syntax", + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", ] [[package]] @@ -871,9 +886,9 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax", + "aho-corasick", + "memchr", + "regex-syntax", ] [[package]] @@ -888,10 +903,10 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94" dependencies = [ - "base64", - "bitflags 2.4.2", - "serde", - "serde_derive", + "base64", + "bitflags 2.4.2", + "serde", + "serde_derive", ] [[package]] @@ -900,8 +915,8 @@ version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091" dependencies = [ - "cfg-if", - "ordered-multimap", + "cfg-if", + "ordered-multimap", ] [[package]] @@ -922,7 +937,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver", + "semver", ] [[package]] @@ -943,7 +958,7 @@ version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ - "serde_derive", + "serde_derive", ] [[package]] @@ -952,9 +967,9 @@ version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.52", + "proc-macro2", + "quote", + "syn 2.0.52", ] [[package]] @@ -963,9 +978,9 @@ version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ - "itoa", - "ryu", - "serde", + "itoa", + "ryu", + "serde", ] [[package]] @@ -974,7 +989,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" dependencies = [ - "serde", + "serde", ] [[package]] @@ -983,9 +998,9 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" dependencies = [ - "cfg-if", - "cpufeatures", - "digest", + "cfg-if", + "cpufeatures", + "digest", ] [[package]] @@ -994,9 +1009,9 @@ version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ - "cfg-if", - "cpufeatures", - "digest", + "cfg-if", + "cpufeatures", + "digest", ] [[package]] @@ -1011,7 +1026,7 @@ version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" dependencies = [ - "autocfg", + "autocfg", ] [[package]] @@ -1020,8 +1035,8 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ - "libc", - "windows-sys 0.52.0", + "libc", + "windows-sys 0.52.0", ] [[package]] @@ -1030,26 +1045,26 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db1a69f75dbd56399adf74daf49e077d31969431aa5cfa2ed1aa8ae175335ea7" dependencies = [ - "aes", - "array-init", - "arraydeque", - "bitflags 2.4.2", - "bytes", - "cipher", - "ctr", - "derive_more", - "hex", - "hmac", - "keyed_priority_queue", - "log", - "pbkdf2", - "rand", - "regex", - "sha-1", - "streaming-stats", - "take-until", - "thiserror", - "url", + "aes", + "array-init", + "arraydeque", + "bitflags 2.4.2", + "bytes", + "cipher", + "ctr", + "derive_more", + "hex", + "hmac", + "keyed_priority_queue", + "log", + "pbkdf2", + "rand", + "regex", + "sha-1", + "streaming-stats", + "take-until", + "thiserror", + "url", ] [[package]] @@ -1058,36 +1073,37 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2383b0b3530aa62f72950ac949b1e27be4f8184305ada5c8ebc0fc9bac8010" dependencies = [ - "bytes", - "futures", - "log", - "rand", - "socket2", - "srt-protocol", - "tokio", - "tokio-stream", + "bytes", + "futures", + "log", + "rand", + "socket2", + "srt-protocol", + "tokio", + "tokio-stream", ] [[package]] name = "stream-core" version = "0.1.0" dependencies = [ - "anyhow", - "async-trait", - "bytes", - "config", - "ffmpeg-sys-next", - "futures-util", - "libc", - "log", - "pretty-hex", - "pretty_env_logger", - "serde", - "srt-tokio", - "tokio", - "tokio-stream", - "url", - "uuid", + "anyhow", + "async-trait", + "bytes", + "config", + "ffmpeg-sys-next", + "futures-util", + "itertools", + "libc", + "log", + "pretty-hex", + "pretty_env_logger", + "serde", + "srt-tokio", + "tokio", + "tokio-stream", + "url", + "uuid", ] [[package]] @@ -1096,7 +1112,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0d670ce4e348a2081843569e0f79b21c99c91bb9028b3b3ecb0f050306de547" dependencies = [ - "num-traits", + "num-traits", ] [[package]] @@ -1111,9 +1127,9 @@ version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", + "proc-macro2", + "quote", + "unicode-ident", ] [[package]] @@ -1122,9 +1138,9 @@ version = "2.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", + "proc-macro2", + "quote", + "unicode-ident", ] [[package]] @@ -1139,7 +1155,7 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" dependencies = [ - "winapi-util", + "winapi-util", ] [[package]] @@ -1148,7 +1164,7 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" dependencies = [ - "thiserror-impl", + "thiserror-impl", ] [[package]] @@ -1157,9 +1173,9 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.52", + "proc-macro2", + "quote", + "syn 2.0.52", ] [[package]] @@ -1168,7 +1184,7 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" dependencies = [ - "crunchy", + "crunchy", ] [[package]] @@ -1177,7 +1193,7 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" dependencies = [ - "tinyvec_macros", + "tinyvec_macros", ] [[package]] @@ -1192,15 +1208,15 @@ version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ - "backtrace", - "bytes", - "libc", - "mio", - "num_cpus", - "pin-project-lite", - "socket2", - "tokio-macros", - "windows-sys 0.48.0", + "backtrace", + "bytes", + "libc", + "mio", + "num_cpus", + "pin-project-lite", + "socket2", + "tokio-macros", + "windows-sys 0.48.0", ] [[package]] @@ -1209,9 +1225,9 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.52", + "proc-macro2", + "quote", + "syn 2.0.52", ] [[package]] @@ -1220,10 +1236,10 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", - "tokio-util", + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", ] [[package]] @@ -1232,11 +1248,11 @@ version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "pin-project-lite", - "tokio", + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", ] [[package]] @@ -1245,10 +1261,10 @@ version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9dd1545e8208b4a5af1aa9bbd0b4cf7e9ea08fabc5d0a5c67fcaafa17433aa3" dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit", + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", ] [[package]] @@ -1257,7 +1273,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" dependencies = [ - "serde", + "serde", ] [[package]] @@ -1266,11 +1282,11 @@ version = "0.22.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c12219811e0c1ba077867254e5ad62ee2c9c190b0d957110750ac0cda1ae96cd" dependencies = [ - "indexmap", - "serde", - "serde_spanned", - "toml_datetime", - "winnow", + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", ] [[package]] @@ -1303,7 +1319,7 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" dependencies = [ - "tinyvec", + "tinyvec", ] [[package]] @@ -1318,9 +1334,9 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ - "form_urlencoded", - "idna", - "percent-encoding", + "form_urlencoded", + "idna", + "percent-encoding", ] [[package]] @@ -1329,8 +1345,8 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ - "getrandom", - "serde", + "getrandom", + "serde", ] [[package]] @@ -1357,8 +1373,8 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", ] [[package]] @@ -1373,7 +1389,7 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" dependencies = [ - "winapi", + "winapi", ] [[package]] @@ -1388,7 +1404,7 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets 0.48.5", + "windows-targets 0.48.5", ] [[package]] @@ -1397,7 +1413,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.4", ] [[package]] @@ -1406,13 +1422,13 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", ] [[package]] @@ -1421,13 +1437,13 @@ version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" dependencies = [ - "windows_aarch64_gnullvm 0.52.4", - "windows_aarch64_msvc 0.52.4", - "windows_i686_gnu 0.52.4", - "windows_i686_msvc 0.52.4", - "windows_x86_64_gnu 0.52.4", - "windows_x86_64_gnullvm 0.52.4", - "windows_x86_64_msvc 0.52.4", + "windows_aarch64_gnullvm 0.52.4", + "windows_aarch64_msvc 0.52.4", + "windows_i686_gnu 0.52.4", + "windows_i686_msvc 0.52.4", + "windows_x86_64_gnu 0.52.4", + "windows_x86_64_gnullvm 0.52.4", + "windows_x86_64_msvc 0.52.4", ] [[package]] @@ -1520,7 +1536,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dffa400e67ed5a4dd237983829e66475f0a4a26938c4b04c21baede6262215b8" dependencies = [ - "memchr", + "memchr", ] [[package]] @@ -1529,5 +1545,5 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" dependencies = [ - "linked-hash-map", + "linked-hash-map", ] diff --git a/Cargo.toml b/Cargo.toml old mode 100755 new mode 100644 index c7ba3d9..e62b5e6 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,4 @@ uuid = { version = "1.8.0", features = ["v4", "serde"] } serde = { version = "1.0.197", features = ["derive"] } config = { version = "0.14.0", features = ["toml"] } url = "2.5.0" +itertools = "0.12.1" diff --git a/src/decode/mod.rs b/src/decode/mod.rs index f6b4dc0..1d5f04e 100644 --- a/src/decode/mod.rs +++ b/src/decode/mod.rs @@ -7,6 +7,7 @@ use ffmpeg_sys_next::{ avcodec_find_decoder, avcodec_free_context, avcodec_open2, avcodec_parameters_to_context, avcodec_receive_frame, avcodec_send_packet, AVCodecContext, AVERROR, AVERROR_EOF, AVPacket, AVStream, }; +use ffmpeg_sys_next::AVPictureType::{AV_PICTURE_TYPE_I, AV_PICTURE_TYPE_NONE}; use tokio::sync::broadcast; use tokio::sync::mpsc::UnboundedReceiver; @@ -99,9 +100,8 @@ impl Decoder { } return Err(Error::msg(format!("Failed to decode {}", ret))); } - (*frame).time_base = (*pkt).time_base; - (*frame).opaque = stream as *mut libc::c_void; - self.chan_out.send(PipelinePayload::AvFrame(frame))?; + (*frame).time_base = (*stream).time_base; + self.chan_out.send(PipelinePayload::AvFrame("Decoder frame".to_owned(), frame))?; frames += 1; } return Ok(frames); @@ -111,7 +111,7 @@ impl Decoder { pub fn process(&mut self) -> Result { while let Ok(pkg) = self.chan_in.try_recv() { - return if let PipelinePayload::AvPacket(pkt) = pkg { + return if let PipelinePayload::AvPacket(_, pkt) = pkg { unsafe { let frames = self.decode_pkt(pkt)?; Ok(frames) diff --git a/src/demux/mod.rs b/src/demux/mod.rs index 0d8c1f7..8eb92d3 100644 --- a/src/demux/mod.rs +++ b/src/demux/mod.rs @@ -158,7 +158,7 @@ impl Demuxer { } (*pkt).opaque = stream as *mut libc::c_void; - let pkg = PipelinePayload::AvPacket(pkt); + let pkg = PipelinePayload::AvPacket("Demuxer packet".to_owned(), pkt); self.chan_out.send(pkg)?; Ok(()) } diff --git a/src/egress/hls.rs b/src/egress/hls.rs index a908ea3..3594716 100644 --- a/src/egress/hls.rs +++ b/src/egress/hls.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::ffi::{CStr, CString}; use std::mem::transmute; use std::ptr; @@ -14,9 +15,8 @@ use ffmpeg_sys_next::{ use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE; use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709; use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO}; -use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P; -use ffmpeg_sys_next::AVSampleFormat::AV_SAMPLE_FMT_FLT; use futures_util::StreamExt; +use itertools::Itertools; use log::info; use tokio::sync::mpsc::{Receiver, UnboundedReceiver}; use uuid::{Bytes, Uuid, Variant}; @@ -27,6 +27,8 @@ use crate::pipeline::{HLSEgressConfig, PipelinePayload}; use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid}; use crate::variant::{VariantStream, VideoVariant}; +use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P; + pub struct HlsEgress { /// Pipeline id id: Uuid, @@ -56,11 +58,13 @@ impl HlsEgress { unsafe fn setup_muxer(&mut self) -> Result<(), Error> { let mut ctx = ptr::null_mut(); + let base = format!("{}/{}", self.config.out_dir, self.id); + let ret = avformat_alloc_output_context2( &mut ctx, ptr::null(), "hls\0".as_ptr() as *const libc::c_char, - format!("{}/stream_%v/live.m3u8\0", self.id).as_ptr() as *const libc::c_char, + format!("{}/stream_%v/live.m3u8\0", base).as_ptr() as *const libc::c_char, ); if ret < 0 { return Err(Error::msg(get_ffmpeg_error_msg(ret))); @@ -69,7 +73,7 @@ impl HlsEgress { av_opt_set( (*ctx).priv_data, "hls_segment_filename\0".as_ptr() as *const libc::c_char, - format!("{}/stream_%v/seg_%05d.ts\0", self.id).as_ptr() as *const libc::c_char, + format!("{}/stream_%v/seg_%05d.ts\0", base).as_ptr() as *const libc::c_char, 0, ); @@ -94,16 +98,8 @@ impl HlsEgress { 0, ); - info!("map_str={}", self.config.stream_map); - - av_opt_set( - (*ctx).priv_data, - "var_stream_map\0".as_ptr() as *const libc::c_char, - format!("{}\0", self.config.stream_map).as_ptr() as *const libc::c_char, - 0, - ); - for var in &mut self.config.variants { + let tb = var.time_base(); match var { VariantStream::Video(vs) => { let stream = avformat_new_stream(ctx, ptr::null()); @@ -113,6 +109,7 @@ impl HlsEgress { // overwrite dst_index to match output stream vs.dst_index = (*stream).index as usize; + (*stream).time_base = tb; let params = (*stream).codecpar; (*params).height = vs.height as libc::c_int; @@ -137,6 +134,7 @@ impl HlsEgress { // overwrite dst_index to match output stream va.dst_index = (*stream).index as usize; + (*stream).time_base = tb; let params = (*stream).codecpar; @@ -160,6 +158,34 @@ impl HlsEgress { } } + // configure mapping + let mut stream_map: HashMap> = HashMap::new(); + for var in &self.config.variants { + let cfg = match var { + VariantStream::Video(vx) => format!("v:{}", vx.dst_index), + VariantStream::Audio(ax) => format!("a:{}", ax.dst_index), + }; + if let Some(out_stream) = stream_map.get_mut(&var.dst_index()) { + out_stream.push(cfg); + } else { + stream_map.insert(var.dst_index(), vec![cfg]); + } + } + let stream_map = stream_map + .values() + .into_iter() + .map(|v| v.join(",")) + .join(" "); + + info!("map_str={}", stream_map); + + av_opt_set( + (*ctx).priv_data, + "var_stream_map\0".as_ptr() as *const libc::c_char, + format!("{}\0", stream_map).as_ptr() as *const libc::c_char, + 0, + ); + av_dump_format(ctx, 0, ptr::null(), 1); let ret = avformat_write_header(ctx, ptr::null_mut()); @@ -172,34 +198,16 @@ impl HlsEgress { } unsafe fn process_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> { - let variant_id = id_ref_to_uuid((*pkt).opaque_ref); - let dst_stream_index = self.config.variants.iter().find_map(|v| match &v { - VariantStream::Video(vv) => { - if vv.id.eq(&variant_id) { - Some(vv.dst_index) - } else { - None - } - } - VariantStream::Audio(va) => { - if va.id.eq(&variant_id) { - Some(va.dst_index) - } else { - None - } - } - _ => None, - }); - if let None = dst_stream_index { + let variant_id = id_ref_to_uuid((*pkt).opaque_ref)?; + let variant = self.config.variants.iter().find(|v| v.id() == variant_id); + if variant.is_none() { return Err(Error::msg(format!( "No stream found with id={:?}", - dst_stream_index + variant_id ))); } - let stream = *(*self.ctx).streams.add(dst_stream_index.unwrap()); - av_packet_rescale_ts(pkt, (*pkt).time_base, (*stream).time_base); - + let stream = *(*self.ctx).streams.add(variant.unwrap().dst_index()); (*pkt).stream_index = (*stream).index; let ret = av_interleaved_write_frame(self.ctx, pkt); @@ -213,7 +221,7 @@ impl HlsEgress { pub fn process(&mut self) -> Result<(), Error> { while let Ok(pkg) = self.chan_in.try_recv() { match pkg { - PipelinePayload::AvPacket(pkt) => unsafe { + PipelinePayload::AvPacket(_, pkt) => unsafe { if self.ctx == ptr::null_mut() { self.setup_muxer()?; } diff --git a/src/encode/mod.rs b/src/encode/mod.rs index a08917d..08db29b 100644 --- a/src/encode/mod.rs +++ b/src/encode/mod.rs @@ -3,11 +3,11 @@ use std::ptr; use anyhow::Error; use ffmpeg_sys_next::{ - av_buffer_ref, AV_CH_LAYOUT_STEREO, AV_CODEC_FLAG_GLOBAL_HEADER, av_get_sample_fmt, av_opt_set, - av_packet_alloc, av_packet_free, AVBufferRef, AVChannelLayout, - AVChannelLayout__bindgen_ty_1, AVCodec, avcodec_alloc_context3, avcodec_find_encoder, avcodec_open2, - avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVRational, - AVStream, + av_buffer_ref, AV_CH_LAYOUT_STEREO, av_get_sample_fmt, av_opt_set, av_packet_alloc, + av_packet_free, av_packet_rescale_ts, AVBufferRef, AVChannelLayout, + AVChannelLayout__bindgen_ty_1, AVCodec, avcodec_alloc_context3, avcodec_find_encoder, + avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodecContext, AVERROR, + AVFrame, }; use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE; use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P; @@ -16,7 +16,7 @@ use tokio::sync::mpsc::UnboundedSender; use crate::ipc::Rx; use crate::pipeline::PipelinePayload; -use crate::utils::{get_ffmpeg_error_msg, variant_id_ref}; +use crate::utils::{get_ffmpeg_error_msg, id_ref_to_uuid, variant_id_ref}; use crate::variant::VariantStream; pub struct Encoder { @@ -33,8 +33,8 @@ unsafe impl Send for Encoder {} unsafe impl Sync for Encoder {} impl Encoder - where - TRecv: Rx, +where + TRecv: Rx, { pub fn new( chan_in: TRecv, @@ -69,17 +69,15 @@ impl Encoder return Err(Error::msg("Failed to allocate encoder context")); } + (*ctx).time_base = self.variant.time_base(); match &self.variant { VariantStream::Video(vv) => { (*ctx).bit_rate = vv.bitrate as i64; (*ctx).width = (*frame).width; (*ctx).height = (*frame).height; - (*ctx).time_base = AVRational { - num: 1, - den: vv.fps as libc::c_int, - }; - (*ctx).gop_size = (vv.fps * vv.keyframe_interval) as libc::c_int; + let key_frames = vv.fps * vv.keyframe_interval; + (*ctx).gop_size = key_frames as libc::c_int; (*ctx).max_b_frames = 1; (*ctx).pix_fmt = AV_PIX_FMT_YUV420P; av_opt_set( @@ -103,10 +101,6 @@ impl Encoder }, opaque: ptr::null_mut(), }; - (*ctx).time_base = AVRational { - num: 1, - den: va.sample_rate as libc::c_int, - }; } _ => { // nothing @@ -125,12 +119,11 @@ impl Encoder } unsafe fn process_frame(&mut self, frame: *mut AVFrame) -> Result<(), Error> { - let stream = (*frame).opaque as *mut AVStream; - if (*stream).index as usize != self.variant.src_index() { - return Ok(()); - } self.setup_encoder(frame)?; + let var_id = id_ref_to_uuid((*frame).opaque_ref)?; + assert_eq!(var_id, self.variant.id()); + let mut ret = avcodec_send_frame(self.ctx, frame); if ret < 0 && ret != AVERROR(EAGAIN) { return Err(Error::msg(get_ffmpeg_error_msg(ret))); @@ -147,11 +140,12 @@ impl Encoder return Err(Error::msg(get_ffmpeg_error_msg(ret))); } + (*pkt).time_base = (*self.ctx).time_base; (*pkt).duration = (*frame).duration; - (*pkt).time_base = (*frame).time_base; - (*pkt).opaque = stream as *mut libc::c_void; + av_packet_rescale_ts(pkt, (*frame).time_base, (*self.ctx).time_base); (*pkt).opaque_ref = av_buffer_ref(self.var_id_ref); - self.chan_out.send(PipelinePayload::AvPacket(pkt))?; + self.chan_out + .send(PipelinePayload::AvPacket("Encoder packet".to_owned(), pkt))?; } Ok(()) @@ -160,7 +154,7 @@ impl Encoder pub fn process(&mut self) -> Result<(), Error> { while let Ok(pkg) = self.chan_in.try_recv_next() { match pkg { - PipelinePayload::AvFrame(frm) => unsafe { + PipelinePayload::AvFrame(_, frm) => unsafe { self.process_frame(frm)?; }, _ => return Err(Error::msg("Payload not supported")), diff --git a/src/main.rs b/src/main.rs index 9ad68ba..3b0d1f8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { pretty_env_logger::init(); unsafe { - //ffmpeg_sys_next::av_log_set_level(ffmpeg_sys_next::AV_LOG_MAX_OFFSET); + //ffmpeg_sys_next::av_log_set_level(ffmpeg_sys_next::AV_LOG_DEBUG); info!( "FFMPEG version={}", CStr::from_ptr(ffmpeg_sys_next::av_version_info()) @@ -47,7 +47,7 @@ async fn main() -> anyhow::Result<()> { let settings: Settings = builder.try_deserialize()?; - let webhook = Webhook::new(settings.webhook_url); + let webhook = Webhook::new(settings.clone()); let builder = PipelineBuilder::new(webhook); let mut listeners = vec![]; for e in settings.endpoints { diff --git a/src/pipeline/builder.rs b/src/pipeline/builder.rs index 0b9a9ee..6d12559 100644 --- a/src/pipeline/builder.rs +++ b/src/pipeline/builder.rs @@ -14,8 +14,16 @@ impl PipelineBuilder { Self { webhook } } - pub async fn build_for(&self, info: ConnectionInfo, recv: UnboundedReceiver) -> Result { - let config = self.webhook.start(info).await?; - Ok(PipelineRunner::new(config, recv)) + pub async fn build_for( + &self, + info: ConnectionInfo, + recv: UnboundedReceiver, + ) -> Result { + self.webhook.start(info).await?; + Ok(PipelineRunner::new( + Default::default(), + self.webhook.clone(), + recv, + )) } } diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 6f85c62..0d627e7 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -1,10 +1,6 @@ use std::ops::{Deref, DerefMut}; -use async_trait::async_trait; -use ffmpeg_sys_next::{ - av_frame_alloc, av_frame_free, av_frame_ref, av_packet_alloc, av_packet_free, av_packet_ref, - AVFrame, AVPacket, -}; +use ffmpeg_sys_next::{av_frame_clone, av_frame_copy_props, av_frame_free, av_packet_clone, av_packet_copy_props, av_packet_free, AVFrame, AVPacket}; use serde::{Deserialize, Serialize}; use crate::demux::info::DemuxStreamInfo; @@ -18,19 +14,16 @@ pub enum EgressType { HLS(HLSEgressConfig), DASH, WHEP, + MPEGTS, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct HLSEgressConfig { + pub out_dir: String, pub variants: Vec, - - /// FFMPEG stream mapping string - /// - /// v:0,a:0 v:1,a:0, v:2,a:1 etc.. - pub stream_map: String, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct PipelineConfig { pub id: uuid::Uuid, pub recording: Vec, @@ -44,9 +37,9 @@ pub enum PipelinePayload { /// Raw bytes from ingress Bytes(bytes::Bytes), /// FFMpeg AVPacket - AvPacket(*mut AVPacket), + AvPacket(String, *mut AVPacket), /// FFMpeg AVFrame - AvFrame(*mut AVFrame), + AvFrame(String, *mut AVFrame), /// Information about the input stream SourceInfo(DemuxStreamInfo), } @@ -60,15 +53,15 @@ impl Clone for PipelinePayload { match self { PipelinePayload::Empty => PipelinePayload::Empty, PipelinePayload::Bytes(b) => PipelinePayload::Bytes(b.clone()), - PipelinePayload::AvPacket(p) => unsafe { - let new_pkt = av_packet_alloc(); - av_packet_ref(new_pkt, *p); - PipelinePayload::AvPacket(new_pkt) + PipelinePayload::AvPacket(t, p) => unsafe { + let new_pkt = av_packet_clone(*p); + av_packet_copy_props(new_pkt, *p); + PipelinePayload::AvPacket(t.clone(), new_pkt) }, - PipelinePayload::AvFrame(p) => unsafe { - let new_frame = av_frame_alloc(); - av_frame_ref(new_frame, *p); - PipelinePayload::AvFrame(new_frame) + PipelinePayload::AvFrame(t, p) => unsafe { + let new_frame = av_frame_clone(*p); + av_frame_copy_props(new_frame, *p); + PipelinePayload::AvFrame(t.clone(), new_frame) }, PipelinePayload::SourceInfo(i) => PipelinePayload::SourceInfo(i.clone()), } @@ -80,19 +73,13 @@ impl Drop for PipelinePayload { match self { PipelinePayload::Empty => {} PipelinePayload::Bytes(_) => {} - PipelinePayload::AvPacket(p) => unsafe { + PipelinePayload::AvPacket(_, p) => unsafe { av_packet_free(p); }, - PipelinePayload::AvFrame(p) => unsafe { + PipelinePayload::AvFrame(_, p) => unsafe { av_frame_free(p); }, PipelinePayload::SourceInfo(_) => {} } } } - -#[async_trait] -pub trait PipelineStep { - fn name(&self) -> String; - async fn process(&mut self, pkg: &PipelinePayload) -> Result; -} diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs index 59ac198..25f2bc1 100644 --- a/src/pipeline/runner.rs +++ b/src/pipeline/runner.rs @@ -2,7 +2,9 @@ use std::ops::{Add, AddAssign}; use std::time::{Duration, Instant}; use anyhow::Error; +use itertools::Itertools; use log::info; +use tokio::runtime::Runtime; use tokio::sync::broadcast; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; @@ -11,9 +13,10 @@ use crate::demux::Demuxer; use crate::demux::info::{DemuxStreamInfo, StreamChannelType}; use crate::egress::hls::HlsEgress; use crate::encode::Encoder; -use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload, PipelineStep}; +use crate::pipeline::{EgressType, PipelineConfig, PipelinePayload}; use crate::scale::Scaler; use crate::variant::VariantStream; +use crate::webhook::Webhook; struct ScalerEncoder { pub scaler: Scaler, @@ -31,10 +34,15 @@ pub struct PipelineRunner { started: Instant, frame_no: u64, stream_info: Option, + webhook: Webhook, } impl PipelineRunner { - pub fn new(config: PipelineConfig, recv: UnboundedReceiver) -> Self { + pub fn new( + config: PipelineConfig, + webhook: Webhook, + recv: UnboundedReceiver, + ) -> Self { let (demux_out, demux_in) = unbounded_channel(); let (dec_tx, dec_rx) = broadcast::channel::(32); Self { @@ -48,6 +56,7 @@ impl PipelineRunner { started: Instant::now(), frame_no: 0, stream_info: None, + webhook, } } @@ -102,6 +111,9 @@ impl PipelineRunner { info!("Configuring pipeline {:?}", info); self.stream_info = Some(info.clone()); + // re-configure with demuxer info + self.config = self.webhook.configure(&info); + let video_stream = info .channels .iter() @@ -116,16 +128,16 @@ impl PipelineRunner { .push(HlsEgress::new(egress_rx, self.config.id, cfg.clone())); for v in &cfg.variants { - let (var_tx, var_rx) = unbounded_channel(); match v { VariantStream::Video(vs) => { + let (sw_tx, sw_rx) = unbounded_channel(); self.scalers.push(ScalerEncoder { scaler: Scaler::new( self.decoder_output.resubscribe(), - var_tx.clone(), + sw_tx.clone(), vs.clone(), ), - encoder: Encoder::new(var_rx, egress_tx.clone(), v.clone()), + encoder: Encoder::new(sw_rx, egress_tx.clone(), v.clone()), }); } VariantStream::Audio(_) => { diff --git a/src/scale/mod.rs b/src/scale/mod.rs index 36129c3..b9bcec8 100644 --- a/src/scale/mod.rs +++ b/src/scale/mod.rs @@ -3,7 +3,7 @@ use std::ptr; use anyhow::Error; use ffmpeg_sys_next::{ - av_buffer_ref, av_frame_alloc, av_frame_copy_props, av_frame_unref, AVBufferRef, + av_buffer_ref, av_frame_alloc, av_frame_copy_props, AVBufferRef, AVFrame, SWS_BILINEAR, sws_getContext, sws_scale_frame, SwsContext, }; use tokio::sync::broadcast; @@ -74,24 +74,20 @@ impl Scaler { } let ret = sws_scale_frame(self.ctx, dst_frame, frame); - av_frame_unref(frame); if ret < 0 { return Err(Error::msg(get_ffmpeg_error_msg(ret))); } - (*dst_frame).time_base = (*frame).time_base; - (*dst_frame).pts = (*frame).pts; - (*dst_frame).pkt_dts = (*frame).pkt_dts; (*dst_frame).opaque_ref = av_buffer_ref(self.var_id_ref); - self.chan_out.send(PipelinePayload::AvFrame(dst_frame))?; + self.chan_out.send(PipelinePayload::AvFrame("Scaler frame".to_owned(), dst_frame))?; Ok(()) } pub fn process(&mut self) -> Result<(), Error> { while let Ok(pkg) = self.chan_in.try_recv() { match pkg { - PipelinePayload::AvFrame(frm) => unsafe { + PipelinePayload::AvFrame(_, frm) => unsafe { self.process_frame(frm)?; }, _ => return Err(Error::msg("Payload not supported payload")), diff --git a/src/utils.rs b/src/utils.rs index 1d0b61c..4f4dfd7 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,4 +1,5 @@ use std::ffi::CStr; +use std::ptr; use anyhow::Error; use ffmpeg_sys_next::{av_buffer_allocz, av_make_error_string, AVBufferRef, memcpy}; @@ -53,9 +54,12 @@ pub fn video_variant_id_ref(var: &VideoVariant) -> *mut AVBufferRef { } } -pub fn id_ref_to_uuid(buf: *mut AVBufferRef) -> Uuid { +pub fn id_ref_to_uuid(buf: *mut AVBufferRef) -> Result { unsafe { + if buf == ptr::null_mut() { + return Err(Error::msg("Buffer was null")); + } let binding = Bytes::from(*((*buf).data as *const [u8; 16])); - Uuid::from_bytes_ref(&binding).clone() + Ok(Uuid::from_bytes_ref(&binding).clone()) } } diff --git a/src/variant.rs b/src/variant.rs index ed9b82d..6a857f3 100644 --- a/src/variant.rs +++ b/src/variant.rs @@ -1,5 +1,6 @@ use std::fmt::{Display, Formatter}; +use ffmpeg_sys_next::AVRational; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -9,10 +10,6 @@ pub enum VariantStream { Video(VideoVariant), /// Audio stream mapping Audio(AudioVariant), - /// Copy source stream (video) - CopyVideo(usize), - /// Copy source stream (audio) - CopyAudio(usize), } /// Information related to variant streams for a given egress @@ -107,12 +104,37 @@ impl Display for AudioVariant { } impl VariantStream { + pub fn id(&self) -> Uuid { + match self { + VariantStream::Video(v) => v.id, + VariantStream::Audio(v) => v.id, + } + } + pub fn src_index(&self) -> usize { match self { VariantStream::Video(v) => v.src_index, VariantStream::Audio(v) => v.src_index, - VariantStream::CopyVideo(v) => v.clone(), - VariantStream::CopyAudio(v) => v.clone(), + } + } + + pub fn dst_index(&self) -> usize { + match self { + VariantStream::Video(v) => v.dst_index, + VariantStream::Audio(v) => v.dst_index, + } + } + + pub fn time_base(&self) -> AVRational { + match &self { + VariantStream::Video(vv) => AVRational { + num: 1, + den: 90_000, + }, + VariantStream::Audio(va) => AVRational { + num: 1, + den: va.sample_rate as libc::c_int, + }, } } } diff --git a/src/webhook.rs b/src/webhook.rs index f9f3847..774bb7d 100644 --- a/src/webhook.rs +++ b/src/webhook.rs @@ -1,28 +1,30 @@ use std::fmt::Display; -use ffmpeg_sys_next::{AV_LEVEL_UNKNOWN, AV_PROFILE_H264_HIGH}; -use ffmpeg_sys_next::AVCodecID::{AV_CODEC_ID_AAC, AV_CODEC_ID_H264}; use uuid::Uuid; +use crate::demux::info::{DemuxStreamInfo, StreamChannelType}; use crate::ingress::ConnectionInfo; use crate::pipeline::{EgressType, HLSEgressConfig, PipelineConfig}; +use crate::settings::Settings; use crate::variant::{AudioVariant, VariantStream, VideoVariant}; #[derive(Clone)] pub struct Webhook { - url: String, + config: Settings, } impl Webhook { - pub fn new(url: String) -> Self { - Self { url } + pub fn new(config: Settings) -> Self { + Self { config } } - pub async fn start( - &self, - connection_info: ConnectionInfo, - ) -> Result { - let video_var = VideoVariant { + pub async fn start(&self, connection_info: ConnectionInfo) -> Result<(), anyhow::Error> { + Ok(()) + } + + pub fn configure(&self, stream_info: &DemuxStreamInfo) -> PipelineConfig { + let mut vars: Vec = vec![]; + vars.push(VariantStream::Video(VideoVariant { id: Uuid::new_v4(), src_index: 0, dst_index: 0, @@ -34,11 +36,11 @@ impl Webhook { profile: 100, level: 1, keyframe_interval: 2, - }; - let video_var_2 = VideoVariant { + })); + vars.push(VariantStream::Video(VideoVariant { id: Uuid::new_v4(), src_index: 0, - dst_index: 0, + dst_index: 1, width: 640, height: 360, fps: 30, @@ -47,41 +49,41 @@ impl Webhook { profile: 100, level: 1, keyframe_interval: 2, - }; - let audio_var = AudioVariant { - id: Uuid::new_v4(), - src_index: 1, - dst_index: 0, - bitrate: 320_000, - codec: 86018, - channels: 2, - sample_rate: 44_100, - sample_fmt: "fltp".to_owned(), - }; + })); + let has_audio = stream_info + .channels + .iter() + .any(|c| c.channel_type == StreamChannelType::Audio); + if has_audio { + vars.push(VariantStream::Audio(AudioVariant { + id: Uuid::new_v4(), + src_index: 1, + dst_index: 0, + bitrate: 320_000, + codec: 86018, + channels: 2, + sample_rate: 44_100, + sample_fmt: "fltp".to_owned(), + })); + vars.push(VariantStream::Audio(AudioVariant { + id: Uuid::new_v4(), + src_index: 1, + dst_index: 1, + bitrate: 220_000, + codec: 86018, + channels: 2, + sample_rate: 44_100, + sample_fmt: "fltp".to_owned(), + })); + } - let audio_var_2 = AudioVariant { - id: Uuid::new_v4(), - src_index: 1, - dst_index: 0, - bitrate: 220_000, - codec: 86018, - channels: 2, - sample_rate: 44_100, - sample_fmt: "fltp".to_owned(), - }; - - Ok(PipelineConfig { + PipelineConfig { id: Uuid::new_v4(), + recording: vec![], egress: vec![EgressType::HLS(HLSEgressConfig { - variants: vec![ - VariantStream::Video(video_var), - VariantStream::Video(video_var_2), - VariantStream::Audio(audio_var), - VariantStream::Audio(audio_var_2), - ], - stream_map: "v:0,a:0 v:1,a:1".to_owned(), + out_dir: self.config.output_dir.clone(), + variants: vars, })], - recording: vec![VariantStream::CopyVideo(0), VariantStream::CopyAudio(1)], - }) + } } }