tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 65dc08c1f9068277f31892d1d7caa1c27572a711
parent 9e8468363e641d84102146dfa00049dae75bdd8c
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 02:45:45 -0700

runtime: prove server lifecycle integration

- extend the CLI child-process test across health, readiness, and NIP-11 routes
- promote the long-running server acceptance target into an active async integration test
- verify the server stays alive until shutdown and reports the bound listener address
- verify formatting, CLI tests, runtime tests, workspace checks, and clippy

Diffstat:
Mcrates/tangle/tests/version.rs | 49+++++++++++++++++++++++++++++++++++++------------
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 126++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
2 files changed, 159 insertions(+), 16 deletions(-)

diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs @@ -1,7 +1,6 @@ #![forbid(unsafe_code)] use std::{ - fmt::Write as _, io::{Read, Write}, net::{SocketAddr, TcpListener, TcpStream}, process::{Child, Command, Output, Stdio}, @@ -137,9 +136,28 @@ fn tangle_run_starts_server_and_stays_alive_until_shutdown() { .expect("write config"); let mut child = TangleChild::spawn(&config_path); - let response = wait_for_http_ok(listen_addr, "/healthz"); + let health = wait_for_http_ok(listen_addr, "/healthz", None); + let ready = wait_for_http_ok(listen_addr, "/readyz", None); + let nip11 = wait_for_http_ok(listen_addr, "/", Some("application/nostr+json")); + let health_value = + serde_json::from_str::<serde_json::Value>(response_body(&health)).expect("health json"); + let ready_value = + serde_json::from_str::<serde_json::Value>(response_body(&ready)).expect("ready json"); + let nip11_value = + serde_json::from_str::<serde_json::Value>(response_body(&nip11)).expect("nip11 json"); - assert!(response.contains(r#""status":"ok""#)); + assert_eq!(health_value["status"], "ok"); + assert_eq!(ready_value["status"], "ready"); + assert_eq!(ready_value["checks"]["pocket_storage"], "ready"); + assert_eq!(nip11_value["name"], "tangle"); + assert_eq!(nip11_value["limitation"]["payment_required"], false); + assert_eq!(nip11_value["limitation"]["restricted_writes"], true); + assert!( + nip11_value["supported_nips"] + .as_array() + .expect("supported nips") + .contains(&serde_json::json!(29)) + ); assert!(child.try_wait().expect("child status").is_none()); assert!(data_dir.exists()); @@ -198,18 +216,17 @@ fn reserve_loopback_addr() -> SocketAddr { listener.local_addr().expect("loopback address") } -fn wait_for_http_ok(address: SocketAddr, path: &str) -> String { +fn wait_for_http_ok(address: SocketAddr, path: &str, accept: Option<&str>) -> String { let deadline = Instant::now() + Duration::from_secs(5); let mut last_error = String::new(); while Instant::now() < deadline { - match http_get(address, path) { + match http_get(address, path, accept) { Ok(response) if response.starts_with("HTTP/1.1 200 OK") => return response, Ok(response) => { last_error = response.lines().next().unwrap_or("").to_owned(); } Err(error) => { - last_error.clear(); - write!(&mut last_error, "{error}").expect("format error"); + last_error = error.to_string(); } } std::thread::sleep(Duration::from_millis(50)); @@ -217,15 +234,23 @@ fn wait_for_http_ok(address: SocketAddr, path: &str) -> String { panic!("server did not answer {path}: {last_error}"); } -fn http_get(address: SocketAddr, path: &str) -> std::io::Result<String> { +fn http_get(address: SocketAddr, path: &str, accept: Option<&str>) -> std::io::Result<String> { let mut stream = TcpStream::connect_timeout(&address, Duration::from_millis(200))?; stream.set_read_timeout(Some(Duration::from_millis(500)))?; stream.set_write_timeout(Some(Duration::from_millis(500)))?; - write!( - stream, - "GET {path} HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n" - )?; + let mut request = format!("GET {path} HTTP/1.1\r\nHost: {address}\r\n"); + if let Some(accept) = accept { + request.push_str("Accept: "); + request.push_str(accept); + request.push_str("\r\n"); + } + request.push_str("Connection: close\r\n\r\n"); + stream.write_all(request.as_bytes())?; let mut response = String::new(); stream.read_to_string(&mut response)?; Ok(response) } + +fn response_body(response: &str) -> &str { + response.split_once("\r\n\r\n").expect("response body").1 +} diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -1,9 +1,54 @@ #![forbid(unsafe_code)] -#[test] -#[ignore = "phase2 target: long-lived server runtime"] -fn tangle_run_serves_until_shutdown() { - pending("tangle run --config PATH must bind a server and run until shutdown"); +use std::{ + io::{Read, Write}, + net::{SocketAddr, TcpStream}, + path::{Path, PathBuf}, + time::{Duration, Instant}, +}; +use tangle_runtime::{ + config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, + runtime::TangleRuntime, + server::serve_listener_until_shutdown, +}; +use tangle_test_support::{FixtureKey, TANGLE_V2_RELAY_SECRET_HEX}; +use tokio::{net::TcpListener, time::timeout}; + +#[tokio::test] +async fn tangle_run_serves_until_shutdown() { + let root = temp_root("acceptance-server"); + let _ = std::fs::remove_dir_all(&root); + let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); + let address = listener.local_addr().expect("address"); + let runtime = TangleRuntime::open(runtime_config(&root, address)).expect("runtime"); + let shutdown = runtime.shutdown_signal().clone(); + let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); + + let health = wait_for_http_ok(address, "/healthz", None).await; + let ready = wait_for_http_ok(address, "/readyz", None).await; + let nip11 = wait_for_http_ok(address, "/", Some("application/nostr+json")).await; + + assert!(health.contains(r#""status":"ok""#)); + assert!(ready.contains(r#""status":"ready""#)); + assert!(nip11.contains(r#""name":"tangle""#)); + assert!( + nip11 + .to_ascii_lowercase() + .contains("content-type: application/nostr+json") + ); + assert!(!task.is_finished()); + + shutdown.request_shutdown(); + + let report = timeout(Duration::from_secs(2), task) + .await + .expect("shutdown timeout") + .expect("task") + .expect("serve"); + assert_eq!(report.listen_addr(), address); + assert_eq!(report.closed_subscriptions(), 0); + + let _ = std::fs::remove_dir_all(root); } #[test] @@ -79,3 +124,76 @@ fn relay_generated_events_are_stored_projected_recovered_and_broadcast() { fn pending(target: &str) { panic!("{target}"); } + +fn runtime_config(root: &Path, listen_addr: SocketAddr) -> BaseRelayRuntimeConfig { + let raw = serde_json::json!({ + "server": { + "listen_addr": listen_addr.to_string(), + "relay_url": "wss://relay.radroots.test" + }, + "pocket": { + "data_directory": root.join("pocket"), + "map_size_bytes": 1073741824_u64, + "reader_slots": 128, + "sync_policy": "flush_on_shutdown" + }, + "groups": { + "enabled": true, + "canonical_relay_url": "wss://relay.radroots.test", + "relay_secret": TANGLE_V2_RELAY_SECRET_HEX, + "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()] + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "max_pending_events": 8 + } + }) + .to_string(); + parse_base_relay_runtime_config_json(&raw).expect("config") +} + +async fn wait_for_http_ok( + address: SocketAddr, + path: &'static str, + accept: Option<&'static str>, +) -> String { + let deadline = Instant::now() + Duration::from_secs(3); + let mut last_error = String::new(); + while Instant::now() < deadline { + match tokio::task::spawn_blocking(move || http_get(address, path, accept)) + .await + .expect("http task") + { + Ok(response) if response.starts_with("HTTP/1.1 200 OK") => return response, + Ok(response) => { + last_error = response.lines().next().unwrap_or("").to_owned(); + } + Err(error) => { + last_error = error.to_string(); + } + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + panic!("server did not answer {path}: {last_error}"); +} + +fn http_get(address: SocketAddr, path: &str, accept: Option<&str>) -> std::io::Result<String> { + let mut stream = TcpStream::connect_timeout(&address, Duration::from_millis(200))?; + stream.set_read_timeout(Some(Duration::from_millis(500)))?; + stream.set_write_timeout(Some(Duration::from_millis(500)))?; + let mut request = format!("GET {path} HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n"); + if let Some(accept) = accept { + request.push_str(&format!("Accept: {accept}\r\n")); + } + request.push_str("\r\n"); + stream.write_all(request.as_bytes())?; + let mut response = String::new(); + stream.read_to_string(&mut response)?; + Ok(response) +} + +fn temp_root(name: &str) -> PathBuf { + std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id())) +}