mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-16 17:08:50 +00:00
Init demuxer
This commit is contained in:
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
/target
|
1188
Cargo.lock
generated
Normal file
1188
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
17
Cargo.toml
Normal file
17
Cargo.toml
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
[package]
|
||||||
|
name = "stream-core"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
srt-tokio = "0.4.3"
|
||||||
|
tokio = { version = "1.36.0" , features = ["rt-multi-thread"]}
|
||||||
|
anyhow = "1.0.80"
|
||||||
|
pretty_env_logger = "0.5.0"
|
||||||
|
bytes = "1.5.0"
|
||||||
|
tokio-stream = "0.1.14"
|
||||||
|
futures-util = "0.3.30"
|
||||||
|
async-trait = "0.1.77"
|
||||||
|
log = "0.4.21"
|
||||||
|
ffmpeg-sys-next = { version = "6.1.0", features = ["avformat", "avcodec", "swscale", "avfilter"]}
|
||||||
|
libc = "0.2.153"
|
3
README.md
Normal file
3
README.md
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# zap.stream core
|
||||||
|
|
||||||
|
Pure rust zap.stream core streaming server
|
172
src/demux/mod.rs
Normal file
172
src/demux/mod.rs
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
use std::ffi::CStr;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::ptr;
|
||||||
|
|
||||||
|
use anyhow::Error;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
|
use ffmpeg_sys_next::*;
|
||||||
|
use log::{debug, info, warn};
|
||||||
|
|
||||||
|
use crate::pipeline::{PipelinePayload, PipelineStep};
|
||||||
|
|
||||||
|
///
|
||||||
|
/// Demuxer supports demuxing and decoding
|
||||||
|
///
|
||||||
|
/// | Type | Value |
|
||||||
|
/// | ------ | ----------------------------- |
|
||||||
|
/// | Video | H264, H265, VP8, VP9, AV1 |
|
||||||
|
/// | Audio | AAC, Opus |
|
||||||
|
/// | Format | MPEG-TS |
|
||||||
|
///
|
||||||
|
pub(crate) struct Demuxer {
|
||||||
|
buffer: BytesMut,
|
||||||
|
ctx: *mut AVFormatContext,
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for Demuxer {}
|
||||||
|
unsafe impl Sync for Demuxer {}
|
||||||
|
|
||||||
|
unsafe extern "C" fn read_data(
|
||||||
|
opaque: *mut libc::c_void,
|
||||||
|
buffer: *mut libc::c_uchar,
|
||||||
|
size: libc::c_int,
|
||||||
|
) -> libc::c_int {
|
||||||
|
let muxer = opaque as *mut Demuxer;
|
||||||
|
let len = size.min((*muxer).buffer.len() as libc::c_int);
|
||||||
|
//info!("ask={}, have={}", size, len);
|
||||||
|
if len > 0 {
|
||||||
|
memcpy(
|
||||||
|
buffer as *mut libc::c_void,
|
||||||
|
(*muxer).buffer.as_ptr() as *const libc::c_void,
|
||||||
|
len as libc::c_ulonglong,
|
||||||
|
);
|
||||||
|
_ = (*muxer).buffer.split_to(len as usize);
|
||||||
|
len
|
||||||
|
} else {
|
||||||
|
-1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Demuxer {
|
||||||
|
const BUFFER_SIZE: usize = 1024 * 1024;
|
||||||
|
const MIN_BUFFER_TO_READ: usize = 8192;
|
||||||
|
|
||||||
|
pub fn new() -> Self {
|
||||||
|
unsafe {
|
||||||
|
let ps = avformat_alloc_context();
|
||||||
|
(*ps).probesize = Self::BUFFER_SIZE as i64;
|
||||||
|
(*ps).flags |= AVFMT_FLAG_CUSTOM_IO;
|
||||||
|
|
||||||
|
Self {
|
||||||
|
ctx: ps,
|
||||||
|
buffer: BytesMut::with_capacity(Self::BUFFER_SIZE),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn setup_io(&mut self, bytes: &Bytes) -> Result<bool, Error> {
|
||||||
|
self.buffer.extend_from_slice(bytes);
|
||||||
|
self.probe_input()
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn probe_input(&mut self) -> Result<bool, Error> {
|
||||||
|
let size = self.buffer.len();
|
||||||
|
let score = (*self.ctx).probe_score;
|
||||||
|
if score == 0 && size >= Self::MIN_BUFFER_TO_READ {
|
||||||
|
let pb = avio_alloc_context(
|
||||||
|
av_mallocz(4096) as *mut libc::c_uchar,
|
||||||
|
4096,
|
||||||
|
0,
|
||||||
|
self as *const Self as *mut libc::c_void,
|
||||||
|
Some(read_data),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
(*self.ctx).pb = pb;
|
||||||
|
let ret = avformat_open_input(
|
||||||
|
&mut self.ctx,
|
||||||
|
ptr::null_mut(),
|
||||||
|
ptr::null_mut(),
|
||||||
|
ptr::null_mut(),
|
||||||
|
);
|
||||||
|
if ret < 0 {
|
||||||
|
let msg = Self::get_ffmpeg_error_msg(ret);
|
||||||
|
return Err(Error::msg(msg));
|
||||||
|
}
|
||||||
|
|
||||||
|
if avformat_find_stream_info(self.ctx, ptr::null_mut()) < 0 {
|
||||||
|
return Err(Error::msg("Could not find stream info"));
|
||||||
|
}
|
||||||
|
|
||||||
|
for x in 0..(*self.ctx).nb_streams {
|
||||||
|
av_dump_format(self.ctx, x as libc::c_int, ptr::null_mut(), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(score > 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn decode_packet(&mut self) -> Result<Option<*mut AVPacket>, Error> {
|
||||||
|
let pkt: *mut AVPacket = av_packet_alloc();
|
||||||
|
av_init_packet(pkt);
|
||||||
|
|
||||||
|
let ret = av_read_frame(self.ctx, pkt);
|
||||||
|
if ret < 0 {
|
||||||
|
let msg = Self::get_ffmpeg_error_msg(ret);
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
Ok(Some(pkt))
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn print_buffer_info(&mut self) {
|
||||||
|
let mut pb = (*self.ctx).pb;
|
||||||
|
let offset = (*pb).pos;
|
||||||
|
let remaining = (*pb).buffer_size as i64 - (*pb).pos;
|
||||||
|
info!("offset={}, remaining={}", offset, remaining);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_ffmpeg_error_msg(ret: libc::c_int) -> String {
|
||||||
|
unsafe {
|
||||||
|
let mut buf: [libc::c_char; 255] = [0; 255];
|
||||||
|
av_make_error_string(buf.as_mut_ptr(), 255, ret);
|
||||||
|
String::from(CStr::from_ptr(buf.as_ptr()).to_str().unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Demuxer {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
unsafe {
|
||||||
|
avformat_free_context(self.ctx);
|
||||||
|
self.ctx = ptr::null_mut();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl PipelineStep for Demuxer {
|
||||||
|
fn name(&self) -> String {
|
||||||
|
"Demuxer".to_owned()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process(&mut self, pkg: PipelinePayload) -> Result<PipelinePayload, Error> {
|
||||||
|
match pkg {
|
||||||
|
PipelinePayload::Bytes(ref bb) => unsafe {
|
||||||
|
if !self.setup_io(bb)? {
|
||||||
|
return Ok(PipelinePayload::Empty);
|
||||||
|
}
|
||||||
|
match self.decode_packet() {
|
||||||
|
Ok(pkt) => match pkt {
|
||||||
|
Some(pkt) => Ok(PipelinePayload::AvPacket(pkt)),
|
||||||
|
None => Ok(PipelinePayload::Empty),
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
warn!("{}", e);
|
||||||
|
Ok(PipelinePayload::Empty)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => return Err(Error::msg("Wrong pkg format")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
5
src/ingress/mod.rs
Normal file
5
src/ingress/mod.rs
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
pub mod srt;
|
||||||
|
|
||||||
|
pub struct ConnectionInfo {
|
||||||
|
|
||||||
|
}
|
54
src/ingress/srt.rs
Normal file
54
src/ingress/srt.rs
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
use crate::ingress::ConnectionInfo;
|
||||||
|
use crate::pipeline::builder::PipelineBuilder;
|
||||||
|
use crate::pipeline::runner::PipelineRunner;
|
||||||
|
use futures_util::{StreamExt, TryStreamExt};
|
||||||
|
use log::{info, warn};
|
||||||
|
use srt_tokio::{SrtListener, SrtSocket};
|
||||||
|
|
||||||
|
pub async fn listen_srt(port: u16, pipeline: PipelineBuilder) -> Result<(), anyhow::Error> {
|
||||||
|
let (_binding, mut packets) = SrtListener::builder().bind(port).await?;
|
||||||
|
|
||||||
|
while let Some(request) = packets.incoming().next().await {
|
||||||
|
let mut socket = request.accept(None).await?;
|
||||||
|
let pipeline = pipeline.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let info = ConnectionInfo {};
|
||||||
|
if let Ok(pl) = pipeline.build_for(info).await {
|
||||||
|
let mut stream = SrtStream::new(socket);
|
||||||
|
stream.run(pl).await;
|
||||||
|
} else {
|
||||||
|
socket.close_and_finish().await.unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SrtStream {
|
||||||
|
socket: SrtSocket,
|
||||||
|
prev: Option<(bytes::Bytes, usize)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SrtStream {
|
||||||
|
pub fn new(socket: SrtSocket) -> Self {
|
||||||
|
Self { socket, prev: None }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(&mut self, mut pipeline: PipelineRunner) {
|
||||||
|
let socket_id = self.socket.settings().remote_sockid.0;
|
||||||
|
let client_desc = format!(
|
||||||
|
"(ip_port: {}, socket_id: {}, stream_id: {:?})",
|
||||||
|
self.socket.settings().remote,
|
||||||
|
socket_id,
|
||||||
|
self.socket.settings().stream_id,
|
||||||
|
);
|
||||||
|
info!("New client connected: {}", client_desc);
|
||||||
|
while let Ok(Some((_inst, bytes))) = self.socket.try_next().await {
|
||||||
|
if let Err(e) = pipeline.push(bytes).await {
|
||||||
|
warn!("{:?}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
info!("Client {} disconnected", client_desc);
|
||||||
|
}
|
||||||
|
}
|
30
src/main.rs
Normal file
30
src/main.rs
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
mod pipeline;
|
||||||
|
mod ingress;
|
||||||
|
mod webhook;
|
||||||
|
mod demux;
|
||||||
|
|
||||||
|
use std::ffi::CStr;
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use log::info;
|
||||||
|
use crate::pipeline::builder::PipelineBuilder;
|
||||||
|
use crate::webhook::Webhook;
|
||||||
|
|
||||||
|
/// Test: ffmpeg -re -f lavfi -i testsrc -g 2 -r 30 -pix_fmt yuv420p -s 1280x720 -c:v h264 -b:v 2000k -f mpegts srt://localhost:3333
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
pretty_env_logger::init();
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
ffmpeg_sys_next::av_log_set_level(ffmpeg_sys_next::AV_LOG_INFO);
|
||||||
|
info!("{}", CStr::from_ptr(ffmpeg_sys_next::av_version_info()).to_str().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
let webhook = Webhook::new("".to_owned());
|
||||||
|
let builder = PipelineBuilder::new(webhook);
|
||||||
|
let srt = tokio::spawn(ingress::srt::listen_srt(3333, builder));
|
||||||
|
|
||||||
|
srt.await?.expect("TODO: panic message");
|
||||||
|
|
||||||
|
println!("\nServer closed");
|
||||||
|
Ok(())
|
||||||
|
}
|
25
src/pipeline/builder.rs
Normal file
25
src/pipeline/builder.rs
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
use crate::demux::Demuxer;
|
||||||
|
use crate::ingress::ConnectionInfo;
|
||||||
|
use crate::pipeline::runner::PipelineRunner;
|
||||||
|
use crate::pipeline::PipelineStep;
|
||||||
|
use crate::webhook::Webhook;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct PipelineBuilder {
|
||||||
|
webhook: Webhook,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PipelineBuilder {
|
||||||
|
pub fn new(webhook: Webhook) -> Self {
|
||||||
|
Self { webhook }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn build_for(&self, info: ConnectionInfo) -> Result<PipelineRunner, anyhow::Error> {
|
||||||
|
let config = self.webhook.start(info).await?;
|
||||||
|
|
||||||
|
let mut steps: Vec<Box<dyn PipelineStep + Sync + Send>> = Vec::new();
|
||||||
|
steps.push(Box::new(Demuxer::new()));
|
||||||
|
|
||||||
|
Ok(PipelineRunner::new(steps))
|
||||||
|
}
|
||||||
|
}
|
42
src/pipeline/mod.rs
Normal file
42
src/pipeline/mod.rs
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use ffmpeg_sys_next::{av_packet_unref, AVPacket};
|
||||||
|
use std::ops::DerefMut;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
pub mod builder;
|
||||||
|
pub mod runner;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum PipelinePayload {
|
||||||
|
/// No output
|
||||||
|
Empty,
|
||||||
|
/// Skip this step
|
||||||
|
Skip,
|
||||||
|
/// Raw bytes from ingress
|
||||||
|
Bytes(bytes::Bytes),
|
||||||
|
/// FFMpeg AVPacket
|
||||||
|
AvPacket(*mut AVPacket),
|
||||||
|
/// FFMpeg AVFrame
|
||||||
|
AvFrame(),
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for PipelinePayload {}
|
||||||
|
unsafe impl Sync for PipelinePayload {}
|
||||||
|
|
||||||
|
impl Drop for PipelinePayload {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
match self {
|
||||||
|
PipelinePayload::AvPacket(pkt) => unsafe {
|
||||||
|
av_packet_unref(*pkt);
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait PipelineStep {
|
||||||
|
fn name(&self) -> String;
|
||||||
|
|
||||||
|
async fn process(&mut self, pkg: PipelinePayload) -> Result<PipelinePayload, anyhow::Error>;
|
||||||
|
}
|
21
src/pipeline/runner.rs
Normal file
21
src/pipeline/runner.rs
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
use crate::pipeline::{PipelinePayload, PipelineStep};
|
||||||
|
|
||||||
|
pub struct PipelineRunner {
|
||||||
|
steps: Vec<Box<dyn PipelineStep + Sync + Send>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PipelineRunner {
|
||||||
|
pub fn new(steps: Vec<Box<dyn PipelineStep + Sync + Send>>) -> Self {
|
||||||
|
Self { steps }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn push(&mut self, bytes: bytes::Bytes) -> Result<(), anyhow::Error> {
|
||||||
|
let mut output = PipelinePayload::Bytes(bytes);
|
||||||
|
for step in &mut self.steps {
|
||||||
|
let output2 = step.process(output).await?;
|
||||||
|
//info!("{} result: {:?}", step.name(), output2);
|
||||||
|
output = output2;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
18
src/webhook.rs
Normal file
18
src/webhook.rs
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
use crate::ingress::ConnectionInfo;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub(crate) struct Webhook {
|
||||||
|
url: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Webhook {
|
||||||
|
pub fn new(url: String) -> Self {
|
||||||
|
Self { url }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start(&self, connection_info: ConnectionInfo) -> Result<PipelineConfig, anyhow::Error> {
|
||||||
|
Ok(PipelineConfig {})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct PipelineConfig {}
|
Reference in New Issue
Block a user