commit 3ce26f6709e79c32b3c690ddff702e3213d05be6
parent c18f87f86923b1c84cd7b7bf556f4ec0d79c4e4a
Author: triesap <tyson@radroots.org>
Date: Sat, 23 May 2026 19:57:01 +0000
sqlite: replay app local interop records
Diffstat:
1 file changed, 345 insertions(+), 21 deletions(-)
diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs
@@ -109,7 +109,6 @@ impl<'a> AppLocalInteropRepository<'a> {
match self.import_record(&record)? {
ImportOutcome::Imported => report.imported_records += 1,
ImportOutcome::Skipped => report.skipped_records += 1,
- ImportOutcome::SelfObserved => report.self_observed_records += 1,
}
}
if batch_len < LOCAL_EVENTS_BATCH_LIMIT as usize {
@@ -224,9 +223,6 @@ impl<'a> AppLocalInteropRepository<'a> {
}
fn import_record(&self, record: &LocalEventRecord) -> Result<ImportOutcome, AppSqliteError> {
- if record.source_runtime == SourceRuntime::App {
- return Ok(ImportOutcome::SelfObserved);
- }
let projection = match record.family {
LocalRecordFamily::LocalWork => self.import_local_work(record)?,
LocalRecordFamily::SignedEvent => self.import_signed_event(record)?,
@@ -288,7 +284,13 @@ impl<'a> AppLocalInteropRepository<'a> {
return Ok(None);
};
let owner_pubkey = record.owner_pubkey.clone();
- let farm_id = deterministic_farm_id(owner_pubkey.as_deref(), farm_key.as_str());
+ let Some(farm_id) = projected_farm_id(
+ record.source_runtime,
+ owner_pubkey.as_deref(),
+ farm_key.as_str(),
+ ) else {
+ return Ok(None);
+ };
let display_name = string_at(document, &["profile", "display_name"])
.or_else(|| string_at(document, &["profile", "name"]))
.or_else(|| string_at(document, &["farm", "name"]))
@@ -348,9 +350,21 @@ impl<'a> AppLocalInteropRepository<'a> {
let Some(farm_key) = farm_key else {
return Ok(None);
};
- let farm_id = deterministic_farm_id(owner_pubkey.as_deref(), farm_key.as_str());
+ let Some(farm_id) = projected_farm_id(
+ record.source_runtime,
+ owner_pubkey.as_deref(),
+ farm_key.as_str(),
+ ) else {
+ return Ok(None);
+ };
self.ensure_farm_exists(farm_id)?;
- let product_id = deterministic_product_id(owner_pubkey.as_deref(), listing_key.as_str());
+ let Some(product_id) = projected_product_id(
+ record.source_runtime,
+ owner_pubkey.as_deref(),
+ listing_key.as_str(),
+ ) else {
+ return Ok(None);
+ };
let title = string_at(document, &["product", "title"])
.or_else(|| string_at(document, &["product", "key"]))
.unwrap_or_else(|| "Local product".to_owned());
@@ -400,7 +414,11 @@ impl<'a> AppLocalInteropRepository<'a> {
.event_pubkey
.as_deref()
.or(record.owner_pubkey.as_deref());
- let farm_id = deterministic_farm_id(owner_pubkey, farm_key.as_str());
+ let Some(farm_id) =
+ projected_farm_id(record.source_runtime, owner_pubkey, farm_key.as_str())
+ else {
+ return Ok(None);
+ };
let display_name =
string_at(&content, &["name"]).unwrap_or_else(|| "Local farm".to_owned());
self.upsert_farm_summary(&FarmSummary {
@@ -452,9 +470,17 @@ impl<'a> AppLocalInteropRepository<'a> {
.as_deref()
.or(signed_farm_pubkey.as_deref())
.or(record.owner_pubkey.as_deref());
- let farm_id = deterministic_farm_id(farm_pubkey, farm_key.as_str());
+ let Some(farm_id) =
+ projected_farm_id(record.source_runtime, farm_pubkey, farm_key.as_str())
+ else {
+ return Ok(None);
+ };
self.ensure_farm_exists(farm_id)?;
- let product_id = deterministic_product_id(listing_pubkey, listing_key.as_str());
+ let Some(product_id) =
+ projected_product_id(record.source_runtime, listing_pubkey, listing_key.as_str())
+ else {
+ return Ok(None);
+ };
let title = content
.as_ref()
.and_then(|content| string_at(content, &["product", "title"]))
@@ -728,7 +754,6 @@ impl AppSqliteStore {
enum ImportOutcome {
Imported,
Skipped,
- SelfObserved,
}
#[derive(Clone, Debug, Eq, PartialEq)]
@@ -766,6 +791,28 @@ fn deterministic_product_id(owner_pubkey: Option<&str>, listing_key: &str) -> Pr
))
}
+fn projected_farm_id(
+ source_runtime: SourceRuntime,
+ owner_pubkey: Option<&str>,
+ farm_key: &str,
+) -> Option<FarmId> {
+ match source_runtime {
+ SourceRuntime::App => parse_app_d_tag_uuid(farm_key).map(FarmId::from),
+ _ => Some(deterministic_farm_id(owner_pubkey, farm_key)),
+ }
+}
+
+fn projected_product_id(
+ source_runtime: SourceRuntime,
+ owner_pubkey: Option<&str>,
+ listing_key: &str,
+) -> Option<ProductId> {
+ match source_runtime {
+ SourceRuntime::App => parse_app_d_tag_uuid(listing_key).map(ProductId::from),
+ _ => Some(deterministic_product_id(owner_pubkey, listing_key)),
+ }
+}
+
fn deterministic_uuid(scope: &str, owner_pubkey: Option<&str>, key: &str) -> Uuid {
let seed = format!(
"{scope}:{}:{}",
@@ -775,6 +822,41 @@ fn deterministic_uuid(scope: &str, owner_pubkey: Option<&str>, key: &str) -> Uui
Uuid::new_v5(&Uuid::NAMESPACE_URL, seed.as_bytes())
}
+fn parse_app_d_tag_uuid(value: &str) -> Option<Uuid> {
+ let mut decoded = Vec::with_capacity(16);
+ let mut buffer = 0u32;
+ let mut bits = 0u8;
+ for byte in value.trim().bytes() {
+ let digit = base64_url_digit(byte)?;
+ buffer = (buffer << 6) | u32::from(digit);
+ bits += 6;
+ while bits >= 8 {
+ bits -= 8;
+ decoded.push(((buffer >> bits) & 0xff) as u8);
+ buffer &= (1u32 << bits) - 1;
+ }
+ }
+ if bits > 0 && buffer != 0 {
+ return None;
+ }
+ if decoded.len() == 16 {
+ Uuid::from_slice(decoded.as_slice()).ok()
+ } else {
+ None
+ }
+}
+
+fn base64_url_digit(byte: u8) -> Option<u8> {
+ match byte {
+ b'A'..=b'Z' => Some(byte - b'A'),
+ b'a'..=b'z' => Some(byte - b'a' + 26),
+ b'0'..=b'9' => Some(byte - b'0' + 52),
+ b'-' => Some(62),
+ b'_' => Some(63),
+ _ => None,
+ }
+}
+
fn string_at(value: &Value, path: &[&str]) -> Option<String> {
let mut cursor = value;
for segment in path {
@@ -972,7 +1054,9 @@ mod tests {
LocalRecordStatus, PublishOutboxStatus, SourceRuntime,
};
use radroots_sql_core::SqliteExecutor;
+ use rusqlite::params;
use serde_json::json;
+ use uuid::Uuid;
use super::{KIND_FARM, KIND_LISTING, deterministic_farm_id, deterministic_product_id};
use crate::{AppSqliteStore, DatabaseTarget};
@@ -1091,6 +1175,91 @@ mod tests {
}
}
+ fn app_d_tag_from_uuid(uuid: Uuid) -> String {
+ const ALPHABET: &[u8; 64] =
+ b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
+ let bytes = uuid.as_bytes();
+ let mut output = String::with_capacity((bytes.len() * 4).div_ceil(3));
+ let mut chunks = bytes.chunks_exact(3);
+ for chunk in &mut chunks {
+ output.push(ALPHABET[(chunk[0] >> 2) as usize] as char);
+ output.push(
+ ALPHABET[(((chunk[0] & 0b0000_0011) << 4) | (chunk[1] >> 4)) as usize] as char,
+ );
+ output.push(
+ ALPHABET[(((chunk[1] & 0b0000_1111) << 2) | (chunk[2] >> 6)) as usize] as char,
+ );
+ output.push(ALPHABET[(chunk[2] & 0b0011_1111) as usize] as char);
+ }
+ match chunks.remainder() {
+ [one] => {
+ output.push(ALPHABET[(one >> 2) as usize] as char);
+ output.push(ALPHABET[((one & 0b0000_0011) << 4) as usize] as char);
+ }
+ [one, two] => {
+ output.push(ALPHABET[(one >> 2) as usize] as char);
+ output.push(ALPHABET[(((one & 0b0000_0011) << 4) | (two >> 4)) as usize] as char);
+ output.push(ALPHABET[((two & 0b0000_1111) << 2) as usize] as char);
+ }
+ [] => {}
+ _ => unreachable!(),
+ }
+ output
+ }
+
+ fn app_local_work_record(
+ record_id: &str,
+ farm_key: &str,
+ payload: serde_json::Value,
+ ) -> LocalEventRecordInput {
+ let mut record = local_work_record(record_id, farm_key, payload);
+ record.source_runtime = SourceRuntime::App;
+ record.owner_pubkey = Some("app-seller-pubkey".to_owned());
+ record
+ }
+
+ fn seed_app_projection(app_store: &AppSqliteStore, farm_id: Uuid, product_id: Uuid) {
+ app_store
+ .connection()
+ .execute(
+ "INSERT INTO farms (id, display_name, readiness, created_at, updated_at)
+ VALUES (?1, 'Origin Farm', 'ready', '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z')",
+ params![farm_id.to_string()],
+ )
+ .expect("seed origin farm");
+ app_store
+ .connection()
+ .execute(
+ "INSERT INTO products (
+ id,
+ farm_id,
+ title,
+ subtitle,
+ status,
+ unit_label,
+ price_minor_units,
+ price_currency,
+ stock_count,
+ availability_window_id,
+ updated_at
+ ) VALUES (
+ ?1,
+ ?2,
+ 'Origin Eggs',
+ 'Seeded product',
+ 'draft',
+ 'each',
+ 400,
+ 'USD',
+ 3,
+ NULL,
+ '2026-01-01T00:00:00Z'
+ )",
+ params![product_id.to_string(), farm_id.to_string()],
+ )
+ .expect("seed origin product");
+ }
+
#[test]
fn imports_cli_local_work_into_app_farm_and_product_projection() {
let app_store =
@@ -1771,31 +1940,180 @@ mod tests {
}
#[test]
- fn app_authored_shared_records_are_self_observed_without_unsupported_imports() {
+ fn app_authored_shared_records_replay_into_fresh_store_without_origin_duplicates() {
+ let events = local_events_store();
+ let farm_uuid = Uuid::from_u128(0x11111111111111111111111111111111);
+ let product_uuid = Uuid::from_u128(0x22222222222222222222222222222222);
+ let farm_key = app_d_tag_from_uuid(farm_uuid);
+ let listing_key = app_d_tag_from_uuid(product_uuid);
+ let app_farm_record = app_local_work_record(
+ "app:local_work:farm",
+ farm_key.as_str(),
+ json!({
+ "record_kind": "farm_config_v1",
+ "document": {
+ "selection": {
+ "account": "seller-account",
+ "farm_d_tag": farm_key
+ },
+ "profile": {
+ "display_name": "App Farm"
+ },
+ "farm": {
+ "d_tag": farm_key,
+ "name": "App Farm",
+ "location": {
+ "primary": "app farmstand"
+ }
+ },
+ "listing_defaults": {
+ "delivery_method": "pickup",
+ "location": {
+ "primary": "app farmstand"
+ }
+ }
+ }
+ }),
+ );
+ let mut app_listing_record = app_local_work_record(
+ "app:local_work:listing",
+ farm_key.as_str(),
+ json!({
+ "record_kind": "listing_draft_v1",
+ "document": {
+ "listing": {
+ "d_tag": listing_key,
+ "farm_d_tag": farm_key
+ },
+ "seller_actor": {
+ "account_id": "seller-account",
+ "pubkey": "app-seller-pubkey"
+ },
+ "product": {
+ "key": listing_key,
+ "title": "App Eggs",
+ "summary": "Fresh app-origin eggs"
+ },
+ "primary_bin": {
+ "quantity_unit": "each",
+ "price_amount": "7",
+ "price_currency": "USD"
+ },
+ "inventory": {
+ "available": "12"
+ }
+ }
+ }),
+ );
+ app_listing_record.listing_addr = Some(format!("30402:app-seller-pubkey:{listing_key}"));
+ events
+ .append_record(&app_farm_record)
+ .expect("append app farm local work");
+ events
+ .append_record(&app_listing_record)
+ .expect("append app listing local work");
+
+ let origin_store =
+ AppSqliteStore::open(DatabaseTarget::InMemory).expect("open origin app sqlite store");
+ seed_app_projection(&origin_store, farm_uuid, product_uuid);
+ let origin_report = origin_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import shared local events into origin store");
+ let origin_second_report = origin_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import unchanged shared local events into origin store");
+ let origin_product_count: i64 = origin_store
+ .connection()
+ .query_row("SELECT COUNT(*) FROM products", [], |row| row.get(0))
+ .expect("origin product count");
+ let origin_product: (String, String, String, Option<i64>, Option<i64>) = origin_store
+ .connection()
+ .query_row(
+ "SELECT id, farm_id, title, price_minor_units, stock_count FROM products",
+ [],
+ |row| {
+ Ok((
+ row.get(0)?,
+ row.get(1)?,
+ row.get(2)?,
+ row.get(3)?,
+ row.get(4)?,
+ ))
+ },
+ )
+ .expect("load origin product");
+ let origin_imports = origin_store
+ .load_local_interop_records()
+ .expect("load origin imported records");
+
+ assert_eq!(origin_report.scanned_records, 2);
+ assert_eq!(origin_report.imported_records, 2);
+ assert_eq!(origin_report.skipped_records, 0);
+ assert_eq!(origin_report.self_observed_records, 0);
+ assert_eq!(origin_second_report.scanned_records, 0);
+ assert_eq!(origin_product_count, 1);
+ assert_eq!(origin_product.0, product_uuid.to_string());
+ assert_eq!(origin_product.1, farm_uuid.to_string());
+ assert_eq!(origin_product.2, "App Eggs");
+ assert_eq!(origin_product.3, Some(700));
+ assert_eq!(origin_product.4, Some(12));
+ assert_eq!(origin_imports.len(), 2);
+
+ let fresh_store =
+ AppSqliteStore::open(DatabaseTarget::InMemory).expect("open fresh app sqlite store");
+ let fresh_report = fresh_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import shared local events into fresh store");
+ let fresh_product_count: i64 = fresh_store
+ .connection()
+ .query_row("SELECT COUNT(*) FROM products", [], |row| row.get(0))
+ .expect("fresh product count");
+ let fresh_product: (String, String, String) = fresh_store
+ .connection()
+ .query_row("SELECT id, farm_id, title FROM products", [], |row| {
+ Ok((row.get(0)?, row.get(1)?, row.get(2)?))
+ })
+ .expect("load fresh product");
+ let fresh_imports = fresh_store
+ .load_local_interop_records()
+ .expect("load fresh imported records");
+
+ assert_eq!(fresh_report.scanned_records, 2);
+ assert_eq!(fresh_report.imported_records, 2);
+ assert_eq!(fresh_report.skipped_records, 0);
+ assert_eq!(fresh_report.self_observed_records, 0);
+ assert_eq!(fresh_product_count, 1);
+ assert_eq!(fresh_product.0, product_uuid.to_string());
+ assert_eq!(fresh_product.1, farm_uuid.to_string());
+ assert_eq!(fresh_product.2, "App Eggs");
+ assert_eq!(fresh_imports.len(), 2);
+ }
+
+ #[test]
+ fn app_authored_records_with_non_uuid_tags_do_not_fallback_to_cli_identity() {
let app_store =
AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store");
let events = local_events_store();
- let mut app_record = local_work_record(
- "app:local_work:farm",
- "AAAAAAAAAAAAAAAAAAAAAA",
+ let app_record = app_local_work_record(
+ "app:local_work:farm:invalid-tag",
+ "not-a-uuid-d-tag",
json!({
"record_kind": "farm_config_v1",
"document": {
"selection": {
"account": "seller-account",
- "farm_d_tag": "AAAAAAAAAAAAAAAAAAAAAA"
+ "farm_d_tag": "not-a-uuid-d-tag"
},
"profile": {
"display_name": "App Farm"
},
"farm": {
- "d_tag": "AAAAAAAAAAAAAAAAAAAAAA",
+ "d_tag": "not-a-uuid-d-tag",
"name": "App Farm"
}
}
}),
);
- app_record.source_runtime = SourceRuntime::App;
events
.append_record(&app_record)
.expect("append app local work");
@@ -1806,11 +2124,17 @@ mod tests {
let imported = app_store
.load_local_interop_records()
.expect("load imported records");
+ let farm_count: i64 = app_store
+ .connection()
+ .query_row("SELECT COUNT(*) FROM farms", [], |row| row.get(0))
+ .expect("farm count");
assert_eq!(report.scanned_records, 1);
assert_eq!(report.imported_records, 0);
- assert_eq!(report.skipped_records, 0);
- assert_eq!(report.self_observed_records, 1);
- assert!(imported.is_empty());
+ assert_eq!(report.skipped_records, 1);
+ assert_eq!(report.self_observed_records, 0);
+ assert_eq!(imported.len(), 1);
+ assert_eq!(imported[0].projected_kind, "unsupported");
+ assert_eq!(farm_count, 0);
}
}