tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit aa4fea1c236bed5ee7575fb236359ef07e6c4825
parent 91c6ddfdf3aba9466a2c92242de299d6a0ca540f
Author: triesap <tyson@radroots.org>
Date:   Wed, 17 Jun 2026 15:16:37 -0700

config: add tenant host config contract

- add typed tenant identity and canonical host primitives
- add strict v1 MVP host and tenant config parsers
- add host and tenant example config files
- guard the virtual relay tenancy source authority

Diffstat:
Aconfig/tangle.host.example.json | 24++++++++++++++++++++++++
Aconfig/tenants/farmers_market.example.json | 178+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle/tests/source_invariant.rs | 14++++++++++++++
Mcrates/tangle_runtime/src/config.rs | 640++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mcrates/tangle_runtime/src/lib.rs | 1+
Acrates/tangle_runtime/src/tenant.rs | 336+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 1191 insertions(+), 2 deletions(-)

diff --git a/config/tangle.host.example.json b/config/tangle.host.example.json @@ -0,0 +1,24 @@ +{ + "listen_addr": "0.0.0.0:7000", + "tenant_config_dir": "config/tenants", + "limits": { + "max_total_connections": 10000, + "max_total_subscriptions": 25000, + "tenant_startup_concurrency": 4 + }, + "ops": { + "enabled": true, + "expose_tenant_inventory": true + }, + "trusted_proxy": { + "enabled": false, + "trusted_peers": [] + }, + "observability": { + "tracing": { + "enabled": true, + "filter": "info,tangle=info,tangle_runtime=info,tangle_groups=info,tangle_store_pocket=info", + "format": "json" + } + } +} diff --git a/config/tenants/farmers_market.example.json b/config/tenants/farmers_market.example.json @@ -0,0 +1,178 @@ +{ + "tenant_id": "farmers-market", + "tenant_schema": "farmers_market", + "host": "relay.radroots.test", + "relay_url": "wss://relay.radroots.test", + "inactive": false, + "info": { + "name": "Radroots Farmers Market", + "description": "Tangle virtual relay tenant for the Radroots farmers market", + "contact": "mailto:operators@radroots.test", + "icon": "https://radroots.test/icon.png" + }, + "pocket": { + "data_directory": "runtime/tenants/farmers_market/pocket", + "sync_policy": "flush_on_shutdown" + }, + "pocket_query": { + "allow_scraping": false, + "allow_scrape_if_limited_to": 100, + "allow_scrape_if_max_seconds": 3600 + }, + "groups": { + "enabled": true, + "canonical_relay_url": "wss://relay.radroots.test", + "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777", + "owner_pubkeys": [ + "0000000000000000000000000000000000000000000000000000000000000001" + ], + "admin_pubkeys": [ + "0000000000000000000000000000000000000000000000000000000000000002" + ], + "policy": { + "public_join": false, + "invites_enabled": false + }, + "limits": { + "max_group_id_bytes": 128, + "max_group_tags_per_event": 8, + "max_supported_kinds": 512, + "max_member_list_pubkeys": 100000, + "max_outbox_replay_batch": 1000 + } + }, + "auth": { + "challenge_ttl_seconds": 300, + "created_at_skew_seconds": 600 + }, + "limits": { + "max_message_length": 1048576, + "max_subid_length": 64, + "max_subscriptions_per_connection": 64, + "max_filters_per_request": 10, + "max_tag_values_per_filter": 100, + "max_query_complexity": 2048, + "max_limit": 500, + "default_limit": 100, + "max_event_tags": 200, + "max_content_length": 65536, + "broadcast_channel_capacity": 4096, + "per_connection_outbound_queue": 256 + }, + "rate_limits": { + "auth": { + "per_ip": { + "window_seconds": 60, + "max_hits": 120 + }, + "per_pubkey": { + "window_seconds": 60, + "max_hits": 30 + }, + "failures": { + "window_seconds": 300, + "max_hits": 5 + }, + "failures_per_ip": { + "window_seconds": 300, + "max_hits": 20 + } + }, + "event": { + "per_ip": { + "window_seconds": 60, + "max_hits": 600 + }, + "per_pubkey": { + "window_seconds": 60, + "max_hits": 120 + }, + "per_kind": { + "window_seconds": 60, + "max_hits": 1000 + } + }, + "group": { + "write_per_ip": { + "window_seconds": 60, + "max_hits": 300 + }, + "write_per_pubkey": { + "window_seconds": 60, + "max_hits": 60 + }, + "write_per_group": { + "window_seconds": 60, + "max_hits": 90 + }, + "write_per_kind": { + "window_seconds": 60, + "max_hits": 300 + }, + "join_flow": { + "window_seconds": 300, + "max_hits": 10 + }, + "join_flow_per_ip": { + "window_seconds": 300, + "max_hits": 30 + } + }, + "req": { + "per_ip": { + "window_seconds": 60, + "max_hits": 600 + }, + "per_connection": { + "window_seconds": 60, + "max_hits": 120 + }, + "per_pubkey": { + "window_seconds": 60, + "max_hits": 240 + }, + "per_group": { + "window_seconds": 60, + "max_hits": 240 + }, + "per_kind": { + "window_seconds": 60, + "max_hits": 500 + }, + "broad": { + "window_seconds": 60, + "max_hits": 30 + } + }, + "count": { + "per_ip": { + "window_seconds": 60, + "max_hits": 300 + }, + "per_connection": { + "window_seconds": 60, + "max_hits": 60 + }, + "per_pubkey": { + "window_seconds": 60, + "max_hits": 120 + }, + "per_group": { + "window_seconds": 60, + "max_hits": 120 + }, + "per_kind": { + "window_seconds": 60, + "max_hits": 240 + }, + "broad": { + "window_seconds": 60, + "max_hits": 20 + } + } + }, + "backup_export": { + "backup_enabled": true, + "export_enabled": true + } +} diff --git a/crates/tangle/tests/source_invariant.rs b/crates/tangle/tests/source_invariant.rs @@ -29,6 +29,20 @@ fn production_runtime_and_group_source_keeps_pocket_native_boundary() { } #[test] +fn tangle_v1_mvp_authority_requires_virtual_relay_tenancy() { + let workspace_root = workspace_root(); + let agents = fs::read_to_string(workspace_root.join("AGENTS.md")).expect("AGENTS.md"); + assert!( + agents.contains("virtual relay tenancy is required for `tangle_v1_mvp`"), + "AGENTS.md must state the approved tangle_v1_mvp virtual relay tenancy requirement" + ); + assert!( + !agents.contains("no multi-tenancy"), + "AGENTS.md must not keep the stale no multi-tenancy directive" + ); +} + +#[test] fn scanner_removes_test_gated_items_without_removing_production_items() { let source = [ "#[cfg(test)]\n", diff --git a/crates/tangle_runtime/src/config.rs b/crates/tangle_runtime/src/config.rs @@ -10,16 +10,324 @@ use crate::{ auth::BaseAuthState, core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits}, }, + tenant::{CanonicalHost, TenantId, TenantRelayUrl, TenantSchema}, }; use serde::Deserialize; use std::{net::SocketAddr, path::PathBuf}; +use tangle_crypto::RelaySigner; use tangle_groups::GroupRuntimeConfig; -use tangle_protocol::SubscriptionId; +use tangle_protocol::{PublicKeyHex, SubscriptionId}; use tangle_store_pocket::{PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy}; const MAX_POCKET_QUERY_SCRAPE_WINDOW_SECONDS: u64 = 86_400; #[derive(Debug, Clone, PartialEq, Eq)] +pub struct TangleHostRuntimeConfig { + listen_addr: SocketAddr, + tenant_config_dir: PathBuf, + limits: TangleHostLimitsConfig, + ops: TangleHostOpsConfig, + trusted_proxy: TangleTrustedProxyConfig, + tracing: BaseRelayTracingConfig, +} + +impl TangleHostRuntimeConfig { + pub fn listen_addr(&self) -> SocketAddr { + self.listen_addr + } + + pub fn tenant_config_dir(&self) -> &std::path::Path { + &self.tenant_config_dir + } + + pub fn limits(&self) -> TangleHostLimitsConfig { + self.limits + } + + pub fn ops(&self) -> TangleHostOpsConfig { + self.ops + } + + pub fn trusted_proxy(&self) -> &TangleTrustedProxyConfig { + &self.trusted_proxy + } + + pub fn tracing(&self) -> &BaseRelayTracingConfig { + &self.tracing + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TangleHostLimitsConfig { + max_total_connections: usize, + max_total_subscriptions: usize, + tenant_startup_concurrency: usize, +} + +impl TangleHostLimitsConfig { + pub fn new( + max_total_connections: usize, + max_total_subscriptions: usize, + tenant_startup_concurrency: usize, + ) -> Result<Self, BaseRelayError> { + require_positive("limits.max_total_connections", max_total_connections)?; + require_positive("limits.max_total_subscriptions", max_total_subscriptions)?; + require_positive( + "limits.tenant_startup_concurrency", + tenant_startup_concurrency, + )?; + Ok(Self { + max_total_connections, + max_total_subscriptions, + tenant_startup_concurrency, + }) + } + + pub fn max_total_connections(self) -> usize { + self.max_total_connections + } + + pub fn max_total_subscriptions(self) -> usize { + self.max_total_subscriptions + } + + pub fn tenant_startup_concurrency(self) -> usize { + self.tenant_startup_concurrency + } +} + +impl Default for TangleHostLimitsConfig { + fn default() -> Self { + Self::new(10_000, 25_000, 4).expect("default host limits are valid") + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TangleHostOpsConfig { + enabled: bool, + expose_tenant_inventory: bool, +} + +impl TangleHostOpsConfig { + pub fn new(enabled: bool, expose_tenant_inventory: bool) -> Self { + Self { + enabled, + expose_tenant_inventory, + } + } + + pub fn enabled(self) -> bool { + self.enabled + } + + pub fn expose_tenant_inventory(self) -> bool { + self.expose_tenant_inventory + } +} + +impl Default for TangleHostOpsConfig { + fn default() -> Self { + Self::new(true, true) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TangleTrustedProxyConfig { + enabled: bool, + trusted_peers: Vec<String>, +} + +impl TangleTrustedProxyConfig { + pub fn new(enabled: bool, trusted_peers: Vec<String>) -> Result<Self, BaseRelayError> { + for peer in &trusted_peers { + if peer.trim().is_empty() || peer.trim() != peer { + return Err(BaseRelayError::invalid( + "trusted_proxy.trusted_peers entries must not be empty or padded", + )); + } + } + Ok(Self { + enabled, + trusted_peers, + }) + } + + pub fn enabled(&self) -> bool { + self.enabled + } + + pub fn trusted_peers(&self) -> &[String] { + &self.trusted_peers + } +} + +impl Default for TangleTrustedProxyConfig { + fn default() -> Self { + Self::new(false, Vec::new()).expect("default trusted proxy config is valid") + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TenantRelayInfoConfig { + name: String, + description: Option<String>, + contact: Option<String>, + icon: Option<String>, +} + +impl TenantRelayInfoConfig { + pub fn new( + name: impl Into<String>, + description: Option<String>, + contact: Option<String>, + icon: Option<String>, + ) -> Result<Self, BaseRelayError> { + let name = name.into(); + if name.trim().is_empty() || name.trim() != name { + return Err(BaseRelayError::invalid( + "info.name must not be empty or padded", + )); + } + Ok(Self { + name, + description: validate_optional_text("info.description", description)?, + contact: validate_optional_text("info.contact", contact)?, + icon: validate_optional_text("info.icon", icon)?, + }) + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn description(&self) -> Option<&str> { + self.description.as_deref() + } + + pub fn contact(&self) -> Option<&str> { + self.contact.as_deref() + } + + pub fn icon(&self) -> Option<&str> { + self.icon.as_deref() + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TenantBackupExportConfig { + backup_enabled: bool, + export_enabled: bool, +} + +impl TenantBackupExportConfig { + pub fn new(backup_enabled: bool, export_enabled: bool) -> Self { + Self { + backup_enabled, + export_enabled, + } + } + + pub fn backup_enabled(self) -> bool { + self.backup_enabled + } + + pub fn export_enabled(self) -> bool { + self.export_enabled + } +} + +impl Default for TenantBackupExportConfig { + fn default() -> Self { + Self::new(true, true) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TenantRuntimeConfig { + tenant_id: TenantId, + tenant_schema: TenantSchema, + host: CanonicalHost, + relay_url: TenantRelayUrl, + inactive: bool, + info: TenantRelayInfoConfig, + pocket: PocketStoreConfig, + pocket_query: PocketQueryConfig, + groups: GroupRuntimeConfig, + auth_ttl_seconds: u64, + auth_created_at_skew_seconds: u64, + limits: BaseRelayRuntimeLimitsConfig, + rate_limits: TangleRateLimitConfig, + backup_export: TenantBackupExportConfig, +} + +impl TenantRuntimeConfig { + pub fn tenant_id(&self) -> &TenantId { + &self.tenant_id + } + + pub fn tenant_schema(&self) -> &TenantSchema { + &self.tenant_schema + } + + pub fn host(&self) -> &CanonicalHost { + &self.host + } + + pub fn relay_url(&self) -> &TenantRelayUrl { + &self.relay_url + } + + pub fn inactive(&self) -> bool { + self.inactive + } + + pub fn info(&self) -> &TenantRelayInfoConfig { + &self.info + } + + pub fn pocket_config(&self) -> &PocketStoreConfig { + &self.pocket + } + + pub fn pocket_query_config(&self) -> PocketQueryConfig { + self.pocket_query + } + + pub fn groups(&self) -> &GroupRuntimeConfig { + &self.groups + } + + pub fn relay_self_pubkey(&self) -> Result<Option<PublicKeyHex>, BaseRelayError> { + self.groups + .relay_secret() + .map(|secret| RelaySigner::from_secret_hex(secret.expose_for_signing())) + .transpose() + .map(|signer| signer.map(|signer| signer.public_key().clone())) + .map_err(BaseRelayError::invalid) + } + + pub fn auth_ttl_seconds(&self) -> u64 { + self.auth_ttl_seconds + } + + pub fn auth_created_at_skew_seconds(&self) -> u64 { + self.auth_created_at_skew_seconds + } + + pub fn limits(&self) -> BaseRelayRuntimeLimitsConfig { + self.limits + } + + pub fn rate_limits(&self) -> TangleRateLimitConfig { + self.rate_limits + } + + pub fn backup_export(&self) -> TenantBackupExportConfig { + self.backup_export + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] pub struct BaseRelayRuntimeConfig { listen_addr: SocketAddr, relay_url: String, @@ -295,6 +603,128 @@ impl BaseRelayRuntimeLimitsConfig { #[derive(Debug, Deserialize)] #[serde(deny_unknown_fields)] +struct TangleHostRuntimeConfigDocument { + listen_addr: String, + tenant_config_dir: String, + #[serde(default)] + limits: TangleHostLimitsConfigDocument, + #[serde(default)] + ops: TangleHostOpsConfigDocument, + #[serde(default)] + trusted_proxy: TangleTrustedProxyConfigDocument, + #[serde(default)] + observability: BaseRelayObservabilityConfigDocument, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct TangleHostLimitsConfigDocument { + max_total_connections: usize, + max_total_subscriptions: usize, + tenant_startup_concurrency: usize, +} + +impl Default for TangleHostLimitsConfigDocument { + fn default() -> Self { + let defaults = TangleHostLimitsConfig::default(); + Self { + max_total_connections: defaults.max_total_connections(), + max_total_subscriptions: defaults.max_total_subscriptions(), + tenant_startup_concurrency: defaults.tenant_startup_concurrency(), + } + } +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct TangleHostOpsConfigDocument { + enabled: bool, + expose_tenant_inventory: bool, +} + +impl Default for TangleHostOpsConfigDocument { + fn default() -> Self { + let defaults = TangleHostOpsConfig::default(); + Self { + enabled: defaults.enabled(), + expose_tenant_inventory: defaults.expose_tenant_inventory(), + } + } +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct TangleTrustedProxyConfigDocument { + enabled: bool, + #[serde(default)] + trusted_peers: Vec<String>, +} + +impl Default for TangleTrustedProxyConfigDocument { + fn default() -> Self { + let defaults = TangleTrustedProxyConfig::default(); + Self { + enabled: defaults.enabled(), + trusted_peers: defaults.trusted_peers().to_vec(), + } + } +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct TenantRuntimeConfigDocument { + tenant_id: String, + tenant_schema: String, + host: String, + relay_url: String, + #[serde(default)] + inactive: bool, + info: TenantRelayInfoConfigDocument, + pocket: TenantPocketConfigDocument, + pocket_query: BaseRelayPocketQueryConfigDocument, + groups: serde_json::Value, + auth: BaseRelayAuthConfigDocument, + limits: BaseRelayRuntimeLimitsDocument, + rate_limits: BaseRelayRateLimitsDocument, + #[serde(default)] + backup_export: TenantBackupExportConfigDocument, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct TenantRelayInfoConfigDocument { + name: String, + description: Option<String>, + contact: Option<String>, + icon: Option<String>, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct TenantPocketConfigDocument { + data_directory: String, + sync_policy: BaseRelayPocketSyncPolicyDocument, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct TenantBackupExportConfigDocument { + backup_enabled: bool, + export_enabled: bool, +} + +impl Default for TenantBackupExportConfigDocument { + fn default() -> Self { + let defaults = TenantBackupExportConfig::default(); + Self { + backup_enabled: defaults.backup_enabled(), + export_enabled: defaults.export_enabled(), + } + } +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] struct BaseRelayRuntimeConfigDocument { server: BaseRelayServerConfigDocument, pocket: BaseRelayPocketConfigDocument, @@ -438,6 +868,120 @@ enum BaseRelayTracingFormatDocument { Json, } +pub fn parse_tangle_host_runtime_config_json( + raw: &str, +) -> Result<TangleHostRuntimeConfig, BaseRelayError> { + reject_legacy_single_relay_config(raw)?; + let document = + serde_json::from_str::<TangleHostRuntimeConfigDocument>(raw).map_err(|error| { + BaseRelayError::invalid(format!( + "tangle host runtime config JSON is invalid: {error}" + )) + })?; + let listen_addr = document + .listen_addr + .parse::<SocketAddr>() + .map_err(|error| BaseRelayError::invalid(format!("listen_addr is invalid: {error}")))?; + if document.tenant_config_dir.trim().is_empty() + || document.tenant_config_dir.trim() != document.tenant_config_dir + { + return Err(BaseRelayError::invalid( + "tenant_config_dir must not be empty or padded", + )); + } + Ok(TangleHostRuntimeConfig { + listen_addr, + tenant_config_dir: PathBuf::from(document.tenant_config_dir), + limits: TangleHostLimitsConfig::new( + document.limits.max_total_connections, + document.limits.max_total_subscriptions, + document.limits.tenant_startup_concurrency, + )?, + ops: TangleHostOpsConfig::new(document.ops.enabled, document.ops.expose_tenant_inventory), + trusted_proxy: TangleTrustedProxyConfig::new( + document.trusted_proxy.enabled, + document.trusted_proxy.trusted_peers, + )?, + tracing: base_relay_tracing_config_from_document(document.observability.tracing)?, + }) +} + +pub fn parse_tenant_runtime_config_json(raw: &str) -> Result<TenantRuntimeConfig, BaseRelayError> { + reject_legacy_single_relay_config(raw)?; + let document = serde_json::from_str::<TenantRuntimeConfigDocument>(raw).map_err(|error| { + BaseRelayError::invalid(format!("tenant runtime config JSON is invalid: {error}")) + })?; + let tenant_id = TenantId::new(document.tenant_id)?; + let tenant_schema = TenantSchema::new(document.tenant_schema)?; + let host = CanonicalHost::new(document.host)?; + let relay_url = TenantRelayUrl::new(document.relay_url)?; + let pocket = PocketStoreConfig::new( + PathBuf::from(document.pocket.data_directory), + match document.pocket.sync_policy { + BaseRelayPocketSyncPolicyDocument::FlushOnWrite => PocketSyncPolicy::FlushOnWrite, + BaseRelayPocketSyncPolicyDocument::FlushOnShutdown => PocketSyncPolicy::FlushOnShutdown, + }, + ) + .map_err(|error| BaseRelayError::invalid(error.to_string()))?; + let groups_raw = serde_json::to_string(&document.groups).map_err(|error| { + BaseRelayError::invalid(format!("groups config JSON is invalid: {error}")) + })?; + let groups = tangle_groups::parse_group_runtime_config_json(&groups_raw) + .map_err(|error| BaseRelayError::invalid(error.to_string()))?; + if let Some(group_relay_url) = groups.canonical_relay_url() + && group_relay_url.as_str() != relay_url.as_str() + { + return Err(BaseRelayError::invalid( + "groups.canonical_relay_url must match relay_url", + )); + } + let limits = BaseRelayRuntimeLimitsConfig::from_document(document.limits)?; + let pocket_query = pocket_query_config_from_document(document.pocket_query, limits)?; + if document.auth.created_at_skew_seconds == 0 { + return Err(BaseRelayError::invalid( + "auth.created_at_skew_seconds must be greater than zero", + )); + } + Ok(TenantRuntimeConfig { + tenant_id, + tenant_schema, + host, + relay_url, + inactive: document.inactive, + info: TenantRelayInfoConfig::new( + document.info.name, + document.info.description, + document.info.contact, + document.info.icon, + )?, + pocket, + pocket_query, + groups, + auth_ttl_seconds: document.auth.challenge_ttl_seconds, + auth_created_at_skew_seconds: document.auth.created_at_skew_seconds, + limits, + rate_limits: base_relay_rate_limits_from_document(document.rate_limits)?, + backup_export: TenantBackupExportConfig::new( + document.backup_export.backup_enabled, + document.backup_export.export_enabled, + ), + }) +} + +fn reject_legacy_single_relay_config(raw: &str) -> Result<(), BaseRelayError> { + let value = serde_json::from_str::<serde_json::Value>(raw) + .map_err(|error| BaseRelayError::invalid(format!("config JSON is invalid: {error}")))?; + if value + .as_object() + .is_some_and(|object| object.contains_key("server")) + { + return Err(BaseRelayError::invalid( + "legacy single-relay config is not supported", + )); + } + Ok(()) +} + pub fn parse_base_relay_runtime_config_json( raw: &str, ) -> Result<BaseRelayRuntimeConfig, BaseRelayError> { @@ -648,13 +1192,105 @@ fn base_relay_tracing_config_from_document( ) } +fn validate_optional_text( + field: &str, + value: Option<String>, +) -> Result<Option<String>, BaseRelayError> { + if let Some(value) = value { + if value.trim().is_empty() || value.trim() != value { + return Err(BaseRelayError::invalid(format!( + "{field} must not be empty or padded" + ))); + } + Ok(Some(value)) + } else { + Ok(None) + } +} + #[cfg(test)] mod tests { - use super::{BaseRelayTracingFormat, parse_base_relay_runtime_config_json}; + use super::{ + BaseRelayTracingFormat, parse_base_relay_runtime_config_json, + parse_tangle_host_runtime_config_json, parse_tenant_runtime_config_json, + }; use std::path::Path; use tangle_store_pocket::PocketSyncPolicy; #[test] + fn tangle_host_runtime_config_parses_v1_mvp_example() { + let config = parse_tangle_host_runtime_config_json(include_str!( + "../../../config/tangle.host.example.json" + )) + .expect("host config"); + + assert_eq!(config.listen_addr().to_string(), "0.0.0.0:7000"); + assert_eq!(config.tenant_config_dir(), Path::new("config/tenants")); + assert_eq!(config.limits().max_total_connections(), 10_000); + assert_eq!(config.limits().max_total_subscriptions(), 25_000); + assert_eq!(config.limits().tenant_startup_concurrency(), 4); + assert!(config.ops().enabled()); + assert!(config.ops().expose_tenant_inventory()); + assert!(!config.trusted_proxy().enabled()); + assert!(config.trusted_proxy().trusted_peers().is_empty()); + assert!(config.tracing().enabled()); + assert_eq!(config.tracing().format(), BaseRelayTracingFormat::Json); + } + + #[test] + fn tenant_runtime_config_parses_v1_mvp_example() { + let config = parse_tenant_runtime_config_json(include_str!( + "../../../config/tenants/farmers_market.example.json" + )) + .expect("tenant config"); + + assert_eq!(config.tenant_id().as_str(), "farmers-market"); + assert_eq!(config.tenant_schema().as_str(), "farmers_market"); + assert_eq!(config.host().as_str(), "relay.radroots.test"); + assert_eq!(config.relay_url().as_str(), "wss://relay.radroots.test"); + assert!(!config.inactive()); + assert_eq!(config.info().name(), "Radroots Farmers Market"); + assert_eq!( + config.info().description(), + Some("Tangle virtual relay tenant for the Radroots farmers market") + ); + assert_eq!( + config.pocket_config().data_directory(), + Path::new("runtime/tenants/farmers_market/pocket") + ); + assert_eq!( + config.pocket_config().sync_policy(), + PocketSyncPolicy::FlushOnShutdown + ); + assert!(config.groups().enabled()); + assert_eq!(config.auth_ttl_seconds(), 300); + assert_eq!(config.auth_created_at_skew_seconds(), 600); + assert_eq!(config.limits().max_subscriptions_per_connection(), 64); + assert_eq!(config.rate_limits().auth().per_ip().max_hits(), 120); + assert!(config.backup_export().backup_enabled()); + assert!(config.backup_export().export_enabled()); + assert!(config.relay_self_pubkey().expect("relay self").is_some()); + } + + #[test] + fn tangle_v1_mvp_config_rejects_legacy_single_relay_shape() { + let raw = include_str!("../../../config/tangle.example.json"); + + assert_eq!( + parse_tangle_host_runtime_config_json(raw) + .expect_err("legacy host config") + .prefixed_message(), + "invalid: legacy single-relay config is not supported" + ); + assert_eq!( + parse_tenant_runtime_config_json(raw) + .expect_err("legacy tenant config") + .prefixed_message(), + "invalid: legacy single-relay config is not supported" + ); + } + + #[test] fn base_relay_runtime_config_parses_v2_production_example() { let config = parse_base_relay_runtime_config_json(include_str!( "../../../config/tangle.example.json" diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -15,6 +15,7 @@ pub mod relay; pub mod runtime; pub mod server; pub mod session; +pub mod tenant; use std::{fmt, fs, path::Path, path::PathBuf}; diff --git a/crates/tangle_runtime/src/tenant.rs b/crates/tangle_runtime/src/tenant.rs @@ -0,0 +1,336 @@ +#![forbid(unsafe_code)] + +use crate::errors::BaseRelayError; +use std::fmt; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct TenantId(String); + +impl TenantId { + pub const MAX_LENGTH: usize = 64; + + pub fn new(value: impl Into<String>) -> Result<Self, BaseRelayError> { + let value = value.into(); + validate_identifier( + "tenant_id", + &value, + Self::MAX_LENGTH, + IdentifierAlphabet::TenantId, + )?; + Ok(Self(value)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for TenantId { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str(&self.0) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct TenantSchema(String); + +impl TenantSchema { + pub const MAX_LENGTH: usize = 64; + + pub fn new(value: impl Into<String>) -> Result<Self, BaseRelayError> { + let value = value.into(); + validate_identifier( + "tenant_schema", + &value, + Self::MAX_LENGTH, + IdentifierAlphabet::TenantSchema, + )?; + Ok(Self(value)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for TenantSchema { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str(&self.0) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct CanonicalHost(String); + +impl CanonicalHost { + pub fn new(value: impl AsRef<str>) -> Result<Self, BaseRelayError> { + let raw = value.as_ref(); + let trimmed = raw.trim(); + if trimmed.is_empty() { + return Err(BaseRelayError::invalid("host must not be empty")); + } + if trimmed != raw { + return Err(BaseRelayError::invalid( + "host must not contain leading or trailing whitespace", + )); + } + if trimmed.contains("://") { + return Err(BaseRelayError::invalid( + "host must not include a URL scheme", + )); + } + if trimmed.chars().any(|character| { + character.is_whitespace() || matches!(character, '/' | '\\' | '?' | '#' | '@') + }) { + return Err(BaseRelayError::invalid( + "host must not contain whitespace, path, query, fragment, or credentials", + )); + } + if trimmed.contains('[') || trimmed.contains(']') { + return Err(BaseRelayError::invalid( + "host must use a DNS name or IPv4 address with optional port", + )); + } + let lowercase = trimmed.to_ascii_lowercase(); + let (host, port) = split_host_port(&lowercase)?; + validate_host_name(host)?; + if let Some(port) = port { + validate_port(port)?; + } + Ok(Self(lowercase)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for CanonicalHost { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str(&self.0) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct TenantRelayUrl(String); + +impl TenantRelayUrl { + pub fn new(value: impl AsRef<str>) -> Result<Self, BaseRelayError> { + let raw = value.as_ref(); + let trimmed = raw.trim(); + if trimmed.is_empty() { + return Err(BaseRelayError::invalid("relay_url must not be empty")); + } + if trimmed != raw { + return Err(BaseRelayError::invalid( + "relay_url must not contain leading or trailing whitespace", + )); + } + if trimmed.chars().any(char::is_whitespace) { + return Err(BaseRelayError::invalid( + "relay_url must not contain whitespace", + )); + } + let (scheme, rest) = trimmed + .split_once("://") + .ok_or_else(|| BaseRelayError::invalid("relay_url must include ws:// or wss://"))?; + let scheme = match scheme { + "ws" => "ws", + "wss" => "wss", + _ => { + return Err(BaseRelayError::invalid( + "relay_url must start with ws:// or wss://", + )); + } + }; + let authority = rest.split('/').next().unwrap_or_default(); + if authority.is_empty() { + return Err(BaseRelayError::invalid("relay_url host must not be empty")); + } + let host = CanonicalHost::new(authority)?; + let suffix = rest + .strip_prefix(authority) + .expect("authority came from rest prefix"); + Ok(Self(format!("{scheme}://{}{}", host.as_str(), suffix))) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for TenantRelayUrl { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str(&self.0) + } +} + +#[derive(Clone, Copy)] +enum IdentifierAlphabet { + TenantId, + TenantSchema, +} + +fn validate_identifier( + field: &str, + value: &str, + max_length: usize, + alphabet: IdentifierAlphabet, +) -> Result<(), BaseRelayError> { + if value.is_empty() { + return Err(BaseRelayError::invalid(format!( + "{field} must not be empty" + ))); + } + if value.len() > max_length { + return Err(BaseRelayError::invalid(format!( + "{field} must be {max_length} bytes or less" + ))); + } + if value.trim() != value { + return Err(BaseRelayError::invalid(format!( + "{field} must not contain leading or trailing whitespace" + ))); + } + let Some(first) = value.as_bytes().first().copied() else { + return Err(BaseRelayError::invalid(format!( + "{field} must not be empty" + ))); + }; + if !first.is_ascii_lowercase() { + return Err(BaseRelayError::invalid(format!( + "{field} must start with a lowercase ASCII letter" + ))); + } + let valid = value.bytes().all(|byte| match alphabet { + IdentifierAlphabet::TenantId => { + byte.is_ascii_lowercase() || byte.is_ascii_digit() || byte == b'_' || byte == b'-' + } + IdentifierAlphabet::TenantSchema => { + byte.is_ascii_lowercase() || byte.is_ascii_digit() || byte == b'_' + } + }); + if !valid { + return Err(BaseRelayError::invalid(format!( + "{field} must contain only lowercase ASCII letters, digits, and approved separators" + ))); + } + Ok(()) +} + +fn split_host_port(host: &str) -> Result<(&str, Option<&str>), BaseRelayError> { + if host.matches(':').count() > 1 { + return Err(BaseRelayError::invalid( + "host must not contain multiple port separators", + )); + } + if let Some((name, port)) = host.rsplit_once(':') { + if name.is_empty() || port.is_empty() { + return Err(BaseRelayError::invalid( + "host and port must both be present", + )); + } + Ok((name, Some(port))) + } else { + Ok((host, None)) + } +} + +fn validate_host_name(host: &str) -> Result<(), BaseRelayError> { + if host.is_empty() { + return Err(BaseRelayError::invalid("host name must not be empty")); + } + for label in host.split('.') { + if label.is_empty() { + return Err(BaseRelayError::invalid("host labels must not be empty")); + } + if label.starts_with('-') || label.ends_with('-') { + return Err(BaseRelayError::invalid( + "host labels must not start or end with hyphen", + )); + } + if !label + .bytes() + .all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit() || byte == b'-') + { + return Err(BaseRelayError::invalid( + "host labels must contain only lowercase ASCII letters, digits, and hyphen", + )); + } + } + Ok(()) +} + +fn validate_port(port: &str) -> Result<(), BaseRelayError> { + if !port.bytes().all(|byte| byte.is_ascii_digit()) { + return Err(BaseRelayError::invalid("host port must be numeric")); + } + let parsed = port + .parse::<u16>() + .map_err(|_| BaseRelayError::invalid("host port must be between 1 and 65535"))?; + if parsed == 0 { + return Err(BaseRelayError::invalid( + "host port must be between 1 and 65535", + )); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::{CanonicalHost, TenantId, TenantRelayUrl, TenantSchema}; + + #[test] + fn tenant_identity_types_accept_canonical_values() { + assert_eq!( + TenantId::new("farmers-market").expect("id").as_str(), + "farmers-market" + ); + assert_eq!( + TenantSchema::new("farmers_market") + .expect("schema") + .as_str(), + "farmers_market" + ); + assert_eq!( + CanonicalHost::new("Relay.Example.TEST:8083") + .expect("host") + .as_str(), + "relay.example.test:8083" + ); + assert_eq!( + TenantRelayUrl::new("wss://Relay.Example.TEST:443/groups") + .expect("url") + .as_str(), + "wss://relay.example.test:443/groups" + ); + } + + #[test] + fn tenant_identity_types_reject_noncanonical_values() { + for value in ["", "Farm", "farm space", "farm.market"] { + assert!(TenantId::new(value).is_err()); + } + for value in ["", "farm-market", "Farm", "_farm"] { + assert!(TenantSchema::new(value).is_err()); + } + for value in [ + "", + " https://relay.example.test", + "https://relay.example.test", + "relay.example.test/path", + "user@relay.example.test", + ] { + assert!(CanonicalHost::new(value).is_err()); + } + for value in [ + "", + "http://relay.example.test", + "wss://", + "wss://user@relay.example.test", + "wss://relay example.test", + ] { + assert!(TenantRelayUrl::new(value).is_err()); + } + } +}