commit 44636b68a03f55bc095bbd97b86c77677eab10e8
parent 853eb9704cff329effb47549d70654df2e408404
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 00:50:45 -0700
outbox: reject empty publish relay targets
- add a typed empty-target-relays error before enqueue persistence
- enforce positive publish quorum in the outbox schema
- cover empty, single-relay, and multi-relay enqueue behavior
- validate with cargo fmt, check, and tests for radroots_outbox
Diffstat:
3 files changed, 100 insertions(+), 1 deletion(-)
diff --git a/crates/outbox/migrations/0001_outbox.up.sql b/crates/outbox/migrations/0001_outbox.up.sql
@@ -25,7 +25,7 @@ CREATE TABLE outbox_event (
signed_event_json TEXT,
raw_event_json TEXT,
state TEXT NOT NULL CHECK (state IN ('draft_queued', 'signing', 'signed', 'publishing', 'published', 'sign_retryable', 'publish_retryable', 'failed_terminal', 'cancelled')),
- accepted_quorum INTEGER NOT NULL CHECK (accepted_quorum >= 0),
+ accepted_quorum INTEGER NOT NULL CHECK (accepted_quorum > 0),
attempt_count INTEGER NOT NULL,
claim_token TEXT,
claim_owner TEXT,
diff --git a/crates/outbox/src/error.rs b/crates/outbox/src/error.rs
@@ -16,6 +16,9 @@ pub enum RadrootsOutboxError {
#[error("Event store error: {0}")]
EventStore(#[from] radroots_event_store::RadrootsEventStoreError),
+ #[error("target relays cannot be empty")]
+ EmptyTargetRelays,
+
#[error("Invalid stored enum for {field}: {value}")]
InvalidStoredEnum { field: &'static str, value: String },
diff --git a/crates/outbox/src/store.rs b/crates/outbox/src/store.rs
@@ -74,6 +74,9 @@ impl RadrootsOutbox {
input: RadrootsOutboxOperationInput,
) -> Result<RadrootsOutboxEnqueueReceipt, RadrootsOutboxError> {
let target_relays = canonical_relays(input.target_relays);
+ if target_relays.is_empty() {
+ return Err(RadrootsOutboxError::EmptyTargetRelays);
+ }
let digest = idempotency_digest(
input.operation_kind.as_str(),
input.draft.expected_pubkey.as_str(),
@@ -1052,6 +1055,14 @@ mod tests {
(receipt, claimed)
}
+ async fn table_count(outbox: &RadrootsOutbox, table_name: &str) -> i64 {
+ let sql = format!("SELECT COUNT(*) FROM {table_name}");
+ sqlx::query_scalar(sql.as_str())
+ .fetch_one(outbox.pool())
+ .await
+ .expect("table count")
+ }
+
#[tokio::test]
async fn migration_applies_pragmas_and_migrates_down() {
let outbox = RadrootsOutbox::open_memory().await.expect("open");
@@ -1190,6 +1201,91 @@ mod tests {
}
#[tokio::test]
+ async fn enqueue_rejects_empty_target_relays_before_persistence() {
+ let outbox = RadrootsOutbox::open_memory().await.expect("open");
+ let draft = post_draft(hex_64('a').as_str(), "hello");
+
+ let err = outbox
+ .enqueue_operation(
+ RadrootsOutboxOperationInput::new("publish_post", draft, Vec::new(), 1_000)
+ .with_idempotency_key("empty-relays"),
+ )
+ .await
+ .expect_err("empty relays");
+
+ assert!(matches!(err, RadrootsOutboxError::EmptyTargetRelays));
+ assert_eq!(table_count(&outbox, "outbox_operation").await, 0);
+ assert_eq!(table_count(&outbox, "outbox_event").await, 0);
+ assert_eq!(table_count(&outbox, "outbox_event_relay_status").await, 0);
+ }
+
+ #[tokio::test]
+ async fn enqueue_accepts_single_and_multiple_target_relays() {
+ let outbox = RadrootsOutbox::open_memory().await.expect("open");
+ let single_draft = post_draft(hex_64('a').as_str(), "single");
+
+ let single = outbox
+ .enqueue_operation(RadrootsOutboxOperationInput::new(
+ "publish_post",
+ single_draft,
+ vec![RELAY_PRIMARY_WSS.to_owned()],
+ 1_000,
+ ))
+ .await
+ .expect("single relay");
+ let single_event = outbox
+ .get_event(single.outbox_event_id)
+ .await
+ .expect("single event")
+ .expect("single event");
+ assert_eq!(single_event.accepted_quorum, 1);
+ assert_eq!(
+ outbox
+ .relay_statuses(single.outbox_event_id)
+ .await
+ .expect("single relay statuses")
+ .len(),
+ 1
+ );
+
+ let multi_draft = post_draft(hex_64('b').as_str(), "multi");
+ let multi = outbox
+ .enqueue_operation(RadrootsOutboxOperationInput::new(
+ "publish_post",
+ multi_draft,
+ vec![
+ RELAY_PRIMARY_WSS.to_owned(),
+ RELAY_SECONDARY_WSS.to_owned(),
+ RELAY_PRIMARY_WSS.to_owned(),
+ ],
+ 1_100,
+ ))
+ .await
+ .expect("multiple relays");
+ let multi_event = outbox
+ .get_event(multi.outbox_event_id)
+ .await
+ .expect("multi event")
+ .expect("multi event");
+ assert_eq!(multi_event.accepted_quorum, 2);
+ assert_eq!(
+ outbox
+ .relay_statuses(multi.outbox_event_id)
+ .await
+ .expect("multi relay statuses")
+ .len(),
+ 2
+ );
+
+ let zero_quorum_count: i64 =
+ sqlx::query_scalar("SELECT COUNT(*) FROM outbox_event WHERE accepted_quorum = 0")
+ .fetch_one(outbox.pool())
+ .await
+ .expect("zero quorum count");
+ assert_eq!(zero_quorum_count, 0);
+ }
+
+ #[tokio::test]
async fn claim_token_guards_updates_and_expired_signing_claim_recovers() {
let outbox = RadrootsOutbox::open_memory().await.expect("open");
let draft = post_draft(hex_64('a').as_str(), "hello");