lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

transport.rs (48550B)


      1 use futures::future::BoxFuture;
      2 use nostr::JsonUtil;
      3 use radroots_event_store::{RadrootsEventStore, RadrootsEventVerificationStatus};
      4 use radroots_events::draft::{RadrootsFrozenEventDraft, RadrootsSignedNostrEvent};
      5 use radroots_events::kinds::KIND_POST;
      6 use radroots_nostr::prelude::{
      7     RadrootsNostrKeys, RadrootsNostrSecretKey, RadrootsNostrTimestamp, radroots_nostr_build_event,
      8     radroots_nostr_sign_frozen_draft,
      9 };
     10 use radroots_outbox::{
     11     RadrootsOutbox, RadrootsOutboxClaimedEvent, RadrootsOutboxEventState,
     12     RadrootsOutboxOperationInput, RadrootsOutboxOperationStatus, RadrootsOutboxRelayStatus,
     13 };
     14 use radroots_relay_transport::{
     15     RadrootsMockRelayFetchAdapter, RadrootsMockRelayPublishAdapter, RadrootsOutboxPublishPolicy,
     16     RadrootsRelayFetchItem, RadrootsRelayFetchOutcomeKind, RadrootsRelayFetchRequest,
     17     RadrootsRelayOutcome, RadrootsRelayOutcomeKind, RadrootsRelayPublishAdapter,
     18     RadrootsRelayPublishRelayReceipt, RadrootsRelayPublishRequest, RadrootsRelayTargetSet,
     19     RadrootsRelayTransportError, RadrootsRelayUrl, RadrootsRelayUrlPolicy,
     20     fetch_and_ingest_relay_events, publish_claimed_outbox_event, publish_signed_event,
     21 };
     22 use std::net::{IpAddr, Ipv4Addr};
     23 
     24 const FIXTURE_ALICE_SECRET_KEY_HEX: &str =
     25     "10c5304d6c9ae3a1a16f7860f1cc8f5e3a76225a2663b3a989a0d775919b7df5";
     26 const FIXTURE_ALICE_PUBLIC_KEY_HEX: &str =
     27     "585591529da0bab31b3b1b1f986611cf5f435dca84f978c89ee8a40cca7103df";
     28 const RELAY_PRIMARY_WSS: &str = "wss://relay.example.com";
     29 const RELAY_SECONDARY_WSS: &str = "wss://relay-2.example.com";
     30 const RELAY_TERTIARY_WSS: &str = "wss://relay-3.example.com";
     31 
     32 struct TransportFailurePublishAdapter;
     33 
     34 impl RadrootsRelayPublishAdapter for TransportFailurePublishAdapter {
     35     fn publish<'a>(
     36         &'a self,
     37         _request: RadrootsRelayPublishRequest,
     38     ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>>
     39     {
     40         Box::pin(async {
     41             Err(RadrootsRelayTransportError::Transport(
     42                 "adapter boundary unavailable".to_owned(),
     43             ))
     44         })
     45     }
     46 }
     47 
     48 struct NostrJsonFailurePublishAdapter;
     49 
     50 impl RadrootsRelayPublishAdapter for NostrJsonFailurePublishAdapter {
     51     fn publish<'a>(
     52         &'a self,
     53         _request: RadrootsRelayPublishRequest,
     54     ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>>
     55     {
     56         Box::pin(async {
     57             Err(RadrootsRelayTransportError::NostrEventJson(
     58                 "adapter rejected raw event".to_owned(),
     59             ))
     60         })
     61     }
     62 }
     63 
     64 fn fixture_keys() -> RadrootsNostrKeys {
     65     let secret_key =
     66         RadrootsNostrSecretKey::from_hex(FIXTURE_ALICE_SECRET_KEY_HEX).expect("secret key");
     67     RadrootsNostrKeys::new(secret_key)
     68 }
     69 
     70 fn signed_post(content: &str) -> RadrootsSignedNostrEvent {
     71     let draft = RadrootsFrozenEventDraft::new(
     72         "radroots.social.post.v1",
     73         KIND_POST,
     74         1_700_000_000,
     75         vec![vec!["t".to_owned(), "soil".to_owned()]],
     76         content,
     77         FIXTURE_ALICE_PUBLIC_KEY_HEX,
     78     )
     79     .expect("draft");
     80     radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event")
     81 }
     82 
     83 async fn complete_claimed_signing(
     84     outbox: &RadrootsOutbox,
     85     claimed: &RadrootsOutboxClaimedEvent,
     86     now_ms: i64,
     87 ) -> RadrootsSignedNostrEvent {
     88     if let Some(signed_event) = claimed.signed_event.clone() {
     89         return signed_event;
     90     }
     91     let signed_event =
     92         radroots_nostr_sign_frozen_draft(&fixture_keys(), &claimed.draft).expect("signed event");
     93     outbox
     94         .complete_signing(
     95             claimed.outbox_event_id,
     96             claimed.claim_token.as_str(),
     97             signed_event,
     98             now_ms,
     99         )
    100         .await
    101         .expect("complete signing")
    102 }
    103 
    104 fn unsupported_raw_event() -> String {
    105     let event = radroots_nostr_build_event(999, "unsupported", Vec::new())
    106         .expect("event builder")
    107         .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_001))
    108         .sign_with_keys(&fixture_keys())
    109         .expect("signed unsupported event");
    110     event.as_json()
    111 }
    112 
    113 fn tampered_raw_event() -> String {
    114     let signed = signed_post("trusted");
    115     let mut value =
    116         serde_json::from_str::<serde_json::Value>(signed.raw_json.as_str()).expect("raw json");
    117     value["content"] = serde_json::Value::String("tampered".to_owned());
    118     serde_json::to_string(&value).expect("tampered json")
    119 }
    120 
    121 #[test]
    122 fn relay_url_validation_and_target_normalization() {
    123     let relay = RadrootsRelayUrl::parse("wss://Relay.Example.com", RadrootsRelayUrlPolicy::Public)
    124         .expect("relay");
    125     assert_eq!(relay.as_str(), RELAY_PRIMARY_WSS);
    126     assert_eq!(relay.clone().into_string(), RELAY_PRIMARY_WSS);
    127     let relay_path = RadrootsRelayUrl::parse(
    128         "wss://Relay.Example.com/nostr",
    129         RadrootsRelayUrlPolicy::Public,
    130     )
    131     .expect("relay path");
    132     assert_eq!(relay_path.as_str(), "wss://relay.example.com/nostr");
    133 
    134     assert!(
    135         RadrootsRelayUrl::parse("ws://127.0.0.1:7777", RadrootsRelayUrlPolicy::Public).is_err()
    136     );
    137     let local = RadrootsRelayUrl::parse("ws://localhost:7777", RadrootsRelayUrlPolicy::Localhost)
    138         .expect("local relay");
    139     assert_eq!(local.as_str(), "ws://localhost:7777");
    140     let local_ipv4 =
    141         RadrootsRelayUrl::parse("ws://127.0.0.1:7777", RadrootsRelayUrlPolicy::Localhost)
    142             .expect("local ipv4 relay");
    143     assert_eq!(local_ipv4.as_str(), "ws://127.0.0.1:7777");
    144     let local_ipv6 = RadrootsRelayUrl::parse("ws://[::1]:7777", RadrootsRelayUrlPolicy::Localhost)
    145         .expect("local ipv6 relay");
    146     assert_eq!(local_ipv6.as_str(), "ws://[::1]:7777");
    147     assert!(
    148         RadrootsRelayUrl::parse("ws://example.com", RadrootsRelayUrlPolicy::Localhost).is_err()
    149     );
    150     assert!(
    151         RadrootsRelayUrl::parse("ws://192.168.1.10:7777", RadrootsRelayUrlPolicy::Localhost)
    152             .is_err()
    153     );
    154     assert!(matches!(
    155         RadrootsRelayUrl::parse("wss://127.0.0.1", RadrootsRelayUrlPolicy::Public),
    156         Err(RadrootsRelayTransportError::RelayUrlForbiddenDestination { .. })
    157     ));
    158     assert!(matches!(
    159         RadrootsRelayUrl::parse("wss://10.1.2.3", RadrootsRelayUrlPolicy::Public),
    160         Err(RadrootsRelayTransportError::RelayUrlForbiddenDestination { .. })
    161     ));
    162     assert!(matches!(
    163         RadrootsRelayUrl::parse("wss://[::1]", RadrootsRelayUrlPolicy::Public),
    164         Err(RadrootsRelayTransportError::RelayUrlForbiddenDestination { .. })
    165     ));
    166     assert!(matches!(
    167         RadrootsRelayUrl::parse("wss://[fd00::1]", RadrootsRelayUrlPolicy::Public),
    168         Err(RadrootsRelayTransportError::RelayUrlForbiddenDestination { .. })
    169     ));
    170     let public_relay =
    171         RadrootsRelayUrl::parse("wss://relay.example.com", RadrootsRelayUrlPolicy::Public)
    172             .expect("public relay");
    173     public_relay
    174         .validate_public_resolved_ip_addrs([IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34))])
    175         .expect("public resolved ip");
    176     assert!(matches!(
    177         public_relay
    178             .validate_public_resolved_ip_addrs([IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10))]),
    179         Err(RadrootsRelayTransportError::RelayUrlResolvedForbiddenDestination { .. })
    180     ));
    181 
    182     assert!(
    183         RadrootsRelayUrl::parse("https://relay.example.com", RadrootsRelayUrlPolicy::Public)
    184             .is_err()
    185     );
    186     assert!(
    187         RadrootsRelayUrl::parse(
    188             "wss://user@relay.example.com",
    189             RadrootsRelayUrlPolicy::Public
    190         )
    191         .is_err()
    192     );
    193     assert!(matches!(
    194         RadrootsRelayUrl::parse(
    195             "wss://user:password@relay.example.com",
    196             RadrootsRelayUrlPolicy::Public
    197         ),
    198         Err(RadrootsRelayTransportError::RelayUrlUserinfo { .. })
    199     ));
    200     assert!(matches!(
    201         RadrootsRelayUrl::parse(
    202             "wss://:password@relay.example.com",
    203             RadrootsRelayUrlPolicy::Public
    204         ),
    205         Err(RadrootsRelayTransportError::RelayUrlUserinfo { .. })
    206     ));
    207     assert!(
    208         RadrootsRelayUrl::parse(
    209             "wss://relay.example.com:bad",
    210             RadrootsRelayUrlPolicy::Public
    211         )
    212         .is_err()
    213     );
    214     assert!(RadrootsRelayUrl::parse("wss://", RadrootsRelayUrlPolicy::Public).is_err());
    215     assert!(matches!(
    216         RadrootsRelayUrl::parse("radroots:relay", RadrootsRelayUrlPolicy::Public),
    217         Err(RadrootsRelayTransportError::EmptyRelayHost { .. })
    218     ));
    219     assert!(matches!(
    220         RadrootsRelayUrl::parse("relay.example.com", RadrootsRelayUrlPolicy::Public),
    221         Err(RadrootsRelayTransportError::RelayUrlParse { .. })
    222     ));
    223     assert!(
    224         RadrootsRelayUrl::parse(
    225             "wss://relay.example.com?subscription=1",
    226             RadrootsRelayUrlPolicy::Public
    227         )
    228         .is_err()
    229     );
    230     assert!(
    231         RadrootsRelayUrl::parse(
    232             "wss://relay.example.com#fragment",
    233             RadrootsRelayUrlPolicy::Public
    234         )
    235         .is_err()
    236     );
    237 
    238     let targets = RadrootsRelayTargetSet::new(
    239         vec![
    240             RELAY_TERTIARY_WSS,
    241             RELAY_PRIMARY_WSS,
    242             RELAY_PRIMARY_WSS,
    243             RELAY_SECONDARY_WSS,
    244         ],
    245         RadrootsRelayUrlPolicy::Public,
    246     )
    247     .expect("targets");
    248     assert_eq!(
    249         targets.relay_strings(),
    250         vec![
    251             RELAY_TERTIARY_WSS.to_owned(),
    252             RELAY_PRIMARY_WSS.to_owned(),
    253             RELAY_SECONDARY_WSS.to_owned()
    254         ]
    255     );
    256 
    257     let from_urls = RadrootsRelayTargetSet::from_urls(vec![
    258         relay_path.clone(),
    259         relay_path.clone(),
    260         RadrootsRelayUrl::parse(RELAY_SECONDARY_WSS, RadrootsRelayUrlPolicy::Public)
    261             .expect("secondary"),
    262     ])
    263     .expect("from urls");
    264     assert_eq!(from_urls.len(), 2);
    265     assert!(!from_urls.is_empty());
    266     assert_eq!(from_urls.relays()[0], relay_path);
    267     assert_eq!(
    268         from_urls.relays()[0].to_string(),
    269         "wss://relay.example.com/nostr"
    270     );
    271     assert!(matches!(
    272         RadrootsRelayTargetSet::new(Vec::<&str>::new(), RadrootsRelayUrlPolicy::Public),
    273         Err(RadrootsRelayTransportError::EmptyTargetSet)
    274     ));
    275     assert!(matches!(
    276         RadrootsRelayTargetSet::from_urls(Vec::new()),
    277         Err(RadrootsRelayTransportError::EmptyTargetSet)
    278     ));
    279 }
    280 
    281 #[test]
    282 fn outcome_prefix_classification_covers_required_kinds() {
    283     let cases = [
    284         ("blocked: policy", RadrootsRelayOutcomeKind::Blocked),
    285         (
    286             "rate-limited: slow down",
    287             RadrootsRelayOutcomeKind::RateLimited,
    288         ),
    289         ("invalid: bad event", RadrootsRelayOutcomeKind::Invalid),
    290         ("pow: difficulty 24", RadrootsRelayOutcomeKind::PowRequired),
    291         (
    292             "restricted: group write denied",
    293             RadrootsRelayOutcomeKind::Restricted,
    294         ),
    295         (
    296             "auth-required: challenge",
    297             RadrootsRelayOutcomeKind::AuthRequired,
    298         ),
    299         ("mute: pubkey muted", RadrootsRelayOutcomeKind::Muted),
    300         (
    301             "unsupported: event kind",
    302             RadrootsRelayOutcomeKind::Unsupported,
    303         ),
    304         (
    305             "payment-required: paid relay",
    306             RadrootsRelayOutcomeKind::PaymentRequired,
    307         ),
    308         (
    309             "duplicate: already have it",
    310             RadrootsRelayOutcomeKind::DuplicateAccepted,
    311         ),
    312         ("error: relay failed", RadrootsRelayOutcomeKind::Error),
    313         ("timeout: no OK", RadrootsRelayOutcomeKind::Timeout),
    314         ("strange relay text", RadrootsRelayOutcomeKind::Unknown),
    315     ];
    316 
    317     for (message, kind) in cases {
    318         let outcome = RadrootsRelayOutcome::classify(message);
    319         assert_eq!(outcome.kind, kind);
    320     }
    321 
    322     assert!(RadrootsRelayOutcome::classify("duplicate: already have it").counts_toward_quorum());
    323     assert!(
    324         RadrootsRelayOutcome::skipped_already_accepted("already accepted").counts_toward_quorum()
    325     );
    326     assert!(RadrootsRelayOutcome::classify("auth-required: challenge").is_retryable());
    327     assert!(RadrootsRelayOutcome::classify("restricted: denied").is_terminal_failure());
    328     assert!(RadrootsRelayOutcome::relay_url_rejected("unsafe relay").is_terminal_failure());
    329     assert!(RadrootsRelayOutcome::classify("mute: pubkey muted").is_terminal_failure());
    330 }
    331 
    332 #[tokio::test]
    333 async fn mock_publish_preserves_exact_raw_json_and_counts_outcomes() {
    334     let signed = signed_post("hello");
    335     let targets = RadrootsRelayTargetSet::new(
    336         vec![RELAY_PRIMARY_WSS, RELAY_SECONDARY_WSS, RELAY_TERTIARY_WSS],
    337         RadrootsRelayUrlPolicy::Public,
    338     )
    339     .expect("targets");
    340     let adapter = RadrootsMockRelayPublishAdapter::new()
    341         .with_outcome(
    342             RELAY_SECONDARY_WSS,
    343             RadrootsRelayOutcome::classify("duplicate: already have it"),
    344         )
    345         .with_outcome(
    346             RELAY_TERTIARY_WSS,
    347             RadrootsRelayOutcome::classify("auth-required: challenge"),
    348         );
    349 
    350     let receipt = publish_signed_event(
    351         &adapter,
    352         radroots_relay_transport::RadrootsRelayPublishRequest::new(signed.clone(), targets, 1_000)
    353             .with_accepted_quorum(2),
    354     )
    355     .await
    356     .expect("publish");
    357 
    358     assert_eq!(adapter.captured_raw_events(), vec![signed.raw_json]);
    359     assert_eq!(receipt.attempted_count, 3);
    360     assert_eq!(receipt.accepted_count, 2);
    361     assert_eq!(receipt.retryable_count, 1);
    362     assert!(receipt.quorum_met);
    363     serde_json::to_string(&receipt).expect("receipt json");
    364 }
    365 
    366 #[tokio::test]
    367 async fn publish_receipts_track_terminal_skipped_and_adapter_errors() {
    368     let signed = signed_post("terminal");
    369     let targets = RadrootsRelayTargetSet::new(
    370         vec![RELAY_PRIMARY_WSS, RELAY_SECONDARY_WSS],
    371         RadrootsRelayUrlPolicy::Public,
    372     )
    373     .expect("targets");
    374     let adapter = RadrootsMockRelayPublishAdapter::new().with_outcome(
    375         RELAY_SECONDARY_WSS,
    376         RadrootsRelayOutcome::classify("restricted: group write denied"),
    377     );
    378 
    379     let receipt = publish_signed_event(
    380         &adapter,
    381         RadrootsRelayPublishRequest::new(signed.clone(), targets, 1_050).with_accepted_quorum(2),
    382     )
    383     .await
    384     .expect("publish");
    385 
    386     assert_eq!(receipt.event_id, signed.id);
    387     assert_eq!(receipt.attempted_count, 2);
    388     assert_eq!(receipt.accepted_count, 1);
    389     assert_eq!(receipt.retryable_count, 0);
    390     assert_eq!(receipt.terminal_count, 1);
    391     assert_eq!(receipt.quorum, 2);
    392     assert!(!receipt.quorum_met);
    393 
    394     let skipped = RadrootsRelayPublishRelayReceipt::skipped(
    395         RELAY_TERTIARY_WSS,
    396         RadrootsRelayOutcome::timeout("timeout: no OK"),
    397     );
    398     assert_eq!(skipped.relay_url, RELAY_TERTIARY_WSS);
    399     assert!(!skipped.attempted);
    400     assert_eq!(skipped.outcome.kind, RadrootsRelayOutcomeKind::Timeout);
    401 
    402     let error = publish_signed_event(
    403         &TransportFailurePublishAdapter,
    404         RadrootsRelayPublishRequest::new(
    405             signed,
    406             RadrootsRelayTargetSet::new(vec![RELAY_PRIMARY_WSS], RadrootsRelayUrlPolicy::Public)
    407                 .expect("targets"),
    408             1_060,
    409         ),
    410     )
    411     .await
    412     .expect_err("transport failure");
    413     assert!(matches!(error, RadrootsRelayTransportError::Transport(_)));
    414 }
    415 
    416 #[tokio::test]
    417 async fn fetch_ingests_events_and_records_relay_observations() {
    418     let signed = signed_post("hello");
    419     let store = RadrootsEventStore::open_memory().await.expect("store");
    420     let adapter = RadrootsMockRelayFetchAdapter::new(vec![
    421         RadrootsRelayFetchItem::Event {
    422             relay_url: RELAY_PRIMARY_WSS.to_owned(),
    423             raw_json: signed.raw_json.clone(),
    424             observed_at_ms: 1_000,
    425         },
    426         RadrootsRelayFetchItem::Event {
    427             relay_url: RELAY_PRIMARY_WSS.to_owned(),
    428             raw_json: signed.raw_json.clone(),
    429             observed_at_ms: 1_001,
    430         },
    431         RadrootsRelayFetchItem::Event {
    432             relay_url: RELAY_SECONDARY_WSS.to_owned(),
    433             raw_json: unsupported_raw_event(),
    434             observed_at_ms: 1_002,
    435         },
    436         RadrootsRelayFetchItem::Event {
    437             relay_url: RELAY_SECONDARY_WSS.to_owned(),
    438             raw_json: tampered_raw_event(),
    439             observed_at_ms: 1_003,
    440         },
    441         RadrootsRelayFetchItem::Event {
    442             relay_url: RELAY_TERTIARY_WSS.to_owned(),
    443             raw_json: "{not json".to_owned(),
    444             observed_at_ms: 1_004,
    445         },
    446         RadrootsRelayFetchItem::Eose {
    447             relay_url: RELAY_PRIMARY_WSS.to_owned(),
    448         },
    449         RadrootsRelayFetchItem::Closed {
    450             relay_url: RELAY_SECONDARY_WSS.to_owned(),
    451             message: "auth-required: challenge".to_owned(),
    452         },
    453         RadrootsRelayFetchItem::Closed {
    454             relay_url: RELAY_TERTIARY_WSS.to_owned(),
    455             message: "restricted: group write denied".to_owned(),
    456         },
    457         RadrootsRelayFetchItem::Notice {
    458             relay_url: RELAY_TERTIARY_WSS.to_owned(),
    459             message: "notice: test".to_owned(),
    460         },
    461     ]);
    462 
    463     let receipt = fetch_and_ingest_relay_events(
    464         &adapter,
    465         &store,
    466         RadrootsRelayFetchRequest::fetch(1_000, 10),
    467     )
    468     .await
    469     .expect("fetch ingest");
    470 
    471     assert_eq!(receipt.inserted_count, 3);
    472     assert_eq!(receipt.duplicate_count, 1);
    473     assert_eq!(receipt.unsupported_count, 1);
    474     assert_eq!(receipt.malformed_count, 1);
    475     assert_eq!(receipt.eose_count, 1);
    476     assert_eq!(receipt.closed_count, 2);
    477     assert_eq!(receipt.notice_count, 1);
    478     assert_eq!(receipt.relay_outcomes.len(), 4);
    479     assert_eq!(receipt.relay_outcomes[0].relay_url, RELAY_PRIMARY_WSS);
    480     assert_eq!(
    481         receipt.relay_outcomes[0].kind,
    482         RadrootsRelayFetchOutcomeKind::Eose
    483     );
    484     assert!(receipt.relay_outcomes[0].relay_outcome.is_none());
    485     assert_eq!(receipt.relay_outcomes[1].relay_url, RELAY_SECONDARY_WSS);
    486     assert_eq!(
    487         receipt.relay_outcomes[1]
    488             .relay_outcome
    489             .as_ref()
    490             .expect("auth outcome")
    491             .kind,
    492         RadrootsRelayOutcomeKind::AuthRequired
    493     );
    494     assert_eq!(receipt.relay_outcomes[2].relay_url, RELAY_TERTIARY_WSS);
    495     assert_eq!(
    496         receipt.relay_outcomes[2]
    497             .relay_outcome
    498             .as_ref()
    499             .expect("restricted outcome")
    500             .kind,
    501         RadrootsRelayOutcomeKind::Restricted
    502     );
    503     assert_eq!(
    504         receipt.relay_outcomes[3].kind,
    505         RadrootsRelayFetchOutcomeKind::Notice
    506     );
    507     assert!(receipt.relay_outcomes[3].relay_outcome.is_none());
    508     assert_eq!(
    509         receipt.events[0].verification_status.as_deref(),
    510         Some(RadrootsEventVerificationStatus::Verified.as_str())
    511     );
    512     assert!(receipt.events[0].projection_eligible);
    513     assert_eq!(
    514         receipt.events[1].verification_status.as_deref(),
    515         Some(RadrootsEventVerificationStatus::Verified.as_str())
    516     );
    517     assert!(!receipt.events[1].projection_eligible);
    518     assert_eq!(
    519         receipt.events[2].verification_status.as_deref(),
    520         Some(RadrootsEventVerificationStatus::Verified.as_str())
    521     );
    522     assert!(!receipt.events[2].projection_eligible);
    523     assert_eq!(
    524         receipt.events[3].verification_status.as_deref(),
    525         Some(RadrootsEventVerificationStatus::IdMismatch.as_str())
    526     );
    527     assert!(!receipt.events[3].projection_eligible);
    528     assert_eq!(receipt.events[4].verification_status, None);
    529     assert!(!receipt.events[4].projection_eligible);
    530 
    531     let observations = store
    532         .observations_for_event(signed.id.as_str())
    533         .await
    534         .expect("observations");
    535     assert_eq!(observations.len(), 1);
    536     assert_eq!(observations[0].relay_url, RELAY_PRIMARY_WSS);
    537     assert_eq!(observations[0].observation_count, 2);
    538 }
    539 
    540 #[tokio::test]
    541 async fn fetch_event_cap_preserves_later_control_outcomes() {
    542     let first = signed_post("first capped event");
    543     let skipped = signed_post("skipped capped event");
    544     let store = RadrootsEventStore::open_memory().await.expect("store");
    545     let adapter = RadrootsMockRelayFetchAdapter::new(vec![
    546         RadrootsRelayFetchItem::Event {
    547             relay_url: RELAY_PRIMARY_WSS.to_owned(),
    548             raw_json: first.raw_json.clone(),
    549             observed_at_ms: 1_100,
    550         },
    551         RadrootsRelayFetchItem::Event {
    552             relay_url: RELAY_PRIMARY_WSS.to_owned(),
    553             raw_json: skipped.raw_json,
    554             observed_at_ms: 1_101,
    555         },
    556         RadrootsRelayFetchItem::Event {
    557             relay_url: RELAY_SECONDARY_WSS.to_owned(),
    558             raw_json: "{not json".to_owned(),
    559             observed_at_ms: 1_102,
    560         },
    561         RadrootsRelayFetchItem::Event {
    562             relay_url: RELAY_SECONDARY_WSS.to_owned(),
    563             raw_json: unsupported_raw_event(),
    564             observed_at_ms: 1_103,
    565         },
    566         RadrootsRelayFetchItem::Eose {
    567             relay_url: RELAY_PRIMARY_WSS.to_owned(),
    568         },
    569         RadrootsRelayFetchItem::Closed {
    570             relay_url: RELAY_SECONDARY_WSS.to_owned(),
    571             message: "auth-required: challenge".to_owned(),
    572         },
    573         RadrootsRelayFetchItem::Notice {
    574             relay_url: RELAY_TERTIARY_WSS.to_owned(),
    575             message: "notice: still visible".to_owned(),
    576         },
    577     ]);
    578 
    579     let receipt =
    580         fetch_and_ingest_relay_events(&adapter, &store, RadrootsRelayFetchRequest::fetch(1_100, 1))
    581             .await
    582             .expect("fetch ingest");
    583 
    584     assert_eq!(receipt.inserted_count, 1);
    585     assert_eq!(receipt.duplicate_count, 0);
    586     assert_eq!(receipt.unsupported_count, 0);
    587     assert_eq!(receipt.malformed_count, 0);
    588     assert_eq!(receipt.events.len(), 1);
    589     assert_eq!(receipt.eose_count, 1);
    590     assert_eq!(receipt.closed_count, 1);
    591     assert_eq!(receipt.notice_count, 1);
    592     assert_eq!(receipt.relay_outcomes.len(), 3);
    593     assert_eq!(
    594         receipt.relay_outcomes[0].kind,
    595         RadrootsRelayFetchOutcomeKind::Eose
    596     );
    597     assert_eq!(
    598         receipt.relay_outcomes[1]
    599             .relay_outcome
    600             .as_ref()
    601             .expect("closed outcome")
    602             .kind,
    603         RadrootsRelayOutcomeKind::AuthRequired
    604     );
    605     assert_eq!(
    606         receipt.relay_outcomes[2].kind,
    607         RadrootsRelayFetchOutcomeKind::Notice
    608     );
    609 }
    610 
    611 #[tokio::test]
    612 async fn fetch_subscription_mode_and_store_errors_are_reported() {
    613     let signed = signed_post("subscription");
    614     let store = RadrootsEventStore::open_memory().await.expect("store");
    615     let adapter = RadrootsMockRelayFetchAdapter::new(vec![RadrootsRelayFetchItem::Event {
    616         relay_url: RELAY_PRIMARY_WSS.to_owned(),
    617         raw_json: signed.raw_json.clone(),
    618         observed_at_ms: 1_200,
    619     }]);
    620 
    621     let receipt = fetch_and_ingest_relay_events(
    622         &adapter,
    623         &store,
    624         RadrootsRelayFetchRequest::subscription(1_200, 10),
    625     )
    626     .await
    627     .expect("fetch ingest");
    628 
    629     assert_eq!(receipt.inserted_count, 1);
    630     let observations = store
    631         .observations_for_event(signed.id.as_str())
    632         .await
    633         .expect("observations");
    634     assert_eq!(observations.len(), 1);
    635     assert_eq!(observations[0].observation_type, "subscription");
    636 
    637     let closed_store = RadrootsEventStore::open_memory().await.expect("store");
    638     closed_store.pool().close().await;
    639     let adapter = RadrootsMockRelayFetchAdapter::new(vec![RadrootsRelayFetchItem::Event {
    640         relay_url: RELAY_PRIMARY_WSS.to_owned(),
    641         raw_json: signed.raw_json,
    642         observed_at_ms: 1_210,
    643     }]);
    644     let receipt = fetch_and_ingest_relay_events(
    645         &adapter,
    646         &closed_store,
    647         RadrootsRelayFetchRequest::fetch(1_210, 10),
    648     )
    649     .await
    650     .expect("fetch ingest");
    651 
    652     assert_eq!(receipt.inserted_count, 0);
    653     assert_eq!(receipt.malformed_count, 1);
    654     assert!(receipt.events[0].malformed);
    655     assert!(receipt.events[0].message.is_some());
    656 }
    657 
    658 #[tokio::test]
    659 async fn outbox_publish_persists_partial_success_and_skips_accepted_retry() {
    660     let signed = signed_post("hello");
    661     let outbox = RadrootsOutbox::open_memory().await.expect("outbox");
    662     let store = RadrootsEventStore::open_memory().await.expect("store");
    663     let draft = RadrootsFrozenEventDraft::new(
    664         "radroots.social.post.v1",
    665         KIND_POST,
    666         signed.created_at,
    667         signed.tags.clone(),
    668         signed.content.clone(),
    669         signed.pubkey.as_str(),
    670     )
    671     .expect("draft");
    672     let receipt = outbox
    673         .enqueue_operation(RadrootsOutboxOperationInput::new(
    674             "publish_post",
    675             draft,
    676             vec![
    677                 RELAY_PRIMARY_WSS.to_owned(),
    678                 RELAY_SECONDARY_WSS.to_owned(),
    679                 RELAY_TERTIARY_WSS.to_owned(),
    680             ],
    681             1_000,
    682         ))
    683         .await
    684         .expect("enqueue");
    685     let claimed = outbox
    686         .claim_next_ready_event("signer", "sign-a", 2_000, 1_000)
    687         .await
    688         .expect("claim")
    689         .expect("claim");
    690     let signed = complete_claimed_signing(&outbox, &claimed, 1_100).await;
    691     outbox.recover_expired_claims(2_001).await.expect("recover");
    692     let publish_claim = outbox
    693         .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100)
    694         .await
    695         .expect("claim")
    696         .expect("publish claim");
    697     assert_eq!(publish_claim.state, RadrootsOutboxEventState::Publishing);
    698 
    699     let adapter = RadrootsMockRelayPublishAdapter::new()
    700         .with_outcome(RELAY_PRIMARY_WSS, RadrootsRelayOutcome::accepted())
    701         .with_outcome(
    702             RELAY_SECONDARY_WSS,
    703             RadrootsRelayOutcome::timeout("timeout: no OK"),
    704         )
    705         .with_outcome(
    706             RELAY_TERTIARY_WSS,
    707             RadrootsRelayOutcome::duplicate_accepted("duplicate: already have it"),
    708         );
    709     let first = publish_claimed_outbox_event(
    710         &outbox,
    711         &store,
    712         &adapter,
    713         &publish_claim,
    714         RadrootsOutboxPublishPolicy::new(2_500),
    715         2_200,
    716     )
    717     .await
    718     .expect("publish");
    719 
    720     assert_eq!(first.publish.attempted_count, 3);
    721     assert_eq!(first.publish.accepted_count, 2);
    722     assert!(!first.publish.quorum_met);
    723     let event = outbox
    724         .get_event(receipt.outbox_event_id)
    725         .await
    726         .expect("event")
    727         .expect("event");
    728     assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable);
    729     assert_eq!(event.accepted_quorum, 3);
    730 
    731     let statuses = outbox
    732         .relay_statuses(receipt.outbox_event_id)
    733         .await
    734         .expect("statuses");
    735     assert_eq!(
    736         statuses
    737             .iter()
    738             .find(|status| status.relay_url == RELAY_PRIMARY_WSS)
    739             .expect("primary")
    740             .status,
    741         RadrootsOutboxRelayStatus::Accepted
    742     );
    743     assert_eq!(
    744         statuses
    745             .iter()
    746             .find(|status| status.relay_url == RELAY_SECONDARY_WSS)
    747             .expect("secondary")
    748             .status,
    749         RadrootsOutboxRelayStatus::FailedRetryable
    750     );
    751     assert_eq!(
    752         statuses
    753             .iter()
    754             .find(|status| status.relay_url == RELAY_TERTIARY_WSS)
    755             .expect("tertiary")
    756             .status,
    757         RadrootsOutboxRelayStatus::Accepted
    758     );
    759 
    760     let retry_claim = outbox
    761         .claim_next_ready_event("publisher", "publish-b", 4_000, 2_500)
    762         .await
    763         .expect("claim")
    764         .expect("retry claim");
    765     let retry_adapter = RadrootsMockRelayPublishAdapter::new()
    766         .with_outcome(RELAY_SECONDARY_WSS, RadrootsRelayOutcome::accepted());
    767     let second = publish_claimed_outbox_event(
    768         &outbox,
    769         &store,
    770         &retry_adapter,
    771         &retry_claim,
    772         RadrootsOutboxPublishPolicy::new(3_000),
    773         2_600,
    774     )
    775     .await
    776     .expect("retry publish");
    777 
    778     assert_eq!(second.local_ingest.event_id, signed.id);
    779     assert_eq!(second.publish.attempted_count, 1);
    780     assert_eq!(retry_adapter.captured_raw_events().len(), 1);
    781 
    782     let event = outbox
    783         .get_event(receipt.outbox_event_id)
    784         .await
    785         .expect("event")
    786         .expect("event");
    787     assert_eq!(event.state, RadrootsOutboxEventState::Published);
    788     assert_eq!(event.accepted_quorum, 3);
    789     let operation = outbox
    790         .get_operation(receipt.operation_id)
    791         .await
    792         .expect("operation")
    793         .expect("operation");
    794     assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete);
    795 
    796     let observations = store
    797         .observations_for_event(signed.id.as_str())
    798         .await
    799         .expect("observations");
    800     assert_eq!(observations.len(), 3);
    801 }
    802 
    803 #[tokio::test]
    804 async fn outbox_publish_transport_failure_releases_retryable_claim() {
    805     let signed = signed_post("adapter transport failure");
    806     let outbox = RadrootsOutbox::open_memory().await.expect("outbox");
    807     let store = RadrootsEventStore::open_memory().await.expect("store");
    808     let draft = RadrootsFrozenEventDraft::new(
    809         "radroots.social.post.v1",
    810         KIND_POST,
    811         signed.created_at,
    812         signed.tags.clone(),
    813         signed.content.clone(),
    814         signed.pubkey.as_str(),
    815     )
    816     .expect("draft");
    817     let receipt = outbox
    818         .enqueue_operation(RadrootsOutboxOperationInput::new(
    819             "publish_post",
    820             draft,
    821             vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()],
    822             1_000,
    823         ))
    824         .await
    825         .expect("enqueue");
    826     let claimed = outbox
    827         .claim_next_ready_event("signer", "sign-a", 2_000, 1_000)
    828         .await
    829         .expect("claim")
    830         .expect("claim");
    831     complete_claimed_signing(&outbox, &claimed, 1_100).await;
    832     outbox.recover_expired_claims(2_001).await.expect("recover");
    833     let publish_claim = outbox
    834         .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100)
    835         .await
    836         .expect("claim")
    837         .expect("publish claim");
    838 
    839     let published = publish_claimed_outbox_event(
    840         &outbox,
    841         &store,
    842         &TransportFailurePublishAdapter,
    843         &publish_claim,
    844         RadrootsOutboxPublishPolicy::new(2_500),
    845         2_200,
    846     )
    847     .await
    848     .expect("publish");
    849 
    850     assert_eq!(published.publish.attempted_count, 2);
    851     assert_eq!(published.publish.accepted_count, 0);
    852     assert_eq!(published.publish.retryable_count, 2);
    853     assert_eq!(published.publish.terminal_count, 0);
    854     assert!(!published.publish.quorum_met);
    855     assert!(
    856         published
    857             .publish
    858             .relays
    859             .iter()
    860             .all(|relay| relay.outcome.kind == RadrootsRelayOutcomeKind::ConnectionFailed)
    861     );
    862 
    863     let event = outbox
    864         .get_event(receipt.outbox_event_id)
    865         .await
    866         .expect("event")
    867         .expect("event");
    868     assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable);
    869     assert!(event.claim_token.is_none());
    870     assert_eq!(event.next_attempt_after_ms, 2_500);
    871 
    872     let statuses = outbox
    873         .relay_statuses(receipt.outbox_event_id)
    874         .await
    875         .expect("statuses");
    876     assert_eq!(statuses.len(), 2);
    877     assert!(
    878         statuses
    879             .iter()
    880             .all(|status| status.status == RadrootsOutboxRelayStatus::FailedRetryable)
    881     );
    882     assert!(
    883         outbox
    884             .claim_next_ready_event("publisher", "publish-b", 4_000, 2_499)
    885             .await
    886             .expect("early claim")
    887             .is_none()
    888     );
    889     let retry_claim = outbox
    890         .claim_next_ready_event("publisher", "publish-b", 4_000, 2_500)
    891         .await
    892         .expect("retry claim")
    893         .expect("retry claim");
    894     assert_eq!(retry_claim.outbox_event_id, receipt.outbox_event_id);
    895     assert_eq!(retry_claim.state, RadrootsOutboxEventState::Publishing);
    896 }
    897 
    898 #[tokio::test]
    899 async fn outbox_publish_marks_published_without_adapter_when_all_relays_already_accepted() {
    900     let signed = signed_post("already accepted");
    901     let outbox = RadrootsOutbox::open_memory().await.expect("outbox");
    902     let store = RadrootsEventStore::open_memory().await.expect("store");
    903     let draft = RadrootsFrozenEventDraft::new(
    904         "radroots.social.post.v1",
    905         KIND_POST,
    906         signed.created_at,
    907         signed.tags.clone(),
    908         signed.content.clone(),
    909         signed.pubkey.as_str(),
    910     )
    911     .expect("draft");
    912     let receipt = outbox
    913         .enqueue_operation(RadrootsOutboxOperationInput::new(
    914             "publish_post",
    915             draft,
    916             vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()],
    917             1_000,
    918         ))
    919         .await
    920         .expect("enqueue");
    921     let claimed = outbox
    922         .claim_next_ready_event("signer", "sign-a", 2_000, 1_000)
    923         .await
    924         .expect("claim")
    925         .expect("claim");
    926     let signed = complete_claimed_signing(&outbox, &claimed, 1_100).await;
    927     outbox.recover_expired_claims(2_001).await.expect("recover");
    928     let publish_claim = outbox
    929         .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100)
    930         .await
    931         .expect("claim")
    932         .expect("publish claim");
    933     outbox
    934         .mark_relay_accepted(
    935             publish_claim.outbox_event_id,
    936             publish_claim.claim_token.as_str(),
    937             RELAY_PRIMARY_WSS,
    938             2_150,
    939         )
    940         .await
    941         .expect("primary accepted");
    942     outbox
    943         .mark_relay_accepted(
    944             publish_claim.outbox_event_id,
    945             publish_claim.claim_token.as_str(),
    946             RELAY_SECONDARY_WSS,
    947             2_151,
    948         )
    949         .await
    950         .expect("secondary accepted");
    951 
    952     let adapter = RadrootsMockRelayPublishAdapter::new();
    953     let published = publish_claimed_outbox_event(
    954         &outbox,
    955         &store,
    956         &adapter,
    957         &publish_claim,
    958         RadrootsOutboxPublishPolicy::new(2_500),
    959         2_200,
    960     )
    961     .await
    962     .expect("publish");
    963 
    964     assert_eq!(published.local_ingest.event_id, signed.id);
    965     assert_eq!(published.publish.event_id, signed.id);
    966     assert_eq!(published.publish.attempted_count, 0);
    967     assert_eq!(published.publish.accepted_count, 2);
    968     assert_eq!(published.publish.quorum, 2);
    969     assert!(published.publish.quorum_met);
    970     assert!(published.publish.relays.is_empty());
    971     assert!(adapter.captured_raw_events().is_empty());
    972 
    973     let event = outbox
    974         .get_event(receipt.outbox_event_id)
    975         .await
    976         .expect("event")
    977         .expect("event");
    978     assert_eq!(event.state, RadrootsOutboxEventState::Published);
    979     assert_eq!(event.accepted_quorum, 2);
    980     assert!(event.claim_token.is_none());
    981     let operation = outbox
    982         .get_operation(receipt.operation_id)
    983         .await
    984         .expect("operation")
    985         .expect("operation");
    986     assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete);
    987 }
    988 
    989 #[tokio::test]
    990 async fn outbox_publish_uses_persisted_accepted_count_for_explicit_quorum() {
    991     let signed = signed_post("explicit quorum already accepted");
    992     let outbox = RadrootsOutbox::open_memory().await.expect("outbox");
    993     let store = RadrootsEventStore::open_memory().await.expect("store");
    994     let draft = RadrootsFrozenEventDraft::new(
    995         "radroots.social.post.v1",
    996         KIND_POST,
    997         signed.created_at,
    998         signed.tags.clone(),
    999         signed.content.clone(),
   1000         signed.pubkey.as_str(),
   1001     )
   1002     .expect("draft");
   1003     let receipt = outbox
   1004         .enqueue_operation(RadrootsOutboxOperationInput::new(
   1005             "publish_post",
   1006             draft,
   1007             vec![
   1008                 RELAY_PRIMARY_WSS.to_owned(),
   1009                 RELAY_SECONDARY_WSS.to_owned(),
   1010                 RELAY_TERTIARY_WSS.to_owned(),
   1011             ],
   1012             1_000,
   1013         ))
   1014         .await
   1015         .expect("enqueue");
   1016     let claimed = outbox
   1017         .claim_next_ready_event("signer", "sign-a", 2_000, 1_000)
   1018         .await
   1019         .expect("claim")
   1020         .expect("claim");
   1021     complete_claimed_signing(&outbox, &claimed, 1_100).await;
   1022     outbox.recover_expired_claims(2_001).await.expect("recover");
   1023     let publish_claim = outbox
   1024         .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100)
   1025         .await
   1026         .expect("claim")
   1027         .expect("publish claim");
   1028     outbox
   1029         .mark_relay_accepted(
   1030             publish_claim.outbox_event_id,
   1031             publish_claim.claim_token.as_str(),
   1032             RELAY_PRIMARY_WSS,
   1033             2_150,
   1034         )
   1035         .await
   1036         .expect("primary accepted");
   1037     outbox
   1038         .mark_relay_accepted(
   1039             publish_claim.outbox_event_id,
   1040             publish_claim.claim_token.as_str(),
   1041             RELAY_SECONDARY_WSS,
   1042             2_151,
   1043         )
   1044         .await
   1045         .expect("secondary accepted");
   1046 
   1047     let adapter = RadrootsMockRelayPublishAdapter::new();
   1048     let published = publish_claimed_outbox_event(
   1049         &outbox,
   1050         &store,
   1051         &adapter,
   1052         &publish_claim,
   1053         RadrootsOutboxPublishPolicy::new(2_500).with_accepted_quorum(2),
   1054         2_200,
   1055     )
   1056     .await
   1057     .expect("publish");
   1058 
   1059     assert_eq!(published.publish.attempted_count, 0);
   1060     assert_eq!(published.publish.accepted_count, 2);
   1061     assert_eq!(published.publish.quorum, 2);
   1062     assert!(published.publish.quorum_met);
   1063     assert!(adapter.captured_raw_events().is_empty());
   1064 
   1065     let event = outbox
   1066         .get_event(receipt.outbox_event_id)
   1067         .await
   1068         .expect("event")
   1069         .expect("event");
   1070     assert_eq!(event.state, RadrootsOutboxEventState::Published);
   1071     assert_eq!(event.accepted_quorum, 2);
   1072     let statuses = outbox
   1073         .relay_statuses(receipt.outbox_event_id)
   1074         .await
   1075         .expect("statuses");
   1076     assert_eq!(
   1077         statuses
   1078             .iter()
   1079             .find(|status| status.relay_url == RELAY_TERTIARY_WSS)
   1080             .expect("tertiary")
   1081             .status,
   1082         RadrootsOutboxRelayStatus::Pending
   1083     );
   1084 }
   1085 
   1086 #[tokio::test]
   1087 async fn outbox_publish_marks_published_when_policy_quorum_is_met_with_failure_diagnostics() {
   1088     let signed = signed_post("quorum");
   1089     let outbox = RadrootsOutbox::open_memory().await.expect("outbox");
   1090     let store = RadrootsEventStore::open_memory().await.expect("store");
   1091     let draft = RadrootsFrozenEventDraft::new(
   1092         "radroots.social.post.v1",
   1093         KIND_POST,
   1094         signed.created_at,
   1095         signed.tags.clone(),
   1096         signed.content.clone(),
   1097         signed.pubkey.as_str(),
   1098     )
   1099     .expect("draft");
   1100     let receipt = outbox
   1101         .enqueue_operation(RadrootsOutboxOperationInput::new(
   1102             "publish_post",
   1103             draft,
   1104             vec![
   1105                 RELAY_PRIMARY_WSS.to_owned(),
   1106                 RELAY_SECONDARY_WSS.to_owned(),
   1107                 RELAY_TERTIARY_WSS.to_owned(),
   1108             ],
   1109             1_000,
   1110         ))
   1111         .await
   1112         .expect("enqueue");
   1113     let claimed = outbox
   1114         .claim_next_ready_event("signer", "sign-a", 2_000, 1_000)
   1115         .await
   1116         .expect("claim")
   1117         .expect("claim");
   1118     let signed = complete_claimed_signing(&outbox, &claimed, 1_100).await;
   1119     outbox.recover_expired_claims(2_001).await.expect("recover");
   1120     let publish_claim = outbox
   1121         .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100)
   1122         .await
   1123         .expect("claim")
   1124         .expect("publish claim");
   1125 
   1126     let adapter = RadrootsMockRelayPublishAdapter::new()
   1127         .with_outcome(RELAY_PRIMARY_WSS, RadrootsRelayOutcome::accepted())
   1128         .with_outcome(
   1129             RELAY_SECONDARY_WSS,
   1130             RadrootsRelayOutcome::duplicate_accepted("duplicate: already have it"),
   1131         )
   1132         .with_outcome(
   1133             RELAY_TERTIARY_WSS,
   1134             RadrootsRelayOutcome::classify("restricted: group write denied"),
   1135         );
   1136     let published = publish_claimed_outbox_event(
   1137         &outbox,
   1138         &store,
   1139         &adapter,
   1140         &publish_claim,
   1141         RadrootsOutboxPublishPolicy::new(2_500).with_accepted_quorum(2),
   1142         2_200,
   1143     )
   1144     .await
   1145     .expect("publish");
   1146 
   1147     assert_eq!(published.publish.quorum, 2);
   1148     assert_eq!(published.publish.accepted_count, 2);
   1149     assert_eq!(published.publish.terminal_count, 1);
   1150     assert!(published.publish.quorum_met);
   1151 
   1152     let event = outbox
   1153         .get_event(receipt.outbox_event_id)
   1154         .await
   1155         .expect("event")
   1156         .expect("event");
   1157     assert_eq!(event.state, RadrootsOutboxEventState::Published);
   1158     assert_eq!(event.accepted_quorum, 2);
   1159     assert!(event.claim_token.is_none());
   1160     let operation = outbox
   1161         .get_operation(receipt.operation_id)
   1162         .await
   1163         .expect("operation")
   1164         .expect("operation");
   1165     assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete);
   1166 
   1167     let statuses = outbox
   1168         .relay_statuses(receipt.outbox_event_id)
   1169         .await
   1170         .expect("statuses");
   1171     assert_eq!(
   1172         statuses
   1173             .iter()
   1174             .find(|status| status.relay_url == RELAY_TERTIARY_WSS)
   1175             .expect("tertiary")
   1176             .status,
   1177         RadrootsOutboxRelayStatus::FailedTerminal
   1178     );
   1179     assert!(
   1180         outbox
   1181             .claim_next_ready_event("publisher", "publish-b", 4_000, 2_300)
   1182             .await
   1183             .expect("claim")
   1184             .is_none()
   1185     );
   1186 
   1187     let observations = store
   1188         .observations_for_event(signed.id.as_str())
   1189         .await
   1190         .expect("observations");
   1191     assert_eq!(observations.len(), 2);
   1192 }
   1193 
   1194 #[tokio::test]
   1195 async fn outbox_publish_republishes_accepted_relays_when_policy_requests_it() {
   1196     let signed = signed_post("republish accepted");
   1197     let outbox = RadrootsOutbox::open_memory().await.expect("outbox");
   1198     let store = RadrootsEventStore::open_memory().await.expect("store");
   1199     let draft = RadrootsFrozenEventDraft::new(
   1200         "radroots.social.post.v1",
   1201         KIND_POST,
   1202         signed.created_at,
   1203         signed.tags.clone(),
   1204         signed.content.clone(),
   1205         signed.pubkey.as_str(),
   1206     )
   1207     .expect("draft");
   1208     let receipt = outbox
   1209         .enqueue_operation(RadrootsOutboxOperationInput::new(
   1210             "publish_post",
   1211             draft,
   1212             vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()],
   1213             1_000,
   1214         ))
   1215         .await
   1216         .expect("enqueue");
   1217     let claimed = outbox
   1218         .claim_next_ready_event("signer", "sign-a", 2_000, 1_000)
   1219         .await
   1220         .expect("claim")
   1221         .expect("claim");
   1222     let signed = complete_claimed_signing(&outbox, &claimed, 1_100).await;
   1223     outbox.recover_expired_claims(2_001).await.expect("recover");
   1224     let publish_claim = outbox
   1225         .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100)
   1226         .await
   1227         .expect("claim")
   1228         .expect("publish claim");
   1229     outbox
   1230         .mark_relay_accepted(
   1231             publish_claim.outbox_event_id,
   1232             publish_claim.claim_token.as_str(),
   1233             RELAY_PRIMARY_WSS,
   1234             2_150,
   1235         )
   1236         .await
   1237         .expect("primary accepted");
   1238 
   1239     let adapter = RadrootsMockRelayPublishAdapter::new()
   1240         .with_outcome(RELAY_PRIMARY_WSS, RadrootsRelayOutcome::accepted())
   1241         .with_outcome(RELAY_SECONDARY_WSS, RadrootsRelayOutcome::accepted());
   1242     let published = publish_claimed_outbox_event(
   1243         &outbox,
   1244         &store,
   1245         &adapter,
   1246         &publish_claim,
   1247         RadrootsOutboxPublishPolicy::new(2_500)
   1248             .republish_accepted_relays(true)
   1249             .relay_url_policy(RadrootsRelayUrlPolicy::Public),
   1250         2_200,
   1251     )
   1252     .await
   1253     .expect("publish");
   1254 
   1255     assert_eq!(published.local_ingest.event_id, signed.id);
   1256     assert_eq!(published.publish.attempted_count, 2);
   1257     assert_eq!(published.publish.accepted_count, 2);
   1258     assert_eq!(published.publish.quorum, 1);
   1259     assert!(published.publish.quorum_met);
   1260     assert_eq!(adapter.captured_raw_events().len(), 1);
   1261 
   1262     let event = outbox
   1263         .get_event(receipt.outbox_event_id)
   1264         .await
   1265         .expect("event")
   1266         .expect("event");
   1267     assert_eq!(event.state, RadrootsOutboxEventState::Published);
   1268     let statuses = outbox
   1269         .relay_statuses(receipt.outbox_event_id)
   1270         .await
   1271         .expect("statuses");
   1272     assert!(
   1273         statuses
   1274             .iter()
   1275             .all(|status| status.status == RadrootsOutboxRelayStatus::Accepted)
   1276     );
   1277 }
   1278 
   1279 #[tokio::test]
   1280 async fn outbox_publish_requires_claimed_signed_event() {
   1281     let signed = signed_post("missing signature");
   1282     let outbox = RadrootsOutbox::open_memory().await.expect("outbox");
   1283     let store = RadrootsEventStore::open_memory().await.expect("store");
   1284     let draft = RadrootsFrozenEventDraft::new(
   1285         "radroots.social.post.v1",
   1286         KIND_POST,
   1287         signed.created_at,
   1288         signed.tags,
   1289         signed.content,
   1290         signed.pubkey.as_str(),
   1291     )
   1292     .expect("draft");
   1293     let receipt = outbox
   1294         .enqueue_operation(RadrootsOutboxOperationInput::new(
   1295             "publish_post",
   1296             draft,
   1297             vec![RELAY_PRIMARY_WSS.to_owned()],
   1298             1_000,
   1299         ))
   1300         .await
   1301         .expect("enqueue");
   1302     let claimed = outbox
   1303         .claim_next_ready_event("signer", "sign-a", 2_000, 1_000)
   1304         .await
   1305         .expect("claim")
   1306         .expect("claim");
   1307     let adapter = RadrootsMockRelayPublishAdapter::new();
   1308 
   1309     let error = publish_claimed_outbox_event(
   1310         &outbox,
   1311         &store,
   1312         &adapter,
   1313         &claimed,
   1314         RadrootsOutboxPublishPolicy::new(2_500),
   1315         1_100,
   1316     )
   1317     .await
   1318     .expect_err("missing signed event");
   1319 
   1320     assert!(matches!(
   1321         error,
   1322         RadrootsRelayTransportError::MissingSignedOutboxEvent(event_id)
   1323             if event_id == receipt.outbox_event_id
   1324     ));
   1325     assert!(adapter.captured_raw_events().is_empty());
   1326 }
   1327 
   1328 #[tokio::test]
   1329 async fn outbox_publish_propagates_non_transport_adapter_errors_after_target_filtering() {
   1330     let signed = signed_post("adapter non transport failure");
   1331     let outbox = RadrootsOutbox::open_memory().await.expect("outbox");
   1332     let store = RadrootsEventStore::open_memory().await.expect("store");
   1333     let draft = RadrootsFrozenEventDraft::new(
   1334         "radroots.social.post.v1",
   1335         KIND_POST,
   1336         signed.created_at,
   1337         signed.tags,
   1338         signed.content,
   1339         signed.pubkey.as_str(),
   1340     )
   1341     .expect("draft");
   1342     let receipt = outbox
   1343         .enqueue_operation(RadrootsOutboxOperationInput::new(
   1344             "publish_post",
   1345             draft,
   1346             vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()],
   1347             1_000,
   1348         ))
   1349         .await
   1350         .expect("enqueue");
   1351     let claimed = outbox
   1352         .claim_next_ready_event("signer", "sign-a", 2_000, 1_000)
   1353         .await
   1354         .expect("claim")
   1355         .expect("claim");
   1356     complete_claimed_signing(&outbox, &claimed, 1_100).await;
   1357     outbox.recover_expired_claims(2_001).await.expect("recover");
   1358     let mut publish_claim = outbox
   1359         .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100)
   1360         .await
   1361         .expect("claim")
   1362         .expect("publish claim");
   1363     publish_claim.target_relays = vec![RELAY_PRIMARY_WSS.to_owned()];
   1364 
   1365     let error = publish_claimed_outbox_event(
   1366         &outbox,
   1367         &store,
   1368         &NostrJsonFailurePublishAdapter,
   1369         &publish_claim,
   1370         RadrootsOutboxPublishPolicy::new(2_500),
   1371         2_200,
   1372     )
   1373     .await
   1374     .expect_err("adapter error");
   1375 
   1376     assert!(matches!(
   1377         error,
   1378         RadrootsRelayTransportError::NostrEventJson(_)
   1379     ));
   1380     let event = outbox
   1381         .get_event(receipt.outbox_event_id)
   1382         .await
   1383         .expect("event")
   1384         .expect("event");
   1385     assert_eq!(event.accepted_quorum, 1);
   1386 }
   1387 
   1388 #[tokio::test]
   1389 async fn smoke_relay_fetch_processes_one_thousand_event_receipts() {
   1390     let store = RadrootsEventStore::open_memory().await.expect("store");
   1391     let mut items = Vec::new();
   1392     for index in 0..1_000 {
   1393         let signed = signed_post(format!("fetch-smoke-{index}").as_str());
   1394         let relay_url = match index % 3 {
   1395             0 => RELAY_PRIMARY_WSS,
   1396             1 => RELAY_SECONDARY_WSS,
   1397             _ => RELAY_TERTIARY_WSS,
   1398         };
   1399         items.push(RadrootsRelayFetchItem::Event {
   1400             relay_url: relay_url.to_owned(),
   1401             raw_json: signed.raw_json,
   1402             observed_at_ms: 10_000 + index,
   1403         });
   1404     }
   1405     let adapter = RadrootsMockRelayFetchAdapter::new(items);
   1406     let receipt = fetch_and_ingest_relay_events(
   1407         &adapter,
   1408         &store,
   1409         RadrootsRelayFetchRequest::fetch(10_000, 1_000),
   1410     )
   1411     .await
   1412     .expect("fetch");
   1413 
   1414     assert_eq!(receipt.inserted_count, 1_000);
   1415     assert_eq!(receipt.duplicate_count, 0);
   1416     assert_eq!(receipt.malformed_count, 0);
   1417     assert_eq!(receipt.unsupported_count, 0);
   1418     assert_eq!(receipt.events.len(), 1_000);
   1419     assert!(receipt.events.iter().all(|event| event.projection_eligible));
   1420     let replay = store
   1421         .events_since_cursor("fetch-smoke", 1_000)
   1422         .await
   1423         .expect("replay");
   1424     assert_eq!(replay.len(), 1_000);
   1425 }