commit a4ca6f873df7ca0c86793a713be7c412fbfc8277
parent edd28484dc6740fbfa84ceafdcb2315ddf29d50c
Author: triesap <tyson@radroots.org>
Date: Tue, 28 Apr 2026 00:19:00 +0000
market: refresh from direct relays
Diffstat:
5 files changed, 209 insertions(+), 18 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -1782,6 +1782,8 @@ version = "0.1.0-alpha.2"
dependencies = [
"nostr",
"nostr-sdk",
+ "radroots_events",
+ "radroots_events_codec",
"radroots_identity",
"serde",
"serde_json",
@@ -1872,6 +1874,7 @@ version = "0.1.0-alpha.2"
dependencies = [
"base64 0.22.1",
"hex",
+ "radroots_core",
"radroots_events",
"radroots_events_codec",
"radroots_replica_db",
diff --git a/Cargo.toml b/Cargo.toml
@@ -30,7 +30,7 @@ radroots_events_codec = { path = "../lib/crates/events_codec", features = ["nost
radroots_identity = { path = "../lib/crates/identity" }
radroots_log = { path = "../lib/crates/log" }
radroots_nostr_accounts = { path = "../lib/crates/nostr_accounts", features = ["os-keyring"] }
-radroots_nostr = { path = "../lib/crates/nostr", features = ["client"] }
+radroots_nostr = { path = "../lib/crates/nostr", features = ["client", "events"] }
radroots_nostr_signer = { path = "../lib/crates/nostr_signer" }
radroots_protected_store = { path = "../lib/crates/protected_store", features = ["std"] }
radroots_replica_db = { path = "../lib/crates/replica_db" }
diff --git a/src/domain/runtime.rs b/src/domain/runtime.rs
@@ -2045,6 +2045,20 @@ pub struct SyncActionView {
pub publish_policy: String,
pub freshness: SyncFreshnessView,
pub queue: SyncQueueView,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub target_relays: Vec<String>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub connected_relays: Vec<String>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub failed_relays: Vec<RelayFailureView>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub fetched_count: Option<usize>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub ingested_count: Option<usize>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub skipped_count: Option<usize>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub unsupported_count: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
diff --git a/src/operation_market.rs b/src/operation_market.rs
@@ -31,7 +31,9 @@ impl OperationService<MarketRefreshRequest> for MarketOperationService<'_> {
&self,
_request: OperationRequest<MarketRefreshRequest>,
) -> Result<OperationResult<Self::Result>, OperationAdapterError> {
- let view = market_refresh_view(map_runtime(crate::runtime::sync::pull(self.config))?);
+ let view = market_refresh_view(map_runtime(crate::runtime::sync::market_refresh(
+ self.config,
+ ))?);
serialized_operation_result::<MarketRefreshResult, _>(&view)
}
}
@@ -316,6 +318,43 @@ mod tests {
}
#[test]
+ fn market_refresh_dry_run_skips_relay_fetch_when_store_is_ready() {
+ let dir = tempdir().expect("tempdir");
+ let mut config = sample_config(dir.path());
+ config.output.dry_run = true;
+ config.relay.urls = vec!["wss://relay.example.com".to_owned()];
+ crate::runtime::local::init(&config).expect("store init");
+
+ let service = OperationAdapter::new(MarketOperationService::new(&config));
+ let mut context = OperationContext::default();
+ context.dry_run = true;
+ let request = OperationRequest::new(context.clone(), MarketRefreshRequest::default())
+ .expect("market refresh request");
+ let envelope = service
+ .execute(request)
+ .expect("market refresh dry run")
+ .to_envelope(context.envelope_context("req_market_refresh"))
+ .expect("market refresh envelope");
+
+ assert_eq!(envelope.operation_id, "market.refresh");
+ assert_eq!(envelope.result["state"], "ready");
+ assert_eq!(
+ envelope.result["target_relays"][0],
+ "wss://relay.example.com"
+ );
+ assert_eq!(envelope.result["fetched_count"], 0);
+ assert_eq!(envelope.result["ingested_count"], 0);
+ assert_eq!(envelope.result["skipped_count"], 0);
+ assert_eq!(envelope.result["unsupported_count"], 0);
+ assert!(
+ envelope.result["reason"]
+ .as_str()
+ .expect("reason")
+ .contains("dry run")
+ );
+ }
+
+ #[test]
fn market_product_search_uses_find_runtime_without_top_level_find() {
let dir = tempdir().expect("tempdir");
let config = sample_config(dir.path());
diff --git a/src/runtime/sync.rs b/src/runtime/sync.rs
@@ -1,16 +1,25 @@
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use radroots_events::kinds::{KIND_FARM, KIND_LISTING, KIND_PROFILE};
+use radroots_nostr::prelude::{
+ RadrootsNostrFilter, radroots_event_from_nostr, radroots_nostr_kind,
+};
use radroots_replica_db::ReplicaSql;
-use radroots_replica_sync::radroots_replica_sync_status;
+use radroots_replica_sync::{
+ RadrootsReplicaIngestOutcome, radroots_replica_ingest_event, radroots_replica_sync_status,
+};
use radroots_sql_core::SqliteExecutor;
use crate::domain::runtime::{
- SyncActionView, SyncFreshnessView, SyncQueueView, SyncStatusView, SyncWatchFrameView,
- SyncWatchView,
+ RelayFailureView, SyncActionView, SyncFreshnessView, SyncQueueView, SyncStatusView,
+ SyncWatchFrameView, SyncWatchView,
};
use crate::runtime::RuntimeError;
use crate::runtime::config::RuntimeConfig;
+use crate::runtime::direct_relay::{
+ DirectRelayFailure, DirectRelayFetchReceipt, fetch_events_from_relays,
+};
use crate::runtime_args::SyncWatchArgs;
const SYNC_SOURCE: &str = "local replica · local first";
@@ -54,6 +63,65 @@ pub fn pull(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> {
)
}
+pub fn market_refresh(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> {
+ let snapshot = inspect_sync(config)?;
+ if snapshot.state == "unconfigured" {
+ return Ok(empty_action_from_snapshot(snapshot, "pull"));
+ }
+
+ if config.output.dry_run {
+ let mut view = empty_action_from_snapshot(snapshot, "pull");
+ view.state = "ready".to_owned();
+ view.reason = Some("dry run requested; relay fetch skipped".to_owned());
+ view.target_relays = config.relay.urls.clone();
+ view.fetched_count = Some(0);
+ view.ingested_count = Some(0);
+ view.skipped_count = Some(0);
+ view.unsupported_count = Some(0);
+ return Ok(view);
+ }
+
+ let receipt = match fetch_events_from_relays(&config.relay.urls, market_refresh_filter()) {
+ Ok(receipt) => receipt,
+ Err(error) => {
+ let mut view = empty_action_from_snapshot(snapshot, "pull");
+ view.state = "unavailable".to_owned();
+ view.reason = Some(error.to_string());
+ view.target_relays = config.relay.urls.clone();
+ return Ok(view);
+ }
+ };
+
+ let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
+ let ingest = ingest_market_events(&executor, &receipt)?;
+ let freshness = freshness_from_executor(&executor)?;
+ let queue = radroots_replica_sync_status(&executor)?;
+
+ Ok(SyncActionView {
+ direction: "pull".to_owned(),
+ state: "ready".to_owned(),
+ source: "direct Nostr relay fetch · local replica ingest".to_owned(),
+ local_root: config.local.root.display().to_string(),
+ replica_db: "ready".to_owned(),
+ relay_count: config.relay.urls.len(),
+ publish_policy: config.relay.publish_policy.as_str().to_owned(),
+ freshness,
+ queue: SyncQueueView {
+ expected_count: queue.expected_count,
+ pending_count: queue.pending_count,
+ },
+ target_relays: receipt.target_relays,
+ connected_relays: receipt.connected_relays,
+ failed_relays: relay_failures(receipt.failed_relays),
+ fetched_count: Some(ingest.fetched_count),
+ ingested_count: Some(ingest.ingested_count),
+ skipped_count: Some(ingest.skipped_count),
+ unsupported_count: Some(ingest.unsupported_count),
+ reason: None,
+ actions: vec!["radroots market product search eggs".to_owned()],
+ })
+}
+
pub fn push(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> {
narrowed_action(
config,
@@ -107,19 +175,7 @@ fn narrowed_action(
) -> Result<SyncActionView, RuntimeError> {
let snapshot = inspect_sync(config)?;
if snapshot.state == "unconfigured" {
- return Ok(SyncActionView {
- direction: direction.to_owned(),
- state: snapshot.state,
- source: snapshot.source,
- local_root: snapshot.local_root,
- replica_db: snapshot.replica_db,
- relay_count: snapshot.relay_count,
- publish_policy: snapshot.publish_policy,
- freshness: snapshot.freshness,
- queue: snapshot.queue,
- reason: snapshot.reason,
- actions: snapshot.actions,
- });
+ return Ok(empty_action_from_snapshot(snapshot, direction));
}
let mut actions = vec!["radroots sync status get".to_owned()];
@@ -135,11 +191,41 @@ fn narrowed_action(
publish_policy: snapshot.publish_policy,
freshness: snapshot.freshness,
queue: snapshot.queue,
+ target_relays: Vec::new(),
+ connected_relays: Vec::new(),
+ failed_relays: Vec::new(),
+ fetched_count: None,
+ ingested_count: None,
+ skipped_count: None,
+ unsupported_count: None,
reason: Some(unavailable_reason.to_owned()),
actions,
})
}
+fn empty_action_from_snapshot(snapshot: SyncSnapshot, direction: &str) -> SyncActionView {
+ SyncActionView {
+ direction: direction.to_owned(),
+ state: snapshot.state,
+ source: snapshot.source,
+ local_root: snapshot.local_root,
+ replica_db: snapshot.replica_db,
+ relay_count: snapshot.relay_count,
+ publish_policy: snapshot.publish_policy,
+ freshness: snapshot.freshness,
+ queue: snapshot.queue,
+ target_relays: Vec::new(),
+ connected_relays: Vec::new(),
+ failed_relays: Vec::new(),
+ fetched_count: None,
+ ingested_count: None,
+ skipped_count: None,
+ unsupported_count: None,
+ reason: snapshot.reason,
+ actions: snapshot.actions,
+ }
+}
+
fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> {
if !config.local.replica_db_path.exists() {
return Ok(SyncSnapshot {
@@ -237,6 +323,55 @@ pub(crate) fn freshness_from_executor(
})
}
+#[derive(Debug, Clone, Copy, Default)]
+struct MarketIngestCounts {
+ fetched_count: usize,
+ ingested_count: usize,
+ skipped_count: usize,
+ unsupported_count: usize,
+}
+
+fn market_refresh_filter() -> RadrootsNostrFilter {
+ RadrootsNostrFilter::new()
+ .kinds([
+ radroots_nostr_kind(KIND_PROFILE as u16),
+ radroots_nostr_kind(KIND_FARM as u16),
+ radroots_nostr_kind(KIND_LISTING as u16),
+ ])
+ .limit(1_000)
+}
+
+fn ingest_market_events(
+ executor: &SqliteExecutor,
+ receipt: &DirectRelayFetchReceipt,
+) -> Result<MarketIngestCounts, RuntimeError> {
+ let mut counts = MarketIngestCounts {
+ fetched_count: receipt.events.len(),
+ ..MarketIngestCounts::default()
+ };
+
+ for event in &receipt.events {
+ let event = radroots_event_from_nostr(event);
+ match radroots_replica_ingest_event(executor, &event) {
+ Ok(RadrootsReplicaIngestOutcome::Applied) => counts.ingested_count += 1,
+ Ok(RadrootsReplicaIngestOutcome::Skipped) => counts.skipped_count += 1,
+ Err(_) => counts.unsupported_count += 1,
+ }
+ }
+
+ Ok(counts)
+}
+
+fn relay_failures(failures: Vec<DirectRelayFailure>) -> Vec<RelayFailureView> {
+ failures
+ .into_iter()
+ .map(|failure| RelayFailureView {
+ relay: failure.relay,
+ reason: failure.reason,
+ })
+ .collect()
+}
+
fn unix_now() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)