commit ac146ec3ad0755aed20ed5c4b1f507081827a767
parent 368e49bf3cd33e4869e9f01aebd2eb7b1fb87318
Author: triesap <tyson@radroots.org>
Date: Tue, 26 May 2026 17:48:38 +0000
sqlite: guard listing currentness
Diffstat:
1 file changed, 492 insertions(+), 4 deletions(-)
diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs
@@ -768,6 +768,13 @@ impl<'a> AppLocalInteropRepository<'a> {
};
(farm_id, product_id)
};
+ let projection_record = ProjectionRecord {
+ kind: "listing",
+ projected_id: Some(product_id.to_string()),
+ };
+ if !self.signed_listing_is_current(record, listing_key.as_str())? {
+ return Ok(Some(projection_record));
+ }
self.ensure_farm_exists(farm_id)?;
let title = content
.as_ref()
@@ -857,10 +864,7 @@ impl<'a> AppLocalInteropRepository<'a> {
availability_window_id,
listing_bin_id,
})?;
- Ok(Some(ProjectionRecord {
- kind: "listing",
- projected_id: Some(product_id.to_string()),
- }))
+ Ok(Some(projection_record))
}
fn import_signed_order_request(
@@ -1617,6 +1621,132 @@ impl<'a> AppLocalInteropRepository<'a> {
}))
}
+ fn signed_listing_is_current(
+ &self,
+ record: &LocalEventRecord,
+ listing_key: &str,
+ ) -> Result<bool, AppSqliteError> {
+ if !signed_listing_has_public_evidence(record) {
+ return Ok(true);
+ }
+ let Some(incoming_key) = listing_currentness_key(
+ record.event_created_at,
+ record.event_id.as_deref(),
+ signed_event_evidence_precedence(
+ record.source_runtime.as_str(),
+ record.owner_account_id.as_deref(),
+ record.status.as_str(),
+ record.outbox_status.as_str(),
+ ),
+ ) else {
+ return Ok(true);
+ };
+ let Some(identity) = ListingCurrentnessIdentity::from_record(record, listing_key) else {
+ return Ok(true);
+ };
+ let Some(current_key) = self.current_listing_key(&identity)? else {
+ return Ok(true);
+ };
+ Ok(incoming_key >= current_key)
+ }
+
+ fn current_listing_key(
+ &self,
+ identity: &ListingCurrentnessIdentity,
+ ) -> Result<Option<ListingCurrentnessKey>, AppSqliteError> {
+ let mut keys = Vec::new();
+ match identity {
+ ListingCurrentnessIdentity::ListingAddress(listing_addr) => {
+ let mut statement = self
+ .connection
+ .prepare(
+ "SELECT
+ event_id,
+ event_created_at,
+ source_runtime,
+ owner_account_id,
+ local_status,
+ outbox_status,
+ relay_delivery_json
+ FROM local_interop_imports
+ WHERE record_family = 'signed_event'
+ AND projected_kind = 'listing'
+ AND listing_addr = ?1",
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "prepare current listing-address evidence query",
+ source,
+ })?;
+ let rows = statement
+ .query_map(params![listing_addr.as_str()], listing_currentness_row)
+ .map_err(|source| AppSqliteError::Query {
+ operation: "query current listing-address evidence",
+ source,
+ })?;
+ for row in rows {
+ let evidence = row.map_err(|source| AppSqliteError::Query {
+ operation: "read current listing-address evidence",
+ source,
+ })?;
+ if let Some(key) = evidence.into_currentness_key() {
+ keys.push(key);
+ }
+ }
+ }
+ ListingCurrentnessIdentity::KindPubkeyDTag {
+ event_kind,
+ event_pubkey,
+ listing_key,
+ } => {
+ let mut statement = self
+ .connection
+ .prepare(
+ "SELECT
+ event_id,
+ event_created_at,
+ source_runtime,
+ owner_account_id,
+ local_status,
+ outbox_status,
+ relay_delivery_json,
+ event_tags_json,
+ event_content,
+ listing_addr
+ FROM local_interop_imports
+ WHERE record_family = 'signed_event'
+ AND projected_kind = 'listing'
+ AND event_kind = ?1
+ AND event_pubkey = ?2",
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "prepare current listing identity evidence query",
+ source,
+ })?;
+ let rows = statement
+ .query_map(
+ params![event_kind, event_pubkey.as_str()],
+ listing_currentness_identity_row,
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "query current listing identity evidence",
+ source,
+ })?;
+ for row in rows {
+ let evidence = row.map_err(|source| AppSqliteError::Query {
+ operation: "read current listing identity evidence",
+ source,
+ })?;
+ if evidence.listing_key().as_deref() == Some(listing_key.as_str())
+ && let Some(key) = evidence.currentness.into_currentness_key()
+ {
+ keys.push(key);
+ }
+ }
+ }
+ }
+ Ok(keys.into_iter().max())
+ }
+
fn record_import(
&self,
record: &LocalEventRecord,
@@ -1814,6 +1944,104 @@ struct StoredLocalInteropSignedEventEvidence {
}
#[derive(Clone, Debug, Eq, PartialEq)]
+struct StoredListingCurrentnessEvidence {
+ event_id: Option<String>,
+ event_created_at: Option<i64>,
+ source_runtime: String,
+ owner_account_id: Option<String>,
+ local_status: String,
+ outbox_status: String,
+ relay_delivery_json: Option<String>,
+}
+
+impl StoredListingCurrentnessEvidence {
+ fn into_currentness_key(self) -> Option<ListingCurrentnessKey> {
+ if !signed_event_import_has_public_evidence(
+ self.local_status.as_str(),
+ self.outbox_status.as_str(),
+ self.relay_delivery_json.as_deref(),
+ ) {
+ return None;
+ }
+ listing_currentness_key(
+ self.event_created_at,
+ self.event_id.as_deref(),
+ signed_event_evidence_precedence(
+ self.source_runtime.as_str(),
+ self.owner_account_id.as_deref(),
+ self.local_status.as_str(),
+ self.outbox_status.as_str(),
+ ),
+ )
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+struct StoredListingCurrentnessIdentityEvidence {
+ currentness: StoredListingCurrentnessEvidence,
+ event_tags_json: Option<String>,
+ event_content: Option<String>,
+ listing_addr: Option<String>,
+}
+
+impl StoredListingCurrentnessIdentityEvidence {
+ fn listing_key(&self) -> Option<String> {
+ self.event_content
+ .as_deref()
+ .and_then(parse_json_value_opt)
+ .and_then(|content| string_at(&content, &["d_tag"]))
+ .or_else(|| {
+ self.event_tags_json
+ .as_deref()
+ .and_then(|raw| serde_json::from_str::<Value>(raw).ok())
+ .and_then(|tags| tag_index_value(Some(&tags), "d", 1))
+ })
+ .or_else(|| self.listing_addr.as_deref().and_then(address_d_tag))
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+enum ListingCurrentnessIdentity {
+ ListingAddress(String),
+ KindPubkeyDTag {
+ event_kind: i64,
+ event_pubkey: String,
+ listing_key: String,
+ },
+}
+
+impl ListingCurrentnessIdentity {
+ fn from_record(record: &LocalEventRecord, listing_key: &str) -> Option<Self> {
+ if let Some(listing_addr) = record
+ .listing_addr
+ .as_deref()
+ .map(str::trim)
+ .filter(|listing_addr| !listing_addr.is_empty())
+ {
+ return Some(Self::ListingAddress(listing_addr.to_owned()));
+ }
+ let event_kind = record.event_kind?;
+ let event_pubkey = record
+ .event_pubkey
+ .as_deref()
+ .map(str::trim)
+ .filter(|event_pubkey| !event_pubkey.is_empty())?;
+ Some(Self::KindPubkeyDTag {
+ event_kind,
+ event_pubkey: event_pubkey.to_owned(),
+ listing_key: listing_key.to_owned(),
+ })
+ }
+}
+
+#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
+struct ListingCurrentnessKey {
+ event_created_at: i64,
+ evidence_precedence: u8,
+ event_id: String,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
struct ProductProjection {
product_id: ProductId,
farm_id: FarmId,
@@ -1838,6 +2066,54 @@ struct ExistingListingProjection {
farm_key: Option<String>,
}
+fn listing_currentness_row(
+ row: &rusqlite::Row<'_>,
+) -> rusqlite::Result<StoredListingCurrentnessEvidence> {
+ Ok(StoredListingCurrentnessEvidence {
+ event_id: row.get(0)?,
+ event_created_at: row.get(1)?,
+ source_runtime: row.get(2)?,
+ owner_account_id: row.get(3)?,
+ local_status: row.get(4)?,
+ outbox_status: row.get(5)?,
+ relay_delivery_json: row.get(6)?,
+ })
+}
+
+fn listing_currentness_identity_row(
+ row: &rusqlite::Row<'_>,
+) -> rusqlite::Result<StoredListingCurrentnessIdentityEvidence> {
+ Ok(StoredListingCurrentnessIdentityEvidence {
+ currentness: StoredListingCurrentnessEvidence {
+ event_id: row.get(0)?,
+ event_created_at: row.get(1)?,
+ source_runtime: row.get(2)?,
+ owner_account_id: row.get(3)?,
+ local_status: row.get(4)?,
+ outbox_status: row.get(5)?,
+ relay_delivery_json: row.get(6)?,
+ },
+ event_tags_json: row.get(7)?,
+ event_content: row.get(8)?,
+ listing_addr: row.get(9)?,
+ })
+}
+
+fn listing_currentness_key(
+ event_created_at: Option<i64>,
+ event_id: Option<&str>,
+ evidence_precedence: u8,
+) -> Option<ListingCurrentnessKey> {
+ Some(ListingCurrentnessKey {
+ event_created_at: event_created_at?,
+ evidence_precedence,
+ event_id: event_id
+ .map(str::trim)
+ .filter(|event_id| !event_id.is_empty())?
+ .to_owned(),
+ })
+}
+
fn signed_event_evidence_precedence(
source_runtime: &str,
owner_account_id: Option<&str>,
@@ -2302,6 +2578,23 @@ fn signed_listing_has_public_evidence(record: &LocalEventRecord) -> bool {
.is_some_and(|delivery| delivery.state == RelayDeliveryState::Observed)
}
+fn signed_event_import_has_public_evidence(
+ local_status: &str,
+ outbox_status: &str,
+ relay_delivery_json: Option<&str>,
+) -> bool {
+ if local_status != LocalRecordStatus::Published.as_str() {
+ return false;
+ }
+ if outbox_status == PublishOutboxStatus::Acknowledged.as_str() {
+ return true;
+ }
+ relay_delivery_json
+ .and_then(|delivery| serde_json::from_str::<Value>(delivery).ok())
+ .and_then(|delivery| RelayDeliveryEvidence::from_json_value(&delivery).ok())
+ .is_some_and(|delivery| delivery.state == RelayDeliveryState::Observed)
+}
+
fn signed_farm_readiness(content: &Value, tags: Option<&Value>) -> Option<FarmReadiness> {
string_at(content, &["readiness"])
.or_else(|| {
@@ -2849,6 +3142,48 @@ mod tests {
}
}
+ fn set_listing_event_version(
+ record: &mut LocalEventRecordInput,
+ event_id: &str,
+ created_at: i64,
+ title: &str,
+ inventory_available: &str,
+ ) {
+ record.event_id = Some(event_id.to_owned());
+ record.event_created_at = Some(created_at);
+ record.created_at_ms = created_at * 1_000;
+ record.inserted_at_ms = created_at * 1_000 + 1;
+ if let Some(content) = record.event_content.as_deref() {
+ let mut content: serde_json::Value =
+ serde_json::from_str(content).expect("listing content should parse");
+ content["product"]["title"] = json!(title);
+ content["inventory_available"] = json!(inventory_available);
+ record.event_content = Some(content.to_string());
+ }
+ if let Some(serde_json::Value::Array(tags)) = record.event_tags_json.as_mut() {
+ for tag in tags {
+ let Some(values) = tag.as_array_mut() else {
+ continue;
+ };
+ match values.first().and_then(serde_json::Value::as_str) {
+ Some("title") => {
+ values[1] = json!(title);
+ }
+ Some("inventory") => {
+ values[1] = json!(inventory_available);
+ }
+ _ => {}
+ }
+ }
+ }
+ record.raw_event_json = Some(json!({
+ "id": event_id,
+ "kind": record.event_kind,
+ "pubkey": record.event_pubkey,
+ "content": record.event_content,
+ }));
+ }
+
fn buyer_listing_titles(app_store: &AppSqliteStore) -> Vec<String> {
app_store
.load_buyer_listings("", &BTreeSet::new())
@@ -4817,6 +5152,159 @@ mod tests {
}
#[test]
+ fn older_signed_listing_import_does_not_roll_back_current_product_state() {
+ let app_store =
+ AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store");
+ let events = local_events_store();
+ let farm_key = "CURRENTFARMAAAAAAAAAA";
+ let listing_key = "CURRENTLISTINGBBBBBB";
+ let mut newer = signed_market_listing_record(
+ "listing-current-newer",
+ "seller-pubkey",
+ farm_key,
+ listing_key,
+ "New Eggs",
+ "12",
+ "active",
+ "pickup",
+ "North barn pickup",
+ 4_102_444_800,
+ 4_102_531_200,
+ LocalRecordStatus::Published,
+ PublishOutboxStatus::Acknowledged,
+ );
+ set_listing_event_version(
+ &mut newer,
+ "event-listing-current-newer",
+ 2_000,
+ "New Eggs",
+ "12",
+ );
+ events.append_record(&newer).expect("append newer listing");
+ app_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import newer listing");
+
+ let mut older = signed_market_listing_record(
+ "listing-current-older",
+ "seller-pubkey",
+ farm_key,
+ listing_key,
+ "Old Eggs",
+ "3",
+ "active",
+ "pickup",
+ "North barn pickup",
+ 4_102_444_800,
+ 4_102_531_200,
+ LocalRecordStatus::Published,
+ PublishOutboxStatus::Acknowledged,
+ );
+ set_listing_event_version(
+ &mut older,
+ "event-listing-current-older",
+ 1_000,
+ "Old Eggs",
+ "3",
+ );
+ events.append_record(&older).expect("append older listing");
+
+ let report = app_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import older listing");
+ let product: (String, Option<i64>) = app_store
+ .connection()
+ .query_row("SELECT title, stock_count FROM products", [], |row| {
+ Ok((row.get(0)?, row.get(1)?))
+ })
+ .expect("load product");
+ let imported = app_store
+ .load_local_interop_records()
+ .expect("load imported records");
+
+ assert_eq!(report.imported_records, 1);
+ assert_eq!(product.0, "New Eggs");
+ assert_eq!(product.1, Some(12));
+ assert_eq!(
+ imported
+ .iter()
+ .filter(|record| record.projected_kind == "listing")
+ .count(),
+ 2
+ );
+ }
+
+ #[test]
+ fn equal_timestamp_signed_listing_currentness_uses_event_id_tie_breaker() {
+ let app_store =
+ AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store");
+ let events = local_events_store();
+ let farm_key = "TIEFARMAAAAAAAAAAAAAA";
+ let listing_key = "TIELISTINGBBBBBBBBBB";
+ let mut winning = signed_market_listing_record(
+ "listing-tie-winning",
+ "seller-pubkey",
+ farm_key,
+ listing_key,
+ "Tie Winner Eggs",
+ "10",
+ "active",
+ "pickup",
+ "North barn pickup",
+ 4_102_444_800,
+ 4_102_531_200,
+ LocalRecordStatus::Published,
+ PublishOutboxStatus::Acknowledged,
+ );
+ set_listing_event_version(
+ &mut winning,
+ "event-z-winning",
+ 3_000,
+ "Tie Winner Eggs",
+ "10",
+ );
+ events
+ .append_record(&winning)
+ .expect("append winning listing");
+ app_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import winning listing");
+
+ let mut losing = signed_market_listing_record(
+ "listing-tie-losing",
+ "seller-pubkey",
+ farm_key,
+ listing_key,
+ "Tie Loser Eggs",
+ "1",
+ "active",
+ "pickup",
+ "North barn pickup",
+ 4_102_444_800,
+ 4_102_531_200,
+ LocalRecordStatus::Published,
+ PublishOutboxStatus::Acknowledged,
+ );
+ set_listing_event_version(&mut losing, "event-a-losing", 3_000, "Tie Loser Eggs", "1");
+ events
+ .append_record(&losing)
+ .expect("append losing listing");
+
+ app_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import losing listing");
+ let product: (String, Option<i64>) = app_store
+ .connection()
+ .query_row("SELECT title, stock_count FROM products", [], |row| {
+ Ok((row.get(0)?, row.get(1)?))
+ })
+ .expect("load product");
+
+ assert_eq!(product.0, "Tie Winner Eggs");
+ assert_eq!(product.1, Some(10));
+ }
+
+ #[test]
fn signed_farm_import_prefers_event_identity_over_local_owner_metadata() {
let app_store =
AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store");