2024-01-12 16:22:38 +00:00

148 lines
5.2 KiB
C#

using System.Buffers;
using System.Net.WebSockets;
using System.Text;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Nostr.Client.Json;
using Nostr.Client.Requests;
using Nostr.Client.Responses;
namespace NostrRelay;
public class NostrRelay<THandler> where THandler : INostrRelay
{
private readonly ILogger<NostrRelay<THandler>> _logger;
private readonly CancellationTokenSource _cts = new();
private readonly THandler _handler;
private readonly NostrClientContext _ctx;
public NostrRelay(THandler handler, NostrClientContext ctx, CancellationToken ct, ILogger<NostrRelay<THandler>> logger)
{
_handler = handler;
_ctx = ctx;
_logger = logger;
ct.Register(() => _cts.Cancel());
}
private async Task WriteResponse<T>(T obj)
{
var rspJson = JsonConvert.SerializeObject(obj, NostrSerializer.Settings);
_logger.LogTrace("Sending {msg}", rspJson);
await _ctx.WebSocket.SendAsync(Encoding.UTF8.GetBytes(rspJson), WebSocketMessageType.Text, true, _cts.Token);
}
public async Task Read()
{
if (!await _handler.AcceptConnection(_ctx)) return;
var offset = 0;
var mem = MemoryPool<byte>.Shared.Rent(1024 * 1024 * 32);
while (!_cts.IsCancellationRequested)
{
var msg = await _ctx.WebSocket.ReceiveAsync(mem.Memory[offset..], _cts.Token);
if (msg.MessageType is WebSocketMessageType.Text)
{
var buff = mem.Memory[..(offset + msg.Count)];
offset = !msg.EndOfMessage ? offset + msg.Count : 0;
if (!msg.EndOfMessage) continue;
var str = Encoding.UTF8.GetString(buff.Span);
_logger.LogTrace("Got msg {msg}", str);
try
{
if (str.StartsWith("[\"REQ\""))
{
var req = JsonConvert.DeserializeObject<NostrInboundRequest>(str, NostrSerializer.Settings);
if (req != default)
{
await foreach (var ev in _handler.HandleRequest(_ctx, req))
{
await WriteResponse(new NostrEventResponse()
{
MessageType = "EVENT",
Subscription = req.Subscription,
Event = ev
});
}
var rsp = new NostrEoseResponse
{
MessageType = "EOSE",
Subscription = req.Subscription
};
await WriteResponse(rsp);
}
}
else if (str.StartsWith("[\"EVENT\""))
{
var req = JsonConvert.DeserializeObject<NostrEventRequest>(str, NostrSerializer.Settings);
if (req != default)
{
if (!req.Event.IsSignatureValid())
{
var rsp = new NostrOkResponse()
{
MessageType = "OK",
EventId = req.Event.Id,
Accepted = false,
Message = "invalid: sig check failed"
};
await WriteResponse(rsp);
}
else
{
var result = await _handler.HandleEvent(_ctx, req.Event);
var rsp = new NostrOkResponse()
{
MessageType = "OK",
EventId = req.Event.Id,
Accepted = result.Ok,
Message = result.Message ?? ""
};
await WriteResponse(rsp);
}
}
}
}
catch (Exception ex)
{
_logger.LogWarning("Failed to process msg {error}", ex.Message);
offset = 0;
}
}
}
}
}
[JsonConverter(typeof(ArrayConverter))]
public class NostrInboundRequest
{
public NostrInboundRequest()
{
}
public NostrInboundRequest(string subscription, NostrFilter nostrFilter)
{
this.Subscription = subscription;
this.NostrFilter = nostrFilter;
}
public static implicit operator NostrRequest(NostrInboundRequest req)
{
return new(req.Subscription, req.NostrFilter);
}
[ArrayProperty(0)]
public string Type { get; init; } = "REQ";
[ArrayProperty(1)]
public string Subscription { get; init; }
[ArrayProperty(2)]
public NostrFilter NostrFilter { get; init; }
}