1
0
mirror of git://jb55.com/damus synced 2024-09-19 19:46:51 +00:00

refinements to RelayConnection and RelayPool

This commit is contained in:
Bryan Montz 2023-02-24 22:39:58 -06:00
parent 0210ae5d61
commit 673358408a
9 changed files with 135 additions and 161 deletions

View File

@ -31,9 +31,9 @@ class EventsModel: ObservableObject {
} }
func subscribe() { func subscribe() {
state.pool.subscribe(sub_id: sub_id, state.pool.subscribe_to(sub_id: sub_id,
filters: [get_filter()], filters: [get_filter()],
handler: handle_nostr_event) handler: handle_nostr_event)
} }
func unsubscribe() { func unsubscribe() {

View File

@ -40,7 +40,7 @@ class FollowersModel: ObservableObject {
let filter = get_filter() let filter = get_filter()
let filters = [filter] let filters = [filter]
print_filters(relay_id: "following", filters: [filters]) print_filters(relay_id: "following", filters: [filters])
self.damus_state.pool.subscribe(sub_id: sub_id, filters: filters, handler: handle_event) self.damus_state.pool.subscribe_to(sub_id: sub_id, filters: filters, handler: handle_event)
} }
func unsubscribe() { func unsubscribe() {

View File

@ -41,7 +41,7 @@ class FollowingModel {
} }
let filters = [filter] let filters = [filter]
print_filters(relay_id: "following", filters: [filters]) print_filters(relay_id: "following", filters: [filters])
self.damus_state.pool.subscribe(sub_id: sub_id, filters: filters, handler: handle_event) self.damus_state.pool.subscribe_to(sub_id: sub_id, filters: filters, handler: handle_event)
} }
func unsubscribe() { func unsubscribe() {

View File

@ -83,8 +83,8 @@ class ProfileModel: ObservableObject, Equatable {
print("subscribing to profile \(pubkey) with sub_id \(sub_id)") print("subscribing to profile \(pubkey) with sub_id \(sub_id)")
print_filters(relay_id: "profile", filters: [[text_filter], [profile_filter]]) print_filters(relay_id: "profile", filters: [[text_filter], [profile_filter]])
damus.pool.subscribe(sub_id: sub_id, filters: [text_filter], handler: handle_event) damus.pool.subscribe_to(sub_id: sub_id, filters: [text_filter], handler: handle_event)
damus.pool.subscribe(sub_id: prof_subid, filters: [profile_filter], handler: handle_event) damus.pool.subscribe_to(sub_id: prof_subid, filters: [profile_filter], handler: handle_event)
} }
func handle_profile_contact_event(_ ev: NostrEvent) { func handle_profile_contact_event(_ ev: NostrEvent) {

View File

@ -38,7 +38,7 @@ class SearchHomeModel: ObservableObject {
func subscribe() { func subscribe() {
loading = true loading = true
let to_relays = determine_to_relays(pool: damus_state.pool, filters: damus_state.relay_filters) let to_relays = determine_to_relays(pool: damus_state.pool, filters: damus_state.relay_filters)
damus_state.pool.subscribe(sub_id: base_subid, filters: [get_base_filter()], handler: handle_event, to: to_relays) damus_state.pool.subscribe_to(sub_id: base_subid, filters: [get_base_filter()], to: to_relays, handler: handle_event)
} }
func unsubscribe(to: String? = nil) { func unsubscribe(to: String? = nil) {

View File

@ -29,7 +29,7 @@ class ZapsModel: ObservableObject {
case .note(let note_target): case .note(let note_target):
filter.referenced_ids = [note_target.note_id] filter.referenced_ids = [note_target.note_id]
} }
state.pool.subscribe(sub_id: zaps_subid, filters: [filter], handler: handle_event) state.pool.subscribe_to(sub_id: zaps_subid, filters: [filter], handler: handle_event)
} }
func unsubscribe() { func unsubscribe() {

View File

@ -14,9 +14,15 @@ enum NostrConnectionEvent {
} }
final class RelayConnection: WebSocketDelegate { final class RelayConnection: WebSocketDelegate {
private(set) var isConnected = false enum State {
private(set) var isConnecting = false case notConnected
private(set) var isReconnecting = false case connecting
case reconnecting
case connected
case failed
}
private(set) var state: State = .notConnected
private(set) var last_connection_attempt: TimeInterval = 0 private(set) var last_connection_attempt: TimeInterval = 0
private lazy var socket = { private lazy var socket = {
@ -25,38 +31,36 @@ final class RelayConnection: WebSocketDelegate {
socket.delegate = self socket.delegate = self
return socket return socket
}() }()
private var handleEvent: (NostrConnectionEvent) -> () private let eventHandler: (NostrConnectionEvent) -> ()
let url: URL let url: URL
init(url: URL, handleEvent: @escaping (NostrConnectionEvent) -> ()) { init(url: URL, eventHandler: @escaping (NostrConnectionEvent) -> ()) {
self.url = url self.url = url
self.handleEvent = handleEvent self.eventHandler = eventHandler
} }
func reconnect() { func reconnect() {
if isConnected { if state == .connected {
isReconnecting = true state = .reconnecting
disconnect() disconnect()
} else { } else {
// we're already disconnected, so just connect // we're already disconnected, so just connect
connect(force: true) connect()
} }
} }
func connect(force: Bool = false) { func connect(force: Bool = false) {
if !force && (isConnected || isConnecting) { if !force && (state == .connected || state == .connecting) {
return return
} }
isConnecting = true state = .connecting
last_connection_attempt = Date().timeIntervalSince1970 last_connection_attempt = Date().timeIntervalSince1970
socket.connect() socket.connect()
} }
func disconnect() { func disconnect() {
socket.disconnect() socket.disconnect()
isConnected = false
isConnecting = false
} }
func send(_ req: NostrRequest) { func send(_ req: NostrRequest) {
@ -68,51 +72,52 @@ final class RelayConnection: WebSocketDelegate {
socket.write(string: req) socket.write(string: req)
} }
private func decodeEvent(_ txt: String) throws -> NostrConnectionEvent {
if let ev = decode_nostr_event(txt: txt) {
return .nostr_event(ev)
} else {
throw DecodingError.dataCorrupted(.init(codingPath: [], debugDescription: "decoding event failed"))
}
}
@MainActor
private func handleEvent(_ event: NostrConnectionEvent) async {
eventHandler(event)
}
// MARK: - WebSocketDelegate // MARK: - WebSocketDelegate
func didReceive(event: WebSocketEvent, client: WebSocket) { func didReceive(event: WebSocketEvent, client: WebSocket) {
switch event { switch event {
case .connected: case .connected:
self.isConnected = true state = .connected
self.isConnecting = false
case .disconnected: case .disconnected:
self.isConnecting = false if state == .reconnecting {
self.isConnected = false connect()
if self.isReconnecting { } else {
self.isReconnecting = false state = .notConnected
self.connect()
} }
case .cancelled, .error: case .cancelled, .error:
self.isConnecting = false state = .failed
self.isConnected = false
case .text(let txt): case .text(let txt):
if txt.count > 2000 { Task(priority: .userInitiated) {
DispatchQueue.global(qos: .default).async { do {
if let ev = decode_nostr_event(txt: txt) { let event = try decodeEvent(txt)
DispatchQueue.main.async { await handleEvent(event)
self.handleEvent(.nostr_event(ev)) } catch {
} print("decode failed for \(txt): \(error)")
return // TODO: trigger event error
}
}
} else {
if let ev = decode_nostr_event(txt: txt) {
handleEvent(.nostr_event(ev))
return
} }
} }
print("decode failed for \(txt)")
// TODO: trigger event error
default: default:
break break
} }
handleEvent(.ws_event(event)) eventHandler(.ws_event(event))
} }
} }

View File

@ -7,22 +7,6 @@
import Foundation import Foundation
struct SubscriptionId: Identifiable, CustomStringConvertible {
let id: String
var description: String {
id
}
}
struct RelayId: Identifiable, CustomStringConvertible {
let id: String
var description: String {
id
}
}
struct RelayHandler { struct RelayHandler {
let sub_id: String let sub_id: String
let callback: (String, NostrConnectionEvent) -> () let callback: (String, NostrConnectionEvent) -> ()
@ -33,22 +17,21 @@ struct QueuedRequest {
let relay: String let relay: String
} }
struct NostrRequestId: Equatable, Hashable { final class RelayPool {
let relay: String? private enum Constants {
let sub_id: String /// Used for an exponential backoff algorithm when retrying stale connections
} /// Each retry attempt will be delayed by raising this base delay to an exponent
/// equal to the number of previous retries.
class RelayPool { static let base_reconnect_delay: TimeInterval = 2
/// Used for an exponential backoff algorithm when retrying stale connections static let max_queued_requests = 10
/// Each retry attempt will be delayed by raising this base delay to an exponent static let max_retry_attempts = 3
/// equal to the number of previous retries. }
private static let base_reconnect_delay: TimeInterval = 2
var relays: [Relay] = [] private(set) var relays: [Relay] = []
var handlers: [RelayHandler] = [] private var handlers: [RelayHandler] = []
var request_queue: [QueuedRequest] = [] private var request_queue: [QueuedRequest] = []
var seen: Set<String> = Set() private var seen: Set<String> = Set()
var counts: [String: UInt64] = [:] private var counts: [String: UInt64] = [:]
private var retry_attempts_per_relay: [URL: Int] = [:] private var retry_attempts_per_relay: [URL: Int] = [:]
var descriptors: [RelayDescriptor] { var descriptors: [RelayDescriptor] {
@ -56,37 +39,28 @@ class RelayPool {
} }
var num_connecting: Int { var num_connecting: Int {
return relays.reduce(0) { n, r in n + (r.connection.isConnecting ? 1 : 0) } relays.reduce(0) { n, r in n + (r.connection.state == .connecting ? 1 : 0) }
} }
func remove_handler(sub_id: String) { private func remove_handler(sub_id: String) {
self.handlers = handlers.filter { $0.sub_id != sub_id } self.handlers = handlers.filter { $0.sub_id != sub_id }
print("removing \(sub_id) handler, current: \(handlers.count)") print("removing \(sub_id) handler, current: \(handlers.count)")
} }
func register_handler(sub_id: String, handler: @escaping (String, NostrConnectionEvent) -> ()) { func register_handler(sub_id: String, handler: @escaping (String, NostrConnectionEvent) -> ()) {
for handler in handlers { guard !handlers.contains(where: { $0.sub_id == sub_id }) else {
// don't add duplicate handlers return // don't add duplicate handlers
if handler.sub_id == sub_id {
return
}
} }
self.handlers.append(RelayHandler(sub_id: sub_id, callback: handler))
handlers.append(RelayHandler(sub_id: sub_id, callback: handler))
print("registering \(sub_id) handler, current: \(self.handlers.count)") print("registering \(sub_id) handler, current: \(self.handlers.count)")
} }
func remove_relay(_ relay_id: String) { func remove_relay(_ relay_id: String) {
var i: Int = 0 disconnect(from: [relay_id])
self.disconnect(to: [relay_id]) if let index = relays.firstIndex(where: { $0.id == relay_id }) {
relays.remove(at: index)
for relay in relays {
if relay.id == relay_id {
relays.remove(at: i)
break
}
i += 1
} }
} }
@ -105,38 +79,51 @@ class RelayPool {
/// This is used to retry dead connections /// This is used to retry dead connections
func connect_to_disconnected() { func connect_to_disconnected() {
for relay in relays where !relay.is_broken && !relay.connection.isConnected { for relay in relays where !relay.is_broken && relay.connection.state != .connected {
let c = relay.connection let c = relay.connection
let is_connecting = c.isReconnecting || c.isConnecting let is_connecting = c.state == .reconnecting || c.state == .connecting
let retry_attempts = retry_attempts_per_relay[c.url] ?? 0 let retry_attempts = retry_attempts_per_relay[c.url] ?? 0
let delay = pow(RelayPool.base_reconnect_delay, TimeInterval(retry_attempts_per_relay[c.url] ?? 0))
let delay = pow(Constants.base_reconnect_delay, TimeInterval(retry_attempts + 1)) // the + 1 helps us avoid a 1-second delay for the first retry
if is_connecting && (Date.now.timeIntervalSince1970 - c.last_connection_attempt) > delay { if is_connecting && (Date.now.timeIntervalSince1970 - c.last_connection_attempt) > delay {
print("stale connection detected (\(relay.descriptor.url.absoluteString)). retrying after \(delay) seconds...") if retry_attempts > Constants.max_retry_attempts {
relay.connection.connect(force: true) if c.state != .notConnected {
retry_attempts_per_relay[c.url] = retry_attempts + 1 c.disconnect()
print("exceeded max connection attempts with \(relay.descriptor.url.absoluteString)")
relay.mark_broken()
}
continue
} else {
print("stale connection detected (\(relay.descriptor.url.absoluteString)). retrying after \(delay) seconds...")
c.connect(force: true)
retry_attempts_per_relay[c.url] = retry_attempts + 1
}
} else if is_connecting { } else if is_connecting {
continue continue
} else { } else {
relay.connection.reconnect() c.reconnect()
} }
} }
} }
func reconnect(to: [String]? = nil) { func reconnect(to relay_ids: [String]? = nil) {
let relays = to.map{ get_relays($0) } ?? self.relays let relays: [Relay]
for relay in relays { if let relay_ids {
relays = get_relays(relay_ids)
} else {
relays = self.relays
}
for relay in relays where !relay.is_broken {
// don't try to reconnect to broken relays // don't try to reconnect to broken relays
relay.connection.reconnect() relay.connection.reconnect()
} }
} }
func mark_broken(_ relay_id: String) { func mark_broken(_ relay_id: String) {
for relay in relays { relays.first(where: { $0.id == relay_id })?.mark_broken()
relay.mark_broken()
}
} }
func connect(to: [String]? = nil) { func connect(to: [String]? = nil) {
@ -146,8 +133,8 @@ class RelayPool {
} }
} }
func disconnect(to: [String]? = nil) { private func disconnect(from: [String]? = nil) {
let relays = to.map{ get_relays($0) } ?? self.relays let relays = from.map{ get_relays($0) } ?? self.relays
for relay in relays { for relay in relays {
relay.connection.disconnect() relay.connection.disconnect()
} }
@ -155,35 +142,23 @@ class RelayPool {
func unsubscribe(sub_id: String, to: [String]? = nil) { func unsubscribe(sub_id: String, to: [String]? = nil) {
if to == nil { if to == nil {
self.remove_handler(sub_id: sub_id) remove_handler(sub_id: sub_id)
} }
self.send(.unsubscribe(sub_id), to: to) send(.unsubscribe(sub_id), to: to)
} }
func subscribe(sub_id: String, filters: [NostrFilter], handler: @escaping (String, NostrConnectionEvent) -> (), to: [String]? = nil) { func subscribe_to(sub_id: String, filters: [NostrFilter], to: [String]? = nil, handler: @escaping (String, NostrConnectionEvent) -> ()) {
register_handler(sub_id: sub_id, handler: handler) register_handler(sub_id: sub_id, handler: handler)
send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to) send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to)
} }
func subscribe_to(sub_id: String, filters: [NostrFilter], to: [String]?, handler: @escaping (String, NostrConnectionEvent) -> ()) { private func count_queued(relay: String) -> Int {
register_handler(sub_id: sub_id, handler: handler) request_queue.filter({ $0.relay == relay }).count
send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to)
}
func count_queued(relay: String) -> Int {
var c = 0
for request in request_queue {
if request.relay == relay {
c += 1
}
}
return c
} }
func queue_req(r: NostrRequest, relay: String) { func queue_req(r: NostrRequest, relay: String) {
let count = count_queued(relay: relay) let count = count_queued(relay: relay)
guard count <= 10 else { guard count <= Constants.max_queued_requests else {
print("can't queue, too many queued events for \(relay)") print("can't queue, too many queued events for \(relay)")
return return
} }
@ -193,10 +168,10 @@ class RelayPool {
} }
func send(_ req: NostrRequest, to: [String]? = nil) { func send(_ req: NostrRequest, to: [String]? = nil) {
let relays = to.map{ get_relays($0) } ?? self.relays let relays = to.map { get_relays($0) } ?? self.relays
for relay in relays { for relay in relays {
guard relay.connection.isConnected else { guard relay.connection.state == .connected else {
queue_req(r: req, relay: relay.id) queue_req(r: req, relay: relay.id)
continue continue
} }
@ -216,17 +191,14 @@ class RelayPool {
func record_last_pong(relay_id: String, event: NostrConnectionEvent) { func record_last_pong(relay_id: String, event: NostrConnectionEvent) {
if case .ws_event(let ws_event) = event { if case .ws_event(let ws_event) = event {
if case .pong = ws_event { if case .pong = ws_event {
for relay in relays { if let relay = relays.first(where: { $0.id == relay_id }) {
if relay.id == relay_id { relay.last_pong = UInt32(Date.now.timeIntervalSince1970)
relay.last_pong = UInt32(Date.now.timeIntervalSince1970)
return
}
} }
} }
} }
} }
func run_queue(_ relay_id: String) { private func run_queue(_ relay_id: String) {
self.request_queue = request_queue.reduce(into: Array<QueuedRequest>()) { (q, req) in self.request_queue = request_queue.reduce(into: Array<QueuedRequest>()) { (q, req) in
guard req.relay == relay_id else { guard req.relay == relay_id else {
q.append(req) q.append(req)
@ -238,23 +210,20 @@ class RelayPool {
} }
} }
func record_seen(relay_id: String, event: NostrConnectionEvent) { private func record_seen(relay_id: String, event: NostrConnectionEvent) {
if case .nostr_event(let ev) = event { if case .nostr_event(let ev) = event {
if case .event(_, let nev) = ev { if case .event(_, let nev) = ev {
let k = relay_id + nev.id let k = relay_id + nev.id
if !seen.contains(k) { if !seen.contains(k) {
seen.insert(k) seen.insert(k)
if counts[relay_id] == nil { let prev_count = counts[relay_id] ?? 0
counts[relay_id] = 1 counts[relay_id] = prev_count + 1
} else {
counts[relay_id] = (counts[relay_id] ?? 0) + 1
}
} }
} }
} }
} }
func handle_event(relay_id: String, event: NostrConnectionEvent) { private func handle_event(relay_id: String, event: NostrConnectionEvent) {
record_last_pong(relay_id: relay_id, event: event) record_last_pong(relay_id: relay_id, event: event)
record_seen(relay_id: relay_id, event: event) record_seen(relay_id: relay_id, event: event)
@ -276,5 +245,3 @@ func add_rw_relay(_ pool: RelayPool, _ url: String) {
let url_ = URL(string: url)! let url_ = URL(string: url)!
try? pool.add_relay(url_, info: RelayInfo.rw) try? pool.add_relay(url_, info: RelayInfo.rw)
} }

View File

@ -7,6 +7,16 @@
import SwiftUI import SwiftUI
extension RelayConnection.State {
var indicatorColor: Color {
switch self {
case .connected: return .green
case .connecting, .reconnecting: return .yellow
default: return .red
}
}
}
struct RelayStatus: View { struct RelayStatus: View {
let pool: RelayPool let pool: RelayPool
let relay: String let relay: String
@ -16,18 +26,10 @@ struct RelayStatus: View {
@State var conn_color: Color = .gray @State var conn_color: Color = .gray
func update_connection_color() { func update_connection_color() {
for relay in pool.relays { guard let relay = pool.relays.first(where: { $0.id == relay }) else {
if relay.id == self.relay { return
let c = relay.connection
if c.isConnected {
conn_color = .green
} else if c.isConnecting || c.isReconnecting {
conn_color = .yellow
} else {
conn_color = .red
}
}
} }
conn_color = relay.connection.state.indicatorColor
} }
var body: some View { var body: some View {