commit 56115ac5c89acdbde34d012cba67f9d3a411b985
parent b0fc6c564d8d1a06a943df6de2de0a7bba6c4d03
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 01:59:52 -0700
trade: reduce unified order event records
- add public reduction from unified order event records
- dedupe unified input by event id before grouping by variant
- delegate grouped records through the existing reducer semantics
- validate with cargo fmt, check, and tests for radroots_trade
Diffstat:
1 file changed, 146 insertions(+), 3 deletions(-)
diff --git a/crates/trade/src/order.rs b/crates/trade/src/order.rs
@@ -674,7 +674,7 @@ where
P: IntoIterator<Item = RadrootsOrderPaymentEventRecord>,
Q: IntoIterator<Item = RadrootsOrderSettlementRecord>,
{
- reduce_order_event_records(
+ reduce_grouped_order_event_records(
order_id,
requests.into_iter().collect(),
decisions.into_iter().collect(),
@@ -688,7 +688,59 @@ where
)
}
-fn reduce_order_event_records(
+#[cfg_attr(coverage_nightly, coverage(off))]
+pub fn reduce_order_event_records<I>(
+ order_id: &RadrootsOrderId,
+ records: I,
+) -> RadrootsOrderProjection
+where
+ I: IntoIterator<Item = RadrootsOrderEventRecord>,
+{
+ let mut seen_event_ids = Vec::new();
+ let mut requests = Vec::new();
+ let mut decisions = Vec::new();
+ let mut revision_proposals = Vec::new();
+ let mut revision_decisions = Vec::new();
+ let mut fulfillments = Vec::new();
+ let mut cancellations = Vec::new();
+ let mut receipts = Vec::new();
+ let mut payments = Vec::new();
+ let mut settlements = Vec::new();
+
+ for record in records {
+ let event_id = record.event_id().clone();
+ if seen_event_ids.iter().any(|seen| seen == &event_id) {
+ continue;
+ }
+ seen_event_ids.push(event_id);
+ match record {
+ RadrootsOrderEventRecord::Request(record) => requests.push(record),
+ RadrootsOrderEventRecord::Decision(record) => decisions.push(record),
+ RadrootsOrderEventRecord::RevisionProposal(record) => revision_proposals.push(record),
+ RadrootsOrderEventRecord::RevisionDecision(record) => revision_decisions.push(record),
+ RadrootsOrderEventRecord::Fulfillment(record) => fulfillments.push(record),
+ RadrootsOrderEventRecord::Cancellation(record) => cancellations.push(record),
+ RadrootsOrderEventRecord::Receipt(record) => receipts.push(record),
+ RadrootsOrderEventRecord::Payment(record) => payments.push(record),
+ RadrootsOrderEventRecord::Settlement(record) => settlements.push(record),
+ }
+ }
+
+ reduce_grouped_order_event_records(
+ order_id,
+ requests,
+ decisions,
+ revision_proposals,
+ revision_decisions,
+ fulfillments,
+ cancellations,
+ receipts,
+ payments,
+ settlements,
+ )
+}
+
+fn reduce_grouped_order_event_records(
order_id: &RadrootsOrderId,
requests: Vec<RadrootsOrderRequestRecord>,
decisions: Vec<RadrootsOrderDecisionRecord>,
@@ -3775,7 +3827,7 @@ mod tests {
inventory_issue_sort_key, order_event_record_from_event, projection_issue_event_ids,
radroots_order_economics_digest,
reduce_listing_inventory_accounting as reduce_listing_inventory_accounting_with_revisions,
- reduce_order_events as reduce_order_events_with_revisions,
+ reduce_order_event_records, reduce_order_events as reduce_order_events_with_revisions,
};
const SELLER: &str = "1111111111111111111111111111111111111111111111111111111111111111";
@@ -4553,6 +4605,97 @@ mod tests {
));
}
+ #[test]
+ fn reduce_order_event_records_dedupes_duplicate_event_ids() {
+ let request = request_record_with_event_id("unified-duplicate");
+ let mut duplicate = request.clone();
+ duplicate.payload.items[0].bin_count = 4;
+
+ let projection = reduce_order_event_records(
+ &order_id("order-1"),
+ [
+ RadrootsOrderEventRecord::Request(request.clone()),
+ RadrootsOrderEventRecord::Request(duplicate),
+ ],
+ );
+
+ assert_eq!(projection.status, RadrootsOrderStatus::Requested);
+ assert_eq!(projection.request_event_id, Some(request.event_id));
+ assert!(projection.issues.is_empty());
+ }
+
+ #[test]
+ fn reduce_order_event_records_is_stable_for_shuffled_input() {
+ let request = request_record();
+ let decision = accepted_decision_record("decision-1");
+ let fulfillment = fulfillment_record(
+ "fulfillment-1",
+ "decision-1",
+ RadrootsOrderFulfillmentState::ReadyForPickup,
+ );
+ let receipt = receipt_record("receipt-1", "fulfillment-1", true);
+
+ let grouped = reduce_order_events(
+ "order-1",
+ [request.clone()],
+ [decision.clone()],
+ [fulfillment.clone()],
+ [],
+ [receipt.clone()],
+ );
+ let unified = reduce_order_event_records(
+ &order_id("order-1"),
+ [
+ RadrootsOrderEventRecord::Receipt(receipt),
+ RadrootsOrderEventRecord::Fulfillment(fulfillment),
+ RadrootsOrderEventRecord::Decision(decision),
+ RadrootsOrderEventRecord::Request(request),
+ ],
+ );
+
+ assert_eq!(unified, grouped);
+ }
+
+ #[test]
+ fn reduce_order_event_records_reports_missing_for_empty_stream() {
+ let projection = reduce_order_event_records(
+ &order_id("order-1"),
+ Vec::<RadrootsOrderEventRecord>::new(),
+ );
+
+ assert_eq!(projection.status, RadrootsOrderStatus::Missing);
+ assert!(projection.issues.is_empty());
+ }
+
+ #[test]
+ fn reduce_order_event_records_reports_multiple_requests_deterministically() {
+ let first = request_record_with_event_id("request-a");
+ let second = request_record_with_event_id("request-b");
+ let projection = reduce_order_event_records(
+ &order_id("order-1"),
+ [
+ RadrootsOrderEventRecord::Request(second.clone()),
+ RadrootsOrderEventRecord::Request(first.clone()),
+ ],
+ );
+ let reversed = reduce_order_event_records(
+ &order_id("order-1"),
+ [
+ RadrootsOrderEventRecord::Request(first),
+ RadrootsOrderEventRecord::Request(second),
+ ],
+ );
+
+ assert_eq!(projection, reversed);
+ assert_eq!(projection.status, RadrootsOrderStatus::Invalid);
+ assert!(
+ projection
+ .issues
+ .iter()
+ .any(|issue| matches!(issue, RadrootsOrderIssue::MultipleRequests { .. }))
+ );
+ }
+
fn reduce_order_events<I, J, K, L, M>(
order_id: &str,
requests: I,