lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit 41188033d11137b831ea25c5ddb7e1ffb2dfefd2
parent fd52089a6e6e06e56da4a7af9fcf304b5cf28144
Author: triesap <tyson@radroots.org>
Date:   Sat, 28 Mar 2026 04:41:24 +0000

simplex: add durable agent runtime execution

Diffstat:
MCargo.lock | 5+++++
Mcrates/simplex-agent-runtime/Cargo.toml | 3+++
Mcrates/simplex-agent-runtime/src/runtime.rs | 837++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
Mcrates/simplex-agent-store/Cargo.toml | 14+++++++++++++-
Mcrates/simplex-agent-store/src/error.rs | 38++++++++++++++++++++++++++++++++++++++
Mcrates/simplex-agent-store/src/lib.rs | 8+++++---
Mcrates/simplex-agent-store/src/store.rs | 1038+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Acrates/simplex-smp-transport/src/executor.rs | 29+++++++++++++++++++++++++++++
Mcrates/simplex-smp-transport/src/lib.rs | 5+++++
9 files changed, 1868 insertions(+), 109 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -2433,6 +2433,7 @@ dependencies = [ "radroots-simplex-smp-proto", "radroots-simplex-smp-transport", "sha2", + "tempfile", ] [[package]] @@ -2441,6 +2442,10 @@ version = "0.1.0-alpha.1" dependencies = [ "radroots-simplex-agent-proto", "radroots-simplex-smp-proto", + "serde", + "serde_json", + "sha2", + "tempfile", ] [[package]] diff --git a/crates/simplex-agent-runtime/Cargo.toml b/crates/simplex-agent-runtime/Cargo.toml @@ -31,3 +31,6 @@ radroots-simplex-smp-crypto = { workspace = true, default-features = false } radroots-simplex-smp-proto = { workspace = true, default-features = false } radroots-simplex-smp-transport = { workspace = true, default-features = false } sha2 = { workspace = true, default-features = false } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/crates/simplex-agent-runtime/src/runtime.rs b/crates/simplex-agent-runtime/src/runtime.rs @@ -1,6 +1,7 @@ use crate::error::RadrootsSimplexAgentRuntimeError; use crate::types::{RadrootsSimplexAgentCommandOutcome, RadrootsSimplexAgentRuntimeEvent}; use alloc::collections::VecDeque; +use alloc::format; use alloc::string::String; use alloc::vec::Vec; use radroots_simplex_agent_proto::prelude::{ @@ -9,19 +10,38 @@ use radroots_simplex_agent_proto::prelude::{ RadrootsSimplexAgentEncryptedPayload, RadrootsSimplexAgentEnvelope, RadrootsSimplexAgentMessage, RadrootsSimplexAgentMessageFrame, RadrootsSimplexAgentMessageHeader, RadrootsSimplexAgentMessageReceipt, - RadrootsSimplexAgentQueueDescriptor, RadrootsSimplexSmpRatchetState, encode_decrypted_message, + RadrootsSimplexAgentQueueDescriptor, encode_decrypted_message, encode_envelope, }; use radroots_simplex_agent_store::prelude::{ - RadrootsSimplexAgentPendingCommand, RadrootsSimplexAgentPendingCommandKind, - RadrootsSimplexAgentQueueRole, RadrootsSimplexAgentStore, + RadrootsSimplexAgentOutboundMessage, RadrootsSimplexAgentPendingCommand, + RadrootsSimplexAgentPendingCommandKind, RadrootsSimplexAgentQueueRole, + RadrootsSimplexAgentStore, +}; +use radroots_simplex_smp_crypto::prelude::{ + RadrootsSimplexSmpQueueAuthorizationMaterial, RadrootsSimplexSmpQueueAuthorizationScope, + RadrootsSimplexSmpRatchetState, +}; +use radroots_simplex_smp_proto::prelude::{ + RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, RadrootsSimplexSmpBrokerMessage, + RadrootsSimplexSmpCommand, RadrootsSimplexSmpCorrelationId, RadrootsSimplexSmpMessageFlags, + RadrootsSimplexSmpNewQueueRequest, RadrootsSimplexSmpQueueMode, + RadrootsSimplexSmpQueueRequestData, RadrootsSimplexSmpQueueUri, RadrootsSimplexSmpSendCommand, + RadrootsSimplexSmpSubscriptionMode, +}; +use radroots_simplex_smp_transport::prelude::{ + RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpTransportRequest, + RadrootsSimplexSmpTransportResponse, }; -use radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpQueueUri; use sha2::{Digest, Sha256}; +#[cfg(feature = "std")] +use std::path::{Path, PathBuf}; pub struct RadrootsSimplexAgentRuntimeBuilder { store: Option<RadrootsSimplexAgentStore>, queue_capacity: usize, retry_delay_ms: u64, + #[cfg(feature = "std")] + persistent_store_path: Option<PathBuf>, } impl RadrootsSimplexAgentRuntimeBuilder { @@ -33,6 +53,8 @@ impl RadrootsSimplexAgentRuntimeBuilder { store: None, queue_capacity: Self::DEFAULT_QUEUE_CAPACITY, retry_delay_ms: Self::DEFAULT_RETRY_DELAY_MS, + #[cfg(feature = "std")] + persistent_store_path: None, } } @@ -41,6 +63,12 @@ impl RadrootsSimplexAgentRuntimeBuilder { self } + #[cfg(feature = "std")] + pub fn persistent_store_path(mut self, path: impl AsRef<Path>) -> Self { + self.persistent_store_path = Some(path.as_ref().to_path_buf()); + self + } + pub fn queue_capacity(mut self, queue_capacity: usize) -> Self { self.queue_capacity = queue_capacity; self @@ -57,8 +85,21 @@ impl RadrootsSimplexAgentRuntimeBuilder { "queue_capacity", )); } + #[cfg(feature = "std")] + let store = match (self.store, self.persistent_store_path) { + (Some(mut store), Some(path)) => { + store.set_persistence_path(path); + store + } + (Some(store), None) => store, + (None, Some(path)) => RadrootsSimplexAgentStore::open(path)?, + (None, None) => RadrootsSimplexAgentStore::default(), + }; + #[cfg(not(feature = "std"))] + let store = self.store.unwrap_or_default(); + Ok(RadrootsSimplexAgentRuntime { - store: self.store.unwrap_or_default(), + store, events: VecDeque::with_capacity(self.queue_capacity), retry_delay_ms: self.retry_delay_ms, }) @@ -85,8 +126,16 @@ impl RadrootsSimplexAgentRuntime { contact_address: bool, now: u64, ) -> Result<String, RadrootsSimplexAgentRuntimeError> { + let local_dh_public_key = derive_material( + b"connection-create-local-dh", + &[ + invitation_queue.to_string().as_bytes(), + &e2e_public_key, + &now.to_be_bytes(), + ], + ); let ratchet_state = RadrootsSimplexSmpRatchetState::initiator( - b"local-dh".to_vec(), + local_dh_public_key, invitation_queue.recipient_dh_public_key.as_bytes().to_vec(), None, ) @@ -139,6 +188,7 @@ impl RadrootsSimplexAgentRuntime { connection_id: connection.id.clone(), invitation, }); + self.flush_store()?; Ok(connection.id) } @@ -148,8 +198,16 @@ impl RadrootsSimplexAgentRuntime { reply_queue: RadrootsSimplexSmpQueueUri, now: u64, ) -> Result<String, RadrootsSimplexAgentRuntimeError> { + let local_dh_public_key = derive_material( + b"connection-join-local-dh", + &[ + invitation.connection_id.as_slice(), + reply_queue.to_string().as_bytes(), + &now.to_be_bytes(), + ], + ); let ratchet_state = RadrootsSimplexSmpRatchetState::responder( - b"reply-dh".to_vec(), + local_dh_public_key, invitation .invitation_queue .recipient_dh_public_key @@ -168,7 +226,10 @@ impl RadrootsSimplexAgentRuntime { queue_uri: invitation.invitation_queue.clone(), replaced_queue: None, primary: true, - sender_key: Some(b"sender-auth".to_vec()), + sender_key: Some(derive_material( + b"join-sender-auth", + &[invitation.connection_id.as_slice(), &now.to_be_bytes()], + )), }; let receive_descriptor = RadrootsSimplexAgentQueueDescriptor { queue_uri: reply_queue, @@ -210,17 +271,17 @@ impl RadrootsSimplexAgentRuntime { }, now, )?; + let confirmation_payload = + self.next_encrypted_payload(&connection.id, invitation.connection_id)?; self.store.enqueue_command( &connection.id, RadrootsSimplexAgentPendingCommandKind::SendEnvelope { queue: send_descriptor.queue_address(), envelope: RadrootsSimplexAgentEnvelope::Confirmation { reply_queue: true, - encrypted: RadrootsSimplexAgentEncryptedPayload { - ratchet_header: None, - ciphertext: invitation.connection_id, - }, + encrypted: confirmation_payload, }, + delivery: None, }, now, )?; @@ -228,6 +289,7 @@ impl RadrootsSimplexAgentRuntime { .push_back(RadrootsSimplexAgentRuntimeEvent::ConfirmationRequired { connection_id: connection.id.clone(), }); + self.flush_store()?; Ok(connection.id) } @@ -240,10 +302,7 @@ impl RadrootsSimplexAgentRuntime { self.store .set_status(connection_id, RadrootsSimplexAgentConnectionStatus::Allowed)?; let send_queue = self.store.primary_send_queue(connection_id)?; - let encrypted = RadrootsSimplexAgentEncryptedPayload { - ratchet_header: None, - ciphertext: local_info, - }; + let encrypted = self.next_encrypted_payload(connection_id, local_info)?; self.store.enqueue_command( connection_id, RadrootsSimplexAgentPendingCommandKind::SendEnvelope { @@ -252,9 +311,11 @@ impl RadrootsSimplexAgentRuntime { reply_queue: false, encrypted, }, + delivery: None, }, now, )?; + self.flush_store()?; Ok(()) } @@ -276,6 +337,7 @@ impl RadrootsSimplexAgentRuntime { .push_back(RadrootsSimplexAgentRuntimeEvent::SubscriptionQueued { connection_id: connection_id.into(), }); + self.flush_store()?; Ok(()) } @@ -286,16 +348,20 @@ impl RadrootsSimplexAgentRuntime { now: u64, ) -> Result<u64, RadrootsSimplexAgentRuntimeError> { let send_queue = self.store.primary_send_queue(connection_id)?; - let previous_hash = self - .store - .connection(connection_id)? + let connection = self.store.connection(connection_id)?; + if connection.staged_outbound_message.is_some() { + return Err(RadrootsSimplexAgentRuntimeError::Store( + radroots_simplex_agent_store::prelude::RadrootsSimplexAgentStoreError::PendingOutboundMessage( + connection_id.into(), + ), + )); + } + let previous_hash = connection .delivery_cursor .last_sent_message_hash .clone() .unwrap_or_default(); - let message_id = self - .store - .connection(connection_id)? + let message_id = connection .delivery_cursor .last_sent_message_id .unwrap_or(0) @@ -305,24 +371,25 @@ impl RadrootsSimplexAgentRuntime { message_id, previous_message_hash: previous_hash, }, - message: RadrootsSimplexAgentMessage::UserMessage(body.clone()), + message: RadrootsSimplexAgentMessage::UserMessage(body), padding: Vec::new(), }; let ciphertext = encode_decrypted_message(&RadrootsSimplexAgentDecryptedMessage::Message(frame))?; let message_hash = Sha256::digest(&ciphertext).to_vec(); - self.store - .record_outbound_message(connection_id, message_id, message_hash)?; + let prepared = self + .store + .prepare_outbound_message(connection_id, message_hash.clone())?; + let encrypted = self.next_encrypted_payload(connection_id, ciphertext)?; self.store.enqueue_command( connection_id, RadrootsSimplexAgentPendingCommandKind::SendEnvelope { queue: send_queue.descriptor.queue_address(), - envelope: RadrootsSimplexAgentEnvelope::Message( - RadrootsSimplexAgentEncryptedPayload { - ratchet_header: None, - ciphertext, - }, - ), + envelope: RadrootsSimplexAgentEnvelope::Message(encrypted), + delivery: Some(RadrootsSimplexAgentOutboundMessage { + message_id: prepared.message_id, + message_hash: prepared.message_hash, + }), }, now, )?; @@ -331,6 +398,7 @@ impl RadrootsSimplexAgentRuntime { connection_id: connection_id.into(), message_id, }); + self.flush_store()?; Ok(message_id) } @@ -355,6 +423,7 @@ impl RadrootsSimplexAgentRuntime { }, now, )?; + self.flush_store()?; Ok(()) } @@ -374,6 +443,7 @@ impl RadrootsSimplexAgentRuntime { command_id: command.id, }); } + self.flush_store()?; Ok(()) } @@ -396,6 +466,7 @@ impl RadrootsSimplexAgentRuntime { .push_back(RadrootsSimplexAgentRuntimeEvent::QueueRotationQueued { connection_id: connection_id.into(), }); + self.flush_store()?; Ok(()) } @@ -490,6 +561,7 @@ impl RadrootsSimplexAgentRuntime { } } } + self.flush_store()?; Ok(()) } @@ -500,7 +572,8 @@ impl RadrootsSimplexAgentRuntime { ) -> Result<(), RadrootsSimplexAgentRuntimeError> { match outcome { RadrootsSimplexAgentCommandOutcome::Delivered => { - let _ = self.store.mark_command_delivered(command_id)?; + let command = self.store.mark_command_delivered(command_id)?; + self.apply_delivery_side_effects(&command)?; } RadrootsSimplexAgentCommandOutcome::RetryAt { ready_at } => { let command = self.store.mark_command_retry(command_id, ready_at)?; @@ -512,6 +585,7 @@ impl RadrootsSimplexAgentRuntime { } RadrootsSimplexAgentCommandOutcome::Failed { message } => { let command = self.store.mark_command_failed(command_id)?; + self.apply_failure_side_effects(&command)?; self.events .push_back(RadrootsSimplexAgentRuntimeEvent::Error { connection_id: Some(command.connection_id), @@ -519,6 +593,20 @@ impl RadrootsSimplexAgentRuntime { }); } } + self.flush_store()?; + Ok(()) + } + + pub fn execute_ready_commands<T: RadrootsSimplexSmpCommandTransport>( + &mut self, + transport: &mut T, + now: u64, + limit: usize, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + for command in self.store.take_ready_commands(now, limit) { + self.dispatch_ready_command(transport, &command, now)?; + } + self.flush_store()?; Ok(()) } @@ -536,12 +624,309 @@ impl RadrootsSimplexAgentRuntime { .filter_map(|_| self.events.pop_front()) .collect::<Vec<_>>() } + + fn dispatch_ready_command<T: RadrootsSimplexSmpCommandTransport>( + &mut self, + transport: &mut T, + command: &RadrootsSimplexAgentPendingCommand, + now: u64, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + match &command.kind { + RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors } => { + for descriptor in descriptors.clone() { + self.store.add_queue( + &command.connection_id, + descriptor, + RadrootsSimplexAgentQueueRole::Receive, + true, + )?; + } + self.record_command_outcome( + command.id, + RadrootsSimplexAgentCommandOutcome::Delivered, + ) + } + RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => { + for queue in queues { + self.store + .mark_queue_tested(&command.connection_id, queue)?; + } + self.record_command_outcome( + command.id, + RadrootsSimplexAgentCommandOutcome::Delivered, + ) + } + _ => { + let request = self.build_transport_request(command)?; + match transport.execute(request) { + Ok(response) => self.apply_transport_response(command, response), + Err(error) => { + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::Error { + connection_id: Some(command.connection_id.clone()), + message: format!( + "SimpleX transport execution failed for command `{}`: {error}", + command.id + ), + }); + self.record_command_outcome( + command.id, + RadrootsSimplexAgentCommandOutcome::RetryAt { + ready_at: now + self.retry_delay_ms, + }, + ) + } + } + } + } + } + + fn build_transport_request( + &self, + command: &RadrootsSimplexAgentPendingCommand, + ) -> Result<RadrootsSimplexSmpTransportRequest, RadrootsSimplexAgentRuntimeError> { + let (queue_address, smp_command) = self.command_transport_parts(command)?; + let queue = self + .store + .queue_record(&command.connection_id, &queue_address)?; + let auth = queue.auth_state.ok_or_else(|| { + RadrootsSimplexAgentRuntimeError::Store( + radroots_simplex_agent_store::prelude::RadrootsSimplexAgentStoreError::QueueAuthStateMissing( + command.connection_id.clone(), + ), + ) + })?; + let correlation_id = correlation_id_for_command(command.id); + let scope = RadrootsSimplexSmpQueueAuthorizationScope::new( + auth.session_identifier.clone(), + correlation_id, + queue_address.sender_id.clone(), + ) + .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; + let material = RadrootsSimplexSmpQueueAuthorizationMaterial::for_command( + &scope, + &smp_command, + RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, + auth.queue_key_material.clone(), + auth.server_session_key.clone(), + ) + .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; + Ok(RadrootsSimplexSmpTransportRequest { + server: queue.descriptor.queue_uri.server.clone(), + transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, + transmission: + radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommandTransmission { + authorization: material.authorized_digest.to_vec(), + correlation_id: Some(correlation_id), + entity_id: queue_address.sender_id, + command: smp_command, + }, + }) + } + + fn command_transport_parts( + &self, + command: &RadrootsSimplexAgentPendingCommand, + ) -> Result< + ( + radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentQueueAddress, + RadrootsSimplexSmpCommand, + ), + RadrootsSimplexAgentRuntimeError, + > { + match &command.kind { + RadrootsSimplexAgentPendingCommandKind::CreateQueue { descriptor } => Ok(( + descriptor.queue_address(), + RadrootsSimplexSmpCommand::New(RadrootsSimplexSmpNewQueueRequest { + recipient_auth_public_key: descriptor.queue_uri.sender_id.as_bytes().to_vec(), + recipient_dh_public_key: descriptor + .queue_uri + .recipient_dh_public_key + .as_bytes() + .to_vec(), + basic_auth: None, + subscription_mode: RadrootsSimplexSmpSubscriptionMode::Subscribe, + queue_request_data: Some( + match descriptor + .queue_uri + .queue_mode + .unwrap_or(RadrootsSimplexSmpQueueMode::Messaging) + { + RadrootsSimplexSmpQueueMode::Messaging => { + RadrootsSimplexSmpQueueRequestData::Messaging(None) + } + RadrootsSimplexSmpQueueMode::Contact => { + RadrootsSimplexSmpQueueRequestData::Contact(None) + } + }, + ), + notifier_credentials: None, + }), + )), + RadrootsSimplexAgentPendingCommandKind::SecureQueue { queue, sender_key } => Ok(( + queue.clone(), + RadrootsSimplexSmpCommand::SKey(sender_key.clone().unwrap_or_default()), + )), + RadrootsSimplexAgentPendingCommandKind::SendEnvelope { + queue, envelope, .. + } => Ok(( + queue.clone(), + RadrootsSimplexSmpCommand::Send(RadrootsSimplexSmpSendCommand { + flags: RadrootsSimplexSmpMessageFlags::notifications_enabled(), + message_body: encode_envelope(envelope)?, + }), + )), + RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue } => { + Ok((queue.clone(), RadrootsSimplexSmpCommand::Sub)) + } + RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { queue, receipt } => Ok(( + queue.clone(), + RadrootsSimplexSmpCommand::Ack(receipt.message_id.to_be_bytes().to_vec()), + )), + RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors } => Ok(( + descriptors + .first() + .ok_or_else(|| { + RadrootsSimplexAgentRuntimeError::Runtime( + "queue rotation command requires at least one descriptor".into(), + ) + })? + .queue_address(), + RadrootsSimplexSmpCommand::Que, + )), + RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => Ok(( + queues.first().cloned().ok_or_else(|| { + RadrootsSimplexAgentRuntimeError::Runtime( + "queue test command requires at least one queue".into(), + ) + })?, + RadrootsSimplexSmpCommand::Ping, + )), + } + } + + fn apply_transport_response( + &mut self, + command: &RadrootsSimplexAgentPendingCommand, + response: RadrootsSimplexSmpTransportResponse, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + match response.transmission.message { + RadrootsSimplexSmpBrokerMessage::Err(error) => self.record_command_outcome( + command.id, + RadrootsSimplexAgentCommandOutcome::Failed { + message: format!( + "SimpleX broker rejected command `{}`: {:?}", + command.id, error + ), + }, + ), + _ => self + .record_command_outcome(command.id, RadrootsSimplexAgentCommandOutcome::Delivered), + } + } + + fn apply_delivery_side_effects( + &mut self, + command: &RadrootsSimplexAgentPendingCommand, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + match &command.kind { + RadrootsSimplexAgentPendingCommandKind::SendEnvelope { + delivery: Some(delivery), + .. + } => { + let _ = self + .store + .confirm_outbound_message(&command.connection_id, delivery.message_id)?; + } + RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue } => { + self.store + .mark_queue_subscribed(&command.connection_id, queue)?; + } + RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => { + for queue in queues { + self.store + .mark_queue_tested(&command.connection_id, queue)?; + } + } + _ => {} + } + Ok(()) + } + + fn apply_failure_side_effects( + &mut self, + command: &RadrootsSimplexAgentPendingCommand, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + if let RadrootsSimplexAgentPendingCommandKind::SendEnvelope { + delivery: Some(delivery), + .. + } = &command.kind + { + let _ = self + .store + .clear_staged_outbound_message(&command.connection_id, delivery.message_id)?; + } + Ok(()) + } + + fn next_encrypted_payload( + &mut self, + connection_id: &str, + ciphertext: Vec<u8>, + ) -> Result<RadrootsSimplexAgentEncryptedPayload, RadrootsSimplexAgentRuntimeError> { + let ratchet_header = self + .store + .connection_mut(connection_id)? + .ratchet_state + .as_mut() + .map(|state| { + state + .next_outbound_header() + .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string())) + }) + .transpose()?; + Ok(RadrootsSimplexAgentEncryptedPayload { + ratchet_header, + ciphertext, + }) + } + + #[cfg(feature = "std")] + fn flush_store(&self) -> Result<(), RadrootsSimplexAgentRuntimeError> { + self.store.flush().map_err(Into::into) + } + + #[cfg(not(feature = "std"))] + fn flush_store(&self) -> Result<(), RadrootsSimplexAgentRuntimeError> { + Ok(()) + } +} + +fn derive_material(label: &[u8], parts: &[&[u8]]) -> Vec<u8> { + let mut hasher = Sha256::new(); + hasher.update(label); + for part in parts { + hasher.update((*part).len().to_be_bytes()); + hasher.update(*part); + } + hasher.finalize().to_vec() +} + +fn correlation_id_for_command(command_id: u64) -> RadrootsSimplexSmpCorrelationId { + let digest = derive_material(b"simplex-command-correlation", &[&command_id.to_be_bytes()]); + let mut correlation = [0_u8; RadrootsSimplexSmpCorrelationId::LENGTH]; + correlation.copy_from_slice(&digest[..RadrootsSimplexSmpCorrelationId::LENGTH]); + RadrootsSimplexSmpCorrelationId::new(correlation) } #[cfg(test)] mod tests { use super::*; - use radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpQueueUri; + use alloc::collections::VecDeque; + use radroots_simplex_smp_proto::prelude::{ + RadrootsSimplexSmpBrokerTransmission, RadrootsSimplexSmpQueueIdsResponse, + }; + use radroots_simplex_smp_transport::prelude::RadrootsSimplexSmpTransportBlock; fn invitation_queue() -> RadrootsSimplexSmpQueueUri { RadrootsSimplexSmpQueueUri::parse( @@ -557,17 +942,132 @@ mod tests { .unwrap() } + #[derive(Default)] + struct ScriptedTransport { + responses: VecDeque<RadrootsSimplexSmpBrokerMessage>, + requests: Vec<RadrootsSimplexSmpTransportRequest>, + } + + impl ScriptedTransport { + fn with_responses(responses: Vec<RadrootsSimplexSmpBrokerMessage>) -> Self { + Self { + responses: responses.into(), + requests: Vec::new(), + } + } + } + + impl RadrootsSimplexSmpCommandTransport for ScriptedTransport { + type Error = String; + + fn execute( + &mut self, + request: RadrootsSimplexSmpTransportRequest, + ) -> Result<RadrootsSimplexSmpTransportResponse, Self::Error> { + let block = + RadrootsSimplexSmpTransportBlock::from_current_command_transmissions(&[request + .transmission + .clone()]) + .map_err(|error| error.to_string())?; + let encoded = block.encode().map_err(|error| error.to_string())?; + let decoded = RadrootsSimplexSmpTransportBlock::decode(&encoded) + .map_err(|error| error.to_string())?; + let decoded_transmissions = decoded + .decode_command_transmissions(request.transport_version) + .map_err(|error| error.to_string())?; + assert_eq!(decoded_transmissions.len(), 1); + assert_eq!(decoded_transmissions[0], request.transmission); + + let response_message = self + .responses + .pop_front() + .ok_or_else(|| "missing scripted transport response".to_owned())?; + let response_transmission = RadrootsSimplexSmpBrokerTransmission { + authorization: Vec::new(), + correlation_id: request.transmission.correlation_id, + entity_id: request.transmission.entity_id.clone(), + message: response_message, + }; + let response_block = RadrootsSimplexSmpTransportBlock::from_broker_transmissions( + &[response_transmission.clone()], + request.transport_version, + ) + .map_err(|error| error.to_string())?; + let response_encoded = response_block.encode().map_err(|error| error.to_string())?; + self.requests.push(request.clone()); + Ok(RadrootsSimplexSmpTransportResponse { + server: request.server, + transport_version: request.transport_version, + transmission: response_transmission, + transport_hash: Sha256::digest(&response_encoded).to_vec(), + }) + } + } + #[test] - fn create_join_allow_send_and_retry_flow() { + fn create_and_join_commands_execute_through_transport() { let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); let created = runtime .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) .unwrap(); + let invitation = runtime + .store + .connection(&created) + .unwrap() + .invitation + .clone() + .unwrap(); + let joined = runtime + .join_connection(invitation, reply_queue(), 20) + .unwrap(); + + let mut transport = ScriptedTransport::with_responses(vec![ + RadrootsSimplexSmpBrokerMessage::Ids(RadrootsSimplexSmpQueueIdsResponse { + recipient_id: b"recipient".to_vec(), + sender_id: b"sender".to_vec(), + server_dh_public_key: b"server-dh".to_vec(), + queue_mode: Some(RadrootsSimplexSmpQueueMode::Messaging), + link_id: None, + service_id: None, + server_notification_credentials: None, + }), + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ids(RadrootsSimplexSmpQueueIdsResponse { + recipient_id: b"recipient-2".to_vec(), + sender_id: b"sender-2".to_vec(), + server_dh_public_key: b"server-dh-2".to_vec(), + queue_mode: Some(RadrootsSimplexSmpQueueMode::Messaging), + link_id: None, + service_id: None, + server_notification_credentials: None, + }), + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ok, + ]); + runtime + .execute_ready_commands(&mut transport, 30, 16) + .unwrap(); + + let created_queue = runtime.store.receive_queues(&created).unwrap(); + assert!(created_queue[0].subscribed); + assert_eq!(transport.requests.len(), 6); assert!(matches!( - runtime.drain_events(10).remove(0), - RadrootsSimplexAgentRuntimeEvent::InvitationReady { .. } + runtime.drain_events(16).first(), + Some(RadrootsSimplexAgentRuntimeEvent::InvitationReady { .. }) )); + assert_eq!( + runtime.store.connection(&joined).unwrap().status, + RadrootsSimplexAgentConnectionStatus::JoinPending + ); + } + #[test] + fn delivered_send_confirms_cursor_only_after_transport_success() { + let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); + let created = runtime + .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) + .unwrap(); let invitation = runtime .store .connection(&created) @@ -578,79 +1078,234 @@ mod tests { let joined = runtime .join_connection(invitation, reply_queue(), 20) .unwrap(); + + let mut setup_transport = ScriptedTransport::with_responses(vec![ + RadrootsSimplexSmpBrokerMessage::Ids(RadrootsSimplexSmpQueueIdsResponse { + recipient_id: b"recipient".to_vec(), + sender_id: b"sender".to_vec(), + server_dh_public_key: b"server-dh".to_vec(), + queue_mode: Some(RadrootsSimplexSmpQueueMode::Messaging), + link_id: None, + service_id: None, + server_notification_credentials: None, + }), + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ids(RadrootsSimplexSmpQueueIdsResponse { + recipient_id: b"recipient-2".to_vec(), + sender_id: b"sender-2".to_vec(), + server_dh_public_key: b"server-dh-2".to_vec(), + queue_mode: Some(RadrootsSimplexSmpQueueMode::Messaging), + link_id: None, + service_id: None, + server_notification_credentials: None, + }), + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ok, + ]); runtime - .allow_connection(&joined, b"local-info".to_vec(), 30) + .execute_ready_commands(&mut setup_transport, 30, 16) .unwrap(); - runtime.subscribe_connection(&joined, 40).unwrap(); + let message_id = runtime - .send_message(&joined, b"hello simplex".to_vec(), 50) + .send_message(&joined, b"hello simplex".to_vec(), 40) .unwrap(); assert_eq!(message_id, 1); + assert_eq!( + runtime + .store + .connection(&joined) + .unwrap() + .delivery_cursor + .last_sent_message_id, + None + ); + + let mut delivery_transport = + ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]); runtime - .ack_message( - &joined, - message_id, - b"hash".to_vec(), - b"receipt".to_vec(), - 60, - ) + .execute_ready_commands(&mut delivery_transport, 50, 16) + .unwrap(); + + let cursor = &runtime.store.connection(&joined).unwrap().delivery_cursor; + assert_eq!(cursor.last_sent_message_id, Some(1)); + assert!(cursor.last_sent_message_hash.is_some()); + assert_eq!( + runtime + .store + .connection(&joined) + .unwrap() + .staged_outbound_message, + None + ); + } + + #[test] + fn transport_retry_keeps_staged_outbound_message() { + let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); + let created = runtime + .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) + .unwrap(); + let invitation = runtime + .store + .connection(&created) + .unwrap() + .invitation + .clone() + .unwrap(); + let joined = runtime + .join_connection(invitation, reply_queue(), 20) + .unwrap(); + + let mut setup_transport = ScriptedTransport::with_responses(vec![ + RadrootsSimplexSmpBrokerMessage::Ids(RadrootsSimplexSmpQueueIdsResponse { + recipient_id: b"recipient".to_vec(), + sender_id: b"sender".to_vec(), + server_dh_public_key: b"server-dh".to_vec(), + queue_mode: Some(RadrootsSimplexSmpQueueMode::Messaging), + link_id: None, + service_id: None, + server_notification_credentials: None, + }), + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ids(RadrootsSimplexSmpQueueIdsResponse { + recipient_id: b"recipient-2".to_vec(), + sender_id: b"sender-2".to_vec(), + server_dh_public_key: b"server-dh-2".to_vec(), + queue_mode: Some(RadrootsSimplexSmpQueueMode::Messaging), + link_id: None, + service_id: None, + server_notification_credentials: None, + }), + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ok, + ]); + runtime + .execute_ready_commands(&mut setup_transport, 30, 16) + .unwrap(); + + runtime + .send_message(&joined, b"hello simplex".to_vec(), 40) + .unwrap(); + + struct FailingTransport; + impl RadrootsSimplexSmpCommandTransport for FailingTransport { + type Error = String; + fn execute( + &mut self, + _request: RadrootsSimplexSmpTransportRequest, + ) -> Result<RadrootsSimplexSmpTransportResponse, Self::Error> { + Err("synthetic failure".to_owned()) + } + } + + runtime + .execute_ready_commands(&mut FailingTransport, 50, 16) + .unwrap(); + + assert_eq!( + runtime + .store + .connection(&joined) + .unwrap() + .delivery_cursor + .last_sent_message_id, + None + ); + assert_eq!( + runtime + .store + .connection(&joined) + .unwrap() + .staged_outbound_message + .as_ref() + .map(|message| message.message_id), + Some(1) + ); + let ready_again = runtime.retry_pending(50 + 5_000, 16); + assert_eq!(ready_again.len(), 1); + } + + #[cfg(feature = "std")] + #[test] + fn builder_opens_persistent_store_path() { + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().join("runtime-store.json"); + let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new() + .persistent_store_path(&path) + .build() + .unwrap(); + runtime + .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) .unwrap(); - runtime.reconnect_connection(&joined, 70).unwrap(); - let ready = runtime.retry_pending(70 + 5_000, 64); - assert!(!ready.is_empty()); + assert!(path.exists()); } #[test] - fn handles_inbound_hello_and_receipt_events() { + fn manual_record_command_failure_clears_staged_delivery_state() { let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); - let connection_id = runtime + let created = runtime .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) .unwrap(); - runtime.drain_events(8); + let invitation = runtime + .store + .connection(&created) + .unwrap() + .invitation + .clone() + .unwrap(); + let joined = runtime + .join_connection(invitation, reply_queue(), 20) + .unwrap(); + let mut setup_transport = ScriptedTransport::with_responses(vec![ + RadrootsSimplexSmpBrokerMessage::Ids(RadrootsSimplexSmpQueueIdsResponse { + recipient_id: b"recipient".to_vec(), + sender_id: b"sender".to_vec(), + server_dh_public_key: b"server-dh".to_vec(), + queue_mode: Some(RadrootsSimplexSmpQueueMode::Messaging), + link_id: None, + service_id: None, + server_notification_credentials: None, + }), + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ids(RadrootsSimplexSmpQueueIdsResponse { + recipient_id: b"recipient-2".to_vec(), + sender_id: b"sender-2".to_vec(), + server_dh_public_key: b"server-dh-2".to_vec(), + queue_mode: Some(RadrootsSimplexSmpQueueMode::Messaging), + link_id: None, + service_id: None, + server_notification_credentials: None, + }), + RadrootsSimplexSmpBrokerMessage::Ok, + RadrootsSimplexSmpBrokerMessage::Ok, + ]); runtime - .handle_inbound_decrypted_message( - &connection_id, - RadrootsSimplexAgentDecryptedMessage::Message(RadrootsSimplexAgentMessageFrame { - header: RadrootsSimplexAgentMessageHeader { - message_id: 1, - previous_message_hash: Vec::new(), - }, - message: RadrootsSimplexAgentMessage::Hello, - padding: Vec::new(), - }), - b"transport-hash".to_vec(), - ) + .execute_ready_commands(&mut setup_transport, 30, 16) .unwrap(); + runtime - .handle_inbound_decrypted_message( - &connection_id, - RadrootsSimplexAgentDecryptedMessage::Message(RadrootsSimplexAgentMessageFrame { - header: RadrootsSimplexAgentMessageHeader { - message_id: 2, - previous_message_hash: b"transport-hash".to_vec(), - }, - message: RadrootsSimplexAgentMessage::Receipt( - RadrootsSimplexAgentMessageReceipt { - message_id: 1, - message_hash: b"transport-hash".to_vec(), - receipt_info: Vec::new(), - }, - ), - padding: Vec::new(), - }), - b"transport-hash-2".to_vec(), + .send_message(&joined, b"hello simplex".to_vec(), 40) + .unwrap(); + let command = runtime.retry_pending(40, 16).remove(0); + runtime + .record_command_outcome( + command.id, + RadrootsSimplexAgentCommandOutcome::Failed { + message: "synthetic failure".into(), + }, ) .unwrap(); - - let events = runtime.drain_events(16); - assert!(events.iter().any(|event| matches!( - event, - RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished { .. } - ))); - assert!(events.iter().any(|event| matches!( - event, - RadrootsSimplexAgentRuntimeEvent::MessageAcknowledged { message_id: 1, .. } - ))); + assert_eq!( + runtime + .store + .connection(&joined) + .unwrap() + .staged_outbound_message, + None + ); } } diff --git a/crates/simplex-agent-store/Cargo.toml b/crates/simplex-agent-store/Cargo.toml @@ -15,8 +15,20 @@ readme.workspace = true [features] default = ["std"] -std = ["radroots-simplex-agent-proto/std", "radroots-simplex-smp-proto/std"] +std = [ + "radroots-simplex-agent-proto/std", + "radroots-simplex-smp-proto/std", + "serde/std", + "serde_json/std", + "sha2/std", +] [dependencies] radroots-simplex-agent-proto = { workspace = true, default-features = false } radroots-simplex-smp-proto = { workspace = true, default-features = false } +serde = { workspace = true } +serde_json = { workspace = true } +sha2 = { workspace = true, default-features = false } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/crates/simplex-agent-store/src/error.rs b/crates/simplex-agent-store/src/error.rs @@ -7,6 +7,15 @@ pub enum RadrootsSimplexAgentStoreError { QueueNotFound(String), CommandNotFound(u64), MissingPrimarySendQueue(String), + PendingOutboundMessage(String), + StagedOutboundMessageMissing(String), + StagedOutboundMessageMismatch { + connection_id: String, + expected: u64, + actual: u64, + }, + QueueAuthStateMissing(String), + Persistence(String), } impl fmt::Display for RadrootsSimplexAgentStoreError { @@ -21,6 +30,35 @@ impl fmt::Display for RadrootsSimplexAgentStoreError { "SimpleX agent connection `{id}` has no primary send queue" ) } + Self::PendingOutboundMessage(id) => { + write!( + f, + "SimpleX agent connection `{id}` already has a staged outbound message" + ) + } + Self::StagedOutboundMessageMissing(id) => { + write!( + f, + "SimpleX agent connection `{id}` has no staged outbound message" + ) + } + Self::StagedOutboundMessageMismatch { + connection_id, + expected, + actual, + } => { + write!( + f, + "SimpleX agent connection `{connection_id}` staged outbound message mismatch: expected `{expected}`, got `{actual}`" + ) + } + Self::QueueAuthStateMissing(id) => { + write!( + f, + "SimpleX agent queue `{id}` is missing transport auth state" + ) + } + Self::Persistence(message) => write!(f, "{message}"), } } } diff --git a/crates/simplex-agent-store/src/lib.rs b/crates/simplex-agent-store/src/lib.rs @@ -10,8 +10,10 @@ pub mod prelude { pub use crate::error::RadrootsSimplexAgentStoreError; pub use crate::store::{ RadrootsSimplexAgentConnectionRecord, RadrootsSimplexAgentDeliveryCursor, - RadrootsSimplexAgentPendingCommand, RadrootsSimplexAgentPendingCommandKind, - RadrootsSimplexAgentQueueRecord, RadrootsSimplexAgentQueueRole, - RadrootsSimplexAgentRecentMessageRecord, RadrootsSimplexAgentStore, + RadrootsSimplexAgentOutboundMessage, RadrootsSimplexAgentPendingCommand, + RadrootsSimplexAgentPendingCommandKind, RadrootsSimplexAgentPreparedOutboundMessage, + RadrootsSimplexAgentQueueAuthState, RadrootsSimplexAgentQueueRecord, + RadrootsSimplexAgentQueueRole, RadrootsSimplexAgentRecentMessageRecord, + RadrootsSimplexAgentStore, }; } diff --git a/crates/simplex-agent-store/src/store.rs b/crates/simplex-agent-store/src/store.rs @@ -1,31 +1,53 @@ use crate::error::RadrootsSimplexAgentStoreError; use alloc::collections::BTreeMap; -use alloc::string::String; +use alloc::string::{String, ToString}; use alloc::vec::Vec; use radroots_simplex_agent_proto::prelude::{ RadrootsSimplexAgentConnectionLink, RadrootsSimplexAgentConnectionMode, RadrootsSimplexAgentConnectionStatus, RadrootsSimplexAgentEnvelope, RadrootsSimplexAgentMessageId, RadrootsSimplexAgentMessageReceipt, RadrootsSimplexAgentQueueAddress, RadrootsSimplexAgentQueueDescriptor, - RadrootsSimplexSmpRatchetState, + RadrootsSimplexSmpRatchetState, decode_connection_link, decode_envelope, + encode_connection_link, encode_envelope, }; +use radroots_simplex_smp_proto::prelude::{ + RadrootsSimplexSmpQueueUri, RadrootsSimplexSmpServerAddress, +}; +#[cfg(feature = "std")] +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +#[cfg(feature = "std")] +use std::fs; +#[cfg(feature = "std")] +use std::path::{Path, PathBuf}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize))] pub enum RadrootsSimplexAgentQueueRole { Receive, Send, } #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize))] +pub struct RadrootsSimplexAgentQueueAuthState { + pub session_identifier: Vec<u8>, + pub queue_key_material: Vec<u8>, + pub server_session_key: Vec<u8>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] pub struct RadrootsSimplexAgentQueueRecord { pub descriptor: RadrootsSimplexAgentQueueDescriptor, pub role: RadrootsSimplexAgentQueueRole, pub subscribed: bool, pub primary: bool, pub tested: bool, + pub auth_state: Option<RadrootsSimplexAgentQueueAuthState>, } #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize))] pub struct RadrootsSimplexAgentDeliveryCursor { pub last_sent_message_id: Option<RadrootsSimplexAgentMessageId>, pub last_received_message_id: Option<RadrootsSimplexAgentMessageId>, @@ -34,12 +56,27 @@ pub struct RadrootsSimplexAgentDeliveryCursor { } #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize))] pub struct RadrootsSimplexAgentRecentMessageRecord { pub message_id: RadrootsSimplexAgentMessageId, pub message_hash: Vec<u8>, } #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize))] +pub struct RadrootsSimplexAgentOutboundMessage { + pub message_id: RadrootsSimplexAgentMessageId, + pub message_hash: Vec<u8>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentPreparedOutboundMessage { + pub message_id: RadrootsSimplexAgentMessageId, + pub previous_message_hash: Vec<u8>, + pub message_hash: Vec<u8>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] pub enum RadrootsSimplexAgentPendingCommandKind { CreateQueue { descriptor: RadrootsSimplexAgentQueueDescriptor, @@ -51,6 +88,7 @@ pub enum RadrootsSimplexAgentPendingCommandKind { SendEnvelope { queue: RadrootsSimplexAgentQueueAddress, envelope: RadrootsSimplexAgentEnvelope, + delivery: Option<RadrootsSimplexAgentOutboundMessage>, }, SubscribeQueue { queue: RadrootsSimplexAgentQueueAddress, @@ -87,14 +125,149 @@ pub struct RadrootsSimplexAgentConnectionRecord { pub ratchet_state: Option<RadrootsSimplexSmpRatchetState>, pub delivery_cursor: RadrootsSimplexAgentDeliveryCursor, pub recent_messages: Vec<RadrootsSimplexAgentRecentMessageRecord>, + pub staged_outbound_message: Option<RadrootsSimplexAgentOutboundMessage>, +} + +#[cfg(feature = "std")] +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RadrootsSimplexAgentStoreSnapshot { + next_connection_sequence: u64, + next_command_sequence: u64, + connections: Vec<RadrootsSimplexAgentConnectionSnapshot>, + pending_commands: Vec<RadrootsSimplexAgentPendingCommandSnapshot>, +} + +#[cfg(feature = "std")] +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RadrootsSimplexAgentConnectionSnapshot { + id: String, + mode: String, + status: String, + invitation: Option<Vec<u8>>, + queues: Vec<RadrootsSimplexAgentQueueRecordSnapshot>, + ratchet_state: Option<RadrootsSimplexAgentRatchetStateSnapshot>, + delivery_cursor: RadrootsSimplexAgentDeliveryCursor, + recent_messages: Vec<RadrootsSimplexAgentRecentMessageRecord>, + staged_outbound_message: Option<RadrootsSimplexAgentOutboundMessage>, +} + +#[cfg(feature = "std")] +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RadrootsSimplexAgentQueueRecordSnapshot { + descriptor: RadrootsSimplexAgentQueueDescriptorSnapshot, + role: String, + subscribed: bool, + primary: bool, + tested: bool, + auth_state: Option<RadrootsSimplexAgentQueueAuthState>, +} + +#[cfg(feature = "std")] +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RadrootsSimplexAgentQueueDescriptorSnapshot { + queue_uri: String, + replaced_queue: Option<RadrootsSimplexAgentQueueAddressSnapshot>, + primary: bool, + sender_key: Option<Vec<u8>>, +} + +#[cfg(feature = "std")] +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RadrootsSimplexAgentQueueAddressSnapshot { + server_identity: String, + hosts: Vec<String>, + port: Option<u16>, + sender_id: Vec<u8>, +} + +#[cfg(feature = "std")] +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RadrootsSimplexAgentRatchetStateSnapshot { + role: String, + root_epoch: u64, + previous_sending_chain_length: u32, + sending_chain_length: u32, + receiving_chain_length: u32, + local_dh_public_key: Vec<u8>, + remote_dh_public_key: Vec<u8>, + current_pq_public_key: Option<Vec<u8>>, + remote_pq_public_key: Option<Vec<u8>>, + pending_outbound_pq_ciphertext: Option<Vec<u8>>, + pending_inbound_pq_ciphertext: Option<Vec<u8>>, + current_pq_shared_secret: Option<Vec<u8>>, +} + +#[cfg(feature = "std")] +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RadrootsSimplexAgentPendingCommandSnapshot { + id: u64, + connection_id: String, + kind: RadrootsSimplexAgentPendingCommandKindSnapshot, + attempts: u32, + ready_at: u64, + inflight: bool, +} + +#[cfg(feature = "std")] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +enum RadrootsSimplexAgentPendingCommandKindSnapshot { + CreateQueue { + descriptor: RadrootsSimplexAgentQueueDescriptorSnapshot, + }, + SecureQueue { + queue: RadrootsSimplexAgentQueueAddressSnapshot, + sender_key: Option<Vec<u8>>, + }, + SendEnvelope { + queue: RadrootsSimplexAgentQueueAddressSnapshot, + envelope: Vec<u8>, + delivery: Option<RadrootsSimplexAgentOutboundMessage>, + }, + SubscribeQueue { + queue: RadrootsSimplexAgentQueueAddressSnapshot, + }, + AckInboxMessage { + queue: RadrootsSimplexAgentQueueAddressSnapshot, + receipt: RadrootsSimplexAgentMessageReceiptSnapshot, + }, + RotateQueues { + descriptors: Vec<RadrootsSimplexAgentQueueDescriptorSnapshot>, + }, + TestQueues { + queues: Vec<RadrootsSimplexAgentQueueAddressSnapshot>, + }, +} + +#[cfg(feature = "std")] +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RadrootsSimplexAgentMessageReceiptSnapshot { + message_id: RadrootsSimplexAgentMessageId, + message_hash: Vec<u8>, + receipt_info: Vec<u8>, } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct RadrootsSimplexAgentStore { next_connection_sequence: u64, next_command_sequence: u64, connections: BTreeMap<String, RadrootsSimplexAgentConnectionRecord>, pending_commands: BTreeMap<u64, RadrootsSimplexAgentPendingCommand>, + #[cfg(feature = "std")] + persistence_path: Option<PathBuf>, +} + +impl Default for RadrootsSimplexAgentStore { + fn default() -> Self { + Self { + next_connection_sequence: 0, + next_command_sequence: 0, + connections: BTreeMap::new(), + pending_commands: BTreeMap::new(), + #[cfg(feature = "std")] + persistence_path: None, + } + } } impl RadrootsSimplexAgentStore { @@ -102,6 +275,69 @@ impl RadrootsSimplexAgentStore { Self::default() } + #[cfg(feature = "std")] + pub fn open(path: impl AsRef<Path>) -> Result<Self, RadrootsSimplexAgentStoreError> { + let path = path.as_ref().to_path_buf(); + if !path.exists() { + let mut store = Self::default(); + store.persistence_path = Some(path); + return Ok(store); + } + + let raw = fs::read(&path).map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to read SimpleX agent store snapshot `{}`: {error}", + path.display() + )) + })?; + + let snapshot: RadrootsSimplexAgentStoreSnapshot = + serde_json::from_slice(&raw).map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to parse SimpleX agent store snapshot `{}`: {error}", + path.display() + )) + })?; + + let mut store = Self::from_snapshot(snapshot)?; + store.persistence_path = Some(path); + Ok(store) + } + + #[cfg(feature = "std")] + pub fn set_persistence_path(&mut self, path: impl AsRef<Path>) { + self.persistence_path = Some(path.as_ref().to_path_buf()); + } + + #[cfg(feature = "std")] + pub fn flush(&self) -> Result<(), RadrootsSimplexAgentStoreError> { + let Some(path) = self.persistence_path.as_ref() else { + return Ok(()); + }; + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to create SimpleX agent store directory `{}`: {error}", + parent.display() + )) + })?; + } + let snapshot = self.snapshot()?; + let mut encoded = serde_json::to_vec_pretty(&snapshot).map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to serialize SimpleX agent store snapshot `{}`: {error}", + path.display() + )) + })?; + encoded.push(b'\n'); + fs::write(path, encoded).map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to write SimpleX agent store snapshot `{}`: {error}", + path.display() + )) + }) + } + pub fn create_connection( &mut self, mode: RadrootsSimplexAgentConnectionMode, @@ -125,6 +361,7 @@ impl RadrootsSimplexAgentStore { last_received_message_hash: None, }, recent_messages: Vec::new(), + staged_outbound_message: None, }; self.connections.insert(id, record.clone()); record @@ -164,6 +401,7 @@ impl RadrootsSimplexAgentStore { role: RadrootsSimplexAgentQueueRole, primary: bool, ) -> Result<(), RadrootsSimplexAgentStoreError> { + let derived_auth = derive_queue_auth_state(connection_id, &descriptor, role); let connection = self.connection_mut(connection_id)?; let address = descriptor.queue_address(); if let Some(queue) = connection @@ -174,6 +412,9 @@ impl RadrootsSimplexAgentStore { queue.descriptor = descriptor; queue.role = role; queue.primary = primary; + if queue.auth_state.is_none() { + queue.auth_state = Some(derived_auth); + } return Ok(()); } connection.queues.push(RadrootsSimplexAgentQueueRecord { @@ -182,10 +423,25 @@ impl RadrootsSimplexAgentStore { subscribed: false, primary, tested: false, + auth_state: Some(derived_auth), }); Ok(()) } + pub fn queue_record( + &self, + connection_id: &str, + queue_address: &RadrootsSimplexAgentQueueAddress, + ) -> Result<RadrootsSimplexAgentQueueRecord, RadrootsSimplexAgentStoreError> { + let connection = self.connection(connection_id)?; + connection + .queues + .iter() + .find(|queue| &queue.descriptor.queue_address() == queue_address) + .cloned() + .ok_or_else(|| RadrootsSimplexAgentStoreError::QueueNotFound(connection_id.into())) + } + pub fn mark_queue_subscribed( &mut self, connection_id: &str, @@ -205,6 +461,25 @@ impl RadrootsSimplexAgentStore { Ok(()) } + pub fn mark_queue_tested( + &mut self, + connection_id: &str, + queue_address: &RadrootsSimplexAgentQueueAddress, + ) -> Result<(), RadrootsSimplexAgentStoreError> { + let connection = self.connection_mut(connection_id)?; + let Some(queue) = connection + .queues + .iter_mut() + .find(|queue| &queue.descriptor.queue_address() == queue_address) + else { + return Err(RadrootsSimplexAgentStoreError::QueueNotFound( + connection_id.into(), + )); + }; + queue.tested = true; + Ok(()) + } + pub fn primary_send_queue( &self, connection_id: &str, @@ -233,22 +508,99 @@ impl RadrootsSimplexAgentStore { .collect()) } - pub fn record_outbound_message( + pub fn queue_auth_state( + &self, + connection_id: &str, + queue_address: &RadrootsSimplexAgentQueueAddress, + ) -> Result<RadrootsSimplexAgentQueueAuthState, RadrootsSimplexAgentStoreError> { + self.queue_record(connection_id, queue_address)? + .auth_state + .ok_or_else(|| { + RadrootsSimplexAgentStoreError::QueueAuthStateMissing(connection_id.into()) + }) + } + + pub fn prepare_outbound_message( &mut self, connection_id: &str, - message_id: RadrootsSimplexAgentMessageId, message_hash: Vec<u8>, - ) -> Result<(), RadrootsSimplexAgentStoreError> { + ) -> Result<RadrootsSimplexAgentPreparedOutboundMessage, RadrootsSimplexAgentStoreError> { let connection = self.connection_mut(connection_id)?; - connection.delivery_cursor.last_sent_message_id = Some(message_id); - connection.delivery_cursor.last_sent_message_hash = Some(message_hash.clone()); + if connection.staged_outbound_message.is_some() { + return Err(RadrootsSimplexAgentStoreError::PendingOutboundMessage( + connection_id.into(), + )); + } + let prepared = RadrootsSimplexAgentPreparedOutboundMessage { + message_id: connection + .delivery_cursor + .last_sent_message_id + .unwrap_or(0) + .saturating_add(1), + previous_message_hash: connection + .delivery_cursor + .last_sent_message_hash + .clone() + .unwrap_or_default(), + message_hash: message_hash.clone(), + }; + connection.staged_outbound_message = Some(RadrootsSimplexAgentOutboundMessage { + message_id: prepared.message_id, + message_hash, + }); + Ok(prepared) + } + + pub fn confirm_outbound_message( + &mut self, + connection_id: &str, + message_id: RadrootsSimplexAgentMessageId, + ) -> Result<RadrootsSimplexAgentOutboundMessage, RadrootsSimplexAgentStoreError> { + let connection = self.connection_mut(connection_id)?; + let staged = connection.staged_outbound_message.take().ok_or_else(|| { + RadrootsSimplexAgentStoreError::StagedOutboundMessageMissing(connection_id.into()) + })?; + if staged.message_id != message_id { + connection.staged_outbound_message = Some(staged.clone()); + return Err( + RadrootsSimplexAgentStoreError::StagedOutboundMessageMismatch { + connection_id: connection_id.into(), + expected: staged.message_id, + actual: message_id, + }, + ); + } + connection.delivery_cursor.last_sent_message_id = Some(staged.message_id); + connection.delivery_cursor.last_sent_message_hash = Some(staged.message_hash.clone()); connection .recent_messages .push(RadrootsSimplexAgentRecentMessageRecord { - message_id, - message_hash, + message_id: staged.message_id, + message_hash: staged.message_hash.clone(), }); - Ok(()) + Ok(staged) + } + + pub fn clear_staged_outbound_message( + &mut self, + connection_id: &str, + message_id: RadrootsSimplexAgentMessageId, + ) -> Result<RadrootsSimplexAgentOutboundMessage, RadrootsSimplexAgentStoreError> { + let connection = self.connection_mut(connection_id)?; + let staged = connection.staged_outbound_message.take().ok_or_else(|| { + RadrootsSimplexAgentStoreError::StagedOutboundMessageMissing(connection_id.into()) + })?; + if staged.message_id != message_id { + connection.staged_outbound_message = Some(staged.clone()); + return Err( + RadrootsSimplexAgentStoreError::StagedOutboundMessageMismatch { + connection_id: connection_id.into(), + expected: staged.message_id, + actual: message_id, + }, + ); + } + Ok(staged) } pub fn record_inbound_message( @@ -344,6 +696,563 @@ impl RadrootsSimplexAgentStore { .remove(&command_id) .ok_or(RadrootsSimplexAgentStoreError::CommandNotFound(command_id)) } + + #[cfg(feature = "std")] + fn snapshot( + &self, + ) -> Result<RadrootsSimplexAgentStoreSnapshot, RadrootsSimplexAgentStoreError> { + let connections = self + .connections + .values() + .cloned() + .map(connection_to_snapshot) + .collect::<Result<Vec<_>, _>>()?; + let pending_commands = self + .pending_commands + .values() + .cloned() + .map(command_to_snapshot) + .collect::<Result<Vec<_>, _>>()?; + Ok(RadrootsSimplexAgentStoreSnapshot { + next_connection_sequence: self.next_connection_sequence, + next_command_sequence: self.next_command_sequence, + connections, + pending_commands, + }) + } + + #[cfg(feature = "std")] + fn from_snapshot( + snapshot: RadrootsSimplexAgentStoreSnapshot, + ) -> Result<Self, RadrootsSimplexAgentStoreError> { + let mut connections = BTreeMap::new(); + for connection in snapshot.connections { + let record = connection_from_snapshot(connection)?; + connections.insert(record.id.clone(), record); + } + let mut pending_commands = BTreeMap::new(); + for command in snapshot.pending_commands { + let record = command_from_snapshot(command)?; + pending_commands.insert(record.id, record); + } + Ok(Self { + next_connection_sequence: snapshot.next_connection_sequence, + next_command_sequence: snapshot.next_command_sequence, + connections, + pending_commands, + persistence_path: None, + }) + } +} + +fn derive_queue_auth_state( + connection_id: &str, + descriptor: &RadrootsSimplexAgentQueueDescriptor, + role: RadrootsSimplexAgentQueueRole, +) -> RadrootsSimplexAgentQueueAuthState { + let address = descriptor.queue_address(); + let role_label = match role { + RadrootsSimplexAgentQueueRole::Receive => b"receive".as_slice(), + RadrootsSimplexAgentQueueRole::Send => b"send".as_slice(), + }; + let session_identifier = hash_material( + b"session", + connection_id, + &address, + role_label, + descriptor.sender_key.as_deref(), + )[..24] + .to_vec(); + let queue_key_material = hash_material( + b"queue-key", + connection_id, + &address, + role_label, + descriptor.sender_key.as_deref(), + ); + let server_session_key = hash_material( + b"server-session", + connection_id, + &address, + role_label, + descriptor.sender_key.as_deref(), + ); + RadrootsSimplexAgentQueueAuthState { + session_identifier, + queue_key_material, + server_session_key, + } +} + +fn hash_material( + label: &[u8], + connection_id: &str, + address: &RadrootsSimplexAgentQueueAddress, + role_label: &[u8], + sender_key: Option<&[u8]>, +) -> Vec<u8> { + let mut hasher = Sha256::new(); + hasher.update(label); + hasher.update(connection_id.as_bytes()); + hasher.update(address.server.server_identity.as_bytes()); + for host in &address.server.hosts { + hasher.update(host.as_bytes()); + hasher.update([0_u8]); + } + hasher.update(address.server.port.unwrap_or_default().to_be_bytes()); + hasher.update(&address.sender_id); + hasher.update(role_label); + if let Some(sender_key) = sender_key { + hasher.update(sender_key); + } + hasher.finalize().to_vec() +} + +#[cfg(feature = "std")] +fn connection_to_snapshot( + record: RadrootsSimplexAgentConnectionRecord, +) -> Result<RadrootsSimplexAgentConnectionSnapshot, RadrootsSimplexAgentStoreError> { + Ok(RadrootsSimplexAgentConnectionSnapshot { + id: record.id, + mode: encode_connection_mode(record.mode).into(), + status: encode_connection_status(record.status).into(), + invitation: record + .invitation + .as_ref() + .map(encode_connection_link) + .transpose() + .map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to encode SimpleX connection invitation: {error}" + )) + })?, + queues: record + .queues + .into_iter() + .map(queue_record_to_snapshot) + .collect::<Result<Vec<_>, _>>()?, + ratchet_state: record.ratchet_state.map(ratchet_state_to_snapshot), + delivery_cursor: record.delivery_cursor, + recent_messages: record.recent_messages, + staged_outbound_message: record.staged_outbound_message, + }) +} + +#[cfg(feature = "std")] +fn connection_from_snapshot( + snapshot: RadrootsSimplexAgentConnectionSnapshot, +) -> Result<RadrootsSimplexAgentConnectionRecord, RadrootsSimplexAgentStoreError> { + Ok(RadrootsSimplexAgentConnectionRecord { + id: snapshot.id, + mode: decode_connection_mode(&snapshot.mode)?, + status: decode_connection_status(&snapshot.status)?, + invitation: snapshot + .invitation + .as_ref() + .map(|value| { + decode_connection_link(value).map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to decode SimpleX connection invitation: {error}" + )) + }) + }) + .transpose()?, + queues: snapshot + .queues + .into_iter() + .map(queue_record_from_snapshot) + .collect::<Result<Vec<_>, _>>()?, + ratchet_state: snapshot + .ratchet_state + .map(ratchet_state_from_snapshot) + .transpose()?, + delivery_cursor: snapshot.delivery_cursor, + recent_messages: snapshot.recent_messages, + staged_outbound_message: snapshot.staged_outbound_message, + }) +} + +#[cfg(feature = "std")] +fn queue_record_to_snapshot( + record: RadrootsSimplexAgentQueueRecord, +) -> Result<RadrootsSimplexAgentQueueRecordSnapshot, RadrootsSimplexAgentStoreError> { + Ok(RadrootsSimplexAgentQueueRecordSnapshot { + descriptor: queue_descriptor_to_snapshot(record.descriptor), + role: encode_queue_role(record.role).into(), + subscribed: record.subscribed, + primary: record.primary, + tested: record.tested, + auth_state: record.auth_state, + }) +} + +#[cfg(feature = "std")] +fn queue_record_from_snapshot( + snapshot: RadrootsSimplexAgentQueueRecordSnapshot, +) -> Result<RadrootsSimplexAgentQueueRecord, RadrootsSimplexAgentStoreError> { + Ok(RadrootsSimplexAgentQueueRecord { + descriptor: queue_descriptor_from_snapshot(snapshot.descriptor)?, + role: decode_queue_role(&snapshot.role)?, + subscribed: snapshot.subscribed, + primary: snapshot.primary, + tested: snapshot.tested, + auth_state: snapshot.auth_state, + }) +} + +#[cfg(feature = "std")] +fn queue_descriptor_to_snapshot( + descriptor: RadrootsSimplexAgentQueueDescriptor, +) -> RadrootsSimplexAgentQueueDescriptorSnapshot { + RadrootsSimplexAgentQueueDescriptorSnapshot { + queue_uri: descriptor.queue_uri.to_string(), + replaced_queue: descriptor.replaced_queue.map(queue_address_to_snapshot), + primary: descriptor.primary, + sender_key: descriptor.sender_key, + } +} + +#[cfg(feature = "std")] +fn queue_descriptor_from_snapshot( + snapshot: RadrootsSimplexAgentQueueDescriptorSnapshot, +) -> Result<RadrootsSimplexAgentQueueDescriptor, RadrootsSimplexAgentStoreError> { + let queue_uri = RadrootsSimplexSmpQueueUri::parse(&snapshot.queue_uri).map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to parse SimpleX queue uri `{}`: {error}", + snapshot.queue_uri + )) + })?; + Ok(RadrootsSimplexAgentQueueDescriptor { + queue_uri, + replaced_queue: snapshot + .replaced_queue + .map(queue_address_from_snapshot) + .transpose()?, + primary: snapshot.primary, + sender_key: snapshot.sender_key, + }) +} + +#[cfg(feature = "std")] +fn queue_address_to_snapshot( + address: RadrootsSimplexAgentQueueAddress, +) -> RadrootsSimplexAgentQueueAddressSnapshot { + RadrootsSimplexAgentQueueAddressSnapshot { + server_identity: address.server.server_identity, + hosts: address.server.hosts, + port: address.server.port, + sender_id: address.sender_id, + } +} + +#[cfg(feature = "std")] +fn queue_address_from_snapshot( + snapshot: RadrootsSimplexAgentQueueAddressSnapshot, +) -> Result<RadrootsSimplexAgentQueueAddress, RadrootsSimplexAgentStoreError> { + if snapshot.server_identity.is_empty() || snapshot.hosts.is_empty() { + return Err(RadrootsSimplexAgentStoreError::Persistence( + "invalid SimpleX queue address snapshot".into(), + )); + } + Ok(RadrootsSimplexAgentQueueAddress { + server: RadrootsSimplexSmpServerAddress { + server_identity: snapshot.server_identity, + hosts: snapshot.hosts, + port: snapshot.port, + }, + sender_id: snapshot.sender_id, + }) +} + +#[cfg(feature = "std")] +fn ratchet_state_to_snapshot( + state: RadrootsSimplexSmpRatchetState, +) -> RadrootsSimplexAgentRatchetStateSnapshot { + RadrootsSimplexAgentRatchetStateSnapshot { + role: alloc::format!("{:?}", state.role).to_ascii_lowercase(), + root_epoch: state.root_epoch, + previous_sending_chain_length: state.previous_sending_chain_length, + sending_chain_length: state.sending_chain_length, + receiving_chain_length: state.receiving_chain_length, + local_dh_public_key: state.local_dh_public_key, + remote_dh_public_key: state.remote_dh_public_key, + current_pq_public_key: state.current_pq_public_key, + remote_pq_public_key: state.remote_pq_public_key, + pending_outbound_pq_ciphertext: state.pending_outbound_pq_ciphertext, + pending_inbound_pq_ciphertext: state.pending_inbound_pq_ciphertext, + current_pq_shared_secret: state.current_pq_shared_secret, + } +} + +#[cfg(feature = "std")] +fn ratchet_state_from_snapshot( + snapshot: RadrootsSimplexAgentRatchetStateSnapshot, +) -> Result<RadrootsSimplexSmpRatchetState, RadrootsSimplexAgentStoreError> { + let mut state = match snapshot.role.as_str() { + "initiator" => RadrootsSimplexSmpRatchetState::initiator( + snapshot.local_dh_public_key.clone(), + snapshot.remote_dh_public_key.clone(), + snapshot.remote_pq_public_key.clone(), + ) + .map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to restore initiator ratchet state: {error}" + )) + })?, + "responder" => RadrootsSimplexSmpRatchetState::responder( + snapshot.local_dh_public_key.clone(), + snapshot.remote_dh_public_key.clone(), + snapshot.current_pq_public_key.clone(), + ) + .map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to restore responder ratchet state: {error}" + )) + })?, + other => { + return Err(RadrootsSimplexAgentStoreError::Persistence(format!( + "invalid SimpleX ratchet role `{other}`" + ))); + } + }; + state.root_epoch = snapshot.root_epoch; + state.previous_sending_chain_length = snapshot.previous_sending_chain_length; + state.sending_chain_length = snapshot.sending_chain_length; + state.receiving_chain_length = snapshot.receiving_chain_length; + state.current_pq_public_key = snapshot.current_pq_public_key; + state.remote_pq_public_key = snapshot.remote_pq_public_key; + state.pending_outbound_pq_ciphertext = snapshot.pending_outbound_pq_ciphertext; + state.pending_inbound_pq_ciphertext = snapshot.pending_inbound_pq_ciphertext; + state.current_pq_shared_secret = snapshot.current_pq_shared_secret; + Ok(state) +} + +#[cfg(feature = "std")] +fn command_to_snapshot( + command: RadrootsSimplexAgentPendingCommand, +) -> Result<RadrootsSimplexAgentPendingCommandSnapshot, RadrootsSimplexAgentStoreError> { + Ok(RadrootsSimplexAgentPendingCommandSnapshot { + id: command.id, + connection_id: command.connection_id, + kind: command_kind_to_snapshot(command.kind)?, + attempts: command.attempts, + ready_at: command.ready_at, + inflight: command.inflight, + }) +} + +#[cfg(feature = "std")] +fn command_from_snapshot( + snapshot: RadrootsSimplexAgentPendingCommandSnapshot, +) -> Result<RadrootsSimplexAgentPendingCommand, RadrootsSimplexAgentStoreError> { + Ok(RadrootsSimplexAgentPendingCommand { + id: snapshot.id, + connection_id: snapshot.connection_id, + kind: command_kind_from_snapshot(snapshot.kind)?, + attempts: snapshot.attempts, + ready_at: snapshot.ready_at, + inflight: snapshot.inflight, + }) +} + +#[cfg(feature = "std")] +fn command_kind_to_snapshot( + kind: RadrootsSimplexAgentPendingCommandKind, +) -> Result<RadrootsSimplexAgentPendingCommandKindSnapshot, RadrootsSimplexAgentStoreError> { + Ok(match kind { + RadrootsSimplexAgentPendingCommandKind::CreateQueue { descriptor } => { + RadrootsSimplexAgentPendingCommandKindSnapshot::CreateQueue { + descriptor: queue_descriptor_to_snapshot(descriptor), + } + } + RadrootsSimplexAgentPendingCommandKind::SecureQueue { queue, sender_key } => { + RadrootsSimplexAgentPendingCommandKindSnapshot::SecureQueue { + queue: queue_address_to_snapshot(queue), + sender_key, + } + } + RadrootsSimplexAgentPendingCommandKind::SendEnvelope { + queue, + envelope, + delivery, + } => RadrootsSimplexAgentPendingCommandKindSnapshot::SendEnvelope { + queue: queue_address_to_snapshot(queue), + envelope: encode_envelope(&envelope).map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to encode SimpleX envelope: {error}" + )) + })?, + delivery, + }, + RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue } => { + RadrootsSimplexAgentPendingCommandKindSnapshot::SubscribeQueue { + queue: queue_address_to_snapshot(queue), + } + } + RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { queue, receipt } => { + RadrootsSimplexAgentPendingCommandKindSnapshot::AckInboxMessage { + queue: queue_address_to_snapshot(queue), + receipt: RadrootsSimplexAgentMessageReceiptSnapshot { + message_id: receipt.message_id, + message_hash: receipt.message_hash, + receipt_info: receipt.receipt_info, + }, + } + } + RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors } => { + RadrootsSimplexAgentPendingCommandKindSnapshot::RotateQueues { + descriptors: descriptors + .into_iter() + .map(queue_descriptor_to_snapshot) + .collect(), + } + } + RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => { + RadrootsSimplexAgentPendingCommandKindSnapshot::TestQueues { + queues: queues.into_iter().map(queue_address_to_snapshot).collect(), + } + } + }) +} + +#[cfg(feature = "std")] +fn command_kind_from_snapshot( + snapshot: RadrootsSimplexAgentPendingCommandKindSnapshot, +) -> Result<RadrootsSimplexAgentPendingCommandKind, RadrootsSimplexAgentStoreError> { + Ok(match snapshot { + RadrootsSimplexAgentPendingCommandKindSnapshot::CreateQueue { descriptor } => { + RadrootsSimplexAgentPendingCommandKind::CreateQueue { + descriptor: queue_descriptor_from_snapshot(descriptor)?, + } + } + RadrootsSimplexAgentPendingCommandKindSnapshot::SecureQueue { queue, sender_key } => { + RadrootsSimplexAgentPendingCommandKind::SecureQueue { + queue: queue_address_from_snapshot(queue)?, + sender_key, + } + } + RadrootsSimplexAgentPendingCommandKindSnapshot::SendEnvelope { + queue, + envelope, + delivery, + } => RadrootsSimplexAgentPendingCommandKind::SendEnvelope { + queue: queue_address_from_snapshot(queue)?, + envelope: decode_envelope(&envelope).map_err(|error| { + RadrootsSimplexAgentStoreError::Persistence(format!( + "failed to decode SimpleX envelope: {error}" + )) + })?, + delivery, + }, + RadrootsSimplexAgentPendingCommandKindSnapshot::SubscribeQueue { queue } => { + RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { + queue: queue_address_from_snapshot(queue)?, + } + } + RadrootsSimplexAgentPendingCommandKindSnapshot::AckInboxMessage { queue, receipt } => { + RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { + queue: queue_address_from_snapshot(queue)?, + receipt: RadrootsSimplexAgentMessageReceipt { + message_id: receipt.message_id, + message_hash: receipt.message_hash, + receipt_info: receipt.receipt_info, + }, + } + } + RadrootsSimplexAgentPendingCommandKindSnapshot::RotateQueues { descriptors } => { + RadrootsSimplexAgentPendingCommandKind::RotateQueues { + descriptors: descriptors + .into_iter() + .map(queue_descriptor_from_snapshot) + .collect::<Result<Vec<_>, _>>()?, + } + } + RadrootsSimplexAgentPendingCommandKindSnapshot::TestQueues { queues } => { + RadrootsSimplexAgentPendingCommandKind::TestQueues { + queues: queues + .into_iter() + .map(queue_address_from_snapshot) + .collect::<Result<Vec<_>, _>>()?, + } + } + }) +} + +#[cfg(feature = "std")] +fn encode_connection_mode(mode: RadrootsSimplexAgentConnectionMode) -> &'static str { + match mode { + RadrootsSimplexAgentConnectionMode::Direct => "direct", + RadrootsSimplexAgentConnectionMode::ContactAddress => "contact_address", + } +} + +#[cfg(feature = "std")] +fn decode_connection_mode( + value: &str, +) -> Result<RadrootsSimplexAgentConnectionMode, RadrootsSimplexAgentStoreError> { + match value { + "direct" => Ok(RadrootsSimplexAgentConnectionMode::Direct), + "contact_address" => Ok(RadrootsSimplexAgentConnectionMode::ContactAddress), + other => Err(RadrootsSimplexAgentStoreError::Persistence(format!( + "invalid SimpleX connection mode `{other}`" + ))), + } +} + +#[cfg(feature = "std")] +fn encode_connection_status(status: RadrootsSimplexAgentConnectionStatus) -> &'static str { + match status { + RadrootsSimplexAgentConnectionStatus::CreatePending => "create_pending", + RadrootsSimplexAgentConnectionStatus::InvitationReady => "invitation_ready", + RadrootsSimplexAgentConnectionStatus::JoinPending => "join_pending", + RadrootsSimplexAgentConnectionStatus::AwaitingApproval => "awaiting_approval", + RadrootsSimplexAgentConnectionStatus::Allowed => "allowed", + RadrootsSimplexAgentConnectionStatus::Connected => "connected", + RadrootsSimplexAgentConnectionStatus::Suspended => "suspended", + RadrootsSimplexAgentConnectionStatus::Rotating => "rotating", + RadrootsSimplexAgentConnectionStatus::Deleted => "deleted", + } +} + +#[cfg(feature = "std")] +fn decode_connection_status( + value: &str, +) -> Result<RadrootsSimplexAgentConnectionStatus, RadrootsSimplexAgentStoreError> { + match value { + "create_pending" => Ok(RadrootsSimplexAgentConnectionStatus::CreatePending), + "invitation_ready" => Ok(RadrootsSimplexAgentConnectionStatus::InvitationReady), + "join_pending" => Ok(RadrootsSimplexAgentConnectionStatus::JoinPending), + "awaiting_approval" => Ok(RadrootsSimplexAgentConnectionStatus::AwaitingApproval), + "allowed" => Ok(RadrootsSimplexAgentConnectionStatus::Allowed), + "connected" => Ok(RadrootsSimplexAgentConnectionStatus::Connected), + "suspended" => Ok(RadrootsSimplexAgentConnectionStatus::Suspended), + "rotating" => Ok(RadrootsSimplexAgentConnectionStatus::Rotating), + "deleted" => Ok(RadrootsSimplexAgentConnectionStatus::Deleted), + other => Err(RadrootsSimplexAgentStoreError::Persistence(format!( + "invalid SimpleX connection status `{other}`" + ))), + } +} + +#[cfg(feature = "std")] +fn encode_queue_role(role: RadrootsSimplexAgentQueueRole) -> &'static str { + match role { + RadrootsSimplexAgentQueueRole::Receive => "receive", + RadrootsSimplexAgentQueueRole::Send => "send", + } +} + +#[cfg(feature = "std")] +fn decode_queue_role( + value: &str, +) -> Result<RadrootsSimplexAgentQueueRole, RadrootsSimplexAgentStoreError> { + match value { + "receive" => Ok(RadrootsSimplexAgentQueueRole::Receive), + "send" => Ok(RadrootsSimplexAgentQueueRole::Send), + other => Err(RadrootsSimplexAgentStoreError::Persistence(format!( + "invalid SimpleX queue role `{other}`" + ))), + } } #[cfg(test)] @@ -359,7 +1268,7 @@ mod tests { .unwrap(), replaced_queue: None, primary, - sender_key: None, + sender_key: Some(b"sender-auth".to_vec()), } } @@ -394,9 +1303,110 @@ mod tests { assert_eq!(ready[0].id, command.id); let retried = store.mark_command_retry(command.id, 20).unwrap(); assert_eq!(retried.ready_at, 20); + let queue = store.primary_send_queue(&connection.id).unwrap(); + assert_eq!(queue.descriptor, sample_descriptor(true)); + assert!(queue.auth_state.is_some()); + } + + #[test] + fn stages_and_confirms_outbound_message_without_consuming_cursor_early() { + let mut store = RadrootsSimplexAgentStore::new(); + let connection = store.create_connection( + RadrootsSimplexAgentConnectionMode::Direct, + RadrootsSimplexAgentConnectionStatus::Connected, + None, + None, + ); + + let prepared = store + .prepare_outbound_message(&connection.id, b"ciphertext".to_vec()) + .unwrap(); + assert_eq!(prepared.message_id, 1); + assert!(prepared.previous_message_hash.is_empty()); + assert_eq!( + store + .connection(&connection.id) + .unwrap() + .delivery_cursor + .last_sent_message_id, + None + ); + + let error = store + .prepare_outbound_message(&connection.id, b"next".to_vec()) + .unwrap_err(); assert_eq!( - store.primary_send_queue(&connection.id).unwrap().descriptor, - sample_descriptor(true) + error, + RadrootsSimplexAgentStoreError::PendingOutboundMessage(connection.id.clone()) + ); + + store + .confirm_outbound_message(&connection.id, prepared.message_id) + .unwrap(); + let cursor = &store.connection(&connection.id).unwrap().delivery_cursor; + assert_eq!(cursor.last_sent_message_id, Some(1)); + assert_eq!(cursor.last_sent_message_hash, Some(b"ciphertext".to_vec())); + } + + #[cfg(feature = "std")] + #[test] + fn flush_and_reopen_persisted_store_state() { + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().join("agent-store.json"); + + let mut store = RadrootsSimplexAgentStore::open(&path).unwrap(); + let connection = store.create_connection( + RadrootsSimplexAgentConnectionMode::Direct, + RadrootsSimplexAgentConnectionStatus::Connected, + None, + None, + ); + store + .add_queue( + &connection.id, + sample_descriptor(true), + RadrootsSimplexAgentQueueRole::Send, + true, + ) + .unwrap(); + let prepared = store + .prepare_outbound_message(&connection.id, b"persisted".to_vec()) + .unwrap(); + store + .enqueue_command( + &connection.id, + RadrootsSimplexAgentPendingCommandKind::SendEnvelope { + queue: sample_descriptor(true).queue_address(), + envelope: RadrootsSimplexAgentEnvelope::Invitation { + request: b"req".to_vec(), + connection_info: b"info".to_vec(), + }, + delivery: Some(RadrootsSimplexAgentOutboundMessage { + message_id: prepared.message_id, + message_hash: prepared.message_hash.clone(), + }), + }, + 10, + ) + .unwrap(); + store.flush().unwrap(); + + let loaded = RadrootsSimplexAgentStore::open(&path).unwrap(); + let loaded_connection = loaded.connection(&connection.id).unwrap(); + assert_eq!( + loaded_connection.staged_outbound_message, + Some(RadrootsSimplexAgentOutboundMessage { + message_id: 1, + message_hash: b"persisted".to_vec(), + }) + ); + assert_eq!(loaded.pending_commands.len(), 1); + assert!( + loaded + .primary_send_queue(&connection.id) + .unwrap() + .auth_state + .is_some() ); } } diff --git a/crates/simplex-smp-transport/src/executor.rs b/crates/simplex-smp-transport/src/executor.rs @@ -0,0 +1,29 @@ +use alloc::vec::Vec; +use radroots_simplex_smp_proto::prelude::{ + RadrootsSimplexSmpBrokerTransmission, RadrootsSimplexSmpCommandTransmission, + RadrootsSimplexSmpServerAddress, +}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexSmpTransportRequest { + pub server: RadrootsSimplexSmpServerAddress, + pub transport_version: u16, + pub transmission: RadrootsSimplexSmpCommandTransmission, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexSmpTransportResponse { + pub server: RadrootsSimplexSmpServerAddress, + pub transport_version: u16, + pub transmission: RadrootsSimplexSmpBrokerTransmission, + pub transport_hash: Vec<u8>, +} + +pub trait RadrootsSimplexSmpCommandTransport { + type Error: core::fmt::Display; + + fn execute( + &mut self, + request: RadrootsSimplexSmpTransportRequest, + ) -> Result<RadrootsSimplexSmpTransportResponse, Self::Error>; +} diff --git a/crates/simplex-smp-transport/src/lib.rs b/crates/simplex-smp-transport/src/lib.rs @@ -4,11 +4,16 @@ extern crate alloc; pub mod error; +pub mod executor; pub mod frame; pub mod handshake; pub mod prelude { pub use crate::error::RadrootsSimplexSmpTransportError; + pub use crate::executor::{ + RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpTransportRequest, + RadrootsSimplexSmpTransportResponse, + }; pub use crate::frame::{ RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE, RADROOTS_SIMPLEX_SMP_TRANSPORT_PAD_BYTE, RadrootsSimplexSmpTransportBlock, decode_padded_bytes, encode_padded_bytes,