From 66842b4c7b08dc02a3671fbb63205e45c8ec93bf Mon Sep 17 00:00:00 2001 From: Kieran Date: Mon, 15 Jan 2024 11:06:44 +0000 Subject: [PATCH] Process reactions from queue --- PayForReactions/Program.cs | 3 + PayForReactions/ReactionQueue.cs | 191 +++++++++++++++++++++++++++++++ PayForReactions/ZapperRelay.cs | 161 +------------------------- 3 files changed, 200 insertions(+), 155 deletions(-) create mode 100644 PayForReactions/ReactionQueue.cs diff --git a/PayForReactions/Program.cs b/PayForReactions/Program.cs index 01b5519..f083907 100644 --- a/PayForReactions/Program.cs +++ b/PayForReactions/Program.cs @@ -19,6 +19,9 @@ builder.Services.AddSingleton(svc => return new NostrStore("./data", logger); }); +builder.Services.AddSingleton(); +builder.Services.AddHostedService(); + var host = builder.Build(); host.UseWebSockets(); diff --git a/PayForReactions/ReactionQueue.cs b/PayForReactions/ReactionQueue.cs new file mode 100644 index 0000000..701bf07 --- /dev/null +++ b/PayForReactions/ReactionQueue.cs @@ -0,0 +1,191 @@ +using System.Threading.Tasks.Dataflow; +using FASTER.core; +using Microsoft.Extensions.Caching.Memory; +using Nostr.Client.Json; +using Nostr.Client.Keys; +using Nostr.Client.Messages; +using NostrRelay; +using NostrServices.Client; +using EventSession = + FASTER.core.ClientSession>; + + +namespace PayForReactions; + +public class ReactionQueue +{ + public readonly BufferBlock Queue = new(); +} + +public class ReactionQueueWorker : BackgroundService +{ + private readonly ILogger _logger; + private readonly ReactionQueue _queue; + private readonly IMemoryCache _cache; + private readonly Config _config; + private readonly Lnurl _lnurl; + private readonly AlbyApi _albyApi; + private readonly NostrServicesClient _nostrServices; + private readonly EventSession _session; + + public ReactionQueueWorker(ILogger logger, ReactionQueue queue, IMemoryCache cache, Config config, Lnurl lnurl, + AlbyApi albyApi, NostrServicesClient nostrServices, NostrStore store) + { + _logger = logger; + _queue = queue; + _cache = cache; + _config = config; + _lnurl = lnurl; + _albyApi = albyApi; + _nostrServices = nostrServices; + _session = store.MainStore.For(new SimpleFunctions()).NewSession>(); + ; + } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + var target = NostrPublicKey.FromBech32(_config.Target).Hex; + var ev = await _queue.Queue.ReceiveAsync(stoppingToken); + if (ev.Pubkey == target) + { + var id = Convert.FromHexString(ev.Id!); + var obj = NostrBuf.Encode(ev); + (await _session.UpsertAsync(ref id, ref obj, token: stoppingToken)).Complete(); + _logger.LogInformation("Got targets event, saving {ev}", NostrJson.Serialize(ev)); + continue; + } + + var amount = MapKindToAmount(ev.Kind); + if (amount != default) + { + _logger.LogInformation("Got reaction {json}", NostrJson.Serialize(ev)); + var eTag = ev.Tags?.FirstOrDefault(a => a.TagIdentifier == "e"); + var pTag = ev.Tags?.FirstOrDefault(a => a.TagIdentifier == "p" && a.AdditionalData[0] == target); + if (pTag == default || eTag == default) + { + _logger.LogInformation("blocked: must be a reaction to {target}", target); + continue; + } + + var idBytes = Convert.FromHexString(eTag.AdditionalData[0]); + var refEventResult = (await _session.ReadAsync(ref idBytes)).Complete(); + if (refEventResult.status.NotFound) + { + _logger.LogInformation("blocked: parent event not found"); + continue; + } + + var seenId = Convert.FromHexString(ev.Id!); + var seenIdResult = (await _session.ReadAsync(ref seenId)).Complete(); + if (seenIdResult.status.Found) + { + _logger.LogInformation("blocked: already zapped this one, how dare you!"); + continue; + } + + var refEvent = NostrBuf.Decode(refEventResult.output); + if (refEvent == default) + { + _logger.LogInformation("blocked: parent event not found"); + continue; + } + + if (refEvent.Pubkey != target) + { + _logger.LogInformation("blocked: parent event not posted by {target}", target); + continue; + } + + var sender = ev.Pubkey!; + var senderProfile = await _nostrServices.Profile(NostrPublicKey.FromHex(sender).Bech32); + if (senderProfile == default) + { + _logger.LogInformation("blocked: couldn't find your profile anon!"); + continue; + } + + var parsedTarget = Lnurl.ParseLnUrl(senderProfile.LightningAddress ?? ""); + if (parsedTarget == default) + { + _logger.LogInformation( + "blocked: so sad... couldn't send a zap because you don't have a lightning address in your profile {name}!", + senderProfile.Name); + + continue; + } + + _logger.LogInformation("Starting payment for {name} - {addr}", senderProfile.Name, senderProfile.LightningAddress); + var svc = await _lnurl.LoadAsync(parsedTarget.ToString()); + if (svc == default) + { + _logger.LogInformation("blocked: wallet is down, no zap for you!"); + continue; + } + + var keyEventWalletSeen = $"zapaped:{refEvent.Id}:{parsedTarget}"; + var eventWalletZapTry = _cache.Get(keyEventWalletSeen); + if (eventWalletZapTry > 0) + { + _logger.LogInformation("blocked: hey i already zapped you! (count={count})", eventWalletZapTry); + continue; + } + + _cache.Set(keyEventWalletSeen, ++eventWalletZapTry); + + var key = NostrPrivateKey.FromBech32(_config.PrivateKey); + var myPubkey = key.DerivePublicKey().Hex; + var zap = new NostrEvent + { + Kind = NostrKind.ZapRequest, + Content = "Thanks for your interaction!", + CreatedAt = DateTime.UtcNow, + Pubkey = myPubkey, + Tags = new NostrEventTags( + new NostrEventTag("e", ev.Id!), + new NostrEventTag("p", sender), + new NostrEventTag("relays", "wss://relay.snort.social", "wss://nos.lol", "wss://relay.damus.io"), + new NostrEventTag("amount", (amount * 1000).ToString()!) + ) + }; + + var zapSigned = zap.Sign(key); + try + { + var invoice = await _lnurl.GetInvoiceAsync(svc, 5, "Thanks for your interaction!", zapSigned); + if (string.IsNullOrEmpty(invoice.Pr)) + { + _logger.LogInformation("blocked: failed to get invoice from {target}", parsedTarget); + } + + _logger.LogInformation("Paying invoice {pr}", invoice.Pr); + if (!await _albyApi.PayInvoice(invoice.Pr)) + { + _logger.LogInformation("blocked: failed to pay invoice!"); + } + + var seenEvent = NostrBuf.Encode(ev); + (await _session.UpsertAsync(ref seenId, ref seenEvent, token: stoppingToken)).Complete(); + + _logger.LogInformation("Zapped {name}!", senderProfile.Name); + } + catch (Exception e) + { + _logger.LogError(e.ToString()); + } + } + } + } + + + private int? MapKindToAmount(NostrKind k) + { + switch (k) + { + case NostrKind.Reaction: return 50; + case NostrKind.GenericRepost: return 100; + } + + return default; + } +} diff --git a/PayForReactions/ZapperRelay.cs b/PayForReactions/ZapperRelay.cs index c6758fc..cda6b09 100644 --- a/PayForReactions/ZapperRelay.cs +++ b/PayForReactions/ZapperRelay.cs @@ -1,36 +1,17 @@ -using FASTER.core; -using Microsoft.Extensions.Caching.Memory; -using Nostr.Client.Json; -using Nostr.Client.Keys; +using System.Threading.Tasks.Dataflow; using Nostr.Client.Messages; using Nostr.Client.Requests; using NostrRelay; -using NostrServices.Client; -using EventSession = - FASTER.core.ClientSession>; namespace PayForReactions; public class ZapperRelay : INostrRelay, IDisposable { - private readonly ILogger _logger; - private readonly IMemoryCache _cache; - private readonly Config _config; - private readonly Lnurl _lnurl; - private readonly AlbyApi _albyApi; - private readonly NostrServicesClient _nostrServices; - private readonly EventSession _session; + private readonly ReactionQueue _queue; - public ZapperRelay(Lnurl lnurl, AlbyApi albyApi, NostrServicesClient nostrServices, Config config, NostrStore store, - ILogger logger, IMemoryCache cache) + public ZapperRelay(ReactionQueue queue) { - _lnurl = lnurl; - _albyApi = albyApi; - _nostrServices = nostrServices; - _config = config; - _logger = logger; - _cache = cache; - _session = store.MainStore.For(new SimpleFunctions()).NewSession>(); + _queue = queue; } public ValueTask AcceptConnection(NostrClientContext context) @@ -45,141 +26,11 @@ public class ZapperRelay : INostrRelay, IDisposable public async ValueTask HandleEvent(NostrClientContext context, NostrEvent ev) { - var target = NostrPublicKey.FromBech32(_config.Target).Hex; - if (ev.Pubkey == target) - { - var id = Convert.FromHexString(ev.Id!); - var obj = NostrBuf.Encode(ev); - (await _session.UpsertAsync(ref id, ref obj)).Complete(); - _logger.LogInformation("Got targets event, saving {ev}", NostrJson.Serialize(ev)); - return new(true, ""); - } - - var amount = MapKindToAmount(ev.Kind); - if (amount != default) - { - _logger.LogInformation("Got reaction {json}", NostrJson.Serialize(ev)); - var eTag = ev.Tags?.FirstOrDefault(a => a.TagIdentifier == "e"); - var pTag = ev.Tags?.FirstOrDefault(a => a.TagIdentifier == "p" && a.AdditionalData[0] == target); - if (pTag == default || eTag == default) - { - return new(false, $"blocked: must be a reaction to {target}"); - } - - var idBytes = Convert.FromHexString(eTag.AdditionalData[0]); - var refEventResult = (await _session.ReadAsync(ref idBytes)).Complete(); - if (refEventResult.status.NotFound) - { - return new(false, "blocked: parent event not found"); - } - - var seenId = Convert.FromHexString(ev.Id!); - var seenIdResult = (await _session.ReadAsync(ref seenId)).Complete(); - if (seenIdResult.status.Found) - { - return new(false, "blocked: already zapped this one, how dare you!"); - } - - var refEvent = NostrBuf.Decode(refEventResult.output); - if (refEvent == default) - { - return new(false, "blocked: parent event not found"); - } - - if (refEvent.Pubkey != target) - { - return new(false, $"blocked: parent event not posted by {target}"); - } - - var sender = ev.Pubkey!; - var senderProfile = await _nostrServices.Profile(NostrPublicKey.FromHex(sender).Bech32); - if (senderProfile == default) - { - return new(false, "blocked: couldn't find your profile anon!"); - } - - var parsedTarget = Lnurl.ParseLnUrl(senderProfile.LightningAddress ?? ""); - if (parsedTarget == default) - { - return new(false, - $"blocked: so sad... couldn't send a zap because you don't have a lightning address in your profile {senderProfile.Name}!"); - } - - _logger.LogInformation("Starting payment for {name} - {addr}", senderProfile.Name, senderProfile.LightningAddress); - var svc = await _lnurl.LoadAsync(parsedTarget.ToString()); - if (svc == default) - { - return new(false, "blocked: wallet is down, no zap for you!"); - } - - var keyEventWalletSeen = $"zapaped:{refEvent.Id}:{parsedTarget}"; - var eventWalletZapTry = _cache.Get(keyEventWalletSeen); - if (eventWalletZapTry > 0) - { - return new(false, $"blocked: hey i already zapped you! (count={eventWalletZapTry})"); - } - - _cache.Set(keyEventWalletSeen, ++eventWalletZapTry); - - var key = NostrPrivateKey.FromBech32(_config.PrivateKey); - var myPubkey = key.DerivePublicKey().Hex; - var zap = new NostrEvent - { - Kind = NostrKind.ZapRequest, - Content = "Thanks for your interaction!", - CreatedAt = DateTime.UtcNow, - Pubkey = myPubkey, - Tags = new NostrEventTags( - new NostrEventTag("e", ev.Id!), - new NostrEventTag("p", sender), - new NostrEventTag("relays", "wss://relay.snort.social", "wss://nos.lol", "wss://relay.damus.io"), - new NostrEventTag("amount", (amount * 1000).ToString()!) - ) - }; - - var zapSigned = zap.Sign(key); - try - { - var invoice = await _lnurl.GetInvoiceAsync(svc, 5, "Thanks for your interaction!", zapSigned); - if (string.IsNullOrEmpty(invoice.Pr)) - { - return new(false, $"blocked: failed to get invoice from {parsedTarget}"); - } - - _logger.LogInformation("Paying invoice {pr}", invoice.Pr); - if (!await _albyApi.PayInvoice(invoice.Pr)) - { - return new(false, "blocked: failed to pay invoice!"); - } - - var seenEvent = NostrBuf.Encode(ev); - (await _session.UpsertAsync(ref seenId, ref seenEvent)).Complete(); - - _logger.LogInformation("Zapped {name}!", senderProfile.Name); - return new(true, "Zapped!"); - } - catch (Exception e) - { - _logger.LogError(e.ToString()); - return new(false, $"blocked: Oh no! something went wrong! {e.Message}"); - } - } - - return new(false, "blocked: kind not accepted, no zap for you!"); + _queue.Queue.Post(ev); + return new(true, ""); } public void Dispose() { } - - private int? MapKindToAmount(NostrKind k) - { - switch (k) - { - case NostrKind.Reaction: return 5; - case NostrKind.GenericRepost: return 10; - } - - return default; - } }