Set up the closing of subscriptions upon EOSE. This is in preparation for batching subscriptions and throttling based on backlog.

This commit is contained in:
Robert C. Martin 2023-04-21 10:42:55 -05:00
parent ab6a3936ea
commit a3c00964a9
2 changed files with 23 additions and 15 deletions

View File

@ -18,14 +18,13 @@
date (Date. (long time))]
(.format (SimpleDateFormat. "MM/dd/yyyy kk:mm:ss z") date)))
(defn send-request [request close]
(defn send-request [[_type id _filters :as request]]
(doseq [url (keys @relays)]
(when (not= :read-none (get-in @relays [url :read]))
(let [relay (:connection (get @relays url))]
(when (some? relay)
(relay/send relay request)
(future (do (Thread/sleep 2000)
(relay/send relay close))))))))
(set-mem [:active-subscriptions url id] {:close true})
(relay/send relay request))))))
(defn- make-request-id []
(let [r (rand-int 1000000)]
@ -33,9 +32,8 @@
(defn request-note [id]
(let [req-id (make-request-id)
request ["REQ" req-id {"kinds" [1] "ids" [(util/hexify id)]} ]
close ["CLOSE" req-id]]
(send-request request close))
request ["REQ" req-id {"kinds" [1] "ids" [(util/hexify id)]}]]
(send-request request))
)
(defn request-profiles-and-contacts-for [authors]
@ -46,9 +44,8 @@
(map #(subs % 0 10) (take 1000 (shuffle hexified-authors))))
r (rand-int 1000000)
request ["REQ" (str "ms-request-" r) {"kinds" [0 3]
"authors" trimmed-authors}]
close ["CLOSE" (str "ms-request-" r)]]
(send-request request close)
"authors" trimmed-authors}]]
(send-request request)
))
(defn request-contact-lists [relay]
@ -83,6 +80,7 @@
past-mention-filter (add-trustees "#p" past-filter trustees)
future-mention-filter (add-trustees "#p" future-filter trustees)]
(when (> now since)
(set-mem [:active-subscriptions (::ws-relay/url relay) "ms-past"] {:close true})
(if (some? who)
(relay/send relay ["REQ" "ms-past" past-author-filter past-mention-filter])
(relay/send relay ["REQ" "ms-past" past-filter])))
@ -122,12 +120,22 @@
latest (if (nil? latest) time (max latest time))]
[earliest latest]))
(defn handle-relay-message [relay message]
(let [url (::ws-relay/url relay)
(defn handle-relay-message [url message]
(let [relay (:connection (get @relays url))
[type id event] message]
(when (= type "EVENT")
(cond
(= type "EVENT") ;nobody uses this right now...
(let [created-at (get event "created_at")]
(update-mem [:relay-subscription-event-times url id] add-event-time created-at)))
(update-mem [:relay-subscription-event-times url id] add-event-time created-at))
(= type "EOSE")
(let [active-subscription (get-mem [:active-subscriptions url id])]
(when (and (some? active-subscription)
(:close active-subscription))
(relay/send relay ["CLOSE" id])
(update-mem [:active-subscriptions url] dissoc id)))
)
(update-mem :websocket-backlog inc)
(update-mem :incoming-events inc)
(update-mem [:events-by-relay url] #(if (nil? %) 1 (inc %)))

View File

@ -26,7 +26,7 @@
(when last
(try
(let [envelope (json/read-str (.toString buffer))]
((:recv callbacks) relay envelope))
((:recv callbacks) (::url relay) envelope))
(catch Exception e
(log-pr 1 'onText url (.getMessage e))
(log-pr 1 (.toString buffer))))