store.rs (109822B)
1 #![forbid(unsafe_code)] 2 3 use crate::RadrootsOutboxError; 4 use crate::migrations::{OUTBOX_MIGRATION_DOWN, OUTBOX_MIGRATION_UP}; 5 use crate::model::{ 6 RadrootsOutboxClaimedEvent, RadrootsOutboxEnqueueReceipt, RadrootsOutboxEnqueueStatus, 7 RadrootsOutboxEventRecord, RadrootsOutboxEventState, RadrootsOutboxEventStoreIngestReceipt, 8 RadrootsOutboxOperationInput, RadrootsOutboxOperationRecord, RadrootsOutboxOperationStatus, 9 RadrootsOutboxRelayStatus, RadrootsOutboxRelayStatusRecord, RadrootsOutboxSignedOperationInput, 10 RadrootsOutboxStatusSummary, 11 }; 12 use radroots_event_store::{RadrootsEventIngest, RadrootsEventStore}; 13 use radroots_events::RadrootsNostrEvent; 14 use radroots_events::draft::{ 15 RadrootsFrozenEventDraft, RadrootsSignedNostrEvent, validate_signed_nostr_event_matches_draft, 16 }; 17 use serde::Serialize; 18 use sha2::{Digest, Sha256}; 19 use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteQueryResult}; 20 use sqlx::{Row, SqlitePool}; 21 use std::path::Path; 22 use std::str::FromStr; 23 24 #[derive(Clone)] 25 pub struct RadrootsOutbox { 26 pool: SqlitePool, 27 } 28 29 impl RadrootsOutbox { 30 pub async fn open_memory() -> Result<Self, RadrootsOutboxError> { 31 let options = SqliteConnectOptions::from_str("sqlite::memory:")?; 32 let pool = SqlitePoolOptions::new() 33 .max_connections(1) 34 .connect_with(options) 35 .await?; 36 configure_connection(&pool, false).await?; 37 apply_up(&pool).await?; 38 Ok(Self { pool }) 39 } 40 41 pub async fn open_file(path: impl AsRef<Path>) -> Result<Self, RadrootsOutboxError> { 42 let options = SqliteConnectOptions::new() 43 .filename(path) 44 .create_if_missing(true); 45 let pool = SqlitePoolOptions::new() 46 .max_connections(1) 47 .connect_with(options) 48 .await?; 49 configure_connection(&pool, true).await?; 50 apply_up(&pool).await?; 51 Ok(Self { pool }) 52 } 53 54 pub fn pool(&self) -> &SqlitePool { 55 &self.pool 56 } 57 58 pub async fn migrate_down(&self) -> Result<(), RadrootsOutboxError> { 59 apply_down(&self.pool).await 60 } 61 62 pub async fn pragma_foreign_keys(&self) -> Result<i64, RadrootsOutboxError> { 63 query_i64(&self.pool, "PRAGMA foreign_keys").await 64 } 65 66 pub async fn pragma_busy_timeout(&self) -> Result<i64, RadrootsOutboxError> { 67 query_i64(&self.pool, "PRAGMA busy_timeout").await 68 } 69 70 pub async fn pragma_journal_mode(&self) -> Result<String, RadrootsOutboxError> { 71 query_string(&self.pool, "PRAGMA journal_mode").await 72 } 73 74 pub async fn status_summary( 75 &self, 76 now_ms: i64, 77 ) -> Result<RadrootsOutboxStatusSummary, RadrootsOutboxError> { 78 let row = sqlx::query( 79 "SELECT COUNT(*) AS total_events, COALESCE(SUM(CASE WHEN state IN ('draft_queued', 'signing', 'signed', 'publishing') THEN 1 ELSE 0 END), 0) AS pending_events, COALESCE(SUM(CASE WHEN state IN ('sign_retryable', 'publish_retryable') THEN 1 ELSE 0 END), 0) AS retryable_events, COALESCE(SUM(CASE WHEN state IN ('published', 'failed_terminal', 'cancelled') THEN 1 ELSE 0 END), 0) AS terminal_events, COALESCE(SUM(CASE WHEN state = 'failed_terminal' THEN 1 ELSE 0 END), 0) AS failed_terminal_events, COALESCE(SUM(CASE WHEN state = 'publishing' THEN 1 ELSE 0 END), 0) AS publishing_events FROM outbox_event", 80 ) 81 .fetch_one(&self.pool) 82 .await?; 83 let ready_signed_events = sqlx::query( 84 "SELECT COUNT(*) FROM outbox_event WHERE state IN ('signed', 'publish_retryable') AND signed_event_json IS NOT NULL AND next_attempt_after_ms <= ? AND (claim_token IS NULL OR claim_expires_at_ms <= ?)", 85 ) 86 .bind(now_ms) 87 .bind(now_ms) 88 .fetch_one(&self.pool) 89 .await? 90 .try_get(0)?; 91 let last_attempt_at_ms = 92 sqlx::query("SELECT MAX(last_attempt_at_ms) FROM outbox_event_relay_status") 93 .fetch_one(&self.pool) 94 .await? 95 .try_get(0)?; 96 let last_error = sqlx::query( 97 "SELECT last_error FROM outbox_event WHERE last_error IS NOT NULL ORDER BY updated_at_ms DESC, outbox_event_id DESC LIMIT 1", 98 ) 99 .fetch_optional(&self.pool) 100 .await? 101 .map(|row| row.try_get("last_error")) 102 .transpose()?; 103 Ok(RadrootsOutboxStatusSummary { 104 total_events: row.try_get("total_events")?, 105 pending_events: row.try_get("pending_events")?, 106 retryable_events: row.try_get("retryable_events")?, 107 terminal_events: row.try_get("terminal_events")?, 108 failed_terminal_events: row.try_get("failed_terminal_events")?, 109 ready_signed_events, 110 publishing_events: row.try_get("publishing_events")?, 111 last_attempt_at_ms, 112 last_error, 113 }) 114 } 115 116 pub async fn enqueue_operation( 117 &self, 118 input: RadrootsOutboxOperationInput, 119 ) -> Result<RadrootsOutboxEnqueueReceipt, RadrootsOutboxError> { 120 let target_relays = ordered_unique_relays(input.target_relays); 121 if target_relays.is_empty() && !input.allow_empty_target_relays { 122 return Err(RadrootsOutboxError::EmptyTargetRelays); 123 } 124 let digest_relays = digest_relays(target_relays.as_slice()); 125 let digest = idempotency_digest( 126 input.operation_kind.as_str(), 127 input.draft.expected_pubkey.as_str(), 128 &input.draft, 129 &digest_relays, 130 )?; 131 let accepted_quorum = target_relays.len() as i64; 132 let mut tx = self.pool.begin().await?; 133 134 if let Some(idempotency_key) = input.idempotency_key.as_deref() 135 && let Some(existing) = existing_idempotent_operation( 136 &mut tx, 137 input.operation_kind.as_str(), 138 input.draft.expected_pubkey.as_str(), 139 idempotency_key, 140 ) 141 .await? 142 { 143 if existing.idempotency_digest != digest { 144 return Err(RadrootsOutboxError::IdempotencyConflict { 145 operation_kind: input.operation_kind, 146 expected_pubkey: input.draft.expected_pubkey, 147 idempotency_key: idempotency_key.to_owned(), 148 existing_digest: existing.idempotency_digest, 149 new_digest: digest, 150 }); 151 } 152 tx.commit().await?; 153 return Ok(RadrootsOutboxEnqueueReceipt { 154 status: RadrootsOutboxEnqueueStatus::Existing, 155 operation_id: existing.operation_id, 156 outbox_event_id: existing.outbox_event_id, 157 expected_event_id: existing.event_id, 158 idempotency_digest: digest, 159 }); 160 } 161 162 let operation = sqlx::query( 163 "INSERT INTO outbox_operation(operation_kind, expected_pubkey, idempotency_key, idempotency_digest, status, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, ?)", 164 ) 165 .bind(input.operation_kind.as_str()) 166 .bind(input.draft.expected_pubkey.as_str()) 167 .bind(input.idempotency_key.as_deref()) 168 .bind(digest.as_str()) 169 .bind(RadrootsOutboxOperationStatus::Queued.as_str()) 170 .bind(input.created_at_ms) 171 .bind(input.created_at_ms) 172 .execute(&mut *tx) 173 .await?; 174 let operation_id = operation.last_insert_rowid(); 175 let draft_json = serde_json::to_string(&input.draft)?; 176 let event = sqlx::query( 177 "INSERT INTO outbox_event(operation_id, event_id, expected_pubkey, draft_json, state, accepted_quorum, attempt_count, next_attempt_after_ms, event_store_ingested, event_store_inserted, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, 0, ?, 0, 0, ?, ?)", 178 ) 179 .bind(operation_id) 180 .bind(input.draft.expected_event_id.as_str()) 181 .bind(input.draft.expected_pubkey.as_str()) 182 .bind(draft_json.as_str()) 183 .bind(RadrootsOutboxEventState::DraftQueued.as_str()) 184 .bind(accepted_quorum) 185 .bind(input.created_at_ms) 186 .bind(input.created_at_ms) 187 .bind(input.created_at_ms) 188 .execute(&mut *tx) 189 .await?; 190 let outbox_event_id = event.last_insert_rowid(); 191 192 for relay_url in target_relays { 193 sqlx::query( 194 "INSERT INTO outbox_event_relay_status(outbox_event_id, relay_url, status, attempt_count) VALUES (?, ?, ?, 0)", 195 ) 196 .bind(outbox_event_id) 197 .bind(relay_url.as_str()) 198 .bind(RadrootsOutboxRelayStatus::Pending.as_str()) 199 .execute(&mut *tx) 200 .await?; 201 } 202 203 tx.commit().await?; 204 Ok(RadrootsOutboxEnqueueReceipt { 205 status: RadrootsOutboxEnqueueStatus::Inserted, 206 operation_id, 207 outbox_event_id, 208 expected_event_id: input.draft.expected_event_id, 209 idempotency_digest: digest, 210 }) 211 } 212 213 pub async fn enqueue_signed_operation( 214 &self, 215 input: RadrootsOutboxSignedOperationInput, 216 ) -> Result<RadrootsOutboxEnqueueReceipt, RadrootsOutboxError> { 217 validate_signed_nostr_event_matches_draft(&input.signed_event, &input.draft)?; 218 let target_relays = ordered_unique_relays(input.target_relays); 219 if target_relays.is_empty() && !input.allow_empty_target_relays { 220 return Err(RadrootsOutboxError::EmptyTargetRelays); 221 } 222 let digest_relays = digest_relays(target_relays.as_slice()); 223 let digest = idempotency_digest( 224 input.operation_kind.as_str(), 225 input.draft.expected_pubkey.as_str(), 226 &input.draft, 227 &digest_relays, 228 )?; 229 let accepted_quorum = target_relays.len() as i64; 230 let mut tx = self.pool.begin().await?; 231 232 if let Some(idempotency_key) = input.idempotency_key.as_deref() 233 && let Some(existing) = existing_idempotent_operation( 234 &mut tx, 235 input.operation_kind.as_str(), 236 input.draft.expected_pubkey.as_str(), 237 idempotency_key, 238 ) 239 .await? 240 { 241 if existing.idempotency_digest != digest { 242 return Err(RadrootsOutboxError::IdempotencyConflict { 243 operation_kind: input.operation_kind, 244 expected_pubkey: input.draft.expected_pubkey, 245 idempotency_key: idempotency_key.to_owned(), 246 existing_digest: existing.idempotency_digest, 247 new_digest: digest, 248 }); 249 } 250 tx.commit().await?; 251 return Ok(RadrootsOutboxEnqueueReceipt { 252 status: RadrootsOutboxEnqueueStatus::Existing, 253 operation_id: existing.operation_id, 254 outbox_event_id: existing.outbox_event_id, 255 expected_event_id: existing.event_id, 256 idempotency_digest: digest, 257 }); 258 } 259 260 let operation = sqlx::query( 261 "INSERT INTO outbox_operation(operation_kind, expected_pubkey, idempotency_key, idempotency_digest, status, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, ?)", 262 ) 263 .bind(input.operation_kind.as_str()) 264 .bind(input.draft.expected_pubkey.as_str()) 265 .bind(input.idempotency_key.as_deref()) 266 .bind(digest.as_str()) 267 .bind(RadrootsOutboxOperationStatus::Queued.as_str()) 268 .bind(input.created_at_ms) 269 .bind(input.created_at_ms) 270 .execute(&mut *tx) 271 .await?; 272 let operation_id = operation.last_insert_rowid(); 273 let draft_json = serde_json::to_string(&input.draft)?; 274 let signed_event_json = serde_json::to_string(&input.signed_event)?; 275 let event = sqlx::query( 276 "INSERT INTO outbox_event(operation_id, event_id, expected_pubkey, draft_json, signed_event_json, raw_event_json, state, accepted_quorum, attempt_count, next_attempt_after_ms, event_store_ingested, event_store_inserted, event_store_ingested_at_ms, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0, ?, 1, ?, ?, ?, ?)", 277 ) 278 .bind(operation_id) 279 .bind(input.draft.expected_event_id.as_str()) 280 .bind(input.draft.expected_pubkey.as_str()) 281 .bind(draft_json.as_str()) 282 .bind(signed_event_json.as_str()) 283 .bind(input.signed_event.raw_json.as_str()) 284 .bind(RadrootsOutboxEventState::Signed.as_str()) 285 .bind(accepted_quorum) 286 .bind(input.created_at_ms) 287 .bind(bool_i64(input.event_store_inserted)) 288 .bind(input.event_store_ingested_at_ms) 289 .bind(input.created_at_ms) 290 .bind(input.created_at_ms) 291 .execute(&mut *tx) 292 .await?; 293 let outbox_event_id = event.last_insert_rowid(); 294 295 for relay_url in target_relays { 296 sqlx::query( 297 "INSERT INTO outbox_event_relay_status(outbox_event_id, relay_url, status, attempt_count) VALUES (?, ?, ?, 0)", 298 ) 299 .bind(outbox_event_id) 300 .bind(relay_url.as_str()) 301 .bind(RadrootsOutboxRelayStatus::Pending.as_str()) 302 .execute(&mut *tx) 303 .await?; 304 } 305 306 tx.commit().await?; 307 Ok(RadrootsOutboxEnqueueReceipt { 308 status: RadrootsOutboxEnqueueStatus::Inserted, 309 operation_id, 310 outbox_event_id, 311 expected_event_id: input.draft.expected_event_id, 312 idempotency_digest: digest, 313 }) 314 } 315 316 pub async fn get_operation( 317 &self, 318 operation_id: i64, 319 ) -> Result<Option<RadrootsOutboxOperationRecord>, RadrootsOutboxError> { 320 let row = sqlx::query( 321 "SELECT operation_id, operation_kind, expected_pubkey, idempotency_key, idempotency_digest, status, created_at_ms, updated_at_ms FROM outbox_operation WHERE operation_id = ?", 322 ) 323 .bind(operation_id) 324 .fetch_optional(&self.pool) 325 .await?; 326 row.map(operation_from_row).transpose() 327 } 328 329 pub async fn get_event( 330 &self, 331 outbox_event_id: i64, 332 ) -> Result<Option<RadrootsOutboxEventRecord>, RadrootsOutboxError> { 333 let row = sqlx::query( 334 "SELECT outbox_event_id, operation_id, event_id, expected_pubkey, draft_json, signed_event_json, raw_event_json, state, accepted_quorum, attempt_count, claim_token, claim_owner, claim_expires_at_ms, next_attempt_after_ms, last_error, event_store_ingested, event_store_inserted, event_store_ingested_at_ms, created_at_ms, updated_at_ms FROM outbox_event WHERE outbox_event_id = ?", 335 ) 336 .bind(outbox_event_id) 337 .fetch_optional(&self.pool) 338 .await?; 339 row.map(event_from_row).transpose() 340 } 341 342 pub async fn relay_statuses( 343 &self, 344 outbox_event_id: i64, 345 ) -> Result<Vec<RadrootsOutboxRelayStatusRecord>, RadrootsOutboxError> { 346 relay_statuses_for(&self.pool, outbox_event_id).await 347 } 348 349 pub async fn claim_next_ready_event( 350 &self, 351 claim_owner: impl AsRef<str>, 352 claim_token: impl AsRef<str>, 353 claim_expires_at_ms: i64, 354 now_ms: i64, 355 ) -> Result<Option<RadrootsOutboxClaimedEvent>, RadrootsOutboxError> { 356 let mut tx = self.pool.begin().await?; 357 let row = sqlx::query( 358 "SELECT outbox_event_id, state, signed_event_json FROM outbox_event WHERE state IN ('draft_queued', 'sign_retryable', 'signed', 'publish_retryable') AND next_attempt_after_ms <= ? AND (claim_token IS NULL OR claim_expires_at_ms <= ?) ORDER BY created_at_ms, outbox_event_id LIMIT 1", 359 ) 360 .bind(now_ms) 361 .bind(now_ms) 362 .fetch_optional(&mut *tx) 363 .await?; 364 let Some(row) = row else { 365 tx.commit().await?; 366 return Ok(None); 367 }; 368 let outbox_event_id: i64 = row.try_get("outbox_event_id")?; 369 let state = RadrootsOutboxEventState::parse(row.try_get::<String, _>("state")?.as_str())?; 370 let signed_event_json: Option<String> = row.try_get("signed_event_json")?; 371 let claimed_state = match (state, signed_event_json.as_ref()) { 372 ( 373 RadrootsOutboxEventState::DraftQueued | RadrootsOutboxEventState::SignRetryable, 374 None, 375 ) => RadrootsOutboxEventState::Signing, 376 _ => RadrootsOutboxEventState::Publishing, 377 }; 378 let changed = sqlx::query( 379 "UPDATE outbox_event SET state = ?, claim_token = ?, claim_owner = ?, claim_expires_at_ms = ?, attempt_count = attempt_count + 1, updated_at_ms = ? WHERE outbox_event_id = ? AND (claim_token IS NULL OR claim_expires_at_ms <= ?)", 380 ) 381 .bind(claimed_state.as_str()) 382 .bind(claim_token.as_ref()) 383 .bind(claim_owner.as_ref()) 384 .bind(claim_expires_at_ms) 385 .bind(now_ms) 386 .bind(outbox_event_id) 387 .bind(now_ms) 388 .execute(&mut *tx) 389 .await?; 390 if changed.rows_affected() == 0 { 391 tx.commit().await?; 392 return Ok(None); 393 } 394 let record = event_by_id_tx(&mut tx, outbox_event_id).await?; 395 let target_relays = relay_urls_for_tx(&mut tx, outbox_event_id).await?; 396 tx.commit().await?; 397 Ok(Some(RadrootsOutboxClaimedEvent { 398 outbox_event_id: record.outbox_event_id, 399 operation_id: record.operation_id, 400 expected_event_id: record.event_id, 401 attempt_count: record.attempt_count, 402 state: claimed_state, 403 claim_token: claim_token.as_ref().to_owned(), 404 draft: record.draft, 405 signed_event: record.signed_event, 406 target_relays, 407 })) 408 } 409 410 pub async fn claim_next_ready_signed_event( 411 &self, 412 claim_owner: impl AsRef<str>, 413 claim_token: impl AsRef<str>, 414 claim_expires_at_ms: i64, 415 now_ms: i64, 416 ) -> Result<Option<RadrootsOutboxClaimedEvent>, RadrootsOutboxError> { 417 let mut tx = self.pool.begin().await?; 418 let row = sqlx::query( 419 "SELECT outbox_event_id FROM outbox_event WHERE state IN ('signed', 'publish_retryable') AND signed_event_json IS NOT NULL AND next_attempt_after_ms <= ? AND (claim_token IS NULL OR claim_expires_at_ms <= ?) ORDER BY created_at_ms, outbox_event_id LIMIT 1", 420 ) 421 .bind(now_ms) 422 .bind(now_ms) 423 .fetch_optional(&mut *tx) 424 .await?; 425 let Some(row) = row else { 426 tx.commit().await?; 427 return Ok(None); 428 }; 429 let outbox_event_id: i64 = row.try_get("outbox_event_id")?; 430 let changed = sqlx::query( 431 "UPDATE outbox_event SET state = ?, claim_token = ?, claim_owner = ?, claim_expires_at_ms = ?, attempt_count = attempt_count + 1, updated_at_ms = ? WHERE outbox_event_id = ? AND state IN ('signed', 'publish_retryable') AND signed_event_json IS NOT NULL AND (claim_token IS NULL OR claim_expires_at_ms <= ?)", 432 ) 433 .bind(RadrootsOutboxEventState::Publishing.as_str()) 434 .bind(claim_token.as_ref()) 435 .bind(claim_owner.as_ref()) 436 .bind(claim_expires_at_ms) 437 .bind(now_ms) 438 .bind(outbox_event_id) 439 .bind(now_ms) 440 .execute(&mut *tx) 441 .await?; 442 if changed.rows_affected() == 0 { 443 tx.commit().await?; 444 return Ok(None); 445 } 446 let record = event_by_id_tx(&mut tx, outbox_event_id).await?; 447 let target_relays = relay_urls_for_tx(&mut tx, outbox_event_id).await?; 448 tx.commit().await?; 449 Ok(Some(RadrootsOutboxClaimedEvent { 450 outbox_event_id: record.outbox_event_id, 451 operation_id: record.operation_id, 452 expected_event_id: record.event_id, 453 attempt_count: record.attempt_count, 454 state: RadrootsOutboxEventState::Publishing, 455 claim_token: claim_token.as_ref().to_owned(), 456 draft: record.draft, 457 signed_event: record.signed_event, 458 target_relays, 459 })) 460 } 461 462 pub async fn complete_signing( 463 &self, 464 outbox_event_id: i64, 465 claim_token: &str, 466 signed_event: RadrootsSignedNostrEvent, 467 now_ms: i64, 468 ) -> Result<RadrootsSignedNostrEvent, RadrootsOutboxError> { 469 let record = self.claimed_event(outbox_event_id, claim_token).await?; 470 if signed_event.id != record.event_id { 471 return Err(RadrootsOutboxError::SignedEventIdMismatch { 472 expected_event_id: record.event_id, 473 actual_event_id: signed_event.id, 474 }); 475 } 476 let signed_event_json = serde_json::to_string(&signed_event)?; 477 let changed = sqlx::query( 478 "UPDATE outbox_event SET signed_event_json = ?, raw_event_json = ?, state = ?, last_error = NULL, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", 479 ) 480 .bind(signed_event_json.as_str()) 481 .bind(signed_event.raw_json.as_str()) 482 .bind(RadrootsOutboxEventState::Signed.as_str()) 483 .bind(now_ms) 484 .bind(outbox_event_id) 485 .bind(claim_token) 486 .execute(&self.pool) 487 .await?; 488 self.ensure_claimed_update(outbox_event_id, claim_token, changed) 489 .await?; 490 Ok(signed_event) 491 } 492 493 pub async fn mark_sign_retryable( 494 &self, 495 outbox_event_id: i64, 496 claim_token: &str, 497 error: impl AsRef<str>, 498 next_attempt_after_ms: i64, 499 now_ms: i64, 500 ) -> Result<(), RadrootsOutboxError> { 501 let changed = sqlx::query( 502 "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", 503 ) 504 .bind(RadrootsOutboxEventState::SignRetryable.as_str()) 505 .bind(error.as_ref()) 506 .bind(next_attempt_after_ms) 507 .bind(now_ms) 508 .bind(outbox_event_id) 509 .bind(claim_token) 510 .execute(&self.pool) 511 .await?; 512 self.ensure_claimed_update(outbox_event_id, claim_token, changed) 513 .await?; 514 Ok(()) 515 } 516 517 pub async fn mark_publish_retryable( 518 &self, 519 outbox_event_id: i64, 520 claim_token: &str, 521 error: impl AsRef<str>, 522 next_attempt_after_ms: i64, 523 now_ms: i64, 524 ) -> Result<(), RadrootsOutboxError> { 525 let changed = sqlx::query( 526 "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", 527 ) 528 .bind(RadrootsOutboxEventState::PublishRetryable.as_str()) 529 .bind(error.as_ref()) 530 .bind(next_attempt_after_ms) 531 .bind(now_ms) 532 .bind(outbox_event_id) 533 .bind(claim_token) 534 .execute(&self.pool) 535 .await?; 536 self.ensure_claimed_update(outbox_event_id, claim_token, changed) 537 .await?; 538 Ok(()) 539 } 540 541 pub async fn recover_expired_claims(&self, now_ms: i64) -> Result<u64, RadrootsOutboxError> { 542 let changed = sqlx::query( 543 "UPDATE outbox_event SET state = CASE WHEN state = 'signing' AND signed_event_json IS NULL THEN 'sign_retryable' WHEN state = 'signing' AND signed_event_json IS NOT NULL THEN 'signed' WHEN state = 'publishing' THEN 'publish_retryable' ELSE state END, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, updated_at_ms = ? WHERE claim_token IS NOT NULL AND claim_expires_at_ms <= ? AND state IN ('signing', 'signed', 'publishing')", 544 ) 545 .bind(now_ms) 546 .bind(now_ms) 547 .execute(&self.pool) 548 .await?; 549 Ok(changed.rows_affected()) 550 } 551 552 pub async fn ingest_signed_event_local( 553 &self, 554 event_store: &RadrootsEventStore, 555 outbox_event_id: i64, 556 claim_token: &str, 557 observed_at_ms: i64, 558 ) -> Result<RadrootsOutboxEventStoreIngestReceipt, RadrootsOutboxError> { 559 let record = self.claimed_event(outbox_event_id, claim_token).await?; 560 if record.event_store_ingested { 561 return Ok(RadrootsOutboxEventStoreIngestReceipt { 562 outbox_event_id, 563 event_id: record.event_id, 564 already_ingested: true, 565 event_store_inserted: false, 566 }); 567 } 568 let signed_event = record 569 .signed_event 570 .ok_or(RadrootsOutboxError::MissingSignedEvent(outbox_event_id))?; 571 let event = event_from_signed(&signed_event); 572 let ingest = RadrootsEventIngest::new(event, observed_at_ms) 573 .with_raw_json(signed_event.raw_json.clone()); 574 let receipt = event_store.ingest_event(ingest).await?; 575 let changed = sqlx::query( 576 "UPDATE outbox_event SET event_store_ingested = 1, event_store_inserted = ?, event_store_ingested_at_ms = ?, state = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", 577 ) 578 .bind(bool_i64(receipt.inserted)) 579 .bind(observed_at_ms) 580 .bind(RadrootsOutboxEventState::Publishing.as_str()) 581 .bind(observed_at_ms) 582 .bind(outbox_event_id) 583 .bind(claim_token) 584 .execute(&self.pool) 585 .await?; 586 self.ensure_claimed_update(outbox_event_id, claim_token, changed) 587 .await?; 588 Ok(RadrootsOutboxEventStoreIngestReceipt { 589 outbox_event_id, 590 event_id: receipt.event_id, 591 already_ingested: false, 592 event_store_inserted: receipt.inserted, 593 }) 594 } 595 596 pub async fn mark_relay_accepted( 597 &self, 598 outbox_event_id: i64, 599 claim_token: &str, 600 relay_url: &str, 601 acknowledged_at_ms: i64, 602 ) -> Result<(), RadrootsOutboxError> { 603 let changed = sqlx::query( 604 "UPDATE outbox_event_relay_status SET status = ?, attempt_count = attempt_count + 1, last_attempt_at_ms = ?, acknowledged_at_ms = ?, last_error = NULL WHERE outbox_event_id = ? AND relay_url = ? AND EXISTS (SELECT 1 FROM outbox_event WHERE outbox_event_id = ? AND claim_token = ?)", 605 ) 606 .bind(RadrootsOutboxRelayStatus::Accepted.as_str()) 607 .bind(acknowledged_at_ms) 608 .bind(acknowledged_at_ms) 609 .bind(outbox_event_id) 610 .bind(relay_url) 611 .bind(outbox_event_id) 612 .bind(claim_token) 613 .execute(&self.pool) 614 .await?; 615 self.ensure_claimed_update(outbox_event_id, claim_token, changed) 616 .await?; 617 Ok(()) 618 } 619 620 pub async fn set_publish_quorum( 621 &self, 622 outbox_event_id: i64, 623 claim_token: &str, 624 accepted_quorum: i64, 625 now_ms: i64, 626 ) -> Result<(), RadrootsOutboxError> { 627 let changed = sqlx::query( 628 "UPDATE outbox_event SET accepted_quorum = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", 629 ) 630 .bind(accepted_quorum) 631 .bind(now_ms) 632 .bind(outbox_event_id) 633 .bind(claim_token) 634 .execute(&self.pool) 635 .await?; 636 self.ensure_claimed_update(outbox_event_id, claim_token, changed) 637 .await?; 638 Ok(()) 639 } 640 641 pub async fn mark_relay_failed_retryable( 642 &self, 643 outbox_event_id: i64, 644 claim_token: &str, 645 relay_url: &str, 646 error: &str, 647 attempted_at_ms: i64, 648 ) -> Result<(), RadrootsOutboxError> { 649 self.mark_relay_failed( 650 outbox_event_id, 651 claim_token, 652 relay_url, 653 RadrootsOutboxRelayStatus::FailedRetryable, 654 error, 655 attempted_at_ms, 656 ) 657 .await 658 } 659 660 pub async fn mark_relay_failed_terminal( 661 &self, 662 outbox_event_id: i64, 663 claim_token: &str, 664 relay_url: &str, 665 error: &str, 666 attempted_at_ms: i64, 667 ) -> Result<(), RadrootsOutboxError> { 668 self.mark_relay_failed( 669 outbox_event_id, 670 claim_token, 671 relay_url, 672 RadrootsOutboxRelayStatus::FailedTerminal, 673 error, 674 attempted_at_ms, 675 ) 676 .await 677 } 678 679 pub async fn complete_publish_attempt( 680 &self, 681 outbox_event_id: i64, 682 claim_token: &str, 683 retryable_error: impl AsRef<str>, 684 terminal_error: impl AsRef<str>, 685 next_attempt_after_ms: i64, 686 now_ms: i64, 687 ) -> Result<RadrootsOutboxEventState, RadrootsOutboxError> { 688 let mut tx = self.pool.begin().await?; 689 let row = claimed_event_identity_tx(&mut tx, outbox_event_id, claim_token).await?; 690 let operation_id = row.operation_id; 691 let accepted_quorum = row.accepted_quorum; 692 let accepted_count: i64 = sqlx::query( 693 "SELECT COUNT(*) FROM outbox_event_relay_status WHERE outbox_event_id = ? AND status = ?", 694 ) 695 .bind(outbox_event_id) 696 .bind(RadrootsOutboxRelayStatus::Accepted.as_str()) 697 .fetch_one(&mut *tx) 698 .await? 699 .try_get(0)?; 700 let retryable_count: i64 = sqlx::query( 701 "SELECT COUNT(*) FROM outbox_event_relay_status WHERE outbox_event_id = ? AND status = ?", 702 ) 703 .bind(outbox_event_id) 704 .bind(RadrootsOutboxRelayStatus::FailedRetryable.as_str()) 705 .fetch_one(&mut *tx) 706 .await? 707 .try_get(0)?; 708 let pending_count: i64 = sqlx::query( 709 "SELECT COUNT(*) FROM outbox_event_relay_status WHERE outbox_event_id = ? AND status = ?", 710 ) 711 .bind(outbox_event_id) 712 .bind(RadrootsOutboxRelayStatus::Pending.as_str()) 713 .fetch_one(&mut *tx) 714 .await? 715 .try_get(0)?; 716 717 let (event_state, operation_status, last_error, next_attempt_after_ms) = 718 if accepted_count >= accepted_quorum { 719 ( 720 RadrootsOutboxEventState::Published, 721 Some(RadrootsOutboxOperationStatus::Complete), 722 None, 723 now_ms, 724 ) 725 } else if retryable_count > 0 || pending_count > 0 { 726 ( 727 RadrootsOutboxEventState::PublishRetryable, 728 None, 729 Some(retryable_error.as_ref()), 730 next_attempt_after_ms, 731 ) 732 } else { 733 ( 734 RadrootsOutboxEventState::FailedTerminal, 735 Some(RadrootsOutboxOperationStatus::FailedTerminal), 736 Some(terminal_error.as_ref()), 737 now_ms, 738 ) 739 }; 740 741 let changed = sqlx::query( 742 "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", 743 ) 744 .bind(event_state.as_str()) 745 .bind(last_error) 746 .bind(next_attempt_after_ms) 747 .bind(now_ms) 748 .bind(outbox_event_id) 749 .bind(claim_token) 750 .execute(&mut *tx) 751 .await?; 752 if changed.rows_affected() == 0 { 753 return Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id }); 754 } 755 756 if let Some(operation_status) = operation_status { 757 sqlx::query( 758 "UPDATE outbox_operation SET status = ?, updated_at_ms = ? WHERE operation_id = ?", 759 ) 760 .bind(operation_status.as_str()) 761 .bind(now_ms) 762 .bind(operation_id) 763 .execute(&mut *tx) 764 .await?; 765 } 766 767 tx.commit().await?; 768 Ok(event_state) 769 } 770 771 pub async fn mark_publish_failed_terminal( 772 &self, 773 outbox_event_id: i64, 774 claim_token: &str, 775 error: impl AsRef<str>, 776 now_ms: i64, 777 ) -> Result<(), RadrootsOutboxError> { 778 self.finish_claimed_event( 779 outbox_event_id, 780 claim_token, 781 RadrootsOutboxEventState::FailedTerminal, 782 RadrootsOutboxOperationStatus::FailedTerminal, 783 Some(error.as_ref()), 784 now_ms, 785 ) 786 .await 787 } 788 789 pub async fn cancel_claimed_event( 790 &self, 791 outbox_event_id: i64, 792 claim_token: &str, 793 now_ms: i64, 794 ) -> Result<(), RadrootsOutboxError> { 795 self.finish_claimed_event( 796 outbox_event_id, 797 claim_token, 798 RadrootsOutboxEventState::Cancelled, 799 RadrootsOutboxOperationStatus::Cancelled, 800 None, 801 now_ms, 802 ) 803 .await 804 } 805 806 async fn claimed_event( 807 &self, 808 outbox_event_id: i64, 809 claim_token: &str, 810 ) -> Result<RadrootsOutboxEventRecord, RadrootsOutboxError> { 811 self.ensure_claim_token(outbox_event_id, claim_token) 812 .await?; 813 self.get_event(outbox_event_id) 814 .await? 815 .ok_or(RadrootsOutboxError::EventNotFound(outbox_event_id)) 816 } 817 818 async fn ensure_claim_token( 819 &self, 820 outbox_event_id: i64, 821 claim_token: &str, 822 ) -> Result<(), RadrootsOutboxError> { 823 let row = sqlx::query("SELECT claim_token FROM outbox_event WHERE outbox_event_id = ?") 824 .bind(outbox_event_id) 825 .fetch_optional(&self.pool) 826 .await?; 827 let Some(row) = row else { 828 return Err(RadrootsOutboxError::EventNotFound(outbox_event_id)); 829 }; 830 let stored: Option<String> = row.try_get("claim_token")?; 831 if stored.as_deref() != Some(claim_token) { 832 return Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id }); 833 } 834 Ok(()) 835 } 836 837 async fn ensure_claimed_update( 838 &self, 839 outbox_event_id: i64, 840 claim_token: &str, 841 changed: SqliteQueryResult, 842 ) -> Result<(), RadrootsOutboxError> { 843 if changed.rows_affected() > 0 { 844 return Ok(()); 845 } 846 self.ensure_claim_token(outbox_event_id, claim_token) 847 .await?; 848 Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id }) 849 } 850 851 async fn finish_claimed_event( 852 &self, 853 outbox_event_id: i64, 854 claim_token: &str, 855 event_state: RadrootsOutboxEventState, 856 operation_status: RadrootsOutboxOperationStatus, 857 last_error: Option<&str>, 858 now_ms: i64, 859 ) -> Result<(), RadrootsOutboxError> { 860 let mut tx = self.pool.begin().await?; 861 let row = claimed_event_identity_tx(&mut tx, outbox_event_id, claim_token).await?; 862 let changed = sqlx::query( 863 "UPDATE outbox_event SET state = ?, claim_token = NULL, claim_owner = NULL, claim_expires_at_ms = NULL, last_error = ?, next_attempt_after_ms = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", 864 ) 865 .bind(event_state.as_str()) 866 .bind(last_error) 867 .bind(now_ms) 868 .bind(now_ms) 869 .bind(outbox_event_id) 870 .bind(claim_token) 871 .execute(&mut *tx) 872 .await?; 873 if changed.rows_affected() == 0 { 874 return Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id }); 875 } 876 877 sqlx::query( 878 "UPDATE outbox_operation SET status = ?, updated_at_ms = ? WHERE operation_id = ?", 879 ) 880 .bind(operation_status.as_str()) 881 .bind(now_ms) 882 .bind(row.operation_id) 883 .execute(&mut *tx) 884 .await?; 885 886 tx.commit().await?; 887 Ok(()) 888 } 889 890 async fn mark_relay_failed( 891 &self, 892 outbox_event_id: i64, 893 claim_token: &str, 894 relay_url: &str, 895 status: RadrootsOutboxRelayStatus, 896 error: &str, 897 attempted_at_ms: i64, 898 ) -> Result<(), RadrootsOutboxError> { 899 let changed = sqlx::query( 900 "UPDATE outbox_event_relay_status SET status = ?, attempt_count = attempt_count + 1, last_attempt_at_ms = ?, acknowledged_at_ms = NULL, last_error = ? WHERE outbox_event_id = ? AND relay_url = ? AND EXISTS (SELECT 1 FROM outbox_event WHERE outbox_event_id = ? AND claim_token = ?)", 901 ) 902 .bind(status.as_str()) 903 .bind(attempted_at_ms) 904 .bind(error) 905 .bind(outbox_event_id) 906 .bind(relay_url) 907 .bind(outbox_event_id) 908 .bind(claim_token) 909 .execute(&self.pool) 910 .await?; 911 self.ensure_claimed_update(outbox_event_id, claim_token, changed) 912 .await?; 913 Ok(()) 914 } 915 } 916 917 struct ExistingOperation { 918 operation_id: i64, 919 outbox_event_id: i64, 920 event_id: String, 921 idempotency_digest: String, 922 } 923 924 struct ClaimedEventIdentity { 925 operation_id: i64, 926 accepted_quorum: i64, 927 } 928 929 async fn configure_connection( 930 pool: &SqlitePool, 931 file_backed: bool, 932 ) -> Result<(), RadrootsOutboxError> { 933 sqlx::query("PRAGMA foreign_keys = ON") 934 .execute(pool) 935 .await?; 936 sqlx::query("PRAGMA busy_timeout = 5000") 937 .execute(pool) 938 .await?; 939 if file_backed { 940 sqlx::query("PRAGMA journal_mode = WAL") 941 .execute(pool) 942 .await?; 943 } 944 Ok(()) 945 } 946 947 #[cfg_attr(coverage_nightly, coverage(off))] 948 async fn apply_up(pool: &SqlitePool) -> Result<(), RadrootsOutboxError> { 949 sqlx::raw_sql(OUTBOX_MIGRATION_UP).execute(pool).await?; 950 Ok(()) 951 } 952 953 #[cfg_attr(coverage_nightly, coverage(off))] 954 async fn apply_down(pool: &SqlitePool) -> Result<(), RadrootsOutboxError> { 955 sqlx::raw_sql(OUTBOX_MIGRATION_DOWN).execute(pool).await?; 956 Ok(()) 957 } 958 959 #[cfg_attr(coverage_nightly, coverage(off))] 960 async fn query_i64(pool: &SqlitePool, sql: &str) -> Result<i64, RadrootsOutboxError> { 961 let row = sqlx::query(sql).fetch_one(pool).await?; 962 Ok(row.try_get(0)?) 963 } 964 965 #[cfg_attr(coverage_nightly, coverage(off))] 966 async fn query_string(pool: &SqlitePool, sql: &str) -> Result<String, RadrootsOutboxError> { 967 let row = sqlx::query(sql).fetch_one(pool).await?; 968 Ok(row.try_get(0)?) 969 } 970 971 #[cfg_attr(coverage_nightly, coverage(off))] 972 async fn existing_idempotent_operation( 973 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, 974 operation_kind: &str, 975 expected_pubkey: &str, 976 idempotency_key: &str, 977 ) -> Result<Option<ExistingOperation>, RadrootsOutboxError> { 978 let row = sqlx::query( 979 "SELECT o.operation_id, o.idempotency_digest, e.outbox_event_id, e.event_id FROM outbox_operation o JOIN outbox_event e ON e.operation_id = o.operation_id WHERE o.operation_kind = ? AND o.expected_pubkey = ? AND o.idempotency_key = ? ORDER BY e.outbox_event_id LIMIT 1", 980 ) 981 .bind(operation_kind) 982 .bind(expected_pubkey) 983 .bind(idempotency_key) 984 .fetch_optional(&mut **tx) 985 .await?; 986 row.map(|row| { 987 Ok(ExistingOperation { 988 operation_id: row.try_get("operation_id")?, 989 outbox_event_id: row.try_get("outbox_event_id")?, 990 event_id: row.try_get("event_id")?, 991 idempotency_digest: row.try_get("idempotency_digest")?, 992 }) 993 }) 994 .transpose() 995 } 996 997 #[cfg_attr(coverage_nightly, coverage(off))] 998 async fn event_by_id_tx( 999 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, 1000 outbox_event_id: i64, 1001 ) -> Result<RadrootsOutboxEventRecord, RadrootsOutboxError> { 1002 let row = sqlx::query( 1003 "SELECT outbox_event_id, operation_id, event_id, expected_pubkey, draft_json, signed_event_json, raw_event_json, state, accepted_quorum, attempt_count, claim_token, claim_owner, claim_expires_at_ms, next_attempt_after_ms, last_error, event_store_ingested, event_store_inserted, event_store_ingested_at_ms, created_at_ms, updated_at_ms FROM outbox_event WHERE outbox_event_id = ?", 1004 ) 1005 .bind(outbox_event_id) 1006 .fetch_one(&mut **tx) 1007 .await?; 1008 event_from_row(row) 1009 } 1010 1011 async fn claimed_event_identity_tx( 1012 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, 1013 outbox_event_id: i64, 1014 claim_token: &str, 1015 ) -> Result<ClaimedEventIdentity, RadrootsOutboxError> { 1016 let row = 1017 sqlx::query("SELECT operation_id, accepted_quorum, claim_token FROM outbox_event WHERE outbox_event_id = ?") 1018 .bind(outbox_event_id) 1019 .fetch_optional(&mut **tx) 1020 .await?; 1021 let Some(row) = row else { 1022 return Err(RadrootsOutboxError::EventNotFound(outbox_event_id)); 1023 }; 1024 let stored: Option<String> = row.try_get("claim_token")?; 1025 if stored.as_deref() != Some(claim_token) { 1026 return Err(RadrootsOutboxError::ClaimTokenMismatch { outbox_event_id }); 1027 } 1028 Ok(ClaimedEventIdentity { 1029 operation_id: row.try_get("operation_id")?, 1030 accepted_quorum: row.try_get("accepted_quorum")?, 1031 }) 1032 } 1033 1034 #[cfg_attr(coverage_nightly, coverage(off))] 1035 async fn relay_urls_for_tx( 1036 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, 1037 outbox_event_id: i64, 1038 ) -> Result<Vec<String>, RadrootsOutboxError> { 1039 let rows = sqlx::query( 1040 "SELECT relay_url FROM outbox_event_relay_status WHERE outbox_event_id = ? ORDER BY rowid", 1041 ) 1042 .bind(outbox_event_id) 1043 .fetch_all(&mut **tx) 1044 .await?; 1045 rows.into_iter() 1046 .map(|row| row.try_get("relay_url").map_err(Into::into)) 1047 .collect() 1048 } 1049 1050 #[cfg_attr(coverage_nightly, coverage(off))] 1051 async fn relay_statuses_for( 1052 pool: &SqlitePool, 1053 outbox_event_id: i64, 1054 ) -> Result<Vec<RadrootsOutboxRelayStatusRecord>, RadrootsOutboxError> { 1055 let rows = sqlx::query( 1056 "SELECT outbox_event_id, relay_url, status, attempt_count, last_attempt_at_ms, acknowledged_at_ms, last_error FROM outbox_event_relay_status WHERE outbox_event_id = ? ORDER BY rowid", 1057 ) 1058 .bind(outbox_event_id) 1059 .fetch_all(pool) 1060 .await?; 1061 rows.into_iter().map(relay_status_from_row).collect() 1062 } 1063 1064 #[cfg_attr(coverage_nightly, coverage(off))] 1065 fn operation_from_row( 1066 row: sqlx::sqlite::SqliteRow, 1067 ) -> Result<RadrootsOutboxOperationRecord, RadrootsOutboxError> { 1068 let status = 1069 RadrootsOutboxOperationStatus::parse(row.try_get::<String, _>("status")?.as_str())?; 1070 Ok(RadrootsOutboxOperationRecord { 1071 operation_id: row.try_get("operation_id")?, 1072 operation_kind: row.try_get("operation_kind")?, 1073 expected_pubkey: row.try_get("expected_pubkey")?, 1074 idempotency_key: row.try_get("idempotency_key")?, 1075 idempotency_digest: row.try_get("idempotency_digest")?, 1076 status, 1077 created_at_ms: row.try_get("created_at_ms")?, 1078 updated_at_ms: row.try_get("updated_at_ms")?, 1079 }) 1080 } 1081 1082 #[cfg_attr(coverage_nightly, coverage(off))] 1083 fn event_from_row( 1084 row: sqlx::sqlite::SqliteRow, 1085 ) -> Result<RadrootsOutboxEventRecord, RadrootsOutboxError> { 1086 let draft: RadrootsFrozenEventDraft = 1087 serde_json::from_str(row.try_get::<String, _>("draft_json")?.as_str())?; 1088 let signed_event = row 1089 .try_get::<Option<String>, _>("signed_event_json")? 1090 .map(|json| serde_json::from_str(json.as_str())) 1091 .transpose()?; 1092 let state = RadrootsOutboxEventState::parse(row.try_get::<String, _>("state")?.as_str())?; 1093 Ok(RadrootsOutboxEventRecord { 1094 outbox_event_id: row.try_get("outbox_event_id")?, 1095 operation_id: row.try_get("operation_id")?, 1096 event_id: row.try_get("event_id")?, 1097 expected_pubkey: row.try_get("expected_pubkey")?, 1098 draft, 1099 signed_event, 1100 raw_event_json: row.try_get("raw_event_json")?, 1101 state, 1102 accepted_quorum: row.try_get("accepted_quorum")?, 1103 attempt_count: row.try_get("attempt_count")?, 1104 claim_token: row.try_get("claim_token")?, 1105 claim_owner: row.try_get("claim_owner")?, 1106 claim_expires_at_ms: row.try_get("claim_expires_at_ms")?, 1107 next_attempt_after_ms: row.try_get("next_attempt_after_ms")?, 1108 last_error: row.try_get("last_error")?, 1109 event_store_ingested: row.try_get::<i64, _>("event_store_ingested")? != 0, 1110 event_store_inserted: row.try_get::<i64, _>("event_store_inserted")? != 0, 1111 event_store_ingested_at_ms: row.try_get("event_store_ingested_at_ms")?, 1112 created_at_ms: row.try_get("created_at_ms")?, 1113 updated_at_ms: row.try_get("updated_at_ms")?, 1114 }) 1115 } 1116 1117 #[cfg_attr(coverage_nightly, coverage(off))] 1118 fn relay_status_from_row( 1119 row: sqlx::sqlite::SqliteRow, 1120 ) -> Result<RadrootsOutboxRelayStatusRecord, RadrootsOutboxError> { 1121 let status = RadrootsOutboxRelayStatus::parse(row.try_get::<String, _>("status")?.as_str())?; 1122 Ok(RadrootsOutboxRelayStatusRecord { 1123 outbox_event_id: row.try_get("outbox_event_id")?, 1124 relay_url: row.try_get("relay_url")?, 1125 status, 1126 attempt_count: row.try_get("attempt_count")?, 1127 last_attempt_at_ms: row.try_get("last_attempt_at_ms")?, 1128 acknowledged_at_ms: row.try_get("acknowledged_at_ms")?, 1129 last_error: row.try_get("last_error")?, 1130 }) 1131 } 1132 1133 fn event_from_signed(signed_event: &RadrootsSignedNostrEvent) -> RadrootsNostrEvent { 1134 RadrootsNostrEvent { 1135 id: signed_event.id.clone(), 1136 author: signed_event.pubkey.clone(), 1137 created_at: signed_event.created_at, 1138 kind: signed_event.kind, 1139 tags: signed_event.tags.clone(), 1140 content: signed_event.content.clone(), 1141 sig: signed_event.sig.clone(), 1142 } 1143 } 1144 1145 fn ordered_unique_relays(relays: Vec<String>) -> Vec<String> { 1146 let mut out = Vec::new(); 1147 for relay in relays { 1148 if !out.iter().any(|existing| existing == &relay) { 1149 out.push(relay); 1150 } 1151 } 1152 out 1153 } 1154 1155 fn digest_relays(relays: &[String]) -> Vec<String> { 1156 let mut out = relays.to_vec(); 1157 out.sort(); 1158 out.dedup(); 1159 out 1160 } 1161 1162 #[derive(Serialize)] 1163 struct DigestInput<'a> { 1164 operation_kind: &'a str, 1165 expected_pubkey: &'a str, 1166 draft: &'a RadrootsFrozenEventDraft, 1167 target_relays: &'a [String], 1168 } 1169 1170 fn idempotency_digest( 1171 operation_kind: &str, 1172 expected_pubkey: &str, 1173 draft: &RadrootsFrozenEventDraft, 1174 target_relays: &[String], 1175 ) -> Result<String, RadrootsOutboxError> { 1176 let input = DigestInput { 1177 operation_kind, 1178 expected_pubkey, 1179 draft, 1180 target_relays, 1181 }; 1182 let bytes = serde_json::to_vec(&input)?; 1183 Ok(hex::encode(Sha256::digest(bytes))) 1184 } 1185 1186 fn bool_i64(value: bool) -> i64 { 1187 if value { 1 } else { 0 } 1188 } 1189 1190 #[cfg(test)] 1191 mod tests { 1192 use super::*; 1193 use radroots_events::kinds::KIND_POST; 1194 use radroots_nostr::prelude::{ 1195 RadrootsNostrKeys, RadrootsNostrSecretKey, radroots_nostr_sign_frozen_draft, 1196 }; 1197 1198 const FIXTURE_ALICE_SECRET_KEY_HEX: &str = 1199 "10c5304d6c9ae3a1a16f7860f1cc8f5e3a76225a2663b3a989a0d775919b7df5"; 1200 const FIXTURE_ALICE_PUBLIC_KEY_HEX: &str = 1201 "585591529da0bab31b3b1b1f986611cf5f435dca84f978c89ee8a40cca7103df"; 1202 const RELAY_PRIMARY_WSS: &str = "wss://relay.example.com"; 1203 const RELAY_SECONDARY_WSS: &str = "wss://relay-2.example.com"; 1204 1205 fn hex_64(character: char) -> String { 1206 std::iter::repeat_n(character, 64).collect() 1207 } 1208 1209 fn post_draft(expected_pubkey: &str, content: &str) -> RadrootsFrozenEventDraft { 1210 RadrootsFrozenEventDraft::new( 1211 "radroots.social.post.v1", 1212 KIND_POST, 1213 1_700_000_000, 1214 vec![vec!["t".to_owned(), "soil".to_owned()]], 1215 content, 1216 expected_pubkey, 1217 ) 1218 .expect("post draft") 1219 } 1220 1221 fn operation_input( 1222 draft: RadrootsFrozenEventDraft, 1223 created_at_ms: i64, 1224 ) -> RadrootsOutboxOperationInput { 1225 RadrootsOutboxOperationInput::new( 1226 "publish_post", 1227 draft, 1228 vec![ 1229 RELAY_PRIMARY_WSS.to_owned(), 1230 RELAY_SECONDARY_WSS.to_owned(), 1231 RELAY_PRIMARY_WSS.to_owned(), 1232 ], 1233 created_at_ms, 1234 ) 1235 } 1236 1237 fn signed_operation_input( 1238 draft: RadrootsFrozenEventDraft, 1239 signed_event: RadrootsSignedNostrEvent, 1240 created_at_ms: i64, 1241 ) -> RadrootsOutboxSignedOperationInput { 1242 RadrootsOutboxSignedOperationInput::new( 1243 "publish_post", 1244 draft, 1245 signed_event, 1246 vec![ 1247 RELAY_PRIMARY_WSS.to_owned(), 1248 RELAY_SECONDARY_WSS.to_owned(), 1249 RELAY_PRIMARY_WSS.to_owned(), 1250 ], 1251 true, 1252 created_at_ms + 7, 1253 created_at_ms, 1254 ) 1255 } 1256 1257 fn fixture_keys() -> RadrootsNostrKeys { 1258 let secret_key = 1259 RadrootsNostrSecretKey::from_hex(FIXTURE_ALICE_SECRET_KEY_HEX).expect("secret key"); 1260 RadrootsNostrKeys::new(secret_key) 1261 } 1262 1263 async fn enqueue_signed_fixture( 1264 outbox: &RadrootsOutbox, 1265 ) -> (RadrootsOutboxEnqueueReceipt, RadrootsOutboxClaimedEvent) { 1266 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "hello"); 1267 let receipt = outbox 1268 .enqueue_operation(operation_input(draft, 1_000)) 1269 .await 1270 .expect("enqueue"); 1271 let claimed = outbox 1272 .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) 1273 .await 1274 .expect("claim") 1275 .expect("claimed event"); 1276 (receipt, claimed) 1277 } 1278 1279 async fn complete_claimed_signing( 1280 outbox: &RadrootsOutbox, 1281 claimed: &RadrootsOutboxClaimedEvent, 1282 keys: &RadrootsNostrKeys, 1283 now_ms: i64, 1284 ) -> RadrootsSignedNostrEvent { 1285 if let Some(signed_event) = claimed.signed_event.clone() { 1286 return signed_event; 1287 } 1288 let signed_event = 1289 radroots_nostr_sign_frozen_draft(keys, &claimed.draft).expect("signed event"); 1290 outbox 1291 .complete_signing( 1292 claimed.outbox_event_id, 1293 claimed.claim_token.as_str(), 1294 signed_event, 1295 now_ms, 1296 ) 1297 .await 1298 .expect("complete signing") 1299 } 1300 1301 async fn table_count(outbox: &RadrootsOutbox, table_name: &str) -> i64 { 1302 let sql = format!("SELECT COUNT(*) FROM {table_name}"); 1303 sqlx::query_scalar(sql.as_str()) 1304 .fetch_one(outbox.pool()) 1305 .await 1306 .expect("table count") 1307 } 1308 1309 #[tokio::test] 1310 async fn migration_applies_pragmas_and_migrates_down() { 1311 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1312 1313 assert_eq!(outbox.pragma_foreign_keys().await.expect("foreign keys"), 1); 1314 assert_eq!( 1315 outbox.pragma_busy_timeout().await.expect("busy timeout"), 1316 5_000 1317 ); 1318 assert_eq!( 1319 outbox.pragma_journal_mode().await.expect("journal mode"), 1320 "memory" 1321 ); 1322 1323 let row = sqlx::query( 1324 "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'outbox_event'", 1325 ) 1326 .fetch_optional(outbox.pool()) 1327 .await 1328 .expect("table query"); 1329 assert!(row.is_some()); 1330 1331 outbox.migrate_down().await.expect("migrate down"); 1332 let row = sqlx::query( 1333 "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'outbox_event'", 1334 ) 1335 .fetch_optional(outbox.pool()) 1336 .await 1337 .expect("table query"); 1338 assert!(row.is_none()); 1339 } 1340 1341 #[tokio::test] 1342 async fn file_outbox_reopens_existing_schema() { 1343 let tempdir = tempfile::tempdir().expect("tempdir"); 1344 let path = tempdir.path().join("outbox.sqlite"); 1345 1346 let first = RadrootsOutbox::open_file(&path).await.expect("first"); 1347 assert_eq!(first.pragma_foreign_keys().await.expect("foreign keys"), 1); 1348 drop(first); 1349 1350 let second = RadrootsOutbox::open_file(&path).await.expect("second"); 1351 assert_eq!(second.pragma_foreign_keys().await.expect("foreign keys"), 1); 1352 } 1353 1354 #[tokio::test] 1355 async fn status_summary_counts_ready_publishing_retryable_and_terminal_work() { 1356 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1357 1358 let empty = outbox.status_summary(1_000).await.expect("empty status"); 1359 assert_eq!(empty.total_events, 0); 1360 assert_eq!(empty.pending_events, 0); 1361 assert_eq!(empty.retryable_events, 0); 1362 assert_eq!(empty.terminal_events, 0); 1363 assert_eq!(empty.failed_terminal_events, 0); 1364 assert_eq!(empty.ready_signed_events, 0); 1365 assert_eq!(empty.publishing_events, 0); 1366 assert_eq!(empty.last_attempt_at_ms, None); 1367 assert_eq!(empty.last_error, None); 1368 1369 let keys = fixture_keys(); 1370 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ready"); 1371 let signed_event = radroots_nostr_sign_frozen_draft(&keys, &draft).expect("signed event"); 1372 let receipt = outbox 1373 .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000)) 1374 .await 1375 .expect("signed enqueue"); 1376 let ready = outbox.status_summary(1_000).await.expect("ready status"); 1377 assert_eq!(ready.total_events, 1); 1378 assert_eq!(ready.pending_events, 1); 1379 assert_eq!(ready.retryable_events, 0); 1380 assert_eq!(ready.terminal_events, 0); 1381 assert_eq!(ready.ready_signed_events, 1); 1382 1383 let claimed = outbox 1384 .claim_next_ready_signed_event("publisher", "claim-a", 2_500, 2_000) 1385 .await 1386 .expect("claim") 1387 .expect("claimed"); 1388 let publishing = outbox 1389 .status_summary(2_000) 1390 .await 1391 .expect("publishing status"); 1392 assert_eq!(publishing.pending_events, 1); 1393 assert_eq!(publishing.publishing_events, 1); 1394 assert_eq!(publishing.ready_signed_events, 0); 1395 1396 outbox 1397 .mark_relay_failed_retryable( 1398 receipt.outbox_event_id, 1399 claimed.claim_token.as_str(), 1400 RELAY_PRIMARY_WSS, 1401 "timeout: relay unavailable", 1402 2_100, 1403 ) 1404 .await 1405 .expect("relay failed"); 1406 outbox 1407 .mark_publish_retryable( 1408 receipt.outbox_event_id, 1409 claimed.claim_token.as_str(), 1410 "relay publish incomplete", 1411 3_000, 1412 2_200, 1413 ) 1414 .await 1415 .expect("retryable"); 1416 1417 let retry_wait = outbox 1418 .status_summary(2_900) 1419 .await 1420 .expect("retry wait status"); 1421 assert_eq!(retry_wait.pending_events, 0); 1422 assert_eq!(retry_wait.retryable_events, 1); 1423 assert_eq!(retry_wait.terminal_events, 0); 1424 assert_eq!(retry_wait.failed_terminal_events, 0); 1425 assert_eq!(retry_wait.ready_signed_events, 0); 1426 assert_eq!(retry_wait.last_attempt_at_ms, Some(2_100)); 1427 assert_eq!( 1428 retry_wait.last_error.as_deref(), 1429 Some("relay publish incomplete") 1430 ); 1431 1432 let retry_ready = outbox 1433 .status_summary(3_000) 1434 .await 1435 .expect("retry ready status"); 1436 assert_eq!(retry_ready.ready_signed_events, 1); 1437 } 1438 1439 #[test] 1440 fn terminal_and_cancelled_event_states_round_trip() { 1441 assert_eq!(bool_i64(true), 1); 1442 assert_eq!(bool_i64(false), 0); 1443 assert_eq!( 1444 RadrootsOutboxOperationStatus::parse("failed_terminal").expect("operation status"), 1445 RadrootsOutboxOperationStatus::FailedTerminal 1446 ); 1447 assert_eq!( 1448 RadrootsOutboxOperationStatus::FailedTerminal.as_str(), 1449 "failed_terminal" 1450 ); 1451 assert_eq!( 1452 RadrootsOutboxOperationStatus::parse("cancelled").expect("operation status"), 1453 RadrootsOutboxOperationStatus::Cancelled 1454 ); 1455 assert_eq!( 1456 RadrootsOutboxOperationStatus::Cancelled.as_str(), 1457 "cancelled" 1458 ); 1459 assert_eq!( 1460 RadrootsOutboxEventState::parse("failed_terminal").expect("event state"), 1461 RadrootsOutboxEventState::FailedTerminal 1462 ); 1463 assert!(RadrootsOutboxEventState::FailedTerminal.is_terminal()); 1464 assert_eq!( 1465 RadrootsOutboxEventState::parse("cancelled").expect("event state"), 1466 RadrootsOutboxEventState::Cancelled 1467 ); 1468 assert!(RadrootsOutboxEventState::Cancelled.is_terminal()); 1469 assert!(RadrootsOutboxEventState::Published.is_terminal()); 1470 assert!(!RadrootsOutboxEventState::PublishRetryable.is_terminal()); 1471 } 1472 1473 #[tokio::test] 1474 async fn enqueue_idempotency_is_scoped_by_kind_pubkey_and_digest() { 1475 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1476 let first_draft = post_draft(hex_64('a').as_str(), "hello"); 1477 1478 let first = outbox 1479 .enqueue_operation(operation_input(first_draft.clone(), 1_000)) 1480 .await 1481 .expect("first enqueue"); 1482 let second = outbox 1483 .enqueue_operation(operation_input(first_draft.clone(), 1_001)) 1484 .await 1485 .expect("second enqueue"); 1486 1487 assert_eq!(first.status, RadrootsOutboxEnqueueStatus::Inserted); 1488 assert_eq!(second.status, RadrootsOutboxEnqueueStatus::Inserted); 1489 assert_ne!(first.operation_id, second.operation_id); 1490 assert_ne!(first.outbox_event_id, second.outbox_event_id); 1491 1492 let keyed_first = outbox 1493 .enqueue_operation( 1494 operation_input(first_draft.clone(), 1_002).with_idempotency_key("idem-a"), 1495 ) 1496 .await 1497 .expect("keyed first"); 1498 let keyed_second = outbox 1499 .enqueue_operation( 1500 operation_input(first_draft.clone(), 1_003).with_idempotency_key("idem-a"), 1501 ) 1502 .await 1503 .expect("keyed second"); 1504 1505 assert_eq!(keyed_first.status, RadrootsOutboxEnqueueStatus::Inserted); 1506 assert_eq!(keyed_second.status, RadrootsOutboxEnqueueStatus::Existing); 1507 assert_eq!(keyed_first.operation_id, keyed_second.operation_id); 1508 assert_eq!(keyed_first.outbox_event_id, keyed_second.outbox_event_id); 1509 assert_eq!( 1510 keyed_first.idempotency_digest, 1511 keyed_second.idempotency_digest 1512 ); 1513 1514 let conflict = outbox 1515 .enqueue_operation( 1516 operation_input(post_draft(hex_64('a').as_str(), "changed"), 1_004) 1517 .with_idempotency_key("idem-a"), 1518 ) 1519 .await 1520 .expect_err("conflict"); 1521 assert!(matches!( 1522 conflict, 1523 RadrootsOutboxError::IdempotencyConflict { .. } 1524 )); 1525 1526 let other_kind = outbox 1527 .enqueue_operation( 1528 RadrootsOutboxOperationInput::new( 1529 "publish_post_reply", 1530 first_draft.clone(), 1531 vec![RELAY_PRIMARY_WSS.to_owned()], 1532 1_005, 1533 ) 1534 .with_idempotency_key("idem-a"), 1535 ) 1536 .await 1537 .expect("other kind"); 1538 assert_eq!(other_kind.status, RadrootsOutboxEnqueueStatus::Inserted); 1539 1540 let other_pubkey = outbox 1541 .enqueue_operation( 1542 operation_input(post_draft(hex_64('b').as_str(), "hello"), 1_006) 1543 .with_idempotency_key("idem-a"), 1544 ) 1545 .await 1546 .expect("other pubkey"); 1547 assert_eq!(other_pubkey.status, RadrootsOutboxEnqueueStatus::Inserted); 1548 } 1549 1550 #[tokio::test] 1551 async fn enqueue_idempotency_digest_sorts_relays_but_publish_order_is_preserved() { 1552 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1553 let draft = post_draft(hex_64('a').as_str(), "hello"); 1554 let first = outbox 1555 .enqueue_operation( 1556 RadrootsOutboxOperationInput::new( 1557 "publish_post", 1558 draft.clone(), 1559 vec![ 1560 RELAY_SECONDARY_WSS.to_owned(), 1561 RELAY_PRIMARY_WSS.to_owned(), 1562 RELAY_SECONDARY_WSS.to_owned(), 1563 ], 1564 1_000, 1565 ) 1566 .with_idempotency_key("idem-relay-order"), 1567 ) 1568 .await 1569 .expect("first enqueue"); 1570 let second = outbox 1571 .enqueue_operation( 1572 RadrootsOutboxOperationInput::new( 1573 "publish_post", 1574 draft, 1575 vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()], 1576 1_001, 1577 ) 1578 .with_idempotency_key("idem-relay-order"), 1579 ) 1580 .await 1581 .expect("second enqueue"); 1582 1583 assert_eq!(second.status, RadrootsOutboxEnqueueStatus::Existing); 1584 assert_eq!(first.idempotency_digest, second.idempotency_digest); 1585 1586 let claimed = outbox 1587 .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) 1588 .await 1589 .expect("claim") 1590 .expect("claimed event"); 1591 assert_eq!( 1592 claimed.target_relays, 1593 vec![RELAY_SECONDARY_WSS.to_owned(), RELAY_PRIMARY_WSS.to_owned()] 1594 ); 1595 } 1596 1597 #[tokio::test] 1598 async fn enqueue_rejects_empty_target_relays_before_persistence() { 1599 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1600 let draft = post_draft(hex_64('a').as_str(), "hello"); 1601 1602 let err = outbox 1603 .enqueue_operation( 1604 RadrootsOutboxOperationInput::new("publish_post", draft, Vec::new(), 1_000) 1605 .with_idempotency_key("empty-relays"), 1606 ) 1607 .await 1608 .expect_err("empty relays"); 1609 1610 assert!(matches!(err, RadrootsOutboxError::EmptyTargetRelays)); 1611 assert_eq!(table_count(&outbox, "outbox_operation").await, 0); 1612 assert_eq!(table_count(&outbox, "outbox_event").await, 0); 1613 assert_eq!(table_count(&outbox, "outbox_event_relay_status").await, 0); 1614 } 1615 1616 #[tokio::test] 1617 async fn enqueue_signed_rejects_empty_target_relays_before_persistence() { 1618 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1619 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "signed empty"); 1620 let signed_event = 1621 radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); 1622 1623 let err = outbox 1624 .enqueue_signed_operation(RadrootsOutboxSignedOperationInput::new( 1625 "publish_post", 1626 draft, 1627 signed_event, 1628 Vec::new(), 1629 false, 1630 1_007, 1631 1_000, 1632 )) 1633 .await 1634 .expect_err("empty relays"); 1635 1636 assert!(matches!(err, RadrootsOutboxError::EmptyTargetRelays)); 1637 assert_eq!(table_count(&outbox, "outbox_operation").await, 0); 1638 assert_eq!(table_count(&outbox, "outbox_event").await, 0); 1639 assert_eq!(table_count(&outbox, "outbox_event_relay_status").await, 0); 1640 } 1641 1642 #[tokio::test] 1643 async fn enqueue_allows_explicitly_delegated_empty_target_relays() { 1644 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1645 let draft = post_draft(hex_64('a').as_str(), "delegated empty"); 1646 1647 let receipt = outbox 1648 .enqueue_operation( 1649 RadrootsOutboxOperationInput::new("publish_post", draft, Vec::new(), 1_000) 1650 .allow_empty_target_relays(), 1651 ) 1652 .await 1653 .expect("delegated empty relays"); 1654 1655 let event = outbox 1656 .get_event(receipt.outbox_event_id) 1657 .await 1658 .expect("event") 1659 .expect("event"); 1660 assert_eq!(event.accepted_quorum, 0); 1661 assert_eq!( 1662 outbox 1663 .relay_statuses(receipt.outbox_event_id) 1664 .await 1665 .expect("relay statuses"), 1666 Vec::new() 1667 ); 1668 1669 let claimed = outbox 1670 .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) 1671 .await 1672 .expect("claim") 1673 .expect("claim"); 1674 assert_eq!(claimed.target_relays, Vec::<String>::new()); 1675 } 1676 1677 #[tokio::test] 1678 async fn enqueue_signed_allows_explicitly_delegated_empty_target_relays() { 1679 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1680 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "signed delegated empty"); 1681 let signed_event = 1682 radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); 1683 1684 let receipt = outbox 1685 .enqueue_signed_operation( 1686 RadrootsOutboxSignedOperationInput::new( 1687 "publish_post", 1688 draft, 1689 signed_event.clone(), 1690 Vec::new(), 1691 false, 1692 1_007, 1693 1_000, 1694 ) 1695 .allow_empty_target_relays(), 1696 ) 1697 .await 1698 .expect("delegated signed empty relays"); 1699 1700 let event = outbox 1701 .get_event(receipt.outbox_event_id) 1702 .await 1703 .expect("event") 1704 .expect("event"); 1705 assert_eq!(event.accepted_quorum, 0); 1706 assert_eq!(event.signed_event, Some(signed_event.clone())); 1707 assert_eq!( 1708 outbox 1709 .relay_statuses(receipt.outbox_event_id) 1710 .await 1711 .expect("relay statuses"), 1712 Vec::new() 1713 ); 1714 1715 let claimed = outbox 1716 .claim_next_ready_signed_event("publisher-a", "claim-a", 2_000, 1_000) 1717 .await 1718 .expect("claim") 1719 .expect("claim"); 1720 assert_eq!(claimed.signed_event, Some(signed_event)); 1721 assert_eq!(claimed.target_relays, Vec::<String>::new()); 1722 } 1723 1724 #[tokio::test] 1725 async fn claim_next_ready_event_returns_none_when_no_work_is_ready() { 1726 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1727 1728 assert!( 1729 outbox 1730 .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) 1731 .await 1732 .expect("claim") 1733 .is_none() 1734 ); 1735 assert!( 1736 outbox 1737 .claim_next_ready_signed_event("publisher-a", "claim-b", 2_000, 1_000) 1738 .await 1739 .expect("claim signed") 1740 .is_none() 1741 ); 1742 } 1743 1744 #[tokio::test] 1745 async fn enqueue_accepts_single_and_multiple_target_relays() { 1746 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1747 let single_draft = post_draft(hex_64('a').as_str(), "single"); 1748 1749 let single = outbox 1750 .enqueue_operation(RadrootsOutboxOperationInput::new( 1751 "publish_post", 1752 single_draft, 1753 vec![RELAY_PRIMARY_WSS.to_owned()], 1754 1_000, 1755 )) 1756 .await 1757 .expect("single relay"); 1758 let single_event = outbox 1759 .get_event(single.outbox_event_id) 1760 .await 1761 .expect("single event") 1762 .expect("single event"); 1763 assert_eq!(single_event.accepted_quorum, 1); 1764 assert_eq!( 1765 outbox 1766 .relay_statuses(single.outbox_event_id) 1767 .await 1768 .expect("single relay statuses") 1769 .len(), 1770 1 1771 ); 1772 1773 let multi_draft = post_draft(hex_64('b').as_str(), "multi"); 1774 let multi = outbox 1775 .enqueue_operation(RadrootsOutboxOperationInput::new( 1776 "publish_post", 1777 multi_draft, 1778 vec![ 1779 RELAY_PRIMARY_WSS.to_owned(), 1780 RELAY_SECONDARY_WSS.to_owned(), 1781 RELAY_PRIMARY_WSS.to_owned(), 1782 ], 1783 1_100, 1784 )) 1785 .await 1786 .expect("multiple relays"); 1787 let multi_event = outbox 1788 .get_event(multi.outbox_event_id) 1789 .await 1790 .expect("multi event") 1791 .expect("multi event"); 1792 assert_eq!(multi_event.accepted_quorum, 2); 1793 assert_eq!( 1794 outbox 1795 .relay_statuses(multi.outbox_event_id) 1796 .await 1797 .expect("multi relay statuses") 1798 .len(), 1799 2 1800 ); 1801 1802 let zero_quorum_count: i64 = 1803 sqlx::query_scalar("SELECT COUNT(*) FROM outbox_event WHERE accepted_quorum = 0") 1804 .fetch_one(outbox.pool()) 1805 .await 1806 .expect("zero quorum count"); 1807 assert_eq!(zero_quorum_count, 0); 1808 } 1809 1810 #[tokio::test] 1811 async fn enqueue_signed_operation_stores_pushable_signed_event_without_claim() { 1812 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1813 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "signed"); 1814 let signed_event = 1815 radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); 1816 let expected_raw_json = signed_event.raw_json.clone(); 1817 1818 let receipt = outbox 1819 .enqueue_signed_operation( 1820 signed_operation_input(draft.clone(), signed_event.clone(), 1_000) 1821 .with_idempotency_key("signed-a"), 1822 ) 1823 .await 1824 .expect("signed enqueue"); 1825 1826 assert_eq!(receipt.status, RadrootsOutboxEnqueueStatus::Inserted); 1827 assert_eq!(receipt.expected_event_id, draft.expected_event_id); 1828 1829 let event = outbox 1830 .get_event(receipt.outbox_event_id) 1831 .await 1832 .expect("event") 1833 .expect("event"); 1834 assert_eq!(event.state, RadrootsOutboxEventState::Signed); 1835 assert_eq!(event.signed_event, Some(signed_event.clone())); 1836 assert_eq!(event.raw_event_json, Some(expected_raw_json)); 1837 assert!(event.event_store_ingested); 1838 assert!(event.event_store_inserted); 1839 assert_eq!(event.event_store_ingested_at_ms, Some(1_007)); 1840 assert_eq!(event.claim_token, None); 1841 assert_eq!(event.claim_owner, None); 1842 assert_eq!(event.claim_expires_at_ms, None); 1843 1844 let statuses = outbox 1845 .relay_statuses(receipt.outbox_event_id) 1846 .await 1847 .expect("statuses"); 1848 assert_eq!(statuses.len(), 2); 1849 assert!( 1850 statuses 1851 .iter() 1852 .all(|status| status.status == RadrootsOutboxRelayStatus::Pending) 1853 ); 1854 1855 let claimed = outbox 1856 .claim_next_ready_event("publisher-a", "claim-a", 2_000, 1_000) 1857 .await 1858 .expect("claim") 1859 .expect("claimed"); 1860 assert_eq!(claimed.state, RadrootsOutboxEventState::Publishing); 1861 assert_eq!(claimed.signed_event, Some(signed_event)); 1862 assert_eq!( 1863 claimed.target_relays, 1864 vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()] 1865 ); 1866 } 1867 1868 #[tokio::test] 1869 async fn claim_next_ready_signed_event_skips_unsigned_work() { 1870 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1871 let unsigned = outbox 1872 .enqueue_operation(operation_input( 1873 post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "unsigned"), 1874 900, 1875 )) 1876 .await 1877 .expect("unsigned enqueue"); 1878 let signed_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "signed-only"); 1879 let signed_event = 1880 radroots_nostr_sign_frozen_draft(&fixture_keys(), &signed_draft).expect("signed event"); 1881 let signed = outbox 1882 .enqueue_signed_operation(signed_operation_input(signed_draft, signed_event, 1_000)) 1883 .await 1884 .expect("signed enqueue"); 1885 1886 let claimed = outbox 1887 .claim_next_ready_signed_event("publisher-a", "claim-a", 2_000, 1_000) 1888 .await 1889 .expect("claim") 1890 .expect("claimed"); 1891 assert_eq!(claimed.outbox_event_id, signed.outbox_event_id); 1892 assert_eq!(claimed.state, RadrootsOutboxEventState::Publishing); 1893 assert!(claimed.signed_event.is_some()); 1894 1895 let unsigned_event = outbox 1896 .get_event(unsigned.outbox_event_id) 1897 .await 1898 .expect("unsigned event") 1899 .expect("unsigned event"); 1900 assert_eq!(unsigned_event.state, RadrootsOutboxEventState::DraftQueued); 1901 assert!(unsigned_event.claim_token.is_none()); 1902 1903 let signing_claim = outbox 1904 .claim_next_ready_event("signer-a", "sign-a", 2_100, 1_100) 1905 .await 1906 .expect("sign claim") 1907 .expect("sign claim"); 1908 assert_eq!(signing_claim.outbox_event_id, unsigned.outbox_event_id); 1909 assert_eq!(signing_claim.state, RadrootsOutboxEventState::Signing); 1910 assert!(signing_claim.signed_event.is_none()); 1911 } 1912 1913 #[tokio::test] 1914 async fn enqueue_signed_operation_idempotency_reuses_existing_signed_record() { 1915 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1916 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "idem-signed"); 1917 let signed_event = 1918 radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); 1919 1920 let first = outbox 1921 .enqueue_signed_operation( 1922 signed_operation_input(draft.clone(), signed_event.clone(), 1_000) 1923 .with_idempotency_key("signed-idem"), 1924 ) 1925 .await 1926 .expect("first"); 1927 let second = outbox 1928 .enqueue_signed_operation( 1929 signed_operation_input(draft.clone(), signed_event, 1_100) 1930 .with_idempotency_key("signed-idem"), 1931 ) 1932 .await 1933 .expect("second"); 1934 1935 assert_eq!(first.status, RadrootsOutboxEnqueueStatus::Inserted); 1936 assert_eq!(second.status, RadrootsOutboxEnqueueStatus::Existing); 1937 assert_eq!(first.operation_id, second.operation_id); 1938 assert_eq!(first.outbox_event_id, second.outbox_event_id); 1939 assert_eq!(table_count(&outbox, "outbox_operation").await, 1); 1940 assert_eq!(table_count(&outbox, "outbox_event").await, 1); 1941 1942 let changed_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "changed"); 1943 let changed_signed = 1944 radroots_nostr_sign_frozen_draft(&fixture_keys(), &changed_draft).expect("signed"); 1945 let conflict = outbox 1946 .enqueue_signed_operation( 1947 signed_operation_input(changed_draft, changed_signed, 1_200) 1948 .with_idempotency_key("signed-idem"), 1949 ) 1950 .await 1951 .expect_err("conflict"); 1952 assert!(matches!( 1953 conflict, 1954 RadrootsOutboxError::IdempotencyConflict { .. } 1955 )); 1956 } 1957 1958 #[tokio::test] 1959 async fn enqueue_signed_operation_rejects_mismatched_signed_event() { 1960 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1961 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "trusted"); 1962 let other_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "other"); 1963 let signed_event = 1964 radroots_nostr_sign_frozen_draft(&fixture_keys(), &other_draft).expect("signed event"); 1965 1966 let error = outbox 1967 .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000)) 1968 .await 1969 .expect_err("mismatch"); 1970 1971 assert!(matches!( 1972 error, 1973 RadrootsOutboxError::SignedEventDraftMismatch(_) 1974 )); 1975 assert_eq!(table_count(&outbox, "outbox_operation").await, 0); 1976 assert_eq!(table_count(&outbox, "outbox_event").await, 0); 1977 } 1978 1979 #[tokio::test] 1980 async fn enqueue_signed_operation_rejects_event_id_mismatch() { 1981 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 1982 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "bad-id"); 1983 let mut signed_event = 1984 radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); 1985 signed_event.id = hex_64('f'); 1986 1987 let error = outbox 1988 .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000)) 1989 .await 1990 .expect_err("mismatch"); 1991 1992 assert!(matches!( 1993 error, 1994 RadrootsOutboxError::SignedEventDraftMismatch(_) 1995 )); 1996 assert_eq!(table_count(&outbox, "outbox_operation").await, 0); 1997 assert_eq!(table_count(&outbox, "outbox_event").await, 0); 1998 } 1999 2000 #[tokio::test] 2001 async fn complete_signing_rejects_signed_event_id_mismatch() { 2002 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2003 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "claimed draft"); 2004 let receipt = outbox 2005 .enqueue_operation(operation_input(draft, 1_000)) 2006 .await 2007 .expect("enqueue"); 2008 let claimed = outbox 2009 .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) 2010 .await 2011 .expect("claim") 2012 .expect("claim"); 2013 let other_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "other draft"); 2014 let signed_event = 2015 radroots_nostr_sign_frozen_draft(&fixture_keys(), &other_draft).expect("signed event"); 2016 2017 let err = outbox 2018 .complete_signing( 2019 receipt.outbox_event_id, 2020 claimed.claim_token.as_str(), 2021 signed_event.clone(), 2022 1_100, 2023 ) 2024 .await 2025 .expect_err("event id mismatch"); 2026 2027 assert_eq!( 2028 err.to_string(), 2029 format!( 2030 "Signed event ID mismatch: expected {}, got {}", 2031 receipt.expected_event_id, signed_event.id 2032 ) 2033 ); 2034 } 2035 2036 #[tokio::test] 2037 async fn claim_token_guards_updates_and_expired_signing_claim_recovers() { 2038 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2039 let draft = post_draft(hex_64('a').as_str(), "hello"); 2040 let receipt = outbox 2041 .enqueue_operation(operation_input(draft, 1_000)) 2042 .await 2043 .expect("enqueue"); 2044 2045 let claimed = outbox 2046 .claim_next_ready_event("worker-a", "claim-a", 1_100, 1_000) 2047 .await 2048 .expect("claim") 2049 .expect("claimed event"); 2050 assert_eq!(claimed.state, RadrootsOutboxEventState::Signing); 2051 assert_eq!(claimed.attempt_count, 1); 2052 assert_eq!( 2053 claimed.target_relays, 2054 vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()] 2055 ); 2056 2057 let unavailable = outbox 2058 .claim_next_ready_event("worker-b", "claim-b", 1_100, 1_050) 2059 .await 2060 .expect("claim"); 2061 assert!(unavailable.is_none()); 2062 2063 let wrong_token = outbox 2064 .mark_sign_retryable( 2065 receipt.outbox_event_id, 2066 "claim-b", 2067 "sign failed", 2068 1_200, 2069 1_100, 2070 ) 2071 .await 2072 .expect_err("wrong token"); 2073 assert!(matches!( 2074 wrong_token, 2075 RadrootsOutboxError::ClaimTokenMismatch { .. } 2076 )); 2077 2078 let recovered = outbox.recover_expired_claims(1_101).await.expect("recover"); 2079 assert_eq!(recovered, 1); 2080 2081 let event = outbox 2082 .get_event(receipt.outbox_event_id) 2083 .await 2084 .expect("event") 2085 .expect("event"); 2086 assert_eq!(event.state, RadrootsOutboxEventState::SignRetryable); 2087 assert_eq!(event.attempt_count, 1); 2088 assert!(event.claim_token.is_none()); 2089 2090 let reclaimed = outbox 2091 .claim_next_ready_event("worker-b", "claim-b", 1_400, 1_200) 2092 .await 2093 .expect("claim") 2094 .expect("reclaimed"); 2095 assert_eq!(reclaimed.state, RadrootsOutboxEventState::Signing); 2096 assert_eq!(reclaimed.attempt_count, 2); 2097 } 2098 2099 #[tokio::test] 2100 async fn claimed_mutations_reject_stale_tokens_without_state_changes() { 2101 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2102 let event_store = RadrootsEventStore::open_memory() 2103 .await 2104 .expect("event store"); 2105 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "hello"); 2106 let receipt = outbox 2107 .enqueue_operation(operation_input(draft, 1_000)) 2108 .await 2109 .expect("enqueue"); 2110 2111 let first_claim = outbox 2112 .claim_next_ready_event("worker-a", "claim-a", 1_100, 1_000) 2113 .await 2114 .expect("claim") 2115 .expect("claim"); 2116 let signed = complete_claimed_signing(&outbox, &first_claim, &fixture_keys(), 1_050).await; 2117 outbox.recover_expired_claims(1_101).await.expect("recover"); 2118 let second_claim = outbox 2119 .claim_next_ready_event("worker-b", "claim-b", 1_500, 1_200) 2120 .await 2121 .expect("claim") 2122 .expect("claim"); 2123 assert_eq!(second_claim.state, RadrootsOutboxEventState::Publishing); 2124 2125 let retry_with_stale_token = outbox 2126 .mark_publish_retryable( 2127 receipt.outbox_event_id, 2128 "claim-a", 2129 "stale retry", 2130 1_600, 2131 1_300, 2132 ) 2133 .await 2134 .expect_err("stale retry token"); 2135 assert!(matches!( 2136 retry_with_stale_token, 2137 RadrootsOutboxError::ClaimTokenMismatch { .. } 2138 )); 2139 2140 let relay_with_stale_token = outbox 2141 .mark_relay_accepted(receipt.outbox_event_id, "claim-a", RELAY_PRIMARY_WSS, 1_300) 2142 .await 2143 .expect_err("stale relay token"); 2144 assert!(matches!( 2145 relay_with_stale_token, 2146 RadrootsOutboxError::ClaimTokenMismatch { .. } 2147 )); 2148 2149 let ingest_with_current_token = outbox 2150 .ingest_signed_event_local(&event_store, receipt.outbox_event_id, "claim-b", 1_350) 2151 .await 2152 .expect("current ingest"); 2153 assert_eq!(ingest_with_current_token.event_id, signed.id); 2154 2155 let event = outbox 2156 .get_event(receipt.outbox_event_id) 2157 .await 2158 .expect("event") 2159 .expect("event"); 2160 assert_eq!(event.state, RadrootsOutboxEventState::Publishing); 2161 assert_eq!(event.claim_token.as_deref(), Some("claim-b")); 2162 2163 let statuses = outbox 2164 .relay_statuses(receipt.outbox_event_id) 2165 .await 2166 .expect("statuses"); 2167 assert!( 2168 statuses 2169 .iter() 2170 .all(|status| status.status == RadrootsOutboxRelayStatus::Pending) 2171 ); 2172 } 2173 2174 #[tokio::test] 2175 async fn claimed_update_paths_report_missing_events_and_wrong_tokens() { 2176 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2177 2178 let missing = outbox 2179 .mark_sign_retryable(999, "missing-claim", "missing", 1_200, 1_100) 2180 .await 2181 .expect_err("missing event"); 2182 assert!(matches!(missing, RadrootsOutboxError::EventNotFound(999))); 2183 let missing_publish = outbox 2184 .complete_publish_attempt(999, "missing-claim", "retryable", "terminal", 1_300, 1_200) 2185 .await 2186 .expect_err("missing publish event"); 2187 assert!(matches!( 2188 missing_publish, 2189 RadrootsOutboxError::EventNotFound(999) 2190 )); 2191 2192 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "wrong token"); 2193 let receipt = outbox 2194 .enqueue_operation(operation_input(draft, 1_000)) 2195 .await 2196 .expect("enqueue"); 2197 outbox 2198 .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) 2199 .await 2200 .expect("claim") 2201 .expect("claim"); 2202 2203 let wrong_token = outbox 2204 .complete_publish_attempt( 2205 receipt.outbox_event_id, 2206 "claim-b", 2207 "retryable", 2208 "terminal", 2209 2_500, 2210 2_100, 2211 ) 2212 .await 2213 .expect_err("wrong token"); 2214 assert!(matches!( 2215 wrong_token, 2216 RadrootsOutboxError::ClaimTokenMismatch { .. } 2217 )); 2218 } 2219 2220 #[tokio::test] 2221 async fn sign_retryable_update_succeeds_and_reports_ignored_update_with_current_token() { 2222 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2223 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "retryable success"); 2224 let receipt = outbox 2225 .enqueue_operation(operation_input(draft, 1_000)) 2226 .await 2227 .expect("enqueue"); 2228 outbox 2229 .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) 2230 .await 2231 .expect("claim") 2232 .expect("claim"); 2233 2234 outbox 2235 .mark_sign_retryable(receipt.outbox_event_id, "claim-a", "retry", 1_500, 1_100) 2236 .await 2237 .expect("mark retryable"); 2238 let event = outbox 2239 .get_event(receipt.outbox_event_id) 2240 .await 2241 .expect("event") 2242 .expect("event"); 2243 assert_eq!(event.state, RadrootsOutboxEventState::SignRetryable); 2244 2245 let ignored_outbox = RadrootsOutbox::open_memory().await.expect("ignored open"); 2246 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored retryable"); 2247 let ignored_receipt = ignored_outbox 2248 .enqueue_operation(operation_input(draft, 2_000)) 2249 .await 2250 .expect("enqueue ignored"); 2251 ignored_outbox 2252 .claim_next_ready_event("worker-b", "claim-b", 3_000, 2_000) 2253 .await 2254 .expect("claim ignored") 2255 .expect("claim ignored"); 2256 sqlx::query( 2257 "CREATE TEMP TRIGGER ignore_sign_retry_update BEFORE UPDATE OF state ON outbox_event WHEN NEW.state = 'sign_retryable' BEGIN SELECT RAISE(IGNORE); END", 2258 ) 2259 .execute(ignored_outbox.pool()) 2260 .await 2261 .expect("retry trigger"); 2262 let ignored = ignored_outbox 2263 .mark_sign_retryable( 2264 ignored_receipt.outbox_event_id, 2265 "claim-b", 2266 "ignored retry", 2267 2_500, 2268 2_100, 2269 ) 2270 .await 2271 .expect_err("ignored retryable update"); 2272 assert!(matches!( 2273 ignored, 2274 RadrootsOutboxError::ClaimTokenMismatch { .. } 2275 )); 2276 } 2277 2278 #[tokio::test] 2279 async fn ignored_sqlite_updates_preserve_race_guards() { 2280 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2281 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored claim"); 2282 outbox 2283 .enqueue_operation(operation_input(draft, 1_000)) 2284 .await 2285 .expect("enqueue"); 2286 let signed_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored signed claim"); 2287 let signed_event = 2288 radroots_nostr_sign_frozen_draft(&fixture_keys(), &signed_draft).expect("signed event"); 2289 outbox 2290 .enqueue_signed_operation(signed_operation_input(signed_draft, signed_event, 1_001)) 2291 .await 2292 .expect("signed enqueue"); 2293 sqlx::query( 2294 "CREATE TEMP TRIGGER ignore_claim_update BEFORE UPDATE OF claim_token ON outbox_event WHEN NEW.claim_token IN ('blocked-claim', 'blocked-signed') BEGIN SELECT RAISE(IGNORE); END", 2295 ) 2296 .execute(outbox.pool()) 2297 .await 2298 .expect("claim trigger"); 2299 2300 assert!( 2301 outbox 2302 .claim_next_ready_event("worker-a", "blocked-claim", 2_000, 1_000) 2303 .await 2304 .expect("claim") 2305 .is_none() 2306 ); 2307 assert!( 2308 outbox 2309 .claim_next_ready_signed_event("publisher-a", "blocked-signed", 2_000, 1_001) 2310 .await 2311 .expect("claim signed") 2312 .is_none() 2313 ); 2314 2315 let publish_outbox = RadrootsOutbox::open_memory().await.expect("publish open"); 2316 let publish_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored publish"); 2317 let publish_signed = radroots_nostr_sign_frozen_draft(&fixture_keys(), &publish_draft) 2318 .expect("signed event"); 2319 let publish_receipt = publish_outbox 2320 .enqueue_signed_operation(signed_operation_input(publish_draft, publish_signed, 1_100)) 2321 .await 2322 .expect("publish enqueue"); 2323 let publish_claim = publish_outbox 2324 .claim_next_ready_signed_event("publisher-b", "publish-claim", 3_000, 1_100) 2325 .await 2326 .expect("claim publish") 2327 .expect("claim publish"); 2328 publish_outbox 2329 .mark_relay_accepted( 2330 publish_receipt.outbox_event_id, 2331 publish_claim.claim_token.as_str(), 2332 RELAY_PRIMARY_WSS, 2333 1_150, 2334 ) 2335 .await 2336 .expect("primary accepted"); 2337 publish_outbox 2338 .mark_relay_accepted( 2339 publish_receipt.outbox_event_id, 2340 publish_claim.claim_token.as_str(), 2341 RELAY_SECONDARY_WSS, 2342 1_160, 2343 ) 2344 .await 2345 .expect("secondary accepted"); 2346 sqlx::query( 2347 "CREATE TEMP TRIGGER ignore_published_update BEFORE UPDATE OF state ON outbox_event WHEN NEW.state = 'published' BEGIN SELECT RAISE(IGNORE); END", 2348 ) 2349 .execute(publish_outbox.pool()) 2350 .await 2351 .expect("publish trigger"); 2352 let ignored_publish = publish_outbox 2353 .complete_publish_attempt( 2354 publish_receipt.outbox_event_id, 2355 publish_claim.claim_token.as_str(), 2356 "retryable", 2357 "terminal", 2358 2_500, 2359 1_200, 2360 ) 2361 .await 2362 .expect_err("ignored publish update"); 2363 assert!(matches!( 2364 ignored_publish, 2365 RadrootsOutboxError::ClaimTokenMismatch { .. } 2366 )); 2367 2368 let cancel_outbox = RadrootsOutbox::open_memory().await.expect("cancel open"); 2369 let cancel_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ignored cancel"); 2370 let cancel_receipt = cancel_outbox 2371 .enqueue_operation(operation_input(cancel_draft, 1_200)) 2372 .await 2373 .expect("cancel enqueue"); 2374 let cancel_claim = cancel_outbox 2375 .claim_next_ready_event("worker-b", "cancel-claim", 3_000, 1_200) 2376 .await 2377 .expect("cancel claim") 2378 .expect("cancel claim"); 2379 sqlx::query( 2380 "CREATE TEMP TRIGGER ignore_cancel_update BEFORE UPDATE OF state ON outbox_event WHEN NEW.state = 'cancelled' BEGIN SELECT RAISE(IGNORE); END", 2381 ) 2382 .execute(cancel_outbox.pool()) 2383 .await 2384 .expect("cancel trigger"); 2385 let ignored_cancel = cancel_outbox 2386 .cancel_claimed_event( 2387 cancel_receipt.outbox_event_id, 2388 cancel_claim.claim_token.as_str(), 2389 1_300, 2390 ) 2391 .await 2392 .expect_err("ignored cancel update"); 2393 assert!(matches!( 2394 ignored_cancel, 2395 RadrootsOutboxError::ClaimTokenMismatch { .. } 2396 )); 2397 } 2398 2399 #[tokio::test] 2400 async fn signed_events_are_reused_after_claim_recovery() { 2401 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2402 let (receipt, claimed) = enqueue_signed_fixture(&outbox).await; 2403 let keys = fixture_keys(); 2404 2405 let signed = complete_claimed_signing(&outbox, &claimed, &keys, 1_100).await; 2406 assert_eq!(signed.id, receipt.expected_event_id); 2407 2408 let recovered = outbox.recover_expired_claims(2_001).await.expect("recover"); 2409 assert_eq!(recovered, 1); 2410 2411 let publish_claim = outbox 2412 .claim_next_ready_event("publisher-a", "claim-b", 3_000, 2_100) 2413 .await 2414 .expect("claim") 2415 .expect("publish claim"); 2416 assert_eq!(publish_claim.state, RadrootsOutboxEventState::Publishing); 2417 assert_eq!(publish_claim.signed_event.as_ref(), Some(&signed)); 2418 2419 let reused = complete_claimed_signing(&outbox, &publish_claim, &keys, 2_200).await; 2420 assert_eq!(reused, signed); 2421 2422 let event = outbox 2423 .get_event(receipt.outbox_event_id) 2424 .await 2425 .expect("event") 2426 .expect("event"); 2427 assert_eq!(event.state, RadrootsOutboxEventState::Publishing); 2428 assert_eq!(event.signed_event.as_ref(), Some(&signed)); 2429 } 2430 2431 #[tokio::test] 2432 async fn local_signed_event_ingest_is_idempotent_without_relay_observation() { 2433 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2434 let event_store = RadrootsEventStore::open_memory() 2435 .await 2436 .expect("event store"); 2437 let (receipt, claimed) = enqueue_signed_fixture(&outbox).await; 2438 let keys = fixture_keys(); 2439 let signed = complete_claimed_signing(&outbox, &claimed, &keys, 1_100).await; 2440 2441 let first = outbox 2442 .ingest_signed_event_local(&event_store, receipt.outbox_event_id, "claim-a", 1_200) 2443 .await 2444 .expect("first ingest"); 2445 assert_eq!(first.outbox_event_id, receipt.outbox_event_id); 2446 assert_eq!(first.event_id, signed.id); 2447 assert!(!first.already_ingested); 2448 assert!(first.event_store_inserted); 2449 2450 let stored = event_store 2451 .get_event(signed.id.as_str()) 2452 .await 2453 .expect("stored event"); 2454 assert!(stored.is_some()); 2455 2456 let observations = event_store 2457 .observations_for_event(signed.id.as_str()) 2458 .await 2459 .expect("observations"); 2460 assert!(observations.is_empty()); 2461 2462 let second = outbox 2463 .ingest_signed_event_local(&event_store, receipt.outbox_event_id, "claim-a", 1_300) 2464 .await 2465 .expect("second ingest"); 2466 assert!(second.already_ingested); 2467 assert!(!second.event_store_inserted); 2468 2469 let event = outbox 2470 .get_event(receipt.outbox_event_id) 2471 .await 2472 .expect("event") 2473 .expect("event"); 2474 assert_eq!(event.state, RadrootsOutboxEventState::Publishing); 2475 assert!(event.event_store_ingested); 2476 assert!(event.event_store_inserted); 2477 assert_eq!(event.event_store_ingested_at_ms, Some(1_200)); 2478 2479 let recovered = outbox.recover_expired_claims(2_001).await.expect("recover"); 2480 assert_eq!(recovered, 1); 2481 2482 let event = outbox 2483 .get_event(receipt.outbox_event_id) 2484 .await 2485 .expect("event") 2486 .expect("event"); 2487 assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable); 2488 assert!(event.claim_token.is_none()); 2489 2490 let reclaimed = outbox 2491 .claim_next_ready_event("publisher-a", "claim-b", 3_000, 2_100) 2492 .await 2493 .expect("claim") 2494 .expect("publish claim"); 2495 assert_eq!(reclaimed.state, RadrootsOutboxEventState::Publishing); 2496 assert_eq!(reclaimed.signed_event.as_ref(), Some(&signed)); 2497 } 2498 2499 #[tokio::test] 2500 async fn terminal_and_cancelled_claimed_events_are_not_reclaimable() { 2501 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2502 let terminal_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "terminal"); 2503 let terminal_receipt = outbox 2504 .enqueue_operation(operation_input(terminal_draft, 1_000)) 2505 .await 2506 .expect("enqueue"); 2507 outbox 2508 .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000) 2509 .await 2510 .expect("claim") 2511 .expect("claim"); 2512 outbox 2513 .mark_publish_failed_terminal( 2514 terminal_receipt.outbox_event_id, 2515 "claim-a", 2516 "terminal failure", 2517 1_100, 2518 ) 2519 .await 2520 .expect("terminal"); 2521 2522 let terminal_event = outbox 2523 .get_event(terminal_receipt.outbox_event_id) 2524 .await 2525 .expect("event") 2526 .expect("event"); 2527 assert_eq!( 2528 terminal_event.state, 2529 RadrootsOutboxEventState::FailedTerminal 2530 ); 2531 assert!(terminal_event.state.is_terminal()); 2532 assert!(terminal_event.claim_token.is_none()); 2533 let terminal_operation = outbox 2534 .get_operation(terminal_receipt.operation_id) 2535 .await 2536 .expect("operation") 2537 .expect("operation"); 2538 assert_eq!( 2539 terminal_operation.status, 2540 RadrootsOutboxOperationStatus::FailedTerminal 2541 ); 2542 assert!( 2543 outbox 2544 .claim_next_ready_event("worker-b", "claim-b", 2_000, 1_200) 2545 .await 2546 .expect("claim") 2547 .is_none() 2548 ); 2549 2550 let cancelled_draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "cancelled"); 2551 let cancelled_receipt = outbox 2552 .enqueue_operation(operation_input(cancelled_draft, 2_000)) 2553 .await 2554 .expect("enqueue"); 2555 outbox 2556 .claim_next_ready_event("worker-c", "claim-c", 3_000, 2_000) 2557 .await 2558 .expect("claim") 2559 .expect("claim"); 2560 outbox 2561 .cancel_claimed_event(cancelled_receipt.outbox_event_id, "claim-c", 2_100) 2562 .await 2563 .expect("cancel"); 2564 let cancelled_event = outbox 2565 .get_event(cancelled_receipt.outbox_event_id) 2566 .await 2567 .expect("event") 2568 .expect("event"); 2569 assert_eq!(cancelled_event.state, RadrootsOutboxEventState::Cancelled); 2570 assert!(cancelled_event.state.is_terminal()); 2571 assert!(cancelled_event.claim_token.is_none()); 2572 let cancelled_operation = outbox 2573 .get_operation(cancelled_receipt.operation_id) 2574 .await 2575 .expect("operation") 2576 .expect("operation"); 2577 assert_eq!( 2578 cancelled_operation.status, 2579 RadrootsOutboxOperationStatus::Cancelled 2580 ); 2581 assert!( 2582 outbox 2583 .claim_next_ready_event("worker-d", "claim-d", 3_000, 2_200) 2584 .await 2585 .expect("claim") 2586 .is_none() 2587 ); 2588 } 2589 2590 #[tokio::test] 2591 async fn terminal_publish_attempt_fails_operation_when_quorum_cannot_be_met() { 2592 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2593 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "terminal quorum"); 2594 let receipt = outbox 2595 .enqueue_operation(RadrootsOutboxOperationInput::new( 2596 "publish_post", 2597 draft, 2598 vec![ 2599 RELAY_PRIMARY_WSS.to_owned(), 2600 RELAY_SECONDARY_WSS.to_owned(), 2601 "wss://relay-3.example.com".to_owned(), 2602 ], 2603 1_000, 2604 )) 2605 .await 2606 .expect("enqueue"); 2607 let sign_claim = outbox 2608 .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) 2609 .await 2610 .expect("claim") 2611 .expect("claim"); 2612 complete_claimed_signing(&outbox, &sign_claim, &fixture_keys(), 1_100).await; 2613 outbox.recover_expired_claims(2_001).await.expect("recover"); 2614 outbox 2615 .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) 2616 .await 2617 .expect("claim") 2618 .expect("claim"); 2619 outbox 2620 .set_publish_quorum(receipt.outbox_event_id, "publish-a", 3, 2_200) 2621 .await 2622 .expect("quorum"); 2623 outbox 2624 .mark_relay_accepted( 2625 receipt.outbox_event_id, 2626 "publish-a", 2627 RELAY_PRIMARY_WSS, 2628 2_250, 2629 ) 2630 .await 2631 .expect("accepted"); 2632 outbox 2633 .mark_relay_failed_terminal( 2634 receipt.outbox_event_id, 2635 "publish-a", 2636 RELAY_SECONDARY_WSS, 2637 "restricted: denied", 2638 2_260, 2639 ) 2640 .await 2641 .expect("terminal"); 2642 outbox 2643 .mark_relay_failed_terminal( 2644 receipt.outbox_event_id, 2645 "publish-a", 2646 "wss://relay-3.example.com", 2647 "blocked: denied", 2648 2_270, 2649 ) 2650 .await 2651 .expect("terminal"); 2652 2653 let state = outbox 2654 .complete_publish_attempt( 2655 receipt.outbox_event_id, 2656 "publish-a", 2657 "retryable", 2658 "terminal", 2659 2_500, 2660 2_300, 2661 ) 2662 .await 2663 .expect("complete attempt"); 2664 assert_eq!(state, RadrootsOutboxEventState::FailedTerminal); 2665 let event = outbox 2666 .get_event(receipt.outbox_event_id) 2667 .await 2668 .expect("event") 2669 .expect("event"); 2670 assert_eq!(event.state, RadrootsOutboxEventState::FailedTerminal); 2671 assert!(event.claim_token.is_none()); 2672 assert_eq!(event.last_error.as_deref(), Some("terminal")); 2673 let operation = outbox 2674 .get_operation(receipt.operation_id) 2675 .await 2676 .expect("operation") 2677 .expect("operation"); 2678 assert_eq!( 2679 operation.status, 2680 RadrootsOutboxOperationStatus::FailedTerminal 2681 ); 2682 assert!( 2683 outbox 2684 .claim_next_ready_event("publisher", "publish-b", 4_000, 2_400) 2685 .await 2686 .expect("claim") 2687 .is_none() 2688 ); 2689 } 2690 2691 #[tokio::test] 2692 async fn publish_attempt_marks_event_published_after_acceptance_quorum() { 2693 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2694 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "published quorum"); 2695 let signed_event = 2696 radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); 2697 let receipt = outbox 2698 .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000)) 2699 .await 2700 .expect("enqueue"); 2701 let claimed = outbox 2702 .claim_next_ready_signed_event("publisher", "publish-a", 3_000, 1_000) 2703 .await 2704 .expect("claim") 2705 .expect("claim"); 2706 2707 outbox 2708 .mark_relay_accepted( 2709 receipt.outbox_event_id, 2710 claimed.claim_token.as_str(), 2711 RELAY_PRIMARY_WSS, 2712 1_100, 2713 ) 2714 .await 2715 .expect("primary accepted"); 2716 outbox 2717 .mark_relay_accepted( 2718 receipt.outbox_event_id, 2719 claimed.claim_token.as_str(), 2720 RELAY_SECONDARY_WSS, 2721 1_110, 2722 ) 2723 .await 2724 .expect("secondary accepted"); 2725 2726 let state = outbox 2727 .complete_publish_attempt( 2728 receipt.outbox_event_id, 2729 claimed.claim_token.as_str(), 2730 "retryable", 2731 "terminal", 2732 2_500, 2733 1_200, 2734 ) 2735 .await 2736 .expect("complete attempt"); 2737 let event = outbox 2738 .get_event(receipt.outbox_event_id) 2739 .await 2740 .expect("event") 2741 .expect("event"); 2742 let operation = outbox 2743 .get_operation(receipt.operation_id) 2744 .await 2745 .expect("operation") 2746 .expect("operation"); 2747 2748 assert_eq!(state, RadrootsOutboxEventState::Published); 2749 assert_eq!(event.state, RadrootsOutboxEventState::Published); 2750 assert_eq!(event.next_attempt_after_ms, 1_200); 2751 assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete); 2752 } 2753 2754 #[tokio::test] 2755 async fn publish_attempt_remains_retryable_when_relay_work_remains() { 2756 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2757 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "retryable quorum"); 2758 let signed_event = 2759 radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); 2760 let receipt = outbox 2761 .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000)) 2762 .await 2763 .expect("enqueue"); 2764 let claimed = outbox 2765 .claim_next_ready_signed_event("publisher", "publish-a", 3_000, 1_000) 2766 .await 2767 .expect("claim") 2768 .expect("claim"); 2769 2770 outbox 2771 .mark_relay_failed_retryable( 2772 receipt.outbox_event_id, 2773 claimed.claim_token.as_str(), 2774 RELAY_PRIMARY_WSS, 2775 "timeout", 2776 1_100, 2777 ) 2778 .await 2779 .expect("retryable relay"); 2780 2781 let state = outbox 2782 .complete_publish_attempt( 2783 receipt.outbox_event_id, 2784 claimed.claim_token.as_str(), 2785 "retryable error", 2786 "terminal", 2787 2_500, 2788 1_200, 2789 ) 2790 .await 2791 .expect("complete attempt"); 2792 let event = outbox 2793 .get_event(receipt.outbox_event_id) 2794 .await 2795 .expect("event") 2796 .expect("event"); 2797 let operation = outbox 2798 .get_operation(receipt.operation_id) 2799 .await 2800 .expect("operation") 2801 .expect("operation"); 2802 2803 assert_eq!(state, RadrootsOutboxEventState::PublishRetryable); 2804 assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable); 2805 assert_eq!(event.last_error.as_deref(), Some("retryable error")); 2806 assert_eq!(event.next_attempt_after_ms, 2_500); 2807 assert_eq!(operation.status, RadrootsOutboxOperationStatus::Queued); 2808 } 2809 2810 #[tokio::test] 2811 async fn publish_attempt_remains_retryable_when_pending_relay_work_remains() { 2812 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2813 let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "pending quorum"); 2814 let signed_event = 2815 radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event"); 2816 let receipt = outbox 2817 .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000)) 2818 .await 2819 .expect("enqueue"); 2820 let claimed = outbox 2821 .claim_next_ready_signed_event("publisher", "publish-a", 3_000, 1_000) 2822 .await 2823 .expect("claim") 2824 .expect("claim"); 2825 2826 outbox 2827 .mark_relay_accepted( 2828 receipt.outbox_event_id, 2829 claimed.claim_token.as_str(), 2830 RELAY_PRIMARY_WSS, 2831 1_100, 2832 ) 2833 .await 2834 .expect("accepted relay"); 2835 2836 let state = outbox 2837 .complete_publish_attempt( 2838 receipt.outbox_event_id, 2839 claimed.claim_token.as_str(), 2840 "pending relay", 2841 "terminal", 2842 2_500, 2843 1_200, 2844 ) 2845 .await 2846 .expect("complete attempt"); 2847 let event = outbox 2848 .get_event(receipt.outbox_event_id) 2849 .await 2850 .expect("event") 2851 .expect("event"); 2852 2853 assert_eq!(state, RadrootsOutboxEventState::PublishRetryable); 2854 assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable); 2855 assert_eq!(event.last_error.as_deref(), Some("pending relay")); 2856 assert_eq!(event.next_attempt_after_ms, 2_500); 2857 } 2858 2859 #[tokio::test] 2860 async fn smoke_outbox_claim_cancel_cycles_complete_one_thousand_events() { 2861 let outbox = RadrootsOutbox::open_memory().await.expect("open"); 2862 let mut receipts = Vec::new(); 2863 for index in 0..1_000 { 2864 let draft = post_draft( 2865 FIXTURE_ALICE_PUBLIC_KEY_HEX, 2866 format!("claim-cycle-{index}").as_str(), 2867 ); 2868 let receipt = outbox 2869 .enqueue_operation(operation_input(draft, 1_000 + index)) 2870 .await 2871 .expect("enqueue"); 2872 receipts.push(receipt); 2873 } 2874 2875 for index in 0..1_000 { 2876 let claim_token = format!("claim-{index}"); 2877 let claimed = outbox 2878 .claim_next_ready_event( 2879 "smoke-worker", 2880 claim_token.as_str(), 2881 10_000 + index, 2882 2_000 + index, 2883 ) 2884 .await 2885 .expect("claim") 2886 .expect("claimed"); 2887 outbox 2888 .cancel_claimed_event(claimed.outbox_event_id, claim_token.as_str(), 3_000 + index) 2889 .await 2890 .expect("cancel"); 2891 } 2892 2893 for receipt in receipts { 2894 let event = outbox 2895 .get_event(receipt.outbox_event_id) 2896 .await 2897 .expect("event") 2898 .expect("event"); 2899 assert_eq!(event.state, RadrootsOutboxEventState::Cancelled); 2900 assert!(event.claim_token.is_none()); 2901 let operation = outbox 2902 .get_operation(receipt.operation_id) 2903 .await 2904 .expect("operation") 2905 .expect("operation"); 2906 assert_eq!(operation.status, RadrootsOutboxOperationStatus::Cancelled); 2907 } 2908 2909 assert!( 2910 outbox 2911 .claim_next_ready_event("smoke-worker", "claim-final", 20_000, 20_000) 2912 .await 2913 .expect("claim") 2914 .is_none() 2915 ); 2916 } 2917 }