radrootsd

JSON-RPC bridge for Radroots event publishing
git clone https://radroots.dev/git/radrootsd.git
Log | Files | Refs | README | LICENSE

commit 17c7792a905f4b2cc6af94d55823d843e4417f94
parent ad5f2171ddefeaa169c252a668edeaa07bcfc556
Author: triesap <tyson@radroots.org>
Date:   Sat, 28 Mar 2026 22:18:20 +0000

bridge: expose durable job lifecycle

Diffstat:
Msrc/core/bridge/store.rs | 48+++++++++++++++++++++++++++++++++++++++++++++++-
Msrc/transport/jsonrpc/methods/bridge/job_status.rs | 4++--
Msrc/transport/jsonrpc/methods/bridge/listing_publish.rs | 4++--
Msrc/transport/jsonrpc/methods/bridge/order_request.rs | 4++--
Msrc/transport/jsonrpc/methods/bridge/shared.rs | 96+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
Msrc/transport/jsonrpc/methods/bridge/status.rs | 8++++++++
Msrc/transport/jsonrpc/methods/mod.rs | 4++++
7 files changed, 157 insertions(+), 11 deletions(-)

diff --git a/src/core/bridge/store.rs b/src/core/bridge/store.rs @@ -9,7 +9,7 @@ use crate::app::config::BridgeDeliveryPolicy; use crate::core::bridge::publish::{BridgePublishExecution, BridgeRelayPublishResult}; const BRIDGE_JOB_STORE_VERSION: u32 = 2; -const BRIDGE_PENDING_RECOVERY_SUMMARY: &str = +pub(crate) const BRIDGE_PENDING_RECOVERY_SUMMARY: &str = "bridge publish did not complete before process restart"; #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -50,10 +50,25 @@ pub struct BridgeJobRecord { pub relay_outcome_summary: String, } +impl BridgeJobRecord { + pub fn is_terminal(&self) -> bool { + self.status != BridgeJobStatus::Accepted + } + + pub fn recovered_after_restart(&self) -> bool { + self.status == BridgeJobStatus::Failed + && self.relay_outcome_summary == BRIDGE_PENDING_RECOVERY_SUMMARY + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] pub struct BridgeJobStoreSnapshot { pub retained_jobs: usize, pub retained_idempotency_keys: usize, + pub accepted_jobs: usize, + pub published_jobs: usize, + pub failed_jobs: usize, + pub recovered_failed_jobs: usize, pub capacity: usize, } @@ -232,9 +247,29 @@ impl BridgeJobStore { pub fn snapshot(&self) -> BridgeJobStoreSnapshot { let inner = self.inner.read().unwrap_or_else(|e| e.into_inner()); + let mut accepted_jobs = 0usize; + let mut published_jobs = 0usize; + let mut failed_jobs = 0usize; + let mut recovered_failed_jobs = 0usize; + for record in inner.jobs.values() { + match record.status { + BridgeJobStatus::Accepted => accepted_jobs += 1, + BridgeJobStatus::Published => published_jobs += 1, + BridgeJobStatus::Failed => { + failed_jobs += 1; + if record.recovered_after_restart() { + recovered_failed_jobs += 1; + } + } + } + } BridgeJobStoreSnapshot { retained_jobs: inner.jobs.len(), retained_idempotency_keys: inner.idempotency.len(), + accepted_jobs, + published_jobs, + failed_jobs, + recovered_failed_jobs, capacity: inner.capacity, } } @@ -623,6 +658,9 @@ mod tests { assert!(store.get("job-1").is_none()); assert!(store.get("job-2").is_some()); assert_eq!(store.snapshot().retained_jobs, 1); + assert_eq!(store.snapshot().accepted_jobs, 1); + assert_eq!(store.snapshot().published_jobs, 0); + assert_eq!(store.snapshot().failed_jobs, 0); } #[test] @@ -704,6 +742,14 @@ mod tests { existing.relay_outcome_summary, BRIDGE_PENDING_RECOVERY_SUMMARY ); + assert!(existing.is_terminal()); + assert!(existing.recovered_after_restart()); + + let snapshot = loaded.store.snapshot(); + assert_eq!(snapshot.accepted_jobs, 0); + assert_eq!(snapshot.published_jobs, 0); + assert_eq!(snapshot.failed_jobs, 1); + assert_eq!(snapshot.recovered_failed_jobs, 1); let payload = std::fs::read_to_string(&path).expect("persisted payload"); let persisted: PersistedBridgeJobStore = diff --git a/src/transport/jsonrpc/methods/bridge/job_status.rs b/src/transport/jsonrpc/methods/bridge/job_status.rs @@ -2,8 +2,8 @@ use anyhow::Result; use jsonrpsee::server::RpcModule; use serde::Deserialize; -use crate::core::bridge::store::BridgeJobRecord; use crate::transport::jsonrpc::auth::require_bridge_auth; +use crate::transport::jsonrpc::methods::bridge::shared::BridgeJobView; use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; #[derive(Debug, Deserialize)] @@ -26,7 +26,7 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res .bridge_jobs .get(job_id) .ok_or_else(|| RpcError::Other(format!("unknown bridge job: {job_id}"))) - .map(|job| -> BridgeJobRecord { job }) + .map(BridgeJobView::from) })?; Ok(()) } diff --git a/src/transport/jsonrpc/methods/bridge/listing_publish.rs b/src/transport/jsonrpc/methods/bridge/listing_publish.rs @@ -81,7 +81,7 @@ async fn publish_listing( crate::core::bridge::store::BridgeJobReservation::Duplicate(existing) => { return Ok(BridgePublishResponse { deduplicated: true, - job: existing, + job: existing.into(), }); } }; @@ -131,7 +131,7 @@ async fn publish_listing( Ok(BridgePublishResponse { deduplicated: false, - job, + job: job.into(), }) } diff --git a/src/transport/jsonrpc/methods/bridge/order_request.rs b/src/transport/jsonrpc/methods/bridge/order_request.rs @@ -107,7 +107,7 @@ async fn publish_order_request( crate::core::bridge::store::BridgeJobReservation::Duplicate(existing) => { return Ok(BridgePublishResponse { deduplicated: true, - job: existing, + job: existing.into(), }); } }; @@ -136,7 +136,7 @@ async fn publish_order_request( Ok(BridgePublishResponse { deduplicated: false, - job, + job: job.into(), }) } diff --git a/src/transport/jsonrpc/methods/bridge/shared.rs b/src/transport/jsonrpc/methods/bridge/shared.rs @@ -5,14 +5,77 @@ use radroots_nostr_signer::prelude::RadrootsNostrSignerBackend; use serde::Serialize; use sha2::{Digest, Sha256}; -use crate::core::bridge::store::{BridgeJobRecord, BridgeJobReservation, BridgeJobStoreError}; +use crate::core::bridge::publish::BridgeRelayPublishResult; +use crate::core::bridge::store::{ + BridgeJobRecord, BridgeJobReservation, BridgeJobStatus, BridgeJobStoreError, +}; use crate::transport::jsonrpc::nip46::{client as nip46_client, session as nip46_session}; use crate::transport::jsonrpc::{RpcContext, RpcError}; #[derive(Clone, Debug, Serialize)] pub(super) struct BridgePublishResponse { pub deduplicated: bool, - pub job: BridgeJobRecord, + pub job: BridgeJobView, +} + +#[derive(Clone, Debug, Serialize)] +pub(super) struct BridgeJobView { + pub job_id: String, + pub command: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub idempotency_key: Option<String>, + pub status: BridgeJobStatus, + pub terminal: bool, + pub recovered_after_restart: bool, + pub requested_at_unix: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub completed_at_unix: Option<u64>, + pub signer_mode: String, + pub event_kind: u32, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub event_id: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub event_addr: Option<String>, + pub delivery_policy: crate::app::config::BridgeDeliveryPolicy, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub delivery_quorum: Option<usize>, + pub relay_count: usize, + pub acknowledged_relay_count: usize, + pub required_acknowledged_relay_count: usize, + pub attempt_count: usize, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub attempt_summaries: Vec<String>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub relay_results: Vec<BridgeRelayPublishResult>, + pub relay_outcome_summary: String, +} + +impl From<BridgeJobRecord> for BridgeJobView { + fn from(record: BridgeJobRecord) -> Self { + Self { + terminal: record.is_terminal(), + recovered_after_restart: record.recovered_after_restart(), + job_id: record.job_id, + command: record.command, + idempotency_key: record.idempotency_key, + status: record.status, + requested_at_unix: record.requested_at_unix, + completed_at_unix: record.completed_at_unix, + signer_mode: record.signer_mode, + event_kind: record.event_kind, + event_id: record.event_id, + event_addr: record.event_addr, + delivery_policy: record.delivery_policy, + delivery_quorum: record.delivery_quorum, + relay_count: record.relay_count, + acknowledged_relay_count: record.acknowledged_relay_count, + required_acknowledged_relay_count: record.required_acknowledged_relay_count, + attempt_count: record.attempt_count, + attempt_summaries: record.attempt_summaries, + relay_results: record.relay_results, + relay_outcome_summary: record.relay_outcome_summary, + } + } } pub(super) fn ensure_bridge_enabled(ctx: &RpcContext) -> Result<(), RpcError> { @@ -159,12 +222,17 @@ mod tests { use radroots_identity::RadrootsIdentity; use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys, RadrootsNostrMetadata}; - use crate::app::config::{BridgeConfig, Nip46Config}; + use crate::app::config::{BridgeConfig, BridgeDeliveryPolicy, Nip46Config}; use crate::core::Radrootsd; + use crate::core::bridge::store::{ + BRIDGE_PENDING_RECOVERY_SUMMARY, BridgeJobStatus, new_listing_publish_job, + }; use crate::core::nip46::session::Nip46Session; use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; - use super::{fingerprint_bridge_request, normalize_idempotency_key, resolve_bridge_signer}; + use super::{ + BridgeJobView, fingerprint_bridge_request, normalize_idempotency_key, resolve_bridge_signer, + }; use std::time::Instant; #[test] @@ -239,4 +307,24 @@ mod tests { .expect("second"); assert_ne!(first, second); } + + #[test] + fn bridge_job_view_exposes_terminal_and_recovery_flags() { + let mut job = new_listing_publish_job( + "job-1".to_string(), + Some("same".to_string()), + "embedded_service_identity".to_string(), + 30402, + Some("event-1".to_string()), + "30402:author:listing".to_string(), + BridgeDeliveryPolicy::Any, + None, + ); + job.status = BridgeJobStatus::Failed; + job.completed_at_unix = Some(1); + job.relay_outcome_summary = BRIDGE_PENDING_RECOVERY_SUMMARY.to_string(); + let view = BridgeJobView::from(job); + assert!(view.terminal); + assert!(view.recovered_after_restart); + } } diff --git a/src/transport/jsonrpc/methods/bridge/status.rs b/src/transport/jsonrpc/methods/bridge/status.rs @@ -22,6 +22,10 @@ struct BridgeStatusResponse { job_status_retention: usize, retained_jobs: usize, retained_idempotency_keys: usize, + accepted_jobs: usize, + published_jobs: usize, + failed_jobs: usize, + recovered_failed_jobs: usize, methods: Vec<String>, } @@ -45,6 +49,10 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res job_status_retention: ctx.state.bridge_config.job_status_retention, retained_jobs: snapshot.retained_jobs, retained_idempotency_keys: snapshot.retained_idempotency_keys, + accepted_jobs: snapshot.accepted_jobs, + published_jobs: snapshot.published_jobs, + failed_jobs: snapshot.failed_jobs, + recovered_failed_jobs: snapshot.recovered_failed_jobs, methods: ctx.methods.list(), }) })?; diff --git a/src/transport/jsonrpc/methods/mod.rs b/src/transport/jsonrpc/methods/mod.rs @@ -103,5 +103,9 @@ mod tests { .await .expect("request"); assert!(response.get().contains("\"auth_mode\":\"bearer_token\"")); + assert!(response.get().contains("\"accepted_jobs\":0")); + assert!(response.get().contains("\"published_jobs\":0")); + assert!(response.get().contains("\"failed_jobs\":0")); + assert!(response.get().contains("\"recovered_failed_jobs\":0")); } }