transport.rs (48550B)
1 use futures::future::BoxFuture; 2 use nostr::JsonUtil; 3 use radroots_event_store::{RadrootsEventStore, RadrootsEventVerificationStatus}; 4 use radroots_events::draft::{RadrootsFrozenEventDraft, RadrootsSignedNostrEvent}; 5 use radroots_events::kinds::KIND_POST; 6 use radroots_nostr::prelude::{ 7 RadrootsNostrKeys, RadrootsNostrSecretKey, RadrootsNostrTimestamp, radroots_nostr_build_event, 8 radroots_nostr_sign_frozen_draft, 9 }; 10 use radroots_outbox::{ 11 RadrootsOutbox, RadrootsOutboxClaimedEvent, RadrootsOutboxEventState, 12 RadrootsOutboxOperationInput, RadrootsOutboxOperationStatus, RadrootsOutboxRelayStatus, 13 }; 14 use radroots_relay_transport::{ 15 RadrootsMockRelayFetchAdapter, RadrootsMockRelayPublishAdapter, RadrootsOutboxPublishPolicy, 16 RadrootsRelayFetchItem, RadrootsRelayFetchOutcomeKind, RadrootsRelayFetchRequest, 17 RadrootsRelayOutcome, RadrootsRelayOutcomeKind, RadrootsRelayPublishAdapter, 18 RadrootsRelayPublishRelayReceipt, RadrootsRelayPublishRequest, RadrootsRelayTargetSet, 19 RadrootsRelayTransportError, RadrootsRelayUrl, RadrootsRelayUrlPolicy, 20 fetch_and_ingest_relay_events, publish_claimed_outbox_event, publish_signed_event, 21 }; 22 use std::net::{IpAddr, Ipv4Addr}; 23 24 const FIXTURE_ALICE_SECRET_KEY_HEX: &str = 25 "10c5304d6c9ae3a1a16f7860f1cc8f5e3a76225a2663b3a989a0d775919b7df5"; 26 const FIXTURE_ALICE_PUBLIC_KEY_HEX: &str = 27 "585591529da0bab31b3b1b1f986611cf5f435dca84f978c89ee8a40cca7103df"; 28 const RELAY_PRIMARY_WSS: &str = "wss://relay.example.com"; 29 const RELAY_SECONDARY_WSS: &str = "wss://relay-2.example.com"; 30 const RELAY_TERTIARY_WSS: &str = "wss://relay-3.example.com"; 31 32 struct TransportFailurePublishAdapter; 33 34 impl RadrootsRelayPublishAdapter for TransportFailurePublishAdapter { 35 fn publish<'a>( 36 &'a self, 37 _request: RadrootsRelayPublishRequest, 38 ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>> 39 { 40 Box::pin(async { 41 Err(RadrootsRelayTransportError::Transport( 42 "adapter boundary unavailable".to_owned(), 43 )) 44 }) 45 } 46 } 47 48 struct NostrJsonFailurePublishAdapter; 49 50 impl RadrootsRelayPublishAdapter for NostrJsonFailurePublishAdapter { 51 fn publish<'a>( 52 &'a self, 53 _request: RadrootsRelayPublishRequest, 54 ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>> 55 { 56 Box::pin(async { 57 Err(RadrootsRelayTransportError::NostrEventJson( 58 "adapter rejected raw event".to_owned(), 59 )) 60 }) 61 } 62 } 63 64 fn fixture_keys() -> RadrootsNostrKeys { 65 let secret_key = 66 RadrootsNostrSecretKey::from_hex(FIXTURE_ALICE_SECRET_KEY_HEX).expect("secret key"); 67 RadrootsNostrKeys::new(secret_key) 68 } 69 70 fn signed_post(content: &str) -> RadrootsSignedNostrEvent { 71 let draft = RadrootsFrozenEventDraft::new( 72 "radroots.social.post.v1", 73 KIND_POST, 74 1_700_000_000, 75 vec![vec!["t".to_owned(), "soil".to_owned()]], 76 content, 77 FIXTURE_ALICE_PUBLIC_KEY_HEX, 78 ) 79 .expect("draft"); 80 radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event") 81 } 82 83 async fn complete_claimed_signing( 84 outbox: &RadrootsOutbox, 85 claimed: &RadrootsOutboxClaimedEvent, 86 now_ms: i64, 87 ) -> RadrootsSignedNostrEvent { 88 if let Some(signed_event) = claimed.signed_event.clone() { 89 return signed_event; 90 } 91 let signed_event = 92 radroots_nostr_sign_frozen_draft(&fixture_keys(), &claimed.draft).expect("signed event"); 93 outbox 94 .complete_signing( 95 claimed.outbox_event_id, 96 claimed.claim_token.as_str(), 97 signed_event, 98 now_ms, 99 ) 100 .await 101 .expect("complete signing") 102 } 103 104 fn unsupported_raw_event() -> String { 105 let event = radroots_nostr_build_event(999, "unsupported", Vec::new()) 106 .expect("event builder") 107 .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_001)) 108 .sign_with_keys(&fixture_keys()) 109 .expect("signed unsupported event"); 110 event.as_json() 111 } 112 113 fn tampered_raw_event() -> String { 114 let signed = signed_post("trusted"); 115 let mut value = 116 serde_json::from_str::<serde_json::Value>(signed.raw_json.as_str()).expect("raw json"); 117 value["content"] = serde_json::Value::String("tampered".to_owned()); 118 serde_json::to_string(&value).expect("tampered json") 119 } 120 121 #[test] 122 fn relay_url_validation_and_target_normalization() { 123 let relay = RadrootsRelayUrl::parse("wss://Relay.Example.com", RadrootsRelayUrlPolicy::Public) 124 .expect("relay"); 125 assert_eq!(relay.as_str(), RELAY_PRIMARY_WSS); 126 assert_eq!(relay.clone().into_string(), RELAY_PRIMARY_WSS); 127 let relay_path = RadrootsRelayUrl::parse( 128 "wss://Relay.Example.com/nostr", 129 RadrootsRelayUrlPolicy::Public, 130 ) 131 .expect("relay path"); 132 assert_eq!(relay_path.as_str(), "wss://relay.example.com/nostr"); 133 134 assert!( 135 RadrootsRelayUrl::parse("ws://127.0.0.1:7777", RadrootsRelayUrlPolicy::Public).is_err() 136 ); 137 let local = RadrootsRelayUrl::parse("ws://localhost:7777", RadrootsRelayUrlPolicy::Localhost) 138 .expect("local relay"); 139 assert_eq!(local.as_str(), "ws://localhost:7777"); 140 let local_ipv4 = 141 RadrootsRelayUrl::parse("ws://127.0.0.1:7777", RadrootsRelayUrlPolicy::Localhost) 142 .expect("local ipv4 relay"); 143 assert_eq!(local_ipv4.as_str(), "ws://127.0.0.1:7777"); 144 let local_ipv6 = RadrootsRelayUrl::parse("ws://[::1]:7777", RadrootsRelayUrlPolicy::Localhost) 145 .expect("local ipv6 relay"); 146 assert_eq!(local_ipv6.as_str(), "ws://[::1]:7777"); 147 assert!( 148 RadrootsRelayUrl::parse("ws://example.com", RadrootsRelayUrlPolicy::Localhost).is_err() 149 ); 150 assert!( 151 RadrootsRelayUrl::parse("ws://192.168.1.10:7777", RadrootsRelayUrlPolicy::Localhost) 152 .is_err() 153 ); 154 assert!(matches!( 155 RadrootsRelayUrl::parse("wss://127.0.0.1", RadrootsRelayUrlPolicy::Public), 156 Err(RadrootsRelayTransportError::RelayUrlForbiddenDestination { .. }) 157 )); 158 assert!(matches!( 159 RadrootsRelayUrl::parse("wss://10.1.2.3", RadrootsRelayUrlPolicy::Public), 160 Err(RadrootsRelayTransportError::RelayUrlForbiddenDestination { .. }) 161 )); 162 assert!(matches!( 163 RadrootsRelayUrl::parse("wss://[::1]", RadrootsRelayUrlPolicy::Public), 164 Err(RadrootsRelayTransportError::RelayUrlForbiddenDestination { .. }) 165 )); 166 assert!(matches!( 167 RadrootsRelayUrl::parse("wss://[fd00::1]", RadrootsRelayUrlPolicy::Public), 168 Err(RadrootsRelayTransportError::RelayUrlForbiddenDestination { .. }) 169 )); 170 let public_relay = 171 RadrootsRelayUrl::parse("wss://relay.example.com", RadrootsRelayUrlPolicy::Public) 172 .expect("public relay"); 173 public_relay 174 .validate_public_resolved_ip_addrs([IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34))]) 175 .expect("public resolved ip"); 176 assert!(matches!( 177 public_relay 178 .validate_public_resolved_ip_addrs([IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10))]), 179 Err(RadrootsRelayTransportError::RelayUrlResolvedForbiddenDestination { .. }) 180 )); 181 182 assert!( 183 RadrootsRelayUrl::parse("https://relay.example.com", RadrootsRelayUrlPolicy::Public) 184 .is_err() 185 ); 186 assert!( 187 RadrootsRelayUrl::parse( 188 "wss://user@relay.example.com", 189 RadrootsRelayUrlPolicy::Public 190 ) 191 .is_err() 192 ); 193 assert!(matches!( 194 RadrootsRelayUrl::parse( 195 "wss://user:password@relay.example.com", 196 RadrootsRelayUrlPolicy::Public 197 ), 198 Err(RadrootsRelayTransportError::RelayUrlUserinfo { .. }) 199 )); 200 assert!(matches!( 201 RadrootsRelayUrl::parse( 202 "wss://:password@relay.example.com", 203 RadrootsRelayUrlPolicy::Public 204 ), 205 Err(RadrootsRelayTransportError::RelayUrlUserinfo { .. }) 206 )); 207 assert!( 208 RadrootsRelayUrl::parse( 209 "wss://relay.example.com:bad", 210 RadrootsRelayUrlPolicy::Public 211 ) 212 .is_err() 213 ); 214 assert!(RadrootsRelayUrl::parse("wss://", RadrootsRelayUrlPolicy::Public).is_err()); 215 assert!(matches!( 216 RadrootsRelayUrl::parse("radroots:relay", RadrootsRelayUrlPolicy::Public), 217 Err(RadrootsRelayTransportError::EmptyRelayHost { .. }) 218 )); 219 assert!(matches!( 220 RadrootsRelayUrl::parse("relay.example.com", RadrootsRelayUrlPolicy::Public), 221 Err(RadrootsRelayTransportError::RelayUrlParse { .. }) 222 )); 223 assert!( 224 RadrootsRelayUrl::parse( 225 "wss://relay.example.com?subscription=1", 226 RadrootsRelayUrlPolicy::Public 227 ) 228 .is_err() 229 ); 230 assert!( 231 RadrootsRelayUrl::parse( 232 "wss://relay.example.com#fragment", 233 RadrootsRelayUrlPolicy::Public 234 ) 235 .is_err() 236 ); 237 238 let targets = RadrootsRelayTargetSet::new( 239 vec![ 240 RELAY_TERTIARY_WSS, 241 RELAY_PRIMARY_WSS, 242 RELAY_PRIMARY_WSS, 243 RELAY_SECONDARY_WSS, 244 ], 245 RadrootsRelayUrlPolicy::Public, 246 ) 247 .expect("targets"); 248 assert_eq!( 249 targets.relay_strings(), 250 vec![ 251 RELAY_TERTIARY_WSS.to_owned(), 252 RELAY_PRIMARY_WSS.to_owned(), 253 RELAY_SECONDARY_WSS.to_owned() 254 ] 255 ); 256 257 let from_urls = RadrootsRelayTargetSet::from_urls(vec![ 258 relay_path.clone(), 259 relay_path.clone(), 260 RadrootsRelayUrl::parse(RELAY_SECONDARY_WSS, RadrootsRelayUrlPolicy::Public) 261 .expect("secondary"), 262 ]) 263 .expect("from urls"); 264 assert_eq!(from_urls.len(), 2); 265 assert!(!from_urls.is_empty()); 266 assert_eq!(from_urls.relays()[0], relay_path); 267 assert_eq!( 268 from_urls.relays()[0].to_string(), 269 "wss://relay.example.com/nostr" 270 ); 271 assert!(matches!( 272 RadrootsRelayTargetSet::new(Vec::<&str>::new(), RadrootsRelayUrlPolicy::Public), 273 Err(RadrootsRelayTransportError::EmptyTargetSet) 274 )); 275 assert!(matches!( 276 RadrootsRelayTargetSet::from_urls(Vec::new()), 277 Err(RadrootsRelayTransportError::EmptyTargetSet) 278 )); 279 } 280 281 #[test] 282 fn outcome_prefix_classification_covers_required_kinds() { 283 let cases = [ 284 ("blocked: policy", RadrootsRelayOutcomeKind::Blocked), 285 ( 286 "rate-limited: slow down", 287 RadrootsRelayOutcomeKind::RateLimited, 288 ), 289 ("invalid: bad event", RadrootsRelayOutcomeKind::Invalid), 290 ("pow: difficulty 24", RadrootsRelayOutcomeKind::PowRequired), 291 ( 292 "restricted: group write denied", 293 RadrootsRelayOutcomeKind::Restricted, 294 ), 295 ( 296 "auth-required: challenge", 297 RadrootsRelayOutcomeKind::AuthRequired, 298 ), 299 ("mute: pubkey muted", RadrootsRelayOutcomeKind::Muted), 300 ( 301 "unsupported: event kind", 302 RadrootsRelayOutcomeKind::Unsupported, 303 ), 304 ( 305 "payment-required: paid relay", 306 RadrootsRelayOutcomeKind::PaymentRequired, 307 ), 308 ( 309 "duplicate: already have it", 310 RadrootsRelayOutcomeKind::DuplicateAccepted, 311 ), 312 ("error: relay failed", RadrootsRelayOutcomeKind::Error), 313 ("timeout: no OK", RadrootsRelayOutcomeKind::Timeout), 314 ("strange relay text", RadrootsRelayOutcomeKind::Unknown), 315 ]; 316 317 for (message, kind) in cases { 318 let outcome = RadrootsRelayOutcome::classify(message); 319 assert_eq!(outcome.kind, kind); 320 } 321 322 assert!(RadrootsRelayOutcome::classify("duplicate: already have it").counts_toward_quorum()); 323 assert!( 324 RadrootsRelayOutcome::skipped_already_accepted("already accepted").counts_toward_quorum() 325 ); 326 assert!(RadrootsRelayOutcome::classify("auth-required: challenge").is_retryable()); 327 assert!(RadrootsRelayOutcome::classify("restricted: denied").is_terminal_failure()); 328 assert!(RadrootsRelayOutcome::relay_url_rejected("unsafe relay").is_terminal_failure()); 329 assert!(RadrootsRelayOutcome::classify("mute: pubkey muted").is_terminal_failure()); 330 } 331 332 #[tokio::test] 333 async fn mock_publish_preserves_exact_raw_json_and_counts_outcomes() { 334 let signed = signed_post("hello"); 335 let targets = RadrootsRelayTargetSet::new( 336 vec![RELAY_PRIMARY_WSS, RELAY_SECONDARY_WSS, RELAY_TERTIARY_WSS], 337 RadrootsRelayUrlPolicy::Public, 338 ) 339 .expect("targets"); 340 let adapter = RadrootsMockRelayPublishAdapter::new() 341 .with_outcome( 342 RELAY_SECONDARY_WSS, 343 RadrootsRelayOutcome::classify("duplicate: already have it"), 344 ) 345 .with_outcome( 346 RELAY_TERTIARY_WSS, 347 RadrootsRelayOutcome::classify("auth-required: challenge"), 348 ); 349 350 let receipt = publish_signed_event( 351 &adapter, 352 radroots_relay_transport::RadrootsRelayPublishRequest::new(signed.clone(), targets, 1_000) 353 .with_accepted_quorum(2), 354 ) 355 .await 356 .expect("publish"); 357 358 assert_eq!(adapter.captured_raw_events(), vec![signed.raw_json]); 359 assert_eq!(receipt.attempted_count, 3); 360 assert_eq!(receipt.accepted_count, 2); 361 assert_eq!(receipt.retryable_count, 1); 362 assert!(receipt.quorum_met); 363 serde_json::to_string(&receipt).expect("receipt json"); 364 } 365 366 #[tokio::test] 367 async fn publish_receipts_track_terminal_skipped_and_adapter_errors() { 368 let signed = signed_post("terminal"); 369 let targets = RadrootsRelayTargetSet::new( 370 vec![RELAY_PRIMARY_WSS, RELAY_SECONDARY_WSS], 371 RadrootsRelayUrlPolicy::Public, 372 ) 373 .expect("targets"); 374 let adapter = RadrootsMockRelayPublishAdapter::new().with_outcome( 375 RELAY_SECONDARY_WSS, 376 RadrootsRelayOutcome::classify("restricted: group write denied"), 377 ); 378 379 let receipt = publish_signed_event( 380 &adapter, 381 RadrootsRelayPublishRequest::new(signed.clone(), targets, 1_050).with_accepted_quorum(2), 382 ) 383 .await 384 .expect("publish"); 385 386 assert_eq!(receipt.event_id, signed.id); 387 assert_eq!(receipt.attempted_count, 2); 388 assert_eq!(receipt.accepted_count, 1); 389 assert_eq!(receipt.retryable_count, 0); 390 assert_eq!(receipt.terminal_count, 1); 391 assert_eq!(receipt.quorum, 2); 392 assert!(!receipt.quorum_met); 393 394 let skipped = RadrootsRelayPublishRelayReceipt::skipped( 395 RELAY_TERTIARY_WSS, 396 RadrootsRelayOutcome::timeout("timeout: no OK"), 397 ); 398 assert_eq!(skipped.relay_url, RELAY_TERTIARY_WSS); 399 assert!(!skipped.attempted); 400 assert_eq!(skipped.outcome.kind, RadrootsRelayOutcomeKind::Timeout); 401 402 let error = publish_signed_event( 403 &TransportFailurePublishAdapter, 404 RadrootsRelayPublishRequest::new( 405 signed, 406 RadrootsRelayTargetSet::new(vec![RELAY_PRIMARY_WSS], RadrootsRelayUrlPolicy::Public) 407 .expect("targets"), 408 1_060, 409 ), 410 ) 411 .await 412 .expect_err("transport failure"); 413 assert!(matches!(error, RadrootsRelayTransportError::Transport(_))); 414 } 415 416 #[tokio::test] 417 async fn fetch_ingests_events_and_records_relay_observations() { 418 let signed = signed_post("hello"); 419 let store = RadrootsEventStore::open_memory().await.expect("store"); 420 let adapter = RadrootsMockRelayFetchAdapter::new(vec![ 421 RadrootsRelayFetchItem::Event { 422 relay_url: RELAY_PRIMARY_WSS.to_owned(), 423 raw_json: signed.raw_json.clone(), 424 observed_at_ms: 1_000, 425 }, 426 RadrootsRelayFetchItem::Event { 427 relay_url: RELAY_PRIMARY_WSS.to_owned(), 428 raw_json: signed.raw_json.clone(), 429 observed_at_ms: 1_001, 430 }, 431 RadrootsRelayFetchItem::Event { 432 relay_url: RELAY_SECONDARY_WSS.to_owned(), 433 raw_json: unsupported_raw_event(), 434 observed_at_ms: 1_002, 435 }, 436 RadrootsRelayFetchItem::Event { 437 relay_url: RELAY_SECONDARY_WSS.to_owned(), 438 raw_json: tampered_raw_event(), 439 observed_at_ms: 1_003, 440 }, 441 RadrootsRelayFetchItem::Event { 442 relay_url: RELAY_TERTIARY_WSS.to_owned(), 443 raw_json: "{not json".to_owned(), 444 observed_at_ms: 1_004, 445 }, 446 RadrootsRelayFetchItem::Eose { 447 relay_url: RELAY_PRIMARY_WSS.to_owned(), 448 }, 449 RadrootsRelayFetchItem::Closed { 450 relay_url: RELAY_SECONDARY_WSS.to_owned(), 451 message: "auth-required: challenge".to_owned(), 452 }, 453 RadrootsRelayFetchItem::Closed { 454 relay_url: RELAY_TERTIARY_WSS.to_owned(), 455 message: "restricted: group write denied".to_owned(), 456 }, 457 RadrootsRelayFetchItem::Notice { 458 relay_url: RELAY_TERTIARY_WSS.to_owned(), 459 message: "notice: test".to_owned(), 460 }, 461 ]); 462 463 let receipt = fetch_and_ingest_relay_events( 464 &adapter, 465 &store, 466 RadrootsRelayFetchRequest::fetch(1_000, 10), 467 ) 468 .await 469 .expect("fetch ingest"); 470 471 assert_eq!(receipt.inserted_count, 3); 472 assert_eq!(receipt.duplicate_count, 1); 473 assert_eq!(receipt.unsupported_count, 1); 474 assert_eq!(receipt.malformed_count, 1); 475 assert_eq!(receipt.eose_count, 1); 476 assert_eq!(receipt.closed_count, 2); 477 assert_eq!(receipt.notice_count, 1); 478 assert_eq!(receipt.relay_outcomes.len(), 4); 479 assert_eq!(receipt.relay_outcomes[0].relay_url, RELAY_PRIMARY_WSS); 480 assert_eq!( 481 receipt.relay_outcomes[0].kind, 482 RadrootsRelayFetchOutcomeKind::Eose 483 ); 484 assert!(receipt.relay_outcomes[0].relay_outcome.is_none()); 485 assert_eq!(receipt.relay_outcomes[1].relay_url, RELAY_SECONDARY_WSS); 486 assert_eq!( 487 receipt.relay_outcomes[1] 488 .relay_outcome 489 .as_ref() 490 .expect("auth outcome") 491 .kind, 492 RadrootsRelayOutcomeKind::AuthRequired 493 ); 494 assert_eq!(receipt.relay_outcomes[2].relay_url, RELAY_TERTIARY_WSS); 495 assert_eq!( 496 receipt.relay_outcomes[2] 497 .relay_outcome 498 .as_ref() 499 .expect("restricted outcome") 500 .kind, 501 RadrootsRelayOutcomeKind::Restricted 502 ); 503 assert_eq!( 504 receipt.relay_outcomes[3].kind, 505 RadrootsRelayFetchOutcomeKind::Notice 506 ); 507 assert!(receipt.relay_outcomes[3].relay_outcome.is_none()); 508 assert_eq!( 509 receipt.events[0].verification_status.as_deref(), 510 Some(RadrootsEventVerificationStatus::Verified.as_str()) 511 ); 512 assert!(receipt.events[0].projection_eligible); 513 assert_eq!( 514 receipt.events[1].verification_status.as_deref(), 515 Some(RadrootsEventVerificationStatus::Verified.as_str()) 516 ); 517 assert!(!receipt.events[1].projection_eligible); 518 assert_eq!( 519 receipt.events[2].verification_status.as_deref(), 520 Some(RadrootsEventVerificationStatus::Verified.as_str()) 521 ); 522 assert!(!receipt.events[2].projection_eligible); 523 assert_eq!( 524 receipt.events[3].verification_status.as_deref(), 525 Some(RadrootsEventVerificationStatus::IdMismatch.as_str()) 526 ); 527 assert!(!receipt.events[3].projection_eligible); 528 assert_eq!(receipt.events[4].verification_status, None); 529 assert!(!receipt.events[4].projection_eligible); 530 531 let observations = store 532 .observations_for_event(signed.id.as_str()) 533 .await 534 .expect("observations"); 535 assert_eq!(observations.len(), 1); 536 assert_eq!(observations[0].relay_url, RELAY_PRIMARY_WSS); 537 assert_eq!(observations[0].observation_count, 2); 538 } 539 540 #[tokio::test] 541 async fn fetch_event_cap_preserves_later_control_outcomes() { 542 let first = signed_post("first capped event"); 543 let skipped = signed_post("skipped capped event"); 544 let store = RadrootsEventStore::open_memory().await.expect("store"); 545 let adapter = RadrootsMockRelayFetchAdapter::new(vec![ 546 RadrootsRelayFetchItem::Event { 547 relay_url: RELAY_PRIMARY_WSS.to_owned(), 548 raw_json: first.raw_json.clone(), 549 observed_at_ms: 1_100, 550 }, 551 RadrootsRelayFetchItem::Event { 552 relay_url: RELAY_PRIMARY_WSS.to_owned(), 553 raw_json: skipped.raw_json, 554 observed_at_ms: 1_101, 555 }, 556 RadrootsRelayFetchItem::Event { 557 relay_url: RELAY_SECONDARY_WSS.to_owned(), 558 raw_json: "{not json".to_owned(), 559 observed_at_ms: 1_102, 560 }, 561 RadrootsRelayFetchItem::Event { 562 relay_url: RELAY_SECONDARY_WSS.to_owned(), 563 raw_json: unsupported_raw_event(), 564 observed_at_ms: 1_103, 565 }, 566 RadrootsRelayFetchItem::Eose { 567 relay_url: RELAY_PRIMARY_WSS.to_owned(), 568 }, 569 RadrootsRelayFetchItem::Closed { 570 relay_url: RELAY_SECONDARY_WSS.to_owned(), 571 message: "auth-required: challenge".to_owned(), 572 }, 573 RadrootsRelayFetchItem::Notice { 574 relay_url: RELAY_TERTIARY_WSS.to_owned(), 575 message: "notice: still visible".to_owned(), 576 }, 577 ]); 578 579 let receipt = 580 fetch_and_ingest_relay_events(&adapter, &store, RadrootsRelayFetchRequest::fetch(1_100, 1)) 581 .await 582 .expect("fetch ingest"); 583 584 assert_eq!(receipt.inserted_count, 1); 585 assert_eq!(receipt.duplicate_count, 0); 586 assert_eq!(receipt.unsupported_count, 0); 587 assert_eq!(receipt.malformed_count, 0); 588 assert_eq!(receipt.events.len(), 1); 589 assert_eq!(receipt.eose_count, 1); 590 assert_eq!(receipt.closed_count, 1); 591 assert_eq!(receipt.notice_count, 1); 592 assert_eq!(receipt.relay_outcomes.len(), 3); 593 assert_eq!( 594 receipt.relay_outcomes[0].kind, 595 RadrootsRelayFetchOutcomeKind::Eose 596 ); 597 assert_eq!( 598 receipt.relay_outcomes[1] 599 .relay_outcome 600 .as_ref() 601 .expect("closed outcome") 602 .kind, 603 RadrootsRelayOutcomeKind::AuthRequired 604 ); 605 assert_eq!( 606 receipt.relay_outcomes[2].kind, 607 RadrootsRelayFetchOutcomeKind::Notice 608 ); 609 } 610 611 #[tokio::test] 612 async fn fetch_subscription_mode_and_store_errors_are_reported() { 613 let signed = signed_post("subscription"); 614 let store = RadrootsEventStore::open_memory().await.expect("store"); 615 let adapter = RadrootsMockRelayFetchAdapter::new(vec![RadrootsRelayFetchItem::Event { 616 relay_url: RELAY_PRIMARY_WSS.to_owned(), 617 raw_json: signed.raw_json.clone(), 618 observed_at_ms: 1_200, 619 }]); 620 621 let receipt = fetch_and_ingest_relay_events( 622 &adapter, 623 &store, 624 RadrootsRelayFetchRequest::subscription(1_200, 10), 625 ) 626 .await 627 .expect("fetch ingest"); 628 629 assert_eq!(receipt.inserted_count, 1); 630 let observations = store 631 .observations_for_event(signed.id.as_str()) 632 .await 633 .expect("observations"); 634 assert_eq!(observations.len(), 1); 635 assert_eq!(observations[0].observation_type, "subscription"); 636 637 let closed_store = RadrootsEventStore::open_memory().await.expect("store"); 638 closed_store.pool().close().await; 639 let adapter = RadrootsMockRelayFetchAdapter::new(vec![RadrootsRelayFetchItem::Event { 640 relay_url: RELAY_PRIMARY_WSS.to_owned(), 641 raw_json: signed.raw_json, 642 observed_at_ms: 1_210, 643 }]); 644 let receipt = fetch_and_ingest_relay_events( 645 &adapter, 646 &closed_store, 647 RadrootsRelayFetchRequest::fetch(1_210, 10), 648 ) 649 .await 650 .expect("fetch ingest"); 651 652 assert_eq!(receipt.inserted_count, 0); 653 assert_eq!(receipt.malformed_count, 1); 654 assert!(receipt.events[0].malformed); 655 assert!(receipt.events[0].message.is_some()); 656 } 657 658 #[tokio::test] 659 async fn outbox_publish_persists_partial_success_and_skips_accepted_retry() { 660 let signed = signed_post("hello"); 661 let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); 662 let store = RadrootsEventStore::open_memory().await.expect("store"); 663 let draft = RadrootsFrozenEventDraft::new( 664 "radroots.social.post.v1", 665 KIND_POST, 666 signed.created_at, 667 signed.tags.clone(), 668 signed.content.clone(), 669 signed.pubkey.as_str(), 670 ) 671 .expect("draft"); 672 let receipt = outbox 673 .enqueue_operation(RadrootsOutboxOperationInput::new( 674 "publish_post", 675 draft, 676 vec![ 677 RELAY_PRIMARY_WSS.to_owned(), 678 RELAY_SECONDARY_WSS.to_owned(), 679 RELAY_TERTIARY_WSS.to_owned(), 680 ], 681 1_000, 682 )) 683 .await 684 .expect("enqueue"); 685 let claimed = outbox 686 .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) 687 .await 688 .expect("claim") 689 .expect("claim"); 690 let signed = complete_claimed_signing(&outbox, &claimed, 1_100).await; 691 outbox.recover_expired_claims(2_001).await.expect("recover"); 692 let publish_claim = outbox 693 .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) 694 .await 695 .expect("claim") 696 .expect("publish claim"); 697 assert_eq!(publish_claim.state, RadrootsOutboxEventState::Publishing); 698 699 let adapter = RadrootsMockRelayPublishAdapter::new() 700 .with_outcome(RELAY_PRIMARY_WSS, RadrootsRelayOutcome::accepted()) 701 .with_outcome( 702 RELAY_SECONDARY_WSS, 703 RadrootsRelayOutcome::timeout("timeout: no OK"), 704 ) 705 .with_outcome( 706 RELAY_TERTIARY_WSS, 707 RadrootsRelayOutcome::duplicate_accepted("duplicate: already have it"), 708 ); 709 let first = publish_claimed_outbox_event( 710 &outbox, 711 &store, 712 &adapter, 713 &publish_claim, 714 RadrootsOutboxPublishPolicy::new(2_500), 715 2_200, 716 ) 717 .await 718 .expect("publish"); 719 720 assert_eq!(first.publish.attempted_count, 3); 721 assert_eq!(first.publish.accepted_count, 2); 722 assert!(!first.publish.quorum_met); 723 let event = outbox 724 .get_event(receipt.outbox_event_id) 725 .await 726 .expect("event") 727 .expect("event"); 728 assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable); 729 assert_eq!(event.accepted_quorum, 3); 730 731 let statuses = outbox 732 .relay_statuses(receipt.outbox_event_id) 733 .await 734 .expect("statuses"); 735 assert_eq!( 736 statuses 737 .iter() 738 .find(|status| status.relay_url == RELAY_PRIMARY_WSS) 739 .expect("primary") 740 .status, 741 RadrootsOutboxRelayStatus::Accepted 742 ); 743 assert_eq!( 744 statuses 745 .iter() 746 .find(|status| status.relay_url == RELAY_SECONDARY_WSS) 747 .expect("secondary") 748 .status, 749 RadrootsOutboxRelayStatus::FailedRetryable 750 ); 751 assert_eq!( 752 statuses 753 .iter() 754 .find(|status| status.relay_url == RELAY_TERTIARY_WSS) 755 .expect("tertiary") 756 .status, 757 RadrootsOutboxRelayStatus::Accepted 758 ); 759 760 let retry_claim = outbox 761 .claim_next_ready_event("publisher", "publish-b", 4_000, 2_500) 762 .await 763 .expect("claim") 764 .expect("retry claim"); 765 let retry_adapter = RadrootsMockRelayPublishAdapter::new() 766 .with_outcome(RELAY_SECONDARY_WSS, RadrootsRelayOutcome::accepted()); 767 let second = publish_claimed_outbox_event( 768 &outbox, 769 &store, 770 &retry_adapter, 771 &retry_claim, 772 RadrootsOutboxPublishPolicy::new(3_000), 773 2_600, 774 ) 775 .await 776 .expect("retry publish"); 777 778 assert_eq!(second.local_ingest.event_id, signed.id); 779 assert_eq!(second.publish.attempted_count, 1); 780 assert_eq!(retry_adapter.captured_raw_events().len(), 1); 781 782 let event = outbox 783 .get_event(receipt.outbox_event_id) 784 .await 785 .expect("event") 786 .expect("event"); 787 assert_eq!(event.state, RadrootsOutboxEventState::Published); 788 assert_eq!(event.accepted_quorum, 3); 789 let operation = outbox 790 .get_operation(receipt.operation_id) 791 .await 792 .expect("operation") 793 .expect("operation"); 794 assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete); 795 796 let observations = store 797 .observations_for_event(signed.id.as_str()) 798 .await 799 .expect("observations"); 800 assert_eq!(observations.len(), 3); 801 } 802 803 #[tokio::test] 804 async fn outbox_publish_transport_failure_releases_retryable_claim() { 805 let signed = signed_post("adapter transport failure"); 806 let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); 807 let store = RadrootsEventStore::open_memory().await.expect("store"); 808 let draft = RadrootsFrozenEventDraft::new( 809 "radroots.social.post.v1", 810 KIND_POST, 811 signed.created_at, 812 signed.tags.clone(), 813 signed.content.clone(), 814 signed.pubkey.as_str(), 815 ) 816 .expect("draft"); 817 let receipt = outbox 818 .enqueue_operation(RadrootsOutboxOperationInput::new( 819 "publish_post", 820 draft, 821 vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()], 822 1_000, 823 )) 824 .await 825 .expect("enqueue"); 826 let claimed = outbox 827 .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) 828 .await 829 .expect("claim") 830 .expect("claim"); 831 complete_claimed_signing(&outbox, &claimed, 1_100).await; 832 outbox.recover_expired_claims(2_001).await.expect("recover"); 833 let publish_claim = outbox 834 .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) 835 .await 836 .expect("claim") 837 .expect("publish claim"); 838 839 let published = publish_claimed_outbox_event( 840 &outbox, 841 &store, 842 &TransportFailurePublishAdapter, 843 &publish_claim, 844 RadrootsOutboxPublishPolicy::new(2_500), 845 2_200, 846 ) 847 .await 848 .expect("publish"); 849 850 assert_eq!(published.publish.attempted_count, 2); 851 assert_eq!(published.publish.accepted_count, 0); 852 assert_eq!(published.publish.retryable_count, 2); 853 assert_eq!(published.publish.terminal_count, 0); 854 assert!(!published.publish.quorum_met); 855 assert!( 856 published 857 .publish 858 .relays 859 .iter() 860 .all(|relay| relay.outcome.kind == RadrootsRelayOutcomeKind::ConnectionFailed) 861 ); 862 863 let event = outbox 864 .get_event(receipt.outbox_event_id) 865 .await 866 .expect("event") 867 .expect("event"); 868 assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable); 869 assert!(event.claim_token.is_none()); 870 assert_eq!(event.next_attempt_after_ms, 2_500); 871 872 let statuses = outbox 873 .relay_statuses(receipt.outbox_event_id) 874 .await 875 .expect("statuses"); 876 assert_eq!(statuses.len(), 2); 877 assert!( 878 statuses 879 .iter() 880 .all(|status| status.status == RadrootsOutboxRelayStatus::FailedRetryable) 881 ); 882 assert!( 883 outbox 884 .claim_next_ready_event("publisher", "publish-b", 4_000, 2_499) 885 .await 886 .expect("early claim") 887 .is_none() 888 ); 889 let retry_claim = outbox 890 .claim_next_ready_event("publisher", "publish-b", 4_000, 2_500) 891 .await 892 .expect("retry claim") 893 .expect("retry claim"); 894 assert_eq!(retry_claim.outbox_event_id, receipt.outbox_event_id); 895 assert_eq!(retry_claim.state, RadrootsOutboxEventState::Publishing); 896 } 897 898 #[tokio::test] 899 async fn outbox_publish_marks_published_without_adapter_when_all_relays_already_accepted() { 900 let signed = signed_post("already accepted"); 901 let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); 902 let store = RadrootsEventStore::open_memory().await.expect("store"); 903 let draft = RadrootsFrozenEventDraft::new( 904 "radroots.social.post.v1", 905 KIND_POST, 906 signed.created_at, 907 signed.tags.clone(), 908 signed.content.clone(), 909 signed.pubkey.as_str(), 910 ) 911 .expect("draft"); 912 let receipt = outbox 913 .enqueue_operation(RadrootsOutboxOperationInput::new( 914 "publish_post", 915 draft, 916 vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()], 917 1_000, 918 )) 919 .await 920 .expect("enqueue"); 921 let claimed = outbox 922 .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) 923 .await 924 .expect("claim") 925 .expect("claim"); 926 let signed = complete_claimed_signing(&outbox, &claimed, 1_100).await; 927 outbox.recover_expired_claims(2_001).await.expect("recover"); 928 let publish_claim = outbox 929 .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) 930 .await 931 .expect("claim") 932 .expect("publish claim"); 933 outbox 934 .mark_relay_accepted( 935 publish_claim.outbox_event_id, 936 publish_claim.claim_token.as_str(), 937 RELAY_PRIMARY_WSS, 938 2_150, 939 ) 940 .await 941 .expect("primary accepted"); 942 outbox 943 .mark_relay_accepted( 944 publish_claim.outbox_event_id, 945 publish_claim.claim_token.as_str(), 946 RELAY_SECONDARY_WSS, 947 2_151, 948 ) 949 .await 950 .expect("secondary accepted"); 951 952 let adapter = RadrootsMockRelayPublishAdapter::new(); 953 let published = publish_claimed_outbox_event( 954 &outbox, 955 &store, 956 &adapter, 957 &publish_claim, 958 RadrootsOutboxPublishPolicy::new(2_500), 959 2_200, 960 ) 961 .await 962 .expect("publish"); 963 964 assert_eq!(published.local_ingest.event_id, signed.id); 965 assert_eq!(published.publish.event_id, signed.id); 966 assert_eq!(published.publish.attempted_count, 0); 967 assert_eq!(published.publish.accepted_count, 2); 968 assert_eq!(published.publish.quorum, 2); 969 assert!(published.publish.quorum_met); 970 assert!(published.publish.relays.is_empty()); 971 assert!(adapter.captured_raw_events().is_empty()); 972 973 let event = outbox 974 .get_event(receipt.outbox_event_id) 975 .await 976 .expect("event") 977 .expect("event"); 978 assert_eq!(event.state, RadrootsOutboxEventState::Published); 979 assert_eq!(event.accepted_quorum, 2); 980 assert!(event.claim_token.is_none()); 981 let operation = outbox 982 .get_operation(receipt.operation_id) 983 .await 984 .expect("operation") 985 .expect("operation"); 986 assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete); 987 } 988 989 #[tokio::test] 990 async fn outbox_publish_uses_persisted_accepted_count_for_explicit_quorum() { 991 let signed = signed_post("explicit quorum already accepted"); 992 let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); 993 let store = RadrootsEventStore::open_memory().await.expect("store"); 994 let draft = RadrootsFrozenEventDraft::new( 995 "radroots.social.post.v1", 996 KIND_POST, 997 signed.created_at, 998 signed.tags.clone(), 999 signed.content.clone(), 1000 signed.pubkey.as_str(), 1001 ) 1002 .expect("draft"); 1003 let receipt = outbox 1004 .enqueue_operation(RadrootsOutboxOperationInput::new( 1005 "publish_post", 1006 draft, 1007 vec![ 1008 RELAY_PRIMARY_WSS.to_owned(), 1009 RELAY_SECONDARY_WSS.to_owned(), 1010 RELAY_TERTIARY_WSS.to_owned(), 1011 ], 1012 1_000, 1013 )) 1014 .await 1015 .expect("enqueue"); 1016 let claimed = outbox 1017 .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) 1018 .await 1019 .expect("claim") 1020 .expect("claim"); 1021 complete_claimed_signing(&outbox, &claimed, 1_100).await; 1022 outbox.recover_expired_claims(2_001).await.expect("recover"); 1023 let publish_claim = outbox 1024 .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) 1025 .await 1026 .expect("claim") 1027 .expect("publish claim"); 1028 outbox 1029 .mark_relay_accepted( 1030 publish_claim.outbox_event_id, 1031 publish_claim.claim_token.as_str(), 1032 RELAY_PRIMARY_WSS, 1033 2_150, 1034 ) 1035 .await 1036 .expect("primary accepted"); 1037 outbox 1038 .mark_relay_accepted( 1039 publish_claim.outbox_event_id, 1040 publish_claim.claim_token.as_str(), 1041 RELAY_SECONDARY_WSS, 1042 2_151, 1043 ) 1044 .await 1045 .expect("secondary accepted"); 1046 1047 let adapter = RadrootsMockRelayPublishAdapter::new(); 1048 let published = publish_claimed_outbox_event( 1049 &outbox, 1050 &store, 1051 &adapter, 1052 &publish_claim, 1053 RadrootsOutboxPublishPolicy::new(2_500).with_accepted_quorum(2), 1054 2_200, 1055 ) 1056 .await 1057 .expect("publish"); 1058 1059 assert_eq!(published.publish.attempted_count, 0); 1060 assert_eq!(published.publish.accepted_count, 2); 1061 assert_eq!(published.publish.quorum, 2); 1062 assert!(published.publish.quorum_met); 1063 assert!(adapter.captured_raw_events().is_empty()); 1064 1065 let event = outbox 1066 .get_event(receipt.outbox_event_id) 1067 .await 1068 .expect("event") 1069 .expect("event"); 1070 assert_eq!(event.state, RadrootsOutboxEventState::Published); 1071 assert_eq!(event.accepted_quorum, 2); 1072 let statuses = outbox 1073 .relay_statuses(receipt.outbox_event_id) 1074 .await 1075 .expect("statuses"); 1076 assert_eq!( 1077 statuses 1078 .iter() 1079 .find(|status| status.relay_url == RELAY_TERTIARY_WSS) 1080 .expect("tertiary") 1081 .status, 1082 RadrootsOutboxRelayStatus::Pending 1083 ); 1084 } 1085 1086 #[tokio::test] 1087 async fn outbox_publish_marks_published_when_policy_quorum_is_met_with_failure_diagnostics() { 1088 let signed = signed_post("quorum"); 1089 let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); 1090 let store = RadrootsEventStore::open_memory().await.expect("store"); 1091 let draft = RadrootsFrozenEventDraft::new( 1092 "radroots.social.post.v1", 1093 KIND_POST, 1094 signed.created_at, 1095 signed.tags.clone(), 1096 signed.content.clone(), 1097 signed.pubkey.as_str(), 1098 ) 1099 .expect("draft"); 1100 let receipt = outbox 1101 .enqueue_operation(RadrootsOutboxOperationInput::new( 1102 "publish_post", 1103 draft, 1104 vec![ 1105 RELAY_PRIMARY_WSS.to_owned(), 1106 RELAY_SECONDARY_WSS.to_owned(), 1107 RELAY_TERTIARY_WSS.to_owned(), 1108 ], 1109 1_000, 1110 )) 1111 .await 1112 .expect("enqueue"); 1113 let claimed = outbox 1114 .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) 1115 .await 1116 .expect("claim") 1117 .expect("claim"); 1118 let signed = complete_claimed_signing(&outbox, &claimed, 1_100).await; 1119 outbox.recover_expired_claims(2_001).await.expect("recover"); 1120 let publish_claim = outbox 1121 .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) 1122 .await 1123 .expect("claim") 1124 .expect("publish claim"); 1125 1126 let adapter = RadrootsMockRelayPublishAdapter::new() 1127 .with_outcome(RELAY_PRIMARY_WSS, RadrootsRelayOutcome::accepted()) 1128 .with_outcome( 1129 RELAY_SECONDARY_WSS, 1130 RadrootsRelayOutcome::duplicate_accepted("duplicate: already have it"), 1131 ) 1132 .with_outcome( 1133 RELAY_TERTIARY_WSS, 1134 RadrootsRelayOutcome::classify("restricted: group write denied"), 1135 ); 1136 let published = publish_claimed_outbox_event( 1137 &outbox, 1138 &store, 1139 &adapter, 1140 &publish_claim, 1141 RadrootsOutboxPublishPolicy::new(2_500).with_accepted_quorum(2), 1142 2_200, 1143 ) 1144 .await 1145 .expect("publish"); 1146 1147 assert_eq!(published.publish.quorum, 2); 1148 assert_eq!(published.publish.accepted_count, 2); 1149 assert_eq!(published.publish.terminal_count, 1); 1150 assert!(published.publish.quorum_met); 1151 1152 let event = outbox 1153 .get_event(receipt.outbox_event_id) 1154 .await 1155 .expect("event") 1156 .expect("event"); 1157 assert_eq!(event.state, RadrootsOutboxEventState::Published); 1158 assert_eq!(event.accepted_quorum, 2); 1159 assert!(event.claim_token.is_none()); 1160 let operation = outbox 1161 .get_operation(receipt.operation_id) 1162 .await 1163 .expect("operation") 1164 .expect("operation"); 1165 assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete); 1166 1167 let statuses = outbox 1168 .relay_statuses(receipt.outbox_event_id) 1169 .await 1170 .expect("statuses"); 1171 assert_eq!( 1172 statuses 1173 .iter() 1174 .find(|status| status.relay_url == RELAY_TERTIARY_WSS) 1175 .expect("tertiary") 1176 .status, 1177 RadrootsOutboxRelayStatus::FailedTerminal 1178 ); 1179 assert!( 1180 outbox 1181 .claim_next_ready_event("publisher", "publish-b", 4_000, 2_300) 1182 .await 1183 .expect("claim") 1184 .is_none() 1185 ); 1186 1187 let observations = store 1188 .observations_for_event(signed.id.as_str()) 1189 .await 1190 .expect("observations"); 1191 assert_eq!(observations.len(), 2); 1192 } 1193 1194 #[tokio::test] 1195 async fn outbox_publish_republishes_accepted_relays_when_policy_requests_it() { 1196 let signed = signed_post("republish accepted"); 1197 let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); 1198 let store = RadrootsEventStore::open_memory().await.expect("store"); 1199 let draft = RadrootsFrozenEventDraft::new( 1200 "radroots.social.post.v1", 1201 KIND_POST, 1202 signed.created_at, 1203 signed.tags.clone(), 1204 signed.content.clone(), 1205 signed.pubkey.as_str(), 1206 ) 1207 .expect("draft"); 1208 let receipt = outbox 1209 .enqueue_operation(RadrootsOutboxOperationInput::new( 1210 "publish_post", 1211 draft, 1212 vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()], 1213 1_000, 1214 )) 1215 .await 1216 .expect("enqueue"); 1217 let claimed = outbox 1218 .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) 1219 .await 1220 .expect("claim") 1221 .expect("claim"); 1222 let signed = complete_claimed_signing(&outbox, &claimed, 1_100).await; 1223 outbox.recover_expired_claims(2_001).await.expect("recover"); 1224 let publish_claim = outbox 1225 .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) 1226 .await 1227 .expect("claim") 1228 .expect("publish claim"); 1229 outbox 1230 .mark_relay_accepted( 1231 publish_claim.outbox_event_id, 1232 publish_claim.claim_token.as_str(), 1233 RELAY_PRIMARY_WSS, 1234 2_150, 1235 ) 1236 .await 1237 .expect("primary accepted"); 1238 1239 let adapter = RadrootsMockRelayPublishAdapter::new() 1240 .with_outcome(RELAY_PRIMARY_WSS, RadrootsRelayOutcome::accepted()) 1241 .with_outcome(RELAY_SECONDARY_WSS, RadrootsRelayOutcome::accepted()); 1242 let published = publish_claimed_outbox_event( 1243 &outbox, 1244 &store, 1245 &adapter, 1246 &publish_claim, 1247 RadrootsOutboxPublishPolicy::new(2_500) 1248 .republish_accepted_relays(true) 1249 .relay_url_policy(RadrootsRelayUrlPolicy::Public), 1250 2_200, 1251 ) 1252 .await 1253 .expect("publish"); 1254 1255 assert_eq!(published.local_ingest.event_id, signed.id); 1256 assert_eq!(published.publish.attempted_count, 2); 1257 assert_eq!(published.publish.accepted_count, 2); 1258 assert_eq!(published.publish.quorum, 1); 1259 assert!(published.publish.quorum_met); 1260 assert_eq!(adapter.captured_raw_events().len(), 1); 1261 1262 let event = outbox 1263 .get_event(receipt.outbox_event_id) 1264 .await 1265 .expect("event") 1266 .expect("event"); 1267 assert_eq!(event.state, RadrootsOutboxEventState::Published); 1268 let statuses = outbox 1269 .relay_statuses(receipt.outbox_event_id) 1270 .await 1271 .expect("statuses"); 1272 assert!( 1273 statuses 1274 .iter() 1275 .all(|status| status.status == RadrootsOutboxRelayStatus::Accepted) 1276 ); 1277 } 1278 1279 #[tokio::test] 1280 async fn outbox_publish_requires_claimed_signed_event() { 1281 let signed = signed_post("missing signature"); 1282 let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); 1283 let store = RadrootsEventStore::open_memory().await.expect("store"); 1284 let draft = RadrootsFrozenEventDraft::new( 1285 "radroots.social.post.v1", 1286 KIND_POST, 1287 signed.created_at, 1288 signed.tags, 1289 signed.content, 1290 signed.pubkey.as_str(), 1291 ) 1292 .expect("draft"); 1293 let receipt = outbox 1294 .enqueue_operation(RadrootsOutboxOperationInput::new( 1295 "publish_post", 1296 draft, 1297 vec![RELAY_PRIMARY_WSS.to_owned()], 1298 1_000, 1299 )) 1300 .await 1301 .expect("enqueue"); 1302 let claimed = outbox 1303 .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) 1304 .await 1305 .expect("claim") 1306 .expect("claim"); 1307 let adapter = RadrootsMockRelayPublishAdapter::new(); 1308 1309 let error = publish_claimed_outbox_event( 1310 &outbox, 1311 &store, 1312 &adapter, 1313 &claimed, 1314 RadrootsOutboxPublishPolicy::new(2_500), 1315 1_100, 1316 ) 1317 .await 1318 .expect_err("missing signed event"); 1319 1320 assert!(matches!( 1321 error, 1322 RadrootsRelayTransportError::MissingSignedOutboxEvent(event_id) 1323 if event_id == receipt.outbox_event_id 1324 )); 1325 assert!(adapter.captured_raw_events().is_empty()); 1326 } 1327 1328 #[tokio::test] 1329 async fn outbox_publish_propagates_non_transport_adapter_errors_after_target_filtering() { 1330 let signed = signed_post("adapter non transport failure"); 1331 let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); 1332 let store = RadrootsEventStore::open_memory().await.expect("store"); 1333 let draft = RadrootsFrozenEventDraft::new( 1334 "radroots.social.post.v1", 1335 KIND_POST, 1336 signed.created_at, 1337 signed.tags, 1338 signed.content, 1339 signed.pubkey.as_str(), 1340 ) 1341 .expect("draft"); 1342 let receipt = outbox 1343 .enqueue_operation(RadrootsOutboxOperationInput::new( 1344 "publish_post", 1345 draft, 1346 vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()], 1347 1_000, 1348 )) 1349 .await 1350 .expect("enqueue"); 1351 let claimed = outbox 1352 .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) 1353 .await 1354 .expect("claim") 1355 .expect("claim"); 1356 complete_claimed_signing(&outbox, &claimed, 1_100).await; 1357 outbox.recover_expired_claims(2_001).await.expect("recover"); 1358 let mut publish_claim = outbox 1359 .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) 1360 .await 1361 .expect("claim") 1362 .expect("publish claim"); 1363 publish_claim.target_relays = vec![RELAY_PRIMARY_WSS.to_owned()]; 1364 1365 let error = publish_claimed_outbox_event( 1366 &outbox, 1367 &store, 1368 &NostrJsonFailurePublishAdapter, 1369 &publish_claim, 1370 RadrootsOutboxPublishPolicy::new(2_500), 1371 2_200, 1372 ) 1373 .await 1374 .expect_err("adapter error"); 1375 1376 assert!(matches!( 1377 error, 1378 RadrootsRelayTransportError::NostrEventJson(_) 1379 )); 1380 let event = outbox 1381 .get_event(receipt.outbox_event_id) 1382 .await 1383 .expect("event") 1384 .expect("event"); 1385 assert_eq!(event.accepted_quorum, 1); 1386 } 1387 1388 #[tokio::test] 1389 async fn smoke_relay_fetch_processes_one_thousand_event_receipts() { 1390 let store = RadrootsEventStore::open_memory().await.expect("store"); 1391 let mut items = Vec::new(); 1392 for index in 0..1_000 { 1393 let signed = signed_post(format!("fetch-smoke-{index}").as_str()); 1394 let relay_url = match index % 3 { 1395 0 => RELAY_PRIMARY_WSS, 1396 1 => RELAY_SECONDARY_WSS, 1397 _ => RELAY_TERTIARY_WSS, 1398 }; 1399 items.push(RadrootsRelayFetchItem::Event { 1400 relay_url: relay_url.to_owned(), 1401 raw_json: signed.raw_json, 1402 observed_at_ms: 10_000 + index, 1403 }); 1404 } 1405 let adapter = RadrootsMockRelayFetchAdapter::new(items); 1406 let receipt = fetch_and_ingest_relay_events( 1407 &adapter, 1408 &store, 1409 RadrootsRelayFetchRequest::fetch(10_000, 1_000), 1410 ) 1411 .await 1412 .expect("fetch"); 1413 1414 assert_eq!(receipt.inserted_count, 1_000); 1415 assert_eq!(receipt.duplicate_count, 0); 1416 assert_eq!(receipt.malformed_count, 0); 1417 assert_eq!(receipt.unsupported_count, 0); 1418 assert_eq!(receipt.events.len(), 1_000); 1419 assert!(receipt.events.iter().all(|event| event.projection_eligible)); 1420 let replay = store 1421 .events_since_cursor("fetch-smoke", 1_000) 1422 .await 1423 .expect("replay"); 1424 assert_eq!(replay.len(), 1_000); 1425 }