commit 1fe976ae1ad2e4d361a4f58b2b282a5fae0b6ca1
parent bda28d8893f1f3384956901d5e02562b9d7fb201
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 05:37:53 -0700
groups: merge generated outbox payloads
- Rename outbox insertion to explicit idempotent merge semantics for persisted and derived records.
- Preserve existing persisted outbox status when recovery derives the same deterministic payload.
- Assert generated payload fields round-trip through outbox record persistence.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
2 files changed, 40 insertions(+), 13 deletions(-)
diff --git a/crates/tangle_groups/src/outbox.rs b/crates/tangle_groups/src/outbox.rs
@@ -235,7 +235,7 @@ impl GroupOutbox {
Self::default()
}
- pub fn insert_idempotent(&mut self, record: GroupOutboxRecord) -> Result<bool, GroupError> {
+ pub fn merge_idempotent(&mut self, record: GroupOutboxRecord) -> Result<bool, GroupError> {
if let Some(existing) = self.records.get(record.key()) {
if existing.payload() == record.payload() {
return Ok(false);
@@ -466,7 +466,7 @@ impl GroupOutboxPayloadDocument {
mod tests {
use super::{
GroupCrashHooks, GroupCrashPoint, GroupOutbox, GroupOutboxEffect, GroupOutboxKey,
- GroupOutboxPayload, GroupOutboxRecord,
+ GroupOutboxPayload, GroupOutboxRecord, GroupOutboxStatus,
};
use crate::GroupId;
use tangle_protocol::{EventId, PublicKeyHex, UnixTimestamp};
@@ -487,20 +487,39 @@ mod tests {
}
#[test]
- fn outbox_insert_is_idempotent_for_same_payload() {
+ fn outbox_merge_is_idempotent_for_same_payload() {
let mut outbox = GroupOutbox::new();
let record = GroupOutboxRecord::pending(key(None), payload(9_000));
- assert!(outbox.insert_idempotent(record.clone()).expect("insert"));
- assert!(!outbox.insert_idempotent(record).expect("same"));
+ assert!(outbox.merge_idempotent(record.clone()).expect("insert"));
+ assert!(!outbox.merge_idempotent(record).expect("same"));
assert!(
outbox
- .insert_idempotent(GroupOutboxRecord::pending(key(None), payload(9_001)))
+ .merge_idempotent(GroupOutboxRecord::pending(key(None), payload(9_001)))
.is_err()
);
}
#[test]
+ fn outbox_merge_preserves_persisted_status_for_same_payload() {
+ let mut outbox = GroupOutbox::new();
+ let mut stored = GroupOutboxRecord::pending(key(None), payload(9_000));
+ let generated_event_id = EventId::new(&"9".repeat(64)).expect("event");
+ stored.mark_stored(generated_event_id.clone());
+
+ assert!(outbox.merge_idempotent(stored.clone()).expect("stored"));
+ assert!(
+ !outbox
+ .merge_idempotent(GroupOutboxRecord::pending(key(None), payload(9_000)))
+ .expect("derived")
+ );
+ assert_eq!(
+ outbox.get(stored.key()).expect("record").status(),
+ &GroupOutboxStatus::Stored { generated_event_id }
+ );
+ }
+
+ #[test]
fn outbox_replay_plan_is_sorted_and_retryable_only() {
let mut outbox = GroupOutbox::new();
let mut stored = GroupOutboxRecord::pending(key(None), payload(9_000));
@@ -516,8 +535,8 @@ mod tests {
);
retryable.mark_failed(true, "store failed");
- outbox.insert_idempotent(stored).expect("stored");
- outbox.insert_idempotent(retryable).expect("retryable");
+ outbox.merge_idempotent(stored).expect("stored");
+ outbox.merge_idempotent(retryable).expect("retryable");
let plan = outbox.replay_plan();
assert_eq!(plan.records().len(), 1);
@@ -530,11 +549,19 @@ mod tests {
let mut record = GroupOutboxRecord::pending(key(None), payload(39_000));
record.mark_failed(true, "pending retry");
+ let decoded = GroupOutboxRecord::from_json_bytes(&record.to_json_bytes().expect("bytes"))
+ .expect("record");
+ assert_eq!(decoded.payload().generated_kind(), 39_000);
+ assert_eq!(
+ decoded.payload().generated_created_at(),
+ UnixTimestamp::new(1)
+ );
assert_eq!(
- GroupOutboxRecord::from_json_bytes(&record.to_json_bytes().expect("bytes"))
- .expect("record"),
- record
+ decoded.payload().tags(),
+ &[vec!["h".to_owned(), "Farm".to_owned()]]
);
+ assert_eq!(decoded.payload().content(), "");
+ assert_eq!(decoded, record);
}
#[test]
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -165,7 +165,7 @@ impl GroupService {
self.persist_group_projection(store, group_id)?;
}
for record in self.plan_outbox_records(event, class)? {
- let inserted = self.outbox.insert_idempotent(record.clone())?;
+ let inserted = self.outbox.merge_idempotent(record.clone())?;
if inserted {
persist_outbox_record(store, &record)?;
}
@@ -209,7 +209,7 @@ impl GroupService {
&self.authority,
self.member_snapshot_cap,
)? {
- let inserted = self.outbox.insert_idempotent(record.clone())?;
+ let inserted = self.outbox.merge_idempotent(record.clone())?;
if inserted {
persist_outbox_record(store, &record)?;
}