using Microsoft.EntityFrameworkCore; using Newtonsoft.Json; using Nostr.Client.Client; using Nostr.Client.Json; using Nostr.Client.Keys; using Nostr.Client.Messages; using Nostr.Client.Requests; using NostrStreamer.Database; namespace NostrStreamer.Services; public class StreamManager { private const NostrKind StreamEventKind = (NostrKind)30_311; private const NostrKind StreamChatKind = (NostrKind)1311; private readonly ILogger _logger; private readonly StreamerContext _db; private readonly Config _config; private readonly INostrClient _nostr; private readonly SrsApi _srsApi; public StreamManager(ILogger logger, StreamerContext db, Config config, INostrClient nostr, SrsApi srsApi) { _logger = logger; _db = db; _config = config; _nostr = nostr; _srsApi = srsApi; } public async Task StreamStarted(string streamKey) { var user = await GetUserFromStreamKey(streamKey); if (user == default) throw new Exception("No stream key found"); _logger.LogInformation("Stream started for: {pubkey}", user.PubKey); if (user.Balance <= 0) { throw new Exception("User balance empty"); } var ev = CreateStreamEvent(user, "live"); await PublishEvent(user, ev); } public async Task StreamStopped(string streamKey) { var user = await GetUserFromStreamKey(streamKey); if (user == default) throw new Exception("No stream key found"); _logger.LogInformation("Stream stopped for: {pubkey}", user.PubKey); var ev = CreateStreamEvent(user, "ended"); await PublishEvent(user, ev); } public async Task ConsumeQuota(string streamKey, double duration, string clientId) { var user = await GetUserFromStreamKey(streamKey); if (user == default) throw new Exception("No stream key found"); const long balanceAlertThreshold = 500; const double rate = 21.0d; // 21 sats/min var cost = Math.Round(rate * (duration / 60d)); await _db.Users .Where(a => a.PubKey == user.PubKey) .ExecuteUpdateAsync(o => o.SetProperty(v => v.Balance, v => v.Balance - cost)); _logger.LogInformation("Stream consumed {n} seconds for {pubkey} costing {cost} sats", duration, user.PubKey, cost); if (user.Balance >= balanceAlertThreshold && user.Balance - cost < balanceAlertThreshold) { _nostr.Send(new NostrEventRequest(CreateStreamChat(user, $"Your balance is below {balanceAlertThreshold} sats, please topup"))); } if (user.Balance <= 0) { _logger.LogInformation("Kicking stream due to low balance"); await _srsApi.KickClient(clientId); } } public async Task PatchEvent(string pubkey, string? title, string? summary, string? image) { var user = await _db.Users.SingleOrDefaultAsync(a => a.PubKey == pubkey); if (user == default) throw new Exception("User not found"); user.Title = title; user.Summary = summary; user.Image = image; var existingEvent = user.Event != default ? JsonConvert.DeserializeObject(user.Event, NostrSerializer.Settings) : null; var ev = CreateStreamEvent(user, existingEvent?.Tags?.FindFirstTagValue("status") ?? "planned"); user.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings); await _db.SaveChangesAsync(); _nostr.Send(new NostrEventRequest(ev)); } private async Task PublishEvent(User user, NostrEvent ev) { await _db.Users .Where(a => a.PubKey == user.PubKey) .ExecuteUpdateAsync(o => o.SetProperty(v => v.Event, JsonConvert.SerializeObject(ev, NostrSerializer.Settings))); _nostr.Send(new NostrEventRequest(ev)); } private NostrEvent CreateStreamChat(User user, string message) { var pk = NostrPrivateKey.FromBech32(_config.PrivateKey); var ev = new NostrEvent { Kind = StreamChatKind, Content = message, CreatedAt = DateTime.Now, Tags = new NostrEventTags( new NostrEventTag("a", $"{StreamEventKind}:{pk.DerivePublicKey().Hex}:{user.PubKey}") ) }; return ev.Sign(pk); } private NostrEvent CreateStreamEvent(User user, string state) { var tags = new List() { new("d", user.PubKey), new("title", user.Title ?? ""), new("summary", user.Summary ?? ""), new("streaming", GetStreamUrl(user)), new("image", user.Image ?? ""), new("status", state), new("p", user.PubKey, "", "host") }; foreach (var tag in user.Tags?.Split(",") ?? Array.Empty()) { tags.Add(new("t", tag.Trim())); } var ev = new NostrEvent { Kind = StreamEventKind, Content = "", CreatedAt = DateTime.Now, Tags = new NostrEventTags(tags) }; return ev.Sign(NostrPrivateKey.FromBech32(_config.PrivateKey)); } private string GetStreamUrl(User u) { var ub = new Uri(_config.DataHost, $"{u.PubKey}.m3u8"); return ub.ToString(); } private async Task GetUserFromStreamKey(string streamKey) { return await _db.Users.SingleOrDefaultAsync(a => a.StreamKey == streamKey); } }