Capturing EOSE events and keeping messages within relay limits (10 subscriptions and 10 REQ events at a time)

This commit is contained in:
Vitor Pamplona 2023-02-02 16:57:03 -05:00
parent fb73308995
commit a82f260142
18 changed files with 106 additions and 135 deletions

View File

@ -51,7 +51,7 @@ class Note(val idHex: String) {
this.mentions = mentions
this.replyTo = replyTo
invalidateData(live)
live.invalidateData()
}
fun formattedDateTime(timestamp: Long): String {
@ -83,22 +83,22 @@ class Note(val idHex: String) {
fun addReply(note: Note) {
if (replies.add(note))
invalidateData(liveReplies)
liveReplies.invalidateData()
}
fun addBoost(note: Note) {
if (boosts.add(note))
invalidateData(liveBoosts)
liveBoosts.invalidateData()
}
fun addReaction(note: Note) {
if (reactions.add(note))
invalidateData(liveReactions)
liveReactions.invalidateData()
}
fun addReport(note: Note) {
if (reports.add(note))
invalidateData(liveReports)
liveReports.invalidateData()
}
fun isReactedBy(user: User): Boolean {
@ -155,24 +155,24 @@ class Note(val idHex: String) {
val liveBoosts: NoteLiveData = NoteLiveData(this)
val liveReplies: NoteLiveData = NoteLiveData(this)
val liveReports: NoteLiveData = NoteLiveData(this)
}
class NoteLiveData(val note: Note): LiveData<NoteState>(NoteState(note)) {
// Refreshes observers in batches.
var handlerWaiting = false
@Synchronized
fun invalidateData(live: NoteLiveData) {
fun invalidateData() {
if (handlerWaiting) return
handlerWaiting = true
val scope = CoroutineScope(Job() + Dispatchers.Default)
scope.launch {
delay(100)
live.refresh()
refresh()
handlerWaiting = false
}
}
}
class NoteLiveData(val note: Note): LiveData<NoteState>(NoteState(note)) {
fun refresh() {
postValue(NoteState(note))
}

View File

@ -43,6 +43,9 @@ class User(val pubkeyHex: String) {
val reports = Collections.synchronizedSet(mutableSetOf<Note>())
val relaysBeingUsed = Collections.synchronizedMap(mutableMapOf<String, RelayInfo>())
var latestMetadataRequestEOSE: Long? = null
var latestReportRequestEOSE: Long? = null
fun toBestDisplayName(): String {
return bestDisplayName() ?: bestUsername() ?: pubkeyDisplayHex
}
@ -64,8 +67,8 @@ class User(val pubkeyHex: String) {
follows.add(user)
user.followers.add(this)
invalidateData(liveFollows)
user.invalidateData(liveFollows)
liveFollows.invalidateData()
user.liveFollows.invalidateData()
listeners.forEach {
it.onFollowsChange()
@ -75,8 +78,8 @@ class User(val pubkeyHex: String) {
follows.remove(user)
user.followers.remove(this)
invalidateData(liveFollows)
user.invalidateData(liveFollows)
liveFollows.invalidateData()
user.liveFollows.invalidateData()
updateSubscribers {
it.onFollowsChange()
@ -91,7 +94,7 @@ class User(val pubkeyHex: String) {
fun addReport(note: Note) {
if (reports.add(note)) {
updateSubscribers { it.onNewReports() }
invalidateData(liveReports)
liveReports.invalidateData()
}
}
@ -118,7 +121,7 @@ class User(val pubkeyHex: String) {
fun addMessage(user: User, msg: Note) {
getOrCreateChannel(user).add(msg)
invalidateData(liveMessages)
liveMessages.invalidateData()
updateSubscribers { it.onNewMessage() }
}
@ -140,7 +143,7 @@ class User(val pubkeyHex: String) {
}
updateSubscribers { it.onNewRelayInfo() }
invalidateData(liveRelayInfo)
liveRelayInfo.invalidateData()
}
fun updateFollows(newFollows: Set<User>, updateAt: Long) {
@ -168,14 +171,14 @@ class User(val pubkeyHex: String) {
}
}
invalidateData(liveRelays)
liveRelays.invalidateData()
}
fun updateUserInfo(newUserInfo: UserMetadata, updateAt: Long) {
info = newUserInfo
updatedMetadataAt = updateAt
invalidateData(liveMetadata)
liveMetadata.invalidateData()
}
fun isFollowing(user: User): Boolean {
@ -248,21 +251,6 @@ class User(val pubkeyHex: String) {
val liveRelays: UserLiveData = UserLiveData(this)
val liveRelayInfo: UserLiveData = UserLiveData(this)
val liveMetadata: UserLiveData = UserLiveData(this)
// Refreshes observers in batches.
var handlerWaiting = false
@Synchronized
fun invalidateData(live: UserLiveData) {
if (handlerWaiting) return
handlerWaiting = true
val scope = CoroutineScope(Job() + Dispatchers.Main)
scope.launch {
delay(100)
live.refresh()
handlerWaiting = false
}
}
}
class UserMetadata {
@ -291,7 +279,23 @@ class UserMetadata {
}
class UserLiveData(val user: User): LiveData<UserState>(UserState(user)) {
fun refresh() {
// Refreshes observers in batches.
var handlerWaiting = false
@Synchronized
fun invalidateData() {
if (handlerWaiting) return
handlerWaiting = true
val scope = CoroutineScope(Job() + Dispatchers.Main)
scope.launch {
delay(100)
refresh()
handlerWaiting = false
}
}
private fun refresh() {
postValue(UserState(user))
}

View File

@ -4,12 +4,12 @@ import java.util.UUID
import nostr.postr.JsonFilter
data class Channel (
val id: String = UUID.randomUUID().toString().substring(0,4)
val id: String = UUID.randomUUID().toString().substring(0,4),
val onEOSE: ((Long) -> Unit)? = null
) {
var filter: List<JsonFilter>? = null // Inactive when null
private var lastEOSE: Long? = null
fun updateEOSE(l: Long) {
lastEOSE = l
onEOSE?.let { it(l) }
}
}

View File

@ -17,7 +17,7 @@ object NostrAccountDataSource: NostrDataSource<Note>("AccountData") {
return JsonFilter(
kinds = listOf(ContactListEvent.kind),
authors = listOf(account.userProfile().pubkeyHex),
limit = 5
limit = 1
)
}
@ -25,12 +25,16 @@ object NostrAccountDataSource: NostrDataSource<Note>("AccountData") {
return JsonFilter(
kinds = listOf(MetadataEvent.kind),
authors = listOf(account.userProfile().pubkeyHex),
limit = 5
limit = 1
)
}
val accountMetadataChannel = requestNewChannel()
val accountContactListChannel = requestNewChannel()
fun createNotificationFilter() = JsonFilter(
tags = mapOf("p" to listOf(account.userProfile().pubkeyHex)),
limit = 100
)
val accountChannel = requestNewChannel()
override fun feed(): List<Note> {
val user = account.userProfile()
@ -49,10 +53,10 @@ object NostrAccountDataSource: NostrDataSource<Note>("AccountData") {
override fun updateChannelFilters() {
// gets everthing about the user logged in
val newAccountMetadataFilter = createAccountMetadataFilter()
accountMetadataChannel.filter = listOf(newAccountMetadataFilter).ifEmpty { null }
val newAccountContactListEvent = createAccountContactListFilter()
accountContactListChannel.filter = listOf(newAccountContactListEvent).ifEmpty { null }
accountChannel.filter = listOf(
createAccountMetadataFilter(),
createAccountContactListFilter(),
createNotificationFilter()
).ifEmpty { null }
}
}

View File

@ -28,8 +28,7 @@ object NostrChatRoomDataSource: NostrDataSource<Note>("ChatroomFeed") {
tags = withUser?.let { mapOf("p" to listOf(it.pubkeyHex)) }
)
val incomingChannel = requestNewChannel()
val outgoingChannel = requestNewChannel()
val inandoutChannel = requestNewChannel()
// returns the last Note of each user.
override fun feed(): List<Note> {
@ -43,7 +42,6 @@ object NostrChatRoomDataSource: NostrDataSource<Note>("ChatroomFeed") {
}
override fun updateChannelFilters() {
incomingChannel.filter = listOf(createMessagesToMeFilter()).ifEmpty { null }
outgoingChannel.filter = listOf(createMessagesFromMeFilter()).ifEmpty { null }
inandoutChannel.filter = listOf(createMessagesToMeFilter(), createMessagesFromMeFilter()).ifEmpty { null }
}
}

View File

@ -52,14 +52,7 @@ object NostrChatroomListDataSource: NostrDataSource<Note>("MailBoxFeed") {
}
}
val incomingChannel = requestNewChannel()
val outgoingChannel = requestNewChannel()
val myChannelsChannel = requestNewChannel()
val myChannelsInfoChannel = requestNewChannel()
val myChannelsMessagesChannel = requestNewChannel()
val myChannelsCreatedbyMeChannel = requestNewChannel()
val chatroomListChannel = requestNewChannel()
// returns the last Note of each user.
override fun feed(): List<Note> {
@ -85,12 +78,16 @@ object NostrChatroomListDataSource: NostrDataSource<Note>("MailBoxFeed") {
}
override fun updateChannelFilters() {
incomingChannel.filter = listOf(createMessagesToMeFilter()).ifEmpty { null }
outgoingChannel.filter = listOf(createMessagesFromMeFilter()).ifEmpty { null }
myChannelsChannel.filter = listOf(createMyChannelsFilter()).ifEmpty { null }
myChannelsInfoChannel.filter = createLastChannelInfoFilter().ifEmpty { null }
myChannelsMessagesChannel.filter = createLastMessageOfEachChannelFilter().ifEmpty { null }
val list = listOf(
createMessagesToMeFilter(),
createMessagesFromMeFilter(),
createMyChannelsFilter()
)
//myChannelsCreatedbyMeChannel.filter = listOf(createChannelsCreatedbyMeFilter()).ifEmpty { null }
chatroomListChannel.filter = listOfNotNull(
list,
createLastChannelInfoFilter(),
createLastMessageOfEachChannelFilter()
).flatten().ifEmpty { null }
}
}

View File

@ -2,6 +2,7 @@ package com.vitorpamplona.amethyst.service
import android.os.Handler
import android.os.Looper
import android.util.Log
import com.vitorpamplona.amethyst.model.LocalCache
import com.vitorpamplona.amethyst.model.Note
import com.vitorpamplona.amethyst.model.UrlCachedPreviewer
@ -88,7 +89,7 @@ abstract class NostrDataSource<T>(val debugName: String) {
}
override fun onError(error: Error, subscriptionId: String, relay: Relay) {
//Log.e("ERROR", "Relay ${relay.url}: ${error.message}")
Log.e("ERROR", "Relay ${relay.url}: ${error.message}")
}
override fun onRelayStateChange(type: Relay.Type, relay: Relay, channel: String?) {
@ -145,8 +146,8 @@ abstract class NostrDataSource<T>(val debugName: String) {
}
}
fun requestNewChannel(): Channel {
val newChannel = Channel(debugName+UUID.randomUUID().toString().substring(0,4))
fun requestNewChannel(onEOSE: ((Long) -> Unit)? = null): Channel {
val newChannel = Channel(debugName+UUID.randomUUID().toString().substring(0,4), onEOSE)
channels.add(newChannel)
channelIds.add(newChannel.id)
return newChannel

View File

@ -4,11 +4,7 @@ import com.vitorpamplona.amethyst.model.Account
import com.vitorpamplona.amethyst.model.LocalCache
import com.vitorpamplona.amethyst.model.Note
import com.vitorpamplona.amethyst.model.User
import com.vitorpamplona.amethyst.model.UserState
import com.vitorpamplona.amethyst.service.model.RepostEvent
import com.vitorpamplona.amethyst.service.relays.Client
import com.vitorpamplona.amethyst.service.relays.Relay
import com.vitorpamplona.amethyst.service.relays.RelayPool
import nostr.postr.JsonFilter
import nostr.postr.events.TextNoteEvent
import nostr.postr.toHex

View File

@ -12,13 +12,6 @@ import nostr.postr.JsonFilter
object NostrNotificationDataSource: NostrDataSource<Note>("NotificationFeed") {
lateinit var account: Account
fun createNotificationFilter() = JsonFilter(
tags = mapOf("p" to listOf(account.userProfile().pubkeyHex)),
limit = 100
)
val notificationChannel = requestNewChannel()
override fun feed(): List<Note> {
val set = account.userProfile().taggedPosts
val filtered = synchronized(set) {
@ -31,7 +24,5 @@ object NostrNotificationDataSource: NostrDataSource<Note>("NotificationFeed") {
}.sortedBy { it.event?.createdAt }.reversed()
}
override fun updateChannelFilters() {
notificationChannel.filter = listOf(createNotificationFilter()).ifEmpty { null }
}
override fun updateChannelFilters() {}
}

View File

@ -46,8 +46,7 @@ object NostrSingleChannelDataSource: NostrDataSource<Note>("SingleChannelFeed")
)
}
val repliesAndReactionsChannel = requestNewChannel()
val loadEventsChannel = requestNewChannel()
val singleChannelChannel = requestNewChannel()
override fun feed(): List<Note> {
return emptyList()
@ -57,8 +56,7 @@ object NostrSingleChannelDataSource: NostrDataSource<Note>("SingleChannelFeed")
val reactions = createRepliesAndReactionsFilter()
val missing = createLoadEventsIfNotLoadedFilter()
repliesAndReactionsChannel.filter = listOfNotNull(reactions).ifEmpty { null }
loadEventsChannel.filter = listOfNotNull(missing).ifEmpty { null }
singleChannelChannel.filter = listOfNotNull(reactions, missing).ifEmpty { null }
}
fun add(eventId: String) {

View File

@ -60,8 +60,7 @@ object NostrSingleEventDataSource: NostrDataSource<Note>("SingleEventFeed") {
)
}
val repliesAndReactionsChannel = requestNewChannel()
val loadEventsChannel = requestNewChannel()
val singleEventChannel = requestNewChannel()
override fun feed(): List<Note> {
return synchronized(eventsToWatch) {
@ -75,8 +74,7 @@ object NostrSingleEventDataSource: NostrDataSource<Note>("SingleEventFeed") {
val reactions = createRepliesAndReactionsFilter()
val missing = createLoadEventsIfNotLoadedFilter()
repliesAndReactionsChannel.filter = listOfNotNull(reactions).ifEmpty { null }
loadEventsChannel.filter = listOfNotNull(missing).ifEmpty { null }
singleEventChannel.filter = listOfNotNull(reactions, missing).ifEmpty { null }
}
fun add(eventId: String) {

View File

@ -34,8 +34,11 @@ object NostrSingleUserDataSource: NostrDataSource<User>("SingleUserFeed") {
}
}
val userChannel = requestNewChannel()
val userReportChannel = requestNewChannel()
val userChannel = requestNewChannel(){
// Many relays operate with limits in the amount of filters.
// As information comes, the filters will be rotated to get more data.
invalidateFilters()
}
override fun feed(): List<User> {
return synchronized(usersToWatch) {
@ -46,8 +49,7 @@ object NostrSingleUserDataSource: NostrDataSource<User>("SingleUserFeed") {
}
override fun updateChannelFilters() {
userChannel.filter = createUserFilter()
userReportChannel.filter = createUserReportFilter()
userChannel.filter = listOfNotNull(createUserFilter()).flatten()
}
fun add(userId: String) {

View File

@ -43,7 +43,6 @@ object NostrThreadDataSource: NostrDataSource<Note>("SingleThreadFeed") {
)
}
val repliesAndReactionsChannel = requestNewChannel()
val loadEventsChannel = requestNewChannel()
override fun feed(): List<Note> {
@ -55,8 +54,7 @@ object NostrThreadDataSource: NostrDataSource<Note>("SingleThreadFeed") {
}
override fun updateChannelFilters() {
repliesAndReactionsChannel.filter = listOfNotNull(createRepliesAndReactionsFilter()).ifEmpty { null }
loadEventsChannel.filter = listOfNotNull(createLoadEventsIfNotLoadedFilter()).ifEmpty { null }
loadEventsChannel.filter = listOfNotNull(createLoadEventsIfNotLoadedFilter(), createRepliesAndReactionsFilter()).ifEmpty { null }
}
fun searchRoot(note: Note, testedNotes: MutableSet<Note> = mutableSetOf()): Note? {

View File

@ -6,6 +6,7 @@ import com.vitorpamplona.amethyst.model.Note
import com.vitorpamplona.amethyst.model.User
import com.vitorpamplona.amethyst.model.toByteArray
import nostr.postr.JsonFilter
import nostr.postr.events.ContactListEvent
import nostr.postr.events.MetadataEvent
import nostr.postr.events.TextNoteEvent
@ -34,8 +35,20 @@ object NostrUserProfileDataSource: NostrDataSource<Note>("UserProfileFeed") {
)
}
fun createFollowFilter(): JsonFilter {
return JsonFilter(
kinds = listOf(ContactListEvent.kind),
authors = listOf(user!!.pubkeyHex),
limit = 1
)
}
fun createFollowersFilter() = JsonFilter(
kinds = listOf(ContactListEvent.kind),
tags = mapOf("p" to listOf(user!!.pubkeyHex))
)
val userInfoChannel = requestNewChannel()
val notesChannel = requestNewChannel()
override fun feed(): List<Note> {
val notes = user?.notes ?: return emptyList()
@ -46,7 +59,11 @@ object NostrUserProfileDataSource: NostrDataSource<Note>("UserProfileFeed") {
}
override fun updateChannelFilters() {
userInfoChannel.filter = listOf(createUserInfoFilter()).ifEmpty { null }
notesChannel.filter = listOf(createUserPostsFilter()).ifEmpty { null }
userInfoChannel.filter = listOf(
createUserInfoFilter(),
createUserPostsFilter(),
createFollowFilter(),
createFollowersFilter()
).ifEmpty { null }
}
}

View File

@ -15,13 +15,6 @@ object NostrUserProfileFollowersDataSource: NostrDataSource<User>("UserProfileFo
resetFilters()
}
fun createFollowersFilter() = JsonFilter(
kinds = listOf(ContactListEvent.kind),
tags = mapOf("p" to listOf(user!!.pubkeyHex))
)
val followerChannel = requestNewChannel()
override fun feed(): List<User> {
val followers = user?.followers ?: emptyList()
@ -30,7 +23,5 @@ object NostrUserProfileFollowersDataSource: NostrDataSource<User>("UserProfileFo
}
}
override fun updateChannelFilters() {
followerChannel.filter = listOf(createFollowersFilter()).ifEmpty { null }
}
override fun updateChannelFilters() {}
}

View File

@ -15,16 +15,6 @@ object NostrUserProfileFollowsDataSource: NostrDataSource<User>("UserProfileFoll
resetFilters()
}
fun createFollowFilter(): JsonFilter {
return JsonFilter(
kinds = listOf(ContactListEvent.kind),
authors = listOf(user!!.pubkeyHex),
limit = 1
)
}
val followChannel = requestNewChannel()
override fun feed(): List<User> {
val follows = user?.follows ?: emptyList()
@ -33,7 +23,5 @@ object NostrUserProfileFollowsDataSource: NostrDataSource<User>("UserProfileFoll
}
}
override fun updateChannelFilters() {
followChannel.filter = listOf(createFollowFilter()).ifEmpty { null }
}
override fun updateChannelFilters() {}
}

View File

@ -143,7 +143,7 @@ class Relay(
val filters = Client.getSubscriptionFilters(requestId)
if (filters.isNotEmpty()) {
val request =
"""["REQ","$requestId",${filters.joinToString(",") { it.toJson() }}]"""
"""["REQ","$requestId",${filters.take(10).joinToString(",") { it.toJson() }}]"""
//println("FILTERSSENT ${url} " + """["REQ","$requestId",${filters.joinToString(",") { it.toJson() }}]""")
socket?.send(request)
}

View File

@ -9,14 +9,9 @@ import androidx.compose.material.TabRow
import androidx.compose.material.TabRowDefaults
import androidx.compose.material.Text
import androidx.compose.runtime.Composable
import androidx.compose.runtime.DisposableEffect
import androidx.compose.runtime.LaunchedEffect
import androidx.compose.runtime.getValue
import androidx.compose.runtime.livedata.observeAsState
import androidx.compose.runtime.remember
import androidx.compose.runtime.rememberCoroutineScope
import androidx.compose.ui.Modifier
import androidx.compose.ui.platform.LocalContext
import androidx.compose.ui.unit.dp
import androidx.lifecycle.viewmodel.compose.viewModel
import androidx.navigation.NavController
@ -24,15 +19,8 @@ import com.google.accompanist.pager.ExperimentalPagerApi
import com.google.accompanist.pager.HorizontalPager
import com.google.accompanist.pager.pagerTabIndicatorOffset
import com.google.accompanist.pager.rememberPagerState
import com.vitorpamplona.amethyst.NotificationCache
import com.vitorpamplona.amethyst.service.NostrHomeDataSource
import com.vitorpamplona.amethyst.service.NostrUserProfileDataSource
import com.vitorpamplona.amethyst.service.NostrUserProfileFollowersDataSource
import com.vitorpamplona.amethyst.service.NostrUserProfileFollowsDataSource
import com.vitorpamplona.amethyst.ui.navigation.Route
import com.vitorpamplona.amethyst.ui.navigation.Routes
import com.vitorpamplona.amethyst.ui.screen.loggedIn.AccountViewModel
import java.lang.System.currentTimeMillis
import kotlinx.coroutines.launch
@OptIn(ExperimentalPagerApi::class)