commit a669185d463c2e0ebafefcc47449b9db442351fb
parent 66faea106d0a7c9186df9a0a95df5325fdf4e290
Author: triesap <tyson@radroots.org>
Date: Sat, 9 May 2026 02:17:27 +0000
cli: implement relay sync push
Diffstat:
5 files changed, 661 insertions(+), 75 deletions(-)
diff --git a/src/domain/runtime.rs b/src/domain/runtime.rs
@@ -2903,12 +2903,18 @@ pub struct SyncActionView {
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub connected_relays: Vec<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub acknowledged_relays: Vec<String>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
pub failed_relays: Vec<RelayFailureView>,
#[serde(skip_serializing_if = "Option::is_none")]
pub fetched_count: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ingested_count: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
+ pub publishable_count: Option<usize>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub published_count: Option<usize>,
+ #[serde(skip_serializing_if = "Option::is_none")]
pub skipped_count: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub unsupported_count: Option<usize>,
@@ -2925,6 +2931,7 @@ impl SyncActionView {
match self.state.as_str() {
"unconfigured" => CommandDisposition::Unconfigured,
"unavailable" => CommandDisposition::ExternalUnavailable,
+ "partial" => CommandDisposition::ExternalUnavailable,
"error" => CommandDisposition::InternalError,
_ => CommandDisposition::Success,
}
diff --git a/src/operation_registry.rs b/src/operation_registry.rs
@@ -422,7 +422,7 @@ pub const OPERATION_REGISTRY: &[OperationSpec] = &[
"Push local signed updates to relays.",
Any,
true,
- Conditional,
+ Required,
High,
true,
true
@@ -1136,6 +1136,7 @@ pub fn requires_local_signer_mode(operation_id: &str) -> bool {
operation_id,
"signer.status.get"
| "farm.publish"
+ | "sync.push"
| "listing.publish"
| "listing.archive"
| "order.submit"
@@ -1153,7 +1154,8 @@ pub fn requires_local_signer_mode(operation_id: &str) -> bool {
pub fn requires_nostr_relay_publish_mode(operation_id: &str) -> bool {
matches!(
operation_id,
- "order.submit"
+ "sync.push"
+ | "order.submit"
| "order.accept"
| "order.decline"
| "order.cancel"
@@ -1349,6 +1351,7 @@ mod tests {
"account.import",
"account.attach_secret",
"account.remove",
+ "sync.push",
"farm.publish",
"listing.publish",
"listing.archive",
@@ -1479,6 +1482,7 @@ mod tests {
.collect::<BTreeSet<_>>();
let expected = [
"signer.status.get",
+ "sync.push",
"farm.publish",
"listing.publish",
"listing.archive",
@@ -1506,6 +1510,7 @@ mod tests {
.map(|operation| operation.operation_id)
.collect::<BTreeSet<_>>();
let expected = [
+ "sync.push",
"order.submit",
"order.accept",
"order.decline",
diff --git a/src/operation_runtime.rs b/src/operation_runtime.rs
@@ -1,6 +1,7 @@
use serde::Serialize;
use serde_json::Value;
+use crate::domain::runtime::{CommandDisposition, SyncActionView, SyncStatusView};
use crate::operation_adapter::{
OperationAdapterError, OperationRequest, OperationRequestData, OperationRequestPayload,
OperationResult, OperationResultData, OperationService, RelayListRequest, RelayListResult,
@@ -53,8 +54,8 @@ impl OperationService<SyncStatusGetRequest> for RuntimeOperationService<'_> {
&self,
_request: OperationRequest<SyncStatusGetRequest>,
) -> Result<OperationResult<Self::Result>, OperationAdapterError> {
- let view = map_runtime(crate::runtime::sync::status(self.config))?;
- serialized_operation_result::<SyncStatusGetResult, _>(&view)
+ let view = map_runtime("sync.status.get", crate::runtime::sync::status(self.config))?;
+ sync_status_result(&view)
}
}
@@ -65,8 +66,8 @@ impl OperationService<SyncPullRequest> for RuntimeOperationService<'_> {
&self,
_request: OperationRequest<SyncPullRequest>,
) -> Result<OperationResult<Self::Result>, OperationAdapterError> {
- let view = map_runtime(crate::runtime::sync::pull(self.config))?;
- serialized_operation_result::<SyncPullResult, _>(&view)
+ let view = map_runtime("sync.pull", crate::runtime::sync::pull(self.config))?;
+ sync_action_result::<SyncPullResult>("sync.pull", &view)
}
}
@@ -75,10 +76,13 @@ impl OperationService<SyncPushRequest> for RuntimeOperationService<'_> {
fn execute(
&self,
- _request: OperationRequest<SyncPushRequest>,
+ request: OperationRequest<SyncPushRequest>,
) -> Result<OperationResult<Self::Result>, OperationAdapterError> {
- let view = map_runtime(crate::runtime::sync::push(self.config))?;
- serialized_operation_result::<SyncPushResult, _>(&view)
+ if request.context.requires_approval_token() {
+ return Err(OperationAdapterError::approval_required("sync.push"));
+ }
+ let view = map_runtime("sync.push", crate::runtime::sync::push(self.config))?;
+ sync_action_result::<SyncPushResult>("sync.push", &view)
}
}
@@ -93,7 +97,10 @@ impl OperationService<SyncWatchRequest> for RuntimeOperationService<'_> {
frames: usize_input(&request, "frames").unwrap_or(1),
interval_ms: u64_input(&request, "interval_ms").unwrap_or(1_000),
};
- let view = map_runtime(crate::runtime::sync::watch(self.config, &args))?;
+ let view = map_runtime(
+ "sync.watch",
+ crate::runtime::sync::watch(self.config, &args),
+ )?;
serialized_operation_result::<SyncWatchResult, _>(&view)
}
}
@@ -106,8 +113,81 @@ where
OperationResult::new(R::from_serializable(value)?)
}
-fn map_runtime<T>(result: Result<T, RuntimeError>) -> Result<T, OperationAdapterError> {
- result.map_err(|error| OperationAdapterError::Runtime(error.to_string()))
+fn sync_status_result(
+ view: &SyncStatusView,
+) -> Result<OperationResult<SyncStatusGetResult>, OperationAdapterError> {
+ match view.disposition() {
+ CommandDisposition::Success => serialized_operation_result::<SyncStatusGetResult, _>(view),
+ disposition => Err(sync_view_error(
+ "sync.status.get",
+ disposition,
+ view,
+ view.reason.as_deref(),
+ )),
+ }
+}
+
+fn sync_action_result<R>(
+ operation_id: &str,
+ view: &SyncActionView,
+) -> Result<OperationResult<R>, OperationAdapterError>
+where
+ R: OperationResultData,
+{
+ match view.disposition() {
+ CommandDisposition::Success => serialized_operation_result::<R, _>(view),
+ disposition => Err(sync_view_error(
+ operation_id,
+ disposition,
+ view,
+ view.reason.as_deref(),
+ )),
+ }
+}
+
+fn sync_view_error<T>(
+ operation_id: &str,
+ disposition: CommandDisposition,
+ view: &T,
+ reason: Option<&str>,
+) -> OperationAdapterError
+where
+ T: Serialize,
+{
+ let detail = serde_json::to_value(view).unwrap_or_else(|_| Value::Object(Default::default()));
+ let message = reason
+ .map(str::to_owned)
+ .unwrap_or_else(|| format!("`{operation_id}` is not ready"));
+ match disposition {
+ CommandDisposition::Unconfigured => {
+ OperationAdapterError::operation_unavailable_with_detail(operation_id, message, detail)
+ }
+ CommandDisposition::ExternalUnavailable => {
+ OperationAdapterError::network_unavailable_with_detail(operation_id, message, detail)
+ }
+ CommandDisposition::Unsupported => OperationAdapterError::InvalidInput {
+ operation_id: operation_id.to_owned(),
+ message,
+ },
+ CommandDisposition::ValidationFailed => OperationAdapterError::ValidationFailed {
+ operation_id: operation_id.to_owned(),
+ message,
+ },
+ CommandDisposition::NotFound => OperationAdapterError::NotFound {
+ operation_id: operation_id.to_owned(),
+ message,
+ },
+ CommandDisposition::InternalError | CommandDisposition::Success => {
+ OperationAdapterError::Runtime(message)
+ }
+ }
+}
+
+fn map_runtime<T>(
+ operation_id: &str,
+ result: Result<T, RuntimeError>,
+) -> Result<T, OperationAdapterError> {
+ result.map_err(|error| OperationAdapterError::runtime_failure(operation_id, error))
}
fn usize_input<P>(request: &OperationRequest<P>, key: &str) -> Option<usize>
@@ -189,13 +269,12 @@ mod tests {
let sync =
OperationRequest::new(OperationContext::default(), SyncStatusGetRequest::default())
.expect("sync status request");
- let sync_envelope = service
- .execute(sync)
- .expect("sync status result")
- .to_envelope(OperationContext::default().envelope_context("req_sync"))
- .expect("sync envelope");
- assert_eq!(sync_envelope.operation_id, "sync.status.get");
- assert_eq!(sync_envelope.result["state"], "unconfigured");
+ let error = service.execute(sync).expect_err("sync status unconfigured");
+ let output = error.to_output_error();
+
+ assert_eq!(output.code, "operation_unavailable");
+ assert_eq!(output.exit_code, 3);
+ assert!(error.to_string().contains("sync.status.get"));
}
fn sample_config(root: &Path, relays: Vec<String>) -> RuntimeConfig {
diff --git a/src/runtime/sync.rs b/src/runtime/sync.rs
@@ -8,13 +8,16 @@ use radroots_events::kinds::{
KIND_LIST_SET_PICTURE, KIND_LIST_SET_RELAY, KIND_LIST_SET_RELEASE_ARTIFACT,
KIND_LIST_SET_STARTER_PACK, KIND_LIST_SET_VIDEO, KIND_LISTING, KIND_PLOT, KIND_PROFILE,
};
+use radroots_events_codec::wire::WireEventParts;
+use radroots_identity::RadrootsIdentity;
use radroots_nostr::prelude::{
RadrootsNostrFilter, radroots_event_from_nostr, radroots_nostr_kind,
};
use radroots_replica_db::{ReplicaSql, migrations};
use radroots_replica_sync::{
- RadrootsReplicaEventsError, RadrootsReplicaIngestOutcome, radroots_replica_ingest_event,
- radroots_replica_sync_status,
+ RadrootsReplicaEventsError, RadrootsReplicaIngestOutcome, RadrootsReplicaPendingPublishEvent,
+ radroots_replica_ingest_event, radroots_replica_ingest_event_state,
+ radroots_replica_pending_publish_batch, radroots_replica_sync_status,
};
use radroots_sql_core::SqliteExecutor;
@@ -23,18 +26,23 @@ use crate::domain::runtime::{
SyncWatchFrameView, SyncWatchView,
};
use crate::runtime::RuntimeError;
-use crate::runtime::config::RuntimeConfig;
+use crate::runtime::accounts;
+use crate::runtime::config::{PublishMode, RuntimeConfig};
use crate::runtime::direct_relay::{
- DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt, fetch_events_from_relays,
+ DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt, DirectRelayPublishError,
+ DirectRelayPublishReceipt, fetch_events_from_relays, publish_parts_with_identity,
};
use crate::runtime_args::SyncWatchArgs;
const SYNC_SOURCE: &str = "local replica · local first";
-const RELAY_SETUP_ACTION: &str = "radroots --relay wss://relay.example.com sync pull";
+const RELAY_PULL_SETUP_ACTION: &str = "radroots --relay wss://relay.example.com sync pull";
+const RELAY_PUSH_SETUP_ACTION: &str = "radroots --relay wss://relay.example.com sync push";
const SYNC_PULL_ACTION: &str = "radroots sync pull";
+const SYNC_PUSH_ACTION: &str = "radroots sync push";
const SYNC_READY_ACTION: &str = "radroots market product search eggs";
const MARKET_READY_ACTION: &str = "radroots market product search eggs";
const INGEST_SOURCE: &str = "direct Nostr relay fetch · local replica ingest";
+const PUBLISH_SOURCE: &str = "direct Nostr relay publish · local replica sync";
const RELAY_FETCH_LIMIT: usize = 1_000;
const MARKET_REFRESH_KINDS: &[u32] = &[KIND_PROFILE, KIND_FARM, KIND_LISTING];
const SYNC_PULL_KINDS: &[u32] = &[
@@ -143,6 +151,8 @@ where
view.target_relays = config.relay.urls.clone();
view.fetched_count = Some(0);
view.ingested_count = Some(0);
+ view.publishable_count = None;
+ view.published_count = None;
view.skipped_count = Some(0);
view.unsupported_count = Some(0);
view.failed_count = Some(0);
@@ -194,9 +204,12 @@ where
},
target_relays: receipt.target_relays,
connected_relays: receipt.connected_relays,
+ acknowledged_relays: Vec::new(),
failed_relays: relay_failures(receipt.failed_relays),
fetched_count: Some(ingest.fetched_count),
ingested_count: Some(ingest.ingested_count),
+ publishable_count: None,
+ published_count: None,
skipped_count: Some(ingest.skipped_count),
unsupported_count: Some(ingest.unsupported_count),
failed_count: Some(ingest.failed_count),
@@ -206,11 +219,176 @@ where
}
pub fn push(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> {
- narrowed_action(
+ push_with_publisher(config, |identity, relay_urls, event| {
+ publish_parts_with_identity(
+ identity,
+ relay_urls,
+ WireEventParts {
+ kind: event.draft.kind,
+ content: event.draft.content.clone(),
+ tags: event.draft.tags.clone(),
+ },
+ )
+ })
+}
+
+fn push_with_publisher<F>(
+ config: &RuntimeConfig,
+ mut publisher: F,
+) -> Result<SyncActionView, RuntimeError>
+where
+ F: FnMut(
+ &RadrootsIdentity,
+ &[String],
+ &RadrootsReplicaPendingPublishEvent,
+ ) -> Result<DirectRelayPublishReceipt, DirectRelayPublishError>,
+{
+ let snapshot = inspect_sync(config)?;
+ if snapshot.state == "unconfigured" {
+ return Ok(push_unconfigured_view(snapshot));
+ }
+
+ if matches!(config.publish.mode, PublishMode::Radrootsd) {
+ let mut view = empty_action_from_snapshot(snapshot, "push");
+ view.state = "unavailable".to_owned();
+ view.reason = Some(
+ "sync push is only available in publish mode `nostr_relay`; radrootsd sync push is not implemented"
+ .to_owned(),
+ );
+ view.actions = vec!["radroots --publish-mode nostr_relay sync push".to_owned()];
+ return Ok(view);
+ }
+
+ let signing = match accounts::resolve_local_signing_identity(config) {
+ Ok(signing) => signing,
+ Err(RuntimeError::Account(failure)) => {
+ let mut view = empty_action_from_snapshot(snapshot, "push");
+ view.state = "unconfigured".to_owned();
+ view.reason = Some(failure.to_string());
+ view.actions = vec![
+ "radroots account create".to_owned(),
+ "radroots account attach-secret".to_owned(),
+ ];
+ return Ok(view);
+ }
+ Err(error) => return Err(error),
+ };
+
+ let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
+ migrations::run_all_up(&executor)?;
+ let batch = radroots_replica_pending_publish_batch(&executor)?;
+ let selected_pubkey = signing
+ .account
+ .record
+ .public_identity
+ .public_key_hex
+ .as_str();
+ let mut counts = SyncPushCounts::from_batch(&batch);
+ let publishable_events = batch
+ .pending_events
+ .iter()
+ .filter(|event| {
+ if event.author.eq_ignore_ascii_case(selected_pubkey) {
+ true
+ } else {
+ counts.skipped_count += 1;
+ false
+ }
+ })
+ .collect::<Vec<_>>();
+ counts.publishable_count = publishable_events.len();
+
+ if config.output.dry_run {
+ let state = if counts.publishable_count > 0 {
+ "dry_run"
+ } else {
+ "ready"
+ };
+ return Ok(push_view(
+ config,
+ state,
+ SyncQueueView {
+ expected_count: batch.expected_count,
+ pending_count: batch.pending_count,
+ },
+ snapshot.freshness,
+ counts,
+ Vec::new(),
+ Vec::new(),
+ Vec::new(),
+ Some("dry run requested; relay publish skipped".to_owned()),
+ if batch.pending_count > 0 {
+ vec![SYNC_PUSH_ACTION.to_owned()]
+ } else {
+ vec!["radroots sync status get".to_owned()]
+ },
+ ));
+ }
+
+ let mut connected_relays = Vec::new();
+ let mut acknowledged_relays = Vec::new();
+ let mut failed_relays = Vec::new();
+
+ for event in publishable_events {
+ match publisher(&signing.identity, &config.relay.urls, event) {
+ Ok(receipt) => {
+ push_unique_many(&mut connected_relays, receipt.connected_relays.iter());
+ push_unique_many(&mut acknowledged_relays, receipt.acknowledged_relays.iter());
+ failed_relays.extend(relay_failures(receipt.failed_relays));
+ let signed_event = radroots_event_from_nostr(&receipt.event);
+ radroots_replica_ingest_event_state(
+ &executor,
+ &signed_event,
+ event.d_tag.as_str(),
+ event.content_hash.as_str(),
+ )?;
+ counts.published_count += 1;
+ }
+ Err(error) => {
+ counts.failed_count += 1;
+ let failure = sync_push_publish_failure(error);
+ push_unique_many(&mut connected_relays, failure.connected_relays.iter());
+ failed_relays.extend(failure.failed_relays);
+ if counts.first_failure_reason.is_none() {
+ counts.first_failure_reason = Some(failure.reason);
+ }
+ break;
+ }
+ }
+ }
+
+ let queue = radroots_replica_sync_status(&executor)?;
+ let freshness = freshness_from_executor(&executor)?;
+ let state = if counts.failed_count > 0 && counts.published_count > 0 {
+ "partial"
+ } else if counts.failed_count > 0 {
+ "unavailable"
+ } else if counts.published_count > 0 {
+ "published"
+ } else {
+ "ready"
+ };
+ let reason = counts.reason();
+ let actions = match state {
+ "published" | "ready" => vec!["radroots sync status get".to_owned()],
+ _ => vec![SYNC_PUSH_ACTION.to_owned()],
+ };
+
+ Ok(push_view(
config,
- "push",
- "relay publish is not wired into `radroots sync push` yet",
- )
+ state,
+ SyncQueueView {
+ expected_count: queue.expected_count,
+ pending_count: queue.pending_count,
+ },
+ freshness,
+ counts,
+ connected_relays,
+ acknowledged_relays,
+ failed_relays,
+ reason,
+ actions,
+ ))
}
pub fn watch(config: &RuntimeConfig, args: &SyncWatchArgs) -> Result<SyncWatchView, RuntimeError> {
@@ -251,42 +429,6 @@ pub fn watch(config: &RuntimeConfig, args: &SyncWatchArgs) -> Result<SyncWatchVi
})
}
-fn narrowed_action(
- config: &RuntimeConfig,
- direction: &str,
- unavailable_reason: &str,
-) -> Result<SyncActionView, RuntimeError> {
- let snapshot = inspect_sync(config)?;
- if snapshot.state == "unconfigured" {
- return Ok(empty_action_from_snapshot(snapshot, direction));
- }
-
- let mut actions = vec!["radroots sync status get".to_owned()];
- actions.extend(snapshot.actions);
-
- Ok(SyncActionView {
- direction: direction.to_owned(),
- state: "unavailable".to_owned(),
- source: snapshot.source,
- local_root: snapshot.local_root,
- replica_db: snapshot.replica_db,
- relay_count: snapshot.relay_count,
- publish_policy: snapshot.publish_policy,
- freshness: snapshot.freshness,
- queue: snapshot.queue,
- target_relays: Vec::new(),
- connected_relays: Vec::new(),
- failed_relays: Vec::new(),
- fetched_count: None,
- ingested_count: None,
- skipped_count: None,
- unsupported_count: None,
- failed_count: None,
- reason: Some(unavailable_reason.to_owned()),
- actions,
- })
-}
-
fn empty_action_from_snapshot(snapshot: SyncSnapshot, direction: &str) -> SyncActionView {
SyncActionView {
direction: direction.to_owned(),
@@ -300,9 +442,12 @@ fn empty_action_from_snapshot(snapshot: SyncSnapshot, direction: &str) -> SyncAc
queue: snapshot.queue,
target_relays: Vec::new(),
connected_relays: Vec::new(),
+ acknowledged_relays: Vec::new(),
failed_relays: Vec::new(),
fetched_count: None,
ingested_count: None,
+ publishable_count: None,
+ published_count: None,
skipped_count: None,
unsupported_count: None,
failed_count: None,
@@ -311,6 +456,128 @@ fn empty_action_from_snapshot(snapshot: SyncSnapshot, direction: &str) -> SyncAc
}
}
+fn push_unconfigured_view(snapshot: SyncSnapshot) -> SyncActionView {
+ let mut view = empty_action_from_snapshot(snapshot, "push");
+ if view.replica_db == "ready" && view.relay_count == 0 {
+ view.actions = vec![RELAY_PUSH_SETUP_ACTION.to_owned()];
+ }
+ view
+}
+
+fn push_view(
+ config: &RuntimeConfig,
+ state: &str,
+ queue: SyncQueueView,
+ freshness: SyncFreshnessView,
+ counts: SyncPushCounts,
+ connected_relays: Vec<String>,
+ acknowledged_relays: Vec<String>,
+ failed_relays: Vec<RelayFailureView>,
+ reason: Option<String>,
+ actions: Vec<String>,
+) -> SyncActionView {
+ SyncActionView {
+ direction: "push".to_owned(),
+ state: state.to_owned(),
+ source: PUBLISH_SOURCE.to_owned(),
+ local_root: config.local.root.display().to_string(),
+ replica_db: "ready".to_owned(),
+ relay_count: config.relay.urls.len(),
+ publish_policy: config.relay.publish_policy.as_str().to_owned(),
+ freshness,
+ queue,
+ target_relays: config.relay.urls.clone(),
+ connected_relays,
+ acknowledged_relays,
+ failed_relays,
+ fetched_count: None,
+ ingested_count: None,
+ publishable_count: Some(counts.publishable_count),
+ published_count: Some(counts.published_count),
+ skipped_count: Some(counts.skipped_count),
+ unsupported_count: Some(counts.unsupported_count),
+ failed_count: Some(counts.failed_count),
+ reason,
+ actions,
+ }
+}
+
+#[derive(Debug, Clone, Default)]
+struct SyncPushCounts {
+ pending_count: usize,
+ publishable_count: usize,
+ published_count: usize,
+ skipped_count: usize,
+ unsupported_count: usize,
+ failed_count: usize,
+ first_failure_reason: Option<String>,
+}
+
+impl SyncPushCounts {
+ fn from_batch(batch: &radroots_replica_sync::RadrootsReplicaPendingPublishBatch) -> Self {
+ Self {
+ pending_count: batch.pending_count,
+ ..Self::default()
+ }
+ }
+
+ fn reason(&self) -> Option<String> {
+ if self.failed_count > 0 {
+ return Some(match &self.first_failure_reason {
+ Some(reason) => format!(
+ "{} pending event(s) failed publish: {reason}",
+ self.failed_count
+ ),
+ None => format!("{} pending event(s) failed publish", self.failed_count),
+ });
+ }
+ if self.pending_count > 0 && self.publishable_count == 0 {
+ return Some(
+ "pending local replica events belong to another author and were not signed"
+ .to_owned(),
+ );
+ }
+ None
+ }
+}
+
+#[derive(Debug, Clone)]
+struct SyncPushPublishFailure {
+ reason: String,
+ connected_relays: Vec<String>,
+ failed_relays: Vec<RelayFailureView>,
+}
+
+fn sync_push_publish_failure(error: DirectRelayPublishError) -> SyncPushPublishFailure {
+ match error {
+ DirectRelayPublishError::Connect {
+ reason,
+ connected_relays,
+ failed_relays,
+ ..
+ } => SyncPushPublishFailure {
+ reason: format!("direct relay connection failed: {reason}"),
+ connected_relays,
+ failed_relays: relay_failures(failed_relays),
+ },
+ DirectRelayPublishError::Publish {
+ reason,
+ connected_relays,
+ failed_relays,
+ ..
+ } => SyncPushPublishFailure {
+ reason: format!("direct relay publish failed: {reason}"),
+ connected_relays,
+ failed_relays: relay_failures(failed_relays),
+ },
+ other => SyncPushPublishFailure {
+ reason: other.to_string(),
+ connected_relays: Vec::new(),
+ failed_relays: Vec::new(),
+ },
+ }
+}
+
fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> {
if !config.local.replica_db_path.exists() {
return Ok(SyncSnapshot {
@@ -343,7 +610,7 @@ fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> {
let mut actions = Vec::new();
if relay_count == 0 {
- actions.push(RELAY_SETUP_ACTION.to_owned());
+ actions.push(RELAY_PULL_SETUP_ACTION.to_owned());
return Ok(SyncSnapshot {
state: "unconfigured".to_owned(),
source: SYNC_SOURCE.to_owned(),
@@ -363,7 +630,7 @@ fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> {
actions.push(SYNC_PULL_ACTION.to_owned());
if queue.pending_count > 0 {
- actions.push("radroots sync push".to_owned());
+ actions.push(SYNC_PUSH_ACTION.to_owned());
}
Ok(SyncSnapshot {
@@ -513,6 +780,14 @@ fn relay_failures(failures: Vec<DirectRelayFailure>) -> Vec<RelayFailureView> {
.collect()
}
+fn push_unique_many<'a>(target: &mut Vec<String>, values: impl Iterator<Item = &'a String>) {
+ for value in values {
+ if !target.contains(value) {
+ target.push(value.clone());
+ }
+ }
+}
+
fn unix_now() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -549,13 +824,18 @@ mod tests {
use radroots_nostr::prelude::{
RadrootsNostrEvent, RadrootsNostrFilter, radroots_nostr_build_event,
};
+ use radroots_replica_db::{farm, migrations};
+ use radroots_replica_db_schema::farm::IFarmFields;
+ use radroots_replica_sync::radroots_replica_sync_status;
use radroots_runtime_paths::RadrootsMigrationReport;
use radroots_secret_vault::RadrootsSecretBackend;
+ use radroots_sql_core::SqliteExecutor;
use tempfile::tempdir;
use super::{
- DirectRelayFetchError, DirectRelayFetchReceipt, market_refresh_with_fetcher,
- pull_with_fetcher,
+ DirectRelayFailure, DirectRelayFetchError, DirectRelayFetchReceipt,
+ DirectRelayPublishReceipt, market_refresh_with_fetcher, pull_with_fetcher,
+ push_with_publisher,
};
use crate::runtime::config::{
AccountConfig, AccountSecretContractConfig, HyfConfig, IdentityConfig, InteractionConfig,
@@ -607,6 +887,136 @@ mod tests {
}
#[test]
+ fn sync_push_dry_run_reports_pending_without_publish_or_state_update() {
+ let dir = tempdir().expect("tempdir");
+ let mut config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
+ config.output.dry_run = true;
+ crate::runtime::local::init(&config).expect("store init");
+ let signing =
+ crate::runtime::accounts::create_or_migrate_default_account(&config).expect("account");
+ seed_replica_farm(
+ &config,
+ signing
+ .account
+ .record
+ .public_identity
+ .public_key_hex
+ .as_str(),
+ );
+
+ let view = push_with_publisher(&config, |_, _, _| panic!("dry run must not publish"))
+ .expect("sync push dry run");
+ let status = radroots_replica_sync_status(
+ &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"),
+ )
+ .expect("status");
+
+ assert_eq!(view.state, "dry_run");
+ assert_eq!(view.target_relays, vec!["wss://relay.example.com"]);
+ assert_eq!(view.publishable_count, Some(status.pending_count));
+ assert_eq!(view.published_count, Some(0));
+ assert_eq!(view.failed_count, Some(0));
+ assert!(status.pending_count > 0);
+ }
+
+ #[test]
+ fn sync_push_publishes_pending_local_author_events_and_updates_state() {
+ let dir = tempdir().expect("tempdir");
+ let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
+ crate::runtime::local::init(&config).expect("store init");
+ let signing =
+ crate::runtime::accounts::create_or_migrate_default_account(&config).expect("account");
+ seed_replica_farm(
+ &config,
+ signing
+ .account
+ .record
+ .public_identity
+ .public_key_hex
+ .as_str(),
+ );
+ let before = radroots_replica_sync_status(
+ &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"),
+ )
+ .expect("status before");
+
+ let view = push_with_publisher(&config, |identity, relays, event| {
+ let signed = signed_event(
+ identity,
+ WireEventParts {
+ kind: event.draft.kind,
+ content: event.draft.content.clone(),
+ tags: event.draft.tags.clone(),
+ },
+ );
+ Ok(DirectRelayPublishReceipt {
+ event_id: signed.id.to_hex(),
+ created_at: u32::try_from(signed.created_at.as_secs()).unwrap_or(u32::MAX),
+ signature: signed.sig.to_string(),
+ event: signed,
+ target_relays: relays.to_vec(),
+ connected_relays: relays.to_vec(),
+ acknowledged_relays: relays.to_vec(),
+ failed_relays: Vec::new(),
+ })
+ })
+ .expect("sync push");
+ let after = radroots_replica_sync_status(
+ &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"),
+ )
+ .expect("status after");
+
+ assert!(before.pending_count > 0);
+ assert_eq!(view.state, "published");
+ assert_eq!(view.published_count, Some(before.pending_count));
+ assert_eq!(view.failed_count, Some(0));
+ assert_eq!(view.connected_relays, vec!["wss://relay.example.com"]);
+ assert_eq!(view.acknowledged_relays, vec!["wss://relay.example.com"]);
+ assert_eq!(after.pending_count, 0);
+ }
+
+ #[test]
+ fn sync_push_failed_publish_leaves_pending_state_retryable() {
+ let dir = tempdir().expect("tempdir");
+ let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
+ crate::runtime::local::init(&config).expect("store init");
+ let signing =
+ crate::runtime::accounts::create_or_migrate_default_account(&config).expect("account");
+ seed_replica_farm(
+ &config,
+ signing
+ .account
+ .record
+ .public_identity
+ .public_key_hex
+ .as_str(),
+ );
+
+ let view = push_with_publisher(&config, |_, relays, _| {
+ Err(super::DirectRelayPublishError::Publish {
+ event_id: "0".repeat(64),
+ reason: "relay refused event".to_owned(),
+ target_relays: relays.to_vec(),
+ connected_relays: relays.to_vec(),
+ failed_relays: vec![DirectRelayFailure {
+ relay: relays[0].clone(),
+ reason: "relay refused event".to_owned(),
+ }],
+ })
+ })
+ .expect("sync push failure view");
+ let status = radroots_replica_sync_status(
+ &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"),
+ )
+ .expect("status");
+
+ assert_eq!(view.state, "unavailable");
+ assert_eq!(view.published_count, Some(0));
+ assert_eq!(view.failed_count, Some(1));
+ assert!(status.pending_count > 0);
+ }
+
+ #[test]
fn sync_pull_ingests_relay_events_and_market_reads_without_daemon() {
let dir = tempdir().expect("tempdir");
let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
@@ -863,6 +1273,28 @@ mod tests {
.expect("signed event")
}
+ fn seed_replica_farm(config: &RuntimeConfig, pubkey: &str) {
+ let executor = SqliteExecutor::open(&config.local.replica_db_path).expect("open replica");
+ migrations::run_all_up(&executor).expect("migrations");
+ let _ = farm::create(
+ &executor,
+ &IFarmFields {
+ d_tag: FARM_D_TAG.to_owned(),
+ pubkey: pubkey.to_owned(),
+ name: "Local Farm".to_owned(),
+ about: Some("local replica farm".to_owned()),
+ website: None,
+ picture: None,
+ banner: None,
+ location_primary: None,
+ location_city: None,
+ location_region: None,
+ location_country: None,
+ },
+ )
+ .expect("farm");
+ }
+
fn identity(seed: u8) -> RadrootsIdentity {
RadrootsIdentity::from_secret_key_bytes(&[seed; 32]).expect("identity")
}
diff --git a/tests/target_cli.rs b/tests/target_cli.rs
@@ -1725,7 +1725,6 @@ fn target_outputs_do_not_suggest_removed_command_families() {
["--format", "json", "market", "listing", "get", "eggs"].as_slice(),
["--format", "json", "listing", "get", "eggs"].as_slice(),
["--format", "json", "listing", "list"].as_slice(),
- ["--format", "json", "sync", "status", "get"].as_slice(),
[
"--format",
"json",
@@ -1738,6 +1737,13 @@ fn target_outputs_do_not_suggest_removed_command_families() {
let value = sandbox.json_success(args);
assert_no_removed_command_reference(&value, args);
}
+
+ let sync_args = ["--format", "json", "sync", "status", "get"];
+ let (output, value) = sandbox.json_output(&sync_args);
+ assert!(!output.status.success());
+ assert_eq!(value["operation_id"], "sync.status.get");
+ assert_eq!(value["errors"][0]["code"], "operation_unavailable");
+ assert_no_removed_command_reference(&value, &sync_args);
}
#[test]
@@ -2091,6 +2097,10 @@ fn offline_forbids_external_network_operations() {
["--format", "json", "--offline", "sync", "pull"].as_slice(),
),
(
+ "sync.push",
+ ["--format", "json", "--offline", "sync", "push"].as_slice(),
+ ),
+ (
"market.refresh",
["--format", "json", "--offline", "market", "refresh"].as_slice(),
),
@@ -2233,6 +2243,21 @@ fn offline_allows_supported_external_dry_run() {
assert_eq!(publish["operation_id"], "listing.publish");
assert_eq!(publish["result"]["state"], "dry_run");
+
+ sandbox.json_success(&["--format", "json", "store", "init"]);
+ let sync_push = sandbox.json_success(&[
+ "--format",
+ "json",
+ "--offline",
+ "--relay",
+ "ws://127.0.0.1:9",
+ "--dry-run",
+ "sync",
+ "push",
+ ]);
+
+ assert_eq!(sync_push["operation_id"], "sync.push");
+ assert_eq!(sync_push["result"]["state"], "ready");
}
#[test]
@@ -2433,6 +2458,14 @@ fn listing_publish_invalid_draft_returns_validation_failure() {
fn online_requires_relay_for_external_network_operations() {
for (operation_id, args) in [
(
+ "sync.pull",
+ ["--format", "json", "--online", "sync", "pull"].as_slice(),
+ ),
+ (
+ "sync.push",
+ ["--format", "json", "--online", "sync", "push"].as_slice(),
+ ),
+ (
"market.refresh",
["--format", "json", "--online", "market", "refresh"].as_slice(),
),
@@ -2882,17 +2915,23 @@ fn buyer_market_sync_basket_dry_runs_preflight_without_mutating_local_state() {
assert_eq!(market["result"]["state"], "unconfigured");
assert_eq!(market["result"]["replica_db"], "missing");
- let sync_pull = sandbox.json_success(&["--format", "json", "--dry-run", "sync", "pull"]);
+ let (sync_pull_output, sync_pull) =
+ sandbox.json_output(&["--format", "json", "--dry-run", "sync", "pull"]);
+ assert!(!sync_pull_output.status.success());
assert_eq!(sync_pull["operation_id"], "sync.pull");
assert_eq!(sync_pull["dry_run"], true);
- assert_eq!(sync_pull["result"]["state"], "unconfigured");
- assert_eq!(sync_pull["result"]["replica_db"], "missing");
+ assert_eq!(sync_pull["errors"][0]["code"], "operation_unavailable");
+ assert_eq!(sync_pull["errors"][0]["detail"]["state"], "unconfigured");
+ assert_eq!(sync_pull["errors"][0]["detail"]["replica_db"], "missing");
- let sync_push = sandbox.json_success(&["--format", "json", "--dry-run", "sync", "push"]);
+ let (sync_push_output, sync_push) =
+ sandbox.json_output(&["--format", "json", "--dry-run", "sync", "push"]);
+ assert!(!sync_push_output.status.success());
assert_eq!(sync_push["operation_id"], "sync.push");
assert_eq!(sync_push["dry_run"], true);
- assert_eq!(sync_push["result"]["state"], "unconfigured");
- assert_eq!(sync_push["result"]["replica_db"], "missing");
+ assert_eq!(sync_push["errors"][0]["code"], "operation_unavailable");
+ assert_eq!(sync_push["errors"][0]["detail"]["state"], "unconfigured");
+ assert_eq!(sync_push["errors"][0]["detail"]["replica_db"], "missing");
sandbox.json_success(&["--format", "json", "store", "init"]);
let relay_refresh = sandbox.json_success(&[
@@ -2914,6 +2953,25 @@ fn buyer_market_sync_basket_dry_runs_preflight_without_mutating_local_state() {
assert_eq!(relay_refresh["result"]["fetched_count"], 0);
assert_eq!(relay_refresh["result"]["ingested_count"], 0);
+ let sync_push_ready = sandbox.json_success(&[
+ "--format",
+ "json",
+ "--relay",
+ "ws://127.0.0.1:9",
+ "--dry-run",
+ "sync",
+ "push",
+ ]);
+ assert_eq!(sync_push_ready["operation_id"], "sync.push");
+ assert_eq!(sync_push_ready["dry_run"], true);
+ assert_eq!(sync_push_ready["result"]["state"], "ready");
+ assert_eq!(
+ sync_push_ready["result"]["target_relays"][0],
+ "ws://127.0.0.1:9"
+ );
+ assert_eq!(sync_push_ready["result"]["publishable_count"], 0);
+ assert_eq!(sync_push_ready["result"]["published_count"], 0);
+
let empty_search =
sandbox.json_success(&["--format", "json", "market", "product", "search", "eggs"]);
assert_eq!(empty_search["operation_id"], "market.product.search");
@@ -3038,6 +3096,11 @@ fn required_approval_token_rejects_absent_empty_and_whitespace_values() {
"listing.archive",
&["listing", "archive", "missing-listing.toml"],
);
+ assert_required_approval_token_rejected(
+ &sandbox,
+ "sync.push",
+ &["--relay", "ws://127.0.0.1:9", "sync", "push"],
+ );
assert_required_approval_token_rejected(&sandbox, "order.submit", &["order", "submit"]);
assert_required_approval_token_rejected(&sandbox, "order.accept", &["order", "accept"]);
assert_required_approval_token_rejected(