import 'dart:async';
import 'dart:collection';
import 'dart:developer' as developer;

import 'package:flutter/widgets.dart';
import 'package:ndk/ndk.dart';
import 'package:rxdart/rxdart.dart';
import 'package:zap_stream_flutter/const.dart';

/// Reactive filter which builds the widget with a snapshot of the data.
///
/// Opens a subscription for [filters] when the widget is mounted, collects
/// the incoming events (de-duplicated, newest wins) and calls [builder] with
/// the current list every time a new batch of events arrives.
class RxFilter<T> extends StatefulWidget {
  /// Filters handed to the subscription request.
  final List<Filter> filters;

  /// Whether the subscription stays open until this widget is disposed.
  ///
  /// When `false` the subscription is additionally closed as soon as the
  /// response future completes (presumably once the initial result set has
  /// been delivered — confirm against the ndk API).
  final bool leaveOpen;

  /// Builds the UI from the current snapshot.
  ///
  /// Receives `null` until the first non-empty batch of events has arrived.
  final Widget Function(BuildContext, List<T>?) builder;

  /// Converts each event into a [T].
  ///
  /// When omitted, the event itself is cast to [T].
  final T Function(Nip01Event)? mapper;

  /// Relays forwarded to the request as `explicitRelays`.
  final List<String>? relays;

  const RxFilter(
    Key key, {
    required this.filters,
    required this.builder,
    this.mapper,
    this.leaveOpen = true,
    this.relays,
  }) : super(key: key);

  @override
  State<RxFilter<T>> createState() => _RxFilter<T>();
}

class _RxFilter<T> extends State<RxFilter<T>> {
  late NdkResponse _response;
  late StreamSubscription<List<Nip01Event>> _listener;

  /// Latest value per event key, stored together with the `createdAt` of the
  /// event it was built from. Stays `null` until the first event arrives.
  HashMap<String, (int, T)>? _events;

  /// Set once the subscription has been closed so it is never closed twice
  /// (the non-`leaveOpen` path and [dispose] can both request a close).
  bool _subscriptionClosed = false;

  @override
  void initState() {
    super.initState();
    developer.log("RX:SEDNING ${widget.filters}");
    _response = ndk.requests.subscription(
      filters: widget.filters,
      explicitRelays: widget.relays,
      cacheWrite: true,
    );
    if (!widget.leaveOpen) {
      _response.future.then((_) => _closeSubscription());
    }
    _listener = _response.stream
        // Batch events so a burst causes one rebuild instead of one per event.
        .bufferTime(const Duration(milliseconds: 500))
        .where((events) => events.isNotEmpty)
        .handleError((e) {
          developer.log("RX:ERROR $e");
        })
        .listen((events) {
          // Use State.mounted: State.context is not usable after unmount.
          if (!mounted) return;
          developer.log(
            "RX:GOT ${events.length} events for ${widget.filters}",
          );
          setState(() {
            events.forEach(_replaceInto);
          });
        });
  }

  /// Closes the underlying subscription; calls after the first are no-ops.
  void _closeSubscription() {
    if (_subscriptionClosed) return;
    _subscriptionClosed = true;
    developer.log("RX:CLOSING ${widget.filters}");
    ndk.requests.closeSubscription(_response.requestId);
  }

  /// Stores [ev] under its key unless an entry with the same or a newer
  /// `createdAt` is already present.
  void _replaceInto(Nip01Event ev) {
    final events = _events ??= HashMap();
    final evKey = _eventKey(ev);
    final existing = events[evKey];
    if (existing != null && existing.$1 >= ev.createdAt) return;

    final mapper = widget.mapper;
    events[evKey] = (ev.createdAt, mapper != null ? mapper(ev) : ev as T);
  }

  /// Returns the de-duplication key for [ev].
  ///
  /// Kinds 0, 3 and 10000–19999 are keyed by kind and author, kinds
  /// 30000–39999 by kind, author and d-tag, so a newer event replaces the
  /// older one. Every other kind is keyed by its event id.
  String _eventKey(Nip01Event ev) {
    final kind = ev.kind;
    if (kind == 0 || kind == 3 || (kind >= 10000 && kind < 20000)) {
      return "${ev.kind}:${ev.pubKey}";
    }
    if (kind >= 30000 && kind < 40000) {
      return "${ev.kind}:${ev.pubKey}:${ev.getDtag()}";
    }
    return ev.id;
  }

  @override
  void dispose() {
    _listener.cancel();
    _closeSubscription();
    // super.dispose() must come last, after our own resources are released.
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return widget.builder(context, _events?.values.map((v) => v.$2).toList());
  }
}

/// An async filter loader into [RxFilter].
///
/// Awaits [filterBuilder] and, once the filters are available, renders an
/// [RxFilter] with them. Until then [loadingWidget] is shown (or an empty
/// box when none is given).
class RxFutureFilter<T> extends StatelessWidget {
  /// Produces the filters to subscribe with.
  final Future<List<Filter>> Function() filterBuilder;

  /// Forwarded to [RxFilter.leaveOpen].
  final bool leaveOpen;

  /// Forwarded to [RxFilter.builder].
  final Widget Function(BuildContext, List<T>?) builder;

  /// Shown while the filters are still being loaded.
  final Widget? loadingWidget;

  /// Forwarded to [RxFilter.mapper].
  final T Function(Nip01Event)? mapper;

  const RxFutureFilter(
    Key key, {
    required this.filterBuilder,
    required this.builder,
    this.mapper,
    this.leaveOpen = true,
    this.loadingWidget,
  }) : super(key: key);

  @override
  Widget build(BuildContext context) {
    return FutureBuilder<List<Filter>>(
      // NOTE(review): the future is created on every build, so the filters
      // are re-evaluated on each parent rebuild. Kept as-is because callers
      // may rely on that; consider caching it in a State if not.
      future: filterBuilder(),
      builder: (ctx, data) {
        final filters = data.data;
        if (filters == null) {
          return loadingWidget ?? const SizedBox.shrink();
        }
        return RxFilter<T>(
          key!,
          filters: filters,
          mapper: mapper,
          builder: builder,
          leaveOpen: leaveOpen,
        );
      },
    );
  }
}