refactor: improve send safety

This commit is contained in:
2025-05-23 17:04:16 +01:00
parent ecc96c382f
commit 735b947ec2
5 changed files with 102 additions and 134 deletions

View File

@ -59,9 +59,9 @@ impl APIHandler {
return Ok(Response::builder()
.header("Content-Type", "text/html")
.header("Access-Control-Allow-Origin", "*")
.body(Full::new(Bytes::from_static(
include_bytes!("./index.html"),
)))?);
.body(Full::new(Bytes::from_static(include_bytes!(
"./index.html"
))))?);
}
// If not, handle the request as a normal API request.
@ -120,11 +120,8 @@ impl APIHandler {
let new_notification_manager = self.notification_manager.clone();
tokio::spawn(async move {
match RelayConnection::run(websocket, new_notification_manager).await {
Ok(_) => {}
Err(e) => {
log::error!("Error with websocket connection: {:?}", e);
}
if let Err(e) = RelayConnection::run(websocket, new_notification_manager).await {
log::error!("Error with websocket connection: {}", e.to_string());
}
});
@ -135,7 +132,7 @@ impl APIHandler {
&self,
mut req: Request<Incoming>,
config: &NotePushConfig,
) -> Result<APIResponse, Box<dyn std::error::Error>> {
) -> Result<APIResponse, Box<dyn std::error::Error + Send + Sync>> {
let parsed_request = self.parse_http_request(&mut req).await?;
let api_response: APIResponse = self
.handle_parsed_http_request(&parsed_request, config)
@ -147,7 +144,7 @@ impl APIHandler {
async fn parse_http_request(
&self,
req: &mut Request<Incoming>,
) -> Result<ParsedRequest, Box<dyn std::error::Error>> {
) -> Result<ParsedRequest, Box<dyn std::error::Error + Send + Sync>> {
// 1. Read the request body
let body_buffer = req.body_mut().collect().await?.aggregate();
let body_bytes = body_buffer.chunk();
@ -187,7 +184,7 @@ impl APIHandler {
&self,
parsed_request: &ParsedRequest,
config: &NotePushConfig,
) -> Result<APIResponse, Box<dyn std::error::Error>> {
) -> Result<APIResponse, Box<dyn std::error::Error + Send + Sync>> {
enum RouteMatch {
UserInfo,
UserSetting,
@ -261,7 +258,7 @@ impl APIHandler {
&self,
req: &ParsedRequest,
url_params: Params<'_, '_>,
) -> Result<APIResponse, Box<dyn std::error::Error>> {
) -> Result<APIResponse, Box<dyn std::error::Error + Send + Sync>> {
let device_token = get_required_param(&url_params, "deviceToken")?;
let pubkey = get_required_param(&url_params, "pubkey")?;
let pubkey = check_pubkey(&pubkey, req)?;
@ -293,7 +290,7 @@ impl APIHandler {
&self,
req: &ParsedRequest,
url_params: Params<'_, '_>,
) -> Result<APIResponse, Box<dyn std::error::Error>> {
) -> Result<APIResponse, Box<dyn std::error::Error + Send + Sync>> {
let device_token = get_required_param(&url_params, "deviceToken")?;
let pubkey = get_required_param(&url_params, "pubkey")?;
let pubkey = check_pubkey(&pubkey, req)?;
@ -310,7 +307,7 @@ impl APIHandler {
&self,
req: &ParsedRequest,
url_params: Params<'_, '_>,
) -> Result<APIResponse, Box<dyn std::error::Error>> {
) -> Result<APIResponse, Box<dyn std::error::Error + Send + Sync>> {
let pubkey = get_required_param(&url_params, "pubkey")?;
let pubkey = check_pubkey(&pubkey, req)?;
@ -323,7 +320,7 @@ impl APIHandler {
&self,
req: &ParsedRequest,
url_params: Params<'_, '_>,
) -> Result<APIResponse, Box<dyn std::error::Error>> {
) -> Result<APIResponse, Box<dyn std::error::Error + Send + Sync>> {
let pubkey = get_required_param(&url_params, "pubkey")?;
let pubkey = check_pubkey(&pubkey, req)?;
let target = get_required_param(&url_params, "target")?;
@ -350,7 +347,7 @@ impl APIHandler {
&self,
req: &ParsedRequest,
url_params: Params<'_, '_>,
) -> Result<APIResponse, Box<dyn std::error::Error>> {
) -> Result<APIResponse, Box<dyn std::error::Error + Send + Sync>> {
let pubkey = get_required_param(&url_params, "pubkey")?;
let pubkey = check_pubkey(&pubkey, req)?;
let target = get_required_param(&url_params, "target")?;
@ -367,7 +364,7 @@ impl APIHandler {
&self,
req: &ParsedRequest,
url_params: Params<'_, '_>,
) -> Result<APIResponse, Box<dyn std::error::Error>> {
) -> Result<APIResponse, Box<dyn std::error::Error + Send + Sync>> {
let device_token = get_required_param(&url_params, "deviceToken")?;
let pubkey = get_required_param(&url_params, "pubkey")?;
let pubkey = check_pubkey(&pubkey, req)?;
@ -393,7 +390,7 @@ impl APIHandler {
&self,
req: &ParsedRequest,
url_params: Params<'_, '_>,
) -> Result<APIResponse, Box<dyn std::error::Error>> {
) -> Result<APIResponse, Box<dyn std::error::Error + Send + Sync>> {
let device_token = get_required_param(&url_params, "deviceToken")?;
let pubkey = get_required_param(&url_params, "pubkey")?;
let pubkey = check_pubkey(&pubkey, req)?;
@ -401,7 +398,7 @@ impl APIHandler {
// Proceed with the main logic after passing all checks
let settings = self
.notification_manager
.get_user_notification_settings(&pubkey, device_token.to_string())
.get_user_notification_settings(&pubkey, &device_token)
.await?;
Ok(APIResponse::ok_body(&settings))

View File

@ -82,8 +82,13 @@ pub enum NotificationManagerError {
APNSMissing,
#[error("FCM is not configured")]
FCMMissing,
#[error(transparent)]
Other(BoxSendError),
}
type BoxSendError = Box<dyn std::error::Error + Send + Sync>;
type NResult<T> = Result<T, BoxSendError>;
type DatabaseArc = Arc<Mutex<r2d2::Pool<SqliteConnectionManager>>>;
pub struct NotificationManager {
@ -105,10 +110,7 @@ impl EventSaver {
Self { db }
}
pub async fn save_if_needed(
&self,
event: &nostr::Event,
) -> Result<bool, Box<dyn std::error::Error>> {
pub async fn save_if_needed(&self, event: &nostr::Event) -> NResult<bool> {
match event.to_mute_list() {
Some(mute_list) => {
match self
@ -144,7 +146,7 @@ impl EventSaver {
pubkey: PublicKey,
mute_list: MuteList,
created_at: Timestamp,
) -> Result<(), Box<dyn std::error::Error>> {
) -> NResult<()> {
let mute_list_json = mute_list.to_json()?;
let db_mutex_guard = self.db.lock().await;
let connection = db_mutex_guard.get()?;
@ -167,7 +169,7 @@ impl EventSaver {
pub async fn get_saved_mute_list_for(
&self,
pubkey: PublicKey,
) -> Result<Option<TimestampedMuteList>, Box<dyn std::error::Error>> {
) -> NResult<Option<TimestampedMuteList>> {
let db_mutex_guard = self.db.lock().await;
let connection = db_mutex_guard.get()?;
@ -175,11 +177,11 @@ impl EventSaver {
"SELECT mute_list, created_at FROM muting_preferences WHERE user_pubkey = ?",
)?;
let mute_list_info: (serde_json::Value, nostr::Timestamp) = match stmt
let mute_list_info: (serde_json::Value, Timestamp) = match stmt
.query_row([pubkey.to_sql_string()], |row| {
Ok((row.get(0)?, row.get(1)?))
}) {
Ok(info) => (info.0, nostr::Timestamp::from_sql_string(info.1)?),
Ok(info) => (info.0, Timestamp::from_sql_string(info.1)?),
Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
Err(e) => return Err(e.into()),
};
@ -202,7 +204,7 @@ impl NotificationManager {
db: r2d2::Pool<SqliteConnectionManager>,
relay_url: String,
cache_max_age: std::time::Duration,
) -> Result<Self, Box<dyn std::error::Error>> {
) -> NResult<Self> {
let connection = db.get()?;
Self::setup_database(&connection)?;
@ -234,7 +236,7 @@ impl NotificationManager {
apns_team_id: String,
apns_environment: a2::client::Endpoint,
apns_topic: String,
) -> Result<Self, Box<dyn std::error::Error>> {
) -> NResult<Self> {
let mut file = File::open(&apns_private_key_path)?;
let client = Client::token(
@ -250,16 +252,13 @@ impl NotificationManager {
Ok(self)
}
pub fn with_fcm(
mut self,
google_services_file_path: impl Into<String>
) -> Result<Self, Box<dyn std::error::Error>> {
pub fn with_fcm(mut self, google_services_file_path: impl Into<String>) -> NResult<Self> {
self.fcm_client
.replace(Mutex::new(FcmService::new(google_services_file_path)));
Ok(self)
}
pub async fn handle_event(&self, event: &Event) -> Result<(), Box<dyn std::error::Error>> {
pub async fn handle_event(&self, event: &Event) -> NResult<()> {
log::info!(
"Received event kind={},id={}",
event.kind.as_u32(),
@ -427,10 +426,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
// MARK: - "Business" logic
pub async fn send_notifications_if_needed(
&self,
event: &Event,
) -> Result<(), Box<dyn std::error::Error>> {
pub async fn send_notifications_if_needed(&self, event: &Event) -> NResult<()> {
log::debug!(
"Checking if notifications need to be sent for event: {}",
event.id
@ -469,9 +465,11 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
);
for pubkey in pubkeys_to_notify {
self.send_event_notifications_to_pubkey(event, &pubkey)
.await?;
match self
.send_event_notifications_to_pubkey(event, &pubkey)
.await
{
Ok(_) => {
let db_mutex_guard = self.db.lock().await;
db_mutex_guard.get()?.execute(
"INSERT OR REPLACE INTO notifications (id, event_id, pubkey, received_notification, sent_at)
@ -485,6 +483,12 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
],
)?;
}
Err(e) => log::warn!(
"Error while sending notifications to {}: {}",
pubkey.to_hex(),
e.to_string()
),
}
}
Ok(())
}
@ -507,10 +511,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
Self::supported_kinds().contains(&event_kind)
}
async fn pubkeys_to_notify_for_event(
&self,
event: &Event,
) -> Result<HashSet<PublicKey>, Box<dyn std::error::Error>> {
async fn pubkeys_to_notify_for_event(&self, event: &Event) -> NResult<HashSet<PublicKey>> {
let notification_status = self.get_notification_status(event).await?;
let relevant_pubkeys = self.pubkeys_relevant_to_event(event).await?;
let mut relevant_pubkeys_that_are_registered = HashSet::new();
@ -557,7 +558,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
async fn get_newest_mute_list_available(
&self,
pubkey: &PublicKey,
) -> Result<Option<MuteList>, Box<dyn std::error::Error>> {
) -> NResult<Option<MuteList>> {
let timestamped_saved_mute_list = self.event_saver.get_saved_mute_list_for(*pubkey).await?;
let timestamped_network_mute_list =
self.nostr_network_helper.get_public_mute_list(pubkey).await;
@ -588,10 +589,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
)
}
async fn pubkeys_relevant_to_event(
&self,
event: &Event,
) -> Result<HashSet<PublicKey>, Box<dyn std::error::Error>> {
async fn pubkeys_relevant_to_event(&self, event: &Event) -> NResult<HashSet<PublicKey>> {
let mut direct_keys = event.relevant_pubkeys();
let fake_target = match event.kind {
// accept p tagged host as target for notifications in live event
@ -639,17 +637,22 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
&self,
event: &Event,
pubkey: &PublicKey,
) -> Result<(), Box<dyn std::error::Error>> {
) -> NResult<()> {
let user_device_tokens = self.get_user_device_tokens(pubkey).await?;
for (device_token, backend) in user_device_tokens {
if !self
.user_wants_notification(pubkey, device_token.clone(), event)
.await?
{
let wants_notif = self
.user_wants_notification(pubkey, &device_token, event)
.await?;
if !wants_notif {
continue;
}
self.send_event_notification_to_device_token(event, &device_token, backend)
.await?;
// as long as we send to at least one device for each user we consider it a success
if let Err(e) = self
.send_event_notification_to_device_token(event, &device_token, backend)
.await
{
log::warn!("Failed to send notification to {}: {}", &device_token, e);
}
}
Ok(())
}
@ -657,9 +660,9 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
async fn user_wants_notification(
&self,
pubkey: &PublicKey,
device_token: String,
device_token: &str,
event: &Event,
) -> Result<bool, Box<dyn std::error::Error>> {
) -> NResult<bool> {
let notification_preferences = self
.get_user_notification_settings(pubkey, device_token)
.await?;
@ -683,31 +686,14 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
Ok(notification_preferences.merge_kinds().contains(&event.kind))
}
async fn is_pubkey_token_pair_registered(
&self,
pubkey: &PublicKey,
device_token: &str,
) -> Result<bool, Box<dyn std::error::Error>> {
let current_device_tokens: Vec<String> = self
.get_user_device_tokens(pubkey)
.await?
.into_iter()
.map(|(d, _)| d)
.collect();
Ok(current_device_tokens.contains(&device_token.to_string()))
}
async fn is_pubkey_registered(
&self,
pubkey: &PublicKey,
) -> Result<bool, Box<dyn std::error::Error>> {
async fn is_pubkey_registered(&self, pubkey: &PublicKey) -> NResult<bool> {
Ok(!self.get_user_device_tokens(pubkey).await?.is_empty())
}
async fn get_notify_from_target(
&self,
target: &PublicKey,
) -> Result<Vec<(PublicKey, Vec<Kind>)>, Box<dyn std::error::Error>> {
) -> NResult<Vec<(PublicKey, Vec<Kind>)>> {
let db_mutex_guard = self.db.lock().await;
let connection = db_mutex_guard.get()?;
let mut stmt =
@ -732,7 +718,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
async fn get_user_device_tokens(
&self,
pubkey: &PublicKey,
) -> Result<Vec<(String, NotificationBackend)>, Box<dyn std::error::Error>> {
) -> NResult<Vec<(String, NotificationBackend)>> {
let db_mutex_guard = self.db.lock().await;
let connection = db_mutex_guard.get()?;
let mut stmt =
@ -749,10 +735,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
Ok(device_tokens)
}
async fn get_notification_status(
&self,
event: &Event,
) -> Result<NotificationStatus, Box<dyn std::error::Error>> {
async fn get_notification_status(&self, event: &Event) -> NResult<NotificationStatus> {
let db_mutex_guard = self.db.lock().await;
let connection = db_mutex_guard.get()?;
let mut stmt = connection.prepare(
@ -785,7 +768,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
event: &Event,
device_token: &str,
backend: NotificationBackend,
) -> Result<(), Box<dyn std::error::Error>> {
) -> NResult<()> {
log::debug!("Sending notification to device token: {}", device_token);
match &backend {
@ -803,11 +786,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
Ok(())
}
async fn send_event_notification_apns(
&self,
event: &Event,
device_token: &str,
) -> Result<(), Box<dyn std::error::Error>> {
async fn send_event_notification_apns(&self, event: &Event, device_token: &str) -> NResult<()> {
let client = self
.apns_client
.as_ref()
@ -842,11 +821,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
Ok(())
}
async fn send_event_notification_fcm(
&self,
event: &Event,
device_token: &str,
) -> Result<(), Box<dyn std::error::Error>> {
async fn send_event_notification_fcm(&self, event: &Event, device_token: &str) -> NResult<()> {
let client = self
.fcm_client
.as_ref()
@ -865,7 +840,10 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
"nostr_event".to_string(),
event.as_json(),
)])));
client.send_notification(msg).await?;
client
.send_notification(msg)
.await
.map_err(|e| e.to_string())?;
Ok(())
}
@ -915,7 +893,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
pubkey: PublicKey,
device_token: &str,
backend: NotificationBackend,
) -> Result<(), Box<dyn std::error::Error>> {
) -> NResult<()> {
let current_time_unix = Timestamp::now();
let db_mutex_guard = self.db.lock().await;
db_mutex_guard.get()?.execute(
@ -935,7 +913,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
&self,
pubkey: &PublicKey,
device_token: &str,
) -> Result<(), Box<dyn std::error::Error>> {
) -> NResult<()> {
let db_mutex_guard = self.db.lock().await;
db_mutex_guard.get()?.execute(
"DELETE FROM user_info WHERE pubkey = ? AND device_token = ?",
@ -944,10 +922,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
Ok(())
}
pub async fn get_notify_keys(
&self,
pubkey: PublicKey,
) -> Result<Vec<PublicKey>, Box<dyn std::error::Error>> {
pub async fn get_notify_keys(&self, pubkey: PublicKey) -> NResult<Vec<PublicKey>> {
let db_mutex_guard = self.db.lock().await;
let conn = db_mutex_guard.get()?;
let mut stmt =
@ -967,7 +942,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
pubkey: PublicKey,
target: PublicKey,
kinds: Vec<Kind>,
) -> Result<(), Box<dyn std::error::Error>> {
) -> NResult<()> {
let current_time_unix = Timestamp::now();
let db_mutex_guard = self.db.lock().await;
db_mutex_guard.get()?.execute(
@ -977,16 +952,12 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
target.to_sql_string(),
current_time_unix.to_sql_string(),
kinds.iter().map(|k| k.as_u32().to_string()).collect::<Vec<String>>().join(",")
]
],
)?;
Ok(())
}
pub async fn delete_notify_key(
&self,
pubkey: PublicKey,
target: PublicKey,
) -> Result<(), Box<dyn std::error::Error>> {
pub async fn delete_notify_key(&self, pubkey: PublicKey, target: PublicKey) -> NResult<()> {
let db_mutex_guard = self.db.lock().await;
db_mutex_guard.get()?.execute(
"delete from notify_keys where user_pubkey = ? and target_pubkey = ?",
@ -998,14 +969,14 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
pub async fn get_user_notification_settings(
&self,
pubkey: &PublicKey,
device_token: String,
) -> Result<UserNotificationSettings, Box<dyn std::error::Error>> {
device_token: &str,
) -> NResult<UserNotificationSettings> {
let db_mutex_guard = self.db.lock().await;
let connection = db_mutex_guard.get()?;
let mut stmt = connection.prepare(
"SELECT kinds, only_notifications_from_following_enabled, hellthread_notifications_disabled, hellthread_notifications_max_pubkeys FROM user_info WHERE pubkey = ? AND device_token = ?",
)?;
let settings = stmt.query_row([pubkey.to_sql_string(), device_token], |row| {
let settings = stmt.query_row([pubkey.to_sql_string().as_str(), device_token], |row| {
let kinds: String = row.get(0)?;
let kinds: Vec<Kind> = kinds
.split(',')
@ -1036,7 +1007,7 @@ ALTER TABLE user_info drop column dm_notifications_enabled;",
pubkey: &PublicKey,
device_token: String,
settings: UserNotificationSettings,
) -> Result<(), Box<dyn std::error::Error>> {
) -> NResult<()> {
let db_mutex_guard = self.db.lock().await;
let connection = db_mutex_guard.get()?;
connection.execute(

View File

@ -80,7 +80,7 @@ impl ExtendedEvent for nostr::Event {
pub trait SqlStringConvertible {
fn to_sql_string(&self) -> String;
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error>>
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error + Send + Sync>>
where
Self: Sized;
}
@ -90,7 +90,7 @@ impl SqlStringConvertible for nostr::EventId {
self.to_hex()
}
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error>> {
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
nostr::EventId::from_hex(s).map_err(|e| e.into())
}
}
@ -100,7 +100,7 @@ impl SqlStringConvertible for nostr::PublicKey {
self.to_hex()
}
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error>> {
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
PublicKey::from_hex(s).map_err(|e| e.into())
}
}
@ -110,7 +110,7 @@ impl SqlStringConvertible for nostr::Timestamp {
self.as_u64().to_string()
}
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error>> {
fn from_sql_string(s: String) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let u64_timestamp: u64 = s.parse()?;
Ok(nostr::Timestamp::from(u64_timestamp))
}
@ -180,14 +180,14 @@ impl MaybeConvertibleToRelayList for nostr::Event {
/// A trait for types that can be encoded to and decoded from JSON, specific to this crate.
/// This is defined to overcome the rust compiler's limitation of implementing a trait for a type that is not defined in the same crate.
pub trait Codable {
fn to_json(&self) -> Result<serde_json::Value, Box<dyn std::error::Error>>;
fn from_json(json: serde_json::Value) -> Result<Self, Box<dyn std::error::Error>>
fn to_json(&self) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>;
fn from_json(json: serde_json::Value) -> Result<Self, Box<dyn std::error::Error + Send + Sync>>
where
Self: Sized;
}
impl Codable for MuteList {
fn to_json(&self) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
fn to_json(&self) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
Ok(serde_json::json!({
"public_keys": self.public_keys.iter().map(|pk| pk.to_hex()).collect::<Vec<String>>(),
"hashtags": self.hashtags.clone(),
@ -196,7 +196,7 @@ impl Codable for MuteList {
}))
}
fn from_json(json: serde_json::Value) -> Result<Self, Box<dyn std::error::Error>>
fn from_json(json: serde_json::Value) -> Result<Self, Box<dyn std::error::Error + Send + Sync>>
where
Self: Sized,
{

View File

@ -21,7 +21,7 @@ impl NostrNetworkHelper {
relay_url: String,
cache_max_age: Duration,
event_saver: EventSaver,
) -> Result<Self, Box<dyn std::error::Error>> {
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let client = Client::new(Keys::generate());
client.add_relay(relay_url.clone()).await?;
client.connect().await;

View File

@ -72,7 +72,7 @@ impl RelayConnection {
&mut self,
raw_message: Result<Message, Error>,
stream: &mut WebSocketStream<TokioIo<Upgraded>>,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let raw_message = raw_message?;
self.run_loop_iteration(raw_message, stream).await
}
@ -81,7 +81,7 @@ impl RelayConnection {
&mut self,
raw_message: Message,
stream: &mut WebSocketStream<TokioIo<Upgraded>>,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if raw_message.is_text() {
let message: ClientMessage =
ClientMessage::from_value(Value::from_str(raw_message.to_text()?)?)?;
@ -98,7 +98,7 @@ impl RelayConnection {
async fn handle_client_message(
&self,
message: ClientMessage,
) -> Result<RelayMessage, Box<dyn std::error::Error>> {
) -> Result<RelayMessage, Box<dyn std::error::Error + Send + Sync>> {
match message {
ClientMessage::Event(event) => {
self.notification_manager.handle_event(&event).await?;