using System.Buffers; using System.Net.WebSockets; using System.Text; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Nostr.Client.Json; using Nostr.Client.Requests; using Nostr.Client.Responses; namespace NostrRelay; public class NostrRelay where THandler : INostrRelay { private readonly ILogger> _logger; private readonly CancellationTokenSource _cts = new(); private readonly THandler _handler; private readonly NostrClientContext _ctx; public NostrRelay(THandler handler, NostrClientContext ctx, CancellationToken ct, ILogger> logger) { _handler = handler; _ctx = ctx; _logger = logger; ct.Register(() => _cts.Cancel()); } private async Task WriteResponse(T obj) { var rspJson = JsonConvert.SerializeObject(obj, NostrSerializer.Settings); _logger.LogTrace("Sending {msg}", rspJson); await _ctx.WebSocket.SendAsync(Encoding.UTF8.GetBytes(rspJson), WebSocketMessageType.Text, true, _cts.Token); } public async Task Read() { if (!await _handler.AcceptConnection(_ctx)) return; var offset = 0; var mem = MemoryPool.Shared.Rent(1024 * 1024 * 32); while (!_cts.IsCancellationRequested) { var msg = await _ctx.WebSocket.ReceiveAsync(mem.Memory[offset..], _cts.Token); if (msg.MessageType is WebSocketMessageType.Text) { var buff = mem.Memory[..(offset + msg.Count)]; offset = !msg.EndOfMessage ? offset + msg.Count : 0; if (!msg.EndOfMessage) continue; var str = Encoding.UTF8.GetString(buff.Span); _logger.LogTrace("Got msg {msg}", str); try { if (str.StartsWith("[\"REQ\"")) { var req = JsonConvert.DeserializeObject(str, NostrSerializer.Settings); if (req != default) { await foreach (var ev in _handler.HandleRequest(_ctx, req)) { await WriteResponse(new NostrEventResponse() { MessageType = "EVENT", Subscription = req.Subscription, Event = ev }); } var rsp = new NostrEoseResponse { MessageType = "EOSE", Subscription = req.Subscription }; await WriteResponse(rsp); } } else if (str.StartsWith("[\"EVENT\"")) { var req = JsonConvert.DeserializeObject(str, NostrSerializer.Settings); if (req != default) { if (!req.Event.IsSignatureValid()) { var rsp = new NostrOkResponse() { MessageType = "OK", EventId = req.Event.Id, Accepted = false, Message = "invalid: sig check failed" }; await WriteResponse(rsp); } else { var result = await _handler.HandleEvent(_ctx, req.Event); var rsp = new NostrOkResponse() { MessageType = "OK", EventId = req.Event.Id, Accepted = result.Ok, Message = result.Message ?? "" }; await WriteResponse(rsp); } } } } catch (Exception ex) { _logger.LogWarning("Failed to process msg {error}", ex.Message); offset = 0; } } } } } [JsonConverter(typeof(ArrayConverter))] public class NostrInboundRequest { public NostrInboundRequest() { } public NostrInboundRequest(string subscription, NostrFilter nostrFilter) { this.Subscription = subscription; this.NostrFilter = nostrFilter; } public static implicit operator NostrRequest(NostrInboundRequest req) { return new(req.Subscription, req.NostrFilter); } [ArrayProperty(0)] public string Type { get; init; } = "REQ"; [ArrayProperty(1)] public string Subscription { get; init; } [ArrayProperty(2)] public NostrFilter NostrFilter { get; init; } }