Increases timeout period and avoid double websocket connections.

This commit is contained in:
Vitor Pamplona 2023-02-04 13:32:09 -05:00
parent 80bc09df92
commit 1940303130

View File

@ -17,12 +17,14 @@ class Relay(
var write: Boolean = true
) {
private val httpClient = OkHttpClient.Builder()
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS)
.connectTimeout(100, TimeUnit.SECONDS)
.readTimeout(100, TimeUnit.SECONDS)
.callTimeout(100, TimeUnit.SECONDS)
.build();
private var listeners = setOf<Listener>()
private var socket: WebSocket? = null
private var isReady: Boolean = false
var eventDownloadCounter = 0
var eventUploadCounter = 0
@ -43,14 +45,19 @@ class Relay(
return socket != null
}
@Synchronized
fun requestAndWatch() {
if (socket != null) return
try {
val request = Request.Builder().url(url.trim()).build()
val listener = object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
isReady = true
ping = response.receivedResponseAtMillis - response.sentRequestAtMillis
// Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url")
// Sends everything.
Client.allSubscriptions().forEach {
sendFilter(requestId = it)
@ -65,21 +72,26 @@ class Relay(
val channel = msg[1].asString
when (type) {
"EVENT" -> {
//Log.w("Relay", "Relay onEVENT $url, $channel")
eventDownloadCounter++
val event = Event.fromJson(msg[2], Client.lenient)
listeners.forEach { it.onEvent(this@Relay, channel, event) }
}
"EOSE" -> listeners.forEach {
//Log.w("Relay", "Relay onEOSE $url, $channel")
it.onRelayStateChange(this@Relay, Type.EOSE, channel)
}
"NOTICE" -> listeners.forEach {
//Log.w("Relay", "Relay onNotice $url, $channel")
// "channel" being the second string in the string array ...
it.onError(this@Relay, channel, Error("Relay sent notice: $channel"))
}
"OK" -> listeners.forEach {
//Log.w("Relay", "Relay onOK $url, $channel")
it.onSendResponse(this@Relay, msg[1].asString, msg[2].asBoolean, msg[3].asString)
}
else -> listeners.forEach {
//Log.w("Relay", "Relay something else $url, $channel")
it.onError(
this@Relay,
channel,
@ -105,6 +117,7 @@ class Relay(
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
socket = null
isReady = false
closingTime = Date().time / 1000
listeners.forEach { it.onRelayStateChange(this@Relay, Type.DISCONNECT, null) }
}
@ -115,6 +128,7 @@ class Relay(
socket?.close(1000, "Normal close")
// Failures disconnect the relay.
socket = null
isReady = false
closingTime = Date().time / 1000
Log.w("Relay", "Relay onFailure $url, ${response?.message}")
@ -138,6 +152,7 @@ class Relay(
closingTime = Date().time / 1000
socket?.close(1000, "Normal close")
socket = null
isReady = false
}
fun sendFilter(requestId: String) {
@ -148,12 +163,14 @@ class Relay(
requestAndWatch()
}
} else {
val filters = Client.getSubscriptionFilters(requestId)
if (filters.isNotEmpty()) {
val request =
"""["REQ","$requestId",${filters.take(10).joinToString(",") { it.toJson() }}]"""
//println("FILTERSSENT ${url} " + """["REQ","$requestId",${filters.joinToString(",") { it.toJson() }}]""")
socket?.send(request)
if (isReady) {
val filters = Client.getSubscriptionFilters(requestId)
if (filters.isNotEmpty()) {
val request =
"""["REQ","$requestId",${filters.take(10).joinToString(",") { it.toJson() }}]"""
//println("FILTERSSENT ${url} " + """["REQ","$requestId",${filters.joinToString(",") { it.toJson() }}]""")
socket?.send(request)
}
}
}
}
@ -161,6 +178,7 @@ class Relay(
fun sendFilterOnlyIfDisconnected() {
if (socket == null) {
println("sendfilter Only if Disconnected ${url} ")
requestAndWatch()
}
}