cli

Command-line interface for Radroots
git clone https://radroots.dev/git/cli.git
Log | Files | Refs | README | LICENSE

commit d6e4a59e0c073a27634b013185bc1e57c4e4d998
parent 371276014e4be8f774661a61bfc4ee37a6c6e59f
Author: triesap <tyson@radroots.org>
Date:   Wed, 17 Jun 2026 15:17:10 -0700

sync: migrate status and push to SDK runtime

- route sync status and canonical sync push through the CLI SDK adapter
- render SDK outbox queue, relay, partial, retryable, and empty states
- label remaining replica sync state as legacy-derived while preserving sync pull
- update service and target tests for SDK-backed sync behavior

Diffstat:
Msrc/ops/exec/runtime.rs | 29+++++++++++++++++++++--------
Msrc/runtime/sync.rs | 1306+++++++++++++++++++++++++++++++++++--------------------------------------------
Msrc/view/runtime.rs | 16++++++++++++++++
Mtests/target_cli.rs | 227+++++++++++++++++++++++++++++++++++++------------------------------------------
4 files changed, 713 insertions(+), 865 deletions(-)

diff --git a/src/ops/exec/runtime.rs b/src/ops/exec/runtime.rs @@ -54,7 +54,9 @@ impl OperationService<SyncStatusGetRequest> for RuntimeOperationService<'_> { &self, _request: OperationRequest<SyncStatusGetRequest>, ) -> Result<OperationResult<Self::Result>, OperationAdapterError> { - let view = map_runtime("sync.status.get", crate::runtime::sync::status(self.config))?; + let view = crate::runtime::sync::status(self.config).map_err(|error| { + OperationAdapterError::sdk_adapter_failure("sync.status.get", error) + })?; sync_status_result(&view) } } @@ -84,7 +86,8 @@ impl OperationService<SyncPushRequest> for RuntimeOperationService<'_> { if request.context.requires_approval_token() { return Err(OperationAdapterError::approval_required("sync.push")); } - let view = map_runtime("sync.push", crate::runtime::sync::push(self.config))?; + let view = crate::runtime::sync::push(self.config) + .map_err(|error| OperationAdapterError::sdk_adapter_failure("sync.push", error))?; sync_action_result::<SyncPushResult>("sync.push", &view) } } @@ -292,12 +295,22 @@ mod tests { let sync = OperationRequest::new(OperationContext::default(), SyncStatusGetRequest::default()) .expect("sync status request"); - let error = service.execute(sync).expect_err("sync status unconfigured"); - let output = error.to_output_error(); - - assert_eq!(output.code, "operation_unavailable"); - assert_eq!(output.exit_code, 3); - assert!(error.to_string().contains("sync.status.get")); + let envelope = service + .execute(sync) + .expect("sync status result") + .to_envelope(OperationContext::default().envelope_context("req_sync_status")) + .expect("sync status envelope"); + + assert_eq!(envelope.operation_id, "sync.status.get"); + assert_eq!(envelope.result["state"], "ready"); + assert_eq!( + envelope.result["source"], + "SDK canonical event store and outbox" + ); + assert_eq!(envelope.result["replica_db"], "legacy_derived_not_checked"); + assert_eq!(envelope.result["queue"]["pending_count"], 0); + assert_eq!(envelope.result["queue"]["total_count"], 0); + assert_eq!(envelope.result["actions"][0], "radroots sync pull"); } #[test] diff --git a/src/runtime/sync.rs b/src/runtime/sync.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeMap; use std::thread; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -9,16 +8,17 @@ use radroots_events::kinds::{ KIND_LIST_SET_PICTURE, KIND_LIST_SET_RELAY, KIND_LIST_SET_RELEASE_ARTIFACT, KIND_LIST_SET_STARTER_PACK, KIND_LIST_SET_VIDEO, KIND_LISTING, KIND_PLOT, KIND_PROFILE, }; -use radroots_events_codec::wire::WireEventParts; -use radroots_identity::RadrootsIdentity; use radroots_nostr::prelude::{ RadrootsNostrFilter, RadrootsNostrTimestamp, radroots_event_from_nostr, radroots_nostr_kind, }; use radroots_replica_db::{ReplicaSql, migrations}; use radroots_replica_sync::{ - RadrootsReplicaEventsError, RadrootsReplicaIngestOutcome, RadrootsReplicaPendingPublishEvent, - radroots_replica_ingest_event, radroots_replica_ingest_event_head, - radroots_replica_pending_publish_batch, radroots_replica_sync_status, + RadrootsReplicaEventsError, RadrootsReplicaIngestOutcome, radroots_replica_ingest_event, + radroots_replica_sync_status, +}; +use radroots_sdk::{ + PushOutboxEventReceipt, PushOutboxReceipt, PushOutboxRelayOutcomeKind, PushOutboxRequest, + SyncStatusReceipt, SyncStatusRequest, }; use radroots_sql_core::{SqlExecutor, SqliteExecutor}; use serde::Deserialize; @@ -26,27 +26,26 @@ use serde_json::json; use crate::cli::global::SyncWatchArgs; use crate::runtime::RuntimeError; -use crate::runtime::account; use crate::runtime::config::{PublishMode, RuntimeConfig}; use crate::runtime::direct_relay::{ - DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt, DirectRelayPublishError, - DirectRelayPublishReceipt, fetch_events_from_relays, publish_parts_with_identity, + DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt, fetch_events_from_relays, }; +use crate::runtime::sdk::{CliSdkAdapterError, CliSdkSession, sdk_relay_url_policy}; use crate::view::runtime::{ - RelayFailureView, SyncActionView, SyncFreshnessView, SyncPublishPlanAuthorView, - SyncPublishPlanKindView, SyncPublishPlanView, SyncQueueView, SyncRunFreshnessView, + RelayFailureView, SyncActionView, SyncFreshnessView, SyncQueueView, SyncRunFreshnessView, SyncStatusView, SyncWatchFrameView, SyncWatchView, }; const SYNC_SOURCE: &str = "local replica · local first"; +const SDK_SYNC_SOURCE: &str = "SDK canonical event store and outbox"; +const SDK_PUSH_SOURCE: &str = "SDK outbox push"; const RELAY_PULL_SETUP_ACTION: &str = "radroots --relay wss://relay.example.com sync pull"; -const RELAY_PUSH_SETUP_ACTION: &str = "radroots --relay wss://relay.example.com sync push"; const SYNC_PULL_ACTION: &str = "radroots sync pull"; const SYNC_PUSH_ACTION: &str = "radroots sync push"; const SYNC_READY_ACTION: &str = "radroots market product search eggs"; const MARKET_READY_ACTION: &str = "radroots market product search eggs"; const INGEST_SOURCE: &str = "direct Nostr relay fetch · local replica ingest"; -const PUBLISH_SOURCE: &str = "direct Nostr relay publish · local replica sync"; +const RADROOTSD_SYNC_PUSH_SOURCE: &str = "radrootsd sync push · deferred"; pub(crate) const RADROOTSD_SYNC_PUSH_UNAVAILABLE_REASON: &str = "sync push is only available in publish mode `nostr_relay`; radrootsd sync push is not implemented"; const RELAY_FETCH_LIMIT: usize = 1_000; const RELAY_FETCH_MAX_PAGES: usize = 5; @@ -126,20 +125,10 @@ struct SyncRunRow { failure_reason: Option<String>, } -pub fn status(config: &RuntimeConfig) -> Result<SyncStatusView, RuntimeError> { - let snapshot = inspect_sync(config)?; - Ok(SyncStatusView { - state: snapshot.state, - source: snapshot.source, - local_root: snapshot.local_root, - replica_db: snapshot.replica_db, - relay_count: snapshot.relay_count, - publish_policy: snapshot.publish_policy, - freshness: snapshot.freshness, - queue: snapshot.queue, - reason: snapshot.reason, - actions: snapshot.actions, - }) +pub fn status(config: &RuntimeConfig) -> Result<SyncStatusView, CliSdkAdapterError> { + let session = CliSdkSession::connect(config)?; + let receipt = session.block_on(session.sdk().sync().status(SyncStatusRequest::new()))?; + Ok(sdk_sync_status_view(config, receipt)) } pub fn pull(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> { @@ -320,10 +309,7 @@ where relay_count: config.relay.urls.len(), publish_policy: config.relay.publish_policy.as_str().to_owned(), freshness, - queue: SyncQueueView { - expected_count: queue.expected_count, - pending_count: queue.pending_count, - }, + queue: legacy_sync_queue(queue.expected_count, queue.pending_count), target_relays: receipt.target_relays, connected_relays: receipt.connected_relays, acknowledged_relays: Vec::new(), @@ -342,153 +328,22 @@ where }) } -pub fn push(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> { - push_with_publisher(config, |identity, relay_urls, event| { - publish_parts_with_identity( - identity, - relay_urls, - WireEventParts { - kind: event.draft.kind, - content: event.draft.content.clone(), - tags: event.draft.tags.clone(), - }, - ) - }) -} - -fn push_with_publisher<F>( - config: &RuntimeConfig, - mut publisher: F, -) -> Result<SyncActionView, RuntimeError> -where - F: FnMut( - &RadrootsIdentity, - &[String], - &RadrootsReplicaPendingPublishEvent, - ) -> Result<DirectRelayPublishReceipt, DirectRelayPublishError>, -{ +pub fn push(config: &RuntimeConfig) -> Result<SyncActionView, CliSdkAdapterError> { if matches!(config.publish.mode, PublishMode::Radrootsd) { return Ok(push_radrootsd_unavailable_view(config)); } - let snapshot = inspect_sync(config)?; - if snapshot.state == "unconfigured" { - return Ok(push_unconfigured_view(snapshot)); - } - - let signing = match account::resolve_local_signing_identity(config) { - Ok(signing) => signing, - Err(RuntimeError::Account(failure)) => { - let mut view = empty_action_from_snapshot(snapshot, "push"); - view.state = "unconfigured".to_owned(); - view.reason = Some(failure.to_string()); - view.actions = vec![ - "radroots account create".to_owned(), - "radroots account attach-secret".to_owned(), - ]; - return Ok(view); - } - Err(error) => return Err(error), - }; - - let executor = SqliteExecutor::open(&config.local.replica_db_path)?; - migrations::run_all_up(&executor)?; - let batch = radroots_replica_pending_publish_batch(&executor)?; - let selected_pubkey = signing - .account - .record - .public_identity - .public_key_hex - .as_str(); - let (mut counts, publishable_events, publish_plan) = sync_push_plan(&batch, selected_pubkey); - + let session = CliSdkSession::connect(config)?; if config.output.dry_run { - let state = if counts.pending_count > 0 { - "dry_run" - } else { - "ready" - }; - let reason = sync_push_dry_run_reason(&counts); - let actions = sync_push_actions(state, &counts); - return Ok(push_view( - config, - state, - SyncQueueView { - expected_count: batch.expected_count, - pending_count: batch.pending_count, - }, - snapshot.freshness, - counts, - Vec::new(), - Vec::new(), - Vec::new(), - Some(publish_plan), - reason, - actions, - )); + let status = session.block_on(session.sdk().sync().status(SyncStatusRequest::new()))?; + return Ok(sdk_push_dry_run_view(config, status)); } - let mut connected_relays = Vec::new(); - let mut acknowledged_relays = Vec::new(); - let mut failed_relays = Vec::new(); - - for event in publishable_events { - match publisher(&signing.identity, &config.relay.urls, event) { - Ok(receipt) => { - push_unique_many(&mut connected_relays, receipt.connected_relays.iter()); - push_unique_many(&mut acknowledged_relays, receipt.acknowledged_relays.iter()); - failed_relays.extend(relay_failures(receipt.failed_relays)); - let signed_event = radroots_event_from_nostr(&receipt.event); - radroots_replica_ingest_event_head(&executor, &signed_event)?; - counts.published_count += 1; - } - Err(error) => { - counts.failed_count += 1; - let failure = sync_push_publish_failure(error); - push_unique_many(&mut connected_relays, failure.connected_relays.iter()); - failed_relays.extend(failure.failed_relays); - if counts.first_failure_reason.is_none() { - counts.first_failure_reason = Some(failure.reason); - } - break; - } - } - } - - let queue = radroots_replica_sync_status(&executor)?; - let freshness = freshness_from_executor(&executor)?; - let state = if counts.failed_count > 0 && counts.published_count > 0 { - "partial" - } else if counts.failed_count > 0 { - "unavailable" - } else if counts.published_count > 0 && counts.skipped_count > 0 && queue.pending_count > 0 { - "partial" - } else if counts.published_count == 0 && counts.skipped_count > 0 && queue.pending_count > 0 { - "unconfigured" - } else if counts.published_count > 0 { - "published" - } else { - "ready" - }; - let reason = counts.reason(); - let actions = sync_push_actions(state, &counts); - - Ok(push_view( - config, - state, - SyncQueueView { - expected_count: queue.expected_count, - pending_count: queue.pending_count, - }, - freshness, - counts, - connected_relays, - acknowledged_relays, - failed_relays, - None, - reason, - actions, - )) + let receipt = session.block_on(session.sdk().sync().push_outbox( + PushOutboxRequest::new().with_relay_url_policy(sdk_relay_url_policy(config)), + ))?; + let status = session.block_on(session.sdk().sync().status(SyncStatusRequest::new()))?; + Ok(sdk_push_view(config, receipt, status)) } pub fn watch(config: &RuntimeConfig, args: &SyncWatchArgs) -> Result<SyncWatchView, RuntimeError> { @@ -562,7 +417,7 @@ fn push_radrootsd_unavailable_view(config: &RuntimeConfig) -> SyncActionView { SyncActionView { direction: "push".to_owned(), state: "unavailable".to_owned(), - source: PUBLISH_SOURCE.to_owned(), + source: RADROOTSD_SYNC_PUSH_SOURCE.to_owned(), local_root: config.local.root.display().to_string(), replica_db: "not_checked".to_owned(), relay_count: config.relay.urls.len(), @@ -574,10 +429,7 @@ fn push_radrootsd_unavailable_view(config: &RuntimeConfig) -> SyncActionView { last_event_at: None, run: None, }, - queue: SyncQueueView { - expected_count: 0, - pending_count: 0, - }, + queue: legacy_sync_queue(0, 0), target_relays: config.relay.urls.clone(), connected_relays: Vec::new(), acknowledged_relays: Vec::new(), @@ -596,272 +448,319 @@ fn push_radrootsd_unavailable_view(config: &RuntimeConfig) -> SyncActionView { } } -fn push_unconfigured_view(snapshot: SyncSnapshot) -> SyncActionView { - let mut view = empty_action_from_snapshot(snapshot, "push"); - if view.replica_db == "ready" && view.relay_count == 0 { - view.actions = vec![RELAY_PUSH_SETUP_ACTION.to_owned()]; +fn sdk_sync_status_view(config: &RuntimeConfig, receipt: SyncStatusReceipt) -> SyncStatusView { + let actions = sdk_sync_status_actions(&receipt); + let relay_count = receipt.relay_targets.configured_count; + SyncStatusView { + state: "ready".to_owned(), + source: SDK_SYNC_SOURCE.to_owned(), + local_root: config.local.root.display().to_string(), + replica_db: "legacy_derived_not_checked".to_owned(), + relay_count, + publish_policy: config.relay.publish_policy.as_str().to_owned(), + freshness: sdk_sync_freshness(&receipt), + queue: sdk_sync_queue(&receipt), + reason: None, + actions, } - view } -fn push_view( +fn sdk_push_dry_run_view(config: &RuntimeConfig, status: SyncStatusReceipt) -> SyncActionView { + let publishable_count = usize_from_i64(status.outbox.ready_signed_events); + let state = if publishable_count > 0 { + "dry_run" + } else { + "ready" + }; + let reason = if publishable_count > 0 { + Some("dry run requested; SDK outbox push skipped".to_owned()) + } else if status.outbox.total_events > 0 { + Some("SDK outbox has no ready signed events to push".to_owned()) + } else { + None + }; + sdk_push_action_view( + config, + state, + sdk_sync_queue(&status), + sdk_sync_freshness(&status), + status.relay_targets.configured_relays, + Vec::new(), + Vec::new(), + Vec::new(), + publishable_count, + 0, + 0, + Some(0), + reason, + sdk_sync_push_actions(state, publishable_count > 0), + ) +} + +fn sdk_push_view( + config: &RuntimeConfig, + receipt: PushOutboxReceipt, + status: SyncStatusReceipt, +) -> SyncActionView { + let failed_count = receipt.retryable_events + receipt.terminal_events; + let state = if receipt.attempted_events == 0 { + "ready" + } else if receipt.published_events > 0 && failed_count > 0 { + "partial" + } else if failed_count > 0 { + "unavailable" + } else if receipt.published_events > 0 { + "published" + } else { + "ready" + }; + let reason = sdk_push_reason(&receipt, failed_count); + sdk_push_action_view( + config, + state, + sdk_sync_queue(&status), + sdk_sync_freshness(&status), + sdk_push_target_relays(&receipt, &status), + sdk_push_connected_relays(&receipt), + sdk_push_acknowledged_relays(&receipt), + sdk_push_failed_relays(&receipt), + receipt.attempted_events, + receipt.published_events, + failed_count, + Some(0), + reason, + sdk_sync_push_actions(state, failed_count > 0), + ) +} + +fn sdk_push_action_view( config: &RuntimeConfig, state: &str, queue: SyncQueueView, freshness: SyncFreshnessView, - counts: SyncPushCounts, + target_relays: Vec<String>, connected_relays: Vec<String>, acknowledged_relays: Vec<String>, failed_relays: Vec<RelayFailureView>, - publish_plan: Option<SyncPublishPlanView>, + publishable_count: usize, + published_count: usize, + failed_count: usize, + skipped_count: Option<usize>, reason: Option<String>, actions: Vec<String>, ) -> SyncActionView { SyncActionView { direction: "push".to_owned(), state: state.to_owned(), - source: PUBLISH_SOURCE.to_owned(), + source: SDK_PUSH_SOURCE.to_owned(), local_root: config.local.root.display().to_string(), - replica_db: "ready".to_owned(), + replica_db: "legacy_derived_not_checked".to_owned(), relay_count: config.relay.urls.len(), publish_policy: config.relay.publish_policy.as_str().to_owned(), freshness, queue, - target_relays: config.relay.urls.clone(), + target_relays, connected_relays, acknowledged_relays, failed_relays, fetched_count: None, ingested_count: None, - publishable_count: Some(counts.publishable_count), - published_count: Some(counts.published_count), - skipped_count: Some(counts.skipped_count), - unsupported_count: Some(counts.unsupported_count), - failed_count: Some(counts.failed_count), - publish_plan, - reason_code: counts.reason_code().map(str::to_owned), + publishable_count: Some(publishable_count), + published_count: Some(published_count), + skipped_count, + unsupported_count: Some(0), + failed_count: Some(failed_count), + publish_plan: None, + reason_code: sdk_sync_push_reason_code(state).map(str::to_owned), reason, actions, } } -fn sync_push_plan<'a>( - batch: &'a radroots_replica_sync::RadrootsReplicaPendingPublishBatch, - selected_pubkey: &str, -) -> ( - SyncPushCounts, - Vec<&'a RadrootsReplicaPendingPublishEvent>, - SyncPublishPlanView, -) { - let mut counts = SyncPushCounts::from_batch(batch); - let mut publishable_events = Vec::new(); - let mut event_kinds = BTreeMap::<u32, SyncPublishPlanKindView>::new(); - let mut authors = BTreeMap::<String, SyncPublishPlanAuthorView>::new(); - let selected_author = canonical_pubkey_hex(selected_pubkey); - - for event in &batch.pending_events { - let event_author = canonical_pubkey_hex(event.author.as_str()); - let is_publishable = event_author == selected_author; - let kind = event_kinds - .entry(event.kind) - .or_insert_with(|| SyncPublishPlanKindView { - kind: event.kind, - pending_count: 0, - publishable_count: 0, - skipped_count: 0, - unsupported_count: 0, - failed_count: 0, - }); - kind.pending_count += 1; - - let author = - authors - .entry(event_author.clone()) - .or_insert_with(|| SyncPublishPlanAuthorView { - author: event_author.clone(), - eligibility: if is_publishable { - "selected".to_owned() - } else { - "other_author".to_owned() - }, - pending_count: 0, - publishable_count: 0, - skipped_count: 0, - }); - author.pending_count += 1; - - if is_publishable { - kind.publishable_count += 1; - author.publishable_count += 1; - publishable_events.push(event); - } else { - kind.skipped_count += 1; - author.skipped_count += 1; - counts.skipped_count += 1; - if counts.first_skipped_author.is_none() { - counts.first_skipped_author = Some(event_author); - } - } +fn sdk_sync_status_actions(receipt: &SyncStatusReceipt) -> Vec<String> { + let mut actions = Vec::new(); + if receipt.outbox.ready_signed_events > 0 { + actions.push(SYNC_PUSH_ACTION.to_owned()); } - - counts.publishable_count = publishable_events.len(); - - ( - counts, - publishable_events, - SyncPublishPlanView { - selected_author, - event_kinds: event_kinds.into_values().collect(), - authors: authors.into_values().collect(), - }, - ) -} - -fn canonical_pubkey_hex(pubkey: &str) -> String { - pubkey.to_ascii_lowercase() -} - -fn sync_push_dry_run_reason(counts: &SyncPushCounts) -> Option<String> { - match counts.skipped_count { - 0 => Some("dry run requested; relay publish skipped".to_owned()), - skipped => Some(format!( - "dry run requested; relay publish skipped; {skipped} pending event(s) belong to another author and would not be signed" - )), + if receipt.event_store.total_events == 0 { + actions.push(SYNC_PULL_ACTION.to_owned()); } + actions } -fn sync_push_actions(state: &str, counts: &SyncPushCounts) -> Vec<String> { - let retry_selected_account = - counts.failed_count > 0 || counts.publishable_count > counts.published_count; - let selected_account_actionable = - retry_selected_account || (state == "dry_run" && counts.publishable_count > 0); - let mut actions = match state { +fn sdk_sync_push_actions(state: &str, retryable: bool) -> Vec<String> { + match state { "published" | "ready" => vec!["radroots sync status get".to_owned()], - "dry_run" if selected_account_actionable => { - vec![ - SYNC_PUSH_ACTION.to_owned(), - "radroots sync status get".to_owned(), - ] - } - "dry_run" => vec!["radroots sync status get".to_owned()], - _ if selected_account_actionable => { + "dry_run" | "partial" | "unavailable" if retryable => { vec![ SYNC_PUSH_ACTION.to_owned(), "radroots sync status get".to_owned(), ] } _ => vec!["radroots sync status get".to_owned()], - }; + } +} - if counts.skipped_count > 0 { - actions.push("radroots account list".to_owned()); - if let Some(author) = counts.first_skipped_author.as_deref() { - actions.push(format!("radroots --account-id {author} sync push")); - } +fn sdk_sync_push_reason_code(state: &str) -> Option<&'static str> { + match state { + "dry_run" => Some("dry_run"), + "partial" => Some("sdk_outbox_push_partial"), + "unavailable" => Some("sdk_outbox_push_failed"), + _ => None, } +} - actions.into_iter().fold(Vec::new(), |mut unique, action| { - if !unique.contains(&action) { - unique.push(action); - } - unique - }) +fn sdk_push_reason(receipt: &PushOutboxReceipt, failed_count: usize) -> Option<String> { + if receipt.attempted_events == 0 { + return Some("SDK outbox had no ready signed events to push".to_owned()); + } + if failed_count > 0 && receipt.published_events > 0 { + return Some(format!( + "SDK outbox push published {} event(s); {failed_count} event(s) remain retryable or terminal", + receipt.published_events + )); + } + if failed_count > 0 { + return Some( + "SDK outbox push did not reach accepted quorum for any ready event".to_owned(), + ); + } + None } -#[derive(Debug, Clone, Default)] -struct SyncPushCounts { - pending_count: usize, - publishable_count: usize, - published_count: usize, - skipped_count: usize, - unsupported_count: usize, - failed_count: usize, - first_failure_reason: Option<String>, - first_skipped_author: Option<String>, +fn sdk_sync_queue(receipt: &SyncStatusReceipt) -> SyncQueueView { + let pending_count = usize_from_i64( + receipt + .outbox + .pending_events + .saturating_add(receipt.outbox.retryable_events), + ); + SyncQueueView { + expected_count: usize_from_i64(receipt.outbox.total_events), + pending_count, + total_count: Some(usize_from_i64(receipt.outbox.total_events)), + retryable_count: Some(usize_from_i64(receipt.outbox.retryable_events)), + terminal_count: Some(usize_from_i64(receipt.outbox.terminal_events)), + failed_terminal_count: Some(usize_from_i64(receipt.outbox.failed_terminal_events)), + ready_signed_count: Some(usize_from_i64(receipt.outbox.ready_signed_events)), + publishing_count: Some(usize_from_i64(receipt.outbox.publishing_events)), + last_attempt_at_ms: receipt.outbox.last_attempt_at_ms, + last_error: receipt.outbox.last_error.clone(), + } } -impl SyncPushCounts { - fn from_batch(batch: &radroots_replica_sync::RadrootsReplicaPendingPublishBatch) -> Self { - Self { - pending_count: batch.pending_count, - ..Self::default() - } +fn legacy_sync_queue(expected_count: usize, pending_count: usize) -> SyncQueueView { + SyncQueueView { + expected_count, + pending_count, + total_count: None, + retryable_count: None, + terminal_count: None, + failed_terminal_count: None, + ready_signed_count: None, + publishing_count: None, + last_attempt_at_ms: None, + last_error: None, } +} - fn reason(&self) -> Option<String> { - if self.failed_count > 0 { - let failure_reason = match &self.first_failure_reason { - Some(reason) => format!( - "{} pending event(s) failed publish: {reason}", - self.failed_count - ), - None => format!("{} pending event(s) failed publish", self.failed_count), - }; - if self.skipped_count > 0 { - return Some(format!( - "{failure_reason}; {} pending event(s) belong to another author and were not signed", - self.skipped_count - )); - } - return Some(failure_reason); - } - if self.pending_count > 0 && self.skipped_count > 0 { - return Some( - "pending local replica events belong to another author and were not signed" - .to_owned(), - ); +fn sdk_sync_freshness(receipt: &SyncStatusReceipt) -> SyncFreshnessView { + let Some(last_event_updated_at_ms) = receipt.event_store.last_event_updated_at_ms else { + return missing_freshness(); + }; + let last_event_at = u64::try_from(last_event_updated_at_ms / 1_000).unwrap_or(0); + let observed_at = u64::try_from(receipt.observed_at_ms / 1_000).unwrap_or_else(|_| unix_now()); + let age_seconds = observed_at.saturating_sub(last_event_at); + SyncFreshnessView { + state: "synced".to_owned(), + display: format!("SDK event store updated {}", relative_age(age_seconds)), + age_seconds: Some(age_seconds), + last_event_at: Some(last_event_at), + run: None, + } +} + +fn sdk_push_target_relays(receipt: &PushOutboxReceipt, status: &SyncStatusReceipt) -> Vec<String> { + let mut relays = Vec::new(); + for relay in receipt.events.iter().flat_map(|event| event.relays.iter()) { + if !relays.contains(&relay.relay_url) { + relays.push(relay.relay_url.clone()); } - None } + if relays.is_empty() { + relays.extend(status.relay_targets.configured_relays.clone()); + } + relays +} - fn reason_code(&self) -> Option<&'static str> { - if self.failed_count > 0 { - Some("sync_publish_failed") - } else if self.skipped_count > 0 { - Some("sync_publish_skipped_other_author") - } else { - None +fn sdk_push_connected_relays(receipt: &PushOutboxReceipt) -> Vec<String> { + sdk_push_relays_matching(receipt, |_, relay| relay.attempted) +} + +fn sdk_push_acknowledged_relays(receipt: &PushOutboxReceipt) -> Vec<String> { + sdk_push_relays_matching(receipt, |_, relay| sdk_relay_accepted(relay.outcome_kind)) +} + +fn sdk_push_relays_matching( + receipt: &PushOutboxReceipt, + predicate: impl Fn(&PushOutboxEventReceipt, &radroots_sdk::PushOutboxRelayReceipt) -> bool, +) -> Vec<String> { + let mut relays = Vec::new(); + for event in &receipt.events { + for relay in &event.relays { + if predicate(event, relay) && !relays.contains(&relay.relay_url) { + relays.push(relay.relay_url.clone()); + } } } + relays } -#[derive(Debug, Clone)] -struct SyncPushPublishFailure { - reason: String, - connected_relays: Vec<String>, - failed_relays: Vec<RelayFailureView>, +fn sdk_push_failed_relays(receipt: &PushOutboxReceipt) -> Vec<RelayFailureView> { + receipt + .events + .iter() + .flat_map(|event| event.relays.iter()) + .filter(|relay| !sdk_relay_accepted(relay.outcome_kind)) + .map(|relay| RelayFailureView { + relay: relay.relay_url.clone(), + reason: relay + .message + .clone() + .unwrap_or_else(|| sdk_relay_outcome_kind(relay.outcome_kind).to_owned()), + }) + .collect() } -fn sync_push_publish_failure(error: DirectRelayPublishError) -> SyncPushPublishFailure { - match error { - DirectRelayPublishError::Connect { - reason, - connected_relays, - failed_relays, - .. - } => SyncPushPublishFailure { - reason: format!("direct relay connection failed: {reason}"), - connected_relays, - failed_relays: relay_failures(failed_relays), - }, - DirectRelayPublishError::Publish { - reason, - connected_relays, - failed_relays, - .. - } => SyncPushPublishFailure { - reason: format!("direct relay publish failed: {reason}"), - connected_relays, - failed_relays: relay_failures(failed_relays), - }, - other => SyncPushPublishFailure { - reason: other.to_string(), - connected_relays: Vec::new(), - failed_relays: Vec::new(), - }, +fn sdk_relay_accepted(kind: PushOutboxRelayOutcomeKind) -> bool { + matches!( + kind, + PushOutboxRelayOutcomeKind::Accepted | PushOutboxRelayOutcomeKind::DuplicateAccepted + ) +} + +fn sdk_relay_outcome_kind(kind: PushOutboxRelayOutcomeKind) -> &'static str { + match kind { + PushOutboxRelayOutcomeKind::Accepted => "accepted", + PushOutboxRelayOutcomeKind::DuplicateAccepted => "duplicate_accepted", + PushOutboxRelayOutcomeKind::Blocked => "blocked", + PushOutboxRelayOutcomeKind::RateLimited => "rate_limited", + PushOutboxRelayOutcomeKind::Invalid => "invalid", + PushOutboxRelayOutcomeKind::PowRequired => "pow_required", + PushOutboxRelayOutcomeKind::Restricted => "restricted", + PushOutboxRelayOutcomeKind::AuthRequired => "auth_required", + PushOutboxRelayOutcomeKind::Error => "error", + PushOutboxRelayOutcomeKind::Timeout => "timeout", + PushOutboxRelayOutcomeKind::ConnectionFailed => "connection_failed", + PushOutboxRelayOutcomeKind::Unknown => "unknown", + _ => "unknown", } } +fn usize_from_i64(value: i64) -> usize { + usize::try_from(value.max(0)).unwrap_or(usize::MAX) +} + fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> { if !config.local.replica_db_path.exists() { return Ok(SyncSnapshot { @@ -872,10 +771,7 @@ fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> { relay_count: config.relay.urls.len(), publish_policy: config.relay.publish_policy.as_str().to_owned(), freshness: missing_freshness(), - queue: SyncQueueView { - expected_count: 0, - pending_count: 0, - }, + queue: legacy_sync_queue(0, 0), reason: Some("local replica database is not initialized".to_owned()), actions: vec!["radroots store init".to_owned()], }); @@ -900,10 +796,7 @@ fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> { relay_count, publish_policy, freshness, - queue: SyncQueueView { - expected_count: queue.expected_count, - pending_count: queue.pending_count, - }, + queue: legacy_sync_queue(queue.expected_count, queue.pending_count), reason: Some("no relays are configured for this operator session".to_owned()), actions, }); @@ -922,42 +815,12 @@ fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> { relay_count, publish_policy, freshness, - queue: SyncQueueView { - expected_count: queue.expected_count, - pending_count: queue.pending_count, - }, + queue: legacy_sync_queue(queue.expected_count, queue.pending_count), reason: None, actions, }) } -pub(crate) fn freshness_from_executor( - executor: &SqliteExecutor, -) -> Result<SyncFreshnessView, RuntimeError> { - let db = ReplicaSql::new(executor); - let last_event_at = db.nostr_event_last_created_at()?; - - Ok(match last_event_at { - Some(last_event_at) => { - let age_seconds = unix_now().saturating_sub(last_event_at); - SyncFreshnessView { - state: "synced".to_owned(), - display: format!("synced {}", relative_age(age_seconds)), - age_seconds: Some(age_seconds), - last_event_at: Some(last_event_at), - run: None, - } - } - None => SyncFreshnessView { - state: "never".to_owned(), - display: "never synced".to_owned(), - age_seconds: None, - last_event_at: None, - run: None, - }, - }) -} - pub(crate) fn missing_freshness() -> SyncFreshnessView { SyncFreshnessView { state: "never".to_owned(), @@ -1582,6 +1445,7 @@ mod tests { use std::path::{Path, PathBuf}; use radroots_events::farm::{RadrootsFarm, RadrootsFarmRef}; + use radroots_events::ids::RadrootsEventId; use radroots_events::kinds::{KIND_FARM, KIND_LIST_SET_GENERIC, KIND_LISTING, KIND_POST}; use radroots_events::list::RadrootsListEntry; use radroots_events::list_set::RadrootsListSet; @@ -1596,19 +1460,20 @@ mod tests { use radroots_nostr::prelude::{ RadrootsNostrEvent, RadrootsNostrFilter, RadrootsNostrTimestamp, radroots_nostr_build_event, }; - use radroots_replica_db::{farm, farm_member_claim, migrations}; - use radroots_replica_db_schema::farm::IFarmFields; - use radroots_replica_db_schema::farm_member_claim::IFarmMemberClaimFields; - use radroots_replica_sync::radroots_replica_sync_status; use radroots_runtime_paths::RadrootsMigrationReport; + use radroots_sdk::{ + PushOutboxEventReceipt, PushOutboxEventState, PushOutboxReceipt, + PushOutboxRelayOutcomeKind, PushOutboxRelayReceipt, SyncEventStoreStatus, SyncOutboxStatus, + SyncRelayTargetSummary, SyncStatusReceipt, SyncStatusSource, + }; use radroots_secret_vault::RadrootsSecretBackend; - use radroots_sql_core::SqliteExecutor; use tempfile::tempdir; use super::{ - DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt, - DirectRelayPublishReceipt, RelayIngestScope, market_refresh_with_fetcher, - pull_with_fetcher, push_with_publisher, relay_provenance_relays_for_scope, status, + DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt, RelayIngestScope, + freshness_for_scope, market_refresh_with_fetcher, pull_with_fetcher, push, + relay_provenance_relays_for_scope, sdk_push_dry_run_view, sdk_push_view, + sdk_sync_status_view, }; use crate::cli::global::{FindQueryArgs, RecordLookupArgs}; use crate::runtime::config::{ @@ -1660,321 +1525,236 @@ mod tests { } #[test] - fn sync_push_dry_run_reports_pending_without_publish_or_state_update() { + fn sync_status_empty_sdk_store_reports_canonical_source() { let dir = tempdir().expect("tempdir"); - let mut config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); - config.output.dry_run = true; - crate::runtime::store::init(&config).expect("store init"); - let signing = - crate::runtime::account::create_or_migrate_default_account(&config).expect("account"); - seed_replica_farm( - &config, - signing - .account - .record - .public_identity - .public_key_hex - .as_str(), + let config = sample_config( + dir.path(), + vec![ + "wss://relay-a.example.com".to_owned(), + "wss://relay-b.example.com".to_owned(), + ], ); - let view = push_with_publisher(&config, |_, _, _| panic!("dry run must not publish")) - .expect("sync push dry run"); - let status = radroots_replica_sync_status( - &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"), - ) - .expect("status"); - - assert_eq!(view.state, "dry_run"); - assert_eq!(view.target_relays, vec!["wss://relay.example.com"]); - assert_eq!(view.publishable_count, Some(status.pending_count)); - assert_eq!(view.published_count, Some(0)); - assert_eq!(view.failed_count, Some(0)); - let plan = view.publish_plan.as_ref().expect("publish plan"); - assert_eq!( - plan.selected_author, - signing.account.record.public_identity.public_key_hex + let view = sdk_sync_status_view( + &config, + sdk_status_receipt( + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + None, + None, + &["wss://relay-a.example.com", "wss://relay-b.example.com"], + ), ); - assert!(plan.event_kinds.iter().any(|kind| { - kind.kind == KIND_FARM - && kind.pending_count == 1 - && kind.publishable_count == 1 - && kind.skipped_count == 0 - })); - assert!(plan.authors.iter().any(|author| { - author.author == signing.account.record.public_identity.public_key_hex - && author.eligibility == "selected" - && author.publishable_count == status.pending_count - })); - assert!(status.pending_count > 0); - } - - #[test] - fn sync_push_dry_run_reports_other_author_publish_plan() { - let dir = tempdir().expect("tempdir"); - let mut config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); - config.output.dry_run = true; - crate::runtime::store::init(&config).expect("store init"); - let signing = - crate::runtime::account::create_or_migrate_default_account(&config).expect("account"); - let selected_pubkey = signing - .account - .record - .public_identity - .public_key_hex - .clone(); - let other_pubkey = identity(42).public_key_hex(); - let other_pubkey_upper = other_pubkey.to_ascii_uppercase(); - seed_replica_farm(&config, selected_pubkey.as_str()); - seed_replica_farm(&config, other_pubkey_upper.as_str()); - - let view = push_with_publisher(&config, |_, _, _| panic!("dry run must not publish")) - .expect("sync push dry run"); - assert_eq!(view.state, "dry_run"); - let skipped_count = view.skipped_count.expect("skipped count"); - assert!(skipped_count > 0); - assert!( - view.reason - .as_deref() - .expect("dry-run reason") - .contains("belong to another author") - ); - let plan = view.publish_plan.as_ref().expect("publish plan"); - assert!(plan.event_kinds.iter().any(|kind| kind.skipped_count == 1)); - assert!(plan.authors.iter().any(|author| { - author.author == other_pubkey - && author.eligibility == "other_author" - && author.pending_count == skipped_count - && author.skipped_count == skipped_count - })); - assert!(view.actions.contains(&"radroots account list".to_owned())); - assert!( - view.actions - .iter() - .any(|action| action == &format!("radroots --account-id {other_pubkey} sync push")) - ); + assert_eq!(view.state, "ready"); + assert_eq!(view.source, "SDK canonical event store and outbox"); + assert_eq!(view.replica_db, "legacy_derived_not_checked"); + assert_eq!(view.relay_count, 2); + assert_eq!(view.queue.total_count, Some(0)); + assert_eq!(view.queue.pending_count, 0); + assert_eq!(view.queue.retryable_count, Some(0)); + assert_eq!(view.queue.terminal_count, Some(0)); + assert_eq!(view.queue.ready_signed_count, Some(0)); + assert_eq!(view.actions, vec!["radroots sync pull"]); } #[test] - fn sync_push_publishes_pending_local_author_events_and_updates_state() { + fn sync_status_reports_sdk_pending_retryable_and_terminal_outbox_counts() { let dir = tempdir().expect("tempdir"); let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); - crate::runtime::store::init(&config).expect("store init"); - let signing = - crate::runtime::account::create_or_migrate_default_account(&config).expect("account"); - seed_replica_farm( + + let view = sdk_sync_status_view( &config, - signing - .account - .record - .public_identity - .public_key_hex - .as_str(), + sdk_status_receipt( + 3, + 4, + 1, + 1, + 2, + 1, + 1, + 0, + Some(1_700_000_010_000), + Some("auth-required: login".to_owned()), + &["wss://relay.example.com"], + ), ); - let before = radroots_replica_sync_status( - &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"), - ) - .expect("status before"); - - let view = push_with_publisher(&config, |identity, relays, event| { - let signed = signed_event( - identity, - WireEventParts { - kind: event.draft.kind, - content: event.draft.content.clone(), - tags: event.draft.tags.clone(), - }, - ); - Ok(DirectRelayPublishReceipt { - event_id: signed.id.to_hex(), - created_at: u32::try_from(signed.created_at.as_secs()).unwrap_or(u32::MAX), - signature: signed.sig.to_string(), - event: signed, - target_relays: relays.to_vec(), - connected_relays: relays.to_vec(), - acknowledged_relays: relays.to_vec(), - failed_relays: Vec::new(), - }) - }) - .expect("sync push"); - let after = radroots_replica_sync_status( - &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"), - ) - .expect("status after"); - assert!(before.pending_count > 0); - assert_eq!(view.state, "published"); - assert_eq!(view.published_count, Some(before.pending_count)); - assert_eq!(view.failed_count, Some(0)); - assert_eq!(view.connected_relays, vec!["wss://relay.example.com"]); - assert_eq!(view.acknowledged_relays, vec!["wss://relay.example.com"]); - assert_eq!(after.pending_count, 0); + assert_eq!(view.state, "ready"); + assert_eq!(view.queue.expected_count, 4); + assert_eq!(view.queue.pending_count, 2); + assert_eq!(view.queue.retryable_count, Some(1)); + assert_eq!(view.queue.terminal_count, Some(2)); + assert_eq!(view.queue.failed_terminal_count, Some(1)); + assert_eq!(view.queue.ready_signed_count, Some(1)); + assert_eq!(view.queue.last_attempt_at_ms, Some(1_700_000_010_000)); + assert_eq!( + view.queue.last_error.as_deref(), + Some("auth-required: login") + ); + assert_eq!(view.actions, vec!["radroots sync push"]); } #[test] - fn sync_push_reports_partial_when_other_author_events_remain_pending() { + fn sync_push_dry_run_reports_sdk_ready_outbox_plan() { let dir = tempdir().expect("tempdir"); - let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); - crate::runtime::store::init(&config).expect("store init"); - let signing = - crate::runtime::account::create_or_migrate_default_account(&config).expect("account"); - let selected_pubkey = signing - .account - .record - .public_identity - .public_key_hex - .clone(); - let other_pubkey = identity(43).public_key_hex(); - seed_replica_farm(&config, selected_pubkey.as_str()); - seed_replica_member_claim(&config, other_pubkey.as_str(), selected_pubkey.as_str()); - - let before = radroots_replica_sync_status( - &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"), - ) - .expect("status before"); - let view = push_with_publisher(&config, |identity, relays, event| { - assert!(event.author.eq_ignore_ascii_case(selected_pubkey.as_str())); - let signed = signed_event( - identity, - WireEventParts { - kind: event.draft.kind, - content: event.draft.content.clone(), - tags: event.draft.tags.clone(), - }, - ); - Ok(DirectRelayPublishReceipt { - event_id: signed.id.to_hex(), - created_at: u32::try_from(signed.created_at.as_secs()).unwrap_or(u32::MAX), - signature: signed.sig.to_string(), - event: signed, - target_relays: relays.to_vec(), - connected_relays: relays.to_vec(), - acknowledged_relays: relays.to_vec(), - failed_relays: Vec::new(), - }) - }) - .expect("sync push"); - let after = radroots_replica_sync_status( - &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"), - ) - .expect("status after"); + let mut config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); + config.output.dry_run = true; - assert!(before.pending_count > after.pending_count); - assert_eq!(view.state, "partial"); - assert_eq!(view.published_count, Some(before.pending_count - 1)); - assert_eq!(view.skipped_count, Some(1)); - assert_eq!(after.pending_count, 1); - assert!( - view.reason - .as_deref() - .expect("partial reason") - .contains("belong to another author") + let view = sdk_push_dry_run_view( + &config, + sdk_status_receipt( + 1, + 1, + 1, + 0, + 0, + 0, + 1, + 0, + None, + None, + &["wss://relay.example.com"], + ), ); - assert!( - view.actions - .contains(&"radroots sync status get".to_owned()) + + assert_eq!(view.state, "dry_run"); + assert_eq!(view.source, "SDK outbox push"); + assert_eq!(view.replica_db, "legacy_derived_not_checked"); + assert_eq!(view.target_relays, vec!["wss://relay.example.com"]); + assert_eq!(view.publishable_count, Some(1)); + assert_eq!(view.published_count, Some(0)); + assert_eq!(view.failed_count, Some(0)); + assert_eq!(view.reason_code.as_deref(), Some("dry_run")); + assert_eq!( + view.reason.as_deref(), + Some("dry run requested; SDK outbox push skipped") ); - assert!(view.actions.contains(&"radroots account list".to_owned())); - assert!( - view.actions - .iter() - .any(|action| action == &format!("radroots --account-id {other_pubkey} sync push")) + assert_eq!( + view.actions, + vec!["radroots sync push", "radroots sync status get"] ); + assert!(view.publish_plan.is_none()); } #[test] - fn sync_push_reports_unconfigured_when_only_other_author_events_are_pending() { + fn sync_push_empty_queue_reports_ready_sdk_state() { let dir = tempdir().expect("tempdir"); - let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); - crate::runtime::store::init(&config).expect("store init"); - crate::runtime::account::create_or_migrate_default_account(&config).expect("account"); - let other_pubkey = identity(44).public_key_hex(); - seed_replica_farm(&config, other_pubkey.as_str()); + let config = sample_config(dir.path(), Vec::new()); - let view = push_with_publisher(&config, |_, _, _| { - panic!("other-author-only queue must not publish") - }) - .expect("sync push"); - let after = radroots_replica_sync_status( - &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"), - ) - .expect("status after"); + let view = sdk_push_view( + &config, + PushOutboxReceipt::default(), + sdk_status_receipt(0, 0, 0, 0, 0, 0, 0, 0, None, None, &[]), + ); - assert_eq!(view.state, "unconfigured"); + assert_eq!(view.state, "ready"); assert_eq!(view.publishable_count, Some(0)); assert_eq!(view.published_count, Some(0)); - let skipped_count = view.skipped_count.expect("skipped count"); - assert!(skipped_count > 0); - assert_eq!(after.pending_count, skipped_count); - assert!( - view.reason - .as_deref() - .expect("unconfigured reason") - .contains("belong to another author") - ); - assert!( - view.actions - .contains(&"radroots sync status get".to_owned()) - ); - assert!(view.actions.contains(&"radroots account list".to_owned())); - assert!( - view.actions - .iter() - .any(|action| action == &format!("radroots --account-id {other_pubkey} sync push")) + assert_eq!(view.failed_count, Some(0)); + assert_eq!( + view.reason.as_deref(), + Some("SDK outbox had no ready signed events to push") ); + assert_eq!(view.actions, vec!["radroots sync status get"]); } #[test] - fn sync_push_failed_publish_leaves_pending_state_retryable() { + fn sync_push_maps_published_and_auth_required_sdk_receipts() { let dir = tempdir().expect("tempdir"); - let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]); - crate::runtime::store::init(&config).expect("store init"); - let signing = - crate::runtime::account::create_or_migrate_default_account(&config).expect("account"); - seed_replica_farm( - &config, - signing - .account - .record - .public_identity - .public_key_hex - .as_str(), + let config = sample_config( + dir.path(), + vec![ + "wss://relay-a.example.com".to_owned(), + "wss://relay-b.example.com".to_owned(), + ], ); + let receipt = PushOutboxReceipt { + attempted_events: 2, + published_events: 1, + retryable_events: 1, + terminal_events: 0, + events: vec![ + sdk_push_event( + "a", + PushOutboxEventState::Published, + PushOutboxRelayOutcomeKind::Accepted, + "wss://relay-a.example.com", + Some("accepted".to_owned()), + ), + sdk_push_event( + "b", + PushOutboxEventState::PublishRetryable, + PushOutboxRelayOutcomeKind::AuthRequired, + "wss://relay-b.example.com", + Some("auth-required: login".to_owned()), + ), + ], + }; - let view = push_with_publisher(&config, |_, relays, _| { - Err(super::DirectRelayPublishError::Publish { - event_id: "0".repeat(64), - reason: "relay refused event".to_owned(), - target_relays: relays.to_vec(), - connected_relays: relays.to_vec(), - failed_relays: vec![DirectRelayFailure { - relay: relays[0].clone(), - reason: "relay refused event".to_owned(), - }], - }) - }) - .expect("sync push failure view"); - let status = radroots_replica_sync_status( - &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"), - ) - .expect("status"); + let view = sdk_push_view( + &config, + receipt, + sdk_status_receipt( + 2, + 2, + 0, + 1, + 1, + 0, + 0, + 0, + Some(1_700_000_020_000), + Some("auth-required: login".to_owned()), + &["wss://relay-a.example.com", "wss://relay-b.example.com"], + ), + ); - assert_eq!(view.state, "unavailable"); - assert_eq!(view.published_count, Some(0)); + assert_eq!(view.state, "partial"); + assert_eq!(view.publishable_count, Some(2)); + assert_eq!(view.published_count, Some(1)); assert_eq!(view.failed_count, Some(1)); - assert!(status.pending_count > 0); + assert_eq!(view.reason_code.as_deref(), Some("sdk_outbox_push_partial")); + assert_eq!( + view.target_relays, + vec![ + "wss://relay-a.example.com".to_owned(), + "wss://relay-b.example.com".to_owned() + ] + ); + assert_eq!( + view.connected_relays, + vec![ + "wss://relay-a.example.com".to_owned(), + "wss://relay-b.example.com".to_owned() + ] + ); + assert_eq!( + view.acknowledged_relays, + vec!["wss://relay-a.example.com".to_owned()] + ); + assert_eq!(view.failed_relays.len(), 1); + assert_eq!(view.failed_relays[0].relay, "wss://relay-b.example.com"); + assert_eq!(view.failed_relays[0].reason, "auth-required: login"); + assert_eq!( + view.actions, + vec!["radroots sync push", "radroots sync status get"] + ); } #[test] - fn sync_push_rejects_radrootsd_before_store_relay_or_signer_checks() { + fn sync_push_rejects_radrootsd_before_store_or_sdk_work() { let dir = tempdir().expect("tempdir"); let mut config = sample_config(dir.path(), Vec::new()); config.publish.mode = PublishMode::Radrootsd; - let view = push_with_publisher(&config, |_, _, _| { - panic!("radrootsd sync push must not publish") - }) - .expect("radrootsd sync push view"); + let view = push(&config).expect("radrootsd sync push view"); assert_eq!(view.state, "unavailable"); assert_eq!(view.replica_db, "not_checked"); @@ -1990,6 +1770,96 @@ mod tests { assert!(!config.local.replica_db_path.exists()); } + fn sdk_status_receipt( + total_events: i64, + outbox_total_events: i64, + pending_events: i64, + retryable_events: i64, + terminal_events: i64, + failed_terminal_events: i64, + ready_signed_events: i64, + publishing_events: i64, + last_attempt_at_ms: Option<i64>, + last_error: Option<String>, + relays: &[&str], + ) -> SyncStatusReceipt { + SyncStatusReceipt { + source: SyncStatusSource::SdkCanonicalStores, + observed_at_ms: 1_700_000_030_000, + event_store: SyncEventStoreStatus { + total_events, + projection_eligible_events: total_events, + relay_observations: 0, + last_event_seq: (total_events > 0).then_some(total_events), + last_event_updated_at_ms: (total_events > 0).then_some(1_700_000_000_000), + }, + outbox: SyncOutboxStatus { + total_events: outbox_total_events, + pending_events, + retryable_events, + terminal_events, + failed_terminal_events, + ready_signed_events, + publishing_events, + last_attempt_at_ms, + last_error, + }, + relay_targets: SyncRelayTargetSummary { + configured_count: relays.len(), + configured_relays: relays.iter().map(|relay| (*relay).to_owned()).collect(), + }, + } + } + + fn sdk_push_event( + event_id_prefix: &str, + final_state: PushOutboxEventState, + outcome_kind: PushOutboxRelayOutcomeKind, + relay_url: &str, + message: Option<String>, + ) -> PushOutboxEventReceipt { + PushOutboxEventReceipt { + event_id: RadrootsEventId::parse(event_id_prefix.repeat(64).as_str()) + .expect("event id"), + outbox_event_id: 7, + final_state, + attempted_count: 1, + accepted_count: usize::from(matches!( + outcome_kind, + PushOutboxRelayOutcomeKind::Accepted + | PushOutboxRelayOutcomeKind::DuplicateAccepted + )), + retryable_count: usize::from(matches!( + outcome_kind, + PushOutboxRelayOutcomeKind::AuthRequired + | PushOutboxRelayOutcomeKind::Timeout + | PushOutboxRelayOutcomeKind::ConnectionFailed + )), + terminal_count: usize::from(matches!( + outcome_kind, + PushOutboxRelayOutcomeKind::Blocked + | PushOutboxRelayOutcomeKind::RateLimited + | PushOutboxRelayOutcomeKind::Invalid + | PushOutboxRelayOutcomeKind::PowRequired + | PushOutboxRelayOutcomeKind::Restricted + | PushOutboxRelayOutcomeKind::Error + | PushOutboxRelayOutcomeKind::Unknown + )), + quorum: 1, + quorum_met: matches!( + outcome_kind, + PushOutboxRelayOutcomeKind::Accepted + | PushOutboxRelayOutcomeKind::DuplicateAccepted + ), + relays: vec![PushOutboxRelayReceipt { + relay_url: relay_url.to_owned(), + outcome_kind, + attempted: true, + message, + }], + } + } + #[test] fn sync_pull_ingests_relay_events_and_market_reads_without_daemon() { let dir = tempdir().expect("tempdir"); @@ -2185,7 +2055,7 @@ mod tests { } #[test] - fn sync_status_reports_relay_set_changed_freshness() { + fn sync_pull_freshness_reports_relay_set_changed() { let dir = tempdir().expect("tempdir"); let config = sample_config(dir.path(), vec!["wss://relay-a.example.com".to_owned()]); crate::runtime::store::init(&config).expect("store init"); @@ -2193,10 +2063,11 @@ mod tests { pull_with_fetcher(&config, fake_fetcher(vec![listing_event(&seller)])).expect("sync pull"); let changed = sample_config(dir.path(), vec!["wss://relay-b.example.com".to_owned()]); - let view = status(&changed).expect("sync status"); + let freshness = + freshness_for_scope(&changed, RelayIngestScope::SyncPull).expect("sync freshness"); - assert_eq!(view.freshness.state, "relay_set_changed"); - let run = view.freshness.run.as_ref().expect("run freshness"); + assert_eq!(freshness.state, "relay_set_changed"); + let run = freshness.run.as_ref().expect("run freshness"); assert_eq!(run.scope, "sync_pull"); assert_eq!(run.relay_set_current, false); } @@ -2401,41 +2272,6 @@ mod tests { .expect("signed event") } - fn seed_replica_farm(config: &RuntimeConfig, pubkey: &str) { - let executor = SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"); - migrations::run_all_up(&executor).expect("migrations"); - let _ = farm::create( - &executor, - &IFarmFields { - d_tag: FARM_D_TAG.to_owned(), - pubkey: pubkey.to_owned(), - name: "Local Farm".to_owned(), - about: Some("local replica farm".to_owned()), - website: None, - picture: None, - banner: None, - location_primary: None, - location_city: None, - location_region: None, - location_country: None, - }, - ) - .expect("farm"); - } - - fn seed_replica_member_claim(config: &RuntimeConfig, member_pubkey: &str, farm_pubkey: &str) { - let executor = SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"); - migrations::run_all_up(&executor).expect("migrations"); - let _ = farm_member_claim::create( - &executor, - &IFarmMemberClaimFields { - member_pubkey: member_pubkey.to_owned(), - farm_pubkey: farm_pubkey.to_owned(), - }, - ) - .expect("member claim"); - } - fn identity(seed: u8) -> RadrootsIdentity { RadrootsIdentity::from_secret_key_bytes(&[seed; 32]).expect("identity") } diff --git a/src/view/runtime.rs b/src/view/runtime.rs @@ -3352,6 +3352,22 @@ pub struct SyncRunFreshnessView { pub struct SyncQueueView { pub expected_count: usize, pub pending_count: usize, + #[serde(skip_serializing_if = "Option::is_none")] + pub total_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub retryable_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub terminal_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub failed_terminal_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub ready_signed_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub publishing_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_attempt_at_ms: Option<i64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_error: Option<String>, } #[derive(Debug, Clone, Serialize)] diff --git a/tests/target_cli.rs b/tests/target_cli.rs @@ -20,12 +20,9 @@ use radroots_local_events::{ PublishOutboxStatus, RelayDeliveryEvidence, SourceRuntime, canonical_relay_set_fingerprint, }; use radroots_nostr::prelude::{RadrootsNostrEvent, radroots_nostr_build_event}; -use radroots_replica_db::{farm, farm_member_claim, migrations}; +use radroots_replica_db::{farm, migrations}; use radroots_replica_db_schema::farm::IFarmFields; -use radroots_replica_db_schema::farm_member_claim::IFarmMemberClaimFields; -use radroots_replica_sync::{ - RadrootsReplicaPendingPublishBatch, radroots_replica_pending_publish_batch, -}; +use radroots_replica_sync::radroots_replica_pending_publish_batch; use radroots_sql_core::SqliteExecutor; use serde_json::Value; use serde_json::json; @@ -42,7 +39,7 @@ use support::{ const LISTING_ADDR: &str = "30402:1111111111111111111111111111111111111111111111111111111111111111:AAAAAAAAAAAAAAAAAAAAAg"; -const SYNC_PUSH_FARM_D_TAG: &str = "AAAAAAAAAAAAAAAAAAAAAA"; +const LEGACY_SYNC_PUSH_FARM_D_TAG: &str = "AAAAAAAAAAAAAAAAAAAAAA"; fn test_order_id(value: &str) -> RadrootsOrderId { value.parse().expect("valid order id") @@ -193,7 +190,7 @@ fn read_relay_event_message(websocket: &mut tungstenite::WebSocket<TcpStream>) - } } -fn seed_sync_push_farm(sandbox: &RadrootsCliSandbox, d_tag: &str, pubkey: &str) { +fn seed_legacy_replica_sync_farm(sandbox: &RadrootsCliSandbox, d_tag: &str, pubkey: &str) { let executor = SqliteExecutor::open(sandbox.replica_db_path()).expect("open replica"); migrations::run_all_up(&executor).expect("replica migrations"); farm::create( @@ -212,29 +209,7 @@ fn seed_sync_push_farm(sandbox: &RadrootsCliSandbox, d_tag: &str, pubkey: &str) location_country: None, }, ) - .expect("seed sync push farm"); -} - -fn seed_sync_push_member_claim( - sandbox: &RadrootsCliSandbox, - member_pubkey: &str, - farm_pubkey: &str, -) { - let executor = SqliteExecutor::open(sandbox.replica_db_path()).expect("open replica"); - migrations::run_all_up(&executor).expect("replica migrations"); - farm_member_claim::create( - &executor, - &IFarmMemberClaimFields { - member_pubkey: member_pubkey.to_owned(), - farm_pubkey: farm_pubkey.to_owned(), - }, - ) - .expect("seed sync push member claim"); -} - -fn sync_push_pending_batch(sandbox: &RadrootsCliSandbox) -> RadrootsReplicaPendingPublishBatch { - let executor = SqliteExecutor::open(sandbox.replica_db_path()).expect("open replica"); - radroots_replica_pending_publish_batch(&executor).expect("sync push pending batch") + .expect("seed legacy replica sync farm"); } fn seed_app_farm_record( @@ -2330,10 +2305,12 @@ fn target_outputs_do_not_suggest_removed_command_families() { } let sync_args = ["--format", "json", "sync", "status", "get"]; - let (output, value) = sandbox.json_output(&sync_args); - assert!(!output.status.success()); + let value = sandbox.json_success(&sync_args); assert_eq!(value["operation_id"], "sync.status.get"); - assert_eq!(value["errors"][0]["code"], "operation_unavailable"); + assert_eq!( + value["result"]["source"], + "SDK canonical event store and outbox" + ); assert_no_removed_command_reference(&value, &sync_args); } @@ -5484,36 +5461,58 @@ fn listing_publish_failure_uses_sdk_outbox_without_legacy_local_event_record() { } #[test] -fn sync_push_partial_mixed_author_queue_reports_error_envelope() { +fn sync_push_sdk_outbox_failure_reports_network_unavailable() { let sandbox = RadrootsCliSandbox::new(); sandbox.json_success(&["--format", "json", "account", "create"]); - let signer = sandbox.json_success(&["--format", "json", "signer", "status", "get"]); - let selected_pubkey = signer["result"]["local"]["public_identity"]["public_key_hex"] + let farm = sandbox.json_success(&[ + "--format", + "json", + "farm", + "create", + "--name", + "Sync SDK Farm", + "--location", + "farmstand", + "--country", + "US", + "--delivery-method", + "pickup", + ]); + let farm_d_tag = farm["result"]["config"]["farm_d_tag"] .as_str() - .expect("selected public key"); - sandbox.json_success(&["--format", "json", "store", "init"]); - let other_pubkey = identity_public(81).public_key_hex; - let other_pubkey_canonical = other_pubkey.to_ascii_lowercase(); - seed_sync_push_farm(&sandbox, SYNC_PUSH_FARM_D_TAG, selected_pubkey); - seed_sync_push_member_claim(&sandbox, other_pubkey.as_str(), selected_pubkey); - let batch = sync_push_pending_batch(&sandbox); - let expected_publishable_count = batch - .pending_events - .iter() - .filter(|event| event.author.eq_ignore_ascii_case(selected_pubkey)) - .count(); - let expected_skipped_count = batch.pending_count - expected_publishable_count; - assert!(expected_publishable_count > 0); - assert!(expected_skipped_count > 0); - let relay = - RelayPublishServer::with_publish_outcomes(vec![(true, ""); expected_publishable_count]); - let relay_url = relay.endpoint().to_owned(); + .expect("farm d tag"); + let listing_file = create_listing_draft(&sandbox, "sync-sdk-push-eggs"); + make_listing_publishable(&listing_file, farm_d_tag); + let relay = "ws://127.0.0.1:9"; + let publish = sandbox.json_success(&[ + "--format", + "json", + "--offline", + "--relay", + relay, + "--approval-token", + "approve", + "listing", + "publish", + listing_file.to_string_lossy().as_ref(), + ]); + + assert_eq!(publish["operation_id"], "listing.publish"); + assert_eq!(publish["result"]["state"], "queued"); + let status = sandbox.json_success(&["--format", "json", "sync", "status", "get"]); + assert_eq!( + status["result"]["source"], + "SDK canonical event store and outbox" + ); + assert_eq!(status["result"]["replica_db"], "legacy_derived_not_checked"); + assert_eq!(status["result"]["queue"]["pending_count"], 1); + assert_eq!(status["result"]["queue"]["ready_signed_count"], 1); let (output, value) = sandbox.json_output(&[ "--format", "json", "--relay", - relay_url.as_str(), + relay, "--approval-token", "approve", "sync", @@ -5524,56 +5523,48 @@ fn sync_push_partial_mixed_author_queue_reports_error_envelope() { assert_eq!(value["operation_id"], "sync.push"); assert_eq!(value["result"], Value::Null); assert_eq!(value["errors"][0]["code"], "network_unavailable", "{value}"); - assert_eq!(value["errors"][0]["detail"]["state"], "partial"); - assert_eq!( - value["errors"][0]["detail"]["queue"]["pending_count"], - json!(expected_skipped_count) - ); - assert_eq!( - value["errors"][0]["detail"]["published_count"], - json!(expected_publishable_count) - ); + assert_eq!(value["errors"][0]["detail"]["state"], "unavailable"); + assert_eq!(value["errors"][0]["detail"]["source"], "SDK outbox push"); assert_eq!( - value["errors"][0]["detail"]["skipped_count"], - json!(expected_skipped_count) - ); - assert_contains( - &value["errors"][0]["detail"]["reason"], - "belong to another author", + value["errors"][0]["detail"]["replica_db"], + "legacy_derived_not_checked" ); + assert_eq!(value["errors"][0]["detail"]["target_relays"][0], relay); assert_eq!( - value["errors"][0]["detail"]["actions"][1], - "radroots account list" + value["errors"][0]["detail"]["failed_relays"][0]["relay"], + relay ); + assert_eq!(value["errors"][0]["detail"]["publishable_count"], 1); + assert_eq!(value["errors"][0]["detail"]["published_count"], 0); + assert_eq!(value["errors"][0]["detail"]["failed_count"], 1); assert_eq!( - value["errors"][0]["detail"]["actions"][2], - format!("radroots --account-id {other_pubkey_canonical} sync push") + value["errors"][0]["detail"]["reason"], + "SDK outbox push did not reach accepted quorum for any ready event" ); assert_eq!( - value["next_actions"][2]["command"], - format!("radroots --account-id {other_pubkey_canonical} sync push") - ); - let requests = relay.take_requests(expected_publishable_count); - assert_eq!(requests.len(), expected_publishable_count); - assert!( - requests - .iter() - .all(|request| request["pubkey"] == selected_pubkey) + value["errors"][0]["detail"]["actions"][0], + "radroots sync push" ); assert_no_removed_command_reference(&value, &["sync", "push"]); assert_no_daemon_runtime_reference(&value, &["sync", "push"]); } #[test] -fn sync_push_other_author_only_queue_reports_unconfigured_error_envelope() { +fn sync_push_ignores_legacy_replica_pending_queue_for_sdk_canonical_push() { let sandbox = RadrootsCliSandbox::new(); sandbox.json_success(&["--format", "json", "account", "create"]); sandbox.json_success(&["--format", "json", "store", "init"]); - let other_pubkey = identity_public(82).public_key_hex; - let other_pubkey_canonical = other_pubkey.to_ascii_lowercase(); - seed_sync_push_farm(&sandbox, SYNC_PUSH_FARM_D_TAG, other_pubkey.as_str()); + let signer = sandbox.json_success(&["--format", "json", "signer", "status", "get"]); + let selected_pubkey = signer["result"]["local"]["public_identity"]["public_key_hex"] + .as_str() + .expect("selected public key"); + seed_legacy_replica_sync_farm(&sandbox, LEGACY_SYNC_PUSH_FARM_D_TAG, selected_pubkey); + let executor = SqliteExecutor::open(sandbox.replica_db_path()).expect("open replica"); + let legacy_batch = + radroots_replica_pending_publish_batch(&executor).expect("legacy pending batch"); + assert!(legacy_batch.pending_count > 0); - let (output, value) = sandbox.json_output(&[ + let value = sandbox.json_success(&[ "--format", "json", "--relay", @@ -5584,36 +5575,18 @@ fn sync_push_other_author_only_queue_reports_unconfigured_error_envelope() { "push", ]); - assert!(!output.status.success(), "{value}"); assert_eq!(value["operation_id"], "sync.push"); - assert_eq!(value["result"], Value::Null); - assert_eq!( - value["errors"][0]["code"], "operation_unavailable", - "{value}" - ); - assert_eq!(value["errors"][0]["detail"]["state"], "unconfigured"); - let pending_count = value["errors"][0]["detail"]["queue"]["pending_count"] - .as_u64() - .expect("pending count"); - assert!(pending_count > 0); - assert_eq!(value["errors"][0]["detail"]["publishable_count"], 0); - assert_eq!(value["errors"][0]["detail"]["published_count"], 0); - assert_eq!(value["errors"][0]["detail"]["skipped_count"], pending_count); - assert_contains( - &value["errors"][0]["detail"]["reason"], - "belong to another author", - ); - assert_eq!( - value["errors"][0]["detail"]["actions"][1], - "radroots account list" - ); - assert_eq!( - value["errors"][0]["detail"]["actions"][2], - format!("radroots --account-id {other_pubkey_canonical} sync push") - ); + assert_eq!(value["result"]["state"], "ready"); + assert_eq!(value["result"]["source"], "SDK outbox push"); + assert_eq!(value["result"]["replica_db"], "legacy_derived_not_checked"); + assert_eq!(value["result"]["queue"]["pending_count"], 0); + assert_eq!(value["result"]["queue"]["total_count"], 0); + assert_eq!(value["result"]["publishable_count"], 0); + assert_eq!(value["result"]["published_count"], 0); + assert_eq!(value["result"]["failed_count"], 0); assert_eq!( - value["next_actions"][2]["command"], - format!("radroots --account-id {other_pubkey_canonical} sync push") + value["result"]["reason"], + "SDK outbox had no ready signed events to push" ); assert_no_removed_command_reference(&value, &["sync", "push"]); assert_no_daemon_runtime_reference(&value, &["sync", "push"]); @@ -5639,14 +5612,19 @@ fn buyer_market_sync_basket_dry_runs_preflight_without_mutating_local_state() { assert_eq!(sync_pull["errors"][0]["detail"]["state"], "unconfigured"); assert_eq!(sync_pull["errors"][0]["detail"]["replica_db"], "missing"); - let (sync_push_output, sync_push) = - sandbox.json_output(&["--format", "json", "--dry-run", "sync", "push"]); - assert!(!sync_push_output.status.success()); + let sync_push = sandbox.json_success(&["--format", "json", "--dry-run", "sync", "push"]); assert_eq!(sync_push["operation_id"], "sync.push"); assert_eq!(sync_push["dry_run"], true); - assert_eq!(sync_push["errors"][0]["code"], "operation_unavailable"); - assert_eq!(sync_push["errors"][0]["detail"]["state"], "unconfigured"); - assert_eq!(sync_push["errors"][0]["detail"]["replica_db"], "missing"); + assert_eq!(sync_push["result"]["state"], "ready"); + assert_eq!(sync_push["result"]["source"], "SDK outbox push"); + assert_eq!( + sync_push["result"]["replica_db"], + "legacy_derived_not_checked" + ); + assert_eq!(sync_push["result"]["queue"]["pending_count"], 0); + assert_eq!(sync_push["result"]["queue"]["total_count"], 0); + assert_eq!(sync_push["result"]["publishable_count"], 0); + assert_eq!(sync_push["result"]["published_count"], 0); sandbox.json_success(&["--format", "json", "store", "init"]); let relay_refresh = sandbox.json_success(&[ @@ -5680,6 +5658,11 @@ fn buyer_market_sync_basket_dry_runs_preflight_without_mutating_local_state() { assert_eq!(sync_push_ready["operation_id"], "sync.push"); assert_eq!(sync_push_ready["dry_run"], true); assert_eq!(sync_push_ready["result"]["state"], "ready"); + assert_eq!(sync_push_ready["result"]["source"], "SDK outbox push"); + assert_eq!( + sync_push_ready["result"]["replica_db"], + "legacy_derived_not_checked" + ); assert_eq!( sync_push_ready["result"]["target_relays"][0], "ws://127.0.0.1:9"