app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit 85b33655227aa897c6de45fc887d067e8e810e47
parent 66b992436383e1faf7e462dc170f44929e96a310
Author: triesap <tyson@radroots.org>
Date:   Sat, 23 May 2026 03:45:37 +0000

local_interop: import shared CLI records into app state

- add the app sqlite local interop import table and repository
- project CLI farm and listing records into app farm and product state
- import shared local events during desktop runtime bootstrap
- cover local farm and listing import with sqlite projection tests

Diffstat:
MCargo.lock | 81++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
MCargo.toml | 6++++--
Mcrates/launchers/desktop/src/runtime.rs | 22++++++++++++++++++++++
Mcrates/shared/sqlite/Cargo.toml | 3+++
Acrates/shared/sqlite/migrations/0012_local_interop_imports.sql | 40++++++++++++++++++++++++++++++++++++++++
Mcrates/shared/sqlite/src/error.rs | 14++++++++++++++
Mcrates/shared/sqlite/src/lib.rs | 4++++
Acrates/shared/sqlite/src/local_interop.rs | 972+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/shared/sqlite/src/migrations.rs | 4++++
9 files changed, 1135 insertions(+), 11 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -2023,6 +2023,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" [[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + +[[package]] name = "font-types" version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2778,7 +2784,16 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ - "foldhash", + "foldhash 0.1.5", +] + +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "foldhash 0.2.0", ] [[package]] @@ -2798,11 +2813,11 @@ dependencies = [ [[package]] name = "hashlink" -version = "0.9.1" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +checksum = "ea0b22561a9c04a7cb1a302c013e0259cd3b4bb619f145b32f72b8b4bcbed230" dependencies = [ - "hashbrown 0.14.5", + "hashbrown 0.16.1", ] [[package]] @@ -3505,9 +3520,9 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.30.1" +version = "0.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +checksum = "b1f111c8c41e7c61a49cd34e44c7619462967221a6443b0ec299e0ac30cfb9b1" dependencies = [ "cc", "pkg-config", @@ -5116,7 +5131,10 @@ version = "0.1.0" dependencies = [ "radroots_app_models", "radroots_app_sync", + "radroots_local_events", + "radroots_sql_core", "rusqlite", + "serde_json", "thiserror 2.0.18", "uuid", ] @@ -5186,6 +5204,16 @@ dependencies = [ ] [[package]] +name = "radroots_local_events" +version = "0.1.0-alpha.2" +dependencies = [ + "radroots_sql_core", + "serde", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] name = "radroots_log" version = "0.1.0-alpha.2" dependencies = [ @@ -5299,6 +5327,18 @@ name = "radroots_secret_vault" version = "0.1.0-alpha.2" [[package]] +name = "radroots_sql_core" +version = "0.1.0-alpha.2" +dependencies = [ + "chrono", + "rusqlite", + "serde", + "serde_json", + "thiserror 1.0.69", + "uuid", +] + +[[package]] name = "rand" version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -5613,17 +5653,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c20b6793b5c2fa6553b250154b78d6d0db37e72700ae35fad9387a46f487c97" [[package]] +name = "rsqlite-vfs" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c51c9ae4df8a7fba42103df5c621fa3c37eccf3a3c650879e90fc48b11cc192c" +dependencies = [ + "hashbrown 0.16.1", + "thiserror 2.0.18", +] + +[[package]] name = "rusqlite" -version = "0.32.1" +version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +checksum = "a0d2b0146dd9661bf67bb107c0bb2a55064d556eeb3fc314151b957f313bcd4e" dependencies = [ "bitflags 2.11.1", "fallible-iterator", "fallible-streaming-iterator", - "hashlink 0.9.1", + "hashlink 0.11.0", "libsqlite3-sys", "smallvec", + "sqlite-wasm-rs", ] [[package]] @@ -6384,6 +6435,18 @@ dependencies = [ ] [[package]] +name = "sqlite-wasm-rs" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdd578e94101503d97e2b286bbf8db2135035ca24b2ce4cbf3f9e2fb2bbf1eee" +dependencies = [ + "cc", + "js-sys", + "rsqlite-vfs", + "wasm-bindgen", +] + +[[package]] name = "stable_deref_trait" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/Cargo.toml b/Cargo.toml @@ -29,11 +29,13 @@ gpui-component = "0.5.1" gpui-component-assets = "0.5.1" mf2_i18n = { git = "https://github.com/triesap/mf2_i18n.git", rev = "e2ad58d5863d9dd98f2f38d1f08b2140bf34b0a1" } radroots_identity = { path = "../lib/crates/identity" } +radroots_local_events = { path = "../lib/crates/local_events", features = ["native"] } radroots_nostr = { path = "../lib/crates/nostr", features = ["client"] } radroots_nostr_accounts = { path = "../lib/crates/nostr_accounts" } radroots_nostr_connect = { path = "../lib/crates/nostr_connect" } radroots_protected_store = { path = "../lib/crates/protected_store" } radroots_secret_vault = { path = "../lib/crates/secret_vault", default-features = false, features = ["std"] } +radroots_sql_core = { path = "../lib/crates/sql_core", features = ["native"] } radroots_app_core = { path = "crates/shared/core", version = "0.1.0" } radroots_app_i18n = { path = "crates/shared/i18n", version = "0.1.0" } radroots_app_models = { path = "crates/shared/models", version = "0.1.0" } @@ -41,7 +43,7 @@ radroots_app_sqlite = { path = "crates/shared/sqlite", version = "0.1.0" } radroots_app_state = { path = "crates/shared/state", version = "0.1.0" } radroots_app_sync = { path = "crates/shared/sync", version = "0.1.0" } radroots_app_ui = { path = "crates/shared/ui", version = "0.1.0" } -rusqlite = { version = "0.32", features = ["bundled"] } +rusqlite = { version = "0.39", features = ["bundled"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "2" @@ -49,7 +51,7 @@ toml = "0.8" tracing = "0.1" tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } -uuid = { version = "1", features = ["serde", "v7"] } +uuid = { version = "1", features = ["serde", "v5", "v7"] } [workspace.lints.rust] unsafe_code = "forbid" diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs @@ -75,6 +75,8 @@ use crate::remote_signer::{ }; const APP_DATABASE_FILE_NAME: &str = "app.sqlite3"; +const SHARED_LOCAL_EVENTS_DIR: &str = "local_events"; +const SHARED_LOCAL_EVENTS_DB_FILE_NAME: &str = "local_events.sqlite"; const SYNC_TRANSPORT_UNAVAILABLE_MESSAGE: &str = "remote sync transport is not configured"; #[derive(Debug, Default)] @@ -917,6 +919,9 @@ impl DesktopAppRuntimeState { } let database_path = paths.app.data.join(APP_DATABASE_FILE_NAME); let sqlite_store = AppSqliteStore::open(DatabaseTarget::Path(database_path.clone()))?; + let shared_local_events_database_path = shared_local_events_database_path(&paths)?; + let _ = sqlite_store + .import_shared_local_events_from_path(shared_local_events_database_path.as_path())?; let database_schema_version = sqlite_store.schema_version()?; let mut state_store = AppStateStore::load(AppStatePersistenceRepository::file_backed( paths.app.data.join(APP_STATE_FILE_NAME), @@ -3545,6 +3550,23 @@ enum DesktopAppRuntimeBootstrapError { Sqlite(#[from] AppSqliteError), #[error(transparent)] State(#[from] AppStateStoreError), + #[error("desktop app data root must be nested under the Radroots data root")] + SharedLocalEventsPath, +} + +fn shared_local_events_database_path( + paths: &AppDesktopRuntimePaths, +) -> Result<PathBuf, DesktopAppRuntimeBootstrapError> { + let data_root = paths + .app + .data + .parent() + .and_then(|apps_root| apps_root.parent()) + .ok_or(DesktopAppRuntimeBootstrapError::SharedLocalEventsPath)?; + Ok(data_root + .join("shared") + .join(SHARED_LOCAL_EVENTS_DIR) + .join(SHARED_LOCAL_EVENTS_DB_FILE_NAME)) } fn load_selected_account_context( diff --git a/crates/shared/sqlite/Cargo.toml b/crates/shared/sqlite/Cargo.toml @@ -8,9 +8,12 @@ license.workspace = true publish = false [dependencies] +radroots_local_events.workspace = true radroots_app_models.workspace = true radroots_app_sync.workspace = true +radroots_sql_core.workspace = true rusqlite.workspace = true +serde_json.workspace = true thiserror.workspace = true uuid.workspace = true diff --git a/crates/shared/sqlite/migrations/0012_local_interop_imports.sql b/crates/shared/sqlite/migrations/0012_local_interop_imports.sql @@ -0,0 +1,40 @@ +CREATE TABLE local_interop_imports ( + record_id TEXT PRIMARY KEY NOT NULL, + local_seq INTEGER NOT NULL CHECK (local_seq >= 0), + record_family TEXT NOT NULL CHECK (record_family IN ('local_work', 'signed_event')), + local_status TEXT NOT NULL CHECK ( + local_status IN ( + 'local_draft', + 'local_saved', + 'pending_publish', + 'published', + 'failed', + 'conflict' + ) + ), + source_runtime TEXT NOT NULL, + owner_account_id TEXT, + owner_pubkey TEXT, + farm_key TEXT, + listing_addr TEXT, + projected_kind TEXT NOT NULL CHECK ( + projected_kind IN ('farm', 'listing', 'signed_event', 'unsupported') + ), + projected_id TEXT, + event_id TEXT, + event_kind INTEGER, + outbox_status TEXT NOT NULL CHECK ( + outbox_status IN ('none', 'pending', 'acknowledged', 'failed') + ), + relay_delivery_json TEXT, + imported_at TEXT NOT NULL +); + +CREATE INDEX idx_local_interop_imports_seq + ON local_interop_imports(local_seq); + +CREATE INDEX idx_local_interop_imports_owner_status + ON local_interop_imports(owner_account_id, local_status, local_seq DESC); + +CREATE INDEX idx_local_interop_imports_projected + ON local_interop_imports(projected_kind, projected_id); diff --git a/crates/shared/sqlite/src/error.rs b/crates/shared/sqlite/src/error.rs @@ -1,5 +1,7 @@ use std::{io, path::PathBuf}; +use radroots_local_events::LocalEventsError; +use radroots_sql_core::SqlError; use thiserror::Error; #[derive(Debug, Error)] @@ -79,4 +81,16 @@ pub enum AppSqliteError { DecodeEnum { field: &'static str, value: String }, #[error("invalid farm-rules projection: {reason}")] InvalidProjection { reason: &'static str }, + #[error("failed to access shared local events store during {operation}")] + LocalEventsSql { + operation: &'static str, + #[source] + source: SqlError, + }, + #[error("failed to import shared local event records during {operation}")] + LocalEvents { + operation: &'static str, + #[source] + source: LocalEventsError, + }, } diff --git a/crates/shared/sqlite/src/lib.rs b/crates/shared/sqlite/src/lib.rs @@ -6,6 +6,7 @@ mod buyer; mod error; mod farm_rules; mod farm_setup; +mod local_interop; mod migrations; mod orders; mod products; @@ -40,6 +41,9 @@ pub use buyer::{AppBuyerRepository, BuyerRepeatDemandApplyOutcome}; pub use error::AppSqliteError; pub use farm_rules::{AppFarmRulesRepository, derive_farm_rules_readiness}; pub use farm_setup::AppFarmSetupRepository; +pub use local_interop::{ + AppLocalInteropImportReport, AppLocalInteropRepository, StoredLocalInteropRecord, +}; pub use migrations::latest_schema_version; pub use orders::AppOrdersRepository; pub use products::AppProductsRepository; diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs @@ -0,0 +1,972 @@ +use std::{ + fs, + path::Path, + time::{SystemTime, UNIX_EPOCH}, +}; + +use radroots_app_models::{ + FarmId, FarmOrderMethod, FarmReadiness, FarmSetupDraft, FarmSetupProjection, FarmSummary, + ProductId, ProductStatus, +}; +use radroots_local_events::{ + LocalEventRecord, LocalEventsStore, LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus, + SourceRuntime, +}; +use radroots_sql_core::{SqlExecutor, SqliteExecutor}; +use rusqlite::{Connection, params}; +use serde_json::Value; +use uuid::Uuid; + +use crate::farm_setup::AppFarmSetupRepository; +use crate::{AppSqliteError, AppSqliteStore}; + +const LOCAL_EVENTS_BATCH_LIMIT: u32 = 500; +const APP_LOCAL_EVENTS_CONSUMER_ID: &str = "radroots_app_sqlite_projection_v1"; +const KIND_FARM: i64 = 30340; +const KIND_LISTING: i64 = 30402; +const KIND_LISTING_DRAFT: i64 = 30403; + +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct AppLocalInteropImportReport { + pub scanned_records: u32, + pub imported_records: u32, + pub skipped_records: u32, + pub last_seq: Option<i64>, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct StoredLocalInteropRecord { + pub record_id: String, + pub local_seq: i64, + pub record_family: String, + pub local_status: String, + pub source_runtime: String, + pub owner_account_id: Option<String>, + pub owner_pubkey: Option<String>, + pub farm_key: Option<String>, + pub listing_addr: Option<String>, + pub projected_kind: String, + pub projected_id: Option<String>, + pub event_id: Option<String>, + pub event_kind: Option<i64>, + pub outbox_status: String, + pub relay_delivery_json: Option<String>, +} + +pub struct AppLocalInteropRepository<'a> { + connection: &'a Connection, +} + +impl<'a> AppLocalInteropRepository<'a> { + pub const fn new(connection: &'a Connection) -> Self { + Self { connection } + } + + pub fn import_from_path( + &self, + shared_database_path: &Path, + ) -> Result<AppLocalInteropImportReport, AppSqliteError> { + if let Some(parent) = shared_database_path.parent() { + fs::create_dir_all(parent).map_err(|source| AppSqliteError::CreateParentDirectory { + path: parent.to_path_buf(), + source, + })?; + } + let executor = SqliteExecutor::open(shared_database_path).map_err(|source| { + AppSqliteError::LocalEventsSql { + operation: "open shared local events database", + source, + } + })?; + let store = LocalEventsStore::new(executor); + store + .migrate_up() + .map_err(|source| AppSqliteError::LocalEventsSql { + operation: "migrate shared local events database", + source, + })?; + self.import_from_store(&store) + } + + pub fn import_from_store<E>( + &self, + store: &LocalEventsStore<E>, + ) -> Result<AppLocalInteropImportReport, AppSqliteError> + where + E: SqlExecutor, + { + let mut report = AppLocalInteropImportReport::default(); + let mut after_seq = 0i64; + loop { + let records = store + .list_records_after(after_seq, LOCAL_EVENTS_BATCH_LIMIT) + .map_err(|source| AppSqliteError::LocalEvents { + operation: "list shared local event records", + source, + })?; + let batch_len = records.len(); + for record in records { + after_seq = record.seq; + report.scanned_records += 1; + report.last_seq = Some(record.seq); + match self.import_record(&record)? { + ImportOutcome::Imported => report.imported_records += 1, + ImportOutcome::Skipped => report.skipped_records += 1, + } + } + if batch_len < LOCAL_EVENTS_BATCH_LIMIT as usize { + break; + } + } + if let Some(last_seq) = report.last_seq { + store + .advance_cursor(APP_LOCAL_EVENTS_CONSUMER_ID, last_seq, current_time_ms()?) + .map_err(|source| AppSqliteError::LocalEvents { + operation: "advance shared local event cursor", + source, + })?; + } + Ok(report) + } + + pub fn load_records(&self) -> Result<Vec<StoredLocalInteropRecord>, AppSqliteError> { + let mut statement = self + .connection + .prepare( + "SELECT + record_id, + local_seq, + record_family, + local_status, + source_runtime, + owner_account_id, + owner_pubkey, + farm_key, + listing_addr, + projected_kind, + projected_id, + event_id, + event_kind, + outbox_status, + relay_delivery_json + FROM local_interop_imports + ORDER BY local_seq ASC, record_id ASC", + ) + .map_err(|source| AppSqliteError::Query { + operation: "prepare local interop import query", + source, + })?; + let rows = statement + .query_map([], |row| { + Ok(StoredLocalInteropRecord { + record_id: row.get(0)?, + local_seq: row.get(1)?, + record_family: row.get(2)?, + local_status: row.get(3)?, + source_runtime: row.get(4)?, + owner_account_id: row.get(5)?, + owner_pubkey: row.get(6)?, + farm_key: row.get(7)?, + listing_addr: row.get(8)?, + projected_kind: row.get(9)?, + projected_id: row.get(10)?, + event_id: row.get(11)?, + event_kind: row.get(12)?, + outbox_status: row.get(13)?, + relay_delivery_json: row.get(14)?, + }) + }) + .map_err(|source| AppSqliteError::Query { + operation: "query local interop imports", + source, + })?; + rows.map(|row| { + row.map_err(|source| AppSqliteError::Query { + operation: "read local interop import row", + source, + }) + }) + .collect() + } + + fn import_record(&self, record: &LocalEventRecord) -> Result<ImportOutcome, AppSqliteError> { + if record.source_runtime == SourceRuntime::App { + self.record_import(record, "unsupported", None)?; + return Ok(ImportOutcome::Skipped); + } + let projection = match record.family { + LocalRecordFamily::LocalWork => self.import_local_work(record)?, + LocalRecordFamily::SignedEvent => self.import_signed_event(record)?, + }; + match projection { + Some(projection) => { + self.record_import(record, projection.kind, projection.projected_id)?; + Ok(ImportOutcome::Imported) + } + None => { + self.record_import(record, "unsupported", None)?; + Ok(ImportOutcome::Skipped) + } + } + } + + fn import_local_work( + &self, + record: &LocalEventRecord, + ) -> Result<Option<ProjectionRecord>, AppSqliteError> { + let Some(payload) = record.local_work_json.as_ref() else { + return Ok(None); + }; + match string_at(payload, &["record_kind"]).as_deref() { + Some("farm_config_v1") => self.import_farm_config(record, payload), + Some("listing_draft_v1") => self.import_listing_draft(record, payload), + _ => Ok(None), + } + } + + fn import_signed_event( + &self, + record: &LocalEventRecord, + ) -> Result<Option<ProjectionRecord>, AppSqliteError> { + match record.event_kind { + Some(KIND_FARM) => self.import_signed_farm(record), + Some(KIND_LISTING | KIND_LISTING_DRAFT) => self.import_signed_listing(record), + _ => Ok(Some(ProjectionRecord { + kind: "signed_event", + projected_id: record.event_id.clone(), + })), + } + } + + fn import_farm_config( + &self, + record: &LocalEventRecord, + payload: &Value, + ) -> Result<Option<ProjectionRecord>, AppSqliteError> { + let Some(document) = payload.get("document") else { + return Ok(None); + }; + let Some(farm_key) = record + .farm_id + .clone() + .or_else(|| string_at(document, &["selection", "farm_d_tag"])) + .or_else(|| string_at(document, &["farm", "d_tag"])) + else { + return Ok(None); + }; + let owner_pubkey = record.owner_pubkey.clone(); + let farm_id = deterministic_farm_id(owner_pubkey.as_deref(), farm_key.as_str()); + let display_name = string_at(document, &["profile", "display_name"]) + .or_else(|| string_at(document, &["profile", "name"])) + .or_else(|| string_at(document, &["farm", "name"])) + .unwrap_or_else(|| "Local farm".to_owned()); + let location = string_at(document, &["farm", "location", "primary"]) + .or_else(|| string_at(document, &["listing_defaults", "location", "primary"])) + .unwrap_or_default(); + let methods = string_at(document, &["listing_defaults", "delivery_method"]) + .and_then(|method| farm_order_method(method.as_str())) + .into_iter() + .collect::<Vec<_>>(); + let saved_farm = FarmSummary { + farm_id, + display_name: display_name.clone(), + readiness: FarmReadiness::Incomplete, + }; + self.upsert_farm_summary(&saved_farm)?; + let owner_account_id = record + .owner_account_id + .clone() + .or_else(|| string_at(document, &["selection", "account"])); + if let Some(owner_account_id) = owner_account_id.as_deref() { + let projection = FarmSetupProjection::new( + FarmSetupDraft::new(display_name, location, methods), + Some(saved_farm), + ); + AppFarmSetupRepository::new(self.connection) + .save_farm_setup(owner_account_id, &projection)?; + } + Ok(Some(ProjectionRecord { + kind: "farm", + projected_id: Some(farm_id.to_string()), + })) + } + + fn import_listing_draft( + &self, + record: &LocalEventRecord, + payload: &Value, + ) -> Result<Option<ProjectionRecord>, AppSqliteError> { + let Some(document) = payload.get("document") else { + return Ok(None); + }; + let Some(listing_key) = + string_at(document, &["listing", "d_tag"]).or_else(|| listing_id(record)) + else { + return Ok(None); + }; + let owner_pubkey = record + .owner_pubkey + .clone() + .or_else(|| string_at(document, &["seller_actor", "pubkey"])); + let farm_key = record + .farm_id + .clone() + .or_else(|| string_at(document, &["listing", "farm_d_tag"])); + let Some(farm_key) = farm_key else { + return Ok(None); + }; + let farm_id = deterministic_farm_id(owner_pubkey.as_deref(), farm_key.as_str()); + self.ensure_farm_exists(farm_id)?; + let product_id = deterministic_product_id(owner_pubkey.as_deref(), listing_key.as_str()); + let title = string_at(document, &["product", "title"]) + .or_else(|| string_at(document, &["product", "key"])) + .unwrap_or_else(|| "Local product".to_owned()); + let subtitle = string_at(document, &["product", "summary"]).unwrap_or_default(); + let unit_label = string_at(document, &["primary_bin", "quantity_unit"]) + .or_else(|| string_at(document, &["primary_bin", "price_per_unit"])) + .unwrap_or_default(); + let price_minor_units = string_at(document, &["primary_bin", "price_amount"]) + .and_then(|price| parse_decimal_minor_units(price.as_str())); + let price_currency = string_at(document, &["primary_bin", "price_currency"]) + .unwrap_or_else(|| "USD".to_owned()); + let stock_count = string_at(document, &["inventory", "available"]) + .and_then(|quantity| parse_u32_quantity(quantity.as_str())); + self.upsert_product(ProductProjection { + product_id, + farm_id, + title, + subtitle, + status: product_status_for_record(record), + unit_label, + price_minor_units, + price_currency, + stock_count, + })?; + Ok(Some(ProjectionRecord { + kind: "listing", + projected_id: Some(product_id.to_string()), + })) + } + + fn import_signed_farm( + &self, + record: &LocalEventRecord, + ) -> Result<Option<ProjectionRecord>, AppSqliteError> { + let Some(content) = record.event_content.as_deref() else { + return Ok(None); + }; + let content = parse_json_value(content)?; + let Some(farm_key) = record + .farm_id + .clone() + .or_else(|| string_at(&content, &["d_tag"])) + else { + return Ok(None); + }; + let owner_pubkey = record + .owner_pubkey + .as_deref() + .or(record.event_pubkey.as_deref()); + let farm_id = deterministic_farm_id(owner_pubkey, farm_key.as_str()); + let display_name = + string_at(&content, &["name"]).unwrap_or_else(|| "Local farm".to_owned()); + self.upsert_farm_summary(&FarmSummary { + farm_id, + display_name, + readiness: FarmReadiness::Incomplete, + })?; + Ok(Some(ProjectionRecord { + kind: "farm", + projected_id: Some(farm_id.to_string()), + })) + } + + fn import_signed_listing( + &self, + record: &LocalEventRecord, + ) -> Result<Option<ProjectionRecord>, AppSqliteError> { + let Some(content) = record.event_content.as_deref() else { + return Ok(None); + }; + let content = parse_json_value(content)?; + let listing_key = string_at(&content, &["d_tag"]).or_else(|| listing_id(record)); + let Some(listing_key) = listing_key else { + return Ok(None); + }; + let farm_key = record + .farm_id + .clone() + .or_else(|| string_at(&content, &["farm", "d_tag"])); + let Some(farm_key) = farm_key else { + return Ok(None); + }; + let signed_farm_pubkey = string_at(&content, &["farm", "pubkey"]); + let owner_pubkey = record + .owner_pubkey + .as_deref() + .or(record.event_pubkey.as_deref()) + .or(signed_farm_pubkey.as_deref()); + let farm_id = deterministic_farm_id(owner_pubkey, farm_key.as_str()); + self.ensure_farm_exists(farm_id)?; + let product_id = deterministic_product_id(owner_pubkey, listing_key.as_str()); + let title = string_at(&content, &["product", "title"]) + .or_else(|| string_at(&content, &["product", "key"])) + .unwrap_or_else(|| "Local product".to_owned()); + let subtitle = string_at(&content, &["product", "summary"]).unwrap_or_default(); + let bin = primary_bin(&content); + let unit_label = bin + .and_then(|value| { + string_at(value, &["quantity", "unit"]) + .or_else(|| string_at(value, &["display_unit"])) + .or_else(|| string_at(value, &["display_price_unit"])) + }) + .unwrap_or_default(); + let price_minor_units = bin.and_then(|value| { + string_at(value, &["price_per_canonical_unit", "amount", "amount"]) + .or_else(|| string_at(value, &["display_price", "amount"])) + .and_then(|price| parse_decimal_minor_units(price.as_str())) + }); + let price_currency = bin + .and_then(|value| { + string_at(value, &["price_per_canonical_unit", "amount", "currency"]) + .or_else(|| string_at(value, &["display_price", "currency"])) + }) + .unwrap_or_else(|| "USD".to_owned()); + let stock_count = string_at(&content, &["inventory_available"]) + .and_then(|quantity| parse_u32_quantity(quantity.as_str())); + self.upsert_product(ProductProjection { + product_id, + farm_id, + title, + subtitle, + status: product_status_for_record(record), + unit_label, + price_minor_units, + price_currency, + stock_count, + })?; + Ok(Some(ProjectionRecord { + kind: "listing", + projected_id: Some(product_id.to_string()), + })) + } + + fn upsert_farm_summary(&self, farm: &FarmSummary) -> Result<(), AppSqliteError> { + self.connection + .execute( + "INSERT INTO farms (id, display_name, readiness, created_at, updated_at) + VALUES (?1, ?2, ?3, strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) + ON CONFLICT(id) DO UPDATE SET + display_name = excluded.display_name, + readiness = excluded.readiness, + updated_at = excluded.updated_at", + params![ + farm.farm_id.to_string(), + farm.display_name.as_str(), + farm_readiness_storage_key(farm.readiness), + ], + ) + .map_err(|source| AppSqliteError::Query { + operation: "upsert local interop farm summary", + source, + })?; + Ok(()) + } + + fn ensure_farm_exists(&self, farm_id: FarmId) -> Result<(), AppSqliteError> { + let exists = self + .connection + .query_row( + "SELECT EXISTS(SELECT 1 FROM farms WHERE id = ?1)", + [farm_id.to_string()], + |row| row.get::<_, bool>(0), + ) + .map_err(|source| AppSqliteError::Query { + operation: "check local interop farm existence", + source, + })?; + if !exists { + self.upsert_farm_summary(&FarmSummary { + farm_id, + display_name: "Local farm".to_owned(), + readiness: FarmReadiness::Incomplete, + })?; + } + Ok(()) + } + + fn upsert_product(&self, projection: ProductProjection) -> Result<(), AppSqliteError> { + self.connection + .execute( + "INSERT INTO products ( + id, + farm_id, + title, + subtitle, + status, + unit_label, + price_minor_units, + price_currency, + stock_count, + availability_window_id, + updated_at + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, NULL, strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) + ON CONFLICT(id) DO UPDATE SET + farm_id = excluded.farm_id, + title = excluded.title, + subtitle = excluded.subtitle, + status = excluded.status, + unit_label = excluded.unit_label, + price_minor_units = excluded.price_minor_units, + price_currency = excluded.price_currency, + stock_count = excluded.stock_count, + updated_at = excluded.updated_at", + params![ + projection.product_id.to_string(), + projection.farm_id.to_string(), + projection.title.as_str(), + projection.subtitle.as_str(), + projection.status.storage_key(), + projection.unit_label.as_str(), + projection.price_minor_units, + projection.price_currency.as_str(), + projection.stock_count, + ], + ) + .map_err(|source| AppSqliteError::Query { + operation: "upsert local interop product", + source, + })?; + Ok(()) + } + + fn record_import( + &self, + record: &LocalEventRecord, + projected_kind: &str, + projected_id: Option<String>, + ) -> Result<(), AppSqliteError> { + let relay_delivery_json = record + .relay_delivery_json + .as_ref() + .map(serde_json::to_string) + .transpose() + .map_err(|_| AppSqliteError::InvalidProjection { + reason: "local interop relay delivery json must encode", + })?; + self.connection + .execute( + "INSERT INTO local_interop_imports ( + record_id, + local_seq, + record_family, + local_status, + source_runtime, + owner_account_id, + owner_pubkey, + farm_key, + listing_addr, + projected_kind, + projected_id, + event_id, + event_kind, + outbox_status, + relay_delivery_json, + imported_at + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) + ON CONFLICT(record_id) DO UPDATE SET + local_seq = excluded.local_seq, + record_family = excluded.record_family, + local_status = excluded.local_status, + source_runtime = excluded.source_runtime, + owner_account_id = excluded.owner_account_id, + owner_pubkey = excluded.owner_pubkey, + farm_key = excluded.farm_key, + listing_addr = excluded.listing_addr, + projected_kind = excluded.projected_kind, + projected_id = excluded.projected_id, + event_id = excluded.event_id, + event_kind = excluded.event_kind, + outbox_status = excluded.outbox_status, + relay_delivery_json = excluded.relay_delivery_json, + imported_at = excluded.imported_at", + params![ + record.record_id.as_str(), + record.seq, + record.family.as_str(), + record.status.as_str(), + record.source_runtime.as_str(), + record.owner_account_id.as_deref(), + record.owner_pubkey.as_deref(), + record.farm_id.as_deref(), + record.listing_addr.as_deref(), + projected_kind, + projected_id.as_deref(), + record.event_id.as_deref(), + record.event_kind, + record.outbox_status.as_str(), + relay_delivery_json.as_deref(), + ], + ) + .map_err(|source| AppSqliteError::Query { + operation: "record local interop import", + source, + })?; + Ok(()) + } +} + +impl AppSqliteStore { + pub fn local_interop_repository(&self) -> AppLocalInteropRepository<'_> { + AppLocalInteropRepository::new(&self.connection) + } + + pub fn import_shared_local_events_from_path( + &self, + shared_database_path: &Path, + ) -> Result<AppLocalInteropImportReport, AppSqliteError> { + self.local_interop_repository() + .import_from_path(shared_database_path) + } + + pub fn import_shared_local_events_from_store<E>( + &self, + store: &LocalEventsStore<E>, + ) -> Result<AppLocalInteropImportReport, AppSqliteError> + where + E: SqlExecutor, + { + self.local_interop_repository().import_from_store(store) + } + + pub fn load_local_interop_records( + &self, + ) -> Result<Vec<StoredLocalInteropRecord>, AppSqliteError> { + self.local_interop_repository().load_records() + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum ImportOutcome { + Imported, + Skipped, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +struct ProjectionRecord { + kind: &'static str, + projected_id: Option<String>, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +struct ProductProjection { + product_id: ProductId, + farm_id: FarmId, + title: String, + subtitle: String, + status: ProductStatus, + unit_label: String, + price_minor_units: Option<u32>, + price_currency: String, + stock_count: Option<u32>, +} + +fn current_time_ms() -> Result<i64, AppSqliteError> { + let duration = SystemTime::now().duration_since(UNIX_EPOCH).map_err(|_| { + AppSqliteError::InvalidProjection { + reason: "current local interop timestamp must be after unix epoch", + } + })?; + i64::try_from(duration.as_millis()).map_err(|_| AppSqliteError::InvalidProjection { + reason: "current local interop timestamp must fit i64 milliseconds", + }) +} + +fn deterministic_farm_id(owner_pubkey: Option<&str>, farm_key: &str) -> FarmId { + FarmId::from(deterministic_uuid( + "radroots-cli-farm", + owner_pubkey, + farm_key, + )) +} + +fn deterministic_product_id(owner_pubkey: Option<&str>, listing_key: &str) -> ProductId { + ProductId::from(deterministic_uuid( + "radroots-cli-listing", + owner_pubkey, + listing_key, + )) +} + +fn deterministic_uuid(scope: &str, owner_pubkey: Option<&str>, key: &str) -> Uuid { + let seed = format!( + "{scope}:{}:{}", + owner_pubkey.unwrap_or("unknown-owner"), + key.trim() + ); + Uuid::new_v5(&Uuid::NAMESPACE_URL, seed.as_bytes()) +} + +fn string_at(value: &Value, path: &[&str]) -> Option<String> { + let mut cursor = value; + for segment in path { + cursor = cursor.get(*segment)?; + } + match cursor { + Value::String(value) => { + let trimmed = value.trim(); + (!trimmed.is_empty()).then(|| trimmed.to_owned()) + } + Value::Number(number) => Some(number.to_string()), + _ => None, + } +} + +fn listing_id(record: &LocalEventRecord) -> Option<String> { + record + .listing_addr + .as_deref() + .and_then(|addr| addr.rsplit(':').next()) + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(str::to_owned) +} + +fn farm_order_method(value: &str) -> Option<FarmOrderMethod> { + match value.trim() { + "pickup" => Some(FarmOrderMethod::Pickup), + "delivery" | "local_delivery" => Some(FarmOrderMethod::Delivery), + "shipping" => Some(FarmOrderMethod::Shipping), + _ => None, + } +} + +fn parse_decimal_minor_units(value: &str) -> Option<u32> { + let value = value.trim(); + if value.is_empty() || value.starts_with('-') { + return None; + } + let (whole, fraction) = value.split_once('.').unwrap_or((value, "")); + let whole_units = whole.parse::<u32>().ok()?; + let cents = match fraction.len() { + 0 => 0, + 1 => fraction.parse::<u32>().ok()? * 10, + _ => fraction.get(0..2)?.parse::<u32>().ok()?, + }; + whole_units.checked_mul(100)?.checked_add(cents) +} + +fn parse_u32_quantity(value: &str) -> Option<u32> { + let value = value.trim(); + if value.is_empty() || value.starts_with('-') { + return None; + } + let whole = value.split_once('.').map_or(value, |(whole, _)| whole); + whole.parse::<u32>().ok() +} + +fn product_status_for_record(record: &LocalEventRecord) -> ProductStatus { + if record.status == LocalRecordStatus::Published + && record.outbox_status == PublishOutboxStatus::Acknowledged + { + ProductStatus::Published + } else { + ProductStatus::Draft + } +} + +fn primary_bin(content: &Value) -> Option<&Value> { + let bins = content.get("bins")?.as_array()?; + let primary_bin_id = string_at(content, &["primary_bin_id"]); + primary_bin_id + .as_deref() + .and_then(|primary_bin_id| { + bins.iter() + .find(|bin| string_at(bin, &["bin_id"]).as_deref() == Some(primary_bin_id)) + }) + .or_else(|| bins.first()) +} + +fn parse_json_value(raw: &str) -> Result<Value, AppSqliteError> { + serde_json::from_str(raw).map_err(|_| AppSqliteError::InvalidProjection { + reason: "shared local signed event content must be json", + }) +} + +fn farm_readiness_storage_key(readiness: FarmReadiness) -> &'static str { + match readiness { + FarmReadiness::Incomplete => "incomplete", + FarmReadiness::Ready => "ready", + } +} + +#[cfg(test)] +mod tests { + use radroots_local_events::{ + LocalEventRecordInput, LocalEventsStore, LocalRecordFamily, LocalRecordStatus, + PublishOutboxStatus, SourceRuntime, + }; + use radroots_sql_core::SqliteExecutor; + use serde_json::json; + + use crate::{AppSqliteStore, DatabaseTarget}; + + fn local_events_store() -> LocalEventsStore<SqliteExecutor> { + let executor = SqliteExecutor::open_memory().expect("open local events memory db"); + let store = LocalEventsStore::new(executor); + store.migrate_up().expect("migrate local events store"); + store + } + + fn local_work_record( + record_id: &str, + farm_key: &str, + payload: serde_json::Value, + ) -> LocalEventRecordInput { + LocalEventRecordInput { + record_id: record_id.to_owned(), + family: LocalRecordFamily::LocalWork, + status: LocalRecordStatus::LocalSaved, + source_runtime: SourceRuntime::Cli, + created_at_ms: 1000, + inserted_at_ms: 1001, + owner_account_id: Some("seller-account".to_owned()), + owner_pubkey: Some("seller-pubkey".to_owned()), + farm_id: Some(farm_key.to_owned()), + listing_addr: None, + local_work_json: Some(payload), + event_id: None, + event_kind: None, + event_pubkey: None, + event_created_at: None, + event_tags_json: None, + event_content: None, + event_sig: None, + raw_event_json: None, + outbox_status: PublishOutboxStatus::None, + relay_set_fingerprint: None, + relay_delivery_json: None, + } + } + + #[test] + fn imports_cli_local_work_into_app_farm_and_product_projection() { + let app_store = + AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store"); + let events = local_events_store(); + let farm_key = "AAAAAAAAAAAAAAAAAAAAAA"; + let listing_key = "BBBBBBBBBBBBBBBBBBBBBB"; + events + .append_record(&local_work_record( + "cli:local_work:farm", + farm_key, + json!({ + "record_kind": "farm_config_v1", + "document": { + "selection": { + "account": "seller-account", + "farm_d_tag": farm_key + }, + "profile": { + "name": "Green Farm", + "display_name": "Green Farm" + }, + "farm": { + "d_tag": farm_key, + "name": "Green Farm", + "location": { + "primary": "farmstand" + } + }, + "listing_defaults": { + "delivery_method": "pickup", + "location": { + "primary": "farmstand" + } + } + } + }), + )) + .expect("append farm local work"); + let mut listing = local_work_record( + "cli:local_work:listing", + farm_key, + json!({ + "record_kind": "listing_draft_v1", + "document": { + "listing": { + "d_tag": listing_key, + "farm_d_tag": farm_key + }, + "seller_actor": { + "account_id": "seller-account", + "pubkey": "seller-pubkey" + }, + "product": { + "key": "eggs", + "title": "Eggs", + "summary": "Fresh eggs" + }, + "primary_bin": { + "quantity_unit": "each", + "price_amount": "6", + "price_currency": "USD" + }, + "inventory": { + "available": "10" + } + } + }), + ); + listing.listing_addr = Some(format!("30402:seller-pubkey:{listing_key}")); + events + .append_record(&listing) + .expect("append listing local work"); + + let report = app_store + .import_shared_local_events_from_store(&events) + .expect("import shared local events"); + let second_report = app_store + .import_shared_local_events_from_store(&events) + .expect("import shared local events again"); + + assert_eq!(report.scanned_records, 2); + assert_eq!(report.imported_records, 2); + assert_eq!(second_report.imported_records, 2); + let imported = app_store + .load_local_interop_records() + .expect("load imported records"); + assert_eq!(imported.len(), 2); + assert!( + imported + .iter() + .all(|record| record.local_status == "local_saved") + ); + let farm_setup = app_store + .load_farm_setup("seller-account") + .expect("load farm setup"); + let saved_farm = farm_setup.saved_farm.expect("saved farm"); + assert_eq!(saved_farm.display_name, "Green Farm"); + assert_eq!(farm_setup.draft.farm_name, "Green Farm"); + let products = app_store + .load_products( + saved_farm.farm_id, + "", + Default::default(), + Default::default(), + ) + .expect("load products"); + assert_eq!(products.rows.len(), 1); + assert_eq!(products.rows[0].title, "Eggs"); + assert_eq!(products.rows[0].subtitle.as_deref(), Some("Fresh eggs")); + assert_eq!( + products.rows[0] + .price + .as_ref() + .expect("price") + .amount_minor_units, + 600 + ); + assert_eq!(products.rows[0].stock.quantity, Some(10)); + } +} diff --git a/crates/shared/sqlite/src/migrations.rs b/crates/shared/sqlite/src/migrations.rs @@ -48,6 +48,10 @@ const MIGRATIONS: &[Migration] = &[ version: 11, sql: include_str!("../migrations/0011_reminders_and_recovery.sql"), }, + Migration { + version: 12, + sql: include_str!("../migrations/0012_local_interop_imports.sql"), + }, ]; pub fn latest_schema_version() -> u32 {