Merge remote-tracking branch 'dilger/unstable' into updated-egui-test

# Conflicts:
#	gossip-bin/Cargo.toml
This commit is contained in:
Bu5hm4nn 2024-02-12 12:30:45 -06:00
commit f48728b141
10 changed files with 402 additions and 235 deletions

206
Cargo.lock generated
View File

@ -1770,7 +1770,7 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181"
dependencies = [
"spin 0.9.8",
"spin",
]
[[package]]
@ -2182,7 +2182,7 @@ dependencies = [
"gossip-relay-picker",
"heed",
"hex",
"http",
"http 1.0.0",
"image",
"kamadak-exif",
"lazy_static",
@ -2216,7 +2216,7 @@ dependencies = [
[[package]]
name = "gossip-relay-picker"
version = "0.2.0-unstable"
source = "git+https://github.com/mikedilger/gossip-relay-picker?rev=1d13cacb0d7b32d72d0c8c27414461d5389ca2ec#1d13cacb0d7b32d72d0c8c27414461d5389ca2ec"
source = "git+https://github.com/mikedilger/gossip-relay-picker?rev=672a6a27ab688170a154aba1feb16d6c331b99ed#672a6a27ab688170a154aba1feb16d6c331b99ed"
dependencies = [
"async-trait",
"dashmap",
@ -2289,7 +2289,7 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"http",
"http 0.2.11",
"indexmap",
"slab",
"tokio",
@ -2440,6 +2440,17 @@ dependencies = [
"itoa",
]
[[package]]
name = "http"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.6"
@ -2447,7 +2458,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http",
"http 0.2.11",
"pin-project-lite",
]
@ -2483,7 +2494,7 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http",
"http 0.2.11",
"http-body",
"httparse",
"httpdate",
@ -2503,11 +2514,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
dependencies = [
"futures-util",
"http",
"http 0.2.11",
"hyper",
"rustls",
"rustls 0.21.10",
"tokio",
"tokio-rustls",
"tokio-rustls 0.24.1",
]
[[package]]
@ -3176,7 +3187,7 @@ dependencies = [
"derive_more",
"hex",
"hmac",
"http",
"http 0.2.11",
"inout",
"lazy_static",
"lightning-invoice",
@ -3938,7 +3949,7 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http",
"http 0.2.11",
"http-body",
"hyper",
"hyper-rustls",
@ -3951,9 +3962,9 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls-native-certs",
"rustls-pemfile",
"rustls 0.21.10",
"rustls-native-certs 0.6.3",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"serde_urlencoded",
@ -3961,7 +3972,7 @@ dependencies = [
"system-configuration",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-rustls 0.24.1",
"tokio-util",
"tower-service",
"url",
@ -4026,21 +4037,6 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
dependencies = [
"cc",
"libc",
"once_cell",
"spin 0.5.2",
"untrusted 0.7.1",
"web-sys",
"winapi",
]
[[package]]
name = "ring"
version = "0.17.7"
@ -4050,8 +4046,8 @@ dependencies = [
"cc",
"getrandom 0.2.12",
"libc",
"spin 0.9.8",
"untrusted 0.9.0",
"spin",
"untrusted",
"windows-sys 0.48.0",
]
@ -4167,11 +4163,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba"
dependencies = [
"log",
"ring 0.17.7",
"ring",
"rustls-webpki 0.101.7",
"sct",
]
[[package]]
name = "rustls"
version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41"
dependencies = [
"log",
"ring",
"rustls-pki-types",
"rustls-webpki 0.102.2",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-native-certs"
version = "0.6.3"
@ -4179,7 +4189,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
dependencies = [
"openssl-probe",
"rustls-pemfile",
"rustls-pemfile 1.0.4",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-native-certs"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
dependencies = [
"openssl-probe",
"rustls-pemfile 2.0.0",
"rustls-pki-types",
"schannel",
"security-framework",
]
@ -4194,23 +4217,40 @@ dependencies = [
]
[[package]]
name = "rustls-webpki"
version = "0.100.3"
name = "rustls-pemfile"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3"
checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4"
dependencies = [
"ring 0.16.20",
"untrusted 0.7.1",
"base64 0.21.7",
"rustls-pki-types",
]
[[package]]
name = "rustls-pki-types"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf"
[[package]]
name = "rustls-webpki"
version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [
"ring 0.17.7",
"untrusted 0.9.0",
"ring",
"untrusted",
]
[[package]]
name = "rustls-webpki"
version = "0.102.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610"
dependencies = [
"ring",
"rustls-pki-types",
"untrusted",
]
[[package]]
@ -4298,8 +4338,8 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [
"ring 0.17.7",
"untrusted 0.9.0",
"ring",
"untrusted",
]
[[package]]
@ -4646,12 +4686,6 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.8"
@ -5019,26 +5053,38 @@ version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls",
"rustls 0.21.10",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
dependencies = [
"rustls 0.22.2",
"rustls-pki-types",
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.19.0"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec509ac96e9a0c43427c74f003127d953a265737636129424288d27cb5c4b12c"
checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
dependencies = [
"futures-util",
"log",
"native-tls",
"rustls",
"rustls-native-certs",
"rustls 0.22.2",
"rustls-native-certs 0.7.0",
"rustls-pki-types",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-rustls 0.25.0",
"tungstenite",
"webpki-roots 0.23.1",
"webpki-roots 0.26.1",
]
[[package]]
@ -5165,26 +5211,26 @@ checksum = "17f77d76d837a7830fe1d4f12b7b4ba4192c1888001c7164257e4bc6d21d96b4"
[[package]]
name = "tungstenite"
version = "0.19.0"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15fba1a6d6bb030745759a9a2a588bfe8490fc8b4751a277db3a0be1c9ebbf67"
checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http",
"http 1.0.0",
"httparse",
"log",
"native-tls",
"rand 0.8.5",
"rustls",
"rustls-native-certs",
"rustls 0.22.2",
"rustls-native-certs 0.7.0",
"rustls-pki-types",
"sha1",
"thiserror",
"url",
"utf-8",
"webpki",
"webpki-roots 0.23.1",
"webpki-roots 0.26.1",
]
[[package]]
@ -5292,12 +5338,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "untrusted"
version = "0.9.0"
@ -5655,31 +5695,21 @@ dependencies = [
"web-sys",
]
[[package]]
name = "webpki"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53"
dependencies = [
"ring 0.17.7",
"untrusted 0.9.0",
]
[[package]]
name = "webpki-roots"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338"
dependencies = [
"rustls-webpki 0.100.3",
]
[[package]]
name = "webpki-roots"
version = "0.25.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"
[[package]]
name = "webpki-roots"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "weezl"
version = "0.1.8"

View File

@ -23,7 +23,7 @@ eframe = { git = "https://github.com/bu5hm4nn/egui", rev = "f954e98816d4fb760de3
egui-winit = { git = "https://github.com/bu5hm4nn/egui", rev = "f954e98816d4fb760de31eeec4afcd15ae39a687", features = [ "default" ] }
#egui = { git = "https://github.com/bu5hm4nn/egui", rev = "f954e98816d4fb760de31eeec4afcd15ae39a687", features = [ "deadlock_detection" ] }
egui-video = { git = "https://github.com/bu5hm4nn/egui-video", rev = "baad84b9bb897004a243166758d149d3387d4811", features = [ "from_bytes" ], optional = true }
gossip-relay-picker = { git = "https://github.com/mikedilger/gossip-relay-picker", rev = "1d13cacb0d7b32d72d0c8c27414461d5389ca2ec" }
gossip-relay-picker = { git = "https://github.com/mikedilger/gossip-relay-picker", rev = "672a6a27ab688170a154aba1feb16d6c331b99ed" }
gossip-lib = { path = "../gossip-lib" }
humansize = "2.1"
image = { version = "0.24.6", features = [ "png", "jpeg" ] }

View File

@ -62,12 +62,10 @@ use egui_winit::egui::Response;
use egui_winit::egui::ViewportBuilder;
use egui_winit::winit::raw_window_handle::HasDisplayHandle;
use gossip_lib::comms::ToOverlordMessage;
use gossip_lib::About;
use gossip_lib::Error;
use gossip_lib::FeedKind;
use gossip_lib::{DmChannel, DmChannelData};
use gossip_lib::{Person, PersonList};
use gossip_lib::{ZapState, GLOBALS};
use gossip_lib::nip46::Approval;
use gossip_lib::{
About, DmChannel, DmChannelData, Error, FeedKind, Person, PersonList, ZapState, GLOBALS,
};
use nostr_types::ContentSegment;
use nostr_types::{Id, Metadata, MilliSatoshi, Profile, PublicKey, UncheckedUrl, Url};
use std::collections::{HashMap, HashSet};
@ -2280,20 +2278,31 @@ fn approval_dialog_inner(_app: &mut GossipUi, ui: &mut Ui) {
ui.horizontal(|ui| {
let text = format!("Allow {}", parsed_command.method);
ui.label(text);
if ui.button("Approve").clicked() {
if ui.button("Approve Once").clicked() {
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::Nip46ServerOpApproved(
.send(ToOverlordMessage::Nip46ServerOpApprovalResponse(
*pubkey,
parsed_command.clone(),
Approval::Once,
));
}
if ui.button("Approve Always").clicked() {
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::Nip46ServerOpApprovalResponse(
*pubkey,
parsed_command.clone(),
Approval::Always,
));
}
if ui.button("Decline").clicked() {
let _ = GLOBALS
.to_overlord
.send(ToOverlordMessage::Nip46ServerOpDeclined(
.send(ToOverlordMessage::Nip46ServerOpApprovalResponse(
*pubkey,
parsed_command.clone(),
Approval::None,
));
}
});

View File

@ -47,10 +47,10 @@ fallible-iterator = "0.2"
filetime = "0.2"
futures = "0.3"
futures-util = "0.3"
gossip-relay-picker = { git = "https://github.com/mikedilger/gossip-relay-picker", rev = "1d13cacb0d7b32d72d0c8c27414461d5389ca2ec" }
gossip-relay-picker = { git = "https://github.com/mikedilger/gossip-relay-picker", rev = "672a6a27ab688170a154aba1feb16d6c331b99ed" }
heed = { git = "https://github.com/meilisearch/heed", rev = "64fd6fec293c0dee94855b8267557ce03e7ce5d8" }
hex = "0.4"
http = "0.2"
http = "1.0"
image = { version = "0.24.6", features = [ "png", "jpeg" ] }
kamadak-exif = "0.5"
lazy_static = "1.4"
@ -73,8 +73,8 @@ textnonce = "1"
tiny-skia = "0.10.0"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tokio-tungstenite = { version = "0.19", default-features = false, features = [ "connect", "handshake" ] }
tungstenite = { version = "0.19", default-features = false }
tokio-tungstenite = { version = "0.21", default-features = false, features = [ "connect", "handshake" ] }
tungstenite = { version = "0.21", default-features = false }
url = "2.4"
usvg = "0.35.0"
zeroize = "1.6"

View File

@ -1,5 +1,5 @@
use crate::dm_channel::DmChannel;
use crate::nip46::ParsedCommand;
use crate::nip46::{Approval, ParsedCommand};
use crate::people::PersonList;
use crate::relay::Relay;
use nostr_types::{
@ -106,11 +106,8 @@ pub enum ToOverlordMessage {
/// internal (minions use this channel too)
MinionJobUpdated(RelayUrl, u64, u64),
/// Calls [nip46_server_op_approved](crate::Overlord::nip46_server_op_approved)
Nip46ServerOpApproved(PublicKey, ParsedCommand),
/// Calls [nip46_server_op_declined](crate::Overlord::nip46_server_op_declined)
Nip46ServerOpDeclined(PublicKey, ParsedCommand),
/// Calls [nip46_server_op_approval_response](crate::Overlord::nip46_server_op_approval_response)
Nip46ServerOpApprovalResponse(PublicKey, ParsedCommand, Approval),
/// Calls [post](crate::Overlord::post)
Post {

View File

@ -32,10 +32,10 @@ pub fn fetch(url: &str, filters: Vec<Filter>) -> Result<Vec<Event>, Error> {
let (mut websocket, _response) = tungstenite::connect(request)?;
websocket.write_message(Message::Text(wire))?;
websocket.send(Message::Text(wire))?;
loop {
let message = match websocket.read_message() {
let message = match websocket.read() {
Ok(m) => m,
Err(e) => {
tracing::error!("Problem reading from websocket: {}", e);
@ -59,11 +59,11 @@ pub fn fetch(url: &str, filters: Vec<Filter>) -> Result<Vec<Event>, Error> {
return Ok(events);
}
};
if let Err(e) = websocket.write_message(Message::Text(wire)) {
if let Err(e) = websocket.send(Message::Text(wire)) {
tracing::error!("Could not write close subscription message: {}", e);
return Ok(events);
}
if let Err(e) = websocket.write_message(Message::Close(None)) {
if let Err(e) = websocket.send(Message::Close(None)) {
tracing::error!("Could not write websocket close message: {}", e);
return Ok(events);
}
@ -82,7 +82,7 @@ pub fn fetch(url: &str, filters: Vec<Filter>) -> Result<Vec<Event>, Error> {
}
Message::Binary(_) => tracing::debug!("IGNORING BINARY MESSAGE"),
Message::Ping(vec) => {
if let Err(e) = websocket.write_message(Message::Pong(vec)) {
if let Err(e) = websocket.send(Message::Pong(vec)) {
tracing::warn!("Unable to pong: {}", e);
}
}
@ -124,11 +124,11 @@ pub fn post(url: &str, event: Event) -> Result<(), Error> {
let (mut websocket, _response) = tungstenite::connect(request)?;
websocket.write_message(Message::Text(wire))?;
websocket.send(Message::Text(wire))?;
// Get and print one response message
let message = match websocket.read_message() {
let message = match websocket.read() {
Ok(m) => m,
Err(e) => {
tracing::error!("Problem reading from websocket: {}", e);
@ -155,7 +155,7 @@ pub fn post(url: &str, event: Event) -> Result<(), Error> {
}
Message::Binary(_) => tracing::debug!("IGNORING BINARY MESSAGE"),
Message::Ping(vec) => {
if let Err(e) = websocket.write_message(Message::Pong(vec)) {
if let Err(e) = websocket.send(Message::Pong(vec)) {
tracing::warn!("Unable to pong: {}", e);
}
}

View File

@ -39,15 +39,18 @@ impl Nip46UnconnectedServer {
None => return Err(ErrorKind::NoPublicKey.into()),
};
let mut token = format!("{}#{}?", public_key.as_bech32_string(), self.connect_secret);
let relay_part = &self
.relays
.iter()
.map(|r| format!("relay={}", r))
.collect::<Vec<String>>()
.join("&");
token.push_str(
&self
.relays
.iter()
.map(|r| format!("relay={}", r))
.collect::<Vec<String>>()
.join("&"),
let token = format!(
"bunker://{}?{}&secret={}",
public_key.as_hex_string(),
relay_part,
self.connect_secret
);
Ok(token)
@ -57,15 +60,26 @@ impl Nip46UnconnectedServer {
#[derive(Debug, Copy, Clone, Readable, Writable)]
pub enum Approval {
None,
Once,
Until(Unixtime),
Always,
}
impl Approval {
pub fn is_approved(&self) -> bool {
fn is_approved(&mut self) -> bool {
match self {
Approval::None => false,
Approval::Until(time) => Unixtime::now().unwrap() > *time,
Approval::Once => {
*self = Approval::None;
true
}
Approval::Until(time) => {
let approved = Unixtime::now().unwrap() < *time;
if !approved {
*self = Approval::None;
}
approved
}
Approval::Always => true,
}
}
@ -150,7 +164,7 @@ impl Nip46Server {
*/
pub fn handle(&self, cmd: &ParsedCommand) -> Result<(), Error> {
pub fn handle(&mut self, cmd: &ParsedCommand) -> Result<(), Error> {
let ParsedCommand {
ref id,
ref method,
@ -467,7 +481,7 @@ fn send_response(
pub fn handle_command(event: &Event, seen_on: Option<RelayUrl>) -> Result<(), Error> {
// If we have a server for that pubkey
if let Some(server) = GLOBALS.storage.read_nip46server(event.pubkey)? {
if let Some(mut server) = GLOBALS.storage.read_nip46server(event.pubkey)? {
// Parse the command
let parsed_command = match parse_command(event.pubkey, &event.content) {
Ok(pc) => pc,

View File

@ -1,4 +1,4 @@
use super::Minion;
use super::{AuthState, Minion};
use crate::comms::ToOverlordMessage;
use crate::error::Error;
use crate::globals::GLOBALS;
@ -151,14 +151,16 @@ impl Minion {
};
// If we are waiting for a response for this id, process
if self.waiting_for_auth.is_some() && self.waiting_for_auth.unwrap() == id {
self.waiting_for_auth = None;
if !ok {
// Auth failed. Let's disconnect
tracing::warn!("AUTH failed to {}: {}", &self.url, ok_message);
self.keepgoing = false;
} else {
self.try_resubscribe_to_corked().await?;
if let AuthState::Waiting(waiting_id) = self.auth_state {
if waiting_id == id {
if !ok {
self.auth_state = AuthState::Failed;
// Auth failed.
tracing::warn!("AUTH failed to {}: {}", &self.url, ok_message);
} else {
self.auth_state = AuthState::Authenticated;
self.try_subscribe_waiting().await?;
}
}
} else if self.postings.contains(&id) {
if ok {
@ -203,8 +205,6 @@ impl Minion {
tracing::info!("{}: Closed: {}: {}", &self.url, handle, message);
let mut retry = false;
// Check the machine-readable prefix
if let Some(prefix) = message.split(':').next() {
match prefix {
@ -224,32 +224,77 @@ impl Minion {
);
}
"rate-limited" => {
retry = true;
// Wait to retry later
self.subscriptions_rate_limited.push(handle);
// return now, don't remove sub from map
return Ok(());
}
"invalid" => {
tracing::warn!(
"{} won't serve our {} sub (says invalid)",
&self.url,
&handle
);
self.failed_subs.insert(handle.clone());
}
"error" => {
tracing::warn!(
"{} won't serve our {} sub (says error)",
&self.url,
&handle
);
self.failed_subs.insert(handle.clone());
}
"invalid" => {}
"error" => {}
"auth-required" => {
if self.waiting_for_auth.is_none() {
tracing::warn!("{} says AUTH required for {}, but it has not AUTH challenged us yet", &self.url, handle);
match self.auth_state {
AuthState::None => {
// authenticate
self.authenticate().await?;
// cork and retry once auth completes
self.subscriptions_waiting_for_auth
.push((handle, Unixtime::now().unwrap()));
// return now, don't remove sub from map
return Ok(());
}
AuthState::Waiting(_) => {
// cork and retry once auth completes
self.subscriptions_waiting_for_auth
.push((handle, Unixtime::now().unwrap()));
// return now, don't remove sub from map
return Ok(());
}
AuthState::Authenticated => {
// We are authenticated, but it doesn't like us.
// fail this subscription handle
self.failed_subs.insert(handle.clone());
}
AuthState::Failed => {
// fail this subscription handle
self.failed_subs.insert(handle.clone());
}
}
retry = true;
}
"restricted" => {}
"restricted" => {
tracing::warn!(
"{} won't serve our {} sub (says restricted)",
&self.url,
&handle
);
self.failed_subs.insert(handle.clone());
}
_ => {
tracing::warn!("{} closed with unknown prefix {}", &self.url, prefix);
}
}
}
if retry {
// Save as corked, try it again later
self.corked_subscriptions
.push((handle, Unixtime::now().unwrap()));
} else {
// Remove the subscription
tracing::info!("{}: removed subscription {}", &self.url, handle);
let _ = self.subscription_map.remove(&handle);
}
// Remove the subscription
tracing::info!("{}: removed subscription {}", &self.url, handle);
let _ = self.subscription_map.remove(&handle);
}
}

View File

@ -32,11 +32,38 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::protocol::{Message as WsMessage, WebSocketConfig};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuthState {
None,
Waiting(Id), // we sent AUTH, have not got response back yet
Authenticated,
Failed,
}
pub struct EventSeekState {
pub job_ids: Vec<u64>,
pub asked: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MinionExitReason {
GotDisconnected,
GotShutdownMessage,
GotWSClose,
LostOverlord,
SubscriptionsHaveCompleted,
Unknown,
}
impl MinionExitReason {
pub fn benign(&self) -> bool {
matches!(
*self,
MinionExitReason::GotShutdownMessage | MinionExitReason::SubscriptionsHaveCompleted
)
}
}
pub struct Minion {
url: RelayUrl,
to_overlord: UnboundedSender<ToOverlordMessage>,
@ -46,17 +73,19 @@ pub struct Minion {
stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
subscription_map: SubscriptionMap,
next_events_subscription_id: u32,
keepgoing: bool,
postings: HashSet<Id>,
sought_events: HashMap<Id, EventSeekState>,
last_message_sent: String,
waiting_for_auth: Option<Id>,
auth_challenge: String,
corked_subscriptions: Vec<(String, Unixtime)>,
corked_metadata: Vec<(u64, Vec<PublicKey>)>,
subscriptions_waiting_for_auth: Vec<(String, Unixtime)>,
subscriptions_waiting_for_metadata: Vec<(u64, Vec<PublicKey>)>,
subscriptions_rate_limited: Vec<String>,
general_feed_start: Option<Unixtime>,
person_feed_start: Option<Unixtime>,
inbox_feed_start: Option<Unixtime>,
exiting: Option<MinionExitReason>,
auth_state: AuthState,
failed_subs: HashSet<String>,
}
impl Minion {
@ -74,23 +103,28 @@ impl Minion {
stream: None,
subscription_map: SubscriptionMap::new(),
next_events_subscription_id: 0,
keepgoing: true,
postings: HashSet::new(),
sought_events: HashMap::new(),
last_message_sent: String::new(),
waiting_for_auth: None,
auth_challenge: "".to_string(),
corked_subscriptions: Vec::new(),
corked_metadata: Vec::new(),
subscriptions_waiting_for_auth: Vec::new(),
subscriptions_waiting_for_metadata: Vec::new(),
subscriptions_rate_limited: Vec::new(),
general_feed_start: None,
person_feed_start: None,
inbox_feed_start: None,
exiting: None,
auth_state: AuthState::None,
failed_subs: HashSet::new(),
})
}
}
impl Minion {
pub(crate) async fn handle(&mut self, mut messages: Vec<ToMinionPayload>) -> Result<(), Error> {
pub(crate) async fn handle(
&mut self,
mut messages: Vec<ToMinionPayload>,
) -> Result<MinionExitReason, Error> {
// minion will log when it connects
tracing::trace!("{}: Minion handling started", &self.url);
@ -216,7 +250,6 @@ impl Minion {
.body(())?;
let config: WebSocketConfig = WebSocketConfig {
max_send_queue: None,
// Tungstenite default is 64 MiB.
// Cameri nostream relay limits to 0.5 a megabyte
// Based on my current database of 7356 events, the longest was 11,121 bytes.
@ -230,6 +263,7 @@ impl Minion {
accept_unmasked_frames: GLOBALS
.storage
.read_setting_websocket_accept_unmasked_frames(),
..Default::default()
};
let connect_timeout_secs = if short_timeout {
@ -267,7 +301,7 @@ impl Minion {
'relayloop: loop {
match self.loop_handler().await {
Ok(_) => {
if !self.keepgoing {
if self.exiting.is_some() {
break 'relayloop;
}
}
@ -292,7 +326,10 @@ impl Minion {
}
}
Ok(())
match self.exiting {
Some(reason) => Ok(reason),
None => Ok(MinionExitReason::Unknown),
}
}
async fn loop_handler(&mut self) -> Result<(), Error> {
@ -320,14 +357,14 @@ impl Minion {
// Update subscription for sought events
self.get_events().await?;
// Try to subscribe to corked subscriptions and metadata
self.try_resubscribe_to_corked().await?;
// Try to subscribe to subscriptions waiting for something
self.try_subscribe_waiting().await?;
},
to_minion_message = self.from_overlord.recv() => {
let to_minion_message = match to_minion_message {
Ok(m) => m,
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
self.keepgoing = false;
self.exiting = Some(MinionExitReason::LostOverlord);
return Ok(());
},
Err(e) => return Err(e.into())
@ -343,7 +380,7 @@ impl Minion {
if ws_stream.is_terminated() {
// possibly connection reset
tracing::info!("{}: connected terminated", &self.url);
self.keepgoing = false;
self.exiting = Some(MinionExitReason::GotDisconnected);
}
return Ok(());
}
@ -363,7 +400,9 @@ impl Minion {
WsMessage::Binary(_) => tracing::warn!("{}, Unexpected binary message", &self.url),
WsMessage::Ping(_) => { }, // tungstenite automatically pongs.
WsMessage::Pong(_) => { }, // Verify it is 0x1? Nah. It's just for keep-alive.
WsMessage::Close(_) => self.keepgoing = false,
WsMessage::Close(_) => {
self.exiting = Some(MinionExitReason::GotWSClose);
}
WsMessage::Frame(_) => tracing::warn!("{}: Unexpected frame message", &self.url),
}
},
@ -371,7 +410,7 @@ impl Minion {
// Don't continue if we have no more subscriptions
if self.subscription_map.is_empty() {
self.keepgoing = false;
self.exiting = Some(MinionExitReason::SubscriptionsHaveCompleted);
}
Ok(())
@ -436,7 +475,7 @@ impl Minion {
}
ToMinionPayloadDetail::Shutdown => {
tracing::debug!("{}: Websocket listener shutting down", &self.url);
self.keepgoing = false;
self.exiting = Some(MinionExitReason::GotShutdownMessage);
}
ToMinionPayloadDetail::SubscribeAugments(ids) => {
self.subscribe_augments(message.job_id, ids).await?;
@ -1102,19 +1141,16 @@ impl Minion {
Ok(())
}
async fn try_resubscribe_to_corked(&mut self) -> Result<(), Error> {
// Do not do this if we are waiting for AUTH
if self.waiting_for_auth.is_some() {
return Ok(());
}
async fn try_subscribe_waiting(&mut self) -> Result<(), Error> {
// Subscribe to metadata
if !self.subscription_map.has("temp_subscribe_metadata") && !self.corked_metadata.is_empty()
if !self.subscription_map.has("temp_subscribe_metadata")
&& !self.subscriptions_waiting_for_metadata.is_empty()
{
let mut corked_metadata = std::mem::take(&mut self.corked_metadata);
let mut subscriptions_waiting_for_metadata =
std::mem::take(&mut self.subscriptions_waiting_for_metadata);
let mut combined_job_id: Option<u64> = None;
let mut combined_pubkeys: Vec<PublicKey> = Vec::new();
for (job_id, pubkeys) in corked_metadata.drain(..) {
for (job_id, pubkeys) in subscriptions_waiting_for_metadata.drain(..) {
if combined_job_id.is_none() {
combined_job_id = Some(job_id)
} else {
@ -1132,19 +1168,35 @@ impl Minion {
.await?;
}
// Apply subscriptions that were waiting for auth
let mut handles = std::mem::take(&mut self.corked_subscriptions);
let now = Unixtime::now().unwrap();
for (handle, when) in handles.drain(..) {
// Do not try if we just inserted it within the last second
if when - now < Duration::from_secs(1) {
// re-insert
self.corked_subscriptions.push((handle, when));
continue;
}
// If we are authenticated
if self.auth_state != AuthState::Authenticated {
// Apply subscriptions that were waiting for auth
let mut handles = std::mem::take(&mut self.subscriptions_waiting_for_auth);
let now = Unixtime::now().unwrap();
for (handle, when) in handles.drain(..) {
// Do not try if we just inserted it within the last second
if when - now < Duration::from_secs(1) {
// re-insert
self.subscriptions_waiting_for_auth.push((handle, when));
continue;
}
tracing::info!("Sending corked subscription {} to {}", handle, &self.url);
self.send_subscription(&handle).await?;
tracing::info!("Sending corked subscription {} to {}", handle, &self.url);
self.send_subscription(&handle).await?;
}
}
// Retry rate-limited subscriptions
if !self.subscriptions_rate_limited.is_empty() {
let mut handles = std::mem::take(&mut self.subscriptions_rate_limited);
for handle in handles.drain(..) {
tracing::info!(
"Sending previously rate-limited subscription {} to {}",
handle,
&self.url
);
self.send_subscription(&handle).await?;
}
}
Ok(())
@ -1229,7 +1281,8 @@ impl Minion {
) -> Result<(), Error> {
if self.subscription_map.has("temp_subscribe_metadata") {
// Save for later
self.corked_metadata.push((job_id, pubkeys));
self.subscriptions_waiting_for_metadata
.push((job_id, pubkeys));
return Ok(());
}
@ -1259,6 +1312,15 @@ impl Minion {
return Ok(());
}
if self.failed_subs.contains(handle) {
tracing::info!(
"{}: Avoiding resubscribing to a previously failed subscription: {}",
&self.url,
handle
);
return Ok(());
}
if let Some(sub) = self.subscription_map.get_mut(handle) {
// Gratitously bump the EOSE as if the relay was finished, since it was
// our fault the subscription is getting cut off. This way we will pick up
@ -1296,9 +1358,9 @@ impl Minion {
);
}
if self.waiting_for_auth.is_some() {
if matches!(self.auth_state, AuthState::Waiting(_)) {
// Save this, subscribe after AUTH completes
self.corked_subscriptions
self.subscriptions_waiting_for_auth
.push((handle.to_owned(), Unixtime::now().unwrap()));
return Ok(());
}
@ -1353,6 +1415,12 @@ impl Minion {
}
async fn authenticate(&mut self) -> Result<(), Error> {
match self.auth_state {
AuthState::Authenticated => return Ok(()),
AuthState::Waiting(_) => return Ok(()),
_ => (),
}
if !GLOBALS.identity.is_unlocked() {
return Err(ErrorKind::NoPrivateKeyForAuth(self.url.clone()).into());
}
@ -1381,7 +1449,8 @@ impl Minion {
ws_stream.send(WsMessage::Text(wire)).await?;
tracing::info!("Authenticated to {}", &self.url);
self.waiting_for_auth = Some(id);
self.auth_state = AuthState::Waiting(id);
Ok(())
}

View File

@ -18,7 +18,7 @@ use crate::tags::{
use gossip_relay_picker::{Direction, RelayAssignment};
use heed::RwTxn;
use http::StatusCode;
use minion::Minion;
use minion::{Minion, MinionExitReason};
use nostr_types::{
ContentEncryptionAlgorithm, EncryptedPrivateKey, Event, EventAddr, EventKind, EventReference,
Id, IdHex, Metadata, MilliSatoshi, NostrBech32, PayRequestData, PreEvent, PrivateKey, Profile,
@ -33,7 +33,7 @@ use tokio::sync::mpsc::UnboundedReceiver;
use tokio::{select, task};
use zeroize::Zeroize;
type MinionResult = Result<(), Error>;
type MinionResult = Result<MinionExitReason, Error>;
/// The overlord handles any operation that involves talking to relays, and a few more.
///
@ -48,7 +48,7 @@ pub struct Overlord {
inbox: UnboundedReceiver<ToOverlordMessage>,
// All the minion tasks running.
minions: task::JoinSet<Result<(), Error>>,
minions: task::JoinSet<Result<MinionExitReason, Error>>,
// Map from minion task::Id to Url
minions_task_url: HashMap<task::Id, RelayUrl>,
@ -420,7 +420,8 @@ impl Overlord {
// Set to not connected
let relayjobs = GLOBALS.connected_relays.remove(&url).map(|(_, v)| v);
let mut exclusion: u64 = 30;
let mut exclusion: u64;
let mut completed: bool = false;
match join_result {
Err(join_error) => {
@ -429,9 +430,24 @@ impl Overlord {
exclusion = 120;
}
Ok((_id, result)) => match result {
Ok(_) => {
tracing::debug!("Minion {} completed", &url);
// no exclusion
Ok(exitreason) => {
if exitreason.benign() {
tracing::debug!("Minion {} completed: {:?}", &url, exitreason);
} else {
tracing::info!("Minion {} completed: {:?}", &url, exitreason);
}
exclusion = match exitreason {
MinionExitReason::GotDisconnected => 120,
MinionExitReason::GotWSClose => 120,
MinionExitReason::Unknown => 120,
MinionExitReason::SubscriptionsHaveCompleted => 5,
_ => 5,
};
// Remember if the relay says all the jobs have completed
if matches!(exitreason, MinionExitReason::SubscriptionsHaveCompleted) {
completed = true;
}
}
Err(e) => {
Self::bump_failure_count(&url);
@ -479,7 +495,7 @@ impl Overlord {
// We might need to act upon this minion exiting
if !GLOBALS.shutting_down.load(Ordering::Relaxed) {
self.recover_from_minion_exit(url, relayjobs, exclusion)
self.recover_from_minion_exit(url, relayjobs, exclusion, completed)
.await;
}
}
@ -489,6 +505,7 @@ impl Overlord {
url: RelayUrl,
jobs: Option<Vec<RelayJob>>,
exclusion: u64,
completed: bool,
) {
// For people we are following, pick relays
if let Err(e) = GLOBALS.relay_picker.refresh_person_relay_scores().await {
@ -502,7 +519,11 @@ impl Overlord {
GLOBALS.active_advertise_jobs.remove(&job.payload.job_id);
}
// If we have any persistent jobs, restart after a delaythe relay
if completed {
return;
}
// If we have any persistent jobs, restart after a delay
let persistent_jobs: Vec<RelayJob> = jobs
.drain(..)
.filter(|job| job.reason.persistent())
@ -643,12 +664,8 @@ impl Overlord {
self.maybe_disconnect_relay(&url)?;
}
}
ToOverlordMessage::Nip46ServerOpApproved(pubkey, parsed_command) => {
self.nip46_server_op_approved(pubkey, parsed_command)
.await?;
}
ToOverlordMessage::Nip46ServerOpDeclined(pubkey, parsed_command) => {
self.nip46_server_op_declined(pubkey, parsed_command)
ToOverlordMessage::Nip46ServerOpApprovalResponse(pubkey, parsed_command, approval) => {
self.nip46_server_op_approval_response(pubkey, parsed_command, approval)
.await?;
}
ToOverlordMessage::RefreshScoresAndPickRelays => {
@ -1666,10 +1683,11 @@ impl Overlord {
}
/// Process approved nip46 server operation
pub async fn nip46_server_op_approved(
pub async fn nip46_server_op_approval_response(
&mut self,
pubkey: PublicKey,
parsed_command: ParsedCommand,
approval: Approval,
) -> Result<(), Error> {
// Clear the request
GLOBALS
@ -1684,12 +1702,12 @@ impl Overlord {
// So the approval only applies to this one time. FIXME: we should use the options
// to approve always (saved) and Until a set time.
match parsed_command.method.as_str() {
"sign_event" => server.sign_approval = Approval::Always,
"nip04_encrypt" | "nip44_encrypt" => server.encrypt_approval = Approval::Always,
"nip04_decrypt" | "nip44_decrypt" => server.decrypt_approval = Approval::Always,
"sign_event" => server.sign_approval = approval,
"nip04_encrypt" | "nip44_encrypt" => server.encrypt_approval = approval,
"nip04_decrypt" | "nip44_decrypt" => server.decrypt_approval = approval,
"nip44_get_key" => {
server.encrypt_approval = Approval::Always;
server.decrypt_approval = Approval::Always;
server.encrypt_approval = approval;
server.decrypt_approval = approval;
}
_ => {}
}
@ -1700,21 +1718,6 @@ impl Overlord {
Ok(())
}
/// Process declined nip46 server operation
pub async fn nip46_server_op_declined(
&mut self,
pubkey: PublicKey,
parsed_command: ParsedCommand,
) -> Result<(), Error> {
// Clear the request
GLOBALS
.nip46_approval_requests
.write()
.retain(|(pk, pc)| *pk != pubkey || *pc != parsed_command);
Ok(())
}
/// Trigger the relay picker to find relays for people not fully covered
pub async fn refresh_scores_and_pick_relays(&mut self) -> Result<(), Error> {
// When manually doing this, we refresh person_relay scores first which