URL related code updated for upstream

This commit is contained in:
Mike Dilger 2022-12-29 08:39:43 +13:00
parent 8e25e106bb
commit 45c6fe3915
7 changed files with 84 additions and 50 deletions

View File

@ -17,7 +17,10 @@ pub struct DbRelay {
impl DbRelay {
pub fn new(url: String) -> Result<DbRelay, Error> {
let _ = Url::new_validated(&url)?;
let u = Url::new(&url);
if !u.is_valid() {
return Err(Error::InvalidUrl(u.inner().to_owned()));
}
Ok(DbRelay {
dirty: false,
@ -79,7 +82,10 @@ impl DbRelay {
}
pub async fn insert(relay: DbRelay) -> Result<(), Error> {
let _ = Url::new_validated(&relay.url)?;
let url = Url::new(&relay.url);
if !url.is_valid() {
return Err(Error::InvalidUrl(relay.url.clone()));
}
let sql = "INSERT OR IGNORE INTO relay (url, success_count, failure_count, rank, last_success_at, post) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6)";

View File

@ -45,6 +45,9 @@ pub enum Error {
#[error("Invalid URI: {0}")]
InvalidUri(#[from] http::uri::InvalidUri),
#[error("Invalid URL: {0}")]
InvalidUrl(String),
#[error("Bad integer: {0}")]
ParseInt(#[from] std::num::ParseIntError),

View File

@ -131,8 +131,9 @@ impl Globals {
.entry(id)
.and_modify(|urls| {
if let Some(ref u) = url {
if let Ok(valid) = Url::new_validated(u) {
urls.push(valid);
let n = Url::new(u);
if n.is_valid() {
urls.push(n);
}
}
})
@ -313,7 +314,10 @@ pub async fn follow_key_and_relay(pubkey: String, relay: String) -> Result<DbPer
let pubkeyhex = PublicKeyHex(pubkey.clone());
// Validate the url
let _ = Url::new_validated(&relay).map_err(|e| format!("{}", e))?;
let u = Url::new(&relay);
if !u.is_valid() {
return Err(format!("Invalid url: {}", relay));
}
// Create or update them
let person = match DbPerson::fetch_one(pubkeyhex.clone())

View File

@ -36,14 +36,16 @@ pub struct Minion {
impl Minion {
pub async fn new(url: Url) -> Result<Minion, Error> {
let _ = Url::new_validated(&url)?;
if !url.is_valid() {
return Err(Error::InvalidUrl(url.inner().to_owned()));
}
let to_overlord = GLOBALS.to_overlord.clone();
let from_overlord = GLOBALS.to_minions.subscribe();
let dbrelay = match DbRelay::fetch_one(&url).await? {
Some(dbrelay) => dbrelay,
None => {
let dbrelay = DbRelay::new(url.0.clone())?;
let dbrelay = DbRelay::new(url.inner().to_owned())?;
DbRelay::insert(dbrelay.clone()).await?;
dbrelay
}
@ -84,7 +86,7 @@ impl Minion {
// Connect to the relay
let websocket_stream = {
let uri: http::Uri = self.url.0.parse::<Uri>()?;
let uri: http::Uri = self.url.inner().parse::<Uri>()?;
let authority = uri.authority().ok_or(Error::UrlHasNoHostname)?.as_str();
let host = authority
.find('@')
@ -219,7 +221,7 @@ impl Minion {
Err(e) => return Err(e.into())
};
#[allow(clippy::collapsible_if)]
if bus_message.target == self.url.0 {
if bus_message.target == self.url.inner() {
self.handle_bus_message(bus_message).await?;
} else if &*bus_message.target == "all" {
if &*bus_message.kind == "shutdown" {
@ -266,7 +268,7 @@ impl Minion {
// Find the oldest 'last_fetched' among the 'person_relay' table.
// Null values will come through as 0.
let mut special_since: i64 =
DbPersonRelay::fetch_oldest_last_fetched(&pubkeys, &self.url.0).await? as i64;
DbPersonRelay::fetch_oldest_last_fetched(&pubkeys, self.url.inner()).await? as i64;
let (overlap, feed_chunk) = {
let settings = GLOBALS.settings.read().await.clone();

View File

@ -117,7 +117,7 @@ impl Overlord {
.relays
.write()
.await
.insert(Url(relay.url.clone()), relay.clone());
.insert(Url::new(&relay.url), relay.clone());
}
// Load people from the database
@ -264,12 +264,15 @@ impl Overlord {
}
async fn start_minion(&mut self, url: String) -> Result<(), Error> {
let moved_url = Url(url.clone());
let mut minion = Minion::new(moved_url).await?;
let url = Url::new(&url);
if !url.is_valid() {
return Err(Error::InvalidUrl(url.inner().to_owned()));
}
let mut minion = Minion::new(url.clone()).await?;
let abort_handle = self.minions.spawn(async move { minion.handle().await });
let id = abort_handle.id();
self.minions_task_url.insert(id, Url(url.clone()));
self.urls_watching.push(Url(url.clone()));
self.minions_task_url.insert(id, url.clone());
self.urls_watching.push(url.clone());
Ok(())
}
@ -460,11 +463,8 @@ impl Overlord {
for relay in dirty_relays.iter() {
// Just update 'post' since that's all 'dirty' indicates currently
DbRelay::update_post(relay.url.to_owned(), relay.post).await?;
if let Some(relay) = GLOBALS
.relays
.write()
.await
.get_mut(&Url(relay.url.clone()))
if let Some(relay) =
GLOBALS.relays.write().await.get_mut(&Url::new(&relay.url))
{
relay.dirty = false;
}
@ -509,14 +509,14 @@ impl Overlord {
// If we don't have such a minion, start one
if !self.urls_watching.contains(url) {
// Start a minion
self.start_minion(url.0.clone()).await?;
self.start_minion(url.inner().to_owned()).await?;
}
debug!("{}: Asking to fetch {} events", &url.0, ids.len());
debug!("{}: Asking to fetch {} events", url.inner(), ids.len());
// Tell it to get these events
let _ = self.to_minions.send(BusMessage {
target: url.0.clone(),
target: url.inner().to_owned(),
kind: "fetch_events".to_string(),
json_payload: serde_json::to_string(&ids).unwrap(),
});
@ -572,18 +572,20 @@ impl Overlord {
for relay in relays.iter() {
// Save relay
let relay_url = Url::new_validated(relay)?;
let db_relay = DbRelay::new(relay_url.0)?;
let relay_url = Url::new(relay);
if relay_url.is_valid() {
let db_relay = DbRelay::new(relay_url.inner().to_owned())?;
DbRelay::insert(db_relay).await?;
// Save person_relay
DbPersonRelay::upsert_last_suggested_nip35(
(*pubkey).into(),
relay.0.clone(),
relay.inner().to_owned(),
Unixtime::now().unwrap().0 as u64,
)
.await?;
}
}
info!("Setup {} relays for {}", relays.len(), &dns_id);
@ -598,14 +600,17 @@ impl Overlord {
debug!("Followed {}", &pkhex);
// Save relay
let relay_url = Url::new_validated(&relay)?;
let relay_url = Url::new(&relay);
if !relay_url.is_valid() {
return Err(Error::InvalidUrl(relay));
}
let db_relay = DbRelay::new(relay.to_string())?;
DbRelay::insert(db_relay).await?;
// Save person_relay
DbPersonRelay::insert(DbPersonRelay {
person: pkhex.0.clone(),
relay: relay_url.0.clone(),
relay: relay_url.inner().to_owned(),
..Default::default()
})
.await?;
@ -623,14 +628,17 @@ impl Overlord {
debug!("Followed {}", &pkhex);
// Save relay
let relay_url = Url::new_validated(&relay)?;
let relay_url = Url::new(&relay);
if !relay_url.is_valid() {
return Err(Error::InvalidUrl(relay));
}
let db_relay = DbRelay::new(relay.to_string())?;
DbRelay::insert(db_relay).await?;
// Save person_relay
DbPersonRelay::insert(DbPersonRelay {
person: pkhex.0.clone(),
relay: relay_url.0.clone(),
relay: relay_url.inner().to_owned(),
..Default::default()
})
.await?;
@ -672,7 +680,7 @@ impl Overlord {
for relay in relays {
// Start a minion for it, if there is none
if !self.urls_watching.contains(&Url(relay.url.clone())) {
if !self.urls_watching.contains(&Url::new(&relay.url)) {
self.start_minion(relay.url.clone()).await?;
}

View File

@ -38,13 +38,18 @@ pub async fn process_new_event(
// Save event_seen data
let db_event_seen = DbEventSeen {
event: event.id.as_hex_string(),
relay: url.0.clone(),
relay: url.inner().to_owned(),
when_seen: now,
};
DbEventSeen::replace(db_event_seen).await?;
// Update person_relay.last_fetched
DbPersonRelay::upsert_last_fetched(event.pubkey.as_hex_string(), url.0, now).await?;
DbPersonRelay::upsert_last_fetched(
event.pubkey.as_hex_string(),
url.inner().to_owned(),
now,
)
.await?;
}
}
@ -80,9 +85,10 @@ pub async fn process_new_event(
recommended_relay_url: Some(should_be_url),
marker: _,
} => {
if let Ok(url) = Url::new_validated(should_be_url) {
let url = Url::new(should_be_url);
if url.is_valid() {
// Insert (or ignore) into relays table
let dbrelay = DbRelay::new(url.0.clone())?;
let dbrelay = DbRelay::new(url.inner().to_owned())?;
DbRelay::insert(dbrelay).await?;
}
}
@ -91,16 +97,17 @@ pub async fn process_new_event(
recommended_relay_url: Some(should_be_url),
petname: _,
} => {
if let Ok(url) = Url::new_validated(should_be_url) {
let url = Url::new(should_be_url);
if url.is_valid() {
// Insert (or ignore) into relays table
let dbrelay = DbRelay::new(url.0.clone())?;
let dbrelay = DbRelay::new(url.inner().to_owned())?;
DbRelay::insert(dbrelay).await?;
// upsert person_relay.last_suggested_bytag
let now = Unixtime::now()?.0 as u64;
DbPersonRelay::upsert_last_suggested_bytag(
pubkey.as_hex_string(),
url.0.clone(),
url.inner().to_owned(),
now,
)
.await?;

View File

@ -23,7 +23,11 @@ pub(super) fn update(_app: &mut GossipUi, _ctx: &Context, _frame: &mut eframe::F
let mut relays: Vec<DbRelay> = relays.drain().map(|(_, relay)| relay).collect();
relays.sort_by(|a, b| a.url.cmp(&b.url));
let postrelays: Vec<DbRelay> = relays.iter().filter(|r| r.post).map(|r| r.to_owned()).collect();
let postrelays: Vec<DbRelay> = relays
.iter()
.filter(|r| r.post)
.map(|r| r.to_owned())
.collect();
ui.add_space(32.0);
@ -38,7 +42,6 @@ pub(super) fn update(_app: &mut GossipUi, _ctx: &Context, _frame: &mut eframe::F
}
ui.with_layout(Layout::top_down(Align::Center), |ui| {
ui.heading("Your Relays (write):");
for relay in postrelays.iter() {
@ -72,11 +75,12 @@ fn render_relay (ui: &mut Ui, relay: &DbRelay, bold: bool) {
let mut post = relay.post; // checkbox needs a mutable state variable.
if ui.checkbox(&mut post, "Post Here")
let url = Url::new(&relay.url);
if url.is_valid() && ui.checkbox(&mut post, "Post Here")
.on_hover_text("If selected, posts you create will be sent to this relay. But you have to press [SAVE CHANGES] at the bottom of this page.")
.clicked()
{
if let Some(relay) = GLOBALS.relays.blocking_write().get_mut(&Url(relay.url.clone())) {
if let Some(relay) = GLOBALS.relays.blocking_write().get_mut(&url) {
relay.post = post;
relay.dirty = true;
}