app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit df0b3a84a4b7c33b43879d6c9f70ec8c3751d7d2
parent 4cb986daef8d642f315a236a0a7a511990d0121f
Author: triesap <tyson@radroots.org>
Date:   Thu, 18 Jun 2026 14:41:42 -0700

app: harden sdk runtime shutdown

- decouple SDK shutdown from normal bounded command queue pressure
- drop the command sender and join the worker through a dedicated shutdown flag
- preserve CommandQueueFull behavior for ordinary SDK commands
- cover full-queue shutdown and lifecycle status behavior in runtime tests

Diffstat:
Mcrates/runtime/src/sdk.rs | 138+++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------
1 file changed, 96 insertions(+), 42 deletions(-)

diff --git a/crates/runtime/src/sdk.rs b/crates/runtime/src/sdk.rs @@ -3,6 +3,7 @@ use std::{ path::{Path, PathBuf}, sync::{ Arc, Condvar, Mutex, MutexGuard, + atomic::{AtomicBool, Ordering}, mpsc::{self, Receiver, SyncSender, TrySendError}, }, thread::{self, JoinHandle}, @@ -240,7 +241,7 @@ pub enum AppSdkRuntimeError { #[derive(Debug)] pub struct AppSdkRuntime { - command_sender: SyncSender<AppSdkWorkerCommand>, + command_sender: Mutex<Option<SyncSender<AppSdkWorkerCommand>>>, shared: Arc<AppSdkRuntimeShared>, worker: Mutex<Option<JoinHandle<()>>>, } @@ -249,10 +250,10 @@ pub struct AppSdkRuntime { struct AppSdkRuntimeShared { status: Mutex<AppSdkRuntimeStatus>, status_changed: Condvar, + shutdown_requested: AtomicBool, } enum AppSdkWorkerCommand { - Shutdown(mpsc::Sender<()>), StorageStatus(mpsc::Sender<Result<AppSdkStorageDiagnostics, AppSdkRuntimeIssue>>), IntegrityStatus(mpsc::Sender<Result<AppSdkIntegrityDiagnostics, AppSdkRuntimeIssue>>), SyncStatus(mpsc::Sender<Result<AppSdkSyncDiagnostics, AppSdkRuntimeIssue>>), @@ -272,7 +273,6 @@ enum AppSdkWorkerCommand { impl fmt::Debug for AppSdkWorkerCommand { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Shutdown(_) => formatter.write_str("Shutdown"), Self::StorageStatus(_) => formatter.write_str("StorageStatus"), Self::IntegrityStatus(_) => formatter.write_str("IntegrityStatus"), Self::SyncStatus(_) => formatter.write_str("SyncStatus"), @@ -355,6 +355,7 @@ impl AppSdkRuntime { let shared = Arc::new(AppSdkRuntimeShared { status: Mutex::new(initial_status), status_changed: Condvar::new(), + shutdown_requested: AtomicBool::new(false), }); let (command_sender, command_receiver) = mpsc::sync_channel(config.command_queue_capacity); let worker_shared = Arc::clone(&shared); @@ -363,7 +364,7 @@ impl AppSdkRuntime { .spawn(move || run_app_sdk_worker(config, worker_shared, command_receiver))?; Ok(Self { - command_sender, + command_sender: Mutex::new(Some(command_sender)), shared, worker: Mutex::new(Some(worker)), }) @@ -441,21 +442,14 @@ impl AppSdkRuntime { return self.join_worker(); } - let (ack_sender, ack_receiver) = mpsc::channel(); - match self + self.shared.shutdown_requested.store(true, Ordering::SeqCst); + transition_status_state(&self.shared, AppSdkLifecycleState::ShuttingDown); + let command_sender = self .command_sender - .try_send(AppSdkWorkerCommand::Shutdown(ack_sender)) - { - Ok(()) => {} - Err(TrySendError::Full(_)) => return Err(AppSdkRuntimeError::CommandQueueFull), - Err(TrySendError::Disconnected(_)) => { - transition_status_state(&self.shared, AppSdkLifecycleState::Stopped); - return self.join_worker(); - } - } - ack_receiver - .recv() - .map_err(|_| AppSdkRuntimeError::ShutdownAck)?; + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .take(); + drop(command_sender); self.join_worker() } @@ -475,7 +469,20 @@ impl AppSdkRuntime { command: impl FnOnce(mpsc::Sender<Result<T, AppSdkRuntimeIssue>>) -> AppSdkWorkerCommand, ) -> Result<T, AppSdkRuntimeError> { let (response_sender, response_receiver) = mpsc::channel(); - match self.command_sender.try_send(command(response_sender)) { + let command_sender = { + let command_sender = self + .command_sender + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + if self.shared.shutdown_requested.load(Ordering::SeqCst) { + return Err(AppSdkRuntimeError::CommandQueueClosed); + } + command_sender + .as_ref() + .cloned() + .ok_or(AppSdkRuntimeError::CommandQueueClosed)? + }; + match command_sender.try_send(command(response_sender)) { Ok(()) => {} Err(TrySendError::Full(_)) => return Err(AppSdkRuntimeError::CommandQueueFull), Err(TrySendError::Disconnected(_)) => { @@ -786,14 +793,11 @@ fn run_app_sdk_worker( }; while let Ok(command) = command_receiver.recv() { + if shared.shutdown_requested.load(Ordering::SeqCst) { + break; + } + match command { - AppSdkWorkerCommand::Shutdown(ack_sender) => { - transition_status_state(&shared, AppSdkLifecycleState::ShuttingDown); - drop(sdk.take()); - transition_status_state(&shared, AppSdkLifecycleState::Stopped); - let _ = ack_sender.send(()); - return; - } AppSdkWorkerCommand::StorageStatus(response_sender) => { let result = if let Some(issue) = lifecycle_busy_issue(&shared) { Err(issue) @@ -887,22 +891,11 @@ fn run_degraded_worker( command_receiver: Receiver<AppSdkWorkerCommand>, ) { while let Ok(command) = command_receiver.recv() { + if shared.shutdown_requested.load(Ordering::SeqCst) { + break; + } + match command { - AppSdkWorkerCommand::Shutdown(ack_sender) => { - transition_status_state(&shared, AppSdkLifecycleState::ShuttingDown); - let last_issue = lock_status(&shared).last_issue.clone(); - replace_status( - &shared, - AppSdkRuntimeStatus::from_config( - &config, - AppSdkLifecycleState::Stopped, - None, - last_issue, - ), - ); - let _ = ack_sender.send(()); - return; - } AppSdkWorkerCommand::StorageStatus(response_sender) => { send_worker_result( &shared, @@ -1149,6 +1142,12 @@ fn serialized_label(value: &(impl Serialize + fmt::Debug)) -> String { mod tests { use std::{ fs, + sync::{ + Arc, Condvar, Mutex, + atomic::{AtomicBool, Ordering}, + mpsc, + }, + thread, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -1162,7 +1161,8 @@ mod tests { use super::{ APP_SDK_STORAGE_DIR_NAME, AppSdkConfig, AppSdkLifecycleState, AppSdkProjectionLifecycleState, AppSdkRelayUrlPolicy, AppSdkRestorePreflightRequest, - AppSdkRuntime, AppSdkRuntimeError, app_sdk_storage_root_from_data_root, + AppSdkRuntime, AppSdkRuntimeError, AppSdkRuntimeShared, AppSdkRuntimeStatus, + AppSdkWorkerCommand, app_sdk_storage_root_from_data_root, transition_status_state, }; #[test] @@ -1296,6 +1296,60 @@ mod tests { } #[test] + fn sdk_shutdown_joins_when_normal_command_queue_is_full() { + let config = AppSdkConfig::from_app_data_root( + "/tmp/radroots-app-sdk-full-queue".as_ref(), + vec!["ws://127.0.0.1:8080".to_owned()], + ) + .with_command_queue_capacity(1); + let shared = Arc::new(AppSdkRuntimeShared { + status: Mutex::new(AppSdkRuntimeStatus::from_config( + &config, + AppSdkLifecycleState::Ready, + None, + None, + )), + status_changed: Condvar::new(), + shutdown_requested: AtomicBool::new(false), + }); + let (command_sender, command_receiver) = mpsc::sync_channel(config.command_queue_capacity); + let worker_shared = Arc::clone(&shared); + let worker = thread::spawn(move || { + while !worker_shared.shutdown_requested.load(Ordering::SeqCst) { + thread::sleep(Duration::from_millis(1)); + } + drop(command_receiver); + transition_status_state(&worker_shared, AppSdkLifecycleState::Stopped); + }); + let runtime = AppSdkRuntime { + command_sender: Mutex::new(Some(command_sender)), + shared, + worker: Mutex::new(Some(worker)), + }; + let (response_sender, _response_receiver) = mpsc::channel(); + runtime + .command_sender + .lock() + .expect("command sender lock") + .as_ref() + .expect("command sender") + .try_send(AppSdkWorkerCommand::Diagnostics(response_sender)) + .expect("normal command queue should fill"); + + assert!(matches!( + runtime.sync_status(), + Err(AppSdkRuntimeError::CommandQueueFull) + )); + assert_eq!(runtime.status().state, AppSdkLifecycleState::Ready); + + runtime + .shutdown() + .expect("shutdown should not depend on normal command queue capacity"); + + assert_eq!(runtime.status().state, AppSdkLifecycleState::Stopped); + } + + #[test] fn sdk_restore_preflight_marks_projections_stale_without_writing_destination() { let backup_source_root = temp_storage_root("restore_backup_source"); let backup_archive = backup_source_root