commit 39498026feb0c7afe94a65d3142dff82fdc3cabb
parent a669185d463c2e0ebafefcc47449b9db442351fb
Author: triesap <tyson@radroots.org>
Date: Sat, 9 May 2026 03:36:12 +0000
sync: fix sync push mixed-author truth
- add dry-run publish plan summaries for event kinds and authors
- report skipped other-author work in push states, reasons, and actions
- document direct-relay-only sync push supported draft families
- validate with fmt, focused sync tests, and nested nix check/test
Diffstat:
2 files changed, 368 insertions(+), 30 deletions(-)
diff --git a/src/domain/runtime.rs b/src/domain/runtime.rs
@@ -2921,6 +2921,8 @@ pub struct SyncActionView {
#[serde(skip_serializing_if = "Option::is_none")]
pub failed_count: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
+ pub publish_plan: Option<SyncPublishPlanView>,
+ #[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub actions: Vec<String>,
@@ -2939,6 +2941,34 @@ impl SyncActionView {
}
#[derive(Debug, Clone, Serialize)]
+pub struct SyncPublishPlanView {
+ pub selected_author: String,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub event_kinds: Vec<SyncPublishPlanKindView>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub authors: Vec<SyncPublishPlanAuthorView>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct SyncPublishPlanKindView {
+ pub kind: u32,
+ pub pending_count: usize,
+ pub publishable_count: usize,
+ pub skipped_count: usize,
+ pub unsupported_count: usize,
+ pub failed_count: usize,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct SyncPublishPlanAuthorView {
+ pub author: String,
+ pub eligibility: String,
+ pub pending_count: usize,
+ pub publishable_count: usize,
+ pub skipped_count: usize,
+}
+
+#[derive(Debug, Clone, Serialize)]
pub struct SyncWatchView {
pub state: String,
pub source: String,
diff --git a/src/runtime/sync.rs b/src/runtime/sync.rs
@@ -1,3 +1,4 @@
+use std::collections::BTreeMap;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -22,7 +23,8 @@ use radroots_replica_sync::{
use radroots_sql_core::SqliteExecutor;
use crate::domain::runtime::{
- RelayFailureView, SyncActionView, SyncFreshnessView, SyncQueueView, SyncStatusView,
+ RelayFailureView, SyncActionView, SyncFreshnessView, SyncPublishPlanAuthorView,
+ SyncPublishPlanKindView, SyncPublishPlanView, SyncQueueView, SyncStatusView,
SyncWatchFrameView, SyncWatchView,
};
use crate::runtime::RuntimeError;
@@ -213,6 +215,7 @@ where
skipped_count: Some(ingest.skipped_count),
unsupported_count: Some(ingest.unsupported_count),
failed_count: Some(ingest.failed_count),
+ publish_plan: None,
reason: ingest.reason(),
actions: vec![scope.ready_action().to_owned()],
})
@@ -283,27 +286,16 @@ where
.public_identity
.public_key_hex
.as_str();
- let mut counts = SyncPushCounts::from_batch(&batch);
- let publishable_events = batch
- .pending_events
- .iter()
- .filter(|event| {
- if event.author.eq_ignore_ascii_case(selected_pubkey) {
- true
- } else {
- counts.skipped_count += 1;
- false
- }
- })
- .collect::<Vec<_>>();
- counts.publishable_count = publishable_events.len();
+ let (mut counts, publishable_events, publish_plan) = sync_push_plan(&batch, selected_pubkey);
if config.output.dry_run {
- let state = if counts.publishable_count > 0 {
+ let state = if counts.pending_count > 0 {
"dry_run"
} else {
"ready"
};
+ let reason = sync_push_dry_run_reason(&counts);
+ let actions = sync_push_actions(state, &counts);
return Ok(push_view(
config,
state,
@@ -316,12 +308,9 @@ where
Vec::new(),
Vec::new(),
Vec::new(),
- Some("dry run requested; relay publish skipped".to_owned()),
- if batch.pending_count > 0 {
- vec![SYNC_PUSH_ACTION.to_owned()]
- } else {
- vec!["radroots sync status get".to_owned()]
- },
+ Some(publish_plan),
+ reason,
+ actions,
));
}
@@ -363,16 +352,17 @@ where
"partial"
} else if counts.failed_count > 0 {
"unavailable"
+ } else if counts.published_count > 0 && counts.skipped_count > 0 && queue.pending_count > 0 {
+ "partial"
+ } else if counts.published_count == 0 && counts.skipped_count > 0 && queue.pending_count > 0 {
+ "unconfigured"
} else if counts.published_count > 0 {
"published"
} else {
"ready"
};
let reason = counts.reason();
- let actions = match state {
- "published" | "ready" => vec!["radroots sync status get".to_owned()],
- _ => vec![SYNC_PUSH_ACTION.to_owned()],
- };
+ let actions = sync_push_actions(state, &counts);
Ok(push_view(
config,
@@ -386,6 +376,7 @@ where
connected_relays,
acknowledged_relays,
failed_relays,
+ None,
reason,
actions,
))
@@ -451,6 +442,7 @@ fn empty_action_from_snapshot(snapshot: SyncSnapshot, direction: &str) -> SyncAc
skipped_count: None,
unsupported_count: None,
failed_count: None,
+ publish_plan: None,
reason: snapshot.reason,
actions: snapshot.actions,
}
@@ -473,6 +465,7 @@ fn push_view(
connected_relays: Vec<String>,
acknowledged_relays: Vec<String>,
failed_relays: Vec<RelayFailureView>,
+ publish_plan: Option<SyncPublishPlanView>,
reason: Option<String>,
actions: Vec<String>,
) -> SyncActionView {
@@ -497,11 +490,129 @@ fn push_view(
skipped_count: Some(counts.skipped_count),
unsupported_count: Some(counts.unsupported_count),
failed_count: Some(counts.failed_count),
+ publish_plan,
reason,
actions,
}
}
+fn sync_push_plan<'a>(
+ batch: &'a radroots_replica_sync::RadrootsReplicaPendingPublishBatch,
+ selected_pubkey: &str,
+) -> (
+ SyncPushCounts,
+ Vec<&'a RadrootsReplicaPendingPublishEvent>,
+ SyncPublishPlanView,
+) {
+ let mut counts = SyncPushCounts::from_batch(batch);
+ let mut publishable_events = Vec::new();
+ let mut event_kinds = BTreeMap::<u32, SyncPublishPlanKindView>::new();
+ let mut authors = BTreeMap::<String, SyncPublishPlanAuthorView>::new();
+
+ for event in &batch.pending_events {
+ let is_publishable = event.author.eq_ignore_ascii_case(selected_pubkey);
+ let kind = event_kinds
+ .entry(event.kind)
+ .or_insert_with(|| SyncPublishPlanKindView {
+ kind: event.kind,
+ pending_count: 0,
+ publishable_count: 0,
+ skipped_count: 0,
+ unsupported_count: 0,
+ failed_count: 0,
+ });
+ kind.pending_count += 1;
+
+ let author =
+ authors
+ .entry(event.author.clone())
+ .or_insert_with(|| SyncPublishPlanAuthorView {
+ author: event.author.clone(),
+ eligibility: if is_publishable {
+ "selected".to_owned()
+ } else {
+ "other_author".to_owned()
+ },
+ pending_count: 0,
+ publishable_count: 0,
+ skipped_count: 0,
+ });
+ author.pending_count += 1;
+
+ if is_publishable {
+ kind.publishable_count += 1;
+ author.publishable_count += 1;
+ publishable_events.push(event);
+ } else {
+ kind.skipped_count += 1;
+ author.skipped_count += 1;
+ counts.skipped_count += 1;
+ if counts.first_skipped_author.is_none() {
+ counts.first_skipped_author = Some(event.author.clone());
+ }
+ }
+ }
+
+ counts.publishable_count = publishable_events.len();
+
+ (
+ counts,
+ publishable_events,
+ SyncPublishPlanView {
+ selected_author: selected_pubkey.to_owned(),
+ event_kinds: event_kinds.into_values().collect(),
+ authors: authors.into_values().collect(),
+ },
+ )
+}
+
+fn sync_push_dry_run_reason(counts: &SyncPushCounts) -> Option<String> {
+ match counts.skipped_count {
+ 0 => Some("dry run requested; relay publish skipped".to_owned()),
+ skipped => Some(format!(
+ "dry run requested; relay publish skipped; {skipped} pending event(s) belong to another author and would not be signed"
+ )),
+ }
+}
+
+fn sync_push_actions(state: &str, counts: &SyncPushCounts) -> Vec<String> {
+ let retry_selected_account =
+ counts.failed_count > 0 || counts.publishable_count > counts.published_count;
+ let selected_account_actionable =
+ retry_selected_account || (state == "dry_run" && counts.publishable_count > 0);
+ let mut actions = match state {
+ "published" | "ready" => vec!["radroots sync status get".to_owned()],
+ "dry_run" if selected_account_actionable => {
+ vec![
+ SYNC_PUSH_ACTION.to_owned(),
+ "radroots sync status get".to_owned(),
+ ]
+ }
+ "dry_run" => vec!["radroots sync status get".to_owned()],
+ _ if selected_account_actionable => {
+ vec![
+ SYNC_PUSH_ACTION.to_owned(),
+ "radroots sync status get".to_owned(),
+ ]
+ }
+ _ => vec!["radroots sync status get".to_owned()],
+ };
+
+ if counts.skipped_count > 0 {
+ actions.push("radroots account list".to_owned());
+ if let Some(author) = counts.first_skipped_author.as_deref() {
+ actions.push(format!("radroots --account-id {author} sync push"));
+ }
+ }
+
+ actions.into_iter().fold(Vec::new(), |mut unique, action| {
+ if !unique.contains(&action) {
+ unique.push(action);
+ }
+ unique
+ })
+}
+
#[derive(Debug, Clone, Default)]
struct SyncPushCounts {
pending_count: usize,
@@ -511,6 +622,7 @@ struct SyncPushCounts {
unsupported_count: usize,
failed_count: usize,
first_failure_reason: Option<String>,
+ first_skipped_author: Option<String>,
}
impl SyncPushCounts {
@@ -523,15 +635,22 @@ impl SyncPushCounts {
fn reason(&self) -> Option<String> {
if self.failed_count > 0 {
- return Some(match &self.first_failure_reason {
+ let failure_reason = match &self.first_failure_reason {
Some(reason) => format!(
"{} pending event(s) failed publish: {reason}",
self.failed_count
),
None => format!("{} pending event(s) failed publish", self.failed_count),
- });
+ };
+ if self.skipped_count > 0 {
+ return Some(format!(
+ "{failure_reason}; {} pending event(s) belong to another author and were not signed",
+ self.skipped_count
+ ));
+ }
+ return Some(failure_reason);
}
- if self.pending_count > 0 && self.publishable_count == 0 {
+ if self.pending_count > 0 && self.skipped_count > 0 {
return Some(
"pending local replica events belong to another author and were not signed"
.to_owned(),
@@ -824,8 +943,9 @@ mod tests {
use radroots_nostr::prelude::{
RadrootsNostrEvent, RadrootsNostrFilter, radroots_nostr_build_event,
};
- use radroots_replica_db::{farm, migrations};
+ use radroots_replica_db::{farm, farm_member_claim, migrations};
use radroots_replica_db_schema::farm::IFarmFields;
+ use radroots_replica_db_schema::farm_member_claim::IFarmMemberClaimFields;
use radroots_replica_sync::radroots_replica_sync_status;
use radroots_runtime_paths::RadrootsMigrationReport;
use radroots_secret_vault::RadrootsSecretBackend;
@@ -916,10 +1036,72 @@ mod tests {
assert_eq!(view.publishable_count, Some(status.pending_count));
assert_eq!(view.published_count, Some(0));
assert_eq!(view.failed_count, Some(0));
+ let plan = view.publish_plan.as_ref().expect("publish plan");
+ assert_eq!(
+ plan.selected_author,
+ signing.account.record.public_identity.public_key_hex
+ );
+ assert!(plan.event_kinds.iter().any(|kind| {
+ kind.kind == KIND_FARM
+ && kind.pending_count == 1
+ && kind.publishable_count == 1
+ && kind.skipped_count == 0
+ }));
+ assert!(plan.authors.iter().any(|author| {
+ author.author == signing.account.record.public_identity.public_key_hex
+ && author.eligibility == "selected"
+ && author.publishable_count == status.pending_count
+ }));
assert!(status.pending_count > 0);
}
#[test]
+ fn sync_push_dry_run_reports_other_author_publish_plan() {
+ let dir = tempdir().expect("tempdir");
+ let mut config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
+ config.output.dry_run = true;
+ crate::runtime::local::init(&config).expect("store init");
+ let signing =
+ crate::runtime::accounts::create_or_migrate_default_account(&config).expect("account");
+ let selected_pubkey = signing
+ .account
+ .record
+ .public_identity
+ .public_key_hex
+ .clone();
+ let other_pubkey = identity(42).public_key_hex();
+ seed_replica_farm(&config, selected_pubkey.as_str());
+ seed_replica_farm(&config, other_pubkey.as_str());
+
+ let view = push_with_publisher(&config, |_, _, _| panic!("dry run must not publish"))
+ .expect("sync push dry run");
+
+ assert_eq!(view.state, "dry_run");
+ let skipped_count = view.skipped_count.expect("skipped count");
+ assert!(skipped_count > 0);
+ assert!(
+ view.reason
+ .as_deref()
+ .expect("dry-run reason")
+ .contains("belong to another author")
+ );
+ let plan = view.publish_plan.as_ref().expect("publish plan");
+ assert!(plan.event_kinds.iter().any(|kind| kind.skipped_count == 1));
+ assert!(plan.authors.iter().any(|author| {
+ author.author == other_pubkey
+ && author.eligibility == "other_author"
+ && author.pending_count == skipped_count
+ && author.skipped_count == skipped_count
+ }));
+ assert!(view.actions.contains(&"radroots account list".to_owned()));
+ assert!(
+ view.actions
+ .iter()
+ .any(|action| action == &format!("radroots --account-id {other_pubkey} sync push"))
+ );
+ }
+
+ #[test]
fn sync_push_publishes_pending_local_author_events_and_updates_state() {
let dir = tempdir().expect("tempdir");
let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
@@ -976,6 +1158,119 @@ mod tests {
}
#[test]
+ fn sync_push_reports_partial_when_other_author_events_remain_pending() {
+ let dir = tempdir().expect("tempdir");
+ let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
+ crate::runtime::local::init(&config).expect("store init");
+ let signing =
+ crate::runtime::accounts::create_or_migrate_default_account(&config).expect("account");
+ let selected_pubkey = signing
+ .account
+ .record
+ .public_identity
+ .public_key_hex
+ .clone();
+ let other_pubkey = identity(43).public_key_hex();
+ seed_replica_farm(&config, selected_pubkey.as_str());
+ seed_replica_member_claim(&config, other_pubkey.as_str(), selected_pubkey.as_str());
+
+ let before = radroots_replica_sync_status(
+ &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"),
+ )
+ .expect("status before");
+ let view = push_with_publisher(&config, |identity, relays, event| {
+ assert!(event.author.eq_ignore_ascii_case(selected_pubkey.as_str()));
+ let signed = signed_event(
+ identity,
+ WireEventParts {
+ kind: event.draft.kind,
+ content: event.draft.content.clone(),
+ tags: event.draft.tags.clone(),
+ },
+ );
+ Ok(DirectRelayPublishReceipt {
+ event_id: signed.id.to_hex(),
+ created_at: u32::try_from(signed.created_at.as_secs()).unwrap_or(u32::MAX),
+ signature: signed.sig.to_string(),
+ event: signed,
+ target_relays: relays.to_vec(),
+ connected_relays: relays.to_vec(),
+ acknowledged_relays: relays.to_vec(),
+ failed_relays: Vec::new(),
+ })
+ })
+ .expect("sync push");
+ let after = radroots_replica_sync_status(
+ &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"),
+ )
+ .expect("status after");
+
+ assert!(before.pending_count > after.pending_count);
+ assert_eq!(view.state, "partial");
+ assert_eq!(view.published_count, Some(before.pending_count - 1));
+ assert_eq!(view.skipped_count, Some(1));
+ assert_eq!(after.pending_count, 1);
+ assert!(
+ view.reason
+ .as_deref()
+ .expect("partial reason")
+ .contains("belong to another author")
+ );
+ assert!(
+ view.actions
+ .contains(&"radroots sync status get".to_owned())
+ );
+ assert!(view.actions.contains(&"radroots account list".to_owned()));
+ assert!(
+ view.actions
+ .iter()
+ .any(|action| action == &format!("radroots --account-id {other_pubkey} sync push"))
+ );
+ }
+
+ #[test]
+ fn sync_push_reports_unconfigured_when_only_other_author_events_are_pending() {
+ let dir = tempdir().expect("tempdir");
+ let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
+ crate::runtime::local::init(&config).expect("store init");
+ crate::runtime::accounts::create_or_migrate_default_account(&config).expect("account");
+ let other_pubkey = identity(44).public_key_hex();
+ seed_replica_farm(&config, other_pubkey.as_str());
+
+ let view = push_with_publisher(&config, |_, _, _| {
+ panic!("other-author-only queue must not publish")
+ })
+ .expect("sync push");
+ let after = radroots_replica_sync_status(
+ &SqliteExecutor::open(&config.local.replica_db_path).expect("open replica"),
+ )
+ .expect("status after");
+
+ assert_eq!(view.state, "unconfigured");
+ assert_eq!(view.publishable_count, Some(0));
+ assert_eq!(view.published_count, Some(0));
+ let skipped_count = view.skipped_count.expect("skipped count");
+ assert!(skipped_count > 0);
+ assert_eq!(after.pending_count, skipped_count);
+ assert!(
+ view.reason
+ .as_deref()
+ .expect("unconfigured reason")
+ .contains("belong to another author")
+ );
+ assert!(
+ view.actions
+ .contains(&"radroots sync status get".to_owned())
+ );
+ assert!(view.actions.contains(&"radroots account list".to_owned()));
+ assert!(
+ view.actions
+ .iter()
+ .any(|action| action == &format!("radroots --account-id {other_pubkey} sync push"))
+ );
+ }
+
+ #[test]
fn sync_push_failed_publish_leaves_pending_state_retryable() {
let dir = tempdir().expect("tempdir");
let config = sample_config(dir.path(), vec!["wss://relay.example.com".to_owned()]);
@@ -1295,6 +1590,19 @@ mod tests {
.expect("farm");
}
+ fn seed_replica_member_claim(config: &RuntimeConfig, member_pubkey: &str, farm_pubkey: &str) {
+ let executor = SqliteExecutor::open(&config.local.replica_db_path).expect("open replica");
+ migrations::run_all_up(&executor).expect("migrations");
+ let _ = farm_member_claim::create(
+ &executor,
+ &IFarmMemberClaimFields {
+ member_pubkey: member_pubkey.to_owned(),
+ farm_pubkey: farm_pubkey.to_owned(),
+ },
+ )
+ .expect("member claim");
+ }
+
fn identity(seed: u8) -> RadrootsIdentity {
RadrootsIdentity::from_secret_key_bytes(&[seed; 32]).expect("identity")
}