mirror of
https://github.com/nostrlabs-io/notepush.git
synced 2025-06-15 11:28:23 +00:00
Better mute handling
This commit improves the robustness of mute list handling: 1. It listens to new mute lists from the relay interface, and saves them whenever there is a new one 2. It uses the user's own relay lists to fetch events (such as mute lists), helping to ensure we get their mute lists even when they are not stored in the Damus relay 3. It saves events it sees when fetching them from the network, if applicable Changelog-Changed: Improved robustness of mute handling Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -1658,6 +1658,7 @@ dependencies = [
|
||||
"fallible-streaming-iterator",
|
||||
"hashlink",
|
||||
"libsqlite3-sys",
|
||||
"serde_json",
|
||||
"smallvec",
|
||||
]
|
||||
|
||||
|
@ -17,7 +17,7 @@ tracing = "0.1.40"
|
||||
# That said, it's not ideal for all scenarios and in particular, generic
|
||||
# libraries built around `rusqlite` should probably not enable it, which
|
||||
# is why it is not a default feature -- it could become hard to disable.
|
||||
rusqlite = { version = "0.31.0", features = ["bundled"] }
|
||||
rusqlite = { version = "0.31.0", features = ["bundled", "serde_json"] }
|
||||
chrono = { version = "0.4.38" }
|
||||
a2 = { version = "0.10.0" }
|
||||
tokio = { version = "1.38.0", features = ["full"] }
|
||||
|
@ -2,7 +2,7 @@ pub mod nostr_network_helper;
|
||||
mod nostr_event_extensions;
|
||||
mod nostr_event_cache;
|
||||
pub mod notification_manager;
|
||||
pub mod utils;
|
||||
|
||||
pub use nostr_network_helper::NostrNetworkHelper;
|
||||
use nostr_event_extensions::{ExtendedEvent, SqlStringConvertible};
|
||||
pub use notification_manager::NotificationManager;
|
||||
|
@ -1,11 +1,10 @@
|
||||
use crate::utils::time_delta::TimeDelta;
|
||||
use crate::{notification_manager::nostr_event_extensions::MaybeConvertibleToTimestampedMuteList, utils::time_delta::TimeDelta};
|
||||
use tokio::time::Duration;
|
||||
use nostr_sdk::prelude::*;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use log;
|
||||
|
||||
use super::nostr_event_extensions::MaybeConvertibleToMuteList;
|
||||
use super::nostr_event_extensions::{MaybeConvertibleToRelayList, RelayList, TimestampedMuteList};
|
||||
|
||||
struct CacheEntry {
|
||||
event: Option<Event>, // `None` means the event does not exist as far as we know (It does NOT mean expired)
|
||||
@ -23,6 +22,7 @@ pub struct Cache {
|
||||
entries: HashMap<EventId, Arc<CacheEntry>>,
|
||||
mute_lists: HashMap<PublicKey, Arc<CacheEntry>>,
|
||||
contact_lists: HashMap<PublicKey, Arc<CacheEntry>>,
|
||||
relay_lists: HashMap<PublicKey, Arc<CacheEntry>>,
|
||||
max_age: Duration,
|
||||
}
|
||||
|
||||
@ -34,6 +34,7 @@ impl Cache {
|
||||
entries: HashMap::new(),
|
||||
mute_lists: HashMap::new(),
|
||||
contact_lists: HashMap::new(),
|
||||
relay_lists: HashMap::new(),
|
||||
max_age,
|
||||
}
|
||||
}
|
||||
@ -54,6 +55,20 @@ impl Cache {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_optional_relay_list_with_author(&mut self, author: &PublicKey, relay_list_event: Option<Event>) {
|
||||
if let Some(relay_list_event) = relay_list_event {
|
||||
self.add_event(relay_list_event);
|
||||
} else {
|
||||
self.relay_lists.insert(
|
||||
author.clone(),
|
||||
Arc::new(CacheEntry {
|
||||
event: None,
|
||||
added_at: nostr::Timestamp::now(),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_optional_contact_list_with_author(&mut self, author: &PublicKey, contact_list: Option<Event>) {
|
||||
if let Some(contact_list) = contact_list {
|
||||
self.add_event(contact_list);
|
||||
@ -84,23 +99,27 @@ impl Cache {
|
||||
self.contact_lists
|
||||
.insert(event.pubkey.clone(), entry.clone());
|
||||
log::debug!("Added contact list to the cache. Event ID: {}", event.id.to_hex());
|
||||
}
|
||||
},
|
||||
Kind::RelayList => {
|
||||
self.relay_lists.insert(event.pubkey.clone(), entry.clone());
|
||||
log::debug!("Added relay list to the cache. Event ID: {}", event.id.to_hex());
|
||||
},
|
||||
_ => {
|
||||
log::debug!("Added event to the cache. Event ID: {}", event.id.to_hex());
|
||||
log::debug!("Unknown event kind, not adding to any cache. Event ID: {}", event.id.to_hex());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Fetching items from the cache
|
||||
|
||||
pub fn get_mute_list(&mut self, pubkey: &PublicKey) -> Result<Option<MuteList>, CacheError> {
|
||||
pub fn get_mute_list(&mut self, pubkey: &PublicKey) -> Result<Option<TimestampedMuteList>, CacheError> {
|
||||
if let Some(entry) = self.mute_lists.get(pubkey) {
|
||||
let entry = entry.clone(); // Clone the Arc to avoid borrowing issues
|
||||
if !entry.is_expired(self.max_age) {
|
||||
match &entry.event {
|
||||
Some(event) => {
|
||||
log::debug!("Cached mute list for pubkey {} was found", pubkey.to_hex());
|
||||
return Ok(event.to_mute_list());
|
||||
return Ok(event.to_timestamped_mute_list());
|
||||
}
|
||||
None => {
|
||||
log::debug!("Empty mute list cache entry for pubkey {}", pubkey.to_hex());
|
||||
@ -117,6 +136,22 @@ impl Cache {
|
||||
Err(CacheError::NotFound)
|
||||
}
|
||||
|
||||
pub fn get_relay_list(&mut self, pubkey: &PublicKey) -> Result<Option<RelayList>, CacheError> {
|
||||
if let Some(entry) = self.relay_lists.get(pubkey) {
|
||||
let entry = entry.clone(); // Clone the Arc to avoid borrowing issues
|
||||
if !entry.is_expired(self.max_age) {
|
||||
if let Some(event) = entry.event.clone() {
|
||||
return Ok(event.to_relay_list());
|
||||
}
|
||||
} else {
|
||||
log::debug!("Relay list for pubkey {} is expired, removing it from the cache", pubkey.to_hex());
|
||||
self.mute_lists.remove(pubkey);
|
||||
self.remove_event_from_all_maps(&entry.event);
|
||||
}
|
||||
}
|
||||
Err(CacheError::NotFound)
|
||||
}
|
||||
|
||||
pub fn get_contact_list(&mut self, pubkey: &PublicKey) -> Result<Option<Event>, CacheError> {
|
||||
if let Some(entry) = self.contact_lists.get(pubkey) {
|
||||
let entry = entry.clone(); // Clone the Arc to avoid borrowing issues
|
||||
|
@ -1,5 +1,5 @@
|
||||
use nostr::{self, key::PublicKey, nips::nip51::MuteList, Alphabet, SingleLetterTag, TagKind::SingleLetter};
|
||||
use nostr_sdk::{Kind, TagKind};
|
||||
use nostr::{self, key::PublicKey, nips::{nip51::MuteList, nip65}, Alphabet, SingleLetterTag, TagKind::SingleLetter};
|
||||
use nostr_sdk::{EventId, Kind, TagKind};
|
||||
|
||||
/// Temporary scaffolding of old methods that have not been ported to use native Event methods
|
||||
pub trait ExtendedEvent {
|
||||
@ -102,6 +102,10 @@ pub trait MaybeConvertibleToMuteList {
|
||||
fn to_mute_list(&self) -> Option<MuteList>;
|
||||
}
|
||||
|
||||
pub trait MaybeConvertibleToTimestampedMuteList {
|
||||
fn to_timestamped_mute_list(&self) -> Option<TimestampedMuteList>;
|
||||
}
|
||||
|
||||
impl MaybeConvertibleToMuteList for nostr::Event {
|
||||
fn to_mute_list(&self) -> Option<MuteList> {
|
||||
if self.kind != Kind::MuteList {
|
||||
@ -115,3 +119,104 @@ impl MaybeConvertibleToMuteList for nostr::Event {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl MaybeConvertibleToTimestampedMuteList for nostr::Event {
|
||||
fn to_timestamped_mute_list(&self) -> Option<TimestampedMuteList> {
|
||||
if self.kind != Kind::MuteList {
|
||||
return None;
|
||||
}
|
||||
let mute_list = self.to_mute_list()?;
|
||||
Some(TimestampedMuteList {
|
||||
mute_list,
|
||||
timestamp: self.created_at.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type RelayList = Vec<(nostr::Url, Option<nostr::nips::nip65::RelayMetadata>)>;
|
||||
|
||||
pub trait MaybeConvertibleToRelayList {
|
||||
fn to_relay_list(&self) -> Option<RelayList>;
|
||||
}
|
||||
|
||||
impl MaybeConvertibleToRelayList for nostr::Event {
|
||||
fn to_relay_list(&self) -> Option<RelayList> {
|
||||
if self.kind != Kind::RelayList {
|
||||
return None;
|
||||
}
|
||||
let extracted_relay_list = nip65::extract_relay_list(&self);
|
||||
// Convert the extracted relay list data fully into owned data that can be returned
|
||||
let extracted_relay_list_owned = extracted_relay_list.into_iter()
|
||||
.map(|(url, metadata)| (url.clone(), metadata.as_ref().map(|m| m.clone())))
|
||||
.collect();
|
||||
Some(extracted_relay_list_owned)
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait for types that can be encoded to and decoded from JSON, specific to this crate.
|
||||
/// This is defined to overcome the rust compiler's limitation of implementing a trait for a type that is not defined in the same crate.
|
||||
pub trait Codable {
|
||||
fn to_json(&self) -> Result<serde_json::Value, Box<dyn std::error::Error>>;
|
||||
fn from_json(json: serde_json::Value) -> Result<Self, Box<dyn std::error::Error>>
|
||||
where
|
||||
Self: Sized;
|
||||
}
|
||||
|
||||
impl Codable for MuteList {
|
||||
fn to_json(&self) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
|
||||
Ok(serde_json::json!({
|
||||
"public_keys": self.public_keys.iter().map(|pk| pk.to_hex()).collect::<Vec<String>>(),
|
||||
"hashtags": self.hashtags.clone(),
|
||||
"event_ids": self.event_ids.iter().map(|id| id.to_hex()).collect::<Vec<String>>(),
|
||||
"words": self.words.clone()
|
||||
}))
|
||||
}
|
||||
|
||||
fn from_json(json: serde_json::Value) -> Result<Self, Box<dyn std::error::Error>>
|
||||
where
|
||||
Self: Sized {
|
||||
let public_keys = json.get("public_keys")
|
||||
.ok_or_else(|| "Missing 'public_keys' field".to_string())?
|
||||
.as_array()
|
||||
.ok_or_else(|| "'public_keys' must be an array".to_string())?
|
||||
.iter()
|
||||
.map(|pk| PublicKey::from_hex(pk.as_str().unwrap_or_default()).map_err(|e| e.to_string()))
|
||||
.collect::<Result<Vec<PublicKey>, String>>()?;
|
||||
|
||||
let hashtags = json.get("hashtags")
|
||||
.ok_or_else(|| "Missing 'hashtags' field".to_string())?
|
||||
.as_array()
|
||||
.ok_or_else(|| "'hashtags' must be an array".to_string())?
|
||||
.iter()
|
||||
.map(|tag| tag.as_str().map(|s| s.to_string()).ok_or_else(|| "Invalid hashtag".to_string()))
|
||||
.collect::<Result<Vec<String>, String>>()?;
|
||||
|
||||
let event_ids = json.get("event_ids")
|
||||
.ok_or_else(|| "Missing 'event_ids' field".to_string())?
|
||||
.as_array()
|
||||
.ok_or_else(|| "'event_ids' must be an array".to_string())?
|
||||
.iter()
|
||||
.map(|id| EventId::from_hex(id.as_str().unwrap_or_default()).map_err(|e| e.to_string()))
|
||||
.collect::<Result<Vec<EventId>, String>>()?;
|
||||
|
||||
let words = json.get("words")
|
||||
.ok_or_else(|| "Missing 'words' field".to_string())?
|
||||
.as_array()
|
||||
.ok_or_else(|| "'words' must be an array".to_string())?
|
||||
.iter()
|
||||
.map(|word| word.as_str().map(|s| s.to_string()).ok_or_else(|| "Invalid word".to_string()))
|
||||
.collect::<Result<Vec<String>, String>>()?;
|
||||
|
||||
Ok(MuteList {
|
||||
public_keys,
|
||||
hashtags,
|
||||
event_ids,
|
||||
words,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TimestampedMuteList {
|
||||
pub mute_list: MuteList,
|
||||
pub timestamp: nostr::Timestamp,
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
use tokio::sync::Mutex;
|
||||
use super::nostr_event_extensions::MaybeConvertibleToMuteList;
|
||||
use super::nostr_event_extensions::{MaybeConvertibleToRelayList, MaybeConvertibleToTimestampedMuteList, RelayList, TimestampedMuteList};
|
||||
use super::notification_manager::EventSaver;
|
||||
use super::ExtendedEvent;
|
||||
use nostr_sdk::prelude::*;
|
||||
use super::nostr_event_cache::Cache;
|
||||
@ -8,71 +9,27 @@ use tokio::time::{timeout, Duration};
|
||||
const NOTE_FETCH_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
pub struct NostrNetworkHelper {
|
||||
client: Client,
|
||||
bootstrap_client: Client,
|
||||
cache: Mutex<Cache>,
|
||||
event_saver: EventSaver
|
||||
}
|
||||
|
||||
impl NostrNetworkHelper {
|
||||
// MARK: - Initialization
|
||||
|
||||
pub async fn new(relay_url: String, cache_max_age: Duration) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
pub async fn new(relay_url: String, cache_max_age: Duration, event_saver: EventSaver) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let client = Client::new(&Keys::generate());
|
||||
client.add_relay(relay_url.clone()).await?;
|
||||
client.connect().await;
|
||||
|
||||
Ok(NostrNetworkHelper {
|
||||
client,
|
||||
bootstrap_client: client,
|
||||
cache: Mutex::new(Cache::new(cache_max_age)),
|
||||
event_saver,
|
||||
})
|
||||
}
|
||||
|
||||
// MARK: - Answering questions about a user
|
||||
|
||||
pub async fn should_mute_notification_for_pubkey(
|
||||
&self,
|
||||
event: &Event,
|
||||
pubkey: &PublicKey,
|
||||
) -> bool {
|
||||
log::debug!(
|
||||
"Checking if event {:?} should be muted for pubkey {:?}",
|
||||
event,
|
||||
pubkey
|
||||
);
|
||||
if let Some(mute_list) = self.get_public_mute_list(pubkey).await {
|
||||
for muted_public_key in mute_list.public_keys {
|
||||
if event.pubkey == muted_public_key {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
for muted_event_id in mute_list.event_ids {
|
||||
if event.id == muted_event_id
|
||||
|| event.referenced_event_ids().contains(&muted_event_id)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
for muted_hashtag in mute_list.hashtags {
|
||||
if event
|
||||
.referenced_hashtags()
|
||||
.iter()
|
||||
.any(|t| t == &muted_hashtag)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
for muted_word in mute_list.words {
|
||||
if event
|
||||
.content
|
||||
.to_lowercase()
|
||||
.contains(&muted_word.to_lowercase())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
pub async fn does_pubkey_follow_pubkey(
|
||||
&self,
|
||||
source_pubkey: &PublicKey,
|
||||
@ -91,7 +48,7 @@ impl NostrNetworkHelper {
|
||||
|
||||
// MARK: - Getting specific event types with caching
|
||||
|
||||
pub async fn get_public_mute_list(&self, pubkey: &PublicKey) -> Option<MuteList> {
|
||||
pub async fn get_public_mute_list(&self, pubkey: &PublicKey) -> Option<TimestampedMuteList> {
|
||||
{
|
||||
let mut cache_mutex_guard = self.cache.lock().await;
|
||||
if let Ok(optional_mute_list) = cache_mutex_guard.get_mute_list(pubkey) {
|
||||
@ -103,7 +60,22 @@ impl NostrNetworkHelper {
|
||||
let mute_list_event = self.fetch_single_event(pubkey, Kind::MuteList).await;
|
||||
let mut cache_mutex_guard = self.cache.lock().await;
|
||||
cache_mutex_guard.add_optional_mute_list_with_author(pubkey, mute_list_event.clone());
|
||||
mute_list_event?.to_mute_list()
|
||||
Some(mute_list_event?.to_timestamped_mute_list()?)
|
||||
}
|
||||
|
||||
pub async fn get_relay_list(&self, pubkey: &PublicKey) -> Option<RelayList> {
|
||||
{
|
||||
let mut cache_mutex_guard = self.cache.lock().await;
|
||||
if let Ok(optional_relay_list) = cache_mutex_guard.get_relay_list(pubkey) {
|
||||
return optional_relay_list;
|
||||
}
|
||||
} // Release the lock here for improved performance
|
||||
|
||||
// We don't have an answer from the cache, so we need to fetch it
|
||||
let relay_list_event = NostrNetworkHelper::fetch_single_event_from_client(pubkey, Kind::RelayList, &self.bootstrap_client).await;
|
||||
let mut cache_mutex_guard = self.cache.lock().await;
|
||||
cache_mutex_guard.add_optional_relay_list_with_author(pubkey, relay_list_event.clone());
|
||||
relay_list_event?.to_relay_list()
|
||||
}
|
||||
|
||||
pub async fn get_contact_list(&self, pubkey: &PublicKey) -> Option<Event> {
|
||||
@ -124,14 +96,48 @@ impl NostrNetworkHelper {
|
||||
// MARK: - Lower level fetching functions
|
||||
|
||||
async fn fetch_single_event(&self, author: &PublicKey, kind: Kind) -> Option<Event> {
|
||||
let event = match self.make_client_for(author).await {
|
||||
Some(client) => {
|
||||
NostrNetworkHelper::fetch_single_event_from_client(author, kind, &client).await
|
||||
},
|
||||
None => {
|
||||
NostrNetworkHelper::fetch_single_event_from_client(author, kind, &self.bootstrap_client).await
|
||||
},
|
||||
};
|
||||
// Save event to our database if needed
|
||||
if let Some(event) = event.clone() {
|
||||
if let Err(error) = self.event_saver.save_if_needed(&event).await {
|
||||
log::warn!("Failed to save event '{:?}'. Error: {:?}", event.id.to_hex(), error)
|
||||
}
|
||||
}
|
||||
event
|
||||
}
|
||||
|
||||
async fn make_client_for(&self, author: &PublicKey) -> Option<Client> {
|
||||
let client = Client::new(&Keys::generate());
|
||||
|
||||
let relay_list = self.get_relay_list(author).await?;
|
||||
for (url, metadata) in relay_list {
|
||||
if metadata.map_or(true, |m| m == RelayMetadata::Write) { // Only add "write" relays, as per NIP-65 spec on reading data FROM user
|
||||
if let Err(e) = client.add_relay(url.clone()).await {
|
||||
log::warn!("Failed to add relay URL: {:?}, error: {:?}", url, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
client.connect().await;
|
||||
|
||||
Some(client)
|
||||
}
|
||||
|
||||
async fn fetch_single_event_from_client(author: &PublicKey, kind: Kind, client: &Client) -> Option<Event> {
|
||||
let subscription_filter = Filter::new()
|
||||
.kinds(vec![kind])
|
||||
.authors(vec![author.clone()])
|
||||
.limit(1);
|
||||
|
||||
let mut notifications = self.client.notifications();
|
||||
let this_subscription_id = self
|
||||
.client
|
||||
let mut notifications = client.notifications();
|
||||
let this_subscription_id = client
|
||||
.subscribe(Vec::from([subscription_filter]), None)
|
||||
.await;
|
||||
|
||||
@ -157,7 +163,7 @@ impl NostrNetworkHelper {
|
||||
log::info!("Event of kind {:?} not found for pubkey {:?}", kind, author);
|
||||
}
|
||||
|
||||
self.client.unsubscribe(this_subscription_id).await;
|
||||
client.unsubscribe(this_subscription_id).await;
|
||||
event
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ use a2::{Client, ClientConfig, DefaultNotificationBuilder, NotificationBuilder};
|
||||
use log;
|
||||
use nostr::event::EventId;
|
||||
use nostr::key::PublicKey;
|
||||
use nostr::nips::nip51::MuteList;
|
||||
use nostr::types::Timestamp;
|
||||
use nostr_sdk::JsonUtil;
|
||||
use nostr_sdk::Kind;
|
||||
@ -11,9 +12,14 @@ use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use tokio::sync::Mutex;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use tokio;
|
||||
|
||||
use super::nostr_event_extensions::Codable;
|
||||
use super::nostr_event_extensions::MaybeConvertibleToMuteList;
|
||||
use super::nostr_event_extensions::TimestampedMuteList;
|
||||
use super::nostr_network_helper::NostrNetworkHelper;
|
||||
use super::utils::should_mute_notification_for_mutelist;
|
||||
use super::ExtendedEvent;
|
||||
use super::SqlStringConvertible;
|
||||
use nostr::Event;
|
||||
@ -24,10 +30,90 @@ use std::fs::File;
|
||||
// MARK: - NotificationManager
|
||||
|
||||
pub struct NotificationManager {
|
||||
db: Mutex<r2d2::Pool<SqliteConnectionManager>>,
|
||||
db: Arc<Mutex<r2d2::Pool<SqliteConnectionManager>>>,
|
||||
apns_topic: String,
|
||||
apns_client: Mutex<Client>,
|
||||
nostr_network_helper: NostrNetworkHelper,
|
||||
pub event_saver: EventSaver
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct EventSaver {
|
||||
db: Arc<Mutex<r2d2::Pool<SqliteConnectionManager>>>
|
||||
}
|
||||
|
||||
impl EventSaver {
|
||||
pub fn new(db: Arc<Mutex<r2d2::Pool<SqliteConnectionManager>>>) -> Self {
|
||||
Self { db }
|
||||
}
|
||||
|
||||
pub async fn save_if_needed(&self, event: &nostr::Event) -> Result<bool, Box<dyn std::error::Error>> {
|
||||
match event.to_mute_list() {
|
||||
Some(mute_list) => {
|
||||
match self.get_saved_mute_list_for(event.author()).await.ok().flatten() {
|
||||
Some(saved_timestamped_mute_list) => {
|
||||
let saved_mute_list_timestamp = saved_timestamped_mute_list.timestamp;
|
||||
if saved_mute_list_timestamp < event.created_at() {
|
||||
self.save_mute_list(event.author(), mute_list, event.created_at).await?;
|
||||
}
|
||||
else {
|
||||
return Ok(false)
|
||||
}
|
||||
},
|
||||
None => {
|
||||
self.save_mute_list(event.author(), mute_list, event.created_at).await?;
|
||||
}
|
||||
}
|
||||
Ok(true)
|
||||
},
|
||||
None => Ok(false),
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Muting preferences
|
||||
|
||||
pub async fn save_mute_list(&self, pubkey: PublicKey, mute_list: MuteList, created_at: Timestamp) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let mute_list_json = mute_list.to_json()?;
|
||||
let db_mutex_guard = self.db.lock().await;
|
||||
let connection = db_mutex_guard.get()?;
|
||||
|
||||
connection.execute(
|
||||
"INSERT OR REPLACE INTO muting_preferences (user_pubkey, mute_list, created_at) VALUES (?, ?, ?)",
|
||||
params![
|
||||
pubkey.to_sql_string(),
|
||||
mute_list_json,
|
||||
created_at.to_sql_string()
|
||||
],
|
||||
)?;
|
||||
|
||||
log::debug!("Mute list saved for pubkey {}", pubkey.to_hex());
|
||||
log::debug!("Mute list: {:?}", mute_list);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_saved_mute_list_for(&self, pubkey: PublicKey) -> Result<Option<TimestampedMuteList>, Box<dyn std::error::Error>> {
|
||||
let db_mutex_guard = self.db.lock().await;
|
||||
let connection = db_mutex_guard.get()?;
|
||||
|
||||
let mut stmt = connection.prepare(
|
||||
"SELECT mute_list, created_at FROM muting_preferences WHERE user_pubkey = ?",
|
||||
)?;
|
||||
|
||||
let mute_list_info: (serde_json::Value, nostr::Timestamp) = match stmt.query_row([pubkey.to_sql_string()], |row| { Ok((row.get(0)?, row.get(1)?)) }) {
|
||||
Ok(info) => (info.0, nostr::Timestamp::from_sql_string(info.1)?),
|
||||
Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
let mute_list = MuteList::from_json(mute_list_info.0)?;
|
||||
let timestamped_mute_list = TimestampedMuteList {
|
||||
mute_list,
|
||||
timestamp: mute_list_info.1
|
||||
};
|
||||
|
||||
Ok(Some(timestamped_mute_list))
|
||||
}
|
||||
}
|
||||
|
||||
impl NotificationManager {
|
||||
@ -55,12 +141,18 @@ impl NotificationManager {
|
||||
ClientConfig::new(apns_environment.clone()),
|
||||
)?;
|
||||
|
||||
Ok(Self {
|
||||
let db = Arc::new(Mutex::new(db));
|
||||
let event_saver = EventSaver::new(db.clone());
|
||||
|
||||
let manager = NotificationManager {
|
||||
db,
|
||||
apns_topic,
|
||||
apns_client: Mutex::new(client),
|
||||
db: Mutex::new(db),
|
||||
nostr_network_helper: NostrNetworkHelper::new(relay_url.clone(), cache_max_age).await?,
|
||||
})
|
||||
nostr_network_helper: NostrNetworkHelper::new(relay_url.clone(), cache_max_age, event_saver.clone()).await?,
|
||||
event_saver,
|
||||
};
|
||||
|
||||
Ok(manager)
|
||||
}
|
||||
|
||||
// MARK: - Database setup operations
|
||||
@ -109,6 +201,17 @@ impl NotificationManager {
|
||||
Self::add_column_if_not_exists(&db, "user_info", "dm_notifications_enabled", "BOOLEAN", Some("true"))?;
|
||||
Self::add_column_if_not_exists(&db, "user_info", "only_notifications_from_following_enabled", "BOOLEAN", Some("false"))?;
|
||||
|
||||
// Migration related to mute list improvements (https://github.com/damus-io/damus/issues/2118)
|
||||
|
||||
db.execute(
|
||||
"CREATE TABLE IF NOT EXISTS muting_preferences (
|
||||
user_pubkey TEXT PRIMARY KEY,
|
||||
mute_list JSON NOT NULL,
|
||||
created_at TEXT NOT NULL
|
||||
)",
|
||||
[],
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -226,8 +329,7 @@ impl NotificationManager {
|
||||
let mut pubkeys_to_notify = HashSet::new();
|
||||
for pubkey in relevant_pubkeys_yet_to_receive {
|
||||
let should_mute: bool = {
|
||||
self.nostr_network_helper
|
||||
.should_mute_notification_for_pubkey(event, &pubkey)
|
||||
self.should_mute_notification_for_pubkey(event, &pubkey)
|
||||
.await
|
||||
};
|
||||
if !should_mute {
|
||||
@ -237,6 +339,42 @@ impl NotificationManager {
|
||||
Ok(pubkeys_to_notify)
|
||||
}
|
||||
|
||||
async fn should_mute_notification_for_pubkey(&self, event: &Event, pubkey: &PublicKey) -> bool {
|
||||
let latest_mute_list = self.get_newest_mute_list_available(pubkey).await.ok().flatten();
|
||||
if let Some(latest_mute_list) = latest_mute_list {
|
||||
return should_mute_notification_for_mutelist(event, &latest_mute_list)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
async fn get_newest_mute_list_available(&self, pubkey: &PublicKey) -> Result<Option<MuteList>, Box<dyn std::error::Error>> {
|
||||
let timestamped_saved_mute_list = self.event_saver.get_saved_mute_list_for(*pubkey).await?;
|
||||
let timestamped_network_mute_list = self.nostr_network_helper.get_public_mute_list(pubkey).await;
|
||||
Ok(match (timestamped_saved_mute_list, timestamped_network_mute_list) {
|
||||
(Some(local_mute), Some(network_mute)) => {
|
||||
if local_mute.timestamp > network_mute.timestamp {
|
||||
log::debug!("Mute lists available in both database and from the network for pubkey {}. Using local mute list since it's newer.", pubkey.to_hex());
|
||||
Some(local_mute.mute_list)
|
||||
} else {
|
||||
log::debug!("Mute lists available in both database and from the network for pubkey {}. Using network mute list since it's newer.", pubkey.to_hex());
|
||||
Some(network_mute.mute_list)
|
||||
}
|
||||
},
|
||||
(Some(local_mute), None) => {
|
||||
log::debug!("Mute list available in database for pubkey {}, but not from the network. Using local mute list.", pubkey.to_hex());
|
||||
Some(local_mute.mute_list)
|
||||
},
|
||||
(None, Some(network_mute)) => {
|
||||
log::debug!("Mute list for pubkey {} available from the network, but not in the database. Using network mute list.", pubkey.to_hex());
|
||||
Some(network_mute.mute_list)
|
||||
},
|
||||
(None, None) => {
|
||||
log::debug!("No mute list available for pubkey {}", pubkey.to_hex());
|
||||
None
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
async fn pubkeys_relevant_to_event(
|
||||
&self,
|
||||
event: &Event,
|
||||
|
41
src/notification_manager/utils.rs
Normal file
41
src/notification_manager/utils.rs
Normal file
@ -0,0 +1,41 @@
|
||||
use nostr::nips::nip51::MuteList;
|
||||
use nostr_sdk::Event;
|
||||
use super::nostr_event_extensions::ExtendedEvent;
|
||||
|
||||
|
||||
pub fn should_mute_notification_for_mutelist(
|
||||
event: &Event,
|
||||
mute_list: &MuteList,
|
||||
) -> bool {
|
||||
for muted_public_key in &mute_list.public_keys {
|
||||
if event.pubkey == *muted_public_key {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
for muted_event_id in &mute_list.event_ids {
|
||||
if event.id == *muted_event_id
|
||||
|| event.referenced_event_ids().contains(&muted_event_id)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
for muted_hashtag in &mute_list.hashtags {
|
||||
if event
|
||||
.referenced_hashtags()
|
||||
.iter()
|
||||
.any(|t| t == muted_hashtag)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
for muted_word in &mute_list.words {
|
||||
if event
|
||||
.content
|
||||
.to_lowercase()
|
||||
.contains(&muted_word.to_lowercase())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
@ -104,6 +104,7 @@ impl RelayConnection {
|
||||
ClientMessage::Event(event) => {
|
||||
log::info!("Received event with id: {:?}", event.id.to_hex());
|
||||
log::debug!("Event received: {:?}", event);
|
||||
self.notification_manager.event_saver.save_if_needed(&event).await?;
|
||||
self.notification_manager.send_notifications_if_needed(&event).await?;
|
||||
let notice_message = format!("blocked: This relay does not store events");
|
||||
let response = RelayMessage::Ok {
|
||||
|
Reference in New Issue
Block a user