mirror of
https://github.com/nostrlabs-io/notepush.git
synced 2025-06-14 11:07:43 +00:00
Remove Mutex in front of nostr_network_helper for better performance.
This commit drastically improves the performance of notification manager by changing the mutex architecture to be only around the cache, instead of the entire nostr_network_helper. Furthermore, the lock is acquired twice when getting events that may be cached (or need caching), to avoid having the cache locked across Nostr fetch requests, which can take up to 10 seconds each in the worst case scenario. Testing -------- Quickly smoke tested sending some events to ensure push notifications are still working overall Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
@ -1,3 +1,4 @@
|
||||
use tokio::sync::Mutex;
|
||||
use super::nostr_event_extensions::MaybeConvertibleToMuteList;
|
||||
use super::ExtendedEvent;
|
||||
use nostr_sdk::prelude::*;
|
||||
@ -9,7 +10,7 @@ const CACHE_MAX_AGE: Duration = Duration::from_secs(60);
|
||||
|
||||
pub struct NostrNetworkHelper {
|
||||
client: Client,
|
||||
cache: Cache,
|
||||
cache: Mutex<Cache>,
|
||||
}
|
||||
|
||||
impl NostrNetworkHelper {
|
||||
@ -20,13 +21,13 @@ impl NostrNetworkHelper {
|
||||
client.add_relay(relay_url.clone()).await?;
|
||||
client.connect().await;
|
||||
|
||||
Ok(NostrNetworkHelper { client, cache: Cache::new(CACHE_MAX_AGE) })
|
||||
Ok(NostrNetworkHelper { client, cache: Mutex::new(Cache::new(CACHE_MAX_AGE)) })
|
||||
}
|
||||
|
||||
// MARK: - Answering questions about a user
|
||||
|
||||
pub async fn should_mute_notification_for_pubkey(
|
||||
&mut self,
|
||||
&self,
|
||||
event: &Event,
|
||||
pubkey: &PublicKey,
|
||||
) -> bool {
|
||||
@ -71,7 +72,7 @@ impl NostrNetworkHelper {
|
||||
}
|
||||
|
||||
pub async fn does_pubkey_follow_pubkey(
|
||||
&mut self,
|
||||
&self,
|
||||
source_pubkey: &PublicKey,
|
||||
target_pubkey: &PublicKey,
|
||||
) -> bool {
|
||||
@ -88,30 +89,34 @@ impl NostrNetworkHelper {
|
||||
|
||||
// MARK: - Getting specific event types with caching
|
||||
|
||||
pub async fn get_public_mute_list(&mut self, pubkey: &PublicKey) -> Option<MuteList> {
|
||||
match self.cache.get_mute_list(pubkey) {
|
||||
Ok(optional_mute_list) => optional_mute_list,
|
||||
Err(_) => {
|
||||
// We don't have an answer from the cache, so we need to fetch it
|
||||
let mute_list_event = self.fetch_single_event(pubkey, Kind::MuteList)
|
||||
.await;
|
||||
self.cache.add_optional_mute_list_with_author(pubkey, mute_list_event.clone());
|
||||
mute_list_event?.to_mute_list()
|
||||
pub async fn get_public_mute_list(&self, pubkey: &PublicKey) -> Option<MuteList> {
|
||||
{
|
||||
let mut cache_mutex_guard = self.cache.lock().await;
|
||||
if let Ok(optional_mute_list) = cache_mutex_guard.get_mute_list(pubkey) {
|
||||
return optional_mute_list;
|
||||
}
|
||||
}
|
||||
} // Release the lock here for improved performance
|
||||
|
||||
// We don't have an answer from the cache, so we need to fetch it
|
||||
let mute_list_event = self.fetch_single_event(pubkey, Kind::MuteList).await;
|
||||
let mut cache_mutex_guard = self.cache.lock().await;
|
||||
cache_mutex_guard.add_optional_mute_list_with_author(pubkey, mute_list_event.clone());
|
||||
mute_list_event?.to_mute_list()
|
||||
}
|
||||
|
||||
pub async fn get_contact_list(&mut self, pubkey: &PublicKey) -> Option<Event> {
|
||||
match self.cache.get_contact_list(pubkey) {
|
||||
Ok(optional_contact_list) => optional_contact_list,
|
||||
Err(_) => {
|
||||
// We don't have an answer from the cache, so we need to fetch it
|
||||
let contact_list_event = self.fetch_single_event(pubkey, Kind::ContactList)
|
||||
.await;
|
||||
self.cache.add_optional_contact_list_with_author(pubkey, contact_list_event.clone());
|
||||
contact_list_event
|
||||
pub async fn get_contact_list(&self, pubkey: &PublicKey) -> Option<Event> {
|
||||
{
|
||||
let mut cache_mutex_guard = self.cache.lock().await;
|
||||
if let Ok(optional_contact_list) = cache_mutex_guard.get_contact_list(pubkey) {
|
||||
return optional_contact_list;
|
||||
}
|
||||
}
|
||||
} // Release the lock here for improved performance
|
||||
|
||||
// We don't have an answer from the cache, so we need to fetch it
|
||||
let contact_list_event = self.fetch_single_event(pubkey, Kind::ContactList).await;
|
||||
let mut cache_mutex_guard = self.cache.lock().await;
|
||||
cache_mutex_guard.add_optional_contact_list_with_author(pubkey, contact_list_event.clone());
|
||||
contact_list_event
|
||||
}
|
||||
|
||||
// MARK: - Lower level fetching functions
|
||||
@ -121,15 +126,15 @@ impl NostrNetworkHelper {
|
||||
.kinds(vec![kind])
|
||||
.authors(vec![author.clone()])
|
||||
.limit(1);
|
||||
|
||||
|
||||
let mut notifications = self.client.notifications();
|
||||
let this_subscription_id = self
|
||||
.client
|
||||
.subscribe(Vec::from([subscription_filter]), None)
|
||||
.await;
|
||||
|
||||
let mut event: Option<Event> = None;
|
||||
let mut notifications = self.client.notifications();
|
||||
|
||||
|
||||
while let Ok(result) = timeout(NOTE_FETCH_TIMEOUT, notifications.recv()).await {
|
||||
if let Ok(notification) = result {
|
||||
if let RelayPoolNotification::Event {
|
||||
|
@ -27,7 +27,7 @@ pub struct NotificationManager {
|
||||
db: Mutex<r2d2::Pool<SqliteConnectionManager>>,
|
||||
apns_topic: String,
|
||||
apns_client: Mutex<Client>,
|
||||
nostr_network_helper: Mutex<NostrNetworkHelper>,
|
||||
nostr_network_helper: NostrNetworkHelper,
|
||||
}
|
||||
|
||||
impl NotificationManager {
|
||||
@ -42,8 +42,6 @@ impl NotificationManager {
|
||||
apns_environment: a2::client::Endpoint,
|
||||
apns_topic: String,
|
||||
) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let mute_manager = NostrNetworkHelper::new(relay_url.clone()).await?;
|
||||
|
||||
let connection = db.get()?;
|
||||
Self::setup_database(&connection)?;
|
||||
|
||||
@ -60,7 +58,7 @@ impl NotificationManager {
|
||||
apns_topic,
|
||||
apns_client: Mutex::new(client),
|
||||
db: Mutex::new(db),
|
||||
nostr_network_helper: Mutex::new(mute_manager),
|
||||
nostr_network_helper: NostrNetworkHelper::new(relay_url.clone()).await?,
|
||||
})
|
||||
}
|
||||
|
||||
@ -220,8 +218,7 @@ impl NotificationManager {
|
||||
let mut pubkeys_to_notify = HashSet::new();
|
||||
for pubkey in relevant_pubkeys_yet_to_receive {
|
||||
let should_mute: bool = {
|
||||
let mut mute_manager_mutex_guard = self.nostr_network_helper.lock().await;
|
||||
mute_manager_mutex_guard
|
||||
self.nostr_network_helper
|
||||
.should_mute_notification_for_pubkey(event, &pubkey)
|
||||
.await
|
||||
};
|
||||
@ -285,8 +282,7 @@ impl NotificationManager {
|
||||
) -> Result<bool, Box<dyn std::error::Error>> {
|
||||
let notification_preferences = self.get_user_notification_settings(pubkey, device_token).await?;
|
||||
if notification_preferences.only_notifications_from_following_enabled {
|
||||
let mut nostr_network_helper_mutex_guard = self.nostr_network_helper.lock().await;
|
||||
if !nostr_network_helper_mutex_guard.does_pubkey_follow_pubkey(pubkey, &event.author()).await {
|
||||
if !self.nostr_network_helper.does_pubkey_follow_pubkey(pubkey, &event.author()).await {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user