tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 9cdaf9dc25a60c48e7da6d9ec5db32ad4b9935b0
parent dfc34f3f6f9280046ed85200e783ff74cdb8232e
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 01:57:50 -0700

cli: add run command

Diffstat:
MCargo.lock | 170++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mcrates/tangle/Cargo.toml | 11++++++++++-
Mcrates/tangle/src/lib.rs | 22++++++++++++++++++++--
Mcrates/tangle/src/main.rs | 16++++++++++++++++
Acrates/tangle/tests/run_integration.rs | 308+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle_runtime/Cargo.toml | 2+-
Mcrates/tangle_runtime/src/lib.rs | 496++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mcrates/tangle_store_surreal/Cargo.toml | 2+-
Mcrates/tangle_store_surreal/src/lib.rs | 56+++++++++++++++++++++++++++++++++++++++++++++++++++++++-
9 files changed, 1060 insertions(+), 23 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -351,6 +351,24 @@ dependencies = [ ] [[package]] +name = "bindgen" +version = "0.72.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "itertools 0.11.0", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex 1.3.0", + "syn 2.0.117", +] + +[[package]] name = "bitflags" version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -510,6 +528,16 @@ dependencies = [ ] [[package]] +name = "bzip2-sys" +version = "0.1.13+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" +dependencies = [ + "cc", + "pkg-config", +] + +[[package]] name = "castaway" version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -527,7 +555,16 @@ dependencies = [ "find-msvc-tools", "jobserver", "libc", - "shlex", + "shlex 2.0.1", +] + +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", ] [[package]] @@ -594,6 +631,17 @@ dependencies = [ ] [[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + +[[package]] name = "cmake" version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1389,6 +1437,12 @@ dependencies = [ ] [[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + +[[package]] name = "group" version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2047,12 +2101,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" [[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link 0.2.1", +] + +[[package]] name = "libm" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" [[package]] +name = "libz-sys" +version = "1.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85bc9657773828b90eeb625adff10eeac83cc21bbfd8e23a03eaa8a33c9e28d9" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + +[[package]] name = "linux-raw-sys" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2171,6 +2246,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] name = "mio" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2236,6 +2317,16 @@ dependencies = [ ] [[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] name = "ntapi" version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2650,6 +2741,12 @@ dependencies = [ ] [[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + +[[package]] name = "portable-atomic" version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3212,6 +3309,12 @@ dependencies = [ ] [[package]] +name = "rustc-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" + +[[package]] name = "rustc_version" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3421,6 +3524,12 @@ dependencies = [ [[package]] name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "shlex" version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8fadd59c855ef2080decdef8ff161eb6661b86933c9d82e5ba29dc602a55aba" @@ -3432,6 +3541,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7f45b8998ced5134fb1d75732c77842a3e888f19c1ff98481822e8fbfbf930b" [[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + +[[package]] name = "signature" version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3709,6 +3828,7 @@ dependencies = [ "subtle", "surrealdb-collections", "surrealdb-protocol", + "surrealdb-rocksdb", "surrealdb-strand", "surrealdb-types", "surrealmx", @@ -3729,6 +3849,21 @@ dependencies = [ ] [[package]] +name = "surrealdb-librocksdb-sys" +version = "0.18.3+11.0.0-4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25b82a18c7fa4b57206784a1a31e7b942ae1d3e24493e0c733019a409b2b4bea" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + +[[package]] name = "surrealdb-protocol" version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3753,6 +3888,16 @@ dependencies = [ ] [[package]] +name = "surrealdb-rocksdb" +version = "0.24.0-surreal.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02e8c3a004982458af159bcbf369e41663d538cd4a291a49c0d4a2fb373cbb7e" +dependencies = [ + "libc", + "surrealdb-librocksdb-sys", +] + +[[package]] name = "surrealdb-strand" version = "3.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3884,8 +4029,14 @@ dependencies = [ name = "tangle" version = "0.1.0" dependencies = [ + "futures-util", + "serde_json", + "tangle_protocol", "tangle_runtime", + "tangle_store_surreal", + "tangle_test_support", "tokio", + "tokio-tungstenite 0.29.0", ] [[package]] @@ -4104,6 +4255,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", @@ -4473,6 +4625,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1982d899e57d646498709735f16e9224cf1e8680676ad687f930cf8b5b555ae" [[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] name = "version_check" version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -5163,3 +5321,13 @@ name = "zmij" version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/crates/tangle/Cargo.toml b/crates/tangle/Cargo.toml @@ -10,7 +10,16 @@ readme = "../../README" [dependencies] tangle_runtime = { path = "../tangle_runtime" } -tokio = { version = "1", features = ["rt"] } +tokio = { version = "1", features = ["rt", "signal"] } + +[dev-dependencies] +futures-util = "0.3" +serde_json = "1" +tangle_protocol = { path = "../tangle_protocol" } +tangle_store_surreal = { path = "../tangle_store_surreal" } +tangle_test_support = { path = "../tangle_test_support" } +tokio = { version = "1", features = ["macros", "rt", "time"] } +tokio-tungstenite = "0.29" [lints] workspace = true diff --git a/crates/tangle/src/lib.rs b/crates/tangle/src/lib.rs @@ -39,7 +39,7 @@ impl TangleCommand { } pub fn implemented(self) -> bool { - matches!(self, Self::Version | Self::Help | Self::Migrate) + matches!(self, Self::Version | Self::Help | Self::Migrate | Self::Run) } } @@ -201,6 +201,21 @@ pub async fn migrate_with_config(path: &str) -> Result<String, String> { Ok(migrate_output(report)) } +pub async fn run_with_config(path: &str) -> Result<(), String> { + let config = tangle_runtime::load_runtime_config(path).map_err(|error| error.to_string())?; + let (shutdown, _) = tangle_runtime::GracefulShutdownSignal::new(); + let signal = shutdown.clone(); + tokio::spawn(async move { + if tokio::signal::ctrl_c().await.is_ok() { + signal.request_shutdown(); + } + }); + tangle_runtime::run_runtime_server(config, shutdown) + .await + .map(|_| ()) + .map_err(|error| error.to_string()) +} + #[cfg(test)] mod tests { use super::{ @@ -257,7 +272,10 @@ mod tests { expected.implemented(), matches!( expected, - TangleCommand::Version | TangleCommand::Help | TangleCommand::Migrate + TangleCommand::Version + | TangleCommand::Help + | TangleCommand::Migrate + | TangleCommand::Run ) ); } diff --git a/crates/tangle/src/main.rs b/crates/tangle/src/main.rs @@ -31,6 +31,13 @@ fn main() -> ExitCode { ExitCode::from(2) } }, + tangle::TangleCommand::Run => match run_server(&invocation) { + Ok(()) => ExitCode::SUCCESS, + Err(error) => { + eprintln!("{error}"); + ExitCode::from(2) + } + }, command => { eprintln!("command not implemented: {}", command.as_str()); ExitCode::from(2) @@ -46,3 +53,12 @@ fn run_migrate(invocation: &tangle::TangleInvocation) -> Result<String, String> .map_err(|error| format!("failed to start runtime: {error}"))?; runtime.block_on(tangle::migrate_with_config(config_path)) } + +fn run_server(invocation: &tangle::TangleInvocation) -> Result<(), String> { + let config_path = tangle::require_config_path(invocation).map_err(|error| error.to_string())?; + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|error| format!("failed to start runtime: {error}"))?; + runtime.block_on(tangle::run_with_config(config_path)) +} diff --git a/crates/tangle/tests/run_integration.rs b/crates/tangle/tests/run_integration.rs @@ -0,0 +1,308 @@ +#![forbid(unsafe_code)] + +use futures_util::{SinkExt, StreamExt}; +use serde_json::Value; +use std::fs; +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::path::Path; +use std::process::{Child, Command, Stdio}; +use std::time::{Duration, Instant}; +use tangle_protocol::event_to_value; +use tangle_store_surreal::{SurrealConnectionConfig, SurrealStore}; +use tangle_test_support::{ + FixtureKey, auth_event_spec, build_fixture_event, valid_public_listing_spec, +}; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +#[tokio::test] +async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { + let port = free_port(); + let root = std::env::temp_dir().join(format!( + "tangle-run-integration-{}-{port}", + std::process::id() + )); + let db_path = root.join("surrealdb"); + let config_path = root.join("runtime.json"); + fs::create_dir_all(&root).expect("runtime root"); + write_runtime_config(&config_path, &db_path, port); + + let mut relay = Command::new(env!("CARGO_BIN_EXE_tangle")) + .args(["run", "--config"]) + .arg(&config_path) + .stdout(Stdio::null()) + .stderr(Stdio::piped()) + .spawn() + .expect("spawn tangle run"); + + wait_for_http(port, &mut relay); + let nip11 = http_get(port, "/"); + assert!(nip11.contains("200 OK")); + assert!(nip11.contains("application/nostr+json")); + assert!(nip11.contains("\"supported_nips\"")); + + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let auth = build_fixture_event(&auth_event_spec()).expect("auth"); + let seller = FixtureKey::Seller.public_key(); + + let (mut subscriber, _) = connect_async(format!("ws://127.0.0.1:{port}/ws")) + .await + .expect("subscriber connect"); + assert_eq!(next_label(&mut subscriber).await, "AUTH"); + subscriber + .send(Message::Text( + serde_json::json!([ + "REQ", + "sub-live", + { + "kinds": [30402], + "authors": [seller.as_str()] + } + ]) + .to_string() + .into(), + )) + .await + .expect("subscribe"); + assert_eq!(next_label(&mut subscriber).await, "EOSE"); + + let (mut publisher, _) = connect_async(format!("ws://127.0.0.1:{port}/ws")) + .await + .expect("publisher connect"); + assert_eq!(next_label(&mut publisher).await, "AUTH"); + publisher + .send(Message::Text( + serde_json::json!(["AUTH", event_to_value(&auth)]) + .to_string() + .into(), + )) + .await + .expect("auth send"); + assert_ok(&next_json(&mut publisher).await, true); + + publisher + .send(Message::Text( + serde_json::json!(["EVENT", event_to_value(&listing)]) + .to_string() + .into(), + )) + .await + .expect("event send"); + assert_ok(&next_json(&mut publisher).await, true); + let live = next_json(&mut subscriber).await; + assert_eq!(live[0], "EVENT"); + assert_eq!(live[1], "sub-live"); + assert_eq!(live[2]["id"], listing.id().as_str()); + + publisher + .send(Message::Text( + serde_json::json!(["REQ", "sub-fetch", { "ids": [listing.id().as_str()] }]) + .to_string() + .into(), + )) + .await + .expect("fetch send"); + let fetched = next_json(&mut publisher).await; + assert_eq!(fetched[0], "EVENT"); + assert_eq!(fetched[1], "sub-fetch"); + assert_eq!(fetched[2]["id"], listing.id().as_str()); + assert_eq!(next_label(&mut publisher).await, "EOSE"); + + subscriber + .send(Message::Text( + serde_json::json!(["CLOSE", "sub-live"]).to_string().into(), + )) + .await + .expect("close send"); + + let listings = http_get(port, "/api/listings?limit=5"); + assert!(listings.contains("200 OK")); + assert!(listings.contains("Carrot bunches")); + assert!(listings.contains(listing.id().as_str())); + let detail = http_get( + port, + &format!("/api/listings/{}/listing-a", seller.as_str()), + ); + assert!(detail.contains("200 OK")); + assert!(detail.contains("listing-a")); + assert!(detail.contains("Carrot bunches")); + let search = http_get(port, "/api/search?q=carrots&limit=5"); + assert!(search.contains("200 OK")); + assert!(search.contains(listing.id().as_str())); + let seller_detail = http_get(port, &format!("/api/sellers/{}", seller.as_str())); + assert!(seller_detail.contains("200 OK")); + assert!(seller_detail.contains(seller.as_str())); + + stop_relay(relay); + + let store_config = + SurrealConnectionConfig::rocksdb(db_path.to_str().expect("db path"), "tangle_it", "relay") + .expect("store config"); + let store = reopen_store(&store_config).await; + let listing_key = format!("30402:{}:listing-a", seller.as_str()); + assert!( + store + .raw_event_row(listing.id()) + .await + .expect("raw row") + .is_some() + ); + assert!( + store + .listing_current_row(&listing_key) + .await + .expect("listing row") + .is_some() + ); + assert!( + store + .search_document_row(&listing_key) + .await + .expect("search row") + .is_some() + ); + + drop(store); + fs::remove_dir_all(&root).expect("remove runtime root"); +} + +fn write_runtime_config(path: &Path, db_path: &Path, port: u16) { + let config = serde_json::json!({ + "server": { + "listen_addr": format!("127.0.0.1:{port}"), + "relay_url": "wss://relay.radroots.test" + }, + "database": { + "mode": "rocks_db", + "path": db_path.to_str().expect("db path"), + "namespace": "tangle_it", + "database": "relay" + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "message_rate_limit": { + "limit": 120, + "window_seconds": 60 + } + }, + "policy": { + "approved_sellers": [FixtureKey::Seller.public_key().as_str()] + } + }); + fs::write( + path, + serde_json::to_string_pretty(&config).expect("config JSON"), + ) + .expect("write config"); +} + +fn free_port() -> u16 { + TcpListener::bind("127.0.0.1:0") + .expect("bind port") + .local_addr() + .expect("local addr") + .port() +} + +fn wait_for_http(port: u16, child: &mut Child) { + let started = Instant::now(); + loop { + if let Ok(response) = try_http_get(port, "/healthz") + && response.contains("200 OK") + { + return; + } + if let Some(status) = child.try_wait().expect("child status") { + panic!("relay exited before readiness: {status}"); + } + assert!( + started.elapsed() < Duration::from_secs(10), + "relay did not open port {port}" + ); + std::thread::sleep(Duration::from_millis(50)); + } +} + +fn http_get(port: u16, path: &str) -> String { + try_http_get(port, path).expect("http get") +} + +fn try_http_get(port: u16, path: &str) -> Result<String, std::io::Error> { + let mut stream = TcpStream::connect(("127.0.0.1", port))?; + stream.set_read_timeout(Some(Duration::from_secs(2)))?; + stream.set_write_timeout(Some(Duration::from_secs(2)))?; + write!( + stream, + "GET {path} HTTP/1.1\r\nHost: 127.0.0.1:{port}\r\nAccept: application/nostr+json\r\nConnection: close\r\n\r\n" + )?; + let mut response = String::new(); + stream.read_to_string(&mut response)?; + Ok(response) +} + +async fn reopen_store(config: &SurrealConnectionConfig) -> SurrealStore { + let started = Instant::now(); + loop { + match SurrealStore::connect_local(config).await { + Ok(store) => return store, + Err(error) if started.elapsed() < Duration::from_secs(5) => { + let _ = error; + tokio::time::sleep(Duration::from_millis(50)).await; + } + Err(error) => panic!("store reopen failed: {error}"), + } + } +} + +async fn next_json( + socket: &mut tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>, + >, +) -> Value { + let message = socket + .next() + .await + .expect("websocket message") + .expect("websocket frame"); + let text = message.into_text().expect("text frame"); + serde_json::from_str(&text).expect("relay JSON") +} + +async fn next_label( + socket: &mut tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>, + >, +) -> String { + next_json(socket).await[0] + .as_str() + .expect("label") + .to_owned() +} + +fn assert_ok(message: &Value, accepted: bool) { + assert_eq!(message[0], "OK"); + assert_eq!(message[2], accepted); +} + +fn stop_relay(mut relay: Child) { + stop_child(&mut relay); + let status = relay.wait().expect("relay exit"); + assert!(status.success()); +} + +#[cfg(unix)] +fn stop_child(relay: &mut Child) { + let status = Command::new("kill") + .args(["-INT", &relay.id().to_string()]) + .status() + .expect("send interrupt"); + assert!(status.success()); +} + +#[cfg(not(unix))] +fn stop_child(relay: &mut Child) { + relay.kill().expect("kill relay"); +} diff --git a/crates/tangle_runtime/Cargo.toml b/crates/tangle_runtime/Cargo.toml @@ -17,7 +17,7 @@ tangle_nips = { path = "../tangle_nips" } tangle_protocol = { path = "../tangle_protocol" } tangle_store = { path = "../tangle_store" } tangle_store_surreal = { path = "../tangle_store_surreal" } -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1", features = ["net", "sync"] } url = "2" [dev-dependencies] diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -2,7 +2,7 @@ use axum::{ Json, Router, - extract::ws::WebSocketUpgrade, + extract::ws::{Message, WebSocket, WebSocketUpgrade}, extract::{Path, RawQuery, State}, response::{IntoResponse, Response}, routing::get, @@ -10,13 +10,23 @@ use axum::{ use core::fmt; use http::{HeaderMap, HeaderValue, StatusCode, header}; use serde::{Deserialize, Serialize}; -use std::{collections::BTreeSet, fs, net::SocketAddr, path::Path as FsPath}; +use std::{ + collections::BTreeSet, + fs, + net::SocketAddr, + path::Path as FsPath, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::{SystemTime, UNIX_EPOCH}, +}; use tangle_core::{ - AdmissionContext, AdmissionEffect, AuthChallengeState, EventValidator, FixedWindowRateLimiter, - MarketplaceListingStatus, MarketplaceQuery, MarketplaceQueryError, MarketplaceQuerySpec, - MarketplaceSort, NostrFilterCompiler, QueryExecutionMode, RateLimitConfig, RateLimitDecision, - RuntimeLimitValues, RuntimeLimits, SubscriptionCloseOutcome, SubscriptionManager, - SubscriptionMatcher, + AdmissionContext, AdmissionEffect, AdmissionPolicy, AuthChallengeState, EventValidator, + FixedWindowRateLimiter, MarketplaceListingStatus, MarketplaceQuery, MarketplaceQueryError, + MarketplaceQuerySpec, MarketplaceSort, NostrFilterCompiler, QueryExecutionMode, + RateLimitConfig, RateLimitDecision, RuntimeLimitValues, RuntimeLimits, + SubscriptionCloseOutcome, SubscriptionManager, SubscriptionMatcher, UnapprovedSellerAction, }; use tangle_nips::{FulfillmentMethod, ListingUnit, parse_relay_auth_event}; use tangle_protocol::{ @@ -28,6 +38,8 @@ use tangle_store_surreal::{ ListingProjectionQuery, MigrationApplyOutcome, SearchDocumentQuery, SurrealConnectionConfig, SurrealConnectionMode, SurrealStore, base_migration_plan, }; +use tokio::net::TcpListener; +use tokio::sync::broadcast; use url::form_urlencoded; pub const TANGLE_SUPPORTED_NIPS: [u16; 8] = [1, 9, 11, 16, 33, 42, 50, 99]; @@ -223,6 +235,7 @@ pub struct TangleRuntimeConfig { listen_addr: SocketAddr, relay_connection: RelayConnectionConfig, database: SurrealConnectionConfig, + admission_policy: AdmissionPolicy, limits: RuntimeLimits, } @@ -239,6 +252,10 @@ impl TangleRuntimeConfig { &self.database } + pub fn admission_policy(&self) -> &AdmissionPolicy { + &self.admission_policy + } + pub fn limits(&self) -> RuntimeLimits { self.limits } @@ -399,14 +416,7 @@ impl std::error::Error for RuntimeCommandError {} pub async fn migrate_runtime_database( config: &TangleRuntimeConfig, ) -> Result<RuntimeMigrationReport, RuntimeCommandError> { - if config.database_config().mode() != &SurrealConnectionMode::Memory { - return Err(RuntimeCommandError::unsupported( - "migrate currently supports memory SurrealDB configs only", - )); - } - let store = SurrealStore::connect_memory(config.database_config()) - .await - .map_err(|error| RuntimeCommandError::store(error.to_string()))?; + let store = connect_runtime_store(config).await?; let outcomes = store .apply_plan(&base_migration_plan()) .await @@ -426,12 +436,384 @@ pub async fn migrate_runtime_database( )) } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct RuntimeServerReport { + listen_addr: SocketAddr, +} + +impl RuntimeServerReport { + pub fn new(listen_addr: SocketAddr) -> Self { + Self { listen_addr } + } + + pub fn listen_addr(self) -> SocketAddr { + self.listen_addr + } +} + +#[derive(Debug, Clone)] +pub struct RuntimeServer { + config: TangleRuntimeConfig, + shutdown_signal: GracefulShutdownSignal, +} + +impl RuntimeServer { + pub fn new(config: TangleRuntimeConfig, shutdown_signal: GracefulShutdownSignal) -> Self { + Self { + config, + shutdown_signal, + } + } + + pub async fn run(&self) -> Result<RuntimeServerReport, RuntimeCommandError> { + let store = connect_runtime_store(&self.config).await?; + store + .apply_plan(&base_migration_plan()) + .await + .map_err(|error| RuntimeCommandError::store(error.to_string()))?; + let listener = TcpListener::bind(self.config.listen_addr()) + .await + .map_err(|error| RuntimeCommandError::store(format!("listen failed: {error}")))?; + let listen_addr = listener + .local_addr() + .map_err(|error| RuntimeCommandError::store(format!("listen addr failed: {error}")))?; + let mut shutdown = self.shutdown_signal.subscribe(); + let app = runtime_router(self.config.clone(), store, self.shutdown_signal.clone()); + axum::serve(listener, app) + .with_graceful_shutdown(async move { + shutdown.wait_for_shutdown().await; + }) + .await + .map_err(|error| RuntimeCommandError::store(format!("server failed: {error}")))?; + Ok(RuntimeServerReport::new(listen_addr)) + } +} + +pub async fn run_runtime_server( + config: TangleRuntimeConfig, + shutdown_signal: GracefulShutdownSignal, +) -> Result<RuntimeServerReport, RuntimeCommandError> { + RuntimeServer::new(config, shutdown_signal).run().await +} + +async fn connect_runtime_store( + config: &TangleRuntimeConfig, +) -> Result<SurrealStore, RuntimeCommandError> { + match config.database_config().mode() { + SurrealConnectionMode::Memory | SurrealConnectionMode::RocksDb { .. } => { + SurrealStore::connect_local(config.database_config()) + .await + .map_err(|error| RuntimeCommandError::store(error.to_string())) + } + SurrealConnectionMode::Http { .. } | SurrealConnectionMode::WebSocket { .. } => { + Err(RuntimeCommandError::unsupported( + "runtime commands currently support memory or rocksdb SurrealDB configs only", + )) + } + } +} + +#[derive(Clone)] +struct RuntimeRelayState { + config: TangleRuntimeConfig, + store: SurrealStore, + shutdown_signal: GracefulShutdownSignal, + event_tx: broadcast::Sender<Event>, + next_connection_id: Arc<AtomicU64>, +} + +impl RuntimeRelayState { + fn new( + config: TangleRuntimeConfig, + store: SurrealStore, + shutdown_signal: GracefulShutdownSignal, + ) -> Self { + let (event_tx, _) = broadcast::channel(config.limits().values().live_event_buffer as usize); + Self { + config, + store, + shutdown_signal, + event_tx, + next_connection_id: Arc::new(AtomicU64::new(1)), + } + } + + fn next_connection(&self) -> RelayConnection { + let id = self.next_connection_id.fetch_add(1, Ordering::Relaxed); + RelayConnection::new( + RelayConnectionId::new(&format!("conn-{id}")) + .expect("generated connection id is valid"), + self.config.relay_connection_config().clone(), + ) + } + + fn validator(&self) -> EventValidator { + EventValidator::new(self.config.limits(), self.config.admission_policy().clone()) + } +} + +fn runtime_router( + config: TangleRuntimeConfig, + store: SurrealStore, + shutdown_signal: GracefulShutdownSignal, +) -> Router { + let state = RuntimeRelayState::new(config, store, shutdown_signal); + Router::new() + .route("/", get(runtime_relay_info)) + .route("/ws", get(runtime_websocket_upgrade)) + .route("/healthz", get(runtime_healthz)) + .route("/readyz", get(runtime_readyz)) + .route("/api/listings", get(runtime_listings)) + .route("/api/listings/{pubkey}/{d}", get(runtime_listing_detail)) + .route("/api/search", get(runtime_marketplace_search)) + .route("/api/sellers/{pubkey}", get(runtime_seller_detail)) + .with_state(state) +} + +async fn runtime_relay_info(headers: HeaderMap) -> Response { + relay_info(State(RelayInfoDocument::tangle_default()), headers).await +} + +async fn runtime_websocket_upgrade( + State(state): State<RuntimeRelayState>, + headers: HeaderMap, + websocket: WebSocketUpgrade, +) -> Response { + if !is_websocket_upgrade(&headers) { + return ApiError::invalid_request("websocket upgrade required").into_response(); + } + websocket + .on_upgrade(move |socket| async move { + handle_websocket(socket, state).await; + }) + .into_response() +} + +async fn runtime_healthz() -> Json<HealthDocument> { + healthz().await +} + +async fn runtime_readyz() -> (StatusCode, Json<ReadinessDocument>) { + readyz(State(ReadinessState::ready())).await +} + +async fn runtime_listings( + State(state): State<RuntimeRelayState>, + RawQuery(query): RawQuery, +) -> Result<Json<ListingsDocument>, ApiError> { + listings( + State(ListingsHttpState::new( + state.store.clone(), + state.config.limits(), + )), + RawQuery(query), + ) + .await +} + +async fn runtime_listing_detail( + State(state): State<RuntimeRelayState>, + Path((pubkey, d)): Path<(String, String)>, +) -> Result<Json<ListingDetailDocument>, ApiError> { + listing_detail( + State(ListingsHttpState::new( + state.store.clone(), + state.config.limits(), + )), + Path((pubkey, d)), + ) + .await +} + +async fn runtime_marketplace_search( + State(state): State<RuntimeRelayState>, + RawQuery(query): RawQuery, +) -> Result<Json<ListingsDocument>, ApiError> { + marketplace_search( + State(ListingsHttpState::new( + state.store.clone(), + state.config.limits(), + )), + RawQuery(query), + ) + .await +} + +async fn runtime_seller_detail( + State(state): State<RuntimeRelayState>, + Path(pubkey): Path<String>, +) -> Result<Json<SellerDocument>, ApiError> { + seller_detail( + State(ListingsHttpState::new( + state.store.clone(), + state.config.limits(), + )), + Path(pubkey), + ) + .await +} + +async fn handle_websocket(mut socket: WebSocket, state: RuntimeRelayState) { + let mut shutdown = state.shutdown_signal.subscribe(); + let mut event_rx = state.event_tx.subscribe(); + let mut loop_state = ClientMessageLoop::new(state.next_connection()); + let event_handler = EventMessageHandler::new(state.store.clone(), state.validator()); + let auth_handler = AuthMessageHandler; + let req_handler = ReqMessageHandler::new(state.store.clone(), NostrFilterCompiler::default()); + let close_handler = CloseMessageHandler; + let fanout = LiveEventFanout; + let challenge = auth_handler.issue_challenge( + loop_state.connection_mut(), + "challenge-001", + UnixTimestamp::new(1_714_124_430), + ); + if send_relay_message(&mut socket, &challenge).await.is_err() { + return; + } + loop { + tokio::select! { + _ = shutdown.wait_for_shutdown() => { + let _ = socket.send(Message::Close(None)).await; + break; + } + event = event_rx.recv() => { + match event { + Ok(event) => { + for message in fanout.fanout(loop_state.connection(), &event) { + if send_relay_message(&mut socket, &message).await.is_err() { + return; + } + } + } + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + } + } + frame = socket.recv() => { + let Some(frame) = frame else { + break; + }; + let Ok(frame) = frame else { + break; + }; + match loop_state.handle_frame_at(client_frame_from_message(frame), now_timestamp()) { + ClientFrameOutcome::Message(message) => { + if handle_client_message( + &mut socket, + &mut loop_state, + &event_handler, + &auth_handler, + &req_handler, + &close_handler, + &state.event_tx, + message, + ) + .await + .is_err() + { + break; + } + } + ClientFrameOutcome::Reject(message) => { + if send_relay_message(&mut socket, &message).await.is_err() { + break; + } + } + ClientFrameOutcome::Ignore => {} + ClientFrameOutcome::Close => break, + } + } + } + } +} + +async fn handle_client_message( + socket: &mut WebSocket, + loop_state: &mut ClientMessageLoop, + event_handler: &EventMessageHandler, + auth_handler: &AuthMessageHandler, + req_handler: &ReqMessageHandler, + close_handler: &CloseMessageHandler, + event_tx: &broadcast::Sender<Event>, + message: ClientMessage, +) -> Result<(), axum::Error> { + match message { + ClientMessage::Event(event) => { + let accepted_event = event.clone(); + let response = event_handler + .handle_event( + loop_state.connection(), + event, + now_timestamp(), + now_timestamp(), + ) + .await; + let accepted = matches!(response, RelayMessage::Ok { accepted: true, .. }); + send_relay_message(socket, &response).await?; + if accepted { + let _ = event_tx.send(accepted_event); + } + } + ClientMessage::Auth(event) => { + let response = auth_handler.handle_auth( + loop_state.connection_mut(), + event.clone(), + event.unsigned().created_at(), + ); + send_relay_message(socket, &response).await?; + } + ClientMessage::Req { + subscription_id, + filters, + } => { + for response in req_handler + .handle_req(loop_state.connection_mut(), subscription_id, filters) + .await + { + send_relay_message(socket, &response).await?; + } + } + ClientMessage::Close(subscription_id) => { + close_handler.handle_close(loop_state.connection_mut(), &subscription_id); + } + } + Ok(()) +} + +fn client_frame_from_message(message: Message) -> ClientFrame { + match message { + Message::Text(value) => ClientFrame::Text(value.to_string()), + Message::Binary(value) => ClientFrame::Binary(value.to_vec()), + Message::Ping(value) => ClientFrame::Ping(value.to_vec()), + Message::Pong(value) => ClientFrame::Pong(value.to_vec()), + Message::Close(_) => ClientFrame::Close, + } +} + +async fn send_relay_message( + socket: &mut WebSocket, + message: &RelayMessage, +) -> Result<(), axum::Error> { + socket.send(Message::Text(message.encode().into())).await +} + +fn now_timestamp() -> UnixTimestamp { + UnixTimestamp::new( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_secs()) + .unwrap_or(0), + ) +} + #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] struct RuntimeConfigDocument { server: RuntimeServerConfigDocument, database: RuntimeDatabaseConfigDocument, auth: RuntimeAuthConfigDocument, limits: RuntimeLimitsConfigDocument, + #[serde(default)] + policy: RuntimePolicyConfigDocument, } #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] @@ -444,6 +826,7 @@ struct RuntimeServerConfigDocument { struct RuntimeDatabaseConfigDocument { mode: RuntimeDatabaseModeDocument, endpoint: Option<String>, + path: Option<String>, namespace: String, database: String, } @@ -452,6 +835,7 @@ struct RuntimeDatabaseConfigDocument { #[serde(rename_all = "snake_case")] enum RuntimeDatabaseModeDocument { Memory, + RocksDb, Http, #[serde(alias = "websocket")] WebSocket, @@ -492,6 +876,23 @@ struct RuntimeLimitValuesDocument { pending_store_events: Option<u64>, } +#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)] +struct RuntimePolicyConfigDocument { + require_write_auth: Option<bool>, + unapproved_seller_action: Option<RuntimeUnapprovedSellerActionDocument>, + #[serde(default)] + approved_sellers: Vec<String>, + #[serde(default)] + blocked_pubkeys: Vec<String>, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "snake_case")] +enum RuntimeUnapprovedSellerActionDocument { + StoreRawOnly, + RejectWrite, +} + fn runtime_config_from_document( document: RuntimeConfigDocument, ) -> Result<TangleRuntimeConfig, RuntimeConfigError> { @@ -511,10 +912,12 @@ fn runtime_config_from_document( ) .map_err(RuntimeConfigError::invalid)?; let database = database_config_from_document(document.database)?; + let admission_policy = admission_policy_from_document(document.policy)?; Ok(TangleRuntimeConfig { listen_addr, relay_connection, database, + admission_policy, limits: limits.runtime, }) } @@ -551,8 +954,25 @@ fn database_config_from_document( "database.endpoint must be omitted for memory mode", )); } + if document.path.is_some() { + return Err(RuntimeConfigError::invalid( + "database.path must be omitted for memory mode", + )); + } SurrealConnectionConfig::memory(&document.namespace, &document.database) } + RuntimeDatabaseModeDocument::RocksDb => { + if document.endpoint.is_some() { + return Err(RuntimeConfigError::invalid( + "database.endpoint must be omitted for rocksdb mode", + )); + } + SurrealConnectionConfig::rocksdb( + &required_path(document.path, "rocksdb")?, + &document.namespace, + &document.database, + ) + } RuntimeDatabaseModeDocument::Http => SurrealConnectionConfig::http( &required_endpoint(document.endpoint, "http")?, &document.namespace, @@ -573,6 +993,43 @@ fn required_endpoint(value: Option<String>, mode: &str) -> Result<String, Runtim }) } +fn required_path(value: Option<String>, mode: &str) -> Result<String, RuntimeConfigError> { + value.ok_or_else(|| { + RuntimeConfigError::invalid(format!("database.path is required for {mode} mode")) + }) +} + +fn admission_policy_from_document( + document: RuntimePolicyConfigDocument, +) -> Result<AdmissionPolicy, RuntimeConfigError> { + let action = match document.unapproved_seller_action { + Some(RuntimeUnapprovedSellerActionDocument::StoreRawOnly) | None => { + UnapprovedSellerAction::StoreRawOnly + } + Some(RuntimeUnapprovedSellerActionDocument::RejectWrite) => { + UnapprovedSellerAction::RejectWrite + } + }; + let mut policy = AdmissionPolicy::new() + .with_write_auth_required(document.require_write_auth.unwrap_or(true)) + .with_unapproved_seller_action(action); + for pubkey in document.approved_sellers { + policy = policy.approve_seller(PublicKeyHex::new(&pubkey).map_err(|error| { + RuntimeConfigError::invalid(format!( + "policy.approved_sellers contains invalid pubkey: {error}" + )) + })?); + } + for pubkey in document.blocked_pubkeys { + policy = policy.block_pubkey(PublicKeyHex::new(&pubkey).map_err(|error| { + RuntimeConfigError::invalid(format!( + "policy.blocked_pubkeys contains invalid pubkey: {error}" + )) + })?); + } + Ok(policy) +} + impl RuntimeLimitValuesDocument { fn apply(self, mut values: RuntimeLimitValues) -> RuntimeLimitValues { if let Some(value) = self.max_event_bytes { @@ -1789,6 +2246,13 @@ fn accepts_nostr_json(value: Option<&HeaderValue>) -> bool { }) } +fn is_websocket_upgrade(headers: &HeaderMap) -> bool { + headers + .get(header::UPGRADE) + .and_then(|value| value.to_str().ok()) + .is_some_and(|value| value.eq_ignore_ascii_case("websocket")) +} + fn listing_projection_query(parsed: &ListingHttpQuery) -> Result<ListingProjectionQuery, ApiError> { let query = parsed.marketplace(); if !query.categories.is_empty() { @@ -2747,7 +3211,7 @@ mod tests { assert_eq!(error.kind(), RuntimeCommandErrorKind::Unsupported); assert_eq!( error.message(), - "migrate currently supports memory SurrealDB configs only" + "runtime commands currently support memory or rocksdb SurrealDB configs only" ); } diff --git a/crates/tangle_store_surreal/Cargo.toml b/crates/tangle_store_surreal/Cargo.toml @@ -10,7 +10,7 @@ description = "SurrealDB storage backend for tangle" [dependencies] serde_json = "1" sha2 = "0.10" -surrealdb = { version = "3.1.3", default-features = false, features = ["kv-mem"] } +surrealdb = { version = "3.1.3", default-features = false, features = ["kv-mem", "kv-rocksdb"] } tangle_nips = { path = "../tangle_nips" } tangle_protocol = { path = "../tangle_protocol" } tangle_store = { path = "../tangle_store" } diff --git a/crates/tangle_store_surreal/src/lib.rs b/crates/tangle_store_surreal/src/lib.rs @@ -4,7 +4,7 @@ use core::fmt; use sha2::{Digest, Sha256}; use std::collections::BTreeSet; use surrealdb::Surreal; -use surrealdb::engine::local::{Db, Mem}; +use surrealdb::engine::local::{Db, Mem, RocksDb}; use tangle_nips::{ DeletionTarget, ListingProjection, ListingProjectionEvaluation, NIP99_DRAFT_LISTING_KIND, NIP99_PUBLIC_LISTING_KIND, evaluate_listing_projection, parse_deletion_request, @@ -15,6 +15,7 @@ use tangle_store::{StoreEventOutcome, StoredEvent}; #[derive(Debug, Clone, PartialEq, Eq)] pub enum SurrealConnectionMode { Memory, + RocksDb { path: String }, Http { endpoint: String }, WebSocket { endpoint: String }, } @@ -31,6 +32,15 @@ impl SurrealConnectionConfig { Self::new(SurrealConnectionMode::Memory, namespace, database) } + pub fn rocksdb( + path: &str, + namespace: &str, + database: &str, + ) -> Result<Self, SurrealConfigError> { + let path = normalized_endpoint(path, "rocksdb path")?; + Self::new(SurrealConnectionMode::RocksDb { path }, namespace, database) + } + pub fn http( endpoint: &str, namespace: &str, @@ -813,6 +823,29 @@ impl fmt::Debug for SurrealStore { } impl SurrealStore { + pub async fn connect_local( + config: &SurrealConnectionConfig, + ) -> Result<Self, SurrealStoreError> { + match config.mode() { + SurrealConnectionMode::Memory => Self::connect_memory(config).await, + SurrealConnectionMode::RocksDb { path } => { + let db = Surreal::new::<RocksDb>(path) + .await + .map_err(SurrealStoreError::from)?; + db.use_ns(config.namespace()) + .use_db(config.database()) + .await + .map_err(SurrealStoreError::from)?; + Ok(Self { db }) + } + SurrealConnectionMode::Http { .. } | SurrealConnectionMode::WebSocket { .. } => { + Err(SurrealStoreError::new( + "surreal local connection requires memory or rocksdb mode config", + )) + } + } + } + pub async fn connect_memory( config: &SurrealConnectionConfig, ) -> Result<Self, SurrealStoreError> { @@ -2447,12 +2480,20 @@ mod tests { #[test] fn remote_config_preserves_trimmed_endpoints() { + let rocksdb = SurrealConnectionConfig::rocksdb(" /tmp/tangle-rocksdb ", "ns", "db") + .expect("rocksdb config"); let http = SurrealConnectionConfig::http(" http://127.0.0.1:8000 ", "ns", "db") .expect("http config"); let websocket = SurrealConnectionConfig::websocket(" ws://127.0.0.1:8000 ", "ns", "db") .expect("websocket config"); assert_eq!( + rocksdb.mode(), + &SurrealConnectionMode::RocksDb { + path: "/tmp/tangle-rocksdb".to_owned() + } + ); + assert_eq!( http.mode(), &SurrealConnectionMode::Http { endpoint: "http://127.0.0.1:8000".to_owned() @@ -2487,6 +2528,12 @@ mod tests { } ); assert_eq!( + SurrealConnectionConfig::rocksdb("", "ns", "db").expect_err("path error"), + SurrealConfigError { + message: "surreal rocksdb path must not be empty".to_owned() + } + ); + assert_eq!( SurrealConnectionConfig::websocket(" ", "ns", "db") .expect_err("websocket endpoint error") .to_string(), @@ -2600,6 +2647,13 @@ mod tests { error.message(), "surreal memory connection requires memory mode config" ); + let local_error = SurrealStore::connect_local(&config) + .await + .expect_err("remote local mismatch"); + assert_eq!( + local_error.message(), + "surreal local connection requires memory or rocksdb mode config" + ); } #[tokio::test]