operability_server.rs (8840B)
1 use std::net::{SocketAddr, TcpListener as StdTcpListener}; 2 use std::path::{Path, PathBuf}; 3 use std::time::Duration; 4 5 use myc::{MycConfig, MycRuntime, MycTransportDeliveryPolicy}; 6 use radroots_identity::RadrootsIdentity; 7 use serde_json::Value; 8 use tokio::io::{AsyncReadExt, AsyncWriteExt}; 9 use tokio::net::{TcpListener, TcpStream}; 10 use tokio::sync::oneshot; 11 use tokio::time::{sleep, timeout}; 12 13 type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>; 14 15 const HTTP_READY_TIMEOUT: Duration = Duration::from_secs(15); 16 17 struct TestRelay { 18 url: String, 19 shutdown_tx: Option<oneshot::Sender<()>>, 20 } 21 22 impl TestRelay { 23 async fn spawn() -> TestResult<Self> { 24 let listener = TcpListener::bind("127.0.0.1:0").await?; 25 let addr = listener.local_addr()?; 26 let url = format!("ws://{addr}"); 27 let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); 28 29 tokio::spawn(async move { 30 loop { 31 tokio::select! { 32 _ = &mut shutdown_rx => break, 33 accept = listener.accept() => { 34 let Ok((stream, _)) = accept else { 35 break; 36 }; 37 tokio::spawn(async move { 38 let _ = tokio_tungstenite::accept_async(stream).await; 39 }); 40 } 41 } 42 } 43 }); 44 45 Ok(Self { 46 url, 47 shutdown_tx: Some(shutdown_tx), 48 }) 49 } 50 51 fn url(&self) -> &str { 52 self.url.as_str() 53 } 54 } 55 56 impl Drop for TestRelay { 57 fn drop(&mut self) { 58 if let Some(shutdown_tx) = self.shutdown_tx.take() { 59 let _ = shutdown_tx.send(()); 60 } 61 } 62 } 63 64 struct HangingRelay { 65 url: String, 66 shutdown_tx: Option<oneshot::Sender<()>>, 67 } 68 69 impl HangingRelay { 70 async fn spawn(hold_open_for: Duration) -> TestResult<Self> { 71 let listener = TcpListener::bind("127.0.0.1:0").await?; 72 let addr = listener.local_addr()?; 73 let url = format!("ws://{addr}"); 74 let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); 75 76 tokio::spawn(async move { 77 loop { 78 tokio::select! { 79 _ = &mut shutdown_rx => break, 80 accept = listener.accept() => { 81 let Ok((stream, _)) = accept else { 82 break; 83 }; 84 tokio::spawn(async move { 85 sleep(hold_open_for).await; 86 drop(stream); 87 }); 88 } 89 } 90 } 91 }); 92 93 Ok(Self { 94 url, 95 shutdown_tx: Some(shutdown_tx), 96 }) 97 } 98 99 fn url(&self) -> &str { 100 self.url.as_str() 101 } 102 } 103 104 impl Drop for HangingRelay { 105 fn drop(&mut self) { 106 if let Some(shutdown_tx) = self.shutdown_tx.take() { 107 let _ = shutdown_tx.send(()); 108 } 109 } 110 } 111 112 fn write_test_identity(path: &Path, secret_key: &str) { 113 let identity = RadrootsIdentity::from_secret_key_str(secret_key).expect("identity from secret"); 114 myc::identity_files::store_encrypted_identity(path, &identity).expect("write identity"); 115 } 116 117 fn free_loopback_addr() -> SocketAddr { 118 let listener = StdTcpListener::bind("127.0.0.1:0").expect("bind free loopback addr"); 119 let addr = listener.local_addr().expect("local addr"); 120 drop(listener); 121 addr 122 } 123 124 fn build_runtime<F>(configure: F) -> (MycRuntime, SocketAddr) 125 where 126 F: FnOnce(&mut MycConfig), 127 { 128 let temp = tempfile::tempdir().expect("tempdir").keep(); 129 let bind_addr = free_loopback_addr(); 130 let mut config = MycConfig::default(); 131 config.paths.state_dir = PathBuf::from(&temp).join("state"); 132 config.paths.signer_identity_path = PathBuf::from(&temp).join("signer.json"); 133 config.paths.user_identity_path = PathBuf::from(&temp).join("user.json"); 134 config.transport.connect_timeout_secs = 1; 135 config.observability.enabled = true; 136 config.observability.bind_addr = bind_addr; 137 write_test_identity( 138 &config.paths.signer_identity_path, 139 "1111111111111111111111111111111111111111111111111111111111111111", 140 ); 141 write_test_identity( 142 &config.paths.user_identity_path, 143 "2222222222222222222222222222222222222222222222222222222222222222", 144 ); 145 configure(&mut config); 146 (MycRuntime::bootstrap(config).expect("runtime"), bind_addr) 147 } 148 149 async fn spawn_runtime(runtime: MycRuntime) -> oneshot::Sender<()> { 150 let (shutdown_tx, shutdown_rx) = oneshot::channel(); 151 tokio::spawn(async move { 152 let _ = runtime 153 .run_until(async move { 154 let _ = shutdown_rx.await; 155 }) 156 .await; 157 }); 158 shutdown_tx 159 } 160 161 async fn wait_for_http(addr: SocketAddr) -> TestResult<()> { 162 timeout(HTTP_READY_TIMEOUT, async { 163 loop { 164 match TcpStream::connect(addr).await { 165 Ok(mut stream) => { 166 let _ = stream.shutdown().await; 167 return; 168 } 169 Err(_) => sleep(Duration::from_millis(50)).await, 170 } 171 } 172 }) 173 .await?; 174 Ok(()) 175 } 176 177 struct SimpleHttpResponse { 178 status: u16, 179 content_type: Option<String>, 180 body: String, 181 } 182 183 async fn http_get(addr: SocketAddr, path: &str) -> TestResult<SimpleHttpResponse> { 184 let mut stream = TcpStream::connect(addr).await?; 185 let request = format!("GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n"); 186 stream.write_all(request.as_bytes()).await?; 187 let mut response = Vec::new(); 188 stream.read_to_end(&mut response).await?; 189 let response = String::from_utf8(response)?; 190 let (head, body) = response 191 .split_once("\r\n\r\n") 192 .ok_or("missing http body separator")?; 193 let mut lines = head.lines(); 194 let status_line = lines.next().ok_or("missing status line")?; 195 let status = status_line 196 .split_whitespace() 197 .nth(1) 198 .ok_or("missing status code")? 199 .parse::<u16>()?; 200 let content_type = lines.find_map(|line| { 201 let (key, value) = line.split_once(':')?; 202 if key.eq_ignore_ascii_case("content-type") { 203 Some(value.trim().to_owned()) 204 } else { 205 None 206 } 207 }); 208 Ok(SimpleHttpResponse { 209 status, 210 content_type, 211 body: body.to_owned(), 212 }) 213 } 214 215 #[tokio::test] 216 async fn observability_server_reports_unready_when_transport_is_disabled() -> TestResult<()> { 217 let (runtime, bind_addr) = build_runtime(|_| {}); 218 let shutdown_tx = spawn_runtime(runtime).await; 219 wait_for_http(bind_addr).await?; 220 221 let health = http_get(bind_addr, "/healthz").await?; 222 assert_eq!(health.status, 503); 223 assert_eq!(health.body, "unready"); 224 225 let ready = http_get(bind_addr, "/readyz").await?; 226 assert_eq!(ready.status, 503); 227 assert_eq!(ready.body, "unready"); 228 229 let status = http_get(bind_addr, "/status").await?; 230 assert_eq!(status.status, 200); 231 let body: Value = serde_json::from_str(status.body.as_str())?; 232 assert_eq!(body["status"], "unready"); 233 assert_eq!(body["ready"], false); 234 235 let metrics = http_get(bind_addr, "/metrics").await?; 236 assert_eq!(metrics.status, 200); 237 assert!( 238 metrics 239 .content_type 240 .as_deref() 241 .unwrap_or_default() 242 .starts_with("text/plain") 243 ); 244 assert!(metrics.body.contains("myc_runtime_operation_total")); 245 246 let _ = shutdown_tx.send(()); 247 Ok(()) 248 } 249 250 #[tokio::test] 251 async fn observability_server_reports_degraded_but_ready_partial_outage() -> TestResult<()> { 252 let relay = TestRelay::spawn().await?; 253 let hanging = HangingRelay::spawn(Duration::from_secs(5)).await?; 254 let (runtime, bind_addr) = build_runtime(|config| { 255 config.transport.enabled = true; 256 config.transport.relays = vec![relay.url().to_owned(), hanging.url().to_owned()]; 257 config.transport.delivery_policy = MycTransportDeliveryPolicy::Any; 258 }); 259 let shutdown_tx = spawn_runtime(runtime).await; 260 wait_for_http(bind_addr).await?; 261 262 let health = http_get(bind_addr, "/healthz").await?; 263 assert_eq!(health.status, 200); 264 assert_eq!(health.body, "degraded"); 265 266 let ready = http_get(bind_addr, "/readyz").await?; 267 assert_eq!(ready.status, 200); 268 assert_eq!(ready.body, "ready"); 269 270 let status = http_get(bind_addr, "/status").await?; 271 let body: Value = serde_json::from_str(status.body.as_str())?; 272 assert_eq!(body["status"], "degraded"); 273 assert_eq!(body["ready"], true); 274 assert_eq!(body["transport"]["available_relay_count"], 1); 275 assert_eq!(body["transport"]["unavailable_relay_count"], 1); 276 277 let _ = shutdown_tx.send(()); 278 Ok(()) 279 }