runtime.rs (80795B)
1 use std::fs; 2 use std::future::Future; 3 use std::net::SocketAddr; 4 use std::path::{Path, PathBuf}; 5 use std::sync::Arc; 6 use std::time::Duration; 7 8 use super::backend::MycSignerBackend; 9 use crate::audit::{ 10 MycJsonlOperationAuditStore, MycOperationAuditKind, MycOperationAuditOutcome, 11 MycOperationAuditRecord, MycOperationAuditStore, 12 }; 13 use crate::audit_sqlite::MycSqliteOperationAuditStore; 14 use crate::config::{ 15 MycAuditConfig, MycConfig, MycIdentityBackend, MycIdentitySourceSpec, MycPersistenceConfig, 16 MycRuntimeAuditBackend, MycSignerStateBackend, MycTransportDeliveryPolicy, 17 }; 18 use crate::custody::{MycActiveIdentity, MycIdentityProvider}; 19 use crate::discovery::MycDiscoveryContext; 20 use crate::error::MycError; 21 use crate::operability::{ 22 MycDeliveryOutboxStatusOutput, MycLiveMetricsHandle, MycLiveMetricsState, MycMetricsSnapshot, 23 server::run_observability_server, 24 }; 25 use crate::outbox::{ 26 MycDeliveryOutboxKind, MycDeliveryOutboxRecord, MycDeliveryOutboxStatus, MycDeliveryOutboxStore, 27 }; 28 use crate::outbox_sqlite::MycSqliteDeliveryOutboxStore; 29 use crate::policy::MycPolicyContext; 30 use crate::transport::{ 31 MycNip46Service, MycNostrTransport, MycPublishOutcome, MycTransportSnapshot, 32 }; 33 use radroots_identity::RadrootsIdentityPublic; 34 use radroots_nostr_signer::prelude::{ 35 RadrootsNostrFileSignerStore, RadrootsNostrSignerApprovalRequirement, 36 RadrootsNostrSignerAuthState, RadrootsNostrSignerConnectionRecord, RadrootsNostrSignerManager, 37 RadrootsNostrSignerPublishWorkflowKind, RadrootsNostrSignerPublishWorkflowRecord, 38 RadrootsNostrSignerPublishWorkflowState, RadrootsNostrSignerRequestAuditRecord, 39 RadrootsNostrSignerStore, RadrootsNostrSqliteSignerStore, 40 }; 41 use serde::Serialize; 42 43 #[derive(Debug, Clone, PartialEq, Eq)] 44 pub struct MycRuntimePaths { 45 pub state_dir: PathBuf, 46 pub audit_dir: PathBuf, 47 pub signer_identity_path: PathBuf, 48 pub user_identity_path: PathBuf, 49 pub signer_state_path: PathBuf, 50 pub runtime_audit_path: PathBuf, 51 pub delivery_outbox_path: PathBuf, 52 } 53 54 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 55 pub struct MycStartupSnapshot { 56 pub instance_name: String, 57 pub log_filter: String, 58 pub observability_enabled: bool, 59 pub observability_bind_addr: SocketAddr, 60 pub state_dir: PathBuf, 61 pub audit_dir: PathBuf, 62 #[serde(skip_serializing_if = "Option::is_none")] 63 pub signer_identity_path: Option<PathBuf>, 64 #[serde(skip_serializing_if = "Option::is_none")] 65 pub user_identity_path: Option<PathBuf>, 66 pub signer_identity_source: MycIdentitySourceSpec, 67 pub user_identity_source: MycIdentitySourceSpec, 68 pub signer_state_backend: MycSignerStateBackend, 69 pub signer_state_path: PathBuf, 70 pub runtime_audit_backend: MycRuntimeAuditBackend, 71 pub runtime_audit_path: PathBuf, 72 pub signer_identity_id: String, 73 pub signer_public_key_hex: String, 74 pub user_identity_id: String, 75 pub user_public_key_hex: String, 76 pub transport: MycTransportSnapshot, 77 } 78 79 #[derive(Clone)] 80 pub struct MycSignerContext { 81 signer_identity_provider: MycIdentityProvider, 82 user_identity_provider: MycIdentityProvider, 83 signer_identity: MycActiveIdentity, 84 user_identity: MycActiveIdentity, 85 signer_store: Arc<dyn RadrootsNostrSignerStore>, 86 operation_audit_store: Arc<dyn MycOperationAuditStore>, 87 live_metrics: MycLiveMetricsHandle, 88 policy: MycPolicyContext, 89 connection_approval_requirement: RadrootsNostrSignerApprovalRequirement, 90 } 91 92 #[derive(Clone)] 93 pub struct MycRuntime { 94 config: MycConfig, 95 paths: MycRuntimePaths, 96 signer: MycSignerContext, 97 transport: Option<MycNostrTransport>, 98 delivery_outbox_store: Arc<dyn MycDeliveryOutboxStore>, 99 } 100 101 fn startup_identity_path(source: &MycIdentitySourceSpec) -> Option<PathBuf> { 102 match source.backend { 103 MycIdentityBackend::EncryptedFile 104 | MycIdentityBackend::PlaintextFile 105 | MycIdentityBackend::ManagedAccount => source.path.clone(), 106 MycIdentityBackend::HostVault | MycIdentityBackend::ExternalCommand => None, 107 } 108 } 109 110 fn format_startup_identity_path(path: Option<&Path>) -> String { 111 path.map(|path| path.display().to_string()) 112 .unwrap_or_default() 113 } 114 115 impl MycRuntime { 116 pub fn bootstrap(config: MycConfig) -> Result<Self, MycError> { 117 config.validate()?; 118 119 let paths = MycRuntimePaths::from_config(&config); 120 Self::prepare_filesystem_for(&paths)?; 121 let signer = MycSignerContext::bootstrap( 122 &paths, 123 &config.persistence, 124 config.audit.clone(), 125 MycPolicyContext::from_config(&config.policy)?, 126 Duration::from_secs(config.custody.external_command_timeout_secs), 127 config.paths.signer_identity_source(), 128 config.paths.user_identity_source(), 129 )?; 130 let transport = MycNostrTransport::bootstrap(&config.transport, &signer.signer_identity)?; 131 let delivery_outbox_store = Arc::new(MycSqliteDeliveryOutboxStore::open(&paths.state_dir)?); 132 let runtime = Self { 133 paths, 134 config, 135 signer, 136 transport, 137 delivery_outbox_store, 138 }; 139 Ok(runtime) 140 } 141 142 pub fn paths(&self) -> &MycRuntimePaths { 143 &self.paths 144 } 145 146 pub fn config(&self) -> &MycConfig { 147 &self.config 148 } 149 150 pub fn signer_identity(&self) -> &MycActiveIdentity { 151 self.signer.signer_identity() 152 } 153 154 pub fn signer_public_identity(&self) -> RadrootsIdentityPublic { 155 self.signer.signer_public_identity() 156 } 157 158 pub fn user_identity(&self) -> &MycActiveIdentity { 159 self.signer.user_identity() 160 } 161 162 pub fn user_public_identity(&self) -> RadrootsIdentityPublic { 163 self.signer.user_public_identity() 164 } 165 166 pub fn signer_manager(&self) -> Result<RadrootsNostrSignerManager, MycError> { 167 self.signer.load_signer_manager() 168 } 169 170 pub fn signer_backend(&self) -> MycSignerBackend { 171 MycSignerBackend::new(self.signer.clone()) 172 } 173 174 pub fn transport(&self) -> Option<&MycNostrTransport> { 175 self.transport.as_ref() 176 } 177 178 pub fn operation_audit_store(&self) -> Arc<dyn MycOperationAuditStore> { 179 self.signer.operation_audit_store() 180 } 181 182 pub(crate) fn metrics_snapshot( 183 &self, 184 outbox_status: &MycDeliveryOutboxStatusOutput, 185 ) -> MycMetricsSnapshot { 186 self.signer.metrics_snapshot(outbox_status) 187 } 188 189 pub fn delivery_outbox_store(&self) -> Arc<dyn MycDeliveryOutboxStore> { 190 self.delivery_outbox_store.clone() 191 } 192 193 pub fn record_operation_audit(&self, record: &MycOperationAuditRecord) { 194 self.signer.record_operation_audit(record); 195 } 196 197 pub(crate) fn signer_context(&self) -> MycSignerContext { 198 self.signer.clone() 199 } 200 201 pub fn snapshot(&self) -> MycStartupSnapshot { 202 let signer_public = self.signer.signer_identity.to_public(); 203 let user_public = self.signer.user_identity.to_public(); 204 MycStartupSnapshot { 205 instance_name: self.config.service.instance_name.clone(), 206 log_filter: self.config.logging.filter.clone(), 207 observability_enabled: self.config.observability.enabled, 208 observability_bind_addr: self.config.observability.bind_addr, 209 state_dir: self.paths.state_dir.clone(), 210 audit_dir: self.paths.audit_dir.clone(), 211 signer_identity_path: startup_identity_path(self.signer.signer_identity_source()), 212 user_identity_path: startup_identity_path(self.signer.user_identity_source()), 213 signer_identity_source: self.signer.signer_identity_source().clone(), 214 user_identity_source: self.signer.user_identity_source().clone(), 215 signer_state_backend: self.config.persistence.signer_state_backend, 216 signer_state_path: self.paths.signer_state_path.clone(), 217 runtime_audit_backend: self.config.persistence.runtime_audit_backend, 218 runtime_audit_path: self.paths.runtime_audit_path.clone(), 219 signer_identity_id: signer_public.id.into_string(), 220 signer_public_key_hex: signer_public.public_key_hex, 221 user_identity_id: user_public.id.into_string(), 222 user_public_key_hex: user_public.public_key_hex, 223 transport: self 224 .transport 225 .as_ref() 226 .map(MycNostrTransport::snapshot) 227 .unwrap_or_else(MycTransportSnapshot::disabled), 228 } 229 } 230 231 pub async fn run(self) -> Result<(), MycError> { 232 self.run_until(std::future::pending()).await 233 } 234 235 pub async fn run_until<F>(self, shutdown: F) -> Result<(), MycError> 236 where 237 F: Future<Output = ()>, 238 { 239 let snapshot = self.snapshot(); 240 let signer_identity_path = 241 format_startup_identity_path(snapshot.signer_identity_path.as_deref()); 242 let user_identity_path = 243 format_startup_identity_path(snapshot.user_identity_path.as_deref()); 244 tracing::info!( 245 instance_name = %snapshot.instance_name, 246 state_dir = %snapshot.state_dir.display(), 247 audit_dir = %snapshot.audit_dir.display(), 248 signer_identity_path = %signer_identity_path, 249 user_identity_path = %user_identity_path, 250 signer_identity_backend = %snapshot.signer_identity_source.backend.as_str(), 251 user_identity_backend = %snapshot.user_identity_source.backend.as_str(), 252 signer_keyring_account_id = snapshot.signer_identity_source.keyring_account_id.as_deref().unwrap_or(""), 253 user_keyring_account_id = snapshot.user_identity_source.keyring_account_id.as_deref().unwrap_or(""), 254 signer_state_backend = snapshot.signer_state_backend.as_str(), 255 signer_state_path = %snapshot.signer_state_path.display(), 256 runtime_audit_backend = snapshot.runtime_audit_backend.as_str(), 257 runtime_audit_path = %snapshot.runtime_audit_path.display(), 258 signer_identity_id = %snapshot.signer_identity_id, 259 signer_public_key_hex = %snapshot.signer_public_key_hex, 260 user_identity_id = %snapshot.user_identity_id, 261 user_public_key_hex = %snapshot.user_public_key_hex, 262 observability_enabled = snapshot.observability_enabled, 263 observability_bind_addr = %snapshot.observability_bind_addr, 264 transport_enabled = snapshot.transport.enabled, 265 transport_relay_count = snapshot.transport.relay_count, 266 transport_connect_timeout_secs = snapshot.transport.connect_timeout_secs, 267 "myc runtime bootstrapped" 268 ); 269 self.recover_pending_delivery_jobs().await?; 270 let mut tasks = tokio::task::JoinSet::new(); 271 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); 272 if let Some(transport) = self.transport.clone() { 273 let service = MycNip46Service::new( 274 self.signer_context(), 275 transport, 276 self.delivery_outbox_store(), 277 ); 278 let shutdown = observe_shutdown_signal(shutdown_rx.clone()); 279 tasks.spawn(async move { service.run_until(shutdown).await }); 280 } 281 if self.config.observability.enabled { 282 let runtime = self.clone(); 283 let shutdown = observe_shutdown_signal(shutdown_rx); 284 tasks.spawn(async move { run_observability_server(runtime, shutdown).await }); 285 } 286 287 tokio::pin!(shutdown); 288 if tasks.is_empty() { 289 shutdown.await; 290 return Ok(()); 291 } 292 293 tokio::select! { 294 _ = &mut shutdown => { 295 let _ = shutdown_tx.send(true); 296 drain_runtime_tasks(tasks).await 297 } 298 joined = tasks.join_next() => { 299 let _ = shutdown_tx.send(true); 300 let first_result = match joined { 301 Some(result) => result.map_err(|error| { 302 MycError::InvalidOperation(format!("myc runtime task failed: {error}")) 303 })?, 304 None => Ok(()), 305 }; 306 let remaining = drain_runtime_tasks(tasks).await; 307 first_result.and(remaining) 308 } 309 } 310 } 311 312 fn prepare_filesystem_for(paths: &MycRuntimePaths) -> Result<(), MycError> { 313 fs::create_dir_all(&paths.state_dir).map_err(|source| MycError::CreateDir { 314 path: paths.state_dir.clone(), 315 source, 316 })?; 317 fs::create_dir_all(&paths.audit_dir).map_err(|source| MycError::CreateDir { 318 path: paths.audit_dir.clone(), 319 source, 320 })?; 321 Ok(()) 322 } 323 324 async fn recover_pending_delivery_jobs(&self) -> Result<(), MycError> { 325 let mut queued_records = self 326 .delivery_outbox_store 327 .list_by_status(MycDeliveryOutboxStatus::Queued)?; 328 let published_records = self 329 .delivery_outbox_store 330 .list_by_status(MycDeliveryOutboxStatus::PublishedPendingFinalize)?; 331 if queued_records.is_empty() && published_records.is_empty() { 332 if let Err(error) = self.ensure_no_orphaned_publish_workflows() { 333 self.record_delivery_recovery_summary( 334 MycOperationAuditOutcome::Rejected, 335 0, 336 0, 337 0, 338 error.to_string(), 339 ); 340 return Err(error); 341 } 342 return Ok(()); 343 } 344 345 queued_records.extend(published_records); 346 queued_records.sort_by(|left, right| { 347 left.created_at_unix 348 .cmp(&right.created_at_unix) 349 .then_with(|| left.job_id.as_str().cmp(right.job_id.as_str())) 350 }); 351 352 tracing::info!( 353 unfinished_delivery_job_count = queued_records.len(), 354 "starting myc delivery recovery" 355 ); 356 357 let unfinished_delivery_job_count = queued_records.len(); 358 let mut finalized_job_count = 0usize; 359 let mut republished_job_count = 0usize; 360 let manager = self.signer_manager()?; 361 for record in queued_records { 362 match self.recover_delivery_outbox_record(&manager, record).await { 363 Ok(republished) => { 364 finalized_job_count += 1; 365 if republished { 366 republished_job_count += 1; 367 } 368 } 369 Err(error) => { 370 self.record_delivery_recovery_summary( 371 MycOperationAuditOutcome::Rejected, 372 unfinished_delivery_job_count, 373 finalized_job_count, 374 republished_job_count, 375 error.to_string(), 376 ); 377 return Err(error); 378 } 379 } 380 } 381 if let Err(error) = self.ensure_no_orphaned_publish_workflows() { 382 self.record_delivery_recovery_summary( 383 MycOperationAuditOutcome::Rejected, 384 unfinished_delivery_job_count, 385 finalized_job_count, 386 republished_job_count, 387 error.to_string(), 388 ); 389 return Err(error); 390 } 391 self.record_delivery_recovery_summary( 392 MycOperationAuditOutcome::Succeeded, 393 unfinished_delivery_job_count, 394 finalized_job_count, 395 republished_job_count, 396 format!( 397 "recovered {finalized_job_count}/{unfinished_delivery_job_count} delivery outbox job(s); republished {republished_job_count}" 398 ), 399 ); 400 401 tracing::info!("completed myc delivery recovery"); 402 Ok(()) 403 } 404 405 fn ensure_no_orphaned_publish_workflows(&self) -> Result<(), MycError> { 406 let workflows = self.signer_manager()?.list_publish_workflows()?; 407 if workflows.is_empty() { 408 return Ok(()); 409 } 410 411 let remaining = workflows 412 .into_iter() 413 .map(|workflow| { 414 format!( 415 "{}:{}:{:?}", 416 workflow.workflow_id, workflow.connection_id, workflow.kind 417 ) 418 }) 419 .collect::<Vec<_>>() 420 .join(", "); 421 Err(MycError::InvalidOperation(format!( 422 "startup recovery found orphaned signer publish workflows with no recoverable outbox job: {remaining}" 423 ))) 424 } 425 426 async fn recover_delivery_outbox_record( 427 &self, 428 manager: &RadrootsNostrSignerManager, 429 record: MycDeliveryOutboxRecord, 430 ) -> Result<bool, MycError> { 431 self.validate_outbox_workflow_expectations(&record)?; 432 let workflow = self.lookup_publish_workflow_for_record(manager, &record)?; 433 tracing::info!( 434 job_id = %record.job_id, 435 kind = ?record.kind, 436 status = ?record.status, 437 request_id = record.request_id.as_deref().unwrap_or(""), 438 attempt_id = record.attempt_id.as_deref().unwrap_or(""), 439 signer_publish_workflow_id = record 440 .signer_publish_workflow_id 441 .as_ref() 442 .map(ToString::to_string) 443 .unwrap_or_default(), 444 "recovering myc delivery outbox job" 445 ); 446 447 match record.status { 448 MycDeliveryOutboxStatus::Queued => { 449 if record.signer_publish_workflow_id.is_some() && workflow.is_none() { 450 return Err(self.wrap_recovery_error( 451 &record, 452 MycError::InvalidOperation( 453 "delivery outbox job references a missing signer publish workflow before startup recovery publish" 454 .to_owned(), 455 ), 456 )); 457 } 458 if matches!( 459 workflow.as_ref().map(|workflow| workflow.state), 460 Some(RadrootsNostrSignerPublishWorkflowState::PublishedPendingFinalize) 461 ) { 462 let publish_attempt_count = record.publish_attempt_count.max(1); 463 let published = self 464 .delivery_outbox_store 465 .mark_published_pending_finalize(&record.job_id, publish_attempt_count)?; 466 self.finalize_recovered_delivery_job( 467 manager, 468 published, 469 workflow.as_ref(), 470 None, 471 )?; 472 return Ok(false); 473 } 474 475 let publish_outcome = self 476 .republish_recovered_outbox_event(&record) 477 .await 478 .map_err(|error| self.wrap_recovery_error(&record, error))?; 479 if let Some(workflow) = workflow.as_ref() { 480 manager 481 .mark_publish_workflow_published(&workflow.workflow_id) 482 .map_err(|error| { 483 self.wrap_recovery_error( 484 &record, 485 MycError::InvalidOperation(format!( 486 "failed to mark signer publish workflow as published during startup recovery: {error}" 487 )), 488 ) 489 })?; 490 } 491 let published_workflow = match record.signer_publish_workflow_id.as_ref() { 492 Some(workflow_id) => Some( 493 manager 494 .get_publish_workflow(workflow_id) 495 .map_err(MycError::from) 496 .and_then(|workflow| { 497 workflow.ok_or_else(|| { 498 MycError::InvalidOperation(format!( 499 "signer publish workflow `{workflow_id}` disappeared after startup recovery publish confirmation" 500 )) 501 }) 502 }) 503 .map_err(|error| self.wrap_recovery_error(&record, error))?, 504 ), 505 None => None, 506 }; 507 let published = self 508 .delivery_outbox_store 509 .mark_published_pending_finalize(&record.job_id, publish_outcome.attempt_count) 510 .map_err(|error| self.wrap_recovery_error(&record, error))?; 511 self.finalize_recovered_delivery_job( 512 manager, 513 published, 514 published_workflow.as_ref(), 515 Some(&publish_outcome), 516 )?; 517 Ok(true) 518 } 519 MycDeliveryOutboxStatus::PublishedPendingFinalize => { 520 self.finalize_recovered_delivery_job(manager, record, workflow.as_ref(), None)?; 521 Ok(false) 522 } 523 MycDeliveryOutboxStatus::Finalized | MycDeliveryOutboxStatus::Failed => Ok(false), 524 } 525 } 526 527 fn finalize_recovered_delivery_job( 528 &self, 529 manager: &RadrootsNostrSignerManager, 530 record: MycDeliveryOutboxRecord, 531 workflow: Option<&RadrootsNostrSignerPublishWorkflowRecord>, 532 publish_outcome: Option<&MycPublishOutcome>, 533 ) -> Result<(), MycError> { 534 if let Some(workflow) = workflow { 535 if workflow.state != RadrootsNostrSignerPublishWorkflowState::PublishedPendingFinalize { 536 return Err(self.wrap_recovery_error( 537 &record, 538 MycError::InvalidOperation(format!( 539 "signer publish workflow `{}` is in `{}` instead of `published_pending_finalize` during startup recovery", 540 workflow.workflow_id, 541 format!("{:?}", workflow.state) 542 )), 543 )); 544 } 545 manager 546 .finalize_publish_workflow(&workflow.workflow_id) 547 .map_err(|error| { 548 self.wrap_recovery_error( 549 &record, 550 MycError::InvalidOperation(format!( 551 "failed to finalize signer publish workflow during startup recovery: {error}" 552 )), 553 ) 554 })?; 555 } else { 556 self.ensure_record_is_already_finalized_without_workflow(manager, &record)?; 557 } 558 559 let finalized_record = self 560 .delivery_outbox_store 561 .mark_finalized(&record.job_id) 562 .map_err(|error| self.wrap_recovery_error(&record, error))?; 563 self.record_recovery_success(&finalized_record, publish_outcome); 564 Ok(()) 565 } 566 567 fn ensure_record_is_already_finalized_without_workflow( 568 &self, 569 manager: &RadrootsNostrSignerManager, 570 record: &MycDeliveryOutboxRecord, 571 ) -> Result<(), MycError> { 572 let Some(workflow_id) = record.signer_publish_workflow_id.as_ref() else { 573 return Ok(()); 574 }; 575 576 match record.kind { 577 MycDeliveryOutboxKind::ListenerResponsePublish 578 | MycDeliveryOutboxKind::ConnectAcceptPublish => { 579 let connection = self.recovery_connection_record(manager, record)?; 580 if !connection.connect_secret_is_consumed() { 581 return Err(self.wrap_recovery_error( 582 record, 583 MycError::InvalidOperation(format!( 584 "delivery outbox job `{}` references consumed-secret workflow `{workflow_id}` but the connection secret is still reusable", 585 record.job_id 586 )), 587 )); 588 } 589 } 590 MycDeliveryOutboxKind::AuthReplayPublish => { 591 let connection = self.recovery_connection_record(manager, record)?; 592 if connection.auth_state != RadrootsNostrSignerAuthState::Authorized 593 || connection.pending_request.is_some() 594 { 595 return Err(self.wrap_recovery_error( 596 record, 597 MycError::InvalidOperation(format!( 598 "delivery outbox job `{}` references auth replay workflow `{workflow_id}` but the connection auth state is not finalized", 599 record.job_id 600 )), 601 )); 602 } 603 } 604 MycDeliveryOutboxKind::DiscoveryHandlerPublish => { 605 return Err(self.wrap_recovery_error( 606 record, 607 MycError::InvalidOperation(format!( 608 "discovery delivery outbox job `{}` unexpectedly references signer workflow `{workflow_id}`", 609 record.job_id 610 )), 611 )); 612 } 613 } 614 615 Ok(()) 616 } 617 618 fn recovery_connection_record( 619 &self, 620 manager: &RadrootsNostrSignerManager, 621 record: &MycDeliveryOutboxRecord, 622 ) -> Result<RadrootsNostrSignerConnectionRecord, MycError> { 623 let connection_id = record.connection_id.as_ref().ok_or_else(|| { 624 self.wrap_recovery_error( 625 record, 626 MycError::InvalidOperation( 627 "delivery outbox job is missing a connection id required for recovery" 628 .to_owned(), 629 ), 630 ) 631 })?; 632 manager.get_connection(connection_id)?.ok_or_else(|| { 633 self.wrap_recovery_error( 634 record, 635 MycError::InvalidOperation(format!( 636 "delivery outbox job references missing connection `{connection_id}`" 637 )), 638 ) 639 }) 640 } 641 642 fn validate_outbox_workflow_expectations( 643 &self, 644 record: &MycDeliveryOutboxRecord, 645 ) -> Result<(), MycError> { 646 match record.kind { 647 MycDeliveryOutboxKind::DiscoveryHandlerPublish => { 648 if record.signer_publish_workflow_id.is_some() { 649 return Err(self.wrap_recovery_error( 650 record, 651 MycError::InvalidOperation( 652 "discovery delivery outbox jobs must not reference signer publish workflows" 653 .to_owned(), 654 ), 655 )); 656 } 657 } 658 MycDeliveryOutboxKind::ConnectAcceptPublish 659 | MycDeliveryOutboxKind::AuthReplayPublish => { 660 if record.signer_publish_workflow_id.is_none() { 661 return Err(self.wrap_recovery_error( 662 record, 663 MycError::InvalidOperation( 664 "control delivery outbox jobs must reference signer publish workflows" 665 .to_owned(), 666 ), 667 )); 668 } 669 } 670 MycDeliveryOutboxKind::ListenerResponsePublish => {} 671 } 672 Ok(()) 673 } 674 675 fn lookup_publish_workflow_for_record( 676 &self, 677 manager: &RadrootsNostrSignerManager, 678 record: &MycDeliveryOutboxRecord, 679 ) -> Result<Option<RadrootsNostrSignerPublishWorkflowRecord>, MycError> { 680 let Some(workflow_id) = record.signer_publish_workflow_id.as_ref() else { 681 return Ok(None); 682 }; 683 let workflow = manager.get_publish_workflow(workflow_id)?.map(|workflow| { 684 let kind_label = match record.kind { 685 MycDeliveryOutboxKind::ListenerResponsePublish 686 | MycDeliveryOutboxKind::ConnectAcceptPublish => { 687 RadrootsNostrSignerPublishWorkflowKind::ConnectSecretFinalization 688 } 689 MycDeliveryOutboxKind::AuthReplayPublish => { 690 RadrootsNostrSignerPublishWorkflowKind::AuthReplayFinalization 691 } 692 MycDeliveryOutboxKind::DiscoveryHandlerPublish => unreachable!(), 693 }; 694 if workflow.kind != kind_label { 695 return Err(self.wrap_recovery_error( 696 record, 697 MycError::InvalidOperation(format!( 698 "delivery outbox job `{}` expects signer workflow kind `{}` but found `{}`", 699 record.job_id, 700 format!("{kind_label:?}"), 701 format!("{:?}", workflow.kind), 702 )), 703 )); 704 } 705 if let Some(connection_id) = record.connection_id.as_ref() { 706 if &workflow.connection_id != connection_id { 707 return Err(self.wrap_recovery_error( 708 record, 709 MycError::InvalidOperation(format!( 710 "delivery outbox job `{}` connection `{connection_id}` does not match signer workflow connection `{}`", 711 record.job_id, workflow.connection_id 712 )), 713 )); 714 } 715 } 716 Ok(workflow) 717 }); 718 workflow.transpose() 719 } 720 721 async fn republish_recovered_outbox_event( 722 &self, 723 record: &MycDeliveryOutboxRecord, 724 ) -> Result<MycPublishOutcome, MycError> { 725 let signer_identity = self.recovery_publisher_identity(record)?; 726 MycNostrTransport::publish_event_once( 727 &signer_identity, 728 &record.relay_urls, 729 &self.config.transport, 730 recovery_operation_label(record.kind), 731 &record.event, 732 ) 733 .await 734 } 735 736 fn recovery_publisher_identity( 737 &self, 738 record: &MycDeliveryOutboxRecord, 739 ) -> Result<MycActiveIdentity, MycError> { 740 if record.kind != MycDeliveryOutboxKind::DiscoveryHandlerPublish { 741 return Ok(self.signer_identity().clone()); 742 } 743 if record.event.pubkey == self.signer_identity().public_key() { 744 return Ok(self.signer_identity().clone()); 745 } 746 747 let context = MycDiscoveryContext::from_runtime(self)?; 748 if record.event.pubkey != context.app_identity().public_key() { 749 return Err(self.wrap_recovery_error( 750 record, 751 MycError::InvalidOperation(format!( 752 "discovery delivery outbox job author `{}` does not match the configured signer or discovery app identity", 753 record.event.pubkey 754 )), 755 )); 756 } 757 Ok(context.app_identity().clone()) 758 } 759 760 fn record_recovery_success( 761 &self, 762 outbox_record: &MycDeliveryOutboxRecord, 763 publish_outcome: Option<&MycPublishOutcome>, 764 ) { 765 let (relay_count, acknowledged_relay_count, summary, mut audit_record) = 766 match publish_outcome { 767 Some(publish_outcome) => ( 768 publish_outcome.relay_count, 769 publish_outcome.acknowledged_relay_count, 770 publish_outcome.relay_outcome_summary.clone(), 771 MycOperationAuditRecord::new( 772 recovery_operation_audit_kind(outbox_record.kind), 773 MycOperationAuditOutcome::Succeeded, 774 outbox_record.connection_id.as_ref(), 775 outbox_record.request_id.as_deref(), 776 publish_outcome.relay_count, 777 publish_outcome.acknowledged_relay_count, 778 publish_outcome.relay_outcome_summary.clone(), 779 ) 780 .with_delivery_details( 781 publish_outcome.delivery_policy, 782 publish_outcome.required_acknowledged_relay_count, 783 publish_outcome.attempt_count, 784 ), 785 ), 786 None => { 787 let relay_count = outbox_record.relay_urls.len(); 788 let required_acknowledged_relay_count = self 789 .required_acknowledged_relay_count(relay_count) 790 .unwrap_or_default(); 791 let summary = "startup recovery finalized previously published delivery job"; 792 ( 793 relay_count, 794 required_acknowledged_relay_count, 795 summary.to_owned(), 796 MycOperationAuditRecord::new( 797 recovery_operation_audit_kind(outbox_record.kind), 798 MycOperationAuditOutcome::Succeeded, 799 outbox_record.connection_id.as_ref(), 800 outbox_record.request_id.as_deref(), 801 relay_count, 802 required_acknowledged_relay_count, 803 summary.to_owned(), 804 ) 805 .with_delivery_details( 806 self.config.transport.delivery_policy, 807 required_acknowledged_relay_count, 808 outbox_record.publish_attempt_count.max(1), 809 ), 810 ) 811 } 812 }; 813 if let Some(attempt_id) = outbox_record.attempt_id.as_deref() { 814 audit_record = audit_record.with_attempt_id(attempt_id); 815 } 816 tracing::info!( 817 job_id = %outbox_record.job_id, 818 kind = ?outbox_record.kind, 819 relay_count, 820 acknowledged_relay_count, 821 summary = %summary, 822 "recovered myc delivery outbox job" 823 ); 824 self.record_operation_audit(&audit_record); 825 } 826 827 fn record_delivery_recovery_summary( 828 &self, 829 outcome: MycOperationAuditOutcome, 830 unfinished_job_count: usize, 831 finalized_job_count: usize, 832 republished_job_count: usize, 833 summary: impl Into<String>, 834 ) { 835 let summary = summary.into(); 836 let record = MycOperationAuditRecord::new( 837 MycOperationAuditKind::DeliveryRecovery, 838 outcome, 839 None, 840 None, 841 unfinished_job_count, 842 finalized_job_count, 843 summary.clone(), 844 ); 845 tracing::info!( 846 outcome = ?outcome, 847 unfinished_job_count, 848 finalized_job_count, 849 republished_job_count, 850 summary = %summary, 851 "recorded myc delivery recovery summary" 852 ); 853 self.record_operation_audit(&record); 854 } 855 856 fn required_acknowledged_relay_count(&self, relay_count: usize) -> Result<usize, MycError> { 857 match self.config.transport.delivery_policy { 858 MycTransportDeliveryPolicy::Any => Ok(1), 859 MycTransportDeliveryPolicy::All => Ok(relay_count), 860 MycTransportDeliveryPolicy::Quorum => { 861 let delivery_quorum = self.config.transport.delivery_quorum.ok_or_else(|| { 862 MycError::InvalidOperation( 863 "transport.delivery_quorum must be set when transport.delivery_policy is `quorum`" 864 .to_owned(), 865 ) 866 })?; 867 if delivery_quorum > relay_count { 868 return Err(MycError::InvalidOperation(format!( 869 "transport.delivery_quorum `{delivery_quorum}` cannot be satisfied by `{relay_count}` target relays" 870 ))); 871 } 872 Ok(delivery_quorum) 873 } 874 } 875 } 876 877 fn wrap_recovery_error(&self, record: &MycDeliveryOutboxRecord, error: MycError) -> MycError { 878 let wrapped = MycError::InvalidOperation(format!( 879 "startup recovery failed for delivery outbox job `{}` ({:?}): {error}", 880 record.job_id, record.kind 881 )); 882 tracing::error!( 883 job_id = %record.job_id, 884 kind = ?record.kind, 885 status = ?record.status, 886 request_id = record.request_id.as_deref().unwrap_or(""), 887 attempt_id = record.attempt_id.as_deref().unwrap_or(""), 888 error = %wrapped, 889 "myc startup delivery recovery failed" 890 ); 891 wrapped 892 } 893 } 894 895 fn recovery_operation_label(kind: MycDeliveryOutboxKind) -> &'static str { 896 match kind { 897 MycDeliveryOutboxKind::ListenerResponsePublish => "listener response recovery publish", 898 MycDeliveryOutboxKind::ConnectAcceptPublish => "connect accept recovery publish", 899 MycDeliveryOutboxKind::AuthReplayPublish => "auth replay recovery publish", 900 MycDeliveryOutboxKind::DiscoveryHandlerPublish => "discovery handler recovery publish", 901 } 902 } 903 904 fn recovery_operation_audit_kind(kind: MycDeliveryOutboxKind) -> MycOperationAuditKind { 905 match kind { 906 MycDeliveryOutboxKind::ListenerResponsePublish => { 907 MycOperationAuditKind::ListenerResponsePublish 908 } 909 MycDeliveryOutboxKind::ConnectAcceptPublish => MycOperationAuditKind::ConnectAcceptPublish, 910 MycDeliveryOutboxKind::AuthReplayPublish => MycOperationAuditKind::AuthReplayPublish, 911 MycDeliveryOutboxKind::DiscoveryHandlerPublish => { 912 MycOperationAuditKind::DiscoveryHandlerPublish 913 } 914 } 915 } 916 917 async fn drain_runtime_tasks( 918 mut tasks: tokio::task::JoinSet<Result<(), MycError>>, 919 ) -> Result<(), MycError> { 920 let mut first_error = None; 921 while let Some(joined) = tasks.join_next().await { 922 match joined { 923 Ok(Ok(())) => {} 924 Ok(Err(error)) => { 925 if first_error.is_none() { 926 first_error = Some(error); 927 } 928 } 929 Err(error) => { 930 if first_error.is_none() { 931 first_error = Some(MycError::InvalidOperation(format!( 932 "myc runtime task failed: {error}" 933 ))); 934 } 935 } 936 } 937 } 938 939 match first_error { 940 Some(error) => Err(error), 941 None => Ok(()), 942 } 943 } 944 945 async fn observe_shutdown_signal(mut shutdown_rx: tokio::sync::watch::Receiver<bool>) { 946 loop { 947 if *shutdown_rx.borrow() { 948 break; 949 } 950 if shutdown_rx.changed().await.is_err() { 951 break; 952 } 953 } 954 } 955 956 impl MycRuntimePaths { 957 pub(crate) fn audit_dir_for_state_dir(state_dir: &Path) -> PathBuf { 958 state_dir.join("audit") 959 } 960 961 pub(crate) fn signer_state_path_for_backend( 962 state_dir: &Path, 963 backend: MycSignerStateBackend, 964 ) -> PathBuf { 965 state_dir.join(match backend { 966 MycSignerStateBackend::JsonFile => "signer-state.json", 967 MycSignerStateBackend::Sqlite => "signer-state.sqlite", 968 }) 969 } 970 971 pub(crate) fn runtime_audit_path_for_backend( 972 audit_dir: &Path, 973 backend: MycRuntimeAuditBackend, 974 ) -> PathBuf { 975 audit_dir.join(match backend { 976 MycRuntimeAuditBackend::JsonlFile => "operations.jsonl", 977 MycRuntimeAuditBackend::Sqlite => "operations.sqlite", 978 }) 979 } 980 981 pub(crate) fn delivery_outbox_path_for_state_dir(state_dir: &Path) -> PathBuf { 982 state_dir.join("delivery-outbox.sqlite") 983 } 984 985 fn from_config(config: &MycConfig) -> Self { 986 let state_dir = config.paths.state_dir.clone(); 987 let audit_dir = Self::audit_dir_for_state_dir(&state_dir); 988 Self { 989 signer_identity_path: config.paths.signer_identity_path.clone(), 990 user_identity_path: config.paths.user_identity_path.clone(), 991 signer_state_path: Self::signer_state_path_for_backend( 992 &state_dir, 993 config.persistence.signer_state_backend, 994 ), 995 runtime_audit_path: Self::runtime_audit_path_for_backend( 996 &audit_dir, 997 config.persistence.runtime_audit_backend, 998 ), 999 delivery_outbox_path: Self::delivery_outbox_path_for_state_dir(&state_dir), 1000 audit_dir, 1001 state_dir, 1002 } 1003 } 1004 } 1005 1006 impl MycSignerContext { 1007 pub fn signer_identity(&self) -> &MycActiveIdentity { 1008 &self.signer_identity 1009 } 1010 1011 pub fn signer_identity_source(&self) -> &MycIdentitySourceSpec { 1012 self.signer_identity_provider.source() 1013 } 1014 1015 pub fn signer_identity_provider(&self) -> &MycIdentityProvider { 1016 &self.signer_identity_provider 1017 } 1018 1019 pub fn signer_public_identity(&self) -> RadrootsIdentityPublic { 1020 self.signer_identity.to_public() 1021 } 1022 1023 pub fn user_identity(&self) -> &MycActiveIdentity { 1024 &self.user_identity 1025 } 1026 1027 pub fn user_identity_source(&self) -> &MycIdentitySourceSpec { 1028 self.user_identity_provider.source() 1029 } 1030 1031 pub fn user_identity_provider(&self) -> &MycIdentityProvider { 1032 &self.user_identity_provider 1033 } 1034 1035 pub fn user_public_identity(&self) -> RadrootsIdentityPublic { 1036 self.user_identity.to_public() 1037 } 1038 1039 pub fn load_signer_manager(&self) -> Result<RadrootsNostrSignerManager, MycError> { 1040 Self::load_signer_manager_from_store(self.signer_store.clone()) 1041 } 1042 1043 pub fn operation_audit_store(&self) -> Arc<dyn MycOperationAuditStore> { 1044 self.operation_audit_store.clone() 1045 } 1046 1047 pub fn record_signer_request_audit(&self, record: &RadrootsNostrSignerRequestAuditRecord) { 1048 let mut metrics = self 1049 .live_metrics 1050 .lock() 1051 .unwrap_or_else(|poisoned| poisoned.into_inner()); 1052 metrics.record_signer_request_audit(record); 1053 } 1054 1055 pub fn record_operation_audit(&self, record: &MycOperationAuditRecord) { 1056 emit_operation_audit_trace(record); 1057 match self.operation_audit_store.append(record) { 1058 Ok(()) => { 1059 let mut metrics = self 1060 .live_metrics 1061 .lock() 1062 .unwrap_or_else(|poisoned| poisoned.into_inner()); 1063 metrics.record_runtime_operation(record); 1064 } 1065 Err(error) => { 1066 tracing::error!( 1067 operation = ?record.operation, 1068 outcome = ?record.outcome, 1069 relay_url = record.relay_url.as_deref().unwrap_or(""), 1070 connection_id = record.connection_id.as_deref().unwrap_or(""), 1071 request_id = record.request_id.as_deref().unwrap_or(""), 1072 attempt_id = record.attempt_id.as_deref().unwrap_or(""), 1073 delivery_policy = ?record.delivery_policy, 1074 required_acknowledged_relay_count = record.required_acknowledged_relay_count.unwrap_or_default(), 1075 publish_attempt_count = record.publish_attempt_count.unwrap_or_default(), 1076 relay_count = record.relay_count, 1077 acknowledged_relay_count = record.acknowledged_relay_count, 1078 relay_outcome_summary = %record.relay_outcome_summary, 1079 error = %error, 1080 "failed to persist myc operation audit record" 1081 ); 1082 } 1083 } 1084 } 1085 1086 pub fn metrics_snapshot( 1087 &self, 1088 outbox_status: &MycDeliveryOutboxStatusOutput, 1089 ) -> MycMetricsSnapshot { 1090 let metrics = self 1091 .live_metrics 1092 .lock() 1093 .unwrap_or_else(|poisoned| poisoned.into_inner()); 1094 metrics.snapshot(outbox_status) 1095 } 1096 1097 pub fn connection_approval_requirement(&self) -> RadrootsNostrSignerApprovalRequirement { 1098 self.connection_approval_requirement 1099 } 1100 1101 pub fn policy(&self) -> &MycPolicyContext { 1102 &self.policy 1103 } 1104 1105 fn bootstrap( 1106 paths: &MycRuntimePaths, 1107 persistence: &MycPersistenceConfig, 1108 audit_config: MycAuditConfig, 1109 policy: MycPolicyContext, 1110 external_command_timeout: Duration, 1111 signer_identity_source: MycIdentitySourceSpec, 1112 user_identity_source: MycIdentitySourceSpec, 1113 ) -> Result<Self, MycError> { 1114 let signer_identity_provider = MycIdentityProvider::from_source( 1115 "signer", 1116 signer_identity_source, 1117 external_command_timeout, 1118 )?; 1119 let user_identity_provider = MycIdentityProvider::from_source( 1120 "user", 1121 user_identity_source, 1122 external_command_timeout, 1123 )?; 1124 let signer_identity = signer_identity_provider.load_active_identity()?; 1125 let user_identity = user_identity_provider.load_active_identity()?; 1126 let signer_store = Self::build_signer_store(persistence, &paths.signer_state_path)?; 1127 let operation_audit_store = 1128 Self::build_operation_audit_store(persistence, &paths.audit_dir, audit_config)?; 1129 let manager = Self::load_signer_manager_from_store(signer_store.clone())?; 1130 let live_metrics = Arc::new(std::sync::Mutex::new(MycLiveMetricsState::from_records( 1131 &manager.list_audit_records()?, 1132 &operation_audit_store.list_all()?, 1133 ))); 1134 let configured_public = signer_identity.to_public(); 1135 1136 match manager.signer_identity()? { 1137 Some(existing) if existing.id != configured_public.id => { 1138 return Err(MycError::SignerIdentityMismatch { 1139 identity_path: paths.signer_identity_path.clone(), 1140 state_path: paths.signer_state_path.clone(), 1141 configured_identity_id: configured_public.id.to_string(), 1142 persisted_identity_id: existing.id.to_string(), 1143 }); 1144 } 1145 Some(_) => manager.set_signer_identity(configured_public.clone())?, 1146 None => manager.set_signer_identity(configured_public.clone())?, 1147 } 1148 let stale_session_cleanup_count = policy.cleanup_stale_sessions(&manager)?; 1149 if stale_session_cleanup_count > 0 { 1150 tracing::info!( 1151 stale_session_cleanup_count, 1152 "cleaned stale trusted auth sessions during myc bootstrap" 1153 ); 1154 } 1155 1156 Ok(Self { 1157 signer_identity_provider, 1158 user_identity_provider, 1159 signer_identity, 1160 user_identity, 1161 signer_store, 1162 operation_audit_store, 1163 live_metrics, 1164 connection_approval_requirement: policy.default_approval_requirement(), 1165 policy, 1166 }) 1167 } 1168 1169 fn build_signer_store( 1170 persistence: &MycPersistenceConfig, 1171 path: &Path, 1172 ) -> Result<Arc<dyn RadrootsNostrSignerStore>, MycError> { 1173 match persistence.signer_state_backend { 1174 MycSignerStateBackend::JsonFile => { 1175 Ok(Arc::new(RadrootsNostrFileSignerStore::new(path))) 1176 } 1177 MycSignerStateBackend::Sqlite => { 1178 Ok(Arc::new(RadrootsNostrSqliteSignerStore::open(path)?)) 1179 } 1180 } 1181 } 1182 1183 fn build_operation_audit_store( 1184 persistence: &MycPersistenceConfig, 1185 audit_dir: &Path, 1186 audit_config: MycAuditConfig, 1187 ) -> Result<Arc<dyn MycOperationAuditStore>, MycError> { 1188 match persistence.runtime_audit_backend { 1189 MycRuntimeAuditBackend::JsonlFile => Ok(Arc::new(MycJsonlOperationAuditStore::new( 1190 audit_dir, 1191 audit_config, 1192 ))), 1193 MycRuntimeAuditBackend::Sqlite => Ok(Arc::new(MycSqliteOperationAuditStore::open( 1194 audit_dir, 1195 audit_config, 1196 )?)), 1197 } 1198 } 1199 1200 fn load_signer_manager_from_store( 1201 store: Arc<dyn RadrootsNostrSignerStore>, 1202 ) -> Result<RadrootsNostrSignerManager, MycError> { 1203 Ok(RadrootsNostrSignerManager::new(store)?) 1204 } 1205 } 1206 1207 fn emit_operation_audit_trace(record: &MycOperationAuditRecord) { 1208 match record.outcome { 1209 crate::audit::MycOperationAuditOutcome::Succeeded 1210 | crate::audit::MycOperationAuditOutcome::Missing 1211 | crate::audit::MycOperationAuditOutcome::Matched 1212 | crate::audit::MycOperationAuditOutcome::Skipped => tracing::info!( 1213 operation = ?record.operation, 1214 outcome = ?record.outcome, 1215 relay_url = record.relay_url.as_deref().unwrap_or(""), 1216 connection_id = record.connection_id.as_deref().unwrap_or(""), 1217 request_id = record.request_id.as_deref().unwrap_or(""), 1218 attempt_id = record.attempt_id.as_deref().unwrap_or(""), 1219 delivery_policy = ?record.delivery_policy, 1220 required_acknowledged_relay_count = record.required_acknowledged_relay_count.unwrap_or_default(), 1221 publish_attempt_count = record.publish_attempt_count.unwrap_or_default(), 1222 relay_count = record.relay_count, 1223 acknowledged_relay_count = record.acknowledged_relay_count, 1224 relay_outcome_summary = %record.relay_outcome_summary, 1225 "recorded myc operation audit" 1226 ), 1227 crate::audit::MycOperationAuditOutcome::Rejected 1228 | crate::audit::MycOperationAuditOutcome::Restored 1229 | crate::audit::MycOperationAuditOutcome::Unavailable 1230 | crate::audit::MycOperationAuditOutcome::Drifted 1231 | crate::audit::MycOperationAuditOutcome::Conflicted => tracing::warn!( 1232 operation = ?record.operation, 1233 outcome = ?record.outcome, 1234 relay_url = record.relay_url.as_deref().unwrap_or(""), 1235 connection_id = record.connection_id.as_deref().unwrap_or(""), 1236 request_id = record.request_id.as_deref().unwrap_or(""), 1237 attempt_id = record.attempt_id.as_deref().unwrap_or(""), 1238 delivery_policy = ?record.delivery_policy, 1239 required_acknowledged_relay_count = record.required_acknowledged_relay_count.unwrap_or_default(), 1240 publish_attempt_count = record.publish_attempt_count.unwrap_or_default(), 1241 relay_count = record.relay_count, 1242 acknowledged_relay_count = record.acknowledged_relay_count, 1243 relay_outcome_summary = %record.relay_outcome_summary, 1244 "recorded myc operation audit" 1245 ), 1246 } 1247 } 1248 1249 #[cfg(test)] 1250 mod tests { 1251 use std::fs; 1252 #[cfg(unix)] 1253 use std::os::unix::fs::PermissionsExt; 1254 use std::path::PathBuf; 1255 use std::sync::Arc; 1256 1257 use nostr::PublicKey; 1258 use radroots_identity::RadrootsIdentity; 1259 use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKind}; 1260 use radroots_nostr_signer::prelude::{ 1261 RadrootsNostrFileSignerStore, RadrootsNostrSignerApprovalRequirement, 1262 RadrootsNostrSignerAuthState, RadrootsNostrSignerConnectionDraft, 1263 RadrootsNostrSignerManager, RadrootsNostrSqliteSignerStore, 1264 }; 1265 1266 use super::{MycRuntime, startup_identity_path}; 1267 use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord}; 1268 use crate::config::{ 1269 MycConfig, MycIdentityBackend, MycRuntimeAuditBackend, MycSignerStateBackend, 1270 }; 1271 use crate::discovery::MycDiscoveryContext; 1272 use crate::error::MycError; 1273 use crate::outbox::{MycDeliveryOutboxKind, MycDeliveryOutboxRecord, MycDeliveryOutboxStatus}; 1274 1275 fn write_test_identity(path: &std::path::Path, secret_key: &str) { 1276 let identity = 1277 RadrootsIdentity::from_secret_key_str(secret_key).expect("identity from secret"); 1278 crate::identity_files::store_encrypted_identity(path, &identity).expect("write identity"); 1279 } 1280 1281 fn write_external_command_helper(path: &std::path::Path, secret_key: &str) -> RadrootsIdentity { 1282 let identity = RadrootsIdentity::from_secret_key_str(secret_key).expect("identity"); 1283 let identity_json = 1284 serde_json::to_string(&identity.to_public()).expect("serialize public identity"); 1285 let script = format!( 1286 "#!/bin/sh\nrequest=\"$(cat)\"\ncase \"$request\" in\n *'\"operation\":\"describe\"'*) printf '%s' '{{\"identity\":{identity_json}}}' ;;\n *) printf '%s' '{{\"error\":\"unsupported operation\"}}' ;;\nesac\n" 1287 ); 1288 fs::write(path, script).expect("write helper"); 1289 #[cfg(unix)] 1290 { 1291 let mut permissions = fs::metadata(path).expect("metadata").permissions(); 1292 permissions.set_mode(0o755); 1293 fs::set_permissions(path, permissions).expect("set permissions"); 1294 } 1295 identity 1296 } 1297 1298 #[test] 1299 fn bootstrap_creates_runtime_directories() { 1300 let temp = tempfile::tempdir().expect("tempdir"); 1301 let mut config = MycConfig::default(); 1302 config.paths.state_dir = PathBuf::from(temp.path()).join("state"); 1303 config.paths.signer_identity_path = temp.path().join("identity.json"); 1304 config.paths.user_identity_path = temp.path().join("user.json"); 1305 write_test_identity( 1306 &config.paths.signer_identity_path, 1307 "1111111111111111111111111111111111111111111111111111111111111111", 1308 ); 1309 write_test_identity( 1310 &config.paths.user_identity_path, 1311 "2222222222222222222222222222222222222222222222222222222222222222", 1312 ); 1313 1314 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1315 assert!(runtime.paths().state_dir.is_dir()); 1316 assert!(runtime.paths().audit_dir.is_dir()); 1317 assert_eq!( 1318 runtime.paths().signer_identity_path, 1319 temp.path().join("identity.json") 1320 ); 1321 assert_eq!( 1322 runtime.paths().user_identity_path, 1323 temp.path().join("user.json") 1324 ); 1325 assert!( 1326 runtime 1327 .paths() 1328 .signer_state_path 1329 .ends_with("signer-state.json") 1330 ); 1331 assert!(runtime.paths().signer_state_path.is_file()); 1332 assert!( 1333 runtime 1334 .paths() 1335 .delivery_outbox_path 1336 .ends_with("delivery-outbox.sqlite") 1337 ); 1338 assert!(runtime.paths().delivery_outbox_path.is_file()); 1339 assert!( 1340 runtime 1341 .delivery_outbox_store() 1342 .list_all() 1343 .expect("list outbox jobs") 1344 .is_empty() 1345 ); 1346 assert_eq!( 1347 runtime 1348 .signer_manager() 1349 .expect("manager") 1350 .signer_identity() 1351 .expect("signer identity") 1352 .expect("configured signer") 1353 .id 1354 .to_string(), 1355 runtime.snapshot().signer_identity_id 1356 ); 1357 assert_eq!( 1358 runtime.user_identity().public_key_hex(), 1359 runtime.snapshot().user_public_key_hex 1360 ); 1361 assert!(!runtime.snapshot().transport.enabled); 1362 } 1363 1364 #[test] 1365 fn bootstrap_rejects_invalid_config() { 1366 let mut config = MycConfig::default(); 1367 config.service.instance_name.clear(); 1368 1369 let err = match MycRuntime::bootstrap(config) { 1370 Ok(_) => panic!("expected invalid config error"), 1371 Err(err) => err, 1372 }; 1373 assert!(err.to_string().contains("service.instance_name")); 1374 } 1375 1376 #[test] 1377 fn bootstrap_rejects_mismatched_persisted_signer_identity() { 1378 let temp = tempfile::tempdir().expect("tempdir"); 1379 let identity_path = temp.path().join("identity.json"); 1380 let user_path = temp.path().join("user.json"); 1381 write_test_identity( 1382 &identity_path, 1383 "1111111111111111111111111111111111111111111111111111111111111111", 1384 ); 1385 write_test_identity( 1386 &user_path, 1387 "3333333333333333333333333333333333333333333333333333333333333333", 1388 ); 1389 1390 let store_identity = RadrootsIdentity::from_secret_key_str( 1391 "2222222222222222222222222222222222222222222222222222222222222222", 1392 ) 1393 .expect("second identity"); 1394 let store = Arc::new(RadrootsNostrFileSignerStore::new( 1395 temp.path().join("state").join("signer-state.json"), 1396 )); 1397 let manager = RadrootsNostrSignerManager::new(store).expect("manager"); 1398 manager 1399 .set_signer_identity(store_identity.to_public()) 1400 .expect("persist signer"); 1401 1402 let mut config = MycConfig::default(); 1403 config.paths.state_dir = temp.path().join("state"); 1404 config.paths.signer_identity_path = identity_path; 1405 config.paths.user_identity_path = user_path; 1406 1407 let err = match MycRuntime::bootstrap(config) { 1408 Ok(_) => panic!("expected identity mismatch"), 1409 Err(err) => err, 1410 }; 1411 assert!(matches!(err, MycError::SignerIdentityMismatch { .. })); 1412 } 1413 1414 #[test] 1415 fn bootstrap_keeps_signer_and_user_identities_distinct() { 1416 let temp = tempfile::tempdir().expect("tempdir"); 1417 let mut config = MycConfig::default(); 1418 config.paths.state_dir = temp.path().join("state"); 1419 config.paths.signer_identity_path = temp.path().join("signer.json"); 1420 config.paths.user_identity_path = temp.path().join("user.json"); 1421 write_test_identity( 1422 &config.paths.signer_identity_path, 1423 "1111111111111111111111111111111111111111111111111111111111111111", 1424 ); 1425 write_test_identity( 1426 &config.paths.user_identity_path, 1427 "2222222222222222222222222222222222222222222222222222222222222222", 1428 ); 1429 1430 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1431 1432 assert_ne!( 1433 runtime.signer_public_identity().public_key_hex, 1434 runtime.user_public_identity().public_key_hex 1435 ); 1436 assert_ne!( 1437 runtime.snapshot().signer_identity_id, 1438 runtime.snapshot().user_identity_id 1439 ); 1440 } 1441 1442 #[test] 1443 fn bootstrap_cleans_stale_trusted_authorized_sessions() { 1444 let temp = tempfile::tempdir().expect("tempdir"); 1445 let mut config = MycConfig::default(); 1446 config.paths.state_dir = temp.path().join("state"); 1447 config.paths.signer_identity_path = temp.path().join("signer.json"); 1448 config.paths.user_identity_path = temp.path().join("user.json"); 1449 config.policy.auth_url = Some("https://auth.example/challenge".to_owned()); 1450 config.policy.auth_authorized_ttl_secs = Some(1); 1451 let client_public_key = 1452 PublicKey::parse("4545454545454545454545454545454545454545454545454545454545454545") 1453 .expect("client public key"); 1454 config.policy.trusted_client_pubkeys = vec![client_public_key.to_hex()]; 1455 write_test_identity( 1456 &config.paths.signer_identity_path, 1457 "1111111111111111111111111111111111111111111111111111111111111111", 1458 ); 1459 write_test_identity( 1460 &config.paths.user_identity_path, 1461 "2222222222222222222222222222222222222222222222222222222222222222", 1462 ); 1463 1464 let runtime = MycRuntime::bootstrap(config.clone()).expect("runtime"); 1465 let manager = runtime.signer_manager().expect("manager"); 1466 let connection = manager 1467 .register_connection( 1468 RadrootsNostrSignerConnectionDraft::new( 1469 client_public_key, 1470 runtime.user_public_identity(), 1471 ) 1472 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired), 1473 ) 1474 .expect("register connection"); 1475 manager 1476 .require_auth_challenge( 1477 &connection.connection_id, 1478 config.policy.auth_url.as_deref().expect("auth url"), 1479 ) 1480 .expect("require auth"); 1481 manager 1482 .authorize_auth_challenge(&connection.connection_id) 1483 .expect("authorize auth"); 1484 1485 std::thread::sleep(std::time::Duration::from_secs(2)); 1486 drop(runtime); 1487 1488 let runtime = MycRuntime::bootstrap(config).expect("runtime restart"); 1489 let reloaded = runtime 1490 .signer_manager() 1491 .expect("manager") 1492 .get_connection(&connection.connection_id) 1493 .expect("load connection") 1494 .expect("connection"); 1495 1496 assert_eq!(reloaded.auth_state, RadrootsNostrSignerAuthState::Pending); 1497 assert!(reloaded.auth_challenge.is_some()); 1498 } 1499 1500 #[test] 1501 fn bootstrap_prepares_transport_when_enabled() { 1502 let temp = tempfile::tempdir().expect("tempdir"); 1503 let mut config = MycConfig::default(); 1504 config.paths.state_dir = temp.path().join("state"); 1505 config.paths.signer_identity_path = temp.path().join("signer.json"); 1506 config.paths.user_identity_path = temp.path().join("user.json"); 1507 config.transport.enabled = true; 1508 config.transport.connect_timeout_secs = 15; 1509 config.transport.relays = vec!["wss://relay.example.com".to_owned()]; 1510 write_test_identity( 1511 &config.paths.signer_identity_path, 1512 "1111111111111111111111111111111111111111111111111111111111111111", 1513 ); 1514 write_test_identity( 1515 &config.paths.user_identity_path, 1516 "2222222222222222222222222222222222222222222222222222222222222222", 1517 ); 1518 1519 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1520 1521 assert!(runtime.transport().is_some()); 1522 assert!(runtime.snapshot().transport.enabled); 1523 assert_eq!(runtime.snapshot().transport.relay_count, 1); 1524 assert_eq!(runtime.snapshot().transport.connect_timeout_secs, 15); 1525 } 1526 1527 #[tokio::test] 1528 async fn bootstrap_prepares_signerless_transport_for_external_command_backend() { 1529 let temp = tempfile::tempdir().expect("tempdir"); 1530 let helper_path = temp.path().join("signer-helper.sh"); 1531 let helper_identity = write_external_command_helper( 1532 &helper_path, 1533 "1111111111111111111111111111111111111111111111111111111111111111", 1534 ); 1535 let mut config = MycConfig::default(); 1536 config.paths.state_dir = temp.path().join("state"); 1537 config.paths.signer_identity_backend = MycIdentityBackend::ExternalCommand; 1538 config.paths.signer_identity_path = helper_path; 1539 config.paths.user_identity_path = temp.path().join("user.json"); 1540 config.transport.enabled = true; 1541 config.transport.connect_timeout_secs = 15; 1542 config.transport.relays = vec!["wss://relay.example.com".to_owned()]; 1543 write_test_identity( 1544 &config.paths.user_identity_path, 1545 "2222222222222222222222222222222222222222222222222222222222222222", 1546 ); 1547 1548 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1549 1550 assert!(runtime.transport().is_some()); 1551 assert!( 1552 !runtime 1553 .transport() 1554 .expect("transport") 1555 .client() 1556 .has_signer() 1557 .await 1558 ); 1559 assert_eq!( 1560 runtime.signer_identity().public_key_hex(), 1561 helper_identity.public_key_hex() 1562 ); 1563 } 1564 1565 #[tokio::test] 1566 async fn discovery_context_uses_signerless_client_for_external_command_app_identity() { 1567 let temp = tempfile::tempdir().expect("tempdir"); 1568 let helper_path = temp.path().join("discovery-helper.sh"); 1569 let helper_identity = write_external_command_helper( 1570 &helper_path, 1571 "6666666666666666666666666666666666666666666666666666666666666666", 1572 ); 1573 let mut config = MycConfig::default(); 1574 config.paths.state_dir = temp.path().join("state"); 1575 config.paths.signer_identity_path = temp.path().join("signer.json"); 1576 config.paths.user_identity_path = temp.path().join("user.json"); 1577 config.discovery.enabled = true; 1578 config.discovery.domain = Some("signer.example.com".to_owned()); 1579 config.discovery.public_relays = vec!["wss://relay.example.com".to_owned()]; 1580 config.discovery.publish_relays = vec!["wss://relay.example.com".to_owned()]; 1581 config.discovery.nostrconnect_url_template = 1582 Some("https://signer.example.com/connect?uri=<nostrconnect>".to_owned()); 1583 config.discovery.app_identity_backend = Some(MycIdentityBackend::ExternalCommand); 1584 config.discovery.app_identity_path = Some(helper_path); 1585 write_test_identity( 1586 &config.paths.signer_identity_path, 1587 "1111111111111111111111111111111111111111111111111111111111111111", 1588 ); 1589 write_test_identity( 1590 &config.paths.user_identity_path, 1591 "2222222222222222222222222222222222222222222222222222222222222222", 1592 ); 1593 1594 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1595 let context = MycDiscoveryContext::from_runtime(&runtime).expect("discovery context"); 1596 1597 assert!(!context.app_identity().nostr_client().has_signer().await); 1598 assert_eq!( 1599 context.app_identity().public_key_hex(), 1600 helper_identity.public_key_hex() 1601 ); 1602 } 1603 1604 #[test] 1605 fn bootstrap_supports_sqlite_signer_state_backend() { 1606 let temp = tempfile::tempdir().expect("tempdir"); 1607 let mut config = MycConfig::default(); 1608 config.paths.state_dir = temp.path().join("state"); 1609 config.paths.signer_identity_path = temp.path().join("signer.json"); 1610 config.paths.user_identity_path = temp.path().join("user.json"); 1611 config.persistence.signer_state_backend = MycSignerStateBackend::Sqlite; 1612 write_test_identity( 1613 &config.paths.signer_identity_path, 1614 "1111111111111111111111111111111111111111111111111111111111111111", 1615 ); 1616 write_test_identity( 1617 &config.paths.user_identity_path, 1618 "2222222222222222222222222222222222222222222222222222222222222222", 1619 ); 1620 1621 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1622 1623 assert!( 1624 runtime 1625 .paths() 1626 .signer_state_path 1627 .ends_with("signer-state.sqlite") 1628 ); 1629 assert!(runtime.paths().signer_state_path.is_file()); 1630 assert!(runtime.paths().delivery_outbox_path.is_file()); 1631 } 1632 1633 #[test] 1634 fn bootstrap_rejects_mismatched_persisted_sqlite_signer_identity() { 1635 let temp = tempfile::tempdir().expect("tempdir"); 1636 let identity_path = temp.path().join("identity.json"); 1637 let user_path = temp.path().join("user.json"); 1638 write_test_identity( 1639 &identity_path, 1640 "1111111111111111111111111111111111111111111111111111111111111111", 1641 ); 1642 write_test_identity( 1643 &user_path, 1644 "3333333333333333333333333333333333333333333333333333333333333333", 1645 ); 1646 1647 let store_identity = RadrootsIdentity::from_secret_key_str( 1648 "2222222222222222222222222222222222222222222222222222222222222222", 1649 ) 1650 .expect("second identity"); 1651 let store = Arc::new( 1652 RadrootsNostrSqliteSignerStore::open( 1653 temp.path().join("state").join("signer-state.sqlite"), 1654 ) 1655 .expect("open sqlite store"), 1656 ); 1657 let manager = RadrootsNostrSignerManager::new(store).expect("manager"); 1658 manager 1659 .set_signer_identity(store_identity.to_public()) 1660 .expect("persist signer"); 1661 1662 let mut config = MycConfig::default(); 1663 config.paths.state_dir = temp.path().join("state"); 1664 config.paths.signer_identity_path = identity_path; 1665 config.paths.user_identity_path = user_path; 1666 config.persistence.signer_state_backend = MycSignerStateBackend::Sqlite; 1667 1668 let err = match MycRuntime::bootstrap(config) { 1669 Ok(_) => panic!("expected identity mismatch"), 1670 Err(err) => err, 1671 }; 1672 assert!(matches!(err, MycError::SignerIdentityMismatch { .. })); 1673 } 1674 1675 #[test] 1676 fn bootstrap_supports_sqlite_operation_audit_backend() { 1677 let temp = tempfile::tempdir().expect("tempdir"); 1678 let mut config = MycConfig::default(); 1679 config.paths.state_dir = temp.path().join("state"); 1680 config.paths.signer_identity_path = temp.path().join("signer.json"); 1681 config.paths.user_identity_path = temp.path().join("user.json"); 1682 config.persistence.runtime_audit_backend = MycRuntimeAuditBackend::Sqlite; 1683 write_test_identity( 1684 &config.paths.signer_identity_path, 1685 "1111111111111111111111111111111111111111111111111111111111111111", 1686 ); 1687 write_test_identity( 1688 &config.paths.user_identity_path, 1689 "2222222222222222222222222222222222222222222222222222222222222222", 1690 ); 1691 1692 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1693 runtime.record_operation_audit(&MycOperationAuditRecord::new( 1694 MycOperationAuditKind::ListenerResponsePublish, 1695 MycOperationAuditOutcome::Succeeded, 1696 None, 1697 Some("request-1"), 1698 1, 1699 1, 1700 "relay acknowledged publish", 1701 )); 1702 1703 let records = runtime 1704 .operation_audit_store() 1705 .list() 1706 .expect("list runtime audit"); 1707 assert_eq!(records.len(), 1); 1708 assert!( 1709 runtime 1710 .paths() 1711 .audit_dir 1712 .join("operations.sqlite") 1713 .is_file() 1714 ); 1715 assert!(runtime.paths().delivery_outbox_path.is_file()); 1716 } 1717 1718 #[test] 1719 fn startup_identity_path_reporting_matches_backend_sources() { 1720 let mut config = MycConfig::default(); 1721 config.paths.signer_identity_backend = MycIdentityBackend::HostVault; 1722 config.paths.signer_identity_keyring_account_id = 1723 Some("1111111111111111111111111111111111111111111111111111111111111111".to_owned()); 1724 config.paths.signer_identity_profile_path = Some(PathBuf::from("/tmp/signer-profile.json")); 1725 config.paths.user_identity_backend = MycIdentityBackend::ManagedAccount; 1726 config.paths.user_identity_path = PathBuf::from("/tmp/user-accounts.json"); 1727 1728 assert_eq!( 1729 startup_identity_path(&config.paths.signer_identity_source()), 1730 None 1731 ); 1732 assert_eq!( 1733 startup_identity_path(&config.paths.user_identity_source()), 1734 Some(PathBuf::from("/tmp/user-accounts.json")) 1735 ); 1736 1737 config.paths.user_identity_backend = MycIdentityBackend::ExternalCommand; 1738 config.paths.user_identity_path = PathBuf::from("/usr/local/libexec/myc-user-helper"); 1739 assert_eq!( 1740 startup_identity_path(&config.paths.user_identity_source()), 1741 None 1742 ); 1743 } 1744 1745 #[tokio::test] 1746 async fn startup_recovery_rejects_orphaned_signer_publish_workflow() { 1747 let temp = tempfile::tempdir().expect("tempdir"); 1748 let mut config = MycConfig::default(); 1749 config.paths.state_dir = temp.path().join("state"); 1750 config.paths.signer_identity_path = temp.path().join("signer.json"); 1751 config.paths.user_identity_path = temp.path().join("user.json"); 1752 write_test_identity( 1753 &config.paths.signer_identity_path, 1754 "1111111111111111111111111111111111111111111111111111111111111111", 1755 ); 1756 write_test_identity( 1757 &config.paths.user_identity_path, 1758 "2222222222222222222222222222222222222222222222222222222222222222", 1759 ); 1760 1761 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1762 let client_identity = RadrootsIdentity::from_secret_key_str( 1763 "7777777777777777777777777777777777777777777777777777777777777777", 1764 ) 1765 .expect("client identity"); 1766 let manager = runtime.signer_manager().expect("manager"); 1767 let connection = manager 1768 .register_connection( 1769 RadrootsNostrSignerConnectionDraft::new( 1770 client_identity.public_key(), 1771 runtime.user_public_identity(), 1772 ) 1773 .with_connect_secret("orphan-secret") 1774 .with_relays(vec!["wss://relay.example.com".parse().expect("relay url")]) 1775 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired), 1776 ) 1777 .expect("register connection"); 1778 let workflow = manager 1779 .begin_connect_secret_publish_finalization(&connection.connection_id) 1780 .expect("begin workflow"); 1781 1782 let error = runtime 1783 .recover_pending_delivery_jobs() 1784 .await 1785 .expect_err("orphaned workflow should fail recovery"); 1786 let message = error.to_string(); 1787 assert!(message.contains("orphaned signer publish workflows")); 1788 assert!(message.contains(workflow.workflow_id.as_str())); 1789 } 1790 1791 #[tokio::test] 1792 async fn startup_recovery_finalizes_published_connect_secret_workflow() { 1793 let temp = tempfile::tempdir().expect("tempdir"); 1794 let mut config = MycConfig::default(); 1795 config.paths.state_dir = temp.path().join("state"); 1796 config.paths.signer_identity_path = temp.path().join("signer.json"); 1797 config.paths.user_identity_path = temp.path().join("user.json"); 1798 write_test_identity( 1799 &config.paths.signer_identity_path, 1800 "1111111111111111111111111111111111111111111111111111111111111111", 1801 ); 1802 write_test_identity( 1803 &config.paths.user_identity_path, 1804 "2222222222222222222222222222222222222222222222222222222222222222", 1805 ); 1806 1807 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1808 let client_identity = RadrootsIdentity::from_secret_key_str( 1809 "7777777777777777777777777777777777777777777777777777777777777777", 1810 ) 1811 .expect("client identity"); 1812 let manager = runtime.signer_manager().expect("manager"); 1813 let connection = manager 1814 .register_connection( 1815 RadrootsNostrSignerConnectionDraft::new( 1816 client_identity.public_key(), 1817 runtime.user_public_identity(), 1818 ) 1819 .with_connect_secret("recovery-secret") 1820 .with_relays(vec!["wss://relay.example.com".parse().expect("relay url")]) 1821 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired), 1822 ) 1823 .expect("register connection"); 1824 let workflow = manager 1825 .begin_connect_secret_publish_finalization(&connection.connection_id) 1826 .expect("begin workflow"); 1827 manager 1828 .mark_publish_workflow_published(&workflow.workflow_id) 1829 .expect("mark workflow published"); 1830 1831 let event = runtime 1832 .signer_identity() 1833 .sign_event_builder( 1834 RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), "recovery"), 1835 "recovery test", 1836 ) 1837 .expect("sign event"); 1838 let outbox_record = MycDeliveryOutboxRecord::new( 1839 MycDeliveryOutboxKind::ListenerResponsePublish, 1840 event, 1841 vec!["wss://relay.example.com".parse().expect("relay url")], 1842 ) 1843 .expect("outbox record") 1844 .with_connection_id(&connection.connection_id) 1845 .with_request_id("recovery-request") 1846 .with_signer_publish_workflow_id(&workflow.workflow_id); 1847 runtime 1848 .delivery_outbox_store() 1849 .enqueue(&outbox_record) 1850 .expect("enqueue outbox"); 1851 runtime 1852 .delivery_outbox_store() 1853 .mark_published_pending_finalize(&outbox_record.job_id, 1) 1854 .expect("mark outbox published"); 1855 1856 runtime 1857 .recover_pending_delivery_jobs() 1858 .await 1859 .expect("recovery should succeed"); 1860 1861 let connection = runtime 1862 .signer_manager() 1863 .expect("manager") 1864 .get_connection(&connection.connection_id) 1865 .expect("get connection") 1866 .expect("stored connection"); 1867 assert!(connection.connect_secret_is_consumed()); 1868 assert!( 1869 runtime 1870 .signer_manager() 1871 .expect("manager") 1872 .list_publish_workflows() 1873 .expect("list workflows") 1874 .is_empty() 1875 ); 1876 let outbox_records = runtime 1877 .delivery_outbox_store() 1878 .list_all() 1879 .expect("list outbox"); 1880 assert_eq!(outbox_records.len(), 1); 1881 assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized); 1882 assert!(outbox_records[0].finalized_at_unix.is_some()); 1883 let audit_records = runtime.operation_audit_store().list().expect("list audit"); 1884 assert_eq!(audit_records.len(), 2); 1885 assert_eq!( 1886 audit_records[0].operation, 1887 MycOperationAuditKind::ListenerResponsePublish 1888 ); 1889 assert_eq!( 1890 audit_records[0].outcome, 1891 MycOperationAuditOutcome::Succeeded 1892 ); 1893 assert_eq!( 1894 audit_records[0].request_id.as_deref(), 1895 Some("recovery-request") 1896 ); 1897 assert_eq!( 1898 audit_records[1].operation, 1899 MycOperationAuditKind::DeliveryRecovery 1900 ); 1901 assert_eq!( 1902 audit_records[1].outcome, 1903 MycOperationAuditOutcome::Succeeded 1904 ); 1905 } 1906 1907 #[tokio::test] 1908 async fn startup_recovery_rejects_queued_job_with_missing_signer_workflow() { 1909 let temp = tempfile::tempdir().expect("tempdir"); 1910 let mut config = MycConfig::default(); 1911 config.paths.state_dir = temp.path().join("state"); 1912 config.paths.signer_identity_path = temp.path().join("signer.json"); 1913 config.paths.user_identity_path = temp.path().join("user.json"); 1914 write_test_identity( 1915 &config.paths.signer_identity_path, 1916 "1111111111111111111111111111111111111111111111111111111111111111", 1917 ); 1918 write_test_identity( 1919 &config.paths.user_identity_path, 1920 "2222222222222222222222222222222222222222222222222222222222222222", 1921 ); 1922 1923 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1924 let client_identity = RadrootsIdentity::from_secret_key_str( 1925 "7777777777777777777777777777777777777777777777777777777777777777", 1926 ) 1927 .expect("client identity"); 1928 let manager = runtime.signer_manager().expect("manager"); 1929 let connection = manager 1930 .register_connection( 1931 RadrootsNostrSignerConnectionDraft::new( 1932 client_identity.public_key(), 1933 runtime.user_public_identity(), 1934 ) 1935 .with_connect_secret("missing-workflow-secret") 1936 .with_relays(vec!["wss://relay.example.com".parse().expect("relay url")]) 1937 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired), 1938 ) 1939 .expect("register connection"); 1940 let workflow = manager 1941 .begin_connect_secret_publish_finalization(&connection.connection_id) 1942 .expect("begin workflow"); 1943 let event = runtime 1944 .signer_identity() 1945 .sign_event_builder( 1946 RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), "queued-recovery"), 1947 "queued recovery test", 1948 ) 1949 .expect("sign event"); 1950 let outbox_record = MycDeliveryOutboxRecord::new( 1951 MycDeliveryOutboxKind::ListenerResponsePublish, 1952 event, 1953 vec!["wss://relay.example.com".parse().expect("relay url")], 1954 ) 1955 .expect("outbox record") 1956 .with_connection_id(&connection.connection_id) 1957 .with_request_id("queued-missing-workflow") 1958 .with_signer_publish_workflow_id(&workflow.workflow_id); 1959 runtime 1960 .delivery_outbox_store() 1961 .enqueue(&outbox_record) 1962 .expect("enqueue outbox"); 1963 manager 1964 .cancel_publish_workflow(&workflow.workflow_id) 1965 .expect("cancel workflow"); 1966 1967 let error = runtime 1968 .recover_pending_delivery_jobs() 1969 .await 1970 .expect_err("queued job with missing workflow should fail recovery"); 1971 assert!( 1972 error 1973 .to_string() 1974 .contains("missing signer publish workflow before startup recovery publish") 1975 ); 1976 } 1977 }