commit c18f87f86923b1c84cd7b7bf556f4ec0d79c4e4a
parent f817aafd3d65a735800393c3ac1d5a7486a6d94e
Author: triesap <tyson@radroots.org>
Date: Sat, 23 May 2026 17:29:42 +0000
sqlite: move local interop cursor into app state
- add app-owned local interop projection cursor storage
- stop advancing shared local-events app projection cursors
- cover same-store idempotency and fresh-store replay
- validate focused local interop tests and app check
Diffstat:
3 files changed, 118 insertions(+), 34 deletions(-)
diff --git a/crates/shared/sqlite/migrations/0013_local_interop_projection_cursor.sql b/crates/shared/sqlite/migrations/0013_local_interop_projection_cursor.sql
@@ -0,0 +1,5 @@
+CREATE TABLE local_interop_projection_cursor (
+ consumer_id TEXT PRIMARY KEY NOT NULL,
+ last_change_seq INTEGER NOT NULL CHECK (last_change_seq >= 0),
+ updated_at TEXT NOT NULL
+);
diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs
@@ -1,8 +1,4 @@
-use std::{
- fs,
- path::Path,
- time::{SystemTime, UNIX_EPOCH},
-};
+use std::{fs, path::Path};
use radroots_app_models::{
FarmId, FarmOrderMethod, FarmReadiness, FarmSetupDraft, FarmSetupProjection, FarmSummary,
@@ -21,7 +17,7 @@ use crate::farm_setup::AppFarmSetupRepository;
use crate::{AppSqliteError, AppSqliteStore};
const LOCAL_EVENTS_BATCH_LIMIT: u32 = 500;
-const APP_LOCAL_EVENTS_CONSUMER_ID: &str = "radroots_app_sqlite_projection_v1";
+const APP_LOCAL_INTEROP_CURSOR_ID: &str = "radroots_app_sqlite_projection_v1";
const KIND_FARM: i64 = 30340;
const KIND_LISTING: i64 = 30402;
const KIND_LISTING_DRAFT: i64 = 30403;
@@ -97,13 +93,7 @@ impl<'a> AppLocalInteropRepository<'a> {
E: SqlExecutor,
{
let mut report = AppLocalInteropImportReport::default();
- let mut after_change_seq = store
- .get_cursor(APP_LOCAL_EVENTS_CONSUMER_ID)
- .map_err(|source| AppSqliteError::LocalEvents {
- operation: "read shared local event cursor",
- source,
- })?
- .map_or(0, |cursor| cursor.last_change_seq);
+ let mut after_change_seq = self.last_imported_change_seq()?;
loop {
let records = store
.list_records_changed_after(after_change_seq, LOCAL_EVENTS_BATCH_LIMIT)
@@ -127,16 +117,7 @@ impl<'a> AppLocalInteropRepository<'a> {
}
}
if let Some(last_change_seq) = report.last_change_seq {
- store
- .advance_cursor(
- APP_LOCAL_EVENTS_CONSUMER_ID,
- last_change_seq,
- current_time_ms()?,
- )
- .map_err(|source| AppSqliteError::LocalEvents {
- operation: "advance shared local event cursor",
- source,
- })?;
+ self.advance_import_cursor(last_change_seq)?;
}
Ok(report)
}
@@ -201,6 +182,47 @@ impl<'a> AppLocalInteropRepository<'a> {
.collect()
}
+ fn last_imported_change_seq(&self) -> Result<i64, AppSqliteError> {
+ match self.connection.query_row(
+ "SELECT last_change_seq
+ FROM local_interop_projection_cursor
+ WHERE consumer_id = ?1
+ LIMIT 1",
+ [APP_LOCAL_INTEROP_CURSOR_ID],
+ |row| row.get::<_, i64>(0),
+ ) {
+ Ok(last_change_seq) => Ok(last_change_seq),
+ Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
+ Err(source) => Err(AppSqliteError::Query {
+ operation: "read app local interop projection cursor",
+ source,
+ }),
+ }
+ }
+
+ fn advance_import_cursor(&self, last_change_seq: i64) -> Result<(), AppSqliteError> {
+ self.connection
+ .execute(
+ "INSERT INTO local_interop_projection_cursor (
+ consumer_id,
+ last_change_seq,
+ updated_at
+ ) VALUES (?1, ?2, strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
+ ON CONFLICT(consumer_id) DO UPDATE SET
+ last_change_seq = max(
+ local_interop_projection_cursor.last_change_seq,
+ excluded.last_change_seq
+ ),
+ updated_at = excluded.updated_at",
+ params![APP_LOCAL_INTEROP_CURSOR_ID, last_change_seq],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "advance app local interop projection cursor",
+ source,
+ })?;
+ Ok(())
+ }
+
fn import_record(&self, record: &LocalEventRecord) -> Result<ImportOutcome, AppSqliteError> {
if record.source_runtime == SourceRuntime::App {
return Ok(ImportOutcome::SelfObserved);
@@ -728,17 +750,6 @@ struct ProductProjection {
stock_count: Option<u32>,
}
-fn current_time_ms() -> Result<i64, AppSqliteError> {
- let duration = SystemTime::now().duration_since(UNIX_EPOCH).map_err(|_| {
- AppSqliteError::InvalidProjection {
- reason: "current local interop timestamp must be after unix epoch",
- }
- })?;
- i64::try_from(duration.as_millis()).map_err(|_| AppSqliteError::InvalidProjection {
- reason: "current local interop timestamp must fit i64 milliseconds",
- })
-}
-
fn deterministic_farm_id(owner_pubkey: Option<&str>, farm_key: &str) -> FarmId {
FarmId::from(deterministic_uuid(
"radroots-cli-farm",
@@ -1168,6 +1179,12 @@ mod tests {
assert_eq!(second_report.imported_records, 0);
assert_eq!(second_report.skipped_records, 0);
assert_eq!(second_report.self_observed_records, 0);
+ assert!(
+ events
+ .get_cursor("radroots_app_sqlite_projection_v1")
+ .expect("read shared cursor")
+ .is_none()
+ );
let imported = app_store
.load_local_interop_records()
.expect("load imported records");
@@ -1210,6 +1227,64 @@ mod tests {
}
#[test]
+ fn fresh_app_store_replays_existing_shared_records_after_another_app_imported_them() {
+ let events = local_events_store();
+ let farm_key = "AAAAAAAAAAAAAAAAAAAAAA";
+ events
+ .append_record(&local_work_record(
+ "cli:local_work:farm",
+ farm_key,
+ json!({
+ "record_kind": "farm_config_v1",
+ "document": {
+ "selection": {
+ "account": "seller-account",
+ "farm_d_tag": farm_key
+ },
+ "profile": {
+ "name": "Green Farm",
+ "display_name": "Green Farm"
+ },
+ "farm": {
+ "d_tag": farm_key,
+ "name": "Green Farm",
+ "location": {
+ "primary": "farmstand"
+ }
+ }
+ }
+ }),
+ ))
+ .expect("append farm local work");
+ let first_store =
+ AppSqliteStore::open(DatabaseTarget::InMemory).expect("open first app sqlite store");
+ let first_report = first_store
+ .import_shared_local_events_from_store(&events)
+ .expect("first app imports shared local events");
+ let second_same_store_report = first_store
+ .import_shared_local_events_from_store(&events)
+ .expect("first app imports unchanged shared local events");
+ let second_store =
+ AppSqliteStore::open(DatabaseTarget::InMemory).expect("open second app sqlite store");
+ let fresh_store_report = second_store
+ .import_shared_local_events_from_store(&events)
+ .expect("fresh app imports shared local events");
+
+ assert_eq!(first_report.scanned_records, 1);
+ assert_eq!(first_report.imported_records, 1);
+ assert_eq!(second_same_store_report.scanned_records, 0);
+ assert_eq!(second_same_store_report.imported_records, 0);
+ assert_eq!(fresh_store_report.scanned_records, 1);
+ assert_eq!(fresh_store_report.imported_records, 1);
+ assert!(
+ events
+ .get_cursor("radroots_app_sqlite_projection_v1")
+ .expect("read shared cursor")
+ .is_none()
+ );
+ }
+
+ #[test]
fn imports_signed_listing_tags_into_existing_local_product_projection() {
let app_store =
AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store");
diff --git a/crates/shared/sqlite/src/migrations.rs b/crates/shared/sqlite/src/migrations.rs
@@ -52,6 +52,10 @@ const MIGRATIONS: &[Migration] = &[
version: 12,
sql: include_str!("../migrations/0012_local_interop_imports.sql"),
},
+ Migration {
+ version: 13,
+ sql: include_str!("../migrations/0013_local_interop_projection_cursor.sql"),
+ },
];
pub fn latest_schema_version() -> u32 {