commit 38d9edd7350f48dace0ce2f4f3987c15f59b43fa
parent 99cfeaf1fbb129d26a866e75eef4d5733c8dad6b
Author: triesap <tyson@radroots.org>
Date: Thu, 26 Mar 2026 21:52:06 +0000
runtime: add startup delivery recovery
Diffstat:
| M | src/app/runtime.rs | | | 760 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- |
| M | tests/nip46_e2e.rs | | | 93 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- |
2 files changed, 846 insertions(+), 7 deletions(-)
diff --git a/src/app/runtime.rs b/src/app/runtime.rs
@@ -4,23 +4,34 @@ use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use crate::audit::{MycJsonlOperationAuditStore, MycOperationAuditRecord, MycOperationAuditStore};
+use crate::audit::{
+ MycJsonlOperationAuditStore, MycOperationAuditKind, MycOperationAuditOutcome,
+ MycOperationAuditRecord, MycOperationAuditStore,
+};
use crate::audit_sqlite::MycSqliteOperationAuditStore;
use crate::config::{
MycAuditConfig, MycConfig, MycIdentitySourceSpec, MycPersistenceConfig, MycRuntimeAuditBackend,
- MycSignerStateBackend,
+ MycSignerStateBackend, MycTransportDeliveryPolicy,
};
use crate::custody::MycIdentityProvider;
+use crate::discovery::MycDiscoveryContext;
use crate::error::MycError;
use crate::operability::server::run_observability_server;
-use crate::outbox::MycDeliveryOutboxStore;
+use crate::outbox::{
+ MycDeliveryOutboxKind, MycDeliveryOutboxRecord, MycDeliveryOutboxStatus, MycDeliveryOutboxStore,
+};
use crate::outbox_sqlite::MycSqliteDeliveryOutboxStore;
use crate::policy::MycPolicyContext;
-use crate::transport::{MycNip46Service, MycNostrTransport, MycTransportSnapshot};
+use crate::transport::{
+ MycNip46Service, MycNostrTransport, MycPublishOutcome, MycTransportSnapshot,
+};
use radroots_identity::{RadrootsIdentity, RadrootsIdentityPublic};
use radroots_nostr_signer::prelude::{
RadrootsNostrFileSignerStore, RadrootsNostrSignerApprovalRequirement,
- RadrootsNostrSignerManager, RadrootsNostrSignerStore, RadrootsNostrSqliteSignerStore,
+ RadrootsNostrSignerAuthState, RadrootsNostrSignerConnectionRecord, RadrootsNostrSignerManager,
+ RadrootsNostrSignerPublishWorkflowKind, RadrootsNostrSignerPublishWorkflowRecord,
+ RadrootsNostrSignerPublishWorkflowState, RadrootsNostrSignerStore,
+ RadrootsNostrSqliteSignerStore,
};
use serde::Serialize;
@@ -217,6 +228,7 @@ impl MycRuntime {
transport_connect_timeout_secs = snapshot.transport.connect_timeout_secs,
"myc runtime bootstrapped"
);
+ self.recover_pending_delivery_jobs().await?;
let mut tasks = tokio::task::JoinSet::new();
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
if let Some(transport) = self.transport.clone() {
@@ -270,6 +282,520 @@ impl MycRuntime {
})?;
Ok(())
}
+
+ async fn recover_pending_delivery_jobs(&self) -> Result<(), MycError> {
+ let mut queued_records = self
+ .delivery_outbox_store
+ .list_by_status(MycDeliveryOutboxStatus::Queued)?;
+ let published_records = self
+ .delivery_outbox_store
+ .list_by_status(MycDeliveryOutboxStatus::PublishedPendingFinalize)?;
+ if queued_records.is_empty() && published_records.is_empty() {
+ self.ensure_no_orphaned_publish_workflows()?;
+ return Ok(());
+ }
+
+ queued_records.extend(published_records);
+ queued_records.sort_by(|left, right| {
+ left.created_at_unix
+ .cmp(&right.created_at_unix)
+ .then_with(|| left.job_id.as_str().cmp(right.job_id.as_str()))
+ });
+
+ tracing::info!(
+ unfinished_delivery_job_count = queued_records.len(),
+ "starting myc delivery recovery"
+ );
+
+ let manager = self.signer_manager()?;
+ for record in queued_records {
+ self.recover_delivery_outbox_record(&manager, record)
+ .await?;
+ }
+ self.ensure_no_orphaned_publish_workflows()?;
+
+ tracing::info!("completed myc delivery recovery");
+ Ok(())
+ }
+
+ fn ensure_no_orphaned_publish_workflows(&self) -> Result<(), MycError> {
+ let workflows = self.signer_manager()?.list_publish_workflows()?;
+ if workflows.is_empty() {
+ return Ok(());
+ }
+
+ let remaining = workflows
+ .into_iter()
+ .map(|workflow| {
+ format!(
+ "{}:{}:{:?}",
+ workflow.workflow_id, workflow.connection_id, workflow.kind
+ )
+ })
+ .collect::<Vec<_>>()
+ .join(", ");
+ Err(MycError::InvalidOperation(format!(
+ "startup recovery found orphaned signer publish workflows with no recoverable outbox job: {remaining}"
+ )))
+ }
+
+ async fn recover_delivery_outbox_record(
+ &self,
+ manager: &RadrootsNostrSignerManager,
+ record: MycDeliveryOutboxRecord,
+ ) -> Result<(), MycError> {
+ self.validate_outbox_workflow_expectations(&record)?;
+ let workflow = self.lookup_publish_workflow_for_record(manager, &record)?;
+ tracing::info!(
+ job_id = %record.job_id,
+ kind = ?record.kind,
+ status = ?record.status,
+ request_id = record.request_id.as_deref().unwrap_or(""),
+ attempt_id = record.attempt_id.as_deref().unwrap_or(""),
+ signer_publish_workflow_id = record
+ .signer_publish_workflow_id
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_default(),
+ "recovering myc delivery outbox job"
+ );
+
+ match record.status {
+ MycDeliveryOutboxStatus::Queued => {
+ if record.signer_publish_workflow_id.is_some() && workflow.is_none() {
+ return Err(self.wrap_recovery_error(
+ &record,
+ MycError::InvalidOperation(
+ "delivery outbox job references a missing signer publish workflow before startup recovery publish"
+ .to_owned(),
+ ),
+ ));
+ }
+ if matches!(
+ workflow.as_ref().map(|workflow| workflow.state),
+ Some(RadrootsNostrSignerPublishWorkflowState::PublishedPendingFinalize)
+ ) {
+ let publish_attempt_count = record.publish_attempt_count.max(1);
+ let published = self
+ .delivery_outbox_store
+ .mark_published_pending_finalize(&record.job_id, publish_attempt_count)?;
+ return self.finalize_recovered_delivery_job(
+ manager,
+ published,
+ workflow.as_ref(),
+ None,
+ );
+ }
+
+ let publish_outcome = self
+ .republish_recovered_outbox_event(&record)
+ .await
+ .map_err(|error| self.wrap_recovery_error(&record, error))?;
+ if let Some(workflow) = workflow.as_ref() {
+ manager
+ .mark_publish_workflow_published(&workflow.workflow_id)
+ .map_err(|error| {
+ self.wrap_recovery_error(
+ &record,
+ MycError::InvalidOperation(format!(
+ "failed to mark signer publish workflow as published during startup recovery: {error}"
+ )),
+ )
+ })?;
+ }
+ let published_workflow = match record.signer_publish_workflow_id.as_ref() {
+ Some(workflow_id) => Some(
+ manager
+ .get_publish_workflow(workflow_id)
+ .map_err(MycError::from)
+ .and_then(|workflow| {
+ workflow.ok_or_else(|| {
+ MycError::InvalidOperation(format!(
+ "signer publish workflow `{workflow_id}` disappeared after startup recovery publish confirmation"
+ ))
+ })
+ })
+ .map_err(|error| self.wrap_recovery_error(&record, error))?,
+ ),
+ None => None,
+ };
+ let published = self
+ .delivery_outbox_store
+ .mark_published_pending_finalize(&record.job_id, publish_outcome.attempt_count)
+ .map_err(|error| self.wrap_recovery_error(&record, error))?;
+ self.finalize_recovered_delivery_job(
+ manager,
+ published,
+ published_workflow.as_ref(),
+ Some(&publish_outcome),
+ )
+ }
+ MycDeliveryOutboxStatus::PublishedPendingFinalize => {
+ self.finalize_recovered_delivery_job(manager, record, workflow.as_ref(), None)
+ }
+ MycDeliveryOutboxStatus::Finalized | MycDeliveryOutboxStatus::Failed => Ok(()),
+ }
+ }
+
+ fn finalize_recovered_delivery_job(
+ &self,
+ manager: &RadrootsNostrSignerManager,
+ record: MycDeliveryOutboxRecord,
+ workflow: Option<&RadrootsNostrSignerPublishWorkflowRecord>,
+ publish_outcome: Option<&MycPublishOutcome>,
+ ) -> Result<(), MycError> {
+ if let Some(workflow) = workflow {
+ if workflow.state != RadrootsNostrSignerPublishWorkflowState::PublishedPendingFinalize {
+ return Err(self.wrap_recovery_error(
+ &record,
+ MycError::InvalidOperation(format!(
+ "signer publish workflow `{}` is in `{}` instead of `published_pending_finalize` during startup recovery",
+ workflow.workflow_id,
+ format!("{:?}", workflow.state)
+ )),
+ ));
+ }
+ manager
+ .finalize_publish_workflow(&workflow.workflow_id)
+ .map_err(|error| {
+ self.wrap_recovery_error(
+ &record,
+ MycError::InvalidOperation(format!(
+ "failed to finalize signer publish workflow during startup recovery: {error}"
+ )),
+ )
+ })?;
+ } else {
+ self.ensure_record_is_already_finalized_without_workflow(manager, &record)?;
+ }
+
+ let finalized_record = self
+ .delivery_outbox_store
+ .mark_finalized(&record.job_id)
+ .map_err(|error| self.wrap_recovery_error(&record, error))?;
+ self.record_recovery_success(&finalized_record, publish_outcome);
+ Ok(())
+ }
+
+ fn ensure_record_is_already_finalized_without_workflow(
+ &self,
+ manager: &RadrootsNostrSignerManager,
+ record: &MycDeliveryOutboxRecord,
+ ) -> Result<(), MycError> {
+ let Some(workflow_id) = record.signer_publish_workflow_id.as_ref() else {
+ return Ok(());
+ };
+
+ match record.kind {
+ MycDeliveryOutboxKind::ListenerResponsePublish
+ | MycDeliveryOutboxKind::ConnectAcceptPublish => {
+ let connection = self.recovery_connection_record(manager, record)?;
+ if !connection.connect_secret_is_consumed() {
+ return Err(self.wrap_recovery_error(
+ record,
+ MycError::InvalidOperation(format!(
+ "delivery outbox job `{}` references consumed-secret workflow `{workflow_id}` but the connection secret is still reusable",
+ record.job_id
+ )),
+ ));
+ }
+ }
+ MycDeliveryOutboxKind::AuthReplayPublish => {
+ let connection = self.recovery_connection_record(manager, record)?;
+ if connection.auth_state != RadrootsNostrSignerAuthState::Authorized
+ || connection.pending_request.is_some()
+ {
+ return Err(self.wrap_recovery_error(
+ record,
+ MycError::InvalidOperation(format!(
+ "delivery outbox job `{}` references auth replay workflow `{workflow_id}` but the connection auth state is not finalized",
+ record.job_id
+ )),
+ ));
+ }
+ }
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish => {
+ return Err(self.wrap_recovery_error(
+ record,
+ MycError::InvalidOperation(format!(
+ "discovery delivery outbox job `{}` unexpectedly references signer workflow `{workflow_id}`",
+ record.job_id
+ )),
+ ));
+ }
+ }
+
+ Ok(())
+ }
+
+ fn recovery_connection_record(
+ &self,
+ manager: &RadrootsNostrSignerManager,
+ record: &MycDeliveryOutboxRecord,
+ ) -> Result<RadrootsNostrSignerConnectionRecord, MycError> {
+ let connection_id = record.connection_id.as_ref().ok_or_else(|| {
+ self.wrap_recovery_error(
+ record,
+ MycError::InvalidOperation(
+ "delivery outbox job is missing a connection id required for recovery"
+ .to_owned(),
+ ),
+ )
+ })?;
+ manager.get_connection(connection_id)?.ok_or_else(|| {
+ self.wrap_recovery_error(
+ record,
+ MycError::InvalidOperation(format!(
+ "delivery outbox job references missing connection `{connection_id}`"
+ )),
+ )
+ })
+ }
+
+ fn validate_outbox_workflow_expectations(
+ &self,
+ record: &MycDeliveryOutboxRecord,
+ ) -> Result<(), MycError> {
+ match record.kind {
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish => {
+ if record.signer_publish_workflow_id.is_some() {
+ return Err(self.wrap_recovery_error(
+ record,
+ MycError::InvalidOperation(
+ "discovery delivery outbox jobs must not reference signer publish workflows"
+ .to_owned(),
+ ),
+ ));
+ }
+ }
+ MycDeliveryOutboxKind::ConnectAcceptPublish
+ | MycDeliveryOutboxKind::AuthReplayPublish => {
+ if record.signer_publish_workflow_id.is_none() {
+ return Err(self.wrap_recovery_error(
+ record,
+ MycError::InvalidOperation(
+ "control delivery outbox jobs must reference signer publish workflows"
+ .to_owned(),
+ ),
+ ));
+ }
+ }
+ MycDeliveryOutboxKind::ListenerResponsePublish => {}
+ }
+ Ok(())
+ }
+
+ fn lookup_publish_workflow_for_record(
+ &self,
+ manager: &RadrootsNostrSignerManager,
+ record: &MycDeliveryOutboxRecord,
+ ) -> Result<Option<RadrootsNostrSignerPublishWorkflowRecord>, MycError> {
+ let Some(workflow_id) = record.signer_publish_workflow_id.as_ref() else {
+ return Ok(None);
+ };
+ let workflow = manager.get_publish_workflow(workflow_id)?.map(|workflow| {
+ let kind_label = match record.kind {
+ MycDeliveryOutboxKind::ListenerResponsePublish
+ | MycDeliveryOutboxKind::ConnectAcceptPublish => {
+ RadrootsNostrSignerPublishWorkflowKind::ConnectSecretFinalization
+ }
+ MycDeliveryOutboxKind::AuthReplayPublish => {
+ RadrootsNostrSignerPublishWorkflowKind::AuthReplayFinalization
+ }
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish => unreachable!(),
+ };
+ if workflow.kind != kind_label {
+ return Err(self.wrap_recovery_error(
+ record,
+ MycError::InvalidOperation(format!(
+ "delivery outbox job `{}` expects signer workflow kind `{}` but found `{}`",
+ record.job_id,
+ format!("{kind_label:?}"),
+ format!("{:?}", workflow.kind),
+ )),
+ ));
+ }
+ if let Some(connection_id) = record.connection_id.as_ref() {
+ if &workflow.connection_id != connection_id {
+ return Err(self.wrap_recovery_error(
+ record,
+ MycError::InvalidOperation(format!(
+ "delivery outbox job `{}` connection `{connection_id}` does not match signer workflow connection `{}`",
+ record.job_id, workflow.connection_id
+ )),
+ ));
+ }
+ }
+ Ok(workflow)
+ });
+ workflow.transpose()
+ }
+
+ async fn republish_recovered_outbox_event(
+ &self,
+ record: &MycDeliveryOutboxRecord,
+ ) -> Result<MycPublishOutcome, MycError> {
+ let signer_identity = self.recovery_publisher_identity(record)?;
+ MycNostrTransport::publish_event_once(
+ &signer_identity,
+ &record.relay_urls,
+ &self.config.transport,
+ recovery_operation_label(record.kind),
+ &record.event,
+ )
+ .await
+ }
+
+ fn recovery_publisher_identity(
+ &self,
+ record: &MycDeliveryOutboxRecord,
+ ) -> Result<RadrootsIdentity, MycError> {
+ if record.kind != MycDeliveryOutboxKind::DiscoveryHandlerPublish {
+ return Ok(self.signer_identity().clone());
+ }
+ if record.event.pubkey == self.signer_identity().public_key() {
+ return Ok(self.signer_identity().clone());
+ }
+
+ let context = MycDiscoveryContext::from_runtime(self)?;
+ if record.event.pubkey != context.app_identity().public_key() {
+ return Err(self.wrap_recovery_error(
+ record,
+ MycError::InvalidOperation(format!(
+ "discovery delivery outbox job author `{}` does not match the configured signer or discovery app identity",
+ record.event.pubkey
+ )),
+ ));
+ }
+ Ok(context.app_identity().clone())
+ }
+
+ fn record_recovery_success(
+ &self,
+ outbox_record: &MycDeliveryOutboxRecord,
+ publish_outcome: Option<&MycPublishOutcome>,
+ ) {
+ let (relay_count, acknowledged_relay_count, summary, mut audit_record) =
+ match publish_outcome {
+ Some(publish_outcome) => (
+ publish_outcome.relay_count,
+ publish_outcome.acknowledged_relay_count,
+ publish_outcome.relay_outcome_summary.clone(),
+ MycOperationAuditRecord::new(
+ recovery_operation_audit_kind(outbox_record.kind),
+ MycOperationAuditOutcome::Succeeded,
+ outbox_record.connection_id.as_ref(),
+ outbox_record.request_id.as_deref(),
+ publish_outcome.relay_count,
+ publish_outcome.acknowledged_relay_count,
+ publish_outcome.relay_outcome_summary.clone(),
+ )
+ .with_delivery_details(
+ publish_outcome.delivery_policy,
+ publish_outcome.required_acknowledged_relay_count,
+ publish_outcome.attempt_count,
+ ),
+ ),
+ None => {
+ let relay_count = outbox_record.relay_urls.len();
+ let required_acknowledged_relay_count = self
+ .required_acknowledged_relay_count(relay_count)
+ .unwrap_or_default();
+ let summary = "startup recovery finalized previously published delivery job";
+ (
+ relay_count,
+ required_acknowledged_relay_count,
+ summary.to_owned(),
+ MycOperationAuditRecord::new(
+ recovery_operation_audit_kind(outbox_record.kind),
+ MycOperationAuditOutcome::Succeeded,
+ outbox_record.connection_id.as_ref(),
+ outbox_record.request_id.as_deref(),
+ relay_count,
+ required_acknowledged_relay_count,
+ summary.to_owned(),
+ )
+ .with_delivery_details(
+ self.config.transport.delivery_policy,
+ required_acknowledged_relay_count,
+ outbox_record.publish_attempt_count.max(1),
+ ),
+ )
+ }
+ };
+ if let Some(attempt_id) = outbox_record.attempt_id.as_deref() {
+ audit_record = audit_record.with_attempt_id(attempt_id);
+ }
+ tracing::info!(
+ job_id = %outbox_record.job_id,
+ kind = ?outbox_record.kind,
+ relay_count,
+ acknowledged_relay_count,
+ summary = %summary,
+ "recovered myc delivery outbox job"
+ );
+ self.record_operation_audit(&audit_record);
+ }
+
+ fn required_acknowledged_relay_count(&self, relay_count: usize) -> Result<usize, MycError> {
+ match self.config.transport.delivery_policy {
+ MycTransportDeliveryPolicy::Any => Ok(1),
+ MycTransportDeliveryPolicy::All => Ok(relay_count),
+ MycTransportDeliveryPolicy::Quorum => {
+ let delivery_quorum = self.config.transport.delivery_quorum.ok_or_else(|| {
+ MycError::InvalidOperation(
+ "transport.delivery_quorum must be set when transport.delivery_policy is `quorum`"
+ .to_owned(),
+ )
+ })?;
+ if delivery_quorum > relay_count {
+ return Err(MycError::InvalidOperation(format!(
+ "transport.delivery_quorum `{delivery_quorum}` cannot be satisfied by `{relay_count}` target relays"
+ )));
+ }
+ Ok(delivery_quorum)
+ }
+ }
+ }
+
+ fn wrap_recovery_error(&self, record: &MycDeliveryOutboxRecord, error: MycError) -> MycError {
+ let wrapped = MycError::InvalidOperation(format!(
+ "startup recovery failed for delivery outbox job `{}` ({:?}): {error}",
+ record.job_id, record.kind
+ ));
+ tracing::error!(
+ job_id = %record.job_id,
+ kind = ?record.kind,
+ status = ?record.status,
+ request_id = record.request_id.as_deref().unwrap_or(""),
+ attempt_id = record.attempt_id.as_deref().unwrap_or(""),
+ error = %wrapped,
+ "myc startup delivery recovery failed"
+ );
+ wrapped
+ }
+}
+
+fn recovery_operation_label(kind: MycDeliveryOutboxKind) -> &'static str {
+ match kind {
+ MycDeliveryOutboxKind::ListenerResponsePublish => "listener response recovery publish",
+ MycDeliveryOutboxKind::ConnectAcceptPublish => "connect accept recovery publish",
+ MycDeliveryOutboxKind::AuthReplayPublish => "auth replay recovery publish",
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish => "discovery handler recovery publish",
+ }
+}
+
+fn recovery_operation_audit_kind(kind: MycDeliveryOutboxKind) -> MycOperationAuditKind {
+ match kind {
+ MycDeliveryOutboxKind::ListenerResponsePublish => {
+ MycOperationAuditKind::ListenerResponsePublish
+ }
+ MycDeliveryOutboxKind::ConnectAcceptPublish => MycOperationAuditKind::ConnectAcceptPublish,
+ MycDeliveryOutboxKind::AuthReplayPublish => MycOperationAuditKind::AuthReplayPublish,
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish => {
+ MycOperationAuditKind::DiscoveryHandlerPublish
+ }
+ }
}
async fn drain_runtime_tasks(
@@ -563,14 +1089,18 @@ mod tests {
use std::sync::Arc;
use radroots_identity::RadrootsIdentity;
+ use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKind};
use radroots_nostr_signer::prelude::{
- RadrootsNostrFileSignerStore, RadrootsNostrSignerManager, RadrootsNostrSqliteSignerStore,
+ RadrootsNostrFileSignerStore, RadrootsNostrSignerApprovalRequirement,
+ RadrootsNostrSignerConnectionDraft, RadrootsNostrSignerManager,
+ RadrootsNostrSqliteSignerStore,
};
use super::MycRuntime;
use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord};
use crate::config::{MycConfig, MycRuntimeAuditBackend, MycSignerStateBackend};
use crate::error::MycError;
+ use crate::outbox::{MycDeliveryOutboxKind, MycDeliveryOutboxRecord, MycDeliveryOutboxStatus};
fn write_test_identity(path: &std::path::Path, secret_key: &str) {
RadrootsIdentity::from_secret_key_str(secret_key)
@@ -863,4 +1393,222 @@ mod tests {
);
assert!(runtime.paths().delivery_outbox_path.is_file());
}
+
+ #[tokio::test]
+ async fn startup_recovery_rejects_orphaned_signer_publish_workflow() {
+ let temp = tempfile::tempdir().expect("tempdir");
+ let mut config = MycConfig::default();
+ config.paths.state_dir = temp.path().join("state");
+ config.paths.signer_identity_path = temp.path().join("signer.json");
+ config.paths.user_identity_path = temp.path().join("user.json");
+ write_test_identity(
+ &config.paths.signer_identity_path,
+ "1111111111111111111111111111111111111111111111111111111111111111",
+ );
+ write_test_identity(
+ &config.paths.user_identity_path,
+ "2222222222222222222222222222222222222222222222222222222222222222",
+ );
+
+ let runtime = MycRuntime::bootstrap(config).expect("runtime");
+ let client_identity = RadrootsIdentity::from_secret_key_str(
+ "7777777777777777777777777777777777777777777777777777777777777777",
+ )
+ .expect("client identity");
+ let manager = runtime.signer_manager().expect("manager");
+ let connection = manager
+ .register_connection(
+ RadrootsNostrSignerConnectionDraft::new(
+ client_identity.public_key(),
+ runtime.user_public_identity(),
+ )
+ .with_connect_secret("orphan-secret")
+ .with_relays(vec!["wss://relay.example.com".parse().expect("relay url")])
+ .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
+ )
+ .expect("register connection");
+ let workflow = manager
+ .begin_connect_secret_publish_finalization(&connection.connection_id)
+ .expect("begin workflow");
+
+ let error = runtime
+ .recover_pending_delivery_jobs()
+ .await
+ .expect_err("orphaned workflow should fail recovery");
+ let message = error.to_string();
+ assert!(message.contains("orphaned signer publish workflows"));
+ assert!(message.contains(workflow.workflow_id.as_str()));
+ }
+
+ #[tokio::test]
+ async fn startup_recovery_finalizes_published_connect_secret_workflow() {
+ let temp = tempfile::tempdir().expect("tempdir");
+ let mut config = MycConfig::default();
+ config.paths.state_dir = temp.path().join("state");
+ config.paths.signer_identity_path = temp.path().join("signer.json");
+ config.paths.user_identity_path = temp.path().join("user.json");
+ write_test_identity(
+ &config.paths.signer_identity_path,
+ "1111111111111111111111111111111111111111111111111111111111111111",
+ );
+ write_test_identity(
+ &config.paths.user_identity_path,
+ "2222222222222222222222222222222222222222222222222222222222222222",
+ );
+
+ let runtime = MycRuntime::bootstrap(config).expect("runtime");
+ let client_identity = RadrootsIdentity::from_secret_key_str(
+ "7777777777777777777777777777777777777777777777777777777777777777",
+ )
+ .expect("client identity");
+ let manager = runtime.signer_manager().expect("manager");
+ let connection = manager
+ .register_connection(
+ RadrootsNostrSignerConnectionDraft::new(
+ client_identity.public_key(),
+ runtime.user_public_identity(),
+ )
+ .with_connect_secret("recovery-secret")
+ .with_relays(vec!["wss://relay.example.com".parse().expect("relay url")])
+ .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
+ )
+ .expect("register connection");
+ let workflow = manager
+ .begin_connect_secret_publish_finalization(&connection.connection_id)
+ .expect("begin workflow");
+ manager
+ .mark_publish_workflow_published(&workflow.workflow_id)
+ .expect("mark workflow published");
+
+ let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), "recovery")
+ .sign_with_keys(runtime.signer_identity().keys())
+ .expect("sign event");
+ let outbox_record = MycDeliveryOutboxRecord::new(
+ MycDeliveryOutboxKind::ListenerResponsePublish,
+ event,
+ vec!["wss://relay.example.com".parse().expect("relay url")],
+ )
+ .expect("outbox record")
+ .with_connection_id(&connection.connection_id)
+ .with_request_id("recovery-request")
+ .with_signer_publish_workflow_id(&workflow.workflow_id);
+ runtime
+ .delivery_outbox_store()
+ .enqueue(&outbox_record)
+ .expect("enqueue outbox");
+ runtime
+ .delivery_outbox_store()
+ .mark_published_pending_finalize(&outbox_record.job_id, 1)
+ .expect("mark outbox published");
+
+ runtime
+ .recover_pending_delivery_jobs()
+ .await
+ .expect("recovery should succeed");
+
+ let connection = runtime
+ .signer_manager()
+ .expect("manager")
+ .get_connection(&connection.connection_id)
+ .expect("get connection")
+ .expect("stored connection");
+ assert!(connection.connect_secret_is_consumed());
+ assert!(
+ runtime
+ .signer_manager()
+ .expect("manager")
+ .list_publish_workflows()
+ .expect("list workflows")
+ .is_empty()
+ );
+ let outbox_records = runtime
+ .delivery_outbox_store()
+ .list_all()
+ .expect("list outbox");
+ assert_eq!(outbox_records.len(), 1);
+ assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized);
+ assert!(outbox_records[0].finalized_at_unix.is_some());
+ let audit_records = runtime.operation_audit_store().list().expect("list audit");
+ assert_eq!(audit_records.len(), 1);
+ assert_eq!(
+ audit_records[0].operation,
+ MycOperationAuditKind::ListenerResponsePublish
+ );
+ assert_eq!(
+ audit_records[0].outcome,
+ MycOperationAuditOutcome::Succeeded
+ );
+ assert_eq!(
+ audit_records[0].request_id.as_deref(),
+ Some("recovery-request")
+ );
+ }
+
+ #[tokio::test]
+ async fn startup_recovery_rejects_queued_job_with_missing_signer_workflow() {
+ let temp = tempfile::tempdir().expect("tempdir");
+ let mut config = MycConfig::default();
+ config.paths.state_dir = temp.path().join("state");
+ config.paths.signer_identity_path = temp.path().join("signer.json");
+ config.paths.user_identity_path = temp.path().join("user.json");
+ write_test_identity(
+ &config.paths.signer_identity_path,
+ "1111111111111111111111111111111111111111111111111111111111111111",
+ );
+ write_test_identity(
+ &config.paths.user_identity_path,
+ "2222222222222222222222222222222222222222222222222222222222222222",
+ );
+
+ let runtime = MycRuntime::bootstrap(config).expect("runtime");
+ let client_identity = RadrootsIdentity::from_secret_key_str(
+ "7777777777777777777777777777777777777777777777777777777777777777",
+ )
+ .expect("client identity");
+ let manager = runtime.signer_manager().expect("manager");
+ let connection = manager
+ .register_connection(
+ RadrootsNostrSignerConnectionDraft::new(
+ client_identity.public_key(),
+ runtime.user_public_identity(),
+ )
+ .with_connect_secret("missing-workflow-secret")
+ .with_relays(vec!["wss://relay.example.com".parse().expect("relay url")])
+ .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
+ )
+ .expect("register connection");
+ let workflow = manager
+ .begin_connect_secret_publish_finalization(&connection.connection_id)
+ .expect("begin workflow");
+ let event =
+ RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), "queued-recovery")
+ .sign_with_keys(runtime.signer_identity().keys())
+ .expect("sign event");
+ let outbox_record = MycDeliveryOutboxRecord::new(
+ MycDeliveryOutboxKind::ListenerResponsePublish,
+ event,
+ vec!["wss://relay.example.com".parse().expect("relay url")],
+ )
+ .expect("outbox record")
+ .with_connection_id(&connection.connection_id)
+ .with_request_id("queued-missing-workflow")
+ .with_signer_publish_workflow_id(&workflow.workflow_id);
+ runtime
+ .delivery_outbox_store()
+ .enqueue(&outbox_record)
+ .expect("enqueue outbox");
+ manager
+ .cancel_publish_workflow(&workflow.workflow_id)
+ .expect("cancel workflow");
+
+ let error = runtime
+ .recover_pending_delivery_jobs()
+ .await
+ .expect_err("queued job with missing workflow should fail recovery");
+ assert!(
+ error
+ .to_string()
+ .contains("missing signer publish workflow before startup recovery publish")
+ );
+ }
}
diff --git a/tests/nip46_e2e.rs b/tests/nip46_e2e.rs
@@ -28,7 +28,8 @@ use nostr::{
};
use radroots_identity::RadrootsIdentity;
use radroots_nostr::prelude::{
- RadrootsNostrApplicationHandlerSpec, RadrootsNostrClient, RadrootsNostrMetadata,
+ RadrootsNostrApplicationHandlerSpec, RadrootsNostrClient, RadrootsNostrEventBuilder,
+ RadrootsNostrKind, RadrootsNostrMetadata, RadrootsNostrRelayUrl,
radroots_nostr_build_application_handler_event,
};
use radroots_nostr_connect::prelude::{
@@ -1632,6 +1633,96 @@ async fn live_listener_works_with_sqlite_signer_state_and_runtime_audit() -> Tes
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn startup_recovery_republishes_queued_listener_connect_secret_job() -> TestResult<()> {
+ let relay = TestRelay::spawn().await?;
+ let test_runtime = MycTestRuntime::new_with_transport_relays(
+ &[relay.url()],
+ MycConnectionApproval::NotRequired,
+ );
+ let MycTestRuntime {
+ _temp: _tempdir,
+ runtime,
+ } = test_runtime;
+ let signer_public_key = runtime.signer_identity().public_key();
+ let config = runtime.config().clone();
+ let client_identity =
+ identity("5454545454545454545454545454545454545454545454545454545454545454");
+ let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?;
+
+ let manager = runtime.signer_manager()?;
+ let connection = manager.register_connection(
+ RadrootsNostrSignerConnectionDraft::new(
+ client_identity.public_key(),
+ runtime.user_public_identity(),
+ )
+ .with_connect_secret("startup-recovery-secret")
+ .with_relays(vec![relay_url.clone()])
+ .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
+ )?;
+ let workflow = manager.begin_connect_secret_publish_finalization(&connection.connection_id)?;
+ let event = RadrootsNostrEventBuilder::new(
+ RadrootsNostrKind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND),
+ "startup-recovery",
+ )
+ .sign_with_keys(runtime.signer_identity().keys())
+ .map_err(|error| format!("failed to sign startup recovery event: {error}"))?;
+ let outbox_record = MycDeliveryOutboxRecord::new(
+ MycDeliveryOutboxKind::ListenerResponsePublish,
+ event,
+ vec![relay_url],
+ )?
+ .with_connection_id(&connection.connection_id)
+ .with_request_id("startup-recovery-connect")
+ .with_signer_publish_workflow_id(&workflow.workflow_id);
+ runtime.delivery_outbox_store().enqueue(&outbox_record)?;
+
+ runtime.run_until(async {}).await?;
+
+ let published = relay
+ .wait_for_published_events_by_author(signer_public_key, 1)
+ .await?;
+ assert_eq!(published.len(), 1);
+
+ let restarted_runtime = MycRuntime::bootstrap(config)?;
+ let recovered_connection = restarted_runtime
+ .signer_manager()?
+ .get_connection(&connection.connection_id)?
+ .expect("persisted connection");
+ assert!(recovered_connection.connect_secret_is_consumed());
+ assert!(
+ restarted_runtime
+ .signer_manager()?
+ .list_publish_workflows()?
+ .is_empty()
+ );
+ let outbox_records = restarted_runtime.delivery_outbox_store().list_all()?;
+ assert_eq!(outbox_records.len(), 1);
+ assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized);
+ assert_eq!(
+ outbox_records[0].request_id.as_deref(),
+ Some("startup-recovery-connect")
+ );
+ assert!(outbox_records[0].published_at_unix.is_some());
+ assert!(outbox_records[0].finalized_at_unix.is_some());
+ let audit_records = restarted_runtime.operation_audit_store().list_all()?;
+ assert_eq!(audit_records.len(), 1);
+ assert_eq!(
+ audit_records[0].operation,
+ MycOperationAuditKind::ListenerResponsePublish
+ );
+ assert_eq!(
+ audit_records[0].outcome,
+ MycOperationAuditOutcome::Succeeded
+ );
+ assert_eq!(
+ audit_records[0].request_id.as_deref(),
+ Some("startup-recovery-connect")
+ );
+
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn trusted_client_reauths_after_authorized_ttl() -> TestResult<()> {
let relay = TestRelay::spawn().await?;
let client_identity =