Process reactions from queue
This commit is contained in:
@ -19,6 +19,9 @@ builder.Services.AddSingleton<NostrStore>(svc =>
|
|||||||
return new NostrStore("./data", logger);
|
return new NostrStore("./data", logger);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
builder.Services.AddSingleton<ReactionQueue>();
|
||||||
|
builder.Services.AddHostedService<ReactionQueueWorker>();
|
||||||
|
|
||||||
var host = builder.Build();
|
var host = builder.Build();
|
||||||
|
|
||||||
host.UseWebSockets();
|
host.UseWebSockets();
|
||||||
|
191
PayForReactions/ReactionQueue.cs
Normal file
191
PayForReactions/ReactionQueue.cs
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
using System.Threading.Tasks.Dataflow;
|
||||||
|
using FASTER.core;
|
||||||
|
using Microsoft.Extensions.Caching.Memory;
|
||||||
|
using Nostr.Client.Json;
|
||||||
|
using Nostr.Client.Keys;
|
||||||
|
using Nostr.Client.Messages;
|
||||||
|
using NostrRelay;
|
||||||
|
using NostrServices.Client;
|
||||||
|
using EventSession =
|
||||||
|
FASTER.core.ClientSession<byte[], byte[], byte[], byte[], FASTER.core.Empty, FASTER.core.SimpleFunctions<byte[], byte[]>>;
|
||||||
|
|
||||||
|
|
||||||
|
namespace PayForReactions;
|
||||||
|
|
||||||
|
public class ReactionQueue
|
||||||
|
{
|
||||||
|
public readonly BufferBlock<NostrEvent> Queue = new();
|
||||||
|
}
|
||||||
|
|
||||||
|
public class ReactionQueueWorker : BackgroundService
|
||||||
|
{
|
||||||
|
private readonly ILogger<ReactionQueueWorker> _logger;
|
||||||
|
private readonly ReactionQueue _queue;
|
||||||
|
private readonly IMemoryCache _cache;
|
||||||
|
private readonly Config _config;
|
||||||
|
private readonly Lnurl _lnurl;
|
||||||
|
private readonly AlbyApi _albyApi;
|
||||||
|
private readonly NostrServicesClient _nostrServices;
|
||||||
|
private readonly EventSession _session;
|
||||||
|
|
||||||
|
public ReactionQueueWorker(ILogger<ReactionQueueWorker> logger, ReactionQueue queue, IMemoryCache cache, Config config, Lnurl lnurl,
|
||||||
|
AlbyApi albyApi, NostrServicesClient nostrServices, NostrStore store)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
_queue = queue;
|
||||||
|
_cache = cache;
|
||||||
|
_config = config;
|
||||||
|
_lnurl = lnurl;
|
||||||
|
_albyApi = albyApi;
|
||||||
|
_nostrServices = nostrServices;
|
||||||
|
_session = store.MainStore.For(new SimpleFunctions<byte[], byte[]>()).NewSession<SimpleFunctions<byte[], byte[]>>();
|
||||||
|
;
|
||||||
|
}
|
||||||
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
var target = NostrPublicKey.FromBech32(_config.Target).Hex;
|
||||||
|
var ev = await _queue.Queue.ReceiveAsync(stoppingToken);
|
||||||
|
if (ev.Pubkey == target)
|
||||||
|
{
|
||||||
|
var id = Convert.FromHexString(ev.Id!);
|
||||||
|
var obj = NostrBuf.Encode(ev);
|
||||||
|
(await _session.UpsertAsync(ref id, ref obj, token: stoppingToken)).Complete();
|
||||||
|
_logger.LogInformation("Got targets event, saving {ev}", NostrJson.Serialize(ev));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var amount = MapKindToAmount(ev.Kind);
|
||||||
|
if (amount != default)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("Got reaction {json}", NostrJson.Serialize(ev));
|
||||||
|
var eTag = ev.Tags?.FirstOrDefault(a => a.TagIdentifier == "e");
|
||||||
|
var pTag = ev.Tags?.FirstOrDefault(a => a.TagIdentifier == "p" && a.AdditionalData[0] == target);
|
||||||
|
if (pTag == default || eTag == default)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("blocked: must be a reaction to {target}", target);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var idBytes = Convert.FromHexString(eTag.AdditionalData[0]);
|
||||||
|
var refEventResult = (await _session.ReadAsync(ref idBytes)).Complete();
|
||||||
|
if (refEventResult.status.NotFound)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("blocked: parent event not found");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var seenId = Convert.FromHexString(ev.Id!);
|
||||||
|
var seenIdResult = (await _session.ReadAsync(ref seenId)).Complete();
|
||||||
|
if (seenIdResult.status.Found)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("blocked: already zapped this one, how dare you!");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var refEvent = NostrBuf.Decode(refEventResult.output);
|
||||||
|
if (refEvent == default)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("blocked: parent event not found");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (refEvent.Pubkey != target)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("blocked: parent event not posted by {target}", target);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var sender = ev.Pubkey!;
|
||||||
|
var senderProfile = await _nostrServices.Profile(NostrPublicKey.FromHex(sender).Bech32);
|
||||||
|
if (senderProfile == default)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("blocked: couldn't find your profile anon!");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var parsedTarget = Lnurl.ParseLnUrl(senderProfile.LightningAddress ?? "");
|
||||||
|
if (parsedTarget == default)
|
||||||
|
{
|
||||||
|
_logger.LogInformation(
|
||||||
|
"blocked: so sad... couldn't send a zap because you don't have a lightning address in your profile {name}!",
|
||||||
|
senderProfile.Name);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogInformation("Starting payment for {name} - {addr}", senderProfile.Name, senderProfile.LightningAddress);
|
||||||
|
var svc = await _lnurl.LoadAsync(parsedTarget.ToString());
|
||||||
|
if (svc == default)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("blocked: wallet is down, no zap for you!");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var keyEventWalletSeen = $"zapaped:{refEvent.Id}:{parsedTarget}";
|
||||||
|
var eventWalletZapTry = _cache.Get<int>(keyEventWalletSeen);
|
||||||
|
if (eventWalletZapTry > 0)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("blocked: hey i already zapped you! (count={count})", eventWalletZapTry);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
_cache.Set(keyEventWalletSeen, ++eventWalletZapTry);
|
||||||
|
|
||||||
|
var key = NostrPrivateKey.FromBech32(_config.PrivateKey);
|
||||||
|
var myPubkey = key.DerivePublicKey().Hex;
|
||||||
|
var zap = new NostrEvent
|
||||||
|
{
|
||||||
|
Kind = NostrKind.ZapRequest,
|
||||||
|
Content = "Thanks for your interaction!",
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
Pubkey = myPubkey,
|
||||||
|
Tags = new NostrEventTags(
|
||||||
|
new NostrEventTag("e", ev.Id!),
|
||||||
|
new NostrEventTag("p", sender),
|
||||||
|
new NostrEventTag("relays", "wss://relay.snort.social", "wss://nos.lol", "wss://relay.damus.io"),
|
||||||
|
new NostrEventTag("amount", (amount * 1000).ToString()!)
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
var zapSigned = zap.Sign(key);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var invoice = await _lnurl.GetInvoiceAsync(svc, 5, "Thanks for your interaction!", zapSigned);
|
||||||
|
if (string.IsNullOrEmpty(invoice.Pr))
|
||||||
|
{
|
||||||
|
_logger.LogInformation("blocked: failed to get invoice from {target}", parsedTarget);
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogInformation("Paying invoice {pr}", invoice.Pr);
|
||||||
|
if (!await _albyApi.PayInvoice(invoice.Pr))
|
||||||
|
{
|
||||||
|
_logger.LogInformation("blocked: failed to pay invoice!");
|
||||||
|
}
|
||||||
|
|
||||||
|
var seenEvent = NostrBuf.Encode(ev);
|
||||||
|
(await _session.UpsertAsync(ref seenId, ref seenEvent, token: stoppingToken)).Complete();
|
||||||
|
|
||||||
|
_logger.LogInformation("Zapped {name}!", senderProfile.Name);
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
_logger.LogError(e.ToString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private int? MapKindToAmount(NostrKind k)
|
||||||
|
{
|
||||||
|
switch (k)
|
||||||
|
{
|
||||||
|
case NostrKind.Reaction: return 50;
|
||||||
|
case NostrKind.GenericRepost: return 100;
|
||||||
|
}
|
||||||
|
|
||||||
|
return default;
|
||||||
|
}
|
||||||
|
}
|
@ -1,36 +1,17 @@
|
|||||||
using FASTER.core;
|
using System.Threading.Tasks.Dataflow;
|
||||||
using Microsoft.Extensions.Caching.Memory;
|
|
||||||
using Nostr.Client.Json;
|
|
||||||
using Nostr.Client.Keys;
|
|
||||||
using Nostr.Client.Messages;
|
using Nostr.Client.Messages;
|
||||||
using Nostr.Client.Requests;
|
using Nostr.Client.Requests;
|
||||||
using NostrRelay;
|
using NostrRelay;
|
||||||
using NostrServices.Client;
|
|
||||||
using EventSession =
|
|
||||||
FASTER.core.ClientSession<byte[], byte[], byte[], byte[], FASTER.core.Empty, FASTER.core.SimpleFunctions<byte[], byte[]>>;
|
|
||||||
|
|
||||||
namespace PayForReactions;
|
namespace PayForReactions;
|
||||||
|
|
||||||
public class ZapperRelay : INostrRelay, IDisposable
|
public class ZapperRelay : INostrRelay, IDisposable
|
||||||
{
|
{
|
||||||
private readonly ILogger<ZapperRelay> _logger;
|
private readonly ReactionQueue _queue;
|
||||||
private readonly IMemoryCache _cache;
|
|
||||||
private readonly Config _config;
|
|
||||||
private readonly Lnurl _lnurl;
|
|
||||||
private readonly AlbyApi _albyApi;
|
|
||||||
private readonly NostrServicesClient _nostrServices;
|
|
||||||
private readonly EventSession _session;
|
|
||||||
|
|
||||||
public ZapperRelay(Lnurl lnurl, AlbyApi albyApi, NostrServicesClient nostrServices, Config config, NostrStore store,
|
public ZapperRelay(ReactionQueue queue)
|
||||||
ILogger<ZapperRelay> logger, IMemoryCache cache)
|
|
||||||
{
|
{
|
||||||
_lnurl = lnurl;
|
_queue = queue;
|
||||||
_albyApi = albyApi;
|
|
||||||
_nostrServices = nostrServices;
|
|
||||||
_config = config;
|
|
||||||
_logger = logger;
|
|
||||||
_cache = cache;
|
|
||||||
_session = store.MainStore.For(new SimpleFunctions<byte[], byte[]>()).NewSession<SimpleFunctions<byte[], byte[]>>();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask<bool> AcceptConnection(NostrClientContext context)
|
public ValueTask<bool> AcceptConnection(NostrClientContext context)
|
||||||
@ -45,141 +26,11 @@ public class ZapperRelay : INostrRelay, IDisposable
|
|||||||
|
|
||||||
public async ValueTask<HandleEventResponse> HandleEvent(NostrClientContext context, NostrEvent ev)
|
public async ValueTask<HandleEventResponse> HandleEvent(NostrClientContext context, NostrEvent ev)
|
||||||
{
|
{
|
||||||
var target = NostrPublicKey.FromBech32(_config.Target).Hex;
|
_queue.Queue.Post(ev);
|
||||||
if (ev.Pubkey == target)
|
return new(true, "");
|
||||||
{
|
|
||||||
var id = Convert.FromHexString(ev.Id!);
|
|
||||||
var obj = NostrBuf.Encode(ev);
|
|
||||||
(await _session.UpsertAsync(ref id, ref obj)).Complete();
|
|
||||||
_logger.LogInformation("Got targets event, saving {ev}", NostrJson.Serialize(ev));
|
|
||||||
return new(true, "");
|
|
||||||
}
|
|
||||||
|
|
||||||
var amount = MapKindToAmount(ev.Kind);
|
|
||||||
if (amount != default)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("Got reaction {json}", NostrJson.Serialize(ev));
|
|
||||||
var eTag = ev.Tags?.FirstOrDefault(a => a.TagIdentifier == "e");
|
|
||||||
var pTag = ev.Tags?.FirstOrDefault(a => a.TagIdentifier == "p" && a.AdditionalData[0] == target);
|
|
||||||
if (pTag == default || eTag == default)
|
|
||||||
{
|
|
||||||
return new(false, $"blocked: must be a reaction to {target}");
|
|
||||||
}
|
|
||||||
|
|
||||||
var idBytes = Convert.FromHexString(eTag.AdditionalData[0]);
|
|
||||||
var refEventResult = (await _session.ReadAsync(ref idBytes)).Complete();
|
|
||||||
if (refEventResult.status.NotFound)
|
|
||||||
{
|
|
||||||
return new(false, "blocked: parent event not found");
|
|
||||||
}
|
|
||||||
|
|
||||||
var seenId = Convert.FromHexString(ev.Id!);
|
|
||||||
var seenIdResult = (await _session.ReadAsync(ref seenId)).Complete();
|
|
||||||
if (seenIdResult.status.Found)
|
|
||||||
{
|
|
||||||
return new(false, "blocked: already zapped this one, how dare you!");
|
|
||||||
}
|
|
||||||
|
|
||||||
var refEvent = NostrBuf.Decode(refEventResult.output);
|
|
||||||
if (refEvent == default)
|
|
||||||
{
|
|
||||||
return new(false, "blocked: parent event not found");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (refEvent.Pubkey != target)
|
|
||||||
{
|
|
||||||
return new(false, $"blocked: parent event not posted by {target}");
|
|
||||||
}
|
|
||||||
|
|
||||||
var sender = ev.Pubkey!;
|
|
||||||
var senderProfile = await _nostrServices.Profile(NostrPublicKey.FromHex(sender).Bech32);
|
|
||||||
if (senderProfile == default)
|
|
||||||
{
|
|
||||||
return new(false, "blocked: couldn't find your profile anon!");
|
|
||||||
}
|
|
||||||
|
|
||||||
var parsedTarget = Lnurl.ParseLnUrl(senderProfile.LightningAddress ?? "");
|
|
||||||
if (parsedTarget == default)
|
|
||||||
{
|
|
||||||
return new(false,
|
|
||||||
$"blocked: so sad... couldn't send a zap because you don't have a lightning address in your profile {senderProfile.Name}!");
|
|
||||||
}
|
|
||||||
|
|
||||||
_logger.LogInformation("Starting payment for {name} - {addr}", senderProfile.Name, senderProfile.LightningAddress);
|
|
||||||
var svc = await _lnurl.LoadAsync(parsedTarget.ToString());
|
|
||||||
if (svc == default)
|
|
||||||
{
|
|
||||||
return new(false, "blocked: wallet is down, no zap for you!");
|
|
||||||
}
|
|
||||||
|
|
||||||
var keyEventWalletSeen = $"zapaped:{refEvent.Id}:{parsedTarget}";
|
|
||||||
var eventWalletZapTry = _cache.Get<int>(keyEventWalletSeen);
|
|
||||||
if (eventWalletZapTry > 0)
|
|
||||||
{
|
|
||||||
return new(false, $"blocked: hey i already zapped you! (count={eventWalletZapTry})");
|
|
||||||
}
|
|
||||||
|
|
||||||
_cache.Set(keyEventWalletSeen, ++eventWalletZapTry);
|
|
||||||
|
|
||||||
var key = NostrPrivateKey.FromBech32(_config.PrivateKey);
|
|
||||||
var myPubkey = key.DerivePublicKey().Hex;
|
|
||||||
var zap = new NostrEvent
|
|
||||||
{
|
|
||||||
Kind = NostrKind.ZapRequest,
|
|
||||||
Content = "Thanks for your interaction!",
|
|
||||||
CreatedAt = DateTime.UtcNow,
|
|
||||||
Pubkey = myPubkey,
|
|
||||||
Tags = new NostrEventTags(
|
|
||||||
new NostrEventTag("e", ev.Id!),
|
|
||||||
new NostrEventTag("p", sender),
|
|
||||||
new NostrEventTag("relays", "wss://relay.snort.social", "wss://nos.lol", "wss://relay.damus.io"),
|
|
||||||
new NostrEventTag("amount", (amount * 1000).ToString()!)
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
var zapSigned = zap.Sign(key);
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var invoice = await _lnurl.GetInvoiceAsync(svc, 5, "Thanks for your interaction!", zapSigned);
|
|
||||||
if (string.IsNullOrEmpty(invoice.Pr))
|
|
||||||
{
|
|
||||||
return new(false, $"blocked: failed to get invoice from {parsedTarget}");
|
|
||||||
}
|
|
||||||
|
|
||||||
_logger.LogInformation("Paying invoice {pr}", invoice.Pr);
|
|
||||||
if (!await _albyApi.PayInvoice(invoice.Pr))
|
|
||||||
{
|
|
||||||
return new(false, "blocked: failed to pay invoice!");
|
|
||||||
}
|
|
||||||
|
|
||||||
var seenEvent = NostrBuf.Encode(ev);
|
|
||||||
(await _session.UpsertAsync(ref seenId, ref seenEvent)).Complete();
|
|
||||||
|
|
||||||
_logger.LogInformation("Zapped {name}!", senderProfile.Name);
|
|
||||||
return new(true, "Zapped!");
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
_logger.LogError(e.ToString());
|
|
||||||
return new(false, $"blocked: Oh no! something went wrong! {e.Message}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return new(false, "blocked: kind not accepted, no zap for you!");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
private int? MapKindToAmount(NostrKind k)
|
|
||||||
{
|
|
||||||
switch (k)
|
|
||||||
{
|
|
||||||
case NostrKind.Reaction: return 5;
|
|
||||||
case NostrKind.GenericRepost: return 10;
|
|
||||||
}
|
|
||||||
|
|
||||||
return default;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user