commit 84c135fd3a9a77126f1f4a0db5b7b5bc454ce774
parent 16e08b9ea993ceac4f889912f15afea910b42ac6
Author: triesap <tyson@radroots.org>
Date: Tue, 26 May 2026 17:58:37 +0000
sqlite: make duplicate replacement atomic
Diffstat:
1 file changed, 198 insertions(+), 0 deletions(-)
diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs
@@ -314,6 +314,24 @@ impl<'a> AppLocalInteropRepository<'a> {
}
fn import_record(&self, record: &LocalEventRecord) -> Result<ImportOutcome, AppSqliteError> {
+ self.begin_import_record_savepoint()?;
+ match self.import_record_inner(record) {
+ Ok(outcome) => {
+ self.release_import_record_savepoint()?;
+ Ok(outcome)
+ }
+ Err(error) => {
+ let _ = self.rollback_import_record_savepoint();
+ let _ = self.release_import_record_savepoint();
+ Err(error)
+ }
+ }
+ }
+
+ fn import_record_inner(
+ &self,
+ record: &LocalEventRecord,
+ ) -> Result<ImportOutcome, AppSqliteError> {
let superseded_listing_ids = match self.duplicate_signed_event_action(record)? {
DuplicateSignedEventAction::Import => Vec::new(),
DuplicateSignedEventAction::ReplaceExisting(event_id) => self
@@ -349,6 +367,33 @@ impl<'a> AppLocalInteropRepository<'a> {
}
}
+ fn begin_import_record_savepoint(&self) -> Result<(), AppSqliteError> {
+ self.connection
+ .execute_batch("SAVEPOINT app_local_interop_import_record")
+ .map_err(|source| AppSqliteError::Query {
+ operation: "begin local interop import record transaction",
+ source,
+ })
+ }
+
+ fn rollback_import_record_savepoint(&self) -> Result<(), AppSqliteError> {
+ self.connection
+ .execute_batch("ROLLBACK TO app_local_interop_import_record")
+ .map_err(|source| AppSqliteError::Query {
+ operation: "rollback local interop import record transaction",
+ source,
+ })
+ }
+
+ fn release_import_record_savepoint(&self) -> Result<(), AppSqliteError> {
+ self.connection
+ .execute_batch("RELEASE app_local_interop_import_record")
+ .map_err(|source| AppSqliteError::Query {
+ operation: "release local interop import record transaction",
+ source,
+ })
+ }
+
fn duplicate_signed_event_action(
&self,
record: &LocalEventRecord,
@@ -5184,6 +5229,159 @@ mod tests {
}
#[test]
+ fn failed_duplicate_listing_replacement_rolls_back_prior_visible_state() {
+ let app_store =
+ AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store");
+ let events = local_events_store();
+ let farm_uuid = Uuid::from_u128(0x9b9b9b9b9b9b4b9bbb9b9b9b9b9b9b9b);
+ let product_uuid = Uuid::from_u128(0xabababababab4abababababababababa);
+ let farm_key = app_d_tag_from_uuid(farm_uuid);
+ let listing_key = app_d_tag_from_uuid(product_uuid);
+ let seller_pubkey = "app-seller-pubkey";
+ let duplicate_event_id = "duplicate-app-origin-rollback-event";
+ let mut network_listing = signed_market_listing_record(
+ "rollback-network-app-origin",
+ seller_pubkey,
+ farm_key.as_str(),
+ listing_key.as_str(),
+ "Rollback Relay Eggs",
+ "11",
+ "active",
+ "pickup",
+ "App farmstand pickup",
+ 4_102_444_800,
+ 4_102_531_200,
+ LocalRecordStatus::Published,
+ PublishOutboxStatus::Acknowledged,
+ );
+ network_listing.source_runtime = SourceRuntime::Network;
+ network_listing.owner_account_id = None;
+ network_listing.record_id = "app:relay_event:rollback-app-origin".to_owned();
+ network_listing.event_id = Some(duplicate_event_id.to_owned());
+ events
+ .append_record(&network_listing)
+ .expect("append network app-origin listing");
+ app_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import network app-origin listing");
+
+ let network_product_id =
+ deterministic_product_id(Some(seller_pubkey), listing_key.as_str());
+ let network_farm_id = deterministic_farm_id(Some(seller_pubkey), farm_key.as_str());
+ seed_app_projection(&app_store, farm_uuid, product_uuid);
+ app_store
+ .connection()
+ .execute(
+ "INSERT INTO buyer_carts (
+ buyer_context_key,
+ farm_id,
+ updated_at
+ ) VALUES ('account:acct_buyer', ?1, '2026-01-01T00:00:00Z')",
+ [network_farm_id.to_string()],
+ )
+ .expect("insert buyer cart header");
+ app_store
+ .connection()
+ .execute(
+ "INSERT INTO buyer_cart_lines (
+ buyer_context_key,
+ product_id,
+ quantity,
+ updated_at
+ ) VALUES ('account:acct_buyer', ?1, 2, '2026-01-01T00:00:00Z')",
+ [network_product_id.to_string()],
+ )
+ .expect("insert stale buyer cart line");
+ app_store
+ .connection()
+ .execute_batch(
+ format!(
+ "CREATE TEMP TRIGGER fail_duplicate_cart_delete
+ BEFORE DELETE ON buyer_cart_lines
+ WHEN old.product_id = '{}'
+ BEGIN
+ SELECT RAISE(ABORT, 'forced duplicate cart migration failure');
+ END;",
+ network_product_id
+ )
+ .as_str(),
+ )
+ .expect("create failure trigger");
+
+ let mut app_listing = signed_market_listing_record(
+ "rollback-app-signed-origin",
+ seller_pubkey,
+ farm_key.as_str(),
+ listing_key.as_str(),
+ "Rollback App Eggs",
+ "9",
+ "active",
+ "pickup",
+ "App farmstand pickup",
+ 4_102_444_800,
+ 4_102_531_200,
+ LocalRecordStatus::Published,
+ PublishOutboxStatus::Acknowledged,
+ );
+ app_listing.source_runtime = SourceRuntime::App;
+ app_listing.record_id = "app:signed_event:rollback-app-origin".to_owned();
+ app_listing.event_id = Some(duplicate_event_id.to_owned());
+ events
+ .append_record(&app_listing)
+ .expect("append app signed duplicate listing");
+
+ app_store
+ .import_shared_local_events_from_store(&events)
+ .expect_err("duplicate replacement should roll back on cart migration failure");
+ let imported = app_store
+ .load_local_interop_records()
+ .expect("load imported records");
+ let product_count: i64 = app_store
+ .connection()
+ .query_row("SELECT COUNT(*) FROM products", [], |row| row.get(0))
+ .expect("product count");
+ let stale_cart_quantity: i64 = app_store
+ .connection()
+ .query_row(
+ "SELECT quantity FROM buyer_cart_lines WHERE product_id = ?1",
+ [network_product_id.to_string()],
+ |row| row.get(0),
+ )
+ .expect("stale cart quantity");
+ let canonical_cart_count: i64 = app_store
+ .connection()
+ .query_row(
+ "SELECT COUNT(*) FROM buyer_cart_lines WHERE product_id = ?1",
+ [product_uuid.to_string()],
+ |row| row.get(0),
+ )
+ .expect("canonical cart count");
+ let network_product_title: String = app_store
+ .connection()
+ .query_row(
+ "SELECT title FROM products WHERE id = ?1",
+ [network_product_id.to_string()],
+ |row| row.get(0),
+ )
+ .expect("network product title");
+
+ assert_eq!(product_count, 2);
+ assert_eq!(stale_cart_quantity, 2);
+ assert_eq!(canonical_cart_count, 0);
+ assert_eq!(network_product_title, "Rollback Relay Eggs");
+ assert!(
+ imported
+ .iter()
+ .any(|record| record.record_id == "app:relay_event:rollback-app-origin")
+ );
+ assert!(
+ imported
+ .iter()
+ .all(|record| record.record_id != "app:signed_event:rollback-app-origin")
+ );
+ }
+
+ #[test]
fn buyer_visibility_rejects_incomplete_unpublished_stale_and_unsupported_records() {
for record in [
signed_market_listing_record(