lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

store.rs (109822B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::RadrootsOutboxError;
      4 use crate::migrations::{OUTBOX_MIGRATION_DOWN, OUTBOX_MIGRATION_UP};
      5 use crate::model::{
      6     RadrootsOutboxClaimedEvent, RadrootsOutboxEnqueueReceipt, RadrootsOutboxEnqueueStatus,
      7     RadrootsOutboxEventRecord, RadrootsOutboxEventState, RadrootsOutboxEventStoreIngestReceipt,
      8     RadrootsOutboxOperationInput, RadrootsOutboxOperationRecord, RadrootsOutboxOperationStatus,
      9     RadrootsOutboxRelayStatus, RadrootsOutboxRelayStatusRecord, RadrootsOutboxSignedOperationInput,
     10     RadrootsOutboxStatusSummary,
     11 };
     12 use radroots_event_store::{RadrootsEventIngest, RadrootsEventStore};
     13 use radroots_events::RadrootsNostrEvent;
     14 use radroots_events::draft::{
     15     RadrootsFrozenEventDraft, RadrootsSignedNostrEvent, validate_signed_nostr_event_matches_draft,
     16 };
     17 use serde::Serialize;
     18 use sha2::{Digest, Sha256};
     19 use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteQueryResult};
     20 use sqlx::{Row, SqlitePool};
     21 use std::path::Path;
     22 use std::str::FromStr;
     23 
/// Handle to the SQLite-backed outbox store.
///
/// Wraps a [`SqlitePool`]; cloning the handle clones the wrapped pool value.
#[derive(Clone)]
pub struct RadrootsOutbox {
    /// Connection pool used by the store's queries.
    pool: SqlitePool,
}
     28 
     29 impl RadrootsOutbox {
     30     pub async fn open_memory() -> Result<Self, RadrootsOutboxError> {
     31         let options = SqliteConnectOptions::from_str("sqlite::memory:")?;
     32         let pool = SqlitePoolOptions::new()
     33             .max_connections(1)
     34             .connect_with(options)
     35             .await?;
     36         configure_connection(&pool, false).await?;
     37         apply_up(&pool).await?;
     38         Ok(Self { pool })
     39     }
     40 
     41     pub async fn open_file(path: impl AsRef<Path>) -> Result<Self, RadrootsOutboxError> {
     42         let options = SqliteConnectOptions::new()
     43             .filename(path)
     44             .create_if_missing(true);
     45         let pool = SqlitePoolOptions::new()
     46             .max_connections(1)
     47             .connect_with(options)
     48             .await?;
     49         configure_connection(&pool, true).await?;
     50         apply_up(&pool).await?;
     51         Ok(Self { pool })
     52     }
     53 
     54     pub fn pool(&self) -> &SqlitePool {
     55         &self.pool
     56     }
     57 
     58     pub async fn migrate_down(&self) -> Result<(), RadrootsOutboxError> {
     59         apply_down(&self.pool).await
     60     }
     61 
     62     pub async fn pragma_foreign_keys(&self) -> Result<i64, RadrootsOutboxError> {
     63         query_i64(&self.pool, "PRAGMA foreign_keys").await
     64     }
     65 
     66     pub async fn pragma_busy_timeout(&self) -> Result<i64, RadrootsOutboxError> {
     67         query_i64(&self.pool, "PRAGMA busy_timeout").await
     68     }
     69 
     70     pub async fn pragma_journal_mode(&self) -> Result<String, RadrootsOutboxError> {
     71         query_string(&self.pool, "PRAGMA journal_mode").await
     72     }
     73 
     74     pub async fn status_summary(
     75         &self,
     76         now_ms: i64,
     77     ) -> Result<RadrootsOutboxStatusSummary, RadrootsOutboxError> {
     78         let row = sqlx::query(
     79             "SELECT COUNT(*) AS total_events, COALESCE(SUM(CASE WHEN state IN ('draft_queued', 'signing', 'signed', 'publishing') THEN 1 ELSE 0 END), 0) AS pending_events, COALESCE(SUM(CASE WHEN state IN ('sign_retryable', 'publish_retryable') THEN 1 ELSE 0 END), 0) AS retryable_events, COALESCE(SUM(CASE WHEN state IN ('published', 'failed_terminal', 'cancelled') THEN 1 ELSE 0 END), 0) AS terminal_events, COALESCE(SUM(CASE WHEN state = 'failed_terminal' THEN 1 ELSE 0 END), 0) AS failed_terminal_events, COALESCE(SUM(CASE WHEN state = 'publishing' THEN 1 ELSE 0 END), 0) AS publishing_events FROM outbox_event",
     80         )
     81         .fetch_one(&self.pool)
     82         .await?;
     83         let ready_signed_events = sqlx::query(
     84             "SELECT COUNT(*) FROM outbox_event WHERE state IN ('signed', 'publish_retryable') AND signed_event_json IS NOT NULL AND next_attempt_after_ms <= ? AND (claim_token IS NULL OR claim_expires_at_ms <= ?)",
     85         )
     86         .bind(now_ms)
     87         .bind(now_ms)
     88         .fetch_one(&self.pool)
     89         .await?
     90         .try_get(0)?;
     91         let last_attempt_at_ms =
     92             sqlx::query("SELECT MAX(last_attempt_at_ms) FROM outbox_event_relay_status")
     93                 .fetch_one(&self.pool)
     94                 .await?
     95                 .try_get(0)?;
     96         let last_error = sqlx::query(
     97             "SELECT last_error FROM outbox_event WHERE last_error IS NOT NULL ORDER BY updated_at_ms DESC, outbox_event_id DESC LIMIT 1",
     98         )
     99         .fetch_optional(&self.pool)
    100         .await?
    101         .map(|row| row.try_get("last_error"))
    102         .transpose()?;
    103         Ok(RadrootsOutboxStatusSummary {
    104             total_events: row.try_get("total_events")?,
    105             pending_events: row.try_get("pending_events")?,
    106             retryable_events: row.try_get("retryable_events")?,
    107             terminal_events: row.try_get("terminal_events")?,
    108             failed_terminal_events: row.try_get("failed_terminal_events")?,
    109             ready_signed_events,
    110             publishing_events: row.try_get("publishing_events")?,
    111             last_attempt_at_ms,
    112             last_error,
    113         })
    114     }
    115 
    116     pub async fn enqueue_operation(
    117         &self,
    118         input: RadrootsOutboxOperationInput,
    119     ) -> Result<RadrootsOutboxEnqueueReceipt, RadrootsOutboxError> {
    120         let target_relays = ordered_unique_relays(input.target_relays);
    121         if target_relays.is_empty() && !input.allow_empty_target_relays {
    122             return Err(RadrootsOutboxError::EmptyTargetRelays);
    123         }
    124         let digest_relays = digest_relays(target_relays.as_slice());
    125         let digest = idempotency_digest(
    126             input.operation_kind.as_str(),
    127             input.draft.expected_pubkey.as_str(),
    128             &input.draft,
    129             &digest_relays,
    130         )?;
    131         let accepted_quorum = target_relays.len() as i64;
    132         let mut tx = self.pool.begin().await?;
    133 
    134         if let Some(idempotency_key) = input.idempotency_key.as_deref()
    135             && let Some(existing) = existing_idempotent_operation(
    136                 &mut tx,
    137                 input.operation_kind.as_str(),
    138                 input.draft.expected_pubkey.as_str(),
    139                 idempotency_key,
    140             )
    141             .await?
    142         {
    143             if existing.idempotency_digest != digest {
    144                 return Err(RadrootsOutboxError::IdempotencyConflict {
    145                     operation_kind: input.operation_kind,
    146                     expected_pubkey: input.draft.expected_pubkey,
    147                     idempotency_key: idempotency_key.to_owned(),
    148                     existing_digest: existing.idempotency_digest,
    149                     new_digest: digest,
    150                 });
    151             }
    152             tx.commit().await?;
    153             return Ok(RadrootsOutboxEnqueueReceipt {
    154                 status: RadrootsOutboxEnqueueStatus::Existing,
    155                 operation_id: existing.operation_id,
    156                 outbox_event_id: existing.outbox_event_id,
    157                 expected_event_id: existing.event_id,
    158                 idempotency_digest: digest,
    159             });
    160         }
    161 
    162         let operation = sqlx::query(
    163             "INSERT INTO outbox_operation(operation_kind, expected_pubkey, idempotency_key, idempotency_digest, status, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, ?)",
    164         )
    165         .bind(input.operation_kind.as_str())
    166         .bind(input.draft.expected_pubkey.as_str())
    167         .bind(input.idempotency_key.as_deref())
    168         .bind(digest.as_str())
    169         .bind(RadrootsOutboxOperationStatus::Queued.as_str())
    170         .bind(input.created_at_ms)
    171         .bind(input.created_at_ms)
    172         .execute(&mut *tx)
    173         .await?;
    174         let operation_id = operation.last_insert_rowid();
    175         let draft_json = serde_json::to_string(&input.draft)?;
    176         let event = sqlx::query(
    177             "INSERT INTO outbox_event(operation_id, event_id, expected_pubkey, draft_json, state, accepted_quorum, attempt_count, next_attempt_after_ms, event_store_ingested, event_store_inserted, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, 0, ?, 0, 0, ?, ?)",
    178         )
    179         .bind(operation_id)
    180         .bind(input.draft.expected_event_id.as_str())
    181         .bind(input.draft.expected_pubkey.as_str())
    182         .bind(draft_json.as_str())
    183         .bind(RadrootsOutboxEventState::DraftQueued.as_str())
    184         .bind(accepted_quorum)
    185         .bind(input.created_at_ms)
    186         .bind(input.created_at_ms)
    187         .bind(input.created_at_ms)
    188         .execute(&mut *tx)
    189         .await?;
    190         let outbox_event_id = event.last_insert_rowid();
    191 
    192         for relay_url in target_relays {
    193             sqlx::query(
    194                 "INSERT INTO outbox_event_relay_status(outbox_event_id, relay_url, status, attempt_count) VALUES (?, ?, ?, 0)",
    195             )
    196             .bind(outbox_event_id)
    197             .bind(relay_url.as_str())
    198             .bind(RadrootsOutboxRelayStatus::Pending.as_str())
    199             .execute(&mut *tx)
    200             .await?;
    201         }
    202 
    203         tx.commit().await?;
    204         Ok(RadrootsOutboxEnqueueReceipt {
    205             status: RadrootsOutboxEnqueueStatus::Inserted,
    206             operation_id,
    207             outbox_event_id,
    208             expected_event_id: input.draft.expected_event_id,
    209             idempotency_digest: digest,
    210         })
    211     }
    212 
    213     pub async fn enqueue_signed_operation(
    214         &self,
    215         input: RadrootsOutboxSignedOperationInput,
    216     ) -> Result<RadrootsOutboxEnqueueReceipt, RadrootsOutboxError> {
    217         validate_signed_nostr_event_matches_draft(&input.signed_event, &input.draft)?;
    218         let target_relays = ordered_unique_relays(input.target_relays);
    219         if target_relays.is_empty() && !input.allow_empty_target_relays {
    220             return Err(RadrootsOutboxError::EmptyTargetRelays);
    221         }
    222         let digest_relays = digest_relays(target_relays.as_slice());
    223         let digest = idempotency_digest(
    224             input.operation_kind.as_str(),
    225             input.draft.expected_pubkey.as_str(),
    226             &input.draft,
    227             &digest_relays,
    228         )?;
    229         let accepted_quorum = target_relays.len() as i64;
    230         let mut tx = self.pool.begin().await?;
    231 
    232         if let Some(idempotency_key) = input.idempotency_key.as_deref()
    233             && let Some(existing) = existing_idempotent_operation(
    234                 &mut tx,
    235                 input.operation_kind.as_str(),
    236                 input.draft.expected_pubkey.as_str(),
    237                 idempotency_key,
    238             )
    239             .await?
    240         {
    241             if existing.idempotency_digest != digest {
    242                 return Err(RadrootsOutboxError::IdempotencyConflict {
    243                     operation_kind: input.operation_kind,
    244                     expected_pubkey: input.draft.expected_pubkey,
    245                     idempotency_key: idempotency_key.to_owned(),
    246                     existing_digest: existing.idempotency_digest,
    247                     new_digest: digest,
    248                 });
    249             }
    250             tx.commit().await?;
    251             return Ok(RadrootsOutboxEnqueueReceipt {
    252                 status: RadrootsOutboxEnqueueStatus::Existing,
    253                 operation_id: existing.operation_id,
    254                 outbox_event_id: existing.outbox_event_id,
    255                 expected_event_id: existing.event_id,
    256                 idempotency_digest: digest,
    257             });
    258         }
    259 
    260         let operation = sqlx::query(
    261             "INSERT INTO outbox_operation(operation_kind, expected_pubkey, idempotency_key, idempotency_digest, status, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, ?)",
    262         )
    263         .bind(input.operation_kind.as_str())
    264         .bind(input.draft.expected_pubkey.as_str())
    265         .bind(input.idempotency_key.as_deref())
    266         .bind(digest.as_str())
    267         .bind(RadrootsOutboxOperationStatus::Queued.as_str())
    268         .bind(input.created_at_ms)
    269         .bind(input.created_at_ms)
    270         .execute(&mut *tx)
    271         .await?;
    272         let operation_id = operation.last_insert_rowid();
    273         let draft_json = serde_json::to_string(&input.draft)?;
    274         let signed_event_json = serde_json::to_string(&input.signed_event)?;
    275         let event = sqlx::query(
    276             "INSERT INTO outbox_event(operation_id, event_id, expected_pubkey, draft_json, signed_event_json, raw_event_json, state, accepted_quorum, attempt_count, next_attempt_after_ms, event_store_ingested, event_store_inserted, event_store_ingested_at_ms, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0, ?, 1, ?, ?, ?, ?)",
    277         )
    278         .bind(operation_id)
    279         .bind(input.draft.expected_event_id.as_str())
    280         .bind(input.draft.expected_pubkey.as_str())
    281         .bind(draft_json.as_str())
    282         .bind(signed_event_json.as_str())
    283         .bind(input.signed_event.raw_json.as_str())
    284         .bind(RadrootsOutboxEventState::Signed.as_str())
    285         .bind(accepted_quorum)
    286         .bind(input.created_at_ms)
    287         .bind(bool_i64(input.event_store_inserted))
    288         .bind(input.event_store_ingested_at_ms)
    289         .bind(input.created_at_ms)
    290         .bind(input.created_at_ms)
    291         .execute(&mut *tx)
    292         .await?;
    293         let outbox_event_id = event.last_insert_rowid();
    294 
    295         for relay_url in target_relays {
    296             sqlx::query(
    297                 "INSERT INTO outbox_event_relay_status(outbox_event_id, relay_url, status, attempt_count) VALUES (?, ?, ?, 0)",
    298             )
    299             .bind(outbox_event_id)
    300             .bind(relay_url.as_str())
    301             .bind(RadrootsOutboxRelayStatus::Pending.as_str())
    302             .execute(&mut *tx)
    303             .await?;
    304         }
    305 
    306         tx.commit().await?;
    307         Ok(RadrootsOutboxEnqueueReceipt {
    308             status: RadrootsOutboxEnqueueStatus::Inserted,
    309             operation_id,
    310             outbox_event_id,
    311             expected_event_id: input.draft.expected_event_id,
    312             idempotency_digest: digest,
    313         })
    314     }
    315 
    316     pub async fn get_operation(
    317         &self,
    318         operation_id: i64,
    319     ) -> Result<Option<RadrootsOutboxOperationRecord>, RadrootsOutboxError> {
    320         let row = sqlx::query(
    321             "SELECT operation_id, operation_kind, expected_pubkey, idempotency_key, idempotency_digest, status, created_at_ms, updated_at_ms FROM outbox_operation WHERE operation_id = ?",
    322         )
    323         .bind(operation_id)
    324         .fetch_optional(&self.pool)
    325         .await?;
    326         row.map(operation_from_row).transpose()
    327     }
    328 
    329     pub async fn get_event(
    330         &self,
    331         outbox_event_id: i64,
    332     ) -> Result<Option<RadrootsOutboxEventRecord>, RadrootsOutboxError> {
    333         let row = sqlx::query(
    334             "SELECT outbox_event_id, operation_id, event_id, expected_pubkey, draft_json, signed_event_json, raw_event_json, state, accepted_quorum, attempt_count, claim_token, claim_owner, claim_expires_at_ms, next_attempt_after_ms, last_error, event_store_ingested, event_store_inserted, event_store_ingested_at_ms, created_at_ms, updated_at_ms FROM outbox_event WHERE outbox_event_id = ?",
    335         )
    336         .bind(outbox_event_id)
    337         .fetch_optional(&self.pool)
    338         .await?;
    339         row.map(event_from_row).transpose()
    340     }
    341 
    342     pub async fn relay_statuses(
    343         &self,
    344         outbox_event_id: i64,
    345     ) -> Result<Vec<RadrootsOutboxRelayStatusRecord>, RadrootsOutboxError> {
    346         relay_statuses_for(&self.pool, outbox_event_id).await
    347     }
    348 
    349     pub async fn claim_next_ready_event(
    350         &self,
    351         claim_owner: impl AsRef<str>,
    352         claim_token: impl AsRef<str>,
    353         claim_expires_at_ms: i64,
    354         now_ms: i64,
    355     ) -> Result<Option<RadrootsOutboxClaimedEvent>, RadrootsOutboxError> {
    356         let mut tx = self.pool.begin().await?;
    357         let row = sqlx::query(
    358             "SELECT outbox_event_id, state, signed_event_json FROM outbox_event WHERE state IN ('draft_queued', 'sign_retryable', 'signed', 'publish_retryable') AND next_attempt_after_ms <= ? AND (claim_token IS NULL OR claim_expires_at_ms <= ?) ORDER BY created_at_ms, outbox_event_id LIMIT 1",
    359         )
    360         .bind(now_ms)
    361         .bind(now_ms)
    362         .fetch_optional(&mut *tx)
    363         .await?;
    364         let Some(row) = row else {
    365             tx.commit().await?;
    366             return Ok(None);
    367         };
    368         let outbox_event_id: i64 = row.try_get("outbox_event_id")?;
    369         let state = RadrootsOutboxEventState::parse(row.try_get::<String, _>("state")?.as_str())?;
    370         let signed_event_json: Option<String> = row.try_get("signed_event_json")?;
    371         let claimed_state = match (state, signed_event_json.as_ref()) {
    372             (
    373                 RadrootsOutboxEventState::DraftQueued | RadrootsOutboxEventState::SignRetryable,
    374                 None,
    375             ) => RadrootsOutboxEventState::Signing,
    376             _ => RadrootsOutboxEventState::Publishing,
    377         };
    378         let changed = sqlx::query(
    379             "UPDATE outbox_event SET state = ?, claim_token = ?, claim_owner = ?, claim_expires_at_ms = ?, attempt_count = attempt_count + 1, updated_at_ms = ? WHERE outbox_event_id = ? AND (claim_token IS NULL OR claim_expires_at_ms <= ?)",
    380         )
    381         .bind(claimed_state.as_str())
    382         .bind(claim_token.as_ref())
    383         .bind(claim_owner.as_ref())
    384         .bind(claim_expires_at_ms)
    385         .bind(now_ms)
    386         .bind(outbox_event_id)
    387         .bind(now_ms)
    388         .execute(&mut *tx)
    389         .await?;
    390         if changed.rows_affected() == 0 {
    391             tx.commit().await?;
    392             return Ok(None);
    393         }
    394         let record = event_by_id_tx(&mut tx, outbox_event_id).await?;
    395         let target_relays = relay_urls_for_tx(&mut tx, outbox_event_id).await?;
    396         tx.commit().await?;
    397         Ok(Some(RadrootsOutboxClaimedEvent {
    398             outbox_event_id: record.outbox_event_id,
    399             operation_id: record.operation_id,
    400             expected_event_id: record.event_id,
    401             attempt_count: record.attempt_count,
    402             state: claimed_state,
    403             claim_token: claim_token.as_ref().to_owned(),
    404             draft: record.draft,
    405             signed_event: record.signed_event,
    406             target_relays,
    407         }))
    408     }
    409 
    410     pub async fn claim_next_ready_signed_event(
    411         &self,
    412         claim_owner: impl AsRef<str>,
    413         claim_token: impl AsRef<str>,
    414         claim_expires_at_ms: i64,
    415         now_ms: i64,
    416     ) -> Result<Option<RadrootsOutboxClaimedEvent>, RadrootsOutboxError> {
    417         let mut tx = self.pool.begin().await?;
    418         let row = sqlx::query(
    419             "SELECT outbox_event_id FROM outbox_event WHERE state IN ('signed', 'publish_retryable') AND signed_event_json IS NOT NULL AND next_attempt_after_ms <= ? AND (claim_token IS NULL OR claim_expires_at_ms <= ?) ORDER BY created_at_ms, outbox_event_id LIMIT 1",
    420         )
    421         .bind(now_ms)
    422         .bind(now_ms)
    423         .fetch_optional(&mut *tx)
    424         .await?;
    425         let Some(row) = row else {
    426             tx.commit().await?;
    427             return Ok(None);
    428         };
    429         let outbox_event_id: i64 = row.try_get("outbox_event_id")?;
    430         let changed = sqlx::query(
    431             "UPDATE outbox_event SET state = ?, claim_token = ?, claim_owner = ?, claim_expires_at_ms = ?, attempt_count = attempt_count + 1, updated_at_ms = ? WHERE outbox_event_id = ? AND state IN ('signed', 'publish_retryable') AND signed_event_json IS NOT NULL AND (claim_token IS NULL OR claim_expires_at_ms <= ?)",
    432         )
    433         .bind(RadrootsOutboxEventState::Publishing.as_str())
    434         .bind(claim_token.as_ref())
    435         .bind(claim_owner.as_ref())
    436         .bind(claim_expires_at_ms)
    437         .bind(now_ms)
    438         .bind(outbox_event_id)
    439         .bind(now_ms)
    440         .execute(&mut *tx)
    441         .await?;
    442         if changed.rows_affected() == 0 {
    443             tx.commit().await?;
    444             return Ok(None);
    445         }
    446         let record = event_by_id_tx(&mut tx, outbox_event_id).await?;
    447         let target_relays = relay_urls_for_tx(&mut tx, outbox_event_id).await?;
    448         tx.commit().await?;
    449         Ok(Some(RadrootsOutboxClaimedEvent {
    450             outbox_event_id: record.outbox_event_id,
    451             operation_id: record.operation_id,
    452             expected_event_id: record.event_id,
    453             attempt_count: record.attempt_count,
    454             state: RadrootsOutboxEventState::Publishing,
    455             claim_token: claim_token.as_ref().to_owned(),
    456             draft: record.draft,
    457             signed_event: record.signed_event,
    458             target_relays,
    459         }))
    460     }
    461 
    462     pub async fn complete_signing(
    463         &self,
    464         outbox_event_id: i64,
    465         claim_token: &str,
    466         signed_event: RadrootsSignedNostrEvent,
    467         now_ms: i64,
    468     ) -> Result<RadrootsSignedNostrEvent, RadrootsOutboxError> {
    469         let record = self.claimed_event(outbox_event_id, claim_token).await?;
    470         if signed_event.id != record.event_id {
    471             return Err(RadrootsOutboxError::SignedEventIdMismatch {
    472                 expected_event_id: record.event_id,
    473                 actual_event_id: signed_event.id,
    474             });
    475         }
    476         let signed_event_json = serde_json::to_string(&signed_event)?;
    477         let changed = sqlx::query(
    478             "UPDATE outbox_event SET signed_event_json = ?, raw_event_json = ?, state = ?, last_error = NULL, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?",
    479         )
    480         .bind(signed_event_json.as_str())
    481         .bind(signed_event.raw_json.as_str())
    482         .bind(RadrootsOutboxEventState::Signed.as_str())
    483         .bind(now_ms)
    484         .bind(outbox_event_id)
    485         .bind(claim_token)
    486         .execute(&self.pool)
    487         .await?;
    488         self.ensure_claimed_update(outbox_event_id, claim_token, changed)
    489             .await?;
    490         Ok(signed_event)
    491     }
    492 
    493     pub async fn mark_sign_retryable(
    494         &self,
    495         outbox_event_id: i64,
    496         claim_token: &str,
    497         error: impl AsRef<str>,
    498         next_attempt_after_ms: i64,
    499         now_ms: i64,
    500     ) -> Result<(), RadrootsOutboxError> {
    501         let changed = sqlx::query(
    502             "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?",
    503         )
    504         .bind(RadrootsOutboxEventState::SignRetryable.as_str())
    505         .bind(error.as_ref())
    506         .bind(next_attempt_after_ms)
    507         .bind(now_ms)
    508         .bind(outbox_event_id)
    509         .bind(claim_token)
    510         .execute(&self.pool)
    511         .await?;
    512         self.ensure_claimed_update(outbox_event_id, claim_token, changed)
    513             .await?;
    514         Ok(())
    515     }
    516 
    517     pub async fn mark_publish_retryable(
    518         &self,
    519         outbox_event_id: i64,
    520         claim_token: &str,
    521         error: impl AsRef<str>,
    522         next_attempt_after_ms: i64,
    523         now_ms: i64,
    524     ) -> Result<(), RadrootsOutboxError> {
    525         let changed = sqlx::query(
    526             "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?",
    527         )
    528         .bind(RadrootsOutboxEventState::PublishRetryable.as_str())
    529         .bind(error.as_ref())
    530         .bind(next_attempt_after_ms)
    531         .bind(now_ms)
    532         .bind(outbox_event_id)
    533         .bind(claim_token)
    534         .execute(&self.pool)
    535         .await?;
    536         self.ensure_claimed_update(outbox_event_id, claim_token, changed)
    537             .await?;
    538         Ok(())
    539     }
    540 
    541     pub async fn recover_expired_claims(&self, now_ms: i64) -> Result<u64, RadrootsOutboxError> {
    542         let changed = sqlx::query(
    543             "UPDATE outbox_event SET state = CASE WHEN state = 'signing' AND signed_event_json IS NULL THEN 'sign_retryable' WHEN state = 'signing' AND signed_event_json IS NOT NULL THEN 'signed' WHEN state = 'publishing' THEN 'publish_retryable' ELSE state END, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, updated_at_ms = ? WHERE claim_token IS NOT NULL AND claim_expires_at_ms <= ? AND state IN ('signing', 'signed', 'publishing')",
    544         )
    545         .bind(now_ms)
    546         .bind(now_ms)
    547         .execute(&self.pool)
    548         .await?;
    549         Ok(changed.rows_affected())
    550     }
    551 
    552     pub async fn ingest_signed_event_local(
    553         &self,
    554         event_store: &RadrootsEventStore,
    555         outbox_event_id: i64,
    556         claim_token: &str,
    557         observed_at_ms: i64,
    558     ) -> Result<RadrootsOutboxEventStoreIngestReceipt, RadrootsOutboxError> {
    559         let record = self.claimed_event(outbox_event_id, claim_token).await?;
    560         if record.event_store_ingested {
    561             return Ok(RadrootsOutboxEventStoreIngestReceipt {
    562                 outbox_event_id,
    563                 event_id: record.event_id,
    564                 already_ingested: true,
    565                 event_store_inserted: false,
    566             });
    567         }
    568         let signed_event = record
    569             .signed_event
    570             .ok_or(RadrootsOutboxError::MissingSignedEvent(outbox_event_id))?;
    571         let event = event_from_signed(&signed_event);
    572         let ingest = RadrootsEventIngest::new(event, observed_at_ms)
    573             .with_raw_json(signed_event.raw_json.clone());
    574         let receipt = event_store.ingest_event(ingest).await?;
    575         let changed = sqlx::query(
    576             "UPDATE outbox_event SET event_store_ingested = 1, event_store_inserted = ?, event_store_ingested_at_ms = ?, state = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?",
    577         )
    578         .bind(bool_i64(receipt.inserted))
    579         .bind(observed_at_ms)
    580         .bind(RadrootsOutboxEventState::Publishing.as_str())
    581         .bind(observed_at_ms)
    582         .bind(outbox_event_id)
    583         .bind(claim_token)
    584         .execute(&self.pool)
    585         .await?;
    586         self.ensure_claimed_update(outbox_event_id, claim_token, changed)
    587             .await?;
    588         Ok(RadrootsOutboxEventStoreIngestReceipt {
    589             outbox_event_id,
    590             event_id: receipt.event_id,
    591             already_ingested: false,
    592             event_store_inserted: receipt.inserted,
    593         })
    594     }
    595 
    596     pub async fn mark_relay_accepted(
    597         &self,
    598         outbox_event_id: i64,
    599         claim_token: &str,
    600         relay_url: &str,
    601         acknowledged_at_ms: i64,
    602     ) -> Result<(), RadrootsOutboxError> {
    603         let changed = sqlx::query(
    604             "UPDATE outbox_event_relay_status SET status = ?, attempt_count = attempt_count + 1, last_attempt_at_ms = ?, acknowledged_at_ms = ?, last_error = NULL WHERE outbox_event_id = ? AND relay_url = ? AND EXISTS (SELECT 1 FROM outbox_event WHERE outbox_event_id = ? AND claim_token = ?)",
    605         )
    606         .bind(RadrootsOutboxRelayStatus::Accepted.as_str())
    607         .bind(acknowledged_at_ms)
    608         .bind(acknowledged_at_ms)
    609         .bind(outbox_event_id)
    610         .bind(relay_url)
    611         .bind(outbox_event_id)
    612         .bind(claim_token)
    613         .execute(&self.pool)
    614         .await?;
    615         self.ensure_claimed_update(outbox_event_id, claim_token, changed)
    616             .await?;
    617         Ok(())
    618     }
    619 
    620     pub async fn set_publish_quorum(
    621         &self,
    622         outbox_event_id: i64,
    623         claim_token: &str,
    624         accepted_quorum: i64,
    625         now_ms: i64,
    626     ) -> Result<(), RadrootsOutboxError> {
    627         let changed = sqlx::query(
    628             "UPDATE outbox_event SET accepted_quorum = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?",
    629         )
    630         .bind(accepted_quorum)
    631         .bind(now_ms)
    632         .bind(outbox_event_id)
    633         .bind(claim_token)
    634         .execute(&self.pool)
    635         .await?;
    636         self.ensure_claimed_update(outbox_event_id, claim_token, changed)
    637             .await?;
    638         Ok(())
    639     }
    640 
    641     pub async fn mark_relay_failed_retryable(
    642         &self,
    643         outbox_event_id: i64,
    644         claim_token: &str,
    645         relay_url: &str,
    646         error: &str,
    647         attempted_at_ms: i64,
    648     ) -> Result<(), RadrootsOutboxError> {
    649         self.mark_relay_failed(
    650             outbox_event_id,
    651             claim_token,
    652             relay_url,
    653             RadrootsOutboxRelayStatus::FailedRetryable,
    654             error,
    655             attempted_at_ms,
    656         )
    657         .await
    658     }
    659 
    660     pub async fn mark_relay_failed_terminal(
    661         &self,
    662         outbox_event_id: i64,
    663         claim_token: &str,
    664         relay_url: &str,
    665         error: &str,
    666         attempted_at_ms: i64,
    667     ) -> Result<(), RadrootsOutboxError> {
    668         self.mark_relay_failed(
    669             outbox_event_id,
    670             claim_token,
    671             relay_url,
    672             RadrootsOutboxRelayStatus::FailedTerminal,
    673             error,
    674             attempted_at_ms,
    675         )
    676         .await
    677     }
    678 
    679     pub async fn complete_publish_attempt(
    680         &self,
    681         outbox_event_id: i64,
    682         claim_token: &str,
    683         retryable_error: impl AsRef<str>,
    684         terminal_error: impl AsRef<str>,
    685         next_attempt_after_ms: i64,
    686         now_ms: i64,
    687     ) -> Result<RadrootsOutboxEventState, RadrootsOutboxError> {
    688         let mut tx = self.pool.begin().await?;
    689         let row = claimed_event_identity_tx(&mut tx, outbox_event_id, claim_token).await?;
    690         let operation_id = row.operation_id;
    691         let accepted_quorum = row.accepted_quorum;
    692         let accepted_count: i64 = sqlx::query(
    693             "SELECT COUNT(*) FROM outbox_event_relay_status WHERE outbox_event_id = ? AND status = ?",
    694         )
    695         .bind(outbox_event_id)
    696         .bind(RadrootsOutboxRelayStatus::Accepted.as_str())
    697         .fetch_one(&mut *tx)
    698         .await?
    699         .try_get(0)?;
    700         let retryable_count: i64 = sqlx::query(
    701             "SELECT COUNT(*) FROM outbox_event_relay_status WHERE outbox_event_id = ? AND status = ?",
    702         )
    703         .bind(outbox_event_id)
    704         .bind(RadrootsOutboxRelayStatus::FailedRetryable.as_str())
    705         .fetch_one(&mut *tx)
    706         .await?
    707         .try_get(0)?;
    708         let pending_count: i64 = sqlx::query(
    709             "SELECT COUNT(*) FROM outbox_event_relay_status WHERE outbox_event_id = ? AND status = ?",
    710         )
    711         .bind(outbox_event_id)
    712         .bind(RadrootsOutboxRelayStatus::Pending.as_str())
    713         .fetch_one(&mut *tx)
    714         .await?
    715         .try_get(0)?;
    716 
    717         let (event_state, operation_status, last_error, next_attempt_after_ms) =
    718             if accepted_count >= accepted_quorum {
    719                 (
    720                     RadrootsOutboxEventState::Published,
    721                     Some(RadrootsOutboxOperationStatus::Complete),
    722                     None,
    723                     now_ms,
    724                 )
    725             } else if retryable_count > 0 || pending_count > 0 {
    726                 (
    727                     RadrootsOutboxEventState::PublishRetryable,
    728                     None,
    729                     Some(retryable_error.as_ref()),
    730                     next_attempt_after_ms,
    731                 )
    732             } else {
    733                 (
    734                     RadrootsOutboxEventState::FailedTerminal,
    735                     Some(RadrootsOutboxOperationStatus::FailedTerminal),
    736                     Some(terminal_error.as_ref()),
    737                     now_ms,
    738                 )
    739             };
    740 
    741         let changed = sqlx::query(
    742             "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?",
    743         )
    744         .bind(event_state.as_str())
    745         .bind(last_error)
    746         .bind(next_attempt_after_ms)
    747         .bind(now_ms)
    748         .bind(outbox_event_id)
    749         .bind(claim_token)
    750         .execute(&mut *tx)
    751         .await?;
    752         if changed.rows_affected() == 0 {
    753             return Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id });
    754         }
    755 
    756         if let Some(operation_status) = operation_status {
    757             sqlx::query(
    758                 "UPDATE outbox_operation SET status = ?, updated_at_ms = ? WHERE operation_id = ?",
    759             )
    760             .bind(operation_status.as_str())
    761             .bind(now_ms)
    762             .bind(operation_id)
    763             .execute(&mut *tx)
    764             .await?;
    765         }
    766 
    767         tx.commit().await?;
    768         Ok(event_state)
    769     }
    770 
    771     pub async fn mark_publish_failed_terminal(
    772         &self,
    773         outbox_event_id: i64,
    774         claim_token: &str,
    775         error: impl AsRef<str>,
    776         now_ms: i64,
    777     ) -> Result<(), RadrootsOutboxError> {
    778         self.finish_claimed_event(
    779             outbox_event_id,
    780             claim_token,
    781             RadrootsOutboxEventState::FailedTerminal,
    782             RadrootsOutboxOperationStatus::FailedTerminal,
    783             Some(error.as_ref()),
    784             now_ms,
    785         )
    786         .await
    787     }
    788 
    789     pub async fn cancel_claimed_event(
    790         &self,
    791         outbox_event_id: i64,
    792         claim_token: &str,
    793         now_ms: i64,
    794     ) -> Result<(), RadrootsOutboxError> {
    795         self.finish_claimed_event(
    796             outbox_event_id,
    797             claim_token,
    798             RadrootsOutboxEventState::Cancelled,
    799             RadrootsOutboxOperationStatus::Cancelled,
    800             None,
    801             now_ms,
    802         )
    803         .await
    804     }
    805 
    806     async fn claimed_event(
    807         &self,
    808         outbox_event_id: i64,
    809         claim_token: &str,
    810     ) -> Result<RadrootsOutboxEventRecord, RadrootsOutboxError> {
    811         self.ensure_claim_token(outbox_event_id, claim_token)
    812             .await?;
    813         self.get_event(outbox_event_id)
    814             .await?
    815             .ok_or(RadrootsOutboxError::EventNotFound(outbox_event_id))
    816     }
    817 
    818     async fn ensure_claim_token(
    819         &self,
    820         outbox_event_id: i64,
    821         claim_token: &str,
    822     ) -> Result<(), RadrootsOutboxError> {
    823         let row = sqlx::query("SELECT claim_token FROM outbox_event WHERE outbox_event_id = ?")
    824             .bind(outbox_event_id)
    825             .fetch_optional(&self.pool)
    826             .await?;
    827         let Some(row) = row else {
    828             return Err(RadrootsOutboxError::EventNotFound(outbox_event_id));
    829         };
    830         let stored: Option<String> = row.try_get("claim_token")?;
    831         if stored.as_deref() != Some(claim_token) {
    832             return Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id });
    833         }
    834         Ok(())
    835     }
    836 
    837     async fn ensure_claimed_update(
    838         &self,
    839         outbox_event_id: i64,
    840         claim_token: &str,
    841         changed: SqliteQueryResult,
    842     ) -> Result<(), RadrootsOutboxError> {
    843         if changed.rows_affected() > 0 {
    844             return Ok(());
    845         }
    846         self.ensure_claim_token(outbox_event_id, claim_token)
    847             .await?;
    848         Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id })
    849     }
    850 
    851     async fn finish_claimed_event(
    852         &self,
    853         outbox_event_id: i64,
    854         claim_token: &str,
    855         event_state: RadrootsOutboxEventState,
    856         operation_status: RadrootsOutboxOperationStatus,
    857         last_error: Option<&str>,
    858         now_ms: i64,
    859     ) -> Result<(), RadrootsOutboxError> {
    860         let mut tx = self.pool.begin().await?;
    861         let row = claimed_event_identity_tx(&mut tx, outbox_event_id, claim_token).await?;
    862         let changed = sqlx::query(
    863             "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?",
    864         )
    865         .bind(event_state.as_str())
    866         .bind(last_error)
    867         .bind(now_ms)
    868         .bind(now_ms)
    869         .bind(outbox_event_id)
    870         .bind(claim_token)
    871         .execute(&mut *tx)
    872         .await?;
    873         if changed.rows_affected() == 0 {
    874             return Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id });
    875         }
    876 
    877         sqlx::query(
    878             "UPDATE outbox_operation SET status = ?, updated_at_ms = ? WHERE operation_id = ?",
    879         )
    880         .bind(operation_status.as_str())
    881         .bind(now_ms)
    882         .bind(row.operation_id)
    883         .execute(&mut *tx)
    884         .await?;
    885 
    886         tx.commit().await?;
    887         Ok(())
    888     }
    889 
    890     async fn mark_relay_failed(
    891         &self,
    892         outbox_event_id: i64,
    893         claim_token: &str,
    894         relay_url: &str,
    895         status: RadrootsOutboxRelayStatus,
    896         error: &str,
    897         attempted_at_ms: i64,
    898     ) -> Result<(), RadrootsOutboxError> {
    899         let changed = sqlx::query(
    900             "UPDATE outbox_event_relay_status SET status = ?, attempt_count = attempt_count + 1, last_attempt_at_ms = ?, acknowledged_at_ms = NULL, last_error = ? WHERE outbox_event_id = ? AND relay_url = ? AND EXISTS (SELECT 1 FROM outbox_event WHERE outbox_event_id = ? AND claim_token = ?)",
    901         )
    902         .bind(status.as_str())
    903         .bind(attempted_at_ms)
    904         .bind(error)
    905         .bind(outbox_event_id)
    906         .bind(relay_url)
    907         .bind(outbox_event_id)
    908         .bind(claim_token)
    909         .execute(&self.pool)
    910         .await?;
    911         self.ensure_claimed_update(outbox_event_id, claim_token, changed)
    912             .await?;
    913         Ok(())
    914     }
    915 }
    916 
/// Row returned by `existing_idempotent_operation`: an operation that already
/// exists for an idempotency key, joined with one of its outbox events.
struct ExistingOperation {
    /// `outbox_operation.operation_id`.
    operation_id: i64,
    /// `outbox_event.outbox_event_id` of the joined event.
    outbox_event_id: i64,
    /// `outbox_event.event_id` of the joined event.
    event_id: String,
    /// `outbox_operation.idempotency_digest` stored for the operation.
    idempotency_digest: String,
}
    923 
/// Columns of a claimed `outbox_event` row read by
/// `claimed_event_identity_tx` after the claim token has been verified.
struct ClaimedEventIdentity {
    /// Operation that owns the event.
    operation_id: i64,
    /// Value of the event's `accepted_quorum` column.
    accepted_quorum: i64,
}
    928 
    929 async fn configure_connection(
    930     pool: &SqlitePool,
    931     file_backed: bool,
    932 ) -> Result<(), RadrootsOutboxError> {
    933     sqlx::query("PRAGMA foreign_keys = ON")
    934         .execute(pool)
    935         .await?;
    936     sqlx::query("PRAGMA busy_timeout = 5000")
    937         .execute(pool)
    938         .await?;
    939     if file_backed {
    940         sqlx::query("PRAGMA journal_mode = WAL")
    941             .execute(pool)
    942             .await?;
    943     }
    944     Ok(())
    945 }
    946 
    947 #[cfg_attr(coverage_nightly, coverage(off))]
    948 async fn apply_up(pool: &SqlitePool) -> Result<(), RadrootsOutboxError> {
    949     sqlx::raw_sql(OUTBOX_MIGRATION_UP).execute(pool).await?;
    950     Ok(())
    951 }
    952 
    953 #[cfg_attr(coverage_nightly, coverage(off))]
    954 async fn apply_down(pool: &SqlitePool) -> Result<(), RadrootsOutboxError> {
    955     sqlx::raw_sql(OUTBOX_MIGRATION_DOWN).execute(pool).await?;
    956     Ok(())
    957 }
    958 
    959 #[cfg_attr(coverage_nightly, coverage(off))]
    960 async fn query_i64(pool: &SqlitePool, sql: &str) -> Result<i64, RadrootsOutboxError> {
    961     let row = sqlx::query(sql).fetch_one(pool).await?;
    962     Ok(row.try_get(0)?)
    963 }
    964 
    965 #[cfg_attr(coverage_nightly, coverage(off))]
    966 async fn query_string(pool: &SqlitePool, sql: &str) -> Result<String, RadrootsOutboxError> {
    967     let row = sqlx::query(sql).fetch_one(pool).await?;
    968     Ok(row.try_get(0)?)
    969 }
    970 
    971 #[cfg_attr(coverage_nightly, coverage(off))]
    972 async fn existing_idempotent_operation(
    973     tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    974     operation_kind: &str,
    975     expected_pubkey: &str,
    976     idempotency_key: &str,
    977 ) -> Result<Option<ExistingOperation>, RadrootsOutboxError> {
    978     let row = sqlx::query(
    979         "SELECT o.operation_id, o.idempotency_digest, e.outbox_event_id, e.event_id FROM outbox_operation o JOIN outbox_event e ON e.operation_id = o.operation_id WHERE o.operation_kind = ? AND o.expected_pubkey = ? AND o.idempotency_key = ? ORDER BY e.outbox_event_id LIMIT 1",
    980     )
    981     .bind(operation_kind)
    982     .bind(expected_pubkey)
    983     .bind(idempotency_key)
    984     .fetch_optional(&mut **tx)
    985     .await?;
    986     row.map(|row| {
    987         Ok(ExistingOperation {
    988             operation_id: row.try_get("operation_id")?,
    989             outbox_event_id: row.try_get("outbox_event_id")?,
    990             event_id: row.try_get("event_id")?,
    991             idempotency_digest: row.try_get("idempotency_digest")?,
    992         })
    993     })
    994     .transpose()
    995 }
    996 
    997 #[cfg_attr(coverage_nightly, coverage(off))]
    998 async fn event_by_id_tx(
    999     tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
   1000     outbox_event_id: i64,
   1001 ) -> Result<RadrootsOutboxEventRecord, RadrootsOutboxError> {
   1002     let row = sqlx::query(
   1003         "SELECT outbox_event_id, operation_id, event_id, expected_pubkey, draft_json, signed_event_json, raw_event_json, state, accepted_quorum, attempt_count, claim_token, claim_owner, claim_expires_at_ms, next_attempt_after_ms, last_error, event_store_ingested, event_store_inserted, event_store_ingested_at_ms, created_at_ms, updated_at_ms FROM outbox_event WHERE outbox_event_id = ?",
   1004     )
   1005     .bind(outbox_event_id)
   1006     .fetch_one(&mut **tx)
   1007     .await?;
   1008     event_from_row(row)
   1009 }
   1010 
   1011 async fn claimed_event_identity_tx(
   1012     tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
   1013     outbox_event_id: i64,
   1014     claim_token: &str,
   1015 ) -> Result<ClaimedEventIdentity, RadrootsOutboxError> {
   1016     let row =
   1017         sqlx::query("SELECT operation_id, accepted_quorum, claim_token FROM outbox_event WHERE outbox_event_id = ?")
   1018             .bind(outbox_event_id)
   1019             .fetch_optional(&mut **tx)
   1020             .await?;
   1021     let Some(row) = row else {
   1022         return Err(RadrootsOutboxError::EventNotFound(outbox_event_id));
   1023     };
   1024     let stored: Option<String> = row.try_get("claim_token")?;
   1025     if stored.as_deref() != Some(claim_token) {
   1026         return Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id });
   1027     }
   1028     Ok(ClaimedEventIdentity {
   1029         operation_id: row.try_get("operation_id")?,
   1030         accepted_quorum: row.try_get("accepted_quorum")?,
   1031     })
   1032 }
   1033 
   1034 #[cfg_attr(coverage_nightly, coverage(off))]
   1035 async fn relay_urls_for_tx(
   1036     tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
   1037     outbox_event_id: i64,
   1038 ) -> Result<Vec<String>, RadrootsOutboxError> {
   1039     let rows = sqlx::query(
   1040         "SELECT relay_url FROM outbox_event_relay_status WHERE outbox_event_id = ? ORDER BY rowid",
   1041     )
   1042     .bind(outbox_event_id)
   1043     .fetch_all(&mut **tx)
   1044     .await?;
   1045     rows.into_iter()
   1046         .map(|row| row.try_get("relay_url").map_err(Into::into))
   1047         .collect()
   1048 }
   1049 
   1050 #[cfg_attr(coverage_nightly, coverage(off))]
   1051 async fn relay_statuses_for(
   1052     pool: &SqlitePool,
   1053     outbox_event_id: i64,
   1054 ) -> Result<Vec<RadrootsOutboxRelayStatusRecord>, RadrootsOutboxError> {
   1055     let rows = sqlx::query(
   1056         "SELECT outbox_event_id, relay_url, status, attempt_count, last_attempt_at_ms, acknowledged_at_ms, last_error FROM outbox_event_relay_status WHERE outbox_event_id = ? ORDER BY rowid",
   1057     )
   1058     .bind(outbox_event_id)
   1059     .fetch_all(pool)
   1060     .await?;
   1061     rows.into_iter().map(relay_status_from_row).collect()
   1062 }
   1063 
   1064 #[cfg_attr(coverage_nightly, coverage(off))]
   1065 fn operation_from_row(
   1066     row: sqlx::sqlite::SqliteRow,
   1067 ) -> Result<RadrootsOutboxOperationRecord, RadrootsOutboxError> {
   1068     let status =
   1069         RadrootsOutboxOperationStatus::parse(row.try_get::<String, _>("status")?.as_str())?;
   1070     Ok(RadrootsOutboxOperationRecord {
   1071         operation_id: row.try_get("operation_id")?,
   1072         operation_kind: row.try_get("operation_kind")?,
   1073         expected_pubkey: row.try_get("expected_pubkey")?,
   1074         idempotency_key: row.try_get("idempotency_key")?,
   1075         idempotency_digest: row.try_get("idempotency_digest")?,
   1076         status,
   1077         created_at_ms: row.try_get("created_at_ms")?,
   1078         updated_at_ms: row.try_get("updated_at_ms")?,
   1079     })
   1080 }
   1081 
   1082 #[cfg_attr(coverage_nightly, coverage(off))]
   1083 fn event_from_row(
   1084     row: sqlx::sqlite::SqliteRow,
   1085 ) -> Result<RadrootsOutboxEventRecord, RadrootsOutboxError> {
   1086     let draft: RadrootsFrozenEventDraft =
   1087         serde_json::from_str(row.try_get::<String, _>("draft_json")?.as_str())?;
   1088     let signed_event = row
   1089         .try_get::<Option<String>, _>("signed_event_json")?
   1090         .map(|json| serde_json::from_str(json.as_str()))
   1091         .transpose()?;
   1092     let state = RadrootsOutboxEventState::parse(row.try_get::<String, _>("state")?.as_str())?;
   1093     Ok(RadrootsOutboxEventRecord {
   1094         outbox_event_id: row.try_get("outbox_event_id")?,
   1095         operation_id: row.try_get("operation_id")?,
   1096         event_id: row.try_get("event_id")?,
   1097         expected_pubkey: row.try_get("expected_pubkey")?,
   1098         draft,
   1099         signed_event,
   1100         raw_event_json: row.try_get("raw_event_json")?,
   1101         state,
   1102         accepted_quorum: row.try_get("accepted_quorum")?,
   1103         attempt_count: row.try_get("attempt_count")?,
   1104         claim_token: row.try_get("claim_token")?,
   1105         claim_owner: row.try_get("claim_owner")?,
   1106         claim_expires_at_ms: row.try_get("claim_expires_at_ms")?,
   1107         next_attempt_after_ms: row.try_get("next_attempt_after_ms")?,
   1108         last_error: row.try_get("last_error")?,
   1109         event_store_ingested: row.try_get::<i64, _>("event_store_ingested")? != 0,
   1110         event_store_inserted: row.try_get::<i64, _>("event_store_inserted")? != 0,
   1111         event_store_ingested_at_ms: row.try_get("event_store_ingested_at_ms")?,
   1112         created_at_ms: row.try_get("created_at_ms")?,
   1113         updated_at_ms: row.try_get("updated_at_ms")?,
   1114     })
   1115 }
   1116 
   1117 #[cfg_attr(coverage_nightly, coverage(off))]
   1118 fn relay_status_from_row(
   1119     row: sqlx::sqlite::SqliteRow,
   1120 ) -> Result<RadrootsOutboxRelayStatusRecord, RadrootsOutboxError> {
   1121     let status = RadrootsOutboxRelayStatus::parse(row.try_get::<String, _>("status")?.as_str())?;
   1122     Ok(RadrootsOutboxRelayStatusRecord {
   1123         outbox_event_id: row.try_get("outbox_event_id")?,
   1124         relay_url: row.try_get("relay_url")?,
   1125         status,
   1126         attempt_count: row.try_get("attempt_count")?,
   1127         last_attempt_at_ms: row.try_get("last_attempt_at_ms")?,
   1128         acknowledged_at_ms: row.try_get("acknowledged_at_ms")?,
   1129         last_error: row.try_get("last_error")?,
   1130     })
   1131 }
   1132 
   1133 fn event_from_signed(signed_event: &RadrootsSignedNostrEvent) -> RadrootsNostrEvent {
   1134     RadrootsNostrEvent {
   1135         id: signed_event.id.clone(),
   1136         author: signed_event.pubkey.clone(),
   1137         created_at: signed_event.created_at,
   1138         kind: signed_event.kind,
   1139         tags: signed_event.tags.clone(),
   1140         content: signed_event.content.clone(),
   1141         sig: signed_event.sig.clone(),
   1142     }
   1143 }
   1144 
   1145 fn ordered_unique_relays(relays: Vec<String>) -> Vec<String> {
   1146     let mut out = Vec::new();
   1147     for relay in relays {
   1148         if !out.iter().any(|existing| existing == &relay) {
   1149             out.push(relay);
   1150         }
   1151     }
   1152     out
   1153 }
   1154 
   1155 fn digest_relays(relays: &[String]) -> Vec<String> {
   1156     let mut out = relays.to_vec();
   1157     out.sort();
   1158     out.dedup();
   1159     out
   1160 }
   1161 
/// Borrowed view of the values that `idempotency_digest` serializes to JSON
/// and hashes.
// NOTE(review): the serialized form feeds the digest, so renaming or
// reordering fields presumably changes every digest — confirm before editing.
#[derive(Serialize)]
struct DigestInput<'a> {
    /// Kind label of the operation.
    operation_kind: &'a str,
    /// Public key the operation expects.
    expected_pubkey: &'a str,
    /// Frozen draft of the event.
    draft: &'a RadrootsFrozenEventDraft,
    /// Relay URLs included in the digest, exactly as passed by the caller.
    target_relays: &'a [String],
}
   1169 
   1170 fn idempotency_digest(
   1171     operation_kind: &str,
   1172     expected_pubkey: &str,
   1173     draft: &RadrootsFrozenEventDraft,
   1174     target_relays: &[String],
   1175 ) -> Result<String, RadrootsOutboxError> {
   1176     let input = DigestInput {
   1177         operation_kind,
   1178         expected_pubkey,
   1179         draft,
   1180         target_relays,
   1181     };
   1182     let bytes = serde_json::to_vec(&input)?;
   1183     Ok(hex::encode(Sha256::digest(bytes)))
   1184 }
   1185 
   1186 fn bool_i64(value: bool) -> i64 {
   1187     if value { 1 } else { 0 }
   1188 }
   1189 
   1190 #[cfg(test)]
   1191 mod tests {
   1192     use super::*;
   1193     use radroots_events::kinds::KIND_POST;
   1194     use radroots_nostr::prelude::{
   1195         RadrootsNostrKeys, RadrootsNostrSecretKey, radroots_nostr_sign_frozen_draft,
   1196     };
   1197 
    // Hex secret key that `fixture_keys` turns into the signing keys.
    const FIXTURE_ALICE_SECRET_KEY_HEX: &str =
        "10c5304d6c9ae3a1a16f7860f1cc8f5e3a76225a2663b3a989a0d775919b7df5";
    // Hex public key used as the drafts' expected pubkey; presumably the
    // counterpart of the secret key above — verify before changing either.
    const FIXTURE_ALICE_PUBLIC_KEY_HEX: &str =
        "585591529da0bab31b3b1b1f986611cf5f435dca84f978c89ee8a40cca7103df";
    // Relay URLs used as target relays by the operation-input helpers.
    const RELAY_PRIMARY_WSS: &str = "wss://relay.example.com";
    const RELAY_SECONDARY_WSS: &str = "wss://relay-2.example.com";
   1204 
   1205     fn hex_64(character: char) -> String {
   1206         std::iter::repeat_n(character, 64).collect()
   1207     }
   1208 
   1209     fn post_draft(expected_pubkey: &str, content: &str) -> RadrootsFrozenEventDraft {
   1210         RadrootsFrozenEventDraft::new(
   1211             "radroots.social.post.v1",
   1212             KIND_POST,
   1213             1_700_000_000,
   1214             vec![vec!["t".to_owned(), "soil".to_owned()]],
   1215             content,
   1216             expected_pubkey,
   1217         )
   1218         .expect("post draft")
   1219     }
   1220 
   1221     fn operation_input(
   1222         draft: RadrootsFrozenEventDraft,
   1223         created_at_ms: i64,
   1224     ) -> RadrootsOutboxOperationInput {
   1225         RadrootsOutboxOperationInput::new(
   1226             "publish_post",
   1227             draft,
   1228             vec![
   1229                 RELAY_PRIMARY_WSS.to_owned(),
   1230                 RELAY_SECONDARY_WSS.to_owned(),
   1231                 RELAY_PRIMARY_WSS.to_owned(),
   1232             ],
   1233             created_at_ms,
   1234         )
   1235     }
   1236 
   1237     fn signed_operation_input(
   1238         draft: RadrootsFrozenEventDraft,
   1239         signed_event: RadrootsSignedNostrEvent,
   1240         created_at_ms: i64,
   1241     ) -> RadrootsOutboxSignedOperationInput {
   1242         RadrootsOutboxSignedOperationInput::new(
   1243             "publish_post",
   1244             draft,
   1245             signed_event,
   1246             vec![
   1247                 RELAY_PRIMARY_WSS.to_owned(),
   1248                 RELAY_SECONDARY_WSS.to_owned(),
   1249                 RELAY_PRIMARY_WSS.to_owned(),
   1250             ],
   1251             true,
   1252             created_at_ms + 7,
   1253             created_at_ms,
   1254         )
   1255     }
   1256 
   1257     fn fixture_keys() -> RadrootsNostrKeys {
   1258         let secret_key =
   1259             RadrootsNostrSecretKey::from_hex(FIXTURE_ALICE_SECRET_KEY_HEX).expect("secret key");
   1260         RadrootsNostrKeys::new(secret_key)
   1261     }
   1262 
   1263     async fn enqueue_signed_fixture(
   1264         outbox: &RadrootsOutbox,
   1265     ) -> (RadrootsOutboxEnqueueReceipt, RadrootsOutboxClaimedEvent) {
   1266         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "hello");
   1267         let receipt = outbox
   1268             .enqueue_operation(operation_input(draft, 1_000))
   1269             .await
   1270             .expect("enqueue");
   1271         let claimed = outbox
   1272             .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000)
   1273             .await
   1274             .expect("claim")
   1275             .expect("claimed event");
   1276         (receipt, claimed)
   1277     }
   1278 
   1279     async fn complete_claimed_signing(
   1280         outbox: &RadrootsOutbox,
   1281         claimed: &RadrootsOutboxClaimedEvent,
   1282         keys: &RadrootsNostrKeys,
   1283         now_ms: i64,
   1284     ) -> RadrootsSignedNostrEvent {
   1285         if let Some(signed_event) = claimed.signed_event.clone() {
   1286             return signed_event;
   1287         }
   1288         let signed_event =
   1289             radroots_nostr_sign_frozen_draft(keys, &claimed.draft).expect("signed event");
   1290         outbox
   1291             .complete_signing(
   1292                 claimed.outbox_event_id,
   1293                 claimed.claim_token.as_str(),
   1294                 signed_event,
   1295                 now_ms,
   1296             )
   1297             .await
   1298             .expect("complete signing")
   1299     }
   1300 
   1301     async fn table_count(outbox: &RadrootsOutbox, table_name: &str) -> i64 {
   1302         let sql = format!("SELECT COUNT(*) FROM {table_name}");
   1303         sqlx::query_scalar(sql.as_str())
   1304             .fetch_one(outbox.pool())
   1305             .await
   1306             .expect("table count")
   1307     }
   1308 
   1309     #[tokio::test]
   1310     async fn migration_applies_pragmas_and_migrates_down() {
   1311         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1312 
   1313         assert_eq!(outbox.pragma_foreign_keys().await.expect("foreign keys"), 1);
   1314         assert_eq!(
   1315             outbox.pragma_busy_timeout().await.expect("busy timeout"),
   1316             5_000
   1317         );
   1318         assert_eq!(
   1319             outbox.pragma_journal_mode().await.expect("journal mode"),
   1320             "memory"
   1321         );
   1322 
   1323         let row = sqlx::query(
   1324             "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'outbox_event'",
   1325         )
   1326         .fetch_optional(outbox.pool())
   1327         .await
   1328         .expect("table query");
   1329         assert!(row.is_some());
   1330 
   1331         outbox.migrate_down().await.expect("migrate down");
   1332         let row = sqlx::query(
   1333             "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'outbox_event'",
   1334         )
   1335         .fetch_optional(outbox.pool())
   1336         .await
   1337         .expect("table query");
   1338         assert!(row.is_none());
   1339     }
   1340 
   1341     #[tokio::test]
   1342     async fn file_outbox_reopens_existing_schema() {
   1343         let tempdir = tempfile::tempdir().expect("tempdir");
   1344         let path = tempdir.path().join("outbox.sqlite");
   1345 
   1346         let first = RadrootsOutbox::open_file(&path).await.expect("first");
   1347         assert_eq!(first.pragma_foreign_keys().await.expect("foreign keys"), 1);
   1348         drop(first);
   1349 
   1350         let second = RadrootsOutbox::open_file(&path).await.expect("second");
   1351         assert_eq!(second.pragma_foreign_keys().await.expect("foreign keys"), 1);
   1352     }
   1353 
   1354     #[tokio::test]
   1355     async fn status_summary_counts_ready_publishing_retryable_and_terminal_work() {
   1356         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1357 
   1358         let empty = outbox.status_summary(1_000).await.expect("empty status");
   1359         assert_eq!(empty.total_events, 0);
   1360         assert_eq!(empty.pending_events, 0);
   1361         assert_eq!(empty.retryable_events, 0);
   1362         assert_eq!(empty.terminal_events, 0);
   1363         assert_eq!(empty.failed_terminal_events, 0);
   1364         assert_eq!(empty.ready_signed_events, 0);
   1365         assert_eq!(empty.publishing_events, 0);
   1366         assert_eq!(empty.last_attempt_at_ms, None);
   1367         assert_eq!(empty.last_error, None);
   1368 
   1369         let keys = fixture_keys();
   1370         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ready");
   1371         let signed_event = radroots_nostr_sign_frozen_draft(&keys, &draft).expect("signed event");
   1372         let receipt = outbox
   1373             .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000))
   1374             .await
   1375             .expect("signed enqueue");
   1376         let ready = outbox.status_summary(1_000).await.expect("ready status");
   1377         assert_eq!(ready.total_events, 1);
   1378         assert_eq!(ready.pending_events, 1);
   1379         assert_eq!(ready.retryable_events, 0);
   1380         assert_eq!(ready.terminal_events, 0);
   1381         assert_eq!(ready.ready_signed_events, 1);
   1382 
   1383         let claimed = outbox
   1384             .claim_next_ready_signed_event("publisher", "claim-a", 2_500, 2_000)
   1385             .await
   1386             .expect("claim")
   1387             .expect("claimed");
   1388         let publishing = outbox
   1389             .status_summary(2_000)
   1390             .await
   1391             .expect("publishing status");
   1392         assert_eq!(publishing.pending_events, 1);
   1393         assert_eq!(publishing.publishing_events, 1);
   1394         assert_eq!(publishing.ready_signed_events, 0);
   1395 
   1396         outbox
   1397             .mark_relay_failed_retryable(
   1398                 receipt.outbox_event_id,
   1399                 claimed.claim_token.as_str(),
   1400                 RELAY_PRIMARY_WSS,
   1401                 "timeout: relay unavailable",
   1402                 2_100,
   1403             )
   1404             .await
   1405             .expect("relay failed");
   1406         outbox
   1407             .mark_publish_retryable(
   1408                 receipt.outbox_event_id,
   1409                 claimed.claim_token.as_str(),
   1410                 "relay publish incomplete",
   1411                 3_000,
   1412                 2_200,
   1413             )
   1414             .await
   1415             .expect("retryable");
   1416 
   1417         let retry_wait = outbox
   1418             .status_summary(2_900)
   1419             .await
   1420             .expect("retry wait status");
   1421         assert_eq!(retry_wait.pending_events, 0);
   1422         assert_eq!(retry_wait.retryable_events, 1);
   1423         assert_eq!(retry_wait.terminal_events, 0);
   1424         assert_eq!(retry_wait.failed_terminal_events, 0);
   1425         assert_eq!(retry_wait.ready_signed_events, 0);
   1426         assert_eq!(retry_wait.last_attempt_at_ms, Some(2_100));
   1427         assert_eq!(
   1428             retry_wait.last_error.as_deref(),
   1429             Some("relay publish incomplete")
   1430         );
   1431 
   1432         let retry_ready = outbox
   1433             .status_summary(3_000)
   1434             .await
   1435             .expect("retry ready status");
   1436         assert_eq!(retry_ready.ready_signed_events, 1);
   1437     }
   1438 
   1439     #[test]
   1440     fn terminal_and_cancelled_event_states_round_trip() {
   1441         assert_eq!(bool_i64(true), 1);
   1442         assert_eq!(bool_i64(false), 0);
   1443         assert_eq!(
   1444             RadrootsOutboxOperationStatus::parse("failed_terminal").expect("operation status"),
   1445             RadrootsOutboxOperationStatus::FailedTerminal
   1446         );
   1447         assert_eq!(
   1448             RadrootsOutboxOperationStatus::FailedTerminal.as_str(),
   1449             "failed_terminal"
   1450         );
   1451         assert_eq!(
   1452             RadrootsOutboxOperationStatus::parse("cancelled").expect("operation status"),
   1453             RadrootsOutboxOperationStatus::Cancelled
   1454         );
   1455         assert_eq!(
   1456             RadrootsOutboxOperationStatus::Cancelled.as_str(),
   1457             "cancelled"
   1458         );
   1459         assert_eq!(
   1460             RadrootsOutboxEventState::parse("failed_terminal").expect("event state"),
   1461             RadrootsOutboxEventState::FailedTerminal
   1462         );
   1463         assert!(RadrootsOutboxEventState::FailedTerminal.is_terminal());
   1464         assert_eq!(
   1465             RadrootsOutboxEventState::parse("cancelled").expect("event state"),
   1466             RadrootsOutboxEventState::Cancelled
   1467         );
   1468         assert!(RadrootsOutboxEventState::Cancelled.is_terminal());
   1469         assert!(RadrootsOutboxEventState::Published.is_terminal());
   1470         assert!(!RadrootsOutboxEventState::PublishRetryable.is_terminal());
   1471     }
   1472 
   1473     #[tokio::test]
   1474     async fn enqueue_idempotency_is_scoped_by_kind_pubkey_and_digest() {
   1475         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1476         let first_draft = post_draft(hex_64('a').as_str(), "hello");
   1477 
   1478         let first = outbox
   1479             .enqueue_operation(operation_input(first_draft.clone(), 1_000))
   1480             .await
   1481             .expect("first enqueue");
   1482         let second = outbox
   1483             .enqueue_operation(operation_input(first_draft.clone(), 1_001))
   1484             .await
   1485             .expect("second enqueue");
   1486 
   1487         assert_eq!(first.status, RadrootsOutboxEnqueueStatus::Inserted);
   1488         assert_eq!(second.status, RadrootsOutboxEnqueueStatus::Inserted);
   1489         assert_ne!(first.operation_id, second.operation_id);
   1490         assert_ne!(first.outbox_event_id, second.outbox_event_id);
   1491 
   1492         let keyed_first = outbox
   1493             .enqueue_operation(
   1494                 operation_input(first_draft.clone(), 1_002).with_idempotency_key("idem-a"),
   1495             )
   1496             .await
   1497             .expect("keyed first");
   1498         let keyed_second = outbox
   1499             .enqueue_operation(
   1500                 operation_input(first_draft.clone(), 1_003).with_idempotency_key("idem-a"),
   1501             )
   1502             .await
   1503             .expect("keyed second");
   1504 
   1505         assert_eq!(keyed_first.status, RadrootsOutboxEnqueueStatus::Inserted);
   1506         assert_eq!(keyed_second.status, RadrootsOutboxEnqueueStatus::Existing);
   1507         assert_eq!(keyed_first.operation_id, keyed_second.operation_id);
   1508         assert_eq!(keyed_first.outbox_event_id, keyed_second.outbox_event_id);
   1509         assert_eq!(
   1510             keyed_first.idempotency_digest,
   1511             keyed_second.idempotency_digest
   1512         );
   1513 
   1514         let conflict = outbox
   1515             .enqueue_operation(
   1516                 operation_input(post_draft(hex_64('a').as_str(), "changed"), 1_004)
   1517                     .with_idempotency_key("idem-a"),
   1518             )
   1519             .await
   1520             .expect_err("conflict");
   1521         assert!(matches!(
   1522             conflict,
   1523             RadrootsOutboxError::IdempotencyConflict { .. }
   1524         ));
   1525 
   1526         let other_kind = outbox
   1527             .enqueue_operation(
   1528                 RadrootsOutboxOperationInput::new(
   1529                     "publish_post_reply",
   1530                     first_draft.clone(),
   1531                     vec![RELAY_PRIMARY_WSS.to_owned()],
   1532                     1_005,
   1533                 )
   1534                 .with_idempotency_key("idem-a"),
   1535             )
   1536             .await
   1537             .expect("other kind");
   1538         assert_eq!(other_kind.status, RadrootsOutboxEnqueueStatus::Inserted);
   1539 
   1540         let other_pubkey = outbox
   1541             .enqueue_operation(
   1542                 operation_input(post_draft(hex_64('b').as_str(), "hello"), 1_006)
   1543                     .with_idempotency_key("idem-a"),
   1544             )
   1545             .await
   1546             .expect("other pubkey");
   1547         assert_eq!(other_pubkey.status, RadrootsOutboxEnqueueStatus::Inserted);
   1548     }
   1549 
   1550     #[tokio::test]
   1551     async fn enqueue_idempotency_digest_sorts_relays_but_publish_order_is_preserved() {
   1552         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1553         let draft = post_draft(hex_64('a').as_str(), "hello");
   1554         let first = outbox
   1555             .enqueue_operation(
   1556                 RadrootsOutboxOperationInput::new(
   1557                     "publish_post",
   1558                     draft.clone(),
   1559                     vec![
   1560                         RELAY_SECONDARY_WSS.to_owned(),
   1561                         RELAY_PRIMARY_WSS.to_owned(),
   1562                         RELAY_SECONDARY_WSS.to_owned(),
   1563                     ],
   1564                     1_000,
   1565                 )
   1566                 .with_idempotency_key("idem-relay-order"),
   1567             )
   1568             .await
   1569             .expect("first enqueue");
   1570         let second = outbox
   1571             .enqueue_operation(
   1572                 RadrootsOutboxOperationInput::new(
   1573                     "publish_post",
   1574                     draft,
   1575                     vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()],
   1576                     1_001,
   1577                 )
   1578                 .with_idempotency_key("idem-relay-order"),
   1579             )
   1580             .await
   1581             .expect("second enqueue");
   1582 
   1583         assert_eq!(second.status, RadrootsOutboxEnqueueStatus::Existing);
   1584         assert_eq!(first.idempotency_digest, second.idempotency_digest);
   1585 
   1586         let claimed = outbox
   1587             .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000)
   1588             .await
   1589             .expect("claim")
   1590             .expect("claimed event");
   1591         assert_eq!(
   1592             claimed.target_relays,
   1593             vec![RELAY_SECONDARY_WSS.to_owned(), RELAY_PRIMARY_WSS.to_owned()]
   1594         );
   1595     }
   1596 
   1597     #[tokio::test]
   1598     async fn enqueue_rejects_empty_target_relays_before_persistence() {
   1599         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1600         let draft = post_draft(hex_64('a').as_str(), "hello");
   1601 
   1602         let err = outbox
   1603             .enqueue_operation(
   1604                 RadrootsOutboxOperationInput::new("publish_post", draft, Vec::new(), 1_000)
   1605                     .with_idempotency_key("empty-relays"),
   1606             )
   1607             .await
   1608             .expect_err("empty relays");
   1609 
   1610         assert!(matches!(err, RadrootsOutboxError::EmptyTargetRelays));
   1611         assert_eq!(table_count(&outbox, "outbox_operation").await, 0);
   1612         assert_eq!(table_count(&outbox, "outbox_event").await, 0);
   1613         assert_eq!(table_count(&outbox, "outbox_event_relay_status").await, 0);
   1614     }
   1615 
   1616     #[tokio::test]
   1617     async fn enqueue_signed_rejects_empty_target_relays_before_persistence() {
   1618         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1619         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "signed empty");
   1620         let signed_event =
   1621             radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event");
   1622 
   1623         let err = outbox
   1624             .enqueue_signed_operation(RadrootsOutboxSignedOperationInput::new(
   1625                 "publish_post",
   1626                 draft,
   1627                 signed_event,
   1628                 Vec::new(),
   1629                 false,
   1630                 1_007,
   1631                 1_000,
   1632             ))
   1633             .await
   1634             .expect_err("empty relays");
   1635 
   1636         assert!(matches!(err, RadrootsOutboxError::EmptyTargetRelays));
   1637         assert_eq!(table_count(&outbox, "outbox_operation").await, 0);
   1638         assert_eq!(table_count(&outbox, "outbox_event").await, 0);
   1639         assert_eq!(table_count(&outbox, "outbox_event_relay_status").await, 0);
   1640     }
   1641 
   1642     #[tokio::test]
   1643     async fn enqueue_allows_explicitly_delegated_empty_target_relays() {
   1644         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1645         let draft = post_draft(hex_64('a').as_str(), "delegated empty");
   1646 
   1647         let receipt = outbox
   1648             .enqueue_operation(
   1649                 RadrootsOutboxOperationInput::new("publish_post", draft, Vec::new(), 1_000)
   1650                     .allow_empty_target_relays(),
   1651             )
   1652             .await
   1653             .expect("delegated empty relays");
   1654 
   1655         let event = outbox
   1656             .get_event(receipt.outbox_event_id)
   1657             .await
   1658             .expect("event")
   1659             .expect("event");
   1660         assert_eq!(event.accepted_quorum, 0);
   1661         assert_eq!(
   1662             outbox
   1663                 .relay_statuses(receipt.outbox_event_id)
   1664                 .await
   1665                 .expect("relay statuses"),
   1666             Vec::new()
   1667         );
   1668 
   1669         let claimed = outbox
   1670             .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000)
   1671             .await
   1672             .expect("claim")
   1673             .expect("claim");
   1674         assert_eq!(claimed.target_relays, Vec::<String>::new());
   1675     }
   1676 
   1677     #[tokio::test]
   1678     async fn enqueue_signed_allows_explicitly_delegated_empty_target_relays() {
   1679         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1680         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "signed delegated empty");
   1681         let signed_event =
   1682             radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event");
   1683 
   1684         let receipt = outbox
   1685             .enqueue_signed_operation(
   1686                 RadrootsOutboxSignedOperationInput::new(
   1687                     "publish_post",
   1688                     draft,
   1689                     signed_event.clone(),
   1690                     Vec::new(),
   1691                     false,
   1692                     1_007,
   1693                     1_000,
   1694                 )
   1695                 .allow_empty_target_relays(),
   1696             )
   1697             .await
   1698             .expect("delegated signed empty relays");
   1699 
   1700         let event = outbox
   1701             .get_event(receipt.outbox_event_id)
   1702             .await
   1703             .expect("event")
   1704             .expect("event");
   1705         assert_eq!(event.accepted_quorum, 0);
   1706         assert_eq!(event.signed_event, Some(signed_event.clone()));
   1707         assert_eq!(
   1708             outbox
   1709                 .relay_statuses(receipt.outbox_event_id)
   1710                 .await
   1711                 .expect("relay statuses"),
   1712             Vec::new()
   1713         );
   1714 
   1715         let claimed = outbox
   1716             .claim_next_ready_signed_event("publisher-a", "claim-a", 2_000, 1_000)
   1717             .await
   1718             .expect("claim")
   1719             .expect("claim");
   1720         assert_eq!(claimed.signed_event, Some(signed_event));
   1721         assert_eq!(claimed.target_relays, Vec::<String>::new());
   1722     }
   1723 
   1724     #[tokio::test]
   1725     async fn claim_next_ready_event_returns_none_when_no_work_is_ready() {
   1726         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1727 
   1728         assert!(
   1729             outbox
   1730                 .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000)
   1731                 .await
   1732                 .expect("claim")
   1733                 .is_none()
   1734         );
   1735         assert!(
   1736             outbox
   1737                 .claim_next_ready_signed_event("publisher-a", "claim-b", 2_000, 1_000)
   1738                 .await
   1739                 .expect("claim signed")
   1740                 .is_none()
   1741         );
   1742     }
   1743 
   1744     #[tokio::test]
   1745     async fn enqueue_accepts_single_and_multiple_target_relays() {
   1746         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1747         let single_draft = post_draft(hex_64('a').as_str(), "single");
   1748 
   1749         let single = outbox
   1750             .enqueue_operation(RadrootsOutboxOperationInput::new(
   1751                 "publish_post",
   1752                 single_draft,
   1753                 vec![RELAY_PRIMARY_WSS.to_owned()],
   1754                 1_000,
   1755             ))
   1756             .await
   1757             .expect("single relay");
   1758         let single_event = outbox
   1759             .get_event(single.outbox_event_id)
   1760             .await
   1761             .expect("single event")
   1762             .expect("single event");
   1763         assert_eq!(single_event.accepted_quorum, 1);
   1764         assert_eq!(
   1765             outbox
   1766                 .relay_statuses(single.outbox_event_id)
   1767                 .await
   1768                 .expect("single relay statuses")
   1769                 .len(),
   1770             1
   1771         );
   1772 
   1773         let multi_draft = post_draft(hex_64('b').as_str(), "multi");
   1774         let multi = outbox
   1775             .enqueue_operation(RadrootsOutboxOperationInput::new(
   1776                 "publish_post",
   1777                 multi_draft,
   1778                 vec![
   1779                     RELAY_PRIMARY_WSS.to_owned(),
   1780                     RELAY_SECONDARY_WSS.to_owned(),
   1781                     RELAY_PRIMARY_WSS.to_owned(),
   1782                 ],
   1783                 1_100,
   1784             ))
   1785             .await
   1786             .expect("multiple relays");
   1787         let multi_event = outbox
   1788             .get_event(multi.outbox_event_id)
   1789             .await
   1790             .expect("multi event")
   1791             .expect("multi event");
   1792         assert_eq!(multi_event.accepted_quorum, 2);
   1793         assert_eq!(
   1794             outbox
   1795                 .relay_statuses(multi.outbox_event_id)
   1796                 .await
   1797                 .expect("multi relay statuses")
   1798                 .len(),
   1799             2
   1800         );
   1801 
   1802         let zero_quorum_count: i64 =
   1803             sqlx::query_scalar("SELECT COUNT(*) FROM outbox_event WHERE accepted_quorum = 0")
   1804                 .fetch_one(outbox.pool())
   1805                 .await
   1806                 .expect("zero quorum count");
   1807         assert_eq!(zero_quorum_count, 0);
   1808     }
   1809 
   1810     #[tokio::test]
   1811     async fn enqueue_signed_operation_stores_pushable_signed_event_without_claim() {
   1812         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1813         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "signed");
   1814         let signed_event =
   1815             radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event");
   1816         let expected_raw_json = signed_event.raw_json.clone();
   1817 
   1818         let receipt = outbox
   1819             .enqueue_signed_operation(
   1820                 signed_operation_input(draft.clone(), signed_event.clone(), 1_000)
   1821                     .with_idempotency_key("signed-a"),
   1822             )
   1823             .await
   1824             .expect("signed enqueue");
   1825 
   1826         assert_eq!(receipt.status, RadrootsOutboxEnqueueStatus::Inserted);
   1827         assert_eq!(receipt.expected_event_id, draft.expected_event_id);
   1828 
   1829         let event = outbox
   1830             .get_event(receipt.outbox_event_id)
   1831             .await
   1832             .expect("event")
   1833             .expect("event");
   1834         assert_eq!(event.state, RadrootsOutboxEventState::Signed);
   1835         assert_eq!(event.signed_event, Some(signed_event.clone()));
   1836         assert_eq!(event.raw_event_json, Some(expected_raw_json));
   1837         assert!(event.event_store_ingested);
   1838         assert!(event.event_store_inserted);
   1839         assert_eq!(event.event_store_ingested_at_ms, Some(1_007));
   1840         assert_eq!(event.claim_token, None);
   1841         assert_eq!(event.claim_owner, None);
   1842         assert_eq!(event.claim_expires_at_ms, None);
   1843 
   1844         let statuses = outbox
   1845             .relay_statuses(receipt.outbox_event_id)
   1846             .await
   1847             .expect("statuses");
   1848         assert_eq!(statuses.len(), 2);
   1849         assert!(
   1850             statuses
   1851                 .iter()
   1852                 .all(|status| status.status == RadrootsOutboxRelayStatus::Pending)
   1853         );
   1854 
   1855         let claimed = outbox
   1856             .claim_next_ready_event("publisher-a", "claim-a", 2_000, 1_000)
   1857             .await
   1858             .expect("claim")
   1859             .expect("claimed");
   1860         assert_eq!(claimed.state, RadrootsOutboxEventState::Publishing);
   1861         assert_eq!(claimed.signed_event, Some(signed_event));
   1862         assert_eq!(
   1863             claimed.target_relays,
   1864             vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()]
   1865         );
   1866     }
   1867 
   1868     #[tokio::test]
   1869     async fn claim_next_ready_signed_event_skips_unsigned_work() {
   1870         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1871         let unsigned = outbox
   1872             .enqueue_operation(operation_input(
   1873                 post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "unsigned"),
   1874                 900,
   1875             ))
   1876             .await
   1877             .expect("unsigned enqueue");
   1878         let signed_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "signed-only");
   1879         let signed_event =
   1880             radroots_nostr_sign_frozen_draft(&fixture_keys(), &signed_draft).expect("signed event");
   1881         let signed = outbox
   1882             .enqueue_signed_operation(signed_operation_input(signed_draft, signed_event, 1_000))
   1883             .await
   1884             .expect("signed enqueue");
   1885 
   1886         let claimed = outbox
   1887             .claim_next_ready_signed_event("publisher-a", "claim-a", 2_000, 1_000)
   1888             .await
   1889             .expect("claim")
   1890             .expect("claimed");
   1891         assert_eq!(claimed.outbox_event_id, signed.outbox_event_id);
   1892         assert_eq!(claimed.state, RadrootsOutboxEventState::Publishing);
   1893         assert!(claimed.signed_event.is_some());
   1894 
   1895         let unsigned_event = outbox
   1896             .get_event(unsigned.outbox_event_id)
   1897             .await
   1898             .expect("unsigned event")
   1899             .expect("unsigned event");
   1900         assert_eq!(unsigned_event.state, RadrootsOutboxEventState::DraftQueued);
   1901         assert!(unsigned_event.claim_token.is_none());
   1902 
   1903         let signing_claim = outbox
   1904             .claim_next_ready_event("signer-a", "sign-a", 2_100, 1_100)
   1905             .await
   1906             .expect("sign claim")
   1907             .expect("sign claim");
   1908         assert_eq!(signing_claim.outbox_event_id, unsigned.outbox_event_id);
   1909         assert_eq!(signing_claim.state, RadrootsOutboxEventState::Signing);
   1910         assert!(signing_claim.signed_event.is_none());
   1911     }
   1912 
   1913     #[tokio::test]
   1914     async fn enqueue_signed_operation_idempotency_reuses_existing_signed_record() {
   1915         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1916         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "idem-signed");
   1917         let signed_event =
   1918             radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event");
   1919 
   1920         let first = outbox
   1921             .enqueue_signed_operation(
   1922                 signed_operation_input(draft.clone(), signed_event.clone(), 1_000)
   1923                     .with_idempotency_key("signed-idem"),
   1924             )
   1925             .await
   1926             .expect("first");
   1927         let second = outbox
   1928             .enqueue_signed_operation(
   1929                 signed_operation_input(draft.clone(), signed_event, 1_100)
   1930                     .with_idempotency_key("signed-idem"),
   1931             )
   1932             .await
   1933             .expect("second");
   1934 
   1935         assert_eq!(first.status, RadrootsOutboxEnqueueStatus::Inserted);
   1936         assert_eq!(second.status, RadrootsOutboxEnqueueStatus::Existing);
   1937         assert_eq!(first.operation_id, second.operation_id);
   1938         assert_eq!(first.outbox_event_id, second.outbox_event_id);
   1939         assert_eq!(table_count(&outbox, "outbox_operation").await, 1);
   1940         assert_eq!(table_count(&outbox, "outbox_event").await, 1);
   1941 
   1942         let changed_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "changed");
   1943         let changed_signed =
   1944             radroots_nostr_sign_frozen_draft(&fixture_keys(), &changed_draft).expect("signed");
   1945         let conflict = outbox
   1946             .enqueue_signed_operation(
   1947                 signed_operation_input(changed_draft, changed_signed, 1_200)
   1948                     .with_idempotency_key("signed-idem"),
   1949             )
   1950             .await
   1951             .expect_err("conflict");
   1952         assert!(matches!(
   1953             conflict,
   1954             RadrootsOutboxError::IdempotencyConflict { .. }
   1955         ));
   1956     }
   1957 
   1958     #[tokio::test]
   1959     async fn enqueue_signed_operation_rejects_mismatched_signed_event() {
   1960         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1961         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "trusted");
   1962         let other_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "other");
   1963         let signed_event =
   1964             radroots_nostr_sign_frozen_draft(&fixture_keys(), &other_draft).expect("signed event");
   1965 
   1966         let error = outbox
   1967             .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000))
   1968             .await
   1969             .expect_err("mismatch");
   1970 
   1971         assert!(matches!(
   1972             error,
   1973             RadrootsOutboxError::SignedEventDraftMismatch(_)
   1974         ));
   1975         assert_eq!(table_count(&outbox, "outbox_operation").await, 0);
   1976         assert_eq!(table_count(&outbox, "outbox_event").await, 0);
   1977     }
   1978 
   1979     #[tokio::test]
   1980     async fn enqueue_signed_operation_rejects_event_id_mismatch() {
   1981         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   1982         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "bad-id");
   1983         let mut signed_event =
   1984             radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event");
   1985         signed_event.id = hex_64('f');
   1986 
   1987         let error = outbox
   1988             .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000))
   1989             .await
   1990             .expect_err("mismatch");
   1991 
   1992         assert!(matches!(
   1993             error,
   1994             RadrootsOutboxError::SignedEventDraftMismatch(_)
   1995         ));
   1996         assert_eq!(table_count(&outbox, "outbox_operation").await, 0);
   1997         assert_eq!(table_count(&outbox, "outbox_event").await, 0);
   1998     }
   1999 
   2000     #[tokio::test]
   2001     async fn complete_signing_rejects_signed_event_id_mismatch() {
   2002         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   2003         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "claimed draft");
   2004         let receipt = outbox
   2005             .enqueue_operation(operation_input(draft, 1_000))
   2006             .await
   2007             .expect("enqueue");
   2008         let claimed = outbox
   2009             .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000)
   2010             .await
   2011             .expect("claim")
   2012             .expect("claim");
   2013         let other_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "other draft");
   2014         let signed_event =
   2015             radroots_nostr_sign_frozen_draft(&fixture_keys(), &other_draft).expect("signed event");
   2016 
   2017         let err = outbox
   2018             .complete_signing(
   2019                 receipt.outbox_event_id,
   2020                 claimed.claim_token.as_str(),
   2021                 signed_event.clone(),
   2022                 1_100,
   2023             )
   2024             .await
   2025             .expect_err("event id mismatch");
   2026 
   2027         assert_eq!(
   2028             err.to_string(),
   2029             format!(
   2030                 "Signed event ID mismatch: expected {}, got {}",
   2031                 receipt.expected_event_id, signed_event.id
   2032             )
   2033         );
   2034     }
   2035 
   2036     #[tokio::test]
   2037     async fn claim_token_guards_updates_and_expired_signing_claim_recovers() {
   2038         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   2039         let draft = post_draft(hex_64('a').as_str(), "hello");
   2040         let receipt = outbox
   2041             .enqueue_operation(operation_input(draft, 1_000))
   2042             .await
   2043             .expect("enqueue");
   2044 
   2045         let claimed = outbox
   2046             .claim_next_ready_event("worker-a", "claim-a", 1_100, 1_000)
   2047             .await
   2048             .expect("claim")
   2049             .expect("claimed event");
   2050         assert_eq!(claimed.state, RadrootsOutboxEventState::Signing);
   2051         assert_eq!(claimed.attempt_count, 1);
   2052         assert_eq!(
   2053             claimed.target_relays,
   2054             vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()]
   2055         );
   2056 
   2057         let unavailable = outbox
   2058             .claim_next_ready_event("worker-b", "claim-b", 1_100, 1_050)
   2059             .await
   2060             .expect("claim");
   2061         assert!(unavailable.is_none());
   2062 
   2063         let wrong_token = outbox
   2064             .mark_sign_retryable(
   2065                 receipt.outbox_event_id,
   2066                 "claim-b",
   2067                 "sign failed",
   2068                 1_200,
   2069                 1_100,
   2070             )
   2071             .await
   2072             .expect_err("wrong token");
   2073         assert!(matches!(
   2074             wrong_token,
   2075             RadrootsOutboxError::ClaimTokenMismatch { .. }
   2076         ));
   2077 
   2078         let recovered = outbox.recover_expired_claims(1_101).await.expect("recover");
   2079         assert_eq!(recovered, 1);
   2080 
   2081         let event = outbox
   2082             .get_event(receipt.outbox_event_id)
   2083             .await
   2084             .expect("event")
   2085             .expect("event");
   2086         assert_eq!(event.state, RadrootsOutboxEventState::SignRetryable);
   2087         assert_eq!(event.attempt_count, 1);
   2088         assert!(event.claim_token.is_none());
   2089 
   2090         let reclaimed = outbox
   2091             .claim_next_ready_event("worker-b", "claim-b", 1_400, 1_200)
   2092             .await
   2093             .expect("claim")
   2094             .expect("reclaimed");
   2095         assert_eq!(reclaimed.state, RadrootsOutboxEventState::Signing);
   2096         assert_eq!(reclaimed.attempt_count, 2);
   2097     }
   2098 
   2099     #[tokio::test]
   2100     async fn claimed_mutations_reject_stale_tokens_without_state_changes() {
   2101         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   2102         let event_store = RadrootsEventStore::open_memory()
   2103             .await
   2104             .expect("event store");
   2105         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "hello");
   2106         let receipt = outbox
   2107             .enqueue_operation(operation_input(draft, 1_000))
   2108             .await
   2109             .expect("enqueue");
   2110 
   2111         let first_claim = outbox
   2112             .claim_next_ready_event("worker-a", "claim-a", 1_100, 1_000)
   2113             .await
   2114             .expect("claim")
   2115             .expect("claim");
   2116         let signed = complete_claimed_signing(&outbox, &first_claim, &fixture_keys(), 1_050).await;
   2117         outbox.recover_expired_claims(1_101).await.expect("recover");
   2118         let second_claim = outbox
   2119             .claim_next_ready_event("worker-b", "claim-b", 1_500, 1_200)
   2120             .await
   2121             .expect("claim")
   2122             .expect("claim");
   2123         assert_eq!(second_claim.state, RadrootsOutboxEventState::Publishing);
   2124 
   2125         let retry_with_stale_token = outbox
   2126             .mark_publish_retryable(
   2127                 receipt.outbox_event_id,
   2128                 "claim-a",
   2129                 "stale retry",
   2130                 1_600,
   2131                 1_300,
   2132             )
   2133             .await
   2134             .expect_err("stale retry token");
   2135         assert!(matches!(
   2136             retry_with_stale_token,
   2137             RadrootsOutboxError::ClaimTokenMismatch { .. }
   2138         ));
   2139 
   2140         let relay_with_stale_token = outbox
   2141             .mark_relay_accepted(receipt.outbox_event_id, "claim-a", RELAY_PRIMARY_WSS, 1_300)
   2142             .await
   2143             .expect_err("stale relay token");
   2144         assert!(matches!(
   2145             relay_with_stale_token,
   2146             RadrootsOutboxError::ClaimTokenMismatch { .. }
   2147         ));
   2148 
   2149         let ingest_with_current_token = outbox
   2150             .ingest_signed_event_local(&event_store, receipt.outbox_event_id, "claim-b", 1_350)
   2151             .await
   2152             .expect("current ingest");
   2153         assert_eq!(ingest_with_current_token.event_id, signed.id);
   2154 
   2155         let event = outbox
   2156             .get_event(receipt.outbox_event_id)
   2157             .await
   2158             .expect("event")
   2159             .expect("event");
   2160         assert_eq!(event.state, RadrootsOutboxEventState::Publishing);
   2161         assert_eq!(event.claim_token.as_deref(), Some("claim-b"));
   2162 
   2163         let statuses = outbox
   2164             .relay_statuses(receipt.outbox_event_id)
   2165             .await
   2166             .expect("statuses");
   2167         assert!(
   2168             statuses
   2169                 .iter()
   2170                 .all(|status| status.status == RadrootsOutboxRelayStatus::Pending)
   2171         );
   2172     }
   2173 
   2174     #[tokio::test]
   2175     async fn claimed_update_paths_report_missing_events_and_wrong_tokens() {
   2176         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   2177 
   2178         let missing = outbox
   2179             .mark_sign_retryable(999, "missing-claim", "missing", 1_200, 1_100)
   2180             .await
   2181             .expect_err("missing event");
   2182         assert!(matches!(missing, RadrootsOutboxError::EventNotFound(999)));
   2183         let missing_publish = outbox
   2184             .complete_publish_attempt(999, "missing-claim", "retryable", "terminal", 1_300, 1_200)
   2185             .await
   2186             .expect_err("missing publish event");
   2187         assert!(matches!(
   2188             missing_publish,
   2189             RadrootsOutboxError::EventNotFound(999)
   2190         ));
   2191 
   2192         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "wrong token");
   2193         let receipt = outbox
   2194             .enqueue_operation(operation_input(draft, 1_000))
   2195             .await
   2196             .expect("enqueue");
   2197         outbox
   2198             .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000)
   2199             .await
   2200             .expect("claim")
   2201             .expect("claim");
   2202 
   2203         let wrong_token = outbox
   2204             .complete_publish_attempt(
   2205                 receipt.outbox_event_id,
   2206                 "claim-b",
   2207                 "retryable",
   2208                 "terminal",
   2209                 2_500,
   2210                 2_100,
   2211             )
   2212             .await
   2213             .expect_err("wrong token");
   2214         assert!(matches!(
   2215             wrong_token,
   2216             RadrootsOutboxError::ClaimTokenMismatch { .. }
   2217         ));
   2218     }
   2219 
   2220     #[tokio::test]
   2221     async fn sign_retryable_update_succeeds_and_reports_ignored_update_with_current_token() {
   2222         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   2223         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "retryable success");
   2224         let receipt = outbox
   2225             .enqueue_operation(operation_input(draft, 1_000))
   2226             .await
   2227             .expect("enqueue");
   2228         outbox
   2229             .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000)
   2230             .await
   2231             .expect("claim")
   2232             .expect("claim");
   2233 
   2234         outbox
   2235             .mark_sign_retryable(receipt.outbox_event_id, "claim-a", "retry", 1_500, 1_100)
   2236             .await
   2237             .expect("mark retryable");
   2238         let event = outbox
   2239             .get_event(receipt.outbox_event_id)
   2240             .await
   2241             .expect("event")
   2242             .expect("event");
   2243         assert_eq!(event.state, RadrootsOutboxEventState::SignRetryable);
   2244 
   2245         let ignored_outbox = RadrootsOutbox::open_memory().await.expect("ignored open");
   2246         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored retryable");
   2247         let ignored_receipt = ignored_outbox
   2248             .enqueue_operation(operation_input(draft, 2_000))
   2249             .await
   2250             .expect("enqueue ignored");
   2251         ignored_outbox
   2252             .claim_next_ready_event("worker-b", "claim-b", 3_000, 2_000)
   2253             .await
   2254             .expect("claim ignored")
   2255             .expect("claim ignored");
   2256         sqlx::query(
   2257             "CREATE TEMP TRIGGER ignore_sign_retry_update BEFORE UPDATE OF state ON outbox_event WHEN NEW.state = 'sign_retryable' BEGIN SELECT RAISE(IGNORE); END",
   2258         )
   2259         .execute(ignored_outbox.pool())
   2260         .await
   2261         .expect("retry trigger");
   2262         let ignored = ignored_outbox
   2263             .mark_sign_retryable(
   2264                 ignored_receipt.outbox_event_id,
   2265                 "claim-b",
   2266                 "ignored retry",
   2267                 2_500,
   2268                 2_100,
   2269             )
   2270             .await
   2271             .expect_err("ignored retryable update");
   2272         assert!(matches!(
   2273             ignored,
   2274             RadrootsOutboxError::ClaimTokenMismatch { .. }
   2275         ));
   2276     }
   2277 
   2278     #[tokio::test]
   2279     async fn ignored_sqlite_updates_preserve_race_guards() {
   2280         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   2281         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored claim");
   2282         outbox
   2283             .enqueue_operation(operation_input(draft, 1_000))
   2284             .await
   2285             .expect("enqueue");
   2286         let signed_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored signed claim");
   2287         let signed_event =
   2288             radroots_nostr_sign_frozen_draft(&fixture_keys(), &signed_draft).expect("signed event");
   2289         outbox
   2290             .enqueue_signed_operation(signed_operation_input(signed_draft, signed_event, 1_001))
   2291             .await
   2292             .expect("signed enqueue");
   2293         sqlx::query(
   2294             "CREATE TEMP TRIGGER ignore_claim_update BEFORE UPDATE OF claim_token ON outbox_event WHEN NEW.claim_token IN ('blocked-claim', 'blocked-signed') BEGIN SELECT RAISE(IGNORE); END",
   2295         )
   2296         .execute(outbox.pool())
   2297         .await
   2298         .expect("claim trigger");
   2299 
   2300         assert!(
   2301             outbox
   2302                 .claim_next_ready_event("worker-a", "blocked-claim", 2_000, 1_000)
   2303                 .await
   2304                 .expect("claim")
   2305                 .is_none()
   2306         );
   2307         assert!(
   2308             outbox
   2309                 .claim_next_ready_signed_event("publisher-a", "blocked-signed", 2_000, 1_001)
   2310                 .await
   2311                 .expect("claim signed")
   2312                 .is_none()
   2313         );
   2314 
   2315         let publish_outbox = RadrootsOutbox::open_memory().await.expect("publish open");
   2316         let publish_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored publish");
   2317         let publish_signed = radroots_nostr_sign_frozen_draft(&fixture_keys(), &publish_draft)
   2318             .expect("signed event");
   2319         let publish_receipt = publish_outbox
   2320             .enqueue_signed_operation(signed_operation_input(publish_draft, publish_signed, 1_100))
   2321             .await
   2322             .expect("publish enqueue");
   2323         let publish_claim = publish_outbox
   2324             .claim_next_ready_signed_event("publisher-b", "publish-claim", 3_000, 1_100)
   2325             .await
   2326             .expect("claim publish")
   2327             .expect("claim publish");
   2328         publish_outbox
   2329             .mark_relay_accepted(
   2330                 publish_receipt.outbox_event_id,
   2331                 publish_claim.claim_token.as_str(),
   2332                 RELAY_PRIMARY_WSS,
   2333                 1_150,
   2334             )
   2335             .await
   2336             .expect("primary accepted");
   2337         publish_outbox
   2338             .mark_relay_accepted(
   2339                 publish_receipt.outbox_event_id,
   2340                 publish_claim.claim_token.as_str(),
   2341                 RELAY_SECONDARY_WSS,
   2342                 1_160,
   2343             )
   2344             .await
   2345             .expect("secondary accepted");
   2346         sqlx::query(
   2347             "CREATE TEMP TRIGGER ignore_published_update BEFORE UPDATE OF state ON outbox_event WHEN NEW.state = 'published' BEGIN SELECT RAISE(IGNORE); END",
   2348         )
   2349         .execute(publish_outbox.pool())
   2350         .await
   2351         .expect("publish trigger");
   2352         let ignored_publish = publish_outbox
   2353             .complete_publish_attempt(
   2354                 publish_receipt.outbox_event_id,
   2355                 publish_claim.claim_token.as_str(),
   2356                 "retryable",
   2357                 "terminal",
   2358                 2_500,
   2359                 1_200,
   2360             )
   2361             .await
   2362             .expect_err("ignored publish update");
   2363         assert!(matches!(
   2364             ignored_publish,
   2365             RadrootsOutboxError::ClaimTokenMismatch { .. }
   2366         ));
   2367 
   2368         let cancel_outbox = RadrootsOutbox::open_memory().await.expect("cancel open");
   2369         let cancel_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored cancel");
   2370         let cancel_receipt = cancel_outbox
   2371             .enqueue_operation(operation_input(cancel_draft, 1_200))
   2372             .await
   2373             .expect("cancel enqueue");
   2374         let cancel_claim = cancel_outbox
   2375             .claim_next_ready_event("worker-b", "cancel-claim", 3_000, 1_200)
   2376             .await
   2377             .expect("cancel claim")
   2378             .expect("cancel claim");
   2379         sqlx::query(
   2380             "CREATE TEMP TRIGGER ignore_cancel_update BEFORE UPDATE OF state ON outbox_event WHEN NEW.state = 'cancelled' BEGIN SELECT RAISE(IGNORE); END",
   2381         )
   2382         .execute(cancel_outbox.pool())
   2383         .await
   2384         .expect("cancel trigger");
   2385         let ignored_cancel = cancel_outbox
   2386             .cancel_claimed_event(
   2387                 cancel_receipt.outbox_event_id,
   2388                 cancel_claim.claim_token.as_str(),
   2389                 1_300,
   2390             )
   2391             .await
   2392             .expect_err("ignored cancel update");
   2393         assert!(matches!(
   2394             ignored_cancel,
   2395             RadrootsOutboxError::ClaimTokenMismatch { .. }
   2396         ));
   2397     }
   2398 
   2399     #[tokio::test]
   2400     async fn signed_events_are_reused_after_claim_recovery() {
   2401         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   2402         let (receipt, claimed) = enqueue_signed_fixture(&outbox).await;
   2403         let keys = fixture_keys();
   2404 
   2405         let signed = complete_claimed_signing(&outbox, &claimed, &keys, 1_100).await;
   2406         assert_eq!(signed.id, receipt.expected_event_id);
   2407 
   2408         let recovered = outbox.recover_expired_claims(2_001).await.expect("recover");
   2409         assert_eq!(recovered, 1);
   2410 
   2411         let publish_claim = outbox
   2412             .claim_next_ready_event("publisher-a", "claim-b", 3_000, 2_100)
   2413             .await
   2414             .expect("claim")
   2415             .expect("publish claim");
   2416         assert_eq!(publish_claim.state, RadrootsOutboxEventState::Publishing);
   2417         assert_eq!(publish_claim.signed_event.as_ref(), Some(&signed));
   2418 
   2419         let reused = complete_claimed_signing(&outbox, &publish_claim, &keys, 2_200).await;
   2420         assert_eq!(reused, signed);
   2421 
   2422         let event = outbox
   2423             .get_event(receipt.outbox_event_id)
   2424             .await
   2425             .expect("event")
   2426             .expect("event");
   2427         assert_eq!(event.state, RadrootsOutboxEventState::Publishing);
   2428         assert_eq!(event.signed_event.as_ref(), Some(&signed));
   2429     }
   2430 
   2431     #[tokio::test]
   2432     async fn local_signed_event_ingest_is_idempotent_without_relay_observation() {
   2433         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   2434         let event_store = RadrootsEventStore::open_memory()
   2435             .await
   2436             .expect("event store");
   2437         let (receipt, claimed) = enqueue_signed_fixture(&outbox).await;
   2438         let keys = fixture_keys();
   2439         let signed = complete_claimed_signing(&outbox, &claimed, &keys, 1_100).await;
   2440 
   2441         let first = outbox
   2442             .ingest_signed_event_local(&event_store, receipt.outbox_event_id, "claim-a", 1_200)
   2443             .await
   2444             .expect("first ingest");
   2445         assert_eq!(first.outbox_event_id, receipt.outbox_event_id);
   2446         assert_eq!(first.event_id, signed.id);
   2447         assert!(!first.already_ingested);
   2448         assert!(first.event_store_inserted);
   2449 
   2450         let stored = event_store
   2451             .get_event(signed.id.as_str())
   2452             .await
   2453             .expect("stored event");
   2454         assert!(stored.is_some());
   2455 
   2456         let observations = event_store
   2457             .observations_for_event(signed.id.as_str())
   2458             .await
   2459             .expect("observations");
   2460         assert!(observations.is_empty());
   2461 
   2462         let second = outbox
   2463             .ingest_signed_event_local(&event_store, receipt.outbox_event_id, "claim-a", 1_300)
   2464             .await
   2465             .expect("second ingest");
   2466         assert!(second.already_ingested);
   2467         assert!(!second.event_store_inserted);
   2468 
   2469         let event = outbox
   2470             .get_event(receipt.outbox_event_id)
   2471             .await
   2472             .expect("event")
   2473             .expect("event");
   2474         assert_eq!(event.state, RadrootsOutboxEventState::Publishing);
   2475         assert!(event.event_store_ingested);
   2476         assert!(event.event_store_inserted);
   2477         assert_eq!(event.event_store_ingested_at_ms, Some(1_200));
   2478 
   2479         let recovered = outbox.recover_expired_claims(2_001).await.expect("recover");
   2480         assert_eq!(recovered, 1);
   2481 
   2482         let event = outbox
   2483             .get_event(receipt.outbox_event_id)
   2484             .await
   2485             .expect("event")
   2486             .expect("event");
   2487         assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable);
   2488         assert!(event.claim_token.is_none());
   2489 
   2490         let reclaimed = outbox
   2491             .claim_next_ready_event("publisher-a", "claim-b", 3_000, 2_100)
   2492             .await
   2493             .expect("claim")
   2494             .expect("publish claim");
   2495         assert_eq!(reclaimed.state, RadrootsOutboxEventState::Publishing);
   2496         assert_eq!(reclaimed.signed_event.as_ref(), Some(&signed));
   2497     }
   2498 
   2499     #[tokio::test]
   2500     async fn terminal_and_cancelled_claimed_events_are_not_reclaimable() {
   2501         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   2502         let terminal_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "terminal");
   2503         let terminal_receipt = outbox
   2504             .enqueue_operation(operation_input(terminal_draft, 1_000))
   2505             .await
   2506             .expect("enqueue");
   2507         outbox
   2508             .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000)
   2509             .await
   2510             .expect("claim")
   2511             .expect("claim");
   2512         outbox
   2513             .mark_publish_failed_terminal(
   2514                 terminal_receipt.outbox_event_id,
   2515                 "claim-a",
   2516                 "terminal failure",
   2517                 1_100,
   2518             )
   2519             .await
   2520             .expect("terminal");
   2521 
   2522         let terminal_event = outbox
   2523             .get_event(terminal_receipt.outbox_event_id)
   2524             .await
   2525             .expect("event")
   2526             .expect("event");
   2527         assert_eq!(
   2528             terminal_event.state,
   2529             RadrootsOutboxEventState::FailedTerminal
   2530         );
   2531         assert!(terminal_event.state.is_terminal());
   2532         assert!(terminal_event.claim_token.is_none());
   2533         let terminal_operation = outbox
   2534             .get_operation(terminal_receipt.operation_id)
   2535             .await
   2536             .expect("operation")
   2537             .expect("operation");
   2538         assert_eq!(
   2539             terminal_operation.status,
   2540             RadrootsOutboxOperationStatus::FailedTerminal
   2541         );
   2542         assert!(
   2543             outbox
   2544                 .claim_next_ready_event("worker-b", "claim-b", 2_000, 1_200)
   2545                 .await
   2546                 .expect("claim")
   2547                 .is_none()
   2548         );
   2549 
   2550         let cancelled_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "cancelled");
   2551         let cancelled_receipt = outbox
   2552             .enqueue_operation(operation_input(cancelled_draft, 2_000))
   2553             .await
   2554             .expect("enqueue");
   2555         outbox
   2556             .claim_next_ready_event("worker-c", "claim-c", 3_000, 2_000)
   2557             .await
   2558             .expect("claim")
   2559             .expect("claim");
   2560         outbox
   2561             .cancel_claimed_event(cancelled_receipt.outbox_event_id, "claim-c", 2_100)
   2562             .await
   2563             .expect("cancel");
   2564         let cancelled_event = outbox
   2565             .get_event(cancelled_receipt.outbox_event_id)
   2566             .await
   2567             .expect("event")
   2568             .expect("event");
   2569         assert_eq!(cancelled_event.state, RadrootsOutboxEventState::Cancelled);
   2570         assert!(cancelled_event.state.is_terminal());
   2571         assert!(cancelled_event.claim_token.is_none());
   2572         let cancelled_operation = outbox
   2573             .get_operation(cancelled_receipt.operation_id)
   2574             .await
   2575             .expect("operation")
   2576             .expect("operation");
   2577         assert_eq!(
   2578             cancelled_operation.status,
   2579             RadrootsOutboxOperationStatus::Cancelled
   2580         );
   2581         assert!(
   2582             outbox
   2583                 .claim_next_ready_event("worker-d", "claim-d", 3_000, 2_200)
   2584                 .await
   2585                 .expect("claim")
   2586                 .is_none()
   2587         );
   2588     }
   2589 
   2590     #[tokio::test]
   2591     async fn terminal_publish_attempt_fails_operation_when_quorum_cannot_be_met() {
   2592         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   2593         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "terminal quorum");
   2594         let receipt = outbox
   2595             .enqueue_operation(RadrootsOutboxOperationInput::new(
   2596                 "publish_post",
   2597                 draft,
   2598                 vec![
   2599                     RELAY_PRIMARY_WSS.to_owned(),
   2600                     RELAY_SECONDARY_WSS.to_owned(),
   2601                     "wss://relay-3.example.com".to_owned(),
   2602                 ],
   2603                 1_000,
   2604             ))
   2605             .await
   2606             .expect("enqueue");
   2607         let sign_claim = outbox
   2608             .claim_next_ready_event("signer", "sign-a", 2_000, 1_000)
   2609             .await
   2610             .expect("claim")
   2611             .expect("claim");
   2612         complete_claimed_signing(&outbox, &sign_claim, &fixture_keys(), 1_100).await;
   2613         outbox.recover_expired_claims(2_001).await.expect("recover");
   2614         outbox
   2615             .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100)
   2616             .await
   2617             .expect("claim")
   2618             .expect("claim");
   2619         outbox
   2620             .set_publish_quorum(receipt.outbox_event_id, "publish-a", 3, 2_200)
   2621             .await
   2622             .expect("quorum");
   2623         outbox
   2624             .mark_relay_accepted(
   2625                 receipt.outbox_event_id,
   2626                 "publish-a",
   2627                 RELAY_PRIMARY_WSS,
   2628                 2_250,
   2629             )
   2630             .await
   2631             .expect("accepted");
   2632         outbox
   2633             .mark_relay_failed_terminal(
   2634                 receipt.outbox_event_id,
   2635                 "publish-a",
   2636                 RELAY_SECONDARY_WSS,
   2637                 "restricted: denied",
   2638                 2_260,
   2639             )
   2640             .await
   2641             .expect("terminal");
   2642         outbox
   2643             .mark_relay_failed_terminal(
   2644                 receipt.outbox_event_id,
   2645                 "publish-a",
   2646                 "wss://relay-3.example.com",
   2647                 "blocked: denied",
   2648                 2_270,
   2649             )
   2650             .await
   2651             .expect("terminal");
   2652 
   2653         let state = outbox
   2654             .complete_publish_attempt(
   2655                 receipt.outbox_event_id,
   2656                 "publish-a",
   2657                 "retryable",
   2658                 "terminal",
   2659                 2_500,
   2660                 2_300,
   2661             )
   2662             .await
   2663             .expect("complete attempt");
   2664         assert_eq!(state, RadrootsOutboxEventState::FailedTerminal);
   2665         let event = outbox
   2666             .get_event(receipt.outbox_event_id)
   2667             .await
   2668             .expect("event")
   2669             .expect("event");
   2670         assert_eq!(event.state, RadrootsOutboxEventState::FailedTerminal);
   2671         assert!(event.claim_token.is_none());
   2672         assert_eq!(event.last_error.as_deref(), Some("terminal"));
   2673         let operation = outbox
   2674             .get_operation(receipt.operation_id)
   2675             .await
   2676             .expect("operation")
   2677             .expect("operation");
   2678         assert_eq!(
   2679             operation.status,
   2680             RadrootsOutboxOperationStatus::FailedTerminal
   2681         );
   2682         assert!(
   2683             outbox
   2684                 .claim_next_ready_event("publisher", "publish-b", 4_000, 2_400)
   2685                 .await
   2686                 .expect("claim")
   2687                 .is_none()
   2688         );
   2689     }
   2690 
   2691     #[tokio::test]
   2692     async fn publish_attempt_marks_event_published_after_acceptance_quorum() {
   2693         let outbox = RadrootsOutbox::open_memory().await.expect("open");
   2694         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "published quorum");
   2695         let signed_event =
   2696             radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event");
   2697         let receipt = outbox
   2698             .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000))
   2699             .await
   2700             .expect("enqueue");
   2701         let claimed = outbox
   2702             .claim_next_ready_signed_event("publisher", "publish-a", 3_000, 1_000)
   2703             .await
   2704             .expect("claim")
   2705             .expect("claim");
   2706 
   2707         outbox
   2708             .mark_relay_accepted(
   2709                 receipt.outbox_event_id,
   2710                 claimed.claim_token.as_str(),
   2711                 RELAY_PRIMARY_WSS,
   2712                 1_100,
   2713             )
   2714             .await
   2715             .expect("primary accepted");
   2716         outbox
   2717             .mark_relay_accepted(
   2718                 receipt.outbox_event_id,
   2719                 claimed.claim_token.as_str(),
   2720                 RELAY_SECONDARY_WSS,
   2721                 1_110,
   2722             )
   2723             .await
   2724             .expect("secondary accepted");
   2725 
   2726         let state = outbox
   2727             .complete_publish_attempt(
   2728                 receipt.outbox_event_id,
   2729                 claimed.claim_token.as_str(),
   2730                 "retryable",
   2731                 "terminal",
   2732                 2_500,
   2733                 1_200,
   2734             )
   2735             .await
   2736             .expect("complete attempt");
   2737         let event = outbox
   2738             .get_event(receipt.outbox_event_id)
   2739             .await
   2740             .expect("event")
   2741             .expect("event");
   2742         let operation = outbox
   2743             .get_operation(receipt.operation_id)
   2744             .await
   2745             .expect("operation")
   2746             .expect("operation");
   2747 
   2748         assert_eq!(state, RadrootsOutboxEventState::Published);
   2749         assert_eq!(event.state, RadrootsOutboxEventState::Published);
   2750         assert_eq!(event.next_attempt_after_ms, 1_200);
   2751         assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete);
   2752     }
   2753 
   2754     #[tokio::test]
   2755     async fn publish_attempt_remains_retryable_when_relay_work_remains() {
                // Scenario: one relay is marked failed-retryable under an active claim and
                // the publish attempt is then completed. The assertions expect the event to
                // end in `PublishRetryable` and the operation to stay `Queued`.
   2756         let outbox = RadrootsOutbox::open_memory().await.expect("open");
                // Build a draft for the fixture key and sign it with the fixture keys so it
                // can be enqueued as an already-signed operation.
   2757         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "retryable quorum");
   2758         let signed_event =
   2759             radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event");
   2760         let receipt = outbox
   2761             .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000))
   2762             .await
   2763             .expect("enqueue");
                // Claim the next ready signed event (only one was enqueued above). The
                // returned claim token is passed to the relay and completion calls below.
                // NOTE(review): 3_000 / 1_000 look like a lease deadline and the current
                // time in ms — confirm against the method signature.
   2764         let claimed = outbox
   2765             .claim_next_ready_signed_event("publisher", "publish-a", 3_000, 1_000)
   2766             .await
   2767             .expect("claim")
   2768             .expect("claim");
   2769 
                // Record a retryable failure ("timeout") for the primary relay.
   2770         outbox
   2771             .mark_relay_failed_retryable(
   2772                 receipt.outbox_event_id,
   2773                 claimed.claim_token.as_str(),
   2774                 RELAY_PRIMARY_WSS,
   2775                 "timeout",
   2776                 1_100,
   2777             )
   2778             .await
   2779             .expect("retryable relay");
   2780 
                // Complete the attempt. "retryable error" and 2_500 are the values the
                // assertions below expect in `last_error` and `next_attempt_after_ms`.
                // NOTE(review): "terminal" is presumably the message used when the attempt
                // ends terminally, and 1_200 the current time — confirm.
   2781         let state = outbox
   2782             .complete_publish_attempt(
   2783                 receipt.outbox_event_id,
   2784                 claimed.claim_token.as_str(),
   2785                 "retryable error",
   2786                 "terminal",
   2787                 2_500,
   2788                 1_200,
   2789             )
   2790             .await
   2791             .expect("complete attempt");
                // Reload the stored event and operation so the assertions cover what
                // `get_event` / `get_operation` return, not only the value returned above.
   2792         let event = outbox
   2793             .get_event(receipt.outbox_event_id)
   2794             .await
   2795             .expect("event")
   2796             .expect("event");
   2797         let operation = outbox
   2798             .get_operation(receipt.operation_id)
   2799             .await
   2800             .expect("operation")
   2801             .expect("operation");
   2802 
                // Returned state and stored state must agree, and the operation is still queued.
   2803         assert_eq!(state, RadrootsOutboxEventState::PublishRetryable);
   2804         assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable);
   2805         assert_eq!(event.last_error.as_deref(), Some("retryable error"));
   2806         assert_eq!(event.next_attempt_after_ms, 2_500);
   2807         assert_eq!(operation.status, RadrootsOutboxOperationStatus::Queued);
   2808     }
   2809 
   2810     #[tokio::test]
   2811     async fn publish_attempt_remains_retryable_when_pending_relay_work_remains() {
                // Scenario: the primary relay is marked accepted under an active claim and
                // the publish attempt is then completed. The assertions still expect
                // `PublishRetryable`.
                // NOTE(review): per the test name, relay work is still pending at that
                // point; presumably the fixture input targets more relays than the one
                // accepted here — confirm in `signed_operation_input`.
   2812         let outbox = RadrootsOutbox::open_memory().await.expect("open");
                // Build a draft for the fixture key, sign it with the fixture keys, and
                // enqueue it as an already-signed operation.
   2813         let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "pending quorum");
   2814         let signed_event =
   2815             radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event");
   2816         let receipt = outbox
   2817             .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000))
   2818             .await
   2819             .expect("enqueue");
                // Claim the next ready signed event (only one was enqueued above); its
                // claim token is passed to the relay and completion calls below.
   2820         let claimed = outbox
   2821             .claim_next_ready_signed_event("publisher", "publish-a", 3_000, 1_000)
   2822             .await
   2823             .expect("claim")
   2824             .expect("claim");
   2825 
                // Record acceptance by the primary relay only.
   2826         outbox
   2827             .mark_relay_accepted(
   2828                 receipt.outbox_event_id,
   2829                 claimed.claim_token.as_str(),
   2830                 RELAY_PRIMARY_WSS,
   2831                 1_100,
   2832             )
   2833             .await
   2834             .expect("accepted relay");
   2835 
                // Complete the attempt. "pending relay" and 2_500 are the values the
                // assertions below expect in `last_error` and `next_attempt_after_ms`.
                // NOTE(review): "terminal" and 1_200 are presumably the terminal-outcome
                // message and the current time — confirm against the method signature.
   2836         let state = outbox
   2837             .complete_publish_attempt(
   2838                 receipt.outbox_event_id,
   2839                 claimed.claim_token.as_str(),
   2840                 "pending relay",
   2841                 "terminal",
   2842                 2_500,
   2843                 1_200,
   2844             )
   2845             .await
   2846             .expect("complete attempt");
                // Reload the stored event so the assertions cover what `get_event` returns
                // as well as the value returned above.
   2847         let event = outbox
   2848             .get_event(receipt.outbox_event_id)
   2849             .await
   2850             .expect("event")
   2851             .expect("event");
   2852 
   2853         assert_eq!(state, RadrootsOutboxEventState::PublishRetryable);
   2854         assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable);
   2855         assert_eq!(event.last_error.as_deref(), Some("pending relay"));
   2856         assert_eq!(event.next_attempt_after_ms, 2_500);
   2857     }
   2858 
   2859     #[tokio::test]
   2860     async fn smoke_outbox_claim_cancel_cycles_complete_one_thousand_events() {
                // Smoke test: enqueue 1,000 operations, claim and cancel each one, then
                // check that every event and operation ended `Cancelled` and that nothing
                // is left to claim.
   2861         let outbox = RadrootsOutbox::open_memory().await.expect("open");
                // Receipts are kept so each event/operation can be looked up afterwards.
   2862         let mut receipts = Vec::new();
                // Phase 1: enqueue. Each draft gets a distinct `claim-cycle-{index}` string
                // and each input a distinct `1_000 + index` value.
                // NOTE(review): the numeric argument is presumably a creation timestamp in
                // ms — confirm in `operation_input`.
   2863         for index in 0..1_000 {
   2864             let draft = post_draft(
   2865                 FIXTURE_ALICE_PUBLIC_KEY_HEX,
   2866                 format!("claim-cycle-{index}").as_str(),
   2867             );
   2868             let receipt = outbox
   2869                 .enqueue_operation(operation_input(draft, 1_000 + index))
   2870                 .await
   2871                 .expect("enqueue");
   2872             receipts.push(receipt);
   2873         }
   2874 
                // Phase 2: claim and cancel, once per enqueued operation, with a unique
                // claim token per iteration. Every claim is expected to return an event.
   2875         for index in 0..1_000 {
   2876             let claim_token = format!("claim-{index}");
   2877             let claimed = outbox
   2878                 .claim_next_ready_event(
   2879                     "smoke-worker",
   2880                     claim_token.as_str(),
   2881                     10_000 + index,
   2882                     2_000 + index,
   2883                 )
   2884                 .await
   2885                 .expect("claim")
   2886                 .expect("claimed");
                    // Cancel whichever event the claim returned, using the same token; the
                    // loop does not assume claims come back in enqueue order.
   2887             outbox
   2888                 .cancel_claimed_event(claimed.outbox_event_id, claim_token.as_str(), 3_000 + index)
   2889                 .await
   2890                 .expect("cancel");
   2891         }
   2892 
                // Phase 3: every enqueued event must be `Cancelled` with no claim token
                // left on it, and its operation must be `Cancelled` too.
   2893         for receipt in receipts {
   2894             let event = outbox
   2895                 .get_event(receipt.outbox_event_id)
   2896                 .await
   2897                 .expect("event")
   2898                 .expect("event");
   2899             assert_eq!(event.state, RadrootsOutboxEventState::Cancelled);
   2900             assert!(event.claim_token.is_none());
   2901             let operation = outbox
   2902                 .get_operation(receipt.operation_id)
   2903                 .await
   2904                 .expect("operation")
   2905                 .expect("operation");
   2906             assert_eq!(operation.status, RadrootsOutboxOperationStatus::Cancelled);
   2907         }
   2908 
                // With all events cancelled, one more claim attempt must succeed but
                // return `None`.
   2909         assert!(
   2910             outbox
   2911                 .claim_next_ready_event("smoke-worker", "claim-final", 20_000, 20_000)
   2912                 .await
   2913                 .expect("claim")
   2914                 .is_none()
   2915         );
   2916     }
   2917 }