tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 51373b43e9566f6162e864a4da35ef68c88965f1
parent 1b5f9c9afc9aad27ab178683245354c157474e27
Author: triesap <tyson@radroots.org>
Date:   Wed, 17 Jun 2026 15:58:31 -0700

server: route requests by tenant host

- serve Tangle through the host runtime router
- resolve tenant Host before NIP-11 and WebSocket sessions
- replace legacy ops paths with well-known host endpoints
- enforce host connection and subscription caps in sessions

Diffstat:
Mcrates/tangle/src/lib.rs | 15+++++++++------
Mcrates/tangle/tests/version.rs | 23++++++++++++++---------
Mcrates/tangle_runtime/src/config.rs | 16----------------
Mcrates/tangle_runtime/src/host.rs | 2+-
Mcrates/tangle_runtime/src/nip11.rs | 23++++++++++++++++++++++-
Mcrates/tangle_runtime/src/relay/live.rs | 4++++
Mcrates/tangle_runtime/src/runtime.rs | 4++++
Mcrates/tangle_runtime/src/server.rs | 702+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------
Mcrates/tangle_runtime/src/session.rs | 44++++++++++++++++++++++++++++++++++++++++++--
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 145+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------
10 files changed, 822 insertions(+), 156 deletions(-)

diff --git a/crates/tangle/src/lib.rs b/crates/tangle/src/lib.rs @@ -196,14 +196,17 @@ pub async fn run_with_config( ) -> Result<tangle_runtime::server::TangleServeReport, String> { let config_set = tangle_runtime::load_tangle_host_runtime_config(config_path) .map_err(|error| error.to_string())?; - let config = config_set - .single_active_runtime_config() + tangle_runtime::logging::init_tangle_tracing(config_set.host().tracing()) .map_err(|error| error.to_string())?; - tangle_runtime::logging::init_tangle_tracing(config.tracing()) + for tenant in config_set.active_tenants() { + let config = tenant.to_base_relay_runtime_config( + config_set.host().listen_addr(), + config_set.host().tracing().clone(), + ); + tangle_runtime::logging::log_runtime_config_loaded(&config); + } + let runtime = tangle_runtime::host::TangleHostRuntime::open(config_set) .map_err(|error| error.to_string())?; - tangle_runtime::logging::log_runtime_config_loaded(&config); - let runtime = - tangle_runtime::runtime::TenantRuntime::open(config).map_err(|error| error.to_string())?; tangle_runtime::server::serve_until_shutdown(runtime) .await .map_err(|error| error.to_string()) diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs @@ -209,21 +209,26 @@ fn tangle_run_starts_server_and_stays_alive_until_shutdown() { .expect("write config"); let mut child = TangleChild::spawn(&config_path); - let health = wait_for_http_ok(listen_addr, "/healthz", None); - let ready = wait_for_http_ok(listen_addr, "/readyz", None); + let ready = wait_for_http_ok(listen_addr, "/.well-known/tangle/ready", None); + let metrics = wait_for_http_ok(listen_addr, "/.well-known/tangle/metrics", None); + let tenants = wait_for_http_ok(listen_addr, "/.well-known/tangle/tenants", None); let nip11 = wait_for_http_ok(listen_addr, "/", Some("application/nostr+json")); - let health_value = - serde_json::from_str::<serde_json::Value>(response_body(&health)).expect("health json"); let ready_value = serde_json::from_str::<serde_json::Value>(response_body(&ready)).expect("ready json"); + let metrics_value = + serde_json::from_str::<serde_json::Value>(response_body(&metrics)).expect("metrics json"); + let tenants_value = + serde_json::from_str::<serde_json::Value>(response_body(&tenants)).expect("tenants json"); let nip11_value = serde_json::from_str::<serde_json::Value>(response_body(&nip11)).expect("nip11 json"); - assert_eq!(health_value["status"], "ok"); assert_eq!(ready_value["status"], "ready"); - assert_eq!(ready_value["checks"]["server_bind"], "ready"); - assert_eq!(ready_value["checks"]["pocket_storage"], "ready"); - assert_eq!(nip11_value["name"], "tangle"); + assert_eq!(ready_value["checks"]["active_tenants"], "ready"); + assert_eq!(metrics_value["tangle_host_configured_tenants"], 1); + assert_eq!(metrics_value["tangle_host_active_tenants"], 1); + assert_eq!(tenants_value["tenants"][0]["tenant_id"], "farmers-market"); + assert_eq!(tenants_value["tenants"][0]["ready"], true); + assert_eq!(nip11_value["name"], "Radroots Farmers Market"); assert_eq!(nip11_value["limitation"]["max_message_length"], 1_048_576); assert_eq!(nip11_value["limitation"]["max_subscriptions"], 64); assert_eq!(nip11_value["limitation"]["max_filters"], 10); @@ -327,7 +332,7 @@ fn http_get(address: SocketAddr, path: &str, accept: Option<&str>) -> std::io::R let mut stream = TcpStream::connect_timeout(&address, Duration::from_millis(200))?; stream.set_read_timeout(Some(Duration::from_millis(500)))?; stream.set_write_timeout(Some(Duration::from_millis(500)))?; - let mut request = format!("GET {path} HTTP/1.1\r\nHost: {address}\r\n"); + let mut request = format!("GET {path} HTTP/1.1\r\nHost: relay.radroots.test\r\n"); if let Some(accept) = accept { request.push_str("Accept: "); request.push_str(accept); diff --git a/crates/tangle_runtime/src/config.rs b/crates/tangle_runtime/src/config.rs @@ -61,22 +61,6 @@ impl TangleHostRuntimeConfigSet { pub fn active_tenants(&self) -> impl Iterator<Item = &TenantRuntimeConfig> { self.tenants.iter().filter(|tenant| !tenant.inactive()) } - - pub fn single_active_runtime_config(&self) -> Result<BaseRelayRuntimeConfig, BaseRelayError> { - let mut active = self.active_tenants(); - let Some(tenant) = active.next() else { - return Err(BaseRelayError::invalid( - "at least one active tenant is required", - )); - }; - if active.next().is_some() { - return Err(BaseRelayError::invalid( - "multi-tenant HTTP routing is not implemented yet", - )); - } - Ok(tenant - .to_base_relay_runtime_config(self.host.listen_addr(), self.host.tracing().clone())) - } } impl TangleHostRuntimeConfig { diff --git a/crates/tangle_runtime/src/host.rs b/crates/tangle_runtime/src/host.rs @@ -15,7 +15,7 @@ use std::{ }, }; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TangleHostRuntime { config: TangleHostRuntimeConfigSet, registry: TenantRegistry, diff --git a/crates/tangle_runtime/src/nip11.rs b/crates/tangle_runtime/src/nip11.rs @@ -1,7 +1,7 @@ #![forbid(unsafe_code)] use crate::{ - config::{BaseRelayRuntimeConfig, BaseRelayRuntimeLimitsConfig}, + config::{BaseRelayRuntimeConfig, BaseRelayRuntimeLimitsConfig, TenantRuntimeConfig}, errors::BaseRelayError, }; use axum::{ @@ -71,6 +71,27 @@ impl BaseRelayInfoConfig { }) } + pub fn from_tenant_config(tenant: &TenantRuntimeConfig) -> Result<Self, BaseRelayError> { + let mut config = Self { + name: tenant.info().name().to_owned(), + description: tenant.info().description().map(str::to_owned), + contact: tenant.info().contact().map(str::to_owned), + icon: tenant.info().icon().map(str::to_owned), + groups: tenant.groups().clone(), + limits: tenant.limits(), + software: crate::TANGLE_RELAY_SOFTWARE.to_owned(), + version: crate::TANGLE_RELAY_VERSION.to_owned(), + payment_required: false, + restricted_writes: true, + supported_nips: supported_nips_for_group_capability(tenant.groups().enabled()), + }; + if config.name.trim().is_empty() { + return Err(BaseRelayError::invalid("relay name must not be empty")); + } + config.name = config.name.trim().to_owned(); + Ok(config) + } + pub fn with_description(mut self, description: impl Into<String>) -> Self { self.description = Some(description.into()); self diff --git a/crates/tangle_runtime/src/relay/live.rs b/crates/tangle_runtime/src/relay/live.rs @@ -77,6 +77,10 @@ impl LiveSubscriptionSet { } } + pub(crate) fn contains(&self, subscription_id: &SubscriptionId) -> bool { + self.subscriptions.contains_key(subscription_id) + } + pub(crate) fn close_all(&mut self) -> usize { let closed = self.subscriptions.len(); self.subscriptions.clear(); diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -809,6 +809,10 @@ impl TenantRuntimeHandle { self.inner.readiness.clone() } + pub fn limits(&self) -> TangleRuntimeLimits { + self.inner.limits + } + pub async fn auth_state(&self) -> Result<BaseAuthState, BaseRelayError> { self.inner.config.auth_state() } diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs @@ -2,17 +2,15 @@ use crate::{ errors::BaseRelayError, + host::{TangleHostRuntime, TenantRuntimeEntry}, logging, nip11::{BaseRelayInfoConfig, BaseRelayInfoDocument, base_relay_info_response}, - ops::{BaseRelayReadinessCheckStatus, BaseRelayReadinessHandle, base_relay_ops_router}, - runtime::{ - TangleRuntimeLimits, TangleRuntimeMetrics, TangleShutdownSignal, TenantRuntime, - TenantRuntimeHandle, - }, + ops::BaseRelayReadinessCheckStatus, session::TangleWebSocketSession, + tenant::CanonicalHost, }; use axum::{ - Router, + Json, Router, extract::{ ConnectInfo, State, ws::{WebSocketUpgrade, rejection::WebSocketUpgradeRejection}, @@ -20,8 +18,8 @@ use axum::{ response::{IntoResponse, Response}, routing::get, }; -use http::HeaderMap; -use std::net::SocketAddr; +use http::{HeaderMap, StatusCode, header}; +use std::{collections::BTreeSet, net::SocketAddr}; use tokio::net::TcpListener; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -48,39 +46,31 @@ impl TangleServeReport { } pub async fn serve_until_shutdown( - runtime: TenantRuntime, + runtime: TangleHostRuntime, ) -> Result<TangleServeReport, BaseRelayError> { - let listener = TcpListener::bind(runtime.config().listen_addr()) + let listener = TcpListener::bind(runtime.config().host().listen_addr()) .await .map_err(|error| BaseRelayError::error(error.to_string()))?; serve_listener_until_shutdown(runtime, listener).await } pub async fn serve_listener_until_shutdown( - runtime: TenantRuntime, + runtime: TangleHostRuntime, listener: TcpListener, ) -> Result<TangleServeReport, BaseRelayError> { let listen_addr = listener .local_addr() .map_err(|error| BaseRelayError::error(error.to_string()))?; - let relay_url = runtime.config().relay_url().to_owned(); - let info = BaseRelayInfoConfig::new("tangle", runtime.config())?.build_document()?; - let readiness = runtime.readiness_handle(); - readiness.set_server_bind(BaseRelayReadinessCheckStatus::Ready); - let limits = runtime.limits(); - let metrics = runtime.metrics().clone(); + for tenant in runtime.registry().active_tenants() { + tenant + .runtime() + .readiness_handle() + .set_server_bind(BaseRelayReadinessCheckStatus::Ready); + } let shutdown_signal = runtime.shutdown_signal().clone(); - let runtime = TenantRuntimeHandle::new(runtime); - let router = tangle_http_router( - readiness, - info, - limits, - metrics, - shutdown_signal.clone(), - runtime.clone(), - ); + let router = tangle_http_router(runtime.clone()); let mut shutdown = shutdown_signal.subscribe(); - logging::log_server_listening(listen_addr, &relay_url); + logging::log_server_listening(listen_addr, "tangle-host"); axum::serve( listener, router.into_make_service_with_connect_info::<SocketAddr>(), @@ -105,31 +95,18 @@ pub async fn serve_listener_until_shutdown( )) } -pub fn tangle_http_router( - readiness: BaseRelayReadinessHandle, - info: BaseRelayInfoDocument, - limits: TangleRuntimeLimits, - metrics: TangleRuntimeMetrics, - shutdown: TangleShutdownSignal, - runtime: TenantRuntimeHandle, -) -> Router { +pub fn tangle_http_router(runtime: TangleHostRuntime) -> Router { Router::new() .route("/", get(tangle_root)) - .with_state(TangleHttpState { - info, - limits, - shutdown, - runtime, - }) - .merge(base_relay_ops_router(readiness, metrics)) + .route("/.well-known/tangle/ready", get(tangle_host_ready)) + .route("/.well-known/tangle/metrics", get(tangle_host_metrics)) + .route("/.well-known/tangle/tenants", get(tangle_host_tenants)) + .with_state(TangleHttpState { runtime }) } #[derive(Debug, Clone)] struct TangleHttpState { - info: BaseRelayInfoDocument, - limits: TangleRuntimeLimits, - shutdown: TangleShutdownSignal, - runtime: TenantRuntimeHandle, + runtime: TangleHostRuntime, } async fn tangle_root( @@ -138,23 +115,39 @@ async fn tangle_root( websocket: Result<WebSocketUpgrade, WebSocketUpgradeRejection>, headers: HeaderMap, ) -> Response { + let tenant = match resolve_tenant(&state.runtime, &headers, peer_addr) { + Ok(tenant) => tenant.clone(), + Err(error) => return error.into_response(), + }; match websocket { Ok(websocket) => { - let session = match state.runtime.auth_state().await { - Ok(auth) => TangleWebSocketSession::new_with_peer( - state.limits, - state.shutdown.subscribe(), - state.runtime.clone(), + let connection = match state.runtime.resources().try_open_connection() { + Ok(connection) => connection, + Err(error) => { + return (StatusCode::TOO_MANY_REQUESTS, error.prefixed_message()) + .into_response(); + } + }; + let tenant_runtime = tenant.runtime().clone(); + let session = match tenant_runtime.auth_state().await { + Ok(auth) => TangleWebSocketSession::new_with_peer_and_host_resources( + tenant_runtime.limits(), + state.runtime.shutdown_signal().subscribe(), + tenant_runtime.clone(), auth, - state.runtime.subscribe_events().await, + tenant_runtime.subscribe_events().await, Some(peer_addr.ip()), + Some(state.runtime.resources()), ), Err(error) => Err(error), }; match session { Ok(session) => websocket .protocols(["nostr"]) - .on_upgrade(move |socket| session.run(socket)) + .on_upgrade(move |socket| async move { + let _connection = connection; + session.run(socket).await; + }) .into_response(), Err(error) => ( http::StatusCode::INTERNAL_SERVER_ERROR, @@ -163,7 +156,217 @@ async fn tangle_root( .into_response(), } } - Err(_) => base_relay_info_response(state.info, headers), + Err(_) => match tenant_info_document(&tenant) { + Ok(info) => base_relay_info_response(info, headers), + Err(error) => { + (StatusCode::INTERNAL_SERVER_ERROR, error.prefixed_message()).into_response() + } + }, + } +} + +async fn tangle_host_ready(State(state): State<TangleHttpState>) -> Response { + let readiness = state.runtime.readiness_state(); + let status = if readiness.is_ready() { + StatusCode::OK + } else { + StatusCode::SERVICE_UNAVAILABLE + }; + ( + status, + Json(serde_json::json!({ + "status": if readiness.is_ready() { "ready" } else { "not_ready" }, + "checks": { + "config": readiness.config().as_str(), + "tenant_registry": readiness.tenant_registry().as_str(), + "active_tenants": readiness.active_tenants().as_str(), + "shutdown_requested": readiness.shutdown_requested() + } + })), + ) + .into_response() +} + +async fn tangle_host_metrics(State(state): State<TangleHttpState>) -> Json<serde_json::Value> { + let metrics = state.runtime.metrics_snapshot(); + let mut values = serde_json::Map::new(); + values.insert( + "tangle_host_configured_tenants".to_owned(), + serde_json::json!(metrics.configured_tenants()), + ); + values.insert( + "tangle_host_active_tenants".to_owned(), + serde_json::json!(metrics.active_tenants()), + ); + values.insert( + "tangle_host_inactive_tenants".to_owned(), + serde_json::json!(metrics.inactive_tenants()), + ); + values.insert( + "tangle_host_ws_connections_current".to_owned(), + serde_json::json!(metrics.active_connections()), + ); + values.insert( + "tangle_host_subscriptions_current".to_owned(), + serde_json::json!(metrics.active_subscriptions()), + ); + values.insert( + "tangle_host_ws_connections_limit".to_owned(), + serde_json::json!(metrics.max_total_connections()), + ); + values.insert( + "tangle_host_subscriptions_limit".to_owned(), + serde_json::json!(metrics.max_total_subscriptions()), + ); + values.insert( + "tangle_readiness_ready".to_owned(), + serde_json::json!(state.runtime.readiness_state().is_ready()), + ); + for tenant in state.runtime.registry().active_tenants() { + let snapshot = tenant + .runtime() + .metrics() + .snapshot_with_readiness(tenant.runtime().readiness_handle().snapshot().is_ready()); + let serde_json::Value::Object(snapshot) = + serde_json::to_value(snapshot).expect("tenant metrics serialize") + else { + continue; + }; + for (key, value) in snapshot { + if key == "tangle_readiness_ready" { + continue; + } + if let Some(value) = value.as_u64() { + let current = values + .get(&key) + .and_then(serde_json::Value::as_u64) + .unwrap_or(0); + values.insert(key, serde_json::json!(current.saturating_add(value))); + } + } + } + Json(serde_json::Value::Object(values)) +} + +async fn tangle_host_tenants(State(state): State<TangleHttpState>) -> Json<serde_json::Value> { + let active_tenant_ids = state + .runtime + .registry() + .active_tenants() + .map(|tenant| tenant.tenant_id().as_str().to_owned()) + .collect::<BTreeSet<_>>(); + let tenants = state + .runtime + .config() + .tenants() + .iter() + .map(|tenant| { + let active = active_tenant_ids.contains(tenant.tenant_id().as_str()); + let readiness = state + .runtime + .registry() + .tenant_by_id(tenant.tenant_id()) + .map(|entry| entry.runtime().readiness_handle().snapshot().is_ready()); + serde_json::json!({ + "tenant_id": tenant.tenant_id().as_str(), + "tenant_schema": tenant.tenant_schema().as_str(), + "host": tenant.host().as_str(), + "relay_url": tenant.relay_url().as_str(), + "status": if active { "active" } else { "inactive" }, + "ready": readiness.unwrap_or(false) + }) + }) + .collect::<Vec<_>>(); + Json(serde_json::json!({ "tenants": tenants })) +} + +fn resolve_tenant<'a>( + runtime: &'a TangleHostRuntime, + headers: &HeaderMap, + peer_addr: SocketAddr, +) -> Result<&'a TenantRuntimeEntry, HostResolutionError> { + let host = resolve_request_host(headers, peer_addr, runtime.config().host().trusted_proxy())?; + runtime + .registry() + .tenant_by_host(&host) + .ok_or(HostResolutionError::Unknown) +} + +fn tenant_info_document( + tenant: &TenantRuntimeEntry, +) -> Result<BaseRelayInfoDocument, BaseRelayError> { + BaseRelayInfoConfig::from_tenant_config(tenant.config())?.build_document() +} + +fn resolve_request_host( + headers: &HeaderMap, + peer_addr: SocketAddr, + trusted_proxy: &crate::config::TangleTrustedProxyConfig, +) -> Result<CanonicalHost, HostResolutionError> { + let forwarded_host = trusted_proxy_peer_enabled(trusted_proxy, peer_addr) + .then(|| forwarded_host_header(headers)) + .flatten(); + let host = forwarded_host + .or_else(|| { + headers + .get(header::HOST) + .and_then(|value| value.to_str().ok()) + }) + .ok_or(HostResolutionError::Missing)?; + let host = host + .split(',') + .next() + .map(str::trim) + .filter(|host| !host.is_empty()) + .ok_or(HostResolutionError::Missing)?; + CanonicalHost::new(host).map_err(|_| HostResolutionError::Invalid) +} + +fn trusted_proxy_peer_enabled( + trusted_proxy: &crate::config::TangleTrustedProxyConfig, + peer_addr: SocketAddr, +) -> bool { + trusted_proxy.enabled() + && trusted_proxy + .trusted_peers() + .iter() + .any(|peer| peer == &peer_addr.ip().to_string() || peer == &peer_addr.to_string()) +} + +fn forwarded_host_header(headers: &HeaderMap) -> Option<&str> { + headers + .get("x-forwarded-host") + .and_then(|value| value.to_str().ok()) + .or_else(|| { + headers + .get("forwarded") + .and_then(|value| value.to_str().ok()) + .and_then(forwarded_host_value) + }) +} + +fn forwarded_host_value(value: &str) -> Option<&str> { + value.split(';').find_map(|part| { + let (name, value) = part.trim().split_once('=')?; + name.eq_ignore_ascii_case("host") + .then(|| value.trim_matches('"')) + }) +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum HostResolutionError { + Missing, + Invalid, + Unknown, +} + +impl IntoResponse for HostResolutionError { + fn into_response(self) -> Response { + match self { + Self::Missing => (StatusCode::BAD_REQUEST, "missing host").into_response(), + Self::Invalid => (StatusCode::BAD_REQUEST, "invalid host").into_response(), + Self::Unknown => (StatusCode::NOT_FOUND, "unknown host").into_response(), + } } } @@ -171,12 +374,16 @@ async fn tangle_root( mod tests { use super::{serve_until_shutdown, tangle_http_router}; use crate::{ - config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, - nip11::BaseRelayInfoConfig, - ops::BaseRelayReadinessCheckStatus, - runtime::{TangleShutdownSignal, TenantRuntime, TenantRuntimeHandle}, + config::{ + TangleHostRuntimeConfigSet, parse_tangle_host_runtime_config_json, + parse_tenant_runtime_config_json, + }, + host::TangleHostRuntime, + }; + use axum::{ + body::{Body, to_bytes}, + extract::ConnectInfo, }; - use axum::{body::to_bytes, extract::ConnectInfo}; use futures_util::{SinkExt, StreamExt}; use http::{Request, header}; use serde_json::json; @@ -205,7 +412,7 @@ mod tests { async fn serve_until_shutdown_binds_listener_and_exits_on_signal() { let root = temp_root("serve-until-shutdown"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root)).expect("runtime"); + let runtime = host_runtime(&root); let shutdown = runtime.shutdown_signal().clone(); let task = tokio::spawn(serve_until_shutdown(runtime)); @@ -224,7 +431,7 @@ mod tests { async fn serve_until_shutdown_accepts_websocket_upgrade() { let root = temp_root("websocket-upgrade"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root)).expect("runtime"); + let runtime = host_runtime(&root); let shutdown = runtime.shutdown_signal().clone(); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); @@ -236,6 +443,10 @@ mod tests { header::SEC_WEBSOCKET_PROTOCOL, http::HeaderValue::from_static("nostr"), ); + request.headers_mut().insert( + header::HOST, + http::HeaderValue::from_static("relay.radroots.test"), + ); let (_socket, response) = tokio_tungstenite::connect_async(request) .await @@ -260,7 +471,7 @@ mod tests { async fn serve_until_shutdown_closes_websocket_sessions() { let root = temp_root("websocket-shutdown"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root)).expect("runtime"); + let runtime = host_runtime(&root); let shutdown = runtime.shutdown_signal().clone(); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); @@ -272,6 +483,10 @@ mod tests { header::SEC_WEBSOCKET_PROTOCOL, http::HeaderValue::from_static("nostr"), ); + request.headers_mut().insert( + header::HOST, + http::HeaderValue::from_static("relay.radroots.test"), + ); let (mut socket, response) = tokio_tungstenite::connect_async(request) .await .expect("websocket"); @@ -301,7 +516,7 @@ mod tests { async fn websocket_session_dispatches_base_client_messages() { let root = temp_root("websocket-dispatch"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root)).expect("runtime"); + let runtime = host_runtime(&root); let shutdown = runtime.shutdown_signal().clone(); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); @@ -313,6 +528,10 @@ mod tests { header::SEC_WEBSOCKET_PROTOCOL, http::HeaderValue::from_static("nostr"), ); + request.headers_mut().insert( + header::HOST, + http::HeaderValue::from_static("relay.radroots.test"), + ); let (mut socket, response) = tokio_tungstenite::connect_async(request) .await .expect("websocket"); @@ -414,31 +633,22 @@ mod tests { } #[tokio::test] - async fn tangle_http_router_serves_nip11_health_and_ready_routes() { + async fn tangle_http_router_serves_nip11_and_host_ops_routes() { let root = temp_root("http-router"); - let config = runtime_config(&root); - let info = BaseRelayInfoConfig::new("tangle", &config) - .expect("info config") - .build_document() - .expect("info"); - let runtime = TenantRuntime::open(config).expect("runtime"); - let readiness = runtime.readiness_handle(); - readiness.set_server_bind(BaseRelayReadinessCheckStatus::Ready); - let limits = runtime.limits(); - let metrics = runtime.metrics().clone(); - let router = tangle_http_router( - readiness, - info, - limits, - metrics, - TangleShutdownSignal::new(), - TenantRuntimeHandle::new(runtime), - ); + let runtime = host_runtime(&root); + for tenant in runtime.registry().active_tenants() { + tenant + .runtime() + .readiness_handle() + .set_server_bind(crate::ops::BaseRelayReadinessCheckStatus::Ready); + } + let router = tangle_http_router(runtime); let nip11 = router .clone() .oneshot( Request::builder() .uri("/") + .header(header::HOST, "relay.radroots.test") .header(header::ACCEPT, "application/nostr+json") .extension(ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 39_000)))) .body(axum::body::Body::empty()) @@ -451,6 +661,7 @@ mod tests { .oneshot( Request::builder() .uri("/") + .header(header::HOST, "relay.radroots.test") .extension(ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 39_001)))) .body(axum::body::Body::empty()) .expect("request"), @@ -471,21 +682,31 @@ mod tests { .clone() .oneshot( Request::builder() - .uri("/readyz") + .uri("/.well-known/tangle/ready") .body(axum::body::Body::empty()) .expect("request"), ) .await .expect("ready"); let metrics = router + .clone() .oneshot( Request::builder() - .uri("/metricsz") + .uri("/.well-known/tangle/metrics") .body(axum::body::Body::empty()) .expect("request"), ) .await .expect("metrics"); + let tenants = router + .oneshot( + Request::builder() + .uri("/.well-known/tangle/tenants") + .body(axum::body::Body::empty()) + .expect("request"), + ) + .await + .expect("tenants"); assert_eq!(nip11.status(), http::StatusCode::OK); assert_eq!( @@ -494,7 +715,7 @@ mod tests { ); let nip11_body = to_bytes(nip11.into_body(), usize::MAX).await.expect("body"); let nip11_value = serde_json::from_slice::<serde_json::Value>(&nip11_body).expect("json"); - assert_eq!(nip11_value["name"], "tangle"); + assert_eq!(nip11_value["name"], "Radroots Test Relay"); assert_eq!(nip11_value["limitation"]["max_message_length"], 1_048_576); assert_eq!(nip11_value["limitation"]["max_subscriptions"], 64); assert_eq!(nip11_value["limitation"]["max_filters"], 10); @@ -516,12 +737,11 @@ mod tests { .contains(&serde_json::json!(29)) ); assert_eq!(root_without_accept.status(), http::StatusCode::NOT_FOUND); - assert_eq!(health.status(), http::StatusCode::OK); + assert_eq!(health.status(), http::StatusCode::NOT_FOUND); assert_eq!(ready.status(), http::StatusCode::OK); let ready_body = to_bytes(ready.into_body(), usize::MAX).await.expect("body"); let ready_value = serde_json::from_slice::<serde_json::Value>(&ready_body).expect("json"); - assert_eq!(ready_value["checks"]["server_bind"], "ready"); - assert_eq!(ready_value["checks"]["event_bus"], "ready"); + assert_eq!(ready_value["checks"]["active_tenants"], "ready"); assert_eq!(metrics.status(), http::StatusCode::OK); let metrics_body = to_bytes(metrics.into_body(), usize::MAX) .await @@ -529,8 +749,17 @@ mod tests { let metrics_value = serde_json::from_slice::<serde_json::Value>(&metrics_body).expect("json"); assert_eq!(metrics_value["tangle_readiness_ready"], true); + assert_eq!(metrics_value["tangle_host_active_tenants"], 1); assert_eq!(metrics_value["tangle_ws_connections_current"], 0); assert_eq!(metrics_value["tangle_stored_event_offsets_total"], 0); + assert_eq!(tenants.status(), http::StatusCode::OK); + let tenants_body = to_bytes(tenants.into_body(), usize::MAX) + .await + .expect("body"); + let tenants_value = + serde_json::from_slice::<serde_json::Value>(&tenants_body).expect("json"); + assert_eq!(tenants_value["tenants"][0]["tenant_id"], "test-relay"); + assert_eq!(tenants_value["tenants"][0]["ready"], true); let root_body = to_bytes(root_without_accept.into_body(), usize::MAX) .await .expect("body"); @@ -541,6 +770,133 @@ mod tests { let _ = std::fs::remove_dir_all(root); } + #[tokio::test] + async fn tangle_http_router_routes_by_host_and_fails_closed() { + let root = temp_root("host-routing"); + let _ = std::fs::remove_dir_all(&root); + let runtime = multi_host_runtime(&root); + for tenant in runtime.registry().active_tenants() { + tenant + .runtime() + .readiness_handle() + .set_server_bind(crate::ops::BaseRelayReadinessCheckStatus::Ready); + } + let router = tangle_http_router(runtime); + + let alpha = nip11_response( + &router, + Some("alpha.relay.test"), + Some("beta.relay.test"), + 39_010, + ) + .await; + assert_eq!(alpha.status(), http::StatusCode::OK); + let alpha = response_json(alpha).await; + assert_eq!(alpha["name"], "Alpha Relay"); + + let beta = nip11_response(&router, Some("beta.relay.test"), None, 39_011).await; + assert_eq!(beta.status(), http::StatusCode::OK); + let beta = response_json(beta).await; + assert_eq!(beta["name"], "Beta Relay"); + + let unknown = nip11_response(&router, Some("unknown.relay.test"), None, 39_012).await; + assert_eq!(unknown.status(), http::StatusCode::NOT_FOUND); + assert_eq!(response_text(unknown).await, "unknown host"); + + let inactive = nip11_response(&router, Some("inactive.relay.test"), None, 39_013).await; + assert_eq!(inactive.status(), http::StatusCode::NOT_FOUND); + assert_eq!(response_text(inactive).await, "unknown host"); + + let missing = nip11_response(&router, None, None, 39_014).await; + assert_eq!(missing.status(), http::StatusCode::BAD_REQUEST); + assert_eq!(response_text(missing).await, "missing host"); + + for path in ["/healthz", "/readyz", "/metricsz"] { + let legacy = router + .clone() + .oneshot( + Request::builder() + .uri(path) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("legacy route"); + assert_eq!(legacy.status(), http::StatusCode::NOT_FOUND); + } + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] + async fn websocket_auth_rejects_cross_tenant_relay_url() { + let root = temp_root("cross-tenant-auth"); + let _ = std::fs::remove_dir_all(&root); + let runtime = multi_host_runtime(&root); + let shutdown = runtime.shutdown_signal().clone(); + let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); + let address = listener.local_addr().expect("address"); + let task = tokio::spawn(super::serve_listener_until_shutdown(runtime, listener)); + let mut request = format!("ws://{address}/") + .into_client_request() + .expect("request"); + request.headers_mut().insert( + header::SEC_WEBSOCKET_PROTOCOL, + http::HeaderValue::from_static("nostr"), + ); + request.headers_mut().insert( + header::HOST, + http::HeaderValue::from_static("beta.relay.test"), + ); + let (mut socket, response) = tokio_tungstenite::connect_async(request) + .await + .expect("websocket"); + + assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS); + let challenge = read_auth_challenge(&mut socket).await; + let created_at = current_unix_timestamp(); + let alpha_auth = tangle_v2_auth_event_for_relay( + FixtureKey::Owner, + &challenge, + created_at, + "wss://alpha.relay.test", + ) + .expect("alpha auth"); + let beta_auth = tangle_v2_auth_event_for_relay( + FixtureKey::Owner, + &challenge, + created_at.saturating_add(1), + "wss://beta.relay.test", + ) + .expect("beta auth"); + + send_client_value(&mut socket, json!(["AUTH", event_to_value(&alpha_auth)])).await; + assert_eq!( + read_relay_value(&mut socket).await, + json!([ + "OK", + alpha_auth.id().as_str(), + false, + "auth-required: auth relay does not match canonical relay URL" + ]) + ); + + send_client_value(&mut socket, json!(["AUTH", event_to_value(&beta_auth)])).await; + assert_eq!( + read_relay_value(&mut socket).await, + json!(["OK", beta_auth.id().as_str(), true, ""]) + ); + + shutdown.request_shutdown(); + let report = timeout(Duration::from_secs(1), task) + .await + .expect("server shutdown") + .expect("task") + .expect("serve"); + assert_eq!(report.listen_addr(), address); + let _ = std::fs::remove_dir_all(root); + } + fn tangle_v2_event( key: FixtureKey, created_at: u64, @@ -557,12 +913,21 @@ mod tests { challenge: &str, created_at: u64, ) -> Result<Event, String> { + tangle_v2_auth_event_for_relay(key, challenge, created_at, "wss://relay.radroots.test") + } + + fn tangle_v2_auth_event_for_relay( + key: FixtureKey, + challenge: &str, + created_at: u64, + relay_url: &str, + ) -> Result<Event, String> { tangle_v2_event( key, created_at, 22_242, vec![ - Tag::from_parts("relay", &["wss://relay.radroots.test"])?, + Tag::from_parts("relay", &[relay_url])?, Tag::from_parts("challenge", &[challenge])?, ], "", @@ -638,25 +1003,119 @@ mod tests { } } - fn runtime_config(root: &Path) -> BaseRelayRuntimeConfig { - let raw = json!({ - "server": { - "listen_addr": "127.0.0.1:0", - "relay_url": "wss://relay.radroots.test" + fn host_runtime(root: &Path) -> TangleHostRuntime { + host_runtime_from_tenants(vec![tenant_config_value( + root, + TenantConfigFixture { + tenant_id: "test-relay", + tenant_schema: "test_relay", + host: "relay.radroots.test", + relay_url: "wss://relay.radroots.test", + name: "Radroots Test Relay", + inactive: false, + relay_secret_byte: 0x77, + }, + )]) + } + + fn multi_host_runtime(root: &Path) -> TangleHostRuntime { + host_runtime_from_tenants(vec![ + tenant_config_value( + root, + TenantConfigFixture { + tenant_id: "alpha", + tenant_schema: "alpha_schema", + host: "alpha.relay.test", + relay_url: "wss://alpha.relay.test", + name: "Alpha Relay", + inactive: false, + relay_secret_byte: 0x77, + }, + ), + tenant_config_value( + root, + TenantConfigFixture { + tenant_id: "beta", + tenant_schema: "beta_schema", + host: "beta.relay.test", + relay_url: "wss://beta.relay.test", + name: "Beta Relay", + inactive: false, + relay_secret_byte: 0x88, + }, + ), + tenant_config_value( + root, + TenantConfigFixture { + tenant_id: "inactive", + tenant_schema: "inactive_schema", + host: "inactive.relay.test", + relay_url: "wss://inactive.relay.test", + name: "Inactive Relay", + inactive: true, + relay_secret_byte: 0x99, + }, + ), + ]) + } + + fn host_runtime_from_tenants(tenant_values: Vec<serde_json::Value>) -> TangleHostRuntime { + let host = parse_tangle_host_runtime_config_json(&host_config_value().to_string()) + .expect("host config"); + let tenants = tenant_values + .into_iter() + .map(|tenant| parse_tenant_runtime_config_json(&tenant.to_string()).expect("tenant")) + .collect::<Vec<_>>(); + let config = TangleHostRuntimeConfigSet::new(host, tenants).expect("config set"); + TangleHostRuntime::open(config).expect("host runtime") + } + + fn host_config_value() -> serde_json::Value { + json!({ + "listen_addr": "127.0.0.1:0", + "tenant_config_dir": "tenants", + "limits": { + "max_total_connections": 64, + "max_total_subscriptions": 256, + "tenant_startup_concurrency": 4 + } + }) + } + + struct TenantConfigFixture<'a> { + tenant_id: &'a str, + tenant_schema: &'a str, + host: &'a str, + relay_url: &'a str, + name: &'a str, + inactive: bool, + relay_secret_byte: u8, + } + + fn tenant_config_value(root: &Path, fixture: TenantConfigFixture<'_>) -> serde_json::Value { + let relay_secret = format!("{:02x}", fixture.relay_secret_byte).repeat(32); + json!({ + "tenant_id": fixture.tenant_id, + "tenant_schema": fixture.tenant_schema, + "host": fixture.host, + "relay_url": fixture.relay_url, + "inactive": fixture.inactive, + "info": { + "name": fixture.name }, "pocket": { - "data_directory": root.join("pocket"), + "data_directory": root.join(format!("{}-pocket", fixture.tenant_id)), "sync_policy": "flush_on_shutdown", - "query": { - "allow_scraping": false, - "allow_scrape_if_limited_to": 100, - "allow_scrape_if_max_seconds": 3600 - } + }, + "pocket_query": { + "allow_scraping": false, + "allow_scrape_if_limited_to": 100, + "allow_scrape_if_max_seconds": 3600 }, "groups": { "enabled": true, - "canonical_relay_url": "wss://relay.radroots.test", - "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777", + "canonical_relay_url": fixture.relay_url, + "relay_secret": relay_secret, "owner_pubkeys": ["0202020202020202020202020202020202020202020202020202020202020202"] }, "auth": { @@ -715,8 +1174,43 @@ mod tests { } } }) - .to_string(); - parse_base_relay_runtime_config_json(&raw).expect("config") + } + + async fn nip11_response( + router: &axum::Router, + host: Option<&str>, + forwarded_host: Option<&str>, + peer_port: u16, + ) -> http::Response<Body> { + let mut builder = Request::builder() + .uri("/") + .header(header::ACCEPT, "application/nostr+json") + .extension(ConnectInfo(SocketAddr::from(([127, 0, 0, 1], peer_port)))); + if let Some(host) = host { + builder = builder.header(header::HOST, host); + } + if let Some(forwarded_host) = forwarded_host { + builder = builder.header("x-forwarded-host", forwarded_host); + } + router + .clone() + .oneshot(builder.body(Body::empty()).expect("request")) + .await + .expect("response") + } + + async fn response_json(response: http::Response<Body>) -> serde_json::Value { + let body = to_bytes(response.into_body(), usize::MAX) + .await + .expect("body"); + serde_json::from_slice::<serde_json::Value>(&body).expect("json") + } + + async fn response_text(response: http::Response<Body>) -> String { + let body = to_bytes(response.into_body(), usize::MAX) + .await + .expect("body"); + String::from_utf8(body.to_vec()).expect("utf8") } fn temp_root(name: &str) -> PathBuf { diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -4,6 +4,7 @@ use crate::{ client_message::{RuntimeClientMessage, parse_runtime_client_message}, errors::BaseRelayError, event_bus::{TangleEventReceiveError, TangleEventReceiver}, + host::{TangleHostResourceLimiter, TangleHostSubscriptionPermit}, logging, relay::{ auth::{BaseAuthState, generate_auth_challenge}, @@ -18,6 +19,7 @@ use crate::{ }; use axum::extract::ws::{CloseFrame, Message, Utf8Bytes, WebSocket}; use std::{ + collections::BTreeMap, net::IpAddr, sync::atomic::{AtomicU64, Ordering}, time::{Instant, SystemTime, UNIX_EPOCH}, @@ -38,6 +40,8 @@ pub struct TangleWebSocketSession { limits: TangleRuntimeLimits, auth: BaseAuthState, subscriptions: LiveSubscriptionSet, + host_resources: Option<TangleHostResourceLimiter>, + host_subscription_permits: BTreeMap<SubscriptionId, TangleHostSubscriptionPermit>, events: TangleEventReceiver, } @@ -62,6 +66,20 @@ impl TangleWebSocketSession { events: TangleEventReceiver, peer_ip: Option<IpAddr>, ) -> Result<Self, BaseRelayError> { + Self::new_with_peer_and_host_resources( + limits, shutdown, runtime, auth, events, peer_ip, None, + ) + } + + pub fn new_with_peer_and_host_resources( + limits: TangleRuntimeLimits, + shutdown: watch::Receiver<bool>, + runtime: TenantRuntimeHandle, + auth: BaseAuthState, + events: TangleEventReceiver, + peer_ip: Option<IpAddr>, + host_resources: Option<TangleHostResourceLimiter>, + ) -> Result<Self, BaseRelayError> { let outbound_queue_capacity = limits.outbound_queue_capacity(); let (sender, receiver) = mpsc::channel(outbound_queue_capacity); let subscriptions = LiveSubscriptionSet::new( @@ -82,6 +100,8 @@ impl TangleWebSocketSession { limits, auth, subscriptions, + host_resources, + host_subscription_permits: BTreeMap::new(), events, }) } @@ -108,7 +128,7 @@ impl TangleWebSocketSession { metrics.record_session_opened(); logging::log_websocket_session_opened(self.connection_id, self.peer_ip); if !self.issue_auth_challenge() { - let closed_subscriptions = self.subscriptions.close_all(); + let closed_subscriptions = self.close_all_subscriptions(); metrics.record_subscriptions_closed(closed_subscriptions); metrics.record_session_closed(); metrics.record_event_bus_receivers( @@ -168,7 +188,7 @@ impl TangleWebSocketSession { } } } - let closed_subscriptions = self.subscriptions.close_all(); + let closed_subscriptions = self.close_all_subscriptions(); metrics.record_subscriptions_closed(closed_subscriptions); metrics.record_session_closed(); metrics.record_event_bus_receivers(metrics.event_bus_receivers_current().saturating_sub(1)); @@ -307,6 +327,7 @@ impl TangleWebSocketSession { .base_relay_limits() .validate_subscription_id(&subscription_id)?; if self.subscriptions.close(&subscription_id) == CloseResult::Closed { + self.host_subscription_permits.remove(&subscription_id); metrics.record_subscriptions_closed(1); } Ok(Vec::new()) @@ -358,6 +379,7 @@ impl TangleWebSocketSession { return Ok(vec![message.into()]); } let should_subscribe = !pocket_filters_are_complete(&filters); + let already_subscribed = self.subscriptions.contains(&subscription_id); if should_subscribe { self.subscriptions .ensure_can_subscribe(&subscription_id, &filters)?; @@ -374,14 +396,32 @@ impl TangleWebSocketSession { let closes_subscription = report.group_read_denied(); let replies = report.into_messages(); if should_subscribe && !closes_subscription { + let host_permit = if already_subscribed { + None + } else { + self.host_resources + .as_ref() + .map(|resources| resources.try_open_subscriptions(1)) + .transpose()? + }; self.subscriptions .subscribe(subscription_id.clone(), filters)?; + if let Some(permit) = host_permit { + self.host_subscription_permits + .insert(subscription_id.clone(), permit); + } metrics.record_subscription_opened(); logging::log_subscription_opened(self.connection_id, &subscription_id); } Ok(replies) } + fn close_all_subscriptions(&mut self) -> usize { + let closed = self.subscriptions.close_all(); + self.host_subscription_permits.clear(); + closed + } + fn client_rate_limit_context(&self) -> TangleClientRateLimitContext { TangleClientRateLimitContext::new(self.peer_ip, Some(self.connection_id)) } diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -24,8 +24,12 @@ use tangle_protocol::{ UnixTimestamp, UnsignedEvent, event_to_value, filter_from_value, filter_to_value, }; use tangle_runtime::{ - config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, + config::{ + BaseRelayRuntimeConfig, TangleHostRuntimeConfigSet, parse_base_relay_runtime_config_json, + parse_tangle_host_runtime_config_json, parse_tenant_runtime_config_json, + }, errors::BaseRelayError, + host::TangleHostRuntime, nip11::BaseRelayInfoConfig, relay::{auth::BaseAuthState, core::BaseRelay}, runtime::TenantRuntime, @@ -349,21 +353,22 @@ async fn tangle_run_serves_until_shutdown() { let _ = std::fs::remove_dir_all(&root); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); - let runtime = TenantRuntime::open(runtime_config(&root, address)).expect("runtime"); + let runtime = host_runtime(&root, address); let shutdown = runtime.shutdown_signal().clone(); let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); - let health = wait_for_http_ok(address, "/healthz", None).await; - let ready = wait_for_http_ok(address, "/readyz", None).await; - let metrics = wait_for_http_ok(address, "/metricsz", None).await; + let ready = wait_for_http_ok(address, "/.well-known/tangle/ready", None).await; + let metrics = wait_for_http_ok(address, "/.well-known/tangle/metrics", None).await; + let tenants = wait_for_http_ok(address, "/.well-known/tangle/tenants", None).await; let nip11 = wait_for_http_ok(address, "/", Some("application/nostr+json")).await; - assert!(health.contains(r#""status":"ok""#)); assert!(ready.contains(r#""status":"ready""#)); - assert!(ready.contains(r#""server_bind":"ready""#)); + assert!(ready.contains(r#""active_tenants":"ready""#)); assert!(metrics.contains(r#""tangle_readiness_ready":true"#)); + assert!(metrics.contains(r#""tangle_host_active_tenants":1"#)); assert!(metrics.contains(r#""tangle_ws_connections_current":0"#)); assert!(metrics.contains(r#""tangle_stored_event_offsets_total":0"#)); + assert!(tenants.contains(r#""tenant_id":"acceptance-relay""#)); assert!(nip11.contains(r#""name":"tangle""#)); assert!( nip11 @@ -391,7 +396,7 @@ async fn websocket_clients_use_nip01_nip42_and_nip45_flows() { let _ = std::fs::remove_dir_all(&root); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); - let runtime = TenantRuntime::open(runtime_config(&root, address)).expect("runtime"); + let runtime = host_runtime(&root, address); let shutdown = runtime.shutdown_signal().clone(); let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); let mut first = connect_nostr_socket(address).await; @@ -560,7 +565,7 @@ async fn websocket_public_relay_covers_query_count_ephemeral_and_rejection_flows let _ = std::fs::remove_dir_all(&root); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); - let runtime = TenantRuntime::open(runtime_config(&root, address)).expect("runtime"); + let runtime = host_runtime(&root, address); let shutdown = runtime.shutdown_signal().clone(); let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); let mut publisher = connect_nostr_socket(address).await; @@ -754,7 +759,7 @@ async fn websocket_healthy_subscriber_receives_more_than_outbound_capacity() { let _ = std::fs::remove_dir_all(&root); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); - let runtime = TenantRuntime::open(runtime_config(&root, address)).expect("runtime"); + let runtime = host_runtime(&root, address); let shutdown = runtime.shutdown_signal().clone(); let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); let mut publisher = connect_nostr_socket(address).await; @@ -817,7 +822,7 @@ async fn websocket_nip29_group_lifecycle_state_and_live_paths_are_integrated() { let _ = std::fs::remove_dir_all(&root); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); - let runtime = TenantRuntime::open(runtime_config(&root, address)).expect("runtime"); + let runtime = host_runtime(&root, address); let shutdown = runtime.shutdown_signal().clone(); let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); let mut owner = connect_nostr_socket(address).await; @@ -1005,7 +1010,7 @@ async fn websocket_private_and_hidden_groups_do_not_leak_through_query_count_or_ let _ = std::fs::remove_dir_all(&root); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); - let runtime = TenantRuntime::open(runtime_config(&root, address)).expect("runtime"); + let runtime = host_runtime(&root, address); let shutdown = runtime.shutdown_signal().clone(); let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); let mut owner_writer = connect_nostr_socket(address).await; @@ -1431,7 +1436,7 @@ async fn websocket_private_and_hidden_groups_do_not_leak_through_query_count_or_ ) .await; - let metrics = wait_for_http_ok(address, "/metricsz", None).await; + let metrics = wait_for_http_ok(address, "/.well-known/tangle/metrics", None).await; for private_value in [ "PrivateSocket", "HiddenSocket", @@ -1485,7 +1490,7 @@ async fn nip11_includes_cors_headers_and_truthful_supported_nips() { let _ = std::fs::remove_dir_all(&root); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); - let runtime = TenantRuntime::open(runtime_config(&root, address)).expect("runtime"); + let runtime = host_runtime(&root, address); let shutdown = runtime.shutdown_signal().clone(); let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); @@ -2173,7 +2178,7 @@ async fn relay_generated_events_are_stored_projected_and_broadcast_to_websocket_ let _ = std::fs::remove_dir_all(&root); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); - let runtime = TenantRuntime::open(runtime_config(&root, address)).expect("runtime"); + let runtime = host_runtime(&root, address); let shutdown = runtime.shutdown_signal().clone(); let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); let mut owner = connect_nostr_socket(address).await; @@ -2283,7 +2288,7 @@ async fn private_relay_generated_events_reach_authorized_websocket_subscribers() let _ = std::fs::remove_dir_all(&root); let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); let address = listener.local_addr().expect("address"); - let runtime = TenantRuntime::open(runtime_config(&root, address)).expect("runtime"); + let runtime = host_runtime(&root, address); let shutdown = runtime.shutdown_signal().clone(); let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); let mut owner_writer = connect_nostr_socket(address).await; @@ -2393,6 +2398,107 @@ fn runtime_config(root: &Path, listen_addr: SocketAddr) -> BaseRelayRuntimeConfi .expect("config") } +fn host_runtime(root: &Path, listen_addr: SocketAddr) -> TangleHostRuntime { + let host = parse_tangle_host_runtime_config_json( + &json!({ + "listen_addr": listen_addr.to_string(), + "tenant_config_dir": "tenants", + "limits": { + "max_total_connections": 128, + "max_total_subscriptions": 1024, + "tenant_startup_concurrency": 4 + } + }) + .to_string(), + ) + .expect("host config"); + let tenant = parse_tenant_runtime_config_json( + &json!({ + "tenant_id": "acceptance-relay", + "tenant_schema": "acceptance_relay", + "host": "relay.radroots.test", + "relay_url": "wss://relay.radroots.test", + "info": { + "name": "tangle" + }, + "pocket": { + "data_directory": root.join("pocket"), + "sync_policy": "flush_on_shutdown" + }, + "pocket_query": { + "allow_scraping": false, + "allow_scrape_if_limited_to": 100, + "allow_scrape_if_max_seconds": 3600 + }, + "groups": { + "enabled": true, + "canonical_relay_url": "wss://relay.radroots.test", + "relay_secret": TANGLE_V2_RELAY_SECRET_HEX, + "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()] + }, + "auth": { + "challenge_ttl_seconds": 300, + "created_at_skew_seconds": 600 + }, + "limits": { + "max_message_length": 1048576, + "max_subid_length": 64, + "max_subscriptions_per_connection": 64, + "max_filters_per_request": 10, + "max_tag_values_per_filter": 100, + "max_query_complexity": 2048, + "max_limit": 500, + "default_limit": 100, + "max_event_tags": 200, + "max_content_length": 65536, + "broadcast_channel_capacity": 8, + "per_connection_outbound_queue": 8 + }, + "rate_limits": { + "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} + }, + "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} + }, + "req": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, + "per_connection": {"window_seconds": 60, "max_hits": 120}, + "per_pubkey": {"window_seconds": 60, "max_hits": 240}, + "per_group": {"window_seconds": 60, "max_hits": 240}, + "per_kind": {"window_seconds": 60, "max_hits": 500}, + "broad": {"window_seconds": 60, "max_hits": 30} + }, + "count": { + "per_ip": {"window_seconds": 60, "max_hits": 300}, + "per_connection": {"window_seconds": 60, "max_hits": 60}, + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_group": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 240}, + "broad": {"window_seconds": 60, "max_hits": 20} + } + } + }) + .to_string(), + ) + .expect("tenant config"); + let config = TangleHostRuntimeConfigSet::new(host, vec![tenant]).expect("host config set"); + TangleHostRuntime::open(config).expect("host runtime") +} + fn runtime_config_value(root: &Path, listen_addr: SocketAddr) -> Value { json!({ "server": { @@ -2501,7 +2607,8 @@ fn http_get(address: SocketAddr, path: &str, accept: Option<&str>) -> std::io::R let mut stream = TcpStream::connect_timeout(&address, Duration::from_millis(200))?; stream.set_read_timeout(Some(Duration::from_millis(500)))?; stream.set_write_timeout(Some(Duration::from_millis(500)))?; - let mut request = format!("GET {path} HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n"); + let mut request = + format!("GET {path} HTTP/1.1\r\nHost: relay.radroots.test\r\nConnection: close\r\n"); if let Some(accept) = accept { request.push_str(&format!("Accept: {accept}\r\n")); } @@ -2531,6 +2638,10 @@ async fn connect_nostr_socket(address: SocketAddr) -> TestWebSocket { header::SEC_WEBSOCKET_PROTOCOL, http::HeaderValue::from_static("nostr"), ); + request.headers_mut().insert( + header::HOST, + http::HeaderValue::from_static("relay.radroots.test"), + ); let (socket, response) = tokio_tungstenite::connect_async(request) .await .expect("websocket");