operability_e2e.rs (14090B)
1 use std::path::{Path, PathBuf}; 2 use std::time::Duration; 3 use std::time::{SystemTime, UNIX_EPOCH}; 4 5 use myc::{ 6 MycActiveIdentity, MycConfig, MycDeliveryOutboxKind, MycDeliveryOutboxRecord, 7 MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord, MycRuntime, 8 MycRuntimeAuditBackend, MycRuntimeStatus, MycSignerStateBackend, MycTransportDeliveryPolicy, 9 collect_status_full, 10 }; 11 use radroots_identity::RadrootsIdentity; 12 use radroots_nostr::prelude::{ 13 RadrootsNostrEventBuilder, RadrootsNostrKind, RadrootsNostrRelayUrl, 14 }; 15 use radroots_nostr_signer::prelude::{ 16 RadrootsNostrSignerApprovalRequirement, RadrootsNostrSignerConnectionDraft, 17 }; 18 use tokio::net::TcpListener; 19 use tokio::sync::oneshot; 20 use tokio::time::sleep; 21 22 type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>; 23 24 struct TestRelay { 25 url: String, 26 shutdown_tx: Option<oneshot::Sender<()>>, 27 } 28 29 impl TestRelay { 30 async fn spawn() -> TestResult<Self> { 31 let listener = TcpListener::bind("127.0.0.1:0").await?; 32 let addr = listener.local_addr()?; 33 let url = format!("ws://{addr}"); 34 let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); 35 36 tokio::spawn(async move { 37 loop { 38 tokio::select! { 39 _ = &mut shutdown_rx => break, 40 accept = listener.accept() => { 41 let Ok((stream, _)) = accept else { 42 break; 43 }; 44 tokio::spawn(async move { 45 let _ = tokio_tungstenite::accept_async(stream).await; 46 }); 47 } 48 } 49 } 50 }); 51 52 Ok(Self { 53 url, 54 shutdown_tx: Some(shutdown_tx), 55 }) 56 } 57 58 fn url(&self) -> &str { 59 self.url.as_str() 60 } 61 } 62 63 impl Drop for TestRelay { 64 fn drop(&mut self) { 65 if let Some(shutdown_tx) = self.shutdown_tx.take() { 66 let _ = shutdown_tx.send(()); 67 } 68 } 69 } 70 71 struct HangingRelay { 72 url: String, 73 shutdown_tx: Option<oneshot::Sender<()>>, 74 } 75 76 impl HangingRelay { 77 async fn spawn(hold_open_for: Duration) -> TestResult<Self> { 78 let listener = TcpListener::bind("127.0.0.1:0").await?; 79 let addr = listener.local_addr()?; 80 let url = format!("ws://{addr}"); 81 let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); 82 83 tokio::spawn(async move { 84 loop { 85 tokio::select! { 86 _ = &mut shutdown_rx => break, 87 accept = listener.accept() => { 88 let Ok((stream, _)) = accept else { 89 break; 90 }; 91 tokio::spawn(async move { 92 sleep(hold_open_for).await; 93 drop(stream); 94 }); 95 } 96 } 97 } 98 }); 99 100 Ok(Self { 101 url, 102 shutdown_tx: Some(shutdown_tx), 103 }) 104 } 105 106 fn url(&self) -> &str { 107 self.url.as_str() 108 } 109 } 110 111 impl Drop for HangingRelay { 112 fn drop(&mut self) { 113 if let Some(shutdown_tx) = self.shutdown_tx.take() { 114 let _ = shutdown_tx.send(()); 115 } 116 } 117 } 118 119 fn write_test_identity(path: &Path, secret_key: &str) { 120 let identity = RadrootsIdentity::from_secret_key_str(secret_key).expect("identity from secret"); 121 myc::identity_files::store_encrypted_identity(path, &identity).expect("write identity"); 122 } 123 124 fn signed_delivery_event(identity: &MycActiveIdentity, content: &str) -> nostr::Event { 125 identity 126 .sign_event_builder( 127 RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), content), 128 "operability delivery test event", 129 ) 130 .expect("sign event") 131 } 132 133 fn now_unix_secs() -> u64 { 134 SystemTime::now() 135 .duration_since(UNIX_EPOCH) 136 .expect("system time") 137 .as_secs() 138 } 139 140 fn build_runtime<F>(configure: F) -> MycRuntime 141 where 142 F: FnOnce(&mut MycConfig), 143 { 144 let temp = tempfile::tempdir().expect("tempdir").keep(); 145 let mut config = MycConfig::default(); 146 config.paths.state_dir = PathBuf::from(&temp).join("state"); 147 config.paths.signer_identity_path = PathBuf::from(&temp).join("signer.json"); 148 config.paths.user_identity_path = PathBuf::from(&temp).join("user.json"); 149 config.transport.connect_timeout_secs = 1; 150 write_test_identity( 151 &config.paths.signer_identity_path, 152 "1111111111111111111111111111111111111111111111111111111111111111", 153 ); 154 write_test_identity( 155 &config.paths.user_identity_path, 156 "2222222222222222222222222222222222222222222222222222222222222222", 157 ); 158 configure(&mut config); 159 MycRuntime::bootstrap(config).expect("runtime") 160 } 161 162 #[tokio::test] 163 async fn status_is_unready_when_transport_is_disabled() -> TestResult<()> { 164 let runtime = build_runtime(|_| {}); 165 166 let status = collect_status_full(&runtime).await?; 167 168 assert_eq!(status.status, MycRuntimeStatus::Unready); 169 assert!(!status.ready); 170 assert_eq!(status.transport.status, MycRuntimeStatus::Unready); 171 assert!( 172 status 173 .reasons 174 .iter() 175 .any(|reason| reason == "transport is disabled") 176 ); 177 Ok(()) 178 } 179 180 #[tokio::test] 181 async fn status_is_degraded_but_ready_when_any_policy_has_one_live_relay() -> TestResult<()> { 182 let relay = TestRelay::spawn().await?; 183 let hanging = HangingRelay::spawn(Duration::from_secs(5)).await?; 184 let runtime = build_runtime(|config| { 185 config.transport.enabled = true; 186 config.transport.relays = vec![relay.url().to_owned(), hanging.url().to_owned()]; 187 config.transport.delivery_policy = MycTransportDeliveryPolicy::Any; 188 }); 189 190 let status = collect_status_full(&runtime).await?; 191 192 assert_eq!(status.status, MycRuntimeStatus::Degraded); 193 assert!(status.ready); 194 assert_eq!(status.transport.status, MycRuntimeStatus::Degraded); 195 assert_eq!(status.transport.available_relay_count, 1); 196 assert_eq!(status.transport.unavailable_relay_count, 1); 197 Ok(()) 198 } 199 200 #[tokio::test] 201 async fn status_is_unready_when_all_policy_cannot_be_satisfied() -> TestResult<()> { 202 let relay = TestRelay::spawn().await?; 203 let hanging = HangingRelay::spawn(Duration::from_secs(5)).await?; 204 let runtime = build_runtime(|config| { 205 config.transport.enabled = true; 206 config.transport.relays = vec![relay.url().to_owned(), hanging.url().to_owned()]; 207 config.transport.delivery_policy = MycTransportDeliveryPolicy::All; 208 }); 209 210 let status = collect_status_full(&runtime).await?; 211 212 assert_eq!(status.status, MycRuntimeStatus::Unready); 213 assert!(!status.ready); 214 assert_eq!(status.transport.status, MycRuntimeStatus::Unready); 215 assert_eq!(status.transport.available_relay_count, 1); 216 assert_eq!(status.transport.required_available_relays, 2); 217 Ok(()) 218 } 219 220 #[tokio::test] 221 async fn status_is_unready_when_critical_delivery_job_is_blocked() -> TestResult<()> { 222 let relay = TestRelay::spawn().await?; 223 let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?; 224 let runtime = build_runtime(|config| { 225 config.transport.enabled = true; 226 config.transport.relays = vec![relay.url().to_owned()]; 227 }); 228 let client_identity = RadrootsIdentity::from_secret_key_str( 229 "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", 230 )?; 231 let manager = runtime.signer_manager()?; 232 let connection = manager.register_connection( 233 RadrootsNostrSignerConnectionDraft::new( 234 client_identity.public_key(), 235 runtime.user_public_identity(), 236 ) 237 .with_connect_secret("blocked-secret") 238 .with_relays(vec![relay_url.clone()]) 239 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired), 240 )?; 241 let workflow = manager.begin_connect_secret_publish_finalization(&connection.connection_id)?; 242 let outbox_record = MycDeliveryOutboxRecord::new( 243 MycDeliveryOutboxKind::ListenerResponsePublish, 244 signed_delivery_event(runtime.signer_identity(), "blocked-listener"), 245 vec![relay_url], 246 )? 247 .with_connection_id(&connection.connection_id) 248 .with_request_id("blocked-request") 249 .with_signer_publish_workflow_id(&workflow.workflow_id); 250 runtime.delivery_outbox_store().enqueue(&outbox_record)?; 251 manager.cancel_publish_workflow(&workflow.workflow_id)?; 252 253 let status = collect_status_full(&runtime).await?; 254 255 assert_eq!(status.transport.status, MycRuntimeStatus::Healthy); 256 assert_eq!(status.status, MycRuntimeStatus::Unready); 257 assert!(!status.ready); 258 assert_eq!(status.delivery_outbox.status, MycRuntimeStatus::Unready); 259 assert!(!status.delivery_outbox.ready); 260 assert_eq!(status.delivery_outbox.unfinished_job_count, 1); 261 assert_eq!(status.delivery_outbox.critical_unfinished_job_count, 1); 262 assert_eq!(status.delivery_outbox.blocked_job_count, 1); 263 assert_eq!(status.delivery_outbox.critical_blocked_job_count, 1); 264 assert!( 265 status 266 .reasons 267 .iter() 268 .any(|reason| reason == "1 critical delivery outbox job(s) are blocked") 269 ); 270 Ok(()) 271 } 272 273 #[tokio::test] 274 async fn status_is_degraded_but_ready_when_only_discovery_job_is_stuck() -> TestResult<()> { 275 let relay = TestRelay::spawn().await?; 276 let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?; 277 let runtime = build_runtime(|config| { 278 config.transport.enabled = true; 279 config.transport.relays = vec![relay.url().to_owned()]; 280 config.transport.connect_timeout_secs = 1; 281 }); 282 let mut outbox_record = MycDeliveryOutboxRecord::new( 283 MycDeliveryOutboxKind::DiscoveryHandlerPublish, 284 signed_delivery_event(runtime.signer_identity(), "stuck-discovery"), 285 vec![relay_url], 286 )? 287 .with_attempt_id("discovery-attempt-1"); 288 let old_timestamp = now_unix_secs().saturating_sub(30); 289 outbox_record.created_at_unix = old_timestamp; 290 outbox_record.updated_at_unix = old_timestamp; 291 runtime.delivery_outbox_store().enqueue(&outbox_record)?; 292 293 let status = collect_status_full(&runtime).await?; 294 295 assert_eq!(status.transport.status, MycRuntimeStatus::Healthy); 296 assert_eq!(status.status, MycRuntimeStatus::Degraded); 297 assert!(status.ready); 298 assert_eq!(status.delivery_outbox.status, MycRuntimeStatus::Degraded); 299 assert!(status.delivery_outbox.ready); 300 assert_eq!(status.delivery_outbox.unfinished_job_count, 1); 301 assert_eq!(status.delivery_outbox.critical_unfinished_job_count, 0); 302 assert_eq!(status.delivery_outbox.blocked_job_count, 1); 303 assert_eq!(status.delivery_outbox.critical_blocked_job_count, 0); 304 assert_eq!(status.delivery_outbox.oldest_blocked_age_secs, Some(30)); 305 assert!( 306 status 307 .reasons 308 .iter() 309 .any(|reason| reason == "1 non-critical delivery outbox job(s) are blocked") 310 ); 311 Ok(()) 312 } 313 314 #[tokio::test] 315 async fn status_surfaces_last_delivery_recovery_result() -> TestResult<()> { 316 let runtime = build_runtime(|_| {}); 317 runtime.record_operation_audit(&MycOperationAuditRecord::new( 318 MycOperationAuditKind::DeliveryRecovery, 319 MycOperationAuditOutcome::Succeeded, 320 None, 321 None, 322 2, 323 2, 324 "recovered 2/2 delivery outbox job(s); republished 1", 325 )); 326 327 let status = collect_status_full(&runtime).await?; 328 let last_recovery = status 329 .delivery_outbox 330 .last_recovery 331 .expect("last delivery recovery"); 332 333 assert_eq!(last_recovery.outcome, MycOperationAuditOutcome::Succeeded); 334 assert_eq!( 335 last_recovery.summary, 336 "recovered 2/2 delivery outbox job(s); republished 1" 337 ); 338 Ok(()) 339 } 340 341 #[tokio::test] 342 async fn status_reports_sqlite_persistence_schema_state() -> TestResult<()> { 343 let runtime = build_runtime(|config| { 344 config.persistence.signer_state_backend = MycSignerStateBackend::Sqlite; 345 config.persistence.runtime_audit_backend = MycRuntimeAuditBackend::Sqlite; 346 }); 347 348 let status = collect_status_full(&runtime).await?; 349 350 assert_eq!( 351 status.persistence.signer_state.backend, 352 MycSignerStateBackend::Sqlite 353 ); 354 assert!(status.persistence.signer_state.exists); 355 assert_eq!( 356 status 357 .persistence 358 .signer_state 359 .sqlite_schema 360 .as_ref() 361 .expect("signer sqlite schema") 362 .applied_migration_count, 363 Some(2) 364 ); 365 assert_eq!( 366 status 367 .persistence 368 .signer_state 369 .sqlite_schema 370 .as_ref() 371 .expect("signer sqlite schema") 372 .journal_mode 373 .as_deref(), 374 Some("wal") 375 ); 376 assert_eq!( 377 status 378 .persistence 379 .signer_state 380 .sqlite_schema 381 .as_ref() 382 .expect("signer sqlite schema") 383 .store_version, 384 Some(1) 385 ); 386 assert_eq!( 387 status.persistence.runtime_audit.backend, 388 MycRuntimeAuditBackend::Sqlite 389 ); 390 assert!(status.persistence.runtime_audit.exists); 391 assert_eq!( 392 status 393 .persistence 394 .runtime_audit 395 .sqlite_schema 396 .as_ref() 397 .expect("audit sqlite schema") 398 .applied_migration_count, 399 Some(1) 400 ); 401 assert_eq!( 402 status 403 .persistence 404 .runtime_audit 405 .sqlite_schema 406 .as_ref() 407 .expect("audit sqlite schema") 408 .latest_migration 409 .as_deref(), 410 Some("0000_runtime_audit_init") 411 ); 412 assert_eq!( 413 status 414 .persistence 415 .runtime_audit 416 .sqlite_schema 417 .as_ref() 418 .expect("audit sqlite schema") 419 .journal_mode 420 .as_deref(), 421 Some("wal") 422 ); 423 Ok(()) 424 }