Settings: use new settings

This commit is contained in:
Mike Dilger 2023-08-11 18:14:25 +12:00
parent 2d5b255e15
commit 24064272b4
10 changed files with 168 additions and 90 deletions

View File

@ -239,7 +239,6 @@ impl Feed {
// Filter further for the general feed
let dismissed = GLOBALS.dismissed.read().await.clone();
let now = Unixtime::now().unwrap();
let one_month_ago = now - Duration::new(60 * 60 * 24 * 30, 0);
let current_feed_kind = self.current_feed_kind.read().to_owned();
match current_feed_kind {
@ -346,12 +345,14 @@ impl Feed {
}
}
FeedKind::Person(person_pubkey) => {
let since = now - Duration::from_secs(GLOBALS.settings.read().person_feed_chunk);
let events: Vec<(Unixtime, Id)> = GLOBALS
.storage
.find_events(
&kinds, // feed kinds
&[], // any person (due to delegation condition) // FIXME GINA
Some(one_month_ago), // one year ago
&kinds, // feed kinds
&[], // any person (due to delegation condition) // FIXME
Some(since),
|e| {
if dismissed.contains(&e.id) {
return false;

View File

@ -44,8 +44,9 @@ pub struct Fetcher {
impl Fetcher {
pub fn new() -> Fetcher {
let connect_timeout = std::time::Duration::new(10, 0);
let timeout = std::time::Duration::new(15, 0);
let connect_timeout =
std::time::Duration::new(GLOBALS.settings.read().fetcher_connect_timeout_sec, 0);
let timeout = std::time::Duration::new(GLOBALS.settings.read().fetcher_timeout_sec, 0);
let client = match Client::builder()
.gzip(true)
.brotli(true)
@ -83,10 +84,10 @@ impl Fetcher {
pub fn start() {
// Setup periodic queue management
tokio::task::spawn(async {
let fetcher_looptime_ms = GLOBALS.settings.read().fetcher_looptime_ms;
tokio::task::spawn(async move {
loop {
// Every 1200 milliseconds...
tokio::time::sleep(Duration::from_millis(1200)).await;
tokio::time::sleep(Duration::from_millis(fetcher_looptime_ms)).await;
// Process the queue
GLOBALS.fetcher.process_queue().await;
@ -150,7 +151,7 @@ impl Fetcher {
}
let load = self.fetch_host_load(&host);
if load >= 3 {
if load >= GLOBALS.settings.read().fetcher_max_requests_per_host {
continue; // We cannot overload any given host
}
@ -384,6 +385,19 @@ impl Fetcher {
let maybe_response = req.send().await;
let low_exclusion = GLOBALS
.settings
.read()
.fetcher_host_exclusion_on_low_error_secs;
let med_exclusion = GLOBALS
.settings
.read()
.fetcher_host_exclusion_on_med_error_secs;
let high_exclusion = GLOBALS
.settings
.read()
.fetcher_host_exclusion_on_high_error_secs;
// Deal with response errors
let response = match maybe_response {
Ok(r) => r,
@ -391,11 +405,16 @@ impl Fetcher {
if e.is_builder() {
finish(FailOutcome::Fail, "builder error", Some(e.into()), 0);
} else if e.is_timeout() {
finish(FailOutcome::Requeue, "timeout", Some(e.into()), 30);
finish(
FailOutcome::Requeue,
"timeout",
Some(e.into()),
low_exclusion,
);
} else if e.is_request() {
finish(FailOutcome::Fail, "request error", Some(e.into()), 0);
} else if e.is_connect() {
finish(FailOutcome::Fail, "connect error", Some(e.into()), 15);
finish(FailOutcome::Fail, "connect error", Some(e.into()), 0);
} else if e.is_body() {
finish(FailOutcome::Fail, "body error", Some(e.into()), 0);
} else if e.is_decode() {
@ -410,7 +429,12 @@ impl Fetcher {
// Deal with status codes
let status = response.status();
if status.is_informational() {
finish(FailOutcome::Requeue, "informational error", None, 30);
finish(
FailOutcome::Requeue,
"informational error",
None,
low_exclusion,
);
return;
} else if status.is_redirection() {
if status == StatusCode::NOT_MODIFIED {
@ -421,20 +445,23 @@ impl Fetcher {
}
return;
} else if status.is_server_error() {
finish(FailOutcome::Requeue, "server error", None, 300);
// Give the server time to recover
finish(FailOutcome::Requeue, "server error", None, high_exclusion);
return;
// give them 5 minutes, maybe the server will recover
} else if status.is_success() {
// fall through
} else {
match status {
StatusCode::REQUEST_TIMEOUT => {
finish(FailOutcome::Requeue, "request timeout", None, 30);
// give 30 seconds and try again
finish(FailOutcome::Requeue, "request timeout", None, low_exclusion);
}
StatusCode::TOO_MANY_REQUESTS => {
finish(FailOutcome::Requeue, "too many requests", None, 30);
// give 15 seconds and try again
finish(
FailOutcome::Requeue,
"too many requests",
None,
med_exclusion,
);
}
_ => {
finish(FailOutcome::Fail, &format!("{}", status), None, 0);

View File

@ -121,11 +121,10 @@ impl Media {
return None; // can recover if the setting is switched
}
match GLOBALS
.fetcher
.try_get(url, Duration::from_secs(60 * 60 * 24 * 3))
{
// cache expires in 3 days
match GLOBALS.fetcher.try_get(
url,
Duration::from_secs(60 * 60 * GLOBALS.settings.read().media_becomes_stale_hours),
) {
Ok(None) => None,
Ok(Some(bytes)) => {
self.data_temp.insert(url.clone(), bytes);
@ -158,8 +157,18 @@ pub(crate) fn load_image_bytes(
image = crop_square(image);
}
if force_resize || image.width() > 16384 || image.height() > 16384 {
// https://docs.rs/image/latest/image/imageops/enum.FilterType.html
let algo = match &*GLOBALS.settings.read().image_resize_algorithm {
"Nearest" => FilterType::Nearest,
"Triangle" => FilterType::Triangle,
"CatmullRom" => FilterType::CatmullRom,
"Gaussian" => FilterType::Gaussian,
"Lanczos3" => FilterType::Lanczos3,
_ => FilterType::Triangle,
};
// This preserves aspect ratio. The sizes represent bounds.
image = image.resize(default_size, default_size, FilterType::Triangle);
image = image.resize(default_size, default_size, algo);
}
let current_size = [image.width() as _, image.height() as _];
let image_buffer = image.into_rgba8();

View File

@ -77,6 +77,9 @@ impl Minion {
// minion will log when it connects
tracing::trace!("{}: Minion handling started", &self.url);
let fetcher_timeout =
std::time::Duration::new(GLOBALS.settings.read().fetcher_timeout_sec, 0);
// Connect to the relay
let websocket_stream = {
// Parse the URI
@ -94,7 +97,7 @@ impl Minion {
// Fetch NIP-11 data
let request_nip11_future = reqwest::Client::builder()
.timeout(std::time::Duration::new(30, 0))
.timeout(fetcher_timeout)
.redirect(reqwest::redirect::Policy::none())
.gzip(true)
.brotli(true)
@ -126,7 +129,12 @@ impl Minion {
"{}: Unable to parse response as NIP-11 ({}): {}\n",
&self.url,
e,
text.lines().take(10).collect::<Vec<_>>().join("\n")
text.lines()
.take(
GLOBALS.settings.read().nip11_lines_to_output_on_error
)
.collect::<Vec<_>>()
.join("\n")
);
}
}
@ -182,13 +190,16 @@ impl Minion {
// Cameri nostream relay limits to 0.5 a megabyte
// Based on my current database of 7356 events, the longest was 11,121 bytes.
// Cameri said people with >2k followers were losing data at 128kb cutoff.
max_message_size: Some(1024 * 1024), // 1 MB
max_frame_size: Some(1024 * 1024), // 1 MB
accept_unmasked_frames: false, // default is false which is the standard
max_message_size: Some(
GLOBALS.settings.read().max_websocket_message_size_kb * 1024,
),
max_frame_size: Some(GLOBALS.settings.read().max_websocket_frame_size_kb * 1024),
accept_unmasked_frames: GLOBALS.settings.read().websocket_accept_unmasked_frames,
};
let connect_timeout = GLOBALS.settings.read().websocket_connect_timeout_sec;
let (websocket_stream, response) = tokio::time::timeout(
std::time::Duration::new(15, 0),
std::time::Duration::new(connect_timeout, 0),
tokio_tungstenite::connect_async_with_config(req, Some(config), false),
)
.await??;
@ -250,7 +261,10 @@ impl Minion {
async fn loop_handler(&mut self) -> Result<(), Error> {
let ws_stream = self.stream.as_mut().unwrap();
let mut timer = tokio::time::interval(std::time::Duration::new(55, 0));
let mut timer = tokio::time::interval(std::time::Duration::new(
GLOBALS.settings.read().websocket_ping_frequency_sec,
0,
));
timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
timer.tick().await; // use up the first immediate tick.

View File

@ -670,7 +670,8 @@ impl Overlord {
.write("Pruning database, please be patient..".to_owned());
let now = Unixtime::now().unwrap();
let then = now - Duration::new(60 * 60 * 24 * 180, 0); // 180 days
let then = now
- Duration::new(GLOBALS.settings.read().prune_period_days * 60 * 60 * 24, 0);
let count = GLOBALS.storage.prune(then)?;
GLOBALS.status_queue.write().write(format!(

View File

@ -198,8 +198,11 @@ impl People {
task::spawn(async {
loop {
let fetch_metadata_looptime_ms =
GLOBALS.settings.read().fetcher_metadata_looptime_ms;
// Every 3 seconds...
tokio::time::sleep(Duration::from_millis(3000)).await;
tokio::time::sleep(Duration::from_millis(fetch_metadata_looptime_ms)).await;
// We fetch needed metadata
GLOBALS.people.maybe_fetch_metadata().await;
@ -224,12 +227,11 @@ impl People {
&self,
among_these: &[PublicKey],
) -> Vec<PublicKey> {
let one_day_ago = Unixtime::now().unwrap().0 - (60 * 60 * 8);
let stale = Unixtime::now().unwrap().0
- 60 * 60 * GLOBALS.settings.read().relay_list_becomes_stale_hours as i64;
if let Ok(vec) = GLOBALS.storage.filter_people(|p| {
p.followed
&& p.relay_list_last_received < one_day_ago
&& among_these.contains(&p.pubkey)
p.followed && p.relay_list_last_received < stale && among_these.contains(&p.pubkey)
}) {
vec.iter().map(|p| p.pubkey).collect()
} else {
@ -270,9 +272,11 @@ impl People {
let need = {
// Metadata refresh interval
let now = Unixtime::now().unwrap();
let eight_hours = Duration::from_secs(60 * 60 * 8);
let stale = Duration::from_secs(
60 * 60 * GLOBALS.settings.read().metadata_becomes_stale_hours,
);
person.metadata_created_at.is_none()
|| person.metadata_last_received < (now - eight_hours).0
|| person.metadata_last_received < (now - stale).0
};
if !need {
return;
@ -394,9 +398,16 @@ impl People {
} else if let Some(last) = person.nip05_last_checked {
// FIXME make these settings
let recheck_duration = if person.nip05_valid {
Duration::from_secs(60 * 60 * 24 * 14)
Duration::from_secs(
60 * 60 * GLOBALS.settings.read().nip05_becomes_stale_if_valid_hours,
)
} else {
Duration::from_secs(60 * 60 * 24)
Duration::from_secs(
60 * GLOBALS
.settings
.read()
.nip05_becomes_stale_if_invalid_minutes,
)
};
Unixtime::now().unwrap() - Unixtime(last as i64) > recheck_duration
} else {
@ -464,10 +475,10 @@ impl People {
}
};
match GLOBALS
.fetcher
.try_get(&url, Duration::from_secs(60 * 60 * 24 * 3))
{
match GLOBALS.fetcher.try_get(
&url,
Duration::from_secs(60 * 60 * GLOBALS.settings.read().avatar_becomes_stale_hours),
) {
// cache expires in 3 days
Ok(None) => None,
Ok(Some(bytes)) => {

View File

@ -46,7 +46,7 @@ pub async fn process_new_event(
if let Some(ref relay_url) = seen_on {
// Verify the event
let mut maxtime = now;
maxtime.0 += 60 * 15; // 15 minutes into the future
maxtime.0 += GLOBALS.settings.read().future_allowance_secs as i64;
if let Err(e) = event.verify(Some(maxtime)) {
tracing::error!(
"{}: VERIFY ERROR: {}, {}",

View File

@ -8,8 +8,6 @@ use nostr_types::{
use parking_lot::RwLock;
use tokio::task;
const DEFAULT_LOG_N: u8 = 18;
#[derive(Default)]
pub struct Signer {
public: RwLock<Option<PublicKey>>,
@ -70,7 +68,7 @@ impl Signer {
}
pub fn set_private_key(&self, pk: PrivateKey, pass: &str) -> Result<(), Error> {
*self.encrypted.write() = Some(pk.export_encrypted(pass, DEFAULT_LOG_N)?);
*self.encrypted.write() = Some(pk.export_encrypted(pass, GLOBALS.settings.read().log_n)?);
*self.public.write() = Some(pk.public_key());
*self.private.write() = Some(pk);
Ok(())
@ -88,7 +86,8 @@ impl Signer {
// If older version, re-encrypt with new version at default 2^18 rounds
if epk.version()? < 2 {
*self.encrypted.write() = Some(private.export_encrypted(pass, DEFAULT_LOG_N)?);
*self.encrypted.write() =
Some(private.export_encrypted(pass, GLOBALS.settings.read().log_n)?);
// and eventually save
task::spawn(async move {
if let Err(e) = GLOBALS.signer.save_through_settings().await {
@ -130,7 +129,7 @@ impl Signer {
Some(epk) => {
// Test password
let pk = epk.decrypt(old)?;
let epk = pk.export_encrypted(new, DEFAULT_LOG_N)?;
let epk = pk.export_encrypted(new, GLOBALS.settings.read().log_n)?;
*self.encrypted.write() = Some(epk);
task::spawn(async move {
if let Err(e) = GLOBALS.signer.save_through_settings().await {
@ -149,7 +148,7 @@ impl Signer {
pub fn generate_private_key(&self, pass: &str) -> Result<(), Error> {
let pk = PrivateKey::generate();
*self.encrypted.write() = Some(pk.export_encrypted(pass, DEFAULT_LOG_N)?);
*self.encrypted.write() = Some(pk.export_encrypted(pass, GLOBALS.settings.read().log_n)?);
*self.public.write() = Some(pk.public_key());
*self.private.write() = Some(pk);
Ok(())
@ -201,7 +200,7 @@ impl Signer {
// We have to regenerate encrypted private key because it may have fallen from
// medium to weak security. And then we need to save that
let epk = pk.export_encrypted(pass, DEFAULT_LOG_N)?;
let epk = pk.export_encrypted(pass, GLOBALS.settings.read().log_n)?;
*self.encrypted.write() = Some(epk);
*self.private.write() = Some(pk);
task::spawn(async move {
@ -226,7 +225,7 @@ impl Signer {
// We have to regenerate encrypted private key because it may have fallen from
// medium to weak security. And then we need to save that
let epk = pk.export_encrypted(pass, DEFAULT_LOG_N)?;
let epk = pk.export_encrypted(pass, GLOBALS.settings.read().log_n)?;
*self.encrypted.write() = Some(epk);
*self.private.write() = Some(pk);
task::spawn(async move {

View File

@ -112,6 +112,8 @@ impl Storage {
// Some filesystem that doesn't handle sparse files may allocate all
// of this, so we don't go too crazy big.
// NOTE: this cannot be a setting because settings are only available
// after the database has been launched.
builder.set_map_size(1048576 * 1024 * 24); // 24 GB
let env = builder.open(&Profile::current()?.lmdb_dir)?;

View File

@ -576,6 +576,55 @@ impl eframe::App for GossipUi {
}
}
if self.settings.status_bar {
egui::TopBottomPanel::top("stats-bar")
.frame(
egui::Frame::side_top_panel(&self.settings.theme.get_style()).inner_margin(
egui::Margin {
left: 0.0,
right: 0.0,
top: 0.0,
bottom: 0.0,
},
),
)
.show(
ctx,
|ui| {
ui.with_layout(egui::Layout::right_to_left(egui::Align::BOTTOM), |ui| {
let in_flight = GLOBALS.fetcher.requests_in_flight();
let queued = GLOBALS.fetcher.requests_queued();
let events = GLOBALS
.storage
.get_event_stats()
.map(|s| s.entries())
.unwrap_or(0);
let relays = GLOBALS.connected_relays.len();
let processed = GLOBALS.events_processed.load(Ordering::Relaxed);
let subs = GLOBALS.open_subscriptions.load(Ordering::Relaxed);
let stats_message = format!(
"EVENTS PROCESSED={} STORED={} RELAYS CONNS={} SUBS={} HTTP: {} / {}",
processed,
events,
relays,
subs,
in_flight,
in_flight + queued
);
let stats_message = RichText::new(stats_message)
.color(self.settings.theme.notice_marker_text_color());
ui.add(Label::new(stats_message))
.on_hover_text(
"events processed: number of events relays have sent to us, including duplicates.\n\
events stored: number of unique events in storage\n\
relay conns: number of relays currently connected\n\
relay subs: number of subscriptions that have not come to EOSE yet\n\
http: number of fetches in flight / number of requests queued");
});
},
);
}
egui::SidePanel::left("main-naviation-panel")
.show_separator_line(false)
.frame(
@ -825,41 +874,6 @@ impl eframe::App for GossipUi {
ui.add_space(7.0);
feed::post::posting_area(self, ctx, frame, ui);
}
/*
ui.vertical(|ui| {
ui.add_space(5.0);
ui.with_layout(egui::Layout::right_to_left(egui::Align::BOTTOM), |ui| {
let in_flight = GLOBALS.fetcher.requests_in_flight();
let queued = GLOBALS.fetcher.requests_queued();
let events = GLOBALS
.storage
.get_event_stats()
.map(|s| s.entries())
.unwrap_or(0);
let relays = GLOBALS.connected_relays.len();
let processed = GLOBALS.events_processed.load(Ordering::Relaxed);
let subs = GLOBALS.open_subscriptions.load(Ordering::Relaxed);
let stats_message = format!(
"EVENTS PROCESSED={} STORED={} RELAYS CONNS={} SUBS={} HTTP: {} / {}",
processed,
events,
relays,
subs,
in_flight,
in_flight + queued
);
let stats_message = RichText::new(stats_message)
.color(self.settings.theme.notice_marker_text_color());
ui.add(Label::new(stats_message))
.on_hover_text(
"events processed: number of events relays have sent to us, including duplicates.\n\
events stored: number of unique events in storage\n\
relay conns: number of relays currently connected\n\
relay subs: number of subscriptions that have not come to EOSE yet\n\
http: number of fetches in flight / number of requests queued");
});
});
*/
});
// Prepare local zap data once per frame for easier compute at render time