commit 8a2c0d07dd38281c4ee202aae7816d47d2c30272
parent 3e3d537bf4416a5007abb9e9072395182966a53c
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 05:49:22 -0700
groups: order outbox replay per group
- Sort retryable outbox replay records by group, generated timestamp, source event, effect, and target.
- Keep generated side effects deterministic when recovery replays pending records from multiple source events.
- Add replay-plan coverage that proves source-time order wins over raw source-event-id ordering.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
1 file changed, 73 insertions(+), 8 deletions(-)
diff --git a/crates/tangle_groups/src/outbox.rs b/crates/tangle_groups/src/outbox.rs
@@ -257,14 +257,30 @@ impl GroupOutbox {
}
pub fn replay_plan(&self) -> OutboxReplayPlan {
- OutboxReplayPlan {
- records: self
- .records
- .values()
- .filter(|record| record.is_retryable())
- .cloned()
- .collect(),
- }
+ let mut records = self
+ .records
+ .values()
+ .filter(|record| record.is_retryable())
+ .cloned()
+ .collect::<Vec<_>>();
+ records.sort_by(|left, right| {
+ left.key()
+ .group_id()
+ .cmp(right.key().group_id())
+ .then_with(|| {
+ left.payload()
+ .generated_created_at()
+ .cmp(&right.payload().generated_created_at())
+ })
+ .then_with(|| {
+ left.key()
+ .source_event_id()
+ .cmp(right.key().source_event_id())
+ })
+ .then_with(|| left.key().effect().cmp(&right.key().effect()))
+ .then_with(|| left.key().target_pubkey().cmp(&right.key().target_pubkey()))
+ });
+ OutboxReplayPlan { records }
}
}
@@ -545,6 +561,37 @@ mod tests {
}
#[test]
+ fn outbox_replay_plan_orders_retryable_records_by_group_and_source_time() {
+ let mut outbox = GroupOutbox::new();
+ let farm_early = replay_record(&"f".repeat(64), "Farm", 1);
+ let farm_late = replay_record(&"0".repeat(64), "Farm", 2);
+ let market_early = replay_record(&"1".repeat(64), "Market", 1);
+
+ outbox
+ .merge_idempotent(market_early.clone())
+ .expect("market");
+ outbox
+ .merge_idempotent(farm_late.clone())
+ .expect("farm late");
+ outbox
+ .merge_idempotent(farm_early.clone())
+ .expect("farm early");
+ let plan = outbox.replay_plan();
+
+ assert_eq!(
+ plan.records()
+ .iter()
+ .map(|record| record.key().source_event_id())
+ .collect::<Vec<_>>(),
+ vec![
+ farm_early.key().source_event_id(),
+ farm_late.key().source_event_id(),
+ market_early.key().source_event_id()
+ ]
+ );
+ }
+
+ #[test]
fn outbox_records_round_trip_for_persistence() {
let mut record = GroupOutboxRecord::pending(key(None), payload(39_000));
record.mark_failed(true, "pending retry");
@@ -600,4 +647,22 @@ mod tests {
"",
)
}
+
+ fn replay_record(source_event_id: &str, group_id: &str, created_at: u64) -> GroupOutboxRecord {
+ let group_id = GroupId::new(group_id).expect("group");
+ GroupOutboxRecord::pending(
+ GroupOutboxKey::new(
+ EventId::new(source_event_id).expect("event"),
+ GroupOutboxEffect::MetadataSnapshot,
+ group_id.clone(),
+ None,
+ ),
+ GroupOutboxPayload::new(
+ 39_000,
+ UnixTimestamp::new(created_at),
+ vec![vec!["h".to_owned(), group_id.as_str().to_owned()]],
+ "",
+ ),
+ )
+ }
}