commit ad5f2171ddefeaa169c252a668edeaa07bcfc556
parent aef910bf5391ed04fe9f660f54d68ad33f3ec3b0
Author: triesap <tyson@radroots.org>
Date: Sat, 28 Mar 2026 22:13:08 +0000
bridge: recover accepted jobs on restart
Diffstat:
2 files changed, 88 insertions(+), 7 deletions(-)
diff --git a/src/core/bridge/store.rs b/src/core/bridge/store.rs
@@ -9,6 +9,8 @@ use crate::app::config::BridgeDeliveryPolicy;
use crate::core::bridge::publish::{BridgePublishExecution, BridgeRelayPublishResult};
const BRIDGE_JOB_STORE_VERSION: u32 = 2;
+const BRIDGE_PENDING_RECOVERY_SUMMARY: &str =
+ "bridge publish did not complete before process restart";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
@@ -55,7 +57,13 @@ pub struct BridgeJobStoreSnapshot {
pub capacity: usize,
}
-#[derive(Clone)]
+#[derive(Debug, Clone)]
+pub struct BridgeJobStoreLoadOutcome {
+ pub store: BridgeJobStore,
+ pub recovered_jobs: Vec<BridgeJobRecord>,
+}
+
+#[derive(Debug, Clone)]
pub struct BridgeJobStore {
inner: Arc<RwLock<BridgeJobStoreInner>>,
persistence: Option<Arc<BridgeJobStorePersistence>>,
@@ -124,12 +132,20 @@ impl BridgeJobStore {
}
}
- pub fn load(path: PathBuf, capacity: usize) -> Result<Self, BridgeJobStoreError> {
+ pub fn load(
+ path: PathBuf,
+ capacity: usize,
+ ) -> Result<BridgeJobStoreLoadOutcome, BridgeJobStoreError> {
let persistence = Arc::new(BridgeJobStorePersistence::new(path));
let inner = persistence.load(capacity)?;
- Ok(Self {
+ let store = Self {
inner: Arc::new(RwLock::new(inner)),
persistence: Some(persistence),
+ };
+ let recovered_jobs = store.recover_pending_jobs()?;
+ Ok(BridgeJobStoreLoadOutcome {
+ store,
+ recovered_jobs,
})
}
@@ -223,6 +239,37 @@ impl BridgeJobStore {
}
}
+ fn recover_pending_jobs(&self) -> Result<Vec<BridgeJobRecord>, BridgeJobStoreError> {
+ let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner());
+ let mut recovered_jobs = Vec::new();
+ let completed_at_unix = unix_timestamp_now();
+
+ for record in inner.jobs.values_mut() {
+ if record.status != BridgeJobStatus::Accepted {
+ continue;
+ }
+ record.status = BridgeJobStatus::Failed;
+ record.completed_at_unix = Some(completed_at_unix);
+ record.relay_count = 0;
+ record.acknowledged_relay_count = 0;
+ record.required_acknowledged_relay_count = 0;
+ record.attempt_count = 0;
+ record.relay_results.clear();
+ record.attempt_summaries = vec![BRIDGE_PENDING_RECOVERY_SUMMARY.to_string()];
+ record.relay_outcome_summary = BRIDGE_PENDING_RECOVERY_SUMMARY.to_string();
+ recovered_jobs.push(record.clone());
+ }
+
+ if recovered_jobs.is_empty() {
+ return Ok(recovered_jobs);
+ }
+
+ let persisted = persisted_store_from_inner(&inner);
+ drop(inner);
+ self.persist_snapshot(&persisted)?;
+ Ok(recovered_jobs)
+ }
+
fn persist_snapshot(
&self,
snapshot: &PersistedBridgeJobStore,
@@ -413,8 +460,8 @@ mod tests {
use crate::core::bridge::publish::BridgePublishExecution;
use super::{
- BridgeJobReservation, BridgeJobStatus, BridgeJobStore, PersistedBridgeJobStore,
- new_listing_publish_job, new_order_request_job,
+ BRIDGE_PENDING_RECOVERY_SUMMARY, BridgeJobReservation, BridgeJobStatus, BridgeJobStore,
+ PersistedBridgeJobStore, new_listing_publish_job, new_order_request_job,
};
#[test]
@@ -597,13 +644,15 @@ mod tests {
}
#[test]
- fn load_recovers_persisted_jobs_and_idempotency() {
+ fn load_terminalizes_persisted_accepted_jobs_and_preserves_idempotency() {
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("time")
.as_nanos();
let path = std::env::temp_dir().join(format!("radrootsd-bridge-jobs-{nanos}.json"));
- let store = BridgeJobStore::load(path.clone(), 8).expect("load empty store");
+ let store = BridgeJobStore::load(path.clone(), 8)
+ .expect("load empty store")
+ .store;
let first = new_listing_publish_job(
"job-1".to_string(),
Some("same".to_string()),
@@ -622,6 +671,14 @@ mod tests {
));
let loaded = BridgeJobStore::load(path.clone(), 8).expect("reload store");
+ assert_eq!(loaded.recovered_jobs.len(), 1);
+ assert_eq!(loaded.recovered_jobs[0].job_id, "job-1");
+ assert_eq!(loaded.recovered_jobs[0].status, BridgeJobStatus::Failed);
+ assert_eq!(
+ loaded.recovered_jobs[0].relay_outcome_summary,
+ BRIDGE_PENDING_RECOVERY_SUMMARY
+ );
+
let duplicate = new_listing_publish_job(
"job-2".to_string(),
Some("same".to_string()),
@@ -633,6 +690,7 @@ mod tests {
None,
);
let existing = match loaded
+ .store
.reserve(duplicate, "fingerprint-1".to_string())
.expect("dedupe after reload")
{
@@ -640,11 +698,25 @@ mod tests {
BridgeJobReservation::Accepted(_) => panic!("expected duplicate reservation"),
};
assert_eq!(existing.job_id, "job-1");
+ assert_eq!(existing.status, BridgeJobStatus::Failed);
+ assert!(existing.completed_at_unix.is_some());
+ assert_eq!(
+ existing.relay_outcome_summary,
+ BRIDGE_PENDING_RECOVERY_SUMMARY
+ );
let payload = std::fs::read_to_string(&path).expect("persisted payload");
let persisted: PersistedBridgeJobStore =
serde_json::from_str(&payload).expect("persisted store");
assert_eq!(persisted.version, 2);
+ assert_eq!(
+ persisted
+ .jobs
+ .get("job-1")
+ .expect("persisted recovered job")
+ .status,
+ BridgeJobStatus::Failed
+ );
let _ = std::fs::remove_file(path);
}
diff --git a/src/core/state.rs b/src/core/state.rs
@@ -41,9 +41,18 @@ impl Radrootsd {
bridge_config.state_path.clone(),
bridge_config.job_status_retention,
)?;
+ #[cfg(not(test))]
+ if !bridge_jobs.recovered_jobs.is_empty() {
+ tracing::warn!(
+ recovered_bridge_jobs = bridge_jobs.recovered_jobs.len(),
+ "terminalized bridge jobs left accepted across restart"
+ );
+ }
#[cfg(test)]
let bridge_jobs =
crate::core::bridge::store::BridgeJobStore::new(bridge_config.job_status_retention);
+ #[cfg(not(test))]
+ let bridge_jobs = bridge_jobs.store;
let nip46_sessions = crate::core::nip46::session::Nip46SessionStore::new();
Ok(Self {