Push notifications
This commit is contained in:
49
NostrStreamer/Services/EventStream.cs
Normal file
49
NostrStreamer/Services/EventStream.cs
Normal file
@ -0,0 +1,49 @@
|
||||
using Newtonsoft.Json;
|
||||
using Nostr.Client.Json;
|
||||
using Nostr.Client.Messages;
|
||||
using StackExchange.Redis;
|
||||
|
||||
namespace NostrStreamer.Services;
|
||||
|
||||
public class EventStream : BackgroundService
|
||||
{
|
||||
private readonly ILogger<EventStream> _logger;
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
|
||||
public EventStream(ILogger<EventStream> logger, IServiceScopeFactory scopeFactory)
|
||||
{
|
||||
_logger = logger;
|
||||
_scopeFactory = scopeFactory;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var redis = scope.ServiceProvider.GetRequiredService<ConnectionMultiplexer>();
|
||||
var push = scope.ServiceProvider.GetRequiredService<PushSender>();
|
||||
var queue = await redis.GetSubscriber().SubscribeAsync("event-stream");
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
var msg = await queue.ReadAsync(stoppingToken);
|
||||
|
||||
var ev = JsonConvert.DeserializeObject<NostrEvent>(msg.Message!, NostrSerializer.Settings);
|
||||
if (ev is {Kind: NostrKind.LiveEvent})
|
||||
{
|
||||
push.Add(ev);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "failed {msg}", ex.Message);
|
||||
}
|
||||
|
||||
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
133
NostrStreamer/Services/PushSender.cs
Normal file
133
NostrStreamer/Services/PushSender.cs
Normal file
@ -0,0 +1,133 @@
|
||||
using System.Net;
|
||||
using System.Threading.Tasks.Dataflow;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Newtonsoft.Json;
|
||||
using Nostr.Client.Json;
|
||||
using Nostr.Client.Messages;
|
||||
using NostrStreamer.ApiModel;
|
||||
using NostrStreamer.Database;
|
||||
using StackExchange.Redis;
|
||||
using WebPush;
|
||||
using PushSubscription = NostrStreamer.Database.PushSubscription;
|
||||
|
||||
namespace NostrStreamer.Services;
|
||||
|
||||
public record PushNotificationQueue(PushMessage Notification, PushSubscription Subscription);
|
||||
|
||||
public class PushSender
|
||||
{
|
||||
private readonly BufferBlock<NostrEvent> _queue = new();
|
||||
|
||||
public void Add(NostrEvent ev)
|
||||
{
|
||||
_queue.Post(ev);
|
||||
}
|
||||
|
||||
public Task<NostrEvent> Next()
|
||||
{
|
||||
return _queue.ReceiveAsync();
|
||||
}
|
||||
}
|
||||
|
||||
public class PushSenderService : BackgroundService
|
||||
{
|
||||
private readonly PushSender _sender;
|
||||
private readonly HttpClient _client;
|
||||
private readonly Config _config;
|
||||
private readonly ILogger<PushSenderService> _logger;
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly IDatabase _redis;
|
||||
private readonly SnortApi _snort;
|
||||
|
||||
public PushSenderService(PushSender sender, HttpClient client, Config config, IServiceScopeFactory scopeFactory,
|
||||
ILogger<PushSenderService> logger, SnortApi snort, IDatabase redis)
|
||||
{
|
||||
_sender = sender;
|
||||
_client = client;
|
||||
_config = config;
|
||||
_scopeFactory = scopeFactory;
|
||||
_logger = logger;
|
||||
_snort = snort;
|
||||
_redis = redis;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
await using var db = scope.ServiceProvider.GetRequiredService<StreamerContext>();
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var ev = await _sender.Next();
|
||||
foreach (var (msg, sub) in await ComputeNotifications(db, ev))
|
||||
{
|
||||
var vapid = new VapidDetails(sub.Scope, _config.VapidKey.PublicKey, _config.VapidKey.PrivateKey);
|
||||
using var webPush = new WebPushClient(_client);
|
||||
try
|
||||
{
|
||||
var pushMsg = JsonConvert.SerializeObject(msg, NostrSerializer.Settings);
|
||||
_logger.LogInformation("Sending notification {msg}", pushMsg);
|
||||
var webSub = new WebPush.PushSubscription(sub.Endpoint, sub.Key, sub.Auth);
|
||||
await webPush.SendNotificationAsync(webSub, pushMsg, vapid, stoppingToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to send push for {pubkey} {error}", sub.Pubkey, ex.Message);
|
||||
if (ex is WebPushException {StatusCode: HttpStatusCode.Gone})
|
||||
{
|
||||
await db.PushSubscriptions.Where(a => a.Id == sub.Id)
|
||||
.ExecuteDeleteAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error in PushSender {message}", ex.Message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<IEnumerable<PushNotificationQueue>> ComputeNotifications(StreamerContext db, NostrEvent ev)
|
||||
{
|
||||
var ret = new List<PushNotificationQueue>();
|
||||
var notification = await MakeNotificationFromEvent(ev);
|
||||
if (notification != null)
|
||||
{
|
||||
foreach (var sub in await db.PushSubscriptions
|
||||
.AsNoTracking()
|
||||
.Join(db.PushSubscriptionTargets, a => a.Pubkey, b => b.SubscriberPubkey,
|
||||
(a, b) => new {Subscription = a, Target = b})
|
||||
.Where(a => a.Target.TargetPubkey == notification.Pubkey)
|
||||
.ToListAsync())
|
||||
{
|
||||
ret.Add(new(notification, sub.Subscription));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
private async Task<PushMessage?> MakeNotificationFromEvent(NostrEvent ev)
|
||||
{
|
||||
if (ev.Kind != NostrKind.LiveEvent) return default;
|
||||
|
||||
var dTag = ev.Tags!.FindFirstTagValue("d");
|
||||
var key = $"live-event-seen:{ev.Pubkey}:{dTag}";
|
||||
if (await _redis.KeyExistsAsync(key)) return default;
|
||||
|
||||
await _redis.StringSetAsync(key, ev.Id!, TimeSpan.FromDays(7));
|
||||
|
||||
var host = ev.GetHost();
|
||||
var profile = await _snort.Profile(host);
|
||||
return new PushMessage
|
||||
{
|
||||
Type = PushMessageType.StreamStarted,
|
||||
Pubkey = host,
|
||||
Name = profile?.Name,
|
||||
Avatar = profile?.Picture
|
||||
};
|
||||
}
|
||||
}
|
50
NostrStreamer/Services/SnortApi.cs
Normal file
50
NostrStreamer/Services/SnortApi.cs
Normal file
@ -0,0 +1,50 @@
|
||||
using Newtonsoft.Json;
|
||||
|
||||
namespace NostrStreamer.Services;
|
||||
|
||||
public class SnortApi
|
||||
{
|
||||
private readonly HttpClient _client;
|
||||
|
||||
public SnortApi(HttpClient client, Config config)
|
||||
{
|
||||
_client = client;
|
||||
_client.BaseAddress = config.SnortApi;
|
||||
_client.Timeout = TimeSpan.FromSeconds(30);
|
||||
}
|
||||
|
||||
public async Task<SnortProfile?> Profile(string pubkey)
|
||||
{
|
||||
var json = await _client.GetStringAsync($"/api/v1/raw/p/{pubkey}");
|
||||
if (!string.IsNullOrEmpty(json))
|
||||
{
|
||||
return JsonConvert.DeserializeObject<SnortProfile>(json);
|
||||
}
|
||||
|
||||
return default;
|
||||
}
|
||||
}
|
||||
|
||||
public class SnortProfile
|
||||
{
|
||||
[JsonProperty("pubKey")]
|
||||
public string PubKey { get; init; } = null!;
|
||||
|
||||
[JsonProperty("name")]
|
||||
public string? Name { get; init; }
|
||||
|
||||
[JsonProperty("about")]
|
||||
public string? About { get; init; }
|
||||
|
||||
[JsonProperty("picture")]
|
||||
public string? Picture { get; init; }
|
||||
|
||||
[JsonProperty("nip05")]
|
||||
public string? Nip05 { get; init; }
|
||||
|
||||
[JsonProperty("lud16")]
|
||||
public string? Lud16 { get; init; }
|
||||
|
||||
[JsonProperty("banner")]
|
||||
public string? Banner { get; init; }
|
||||
}
|
Reference in New Issue
Block a user