From 053d34cde75d908c2335be62ef4a604843d4fa8d Mon Sep 17 00:00:00 2001 From: Kieran Date: Mon, 18 Dec 2023 12:19:09 +0000 Subject: [PATCH] Push notifications --- NostrStreamer/ApiModel/PushMessage.cs | 24 + .../ApiModel/PushSubscriptionRequest.cs | 18 + NostrStreamer/Config.cs | 12 + NostrStreamer/Controllers/NostrController.cs | 147 +++++- .../PushSubscriptionConfiguration.cs | 32 ++ .../PushSubscriptionTargetConfiguration.cs | 23 + NostrStreamer/Database/PushSubscription.cs | 23 + .../Database/PushSubscriptionTarget.cs | 14 + NostrStreamer/Database/StreamerContext.cs | 4 + NostrStreamer/Extensions.cs | 11 + ...231218112430_PushSubscriptions.Designer.cs | 458 ++++++++++++++++++ .../20231218112430_PushSubscriptions.cs | 67 +++ .../StreamerContextModelSnapshot.cs | 64 +++ NostrStreamer/NostrStreamer.csproj | 3 + NostrStreamer/Program.cs | 39 +- NostrStreamer/Services/EventStream.cs | 49 ++ NostrStreamer/Services/PushSender.cs | 133 +++++ NostrStreamer/Services/SnortApi.cs | 50 ++ NostrStreamer/appsettings.json | 8 +- 19 files changed, 1175 insertions(+), 4 deletions(-) create mode 100644 NostrStreamer/ApiModel/PushMessage.cs create mode 100644 NostrStreamer/ApiModel/PushSubscriptionRequest.cs create mode 100644 NostrStreamer/Database/Configuration/PushSubscriptionConfiguration.cs create mode 100644 NostrStreamer/Database/Configuration/PushSubscriptionTargetConfiguration.cs create mode 100644 NostrStreamer/Database/PushSubscription.cs create mode 100644 NostrStreamer/Database/PushSubscriptionTarget.cs create mode 100644 NostrStreamer/Migrations/20231218112430_PushSubscriptions.Designer.cs create mode 100644 NostrStreamer/Migrations/20231218112430_PushSubscriptions.cs create mode 100644 NostrStreamer/Services/EventStream.cs create mode 100644 NostrStreamer/Services/PushSender.cs create mode 100644 NostrStreamer/Services/SnortApi.cs diff --git a/NostrStreamer/ApiModel/PushMessage.cs b/NostrStreamer/ApiModel/PushMessage.cs new file mode 100644 index 0000000..2125c15 --- /dev/null +++ b/NostrStreamer/ApiModel/PushMessage.cs @@ -0,0 +1,24 @@ +using Newtonsoft.Json; + +namespace NostrStreamer.ApiModel; + +public enum PushMessageType +{ + StreamStarted = 1 +} + +public class PushMessage +{ + [JsonProperty("type")] + public PushMessageType Type { get; init; } + + [JsonProperty("pubkey")] + public string Pubkey { get; init; } = null!; + + [JsonProperty("name")] + public string? Name { get; init; } + + [JsonProperty("avatar")] + public string? Avatar { get; init; } + +} diff --git a/NostrStreamer/ApiModel/PushSubscriptionRequest.cs b/NostrStreamer/ApiModel/PushSubscriptionRequest.cs new file mode 100644 index 0000000..147f91e --- /dev/null +++ b/NostrStreamer/ApiModel/PushSubscriptionRequest.cs @@ -0,0 +1,18 @@ +using Newtonsoft.Json; + +namespace NostrStreamer.ApiModel; + +public class PushSubscriptionRequest +{ + [JsonProperty("endpoint")] + public string Endpoint { get; init; } = null!; + + [JsonProperty("auth")] + public string Auth { get; init; } = null!; + + [JsonProperty("key")] + public string Key { get; init; } = null!; + + [JsonProperty("scope")] + public string Scope { get; init; } = null!; +} diff --git a/NostrStreamer/Config.cs b/NostrStreamer/Config.cs index 9e592c1..e6bfe5a 100644 --- a/NostrStreamer/Config.cs +++ b/NostrStreamer/Config.cs @@ -48,6 +48,18 @@ public class Config public TwitchApi Twitch { get; init; } = null!; public string DataProtectionKeyPath { get; init; } = null!; + + public VapidKeyDetails VapidKey { get; init; } = null!; + + public string Redis { get; init; } = null!; + + public Uri SnortApi { get; init; } = null!; +} + +public class VapidKeyDetails +{ + public string PublicKey { get; init; } = null!; + public string PrivateKey { get; init; } = null!; } public class TwitchApi diff --git a/NostrStreamer/Controllers/NostrController.cs b/NostrStreamer/Controllers/NostrController.cs index f95b798..ac65f47 100644 --- a/NostrStreamer/Controllers/NostrController.cs +++ b/NostrStreamer/Controllers/NostrController.cs @@ -5,11 +5,13 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.EntityFrameworkCore; using Newtonsoft.Json; using Nostr.Client.Json; +using Nostr.Client.Messages; using NostrStreamer.ApiModel; using NostrStreamer.Database; using NostrStreamer.Services; using NostrStreamer.Services.Clips; using NostrStreamer.Services.StreamManager; +using WebPush; namespace NostrStreamer.Controllers; @@ -23,15 +25,19 @@ public class NostrController : Controller private readonly StreamManagerFactory _streamManagerFactory; private readonly UserService _userService; private readonly IClipService _clipService; + private readonly ILogger _logger; + private readonly PushSender _pushSender; public NostrController(StreamerContext db, Config config, StreamManagerFactory streamManager, UserService userService, - IClipService clipService) + IClipService clipService, ILogger logger, PushSender pushSender) { _db = db; _config = config; _streamManagerFactory = streamManager; _userService = userService; _clipService = clipService; + _logger = logger; + _pushSender = pushSender; } [HttpGet("account")] @@ -206,6 +212,145 @@ public class NostrController : Controller return File(fs, "video/mp4", enableRangeProcessing: true); } + [HttpGet("notifications/info")] + [AllowAnonymous] + public IActionResult GetInfo() + { + return Json(new + { + publicKey = _config.VapidKey.PublicKey + }); + } + +#if DEBUG + [HttpGet("notifications/generate-keys")] + [AllowAnonymous] + public IActionResult GenerateKeys() + { + var vapidKeys = VapidHelper.GenerateVapidKeys(); + + return Json(new + { + publicKey = vapidKeys.PublicKey, + privateKey = vapidKeys.PrivateKey + }); + } + + [HttpPost("notifications/test")] + [AllowAnonymous] + public void TestNotification([FromBody] NostrEvent ev) + { + _pushSender.Add(ev); + } + +#endif + + [HttpPost("notifications/register")] + public async Task Register([FromBody] PushSubscriptionRequest sub) + { + if (string.IsNullOrEmpty(sub.Endpoint) || string.IsNullOrEmpty(sub.Auth) || string.IsNullOrEmpty(sub.Key)) + return BadRequest(); + + var pubkey = GetPubKey(); + if (string.IsNullOrEmpty(pubkey)) + return BadRequest(); + + var count = await _db.PushSubscriptions.CountAsync(a => a.Pubkey == pubkey); + if (count >= 5) + return Json(new + { + error = "Too many active subscriptions" + }); + + var existing = await _db.PushSubscriptions.FirstOrDefaultAsync(a => a.Key == sub.Key); + if (existing != default) + { + return Json(new {id = existing.Id}); + } + + var newId = Guid.NewGuid(); + _db.PushSubscriptions.Add(new() + { + Id = newId, + Pubkey = pubkey, + Endpoint = sub.Endpoint, + Key = sub.Key, + Auth = sub.Auth, + Scope = sub.Scope + }); + + await _db.SaveChangesAsync(); + _logger.LogInformation("{pubkey} registered for notifications", pubkey); + return Json(new + { + id = newId + }); + } + + [HttpGet("notifications")] + public async Task ListNotifications([FromQuery] string auth) + { + var userPubkey = GetPubKey(); + if (string.IsNullOrEmpty(userPubkey)) + return BadRequest(); + + var sub = await _db.PushSubscriptionTargets + .Join(_db.PushSubscriptions, a => a.SubscriberPubkey, b => b.Pubkey, + (a, b) => new {a.SubscriberPubkey, a.TargetPubkey, b.Auth}) + .Where(a => a.SubscriberPubkey == userPubkey && a.Auth == auth) + .Select(a => a.TargetPubkey) + .ToListAsync(); + + return Json(sub); + } + + [HttpPatch("notifications")] + public async Task RegisterForStreamer([FromQuery] string pubkey) + { + if (string.IsNullOrEmpty(pubkey)) return BadRequest(); + + var userPubkey = GetPubKey(); + if (string.IsNullOrEmpty(userPubkey)) + return BadRequest(); + + var sub = await _db.PushSubscriptionTargets + .CountAsync(a => a.SubscriberPubkey == userPubkey && a.TargetPubkey == pubkey); + + if (sub > 0) return Ok(); + + _db.PushSubscriptionTargets.Add(new() + { + SubscriberPubkey = userPubkey, + TargetPubkey = pubkey + }); + + await _db.SaveChangesAsync(); + + return Accepted(); + } + + [HttpDelete("notifications")] + public async Task UnregisterForStreamer([FromQuery] string pubkey) + { + if (string.IsNullOrEmpty(pubkey)) return BadRequest(); + + var userPubkey = GetPubKey(); + if (string.IsNullOrEmpty(userPubkey)) + return BadRequest(); + + var sub = await _db.PushSubscriptionTargets + .AsNoTracking() + .FirstOrDefaultAsync(a => a.SubscriberPubkey == userPubkey && a.TargetPubkey == pubkey); + + if (sub == default) return NotFound(); + + await _db.PushSubscriptionTargets + .Where(a => a.Id == sub.Id) + .ExecuteDeleteAsync(); + + return Accepted(); + } + private async Task GetUser() { var pk = GetPubKey(); diff --git a/NostrStreamer/Database/Configuration/PushSubscriptionConfiguration.cs b/NostrStreamer/Database/Configuration/PushSubscriptionConfiguration.cs new file mode 100644 index 0000000..a4f43bc --- /dev/null +++ b/NostrStreamer/Database/Configuration/PushSubscriptionConfiguration.cs @@ -0,0 +1,32 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; + +namespace NostrStreamer.Database.Configuration; + +public class PushSubscriptionConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.HasKey(a => a.Id); + builder.Property(a => a.Created) + .IsRequired(); + + builder.Property(a => a.LastUsed) + .IsRequired(); + + builder.Property(a => a.Pubkey) + .IsRequired(); + + builder.Property(a => a.Endpoint) + .IsRequired(); + + builder.Property(a => a.Auth) + .IsRequired(); + + builder.Property(a => a.Key) + .IsRequired(); + + builder.Property(a => a.Scope) + .IsRequired(); + } +} diff --git a/NostrStreamer/Database/Configuration/PushSubscriptionTargetConfiguration.cs b/NostrStreamer/Database/Configuration/PushSubscriptionTargetConfiguration.cs new file mode 100644 index 0000000..480f1c3 --- /dev/null +++ b/NostrStreamer/Database/Configuration/PushSubscriptionTargetConfiguration.cs @@ -0,0 +1,23 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; + +namespace NostrStreamer.Database.Configuration; + +public class PushSubscriptionTargetConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.HasKey(a => a.Id); + + builder.Property(a => a.TargetPubkey) + .IsRequired(); + + builder.Property(a => a.SubscriberPubkey) + .IsRequired(); + + builder.HasIndex(a => a.TargetPubkey); + + builder.HasIndex(a => new {a.SubscriberPubkey, a.TargetPubkey}) + .IsUnique(); + } +} diff --git a/NostrStreamer/Database/PushSubscription.cs b/NostrStreamer/Database/PushSubscription.cs new file mode 100644 index 0000000..9d8a28a --- /dev/null +++ b/NostrStreamer/Database/PushSubscription.cs @@ -0,0 +1,23 @@ +using System.ComponentModel.DataAnnotations; + +namespace NostrStreamer.Database; + +public class PushSubscription +{ + public Guid Id { get; init; } = Guid.NewGuid(); + + public DateTime Created { get; init; } = DateTime.UtcNow; + + public DateTime LastUsed { get; init; } = DateTime.UtcNow; + + [MaxLength(64)] + public string Pubkey { get; init; } = null!; + + public string Endpoint { get; init; } = null!; + + public string Key { get; init; } = null!; + + public string Auth { get; init; } = null!; + + public string Scope { get; init; } = null!; +} diff --git a/NostrStreamer/Database/PushSubscriptionTarget.cs b/NostrStreamer/Database/PushSubscriptionTarget.cs new file mode 100644 index 0000000..4032b1b --- /dev/null +++ b/NostrStreamer/Database/PushSubscriptionTarget.cs @@ -0,0 +1,14 @@ +using System.ComponentModel.DataAnnotations; + +namespace NostrStreamer.Database; + +public class PushSubscriptionTarget +{ + public Guid Id { get; init; } = Guid.NewGuid(); + + [MaxLength(64)] + public string SubscriberPubkey { get; init; } = null!; + + [MaxLength(64)] + public string TargetPubkey { get; init; } = null!; +} diff --git a/NostrStreamer/Database/StreamerContext.cs b/NostrStreamer/Database/StreamerContext.cs index ff68619..597d65d 100644 --- a/NostrStreamer/Database/StreamerContext.cs +++ b/NostrStreamer/Database/StreamerContext.cs @@ -33,4 +33,8 @@ public class StreamerContext : DbContext public DbSet Forwards => Set(); public DbSet Clips => Set(); + + public DbSet PushSubscriptions => Set(); + + public DbSet PushSubscriptionTargets => Set(); } diff --git a/NostrStreamer/Extensions.cs b/NostrStreamer/Extensions.cs index 843c6f0..e99e47f 100644 --- a/NostrStreamer/Extensions.cs +++ b/NostrStreamer/Extensions.cs @@ -91,6 +91,17 @@ public static class Extensions return 6376500.0 * (2.0 * Math.Atan2(Math.Sqrt(d3), Math.Sqrt(1.0 - d3))); } + + public static string GetHost(this NostrEvent ev) + { + var hostTag = ev.Tags!.FirstOrDefault(a => a.TagIdentifier == "p" && a.AdditionalData[2] == "host")?.AdditionalData[0]; + if (!string.IsNullOrEmpty(hostTag)) + { + return hostTag; + } + + return ev.Pubkey!; + } } public class Variant diff --git a/NostrStreamer/Migrations/20231218112430_PushSubscriptions.Designer.cs b/NostrStreamer/Migrations/20231218112430_PushSubscriptions.Designer.cs new file mode 100644 index 0000000..52437e9 --- /dev/null +++ b/NostrStreamer/Migrations/20231218112430_PushSubscriptions.Designer.cs @@ -0,0 +1,458 @@ +// +using System; +using System.Collections.Generic; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using NostrStreamer.Database; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; + +#nullable disable + +namespace NostrStreamer.Migrations +{ + [DbContext(typeof(StreamerContext))] + [Migration("20231218112430_PushSubscriptions")] + partial class PushSubscriptions + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "7.0.8") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("NostrStreamer.Database.IngestEndpoint", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("App") + .IsRequired() + .HasColumnType("text"); + + b.Property>("Capabilities") + .IsRequired() + .HasColumnType("text[]"); + + b.Property("Cost") + .HasColumnType("integer"); + + b.Property("Forward") + .IsRequired() + .HasColumnType("text"); + + b.Property("Name") + .IsRequired() + .HasColumnType("text"); + + b.HasKey("Id"); + + b.HasIndex("App") + .IsUnique(); + + b.ToTable("Endpoints"); + }); + + modelBuilder.Entity("NostrStreamer.Database.Payment", b => + { + b.Property("PaymentHash") + .HasColumnType("text"); + + b.Property("Amount") + .HasColumnType("numeric(20,0)"); + + b.Property("Created") + .HasColumnType("timestamp with time zone"); + + b.Property("Invoice") + .HasColumnType("text"); + + b.Property("IsPaid") + .HasColumnType("boolean"); + + b.Property("Nostr") + .HasColumnType("text"); + + b.Property("PubKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Type") + .HasColumnType("integer"); + + b.HasKey("PaymentHash"); + + b.HasIndex("PubKey"); + + b.ToTable("Payments"); + }); + + modelBuilder.Entity("NostrStreamer.Database.PushSubscription", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("Auth") + .IsRequired() + .HasColumnType("text"); + + b.Property("Created") + .HasColumnType("timestamp with time zone"); + + b.Property("Endpoint") + .IsRequired() + .HasColumnType("text"); + + b.Property("Key") + .IsRequired() + .HasColumnType("text"); + + b.Property("LastUsed") + .HasColumnType("timestamp with time zone"); + + b.Property("Pubkey") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)"); + + b.Property("Scope") + .IsRequired() + .HasColumnType("text"); + + b.HasKey("Id"); + + b.ToTable("PushSubscriptions"); + }); + + modelBuilder.Entity("NostrStreamer.Database.PushSubscriptionTarget", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("SubscriberPubkey") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)"); + + b.Property("TargetPubkey") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)"); + + b.HasKey("Id"); + + b.HasIndex("TargetPubkey"); + + b.HasIndex("SubscriberPubkey", "TargetPubkey") + .IsUnique(); + + b.ToTable("PushSubscriptionTargets"); + }); + + modelBuilder.Entity("NostrStreamer.Database.User", b => + { + b.Property("PubKey") + .HasColumnType("text"); + + b.Property("Balance") + .HasColumnType("bigint"); + + b.Property("ContentWarning") + .HasColumnType("text"); + + b.Property("Goal") + .HasColumnType("text"); + + b.Property("Image") + .HasColumnType("text"); + + b.Property("StreamKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Summary") + .HasColumnType("text"); + + b.Property("Tags") + .HasColumnType("text"); + + b.Property("Title") + .HasColumnType("text"); + + b.Property("TosAccepted") + .HasColumnType("timestamp with time zone"); + + b.Property("Version") + .IsConcurrencyToken() + .ValueGeneratedOnAddOrUpdate() + .HasColumnType("xid") + .HasColumnName("xmin"); + + b.HasKey("PubKey"); + + b.ToTable("Users"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("EdgeIp") + .IsRequired() + .HasColumnType("text"); + + b.Property("EndpointId") + .HasColumnType("uuid"); + + b.Property("Ends") + .HasColumnType("timestamp with time zone"); + + b.Property("Event") + .IsRequired() + .HasColumnType("text"); + + b.Property("ForwardClientId") + .IsRequired() + .HasColumnType("text"); + + b.Property("LastSegment") + .HasColumnType("timestamp with time zone"); + + b.Property("PubKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Starts") + .HasColumnType("timestamp with time zone"); + + b.Property("State") + .HasColumnType("integer"); + + b.Property("StreamId") + .IsRequired() + .HasColumnType("text"); + + b.Property("Thumbnail") + .HasColumnType("text"); + + b.HasKey("Id"); + + b.HasIndex("EndpointId"); + + b.HasIndex("PubKey"); + + b.ToTable("Streams"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamClip", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("Created") + .HasColumnType("timestamp with time zone"); + + b.Property("TakenByPubkey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Url") + .IsRequired() + .HasColumnType("text"); + + b.Property("UserStreamId") + .HasColumnType("uuid"); + + b.HasKey("Id"); + + b.HasIndex("UserStreamId"); + + b.ToTable("Clips"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamForwards", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("Name") + .IsRequired() + .HasColumnType("text"); + + b.Property("Target") + .IsRequired() + .HasColumnType("text"); + + b.Property("UserPubkey") + .IsRequired() + .HasColumnType("text"); + + b.HasKey("Id"); + + b.HasIndex("UserPubkey"); + + b.ToTable("Forwards"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("PubKey") + .IsRequired() + .HasColumnType("text"); + + b.Property("Relay") + .HasColumnType("text"); + + b.Property("Role") + .HasColumnType("text"); + + b.Property("Sig") + .HasColumnType("text"); + + b.Property("StreamId") + .HasColumnType("uuid"); + + b.Property("ZapSplit") + .HasColumnType("numeric"); + + b.HasKey("Id"); + + b.HasIndex("StreamId", "PubKey") + .IsUnique(); + + b.ToTable("Guests"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("Duration") + .HasColumnType("double precision"); + + b.Property("Timestamp") + .HasColumnType("timestamp with time zone"); + + b.Property("Url") + .IsRequired() + .HasColumnType("text"); + + b.Property("UserStreamId") + .HasColumnType("uuid"); + + b.HasKey("Id"); + + b.HasIndex("UserStreamId"); + + b.ToTable("Recordings"); + }); + + modelBuilder.Entity("NostrStreamer.Database.Payment", b => + { + b.HasOne("NostrStreamer.Database.User", "User") + .WithMany("Payments") + .HasForeignKey("PubKey") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("User"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.HasOne("NostrStreamer.Database.IngestEndpoint", "Endpoint") + .WithMany() + .HasForeignKey("EndpointId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.HasOne("NostrStreamer.Database.User", "User") + .WithMany("Streams") + .HasForeignKey("PubKey") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Endpoint"); + + b.Navigation("User"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamClip", b => + { + b.HasOne("NostrStreamer.Database.UserStream", "UserStream") + .WithMany() + .HasForeignKey("UserStreamId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("UserStream"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamForwards", b => + { + b.HasOne("NostrStreamer.Database.User", "User") + .WithMany("Forwards") + .HasForeignKey("UserPubkey") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("User"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b => + { + b.HasOne("NostrStreamer.Database.UserStream", "Stream") + .WithMany("Guests") + .HasForeignKey("StreamId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Stream"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b => + { + b.HasOne("NostrStreamer.Database.UserStream", "Stream") + .WithMany() + .HasForeignKey("UserStreamId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Stream"); + }); + + modelBuilder.Entity("NostrStreamer.Database.User", b => + { + b.Navigation("Forwards"); + + b.Navigation("Payments"); + + b.Navigation("Streams"); + }); + + modelBuilder.Entity("NostrStreamer.Database.UserStream", b => + { + b.Navigation("Guests"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/NostrStreamer/Migrations/20231218112430_PushSubscriptions.cs b/NostrStreamer/Migrations/20231218112430_PushSubscriptions.cs new file mode 100644 index 0000000..6b04e86 --- /dev/null +++ b/NostrStreamer/Migrations/20231218112430_PushSubscriptions.cs @@ -0,0 +1,67 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace NostrStreamer.Migrations +{ + /// + public partial class PushSubscriptions : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.CreateTable( + name: "PushSubscriptions", + columns: table => new + { + Id = table.Column(type: "uuid", nullable: false), + Created = table.Column(type: "timestamp with time zone", nullable: false), + LastUsed = table.Column(type: "timestamp with time zone", nullable: false), + Pubkey = table.Column(type: "character varying(64)", maxLength: 64, nullable: false), + Endpoint = table.Column(type: "text", nullable: false), + Key = table.Column(type: "text", nullable: false), + Auth = table.Column(type: "text", nullable: false), + Scope = table.Column(type: "text", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_PushSubscriptions", x => x.Id); + }); + + migrationBuilder.CreateTable( + name: "PushSubscriptionTargets", + columns: table => new + { + Id = table.Column(type: "uuid", nullable: false), + SubscriberPubkey = table.Column(type: "character varying(64)", maxLength: 64, nullable: false), + TargetPubkey = table.Column(type: "character varying(64)", maxLength: 64, nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_PushSubscriptionTargets", x => x.Id); + }); + + migrationBuilder.CreateIndex( + name: "IX_PushSubscriptionTargets_SubscriberPubkey_TargetPubkey", + table: "PushSubscriptionTargets", + columns: new[] { "SubscriberPubkey", "TargetPubkey" }, + unique: true); + + migrationBuilder.CreateIndex( + name: "IX_PushSubscriptionTargets_TargetPubkey", + table: "PushSubscriptionTargets", + column: "TargetPubkey"); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "PushSubscriptions"); + + migrationBuilder.DropTable( + name: "PushSubscriptionTargets"); + } + } +} diff --git a/NostrStreamer/Migrations/StreamerContextModelSnapshot.cs b/NostrStreamer/Migrations/StreamerContextModelSnapshot.cs index 54e3685..d07280d 100644 --- a/NostrStreamer/Migrations/StreamerContextModelSnapshot.cs +++ b/NostrStreamer/Migrations/StreamerContextModelSnapshot.cs @@ -90,6 +90,70 @@ namespace NostrStreamer.Migrations b.ToTable("Payments"); }); + modelBuilder.Entity("NostrStreamer.Database.PushSubscription", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("Auth") + .IsRequired() + .HasColumnType("text"); + + b.Property("Created") + .HasColumnType("timestamp with time zone"); + + b.Property("Endpoint") + .IsRequired() + .HasColumnType("text"); + + b.Property("Key") + .IsRequired() + .HasColumnType("text"); + + b.Property("LastUsed") + .HasColumnType("timestamp with time zone"); + + b.Property("Pubkey") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)"); + + b.Property("Scope") + .IsRequired() + .HasColumnType("text"); + + b.HasKey("Id"); + + b.ToTable("PushSubscriptions"); + }); + + modelBuilder.Entity("NostrStreamer.Database.PushSubscriptionTarget", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("SubscriberPubkey") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)"); + + b.Property("TargetPubkey") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)"); + + b.HasKey("Id"); + + b.HasIndex("TargetPubkey"); + + b.HasIndex("SubscriberPubkey", "TargetPubkey") + .IsUnique(); + + b.ToTable("PushSubscriptionTargets"); + }); + modelBuilder.Entity("NostrStreamer.Database.User", b => { b.Property("PubKey") diff --git a/NostrStreamer/NostrStreamer.csproj b/NostrStreamer/NostrStreamer.csproj index e0bbd52..a89d174 100644 --- a/NostrStreamer/NostrStreamer.csproj +++ b/NostrStreamer/NostrStreamer.csproj @@ -51,5 +51,8 @@ + + + diff --git a/NostrStreamer/Program.cs b/NostrStreamer/Program.cs index 80a3365..49b23df 100644 --- a/NostrStreamer/Program.cs +++ b/NostrStreamer/Program.cs @@ -4,6 +4,9 @@ using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization.Infrastructure; using Microsoft.AspNetCore.DataProtection; using Microsoft.EntityFrameworkCore; +using Newtonsoft.Json; +using Newtonsoft.Json.Converters; +using Newtonsoft.Json.Serialization; using Nostr.Client.Client; using NostrStreamer.Database; using NostrStreamer.Services; @@ -13,11 +16,26 @@ using NostrStreamer.Services.Dvr; using NostrStreamer.Services.StreamManager; using NostrStreamer.Services.Thumbnail; using Prometheus; +using StackExchange.Redis; namespace NostrStreamer; internal static class Program { + private static void ConfigureSerializer(JsonSerializerSettings s) + { + s.ReferenceLoopHandling = ReferenceLoopHandling.Ignore; + s.Formatting = Formatting.None; + s.NullValueHandling = NullValueHandling.Ignore; + s.ConstructorHandling = ConstructorHandling.AllowNonPublicDefaultConstructor; + s.Converters = new List() + { + new UnixDateTimeConverter() + }; + + s.ContractResolver = new CamelCasePropertyNamesContractResolver(); + } + public static async Task Main(string[] args) { var builder = WebApplication.CreateBuilder(args); @@ -30,9 +48,16 @@ internal static class Program services.AddMemoryCache(); services.AddHttpClient(); services.AddRazorPages(); - services.AddControllers().AddNewtonsoftJson(); + services.AddControllers().AddNewtonsoftJson(opt => { ConfigureSerializer(opt.SerializerSettings); }); + + services.AddSwaggerGen(); services.AddSingleton(config); + // Redis + var cx = await ConnectionMultiplexer.ConnectAsync(config.Redis); + services.AddSingleton(cx); + services.AddTransient(svc => svc.GetRequiredService().GetDatabase()); + // GeoIP services.AddSingleton(_ => new DatabaseReader(config.GeoIpDatabase)); services.AddTransient(); @@ -74,7 +99,7 @@ internal static class Program // dvr services services.AddTransient(); - + // thumbnail services services.AddTransient(); services.AddHostedService(); @@ -89,6 +114,14 @@ internal static class Program // clip services services.AddTransient(); services.AddTransient(); + + // notifications services + services.AddSingleton(); + services.AddHostedService(); + services.AddHostedService(); + + // snort api + services.AddTransient(); var app = builder.Build(); @@ -106,6 +139,8 @@ internal static class Program app.MapRazorPages(); app.MapControllers(); app.MapMetrics(); + app.UseSwagger(); + app.UseSwaggerUI(); await app.RunAsync(); } diff --git a/NostrStreamer/Services/EventStream.cs b/NostrStreamer/Services/EventStream.cs new file mode 100644 index 0000000..8193b2a --- /dev/null +++ b/NostrStreamer/Services/EventStream.cs @@ -0,0 +1,49 @@ +using Newtonsoft.Json; +using Nostr.Client.Json; +using Nostr.Client.Messages; +using StackExchange.Redis; + +namespace NostrStreamer.Services; + +public class EventStream : BackgroundService +{ + private readonly ILogger _logger; + private readonly IServiceScopeFactory _scopeFactory; + + public EventStream(ILogger logger, IServiceScopeFactory scopeFactory) + { + _logger = logger; + _scopeFactory = scopeFactory; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + using var scope = _scopeFactory.CreateScope(); + var redis = scope.ServiceProvider.GetRequiredService(); + var push = scope.ServiceProvider.GetRequiredService(); + var queue = await redis.GetSubscriber().SubscribeAsync("event-stream"); + + while (!stoppingToken.IsCancellationRequested) + { + var msg = await queue.ReadAsync(stoppingToken); + + var ev = JsonConvert.DeserializeObject(msg.Message!, NostrSerializer.Settings); + if (ev is {Kind: NostrKind.LiveEvent}) + { + push.Add(ev); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "failed {msg}", ex.Message); + } + + await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); + } + } +} diff --git a/NostrStreamer/Services/PushSender.cs b/NostrStreamer/Services/PushSender.cs new file mode 100644 index 0000000..6e4ee4c --- /dev/null +++ b/NostrStreamer/Services/PushSender.cs @@ -0,0 +1,133 @@ +using System.Net; +using System.Threading.Tasks.Dataflow; +using Microsoft.EntityFrameworkCore; +using Newtonsoft.Json; +using Nostr.Client.Json; +using Nostr.Client.Messages; +using NostrStreamer.ApiModel; +using NostrStreamer.Database; +using StackExchange.Redis; +using WebPush; +using PushSubscription = NostrStreamer.Database.PushSubscription; + +namespace NostrStreamer.Services; + +public record PushNotificationQueue(PushMessage Notification, PushSubscription Subscription); + +public class PushSender +{ + private readonly BufferBlock _queue = new(); + + public void Add(NostrEvent ev) + { + _queue.Post(ev); + } + + public Task Next() + { + return _queue.ReceiveAsync(); + } +} + +public class PushSenderService : BackgroundService +{ + private readonly PushSender _sender; + private readonly HttpClient _client; + private readonly Config _config; + private readonly ILogger _logger; + private readonly IServiceScopeFactory _scopeFactory; + private readonly IDatabase _redis; + private readonly SnortApi _snort; + + public PushSenderService(PushSender sender, HttpClient client, Config config, IServiceScopeFactory scopeFactory, + ILogger logger, SnortApi snort, IDatabase redis) + { + _sender = sender; + _client = client; + _config = config; + _scopeFactory = scopeFactory; + _logger = logger; + _snort = snort; + _redis = redis; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + using var scope = _scopeFactory.CreateScope(); + await using var db = scope.ServiceProvider.GetRequiredService(); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + var ev = await _sender.Next(); + foreach (var (msg, sub) in await ComputeNotifications(db, ev)) + { + var vapid = new VapidDetails(sub.Scope, _config.VapidKey.PublicKey, _config.VapidKey.PrivateKey); + using var webPush = new WebPushClient(_client); + try + { + var pushMsg = JsonConvert.SerializeObject(msg, NostrSerializer.Settings); + _logger.LogInformation("Sending notification {msg}", pushMsg); + var webSub = new WebPush.PushSubscription(sub.Endpoint, sub.Key, sub.Auth); + await webPush.SendNotificationAsync(webSub, pushMsg, vapid, stoppingToken); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to send push for {pubkey} {error}", sub.Pubkey, ex.Message); + if (ex is WebPushException {StatusCode: HttpStatusCode.Gone}) + { + await db.PushSubscriptions.Where(a => a.Id == sub.Id) + .ExecuteDeleteAsync(cancellationToken: stoppingToken); + } + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in PushSender {message}", ex.Message); + } + } + } + + private async Task> ComputeNotifications(StreamerContext db, NostrEvent ev) + { + var ret = new List(); + var notification = await MakeNotificationFromEvent(ev); + if (notification != null) + { + foreach (var sub in await db.PushSubscriptions + .AsNoTracking() + .Join(db.PushSubscriptionTargets, a => a.Pubkey, b => b.SubscriberPubkey, + (a, b) => new {Subscription = a, Target = b}) + .Where(a => a.Target.TargetPubkey == notification.Pubkey) + .ToListAsync()) + { + ret.Add(new(notification, sub.Subscription)); + } + } + + return ret; + } + + private async Task MakeNotificationFromEvent(NostrEvent ev) + { + if (ev.Kind != NostrKind.LiveEvent) return default; + + var dTag = ev.Tags!.FindFirstTagValue("d"); + var key = $"live-event-seen:{ev.Pubkey}:{dTag}"; + if (await _redis.KeyExistsAsync(key)) return default; + + await _redis.StringSetAsync(key, ev.Id!, TimeSpan.FromDays(7)); + + var host = ev.GetHost(); + var profile = await _snort.Profile(host); + return new PushMessage + { + Type = PushMessageType.StreamStarted, + Pubkey = host, + Name = profile?.Name, + Avatar = profile?.Picture + }; + } +} diff --git a/NostrStreamer/Services/SnortApi.cs b/NostrStreamer/Services/SnortApi.cs new file mode 100644 index 0000000..b1fc325 --- /dev/null +++ b/NostrStreamer/Services/SnortApi.cs @@ -0,0 +1,50 @@ +using Newtonsoft.Json; + +namespace NostrStreamer.Services; + +public class SnortApi +{ + private readonly HttpClient _client; + + public SnortApi(HttpClient client, Config config) + { + _client = client; + _client.BaseAddress = config.SnortApi; + _client.Timeout = TimeSpan.FromSeconds(30); + } + + public async Task Profile(string pubkey) + { + var json = await _client.GetStringAsync($"/api/v1/raw/p/{pubkey}"); + if (!string.IsNullOrEmpty(json)) + { + return JsonConvert.DeserializeObject(json); + } + + return default; + } +} + +public class SnortProfile +{ + [JsonProperty("pubKey")] + public string PubKey { get; init; } = null!; + + [JsonProperty("name")] + public string? Name { get; init; } + + [JsonProperty("about")] + public string? About { get; init; } + + [JsonProperty("picture")] + public string? Picture { get; init; } + + [JsonProperty("nip05")] + public string? Nip05 { get; init; } + + [JsonProperty("lud16")] + public string? Lud16 { get; init; } + + [JsonProperty("banner")] + public string? Banner { get; init; } +} diff --git a/NostrStreamer/appsettings.json b/NostrStreamer/appsettings.json index ffa81d0..723858d 100644 --- a/NostrStreamer/appsettings.json +++ b/NostrStreamer/appsettings.json @@ -12,6 +12,7 @@ "Database": "User ID=postgres;Password=postgres;Database=streaming;Pooling=true;Host=127.0.0.1:5431" }, "Config": { + "Redis": "localhost:6666", "RtmpHost": "rtmp://localhost:9005", "SrsHttpHost": "http://localhost:9003", "SrsApiHost": "http://localhost:9002", @@ -33,6 +34,7 @@ "SecretKey": "p7EK4qew6DBkBPqrpRPuJgTOc6ChUlfIcEdAwE7K", "PublicHost": "http://localhost:9010" }, + "SnortApi": "https://api.snort.social", "GeoIpDatabase": "/Users/kieran/Downloads/GeoLite2-City_20230801/GeoLite2-City.mmdb", "Edges": [ { @@ -52,6 +54,10 @@ "ClientId": "123", "ClientSecret": "aaa" }, - "DataProtectionKeyPath": "./keys" + "DataProtectionKeyPath": "./keys", + "VapidKey": { + "PublicKey": "BOlCzqQENSe0TR8wCfQmTW2p_QhaOSqLLVMqKduTNcZKuebLHQuXjh17Ewo_g-Q4iDTnKVj2BdxBqxf5Dc6FhvU", + "PrivateKey": "JL5_OHhNaD9SzYdOfLYd9W_G-4V-J22TANpbD4JXEkI" + } } }