commit 546e32d47c218de3cf8906a18d17752e8cf98b8c
parent 5474c4175b728768fe21345524f884eedb29e5cc
Author: triesap <tyson@radroots.org>
Date: Sat, 23 May 2026 09:20:15 +0000
sqlite: make local interop change-cursor aware
- read and advance shared local-events cursors by change_seq
- ignore app-authored shared records as self-observed records
- add importer coverage for no-op refresh and outbox updates
- update runtime test helper for insert-order listing api
Diffstat:
2 files changed, 134 insertions(+), 14 deletions(-)
diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs
@@ -10494,7 +10494,7 @@ mod tests {
SqliteExecutor::open(database_path.as_path()).expect("open shared local events db");
let store = LocalEventsStore::new(executor);
store
- .list_records_after(0, 100)
+ .list_records_after_seq(0, 100)
.expect("shared local records should list")
}
diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs
@@ -31,7 +31,8 @@ pub struct AppLocalInteropImportReport {
pub scanned_records: u32,
pub imported_records: u32,
pub skipped_records: u32,
- pub last_seq: Option<i64>,
+ pub self_observed_records: u32,
+ pub last_change_seq: Option<i64>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
@@ -96,31 +97,42 @@ impl<'a> AppLocalInteropRepository<'a> {
E: SqlExecutor,
{
let mut report = AppLocalInteropImportReport::default();
- let mut after_seq = 0i64;
+ let mut after_change_seq = store
+ .get_cursor(APP_LOCAL_EVENTS_CONSUMER_ID)
+ .map_err(|source| AppSqliteError::LocalEvents {
+ operation: "read shared local event cursor",
+ source,
+ })?
+ .map_or(0, |cursor| cursor.last_change_seq);
loop {
let records = store
- .list_records_after(after_seq, LOCAL_EVENTS_BATCH_LIMIT)
+ .list_records_changed_after(after_change_seq, LOCAL_EVENTS_BATCH_LIMIT)
.map_err(|source| AppSqliteError::LocalEvents {
- operation: "list shared local event records",
+ operation: "list changed shared local event records",
source,
})?;
let batch_len = records.len();
for record in records {
- after_seq = record.seq;
+ after_change_seq = record.change_seq;
report.scanned_records += 1;
- report.last_seq = Some(record.seq);
+ report.last_change_seq = Some(record.change_seq);
match self.import_record(&record)? {
ImportOutcome::Imported => report.imported_records += 1,
ImportOutcome::Skipped => report.skipped_records += 1,
+ ImportOutcome::SelfObserved => report.self_observed_records += 1,
}
}
if batch_len < LOCAL_EVENTS_BATCH_LIMIT as usize {
break;
}
}
- if let Some(last_seq) = report.last_seq {
+ if let Some(last_change_seq) = report.last_change_seq {
store
- .advance_cursor(APP_LOCAL_EVENTS_CONSUMER_ID, last_seq, current_time_ms()?)
+ .advance_cursor(
+ APP_LOCAL_EVENTS_CONSUMER_ID,
+ last_change_seq,
+ current_time_ms()?,
+ )
.map_err(|source| AppSqliteError::LocalEvents {
operation: "advance shared local event cursor",
source,
@@ -191,8 +203,7 @@ impl<'a> AppLocalInteropRepository<'a> {
fn import_record(&self, record: &LocalEventRecord) -> Result<ImportOutcome, AppSqliteError> {
if record.source_runtime == SourceRuntime::App {
- self.record_import(record, "unsupported", None)?;
- return Ok(ImportOutcome::Skipped);
+ return Ok(ImportOutcome::SelfObserved);
}
let projection = match record.family {
LocalRecordFamily::LocalWork => self.import_local_work(record)?,
@@ -695,6 +706,7 @@ impl AppSqliteStore {
enum ImportOutcome {
Imported,
Skipped,
+ SelfObserved,
}
#[derive(Clone, Debug, Eq, PartialEq)]
@@ -945,8 +957,8 @@ fn farm_readiness_storage_key(readiness: FarmReadiness) -> &'static str {
#[cfg(test)]
mod tests {
use radroots_local_events::{
- LocalEventRecordInput, LocalEventsStore, LocalRecordFamily, LocalRecordStatus,
- PublishOutboxStatus, SourceRuntime,
+ LocalEventRecordInput, LocalEventRecordUpdate, LocalEventsStore, LocalRecordFamily,
+ LocalRecordStatus, PublishOutboxStatus, SourceRuntime,
};
use radroots_sql_core::SqliteExecutor;
use serde_json::json;
@@ -1151,7 +1163,11 @@ mod tests {
assert_eq!(report.scanned_records, 2);
assert_eq!(report.imported_records, 2);
- assert_eq!(second_report.imported_records, 2);
+ assert!(report.last_change_seq.is_some());
+ assert_eq!(second_report.scanned_records, 0);
+ assert_eq!(second_report.imported_records, 0);
+ assert_eq!(second_report.skipped_records, 0);
+ assert_eq!(second_report.self_observed_records, 0);
let imported = app_store
.load_local_interop_records()
.expect("load imported records");
@@ -1464,4 +1480,108 @@ mod tests {
}));
}
}
+
+ #[test]
+ fn observes_outbox_updates_after_first_import_without_replaying_unchanged_rows() {
+ let app_store =
+ AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store");
+ let events = local_events_store();
+ let farm_key = "AAAAAAAAAAAAAAAAAAAAAA";
+ let listing_key = "BBBBBBBBBBBBBBBBBBBBBB";
+ events
+ .append_record(&signed_listing_record_with_publish_state(
+ "pending-listing",
+ farm_key,
+ listing_key,
+ "active",
+ LocalRecordStatus::PendingPublish,
+ PublishOutboxStatus::Pending,
+ ))
+ .expect("append pending signed listing");
+ let first_report = app_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import pending listing");
+ let unchanged_report = app_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import unchanged listing");
+
+ assert_eq!(first_report.scanned_records, 1);
+ assert_eq!(first_report.imported_records, 1);
+ assert_eq!(unchanged_report.scanned_records, 0);
+
+ events
+ .update_outbox(&LocalEventRecordUpdate {
+ record_id: "pending-listing".to_owned(),
+ status: LocalRecordStatus::Published,
+ outbox_status: PublishOutboxStatus::Acknowledged,
+ relay_set_fingerprint: Some("relay-set".to_owned()),
+ relay_delivery_json: Some(json!({
+ "state": "acknowledged",
+ "acknowledged_relays": ["ws://127.0.0.1:1234/"]
+ })),
+ updated_at_ms: 1200,
+ })
+ .expect("update listing outbox");
+ let changed_report = app_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import updated listing");
+ let product_status: String = app_store
+ .connection()
+ .query_row("SELECT status FROM products", [], |row| row.get(0))
+ .expect("load product status");
+ let imported = app_store
+ .load_local_interop_records()
+ .expect("load imported records");
+
+ assert_eq!(changed_report.scanned_records, 1);
+ assert_eq!(changed_report.imported_records, 1);
+ assert_eq!(product_status, "published");
+ assert_eq!(imported.len(), 1);
+ assert_eq!(imported[0].local_status, "published");
+ assert_eq!(imported[0].outbox_status, "acknowledged");
+ }
+
+ #[test]
+ fn app_authored_shared_records_are_self_observed_without_unsupported_imports() {
+ let app_store =
+ AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store");
+ let events = local_events_store();
+ let mut app_record = local_work_record(
+ "app:local_work:farm",
+ "AAAAAAAAAAAAAAAAAAAAAA",
+ json!({
+ "record_kind": "farm_config_v1",
+ "document": {
+ "selection": {
+ "account": "seller-account",
+ "farm_d_tag": "AAAAAAAAAAAAAAAAAAAAAA"
+ },
+ "profile": {
+ "display_name": "App Farm"
+ },
+ "farm": {
+ "d_tag": "AAAAAAAAAAAAAAAAAAAAAA",
+ "name": "App Farm"
+ }
+ }
+ }),
+ );
+ app_record.source_runtime = SourceRuntime::App;
+ events
+ .append_record(&app_record)
+ .expect("append app local work");
+
+ let report = app_store
+ .import_shared_local_events_from_store(&events)
+ .expect("import shared local events");
+ let imported = app_store
+ .load_local_interop_records()
+ .expect("load imported records");
+
+ assert_eq!(report.scanned_records, 1);
+ assert_eq!(report.imported_records, 0);
+ assert_eq!(report.skipped_records, 0);
+ assert_eq!(report.self_observed_records, 1);
+ assert!(imported.is_empty());
+ }
}