mirror of
https://github.com/vitorpamplona/amethyst.git
synced 2024-09-29 16:30:49 +00:00
Merge remote-tracking branch 'origin/better_use_of_eose_at_memory_cost'
This commit is contained in:
commit
0980208cce
@ -46,7 +46,7 @@ open class Note(val idHex: String) {
|
||||
var relays = setOf<String>()
|
||||
private set
|
||||
|
||||
var lastReactionsDownloadTime: Long? = null
|
||||
var lastReactionsDownloadTime: Map<String, Long> = emptyMap()
|
||||
|
||||
fun id() = Hex.decode(idHex)
|
||||
open fun idNote() = id().toNote()
|
||||
|
@ -31,7 +31,7 @@ class User(val pubkeyHex: String) {
|
||||
var reports = mapOf<User, Set<Note>>()
|
||||
private set
|
||||
|
||||
var latestReportTime: Long = 0
|
||||
var latestEOSEs: Map<String, Long> = emptyMap()
|
||||
|
||||
var zaps = mapOf<Note, Note?>()
|
||||
private set
|
||||
@ -124,11 +124,6 @@ class User(val pubkeyHex: String) {
|
||||
reports = reports + Pair(author, (reports[author] ?: emptySet()) + note)
|
||||
liveSet?.reports?.invalidateData()
|
||||
}
|
||||
|
||||
val reportTime = note.createdAt() ?: 0
|
||||
if (reportTime > latestReportTime) {
|
||||
latestReportTime = reportTime
|
||||
}
|
||||
}
|
||||
|
||||
fun removeReport(deleteNote: Note) {
|
||||
|
@ -114,7 +114,7 @@ abstract class NostrDataSource(val debugName: String) {
|
||||
|
||||
if (type == Relay.Type.EOSE && channel != null) {
|
||||
// updates a per subscripton since date
|
||||
subscriptions[channel]?.updateEOSE(Date().time / 1000)
|
||||
subscriptions[channel]?.updateEOSE(Date().time / 1000, relay.url)
|
||||
}
|
||||
}
|
||||
|
||||
@ -139,7 +139,7 @@ abstract class NostrDataSource(val debugName: String) {
|
||||
}
|
||||
}
|
||||
|
||||
fun requestNewChannel(onEOSE: ((Long) -> Unit)? = null): Subscription {
|
||||
fun requestNewChannel(onEOSE: ((Long, String) -> Unit)? = null): Subscription {
|
||||
val newSubscription = Subscription(UUID.randomUUID().toString().substring(0, 4), onEOSE)
|
||||
subscriptions = subscriptions + Pair(newSubscription.id, newSubscription)
|
||||
return newSubscription
|
||||
|
@ -33,10 +33,7 @@ object NostrSingleEventDataSource : NostrDataSource("SingleEventFeed") {
|
||||
|
||||
val now = Date().time / 1000
|
||||
|
||||
return addressesToWatch.filter {
|
||||
val lastTime = it.lastReactionsDownloadTime
|
||||
lastTime == null || lastTime < (now - 10)
|
||||
}.mapNotNull {
|
||||
return addressesToWatch.mapNotNull {
|
||||
it.address()?.let { aTag ->
|
||||
TypedFilter(
|
||||
types = FeedType.values().toSet(),
|
||||
@ -64,10 +61,7 @@ object NostrSingleEventDataSource : NostrDataSource("SingleEventFeed") {
|
||||
|
||||
val now = Date().time / 1000
|
||||
|
||||
return addressesToWatch.filter {
|
||||
val lastTime = it.lastReactionsDownloadTime
|
||||
lastTime == null || lastTime < (now - 10)
|
||||
}.mapNotNull {
|
||||
return addressesToWatch.mapNotNull {
|
||||
it.address()?.let { aTag ->
|
||||
TypedFilter(
|
||||
types = FeedType.values().toSet(),
|
||||
@ -90,10 +84,7 @@ object NostrSingleEventDataSource : NostrDataSource("SingleEventFeed") {
|
||||
|
||||
val now = Date().time / 1000
|
||||
|
||||
return reactionsToWatch.filter {
|
||||
val lastTime = it.lastReactionsDownloadTime
|
||||
lastTime == null || lastTime < (now - 10)
|
||||
}.map {
|
||||
return reactionsToWatch.map {
|
||||
TypedFilter(
|
||||
types = FeedType.values().toSet(),
|
||||
filter = JsonFilter(
|
||||
@ -145,9 +136,9 @@ object NostrSingleEventDataSource : NostrDataSource("SingleEventFeed") {
|
||||
)
|
||||
}
|
||||
|
||||
val singleEventChannel = requestNewChannel { time ->
|
||||
val singleEventChannel = requestNewChannel { time, relayUrl ->
|
||||
eventsToWatch.forEach {
|
||||
it.lastReactionsDownloadTime = time
|
||||
it.lastReactionsDownloadTime = it.lastReactionsDownloadTime + Pair(relayUrl, time)
|
||||
}
|
||||
// Many relays operate with limits in the amount of filters.
|
||||
// As information comes, the filters will be rotated to get more data.
|
||||
|
@ -34,13 +34,16 @@ object NostrSingleUserDataSource : NostrDataSource("SingleUserFeed") {
|
||||
filter = JsonFilter(
|
||||
kinds = listOf(ReportEvent.kind),
|
||||
tags = mapOf("p" to listOf(it.pubkeyHex)),
|
||||
since = it.latestReportTime
|
||||
since = it.latestEOSEs
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
val userChannel = requestNewChannel() {
|
||||
val userChannel = requestNewChannel() { time, relayUrl ->
|
||||
usersToWatch.forEach {
|
||||
it.latestEOSEs = it.latestEOSEs + Pair(relayUrl, time)
|
||||
}
|
||||
// Many relays operate with limits in the amount of filters.
|
||||
// As information comes, the filters will be rotated to get more data.
|
||||
invalidateFilters()
|
||||
|
@ -25,7 +25,7 @@ object NostrThreadDataSource : NostrDataSource("SingleThreadFeed") {
|
||||
)
|
||||
}
|
||||
|
||||
val loadEventsChannel = requestNewChannel() {
|
||||
val loadEventsChannel = requestNewChannel() { eoseTime, relay ->
|
||||
// Many relays operate with limits in the amount of filters.
|
||||
// As information comes, the filters will be rotated to get more data.
|
||||
invalidateFilters()
|
||||
|
@ -4,26 +4,19 @@ import com.google.gson.Gson
|
||||
import com.google.gson.GsonBuilder
|
||||
import com.google.gson.JsonArray
|
||||
import com.google.gson.JsonObject
|
||||
import com.vitorpamplona.amethyst.service.model.Event
|
||||
import java.io.Serializable
|
||||
import java.util.*
|
||||
|
||||
interface Filter {
|
||||
fun match(event: Event): Boolean
|
||||
fun toShortString(): String
|
||||
}
|
||||
|
||||
class JsonFilter(
|
||||
val ids: List<String>? = null,
|
||||
val authors: List<String>? = null,
|
||||
val kinds: List<Int>? = null,
|
||||
val tags: Map<String, List<String>>? = null,
|
||||
val since: Long? = null,
|
||||
val since: Map<String, Long>? = null,
|
||||
val until: Long? = null,
|
||||
val limit: Int? = null,
|
||||
val search: String? = null
|
||||
) : Filter, Serializable {
|
||||
fun toJson(): String {
|
||||
) {
|
||||
fun toJson(forRelay: String? = null): String {
|
||||
val jsonObject = JsonObject()
|
||||
ids?.run {
|
||||
jsonObject.add("ids", JsonArray().apply { ids.forEach { add(it) } })
|
||||
@ -40,7 +33,20 @@ class JsonFilter(
|
||||
}
|
||||
}
|
||||
since?.run {
|
||||
jsonObject.addProperty("since", since)
|
||||
if (!isEmpty()) {
|
||||
if (forRelay != null) {
|
||||
val relaySince = get(forRelay)
|
||||
if (relaySince != null) {
|
||||
jsonObject.addProperty("since", relaySince)
|
||||
}
|
||||
} else {
|
||||
val jsonObjectSince = JsonObject()
|
||||
entries.forEach { sincePairs ->
|
||||
jsonObjectSince.addProperty(sincePairs.key, "${sincePairs.value}")
|
||||
}
|
||||
jsonObject.add("since", jsonObjectSince)
|
||||
}
|
||||
}
|
||||
}
|
||||
until?.run {
|
||||
jsonObject.addProperty("until", until)
|
||||
@ -54,90 +60,7 @@ class JsonFilter(
|
||||
return gson.toJson(jsonObject)
|
||||
}
|
||||
|
||||
override fun match(event: Event): Boolean {
|
||||
if (ids?.any { event.id == it } == false) return false
|
||||
if (kinds?.any { event.kind == it } == false) return false
|
||||
if (authors?.any { event.pubKey == it } == false) return false
|
||||
tags?.forEach { tag ->
|
||||
if (!event.tags.any { it.first() == tag.key && it[1] in tag.value }) return false
|
||||
}
|
||||
if (event.createdAt !in (since ?: Long.MIN_VALUE)..(until ?: Long.MAX_VALUE)) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
override fun toString(): String = "JsonFilter${toJson()}"
|
||||
|
||||
override fun toShortString(): String {
|
||||
val list = ArrayList<String>()
|
||||
ids?.run {
|
||||
list.add("ids")
|
||||
}
|
||||
authors?.run {
|
||||
list.add("authors")
|
||||
}
|
||||
kinds?.run {
|
||||
list.add("kinds[${kinds.joinToString()}]")
|
||||
}
|
||||
tags?.run {
|
||||
list.add("tags")
|
||||
}
|
||||
since?.run {
|
||||
list.add("since")
|
||||
}
|
||||
until?.run {
|
||||
list.add("until")
|
||||
}
|
||||
limit?.run {
|
||||
list.add("limit")
|
||||
}
|
||||
search?.run {
|
||||
list.add("search")
|
||||
}
|
||||
return list.joinToString()
|
||||
}
|
||||
|
||||
companion object {
|
||||
val gson: Gson = GsonBuilder().create()
|
||||
|
||||
fun fromJson(json: String): JsonFilter {
|
||||
val jsonFilter = gson.fromJson(json, JsonObject::class.java)
|
||||
return fromJson(jsonFilter)
|
||||
}
|
||||
|
||||
val declaredFields = JsonFilter::class.java.declaredFields.map { it.name }
|
||||
fun fromJson(json: JsonObject): JsonFilter {
|
||||
// sanity check
|
||||
if (json.keySet().any { !(it.startsWith("#") || it in declaredFields) }) {
|
||||
println("Filter $json contains unknown parameters.")
|
||||
}
|
||||
return JsonFilter(
|
||||
ids = if (json.has("ids")) json.getAsJsonArray("ids").map { it.asString } else null,
|
||||
authors = if (json.has("authors")) {
|
||||
json.getAsJsonArray("authors")
|
||||
.map { it.asString }
|
||||
} else {
|
||||
null
|
||||
},
|
||||
kinds = if (json.has("kinds")) {
|
||||
json.getAsJsonArray("kinds")
|
||||
.map { it.asInt }
|
||||
} else {
|
||||
null
|
||||
},
|
||||
tags = json
|
||||
.entrySet()
|
||||
.filter { it.key.startsWith("#") }
|
||||
.associate {
|
||||
it.key.substring(1) to it.value.asJsonArray.map { it.asString }
|
||||
}
|
||||
.ifEmpty { null },
|
||||
since = if (json.has("since")) json.get("since").asLong else null,
|
||||
until = if (json.has("until")) json.get("until").asLong else null,
|
||||
limit = if (json.has("limit")) json.get("limit").asInt else null,
|
||||
search = if (json.has("search")) json.get("search").asString else null
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -92,6 +92,11 @@ class Relay(
|
||||
// Log.w("Relay", "Relay onEVENT $url, $channel")
|
||||
eventDownloadCounterInBytes += text.bytesUsedInMemory()
|
||||
val event = Event.fromJson(msg[2], Client.lenient)
|
||||
|
||||
if (event.kind == 23195 || event.kind == 23196) {
|
||||
println("AAAAA ${event.toJson()}")
|
||||
}
|
||||
|
||||
listeners.forEach { it.onEvent(this@Relay, channel, event) }
|
||||
}
|
||||
"EOSE" -> listeners.forEach {
|
||||
@ -183,8 +188,8 @@ class Relay(
|
||||
val filters = Client.getSubscriptionFilters(requestId).filter { activeTypes.intersect(it.types).isNotEmpty() }
|
||||
if (filters.isNotEmpty()) {
|
||||
val request =
|
||||
"""["REQ","$requestId",${filters.take(10).joinToString(",") { it.filter.toJson() }}]"""
|
||||
// println("FILTERSSENT ${url} ${request}")
|
||||
"""["REQ","$requestId",${filters.take(10).joinToString(",") { it.filter.toJson(url) }}]"""
|
||||
// println("FILTERSSENT $url $request")
|
||||
socket?.send(request)
|
||||
eventUploadCounterInBytes += request.bytesUsedInMemory()
|
||||
}
|
||||
|
@ -7,12 +7,12 @@ import java.util.UUID
|
||||
|
||||
data class Subscription(
|
||||
val id: String = UUID.randomUUID().toString().substring(0, 4),
|
||||
val onEOSE: ((Long) -> Unit)? = null
|
||||
val onEOSE: ((Long, String) -> Unit)? = null
|
||||
) {
|
||||
var typedFilters: List<TypedFilter>? = null // Inactive when null
|
||||
|
||||
fun updateEOSE(l: Long) {
|
||||
onEOSE?.let { it(l) }
|
||||
fun updateEOSE(time: Long, relay: String) {
|
||||
onEOSE?.let { it(time, relay) }
|
||||
}
|
||||
|
||||
fun toJson(): String {
|
||||
|
@ -40,9 +40,15 @@ class TypedFilter(
|
||||
jsonObject.add("#${kv.key}", JsonArray().apply { kv.value.forEach { add(it) } })
|
||||
}
|
||||
}
|
||||
/*
|
||||
Does not include since in the json comparison
|
||||
filter.since?.run {
|
||||
jsonObject.addProperty("since", filter.since)
|
||||
val jsonObjectSince = JsonObject()
|
||||
entries.forEach { sincePairs ->
|
||||
jsonObjectSince.addProperty(sincePairs.key, "${sincePairs.value}")
|
||||
}
|
||||
jsonObject.add("since", jsonObjectSince)
|
||||
}*/
|
||||
filter.until?.run {
|
||||
jsonObject.addProperty("until", filter.until)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user