Viewer counts / start time

This commit is contained in:
2023-07-06 12:58:47 +01:00
parent 2592c824ff
commit 13f788a311
6 changed files with 144 additions and 6 deletions

View File

@ -0,0 +1,6 @@
namespace NostrStreamer.Database;
public class UserStream
{
}

View File

@ -1,3 +1,5 @@
using Newtonsoft.Json;
using Nostr.Client.Json;
using Nostr.Client.Messages; using Nostr.Client.Messages;
using NostrStreamer.Database; using NostrStreamer.Database;
@ -5,4 +7,8 @@ namespace NostrStreamer;
public static class Extensions public static class Extensions
{ {
public static NostrEvent? GetNostrEvent(this User user)
{
return user.Event != default ? JsonConvert.DeserializeObject<NostrEvent>(user.Event, NostrSerializer.Settings) : null;
}
} }

View File

@ -49,6 +49,7 @@ internal static class Program
// streaming services // streaming services
services.AddTransient<StreamManager>(); services.AddTransient<StreamManager>();
services.AddTransient<SrsApi>(); services.AddTransient<SrsApi>();
services.AddHostedService<BackgroundStreamManager>();
// lnd services // lnd services
services.AddSingleton<LndNode>(); services.AddSingleton<LndNode>();

View File

@ -0,0 +1,42 @@
namespace NostrStreamer.Services;
public class BackgroundStreamManager : BackgroundService
{
private readonly ILogger<BackgroundStreamManager> _logger;
private readonly IServiceScopeFactory _scopeFactory;
public BackgroundStreamManager(ILogger<BackgroundStreamManager> logger, IServiceScopeFactory scopeFactory)
{
_logger = logger;
_scopeFactory = scopeFactory;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
using var scope = _scopeFactory.CreateScope();
var streamManager = scope.ServiceProvider.GetRequiredService<StreamManager>();
var srsApi = scope.ServiceProvider.GetRequiredService<SrsApi>();
var clients = await srsApi.ListClients();
var streams = clients.Where(a => !a.Publish).GroupBy(a => a.Url);
foreach (var stream in streams)
{
var viewers = stream.Count();
var streamKey = stream.Key.Split("/").Last();
await streamManager.UpdateViewers(streamKey, viewers);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to run");
}
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
}
}
}

View File

@ -18,6 +18,12 @@ public class SrsApi
return rsp!.Streams; return rsp!.Streams;
} }
public async Task<List<Client>> ListClients()
{
var rsp = await _client.GetFromJsonAsync<ListClientsResponse>("/api/v1/clients");
return rsp!.Clients;
}
public async Task KickClient(string clientId) public async Task KickClient(string clientId)
{ {
await _client.SendAsync(new HttpRequestMessage(HttpMethod.Delete, $"/api/v1/clients/{clientId}")); await _client.SendAsync(new HttpRequestMessage(HttpMethod.Delete, $"/api/v1/clients/{clientId}"));
@ -128,3 +134,55 @@ public class Video
[JsonProperty("height")] [JsonProperty("height")]
public int? Height { get; set; } public int? Height { get; set; }
} }
public class Client
{
[JsonProperty("id")]
public string Id { get; set; }
[JsonProperty("vhost")]
public string Vhost { get; set; }
[JsonProperty("stream")]
public string Stream { get; set; }
[JsonProperty("ip")]
public string Ip { get; set; }
[JsonProperty("pageUrl")]
public string PageUrl { get; set; }
[JsonProperty("swfUrl")]
public string SwfUrl { get; set; }
[JsonProperty("tcUrl")]
public string TcUrl { get; set; }
[JsonProperty("url")]
public string Url { get; set; }
[JsonProperty("type")]
public string Type { get; set; }
[JsonProperty("publish")]
public bool Publish { get; set; }
[JsonProperty("alive")]
public double? Alive { get; set; }
[JsonProperty("kbps")]
public Kbps Kbps { get; set; }
}
public class ListClientsResponse
{
[JsonProperty("code")]
public int? Code { get; set; }
[JsonProperty("server")]
public string Server { get; set; }
[JsonProperty("clients")]
public List<Client> Clients { get; set; }
}

View File

@ -92,15 +92,27 @@ public class StreamManager
user.Image = image; user.Image = image;
user.Tags = tags != null ? string.Join(",", tags) : null; user.Tags = tags != null ? string.Join(",", tags) : null;
var existingEvent = user.Event != default ? JsonConvert.DeserializeObject<NostrEvent>(user.Event, NostrSerializer.Settings) : null; var ev = CreateStreamEvent(user);
var ev = CreateStreamEvent(user, existingEvent?.Tags?.FindFirstTagValue("status") ?? "ended");
user.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings); user.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings);
await _db.SaveChangesAsync(); await _db.SaveChangesAsync();
_nostr.Send(new NostrEventRequest(ev)); _nostr.Send(new NostrEventRequest(ev));
} }
public async Task UpdateViewers(string streamKey, int viewers)
{
var user = await GetUserFromStreamKey(streamKey);
if (user == default) throw new Exception("No stream key found");
var existingEvent = user.GetNostrEvent();
var oldViewers = existingEvent?.Tags?.FindFirstTagValue("viewers");
if (string.IsNullOrEmpty(oldViewers) || int.Parse(oldViewers) != viewers)
{
var ev = CreateStreamEvent(user, viewers: viewers);
await PublishEvent(user, ev);
}
}
private async Task PublishEvent(User user, NostrEvent ev) private async Task PublishEvent(User user, NostrEvent ev)
{ {
await _db.Users await _db.Users
@ -126,19 +138,32 @@ public class StreamManager
return ev.Sign(pk); return ev.Sign(pk);
} }
private NostrEvent CreateStreamEvent(User user, string state) private NostrEvent CreateStreamEvent(User user, string? state = null, int? viewers = null)
{ {
var tags = new List<NostrEventTag>() var existingEvent = user.GetNostrEvent();
var status = state ?? existingEvent?.Tags?.FindFirstTagValue("status") ?? "ended";
var tags = new List<NostrEventTag>
{ {
new("d", user.PubKey), new("d", user.PubKey),
new("title", user.Title ?? ""), new("title", user.Title ?? ""),
new("summary", user.Summary ?? ""), new("summary", user.Summary ?? ""),
new("streaming", GetStreamUrl(user)), new("streaming", GetStreamUrl(user)),
new("image", user.Image ?? ""), new("image", user.Image ?? ""),
new("status", state), new("status", status),
new("p", user.PubKey, "", "host") new("p", user.PubKey, "", "host")
}; };
if (status == "live")
{
var starts = existingEvent?.Tags?.FindFirstTagValue("starts") ?? DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString();
tags.Add(new ("starts", starts));
tags.Add(
new("current_participants",
(viewers.HasValue ? viewers.ToString() : null) ??
existingEvent?.Tags?.FindFirstTagValue("current_participants") ?? "0"));
}
foreach (var tag in !string.IsNullOrEmpty(user.Tags) ? foreach (var tag in !string.IsNullOrEmpty(user.Tags) ?
user.Tags.Split(",", StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) : Array.Empty<string>()) user.Tags.Split(",", StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) : Array.Empty<string>())
{ {