commit 929b814fe4ddc8902da0936aded10ff50fbde712
parent f6f034cbe60925de7198e5ec1ed65a5e436007f4
Author: triesap <tyson@radroots.org>
Date: Tue, 28 Apr 2026 00:08:49 +0000
replica_sync: project listings into replica
Diffstat:
3 files changed, 471 insertions(+), 4 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -2559,6 +2559,7 @@ version = "0.1.0-alpha.2"
dependencies = [
"base64 0.22.1",
"hex",
+ "radroots_core",
"radroots_events",
"radroots_events_codec",
"radroots_replica_db",
diff --git a/crates/replica_sync/Cargo.toml b/crates/replica_sync/Cargo.toml
@@ -28,6 +28,7 @@ std = [
radroots_events = { workspace = true, default-features = false, features = [
"serde",
] }
+radroots_core = { workspace = true, default-features = false }
radroots_events_codec = { workspace = true, default-features = false, features = [
"serde_json",
] }
diff --git a/crates/replica_sync/src/ingest.rs b/crates/replica_sync/src/ingest.rs
@@ -11,15 +11,22 @@ use base64::Engine;
#[cfg(feature = "std")]
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
+use radroots_core::RadrootsCoreDecimal;
use radroots_events::RadrootsNostrEvent;
-use radroots_events::kinds::{KIND_FARM, KIND_PLOT, KIND_PROFILE, is_nip51_list_set_kind};
+use radroots_events::kinds::{
+ KIND_FARM, KIND_LISTING, KIND_PLOT, KIND_PROFILE, is_nip51_list_set_kind,
+};
+use radroots_events::listing::{
+ RadrootsListing, RadrootsListingAvailability, RadrootsListingBin, RadrootsListingStatus,
+};
use radroots_events_codec::farm::decode as farm_decode;
use radroots_events_codec::list_set::decode as list_set_decode;
+use radroots_events_codec::listing::decode as listing_decode;
use radroots_events_codec::plot::decode as plot_decode;
use radroots_events_codec::profile::decode as profile_decode;
use radroots_replica_db::{
farm, farm_gcs_location, farm_member, farm_member_claim, farm_tag, gcs_location,
- nostr_event_state, nostr_profile, plot, plot_gcs_location, plot_tag,
+ nostr_event_state, nostr_profile, plot, plot_gcs_location, plot_tag, trade_product,
};
use radroots_replica_db_schema::farm::{
FarmQueryBindValues, IFarmFields, IFarmFieldsFilter, IFarmFieldsPartial, IFarmFindMany,
@@ -62,6 +69,11 @@ use radroots_replica_db_schema::plot_tag::{
IPlotTagDelete, IPlotTagFields, IPlotTagFieldsFilter, IPlotTagFindMany, IPlotTagFindOneArgs,
PlotTagQueryBindValues,
};
+use radroots_replica_db_schema::trade_product::{
+ ITradeProductFields, ITradeProductFieldsFilter, ITradeProductFieldsPartial,
+ ITradeProductFindMany, ITradeProductFindOne, ITradeProductFindOneArgs, ITradeProductUpdate,
+ TradeProductQueryBindValues,
+};
use radroots_sql_core::SqlExecutor;
use radroots_sql_core::error::SqlError;
use serde_json::Value;
@@ -175,6 +187,7 @@ fn ingest_event_inner(
KIND_PROFILE => ingest_profile_event(exec, event),
KIND_FARM => ingest_farm_event(exec, event, factory),
KIND_PLOT => ingest_plot_event(exec, event, factory),
+ KIND_LISTING => ingest_listing_event(exec, event),
kind if is_nip51_list_set_kind(kind) => ingest_list_set_event(exec, event),
_ => Err(RadrootsReplicaEventsError::InvalidData(format!(
"unsupported kind {}",
@@ -435,6 +448,28 @@ fn ingest_plot_event(
Ok(RadrootsReplicaIngestOutcome::Applied)
}
+fn ingest_listing_event(
+ exec: &dyn SqlExecutor,
+ event: &RadrootsNostrEvent,
+) -> Result<RadrootsReplicaIngestOutcome, RadrootsReplicaEventsError> {
+ let listing = listing_decode::listing_from_event(event.kind, &event.tags, &event.content)?;
+ let decision = event_state_decision(exec, event, &listing.d_tag)?;
+ if !decision.apply {
+ return Ok(RadrootsReplicaIngestOutcome::Skipped);
+ }
+
+ let listing_addr = listing_event_addr(event, &listing);
+ if listing_is_orderable(&listing) {
+ let fields = trade_product_fields_from_listing(&listing, &listing_addr)?;
+ upsert_trade_product_for_listing_addr(exec, &listing_addr, fields)?;
+ } else {
+ delete_trade_products_for_listing_addr(exec, &listing_addr)?;
+ }
+
+ radroots_replica_ingest_event_state(exec, event, &listing.d_tag, &decision.content_hash)?;
+ Ok(RadrootsReplicaIngestOutcome::Applied)
+}
+
fn ingest_list_set_event(
exec: &dyn SqlExecutor,
event: &RadrootsNostrEvent,
@@ -490,6 +525,240 @@ fn ingest_list_set_event(
))
}
+fn listing_event_addr(event: &RadrootsNostrEvent, listing: &RadrootsListing) -> String {
+ format!("{}:{}:{}", event.kind, event.author, listing.d_tag)
+}
+
+fn listing_is_orderable(listing: &RadrootsListing) -> bool {
+ match listing.availability.as_ref() {
+ Some(RadrootsListingAvailability::Status { status }) => {
+ matches!(status, RadrootsListingStatus::Active)
+ }
+ Some(RadrootsListingAvailability::Window { .. }) | None => true,
+ }
+}
+
+fn trade_product_fields_from_listing(
+ listing: &RadrootsListing,
+ listing_addr: &str,
+) -> Result<ITradeProductFields, RadrootsReplicaEventsError> {
+ let bin = primary_listing_bin(listing)?;
+ let qty_amt = decimal_to_i64(&bin.quantity.amount, "listing primary bin quantity")?;
+ let qty_avail = listing
+ .inventory_available
+ .as_ref()
+ .map(|amount| decimal_to_i64(amount, "listing inventory"))
+ .transpose()?;
+ let price_amt = bin
+ .display_price
+ .as_ref()
+ .unwrap_or(&bin.price_per_canonical_unit.amount)
+ .amount
+ .to_f64_lossy()
+ .ok_or_else(|| {
+ RadrootsReplicaEventsError::InvalidData("listing price amount out of range".to_string())
+ })?;
+ let price_currency = bin
+ .display_price
+ .as_ref()
+ .unwrap_or(&bin.price_per_canonical_unit.amount)
+ .currency
+ .as_str()
+ .to_string();
+ let price_qty_amt = if bin.display_price.is_some() {
+ 1
+ } else {
+ decimal_to_u32(
+ &bin.price_per_canonical_unit.quantity.amount,
+ "listing price quantity",
+ )?
+ };
+ let price_qty_unit = bin
+ .display_price_unit
+ .unwrap_or(bin.price_per_canonical_unit.quantity.unit)
+ .to_string();
+
+ Ok(ITradeProductFields {
+ key: listing.product.key.clone(),
+ category: listing.product.category.clone(),
+ title: listing.product.title.clone(),
+ summary: listing.product.summary.clone().unwrap_or_default(),
+ process: listing.product.process.clone().unwrap_or_default(),
+ lot: listing.product.lot.clone().unwrap_or_default(),
+ profile: listing.product.profile.clone().unwrap_or_default(),
+ year: listing
+ .product
+ .year
+ .as_deref()
+ .and_then(|value| value.parse::<i64>().ok())
+ .unwrap_or_default(),
+ qty_amt,
+ qty_unit: bin.quantity.unit.to_string(),
+ qty_label: bin
+ .display_label
+ .clone()
+ .or_else(|| bin.quantity.label.clone()),
+ qty_avail,
+ price_amt,
+ price_currency,
+ price_qty_amt,
+ price_qty_unit,
+ listing_addr: Some(listing_addr.to_string()),
+ notes: None,
+ })
+}
+
+fn primary_listing_bin(
+ listing: &RadrootsListing,
+) -> Result<&RadrootsListingBin, RadrootsReplicaEventsError> {
+ listing
+ .bins
+ .iter()
+ .find(|bin| bin.bin_id == listing.primary_bin_id)
+ .ok_or_else(|| {
+ RadrootsReplicaEventsError::InvalidData(
+ "listing primary bin missing from bins".to_string(),
+ )
+ })
+}
+
+fn decimal_to_i64(
+ value: &RadrootsCoreDecimal,
+ field: &str,
+) -> Result<i64, RadrootsReplicaEventsError> {
+ let value = decimal_to_u64(value, field)?;
+ i64::try_from(value)
+ .map_err(|_| RadrootsReplicaEventsError::InvalidData(format!("{field} exceeds i64 range")))
+}
+
+fn decimal_to_u32(
+ value: &RadrootsCoreDecimal,
+ field: &str,
+) -> Result<u32, RadrootsReplicaEventsError> {
+ let value = decimal_to_u64(value, field)?;
+ u32::try_from(value)
+ .map_err(|_| RadrootsReplicaEventsError::InvalidData(format!("{field} exceeds u32 range")))
+}
+
+fn decimal_to_u64(
+ value: &RadrootsCoreDecimal,
+ field: &str,
+) -> Result<u64, RadrootsReplicaEventsError> {
+ value.to_u64_exact().ok_or_else(|| {
+ RadrootsReplicaEventsError::InvalidData(format!("{field} must be a whole number"))
+ })
+}
+
+fn trade_product_listing_addr_filter(listing_addr: &str) -> ITradeProductFieldsFilter {
+ ITradeProductFieldsFilter {
+ id: None,
+ created_at: None,
+ updated_at: None,
+ key: None,
+ category: None,
+ title: None,
+ summary: None,
+ process: None,
+ lot: None,
+ profile: None,
+ year: None,
+ qty_amt: None,
+ qty_unit: None,
+ qty_label: None,
+ qty_avail: None,
+ price_amt: None,
+ price_currency: None,
+ price_qty_amt: None,
+ price_qty_unit: None,
+ listing_addr: Some(listing_addr.to_string()),
+ notes: None,
+ }
+}
+
+fn upsert_trade_product_for_listing_addr(
+ exec: &dyn SqlExecutor,
+ listing_addr: &str,
+ fields: ITradeProductFields,
+) -> Result<(), RadrootsReplicaEventsError> {
+ let existing = trade_product::find_many(
+ exec,
+ &ITradeProductFindMany {
+ filter: Some(trade_product_listing_addr_filter(listing_addr)),
+ },
+ )?
+ .results;
+
+ if let Some(row) = existing.first() {
+ let update = ITradeProductUpdate {
+ on: TradeProductQueryBindValues::Id { id: row.id.clone() },
+ fields: trade_product_partial_from_fields(&fields),
+ };
+ let _ = trade_product::update(exec, &update)?;
+ for duplicate in existing.iter().skip(1) {
+ delete_trade_product_by_id(exec, &duplicate.id)?;
+ }
+ } else {
+ let _ = trade_product::create(exec, &fields)?;
+ }
+
+ Ok(())
+}
+
+fn delete_trade_products_for_listing_addr(
+ exec: &dyn SqlExecutor,
+ listing_addr: &str,
+) -> Result<(), RadrootsReplicaEventsError> {
+ let existing = trade_product::find_many(
+ exec,
+ &ITradeProductFindMany {
+ filter: Some(trade_product_listing_addr_filter(listing_addr)),
+ },
+ )?
+ .results;
+
+ for row in existing {
+ delete_trade_product_by_id(exec, &row.id)?;
+ }
+
+ Ok(())
+}
+
+fn delete_trade_product_by_id(
+ exec: &dyn SqlExecutor,
+ id: &str,
+) -> Result<(), RadrootsReplicaEventsError> {
+ let _ = trade_product::delete(
+ exec,
+ &ITradeProductFindOne::On(ITradeProductFindOneArgs {
+ on: TradeProductQueryBindValues::Id { id: id.to_string() },
+ }),
+ )?;
+ Ok(())
+}
+
+fn trade_product_partial_from_fields(fields: &ITradeProductFields) -> ITradeProductFieldsPartial {
+ ITradeProductFieldsPartial {
+ key: Some(Value::from(fields.key.clone())),
+ category: Some(Value::from(fields.category.clone())),
+ title: Some(Value::from(fields.title.clone())),
+ summary: Some(Value::from(fields.summary.clone())),
+ process: Some(Value::from(fields.process.clone())),
+ lot: Some(Value::from(fields.lot.clone())),
+ profile: Some(Value::from(fields.profile.clone())),
+ year: Some(Value::from(fields.year)),
+ qty_amt: Some(Value::from(fields.qty_amt)),
+ qty_unit: Some(Value::from(fields.qty_unit.clone())),
+ qty_label: to_value_opt(fields.qty_label.clone()),
+ qty_avail: fields.qty_avail.map(Value::from).or(Some(Value::Null)),
+ price_amt: Some(Value::from(fields.price_amt)),
+ price_currency: Some(Value::from(fields.price_currency.clone())),
+ price_qty_amt: Some(Value::from(fields.price_qty_amt)),
+ price_qty_unit: Some(Value::from(fields.price_qty_unit.clone())),
+ listing_addr: to_value_opt(fields.listing_addr.clone()),
+ notes: to_value_opt(fields.notes.clone()),
+ }
+}
+
pub fn radroots_replica_ingest_event_state(
exec: &dyn SqlExecutor,
event: &RadrootsNostrEvent,
@@ -1118,8 +1387,9 @@ mod tests {
use radroots_events_codec::list_set::encode as list_set_encode;
use radroots_events_codec::plot::encode as plot_encode;
use radroots_replica_db::{
- farm, farm_gcs_location, farm_member, farm_member_claim, farm_tag, gcs_location,
- migrations, plot, plot_gcs_location, plot_tag,
+ ReplicaSql, farm, farm_gcs_location, farm_member, farm_member_claim, farm_tag,
+ gcs_location, migrations, nostr_event_state, plot, plot_gcs_location, plot_tag,
+ trade_product,
};
use radroots_replica_db_schema::farm::IFarmFields;
use radroots_replica_db_schema::farm_gcs_location::IFarmGcsLocationFields;
@@ -1428,6 +1698,63 @@ mod tests {
}
}
+ fn listing_event(
+ id: u64,
+ author: &str,
+ created_at: u32,
+ d_tag: &str,
+ status: &str,
+ title: &str,
+ ) -> RadrootsNostrEvent {
+ let farm_d_tag = "AAAAAAAAAAAAAAAAAAAAAA";
+ RadrootsNostrEvent {
+ id: format!("{id:064x}"),
+ author: author.to_string(),
+ created_at,
+ kind: KIND_LISTING,
+ tags: vec![
+ vec!["d".to_string(), d_tag.to_string()],
+ vec![
+ "a".to_string(),
+ format!("{}:{}:{}", KIND_FARM, author, farm_d_tag),
+ ],
+ vec!["p".to_string(), author.to_string()],
+ vec!["key".to_string(), "pasture-eggs".to_string()],
+ vec!["title".to_string(), title.to_string()],
+ vec!["category".to_string(), "eggs".to_string()],
+ vec!["summary".to_string(), "Pasture-raised eggs".to_string()],
+ vec!["process".to_string(), "washed".to_string()],
+ vec!["lot".to_string(), "lot-a".to_string()],
+ vec!["profile".to_string(), "dozen".to_string()],
+ vec!["year".to_string(), "2026".to_string()],
+ vec!["radroots:primary_bin".to_string(), "bin-a".to_string()],
+ vec![
+ "radroots:bin".to_string(),
+ "bin-a".to_string(),
+ "12".to_string(),
+ "each".to_string(),
+ "12".to_string(),
+ "each".to_string(),
+ "dozen".to_string(),
+ ],
+ vec![
+ "radroots:price".to_string(),
+ "bin-a".to_string(),
+ "6".to_string(),
+ "USD".to_string(),
+ "1".to_string(),
+ "each".to_string(),
+ "6".to_string(),
+ "each".to_string(),
+ ],
+ vec!["inventory".to_string(), "5".to_string()],
+ vec!["status".to_string(), status.to_string()],
+ ],
+ content: format!("# {title}"),
+ sig: "f".repeat(128),
+ }
+ }
+
fn seed_rows(exec: &SqliteExecutor) -> (String, String, String, String) {
migrations::run_all_up(exec).expect("migrations");
let farm_row = farm::create(
@@ -1953,6 +2280,129 @@ mod tests {
}
#[test]
+ fn ingest_listing_projects_trade_product_and_removes_archived_replacements() {
+ let exec = SqliteExecutor::open_memory().expect("db");
+ migrations::run_all_up(&exec).expect("migrations");
+
+ let seller_pubkey = "s".repeat(64);
+ let listing_d_tag = "AAAAAAAAAAAAAAAAAAAAAQ";
+ let listing_addr = format!("{}:{}:{}", KIND_LISTING, seller_pubkey, listing_d_tag);
+
+ let active = listing_event(
+ 500,
+ &seller_pubkey,
+ 10,
+ listing_d_tag,
+ "active",
+ "Pasture Eggs",
+ );
+ assert_eq!(
+ radroots_replica_ingest_event(&exec, &active).expect("active ingest"),
+ RadrootsReplicaIngestOutcome::Applied
+ );
+
+ let replica = ReplicaSql::new(&exec);
+ let search_rows = replica
+ .trade_product_search(&["eggs".to_string()])
+ .expect("search");
+ assert_eq!(search_rows.len(), 1);
+ assert_eq!(
+ search_rows[0].listing_addr.as_deref(),
+ Some(listing_addr.as_str())
+ );
+ assert_eq!(search_rows[0].title, "Pasture Eggs");
+ assert_eq!(search_rows[0].qty_amt, 12);
+ assert_eq!(search_rows[0].qty_avail, Some(5));
+ assert_eq!(search_rows[0].price_amt, 6.0);
+ assert_eq!(search_rows[0].price_currency, "USD");
+
+ let updated = listing_event(
+ 501,
+ &seller_pubkey,
+ 11,
+ listing_d_tag,
+ "active",
+ "Market Eggs",
+ );
+ assert_eq!(
+ radroots_replica_ingest_event(&exec, &updated).expect("listing update"),
+ RadrootsReplicaIngestOutcome::Applied
+ );
+ let product_rows = trade_product::find_many(
+ &exec,
+ &ITradeProductFindMany {
+ filter: Some(trade_product_listing_addr_filter(&listing_addr)),
+ },
+ )
+ .expect("product rows")
+ .results;
+ assert_eq!(product_rows.len(), 1);
+ assert_eq!(product_rows[0].title, "Market Eggs");
+
+ let archived = listing_event(
+ 502,
+ &seller_pubkey,
+ 12,
+ listing_d_tag,
+ "archived",
+ "Market Eggs",
+ );
+ assert_eq!(
+ radroots_replica_ingest_event(&exec, &archived).expect("archived ingest"),
+ RadrootsReplicaIngestOutcome::Applied
+ );
+ let search_rows = replica
+ .trade_product_search(&["eggs".to_string()])
+ .expect("search archived");
+ assert!(search_rows.is_empty());
+
+ let product_rows = trade_product::find_many(
+ &exec,
+ &ITradeProductFindMany {
+ filter: Some(trade_product_listing_addr_filter(&listing_addr)),
+ },
+ )
+ .expect("archived product rows")
+ .results;
+ assert!(product_rows.is_empty());
+
+ let state = nostr_event_state::find_one(
+ &exec,
+ &INostrEventStateFindOne::On(INostrEventStateFindOneArgs {
+ on: NostrEventStateQueryBindValues::Key {
+ key: event_state_key(KIND_LISTING, &seller_pubkey, listing_d_tag),
+ },
+ }),
+ )
+ .expect("event state")
+ .result
+ .expect("state row");
+ assert_eq!(state.last_event_id, archived.id);
+
+ let stale_active = listing_event(
+ 499,
+ &seller_pubkey,
+ 11,
+ listing_d_tag,
+ "active",
+ "Stale Eggs",
+ );
+ assert_eq!(
+ radroots_replica_ingest_event(&exec, &stale_active).expect("stale ingest"),
+ RadrootsReplicaIngestOutcome::Skipped
+ );
+ let product_rows = trade_product::find_many(
+ &exec,
+ &ITradeProductFindMany {
+ filter: Some(trade_product_listing_addr_filter(&listing_addr)),
+ },
+ )
+ .expect("stale product rows")
+ .results;
+ assert!(product_rows.is_empty());
+ }
+
+ #[test]
fn upsert_location_none_paths_are_ok() {
let exec = SqliteExecutor::open_memory().expect("db");
migrations::run_all_up(&exec).expect("migrations");
@@ -2082,6 +2532,21 @@ mod tests {
.results
.is_empty()
);
+ assert!(
+ upsert_farm_location(
+ &exec,
+ &farm_id,
+ Some(RadrootsFarmLocation {
+ primary: None,
+ city: None,
+ region: None,
+ country: None,
+ gcs: Some(sample_gcs(2.0, 3.0, "s6")),
+ }),
+ &FixedFactory,
+ )
+ .is_ok()
+ );
let not_found_plot_locations = DeleteErrorExecutor {
inner: &exec,