commit 82b3f3e28134424d1d966a1f349c3df895def958
parent d6ea400730d3bbc6b0cc96dcfafad1828ca8865a
Author: triesap <tyson@radroots.org>
Date: Sun, 12 Apr 2026 05:24:34 +0000
runtime: consume shaped replica queries
Diffstat:
3 files changed, 22 insertions(+), 141 deletions(-)
diff --git a/src/runtime/find.rs b/src/runtime/find.rs
@@ -1,39 +1,20 @@
-use radroots_sql_core::{SqlExecutor, SqliteExecutor, utils};
-use serde::Deserialize;
-use serde_json::Value;
+use radroots_replica_db::ReplicaSql;
+use radroots_sql_core::SqliteExecutor;
use crate::cli::FindArgs;
use crate::domain::runtime::{
FindHyfView, FindPriceView, FindQuantityView, FindResultHyfView, FindResultProvenanceView,
FindResultView, FindView, SyncFreshnessView,
};
-use crate::runtime::RuntimeError;
use crate::runtime::config::RuntimeConfig;
use crate::runtime::hyf::{self, HyfQueryRewriteRequest, HyfRequestContext};
use crate::runtime::sync::freshness_from_executor;
+use crate::runtime::RuntimeError;
const FIND_SOURCE: &str = "local replica · local first";
const FIND_HYF_SOURCE: &str = "hyf query_rewrite · local first";
const FIND_HYF_QUERY_REWRITE_REQUEST_ID: &str = "cli-find-query-rewrite";
-#[derive(Debug, Clone, Deserialize)]
-struct FindRow {
- id: String,
- key: String,
- category: String,
- title: String,
- summary: String,
- qty_amt: i64,
- qty_unit: String,
- qty_label: Option<String>,
- qty_avail: Option<i64>,
- price_amt: f64,
- price_currency: String,
- price_qty_amt: u32,
- price_qty_unit: String,
- location_primary: Option<String>,
-}
-
#[derive(Debug, Clone)]
struct AppliedQueryRewrite {
rewritten_query: String,
@@ -82,14 +63,14 @@ pub fn search(config: &RuntimeConfig, args: &FindArgs) -> Result<FindView, Runti
});
}
- let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
- let freshness = freshness_from_executor(&executor)?;
+ let db = ReplicaSql::new(SqliteExecutor::open(&config.local.replica_db_path)?);
+ let freshness = freshness_from_executor(db.executor())?;
let applied_query_rewrite = attempt_query_rewrite(config, query.as_str(), &args.query);
let effective_query_terms = applied_query_rewrite
.as_ref()
.map(|rewrite| rewrite.query_terms.clone())
.unwrap_or_else(|| normalize_query_terms(args.query.clone()));
- let rows = query_rows(&executor, effective_query_terms.as_slice())?;
+ let rows = db.trade_product_search(effective_query_terms.as_slice())?;
let relay_count = config.relay.urls.len();
let result_provenance = FindResultProvenanceView {
origin: "local_replica.trade_product".to_owned(),
@@ -197,42 +178,6 @@ fn attempt_query_rewrite(
})
}
-fn query_rows(
- executor: &SqliteExecutor,
- query_terms: &[String],
-) -> Result<Vec<FindRow>, RuntimeError> {
- let mut where_clauses = Vec::with_capacity(query_terms.len());
- let mut bind_values = Vec::<Value>::with_capacity(query_terms.len() * 5);
-
- for term in query_terms {
- let pattern = format!("%{}%", term.to_lowercase());
- where_clauses.push(
- "(lower(tp.title) LIKE ? OR lower(tp.summary) LIKE ? OR lower(tp.category) LIKE ? OR lower(tp.key) LIKE ? OR lower(COALESCE(tp.notes, '')) LIKE ?)"
- .to_owned(),
- );
- for _ in 0..5 {
- bind_values.push(Value::from(pattern.clone()));
- }
- }
-
- let sql = format!(
- "SELECT tp.id, tp.key, tp.category, tp.title, tp.summary, tp.qty_amt, tp.qty_unit, tp.qty_label, tp.qty_avail, tp.price_amt, tp.price_currency, tp.price_qty_amt, tp.price_qty_unit, loc.location_primary \
- FROM trade_product tp \
- LEFT JOIN (\
- SELECT tpl.tb_tp AS trade_product_id, MIN(COALESCE(gl.label, gl.gc_name, gl.gc_admin1_name, gl.gc_country_name, gl.d_tag)) AS location_primary \
- FROM trade_product_location tpl \
- JOIN gcs_location gl ON gl.id = tpl.tb_gl \
- GROUP BY tpl.tb_tp\
- ) loc ON loc.trade_product_id = tp.id \
- WHERE {} \
- ORDER BY lower(tp.title) ASC, tp.id ASC;",
- where_clauses.join(" AND ")
- );
- let params_json = utils::to_params_json(bind_values)?;
- let raw = executor.query_raw(&sql, ¶ms_json)?;
- serde_json::from_str(&raw).map_err(RuntimeError::from)
-}
-
fn non_empty(value: String) -> Option<String> {
let trimmed = value.trim();
if trimmed.is_empty() {
diff --git a/src/runtime/listing.rs b/src/runtime/listing.rs
@@ -7,7 +7,6 @@ use radroots_core::{
RadrootsCoreCurrency, RadrootsCoreDecimal, RadrootsCoreMoney, RadrootsCoreQuantity,
RadrootsCoreQuantityPrice, RadrootsCoreUnit,
};
-use radroots_events::RadrootsNostrEvent;
use radroots_events::kinds::{KIND_LISTING, KIND_LISTING_DRAFT};
use radroots_events::listing::{
RadrootsListing, RadrootsListingAvailability, RadrootsListingBin,
@@ -15,13 +14,14 @@ use radroots_events::listing::{
RadrootsListingProduct, RadrootsListingStatus,
};
use radroots_events::trade::RadrootsTradeListingValidationError;
+use radroots_events::RadrootsNostrEvent;
use radroots_events_codec::d_tag::is_d_tag_base64url;
use radroots_events_codec::listing::encode::to_wire_parts_with_kind;
-use radroots_sql_core::{SqlExecutor, SqliteExecutor, utils};
+use radroots_replica_db::ReplicaSql;
+use radroots_sql_core::SqliteExecutor;
use radroots_trade::listing::publish::validate_listing_for_seller;
use radroots_trade::listing::validation::validate_listing_event;
use serde::{Deserialize, Serialize};
-use serde_json::Value;
use crate::cli::{ListingFileArgs, ListingMutationArgs, ListingNewArgs, RecordKeyArgs};
use crate::domain::runtime::{
@@ -29,13 +29,13 @@ use crate::domain::runtime::{
ListingMutationEventView, ListingMutationJobView, ListingMutationView, ListingNewView,
ListingValidateView, ListingValidationIssueView, SyncFreshnessView,
};
-use crate::runtime::RuntimeError;
use crate::runtime::accounts;
use crate::runtime::config::RuntimeConfig;
use crate::runtime::daemon;
use crate::runtime::daemon::DaemonRpcError;
-use crate::runtime::signer::{ActorWriteBindingError, resolve_actor_write_authority};
+use crate::runtime::signer::{resolve_actor_write_authority, ActorWriteBindingError};
use crate::runtime::sync::freshness_from_executor;
+use crate::runtime::RuntimeError;
const DRAFT_KIND: &str = "listing_draft_v1";
const LISTING_SOURCE: &str = "local draft · local first";
@@ -141,29 +141,6 @@ struct CanonicalListingDraft {
listing: RadrootsListing,
}
-#[derive(Debug, Clone, Deserialize)]
-struct ListingRow {
- id: String,
- key: String,
- category: String,
- title: String,
- summary: String,
- qty_amt: i64,
- qty_unit: String,
- qty_label: Option<String>,
- qty_avail: Option<i64>,
- price_amt: f64,
- price_currency: String,
- price_qty_amt: u32,
- price_qty_unit: String,
- location_primary: Option<String>,
-}
-
-#[derive(Debug, Clone, Deserialize)]
-struct FarmRow {
- d_tag: String,
-}
-
#[derive(Debug, Clone, Copy)]
pub enum ListingMutationOperation {
Publish,
@@ -398,8 +375,8 @@ pub fn get(config: &RuntimeConfig, args: &RecordKeyArgs) -> Result<ListingGetVie
});
}
- let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
- let rows = query_listing_rows(&executor, args.key.as_str())?;
+ let db = ReplicaSql::new(SqliteExecutor::open(&config.local.replica_db_path)?);
+ let rows = db.trade_product_lookup(args.key.as_str())?;
let Some(row) = rows.into_iter().next() else {
return Ok(ListingGetView {
state: "missing".to_owned(),
@@ -1218,28 +1195,6 @@ fn issue_from_trade_validation(
}
}
-fn query_listing_rows(
- executor: &SqliteExecutor,
- lookup: &str,
-) -> Result<Vec<ListingRow>, RuntimeError> {
- let sql = "SELECT tp.id, tp.key, tp.category, tp.title, tp.summary, tp.qty_amt, tp.qty_unit, tp.qty_label, tp.qty_avail, tp.price_amt, tp.price_currency, tp.price_qty_amt, tp.price_qty_unit, loc.location_primary \
- FROM trade_product tp \
- LEFT JOIN (\
- SELECT tpl.tb_tp AS trade_product_id, MIN(COALESCE(gl.label, gl.gc_name, gl.gc_admin1_name, gl.gc_country_name, gl.d_tag)) AS location_primary \
- FROM trade_product_location tpl \
- JOIN gcs_location gl ON gl.id = tpl.tb_gl \
- GROUP BY tpl.tb_tp\
- ) loc ON loc.trade_product_id = tp.id \
- WHERE tp.id = ? OR tp.key = ? \
- ORDER BY lower(tp.title) ASC, tp.id ASC;";
- let params = utils::to_params_json(vec![
- Value::from(lookup.to_owned()),
- Value::from(lookup.to_owned()),
- ])?;
- let raw = executor.query_raw(sql, ¶ms)?;
- serde_json::from_str(&raw).map_err(RuntimeError::from)
-}
-
fn resolve_selected_farm_d_tag(
config: &RuntimeConfig,
seller_pubkey: &str,
@@ -1247,16 +1202,9 @@ fn resolve_selected_farm_d_tag(
if !config.local.replica_db_path.exists() {
return Ok(None);
}
- let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
- let sql = "SELECT d_tag FROM farm WHERE pubkey = ? ORDER BY d_tag ASC;";
- let params = utils::to_params_json(vec![Value::from(seller_pubkey.to_owned())])?;
- let raw = executor.query_raw(sql, ¶ms)?;
- let rows: Vec<FarmRow> = serde_json::from_str(&raw).map_err(RuntimeError::from)?;
- if rows.len() == 1 {
- Ok(Some(rows[0].d_tag.clone()))
- } else {
- Ok(None)
- }
+ let db = ReplicaSql::new(SqliteExecutor::open(&config.local.replica_db_path)?);
+ db.farm_unique_d_tag_by_pubkey(seller_pubkey)
+ .map_err(RuntimeError::from)
}
fn parse_decimal_field(
@@ -1407,7 +1355,7 @@ fn encode_base64url_no_pad(bytes: [u8; 16]) -> String {
#[cfg(test)]
mod tests {
- use super::{DRAFT_KIND, ListingDraftDocument, encode_base64url_no_pad, generate_d_tag};
+ use super::{encode_base64url_no_pad, generate_d_tag, ListingDraftDocument, DRAFT_KIND};
use radroots_events_codec::d_tag::is_d_tag_base64url;
#[test]
diff --git a/src/runtime/sync.rs b/src/runtime/sync.rs
@@ -1,17 +1,17 @@
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use radroots_replica_db::ReplicaSql;
use radroots_replica_sync::radroots_replica_sync_status;
-use radroots_sql_core::{SqlExecutor, SqliteExecutor};
-use serde_json::Value;
+use radroots_sql_core::SqliteExecutor;
use crate::cli::SyncWatchArgs;
use crate::domain::runtime::{
SyncActionView, SyncFreshnessView, SyncQueueView, SyncStatusView, SyncWatchFrameView,
SyncWatchView,
};
-use crate::runtime::RuntimeError;
use crate::runtime::config::RuntimeConfig;
+use crate::runtime::RuntimeError;
const SYNC_SOURCE: &str = "local replica · local first";
const RELAY_SETUP_ACTION: &str = "radroots relay ls --relay wss://relay.example.com";
@@ -215,20 +215,8 @@ fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> {
pub(crate) fn freshness_from_executor(
executor: &SqliteExecutor,
) -> Result<SyncFreshnessView, RuntimeError> {
- let raw = executor.query_raw(
- "SELECT MAX(last_created_at) AS last_created_at FROM nostr_event_state WHERE last_created_at IS NOT NULL",
- "[]",
- )?;
- let json: Value = serde_json::from_str(&raw)?;
- let last_event_at = json
- .as_array()
- .and_then(|rows| rows.first())
- .and_then(|row| row.get("last_created_at"))
- .and_then(|value| {
- value
- .as_u64()
- .or_else(|| value.as_i64().and_then(|signed| u64::try_from(signed).ok()))
- });
+ let db = ReplicaSql::new(executor);
+ let last_event_at = db.nostr_event_last_created_at()?;
Ok(match last_event_at {
Some(last_event_at) => {