commit 5c4369d1afd9585fc22d36818efcd43bc3ece72e
parent 8a2c0d07dd38281c4ee202aae7816d47d2c30272
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 05:55:32 -0700
groups: scope live outbox replay per group
- Add group-scoped outbox replay planning on top of deterministic replay ordering.
- Materialize generated side effects for the affected group after live source writes.
- Keep startup recovery on the full outbox replay path.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
2 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/crates/tangle_groups/src/outbox.rs b/crates/tangle_groups/src/outbox.rs
@@ -257,10 +257,21 @@ impl GroupOutbox {
}
pub fn replay_plan(&self) -> OutboxReplayPlan {
+ self.replay_plan_matching(|_| true)
+ }
+
+ pub fn replay_plan_for_group(&self, group_id: &GroupId) -> OutboxReplayPlan {
+ self.replay_plan_matching(|record| record.key().group_id() == group_id)
+ }
+
+ fn replay_plan_matching(
+ &self,
+ include: impl Fn(&GroupOutboxRecord) -> bool,
+ ) -> OutboxReplayPlan {
let mut records = self
.records
.values()
- .filter(|record| record.is_retryable())
+ .filter(|record| record.is_retryable() && include(record))
.cloned()
.collect::<Vec<_>>();
records.sort_by(|left, right| {
@@ -592,6 +603,36 @@ mod tests {
}
#[test]
+ fn outbox_replay_plan_can_scope_retryable_records_to_one_group() {
+ let mut outbox = GroupOutbox::new();
+ let farm_early = replay_record(&"f".repeat(64), "Farm", 1);
+ let farm_late = replay_record(&"0".repeat(64), "Farm", 2);
+ let market_early = replay_record(&"1".repeat(64), "Market", 1);
+
+ outbox
+ .merge_idempotent(market_early.clone())
+ .expect("market");
+ outbox
+ .merge_idempotent(farm_late.clone())
+ .expect("farm late");
+ outbox
+ .merge_idempotent(farm_early.clone())
+ .expect("farm early");
+ let plan = outbox.replay_plan_for_group(&GroupId::new("Farm").expect("group"));
+
+ assert_eq!(
+ plan.records()
+ .iter()
+ .map(|record| record.key().source_event_id())
+ .collect::<Vec<_>>(),
+ vec![
+ farm_early.key().source_event_id(),
+ farm_late.key().source_event_id()
+ ]
+ );
+ }
+
+ #[test]
fn outbox_records_round_trip_for_persistence() {
let mut record = GroupOutboxRecord::pending(key(None), payload(39_000));
record.mark_failed(true, "pending retry");
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -170,7 +170,10 @@ impl GroupService {
persist_outbox_record(store, &record)?;
}
}
- self.materialize_outbox(store)
+ if let Some(group_id) = class_group_id(class) {
+ return self.materialize_outbox_for_group(store, group_id);
+ }
+ Ok(Vec::new())
}
fn plan_outbox_records(
@@ -222,8 +225,29 @@ impl GroupService {
&mut self,
store: &PocketStoreHandle,
) -> Result<Vec<StoreOffset>, BaseRelayError> {
- let mut stored_offsets = Vec::new();
let records = self.outbox.replay_plan().records().to_vec();
+ self.materialize_records(store, records)
+ }
+
+ fn materialize_outbox_for_group(
+ &mut self,
+ store: &PocketStoreHandle,
+ group_id: &GroupId,
+ ) -> Result<Vec<StoreOffset>, BaseRelayError> {
+ let records = self
+ .outbox
+ .replay_plan_for_group(group_id)
+ .records()
+ .to_vec();
+ self.materialize_records(store, records)
+ }
+
+ fn materialize_records(
+ &mut self,
+ store: &PocketStoreHandle,
+ records: Vec<GroupOutboxRecord>,
+ ) -> Result<Vec<StoreOffset>, BaseRelayError> {
+ let mut stored_offsets = Vec::new();
for record in records {
if let Some(offset) = self.materialize_record(store, record)? {
stored_offsets.push(offset);