diff --git a/Cargo.lock b/Cargo.lock index 31f119c..627bcca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -175,8 +175,9 @@ dependencies = [ [[package]] name = "async-wsocket" -version = "0.11.0" -source = "git+https://github.com/yukibtc/async-wsocket?rev=27f606af6b2028634022a97b5e56c332dfe3f611#27f606af6b2028634022a97b5e56c332dfe3f611" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "197c8c4d501d615e193e548ff0225925306b1f413a0f27f0b0952367ff25e41c" dependencies = [ "async-utility", "futures", @@ -185,7 +186,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-socks", - "tokio-tungstenite 0.24.0", + "tokio-tungstenite", "url", "wasm-bindgen", "web-sys", @@ -218,16 +219,6 @@ dependencies = [ "windows-targets", ] -[[package]] -name = "base58ck" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c8d66485a3a2ea485c1913c4572ce0256067a5377ac8c75c4960e1cda98605f" -dependencies = [ - "bitcoin-internals 0.3.0", - "bitcoin_hashes 0.14.0", -] - [[package]] name = "base64" version = "0.21.7" @@ -263,62 +254,25 @@ dependencies = [ "unicode-normalization", ] -[[package]] -name = "bitcoin" -version = "0.32.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce6bc65742dea50536e35ad42492b234c27904a27f0abdcbce605015cb4ea026" -dependencies = [ - "base58ck", - "bech32", - "bitcoin-internals 0.3.0", - "bitcoin-io", - "bitcoin-units", - "bitcoin_hashes 0.14.0", - "hex-conservative 0.2.1", - "hex_lit", - "secp256k1", - "serde", -] - [[package]] name = "bitcoin-internals" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9425c3bf7089c983facbae04de54513cce73b41c7f9ff8c845b54e7bc64ebbfb" -[[package]] -name = "bitcoin-internals" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30bdbe14aa07b06e6cfeffc529a1f099e5fbe249524f8125358604df99a4bed2" -dependencies = [ - "serde", -] - [[package]] name = "bitcoin-io" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b47c4ab7a93edb0c7198c5535ed9b52b63095f4e9b45279c6736cec4b856baf" -[[package]] -name = "bitcoin-units" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5285c8bcaa25876d07f37e3d30c303f2609179716e11d688f51e8f1fe70063e2" -dependencies = [ - "bitcoin-internals 0.3.0", - "serde", -] - [[package]] name = "bitcoin_hashes" version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1930a4dabfebb8d7d9992db18ebe3ae2876f0a305fab206fd168df931ede293b" dependencies = [ - "bitcoin-internals 0.2.0", + "bitcoin-internals", "hex-conservative 0.1.2", ] @@ -507,20 +461,20 @@ checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" [[package]] name = "config" -version = "0.14.1" +version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68578f196d2a33ff61b27fae256c3164f65e36382648e30666dde05b8cc9dfdf" +checksum = "e26695492a475c4a091cfda61446d5ba01aac2e1dfbcd27a12fdd11aa2e32596" dependencies = [ "async-trait", "convert_case", "json5", - "nom", "pathdiff", "ron", "rust-ini", "serde", "serde_json", "toml", + "winnow 0.7.1", "yaml-rust2", ] @@ -820,25 +774,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "h2" -version = "0.3.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 0.2.12", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "hashbrown" version = "0.14.5" @@ -864,30 +799,6 @@ dependencies = [ "hashbrown 0.14.5", ] -[[package]] -name = "headers" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" -dependencies = [ - "base64 0.21.7", - "bytes", - "headers-core", - "http 0.2.12", - "httpdate", - "mime", - "sha1", -] - -[[package]] -name = "headers-core" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" -dependencies = [ - "http 0.2.12", -] - [[package]] name = "heck" version = "0.5.0" @@ -915,12 +826,6 @@ dependencies = [ "arrayvec", ] -[[package]] -name = "hex_lit" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3011d1213f159867b13cfd6ac92d2cd5f1345762c63be3554e84092d85a50bbd" - [[package]] name = "hmac" version = "0.12.1" @@ -930,17 +835,6 @@ dependencies = [ "digest", ] -[[package]] -name = "http" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - [[package]] name = "http" version = "1.2.0" @@ -954,12 +848,24 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.6" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http 0.2.12", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http", + "http-body", "pin-project-lite", ] @@ -983,26 +889,36 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.31" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c08302e8fa335b151b788c775ff56e7a03ae64ff85c548ee820fecb70356e85" +checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" dependencies = [ "bytes", "futures-channel", - "futures-core", "futures-util", - "h2", - "http 0.2.12", + "http", "http-body", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2", + "smallvec", + "tokio", +] + +[[package]] +name = "hyper-util" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" +dependencies = [ + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "pin-project-lite", "tokio", - "tower-service", - "tracing", - "want", ] [[package]] @@ -1216,6 +1132,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.14" @@ -1286,28 +1211,6 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" -[[package]] -name = "mime" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" - -[[package]] -name = "mime_guess" -version = "2.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" -dependencies = [ - "mime", - "unicase", -] - -[[package]] -name = "minimal-lexical" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" - [[package]] name = "miniz_oxide" version = "0.8.0" @@ -1328,24 +1231,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "multer" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" -dependencies = [ - "bytes", - "encoding_rs", - "futures-util", - "http 0.2.12", - "httparse", - "log", - "memchr", - "mime", - "spin", - "version_check", -] - [[package]] name = "negentropy" version = "0.3.1" @@ -1354,39 +1239,26 @@ checksum = "e664971378a3987224f7a0e10059782035e89899ae403718ee07de85bec42afe" [[package]] name = "negentropy" -version = "0.4.3" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a88da9dd148bbcdce323dd6ac47d369b4769d4a3b78c6c52389b9269f77932" - -[[package]] -name = "nom" -version = "7.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" -dependencies = [ - "memchr", - "minimal-lexical", -] +checksum = "f0efe882e02d206d8d279c20eb40e03baf7cb5136a1476dc084a324fbc3ec42d" [[package]] name = "nostr" -version = "0.37.0" -source = "git+https://github.com/rust-nostr/nostr.git?rev=f21ffbd2de4e9ad87cd8345158039754cee05031#f21ffbd2de4e9ad87cd8345158039754cee05031" +version = "0.39.0" +source = "git+https://github.com/rust-nostr/nostr.git?rev=b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0#b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0" dependencies = [ - "async-trait", "base64 0.22.1", "bech32", "bip39", - "bitcoin", + "bitcoin_hashes 0.14.0", "cbc", "chacha20", "chacha20poly1305", "getrandom", "instant", - "negentropy 0.3.1", - "negentropy 0.4.3", - "once_cell", "scrypt", + "secp256k1", "serde", "serde_json", "unicode-normalization", @@ -1395,40 +1267,38 @@ dependencies = [ [[package]] name = "nostr-database" -version = "0.37.0" -source = "git+https://github.com/rust-nostr/nostr.git?rev=f21ffbd2de4e9ad87cd8345158039754cee05031#f21ffbd2de4e9ad87cd8345158039754cee05031" +version = "0.39.0" +source = "git+https://github.com/rust-nostr/nostr.git?rev=b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0#b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0" dependencies = [ - "async-trait", "nostr", "tokio", ] [[package]] name = "nostr-relay-builder" -version = "0.37.0" -source = "git+https://github.com/rust-nostr/nostr.git?rev=f21ffbd2de4e9ad87cd8345158039754cee05031#f21ffbd2de4e9ad87cd8345158039754cee05031" +version = "0.39.0" +source = "git+https://github.com/rust-nostr/nostr.git?rev=b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0#b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0" dependencies = [ "async-utility", "async-wsocket", "atomic-destructor", - "negentropy 0.4.3", + "negentropy 0.5.0", "nostr", "nostr-database", - "thiserror 1.0.69", "tokio", "tracing", ] [[package]] name = "nostr-relay-pool" -version = "0.37.0" -source = "git+https://github.com/rust-nostr/nostr.git?rev=f21ffbd2de4e9ad87cd8345158039754cee05031#f21ffbd2de4e9ad87cd8345158039754cee05031" +version = "0.39.0" +source = "git+https://github.com/rust-nostr/nostr.git?rev=b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0#b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0" dependencies = [ "async-utility", "async-wsocket", "atomic-destructor", "negentropy 0.3.1", - "negentropy 0.4.3", + "negentropy 0.5.0", "nostr", "nostr-database", "tokio", @@ -1437,14 +1307,13 @@ dependencies = [ [[package]] name = "nostr-sdk" -version = "0.37.0" -source = "git+https://github.com/rust-nostr/nostr.git?rev=f21ffbd2de4e9ad87cd8345158039754cee05031#f21ffbd2de4e9ad87cd8345158039754cee05031" +version = "0.39.0" +source = "git+https://github.com/rust-nostr/nostr.git?rev=b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0#b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0" dependencies = [ "async-utility", "nostr", "nostr-database", "nostr-relay-pool", - "thiserror 1.0.69", "tokio", "tracing", ] @@ -1455,17 +1324,23 @@ version = "0.1.0" dependencies = [ "anyhow", "async-compression", + "base64 0.22.1", "chrono", "clap", "config", + "http-body-util", + "hyper", + "hyper-util", + "itertools", "log", + "nostr", "nostr-relay-builder", "nostr-sdk", "pretty_env_logger", "serde", "sled", "tokio", - "warp", + "tokio-util", ] [[package]] @@ -1611,26 +1486,6 @@ dependencies = [ "sha2", ] -[[package]] -name = "pin-project" -version = "1.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pin-project-lite" version = "0.2.15" @@ -1794,12 +1649,13 @@ dependencies = [ [[package]] name = "rust-ini" -version = "0.20.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e0698206bcb8882bf2a9ecb4c1e7785db57ff052297085a6efd4fe42302068a" +checksum = "4e310ef0e1b6eeb79169a1171daf9abcb87a2e17c03bee2c4bb100b55c75409f" dependencies = [ "cfg-if", "ordered-multimap", + "trim-in-place", ] [[package]] @@ -1854,12 +1710,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "scoped-tls" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" - [[package]] name = "scopeguard" version = "1.2.0" @@ -1884,7 +1734,6 @@ version = "0.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9465315bc9d4566e1724f0fffcbcc446268cb522e60f9a27bcded6b19c108113" dependencies = [ - "bitcoin_hashes 0.14.0", "rand", "secp256k1-sys", "serde", @@ -1925,7 +1774,6 @@ version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ - "indexmap", "itoa", "memchr", "ryu", @@ -1941,18 +1789,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_urlencoded" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" -dependencies = [ - "form_urlencoded", - "itoa", - "ryu", - "serde", -] - [[package]] name = "sha1" version = "0.10.6" @@ -1981,15 +1817,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" -[[package]] -name = "signal-hook-registry" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" -dependencies = [ - "libc", -] - [[package]] name = "slab" version = "0.4.9" @@ -2171,7 +1998,6 @@ dependencies = [ "libc", "mio", "pin-project-lite", - "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -2212,21 +2038,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.21.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite 0.21.0", -] - -[[package]] -name = "tokio-tungstenite" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +checksum = "be4bf6fecd69fcdede0ec680aaf474cdab988f9de6bc73d3758f0160e3b7025a" dependencies = [ "futures-util", "log", @@ -2234,7 +2048,7 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls", - "tungstenite 0.24.0", + "tungstenite", "webpki-roots", ] @@ -2282,22 +2096,15 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow", + "winnow 0.6.20", ] -[[package]] -name = "tower-service" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" - [[package]] name = "tracing" version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ - "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2324,47 +2131,28 @@ dependencies = [ ] [[package]] -name = "try-lock" -version = "0.2.5" +name = "trim-in-place" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" [[package]] name = "tungstenite" -version = "0.21.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +checksum = "413083a99c579593656008130e29255e54dcaae495be556cc26888f211648c24" dependencies = [ "byteorder", "bytes", "data-encoding", - "http 1.2.0", - "httparse", - "log", - "rand", - "sha1", - "thiserror 1.0.69", - "url", - "utf-8", -] - -[[package]] -name = "tungstenite" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" -dependencies = [ - "byteorder", - "bytes", - "data-encoding", - "http 1.2.0", + "http", "httparse", "log", "rand", "rustls", "rustls-pki-types", "sha1", - "thiserror 1.0.69", + "thiserror 2.0.7", "utf-8", ] @@ -2380,12 +2168,6 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" -[[package]] -name = "unicase" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df" - [[package]] name = "unicode-ident" version = "1.0.14" @@ -2465,44 +2247,6 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" -[[package]] -name = "want" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" -dependencies = [ - "try-lock", -] - -[[package]] -name = "warp" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" -dependencies = [ - "bytes", - "futures-channel", - "futures-util", - "headers", - "http 0.2.12", - "hyper", - "log", - "mime", - "mime_guess", - "multer", - "percent-encoding", - "pin-project", - "scoped-tls", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tokio-tungstenite 0.21.0", - "tokio-util", - "tower-service", - "tracing", -] - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -2726,6 +2470,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winnow" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86e376c75f4f43f44db463cf729e0d3acbf954d13e22c51e26e4c264b4ab545f" +dependencies = [ + "memchr", +] + [[package]] name = "write16" version = "1.0.0" @@ -2740,9 +2493,9 @@ checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" [[package]] name = "yaml-rust2" -version = "0.8.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8902160c4e6f2fb145dbe9d6760a75e3c9522d8bf796ed7047c85919ac7115f8" +checksum = "2a1a1c0bc9823338a3bdf8c61f994f23ac004c6fa32c08cd152984499b445e8d" dependencies = [ "arraydeque", "encoding_rs", diff --git a/Cargo.toml b/Cargo.toml index b23f81d..2c0381e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,12 +8,19 @@ anyhow = "1.0.94" async-compression = { version = "0.4.18", features = ["tokio", "zstd"] } chrono = "0.4.39" clap = { version = "4.5.23", features = ["derive"] } -config = { version = "0.14.1", features = ["yaml"] } +config = { version = "0.15.7", features = ["yaml"] } log = "0.4.22" -nostr-relay-builder = { git = "https://github.com/rust-nostr/nostr.git", rev = "f21ffbd2de4e9ad87cd8345158039754cee05031", package = "nostr-relay-builder" } pretty_env_logger = "0.5.0" sled = "0.34.7" -tokio = { version = "1.42.0", features = ["macros", "fs", "rt", "rt-multi-thread", "signal"] } -warp = "0.3.7" +tokio = { version = "1.42.0", features = ["macros", "fs", "rt", "rt-multi-thread"] } serde = { version = "1.0.216", features = ["derive"] } -nostr-sdk = { git = "https://github.com/rust-nostr/nostr.git", rev = "f21ffbd2de4e9ad87cd8345158039754cee05031", package = "nostr-sdk" } +hyper = { version = "1.5", features = ["server", "http1"] } +hyper-util = { version = "0.1", features = ["tokio"] } +base64 = "0.22.1" +itertools = "0.14.0" + +nostr-relay-builder = { git = "https://github.com/rust-nostr/nostr.git", rev = "b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0", package = "nostr-relay-builder" } +nostr-sdk = { git = "https://github.com/rust-nostr/nostr.git", rev = "b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0", package = "nostr-sdk" } +nostr = { git = "https://github.com/rust-nostr/nostr.git", rev = "b5b6c0d422ad3f99d479768bbb011ac7a7cc68f0", package = "nostr" } +http-body-util = "0.1.2" +tokio-util = { version = "0.7.13", features = ["io"] } diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..d4970c1 --- /dev/null +++ b/src/db.rs @@ -0,0 +1,170 @@ +use crate::writer::FlatFileWriter; +use anyhow::{anyhow, Result}; +use chrono::{DateTime, Utc}; +use log::debug; +use nostr::prelude::{BoxedFuture, CoordinateBorrow}; +use nostr::{Event, EventId, Filter, Timestamp}; +use nostr_relay_builder::prelude::NostrDatabaseWipe; +use nostr_sdk::prelude::{ + Backend, DatabaseError, DatabaseEventStatus, Events, NostrDatabase, NostrEventsDatabase, + RejectedReason, SaveEventStatus, +}; +use std::fmt::{Debug, Formatter}; +use std::fs::create_dir_all; +use std::io::{Error, ErrorKind}; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::Mutex; + +#[derive(Clone)] +pub struct FlatFileDatabase { + out_dir: PathBuf, + database: sled::Db, + file: Arc>, +} + +impl Debug for FlatFileDatabase { + fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct ArchiveFile { + pub path: PathBuf, + pub size: u64, + pub created: DateTime, +} + +impl FlatFileDatabase { + pub fn new(dir: PathBuf) -> Result { + create_dir_all(&dir)?; + let db = sled::open(dir.join("index"))?; + Ok(Self { + out_dir: dir.clone(), + database: db, + file: Arc::new(Mutex::new(FlatFileWriter { + dir, + current_date: Utc::now(), + current_handle: None, + })), + }) + } + + pub async fn write_event(&self, ev: &Event) -> Result<()> { + self.file.lock().await.write_event(ev).await + } + + pub async fn list_files(&self) -> Result> { + let mut list = tokio::fs::read_dir(&self.out_dir).await?; + let mut files = Vec::new(); + while let Ok(Some(entry)) = list.next_entry().await { + if entry.file_type().await?.is_dir() { + continue; + } + + let meta = entry.metadata().await?; + files.push(ArchiveFile { + path: entry.path(), + size: meta.len(), + created: meta.created()?.into(), + }); + } + Ok(files) + } + + /// Return archive file if it exists + pub fn get_file(&self, path: &str) -> Result { + let p = self.out_dir.join(&path[1..]); + if p.exists() && p.is_file() { + let meta = p.metadata()?; + Ok(ArchiveFile { + path: p, + size: meta.len(), + created: meta.created()?.into(), + }) + } else { + Err(anyhow!("No such file or directory")) + } + } +} + +impl NostrEventsDatabase for FlatFileDatabase { + fn save_event<'a>( + &'a self, + event: &'a Event, + ) -> BoxedFuture<'a, Result> { + Box::pin(async move { + match self.check_id(&event.id).await? { + DatabaseEventStatus::NotExistent => { + self.database + .insert(event.id, &[]) + .map_err(|e| DatabaseError::Backend(Box::new(e)))?; + + self.write_event(event).await.map_err(|e| { + DatabaseError::Backend(Box::new(Error::new(ErrorKind::Other, e))) + })?; + debug!("Saved event: {}", event.id); + Ok(SaveEventStatus::Success) + } + _ => Ok(SaveEventStatus::Rejected(RejectedReason::Duplicate)), + } + }) + } + + fn check_id<'a>( + &'a self, + event_id: &'a EventId, + ) -> BoxedFuture<'a, Result> { + Box::pin(async move { + if self + .database + .contains_key(event_id) + .map_err(|e| DatabaseError::Backend(Box::new(e)))? + { + Ok(DatabaseEventStatus::Saved) + } else { + Ok(DatabaseEventStatus::NotExistent) + } + }) + } + + fn has_coordinate_been_deleted( + &self, + _coordinate: &CoordinateBorrow, + _timestamp: &Timestamp, + ) -> BoxedFuture> { + Box::pin(async move { Ok(false) }) + } + + fn event_by_id( + &self, + _event_id: &EventId, + ) -> BoxedFuture, DatabaseError>> { + Box::pin(async move { Ok(None) }) + } + + fn count(&self, _filters: Filter) -> BoxedFuture> { + Box::pin(async move { Ok(0) }) + } + + fn query(&self, filter: Filter) -> BoxedFuture> { + Box::pin(async move { Ok(Events::new(&filter)) }) + } + + fn delete(&self, _filter: Filter) -> BoxedFuture> { + Box::pin(async move { Ok(()) }) + } +} + +impl NostrDatabaseWipe for FlatFileDatabase { + fn wipe(&self) -> BoxedFuture> { + Box::pin(async move { Ok(()) }) + } +} + +impl NostrDatabase for FlatFileDatabase { + fn backend(&self) -> Backend { + Backend::Custom("FlatFileDatabase".to_string()) + } +} diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 0000000..5b9064b --- /dev/null +++ b/src/http.rs @@ -0,0 +1,175 @@ +use crate::db::FlatFileDatabase; +use base64::prelude::*; +use http_body_util::Either; +use hyper::body::{Body, Bytes, Frame, Incoming}; +use hyper::header::{CONNECTION, SEC_WEBSOCKET_ACCEPT, UPGRADE}; +use hyper::service::Service; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; +use itertools::Itertools; +use log::error; +use nostr::hashes::sha1::Hash as Sha1Hash; +use nostr::hashes::{Hash, HashEngine}; +use nostr_relay_builder::LocalRelay; +use nostr_sdk::prelude::StreamExt; +use std::future::Future; +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::fs::File; +use tokio_util::io::ReaderStream; + +pub(crate) struct HttpServer { + relay: LocalRelay, + db: FlatFileDatabase, + remote: SocketAddr, +} + +/// Copied from https://github.com/snapview/tungstenite-rs/blob/c16778797b2eeb118aa064aa5b483f90c3989627/src/handshake/mod.rs#L112C1-L125C1 +/// Derive the `Sec-WebSocket-Accept` response header from a `Sec-WebSocket-Key` request header. +/// +/// This function can be used to perform a handshake before passing a raw TCP stream to +/// [`WebSocket::from_raw_socket`][crate::protocol::WebSocket::from_raw_socket]. +pub fn derive_accept_key(request_key: &[u8]) -> String { + // ... field is constructed by concatenating /key/ ... + // ... with the string "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" (RFC 6455) + const WS_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + let mut engine = Sha1Hash::engine(); + engine.input(request_key); + engine.input(WS_GUID); + let hash: Sha1Hash = Sha1Hash::from_engine(engine); + BASE64_STANDARD.encode(hash) +} + +impl HttpServer { + pub fn new(relay: LocalRelay, db: FlatFileDatabase, remote: SocketAddr) -> Self { + HttpServer { relay, db, remote } + } +} + +impl Service> for HttpServer { + type Response = Response>; + type Error = String; + type Future = Pin> + Send>>; + + fn call(&self, req: Request) -> Self::Future { + let base = Response::builder() + .header("server", "nostr-relay-builder") + .status(404); + + // check is upgrade + if let (Some(c), Some(w)) = ( + req.headers().get("connection"), + req.headers().get("upgrade"), + ) { + if c.to_str() + .map(|s| s.to_lowercase() == "upgrade") + .unwrap_or(false) + && w.to_str() + .map(|s| s.to_lowercase() == "websocket") + .unwrap_or(false) + { + let key = req.headers().get("sec-websocket-key"); + let derived = key.map(|k| derive_accept_key(k.as_bytes())); + + let addr = self.remote; + let relay = self.relay.clone(); + tokio::spawn(async move { + match hyper::upgrade::on(req).await { + Ok(upgraded) => { + if let Err(e) = + relay.take_connection(TokioIo::new(upgraded), addr).await + { + error!("{}", e); + } + } + Err(e) => error!("{}", e), + } + }); + return Box::pin(async move { + Ok(base + .status(101) + .header(CONNECTION, "upgrade") + .header(UPGRADE, "websocket") + .header(SEC_WEBSOCKET_ACCEPT, derived.unwrap()) + .body(Either::Left(String::new())) + .unwrap()) + }); + } + } + + // Check path is file path to serve file + let path = req.uri().path(); + if path != "/" && path != "/index.html" { + if let Ok(f) = self.db.get_file(path) { + Box::pin(async move { + File::open(f.path) + .await + .map(|h| { + base.status(200) + .header("content-type", "application/octet-stream") + .header("content-length", f.size.to_string()) + .body(Either::Right(ArchiveFileReader { + handle: ReaderStream::new(h), + })) + .unwrap() + }) + .map_err(|_| "Failed to open file".to_owned()) + }) + } else { + Box::pin(async move { Ok(base.body(Either::Left(String::new())).unwrap()) }) + } + } else { + // serve landing page otherwise + let template = include_str!("./index.html"); + let db = self.db.clone(); + Box::pin(async move { + let files: Vec = db + .list_files() + .await + .unwrap() + .iter() + .sorted_by(|a, b| b.created.cmp(&a.created)) + .map(|f| { + let name = f.path.file_name().unwrap().to_str().unwrap(); + format!( + "{} ({}M)", + name, + name, + f.size / 1024 / 1024 + ) + }) + .collect(); + + Ok(base + .status(200) + .header("content-type", "text/html") + .body(Either::Left( + template.replace("%%_LINKS_%%", &files.join("\n")), + )) + .unwrap()) + }) + } + } +} + +pub struct ArchiveFileReader { + pub handle: ReaderStream, +} + +impl Body for ArchiveFileReader { + type Data = Bytes; + type Error = String; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match self.handle.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(data))) => Poll::Ready(Some(Ok(Frame::data(data)))), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.to_string()))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/main.rs b/src/main.rs index 94863dc..f1ea456 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,255 +1,28 @@ +use crate::db::FlatFileDatabase; +use crate::http::HttpServer; +use crate::policy::{KindPolicy, NoQuery}; use anyhow::Result; -use async_compression::tokio::write::ZstdEncoder; -use chrono::{DateTime, Utc}; use clap::Parser; -use config::{Config, ConfigBuilder}; -use log::{debug, error, info}; -use nostr_relay_builder::builder::{PolicyResult, QueryPolicy, RateLimit, WritePolicy}; -use nostr_relay_builder::prelude::{ - async_trait, Backend, Coordinate, DatabaseError, DatabaseEventStatus, Event, EventId, Events, - JsonUtil, Kind, NostrDatabase, RejectedReason, Timestamp, -}; -use nostr_relay_builder::prelude::{ - Filter as RelayFilter, NostrEventsDatabase, RelayUrl, SaveEventStatus, -}; +use config::Config; +use hyper::server::conn::http1; +use hyper_util::rt::TokioIo; +use log::{error, info}; +use nostr_relay_builder::builder::RateLimit; +use nostr_relay_builder::prelude::Kind; +use nostr_relay_builder::prelude::NostrEventsDatabase; use nostr_relay_builder::{LocalRelay, RelayBuilder}; -use nostr_sdk::prelude::StreamExt; -use nostr_sdk::{Client, FilterOptions}; +use nostr_sdk::prelude::{ReqExitPolicy, StreamExt}; +use nostr_sdk::Client; use serde::Deserialize; -use std::collections::HashSet; -use std::fmt::{Debug, Formatter}; -use std::fs::create_dir_all; -use std::io::{Error, ErrorKind}; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::path::PathBuf; -use std::sync::Arc; use std::time::Duration; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::sync::Mutex; -use warp::reply::html; -use warp::Filter; +use tokio::net::TcpListener; -#[derive(Debug)] -struct NoQuery; - -#[async_trait] -impl QueryPolicy for NoQuery { - async fn admit_query(&self, _query: &[RelayFilter], _addr: &SocketAddr) -> PolicyResult { - PolicyResult::Reject("queries not allowed".to_string()) - } -} - -#[derive(Debug)] -struct KindPolicy(HashSet); - -#[async_trait] -impl WritePolicy for KindPolicy { - async fn admit_event(&self, event: &Event, _addr: &SocketAddr) -> PolicyResult { - if self.0.contains(&event.kind) { - PolicyResult::Accept - } else { - PolicyResult::Reject("Kind not accepted".to_string()) - } - } -} - -struct FlatFileWriter { - pub dir: PathBuf, - pub current_date: DateTime, - pub current_handle: Option<(PathBuf, File)>, -} - -impl FlatFileWriter { - /// Spawn a task to compress a file - async fn compress_file(file: PathBuf) -> Result<()> { - let out_path = file.with_extension("jsonl.zstd"); - let mut in_file = File::open(file.clone()).await?; - { - let out_file = File::create(out_path.clone()).await?; - let mut enc = ZstdEncoder::new(out_file); - let mut buf: [u8; 1024] = [0; 1024]; - while let Ok(n) = in_file.read(&mut buf).await { - if n == 0 { - break; - } - enc.write_all(&buf[..n]).await?; - } - enc.shutdown().await?; - } - - let in_size = in_file.metadata().await?.len(); - let out_size = File::open(out_path).await?.metadata().await?.len(); - drop(in_file); - tokio::fs::remove_file(file).await?; - info!( - "Compressed file ratio={:.2}x, size={}M", - in_size as f32 / out_size as f32, - out_size as f32 / 1024.0 / 1024.0 - ); - - Ok(()) - } - - /// Write event to the current file handle, or move to the next file handle - async fn write_event(&mut self, ev: &Event) -> Result<()> { - const EVENT_FORMAT: &str = "%Y%m%d"; - let now = Utc::now(); - if self.current_date.format(EVENT_FORMAT).to_string() - != now.format(EVENT_FORMAT).to_string() - { - if let Some((path, ref mut handle)) = self.current_handle.take() { - handle.flush().await?; - info!("Closing file {:?}", &path); - tokio::spawn(async move { - if let Err(e) = Self::compress_file(path).await { - error!("Failed to compress file: {}", e); - } - }); - } - - // open new file - self.current_date = now; - } - - if self.current_handle.is_none() { - let path = self.dir.join(format!( - "events_{}.jsonl", - self.current_date.format(EVENT_FORMAT) - )); - info!("Creating file {:?}", &path); - self.current_handle = Some(( - path.clone(), - OpenOptions::new() - .append(true) - .create(true) - .open(path) - .await?, - )); - } - - if let Some((_path, ref mut handle)) = self.current_handle.as_mut() { - handle.write_all(ev.as_json().as_bytes()).await?; - handle.write(b"\n").await?; - } - Ok(()) - } -} - -#[derive(Clone)] -struct FlatFileDatabase { - database: sled::Db, - dir: PathBuf, - file: Arc>, -} - -impl Debug for FlatFileDatabase { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - Ok(()) - } -} - -impl FlatFileDatabase { - pub fn new(dir: PathBuf) -> Result { - create_dir_all(&dir)?; - let db = sled::open(dir.join("index"))?; - Ok(Self { - dir: dir.clone(), - database: db, - file: Arc::new(Mutex::new(FlatFileWriter { - dir, - current_date: Utc::now(), - current_handle: None, - })), - }) - } - - pub async fn write_event(&self, ev: &Event) -> Result<()> { - self.file.lock().await.write_event(ev).await - } -} - -#[async_trait] -impl NostrEventsDatabase for FlatFileDatabase { - async fn save_event(&self, event: &Event) -> Result { - match self.check_id(&event.id).await? { - DatabaseEventStatus::NotExistent => { - self.database - .insert(&event.id, &[]) - .map_err(|e| DatabaseError::Backend(Box::new(e)))?; - - self.write_event(event).await.map_err(|e| { - DatabaseError::Backend(Box::new(Error::new(ErrorKind::Other, e))) - })?; - debug!("Saved event: {}", event.id); - Ok(SaveEventStatus::Success) - } - _ => Ok(SaveEventStatus::Rejected(RejectedReason::Duplicate)), - } - } - - async fn check_id(&self, event_id: &EventId) -> Result { - if self - .database - .contains_key(event_id) - .map_err(|e| DatabaseError::Backend(Box::new(e)))? - { - Ok(DatabaseEventStatus::Saved) - } else { - Ok(DatabaseEventStatus::NotExistent) - } - } - - async fn has_coordinate_been_deleted( - &self, - coordinate: &Coordinate, - timestamp: &Timestamp, - ) -> Result { - Ok(false) - } - - async fn event_id_seen( - &self, - event_id: EventId, - relay_url: RelayUrl, - ) -> Result<(), DatabaseError> { - Ok(()) - } - - async fn event_seen_on_relays( - &self, - event_id: &EventId, - ) -> Result>, DatabaseError> { - Ok(None) - } - - async fn event_by_id(&self, event_id: &EventId) -> Result, DatabaseError> { - Ok(None) - } - - async fn count(&self, filters: Vec) -> Result { - Ok(0) - } - - async fn query(&self, filters: Vec) -> Result { - Ok(Events::new(&[])) - } - - async fn delete(&self, filter: RelayFilter) -> Result<(), DatabaseError> { - Ok(()) - } -} - -#[async_trait] -impl NostrDatabase for FlatFileDatabase { - fn backend(&self) -> Backend { - Backend::Custom("FlatFileDatabase".to_string()) - } - - async fn wipe(&self) -> Result<(), DatabaseError> { - Ok(()) - } -} +mod db; +mod http; +mod policy; +mod writer; #[derive(Parser)] #[command(version, about)] @@ -300,9 +73,9 @@ async fn main() -> Result<()> { let mut stream = client .pool() .stream_events( - vec![nostr_sdk::Filter::new().limit(100)], + nostr_sdk::Filter::new().limit(100), never, - FilterOptions::WaitDurationAfterEOSE(never), + ReqExitPolicy::WaitDurationAfterEOSE(never), ) .await?; let db = db.clone(); @@ -314,61 +87,36 @@ async fn main() -> Result<()> { } }); } - let mut b = RelayBuilder::default() + + let mut builder = RelayBuilder::default() .database(db.clone()) - .addr(addr.ip()) - .port(addr.port()) .query_policy(NoQuery) .rate_limit(RateLimit { max_reqs: 20, notes_per_minute: 100_000, }); - - if let Some(k) = config.kinds { - b = b.write_policy(KindPolicy( + if let Some(k) = &config.kinds { + builder = builder.write_policy(KindPolicy::new( k.iter().map(|k| Kind::Custom(*k as u16)).collect(), )); } + let relay = LocalRelay::new(builder).await?; - let relay = LocalRelay::run(b).await?; + let listener = TcpListener::bind(&addr).await?; + info!("Listening on {}", &addr); + loop { + let (socket, addr) = listener.accept().await?; - info!("Relay started on: {}", relay.url()); - - let template = include_str!("./index.html"); - let f = warp::get() - .and(warp::path::end()) - .then(move || async move { - let mut list = tokio::fs::read_dir("./data").await.unwrap(); - let mut files = Vec::new(); - while let Ok(Some(entry)) = list.next_entry().await { - if entry.file_type().await.unwrap().is_dir() { - continue; - } - - let ff = entry - .path() - .file_name() - .unwrap() - .to_string_lossy() - .to_string(); - let fs = entry.metadata().await.unwrap().len(); - files.push(format!("{ff} ({}M)", fs / 1024 / 1024)); + let io = TokioIo::new(socket); + let server = HttpServer::new(relay.clone(), db.clone(), addr); + tokio::spawn(async move { + if let Err(e) = http1::Builder::new() + .serve_connection(io, server) + .with_upgrades() + .await + { + error!("Failed to handle request: {}", e); } - html(template.replace("%%_LINKS_%%", &files.join("\n"))) - }) - .or(warp::fs::dir(out_dir)); - - let addr: SocketAddr = "0.0.0.0:8000".parse()?; - let (addr, fut) = warp::serve(f).bind_with_graceful_shutdown(addr, async move { - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .unwrap() - .recv() - .await - .expect("failed to listen to shutdown signal"); - }); - info!("Listening on http://{}", addr); - fut.await; - relay.shutdown(); - - Ok(()) + }); + } } diff --git a/src/policy.rs b/src/policy.rs new file mode 100644 index 0000000..7e2b0ae --- /dev/null +++ b/src/policy.rs @@ -0,0 +1,39 @@ +use nostr::prelude::BoxedFuture; +use nostr::{Event, Filter, Kind}; +use nostr_relay_builder::prelude::{PolicyResult, QueryPolicy, WritePolicy}; +use std::collections::HashSet; +use std::net::SocketAddr; + +#[derive(Debug)] +pub struct NoQuery; + +impl QueryPolicy for NoQuery { + fn admit_query(&self, _query: &Filter, _addr: &SocketAddr) -> BoxedFuture { + Box::pin(async move { PolicyResult::Reject("queries not allowed".to_string()) }) + } +} + +#[derive(Debug)] +pub struct KindPolicy(HashSet); + +impl KindPolicy { + pub fn new(kinds: HashSet) -> Self { + Self(kinds) + } +} + +impl WritePolicy for KindPolicy { + fn admit_event<'a>( + &'a self, + event: &'a Event, + _addr: &SocketAddr, + ) -> BoxedFuture<'a, PolicyResult> { + Box::pin(async move { + if self.0.contains(&event.kind) { + PolicyResult::Accept + } else { + PolicyResult::Reject("Kind not accepted".to_string()) + } + }) + } +} diff --git a/src/writer.rs b/src/writer.rs new file mode 100644 index 0000000..4c42d65 --- /dev/null +++ b/src/writer.rs @@ -0,0 +1,90 @@ +use anyhow::Result; +use async_compression::tokio::write::ZstdEncoder; +use chrono::{DateTime, Utc}; +use log::{error, info}; +use nostr::{Event, JsonUtil}; +use std::path::PathBuf; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +pub struct FlatFileWriter { + pub dir: PathBuf, + pub current_date: DateTime, + pub current_handle: Option<(PathBuf, File)>, +} + +impl FlatFileWriter { + /// Spawn a task to compress a file + async fn compress_file(file: PathBuf) -> Result<()> { + let out_path = file.with_extension("jsonl.zstd"); + let mut in_file = File::open(file.clone()).await?; + { + let out_file = File::create(out_path.clone()).await?; + let mut enc = ZstdEncoder::new(out_file); + let mut buf: [u8; 1024] = [0; 1024]; + while let Ok(n) = in_file.read(&mut buf).await { + if n == 0 { + break; + } + enc.write_all(&buf[..n]).await?; + } + enc.shutdown().await?; + } + + let in_size = in_file.metadata().await?.len(); + let out_size = File::open(out_path).await?.metadata().await?.len(); + drop(in_file); + tokio::fs::remove_file(file).await?; + info!( + "Compressed file ratio={:.2}x, size={}M", + in_size as f32 / out_size as f32, + out_size as f32 / 1024.0 / 1024.0 + ); + + Ok(()) + } + + /// Write event to the current file handle, or move to the next file handle + pub(crate) async fn write_event(&mut self, ev: &Event) -> Result<()> { + const EVENT_FORMAT: &str = "%Y%m%d"; + let now = Utc::now(); + if self.current_date.format(EVENT_FORMAT).to_string() + != now.format(EVENT_FORMAT).to_string() + { + if let Some((path, ref mut handle)) = self.current_handle.take() { + handle.flush().await?; + info!("Closing file {:?}", &path); + tokio::spawn(async move { + if let Err(e) = Self::compress_file(path).await { + error!("Failed to compress file: {}", e); + } + }); + } + + // open new file + self.current_date = now; + } + + if self.current_handle.is_none() { + let path = self.dir.join(format!( + "events_{}.jsonl", + self.current_date.format(EVENT_FORMAT) + )); + info!("Creating file {:?}", &path); + self.current_handle = Some(( + path.clone(), + OpenOptions::new() + .append(true) + .create(true) + .open(path) + .await?, + )); + } + + if let Some((_path, ref mut handle)) = self.current_handle.as_mut() { + handle.write_all(ev.as_json().as_bytes()).await?; + handle.write(b"\n").await?; + } + Ok(()) + } +}