From 6f618ef58f3dfbe1556e78c1daabe90eccd45a15 Mon Sep 17 00:00:00 2001 From: kieran Date: Wed, 13 Nov 2024 11:43:28 +0000 Subject: [PATCH] feat: upgrade to ffmpeg-rs-raw (WIP) --- Cargo.lock | 301 +++++++++++++++++++---------------- Cargo.toml | 34 ++-- src/decode/mod.rs | 129 --------------- src/demux/info.rs | 73 --------- src/demux/mod.rs | 194 ---------------------- src/egress/hls.rs | 294 ++++++---------------------------- src/egress/mod.rs | 30 +--- src/egress/recorder.rs | 176 ++++---------------- src/encode/audio.rs | 292 ---------------------------------- src/encode/mod.rs | 64 -------- src/encode/video.rs | 134 ---------------- src/ingress/file.rs | 53 ++---- src/ingress/mod.rs | 33 ++++ src/ingress/srt.rs | 40 ++--- src/ingress/tcp.rs | 64 ++------ src/ingress/test.rs | 231 ++++++++++++--------------- src/main.rs | 41 ++--- src/pipeline/builder.rs | 29 ---- src/pipeline/mod.rs | 108 ++----------- src/pipeline/runner.rs | 345 ++++++++++++++++++++-------------------- src/scale/mod.rs | 119 -------------- src/utils.rs | 12 -- src/variant/audio.rs | 88 ++-------- src/variant/mapping.rs | 8 +- src/variant/mod.rs | 28 +--- src/variant/video.rs | 130 +++++---------- src/webhook.rs | 34 ++-- 27 files changed, 712 insertions(+), 2372 deletions(-) delete mode 100644 src/decode/mod.rs delete mode 100644 src/demux/info.rs delete mode 100644 src/demux/mod.rs delete mode 100644 src/encode/audio.rs delete mode 100644 src/encode/mod.rs delete mode 100644 src/encode/video.rs delete mode 100644 src/pipeline/builder.rs delete mode 100644 src/scale/mod.rs delete mode 100644 src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 91b69d0..73c4585 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,9 +112,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.80" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" dependencies = [ "backtrace", ] @@ -133,9 +133,9 @@ checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236" [[package]] name = "arrayref" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a" +checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" [[package]] name = "arrayvec" @@ -189,22 +189,22 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bindgen" -version = "0.64.0" +version = "0.69.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.4.2", "cexpr", "clang-sys", + "itertools 0.12.1", "lazy_static", "lazycell", - "peeking_take_while", "proc-macro2", "quote", "regex", "rustc-hash", "shlex", - "syn 1.0.109", + "syn 2.0.52", ] [[package]] @@ -233,9 +233,9 @@ dependencies = [ [[package]] name = "bytemuck" -version = "1.17.1" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773d90827bc3feecfb67fab12e24de0749aad83c74b9504ecde46237b5cd24e2" +checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" [[package]] name = "byteorder" @@ -287,10 +287,20 @@ dependencies = [ ] [[package]] -name = "clang-sys" -version = "1.7.0" +name = "clang" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1" +checksum = "84c044c781163c001b913cd018fc95a628c50d0d2dfea8bca77dad71edb16e37" +dependencies = [ + "clang-sys", + "libc", +] + +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" dependencies = [ "glob", "libc", @@ -458,9 +468,9 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "data-url" @@ -509,9 +519,9 @@ checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" [[package]] name = "encoding_rs" -version = "0.8.33" +version = "0.8.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" dependencies = [ "cfg-if", ] @@ -537,32 +547,43 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "fdeflate" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f9bfee30e4dedf0ab8b422f03af778d9612b63f502710fc500a334ebe2de645" +checksum = "07c6f4c64c1d33a3111c4466f7365ebdcc37c5bd1ea0d62aae2e3d722aacbedb" dependencies = [ "simd-adler32", ] [[package]] -name = "ffmpeg-sys-next" -version = "6.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2529ad916d08c3562c754c21bc9b17a26c7882c0f5706cc2cd69472175f1620" +name = "ffmpeg-rs-raw" +version = "0.1.0" +source = "git+https://git.v0l.io/Kieran/ffmpeg-rs-raw.git?rev=07a2c728883dff15961edbe1f8c6aa5e65cef293#07a2c728883dff15961edbe1f8c6aa5e65cef293" +dependencies = [ + "anyhow", + "ffmpeg-sys-the-third", + "libc", + "log", + "slimbox", +] + +[[package]] +name = "ffmpeg-sys-the-third" +version = "2.1.0+ffmpeg-7.1" +source = "git+https://github.com/shssoichiro/ffmpeg-the-third.git?branch=master#0fdfa9ab506f5c92aad5a175db081c8a2c1579a1" dependencies = [ "bindgen", "cc", + "clang", "libc", - "num_cpus", "pkg-config", "vcpkg", ] [[package]] name = "flate2" -version = "1.0.33" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253" +checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" dependencies = [ "crc32fast", "miniz_oxide 0.8.0", @@ -591,9 +612,9 @@ dependencies = [ [[package]] name = "fontdb" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37be9fc20d966be438cd57a45767f73349477fb0f85ce86e000557f787298afb" +checksum = "a3a6f9af55fb97ad673fb7a69533eb2f967648a06fa21f8c9bb2cd6d33975716" dependencies = [ "fontconfig-parser", "log", @@ -744,16 +765,16 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbd2820c5e49886948654ab546d0688ff24530286bdcf8fca3cefb16d4618eb" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ "bytes", "fnv", "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap", "slab", "tokio", @@ -786,7 +807,7 @@ dependencies = [ "base64 0.21.7", "bytes", "headers-core", - "http", + "http 0.2.12", "httpdate", "mime", "sha1", @@ -798,7 +819,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" dependencies = [ - "http", + "http 0.2.12", ] [[package]] @@ -839,6 +860,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -846,15 +878,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", "pin-project-lite", ] [[package]] name = "httparse" -version = "1.8.0" +version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" [[package]] name = "httpdate" @@ -870,16 +902,16 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.28" +version = "0.14.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +checksum = "8c08302e8fa335b151b788c775ff56e7a03ae64ff85c548ee820fecb70356e85" dependencies = [ "bytes", "futures-channel", "futures-core", "futures-util", "h2", - "http", + "http 0.2.12", "http-body", "httparse", "httpdate", @@ -963,6 +995,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -991,9 +1032,9 @@ dependencies = [ [[package]] name = "kurbo" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e5aa9f0f96a938266bdb12928a67169e8d22c6a786fda8ed984b85e6ba93c3c" +checksum = "89234b2cc610a7dd927ebde6b41dd1a5d4214cffaef4cf1fb2195d592f92518f" dependencies = [ "arrayvec", "smallvec", @@ -1013,15 +1054,15 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.153" +version = "0.2.162" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" [[package]] name = "libloading" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" +checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", "windows-targets 0.52.4", @@ -1029,9 +1070,9 @@ dependencies = [ [[package]] name = "libm" -version = "0.2.8" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" [[package]] name = "linked-hash-map" @@ -1053,9 +1094,9 @@ checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" [[package]] name = "memmap2" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe751422e4a8caa417e13c3ea66452215d7d63e19e604f4980461212f3ae1322" +checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f" dependencies = [ "libc", ] @@ -1068,9 +1109,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "mime_guess" -version = "2.0.4" +version = "2.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" dependencies = [ "mime", "unicase", @@ -1089,7 +1130,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" dependencies = [ "adler", - "simd-adler32", ] [[package]] @@ -1099,6 +1139,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" dependencies = [ "adler2", + "simd-adler32", ] [[package]] @@ -1121,7 +1162,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http", + "http 0.2.12", "httparse", "log", "memchr", @@ -1199,12 +1240,6 @@ dependencies = [ "digest", ] -[[package]] -name = "peeking_take_while" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" - [[package]] name = "percent-encoding" version = "2.3.1" @@ -1264,18 +1299,18 @@ checksum = "5be167a7af36ee22fe3115051bc51f6e6c7054c9348e28deb4f49bd6f705a315" [[package]] name = "pin-project" -version = "1.1.5" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.5" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", @@ -1296,21 +1331,21 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" [[package]] name = "png" -version = "0.17.13" +version = "0.17.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06e4b0d3d1312775e782c86c91a111aa1f910cbb65e1337f9975b5f9a554b5e1" +checksum = "52f9d46a34a05a6a57566bc2bfae066ef07585a6e3fa30fbbdff5936380623f0" dependencies = [ "bitflags 1.3.2", "crc32fast", "fdeflate", "flate2", - "miniz_oxide 0.7.2", + "miniz_oxide 0.8.0", ] [[package]] @@ -1319,12 +1354,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "pretty-hex" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbc83ee4a840062f368f9096d80077a9841ec117e17e7f700df81958f1451254" - [[package]] name = "pretty_env_logger" version = "0.5.0" @@ -1420,9 +1449,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "resvg" -version = "0.43.0" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7314563c59c7ce31c18e23ad3dd092c37b928a0fa4e1c0a1a6504351ab411d1" +checksum = "4a325d5e8d1cebddd070b13f44cec8071594ab67d1012797c121f27a669b7958" dependencies = [ "gif", "image-webp", @@ -1437,9 +1466,9 @@ dependencies = [ [[package]] name = "rgb" -version = "0.8.48" +version = "0.8.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f86ae463694029097b846d8f99fd5536740602ae00022c0c50c5600720b2f71" +checksum = "57397d16646700483b67d2dd6511d79318f9d057fdbd21a4066aeac8b41d310a" dependencies = [ "bytemuck", ] @@ -1493,15 +1522,6 @@ dependencies = [ "semver", ] -[[package]] -name = "rustls-pemfile" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" -dependencies = [ - "base64 0.21.7", -] - [[package]] name = "rustybuzz" version = "0.18.0" @@ -1659,6 +1679,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "slimbox" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26dfcf7e4fe830e4b9245b9e0def30d3df9ea194aca707e9a78b079d2b646b1a" + [[package]] name = "slotmap" version = "1.0.7" @@ -1734,36 +1760,6 @@ dependencies = [ "tokio-stream", ] -[[package]] -name = "stream-core" -version = "0.1.0" -dependencies = [ - "anyhow", - "async-trait", - "bytes", - "clap", - "config", - "ffmpeg-sys-next", - "fontdue", - "futures-util", - "itertools", - "libc", - "log", - "pretty-hex", - "pretty_env_logger", - "rand", - "resvg", - "serde", - "srt-tokio", - "tiny-skia", - "tokio", - "tokio-stream", - "url", - "usvg", - "uuid", - "warp", -] - [[package]] name = "streaming-stats" version = "0.2.3" @@ -1953,9 +1949,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.20.1" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" dependencies = [ "futures-util", "log", @@ -2013,9 +2009,9 @@ dependencies = [ [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -2060,14 +2056,14 @@ dependencies = [ [[package]] name = "tungstenite" -version = "0.20.1" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 1.1.0", "httparse", "log", "rand", @@ -2091,12 +2087,9 @@ checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" [[package]] name = "unicase" -version = "2.7.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" -dependencies = [ - "version_check", -] +checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df" [[package]] name = "unicode-bidi" @@ -2133,15 +2126,15 @@ dependencies = [ [[package]] name = "unicode-properties" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ea75f83c0137a9b98608359a5f1af8144876eb67bcb1ce837368e906a9f524" +checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" [[package]] name = "unicode-script" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad8d71f5726e5f285a935e9fe8edfd53f0491eb6e9a5774097fdabee7cd8c9cd" +checksum = "9fb421b350c9aff471779e262955939f565ec18b86c15364e6bdf0d662ca7c1f" [[package]] name = "unicode-segmentation" @@ -2168,9 +2161,9 @@ dependencies = [ [[package]] name = "usvg" -version = "0.43.0" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6803057b5cbb426e9fb8ce2216f3a9b4ca1dd2c705ba3cbebc13006e437735fd" +checksum = "7447e703d7223b067607655e625e0dbca80822880248937da65966194c4864e6" dependencies = [ "base64 0.22.1", "data-url", @@ -2238,15 +2231,15 @@ dependencies = [ [[package]] name = "warp" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1e92e22e03ff1230c03a1a8ee37d2f89cd489e2e541b7550d6afad96faed169" +checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" dependencies = [ "bytes", "futures-channel", "futures-util", "headers", - "http", + "http 0.2.12", "hyper", "log", "mime", @@ -2254,13 +2247,11 @@ dependencies = [ "multer", "percent-encoding", "pin-project", - "rustls-pemfile", "scoped-tls", "serde", "serde_json", "serde_urlencoded", "tokio", - "tokio-stream", "tokio-tungstenite", "tokio-util", "tower-service", @@ -2466,6 +2457,34 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "zap-stream-core" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "clap", + "config", + "ffmpeg-rs-raw", + "fontdue", + "futures-util", + "itertools 0.13.0", + "libc", + "log", + "pretty_env_logger", + "rand", + "resvg", + "serde", + "srt-tokio", + "tiny-skia", + "tokio", + "tokio-stream", + "url", + "usvg", + "uuid", + "warp", +] + [[package]] name = "zerocopy" version = "0.7.35" diff --git a/Cargo.toml b/Cargo.toml index bae9100..50f575d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,30 +1,36 @@ [package] -name = "stream-core" +name = "zap-stream-core" version = "0.1.0" edition = "2021" +[features] +default = ["test-source"] +srt = ["dep:srt-tokio"] +test-source = ["dep:resvg", "dep:usvg", "dep:tiny-skia", "dep:fontdue"] + [dependencies] -srt-tokio = "0.4.3" -tokio = { version = "1.36.0", features = ["rt-multi-thread", "sync"] } -anyhow = { version = "1.0.80", features = ["backtrace"] } +ffmpeg-rs-raw = { git = "https://git.v0l.io/Kieran/ffmpeg-rs-raw.git", rev = "07a2c728883dff15961edbe1f8c6aa5e65cef293" } + +tokio = { version = "1.36.0", features = ["rt", "rt-multi-thread", "macros"] } +anyhow = { version = "^1.0.91", features = ["backtrace"] } pretty_env_logger = "0.5.0" -bytes = "1.5.0" + tokio-stream = "0.1.14" futures-util = "0.3.30" async-trait = "0.1.77" log = "0.4.21" -ffmpeg-sys-next = { version = "6.1.0", features = ["avformat", "avcodec", "swscale", "avfilter"] } -libc = "0.2.153" -pretty-hex = "0.4.1" uuid = { version = "1.8.0", features = ["v4", "serde"] } serde = { version = "1.0.197", features = ["derive"] } config = { version = "0.14.0", features = ["toml"] } url = "2.5.0" -itertools = "0.12.1" -warp = "0.3.6" +itertools = "0.13.0" rand = "0.8.5" -resvg = "0.43.0" -usvg = "0.43.0" -tiny-skia = "0.11.4" -fontdue = "0.9.2" clap = { version = "4.5.16", features = ["derive"] } +warp = "0.3.7" + +srt-tokio = { version = "0.4.3", optional = true } +resvg = { version = "0.44.0", optional = true } +usvg = { version = "0.44.0", optional = true } +tiny-skia = { version = "0.11.4", optional = true } +fontdue = { version = "0.9.2", optional = true } +libc = "0.2.162" diff --git a/src/decode/mod.rs b/src/decode/mod.rs deleted file mode 100644 index 1a6c9f7..0000000 --- a/src/decode/mod.rs +++ /dev/null @@ -1,129 +0,0 @@ -use std::collections::HashMap; -use std::ffi::CStr; -use std::ptr; - -use anyhow::Error; -use ffmpeg_sys_next::AVPictureType::AV_PICTURE_TYPE_NONE; -use ffmpeg_sys_next::{ - av_frame_alloc, avcodec_alloc_context3, avcodec_find_decoder, avcodec_free_context, - avcodec_get_name, avcodec_open2, avcodec_parameters_to_context, avcodec_receive_frame, - avcodec_send_packet, AVCodec, AVCodecContext, AVPacket, AVERROR, AVERROR_EOF, -}; - -use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload}; - -struct CodecContext { - pub context: *mut AVCodecContext, - pub codec: *const AVCodec, -} - -impl Drop for CodecContext { - fn drop(&mut self) { - unsafe { - avcodec_free_context(&mut self.context); - self.codec = ptr::null_mut(); - self.context = ptr::null_mut(); - } - } -} - -pub struct Decoder { - codecs: HashMap, - pts: i64, -} - -unsafe impl Send for Decoder {} - -unsafe impl Sync for Decoder {} - -impl Decoder { - pub fn new() -> Self { - Self { - codecs: HashMap::new(), - pts: 0, - } - } - - pub unsafe fn decode_pkt( - &mut self, - pkt: *mut AVPacket, - src: &AVPacketSource, - ) -> Result, Error> { - let stream_index = (*pkt).stream_index; - let stream = match src { - AVPacketSource::Demuxer(s) => *s, - _ => return Err(Error::msg(format!("Cannot decode packet from: {:?}", src))), - }; - - assert_eq!( - stream_index, - (*stream).index, - "Passed stream reference does not match stream_index of packet" - ); - - let codec_par = (*stream).codecpar; - assert_ne!( - codec_par, - ptr::null_mut(), - "Codec parameters are missing from stream" - ); - - if let std::collections::hash_map::Entry::Vacant(e) = self.codecs.entry(stream_index) { - let codec = avcodec_find_decoder((*codec_par).codec_id); - if codec.is_null() { - return Err(Error::msg(format!( - "Failed to find codec: {}", - CStr::from_ptr(avcodec_get_name((*codec_par).codec_id)) - .to_str()? - ))); - } - let context = avcodec_alloc_context3(ptr::null()); - if context.is_null() { - return Err(Error::msg("Failed to alloc context")); - } - if avcodec_parameters_to_context(context, (*stream).codecpar) != 0 { - return Err(Error::msg("Failed to copy codec parameters to context")); - } - (*context).pkt_timebase = (*stream).time_base; - if avcodec_open2(context, codec, ptr::null_mut()) < 0 { - return Err(Error::msg("Failed to open codec")); - } - e.insert(CodecContext { context, codec }); - } - if let Some(ctx) = self.codecs.get_mut(&stream_index) { - let mut ret = avcodec_send_packet(ctx.context, pkt); - if ret < 0 { - return Err(Error::msg(format!("Failed to decode packet {}", ret))); - } - - let mut pkgs = Vec::new(); - while ret >= 0 { - let frame = av_frame_alloc(); - ret = avcodec_receive_frame(ctx.context, frame); - if ret < 0 { - if ret == AVERROR_EOF || ret == AVERROR(libc::EAGAIN) { - break; - } - return Err(Error::msg(format!("Failed to decode {}", ret))); - } - - (*frame).pict_type = AV_PICTURE_TYPE_NONE; // encoder prints warnings - pkgs.push(PipelinePayload::AvFrame( - frame, - AVFrameSource::Decoder(stream), - )); - } - Ok(pkgs) - } else { - Ok(vec![]) - } - } - - pub fn process(&mut self, pkg: PipelinePayload) -> Result, Error> { - if let PipelinePayload::AvPacket(pkt, ref src) = pkg { - unsafe { self.decode_pkt(pkt, src) } - } else { - Err(Error::msg("Payload not supported")) - } - } -} diff --git a/src/demux/info.rs b/src/demux/info.rs deleted file mode 100644 index 7fd3838..0000000 --- a/src/demux/info.rs +++ /dev/null @@ -1,73 +0,0 @@ -use crate::fraction::Fraction; -use ffmpeg_sys_next::AVFormatContext; -use std::fmt::{Display, Formatter}; - -#[derive(Clone, Debug, PartialEq)] -pub struct DemuxerInfo { - pub channels: Vec, - pub ctx: *const AVFormatContext, -} - -unsafe impl Send for DemuxerInfo {} -unsafe impl Sync for DemuxerInfo {} - -impl Display for DemuxerInfo { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "Demuxer Info:")?; - for c in &self.channels { - write!(f, "\n{}", c)?; - } - Ok(()) - } -} - -#[derive(Clone, Debug, PartialEq)] -pub enum StreamChannelType { - Video, - Audio, -} - -impl Display for StreamChannelType { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}", - match self { - StreamChannelType::Video => "video", - StreamChannelType::Audio => "audio", - } - ) - } -} - -#[derive(Clone, Debug, PartialEq)] -pub struct StreamInfoChannel { - pub index: usize, - pub channel_type: StreamChannelType, - pub width: usize, - pub height: usize, - pub fps: f32, - pub format: usize, -} - -impl TryInto for StreamInfoChannel { - type Error = (); - - fn try_into(self) -> Result { - if self.channel_type == StreamChannelType::Video { - Ok(Fraction::from((self.width, self.height))) - } else { - Err(()) - } - } -} - -impl Display for StreamInfoChannel { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{} #{}: size={}x{},fps={}", - self.channel_type, self.index, self.width, self.height, self.fps - ) - } -} diff --git a/src/demux/mod.rs b/src/demux/mod.rs deleted file mode 100644 index 033a36c..0000000 --- a/src/demux/mod.rs +++ /dev/null @@ -1,194 +0,0 @@ -use std::ptr; -use std::time::Duration; - -use anyhow::Error; -use bytes::{BufMut, Bytes}; -use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO}; -use ffmpeg_sys_next::*; -use tokio::sync::mpsc::error::TryRecvError; -use tokio::sync::mpsc::UnboundedReceiver; -use tokio::time::Instant; - -use crate::demux::info::{DemuxerInfo, StreamChannelType, StreamInfoChannel}; -use crate::pipeline::{AVPacketSource, PipelinePayload}; -use crate::return_ffmpeg_error; -use crate::utils::get_ffmpeg_error_msg; - -pub mod info; - -pub(crate) struct Demuxer { - ctx: *mut AVFormatContext, - started: Instant, - state: DemuxerBuffer, - info: Option, -} - -unsafe impl Send for Demuxer {} - -unsafe impl Sync for Demuxer {} - -struct DemuxerBuffer { - pub chan_in: UnboundedReceiver, - pub buffer: bytes::BytesMut, -} - -unsafe extern "C" fn read_data( - opaque: *mut libc::c_void, - buffer: *mut libc::c_uchar, - size: libc::c_int, -) -> libc::c_int { - let state = opaque as *mut DemuxerBuffer; - loop { - match (*state).chan_in.try_recv() { - Ok(data) => { - if !data.is_empty() { - (*state).buffer.put(data); - } - if (*state).buffer.len() >= size as usize { - let buf_take = (*state).buffer.split_to(size as usize); - memcpy( - buffer as *mut libc::c_void, - buf_take.as_ptr() as *const libc::c_void, - buf_take.len() as libc::c_ulonglong, - ); - return size; - } else { - continue; - } - } - Err(e) => match e { - TryRecvError::Empty => {} - TryRecvError::Disconnected => { - return AVERROR_EOF; - } - }, - } - } -} - -impl Demuxer { - pub fn new(chan_in: UnboundedReceiver) -> Self { - unsafe { - let ps = avformat_alloc_context(); - (*ps).flags |= AVFMT_FLAG_CUSTOM_IO; - - Self { - ctx: ps, - state: DemuxerBuffer { - chan_in, - buffer: bytes::BytesMut::new(), - }, - info: None, - started: Instant::now(), - } - } - } - - unsafe fn probe_input(&mut self) -> Result { - const BUFFER_SIZE: usize = 4096; - let buf_ptr = ptr::from_mut(&mut self.state) as *mut libc::c_void; - let pb = avio_alloc_context( - av_mallocz(BUFFER_SIZE) as *mut libc::c_uchar, - BUFFER_SIZE as libc::c_int, - 0, - buf_ptr, - Some(read_data), - None, - None, - ); - - (*self.ctx).pb = pb; - let ret = avformat_open_input( - &mut self.ctx, - ptr::null_mut(), - ptr::null_mut(), - ptr::null_mut(), - ); - return_ffmpeg_error!(ret); - - if avformat_find_stream_info(self.ctx, ptr::null_mut()) < 0 { - return Err(Error::msg("Could not find stream info")); - } - av_dump_format(self.ctx, 0, ptr::null_mut(), 0); - - let mut channel_infos = vec![]; - let video_stream_index = - av_find_best_stream(self.ctx, AVMEDIA_TYPE_VIDEO, -1, -1, ptr::null_mut(), 0) as usize; - if video_stream_index != AVERROR_STREAM_NOT_FOUND as usize { - let video_stream = *(*self.ctx).streams.add(video_stream_index); - channel_infos.push(StreamInfoChannel { - index: video_stream_index, - channel_type: StreamChannelType::Video, - width: (*(*video_stream).codecpar).width as usize, - height: (*(*video_stream).codecpar).height as usize, - fps: av_q2d((*video_stream).avg_frame_rate) as f32, - format: (*(*video_stream).codecpar).format as usize - }); - } - - let audio_stream_index = - av_find_best_stream(self.ctx, AVMEDIA_TYPE_AUDIO, -1, -1, ptr::null_mut(), 0) as usize; - if audio_stream_index != AVERROR_STREAM_NOT_FOUND as usize { - let audio_stream = *(*self.ctx).streams.add(audio_stream_index); - let _codec_copy = unsafe { - let ptr = avcodec_parameters_alloc(); - avcodec_parameters_copy(ptr, (*audio_stream).codecpar); - ptr - }; - channel_infos.push(StreamInfoChannel { - index: audio_stream_index, - channel_type: StreamChannelType::Audio, - width: (*(*audio_stream).codecpar).width as usize, - height: (*(*audio_stream).codecpar).height as usize, - fps: 0.0, - format: (*(*audio_stream).codecpar).format as usize - }); - } - - let info = DemuxerInfo { - channels: channel_infos, - ctx: self.ctx, - }; - Ok(info) - } - - pub unsafe fn get_packet(&mut self) -> Result { - let pkt: *mut AVPacket = av_packet_alloc(); - let ret = av_read_frame(self.ctx, pkt); - if ret == AVERROR_EOF { - return Ok(PipelinePayload::Flush); - } - if ret < 0 { - let msg = get_ffmpeg_error_msg(ret); - return Err(Error::msg(msg)); - } - let stream = *(*self.ctx).streams.add((*pkt).stream_index as usize); - let pkg = PipelinePayload::AvPacket(pkt, AVPacketSource::Demuxer(stream)); - Ok(pkg) - } - - /// Try probe input stream - pub fn try_probe(&mut self) -> Result, Error> { - match &self.info { - None => { - if (Instant::now() - self.started) > Duration::from_millis(500) { - let inf = unsafe { self.probe_input()? }; - self.info = Some(inf.clone()); - Ok(Some(inf)) - } else { - Ok(None) - } - } - Some(i) => Ok(Some(i.clone())), - } - } -} - -impl Drop for Demuxer { - fn drop(&mut self) { - unsafe { - avformat_free_context(self.ctx); - self.ctx = ptr::null_mut(); - } - } -} diff --git a/src/egress/hls.rs b/src/egress/hls.rs index 44debc9..320f365 100644 --- a/src/egress/hls.rs +++ b/src/egress/hls.rs @@ -1,45 +1,21 @@ -use std::collections::{HashSet, VecDeque}; +use std::collections::HashMap; use std::fmt::Display; -use std::ptr; +use std::path::PathBuf; -use anyhow::Error; -use ffmpeg_sys_next::{ - av_dump_format, av_interleaved_write_frame, av_opt_set, av_packet_rescale_ts, - avcodec_parameters_copy, avcodec_parameters_from_context, avformat_alloc_output_context2, - avformat_free_context, avformat_write_header, AVFormatContext, AVPacket, AVStream, -}; +use anyhow::Result; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; +use ffmpeg_rs_raw::{Encoder, Muxer}; use itertools::Itertools; use log::info; use uuid::Uuid; -use crate::egress::{map_variants_to_streams, EgressConfig}; -use crate::encode::dump_pkt_info; -use crate::pipeline::{AVPacketSource, PipelinePayload, PipelineProcessor}; -use crate::return_ffmpeg_error; -use crate::utils::get_ffmpeg_error_msg; -use crate::variant::{find_stream, StreamMapping, VariantStream}; +use crate::egress::{Egress, EgressConfig}; +use crate::variant::{StreamMapping, VariantStream}; pub struct HlsEgress { id: Uuid, config: EgressConfig, - variants: Vec, - ctx: *mut AVFormatContext, - stream_init: HashSet, - init: bool, - packet_buffer: VecDeque, -} - -unsafe impl Send for HlsEgress {} - -unsafe impl Sync for HlsEgress {} - -impl Drop for HlsEgress { - fn drop(&mut self) { - unsafe { - avformat_free_context(self.ctx); - self.ctx = ptr::null_mut(); - } - } + muxer: Muxer, } enum HlsMapEntry { @@ -70,110 +46,51 @@ impl Display for HlsStream { } impl HlsEgress { - pub fn new(id: Uuid, config: EgressConfig, variants: Vec) -> Self { - let filtered_vars: Vec = config - .variants - .iter() - .filter_map(|x| variants.iter().find(|y| y.id() == *x)) - .cloned() - .collect(); + pub fn new<'a>( + config: EgressConfig, + encoded: impl Iterator, + ) -> Result { + let id = Uuid::new_v4(); + let base = PathBuf::from(&config.out_dir).join(id.to_string()); - Self { - id, - config, - variants: filtered_vars, - ctx: ptr::null_mut(), - init: false, - stream_init: HashSet::new(), - packet_buffer: VecDeque::new(), - } - } + let mut opts = HashMap::new(); + opts.insert( + "hls_segment_filename".to_string(), + base.join("%v/live.m3u8").to_string_lossy().to_string(), + ); + opts.insert("master_pl_name".to_string(), "live.m3u8".to_string()); + opts.insert("master_pl_publish_rate".to_string(), "10".to_string()); + opts.insert("hls_time".to_string(), "2".to_string()); + opts.insert("hls_flags".to_string(), "delete_segments".to_string()); - pub(crate) fn setup_muxer(&mut self) -> Result<(), Error> { - unsafe { - let mut ctx = ptr::null_mut(); - - let base = format!("{}/{}", self.config.out_dir, self.id); - - let ret = avformat_alloc_output_context2( - &mut ctx, - ptr::null(), - "hls\0".as_ptr() as *const libc::c_char, - format!("{}/%v/live.m3u8\0", base).as_ptr() as *const libc::c_char, - ); - return_ffmpeg_error!(ret); - av_opt_set( - (*ctx).priv_data, - "hls_segment_filename\0".as_ptr() as *const libc::c_char, - format!("{}/%v/%05d.ts\0", base).as_ptr() as *const libc::c_char, - 0, - ); - - av_opt_set( - (*ctx).priv_data, - "master_pl_name\0".as_ptr() as *const libc::c_char, - "live.m3u8\0".as_ptr() as *const libc::c_char, - 0, - ); - - av_opt_set( - (*ctx).priv_data, - "master_pl_publish_rate\0".as_ptr() as *const libc::c_char, - "10\0".as_ptr() as *const libc::c_char, - 0, - ); - - if let Some(first_video_track) = self.variants.iter().find_map(|v| { - if let VariantStream::Video(vv) = v { - Some(vv) - } else { - None - } - }) { - av_opt_set( - (*ctx).priv_data, - "hls_time\0".as_ptr() as *const libc::c_char, - format!( - "{}\0", - first_video_track.keyframe_interval / first_video_track.fps - ) - .as_ptr() as *const libc::c_char, - 0, - ); + let muxer = unsafe { + let mut m = Muxer::new().with_output(&base, Some("hls"), Some(opts))?; + for e in encoded { + m.add_stream_encoder(e)?; } + m.open()?; + m + }; - av_opt_set( - (*ctx).priv_data, - "hls_flags\0".as_ptr() as *const libc::c_char, - "delete_segments\0".as_ptr() as *const libc::c_char, - 0, - ); - - map_variants_to_streams(ctx, &self.variants)?; - self.ctx = ctx; - Ok(()) - } + Ok(Self { id, config, muxer }) } - unsafe fn setup_hls_mapping(&mut self) -> Result<(), Error> { - if self.ctx.is_null() { - return Err(Error::msg("Context not setup")); - } - + unsafe fn setup_hls_mapping<'a>( + variants: impl Iterator, + ) -> Result { // configure mapping let mut stream_map = Vec::new(); - for (g, vars) in &self - .variants - .iter() + for (g, vars) in &variants .sorted_by(|a, b| a.group_id().cmp(&b.group_id())) .group_by(|x| x.group_id()) { - let mut group = HlsStream { + let group = HlsStream { name: format!("stream_{}", g), entries: Vec::new(), }; for var in vars { - let n = Self::get_as_nth_stream_type(self.ctx, var); + todo!("get nth stream"); + let n = 0; match var { VariantStream::Video(_) => group.entries.push(HlsMapEntry::Video(n)), VariantStream::Audio(_) => group.entries.push(HlsMapEntry::Audio(n)), @@ -187,137 +104,16 @@ impl HlsEgress { info!("map_str={}", stream_map); - av_opt_set( - (*self.ctx).priv_data, - "var_stream_map\0".as_ptr() as *const libc::c_char, - format!("{}\0", stream_map).as_ptr() as *const libc::c_char, - 0, - ); - - av_dump_format(self.ctx, 0, ptr::null(), 1); - Ok(()) - } - unsafe fn process_av_packet_internal( - &mut self, - pkt: *mut AVPacket, - src: &AVPacketSource, - ) -> Result<(), Error> { - let variant = match src { - AVPacketSource::Encoder(v) => find_stream(&self.variants, v)?, - AVPacketSource::Demuxer(v) => { - let var = self - .variants - .iter() - .find(|x| x.src_index() == (*(*v)).index as usize) - .ok_or(Error::msg("Demuxer packet didn't match any variant"))?; - let dst_stream = Self::get_dst_stream(self.ctx, var.dst_index()); - av_packet_rescale_ts(pkt, (*(*v)).time_base, (*dst_stream).time_base); - var - } - }; - (*pkt).stream_index = variant.dst_index() as libc::c_int; - - let ret = av_interleaved_write_frame(self.ctx, pkt); - return_ffmpeg_error!(ret); - Ok(()) - } - - fn process_payload_internal(&mut self, pkg: PipelinePayload) -> Result<(), Error> { - if let PipelinePayload::AvPacket(p, ref s) = pkg { - unsafe { - self.process_av_packet_internal(p, s)?; - } - } - Ok(()) - } - - unsafe fn process_payload(&mut self, pkg: PipelinePayload) -> Result<(), Error> { - if !self.init && self.stream_init.len() == self.config.variants.len() { - self.setup_hls_mapping()?; - - let ret = avformat_write_header(self.ctx, ptr::null_mut()); - return_ffmpeg_error!(ret); - - self.init = true; - // dequeue buffer - while let Some(pkt) = self.packet_buffer.pop_front() { - self.process_payload_internal(pkt)?; - } - return Ok(()); - } else if !self.init { - self.packet_buffer.push_back(pkg); - return Ok(()); - } - - self.process_payload_internal(pkg) - } - - unsafe fn get_dst_stream(ctx: *const AVFormatContext, idx: usize) -> *mut AVStream { - for x in 0..(*ctx).nb_streams { - let stream = *(*ctx).streams.add(x as usize); - if (*stream).index as usize == idx { - return stream; - } - } - panic!("Stream index not found in output") - } - - unsafe fn get_as_nth_stream_type(ctx: *const AVFormatContext, var: &VariantStream) -> usize { - let stream = Self::get_dst_stream(ctx, var.dst_index()); - let mut ctr = 0; - for x in 0..(*ctx).nb_streams { - let stream_x = *(*ctx).streams.add(x as usize); - if (*(*stream).codecpar).codec_type == (*(*stream_x).codecpar).codec_type { - if (*stream_x).index == (*stream).index { - break; - } - ctr += 1; - } - } - ctr + Ok(stream_map) } } -impl PipelineProcessor for HlsEgress { - fn process(&mut self, pkg: PipelinePayload) -> Result, Error> { - match pkg { - PipelinePayload::AvPacket(_, _) => unsafe { - self.process_payload(pkg)?; - }, - PipelinePayload::SourceInfo(ref d) => unsafe { - for var in &self.variants { - match var { - VariantStream::CopyVideo(cv) => { - let src = *(*d.ctx).streams.add(cv.src_index); - let dst = Self::get_dst_stream(self.ctx, cv.dst_index); - let ret = avcodec_parameters_copy((*dst).codecpar, (*src).codecpar); - return_ffmpeg_error!(ret); - self.stream_init.insert(var.id()); - } - VariantStream::CopyAudio(ca) => { - let src = *(*d.ctx).streams.add(ca.src_index); - let dst = Self::get_dst_stream(self.ctx, ca.dst_index); - let ret = avcodec_parameters_copy((*dst).codecpar, (*src).codecpar); - return_ffmpeg_error!(ret); - self.stream_init.insert(var.id()); - } - _ => {} - } - } - }, - PipelinePayload::EncoderInfo(ref var, ctx) => unsafe { - if let Some(my_var) = self.variants.iter().find(|x| x.id() == *var) { - if !self.stream_init.contains(var) { - let out_stream = Self::get_dst_stream(self.ctx, my_var.dst_index()); - avcodec_parameters_from_context((*out_stream).codecpar, ctx); - self.stream_init.insert(*var); - } - } - }, - _ => return Err(Error::msg(format!("Payload not supported: {:?}", pkg))), +impl Egress for HlsEgress { + unsafe fn process_pkt(&mut self, packet: *mut AVPacket, variant: &Uuid) -> Result<()> { + if self.config.variants.contains(variant) { + self.muxer.write_packet(packet) + } else { + Ok(()) } - - // Muxer never returns anything - Ok(vec![]) } } diff --git a/src/egress/mod.rs b/src/egress/mod.rs index 0bff404..9ed1957 100644 --- a/src/egress/mod.rs +++ b/src/egress/mod.rs @@ -1,10 +1,8 @@ -use std::fmt::{Display, Formatter}; -use std::ptr; - -use crate::variant::{StreamMapping, VariantStream}; -use anyhow::Error; -use ffmpeg_sys_next::{avformat_new_stream, AVFormatContext}; +use anyhow::Result; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::fmt::{Display, Formatter}; use uuid::Uuid; pub mod hls; @@ -16,7 +14,7 @@ pub struct EgressConfig { pub name: String, pub out_dir: String, /// Which variants will be used in this muxer - pub variants: Vec, + pub variants: HashSet, } impl Display for EgressConfig { @@ -32,20 +30,6 @@ impl Display for EgressConfig { } } -pub unsafe fn map_variants_to_streams( - ctx: *mut AVFormatContext, - variants: &Vec, -) -> Result<(), Error> { - for var in variants { - let stream = avformat_new_stream(ctx, ptr::null()); - if stream.is_null() { - return Err(Error::msg("Failed to add stream to output")); - } - - // replace stream index value with variant dst_index - (*stream).index = var.dst_index() as libc::c_int; - - var.to_stream(stream); - } - Ok(()) +pub trait Egress { + unsafe fn process_pkt(&mut self, packet: *mut AVPacket, variant: &Uuid) -> Result<()>; } diff --git a/src/egress/recorder.rs b/src/egress/recorder.rs index 6b95f76..af43557 100644 --- a/src/egress/recorder.rs +++ b/src/egress/recorder.rs @@ -1,160 +1,54 @@ -use std::collections::{HashSet, VecDeque}; -use std::{fs, ptr}; - -use anyhow::Error; -use ffmpeg_sys_next::{ - av_dump_format, av_interleaved_write_frame, av_opt_set, avformat_alloc_output_context2, - avformat_free_context, avio_open2, AVFormatContext, AVPacket, AVIO_FLAG_WRITE, -}; -use ffmpeg_sys_next::{ - avcodec_parameters_from_context, avformat_write_header, AVFMT_GLOBALHEADER, - AV_CODEC_FLAG_GLOBAL_HEADER, -}; -use log::info; +use anyhow::Result; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPacket; +use ffmpeg_rs_raw::{Encoder, Muxer}; +use std::collections::HashMap; +use std::fs; +use std::path::PathBuf; use uuid::Uuid; -use crate::egress::{map_variants_to_streams, EgressConfig}; -use crate::pipeline::{PipelinePayload, PipelineProcessor}; -use crate::return_ffmpeg_error; -use crate::utils::get_ffmpeg_error_msg; -use crate::variant::{find_stream, StreamMapping, VariantStream}; +use crate::egress::{Egress, EgressConfig}; pub struct RecorderEgress { id: Uuid, config: EgressConfig, - variants: Vec, - ctx: *mut AVFormatContext, - stream_init: HashSet, - init: bool, - packet_buffer: VecDeque, -} - -unsafe impl Send for RecorderEgress {} - -unsafe impl Sync for RecorderEgress {} - -impl Drop for RecorderEgress { - fn drop(&mut self) { - unsafe { - avformat_free_context(self.ctx); - self.ctx = ptr::null_mut(); - } - } + muxer: Muxer, } impl RecorderEgress { - pub fn new(id: Uuid, config: EgressConfig, variants: Vec) -> Self { - let filtered_vars: Vec = config - .variants - .iter() - .filter_map(|x| variants.iter().find(|y| y.id() == *x)) - .cloned() - .collect(); - Self { - id, - config, - variants: filtered_vars, - ctx: ptr::null_mut(), - stream_init: HashSet::new(), - init: false, - packet_buffer: VecDeque::new(), - } - } + pub fn new<'a>( + config: EgressConfig, + variants: impl Iterator, + ) -> Result { + let id = Uuid::new_v4(); + let base = PathBuf::from(&config.out_dir).join(id.to_string()); - unsafe fn setup_muxer(&mut self) -> Result<(), Error> { - let base = format!("{}/{}", self.config.out_dir, self.id); + let out_file = base.join("recording.mp4"); + fs::create_dir_all(&base)?; - let out_file = format!("{}/recording.mp4\0", base); - fs::create_dir_all(base.clone())?; - - let mut ctx = ptr::null_mut(); - let ret = avformat_alloc_output_context2( - &mut ctx, - ptr::null_mut(), - ptr::null_mut(), - out_file.as_ptr() as *const libc::c_char, + let mut opt = HashMap::new(); + opt.insert( + "movflags".to_string(), + "+dash+delay_moov+skip_sidx+skip_trailer".to_string(), ); - return_ffmpeg_error!(ret); - map_variants_to_streams(ctx, &self.variants)?; - if (*(*ctx).oformat).flags & AVFMT_GLOBALHEADER != 0 { - (*ctx).flags |= AV_CODEC_FLAG_GLOBAL_HEADER as libc::c_int; - } - av_opt_set( - (*ctx).priv_data, - "movflags\0".as_ptr() as *const libc::c_char, - "+dash+delay_moov+skip_sidx+skip_trailer\0".as_ptr() as *const libc::c_char, - 0, - ); - self.ctx = ctx; - Ok(()) + let muxer = unsafe { + let mut m = Muxer::new().with_output(&out_file, None, Some(opt))?; + for var in variants { + m.add_stream_encoder(var)?; + } + m.open()?; + m + }; + Ok(Self { id, config, muxer }) } +} - unsafe fn open_muxer(&mut self) -> Result { - if !self.init && self.stream_init.len() == self.config.variants.len() { - let ret = avio_open2( - &mut (*self.ctx).pb, - (*self.ctx).url, - AVIO_FLAG_WRITE, - ptr::null_mut(), - ptr::null_mut(), - ); - return_ffmpeg_error!(ret); - - av_dump_format(self.ctx, 0, ptr::null(), 1); - let ret = avformat_write_header(self.ctx, ptr::null_mut()); - return_ffmpeg_error!(ret); - - self.init = true; - Ok(true) +impl Egress for RecorderEgress { + unsafe fn process_pkt(&mut self, packet: *mut AVPacket, variant: &Uuid) -> Result<()> { + if self.config.variants.contains(variant) { + self.muxer.write_packet(packet) } else { - Ok(self.init) + Ok(()) } } - - unsafe fn process_pkt(&mut self, pkt: *mut AVPacket) -> Result<(), Error> { - //dump_pkt_info(pkt); - let ret = av_interleaved_write_frame(self.ctx, pkt); - return_ffmpeg_error!(ret); - Ok(()) - } -} - -impl PipelineProcessor for RecorderEgress { - fn process(&mut self, pkg: PipelinePayload) -> Result, Error> { - match pkg { - PipelinePayload::AvPacket(pkt, ref src) => unsafe { - if self.open_muxer()? { - while let Some(pkt) = self.packet_buffer.pop_front() { - match pkt { - PipelinePayload::AvPacket(pkt, ref src) => { - self.process_pkt(pkt)?; - } - _ => return Err(Error::msg("")), - } - } - self.process_pkt(pkt)?; - } else { - self.packet_buffer.push_back(pkg); - } - }, - PipelinePayload::EncoderInfo(ref var, ctx) => unsafe { - if self.ctx.is_null() { - self.setup_muxer()?; - } - if !self.stream_init.contains(var) { - let my_var = find_stream(&self.variants, var)?; - let out_stream = *(*self.ctx).streams.add(my_var.dst_index()); - avcodec_parameters_from_context((*out_stream).codecpar, ctx); - (*(*out_stream).codecpar).codec_tag = 0; - - self.stream_init.insert(*var); - info!("Setup encoder info: {}", my_var); - } - }, - _ => return Err(Error::msg("Payload not supported")), - } - // Muxer never returns anything - Ok(vec![]) - } } diff --git a/src/encode/audio.rs b/src/encode/audio.rs deleted file mode 100644 index c92952a..0000000 --- a/src/encode/audio.rs +++ /dev/null @@ -1,292 +0,0 @@ -use std::ffi::CStr; -use std::mem::transmute; -use std::ptr; - -use anyhow::Error; -use ffmpeg_sys_next::{ - av_audio_fifo_alloc, av_audio_fifo_free, av_audio_fifo_read, av_audio_fifo_size, - av_audio_fifo_write, av_channel_layout_copy, av_frame_alloc, av_frame_free, - av_get_sample_fmt_name, av_packet_alloc, av_packet_free, av_packet_rescale_ts, - av_samples_alloc_array_and_samples, AVAudioFifo, AVCodec, - avcodec_alloc_context3, avcodec_free_context, avcodec_open2, avcodec_receive_packet, - avcodec_send_frame, AVCodecContext, AVERROR, AVFrame, AVRational, swr_alloc_set_opts2, swr_convert_frame, - swr_free, swr_init, SwrContext, -}; -use libc::EAGAIN; -use log::info; - -use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload, PipelineProcessor}; -use crate::return_ffmpeg_error; -use crate::utils::get_ffmpeg_error_msg; -use crate::variant::{EncodedStream, StreamMapping}; -use crate::variant::audio::AudioVariant; - -pub struct AudioEncoder { - variant: AudioVariant, - ctx: *mut AVCodecContext, - codec: *const AVCodec, - swr_ctx: *mut SwrContext, - fifo: *mut AVAudioFifo, - pts: i64, -} - -unsafe impl Send for AudioEncoder {} - -unsafe impl Sync for AudioEncoder {} - -impl Drop for AudioEncoder { - fn drop(&mut self) { - unsafe { - swr_free(&mut self.swr_ctx); - av_audio_fifo_free(self.fifo); - avcodec_free_context(&mut self.ctx); - } - } -} - -impl AudioEncoder { - pub fn new(variant: AudioVariant) -> Self { - Self { - ctx: ptr::null_mut(), - codec: ptr::null(), - swr_ctx: ptr::null_mut(), - fifo: ptr::null_mut(), - variant, - pts: 0, - } - } - - unsafe fn setup_encoder( - &mut self, - frame: *mut AVFrame, - ) -> Result, Error> { - if !self.ctx.is_null() { - return Ok(None); - } - - let encoder = self.variant.get_codec(); - if encoder.is_null() { - return Err(Error::msg("Encoder not found")); - } - - let ctx = avcodec_alloc_context3(encoder); - if ctx.is_null() { - return Err(Error::msg("Failed to allocate encoder context")); - } - - self.variant.to_codec_context(ctx); - - // setup re-sampler if output format does not match input format - if (*ctx).sample_fmt != transmute((*frame).format) - || (*ctx).sample_rate != (*frame).sample_rate - || (*ctx).ch_layout.nb_channels != (*frame).ch_layout.nb_channels - { - info!( - "Setup audio resampler: {}.{}@{} -> {}.{}@{}", - (*frame).ch_layout.nb_channels, - CStr::from_ptr(av_get_sample_fmt_name(transmute((*frame).format))) - .to_str()?, - (*frame).sample_rate, - (*ctx).ch_layout.nb_channels, - CStr::from_ptr(av_get_sample_fmt_name((*ctx).sample_fmt)) - .to_str()?, - (*ctx).sample_rate - ); - - let mut swr_ctx = ptr::null_mut(); - let ret = swr_alloc_set_opts2( - &mut swr_ctx, - &(*ctx).ch_layout, - (*ctx).sample_fmt, - (*ctx).sample_rate, - &(*frame).ch_layout, - transmute((*frame).format), - (*frame).sample_rate, - 0, - ptr::null_mut(), - ); - return_ffmpeg_error!(ret); - - let ret = swr_init(swr_ctx); - return_ffmpeg_error!(ret); - - self.swr_ctx = swr_ctx; - - let fifo = av_audio_fifo_alloc((*ctx).sample_fmt, (*ctx).ch_layout.nb_channels, 1); - if fifo.is_null() { - return Err(Error::msg("Failed to allocate audio FIFO")); - } - - self.fifo = fifo; - } - - let ret = avcodec_open2(ctx, encoder, ptr::null_mut()); - return_ffmpeg_error!(ret); - - self.ctx = ctx; - self.codec = encoder; - Ok(Some(PipelinePayload::EncoderInfo(self.variant.id(), ctx))) - } - - /// Returns true if we should process audio frame from FIFO - /// false if nothing to process this frame - unsafe fn process_audio_frame( - &mut self, - frame: *mut AVFrame, - ) -> Result, Error> { - if self.swr_ctx.is_null() { - // no re-sampler, return input frame - return Ok(Some(frame)); - } - - let mut out_frame = self.new_frame(); - let ret = swr_convert_frame(self.swr_ctx, out_frame, frame); - if ret < 0 { - av_frame_free(&mut out_frame); - return Err(Error::msg(get_ffmpeg_error_msg(ret))); - } - - let ret = av_audio_fifo_write( - self.fifo, - (*out_frame).extended_data as *const *mut libc::c_void, - (*out_frame).nb_samples, - ); - if ret < 0 { - av_frame_free(&mut out_frame); - return Err(Error::msg(get_ffmpeg_error_msg(ret))); - } - if ret != (*out_frame).nb_samples { - av_frame_free(&mut out_frame); - return Err(Error::msg(format!( - "FIFO write {} != {}", - ret, - (*out_frame).nb_samples - ))); - } - - //info!("Resampled {}->{} (wrote={})", in_samples, (*out_frame).nb_samples, ret); - av_frame_free(&mut out_frame); - - let buff = av_audio_fifo_size(self.fifo); - if buff < (*self.ctx).frame_size { - Ok(None) - } else { - let out_frame = self.read_fifo_frame()?; - Ok(Some(out_frame)) - } - } - - unsafe fn read_fifo_frame(&mut self) -> Result<*mut AVFrame, Error> { - let mut out_frame = self.new_frame(); - - let ret = av_samples_alloc_array_and_samples( - &mut (*out_frame).extended_data, - ptr::null_mut(), - (*out_frame).ch_layout.nb_channels, - (*out_frame).nb_samples, - transmute((*out_frame).format), - 0, - ); - if ret < 0 { - av_frame_free(&mut out_frame); - return Err(Error::msg(get_ffmpeg_error_msg(ret))); - } - - let ret = av_audio_fifo_read( - self.fifo, - (*out_frame).extended_data as *const *mut libc::c_void, - (*out_frame).nb_samples, - ); - if ret < 0 { - av_frame_free(&mut out_frame); - return Err(Error::msg(get_ffmpeg_error_msg(ret))); - } - - assert_eq!( - ret, - (*out_frame).nb_samples, - "Read wrong number of samples from FIFO" - ); - Ok(out_frame) - } - - unsafe fn new_frame(&self) -> *mut AVFrame { - let out_frame = av_frame_alloc(); - (*out_frame).nb_samples = (*self.ctx).frame_size; - av_channel_layout_copy(&mut (*out_frame).ch_layout, &(*self.ctx).ch_layout); - (*out_frame).format = (*self.ctx).sample_fmt as libc::c_int; - (*out_frame).sample_rate = (*self.ctx).sample_rate; - out_frame - } - - unsafe fn process_frame( - &mut self, - frame: *mut AVFrame, - in_tb: &AVRational, - ) -> Result, Error> { - let mut pkgs = Vec::new(); - if let Some(di) = self.setup_encoder(frame)? { - pkgs.push(di); - } - let frame = self.process_audio_frame(frame)?; - if frame.is_none() { - return Ok(pkgs); - } - let mut frame = frame.unwrap(); - - // examples do it like this - (*frame).pts = self.pts; - self.pts += (*frame).nb_samples as i64; - - let mut ret = avcodec_send_frame(self.ctx, frame); - if ret < 0 && ret != AVERROR(EAGAIN) { - av_frame_free(&mut frame); - return Err(Error::msg(get_ffmpeg_error_msg(ret))); - } - - while ret > 0 || ret == AVERROR(EAGAIN) { - let mut pkt = av_packet_alloc(); - ret = avcodec_receive_packet(self.ctx, pkt); - if ret < 0 { - av_packet_free(&mut pkt); - if ret == AVERROR(EAGAIN) { - break; - } - return Err(Error::msg(get_ffmpeg_error_msg(ret))); - } - - //set_encoded_pkt_timing(self.ctx, pkt, in_tb, &mut self.pts, &self.variant); - av_packet_rescale_ts(pkt, *in_tb, self.variant.time_base()); - pkgs.push(PipelinePayload::AvPacket( - pkt, - AVPacketSource::Encoder(self.variant.id()), - )); - } - - av_frame_free(&mut frame); - Ok(pkgs) - } -} - -impl PipelineProcessor for AudioEncoder { - fn process(&mut self, pkg: PipelinePayload) -> Result, Error> { - match pkg { - PipelinePayload::AvFrame(frm, ref src) => unsafe { - let in_stream = match src { - AVFrameSource::Decoder(s) => *s, - _ => return Err(Error::msg(format!("Cannot process frame from: {:?}", src))), - }; - if self.variant.src_index() == (*in_stream).index as usize { - self.process_frame(frm, &(*in_stream).time_base) - } else { - // stream didnt match, skipping - Ok(vec![]) - } - }, - PipelinePayload::Flush => unsafe { - Ok(self.process_frame(ptr::null_mut(), &AVRational { num: 0, den: 1 })?) - }, - _ => Err(Error::msg("Payload not supported")), - } - } -} diff --git a/src/encode/mod.rs b/src/encode/mod.rs deleted file mode 100644 index 924af23..0000000 --- a/src/encode/mod.rs +++ /dev/null @@ -1,64 +0,0 @@ -use ffmpeg_sys_next::AVMediaType::AVMEDIA_TYPE_VIDEO; -use ffmpeg_sys_next::{ - av_packet_rescale_ts, AVCodecContext, AVPacket, AVRational, AV_NOPTS_VALUE, AV_PKT_FLAG_KEY, -}; -use log::info; - -use crate::variant::{EncodedStream, StreamMapping}; - -pub mod audio; -pub mod video; - -/// Set packet details based on decoded frame -pub unsafe fn set_encoded_pkt_timing( - ctx: *mut AVCodecContext, - pkt: *mut AVPacket, - in_tb: &AVRational, - pts: &mut i64, - var: &TVar, -) where - TVar: EncodedStream + StreamMapping, -{ - let out_tb = (*ctx).time_base; - - (*pkt).stream_index = var.dst_index() as libc::c_int; - if (*pkt).duration <= 0 && (*ctx).codec_type == AVMEDIA_TYPE_VIDEO { - let tb_sec = in_tb.den as i64 / in_tb.num as i64; - let fps = (*ctx).framerate.num as i64 * (*ctx).framerate.den as i64; - (*pkt).duration = tb_sec / if fps == 0 { 1 } else { fps } - } - - av_packet_rescale_ts(pkt, *in_tb, out_tb); - (*pkt).time_base = var.time_base(); - (*pkt).pos = -1; - if (*pkt).pts == AV_NOPTS_VALUE { - (*pkt).pts = *pts; - *pts += (*pkt).duration; - } - if (*pkt).dts == AV_NOPTS_VALUE { - (*pkt).dts = (*pkt).pts; - } -} - -pub unsafe fn dump_pkt_info(pkt: *const AVPacket) { - let tb = (*pkt).time_base; - info!( - "stream {}: keyframe={}, duration={}, dts={}, pts={}, size={}, tb={}/{}", - (*pkt).stream_index, - ((*pkt).flags & AV_PKT_FLAG_KEY) != 0, - (*pkt).duration, - if (*pkt).dts == AV_NOPTS_VALUE { - "N/A".to_owned() - } else { - format!("{}", (*pkt).dts) - }, - if (*pkt).pts == AV_NOPTS_VALUE { - "N/A".to_owned() - } else { - format!("{}", (*pkt).pts) - }, - (*pkt).size, - tb.num, - tb.den - ); -} diff --git a/src/encode/video.rs b/src/encode/video.rs deleted file mode 100644 index 6e15a57..0000000 --- a/src/encode/video.rs +++ /dev/null @@ -1,134 +0,0 @@ -use std::mem::transmute; -use std::ptr; - -use anyhow::Error; -use ffmpeg_sys_next::{ - av_packet_alloc, av_packet_free, av_packet_rescale_ts, avcodec_alloc_context3, - avcodec_find_encoder, avcodec_open2, avcodec_receive_packet, avcodec_send_frame, AVCodec, - AVCodecContext, AVFrame, AVRational, AVERROR, -}; -use libc::EAGAIN; - -use crate::pipeline::{AVFrameSource, AVPacketSource, PipelinePayload, PipelineProcessor}; -use crate::return_ffmpeg_error; -use crate::utils::get_ffmpeg_error_msg; -use crate::variant::video::VideoVariant; -use crate::variant::{EncodedStream, StreamMapping}; - -pub struct VideoEncoder { - variant: VideoVariant, - ctx: *mut AVCodecContext, - codec: *const AVCodec, - pts: i64, -} - -unsafe impl Send for VideoEncoder {} - -unsafe impl Sync for VideoEncoder {} - -impl VideoEncoder { - pub fn new(variant: VideoVariant) -> Self { - Self { - ctx: ptr::null_mut(), - codec: ptr::null(), - variant, - pts: 0, - } - } - - unsafe fn setup_encoder(&mut self) -> Result, Error> { - if !self.ctx.is_null() { - return Ok(None); - } - - let codec = self.variant.codec; - let encoder = avcodec_find_encoder(transmute(codec as i32)); - if encoder.is_null() { - return Err(Error::msg("Encoder not found")); - } - - let ctx = avcodec_alloc_context3(encoder); - if ctx.is_null() { - return Err(Error::msg("Failed to allocate encoder context")); - } - - self.variant.to_codec_context(ctx); - - let ret = avcodec_open2(ctx, encoder, ptr::null_mut()); - return_ffmpeg_error!(ret); - - self.ctx = ctx; - self.codec = encoder; - Ok(Some(PipelinePayload::EncoderInfo(self.variant.id(), ctx))) - } - - unsafe fn process_frame( - &mut self, - frame: *mut AVFrame, - in_tb: &AVRational, - ) -> Result, Error> { - let mut pkgs = Vec::new(); - - if let Some(ei) = self.setup_encoder()? { - pkgs.push(ei); - } - - (*frame).pts = self.pts; - self.pts += (*frame).duration; - - let mut ret = avcodec_send_frame(self.ctx, frame); - if ret < 0 && ret != AVERROR(EAGAIN) { - return Err(Error::msg(get_ffmpeg_error_msg(ret))); - } - - while ret > 0 || ret == AVERROR(EAGAIN) { - let mut pkt = av_packet_alloc(); - ret = avcodec_receive_packet(self.ctx, pkt); - if ret != 0 { - av_packet_free(&mut pkt); - if ret == AVERROR(EAGAIN) { - break; - } - return Err(Error::msg(get_ffmpeg_error_msg(ret))); - } - - //set_encoded_pkt_timing(self.ctx, pkt, in_tb, &mut self.pts, &self.variant); - av_packet_rescale_ts(pkt, *in_tb, self.variant.time_base()); - //dump_pkt_info(pkt); - pkgs.push(PipelinePayload::AvPacket( - pkt, - AVPacketSource::Encoder(self.variant.id()), - )); - } - - Ok(pkgs) - } -} - -impl PipelineProcessor for VideoEncoder { - fn process(&mut self, pkg: PipelinePayload) -> Result, Error> { - match pkg { - PipelinePayload::AvFrame(frm, ref src) => unsafe { - let (in_stream, idx) = match src { - AVFrameSource::Decoder(s) => (*s, (*(*s)).index as usize), - AVFrameSource::None(s) => (ptr::null_mut(), *s), - _ => return Err(Error::msg(format!("Cannot process frame from: {:?}", src))), - }; - if self.variant.src_index() == idx { - let tb = if in_stream.is_null() { - self.variant.time_base() - } else { - (*in_stream).time_base - }; - self.process_frame(frm, &tb) - } else { - Ok(vec![]) - } - }, - PipelinePayload::Flush => unsafe { - self.process_frame(ptr::null_mut(), &AVRational { num: 0, den: 1 }) - }, - _ => Err(Error::msg("Payload not supported")), - } - } -} diff --git a/src/ingress/file.rs b/src/ingress/file.rs index 70041de..09bf0e9 100644 --- a/src/ingress/file.rs +++ b/src/ingress/file.rs @@ -1,50 +1,19 @@ +use anyhow::Result; +use log::info; use std::path::PathBuf; -use log::{error, info}; -use tokio::io::AsyncReadExt; -use tokio::sync::mpsc::unbounded_channel; +use crate::ingress::{spawn_pipeline, ConnectionInfo}; +use crate::settings::Settings; -use crate::ingress::ConnectionInfo; -use crate::pipeline::builder::PipelineBuilder; - -pub async fn listen(path: PathBuf, builder: PipelineBuilder) -> Result<(), anyhow::Error> { +pub async fn listen(path: PathBuf, settings: Settings) -> Result<()> { info!("Sending file {}", path.to_str().unwrap()); - tokio::spawn(async move { - let (tx, rx) = unbounded_channel(); - let info = ConnectionInfo { - ip_addr: "".to_owned(), - endpoint: "file-input".to_owned(), - }; + let info = ConnectionInfo { + ip_addr: "127.0.0.1:6969".to_string(), + endpoint: "file-input".to_owned(), + }; + let file = std::fs::File::open(path)?; + spawn_pipeline(info, settings, Box::new(file)); - if let Ok(mut pl) = builder.build_for(info, rx).await { - std::thread::spawn(move || loop { - if let Err(e) = pl.run() { - error!("Pipeline error: {}\n{}", e, e.backtrace()); - break; - } - }); - - if let Ok(mut stream) = tokio::fs::File::open(path).await { - let mut buf = [0u8; 4096]; - loop { - if let Ok(r) = stream.read(&mut buf).await { - if r > 0 { - if let Err(e) = tx.send(bytes::Bytes::copy_from_slice(&buf[..r])) { - error!("Failed to send file: {}", e); - break; - } - } else { - break; - } - } else { - break; - } - } - - info!("EOF"); - } - } - }); Ok(()) } diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index 9b6bf1e..d35eb15 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -1,8 +1,16 @@ +use crate::pipeline::runner::PipelineRunner; +use crate::settings::Settings; +use crate::webhook::Webhook; +use anyhow::Result; +use log::{error, info}; use serde::{Deserialize, Serialize}; +use std::io::Read; pub mod file; +#[cfg(feature = "srt")] pub mod srt; pub mod tcp; +#[cfg(feature = "test-source")] pub mod test; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -13,3 +21,28 @@ pub struct ConnectionInfo { /// IP address of the connection pub ip_addr: String, } + +pub(crate) fn spawn_pipeline( + info: ConnectionInfo, + settings: Settings, + reader: Box, +) { + info!("New client connected: {}", &info.ip_addr); + std::thread::spawn(move || unsafe { + if let Err(e) = spawn_pipeline_inner(info, settings, reader) { + error!("{}", e); + } + }); +} + +unsafe fn spawn_pipeline_inner( + info: ConnectionInfo, + settings: Settings, + reader: Box, +) -> Result<()> { + let webhook = Webhook::new(settings.clone()); + let mut pl = PipelineRunner::new(info, webhook, reader)?; + loop { + pl.run()? + } +} diff --git a/src/ingress/srt.rs b/src/ingress/srt.rs index 4cea9dd..3492880 100644 --- a/src/ingress/srt.rs +++ b/src/ingress/srt.rs @@ -1,42 +1,24 @@ +use crate::ingress::{spawn_pipeline, ConnectionInfo}; +use crate::pipeline::runner::PipelineRunner; +use crate::settings::Settings; +use crate::webhook::Webhook; +use anyhow::Result; use futures_util::{StreamExt, TryStreamExt}; use log::{error, info, warn}; -use srt_tokio::SrtListener; +use srt_tokio::{SrtListener, SrtSocket}; use tokio::sync::mpsc::unbounded_channel; -use crate::ingress::ConnectionInfo; -use crate::pipeline::builder::PipelineBuilder; +pub async fn listen(listen_addr: String, settings: Settings) -> Result<()> { + let (_binding, mut packets) = SrtListener::builder().bind(listen_addr.clone()).await?; -pub async fn listen(addr: String, builder: PipelineBuilder) -> Result<(), anyhow::Error> { - let (_binding, mut packets) = SrtListener::builder().bind(addr.clone()).await?; - - info!("SRT listening on: {}", addr.clone()); + info!("SRT listening on: {}", listen_addr.clone()); while let Some(request) = packets.incoming().next().await { let mut socket = request.accept(None).await?; - let ep = addr.clone(); let info = ConnectionInfo { - endpoint: ep.clone(), + endpoint: listen_addr.clone(), ip_addr: socket.settings().remote.to_string(), }; - let (tx, rx) = unbounded_channel(); - if let Ok(mut pipeline) = builder.build_for(info, rx).await { - std::thread::spawn(move || loop { - if let Err(e) = pipeline.run() { - error!("Pipeline error: {}\n{}", e, e.backtrace()); - break; - } - }); - tokio::spawn(async move { - info!("New client connected: {}", ep); - while let Ok(Some((_inst, bytes))) = socket.try_next().await { - if let Err(e) = tx.send(bytes) { - warn!("SRT Error: {e}"); - break; - } - } - socket.close_and_finish().await.unwrap(); - info!("Client {} disconnected", ep); - }); - } + spawn_pipeline(info, settings.clone(), Box::new(socket)); } Ok(()) } diff --git a/src/ingress/tcp.rs b/src/ingress/tcp.rs index 8de5f07..7622f3b 100644 --- a/src/ingress/tcp.rs +++ b/src/ingress/tcp.rs @@ -1,59 +1,21 @@ -use std::io; - -use log::{error, info, warn}; -use tokio::io::AsyncReadExt; +use anyhow::Result; +use log::info; use tokio::net::TcpListener; -use tokio::sync::mpsc::unbounded_channel; -use crate::ingress::ConnectionInfo; -use crate::pipeline::builder::PipelineBuilder; +use crate::ingress::{spawn_pipeline, ConnectionInfo}; +use crate::settings::Settings; -pub async fn listen(addr: String, builder: PipelineBuilder) -> Result<(), anyhow::Error> { - let listener = TcpListener::bind(addr.clone()).await.unwrap(); +pub async fn listen(addr: String, settings: Settings) -> Result<()> { + let listener = TcpListener::bind(addr.clone()).await?; info!("TCP listening on: {}", addr.clone()); - while let Ok((mut socket, ip)) = listener.accept().await { - info!("New client connected: {}", ip.clone()); - let ep = addr.clone(); - let builder = builder.clone(); - tokio::spawn(async move { - let (sender, recv) = unbounded_channel(); - let info = ConnectionInfo { - ip_addr: ip.to_string(), - endpoint: ep, - }; - - if let Ok(mut pl) = builder.build_for(info, recv).await { - std::thread::spawn(move || loop { - if let Err(e) = pl.run() { - error!("Pipeline error: {}\n{}", e, e.backtrace()); - break; - } - }); - - let mut buf = [0u8; 4096]; - loop { - match socket.read(&mut buf).await { - Ok(0) => break, - Ok(n) => { - let bytes = bytes::Bytes::copy_from_slice(&buf[0..n]); - if let Err(e) = sender.send(bytes) { - warn!("{:?}", e); - break; - } - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - error!("{}", e); - break; - } - } - } - info!("Client disconnected: {}", ip); - } - }); + while let Ok((socket, ip)) = listener.accept().await { + let info = ConnectionInfo { + ip_addr: ip.to_string(), + endpoint: addr.clone(), + }; + let socket = socket.into_std()?; + spawn_pipeline(info, settings.clone(), Box::new(socket)); } Ok(()) } diff --git a/src/ingress/test.rs b/src/ingress/test.rs index 1310337..ce37291 100644 --- a/src/ingress/test.rs +++ b/src/ingress/test.rs @@ -1,97 +1,88 @@ +use crate::ingress::{spawn_pipeline, ConnectionInfo}; +use crate::settings::Settings; +use anyhow::Result; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVColorSpace::AVCOL_SPC_RGB; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPictureType::AV_PICTURE_TYPE_NONE; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::{AV_PIX_FMT_RGBA, AV_PIX_FMT_YUV420P}; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{ + av_frame_alloc, av_frame_get_buffer, AV_PROFILE_H264_MAIN, +}; +use ffmpeg_rs_raw::{Encoder, Scaler}; +use fontdue::layout::{CoordinateSystem, Layout, TextStyle}; +use fontdue::Font; +use log::info; +use std::collections::VecDeque; +use std::io::Read; use std::ops::Add; use std::slice; use std::time::{Duration, Instant}; +use tiny_skia::Pixmap; +use warp::Buf; -use crate::encode::video::VideoEncoder; -use crate::ingress::ConnectionInfo; -use crate::pipeline::builder::PipelineBuilder; -use crate::pipeline::{AVFrameSource, PipelinePayload, PipelineProcessor}; -use crate::scale::Scaler; -use crate::variant::mapping::VariantMapping; -use crate::variant::video::VideoVariant; -use ffmpeg_sys_next::AVCodecID::AV_CODEC_ID_H264; -use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_RGB; -use ffmpeg_sys_next::AVPictureType::AV_PICTURE_TYPE_NONE; -use ffmpeg_sys_next::AVPixelFormat::{AV_PIX_FMT_RGBA, AV_PIX_FMT_YUV420P}; -use ffmpeg_sys_next::{ - av_frame_alloc, av_frame_get_buffer, AV_PROFILE_H264_MAIN, -}; -use fontdue::layout::{CoordinateSystem, Layout, TextStyle}; -use libc::memcpy; -use log::{error, info, warn}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; -use uuid::Uuid; - -const WIDTH: libc::c_int = 1920; -const HEIGHT: libc::c_int = 1080; -const FPS: libc::c_int = 25; - -pub async fn listen(builder: PipelineBuilder) -> Result<(), anyhow::Error> { +pub async fn listen(settings: Settings) -> Result<()> { info!("Test pattern enabled"); - let (tx, rx) = unbounded_channel(); let info = ConnectionInfo { - ip_addr: "".to_owned(), - endpoint: "test-pattern".to_owned(), + endpoint: "test-source".to_string(), + ip_addr: "".to_string(), }; - - if let Ok(mut pl) = builder.build_for(info, rx).await { - let pipeline = std::thread::spawn(move || loop { - if let Err(e) = pl.run() { - error!("Pipeline error: {}\n{}", e, e.backtrace()); - break; - } - }); - let encoder = std::thread::spawn(move || { - run_encoder(tx); - }); - if encoder.join().is_err() { - error!("Encoder thread error"); - } - if pipeline.join().is_err() { - error!("Pipeline thread error"); - } - } + let src = TestPatternSrc::new()?; + spawn_pipeline(info, settings, Box::new(src)); Ok(()) } -fn run_encoder(tx: UnboundedSender) { - let var = VideoVariant { - mapping: VariantMapping { - id: Uuid::new_v4(), - src_index: 0, - dst_index: 0, - group_id: 0, - }, - width: WIDTH as u16, - height: HEIGHT as u16, - fps: FPS as u16, - bitrate: 1_000_000, - codec: AV_CODEC_ID_H264 as usize, - profile: AV_PROFILE_H264_MAIN as usize, - level: 51, - keyframe_interval: FPS as u16, - pixel_format: AV_PIX_FMT_YUV420P as u32, - }; - let mut sws = Scaler::new(var.clone()); - let mut enc = VideoEncoder::new(var.clone()); +struct TestPatternSrc { + encoder: Encoder, + scaler: Scaler, + background: Pixmap, + font: [Font; 1], + frame_no: u64, + start: Instant, + buf: VecDeque, +} - let svg_data = std::fs::read("./test.svg").unwrap(); - let tree = usvg::Tree::from_data(&svg_data, &Default::default()).unwrap(); - let mut pixmap = tiny_skia::Pixmap::new(WIDTH as u32, HEIGHT as u32).unwrap(); - let render_ts = tiny_skia::Transform::from_scale(1f32, 1f32); - resvg::render(&tree, render_ts, &mut pixmap.as_mut()); +unsafe impl Send for TestPatternSrc {} - let font = include_bytes!("../../SourceCodePro-Regular.ttf") as &[u8]; - let scp = fontdue::Font::from_bytes(font, Default::default()).unwrap(); - let mut layout = Layout::new(CoordinateSystem::PositiveYDown); - let fonts = &[&scp]; +impl TestPatternSrc { + pub fn new() -> Result { + let scaler = Scaler::new(); + let encoder = unsafe { + Encoder::new(AV_CODEC_ID_H264)? + .with_stream_index(0) + .with_framerate(30.0) + .with_bitrate(1_000_000) + .with_pix_fmt(AV_PIX_FMT_YUV420P) + .with_width(1280) + .with_height(720) + .with_level(51) + .with_profile(AV_PROFILE_H264_MAIN) + .open(None)? + }; - let start = Instant::now(); - let mut frame_number: u64 = 0; - loop { - let stream_time = Duration::from_secs_f64(frame_number as f64 / FPS as f64); - let real_time = Instant::now().duration_since(start); + let svg_data = include_bytes!("../../test.svg"); + let tree = usvg::Tree::from_data(svg_data, &Default::default())?; + let mut pixmap = Pixmap::new(1280, 720).unwrap(); + let render_ts = tiny_skia::Transform::from_scale(1f32, 1f32); + resvg::render(&tree, render_ts, &mut pixmap.as_mut()); + + let font = include_bytes!("../../SourceCodePro-Regular.ttf") as &[u8]; + let font = Font::from_bytes(font, Default::default()).unwrap(); + + Ok(Self { + encoder, + scaler, + background: pixmap, + font: [font], + frame_no: 0, + start: Instant::now(), + buf: VecDeque::new(), + }) + } + + pub unsafe fn next_pkt(&mut self) -> Result> { + let stream_time = Duration::from_secs_f64(self.frame_no as f64 / 30.0); + let real_time = Instant::now().duration_since(self.start); let wait_time = if stream_time > real_time { stream_time - real_time } else { @@ -101,35 +92,35 @@ fn run_encoder(tx: UnboundedSender) { std::thread::sleep(wait_time); } - frame_number += 1; + self.frame_no += 1; let src_frame = unsafe { let src_frame = av_frame_alloc(); - (*src_frame).width = WIDTH; - (*src_frame).height = HEIGHT; + (*src_frame).width = 1280; + (*src_frame).height = 720; (*src_frame).pict_type = AV_PICTURE_TYPE_NONE; (*src_frame).key_frame = 1; (*src_frame).colorspace = AVCOL_SPC_RGB; - (*src_frame).format = AV_PIX_FMT_RGBA as libc::c_int; - (*src_frame).pts = frame_number as i64; + (*src_frame).format = AV_PIX_FMT_RGBA as _; + (*src_frame).pts = self.frame_no as i64; (*src_frame).duration = 1; av_frame_get_buffer(src_frame, 0); - memcpy( - (*src_frame).data[0] as *mut libc::c_void, - pixmap.data().as_ptr() as *const libc::c_void, - (WIDTH * HEIGHT * 4) as libc::size_t, - ); + self.background + .data() + .as_ptr() + .copy_to((*src_frame).data[0] as *mut _, 1280 * 720 * 4); src_frame }; + let mut layout = Layout::new(CoordinateSystem::PositiveYDown); layout.clear(); layout.append( - fonts, - &TextStyle::new(&format!("frame={}", frame_number), 40.0, 0), + &self.font, + &TextStyle::new(&format!("frame={}", self.frame_no), 40.0, 0), ); for g in layout.glyphs() { - let (metrics, bitmap) = scp.rasterize_config_subpixel(g.key); + let (metrics, bitmap) = self.font[0].rasterize_config_subpixel(g.key); for y in 0..metrics.height { for x in 0..metrics.width { let dst_x = x + g.x as usize; @@ -146,40 +137,28 @@ fn run_encoder(tx: UnboundedSender) { } } + let mut ret = Vec::new(); // scale/encode - let pkgs = match sws.process(PipelinePayload::AvFrame(src_frame, AVFrameSource::None(0))) { - Ok(p) => p, - Err(e) => { - error!("Failed to scale frame: {}", e); - return; - } - }; - for pkg in pkgs { - match enc.process(pkg) { - Ok(pkgs) => { - for pkg in pkgs { - match pkg { - PipelinePayload::AvPacket(pkt, _) => unsafe { - let buf = bytes::Bytes::from(slice::from_raw_parts( - (*pkt).data, - (*pkt).size as usize, - )); - if let Err(e) = tx.send(buf) { - error!("Failed to send test pkt: {}", e); - return; - } - }, - _ => { - warn!("Unknown payload from encoder: {:?}", pkg); - } - } - } - } - Err(e) => { - error!("Failed to encode: {}", e); - return; - } - } + let frame = self + .scaler + .process_frame(src_frame, 1280, 720, AV_PIX_FMT_YUV420P)?; + for pkt in self.encoder.encode_frame(frame)? { + let buf = slice::from_raw_parts((*pkt).data, (*pkt).size as usize); + ret.extend(buf); } + Ok(ret) + } +} + +impl Read for TestPatternSrc { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + unsafe { + while self.buf.len() < buf.len() { + let data = self.next_pkt().map_err(|e| std::io::Error::other(e))?; + self.buf.extend(data); + } + } + self.buf.copy_to_slice(buf); + Ok(buf.len()) } } diff --git a/src/main.rs b/src/main.rs index 68cb382..8f332ea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,26 +1,19 @@ -use std::ffi::CStr; - use clap::Parser; use config::Config; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::av_version_info; +use ffmpeg_rs_raw::rstr; use log::{error, info}; use url::Url; use crate::egress::http::listen_out_dir; -use crate::pipeline::builder::PipelineBuilder; use crate::settings::Settings; -use crate::webhook::Webhook; -mod decode; -mod demux; mod egress; -mod encode; mod fraction; mod ingress; mod ipc; mod pipeline; -mod scale; mod settings; -mod utils; mod variant; mod webhook; @@ -31,11 +24,11 @@ struct Args { file: Option, /// Add input test pattern at startup + #[cfg(feature = "test-source")] #[arg(long)] test_pattern: bool, } -/// Test: ffmpeg -re -f lavfi -i testsrc -g 2 -r 30 -pix_fmt yuv420p -s 1280x720 -c:v h264 -b:v 2000k -f mpegts srt://localhost:3333 #[tokio::main] async fn main() -> anyhow::Result<()> { pretty_env_logger::init(); @@ -44,12 +37,7 @@ async fn main() -> anyhow::Result<()> { unsafe { //ffmpeg_sys_next::av_log_set_level(ffmpeg_sys_next::AV_LOG_DEBUG); - info!( - "FFMPEG version={}", - CStr::from_ptr(ffmpeg_sys_next::av_version_info()) - .to_str() - .unwrap() - ); + info!("FFMPEG version={}", rstr!(av_version_info())); } let builder = Config::builder() @@ -59,15 +47,14 @@ async fn main() -> anyhow::Result<()> { let settings: Settings = builder.try_deserialize()?; - let webhook = Webhook::new(settings.clone()); - let builder = PipelineBuilder::new(webhook); let mut listeners = vec![]; for e in &settings.endpoints { let u: Url = e.parse()?; let addr = format!("{}:{}", u.host_str().unwrap(), u.port().unwrap()); match u.scheme() { - "srt" => listeners.push(tokio::spawn(ingress::srt::listen(addr, builder.clone()))), - "tcp" => listeners.push(tokio::spawn(ingress::tcp::listen(addr, builder.clone()))), + #[cfg(feature = "srt")] + "srt" => listeners.push(tokio::spawn(ingress::srt::listen(addr, settings.clone()))), + "tcp" => listeners.push(tokio::spawn(ingress::tcp::listen(addr, settings.clone()))), _ => { error!("Unknown endpoint config: {e}"); } @@ -81,11 +68,12 @@ async fn main() -> anyhow::Result<()> { if let Some(p) = args.file { listeners.push(tokio::spawn(ingress::file::listen( p.parse()?, - builder.clone(), + settings.clone(), ))); } + #[cfg(feature = "test-source")] if args.test_pattern { - listeners.push(tokio::spawn(ingress::test::listen(builder.clone()))); + listeners.push(tokio::spawn(ingress::test::listen(settings.clone()))); } for handle in listeners { @@ -96,12 +84,3 @@ async fn main() -> anyhow::Result<()> { info!("Server closed"); Ok(()) } - -#[macro_export] -macro_rules! return_ffmpeg_error { - ($x:expr) => { - if $x < 0 { - return Err(Error::msg(get_ffmpeg_error_msg($x))); - } - }; -} diff --git a/src/pipeline/builder.rs b/src/pipeline/builder.rs deleted file mode 100644 index cfa463f..0000000 --- a/src/pipeline/builder.rs +++ /dev/null @@ -1,29 +0,0 @@ -use tokio::sync::mpsc::UnboundedReceiver; - -use crate::ingress::ConnectionInfo; -use crate::pipeline::runner::PipelineRunner; -use crate::webhook::Webhook; - -#[derive(Clone)] -pub struct PipelineBuilder { - webhook: Webhook, -} - -impl PipelineBuilder { - pub fn new(webhook: Webhook) -> Self { - Self { webhook } - } - - pub async fn build_for( - &self, - info: ConnectionInfo, - recv: UnboundedReceiver, - ) -> Result { - Ok(PipelineRunner::new( - Default::default(), - info, - self.webhook.clone(), - recv, - )) - } -} diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 9a41602..7f822b3 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -1,18 +1,10 @@ use std::fmt::{Display, Formatter}; -use anyhow::Error; -use ffmpeg_sys_next::{ - av_frame_clone, av_frame_copy_props, av_frame_free, av_packet_clone, av_packet_copy_props, - av_packet_free, AVCodecContext, AVFrame, AVPacket, AVStream, -}; +use crate::egress::EgressConfig; +use crate::variant::VariantStream; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::demux::info::DemuxerInfo; -use crate::egress::EgressConfig; -use crate::variant::VariantStream; - -pub mod builder; pub mod runner; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -27,6 +19,16 @@ pub enum EgressType { RTMPForwarder(EgressConfig), } +impl EgressType { + pub fn config(&self) -> &EgressConfig { + match self { + EgressType::HLS(c) => c, + EgressType::Recorder(c) => c, + EgressType::RTMPForwarder(c) => c, + } + } +} + impl Display for EgressType { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( @@ -44,10 +46,8 @@ impl Display for EgressType { #[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct PipelineConfig { pub id: Uuid, - /// Transcoded/Copied stream config pub variants: Vec, - /// Output muxers pub egress: Vec, } @@ -68,87 +68,3 @@ impl Display for PipelineConfig { Ok(()) } } - -#[derive(Debug, PartialEq, Clone)] -pub enum AVPacketSource { - /// AVPacket from demuxer - Demuxer(*mut AVStream), - /// AVPacket from an encoder - Encoder(Uuid), -} - -#[derive(Debug, PartialEq, Clone)] -pub enum AVFrameSource { - /// ACPacket from decoder source stream - Decoder(*mut AVStream), - /// AVPacket from frame scaler step - Scaler(*mut AVStream), - /// Flush frame (empty) - Flush, - /// No context provided, dst_stream manually matched - None(usize), -} - -#[derive(Debug, PartialEq)] -pub enum PipelinePayload { - /// No output - Empty, - /// Raw bytes from ingress - Bytes(bytes::Bytes), - /// FFMpeg AVPacket - AvPacket(*mut AVPacket, AVPacketSource), - /// FFMpeg AVFrame - AvFrame(*mut AVFrame, AVFrameSource), - /// Information about an encoder in this pipeline - EncoderInfo(Uuid, *const AVCodecContext), - /// Source stream information provided by the demuxer - SourceInfo(DemuxerInfo), - /// Flush pipeline - Flush, -} - -unsafe impl Send for PipelinePayload {} - -unsafe impl Sync for PipelinePayload {} - -impl Clone for PipelinePayload { - fn clone(&self) -> Self { - match self { - PipelinePayload::AvPacket(p, v) => unsafe { - assert!(!(**p).data.is_null(), "Cannot clone empty packet"); - let new_pkt = av_packet_clone(*p); - av_packet_copy_props(new_pkt, *p); - PipelinePayload::AvPacket(new_pkt, v.clone()) - }, - PipelinePayload::AvFrame(p, v) => unsafe { - assert!(!(**p).extended_data.is_null(), "Cannot clone empty frame"); - let new_frame = av_frame_clone(*p); - av_frame_copy_props(new_frame, *p); - PipelinePayload::AvFrame(new_frame, v.clone()) - }, - PipelinePayload::Empty => PipelinePayload::Empty, - PipelinePayload::Bytes(b) => PipelinePayload::Bytes(b.clone()), - PipelinePayload::EncoderInfo(a, b) => PipelinePayload::EncoderInfo(*a, *b), - PipelinePayload::SourceInfo(a) => PipelinePayload::SourceInfo(a.clone()), - PipelinePayload::Flush => PipelinePayload::Flush - } - } -} - -impl Drop for PipelinePayload { - fn drop(&mut self) { - match self { - PipelinePayload::AvPacket(p, _) => unsafe { - av_packet_free(p); - }, - PipelinePayload::AvFrame(p, _) => unsafe { - av_frame_free(p); - }, - _ => {} - } - } -} - -pub trait PipelineProcessor { - fn process(&mut self, pkg: PipelinePayload) -> Result, Error>; -} diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs index 43aca69..bace5e8 100644 --- a/src/pipeline/runner.rs +++ b/src/pipeline/runner.rs @@ -1,222 +1,227 @@ +use std::collections::{HashMap, HashSet}; +use std::io::Read; +use std::mem::transmute; use std::ops::Sub; use std::time::Instant; -use anyhow::Error; -use log::info; -use tokio::sync::mpsc::UnboundedReceiver; -use uuid::Uuid; - -use crate::decode::Decoder; -use crate::demux::info::DemuxerInfo; -use crate::demux::Demuxer; use crate::egress::hls::HlsEgress; use crate::egress::recorder::RecorderEgress; -use crate::encode::audio::AudioEncoder; -use crate::encode::video::VideoEncoder; +use crate::egress::Egress; use crate::ingress::ConnectionInfo; -use crate::pipeline::{ - AVPacketSource, EgressType, PipelineConfig, PipelinePayload, PipelineProcessor, -}; -use crate::scale::Scaler; +use crate::pipeline::{EgressType, PipelineConfig}; use crate::variant::{StreamMapping, VariantStream}; use crate::webhook::Webhook; +use anyhow::Result; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_get_sample_fmt, av_packet_free}; +use ffmpeg_rs_raw::{ + cstr, get_frame_from_hw, Decoder, Demuxer, DemuxerInfo, Encoder, Resample, Scaler, +}; +use itertools::Itertools; +use log::{info, warn}; +use uuid::Uuid; -type BoxedProcessor = Box; - -/// Resample/Encode -struct Transcoder { - pub variant: Uuid, - - /// A resampler which can take decoded sames (Audio or Video) - pub sampler: Option, - - /// The encoder which will encode the resampled frames - pub encoder: BoxedProcessor, -} - -/// -/// |----------------------------------------------------| -/// | Demuxer +/// Pipeline runner is the main entry process for stream transcoding +/// Each client connection spawns a new [PipelineRunner] and it should be run in its own thread +/// using [ingress::spawn_pipeline] pub struct PipelineRunner { + connection: ConnectionInfo, + + /// Configuration for this pipeline (variants, egress config etc.) config: PipelineConfig, - info: ConnectionInfo, + /// Singleton demuxer for this input demuxer: Demuxer, - decoder: Decoder, - transcoders: Vec, - muxers: Vec, - started: Instant, - frame_no: u64, - stream_info: Option, + /// Singleton decoder for all stream + decoder: Decoder, + + /// Scaler for a variant (variant_id, Scaler) + scalers: HashMap, + + /// Resampler for a variant (variant_id, Resample) + resampler: HashMap, + + /// Encoder for a variant (variant_id, Encoder) + encoders: HashMap, + + /// Simple mapping to copy streams + copy_stream: HashMap, + + /// All configured egress' + egress: Vec>, + + fps_counter_start: Instant, + frame_ctr: u64, webhook: Webhook, + + info: Option, } impl PipelineRunner { pub fn new( - config: PipelineConfig, - info: ConnectionInfo, + connection: ConnectionInfo, webhook: Webhook, - recv: UnboundedReceiver, - ) -> Self { - Self { - config, - info, - demuxer: Demuxer::new(recv), + recv: Box, + ) -> Result { + Ok(Self { + connection, + config: Default::default(), + demuxer: Demuxer::new_custom_io(recv, None)?, decoder: Decoder::new(), - transcoders: vec![], - muxers: vec![], - started: Instant::now(), - frame_no: 0, - stream_info: None, + scalers: Default::default(), + resampler: Default::default(), + encoders: Default::default(), + copy_stream: Default::default(), + fps_counter_start: Instant::now(), + egress: Vec::new(), + frame_ctr: 0, webhook, - } + info: None, + }) } - pub fn run(&mut self) -> Result<(), Error> { - if self.stream_info.is_none() { - if let Some(cfg) = self.demuxer.try_probe()? { - self.configure_pipeline(&cfg)?; - for mux in &mut self.muxers { - mux.process(PipelinePayload::SourceInfo(cfg.clone()))?; - } - self.stream_info = Some(cfg); - } else { - return Ok(()); - } - } + /// Main processor, should be called in a loop + pub unsafe fn run(&mut self) -> Result<()> { + self.setup()?; - let demux_pkg = unsafe { self.demuxer.get_packet() }?; + // run transcoder pipeline + let (mut pkt, stream) = self.demuxer.get_packet()?; + let src_index = (*stream).index; - let src_index = if let PipelinePayload::AvPacket(_, s) = &demux_pkg { - if let AVPacketSource::Demuxer(s) = s { - unsafe { (*(*s)).index } - } else { - -1 - } - } else { - -1 - }; - let pkg_variant = self.config.variants.iter().find(|v| match v { - VariantStream::Video(vx) => vx.src_index() as i32 == src_index, - VariantStream::Audio(ax) => ax.src_index() as i32 == src_index, - _ => false, - }); - let transcoded_pkgs = if let Some(var) = pkg_variant { - let frames = self.decoder.process(demux_pkg.clone())?; - if let VariantStream::Video(_) = var { - self.frame_no += frames.len() as u64; - //TODO: Account for multiple video streams in - } + // TODO: For copy streams, skip decoder + for frame in self.decoder.decode_pkt(pkt)? { + self.frame_ctr += 1; - let mut pkgs = Vec::new(); - for frame in &frames { - for tran in &mut self.transcoders { - let frames = if let Some(ref mut smp) = tran.sampler { - smp.process(frame.clone())? - } else { - vec![frame.clone()] - }; + // Copy frame from GPU if using hwaccel decoding + let frame = get_frame_from_hw(frame)?; - for frame in frames { - for pkg in tran.encoder.process(frame)? { - pkgs.push(pkg); + /// Get the variants which want this pkt + let pkt_vars = self + .config + .variants + .iter() + .filter(|v| v.src_index() == src_index as usize); + for var in pkt_vars { + let frame = match var { + VariantStream::Video(v) => { + if let Some(s) = self.scalers.get_mut(&v.id()) { + s.process_frame(frame, v.width, v.height, transmute(v.pixel_format))? + } else { + frame } } + VariantStream::Audio(a) => { + if let Some(r) = self.resampler.get_mut(&a.id()) { + r.process_frame(frame)? + } else { + frame + } + } + _ => frame, + }; + + let packets = if let Some(enc) = self.encoders.get_mut(&var.id()) { + enc.encode_frame(frame)? + } else { + //warn!("Frame had nowhere to go in {} :/", var.id()); + continue; + }; + + // pass new packets to egress + for eg in self.egress.iter_mut() { + for pkt in packets.iter() { + eg.process_pkt(*pkt, &var.id())?; + } } } - pkgs - } else { - vec![] - }; - - // mux - for pkg in transcoded_pkgs { - for ref mut mux in &mut self.muxers { - mux.process(pkg.clone())?; - } - } - for ref mut mux in &mut self.muxers { - mux.process(demux_pkg.clone())?; } - let elapsed = Instant::now().sub(self.started).as_secs_f32(); + av_packet_free(&mut pkt); + + let elapsed = Instant::now().sub(self.fps_counter_start).as_secs_f32(); if elapsed >= 2f32 { - info!("Average fps: {:.2}", self.frame_no as f32 / elapsed); - self.started = Instant::now(); - self.frame_no = 0; + info!("Average fps: {:.2}", self.frame_ctr as f32 / elapsed); + self.fps_counter_start = Instant::now(); + self.frame_ctr = 0; } Ok(()) } - /// Setup pipeline based on the demuxer info - fn configure_pipeline(&mut self, info: &DemuxerInfo) -> Result<(), Error> { - // re-configure with demuxer info - self.config = self.webhook.start(info); - info!("Configuring pipeline {}", self.config); - if self.config.egress.iter().any(|x| match x { - EgressType::HLS(_) => true, - _ => false, - }) { - info!( - "Livestream url: http://localhost:8080/{}/live.m3u8", - self.config.id - ); + unsafe fn setup(&mut self) -> Result<()> { + if self.info.is_some() { + return Ok(()); } - // configure transcoders - for var in &self.config.variants { - match var { + let info = self.demuxer.probe_input()?; + self.setup_pipeline(&info)?; + self.info = Some(info); + Ok(()) + } + + unsafe fn setup_pipeline(&mut self, info: &DemuxerInfo) -> Result<()> { + let cfg = self.webhook.start(info); + self.config = cfg.clone(); + + // src stream indexes + let inputs: HashSet = cfg.variants.iter().map(|e| e.src_index()).collect(); + + // enable hardware decoding + self.decoder.enable_hw_decoder_any(); + + // setup decoders + for input_idx in inputs { + let stream = info.streams.iter().find(|f| f.index == input_idx).unwrap(); + self.decoder.setup_decoder(stream, None)?; + } + + // setup scaler/encoders + for out_stream in &cfg.variants { + match out_stream { VariantStream::Video(v) => { - let scaler = Scaler::new(v.clone()); - let encoder = VideoEncoder::new(v.clone()); - self.transcoders.push(Transcoder { - variant: v.id(), - sampler: Some(Box::new(scaler)), - encoder: Box::new(encoder), - }); + self.encoders.insert(out_stream.id(), v.try_into()?); + self.scalers.insert(out_stream.id(), Scaler::new()); } VariantStream::Audio(a) => { - let encoder = AudioEncoder::new(a.clone()); - self.transcoders.push(Transcoder { - variant: a.id(), - sampler: None, - encoder: Box::new(encoder), - }); - } - _ => { - //ignored - } - } - } - - // configure muxers - for mux in &self.config.egress { - match mux { - EgressType::HLS(c) => { - let mut hls = - HlsEgress::new(Uuid::new_v4(), c.clone(), self.config.variants.clone()); - hls.setup_muxer()?; - self.muxers.push(Box::new(hls)); - } - EgressType::Recorder(c) => { - let recorder = RecorderEgress::new( - Uuid::new_v4(), - c.clone(), - self.config.variants.clone(), + let enc = a.try_into()?; + let rs = Resample::new( + av_get_sample_fmt(cstr!(&a.sample_fmt)), + a.sample_rate as _, + a.channels as _, ); - self.muxers.push(Box::new(recorder)); - } - EgressType::RTMPForwarder(c) => { - todo!("Implement this") + self.resampler.insert(out_stream.id(), rs); + self.encoders.insert(out_stream.id(), enc); } + _ => continue, } } - if self.muxers.is_empty() { - Err(Error::msg("No egress config, pipeline misconfigured!")) - } else { - Ok(()) + // Setup copy streams + + // Setup egress + for e in cfg.egress { + match e { + EgressType::HLS(ref c) => { + let encoders = self + .encoders + .iter() + .filter(|(k, v)| c.variants.contains(k)) + .map(|(_, v)| v); + + let hls = HlsEgress::new(c.clone(), encoders)?; + self.egress.push(Box::new(hls)); + } + EgressType::Recorder(ref c) => { + let encoders = self + .encoders + .iter() + .filter(|(k, v)| c.variants.contains(k)) + .map(|(_, v)| v); + let rec = RecorderEgress::new(c.clone(), encoders)?; + self.egress.push(Box::new(rec)); + } + _ => warn!("{} is not implemented", e), + } } + Ok(()) } } diff --git a/src/scale/mod.rs b/src/scale/mod.rs deleted file mode 100644 index 910a978..0000000 --- a/src/scale/mod.rs +++ /dev/null @@ -1,119 +0,0 @@ -use std::ffi::CStr; -use std::mem::transmute; -use std::ptr; - -use anyhow::Error; -use ffmpeg_sys_next::{ - av_frame_alloc, av_frame_copy_props, av_get_pix_fmt_name, AVFrame, SWS_BILINEAR, - sws_freeContext, sws_getContext, sws_scale_frame, SwsContext, -}; -use log::info; - -use crate::pipeline::{AVFrameSource, PipelinePayload, PipelineProcessor}; -use crate::return_ffmpeg_error; -use crate::utils::get_ffmpeg_error_msg; -use crate::variant::StreamMapping; -use crate::variant::video::VideoVariant; - -pub struct Scaler { - variant: VideoVariant, - ctx: *mut SwsContext, -} - -unsafe impl Send for Scaler {} - -unsafe impl Sync for Scaler {} - -impl Drop for Scaler { - fn drop(&mut self) { - unsafe { - sws_freeContext(self.ctx); - self.ctx = ptr::null_mut(); - } - } -} - -impl Scaler { - pub fn new(variant: VideoVariant) -> Self { - Self { - variant, - ctx: ptr::null_mut(), - } - } - - unsafe fn setup_scaler(&mut self, frame: *const AVFrame) -> Result<(), Error> { - if !self.ctx.is_null() { - return Ok(()); - } - - let ctx = sws_getContext( - (*frame).width, - (*frame).height, - transmute((*frame).format), - self.variant.width as libc::c_int, - self.variant.height as libc::c_int, - transmute(self.variant.pixel_format), - SWS_BILINEAR, - ptr::null_mut(), - ptr::null_mut(), - ptr::null_mut(), - ); - if ctx.is_null() { - return Err(Error::msg("Failed to create scalar context")); - } - info!( - "Scalar config: {}x{}@{} => {}x{}@{}", - (*frame).width, - (*frame).height, - CStr::from_ptr(av_get_pix_fmt_name(transmute((*frame).format))) - .to_str()?, - self.variant.width, - self.variant.height, - CStr::from_ptr(av_get_pix_fmt_name(transmute(self.variant.pixel_format))) - .to_str()? - ); - self.ctx = ctx; - Ok(()) - } - - unsafe fn process_frame( - &mut self, - frame: *mut AVFrame, - src: &AVFrameSource, - ) -> Result, Error> { - self.setup_scaler(frame)?; - - let dst_frame = av_frame_alloc(); - let ret = av_frame_copy_props(dst_frame, frame); - return_ffmpeg_error!(ret); - - let ret = sws_scale_frame(self.ctx, dst_frame, frame); - return_ffmpeg_error!(ret); - - Ok(vec![PipelinePayload::AvFrame(dst_frame, src.clone())]) - } -} - -impl PipelineProcessor for Scaler { - fn process(&mut self, pkg: PipelinePayload) -> Result, Error> { - match pkg { - PipelinePayload::AvFrame(frm, ref src) => unsafe { - let idx = match src { - AVFrameSource::Decoder(s) => (**s).index as usize, - AVFrameSource::None(s) => *s, - _ => return Err(Error::msg(format!("Cannot process frame from: {:?}", src))), - }; - if self.variant.src_index() == idx { - self.process_frame(frm, src) - } else { - Ok(vec![]) - } - }, - PipelinePayload::Flush => { - // pass flush to next step - Ok(vec![pkg]) - } - _ => Err(Error::msg("Payload not supported payload")), - } - } -} diff --git a/src/utils.rs b/src/utils.rs deleted file mode 100644 index 23f6a1f..0000000 --- a/src/utils.rs +++ /dev/null @@ -1,12 +0,0 @@ -use std::ffi::CStr; - -use ffmpeg_sys_next::av_make_error_string; - -pub fn get_ffmpeg_error_msg(ret: libc::c_int) -> String { - unsafe { - const BUF_SIZE: usize = 512; - let mut buf: [libc::c_char; BUF_SIZE] = [0; BUF_SIZE]; - av_make_error_string(buf.as_mut_ptr(), BUF_SIZE, ret); - String::from(CStr::from_ptr(buf.as_ptr()).to_str().unwrap()) - } -} diff --git a/src/variant/audio.rs b/src/variant/audio.rs index 2a7d5d0..78469e4 100644 --- a/src/variant/audio.rs +++ b/src/variant/audio.rs @@ -1,19 +1,11 @@ -use std::ffi::CStr; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{av_get_sample_fmt, avcodec_get_name}; +use ffmpeg_rs_raw::{cstr, rstr, Encoder}; +use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; use std::intrinsics::transmute; -use std::ptr; - -use ffmpeg_sys_next::AVChannelOrder::AV_CHANNEL_ORDER_NATIVE; -use ffmpeg_sys_next::AVCodecID::AV_CODEC_ID_AAC; -use ffmpeg_sys_next::{ - av_get_sample_fmt, avcodec_find_encoder, avcodec_find_encoder_by_name, avcodec_get_name, - AVChannelLayout, AVChannelLayout__bindgen_ty_1, AVCodec, AVCodecContext, AVCodecParameters, - AVRational, AVStream, AV_CH_LAYOUT_STEREO, -}; -use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::variant::{EncodedStream, StreamMapping, VariantMapping}; +use crate::variant::{StreamMapping, VariantMapping}; /// Information related to variant streams for a given egress #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -44,11 +36,7 @@ impl Display for AudioVariant { "Audio #{}->{}: {}, {}kbps", self.mapping.src_index, self.mapping.dst_index, - unsafe { - CStr::from_ptr(avcodec_get_name(transmute(self.codec as i32))) - .to_str() - .unwrap() - }, + unsafe { rstr!(avcodec_get_name(transmute(self.codec as i32))) }, self.bitrate / 1000 ) } @@ -72,65 +60,21 @@ impl StreamMapping for AudioVariant { fn group_id(&self) -> usize { self.mapping.group_id } - - unsafe fn to_stream(&self, stream: *mut AVStream) { - (*stream).time_base = self.time_base(); - self.to_codec_params((*stream).codecpar); - } } -impl EncodedStream for AudioVariant { - fn time_base(&self) -> AVRational { - AVRational { - num: 1, - den: self.sample_rate as libc::c_int, - } - } +impl TryInto for &AudioVariant { + type Error = anyhow::Error; - unsafe fn get_codec(&self) -> *const AVCodec { - if self.codec == AV_CODEC_ID_AAC as usize { - avcodec_find_encoder_by_name("libfdk_aac\0".as_ptr() as *const libc::c_char) - } else { - avcodec_find_encoder(transmute(self.codec as u32)) - } - } + fn try_into(self) -> Result { + unsafe { + let enc = Encoder::new(transmute(self.codec as u32))? + .with_sample_rate(self.sample_rate as _) + .with_bitrate(self.bitrate as _) + .with_default_channel_layout(self.channels as _) + .with_sample_format(av_get_sample_fmt(cstr!(&self.sample_fmt))) + .open(None)?; - unsafe fn to_codec_context(&self, ctx: *mut AVCodecContext) { - let codec = self.get_codec(); - (*ctx).codec_id = (*codec).id; - (*ctx).codec_type = (*codec).type_; - (*ctx).time_base = self.time_base(); - (*ctx).sample_fmt = - av_get_sample_fmt(format!("{}\0", self.sample_fmt).as_ptr() as *const libc::c_char); - (*ctx).bit_rate = self.bitrate as i64; - (*ctx).sample_rate = self.sample_rate as libc::c_int; - (*ctx).ch_layout = self.channel_layout(); - (*ctx).frame_size = 1024; - } - - unsafe fn to_codec_params(&self, params: *mut AVCodecParameters) { - let codec = self.get_codec(); - (*params).codec_id = (*codec).id; - (*params).codec_type = (*codec).type_; - (*params).format = - av_get_sample_fmt(format!("{}\0", self.sample_fmt).as_ptr() as *const libc::c_char) - as libc::c_int; - (*params).bit_rate = self.bitrate as i64; - (*params).sample_rate = self.sample_rate as libc::c_int; - (*params).ch_layout = self.channel_layout(); - (*params).frame_size = 1024; //TODO: fix this - } -} - -impl AudioVariant { - fn channel_layout(&self) -> AVChannelLayout { - AVChannelLayout { - order: AV_CHANNEL_ORDER_NATIVE, - nb_channels: 2, - u: AVChannelLayout__bindgen_ty_1 { - mask: AV_CH_LAYOUT_STEREO, - }, - opaque: ptr::null_mut(), + Ok(enc) } } } diff --git a/src/variant/mapping.rs b/src/variant/mapping.rs index d87915d..92b99c9 100644 --- a/src/variant/mapping.rs +++ b/src/variant/mapping.rs @@ -1,7 +1,5 @@ -use std::fmt::{Display, Formatter}; - -use ffmpeg_sys_next::AVStream; use serde::{Deserialize, Serialize}; +use std::fmt::{Display, Formatter}; use uuid::Uuid; use crate::variant::StreamMapping; @@ -47,8 +45,4 @@ impl StreamMapping for VariantMapping { fn group_id(&self) -> usize { self.group_id } - - unsafe fn to_stream(&self, stream: *mut AVStream) { - // do nothing - } } diff --git a/src/variant/mod.rs b/src/variant/mod.rs index 53aba9d..0c456f3 100644 --- a/src/variant/mod.rs +++ b/src/variant/mod.rs @@ -1,13 +1,10 @@ -use std::fmt::{Display, Formatter}; - -use anyhow::Error; -use ffmpeg_sys_next::{AVCodec, AVCodecContext, AVCodecParameters, AVRational, AVStream}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - use crate::variant::audio::AudioVariant; use crate::variant::mapping::VariantMapping; use crate::variant::video::VideoVariant; +use anyhow::Error; +use serde::{Deserialize, Serialize}; +use std::fmt::{Display, Formatter}; +use uuid::Uuid; pub mod audio; pub mod mapping; @@ -70,15 +67,6 @@ impl StreamMapping for VariantStream { VariantStream::CopyVideo(v) => v.group_id(), } } - - unsafe fn to_stream(&self, stream: *mut AVStream) { - match self { - VariantStream::Video(v) => v.to_stream(stream), - VariantStream::Audio(v) => v.to_stream(stream), - VariantStream::CopyAudio(v) => v.to_stream(stream), - VariantStream::CopyVideo(v) => v.to_stream(stream), - } - } } impl Display for VariantStream { @@ -98,14 +86,6 @@ pub trait StreamMapping { fn dst_index(&self) -> usize; fn set_dst_index(&mut self, dst: usize); fn group_id(&self) -> usize; - unsafe fn to_stream(&self, stream: *mut AVStream); -} - -pub trait EncodedStream { - fn time_base(&self) -> AVRational; - unsafe fn get_codec(&self) -> *const AVCodec; - unsafe fn to_codec_context(&self, ctx: *mut AVCodecContext); - unsafe fn to_codec_params(&self, params: *mut AVCodecParameters); } /// Find a stream by ID in a vec of streams diff --git a/src/variant/video.rs b/src/variant/video.rs index 12afba4..ab0400b 100644 --- a/src/variant/video.rs +++ b/src/variant/video.rs @@ -1,21 +1,17 @@ -use std::ffi::CStr; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVCodecID::AV_CODEC_ID_H264; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVColorSpace::AVCOL_SPC_BT709; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::{avcodec_get_name, AVCodecID}; +use ffmpeg_rs_raw::{rstr, Encoder}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::intrinsics::transmute; - -use ffmpeg_sys_next::AVCodecID::AV_CODEC_ID_H264; -use ffmpeg_sys_next::AVColorSpace::AVCOL_SPC_BT709; -use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P; -use ffmpeg_sys_next::{ - av_opt_set, avcodec_find_encoder, avcodec_get_name, AVCodec, AVCodecContext, AVCodecParameters, - AVRational, AVStream, -}; -use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::variant::{EncodedStream, StreamMapping, VariantMapping}; +use crate::variant::{StreamMapping, VariantMapping}; /// Information related to variant streams for a given egress -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct VideoVariant { /// Id, Src, Dst pub mapping: VariantMapping, @@ -27,7 +23,7 @@ pub struct VideoVariant { pub height: u16, /// FPS for this stream - pub fps: u16, + pub fps: f32, /// Bitrate of this stream pub bitrate: u64, @@ -55,11 +51,7 @@ impl Display for VideoVariant { "Video #{}->{}: {}, {}x{}, {}fps, {}kbps", self.mapping.src_index, self.mapping.dst_index, - unsafe { - CStr::from_ptr(avcodec_get_name(transmute(self.codec as i32))) - .to_str() - .unwrap() - }, + unsafe { rstr!(avcodec_get_name(transmute(self.codec as i32))) }, self.width, self.height, self.fps, @@ -87,83 +79,35 @@ impl StreamMapping for VideoVariant { fn group_id(&self) -> usize { self.mapping.group_id } - - unsafe fn to_stream(&self, stream: *mut AVStream) { - (*stream).time_base = self.time_base(); - (*stream).avg_frame_rate = AVRational { - num: self.fps as libc::c_int, - den: 1, - }; - (*stream).r_frame_rate = AVRational { - num: self.fps as libc::c_int, - den: 1, - }; - self.to_codec_params((*stream).codecpar); - } } -impl EncodedStream for VideoVariant { - fn time_base(&self) -> AVRational { - AVRational { - num: 1, - den: 90_000, +impl TryInto for &VideoVariant { + type Error = anyhow::Error; + + fn try_into(self) -> Result { + unsafe { + let mut opt = HashMap::new(); + if self.codec == transmute::(AV_CODEC_ID_H264) as usize { + opt.insert("preset".to_string(), "fast".to_string()); + //opt.insert("tune".to_string(), "zerolatency".to_string()); + } + let enc = Encoder::new(transmute(self.codec as u32))? + .with_bitrate(self.bitrate as _) + .with_width(self.width as _) + .with_height(self.height as _) + .with_pix_fmt(transmute(self.pixel_format)) + .with_profile(transmute(self.profile as i32)) + .with_level(transmute(self.level as i32)) + .with_framerate(self.fps) + .with_options(|ctx| { + (*ctx).gop_size = self.keyframe_interval as _; + (*ctx).keyint_min = self.keyframe_interval as _; + (*ctx).max_b_frames = 3; + (*ctx).colorspace = AVCOL_SPC_BT709; + }) + .open(Some(opt))?; + + Ok(enc) } } - - unsafe fn get_codec(&self) -> *const AVCodec { - avcodec_find_encoder(transmute(self.codec as u32)) - } - - unsafe fn to_codec_context(&self, ctx: *mut AVCodecContext) { - let codec = self.get_codec(); - (*ctx).codec_id = (*codec).id; - (*ctx).codec_type = (*codec).type_; - (*ctx).time_base = self.time_base(); - (*ctx).bit_rate = self.bitrate as i64; - (*ctx).width = self.width as libc::c_int; - (*ctx).height = self.height as libc::c_int; - (*ctx).level = self.level as libc::c_int; - (*ctx).profile = self.profile as libc::c_int; - (*ctx).framerate = AVRational { - num: self.fps as libc::c_int, - den: 1, - }; - - (*ctx).gop_size = self.keyframe_interval as libc::c_int; - (*ctx).keyint_min = self.keyframe_interval as libc::c_int; - (*ctx).max_b_frames = 3; - (*ctx).pix_fmt = AV_PIX_FMT_YUV420P; - (*ctx).colorspace = AVCOL_SPC_BT709; - if (*codec).id == AV_CODEC_ID_H264 { - av_opt_set( - (*ctx).priv_data, - "preset\0".as_ptr() as *const libc::c_char, - "fast\0".as_ptr() as *const libc::c_char, - 0, - ); - av_opt_set( - (*ctx).priv_data, - "tune\0".as_ptr() as *const libc::c_char, - "zerolatency\0".as_ptr() as *const libc::c_char, - 0, - ); - } - } - - unsafe fn to_codec_params(&self, params: *mut AVCodecParameters) { - let codec = self.get_codec(); - (*params).codec_id = (*codec).id; - (*params).codec_type = (*codec).type_; - (*params).height = self.height as libc::c_int; - (*params).width = self.width as libc::c_int; - (*params).format = AV_PIX_FMT_YUV420P as i32; - (*params).framerate = AVRational { - num: self.fps as libc::c_int, - den: 1, - }; - (*params).bit_rate = self.bitrate as i64; - (*params).color_space = AVCOL_SPC_BT709; - (*params).level = self.level as libc::c_int; - (*params).profile = self.profile as libc::c_int; - } } diff --git a/src/webhook.rs b/src/webhook.rs index 70eb7b5..3c319b1 100644 --- a/src/webhook.rs +++ b/src/webhook.rs @@ -1,7 +1,7 @@ -use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P; +use ffmpeg_rs_raw::ffmpeg_sys_the_third::AVPixelFormat::AV_PIX_FMT_YUV420P; +use ffmpeg_rs_raw::{DemuxerInfo, StreamType}; use uuid::Uuid; -use crate::demux::info::{DemuxerInfo, StreamChannelType}; use crate::egress::EgressConfig; use crate::pipeline::{EgressType, PipelineConfig}; use crate::settings::Settings; @@ -23,9 +23,9 @@ impl Webhook { pub fn start(&self, stream_info: &DemuxerInfo) -> PipelineConfig { let mut vars: Vec = vec![]; if let Some(video_src) = stream_info - .channels + .streams .iter() - .find(|c| c.channel_type == StreamChannelType::Video) + .find(|c| c.stream_type == StreamType::Video) { vars.push(VariantStream::CopyVideo(VariantMapping { id: Uuid::new_v4(), @@ -38,11 +38,11 @@ impl Webhook { id: Uuid::new_v4(), src_index: video_src.index, dst_index: 1, - group_id: 1 + group_id: 1, }, width: 1280, height: 720, - fps: video_src.fps as u16, + fps: video_src.fps, bitrate: 3_000_000, codec: 27, profile: 100, @@ -53,28 +53,28 @@ impl Webhook { } if let Some(audio_src) = stream_info - .channels + .streams .iter() - .find(|c| c.channel_type == StreamChannelType::Audio) + .find(|c| c.stream_type == StreamType::Audio) { vars.push(VariantStream::CopyAudio(VariantMapping { id: Uuid::new_v4(), src_index: audio_src.index, dst_index: 2, - group_id: 0 + group_id: 0, })); vars.push(VariantStream::Audio(AudioVariant { mapping: VariantMapping { id: Uuid::new_v4(), src_index: audio_src.index, dst_index: 3, - group_id: 1 + group_id: 1, }, bitrate: 192_000, codec: 86018, channels: 2, sample_rate: 48_000, - sample_fmt: "s16".to_owned(), + sample_fmt: "flt".to_owned(), })); } @@ -83,16 +83,16 @@ impl Webhook { id: Uuid::new_v4(), variants: vars, egress: vec![ - /*EgressType::Recorder(EgressConfig { + EgressType::Recorder(EgressConfig { name: "REC".to_owned(), out_dir: self.config.output_dir.clone(), - variants: vars.clone(), - }),*/ - EgressType::HLS(EgressConfig { - name: "HLS".to_owned(), - out_dir: self.config.output_dir.clone(), variants: var_ids, }), + /*EgressType::HLS(EgressConfig { + name: "HLS".to_owned(), + out_dir: self.config.output_dir.clone(), + variants: var_ids, + }),*/ ], } }