commit 16e08b9ea993ceac4f889912f15afea910b42ac6
parent ac146ec3ad0755aed20ed5c4b1f507081827a767
Author: triesap <tyson@radroots.org>
Date: Tue, 26 May 2026 17:54:57 +0000
sqlite: preserve buyer state on listing convergence
Diffstat:
2 files changed, 279 insertions(+), 45 deletions(-)
diff --git a/crates/shared/sqlite/src/buyer.rs b/crates/shared/sqlite/src/buyer.rs
@@ -728,7 +728,7 @@ impl<'a> AppBuyerRepository<'a> {
context: &BuyerContext,
) -> Result<BuyerOrdersProjection, AppSqliteError> {
let now_utc = self.current_utc_timestamp()?;
- let visible_listings = self.visible_listing_records_by_id(&now_utc)?;
+ let visible_listings = self.visible_listing_index(&now_utc)?;
let context_key = context.storage_key();
let mut statement = self
.connection
@@ -815,7 +815,7 @@ impl<'a> AppBuyerRepository<'a> {
order_id: OrderId,
) -> Result<Option<BuyerOrderDetailProjection>, AppSqliteError> {
let now_utc = self.current_utc_timestamp()?;
- let visible_listings = self.visible_listing_records_by_id(&now_utc)?;
+ let visible_listings = self.visible_listing_index(&now_utc)?;
let context_key = context.storage_key();
let record = self
.connection
@@ -1014,7 +1014,7 @@ impl<'a> AppBuyerRepository<'a> {
return Ok(BuyerRepeatDemandApplyOutcome::Unavailable);
};
let now_utc = self.current_utc_timestamp()?;
- let visible_listings = self.visible_listing_records_by_id(&now_utc)?;
+ let visible_listings = self.visible_listing_index(&now_utc)?;
let Some(candidate) = self.build_repeat_demand_candidate(
order_id,
farm_id,
@@ -1798,7 +1798,7 @@ impl<'a> AppBuyerRepository<'a> {
let mut statement = self
.connection
.prepare(
- "select id, quantity_value
+ "select id, quantity_value, listing_addr
from order_lines
where order_id = ?1
order by sort_index asc, id asc",
@@ -1809,7 +1809,11 @@ impl<'a> AppBuyerRepository<'a> {
})?;
let rows = statement
.query_map(params![order_id.to_string()], |row| {
- Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
+ Ok((
+ row.get::<_, String>(0)?,
+ row.get::<_, i64>(1)?,
+ row.get::<_, Option<String>>(2)?,
+ ))
})
.map_err(|source| AppSqliteError::Query {
operation: "query repeat demand order lines",
@@ -1818,10 +1822,11 @@ impl<'a> AppBuyerRepository<'a> {
let mut order_lines = Vec::new();
for row in rows {
- let (line_id, quantity_value) = row.map_err(|source| AppSqliteError::Query {
- operation: "read repeat demand order lines",
- source,
- })?;
+ let (line_id, quantity_value, listing_addr) =
+ row.map_err(|source| AppSqliteError::Query {
+ operation: "read repeat demand order lines",
+ source,
+ })?;
let product_id = parse_repeat_demand_product_id(line_id.as_str())?;
let quantity =
u32::try_from(quantity_value).map_err(|_| AppSqliteError::InvalidProjection {
@@ -1836,6 +1841,7 @@ impl<'a> AppBuyerRepository<'a> {
order_lines.push(RepeatDemandOrderLine {
product_id,
quantity,
+ listing_addr: listing_addr.and_then(empty_string_to_none),
});
}
@@ -1873,16 +1879,12 @@ impl<'a> AppBuyerRepository<'a> {
.transpose()
}
- fn visible_listing_records_by_id(
- &self,
- now_utc: &str,
- ) -> Result<BTreeMap<ProductId, BuyerListingRecord>, AppSqliteError> {
- Ok(self
- .load_listing_records()?
- .into_iter()
- .filter(|record| record.is_buyer_visible(now_utc))
- .map(|record| (record.product_id, record))
- .collect())
+ fn visible_listing_index(&self, now_utc: &str) -> Result<VisibleListingIndex, AppSqliteError> {
+ Ok(VisibleListingIndex::from_records(
+ self.load_listing_records()?
+ .into_iter()
+ .filter(|record| record.is_buyer_visible(now_utc)),
+ ))
}
fn build_repeat_demand_handoff(
@@ -1890,7 +1892,7 @@ impl<'a> AppBuyerRepository<'a> {
order_id: OrderId,
farm_id: FarmId,
farm_display_name: &str,
- visible_listings: &BTreeMap<ProductId, BuyerListingRecord>,
+ visible_listings: &VisibleListingIndex,
) -> Result<Option<RepeatDemandHandoffProjection>, AppSqliteError> {
Ok(self
.build_repeat_demand_candidate(order_id, farm_id, farm_display_name, visible_listings)?
@@ -1902,7 +1904,7 @@ impl<'a> AppBuyerRepository<'a> {
order_id: OrderId,
farm_id: FarmId,
farm_display_name: &str,
- visible_listings: &BTreeMap<ProductId, BuyerListingRecord>,
+ visible_listings: &VisibleListingIndex,
) -> Result<Option<RepeatDemandCandidate>, AppSqliteError> {
let order_lines = self.load_repeat_demand_order_lines(order_id)?;
if order_lines.is_empty() {
@@ -1913,15 +1915,11 @@ impl<'a> AppBuyerRepository<'a> {
let mut unavailable_item_count = 0u32;
for order_line in &order_lines {
- if let Some(listing) = visible_listings
- .get(&order_line.product_id)
- .filter(|listing| {
- listing
- .stock_count
- .is_some_and(|quantity| quantity >= order_line.quantity)
- })
- .cloned()
- {
+ if let Some(listing) = visible_listings.resolve(order_line).filter(|listing| {
+ listing
+ .stock_count
+ .is_some_and(|quantity| quantity >= order_line.quantity)
+ }) {
available_lines.push(BuyerCartLineRecord {
listing,
quantity: order_line.quantity,
@@ -1943,14 +1941,19 @@ impl<'a> AppBuyerRepository<'a> {
} else {
RepeatDemandEligibility::Partial
};
+ let current_farm = available_lines
+ .first()
+ .map(|line| (line.listing.farm_id, line.listing.farm_display_name.clone()))
+ .unwrap_or_else(|| (farm_id, farm_display_name.to_owned()));
+ let (current_farm_id, current_farm_display_name) = current_farm;
Ok(Some(RepeatDemandCandidate {
- farm_id,
- farm_display_name: farm_display_name.to_owned(),
+ farm_id: current_farm_id,
+ farm_display_name: current_farm_display_name,
available_lines,
handoff: RepeatDemandHandoffProjection {
order_id,
- farm_id,
+ farm_id: current_farm_id,
eligibility,
available_item_count,
unavailable_item_count,
@@ -2204,6 +2207,39 @@ struct BuyerCartLineRecord {
}
#[derive(Clone, Debug, Default, Eq, PartialEq)]
+struct VisibleListingIndex {
+ by_product_id: BTreeMap<ProductId, BuyerListingRecord>,
+ by_listing_addr: BTreeMap<String, BuyerListingRecord>,
+}
+
+impl VisibleListingIndex {
+ fn from_records(records: impl IntoIterator<Item = BuyerListingRecord>) -> Self {
+ let mut index = Self::default();
+ for record in records {
+ if let Some(listing_addr) = record.listing_addr.as_deref() {
+ index
+ .by_listing_addr
+ .insert(listing_addr.to_owned(), record.clone());
+ }
+ index.by_product_id.insert(record.product_id, record);
+ }
+ index
+ }
+
+ fn resolve(&self, order_line: &RepeatDemandOrderLine) -> Option<BuyerListingRecord> {
+ self.by_product_id
+ .get(&order_line.product_id)
+ .or_else(|| {
+ order_line
+ .listing_addr
+ .as_deref()
+ .and_then(|listing_addr| self.by_listing_addr.get(listing_addr))
+ })
+ .cloned()
+ }
+}
+
+#[derive(Clone, Debug, Default, Eq, PartialEq)]
struct BuyerCartLineSnapshot {
listing_bin_id: Option<String>,
farm_key: Option<String>,
@@ -2241,10 +2277,11 @@ impl BuyerCartLineRecord {
}
}
-#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+#[derive(Clone, Debug, Eq, PartialEq)]
struct RepeatDemandOrderLine {
product_id: ProductId,
quantity: u32,
+ listing_addr: Option<String>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs
@@ -314,20 +314,32 @@ impl<'a> AppLocalInteropRepository<'a> {
}
fn import_record(&self, record: &LocalEventRecord) -> Result<ImportOutcome, AppSqliteError> {
- match self.duplicate_signed_event_action(record)? {
- DuplicateSignedEventAction::Import => {}
- DuplicateSignedEventAction::ReplaceExisting(event_id) => {
- self.delete_duplicate_signed_events(event_id.as_str(), record.record_id.as_str())?;
- }
+ let superseded_listing_ids = match self.duplicate_signed_event_action(record)? {
+ DuplicateSignedEventAction::Import => Vec::new(),
+ DuplicateSignedEventAction::ReplaceExisting(event_id) => self
+ .delete_duplicate_signed_event_imports(
+ event_id.as_str(),
+ record.record_id.as_str(),
+ )?,
DuplicateSignedEventAction::Skip => return Ok(ImportOutcome::Skipped),
- }
+ };
let projection = match record.family {
LocalRecordFamily::LocalWork => self.import_local_work(record)?,
LocalRecordFamily::SignedEvent => self.import_signed_event(record)?,
};
match projection {
Some(projection) => {
- self.record_import(record, projection.kind, projection.projected_id)?;
+ let projected_kind = projection.kind;
+ let projected_id = projection.projected_id;
+ self.record_import(record, projected_kind, projected_id.clone())?;
+ if projected_kind == "listing" {
+ if let Some(projected_id) = projected_id.as_deref() {
+ self.finish_duplicate_listing_replacement(
+ &superseded_listing_ids,
+ projected_id,
+ )?;
+ }
+ }
Ok(ImportOutcome::Imported)
}
None => {
@@ -411,11 +423,11 @@ impl<'a> AppLocalInteropRepository<'a> {
}
}
- fn delete_duplicate_signed_events(
+ fn delete_duplicate_signed_event_imports(
&self,
event_id: &str,
record_id: &str,
- ) -> Result<(), AppSqliteError> {
+ ) -> Result<Vec<String>, AppSqliteError> {
let superseded_listing_ids =
self.superseded_duplicate_listing_projection_ids(event_id, record_id)?;
self.connection
@@ -430,7 +442,19 @@ impl<'a> AppLocalInteropRepository<'a> {
operation: "delete superseded duplicate local interop signed event",
source,
})?;
- self.delete_unreferenced_listing_products(&superseded_listing_ids)?;
+ Ok(superseded_listing_ids)
+ }
+
+ fn finish_duplicate_listing_replacement(
+ &self,
+ superseded_listing_ids: &[String],
+ canonical_listing_product_id: &str,
+ ) -> Result<(), AppSqliteError> {
+ self.migrate_duplicate_buyer_cart_lines(
+ superseded_listing_ids,
+ canonical_listing_product_id,
+ )?;
+ self.delete_unreferenced_listing_products(superseded_listing_ids)?;
Ok(())
}
@@ -494,6 +518,80 @@ impl<'a> AppLocalInteropRepository<'a> {
Ok(())
}
+ fn migrate_duplicate_buyer_cart_lines(
+ &self,
+ product_ids: &[String],
+ canonical_product_id: &str,
+ ) -> Result<(), AppSqliteError> {
+ for product_id in product_ids {
+ if product_id == canonical_product_id {
+ continue;
+ }
+ self.connection
+ .execute(
+ "INSERT INTO buyer_cart_lines (
+ buyer_context_key,
+ product_id,
+ quantity,
+ listing_bin_id,
+ quantity_unit_label,
+ unit_price_minor_units,
+ price_currency,
+ farm_key,
+ listing_addr,
+ listing_event_id,
+ seller_pubkey,
+ listing_relays_json,
+ updated_at
+ )
+ SELECT
+ buyer_context_key,
+ ?2,
+ quantity,
+ listing_bin_id,
+ quantity_unit_label,
+ unit_price_minor_units,
+ price_currency,
+ farm_key,
+ listing_addr,
+ listing_event_id,
+ seller_pubkey,
+ listing_relays_json,
+ strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
+ FROM buyer_cart_lines
+ WHERE product_id = ?1
+ ON CONFLICT(buyer_context_key, product_id) DO UPDATE SET
+ quantity = buyer_cart_lines.quantity + excluded.quantity,
+ listing_bin_id = coalesce(nullif(buyer_cart_lines.listing_bin_id, ''), excluded.listing_bin_id),
+ quantity_unit_label = coalesce(nullif(buyer_cart_lines.quantity_unit_label, ''), excluded.quantity_unit_label),
+ unit_price_minor_units = coalesce(buyer_cart_lines.unit_price_minor_units, excluded.unit_price_minor_units),
+ price_currency = coalesce(nullif(buyer_cart_lines.price_currency, ''), excluded.price_currency),
+ farm_key = coalesce(nullif(buyer_cart_lines.farm_key, ''), excluded.farm_key),
+ listing_addr = coalesce(nullif(buyer_cart_lines.listing_addr, ''), excluded.listing_addr),
+ listing_event_id = coalesce(nullif(buyer_cart_lines.listing_event_id, ''), excluded.listing_event_id),
+ seller_pubkey = coalesce(nullif(buyer_cart_lines.seller_pubkey, ''), excluded.seller_pubkey),
+ listing_relays_json = coalesce(nullif(buyer_cart_lines.listing_relays_json, ''), excluded.listing_relays_json),
+ updated_at = excluded.updated_at",
+ params![product_id, canonical_product_id],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "migrate duplicate listing buyer cart lines",
+ source,
+ })?;
+ self.connection
+ .execute(
+ "DELETE FROM buyer_cart_lines
+ WHERE product_id = ?1",
+ params![product_id],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "delete migrated duplicate listing buyer cart lines",
+ source,
+ })?;
+ }
+ Ok(())
+ }
+
fn import_local_work(
&self,
record: &LocalEventRecord,
@@ -2871,7 +2969,7 @@ mod tests {
KIND_FARM, KIND_LISTING, KIND_ORDER_REQUEST, deterministic_farm_id,
deterministic_product_id, projected_farm_id, projected_order_id, projected_product_id,
};
- use crate::{AppSqliteStore, DatabaseTarget};
+ use crate::{AppSqliteStore, BuyerRepeatDemandApplyOutcome, DatabaseTarget};
fn local_events_store() -> LocalEventsStore<SqliteExecutor> {
let executor = SqliteExecutor::open_memory().expect("open local events memory db");
@@ -4925,6 +5023,75 @@ mod tests {
.expect("network product count");
assert_eq!(network_product_count, 1);
assert_ne!(network_product_id.as_uuid(), product_uuid);
+ let buyer_context = BuyerContext::account("acct_buyer");
+ let network_listing = app_store
+ .load_buyer_product_detail(network_product_id)
+ .expect("network buyer detail should load")
+ .expect("network listing should exist")
+ .listing;
+ app_store
+ .replace_buyer_cart(
+ &buyer_context,
+ &radroots_app_models::BuyerCartProjection {
+ farm_id: Some(network_listing.farm_id),
+ farm_display_name: Some(network_listing.farm_display_name.clone()),
+ lines: vec![radroots_app_models::BuyerCartLineProjection {
+ product_id: network_listing.product_id,
+ farm_id: network_listing.farm_id,
+ farm_display_name: network_listing.farm_display_name.clone(),
+ title: network_listing.title.clone(),
+ quantity: 2,
+ unit_price: network_listing.price.clone(),
+ line_total_minor_units: 1600,
+ fulfillment_summary: network_listing
+ .next_fulfillment_window_label
+ .clone()
+ .expect("network listing fulfillment summary"),
+ }],
+ subtotal_minor_units: Some(1600),
+ currency_code: Some("USD".to_owned()),
+ replace_confirmation: None,
+ },
+ )
+ .expect("buyer cart should save");
+ app_store
+ .save_buyer_checkout_draft(
+ &buyer_context,
+ &radroots_app_models::BuyerCheckoutDraft {
+ name: "Casey Buyer".to_owned(),
+ email: "casey@example.test".to_owned(),
+ phone: String::new(),
+ order_note: String::new(),
+ },
+ )
+ .expect("checkout draft should save");
+ let order_id = app_store
+ .place_buyer_order(&buyer_context)
+ .expect("buyer order should place");
+ app_store
+ .replace_buyer_cart(
+ &buyer_context,
+ &radroots_app_models::BuyerCartProjection {
+ farm_id: Some(network_listing.farm_id),
+ farm_display_name: Some(network_listing.farm_display_name.clone()),
+ lines: vec![radroots_app_models::BuyerCartLineProjection {
+ product_id: network_listing.product_id,
+ farm_id: network_listing.farm_id,
+ farm_display_name: network_listing.farm_display_name.clone(),
+ title: network_listing.title.clone(),
+ quantity: 3,
+ unit_price: network_listing.price,
+ line_total_minor_units: 2400,
+ fulfillment_summary: network_listing
+ .next_fulfillment_window_label
+ .expect("network listing fulfillment summary"),
+ }],
+ subtotal_minor_units: Some(2400),
+ currency_code: Some("USD".to_owned()),
+ replace_confirmation: None,
+ },
+ )
+ .expect("buyer cart should save again");
seed_app_projection(&app_store, farm_uuid, product_uuid);
let mut app_listing = signed_market_listing_record(
@@ -4971,9 +5138,24 @@ mod tests {
.iter()
.find(|record| record.record_id == "app:signed_event:duplicate-app-origin")
.expect("app signed duplicate listing import");
+ let migrated_cart = app_store
+ .load_buyer_cart(&buyer_context)
+ .expect("buyer cart should load after duplicate convergence");
+ let order_line_id: String = app_store
+ .connection()
+ .query_row(
+ "SELECT id FROM order_lines WHERE order_id = ?1",
+ [order_id.to_string()],
+ |row| row.get(0),
+ )
+ .expect("order line id should load");
assert_eq!(product_count, 1);
assert_eq!(stale_product_count, 0);
+ assert_eq!(migrated_cart.lines.len(), 1);
+ assert_eq!(migrated_cart.lines[0].product_id.as_uuid(), product_uuid);
+ assert_eq!(migrated_cart.lines[0].quantity, 3);
+ assert!(order_line_id.contains(network_product_id.to_string().as_str()));
assert_eq!(listing_import.source_runtime, SourceRuntime::App.as_str());
assert_eq!(
listing_import.projected_id.as_deref(),
@@ -4984,6 +5166,21 @@ mod tests {
.iter()
.all(|record| record.record_id != "app:relay_event:duplicate-app-origin")
);
+ app_store
+ .clear_buyer_cart(&buyer_context)
+ .expect("buyer cart should clear");
+ assert_eq!(
+ app_store
+ .apply_buyer_repeat_demand_to_cart(&buyer_context, order_id, false)
+ .expect("repeat demand should apply"),
+ BuyerRepeatDemandApplyOutcome::Applied
+ );
+ let repeated_cart = app_store
+ .load_buyer_cart(&buyer_context)
+ .expect("buyer cart should load after repeat demand");
+ assert_eq!(repeated_cart.lines.len(), 1);
+ assert_eq!(repeated_cart.lines[0].product_id.as_uuid(), product_uuid);
+ assert_eq!(repeated_cart.lines[0].quantity, 2);
}
#[test]