cli

Command-line interface for Radroots
git clone https://radroots.dev/git/cli.git
Log | Files | Refs | README | LICENSE

sync.rs (82313B)


      1 use std::thread;
      2 use std::time::{Duration, SystemTime, UNIX_EPOCH};
      3 
      4 use radroots_events::kinds::{
      5     KIND_FARM, KIND_LIST_SET_APP_CURATION, KIND_LIST_SET_BOOKMARK, KIND_LIST_SET_CALENDAR,
      6     KIND_LIST_SET_CURATION, KIND_LIST_SET_EMOJI, KIND_LIST_SET_FOLLOW, KIND_LIST_SET_GENERIC,
      7     KIND_LIST_SET_INTEREST, KIND_LIST_SET_KIND_MUTE, KIND_LIST_SET_MEDIA_STARTER_PACK,
      8     KIND_LIST_SET_PICTURE, KIND_LIST_SET_RELAY, KIND_LIST_SET_RELEASE_ARTIFACT,
      9     KIND_LIST_SET_STARTER_PACK, KIND_LIST_SET_VIDEO, KIND_LISTING, KIND_PLOT, KIND_PROFILE,
     10 };
     11 use radroots_nostr::prelude::{
     12     RadrootsNostrFilter, RadrootsNostrTimestamp, radroots_event_from_nostr, radroots_nostr_kind,
     13 };
     14 use radroots_replica_db::{ReplicaSql, migrations};
     15 use radroots_replica_sync::{
     16     RadrootsReplicaEventsError, RadrootsReplicaIngestOutcome, radroots_replica_ingest_event,
     17     radroots_replica_sync_status,
     18 };
     19 use radroots_sdk::{
     20     PushOutboxEventReceipt, PushOutboxReceipt, PushOutboxRelayOutcomeKind, PushOutboxRequest,
     21     SyncStatusReceipt, SyncStatusRequest,
     22 };
     23 use radroots_sql_core::{SqlExecutor, SqliteExecutor};
     24 use serde::Deserialize;
     25 use serde_json::json;
     26 
     27 use crate::cli::global::SyncWatchArgs;
     28 use crate::runtime::RuntimeError;
     29 use crate::runtime::config::RuntimeConfig;
     30 use crate::runtime::direct_relay::{
     31     DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt, fetch_events_from_relays,
     32 };
     33 use crate::runtime::sdk::{CliSdkAdapterError, CliSdkSession, sdk_relay_url_policy};
     34 use crate::view::runtime::{
     35     RelayFailureView, SyncActionView, SyncFreshnessView, SyncQueueView, SyncRunFreshnessView,
     36     SyncStatusView, SyncWatchFrameView, SyncWatchView,
     37 };
     38 
/// `source` label reported by `inspect_sync` snapshots.
const SYNC_SOURCE: &str = "local replica · local first";
/// `source` label for SDK-backed sync status views.
const SDK_SYNC_SOURCE: &str = "SDK canonical event store and outbox";
/// `source` label for SDK-backed push views.
const SDK_PUSH_SOURCE: &str = "SDK outbox push";
/// Suggested command when no relays are configured.
const RELAY_PULL_SETUP_ACTION: &str = "radroots --relay wss://relay.example.com sync pull";
/// Suggested command for pulling from relays.
const SYNC_PULL_ACTION: &str = "radroots sync pull";
/// Suggested command for pushing the outbox.
const SYNC_PUSH_ACTION: &str = "radroots sync push";
// NOTE(review): the two "ready" actions below currently hold the same command;
// presumably they are returned by `RelayIngestScope::ready_action` (not visible here).
const SYNC_READY_ACTION: &str = "radroots market product search eggs";
const MARKET_READY_ACTION: &str = "radroots market product search eggs";
/// `source` label for views produced after a successful relay ingest.
const INGEST_SOURCE: &str = "direct Nostr relay fetch · local replica ingest";
/// Page size used by the windowed relay fetch; a shorter page ends pagination.
const RELAY_FETCH_LIMIT: usize = 1_000;
/// Upper bound on the number of pages the windowed relay fetch requests.
const RELAY_FETCH_MAX_PAGES: usize = 5;
// Staleness thresholds in seconds (15 and 30 minutes). Their consumers are not
// visible in this part of the file — TODO confirm where they are applied.
const MARKET_FRESHNESS_STALE_AFTER_SECONDS: u64 = 15 * 60;
const SYNC_PULL_FRESHNESS_STALE_AFTER_SECONDS: u64 = 30 * 60;
// Presumably the SQL table that stores sync run records — verify against `record_sync_run`.
const SYNC_RUN_TABLE: &str = "radroots_cli_sync_run";
// Event kinds per ingest scope; presumably consumed by `RelayIngestScope::filter`
// (not visible here).
const MARKET_REFRESH_KINDS: &[u32] = &[KIND_PROFILE, KIND_FARM, KIND_LISTING];
const SYNC_PULL_KINDS: &[u32] = &[
    KIND_PROFILE,
    KIND_FARM,
    KIND_PLOT,
    KIND_LISTING,
    KIND_LIST_SET_FOLLOW,
    KIND_LIST_SET_GENERIC,
    KIND_LIST_SET_RELAY,
    KIND_LIST_SET_BOOKMARK,
    KIND_LIST_SET_CURATION,
    KIND_LIST_SET_VIDEO,
    KIND_LIST_SET_PICTURE,
    KIND_LIST_SET_KIND_MUTE,
    KIND_LIST_SET_INTEREST,
    KIND_LIST_SET_EMOJI,
    KIND_LIST_SET_RELEASE_ARTIFACT,
    KIND_LIST_SET_APP_CURATION,
    KIND_LIST_SET_CALENDAR,
    KIND_LIST_SET_STARTER_PACK,
    KIND_LIST_SET_MEDIA_STARTER_PACK,
];
     75 
/// Point-in-time description of the local sync setup, as built by `inspect_sync`.
#[derive(Debug, Clone)]
struct SyncSnapshot {
    /// Overall state; `inspect_sync` sets `"unconfigured"` or `"ready"`.
    state: String,
    /// Human-readable description of where the data came from.
    source: String,
    /// Display form of the configured local root path.
    local_root: String,
    /// Replica database status; `inspect_sync` sets `"missing"` or `"ready"`.
    replica_db: String,
    /// Number of relay URLs in the runtime configuration.
    relay_count: usize,
    /// String form of the configured relay publish policy.
    publish_policy: String,
    freshness: SyncFreshnessView,
    queue: SyncQueueView,
    /// Explanation shown when the state needs one.
    reason: Option<String>,
    /// Suggested follow-up commands.
    actions: Vec<String>,
}
     89 
/// In-memory description of one sync run, passed to `record_sync_run`.
///
/// NOTE(review): the `*_json` fields presumably hold JSON-encoded relay lists —
/// confirm against the `sync_record_from_*` builders, which are not visible here.
#[derive(Debug, Clone)]
struct SyncRunRecord {
    scope: String,
    relay_set_fingerprint: String,
    target_relays_json: String,
    connected_relays_json: String,
    failed_relays_json: String,
    started_at: u64,
    completed_at: Option<u64>,
    state: String,
    fetched_count: usize,
    ingested_count: usize,
    skipped_count: usize,
    unsupported_count: usize,
    failed_count: usize,
    failure_reason: Option<String>,
}
    107 
/// Deserializable counterpart of [`SyncRunRecord`] with the same field names,
/// using signed 64-bit integers for timestamps and counters.
///
/// NOTE(review): presumably the shape of a row read back from the sync-run
/// table — confirm against the query code, which is not visible here.
#[derive(Debug, Deserialize)]
struct SyncRunRow {
    scope: String,
    relay_set_fingerprint: String,
    target_relays_json: String,
    connected_relays_json: String,
    failed_relays_json: String,
    started_at: i64,
    completed_at: Option<i64>,
    state: String,
    fetched_count: i64,
    ingested_count: i64,
    skipped_count: i64,
    unsupported_count: i64,
    failed_count: i64,
    failure_reason: Option<String>,
}
    125 
    126 pub fn status(config: &RuntimeConfig) -> Result<SyncStatusView, CliSdkAdapterError> {
    127     let session = CliSdkSession::connect(config)?;
    128     let receipt = session.block_on(session.sdk().sync().status(SyncStatusRequest::new()))?;
    129     Ok(sdk_sync_status_view(config, receipt))
    130 }
    131 
/// Runs a sync pull using the windowed relay fetcher.
///
/// Thin wrapper around `pull_with_fetcher` that supplies
/// `fetch_events_from_relays_windowed` as the fetch implementation.
pub fn pull(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> {
    pull_with_fetcher(config, fetch_events_from_relays_windowed)
}
    135 
/// Runs a market refresh using the windowed relay fetcher.
///
/// Thin wrapper around `market_refresh_with_fetcher` that supplies
/// `fetch_events_from_relays_windowed` as the fetch implementation.
pub fn market_refresh(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> {
    market_refresh_with_fetcher(config, fetch_events_from_relays_windowed)
}
    139 
    140 fn pull_with_fetcher<F>(config: &RuntimeConfig, fetcher: F) -> Result<SyncActionView, RuntimeError>
    141 where
    142     F: FnOnce(
    143         &[String],
    144         RadrootsNostrFilter,
    145     ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError>,
    146 {
    147     relay_ingest(config, RelayIngestScope::SyncPull, fetcher)
    148 }
    149 
    150 fn market_refresh_with_fetcher<F>(
    151     config: &RuntimeConfig,
    152     fetcher: F,
    153 ) -> Result<SyncActionView, RuntimeError>
    154 where
    155     F: FnOnce(
    156         &[String],
    157         RadrootsNostrFilter,
    158     ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError>,
    159 {
    160     relay_ingest(config, RelayIngestScope::MarketRefresh, fetcher)
    161 }
    162 
    163 fn fetch_events_from_relays_windowed(
    164     relay_urls: &[String],
    165     base_filter: RadrootsNostrFilter,
    166 ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError> {
    167     let mut next_filter = base_filter.clone();
    168     let mut merged: Option<DirectRelayFetchReceipt> = None;
    169 
    170     for _ in 0..RELAY_FETCH_MAX_PAGES {
    171         let receipt = fetch_events_from_relays(relay_urls, next_filter)?;
    172         let page_len = receipt.events.len();
    173         let oldest_created_at = receipt
    174             .events
    175             .iter()
    176             .map(|event| event.created_at.as_secs())
    177             .min();
    178         merge_fetch_receipt(&mut merged, receipt);
    179         if page_len < RELAY_FETCH_LIMIT {
    180             break;
    181         }
    182         let Some(oldest_created_at) = oldest_created_at else {
    183             break;
    184         };
    185         if oldest_created_at == 0 {
    186             break;
    187         }
    188         next_filter = base_filter
    189             .clone()
    190             .until(RadrootsNostrTimestamp::from(oldest_created_at - 1))
    191             .limit(RELAY_FETCH_LIMIT);
    192     }
    193 
    194     merged.ok_or(DirectRelayFetchError::MissingRelays)
    195 }
    196 
    197 fn relay_ingest<F>(
    198     config: &RuntimeConfig,
    199     scope: RelayIngestScope,
    200     fetcher: F,
    201 ) -> Result<SyncActionView, RuntimeError>
    202 where
    203     F: FnOnce(
    204         &[String],
    205         RadrootsNostrFilter,
    206     ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError>,
    207 {
    208     let snapshot = inspect_sync(config)?;
    209     if snapshot.state == "unconfigured" {
    210         return Ok(empty_action_from_snapshot(snapshot, "pull"));
    211     }
    212 
    213     if config.output.dry_run {
    214         let mut view = empty_action_from_snapshot(snapshot, "pull");
    215         view.state = "ready".to_owned();
    216         view.reason = Some("dry run requested; relay fetch skipped".to_owned());
    217         view.target_relays = config.relay.urls.clone();
    218         view.fetched_count = Some(0);
    219         view.ingested_count = Some(0);
    220         view.publishable_count = None;
    221         view.published_count = None;
    222         view.skipped_count = Some(0);
    223         view.unsupported_count = Some(0);
    224         view.failed_count = Some(0);
    225         view.reason_code = Some("dry_run".to_owned());
    226         view.actions = vec![scope.ready_action().to_owned()];
    227         return Ok(view);
    228     }
    229 
    230     let started_at = unix_now();
    231     let receipt = match fetcher(&config.relay.urls, scope.filter()) {
    232         Ok(receipt) => receipt,
    233         Err(DirectRelayFetchError::Connect {
    234             reason,
    235             target_relays,
    236             failed_relays,
    237         }) => {
    238             let failed_relays = relay_failures(failed_relays);
    239             let failure_reason = format!("direct relay connection failed: {reason}");
    240             let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
    241             migrations::run_all_up(&executor)?;
    242             record_sync_run(
    243                 &executor,
    244                 &sync_record_from_failure(
    245                     scope,
    246                     &config.relay.urls,
    247                     target_relays.clone(),
    248                     failed_relays.clone(),
    249                     started_at,
    250                     failure_reason.clone(),
    251                 )?,
    252             )?;
    253             let mut view = empty_action_from_snapshot(snapshot, "pull");
    254             view.state = "unavailable".to_owned();
    255             view.reason = Some(failure_reason);
    256             view.reason_code = Some("relay_fetch_failed".to_owned());
    257             view.target_relays = target_relays;
    258             view.failed_relays = failed_relays;
    259             view.freshness = freshness_for_scope_from_executor(config, &executor, scope)?;
    260             return Ok(view);
    261         }
    262         Err(error) => {
    263             let failure_reason = error.to_string();
    264             let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
    265             migrations::run_all_up(&executor)?;
    266             record_sync_run(
    267                 &executor,
    268                 &sync_record_from_failure(
    269                     scope,
    270                     &config.relay.urls,
    271                     config.relay.urls.clone(),
    272                     Vec::new(),
    273                     started_at,
    274                     failure_reason.clone(),
    275                 )?,
    276             )?;
    277             let mut view = empty_action_from_snapshot(snapshot, "pull");
    278             view.state = "unavailable".to_owned();
    279             view.reason = Some(failure_reason);
    280             view.reason_code = Some("relay_fetch_failed".to_owned());
    281             view.target_relays = config.relay.urls.clone();
    282             view.freshness = freshness_for_scope_from_executor(config, &executor, scope)?;
    283             return Ok(view);
    284         }
    285     };
    286 
    287     let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
    288     migrations::run_all_up(&executor)?;
    289     let ingest = ingest_events(&executor, &receipt, scope)?;
    290     record_sync_run(
    291         &executor,
    292         &sync_record_from_ingest(scope, &config.relay.urls, &receipt, &ingest, started_at)?,
    293     )?;
    294     let failed_relays = relay_failures(receipt.failed_relays);
    295     let failed_count = ingest.failed_count + failed_relays.len();
    296     let reason_code = relay_ingest_reason_code(&ingest, &failed_relays).map(str::to_owned);
    297     let reason = relay_ingest_reason(&ingest, &failed_relays);
    298     let freshness = freshness_for_scope_from_executor(config, &executor, scope)?;
    299     let queue = radroots_replica_sync_status(&executor)?;
    300 
    301     Ok(SyncActionView {
    302         direction: "pull".to_owned(),
    303         state: "ready".to_owned(),
    304         source: INGEST_SOURCE.to_owned(),
    305         local_root: config.local.root.display().to_string(),
    306         replica_db: "ready".to_owned(),
    307         relay_count: config.relay.urls.len(),
    308         publish_policy: config.relay.publish_policy.as_str().to_owned(),
    309         freshness,
    310         queue: legacy_sync_queue(queue.expected_count, queue.pending_count),
    311         target_relays: receipt.target_relays,
    312         connected_relays: receipt.connected_relays,
    313         acknowledged_relays: Vec::new(),
    314         failed_relays,
    315         fetched_count: Some(ingest.fetched_count),
    316         ingested_count: Some(ingest.ingested_count),
    317         publishable_count: None,
    318         published_count: None,
    319         skipped_count: Some(ingest.skipped_count),
    320         unsupported_count: Some(ingest.unsupported_count),
    321         failed_count: Some(failed_count),
    322         publish_plan: None,
    323         reason_code,
    324         reason,
    325         actions: vec![scope.ready_action().to_owned()],
    326     })
    327 }
    328 
    329 pub fn push(config: &RuntimeConfig) -> Result<SyncActionView, CliSdkAdapterError> {
    330     let session = CliSdkSession::connect(config)?;
    331     if config.output.dry_run {
    332         let status = session.block_on(session.sdk().sync().status(SyncStatusRequest::new()))?;
    333         return Ok(sdk_push_dry_run_view(config, status));
    334     }
    335 
    336     let receipt = session.block_on(session.sdk().sync().push_outbox(
    337         PushOutboxRequest::new().with_relay_url_policy(sdk_relay_url_policy(config)),
    338     ))?;
    339     let status = session.block_on(session.sdk().sync().status(SyncStatusRequest::new()))?;
    340     Ok(sdk_push_view(config, receipt, status))
    341 }
    342 
    343 pub fn watch(config: &RuntimeConfig, args: &SyncWatchArgs) -> Result<SyncWatchView, RuntimeError> {
    344     if args.frames == 0 {
    345         return Err(RuntimeError::Config(
    346             "`sync watch --frames` must be greater than 0".to_owned(),
    347         ));
    348     }
    349 
    350     let mut frames = Vec::with_capacity(args.frames);
    351     let mut last_snapshot = None;
    352 
    353     for index in 0..args.frames {
    354         let snapshot = inspect_sync(config)?;
    355         frames.push(SyncWatchFrameView {
    356             sequence: index + 1,
    357             observed_at: unix_now(),
    358             state: snapshot.state.clone(),
    359             relay_count: snapshot.relay_count,
    360             freshness: snapshot.freshness.clone(),
    361             queue: snapshot.queue.clone(),
    362         });
    363         last_snapshot = Some(snapshot);
    364 
    365         if index + 1 < args.frames {
    366             thread::sleep(Duration::from_millis(args.interval_ms));
    367         }
    368     }
    369 
    370     let snapshot = last_snapshot.expect("watch frames are non-empty");
    371     Ok(SyncWatchView {
    372         state: snapshot.state,
    373         source: snapshot.source,
    374         interval_ms: args.interval_ms,
    375         frames,
    376         reason: snapshot.reason,
    377         actions: snapshot.actions,
    378     })
    379 }
    380 
    381 fn empty_action_from_snapshot(snapshot: SyncSnapshot, direction: &str) -> SyncActionView {
    382     SyncActionView {
    383         direction: direction.to_owned(),
    384         state: snapshot.state,
    385         source: snapshot.source,
    386         local_root: snapshot.local_root,
    387         replica_db: snapshot.replica_db,
    388         relay_count: snapshot.relay_count,
    389         publish_policy: snapshot.publish_policy,
    390         freshness: snapshot.freshness,
    391         queue: snapshot.queue,
    392         target_relays: Vec::new(),
    393         connected_relays: Vec::new(),
    394         acknowledged_relays: Vec::new(),
    395         failed_relays: Vec::new(),
    396         fetched_count: None,
    397         ingested_count: None,
    398         publishable_count: None,
    399         published_count: None,
    400         skipped_count: None,
    401         unsupported_count: None,
    402         failed_count: None,
    403         publish_plan: None,
    404         reason_code: None,
    405         reason: snapshot.reason,
    406         actions: snapshot.actions,
    407     }
    408 }
    409 
    410 fn sdk_sync_status_view(config: &RuntimeConfig, receipt: SyncStatusReceipt) -> SyncStatusView {
    411     let actions = sdk_sync_status_actions(&receipt);
    412     let relay_count = receipt.relay_targets.configured_count;
    413     SyncStatusView {
    414         state: "ready".to_owned(),
    415         source: SDK_SYNC_SOURCE.to_owned(),
    416         local_root: config.local.root.display().to_string(),
    417         replica_db: "legacy_derived_not_checked".to_owned(),
    418         relay_count,
    419         publish_policy: config.relay.publish_policy.as_str().to_owned(),
    420         freshness: sdk_sync_freshness(&receipt),
    421         queue: sdk_sync_queue(&receipt),
    422         reason: None,
    423         actions,
    424     }
    425 }
    426 
    427 fn sdk_push_dry_run_view(config: &RuntimeConfig, status: SyncStatusReceipt) -> SyncActionView {
    428     let publishable_count = usize_from_i64(status.outbox.ready_signed_events);
    429     let state = if publishable_count > 0 {
    430         "dry_run"
    431     } else {
    432         "ready"
    433     };
    434     let reason = if publishable_count > 0 {
    435         Some("dry run requested; SDK outbox push skipped".to_owned())
    436     } else if status.outbox.total_events > 0 {
    437         Some("SDK outbox has no ready signed events to push".to_owned())
    438     } else {
    439         None
    440     };
    441     sdk_push_action_view(
    442         config,
    443         state,
    444         sdk_sync_queue(&status),
    445         sdk_sync_freshness(&status),
    446         status.relay_targets.configured_relays,
    447         Vec::new(),
    448         Vec::new(),
    449         Vec::new(),
    450         publishable_count,
    451         0,
    452         0,
    453         Some(0),
    454         reason,
    455         sdk_sync_push_actions(state, publishable_count > 0),
    456     )
    457 }
    458 
    459 fn sdk_push_view(
    460     config: &RuntimeConfig,
    461     receipt: PushOutboxReceipt,
    462     status: SyncStatusReceipt,
    463 ) -> SyncActionView {
    464     let failed_count = receipt.retryable_events + receipt.terminal_events;
    465     let state = if receipt.attempted_events == 0 {
    466         "ready"
    467     } else if receipt.published_events > 0 && failed_count > 0 {
    468         "partial"
    469     } else if failed_count > 0 {
    470         "unavailable"
    471     } else if receipt.published_events > 0 {
    472         "published"
    473     } else {
    474         "ready"
    475     };
    476     let reason = sdk_push_reason(&receipt, failed_count);
    477     sdk_push_action_view(
    478         config,
    479         state,
    480         sdk_sync_queue(&status),
    481         sdk_sync_freshness(&status),
    482         sdk_push_target_relays(&receipt, &status),
    483         sdk_push_connected_relays(&receipt),
    484         sdk_push_acknowledged_relays(&receipt),
    485         sdk_push_failed_relays(&receipt),
    486         receipt.attempted_events,
    487         receipt.published_events,
    488         failed_count,
    489         Some(0),
    490         reason,
    491         sdk_sync_push_actions(state, failed_count > 0),
    492     )
    493 }
    494 
    495 fn sdk_push_action_view(
    496     config: &RuntimeConfig,
    497     state: &str,
    498     queue: SyncQueueView,
    499     freshness: SyncFreshnessView,
    500     target_relays: Vec<String>,
    501     connected_relays: Vec<String>,
    502     acknowledged_relays: Vec<String>,
    503     failed_relays: Vec<RelayFailureView>,
    504     publishable_count: usize,
    505     published_count: usize,
    506     failed_count: usize,
    507     skipped_count: Option<usize>,
    508     reason: Option<String>,
    509     actions: Vec<String>,
    510 ) -> SyncActionView {
    511     SyncActionView {
    512         direction: "push".to_owned(),
    513         state: state.to_owned(),
    514         source: SDK_PUSH_SOURCE.to_owned(),
    515         local_root: config.local.root.display().to_string(),
    516         replica_db: "legacy_derived_not_checked".to_owned(),
    517         relay_count: config.relay.urls.len(),
    518         publish_policy: config.relay.publish_policy.as_str().to_owned(),
    519         freshness,
    520         queue,
    521         target_relays,
    522         connected_relays,
    523         acknowledged_relays,
    524         failed_relays,
    525         fetched_count: None,
    526         ingested_count: None,
    527         publishable_count: Some(publishable_count),
    528         published_count: Some(published_count),
    529         skipped_count,
    530         unsupported_count: Some(0),
    531         failed_count: Some(failed_count),
    532         publish_plan: None,
    533         reason_code: sdk_sync_push_reason_code(state).map(str::to_owned),
    534         reason,
    535         actions,
    536     }
    537 }
    538 
    539 fn sdk_sync_status_actions(receipt: &SyncStatusReceipt) -> Vec<String> {
    540     let mut actions = Vec::new();
    541     if receipt.outbox.ready_signed_events > 0 {
    542         actions.push(SYNC_PUSH_ACTION.to_owned());
    543     }
    544     if receipt.event_store.total_events == 0 {
    545         actions.push(SYNC_PULL_ACTION.to_owned());
    546     }
    547     actions
    548 }
    549 
    550 fn sdk_sync_push_actions(state: &str, retryable: bool) -> Vec<String> {
    551     match state {
    552         "published" | "ready" => vec!["radroots sync status get".to_owned()],
    553         "dry_run" | "partial" | "unavailable" if retryable => {
    554             vec![
    555                 SYNC_PUSH_ACTION.to_owned(),
    556                 "radroots sync status get".to_owned(),
    557             ]
    558         }
    559         _ => vec!["radroots sync status get".to_owned()],
    560     }
    561 }
    562 
    563 fn sdk_sync_push_reason_code(state: &str) -> Option<&'static str> {
    564     match state {
    565         "dry_run" => Some("dry_run"),
    566         "partial" => Some("sdk_outbox_push_partial"),
    567         "unavailable" => Some("sdk_outbox_push_failed"),
    568         _ => None,
    569     }
    570 }
    571 
    572 fn sdk_push_reason(receipt: &PushOutboxReceipt, failed_count: usize) -> Option<String> {
    573     if receipt.attempted_events == 0 {
    574         return Some("SDK outbox had no ready signed events to push".to_owned());
    575     }
    576     if failed_count > 0 && receipt.published_events > 0 {
    577         return Some(format!(
    578             "SDK outbox push published {} event(s); {failed_count} event(s) remain retryable or terminal",
    579             receipt.published_events
    580         ));
    581     }
    582     if failed_count > 0 {
    583         return Some(
    584             "SDK outbox push did not reach accepted quorum for any ready event".to_owned(),
    585         );
    586     }
    587     None
    588 }
    589 
    590 fn sdk_sync_queue(receipt: &SyncStatusReceipt) -> SyncQueueView {
    591     let pending_count = usize_from_i64(
    592         receipt
    593             .outbox
    594             .pending_events
    595             .saturating_add(receipt.outbox.retryable_events),
    596     );
    597     SyncQueueView {
    598         expected_count: usize_from_i64(receipt.outbox.total_events),
    599         pending_count,
    600         total_count: Some(usize_from_i64(receipt.outbox.total_events)),
    601         retryable_count: Some(usize_from_i64(receipt.outbox.retryable_events)),
    602         terminal_count: Some(usize_from_i64(receipt.outbox.terminal_events)),
    603         failed_terminal_count: Some(usize_from_i64(receipt.outbox.failed_terminal_events)),
    604         ready_signed_count: Some(usize_from_i64(receipt.outbox.ready_signed_events)),
    605         publishing_count: Some(usize_from_i64(receipt.outbox.publishing_events)),
    606         last_attempt_at_ms: receipt.outbox.last_attempt_at_ms,
    607         last_error: receipt.outbox.last_error.clone(),
    608     }
    609 }
    610 
/// Builds a queue view that carries only the expected and pending counts;
/// every other field is left as `None`.
fn legacy_sync_queue(expected_count: usize, pending_count: usize) -> SyncQueueView {
    SyncQueueView {
        expected_count,
        pending_count,
        total_count: None,
        retryable_count: None,
        terminal_count: None,
        failed_terminal_count: None,
        ready_signed_count: None,
        publishing_count: None,
        last_attempt_at_ms: None,
        last_error: None,
    }
}
    625 
    626 fn sdk_sync_freshness(receipt: &SyncStatusReceipt) -> SyncFreshnessView {
    627     let Some(last_event_updated_at_ms) = receipt.event_store.last_event_updated_at_ms else {
    628         return missing_freshness();
    629     };
    630     let last_event_at = u64::try_from(last_event_updated_at_ms / 1_000).unwrap_or(0);
    631     let observed_at = u64::try_from(receipt.observed_at_ms / 1_000).unwrap_or_else(|_| unix_now());
    632     let age_seconds = observed_at.saturating_sub(last_event_at);
    633     SyncFreshnessView {
    634         state: "synced".to_owned(),
    635         display: format!("SDK event store updated {}", relative_age(age_seconds)),
    636         age_seconds: Some(age_seconds),
    637         last_event_at: Some(last_event_at),
    638         run: None,
    639     }
    640 }
    641 
    642 fn sdk_push_target_relays(receipt: &PushOutboxReceipt, status: &SyncStatusReceipt) -> Vec<String> {
    643     let mut relays = Vec::new();
    644     for relay in receipt.events.iter().flat_map(|event| event.relays.iter()) {
    645         if !relays.contains(&relay.relay_url) {
    646             relays.push(relay.relay_url.clone());
    647         }
    648     }
    649     if relays.is_empty() {
    650         relays.extend(status.relay_targets.configured_relays.clone());
    651     }
    652     relays
    653 }
    654 
/// Lists the distinct relay URLs whose receipt entry has `attempted` set.
fn sdk_push_connected_relays(receipt: &PushOutboxReceipt) -> Vec<String> {
    sdk_push_relays_matching(receipt, |_, relay| relay.attempted)
}
    658 
/// Lists the distinct relay URLs whose outcome counts as accepted according to
/// `sdk_relay_accepted`.
fn sdk_push_acknowledged_relays(receipt: &PushOutboxReceipt) -> Vec<String> {
    sdk_push_relays_matching(receipt, |_, relay| sdk_relay_accepted(relay.outcome_kind))
}
    662 
    663 fn sdk_push_relays_matching(
    664     receipt: &PushOutboxReceipt,
    665     predicate: impl Fn(&PushOutboxEventReceipt, &radroots_sdk::PushOutboxRelayReceipt) -> bool,
    666 ) -> Vec<String> {
    667     let mut relays = Vec::new();
    668     for event in &receipt.events {
    669         for relay in &event.relays {
    670             if predicate(event, relay) && !relays.contains(&relay.relay_url) {
    671                 relays.push(relay.relay_url.clone());
    672             }
    673         }
    674     }
    675     relays
    676 }
    677 
    678 fn sdk_push_failed_relays(receipt: &PushOutboxReceipt) -> Vec<RelayFailureView> {
    679     receipt
    680         .events
    681         .iter()
    682         .flat_map(|event| event.relays.iter())
    683         .filter(|relay| !sdk_relay_accepted(relay.outcome_kind))
    684         .map(|relay| RelayFailureView {
    685             relay: relay.relay_url.clone(),
    686             reason: relay
    687                 .message
    688                 .clone()
    689                 .unwrap_or_else(|| sdk_relay_outcome_kind(relay.outcome_kind).to_owned()),
    690         })
    691         .collect()
    692 }
    693 
    694 fn sdk_relay_accepted(kind: PushOutboxRelayOutcomeKind) -> bool {
    695     matches!(
    696         kind,
    697         PushOutboxRelayOutcomeKind::Accepted | PushOutboxRelayOutcomeKind::DuplicateAccepted
    698     )
    699 }
    700 
    701 fn sdk_relay_outcome_kind(kind: PushOutboxRelayOutcomeKind) -> &'static str {
    702     match kind {
    703         PushOutboxRelayOutcomeKind::Accepted => "accepted",
    704         PushOutboxRelayOutcomeKind::DuplicateAccepted => "duplicate_accepted",
    705         PushOutboxRelayOutcomeKind::Blocked => "blocked",
    706         PushOutboxRelayOutcomeKind::RateLimited => "rate_limited",
    707         PushOutboxRelayOutcomeKind::Invalid => "invalid",
    708         PushOutboxRelayOutcomeKind::PowRequired => "pow_required",
    709         PushOutboxRelayOutcomeKind::Restricted => "restricted",
    710         PushOutboxRelayOutcomeKind::AuthRequired => "auth_required",
    711         PushOutboxRelayOutcomeKind::Error => "error",
    712         PushOutboxRelayOutcomeKind::Timeout => "timeout",
    713         PushOutboxRelayOutcomeKind::ConnectionFailed => "connection_failed",
    714         PushOutboxRelayOutcomeKind::Unknown => "unknown",
    715         _ => "unknown",
    716     }
    717 }
    718 
    719 fn usize_from_i64(value: i64) -> usize {
    720     usize::try_from(value.max(0)).unwrap_or(usize::MAX)
    721 }
    722 
    723 fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> {
    724     if !config.local.replica_db_path.exists() {
    725         return Ok(SyncSnapshot {
    726             state: "unconfigured".to_owned(),
    727             source: SYNC_SOURCE.to_owned(),
    728             local_root: config.local.root.display().to_string(),
    729             replica_db: "missing".to_owned(),
    730             relay_count: config.relay.urls.len(),
    731             publish_policy: config.relay.publish_policy.as_str().to_owned(),
    732             freshness: missing_freshness(),
    733             queue: legacy_sync_queue(0, 0),
    734             reason: Some("local replica database is not initialized".to_owned()),
    735             actions: vec!["radroots store init".to_owned()],
    736         });
    737     }
    738 
    739     let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
    740     migrations::run_all_up(&executor)?;
    741     let queue = radroots_replica_sync_status(&executor)?;
    742     let freshness =
    743         freshness_for_scope_from_executor(config, &executor, RelayIngestScope::SyncPull)?;
    744     let relay_count = config.relay.urls.len();
    745     let publish_policy = config.relay.publish_policy.as_str().to_owned();
    746     let mut actions = Vec::new();
    747 
    748     if relay_count == 0 {
    749         actions.push(RELAY_PULL_SETUP_ACTION.to_owned());
    750         return Ok(SyncSnapshot {
    751             state: "unconfigured".to_owned(),
    752             source: SYNC_SOURCE.to_owned(),
    753             local_root: config.local.root.display().to_string(),
    754             replica_db: "ready".to_owned(),
    755             relay_count,
    756             publish_policy,
    757             freshness,
    758             queue: legacy_sync_queue(queue.expected_count, queue.pending_count),
    759             reason: Some("no relays are configured for this operator session".to_owned()),
    760             actions,
    761         });
    762     }
    763 
    764     actions.push(SYNC_PULL_ACTION.to_owned());
    765     if queue.pending_count > 0 {
    766         actions.push(SYNC_PUSH_ACTION.to_owned());
    767     }
    768 
    769     Ok(SyncSnapshot {
    770         state: "ready".to_owned(),
    771         source: SYNC_SOURCE.to_owned(),
    772         local_root: config.local.root.display().to_string(),
    773         replica_db: "ready".to_owned(),
    774         relay_count,
    775         publish_policy,
    776         freshness,
    777         queue: legacy_sync_queue(queue.expected_count, queue.pending_count),
    778         reason: None,
    779         actions,
    780     })
    781 }
    782 
    783 pub(crate) fn missing_freshness() -> SyncFreshnessView {
    784     SyncFreshnessView {
    785         state: "never".to_owned(),
    786         display: "never synced".to_owned(),
    787         age_seconds: None,
    788         last_event_at: None,
    789         run: None,
    790     }
    791 }
    792 
    793 pub(crate) fn freshness_for_scope(
    794     config: &RuntimeConfig,
    795     scope: RelayIngestScope,
    796 ) -> Result<SyncFreshnessView, RuntimeError> {
    797     let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
    798     migrations::run_all_up(&executor)?;
    799     freshness_for_scope_from_executor(config, &executor, scope)
    800 }
    801 
    802 pub(crate) fn relay_provenance_relays_for_scope(
    803     config: &RuntimeConfig,
    804     scope: RelayIngestScope,
    805 ) -> Result<Vec<String>, RuntimeError> {
    806     if !config.local.replica_db_path.exists() {
    807         return Ok(Vec::new());
    808     }
    809     let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
    810     migrations::run_all_up(&executor)?;
    811     ensure_sync_run_table(&executor)?;
    812     let current_fingerprint = relay_set_fingerprint(&config.relay.urls);
    813     let Some(run) = latest_sync_run(&executor, scope)? else {
    814         return Ok(Vec::new());
    815     };
    816     if run.relay_set_fingerprint != current_fingerprint || !sync_run_successful(&run) {
    817         return Ok(Vec::new());
    818     }
    819     let mut relays: Vec<String> = serde_json::from_str(run.connected_relays_json.as_str())?;
    820     relays.sort();
    821     relays.dedup();
    822     Ok(relays)
    823 }
    824 
    825 pub(crate) fn freshness_for_scope_from_executor(
    826     config: &RuntimeConfig,
    827     executor: &SqliteExecutor,
    828     scope: RelayIngestScope,
    829 ) -> Result<SyncFreshnessView, RuntimeError> {
    830     let last_event_at = ReplicaSql::new(executor).nostr_event_last_created_at()?;
    831     let now = unix_now();
    832     let age_seconds = last_event_at.map(|last_event_at| now.saturating_sub(last_event_at));
    833     ensure_sync_run_table(executor)?;
    834     let current_fingerprint = relay_set_fingerprint(&config.relay.urls);
    835     let latest = latest_sync_run(executor, scope)?;
    836     let current = latest
    837         .as_ref()
    838         .filter(|run| run.relay_set_fingerprint == current_fingerprint);
    839     let last_success = current.filter(|run| sync_run_successful(run));
    840     let state = freshness_state(scope, latest.as_ref(), current, last_success, age_seconds);
    841     let display = freshness_display(scope, state.as_str(), age_seconds, current);
    842 
    843     Ok(SyncFreshnessView {
    844         state,
    845         display,
    846         age_seconds,
    847         last_event_at,
    848         run: latest.map(|run| sync_run_freshness_view(scope, run, current_fingerprint)),
    849     })
    850 }
    851 
    852 pub(crate) fn freshness_requires_refresh(freshness: &SyncFreshnessView) -> bool {
    853     matches!(
    854         freshness.state.as_str(),
    855         "never" | "stale" | "relay_set_changed" | "refresh_failed"
    856     )
    857 }
    858 
    859 fn freshness_state(
    860     scope: RelayIngestScope,
    861     latest: Option<&SyncRunRecord>,
    862     current: Option<&SyncRunRecord>,
    863     last_success: Option<&SyncRunRecord>,
    864     age_seconds: Option<u64>,
    865 ) -> String {
    866     let Some(latest) = latest else {
    867         return "never".to_owned();
    868     };
    869     let Some(current) = current else {
    870         return "relay_set_changed".to_owned();
    871     };
    872     if !sync_run_successful(current) {
    873         return "refresh_failed".to_owned();
    874     }
    875     if last_success.is_none() {
    876         return "refresh_failed".to_owned();
    877     }
    878     if age_seconds.is_none() {
    879         return "fresh".to_owned();
    880     }
    881     if age_seconds.unwrap_or_default() > scope.stale_after_seconds() {
    882         return "stale".to_owned();
    883     }
    884     if latest.state == "partial" {
    885         return "partial".to_owned();
    886     }
    887     "fresh".to_owned()
    888 }
    889 
    890 fn freshness_display(
    891     scope: RelayIngestScope,
    892     state: &str,
    893     age_seconds: Option<u64>,
    894     run: Option<&SyncRunRecord>,
    895 ) -> String {
    896     match state {
    897         "fresh" => match age_seconds {
    898             Some(age_seconds) => format!("{} fresh {}", scope.display(), relative_age(age_seconds)),
    899             None => format!("{} fresh; no market events yet", scope.display()),
    900         },
    901         "partial" => match age_seconds {
    902             Some(age_seconds) => format!(
    903                 "{} partially refreshed {}",
    904                 scope.display(),
    905                 relative_age(age_seconds)
    906             ),
    907             None => format!(
    908                 "{} partially refreshed; no market events yet",
    909                 scope.display()
    910             ),
    911         },
    912         "stale" => match age_seconds {
    913             Some(age_seconds) => format!("{} stale {}", scope.display(), relative_age(age_seconds)),
    914             None => format!("{} stale", scope.display()),
    915         },
    916         "relay_set_changed" => format!("{} relay set changed; refresh required", scope.display()),
    917         "refresh_failed" => run
    918             .and_then(|run| run.failure_reason.clone())
    919             .unwrap_or_else(|| format!("{} refresh failed", scope.display())),
    920         _ => format!("{} never synced", scope.display()),
    921     }
    922 }
    923 
    924 fn sync_run_successful(run: &SyncRunRecord) -> bool {
    925     matches!(run.state.as_str(), "success" | "partial")
    926 }
    927 
    928 fn sync_run_freshness_view(
    929     scope: RelayIngestScope,
    930     run: SyncRunRecord,
    931     current_fingerprint: String,
    932 ) -> SyncRunFreshnessView {
    933     let relay_set_current = run.relay_set_fingerprint == current_fingerprint;
    934     let successful = sync_run_successful(&run);
    935     let last_successful_at = successful.then_some(run.completed_at.unwrap_or(run.started_at));
    936     SyncRunFreshnessView {
    937         scope: run.scope,
    938         relay_set_fingerprint: run.relay_set_fingerprint,
    939         relay_set_current,
    940         last_state: run.state,
    941         last_attempted_at: Some(run.started_at),
    942         last_successful_at,
    943         last_completed_at: run.completed_at,
    944         stale_after_seconds: Some(scope.stale_after_seconds()),
    945         fetched_count: Some(run.fetched_count),
    946         ingested_count: Some(run.ingested_count),
    947         skipped_count: Some(run.skipped_count),
    948         unsupported_count: Some(run.unsupported_count),
    949         failed_count: Some(run.failed_count),
    950         failure_reason: run.failure_reason,
    951     }
    952 }
    953 
    954 pub(crate) fn ensure_sync_run_table(executor: &SqliteExecutor) -> Result<(), RuntimeError> {
    955     executor.exec(
    956         "CREATE TABLE IF NOT EXISTS radroots_cli_sync_run (
    957             id INTEGER PRIMARY KEY AUTOINCREMENT,
    958             scope TEXT NOT NULL,
    959             relay_set_fingerprint TEXT NOT NULL,
    960             target_relays_json TEXT NOT NULL,
    961             connected_relays_json TEXT NOT NULL,
    962             failed_relays_json TEXT NOT NULL,
    963             started_at INTEGER NOT NULL,
    964             completed_at INTEGER,
    965             state TEXT NOT NULL,
    966             fetched_count INTEGER NOT NULL,
    967             ingested_count INTEGER NOT NULL,
    968             skipped_count INTEGER NOT NULL,
    969             unsupported_count INTEGER NOT NULL,
    970             failed_count INTEGER NOT NULL,
    971             failure_reason TEXT
    972         );
    973         CREATE INDEX IF NOT EXISTS idx_radroots_cli_sync_run_scope_started
    974             ON radroots_cli_sync_run(scope, started_at DESC);",
    975         "[]",
    976     )?;
    977     Ok(())
    978 }
    979 
    980 fn latest_sync_run(
    981     executor: &SqliteExecutor,
    982     scope: RelayIngestScope,
    983 ) -> Result<Option<SyncRunRecord>, RuntimeError> {
    984     let rows = executor.query_raw(
    985         &format!(
    986             "SELECT scope,
    987                     relay_set_fingerprint,
    988                     target_relays_json,
    989                     connected_relays_json,
    990                     failed_relays_json,
    991                     started_at,
    992                     completed_at,
    993                     state,
    994                     fetched_count,
    995                     ingested_count,
    996                     skipped_count,
    997                     unsupported_count,
    998                     failed_count,
    999                     failure_reason
   1000              FROM {SYNC_RUN_TABLE}
   1001              WHERE scope = ?1
   1002              ORDER BY started_at DESC, id DESC
   1003              LIMIT 1"
   1004         ),
   1005         json!([scope.id()]).to_string().as_str(),
   1006     )?;
   1007     let mut rows: Vec<SyncRunRow> = serde_json::from_str(rows.as_str())?;
   1008     Ok(rows.pop().map(sync_run_record_from_row))
   1009 }
   1010 
   1011 fn sync_run_record_from_row(row: SyncRunRow) -> SyncRunRecord {
   1012     SyncRunRecord {
   1013         scope: row.scope,
   1014         relay_set_fingerprint: row.relay_set_fingerprint,
   1015         target_relays_json: row.target_relays_json,
   1016         connected_relays_json: row.connected_relays_json,
   1017         failed_relays_json: row.failed_relays_json,
   1018         started_at: u64_from_db(row.started_at),
   1019         completed_at: row.completed_at.map(u64_from_db),
   1020         state: row.state,
   1021         fetched_count: usize_from_db(row.fetched_count),
   1022         ingested_count: usize_from_db(row.ingested_count),
   1023         skipped_count: usize_from_db(row.skipped_count),
   1024         unsupported_count: usize_from_db(row.unsupported_count),
   1025         failed_count: usize_from_db(row.failed_count),
   1026         failure_reason: row.failure_reason,
   1027     }
   1028 }
   1029 
   1030 fn record_sync_run(executor: &SqliteExecutor, record: &SyncRunRecord) -> Result<(), RuntimeError> {
   1031     ensure_sync_run_table(executor)?;
   1032     executor.exec(
   1033         &format!(
   1034             "INSERT INTO {SYNC_RUN_TABLE} (
   1035                 scope,
   1036                 relay_set_fingerprint,
   1037                 target_relays_json,
   1038                 connected_relays_json,
   1039                 failed_relays_json,
   1040                 started_at,
   1041                 completed_at,
   1042                 state,
   1043                 fetched_count,
   1044                 ingested_count,
   1045                 skipped_count,
   1046                 unsupported_count,
   1047                 failed_count,
   1048                 failure_reason
   1049             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"
   1050         ),
   1051         json!([
   1052             record.scope.as_str(),
   1053             record.relay_set_fingerprint.as_str(),
   1054             record.target_relays_json.as_str(),
   1055             record.connected_relays_json.as_str(),
   1056             record.failed_relays_json.as_str(),
   1057             i64_from_u64(record.started_at),
   1058             record.completed_at.map(i64_from_u64),
   1059             record.state.as_str(),
   1060             i64_from_usize(record.fetched_count),
   1061             i64_from_usize(record.ingested_count),
   1062             i64_from_usize(record.skipped_count),
   1063             i64_from_usize(record.unsupported_count),
   1064             i64_from_usize(record.failed_count),
   1065             record.failure_reason.as_deref(),
   1066         ])
   1067         .to_string()
   1068         .as_str(),
   1069     )?;
   1070     Ok(())
   1071 }
   1072 
   1073 fn sync_record_from_failure(
   1074     scope: RelayIngestScope,
   1075     relays: &[String],
   1076     target_relays: Vec<String>,
   1077     failed_relays: Vec<RelayFailureView>,
   1078     started_at: u64,
   1079     reason: String,
   1080 ) -> Result<SyncRunRecord, RuntimeError> {
   1081     Ok(SyncRunRecord {
   1082         scope: scope.id().to_owned(),
   1083         relay_set_fingerprint: relay_set_fingerprint(relays),
   1084         target_relays_json: serde_json::to_string(&target_relays)?,
   1085         connected_relays_json: serde_json::to_string(&Vec::<String>::new())?,
   1086         failed_relays_json: serde_json::to_string(&failed_relays)?,
   1087         started_at,
   1088         completed_at: Some(unix_now()),
   1089         state: "failed".to_owned(),
   1090         fetched_count: 0,
   1091         ingested_count: 0,
   1092         skipped_count: 0,
   1093         unsupported_count: 0,
   1094         failed_count: 1,
   1095         failure_reason: Some(reason),
   1096     })
   1097 }
   1098 
   1099 fn sync_record_from_ingest(
   1100     scope: RelayIngestScope,
   1101     relays: &[String],
   1102     receipt: &DirectRelayFetchReceipt,
   1103     ingest: &RelayIngestCounts,
   1104     started_at: u64,
   1105 ) -> Result<SyncRunRecord, RuntimeError> {
   1106     let failed_relays = relay_failures(receipt.failed_relays.clone());
   1107     let state = if ingest.failed_count > 0 || !failed_relays.is_empty() {
   1108         "partial"
   1109     } else {
   1110         "success"
   1111     };
   1112     Ok(SyncRunRecord {
   1113         scope: scope.id().to_owned(),
   1114         relay_set_fingerprint: relay_set_fingerprint(relays),
   1115         target_relays_json: serde_json::to_string(&receipt.target_relays)?,
   1116         connected_relays_json: serde_json::to_string(&receipt.connected_relays)?,
   1117         failed_relays_json: serde_json::to_string(&failed_relays)?,
   1118         started_at,
   1119         completed_at: Some(unix_now()),
   1120         state: state.to_owned(),
   1121         fetched_count: ingest.fetched_count,
   1122         ingested_count: ingest.ingested_count,
   1123         skipped_count: ingest.skipped_count,
   1124         unsupported_count: ingest.unsupported_count,
   1125         failed_count: ingest.failed_count + failed_relays.len(),
   1126         failure_reason: ingest.reason(),
   1127     })
   1128 }
   1129 
   1130 fn relay_set_fingerprint(relays: &[String]) -> String {
   1131     let mut normalized = relays
   1132         .iter()
   1133         .map(|relay| relay.trim().to_ascii_lowercase())
   1134         .filter(|relay| !relay.is_empty())
   1135         .collect::<Vec<_>>();
   1136     normalized.sort();
   1137     normalized.dedup();
   1138     let mut hash = 0xcbf29ce484222325_u64;
   1139     for relay in normalized {
   1140         for byte in relay.as_bytes() {
   1141             hash ^= u64::from(*byte);
   1142             hash = hash.wrapping_mul(0x100000001b3);
   1143         }
   1144         hash ^= 0xff;
   1145         hash = hash.wrapping_mul(0x100000001b3);
   1146     }
   1147     format!("relayset_{hash:016x}")
   1148 }
   1149 
   1150 fn u64_from_db(value: i64) -> u64 {
   1151     u64::try_from(value).unwrap_or_default()
   1152 }
   1153 
   1154 fn usize_from_db(value: i64) -> usize {
   1155     usize::try_from(value).unwrap_or_default()
   1156 }
   1157 
   1158 fn i64_from_u64(value: u64) -> i64 {
   1159     i64::try_from(value).unwrap_or(i64::MAX)
   1160 }
   1161 
   1162 fn i64_from_usize(value: usize) -> i64 {
   1163     i64::try_from(value).unwrap_or(i64::MAX)
   1164 }
   1165 
   1166 #[derive(Debug, Clone, Default)]
   1167 struct RelayIngestCounts {
   1168     fetched_count: usize,
   1169     ingested_count: usize,
   1170     skipped_count: usize,
   1171     unsupported_count: usize,
   1172     failed_count: usize,
   1173     first_failure_reason: Option<String>,
   1174 }
   1175 
   1176 impl RelayIngestCounts {
   1177     fn reason_code(&self) -> Option<&'static str> {
   1178         if self.failed_count > 0 {
   1179             Some("sync_ingest_failed")
   1180         } else if self.skipped_count > 0 {
   1181             Some("sync_no_overwrite")
   1182         } else {
   1183             None
   1184         }
   1185     }
   1186 
   1187     fn reason(&self) -> Option<String> {
   1188         if self.failed_count > 0 {
   1189             return Some(match &self.first_failure_reason {
   1190                 Some(reason) => format!(
   1191                     "{} fetched event(s) failed ingest: {}",
   1192                     self.failed_count, reason
   1193                 ),
   1194                 None => format!("{} fetched event(s) failed ingest", self.failed_count),
   1195             });
   1196         }
   1197         if self.skipped_count > 0 {
   1198             return Some(format!(
   1199                 "{} fetched event(s) skipped because the local replica already has current or newer state",
   1200                 self.skipped_count
   1201             ));
   1202         }
   1203         None
   1204     }
   1205 }
   1206 
   1207 fn relay_ingest_reason_code(
   1208     ingest: &RelayIngestCounts,
   1209     failed_relays: &[RelayFailureView],
   1210 ) -> Option<&'static str> {
   1211     ingest
   1212         .reason_code()
   1213         .or_else(|| (!failed_relays.is_empty()).then_some("relay_fetch_partial"))
   1214 }
   1215 
   1216 fn relay_ingest_reason(
   1217     ingest: &RelayIngestCounts,
   1218     failed_relays: &[RelayFailureView],
   1219 ) -> Option<String> {
   1220     let mut parts = Vec::new();
   1221     if let Some(reason) = ingest.reason() {
   1222         parts.push(reason);
   1223     }
   1224     if !failed_relays.is_empty() {
   1225         parts.push(format!(
   1226             "{} relay(s) failed during fetch: {}",
   1227             failed_relays.len(),
   1228             relay_failure_reason(failed_relays)
   1229         ));
   1230     }
   1231 
   1232     if parts.is_empty() {
   1233         None
   1234     } else {
   1235         Some(parts.join("; "))
   1236     }
   1237 }
   1238 
   1239 fn relay_failure_reason(failed_relays: &[RelayFailureView]) -> String {
   1240     failed_relays
   1241         .iter()
   1242         .map(|failure| format!("{}: {}", failure.relay, failure.reason))
   1243         .collect::<Vec<_>>()
   1244         .join("; ")
   1245 }
   1246 
   1247 #[derive(Debug, Clone, Copy)]
   1248 pub(crate) enum RelayIngestScope {
   1249     SyncPull,
   1250     MarketRefresh,
   1251 }
   1252 
   1253 impl RelayIngestScope {
   1254     fn id(self) -> &'static str {
   1255         match self {
   1256             Self::SyncPull => "sync_pull",
   1257             Self::MarketRefresh => "market_refresh",
   1258         }
   1259     }
   1260 
   1261     fn display(self) -> &'static str {
   1262         match self {
   1263             Self::SyncPull => "sync pull",
   1264             Self::MarketRefresh => "market refresh",
   1265         }
   1266     }
   1267 
   1268     fn stale_after_seconds(self) -> u64 {
   1269         match self {
   1270             Self::SyncPull => SYNC_PULL_FRESHNESS_STALE_AFTER_SECONDS,
   1271             Self::MarketRefresh => MARKET_FRESHNESS_STALE_AFTER_SECONDS,
   1272         }
   1273     }
   1274 
   1275     fn kinds(self) -> &'static [u32] {
   1276         match self {
   1277             Self::SyncPull => SYNC_PULL_KINDS,
   1278             Self::MarketRefresh => MARKET_REFRESH_KINDS,
   1279         }
   1280     }
   1281 
   1282     fn filter(self) -> RadrootsNostrFilter {
   1283         RadrootsNostrFilter::new()
   1284             .kinds(
   1285                 self.kinds()
   1286                     .iter()
   1287                     .copied()
   1288                     .map(|kind| radroots_nostr_kind(kind as u16)),
   1289             )
   1290             .limit(RELAY_FETCH_LIMIT)
   1291     }
   1292 
   1293     fn ready_action(self) -> &'static str {
   1294         match self {
   1295             Self::SyncPull => SYNC_READY_ACTION,
   1296             Self::MarketRefresh => MARKET_READY_ACTION,
   1297         }
   1298     }
   1299 
   1300     fn supports_kind(self, kind: u32) -> bool {
   1301         self.kinds().contains(&kind)
   1302     }
   1303 }
   1304 
   1305 fn ingest_events(
   1306     executor: &SqliteExecutor,
   1307     receipt: &DirectRelayFetchReceipt,
   1308     scope: RelayIngestScope,
   1309 ) -> Result<RelayIngestCounts, RuntimeError> {
   1310     let mut counts = RelayIngestCounts {
   1311         fetched_count: receipt.events.len(),
   1312         ..RelayIngestCounts::default()
   1313     };
   1314 
   1315     for event in &receipt.events {
   1316         if !scope.supports_kind(event_kind(event)) {
   1317             counts.unsupported_count += 1;
   1318             continue;
   1319         }
   1320         let event = radroots_event_from_nostr(event);
   1321         match radroots_replica_ingest_event(executor, &event) {
   1322             Ok(RadrootsReplicaIngestOutcome::Applied) => counts.ingested_count += 1,
   1323             Ok(RadrootsReplicaIngestOutcome::Skipped) => counts.skipped_count += 1,
   1324             Err(error @ RadrootsReplicaEventsError::Sql(_)) => return Err(error.into()),
   1325             Err(error) => {
   1326                 counts.failed_count += 1;
   1327                 if counts.first_failure_reason.is_none() {
   1328                     counts.first_failure_reason = Some(error.to_string());
   1329                 }
   1330             }
   1331         }
   1332     }
   1333 
   1334     Ok(counts)
   1335 }
   1336 
   1337 fn event_kind(event: &radroots_nostr::prelude::RadrootsNostrEvent) -> u32 {
   1338     u32::from(event.kind.as_u16())
   1339 }
   1340 
   1341 fn relay_failures(failures: Vec<DirectRelayFailure>) -> Vec<RelayFailureView> {
   1342     failures
   1343         .into_iter()
   1344         .map(|failure| RelayFailureView {
   1345             relay: failure.relay,
   1346             reason: failure.reason,
   1347         })
   1348         .collect()
   1349 }
   1350 
   1351 fn merge_fetch_receipt(
   1352     target: &mut Option<DirectRelayFetchReceipt>,
   1353     receipt: DirectRelayFetchReceipt,
   1354 ) {
   1355     match target {
   1356         Some(target) => {
   1357             push_unique_many(&mut target.target_relays, receipt.target_relays.iter());
   1358             push_unique_many(
   1359                 &mut target.connected_relays,
   1360                 receipt.connected_relays.iter(),
   1361             );
   1362             for failure in receipt.failed_relays {
   1363                 if !target
   1364                     .failed_relays
   1365                     .iter()
   1366                     .any(|existing| existing.relay == failure.relay)
   1367                 {
   1368                     target.failed_relays.push(failure);
   1369                 }
   1370             }
   1371             target.events.extend(receipt.events);
   1372         }
   1373         None => *target = Some(receipt),
   1374     }
   1375 }
   1376 
   1377 fn push_unique_many<'a>(target: &mut Vec<String>, values: impl Iterator<Item = &'a String>) {
   1378     for value in values {
   1379         if !target.contains(value) {
   1380             target.push(value.clone());
   1381         }
   1382     }
   1383 }
   1384 
   1385 fn unix_now() -> u64 {
   1386     SystemTime::now()
   1387         .duration_since(UNIX_EPOCH)
   1388         .map(|duration| duration.as_secs())
   1389         .unwrap_or(0)
   1390 }
   1391 
   1392 fn relative_age(age_seconds: u64) -> String {
   1393     match age_seconds {
   1394         0 => "now".to_owned(),
   1395         1..=59 => format!("{age_seconds}s ago"),
   1396         60..=3_599 => format!("{}m ago", age_seconds / 60),
   1397         3_600..=86_399 => format!("{}h ago", age_seconds / 3_600),
   1398         _ => format!("{}d ago", age_seconds / 86_400),
   1399     }
   1400 }
   1401 
   1402 #[cfg(test)]
   1403 mod tests {
   1404     use std::path::{Path, PathBuf};
   1405 
   1406     use radroots_events::farm::{RadrootsFarm, RadrootsFarmRef};
   1407     use radroots_events::ids::RadrootsEventId;
   1408     use radroots_events::kinds::{KIND_FARM, KIND_LIST_SET_GENERIC, KIND_LISTING, KIND_POST};
   1409     use radroots_events::list::RadrootsListEntry;
   1410     use radroots_events::list_set::RadrootsListSet;
   1411     use radroots_events::plot::RadrootsPlot;
   1412     use radroots_events::profile::{RadrootsProfile, RadrootsProfileType};
   1413     use radroots_events_codec::farm::encode as farm_encode;
   1414     use radroots_events_codec::list_set::encode as list_set_encode;
   1415     use radroots_events_codec::plot::encode as plot_encode;
   1416     use radroots_events_codec::profile::encode as profile_encode;
   1417     use radroots_events_codec::wire::WireEventParts;
   1418     use radroots_identity::RadrootsIdentity;
   1419     use radroots_nostr::prelude::{
   1420         RadrootsNostrEvent, RadrootsNostrFilter, RadrootsNostrTimestamp, radroots_nostr_build_event,
   1421     };
   1422     use radroots_runtime_paths::RadrootsMigrationReport;
   1423     use radroots_sdk::{
   1424         PushOutboxEventReceipt, PushOutboxEventState, PushOutboxReceipt,
   1425         PushOutboxRelayOutcomeKind, PushOutboxRelayReceipt, SyncEventStoreStatus, SyncOutboxStatus,
   1426         SyncRelayTargetSummary, SyncStatusReceipt, SyncStatusSource,
   1427     };
   1428     use radroots_secret_vault::RadrootsSecretBackend;
   1429     use tempfile::tempdir;
   1430 
   1431     use super::{
   1432         DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt, RelayIngestScope,
   1433         freshness_for_scope, market_refresh_with_fetcher, pull_with_fetcher,
   1434         relay_provenance_relays_for_scope, sdk_push_dry_run_view, sdk_push_view,
   1435         sdk_sync_status_view,
   1436     };
   1437     use crate::cli::global::{FindQueryArgs, RecordLookupArgs};
   1438     use crate::runtime::config::{
   1439         AccountConfig, AccountSecretContractConfig, HyfConfig, IdentityConfig, InteractionConfig,
   1440         LocalConfig, LoggingConfig, MigrationConfig, MycConfig, OutputConfig, OutputFormat,
   1441         PathsConfig, PublishConfig, PublishTransport, PublishTransportSource, RelayConfig,
   1442         RelayConfigSource, RelayPublishPolicy, RpcConfig, RuntimeConfig, SignerBackend,
   1443         SignerConfig, Verbosity,
   1444     };
   1445 
   1446     const FARM_D_TAG: &str = "AAAAAAAAAAAAAAAAAAAAAA";
   1447     const PLOT_D_TAG: &str = "AAAAAAAAAAAAAAAAAAAAAQ";
   1448     const LISTING_D_TAG: &str = "AAAAAAAAAAAAAAAAAAAAAg";
   1449 
   1450     #[test]
   1451     fn sync_pull_dry_run_skips_relay_fetch() {
   1452         let dir = tempdir().expect("tempdir");
   1453         let mut config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
   1454         config.output.dry_run = true;
   1455         crate::runtime::store::init(&config).expect("store init");
   1456 
   1457         let view = pull_with_fetcher(&config, |_, _| panic!("dry run must not fetch"))
   1458             .expect("sync pull dry run");
   1459 
   1460         assert_eq!(view.state, "ready");
   1461         assert_eq!(view.target_relays, vec!["wss://relay.example.com"]);
   1462         assert_eq!(view.fetched_count, Some(0));
   1463         assert_eq!(view.ingested_count, Some(0));
   1464         assert_eq!(view.skipped_count, Some(0));
   1465         assert_eq!(view.unsupported_count, Some(0));
   1466         assert_eq!(view.failed_count, Some(0));
   1467     }
   1468 
   1469     #[test]
   1470     fn sync_pull_no_relay_action_is_actionable() {
   1471         let dir = tempdir().expect("tempdir");
   1472         let config = sample_config(dir.path(), Vec::new());
   1473         crate::runtime::store::init(&config).expect("store init");
   1474 
   1475         let view = pull_with_fetcher(&config, |_, _| {
   1476             panic!("unconfigured sync pull must not fetch")
   1477         })
   1478         .expect("sync pull unconfigured");
   1479 
   1480         assert_eq!(view.state, "unconfigured");
   1481         assert_eq!(
   1482             view.actions,
   1483             vec!["radroots --relay wss://relay.example.com sync pull"]
   1484         );
   1485     }
   1486 
   1487     #[test]
   1488     fn sync_status_empty_sdk_store_reports_canonical_source() {
   1489         let dir = tempdir().expect("tempdir");
   1490         let config = sample_config(
   1491             dir.path(),
   1492             vec![
   1493                 "wss://relay-a.example.com".to_owned(),
   1494                 "wss://relay-b.example.com".to_owned(),
   1495             ],
   1496         );
   1497 
   1498         let view = sdk_sync_status_view(
   1499             &config,
   1500             sdk_status_receipt(
   1501                 0,
   1502                 0,
   1503                 0,
   1504                 0,
   1505                 0,
   1506                 0,
   1507                 0,
   1508                 0,
   1509                 None,
   1510                 None,
   1511                 &["wss://relay-a.example.com", "wss://relay-b.example.com"],
   1512             ),
   1513         );
   1514 
   1515         assert_eq!(view.state, "ready");
   1516         assert_eq!(view.source, "SDK canonical event store and outbox");
   1517         assert_eq!(view.replica_db, "legacy_derived_not_checked");
   1518         assert_eq!(view.relay_count, 2);
   1519         assert_eq!(view.queue.total_count, Some(0));
   1520         assert_eq!(view.queue.pending_count, 0);
   1521         assert_eq!(view.queue.retryable_count, Some(0));
   1522         assert_eq!(view.queue.terminal_count, Some(0));
   1523         assert_eq!(view.queue.ready_signed_count, Some(0));
   1524         assert_eq!(view.actions, vec!["radroots sync pull"]);
   1525     }
   1526 
   1527     #[test]
   1528     fn sync_status_reports_sdk_pending_retryable_and_terminal_outbox_counts() {
   1529         let dir = tempdir().expect("tempdir");
   1530         let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
   1531 
   1532         let view = sdk_sync_status_view(
   1533             &config,
   1534             sdk_status_receipt(
   1535                 3,
   1536                 4,
   1537                 1,
   1538                 1,
   1539                 2,
   1540                 1,
   1541                 1,
   1542                 0,
   1543                 Some(1_700_000_010_000),
   1544                 Some("auth-required: login".to_owned()),
   1545                 &["wss://relay.example.com"],
   1546             ),
   1547         );
   1548 
   1549         assert_eq!(view.state, "ready");
   1550         assert_eq!(view.queue.expected_count, 4);
   1551         assert_eq!(view.queue.pending_count, 2);
   1552         assert_eq!(view.queue.retryable_count, Some(1));
   1553         assert_eq!(view.queue.terminal_count, Some(2));
   1554         assert_eq!(view.queue.failed_terminal_count, Some(1));
   1555         assert_eq!(view.queue.ready_signed_count, Some(1));
   1556         assert_eq!(view.queue.last_attempt_at_ms, Some(1_700_000_010_000));
   1557         assert_eq!(
   1558             view.queue.last_error.as_deref(),
   1559             Some("auth-required: login")
   1560         );
   1561         assert_eq!(view.actions, vec!["radroots sync push"]);
   1562     }
   1563 
   1564     #[test]
   1565     fn sync_push_dry_run_reports_sdk_ready_outbox_plan() {
   1566         let dir = tempdir().expect("tempdir");
   1567         let mut config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
   1568         config.output.dry_run = true;
   1569 
   1570         let view = sdk_push_dry_run_view(
   1571             &config,
   1572             sdk_status_receipt(
   1573                 1,
   1574                 1,
   1575                 1,
   1576                 0,
   1577                 0,
   1578                 0,
   1579                 1,
   1580                 0,
   1581                 None,
   1582                 None,
   1583                 &["wss://relay.example.com"],
   1584             ),
   1585         );
   1586 
   1587         assert_eq!(view.state, "dry_run");
   1588         assert_eq!(view.source, "SDK outbox push");
   1589         assert_eq!(view.replica_db, "legacy_derived_not_checked");
   1590         assert_eq!(view.target_relays, vec!["wss://relay.example.com"]);
   1591         assert_eq!(view.publishable_count, Some(1));
   1592         assert_eq!(view.published_count, Some(0));
   1593         assert_eq!(view.failed_count, Some(0));
   1594         assert_eq!(view.reason_code.as_deref(), Some("dry_run"));
   1595         assert_eq!(
   1596             view.reason.as_deref(),
   1597             Some("dry run requested; SDK outbox push skipped")
   1598         );
   1599         assert_eq!(
   1600             view.actions,
   1601             vec!["radroots sync push", "radroots sync status get"]
   1602         );
   1603         assert!(view.publish_plan.is_none());
   1604     }
   1605 
   1606     #[test]
   1607     fn sync_push_empty_queue_reports_ready_sdk_state() {
   1608         let dir = tempdir().expect("tempdir");
   1609         let config = sample_config(dir.path(), Vec::new());
   1610 
   1611         let view = sdk_push_view(
   1612             &config,
   1613             PushOutboxReceipt::default(),
   1614             sdk_status_receipt(0, 0, 0, 0, 0, 0, 0, 0, None, None, &[]),
   1615         );
   1616 
   1617         assert_eq!(view.state, "ready");
   1618         assert_eq!(view.publishable_count, Some(0));
   1619         assert_eq!(view.published_count, Some(0));
   1620         assert_eq!(view.failed_count, Some(0));
   1621         assert_eq!(
   1622             view.reason.as_deref(),
   1623             Some("SDK outbox had no ready signed events to push")
   1624         );
   1625         assert_eq!(view.actions, vec!["radroots sync status get"]);
   1626     }
   1627 
   1628     #[test]
   1629     fn sync_push_maps_published_and_auth_required_sdk_receipts() {
   1630         let dir = tempdir().expect("tempdir");
   1631         let config = sample_config(
   1632             dir.path(),
   1633             vec![
   1634                 "wss://relay-a.example.com".to_owned(),
   1635                 "wss://relay-b.example.com".to_owned(),
   1636             ],
   1637         );
   1638         let receipt = PushOutboxReceipt {
   1639             attempted_events: 2,
   1640             published_events: 1,
   1641             retryable_events: 1,
   1642             terminal_events: 0,
   1643             events: vec![
   1644                 sdk_push_event(
   1645                     "a",
   1646                     PushOutboxEventState::Published,
   1647                     PushOutboxRelayOutcomeKind::Accepted,
   1648                     "wss://relay-a.example.com",
   1649                     Some("accepted".to_owned()),
   1650                 ),
   1651                 sdk_push_event(
   1652                     "b",
   1653                     PushOutboxEventState::PublishRetryable,
   1654                     PushOutboxRelayOutcomeKind::AuthRequired,
   1655                     "wss://relay-b.example.com",
   1656                     Some("auth-required: login".to_owned()),
   1657                 ),
   1658             ],
   1659         };
   1660 
   1661         let view = sdk_push_view(
   1662             &config,
   1663             receipt,
   1664             sdk_status_receipt(
   1665                 2,
   1666                 2,
   1667                 0,
   1668                 1,
   1669                 1,
   1670                 0,
   1671                 0,
   1672                 0,
   1673                 Some(1_700_000_020_000),
   1674                 Some("auth-required: login".to_owned()),
   1675                 &["wss://relay-a.example.com", "wss://relay-b.example.com"],
   1676             ),
   1677         );
   1678 
   1679         assert_eq!(view.state, "partial");
   1680         assert_eq!(view.publishable_count, Some(2));
   1681         assert_eq!(view.published_count, Some(1));
   1682         assert_eq!(view.failed_count, Some(1));
   1683         assert_eq!(view.reason_code.as_deref(), Some("sdk_outbox_push_partial"));
   1684         assert_eq!(
   1685             view.target_relays,
   1686             vec![
   1687                 "wss://relay-a.example.com".to_owned(),
   1688                 "wss://relay-b.example.com".to_owned()
   1689             ]
   1690         );
   1691         assert_eq!(
   1692             view.connected_relays,
   1693             vec![
   1694                 "wss://relay-a.example.com".to_owned(),
   1695                 "wss://relay-b.example.com".to_owned()
   1696             ]
   1697         );
   1698         assert_eq!(
   1699             view.acknowledged_relays,
   1700             vec!["wss://relay-a.example.com".to_owned()]
   1701         );
   1702         assert_eq!(view.failed_relays.len(), 1);
   1703         assert_eq!(view.failed_relays[0].relay, "wss://relay-b.example.com");
   1704         assert_eq!(view.failed_relays[0].reason, "auth-required: login");
   1705         assert_eq!(
   1706             view.actions,
   1707             vec!["radroots sync push", "radroots sync status get"]
   1708         );
   1709     }
   1710 
   1711     fn sdk_status_receipt(
   1712         total_events: i64,
   1713         outbox_total_events: i64,
   1714         pending_events: i64,
   1715         retryable_events: i64,
   1716         terminal_events: i64,
   1717         failed_terminal_events: i64,
   1718         ready_signed_events: i64,
   1719         publishing_events: i64,
   1720         last_attempt_at_ms: Option<i64>,
   1721         last_error: Option<String>,
   1722         relays: &[&str],
   1723     ) -> SyncStatusReceipt {
   1724         SyncStatusReceipt {
   1725             source: SyncStatusSource::SdkCanonicalStores,
   1726             observed_at_ms: 1_700_000_030_000,
   1727             event_store: SyncEventStoreStatus {
   1728                 total_events,
   1729                 projection_eligible_events: total_events,
   1730                 relay_observations: 0,
   1731                 last_event_seq: (total_events > 0).then_some(total_events),
   1732                 last_event_updated_at_ms: (total_events > 0).then_some(1_700_000_000_000),
   1733             },
   1734             outbox: SyncOutboxStatus {
   1735                 total_events: outbox_total_events,
   1736                 pending_events,
   1737                 retryable_events,
   1738                 terminal_events,
   1739                 failed_terminal_events,
   1740                 ready_signed_events,
   1741                 publishing_events,
   1742                 last_attempt_at_ms,
   1743                 last_error,
   1744             },
   1745             relay_targets: SyncRelayTargetSummary {
   1746                 configured_count: relays.len(),
   1747                 configured_relays: relays.iter().map(|relay| (*relay).to_owned()).collect(),
   1748             },
   1749         }
   1750     }
   1751 
   1752     fn sdk_push_event(
   1753         event_id_prefix: &str,
   1754         final_state: PushOutboxEventState,
   1755         outcome_kind: PushOutboxRelayOutcomeKind,
   1756         relay_url: &str,
   1757         message: Option<String>,
   1758     ) -> PushOutboxEventReceipt {
   1759         PushOutboxEventReceipt {
   1760             event_id: RadrootsEventId::parse(event_id_prefix.repeat(64).as_str())
   1761                 .expect("event id"),
   1762             outbox_event_id: 7,
   1763             final_state,
   1764             attempted_count: 1,
   1765             accepted_count: usize::from(matches!(
   1766                 outcome_kind,
   1767                 PushOutboxRelayOutcomeKind::Accepted
   1768                     | PushOutboxRelayOutcomeKind::DuplicateAccepted
   1769             )),
   1770             retryable_count: usize::from(matches!(
   1771                 outcome_kind,
   1772                 PushOutboxRelayOutcomeKind::AuthRequired
   1773                     | PushOutboxRelayOutcomeKind::Timeout
   1774                     | PushOutboxRelayOutcomeKind::ConnectionFailed
   1775             )),
   1776             terminal_count: usize::from(matches!(
   1777                 outcome_kind,
   1778                 PushOutboxRelayOutcomeKind::Blocked
   1779                     | PushOutboxRelayOutcomeKind::RateLimited
   1780                     | PushOutboxRelayOutcomeKind::Invalid
   1781                     | PushOutboxRelayOutcomeKind::PowRequired
   1782                     | PushOutboxRelayOutcomeKind::Restricted
   1783                     | PushOutboxRelayOutcomeKind::Error
   1784                     | PushOutboxRelayOutcomeKind::Unknown
   1785             )),
   1786             quorum: 1,
   1787             quorum_met: matches!(
   1788                 outcome_kind,
   1789                 PushOutboxRelayOutcomeKind::Accepted
   1790                     | PushOutboxRelayOutcomeKind::DuplicateAccepted
   1791             ),
   1792             relays: vec![PushOutboxRelayReceipt {
   1793                 relay_url: relay_url.to_owned(),
   1794                 outcome_kind,
   1795                 attempted: true,
   1796                 message,
   1797             }],
   1798         }
   1799     }
   1800 
   1801     #[test]
   1802     fn sync_pull_ingests_relay_events_and_market_reads_without_daemon() {
   1803         let dir = tempdir().expect("tempdir");
   1804         let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
   1805         crate::runtime::store::init(&config).expect("store init");
   1806         let seller = identity(7);
   1807         let seller_pubkey = seller.public_key_hex();
   1808         let listing_addr = format!("{KIND_LISTING}:{seller_pubkey}:{LISTING_D_TAG}");
   1809         let events = vec![
   1810             profile_event(&seller),
   1811             farm_event(&seller),
   1812             plot_event(&seller),
   1813             listing_event(&seller),
   1814             list_set_event(&seller),
   1815         ];
   1816 
   1817         let view = pull_with_fetcher(&config, fake_fetcher(events)).expect("sync pull ingest");
   1818 
   1819         assert_eq!(view.state, "ready");
   1820         assert_eq!(view.fetched_count, Some(5));
   1821         assert_eq!(view.ingested_count, Some(5));
   1822         assert_eq!(view.skipped_count, Some(0));
   1823         assert_eq!(view.unsupported_count, Some(0));
   1824         assert_eq!(view.failed_count, Some(0));
   1825         assert_eq!(view.reason, None);
   1826 
   1827         let search = crate::runtime::find::search(
   1828             &config,
   1829             &FindQueryArgs {
   1830                 query: vec!["eggs".to_owned()],
   1831             },
   1832         )
   1833         .expect("market search");
   1834         assert_eq!(search.state, "ready");
   1835         assert_eq!(search.count, 1);
   1836         assert_eq!(
   1837             search.results[0].listing_addr.as_deref(),
   1838             Some(listing_addr.as_str())
   1839         );
   1840 
   1841         let listing = crate::runtime::listing::get(
   1842             &config,
   1843             &RecordLookupArgs {
   1844                 key: "pasture-eggs".to_owned(),
   1845             },
   1846         )
   1847         .expect("listing get");
   1848         assert_eq!(listing.state, "ready");
   1849         assert_eq!(listing.listing_addr.as_deref(), Some(listing_addr.as_str()));
   1850     }
   1851 
   1852     #[test]
   1853     fn market_refresh_uses_market_scope_for_ingest() {
   1854         let dir = tempdir().expect("tempdir");
   1855         let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
   1856         crate::runtime::store::init(&config).expect("store init");
   1857         let seller = identity(8);
   1858         let events = vec![listing_event(&seller), plot_event(&seller)];
   1859 
   1860         let view =
   1861             market_refresh_with_fetcher(&config, fake_fetcher(events)).expect("market refresh");
   1862 
   1863         assert_eq!(view.state, "ready");
   1864         assert_eq!(view.fetched_count, Some(2));
   1865         assert_eq!(view.ingested_count, Some(1));
   1866         assert_eq!(view.unsupported_count, Some(1));
   1867         assert_eq!(view.failed_count, Some(0));
   1868     }
   1869 
   1870     #[test]
   1871     fn market_refresh_records_relay_provenance_relays_for_order_drafts() {
   1872         let dir = tempdir().expect("tempdir");
   1873         let config = sample_config(
   1874             dir.path(),
   1875             vec![
   1876                 "wss://relay-a.example.com".to_owned(),
   1877                 "wss://relay-b.example.com".to_owned(),
   1878             ],
   1879         );
   1880         crate::runtime::store::init(&config).expect("store init");
   1881         let seller = identity(9);
   1882 
   1883         let _ = market_refresh_with_fetcher(&config, fake_fetcher(vec![listing_event(&seller)]))
   1884             .expect("market refresh");
   1885         let relays = relay_provenance_relays_for_scope(&config, RelayIngestScope::MarketRefresh)
   1886             .expect("relay provenance");
   1887 
   1888         assert_eq!(
   1889             relays,
   1890             vec![
   1891                 "wss://relay-a.example.com".to_owned(),
   1892                 "wss://relay-b.example.com".to_owned()
   1893             ]
   1894         );
   1895     }
   1896 
   1897     #[test]
   1898     fn relay_refresh_records_current_run_freshness() {
   1899         let dir = tempdir().expect("tempdir");
   1900         let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
   1901         crate::runtime::store::init(&config).expect("store init");
   1902         let seller = identity(10);
   1903 
   1904         let view = market_refresh_with_fetcher(&config, fake_fetcher(vec![listing_event(&seller)]))
   1905             .expect("market refresh");
   1906 
   1907         assert_eq!(view.freshness.state, "fresh");
   1908         let run = view.freshness.run.as_ref().expect("run freshness");
   1909         assert_eq!(run.scope, "market_refresh");
   1910         assert_eq!(run.last_state, "success");
   1911         assert_eq!(run.relay_set_current, true);
   1912         assert_eq!(run.fetched_count, Some(1));
   1913         assert_eq!(run.ingested_count, Some(1));
   1914     }
   1915 
   1916     #[test]
   1917     fn sync_pull_reports_partial_relay_fetch_reason_code() {
   1918         let dir = tempdir().expect("tempdir");
   1919         let config = sample_config(
   1920             dir.path(),
   1921             vec![
   1922                 "wss://relay-a.example.com".to_owned(),
   1923                 "wss://relay-b.example.com".to_owned(),
   1924             ],
   1925         );
   1926         crate::runtime::store::init(&config).expect("store init");
   1927         let seller = identity(13);
   1928 
   1929         let view = pull_with_fetcher(&config, |relays, _| {
   1930             Ok(DirectRelayFetchReceipt {
   1931                 target_relays: relays.to_vec(),
   1932                 connected_relays: vec![relays[0].clone()],
   1933                 failed_relays: vec![DirectRelayFailure {
   1934                     relay: relays[1].clone(),
   1935                     reason: "connection refused".to_owned(),
   1936                 }],
   1937                 events: vec![listing_event(&seller)],
   1938             })
   1939         })
   1940         .expect("sync pull partial relay fetch");
   1941 
   1942         assert_eq!(view.state, "ready");
   1943         assert_eq!(view.connected_relays, vec!["wss://relay-a.example.com"]);
   1944         assert_eq!(view.failed_relays.len(), 1);
   1945         assert_eq!(view.failed_count, Some(1));
   1946         assert_eq!(view.reason_code.as_deref(), Some("relay_fetch_partial"));
   1947         assert!(
   1948             view.reason
   1949                 .as_deref()
   1950                 .expect("partial relay reason")
   1951                 .contains("relay(s) failed during fetch")
   1952         );
   1953         let run = view.freshness.run.as_ref().expect("run freshness");
   1954         assert_eq!(run.last_state, "partial");
   1955         assert_eq!(run.failed_count, Some(1));
   1956     }
   1957 
   1958     #[test]
   1959     fn sync_pull_reports_no_overwrite_skips_without_replacing_projection() {
   1960         let dir = tempdir().expect("tempdir");
   1961         let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
   1962         crate::runtime::store::init(&config).expect("store init");
   1963         let seller = identity(12);
   1964 
   1965         let first = listing_event_with_title_at(&seller, "Pasture Eggs", 200);
   1966         let stale = listing_event_with_title_at(&seller, "Older Eggs", 199);
   1967         pull_with_fetcher(&config, fake_fetcher(vec![first])).expect("initial sync pull");
   1968         let view = pull_with_fetcher(&config, fake_fetcher(vec![stale])).expect("stale sync pull");
   1969 
   1970         assert_eq!(view.state, "ready");
   1971         assert_eq!(view.fetched_count, Some(1));
   1972         assert_eq!(view.ingested_count, Some(0));
   1973         assert_eq!(view.skipped_count, Some(1));
   1974         assert_eq!(view.reason_code.as_deref(), Some("sync_no_overwrite"));
   1975         assert!(
   1976             view.reason
   1977                 .as_deref()
   1978                 .expect("skip reason")
   1979                 .contains("current or newer state")
   1980         );
   1981         let run = view.freshness.run.as_ref().expect("run freshness");
   1982         assert_eq!(run.last_state, "success");
   1983         assert_eq!(run.skipped_count, Some(1));
   1984 
   1985         let search = crate::runtime::find::search(
   1986             &config,
   1987             &FindQueryArgs {
   1988                 query: vec!["eggs".to_owned()],
   1989             },
   1990         )
   1991         .expect("market search");
   1992         assert_eq!(search.results[0].title, "Pasture Eggs");
   1993     }
   1994 
   1995     #[test]
   1996     fn sync_pull_freshness_reports_relay_set_changed() {
   1997         let dir = tempdir().expect("tempdir");
   1998         let config = sample_config(dir.path(), vec!["wss://relay-a.example.com".to_owned()]);
   1999         crate::runtime::store::init(&config).expect("store init");
   2000         let seller = identity(11);
   2001         pull_with_fetcher(&config, fake_fetcher(vec![listing_event(&seller)])).expect("sync pull");
   2002         let changed = sample_config(dir.path(), vec!["wss://relay-b.example.com".to_owned()]);
   2003 
   2004         let freshness =
   2005             freshness_for_scope(&changed, RelayIngestScope::SyncPull).expect("sync freshness");
   2006 
   2007         assert_eq!(freshness.state, "relay_set_changed");
   2008         let run = freshness.run.as_ref().expect("run freshness");
   2009         assert_eq!(run.scope, "sync_pull");
   2010         assert_eq!(run.relay_set_current, false);
   2011     }
   2012 
   2013     #[test]
   2014     fn relay_ingest_splits_unsupported_and_failed_events() {
   2015         let dir = tempdir().expect("tempdir");
   2016         let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
   2017         crate::runtime::store::init(&config).expect("store init");
   2018         let seller = identity(9);
   2019         let events = vec![
   2020             signed_event(
   2021                 &seller,
   2022                 WireEventParts {
   2023                     kind: KIND_POST,
   2024                     content: "hello".to_owned(),
   2025                     tags: Vec::new(),
   2026                 },
   2027             ),
   2028             signed_event(
   2029                 &seller,
   2030                 WireEventParts {
   2031                     kind: KIND_LISTING,
   2032                     content: "not a listing".to_owned(),
   2033                     tags: Vec::new(),
   2034                 },
   2035             ),
   2036         ];
   2037 
   2038         let view = pull_with_fetcher(&config, fake_fetcher(events)).expect("sync pull ingest");
   2039 
   2040         assert_eq!(view.state, "ready");
   2041         assert_eq!(view.fetched_count, Some(2));
   2042         assert_eq!(view.ingested_count, Some(0));
   2043         assert_eq!(view.unsupported_count, Some(1));
   2044         assert_eq!(view.failed_count, Some(1));
   2045         assert!(
   2046             view.reason
   2047                 .as_deref()
   2048                 .expect("failure reason")
   2049                 .contains("failed ingest")
   2050         );
   2051     }
   2052 
   2053     fn fake_fetcher(
   2054         events: Vec<RadrootsNostrEvent>,
   2055     ) -> impl FnOnce(
   2056         &[String],
   2057         RadrootsNostrFilter,
   2058     ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError> {
   2059         move |relays, _| {
   2060             Ok(DirectRelayFetchReceipt {
   2061                 target_relays: relays.to_vec(),
   2062                 connected_relays: relays.to_vec(),
   2063                 failed_relays: Vec::new(),
   2064                 events,
   2065             })
   2066         }
   2067     }
   2068 
   2069     fn profile_event(identity: &RadrootsIdentity) -> RadrootsNostrEvent {
   2070         let profile = RadrootsProfile {
   2071             name: "seller".to_owned(),
   2072             display_name: Some("Seller".to_owned()),
   2073             nip05: None,
   2074             about: Some("market seller".to_owned()),
   2075             website: Some("https://seller.example.com".to_owned()),
   2076             picture: None,
   2077             banner: None,
   2078             lud06: None,
   2079             lud16: None,
   2080             bot: None,
   2081         };
   2082         signed_event(
   2083             identity,
   2084             profile_encode::to_wire_parts_with_profile_type(
   2085                 &profile,
   2086                 Some(RadrootsProfileType::Farm),
   2087             )
   2088             .expect("profile parts"),
   2089         )
   2090     }
   2091 
   2092     fn farm_event(identity: &RadrootsIdentity) -> RadrootsNostrEvent {
   2093         let farm = RadrootsFarm {
   2094             d_tag: FARM_D_TAG.to_owned(),
   2095             name: "Relay Farm".to_owned(),
   2096             about: Some("relay farm".to_owned()),
   2097             website: Some("https://farm.example.com".to_owned()),
   2098             picture: None,
   2099             banner: None,
   2100             location: None,
   2101             tags: None,
   2102         };
   2103         signed_event(
   2104             identity,
   2105             farm_encode::to_wire_parts(&farm).expect("farm parts"),
   2106         )
   2107     }
   2108 
   2109     fn plot_event(identity: &RadrootsIdentity) -> RadrootsNostrEvent {
   2110         let plot = RadrootsPlot {
   2111             d_tag: PLOT_D_TAG.to_owned(),
   2112             farm: RadrootsFarmRef {
   2113                 pubkey: identity.public_key_hex(),
   2114                 d_tag: FARM_D_TAG.to_owned(),
   2115             },
   2116             name: "Relay Plot".to_owned(),
   2117             about: Some("relay plot".to_owned()),
   2118             location: None,
   2119             tags: None,
   2120         };
   2121         signed_event(
   2122             identity,
   2123             plot_encode::to_wire_parts(&plot).expect("plot parts"),
   2124         )
   2125     }
   2126 
   2127     fn list_set_event(identity: &RadrootsIdentity) -> RadrootsNostrEvent {
   2128         let list_set = RadrootsListSet {
   2129             d_tag: "member_of.farms".to_owned(),
   2130             content: String::new(),
   2131             entries: vec![RadrootsListEntry {
   2132                 tag: "p".to_owned(),
   2133                 values: vec![identity.public_key_hex()],
   2134             }],
   2135             title: None,
   2136             description: None,
   2137             image: None,
   2138         };
   2139         signed_event(
   2140             identity,
   2141             list_set_encode::to_wire_parts_with_kind(&list_set, KIND_LIST_SET_GENERIC)
   2142                 .expect("list set parts"),
   2143         )
   2144     }
   2145 
   2146     fn listing_event(identity: &RadrootsIdentity) -> RadrootsNostrEvent {
   2147         listing_event_with_title_at(identity, "Pasture Eggs", 0)
   2148     }
   2149 
   2150     fn listing_event_with_title_at(
   2151         identity: &RadrootsIdentity,
   2152         title: &str,
   2153         created_at: u64,
   2154     ) -> RadrootsNostrEvent {
   2155         let mut builder = radroots_nostr_build_event(
   2156             KIND_LISTING,
   2157             "# Pasture Eggs",
   2158             vec![
   2159                 vec!["d".to_owned(), LISTING_D_TAG.to_owned()],
   2160                 vec![
   2161                     "a".to_owned(),
   2162                     format!("{}:{}:{}", KIND_FARM, identity.public_key_hex(), FARM_D_TAG),
   2163                 ],
   2164                 vec!["p".to_owned(), identity.public_key_hex()],
   2165                 vec!["key".to_owned(), "pasture-eggs".to_owned()],
   2166                 vec!["title".to_owned(), title.to_owned()],
   2167                 vec!["category".to_owned(), "eggs".to_owned()],
   2168                 vec!["summary".to_owned(), "Pasture-raised eggs".to_owned()],
   2169                 vec!["process".to_owned(), "washed".to_owned()],
   2170                 vec!["lot".to_owned(), "lot-a".to_owned()],
   2171                 vec!["profile".to_owned(), "dozen".to_owned()],
   2172                 vec!["year".to_owned(), "2026".to_owned()],
   2173                 vec!["radroots:primary_bin".to_owned(), "bin-a".to_owned()],
   2174                 vec![
   2175                     "radroots:bin".to_owned(),
   2176                     "bin-a".to_owned(),
   2177                     "12".to_owned(),
   2178                     "each".to_owned(),
   2179                     "12".to_owned(),
   2180                     "each".to_owned(),
   2181                     "dozen".to_owned(),
   2182                 ],
   2183                 vec![
   2184                     "radroots:price".to_owned(),
   2185                     "bin-a".to_owned(),
   2186                     "6".to_owned(),
   2187                     "USD".to_owned(),
   2188                     "1".to_owned(),
   2189                     "each".to_owned(),
   2190                     "6".to_owned(),
   2191                     "each".to_owned(),
   2192                 ],
   2193                 vec!["inventory".to_owned(), "5".to_owned()],
   2194                 vec!["status".to_owned(), "active".to_owned()],
   2195             ],
   2196         )
   2197         .expect("listing parts");
   2198         if created_at > 0 {
   2199             builder = builder.custom_created_at(RadrootsNostrTimestamp::from(created_at));
   2200         }
   2201         builder
   2202             .sign_with_keys(identity.keys())
   2203             .expect("signed event")
   2204     }
   2205 
   2206     fn signed_event(identity: &RadrootsIdentity, parts: WireEventParts) -> RadrootsNostrEvent {
   2207         radroots_nostr_build_event(parts.kind, parts.content, parts.tags)
   2208             .expect("event builder")
   2209             .sign_with_keys(identity.keys())
   2210             .expect("signed event")
   2211     }
   2212 
   2213     fn identity(seed: u8) -> RadrootsIdentity {
   2214         RadrootsIdentity::from_secret_key_bytes(&[seed; 32]).expect("identity")
   2215     }
   2216 
   2217     fn sample_config(root: &Path, relays: Vec<String>) -> RuntimeConfig {
   2218         let data = root.join("data");
   2219         let logs = root.join("logs");
   2220         let secrets = root.join("secrets");
   2221         RuntimeConfig {
   2222             output: OutputConfig {
   2223                 format: OutputFormat::Human,
   2224                 verbosity: Verbosity::Normal,
   2225                 color: true,
   2226                 dry_run: false,
   2227             },
   2228             interaction: InteractionConfig {
   2229                 input_enabled: true,
   2230                 assume_yes: false,
   2231                 stdin_tty: false,
   2232                 stdout_tty: false,
   2233                 prompts_allowed: false,
   2234                 confirmations_allowed: false,
   2235             },
   2236             paths: PathsConfig {
   2237                 profile: "interactive_user".into(),
   2238                 profile_source: "test".into(),
   2239                 allowed_profiles: vec!["interactive_user".into(), "repo_local".into()],
   2240                 root_source: "test".into(),
   2241                 repo_local_root: None,
   2242                 repo_local_root_source: None,
   2243                 subordinate_path_override_source: "runtime_config".into(),
   2244                 app_namespace: "apps/cli".into(),
   2245                 shared_accounts_namespace: "shared/accounts".into(),
   2246                 shared_identities_namespace: "shared/identities".into(),
   2247                 app_config_path: root.join("config/apps/cli/config.toml"),
   2248                 workspace_config_path: None,
   2249                 app_data_root: data.join("apps/cli"),
   2250                 app_logs_root: logs.join("apps/cli"),
   2251                 shared_accounts_data_root: data.join("shared/accounts"),
   2252                 shared_accounts_secrets_root: secrets.join("shared/accounts"),
   2253                 default_identity_path: secrets.join("shared/identities/default.json"),
   2254             },
   2255             migration: MigrationConfig {
   2256                 report: RadrootsMigrationReport::empty(),
   2257             },
   2258             logging: LoggingConfig {
   2259                 filter: "info".into(),
   2260                 directory: None,
   2261                 stdout: false,
   2262             },
   2263             account: AccountConfig {
   2264                 selector: None,
   2265                 store_path: data.join("shared/accounts/store.json"),
   2266                 secrets_dir: secrets.join("shared/accounts"),
   2267                 secret_backend: RadrootsSecretBackend::EncryptedFile,
   2268                 secret_fallback: None,
   2269             },
   2270             account_secret_contract: AccountSecretContractConfig {
   2271                 default_backend: "host_vault".into(),
   2272                 default_fallback: Some("encrypted_file".into()),
   2273                 allowed_backends: vec!["host_vault".into(), "encrypted_file".into()],
   2274                 host_vault_policy: Some("desktop".into()),
   2275                 uses_protected_store: true,
   2276             },
   2277             identity: IdentityConfig {
   2278                 path: secrets.join("shared/identities/default.json"),
   2279             },
   2280             signer: SignerConfig {
   2281                 backend: SignerBackend::Local,
   2282             },
   2283             publish: PublishConfig {
   2284                 transport: PublishTransport::DirectNostrRelay,
   2285                 source: PublishTransportSource::Defaults,
   2286                 radrootsd_proxy: crate::runtime::config::RadrootsdProxyConfig::default(),
   2287             },
   2288             relay: RelayConfig {
   2289                 urls: relays,
   2290                 publish_policy: RelayPublishPolicy::Any,
   2291                 source: RelayConfigSource::Defaults,
   2292             },
   2293             local: LocalConfig {
   2294                 root: data.join("apps/cli/replica"),
   2295                 replica_db_path: data.join("apps/cli/replica/replica.sqlite"),
   2296                 backups_dir: data.join("apps/cli/replica/backups"),
   2297                 exports_dir: data.join("apps/cli/replica/exports"),
   2298             },
   2299             myc: MycConfig {
   2300                 executable: PathBuf::from("myc"),
   2301                 status_timeout_ms: 2_000,
   2302             },
   2303             hyf: HyfConfig {
   2304                 enabled: false,
   2305                 executable: PathBuf::from("hyfd"),
   2306             },
   2307             rpc: RpcConfig {
   2308                 url: "http://127.0.0.1:7070".into(),
   2309             },
   2310             rhi: crate::runtime::config::RhiConfig {
   2311                 trusted_worker_pubkeys: Vec::new(),
   2312             },
   2313             capability_bindings: Vec::new(),
   2314         }
   2315     }
   2316 }