commit 1411e9d452f2ed50e2db671de82f59a1cb7da633
parent 3e202261bd7ac8177ace82089b5d961dad7fd9d0
Author: triesap <tyson@radroots.org>
Date: Mon, 15 Jun 2026 14:12:01 -0700
outbox: add signed publish claim
Add a signed-only outbox claim path for publishing workers so unsigned signing work is not claimed by relay publication flows. Cover the claim behavior with an outbox unit test.
Diffstat:
1 file changed, 96 insertions(+), 0 deletions(-)
diff --git a/crates/outbox/src/store.rs b/crates/outbox/src/store.rs
@@ -361,6 +361,57 @@ impl RadrootsOutbox {
}))
}
+ pub async fn claim_next_ready_signed_event(
+ &self,
+ claim_owner: impl AsRef<str>,
+ claim_token: impl AsRef<str>,
+ claim_expires_at_ms: i64,
+ now_ms: i64,
+ ) -> Result<Option<RadrootsOutboxClaimedEvent>, RadrootsOutboxError> {
+ let mut tx = self.pool.begin().await?;
+ let row = sqlx::query(
+ "SELECT outbox_event_id FROM outbox_event WHERE state IN ('signed', 'publish_retryable') AND signed_event_json IS NOT NULL AND next_attempt_after_ms <= ? AND (claim_token IS NULL OR claim_expires_at_ms <= ?) ORDER BY created_at_ms, outbox_event_id LIMIT 1",
+ )
+ .bind(now_ms)
+ .bind(now_ms)
+ .fetch_optional(&mut *tx)
+ .await?;
+ let Some(row) = row else {
+ tx.commit().await?;
+ return Ok(None);
+ };
+ let outbox_event_id: i64 = row.try_get("outbox_event_id")?;
+ let changed = sqlx::query(
+ "UPDATE outbox_event SET state = ?, claim_token = ?, claim_owner = ?, claim_expires_at_ms = ?, attempt_count = attempt_count + 1, updated_at_ms = ? WHERE outbox_event_id = ? AND state IN ('signed', 'publish_retryable') AND signed_event_json IS NOT NULL AND (claim_token IS NULL OR claim_expires_at_ms <= ?)",
+ )
+ .bind(RadrootsOutboxEventState::Publishing.as_str())
+ .bind(claim_token.as_ref())
+ .bind(claim_owner.as_ref())
+ .bind(claim_expires_at_ms)
+ .bind(now_ms)
+ .bind(outbox_event_id)
+ .bind(now_ms)
+ .execute(&mut *tx)
+ .await?;
+ if changed.rows_affected() == 0 {
+ tx.commit().await?;
+ return Ok(None);
+ }
+ let record = event_by_id_tx(&mut tx, outbox_event_id).await?;
+ let target_relays = relay_urls_for_tx(&mut tx, outbox_event_id).await?;
+ tx.commit().await?;
+ Ok(Some(RadrootsOutboxClaimedEvent {
+ outbox_event_id: record.outbox_event_id,
+ operation_id: record.operation_id,
+ expected_event_id: record.event_id,
+ state: RadrootsOutboxEventState::Publishing,
+ claim_token: claim_token.as_ref().to_owned(),
+ draft: record.draft,
+ signed_event: record.signed_event,
+ target_relays,
+ }))
+ }
+
pub async fn complete_signing(
&self,
outbox_event_id: i64,
@@ -1484,6 +1535,51 @@ mod tests {
}
#[tokio::test]
+ async fn claim_next_ready_signed_event_skips_unsigned_work() {
+ let outbox = RadrootsOutbox::open_memory().await.expect("open");
+ let unsigned = outbox
+ .enqueue_operation(operation_input(
+ post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "unsigned"),
+ 900,
+ ))
+ .await
+ .expect("unsigned enqueue");
+ let signed_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "signed-only");
+ let signed_event =
+ radroots_nostr_sign_frozen_draft(&fixture_keys(), &signed_draft).expect("signed event");
+ let signed = outbox
+ .enqueue_signed_operation(signed_operation_input(signed_draft, signed_event, 1_000))
+ .await
+ .expect("signed enqueue");
+
+ let claimed = outbox
+ .claim_next_ready_signed_event("publisher-a", "claim-a", 2_000, 1_000)
+ .await
+ .expect("claim")
+ .expect("claimed");
+ assert_eq!(claimed.outbox_event_id, signed.outbox_event_id);
+ assert_eq!(claimed.state, RadrootsOutboxEventState::Publishing);
+ assert!(claimed.signed_event.is_some());
+
+ let unsigned_event = outbox
+ .get_event(unsigned.outbox_event_id)
+ .await
+ .expect("unsigned event")
+ .expect("unsigned event");
+ assert_eq!(unsigned_event.state, RadrootsOutboxEventState::DraftQueued);
+ assert!(unsigned_event.claim_token.is_none());
+
+ let signing_claim = outbox
+ .claim_next_ready_event("signer-a", "sign-a", 2_100, 1_100)
+ .await
+ .expect("sign claim")
+ .expect("sign claim");
+ assert_eq!(signing_claim.outbox_event_id, unsigned.outbox_event_id);
+ assert_eq!(signing_claim.state, RadrootsOutboxEventState::Signing);
+ assert!(signing_claim.signed_event.is_none());
+ }
+
+ #[tokio::test]
async fn enqueue_signed_operation_idempotency_reuses_existing_signed_record() {
let outbox = RadrootsOutbox::open_memory().await.expect("open");
let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "idem-signed");