Mirror of https://github.com/v0l/zap-stream-core.git
@@ -43,4 +43,5 @@ pretty_env_logger = "0.5.0"
clap = { version = "4.5.16", features = ["derive"] }
futures-util = "0.3.31"
matchit = "0.8.4"
mustache = "0.9.0"
http-range-header = "0.4.2"
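The new `http-range-header` dependency backs the HLS segment range handling added further down. A minimal sketch of the parsing flow, using only the items this diff itself imports (`parse_range_header`, `StartPosition`, `EndPosition`); the bounds handling here is illustrative, not the crate's own validation:

```rust
use http_range_header::{parse_range_header, EndPosition, StartPosition};

/// Resolve the first range of a header like "bytes=0-1023" against a known
/// file size. Returns inclusive (start, end) offsets, or None if unusable.
fn first_range(header: &str, file_size: u64) -> Option<(u64, u64)> {
    let parsed = parse_range_header(header).ok()?;
    let r = parsed.ranges.first()?;
    let start = match r.start {
        StartPosition::Index(i) => i,
        StartPosition::FromLast(i) => file_size.saturating_sub(i),
    };
    let end = match r.end {
        EndPosition::Index(i) => i,
        EndPosition::LastByte => file_size.saturating_sub(1),
    };
    (start <= end && end < file_size).then_some((start, end))
}
```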
@@ -5,6 +5,7 @@ endpoints:
- "rtmp://127.0.0.1:3336"
- "srt://127.0.0.1:3335"
- "tcp://127.0.0.1:3334"
- "test-pattern://"

# Public hostname which points to the IP address used to listen for all [endpoints]
endpoints_public_hostname: "localhost"
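For reference, a hedged sketch of how the new key might be modelled on the Rust side; the struct and field names here are assumptions for illustration, not the project's real settings types:

```rust
use serde::Deserialize;

/// Hypothetical settings shape implied by the config change above.
#[derive(Deserialize)]
struct Settings {
    /// Listen endpoints, e.g. "rtmp://127.0.0.1:3336".
    endpoints: Vec<String>,
    /// Public hostname advertised for the listen endpoints above.
    endpoints_public_hostname: Option<String>,
}
```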

@@ -1,25 +1,36 @@
use crate::api::Api;
use anyhow::{bail, Result};
use anyhow::{bail, ensure, Context, Result};
use base64::Engine;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody};
use http_range_header::{
    parse_range_header, EndPosition, StartPosition, SyntacticallyCorrectRange,
};
use hyper::body::{Frame, Incoming};
use hyper::http::response::Builder;
use hyper::service::Service;
use hyper::{Method, Request, Response};
use log::error;
use hyper::{Request, Response, StatusCode};
use log::{error, warn};
use matchit::Router;
use nostr_sdk::{serde_json, Alphabet, Event, Kind, PublicKey, SingleLetterTag, TagKind};
use serde::Serialize;
use std::future::Future;
use std::io::SeekFrom;
use std::ops::Range;
use std::path::PathBuf;
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::Poll;
use std::time::{Duration, Instant};
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
use tokio::sync::RwLock;
use tokio_util::io::ReaderStream;
use uuid::Uuid;
use zap_stream_core::egress::hls::HlsEgress;
use zap_stream_core::viewer::ViewerTracker;

#[derive(Serialize, Clone)]

@@ -46,6 +57,14 @@ pub struct CachedStreams {
    cached_at: Instant,
}

#[derive(Clone)]
pub enum HttpServerPath {
    Index,
    HlsMasterPlaylist,
    HlsVariantPlaylist,
    HlsSegmentFile,
}

pub type StreamCache = Arc<RwLock<Option<CachedStreams>>>;
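`StreamCache` is a shared, lazily refreshed snapshot of the stream list used by the index page. A rough sketch of the read-through pattern that `get_cached_or_fetch_streams_static` appears to implement; the TTL, the `template_data` field and the `build_template_data` helper are assumptions (only `cached_at` is visible in this diff):

```rust
async fn cached_streams(cache: &StreamCache, api: &Api) -> anyhow::Result<IndexTemplateData> {
    const TTL: Duration = Duration::from_secs(60); // assumed refresh interval

    // Fast path: reuse a fresh snapshot under a read lock.
    if let Some(cached) = cache.read().await.as_ref() {
        if cached.cached_at.elapsed() < TTL {
            return Ok(cached.template_data.clone()); // `template_data` field is assumed
        }
    }

    // Cache miss or stale: rebuild from the API and store a new snapshot.
    let fresh = build_template_data(api).await?; // hypothetical helper
    *cache.write().await = Some(CachedStreams {
        template_data: fresh.clone(), // assumed field set
        cached_at: Instant::now(),
    });
    Ok(fresh)
}
```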

#[derive(Clone)]

@@ -54,6 +73,7 @@ pub struct HttpServer {
    files_dir: PathBuf,
    api: Api,
    stream_cache: StreamCache,
    router: Router<HttpServerPath>,
}

impl HttpServer {

@@ -63,18 +83,37 @@ impl HttpServer {
        api: Api,
        stream_cache: StreamCache,
    ) -> Self {
        let mut router = Router::new();
        router.insert("/", HttpServerPath::Index).unwrap();
        router.insert("/index.html", HttpServerPath::Index).unwrap();
        router
            .insert(
                format!("/{}/{{stream}}/live.m3u8", HlsEgress::PATH),
                HttpServerPath::HlsMasterPlaylist,
            )
            .unwrap();
        router
            .insert(
                format!("/{}/{{stream}}/{{variant}}/live.m3u8", HlsEgress::PATH),
                HttpServerPath::HlsVariantPlaylist,
            )
            .unwrap();
        router
            .insert(
                format!("/{}/{{stream}}/{{variant}}/{{seg}}.ts", HlsEgress::PATH),
                HttpServerPath::HlsSegmentFile,
            )
            .unwrap();

        Self {
            index_template,
            files_dir,
            api,
            stream_cache,
            router,
        }
    }
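Routing is now table-driven via `matchit`, with `{stream}`, `{variant}` and `{seg}` parameters in the HLS paths. A small self-contained sketch of how such a router resolves a path and exposes the captured parameters (the literal `/hls/...` prefix assumes `HlsEgress::PATH` is "hls"):

```rust
use matchit::Router;

fn routing_demo() -> anyhow::Result<()> {
    let mut router = Router::new();
    router.insert("/hls/{stream}/{variant}/{seg}.ts", "segment")?;

    // `at` returns the matched value together with the captured parameters.
    let m = router.at("/hls/abc123/variant_0/00042.ts")?;
    assert_eq!(*m.value, "segment");
    assert_eq!(m.params.get("stream"), Some("abc123"));
    assert_eq!(m.params.get("variant"), Some("variant_0"));
    Ok(())
}
```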

    async fn get_cached_or_fetch_streams(&self) -> Result<IndexTemplateData> {
        Self::get_cached_or_fetch_streams_static(&self.stream_cache, &self.api).await
    }

    async fn get_cached_or_fetch_streams_static(
        stream_cache: &StreamCache,
        api: &Api,

@@ -100,13 +139,14 @@ impl HttpServer {
            .into_iter()
            .map(|stream| {
                let viewer_count = api.get_viewer_count(&stream.id);
                // TODO: remove HLS assumption
                StreamData {
                    id: stream.id.clone(),
                    title: stream
                        .title
                        .unwrap_or_else(|| format!("Stream {}", &stream.id[..8])),
                    summary: stream.summary,
                    live_url: format!("/{}/live.m3u8", stream.id),
                    live_url: format!("/{}/{}/live.m3u8", HlsEgress::PATH, stream.id),
                    viewer_count: if viewer_count > 0 {
                        Some(viewer_count as _)
                    } else {

@@ -141,31 +181,97 @@ impl HttpServer {
        Ok(template_data)
    }

    async fn render_index(&self) -> Result<String> {
        let template_data = self.get_cached_or_fetch_streams().await?;
        let template = mustache::compile_str(&self.index_template)?;
        let rendered = template.render_to_string(&template_data)?;
        Ok(rendered)
    async fn handle_index(
        api: Api,
        stream_cache: StreamCache,
        template: String,
    ) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
        // Compile template outside async move for better performance
        let template = match mustache::compile_str(&template) {
            Ok(t) => t,
            Err(e) => {
                error!("Failed to compile template: {}", e);
                return Ok(Self::base_response().status(500).body(BoxBody::default())?);
            }
        };

        let template_data = Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await;

        match template_data {
            Ok(data) => match template.render_to_string(&data) {
                Ok(index_html) => Ok(Self::base_response()
                    .header("content-type", "text/html")
                    .body(
                        Full::new(Bytes::from(index_html))
                            .map_err(|e| match e {})
                            .boxed(),
                    )?),
                Err(e) => {
                    error!("Failed to render template: {}", e);
                    Ok(Self::base_response().status(500).body(BoxBody::default())?)
                }
            },
            Err(e) => {
                error!("Failed to fetch template data: {}", e);
                Ok(Self::base_response().status(500).body(BoxBody::default())?)
            }
        }
    }
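`handle_index` compiles the page template once and renders it from the cached stream data. A minimal sketch of that `mustache` compile-and-render flow; the template string and struct here are invented for illustration:

```rust
use serde::Serialize;

#[derive(Serialize)]
struct Page {
    title: String,
    viewer_count: u32,
}

fn render_page() -> anyhow::Result<String> {
    // Compile once, render against any Serialize value.
    let template = mustache::compile_str("<h1>{{title}}</h1><p>{{viewer_count}} watching</p>")?;
    let html = template.render_to_string(&Page {
        title: "Stream abc123".into(),
        viewer_count: 42,
    })?;
    Ok(html)
}
```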

    async fn handle_hls_playlist(
        api: &Api,
    async fn handle_hls_segment(
        req: &Request<Incoming>,
        playlist_path: &PathBuf,
        segment_path: PathBuf,
    ) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
        // Extract stream ID from path (e.g., /uuid/live.m3u8 -> uuid)
        let path_parts: Vec<&str> = req
            .uri()
            .path()
            .trim_start_matches('/')
            .split('/')
            .collect();
        if path_parts.len() < 2 {
            return Ok(Response::builder().status(404).body(BoxBody::default())?);
        let mut response = Self::base_response().header("accept-ranges", "bytes");

        if let Some(r) = req.headers().get("range") {
            if let Ok(ranges) = parse_range_header(r.to_str()?) {
                if ranges.ranges.len() > 1 {
                    warn!("Multipart ranges are not supported, fallback to non-range request");
                    Self::path_to_response(segment_path).await
                } else {
                    let file = File::open(&segment_path).await?;
                    let metadata = file.metadata().await?;
                    let single_range = ranges.ranges.first().unwrap();
                    let range = match RangeBody::get_range(metadata.len(), single_range) {
                        Ok(r) => r,
                        Err(e) => {
                            warn!("Failed to get range: {}", e);
                            return Ok(response
                                .status(StatusCode::RANGE_NOT_SATISFIABLE)
                                .body(BoxBody::default())?);
                        }
                    };
                    let r_body = RangeBody::new(file, metadata.len(), range.clone());

                    response = response.status(StatusCode::PARTIAL_CONTENT);
                    let headers = r_body.get_headers();
                    for (k, v) in headers {
                        response = response.header(k, v);
                    }
                    let f_stream = ReaderStream::new(r_body);
                    let body = StreamBody::new(
                        f_stream
                            .map_ok(Frame::data)
                            .map_err(|e| anyhow::anyhow!("Failed to read body: {}", e)),
                    )
                    .boxed();
                    Ok(response.body(body)?)
                }
            } else {
                Ok(Self::base_response().status(400).body(BoxBody::default())?)
            }
        } else {
            Self::path_to_response(segment_path).await
        }
    }

        let stream_id = path_parts[0];

    async fn handle_hls_master_playlist(
        api: Api,
        req: &Request<Incoming>,
        stream_id: &str,
        playlist_path: PathBuf,
    ) -> Result<Response<BoxBody<Bytes, anyhow::Error>>, anyhow::Error> {
        // Get client IP and User-Agent for tracking
        let client_ip = Self::get_client_ip(req);
        let user_agent = req

@@ -203,17 +309,15 @@ impl HttpServer {
            let modified_content =
                Self::add_viewer_token_to_playlist(&playlist_content, &viewer_token)?;

            Ok(Response::builder()
            let response = Self::base_response()
                .header("content-type", "application/vnd.apple.mpegurl")
                .header("server", "zap-stream-core")
                .header("access-control-allow-origin", "*")
                .header("access-control-allow-headers", "*")
                .header("access-control-allow-methods", "HEAD, GET")
                .body(
                    Full::new(Bytes::from(modified_content))
                        .map_err(|e| match e {})
                        .boxed(),
                )?)
                )?;

            Ok(response)
    }

    fn get_client_ip(req: &Request<Incoming>) -> String {

@@ -232,8 +336,8 @@ impl HttpServer {
            }
        }

        // Fallback to connection IP (note: in real deployment this might be a proxy)
        "unknown".to_string()
        // use random string as IP to avoid broken view tracker due to proxying
        Uuid::new_v4().to_string()
    }

    fn add_viewer_token_to_playlist(content: &[u8], viewer_token: &str) -> Result<String> {

@@ -271,6 +375,27 @@ impl HttpServer {
                format!("{}?vt={}", url, viewer_token)
            }
        }
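`add_viewer_token_to_playlist` tags every playlist URI with a `vt` query parameter so the viewer tracker can attribute segment fetches to a session. A simplified sketch of that rewrite, mirroring the `format!("{}?vt={}", ...)` branch above (the real code also has to handle URIs that already carry a query string):

```rust
/// Append a viewer token to every non-comment (URI) line of an HLS playlist.
/// Simplified sketch; comment lines starting with '#' are passed through.
fn add_viewer_token(playlist: &str, viewer_token: &str) -> String {
    playlist
        .lines()
        .map(|line| {
            if line.is_empty() || line.starts_with('#') {
                line.to_string()
            } else {
                format!("{}?vt={}", line, viewer_token)
            }
        })
        .collect::<Vec<_>>()
        .join("\n")
}
```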

    fn base_response() -> Builder {
        Response::builder()
            .header("server", "zap-stream-core")
            .header("access-control-allow-origin", "*")
            .header("access-control-allow-headers", "*")
            .header("access-control-allow-methods", "HEAD, GET")
    }

    /// Get a response object for a file body
    async fn path_to_response(path: PathBuf) -> Result<Response<BoxBody<Bytes, anyhow::Error>>> {
        let f = File::open(&path).await?;
        let f_stream = ReaderStream::new(f);
        let body = StreamBody::new(
            f_stream
                .map_ok(Frame::data)
                .map_err(|e| anyhow::anyhow!("Failed to read body: {}", e)),
        )
        .boxed();
        Ok(Self::base_response().body(body)?)
    }
}

impl Service<Request<Incoming>> for HttpServer {

@@ -279,89 +404,50 @@ impl Service<Request<Incoming>> for HttpServer {
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn call(&self, req: Request<Incoming>) -> Self::Future {
        // check is index.html
        if req.method() == Method::GET && req.uri().path() == "/"
            || req.uri().path() == "/index.html"
        {
            let stream_cache = self.stream_cache.clone();
            let api = self.api.clone();
        let path = req.uri().path().to_owned();
        // request path as a file path pointing to the output directory
        let dst_path = self.files_dir.join(req.uri().path()[1..].to_string());

            // Compile template outside async move for better performance
            let template = match mustache::compile_str(&self.index_template) {
                Ok(t) => t,
                Err(e) => {
                    error!("Failed to compile template: {}", e);
        if let Ok(m) = self.router.at(&path) {
            match m.value {
                HttpServerPath::Index => {
                    let api = self.api.clone();
                    let cache = self.stream_cache.clone();
                    let template = self.index_template.clone();
                    return Box::pin(async move { Self::handle_index(api, cache, template).await });
                }
                HttpServerPath::HlsMasterPlaylist => {
                    let api = self.api.clone();
                    let stream_id = m.params.get("stream").map(|s| s.to_string());
                    let file_path = dst_path.clone();
                    return Box::pin(async move {
                    Ok(Response::builder()
                        .status(500)
                        .body(BoxBody::default())
                        .unwrap())
                        let stream_id = stream_id.context("stream id missing")?;
                        Ok(
                            Self::handle_hls_master_playlist(api, &req, &stream_id, file_path)
                                .await?,
                        )
                    });
                }
            };

            return Box::pin(async move {
                // Use the existing method to get cached template data
                let template_data =
                    Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await;

                match template_data {
                    Ok(data) => match template.render_to_string(&data) {
                        Ok(index_html) => Ok(Response::builder()
                            .header("content-type", "text/html")
                            .header("server", "zap-stream-core")
                            .body(
                                Full::new(Bytes::from(index_html))
                                    .map_err(|e| match e {})
                                    .boxed(),
                            )?),
                        Err(e) => {
                            error!("Failed to render template: {}", e);
                            Ok(Response::builder().status(500).body(BoxBody::default())?)
                        }
                    },
                    Err(e) => {
                        error!("Failed to fetch template data: {}", e);
                        Ok(Response::builder().status(500).body(BoxBody::default())?)
                    }
                HttpServerPath::HlsVariantPlaylist => {
                    // let file handler handle this one, may be used later for HLS-LL to create
                    // delta updates
                }
            });
                HttpServerPath::HlsSegmentFile => {
                    // handle segment file (range requests)
                    let file_path = dst_path.clone();
                    return Box::pin(async move {
                        Ok(Self::handle_hls_segment(&req, file_path).await?)
                    });
                }
            }
        }

        // check if mapped to file
        let dst_path = self.files_dir.join(req.uri().path()[1..].to_string());
        // check if mapped to file (not handled route)
        if dst_path.exists() {
            let api_clone = self.api.clone();
            return Box::pin(async move {
                let rsp = Response::builder()
                    .header("server", "zap-stream-core")
                    .header("access-control-allow-origin", "*")
                    .header("access-control-allow-headers", "*")
                    .header("access-control-allow-methods", "HEAD, GET");

                if req.method() == Method::HEAD {
                    return Ok(rsp.body(BoxBody::default())?);
                }

                // Handle HLS playlists with viewer tracking
                if req.uri().path().ends_with("/live.m3u8") {
                    return Self::handle_hls_playlist(&api_clone, &req, &dst_path).await;
                }

                // Handle regular files
                let f = File::open(&dst_path).await?;
                let f_stream = ReaderStream::new(f);
                let body = StreamBody::new(
                    f_stream
                        .map_ok(Frame::data)
                        .map_err(|e| Self::Error::new(e)),
                )
                .boxed();
                Ok(rsp.body(body)?)
            });
            return Box::pin(async move { Self::path_to_response(dst_path).await });
        }

        // otherwise handle in overseer
        // fallback to api handler
        let api = self.api.clone();
        Box::pin(async move {
            match api.handler(req).await {

@@ -466,3 +552,110 @@ pub fn check_nip98_auth(req: &Request<Incoming>, public_url: &str) -> Result<Aut
        event,
    })
}
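`check_nip98_auth` expects a NIP-98 event in an `Authorization: Nostr <base64>` header and validates it (kind 27235, signature, matching URL and method). Only the header-decoding half is simple enough to sketch safely here; the event-level checks are left to the `nostr_sdk::Event` handling in the real function:

```rust
use base64::Engine;

/// Hedged sketch: split the "Nostr " scheme and base64-decode the event JSON.
/// Kind / signature / `u` + `method` tag checks are done by `check_nip98_auth`
/// itself and are not reproduced here.
fn decode_nip98_header(header: &str) -> anyhow::Result<serde_json::Value> {
    let b64 = header
        .strip_prefix("Nostr ")
        .ok_or_else(|| anyhow::anyhow!("not a Nostr authorization header"))?;
    let raw = base64::engine::general_purpose::STANDARD.decode(b64)?;
    Ok(serde_json::from_slice(&raw)?)
}
```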

/// Range request handler over file handle
struct RangeBody {
    file: File,
    range_start: u64,
    range_end: u64,
    current_offset: u64,
    poll_complete: bool,
    file_size: u64,
}

const MAX_UNBOUNDED_RANGE: u64 = 1024 * 1024;
impl RangeBody {
    pub fn new(file: File, file_size: u64, range: Range<u64>) -> Self {
        Self {
            file,
            file_size,
            range_start: range.start,
            range_end: range.end,
            current_offset: 0,
            poll_complete: false,
        }
    }

    pub fn get_range(file_size: u64, header: &SyntacticallyCorrectRange) -> Result<Range<u64>> {
        let range_start = match header.start {
            StartPosition::Index(i) => {
                ensure!(i < file_size, "Range start out of range");
                i
            }
            StartPosition::FromLast(i) => file_size.saturating_sub(i),
        };
        let range_end = match header.end {
            EndPosition::Index(i) => {
                ensure!(i <= file_size, "Range end out of range");
                i
            }
            EndPosition::LastByte => {
                (file_size.saturating_sub(1)).min(range_start + MAX_UNBOUNDED_RANGE)
            }
        };
        Ok(range_start..range_end)
    }
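`get_range` clamps the parsed header to the file size and caps open-ended requests at `MAX_UNBOUNDED_RANGE` (1 MiB), with an inclusive end offset. Two worked cases for a hypothetical 10 MiB segment, reusing `parse_range_header` exactly as the segment handler does:

```rust
fn range_examples() {
    const MIB: u64 = 1024 * 1024;
    let file_size = 10 * MIB;

    // Open-ended range: the end is capped at start + MAX_UNBOUNDED_RANGE.
    let parsed = parse_range_header("bytes=1048576-").expect("syntactically valid");
    let range = RangeBody::get_range(file_size, parsed.ranges.first().unwrap()).unwrap();
    assert_eq!(range, MIB..2 * MIB);

    // Suffix range: the last 500 bytes of the file (inclusive end offset).
    let parsed = parse_range_header("bytes=-500").expect("syntactically valid");
    let range = RangeBody::get_range(file_size, parsed.ranges.first().unwrap()).unwrap();
    assert_eq!(range, file_size - 500..file_size - 1);
}
```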

    pub fn get_headers(&self) -> Vec<(&'static str, String)> {
        let r_len = (self.range_end - self.range_start) + 1;
        vec![
            ("content-length", r_len.to_string()),
            (
                "content-range",
                format!(
                    "bytes {}-{}/{}",
                    self.range_start, self.range_end, self.file_size
                ),
            ),
        ]
    }
}

impl AsyncRead for RangeBody {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let range_start = self.range_start + self.current_offset;
        let range_len = self.range_end.saturating_sub(range_start) + 1;
        let bytes_to_read = buf.remaining().min(range_len as usize) as u64;

        if bytes_to_read == 0 {
            return Poll::Ready(Ok(()));
        }

        // when no pending poll, seek to starting position
        if !self.poll_complete {
            let pinned = pin!(&mut self.file);
            pinned.start_seek(SeekFrom::Start(range_start))?;
            self.poll_complete = true;
        }

        // check poll completion
        if self.poll_complete {
            let pinned = pin!(&mut self.file);
            match pinned.poll_complete(cx) {
                Poll::Ready(Ok(_)) => {
                    self.poll_complete = false;
                }
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => return Poll::Pending,
            }
        }

        // Read data from the file
        let pinned = pin!(&mut self.file);
        match pinned.poll_read(cx, buf) {
            Poll::Ready(Ok(_)) => {
                self.current_offset += bytes_to_read;
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
            Poll::Pending => {
                self.poll_complete = true;
                Poll::Pending
            }
        }
    }
}
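`RangeBody` is an `AsyncRead` adapter that first seeks to the range start (the `poll_complete` flag tracks the in-flight seek) and then reads like an ordinary file, so it can be wrapped like any other reader. This sketch mirrors how the segment handler above turns it into a hyper body:

```rust
use futures_util::TryStreamExt;
use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
use hyper::body::Frame;
use tokio_util::io::ReaderStream;

/// Wire a RangeBody into a response body the same way handle_hls_segment does:
/// AsyncRead -> ReaderStream -> StreamBody -> BoxBody.
fn range_to_body(body: RangeBody) -> BoxBody<bytes::Bytes, anyhow::Error> {
    StreamBody::new(
        ReaderStream::new(body)
            .map_ok(Frame::data)
            .map_err(|e| anyhow::anyhow!("Failed to read body: {}", e)),
    )
    .boxed()
}
```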

@@ -15,6 +15,7 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use url::Url;
use uuid::Uuid;
use zap_stream_core::egress::hls::HlsEgress;
use zap_stream_core::egress::{EgressConfig, EgressSegment};
use zap_stream_core::ingress::ConnectionInfo;
use zap_stream_core::overseer::{IngressInfo, IngressStream, IngressStreamType, Overseer};

@@ -227,15 +228,18 @@ impl ZapStreamOverseer {
        stream: &UserStream,
        pubkey: &Vec<u8>,
    ) -> Result<Event> {
        // TODO: remove assumption that HLS is enabled
        let base_streaming_path = PathBuf::from(HlsEgress::PATH).join(stream.id.to_string());
        let extra_tags = vec![
            Tag::parse(["p", hex::encode(pubkey).as_str(), "", "host"])?,
            Tag::parse([
                "streaming",
                self.map_to_stream_public_url(stream, "live.m3u8")?.as_str(),
                self.map_to_public_url(base_streaming_path.join("live.m3u8").to_str().unwrap())?
                    .as_str(),
            ])?,
            Tag::parse([
                "image",
                self.map_to_stream_public_url(stream, "thumb.webp")?
                self.map_to_public_url(base_streaming_path.join("thumb.webp").to_str().unwrap())?
                    .as_str(),
            ])?,
            Tag::parse(["service", self.map_to_public_url("api/v1")?.as_str()])?,

@@ -248,10 +252,6 @@ impl ZapStreamOverseer {
        Ok(ev)
    }

    fn map_to_stream_public_url(&self, stream: &UserStream, path: &str) -> Result<String> {
        self.map_to_public_url(&format!("{}/{}", stream.id, path))
    }

    fn map_to_public_url(&self, path: &str) -> Result<String> {
        let u: Url = self.public_url.parse()?;
        Ok(u.join(path)?.to_string())
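All public URLs now go through `map_to_public_url`, i.e. `Url::join` on the configured `public_url`. `join` is sensitive to trailing slashes, which is worth keeping in mind when setting `public_url`; a small illustration (the `hls/...` path assumes `HlsEgress::PATH` is "hls"):

```rust
use url::Url;

fn join_demo() -> anyhow::Result<()> {
    // With a trailing slash, relative paths are appended as expected.
    let base: Url = "https://example.com/".parse()?;
    assert_eq!(
        base.join("hls/abc123/live.m3u8")?.to_string(),
        "https://example.com/hls/abc123/live.m3u8"
    );

    // Without a trailing slash, the last path segment is replaced by join().
    let base: Url = "https://example.com/app".parse()?;
    assert_eq!(base.join("api/v1")?.to_string(), "https://example.com/api/v1");
    Ok(())
}
```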

@@ -433,7 +433,7 @@ impl Overseer for ZapStreamOverseer {
            .tick_stream(pipeline_id, stream.user_id, duration, cost)
            .await?;
        if bal <= 0 {
            bail!("Not enough balance");
            bail!("Balance has run out");
        }

        // Update last segment time for this stream

@@ -514,6 +514,7 @@ impl Overseer for ZapStreamOverseer {
        viewer_states.remove(&stream.id);

        stream.state = UserStreamState::Ended;
        stream.ends = Some(Utc::now());
        let event = self.publish_stream_event(&stream, &user.pubkey).await?;
        stream.event = Some(event.as_json());
        self.db.update_stream(&stream).await?;