outbox.rs (21985B)
1 use std::collections::{BTreeMap, BTreeSet}; 2 3 use crate::{GroupError, GroupId}; 4 use serde::{Deserialize, Serialize}; 5 use tangle_protocol::{EventId, PublicKeyHex, UnixTimestamp}; 6 7 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] 8 pub enum GroupOutboxEffect { 9 MetadataSnapshot, 10 AdminListSnapshot, 11 MemberListSnapshot, 12 RoleListSnapshot, 13 State39004Snapshot, 14 JoinAccepted, 15 LeaveAccepted, 16 } 17 18 impl GroupOutboxEffect { 19 pub fn as_str(self) -> &'static str { 20 match self { 21 Self::MetadataSnapshot => "metadata_snapshot", 22 Self::AdminListSnapshot => "admin_list_snapshot", 23 Self::MemberListSnapshot => "member_list_snapshot", 24 Self::RoleListSnapshot => "role_list_snapshot", 25 Self::State39004Snapshot => "state_39004_snapshot", 26 Self::JoinAccepted => "join_accepted", 27 Self::LeaveAccepted => "leave_accepted", 28 } 29 } 30 31 pub fn from_label(value: &str) -> Result<Self, GroupError> { 32 match value { 33 "metadata_snapshot" => Ok(Self::MetadataSnapshot), 34 "admin_list_snapshot" => Ok(Self::AdminListSnapshot), 35 "member_list_snapshot" => Ok(Self::MemberListSnapshot), 36 "role_list_snapshot" => Ok(Self::RoleListSnapshot), 37 "state_39004_snapshot" => Ok(Self::State39004Snapshot), 38 "join_accepted" => Ok(Self::JoinAccepted), 39 "leave_accepted" => Ok(Self::LeaveAccepted), 40 _ => Err(GroupError::internal(format!( 41 "unknown outbox effect {value}" 42 ))), 43 } 44 } 45 } 46 47 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] 48 pub struct GroupOutboxKey { 49 source_event_id: EventId, 50 effect: GroupOutboxEffect, 51 group_id: GroupId, 52 target_pubkey: Option<PublicKeyHex>, 53 } 54 55 impl GroupOutboxKey { 56 pub fn new( 57 source_event_id: EventId, 58 effect: GroupOutboxEffect, 59 group_id: GroupId, 60 target_pubkey: Option<PublicKeyHex>, 61 ) -> Self { 62 Self { 63 source_event_id, 64 effect, 65 group_id, 66 target_pubkey, 67 } 68 } 69 70 pub fn source_event_id(&self) -> &EventId { 71 &self.source_event_id 72 } 73 74 pub fn effect(&self) -> GroupOutboxEffect { 75 self.effect 76 } 77 78 pub fn group_id(&self) -> &GroupId { 79 &self.group_id 80 } 81 82 pub fn target_pubkey(&self) -> Option<&PublicKeyHex> { 83 self.target_pubkey.as_ref() 84 } 85 86 pub fn storage_key(&self) -> Vec<u8> { 87 let mut key = Vec::new(); 88 key.extend_from_slice(self.source_event_id.as_str().as_bytes()); 89 key.push(0); 90 key.extend_from_slice(self.effect.as_str().as_bytes()); 91 key.push(0); 92 key.extend_from_slice(self.group_id.as_str().as_bytes()); 93 key.push(0); 94 if let Some(pubkey) = &self.target_pubkey { 95 key.extend_from_slice(pubkey.as_str().as_bytes()); 96 } 97 key 98 } 99 } 100 101 #[derive(Debug, Clone, PartialEq, Eq)] 102 pub enum GroupOutboxStatus { 103 Pending, 104 Stored { generated_event_id: EventId }, 105 Skipped { reason: String }, 106 Failed { retryable: bool }, 107 } 108 109 #[derive(Debug, Clone, PartialEq, Eq)] 110 pub struct GroupOutboxPayload { 111 generated_kind: u32, 112 generated_created_at: UnixTimestamp, 113 tags: Vec<Vec<String>>, 114 content: String, 115 } 116 117 impl GroupOutboxPayload { 118 pub fn new( 119 generated_kind: u32, 120 generated_created_at: UnixTimestamp, 121 tags: Vec<Vec<String>>, 122 content: impl Into<String>, 123 ) -> Self { 124 Self { 125 generated_kind, 126 generated_created_at, 127 tags, 128 content: content.into(), 129 } 130 } 131 132 pub fn generated_kind(&self) -> u32 { 133 self.generated_kind 134 } 135 136 pub fn generated_created_at(&self) -> UnixTimestamp { 137 self.generated_created_at 138 } 139 140 pub fn tags(&self) -> &[Vec<String>] { 141 &self.tags 142 } 143 144 pub fn content(&self) -> &str { 145 &self.content 146 } 147 } 148 149 #[derive(Debug, Clone, PartialEq, Eq)] 150 pub struct GroupOutboxRecord { 151 key: GroupOutboxKey, 152 status: GroupOutboxStatus, 153 payload: GroupOutboxPayload, 154 attempts: u32, 155 last_error: Option<String>, 156 } 157 158 impl GroupOutboxRecord { 159 pub fn pending(key: GroupOutboxKey, payload: GroupOutboxPayload) -> Self { 160 Self { 161 key, 162 status: GroupOutboxStatus::Pending, 163 payload, 164 attempts: 0, 165 last_error: None, 166 } 167 } 168 169 pub fn key(&self) -> &GroupOutboxKey { 170 &self.key 171 } 172 173 pub fn status(&self) -> &GroupOutboxStatus { 174 &self.status 175 } 176 177 pub fn payload(&self) -> &GroupOutboxPayload { 178 &self.payload 179 } 180 181 pub fn attempts(&self) -> u32 { 182 self.attempts 183 } 184 185 pub fn last_error(&self) -> Option<&str> { 186 self.last_error.as_deref() 187 } 188 189 pub fn mark_stored(&mut self, generated_event_id: EventId) { 190 self.status = GroupOutboxStatus::Stored { generated_event_id }; 191 self.last_error = None; 192 } 193 194 pub fn mark_skipped(&mut self, reason: impl Into<String>) { 195 self.status = GroupOutboxStatus::Skipped { 196 reason: reason.into(), 197 }; 198 } 199 200 pub fn mark_failed(&mut self, retryable: bool, error: impl Into<String>) { 201 self.status = GroupOutboxStatus::Failed { retryable }; 202 self.attempts = self.attempts.saturating_add(1); 203 self.last_error = Some(error.into()); 204 } 205 206 pub fn is_retryable(&self) -> bool { 207 matches!( 208 self.status, 209 GroupOutboxStatus::Pending | GroupOutboxStatus::Failed { retryable: true } 210 ) 211 } 212 213 pub fn to_json_bytes(&self) -> Result<Vec<u8>, GroupError> { 214 serde_json::to_vec(&GroupOutboxRecordDocument::from_record(self)).map_err(|error| { 215 GroupError::internal(format!("outbox record JSON encode failed: {error}")) 216 }) 217 } 218 219 pub fn from_json_bytes(raw: &[u8]) -> Result<Self, GroupError> { 220 let document = 221 serde_json::from_slice::<GroupOutboxRecordDocument>(raw).map_err(|error| { 222 GroupError::internal(format!("outbox record JSON decode failed: {error}")) 223 })?; 224 document.into_record() 225 } 226 } 227 228 #[derive(Debug, Clone, PartialEq, Eq, Default)] 229 pub struct GroupOutbox { 230 records: BTreeMap<GroupOutboxKey, GroupOutboxRecord>, 231 } 232 233 impl GroupOutbox { 234 pub fn new() -> Self { 235 Self::default() 236 } 237 238 pub fn merge_idempotent(&mut self, record: GroupOutboxRecord) -> Result<bool, GroupError> { 239 if let Some(existing) = self.records.get(record.key()) { 240 if existing.payload() == record.payload() { 241 return Ok(false); 242 } 243 return Err(GroupError::internal( 244 "outbox record key already exists with different payload", 245 )); 246 } 247 self.records.insert(record.key().clone(), record); 248 Ok(true) 249 } 250 251 pub fn get(&self, key: &GroupOutboxKey) -> Option<&GroupOutboxRecord> { 252 self.records.get(key) 253 } 254 255 pub fn update(&mut self, record: GroupOutboxRecord) { 256 self.records.insert(record.key().clone(), record); 257 } 258 259 pub fn replay_plan(&self) -> OutboxReplayPlan { 260 self.replay_plan_matching(|_| true) 261 } 262 263 pub fn replay_plan_for_group(&self, group_id: &GroupId) -> OutboxReplayPlan { 264 self.replay_plan_matching(|record| record.key().group_id() == group_id) 265 } 266 267 fn replay_plan_matching( 268 &self, 269 include: impl Fn(&GroupOutboxRecord) -> bool, 270 ) -> OutboxReplayPlan { 271 let mut records = self 272 .records 273 .values() 274 .filter(|record| record.is_retryable() && include(record)) 275 .cloned() 276 .collect::<Vec<_>>(); 277 records.sort_by(|left, right| { 278 left.key() 279 .group_id() 280 .cmp(right.key().group_id()) 281 .then_with(|| { 282 left.payload() 283 .generated_created_at() 284 .cmp(&right.payload().generated_created_at()) 285 }) 286 .then_with(|| { 287 left.key() 288 .source_event_id() 289 .cmp(right.key().source_event_id()) 290 }) 291 .then_with(|| left.key().effect().cmp(&right.key().effect())) 292 .then_with(|| left.key().target_pubkey().cmp(&right.key().target_pubkey())) 293 }); 294 OutboxReplayPlan { records } 295 } 296 } 297 298 #[derive(Debug, Clone, PartialEq, Eq)] 299 pub struct OutboxReplayPlan { 300 records: Vec<GroupOutboxRecord>, 301 } 302 303 impl OutboxReplayPlan { 304 pub fn records(&self) -> &[GroupOutboxRecord] { 305 &self.records 306 } 307 } 308 309 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] 310 pub enum GroupCrashPoint { 311 SourceParsedBeforeStore, 312 SourceStoreBeforeProjection, 313 ProjectionUpdateBeforeOutboxPersist, 314 OutboxPersistBeforeGeneratedStore, 315 GeneratedStoreBeforeOutboxMark, 316 OutboxMarkBeforeBroadcast, 317 ProjectionRebuild, 318 } 319 320 #[derive(Debug, Clone, PartialEq, Eq, Default)] 321 pub struct GroupCrashHooks { 322 fail_points: BTreeSet<GroupCrashPoint>, 323 } 324 325 impl GroupCrashHooks { 326 pub fn disabled() -> Self { 327 Self::default() 328 } 329 330 pub fn failing_at(points: impl IntoIterator<Item = GroupCrashPoint>) -> Self { 331 Self { 332 fail_points: points.into_iter().collect(), 333 } 334 } 335 336 pub fn check(&self, point: GroupCrashPoint) -> Result<(), GroupError> { 337 if self.fail_points.contains(&point) { 338 return Err(GroupError::internal(format!( 339 "injected group crash at {point:?}" 340 ))); 341 } 342 Ok(()) 343 } 344 } 345 346 #[derive(Debug, Clone, PartialEq, Eq)] 347 pub enum OutboxRecoveryReadiness { 348 Ready, 349 FailedClosed { reason: String }, 350 } 351 352 impl OutboxRecoveryReadiness { 353 pub fn from_replay_result<T>(result: &Result<T, GroupError>) -> Self { 354 match result { 355 Ok(_) => Self::Ready, 356 Err(error) => Self::FailedClosed { 357 reason: error.prefixed_message(), 358 }, 359 } 360 } 361 } 362 363 #[derive(Debug, Clone, Serialize, Deserialize)] 364 struct GroupOutboxRecordDocument { 365 key: GroupOutboxKeyDocument, 366 status: GroupOutboxStatusDocument, 367 payload: GroupOutboxPayloadDocument, 368 attempts: u32, 369 last_error: Option<String>, 370 } 371 372 impl GroupOutboxRecordDocument { 373 fn from_record(record: &GroupOutboxRecord) -> Self { 374 Self { 375 key: GroupOutboxKeyDocument::from_key(record.key()), 376 status: GroupOutboxStatusDocument::from_status(record.status()), 377 payload: GroupOutboxPayloadDocument::from_payload(record.payload()), 378 attempts: record.attempts(), 379 last_error: record.last_error().map(str::to_owned), 380 } 381 } 382 383 fn into_record(self) -> Result<GroupOutboxRecord, GroupError> { 384 Ok(GroupOutboxRecord { 385 key: self.key.into_key()?, 386 status: self.status.into_status()?, 387 payload: self.payload.into_payload(), 388 attempts: self.attempts, 389 last_error: self.last_error, 390 }) 391 } 392 } 393 394 #[derive(Debug, Clone, Serialize, Deserialize)] 395 struct GroupOutboxKeyDocument { 396 source_event_id: String, 397 effect: String, 398 group_id: String, 399 target_pubkey: Option<String>, 400 } 401 402 impl GroupOutboxKeyDocument { 403 fn from_key(key: &GroupOutboxKey) -> Self { 404 Self { 405 source_event_id: key.source_event_id().as_str().to_owned(), 406 effect: key.effect().as_str().to_owned(), 407 group_id: key.group_id().as_str().to_owned(), 408 target_pubkey: key.target_pubkey().map(|pubkey| pubkey.as_str().to_owned()), 409 } 410 } 411 412 fn into_key(self) -> Result<GroupOutboxKey, GroupError> { 413 Ok(GroupOutboxKey::new( 414 EventId::new(&self.source_event_id).map_err(GroupError::internal)?, 415 GroupOutboxEffect::from_label(&self.effect)?, 416 GroupId::new(&self.group_id)?, 417 self.target_pubkey 418 .as_deref() 419 .map(PublicKeyHex::new) 420 .transpose() 421 .map_err(GroupError::internal)?, 422 )) 423 } 424 } 425 426 #[derive(Debug, Clone, Serialize, Deserialize)] 427 #[serde(tag = "state")] 428 enum GroupOutboxStatusDocument { 429 Pending, 430 Stored { generated_event_id: String }, 431 Skipped { reason: String }, 432 Failed { retryable: bool }, 433 } 434 435 impl GroupOutboxStatusDocument { 436 fn from_status(status: &GroupOutboxStatus) -> Self { 437 match status { 438 GroupOutboxStatus::Pending => Self::Pending, 439 GroupOutboxStatus::Stored { generated_event_id } => Self::Stored { 440 generated_event_id: generated_event_id.as_str().to_owned(), 441 }, 442 GroupOutboxStatus::Skipped { reason } => Self::Skipped { 443 reason: reason.clone(), 444 }, 445 GroupOutboxStatus::Failed { retryable } => Self::Failed { 446 retryable: *retryable, 447 }, 448 } 449 } 450 451 fn into_status(self) -> Result<GroupOutboxStatus, GroupError> { 452 match self { 453 Self::Pending => Ok(GroupOutboxStatus::Pending), 454 Self::Stored { generated_event_id } => Ok(GroupOutboxStatus::Stored { 455 generated_event_id: EventId::new(&generated_event_id) 456 .map_err(GroupError::internal)?, 457 }), 458 Self::Skipped { reason } => Ok(GroupOutboxStatus::Skipped { reason }), 459 Self::Failed { retryable } => Ok(GroupOutboxStatus::Failed { retryable }), 460 } 461 } 462 } 463 464 #[derive(Debug, Clone, Serialize, Deserialize)] 465 struct GroupOutboxPayloadDocument { 466 generated_kind: u32, 467 generated_created_at: u64, 468 tags: Vec<Vec<String>>, 469 content: String, 470 } 471 472 impl GroupOutboxPayloadDocument { 473 fn from_payload(payload: &GroupOutboxPayload) -> Self { 474 Self { 475 generated_kind: payload.generated_kind(), 476 generated_created_at: payload.generated_created_at().as_u64(), 477 tags: payload.tags().to_vec(), 478 content: payload.content().to_owned(), 479 } 480 } 481 482 fn into_payload(self) -> GroupOutboxPayload { 483 GroupOutboxPayload::new( 484 self.generated_kind, 485 UnixTimestamp::new(self.generated_created_at), 486 self.tags, 487 self.content, 488 ) 489 } 490 } 491 492 #[cfg(test)] 493 mod tests { 494 use super::{ 495 GroupCrashHooks, GroupCrashPoint, GroupOutbox, GroupOutboxEffect, GroupOutboxKey, 496 GroupOutboxPayload, GroupOutboxRecord, GroupOutboxStatus, 497 }; 498 use crate::GroupId; 499 use tangle_protocol::{EventId, PublicKeyHex, UnixTimestamp}; 500 501 #[test] 502 fn outbox_keys_are_deterministic() { 503 let key = key(Some(PublicKeyHex::new(&"2".repeat(64)).expect("pubkey"))); 504 505 assert_eq!( 506 key.storage_key(), 507 format!( 508 "{}\0join_accepted\0Farm\0{}", 509 "1".repeat(64), 510 "2".repeat(64) 511 ) 512 .into_bytes() 513 ); 514 } 515 516 #[test] 517 fn outbox_merge_is_idempotent_for_same_payload() { 518 let mut outbox = GroupOutbox::new(); 519 let record = GroupOutboxRecord::pending(key(None), payload(9_000)); 520 521 assert!(outbox.merge_idempotent(record.clone()).expect("insert")); 522 assert!(!outbox.merge_idempotent(record).expect("same")); 523 assert!( 524 outbox 525 .merge_idempotent(GroupOutboxRecord::pending(key(None), payload(9_001))) 526 .is_err() 527 ); 528 } 529 530 #[test] 531 fn outbox_merge_preserves_persisted_status_for_same_payload() { 532 let mut outbox = GroupOutbox::new(); 533 let mut stored = GroupOutboxRecord::pending(key(None), payload(9_000)); 534 let generated_event_id = EventId::new(&"9".repeat(64)).expect("event"); 535 stored.mark_stored(generated_event_id.clone()); 536 537 assert!(outbox.merge_idempotent(stored.clone()).expect("stored")); 538 assert!( 539 !outbox 540 .merge_idempotent(GroupOutboxRecord::pending(key(None), payload(9_000))) 541 .expect("derived") 542 ); 543 assert_eq!( 544 outbox.get(stored.key()).expect("record").status(), 545 &GroupOutboxStatus::Stored { generated_event_id } 546 ); 547 } 548 549 #[test] 550 fn outbox_replay_plan_is_sorted_and_retryable_only() { 551 let mut outbox = GroupOutbox::new(); 552 let mut stored = GroupOutboxRecord::pending(key(None), payload(9_000)); 553 stored.mark_stored(EventId::new(&"9".repeat(64)).expect("event")); 554 let mut retryable = GroupOutboxRecord::pending( 555 GroupOutboxKey::new( 556 EventId::new(&"0".repeat(64)).expect("event"), 557 GroupOutboxEffect::MetadataSnapshot, 558 GroupId::new("Farm").expect("group"), 559 None, 560 ), 561 payload(39_000), 562 ); 563 retryable.mark_failed(true, "store failed"); 564 565 outbox.merge_idempotent(stored).expect("stored"); 566 outbox.merge_idempotent(retryable).expect("retryable"); 567 let plan = outbox.replay_plan(); 568 569 assert_eq!(plan.records().len(), 1); 570 assert_eq!(plan.records()[0].payload().generated_kind(), 39_000); 571 assert_eq!(plan.records()[0].attempts(), 1); 572 } 573 574 #[test] 575 fn outbox_replay_plan_orders_retryable_records_by_group_and_source_time() { 576 let mut outbox = GroupOutbox::new(); 577 let farm_early = replay_record(&"f".repeat(64), "Farm", 1); 578 let farm_late = replay_record(&"0".repeat(64), "Farm", 2); 579 let market_early = replay_record(&"1".repeat(64), "Market", 1); 580 581 outbox 582 .merge_idempotent(market_early.clone()) 583 .expect("market"); 584 outbox 585 .merge_idempotent(farm_late.clone()) 586 .expect("farm late"); 587 outbox 588 .merge_idempotent(farm_early.clone()) 589 .expect("farm early"); 590 let plan = outbox.replay_plan(); 591 592 assert_eq!( 593 plan.records() 594 .iter() 595 .map(|record| record.key().source_event_id()) 596 .collect::<Vec<_>>(), 597 vec![ 598 farm_early.key().source_event_id(), 599 farm_late.key().source_event_id(), 600 market_early.key().source_event_id() 601 ] 602 ); 603 } 604 605 #[test] 606 fn outbox_replay_plan_can_scope_retryable_records_to_one_group() { 607 let mut outbox = GroupOutbox::new(); 608 let farm_early = replay_record(&"f".repeat(64), "Farm", 1); 609 let farm_late = replay_record(&"0".repeat(64), "Farm", 2); 610 let market_early = replay_record(&"1".repeat(64), "Market", 1); 611 612 outbox 613 .merge_idempotent(market_early.clone()) 614 .expect("market"); 615 outbox 616 .merge_idempotent(farm_late.clone()) 617 .expect("farm late"); 618 outbox 619 .merge_idempotent(farm_early.clone()) 620 .expect("farm early"); 621 let plan = outbox.replay_plan_for_group(&GroupId::new("Farm").expect("group")); 622 623 assert_eq!( 624 plan.records() 625 .iter() 626 .map(|record| record.key().source_event_id()) 627 .collect::<Vec<_>>(), 628 vec![ 629 farm_early.key().source_event_id(), 630 farm_late.key().source_event_id() 631 ] 632 ); 633 } 634 635 #[test] 636 fn outbox_records_round_trip_for_persistence() { 637 let mut record = GroupOutboxRecord::pending(key(None), payload(39_000)); 638 record.mark_failed(true, "pending retry"); 639 640 let decoded = GroupOutboxRecord::from_json_bytes(&record.to_json_bytes().expect("bytes")) 641 .expect("record"); 642 assert_eq!(decoded.payload().generated_kind(), 39_000); 643 assert_eq!( 644 decoded.payload().generated_created_at(), 645 UnixTimestamp::new(1) 646 ); 647 assert_eq!( 648 decoded.payload().tags(), 649 &[vec!["h".to_owned(), "Farm".to_owned()]] 650 ); 651 assert_eq!(decoded.payload().content(), ""); 652 assert_eq!(decoded, record); 653 } 654 655 #[test] 656 fn crash_hooks_fail_only_at_configured_points() { 657 let hooks = 658 GroupCrashHooks::failing_at([GroupCrashPoint::OutboxPersistBeforeGeneratedStore]); 659 660 assert!( 661 hooks 662 .check(GroupCrashPoint::GeneratedStoreBeforeOutboxMark) 663 .is_ok() 664 ); 665 assert_eq!( 666 hooks 667 .check(GroupCrashPoint::OutboxPersistBeforeGeneratedStore) 668 .expect_err("injected") 669 .prefixed_message(), 670 "error: injected group crash at OutboxPersistBeforeGeneratedStore" 671 ); 672 } 673 674 fn key(target_pubkey: Option<PublicKeyHex>) -> GroupOutboxKey { 675 GroupOutboxKey::new( 676 EventId::new(&"1".repeat(64)).expect("event"), 677 GroupOutboxEffect::JoinAccepted, 678 GroupId::new("Farm").expect("group"), 679 target_pubkey, 680 ) 681 } 682 683 fn payload(kind: u32) -> GroupOutboxPayload { 684 GroupOutboxPayload::new( 685 kind, 686 UnixTimestamp::new(1), 687 vec![vec!["h".to_owned(), "Farm".to_owned()]], 688 "", 689 ) 690 } 691 692 fn replay_record(source_event_id: &str, group_id: &str, created_at: u64) -> GroupOutboxRecord { 693 let group_id = GroupId::new(group_id).expect("group"); 694 GroupOutboxRecord::pending( 695 GroupOutboxKey::new( 696 EventId::new(source_event_id).expect("event"), 697 GroupOutboxEffect::MetadataSnapshot, 698 group_id.clone(), 699 None, 700 ), 701 GroupOutboxPayload::new( 702 39_000, 703 UnixTimestamp::new(created_at), 704 vec![vec!["h".to_owned(), group_id.as_str().to_owned()]], 705 "", 706 ), 707 ) 708 } 709 }