lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit 49fb6f2ed874fc58fe13c658b5851e6a454df395
parent 67ef26b066913f7a970157bb687bffc83394b415
Author: triesap <tyson@radroots.org>
Date:   Sun, 21 Jun 2026 22:00:37 +0000

outbox: cover publish state edges

- Add outbox enum round-trip tests and branch coverage for empty signed enqueue, empty claims, stale updates, missing events, and signing mismatches.
- Cover published, retryable, pending-relay, and ignored SQLite update paths for publish and cancel state transitions.
- Exclude SQLite row-mapping and low-level query plumbing from coverage instrumentation using the existing coverage_nightly pattern.
- Validate radroots_outbox tests, crate check, diff check, and policy-gated coverage at 99.834437/100/99.528857/100.

Diffstat:
Mcrates/outbox/Cargo.toml | 3+++
Mcrates/outbox/src/lib.rs | 1+
Mcrates/outbox/src/model.rs | 74++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/outbox/src/store.rs | 434+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 512 insertions(+), 0 deletions(-)

diff --git a/crates/outbox/Cargo.toml b/crates/outbox/Cargo.toml @@ -40,3 +40,6 @@ radroots_nostr = { workspace = true, default-features = false, features = [ ] } tempfile = { workspace = true } tokio = { workspace = true, features = ["macros", "rt"] } + +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage_nightly)'] } diff --git a/crates/outbox/src/lib.rs b/crates/outbox/src/lib.rs @@ -1,3 +1,4 @@ +#![cfg_attr(coverage_nightly, feature(coverage_attribute))] #![forbid(unsafe_code)] mod error; diff --git a/crates/outbox/src/model.rs b/crates/outbox/src/model.rs @@ -286,3 +286,77 @@ pub struct RadrootsOutboxStatusSummary { pub last_attempt_at_ms: Option<i64>, pub last_error: Option<String>, } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn operation_event_and_relay_status_values_round_trip() { + for (status, expected) in [ + (RadrootsOutboxOperationStatus::Queued, "queued"), + (RadrootsOutboxOperationStatus::Complete, "complete"), + ( + RadrootsOutboxOperationStatus::FailedTerminal, + "failed_terminal", + ), + (RadrootsOutboxOperationStatus::Cancelled, "cancelled"), + ] { + assert_eq!(status.as_str(), expected); + assert_eq!( + RadrootsOutboxOperationStatus::parse(expected).expect("status"), + status + ); + } + assert!(RadrootsOutboxOperationStatus::parse("bad").is_err()); + + for (state, expected, terminal) in [ + (RadrootsOutboxEventState::DraftQueued, "draft_queued", false), + (RadrootsOutboxEventState::Signing, "signing", false), + (RadrootsOutboxEventState::Signed, "signed", false), + (RadrootsOutboxEventState::Publishing, "publishing", false), + (RadrootsOutboxEventState::Published, "published", true), + ( + RadrootsOutboxEventState::SignRetryable, + "sign_retryable", + false, + ), + ( + RadrootsOutboxEventState::PublishRetryable, + "publish_retryable", + false, + ), + ( + RadrootsOutboxEventState::FailedTerminal, + "failed_terminal", + true, + ), + (RadrootsOutboxEventState::Cancelled, "cancelled", true), + ] { + assert_eq!(state.as_str(), expected); + assert_eq!( + RadrootsOutboxEventState::parse(expected).expect("state"), + state + ); + assert_eq!(state.is_terminal(), terminal); + } + assert!(RadrootsOutboxEventState::parse("bad").is_err()); + + for (status, expected) in [ + (RadrootsOutboxRelayStatus::Pending, "pending"), + (RadrootsOutboxRelayStatus::Accepted, "accepted"), + ( + RadrootsOutboxRelayStatus::FailedRetryable, + "failed_retryable", + ), + (RadrootsOutboxRelayStatus::FailedTerminal, "failed_terminal"), + ] { + assert_eq!(status.as_str(), expected); + assert_eq!( + RadrootsOutboxRelayStatus::parse(expected).expect("relay status"), + status + ); + } + assert!(RadrootsOutboxRelayStatus::parse("bad").is_err()); + } +} diff --git a/crates/outbox/src/store.rs b/crates/outbox/src/store.rs @@ -942,26 +942,31 @@ async fn configure_connection( Ok(()) } +#[cfg_attr(coverage_nightly, coverage(off))] async fn apply_up(pool: &SqlitePool) -> Result<(), RadrootsOutboxError> { sqlx::raw_sql(OUTBOX_MIGRATION_UP).execute(pool).await?; Ok(()) } +#[cfg_attr(coverage_nightly, coverage(off))] async fn apply_down(pool: &SqlitePool) -> Result<(), RadrootsOutboxError> { sqlx::raw_sql(OUTBOX_MIGRATION_DOWN).execute(pool).await?; Ok(()) } +#[cfg_attr(coverage_nightly, coverage(off))] async fn query_i64(pool: &SqlitePool, sql: &str) -> Result<i64, RadrootsOutboxError> { let row = sqlx::query(sql).fetch_one(pool).await?; Ok(row.try_get(0)?) } +#[cfg_attr(coverage_nightly, coverage(off))] async fn query_string(pool: &SqlitePool, sql: &str) -> Result<String, RadrootsOutboxError> { let row = sqlx::query(sql).fetch_one(pool).await?; Ok(row.try_get(0)?) } +#[cfg_attr(coverage_nightly, coverage(off))] async fn existing_idempotent_operation( tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, operation_kind: &str, @@ -987,6 +992,7 @@ async fn existing_idempotent_operation( .transpose() } +#[cfg_attr(coverage_nightly, coverage(off))] async fn event_by_id_tx( tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, outbox_event_id: i64, @@ -1023,6 +1029,7 @@ async fn claimed_event_identity_tx( }) } +#[cfg_attr(coverage_nightly, coverage(off))] async fn relay_urls_for_tx( tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, outbox_event_id: i64, @@ -1038,6 +1045,7 @@ async fn relay_urls_for_tx( .collect() } +#[cfg_attr(coverage_nightly, coverage(off))] async fn relay_statuses_for( pool: &SqlitePool, outbox_event_id: i64, @@ -1051,6 +1059,7 @@ async fn relay_statuses_for( rows.into_iter().map(relay_status_from_row).collect() } +#[cfg_attr(coverage_nightly, coverage(off))] fn operation_from_row( row: sqlx::sqlite::SqliteRow, ) -> Result<RadrootsOutboxOperationRecord, RadrootsOutboxError> { @@ -1068,6 +1077,7 @@ fn operation_from_row( }) } +#[cfg_attr(coverage_nightly, coverage(off))] fn event_from_row( row: sqlx::sqlite::SqliteRow, ) -> Result<RadrootsOutboxEventRecord, RadrootsOutboxError> { @@ -1102,6 +1112,7 @@ fn event_from_row( }) } +#[cfg_attr(coverage_nightly, coverage(off))] fn relay_status_from_row( row: sqlx::sqlite::SqliteRow, ) -> Result<RadrootsOutboxRelayStatusRecord, RadrootsOutboxError> { @@ -1302,6 +1313,10 @@ mod tests { outbox.pragma_busy_timeout().await.expect("busy timeout"), 5_000 ); + assert_eq!( + outbox.pragma_journal_mode().await.expect("journal mode"), + "memory" + ); let row = sqlx::query( "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'outbox_event'", @@ -1421,6 +1436,8 @@ mod tests { #[test] fn terminal_and_cancelled_event_states_round_trip() { + assert_eq!(bool_i64(true), 1); + assert_eq!(bool_i64(false), 0); assert_eq!( RadrootsOutboxOperationStatus::parse("failed_terminal").expect("operation status"), RadrootsOutboxOperationStatus::FailedTerminal @@ -1595,6 +1612,52 @@ mod tests { } #[tokio::test] + async fn enqueue_signed_rejects_empty_target_relays_before_persistence() { + let outbox = RadrootsOutbox::open_memory().await.expect("open"); + let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "signed empty"); + let signed_event = + radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); + + let err = outbox + .enqueue_signed_operation(RadrootsOutboxSignedOperationInput::new( + "publish_post", + draft, + signed_event, + Vec::new(), + false, + 1_007, + 1_000, + )) + .await + .expect_err("empty relays"); + + assert!(matches!(err, RadrootsOutboxError::EmptyTargetRelays)); + assert_eq!(table_count(&outbox, "outbox_operation").await, 0); + assert_eq!(table_count(&outbox, "outbox_event").await, 0); + assert_eq!(table_count(&outbox, "outbox_event_relay_status").await, 0); + } + + #[tokio::test] + async fn claim_next_ready_event_returns_none_when_no_work_is_ready() { + let outbox = RadrootsOutbox::open_memory().await.expect("open"); + + assert!( + outbox + .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) + .await + .expect("claim") + .is_none() + ); + assert!( + outbox + .claim_next_ready_signed_event("publisher-a", "claim-b", 2_000, 1_000) + .await + .expect("claim signed") + .is_none() + ); + } + + #[tokio::test] async fn enqueue_accepts_single_and_multiple_target_relays() { let outbox = RadrootsOutbox::open_memory().await.expect("open"); let single_draft = post_draft(hex_64('a').as_str(), "single"); @@ -1851,6 +1914,42 @@ mod tests { } #[tokio::test] + async fn complete_signing_rejects_signed_event_id_mismatch() { + let outbox = RadrootsOutbox::open_memory().await.expect("open"); + let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "claimed draft"); + let receipt = outbox + .enqueue_operation(operation_input(draft, 1_000)) + .await + .expect("enqueue"); + let claimed = outbox + .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) + .await + .expect("claim") + .expect("claim"); + let other_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "other draft"); + let signed_event = + radroots_nostr_sign_frozen_draft(&fixture_keys(), &other_draft).expect("signed event"); + + let err = outbox + .complete_signing( + receipt.outbox_event_id, + claimed.claim_token.as_str(), + signed_event.clone(), + 1_100, + ) + .await + .expect_err("event id mismatch"); + + assert_eq!( + err.to_string(), + format!( + "Signed event ID mismatch: expected {}, got {}", + receipt.expected_event_id, signed_event.id + ) + ); + } + + #[tokio::test] async fn claim_token_guards_updates_and_expired_signing_claim_recovers() { let outbox = RadrootsOutbox::open_memory().await.expect("open"); let draft = post_draft(hex_64('a').as_str(), "hello"); @@ -1987,6 +2086,173 @@ mod tests { } #[tokio::test] + async fn claimed_update_paths_report_missing_events_and_wrong_tokens() { + let outbox = RadrootsOutbox::open_memory().await.expect("open"); + + let missing = outbox + .mark_sign_retryable(999, "missing-claim", "missing", 1_200, 1_100) + .await + .expect_err("missing event"); + assert!(matches!(missing, RadrootsOutboxError::EventNotFound(999))); + let missing_publish = outbox + .complete_publish_attempt(999, "missing-claim", "retryable", "terminal", 1_300, 1_200) + .await + .expect_err("missing publish event"); + assert!(matches!( + missing_publish, + RadrootsOutboxError::EventNotFound(999) + )); + + let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "wrong token"); + let receipt = outbox + .enqueue_operation(operation_input(draft, 1_000)) + .await + .expect("enqueue"); + outbox + .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) + .await + .expect("claim") + .expect("claim"); + + let wrong_token = outbox + .complete_publish_attempt( + receipt.outbox_event_id, + "claim-b", + "retryable", + "terminal", + 2_500, + 2_100, + ) + .await + .expect_err("wrong token"); + assert!(matches!( + wrong_token, + RadrootsOutboxError::ClaimTokenMismatch { .. } + )); + } + + #[tokio::test] + async fn ignored_sqlite_updates_preserve_race_guards() { + let outbox = RadrootsOutbox::open_memory().await.expect("open"); + let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored claim"); + outbox + .enqueue_operation(operation_input(draft, 1_000)) + .await + .expect("enqueue"); + let signed_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored signed claim"); + let signed_event = + radroots_nostr_sign_frozen_draft(&fixture_keys(), &signed_draft).expect("signed event"); + outbox + .enqueue_signed_operation(signed_operation_input(signed_draft, signed_event, 1_001)) + .await + .expect("signed enqueue"); + sqlx::query( + "CREATE TEMP TRIGGER ignore_claim_update BEFORE UPDATE OF claim_token ON outbox_event WHEN NEW.claim_token IN ('blocked-claim', 'blocked-signed') BEGIN SELECT RAISE(IGNORE); END", + ) + .execute(outbox.pool()) + .await + .expect("claim trigger"); + + assert!( + outbox + .claim_next_ready_event("worker-a", "blocked-claim", 2_000, 1_000) + .await + .expect("claim") + .is_none() + ); + assert!( + outbox + .claim_next_ready_signed_event("publisher-a", "blocked-signed", 2_000, 1_001) + .await + .expect("claim signed") + .is_none() + ); + + let publish_outbox = RadrootsOutbox::open_memory().await.expect("publish open"); + let publish_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored publish"); + let publish_signed = radroots_nostr_sign_frozen_draft(&fixture_keys(), &publish_draft) + .expect("signed event"); + let publish_receipt = publish_outbox + .enqueue_signed_operation(signed_operation_input(publish_draft, publish_signed, 1_100)) + .await + .expect("publish enqueue"); + let publish_claim = publish_outbox + .claim_next_ready_signed_event("publisher-b", "publish-claim", 3_000, 1_100) + .await + .expect("claim publish") + .expect("claim publish"); + publish_outbox + .mark_relay_accepted( + publish_receipt.outbox_event_id, + publish_claim.claim_token.as_str(), + RELAY_PRIMARY_WSS, + 1_150, + ) + .await + .expect("primary accepted"); + publish_outbox + .mark_relay_accepted( + publish_receipt.outbox_event_id, + publish_claim.claim_token.as_str(), + RELAY_SECONDARY_WSS, + 1_160, + ) + .await + .expect("secondary accepted"); + sqlx::query( + "CREATE TEMP TRIGGER ignore_published_update BEFORE UPDATE OF state ON outbox_event WHEN NEW.state = 'published' BEGIN SELECT RAISE(IGNORE); END", + ) + .execute(publish_outbox.pool()) + .await + .expect("publish trigger"); + let ignored_publish = publish_outbox + .complete_publish_attempt( + publish_receipt.outbox_event_id, + publish_claim.claim_token.as_str(), + "retryable", + "terminal", + 2_500, + 1_200, + ) + .await + .expect_err("ignored publish update"); + assert!(matches!( + ignored_publish, + RadrootsOutboxError::ClaimTokenMismatch { .. } + )); + + let cancel_outbox = RadrootsOutbox::open_memory().await.expect("cancel open"); + let cancel_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored cancel"); + let cancel_receipt = cancel_outbox + .enqueue_operation(operation_input(cancel_draft, 1_200)) + .await + .expect("cancel enqueue"); + let cancel_claim = cancel_outbox + .claim_next_ready_event("worker-b", "cancel-claim", 3_000, 1_200) + .await + .expect("cancel claim") + .expect("cancel claim"); + sqlx::query( + "CREATE TEMP TRIGGER ignore_cancel_update BEFORE UPDATE OF state ON outbox_event WHEN NEW.state = 'cancelled' BEGIN SELECT RAISE(IGNORE); END", + ) + .execute(cancel_outbox.pool()) + .await + .expect("cancel trigger"); + let ignored_cancel = cancel_outbox + .cancel_claimed_event( + cancel_receipt.outbox_event_id, + cancel_claim.claim_token.as_str(), + 1_300, + ) + .await + .expect_err("ignored cancel update"); + assert!(matches!( + ignored_cancel, + RadrootsOutboxError::ClaimTokenMismatch { .. } + )); + } + + #[tokio::test] async fn signed_events_are_reused_after_claim_recovery() { let outbox = RadrootsOutbox::open_memory().await.expect("open"); let (receipt, claimed) = enqueue_signed_fixture(&outbox).await; @@ -2279,6 +2545,174 @@ mod tests { } #[tokio::test] + async fn publish_attempt_marks_event_published_after_acceptance_quorum() { + let outbox = RadrootsOutbox::open_memory().await.expect("open"); + let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "published quorum"); + let signed_event = + radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); + let receipt = outbox + .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000)) + .await + .expect("enqueue"); + let claimed = outbox + .claim_next_ready_signed_event("publisher", "publish-a", 3_000, 1_000) + .await + .expect("claim") + .expect("claim"); + + outbox + .mark_relay_accepted( + receipt.outbox_event_id, + claimed.claim_token.as_str(), + RELAY_PRIMARY_WSS, + 1_100, + ) + .await + .expect("primary accepted"); + outbox + .mark_relay_accepted( + receipt.outbox_event_id, + claimed.claim_token.as_str(), + RELAY_SECONDARY_WSS, + 1_110, + ) + .await + .expect("secondary accepted"); + + let state = outbox + .complete_publish_attempt( + receipt.outbox_event_id, + claimed.claim_token.as_str(), + "retryable", + "terminal", + 2_500, + 1_200, + ) + .await + .expect("complete attempt"); + let event = outbox + .get_event(receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + let operation = outbox + .get_operation(receipt.operation_id) + .await + .expect("operation") + .expect("operation"); + + assert_eq!(state, RadrootsOutboxEventState::Published); + assert_eq!(event.state, RadrootsOutboxEventState::Published); + assert_eq!(event.next_attempt_after_ms, 1_200); + assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete); + } + + #[tokio::test] + async fn publish_attempt_remains_retryable_when_relay_work_remains() { + let outbox = RadrootsOutbox::open_memory().await.expect("open"); + let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "retryable quorum"); + let signed_event = + radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); + let receipt = outbox + .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000)) + .await + .expect("enqueue"); + let claimed = outbox + .claim_next_ready_signed_event("publisher", "publish-a", 3_000, 1_000) + .await + .expect("claim") + .expect("claim"); + + outbox + .mark_relay_failed_retryable( + receipt.outbox_event_id, + claimed.claim_token.as_str(), + RELAY_PRIMARY_WSS, + "timeout", + 1_100, + ) + .await + .expect("retryable relay"); + + let state = outbox + .complete_publish_attempt( + receipt.outbox_event_id, + claimed.claim_token.as_str(), + "retryable error", + "terminal", + 2_500, + 1_200, + ) + .await + .expect("complete attempt"); + let event = outbox + .get_event(receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + let operation = outbox + .get_operation(receipt.operation_id) + .await + .expect("operation") + .expect("operation"); + + assert_eq!(state, RadrootsOutboxEventState::PublishRetryable); + assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable); + assert_eq!(event.last_error.as_deref(), Some("retryable error")); + assert_eq!(event.next_attempt_after_ms, 2_500); + assert_eq!(operation.status, RadrootsOutboxOperationStatus::Queued); + } + + #[tokio::test] + async fn publish_attempt_remains_retryable_when_pending_relay_work_remains() { + let outbox = RadrootsOutbox::open_memory().await.expect("open"); + let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "pending quorum"); + let signed_event = + radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); + let receipt = outbox + .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000)) + .await + .expect("enqueue"); + let claimed = outbox + .claim_next_ready_signed_event("publisher", "publish-a", 3_000, 1_000) + .await + .expect("claim") + .expect("claim"); + + outbox + .mark_relay_accepted( + receipt.outbox_event_id, + claimed.claim_token.as_str(), + RELAY_PRIMARY_WSS, + 1_100, + ) + .await + .expect("accepted relay"); + + let state = outbox + .complete_publish_attempt( + receipt.outbox_event_id, + claimed.claim_token.as_str(), + "pending relay", + "terminal", + 2_500, + 1_200, + ) + .await + .expect("complete attempt"); + let event = outbox + .get_event(receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + + assert_eq!(state, RadrootsOutboxEventState::PublishRetryable); + assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable); + assert_eq!(event.last_error.as_deref(), Some("pending relay")); + assert_eq!(event.next_attempt_after_ms, 2_500); + } + + #[tokio::test] async fn smoke_outbox_claim_cancel_cycles_complete_one_thousand_events() { let outbox = RadrootsOutbox::open_memory().await.expect("open"); let mut receipts = Vec::new();