feat: setup for blossom uploads

refactor: move db to crate
This commit is contained in:
2024-11-15 17:26:20 +00:00
parent 0539468a5c
commit 9fdc1defaa
18 changed files with 2825 additions and 370 deletions

2007
zap-stream-db/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

14
zap-stream-db/Cargo.toml Normal file
View File

@ -0,0 +1,14 @@
[package]
name = "zap-stream-db"
version = "0.1.0"
edition = "2021"
[features]
default = []
test-pattern = []
[dependencies]
anyhow = "^1.0.70"
chrono = { version = "0.4.38", features = ["serde"] }
sqlx = { version = "0.8.2", features = ["runtime-tokio", "migrate", "mysql", "chrono"] }
log = "0.4.22"

View File

@ -0,0 +1,40 @@
-- Add migration script here
create table user
(
id integer unsigned not null auto_increment primary key,
pubkey binary(32) not null,
created timestamp default current_timestamp,
balance bigint not null default 0,
tos_accepted timestamp,
stream_key text not null default uuid(),
is_admin bool not null default false,
is_blocked bool not null default false
);
create unique index ix_user_pubkey on user (pubkey);
create table user_stream
(
id integer unsigned not null auto_increment primary key,
user_id integer unsigned not null,
starts timestamp not null,
ends timestamp,
state smallint not null,
title text,
summary text,
image text,
thumb text,
tags text,
content_warning text,
goal text,
pinned text,
-- milli-sats paid for this stream
cost bigint not null default 0,
-- duration in seconds
duration float not null default 0,
-- admission fee
fee integer unsigned,
-- current nostr event json
event text,
constraint fk_user_stream_user
foreign key (user_id) references user (id)
);

88
zap-stream-db/src/db.rs Normal file
View File

@ -0,0 +1,88 @@
use crate::UserStream;
use anyhow::Result;
use log::info;
use sqlx::{MySqlPool, Row};
pub struct ZapStreamDb {
db: MySqlPool,
}
impl ZapStreamDb {
pub async fn new(db: &str) -> Result<Self> {
let db = MySqlPool::connect(db).await?;
Ok(ZapStreamDb { db })
}
pub async fn migrate(&self) -> Result<()> {
sqlx::migrate!().run(&self.db).await?;
Ok(())
}
/// Find user by stream key, typical first lookup from ingress
pub async fn find_user_stream_key(&self, key: &str) -> Result<Option<u64>> {
#[cfg(feature = "test-pattern")]
if key == "test-pattern" {
// use the 00 pubkey for test sources
return Ok(Some(self.upsert_user(&[0; 32]).await?));
}
Ok(sqlx::query("select id from user where stream_key = ?")
.bind(key)
.fetch_optional(&self.db)
.await?
.map(|r| r.try_get(0).unwrap()))
}
pub async fn upsert_user(&self, pubkey: &[u8; 32]) -> Result<u64> {
let res = sqlx::query("insert ignore into user(pubkey) values(?) returning id")
.bind(pubkey.as_slice())
.fetch_optional(&self.db)
.await?;
match res {
None => sqlx::query("select id from user where pubkey = ?")
.bind(pubkey.as_slice())
.fetch_one(&self.db)
.await?
.try_get(0)
.map_err(anyhow::Error::new),
Some(res) => res.try_get(0).map_err(anyhow::Error::new),
}
}
pub async fn insert_stream(&self, user_stream: &UserStream) -> Result<u64> {
sqlx::query(
"insert into user_stream (user_id, state, starts) values (?, ?, ?) returning id",
)
.bind(&user_stream.user_id)
.bind(&user_stream.state)
.bind(&user_stream.starts)
.fetch_one(&self.db)
.await?
.try_get(0)
.map_err(anyhow::Error::new)
}
pub async fn update_stream(&self, user_stream: &UserStream) -> Result<()> {
sqlx::query(
"update user_stream set state = ?, starts = ?, ends = ?, title = ?, summary = ?, image = ?, thumb = ?, tags = ?, content_warning = ?, goal = ?, pinned = ?, fee = ?, event = ? where id = ?",
)
.bind(&user_stream.state)
.bind(&user_stream.starts)
.bind(&user_stream.ends)
.bind(&user_stream.title)
.bind(&user_stream.summary)
.bind(&user_stream.image)
.bind(&user_stream.thumb)
.bind(&user_stream.tags)
.bind(&user_stream.content_warning)
.bind(&user_stream.goal)
.bind(&user_stream.pinned)
.bind(&user_stream.fee)
.bind(&user_stream.event)
.bind(&user_stream.id)
.execute(&self.db)
.await
.map_err(anyhow::Error::new)?;
Ok(())
}
}

7
zap-stream-db/src/lib.rs Normal file
View File

@ -0,0 +1,7 @@
mod db;
mod model;
pub use db::*;
pub use model::*;
pub use sqlx;

View File

@ -0,0 +1,57 @@
use chrono::{DateTime, Utc};
use sqlx::{FromRow, Type};
use std::fmt::{Display, Formatter};
#[derive(Debug, Clone, FromRow)]
pub struct User {
pub id: u64,
pub pubkey: [u8; 32],
pub created: DateTime<Utc>,
pub balance: i64,
pub tos_accepted: DateTime<Utc>,
pub stream_key: String,
pub is_admin: bool,
pub is_blocked: bool,
}
#[derive(Default, Debug, Clone, Type)]
#[repr(u8)]
pub enum UserStreamState {
#[default]
Unknown = 0,
Planned = 1,
Live = 2,
Ended = 3,
}
impl Display for UserStreamState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
UserStreamState::Unknown => write!(f, "unknown"),
UserStreamState::Planned => write!(f, "planned"),
UserStreamState::Live => write!(f, "live"),
UserStreamState::Ended => write!(f, "ended"),
}
}
}
#[derive(Debug, Clone, Default, FromRow)]
pub struct UserStream {
pub id: u64,
pub user_id: u64,
pub starts: DateTime<Utc>,
pub ends: Option<DateTime<Utc>>,
pub state: UserStreamState,
pub title: Option<String>,
pub summary: Option<String>,
pub image: Option<String>,
pub thumb: Option<String>,
pub tags: Option<String>,
pub content_warning: Option<String>,
pub goal: Option<String>,
pub pinned: Option<String>,
pub cost: u64,
pub duration: f32,
pub fee: Option<u32>,
pub event: Option<String>,
}