lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit 7c4c3baa07a1cd81f64122a5dc85de0c3ccae1b3
parent c52205541e3dff66a1f5f0c9fb4821ea6d0bbc72
Author: triesap <tyson@radroots.org>
Date:   Sat, 13 Jun 2026 05:01:08 -0700

outbox: enforce publish terminality

- add failed-terminal and cancelled outbox event and operation states
- guard claimed event and relay mutations with atomic claim-token updates
- resolve publish attempts by accepted quorum after recording relay diagnostics
- cover stale token, terminal, cancellation, and quorum publish flows

Diffstat:
M crates/outbox/migrations/0001_outbox.up.sql | 8 ++++----
M crates/outbox/src/model.rs | 19 +++++++++++++++++++
M crates/outbox/src/store.rs | 598 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
M crates/relay_transport/src/outbox.rs | 21 ++++++++++-----------
M crates/relay_transport/tests/transport.rs | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 688 insertions(+), 69 deletions(-)

diff --git a/crates/outbox/migrations/0001_outbox.up.sql b/crates/outbox/migrations/0001_outbox.up.sql @@ -4,7 +4,7 @@ CREATE TABLE outbox_operation ( expected_pubkey TEXT NOT NULL, idempotency_key TEXT, idempotency_digest TEXT NOT NULL, - status TEXT NOT NULL, + status TEXT NOT NULL CHECK (status IN ('queued', 'complete', 'failed_terminal', 'cancelled')), created_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL ); @@ -24,8 +24,8 @@ CREATE TABLE outbox_event ( draft_json TEXT NOT NULL, signed_event_json TEXT, raw_event_json TEXT, - state TEXT NOT NULL, - accepted_quorum INTEGER NOT NULL, + state TEXT NOT NULL CHECK (state IN ('draft_queued', 'signing', 'signed', 'publishing', 'published', 'sign_retryable', 'publish_retryable', 'failed_terminal', 'cancelled')), + accepted_quorum INTEGER NOT NULL CHECK (accepted_quorum >= 0), attempt_count INTEGER NOT NULL, claim_token TEXT, claim_owner TEXT, @@ -48,7 +48,7 @@ ON outbox_event(event_id); CREATE TABLE outbox_event_relay_status ( outbox_event_id INTEGER NOT NULL REFERENCES outbox_event(outbox_event_id) ON DELETE CASCADE, relay_url TEXT NOT NULL, - status TEXT NOT NULL, + status TEXT NOT NULL CHECK (status IN ('pending', 'accepted', 'failed_retryable', 'failed_terminal')), attempt_count INTEGER NOT NULL, last_attempt_at_ms INTEGER, acknowledged_at_ms INTEGER, diff --git a/crates/outbox/src/model.rs b/crates/outbox/src/model.rs @@ -7,6 +7,8 @@ use radroots_events::draft::{RadrootsFrozenEventDraft, RadrootsSignedNostrEvent} pub enum RadrootsOutboxOperationStatus { Queued, Complete, + FailedTerminal, + Cancelled, } impl RadrootsOutboxOperationStatus { @@ -14,6 +16,8 @@ impl RadrootsOutboxOperationStatus { match self { Self::Queued => "queued", Self::Complete => "complete", + Self::FailedTerminal => "failed_terminal", + Self::Cancelled => "cancelled", } } @@ -21,6 +25,8 @@ impl RadrootsOutboxOperationStatus { match value { "queued" => Ok(Self::Queued), "complete" => Ok(Self::Complete), + "failed_terminal" => 
Ok(Self::FailedTerminal), + "cancelled" => Ok(Self::Cancelled), _ => Err(RadrootsOutboxError::InvalidStoredEnum { field: "outbox_operation.status", value: value.to_owned(), @@ -38,6 +44,8 @@ pub enum RadrootsOutboxEventState { Published, SignRetryable, PublishRetryable, + FailedTerminal, + Cancelled, } impl RadrootsOutboxEventState { @@ -50,6 +58,8 @@ impl RadrootsOutboxEventState { Self::Published => "published", Self::SignRetryable => "sign_retryable", Self::PublishRetryable => "publish_retryable", + Self::FailedTerminal => "failed_terminal", + Self::Cancelled => "cancelled", } } @@ -62,12 +72,21 @@ impl RadrootsOutboxEventState { "published" => Ok(Self::Published), "sign_retryable" => Ok(Self::SignRetryable), "publish_retryable" => Ok(Self::PublishRetryable), + "failed_terminal" => Ok(Self::FailedTerminal), + "cancelled" => Ok(Self::Cancelled), _ => Err(RadrootsOutboxError::InvalidStoredEnum { field: "outbox_event.state", value: value.to_owned(), }), } } + + pub fn is_terminal(self) -> bool { + matches!( + self, + Self::Published | Self::FailedTerminal | Self::Cancelled + ) + } } #[derive(Clone, Copy, Debug, PartialEq, Eq)] diff --git a/crates/outbox/src/store.rs b/crates/outbox/src/store.rs @@ -14,7 +14,7 @@ use radroots_events::draft::{RadrootsFrozenEventDraft, RadrootsSignedNostrEvent} use radroots_nostr::prelude::{RadrootsNostrKeys, radroots_nostr_sign_frozen_draft}; use serde::Serialize; use sha2::{Digest, Sha256}; -use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteQueryResult}; use sqlx::{Row, SqlitePool}; use std::path::Path; use std::str::FromStr; @@ -271,7 +271,7 @@ impl RadrootsOutbox { }); } let signed_event_json = serde_json::to_string(&signed_event)?; - sqlx::query( + let changed = sqlx::query( "UPDATE outbox_event SET signed_event_json = ?, raw_event_json = ?, state = ?, last_error = NULL, updated_at_ms = ? WHERE outbox_event_id = ? 
AND claim_token = ?", ) .bind(signed_event_json.as_str()) @@ -282,6 +282,8 @@ impl RadrootsOutbox { .bind(claim_token) .execute(&self.pool) .await?; + self.ensure_claimed_update(outbox_event_id, claim_token, changed) + .await?; Ok(signed_event) } @@ -312,18 +314,19 @@ impl RadrootsOutbox { next_attempt_after_ms: i64, now_ms: i64, ) -> Result<(), RadrootsOutboxError> { - self.ensure_claim_token(outbox_event_id, claim_token) - .await?; - sqlx::query( - "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ?", + let changed = sqlx::query( + "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", ) .bind(RadrootsOutboxEventState::SignRetryable.as_str()) .bind(error.as_ref()) .bind(next_attempt_after_ms) .bind(now_ms) .bind(outbox_event_id) + .bind(claim_token) .execute(&self.pool) .await?; + self.ensure_claimed_update(outbox_event_id, claim_token, changed) + .await?; Ok(()) } @@ -335,18 +338,19 @@ impl RadrootsOutbox { next_attempt_after_ms: i64, now_ms: i64, ) -> Result<(), RadrootsOutboxError> { - self.ensure_claim_token(outbox_event_id, claim_token) - .await?; - sqlx::query( - "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ?", + let changed = sqlx::query( + "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? 
AND claim_token = ?", ) .bind(RadrootsOutboxEventState::PublishRetryable.as_str()) .bind(error.as_ref()) .bind(next_attempt_after_ms) .bind(now_ms) .bind(outbox_event_id) + .bind(claim_token) .execute(&self.pool) .await?; + self.ensure_claimed_update(outbox_event_id, claim_token, changed) + .await?; Ok(()) } @@ -384,7 +388,7 @@ impl RadrootsOutbox { let ingest = RadrootsEventIngest::new(event, observed_at_ms) .with_raw_json(signed_event.raw_json.clone()); let receipt = event_store.ingest_event(ingest).await?; - sqlx::query( + let changed = sqlx::query( "UPDATE outbox_event SET event_store_ingested = 1, event_store_inserted = ?, event_store_ingested_at_ms = ?, state = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", ) .bind(bool_i64(receipt.inserted)) @@ -395,6 +399,8 @@ impl RadrootsOutbox { .bind(claim_token) .execute(&self.pool) .await?; + self.ensure_claimed_update(outbox_event_id, claim_token, changed) + .await?; Ok(RadrootsOutboxEventStoreIngestReceipt { outbox_event_id, event_id: receipt.event_id, @@ -410,51 +416,20 @@ impl RadrootsOutbox { relay_url: &str, acknowledged_at_ms: i64, ) -> Result<(), RadrootsOutboxError> { - self.ensure_claim_token(outbox_event_id, claim_token) - .await?; - sqlx::query( - "UPDATE outbox_event_relay_status SET status = ?, attempt_count = attempt_count + 1, last_attempt_at_ms = ?, acknowledged_at_ms = ?, last_error = NULL WHERE outbox_event_id = ? AND relay_url = ?", + let changed = sqlx::query( + "UPDATE outbox_event_relay_status SET status = ?, attempt_count = attempt_count + 1, last_attempt_at_ms = ?, acknowledged_at_ms = ?, last_error = NULL WHERE outbox_event_id = ? AND relay_url = ? AND EXISTS (SELECT 1 FROM outbox_event WHERE outbox_event_id = ? 
AND claim_token = ?)", ) .bind(RadrootsOutboxRelayStatus::Accepted.as_str()) .bind(acknowledged_at_ms) .bind(acknowledged_at_ms) .bind(outbox_event_id) .bind(relay_url) + .bind(outbox_event_id) + .bind(claim_token) .execute(&self.pool) .await?; - let remaining: i64 = sqlx::query( - "SELECT COUNT(*) FROM outbox_event_relay_status WHERE outbox_event_id = ? AND status != ?", - ) - .bind(outbox_event_id) - .bind(RadrootsOutboxRelayStatus::Accepted.as_str()) - .fetch_one(&self.pool) - .await? - .try_get(0)?; - if remaining == 0 { - sqlx::query( - "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", - ) - .bind(RadrootsOutboxEventState::Published.as_str()) - .bind(acknowledged_at_ms) - .bind(outbox_event_id) - .bind(claim_token) - .execute(&self.pool) + self.ensure_claimed_update(outbox_event_id, claim_token, changed) .await?; - let operation_id: i64 = - sqlx::query("SELECT operation_id FROM outbox_event WHERE outbox_event_id = ?") - .bind(outbox_event_id) - .fetch_one(&self.pool) - .await? - .try_get("operation_id")?; - sqlx::query( - "UPDATE outbox_operation SET status = ?, updated_at_ms = ? WHERE operation_id = ?", - ) - .bind(RadrootsOutboxOperationStatus::Complete.as_str()) - .bind(acknowledged_at_ms) - .bind(operation_id) - .execute(&self.pool) - .await?; - } Ok(()) } @@ -465,9 +440,7 @@ impl RadrootsOutbox { accepted_quorum: i64, now_ms: i64, ) -> Result<(), RadrootsOutboxError> { - self.ensure_claim_token(outbox_event_id, claim_token) - .await?; - sqlx::query( + let changed = sqlx::query( "UPDATE outbox_event SET accepted_quorum = ?, updated_at_ms = ? WHERE outbox_event_id = ? 
AND claim_token = ?", ) .bind(accepted_quorum) @@ -476,6 +449,8 @@ impl RadrootsOutbox { .bind(claim_token) .execute(&self.pool) .await?; + self.ensure_claimed_update(outbox_event_id, claim_token, changed) + .await?; Ok(()) } @@ -517,6 +492,133 @@ impl RadrootsOutbox { .await } + pub async fn complete_publish_attempt( + &self, + outbox_event_id: i64, + claim_token: &str, + retryable_error: impl AsRef<str>, + terminal_error: impl AsRef<str>, + next_attempt_after_ms: i64, + now_ms: i64, + ) -> Result<RadrootsOutboxEventState, RadrootsOutboxError> { + let mut tx = self.pool.begin().await?; + let row = claimed_event_identity_tx(&mut tx, outbox_event_id, claim_token).await?; + let operation_id = row.operation_id; + let accepted_quorum = row.accepted_quorum; + let accepted_count: i64 = sqlx::query( + "SELECT COUNT(*) FROM outbox_event_relay_status WHERE outbox_event_id = ? AND status = ?", + ) + .bind(outbox_event_id) + .bind(RadrootsOutboxRelayStatus::Accepted.as_str()) + .fetch_one(&mut *tx) + .await? + .try_get(0)?; + let retryable_count: i64 = sqlx::query( + "SELECT COUNT(*) FROM outbox_event_relay_status WHERE outbox_event_id = ? AND status = ?", + ) + .bind(outbox_event_id) + .bind(RadrootsOutboxRelayStatus::FailedRetryable.as_str()) + .fetch_one(&mut *tx) + .await? + .try_get(0)?; + let pending_count: i64 = sqlx::query( + "SELECT COUNT(*) FROM outbox_event_relay_status WHERE outbox_event_id = ? AND status = ?", + ) + .bind(outbox_event_id) + .bind(RadrootsOutboxRelayStatus::Pending.as_str()) + .fetch_one(&mut *tx) + .await? 
+ .try_get(0)?; + + let (event_state, operation_status, last_error, next_attempt_after_ms) = + if accepted_count >= accepted_quorum { + ( + RadrootsOutboxEventState::Published, + Some(RadrootsOutboxOperationStatus::Complete), + None, + now_ms, + ) + } else if retryable_count > 0 || pending_count > 0 { + ( + RadrootsOutboxEventState::PublishRetryable, + None, + Some(retryable_error.as_ref()), + next_attempt_after_ms, + ) + } else { + ( + RadrootsOutboxEventState::FailedTerminal, + Some(RadrootsOutboxOperationStatus::FailedTerminal), + Some(terminal_error.as_ref()), + now_ms, + ) + }; + + let changed = sqlx::query( + "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", + ) + .bind(event_state.as_str()) + .bind(last_error) + .bind(next_attempt_after_ms) + .bind(now_ms) + .bind(outbox_event_id) + .bind(claim_token) + .execute(&mut *tx) + .await?; + if changed.rows_affected() == 0 { + return Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id }); + } + + if let Some(operation_status) = operation_status { + sqlx::query( + "UPDATE outbox_operation SET status = ?, updated_at_ms = ? 
WHERE operation_id = ?", + ) + .bind(operation_status.as_str()) + .bind(now_ms) + .bind(operation_id) + .execute(&mut *tx) + .await?; + } + + tx.commit().await?; + Ok(event_state) + } + + pub async fn mark_publish_failed_terminal( + &self, + outbox_event_id: i64, + claim_token: &str, + error: impl AsRef<str>, + now_ms: i64, + ) -> Result<(), RadrootsOutboxError> { + self.finish_claimed_event( + outbox_event_id, + claim_token, + RadrootsOutboxEventState::FailedTerminal, + RadrootsOutboxOperationStatus::FailedTerminal, + Some(error.as_ref()), + now_ms, + ) + .await + } + + pub async fn cancel_claimed_event( + &self, + outbox_event_id: i64, + claim_token: &str, + now_ms: i64, + ) -> Result<(), RadrootsOutboxError> { + self.finish_claimed_event( + outbox_event_id, + claim_token, + RadrootsOutboxEventState::Cancelled, + RadrootsOutboxOperationStatus::Cancelled, + None, + now_ms, + ) + .await + } + async fn claimed_event( &self, outbox_event_id: i64, @@ -548,6 +650,59 @@ impl RadrootsOutbox { Ok(()) } + async fn ensure_claimed_update( + &self, + outbox_event_id: i64, + claim_token: &str, + changed: SqliteQueryResult, + ) -> Result<(), RadrootsOutboxError> { + if changed.rows_affected() > 0 { + return Ok(()); + } + self.ensure_claim_token(outbox_event_id, claim_token) + .await?; + Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id }) + } + + async fn finish_claimed_event( + &self, + outbox_event_id: i64, + claim_token: &str, + event_state: RadrootsOutboxEventState, + operation_status: RadrootsOutboxOperationStatus, + last_error: Option<&str>, + now_ms: i64, + ) -> Result<(), RadrootsOutboxError> { + let mut tx = self.pool.begin().await?; + let row = claimed_event_identity_tx(&mut tx, outbox_event_id, claim_token).await?; + let changed = sqlx::query( + "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? 
AND claim_token = ?", + ) + .bind(event_state.as_str()) + .bind(last_error) + .bind(now_ms) + .bind(now_ms) + .bind(outbox_event_id) + .bind(claim_token) + .execute(&mut *tx) + .await?; + if changed.rows_affected() == 0 { + return Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id }); + } + + sqlx::query( + "UPDATE outbox_operation SET status = ?, updated_at_ms = ? WHERE operation_id = ?", + ) + .bind(operation_status.as_str()) + .bind(now_ms) + .bind(row.operation_id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + async fn mark_relay_failed( &self, outbox_event_id: i64, @@ -557,18 +712,20 @@ impl RadrootsOutbox { error: &str, attempted_at_ms: i64, ) -> Result<(), RadrootsOutboxError> { - self.ensure_claim_token(outbox_event_id, claim_token) - .await?; - sqlx::query( - "UPDATE outbox_event_relay_status SET status = ?, attempt_count = attempt_count + 1, last_attempt_at_ms = ?, acknowledged_at_ms = NULL, last_error = ? WHERE outbox_event_id = ? AND relay_url = ?", + let changed = sqlx::query( + "UPDATE outbox_event_relay_status SET status = ?, attempt_count = attempt_count + 1, last_attempt_at_ms = ?, acknowledged_at_ms = NULL, last_error = ? WHERE outbox_event_id = ? AND relay_url = ? AND EXISTS (SELECT 1 FROM outbox_event WHERE outbox_event_id = ? 
AND claim_token = ?)", ) .bind(status.as_str()) .bind(attempted_at_ms) .bind(error) .bind(outbox_event_id) .bind(relay_url) + .bind(outbox_event_id) + .bind(claim_token) .execute(&self.pool) .await?; + self.ensure_claimed_update(outbox_event_id, claim_token, changed) + .await?; Ok(()) } } @@ -580,6 +737,11 @@ struct ExistingOperation { idempotency_digest: String, } +struct ClaimedEventIdentity { + operation_id: i64, + accepted_quorum: i64, +} + async fn configure_connection( pool: &SqlitePool, file_backed: bool, @@ -656,6 +818,29 @@ async fn event_by_id_tx( event_from_row(row) } +async fn claimed_event_identity_tx( + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + outbox_event_id: i64, + claim_token: &str, +) -> Result<ClaimedEventIdentity, RadrootsOutboxError> { + let row = + sqlx::query("SELECT operation_id, accepted_quorum, claim_token FROM outbox_event WHERE outbox_event_id = ?") + .bind(outbox_event_id) + .fetch_optional(&mut **tx) + .await?; + let Some(row) = row else { + return Err(RadrootsOutboxError::EventNotFound(outbox_event_id)); + }; + let stored: Option<String> = row.try_get("claim_token")?; + if stored.as_deref() != Some(claim_token) { + return Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id }); + } + Ok(ClaimedEventIdentity { + operation_id: row.try_get("operation_id")?, + accepted_quorum: row.try_get("accepted_quorum")?, + }) +} + async fn relay_urls_for_tx( tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, outbox_event_id: i64, @@ -895,6 +1080,38 @@ mod tests { assert!(row.is_none()); } + #[test] + fn terminal_and_cancelled_event_states_round_trip() { + assert_eq!( + RadrootsOutboxOperationStatus::parse("failed_terminal").expect("operation status"), + RadrootsOutboxOperationStatus::FailedTerminal + ); + assert_eq!( + RadrootsOutboxOperationStatus::FailedTerminal.as_str(), + "failed_terminal" + ); + assert_eq!( + RadrootsOutboxOperationStatus::parse("cancelled").expect("operation status"), + RadrootsOutboxOperationStatus::Cancelled + ); + 
assert_eq!( + RadrootsOutboxOperationStatus::Cancelled.as_str(), + "cancelled" + ); + assert_eq!( + RadrootsOutboxEventState::parse("failed_terminal").expect("event state"), + RadrootsOutboxEventState::FailedTerminal + ); + assert!(RadrootsOutboxEventState::FailedTerminal.is_terminal()); + assert_eq!( + RadrootsOutboxEventState::parse("cancelled").expect("event state"), + RadrootsOutboxEventState::Cancelled + ); + assert!(RadrootsOutboxEventState::Cancelled.is_terminal()); + assert!(RadrootsOutboxEventState::Published.is_terminal()); + assert!(!RadrootsOutboxEventState::PublishRetryable.is_terminal()); + } + #[tokio::test] async fn enqueue_idempotency_is_scoped_by_kind_pubkey_and_digest() { let outbox = RadrootsOutbox::open_memory().await.expect("open"); @@ -1034,6 +1251,84 @@ mod tests { } #[tokio::test] + async fn claimed_mutations_reject_stale_tokens_without_state_changes() { + let outbox = RadrootsOutbox::open_memory().await.expect("open"); + let event_store = RadrootsEventStore::open_memory() + .await + .expect("event store"); + let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "hello"); + let receipt = outbox + .enqueue_operation(operation_input(draft, 1_000)) + .await + .expect("enqueue"); + + let first_claim = outbox + .claim_next_ready_event("worker-a", "claim-a", 1_100, 1_000) + .await + .expect("claim") + .expect("claim"); + let signed = outbox + .sign_claimed_event(&first_claim, &fixture_keys(), 1_050) + .await + .expect("sign"); + outbox.recover_expired_claims(1_101).await.expect("recover"); + let second_claim = outbox + .claim_next_ready_event("worker-b", "claim-b", 1_500, 1_200) + .await + .expect("claim") + .expect("claim"); + assert_eq!(second_claim.state, RadrootsOutboxEventState::Publishing); + + let retry_with_stale_token = outbox + .mark_publish_retryable( + receipt.outbox_event_id, + "claim-a", + "stale retry", + 1_600, + 1_300, + ) + .await + .expect_err("stale retry token"); + assert!(matches!( + retry_with_stale_token, + 
RadrootsOutboxError::ClaimTokenMismatch { .. } + )); + + let relay_with_stale_token = outbox + .mark_relay_accepted(receipt.outbox_event_id, "claim-a", RELAY_PRIMARY_WSS, 1_300) + .await + .expect_err("stale relay token"); + assert!(matches!( + relay_with_stale_token, + RadrootsOutboxError::ClaimTokenMismatch { .. } + )); + + let ingest_with_current_token = outbox + .ingest_signed_event_local(&event_store, receipt.outbox_event_id, "claim-b", 1_350) + .await + .expect("current ingest"); + assert_eq!(ingest_with_current_token.event_id, signed.id); + + let event = outbox + .get_event(receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + assert_eq!(event.state, RadrootsOutboxEventState::Publishing); + assert_eq!(event.claim_token.as_deref(), Some("claim-b")); + + let statuses = outbox + .relay_statuses(receipt.outbox_event_id) + .await + .expect("statuses"); + assert!( + statuses + .iter() + .all(|status| status.status == RadrootsOutboxRelayStatus::Pending) + ); + } + + #[tokio::test] async fn signed_events_are_reused_after_claim_recovery() { let outbox = RadrootsOutbox::open_memory().await.expect("open"); let (receipt, claimed) = enqueue_signed_fixture(&outbox).await; @@ -1141,4 +1436,199 @@ mod tests { assert_eq!(reclaimed.state, RadrootsOutboxEventState::Publishing); assert_eq!(reclaimed.signed_event.as_ref(), Some(&signed)); } + + #[tokio::test] + async fn terminal_and_cancelled_claimed_events_are_not_reclaimable() { + let outbox = RadrootsOutbox::open_memory().await.expect("open"); + let terminal_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "terminal"); + let terminal_receipt = outbox + .enqueue_operation(operation_input(terminal_draft, 1_000)) + .await + .expect("enqueue"); + outbox + .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) + .await + .expect("claim") + .expect("claim"); + outbox + .mark_publish_failed_terminal( + terminal_receipt.outbox_event_id, + "claim-a", + "terminal failure", + 1_100, + ) + .await + 
.expect("terminal"); + + let terminal_event = outbox + .get_event(terminal_receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + assert_eq!( + terminal_event.state, + RadrootsOutboxEventState::FailedTerminal + ); + assert!(terminal_event.state.is_terminal()); + assert!(terminal_event.claim_token.is_none()); + let terminal_operation = outbox + .get_operation(terminal_receipt.operation_id) + .await + .expect("operation") + .expect("operation"); + assert_eq!( + terminal_operation.status, + RadrootsOutboxOperationStatus::FailedTerminal + ); + assert!( + outbox + .claim_next_ready_event("worker-b", "claim-b", 2_000, 1_200) + .await + .expect("claim") + .is_none() + ); + + let cancelled_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "cancelled"); + let cancelled_receipt = outbox + .enqueue_operation(operation_input(cancelled_draft, 2_000)) + .await + .expect("enqueue"); + outbox + .claim_next_ready_event("worker-c", "claim-c", 3_000, 2_000) + .await + .expect("claim") + .expect("claim"); + outbox + .cancel_claimed_event(cancelled_receipt.outbox_event_id, "claim-c", 2_100) + .await + .expect("cancel"); + let cancelled_event = outbox + .get_event(cancelled_receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + assert_eq!(cancelled_event.state, RadrootsOutboxEventState::Cancelled); + assert!(cancelled_event.state.is_terminal()); + assert!(cancelled_event.claim_token.is_none()); + let cancelled_operation = outbox + .get_operation(cancelled_receipt.operation_id) + .await + .expect("operation") + .expect("operation"); + assert_eq!( + cancelled_operation.status, + RadrootsOutboxOperationStatus::Cancelled + ); + assert!( + outbox + .claim_next_ready_event("worker-d", "claim-d", 3_000, 2_200) + .await + .expect("claim") + .is_none() + ); + } + + #[tokio::test] + async fn terminal_publish_attempt_fails_operation_when_quorum_cannot_be_met() { + let outbox = RadrootsOutbox::open_memory().await.expect("open"); + let draft = 
post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "terminal quorum"); + let receipt = outbox + .enqueue_operation(RadrootsOutboxOperationInput::new( + "publish_post", + draft, + vec![ + RELAY_PRIMARY_WSS.to_owned(), + RELAY_SECONDARY_WSS.to_owned(), + "wss://relay-3.example.com".to_owned(), + ], + 1_000, + )) + .await + .expect("enqueue"); + let sign_claim = outbox + .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) + .await + .expect("claim") + .expect("claim"); + outbox + .sign_claimed_event(&sign_claim, &fixture_keys(), 1_100) + .await + .expect("sign"); + outbox.recover_expired_claims(2_001).await.expect("recover"); + outbox + .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) + .await + .expect("claim") + .expect("claim"); + outbox + .set_publish_quorum(receipt.outbox_event_id, "publish-a", 3, 2_200) + .await + .expect("quorum"); + outbox + .mark_relay_accepted( + receipt.outbox_event_id, + "publish-a", + RELAY_PRIMARY_WSS, + 2_250, + ) + .await + .expect("accepted"); + outbox + .mark_relay_failed_terminal( + receipt.outbox_event_id, + "publish-a", + RELAY_SECONDARY_WSS, + "restricted: denied", + 2_260, + ) + .await + .expect("terminal"); + outbox + .mark_relay_failed_terminal( + receipt.outbox_event_id, + "publish-a", + "wss://relay-3.example.com", + "blocked: denied", + 2_270, + ) + .await + .expect("terminal"); + + let state = outbox + .complete_publish_attempt( + receipt.outbox_event_id, + "publish-a", + "retryable", + "terminal", + 2_500, + 2_300, + ) + .await + .expect("complete attempt"); + assert_eq!(state, RadrootsOutboxEventState::FailedTerminal); + let event = outbox + .get_event(receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + assert_eq!(event.state, RadrootsOutboxEventState::FailedTerminal); + assert!(event.claim_token.is_none()); + assert_eq!(event.last_error.as_deref(), Some("terminal")); + let operation = outbox + .get_operation(receipt.operation_id) + .await + .expect("operation") + .expect("operation"); + 
assert_eq!( + operation.status, + RadrootsOutboxOperationStatus::FailedTerminal + ); + assert!( + outbox + .claim_next_ready_event("publisher", "publish-b", 4_000, 2_400) + .await + .expect("claim") + .is_none() + ); + } } diff --git a/crates/relay_transport/src/outbox.rs b/crates/relay_transport/src/outbox.rs @@ -148,17 +148,16 @@ where } } - if !publish.quorum_met || publish.retryable_count > 0 || publish.terminal_count > 0 { - outbox - .mark_publish_retryable( - claimed.outbox_event_id, - claimed.claim_token.as_str(), - "relay publish incomplete", - policy.next_attempt_after_ms, - now_ms, - ) - .await?; - } + outbox + .complete_publish_attempt( + claimed.outbox_event_id, + claimed.claim_token.as_str(), + "relay publish incomplete", + "relay publish terminal", + policy.next_attempt_after_ms, + now_ms, + ) + .await?; Ok(RadrootsOutboxPublishReceipt { local_ingest, diff --git a/crates/relay_transport/tests/transport.rs b/crates/relay_transport/tests/transport.rs @@ -422,3 +422,114 @@ async fn outbox_publish_persists_partial_success_and_skips_accepted_retry() { .expect("observations"); assert_eq!(observations.len(), 3); } + +#[tokio::test] +async fn outbox_publish_marks_published_when_policy_quorum_is_met_with_failure_diagnostics() { + let signed = signed_post("quorum"); + let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); + let store = RadrootsEventStore::open_memory().await.expect("store"); + let draft = RadrootsFrozenEventDraft::new( + "radroots.social.post.v1", + KIND_POST, + signed.created_at, + signed.tags.clone(), + signed.content.clone(), + signed.pubkey.as_str(), + ) + .expect("draft"); + let receipt = outbox + .enqueue_operation(RadrootsOutboxOperationInput::new( + "publish_post", + draft, + vec![ + RELAY_PRIMARY_WSS.to_owned(), + RELAY_SECONDARY_WSS.to_owned(), + RELAY_TERTIARY_WSS.to_owned(), + ], + 1_000, + )) + .await + .expect("enqueue"); + let claimed = outbox + .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) + .await + 
.expect("claim") + .expect("claim"); + let signed = outbox + .sign_claimed_event(&claimed, &fixture_keys(), 1_100) + .await + .expect("sign"); + outbox.recover_expired_claims(2_001).await.expect("recover"); + let publish_claim = outbox + .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) + .await + .expect("claim") + .expect("publish claim"); + + let adapter = RadrootsMockRelayPublishAdapter::new() + .with_outcome(RELAY_PRIMARY_WSS, RadrootsRelayOutcome::accepted()) + .with_outcome( + RELAY_SECONDARY_WSS, + RadrootsRelayOutcome::duplicate_accepted("duplicate: already have it"), + ) + .with_outcome( + RELAY_TERTIARY_WSS, + RadrootsRelayOutcome::classify("restricted: group write denied"), + ); + let published = publish_claimed_outbox_event( + &outbox, + &store, + &adapter, + &publish_claim, + RadrootsOutboxPublishPolicy::new(2_500).with_accepted_quorum(2), + 2_200, + ) + .await + .expect("publish"); + + assert_eq!(published.publish.quorum, 2); + assert_eq!(published.publish.accepted_count, 2); + assert_eq!(published.publish.terminal_count, 1); + assert!(published.publish.quorum_met); + + let event = outbox + .get_event(receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + assert_eq!(event.state, RadrootsOutboxEventState::Published); + assert_eq!(event.accepted_quorum, 2); + assert!(event.claim_token.is_none()); + let operation = outbox + .get_operation(receipt.operation_id) + .await + .expect("operation") + .expect("operation"); + assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete); + + let statuses = outbox + .relay_statuses(receipt.outbox_event_id) + .await + .expect("statuses"); + assert_eq!( + statuses + .iter() + .find(|status| status.relay_url == RELAY_TERTIARY_WSS) + .expect("tertiary") + .status, + RadrootsOutboxRelayStatus::FailedTerminal + ); + assert!( + outbox + .claim_next_ready_event("publisher", "publish-b", 4_000, 2_300) + .await + .expect("claim") + .is_none() + ); + + let observations = store + 
.observations_for_event(signed.id.as_str()) + .await + .expect("observations"); + assert_eq!(observations.len(), 2); +}