tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

server.rs (48313B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::{
      4     errors::BaseRelayError,
      5     host::{HostResolutionError, TangleHostRuntime, TenantRuntimeEntry},
      6     logging,
      7     nip11::{BaseRelayInfoConfig, BaseRelayInfoDocument, base_relay_info_response},
      8     ops::BaseRelayReadinessCheckStatus,
      9     session::TangleWebSocketSession,
     10 };
     11 use axum::{
     12     Json, Router,
     13     extract::{
     14         ConnectInfo, State,
     15         ws::{WebSocketUpgrade, rejection::WebSocketUpgradeRejection},
     16     },
     17     response::{IntoResponse, Response},
     18     routing::get,
     19 };
     20 use http::{HeaderMap, StatusCode};
     21 use std::net::SocketAddr;
     22 use tokio::net::TcpListener;
     23 
     24 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     25 pub struct TangleServeReport {
     26     listen_addr: SocketAddr,
     27     closed_subscriptions: usize,
     28 }
     29 
     30 impl TangleServeReport {
     31     pub fn new(listen_addr: SocketAddr, closed_subscriptions: usize) -> Self {
     32         Self {
     33             listen_addr,
     34             closed_subscriptions,
     35         }
     36     }
     37 
     38     pub fn listen_addr(self) -> SocketAddr {
     39         self.listen_addr
     40     }
     41 
     42     pub fn closed_subscriptions(self) -> usize {
     43         self.closed_subscriptions
     44     }
     45 }
     46 
     47 pub async fn serve_until_shutdown(
     48     runtime: TangleHostRuntime,
     49 ) -> Result<TangleServeReport, BaseRelayError> {
     50     let listener = TcpListener::bind(runtime.config().host().listen_addr())
     51         .await
     52         .map_err(|error| BaseRelayError::error(error.to_string()))?;
     53     serve_listener_until_shutdown(runtime, listener).await
     54 }
     55 
     56 pub async fn serve_listener_until_shutdown(
     57     runtime: TangleHostRuntime,
     58     listener: TcpListener,
     59 ) -> Result<TangleServeReport, BaseRelayError> {
     60     let listen_addr = listener
     61         .local_addr()
     62         .map_err(|error| BaseRelayError::error(error.to_string()))?;
     63     for tenant in runtime.registry().active_tenants() {
     64         tenant
     65             .runtime()
     66             .readiness_handle()
     67             .set_server_bind(BaseRelayReadinessCheckStatus::Ready);
     68     }
     69     let shutdown_signal = runtime.shutdown_signal().clone();
     70     let router = tangle_http_router(runtime.clone());
     71     let mut shutdown = shutdown_signal.subscribe();
     72     logging::log_server_listening(listen_addr, "tangle-host");
     73     axum::serve(
     74         listener,
     75         router.into_make_service_with_connect_info::<SocketAddr>(),
     76     )
     77     .with_graceful_shutdown(async move {
     78         loop {
     79             if *shutdown.borrow() {
     80                 break;
     81             }
     82             if shutdown.changed().await.is_err() {
     83                 break;
     84             }
     85         }
     86     })
     87     .await
     88     .map_err(|error| BaseRelayError::error(error.to_string()))?;
     89     let shutdown = runtime.shutdown().await?;
     90     logging::log_server_shutdown(listen_addr, shutdown.closed_subscriptions());
     91     Ok(TangleServeReport::new(
     92         listen_addr,
     93         shutdown.closed_subscriptions(),
     94     ))
     95 }
     96 
     97 pub fn tangle_http_router(runtime: TangleHostRuntime) -> Router {
     98     Router::new()
     99         .route("/", get(tangle_root))
    100         .route("/.well-known/tangle/ready", get(tangle_host_ready))
    101         .route("/.well-known/tangle/metrics", get(tangle_host_metrics))
    102         .route("/.well-known/tangle/tenants", get(tangle_host_tenants))
    103         .with_state(TangleHttpState { runtime })
    104 }
    105 
    106 #[derive(Debug, Clone)]
    107 struct TangleHttpState {
    108     runtime: TangleHostRuntime,
    109 }
    110 
    111 async fn tangle_root(
    112     State(state): State<TangleHttpState>,
    113     ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
    114     websocket: Result<WebSocketUpgrade, WebSocketUpgradeRejection>,
    115     headers: HeaderMap,
    116 ) -> Response {
    117     let tenant = match state.runtime.tenant_for_request(&headers, peer_addr) {
    118         Ok(tenant) => tenant.clone(),
    119         Err(error) => return error.into_response(),
    120     };
    121     match websocket {
    122         Ok(websocket) => {
    123             let connection = match state.runtime.resources().try_open_connection() {
    124                 Ok(connection) => connection,
    125                 Err(error) => {
    126                     return (StatusCode::TOO_MANY_REQUESTS, error.prefixed_message())
    127                         .into_response();
    128                 }
    129             };
    130             let tenant_runtime = tenant.runtime().clone();
    131             let session = match tenant_runtime.auth_state().await {
    132                 Ok(auth) => TangleWebSocketSession::new_with_peer_and_resources(
    133                     tenant_runtime.limits(),
    134                     state.runtime.shutdown_signal().subscribe(),
    135                     tenant_runtime.clone(),
    136                     auth,
    137                     tenant_runtime.subscribe_events().await,
    138                     Some(peer_addr.ip()),
    139                     Some(state.runtime.resources()),
    140                 ),
    141                 Err(error) => Err(error),
    142             };
    143             match session {
    144                 Ok(session) => websocket
    145                     .protocols(["nostr"])
    146                     .on_upgrade(move |socket| async move {
    147                         let _connection = connection;
    148                         session.run(socket).await;
    149                     })
    150                     .into_response(),
    151                 Err(error) => (
    152                     http::StatusCode::INTERNAL_SERVER_ERROR,
    153                     error.prefixed_message(),
    154                 )
    155                     .into_response(),
    156             }
    157         }
    158         Err(_) => match tenant_info_document(&tenant) {
    159             Ok(info) => base_relay_info_response(info, headers),
    160             Err(error) => {
    161                 (StatusCode::INTERNAL_SERVER_ERROR, error.prefixed_message()).into_response()
    162             }
    163         },
    164     }
    165 }
    166 
    167 async fn tangle_host_ready(State(state): State<TangleHttpState>) -> Response {
    168     if !state.runtime.config().host().ops().enabled() {
    169         return host_ops_disabled_response();
    170     }
    171     let readiness = state.runtime.readiness_state();
    172     let status = if readiness.is_ready() {
    173         StatusCode::OK
    174     } else {
    175         StatusCode::SERVICE_UNAVAILABLE
    176     };
    177     (
    178         status,
    179         Json(serde_json::json!({
    180             "status": if readiness.is_ready() { "ready" } else { "not_ready" },
    181             "checks": {
    182                 "config": readiness.config().as_str(),
    183                 "tenant_registry": readiness.tenant_registry().as_str(),
    184                 "active_tenants": readiness.active_tenants().as_str(),
    185                 "shutdown_requested": readiness.shutdown_requested()
    186             }
    187         })),
    188     )
    189         .into_response()
    190 }
    191 
    192 async fn tangle_host_metrics(State(state): State<TangleHttpState>) -> Response {
    193     if !state.runtime.config().host().ops().enabled() {
    194         return host_ops_disabled_response();
    195     }
    196     let metrics = state.runtime.metrics_snapshot();
    197     let mut values = serde_json::Map::new();
    198     values.insert(
    199         "tangle_host_configured_tenants".to_owned(),
    200         serde_json::json!(metrics.configured_tenants()),
    201     );
    202     values.insert(
    203         "tangle_host_active_tenants".to_owned(),
    204         serde_json::json!(metrics.active_tenants()),
    205     );
    206     values.insert(
    207         "tangle_host_inactive_tenants".to_owned(),
    208         serde_json::json!(metrics.inactive_tenants()),
    209     );
    210     values.insert(
    211         "tangle_host_ws_connections_current".to_owned(),
    212         serde_json::json!(metrics.active_connections()),
    213     );
    214     values.insert(
    215         "tangle_host_subscriptions_current".to_owned(),
    216         serde_json::json!(metrics.active_subscriptions()),
    217     );
    218     values.insert(
    219         "tangle_host_ws_connections_limit".to_owned(),
    220         serde_json::json!(metrics.max_total_connections()),
    221     );
    222     values.insert(
    223         "tangle_host_subscriptions_limit".to_owned(),
    224         serde_json::json!(metrics.max_total_subscriptions()),
    225     );
    226     values.insert(
    227         "tangle_readiness_ready".to_owned(),
    228         serde_json::json!(state.runtime.readiness_state().is_ready()),
    229     );
    230     for tenant in state.runtime.registry().active_tenants() {
    231         let snapshot = tenant
    232             .runtime()
    233             .metrics()
    234             .snapshot_with_readiness(tenant.runtime().readiness_handle().snapshot().is_ready());
    235         let serde_json::Value::Object(snapshot) =
    236             serde_json::to_value(snapshot).expect("tenant metrics serialize")
    237         else {
    238             continue;
    239         };
    240         for (key, value) in snapshot {
    241             if key == "tangle_readiness_ready" {
    242                 continue;
    243             }
    244             if let Some(value) = value.as_u64() {
    245                 let current = values
    246                     .get(&key)
    247                     .and_then(serde_json::Value::as_u64)
    248                     .unwrap_or(0);
    249                 values.insert(key, serde_json::json!(current.saturating_add(value)));
    250             }
    251         }
    252     }
    253     Json(serde_json::Value::Object(values)).into_response()
    254 }
    255 
    256 async fn tangle_host_tenants(State(state): State<TangleHttpState>) -> Response {
    257     let ops = state.runtime.config().host().ops();
    258     if !ops.enabled() {
    259         return host_ops_disabled_response();
    260     }
    261     if !ops.expose_tenant_inventory() {
    262         return (
    263             StatusCode::NOT_FOUND,
    264             "tangle host tenant inventory is disabled",
    265         )
    266             .into_response();
    267     }
    268     let tenants = state
    269         .runtime
    270         .tenant_inventory()
    271         .into_iter()
    272         .map(|tenant| {
    273             serde_json::json!({
    274                 "tenant_id": tenant.tenant_id().as_str(),
    275                 "tenant_schema": tenant.tenant_schema().as_str(),
    276                 "host": tenant.host().as_str(),
    277                 "relay_url": tenant.relay_url().as_str(),
    278                 "status": if tenant.active() { "active" } else { "inactive" },
    279                 "ready": tenant.ready()
    280             })
    281         })
    282         .collect::<Vec<_>>();
    283     Json(serde_json::json!({ "tenants": tenants })).into_response()
    284 }
    285 
    286 fn host_ops_disabled_response() -> Response {
    287     (StatusCode::NOT_FOUND, "tangle host ops are disabled").into_response()
    288 }
    289 
    290 fn tenant_info_document(
    291     tenant: &TenantRuntimeEntry,
    292 ) -> Result<BaseRelayInfoDocument, BaseRelayError> {
    293     BaseRelayInfoConfig::from_tenant_config(tenant.config())?.build_document()
    294 }
    295 
    296 impl IntoResponse for HostResolutionError {
    297     fn into_response(self) -> Response {
    298         match self {
    299             Self::Missing => (StatusCode::BAD_REQUEST, "missing host").into_response(),
    300             Self::Invalid => (StatusCode::BAD_REQUEST, "invalid host").into_response(),
    301             Self::Unknown => (StatusCode::NOT_FOUND, "unknown host").into_response(),
    302         }
    303     }
    304 }
    305 
    306 #[cfg(test)]
    307 mod tests {
    308     use super::{serve_until_shutdown, tangle_http_router};
    309     use crate::{
    310         config::{
    311             TangleHostRuntimeConfigSet, parse_tangle_host_runtime_config_json,
    312             parse_tenant_runtime_config_json,
    313         },
    314         host::TangleHostRuntime,
    315     };
    316     use axum::{
    317         body::{Body, to_bytes},
    318         extract::ConnectInfo,
    319     };
    320     use futures_util::{SinkExt, StreamExt};
    321     use http::{Request, header};
    322     use serde_json::json;
    323     use std::{
    324         net::SocketAddr,
    325         path::{Path, PathBuf},
    326         time::{SystemTime, UNIX_EPOCH},
    327     };
    328     use tangle_crypto::RelaySigner;
    329     use tangle_protocol::{
    330         Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent,
    331         event_to_value,
    332     };
    333     use tangle_store_pocket::{
    334         PocketEvent, PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketTime,
    335     };
    336     use tangle_test_support::FixtureKey;
    337     use tokio::net::TcpListener;
    338     use tokio::time::{Duration, timeout};
    339     use tokio_tungstenite::tungstenite::{
    340         Message as TungsteniteMessage, client::IntoClientRequest,
    341     };
    342     use tower::ServiceExt;
    343 
    344     #[tokio::test]
    345     async fn serve_until_shutdown_binds_listener_and_exits_on_signal() {
    346         let root = temp_root("serve-until-shutdown");
    347         let _ = std::fs::remove_dir_all(&root);
    348         let runtime = host_runtime(&root);
    349         let shutdown = runtime.shutdown_signal().clone();
    350         let task = tokio::spawn(serve_until_shutdown(runtime));
    351 
    352         tokio::task::yield_now().await;
    353         shutdown.request_shutdown();
    354 
    355         let report = task.await.expect("task").expect("serve");
    356         assert_eq!(report.listen_addr().ip().to_string(), "127.0.0.1");
    357         assert_ne!(report.listen_addr().port(), 0);
    358         assert_eq!(report.closed_subscriptions(), 0);
    359 
    360         let _ = std::fs::remove_dir_all(root);
    361     }
    362 
    363     #[tokio::test]
    364     async fn serve_until_shutdown_accepts_websocket_upgrade() {
    365         let root = temp_root("websocket-upgrade");
    366         let _ = std::fs::remove_dir_all(&root);
    367         let runtime = host_runtime(&root);
    368         let shutdown = runtime.shutdown_signal().clone();
    369         let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener");
    370         let address = listener.local_addr().expect("address");
    371         let task = tokio::spawn(super::serve_listener_until_shutdown(runtime, listener));
    372         let mut request = format!("ws://{address}/")
    373             .into_client_request()
    374             .expect("request");
    375         request.headers_mut().insert(
    376             header::SEC_WEBSOCKET_PROTOCOL,
    377             http::HeaderValue::from_static("nostr"),
    378         );
    379         request.headers_mut().insert(
    380             header::HOST,
    381             http::HeaderValue::from_static("relay.radroots.test"),
    382         );
    383 
    384         let (_socket, response) = tokio_tungstenite::connect_async(request)
    385             .await
    386             .expect("websocket");
    387 
    388         assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS);
    389         assert_eq!(
    390             response
    391                 .headers()
    392                 .get(header::SEC_WEBSOCKET_PROTOCOL)
    393                 .expect("protocol"),
    394             "nostr"
    395         );
    396 
    397         shutdown.request_shutdown();
    398         let report = task.await.expect("task").expect("serve");
    399         assert_eq!(report.listen_addr(), address);
    400         let _ = std::fs::remove_dir_all(root);
    401     }
    402 
    403     #[tokio::test]
    404     async fn serve_until_shutdown_closes_websocket_sessions() {
    405         let root = temp_root("websocket-shutdown");
    406         let _ = std::fs::remove_dir_all(&root);
    407         let runtime = host_runtime(&root);
    408         let shutdown = runtime.shutdown_signal().clone();
    409         let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener");
    410         let address = listener.local_addr().expect("address");
    411         let task = tokio::spawn(super::serve_listener_until_shutdown(runtime, listener));
    412         let mut request = format!("ws://{address}/")
    413             .into_client_request()
    414             .expect("request");
    415         request.headers_mut().insert(
    416             header::SEC_WEBSOCKET_PROTOCOL,
    417             http::HeaderValue::from_static("nostr"),
    418         );
    419         request.headers_mut().insert(
    420             header::HOST,
    421             http::HeaderValue::from_static("relay.radroots.test"),
    422         );
    423         let (mut socket, response) = tokio_tungstenite::connect_async(request)
    424             .await
    425             .expect("websocket");
    426 
    427         assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS);
    428         let _ = read_auth_challenge(&mut socket).await;
    429 
    430         shutdown.request_shutdown();
    431 
    432         let next = timeout(Duration::from_secs(1), socket.next())
    433             .await
    434             .expect("websocket close");
    435         match next {
    436             Some(Ok(TungsteniteMessage::Close(_))) | None => {}
    437             other => panic!("expected websocket close, got {other:?}"),
    438         }
    439         let report = timeout(Duration::from_secs(1), task)
    440             .await
    441             .expect("server shutdown")
    442             .expect("task")
    443             .expect("serve");
    444         assert_eq!(report.listen_addr(), address);
    445         let _ = std::fs::remove_dir_all(root);
    446     }
    447 
    448     #[tokio::test]
    449     async fn websocket_session_dispatches_base_client_messages() {
    450         let root = temp_root("websocket-dispatch");
    451         let _ = std::fs::remove_dir_all(&root);
    452         let runtime = host_runtime(&root);
    453         let shutdown = runtime.shutdown_signal().clone();
    454         let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener");
    455         let address = listener.local_addr().expect("address");
    456         let task = tokio::spawn(super::serve_listener_until_shutdown(runtime, listener));
    457         let mut request = format!("ws://{address}/")
    458             .into_client_request()
    459             .expect("request");
    460         request.headers_mut().insert(
    461             header::SEC_WEBSOCKET_PROTOCOL,
    462             http::HeaderValue::from_static("nostr"),
    463         );
    464         request.headers_mut().insert(
    465             header::HOST,
    466             http::HeaderValue::from_static("relay.radroots.test"),
    467         );
    468         let (mut socket, response) = tokio_tungstenite::connect_async(request)
    469             .await
    470             .expect("websocket");
    471         let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "hello")
    472             .expect("event");
    473 
    474         assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS);
    475         let challenge = read_auth_challenge(&mut socket).await;
    476         assert_eq!(challenge.len(), 64);
    477         assert_eq!(challenge, challenge.to_ascii_lowercase());
    478 
    479         let auth_created_at = current_unix_timestamp();
    480         let owner_auth = tangle_v2_auth_event(FixtureKey::Owner, &challenge, auth_created_at)
    481             .expect("owner auth");
    482         let admin_auth = tangle_v2_auth_event(
    483             FixtureKey::Admin,
    484             &challenge,
    485             auth_created_at.saturating_add(1),
    486         )
    487         .expect("admin auth");
    488 
    489         send_client_text(&mut socket, "{").await;
    490         let notice = read_relay_value(&mut socket).await;
    491         assert_eq!(notice[0], "NOTICE");
    492         assert!(
    493             notice[1]
    494                 .as_str()
    495                 .expect("notice")
    496                 .starts_with("invalid: client message JSON is invalid:")
    497         );
    498 
    499         send_client_value(&mut socket, json!(["EVENT", event_to_value(&event)])).await;
    500         assert_eq!(
    501             read_relay_value(&mut socket).await,
    502             json!(["OK", event.id().as_str(), true, ""])
    503         );
    504 
    505         send_client_value(
    506             &mut socket,
    507             json!(["COUNT", "count-a", {"kinds":[1], "since": 1_714_124_433, "until": 1_714_124_433}]),
    508         )
    509         .await;
    510         assert_eq!(
    511             read_relay_value(&mut socket).await,
    512             json!(["COUNT", "count-a", {"count": 1}])
    513         );
    514 
    515         send_client_value(&mut socket, json!(["REQ", "sub-a", {}])).await;
    516         let req_event = read_relay_value(&mut socket).await;
    517         assert_eq!(req_event[0], "EVENT");
    518         assert_eq!(req_event[1], "sub-a");
    519         assert_eq!(req_event[2]["id"], event.id().as_str());
    520         assert_eq!(
    521             read_relay_value(&mut socket).await,
    522             json!(["EOSE", "sub-a"])
    523         );
    524 
    525         send_client_value(
    526             &mut socket,
    527             json!(["REQ", "sub-search", {"search": "fresh carrots", "limit": 1}]),
    528         )
    529         .await;
    530         assert_eq!(
    531             read_relay_value(&mut socket).await,
    532             json!([
    533                 "CLOSED",
    534                 "sub-search",
    535                 "unsupported: search filters are not supported"
    536             ])
    537         );
    538 
    539         send_client_value(&mut socket, json!(["AUTH", event_to_value(&owner_auth)])).await;
    540         assert_eq!(
    541             read_relay_value(&mut socket).await,
    542             json!(["OK", owner_auth.id().as_str(), true, ""])
    543         );
    544 
    545         send_client_value(&mut socket, json!(["AUTH", event_to_value(&admin_auth)])).await;
    546         assert_eq!(
    547             read_relay_value(&mut socket).await,
    548             json!(["OK", admin_auth.id().as_str(), true, ""])
    549         );
    550 
    551         send_client_value(&mut socket, json!(["CLOSE", "sub-a"])).await;
    552         assert!(
    553             timeout(Duration::from_millis(50), socket.next())
    554                 .await
    555                 .is_err()
    556         );
    557 
    558         shutdown.request_shutdown();
    559         let report = timeout(Duration::from_secs(1), task)
    560             .await
    561             .expect("server shutdown")
    562             .expect("task")
    563             .expect("serve");
    564         assert_eq!(report.listen_addr(), address);
    565         let _ = std::fs::remove_dir_all(root);
    566     }
    567 
    568     #[tokio::test]
    569     async fn tangle_http_router_serves_nip11_and_host_ops_routes() {
    570         let root = temp_root("http-router");
    571         let runtime = host_runtime(&root);
    572         for tenant in runtime.registry().active_tenants() {
    573             tenant
    574                 .runtime()
    575                 .readiness_handle()
    576                 .set_server_bind(crate::ops::BaseRelayReadinessCheckStatus::Ready);
    577         }
    578         let router = tangle_http_router(runtime);
    579         let nip11 = router
    580             .clone()
    581             .oneshot(
    582                 Request::builder()
    583                     .uri("/")
    584                     .header(header::HOST, "relay.radroots.test")
    585                     .header(header::ACCEPT, "application/nostr+json")
    586                     .extension(ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 39_000))))
    587                     .body(axum::body::Body::empty())
    588                     .expect("request"),
    589             )
    590             .await
    591             .expect("nip11");
    592         let root_without_accept = router
    593             .clone()
    594             .oneshot(
    595                 Request::builder()
    596                     .uri("/")
    597                     .header(header::HOST, "relay.radroots.test")
    598                     .extension(ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 39_001))))
    599                     .body(axum::body::Body::empty())
    600                     .expect("request"),
    601             )
    602             .await
    603             .expect("root");
    604         let health = router
    605             .clone()
    606             .oneshot(
    607                 Request::builder()
    608                     .uri("/healthz")
    609                     .body(axum::body::Body::empty())
    610                     .expect("request"),
    611             )
    612             .await
    613             .expect("health");
    614         let ready = router
    615             .clone()
    616             .oneshot(
    617                 Request::builder()
    618                     .uri("/.well-known/tangle/ready")
    619                     .body(axum::body::Body::empty())
    620                     .expect("request"),
    621             )
    622             .await
    623             .expect("ready");
    624         let metrics = router
    625             .clone()
    626             .oneshot(
    627                 Request::builder()
    628                     .uri("/.well-known/tangle/metrics")
    629                     .body(axum::body::Body::empty())
    630                     .expect("request"),
    631             )
    632             .await
    633             .expect("metrics");
    634         let tenants = router
    635             .oneshot(
    636                 Request::builder()
    637                     .uri("/.well-known/tangle/tenants")
    638                     .body(axum::body::Body::empty())
    639                     .expect("request"),
    640             )
    641             .await
    642             .expect("tenants");
    643 
    644         assert_eq!(nip11.status(), http::StatusCode::OK);
    645         assert_eq!(
    646             nip11.headers().get(header::CONTENT_TYPE).expect("type"),
    647             "application/nostr+json"
    648         );
    649         let nip11_body = to_bytes(nip11.into_body(), usize::MAX).await.expect("body");
    650         let nip11_value = serde_json::from_slice::<serde_json::Value>(&nip11_body).expect("json");
    651         assert_eq!(nip11_value["name"], "Radroots Test Relay");
    652         assert_eq!(nip11_value["limitation"]["max_message_length"], 1_048_576);
    653         assert_eq!(nip11_value["limitation"]["max_subscriptions"], 64);
    654         assert_eq!(nip11_value["limitation"]["max_filters"], 10);
    655         assert_eq!(nip11_value["limitation"]["max_limit"], 500);
    656         assert_eq!(nip11_value["limitation"]["max_query_complexity"], 2_048);
    657         assert_eq!(nip11_value["limitation"]["max_subid_length"], 64);
    658         assert_eq!(nip11_value["limitation"]["max_event_tags"], 200);
    659         assert_eq!(nip11_value["limitation"]["max_content_length"], 65_536);
    660         assert_eq!(nip11_value["limitation"]["auth_required"], false);
    661         assert_eq!(nip11_value["limitation"]["payment_required"], false);
    662         assert_eq!(nip11_value["limitation"]["restricted_writes"], true);
    663         assert_eq!(nip11_value["limitation"]["default_limit"], 100);
    664         assert_eq!(nip11_value["retention"]["physical_erasure"], false);
    665         assert_eq!(nip11_value["retention"]["compaction_guarantee"], false);
    666         assert!(
    667             nip11_value["supported_nips"]
    668                 .as_array()
    669                 .expect("nips")
    670                 .contains(&serde_json::json!(29))
    671         );
    672         assert_eq!(root_without_accept.status(), http::StatusCode::NOT_FOUND);
    673         assert_eq!(health.status(), http::StatusCode::NOT_FOUND);
    674         assert_eq!(ready.status(), http::StatusCode::OK);
    675         let ready_body = to_bytes(ready.into_body(), usize::MAX).await.expect("body");
    676         let ready_value = serde_json::from_slice::<serde_json::Value>(&ready_body).expect("json");
    677         assert_eq!(ready_value["checks"]["active_tenants"], "ready");
    678         assert_eq!(metrics.status(), http::StatusCode::OK);
    679         let metrics_body = to_bytes(metrics.into_body(), usize::MAX)
    680             .await
    681             .expect("body");
    682         let metrics_value =
    683             serde_json::from_slice::<serde_json::Value>(&metrics_body).expect("json");
    684         assert_eq!(metrics_value["tangle_readiness_ready"], true);
    685         assert_eq!(metrics_value["tangle_host_active_tenants"], 1);
    686         assert_eq!(metrics_value["tangle_ws_connections_current"], 0);
    687         assert_eq!(metrics_value["tangle_stored_event_offsets_total"], 0);
    688         assert_eq!(tenants.status(), http::StatusCode::OK);
    689         let tenants_body = to_bytes(tenants.into_body(), usize::MAX)
    690             .await
    691             .expect("body");
    692         let tenants_value =
    693             serde_json::from_slice::<serde_json::Value>(&tenants_body).expect("json");
    694         assert_eq!(tenants_value["tenants"][0]["tenant_id"], "test-relay");
    695         assert_eq!(tenants_value["tenants"][0]["ready"], true);
    696         let root_body = to_bytes(root_without_accept.into_body(), usize::MAX)
    697             .await
    698             .expect("body");
    699         assert_eq!(
    700             String::from_utf8(root_body.to_vec()).expect("utf8"),
    701             "relay information requires application/nostr+json"
    702         );
    703         let _ = std::fs::remove_dir_all(root);
    704     }
    705 
    706     #[tokio::test]
    707     async fn tangle_http_router_enforces_host_ops_config() {
    708         let root = temp_root("http-router-ops-config");
    709         let _ = std::fs::remove_dir_all(&root);
    710         let inventory_enabled = ready_runtime(host_runtime_with_ops(&root, true, true));
    711         let inventory_disabled = ready_runtime(host_runtime_with_ops(&root, true, false));
    712         let ops_disabled = ready_runtime(host_runtime_with_ops(&root, false, true));
    713 
    714         let tenants = host_ops_response(
    715             &tangle_http_router(inventory_enabled),
    716             "/.well-known/tangle/tenants",
    717         )
    718         .await;
    719         assert_eq!(tenants.status(), http::StatusCode::OK);
    720         let tenants_json = response_json(tenants).await;
    721         assert_eq!(tenants_json["tenants"][0]["tenant_id"], "test-relay");
    722 
    723         let tenants = host_ops_response(
    724             &tangle_http_router(inventory_disabled),
    725             "/.well-known/tangle/tenants",
    726         )
    727         .await;
    728         assert_eq!(tenants.status(), http::StatusCode::NOT_FOUND);
    729         assert_eq!(
    730             response_text(tenants).await,
    731             "tangle host tenant inventory is disabled"
    732         );
    733 
    734         let router = tangle_http_router(ops_disabled);
    735         for path in [
    736             "/.well-known/tangle/ready",
    737             "/.well-known/tangle/metrics",
    738             "/.well-known/tangle/tenants",
    739         ] {
    740             let response = host_ops_response(&router, path).await;
    741             assert_eq!(response.status(), http::StatusCode::NOT_FOUND);
    742             assert_eq!(
    743                 response_text(response).await,
    744                 "tangle host ops are disabled"
    745             );
    746         }
    747 
    748         let nip11 = nip11_response(&router, Some("relay.radroots.test"), None, 39_002).await;
    749         assert_eq!(nip11.status(), http::StatusCode::OK);
    750         let nip11_json = response_json(nip11).await;
    751         assert_eq!(nip11_json["name"], "Radroots Test Relay");
    752 
    753         let _ = std::fs::remove_dir_all(root);
    754     }
    755 
    756     #[tokio::test]
    757     async fn tangle_http_router_routes_by_host_and_fails_closed() {
    758         let root = temp_root("host-routing");
    759         let _ = std::fs::remove_dir_all(&root);
    760         let runtime = multi_host_runtime(&root);
    761         for tenant in runtime.registry().active_tenants() {
    762             tenant
    763                 .runtime()
    764                 .readiness_handle()
    765                 .set_server_bind(crate::ops::BaseRelayReadinessCheckStatus::Ready);
    766         }
    767         let router = tangle_http_router(runtime);
    768 
    769         let alpha = nip11_response(
    770             &router,
    771             Some("alpha.relay.test"),
    772             Some("beta.relay.test"),
    773             39_010,
    774         )
    775         .await;
    776         assert_eq!(alpha.status(), http::StatusCode::OK);
    777         let alpha = response_json(alpha).await;
    778         assert_eq!(alpha["name"], "Alpha Relay");
    779 
    780         let beta = nip11_response(&router, Some("beta.relay.test"), None, 39_011).await;
    781         assert_eq!(beta.status(), http::StatusCode::OK);
    782         let beta = response_json(beta).await;
    783         assert_eq!(beta["name"], "Beta Relay");
    784 
    785         let unknown = nip11_response(&router, Some("unknown.relay.test"), None, 39_012).await;
    786         assert_eq!(unknown.status(), http::StatusCode::NOT_FOUND);
    787         assert_eq!(response_text(unknown).await, "unknown host");
    788 
    789         let inactive = nip11_response(&router, Some("inactive.relay.test"), None, 39_013).await;
    790         assert_eq!(inactive.status(), http::StatusCode::NOT_FOUND);
    791         assert_eq!(response_text(inactive).await, "unknown host");
    792 
    793         let missing = nip11_response(&router, None, None, 39_014).await;
    794         assert_eq!(missing.status(), http::StatusCode::BAD_REQUEST);
    795         assert_eq!(response_text(missing).await, "missing host");
    796 
    797         for path in ["/healthz", "/readyz", "/metricsz"] {
    798             let legacy = router
    799                 .clone()
    800                 .oneshot(
    801                     Request::builder()
    802                         .uri(path)
    803                         .body(Body::empty())
    804                         .expect("request"),
    805                 )
    806                 .await
    807                 .expect("legacy route");
    808             assert_eq!(legacy.status(), http::StatusCode::NOT_FOUND);
    809         }
    810 
    811         let _ = std::fs::remove_dir_all(root);
    812     }
    813 
    814     #[tokio::test]
    815     async fn websocket_auth_rejects_cross_tenant_relay_url() {
    816         let root = temp_root("cross-tenant-auth");
    817         let _ = std::fs::remove_dir_all(&root);
    818         let runtime = multi_host_runtime(&root);
    819         let shutdown = runtime.shutdown_signal().clone();
    820         let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener");
    821         let address = listener.local_addr().expect("address");
    822         let task = tokio::spawn(super::serve_listener_until_shutdown(runtime, listener));
    823         let mut request = format!("ws://{address}/")
    824             .into_client_request()
    825             .expect("request");
    826         request.headers_mut().insert(
    827             header::SEC_WEBSOCKET_PROTOCOL,
    828             http::HeaderValue::from_static("nostr"),
    829         );
    830         request.headers_mut().insert(
    831             header::HOST,
    832             http::HeaderValue::from_static("beta.relay.test"),
    833         );
    834         let (mut socket, response) = tokio_tungstenite::connect_async(request)
    835             .await
    836             .expect("websocket");
    837 
    838         assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS);
    839         let challenge = read_auth_challenge(&mut socket).await;
    840         let created_at = current_unix_timestamp();
    841         let alpha_auth = tangle_v2_auth_event_for_relay(
    842             FixtureKey::Owner,
    843             &challenge,
    844             created_at,
    845             "wss://alpha.relay.test",
    846         )
    847         .expect("alpha auth");
    848         let beta_auth = tangle_v2_auth_event_for_relay(
    849             FixtureKey::Owner,
    850             &challenge,
    851             created_at.saturating_add(1),
    852             "wss://beta.relay.test",
    853         )
    854         .expect("beta auth");
    855 
    856         send_client_value(&mut socket, json!(["AUTH", event_to_value(&alpha_auth)])).await;
    857         assert_eq!(
    858             read_relay_value(&mut socket).await,
    859             json!([
    860                 "OK",
    861                 alpha_auth.id().as_str(),
    862                 false,
    863                 "auth-required: auth relay does not match canonical relay URL"
    864             ])
    865         );
    866 
    867         send_client_value(&mut socket, json!(["AUTH", event_to_value(&beta_auth)])).await;
    868         assert_eq!(
    869             read_relay_value(&mut socket).await,
    870             json!(["OK", beta_auth.id().as_str(), true, ""])
    871         );
    872 
    873         shutdown.request_shutdown();
    874         let report = timeout(Duration::from_secs(1), task)
    875             .await
    876             .expect("server shutdown")
    877             .expect("task")
    878             .expect("serve");
    879         assert_eq!(report.listen_addr(), address);
    880         let _ = std::fs::remove_dir_all(root);
    881     }
    882 
    883     fn tangle_v2_event(
    884         key: FixtureKey,
    885         created_at: u64,
    886         kind: u64,
    887         tags: Vec<Tag>,
    888         content: &str,
    889     ) -> Result<Event, String> {
    890         let event = server_pocket_event(key, created_at, kind, tags, content);
    891         server_pocket_event_to_protocol(&event)
    892     }
    893 
    894     fn tangle_v2_auth_event(
    895         key: FixtureKey,
    896         challenge: &str,
    897         created_at: u64,
    898     ) -> Result<Event, String> {
    899         tangle_v2_auth_event_for_relay(key, challenge, created_at, "wss://relay.radroots.test")
    900     }
    901 
    902     fn tangle_v2_auth_event_for_relay(
    903         key: FixtureKey,
    904         challenge: &str,
    905         created_at: u64,
    906         relay_url: &str,
    907     ) -> Result<Event, String> {
    908         tangle_v2_event(
    909             key,
    910             created_at,
    911             22_242,
    912             vec![
    913                 Tag::from_parts("relay", &[relay_url])?,
    914                 Tag::from_parts("challenge", &[challenge])?,
    915             ],
    916             "",
    917         )
    918     }
    919 
    920     fn server_pocket_event(
    921         key: FixtureKey,
    922         created_at: u64,
    923         kind: u64,
    924         tags: Vec<Tag>,
    925         content: &str,
    926     ) -> PocketOwnedEvent {
    927         let tags = server_pocket_tags_from_protocol(&tags);
    928         let secret = format!("{:02x}", fixture_secret_byte(key)).repeat(32);
    929         RelaySigner::from_secret_hex(&secret)
    930             .expect("signer")
    931             .sign_pocket_event(
    932                 PocketKind::from_u16(u16::try_from(kind).expect("pocket kind")),
    933                 &tags,
    934                 PocketTime::from_u64(created_at),
    935                 content.as_bytes(),
    936             )
    937             .expect("pocket event")
    938     }
    939 
    940     fn server_pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags {
    941         let parts = tags
    942             .iter()
    943             .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>())
    944             .collect::<Vec<_>>();
    945         PocketOwnedTags::new(&parts).expect("pocket tags")
    946     }
    947 
    948     fn server_pocket_event_to_protocol(event: &PocketEvent) -> Result<Event, String> {
    949         let tags = event
    950             .tags()
    951             .map_err(|error| error.to_string())?
    952             .iter()
    953             .map(|tag| {
    954                 Tag::new(
    955                     tag.map(|value| {
    956                         std::str::from_utf8(value)
    957                             .map(str::to_owned)
    958                             .map_err(|error| error.to_string())
    959                     })
    960                     .collect::<Result<Vec<_>, _>>()?,
    961                 )
    962                 .map_err(|error| error.to_string())
    963             })
    964             .collect::<Result<Vec<_>, _>>()?;
    965         Ok(Event::new(
    966             EventId::new(&event.id().as_hex_string()).map_err(|error| error.to_string())?,
    967             UnsignedEvent::new(
    968                 PublicKeyHex::new(&event.pubkey().as_hex_string())
    969                     .map_err(|error| error.to_string())?,
    970                 UnixTimestamp::new(event.created_at().as_u64()),
    971                 Kind::new(u64::from(event.kind().as_u16())).map_err(|error| error.to_string())?,
    972                 tags,
    973                 std::str::from_utf8(event.content()).map_err(|error| error.to_string())?,
    974             ),
    975             SignatureHex::new(&event.sig().to_string()).map_err(|error| error.to_string())?,
    976         ))
    977     }
    978 
    979     fn fixture_secret_byte(key: FixtureKey) -> u8 {
    980         match key {
    981             FixtureKey::Relay => 9,
    982             FixtureKey::Owner => 10,
    983             FixtureKey::Admin => 11,
    984             FixtureKey::Member => 12,
    985             FixtureKey::Outsider => 13,
    986         }
    987     }
    988 
    989     fn host_runtime(root: &Path) -> TangleHostRuntime {
    990         host_runtime_with_ops(root, true, true)
    991     }
    992 
    993     fn host_runtime_with_ops(
    994         root: &Path,
    995         ops_enabled: bool,
    996         expose_tenant_inventory: bool,
    997     ) -> TangleHostRuntime {
    998         host_runtime_from_tenants_with_host(
    999             host_config_value_with_ops(ops_enabled, expose_tenant_inventory),
   1000             vec![tenant_config_value(
   1001                 root,
   1002                 TenantConfigFixture {
   1003                     tenant_id: "test-relay",
   1004                     tenant_schema: "test_relay",
   1005                     host: "relay.radroots.test",
   1006                     relay_url: "wss://relay.radroots.test",
   1007                     name: "Radroots Test Relay",
   1008                     inactive: false,
   1009                     relay_secret_byte: 0x77,
   1010                 },
   1011             )],
   1012         )
   1013     }
   1014 
   1015     fn multi_host_runtime(root: &Path) -> TangleHostRuntime {
   1016         host_runtime_from_tenants(vec![
   1017             tenant_config_value(
   1018                 root,
   1019                 TenantConfigFixture {
   1020                     tenant_id: "alpha",
   1021                     tenant_schema: "alpha_schema",
   1022                     host: "alpha.relay.test",
   1023                     relay_url: "wss://alpha.relay.test",
   1024                     name: "Alpha Relay",
   1025                     inactive: false,
   1026                     relay_secret_byte: 0x77,
   1027                 },
   1028             ),
   1029             tenant_config_value(
   1030                 root,
   1031                 TenantConfigFixture {
   1032                     tenant_id: "beta",
   1033                     tenant_schema: "beta_schema",
   1034                     host: "beta.relay.test",
   1035                     relay_url: "wss://beta.relay.test",
   1036                     name: "Beta Relay",
   1037                     inactive: false,
   1038                     relay_secret_byte: 0x88,
   1039                 },
   1040             ),
   1041             tenant_config_value(
   1042                 root,
   1043                 TenantConfigFixture {
   1044                     tenant_id: "inactive",
   1045                     tenant_schema: "inactive_schema",
   1046                     host: "inactive.relay.test",
   1047                     relay_url: "wss://inactive.relay.test",
   1048                     name: "Inactive Relay",
   1049                     inactive: true,
   1050                     relay_secret_byte: 0x99,
   1051                 },
   1052             ),
   1053         ])
   1054     }
   1055 
   1056     fn host_runtime_from_tenants(tenant_values: Vec<serde_json::Value>) -> TangleHostRuntime {
   1057         host_runtime_from_tenants_with_host(host_config_value(), tenant_values)
   1058     }
   1059 
   1060     fn host_runtime_from_tenants_with_host(
   1061         host_value: serde_json::Value,
   1062         tenant_values: Vec<serde_json::Value>,
   1063     ) -> TangleHostRuntime {
   1064         let host =
   1065             parse_tangle_host_runtime_config_json(&host_value.to_string()).expect("host config");
   1066         let tenants = tenant_values
   1067             .into_iter()
   1068             .map(|tenant| parse_tenant_runtime_config_json(&tenant.to_string()).expect("tenant"))
   1069             .collect::<Vec<_>>();
   1070         let config = TangleHostRuntimeConfigSet::new(host, tenants).expect("config set");
   1071         TangleHostRuntime::open(config).expect("host runtime")
   1072     }
   1073 
   1074     fn host_config_value() -> serde_json::Value {
   1075         host_config_value_with_ops(true, true)
   1076     }
   1077 
   1078     fn host_config_value_with_ops(
   1079         ops_enabled: bool,
   1080         expose_tenant_inventory: bool,
   1081     ) -> serde_json::Value {
   1082         json!({
   1083             "listen_addr": "127.0.0.1:0",
   1084             "tenant_config_dir": "tenants",
   1085             "limits": {
   1086                 "max_total_connections": 64,
   1087                 "max_total_subscriptions": 256,
   1088                 "tenant_startup_concurrency": 4
   1089             },
   1090             "ops": {
   1091                 "enabled": ops_enabled,
   1092                 "expose_tenant_inventory": expose_tenant_inventory
   1093             }
   1094         })
   1095     }
   1096 
    /// Per-tenant inputs that `tenant_config_value` turns into a tenant config JSON value.
    struct TenantConfigFixture<'a> {
        /// Written to `tenant_id`; also names the tenant's pocket data directory.
        tenant_id: &'a str,
        /// Written to `tenant_schema`.
        tenant_schema: &'a str,
        /// Written to `host`.
        host: &'a str,
        /// Written to `relay_url` and to `groups.canonical_relay_url`.
        relay_url: &'a str,
        /// Written to `info.name`.
        name: &'a str,
        /// Written to `inactive`.
        inactive: bool,
        /// Byte rendered as two hex digits and repeated 32 times for `groups.relay_secret`.
        relay_secret_byte: u8,
    }
   1106 
   1107     fn tenant_config_value(root: &Path, fixture: TenantConfigFixture<'_>) -> serde_json::Value {
   1108         let relay_secret = format!("{:02x}", fixture.relay_secret_byte).repeat(32);
   1109         json!({
   1110             "tenant_id": fixture.tenant_id,
   1111             "tenant_schema": fixture.tenant_schema,
   1112             "host": fixture.host,
   1113             "relay_url": fixture.relay_url,
   1114             "inactive": fixture.inactive,
   1115             "info": {
   1116                 "name": fixture.name
   1117             },
   1118             "pocket": {
   1119                 "data_directory": root.join(format!("{}-pocket", fixture.tenant_id)),
   1120                 "sync_policy": "flush_on_shutdown",
   1121             },
   1122             "pocket_query": {
   1123               "allow_scraping": false,
   1124               "allow_scrape_if_limited_to": 100,
   1125               "allow_scrape_if_max_seconds": 3600
   1126             },
   1127             "groups": {
   1128                 "enabled": true,
   1129                 "canonical_relay_url": fixture.relay_url,
   1130                 "relay_secret": relay_secret,
   1131                 "owner_pubkeys": ["0202020202020202020202020202020202020202020202020202020202020202"]
   1132             },
   1133             "auth": {
   1134                 "challenge_ttl_seconds": 300,
   1135                 "created_at_skew_seconds": 600
   1136             },
   1137             "limits": {
   1138                 "max_message_length": 1048576,
   1139                 "max_subid_length": 64,
   1140                 "max_subscriptions_per_connection": 64,
   1141                 "max_filters_per_request": 10,
   1142                 "max_tag_values_per_filter": 100,
   1143                 "max_query_complexity": 2048,
   1144                 "max_limit": 500,
   1145                 "default_limit": 100,
   1146                 "max_event_tags": 200,
   1147                 "max_content_length": 65536,
   1148                 "broadcast_channel_capacity": 8,
   1149                 "per_connection_outbound_queue": 8
   1150             },
   1151             "rate_limits": {
   1152                 "auth": {
   1153                     "per_ip": {"window_seconds": 60, "max_hits": 120},
   1154                     "per_pubkey": {"window_seconds": 60, "max_hits": 30},
   1155                     "failures": {"window_seconds": 300, "max_hits": 5},
   1156                     "failures_per_ip": {"window_seconds": 300, "max_hits": 20}
   1157                 },
   1158                 "event": {
   1159                     "per_ip": {"window_seconds": 60, "max_hits": 600},
   1160                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
   1161                     "per_kind": {"window_seconds": 60, "max_hits": 1000}
   1162                 },
   1163                 "group": {
   1164                     "write_per_ip": {"window_seconds": 60, "max_hits": 300},
   1165                     "write_per_pubkey": {"window_seconds": 60, "max_hits": 60},
   1166                     "write_per_group": {"window_seconds": 60, "max_hits": 90},
   1167                     "write_per_kind": {"window_seconds": 60, "max_hits": 300},
   1168                     "join_flow": {"window_seconds": 300, "max_hits": 10},
   1169                     "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30}
   1170                 },
   1171                 "req": {
   1172                     "per_ip": {"window_seconds": 60, "max_hits": 600},
   1173                     "per_connection": {"window_seconds": 60, "max_hits": 120},
   1174                     "per_pubkey": {"window_seconds": 60, "max_hits": 240},
   1175                     "per_group": {"window_seconds": 60, "max_hits": 240},
   1176                     "per_kind": {"window_seconds": 60, "max_hits": 500},
   1177                     "broad": {"window_seconds": 60, "max_hits": 30}
   1178                 },
   1179                 "count": {
   1180                     "per_ip": {"window_seconds": 60, "max_hits": 300},
   1181                     "per_connection": {"window_seconds": 60, "max_hits": 60},
   1182                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
   1183                     "per_group": {"window_seconds": 60, "max_hits": 120},
   1184                     "per_kind": {"window_seconds": 60, "max_hits": 240},
   1185                     "broad": {"window_seconds": 60, "max_hits": 20}
   1186                 }
   1187             }
   1188         })
   1189     }
   1190 
   1191     async fn nip11_response(
   1192         router: &axum::Router,
   1193         host: Option<&str>,
   1194         forwarded_host: Option<&str>,
   1195         peer_port: u16,
   1196     ) -> http::Response<Body> {
   1197         let mut builder = Request::builder()
   1198             .uri("/")
   1199             .header(header::ACCEPT, "application/nostr+json")
   1200             .extension(ConnectInfo(SocketAddr::from(([127, 0, 0, 1], peer_port))));
   1201         if let Some(host) = host {
   1202             builder = builder.header(header::HOST, host);
   1203         }
   1204         if let Some(forwarded_host) = forwarded_host {
   1205             builder = builder.header("x-forwarded-host", forwarded_host);
   1206         }
   1207         router
   1208             .clone()
   1209             .oneshot(builder.body(Body::empty()).expect("request"))
   1210             .await
   1211             .expect("response")
   1212     }
   1213 
   1214     async fn host_ops_response(router: &axum::Router, path: &str) -> http::Response<Body> {
   1215         router
   1216             .clone()
   1217             .oneshot(
   1218                 Request::builder()
   1219                     .uri(path)
   1220                     .body(Body::empty())
   1221                     .expect("request"),
   1222             )
   1223             .await
   1224             .expect("response")
   1225     }
   1226 
   1227     fn ready_runtime(runtime: TangleHostRuntime) -> TangleHostRuntime {
   1228         for tenant in runtime.registry().active_tenants() {
   1229             tenant
   1230                 .runtime()
   1231                 .readiness_handle()
   1232                 .set_server_bind(crate::ops::BaseRelayReadinessCheckStatus::Ready);
   1233         }
   1234         runtime
   1235     }
   1236 
   1237     async fn response_json(response: http::Response<Body>) -> serde_json::Value {
   1238         let body = to_bytes(response.into_body(), usize::MAX)
   1239             .await
   1240             .expect("body");
   1241         serde_json::from_slice::<serde_json::Value>(&body).expect("json")
   1242     }
   1243 
   1244     async fn response_text(response: http::Response<Body>) -> String {
   1245         let body = to_bytes(response.into_body(), usize::MAX)
   1246             .await
   1247             .expect("body");
   1248         String::from_utf8(body.to_vec()).expect("utf8")
   1249     }
   1250 
   1251     fn temp_root(name: &str) -> PathBuf {
   1252         std::env::temp_dir().join(format!("tangle-server-{name}-{}", std::process::id()))
   1253     }
   1254 
    /// Client-side WebSocket stream type accepted by the socket helpers in this module;
    /// it is the stream type the tests obtain from `tokio_tungstenite::connect_async`.
    type TestWebSocket = tokio_tungstenite::WebSocketStream<
        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
    >;
   1258 
   1259     async fn send_client_value(socket: &mut TestWebSocket, value: serde_json::Value) {
   1260         send_client_text(socket, &value.to_string()).await;
   1261     }
   1262 
   1263     async fn send_client_text(socket: &mut TestWebSocket, value: &str) {
   1264         socket
   1265             .send(TungsteniteMessage::Text(value.to_owned().into()))
   1266             .await
   1267             .expect("send client message");
   1268     }
   1269 
   1270     async fn read_relay_value(socket: &mut TestWebSocket) -> serde_json::Value {
   1271         let message = timeout(Duration::from_secs(1), socket.next())
   1272             .await
   1273             .expect("relay message timeout")
   1274             .expect("relay message")
   1275             .expect("relay message result");
   1276         let TungsteniteMessage::Text(text) = message else {
   1277             panic!("expected relay text message, got {message:?}");
   1278         };
   1279         serde_json::from_str(text.as_str()).expect("relay json")
   1280     }
   1281 
   1282     async fn read_auth_challenge(socket: &mut TestWebSocket) -> String {
   1283         let auth = read_relay_value(socket).await;
   1284         assert_eq!(auth[0], "AUTH");
   1285         auth[1].as_str().expect("auth challenge").to_owned()
   1286     }
   1287 
   1288     fn current_unix_timestamp() -> u64 {
   1289         SystemTime::now()
   1290             .duration_since(UNIX_EPOCH)
   1291             .expect("system time")
   1292             .as_secs()
   1293     }
   1294 }