commit 530715c3e1a68a5da7a4fe83eb259e5cc310704c
parent 83a151e6551bb3275754a2f2831bd51a843c12e0
Author: triesap <tyson@radroots.org>
Date: Tue, 7 Apr 2026 05:07:18 +0000
land first sync operator surfaces
Diffstat:
8 files changed, 763 insertions(+), 8 deletions(-)
diff --git a/src/cli.rs b/src/cli.rs
@@ -125,7 +125,7 @@ impl Command {
SyncCommand::Status => "sync status",
SyncCommand::Pull => "sync pull",
SyncCommand::Push => "sync push",
- SyncCommand::Watch => "sync watch",
+ SyncCommand::Watch(_) => "sync watch",
},
}
}
@@ -146,7 +146,7 @@ impl Command {
}) | Self::Order(OrderArgs {
command: OrderCommand::Ls | OrderCommand::History,
}) | Self::Sync(SyncArgs {
- command: SyncCommand::Watch,
+ command: SyncCommand::Watch(_),
}) | Self::Find(_)
),
}
@@ -301,7 +301,15 @@ pub enum SyncCommand {
Status,
Pull,
Push,
- Watch,
+ Watch(SyncWatchArgs),
+}
+
+#[derive(Debug, Clone, Args)]
+pub struct SyncWatchArgs {
+ #[arg(long)]
+ pub frames: usize,
+ #[arg(long, default_value_t = 1_000)]
+ pub interval_ms: u64,
}
#[derive(Debug, Clone, Args)]
@@ -378,7 +386,7 @@ mod tests {
use super::{
AccountCommand, CliArgs, Command, ConfigCommand, JobCommand, ListingCommand, LocalCommand,
LocalExportFormatArg, MycCommand, NetCommand, OrderCommand, RelayCommand, RpcCommand,
- SignerCommand, SyncCommand,
+ SignerCommand, SyncCommand, SyncWatchArgs,
};
use crate::runtime::config::OutputFormat;
use clap::Parser;
@@ -589,6 +597,29 @@ mod tests {
_ => panic!("unexpected command variant"),
}
+ let sync_watch = CliArgs::parse_from([
+ "radroots",
+ "sync",
+ "watch",
+ "--frames",
+ "2",
+ "--interval-ms",
+ "25",
+ ]);
+ match sync_watch.command {
+ Command::Sync(args) => match args.command {
+ SyncCommand::Watch(SyncWatchArgs {
+ frames,
+ interval_ms,
+ }) => {
+ assert_eq!(frames, 2);
+ assert_eq!(interval_ms, 25);
+ }
+ _ => panic!("unexpected sync subcommand"),
+ },
+ _ => panic!("unexpected command variant"),
+ }
+
let listing = CliArgs::parse_from(["radroots", "listing", "get", "lst_123"]);
match listing.command {
Command::Listing(args) => match args.command {
@@ -652,5 +683,12 @@ mod tests {
let find = CliArgs::parse_from(["radroots", "find", "eggs"]);
assert!(find.command.supports_output_format(OutputFormat::Ndjson));
+
+ let sync_watch = CliArgs::parse_from(["radroots", "sync", "watch", "--frames", "1"]);
+ assert!(
+ sync_watch
+ .command
+ .supports_output_format(OutputFormat::Ndjson)
+ );
}
}
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
@@ -6,6 +6,7 @@ pub mod net;
pub mod relay;
pub mod runtime;
pub mod signer;
+pub mod sync;
use crate::cli::{
AccountCommand, Command, ConfigCommand, JobCommand, ListingCommand, LocalCommand, MycCommand,
@@ -84,10 +85,10 @@ pub fn dispatch(
RpcCommand::Sessions => unimplemented_command("rpc sessions"),
},
Command::Sync(sync) => match &sync.command {
- SyncCommand::Status => unimplemented_command("sync status"),
- SyncCommand::Pull => unimplemented_command("sync pull"),
- SyncCommand::Push => unimplemented_command("sync push"),
- SyncCommand::Watch => unimplemented_command("sync watch"),
+ SyncCommand::Status => sync::status(config),
+ SyncCommand::Pull => sync::pull(config),
+ SyncCommand::Push => sync::push(config),
+ SyncCommand::Watch(args) => sync::watch(config, args),
},
}
}
diff --git a/src/commands/sync.rs b/src/commands/sync.rs
@@ -0,0 +1,45 @@
+use crate::cli::SyncWatchArgs;
+use crate::domain::runtime::{CommandDisposition, CommandOutput, CommandView};
+use crate::runtime::RuntimeError;
+use crate::runtime::config::RuntimeConfig;
+
+pub fn status(config: &RuntimeConfig) -> Result<CommandOutput, RuntimeError> {
+ let view = crate::runtime::sync::status(config)?;
+ Ok(output_from_disposition(
+ view.disposition(),
+ CommandView::SyncStatus(view),
+ ))
+}
+
+pub fn pull(config: &RuntimeConfig) -> Result<CommandOutput, RuntimeError> {
+ let view = crate::runtime::sync::pull(config)?;
+ Ok(output_from_disposition(
+ view.disposition(),
+ CommandView::SyncPull(view),
+ ))
+}
+
+pub fn push(config: &RuntimeConfig) -> Result<CommandOutput, RuntimeError> {
+ let view = crate::runtime::sync::push(config)?;
+ Ok(output_from_disposition(
+ view.disposition(),
+ CommandView::SyncPush(view),
+ ))
+}
+
+pub fn watch(config: &RuntimeConfig, args: &SyncWatchArgs) -> Result<CommandOutput, RuntimeError> {
+ let view = crate::runtime::sync::watch(config, args)?;
+ Ok(output_from_disposition(
+ view.disposition(),
+ CommandView::SyncWatch(view),
+ ))
+}
+
+fn output_from_disposition(disposition: CommandDisposition, view: CommandView) -> CommandOutput {
+ match disposition {
+ CommandDisposition::Success => CommandOutput::success(view),
+ CommandDisposition::Unconfigured => CommandOutput::unconfigured(view),
+ CommandDisposition::ExternalUnavailable => CommandOutput::external_unavailable(view),
+ CommandDisposition::InternalError => CommandOutput::internal_error(view),
+ }
+}
diff --git a/src/domain/runtime.rs b/src/domain/runtime.rs
@@ -82,6 +82,10 @@ pub enum CommandView {
NetStatus(NetStatusView),
RelayList(RelayListView),
SignerStatus(SignerStatusView),
+ SyncPull(SyncActionView),
+ SyncPush(SyncActionView),
+ SyncStatus(SyncStatusView),
+ SyncWatch(SyncWatchView),
}
#[derive(Debug, Clone, Serialize)]
@@ -323,6 +327,22 @@ pub struct LocalReplicaSyncView {
}
#[derive(Debug, Clone, Serialize)]
+pub struct SyncFreshnessView {
+ pub state: String,
+ pub display: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub age_seconds: Option<u64>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub last_event_at: Option<u64>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct SyncQueueView {
+ pub expected_count: usize,
+ pub pending_count: usize,
+}
+
+#[derive(Debug, Clone, Serialize)]
pub struct LocalBackupView {
pub state: String,
pub source: String,
@@ -424,6 +444,94 @@ impl NetStatusView {
}
#[derive(Debug, Clone, Serialize)]
+pub struct SyncStatusView {
+ pub state: String,
+ pub source: String,
+ pub local_root: String,
+ pub replica_db: String,
+ pub relay_count: usize,
+ pub publish_policy: String,
+ pub freshness: SyncFreshnessView,
+ pub queue: SyncQueueView,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub reason: Option<String>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub actions: Vec<String>,
+}
+
+impl SyncStatusView {
+ pub fn disposition(&self) -> CommandDisposition {
+ match self.state.as_str() {
+ "unconfigured" => CommandDisposition::Unconfigured,
+ "unavailable" => CommandDisposition::ExternalUnavailable,
+ "error" => CommandDisposition::InternalError,
+ _ => CommandDisposition::Success,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct SyncActionView {
+ pub direction: String,
+ pub state: String,
+ pub source: String,
+ pub local_root: String,
+ pub replica_db: String,
+ pub relay_count: usize,
+ pub publish_policy: String,
+ pub freshness: SyncFreshnessView,
+ pub queue: SyncQueueView,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub reason: Option<String>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub actions: Vec<String>,
+}
+
+impl SyncActionView {
+ pub fn disposition(&self) -> CommandDisposition {
+ match self.state.as_str() {
+ "unconfigured" => CommandDisposition::Unconfigured,
+ "unavailable" => CommandDisposition::ExternalUnavailable,
+ "error" => CommandDisposition::InternalError,
+ _ => CommandDisposition::Success,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct SyncWatchView {
+ pub state: String,
+ pub source: String,
+ pub interval_ms: u64,
+ pub frames: Vec<SyncWatchFrameView>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub reason: Option<String>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub actions: Vec<String>,
+}
+
+impl SyncWatchView {
+ pub fn disposition(&self) -> CommandDisposition {
+ match self.state.as_str() {
+ "unconfigured" => CommandDisposition::Unconfigured,
+ "unavailable" => CommandDisposition::ExternalUnavailable,
+ "error" => CommandDisposition::InternalError,
+ _ => CommandDisposition::Success,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct SyncWatchFrameView {
+ pub sequence: usize,
+ pub observed_at: u64,
+ pub state: String,
+ pub relay_count: usize,
+ pub freshness: SyncFreshnessView,
+ pub queue: SyncQueueView,
+}
+
+#[derive(Debug, Clone, Serialize)]
pub struct SignerStatusView {
pub mode: String,
pub state: String,
diff --git a/src/render/mod.rs b/src/render/mod.rs
@@ -3,6 +3,7 @@ use std::io::{self, Write};
use crate::domain::runtime::{
AccountListView, AccountSummaryView, CommandOutput, CommandView, DoctorCheckView, DoctorView,
LocalBackupView, LocalExportView, LocalInitView, LocalStatusView, NetStatusView, RelayListView,
+ SyncActionView, SyncStatusView, SyncWatchView,
};
use crate::runtime::RuntimeError;
use crate::runtime::config::{OutputConfig, OutputFormat};
@@ -128,6 +129,18 @@ fn render_human_to(stdout: &mut dyn Write, output: &CommandOutput) -> Result<(),
render_myc_status(stdout, myc, false)?;
}
}
+ CommandView::SyncPull(view) => {
+ render_sync_action(stdout, view)?;
+ }
+ CommandView::SyncPush(view) => {
+ render_sync_action(stdout, view)?;
+ }
+ CommandView::SyncStatus(view) => {
+ render_sync_status(stdout, view)?;
+ }
+ CommandView::SyncWatch(view) => {
+ render_sync_watch(stdout, view)?;
+ }
}
Ok(())
}
@@ -195,6 +208,22 @@ fn render_json_to(stdout: &mut dyn Write, output: &CommandOutput) -> Result<(),
serde_json::to_writer_pretty(&mut *stdout, view)?;
writeln!(stdout)?;
}
+ CommandView::SyncPull(view) => {
+ serde_json::to_writer_pretty(&mut *stdout, view)?;
+ writeln!(stdout)?;
+ }
+ CommandView::SyncPush(view) => {
+ serde_json::to_writer_pretty(&mut *stdout, view)?;
+ writeln!(stdout)?;
+ }
+ CommandView::SyncStatus(view) => {
+ serde_json::to_writer_pretty(&mut *stdout, view)?;
+ writeln!(stdout)?;
+ }
+ CommandView::SyncWatch(view) => {
+ serde_json::to_writer_pretty(&mut *stdout, view)?;
+ writeln!(stdout)?;
+ }
}
Ok(())
}
@@ -220,6 +249,13 @@ fn render_ndjson_to(stdout: &mut dyn Write, output: &CommandOutput) -> Result<()
}
Ok(())
}
+ CommandView::SyncWatch(view) => {
+ for frame in &view.frames {
+ serde_json::to_writer(&mut *stdout, frame)?;
+ writeln!(stdout)?;
+ }
+ Ok(())
+ }
_ => Err(RuntimeError::Config(format!(
"`{}` does not support --ndjson",
human_command_name(output.view())
@@ -437,6 +473,104 @@ fn render_net_status(stdout: &mut dyn Write, view: &NetStatusView) -> Result<(),
Ok(())
}
+fn render_sync_status(stdout: &mut dyn Write, view: &SyncStatusView) -> Result<(), RuntimeError> {
+ write_context(
+ stdout,
+ match view.state.as_str() {
+ "ready" => "activity · sync status",
+ _ => "activity · sync unconfigured",
+ },
+ )?;
+ let relay_count = view.relay_count.to_string();
+ let expected = view.queue.expected_count.to_string();
+ let pending = view.queue.pending_count.to_string();
+ render_pairs(
+ stdout,
+ "sync",
+ &[
+ ("status", view.state.as_str()),
+ ("freshness", view.freshness.display.as_str()),
+ ("pending", pending.as_str()),
+ ("expected", expected.as_str()),
+ ("relays", relay_count.as_str()),
+ ("publish policy", view.publish_policy.as_str()),
+ ("replica db", view.replica_db.as_str()),
+ ("local root", view.local_root.as_str()),
+ ],
+ )?;
+ if let Some(reason) = &view.reason {
+ writeln!(stdout, "reason: {reason}")?;
+ }
+ writeln!(stdout, "source: {}", view.source)?;
+ render_actions(stdout, &view.actions)?;
+ Ok(())
+}
+
+fn render_sync_action(stdout: &mut dyn Write, view: &SyncActionView) -> Result<(), RuntimeError> {
+ write_context(
+ stdout,
+ format!("activity · sync {} {}", view.direction, view.state).as_str(),
+ )?;
+ let relay_count = view.relay_count.to_string();
+ let expected = view.queue.expected_count.to_string();
+ let pending = view.queue.pending_count.to_string();
+ render_pairs(
+ stdout,
+ "sync",
+ &[
+ ("direction", view.direction.as_str()),
+ ("status", view.state.as_str()),
+ ("freshness", view.freshness.display.as_str()),
+ ("pending", pending.as_str()),
+ ("expected", expected.as_str()),
+ ("relays", relay_count.as_str()),
+ ("publish policy", view.publish_policy.as_str()),
+ ("replica db", view.replica_db.as_str()),
+ ("local root", view.local_root.as_str()),
+ ],
+ )?;
+ if let Some(reason) = &view.reason {
+ writeln!(stdout, "reason: {reason}")?;
+ }
+ writeln!(stdout, "source: {}", view.source)?;
+ render_actions(stdout, &view.actions)?;
+ Ok(())
+}
+
+fn render_sync_watch(stdout: &mut dyn Write, view: &SyncWatchView) -> Result<(), RuntimeError> {
+ write_context(stdout, "activity · sync watch")?;
+ if view.frames.is_empty() {
+ writeln!(stdout, "no sync frames collected")?;
+ writeln!(stdout)?;
+ } else {
+ let table = Table {
+ headers: &["frame", "status", "freshness", "pending", "relays"],
+ rows: view
+ .frames
+ .iter()
+ .map(|frame| {
+ vec![
+ frame.sequence.to_string(),
+ frame.state.clone(),
+ frame.freshness.display.clone(),
+ frame.queue.pending_count.to_string(),
+ frame.relay_count.to_string(),
+ ]
+ })
+ .collect(),
+ };
+ render_table(stdout, &table)?;
+ writeln!(stdout)?;
+ }
+ writeln!(stdout, "interval ms: {}", view.interval_ms)?;
+ if let Some(reason) = &view.reason {
+ writeln!(stdout, "reason: {reason}")?;
+ }
+ writeln!(stdout, "source: {}", view.source)?;
+ render_actions(stdout, &view.actions)?;
+ Ok(())
+}
+
fn render_local_init(stdout: &mut dyn Write, view: &LocalInitView) -> Result<(), RuntimeError> {
write_context(stdout, format!("local · {}", view.state).as_str())?;
render_pairs(
@@ -761,6 +895,10 @@ fn human_command_name(view: &CommandView) -> &'static str {
CommandView::NetStatus(_) => "net status",
CommandView::RelayList(_) => "relay ls",
CommandView::SignerStatus(_) => "signer status",
+ CommandView::SyncPull(_) => "sync pull",
+ CommandView::SyncPush(_) => "sync push",
+ CommandView::SyncStatus(_) => "sync status",
+ CommandView::SyncWatch(_) => "sync watch",
}
}
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
@@ -5,6 +5,7 @@ pub mod logging;
pub mod myc;
pub mod network;
pub mod signer;
+pub mod sync;
use std::process::ExitCode;
diff --git a/src/runtime/sync.rs b/src/runtime/sync.rs
@@ -0,0 +1,265 @@
+use std::thread;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use radroots_replica_sync::radroots_replica_sync_status;
+use radroots_sql_core::{SqlExecutor, SqliteExecutor};
+use serde_json::Value;
+
+use crate::cli::SyncWatchArgs;
+use crate::domain::runtime::{
+ SyncActionView, SyncFreshnessView, SyncQueueView, SyncStatusView, SyncWatchFrameView,
+ SyncWatchView,
+};
+use crate::runtime::RuntimeError;
+use crate::runtime::config::RuntimeConfig;
+
+const SYNC_SOURCE: &str = "local replica · local first";
+const RELAY_SETUP_ACTION: &str = "radroots relay ls --relay wss://relay.example.com";
+
+#[derive(Debug, Clone)]
+struct SyncSnapshot {
+ state: String,
+ source: String,
+ local_root: String,
+ replica_db: String,
+ relay_count: usize,
+ publish_policy: String,
+ freshness: SyncFreshnessView,
+ queue: SyncQueueView,
+ reason: Option<String>,
+ actions: Vec<String>,
+}
+
+pub fn status(config: &RuntimeConfig) -> Result<SyncStatusView, RuntimeError> {
+ let snapshot = inspect_sync(config)?;
+ Ok(SyncStatusView {
+ state: snapshot.state,
+ source: snapshot.source,
+ local_root: snapshot.local_root,
+ replica_db: snapshot.replica_db,
+ relay_count: snapshot.relay_count,
+ publish_policy: snapshot.publish_policy,
+ freshness: snapshot.freshness,
+ queue: snapshot.queue,
+ reason: snapshot.reason,
+ actions: snapshot.actions,
+ })
+}
+
+pub fn pull(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> {
+ narrowed_action(
+ config,
+ "pull",
+ "relay ingest is not wired into `radroots sync pull` yet",
+ )
+}
+
+pub fn push(config: &RuntimeConfig) -> Result<SyncActionView, RuntimeError> {
+ narrowed_action(
+ config,
+ "push",
+ "relay publish is not wired into `radroots sync push` yet",
+ )
+}
+
+pub fn watch(config: &RuntimeConfig, args: &SyncWatchArgs) -> Result<SyncWatchView, RuntimeError> {
+ if args.frames == 0 {
+ return Err(RuntimeError::Config(
+ "`sync watch --frames` must be greater than 0".to_owned(),
+ ));
+ }
+
+ let mut frames = Vec::with_capacity(args.frames);
+ let mut last_snapshot = None;
+
+ for index in 0..args.frames {
+ let snapshot = inspect_sync(config)?;
+ frames.push(SyncWatchFrameView {
+ sequence: index + 1,
+ observed_at: unix_now(),
+ state: snapshot.state.clone(),
+ relay_count: snapshot.relay_count,
+ freshness: snapshot.freshness.clone(),
+ queue: snapshot.queue.clone(),
+ });
+ last_snapshot = Some(snapshot);
+
+ if index + 1 < args.frames {
+ thread::sleep(Duration::from_millis(args.interval_ms));
+ }
+ }
+
+ let snapshot = last_snapshot.expect("watch frames are non-empty");
+ Ok(SyncWatchView {
+ state: snapshot.state,
+ source: snapshot.source,
+ interval_ms: args.interval_ms,
+ frames,
+ reason: snapshot.reason,
+ actions: snapshot.actions,
+ })
+}
+
+fn narrowed_action(
+ config: &RuntimeConfig,
+ direction: &str,
+ unavailable_reason: &str,
+) -> Result<SyncActionView, RuntimeError> {
+ let snapshot = inspect_sync(config)?;
+ if snapshot.state == "unconfigured" {
+ return Ok(SyncActionView {
+ direction: direction.to_owned(),
+ state: snapshot.state,
+ source: snapshot.source,
+ local_root: snapshot.local_root,
+ replica_db: snapshot.replica_db,
+ relay_count: snapshot.relay_count,
+ publish_policy: snapshot.publish_policy,
+ freshness: snapshot.freshness,
+ queue: snapshot.queue,
+ reason: snapshot.reason,
+ actions: snapshot.actions,
+ });
+ }
+
+ let mut actions = vec!["radroots sync status".to_owned()];
+ actions.extend(snapshot.actions);
+
+ Ok(SyncActionView {
+ direction: direction.to_owned(),
+ state: "unavailable".to_owned(),
+ source: snapshot.source,
+ local_root: snapshot.local_root,
+ replica_db: snapshot.replica_db,
+ relay_count: snapshot.relay_count,
+ publish_policy: snapshot.publish_policy,
+ freshness: snapshot.freshness,
+ queue: snapshot.queue,
+ reason: Some(unavailable_reason.to_owned()),
+ actions,
+ })
+}
+
+fn inspect_sync(config: &RuntimeConfig) -> Result<SyncSnapshot, RuntimeError> {
+ if !config.local.replica_db_path.exists() {
+ return Ok(SyncSnapshot {
+ state: "unconfigured".to_owned(),
+ source: SYNC_SOURCE.to_owned(),
+ local_root: config.local.root.display().to_string(),
+ replica_db: "missing".to_owned(),
+ relay_count: config.relay.urls.len(),
+ publish_policy: config.relay.publish_policy.as_str().to_owned(),
+ freshness: SyncFreshnessView {
+ state: "never".to_owned(),
+ display: "never synced".to_owned(),
+ age_seconds: None,
+ last_event_at: None,
+ },
+ queue: SyncQueueView {
+ expected_count: 0,
+ pending_count: 0,
+ },
+ reason: Some("local replica database is not initialized".to_owned()),
+ actions: vec!["radroots local init".to_owned()],
+ });
+ }
+
+ let executor = SqliteExecutor::open(&config.local.replica_db_path)?;
+ let queue = radroots_replica_sync_status(&executor)?;
+ let freshness = freshness_from_executor(&executor)?;
+ let relay_count = config.relay.urls.len();
+ let publish_policy = config.relay.publish_policy.as_str().to_owned();
+ let mut actions = Vec::new();
+
+ if relay_count == 0 {
+ actions.push(RELAY_SETUP_ACTION.to_owned());
+ return Ok(SyncSnapshot {
+ state: "unconfigured".to_owned(),
+ source: SYNC_SOURCE.to_owned(),
+ local_root: config.local.root.display().to_string(),
+ replica_db: "ready".to_owned(),
+ relay_count,
+ publish_policy,
+ freshness,
+ queue: SyncQueueView {
+ expected_count: queue.expected_count,
+ pending_count: queue.pending_count,
+ },
+ reason: Some("no relays are configured for this operator session".to_owned()),
+ actions,
+ });
+ }
+
+ actions.push("radroots sync pull".to_owned());
+ if queue.pending_count > 0 {
+ actions.push("radroots sync push".to_owned());
+ }
+
+ Ok(SyncSnapshot {
+ state: "ready".to_owned(),
+ source: SYNC_SOURCE.to_owned(),
+ local_root: config.local.root.display().to_string(),
+ replica_db: "ready".to_owned(),
+ relay_count,
+ publish_policy,
+ freshness,
+ queue: SyncQueueView {
+ expected_count: queue.expected_count,
+ pending_count: queue.pending_count,
+ },
+ reason: None,
+ actions,
+ })
+}
+
+fn freshness_from_executor(executor: &SqliteExecutor) -> Result<SyncFreshnessView, RuntimeError> {
+ let raw = executor.query_raw(
+ "SELECT MAX(last_created_at) AS last_created_at FROM nostr_event_state WHERE last_created_at IS NOT NULL",
+ "[]",
+ )?;
+ let json: Value = serde_json::from_str(&raw)?;
+ let last_event_at = json
+ .as_array()
+ .and_then(|rows| rows.first())
+ .and_then(|row| row.get("last_created_at"))
+ .and_then(|value| {
+ value
+ .as_u64()
+ .or_else(|| value.as_i64().and_then(|signed| u64::try_from(signed).ok()))
+ });
+
+ Ok(match last_event_at {
+ Some(last_event_at) => {
+ let age_seconds = unix_now().saturating_sub(last_event_at);
+ SyncFreshnessView {
+ state: "synced".to_owned(),
+ display: format!("synced {}", relative_age(age_seconds)),
+ age_seconds: Some(age_seconds),
+ last_event_at: Some(last_event_at),
+ }
+ }
+ None => SyncFreshnessView {
+ state: "never".to_owned(),
+ display: "never synced".to_owned(),
+ age_seconds: None,
+ last_event_at: None,
+ },
+ })
+}
+
+fn unix_now() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .map(|duration| duration.as_secs())
+ .unwrap_or(0)
+}
+
+fn relative_age(age_seconds: u64) -> String {
+ match age_seconds {
+ 0 => "now".to_owned(),
+ 1..=59 => format!("{age_seconds}s ago"),
+ 60..=3_599 => format!("{}m ago", age_seconds / 60),
+ 3_600..=86_399 => format!("{}h ago", age_seconds / 3_600),
+ _ => format!("{}d ago", age_seconds / 86_400),
+ }
+}
diff --git a/tests/sync.rs b/tests/sync.rs
@@ -0,0 +1,159 @@
+use std::fs;
+use std::path::Path;
+use std::process::Command;
+
+use assert_cmd::prelude::*;
+use serde_json::Value;
+use tempfile::tempdir;
+
+fn cli_command_in(workdir: &Path) -> Command {
+ let mut command = Command::cargo_bin("radroots").expect("binary");
+ command.current_dir(workdir);
+ command.env("HOME", workdir.join("home"));
+ for key in [
+ "RADROOTS_ENV_FILE",
+ "RADROOTS_OUTPUT",
+ "RADROOTS_CLI_LOGGING_FILTER",
+ "RADROOTS_CLI_LOGGING_OUTPUT_DIR",
+ "RADROOTS_CLI_LOGGING_STDOUT",
+ "RADROOTS_LOG_FILTER",
+ "RADROOTS_LOG_DIR",
+ "RADROOTS_LOG_STDOUT",
+ "RADROOTS_ACCOUNT",
+ "RADROOTS_IDENTITY_PATH",
+ "RADROOTS_SIGNER",
+ "RADROOTS_RELAYS",
+ "RADROOTS_MYC_EXECUTABLE",
+ ] {
+ command.env_remove(key);
+ }
+ command
+}
+
+#[test]
+fn sync_status_reports_unconfigured_when_local_replica_is_missing() {
+ let dir = tempdir().expect("tempdir");
+ let output = cli_command_in(dir.path())
+ .args(["--json", "sync", "status"])
+ .output()
+ .expect("run sync status");
+
+ assert_eq!(output.status.code(), Some(3));
+ let json: Value = serde_json::from_slice(output.stdout.as_slice()).expect("sync json");
+ assert_eq!(json["state"], "unconfigured");
+ assert_eq!(json["replica_db"], "missing");
+ assert_eq!(json["freshness"]["display"], "never synced");
+ assert_eq!(json["actions"][0], "radroots local init");
+}
+
+#[test]
+fn sync_status_reports_queue_and_relay_setup_need_after_local_init() {
+ let dir = tempdir().expect("tempdir");
+ let init = cli_command_in(dir.path())
+ .args(["local", "init"])
+ .output()
+ .expect("run local init");
+ assert!(init.status.success());
+
+ let output = cli_command_in(dir.path())
+ .args(["--json", "sync", "status"])
+ .output()
+ .expect("run sync status");
+
+ assert_eq!(output.status.code(), Some(3));
+ let json: Value = serde_json::from_slice(output.stdout.as_slice()).expect("sync json");
+ assert_eq!(json["state"], "unconfigured");
+ assert_eq!(json["replica_db"], "ready");
+ assert_eq!(json["queue"]["pending_count"], 0);
+ assert_eq!(json["freshness"]["display"], "never synced");
+ assert_eq!(
+ json["actions"][0],
+ "radroots relay ls --relay wss://relay.example.com"
+ );
+}
+
+#[test]
+fn sync_pull_and_push_are_honestly_narrowed_until_relay_plane_lands() {
+ let dir = tempdir().expect("tempdir");
+ let init = cli_command_in(dir.path())
+ .args(["local", "init"])
+ .output()
+ .expect("run local init");
+ assert!(init.status.success());
+ let config_dir = dir.path().join(".radroots");
+ fs::create_dir_all(&config_dir).expect("workspace config dir");
+ fs::write(
+ config_dir.join("config.toml"),
+ "[relay]\nurls = [\"wss://relay.one\"]\npublish_policy = \"any\"\n",
+ )
+ .expect("write workspace config");
+
+ let pull = cli_command_in(dir.path())
+ .args(["--json", "sync", "pull"])
+ .output()
+ .expect("run sync pull");
+ assert_eq!(pull.status.code(), Some(4));
+ let pull_json: Value = serde_json::from_slice(pull.stdout.as_slice()).expect("pull json");
+ assert_eq!(pull_json["direction"], "pull");
+ assert_eq!(pull_json["state"], "unavailable");
+ assert_eq!(pull_json["relay_count"], 1);
+ assert!(
+ pull_json["reason"]
+ .as_str()
+ .is_some_and(|reason| reason.contains("relay ingest"))
+ );
+
+ let push = cli_command_in(dir.path())
+ .args(["--json", "sync", "push"])
+ .output()
+ .expect("run sync push");
+ assert_eq!(push.status.code(), Some(4));
+ let push_json: Value = serde_json::from_slice(push.stdout.as_slice()).expect("push json");
+ assert_eq!(push_json["direction"], "push");
+ assert_eq!(push_json["state"], "unavailable");
+ assert_eq!(push_json["queue"]["pending_count"], 0);
+ assert!(
+ push_json["reason"]
+ .as_str()
+ .is_some_and(|reason| reason.contains("relay publish"))
+ );
+}
+
+#[test]
+fn sync_watch_ndjson_emits_one_frame_per_poll() {
+ let dir = tempdir().expect("tempdir");
+ let init = cli_command_in(dir.path())
+ .args(["local", "init"])
+ .output()
+ .expect("run local init");
+ assert!(init.status.success());
+ let config_dir = dir.path().join(".radroots");
+ fs::create_dir_all(&config_dir).expect("workspace config dir");
+ fs::write(
+ config_dir.join("config.toml"),
+ "[relay]\nurls = [\"wss://relay.one\", \"wss://relay.two\"]\npublish_policy = \"any\"\n",
+ )
+ .expect("write workspace config");
+
+ let output = cli_command_in(dir.path())
+ .args([
+ "--ndjson",
+ "sync",
+ "watch",
+ "--frames",
+ "2",
+ "--interval-ms",
+ "1",
+ ])
+ .output()
+ .expect("run sync watch");
+
+ assert!(output.status.success());
+ let stdout = String::from_utf8(output.stdout).expect("utf8 stdout");
+ let lines = stdout.lines().collect::<Vec<_>>();
+ assert_eq!(lines.len(), 2);
+ assert!(lines[0].contains("\"sequence\":1"));
+ assert!(lines[0].contains("\"state\":\"ready\""));
+ assert!(lines[1].contains("\"sequence\":2"));
+ assert!(lines[1].contains("\"relay_count\":2"));
+}