server.rs (48313B)
1 #![forbid(unsafe_code)] 2 3 use crate::{ 4 errors::BaseRelayError, 5 host::{HostResolutionError, TangleHostRuntime, TenantRuntimeEntry}, 6 logging, 7 nip11::{BaseRelayInfoConfig, BaseRelayInfoDocument, base_relay_info_response}, 8 ops::BaseRelayReadinessCheckStatus, 9 session::TangleWebSocketSession, 10 }; 11 use axum::{ 12 Json, Router, 13 extract::{ 14 ConnectInfo, State, 15 ws::{WebSocketUpgrade, rejection::WebSocketUpgradeRejection}, 16 }, 17 response::{IntoResponse, Response}, 18 routing::get, 19 }; 20 use http::{HeaderMap, StatusCode}; 21 use std::net::SocketAddr; 22 use tokio::net::TcpListener; 23 24 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 25 pub struct TangleServeReport { 26 listen_addr: SocketAddr, 27 closed_subscriptions: usize, 28 } 29 30 impl TangleServeReport { 31 pub fn new(listen_addr: SocketAddr, closed_subscriptions: usize) -> Self { 32 Self { 33 listen_addr, 34 closed_subscriptions, 35 } 36 } 37 38 pub fn listen_addr(self) -> SocketAddr { 39 self.listen_addr 40 } 41 42 pub fn closed_subscriptions(self) -> usize { 43 self.closed_subscriptions 44 } 45 } 46 47 pub async fn serve_until_shutdown( 48 runtime: TangleHostRuntime, 49 ) -> Result<TangleServeReport, BaseRelayError> { 50 let listener = TcpListener::bind(runtime.config().host().listen_addr()) 51 .await 52 .map_err(|error| BaseRelayError::error(error.to_string()))?; 53 serve_listener_until_shutdown(runtime, listener).await 54 } 55 56 pub async fn serve_listener_until_shutdown( 57 runtime: TangleHostRuntime, 58 listener: TcpListener, 59 ) -> Result<TangleServeReport, BaseRelayError> { 60 let listen_addr = listener 61 .local_addr() 62 .map_err(|error| BaseRelayError::error(error.to_string()))?; 63 for tenant in runtime.registry().active_tenants() { 64 tenant 65 .runtime() 66 .readiness_handle() 67 .set_server_bind(BaseRelayReadinessCheckStatus::Ready); 68 } 69 let shutdown_signal = runtime.shutdown_signal().clone(); 70 let router = tangle_http_router(runtime.clone()); 71 let mut shutdown = shutdown_signal.subscribe(); 72 logging::log_server_listening(listen_addr, "tangle-host"); 73 axum::serve( 74 listener, 75 router.into_make_service_with_connect_info::<SocketAddr>(), 76 ) 77 .with_graceful_shutdown(async move { 78 loop { 79 if *shutdown.borrow() { 80 break; 81 } 82 if shutdown.changed().await.is_err() { 83 break; 84 } 85 } 86 }) 87 .await 88 .map_err(|error| BaseRelayError::error(error.to_string()))?; 89 let shutdown = runtime.shutdown().await?; 90 logging::log_server_shutdown(listen_addr, shutdown.closed_subscriptions()); 91 Ok(TangleServeReport::new( 92 listen_addr, 93 shutdown.closed_subscriptions(), 94 )) 95 } 96 97 pub fn tangle_http_router(runtime: TangleHostRuntime) -> Router { 98 Router::new() 99 .route("/", get(tangle_root)) 100 .route("/.well-known/tangle/ready", get(tangle_host_ready)) 101 .route("/.well-known/tangle/metrics", get(tangle_host_metrics)) 102 .route("/.well-known/tangle/tenants", get(tangle_host_tenants)) 103 .with_state(TangleHttpState { runtime }) 104 } 105 106 #[derive(Debug, Clone)] 107 struct TangleHttpState { 108 runtime: TangleHostRuntime, 109 } 110 111 async fn tangle_root( 112 State(state): State<TangleHttpState>, 113 ConnectInfo(peer_addr): ConnectInfo<SocketAddr>, 114 websocket: Result<WebSocketUpgrade, WebSocketUpgradeRejection>, 115 headers: HeaderMap, 116 ) -> Response { 117 let tenant = match state.runtime.tenant_for_request(&headers, peer_addr) { 118 Ok(tenant) => tenant.clone(), 119 Err(error) => return error.into_response(), 120 }; 121 match websocket { 122 Ok(websocket) => { 123 let connection = match state.runtime.resources().try_open_connection() { 124 Ok(connection) => connection, 125 Err(error) => { 126 return (StatusCode::TOO_MANY_REQUESTS, error.prefixed_message()) 127 .into_response(); 128 } 129 }; 130 let tenant_runtime = tenant.runtime().clone(); 131 let session = match tenant_runtime.auth_state().await { 132 Ok(auth) => TangleWebSocketSession::new_with_peer_and_resources( 133 tenant_runtime.limits(), 134 state.runtime.shutdown_signal().subscribe(), 135 tenant_runtime.clone(), 136 auth, 137 tenant_runtime.subscribe_events().await, 138 Some(peer_addr.ip()), 139 Some(state.runtime.resources()), 140 ), 141 Err(error) => Err(error), 142 }; 143 match session { 144 Ok(session) => websocket 145 .protocols(["nostr"]) 146 .on_upgrade(move |socket| async move { 147 let _connection = connection; 148 session.run(socket).await; 149 }) 150 .into_response(), 151 Err(error) => ( 152 http::StatusCode::INTERNAL_SERVER_ERROR, 153 error.prefixed_message(), 154 ) 155 .into_response(), 156 } 157 } 158 Err(_) => match tenant_info_document(&tenant) { 159 Ok(info) => base_relay_info_response(info, headers), 160 Err(error) => { 161 (StatusCode::INTERNAL_SERVER_ERROR, error.prefixed_message()).into_response() 162 } 163 }, 164 } 165 } 166 167 async fn tangle_host_ready(State(state): State<TangleHttpState>) -> Response { 168 if !state.runtime.config().host().ops().enabled() { 169 return host_ops_disabled_response(); 170 } 171 let readiness = state.runtime.readiness_state(); 172 let status = if readiness.is_ready() { 173 StatusCode::OK 174 } else { 175 StatusCode::SERVICE_UNAVAILABLE 176 }; 177 ( 178 status, 179 Json(serde_json::json!({ 180 "status": if readiness.is_ready() { "ready" } else { "not_ready" }, 181 "checks": { 182 "config": readiness.config().as_str(), 183 "tenant_registry": readiness.tenant_registry().as_str(), 184 "active_tenants": readiness.active_tenants().as_str(), 185 "shutdown_requested": readiness.shutdown_requested() 186 } 187 })), 188 ) 189 .into_response() 190 } 191 192 async fn tangle_host_metrics(State(state): State<TangleHttpState>) -> Response { 193 if !state.runtime.config().host().ops().enabled() { 194 return host_ops_disabled_response(); 195 } 196 let metrics = state.runtime.metrics_snapshot(); 197 let mut values = serde_json::Map::new(); 198 values.insert( 199 "tangle_host_configured_tenants".to_owned(), 200 serde_json::json!(metrics.configured_tenants()), 201 ); 202 values.insert( 203 "tangle_host_active_tenants".to_owned(), 204 serde_json::json!(metrics.active_tenants()), 205 ); 206 values.insert( 207 "tangle_host_inactive_tenants".to_owned(), 208 serde_json::json!(metrics.inactive_tenants()), 209 ); 210 values.insert( 211 "tangle_host_ws_connections_current".to_owned(), 212 serde_json::json!(metrics.active_connections()), 213 ); 214 values.insert( 215 "tangle_host_subscriptions_current".to_owned(), 216 serde_json::json!(metrics.active_subscriptions()), 217 ); 218 values.insert( 219 "tangle_host_ws_connections_limit".to_owned(), 220 serde_json::json!(metrics.max_total_connections()), 221 ); 222 values.insert( 223 "tangle_host_subscriptions_limit".to_owned(), 224 serde_json::json!(metrics.max_total_subscriptions()), 225 ); 226 values.insert( 227 "tangle_readiness_ready".to_owned(), 228 serde_json::json!(state.runtime.readiness_state().is_ready()), 229 ); 230 for tenant in state.runtime.registry().active_tenants() { 231 let snapshot = tenant 232 .runtime() 233 .metrics() 234 .snapshot_with_readiness(tenant.runtime().readiness_handle().snapshot().is_ready()); 235 let serde_json::Value::Object(snapshot) = 236 serde_json::to_value(snapshot).expect("tenant metrics serialize") 237 else { 238 continue; 239 }; 240 for (key, value) in snapshot { 241 if key == "tangle_readiness_ready" { 242 continue; 243 } 244 if let Some(value) = value.as_u64() { 245 let current = values 246 .get(&key) 247 .and_then(serde_json::Value::as_u64) 248 .unwrap_or(0); 249 values.insert(key, serde_json::json!(current.saturating_add(value))); 250 } 251 } 252 } 253 Json(serde_json::Value::Object(values)).into_response() 254 } 255 256 async fn tangle_host_tenants(State(state): State<TangleHttpState>) -> Response { 257 let ops = state.runtime.config().host().ops(); 258 if !ops.enabled() { 259 return host_ops_disabled_response(); 260 } 261 if !ops.expose_tenant_inventory() { 262 return ( 263 StatusCode::NOT_FOUND, 264 "tangle host tenant inventory is disabled", 265 ) 266 .into_response(); 267 } 268 let tenants = state 269 .runtime 270 .tenant_inventory() 271 .into_iter() 272 .map(|tenant| { 273 serde_json::json!({ 274 "tenant_id": tenant.tenant_id().as_str(), 275 "tenant_schema": tenant.tenant_schema().as_str(), 276 "host": tenant.host().as_str(), 277 "relay_url": tenant.relay_url().as_str(), 278 "status": if tenant.active() { "active" } else { "inactive" }, 279 "ready": tenant.ready() 280 }) 281 }) 282 .collect::<Vec<_>>(); 283 Json(serde_json::json!({ "tenants": tenants })).into_response() 284 } 285 286 fn host_ops_disabled_response() -> Response { 287 (StatusCode::NOT_FOUND, "tangle host ops are disabled").into_response() 288 } 289 290 fn tenant_info_document( 291 tenant: &TenantRuntimeEntry, 292 ) -> Result<BaseRelayInfoDocument, BaseRelayError> { 293 BaseRelayInfoConfig::from_tenant_config(tenant.config())?.build_document() 294 } 295 296 impl IntoResponse for HostResolutionError { 297 fn into_response(self) -> Response { 298 match self { 299 Self::Missing => (StatusCode::BAD_REQUEST, "missing host").into_response(), 300 Self::Invalid => (StatusCode::BAD_REQUEST, "invalid host").into_response(), 301 Self::Unknown => (StatusCode::NOT_FOUND, "unknown host").into_response(), 302 } 303 } 304 } 305 306 #[cfg(test)] 307 mod tests { 308 use super::{serve_until_shutdown, tangle_http_router}; 309 use crate::{ 310 config::{ 311 TangleHostRuntimeConfigSet, parse_tangle_host_runtime_config_json, 312 parse_tenant_runtime_config_json, 313 }, 314 host::TangleHostRuntime, 315 }; 316 use axum::{ 317 body::{Body, to_bytes}, 318 extract::ConnectInfo, 319 }; 320 use futures_util::{SinkExt, StreamExt}; 321 use http::{Request, header}; 322 use serde_json::json; 323 use std::{ 324 net::SocketAddr, 325 path::{Path, PathBuf}, 326 time::{SystemTime, UNIX_EPOCH}, 327 }; 328 use tangle_crypto::RelaySigner; 329 use tangle_protocol::{ 330 Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent, 331 event_to_value, 332 }; 333 use tangle_store_pocket::{ 334 PocketEvent, PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketTime, 335 }; 336 use tangle_test_support::FixtureKey; 337 use tokio::net::TcpListener; 338 use tokio::time::{Duration, timeout}; 339 use tokio_tungstenite::tungstenite::{ 340 Message as TungsteniteMessage, client::IntoClientRequest, 341 }; 342 use tower::ServiceExt; 343 344 #[tokio::test] 345 async fn serve_until_shutdown_binds_listener_and_exits_on_signal() { 346 let root = temp_root("serve-until-shutdown"); 347 let _ = std::fs::remove_dir_all(&root); 348 let runtime = host_runtime(&root); 349 let shutdown = runtime.shutdown_signal().clone(); 350 let task = tokio::spawn(serve_until_shutdown(runtime)); 351 352 tokio::task::yield_now().await; 353 shutdown.request_shutdown(); 354 355 let report = task.await.expect("task").expect("serve"); 356 assert_eq!(report.listen_addr().ip().to_string(), "127.0.0.1"); 357 assert_ne!(report.listen_addr().port(), 0); 358 assert_eq!(report.closed_subscriptions(), 0); 359 360 let _ = std::fs::remove_dir_all(root); 361 } 362 363 #[tokio::test] 364 async fn serve_until_shutdown_accepts_websocket_upgrade() { 365 let root = temp_root("websocket-upgrade"); 366 let _ = std::fs::remove_dir_all(&root); 367 let runtime = host_runtime(&root); 368 let shutdown = runtime.shutdown_signal().clone(); 369 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 370 let address = listener.local_addr().expect("address"); 371 let task = tokio::spawn(super::serve_listener_until_shutdown(runtime, listener)); 372 let mut request = format!("ws://{address}/") 373 .into_client_request() 374 .expect("request"); 375 request.headers_mut().insert( 376 header::SEC_WEBSOCKET_PROTOCOL, 377 http::HeaderValue::from_static("nostr"), 378 ); 379 request.headers_mut().insert( 380 header::HOST, 381 http::HeaderValue::from_static("relay.radroots.test"), 382 ); 383 384 let (_socket, response) = tokio_tungstenite::connect_async(request) 385 .await 386 .expect("websocket"); 387 388 assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS); 389 assert_eq!( 390 response 391 .headers() 392 .get(header::SEC_WEBSOCKET_PROTOCOL) 393 .expect("protocol"), 394 "nostr" 395 ); 396 397 shutdown.request_shutdown(); 398 let report = task.await.expect("task").expect("serve"); 399 assert_eq!(report.listen_addr(), address); 400 let _ = std::fs::remove_dir_all(root); 401 } 402 403 #[tokio::test] 404 async fn serve_until_shutdown_closes_websocket_sessions() { 405 let root = temp_root("websocket-shutdown"); 406 let _ = std::fs::remove_dir_all(&root); 407 let runtime = host_runtime(&root); 408 let shutdown = runtime.shutdown_signal().clone(); 409 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 410 let address = listener.local_addr().expect("address"); 411 let task = tokio::spawn(super::serve_listener_until_shutdown(runtime, listener)); 412 let mut request = format!("ws://{address}/") 413 .into_client_request() 414 .expect("request"); 415 request.headers_mut().insert( 416 header::SEC_WEBSOCKET_PROTOCOL, 417 http::HeaderValue::from_static("nostr"), 418 ); 419 request.headers_mut().insert( 420 header::HOST, 421 http::HeaderValue::from_static("relay.radroots.test"), 422 ); 423 let (mut socket, response) = tokio_tungstenite::connect_async(request) 424 .await 425 .expect("websocket"); 426 427 assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS); 428 let _ = read_auth_challenge(&mut socket).await; 429 430 shutdown.request_shutdown(); 431 432 let next = timeout(Duration::from_secs(1), socket.next()) 433 .await 434 .expect("websocket close"); 435 match next { 436 Some(Ok(TungsteniteMessage::Close(_))) | None => {} 437 other => panic!("expected websocket close, got {other:?}"), 438 } 439 let report = timeout(Duration::from_secs(1), task) 440 .await 441 .expect("server shutdown") 442 .expect("task") 443 .expect("serve"); 444 assert_eq!(report.listen_addr(), address); 445 let _ = std::fs::remove_dir_all(root); 446 } 447 448 #[tokio::test] 449 async fn websocket_session_dispatches_base_client_messages() { 450 let root = temp_root("websocket-dispatch"); 451 let _ = std::fs::remove_dir_all(&root); 452 let runtime = host_runtime(&root); 453 let shutdown = runtime.shutdown_signal().clone(); 454 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 455 let address = listener.local_addr().expect("address"); 456 let task = tokio::spawn(super::serve_listener_until_shutdown(runtime, listener)); 457 let mut request = format!("ws://{address}/") 458 .into_client_request() 459 .expect("request"); 460 request.headers_mut().insert( 461 header::SEC_WEBSOCKET_PROTOCOL, 462 http::HeaderValue::from_static("nostr"), 463 ); 464 request.headers_mut().insert( 465 header::HOST, 466 http::HeaderValue::from_static("relay.radroots.test"), 467 ); 468 let (mut socket, response) = tokio_tungstenite::connect_async(request) 469 .await 470 .expect("websocket"); 471 let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "hello") 472 .expect("event"); 473 474 assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS); 475 let challenge = read_auth_challenge(&mut socket).await; 476 assert_eq!(challenge.len(), 64); 477 assert_eq!(challenge, challenge.to_ascii_lowercase()); 478 479 let auth_created_at = current_unix_timestamp(); 480 let owner_auth = tangle_v2_auth_event(FixtureKey::Owner, &challenge, auth_created_at) 481 .expect("owner auth"); 482 let admin_auth = tangle_v2_auth_event( 483 FixtureKey::Admin, 484 &challenge, 485 auth_created_at.saturating_add(1), 486 ) 487 .expect("admin auth"); 488 489 send_client_text(&mut socket, "{").await; 490 let notice = read_relay_value(&mut socket).await; 491 assert_eq!(notice[0], "NOTICE"); 492 assert!( 493 notice[1] 494 .as_str() 495 .expect("notice") 496 .starts_with("invalid: client message JSON is invalid:") 497 ); 498 499 send_client_value(&mut socket, json!(["EVENT", event_to_value(&event)])).await; 500 assert_eq!( 501 read_relay_value(&mut socket).await, 502 json!(["OK", event.id().as_str(), true, ""]) 503 ); 504 505 send_client_value( 506 &mut socket, 507 json!(["COUNT", "count-a", {"kinds":[1], "since": 1_714_124_433, "until": 1_714_124_433}]), 508 ) 509 .await; 510 assert_eq!( 511 read_relay_value(&mut socket).await, 512 json!(["COUNT", "count-a", {"count": 1}]) 513 ); 514 515 send_client_value(&mut socket, json!(["REQ", "sub-a", {}])).await; 516 let req_event = read_relay_value(&mut socket).await; 517 assert_eq!(req_event[0], "EVENT"); 518 assert_eq!(req_event[1], "sub-a"); 519 assert_eq!(req_event[2]["id"], event.id().as_str()); 520 assert_eq!( 521 read_relay_value(&mut socket).await, 522 json!(["EOSE", "sub-a"]) 523 ); 524 525 send_client_value( 526 &mut socket, 527 json!(["REQ", "sub-search", {"search": "fresh carrots", "limit": 1}]), 528 ) 529 .await; 530 assert_eq!( 531 read_relay_value(&mut socket).await, 532 json!([ 533 "CLOSED", 534 "sub-search", 535 "unsupported: search filters are not supported" 536 ]) 537 ); 538 539 send_client_value(&mut socket, json!(["AUTH", event_to_value(&owner_auth)])).await; 540 assert_eq!( 541 read_relay_value(&mut socket).await, 542 json!(["OK", owner_auth.id().as_str(), true, ""]) 543 ); 544 545 send_client_value(&mut socket, json!(["AUTH", event_to_value(&admin_auth)])).await; 546 assert_eq!( 547 read_relay_value(&mut socket).await, 548 json!(["OK", admin_auth.id().as_str(), true, ""]) 549 ); 550 551 send_client_value(&mut socket, json!(["CLOSE", "sub-a"])).await; 552 assert!( 553 timeout(Duration::from_millis(50), socket.next()) 554 .await 555 .is_err() 556 ); 557 558 shutdown.request_shutdown(); 559 let report = timeout(Duration::from_secs(1), task) 560 .await 561 .expect("server shutdown") 562 .expect("task") 563 .expect("serve"); 564 assert_eq!(report.listen_addr(), address); 565 let _ = std::fs::remove_dir_all(root); 566 } 567 568 #[tokio::test] 569 async fn tangle_http_router_serves_nip11_and_host_ops_routes() { 570 let root = temp_root("http-router"); 571 let runtime = host_runtime(&root); 572 for tenant in runtime.registry().active_tenants() { 573 tenant 574 .runtime() 575 .readiness_handle() 576 .set_server_bind(crate::ops::BaseRelayReadinessCheckStatus::Ready); 577 } 578 let router = tangle_http_router(runtime); 579 let nip11 = router 580 .clone() 581 .oneshot( 582 Request::builder() 583 .uri("/") 584 .header(header::HOST, "relay.radroots.test") 585 .header(header::ACCEPT, "application/nostr+json") 586 .extension(ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 39_000)))) 587 .body(axum::body::Body::empty()) 588 .expect("request"), 589 ) 590 .await 591 .expect("nip11"); 592 let root_without_accept = router 593 .clone() 594 .oneshot( 595 Request::builder() 596 .uri("/") 597 .header(header::HOST, "relay.radroots.test") 598 .extension(ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 39_001)))) 599 .body(axum::body::Body::empty()) 600 .expect("request"), 601 ) 602 .await 603 .expect("root"); 604 let health = router 605 .clone() 606 .oneshot( 607 Request::builder() 608 .uri("/healthz") 609 .body(axum::body::Body::empty()) 610 .expect("request"), 611 ) 612 .await 613 .expect("health"); 614 let ready = router 615 .clone() 616 .oneshot( 617 Request::builder() 618 .uri("/.well-known/tangle/ready") 619 .body(axum::body::Body::empty()) 620 .expect("request"), 621 ) 622 .await 623 .expect("ready"); 624 let metrics = router 625 .clone() 626 .oneshot( 627 Request::builder() 628 .uri("/.well-known/tangle/metrics") 629 .body(axum::body::Body::empty()) 630 .expect("request"), 631 ) 632 .await 633 .expect("metrics"); 634 let tenants = router 635 .oneshot( 636 Request::builder() 637 .uri("/.well-known/tangle/tenants") 638 .body(axum::body::Body::empty()) 639 .expect("request"), 640 ) 641 .await 642 .expect("tenants"); 643 644 assert_eq!(nip11.status(), http::StatusCode::OK); 645 assert_eq!( 646 nip11.headers().get(header::CONTENT_TYPE).expect("type"), 647 "application/nostr+json" 648 ); 649 let nip11_body = to_bytes(nip11.into_body(), usize::MAX).await.expect("body"); 650 let nip11_value = serde_json::from_slice::<serde_json::Value>(&nip11_body).expect("json"); 651 assert_eq!(nip11_value["name"], "Radroots Test Relay"); 652 assert_eq!(nip11_value["limitation"]["max_message_length"], 1_048_576); 653 assert_eq!(nip11_value["limitation"]["max_subscriptions"], 64); 654 assert_eq!(nip11_value["limitation"]["max_filters"], 10); 655 assert_eq!(nip11_value["limitation"]["max_limit"], 500); 656 assert_eq!(nip11_value["limitation"]["max_query_complexity"], 2_048); 657 assert_eq!(nip11_value["limitation"]["max_subid_length"], 64); 658 assert_eq!(nip11_value["limitation"]["max_event_tags"], 200); 659 assert_eq!(nip11_value["limitation"]["max_content_length"], 65_536); 660 assert_eq!(nip11_value["limitation"]["auth_required"], false); 661 assert_eq!(nip11_value["limitation"]["payment_required"], false); 662 assert_eq!(nip11_value["limitation"]["restricted_writes"], true); 663 assert_eq!(nip11_value["limitation"]["default_limit"], 100); 664 assert_eq!(nip11_value["retention"]["physical_erasure"], false); 665 assert_eq!(nip11_value["retention"]["compaction_guarantee"], false); 666 assert!( 667 nip11_value["supported_nips"] 668 .as_array() 669 .expect("nips") 670 .contains(&serde_json::json!(29)) 671 ); 672 assert_eq!(root_without_accept.status(), http::StatusCode::NOT_FOUND); 673 assert_eq!(health.status(), http::StatusCode::NOT_FOUND); 674 assert_eq!(ready.status(), http::StatusCode::OK); 675 let ready_body = to_bytes(ready.into_body(), usize::MAX).await.expect("body"); 676 let ready_value = serde_json::from_slice::<serde_json::Value>(&ready_body).expect("json"); 677 assert_eq!(ready_value["checks"]["active_tenants"], "ready"); 678 assert_eq!(metrics.status(), http::StatusCode::OK); 679 let metrics_body = to_bytes(metrics.into_body(), usize::MAX) 680 .await 681 .expect("body"); 682 let metrics_value = 683 serde_json::from_slice::<serde_json::Value>(&metrics_body).expect("json"); 684 assert_eq!(metrics_value["tangle_readiness_ready"], true); 685 assert_eq!(metrics_value["tangle_host_active_tenants"], 1); 686 assert_eq!(metrics_value["tangle_ws_connections_current"], 0); 687 assert_eq!(metrics_value["tangle_stored_event_offsets_total"], 0); 688 assert_eq!(tenants.status(), http::StatusCode::OK); 689 let tenants_body = to_bytes(tenants.into_body(), usize::MAX) 690 .await 691 .expect("body"); 692 let tenants_value = 693 serde_json::from_slice::<serde_json::Value>(&tenants_body).expect("json"); 694 assert_eq!(tenants_value["tenants"][0]["tenant_id"], "test-relay"); 695 assert_eq!(tenants_value["tenants"][0]["ready"], true); 696 let root_body = to_bytes(root_without_accept.into_body(), usize::MAX) 697 .await 698 .expect("body"); 699 assert_eq!( 700 String::from_utf8(root_body.to_vec()).expect("utf8"), 701 "relay information requires application/nostr+json" 702 ); 703 let _ = std::fs::remove_dir_all(root); 704 } 705 706 #[tokio::test] 707 async fn tangle_http_router_enforces_host_ops_config() { 708 let root = temp_root("http-router-ops-config"); 709 let _ = std::fs::remove_dir_all(&root); 710 let inventory_enabled = ready_runtime(host_runtime_with_ops(&root, true, true)); 711 let inventory_disabled = ready_runtime(host_runtime_with_ops(&root, true, false)); 712 let ops_disabled = ready_runtime(host_runtime_with_ops(&root, false, true)); 713 714 let tenants = host_ops_response( 715 &tangle_http_router(inventory_enabled), 716 "/.well-known/tangle/tenants", 717 ) 718 .await; 719 assert_eq!(tenants.status(), http::StatusCode::OK); 720 let tenants_json = response_json(tenants).await; 721 assert_eq!(tenants_json["tenants"][0]["tenant_id"], "test-relay"); 722 723 let tenants = host_ops_response( 724 &tangle_http_router(inventory_disabled), 725 "/.well-known/tangle/tenants", 726 ) 727 .await; 728 assert_eq!(tenants.status(), http::StatusCode::NOT_FOUND); 729 assert_eq!( 730 response_text(tenants).await, 731 "tangle host tenant inventory is disabled" 732 ); 733 734 let router = tangle_http_router(ops_disabled); 735 for path in [ 736 "/.well-known/tangle/ready", 737 "/.well-known/tangle/metrics", 738 "/.well-known/tangle/tenants", 739 ] { 740 let response = host_ops_response(&router, path).await; 741 assert_eq!(response.status(), http::StatusCode::NOT_FOUND); 742 assert_eq!( 743 response_text(response).await, 744 "tangle host ops are disabled" 745 ); 746 } 747 748 let nip11 = nip11_response(&router, Some("relay.radroots.test"), None, 39_002).await; 749 assert_eq!(nip11.status(), http::StatusCode::OK); 750 let nip11_json = response_json(nip11).await; 751 assert_eq!(nip11_json["name"], "Radroots Test Relay"); 752 753 let _ = std::fs::remove_dir_all(root); 754 } 755 756 #[tokio::test] 757 async fn tangle_http_router_routes_by_host_and_fails_closed() { 758 let root = temp_root("host-routing"); 759 let _ = std::fs::remove_dir_all(&root); 760 let runtime = multi_host_runtime(&root); 761 for tenant in runtime.registry().active_tenants() { 762 tenant 763 .runtime() 764 .readiness_handle() 765 .set_server_bind(crate::ops::BaseRelayReadinessCheckStatus::Ready); 766 } 767 let router = tangle_http_router(runtime); 768 769 let alpha = nip11_response( 770 &router, 771 Some("alpha.relay.test"), 772 Some("beta.relay.test"), 773 39_010, 774 ) 775 .await; 776 assert_eq!(alpha.status(), http::StatusCode::OK); 777 let alpha = response_json(alpha).await; 778 assert_eq!(alpha["name"], "Alpha Relay"); 779 780 let beta = nip11_response(&router, Some("beta.relay.test"), None, 39_011).await; 781 assert_eq!(beta.status(), http::StatusCode::OK); 782 let beta = response_json(beta).await; 783 assert_eq!(beta["name"], "Beta Relay"); 784 785 let unknown = nip11_response(&router, Some("unknown.relay.test"), None, 39_012).await; 786 assert_eq!(unknown.status(), http::StatusCode::NOT_FOUND); 787 assert_eq!(response_text(unknown).await, "unknown host"); 788 789 let inactive = nip11_response(&router, Some("inactive.relay.test"), None, 39_013).await; 790 assert_eq!(inactive.status(), http::StatusCode::NOT_FOUND); 791 assert_eq!(response_text(inactive).await, "unknown host"); 792 793 let missing = nip11_response(&router, None, None, 39_014).await; 794 assert_eq!(missing.status(), http::StatusCode::BAD_REQUEST); 795 assert_eq!(response_text(missing).await, "missing host"); 796 797 for path in ["/healthz", "/readyz", "/metricsz"] { 798 let legacy = router 799 .clone() 800 .oneshot( 801 Request::builder() 802 .uri(path) 803 .body(Body::empty()) 804 .expect("request"), 805 ) 806 .await 807 .expect("legacy route"); 808 assert_eq!(legacy.status(), http::StatusCode::NOT_FOUND); 809 } 810 811 let _ = std::fs::remove_dir_all(root); 812 } 813 814 #[tokio::test] 815 async fn websocket_auth_rejects_cross_tenant_relay_url() { 816 let root = temp_root("cross-tenant-auth"); 817 let _ = std::fs::remove_dir_all(&root); 818 let runtime = multi_host_runtime(&root); 819 let shutdown = runtime.shutdown_signal().clone(); 820 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 821 let address = listener.local_addr().expect("address"); 822 let task = tokio::spawn(super::serve_listener_until_shutdown(runtime, listener)); 823 let mut request = format!("ws://{address}/") 824 .into_client_request() 825 .expect("request"); 826 request.headers_mut().insert( 827 header::SEC_WEBSOCKET_PROTOCOL, 828 http::HeaderValue::from_static("nostr"), 829 ); 830 request.headers_mut().insert( 831 header::HOST, 832 http::HeaderValue::from_static("beta.relay.test"), 833 ); 834 let (mut socket, response) = tokio_tungstenite::connect_async(request) 835 .await 836 .expect("websocket"); 837 838 assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS); 839 let challenge = read_auth_challenge(&mut socket).await; 840 let created_at = current_unix_timestamp(); 841 let alpha_auth = tangle_v2_auth_event_for_relay( 842 FixtureKey::Owner, 843 &challenge, 844 created_at, 845 "wss://alpha.relay.test", 846 ) 847 .expect("alpha auth"); 848 let beta_auth = tangle_v2_auth_event_for_relay( 849 FixtureKey::Owner, 850 &challenge, 851 created_at.saturating_add(1), 852 "wss://beta.relay.test", 853 ) 854 .expect("beta auth"); 855 856 send_client_value(&mut socket, json!(["AUTH", event_to_value(&alpha_auth)])).await; 857 assert_eq!( 858 read_relay_value(&mut socket).await, 859 json!([ 860 "OK", 861 alpha_auth.id().as_str(), 862 false, 863 "auth-required: auth relay does not match canonical relay URL" 864 ]) 865 ); 866 867 send_client_value(&mut socket, json!(["AUTH", event_to_value(&beta_auth)])).await; 868 assert_eq!( 869 read_relay_value(&mut socket).await, 870 json!(["OK", beta_auth.id().as_str(), true, ""]) 871 ); 872 873 shutdown.request_shutdown(); 874 let report = timeout(Duration::from_secs(1), task) 875 .await 876 .expect("server shutdown") 877 .expect("task") 878 .expect("serve"); 879 assert_eq!(report.listen_addr(), address); 880 let _ = std::fs::remove_dir_all(root); 881 } 882 883 fn tangle_v2_event( 884 key: FixtureKey, 885 created_at: u64, 886 kind: u64, 887 tags: Vec<Tag>, 888 content: &str, 889 ) -> Result<Event, String> { 890 let event = server_pocket_event(key, created_at, kind, tags, content); 891 server_pocket_event_to_protocol(&event) 892 } 893 894 fn tangle_v2_auth_event( 895 key: FixtureKey, 896 challenge: &str, 897 created_at: u64, 898 ) -> Result<Event, String> { 899 tangle_v2_auth_event_for_relay(key, challenge, created_at, "wss://relay.radroots.test") 900 } 901 902 fn tangle_v2_auth_event_for_relay( 903 key: FixtureKey, 904 challenge: &str, 905 created_at: u64, 906 relay_url: &str, 907 ) -> Result<Event, String> { 908 tangle_v2_event( 909 key, 910 created_at, 911 22_242, 912 vec![ 913 Tag::from_parts("relay", &[relay_url])?, 914 Tag::from_parts("challenge", &[challenge])?, 915 ], 916 "", 917 ) 918 } 919 920 fn server_pocket_event( 921 key: FixtureKey, 922 created_at: u64, 923 kind: u64, 924 tags: Vec<Tag>, 925 content: &str, 926 ) -> PocketOwnedEvent { 927 let tags = server_pocket_tags_from_protocol(&tags); 928 let secret = format!("{:02x}", fixture_secret_byte(key)).repeat(32); 929 RelaySigner::from_secret_hex(&secret) 930 .expect("signer") 931 .sign_pocket_event( 932 PocketKind::from_u16(u16::try_from(kind).expect("pocket kind")), 933 &tags, 934 PocketTime::from_u64(created_at), 935 content.as_bytes(), 936 ) 937 .expect("pocket event") 938 } 939 940 fn server_pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags { 941 let parts = tags 942 .iter() 943 .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>()) 944 .collect::<Vec<_>>(); 945 PocketOwnedTags::new(&parts).expect("pocket tags") 946 } 947 948 fn server_pocket_event_to_protocol(event: &PocketEvent) -> Result<Event, String> { 949 let tags = event 950 .tags() 951 .map_err(|error| error.to_string())? 952 .iter() 953 .map(|tag| { 954 Tag::new( 955 tag.map(|value| { 956 std::str::from_utf8(value) 957 .map(str::to_owned) 958 .map_err(|error| error.to_string()) 959 }) 960 .collect::<Result<Vec<_>, _>>()?, 961 ) 962 .map_err(|error| error.to_string()) 963 }) 964 .collect::<Result<Vec<_>, _>>()?; 965 Ok(Event::new( 966 EventId::new(&event.id().as_hex_string()).map_err(|error| error.to_string())?, 967 UnsignedEvent::new( 968 PublicKeyHex::new(&event.pubkey().as_hex_string()) 969 .map_err(|error| error.to_string())?, 970 UnixTimestamp::new(event.created_at().as_u64()), 971 Kind::new(u64::from(event.kind().as_u16())).map_err(|error| error.to_string())?, 972 tags, 973 std::str::from_utf8(event.content()).map_err(|error| error.to_string())?, 974 ), 975 SignatureHex::new(&event.sig().to_string()).map_err(|error| error.to_string())?, 976 )) 977 } 978 979 fn fixture_secret_byte(key: FixtureKey) -> u8 { 980 match key { 981 FixtureKey::Relay => 9, 982 FixtureKey::Owner => 10, 983 FixtureKey::Admin => 11, 984 FixtureKey::Member => 12, 985 FixtureKey::Outsider => 13, 986 } 987 } 988 989 fn host_runtime(root: &Path) -> TangleHostRuntime { 990 host_runtime_with_ops(root, true, true) 991 } 992 993 fn host_runtime_with_ops( 994 root: &Path, 995 ops_enabled: bool, 996 expose_tenant_inventory: bool, 997 ) -> TangleHostRuntime { 998 host_runtime_from_tenants_with_host( 999 host_config_value_with_ops(ops_enabled, expose_tenant_inventory), 1000 vec![tenant_config_value( 1001 root, 1002 TenantConfigFixture { 1003 tenant_id: "test-relay", 1004 tenant_schema: "test_relay", 1005 host: "relay.radroots.test", 1006 relay_url: "wss://relay.radroots.test", 1007 name: "Radroots Test Relay", 1008 inactive: false, 1009 relay_secret_byte: 0x77, 1010 }, 1011 )], 1012 ) 1013 } 1014 1015 fn multi_host_runtime(root: &Path) -> TangleHostRuntime { 1016 host_runtime_from_tenants(vec![ 1017 tenant_config_value( 1018 root, 1019 TenantConfigFixture { 1020 tenant_id: "alpha", 1021 tenant_schema: "alpha_schema", 1022 host: "alpha.relay.test", 1023 relay_url: "wss://alpha.relay.test", 1024 name: "Alpha Relay", 1025 inactive: false, 1026 relay_secret_byte: 0x77, 1027 }, 1028 ), 1029 tenant_config_value( 1030 root, 1031 TenantConfigFixture { 1032 tenant_id: "beta", 1033 tenant_schema: "beta_schema", 1034 host: "beta.relay.test", 1035 relay_url: "wss://beta.relay.test", 1036 name: "Beta Relay", 1037 inactive: false, 1038 relay_secret_byte: 0x88, 1039 }, 1040 ), 1041 tenant_config_value( 1042 root, 1043 TenantConfigFixture { 1044 tenant_id: "inactive", 1045 tenant_schema: "inactive_schema", 1046 host: "inactive.relay.test", 1047 relay_url: "wss://inactive.relay.test", 1048 name: "Inactive Relay", 1049 inactive: true, 1050 relay_secret_byte: 0x99, 1051 }, 1052 ), 1053 ]) 1054 } 1055 1056 fn host_runtime_from_tenants(tenant_values: Vec<serde_json::Value>) -> TangleHostRuntime { 1057 host_runtime_from_tenants_with_host(host_config_value(), tenant_values) 1058 } 1059 1060 fn host_runtime_from_tenants_with_host( 1061 host_value: serde_json::Value, 1062 tenant_values: Vec<serde_json::Value>, 1063 ) -> TangleHostRuntime { 1064 let host = 1065 parse_tangle_host_runtime_config_json(&host_value.to_string()).expect("host config"); 1066 let tenants = tenant_values 1067 .into_iter() 1068 .map(|tenant| parse_tenant_runtime_config_json(&tenant.to_string()).expect("tenant")) 1069 .collect::<Vec<_>>(); 1070 let config = TangleHostRuntimeConfigSet::new(host, tenants).expect("config set"); 1071 TangleHostRuntime::open(config).expect("host runtime") 1072 } 1073 1074 fn host_config_value() -> serde_json::Value { 1075 host_config_value_with_ops(true, true) 1076 } 1077 1078 fn host_config_value_with_ops( 1079 ops_enabled: bool, 1080 expose_tenant_inventory: bool, 1081 ) -> serde_json::Value { 1082 json!({ 1083 "listen_addr": "127.0.0.1:0", 1084 "tenant_config_dir": "tenants", 1085 "limits": { 1086 "max_total_connections": 64, 1087 "max_total_subscriptions": 256, 1088 "tenant_startup_concurrency": 4 1089 }, 1090 "ops": { 1091 "enabled": ops_enabled, 1092 "expose_tenant_inventory": expose_tenant_inventory 1093 } 1094 }) 1095 } 1096 1097 struct TenantConfigFixture<'a> { 1098 tenant_id: &'a str, 1099 tenant_schema: &'a str, 1100 host: &'a str, 1101 relay_url: &'a str, 1102 name: &'a str, 1103 inactive: bool, 1104 relay_secret_byte: u8, 1105 } 1106 1107 fn tenant_config_value(root: &Path, fixture: TenantConfigFixture<'_>) -> serde_json::Value { 1108 let relay_secret = format!("{:02x}", fixture.relay_secret_byte).repeat(32); 1109 json!({ 1110 "tenant_id": fixture.tenant_id, 1111 "tenant_schema": fixture.tenant_schema, 1112 "host": fixture.host, 1113 "relay_url": fixture.relay_url, 1114 "inactive": fixture.inactive, 1115 "info": { 1116 "name": fixture.name 1117 }, 1118 "pocket": { 1119 "data_directory": root.join(format!("{}-pocket", fixture.tenant_id)), 1120 "sync_policy": "flush_on_shutdown", 1121 }, 1122 "pocket_query": { 1123 "allow_scraping": false, 1124 "allow_scrape_if_limited_to": 100, 1125 "allow_scrape_if_max_seconds": 3600 1126 }, 1127 "groups": { 1128 "enabled": true, 1129 "canonical_relay_url": fixture.relay_url, 1130 "relay_secret": relay_secret, 1131 "owner_pubkeys": ["0202020202020202020202020202020202020202020202020202020202020202"] 1132 }, 1133 "auth": { 1134 "challenge_ttl_seconds": 300, 1135 "created_at_skew_seconds": 600 1136 }, 1137 "limits": { 1138 "max_message_length": 1048576, 1139 "max_subid_length": 64, 1140 "max_subscriptions_per_connection": 64, 1141 "max_filters_per_request": 10, 1142 "max_tag_values_per_filter": 100, 1143 "max_query_complexity": 2048, 1144 "max_limit": 500, 1145 "default_limit": 100, 1146 "max_event_tags": 200, 1147 "max_content_length": 65536, 1148 "broadcast_channel_capacity": 8, 1149 "per_connection_outbound_queue": 8 1150 }, 1151 "rate_limits": { 1152 "auth": { 1153 "per_ip": {"window_seconds": 60, "max_hits": 120}, 1154 "per_pubkey": {"window_seconds": 60, "max_hits": 30}, 1155 "failures": {"window_seconds": 300, "max_hits": 5}, 1156 "failures_per_ip": {"window_seconds": 300, "max_hits": 20} 1157 }, 1158 "event": { 1159 "per_ip": {"window_seconds": 60, "max_hits": 600}, 1160 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 1161 "per_kind": {"window_seconds": 60, "max_hits": 1000} 1162 }, 1163 "group": { 1164 "write_per_ip": {"window_seconds": 60, "max_hits": 300}, 1165 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, 1166 "write_per_group": {"window_seconds": 60, "max_hits": 90}, 1167 "write_per_kind": {"window_seconds": 60, "max_hits": 300}, 1168 "join_flow": {"window_seconds": 300, "max_hits": 10}, 1169 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} 1170 }, 1171 "req": { 1172 "per_ip": {"window_seconds": 60, "max_hits": 600}, 1173 "per_connection": {"window_seconds": 60, "max_hits": 120}, 1174 "per_pubkey": {"window_seconds": 60, "max_hits": 240}, 1175 "per_group": {"window_seconds": 60, "max_hits": 240}, 1176 "per_kind": {"window_seconds": 60, "max_hits": 500}, 1177 "broad": {"window_seconds": 60, "max_hits": 30} 1178 }, 1179 "count": { 1180 "per_ip": {"window_seconds": 60, "max_hits": 300}, 1181 "per_connection": {"window_seconds": 60, "max_hits": 60}, 1182 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 1183 "per_group": {"window_seconds": 60, "max_hits": 120}, 1184 "per_kind": {"window_seconds": 60, "max_hits": 240}, 1185 "broad": {"window_seconds": 60, "max_hits": 20} 1186 } 1187 } 1188 }) 1189 } 1190 1191 async fn nip11_response( 1192 router: &axum::Router, 1193 host: Option<&str>, 1194 forwarded_host: Option<&str>, 1195 peer_port: u16, 1196 ) -> http::Response<Body> { 1197 let mut builder = Request::builder() 1198 .uri("/") 1199 .header(header::ACCEPT, "application/nostr+json") 1200 .extension(ConnectInfo(SocketAddr::from(([127, 0, 0, 1], peer_port)))); 1201 if let Some(host) = host { 1202 builder = builder.header(header::HOST, host); 1203 } 1204 if let Some(forwarded_host) = forwarded_host { 1205 builder = builder.header("x-forwarded-host", forwarded_host); 1206 } 1207 router 1208 .clone() 1209 .oneshot(builder.body(Body::empty()).expect("request")) 1210 .await 1211 .expect("response") 1212 } 1213 1214 async fn host_ops_response(router: &axum::Router, path: &str) -> http::Response<Body> { 1215 router 1216 .clone() 1217 .oneshot( 1218 Request::builder() 1219 .uri(path) 1220 .body(Body::empty()) 1221 .expect("request"), 1222 ) 1223 .await 1224 .expect("response") 1225 } 1226 1227 fn ready_runtime(runtime: TangleHostRuntime) -> TangleHostRuntime { 1228 for tenant in runtime.registry().active_tenants() { 1229 tenant 1230 .runtime() 1231 .readiness_handle() 1232 .set_server_bind(crate::ops::BaseRelayReadinessCheckStatus::Ready); 1233 } 1234 runtime 1235 } 1236 1237 async fn response_json(response: http::Response<Body>) -> serde_json::Value { 1238 let body = to_bytes(response.into_body(), usize::MAX) 1239 .await 1240 .expect("body"); 1241 serde_json::from_slice::<serde_json::Value>(&body).expect("json") 1242 } 1243 1244 async fn response_text(response: http::Response<Body>) -> String { 1245 let body = to_bytes(response.into_body(), usize::MAX) 1246 .await 1247 .expect("body"); 1248 String::from_utf8(body.to_vec()).expect("utf8") 1249 } 1250 1251 fn temp_root(name: &str) -> PathBuf { 1252 std::env::temp_dir().join(format!("tangle-server-{name}-{}", std::process::id())) 1253 } 1254 1255 type TestWebSocket = tokio_tungstenite::WebSocketStream< 1256 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>, 1257 >; 1258 1259 async fn send_client_value(socket: &mut TestWebSocket, value: serde_json::Value) { 1260 send_client_text(socket, &value.to_string()).await; 1261 } 1262 1263 async fn send_client_text(socket: &mut TestWebSocket, value: &str) { 1264 socket 1265 .send(TungsteniteMessage::Text(value.to_owned().into())) 1266 .await 1267 .expect("send client message"); 1268 } 1269 1270 async fn read_relay_value(socket: &mut TestWebSocket) -> serde_json::Value { 1271 let message = timeout(Duration::from_secs(1), socket.next()) 1272 .await 1273 .expect("relay message timeout") 1274 .expect("relay message") 1275 .expect("relay message result"); 1276 let TungsteniteMessage::Text(text) = message else { 1277 panic!("expected relay text message, got {message:?}"); 1278 }; 1279 serde_json::from_str(text.as_str()).expect("relay json") 1280 } 1281 1282 async fn read_auth_challenge(socket: &mut TestWebSocket) -> String { 1283 let auth = read_relay_value(socket).await; 1284 assert_eq!(auth[0], "AUTH"); 1285 auth[1].as_str().expect("auth challenge").to_owned() 1286 } 1287 1288 fn current_unix_timestamp() -> u64 { 1289 SystemTime::now() 1290 .duration_since(UNIX_EPOCH) 1291 .expect("system time") 1292 .as_secs() 1293 } 1294 }