mirror of
https://github.com/v0l/zap-stream-core.git
synced 2025-06-19 21:21:01 +00:00
Update index.html with stream links and HLS.js player using mustache templating (#6)
* Initial plan for issue * Add mustache templating and update index.html with stream links Co-authored-by: v0l <1172179+v0l@users.noreply.github.com> * Add documentation and update TODO for completed stream index feature Co-authored-by: v0l <1172179+v0l@users.noreply.github.com> * Implement serializable structs and stream caching to address review feedback Co-authored-by: v0l <1172179+v0l@users.noreply.github.com> * Fix stream cache sharing and remove duplicated caching code Co-authored-by: v0l <1172179+v0l@users.noreply.github.com> * Address feedback: remove docs file, revert TODO changes, optimize template compilation Co-authored-by: v0l <1172179+v0l@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: v0l <1172179+v0l@users.noreply.github.com>
This commit is contained in:
@ -658,6 +658,16 @@ impl Api {
|
||||
pub fn get_viewer_count(&self, stream_id: &str) -> usize {
|
||||
self.overseer.viewer_tracker().get_viewer_count(stream_id)
|
||||
}
|
||||
|
||||
/// Get active streams from database
|
||||
pub async fn get_active_streams(&self) -> Result<Vec<UserStream>> {
|
||||
self.db.list_live_streams().await
|
||||
}
|
||||
|
||||
/// Get the public URL from settings
|
||||
pub fn get_public_url(&self) -> String {
|
||||
self.settings.public_url.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
|
@ -11,29 +11,129 @@ use hyper::service::Service;
|
||||
use hyper::{Method, Request, Response};
|
||||
use log::error;
|
||||
use nostr_sdk::{serde_json, Alphabet, Event, Kind, PublicKey, SingleLetterTag, TagKind};
|
||||
use serde::{Serialize, Deserialize};
|
||||
use std::future::Future;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use tokio::fs::File;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::fs::File;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use zap_stream_core::viewer::ViewerTracker;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct StreamData {
|
||||
id: String,
|
||||
title: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
summary: Option<String>,
|
||||
live_url: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
viewer_count: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct IndexTemplateData {
|
||||
public_url: String,
|
||||
has_streams: bool,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty")]
|
||||
streams: Vec<StreamData>,
|
||||
}
|
||||
|
||||
struct CachedStreams {
|
||||
data: IndexTemplateData,
|
||||
cached_at: Instant,
|
||||
}
|
||||
|
||||
pub type StreamCache = Arc<RwLock<Option<CachedStreams>>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct HttpServer {
|
||||
index: String,
|
||||
index_template: String,
|
||||
files_dir: PathBuf,
|
||||
api: Api,
|
||||
stream_cache: StreamCache,
|
||||
}
|
||||
|
||||
impl HttpServer {
|
||||
pub fn new(index: String, files_dir: PathBuf, api: Api) -> Self {
|
||||
pub fn new(index_template: String, files_dir: PathBuf, api: Api, stream_cache: StreamCache) -> Self {
|
||||
Self {
|
||||
index,
|
||||
index_template,
|
||||
files_dir,
|
||||
api,
|
||||
stream_cache,
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_cached_or_fetch_streams(&self) -> Result<IndexTemplateData> {
|
||||
Self::get_cached_or_fetch_streams_static(&self.stream_cache, &self.api).await
|
||||
}
|
||||
|
||||
async fn get_cached_or_fetch_streams_static(stream_cache: &StreamCache, api: &Api) -> Result<IndexTemplateData> {
|
||||
const CACHE_DURATION: Duration = Duration::from_secs(60); // 1 minute
|
||||
|
||||
// Check if we have valid cached data
|
||||
{
|
||||
let cache = stream_cache.read().await;
|
||||
if let Some(ref cached) = *cache {
|
||||
if cached.cached_at.elapsed() < CACHE_DURATION {
|
||||
return Ok(cached.data.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Cache is expired or missing, fetch new data
|
||||
let active_streams = api.get_active_streams().await?;
|
||||
let public_url = api.get_public_url();
|
||||
|
||||
let template_data = if !active_streams.is_empty() {
|
||||
let streams: Vec<StreamData> = active_streams
|
||||
.into_iter()
|
||||
.map(|stream| {
|
||||
let viewer_count = api.get_viewer_count(&stream.id);
|
||||
StreamData {
|
||||
id: stream.id.clone(),
|
||||
title: stream.title.unwrap_or_else(|| format!("Stream {}", &stream.id[..8])),
|
||||
summary: stream.summary,
|
||||
live_url: format!("/{}/live.m3u8", stream.id),
|
||||
viewer_count: if viewer_count > 0 { Some(viewer_count) } else { None },
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
IndexTemplateData {
|
||||
public_url,
|
||||
has_streams: true,
|
||||
streams,
|
||||
}
|
||||
} else {
|
||||
IndexTemplateData {
|
||||
public_url,
|
||||
has_streams: false,
|
||||
streams: Vec::new(),
|
||||
}
|
||||
};
|
||||
|
||||
// Update cache
|
||||
{
|
||||
let mut cache = stream_cache.write().await;
|
||||
*cache = Some(CachedStreams {
|
||||
data: template_data.clone(),
|
||||
cached_at: Instant::now(),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(template_data)
|
||||
}
|
||||
|
||||
async fn render_index(&self) -> Result<String> {
|
||||
let template_data = self.get_cached_or_fetch_streams().await?;
|
||||
let template = mustache::compile_str(&self.index_template)?;
|
||||
let rendered = template.render_to_string(&template_data)?;
|
||||
Ok(rendered)
|
||||
}
|
||||
|
||||
async fn handle_hls_playlist(
|
||||
api: &Api,
|
||||
req: &Request<Incoming>,
|
||||
@ -162,16 +262,52 @@ impl Service<Request<Incoming>> for HttpServer {
|
||||
if req.method() == Method::GET && req.uri().path() == "/"
|
||||
|| req.uri().path() == "/index.html"
|
||||
{
|
||||
let index = self.index.clone();
|
||||
let stream_cache = self.stream_cache.clone();
|
||||
let api = self.api.clone();
|
||||
|
||||
// Compile template outside async move for better performance
|
||||
let template = match mustache::compile_str(&self.index_template) {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
error!("Failed to compile template: {}", e);
|
||||
return Box::pin(async move {
|
||||
Ok(Response::builder()
|
||||
.status(500)
|
||||
.body(BoxBody::default()).unwrap())
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
return Box::pin(async move {
|
||||
Ok(Response::builder()
|
||||
.header("content-type", "text/html")
|
||||
.header("server", "zap-stream-core")
|
||||
.body(
|
||||
Full::new(Bytes::from(index))
|
||||
.map_err(|e| match e {})
|
||||
.boxed(),
|
||||
)?)
|
||||
// Use the existing method to get cached template data
|
||||
let template_data = Self::get_cached_or_fetch_streams_static(&stream_cache, &api).await;
|
||||
|
||||
match template_data {
|
||||
Ok(data) => {
|
||||
match template.render_to_string(&data) {
|
||||
Ok(index_html) => Ok(Response::builder()
|
||||
.header("content-type", "text/html")
|
||||
.header("server", "zap-stream-core")
|
||||
.body(
|
||||
Full::new(Bytes::from(index_html))
|
||||
.map_err(|e| match e {})
|
||||
.boxed(),
|
||||
)?),
|
||||
Err(e) => {
|
||||
error!("Failed to render template: {}", e);
|
||||
Ok(Response::builder()
|
||||
.status(500)
|
||||
.body(BoxBody::default())?)
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to fetch template data: {}", e);
|
||||
Ok(Response::builder()
|
||||
.status(500)
|
||||
.body(BoxBody::default())?)
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -12,6 +12,7 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::sleep;
|
||||
use url::Url;
|
||||
@ -23,7 +24,7 @@ use zap_stream_core::ingress::srt;
|
||||
use zap_stream_core::ingress::test;
|
||||
|
||||
use crate::api::Api;
|
||||
use crate::http::HttpServer;
|
||||
use crate::http::{HttpServer, StreamCache};
|
||||
use crate::monitor::BackgroundMonitor;
|
||||
use crate::overseer::ZapStreamOverseer;
|
||||
use crate::settings::Settings;
|
||||
@ -69,11 +70,13 @@ async fn main() -> Result<()> {
|
||||
}
|
||||
|
||||
let http_addr: SocketAddr = settings.listen_http.parse()?;
|
||||
let index_html = include_str!("../index.html").replace("%%PUBLIC_URL%%", &settings.public_url);
|
||||
let index_template = include_str!("../index.html");
|
||||
|
||||
let api = Api::new(overseer.clone(), settings.clone());
|
||||
// Create shared stream cache
|
||||
let stream_cache: StreamCache = Arc::new(RwLock::new(None));
|
||||
// HTTP server
|
||||
let server = HttpServer::new(index_html, PathBuf::from(settings.output_dir), api);
|
||||
let server = HttpServer::new(index_template.to_string(), PathBuf::from(settings.output_dir), api, stream_cache);
|
||||
tasks.push(tokio::spawn(async move {
|
||||
let listener = TcpListener::bind(&http_addr).await?;
|
||||
|
||||
|
Reference in New Issue
Block a user