cli

Command-line interface for Radroots
git clone https://radroots.dev/git/cli.git
Log | Files | Refs | README | LICENSE

commit 7a9e205d44cc009b6e9fbfd1b94cfd2f44852a28
parent 2a3643e65a1e8df98d6a99ecb263e1d908e4777f
Author: triesap <tyson@radroots.org>
Date:   Sun, 10 May 2026 02:20:23 +0000

cli: harden local replica freshness

- add relay-set-aware sync run metadata for market refresh and sync pull
- refresh stale market reads and block live order submit before stale signing
- project direct farm publish events into the local replica
- document local replica freshness, watch, and export semantics

Diffstat:
Msrc/domain/runtime.rs | 49+++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/operation_market.rs | 1+
Msrc/runtime/farm.rs | 120+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
Msrc/runtime/find.rs | 31++++++++++++++++++++++---------
Msrc/runtime/listing.rs | 32+++++++++++++++++++++++---------
Msrc/runtime/local.rs | 4++++
Msrc/runtime/order.rs | 41+++++++++++++++++++++++++++++++++++++++++
Msrc/runtime/sync.rs | 587++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mtests/signer_runtime_modes.rs | 14+++++++++++++-
9 files changed, 835 insertions(+), 44 deletions(-)

diff --git a/src/domain/runtime.rs b/src/domain/runtime.rs @@ -855,6 +855,8 @@ pub struct FarmPublishView { pub profile: FarmPublishComponentView, pub farm: FarmPublishComponentView, #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub local_replica: Vec<FarmPublishLocalReplicaView>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] pub missing: Vec<String>, #[serde(skip_serializing_if = "Option::is_none")] pub reason: Option<String>, @@ -910,6 +912,23 @@ pub struct FarmPublishComponentView { } #[derive(Debug, Clone, Serialize)] +pub struct FarmPublishLocalReplicaView { + pub component: String, + pub state: String, + pub store_state: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub ingest_outcome: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub event_id: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub event_addr: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option<String>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub actions: Vec<String>, +} + +#[derive(Debug, Clone, Serialize)] pub struct FarmPublishJobView { pub rpc_method: String, pub state: String, @@ -2778,6 +2797,36 @@ pub struct SyncFreshnessView { pub age_seconds: Option<u64>, #[serde(skip_serializing_if = "Option::is_none")] pub last_event_at: Option<u64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub run: Option<SyncRunFreshnessView>, +} + +#[derive(Debug, Clone, Serialize)] +pub struct SyncRunFreshnessView { + pub scope: String, + pub relay_set_fingerprint: String, + pub relay_set_current: bool, + pub last_state: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_attempted_at: Option<u64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_successful_at: Option<u64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_completed_at: Option<u64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub stale_after_seconds: Option<u64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub fetched_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub ingested_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub skipped_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub unsupported_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub failed_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub failure_reason: Option<String>, } #[derive(Debug, Clone, Serialize)] diff --git a/src/operation_market.rs b/src/operation_market.rs @@ -504,6 +504,7 @@ mod tests { display: "fresh".to_owned(), age_seconds: Some(0), last_event_at: Some(0), + run: None, } } diff --git a/src/runtime/farm.rs b/src/runtime/farm.rs @@ -9,19 +9,23 @@ use radroots_events_codec::d_tag::is_d_tag_base64url; use radroots_events_codec::farm::encode::to_wire_parts_with_kind; use radroots_events_codec::profile::encode::to_wire_parts_with_profile_type; use radroots_events_codec::wire::WireEventParts; +use radroots_nostr::prelude::radroots_event_from_nostr; +use radroots_replica_db::migrations; +use radroots_replica_sync::{RadrootsReplicaIngestOutcome, radroots_replica_ingest_event}; use radroots_sdk::{ RadrootsSdkClient, RadrootsSdkConfig, RadrootsdAuth, SdkEnvironment, SdkPublishError, SdkPublishReceipt, SdkRadrootsdFarmPublishOptions, SdkRadrootsdProfilePublishOptions, SdkRadrootsdPublishReceipt, SdkRadrootsdSignerSessionRef, SdkTransportMode, SdkTransportReceipt, SignerConfig as SdkSignerConfig, }; +use radroots_sql_core::SqliteExecutor; use serde_json::json; use crate::domain::runtime::{ FarmConfigDocumentView, FarmConfigSummaryView, FarmGetView, FarmListingDefaultsView, FarmPublicationView, FarmPublishComponentView, FarmPublishEventView, FarmPublishJobView, - FarmPublishView, FarmRebindView, FarmSelectionView, FarmSetView, FarmSetupView, FarmStatusView, - RelayFailureView, + FarmPublishLocalReplicaView, FarmPublishView, FarmRebindView, FarmSelectionView, FarmSetView, + FarmSetupView, FarmStatusView, RelayFailureView, }; use crate::runtime::RuntimeError; use crate::runtime::accounts::{self, AccountRecordView}; @@ -829,6 +833,8 @@ fn publish_via_direct_relay( previews.profile.parts.clone(), ) .map_err(|error| RuntimeError::Network(error.to_string()))?; + let profile_local_replica = + farm_local_replica_ingest_view(config, "profile", &profile_receipt, None); persist_profile_publication(config, &mut resolved, profile_receipt.event_id.clone())?; let farm_receipt = match publish_parts_with_identity( @@ -847,13 +853,20 @@ fn publish_via_direct_relay( profile_idempotency_key, farm_idempotency_key, profile_receipt, + profile_local_replica, error, )); } }; + let farm_local_replica = farm_local_replica_ingest_view( + config, + "farm", + &farm_receipt, + previews.farm.event.event_addr.clone(), + ); persist_farm_publication(config, &mut resolved, farm_receipt.event_id.clone())?; - Ok(base_publish_view( + let mut view = base_publish_view( "published", config, args, @@ -877,7 +890,9 @@ fn publish_via_direct_relay( ), None, Vec::new(), - )) + ); + view.local_replica = vec![profile_local_replica, farm_local_replica]; + Ok(view) } fn publish_via_radrootsd( @@ -1063,6 +1078,7 @@ fn missing_publish_view( None, ), farm: not_submitted_component(farm_publish_rpc_method(config), KIND_FARM, args, None, None), + local_replica: Vec::new(), missing, reason: Some(reason), actions, @@ -1127,6 +1143,7 @@ fn base_publish_view( requested_signer_session_id: args.signer_session_id.clone(), profile, farm, + local_replica: Vec::new(), missing: Vec::new(), reason, actions, @@ -1283,10 +1300,11 @@ fn partial_publish_view( profile_idempotency_key: Option<String>, farm_idempotency_key: Option<String>, profile_receipt: DirectRelayPublishReceipt, + profile_local_replica: FarmPublishLocalReplicaView, farm_error: DirectRelayPublishError, ) -> FarmPublishView { let reason = format!("farm publish failed after profile publish: {farm_error}"); - base_publish_view( + let mut view = base_publish_view( "partial", config, args, @@ -1311,7 +1329,9 @@ fn partial_publish_view( ), Some(reason), vec!["radroots farm publish".to_owned()], - ) + ); + view.local_replica = vec![profile_local_replica]; + view } fn published_component( @@ -1345,6 +1365,94 @@ fn published_component( } } +fn farm_local_replica_ingest_view( + config: &RuntimeConfig, + component: &str, + receipt: &DirectRelayPublishReceipt, + event_addr: Option<String>, +) -> FarmPublishLocalReplicaView { + if !config.local.replica_db_path.exists() { + return FarmPublishLocalReplicaView { + component: component.to_owned(), + state: "unconfigured".to_owned(), + store_state: "missing".to_owned(), + ingest_outcome: None, + event_id: Some(receipt.event_id.clone()), + event_addr, + reason: Some("local replica database is not initialized".to_owned()), + actions: vec!["radroots store init".to_owned()], + }; + } + + let executor = match SqliteExecutor::open(&config.local.replica_db_path) { + Ok(executor) => executor, + Err(error) => { + return farm_local_replica_failed_view( + component, + receipt.event_id.clone(), + event_addr, + format!("failed to open local replica database: {error}"), + ); + } + }; + if let Err(error) = migrations::run_all_up(&executor) { + return farm_local_replica_failed_view( + component, + receipt.event_id.clone(), + event_addr, + format!("failed to migrate local replica database: {error}"), + ); + } + + let event = radroots_event_from_nostr(&receipt.event); + match radroots_replica_ingest_event(&executor, &event) { + Ok(RadrootsReplicaIngestOutcome::Applied) => FarmPublishLocalReplicaView { + component: component.to_owned(), + state: "applied".to_owned(), + store_state: "ready".to_owned(), + ingest_outcome: Some("applied".to_owned()), + event_id: Some(receipt.event_id.clone()), + event_addr, + reason: None, + actions: Vec::new(), + }, + Ok(RadrootsReplicaIngestOutcome::Skipped) => FarmPublishLocalReplicaView { + component: component.to_owned(), + state: "skipped".to_owned(), + store_state: "ready".to_owned(), + ingest_outcome: Some("skipped".to_owned()), + event_id: Some(receipt.event_id.clone()), + event_addr, + reason: Some("shared replica ingest skipped the event".to_owned()), + actions: Vec::new(), + }, + Err(error) => farm_local_replica_failed_view( + component, + receipt.event_id.clone(), + event_addr, + format!("failed to ingest farm publish event into local replica: {error}"), + ), + } +} + +fn farm_local_replica_failed_view( + component: &str, + event_id: String, + event_addr: Option<String>, + reason: String, +) -> FarmPublishLocalReplicaView { + FarmPublishLocalReplicaView { + component: component.to_owned(), + state: "failed".to_owned(), + store_state: "unavailable".to_owned(), + ingest_outcome: None, + event_id: Some(event_id), + event_addr, + reason: Some(reason), + actions: vec!["radroots store status get".to_owned()], + } +} + fn failed_component( rpc_method: &str, event_kind: u32, diff --git a/src/runtime/find.rs b/src/runtime/find.rs @@ -3,12 +3,15 @@ use radroots_sql_core::SqliteExecutor; use crate::domain::runtime::{ FindHyfView, FindPriceView, FindQuantityView, FindResultHyfView, FindResultProvenanceView, - FindResultView, FindView, SyncFreshnessView, + FindResultView, FindView, }; use crate::runtime::RuntimeError; use crate::runtime::config::RuntimeConfig; use crate::runtime::hyf::{self, HyfQueryRewriteRequest, HyfRequestContext}; -use crate::runtime::sync::freshness_from_executor; +use crate::runtime::sync::{ + RelayIngestScope, freshness_for_scope_from_executor, freshness_requires_refresh, + market_refresh, missing_freshness, +}; use crate::runtime_args::FindQueryArgs; const FIND_SOURCE: &str = "local replica · local first"; @@ -50,12 +53,7 @@ pub fn search(config: &RuntimeConfig, args: &FindQueryArgs) -> Result<FindView, count: 0, relay_count: config.relay.urls.len(), replica_db: config.local.replica_db_path.display().to_string(), - freshness: SyncFreshnessView { - state: "never".to_owned(), - display: "never synced".to_owned(), - age_seconds: None, - last_event_at: None, - }, + freshness: missing_freshness(), results: Vec::new(), hyf: None, reason: Some("local replica database is not initialized".to_owned()), @@ -63,8 +61,10 @@ pub fn search(config: &RuntimeConfig, args: &FindQueryArgs) -> Result<FindView, }); } + refresh_market_if_needed(config)?; let db = ReplicaSql::new(SqliteExecutor::open(&config.local.replica_db_path)?); - let freshness = freshness_from_executor(db.executor())?; + let freshness = + freshness_for_scope_from_executor(config, db.executor(), RelayIngestScope::MarketRefresh)?; let applied_query_rewrite = attempt_query_rewrite(config, query.as_str(), &args.query); let effective_query_terms = applied_query_rewrite .as_ref() @@ -136,6 +136,19 @@ pub fn search(config: &RuntimeConfig, args: &FindQueryArgs) -> Result<FindView, }) } +fn refresh_market_if_needed(config: &RuntimeConfig) -> Result<(), RuntimeError> { + if config.output.dry_run || config.relay.urls.is_empty() { + return Ok(()); + } + let executor = SqliteExecutor::open(&config.local.replica_db_path)?; + let freshness = + freshness_for_scope_from_executor(config, &executor, RelayIngestScope::MarketRefresh)?; + if freshness_requires_refresh(&freshness) { + let _ = market_refresh(config)?; + } + Ok(()) +} + fn attempt_query_rewrite( config: &RuntimeConfig, query: &str, diff --git a/src/runtime/listing.rs b/src/runtime/listing.rs @@ -39,7 +39,7 @@ use crate::domain::runtime::{ FindPriceView, FindQuantityView, FindResultProvenanceView, ListingGetView, ListingListView, ListingMutationEventView, ListingMutationJobView, ListingMutationLocalReplicaView, ListingMutationView, ListingNewView, ListingRebindView, ListingSummaryView, - ListingValidateView, ListingValidationIssueView, RelayFailureView, SyncFreshnessView, + ListingValidateView, ListingValidationIssueView, RelayFailureView, }; use crate::runtime::RuntimeError; use crate::runtime::accounts; @@ -52,7 +52,9 @@ use crate::runtime::direct_relay::{ }; use crate::runtime::farm_config; use crate::runtime::signer::{ActorWriteBindingError, resolve_actor_write_authority}; -use crate::runtime::sync::freshness_from_executor; +use crate::runtime::sync::{ + RelayIngestScope, freshness_for_scope_from_executor, market_refresh, missing_freshness, +}; use crate::runtime_args::{ ListingCreateArgs, ListingFileArgs, ListingMutationArgs, ListingRebindArgs, RecordLookupArgs, }; @@ -930,16 +932,12 @@ pub fn get( config: &RuntimeConfig, args: &RecordLookupArgs, ) -> Result<ListingGetView, RuntimeError> { + refresh_market_listing_if_needed(config)?; let freshness = if config.local.replica_db_path.exists() { let executor = SqliteExecutor::open(&config.local.replica_db_path)?; - freshness_from_executor(&executor)? + freshness_for_scope_from_executor(config, &executor, RelayIngestScope::MarketRefresh)? } else { - SyncFreshnessView { - state: "never".to_owned(), - display: "never synced".to_owned(), - age_seconds: None, - last_event_at: None, - } + missing_freshness() }; let provenance = FindResultProvenanceView { origin: "local_replica.trade_product".to_owned(), @@ -1024,6 +1022,22 @@ pub fn get( }) } +fn refresh_market_listing_if_needed(config: &RuntimeConfig) -> Result<(), RuntimeError> { + if !config.local.replica_db_path.exists() + || config.output.dry_run + || config.relay.urls.is_empty() + { + return Ok(()); + } + let executor = SqliteExecutor::open(&config.local.replica_db_path)?; + let freshness = + freshness_for_scope_from_executor(config, &executor, RelayIngestScope::MarketRefresh)?; + if crate::runtime::sync::freshness_requires_refresh(&freshness) { + let _ = market_refresh(config)?; + } + Ok(()) +} + pub fn publish( config: &RuntimeConfig, args: &ListingMutationArgs, diff --git a/src/runtime/local.rs b/src/runtime/local.rs @@ -14,6 +14,7 @@ use crate::domain::runtime::{ }; use crate::runtime::RuntimeError; use crate::runtime::config::RuntimeConfig; +use crate::runtime::sync::ensure_sync_run_table; use crate::runtime_args::LocalExportFormatArg; const LOCAL_SOURCE: &str = "local replica · local first"; @@ -23,6 +24,7 @@ pub fn init(config: &RuntimeConfig) -> Result<LocalInitView, RuntimeError> { ensure_local_roots(config)?; let executor = SqliteExecutor::open(&config.local.replica_db_path)?; migrations::run_all_up(&executor)?; + ensure_sync_run_table(&executor)?; let manifest = export_manifest(&executor)?; Ok(LocalInitView { @@ -44,6 +46,7 @@ pub fn init_preflight(config: &RuntimeConfig) -> Result<LocalInitView, RuntimeEr validate_local_roots(config)?; if config.local.replica_db_path.exists() { let executor = SqliteExecutor::open(&config.local.replica_db_path)?; + ensure_sync_run_table(&executor)?; let manifest = export_manifest(&executor)?; return Ok(LocalInitView { state: "ready".to_owned(), @@ -95,6 +98,7 @@ pub fn status(config: &RuntimeConfig) -> Result<LocalStatusView, RuntimeError> { } let executor = SqliteExecutor::open(&config.local.replica_db_path)?; + ensure_sync_run_table(&executor)?; let manifest = export_manifest(&executor)?; let sync = radroots_replica_sync_status(&executor)?; diff --git a/src/runtime/order.rs b/src/runtime/order.rs @@ -96,6 +96,9 @@ use crate::runtime::direct_relay::{ fetch_events_from_relays, publish_parts_with_identity, }; use crate::runtime::signer::ActorWriteBindingError; +use crate::runtime::sync::{ + RelayIngestScope, freshness_for_scope, freshness_requires_refresh, market_refresh, +}; use crate::runtime_args::{ OrderCancelArgs, OrderDecisionArg, OrderDecisionArgs, OrderDraftCreateArgs, OrderFulfillmentArgs, OrderPaymentArgs, OrderReceiptArgs, OrderRevisionDecisionArg, @@ -691,6 +694,10 @@ pub fn submit( )); } + if let Some(view) = order_submit_market_freshness_view(config, &loaded, args)? { + return Ok(view); + } + if let Some(view) = order_submit_existing_request_preflight_view(config, &loaded, args, &payload)? { @@ -9301,6 +9308,40 @@ fn order_submit_invalid_quantity_view( } } +fn order_submit_market_freshness_view( + config: &RuntimeConfig, + loaded: &LoadedOrderDraft, + args: &OrderSubmitArgs, +) -> Result<Option<OrderSubmitView>, RuntimeError> { + if config.output.dry_run { + return Ok(None); + } + + let mut freshness = freshness_for_scope(config, RelayIngestScope::MarketRefresh)?; + if freshness_requires_refresh(&freshness) { + let _ = market_refresh(config)?; + freshness = freshness_for_scope(config, RelayIngestScope::MarketRefresh)?; + } + if !freshness_requires_refresh(&freshness) { + return Ok(None); + } + + Ok(Some(order_submit_unconfigured_view( + config, + loaded, + args, + "order submit requires a current market refresh before signing; run `radroots market refresh` with the relays you trust, then submit again", + vec![issue( + "order.listing_addr", + format!( + "local market freshness is `{}`; current listing state must be refreshed before order submit", + freshness.state + ), + )], + vec!["radroots market refresh".to_owned()], + ))) +} + fn order_submit_existing_request_preflight_view( config: &RuntimeConfig, loaded: &LoadedOrderDraft, diff --git a/src/runtime/sync.rs b/src/runtime/sync.rs @@ -12,7 +12,7 @@ use radroots_events::kinds::{ use radroots_events_codec::wire::WireEventParts; use radroots_identity::RadrootsIdentity; use radroots_nostr::prelude::{ - RadrootsNostrFilter, radroots_event_from_nostr, radroots_nostr_kind, + RadrootsNostrFilter, RadrootsNostrTimestamp, radroots_event_from_nostr, radroots_nostr_kind, }; use radroots_replica_db::{ReplicaSql, migrations}; use radroots_replica_sync::{ @@ -20,12 +20,14 @@ use radroots_replica_sync::{ radroots_replica_ingest_event, radroots_replica_ingest_event_state, radroots_replica_pending_publish_batch, radroots_replica_sync_status, }; -use radroots_sql_core::SqliteExecutor; +use radroots_sql_core::{SqlExecutor, SqliteExecutor}; +use serde::Deserialize; +use serde_json::json; use crate::domain::runtime::{ RelayFailureView, SyncActionView, SyncFreshnessView, SyncPublishPlanAuthorView, - SyncPublishPlanKindView, SyncPublishPlanView, SyncQueueView, SyncStatusView, - SyncWatchFrameView, SyncWatchView, + SyncPublishPlanKindView, SyncPublishPlanView, SyncQueueView, SyncRunFreshnessView, + SyncStatusView, SyncWatchFrameView, SyncWatchView, }; use crate::runtime::RuntimeError; use crate::runtime::accounts; @@ -47,6 +49,10 @@ const INGEST_SOURCE: &str = "direct Nostr relay fetch · local replica ingest"; const PUBLISH_SOURCE: &str = "direct Nostr relay publish · local replica sync"; pub(crate) const RADROOTSD_SYNC_PUSH_UNAVAILABLE_REASON: &str = "sync push is only available in publish mode `nostr_relay`; radrootsd sync push is not implemented"; const RELAY_FETCH_LIMIT: usize = 1_000; +const RELAY_FETCH_MAX_PAGES: usize = 5; +const MARKET_FRESHNESS_STALE_AFTER_SECONDS: u64 = 15 * 60; +const SYNC_PULL_FRESHNESS_STALE_AFTER_SECONDS: u64 = 30 * 60; +const SYNC_RUN_TABLE: &str = "radroots_cli_sync_run"; const MARKET_REFRESH_KINDS: &[u32] = &[KIND_PROFILE, KIND_FARM, KIND_LISTING]; const SYNC_PULL_KINDS: &[u32] = &[ KIND_PROFILE, @@ -84,6 +90,39 @@ struct SyncSnapshot { actions: Vec<String>, } +#[derive(Debug, Clone)] +struct SyncRunRecord { + scope: String, + relay_set_fingerprint: String, + target_relays_json: String, + connected_relays_json: String, + failed_relays_json: String, + started_at: u64, + completed_at: Option<u64>, + state: String, + fetched_count: usize, + ingested_count: usize, + skipped_count: usize, + unsupported_count: usize, + failed_count: usize, + failure_reason: Option<String>, +} + +#[derive(Debug, Deserialize)] +struct SyncRunRow { + scope: String, + relay_set_fingerprint: String, + started_at: i64, + completed_at: Option<i64>, + state: String, + fetched_count: i64, + ingested_count: i64, + skipped_count: i64, + unsupported_count: i64, + failed_count: i64, + failure_reason: Option<String>, +} + pub fn status(config: &RuntimeConfig) -> Result<SyncStatusView, RuntimeError> { let snapshot = inspect_sync(config)?; Ok(SyncStatusView { @@ -101,11 +140,11 @@ pub fn status(config: &RuntimeConfig) -> Result<SyncStatusView, RuntimeError> { } pub fn pull(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> { - pull_with_fetcher(config, fetch_events_from_relays) + pull_with_fetcher(config, fetch_events_from_relays_windowed) } pub fn market_refresh(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> { - market_refresh_with_fetcher(config, fetch_events_from_relays) + market_refresh_with_fetcher(config, fetch_events_from_relays_windowed) } fn pull_with_fetcher<F>(config: &RuntimeConfig, fetcher: F) -> Result<SyncActionView, RuntimeError> @@ -131,6 +170,40 @@ where relay_ingest(config, RelayIngestScope::MarketRefresh, fetcher) } +fn fetch_events_from_relays_windowed( + relay_urls: &[String], + base_filter: RadrootsNostrFilter, +) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError> { + let mut next_filter = base_filter.clone(); + let mut merged: Option<DirectRelayFetchReceipt> = None; + + for _ in 0..RELAY_FETCH_MAX_PAGES { + let receipt = fetch_events_from_relays(relay_urls, next_filter)?; + let page_len = receipt.events.len(); + let oldest_created_at = receipt + .events + .iter() + .map(|event| event.created_at.as_secs()) + .min(); + merge_fetch_receipt(&mut merged, receipt); + if page_len < RELAY_FETCH_LIMIT { + break; + } + let Some(oldest_created_at) = oldest_created_at else { + break; + }; + if oldest_created_at == 0 { + break; + } + next_filter = base_filter + .clone() + .until(RadrootsNostrTimestamp::from(oldest_created_at - 1)) + .limit(RELAY_FETCH_LIMIT); + } + + merged.ok_or(DirectRelayFetchError::MissingRelays) +} + fn relay_ingest<F>( config: &RuntimeConfig, scope: RelayIngestScope, @@ -163,6 +236,7 @@ where return Ok(view); } + let started_at = unix_now(); let receipt = match fetcher(&config.relay.urls, scope.filter()) { Ok(receipt) => receipt, Err(DirectRelayFetchError::Connect { @@ -170,18 +244,49 @@ where target_relays, failed_relays, }) => { + let failed_relays = relay_failures(failed_relays); + let failure_reason = format!("direct relay connection failed: {reason}"); + let executor = SqliteExecutor::open(&config.local.replica_db_path)?; + migrations::run_all_up(&executor)?; + record_sync_run( + &executor, + &sync_record_from_failure( + scope, + &config.relay.urls, + target_relays.clone(), + failed_relays.clone(), + started_at, + failure_reason.clone(), + )?, + )?; let mut view = empty_action_from_snapshot(snapshot, "pull"); view.state = "unavailable".to_owned(); - view.reason = Some(format!("direct relay connection failed: {reason}")); + view.reason = Some(failure_reason); view.target_relays = target_relays; - view.failed_relays = relay_failures(failed_relays); + view.failed_relays = failed_relays; + view.freshness = freshness_for_scope_from_executor(config, &executor, scope)?; return Ok(view); } Err(error) => { + let failure_reason = error.to_string(); + let executor = SqliteExecutor::open(&config.local.replica_db_path)?; + migrations::run_all_up(&executor)?; + record_sync_run( + &executor, + &sync_record_from_failure( + scope, + &config.relay.urls, + config.relay.urls.clone(), + Vec::new(), + started_at, + failure_reason.clone(), + )?, + )?; let mut view = empty_action_from_snapshot(snapshot, "pull"); view.state = "unavailable".to_owned(); - view.reason = Some(error.to_string()); + view.reason = Some(failure_reason); view.target_relays = config.relay.urls.clone(); + view.freshness = freshness_for_scope_from_executor(config, &executor, scope)?; return Ok(view); } }; @@ -189,7 +294,11 @@ where let executor = SqliteExecutor::open(&config.local.replica_db_path)?; migrations::run_all_up(&executor)?; let ingest = ingest_events(&executor, &receipt, scope)?; - let freshness = freshness_from_executor(&executor)?; + record_sync_run( + &executor, + &sync_record_from_ingest(scope, &config.relay.urls, &receipt, &ingest, started_at)?, + )?; + let freshness = freshness_for_scope_from_executor(config, &executor, scope)?; let queue = radroots_replica_sync_status(&executor)?; Ok(SyncActionView { @@ -456,6 +565,7 @@ fn push_radrootsd_unavailable_view(config: &RuntimeConfig) -> SyncActionView { display: "not checked".to_owned(), age_seconds: None, last_event_at: None, + run: None, }, queue: SyncQueueView { expected_count: 0, @@ -742,12 +852,7 @@ fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> { replica_db: "missing".to_owned(), relay_count: config.relay.urls.len(), publish_policy: config.relay.publish_policy.as_str().to_owned(), - freshness: SyncFreshnessView { - state: "never".to_owned(), - display: "never synced".to_owned(), - age_seconds: None, - last_event_at: None, - }, + freshness: missing_freshness(), queue: SyncQueueView { expected_count: 0, pending_count: 0, @@ -758,8 +863,10 @@ fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> { } let executor = SqliteExecutor::open(&config.local.replica_db_path)?; + migrations::run_all_up(&executor)?; let queue = radroots_replica_sync_status(&executor)?; - let freshness = freshness_from_executor(&executor)?; + let freshness = + freshness_for_scope_from_executor(config, &executor, RelayIngestScope::SyncPull)?; let relay_count = config.relay.urls.len(); let publish_policy = config.relay.publish_policy.as_str().to_owned(); let mut actions = Vec::new(); @@ -819,6 +926,7 @@ pub(crate) fn freshness_from_executor( display: format!("synced {}", relative_age(age_seconds)), age_seconds: Some(age_seconds), last_event_at: Some(last_event_at), + run: None, } } None => SyncFreshnessView { @@ -826,10 +934,368 @@ pub(crate) fn freshness_from_executor( display: "never synced".to_owned(), age_seconds: None, last_event_at: None, + run: None, }, }) } +pub(crate) fn missing_freshness() -> SyncFreshnessView { + SyncFreshnessView { + state: "never".to_owned(), + display: "never synced".to_owned(), + age_seconds: None, + last_event_at: None, + run: None, + } +} + +pub(crate) fn freshness_for_scope( + config: &RuntimeConfig, + scope: RelayIngestScope, +) -> Result<SyncFreshnessView, RuntimeError> { + let executor = SqliteExecutor::open(&config.local.replica_db_path)?; + migrations::run_all_up(&executor)?; + freshness_for_scope_from_executor(config, &executor, scope) +} + +pub(crate) fn freshness_for_scope_from_executor( + config: &RuntimeConfig, + executor: &SqliteExecutor, + scope: RelayIngestScope, +) -> Result<SyncFreshnessView, RuntimeError> { + let last_event_at = ReplicaSql::new(executor).nostr_event_last_created_at()?; + let now = unix_now(); + let age_seconds = last_event_at.map(|last_event_at| now.saturating_sub(last_event_at)); + ensure_sync_run_table(executor)?; + let current_fingerprint = relay_set_fingerprint(&config.relay.urls); + let latest = latest_sync_run(executor, scope)?; + let current = latest + .as_ref() + .filter(|run| run.relay_set_fingerprint == current_fingerprint); + let last_success = current.filter(|run| sync_run_successful(run)); + let state = freshness_state(scope, latest.as_ref(), current, last_success, age_seconds); + let display = freshness_display(scope, state.as_str(), age_seconds, current); + + Ok(SyncFreshnessView { + state, + display, + age_seconds, + last_event_at, + run: latest.map(|run| sync_run_freshness_view(scope, run, current_fingerprint)), + }) +} + +pub(crate) fn freshness_requires_refresh(freshness: &SyncFreshnessView) -> bool { + matches!( + freshness.state.as_str(), + "never" | "stale" | "relay_set_changed" | "refresh_failed" + ) +} + +fn freshness_state( + scope: RelayIngestScope, + latest: Option<&SyncRunRecord>, + current: Option<&SyncRunRecord>, + last_success: Option<&SyncRunRecord>, + age_seconds: Option<u64>, +) -> String { + let Some(latest) = latest else { + return "never".to_owned(); + }; + let Some(current) = current else { + return "relay_set_changed".to_owned(); + }; + if !sync_run_successful(current) { + return "refresh_failed".to_owned(); + } + if last_success.is_none() { + return "refresh_failed".to_owned(); + } + if age_seconds.is_none() { + return "fresh".to_owned(); + } + if age_seconds.unwrap_or_default() > scope.stale_after_seconds() { + return "stale".to_owned(); + } + if latest.state == "partial" { + return "partial".to_owned(); + } + "fresh".to_owned() +} + +fn freshness_display( + scope: RelayIngestScope, + state: &str, + age_seconds: Option<u64>, + run: Option<&SyncRunRecord>, +) -> String { + match state { + "fresh" => match age_seconds { + Some(age_seconds) => format!("{} fresh {}", scope.display(), relative_age(age_seconds)), + None => format!("{} fresh; no market events yet", scope.display()), + }, + "partial" => match age_seconds { + Some(age_seconds) => format!( + "{} partially refreshed {}", + scope.display(), + relative_age(age_seconds) + ), + None => format!( + "{} partially refreshed; no market events yet", + scope.display() + ), + }, + "stale" => match age_seconds { + Some(age_seconds) => format!("{} stale {}", scope.display(), relative_age(age_seconds)), + None => format!("{} stale", scope.display()), + }, + "relay_set_changed" => format!("{} relay set changed; refresh required", scope.display()), + "refresh_failed" => run + .and_then(|run| run.failure_reason.clone()) + .unwrap_or_else(|| format!("{} refresh failed", scope.display())), + _ => format!("{} never synced", scope.display()), + } +} + +fn sync_run_successful(run: &SyncRunRecord) -> bool { + matches!(run.state.as_str(), "success" | "partial") +} + +fn sync_run_freshness_view( + scope: RelayIngestScope, + run: SyncRunRecord, + current_fingerprint: String, +) -> SyncRunFreshnessView { + let relay_set_current = run.relay_set_fingerprint == current_fingerprint; + let successful = sync_run_successful(&run); + let last_successful_at = successful.then_some(run.completed_at.unwrap_or(run.started_at)); + SyncRunFreshnessView { + scope: run.scope, + relay_set_fingerprint: run.relay_set_fingerprint, + relay_set_current, + last_state: run.state, + last_attempted_at: Some(run.started_at), + last_successful_at, + last_completed_at: run.completed_at, + stale_after_seconds: Some(scope.stale_after_seconds()), + fetched_count: Some(run.fetched_count), + ingested_count: Some(run.ingested_count), + skipped_count: Some(run.skipped_count), + unsupported_count: Some(run.unsupported_count), + failed_count: Some(run.failed_count), + failure_reason: run.failure_reason, + } +} + +pub(crate) fn ensure_sync_run_table(executor: &SqliteExecutor) -> Result<(), RuntimeError> { + executor.exec( + "CREATE TABLE IF NOT EXISTS radroots_cli_sync_run ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + scope TEXT NOT NULL, + relay_set_fingerprint TEXT NOT NULL, + target_relays_json TEXT NOT NULL, + connected_relays_json TEXT NOT NULL, + failed_relays_json TEXT NOT NULL, + started_at INTEGER NOT NULL, + completed_at INTEGER, + state TEXT NOT NULL, + fetched_count INTEGER NOT NULL, + ingested_count INTEGER NOT NULL, + skipped_count INTEGER NOT NULL, + unsupported_count INTEGER NOT NULL, + failed_count INTEGER NOT NULL, + failure_reason TEXT + ); + CREATE INDEX IF NOT EXISTS idx_radroots_cli_sync_run_scope_started + ON radroots_cli_sync_run(scope, started_at DESC);", + "[]", + )?; + Ok(()) +} + +fn latest_sync_run( + executor: &SqliteExecutor, + scope: RelayIngestScope, +) -> Result<Option<SyncRunRecord>, RuntimeError> { + let rows = executor.query_raw( + &format!( + "SELECT scope, + relay_set_fingerprint, + started_at, + completed_at, + state, + fetched_count, + ingested_count, + skipped_count, + unsupported_count, + failed_count, + failure_reason + FROM {SYNC_RUN_TABLE} + WHERE scope = ?1 + ORDER BY started_at DESC, id DESC + LIMIT 1" + ), + json!([scope.id()]).to_string().as_str(), + )?; + let mut rows: Vec<SyncRunRow> = serde_json::from_str(rows.as_str())?; + Ok(rows.pop().map(sync_run_record_from_row)) +} + +fn sync_run_record_from_row(row: SyncRunRow) -> SyncRunRecord { + SyncRunRecord { + scope: row.scope, + relay_set_fingerprint: row.relay_set_fingerprint, + target_relays_json: String::new(), + connected_relays_json: String::new(), + failed_relays_json: String::new(), + started_at: u64_from_db(row.started_at), + completed_at: row.completed_at.map(u64_from_db), + state: row.state, + fetched_count: usize_from_db(row.fetched_count), + ingested_count: usize_from_db(row.ingested_count), + skipped_count: usize_from_db(row.skipped_count), + unsupported_count: usize_from_db(row.unsupported_count), + failed_count: usize_from_db(row.failed_count), + failure_reason: row.failure_reason, + } +} + +fn record_sync_run(executor: &SqliteExecutor, record: &SyncRunRecord) -> Result<(), RuntimeError> { + ensure_sync_run_table(executor)?; + executor.exec( + &format!( + "INSERT INTO {SYNC_RUN_TABLE} ( + scope, + relay_set_fingerprint, + target_relays_json, + connected_relays_json, + failed_relays_json, + started_at, + completed_at, + state, + fetched_count, + ingested_count, + skipped_count, + unsupported_count, + failed_count, + failure_reason + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)" + ), + json!([ + record.scope.as_str(), + record.relay_set_fingerprint.as_str(), + record.target_relays_json.as_str(), + record.connected_relays_json.as_str(), + record.failed_relays_json.as_str(), + i64_from_u64(record.started_at), + record.completed_at.map(i64_from_u64), + record.state.as_str(), + i64_from_usize(record.fetched_count), + i64_from_usize(record.ingested_count), + i64_from_usize(record.skipped_count), + i64_from_usize(record.unsupported_count), + i64_from_usize(record.failed_count), + record.failure_reason.as_deref(), + ]) + .to_string() + .as_str(), + )?; + Ok(()) +} + +fn sync_record_from_failure( + scope: RelayIngestScope, + relays: &[String], + target_relays: Vec<String>, + failed_relays: Vec<RelayFailureView>, + started_at: u64, + reason: String, +) -> Result<SyncRunRecord, RuntimeError> { + Ok(SyncRunRecord { + scope: scope.id().to_owned(), + relay_set_fingerprint: relay_set_fingerprint(relays), + target_relays_json: serde_json::to_string(&target_relays)?, + connected_relays_json: serde_json::to_string(&Vec::<String>::new())?, + failed_relays_json: serde_json::to_string(&failed_relays)?, + started_at, + completed_at: Some(unix_now()), + state: "failed".to_owned(), + fetched_count: 0, + ingested_count: 0, + skipped_count: 0, + unsupported_count: 0, + failed_count: 1, + failure_reason: Some(reason), + }) +} + +fn sync_record_from_ingest( + scope: RelayIngestScope, + relays: &[String], + receipt: &DirectRelayFetchReceipt, + ingest: &RelayIngestCounts, + started_at: u64, +) -> Result<SyncRunRecord, RuntimeError> { + let failed_relays = relay_failures(receipt.failed_relays.clone()); + let state = if ingest.failed_count > 0 || !failed_relays.is_empty() { + "partial" + } else { + "success" + }; + Ok(SyncRunRecord { + scope: scope.id().to_owned(), + relay_set_fingerprint: relay_set_fingerprint(relays), + target_relays_json: serde_json::to_string(&receipt.target_relays)?, + connected_relays_json: serde_json::to_string(&receipt.connected_relays)?, + failed_relays_json: serde_json::to_string(&failed_relays)?, + started_at, + completed_at: Some(unix_now()), + state: state.to_owned(), + fetched_count: ingest.fetched_count, + ingested_count: ingest.ingested_count, + skipped_count: ingest.skipped_count, + unsupported_count: ingest.unsupported_count, + failed_count: ingest.failed_count + failed_relays.len(), + failure_reason: ingest.reason(), + }) +} + +fn relay_set_fingerprint(relays: &[String]) -> String { + let mut normalized = relays + .iter() + .map(|relay| relay.trim().to_ascii_lowercase()) + .filter(|relay| !relay.is_empty()) + .collect::<Vec<_>>(); + normalized.sort(); + normalized.dedup(); + let mut hash = 0xcbf29ce484222325_u64; + for relay in normalized { + for byte in relay.as_bytes() { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(0x100000001b3); + } + hash ^= 0xff; + hash = hash.wrapping_mul(0x100000001b3); + } + format!("relayset_{hash:016x}") +} + +fn u64_from_db(value: i64) -> u64 { + u64::try_from(value).unwrap_or_default() +} + +fn usize_from_db(value: i64) -> usize { + usize::try_from(value).unwrap_or_default() +} + +fn i64_from_u64(value: u64) -> i64 { + i64::try_from(value).unwrap_or(i64::MAX) +} + +fn i64_from_usize(value: usize) -> i64 { + i64::try_from(value).unwrap_or(i64::MAX) +} + #[derive(Debug, Clone, Default)] struct RelayIngestCounts { fetched_count: usize, @@ -853,12 +1319,33 @@ impl RelayIngestCounts { } #[derive(Debug, Clone, Copy)] -enum RelayIngestScope { +pub(crate) enum RelayIngestScope { SyncPull, MarketRefresh, } impl RelayIngestScope { + fn id(self) -> &'static str { + match self { + Self::SyncPull => "sync_pull", + Self::MarketRefresh => "market_refresh", + } + } + + fn display(self) -> &'static str { + match self { + Self::SyncPull => "sync pull", + Self::MarketRefresh => "market refresh", + } + } + + fn stale_after_seconds(self) -> u64 { + match self { + Self::SyncPull => SYNC_PULL_FRESHNESS_STALE_AFTER_SECONDS, + Self::MarketRefresh => MARKET_FRESHNESS_STALE_AFTER_SECONDS, + } + } + fn kinds(self) -> &'static [u32] { match self { Self::SyncPull => SYNC_PULL_KINDS, @@ -935,6 +1422,32 @@ fn relay_failures(failures: Vec<DirectRelayFailure>) -> Vec<RelayFailureView> { .collect() } +fn merge_fetch_receipt( + target: &mut Option<DirectRelayFetchReceipt>, + receipt: DirectRelayFetchReceipt, +) { + match target { + Some(target) => { + push_unique_many(&mut target.target_relays, receipt.target_relays.iter()); + push_unique_many( + &mut target.connected_relays, + receipt.connected_relays.iter(), + ); + for failure in receipt.failed_relays { + if !target + .failed_relays + .iter() + .any(|existing| existing.relay == failure.relay) + { + target.failed_relays.push(failure); + } + } + target.events.extend(receipt.events); + } + None => *target = Some(receipt), + } +} + fn push_unique_many<'a>(target: &mut Vec<String>, values: impl Iterator<Item = &'a String>) { for value in values { if !target.contains(value) { @@ -991,7 +1504,7 @@ mod tests { use super::{ DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt, DirectRelayPublishReceipt, market_refresh_with_fetcher, pull_with_fetcher, - push_with_publisher, + push_with_publisher, status, }; use crate::runtime::config::{ AccountConfig, AccountSecretContractConfig, HyfConfig, IdentityConfig, InteractionConfig, @@ -1443,6 +1956,42 @@ mod tests { } #[test] + fn relay_refresh_records_current_run_freshness() { + let dir = tempdir().expect("tempdir"); + let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); + crate::runtime::local::init(&config).expect("store init"); + let seller = identity(10); + + let view = market_refresh_with_fetcher(&config, fake_fetcher(vec![listing_event(&seller)])) + .expect("market refresh"); + + assert_eq!(view.freshness.state, "fresh"); + let run = view.freshness.run.as_ref().expect("run freshness"); + assert_eq!(run.scope, "market_refresh"); + assert_eq!(run.last_state, "success"); + assert_eq!(run.relay_set_current, true); + assert_eq!(run.fetched_count, Some(1)); + assert_eq!(run.ingested_count, Some(1)); + } + + #[test] + fn sync_status_reports_relay_set_changed_freshness() { + let dir = tempdir().expect("tempdir"); + let config = sample_config(dir.path(), vec!["wss://relay-a.example.com".to_owned()]); + crate::runtime::local::init(&config).expect("store init"); + let seller = identity(11); + pull_with_fetcher(&config, fake_fetcher(vec![listing_event(&seller)])).expect("sync pull"); + let changed = sample_config(dir.path(), vec!["wss://relay-b.example.com".to_owned()]); + + let view = status(&changed).expect("sync status"); + + assert_eq!(view.freshness.state, "relay_set_changed"); + let run = view.freshness.run.as_ref().expect("run freshness"); + assert_eq!(run.scope, "sync_pull"); + assert_eq!(run.relay_set_current, false); + } + + #[test] fn relay_ingest_splits_unsupported_and_failed_events() { let dir = tempdir().expect("tempdir"); let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); diff --git a/tests/signer_runtime_modes.rs b/tests/signer_runtime_modes.rs @@ -2121,7 +2121,19 @@ fn local_seller_publish_commands_attempt_configured_direct_relay() { order_id, ]); assert!(!order_output.status.success()); - assert_direct_relay_connection_failure(&order_value, "order.submit", &["order", "submit"]); + assert_eq!(order_value["operation_id"], "order.submit"); + assert_eq!(order_value["result"], serde_json::Value::Null); + assert_eq!(order_value["errors"][0]["code"], "operation_unavailable"); + assert_eq!( + order_value["errors"][0]["detail"]["issues"][0]["field"], + "order.listing_addr" + ); + assert_contains( + &order_value["errors"][0]["detail"]["issues"][0]["message"], + "local market freshness", + ); + assert_no_removed_command_reference(&order_value, &["order", "submit"]); + assert_no_daemon_runtime_reference(&order_value, &["order", "submit"]); } #[test]