From 877db958fcdc8099fd3d0ebf41e001fb99116a29 Mon Sep 17 00:00:00 2001 From: kieran Date: Tue, 19 Nov 2024 17:12:03 +0000 Subject: [PATCH] chore: testing n94 streams --- TODO.md | 7 +- config.yaml | 2 + src/mux/hls.rs | 77 ++++-- src/overseer/mod.rs | 27 +-- src/overseer/zap_stream.rs | 219 +++++++++--------- src/pipeline/runner.rs | 4 +- .../migrations/20241115120541_init.sql | 3 +- zap-stream-db/src/db.rs | 11 +- zap-stream-db/src/model.rs | 14 +- 9 files changed, 223 insertions(+), 141 deletions(-) diff --git a/TODO.md b/TODO.md index cbafb29..b491287 100644 --- a/TODO.md +++ b/TODO.md @@ -1,2 +1,7 @@ +- Store user preference for (rates and recording) [DB] - Setup multi-variant output -- Manage event lifecycle (close stream) \ No newline at end of file +- Manage event lifecycle (close stream) +- RTMP? +- API parity +- fMP4 instead of MPEG-TS segments +- HLS-LL \ No newline at end of file diff --git a/config.yaml b/config.yaml index 86817bc..20b1a3b 100755 --- a/config.yaml +++ b/config.yaml @@ -39,6 +39,8 @@ listen_http: "127.0.0.1:8080" overseer: zap-stream: nsec: "nsec1wya428srvpu96n4h78gualaj7wqw4ecgatgja8d5ytdqrxw56r2se440y4" + blossom: + - "http://localhost:8881" relays: - "ws://localhost:7766" database: "mysql://root:root@localhost:3368/zap_stream?max_connections=2" diff --git a/src/mux/hls.rs b/src/mux/hls.rs index 989d31b..b56564a 100644 --- a/src/mux/hls.rs +++ b/src/mux/hls.rs @@ -10,12 +10,19 @@ use ffmpeg_rs_raw::{cstr, Encoder, Muxer}; use itertools::Itertools; use log::{info, warn}; use m3u8_rs::MediaSegment; +use std::collections::HashMap; use std::fmt::Display; use std::fs::File; use std::path::PathBuf; use std::ptr; use uuid::Uuid; +#[derive(Clone, Copy)] +pub enum SegmentType { + MPEGTS, + FMP4, +} + pub enum HlsVariantStream { Video { group: usize, @@ -79,22 +86,24 @@ pub struct HlsVariant { pub out_dir: String, /// List of segments to be included in the playlist pub segments: Vec, + /// Type of segments to create + pub segment_type: SegmentType, } -struct SegmentInfo(u64, f32); +struct SegmentInfo(u64, f32, SegmentType); impl SegmentInfo { fn to_media_segment(&self) -> MediaSegment { MediaSegment { - uri: HlsVariant::segment_name(self.0), + uri: self.filename(), duration: self.1, - title: Some("no desc".to_string()), + title: None, ..MediaSegment::default() } } fn filename(&self) -> String { - HlsVariant::segment_name(self.0) + HlsVariant::segment_name(self.2, self.0) } } @@ -104,14 +113,32 @@ impl HlsVariant { segment_length: f32, group: usize, encoded_vars: impl Iterator, + segment_type: SegmentType, ) -> Result { let name = format!("stream_{}", group); - let first_seg = Self::map_segment_path(out_dir, &name, 1); + let first_seg = Self::map_segment_path(out_dir, &name, 1, segment_type); std::fs::create_dir_all(PathBuf::from(&first_seg).parent().unwrap())?; + let mut opts = HashMap::new(); + match segment_type { + SegmentType::FMP4 => { + opts.insert("fflags".to_string(), "-autobsf".to_string()); + opts.insert( + "movflags".to_string(), + "+frag_custom+dash+delay_moov".to_string(), + ); + } + _ => {} + }; let mut mux = unsafe { Muxer::builder() - .with_output_path(first_seg.as_str(), Some("mpegts"))? + .with_output_path( + first_seg.as_str(), + match segment_type { + SegmentType::MPEGTS => Some("mpegts"), + SegmentType::FMP4 => Some("mp4"), + }, + )? .build()? }; let mut streams = Vec::new(); @@ -145,7 +172,7 @@ impl HlsVariant { } } unsafe { - mux.open(None)?; + mux.open(Some(opts))?; } Ok(Self { name: name.clone(), @@ -154,23 +181,27 @@ impl HlsVariant { streams, idx: 1, pkt_start: 0.0, - segments: Vec::from([SegmentInfo(1, segment_length)]), + segments: Vec::from([SegmentInfo(1, segment_length, segment_type)]), out_dir: out_dir.to_string(), + segment_type, }) } - pub fn segment_name(idx: u64) -> String { - format!("{}.ts", idx) + pub fn segment_name(t: SegmentType, idx: u64) -> String { + match t { + SegmentType::MPEGTS => format!("{}.ts", idx), + SegmentType::FMP4 => format!("{}.m4s", idx), + } } pub fn out_dir(&self) -> PathBuf { PathBuf::from(&self.out_dir).join(&self.name) } - pub fn map_segment_path(out_dir: &str, name: &str, idx: u64) -> String { + pub fn map_segment_path(out_dir: &str, name: &str, idx: u64, typ: SegmentType) -> String { PathBuf::from(out_dir) .join(name) - .join(Self::segment_name(idx)) + .join(Self::segment_name(typ, idx)) .to_string_lossy() .to_string() } @@ -201,7 +232,8 @@ impl HlsVariant { avio_flush((*ctx).pb); av_free((*ctx).url as *mut _); - let next_seg_url = Self::map_segment_path(&*self.out_dir, &self.name, self.idx); + let next_seg_url = + Self::map_segment_path(&*self.out_dir, &self.name, self.idx, self.segment_type); (*ctx).url = cstr!(next_seg_url.as_str()); let ret = avio_open(&mut (*ctx).pb, (*ctx).url, AVIO_FLAG_WRITE); @@ -234,7 +266,12 @@ impl HlsVariant { variant: *video_var.id(), idx: prev_seg, duration, - path: PathBuf::from(Self::map_segment_path(&*self.out_dir, &self.name, prev_seg)), + path: PathBuf::from(Self::map_segment_path( + &*self.out_dir, + &self.name, + prev_seg, + self.segment_type, + )), }; self.pkt_start = pkt_time; Ok(ret) @@ -247,7 +284,8 @@ impl HlsVariant { } fn add_segment(&mut self, idx: u64, duration: f32) -> Result<()> { - self.segments.push(SegmentInfo(idx, duration)); + self.segments + .push(SegmentInfo(idx, duration, self.segment_type)); const MAX_SEGMENTS: usize = 10; @@ -340,6 +378,7 @@ impl HlsMuxer { out_dir: &str, segment_length: f32, encoders: impl Iterator, + segment_type: SegmentType, ) -> Result { let base = PathBuf::from(out_dir).join(id.to_string()); @@ -348,7 +387,13 @@ impl HlsMuxer { .sorted_by(|a, b| a.0.group_id().cmp(&b.0.group_id())) .chunk_by(|a| a.0.group_id()) { - let var = HlsVariant::new(base.to_str().unwrap(), segment_length, k, group)?; + let var = HlsVariant::new( + base.to_str().unwrap(), + segment_length, + k, + group, + segment_type, + )?; vars.push(var); } diff --git a/src/overseer/mod.rs b/src/overseer/mod.rs index e3b7674..4b1347a 100644 --- a/src/overseer/mod.rs +++ b/src/overseer/mod.rs @@ -16,6 +16,7 @@ use std::cmp::PartialEq; use std::path::PathBuf; use std::sync::Arc; use uuid::Uuid; +use warp::Filter; mod webhook; @@ -179,8 +180,9 @@ pub(crate) fn get_default_variants(info: &IngressInfo) -> Result) -> Self { @@ -192,7 +194,7 @@ impl StaticOverseer { impl Overseer for StaticOverseer { async fn start_stream( &self, - connection: &ConnectionInfo, + _connection: &ConnectionInfo, stream_info: &IngressInfo, ) -> Result { let vars = get_default_variants(stream_info)?; @@ -200,17 +202,10 @@ impl Overseer for StaticOverseer { Ok(PipelineConfig { id: Uuid::new_v4(), variants: vars, - egress: vec![ - /*EgressType::Recorder(EgressConfig { - name: "REC".to_owned(), - out_dir: self.config.output_dir.clone(), - variants: var_ids, - }),*/ - EgressType::HLS(EgressConfig { - name: "HLS".to_owned(), - variants: var_ids, - }), - ], + egress: vec![EgressType::HLS(EgressConfig { + name: "HLS".to_owned(), + variants: var_ids, + })], }) } @@ -222,7 +217,8 @@ impl Overseer for StaticOverseer { duration: f32, path: &PathBuf, ) -> Result<()> { - todo!() + // nothing to do here + Ok(()) } async fn on_thumbnail( @@ -232,6 +228,7 @@ impl Overseer for StaticOverseer { height: usize, path: &PathBuf, ) -> Result<()> { - todo!() + // nothing to do here + Ok(()) } } diff --git a/src/overseer/zap_stream.rs b/src/overseer/zap_stream.rs index 58b3f76..903208c 100644 --- a/src/overseer/zap_stream.rs +++ b/src/overseer/zap_stream.rs @@ -24,6 +24,7 @@ use std::path::PathBuf; use std::str::FromStr; use url::Url; use uuid::Uuid; +use warp::Filter; use zap_stream_db::sqlx::Encode; use zap_stream_db::{UserStream, UserStreamState, ZapStreamDb}; @@ -95,6 +96,112 @@ impl ZapStreamOverseer { public_url: public_url.clone(), }) } + + fn stream_to_event_builder(&self, stream: &UserStream) -> Result { + let mut tags = vec![ + Tag::parse(&["d".to_string(), stream.id.to_string()])?, + Tag::parse(&["status".to_string(), stream.state.to_string()])?, + Tag::parse(&["starts".to_string(), stream.starts.timestamp().to_string()])?, + ]; + if let Some(ref ends) = stream.ends { + tags.push(Tag::parse(&[ + "ends".to_string(), + ends.timestamp().to_string(), + ])?); + } + if let Some(ref title) = stream.title { + tags.push(Tag::parse(&["title".to_string(), title.to_string()])?); + } + if let Some(ref summary) = stream.summary { + tags.push(Tag::parse(&["summary".to_string(), summary.to_string()])?); + } + if let Some(ref image) = stream.image { + tags.push(Tag::parse(&["image".to_string(), image.to_string()])?); + } + if let Some(ref thumb) = stream.thumb { + tags.push(Tag::parse(&["thumb".to_string(), thumb.to_string()])?); + } + if let Some(ref content_warning) = stream.content_warning { + tags.push(Tag::parse(&[ + "content_warning".to_string(), + content_warning.to_string(), + ])?); + } + if let Some(ref goal) = stream.goal { + tags.push(Tag::parse(&["goal".to_string(), goal.to_string()])?); + } + if let Some(ref pinned) = stream.pinned { + tags.push(Tag::parse(&["pinned".to_string(), pinned.to_string()])?); + } + if let Some(ref tags_csv) = stream.tags { + for tag in tags_csv.split(',') { + tags.push(Tag::parse(&["t".to_string(), tag.to_string()])?); + } + } + + let kind = Kind::from(STREAM_EVENT_KIND); + let coord = Coordinate::new(kind, self.keys.public_key).identifier(stream.id); + tags.push(Tag::parse(&[ + "alt", + &format!("Watch live on https://zap.stream/{}", coord.to_bech32()?), + ])?); + Ok(EventBuilder::new(kind, "", tags)) + } + + fn blob_to_event_builder(&self, stream: &BlobDescriptor) -> Result { + let tags = if let Some(tags) = stream.nip94.as_ref() { + tags.iter() + .map_while(|(k, v)| Tag::parse(&[k, v]).ok()) + .collect() + } else { + let mut tags = vec![ + Tag::parse(&["x", &stream.sha256])?, + Tag::parse(&["url", &stream.url])?, + Tag::parse(&["size", &stream.size.to_string()])?, + ]; + if let Some(m) = stream.mime_type.as_ref() { + tags.push(Tag::parse(&["m", m])?) + } + tags + }; + + Ok(EventBuilder::new(Kind::FileMetadata, "", tags)) + } + + async fn publish_stream_event(&self, stream: &UserStream, pubkey: &Vec) -> Result { + let mut extra_tags = vec![ + Tag::parse(&["p", hex::encode(pubkey).as_str(), "", "host"])?, + Tag::parse(&[ + "streaming", + self.map_to_public_url(stream, "live.m3u8")?.as_str(), + ])?, + Tag::parse(&[ + "image", + self.map_to_public_url(stream, "thumb.webp")?.as_str(), + ])?, + ]; + // flag NIP94 streaming when using blossom servers + if self.blossom_servers.len() > 0 { + extra_tags.push(Tag::parse(&["streaming", "nip94"])?); + } + let ev = self + .stream_to_event_builder(stream)? + .add_tags(extra_tags) + .sign_with_keys(&self.keys)?; + self.client.send_event(ev.clone()).await?; + Ok(ev) + } + + fn map_to_public_url<'a>( + &self, + stream: &UserStream, + path: impl Into<&'a str>, + ) -> Result { + let u: Url = self.public_url.parse()?; + Ok(u.join(&format!("/{}/", stream.id))? + .join(path.into())? + .to_string()) + } } #[async_trait] @@ -118,6 +225,7 @@ impl Overseer for ZapStreamOverseer { variants: variants.iter().map(|v| v.id()).collect(), })); + let user = self.db.get_user(uid).await?; // insert new stream record let mut new_stream = UserStream { id: Uuid::new_v4(), @@ -126,8 +234,7 @@ impl Overseer for ZapStreamOverseer { state: UserStreamState::Live, ..Default::default() }; - let stream_event = - publish_stream_event(&new_stream, &self.client, &self.keys, &self.public_url).await?; + let stream_event = self.publish_stream_event(&new_stream, &user.pubkey).await?; new_stream.event = Some(stream_event.as_json()); self.db.insert_stream(&new_stream).await?; @@ -158,13 +265,17 @@ impl Overseer for ZapStreamOverseer { self.keys.public_key.to_hex(), pipeline_id ); - let mut n94 = blob_to_event_builder(blob)?.add_tags(Tag::parse(&["a", &a_tag])); + let mut n94 = self.blob_to_event_builder(blob)?.add_tags([ + Tag::parse(&["a", &a_tag])?, + Tag::parse(&["d", variant_id.to_string().as_str()])?, + Tag::parse(&["duration", duration.to_string().as_str()])?, + ]); for b in blobs.iter().skip(1) { n94 = n94.add_tags(Tag::parse(&["url", &b.url])); } let n94 = n94.sign_with_keys(&self.keys)?; self.client.send_event(n94).await?; - info!("Published N94 segment for {}", a_tag); + info!("Published N94 segment to {}", blob.url); } Ok(()) @@ -181,103 +292,3 @@ impl Overseer for ZapStreamOverseer { Ok(()) } } - -fn stream_to_event_builder(this: &UserStream, keys: &Keys) -> Result { - let mut tags = vec![ - Tag::parse(&["d".to_string(), this.id.to_string()])?, - Tag::parse(&["status".to_string(), this.state.to_string()])?, - Tag::parse(&["starts".to_string(), this.starts.timestamp().to_string()])?, - ]; - if let Some(ref ends) = this.ends { - tags.push(Tag::parse(&[ - "ends".to_string(), - ends.timestamp().to_string(), - ])?); - } - if let Some(ref title) = this.title { - tags.push(Tag::parse(&["title".to_string(), title.to_string()])?); - } - if let Some(ref summary) = this.summary { - tags.push(Tag::parse(&["summary".to_string(), summary.to_string()])?); - } - if let Some(ref image) = this.image { - tags.push(Tag::parse(&["image".to_string(), image.to_string()])?); - } - if let Some(ref thumb) = this.thumb { - tags.push(Tag::parse(&["thumb".to_string(), thumb.to_string()])?); - } - if let Some(ref content_warning) = this.content_warning { - tags.push(Tag::parse(&[ - "content_warning".to_string(), - content_warning.to_string(), - ])?); - } - if let Some(ref goal) = this.goal { - tags.push(Tag::parse(&["goal".to_string(), goal.to_string()])?); - } - if let Some(ref pinned) = this.pinned { - tags.push(Tag::parse(&["pinned".to_string(), pinned.to_string()])?); - } - if let Some(ref tags_csv) = this.tags { - for tag in tags_csv.split(',') { - tags.push(Tag::parse(&["t".to_string(), tag.to_string()])?); - } - } - - let kind = Kind::from(STREAM_EVENT_KIND); - let coord = Coordinate::new(kind, keys.public_key).identifier(this.id); - tags.push(Tag::parse(&[ - "alt", - &format!("Watch live on https://zap.stream/{}", coord.to_bech32()?), - ])?); - Ok(EventBuilder::new(kind, "", tags)) -} - -fn stream_url_mapping(this: &UserStream, public_url: &str) -> Result { - let u: Url = public_url.parse()?; - // hls muxer always writes the master playlist like this - Ok(u.join(&format!("/{}/live.m3u8", this.id))?.to_string()) -} - -fn image_url_mapping(this: &UserStream, public_url: &str) -> Result { - let u: Url = public_url.parse()?; - // pipeline always writes a thumbnail like this - Ok(u.join(&format!("/{}/thumb.webp", this.id))?.to_string()) -} - -async fn publish_stream_event( - this: &UserStream, - client: &Client, - keys: &Keys, - public_url: &str, -) -> Result { - let ev = stream_to_event_builder(this, keys)? - .add_tags([ - Tag::parse(&["streaming", stream_url_mapping(this, public_url)?.as_str()])?, - Tag::parse(&["image", image_url_mapping(this, public_url)?.as_str()])?, - ]) - .sign(&client.signer().await?) - .await?; - client.send_event(ev.clone()).await?; - Ok(ev) -} - -fn blob_to_event_builder(this: &BlobDescriptor) -> Result { - let tags = if let Some(tags) = this.nip94.as_ref() { - tags.iter() - .map_while(|(k, v)| Tag::parse(&[k, v]).ok()) - .collect() - } else { - let mut tags = vec![ - Tag::parse(&["x", &this.sha256])?, - Tag::parse(&["url", &this.url])?, - Tag::parse(&["size", &this.size.to_string()])?, - ]; - if let Some(m) = this.mime_type.as_ref() { - tags.push(Tag::parse(&["m", m])?) - } - tags - }; - - Ok(EventBuilder::new(Kind::FileMetadata, "", tags)) -} diff --git a/src/pipeline/runner.rs b/src/pipeline/runner.rs index a53c264..e22b3d3 100644 --- a/src/pipeline/runner.rs +++ b/src/pipeline/runner.rs @@ -10,6 +10,7 @@ use crate::egress::hls::HlsEgress; use crate::egress::recorder::RecorderEgress; use crate::egress::{Egress, EgressResult}; use crate::ingress::ConnectionInfo; +use crate::mux::SegmentType; use crate::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer}; use crate::pipeline::{EgressType, PipelineConfig}; use crate::variant::{StreamMapping, VariantStream}; @@ -359,7 +360,8 @@ impl PipelineRunner { }); match e { EgressType::HLS(_) => { - let hls = HlsEgress::new(&cfg.id, &self.out_dir, 2.0, encoders)?; + let hls = + HlsEgress::new(&cfg.id, &self.out_dir, 2.0, encoders, SegmentType::MPEGTS)?; self.egress.push(Box::new(hls)); } EgressType::Recorder(_) => { diff --git a/zap-stream-db/migrations/20241115120541_init.sql b/zap-stream-db/migrations/20241115120541_init.sql index 669b3e1..08c69a4 100644 --- a/zap-stream-db/migrations/20241115120541_init.sql +++ b/zap-stream-db/migrations/20241115120541_init.sql @@ -8,7 +8,8 @@ create table user tos_accepted timestamp, stream_key text not null default uuid(), is_admin bool not null default false, - is_blocked bool not null default false + is_blocked bool not null default false, + recording bool not null default false ); create unique index ix_user_pubkey on user (pubkey); create table user_stream diff --git a/zap-stream-db/src/db.rs b/zap-stream-db/src/db.rs index 65cc886..678490e 100644 --- a/zap-stream-db/src/db.rs +++ b/zap-stream-db/src/db.rs @@ -1,4 +1,4 @@ -use crate::UserStream; +use crate::{User, UserStream}; use anyhow::Result; use sqlx::{MySqlPool, Row}; use uuid::Uuid; @@ -33,6 +33,15 @@ impl ZapStreamDb { .map(|r| r.try_get(0).unwrap())) } + /// Get user by id + pub async fn get_user(&self, uid: u64) -> Result { + Ok(sqlx::query_as("select * from user where id = ?") + .bind(uid) + .fetch_one(&self.db) + .await + .map_err(anyhow::Error::new)?) + } + pub async fn upsert_user(&self, pubkey: &[u8; 32]) -> Result { let res = sqlx::query("insert ignore into user(pubkey) values(?) returning id") .bind(pubkey.as_slice()) diff --git a/zap-stream-db/src/model.rs b/zap-stream-db/src/model.rs index 8971239..6250149 100644 --- a/zap-stream-db/src/model.rs +++ b/zap-stream-db/src/model.rs @@ -5,14 +5,24 @@ use uuid::Uuid; #[derive(Debug, Clone, FromRow)] pub struct User { + /// Database ID for this uer pub id: u64, - pub pubkey: [u8; 32], + /// Nostr pubkey of this user + pub pubkey: Vec, + /// Timestamp when this user first used the service pub created: DateTime, + /// Current balance in milli-sats pub balance: i64, - pub tos_accepted: DateTime, + /// When the TOS was accepted + pub tos_accepted: Option>, + /// Primary stream key pub stream_key: String, + /// If the user is an admin pub is_admin: bool, + /// If the user is blocked from streaming pub is_blocked: bool, + /// Streams are recorded + pub recording: bool, } #[derive(Default, Debug, Clone, Type)]