commit 2a407325b8429227ccab48b9c9afbcd56ccbf396
parent 39498026feb0c7afe94a65d3142dff82fdc3cabb
Author: triesap <tyson@radroots.org>
Date: Sat, 9 May 2026 07:41:54 +0000
cli: harden sync push closeout
- reject radrootsd sync push before runtime preflights
- normalize sync push author grouping and account recovery actions
- add mixed-author sync push process envelope coverage
- prove radrootsd and mixed-author behavior through focused tests
Diffstat:
3 files changed, 391 insertions(+), 20 deletions(-)
diff --git a/src/operation_runtime.rs b/src/operation_runtime.rs
@@ -1,5 +1,5 @@
use serde::Serialize;
-use serde_json::Value;
+use serde_json::{Value, json};
use crate::domain::runtime::{CommandDisposition, SyncActionView, SyncStatusView};
use crate::operation_adapter::{
@@ -10,7 +10,7 @@ use crate::operation_adapter::{
SyncWatchResult,
};
use crate::runtime::RuntimeError;
-use crate::runtime::config::RuntimeConfig;
+use crate::runtime::config::{PublishMode, RuntimeConfig};
use crate::runtime_args::SyncWatchArgs;
pub struct RuntimeOperationService<'a> {
@@ -78,6 +78,9 @@ impl OperationService<SyncPushRequest> for RuntimeOperationService<'_> {
&self,
request: OperationRequest<SyncPushRequest>,
) -> Result<OperationResult<Self::Result>, OperationAdapterError> {
+ if matches!(self.config.publish.mode, PublishMode::Radrootsd) {
+ return Err(sync_push_radrootsd_unavailable(self.config));
+ }
if request.context.requires_approval_token() {
return Err(OperationAdapterError::approval_required("sync.push"));
}
@@ -105,6 +108,26 @@ impl OperationService<SyncWatchRequest> for RuntimeOperationService<'_> {
}
}
+fn sync_push_radrootsd_unavailable(config: &RuntimeConfig) -> OperationAdapterError {
+ OperationAdapterError::operation_unavailable_with_detail(
+ "sync.push",
+ crate::runtime::sync::RADROOTSD_SYNC_PUSH_UNAVAILABLE_REASON.to_owned(),
+ json!({
+ "publish": {
+ "mode": config.publish.mode.as_str(),
+ "source": config.publish.source.as_str(),
+ "transport_family": config.publish.mode.transport_family(),
+ "state": "unavailable",
+ "executable": false,
+ "provider": {
+ "provider_runtime_id": "radrootsd",
+ "state": "unavailable",
+ }
+ }
+ }),
+ )
+}
+
fn serialized_operation_result<R, T>(value: &T) -> Result<OperationResult<R>, OperationAdapterError>
where
R: OperationResultData,
@@ -220,7 +243,7 @@ mod tests {
use super::RuntimeOperationService;
use crate::operation_adapter::{
OperationAdapter, OperationContext, OperationRequest, RelayListRequest,
- SignerStatusGetRequest, SyncStatusGetRequest,
+ SignerStatusGetRequest, SyncPushRequest, SyncStatusGetRequest,
};
use crate::runtime::config::{
AccountConfig, AccountSecretContractConfig, HyfConfig, IdentityConfig, InteractionConfig,
@@ -277,6 +300,31 @@ mod tests {
assert!(error.to_string().contains("sync.status.get"));
}
+ #[test]
+ fn runtime_service_rejects_radrootsd_sync_push_before_approval_or_store_checks() {
+ let dir = tempdir().expect("tempdir");
+ let mut config = sample_config(dir.path(), Vec::new());
+ config.publish.mode = PublishMode::Radrootsd;
+ config.publish.source = PublishModeSource::Flags;
+ let service = OperationAdapter::new(RuntimeOperationService::new(&config));
+
+ let sync = OperationRequest::new(OperationContext::default(), SyncPushRequest::default())
+ .expect("sync push request");
+ let error = service.execute(sync).expect_err("radrootsd sync push");
+ let output = error.to_output_error();
+ let detail = output.detail.expect("radrootsd detail");
+
+ assert_eq!(output.code, "operation_unavailable");
+ assert_eq!(output.exit_code, 3);
+ assert_eq!(detail["publish"]["mode"], "radrootsd");
+ assert_eq!(detail["publish"]["source"], "cli flags · local first");
+ assert_eq!(
+ detail["publish"]["provider"]["provider_runtime_id"],
+ "radrootsd"
+ );
+ assert_eq!(detail["class"], "operation");
+ }
+
fn sample_config(root: &Path, relays: Vec<String>) -> RuntimeConfig {
let data = root.join("data");
let logs = root.join("logs");
diff --git a/src/runtime/sync.rs b/src/runtime/sync.rs
@@ -45,6 +45,7 @@ const SYNC_READY_ACTION: &str = "radroots market product search eggs";
const MARKET_READY_ACTION: &str = "radroots market product search eggs";
const INGEST_SOURCE: &str = "direct Nostr relay fetch · local replica ingest";
const PUBLISH_SOURCE: &str = "direct Nostr relay publish · local replica sync";
+pub(crate) const RADROOTSD_SYNC_PUSH_UNAVAILABLE_REASON: &str = "sync push is only available in publish mode `nostr_relay`; radrootsd sync push is not implemented";
const RELAY_FETCH_LIMIT: usize = 1_000;
const MARKET_REFRESH_KINDS: &[u32] = &[KIND_PROFILE, KIND_FARM, KIND_LISTING];
const SYNC_PULL_KINDS: &[u32] = &[
@@ -246,22 +247,15 @@ where
&RadrootsReplicaPendingPublishEvent,
) -> Result<DirectRelayPublishReceipt, DirectRelayPublishError>,
{
+ if matches!(config.publish.mode, PublishMode::Radrootsd) {
+ return Ok(push_radrootsd_unavailable_view(config));
+ }
+
let snapshot = inspect_sync(config)?;
if snapshot.state == "unconfigured" {
return Ok(push_unconfigured_view(snapshot));
}
- if matches!(config.publish.mode, PublishMode::Radrootsd) {
- let mut view = empty_action_from_snapshot(snapshot, "push");
- view.state = "unavailable".to_owned();
- view.reason = Some(
- "sync push is only available in publish mode `nostr_relay`; radrootsd sync push is not implemented"
- .to_owned(),
- );
- view.actions = vec!["radroots --publish-mode nostr_relay sync push".to_owned()];
- return Ok(view);
- }
-
let signing = match accounts::resolve_local_signing_identity(config) {
Ok(signing) => signing,
Err(RuntimeError::Account(failure)) => {
@@ -448,6 +442,42 @@ fn empty_action_from_snapshot(snapshot: SyncSnapshot, direction: &str) -> SyncAc
}
}
+fn push_radrootsd_unavailable_view(config: &RuntimeConfig) -> SyncActionView {
+ SyncActionView {
+ direction: "push".to_owned(),
+ state: "unavailable".to_owned(),
+ source: PUBLISH_SOURCE.to_owned(),
+ local_root: config.local.root.display().to_string(),
+ replica_db: "not_checked".to_owned(),
+ relay_count: config.relay.urls.len(),
+ publish_policy: config.relay.publish_policy.as_str().to_owned(),
+ freshness: SyncFreshnessView {
+ state: "not_checked".to_owned(),
+ display: "not checked".to_owned(),
+ age_seconds: None,
+ last_event_at: None,
+ },
+ queue: SyncQueueView {
+ expected_count: 0,
+ pending_count: 0,
+ },
+ target_relays: config.relay.urls.clone(),
+ connected_relays: Vec::new(),
+ acknowledged_relays: Vec::new(),
+ failed_relays: Vec::new(),
+ fetched_count: None,
+ ingested_count: None,
+ publishable_count: None,
+ published_count: None,
+ skipped_count: None,
+ unsupported_count: None,
+ failed_count: None,
+ publish_plan: None,
+ reason: Some(RADROOTSD_SYNC_PUSH_UNAVAILABLE_REASON.to_owned()),
+ actions: vec!["radroots --publish-mode nostr_relay sync push".to_owned()],
+ }
+}
+
fn push_unconfigured_view(snapshot: SyncSnapshot) -> SyncActionView {
let mut view = empty_action_from_snapshot(snapshot, "push");
if view.replica_db == "ready" && view.relay_count == 0 {
@@ -508,9 +538,11 @@ fn sync_push_plan<'a>(
let mut publishable_events = Vec::new();
let mut event_kinds = BTreeMap::<u32, SyncPublishPlanKindView>::new();
let mut authors = BTreeMap::<String, SyncPublishPlanAuthorView>::new();
+ let selected_author = canonical_pubkey_hex(selected_pubkey);
for event in &batch.pending_events {
- let is_publishable = event.author.eq_ignore_ascii_case(selected_pubkey);
+ let event_author = canonical_pubkey_hex(event.author.as_str());
+ let is_publishable = event_author == selected_author;
let kind = event_kinds
.entry(event.kind)
.or_insert_with(|| SyncPublishPlanKindView {
@@ -525,9 +557,9 @@ fn sync_push_plan<'a>(
let author =
authors
- .entry(event.author.clone())
+ .entry(event_author.clone())
.or_insert_with(|| SyncPublishPlanAuthorView {
- author: event.author.clone(),
+ author: event_author.clone(),
eligibility: if is_publishable {
"selected".to_owned()
} else {
@@ -548,7 +580,7 @@ fn sync_push_plan<'a>(
author.skipped_count += 1;
counts.skipped_count += 1;
if counts.first_skipped_author.is_none() {
- counts.first_skipped_author = Some(event.author.clone());
+ counts.first_skipped_author = Some(event_author);
}
}
}
@@ -559,13 +591,17 @@ fn sync_push_plan<'a>(
counts,
publishable_events,
SyncPublishPlanView {
- selected_author: selected_pubkey.to_owned(),
+ selected_author,
event_kinds: event_kinds.into_values().collect(),
authors: authors.into_values().collect(),
},
)
}
+fn canonical_pubkey_hex(pubkey: &str) -> String {
+ pubkey.to_ascii_lowercase()
+}
+
fn sync_push_dry_run_reason(counts: &SyncPushCounts) -> Option<String> {
match counts.skipped_count {
0 => Some("dry run requested; relay publish skipped".to_owned()),
@@ -1070,8 +1106,9 @@ mod tests {
.public_key_hex
.clone();
let other_pubkey = identity(42).public_key_hex();
+ let other_pubkey_upper = other_pubkey.to_ascii_uppercase();
seed_replica_farm(&config, selected_pubkey.as_str());
- seed_replica_farm(&config, other_pubkey.as_str());
+ seed_replica_farm(&config, other_pubkey_upper.as_str());
let view = push_with_publisher(&config, |_, _, _| panic!("dry run must not publish"))
.expect("sync push dry run");
@@ -1312,6 +1349,31 @@ mod tests {
}
#[test]
+ fn sync_push_rejects_radrootsd_before_store_relay_or_signer_checks() {
+ let dir = tempdir().expect("tempdir");
+ let mut config = sample_config(dir.path(), Vec::new());
+ config.publish.mode = PublishMode::Radrootsd;
+
+ let view = push_with_publisher(&config, |_, _, _| {
+ panic!("radrootsd sync push must not publish")
+ })
+ .expect("radrootsd sync push view");
+
+ assert_eq!(view.state, "unavailable");
+ assert_eq!(view.replica_db, "not_checked");
+ assert_eq!(view.relay_count, 0);
+ assert_eq!(
+ view.reason.as_deref(),
+ Some(super::RADROOTSD_SYNC_PUSH_UNAVAILABLE_REASON)
+ );
+ assert_eq!(
+ view.actions,
+ vec!["radroots --publish-mode nostr_relay sync push"]
+ );
+ assert!(!config.local.replica_db_path.exists());
+ }
+
+ #[test]
fn sync_pull_ingests_relay_events_and_market_reads_without_daemon() {
let dir = tempdir().expect("tempdir");
let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
diff --git a/tests/target_cli.rs b/tests/target_cli.rs
@@ -9,6 +9,13 @@ use std::thread::{self, JoinHandle};
use std::time::Duration;
use radroots_events::kinds::{KIND_FARM, KIND_PROFILE};
+use radroots_replica_db::{farm, farm_member_claim, migrations};
+use radroots_replica_db_schema::farm::IFarmFields;
+use radroots_replica_db_schema::farm_member_claim::IFarmMemberClaimFields;
+use radroots_replica_sync::{
+ RadrootsReplicaPendingPublishBatch, radroots_replica_pending_publish_batch,
+};
+use radroots_sql_core::SqliteExecutor;
use serde_json::Value;
use serde_json::json;
@@ -22,6 +29,7 @@ use support::{
const LISTING_ADDR: &str =
"30402:1111111111111111111111111111111111111111111111111111111111111111:AAAAAAAAAAAAAAAAAAAAAg";
+const SYNC_PUSH_FARM_D_TAG: &str = "AAAAAAAAAAAAAAAAAAAAAA";
struct JsonRpcRequest {
headers: String,
@@ -208,6 +216,123 @@ fn content_length(headers: &str) -> usize {
.expect("content-length header")
}
+struct RelayPublishServer {
+ endpoint: String,
+ requests: Receiver<Value>,
+ handle: JoinHandle<()>,
+}
+
+impl RelayPublishServer {
+ fn with_publish_outcomes(outcomes: Vec<(bool, &'static str)>) -> Self {
+ let listener = TcpListener::bind("127.0.0.1:0").expect("bind relay");
+ let endpoint = format!("ws://{}", listener.local_addr().expect("relay addr"));
+ let (tx, requests) = mpsc::channel();
+ let handle = thread::spawn(move || {
+ for (accepted, reason) in outcomes {
+ let (stream, _) = listener.accept().expect("accept relay connection");
+ handle_relay_publish_connection(stream, accepted, reason, &tx);
+ }
+ });
+
+ Self {
+ endpoint,
+ requests,
+ handle,
+ }
+ }
+
+ fn endpoint(&self) -> &str {
+ self.endpoint.as_str()
+ }
+
+ fn take_requests(self, count: usize) -> Vec<Value> {
+ let requests = (0..count)
+ .map(|_| {
+ self.requests
+ .recv_timeout(Duration::from_secs(5))
+ .expect("relay publish request")
+ })
+ .collect::<Vec<_>>();
+ self.handle.join().expect("relay server join");
+ requests
+ }
+}
+
+fn handle_relay_publish_connection(
+ stream: TcpStream,
+ accepted: bool,
+ reason: &str,
+ tx: &mpsc::Sender<Value>,
+) {
+ let mut websocket = tungstenite::accept(stream).expect("accept websocket");
+ let event = read_relay_event_message(&mut websocket);
+ let event_id = event["id"].as_str().expect("event id").to_owned();
+ tx.send(event).expect("relay request send");
+ websocket
+ .send(tungstenite::Message::Text(
+ json!(["OK", event_id, accepted, reason]).to_string().into(),
+ ))
+ .expect("relay ok send");
+}
+
+fn read_relay_event_message(websocket: &mut tungstenite::WebSocket<TcpStream>) -> Value {
+ loop {
+ let message = websocket.read().expect("relay message");
+ if !message.is_text() {
+ continue;
+ }
+ let value: Value =
+ serde_json::from_str(message.to_text().expect("relay text")).expect("relay json");
+ if value.get(0).and_then(Value::as_str) == Some("EVENT") {
+ return value.get(1).cloned().expect("relay event payload");
+ }
+ }
+}
+
+fn seed_sync_push_farm(sandbox: &RadrootsCliSandbox, d_tag: &str, pubkey: &str) {
+ let executor = SqliteExecutor::open(sandbox.replica_db_path()).expect("open replica");
+ migrations::run_all_up(&executor).expect("replica migrations");
+ farm::create(
+ &executor,
+ &IFarmFields {
+ d_tag: d_tag.to_owned(),
+ pubkey: pubkey.to_owned(),
+ name: "Sync Push Farm".to_owned(),
+ about: Some("sync push process fixture".to_owned()),
+ website: None,
+ picture: None,
+ banner: None,
+ location_primary: None,
+ location_city: None,
+ location_region: None,
+ location_country: None,
+ },
+ )
+ .expect("seed sync push farm");
+}
+
+fn seed_sync_push_member_claim(
+ sandbox: &RadrootsCliSandbox,
+ member_pubkey: &str,
+ farm_pubkey: &str,
+) {
+ let executor = SqliteExecutor::open(sandbox.replica_db_path()).expect("open replica");
+ migrations::run_all_up(&executor).expect("replica migrations");
+ farm_member_claim::create(
+ &executor,
+ &IFarmMemberClaimFields {
+ member_pubkey: member_pubkey.to_owned(),
+ farm_pubkey: farm_pubkey.to_owned(),
+ },
+ )
+ .expect("seed sync push member claim");
+}
+
+fn sync_push_pending_batch(sandbox: &RadrootsCliSandbox) -> RadrootsReplicaPendingPublishBatch {
+ let executor = SqliteExecutor::open(sandbox.replica_db_path()).expect("open replica");
+ radroots_replica_pending_publish_batch(&executor).expect("sync push pending batch")
+}
+
#[test]
fn root_help_exposes_only_target_namespaces() {
let output = radroots().arg("--help").output().expect("run root help");
@@ -2905,6 +3030,142 @@ fn seller_dry_runs_preflight_without_mutating_farm_or_listing_files() {
}
#[test]
+fn sync_push_partial_mixed_author_queue_reports_error_envelope() {
+ let sandbox = RadrootsCliSandbox::new();
+ sandbox.json_success(&["--format", "json", "account", "create"]);
+ let signer = sandbox.json_success(&["--format", "json", "signer", "status", "get"]);
+ let selected_pubkey = signer["result"]["local"]["public_identity"]["public_key_hex"]
+ .as_str()
+ .expect("selected public key");
+ sandbox.json_success(&["--format", "json", "store", "init"]);
+ let other_pubkey = identity_public(81).public_key_hex;
+ let other_pubkey_canonical = other_pubkey.to_ascii_lowercase();
+ seed_sync_push_farm(&sandbox, SYNC_PUSH_FARM_D_TAG, selected_pubkey);
+ seed_sync_push_member_claim(&sandbox, other_pubkey.as_str(), selected_pubkey);
+ let batch = sync_push_pending_batch(&sandbox);
+ let expected_publishable_count = batch
+ .pending_events
+ .iter()
+ .filter(|event| event.author.eq_ignore_ascii_case(selected_pubkey))
+ .count();
+ let expected_skipped_count = batch.pending_count - expected_publishable_count;
+ assert!(expected_publishable_count > 0);
+ assert!(expected_skipped_count > 0);
+ let relay =
+ RelayPublishServer::with_publish_outcomes(vec![(true, ""); expected_publishable_count]);
+ let relay_url = relay.endpoint().to_owned();
+
+ let (output, value) = sandbox.json_output(&[
+ "--format",
+ "json",
+ "--relay",
+ relay_url.as_str(),
+ "--approval-token",
+ "approve",
+ "sync",
+ "push",
+ ]);
+
+ assert!(!output.status.success(), "{value}");
+ assert_eq!(value["operation_id"], "sync.push");
+ assert_eq!(value["result"], Value::Null);
+ assert_eq!(value["errors"][0]["code"], "network_unavailable", "{value}");
+ assert_eq!(value["errors"][0]["detail"]["state"], "partial");
+ assert_eq!(
+ value["errors"][0]["detail"]["queue"]["pending_count"],
+ json!(expected_skipped_count)
+ );
+ assert_eq!(
+ value["errors"][0]["detail"]["published_count"],
+ json!(expected_publishable_count)
+ );
+ assert_eq!(
+ value["errors"][0]["detail"]["skipped_count"],
+ json!(expected_skipped_count)
+ );
+ assert_contains(
+ &value["errors"][0]["detail"]["reason"],
+ "belong to another author",
+ );
+ assert_eq!(
+ value["errors"][0]["detail"]["actions"][1],
+ "radroots account list"
+ );
+ assert_eq!(
+ value["errors"][0]["detail"]["actions"][2],
+ format!("radroots --account-id {other_pubkey_canonical} sync push")
+ );
+ assert_eq!(
+ value["next_actions"][2]["command"],
+ format!("radroots --account-id {other_pubkey_canonical} sync push")
+ );
+ let requests = relay.take_requests(expected_publishable_count);
+ assert_eq!(requests.len(), expected_publishable_count);
+ assert!(
+ requests
+ .iter()
+ .all(|request| request["pubkey"] == selected_pubkey)
+ );
+ assert_no_removed_command_reference(&value, &["sync", "push"]);
+ assert_no_daemon_runtime_reference(&value, &["sync", "push"]);
+}
+
+#[test]
+fn sync_push_other_author_only_queue_reports_unconfigured_error_envelope() {
+ let sandbox = RadrootsCliSandbox::new();
+ sandbox.json_success(&["--format", "json", "account", "create"]);
+ sandbox.json_success(&["--format", "json", "store", "init"]);
+ let other_pubkey = identity_public(82).public_key_hex;
+ let other_pubkey_canonical = other_pubkey.to_ascii_lowercase();
+ seed_sync_push_farm(&sandbox, SYNC_PUSH_FARM_D_TAG, other_pubkey.as_str());
+
+ let (output, value) = sandbox.json_output(&[
+ "--format",
+ "json",
+ "--relay",
+ "ws://127.0.0.1:9",
+ "--approval-token",
+ "approve",
+ "sync",
+ "push",
+ ]);
+
+ assert!(!output.status.success(), "{value}");
+ assert_eq!(value["operation_id"], "sync.push");
+ assert_eq!(value["result"], Value::Null);
+ assert_eq!(
+ value["errors"][0]["code"], "operation_unavailable",
+ "{value}"
+ );
+ assert_eq!(value["errors"][0]["detail"]["state"], "unconfigured");
+ let pending_count = value["errors"][0]["detail"]["queue"]["pending_count"]
+ .as_u64()
+ .expect("pending count");
+ assert!(pending_count > 0);
+ assert_eq!(value["errors"][0]["detail"]["publishable_count"], 0);
+ assert_eq!(value["errors"][0]["detail"]["published_count"], 0);
+ assert_eq!(value["errors"][0]["detail"]["skipped_count"], pending_count);
+ assert_contains(
+ &value["errors"][0]["detail"]["reason"],
+ "belong to another author",
+ );
+ assert_eq!(
+ value["errors"][0]["detail"]["actions"][1],
+ "radroots account list"
+ );
+ assert_eq!(
+ value["errors"][0]["detail"]["actions"][2],
+ format!("radroots --account-id {other_pubkey_canonical} sync push")
+ );
+ assert_eq!(
+ value["next_actions"][2]["command"],
+ format!("radroots --account-id {other_pubkey_canonical} sync push")
+ );
+ assert_no_removed_command_reference(&value, &["sync", "push"]);
+ assert_no_daemon_runtime_reference(&value, &["sync", "push"]);
+}
+
+#[test]
fn buyer_market_sync_basket_dry_runs_preflight_without_mutating_local_state() {
let sandbox = RadrootsCliSandbox::new();
sandbox.json_success(&["--format", "json", "account", "create"]);