sync.rs (82313B)
1 use std::thread; 2 use std::time::{Duration, SystemTime, UNIX_EPOCH}; 3 4 use radroots_events::kinds::{ 5 KIND_FARM, KIND_LIST_SET_APP_CURATION, KIND_LIST_SET_BOOKMARK, KIND_LIST_SET_CALENDAR, 6 KIND_LIST_SET_CURATION, KIND_LIST_SET_EMOJI, KIND_LIST_SET_FOLLOW, KIND_LIST_SET_GENERIC, 7 KIND_LIST_SET_INTEREST, KIND_LIST_SET_KIND_MUTE, KIND_LIST_SET_MEDIA_STARTER_PACK, 8 KIND_LIST_SET_PICTURE, KIND_LIST_SET_RELAY, KIND_LIST_SET_RELEASE_ARTIFACT, 9 KIND_LIST_SET_STARTER_PACK, KIND_LIST_SET_VIDEO, KIND_LISTING, KIND_PLOT, KIND_PROFILE, 10 }; 11 use radroots_nostr::prelude::{ 12 RadrootsNostrFilter, RadrootsNostrTimestamp, radroots_event_from_nostr, radroots_nostr_kind, 13 }; 14 use radroots_replica_db::{ReplicaSql, migrations}; 15 use radroots_replica_sync::{ 16 RadrootsReplicaEventsError, RadrootsReplicaIngestOutcome, radroots_replica_ingest_event, 17 radroots_replica_sync_status, 18 }; 19 use radroots_sdk::{ 20 PushOutboxEventReceipt, PushOutboxReceipt, PushOutboxRelayOutcomeKind, PushOutboxRequest, 21 SyncStatusReceipt, SyncStatusRequest, 22 }; 23 use radroots_sql_core::{SqlExecutor, SqliteExecutor}; 24 use serde::Deserialize; 25 use serde_json::json; 26 27 use crate::cli::global::SyncWatchArgs; 28 use crate::runtime::RuntimeError; 29 use crate::runtime::config::RuntimeConfig; 30 use crate::runtime::direct_relay::{ 31 DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt, fetch_events_from_relays, 32 }; 33 use crate::runtime::sdk::{CliSdkAdapterError, CliSdkSession, sdk_relay_url_policy}; 34 use crate::view::runtime::{ 35 RelayFailureView, SyncActionView, SyncFreshnessView, SyncQueueView, SyncRunFreshnessView, 36 SyncStatusView, SyncWatchFrameView, SyncWatchView, 37 }; 38 39 const SYNC_SOURCE: &str = "local replica · local first"; 40 const SDK_SYNC_SOURCE: &str = "SDK canonical event store and outbox"; 41 const SDK_PUSH_SOURCE: &str = "SDK outbox push"; 42 const RELAY_PULL_SETUP_ACTION: &str = "radroots --relay wss://relay.example.com sync pull"; 43 const SYNC_PULL_ACTION: &str = "radroots sync pull"; 44 const SYNC_PUSH_ACTION: &str = "radroots sync push"; 45 const SYNC_READY_ACTION: &str = "radroots market product search eggs"; 46 const MARKET_READY_ACTION: &str = "radroots market product search eggs"; 47 const INGEST_SOURCE: &str = "direct Nostr relay fetch · local replica ingest"; 48 const RELAY_FETCH_LIMIT: usize = 1_000; 49 const RELAY_FETCH_MAX_PAGES: usize = 5; 50 const MARKET_FRESHNESS_STALE_AFTER_SECONDS: u64 = 15 * 60; 51 const SYNC_PULL_FRESHNESS_STALE_AFTER_SECONDS: u64 = 30 * 60; 52 const SYNC_RUN_TABLE: &str = "radroots_cli_sync_run"; 53 const MARKET_REFRESH_KINDS: &[u32] = &[KIND_PROFILE, KIND_FARM, KIND_LISTING]; 54 const SYNC_PULL_KINDS: &[u32] = &[ 55 KIND_PROFILE, 56 KIND_FARM, 57 KIND_PLOT, 58 KIND_LISTING, 59 KIND_LIST_SET_FOLLOW, 60 KIND_LIST_SET_GENERIC, 61 KIND_LIST_SET_RELAY, 62 KIND_LIST_SET_BOOKMARK, 63 KIND_LIST_SET_CURATION, 64 KIND_LIST_SET_VIDEO, 65 KIND_LIST_SET_PICTURE, 66 KIND_LIST_SET_KIND_MUTE, 67 KIND_LIST_SET_INTEREST, 68 KIND_LIST_SET_EMOJI, 69 KIND_LIST_SET_RELEASE_ARTIFACT, 70 KIND_LIST_SET_APP_CURATION, 71 KIND_LIST_SET_CALENDAR, 72 KIND_LIST_SET_STARTER_PACK, 73 KIND_LIST_SET_MEDIA_STARTER_PACK, 74 ]; 75 76 #[derive(Debug, Clone)] 77 struct SyncSnapshot { 78 state: String, 79 source: String, 80 local_root: String, 81 replica_db: String, 82 relay_count: usize, 83 publish_policy: String, 84 freshness: SyncFreshnessView, 85 queue: SyncQueueView, 86 reason: Option<String>, 87 actions: Vec<String>, 88 } 89 90 #[derive(Debug, Clone)] 91 struct SyncRunRecord { 92 scope: String, 93 relay_set_fingerprint: String, 94 target_relays_json: String, 95 connected_relays_json: String, 96 failed_relays_json: String, 97 started_at: u64, 98 completed_at: Option<u64>, 99 state: String, 100 fetched_count: usize, 101 ingested_count: usize, 102 skipped_count: usize, 103 unsupported_count: usize, 104 failed_count: usize, 105 failure_reason: Option<String>, 106 } 107 108 #[derive(Debug, Deserialize)] 109 struct SyncRunRow { 110 scope: String, 111 relay_set_fingerprint: String, 112 target_relays_json: String, 113 connected_relays_json: String, 114 failed_relays_json: String, 115 started_at: i64, 116 completed_at: Option<i64>, 117 state: String, 118 fetched_count: i64, 119 ingested_count: i64, 120 skipped_count: i64, 121 unsupported_count: i64, 122 failed_count: i64, 123 failure_reason: Option<String>, 124 } 125 126 pub fn status(config: &RuntimeConfig) -> Result<SyncStatusView, CliSdkAdapterError> { 127 let session = CliSdkSession::connect(config)?; 128 let receipt = session.block_on(session.sdk().sync().status(SyncStatusRequest::new()))?; 129 Ok(sdk_sync_status_view(config, receipt)) 130 } 131 132 pub fn pull(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> { 133 pull_with_fetcher(config, fetch_events_from_relays_windowed) 134 } 135 136 pub fn market_refresh(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> { 137 market_refresh_with_fetcher(config, fetch_events_from_relays_windowed) 138 } 139 140 fn pull_with_fetcher<F>(config: &RuntimeConfig, fetcher: F) -> Result<SyncActionView, RuntimeError> 141 where 142 F: FnOnce( 143 &[String], 144 RadrootsNostrFilter, 145 ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError>, 146 { 147 relay_ingest(config, RelayIngestScope::SyncPull, fetcher) 148 } 149 150 fn market_refresh_with_fetcher<F>( 151 config: &RuntimeConfig, 152 fetcher: F, 153 ) -> Result<SyncActionView, RuntimeError> 154 where 155 F: FnOnce( 156 &[String], 157 RadrootsNostrFilter, 158 ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError>, 159 { 160 relay_ingest(config, RelayIngestScope::MarketRefresh, fetcher) 161 } 162 163 fn fetch_events_from_relays_windowed( 164 relay_urls: &[String], 165 base_filter: RadrootsNostrFilter, 166 ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError> { 167 let mut next_filter = base_filter.clone(); 168 let mut merged: Option<DirectRelayFetchReceipt> = None; 169 170 for _ in 0..RELAY_FETCH_MAX_PAGES { 171 let receipt = fetch_events_from_relays(relay_urls, next_filter)?; 172 let page_len = receipt.events.len(); 173 let oldest_created_at = receipt 174 .events 175 .iter() 176 .map(|event| event.created_at.as_secs()) 177 .min(); 178 merge_fetch_receipt(&mut merged, receipt); 179 if page_len < RELAY_FETCH_LIMIT { 180 break; 181 } 182 let Some(oldest_created_at) = oldest_created_at else { 183 break; 184 }; 185 if oldest_created_at == 0 { 186 break; 187 } 188 next_filter = base_filter 189 .clone() 190 .until(RadrootsNostrTimestamp::from(oldest_created_at - 1)) 191 .limit(RELAY_FETCH_LIMIT); 192 } 193 194 merged.ok_or(DirectRelayFetchError::MissingRelays) 195 } 196 197 fn relay_ingest<F>( 198 config: &RuntimeConfig, 199 scope: RelayIngestScope, 200 fetcher: F, 201 ) -> Result<SyncActionView, RuntimeError> 202 where 203 F: FnOnce( 204 &[String], 205 RadrootsNostrFilter, 206 ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError>, 207 { 208 let snapshot = inspect_sync(config)?; 209 if snapshot.state == "unconfigured" { 210 return Ok(empty_action_from_snapshot(snapshot, "pull")); 211 } 212 213 if config.output.dry_run { 214 let mut view = empty_action_from_snapshot(snapshot, "pull"); 215 view.state = "ready".to_owned(); 216 view.reason = Some("dry run requested; relay fetch skipped".to_owned()); 217 view.target_relays = config.relay.urls.clone(); 218 view.fetched_count = Some(0); 219 view.ingested_count = Some(0); 220 view.publishable_count = None; 221 view.published_count = None; 222 view.skipped_count = Some(0); 223 view.unsupported_count = Some(0); 224 view.failed_count = Some(0); 225 view.reason_code = Some("dry_run".to_owned()); 226 view.actions = vec![scope.ready_action().to_owned()]; 227 return Ok(view); 228 } 229 230 let started_at = unix_now(); 231 let receipt = match fetcher(&config.relay.urls, scope.filter()) { 232 Ok(receipt) => receipt, 233 Err(DirectRelayFetchError::Connect { 234 reason, 235 target_relays, 236 failed_relays, 237 }) => { 238 let failed_relays = relay_failures(failed_relays); 239 let failure_reason = format!("direct relay connection failed: {reason}"); 240 let executor = SqliteExecutor::open(&config.local.replica_db_path)?; 241 migrations::run_all_up(&executor)?; 242 record_sync_run( 243 &executor, 244 &sync_record_from_failure( 245 scope, 246 &config.relay.urls, 247 target_relays.clone(), 248 failed_relays.clone(), 249 started_at, 250 failure_reason.clone(), 251 )?, 252 )?; 253 let mut view = empty_action_from_snapshot(snapshot, "pull"); 254 view.state = "unavailable".to_owned(); 255 view.reason = Some(failure_reason); 256 view.reason_code = Some("relay_fetch_failed".to_owned()); 257 view.target_relays = target_relays; 258 view.failed_relays = failed_relays; 259 view.freshness = freshness_for_scope_from_executor(config, &executor, scope)?; 260 return Ok(view); 261 } 262 Err(error) => { 263 let failure_reason = error.to_string(); 264 let executor = SqliteExecutor::open(&config.local.replica_db_path)?; 265 migrations::run_all_up(&executor)?; 266 record_sync_run( 267 &executor, 268 &sync_record_from_failure( 269 scope, 270 &config.relay.urls, 271 config.relay.urls.clone(), 272 Vec::new(), 273 started_at, 274 failure_reason.clone(), 275 )?, 276 )?; 277 let mut view = empty_action_from_snapshot(snapshot, "pull"); 278 view.state = "unavailable".to_owned(); 279 view.reason = Some(failure_reason); 280 view.reason_code = Some("relay_fetch_failed".to_owned()); 281 view.target_relays = config.relay.urls.clone(); 282 view.freshness = freshness_for_scope_from_executor(config, &executor, scope)?; 283 return Ok(view); 284 } 285 }; 286 287 let executor = SqliteExecutor::open(&config.local.replica_db_path)?; 288 migrations::run_all_up(&executor)?; 289 let ingest = ingest_events(&executor, &receipt, scope)?; 290 record_sync_run( 291 &executor, 292 &sync_record_from_ingest(scope, &config.relay.urls, &receipt, &ingest, started_at)?, 293 )?; 294 let failed_relays = relay_failures(receipt.failed_relays); 295 let failed_count = ingest.failed_count + failed_relays.len(); 296 let reason_code = relay_ingest_reason_code(&ingest, &failed_relays).map(str::to_owned); 297 let reason = relay_ingest_reason(&ingest, &failed_relays); 298 let freshness = freshness_for_scope_from_executor(config, &executor, scope)?; 299 let queue = radroots_replica_sync_status(&executor)?; 300 301 Ok(SyncActionView { 302 direction: "pull".to_owned(), 303 state: "ready".to_owned(), 304 source: INGEST_SOURCE.to_owned(), 305 local_root: config.local.root.display().to_string(), 306 replica_db: "ready".to_owned(), 307 relay_count: config.relay.urls.len(), 308 publish_policy: config.relay.publish_policy.as_str().to_owned(), 309 freshness, 310 queue: legacy_sync_queue(queue.expected_count, queue.pending_count), 311 target_relays: receipt.target_relays, 312 connected_relays: receipt.connected_relays, 313 acknowledged_relays: Vec::new(), 314 failed_relays, 315 fetched_count: Some(ingest.fetched_count), 316 ingested_count: Some(ingest.ingested_count), 317 publishable_count: None, 318 published_count: None, 319 skipped_count: Some(ingest.skipped_count), 320 unsupported_count: Some(ingest.unsupported_count), 321 failed_count: Some(failed_count), 322 publish_plan: None, 323 reason_code, 324 reason, 325 actions: vec![scope.ready_action().to_owned()], 326 }) 327 } 328 329 pub fn push(config: &RuntimeConfig) -> Result<SyncActionView, CliSdkAdapterError> { 330 let session = CliSdkSession::connect(config)?; 331 if config.output.dry_run { 332 let status = session.block_on(session.sdk().sync().status(SyncStatusRequest::new()))?; 333 return Ok(sdk_push_dry_run_view(config, status)); 334 } 335 336 let receipt = session.block_on(session.sdk().sync().push_outbox( 337 PushOutboxRequest::new().with_relay_url_policy(sdk_relay_url_policy(config)), 338 ))?; 339 let status = session.block_on(session.sdk().sync().status(SyncStatusRequest::new()))?; 340 Ok(sdk_push_view(config, receipt, status)) 341 } 342 343 pub fn watch(config: &RuntimeConfig, args: &SyncWatchArgs) -> Result<SyncWatchView, RuntimeError> { 344 if args.frames == 0 { 345 return Err(RuntimeError::Config( 346 "`sync watch --frames` must be greater than 0".to_owned(), 347 )); 348 } 349 350 let mut frames = Vec::with_capacity(args.frames); 351 let mut last_snapshot = None; 352 353 for index in 0..args.frames { 354 let snapshot = inspect_sync(config)?; 355 frames.push(SyncWatchFrameView { 356 sequence: index + 1, 357 observed_at: unix_now(), 358 state: snapshot.state.clone(), 359 relay_count: snapshot.relay_count, 360 freshness: snapshot.freshness.clone(), 361 queue: snapshot.queue.clone(), 362 }); 363 last_snapshot = Some(snapshot); 364 365 if index + 1 < args.frames { 366 thread::sleep(Duration::from_millis(args.interval_ms)); 367 } 368 } 369 370 let snapshot = last_snapshot.expect("watch frames are non-empty"); 371 Ok(SyncWatchView { 372 state: snapshot.state, 373 source: snapshot.source, 374 interval_ms: args.interval_ms, 375 frames, 376 reason: snapshot.reason, 377 actions: snapshot.actions, 378 }) 379 } 380 381 fn empty_action_from_snapshot(snapshot: SyncSnapshot, direction: &str) -> SyncActionView { 382 SyncActionView { 383 direction: direction.to_owned(), 384 state: snapshot.state, 385 source: snapshot.source, 386 local_root: snapshot.local_root, 387 replica_db: snapshot.replica_db, 388 relay_count: snapshot.relay_count, 389 publish_policy: snapshot.publish_policy, 390 freshness: snapshot.freshness, 391 queue: snapshot.queue, 392 target_relays: Vec::new(), 393 connected_relays: Vec::new(), 394 acknowledged_relays: Vec::new(), 395 failed_relays: Vec::new(), 396 fetched_count: None, 397 ingested_count: None, 398 publishable_count: None, 399 published_count: None, 400 skipped_count: None, 401 unsupported_count: None, 402 failed_count: None, 403 publish_plan: None, 404 reason_code: None, 405 reason: snapshot.reason, 406 actions: snapshot.actions, 407 } 408 } 409 410 fn sdk_sync_status_view(config: &RuntimeConfig, receipt: SyncStatusReceipt) -> SyncStatusView { 411 let actions = sdk_sync_status_actions(&receipt); 412 let relay_count = receipt.relay_targets.configured_count; 413 SyncStatusView { 414 state: "ready".to_owned(), 415 source: SDK_SYNC_SOURCE.to_owned(), 416 local_root: config.local.root.display().to_string(), 417 replica_db: "legacy_derived_not_checked".to_owned(), 418 relay_count, 419 publish_policy: config.relay.publish_policy.as_str().to_owned(), 420 freshness: sdk_sync_freshness(&receipt), 421 queue: sdk_sync_queue(&receipt), 422 reason: None, 423 actions, 424 } 425 } 426 427 fn sdk_push_dry_run_view(config: &RuntimeConfig, status: SyncStatusReceipt) -> SyncActionView { 428 let publishable_count = usize_from_i64(status.outbox.ready_signed_events); 429 let state = if publishable_count > 0 { 430 "dry_run" 431 } else { 432 "ready" 433 }; 434 let reason = if publishable_count > 0 { 435 Some("dry run requested; SDK outbox push skipped".to_owned()) 436 } else if status.outbox.total_events > 0 { 437 Some("SDK outbox has no ready signed events to push".to_owned()) 438 } else { 439 None 440 }; 441 sdk_push_action_view( 442 config, 443 state, 444 sdk_sync_queue(&status), 445 sdk_sync_freshness(&status), 446 status.relay_targets.configured_relays, 447 Vec::new(), 448 Vec::new(), 449 Vec::new(), 450 publishable_count, 451 0, 452 0, 453 Some(0), 454 reason, 455 sdk_sync_push_actions(state, publishable_count > 0), 456 ) 457 } 458 459 fn sdk_push_view( 460 config: &RuntimeConfig, 461 receipt: PushOutboxReceipt, 462 status: SyncStatusReceipt, 463 ) -> SyncActionView { 464 let failed_count = receipt.retryable_events + receipt.terminal_events; 465 let state = if receipt.attempted_events == 0 { 466 "ready" 467 } else if receipt.published_events > 0 && failed_count > 0 { 468 "partial" 469 } else if failed_count > 0 { 470 "unavailable" 471 } else if receipt.published_events > 0 { 472 "published" 473 } else { 474 "ready" 475 }; 476 let reason = sdk_push_reason(&receipt, failed_count); 477 sdk_push_action_view( 478 config, 479 state, 480 sdk_sync_queue(&status), 481 sdk_sync_freshness(&status), 482 sdk_push_target_relays(&receipt, &status), 483 sdk_push_connected_relays(&receipt), 484 sdk_push_acknowledged_relays(&receipt), 485 sdk_push_failed_relays(&receipt), 486 receipt.attempted_events, 487 receipt.published_events, 488 failed_count, 489 Some(0), 490 reason, 491 sdk_sync_push_actions(state, failed_count > 0), 492 ) 493 } 494 495 fn sdk_push_action_view( 496 config: &RuntimeConfig, 497 state: &str, 498 queue: SyncQueueView, 499 freshness: SyncFreshnessView, 500 target_relays: Vec<String>, 501 connected_relays: Vec<String>, 502 acknowledged_relays: Vec<String>, 503 failed_relays: Vec<RelayFailureView>, 504 publishable_count: usize, 505 published_count: usize, 506 failed_count: usize, 507 skipped_count: Option<usize>, 508 reason: Option<String>, 509 actions: Vec<String>, 510 ) -> SyncActionView { 511 SyncActionView { 512 direction: "push".to_owned(), 513 state: state.to_owned(), 514 source: SDK_PUSH_SOURCE.to_owned(), 515 local_root: config.local.root.display().to_string(), 516 replica_db: "legacy_derived_not_checked".to_owned(), 517 relay_count: config.relay.urls.len(), 518 publish_policy: config.relay.publish_policy.as_str().to_owned(), 519 freshness, 520 queue, 521 target_relays, 522 connected_relays, 523 acknowledged_relays, 524 failed_relays, 525 fetched_count: None, 526 ingested_count: None, 527 publishable_count: Some(publishable_count), 528 published_count: Some(published_count), 529 skipped_count, 530 unsupported_count: Some(0), 531 failed_count: Some(failed_count), 532 publish_plan: None, 533 reason_code: sdk_sync_push_reason_code(state).map(str::to_owned), 534 reason, 535 actions, 536 } 537 } 538 539 fn sdk_sync_status_actions(receipt: &SyncStatusReceipt) -> Vec<String> { 540 let mut actions = Vec::new(); 541 if receipt.outbox.ready_signed_events > 0 { 542 actions.push(SYNC_PUSH_ACTION.to_owned()); 543 } 544 if receipt.event_store.total_events == 0 { 545 actions.push(SYNC_PULL_ACTION.to_owned()); 546 } 547 actions 548 } 549 550 fn sdk_sync_push_actions(state: &str, retryable: bool) -> Vec<String> { 551 match state { 552 "published" | "ready" => vec!["radroots sync status get".to_owned()], 553 "dry_run" | "partial" | "unavailable" if retryable => { 554 vec![ 555 SYNC_PUSH_ACTION.to_owned(), 556 "radroots sync status get".to_owned(), 557 ] 558 } 559 _ => vec!["radroots sync status get".to_owned()], 560 } 561 } 562 563 fn sdk_sync_push_reason_code(state: &str) -> Option<&'static str> { 564 match state { 565 "dry_run" => Some("dry_run"), 566 "partial" => Some("sdk_outbox_push_partial"), 567 "unavailable" => Some("sdk_outbox_push_failed"), 568 _ => None, 569 } 570 } 571 572 fn sdk_push_reason(receipt: &PushOutboxReceipt, failed_count: usize) -> Option<String> { 573 if receipt.attempted_events == 0 { 574 return Some("SDK outbox had no ready signed events to push".to_owned()); 575 } 576 if failed_count > 0 && receipt.published_events > 0 { 577 return Some(format!( 578 "SDK outbox push published {} event(s); {failed_count} event(s) remain retryable or terminal", 579 receipt.published_events 580 )); 581 } 582 if failed_count > 0 { 583 return Some( 584 "SDK outbox push did not reach accepted quorum for any ready event".to_owned(), 585 ); 586 } 587 None 588 } 589 590 fn sdk_sync_queue(receipt: &SyncStatusReceipt) -> SyncQueueView { 591 let pending_count = usize_from_i64( 592 receipt 593 .outbox 594 .pending_events 595 .saturating_add(receipt.outbox.retryable_events), 596 ); 597 SyncQueueView { 598 expected_count: usize_from_i64(receipt.outbox.total_events), 599 pending_count, 600 total_count: Some(usize_from_i64(receipt.outbox.total_events)), 601 retryable_count: Some(usize_from_i64(receipt.outbox.retryable_events)), 602 terminal_count: Some(usize_from_i64(receipt.outbox.terminal_events)), 603 failed_terminal_count: Some(usize_from_i64(receipt.outbox.failed_terminal_events)), 604 ready_signed_count: Some(usize_from_i64(receipt.outbox.ready_signed_events)), 605 publishing_count: Some(usize_from_i64(receipt.outbox.publishing_events)), 606 last_attempt_at_ms: receipt.outbox.last_attempt_at_ms, 607 last_error: receipt.outbox.last_error.clone(), 608 } 609 } 610 611 fn legacy_sync_queue(expected_count: usize, pending_count: usize) -> SyncQueueView { 612 SyncQueueView { 613 expected_count, 614 pending_count, 615 total_count: None, 616 retryable_count: None, 617 terminal_count: None, 618 failed_terminal_count: None, 619 ready_signed_count: None, 620 publishing_count: None, 621 last_attempt_at_ms: None, 622 last_error: None, 623 } 624 } 625 626 fn sdk_sync_freshness(receipt: &SyncStatusReceipt) -> SyncFreshnessView { 627 let Some(last_event_updated_at_ms) = receipt.event_store.last_event_updated_at_ms else { 628 return missing_freshness(); 629 }; 630 let last_event_at = u64::try_from(last_event_updated_at_ms / 1_000).unwrap_or(0); 631 let observed_at = u64::try_from(receipt.observed_at_ms / 1_000).unwrap_or_else(|_| unix_now()); 632 let age_seconds = observed_at.saturating_sub(last_event_at); 633 SyncFreshnessView { 634 state: "synced".to_owned(), 635 display: format!("SDK event store updated {}", relative_age(age_seconds)), 636 age_seconds: Some(age_seconds), 637 last_event_at: Some(last_event_at), 638 run: None, 639 } 640 } 641 642 fn sdk_push_target_relays(receipt: &PushOutboxReceipt, status: &SyncStatusReceipt) -> Vec<String> { 643 let mut relays = Vec::new(); 644 for relay in receipt.events.iter().flat_map(|event| event.relays.iter()) { 645 if !relays.contains(&relay.relay_url) { 646 relays.push(relay.relay_url.clone()); 647 } 648 } 649 if relays.is_empty() { 650 relays.extend(status.relay_targets.configured_relays.clone()); 651 } 652 relays 653 } 654 655 fn sdk_push_connected_relays(receipt: &PushOutboxReceipt) -> Vec<String> { 656 sdk_push_relays_matching(receipt, |_, relay| relay.attempted) 657 } 658 659 fn sdk_push_acknowledged_relays(receipt: &PushOutboxReceipt) -> Vec<String> { 660 sdk_push_relays_matching(receipt, |_, relay| sdk_relay_accepted(relay.outcome_kind)) 661 } 662 663 fn sdk_push_relays_matching( 664 receipt: &PushOutboxReceipt, 665 predicate: impl Fn(&PushOutboxEventReceipt, &radroots_sdk::PushOutboxRelayReceipt) -> bool, 666 ) -> Vec<String> { 667 let mut relays = Vec::new(); 668 for event in &receipt.events { 669 for relay in &event.relays { 670 if predicate(event, relay) && !relays.contains(&relay.relay_url) { 671 relays.push(relay.relay_url.clone()); 672 } 673 } 674 } 675 relays 676 } 677 678 fn sdk_push_failed_relays(receipt: &PushOutboxReceipt) -> Vec<RelayFailureView> { 679 receipt 680 .events 681 .iter() 682 .flat_map(|event| event.relays.iter()) 683 .filter(|relay| !sdk_relay_accepted(relay.outcome_kind)) 684 .map(|relay| RelayFailureView { 685 relay: relay.relay_url.clone(), 686 reason: relay 687 .message 688 .clone() 689 .unwrap_or_else(|| sdk_relay_outcome_kind(relay.outcome_kind).to_owned()), 690 }) 691 .collect() 692 } 693 694 fn sdk_relay_accepted(kind: PushOutboxRelayOutcomeKind) -> bool { 695 matches!( 696 kind, 697 PushOutboxRelayOutcomeKind::Accepted | PushOutboxRelayOutcomeKind::DuplicateAccepted 698 ) 699 } 700 701 fn sdk_relay_outcome_kind(kind: PushOutboxRelayOutcomeKind) -> &'static str { 702 match kind { 703 PushOutboxRelayOutcomeKind::Accepted => "accepted", 704 PushOutboxRelayOutcomeKind::DuplicateAccepted => "duplicate_accepted", 705 PushOutboxRelayOutcomeKind::Blocked => "blocked", 706 PushOutboxRelayOutcomeKind::RateLimited => "rate_limited", 707 PushOutboxRelayOutcomeKind::Invalid => "invalid", 708 PushOutboxRelayOutcomeKind::PowRequired => "pow_required", 709 PushOutboxRelayOutcomeKind::Restricted => "restricted", 710 PushOutboxRelayOutcomeKind::AuthRequired => "auth_required", 711 PushOutboxRelayOutcomeKind::Error => "error", 712 PushOutboxRelayOutcomeKind::Timeout => "timeout", 713 PushOutboxRelayOutcomeKind::ConnectionFailed => "connection_failed", 714 PushOutboxRelayOutcomeKind::Unknown => "unknown", 715 _ => "unknown", 716 } 717 } 718 719 fn usize_from_i64(value: i64) -> usize { 720 usize::try_from(value.max(0)).unwrap_or(usize::MAX) 721 } 722 723 fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> { 724 if !config.local.replica_db_path.exists() { 725 return Ok(SyncSnapshot { 726 state: "unconfigured".to_owned(), 727 source: SYNC_SOURCE.to_owned(), 728 local_root: config.local.root.display().to_string(), 729 replica_db: "missing".to_owned(), 730 relay_count: config.relay.urls.len(), 731 publish_policy: config.relay.publish_policy.as_str().to_owned(), 732 freshness: missing_freshness(), 733 queue: legacy_sync_queue(0, 0), 734 reason: Some("local replica database is not initialized".to_owned()), 735 actions: vec!["radroots store init".to_owned()], 736 }); 737 } 738 739 let executor = SqliteExecutor::open(&config.local.replica_db_path)?; 740 migrations::run_all_up(&executor)?; 741 let queue = radroots_replica_sync_status(&executor)?; 742 let freshness = 743 freshness_for_scope_from_executor(config, &executor, RelayIngestScope::SyncPull)?; 744 let relay_count = config.relay.urls.len(); 745 let publish_policy = config.relay.publish_policy.as_str().to_owned(); 746 let mut actions = Vec::new(); 747 748 if relay_count == 0 { 749 actions.push(RELAY_PULL_SETUP_ACTION.to_owned()); 750 return Ok(SyncSnapshot { 751 state: "unconfigured".to_owned(), 752 source: SYNC_SOURCE.to_owned(), 753 local_root: config.local.root.display().to_string(), 754 replica_db: "ready".to_owned(), 755 relay_count, 756 publish_policy, 757 freshness, 758 queue: legacy_sync_queue(queue.expected_count, queue.pending_count), 759 reason: Some("no relays are configured for this operator session".to_owned()), 760 actions, 761 }); 762 } 763 764 actions.push(SYNC_PULL_ACTION.to_owned()); 765 if queue.pending_count > 0 { 766 actions.push(SYNC_PUSH_ACTION.to_owned()); 767 } 768 769 Ok(SyncSnapshot { 770 state: "ready".to_owned(), 771 source: SYNC_SOURCE.to_owned(), 772 local_root: config.local.root.display().to_string(), 773 replica_db: "ready".to_owned(), 774 relay_count, 775 publish_policy, 776 freshness, 777 queue: legacy_sync_queue(queue.expected_count, queue.pending_count), 778 reason: None, 779 actions, 780 }) 781 } 782 783 pub(crate) fn missing_freshness() -> SyncFreshnessView { 784 SyncFreshnessView { 785 state: "never".to_owned(), 786 display: "never synced".to_owned(), 787 age_seconds: None, 788 last_event_at: None, 789 run: None, 790 } 791 } 792 793 pub(crate) fn freshness_for_scope( 794 config: &RuntimeConfig, 795 scope: RelayIngestScope, 796 ) -> Result<SyncFreshnessView, RuntimeError> { 797 let executor = SqliteExecutor::open(&config.local.replica_db_path)?; 798 migrations::run_all_up(&executor)?; 799 freshness_for_scope_from_executor(config, &executor, scope) 800 } 801 802 pub(crate) fn relay_provenance_relays_for_scope( 803 config: &RuntimeConfig, 804 scope: RelayIngestScope, 805 ) -> Result<Vec<String>, RuntimeError> { 806 if !config.local.replica_db_path.exists() { 807 return Ok(Vec::new()); 808 } 809 let executor = SqliteExecutor::open(&config.local.replica_db_path)?; 810 migrations::run_all_up(&executor)?; 811 ensure_sync_run_table(&executor)?; 812 let current_fingerprint = relay_set_fingerprint(&config.relay.urls); 813 let Some(run) = latest_sync_run(&executor, scope)? else { 814 return Ok(Vec::new()); 815 }; 816 if run.relay_set_fingerprint != current_fingerprint || !sync_run_successful(&run) { 817 return Ok(Vec::new()); 818 } 819 let mut relays: Vec<String> = serde_json::from_str(run.connected_relays_json.as_str())?; 820 relays.sort(); 821 relays.dedup(); 822 Ok(relays) 823 } 824 825 pub(crate) fn freshness_for_scope_from_executor( 826 config: &RuntimeConfig, 827 executor: &SqliteExecutor, 828 scope: RelayIngestScope, 829 ) -> Result<SyncFreshnessView, RuntimeError> { 830 let last_event_at = ReplicaSql::new(executor).nostr_event_last_created_at()?; 831 let now = unix_now(); 832 let age_seconds = last_event_at.map(|last_event_at| now.saturating_sub(last_event_at)); 833 ensure_sync_run_table(executor)?; 834 let current_fingerprint = relay_set_fingerprint(&config.relay.urls); 835 let latest = latest_sync_run(executor, scope)?; 836 let current = latest 837 .as_ref() 838 .filter(|run| run.relay_set_fingerprint == current_fingerprint); 839 let last_success = current.filter(|run| sync_run_successful(run)); 840 let state = freshness_state(scope, latest.as_ref(), current, last_success, age_seconds); 841 let display = freshness_display(scope, state.as_str(), age_seconds, current); 842 843 Ok(SyncFreshnessView { 844 state, 845 display, 846 age_seconds, 847 last_event_at, 848 run: latest.map(|run| sync_run_freshness_view(scope, run, current_fingerprint)), 849 }) 850 } 851 852 pub(crate) fn freshness_requires_refresh(freshness: &SyncFreshnessView) -> bool { 853 matches!( 854 freshness.state.as_str(), 855 "never" | "stale" | "relay_set_changed" | "refresh_failed" 856 ) 857 } 858 859 fn freshness_state( 860 scope: RelayIngestScope, 861 latest: Option<&SyncRunRecord>, 862 current: Option<&SyncRunRecord>, 863 last_success: Option<&SyncRunRecord>, 864 age_seconds: Option<u64>, 865 ) -> String { 866 let Some(latest) = latest else { 867 return "never".to_owned(); 868 }; 869 let Some(current) = current else { 870 return "relay_set_changed".to_owned(); 871 }; 872 if !sync_run_successful(current) { 873 return "refresh_failed".to_owned(); 874 } 875 if last_success.is_none() { 876 return "refresh_failed".to_owned(); 877 } 878 if age_seconds.is_none() { 879 return "fresh".to_owned(); 880 } 881 if age_seconds.unwrap_or_default() > scope.stale_after_seconds() { 882 return "stale".to_owned(); 883 } 884 if latest.state == "partial" { 885 return "partial".to_owned(); 886 } 887 "fresh".to_owned() 888 } 889 890 fn freshness_display( 891 scope: RelayIngestScope, 892 state: &str, 893 age_seconds: Option<u64>, 894 run: Option<&SyncRunRecord>, 895 ) -> String { 896 match state { 897 "fresh" => match age_seconds { 898 Some(age_seconds) => format!("{} fresh {}", scope.display(), relative_age(age_seconds)), 899 None => format!("{} fresh; no market events yet", scope.display()), 900 }, 901 "partial" => match age_seconds { 902 Some(age_seconds) => format!( 903 "{} partially refreshed {}", 904 scope.display(), 905 relative_age(age_seconds) 906 ), 907 None => format!( 908 "{} partially refreshed; no market events yet", 909 scope.display() 910 ), 911 }, 912 "stale" => match age_seconds { 913 Some(age_seconds) => format!("{} stale {}", scope.display(), relative_age(age_seconds)), 914 None => format!("{} stale", scope.display()), 915 }, 916 "relay_set_changed" => format!("{} relay set changed; refresh required", scope.display()), 917 "refresh_failed" => run 918 .and_then(|run| run.failure_reason.clone()) 919 .unwrap_or_else(|| format!("{} refresh failed", scope.display())), 920 _ => format!("{} never synced", scope.display()), 921 } 922 } 923 924 fn sync_run_successful(run: &SyncRunRecord) -> bool { 925 matches!(run.state.as_str(), "success" | "partial") 926 } 927 928 fn sync_run_freshness_view( 929 scope: RelayIngestScope, 930 run: SyncRunRecord, 931 current_fingerprint: String, 932 ) -> SyncRunFreshnessView { 933 let relay_set_current = run.relay_set_fingerprint == current_fingerprint; 934 let successful = sync_run_successful(&run); 935 let last_successful_at = successful.then_some(run.completed_at.unwrap_or(run.started_at)); 936 SyncRunFreshnessView { 937 scope: run.scope, 938 relay_set_fingerprint: run.relay_set_fingerprint, 939 relay_set_current, 940 last_state: run.state, 941 last_attempted_at: Some(run.started_at), 942 last_successful_at, 943 last_completed_at: run.completed_at, 944 stale_after_seconds: Some(scope.stale_after_seconds()), 945 fetched_count: Some(run.fetched_count), 946 ingested_count: Some(run.ingested_count), 947 skipped_count: Some(run.skipped_count), 948 unsupported_count: Some(run.unsupported_count), 949 failed_count: Some(run.failed_count), 950 failure_reason: run.failure_reason, 951 } 952 } 953 954 pub(crate) fn ensure_sync_run_table(executor: &SqliteExecutor) -> Result<(), RuntimeError> { 955 executor.exec( 956 "CREATE TABLE IF NOT EXISTS radroots_cli_sync_run ( 957 id INTEGER PRIMARY KEY AUTOINCREMENT, 958 scope TEXT NOT NULL, 959 relay_set_fingerprint TEXT NOT NULL, 960 target_relays_json TEXT NOT NULL, 961 connected_relays_json TEXT NOT NULL, 962 failed_relays_json TEXT NOT NULL, 963 started_at INTEGER NOT NULL, 964 completed_at INTEGER, 965 state TEXT NOT NULL, 966 fetched_count INTEGER NOT NULL, 967 ingested_count INTEGER NOT NULL, 968 skipped_count INTEGER NOT NULL, 969 unsupported_count INTEGER NOT NULL, 970 failed_count INTEGER NOT NULL, 971 failure_reason TEXT 972 ); 973 CREATE INDEX IF NOT EXISTS idx_radroots_cli_sync_run_scope_started 974 ON radroots_cli_sync_run(scope, started_at DESC);", 975 "[]", 976 )?; 977 Ok(()) 978 } 979 980 fn latest_sync_run( 981 executor: &SqliteExecutor, 982 scope: RelayIngestScope, 983 ) -> Result<Option<SyncRunRecord>, RuntimeError> { 984 let rows = executor.query_raw( 985 &format!( 986 "SELECT scope, 987 relay_set_fingerprint, 988 target_relays_json, 989 connected_relays_json, 990 failed_relays_json, 991 started_at, 992 completed_at, 993 state, 994 fetched_count, 995 ingested_count, 996 skipped_count, 997 unsupported_count, 998 failed_count, 999 failure_reason 1000 FROM {SYNC_RUN_TABLE} 1001 WHERE scope = ?1 1002 ORDER BY started_at DESC, id DESC 1003 LIMIT 1" 1004 ), 1005 json!([scope.id()]).to_string().as_str(), 1006 )?; 1007 let mut rows: Vec<SyncRunRow> = serde_json::from_str(rows.as_str())?; 1008 Ok(rows.pop().map(sync_run_record_from_row)) 1009 } 1010 1011 fn sync_run_record_from_row(row: SyncRunRow) -> SyncRunRecord { 1012 SyncRunRecord { 1013 scope: row.scope, 1014 relay_set_fingerprint: row.relay_set_fingerprint, 1015 target_relays_json: row.target_relays_json, 1016 connected_relays_json: row.connected_relays_json, 1017 failed_relays_json: row.failed_relays_json, 1018 started_at: u64_from_db(row.started_at), 1019 completed_at: row.completed_at.map(u64_from_db), 1020 state: row.state, 1021 fetched_count: usize_from_db(row.fetched_count), 1022 ingested_count: usize_from_db(row.ingested_count), 1023 skipped_count: usize_from_db(row.skipped_count), 1024 unsupported_count: usize_from_db(row.unsupported_count), 1025 failed_count: usize_from_db(row.failed_count), 1026 failure_reason: row.failure_reason, 1027 } 1028 } 1029 1030 fn record_sync_run(executor: &SqliteExecutor, record: &SyncRunRecord) -> Result<(), RuntimeError> { 1031 ensure_sync_run_table(executor)?; 1032 executor.exec( 1033 &format!( 1034 "INSERT INTO {SYNC_RUN_TABLE} ( 1035 scope, 1036 relay_set_fingerprint, 1037 target_relays_json, 1038 connected_relays_json, 1039 failed_relays_json, 1040 started_at, 1041 completed_at, 1042 state, 1043 fetched_count, 1044 ingested_count, 1045 skipped_count, 1046 unsupported_count, 1047 failed_count, 1048 failure_reason 1049 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)" 1050 ), 1051 json!([ 1052 record.scope.as_str(), 1053 record.relay_set_fingerprint.as_str(), 1054 record.target_relays_json.as_str(), 1055 record.connected_relays_json.as_str(), 1056 record.failed_relays_json.as_str(), 1057 i64_from_u64(record.started_at), 1058 record.completed_at.map(i64_from_u64), 1059 record.state.as_str(), 1060 i64_from_usize(record.fetched_count), 1061 i64_from_usize(record.ingested_count), 1062 i64_from_usize(record.skipped_count), 1063 i64_from_usize(record.unsupported_count), 1064 i64_from_usize(record.failed_count), 1065 record.failure_reason.as_deref(), 1066 ]) 1067 .to_string() 1068 .as_str(), 1069 )?; 1070 Ok(()) 1071 } 1072 1073 fn sync_record_from_failure( 1074 scope: RelayIngestScope, 1075 relays: &[String], 1076 target_relays: Vec<String>, 1077 failed_relays: Vec<RelayFailureView>, 1078 started_at: u64, 1079 reason: String, 1080 ) -> Result<SyncRunRecord, RuntimeError> { 1081 Ok(SyncRunRecord { 1082 scope: scope.id().to_owned(), 1083 relay_set_fingerprint: relay_set_fingerprint(relays), 1084 target_relays_json: serde_json::to_string(&target_relays)?, 1085 connected_relays_json: serde_json::to_string(&Vec::<String>::new())?, 1086 failed_relays_json: serde_json::to_string(&failed_relays)?, 1087 started_at, 1088 completed_at: Some(unix_now()), 1089 state: "failed".to_owned(), 1090 fetched_count: 0, 1091 ingested_count: 0, 1092 skipped_count: 0, 1093 unsupported_count: 0, 1094 failed_count: 1, 1095 failure_reason: Some(reason), 1096 }) 1097 } 1098 1099 fn sync_record_from_ingest( 1100 scope: RelayIngestScope, 1101 relays: &[String], 1102 receipt: &DirectRelayFetchReceipt, 1103 ingest: &RelayIngestCounts, 1104 started_at: u64, 1105 ) -> Result<SyncRunRecord, RuntimeError> { 1106 let failed_relays = relay_failures(receipt.failed_relays.clone()); 1107 let state = if ingest.failed_count > 0 || !failed_relays.is_empty() { 1108 "partial" 1109 } else { 1110 "success" 1111 }; 1112 Ok(SyncRunRecord { 1113 scope: scope.id().to_owned(), 1114 relay_set_fingerprint: relay_set_fingerprint(relays), 1115 target_relays_json: serde_json::to_string(&receipt.target_relays)?, 1116 connected_relays_json: serde_json::to_string(&receipt.connected_relays)?, 1117 failed_relays_json: serde_json::to_string(&failed_relays)?, 1118 started_at, 1119 completed_at: Some(unix_now()), 1120 state: state.to_owned(), 1121 fetched_count: ingest.fetched_count, 1122 ingested_count: ingest.ingested_count, 1123 skipped_count: ingest.skipped_count, 1124 unsupported_count: ingest.unsupported_count, 1125 failed_count: ingest.failed_count + failed_relays.len(), 1126 failure_reason: ingest.reason(), 1127 }) 1128 } 1129 1130 fn relay_set_fingerprint(relays: &[String]) -> String { 1131 let mut normalized = relays 1132 .iter() 1133 .map(|relay| relay.trim().to_ascii_lowercase()) 1134 .filter(|relay| !relay.is_empty()) 1135 .collect::<Vec<_>>(); 1136 normalized.sort(); 1137 normalized.dedup(); 1138 let mut hash = 0xcbf29ce484222325_u64; 1139 for relay in normalized { 1140 for byte in relay.as_bytes() { 1141 hash ^= u64::from(*byte); 1142 hash = hash.wrapping_mul(0x100000001b3); 1143 } 1144 hash ^= 0xff; 1145 hash = hash.wrapping_mul(0x100000001b3); 1146 } 1147 format!("relayset_{hash:016x}") 1148 } 1149 1150 fn u64_from_db(value: i64) -> u64 { 1151 u64::try_from(value).unwrap_or_default() 1152 } 1153 1154 fn usize_from_db(value: i64) -> usize { 1155 usize::try_from(value).unwrap_or_default() 1156 } 1157 1158 fn i64_from_u64(value: u64) -> i64 { 1159 i64::try_from(value).unwrap_or(i64::MAX) 1160 } 1161 1162 fn i64_from_usize(value: usize) -> i64 { 1163 i64::try_from(value).unwrap_or(i64::MAX) 1164 } 1165 1166 #[derive(Debug, Clone, Default)] 1167 struct RelayIngestCounts { 1168 fetched_count: usize, 1169 ingested_count: usize, 1170 skipped_count: usize, 1171 unsupported_count: usize, 1172 failed_count: usize, 1173 first_failure_reason: Option<String>, 1174 } 1175 1176 impl RelayIngestCounts { 1177 fn reason_code(&self) -> Option<&'static str> { 1178 if self.failed_count > 0 { 1179 Some("sync_ingest_failed") 1180 } else if self.skipped_count > 0 { 1181 Some("sync_no_overwrite") 1182 } else { 1183 None 1184 } 1185 } 1186 1187 fn reason(&self) -> Option<String> { 1188 if self.failed_count > 0 { 1189 return Some(match &self.first_failure_reason { 1190 Some(reason) => format!( 1191 "{} fetched event(s) failed ingest: {}", 1192 self.failed_count, reason 1193 ), 1194 None => format!("{} fetched event(s) failed ingest", self.failed_count), 1195 }); 1196 } 1197 if self.skipped_count > 0 { 1198 return Some(format!( 1199 "{} fetched event(s) skipped because the local replica already has current or newer state", 1200 self.skipped_count 1201 )); 1202 } 1203 None 1204 } 1205 } 1206 1207 fn relay_ingest_reason_code( 1208 ingest: &RelayIngestCounts, 1209 failed_relays: &[RelayFailureView], 1210 ) -> Option<&'static str> { 1211 ingest 1212 .reason_code() 1213 .or_else(|| (!failed_relays.is_empty()).then_some("relay_fetch_partial")) 1214 } 1215 1216 fn relay_ingest_reason( 1217 ingest: &RelayIngestCounts, 1218 failed_relays: &[RelayFailureView], 1219 ) -> Option<String> { 1220 let mut parts = Vec::new(); 1221 if let Some(reason) = ingest.reason() { 1222 parts.push(reason); 1223 } 1224 if !failed_relays.is_empty() { 1225 parts.push(format!( 1226 "{} relay(s) failed during fetch: {}", 1227 failed_relays.len(), 1228 relay_failure_reason(failed_relays) 1229 )); 1230 } 1231 1232 if parts.is_empty() { 1233 None 1234 } else { 1235 Some(parts.join("; ")) 1236 } 1237 } 1238 1239 fn relay_failure_reason(failed_relays: &[RelayFailureView]) -> String { 1240 failed_relays 1241 .iter() 1242 .map(|failure| format!("{}: {}", failure.relay, failure.reason)) 1243 .collect::<Vec<_>>() 1244 .join("; ") 1245 } 1246 1247 #[derive(Debug, Clone, Copy)] 1248 pub(crate) enum RelayIngestScope { 1249 SyncPull, 1250 MarketRefresh, 1251 } 1252 1253 impl RelayIngestScope { 1254 fn id(self) -> &'static str { 1255 match self { 1256 Self::SyncPull => "sync_pull", 1257 Self::MarketRefresh => "market_refresh", 1258 } 1259 } 1260 1261 fn display(self) -> &'static str { 1262 match self { 1263 Self::SyncPull => "sync pull", 1264 Self::MarketRefresh => "market refresh", 1265 } 1266 } 1267 1268 fn stale_after_seconds(self) -> u64 { 1269 match self { 1270 Self::SyncPull => SYNC_PULL_FRESHNESS_STALE_AFTER_SECONDS, 1271 Self::MarketRefresh => MARKET_FRESHNESS_STALE_AFTER_SECONDS, 1272 } 1273 } 1274 1275 fn kinds(self) -> &'static [u32] { 1276 match self { 1277 Self::SyncPull => SYNC_PULL_KINDS, 1278 Self::MarketRefresh => MARKET_REFRESH_KINDS, 1279 } 1280 } 1281 1282 fn filter(self) -> RadrootsNostrFilter { 1283 RadrootsNostrFilter::new() 1284 .kinds( 1285 self.kinds() 1286 .iter() 1287 .copied() 1288 .map(|kind| radroots_nostr_kind(kind as u16)), 1289 ) 1290 .limit(RELAY_FETCH_LIMIT) 1291 } 1292 1293 fn ready_action(self) -> &'static str { 1294 match self { 1295 Self::SyncPull => SYNC_READY_ACTION, 1296 Self::MarketRefresh => MARKET_READY_ACTION, 1297 } 1298 } 1299 1300 fn supports_kind(self, kind: u32) -> bool { 1301 self.kinds().contains(&kind) 1302 } 1303 } 1304 1305 fn ingest_events( 1306 executor: &SqliteExecutor, 1307 receipt: &DirectRelayFetchReceipt, 1308 scope: RelayIngestScope, 1309 ) -> Result<RelayIngestCounts, RuntimeError> { 1310 let mut counts = RelayIngestCounts { 1311 fetched_count: receipt.events.len(), 1312 ..RelayIngestCounts::default() 1313 }; 1314 1315 for event in &receipt.events { 1316 if !scope.supports_kind(event_kind(event)) { 1317 counts.unsupported_count += 1; 1318 continue; 1319 } 1320 let event = radroots_event_from_nostr(event); 1321 match radroots_replica_ingest_event(executor, &event) { 1322 Ok(RadrootsReplicaIngestOutcome::Applied) => counts.ingested_count += 1, 1323 Ok(RadrootsReplicaIngestOutcome::Skipped) => counts.skipped_count += 1, 1324 Err(error @ RadrootsReplicaEventsError::Sql(_)) => return Err(error.into()), 1325 Err(error) => { 1326 counts.failed_count += 1; 1327 if counts.first_failure_reason.is_none() { 1328 counts.first_failure_reason = Some(error.to_string()); 1329 } 1330 } 1331 } 1332 } 1333 1334 Ok(counts) 1335 } 1336 1337 fn event_kind(event: &radroots_nostr::prelude::RadrootsNostrEvent) -> u32 { 1338 u32::from(event.kind.as_u16()) 1339 } 1340 1341 fn relay_failures(failures: Vec<DirectRelayFailure>) -> Vec<RelayFailureView> { 1342 failures 1343 .into_iter() 1344 .map(|failure| RelayFailureView { 1345 relay: failure.relay, 1346 reason: failure.reason, 1347 }) 1348 .collect() 1349 } 1350 1351 fn merge_fetch_receipt( 1352 target: &mut Option<DirectRelayFetchReceipt>, 1353 receipt: DirectRelayFetchReceipt, 1354 ) { 1355 match target { 1356 Some(target) => { 1357 push_unique_many(&mut target.target_relays, receipt.target_relays.iter()); 1358 push_unique_many( 1359 &mut target.connected_relays, 1360 receipt.connected_relays.iter(), 1361 ); 1362 for failure in receipt.failed_relays { 1363 if !target 1364 .failed_relays 1365 .iter() 1366 .any(|existing| existing.relay == failure.relay) 1367 { 1368 target.failed_relays.push(failure); 1369 } 1370 } 1371 target.events.extend(receipt.events); 1372 } 1373 None => *target = Some(receipt), 1374 } 1375 } 1376 1377 fn push_unique_many<'a>(target: &mut Vec<String>, values: impl Iterator<Item = &'a String>) { 1378 for value in values { 1379 if !target.contains(value) { 1380 target.push(value.clone()); 1381 } 1382 } 1383 } 1384 1385 fn unix_now() -> u64 { 1386 SystemTime::now() 1387 .duration_since(UNIX_EPOCH) 1388 .map(|duration| duration.as_secs()) 1389 .unwrap_or(0) 1390 } 1391 1392 fn relative_age(age_seconds: u64) -> String { 1393 match age_seconds { 1394 0 => "now".to_owned(), 1395 1..=59 => format!("{age_seconds}s ago"), 1396 60..=3_599 => format!("{}m ago", age_seconds / 60), 1397 3_600..=86_399 => format!("{}h ago", age_seconds / 3_600), 1398 _ => format!("{}d ago", age_seconds / 86_400), 1399 } 1400 } 1401 1402 #[cfg(test)] 1403 mod tests { 1404 use std::path::{Path, PathBuf}; 1405 1406 use radroots_events::farm::{RadrootsFarm, RadrootsFarmRef}; 1407 use radroots_events::ids::RadrootsEventId; 1408 use radroots_events::kinds::{KIND_FARM, KIND_LIST_SET_GENERIC, KIND_LISTING, KIND_POST}; 1409 use radroots_events::list::RadrootsListEntry; 1410 use radroots_events::list_set::RadrootsListSet; 1411 use radroots_events::plot::RadrootsPlot; 1412 use radroots_events::profile::{RadrootsProfile, RadrootsProfileType}; 1413 use radroots_events_codec::farm::encode as farm_encode; 1414 use radroots_events_codec::list_set::encode as list_set_encode; 1415 use radroots_events_codec::plot::encode as plot_encode; 1416 use radroots_events_codec::profile::encode as profile_encode; 1417 use radroots_events_codec::wire::WireEventParts; 1418 use radroots_identity::RadrootsIdentity; 1419 use radroots_nostr::prelude::{ 1420 RadrootsNostrEvent, RadrootsNostrFilter, RadrootsNostrTimestamp, radroots_nostr_build_event, 1421 }; 1422 use radroots_runtime_paths::RadrootsMigrationReport; 1423 use radroots_sdk::{ 1424 PushOutboxEventReceipt, PushOutboxEventState, PushOutboxReceipt, 1425 PushOutboxRelayOutcomeKind, PushOutboxRelayReceipt, SyncEventStoreStatus, SyncOutboxStatus, 1426 SyncRelayTargetSummary, SyncStatusReceipt, SyncStatusSource, 1427 }; 1428 use radroots_secret_vault::RadrootsSecretBackend; 1429 use tempfile::tempdir; 1430 1431 use super::{ 1432 DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt, RelayIngestScope, 1433 freshness_for_scope, market_refresh_with_fetcher, pull_with_fetcher, 1434 relay_provenance_relays_for_scope, sdk_push_dry_run_view, sdk_push_view, 1435 sdk_sync_status_view, 1436 }; 1437 use crate::cli::global::{FindQueryArgs, RecordLookupArgs}; 1438 use crate::runtime::config::{ 1439 AccountConfig, AccountSecretContractConfig, HyfConfig, IdentityConfig, InteractionConfig, 1440 LocalConfig, LoggingConfig, MigrationConfig, MycConfig, OutputConfig, OutputFormat, 1441 PathsConfig, PublishConfig, PublishTransport, PublishTransportSource, RelayConfig, 1442 RelayConfigSource, RelayPublishPolicy, RpcConfig, RuntimeConfig, SignerBackend, 1443 SignerConfig, Verbosity, 1444 }; 1445 1446 const FARM_D_TAG: &str = "AAAAAAAAAAAAAAAAAAAAAA"; 1447 const PLOT_D_TAG: &str = "AAAAAAAAAAAAAAAAAAAAAQ"; 1448 const LISTING_D_TAG: &str = "AAAAAAAAAAAAAAAAAAAAAg"; 1449 1450 #[test] 1451 fn sync_pull_dry_run_skips_relay_fetch() { 1452 let dir = tempdir().expect("tempdir"); 1453 let mut config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); 1454 config.output.dry_run = true; 1455 crate::runtime::store::init(&config).expect("store init"); 1456 1457 let view = pull_with_fetcher(&config, |_, _| panic!("dry run must not fetch")) 1458 .expect("sync pull dry run"); 1459 1460 assert_eq!(view.state, "ready"); 1461 assert_eq!(view.target_relays, vec!["wss://relay.example.com"]); 1462 assert_eq!(view.fetched_count, Some(0)); 1463 assert_eq!(view.ingested_count, Some(0)); 1464 assert_eq!(view.skipped_count, Some(0)); 1465 assert_eq!(view.unsupported_count, Some(0)); 1466 assert_eq!(view.failed_count, Some(0)); 1467 } 1468 1469 #[test] 1470 fn sync_pull_no_relay_action_is_actionable() { 1471 let dir = tempdir().expect("tempdir"); 1472 let config = sample_config(dir.path(), Vec::new()); 1473 crate::runtime::store::init(&config).expect("store init"); 1474 1475 let view = pull_with_fetcher(&config, |_, _| { 1476 panic!("unconfigured sync pull must not fetch") 1477 }) 1478 .expect("sync pull unconfigured"); 1479 1480 assert_eq!(view.state, "unconfigured"); 1481 assert_eq!( 1482 view.actions, 1483 vec!["radroots --relay wss://relay.example.com sync pull"] 1484 ); 1485 } 1486 1487 #[test] 1488 fn sync_status_empty_sdk_store_reports_canonical_source() { 1489 let dir = tempdir().expect("tempdir"); 1490 let config = sample_config( 1491 dir.path(), 1492 vec![ 1493 "wss://relay-a.example.com".to_owned(), 1494 "wss://relay-b.example.com".to_owned(), 1495 ], 1496 ); 1497 1498 let view = sdk_sync_status_view( 1499 &config, 1500 sdk_status_receipt( 1501 0, 1502 0, 1503 0, 1504 0, 1505 0, 1506 0, 1507 0, 1508 0, 1509 None, 1510 None, 1511 &["wss://relay-a.example.com", "wss://relay-b.example.com"], 1512 ), 1513 ); 1514 1515 assert_eq!(view.state, "ready"); 1516 assert_eq!(view.source, "SDK canonical event store and outbox"); 1517 assert_eq!(view.replica_db, "legacy_derived_not_checked"); 1518 assert_eq!(view.relay_count, 2); 1519 assert_eq!(view.queue.total_count, Some(0)); 1520 assert_eq!(view.queue.pending_count, 0); 1521 assert_eq!(view.queue.retryable_count, Some(0)); 1522 assert_eq!(view.queue.terminal_count, Some(0)); 1523 assert_eq!(view.queue.ready_signed_count, Some(0)); 1524 assert_eq!(view.actions, vec!["radroots sync pull"]); 1525 } 1526 1527 #[test] 1528 fn sync_status_reports_sdk_pending_retryable_and_terminal_outbox_counts() { 1529 let dir = tempdir().expect("tempdir"); 1530 let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); 1531 1532 let view = sdk_sync_status_view( 1533 &config, 1534 sdk_status_receipt( 1535 3, 1536 4, 1537 1, 1538 1, 1539 2, 1540 1, 1541 1, 1542 0, 1543 Some(1_700_000_010_000), 1544 Some("auth-required: login".to_owned()), 1545 &["wss://relay.example.com"], 1546 ), 1547 ); 1548 1549 assert_eq!(view.state, "ready"); 1550 assert_eq!(view.queue.expected_count, 4); 1551 assert_eq!(view.queue.pending_count, 2); 1552 assert_eq!(view.queue.retryable_count, Some(1)); 1553 assert_eq!(view.queue.terminal_count, Some(2)); 1554 assert_eq!(view.queue.failed_terminal_count, Some(1)); 1555 assert_eq!(view.queue.ready_signed_count, Some(1)); 1556 assert_eq!(view.queue.last_attempt_at_ms, Some(1_700_000_010_000)); 1557 assert_eq!( 1558 view.queue.last_error.as_deref(), 1559 Some("auth-required: login") 1560 ); 1561 assert_eq!(view.actions, vec!["radroots sync push"]); 1562 } 1563 1564 #[test] 1565 fn sync_push_dry_run_reports_sdk_ready_outbox_plan() { 1566 let dir = tempdir().expect("tempdir"); 1567 let mut config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); 1568 config.output.dry_run = true; 1569 1570 let view = sdk_push_dry_run_view( 1571 &config, 1572 sdk_status_receipt( 1573 1, 1574 1, 1575 1, 1576 0, 1577 0, 1578 0, 1579 1, 1580 0, 1581 None, 1582 None, 1583 &["wss://relay.example.com"], 1584 ), 1585 ); 1586 1587 assert_eq!(view.state, "dry_run"); 1588 assert_eq!(view.source, "SDK outbox push"); 1589 assert_eq!(view.replica_db, "legacy_derived_not_checked"); 1590 assert_eq!(view.target_relays, vec!["wss://relay.example.com"]); 1591 assert_eq!(view.publishable_count, Some(1)); 1592 assert_eq!(view.published_count, Some(0)); 1593 assert_eq!(view.failed_count, Some(0)); 1594 assert_eq!(view.reason_code.as_deref(), Some("dry_run")); 1595 assert_eq!( 1596 view.reason.as_deref(), 1597 Some("dry run requested; SDK outbox push skipped") 1598 ); 1599 assert_eq!( 1600 view.actions, 1601 vec!["radroots sync push", "radroots sync status get"] 1602 ); 1603 assert!(view.publish_plan.is_none()); 1604 } 1605 1606 #[test] 1607 fn sync_push_empty_queue_reports_ready_sdk_state() { 1608 let dir = tempdir().expect("tempdir"); 1609 let config = sample_config(dir.path(), Vec::new()); 1610 1611 let view = sdk_push_view( 1612 &config, 1613 PushOutboxReceipt::default(), 1614 sdk_status_receipt(0, 0, 0, 0, 0, 0, 0, 0, None, None, &[]), 1615 ); 1616 1617 assert_eq!(view.state, "ready"); 1618 assert_eq!(view.publishable_count, Some(0)); 1619 assert_eq!(view.published_count, Some(0)); 1620 assert_eq!(view.failed_count, Some(0)); 1621 assert_eq!( 1622 view.reason.as_deref(), 1623 Some("SDK outbox had no ready signed events to push") 1624 ); 1625 assert_eq!(view.actions, vec!["radroots sync status get"]); 1626 } 1627 1628 #[test] 1629 fn sync_push_maps_published_and_auth_required_sdk_receipts() { 1630 let dir = tempdir().expect("tempdir"); 1631 let config = sample_config( 1632 dir.path(), 1633 vec![ 1634 "wss://relay-a.example.com".to_owned(), 1635 "wss://relay-b.example.com".to_owned(), 1636 ], 1637 ); 1638 let receipt = PushOutboxReceipt { 1639 attempted_events: 2, 1640 published_events: 1, 1641 retryable_events: 1, 1642 terminal_events: 0, 1643 events: vec![ 1644 sdk_push_event( 1645 "a", 1646 PushOutboxEventState::Published, 1647 PushOutboxRelayOutcomeKind::Accepted, 1648 "wss://relay-a.example.com", 1649 Some("accepted".to_owned()), 1650 ), 1651 sdk_push_event( 1652 "b", 1653 PushOutboxEventState::PublishRetryable, 1654 PushOutboxRelayOutcomeKind::AuthRequired, 1655 "wss://relay-b.example.com", 1656 Some("auth-required: login".to_owned()), 1657 ), 1658 ], 1659 }; 1660 1661 let view = sdk_push_view( 1662 &config, 1663 receipt, 1664 sdk_status_receipt( 1665 2, 1666 2, 1667 0, 1668 1, 1669 1, 1670 0, 1671 0, 1672 0, 1673 Some(1_700_000_020_000), 1674 Some("auth-required: login".to_owned()), 1675 &["wss://relay-a.example.com", "wss://relay-b.example.com"], 1676 ), 1677 ); 1678 1679 assert_eq!(view.state, "partial"); 1680 assert_eq!(view.publishable_count, Some(2)); 1681 assert_eq!(view.published_count, Some(1)); 1682 assert_eq!(view.failed_count, Some(1)); 1683 assert_eq!(view.reason_code.as_deref(), Some("sdk_outbox_push_partial")); 1684 assert_eq!( 1685 view.target_relays, 1686 vec![ 1687 "wss://relay-a.example.com".to_owned(), 1688 "wss://relay-b.example.com".to_owned() 1689 ] 1690 ); 1691 assert_eq!( 1692 view.connected_relays, 1693 vec![ 1694 "wss://relay-a.example.com".to_owned(), 1695 "wss://relay-b.example.com".to_owned() 1696 ] 1697 ); 1698 assert_eq!( 1699 view.acknowledged_relays, 1700 vec!["wss://relay-a.example.com".to_owned()] 1701 ); 1702 assert_eq!(view.failed_relays.len(), 1); 1703 assert_eq!(view.failed_relays[0].relay, "wss://relay-b.example.com"); 1704 assert_eq!(view.failed_relays[0].reason, "auth-required: login"); 1705 assert_eq!( 1706 view.actions, 1707 vec!["radroots sync push", "radroots sync status get"] 1708 ); 1709 } 1710 1711 fn sdk_status_receipt( 1712 total_events: i64, 1713 outbox_total_events: i64, 1714 pending_events: i64, 1715 retryable_events: i64, 1716 terminal_events: i64, 1717 failed_terminal_events: i64, 1718 ready_signed_events: i64, 1719 publishing_events: i64, 1720 last_attempt_at_ms: Option<i64>, 1721 last_error: Option<String>, 1722 relays: &[&str], 1723 ) -> SyncStatusReceipt { 1724 SyncStatusReceipt { 1725 source: SyncStatusSource::SdkCanonicalStores, 1726 observed_at_ms: 1_700_000_030_000, 1727 event_store: SyncEventStoreStatus { 1728 total_events, 1729 projection_eligible_events: total_events, 1730 relay_observations: 0, 1731 last_event_seq: (total_events > 0).then_some(total_events), 1732 last_event_updated_at_ms: (total_events > 0).then_some(1_700_000_000_000), 1733 }, 1734 outbox: SyncOutboxStatus { 1735 total_events: outbox_total_events, 1736 pending_events, 1737 retryable_events, 1738 terminal_events, 1739 failed_terminal_events, 1740 ready_signed_events, 1741 publishing_events, 1742 last_attempt_at_ms, 1743 last_error, 1744 }, 1745 relay_targets: SyncRelayTargetSummary { 1746 configured_count: relays.len(), 1747 configured_relays: relays.iter().map(|relay| (*relay).to_owned()).collect(), 1748 }, 1749 } 1750 } 1751 1752 fn sdk_push_event( 1753 event_id_prefix: &str, 1754 final_state: PushOutboxEventState, 1755 outcome_kind: PushOutboxRelayOutcomeKind, 1756 relay_url: &str, 1757 message: Option<String>, 1758 ) -> PushOutboxEventReceipt { 1759 PushOutboxEventReceipt { 1760 event_id: RadrootsEventId::parse(event_id_prefix.repeat(64).as_str()) 1761 .expect("event id"), 1762 outbox_event_id: 7, 1763 final_state, 1764 attempted_count: 1, 1765 accepted_count: usize::from(matches!( 1766 outcome_kind, 1767 PushOutboxRelayOutcomeKind::Accepted 1768 | PushOutboxRelayOutcomeKind::DuplicateAccepted 1769 )), 1770 retryable_count: usize::from(matches!( 1771 outcome_kind, 1772 PushOutboxRelayOutcomeKind::AuthRequired 1773 | PushOutboxRelayOutcomeKind::Timeout 1774 | PushOutboxRelayOutcomeKind::ConnectionFailed 1775 )), 1776 terminal_count: usize::from(matches!( 1777 outcome_kind, 1778 PushOutboxRelayOutcomeKind::Blocked 1779 | PushOutboxRelayOutcomeKind::RateLimited 1780 | PushOutboxRelayOutcomeKind::Invalid 1781 | PushOutboxRelayOutcomeKind::PowRequired 1782 | PushOutboxRelayOutcomeKind::Restricted 1783 | PushOutboxRelayOutcomeKind::Error 1784 | PushOutboxRelayOutcomeKind::Unknown 1785 )), 1786 quorum: 1, 1787 quorum_met: matches!( 1788 outcome_kind, 1789 PushOutboxRelayOutcomeKind::Accepted 1790 | PushOutboxRelayOutcomeKind::DuplicateAccepted 1791 ), 1792 relays: vec![PushOutboxRelayReceipt { 1793 relay_url: relay_url.to_owned(), 1794 outcome_kind, 1795 attempted: true, 1796 message, 1797 }], 1798 } 1799 } 1800 1801 #[test] 1802 fn sync_pull_ingests_relay_events_and_market_reads_without_daemon() { 1803 let dir = tempdir().expect("tempdir"); 1804 let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); 1805 crate::runtime::store::init(&config).expect("store init"); 1806 let seller = identity(7); 1807 let seller_pubkey = seller.public_key_hex(); 1808 let listing_addr = format!("{KIND_LISTING}:{seller_pubkey}:{LISTING_D_TAG}"); 1809 let events = vec![ 1810 profile_event(&seller), 1811 farm_event(&seller), 1812 plot_event(&seller), 1813 listing_event(&seller), 1814 list_set_event(&seller), 1815 ]; 1816 1817 let view = pull_with_fetcher(&config, fake_fetcher(events)).expect("sync pull ingest"); 1818 1819 assert_eq!(view.state, "ready"); 1820 assert_eq!(view.fetched_count, Some(5)); 1821 assert_eq!(view.ingested_count, Some(5)); 1822 assert_eq!(view.skipped_count, Some(0)); 1823 assert_eq!(view.unsupported_count, Some(0)); 1824 assert_eq!(view.failed_count, Some(0)); 1825 assert_eq!(view.reason, None); 1826 1827 let search = crate::runtime::find::search( 1828 &config, 1829 &FindQueryArgs { 1830 query: vec!["eggs".to_owned()], 1831 }, 1832 ) 1833 .expect("market search"); 1834 assert_eq!(search.state, "ready"); 1835 assert_eq!(search.count, 1); 1836 assert_eq!( 1837 search.results[0].listing_addr.as_deref(), 1838 Some(listing_addr.as_str()) 1839 ); 1840 1841 let listing = crate::runtime::listing::get( 1842 &config, 1843 &RecordLookupArgs { 1844 key: "pasture-eggs".to_owned(), 1845 }, 1846 ) 1847 .expect("listing get"); 1848 assert_eq!(listing.state, "ready"); 1849 assert_eq!(listing.listing_addr.as_deref(), Some(listing_addr.as_str())); 1850 } 1851 1852 #[test] 1853 fn market_refresh_uses_market_scope_for_ingest() { 1854 let dir = tempdir().expect("tempdir"); 1855 let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); 1856 crate::runtime::store::init(&config).expect("store init"); 1857 let seller = identity(8); 1858 let events = vec![listing_event(&seller), plot_event(&seller)]; 1859 1860 let view = 1861 market_refresh_with_fetcher(&config, fake_fetcher(events)).expect("market refresh"); 1862 1863 assert_eq!(view.state, "ready"); 1864 assert_eq!(view.fetched_count, Some(2)); 1865 assert_eq!(view.ingested_count, Some(1)); 1866 assert_eq!(view.unsupported_count, Some(1)); 1867 assert_eq!(view.failed_count, Some(0)); 1868 } 1869 1870 #[test] 1871 fn market_refresh_records_relay_provenance_relays_for_order_drafts() { 1872 let dir = tempdir().expect("tempdir"); 1873 let config = sample_config( 1874 dir.path(), 1875 vec![ 1876 "wss://relay-a.example.com".to_owned(), 1877 "wss://relay-b.example.com".to_owned(), 1878 ], 1879 ); 1880 crate::runtime::store::init(&config).expect("store init"); 1881 let seller = identity(9); 1882 1883 let _ = market_refresh_with_fetcher(&config, fake_fetcher(vec![listing_event(&seller)])) 1884 .expect("market refresh"); 1885 let relays = relay_provenance_relays_for_scope(&config, RelayIngestScope::MarketRefresh) 1886 .expect("relay provenance"); 1887 1888 assert_eq!( 1889 relays, 1890 vec![ 1891 "wss://relay-a.example.com".to_owned(), 1892 "wss://relay-b.example.com".to_owned() 1893 ] 1894 ); 1895 } 1896 1897 #[test] 1898 fn relay_refresh_records_current_run_freshness() { 1899 let dir = tempdir().expect("tempdir"); 1900 let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); 1901 crate::runtime::store::init(&config).expect("store init"); 1902 let seller = identity(10); 1903 1904 let view = market_refresh_with_fetcher(&config, fake_fetcher(vec![listing_event(&seller)])) 1905 .expect("market refresh"); 1906 1907 assert_eq!(view.freshness.state, "fresh"); 1908 let run = view.freshness.run.as_ref().expect("run freshness"); 1909 assert_eq!(run.scope, "market_refresh"); 1910 assert_eq!(run.last_state, "success"); 1911 assert_eq!(run.relay_set_current, true); 1912 assert_eq!(run.fetched_count, Some(1)); 1913 assert_eq!(run.ingested_count, Some(1)); 1914 } 1915 1916 #[test] 1917 fn sync_pull_reports_partial_relay_fetch_reason_code() { 1918 let dir = tempdir().expect("tempdir"); 1919 let config = sample_config( 1920 dir.path(), 1921 vec![ 1922 "wss://relay-a.example.com".to_owned(), 1923 "wss://relay-b.example.com".to_owned(), 1924 ], 1925 ); 1926 crate::runtime::store::init(&config).expect("store init"); 1927 let seller = identity(13); 1928 1929 let view = pull_with_fetcher(&config, |relays, _| { 1930 Ok(DirectRelayFetchReceipt { 1931 target_relays: relays.to_vec(), 1932 connected_relays: vec![relays[0].clone()], 1933 failed_relays: vec![DirectRelayFailure { 1934 relay: relays[1].clone(), 1935 reason: "connection refused".to_owned(), 1936 }], 1937 events: vec![listing_event(&seller)], 1938 }) 1939 }) 1940 .expect("sync pull partial relay fetch"); 1941 1942 assert_eq!(view.state, "ready"); 1943 assert_eq!(view.connected_relays, vec!["wss://relay-a.example.com"]); 1944 assert_eq!(view.failed_relays.len(), 1); 1945 assert_eq!(view.failed_count, Some(1)); 1946 assert_eq!(view.reason_code.as_deref(), Some("relay_fetch_partial")); 1947 assert!( 1948 view.reason 1949 .as_deref() 1950 .expect("partial relay reason") 1951 .contains("relay(s) failed during fetch") 1952 ); 1953 let run = view.freshness.run.as_ref().expect("run freshness"); 1954 assert_eq!(run.last_state, "partial"); 1955 assert_eq!(run.failed_count, Some(1)); 1956 } 1957 1958 #[test] 1959 fn sync_pull_reports_no_overwrite_skips_without_replacing_projection() { 1960 let dir = tempdir().expect("tempdir"); 1961 let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); 1962 crate::runtime::store::init(&config).expect("store init"); 1963 let seller = identity(12); 1964 1965 let first = listing_event_with_title_at(&seller, "Pasture Eggs", 200); 1966 let stale = listing_event_with_title_at(&seller, "Older Eggs", 199); 1967 pull_with_fetcher(&config, fake_fetcher(vec![first])).expect("initial sync pull"); 1968 let view = pull_with_fetcher(&config, fake_fetcher(vec![stale])).expect("stale sync pull"); 1969 1970 assert_eq!(view.state, "ready"); 1971 assert_eq!(view.fetched_count, Some(1)); 1972 assert_eq!(view.ingested_count, Some(0)); 1973 assert_eq!(view.skipped_count, Some(1)); 1974 assert_eq!(view.reason_code.as_deref(), Some("sync_no_overwrite")); 1975 assert!( 1976 view.reason 1977 .as_deref() 1978 .expect("skip reason") 1979 .contains("current or newer state") 1980 ); 1981 let run = view.freshness.run.as_ref().expect("run freshness"); 1982 assert_eq!(run.last_state, "success"); 1983 assert_eq!(run.skipped_count, Some(1)); 1984 1985 let search = crate::runtime::find::search( 1986 &config, 1987 &FindQueryArgs { 1988 query: vec!["eggs".to_owned()], 1989 }, 1990 ) 1991 .expect("market search"); 1992 assert_eq!(search.results[0].title, "Pasture Eggs"); 1993 } 1994 1995 #[test] 1996 fn sync_pull_freshness_reports_relay_set_changed() { 1997 let dir = tempdir().expect("tempdir"); 1998 let config = sample_config(dir.path(), vec!["wss://relay-a.example.com".to_owned()]); 1999 crate::runtime::store::init(&config).expect("store init"); 2000 let seller = identity(11); 2001 pull_with_fetcher(&config, fake_fetcher(vec![listing_event(&seller)])).expect("sync pull"); 2002 let changed = sample_config(dir.path(), vec!["wss://relay-b.example.com".to_owned()]); 2003 2004 let freshness = 2005 freshness_for_scope(&changed, RelayIngestScope::SyncPull).expect("sync freshness"); 2006 2007 assert_eq!(freshness.state, "relay_set_changed"); 2008 let run = freshness.run.as_ref().expect("run freshness"); 2009 assert_eq!(run.scope, "sync_pull"); 2010 assert_eq!(run.relay_set_current, false); 2011 } 2012 2013 #[test] 2014 fn relay_ingest_splits_unsupported_and_failed_events() { 2015 let dir = tempdir().expect("tempdir"); 2016 let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); 2017 crate::runtime::store::init(&config).expect("store init"); 2018 let seller = identity(9); 2019 let events = vec![ 2020 signed_event( 2021 &seller, 2022 WireEventParts { 2023 kind: KIND_POST, 2024 content: "hello".to_owned(), 2025 tags: Vec::new(), 2026 }, 2027 ), 2028 signed_event( 2029 &seller, 2030 WireEventParts { 2031 kind: KIND_LISTING, 2032 content: "not a listing".to_owned(), 2033 tags: Vec::new(), 2034 }, 2035 ), 2036 ]; 2037 2038 let view = pull_with_fetcher(&config, fake_fetcher(events)).expect("sync pull ingest"); 2039 2040 assert_eq!(view.state, "ready"); 2041 assert_eq!(view.fetched_count, Some(2)); 2042 assert_eq!(view.ingested_count, Some(0)); 2043 assert_eq!(view.unsupported_count, Some(1)); 2044 assert_eq!(view.failed_count, Some(1)); 2045 assert!( 2046 view.reason 2047 .as_deref() 2048 .expect("failure reason") 2049 .contains("failed ingest") 2050 ); 2051 } 2052 2053 fn fake_fetcher( 2054 events: Vec<RadrootsNostrEvent>, 2055 ) -> impl FnOnce( 2056 &[String], 2057 RadrootsNostrFilter, 2058 ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError> { 2059 move |relays, _| { 2060 Ok(DirectRelayFetchReceipt { 2061 target_relays: relays.to_vec(), 2062 connected_relays: relays.to_vec(), 2063 failed_relays: Vec::new(), 2064 events, 2065 }) 2066 } 2067 } 2068 2069 fn profile_event(identity: &RadrootsIdentity) -> RadrootsNostrEvent { 2070 let profile = RadrootsProfile { 2071 name: "seller".to_owned(), 2072 display_name: Some("Seller".to_owned()), 2073 nip05: None, 2074 about: Some("market seller".to_owned()), 2075 website: Some("https://seller.example.com".to_owned()), 2076 picture: None, 2077 banner: None, 2078 lud06: None, 2079 lud16: None, 2080 bot: None, 2081 }; 2082 signed_event( 2083 identity, 2084 profile_encode::to_wire_parts_with_profile_type( 2085 &profile, 2086 Some(RadrootsProfileType::Farm), 2087 ) 2088 .expect("profile parts"), 2089 ) 2090 } 2091 2092 fn farm_event(identity: &RadrootsIdentity) -> RadrootsNostrEvent { 2093 let farm = RadrootsFarm { 2094 d_tag: FARM_D_TAG.to_owned(), 2095 name: "Relay Farm".to_owned(), 2096 about: Some("relay farm".to_owned()), 2097 website: Some("https://farm.example.com".to_owned()), 2098 picture: None, 2099 banner: None, 2100 location: None, 2101 tags: None, 2102 }; 2103 signed_event( 2104 identity, 2105 farm_encode::to_wire_parts(&farm).expect("farm parts"), 2106 ) 2107 } 2108 2109 fn plot_event(identity: &RadrootsIdentity) -> RadrootsNostrEvent { 2110 let plot = RadrootsPlot { 2111 d_tag: PLOT_D_TAG.to_owned(), 2112 farm: RadrootsFarmRef { 2113 pubkey: identity.public_key_hex(), 2114 d_tag: FARM_D_TAG.to_owned(), 2115 }, 2116 name: "Relay Plot".to_owned(), 2117 about: Some("relay plot".to_owned()), 2118 location: None, 2119 tags: None, 2120 }; 2121 signed_event( 2122 identity, 2123 plot_encode::to_wire_parts(&plot).expect("plot parts"), 2124 ) 2125 } 2126 2127 fn list_set_event(identity: &RadrootsIdentity) -> RadrootsNostrEvent { 2128 let list_set = RadrootsListSet { 2129 d_tag: "member_of.farms".to_owned(), 2130 content: String::new(), 2131 entries: vec![RadrootsListEntry { 2132 tag: "p".to_owned(), 2133 values: vec![identity.public_key_hex()], 2134 }], 2135 title: None, 2136 description: None, 2137 image: None, 2138 }; 2139 signed_event( 2140 identity, 2141 list_set_encode::to_wire_parts_with_kind(&list_set, KIND_LIST_SET_GENERIC) 2142 .expect("list set parts"), 2143 ) 2144 } 2145 2146 fn listing_event(identity: &RadrootsIdentity) -> RadrootsNostrEvent { 2147 listing_event_with_title_at(identity, "Pasture Eggs", 0) 2148 } 2149 2150 fn listing_event_with_title_at( 2151 identity: &RadrootsIdentity, 2152 title: &str, 2153 created_at: u64, 2154 ) -> RadrootsNostrEvent { 2155 let mut builder = radroots_nostr_build_event( 2156 KIND_LISTING, 2157 "# Pasture Eggs", 2158 vec![ 2159 vec!["d".to_owned(), LISTING_D_TAG.to_owned()], 2160 vec![ 2161 "a".to_owned(), 2162 format!("{}:{}:{}", KIND_FARM, identity.public_key_hex(), FARM_D_TAG), 2163 ], 2164 vec!["p".to_owned(), identity.public_key_hex()], 2165 vec!["key".to_owned(), "pasture-eggs".to_owned()], 2166 vec!["title".to_owned(), title.to_owned()], 2167 vec!["category".to_owned(), "eggs".to_owned()], 2168 vec!["summary".to_owned(), "Pasture-raised eggs".to_owned()], 2169 vec!["process".to_owned(), "washed".to_owned()], 2170 vec!["lot".to_owned(), "lot-a".to_owned()], 2171 vec!["profile".to_owned(), "dozen".to_owned()], 2172 vec!["year".to_owned(), "2026".to_owned()], 2173 vec!["radroots:primary_bin".to_owned(), "bin-a".to_owned()], 2174 vec![ 2175 "radroots:bin".to_owned(), 2176 "bin-a".to_owned(), 2177 "12".to_owned(), 2178 "each".to_owned(), 2179 "12".to_owned(), 2180 "each".to_owned(), 2181 "dozen".to_owned(), 2182 ], 2183 vec![ 2184 "radroots:price".to_owned(), 2185 "bin-a".to_owned(), 2186 "6".to_owned(), 2187 "USD".to_owned(), 2188 "1".to_owned(), 2189 "each".to_owned(), 2190 "6".to_owned(), 2191 "each".to_owned(), 2192 ], 2193 vec!["inventory".to_owned(), "5".to_owned()], 2194 vec!["status".to_owned(), "active".to_owned()], 2195 ], 2196 ) 2197 .expect("listing parts"); 2198 if created_at > 0 { 2199 builder = builder.custom_created_at(RadrootsNostrTimestamp::from(created_at)); 2200 } 2201 builder 2202 .sign_with_keys(identity.keys()) 2203 .expect("signed event") 2204 } 2205 2206 fn signed_event(identity: &RadrootsIdentity, parts: WireEventParts) -> RadrootsNostrEvent { 2207 radroots_nostr_build_event(parts.kind, parts.content, parts.tags) 2208 .expect("event builder") 2209 .sign_with_keys(identity.keys()) 2210 .expect("signed event") 2211 } 2212 2213 fn identity(seed: u8) -> RadrootsIdentity { 2214 RadrootsIdentity::from_secret_key_bytes(&[seed; 32]).expect("identity") 2215 } 2216 2217 fn sample_config(root: &Path, relays: Vec<String>) -> RuntimeConfig { 2218 let data = root.join("data"); 2219 let logs = root.join("logs"); 2220 let secrets = root.join("secrets"); 2221 RuntimeConfig { 2222 output: OutputConfig { 2223 format: OutputFormat::Human, 2224 verbosity: Verbosity::Normal, 2225 color: true, 2226 dry_run: false, 2227 }, 2228 interaction: InteractionConfig { 2229 input_enabled: true, 2230 assume_yes: false, 2231 stdin_tty: false, 2232 stdout_tty: false, 2233 prompts_allowed: false, 2234 confirmations_allowed: false, 2235 }, 2236 paths: PathsConfig { 2237 profile: "interactive_user".into(), 2238 profile_source: "test".into(), 2239 allowed_profiles: vec!["interactive_user".into(), "repo_local".into()], 2240 root_source: "test".into(), 2241 repo_local_root: None, 2242 repo_local_root_source: None, 2243 subordinate_path_override_source: "runtime_config".into(), 2244 app_namespace: "apps/cli".into(), 2245 shared_accounts_namespace: "shared/accounts".into(), 2246 shared_identities_namespace: "shared/identities".into(), 2247 app_config_path: root.join("config/apps/cli/config.toml"), 2248 workspace_config_path: None, 2249 app_data_root: data.join("apps/cli"), 2250 app_logs_root: logs.join("apps/cli"), 2251 shared_accounts_data_root: data.join("shared/accounts"), 2252 shared_accounts_secrets_root: secrets.join("shared/accounts"), 2253 default_identity_path: secrets.join("shared/identities/default.json"), 2254 }, 2255 migration: MigrationConfig { 2256 report: RadrootsMigrationReport::empty(), 2257 }, 2258 logging: LoggingConfig { 2259 filter: "info".into(), 2260 directory: None, 2261 stdout: false, 2262 }, 2263 account: AccountConfig { 2264 selector: None, 2265 store_path: data.join("shared/accounts/store.json"), 2266 secrets_dir: secrets.join("shared/accounts"), 2267 secret_backend: RadrootsSecretBackend::EncryptedFile, 2268 secret_fallback: None, 2269 }, 2270 account_secret_contract: AccountSecretContractConfig { 2271 default_backend: "host_vault".into(), 2272 default_fallback: Some("encrypted_file".into()), 2273 allowed_backends: vec!["host_vault".into(), "encrypted_file".into()], 2274 host_vault_policy: Some("desktop".into()), 2275 uses_protected_store: true, 2276 }, 2277 identity: IdentityConfig { 2278 path: secrets.join("shared/identities/default.json"), 2279 }, 2280 signer: SignerConfig { 2281 backend: SignerBackend::Local, 2282 }, 2283 publish: PublishConfig { 2284 transport: PublishTransport::DirectNostrRelay, 2285 source: PublishTransportSource::Defaults, 2286 radrootsd_proxy: crate::runtime::config::RadrootsdProxyConfig::default(), 2287 }, 2288 relay: RelayConfig { 2289 urls: relays, 2290 publish_policy: RelayPublishPolicy::Any, 2291 source: RelayConfigSource::Defaults, 2292 }, 2293 local: LocalConfig { 2294 root: data.join("apps/cli/replica"), 2295 replica_db_path: data.join("apps/cli/replica/replica.sqlite"), 2296 backups_dir: data.join("apps/cli/replica/backups"), 2297 exports_dir: data.join("apps/cli/replica/exports"), 2298 }, 2299 myc: MycConfig { 2300 executable: PathBuf::from("myc"), 2301 status_timeout_ms: 2_000, 2302 }, 2303 hyf: HyfConfig { 2304 enabled: false, 2305 executable: PathBuf::from("hyfd"), 2306 }, 2307 rpc: RpcConfig { 2308 url: "http://127.0.0.1:7070".into(), 2309 }, 2310 rhi: crate::runtime::config::RhiConfig { 2311 trusted_worker_pubkeys: Vec::new(), 2312 }, 2313 capability_bindings: Vec::new(), 2314 } 2315 } 2316 }