tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 79b00952e272ddb1dbf7805ae77e37bab868f853
parent acfc75b4a2e55a0cb7ce9ac5e6710afc501610aa
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 01:35:59 -0700

cli: add runtime config loader

Diffstat:
Mcrates/tangle_runtime/src/lib.rs | 523+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 515 insertions(+), 8 deletions(-)

diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -10,12 +10,13 @@ use axum::{ use core::fmt; use http::{HeaderMap, HeaderValue, StatusCode, header}; use serde::{Deserialize, Serialize}; -use std::collections::BTreeSet; +use std::{collections::BTreeSet, fs, net::SocketAddr, path::Path as FsPath}; use tangle_core::{ AdmissionContext, AdmissionEffect, AuthChallengeState, EventValidator, FixedWindowRateLimiter, MarketplaceListingStatus, MarketplaceQuery, MarketplaceQueryError, MarketplaceQuerySpec, MarketplaceSort, NostrFilterCompiler, QueryExecutionMode, RateLimitConfig, RateLimitDecision, - RuntimeLimits, SubscriptionCloseOutcome, SubscriptionManager, SubscriptionMatcher, + RuntimeLimitValues, RuntimeLimits, SubscriptionCloseOutcome, SubscriptionManager, + SubscriptionMatcher, }; use tangle_nips::{FulfillmentMethod, ListingUnit, parse_relay_auth_event}; use tangle_protocol::{ @@ -23,7 +24,9 @@ use tangle_protocol::{ SubscriptionId, UnixTimestamp, parse_client_message, parse_event_json, }; use tangle_store::{StoreEventOutcome, StoredEvent}; -use tangle_store_surreal::{ListingProjectionQuery, SearchDocumentQuery, SurrealStore}; +use tangle_store_surreal::{ + ListingProjectionQuery, SearchDocumentQuery, SurrealConnectionConfig, SurrealStore, +}; use url::form_urlencoded; pub const TANGLE_SUPPORTED_NIPS: [u16; 8] = [1, 9, 11, 16, 33, 42, 50, 99]; @@ -214,6 +217,302 @@ impl Default for WebSocketHttpState { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TangleRuntimeConfig { + listen_addr: SocketAddr, + relay_connection: RelayConnectionConfig, + database: SurrealConnectionConfig, + limits: RuntimeLimits, +} + +impl TangleRuntimeConfig { + pub fn listen_addr(&self) -> SocketAddr { + self.listen_addr + } + + pub fn relay_connection_config(&self) -> &RelayConnectionConfig { + &self.relay_connection + } + + pub fn database_config(&self) -> &SurrealConnectionConfig { + &self.database + } + + pub fn limits(&self) -> RuntimeLimits { + self.limits + } + + pub fn websocket_state(&self, shutdown_signal: GracefulShutdownSignal) -> WebSocketHttpState { + WebSocketHttpState::with_shutdown(self.relay_connection.clone(), shutdown_signal) + } + + pub fn listings_state(&self, store: SurrealStore) -> ListingsHttpState { + ListingsHttpState::new(store, self.limits) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RuntimeConfigErrorKind { + Read, + Parse, + Invalid, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RuntimeConfigError { + kind: RuntimeConfigErrorKind, + message: String, +} + +impl RuntimeConfigError { + pub fn new(kind: RuntimeConfigErrorKind, message: impl Into<String>) -> Self { + Self { + kind, + message: message.into(), + } + } + + pub fn read(message: impl Into<String>) -> Self { + Self::new(RuntimeConfigErrorKind::Read, message) + } + + pub fn parse(message: impl Into<String>) -> Self { + Self::new(RuntimeConfigErrorKind::Parse, message) + } + + pub fn invalid(message: impl Into<String>) -> Self { + Self::new(RuntimeConfigErrorKind::Invalid, message) + } + + pub fn kind(&self) -> RuntimeConfigErrorKind { + self.kind + } + + pub fn message(&self) -> &str { + &self.message + } +} + +impl fmt::Display for RuntimeConfigError { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(formatter, "{:?}: {}", self.kind, self.message) + } +} + +impl std::error::Error for RuntimeConfigError {} + +pub fn load_runtime_config( + path: impl AsRef<FsPath>, +) -> Result<TangleRuntimeConfig, RuntimeConfigError> { + let path = path.as_ref(); + let raw = fs::read_to_string(path).map_err(|error| { + RuntimeConfigError::read(format!( + "failed to read runtime config `{}`: {error}", + path.display() + )) + })?; + parse_runtime_config_json(&raw) +} + +pub fn parse_runtime_config_json(raw: &str) -> Result<TangleRuntimeConfig, RuntimeConfigError> { + let document = serde_json::from_str::<RuntimeConfigDocument>(raw).map_err(|error| { + RuntimeConfigError::parse(format!("runtime config JSON is invalid: {error}")) + })?; + runtime_config_from_document(document) +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +struct RuntimeConfigDocument { + server: RuntimeServerConfigDocument, + database: RuntimeDatabaseConfigDocument, + auth: RuntimeAuthConfigDocument, + limits: RuntimeLimitsConfigDocument, +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +struct RuntimeServerConfigDocument { + listen_addr: String, + relay_url: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +struct RuntimeDatabaseConfigDocument { + mode: RuntimeDatabaseModeDocument, + endpoint: Option<String>, + namespace: String, + database: String, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "snake_case")] +enum RuntimeDatabaseModeDocument { + Memory, + Http, + #[serde(alias = "websocket")] + WebSocket, +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +struct RuntimeAuthConfigDocument { + challenge_ttl_seconds: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +struct RuntimeLimitsConfigDocument { + message_rate_limit: RuntimeRateLimitConfigDocument, + #[serde(default)] + runtime: RuntimeLimitValuesDocument, +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +struct RuntimeRateLimitConfigDocument { + limit: u64, + window_seconds: u64, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)] +struct RuntimeLimitValuesDocument { + max_event_bytes: Option<u64>, + max_content_bytes: Option<u64>, + max_tags_per_event: Option<u64>, + max_tag_values_per_tag: Option<u64>, + max_tag_value_bytes: Option<u64>, + max_filters_per_subscription: Option<u64>, + max_subscriptions_per_connection: Option<u64>, + max_search_query_bytes: Option<u64>, + max_search_tokens: Option<u64>, + max_filter_complexity: Option<u64>, + max_future_seconds: Option<u64>, + live_event_buffer: Option<u64>, + pending_store_events: Option<u64>, +} + +fn runtime_config_from_document( + document: RuntimeConfigDocument, +) -> Result<TangleRuntimeConfig, RuntimeConfigError> { + let listen_addr = document + .server + .listen_addr + .parse::<SocketAddr>() + .map_err(|error| { + RuntimeConfigError::invalid(format!("server.listen_addr is invalid: {error}")) + })?; + let limits = runtime_limits_from_document(document.limits)?; + let relay_connection = RelayConnectionConfig::new( + document.server.relay_url, + document.auth.challenge_ttl_seconds, + limits.message_rate_limit, + limits.runtime, + ) + .map_err(RuntimeConfigError::invalid)?; + let database = database_config_from_document(document.database)?; + Ok(TangleRuntimeConfig { + listen_addr, + relay_connection, + database, + limits: limits.runtime, + }) +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct ResolvedRuntimeLimits { + message_rate_limit: RateLimitConfig, + runtime: RuntimeLimits, +} + +fn runtime_limits_from_document( + document: RuntimeLimitsConfigDocument, +) -> Result<ResolvedRuntimeLimits, RuntimeConfigError> { + let message_rate_limit = RateLimitConfig::new( + document.message_rate_limit.limit, + document.message_rate_limit.window_seconds, + ) + .map_err(|error| RuntimeConfigError::invalid(error.to_string()))?; + let runtime = RuntimeLimits::from_values(document.runtime.apply(RuntimeLimitValues::default())) + .map_err(|error| RuntimeConfigError::invalid(error.to_string()))?; + Ok(ResolvedRuntimeLimits { + message_rate_limit, + runtime, + }) +} + +fn database_config_from_document( + document: RuntimeDatabaseConfigDocument, +) -> Result<SurrealConnectionConfig, RuntimeConfigError> { + match document.mode { + RuntimeDatabaseModeDocument::Memory => { + if document.endpoint.is_some() { + return Err(RuntimeConfigError::invalid( + "database.endpoint must be omitted for memory mode", + )); + } + SurrealConnectionConfig::memory(&document.namespace, &document.database) + } + RuntimeDatabaseModeDocument::Http => SurrealConnectionConfig::http( + &required_endpoint(document.endpoint, "http")?, + &document.namespace, + &document.database, + ), + RuntimeDatabaseModeDocument::WebSocket => SurrealConnectionConfig::websocket( + &required_endpoint(document.endpoint, "websocket")?, + &document.namespace, + &document.database, + ), + } + .map_err(|error| RuntimeConfigError::invalid(error.to_string())) +} + +fn required_endpoint(value: Option<String>, mode: &str) -> Result<String, RuntimeConfigError> { + value.ok_or_else(|| { + RuntimeConfigError::invalid(format!("database.endpoint is required for {mode} mode")) + }) +} + +impl RuntimeLimitValuesDocument { + fn apply(self, mut values: RuntimeLimitValues) -> RuntimeLimitValues { + if let Some(value) = self.max_event_bytes { + values.max_event_bytes = value; + } + if let Some(value) = self.max_content_bytes { + values.max_content_bytes = value; + } + if let Some(value) = self.max_tags_per_event { + values.max_tags_per_event = value; + } + if let Some(value) = self.max_tag_values_per_tag { + values.max_tag_values_per_tag = value; + } + if let Some(value) = self.max_tag_value_bytes { + values.max_tag_value_bytes = value; + } + if let Some(value) = self.max_filters_per_subscription { + values.max_filters_per_subscription = value; + } + if let Some(value) = self.max_subscriptions_per_connection { + values.max_subscriptions_per_connection = value; + } + if let Some(value) = self.max_search_query_bytes { + values.max_search_query_bytes = value; + } + if let Some(value) = self.max_search_tokens { + values.max_search_tokens = value; + } + if let Some(value) = self.max_filter_complexity { + values.max_filter_complexity = value; + } + if let Some(value) = self.max_future_seconds { + values.max_future_seconds = value; + } + if let Some(value) = self.live_event_buffer { + values.live_event_buffer = value; + } + if let Some(value) = self.pending_store_events { + values.pending_store_events = value; + } + values + } +} + #[derive(Debug, Clone)] pub struct GracefulShutdownSignal { sender: tokio::sync::watch::Sender<bool>, @@ -1838,10 +2137,11 @@ mod tests { ClientFrameOutcome, ClientMessageLoop, CloseMessageHandler, CloseMessageOutcome, EventMessageHandler, GracefulShutdownSignal, ListingsHttpState, LiveEventFanout, ReadinessCheckStatus, ReadinessState, RelayConnection, RelayConnectionConfig, - RelayConnectionId, RelayInfoDocument, ReqMessageHandler, TANGLE_RELAY_SOFTWARE, - TANGLE_SUPPORTED_NIPS, WebSocketHttpState, health_router, listing_item_document, - listing_projection_query, listings_router, parse_listing_query, - parse_marketplace_search_query, relay_info_router, search_document_query, websocket_router, + RelayConnectionId, RelayInfoDocument, ReqMessageHandler, RuntimeConfigErrorKind, + TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS, WebSocketHttpState, health_router, + listing_item_document, listing_projection_query, listings_router, load_runtime_config, + parse_listing_query, parse_marketplace_search_query, parse_runtime_config_json, + relay_info_router, search_document_query, websocket_router, }; use axum::{body::Body, response::IntoResponse}; use http::{HeaderValue, Request, StatusCode, header}; @@ -1855,7 +2155,9 @@ mod tests { filter_from_value, }; use tangle_store::StoredEvent; - use tangle_store_surreal::{SurrealConnectionConfig, SurrealStore, base_migration_plan}; + use tangle_store_surreal::{ + SurrealConnectionConfig, SurrealConnectionMode, SurrealStore, base_migration_plan, + }; use tangle_test_support::{auth_event_spec, build_fixture_event, valid_public_listing_spec}; use tower::ServiceExt; @@ -2061,6 +2363,211 @@ mod tests { assert_eq!(second.is_shutdown_requested(), true); } + #[test] + fn runtime_config_loader_parses_memory_config() { + let config = parse_runtime_config_json( + r#"{ + "server": { + "listen_addr": "127.0.0.1:7000", + "relay_url": "ws://127.0.0.1:7000" + }, + "database": { + "mode": "memory", + "namespace": "tangle_test", + "database": "relay" + }, + "auth": { + "challenge_ttl_seconds": 120 + }, + "limits": { + "message_rate_limit": { + "limit": 3, + "window_seconds": 5 + }, + "runtime": { + "max_event_bytes": 2048, + "max_content_bytes": 1024, + "max_subscriptions_per_connection": 8 + } + } + }"#, + ) + .expect("runtime config"); + let (shutdown, _) = GracefulShutdownSignal::new(); + let websocket_state = config.websocket_state(shutdown); + + assert_eq!(config.listen_addr().to_string(), "127.0.0.1:7000"); + assert_eq!( + config.relay_connection_config().relay_url(), + "ws://127.0.0.1:7000" + ); + assert_eq!(config.relay_connection_config().auth_ttl_seconds(), 120); + assert_eq!( + config.relay_connection_config().message_rate_limit(), + RateLimitConfig::new(3, 5).expect("rate") + ); + assert_eq!(config.limits().max_event_bytes(), 2048); + assert_eq!(config.limits().max_content_bytes(), 1024); + assert_eq!(config.limits().max_subscriptions_per_connection(), 8); + assert_eq!(config.database_config().namespace(), "tangle_test"); + assert_eq!(config.database_config().database(), "relay"); + assert_eq!( + config.database_config().mode(), + &SurrealConnectionMode::Memory + ); + assert_eq!( + websocket_state.connection_config(), + config.relay_connection_config() + ); + } + + #[test] + fn runtime_config_loader_parses_websocket_database_config() { + let config = parse_runtime_config_json( + r#"{ + "server": { + "listen_addr": "127.0.0.1:7100", + "relay_url": "wss://relay.radroots.test" + }, + "database": { + "mode": "web_socket", + "endpoint": "ws://127.0.0.1:8000", + "namespace": "tangle", + "database": "relay" + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "message_rate_limit": { + "limit": 120, + "window_seconds": 60 + } + } + }"#, + ) + .expect("runtime config"); + + assert_eq!( + config.database_config().mode(), + &SurrealConnectionMode::WebSocket { + endpoint: "ws://127.0.0.1:8000".to_owned() + } + ); + assert_eq!(config.limits(), RuntimeLimits::default()); + } + + #[test] + fn runtime_config_loader_rejects_invalid_documents() { + let parse_error = parse_runtime_config_json("{").expect_err("parse"); + let invalid_listen = parse_runtime_config_json( + r#"{ + "server": { + "listen_addr": "not a socket", + "relay_url": "ws://127.0.0.1:7000" + }, + "database": { + "mode": "memory", + "namespace": "tangle", + "database": "relay" + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "message_rate_limit": { + "limit": 120, + "window_seconds": 60 + } + } + }"#, + ) + .expect_err("listen"); + let missing_endpoint = parse_runtime_config_json( + r#"{ + "server": { + "listen_addr": "127.0.0.1:7000", + "relay_url": "ws://127.0.0.1:7000" + }, + "database": { + "mode": "http", + "namespace": "tangle", + "database": "relay" + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "message_rate_limit": { + "limit": 120, + "window_seconds": 60 + } + } + }"#, + ) + .expect_err("endpoint"); + + assert_eq!(parse_error.kind(), RuntimeConfigErrorKind::Parse); + assert!( + parse_error + .message() + .starts_with("runtime config JSON is invalid:") + ); + assert_eq!(invalid_listen.kind(), RuntimeConfigErrorKind::Invalid); + assert!( + invalid_listen + .message() + .starts_with("server.listen_addr is invalid:") + ); + assert_eq!(missing_endpoint.kind(), RuntimeConfigErrorKind::Invalid); + assert_eq!( + missing_endpoint.message(), + "database.endpoint is required for http mode" + ); + } + + #[test] + fn runtime_config_loader_reads_config_file() { + let path = std::env::temp_dir().join(format!( + "tangle-runtime-config-loader-{}.json", + std::process::id() + )); + std::fs::write( + &path, + r#"{ + "server": { + "listen_addr": "127.0.0.1:7200", + "relay_url": "ws://127.0.0.1:7200" + }, + "database": { + "mode": "memory", + "namespace": "tangle_file", + "database": "relay" + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "message_rate_limit": { + "limit": 120, + "window_seconds": 60 + } + } + }"#, + ) + .expect("write config"); + + let config = load_runtime_config(&path).expect("loaded config"); + std::fs::remove_file(&path).expect("remove config"); + + assert_eq!(config.listen_addr().to_string(), "127.0.0.1:7200"); + assert_eq!(config.database_config().namespace(), "tangle_file"); + assert_eq!( + load_runtime_config(&path).expect_err("missing").kind(), + RuntimeConfigErrorKind::Read + ); + } + #[tokio::test] async fn websocket_route_requires_upgrade_headers() { let response = websocket_router(WebSocketHttpState::default())