commit 3d2363b31982d0b548fe9124ec2f9b14e04fdcf3
parent 5889b8d30cea8a4756f9eef882a9b8c4c82d3306
Author: triesap <tyson@radroots.org>
Date: Tue, 23 Jun 2026 21:09:34 +0000
publish-proxy: recover interrupted jobs
- recover persisted publishing jobs on store open
- mark interrupted attempts retryable with explicit last_error
- preserve existing job rows without daemon background retry
- add file-backed store recovery coverage
Diffstat:
1 file changed, 71 insertions(+), 0 deletions(-)
diff --git a/src/core/publish_proxy/mod.rs b/src/core/publish_proxy/mod.rs
@@ -709,6 +709,7 @@ impl PublishProxyStore {
"#,
)?;
migrate_schema(&connection)?;
+ recover_interrupted_publish_jobs(&connection)?;
connection.pragma_update(None, "user_version", SCHEMA_VERSION)?;
Ok(Self {
inner: Arc::new(Mutex::new(connection)),
@@ -1174,6 +1175,28 @@ fn migrate_schema(connection: &Connection) -> Result<(), PublishProxyError> {
Ok(())
}
+fn recover_interrupted_publish_jobs(connection: &Connection) -> Result<(), PublishProxyError> {
+ let now = current_unix_millis();
+ connection.execute(
+ r#"
+ UPDATE publish_proxy_jobs
+ SET status = ?1,
+ updated_at_ms = ?2,
+ completed_at_ms = ?3,
+ last_error = ?4
+ WHERE status = ?5
+ "#,
+ params![
+ serde_json::to_string(&PublishJobStatus::DeliveryUnsatisfiedRetryable)?,
+ now,
+ now,
+ "publish_attempt_interrupted",
+ serde_json::to_string(&PublishJobStatus::Publishing)?,
+ ],
+ )?;
+ Ok(())
+}
+
fn table_has_column(
connection: &Connection,
table: &str,
@@ -1948,6 +1971,54 @@ mod tests {
);
}
+ #[test]
+ fn store_open_recovers_interrupted_publishing_jobs() {
+ let directory = tempfile::tempdir().expect("tempdir");
+ let database_path = directory.path().join("publish-proxy.sqlite");
+ let token_hash = hash_bearer_token(generate_bearer_token().as_str());
+ let pubkey = "a".repeat(64);
+ let request = request(pubkey.as_str(), 30_402);
+ let job_id = {
+ let store = PublishProxyStore::open(database_path.clone()).expect("store");
+ let principal = store
+ .create_principal(PublishPrincipalInit {
+ label: "tester".to_owned(),
+ token_hash,
+ allowed_pubkeys: vec![pubkey],
+ allowed_kinds: vec![30_402],
+ allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly],
+ allow_request_relays: false,
+ job_visibility: PublishJobVisibility::Own,
+ expires_at_unix: None,
+ })
+ .expect("principal");
+ let response = store
+ .record_publish_job(PublishJobInsert {
+ principal_id: principal.principal_id,
+ idempotency_key: Some("idem-interrupted".to_owned()),
+ request,
+ request_fingerprint: "fingerprint-interrupted".to_owned(),
+ effective_relay_count: 1,
+ })
+ .expect("record job");
+ assert_eq!(response.job.status, PublishJobStatus::Publishing);
+ response.job.job_id
+ };
+
+ let reopened = PublishProxyStore::open(database_path).expect("reopen store");
+ let recovered = reopened.job_by_id(job_id.as_str()).expect("recovered job");
+ assert_eq!(
+ recovered.status,
+ PublishJobStatus::DeliveryUnsatisfiedRetryable
+ );
+ assert_eq!(
+ recovered.last_error.as_deref(),
+ Some("publish_attempt_interrupted")
+ );
+ assert!(recovered.completed_at_ms.is_some());
+ assert!(recovered.relays.is_empty());
+ }
+
#[tokio::test]
async fn publish_event_verifies_and_records_daemon_default_outcome() {
let identity = RadrootsIdentity::generate();