diff --git a/NostrStreamer/Database/UserStream.cs b/NostrStreamer/Database/UserStream.cs new file mode 100644 index 0000000..8e96f18 --- /dev/null +++ b/NostrStreamer/Database/UserStream.cs @@ -0,0 +1,6 @@ +namespace NostrStreamer.Database; + +public class UserStream +{ + +} diff --git a/NostrStreamer/Extensions.cs b/NostrStreamer/Extensions.cs index 1ec02ed..b0c4684 100644 --- a/NostrStreamer/Extensions.cs +++ b/NostrStreamer/Extensions.cs @@ -1,3 +1,5 @@ +using Newtonsoft.Json; +using Nostr.Client.Json; using Nostr.Client.Messages; using NostrStreamer.Database; @@ -5,4 +7,8 @@ namespace NostrStreamer; public static class Extensions { + public static NostrEvent? GetNostrEvent(this User user) + { + return user.Event != default ? JsonConvert.DeserializeObject(user.Event, NostrSerializer.Settings) : null; + } } diff --git a/NostrStreamer/Program.cs b/NostrStreamer/Program.cs index 67180e9..782a50a 100644 --- a/NostrStreamer/Program.cs +++ b/NostrStreamer/Program.cs @@ -49,6 +49,7 @@ internal static class Program // streaming services services.AddTransient(); services.AddTransient(); + services.AddHostedService(); // lnd services services.AddSingleton(); diff --git a/NostrStreamer/Services/BackgroundStreamManager.cs b/NostrStreamer/Services/BackgroundStreamManager.cs new file mode 100644 index 0000000..0562d26 --- /dev/null +++ b/NostrStreamer/Services/BackgroundStreamManager.cs @@ -0,0 +1,42 @@ +namespace NostrStreamer.Services; + +public class BackgroundStreamManager : BackgroundService +{ + private readonly ILogger _logger; + private readonly IServiceScopeFactory _scopeFactory; + + public BackgroundStreamManager(ILogger logger, IServiceScopeFactory scopeFactory) + { + _logger = logger; + _scopeFactory = scopeFactory; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + using var scope = _scopeFactory.CreateScope(); + + var streamManager = scope.ServiceProvider.GetRequiredService(); + var srsApi = scope.ServiceProvider.GetRequiredService(); + + var clients = await srsApi.ListClients(); + var streams = clients.Where(a => !a.Publish).GroupBy(a => a.Url); + foreach (var stream in streams) + { + var viewers = stream.Count(); + var streamKey = stream.Key.Split("/").Last(); + await streamManager.UpdateViewers(streamKey, viewers); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to run"); + } + + await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken); + } + } +} diff --git a/NostrStreamer/Services/SrsApi.cs b/NostrStreamer/Services/SrsApi.cs index 6a54e94..c21a478 100644 --- a/NostrStreamer/Services/SrsApi.cs +++ b/NostrStreamer/Services/SrsApi.cs @@ -18,6 +18,12 @@ public class SrsApi return rsp!.Streams; } + public async Task> ListClients() + { + var rsp = await _client.GetFromJsonAsync("/api/v1/clients"); + return rsp!.Clients; + } + public async Task KickClient(string clientId) { await _client.SendAsync(new HttpRequestMessage(HttpMethod.Delete, $"/api/v1/clients/{clientId}")); @@ -128,3 +134,55 @@ public class Video [JsonProperty("height")] public int? Height { get; set; } } + +public class Client +{ + [JsonProperty("id")] + public string Id { get; set; } + + [JsonProperty("vhost")] + public string Vhost { get; set; } + + [JsonProperty("stream")] + public string Stream { get; set; } + + [JsonProperty("ip")] + public string Ip { get; set; } + + [JsonProperty("pageUrl")] + public string PageUrl { get; set; } + + [JsonProperty("swfUrl")] + public string SwfUrl { get; set; } + + [JsonProperty("tcUrl")] + public string TcUrl { get; set; } + + [JsonProperty("url")] + public string Url { get; set; } + + [JsonProperty("type")] + public string Type { get; set; } + + [JsonProperty("publish")] + public bool Publish { get; set; } + + [JsonProperty("alive")] + public double? Alive { get; set; } + + [JsonProperty("kbps")] + public Kbps Kbps { get; set; } +} + +public class ListClientsResponse +{ + [JsonProperty("code")] + public int? Code { get; set; } + + [JsonProperty("server")] + public string Server { get; set; } + + [JsonProperty("clients")] + public List Clients { get; set; } +} + diff --git a/NostrStreamer/Services/StreamManager.cs b/NostrStreamer/Services/StreamManager.cs index 5aecafc..49ea84f 100644 --- a/NostrStreamer/Services/StreamManager.cs +++ b/NostrStreamer/Services/StreamManager.cs @@ -92,15 +92,27 @@ public class StreamManager user.Image = image; user.Tags = tags != null ? string.Join(",", tags) : null; - var existingEvent = user.Event != default ? JsonConvert.DeserializeObject(user.Event, NostrSerializer.Settings) : null; - var ev = CreateStreamEvent(user, existingEvent?.Tags?.FindFirstTagValue("status") ?? "ended"); + var ev = CreateStreamEvent(user); user.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings); await _db.SaveChangesAsync(); - _nostr.Send(new NostrEventRequest(ev)); } + public async Task UpdateViewers(string streamKey, int viewers) + { + var user = await GetUserFromStreamKey(streamKey); + if (user == default) throw new Exception("No stream key found"); + + var existingEvent = user.GetNostrEvent(); + var oldViewers = existingEvent?.Tags?.FindFirstTagValue("viewers"); + if (string.IsNullOrEmpty(oldViewers) || int.Parse(oldViewers) != viewers) + { + var ev = CreateStreamEvent(user, viewers: viewers); + await PublishEvent(user, ev); + } + } + private async Task PublishEvent(User user, NostrEvent ev) { await _db.Users @@ -126,19 +138,32 @@ public class StreamManager return ev.Sign(pk); } - private NostrEvent CreateStreamEvent(User user, string state) + private NostrEvent CreateStreamEvent(User user, string? state = null, int? viewers = null) { - var tags = new List() + var existingEvent = user.GetNostrEvent(); + var status = state ?? existingEvent?.Tags?.FindFirstTagValue("status") ?? "ended"; + + var tags = new List { new("d", user.PubKey), new("title", user.Title ?? ""), new("summary", user.Summary ?? ""), new("streaming", GetStreamUrl(user)), new("image", user.Image ?? ""), - new("status", state), + new("status", status), new("p", user.PubKey, "", "host") }; + if (status == "live") + { + var starts = existingEvent?.Tags?.FindFirstTagValue("starts") ?? DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString(); + tags.Add(new ("starts", starts)); + tags.Add( + new("current_participants", + (viewers.HasValue ? viewers.ToString() : null) ?? + existingEvent?.Tags?.FindFirstTagValue("current_participants") ?? "0")); + } + foreach (var tag in !string.IsNullOrEmpty(user.Tags) ? user.Tags.Split(",", StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) : Array.Empty()) {