diff --git a/NostrStreamer/ApiModel/Account.cs b/NostrStreamer/ApiModel/Account.cs index 8842ba4..b20404e 100644 --- a/NostrStreamer/ApiModel/Account.cs +++ b/NostrStreamer/ApiModel/Account.cs @@ -5,20 +5,44 @@ namespace NostrStreamer.ApiModel; public class Account { - [JsonProperty("url")] - public string Url { get; init; } = null!; - - [JsonProperty("key")] - public string Key { get; init; } = null!; - [JsonProperty("event")] public NostrEvent? Event { get; init; } - [JsonProperty("quota")] - public AccountQuota Quota { get; init; } = null!; + [JsonProperty("endpoints")] + public List Endpoints { get; init; } = new(); + + [JsonProperty("balance")] + public long Balance { get; init; } } +public class AccountEndpoint +{ + [JsonProperty("name")] + public string Name { get; init; } = null!; + + [JsonProperty("url")] + public string Url { get; init; } = null!; + + [JsonProperty("key")] + public string Key { get; init; } = null!; + [JsonProperty("cost")] + public EndpointCost Cost { get; init; } = null!; + + [JsonProperty("capabilities")] + public List Capabilities { get; init; } = new(); +} + +public class EndpointCost +{ + [JsonProperty("rate")] + public double Rate { get; init; } + + [JsonProperty("unit")] + public string Unit { get; init; } = null!; +} + +[Obsolete("Use EndpointCost")] public class AccountQuota { [JsonProperty("rate")] diff --git a/NostrStreamer/Config.cs b/NostrStreamer/Config.cs index 1930588..0f2989b 100644 --- a/NostrStreamer/Config.cs +++ b/NostrStreamer/Config.cs @@ -12,11 +12,6 @@ public class Config /// public Uri RtmpHost { get; init; } = null!; - /// - /// SRS app name - /// - public string App { get; init; } = "live"; - /// /// SRS api server host /// @@ -32,20 +27,15 @@ public class Config /// public Uri DataHost { get; init; } = null!; + /// + /// Public URL for the api + /// + public Uri ApiHost { get; init; } = null!; + public string PrivateKey { get; init; } = null!; public string[] Relays { get; init; } = Array.Empty(); public LndConfig Lnd { get; init; } = null!; - - /// - /// Cost/min (milli-sats) - /// - public int Cost { get; init; } = 10_000; - - /// - /// List of video variants - /// - public List Variants { get; init; } = null!; } public class LndConfig @@ -56,11 +46,3 @@ public class LndConfig public string MacaroonPath { get; init; } = null!; } - -public class Variant -{ - public string Name { get; init; } = null!; - public int Width { get; init; } - public int Height { get; init; } - public int Bandwidth { get; init; } -} diff --git a/NostrStreamer/Controllers/LnurlController.cs b/NostrStreamer/Controllers/LnurlController.cs new file mode 100644 index 0000000..5d8b65f --- /dev/null +++ b/NostrStreamer/Controllers/LnurlController.cs @@ -0,0 +1,86 @@ +using BTCPayServer.Lightning; +using LNURL; +using Microsoft.AspNetCore.Mvc; +using Newtonsoft.Json; +using Nostr.Client.Json; +using Nostr.Client.Messages; +using NostrStreamer.Database; +using NostrStreamer.Services; + +namespace NostrStreamer.Controllers; + +[Route("/api/pay")] +public class LnurlController : Controller +{ + private readonly Config _config; + private readonly UserService _userService; + + public LnurlController(Config config, UserService userService) + { + _config = config; + _userService = userService; + } + + [HttpGet("/.well-known/lnurlp/{key}")] + public async Task GetPayService([FromRoute] string key) + { + var user = await _userService.GetUser(key); + if (user == default) return LnurlError("User not found"); + + var metadata = GetMetadata(user); + var pubKey = _config.GetPubKey(); + return Json(new LNURLPayRequest + { + Callback = new Uri(_config.ApiHost, $"/api/pay/{key}"), + Metadata = JsonConvert.SerializeObject(metadata), + MinSendable = LightMoney.Satoshis(1), + MaxSendable = LightMoney.Coins(1), + Tag = "payRequest", + NostrPubkey = pubKey + }); + } + + [HttpGet("{key}")] + public async Task PayUserBalance([FromRoute] string key, [FromQuery] long amount, [FromQuery] string? nostr) + { + try + { + if (!string.IsNullOrEmpty(nostr)) + { + var ev = JsonConvert.DeserializeObject(nostr, NostrSerializer.Settings); + if (ev?.Kind != NostrKind.ZapRequest || ev.Tags?.FindFirstTagValue("amount") != amount.ToString() || + !ev.IsSignatureValid()) + { + throw new Exception("Invalid nostr event"); + } + } + + var invoice = await _userService.CreateTopup(key, (ulong)(amount / 1000), nostr); + return Json(new LNURLPayRequest.LNURLPayRequestCallbackResponse + { + Pr = invoice + }); + } + catch (Exception ex) + { + return LnurlError($"Failed to create invoice (${ex.Message})"); + } + } + + private List> GetMetadata(User u) + { + return new List>() + { + new("text/plain", $"Topup for {u.PubKey}") + }; + } + + private IActionResult LnurlError(string reason) + { + return Json(new LNUrlStatusResponse() + { + Reason = reason, + Status = "ERROR" + }); + } +} diff --git a/NostrStreamer/Controllers/NostrController.cs b/NostrStreamer/Controllers/NostrController.cs index f74fb04..a37d46f 100644 --- a/NostrStreamer/Controllers/NostrController.cs +++ b/NostrStreamer/Controllers/NostrController.cs @@ -4,11 +4,11 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.EntityFrameworkCore; using Newtonsoft.Json; using Nostr.Client.Json; -using Nostr.Client.Messages; using Nostr.Client.Utils; using NostrStreamer.ApiModel; using NostrStreamer.Database; using NostrStreamer.Services; +using NostrStreamer.Services.StreamManager; namespace NostrStreamer.Controllers; @@ -18,14 +18,14 @@ public class NostrController : Controller { private readonly StreamerContext _db; private readonly Config _config; - private readonly StreamManager _streamManager; + private readonly StreamManagerFactory _streamManagerFactory; private readonly LndNode _lnd; - public NostrController(StreamerContext db, Config config, StreamManager streamManager, LndNode lnd) + public NostrController(StreamerContext db, Config config, StreamManagerFactory streamManager, LndNode lnd) { _db = db; _config = config; - _streamManager = streamManager; + _streamManagerFactory = streamManager; _lnd = lnd; } @@ -48,18 +48,23 @@ public class NostrController : Controller await _db.SaveChangesAsync(); } + var endpoints = await _db.Endpoints.ToListAsync(); var account = new Account { - Url = new Uri(_config.RtmpHost, _config.App).ToString(), - Key = user.StreamKey, - Event = !string.IsNullOrEmpty(user.Event) ? JsonConvert.DeserializeObject(user.Event, NostrSerializer.Settings) : - null, - Quota = new() + Event = null, + Endpoints = endpoints.Select(a => new AccountEndpoint() { - Unit = "min", - Rate = (int)Math.Ceiling(_config.Cost / 1000m), - Remaining = (long)Math.Floor(user.Balance / 1000m) - } + Name = a.Name, + Url = new Uri(_config.RtmpHost, a.App).ToString(), + Key = user.StreamKey, + Capabilities = a.Capabilities, + Cost = new() + { + Unit = "min", + Rate = a.Cost / 1000d + } + }).ToList(), + Balance = (long)Math.Floor(user.Balance / 1000m) }; return Content(JsonConvert.SerializeObject(account, NostrSerializer.Settings), "application/json"); @@ -71,7 +76,8 @@ public class NostrController : Controller var pubkey = GetPubKey(); if (string.IsNullOrEmpty(pubkey)) return Unauthorized(); - await _streamManager.PatchEvent(pubkey, req.Title, req.Summary, req.Image, req.Tags, req.ContentWarning); + var streamManager = await _streamManagerFactory.ForCurrentStream(pubkey); + await streamManager.PatchEvent(req.Title, req.Summary, req.Image, req.Tags, req.ContentWarning); return Accepted(); } diff --git a/NostrStreamer/Controllers/PlaylistController.cs b/NostrStreamer/Controllers/PlaylistController.cs index ac8e8b5..8af06ca 100644 --- a/NostrStreamer/Controllers/PlaylistController.cs +++ b/NostrStreamer/Controllers/PlaylistController.cs @@ -1,9 +1,8 @@ using System.Text.RegularExpressions; using Microsoft.AspNetCore.Mvc; -using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Caching.Memory; using NostrStreamer.Database; using NostrStreamer.Services; +using NostrStreamer.Services.StreamManager; namespace NostrStreamer.Controllers; @@ -11,36 +10,30 @@ namespace NostrStreamer.Controllers; public class PlaylistController : Controller { private readonly ILogger _logger; - private readonly IMemoryCache _cache; private readonly Config _config; - private readonly IServiceScopeFactory _scopeFactory; private readonly HttpClient _client; private readonly SrsApi _srsApi; private readonly ViewCounter _viewCounter; + private readonly StreamManagerFactory _streamManagerFactory; - public PlaylistController(Config config, IMemoryCache cache, ILogger logger, IServiceScopeFactory scopeFactory, - HttpClient client, SrsApi srsApi, ViewCounter viewCounter) + public PlaylistController(Config config, ILogger logger, + HttpClient client, SrsApi srsApi, ViewCounter viewCounter, StreamManagerFactory streamManagerFactory) { _config = config; - _cache = cache; _logger = logger; - _scopeFactory = scopeFactory; _client = client; _srsApi = srsApi; _viewCounter = viewCounter; + _streamManagerFactory = streamManagerFactory; } - [HttpGet("{variant}/{pubkey}.m3u8")] - public async Task RewritePlaylist([FromRoute] string pubkey, [FromRoute] string variant, [FromQuery(Name = "hls_ctx")] string hlsCtx) + [HttpGet("{variant}/{id}.m3u8")] + public async Task RewritePlaylist([FromRoute] Guid id, [FromRoute] string variant, [FromQuery(Name = "hls_ctx")] string hlsCtx) { - var key = await GetStreamKey(pubkey); - if (string.IsNullOrEmpty(key)) - { - Response.StatusCode = 404; - return; - } + var streamManager = await _streamManagerFactory.ForStream(id); + var userStream = streamManager.GetStream(); - var path = $"/{_config.App}/{variant}/{key}.m3u8"; + var path = $"/{userStream.Endpoint.App}/{variant}/{userStream.User.StreamKey}.m3u8"; var ub = new UriBuilder(_config.SrsHttpHost) { Path = path, @@ -66,8 +59,8 @@ public class PlaylistController : Controller { await sw.WriteLineAsync(line); var trackPath = await sr.ReadLineAsync(); - var seg = Regex.Match(trackPath!, @"-(\d+)\.ts$"); - await sw.WriteLineAsync($"{pubkey}/{seg.Groups[1].Value}.ts"); + var seg = Regex.Match(trackPath!, @"-(\d+)\.ts"); + await sw.WriteLineAsync($"{id}/{seg.Groups[1].Value}.ts"); } else { @@ -76,71 +69,72 @@ public class PlaylistController : Controller } Response.Body.Close(); - _viewCounter.Activity(key, hlsCtx); + _viewCounter.Activity(userStream.Id, hlsCtx); } [HttpGet("{pubkey}.m3u8")] public async Task CreateMultiBitrate([FromRoute] string pubkey) { - var key = await GetStreamKey(pubkey); - if (string.IsNullOrEmpty(key)) + try { - Response.StatusCode = 404; - return; + var streamManager = await _streamManagerFactory.ForCurrentStream(pubkey); + + var userStream = streamManager.GetStream(); + var hlsCtx = await GetHlsCtx(userStream); + if (string.IsNullOrEmpty(hlsCtx)) + { + Response.StatusCode = 404; + return; + } + + Response.ContentType = "application/x-mpegurl"; + await using var sw = new StreamWriter(Response.Body); + + var streams = await _srsApi.ListStreams(); + await sw.WriteLineAsync("#EXTM3U"); + + foreach (var variant in userStream.Endpoint.GetVariants().OrderBy(a => a.Bandwidth)) + { + var stream = streams.FirstOrDefault(a => + a.Name == userStream.User.StreamKey && a.App == $"{userStream.Endpoint.App}/{variant.SourceName}"); + + var resArg = stream?.Video != default ? $"RESOLUTION={stream.Video?.Width}x{stream.Video?.Height}" : + variant.ToResolutionArg(); + + var bandwidthArg = variant.ToBandwidthArg(); + + var averageBandwidthArg = stream?.Kbps?.Recv30s.HasValue ?? false ? $"AVERAGE-BANDWIDTH={stream.Kbps.Recv30s * 1000}" : ""; + var codecArg = "CODECS=\"avc1.640028,mp4a.40.2\""; + var allArgs = new[] {bandwidthArg, averageBandwidthArg, resArg, codecArg}.Where(a => !string.IsNullOrEmpty(a)); + await sw.WriteLineAsync( + $"#EXT-X-STREAM-INF:{string.Join(",", allArgs)}"); + + var u = new Uri(_config.DataHost, + $"{variant.SourceName}/{userStream.Id}.m3u8{(!string.IsNullOrEmpty(hlsCtx) ? $"?hls_ctx={hlsCtx}" : "")}"); + + await sw.WriteLineAsync(u.ToString()); + } } - - var hlsCtx = await GetHlsCtx(key); - if (string.IsNullOrEmpty(hlsCtx)) + catch (Exception ex) { + _logger.LogWarning(ex, "Failed to get stream for {pubkey} {message}", pubkey, ex.Message); Response.StatusCode = 404; - return; - } - - Response.ContentType = "application/x-mpegurl"; - await using var sw = new StreamWriter(Response.Body); - - var streams = await _srsApi.ListStreams(); - await sw.WriteLineAsync("#EXTM3U"); - - foreach (var variant in _config.Variants.OrderBy(a => a.Bandwidth)) - { - var stream = streams.FirstOrDefault(a => - a.Name == key && a.App == $"{_config.App}/{variant.Name}"); - - var resArg = stream?.Video != default ? $"RESOLUTION={stream.Video?.Width}x{stream.Video?.Height}" : - $"RESOLUTION={variant.Width}x{variant.Height}"; - - var bandwidthArg = $"BANDWIDTH={variant.Bandwidth * 1000}"; - - var averageBandwidthArg = stream?.Kbps?.Recv30s.HasValue ?? false ? $"AVERAGE-BANDWIDTH={stream.Kbps.Recv30s * 1000}" : ""; - var allArgs = new[] {bandwidthArg, averageBandwidthArg, resArg}.Where(a => !string.IsNullOrEmpty(a)); - await sw.WriteLineAsync( - $"#EXT-X-STREAM-INF:{string.Join(",", allArgs)},CODECS=\"avc1.640028,mp4a.40.2\""); - - var u = new Uri(_config.DataHost, - $"{variant.Name}/{pubkey}.m3u8{(!string.IsNullOrEmpty(hlsCtx) ? $"?hls_ctx={hlsCtx}" : "")}"); - - await sw.WriteLineAsync(u.ToString()); } } - [HttpGet("{variant}/{pubkey}/{segment}")] - public async Task GetSegment([FromRoute] string pubkey, [FromRoute] string segment, [FromRoute] string variant) + [HttpGet("{variant}/{id}/{segment}")] + public async Task GetSegment([FromRoute] Guid id, [FromRoute] string segment, [FromRoute] string variant) { - var key = await GetStreamKey(pubkey); - if (string.IsNullOrEmpty(key)) - { - Response.StatusCode = 404; - return; - } + var streamManager = await _streamManagerFactory.ForStream(id); + var userStream = streamManager.GetStream(); - var path = $"/{_config.App}/{variant}/{key}-{segment}"; + var path = $"/{userStream.Endpoint.App}/{variant}/{userStream.User.StreamKey}-{segment}"; await ProxyRequest(path); } - private async Task GetHlsCtx(string key) + private async Task GetHlsCtx(UserStream stream) { - var path = $"/{_config.App}/source/{key}.m3u8"; + var path = $"/{stream.Endpoint.App}/source/{stream.User.StreamKey}.m3u8"; var ub = new Uri(_config.SrsHttpHost, path); var req = CreateProxyRequest(ub); using var rsp = await _client.SendAsync(req); @@ -186,21 +180,4 @@ public class PlaylistController : Controller return req; } - - private async Task GetStreamKey(string pubkey) - { - var cacheKey = $"stream-key:{pubkey}"; - var cached = _cache.Get(cacheKey); - if (cached != default) - { - return cached; - } - - using var scope = _scopeFactory.CreateScope(); - await using var db = scope.ServiceProvider.GetRequiredService(); - var user = await db.Users.SingleOrDefaultAsync(a => a.PubKey == pubkey); - - _cache.Set(cacheKey, user?.StreamKey); - return user?.StreamKey; - } } diff --git a/NostrStreamer/Controllers/SRSController.cs b/NostrStreamer/Controllers/SRSController.cs index 8a08a5e..32208eb 100644 --- a/NostrStreamer/Controllers/SRSController.cs +++ b/NostrStreamer/Controllers/SRSController.cs @@ -1,6 +1,6 @@ using Microsoft.AspNetCore.Mvc; using Newtonsoft.Json; -using NostrStreamer.Services; +using NostrStreamer.Services.StreamManager; namespace NostrStreamer.Controllers; @@ -8,14 +8,12 @@ namespace NostrStreamer.Controllers; public class SrsController : Controller { private readonly ILogger _logger; - private readonly Config _config; - private readonly StreamManager _streamManager; + private readonly StreamManagerFactory _streamManagerFactory; - public SrsController(ILogger logger, Config config, StreamManager streamManager) + public SrsController(ILogger logger, StreamManagerFactory streamManager) { _logger = logger; - _config = config; - _streamManager = streamManager; + _streamManagerFactory = streamManager; } [HttpPost] @@ -24,8 +22,7 @@ public class SrsController : Controller _logger.LogInformation("OnStream: {obj}", JsonConvert.SerializeObject(req)); try { - if (string.IsNullOrEmpty(req.Stream) || string.IsNullOrEmpty(req.App) || string.IsNullOrEmpty(req.Stream) || - !req.App.StartsWith(_config.App, StringComparison.InvariantCultureIgnoreCase)) + if (string.IsNullOrEmpty(req.Stream) || string.IsNullOrEmpty(req.App)) { return new() { @@ -33,23 +30,44 @@ public class SrsController : Controller }; } + var appSplit = req.App.Split("/"); + var streamManager = await _streamManagerFactory.ForStream(new StreamInfo + { + App = appSplit[0], + Variant = appSplit.Length > 1 ? appSplit[1] : "source", + ClientId = req.ClientId!, + StreamKey = req.Stream + }); + + if (req.Action == "on_forward") + { + var urls = await streamManager.OnForward(); + return new SrsForwardHookReply + { + Data = new() + { + Urls = urls + } + }; + } + if (req.App.EndsWith("/source")) { if (req.Action == "on_publish") { - await _streamManager.StreamStarted(req.Stream); + await streamManager.StreamStarted(); return new(); } if (req.Action == "on_unpublish") { - await _streamManager.StreamStopped(req.Stream); + await streamManager.StreamStopped(); return new(); } if (req.Action == "on_hls" && req.Duration.HasValue && !string.IsNullOrEmpty(req.ClientId)) { - await _streamManager.ConsumeQuota(req.Stream, req.Duration.Value, req.ClientId); + await streamManager.ConsumeQuota(req.Duration.Value); return new(); } } @@ -76,6 +94,18 @@ public class SrsHookReply public int Code { get; init; } } +public class SrsForwardHookReply : SrsHookReply +{ + [JsonProperty("data")] + public SrsUrlList Data { get; init; } = null!; +} + +public class SrsUrlList +{ + [JsonProperty("urls")] + public List Urls { get; init; } = new(); +} + public class SrsHook { [JsonProperty("action")] diff --git a/NostrStreamer/Database/Configuration/IngestEndpointConfiguration.cs b/NostrStreamer/Database/Configuration/IngestEndpointConfiguration.cs new file mode 100644 index 0000000..093da19 --- /dev/null +++ b/NostrStreamer/Database/Configuration/IngestEndpointConfiguration.cs @@ -0,0 +1,30 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; + +namespace NostrStreamer.Database.Configuration; + +public class IngestEndpointConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.HasKey(a => a.Id); + + builder.Property(a => a.Name) + .IsRequired(); + + builder.Property(a => a.App) + .IsRequired(); + + builder.Property(a => a.Forward) + .IsRequired(); + + builder.Property(a => a.Cost) + .IsRequired(); + + builder.Property(a => a.Capabilities) + .IsRequired(); + + builder.HasIndex(a => a.App) + .IsUnique(); + } +} diff --git a/NostrStreamer/Database/Configuration/PaymentsConfiguration.cs b/NostrStreamer/Database/Configuration/PaymentsConfiguration.cs index fbded35..5037b4f 100644 --- a/NostrStreamer/Database/Configuration/PaymentsConfiguration.cs +++ b/NostrStreamer/Database/Configuration/PaymentsConfiguration.cs @@ -19,7 +19,11 @@ public class PaymentsConfiguration : IEntityTypeConfiguration builder.Property(a => a.Created) .IsRequired(); - + + builder.Property(a => a.Nostr); + builder.Property(a => a.Type) + .IsRequired(); + builder.HasOne(a => a.User) .WithMany(a => a.Payments) .HasForeignKey(a => a.PubKey); diff --git a/NostrStreamer/Database/Configuration/UserConfiguration.cs b/NostrStreamer/Database/Configuration/UserConfiguration.cs index 6c548ea..dfe5a0c 100644 --- a/NostrStreamer/Database/Configuration/UserConfiguration.cs +++ b/NostrStreamer/Database/Configuration/UserConfiguration.cs @@ -11,7 +11,6 @@ public class UserConfiguration : IEntityTypeConfiguration builder.Property(a => a.StreamKey) .IsRequired(); - builder.Property(a => a.Event); builder.Property(a => a.Balance) .IsRequired(); diff --git a/NostrStreamer/Database/Configuration/UserStreamConfiguration.cs b/NostrStreamer/Database/Configuration/UserStreamConfiguration.cs new file mode 100644 index 0000000..a946478 --- /dev/null +++ b/NostrStreamer/Database/Configuration/UserStreamConfiguration.cs @@ -0,0 +1,35 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; + +namespace NostrStreamer.Database.Configuration; + +public class UserStreamConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.HasKey(a => a.Id); + builder.Property(a => a.ClientId) + .IsRequired(); + + builder.Property(a => a.Starts) + .IsRequired(); + + builder.Property(a => a.Ends); + + builder.Property(a => a.State) + .IsRequired(); + + builder.Property(a => a.Event) + .IsRequired(); + + builder.Property(a => a.Recording); + + builder.HasOne(a => a.Endpoint) + .WithMany() + .HasForeignKey(a => a.EndpointId); + + builder.HasOne(a => a.User) + .WithMany(a => a.Streams) + .HasForeignKey(a => a.PubKey); + } +} diff --git a/NostrStreamer/Database/Configuration/UserStreamGuestConfiguration.cs b/NostrStreamer/Database/Configuration/UserStreamGuestConfiguration.cs new file mode 100644 index 0000000..72c8ceb --- /dev/null +++ b/NostrStreamer/Database/Configuration/UserStreamGuestConfiguration.cs @@ -0,0 +1,26 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; + +namespace NostrStreamer.Database.Configuration; + +public class UserStreamGuestConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.HasKey(a => a.Id); + builder.Property(a => a.PubKey) + .IsRequired(); + + builder.Property(a => a.Sig); + builder.Property(a => a.Relay); + builder.Property(a => a.Role); + builder.Property(a => a.ZapSplit); + + builder.HasOne(a => a.Stream) + .WithMany(a => a.Guests) + .HasForeignKey(a => a.StreamId); + + builder.HasIndex(a => new {a.StreamId, a.PubKey}) + .IsUnique(); + } +} diff --git a/NostrStreamer/Database/IngestEndpoint.cs b/NostrStreamer/Database/IngestEndpoint.cs new file mode 100644 index 0000000..571b7c6 --- /dev/null +++ b/NostrStreamer/Database/IngestEndpoint.cs @@ -0,0 +1,28 @@ +namespace NostrStreamer.Database; + +public class IngestEndpoint +{ + public Guid Id { get; init; } = Guid.NewGuid(); + + public string Name { get; init; } = null!; + + /// + /// Stream app name at ingest + /// + public string App { get; init; } = null!; + + /// + /// Forward to VHost + /// + public string Forward { get; init; } = null!; + + /// + /// Cost/min (milli-sats) + /// + public int Cost { get; init; } = 10_000; + + /// + /// Stream capability tags + /// + public List Capabilities { get; init; } = new(); +} diff --git a/NostrStreamer/Database/Payment.cs b/NostrStreamer/Database/Payment.cs index 262fae1..1bb7431 100644 --- a/NostrStreamer/Database/Payment.cs +++ b/NostrStreamer/Database/Payment.cs @@ -14,4 +14,14 @@ public class Payment public ulong Amount { get; init; } public DateTime Created { get; init; } = DateTime.UtcNow; + + public string? Nostr { get; init; } + + public PaymentType Type { get; init; } } + +public enum PaymentType +{ + Topup = 0, + Zap = 1 +} \ No newline at end of file diff --git a/NostrStreamer/Database/StreamerContext.cs b/NostrStreamer/Database/StreamerContext.cs index aa8e753..b441325 100644 --- a/NostrStreamer/Database/StreamerContext.cs +++ b/NostrStreamer/Database/StreamerContext.cs @@ -6,21 +6,25 @@ public class StreamerContext : DbContext { public StreamerContext() { - } public StreamerContext(DbContextOptions ctx) : base(ctx) { - } - + protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.ApplyConfigurationsFromAssembly(typeof(StreamerContext).Assembly); } - + public DbSet Users => Set(); public DbSet Payments => Set(); + + public DbSet Streams => Set(); + + public DbSet Guests => Set(); + + public DbSet Endpoints => Set(); } diff --git a/NostrStreamer/Database/User.cs b/NostrStreamer/Database/User.cs index 0db3880..f6f6e90 100644 --- a/NostrStreamer/Database/User.cs +++ b/NostrStreamer/Database/User.cs @@ -8,11 +8,6 @@ public class User /// Stream key /// public string StreamKey { get; init; } = null!; - - /// - /// Most recent nostr event published - /// - public string? Event { get; set; } /// /// Milli sats balance @@ -50,4 +45,5 @@ public class User public uint Version { get; set; } public List Payments { get; init; } = new(); + public List Streams { get; init; } = new(); } diff --git a/NostrStreamer/Database/UserStream.cs b/NostrStreamer/Database/UserStream.cs index 8e96f18..b082653 100644 --- a/NostrStreamer/Database/UserStream.cs +++ b/NostrStreamer/Database/UserStream.cs @@ -2,5 +2,38 @@ namespace NostrStreamer.Database; public class UserStream { + public Guid Id { get; init; } = Guid.NewGuid(); + + public string PubKey { get; init; } = null!; + public User User { get; init; } = null!; + + public string ClientId { get; init; } = null!; + public DateTime Starts { get; init; } = DateTime.UtcNow; + + public DateTime? Ends { get; set; } + + public UserStreamState State { get; set; } + + /// + /// Nostr Event for this stream + /// + public string Event { get; set; } = null!; + + /// + /// Recording URL of ended stream + /// + public string? Recording { get; set; } + + public Guid EndpointId { get; init; } + public IngestEndpoint Endpoint { get; init; } = null!; + + public List Guests { get; init; } = new(); } + +public enum UserStreamState +{ + Planned = 1, + Live = 2, + Ended = 3 +} \ No newline at end of file diff --git a/NostrStreamer/Database/UserStreamGuest.cs b/NostrStreamer/Database/UserStreamGuest.cs new file mode 100644 index 0000000..1b09001 --- /dev/null +++ b/NostrStreamer/Database/UserStreamGuest.cs @@ -0,0 +1,19 @@ +namespace NostrStreamer.Database; + +public class UserStreamGuest +{ + public Guid Id { get; init; } = Guid.NewGuid(); + + public Guid StreamId { get; init; } + public UserStream Stream { get; init; } = null!; + + public string PubKey { get; init; } = null!; + + public string? Relay { get; init; } + + public string? Role { get; init; } + + public string? Sig { get; init; } + + public decimal ZapSplit { get; init; } +} diff --git a/NostrStreamer/Extensions.cs b/NostrStreamer/Extensions.cs index b0c4684..856494f 100644 --- a/NostrStreamer/Extensions.cs +++ b/NostrStreamer/Extensions.cs @@ -1,5 +1,6 @@ using Newtonsoft.Json; using Nostr.Client.Json; +using Nostr.Client.Keys; using Nostr.Client.Messages; using NostrStreamer.Database; @@ -7,8 +8,91 @@ namespace NostrStreamer; public static class Extensions { - public static NostrEvent? GetNostrEvent(this User user) + public static NostrEvent? GetEvent(this UserStream us) { - return user.Event != default ? JsonConvert.DeserializeObject(user.Event, NostrSerializer.Settings) : null; + return JsonConvert.DeserializeObject(us.Event, NostrSerializer.Settings); + } + + public static string GetPubKey(this Config cfg) + { + return NostrPrivateKey.FromBech32(cfg.PrivateKey).DerivePublicKey().Hex; + } + + public static List GetVariants(this IngestEndpoint ep) + { + return ep.Capabilities + .Where(a => a.StartsWith("variant")) + .Select(Variant.FromString).ToList(); + } +} + +public class Variant +{ + public int Width { get; init; } + public int Height { get; init; } + public int Bandwidth { get; init; } + + public string SourceName => Bandwidth == -1 ? "source" : $"{Height}h"; + + /// + /// variant:{px}h:{bandwidth} + /// + /// + /// + /// + public static Variant FromString(string str) + { + if (str.Equals("variant:source", StringComparison.InvariantCultureIgnoreCase)) + { + return new() + { + Width = 0, + Height = 0, + Bandwidth = -1 + }; + } + + var strSplit = str.Split(":"); + if (strSplit.Length != 3 || !int.TryParse(strSplit[1][..^1], out var h) || !int.TryParse(strSplit[2], out var b)) + { + throw new FormatException(); + } + + return new() + { + Height = h, + Width = (int)Math.Ceiling(h / 9m * 16m), + Bandwidth = b + }; + } + + public override string ToString() + { + if (Bandwidth == -1) + { + return "variant:source"; + } + + return $"variant:{SourceName}:{Bandwidth}"; + } + + public string ToResolutionArg() + { + if (Bandwidth == -1) + { + return string.Empty; + } + + return $"RESOLUTION={Width}x{Height}"; + } + + public string ToBandwidthArg() + { + if (Bandwidth == -1) + { + return string.Empty; + } + + return $"BANDWIDTH={Bandwidth * 1000}"; } } diff --git a/NostrStreamer/Migrations/20230725105922_V2.Designer.cs b/NostrStreamer/Migrations/20230725105922_V2.Designer.cs new file mode 100644 index 0000000..e724713 --- /dev/null +++ b/NostrStreamer/Migrations/20230725105922_V2.Designer.cs @@ -0,0 +1,218 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using NostrStreamer.Database; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; + +#nullable disable + +namespace NostrStreamer.Migrations +{ + [DbContext(typeof(StreamerContext))] + [Migration("20230725105922_V2")] + partial class V2 + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "7.0.8") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("NostrStreamer.Database.Payment", b => + { + b.Property("PaymentHash") + .HasColumnType("text"); + + b.Property("Amount") + .HasColumnType("numeric(20,0)"); + + b.Property("Created") + .HasColumnType("timestamp with time zone"); + + b.Property("Invoice") + .IsRequired() + .HasColumnType("text"); + + b.Property("IsPaid") + .HasColumnType("boolean"); + + b.Property("Nostr") + .HasColumnType("text"); + + b.Property("PubKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Type") + .HasColumnType("integer"); + + b.HasKey("PaymentHash"); + + b.HasIndex("PubKey"); + + b.ToTable("Payments"); + }); + + modelBuilder.Entity("NostrStreamer.Database.User", b => + { + b.Property("PubKey") + .HasColumnType("text"); + + b.Property("Balance") + .HasColumnType("bigint"); + + b.Property("ContentWarning") + .HasColumnType("text"); + + b.Property("Image") + .HasColumnType("text"); + + b.Property("StreamKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Summary") + .HasColumnType("text"); + + b.Property("Tags") + .HasColumnType("text"); + + b.Property("Title") + .HasColumnType("text"); + + b.Property("Version") + .IsConcurrencyToken() + .ValueGeneratedOnAddOrUpdate() + .HasColumnType("xid") + .HasColumnName("xmin"); + + b.HasKey("PubKey"); + + b.ToTable("Users"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("ClientId") + .IsRequired() + .HasColumnType("text"); + + b.Property("Ends") + .HasColumnType("timestamp with time zone"); + + b.Property("Event") + .IsRequired() + .HasColumnType("text"); + + b.Property("PubKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Recording") + .HasColumnType("text"); + + b.Property("Starts") + .HasColumnType("timestamp with time zone"); + + b.Property("State") + .HasColumnType("integer"); + + b.HasKey("Id"); + + b.HasIndex("PubKey"); + + b.ToTable("Streams"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("PubKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Relay") + .HasColumnType("text"); + + b.Property("Role") + .HasColumnType("text"); + + b.Property("Sig") + .HasColumnType("text"); + + b.Property("StreamId") + .HasColumnType("uuid"); + + b.Property("ZapSplit") + .HasColumnType("numeric"); + + b.HasKey("Id"); + + b.HasIndex("StreamId", "PubKey") + .IsUnique(); + + b.ToTable("Guests"); + }); + + modelBuilder.Entity("NostrStreamer.Database.Payment", b => + { + b.HasOne("NostrStreamer.Database.User", "User") + .WithMany("Payments") + .HasForeignKey("PubKey") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("User"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.HasOne("NostrStreamer.Database.User", "User") + .WithMany("Streams") + .HasForeignKey("PubKey") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("User"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b => + { + b.HasOne("NostrStreamer.Database.UserStream", "Stream") + .WithMany("Guests") + .HasForeignKey("StreamId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Stream"); + }); + + modelBuilder.Entity("NostrStreamer.Database.User", b => + { + b.Navigation("Payments"); + + b.Navigation("Streams"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.Navigation("Guests"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/NostrStreamer/Migrations/20230725105922_V2.cs b/NostrStreamer/Migrations/20230725105922_V2.cs new file mode 100644 index 0000000..827ca4f --- /dev/null +++ b/NostrStreamer/Migrations/20230725105922_V2.cs @@ -0,0 +1,114 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace NostrStreamer.Migrations +{ + /// + public partial class V2 : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropColumn( + name: "Event", + table: "Users"); + + migrationBuilder.AddColumn( + name: "Nostr", + table: "Payments", + type: "text", + nullable: true); + + migrationBuilder.AddColumn( + name: "Type", + table: "Payments", + type: "integer", + nullable: false, + defaultValue: 0); + + migrationBuilder.CreateTable( + name: "Streams", + columns: table => new + { + Id = table.Column(type: "uuid", nullable: false), + PubKey = table.Column(type: "text", nullable: false), + ClientId = table.Column(type: "text", nullable: false), + Starts = table.Column(type: "timestamp with time zone", nullable: false), + Ends = table.Column(type: "timestamp with time zone", nullable: true), + State = table.Column(type: "integer", nullable: false), + Event = table.Column(type: "text", nullable: false), + Recording = table.Column(type: "text", nullable: true) + }, + constraints: table => + { + table.PrimaryKey("PK_Streams", x => x.Id); + table.ForeignKey( + name: "FK_Streams_Users_PubKey", + column: x => x.PubKey, + principalTable: "Users", + principalColumn: "PubKey", + onDelete: ReferentialAction.Cascade); + }); + + migrationBuilder.CreateTable( + name: "Guests", + columns: table => new + { + Id = table.Column(type: "uuid", nullable: false), + StreamId = table.Column(type: "uuid", nullable: false), + PubKey = table.Column(type: "text", nullable: false), + Relay = table.Column(type: "text", nullable: true), + Role = table.Column(type: "text", nullable: true), + Sig = table.Column(type: "text", nullable: true), + ZapSplit = table.Column(type: "numeric", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_Guests", x => x.Id); + table.ForeignKey( + name: "FK_Guests_Streams_StreamId", + column: x => x.StreamId, + principalTable: "Streams", + principalColumn: "Id", + onDelete: ReferentialAction.Cascade); + }); + + migrationBuilder.CreateIndex( + name: "IX_Guests_StreamId_PubKey", + table: "Guests", + columns: new[] { "StreamId", "PubKey" }, + unique: true); + + migrationBuilder.CreateIndex( + name: "IX_Streams_PubKey", + table: "Streams", + column: "PubKey"); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "Guests"); + + migrationBuilder.DropTable( + name: "Streams"); + + migrationBuilder.DropColumn( + name: "Nostr", + table: "Payments"); + + migrationBuilder.DropColumn( + name: "Type", + table: "Payments"); + + migrationBuilder.AddColumn( + name: "Event", + table: "Users", + type: "text", + nullable: true); + } + } +} diff --git a/NostrStreamer/Migrations/20230725123250_Endpoints.Designer.cs b/NostrStreamer/Migrations/20230725123250_Endpoints.Designer.cs new file mode 100644 index 0000000..a82fd52 --- /dev/null +++ b/NostrStreamer/Migrations/20230725123250_Endpoints.Designer.cs @@ -0,0 +1,265 @@ +// +using System; +using System.Collections.Generic; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using NostrStreamer.Database; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; + +#nullable disable + +namespace NostrStreamer.Migrations +{ + [DbContext(typeof(StreamerContext))] + [Migration("20230725123250_Endpoints")] + partial class Endpoints + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "7.0.8") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("NostrStreamer.Database.IngestEndpoint", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("App") + .IsRequired() + .HasColumnType("text"); + + b.Property>("Capabilities") + .IsRequired() + .HasColumnType("text[]"); + + b.Property("Cost") + .HasColumnType("integer"); + + b.Property("Forward") + .IsRequired() + .HasColumnType("text"); + + b.Property("Name") + .IsRequired() + .HasColumnType("text"); + + b.HasKey("Id"); + + b.HasIndex("App") + .IsUnique(); + + b.ToTable("Endpoints"); + }); + + modelBuilder.Entity("NostrStreamer.Database.Payment", b => + { + b.Property("PaymentHash") + .HasColumnType("text"); + + b.Property("Amount") + .HasColumnType("numeric(20,0)"); + + b.Property("Created") + .HasColumnType("timestamp with time zone"); + + b.Property("Invoice") + .IsRequired() + .HasColumnType("text"); + + b.Property("IsPaid") + .HasColumnType("boolean"); + + b.Property("Nostr") + .HasColumnType("text"); + + b.Property("PubKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Type") + .HasColumnType("integer"); + + b.HasKey("PaymentHash"); + + b.HasIndex("PubKey"); + + b.ToTable("Payments"); + }); + + modelBuilder.Entity("NostrStreamer.Database.User", b => + { + b.Property("PubKey") + .HasColumnType("text"); + + b.Property("Balance") + .HasColumnType("bigint"); + + b.Property("ContentWarning") + .HasColumnType("text"); + + b.Property("Image") + .HasColumnType("text"); + + b.Property("StreamKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Summary") + .HasColumnType("text"); + + b.Property("Tags") + .HasColumnType("text"); + + b.Property("Title") + .HasColumnType("text"); + + b.Property("Version") + .IsConcurrencyToken() + .ValueGeneratedOnAddOrUpdate() + .HasColumnType("xid") + .HasColumnName("xmin"); + + b.HasKey("PubKey"); + + b.ToTable("Users"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("ClientId") + .IsRequired() + .HasColumnType("text"); + + b.Property("EndpointId") + .HasColumnType("uuid"); + + b.Property("Ends") + .HasColumnType("timestamp with time zone"); + + b.Property("Event") + .IsRequired() + .HasColumnType("text"); + + b.Property("PubKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Recording") + .HasColumnType("text"); + + b.Property("Starts") + .HasColumnType("timestamp with time zone"); + + b.Property("State") + .HasColumnType("integer"); + + b.HasKey("Id"); + + b.HasIndex("EndpointId"); + + b.HasIndex("PubKey"); + + b.ToTable("Streams"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("PubKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Relay") + .HasColumnType("text"); + + b.Property("Role") + .HasColumnType("text"); + + b.Property("Sig") + .HasColumnType("text"); + + b.Property("StreamId") + .HasColumnType("uuid"); + + b.Property("ZapSplit") + .HasColumnType("numeric"); + + b.HasKey("Id"); + + b.HasIndex("StreamId", "PubKey") + .IsUnique(); + + b.ToTable("Guests"); + }); + + modelBuilder.Entity("NostrStreamer.Database.Payment", b => + { + b.HasOne("NostrStreamer.Database.User", "User") + .WithMany("Payments") + .HasForeignKey("PubKey") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("User"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.HasOne("NostrStreamer.Database.IngestEndpoint", "Endpoint") + .WithMany() + .HasForeignKey("EndpointId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.HasOne("NostrStreamer.Database.User", "User") + .WithMany("Streams") + .HasForeignKey("PubKey") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Endpoint"); + + b.Navigation("User"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b => + { + b.HasOne("NostrStreamer.Database.UserStream", "Stream") + .WithMany("Guests") + .HasForeignKey("StreamId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Stream"); + }); + + modelBuilder.Entity("NostrStreamer.Database.User", b => + { + b.Navigation("Payments"); + + b.Navigation("Streams"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.Navigation("Guests"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/NostrStreamer/Migrations/20230725123250_Endpoints.cs b/NostrStreamer/Migrations/20230725123250_Endpoints.cs new file mode 100644 index 0000000..8761b8c --- /dev/null +++ b/NostrStreamer/Migrations/20230725123250_Endpoints.cs @@ -0,0 +1,77 @@ +using System; +using System.Collections.Generic; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace NostrStreamer.Migrations +{ + /// + public partial class Endpoints : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.AddColumn( + name: "EndpointId", + table: "Streams", + type: "uuid", + nullable: false, + defaultValue: new Guid("00000000-0000-0000-0000-000000000000")); + + migrationBuilder.CreateTable( + name: "Endpoints", + columns: table => new + { + Id = table.Column(type: "uuid", nullable: false), + Name = table.Column(type: "text", nullable: false), + App = table.Column(type: "text", nullable: false), + Forward = table.Column(type: "text", nullable: false), + Cost = table.Column(type: "integer", nullable: false), + Capabilities = table.Column>(type: "text[]", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_Endpoints", x => x.Id); + }); + + migrationBuilder.CreateIndex( + name: "IX_Streams_EndpointId", + table: "Streams", + column: "EndpointId"); + + migrationBuilder.CreateIndex( + name: "IX_Endpoints_App", + table: "Endpoints", + column: "App", + unique: true); + + migrationBuilder.AddForeignKey( + name: "FK_Streams_Endpoints_EndpointId", + table: "Streams", + column: "EndpointId", + principalTable: "Endpoints", + principalColumn: "Id", + onDelete: ReferentialAction.Cascade); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropForeignKey( + name: "FK_Streams_Endpoints_EndpointId", + table: "Streams"); + + migrationBuilder.DropTable( + name: "Endpoints"); + + migrationBuilder.DropIndex( + name: "IX_Streams_EndpointId", + table: "Streams"); + + migrationBuilder.DropColumn( + name: "EndpointId", + table: "Streams"); + } + } +} diff --git a/NostrStreamer/Migrations/StreamerContextModelSnapshot.cs b/NostrStreamer/Migrations/StreamerContextModelSnapshot.cs index 00c186f..85d58f6 100644 --- a/NostrStreamer/Migrations/StreamerContextModelSnapshot.cs +++ b/NostrStreamer/Migrations/StreamerContextModelSnapshot.cs @@ -1,5 +1,6 @@ // using System; +using System.Collections.Generic; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Storage.ValueConversion; @@ -22,6 +23,39 @@ namespace NostrStreamer.Migrations NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + modelBuilder.Entity("NostrStreamer.Database.IngestEndpoint", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("App") + .IsRequired() + .HasColumnType("text"); + + b.Property>("Capabilities") + .IsRequired() + .HasColumnType("text[]"); + + b.Property("Cost") + .HasColumnType("integer"); + + b.Property("Forward") + .IsRequired() + .HasColumnType("text"); + + b.Property("Name") + .IsRequired() + .HasColumnType("text"); + + b.HasKey("Id"); + + b.HasIndex("App") + .IsUnique(); + + b.ToTable("Endpoints"); + }); + modelBuilder.Entity("NostrStreamer.Database.Payment", b => { b.Property("PaymentHash") @@ -40,10 +74,16 @@ namespace NostrStreamer.Migrations b.Property("IsPaid") .HasColumnType("boolean"); + b.Property("Nostr") + .HasColumnType("text"); + b.Property("PubKey") .IsRequired() .HasColumnType("text"); + b.Property("Type") + .HasColumnType("integer"); + b.HasKey("PaymentHash"); b.HasIndex("PubKey"); @@ -62,9 +102,6 @@ namespace NostrStreamer.Migrations b.Property("ContentWarning") .HasColumnType("text"); - b.Property("Event") - .HasColumnType("text"); - b.Property("Image") .HasColumnType("text"); @@ -92,6 +129,81 @@ namespace NostrStreamer.Migrations b.ToTable("Users"); }); + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("ClientId") + .IsRequired() + .HasColumnType("text"); + + b.Property("EndpointId") + .HasColumnType("uuid"); + + b.Property("Ends") + .HasColumnType("timestamp with time zone"); + + b.Property("Event") + .IsRequired() + .HasColumnType("text"); + + b.Property("PubKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Recording") + .HasColumnType("text"); + + b.Property("Starts") + .HasColumnType("timestamp with time zone"); + + b.Property("State") + .HasColumnType("integer"); + + b.HasKey("Id"); + + b.HasIndex("EndpointId"); + + b.HasIndex("PubKey"); + + b.ToTable("Streams"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("PubKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Relay") + .HasColumnType("text"); + + b.Property("Role") + .HasColumnType("text"); + + b.Property("Sig") + .HasColumnType("text"); + + b.Property("StreamId") + .HasColumnType("uuid"); + + b.Property("ZapSplit") + .HasColumnType("numeric"); + + b.HasKey("Id"); + + b.HasIndex("StreamId", "PubKey") + .IsUnique(); + + b.ToTable("Guests"); + }); + modelBuilder.Entity("NostrStreamer.Database.Payment", b => { b.HasOne("NostrStreamer.Database.User", "User") @@ -103,9 +215,46 @@ namespace NostrStreamer.Migrations b.Navigation("User"); }); + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.HasOne("NostrStreamer.Database.IngestEndpoint", "Endpoint") + .WithMany() + .HasForeignKey("EndpointId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.HasOne("NostrStreamer.Database.User", "User") + .WithMany("Streams") + .HasForeignKey("PubKey") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Endpoint"); + + b.Navigation("User"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b => + { + b.HasOne("NostrStreamer.Database.UserStream", "Stream") + .WithMany("Guests") + .HasForeignKey("StreamId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Stream"); + }); + modelBuilder.Entity("NostrStreamer.Database.User", b => { b.Navigation("Payments"); + + b.Navigation("Streams"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.Navigation("Guests"); }); #pragma warning restore 612, 618 } diff --git a/NostrStreamer/NostrStreamer.csproj b/NostrStreamer/NostrStreamer.csproj index be36bc3..33fc995 100644 --- a/NostrStreamer/NostrStreamer.csproj +++ b/NostrStreamer/NostrStreamer.csproj @@ -20,8 +20,11 @@ docker-compose.yaml - - srs.conf + + srs-edge.conf + + + srs-origin.conf @@ -32,6 +35,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + @@ -41,4 +45,8 @@ + + + + diff --git a/NostrStreamer/Program.cs b/NostrStreamer/Program.cs index fd1b46c..8066bb6 100644 --- a/NostrStreamer/Program.cs +++ b/NostrStreamer/Program.cs @@ -5,6 +5,7 @@ using Microsoft.EntityFrameworkCore; using Nostr.Client.Client; using NostrStreamer.Database; using NostrStreamer.Services; +using NostrStreamer.Services.StreamManager; namespace NostrStreamer; @@ -47,11 +48,13 @@ internal static class Program services.AddHostedService(); // streaming services - services.AddTransient(); services.AddTransient(); services.AddHostedService(); services.AddSingleton(); services.AddHostedService(); + services.AddTransient(); + services.AddTransient(); + services.AddTransient(); // lnd services services.AddSingleton(); diff --git a/NostrStreamer/Services/BackgroundStreamManager.cs b/NostrStreamer/Services/BackgroundStreamManager.cs index 1ff6e74..9c55dbd 100644 --- a/NostrStreamer/Services/BackgroundStreamManager.cs +++ b/NostrStreamer/Services/BackgroundStreamManager.cs @@ -1,3 +1,7 @@ +using Microsoft.EntityFrameworkCore; +using NostrStreamer.Database; +using NostrStreamer.Services.StreamManager; + namespace NostrStreamer.Services; public class BackgroundStreamManager : BackgroundService @@ -19,15 +23,19 @@ public class BackgroundStreamManager : BackgroundService { using var scope = _scopeFactory.CreateScope(); - var streamManager = scope.ServiceProvider.GetRequiredService(); - var srsApi = scope.ServiceProvider.GetRequiredService(); - var viewCounter = scope.ServiceProvider.GetRequiredService(); + var streamManager = scope.ServiceProvider.GetRequiredService(); + var db = scope.ServiceProvider.GetRequiredService(); - var streams = await srsApi.ListStreams(); - foreach (var stream in streams.GroupBy(a => a.Name)) + var liveStreams = await db.Streams + .AsNoTracking() + .Where(a => a.State == UserStreamState.Live) + .Select(a => a.Id) + .ToListAsync(cancellationToken: stoppingToken); + + foreach (var id in liveStreams) { - var viewers = viewCounter.Current(stream.Key); - await streamManager.UpdateViewers(stream.Key, viewers); + var manager = await streamManager.ForStream(id); + await manager.UpdateViewers(); } } catch (Exception ex) diff --git a/NostrStreamer/Services/SrsApi.cs b/NostrStreamer/Services/SrsApi.cs index c66ac1e..6b822d5 100644 --- a/NostrStreamer/Services/SrsApi.cs +++ b/NostrStreamer/Services/SrsApi.cs @@ -18,6 +18,12 @@ public class SrsApi return rsp!.Streams; } + public async Task GetStream(string id) + { + var rsp = await _client.GetFromJsonAsync($"/api/v1/streams/{id}"); + return rsp!; + } + public async Task> ListClients() { var rsp = await _client.GetFromJsonAsync("/api/v1/clients/?count=10000"); diff --git a/NostrStreamer/Services/StreamEventBuilder.cs b/NostrStreamer/Services/StreamEventBuilder.cs new file mode 100644 index 0000000..81f7c38 --- /dev/null +++ b/NostrStreamer/Services/StreamEventBuilder.cs @@ -0,0 +1,97 @@ +using System.ComponentModel; +using Nostr.Client.Client; +using Nostr.Client.Keys; +using Nostr.Client.Messages; +using Nostr.Client.Requests; +using NostrStreamer.Database; + +namespace NostrStreamer.Services; + +public class StreamEventBuilder +{ + private const NostrKind StreamEventKind = (NostrKind)30_311; + private const NostrKind StreamChatKind = (NostrKind)1311; + private readonly Config _config; + private readonly ViewCounter _viewCounter; + private readonly INostrClient _nostrClient; + + public StreamEventBuilder(Config config, ViewCounter viewCounter, INostrClient nostrClient) + { + _config = config; + _viewCounter = viewCounter; + _nostrClient = nostrClient; + } + + public NostrEvent CreateStreamEvent(User user, UserStream stream) + { + var status = stream.State switch + { + UserStreamState.Planned => "planned", + UserStreamState.Live => "live", + UserStreamState.Ended => "ended", + _ => throw new InvalidEnumArgumentException() + }; + + var tags = new List + { + new("d", stream.Id.ToString()), + new("title", user.Title ?? ""), + new("summary", user.Summary ?? ""), + new("streaming", new Uri(_config.DataHost, $"{user.PubKey}.m3u8").ToString()), + new("image", user.Image ?? new Uri(_config.DataHost, $"{user.PubKey}.png").ToString()), + new("status", status), + new("p", user.PubKey, "", "host"), + new("relays", _config.Relays), + }; + + if (status == "live") + { + var viewers = _viewCounter.Current(stream.Id); + var starts = new DateTimeOffset(stream.Starts).ToUnixTimeSeconds(); + tags.Add(new("starts", starts.ToString())); + tags.Add(new("current_participants", viewers.ToString())); + + if (!string.IsNullOrEmpty(user.ContentWarning)) + { + tags.Add(new("content-warning", user.ContentWarning)); + } + } + + foreach (var tag in !string.IsNullOrEmpty(user.Tags) ? + user.Tags.Split(",", StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) : Array.Empty()) + { + tags.Add(new("t", tag)); + } + + var ev = new NostrEvent + { + Kind = StreamEventKind, + Content = "", + CreatedAt = DateTime.Now, + Tags = new NostrEventTags(tags) + }; + + return ev.Sign(NostrPrivateKey.FromBech32(_config.PrivateKey)); + } + + public NostrEvent CreateStreamChat(UserStream stream, string message) + { + var pk = NostrPrivateKey.FromBech32(_config.PrivateKey); + var ev = new NostrEvent + { + Kind = StreamChatKind, + Content = message, + CreatedAt = DateTime.Now, + Tags = new NostrEventTags( + new NostrEventTag("a", $"{StreamEventKind}:{pk.DerivePublicKey().Hex}:{stream.Id}") + ) + }; + + return ev.Sign(pk); + } + + public void BroadcastEvent(NostrEvent ev) + { + _nostrClient.Send(new NostrEventRequest(ev)); + } +} diff --git a/NostrStreamer/Services/StreamManager.cs b/NostrStreamer/Services/StreamManager.cs deleted file mode 100644 index 1400ef3..0000000 --- a/NostrStreamer/Services/StreamManager.cs +++ /dev/null @@ -1,205 +0,0 @@ -using Microsoft.EntityFrameworkCore; -using Newtonsoft.Json; -using Nostr.Client.Client; -using Nostr.Client.Json; -using Nostr.Client.Keys; -using Nostr.Client.Messages; -using Nostr.Client.Requests; -using NostrStreamer.Database; - -namespace NostrStreamer.Services; - -public class StreamManager -{ - private const NostrKind StreamEventKind = (NostrKind)30_311; - private const NostrKind StreamChatKind = (NostrKind)1311; - - private readonly ILogger _logger; - private readonly StreamerContext _db; - private readonly Config _config; - private readonly INostrClient _nostr; - private readonly SrsApi _srsApi; - - public StreamManager(ILogger logger, StreamerContext db, Config config, INostrClient nostr, SrsApi srsApi) - { - _logger = logger; - _db = db; - _config = config; - _nostr = nostr; - _srsApi = srsApi; - } - - public async Task StreamStarted(string streamKey) - { - var user = await GetUserFromStreamKey(streamKey); - if (user == default) throw new Exception("No stream key found"); - - _logger.LogInformation("Stream started for: {pubkey}", user.PubKey); - - if (user.Balance <= 0) - { - throw new Exception("User balance empty"); - } - - var ev = CreateStreamEvent(user, "live"); - await PublishEvent(user, ev); - } - - public async Task StreamStopped(string streamKey) - { - var user = await GetUserFromStreamKey(streamKey); - if (user == default) throw new Exception("No stream key found"); - - _logger.LogInformation("Stream stopped for: {pubkey}", user.PubKey); - - var ev = CreateStreamEvent(user, "ended"); - await PublishEvent(user, ev); - } - - public async Task ConsumeQuota(string streamKey, double duration, string clientId) - { - var user = await GetUserFromStreamKey(streamKey); - if (user == default) throw new Exception("No stream key found"); - - const long balanceAlertThreshold = 500_000; - var cost = (long)Math.Ceiling(_config.Cost * (duration / 60d)); - if (cost > 0) - { - await _db.Users - .Where(a => a.PubKey == user.PubKey) - .ExecuteUpdateAsync(o => o.SetProperty(v => v.Balance, v => v.Balance - cost)); - } - - _logger.LogInformation("Stream consumed {n} seconds for {pubkey} costing {cost:#,##0} milli-sats", duration, user.PubKey, cost); - if (user.Balance >= balanceAlertThreshold && user.Balance - cost < balanceAlertThreshold) - { - _nostr.Send(new NostrEventRequest(CreateStreamChat(user, - $"Your balance is below {(int)(balanceAlertThreshold / 1000m)} sats, please topup"))); - } - - if (user.Balance <= 0) - { - _logger.LogInformation("Kicking stream due to low balance"); - await _srsApi.KickClient(clientId); - } - } - - public async Task PatchEvent(string pubkey, string? title, string? summary, string? image, string[]? tags, string? contentWarning) - { - var user = await _db.Users.SingleOrDefaultAsync(a => a.PubKey == pubkey); - if (user == default) throw new Exception("User not found"); - - user.Title = title; - user.Summary = summary; - user.Image = image; - user.Tags = tags != null ? string.Join(",", tags) : null; - user.ContentWarning = contentWarning; - - var ev = CreateStreamEvent(user); - user.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings); - - await _db.SaveChangesAsync(); - _nostr.Send(new NostrEventRequest(ev)); - } - - public async Task UpdateViewers(string streamKey, int viewers) - { - var user = await GetUserFromStreamKey(streamKey); - if (user == default) throw new Exception("No stream key found"); - - var existingEvent = user.GetNostrEvent(); - if (existingEvent?.Tags != default && (existingEvent.Tags.FindFirstTagValue("status")?.Equals("ended") ?? false)) return; - - var oldViewers = existingEvent?.Tags?.FindFirstTagValue("current_participants"); - if (string.IsNullOrEmpty(oldViewers) || int.Parse(oldViewers) != viewers) - { - var ev = CreateStreamEvent(user, viewers: viewers); - await PublishEvent(user, ev); - } - } - - private async Task PublishEvent(User user, NostrEvent ev) - { - await _db.Users - .Where(a => a.PubKey == user.PubKey) - .ExecuteUpdateAsync(o => o.SetProperty(v => v.Event, JsonConvert.SerializeObject(ev, NostrSerializer.Settings))); - - _nostr.Send(new NostrEventRequest(ev)); - } - - private NostrEvent CreateStreamChat(User user, string message) - { - var pk = NostrPrivateKey.FromBech32(_config.PrivateKey); - var ev = new NostrEvent - { - Kind = StreamChatKind, - Content = message, - CreatedAt = DateTime.Now, - Tags = new NostrEventTags( - new NostrEventTag("a", $"{StreamEventKind}:{pk.DerivePublicKey().Hex}:{user.PubKey}") - ) - }; - - return ev.Sign(pk); - } - - private NostrEvent CreateStreamEvent(User user, string? state = null, int? viewers = null) - { - var existingEvent = user.GetNostrEvent(); - var status = state ?? existingEvent?.Tags?.FindFirstTagValue("status") ?? "ended"; - - var tags = new List - { - new("d", user.PubKey), - new("title", user.Title ?? ""), - new("summary", user.Summary ?? ""), - new("streaming", GetStreamUrl(user)), - new("image", user.Image ?? ""), - new("status", status), - new("p", user.PubKey, "", "host"), - new("relays", _config.Relays), - }; - - if (status == "live") - { - var starts = existingEvent?.Tags?.FindFirstTagValue("starts") ?? DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString(); - tags.Add(new("starts", starts)); - tags.Add( - new("current_participants", - (viewers.HasValue ? viewers.ToString() : null) ?? - existingEvent?.Tags?.FindFirstTagValue("current_participants") ?? "0")); - - if (!string.IsNullOrEmpty(user.ContentWarning)) - { - tags.Add(new("content-warning", user.ContentWarning)); - } - } - - foreach (var tag in !string.IsNullOrEmpty(user.Tags) ? - user.Tags.Split(",", StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) : Array.Empty()) - { - tags.Add(new("t", tag)); - } - - var ev = new NostrEvent - { - Kind = StreamEventKind, - Content = "", - CreatedAt = DateTime.Now, - Tags = new NostrEventTags(tags) - }; - - return ev.Sign(NostrPrivateKey.FromBech32(_config.PrivateKey)); - } - - private string GetStreamUrl(User u) - { - var ub = new Uri(_config.DataHost, $"{u.PubKey}.m3u8"); - return ub.ToString(); - } - - private async Task GetUserFromStreamKey(string streamKey) - { - return await _db.Users.SingleOrDefaultAsync(a => a.StreamKey == streamKey); - } -} diff --git a/NostrStreamer/Services/StreamManager/IStreamManager.cs b/NostrStreamer/Services/StreamManager/IStreamManager.cs new file mode 100644 index 0000000..e85d421 --- /dev/null +++ b/NostrStreamer/Services/StreamManager/IStreamManager.cs @@ -0,0 +1,69 @@ +using NostrStreamer.Database; + +namespace NostrStreamer.Services.StreamManager; + +public interface IStreamManager +{ + /// + /// Return the current stream + /// + /// + UserStream GetStream(); + + /// + /// Stream ingress check on srs-edge + /// + /// List of forward URLs + Task> OnForward(); + + /// + /// Stream started at origin for HLS split + /// + /// + Task StreamStarted(); + + /// + /// Stream stopped + /// + /// + Task StreamStopped(); + + /// + /// Stream reap HLS + /// + /// + /// + Task ConsumeQuota(double duration); + + /// + /// Update stream details + /// + /// + /// + /// + /// + /// + /// + Task PatchEvent(string? title, string? summary, string? image, string[]? tags, string? contentWarning); + + /// + /// Update viewer count + /// + public Task UpdateViewers(); + + /// + /// Add a guest to the stream + /// + /// + /// + /// + /// + Task AddGuest(string pubkey, string role, decimal zapSplit); + + /// + /// Remove guest from the stream + /// + /// + /// + Task RemoveGuest(string pubkey); +} \ No newline at end of file diff --git a/NostrStreamer/Services/StreamManager/LowBalanceException.cs b/NostrStreamer/Services/StreamManager/LowBalanceException.cs new file mode 100644 index 0000000..9a5609c --- /dev/null +++ b/NostrStreamer/Services/StreamManager/LowBalanceException.cs @@ -0,0 +1,8 @@ +namespace NostrStreamer.Services.StreamManager; + +public class LowBalanceException : Exception +{ + public LowBalanceException(string message) : base(message) + { + } +} diff --git a/NostrStreamer/Services/StreamManager/NostrStreamManager.cs b/NostrStreamer/Services/StreamManager/NostrStreamManager.cs new file mode 100644 index 0000000..ac13018 --- /dev/null +++ b/NostrStreamer/Services/StreamManager/NostrStreamManager.cs @@ -0,0 +1,167 @@ +using Microsoft.EntityFrameworkCore; +using Newtonsoft.Json; +using Nostr.Client.Json; +using NostrStreamer.Database; + +namespace NostrStreamer.Services.StreamManager; + +public class NostrStreamManager : IStreamManager +{ + private readonly ILogger _logger; + private readonly StreamManagerContext _context; + private readonly StreamEventBuilder _eventBuilder; + private readonly SrsApi _srsApi; + + public NostrStreamManager(ILogger logger, StreamManagerContext context, + StreamEventBuilder eventBuilder, SrsApi srsApi) + { + _logger = logger; + _context = context; + _eventBuilder = eventBuilder; + _srsApi = srsApi; + } + + public UserStream GetStream() + { + return _context.UserStream; + } + + public Task> OnForward() + { + if (_context.User.Balance <= 0) + { + throw new LowBalanceException("User balance empty"); + } + + return Task.FromResult(new List() + { + $"rtmp://localhost/{_context.UserStream.Endpoint.App}/{_context.User.StreamKey}?vhost={_context.UserStream.Endpoint.Forward}" + }); + } + + public async Task StreamStarted() + { + _logger.LogInformation("Stream started for: {pubkey}", _context.User.PubKey); + + if (_context.User.Balance <= 0) + { + throw new Exception("User balance empty"); + } + + await UpdateStreamState(UserStreamState.Live); + } + + public async Task StreamStopped() + { + _logger.LogInformation("Stream stopped for: {pubkey}", _context.User.PubKey); + + await UpdateStreamState(UserStreamState.Ended); + } + + public async Task ConsumeQuota(double duration) + { + const long balanceAlertThreshold = 500_000; + var cost = (long)Math.Ceiling(_context.UserStream.Endpoint.Cost * (duration / 60d)); + if (cost > 0) + { + await _context.Db.Users + .Where(a => a.PubKey == _context.User.PubKey) + .ExecuteUpdateAsync(o => o.SetProperty(v => v.Balance, v => v.Balance - cost)); + } + + _logger.LogInformation("Stream consumed {n} seconds for {pubkey} costing {cost:#,##0} milli-sats", duration, _context.User.PubKey, + cost); + + if (_context.User.Balance >= balanceAlertThreshold && _context.User.Balance - cost < balanceAlertThreshold) + { + var chat = _eventBuilder.CreateStreamChat(_context.UserStream, + $"Your balance is below {(int)(balanceAlertThreshold / 1000m)} sats, please topup"); + + _eventBuilder.BroadcastEvent(chat); + } + + if (_context.User.Balance <= 0) + { + _logger.LogInformation("Kicking stream due to low balance"); + await _srsApi.KickClient(_context.UserStream.ClientId); + } + } + + public async Task PatchEvent(string? title, string? summary, string? image, string[]? tags, string? contentWarning) + { + var user = _context.User; + + await _context.Db.Users + .Where(a => a.PubKey == _context.User.PubKey) + .ExecuteUpdateAsync(o => o.SetProperty(v => v.Title, title) + .SetProperty(v => v.Summary, summary) + .SetProperty(v => v.Image, image) + .SetProperty(v => v.Tags, tags != null ? string.Join(",", tags) : null) + .SetProperty(v => v.ContentWarning, contentWarning)); + + user.Title = title; + user.Summary = summary; + user.Image = image; + user.Tags = tags != null ? string.Join(",", tags) : null; + user.ContentWarning = contentWarning; + + var ev = _eventBuilder.CreateStreamEvent(user, _context.UserStream); + await _context.Db.Streams.Where(a => a.Id == _context.UserStream.Id) + .ExecuteUpdateAsync(o => o.SetProperty(v => v.Event, JsonConvert.SerializeObject(ev, NostrSerializer.Settings))); + + _eventBuilder.BroadcastEvent(ev); + } + + public async Task AddGuest(string pubkey, string role, decimal zapSplit) + { + _context.Db.Guests.Add(new() + { + StreamId = _context.UserStream.Id, + PubKey = pubkey, + Role = role, + ZapSplit = zapSplit + }); + + await _context.Db.SaveChangesAsync(); + } + + public async Task RemoveGuest(string pubkey) + { + await _context.Db.Guests + .Where(a => a.PubKey == pubkey && a.StreamId == _context.UserStream.Id) + .ExecuteDeleteAsync(); + } + + public async Task UpdateViewers() + { + if (_context.UserStream.State is not UserStreamState.Live) return; + + var existingEvent = _context.UserStream.GetEvent(); + var oldViewers = existingEvent?.Tags?.FindFirstTagValue("current_participants"); + + var newEvent = _eventBuilder.CreateStreamEvent(_context.User, _context.UserStream); + var newViewers = newEvent?.Tags?.FindFirstTagValue("current_participants"); + + if (newEvent != default && int.TryParse(oldViewers, out var a) && int.TryParse(newViewers, out var b) && a != b) + { + await _context.Db.Streams.Where(a => a.Id == _context.UserStream.Id) + .ExecuteUpdateAsync(o => o.SetProperty(v => v.Event, JsonConvert.SerializeObject(newEvent, NostrSerializer.Settings))); + + _eventBuilder.BroadcastEvent(newEvent); + } + } + + private async Task UpdateStreamState(UserStreamState state) + { + _context.UserStream.State = state; + var ev = _eventBuilder.CreateStreamEvent(_context.User, _context.UserStream); + + DateTime? ends = state == UserStreamState.Ended ? DateTime.UtcNow : null; + await _context.Db.Streams.Where(a => a.Id == _context.UserStream.Id) + .ExecuteUpdateAsync(o => o.SetProperty(v => v.State, state) + .SetProperty(v => v.Event, JsonConvert.SerializeObject(ev, NostrSerializer.Settings)) + .SetProperty(v => v.Ends, ends)); + + _eventBuilder.BroadcastEvent(ev); + } +} diff --git a/NostrStreamer/Services/StreamManager/StreamInfo.cs b/NostrStreamer/Services/StreamManager/StreamInfo.cs new file mode 100644 index 0000000..e244b6e --- /dev/null +++ b/NostrStreamer/Services/StreamManager/StreamInfo.cs @@ -0,0 +1,12 @@ +namespace NostrStreamer.Services.StreamManager; + +public class StreamInfo +{ + public string App { get; init; } = null!; + + public string Variant { get; init; } = null!; + + public string StreamKey { get; init; } = null!; + + public string ClientId { get; init; } = null!; +} diff --git a/NostrStreamer/Services/StreamManager/StreamManagerContext.cs b/NostrStreamer/Services/StreamManager/StreamManagerContext.cs new file mode 100644 index 0000000..b67a227 --- /dev/null +++ b/NostrStreamer/Services/StreamManager/StreamManagerContext.cs @@ -0,0 +1,10 @@ +using NostrStreamer.Database; + +namespace NostrStreamer.Services.StreamManager; + +public class StreamManagerContext +{ + public StreamerContext Db { get; init; } = null!; + public UserStream UserStream { get; init; } = null!; + public User User => UserStream.User; +} diff --git a/NostrStreamer/Services/StreamManager/StreamManagerFactory.cs b/NostrStreamer/Services/StreamManager/StreamManagerFactory.cs new file mode 100644 index 0000000..6c2c0eb --- /dev/null +++ b/NostrStreamer/Services/StreamManager/StreamManagerFactory.cs @@ -0,0 +1,119 @@ +using Microsoft.EntityFrameworkCore; +using Newtonsoft.Json; +using Nostr.Client.Json; +using NostrStreamer.Database; + +namespace NostrStreamer.Services.StreamManager; + +public class StreamManagerFactory +{ + private readonly StreamerContext _db; + private readonly ILoggerFactory _loggerFactory; + private readonly StreamEventBuilder _eventBuilder; + private readonly SrsApi _srsApi; + + public StreamManagerFactory(StreamerContext db, ILoggerFactory loggerFactory, StreamEventBuilder eventBuilder, + SrsApi srsApi) + { + _db = db; + _loggerFactory = loggerFactory; + _eventBuilder = eventBuilder; + _srsApi = srsApi; + } + + public async Task ForStream(Guid id) + { + var currentStream = await _db.Streams + .AsNoTracking() + .Include(a => a.User) + .Include(a => a.Endpoint) + .FirstOrDefaultAsync(a => a.Id == id); + + if (currentStream == default) throw new Exception("No live stream"); + + var ctx = new StreamManagerContext + { + Db = _db, + UserStream = currentStream + }; + + return new NostrStreamManager(_loggerFactory.CreateLogger(), ctx, _eventBuilder, _srsApi); + } + + public async Task ForCurrentStream(string pubkey) + { + var currentStream = await _db.Streams + .AsNoTracking() + .Include(a => a.User) + .Include(a => a.Endpoint) + .FirstOrDefaultAsync(a => a.PubKey.Equals(pubkey) && a.State == UserStreamState.Live); + + if (currentStream == default) throw new Exception("No live stream"); + + var ctx = new StreamManagerContext + { + Db = _db, + UserStream = currentStream + }; + + return new NostrStreamManager(_loggerFactory.CreateLogger(), ctx, _eventBuilder, _srsApi); + } + + public async Task ForStream(StreamInfo info) + { + var stream = await _db.Streams + .AsNoTracking() + .Include(a => a.User) + .Include(a => a.Endpoint) + .FirstOrDefaultAsync(a => + a.ClientId.Equals(info.ClientId) && + a.User.StreamKey.Equals(info.StreamKey) && + a.Endpoint.App.Equals(info.App)); + + if (stream == default) + { + var user = await _db.Users + .AsNoTracking() + .SingleOrDefaultAsync(a => a.StreamKey.Equals(info.StreamKey)); + + if (user == default) throw new Exception("No user found"); + + var ep = await _db.Endpoints + .AsNoTracking() + .SingleOrDefaultAsync(a => a.App.Equals(info.App)); + + if (ep == default) throw new Exception("No endpoint found"); + + stream = new() + { + EndpointId = ep.Id, + PubKey = user.PubKey, + ClientId = info.ClientId, + State = UserStreamState.Planned + }; + + var ev = _eventBuilder.CreateStreamEvent(user, stream); + stream.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings); + _db.Streams.Add(stream); + await _db.SaveChangesAsync(); + + // replace again with new values + stream = new() + { + Id = stream.Id, + User = user, + Endpoint = ep, + ClientId = info.ClientId, + State = UserStreamState.Planned, + }; + } + + var ctx = new StreamManagerContext + { + Db = _db, + UserStream = stream + }; + + return new NostrStreamManager(_loggerFactory.CreateLogger(), ctx, _eventBuilder, _srsApi); + } +} diff --git a/NostrStreamer/Services/UserBalanceService.cs b/NostrStreamer/Services/UserBalanceService.cs new file mode 100644 index 0000000..78a5aed --- /dev/null +++ b/NostrStreamer/Services/UserBalanceService.cs @@ -0,0 +1,47 @@ +using System.Security.Cryptography; +using System.Text; +using Microsoft.EntityFrameworkCore; +using Nostr.Client.Utils; +using NostrStreamer.Database; + +namespace NostrStreamer.Services; + +public class UserService +{ + private readonly StreamerContext _db; + private readonly LndNode _lnd; + + public UserService(StreamerContext db, LndNode lnd) + { + _db = db; + _lnd = lnd; + } + + public async Task CreateTopup(string pubkey, ulong amount, string? nostr) + { + var user = await GetUser(pubkey); + if (user == default) throw new Exception("No user found"); + + var descHash = string.IsNullOrEmpty(nostr) ? null : SHA256.HashData(Encoding.UTF8.GetBytes(nostr)).ToHex(); + var invoice = await _lnd.AddInvoice(amount * 1000, TimeSpan.FromMinutes(10), $"Top up for {pubkey}", descHash); + _db.Payments.Add(new() + { + PubKey = pubkey, + Amount = amount, + Invoice = invoice.PaymentRequest, + PaymentHash = invoice.RHash.ToByteArray().ToHex(), + Nostr = nostr, + Type = string.IsNullOrEmpty(nostr) ? PaymentType.Topup : PaymentType.Zap + }); + + await _db.SaveChangesAsync(); + + return invoice.PaymentRequest; + } + + public async Task GetUser(string pubkey) + { + return await _db.Users.AsNoTracking() + .SingleOrDefaultAsync(a => a.PubKey.Equals(pubkey)); + } +} diff --git a/NostrStreamer/Services/ViewCounter.cs b/NostrStreamer/Services/ViewCounter.cs index 773eba3..e7cd61e 100644 --- a/NostrStreamer/Services/ViewCounter.cs +++ b/NostrStreamer/Services/ViewCounter.cs @@ -4,15 +4,15 @@ namespace NostrStreamer.Services; public class ViewCounter { - private readonly ConcurrentDictionary> _sessions = new(); + private readonly ConcurrentDictionary> _sessions = new(); - public void Activity(string key, string token) + public void Activity(Guid id, string token) { - if (!_sessions.ContainsKey(key)) + if (!_sessions.ContainsKey(id)) { - _sessions.TryAdd(key, new()); + _sessions.TryAdd(id, new()); } - if (_sessions.TryGetValue(key, out var x)) + if (_sessions.TryGetValue(id, out var x)) { x[token] = DateTime.Now; } @@ -36,9 +36,9 @@ public class ViewCounter } } - public int Current(string key) + public int Current(Guid id) { - if (_sessions.TryGetValue(key, out var x)) + if (_sessions.TryGetValue(id, out var x)) { return x.Count; } diff --git a/NostrStreamer/appsettings.json b/NostrStreamer/appsettings.json index 3e69a54..7f40aba 100644 --- a/NostrStreamer/appsettings.json +++ b/NostrStreamer/appsettings.json @@ -11,30 +11,41 @@ "Database": "User ID=postgres;Password=postgres;Database=streaming;Pooling=true;Host=127.0.0.1:5431" }, "Config": { - "RtmpHost": "rtmp://localhost:1935", - "SrsHttpHost": "http://localhost:8082", - "SrsApiHost": "http://localhost:1985", + "RtmpHost": "rtmp://localhost:9005", + "SrsHttpHost": "http://localhost:9003", + "SrsApiHost": "http://localhost:9002", "DataHost": "http://localhost:5295/api/playlist/", - "App": "test", - "Relays": ["ws://localhost:8081"], + "ApiHost": "http://localhost:5295", + "Relays": [ + "ws://localhost:8081" + ], "PrivateKey": "nsec1yqtv8s8y9krh6l8pwp09lk2jkulr9e0klu95tlk7dgus9cklr4ssdv3d88", "Lnd": { "Endpoint": "https://localhost:10002", "CertPath": "/Users/kieran/.polar/networks/1/volumes/lnd/bob/tls.cert", "MacaroonPath": "/Users/kieran/.polar/networks/1/volumes/lnd/bob/data/chain/bitcoin/regtest/admin.macaroon" }, - "Variants": [ + "Endpoints": [ { - "Name": "source", - "Width": 1920, - "Height": 1080, - "Bandwidth": 8000 + "Name": "Premium", + "App": "test", + "Capabilities": [ + "variant:source", + "variant:240h:500", + "hls" + ], + "Forward": "full.in.zap.stream", + "Cost": 10000 }, { - "Name": "240p", - "Width": 426, - "Height": 240, - "Bandwidth": 500 + "Name": "Passthrough", + "App": "test2", + "Capabilities": [ + "variant:source", + "hls" + ], + "Forward": "base.in.zap.stream", + "Cost": 2100 } ] } diff --git a/docker-compose.yaml b/docker-compose.yaml index f91fc95..36b9bb7 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,13 +1,22 @@ services: - srs: - image: ossrs/srs + srs-origin: + image: ossrs/srs:5 volumes: - - "./docker/srs.conf:/usr/local/srs/conf/docker.conf" + - "./docker/srs-origin.conf:/usr/local/srs/conf/srs.conf" ports: - - "1935:1935" - - "1985:1985" - - "8082:8080" - - "8003:8000" + - "9001:1935" + - "9002:1985" + - "9003:8080" + - "9004:8000" + srs-edge: + image: ossrs/srs:5 + volumes: + - "./docker/srs-edge.conf:/usr/local/srs/conf/srs.conf" + ports: + - "9005:1935" + - "9006:1985" + - "9007:8080" + - "9008:8000" nostr: image: scsibug/nostr-rs-relay ports: diff --git a/docker/srs.conf b/docker/srs-edge.conf similarity index 76% rename from docker/srs.conf rename to docker/srs-edge.conf index f2ac09f..ea01965 100644 --- a/docker/srs.conf +++ b/docker/srs-edge.conf @@ -2,44 +2,31 @@ listen 1935; max_connections 1000; daemon off; srs_log_tank console; + http_api { enabled on; listen 1985; } + http_server { enabled on; listen 8080; } + rtc_server { enabled on; listen 8000; candidate *; } -vhost transcode { - hls { - enabled on; - hls_dispose 30; - hls_fragment 2; - hls_window 10; - } - rtc { - enabled on; - rtmp_to_rtc on; - rtc_to_rtmp on; - } - http_remux { - enabled on; - mount [vhost]/[app]/[stream].ts; - } - http_hooks { - enabled on; - on_publish http://10.100.2.226:5295/api/srs; - on_unpublish http://10.100.2.226:5295/api/srs; - on_hls http://10.100.2.226:5295/api/srs; + +vhost hls.zap.stream { + cluster { + mode remote; + origin srs-origin; } } -vhost __defaultVhost__ { +vhost base.in.zap.stream { transcode { enabled on; ffmpeg ./objs/ffmpeg/bin/ffmpeg; @@ -48,9 +35,23 @@ vhost __defaultVhost__ { enabled on; vcodec copy; acodec copy; - output rtmp://127.0.0.1:[port]/[app]/[engine]/[stream]?vhost=transcode; + output rtmp://127.0.0.1:[port]/[app]/[engine]/[stream]?vhost=hls.zap.stream; } - engine 720p { + } +} + +vhost full.in.zap.stream { + transcode { + enabled on; + ffmpeg ./objs/ffmpeg/bin/ffmpeg; + + engine source { + enabled on; + vcodec copy; + acodec copy; + output rtmp://127.0.0.1:[port]/[app]/[engine]/[stream]?vhost=hls.zap.stream; + } + engine 720h { enabled on; vcodec libx264; vbitrate 3000; @@ -68,9 +69,9 @@ vhost __defaultVhost__ { abitrate 160; asample_rate 44100; achannels 2; - output rtmp://127.0.0.1:[port]/[app]/[engine]/[stream]?vhost=transcode; + output rtmp://127.0.0.1:[port]/[app]/[engine]/[stream]?vhost=hls.zap.stream; } - engine 480p { + engine 480h { enabled off; vcodec libx264; vbitrate 1000; @@ -88,9 +89,9 @@ vhost __defaultVhost__ { abitrate 96; asample_rate 44100; achannels 2; - output rtmp://127.0.0.1:[port]/[app]/[engine]/[stream]?vhost=transcode; + output rtmp://127.0.0.1:[port]/[app]/[engine]/[stream]?vhost=hls.zap.stream; } - engine 240p { + engine 240h { enabled off; vcodec libx264; vbitrate 500; @@ -108,7 +109,15 @@ vhost __defaultVhost__ { abitrate 72; asample_rate 44100; achannels 2; - output rtmp://127.0.0.1:[port]/[app]/[engine]/[stream]?vhost=transcode; + output rtmp://127.0.0.1:[port]/[app]/[engine]/[stream]?vhost=hls.zap.stream; } } } + +# forward ingest, api decides route +vhost __defaultVhost__ { + forward { + enabled on; + backend http://10.100.2.226:5295/api/srs; + } +} \ No newline at end of file diff --git a/docker/srs-origin.conf b/docker/srs-origin.conf new file mode 100644 index 0000000..944faee --- /dev/null +++ b/docker/srs-origin.conf @@ -0,0 +1,55 @@ +listen 1935; +max_connections 1000; +daemon off; +srs_log_tank console; + +http_api { + enabled on; + listen 1985; +} + +http_server { + enabled on; + listen 8080; +} + +vhost hls.zap.stream { + cluster { + mode local; + } + + hls { + enabled on; + hls_dispose 30; + hls_fragment 2; + hls_window 20; + } + + rtc { + enabled on; + rtmp_to_rtc on; + } + + http_hooks { + enabled on; + on_publish http://10.100.2.226:5295/api/srs; + on_unpublish http://10.100.2.226:5295/api/srs; + on_hls http://10.100.2.226:5295/api/srs; + } + + transcode { + enabled on; + ffmpeg ./objs/ffmpeg/bin/ffmpeg; + + engine { + enabled on; + vcodec png; + acodec an; + vparams { + vframes 1; + } + oformat image2; + output ./objs/nginx/html/[app]/[stream].png; + } + } +} \ No newline at end of file