* Properly reconnect to relays, #21

* Wait for relay acknowledgements when publishing events
This commit is contained in:
styppo 2023-01-27 15:47:21 +00:00
parent 7514a48e63
commit 2422782015
No known key found for this signature in database
GPG Key ID: 3AAA685C50724C28
12 changed files with 164 additions and 74 deletions

View File

@ -114,29 +114,30 @@ export default {
}, },
async publishPost() { async publishPost() {
this.publishing = true this.publishing = true
try {
const event = this.ancestor const event = this.ancestor
? EventBuilder.reply(this.ancestor, this.app.myPubkey, this.content).build() ? EventBuilder.reply(this.ancestor, this.app.myPubkey, this.content).build()
: EventBuilder.post(this.app.myPubkey, this.content).build() : EventBuilder.post(this.app.myPubkey, this.content).build()
if (!await this.app.signEvent(event)) return if (!await this.app.signEvent(event)) return
this.nostr.publish(event)
const numRelays = await this.nostr.publish(event)
if (numRelays) {
this.reset() this.reset()
this.$emit('publish', event) this.$emit('publish', event)
// TODO i18n // TODO i18n
const postType = this.ancestor ? 'Reply' : 'Post' const postType = this.ancestor ? 'Reply' : 'Post'
this.$q.notify({ this.$q.notify({
message: `${postType} published`, message: `${postType} published to ${numRelays} relays`,
color: 'positive', color: 'positive',
}) })
} catch (e) { } else {
console.error('Failed to publish post', e)
this.$q.notify({ this.$q.notify({
message: `Failed to publish post`, message: `Failed to publish post`,
color: 'negative' color: 'negative'
}) })
} }
this.publishing = false this.publishing = false
}, },
}, },

View File

@ -94,24 +94,23 @@ export default {
}, },
async publishMessage() { async publishMessage() {
this.publishing = true this.publishing = true
try {
const ciphertext = await this.app.encryptMessage(this.recipient, this.content) const ciphertext = await this.app.encryptMessage(this.recipient, this.content)
if (!ciphertext) return if (!ciphertext) return
const event = EventBuilder.message(this.app.myPubkey, this.recipient, ciphertext).build() const event = EventBuilder.message(this.app.myPubkey, this.recipient, ciphertext).build()
if (!await this.app.signEvent(event)) return if (!await this.app.signEvent(event)) return
this.nostr.publish(event)
if (await this.nostr.publish(event)) {
this.reset() this.reset()
this.$nextTick(this.focus.bind(this)) this.$nextTick(this.focus.bind(this))
this.$emit('publish', event) this.$emit('publish', event)
} catch (e) { } else {
console.error('Failed to send message', e)
this.$q.notify({ this.$q.notify({
message: `Failed to send message`, message: `Failed to send message`,
color: 'negative' color: 'negative'
}) })
} }
this.publishing = false this.publishing = false
}, },
}, },

View File

@ -76,13 +76,23 @@ export default {
async publishLike() { async publishLike() {
const event = EventBuilder.reaction(this.note, this.app.myPubkey).build() const event = EventBuilder.reaction(this.note, this.app.myPubkey).build()
if (!await this.app.signEvent(event)) return if (!await this.app.signEvent(event)) return
this.nostr.publish(event) if (!await this.nostr.publish(event)) {
this.$q.notify({
message: 'Failed to publish reaction',
color: 'negative',
})
}
}, },
async deleteLike() { async deleteLike() {
const ids = this.ourReactions.map(r => r.id) const ids = this.ourReactions.map(r => r.id)
const event = EventBuilder.delete(this.app.myPubkey, ids).build() const event = EventBuilder.delete(this.app.myPubkey, ids).build()
if (!await this.app.signEvent(event)) return if (!await this.app.signEvent(event)) return
this.nostr.publish(event) if (!await this.nostr.publish(event)) {
this.$q.notify({
message: 'Failed to delete reaction',
color: 'negative',
})
}
}, },
}, },
} }

View File

@ -78,7 +78,12 @@ export default {
} }
const event = EventBuilder.metadata(this.pubkey, metadata).build() const event = EventBuilder.metadata(this.pubkey, metadata).build()
if (!await this.app.signEvent(event)) return if (!await this.app.signEvent(event)) return
this.nostr.publish(event) if (!await this.nostr.publish(event)) {
this.$q.notify({
message: 'Failed to update profile',
color: 'negative'
})
}
}, },
}, },
watch: { watch: {

View File

@ -41,12 +41,17 @@ export default {
const event = EventBuilder.metadata(account.pubkey, {name: this.username}).build() const event = EventBuilder.metadata(account.pubkey, {name: this.username}).build()
await useAppStore().signEvent(event) await useAppStore().signEvent(event)
useNostrStore().publish(event) if (await useNostrStore().publish(event)) {
this.$emit('complete', { this.$emit('complete', {
pubkey: account.pubkey, pubkey: account.pubkey,
name: this.username name: this.username
}) })
} else {
this.$q.notify({
message: 'Failed to create profile',
color: 'negative',
})
}
} }
}, },
mounted() { mounted() {

View File

@ -41,7 +41,12 @@ export default {
async updateContacts(contacts) { async updateContacts(contacts) {
const event = EventBuilder.contacts(this.app.myPubkey, contacts.map(c => c.pubkey)).build() const event = EventBuilder.contacts(this.app.myPubkey, contacts.map(c => c.pubkey)).build()
if (!await this.app.signEvent(event)) return if (!await this.app.signEvent(event)) return
this.nostr.publish(event) if (!await this.nostr.publish(event)) {
this.$q.notify({
message: 'Failed to update followers',
color: 'negative',
})
}
}, },
toggleFollow() { toggleFollow() {
return this.isFollowing return this.isFollowing

View File

@ -1,4 +1,4 @@
import {Observable} from 'src/nostr/utils' import {Observable} from 'src/nostr/Observable'
export default class FetchQueue extends Observable { export default class FetchQueue extends Observable {
constructor(client, subId, fnGetId, fnCreateFilter, opts = {}) { constructor(client, subId, fnGetId, fnCreateFilter, opts = {}) {

View File

@ -10,7 +10,7 @@ import {useSettingsStore} from 'stores/Settings'
import {useStatStore} from 'src/nostr/store/StatStore' import {useStatStore} from 'src/nostr/store/StatStore'
import {useAppStore} from 'stores/App' import {useAppStore} from 'stores/App'
import {useMessageStore} from 'src/nostr/store/MessageStore' import {useMessageStore} from 'src/nostr/store/MessageStore'
import {Observable} from 'src/nostr/utils' import {Observable} from 'src/nostr/Observable'
import {CloseAfter} from 'src/nostr/Relay' import {CloseAfter} from 'src/nostr/Relay'
import DateUtils from 'src/utils/DateUtils' import DateUtils from 'src/utils/DateUtils'
@ -137,10 +137,13 @@ export const useNostrStore = defineStore('nostr', {
return !!this.seenBy[id] return !!this.seenBy[id]
}, },
publish(event) { async publish(event) {
const result = await this.client.publish(event)
if (result) {
// FIXME represent 'local' somehow // FIXME represent 'local' somehow
this.addEvent(event, {url: '<local>'}) this.addEvent(event, {url: '<local>'})
return this.client.publish(event) }
return result
}, },
subscribeForUser(pubkey) { subscribeForUser(pubkey) {

43
src/nostr/Observable.js Normal file
View File

@ -0,0 +1,43 @@
export class Observable {
constructor() {
this.listeners = {}
}
on(event, callback) {
this.addListener(event, {callback, once: false})
}
once(event, callback) {
this.addListener(event, {callback, once: true})
}
off(event, callback) {
const listeners = this.listeners[event]
if (!listeners) return
const idx = listeners.findIndex(listener => listener.callback === callback)
if (idx >= 0) listeners.splice(idx, 1)
}
addListener(event, listener) {
if (!this.listeners[event]) {
this.listeners[event] = [listener]
} else {
this.listeners[event].push(listener)
}
}
emit(event, ...args) {
const listeners = this.listeners[event]
if (!listeners) return
for (const listener of listeners) {
try {
listener.callback.apply(null, args)
} catch (e) {
console.error(`Exception thrown from '${event}' listener: ${e.message || e}`, e)
}
}
this.listeners[event] = listeners.filter(listener => !listener.once)
}
}

View File

@ -1,4 +1,4 @@
import {Observable} from 'src/nostr/utils' import {Observable} from 'src/nostr/Observable'
import Event from 'src/nostr/model/Event' import Event from 'src/nostr/model/Event'
export class Subscription extends Observable { export class Subscription extends Observable {
@ -75,7 +75,26 @@ export class Relay extends Observable {
} }
publish(event) { publish(event) {
this.socket.send(['EVENT', event]) return new Promise(resolve => {
if (!this.socket.send(['EVENT', event])) {
return resolve(false)
}
let timeout
const callback = (eventId, wasSaved) => {
if (eventId === event.id && wasSaved) {
clearTimeout(timeout)
this.off('ok', callback)
resolve(true)
}
}
timeout = setTimeout(() => {
this.off('ok', callback)
resolve(false)
}, 4000) // TODO make this a parameter
this.on('ok', callback)
})
} }
subscribe(filters, subId = null, closeAfter = CloseAfter.NEVER) { subscribe(filters, subId = null, closeAfter = CloseAfter.NEVER) {
@ -176,11 +195,15 @@ class ReconnectingWebSocket extends Observable {
this.disconnected = false this.disconnected = false
this.reconnectAfter = this.opts.reconnectAfter this.reconnectAfter = this.opts.reconnectAfter
this.reconnectTimer = null this.reconnectTimer = null
window.addEventListener('online', this.connect.bind(this))
window.addEventListener('focus', this.connect.bind(this))
} }
connect() { connect() {
if (this.socket) return if (this.isConnected()) return
this.disconnected = false this.disconnected = false
this.reconnectTimer = null
const ws = new WebSocket(this.url) const ws = new WebSocket(this.url)
ws.onopen = this.onOpen.bind(this) ws.onopen = this.onOpen.bind(this)
@ -192,36 +215,47 @@ class ReconnectingWebSocket extends Observable {
disconnect() { disconnect() {
this.disconnected = true this.disconnected = true
if (this.socket) this.socket.close() this.close()
this.socket = null
} }
reconnect() { reconnect() {
if (this.disconnected || this.reconnectTimer) return if (this.disconnected || this.reconnectTimer) return
console.log(`[RELAY] Scheduling reconnect to ${this.url} in ${this.reconnectAfter}ms`)
this.reconnectTimer = setTimeout( this.reconnectTimer = setTimeout(
() => { () => {
this.connect() console.log(`[RELAY] Reconnecting to ${this.url} now`)
this.reconnectTimer = null this.reconnectTimer = null
this.connect()
}, },
this.reconnectAfter this.reconnectAfter
) )
this.reconnectAfter *= 2 this.reconnectAfter = Math.min(this.reconnectAfter *= 2, 1000 * 60 * 5)
} }
isConnected() { isConnected() {
return this.socket && this.socket.readyState === WebSocket.OPEN return this.socket && this.socket.readyState === WebSocket.OPEN
} }
close() {
if (this.socket) this.socket.close()
this.socket = null
}
send(message) { send(message) {
// TODO Wait for connected? // TODO Wait for connected?
if (!this.isConnected()) { if (!this.isConnected()) {
console.warn(`Not connected to ${this.url} (currently ${this.socket?.readyState})`) console.warn(`Not connected to ${this.url} (currently ${this.socket?.readyState})`)
return return false
} }
try {
this.socket.send(JSON.stringify(message)) this.socket.send(JSON.stringify(message))
return true
} catch (e) {
return false
}
} }
onOpen() { onOpen() {
@ -230,16 +264,20 @@ class ReconnectingWebSocket extends Observable {
} }
onClose() { onClose() {
this.close()
this.emit('close', this) this.emit('close', this)
if (this.opts.reconnect) this.reconnect() if (this.opts.reconnect) this.reconnect()
} }
onError(error) { onError(error) {
console.log(`Socket error from relay ${this.url}`, error) console.log(`Socket error from relay ${this.url}`, error)
this.emit('error', error, this) this.emit('error', error, this)
if (!this.isConnected()) {
this.close()
if (this.opts.reconnect) this.reconnect() if (this.opts.reconnect) this.reconnect()
} }
}
onMessage(message) { onMessage(message) {
this.emit('message', message, this) this.emit('message', message, this)

View File

@ -1,5 +1,5 @@
import {CloseAfter, Relay} from 'src/nostr/Relay' import {CloseAfter, Relay} from 'src/nostr/Relay'
import {Observable} from 'src/nostr/utils' import {Observable} from 'src/nostr/Observable'
class MultiSubscription extends Observable { class MultiSubscription extends Observable {
constructor(subId, subs) { constructor(subId, subs) {
@ -95,10 +95,17 @@ export default class ReplayPool extends Observable {
delete this.relays[url] delete this.relays[url]
} }
publish(event) { async publish(event) {
const promises = []
for (const relay of this.connectedRelays()) { for (const relay of this.connectedRelays()) {
relay.publish(event) promises.push(relay.publish(event))
} }
return Promise.all(promises)
.then(results => results.filter(res => res).length)
.catch(e => {
console.error('Error while publishing', e)
return 0
})
} }
subscribe(filters, subId = null, closeAfter = CloseAfter.NEVER) { subscribe(filters, subId = null, closeAfter = CloseAfter.NEVER) {

View File

@ -1,26 +0,0 @@
export class Observable {
constructor() {
this.listeners = {}
}
on(event, callback) {
if (!this.listeners[event]) {
this.listeners[event] = [callback]
} else {
this.listeners[event].push(callback)
}
}
emit(event, ...args) {
const listeners = this.listeners[event]
if (!listeners) return
for (const listener of listeners) {
try {
listener.apply(null, args)
} catch (e) {
console.error(`Exception thrown from '${event}' listener: ${e.message || e}`, e)
}
}
}
}