chore: testing n94 streams

This commit is contained in:
kieran 2024-11-19 17:12:03 +00:00
parent e111e50199
commit 877db958fc
No known key found for this signature in database
GPG Key ID: DE71CEB3925BE941
9 changed files with 223 additions and 141 deletions

View File

@ -1,2 +1,7 @@
- Store user preference for (rates and recording) [DB]
- Setup multi-variant output
- Manage event lifecycle (close stream)
- Manage event lifecycle (close stream)
- RTMP?
- API parity
- fMP4 instead of MPEG-TS segments
- HLS-LL

View File

@ -39,6 +39,8 @@ listen_http: "127.0.0.1:8080"
overseer:
zap-stream:
nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4"
blossom:
- "http://localhost:8881"
relays:
- "ws://localhost:7766"
database: "mysql://root:root@localhost:3368/zap_stream?max_connections=2"

View File

@ -10,12 +10,19 @@ use ffmpeg_rs_raw::{cstr, Encoder, Muxer};
use itertools::Itertools;
use log::{info, warn};
use m3u8_rs::MediaSegment;
use std::collections::HashMap;
use std::fmt::Display;
use std::fs::File;
use std::path::PathBuf;
use std::ptr;
use uuid::Uuid;
#[derive(Clone, Copy)]
pub enum SegmentType {
MPEGTS,
FMP4,
}
pub enum HlsVariantStream {
Video {
group: usize,
@ -79,22 +86,24 @@ pub struct HlsVariant {
pub out_dir: String,
/// List of segments to be included in the playlist
pub segments: Vec<SegmentInfo>,
/// Type of segments to create
pub segment_type: SegmentType,
}
struct SegmentInfo(u64, f32);
struct SegmentInfo(u64, f32, SegmentType);
impl SegmentInfo {
fn to_media_segment(&self) -> MediaSegment {
MediaSegment {
uri: HlsVariant::segment_name(self.0),
uri: self.filename(),
duration: self.1,
title: Some("no desc".to_string()),
title: None,
..MediaSegment::default()
}
}
fn filename(&self) -> String {
HlsVariant::segment_name(self.0)
HlsVariant::segment_name(self.2, self.0)
}
}
@ -104,14 +113,32 @@ impl HlsVariant {
segment_length: f32,
group: usize,
encoded_vars: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
segment_type: SegmentType,
) -> Result<Self> {
let name = format!("stream_{}", group);
let first_seg = Self::map_segment_path(out_dir, &name, 1);
let first_seg = Self::map_segment_path(out_dir, &name, 1, segment_type);
std::fs::create_dir_all(PathBuf::from(&first_seg).parent().unwrap())?;
let mut opts = HashMap::new();
match segment_type {
SegmentType::FMP4 => {
opts.insert("fflags".to_string(), "-autobsf".to_string());
opts.insert(
"movflags".to_string(),
"+frag_custom+dash+delay_moov".to_string(),
);
}
_ => {}
};
let mut mux = unsafe {
Muxer::builder()
.with_output_path(first_seg.as_str(), Some("mpegts"))?
.with_output_path(
first_seg.as_str(),
match segment_type {
SegmentType::MPEGTS => Some("mpegts"),
SegmentType::FMP4 => Some("mp4"),
},
)?
.build()?
};
let mut streams = Vec::new();
@ -145,7 +172,7 @@ impl HlsVariant {
}
}
unsafe {
mux.open(None)?;
mux.open(Some(opts))?;
}
Ok(Self {
name: name.clone(),
@ -154,23 +181,27 @@ impl HlsVariant {
streams,
idx: 1,
pkt_start: 0.0,
segments: Vec::from([SegmentInfo(1, segment_length)]),
segments: Vec::from([SegmentInfo(1, segment_length, segment_type)]),
out_dir: out_dir.to_string(),
segment_type,
})
}
pub fn segment_name(idx: u64) -> String {
format!("{}.ts", idx)
pub fn segment_name(t: SegmentType, idx: u64) -> String {
match t {
SegmentType::MPEGTS => format!("{}.ts", idx),
SegmentType::FMP4 => format!("{}.m4s", idx),
}
}
pub fn out_dir(&self) -> PathBuf {
PathBuf::from(&self.out_dir).join(&self.name)
}
pub fn map_segment_path(out_dir: &str, name: &str, idx: u64) -> String {
pub fn map_segment_path(out_dir: &str, name: &str, idx: u64, typ: SegmentType) -> String {
PathBuf::from(out_dir)
.join(name)
.join(Self::segment_name(idx))
.join(Self::segment_name(typ, idx))
.to_string_lossy()
.to_string()
}
@ -201,7 +232,8 @@ impl HlsVariant {
avio_flush((*ctx).pb);
av_free((*ctx).url as *mut _);
let next_seg_url = Self::map_segment_path(&*self.out_dir, &self.name, self.idx);
let next_seg_url =
Self::map_segment_path(&*self.out_dir, &self.name, self.idx, self.segment_type);
(*ctx).url = cstr!(next_seg_url.as_str());
let ret = avio_open(&mut (*ctx).pb, (*ctx).url, AVIO_FLAG_WRITE);
@ -234,7 +266,12 @@ impl HlsVariant {
variant: *video_var.id(),
idx: prev_seg,
duration,
path: PathBuf::from(Self::map_segment_path(&*self.out_dir, &self.name, prev_seg)),
path: PathBuf::from(Self::map_segment_path(
&*self.out_dir,
&self.name,
prev_seg,
self.segment_type,
)),
};
self.pkt_start = pkt_time;
Ok(ret)
@ -247,7 +284,8 @@ impl HlsVariant {
}
fn add_segment(&mut self, idx: u64, duration: f32) -> Result<()> {
self.segments.push(SegmentInfo(idx, duration));
self.segments
.push(SegmentInfo(idx, duration, self.segment_type));
const MAX_SEGMENTS: usize = 10;
@ -340,6 +378,7 @@ impl HlsMuxer {
out_dir: &str,
segment_length: f32,
encoders: impl Iterator<Item = (&'a VariantStream, &'a Encoder)>,
segment_type: SegmentType,
) -> Result<Self> {
let base = PathBuf::from(out_dir).join(id.to_string());
@ -348,7 +387,13 @@ impl HlsMuxer {
.sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id()))
.chunk_by(|a| a.0.group_id())
{
let var = HlsVariant::new(base.to_str().unwrap(), segment_length, k, group)?;
let var = HlsVariant::new(
base.to_str().unwrap(),
segment_length,
k,
group,
segment_type,
)?;
vars.push(var);
}

View File

@ -16,6 +16,7 @@ use std::cmp::PartialEq;
use std::path::PathBuf;
use std::sync::Arc;
use uuid::Uuid;
use warp::Filter;
mod webhook;
@ -179,8 +180,9 @@ pub(crate) fn get_default_variants(info: &IngressInfo) -> Result<Vec<VariantStre
Ok(vars)
}
/// Simple static file output without any access controls
struct StaticOverseer {}
struct StaticOverseer;
impl StaticOverseer {
fn new(out_dir: &str, egress_types: &Vec<String>) -> Self {
@ -192,7 +194,7 @@ impl StaticOverseer {
impl Overseer for StaticOverseer {
async fn start_stream(
&self,
connection: &ConnectionInfo,
_connection: &ConnectionInfo,
stream_info: &IngressInfo,
) -> Result<PipelineConfig> {
let vars = get_default_variants(stream_info)?;
@ -200,17 +202,10 @@ impl Overseer for StaticOverseer {
Ok(PipelineConfig {
id: Uuid::new_v4(),
variants: vars,
egress: vec![
/*EgressType::Recorder(EgressConfig {
name: "REC".to_owned(),
out_dir: self.config.output_dir.clone(),
variants: var_ids,
}),*/
EgressType::HLS(EgressConfig {
name: "HLS".to_owned(),
variants: var_ids,
}),
],
egress: vec![EgressType::HLS(EgressConfig {
name: "HLS".to_owned(),
variants: var_ids,
})],
})
}
@ -222,7 +217,8 @@ impl Overseer for StaticOverseer {
duration: f32,
path: &PathBuf,
) -> Result<()> {
todo!()
// nothing to do here
Ok(())
}
async fn on_thumbnail(
@ -232,6 +228,7 @@ impl Overseer for StaticOverseer {
height: usize,
path: &PathBuf,
) -> Result<()> {
todo!()
// nothing to do here
Ok(())
}
}

View File

@ -24,6 +24,7 @@ use std::path::PathBuf;
use std::str::FromStr;
use url::Url;
use uuid::Uuid;
use warp::Filter;
use zap_stream_db::sqlx::Encode;
use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb};
@ -95,6 +96,112 @@ impl ZapStreamOverseer {
public_url: public_url.clone(),
})
}
fn stream_to_event_builder(&self, stream: &UserStream) -> Result<EventBuilder> {
let mut tags = vec![
Tag::parse(&["d".to_string(), stream.id.to_string()])?,
Tag::parse(&["status".to_string(), stream.state.to_string()])?,
Tag::parse(&["starts".to_string(), stream.starts.timestamp().to_string()])?,
];
if let Some(ref ends) = stream.ends {
tags.push(Tag::parse(&[
"ends".to_string(),
ends.timestamp().to_string(),
])?);
}
if let Some(ref title) = stream.title {
tags.push(Tag::parse(&["title".to_string(), title.to_string()])?);
}
if let Some(ref summary) = stream.summary {
tags.push(Tag::parse(&["summary".to_string(), summary.to_string()])?);
}
if let Some(ref image) = stream.image {
tags.push(Tag::parse(&["image".to_string(), image.to_string()])?);
}
if let Some(ref thumb) = stream.thumb {
tags.push(Tag::parse(&["thumb".to_string(), thumb.to_string()])?);
}
if let Some(ref content_warning) = stream.content_warning {
tags.push(Tag::parse(&[
"content_warning".to_string(),
content_warning.to_string(),
])?);
}
if let Some(ref goal) = stream.goal {
tags.push(Tag::parse(&["goal".to_string(), goal.to_string()])?);
}
if let Some(ref pinned) = stream.pinned {
tags.push(Tag::parse(&["pinned".to_string(), pinned.to_string()])?);
}
if let Some(ref tags_csv) = stream.tags {
for tag in tags_csv.split(',') {
tags.push(Tag::parse(&["t".to_string(), tag.to_string()])?);
}
}
let kind = Kind::from(STREAM_EVENT_KIND);
let coord = Coordinate::new(kind, self.keys.public_key).identifier(stream.id);
tags.push(Tag::parse(&[
"alt",
&format!("Watch live on https://zap.stream/{}", coord.to_bech32()?),
])?);
Ok(EventBuilder::new(kind, "", tags))
}
fn blob_to_event_builder(&self, stream: &BlobDescriptor) -> Result<EventBuilder> {
let tags = if let Some(tags) = stream.nip94.as_ref() {
tags.iter()
.map_while(|(k, v)| Tag::parse(&[k, v]).ok())
.collect()
} else {
let mut tags = vec![
Tag::parse(&["x", &stream.sha256])?,
Tag::parse(&["url", &stream.url])?,
Tag::parse(&["size", &stream.size.to_string()])?,
];
if let Some(m) = stream.mime_type.as_ref() {
tags.push(Tag::parse(&["m", m])?)
}
tags
};
Ok(EventBuilder::new(Kind::FileMetadata, "", tags))
}
async fn publish_stream_event(&self, stream: &UserStream, pubkey: &Vec<u8>) -> Result<Event> {
let mut extra_tags = vec![
Tag::parse(&["p", hex::encode(pubkey).as_str(), "", "host"])?,
Tag::parse(&[
"streaming",
self.map_to_public_url(stream, "live.m3u8")?.as_str(),
])?,
Tag::parse(&[
"image",
self.map_to_public_url(stream, "thumb.webp")?.as_str(),
])?,
];
// flag NIP94 streaming when using blossom servers
if self.blossom_servers.len() > 0 {
extra_tags.push(Tag::parse(&["streaming", "nip94"])?);
}
let ev = self
.stream_to_event_builder(stream)?
.add_tags(extra_tags)
.sign_with_keys(&self.keys)?;
self.client.send_event(ev.clone()).await?;
Ok(ev)
}
fn map_to_public_url<'a>(
&self,
stream: &UserStream,
path: impl Into<&'a str>,
) -> Result<String> {
let u: Url = self.public_url.parse()?;
Ok(u.join(&format!("/{}/", stream.id))?
.join(path.into())?
.to_string())
}
}
#[async_trait]
@ -118,6 +225,7 @@ impl Overseer for ZapStreamOverseer {
variants: variants.iter().map(|v| v.id()).collect(),
}));
let user = self.db.get_user(uid).await?;
// insert new stream record
let mut new_stream = UserStream {
id: Uuid::new_v4(),
@ -126,8 +234,7 @@ impl Overseer for ZapStreamOverseer {
state: UserStreamState::Live,
..Default::default()
};
let stream_event =
publish_stream_event(&new_stream, &self.client, &self.keys, &self.public_url).await?;
let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?;
new_stream.event = Some(stream_event.as_json());
self.db.insert_stream(&new_stream).await?;
@ -158,13 +265,17 @@ impl Overseer for ZapStreamOverseer {
self.keys.public_key.to_hex(),
pipeline_id
);
let mut n94 = blob_to_event_builder(blob)?.add_tags(Tag::parse(&["a", &a_tag]));
let mut n94 = self.blob_to_event_builder(blob)?.add_tags([
Tag::parse(&["a", &a_tag])?,
Tag::parse(&["d", variant_id.to_string().as_str()])?,
Tag::parse(&["duration", duration.to_string().as_str()])?,
]);
for b in blobs.iter().skip(1) {
n94 = n94.add_tags(Tag::parse(&["url", &b.url]));
}
let n94 = n94.sign_with_keys(&self.keys)?;
self.client.send_event(n94).await?;
info!("Published N94 segment for {}", a_tag);
info!("Published N94 segment to {}", blob.url);
}
Ok(())
@ -181,103 +292,3 @@ impl Overseer for ZapStreamOverseer {
Ok(())
}
}
fn stream_to_event_builder(this: &UserStream, keys: &Keys) -> Result<EventBuilder> {
let mut tags = vec![
Tag::parse(&["d".to_string(), this.id.to_string()])?,
Tag::parse(&["status".to_string(), this.state.to_string()])?,
Tag::parse(&["starts".to_string(), this.starts.timestamp().to_string()])?,
];
if let Some(ref ends) = this.ends {
tags.push(Tag::parse(&[
"ends".to_string(),
ends.timestamp().to_string(),
])?);
}
if let Some(ref title) = this.title {
tags.push(Tag::parse(&["title".to_string(), title.to_string()])?);
}
if let Some(ref summary) = this.summary {
tags.push(Tag::parse(&["summary".to_string(), summary.to_string()])?);
}
if let Some(ref image) = this.image {
tags.push(Tag::parse(&["image".to_string(), image.to_string()])?);
}
if let Some(ref thumb) = this.thumb {
tags.push(Tag::parse(&["thumb".to_string(), thumb.to_string()])?);
}
if let Some(ref content_warning) = this.content_warning {
tags.push(Tag::parse(&[
"content_warning".to_string(),
content_warning.to_string(),
])?);
}
if let Some(ref goal) = this.goal {
tags.push(Tag::parse(&["goal".to_string(), goal.to_string()])?);
}
if let Some(ref pinned) = this.pinned {
tags.push(Tag::parse(&["pinned".to_string(), pinned.to_string()])?);
}
if let Some(ref tags_csv) = this.tags {
for tag in tags_csv.split(',') {
tags.push(Tag::parse(&["t".to_string(), tag.to_string()])?);
}
}
let kind = Kind::from(STREAM_EVENT_KIND);
let coord = Coordinate::new(kind, keys.public_key).identifier(this.id);
tags.push(Tag::parse(&[
"alt",
&format!("Watch live on https://zap.stream/{}", coord.to_bech32()?),
])?);
Ok(EventBuilder::new(kind, "", tags))
}
fn stream_url_mapping(this: &UserStream, public_url: &str) -> Result<String> {
let u: Url = public_url.parse()?;
// hls muxer always writes the master playlist like this
Ok(u.join(&format!("/{}/live.m3u8", this.id))?.to_string())
}
fn image_url_mapping(this: &UserStream, public_url: &str) -> Result<String> {
let u: Url = public_url.parse()?;
// pipeline always writes a thumbnail like this
Ok(u.join(&format!("/{}/thumb.webp", this.id))?.to_string())
}
async fn publish_stream_event(
this: &UserStream,
client: &Client,
keys: &Keys,
public_url: &str,
) -> Result<Event> {
let ev = stream_to_event_builder(this, keys)?
.add_tags([
Tag::parse(&["streaming", stream_url_mapping(this, public_url)?.as_str()])?,
Tag::parse(&["image", image_url_mapping(this, public_url)?.as_str()])?,
])
.sign(&client.signer().await?)
.await?;
client.send_event(ev.clone()).await?;
Ok(ev)
}
fn blob_to_event_builder(this: &BlobDescriptor) -> Result<EventBuilder> {
let tags = if let Some(tags) = this.nip94.as_ref() {
tags.iter()
.map_while(|(k, v)| Tag::parse(&[k, v]).ok())
.collect()
} else {
let mut tags = vec![
Tag::parse(&["x", &this.sha256])?,
Tag::parse(&["url", &this.url])?,
Tag::parse(&["size", &this.size.to_string()])?,
];
if let Some(m) = this.mime_type.as_ref() {
tags.push(Tag::parse(&["m", m])?)
}
tags
};
Ok(EventBuilder::new(Kind::FileMetadata, "", tags))
}

View File

@ -10,6 +10,7 @@ use crate::egress::hls::HlsEgress;
use crate::egress::recorder::RecorderEgress;
use crate::egress::{Egress, EgressResult};
use crate::ingress::ConnectionInfo;
use crate::mux::SegmentType;
use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};
use crate::pipeline::{EgressType, PipelineConfig};
use crate::variant::{StreamMapping, VariantStream};
@ -359,7 +360,8 @@ impl PipelineRunner {
});
match e {
EgressType::HLS(_) => {
let hls = HlsEgress::new(&cfg.id, &self.out_dir, 2.0, encoders)?;
let hls =
HlsEgress::new(&cfg.id, &self.out_dir, 2.0, encoders, SegmentType::MPEGTS)?;
self.egress.push(Box::new(hls));
}
EgressType::Recorder(_) => {

View File

@ -8,7 +8,8 @@ create table user
tos_accepted timestamp,
stream_key text not null default uuid(),
is_admin bool not null default false,
is_blocked bool not null default false
is_blocked bool not null default false,
recording bool not null default false
);
create unique index ix_user_pubkey on user (pubkey);
create table user_stream

View File

@ -1,4 +1,4 @@
use crate::UserStream;
use crate::{User, UserStream};
use anyhow::Result;
use sqlx::{MySqlPool, Row};
use uuid::Uuid;
@ -33,6 +33,15 @@ impl ZapStreamDb {
.map(|r| r.try_get(0).unwrap()))
}
/// Get user by id
pub async fn get_user(&self, uid: u64) -> Result<User> {
Ok(sqlx::query_as("select * from user where id = ?")
.bind(uid)
.fetch_one(&self.db)
.await
.map_err(anyhow::Error::new)?)
}
pub async fn upsert_user(&self, pubkey: &[u8; 32]) -> Result<u64> {
let res = sqlx::query("insert ignore into user(pubkey) values(?) returning id")
.bind(pubkey.as_slice())

View File

@ -5,14 +5,24 @@ use uuid::Uuid;
#[derive(Debug, Clone, FromRow)]
pub struct User {
/// Database ID for this uer
pub id: u64,
pub pubkey: [u8; 32],
/// Nostr pubkey of this user
pub pubkey: Vec<u8>,
/// Timestamp when this user first used the service
pub created: DateTime<Utc>,
/// Current balance in milli-sats
pub balance: i64,
pub tos_accepted: DateTime<Utc>,
/// When the TOS was accepted
pub tos_accepted: Option<DateTime<Utc>>,
/// Primary stream key
pub stream_key: String,
/// If the user is an admin
pub is_admin: bool,
/// If the user is blocked from streaming
pub is_blocked: bool,
/// Streams are recorded
pub recording: bool,
}
#[derive(Default, Debug, Clone, Type)]