lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

runtime.rs (179625B)


      1 use crate::error::RadrootsSimplexAgentRuntimeError;
      2 use crate::types::{RadrootsSimplexAgentCommandOutcome, RadrootsSimplexAgentRuntimeEvent};
      3 use alloc::collections::VecDeque;
      4 use alloc::format;
      5 use alloc::string::{String, ToString};
      6 use alloc::vec;
      7 use alloc::vec::Vec;
      8 use base64::Engine as _;
      9 use base64::engine::general_purpose::{URL_SAFE, URL_SAFE_NO_PAD};
     10 use radroots_simplex_agent_proto::prelude::{
     11     RadrootsSimplexAgentConnectionLink, RadrootsSimplexAgentConnectionMode,
     12     RadrootsSimplexAgentConnectionStatus, RadrootsSimplexAgentDecryptedMessage,
     13     RadrootsSimplexAgentEncryptedPayload, RadrootsSimplexAgentEnvelope,
     14     RadrootsSimplexAgentMessage, RadrootsSimplexAgentMessageFrame,
     15     RadrootsSimplexAgentMessageHeader, RadrootsSimplexAgentMessageReceipt,
     16     RadrootsSimplexAgentQueueAddress, RadrootsSimplexAgentQueueDescriptor,
     17     RadrootsSimplexAgentShortInvitationLink, RadrootsSimplexAgentShortLinkScheme,
     18     decode_decrypted_message, decode_envelope, decode_short_invitation_fixed_data,
     19     decode_short_invitation_user_data, encode_decrypted_message, encode_envelope,
     20     encode_short_invitation_fixed_data, encode_short_invitation_user_data,
     21 };
     22 use radroots_simplex_agent_store::prelude::{
     23     RadrootsSimplexAgentOutboundMessage, RadrootsSimplexAgentPendingCommand,
     24     RadrootsSimplexAgentPendingCommandKind, RadrootsSimplexAgentPqKeypair,
     25     RadrootsSimplexAgentQueueAuthState, RadrootsSimplexAgentQueueRole,
     26     RadrootsSimplexAgentShortLinkCredentials, RadrootsSimplexAgentStore,
     27     RadrootsSimplexAgentX3dhKeypair,
     28 };
     29 use radroots_simplex_smp_crypto::prelude::{
     30     RADROOTS_SIMPLEX_OFFICIAL_E2E_CURRENT_VERSION, RADROOTS_SIMPLEX_OFFICIAL_E2E_KDF_VERSION,
     31     RADROOTS_SIMPLEX_SMP_NONCE_LENGTH, RADROOTS_SIMPLEX_SMP_SHORT_LINK_SIGNATURE_LENGTH,
     32     RadrootsSimplexOfficialSntrup761Keypair, RadrootsSimplexOfficialX3dhParams,
     33     RadrootsSimplexOfficialX448Keypair, RadrootsSimplexSmpCommandAuthorization,
     34     RadrootsSimplexSmpCryptoError, RadrootsSimplexSmpEd25519Keypair,
     35     RadrootsSimplexSmpRatchetState, RadrootsSimplexSmpX25519Keypair, decode_x25519_public_key_x509,
     36     decrypt_padded, decrypt_short_link_data, derive_invitation_short_link_data_key,
     37     derive_shared_secret, encode_ed25519_public_key_x509, encode_x25519_public_key_x509,
     38     encrypt_padded, encrypt_short_link_data, official_sntrup761_keypair_from_seed,
     39     official_x3dh_receiver_init, official_x3dh_receiver_init_accepting_pq,
     40     official_x3dh_sender_init, official_x3dh_sender_init_accepting_pq,
     41     official_x448_keypair_from_seed, random_nonce, sign_short_link_data,
     42     verify_signed_short_link_data,
     43 };
     44 use radroots_simplex_smp_proto::prelude::{
     45     RADROOTS_SIMPLEX_SMP_CURRENT_CLIENT_VERSION, RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
     46     RadrootsSimplexSmpBrokerMessage, RadrootsSimplexSmpCommand, RadrootsSimplexSmpCorrelationId,
     47     RadrootsSimplexSmpError, RadrootsSimplexSmpMessageFlags,
     48     RadrootsSimplexSmpMessagingQueueRequest, RadrootsSimplexSmpNewQueueRequest,
     49     RadrootsSimplexSmpQueueIdsResponse, RadrootsSimplexSmpQueueLinkData,
     50     RadrootsSimplexSmpQueueMode, RadrootsSimplexSmpQueueRequestData, RadrootsSimplexSmpQueueUri,
     51     RadrootsSimplexSmpSendCommand, RadrootsSimplexSmpServerAddress,
     52     RadrootsSimplexSmpSubscriptionMode, RadrootsSimplexSmpVersionRange,
     53 };
     54 use radroots_simplex_smp_transport::prelude::{
     55     RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpSubscriptionReceiveRequest,
     56     RadrootsSimplexSmpSubscriptionTransport, RadrootsSimplexSmpTransportRequest,
     57     RadrootsSimplexSmpTransportResponse,
     58 };
     59 use sha2::{Digest, Sha256};
     60 use sha3::Sha3_384;
     61 #[cfg(feature = "std")]
     62 use std::path::{Path, PathBuf};
     63 
     64 const SIMPLEX_E2E_CONFIRMATION_LENGTH: usize = 15_904;
     65 const SIMPLEX_E2E_MESSAGE_LENGTH: usize = 16_000;
     66 const SIMPLEX_AGENT_E2E_CONN_INFO_LENGTH: usize = 14_832;
     67 const SIMPLEX_AGENT_E2E_CONN_INFO_PQ_LENGTH: usize = 11_106;
     68 const SIMPLEX_AGENT_E2E_MESSAGE_LENGTH: usize = 15_840;
     69 const SIMPLEX_AGENT_E2E_MESSAGE_PQ_LENGTH: usize = 13_618;
     70 
     71 #[derive(Debug, Clone)]
     72 struct SimplexClientMessageEnvelope {
     73     sender_public_key: Option<Vec<u8>>,
     74     nonce: [u8; RADROOTS_SIMPLEX_SMP_NONCE_LENGTH],
     75     ciphertext: Vec<u8>,
     76 }
     77 
     78 #[derive(Debug, Clone, Copy)]
     79 enum SimplexAgentPayloadKind {
     80     ConnectionInfo,
     81     Message,
     82 }
     83 
     84 #[derive(Debug, Clone)]
     85 struct SimplexReceivedBody {
     86     timestamp: u64,
     87     flags: RadrootsSimplexSmpMessageFlags,
     88     sent_body: Vec<u8>,
     89 }
     90 
     91 struct SimplexPreparedShortInvitationLinkData {
     92     link_key: Vec<u8>,
     93     link_public_signature_key: Vec<u8>,
     94     link_private_signature_key: Vec<u8>,
     95     encrypted_link_data: RadrootsSimplexSmpQueueLinkData,
     96 }
     97 
     98 pub fn decrypt_short_invitation_link_data(
     99     invitation: &RadrootsSimplexAgentShortInvitationLink,
    100     encrypted_link_data: &RadrootsSimplexSmpQueueLinkData,
    101 ) -> Result<RadrootsSimplexAgentConnectionLink, RadrootsSimplexAgentRuntimeError> {
    102     let link_data_key = derive_invitation_short_link_data_key(&invitation.link_key)
    103         .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    104     let signed_link_data = decrypt_short_link_data(&link_data_key, encrypted_link_data)
    105         .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    106     if signed_link_data.fixed_data.len() <= RADROOTS_SIMPLEX_SMP_SHORT_LINK_SIGNATURE_LENGTH {
    107         return Err(RadrootsSimplexAgentRuntimeError::Runtime(
    108             "SimpleX short invitation fixed data is missing its signed payload".into(),
    109         ));
    110     }
    111     let fixed_payload =
    112         &signed_link_data.fixed_data[RADROOTS_SIMPLEX_SMP_SHORT_LINK_SIGNATURE_LENGTH..];
    113     let mut fixed_data = decode_short_invitation_fixed_data(fixed_payload)?;
    114     let verified = verify_signed_short_link_data(
    115         &invitation.link_key,
    116         &fixed_data.root_public_signature_key,
    117         &signed_link_data,
    118     )
    119     .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    120     let user_data = decode_short_invitation_user_data(&verified.user_data)?;
    121     if !fixed_data.invitation.connection_id.is_empty()
    122         && fixed_data.invitation.connection_id != user_data.user_data
    123     {
    124         return Err(RadrootsSimplexAgentRuntimeError::Runtime(
    125             "SimpleX short invitation user data does not match the fixed connection link".into(),
    126         ));
    127     }
    128     fixed_data.invitation.connection_id = user_data.user_data;
    129     Ok(fixed_data.invitation)
    130 }
    131 
    132 pub struct RadrootsSimplexAgentRuntimeBuilder {
    133     store: Option<RadrootsSimplexAgentStore>,
    134     queue_capacity: usize,
    135     retry_delay_ms: u64,
    136     #[cfg(feature = "std")]
    137     persistent_store_path: Option<PathBuf>,
    138 }
    139 
    140 impl RadrootsSimplexAgentRuntimeBuilder {
    141     pub const DEFAULT_QUEUE_CAPACITY: usize = 2_048;
    142     pub const DEFAULT_RETRY_DELAY_MS: u64 = 5_000;
    143 
    144     pub fn new() -> Self {
    145         Self {
    146             store: None,
    147             queue_capacity: Self::DEFAULT_QUEUE_CAPACITY,
    148             retry_delay_ms: Self::DEFAULT_RETRY_DELAY_MS,
    149             #[cfg(feature = "std")]
    150             persistent_store_path: None,
    151         }
    152     }
    153 
    154     pub fn store(mut self, store: RadrootsSimplexAgentStore) -> Self {
    155         self.store = Some(store);
    156         self
    157     }
    158 
    159     #[cfg(feature = "std")]
    160     pub fn persistent_store_path(mut self, path: impl AsRef<Path>) -> Self {
    161         self.persistent_store_path = Some(path.as_ref().to_path_buf());
    162         self
    163     }
    164 
    165     pub fn queue_capacity(mut self, queue_capacity: usize) -> Self {
    166         self.queue_capacity = queue_capacity;
    167         self
    168     }
    169 
    170     pub fn retry_delay_ms(mut self, retry_delay_ms: u64) -> Self {
    171         self.retry_delay_ms = retry_delay_ms;
    172         self
    173     }
    174 
    175     pub fn build(self) -> Result<RadrootsSimplexAgentRuntime, RadrootsSimplexAgentRuntimeError> {
    176         if self.queue_capacity == 0 {
    177             return Err(RadrootsSimplexAgentRuntimeError::InvalidConfig(
    178                 "queue_capacity",
    179             ));
    180         }
    181         #[cfg(feature = "std")]
    182         let store = match (self.store, self.persistent_store_path) {
    183             (Some(mut store), Some(path)) => {
    184                 store.set_persistence_path(path);
    185                 store
    186             }
    187             (Some(store), None) => store,
    188             (None, Some(path)) => RadrootsSimplexAgentStore::open(path)?,
    189             (None, None) => RadrootsSimplexAgentStore::default(),
    190         };
    191         #[cfg(not(feature = "std"))]
    192         let store = self.store.unwrap_or_default();
    193 
    194         Ok(RadrootsSimplexAgentRuntime {
    195             store,
    196             events: VecDeque::with_capacity(self.queue_capacity),
    197             retry_delay_ms: self.retry_delay_ms,
    198         })
    199     }
    200 }
    201 
    202 impl Default for RadrootsSimplexAgentRuntimeBuilder {
    203     fn default() -> Self {
    204         Self::new()
    205     }
    206 }
    207 
    208 pub struct RadrootsSimplexAgentRuntime {
    209     store: RadrootsSimplexAgentStore,
    210     events: VecDeque<RadrootsSimplexAgentRuntimeEvent>,
    211     retry_delay_ms: u64,
    212 }
    213 
    214 impl RadrootsSimplexAgentRuntime {
    215     pub fn create_connection(
    216         &mut self,
    217         mut invitation_queue: RadrootsSimplexSmpQueueUri,
    218         e2e_seed: Vec<u8>,
    219         contact_address: bool,
    220         now: u64,
    221     ) -> Result<String, RadrootsSimplexAgentRuntimeError> {
    222         let e2e_keypair = RadrootsSimplexSmpX25519Keypair::from_seed(&e2e_seed);
    223         invitation_queue.recipient_dh_public_key = encode_queue_public_key(&e2e_keypair.public_key)
    224             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    225         invitation_queue.sender_id = placeholder_sender_id(
    226             invitation_queue.server.server_identity.as_bytes(),
    227             &now.to_be_bytes(),
    228         );
    229         let x3dh_key_1 = official_x448_keypair_from_seed(&derive_material(
    230             b"connection-create-x3dh-1",
    231             &[
    232                 invitation_queue.to_string().as_bytes(),
    233                 &e2e_keypair.public_key,
    234                 &now.to_be_bytes(),
    235             ],
    236         ));
    237         let x3dh_key_2 = official_x448_keypair_from_seed(&derive_material(
    238             b"connection-create-x3dh-2",
    239             &[
    240                 invitation_queue.to_string().as_bytes(),
    241                 &e2e_keypair.public_key,
    242                 &now.to_be_bytes(),
    243             ],
    244         ));
    245         let pq_keypair = official_sntrup761_keypair_from_seed(&derive_material(
    246             b"connection-create-pq-kem",
    247             &[
    248                 invitation_queue.to_string().as_bytes(),
    249                 &e2e_keypair.public_key,
    250                 &now.to_be_bytes(),
    251             ],
    252         ));
    253         let e2e_ratchet_params = RadrootsSimplexOfficialX3dhParams {
    254             version_range: RadrootsSimplexSmpVersionRange::new(
    255                 RADROOTS_SIMPLEX_OFFICIAL_E2E_KDF_VERSION,
    256                 RADROOTS_SIMPLEX_OFFICIAL_E2E_CURRENT_VERSION,
    257             )
    258             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?,
    259             key_1: x3dh_key_1.public_key.clone(),
    260             key_2: x3dh_key_2.public_key.clone(),
    261             pq_public_key: Some(pq_keypair.public_key.clone()),
    262             pq_ciphertext: None,
    263         };
    264         let mut ratchet_state = RadrootsSimplexSmpRatchetState::initiator(
    265             x3dh_key_2.public_key.clone(),
    266             x3dh_key_1.public_key.clone(),
    267             None,
    268         )
    269         .ok();
    270         if let Some(ratchet_state) = ratchet_state.as_mut() {
    271             ratchet_state.current_pq_public_key = Some(pq_keypair.public_key.clone());
    272             ratchet_state.local_pq_private_key = Some(pq_keypair.private_key.clone());
    273         }
    274         let connection = self.store.create_connection(
    275             if contact_address {
    276                 RadrootsSimplexAgentConnectionMode::ContactAddress
    277             } else {
    278                 RadrootsSimplexAgentConnectionMode::Direct
    279             },
    280             RadrootsSimplexAgentConnectionStatus::CreatePending,
    281             None,
    282             ratchet_state,
    283         );
    284         let invitation = RadrootsSimplexAgentConnectionLink {
    285             invitation_queue: invitation_queue.clone(),
    286             connection_id: connection.id.as_bytes().to_vec(),
    287             e2e_ratchet_params,
    288             contact_address,
    289         };
    290         let prepared_short_link = if contact_address {
    291             None
    292         } else {
    293             Some(prepare_short_invitation_link_data(&invitation)?)
    294         };
    295         self.store.connection_mut(&connection.id)?.invitation = Some(invitation);
    296         let receive_auth_state = self.store.generate_queue_auth_state()?;
    297         let delivery_keypair = RadrootsSimplexSmpX25519Keypair::generate()
    298             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    299         let descriptor = RadrootsSimplexAgentQueueDescriptor {
    300             queue_uri: invitation_queue,
    301             replaced_queue: None,
    302             primary: true,
    303             sender_key: None,
    304         };
    305         self.store.add_queue(
    306             &connection.id,
    307             descriptor.clone(),
    308             RadrootsSimplexAgentQueueRole::Receive,
    309             true,
    310             receive_auth_state,
    311         )?;
    312         let server_key_hash = decode_server_key_hash(&descriptor.queue_uri.server.server_identity)?;
    313         {
    314             let connection = self.store.connection_mut(&connection.id)?;
    315             connection.local_e2e_public_key = Some(e2e_keypair.public_key);
    316             connection.local_e2e_private_key = Some(e2e_keypair.private_key);
    317             connection.local_x3dh_key_1 = Some(agent_x3dh_keypair(x3dh_key_1));
    318             connection.local_x3dh_key_2 = Some(agent_x3dh_keypair(x3dh_key_2));
    319             connection.local_pq_keypair = Some(agent_pq_keypair(pq_keypair));
    320             connection.short_link =
    321                 prepared_short_link.map(|prepared| RadrootsSimplexAgentShortLinkCredentials {
    322                     scheme: RadrootsSimplexAgentShortLinkScheme::Simplex,
    323                     hosts: descriptor.queue_uri.server.hosts.clone(),
    324                     port: descriptor.queue_uri.server.port,
    325                     server_key_hash: Some(server_key_hash),
    326                     link_id: Vec::new(),
    327                     link_key: prepared.link_key,
    328                     link_public_signature_key: prepared.link_public_signature_key,
    329                     link_private_signature_key: prepared.link_private_signature_key,
    330                     encrypted_fixed_data: Some(prepared.encrypted_link_data.fixed_data),
    331                     encrypted_user_data: Some(prepared.encrypted_link_data.user_data),
    332                 });
    333             let queue = connection
    334                 .queues
    335                 .iter_mut()
    336                 .find(|queue| queue.descriptor.queue_address() == descriptor.queue_address())
    337                 .ok_or_else(|| {
    338                     RadrootsSimplexAgentRuntimeError::Runtime(
    339                         "SimpleX receive queue missing after create_connection".into(),
    340                     )
    341                 })?;
    342             queue.delivery_private_key = Some(delivery_keypair.private_key);
    343         }
    344         self.store.enqueue_command(
    345             &connection.id,
    346             RadrootsSimplexAgentPendingCommandKind::CreateQueue { descriptor },
    347             now,
    348         )?;
    349         self.flush_store()?;
    350         Ok(connection.id)
    351     }
    352 
    353     pub fn join_connection(
    354         &mut self,
    355         invitation: RadrootsSimplexAgentConnectionLink,
    356         reply_queue: RadrootsSimplexSmpQueueUri,
    357         now: u64,
    358     ) -> Result<String, RadrootsSimplexAgentRuntimeError> {
    359         let connection = self.store.create_connection(
    360             RadrootsSimplexAgentConnectionMode::Direct,
    361             RadrootsSimplexAgentConnectionStatus::JoinPending,
    362             None,
    363             None,
    364         );
    365         let connection_id = connection.id.clone();
    366         self.prepare_join_connection(&connection_id, invitation, reply_queue, now, None)?;
    367         self.flush_store()?;
    368         Ok(connection_id)
    369     }
    370 
    371     pub fn join_short_invitation(
    372         &mut self,
    373         invitation: RadrootsSimplexAgentShortInvitationLink,
    374         reply_queue: RadrootsSimplexSmpQueueUri,
    375         now: u64,
    376     ) -> Result<String, RadrootsSimplexAgentRuntimeError> {
    377         let _ = short_invitation_server(&invitation)?;
    378         let connection = self.store.create_connection(
    379             RadrootsSimplexAgentConnectionMode::Direct,
    380             RadrootsSimplexAgentConnectionStatus::JoinPending,
    381             None,
    382             None,
    383         );
    384         let sender_auth_state = self.store.generate_queue_auth_state()?;
    385         self.store.enqueue_command(
    386             &connection.id,
    387             RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
    388                 invitation,
    389                 reply_queue,
    390                 sender_auth_state,
    391             },
    392             now,
    393         )?;
    394         self.flush_store()?;
    395         Ok(connection.id)
    396     }
    397 
    398     fn prepare_join_connection(
    399         &mut self,
    400         connection_id: &str,
    401         invitation: RadrootsSimplexAgentConnectionLink,
    402         mut reply_queue: RadrootsSimplexSmpQueueUri,
    403         now: u64,
    404         secured_sender_auth_state: Option<RadrootsSimplexAgentQueueAuthState>,
    405     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    406         let send_queue_is_secured = secured_sender_auth_state.is_some();
    407         let local_e2e_keypair = RadrootsSimplexSmpX25519Keypair::generate()
    408             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    409         let invitation_e2e_public_key =
    410             decode_queue_public_key(&invitation.invitation_queue.recipient_dh_public_key)?;
    411         let shared_secret =
    412             derive_shared_secret(&local_e2e_keypair.private_key, &invitation_e2e_public_key)
    413                 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    414         reply_queue.recipient_dh_public_key =
    415             encode_queue_public_key(&local_e2e_keypair.public_key)
    416                 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    417         reply_queue.sender_id =
    418             placeholder_sender_id(invitation.connection_id.as_slice(), &now.to_be_bytes());
    419         let local_x3dh_key_1 = official_x448_keypair_from_seed(&derive_material(
    420             b"connection-join-x3dh-1",
    421             &[
    422                 invitation.connection_id.as_slice(),
    423                 reply_queue.to_string().as_bytes(),
    424                 &now.to_be_bytes(),
    425             ],
    426         ));
    427         let local_x3dh_key_2 = official_x448_keypair_from_seed(&derive_material(
    428             b"connection-join-x3dh-2",
    429             &[
    430                 invitation.connection_id.as_slice(),
    431                 reply_queue.to_string().as_bytes(),
    432                 &now.to_be_bytes(),
    433             ],
    434         ));
    435         let local_pq_keypair = invitation
    436             .e2e_ratchet_params
    437             .pq_public_key
    438             .as_ref()
    439             .map(|_| {
    440                 official_sntrup761_keypair_from_seed(&derive_material(
    441                     b"connection-join-pq-kem",
    442                     &[
    443                         invitation.connection_id.as_slice(),
    444                         reply_queue.to_string().as_bytes(),
    445                         &now.to_be_bytes(),
    446                     ],
    447                 ))
    448             });
    449         let mut ratchet_state = RadrootsSimplexSmpRatchetState::responder(
    450             local_x3dh_key_2.public_key.clone(),
    451             invitation.e2e_ratchet_params.key_2.clone(),
    452             local_pq_keypair
    453                 .as_ref()
    454                 .map(|keypair| keypair.public_key.clone()),
    455         )
    456         .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    457         let local_pq_keypair = if let Some(local_pq_keypair) = local_pq_keypair {
    458             let sender_init = official_x3dh_sender_init_accepting_pq(
    459                 &local_x3dh_key_1,
    460                 &local_x3dh_key_2,
    461                 local_pq_keypair,
    462                 &invitation.e2e_ratchet_params,
    463                 &derive_material(
    464                     b"connection-join-pq-encapsulation",
    465                     &[
    466                         invitation.connection_id.as_slice(),
    467                         reply_queue.to_string().as_bytes(),
    468                         &now.to_be_bytes(),
    469                     ],
    470                 ),
    471             )
    472             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    473             ratchet_state
    474                 .initialize_official_sender(local_x3dh_key_2.private_key.clone(), sender_init.init)
    475                 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    476             ratchet_state.current_pq_public_key = sender_init.sender_params.pq_public_key.clone();
    477             ratchet_state.pending_outbound_pq_ciphertext =
    478                 sender_init.sender_params.pq_ciphertext.clone();
    479             ratchet_state.local_pq_private_key =
    480                 Some(sender_init.local_pq_keypair.private_key.clone());
    481             Some(sender_init.local_pq_keypair)
    482         } else {
    483             let sender_init = official_x3dh_sender_init(
    484                 &local_x3dh_key_1,
    485                 &local_x3dh_key_2,
    486                 &invitation.e2e_ratchet_params,
    487             )
    488             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    489             ratchet_state
    490                 .initialize_official_sender(local_x3dh_key_2.private_key.clone(), sender_init)
    491                 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    492             None
    493         };
    494         let send_auth_state = if let Some(sender_auth_state) = secured_sender_auth_state {
    495             sender_auth_state
    496         } else {
    497             self.store.generate_queue_auth_state()?
    498         };
    499         let send_descriptor = RadrootsSimplexAgentQueueDescriptor {
    500             queue_uri: invitation.invitation_queue.clone(),
    501             replaced_queue: None,
    502             primary: true,
    503             sender_key: Some(send_auth_state.public_key.clone()),
    504         };
    505         let receive_auth_state = self.store.generate_queue_auth_state()?;
    506         let delivery_keypair = RadrootsSimplexSmpX25519Keypair::generate()
    507             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
    508         let receive_descriptor = RadrootsSimplexAgentQueueDescriptor {
    509             queue_uri: reply_queue,
    510             replaced_queue: None,
    511             primary: true,
    512             sender_key: None,
    513         };
    514         {
    515             let connection = self.store.connection_mut(connection_id)?;
    516             connection.mode = RadrootsSimplexAgentConnectionMode::Direct;
    517             connection.status = RadrootsSimplexAgentConnectionStatus::JoinPending;
    518             connection.invitation = Some(invitation);
    519             connection.ratchet_state = Some(ratchet_state);
    520         }
    521         self.store.add_queue(
    522             connection_id,
    523             send_descriptor.clone(),
    524             RadrootsSimplexAgentQueueRole::Send,
    525             true,
    526             send_auth_state,
    527         )?;
    528         self.store.add_queue(
    529             connection_id,
    530             receive_descriptor.clone(),
    531             RadrootsSimplexAgentQueueRole::Receive,
    532             true,
    533             receive_auth_state,
    534         )?;
    535         {
    536             let connection = self.store.connection_mut(connection_id)?;
    537             connection.local_e2e_public_key = Some(local_e2e_keypair.public_key.clone());
    538             connection.local_e2e_private_key = Some(local_e2e_keypair.private_key);
    539             connection.local_x3dh_key_1 = Some(agent_x3dh_keypair(local_x3dh_key_1));
    540             connection.local_x3dh_key_2 = Some(agent_x3dh_keypair(local_x3dh_key_2));
    541             connection.local_pq_keypair = local_pq_keypair.map(agent_pq_keypair);
    542             connection.shared_secret = Some(shared_secret);
    543             let queue = connection
    544                 .queues
    545                 .iter_mut()
    546                 .find(|queue| {
    547                     queue.descriptor.queue_address() == receive_descriptor.queue_address()
    548                 })
    549                 .ok_or_else(|| {
    550                     RadrootsSimplexAgentRuntimeError::Runtime(
    551                         "SimpleX reply receive queue missing after join_connection".into(),
    552                     )
    553                 })?;
    554             queue.delivery_private_key = Some(delivery_keypair.private_key);
    555         }
    556         if !send_queue_is_secured {
    557             self.store.enqueue_command(
    558                 connection_id,
    559                 RadrootsSimplexAgentPendingCommandKind::SecureQueue {
    560                     queue: send_descriptor.queue_address(),
    561                     sender_key: send_descriptor.sender_key.clone(),
    562                 },
    563                 now,
    564             )?;
    565         }
    566         self.store.enqueue_command(
    567             connection_id,
    568             RadrootsSimplexAgentPendingCommandKind::CreateQueue {
    569                 descriptor: receive_descriptor.clone(),
    570             },
    571             now,
    572         )?;
    573         Ok(())
    574     }
    575 
    576     pub fn allow_connection(
    577         &mut self,
    578         connection_id: &str,
    579         local_info: Vec<u8>,
    580         now: u64,
    581     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    582         if self.store.connection(connection_id)?.status
    583             != RadrootsSimplexAgentConnectionStatus::AwaitingApproval
    584         {
    585             return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
    586                 "SimpleX connection `{connection_id}` is not awaiting approval"
    587             )));
    588         }
    589         self.store
    590             .set_status(connection_id, RadrootsSimplexAgentConnectionStatus::Allowed)?;
    591         let send_queue = self.store.primary_send_queue(connection_id)?;
    592         let encrypted = self.next_encrypted_payload(
    593             connection_id,
    594             encode_decrypted_message(&RadrootsSimplexAgentDecryptedMessage::ConnectionInfo(
    595                 local_info,
    596             ))?,
    597             SimplexAgentPayloadKind::ConnectionInfo,
    598         )?;
    599         self.store.enqueue_command(
    600             connection_id,
    601             RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
    602                 queue: send_queue.descriptor.queue_address(),
    603                 envelope: RadrootsSimplexAgentEnvelope::Confirmation {
    604                     reply_queue: false,
    605                     e2e_ratchet_params: None,
    606                     encrypted,
    607                 },
    608                 delivery: None,
    609             },
    610             now,
    611         )?;
    612         self.enqueue_hello(connection_id, now)?;
    613         self.flush_store()?;
    614         Ok(())
    615     }
    616 
    617     pub fn subscribe_connection(
    618         &mut self,
    619         connection_id: &str,
    620         now: u64,
    621     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    622         for queue in self.store.receive_queues(connection_id)? {
    623             self.store.enqueue_command(
    624                 connection_id,
    625                 RadrootsSimplexAgentPendingCommandKind::SubscribeQueue {
    626                     queue: queue.descriptor.queue_address(),
    627                 },
    628                 now,
    629             )?;
    630         }
    631         self.events
    632             .push_back(RadrootsSimplexAgentRuntimeEvent::SubscriptionQueued {
    633                 connection_id: connection_id.into(),
    634             });
    635         self.flush_store()?;
    636         Ok(())
    637     }
    638 
    639     pub fn resume_subscriptions(
    640         &mut self,
    641         now: u64,
    642     ) -> Result<usize, RadrootsSimplexAgentRuntimeError> {
    643         let mut queued = 0_usize;
    644         let mut event_connections = Vec::new();
    645         for (connection_id, queue) in self.store.subscribed_receive_queues() {
    646             if self
    647                 .store
    648                 .has_pending_subscribe_queue(&connection_id, &queue)
    649             {
    650                 continue;
    651             }
    652             self.store.enqueue_command(
    653                 &connection_id,
    654                 RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue },
    655                 now,
    656             )?;
    657             queued = queued.saturating_add(1);
    658             if !event_connections.contains(&connection_id) {
    659                 event_connections.push(connection_id);
    660             }
    661         }
    662         for connection_id in event_connections {
    663             self.events
    664                 .push_back(RadrootsSimplexAgentRuntimeEvent::SubscriptionQueued { connection_id });
    665         }
    666         if queued > 0 {
    667             self.flush_store()?;
    668         }
    669         Ok(queued)
    670     }
    671 
    672     pub fn get_connection_message(
    673         &mut self,
    674         connection_id: &str,
    675         now: u64,
    676     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    677         for queue in self.store.receive_queues(connection_id)? {
    678             self.store.enqueue_command(
    679                 connection_id,
    680                 RadrootsSimplexAgentPendingCommandKind::GetQueueMessage {
    681                     queue: queue.descriptor.queue_address(),
    682                 },
    683                 now,
    684             )?;
    685         }
    686         self.flush_store()?;
    687         Ok(())
    688     }
    689 
    690     pub fn send_message(
    691         &mut self,
    692         connection_id: &str,
    693         body: Vec<u8>,
    694         now: u64,
    695     ) -> Result<u64, RadrootsSimplexAgentRuntimeError> {
    696         let send_queue = self.store.primary_send_queue(connection_id)?;
    697         let connection = self.store.connection(connection_id)?;
    698         if connection.status != RadrootsSimplexAgentConnectionStatus::Connected {
    699             return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
    700                 "SimpleX connection `{connection_id}` is not connected"
    701             )));
    702         }
    703         if connection.staged_outbound_message.is_some() {
    704             return Err(RadrootsSimplexAgentRuntimeError::Store(
    705                 radroots_simplex_agent_store::prelude::RadrootsSimplexAgentStoreError::PendingOutboundMessage(
    706                     connection_id.into(),
    707                 ),
    708             ));
    709         }
    710         let previous_hash = connection
    711             .delivery_cursor
    712             .last_sent_message_hash
    713             .clone()
    714             .unwrap_or_default();
    715         let message_id = connection
    716             .delivery_cursor
    717             .last_sent_message_id
    718             .unwrap_or(0)
    719             .saturating_add(1);
    720         let frame = RadrootsSimplexAgentMessageFrame {
    721             header: RadrootsSimplexAgentMessageHeader {
    722                 message_id,
    723                 previous_message_hash: previous_hash,
    724             },
    725             message: RadrootsSimplexAgentMessage::UserMessage(body),
    726             padding: Vec::new(),
    727         };
    728         let ciphertext =
    729             encode_decrypted_message(&RadrootsSimplexAgentDecryptedMessage::Message(frame))?;
    730         let message_hash = Sha256::digest(&ciphertext).to_vec();
    731         let prepared = self
    732             .store
    733             .prepare_outbound_message(connection_id, message_hash.clone())?;
    734         let encrypted = self.next_encrypted_payload(
    735             connection_id,
    736             ciphertext,
    737             SimplexAgentPayloadKind::Message,
    738         )?;
    739         self.store.enqueue_command(
    740             connection_id,
    741             RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
    742                 queue: send_queue.descriptor.queue_address(),
    743                 envelope: RadrootsSimplexAgentEnvelope::Message(encrypted),
    744                 delivery: Some(RadrootsSimplexAgentOutboundMessage {
    745                     message_id: prepared.message_id,
    746                     message_hash: prepared.message_hash,
    747                 }),
    748             },
    749             now,
    750         )?;
    751         self.events
    752             .push_back(RadrootsSimplexAgentRuntimeEvent::MessageQueued {
    753                 connection_id: connection_id.into(),
    754                 message_id,
    755             });
    756         self.flush_store()?;
    757         Ok(message_id)
    758     }
    759 
    760     pub fn send_hello(
    761         &mut self,
    762         connection_id: &str,
    763         now: u64,
    764     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    765         self.enqueue_hello(connection_id, now)?;
    766         self.flush_store()?;
    767         Ok(())
    768     }
    769 
    770     pub fn ack_message(
    771         &mut self,
    772         connection_id: &str,
    773         message_id: u64,
    774         message_hash: Vec<u8>,
    775         receipt_info: Vec<u8>,
    776         now: u64,
    777     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    778         if self
    779             .store
    780             .has_pending_ack_message(connection_id, message_id, &message_hash)
    781         {
    782             return Ok(());
    783         }
    784         let (receive_queue, broker_message_id) = self
    785             .store
    786             .inbound_ack_target(connection_id, message_id, &message_hash)?
    787             .ok_or_else(|| {
    788                 RadrootsSimplexAgentRuntimeError::Runtime(format!(
    789                     "SimpleX connection `{connection_id}` has no frame-specific ACK target for message `{message_id}`"
    790                 ))
    791             })?;
    792         self.store.enqueue_command(
    793             connection_id,
    794             RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
    795                 queue: receive_queue,
    796                 broker_message_id,
    797                 receipt: Some(RadrootsSimplexAgentMessageReceipt {
    798                     message_id,
    799                     message_hash,
    800                     receipt_info,
    801                 }),
    802             },
    803             now,
    804         )?;
    805         self.flush_store()?;
    806         Ok(())
    807     }
    808 
    809     pub fn ack_last_received_message(
    810         &mut self,
    811         connection_id: &str,
    812         message_id: u64,
    813         receipt_info: Vec<u8>,
    814         now: u64,
    815     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    816         let message_hash = self
    817             .store
    818             .connection(connection_id)?
    819             .delivery_cursor
    820             .last_received_message_hash
    821             .clone()
    822             .ok_or_else(|| {
    823                 RadrootsSimplexAgentRuntimeError::Runtime(format!(
    824                     "SimpleX connection `{connection_id}` has no received message hash to acknowledge"
    825                 ))
    826             })?;
    827         self.ack_message(connection_id, message_id, message_hash, receipt_info, now)
    828     }
    829 
    830     fn ack_broker_message(
    831         &mut self,
    832         connection_id: &str,
    833         queue: radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentQueueAddress,
    834         broker_message_id: Vec<u8>,
    835         now: u64,
    836     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    837         if self
    838             .store
    839             .has_pending_broker_ack(connection_id, &queue, &broker_message_id)
    840         {
    841             return Ok(());
    842         }
    843         self.store.enqueue_command(
    844             connection_id,
    845             RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
    846                 queue,
    847                 broker_message_id,
    848                 receipt: None,
    849             },
    850             now,
    851         )?;
    852         Ok(())
    853     }
    854 
    855     pub fn reconnect_connection(
    856         &mut self,
    857         connection_id: &str,
    858         now: u64,
    859     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    860         self.subscribe_connection(connection_id, now)?;
    861         let ready = self.store.take_ready_commands(now, usize::MAX);
    862         for command in ready {
    863             self.store
    864                 .mark_command_retry(command.id, now + self.retry_delay_ms)?;
    865             self.events
    866                 .push_back(RadrootsSimplexAgentRuntimeEvent::RetryQueued {
    867                     connection_id: connection_id.into(),
    868                     command_id: command.id,
    869                 });
    870         }
    871         self.flush_store()?;
    872         Ok(())
    873     }
    874 
    875     pub fn queue_rotation(
    876         &mut self,
    877         connection_id: &str,
    878         descriptors: Vec<RadrootsSimplexAgentQueueDescriptor>,
    879         now: u64,
    880     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    881         self.store.set_status(
    882             connection_id,
    883             RadrootsSimplexAgentConnectionStatus::Rotating,
    884         )?;
    885         self.store.enqueue_command(
    886             connection_id,
    887             RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors },
    888             now,
    889         )?;
    890         self.events
    891             .push_back(RadrootsSimplexAgentRuntimeEvent::QueueRotationQueued {
    892                 connection_id: connection_id.into(),
    893             });
    894         self.flush_store()?;
    895         Ok(())
    896     }
    897 
    898     pub fn handle_inbound_decrypted_message(
    899         &mut self,
    900         connection_id: &str,
    901         message: RadrootsSimplexAgentDecryptedMessage,
    902         message_hash: Vec<u8>,
    903     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    904         match message {
    905             RadrootsSimplexAgentDecryptedMessage::ConnectionInfo(info) => {
    906                 if self.store.connection(connection_id)?.status
    907                     != RadrootsSimplexAgentConnectionStatus::Connected
    908                 {
    909                     self.store
    910                         .set_status(connection_id, RadrootsSimplexAgentConnectionStatus::Allowed)?;
    911                 }
    912                 self.enqueue_hello(connection_id, 0)?;
    913                 self.events
    914                     .push_back(RadrootsSimplexAgentRuntimeEvent::ConnectionInfo {
    915                         connection_id: connection_id.into(),
    916                         info,
    917                     });
    918             }
    919             RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply { reply_queues, info } => {
    920                 let mut secure_queues = Vec::new();
    921                 for descriptor in reply_queues {
    922                     let queue_address = descriptor.queue_address();
    923                     if let Ok(existing_queue) =
    924                         self.store.queue_record(connection_id, &queue_address)
    925                         && existing_queue.role == RadrootsSimplexAgentQueueRole::Send
    926                     {
    927                         continue;
    928                     }
    929                     let auth_state = self.store.generate_queue_auth_state()?;
    930                     let mut descriptor = descriptor;
    931                     descriptor.sender_key = Some(auth_state.public_key.clone());
    932                     let secure_queue = descriptor.queue_address();
    933                     let sender_key = descriptor.sender_key.clone();
    934                     self.store.add_queue(
    935                         connection_id,
    936                         descriptor,
    937                         RadrootsSimplexAgentQueueRole::Send,
    938                         true,
    939                         auth_state,
    940                     )?;
    941                     secure_queues.push((secure_queue, sender_key));
    942                 }
    943                 if secure_queues.is_empty()
    944                     && matches!(
    945                         self.store.connection(connection_id)?.status,
    946                         RadrootsSimplexAgentConnectionStatus::AwaitingApproval
    947                             | RadrootsSimplexAgentConnectionStatus::Allowed
    948                             | RadrootsSimplexAgentConnectionStatus::Connected
    949                     )
    950                 {
    951                     return Ok(());
    952                 }
    953                 self.store.set_status(
    954                     connection_id,
    955                     RadrootsSimplexAgentConnectionStatus::AwaitingApproval,
    956                 )?;
    957                 for (queue, sender_key) in secure_queues {
    958                     self.store.enqueue_command(
    959                         connection_id,
    960                         RadrootsSimplexAgentPendingCommandKind::SecureQueue { queue, sender_key },
    961                         0,
    962                     )?;
    963                 }
    964                 self.events
    965                     .push_back(RadrootsSimplexAgentRuntimeEvent::ConfirmationRequired {
    966                         connection_id: connection_id.into(),
    967                     });
    968                 self.events
    969                     .push_back(RadrootsSimplexAgentRuntimeEvent::ConnectionInfo {
    970                         connection_id: connection_id.into(),
    971                         info,
    972                     });
    973             }
    974             RadrootsSimplexAgentDecryptedMessage::RatchetInfo(info) => {
    975                 self.events
    976                     .push_back(RadrootsSimplexAgentRuntimeEvent::ConnectionInfo {
    977                         connection_id: connection_id.into(),
    978                         info,
    979                     });
    980             }
    981             RadrootsSimplexAgentDecryptedMessage::Message(frame) => match frame.message {
    982                 RadrootsSimplexAgentMessage::Hello => {
    983                     let connection = self.store.connection(connection_id)?;
    984                     let was_connected =
    985                         connection.status == RadrootsSimplexAgentConnectionStatus::Connected;
    986                     let should_send_hello = !connection.hello_sent;
    987                     {
    988                         let connection = self.store.connection_mut(connection_id)?;
    989                         connection.hello_received = true;
    990                     }
    991                     if should_send_hello {
    992                         self.enqueue_hello(connection_id, 0)?;
    993                     }
    994                     if !was_connected {
    995                         self.store.set_status(
    996                             connection_id,
    997                             RadrootsSimplexAgentConnectionStatus::Connected,
    998                         )?;
    999                         self.events.push_back(
   1000                             RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished {
   1001                                 connection_id: connection_id.into(),
   1002                             },
   1003                         );
   1004                     }
   1005                 }
   1006                 RadrootsSimplexAgentMessage::Receipt(receipt) => {
   1007                     let Some(stored_message_hash) = self
   1008                         .store
   1009                         .outbound_message_hash(connection_id, receipt.message_id)?
   1010                     else {
   1011                         return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1012                             "SimpleX receipt for `{connection_id}` referenced unknown outbound message `{}`",
   1013                             receipt.message_id
   1014                         )));
   1015                     };
   1016                     if stored_message_hash != receipt.message_hash {
   1017                         return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1018                             "SimpleX receipt for `{connection_id}` message `{}` did not match stored outbound message hash",
   1019                             receipt.message_id
   1020                         )));
   1021                     }
   1022                     self.events
   1023                         .push_back(RadrootsSimplexAgentRuntimeEvent::MessageAcknowledged {
   1024                             connection_id: connection_id.into(),
   1025                             message_id: receipt.message_id,
   1026                             message_hash: receipt.message_hash,
   1027                         });
   1028                 }
   1029                 RadrootsSimplexAgentMessage::QueueAdd(_)
   1030                 | RadrootsSimplexAgentMessage::QueueKey(_)
   1031                 | RadrootsSimplexAgentMessage::QueueUse(_)
   1032                 | RadrootsSimplexAgentMessage::QueueTest(_)
   1033                 | RadrootsSimplexAgentMessage::QueueContinue(_) => {
   1034                     self.events
   1035                         .push_back(RadrootsSimplexAgentRuntimeEvent::QueueRotationQueued {
   1036                             connection_id: connection_id.into(),
   1037                         });
   1038                 }
   1039                 RadrootsSimplexAgentMessage::UserMessage(body) => {
   1040                     let broker_message_id_hash = self
   1041                         .store
   1042                         .connection(connection_id)?
   1043                         .last_received_broker_message_id
   1044                         .as_ref()
   1045                         .map(|broker_message_id| Sha256::digest(broker_message_id).to_vec())
   1046                         .unwrap_or_default();
   1047                     self.events
   1048                         .push_back(RadrootsSimplexAgentRuntimeEvent::MessageReceived {
   1049                             connection_id: connection_id.into(),
   1050                             message_id: frame.header.message_id,
   1051                             broker_message_id_hash,
   1052                             message_hash,
   1053                             body,
   1054                         });
   1055                 }
   1056                 _ => {}
   1057             },
   1058         }
   1059         self.flush_store()?;
   1060         Ok(())
   1061     }
   1062 
   1063     pub fn record_command_outcome(
   1064         &mut self,
   1065         command_id: u64,
   1066         outcome: RadrootsSimplexAgentCommandOutcome,
   1067     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   1068         match outcome {
   1069             RadrootsSimplexAgentCommandOutcome::Delivered => {
   1070                 let command = self.store.mark_command_delivered(command_id)?;
   1071                 self.apply_delivery_side_effects(&command)?;
   1072             }
   1073             RadrootsSimplexAgentCommandOutcome::RetryAt { ready_at } => {
   1074                 let command = self.store.mark_command_retry(command_id, ready_at)?;
   1075                 self.events
   1076                     .push_back(RadrootsSimplexAgentRuntimeEvent::RetryQueued {
   1077                         connection_id: command.connection_id,
   1078                         command_id,
   1079                     });
   1080             }
   1081             RadrootsSimplexAgentCommandOutcome::Failed { message } => {
   1082                 let command = self.store.mark_command_failed(command_id)?;
   1083                 self.apply_failure_side_effects(&command)?;
   1084                 self.events
   1085                     .push_back(RadrootsSimplexAgentRuntimeEvent::Error {
   1086                         connection_id: Some(command.connection_id),
   1087                         message,
   1088                     });
   1089             }
   1090         }
   1091         self.flush_store()?;
   1092         Ok(())
   1093     }
   1094 
   1095     pub fn execute_ready_commands<T: RadrootsSimplexSmpCommandTransport>(
   1096         &mut self,
   1097         transport: &mut T,
   1098         now: u64,
   1099         limit: usize,
   1100     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   1101         let mut remaining = limit;
   1102         while remaining > 0 {
   1103             let ready = self.store.take_ready_commands(now, remaining);
   1104             if ready.is_empty() {
   1105                 break;
   1106             }
   1107             remaining = remaining.saturating_sub(ready.len());
   1108             for command in ready {
   1109                 self.dispatch_ready_command(transport, &command, now)?;
   1110             }
   1111         }
   1112         self.flush_store()?;
   1113         Ok(())
   1114     }
   1115 
   1116     pub fn receive_subscription_messages<T: RadrootsSimplexSmpSubscriptionTransport>(
   1117         &mut self,
   1118         transport: &mut T,
   1119         limit: usize,
   1120     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   1121         let mut remaining = limit;
   1122         for server in self.store.subscribed_receive_servers() {
   1123             while remaining > 0 {
   1124                 match transport.receive_subscription(RadrootsSimplexSmpSubscriptionReceiveRequest {
   1125                     server: server.clone(),
   1126                 }) {
   1127                     Ok(Some(response)) => {
   1128                         self.apply_subscription_response(response)?;
   1129                         remaining = remaining.saturating_sub(1);
   1130                     }
   1131                     Ok(None) => break,
   1132                     Err(error) => {
   1133                         self.events
   1134                             .push_back(RadrootsSimplexAgentRuntimeEvent::Error {
   1135                                 connection_id: None,
   1136                                 message: format!(
   1137                                     "SimpleX subscription receive failed for server `{}`: {error}",
   1138                                     server.server_identity
   1139                                 ),
   1140                             });
   1141                         break;
   1142                     }
   1143                 }
   1144             }
   1145             if remaining == 0 {
   1146                 break;
   1147             }
   1148         }
   1149         self.flush_store()?;
   1150         Ok(())
   1151     }
   1152 
   1153     pub fn retry_pending(
   1154         &mut self,
   1155         now: u64,
   1156         limit: usize,
   1157     ) -> Vec<RadrootsSimplexAgentPendingCommand> {
   1158         self.store.take_ready_commands(now, limit)
   1159     }
   1160 
   1161     pub fn drain_events(&mut self, max: usize) -> Vec<RadrootsSimplexAgentRuntimeEvent> {
   1162         let take = self.events.len().min(max);
   1163         (0..take)
   1164             .filter_map(|_| self.events.pop_front())
   1165             .collect::<Vec<_>>()
   1166     }
   1167 
   1168     fn dispatch_ready_command<T: RadrootsSimplexSmpCommandTransport>(
   1169         &mut self,
   1170         transport: &mut T,
   1171         command: &RadrootsSimplexAgentPendingCommand,
   1172         now: u64,
   1173     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   1174         match &command.kind {
   1175             RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors } => {
   1176                 for descriptor in descriptors.clone() {
   1177                     let auth_state = self.store.generate_queue_auth_state()?;
   1178                     self.store.add_queue(
   1179                         &command.connection_id,
   1180                         descriptor,
   1181                         RadrootsSimplexAgentQueueRole::Receive,
   1182                         true,
   1183                         auth_state,
   1184                     )?;
   1185                 }
   1186                 self.record_command_outcome(
   1187                     command.id,
   1188                     RadrootsSimplexAgentCommandOutcome::Delivered,
   1189                 )
   1190             }
   1191             RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => {
   1192                 for queue in queues {
   1193                     self.store
   1194                         .mark_queue_tested(&command.connection_id, queue)?;
   1195                 }
   1196                 self.record_command_outcome(
   1197                     command.id,
   1198                     RadrootsSimplexAgentCommandOutcome::Delivered,
   1199                 )
   1200             }
   1201             _ => {
   1202                 let request = self.build_transport_request(command)?;
   1203                 match transport.execute(request) {
   1204                     Ok(response) => self.apply_transport_response(command, response),
   1205                     Err(error) => {
   1206                         self.events
   1207                             .push_back(RadrootsSimplexAgentRuntimeEvent::Error {
   1208                                 connection_id: Some(command.connection_id.clone()),
   1209                                 message: format!(
   1210                                     "SimpleX transport execution failed for command `{}`: {error}",
   1211                                     command.id
   1212                                 ),
   1213                             });
   1214                         self.record_command_outcome(
   1215                             command.id,
   1216                             RadrootsSimplexAgentCommandOutcome::RetryAt {
   1217                                 ready_at: now + self.retry_delay_ms,
   1218                             },
   1219                         )
   1220                     }
   1221                 }
   1222             }
   1223         }
   1224     }
   1225 
   1226     fn build_transport_request(
   1227         &self,
   1228         command: &RadrootsSimplexAgentPendingCommand,
   1229     ) -> Result<RadrootsSimplexSmpTransportRequest, RadrootsSimplexAgentRuntimeError> {
   1230         match &command.kind {
   1231             RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
   1232                 invitation,
   1233                 sender_auth_state,
   1234                 ..
   1235             } => {
   1236                 let server = short_invitation_server(invitation)?;
   1237                 return Ok(RadrootsSimplexSmpTransportRequest {
   1238                     server,
   1239                     transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
   1240                     correlation_id: Some(self.command_correlation_id(command)?),
   1241                     entity_id: invitation.link_id.clone(),
   1242                     command: RadrootsSimplexSmpCommand::LKey(
   1243                         encode_ed25519_public_key_x509(&sender_auth_state.public_key).map_err(
   1244                             |error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()),
   1245                         )?,
   1246                     ),
   1247                     authorization: RadrootsSimplexSmpCommandAuthorization::Ed25519(
   1248                         radroots_simplex_smp_crypto::prelude::RadrootsSimplexSmpEd25519Keypair {
   1249                             public_key: sender_auth_state.public_key.clone(),
   1250                             private_key: sender_auth_state.private_key.clone(),
   1251                         },
   1252                     ),
   1253                 });
   1254             }
   1255             RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData { invitation, .. } => {
   1256                 let server = short_invitation_server(invitation)?;
   1257                 return Ok(self.server_transport_request(
   1258                     command.id,
   1259                     &server,
   1260                     invitation.link_id.clone(),
   1261                     RadrootsSimplexSmpCommand::LGet,
   1262                 ));
   1263             }
   1264             _ => {}
   1265         }
   1266         let (queue_address, entity_id, smp_command) = self.command_transport_parts(command)?;
   1267         let queue = self
   1268             .store
   1269             .queue_record(&command.connection_id, &queue_address)?;
   1270         let auth = queue.auth_state.ok_or_else(|| {
   1271             RadrootsSimplexAgentRuntimeError::Store(
   1272                 radroots_simplex_agent_store::prelude::RadrootsSimplexAgentStoreError::QueueAuthStateMissing(
   1273                     command.connection_id.clone(),
   1274                 ),
   1275             )
   1276         })?;
   1277         let correlation_id = self.command_correlation_id(command)?;
   1278         let authorization = RadrootsSimplexSmpCommandAuthorization::Ed25519(
   1279             radroots_simplex_smp_crypto::prelude::RadrootsSimplexSmpEd25519Keypair {
   1280                 public_key: auth.public_key,
   1281                 private_key: auth.private_key,
   1282             },
   1283         );
   1284         Ok(RadrootsSimplexSmpTransportRequest {
   1285             server: queue.descriptor.queue_uri.server.clone(),
   1286             transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
   1287             correlation_id: Some(correlation_id),
   1288             entity_id,
   1289             command: smp_command,
   1290             authorization,
   1291         })
   1292     }
   1293 
   1294     fn server_transport_request(
   1295         &self,
   1296         command_id: u64,
   1297         server: &RadrootsSimplexSmpServerAddress,
   1298         entity_id: Vec<u8>,
   1299         command: RadrootsSimplexSmpCommand,
   1300     ) -> RadrootsSimplexSmpTransportRequest {
   1301         let correlation_id = correlation_id_from_material(
   1302             b"simplex-server-command-correlation",
   1303             &[
   1304                 command_id.to_be_bytes().to_vec(),
   1305                 server.server_identity.as_bytes().to_vec(),
   1306                 entity_id.clone(),
   1307             ],
   1308         );
   1309         RadrootsSimplexSmpTransportRequest {
   1310             server: server.clone(),
   1311             transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
   1312             correlation_id: Some(correlation_id),
   1313             entity_id,
   1314             command,
   1315             authorization: RadrootsSimplexSmpCommandAuthorization::None,
   1316         }
   1317     }
   1318 
   1319     fn command_correlation_id(
   1320         &self,
   1321         command: &RadrootsSimplexAgentPendingCommand,
   1322     ) -> Result<RadrootsSimplexSmpCorrelationId, RadrootsSimplexAgentRuntimeError> {
   1323         let mut parts = vec![
   1324             command.id.to_be_bytes().to_vec(),
   1325             command.connection_id.as_bytes().to_vec(),
   1326         ];
   1327         if let Some(queue_address) = queue_for_command(command) {
   1328             parts.push(queue_address.server.server_identity.as_bytes().to_vec());
   1329             parts.push(queue_address.sender_id.clone());
   1330             parts.push(
   1331                 self.store
   1332                     .queue_auth_state(&command.connection_id, &queue_address)?
   1333                     .public_key,
   1334             );
   1335         }
   1336         if matches!(
   1337             command.kind,
   1338             RadrootsSimplexAgentPendingCommandKind::CreateQueue { .. }
   1339         ) && let Some(short_link) = self
   1340             .store
   1341             .connection(&command.connection_id)?
   1342             .short_link
   1343             .as_ref()
   1344         {
   1345             parts.push(short_link.link_key.clone());
   1346         }
   1347         if let RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
   1348             sender_auth_state,
   1349             ..
   1350         } = &command.kind
   1351         {
   1352             parts.push(sender_auth_state.public_key.clone());
   1353         }
   1354         Ok(correlation_id_from_material(
   1355             b"simplex-command-correlation",
   1356             &parts,
   1357         ))
   1358     }
   1359 
   1360     fn command_transport_parts(
   1361         &self,
   1362         command: &RadrootsSimplexAgentPendingCommand,
   1363     ) -> Result<
   1364         (
   1365             radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentQueueAddress,
   1366             Vec<u8>,
   1367             RadrootsSimplexSmpCommand,
   1368         ),
   1369         RadrootsSimplexAgentRuntimeError,
   1370     > {
   1371         match &command.kind {
   1372             RadrootsSimplexAgentPendingCommandKind::CreateQueue { descriptor } => {
   1373                 let correlation_id = self.command_correlation_id(command)?;
   1374                 let auth_state = self
   1375                     .store
   1376                     .queue_auth_state(&command.connection_id, &descriptor.queue_address())?;
   1377                 let delivery_private_key = self
   1378                     .store
   1379                     .queue_record(&command.connection_id, &descriptor.queue_address())?
   1380                     .delivery_private_key
   1381                     .ok_or_else(|| {
   1382                         RadrootsSimplexAgentRuntimeError::Runtime(
   1383                             "SimpleX receive queue missing delivery private key".into(),
   1384                         )
   1385                     })?;
   1386                 Ok((
   1387                     descriptor.queue_address(),
   1388                     Vec::new(),
   1389                     RadrootsSimplexSmpCommand::New(RadrootsSimplexSmpNewQueueRequest {
   1390                         recipient_auth_public_key: encode_ed25519_public_key_x509(
   1391                             &auth_state.public_key,
   1392                         )
   1393                         .map_err(|error| {
   1394                             RadrootsSimplexAgentRuntimeError::Runtime(error.to_string())
   1395                         })?,
   1396                         recipient_dh_public_key: encode_x25519_public_key_x509(
   1397                             &RadrootsSimplexSmpX25519Keypair::public_key_from_private(
   1398                                 &delivery_private_key,
   1399                             )
   1400                             .map_err(|error| {
   1401                                 RadrootsSimplexAgentRuntimeError::Runtime(error.to_string())
   1402                             })?,
   1403                         )
   1404                         .map_err(|error| {
   1405                             RadrootsSimplexAgentRuntimeError::Runtime(error.to_string())
   1406                         })?,
   1407                         basic_auth: None,
   1408                         subscription_mode: RadrootsSimplexSmpSubscriptionMode::OnlyCreate,
   1409                         queue_request_data: Some(
   1410                             match descriptor
   1411                                 .queue_uri
   1412                                 .queue_mode
   1413                                 .unwrap_or(RadrootsSimplexSmpQueueMode::Messaging)
   1414                             {
   1415                                 RadrootsSimplexSmpQueueMode::Messaging => {
   1416                                     RadrootsSimplexSmpQueueRequestData::Messaging(
   1417                                         self.short_link_messaging_queue_request(
   1418                                             &command.connection_id,
   1419                                             &correlation_id,
   1420                                         )?,
   1421                                     )
   1422                                 }
   1423                                 RadrootsSimplexSmpQueueMode::Contact => {
   1424                                     RadrootsSimplexSmpQueueRequestData::Contact(None)
   1425                                 }
   1426                             },
   1427                         ),
   1428                         notifier_credentials: None,
   1429                     }),
   1430                 ))
   1431             }
   1432             RadrootsSimplexAgentPendingCommandKind::SecureQueue { queue, sender_key } => Ok((
   1433                 queue.clone(),
   1434                 queue.sender_id.clone(),
   1435                 RadrootsSimplexSmpCommand::SKey(
   1436                     encode_ed25519_public_key_x509(sender_key.as_deref().unwrap_or_default())
   1437                         .map_err(|error| {
   1438                             RadrootsSimplexAgentRuntimeError::Runtime(error.to_string())
   1439                         })?,
   1440                 ),
   1441             )),
   1442             RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
   1443                 queue, envelope, ..
   1444             } => Ok((
   1445                 queue.clone(),
   1446                 queue.sender_id.clone(),
   1447                 RadrootsSimplexSmpCommand::Send(RadrootsSimplexSmpSendCommand {
   1448                     flags: RadrootsSimplexSmpMessageFlags::notifications_enabled(),
   1449                     message_body: self.encode_smp_message_body(&command.connection_id, envelope)?,
   1450                 }),
   1451             )),
   1452             RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue } => Ok((
   1453                 queue.clone(),
   1454                 self.store
   1455                     .queue_record(&command.connection_id, queue)?
   1456                     .entity_id,
   1457                 RadrootsSimplexSmpCommand::Sub,
   1458             )),
   1459             RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { queue } => Ok((
   1460                 queue.clone(),
   1461                 self.store
   1462                     .queue_record(&command.connection_id, queue)?
   1463                     .entity_id,
   1464                 RadrootsSimplexSmpCommand::Get,
   1465             )),
   1466             RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
   1467                 queue,
   1468                 broker_message_id,
   1469                 ..
   1470             } => Ok((
   1471                 queue.clone(),
   1472                 self.store
   1473                     .queue_record(&command.connection_id, queue)?
   1474                     .entity_id,
   1475                 RadrootsSimplexSmpCommand::Ack(broker_message_id.clone()),
   1476             )),
   1477             RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors } => {
   1478                 let address = descriptors
   1479                     .first()
   1480                     .ok_or_else(|| {
   1481                         RadrootsSimplexAgentRuntimeError::Runtime(
   1482                             "queue rotation command requires at least one descriptor".into(),
   1483                         )
   1484                     })?
   1485                     .queue_address();
   1486                 let entity_id = address.sender_id.clone();
   1487                 Ok((address, entity_id, RadrootsSimplexSmpCommand::Que))
   1488             }
   1489             RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => {
   1490                 let address = queues.first().cloned().ok_or_else(|| {
   1491                     RadrootsSimplexAgentRuntimeError::Runtime(
   1492                         "queue test command requires at least one queue".into(),
   1493                     )
   1494                 })?;
   1495                 let entity_id = address.sender_id.clone();
   1496                 Ok((address, entity_id, RadrootsSimplexSmpCommand::Ping))
   1497             }
   1498             RadrootsSimplexAgentPendingCommandKind::SetQueueLinkData {
   1499                 queue,
   1500                 link_id,
   1501                 link_data,
   1502             } => Ok((
   1503                 queue.clone(),
   1504                 self.store
   1505                     .queue_record(&command.connection_id, queue)?
   1506                     .entity_id,
   1507                 RadrootsSimplexSmpCommand::LSet {
   1508                     link_id: link_id.clone(),
   1509                     link_data: link_data.clone(),
   1510                 },
   1511             )),
   1512             RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData { .. }
   1513             | RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData { .. } => {
   1514                 Err(RadrootsSimplexAgentRuntimeError::Runtime(
   1515                     "SimpleX short-link retrieval commands require server transport dispatch"
   1516                         .into(),
   1517                 ))
   1518             }
   1519         }
   1520     }
   1521 
   1522     fn short_link_messaging_queue_request(
   1523         &self,
   1524         connection_id: &str,
   1525         correlation_id: &RadrootsSimplexSmpCorrelationId,
   1526     ) -> Result<Option<RadrootsSimplexSmpMessagingQueueRequest>, RadrootsSimplexAgentRuntimeError>
   1527     {
   1528         let connection = self.store.connection(connection_id)?;
   1529         if connection.status != RadrootsSimplexAgentConnectionStatus::CreatePending {
   1530             return Ok(None);
   1531         }
   1532         let Some(short_link) = connection.short_link.as_ref() else {
   1533             return Ok(None);
   1534         };
   1535         let fixed_data = short_link.encrypted_fixed_data.clone().ok_or_else(|| {
   1536             RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1537                 "SimpleX connection `{connection_id}` is missing encrypted short-link fixed data"
   1538             ))
   1539         })?;
   1540         let user_data = short_link.encrypted_user_data.clone().ok_or_else(|| {
   1541             RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1542                 "SimpleX connection `{connection_id}` is missing encrypted short-link user data"
   1543             ))
   1544         })?;
   1545         Ok(Some(RadrootsSimplexSmpMessagingQueueRequest {
   1546             sender_id: short_link_sender_id(correlation_id),
   1547             link_data: RadrootsSimplexSmpQueueLinkData {
   1548                 fixed_data,
   1549                 user_data,
   1550             },
   1551         }))
   1552     }
   1553 
   1554     fn process_short_link_response(
   1555         &mut self,
   1556         command: &RadrootsSimplexAgentPendingCommand,
   1557         sender_id: Vec<u8>,
   1558         link_data: RadrootsSimplexSmpQueueLinkData,
   1559     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   1560         let RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData {
   1561             invitation,
   1562             reply_queue,
   1563         } = &command.kind
   1564         else {
   1565             if let RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
   1566                 invitation,
   1567                 reply_queue,
   1568                 sender_auth_state,
   1569             } = &command.kind
   1570             {
   1571                 let mut connection_link =
   1572                     decrypt_short_invitation_link_data(invitation, &link_data)?;
   1573                 connection_link.invitation_queue.sender_id = URL_SAFE_NO_PAD.encode(sender_id);
   1574                 return self.prepare_join_connection(
   1575                     &command.connection_id,
   1576                     connection_link,
   1577                     reply_queue.clone(),
   1578                     command.ready_at,
   1579                     Some(sender_auth_state.clone()),
   1580                 );
   1581             }
   1582             return Err(RadrootsSimplexAgentRuntimeError::Runtime(
   1583                 "SimpleX LNK response received for non-retrieval command".into(),
   1584             ));
   1585         };
   1586         let mut connection_link = decrypt_short_invitation_link_data(invitation, &link_data)?;
   1587         connection_link.invitation_queue.sender_id = URL_SAFE_NO_PAD.encode(sender_id);
   1588         self.prepare_join_connection(
   1589             &command.connection_id,
   1590             connection_link,
   1591             reply_queue.clone(),
   1592             command.ready_at,
   1593             None,
   1594         )
   1595     }
   1596 
   1597     fn apply_transport_response(
   1598         &mut self,
   1599         command: &RadrootsSimplexAgentPendingCommand,
   1600         response: RadrootsSimplexSmpTransportResponse,
   1601     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   1602         match response.transmission.message {
   1603             RadrootsSimplexSmpBrokerMessage::Err(error)
   1604                 if is_empty_queue_no_msg(command, &error) =>
   1605             {
   1606                 self.record_command_outcome(
   1607                     command.id,
   1608                     RadrootsSimplexAgentCommandOutcome::Delivered,
   1609                 )
   1610             }
   1611             RadrootsSimplexSmpBrokerMessage::Err(error) => self.record_command_outcome(
   1612                 command.id,
   1613                 RadrootsSimplexAgentCommandOutcome::Failed {
   1614                     message: format!(
   1615                         "SimpleX broker rejected command `{}` ({}): {:?}",
   1616                         command.id,
   1617                         pending_command_kind_label(command),
   1618                         error
   1619                     ),
   1620                 },
   1621             ),
   1622             RadrootsSimplexSmpBrokerMessage::Ids(ids) => {
   1623                 self.process_queue_ids_response(command, ids)?;
   1624                 self.record_command_outcome(
   1625                     command.id,
   1626                     RadrootsSimplexAgentCommandOutcome::Delivered,
   1627                 )
   1628             }
   1629             RadrootsSimplexSmpBrokerMessage::Msg(message) => {
   1630                 let queue = queue_for_command(command).ok_or_else(|| {
   1631                     RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1632                         "SimpleX command `{}` has no queue context for broker message",
   1633                         command.id
   1634                     ))
   1635                 })?;
   1636                 self.process_received_message_response(
   1637                     &command.connection_id,
   1638                     &queue,
   1639                     message,
   1640                     response.transport_hash,
   1641                 )?;
   1642                 self.record_command_outcome(
   1643                     command.id,
   1644                     RadrootsSimplexAgentCommandOutcome::Delivered,
   1645                 )
   1646             }
   1647             RadrootsSimplexSmpBrokerMessage::Lnk {
   1648                 sender_id,
   1649                 link_data,
   1650             } => {
   1651                 self.process_short_link_response(command, sender_id, link_data)?;
   1652                 self.record_command_outcome(
   1653                     command.id,
   1654                     RadrootsSimplexAgentCommandOutcome::Delivered,
   1655                 )
   1656             }
   1657             _ => self
   1658                 .record_command_outcome(command.id, RadrootsSimplexAgentCommandOutcome::Delivered),
   1659         }
   1660     }
   1661 
   1662     fn apply_subscription_response(
   1663         &mut self,
   1664         response: RadrootsSimplexSmpTransportResponse,
   1665     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   1666         let entity_id = response.transmission.entity_id.clone();
   1667         let (connection_id, queue) = self
   1668             .store
   1669             .receive_queue_by_entity_id(&response.server, &entity_id)
   1670             .ok_or_else(|| {
   1671                 RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1672                     "SimpleX subscription response for server `{}` used unknown queue entity `{}`",
   1673                     response.server.server_identity,
   1674                     URL_SAFE_NO_PAD.encode(&entity_id)
   1675                 ))
   1676             })?;
   1677         match response.transmission.message {
   1678             RadrootsSimplexSmpBrokerMessage::Msg(message) => self
   1679                 .process_received_message_response(
   1680                     &connection_id,
   1681                     &queue,
   1682                     message,
   1683                     response.transport_hash,
   1684                 ),
   1685             RadrootsSimplexSmpBrokerMessage::Err(RadrootsSimplexSmpError::NoMsg) => Ok(()),
   1686             RadrootsSimplexSmpBrokerMessage::Err(error) => {
   1687                 self.events
   1688                     .push_back(RadrootsSimplexAgentRuntimeEvent::Error {
   1689                         connection_id: Some(connection_id),
   1690                         message: format!(
   1691                             "SimpleX subscription broker error for queue entity `{}`: {:?}",
   1692                             URL_SAFE_NO_PAD.encode(&entity_id),
   1693                             error
   1694                         ),
   1695                     });
   1696                 Ok(())
   1697             }
   1698             _ => Ok(()),
   1699         }
   1700     }
   1701 
   1702     fn apply_delivery_side_effects(
   1703         &mut self,
   1704         command: &RadrootsSimplexAgentPendingCommand,
   1705     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   1706         match &command.kind {
   1707             RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
   1708                 delivery: Some(delivery),
   1709                 ..
   1710             } => {
   1711                 let delivered = self
   1712                     .store
   1713                     .confirm_outbound_message(&command.connection_id, delivery.message_id)?;
   1714                 self.events
   1715                     .push_back(RadrootsSimplexAgentRuntimeEvent::OutboundMessageDelivered {
   1716                         connection_id: command.connection_id.clone(),
   1717                         message_id: delivered.message_id,
   1718                         message_hash: delivered.message_hash,
   1719                     });
   1720                 let connection = self.store.connection(&command.connection_id)?;
   1721                 if connection.status == RadrootsSimplexAgentConnectionStatus::Allowed
   1722                     && connection.hello_sent
   1723                     && delivered.message_id == 1
   1724                 {
   1725                     self.store.set_status(
   1726                         &command.connection_id,
   1727                         RadrootsSimplexAgentConnectionStatus::Connected,
   1728                     )?;
   1729                     self.events.push_back(
   1730                         RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished {
   1731                             connection_id: command.connection_id.clone(),
   1732                         },
   1733                     );
   1734                 }
   1735             }
   1736             RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue } => {
   1737                 self.store
   1738                     .mark_queue_subscribed(&command.connection_id, queue)?;
   1739             }
   1740             RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => {
   1741                 for queue in queues {
   1742                     self.store
   1743                         .mark_queue_tested(&command.connection_id, queue)?;
   1744                 }
   1745             }
   1746             RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
   1747                 receipt: Some(receipt),
   1748                 ..
   1749             } => {
   1750                 self.events.push_back(
   1751                     RadrootsSimplexAgentRuntimeEvent::InboundMessageAckDelivered {
   1752                         connection_id: command.connection_id.clone(),
   1753                         message_id: receipt.message_id,
   1754                         message_hash: receipt.message_hash.clone(),
   1755                     },
   1756                 );
   1757             }
   1758             _ => {}
   1759         }
   1760         Ok(())
   1761     }
   1762 
   1763     fn apply_failure_side_effects(
   1764         &mut self,
   1765         command: &RadrootsSimplexAgentPendingCommand,
   1766     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   1767         if let RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
   1768             delivery: Some(delivery),
   1769             ..
   1770         } = &command.kind
   1771         {
   1772             let _ = self
   1773                 .store
   1774                 .clear_staged_outbound_message(&command.connection_id, delivery.message_id)?;
   1775         }
   1776         Ok(())
   1777     }
   1778 
   1779     fn enqueue_hello(
   1780         &mut self,
   1781         connection_id: &str,
   1782         now: u64,
   1783     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   1784         if self.store.connection(connection_id)?.hello_sent {
   1785             return Ok(());
   1786         }
   1787         let send_queue = self.store.primary_send_queue(connection_id)?;
   1788         let connection = self.store.connection(connection_id)?;
   1789         let previous_hash = connection
   1790             .delivery_cursor
   1791             .last_sent_message_hash
   1792             .clone()
   1793             .unwrap_or_default();
   1794         let message_id = connection
   1795             .delivery_cursor
   1796             .last_sent_message_id
   1797             .unwrap_or(0)
   1798             .saturating_add(1);
   1799         let frame = RadrootsSimplexAgentMessageFrame {
   1800             header: RadrootsSimplexAgentMessageHeader {
   1801                 message_id,
   1802                 previous_message_hash: previous_hash,
   1803             },
   1804             message: RadrootsSimplexAgentMessage::Hello,
   1805             padding: Vec::new(),
   1806         };
   1807         let ciphertext =
   1808             encode_decrypted_message(&RadrootsSimplexAgentDecryptedMessage::Message(frame))?;
   1809         let message_hash = Sha256::digest(&ciphertext).to_vec();
   1810         let prepared = self
   1811             .store
   1812             .prepare_outbound_message(connection_id, message_hash)?;
   1813         let encrypted = self.next_encrypted_payload(
   1814             connection_id,
   1815             ciphertext,
   1816             SimplexAgentPayloadKind::Message,
   1817         )?;
   1818         self.store.enqueue_command(
   1819             connection_id,
   1820             RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
   1821                 queue: send_queue.descriptor.queue_address(),
   1822                 envelope: RadrootsSimplexAgentEnvelope::Message(encrypted),
   1823                 delivery: Some(RadrootsSimplexAgentOutboundMessage {
   1824                     message_id: prepared.message_id,
   1825                     message_hash: prepared.message_hash,
   1826                 }),
   1827             },
   1828             now,
   1829         )?;
   1830         self.store.connection_mut(connection_id)?.hello_sent = true;
   1831         Ok(())
   1832     }
   1833 
   1834     fn encode_smp_message_body(
   1835         &self,
   1836         connection_id: &str,
   1837         envelope: &RadrootsSimplexAgentEnvelope,
   1838     ) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> {
   1839         let shared_secret = self
   1840             .store
   1841             .connection(connection_id)?
   1842             .shared_secret
   1843             .clone()
   1844             .ok_or_else(|| {
   1845                 RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1846                     "SimpleX connection `{connection_id}` has no shared queue secret"
   1847                 ))
   1848             })?;
   1849         let sender_public_key = match envelope {
   1850             RadrootsSimplexAgentEnvelope::Confirmation {
   1851                 reply_queue: true, ..
   1852             } => Some(
   1853                 self.store
   1854                     .connection(connection_id)?
   1855                     .local_e2e_public_key
   1856                     .clone()
   1857                     .ok_or_else(|| {
   1858                         RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1859                             "SimpleX connection `{connection_id}` is missing local E2E public key"
   1860                         ))
   1861                     })?,
   1862             ),
   1863             _ => None,
   1864         };
   1865         let mut body = Vec::with_capacity(1 + 512);
   1866         body.push(b'_');
   1867         body.extend_from_slice(&encode_envelope(envelope)?);
   1868         let nonce = random_nonce()
   1869             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
   1870         let padded_len = match envelope {
   1871             RadrootsSimplexAgentEnvelope::Confirmation { .. } => SIMPLEX_E2E_CONFIRMATION_LENGTH,
   1872             _ => SIMPLEX_E2E_MESSAGE_LENGTH,
   1873         };
   1874         let ciphertext = encrypt_padded(&shared_secret, &nonce, &body, padded_len)
   1875             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
   1876         encode_client_message_envelope(&SimplexClientMessageEnvelope {
   1877             sender_public_key,
   1878             nonce,
   1879             ciphertext,
   1880         })
   1881     }
   1882 
   1883     fn process_queue_ids_response(
   1884         &mut self,
   1885         command: &RadrootsSimplexAgentPendingCommand,
   1886         ids: RadrootsSimplexSmpQueueIdsResponse,
   1887     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   1888         let RadrootsSimplexAgentPendingCommandKind::CreateQueue { descriptor } = &command.kind
   1889         else {
   1890             return Err(RadrootsSimplexAgentRuntimeError::Runtime(
   1891                 "SimpleX IDS response received for non-create command".into(),
   1892             ));
   1893         };
   1894 
   1895         let old_address = descriptor.queue_address();
   1896         let sender_id = URL_SAFE_NO_PAD.encode(&ids.sender_id);
   1897         let mut invitation_event = None;
   1898         let mut join_confirmation = None;
   1899         let subscribe_queue;
   1900 
   1901         {
   1902             let connection = self.store.connection_mut(&command.connection_id)?;
   1903             let queue = connection
   1904                 .queues
   1905                 .iter_mut()
   1906                 .find(|queue| queue.descriptor.queue_address() == old_address)
   1907                 .ok_or_else(|| {
   1908                     RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1909                         "SimpleX connection `{}` missing receive queue for IDS",
   1910                         command.connection_id
   1911                     ))
   1912                 })?;
   1913             let delivery_private_key = queue.delivery_private_key.clone().ok_or_else(|| {
   1914                 RadrootsSimplexAgentRuntimeError::Runtime(
   1915                     "SimpleX receive queue missing delivery private key".into(),
   1916                 )
   1917             })?;
   1918             let server_dh_public_key = decode_x25519_public_key_x509(&ids.server_dh_public_key)
   1919                 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
   1920             queue.delivery_shared_secret = Some(
   1921                 derive_shared_secret(&delivery_private_key, &server_dh_public_key).map_err(
   1922                     |error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()),
   1923                 )?,
   1924             );
   1925             queue.entity_id = ids.recipient_id.clone();
   1926             queue.descriptor.queue_uri.sender_id = sender_id;
   1927             if let Some(queue_mode) = ids.queue_mode {
   1928                 queue.descriptor.queue_uri.queue_mode = Some(queue_mode);
   1929             }
   1930             let new_address = queue.descriptor.queue_address();
   1931             subscribe_queue = new_address.clone();
   1932 
   1933             if connection.status == RadrootsSimplexAgentConnectionStatus::CreatePending {
   1934                 connection.status = RadrootsSimplexAgentConnectionStatus::InvitationReady;
   1935                 if let Some(invitation) = connection.invitation.as_mut() {
   1936                     invitation.invitation_queue = queue.descriptor.queue_uri.clone();
   1937                 }
   1938                 if let Some(short_link) = connection.short_link.as_mut() {
   1939                     short_link.link_id = ids.link_id.clone().ok_or_else(|| {
   1940                         RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1941                             "SimpleX broker IDS response for `{}` did not include a short-link id",
   1942                             command.connection_id
   1943                         ))
   1944                     })?;
   1945                     short_link.hosts = queue.descriptor.queue_uri.server.hosts.clone();
   1946                     short_link.port = queue.descriptor.queue_uri.server.port;
   1947                     invitation_event = Some(short_link.invitation_link());
   1948                 }
   1949             } else if connection.status == RadrootsSimplexAgentConnectionStatus::JoinPending {
   1950                 let local_x3dh_key_1 = connection.local_x3dh_key_1.as_ref().ok_or_else(|| {
   1951                     RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1952                         "SimpleX connection `{}` missing local X3DH key 1",
   1953                         command.connection_id
   1954                     ))
   1955                 })?;
   1956                 let local_x3dh_key_2 = connection.local_x3dh_key_2.as_ref().ok_or_else(|| {
   1957                     RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1958                         "SimpleX connection `{}` missing local X3DH key 2",
   1959                         command.connection_id
   1960                     ))
   1961                 })?;
   1962                 let ratchet_state = connection.ratchet_state.as_ref().ok_or_else(|| {
   1963                     RadrootsSimplexAgentRuntimeError::Runtime(format!(
   1964                         "SimpleX connection `{}` missing ratchet state",
   1965                         command.connection_id
   1966                     ))
   1967                 })?;
   1968                 join_confirmation = Some((
   1969                     queue.descriptor.clone(),
   1970                     official_x3dh_params_from_parts(
   1971                         &local_x3dh_key_1.public_key,
   1972                         &local_x3dh_key_2.public_key,
   1973                         ratchet_state.current_pq_public_key.clone(),
   1974                         ratchet_state.pending_outbound_pq_ciphertext.clone(),
   1975                     )?,
   1976                 ));
   1977             }
   1978         }
   1979 
   1980         self.store.enqueue_command(
   1981             &command.connection_id,
   1982             RadrootsSimplexAgentPendingCommandKind::SubscribeQueue {
   1983                 queue: subscribe_queue,
   1984             },
   1985             command.ready_at,
   1986         )?;
   1987         if let Some(invitation) = invitation_event {
   1988             self.events
   1989                 .push_back(RadrootsSimplexAgentRuntimeEvent::InvitationReady {
   1990                     connection_id: command.connection_id.clone(),
   1991                     invitation,
   1992                 });
   1993         }
   1994         if let Some((reply_descriptor, e2e_ratchet_params)) = join_confirmation {
   1995             let send_queue = self.store.primary_send_queue(&command.connection_id)?;
   1996             let confirmation_payload = self.next_encrypted_payload(
   1997                 &command.connection_id,
   1998                 encode_decrypted_message(
   1999                     &RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply {
   2000                         reply_queues: vec![reply_descriptor],
   2001                         info: Vec::new(),
   2002                     },
   2003                 )?,
   2004                 SimplexAgentPayloadKind::ConnectionInfo,
   2005             )?;
   2006             self.store.enqueue_command(
   2007                 &command.connection_id,
   2008                 RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
   2009                     queue: send_queue.descriptor.queue_address(),
   2010                     envelope: RadrootsSimplexAgentEnvelope::Confirmation {
   2011                         reply_queue: true,
   2012                         e2e_ratchet_params: Some(e2e_ratchet_params),
   2013                         encrypted: confirmation_payload,
   2014                     },
   2015                     delivery: None,
   2016                 },
   2017                 command.ready_at,
   2018             )?;
   2019             self.events
   2020                 .push_back(RadrootsSimplexAgentRuntimeEvent::ConfirmationRequired {
   2021                     connection_id: command.connection_id.clone(),
   2022                 });
   2023         }
   2024         Ok(())
   2025     }
   2026 
   2027     fn process_received_message_response(
   2028         &mut self,
   2029         connection_id: &str,
   2030         queue: &radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentQueueAddress,
   2031         message: radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpReceivedMessage,
   2032         transport_hash: Vec<u8>,
   2033     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   2034         let connection = self.store.connection(connection_id)?;
   2035         if connection.last_received_queue.as_ref() == Some(queue)
   2036             && connection.last_received_broker_message_id.as_deref()
   2037                 == Some(message.message_id.as_slice())
   2038         {
   2039             self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
   2040             return Ok(());
   2041         }
   2042         let received = self.decode_received_message_body(connection_id, queue, &message)?;
   2043         if received.sent_body.is_empty() {
   2044             return Ok(());
   2045         }
   2046         let (envelope, derived_secret) =
   2047             self.decode_agent_envelope_payload(connection_id, &received.sent_body)?;
   2048         if let Some(shared_secret) = derived_secret {
   2049             self.store.connection_mut(connection_id)?.shared_secret = Some(shared_secret);
   2050         }
   2051         if self.is_official_payload_replay(connection_id, &envelope)? {
   2052             {
   2053                 let connection = self.store.connection_mut(connection_id)?;
   2054                 connection.last_received_queue = Some(queue.clone());
   2055                 connection.last_received_broker_message_id = Some(message.message_id.clone());
   2056             }
   2057             self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
   2058             return Ok(());
   2059         }
   2060         self.initialize_receiver_ratchet_from_confirmation(connection_id, &envelope)?;
   2061         let decrypted = match self.extract_decrypted_message(connection_id, &envelope) {
   2062             Ok(decrypted) => decrypted,
   2063             Err(RadrootsSimplexAgentRuntimeError::Crypto(
   2064                 RadrootsSimplexSmpCryptoError::RatchetMessageRegression { .. },
   2065             )) => {
   2066                 {
   2067                     let connection = self.store.connection_mut(connection_id)?;
   2068                     connection.last_received_queue = Some(queue.clone());
   2069                     connection.last_received_broker_message_id = Some(message.message_id.clone());
   2070                 }
   2071                 self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
   2072                 return Ok(());
   2073             }
   2074             Err(error) => return Err(error),
   2075         };
   2076         let agent_message_hash =
   2077             if let RadrootsSimplexAgentDecryptedMessage::Message(frame) = &decrypted {
   2078                 let encoded = encode_decrypted_message(&decrypted)?;
   2079                 let message_hash = Sha256::digest(&encoded).to_vec();
   2080                 self.validate_inbound_frame_progress(connection_id, frame, &message_hash)?;
   2081                 Some(message_hash)
   2082             } else {
   2083                 None
   2084             };
   2085         let requires_app_ack = matches!(
   2086             &decrypted,
   2087             RadrootsSimplexAgentDecryptedMessage::Message(frame)
   2088                 if matches!(frame.message, RadrootsSimplexAgentMessage::UserMessage(_))
   2089         );
   2090         {
   2091             let connection = self.store.connection_mut(connection_id)?;
   2092             connection.last_received_queue = Some(queue.clone());
   2093             connection.last_received_broker_message_id = Some(message.message_id.clone());
   2094         }
   2095         let _ = received.timestamp;
   2096         let _ = received.flags;
   2097         if let RadrootsSimplexAgentDecryptedMessage::Message(frame) = &decrypted {
   2098             self.store.record_inbound_message(
   2099                 connection_id,
   2100                 queue.clone(),
   2101                 message.message_id.clone(),
   2102                 frame.header.message_id,
   2103                 agent_message_hash.clone().unwrap_or_default(),
   2104             )?;
   2105         }
   2106         let message_hash = agent_message_hash.unwrap_or_else(|| transport_hash);
   2107         self.handle_inbound_decrypted_message(connection_id, decrypted, message_hash)?;
   2108         if !requires_app_ack {
   2109             self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
   2110         }
   2111         Ok(())
   2112     }
   2113 
   2114     fn is_official_payload_replay(
   2115         &self,
   2116         connection_id: &str,
   2117         envelope: &RadrootsSimplexAgentEnvelope,
   2118     ) -> Result<bool, RadrootsSimplexAgentRuntimeError> {
   2119         let official_message = match envelope {
   2120             RadrootsSimplexAgentEnvelope::Confirmation { encrypted, .. }
   2121             | RadrootsSimplexAgentEnvelope::Message(encrypted)
   2122             | RadrootsSimplexAgentEnvelope::RatchetKey { encrypted, .. } => {
   2123                 encrypted.official_message.as_deref()
   2124             }
   2125             RadrootsSimplexAgentEnvelope::Invitation { .. } => None,
   2126         };
   2127         let Some(official_message) = official_message else {
   2128             return Ok(false);
   2129         };
   2130         let connection = self.store.connection(connection_id)?;
   2131         let Some(ratchet_state) = connection.ratchet_state.as_ref() else {
   2132             return Ok(false);
   2133         };
   2134         if ratchet_state.official_associated_data.is_none() {
   2135             return Ok(false);
   2136         }
   2137         ratchet_state
   2138             .is_official_payload_replay(official_message)
   2139             .map_err(Into::into)
   2140     }
   2141 
   2142     fn validate_inbound_frame_progress(
   2143         &self,
   2144         connection_id: &str,
   2145         frame: &RadrootsSimplexAgentMessageFrame,
   2146         message_hash: &[u8],
   2147     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   2148         if frame.header.message_id == 0 {
   2149             return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2150                 "SimpleX inbound message id for `{connection_id}` must start at 1"
   2151             )));
   2152         }
   2153         let connection = self.store.connection(connection_id)?;
   2154         let Some(last_message_id) = connection.delivery_cursor.last_received_message_id else {
   2155             if connection.status == RadrootsSimplexAgentConnectionStatus::Connected
   2156                 && !connection.hello_received
   2157                 && frame.header.message_id == 2
   2158                 && !frame.header.previous_message_hash.is_empty()
   2159                 && matches!(frame.message, RadrootsSimplexAgentMessage::UserMessage(_))
   2160             {
   2161                 return Ok(());
   2162             }
   2163             if frame.header.message_id != 1 {
   2164                 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2165                     "SimpleX inbound message id for `{connection_id}` started at `{}` instead of `1`",
   2166                     frame.header.message_id
   2167                 )));
   2168             }
   2169             if !frame.header.previous_message_hash.is_empty() {
   2170                 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2171                     "SimpleX first inbound message for `{connection_id}` carried a previous-message hash"
   2172                 )));
   2173             }
   2174             return Ok(());
   2175         };
   2176         let last_message_hash = connection
   2177             .delivery_cursor
   2178             .last_received_message_hash
   2179             .as_deref()
   2180             .ok_or_else(|| {
   2181                 RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2182                     "SimpleX connection `{connection_id}` has a received message id without a message hash"
   2183                 ))
   2184             })?;
   2185         if frame.header.message_id == last_message_id {
   2186             if message_hash == last_message_hash {
   2187                 return Ok(());
   2188             }
   2189             return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2190                 "SimpleX inbound message id `{last_message_id}` for `{connection_id}` was replayed with a different message hash"
   2191             )));
   2192         }
   2193         if frame.header.message_id < last_message_id {
   2194             return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2195                 "SimpleX inbound message id `{}` for `{connection_id}` regressed below `{last_message_id}`",
   2196                 frame.header.message_id
   2197             )));
   2198         }
   2199         let expected_message_id = last_message_id.checked_add(1).ok_or_else(|| {
   2200             RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2201                 "SimpleX inbound message id for `{connection_id}` overflowed"
   2202             ))
   2203         })?;
   2204         if frame.header.message_id != expected_message_id {
   2205             return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2206                 "SimpleX inbound message id `{}` for `{connection_id}` skipped expected `{expected_message_id}`",
   2207                 frame.header.message_id
   2208             )));
   2209         }
   2210         if frame.header.previous_message_hash != last_message_hash {
   2211             return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2212                 "SimpleX inbound message `{}` for `{connection_id}` carried an unexpected previous-message hash",
   2213                 frame.header.message_id
   2214             )));
   2215         }
   2216         Ok(())
   2217     }
   2218 
   2219     fn decode_received_message_body(
   2220         &mut self,
   2221         connection_id: &str,
   2222         queue: &radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentQueueAddress,
   2223         message: &radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpReceivedMessage,
   2224     ) -> Result<SimplexReceivedBody, RadrootsSimplexAgentRuntimeError> {
   2225         let queue_record = self.store.queue_record(connection_id, queue)?;
   2226         let delivery_secret = queue_record.delivery_shared_secret.ok_or_else(|| {
   2227             RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2228                 "SimpleX receive queue on `{connection_id}` is missing delivery secret"
   2229             ))
   2230         })?;
   2231         let decrypted = decrypt_padded(
   2232             &delivery_secret,
   2233             &message.message_id,
   2234             &message.encrypted_body,
   2235         )
   2236         .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
   2237         decode_received_body(&decrypted)
   2238     }
   2239 
   2240     fn decode_agent_envelope_payload(
   2241         &self,
   2242         connection_id: &str,
   2243         payload: &[u8],
   2244     ) -> Result<(RadrootsSimplexAgentEnvelope, Option<Vec<u8>>), RadrootsSimplexAgentRuntimeError>
   2245     {
   2246         let sent = decode_client_message_envelope(payload)?;
   2247         let derived_secret = match self.store.connection(connection_id)?.shared_secret.clone() {
   2248             Some(secret) => Some(secret),
   2249             None => {
   2250                 let sender_public_key = sent.sender_public_key.as_deref().ok_or_else(|| {
   2251                     RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2252                         "SimpleX connection `{connection_id}` received encrypted body without sender key"
   2253                     ))
   2254                 })?;
   2255                 let private_key = self
   2256                     .store
   2257                     .connection(connection_id)?
   2258                     .local_e2e_private_key
   2259                     .as_deref()
   2260                     .ok_or_else(|| {
   2261                         RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2262                             "SimpleX connection `{connection_id}` missing local E2E private key"
   2263                         ))
   2264                     })?;
   2265                 Some(
   2266                     derive_shared_secret(private_key, sender_public_key).map_err(|error| {
   2267                         RadrootsSimplexAgentRuntimeError::Runtime(error.to_string())
   2268                     })?,
   2269                 )
   2270             }
   2271         };
   2272         let shared_secret = derived_secret.clone().ok_or_else(|| {
   2273             RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2274                 "SimpleX connection `{connection_id}` has no shared secret"
   2275             ))
   2276         })?;
   2277         let decrypted = decrypt_padded(&shared_secret, &sent.nonce, &sent.ciphertext)
   2278             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
   2279         let (_, payload) = decrypted.split_first().ok_or_else(|| {
   2280             RadrootsSimplexAgentRuntimeError::Runtime(
   2281                 "SimpleX decrypted client body is empty".into(),
   2282             )
   2283         })?;
   2284         let envelope = decode_envelope(payload)?;
   2285         let should_store_secret = self
   2286             .store
   2287             .connection(connection_id)?
   2288             .shared_secret
   2289             .is_none()
   2290             && sent.sender_public_key.is_some();
   2291         Ok((
   2292             envelope,
   2293             if should_store_secret {
   2294                 derived_secret
   2295             } else {
   2296                 None
   2297             },
   2298         ))
   2299     }
   2300 
   2301     fn initialize_receiver_ratchet_from_confirmation(
   2302         &mut self,
   2303         connection_id: &str,
   2304         envelope: &RadrootsSimplexAgentEnvelope,
   2305     ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   2306         let RadrootsSimplexAgentEnvelope::Confirmation {
   2307             e2e_ratchet_params: Some(params),
   2308             ..
   2309         } = envelope
   2310         else {
   2311             return Ok(());
   2312         };
   2313         let connection = self.store.connection(connection_id)?;
   2314         let local_key_1 = connection.local_x3dh_key_1.clone().ok_or_else(|| {
   2315             RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2316                 "SimpleX connection `{connection_id}` missing local X3DH key 1"
   2317             ))
   2318         })?;
   2319         let local_key_2 = connection.local_x3dh_key_2.clone().ok_or_else(|| {
   2320             RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2321                 "SimpleX connection `{connection_id}` missing local X3DH key 2"
   2322             ))
   2323         })?;
   2324         let local_pq_keypair = connection.local_pq_keypair.clone();
   2325         let local_key_1 = official_x3dh_keypair_from_agent(local_key_1);
   2326         let local_key_2 = official_x3dh_keypair_from_agent(local_key_2);
   2327         let receiver_init = if params.pq_public_key.is_some() || params.pq_ciphertext.is_some() {
   2328             let local_pq_keypair = local_pq_keypair.as_ref().ok_or_else(|| {
   2329                 RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2330                     "SimpleX connection `{connection_id}` missing local PQ keypair"
   2331                 ))
   2332             })?;
   2333             official_x3dh_receiver_init_accepting_pq(
   2334                 &local_key_1,
   2335                 &local_key_2,
   2336                 &official_pq_keypair_from_agent(local_pq_keypair.clone()),
   2337                 params,
   2338             )
   2339             .map(|init| init.init)
   2340             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?
   2341         } else {
   2342             official_x3dh_receiver_init(&local_key_1, &local_key_2, params)
   2343                 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?
   2344         };
   2345         let connection = self.store.connection_mut(connection_id)?;
   2346         let ratchet_state = connection.ratchet_state.as_mut().ok_or_else(|| {
   2347             RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2348                 "SimpleX connection `{connection_id}` has no ratchet state"
   2349             ))
   2350         })?;
   2351         if let Some(local_pq_keypair) = local_pq_keypair {
   2352             ratchet_state.current_pq_public_key = Some(local_pq_keypair.public_key);
   2353             ratchet_state.local_pq_private_key = Some(local_pq_keypair.private_key);
   2354         }
   2355         ratchet_state
   2356             .initialize_official_receiver(local_key_2.private_key, receiver_init)
   2357             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))
   2358     }
   2359 
   2360     fn next_encrypted_payload(
   2361         &mut self,
   2362         connection_id: &str,
   2363         plaintext: Vec<u8>,
   2364         payload_kind: SimplexAgentPayloadKind,
   2365     ) -> Result<RadrootsSimplexAgentEncryptedPayload, RadrootsSimplexAgentRuntimeError> {
   2366         let shared_secret = self
   2367             .store
   2368             .connection(connection_id)?
   2369             .shared_secret
   2370             .clone()
   2371             .ok_or_else(|| {
   2372                 RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2373                     "SimpleX connection `{connection_id}` has no shared secret"
   2374                 ))
   2375             })?;
   2376         let padded_len = self.agent_payload_padded_len(connection_id, payload_kind)?;
   2377         let official_message = self
   2378             .store
   2379             .connection_mut(connection_id)?
   2380             .ratchet_state
   2381             .as_mut()
   2382             .ok_or_else(|| {
   2383                 RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2384                     "SimpleX connection `{connection_id}` has no ratchet state"
   2385                 ))
   2386             })?
   2387             .encrypt_official_payload(&shared_secret, &plaintext, padded_len)
   2388             .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
   2389         Ok(RadrootsSimplexAgentEncryptedPayload {
   2390             ratchet_header: None,
   2391             official_message: Some(official_message),
   2392             ciphertext: Vec::new(),
   2393         })
   2394     }
   2395 
   2396     fn extract_decrypted_message(
   2397         &mut self,
   2398         connection_id: &str,
   2399         envelope: &RadrootsSimplexAgentEnvelope,
   2400     ) -> Result<RadrootsSimplexAgentDecryptedMessage, RadrootsSimplexAgentRuntimeError> {
   2401         match envelope {
   2402             RadrootsSimplexAgentEnvelope::Confirmation { encrypted, .. }
   2403             | RadrootsSimplexAgentEnvelope::Message(encrypted)
   2404             | RadrootsSimplexAgentEnvelope::RatchetKey { encrypted, .. } => {
   2405                 let plaintext = self.decrypt_agent_payload(connection_id, encrypted)?;
   2406                 decode_decrypted_message(&plaintext).map_err(Into::into)
   2407             }
   2408             RadrootsSimplexAgentEnvelope::Invitation {
   2409                 connection_info, ..
   2410             } => decode_decrypted_message(connection_info).map_err(Into::into),
   2411         }
   2412     }
   2413 
   2414     fn decrypt_agent_payload(
   2415         &mut self,
   2416         connection_id: &str,
   2417         encrypted: &RadrootsSimplexAgentEncryptedPayload,
   2418     ) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> {
   2419         let shared_secret = self
   2420             .store
   2421             .connection(connection_id)?
   2422             .shared_secret
   2423             .clone()
   2424             .ok_or_else(|| {
   2425                 RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2426                     "SimpleX connection `{connection_id}` has no shared secret"
   2427                 ))
   2428             })?;
   2429         if let Some(official_message) = encrypted.official_message.as_ref() {
   2430             return self
   2431                 .store
   2432                 .connection_mut(connection_id)?
   2433                 .ratchet_state
   2434                 .as_mut()
   2435                 .ok_or_else(|| {
   2436                     RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2437                         "SimpleX connection `{connection_id}` has no ratchet state"
   2438                     ))
   2439                 })?
   2440                 .decrypt_official_payload(&shared_secret, official_message)
   2441                 .map_err(Into::into);
   2442         }
   2443         let header = encrypted.ratchet_header.as_ref().ok_or_else(|| {
   2444             RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2445                 "SimpleX connection `{connection_id}` received agent payload without ratchet header"
   2446             ))
   2447         })?;
   2448         self.store
   2449             .connection_mut(connection_id)?
   2450             .ratchet_state
   2451             .as_mut()
   2452             .ok_or_else(|| {
   2453                 RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2454                     "SimpleX connection `{connection_id}` has no ratchet state"
   2455                 ))
   2456             })?
   2457             .decrypt_payload(&shared_secret, header, &encrypted.ciphertext)
   2458             .map_err(Into::into)
   2459     }
   2460 
   2461     fn agent_payload_padded_len(
   2462         &self,
   2463         connection_id: &str,
   2464         payload_kind: SimplexAgentPayloadKind,
   2465     ) -> Result<usize, RadrootsSimplexAgentRuntimeError> {
   2466         let ratchet = self
   2467             .store
   2468             .connection(connection_id)?
   2469             .ratchet_state
   2470             .as_ref()
   2471             .ok_or_else(|| {
   2472                 RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2473                     "SimpleX connection `{connection_id}` has no ratchet state"
   2474                 ))
   2475             })?;
   2476         let pq_enabled = ratchet.current_pq_public_key.is_some()
   2477             || ratchet.remote_pq_public_key.is_some()
   2478             || ratchet.current_pq_shared_secret.is_some()
   2479             || ratchet.local_pq_private_key.is_some();
   2480         Ok(match (payload_kind, pq_enabled) {
   2481             (SimplexAgentPayloadKind::ConnectionInfo, true) => {
   2482                 SIMPLEX_AGENT_E2E_CONN_INFO_PQ_LENGTH
   2483             }
   2484             (SimplexAgentPayloadKind::ConnectionInfo, false) => SIMPLEX_AGENT_E2E_CONN_INFO_LENGTH,
   2485             (SimplexAgentPayloadKind::Message, true) => SIMPLEX_AGENT_E2E_MESSAGE_PQ_LENGTH,
   2486             (SimplexAgentPayloadKind::Message, false) => SIMPLEX_AGENT_E2E_MESSAGE_LENGTH,
   2487         })
   2488     }
   2489 
   2490     #[cfg(feature = "std")]
   2491     fn flush_store(&self) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   2492         self.store.flush().map_err(Into::into)
   2493     }
   2494 
   2495     #[cfg(not(feature = "std"))]
   2496     fn flush_store(&self) -> Result<(), RadrootsSimplexAgentRuntimeError> {
   2497         Ok(())
   2498     }
   2499 }
   2500 
   2501 fn derive_material(label: &[u8], parts: &[&[u8]]) -> Vec<u8> {
   2502     let mut hasher = Sha256::new();
   2503     hasher.update(label);
   2504     for part in parts {
   2505         hasher.update((*part).len().to_be_bytes());
   2506         hasher.update(*part);
   2507     }
   2508     hasher.finalize().to_vec()
   2509 }
   2510 
   2511 fn agent_x3dh_keypair(
   2512     keypair: RadrootsSimplexOfficialX448Keypair,
   2513 ) -> RadrootsSimplexAgentX3dhKeypair {
   2514     RadrootsSimplexAgentX3dhKeypair {
   2515         public_key: keypair.public_key,
   2516         private_key: keypair.private_key,
   2517     }
   2518 }
   2519 
   2520 fn official_x3dh_keypair_from_agent(
   2521     keypair: RadrootsSimplexAgentX3dhKeypair,
   2522 ) -> RadrootsSimplexOfficialX448Keypair {
   2523     RadrootsSimplexOfficialX448Keypair {
   2524         public_key: keypair.public_key,
   2525         private_key: keypair.private_key,
   2526     }
   2527 }
   2528 
   2529 fn agent_pq_keypair(
   2530     keypair: RadrootsSimplexOfficialSntrup761Keypair,
   2531 ) -> RadrootsSimplexAgentPqKeypair {
   2532     RadrootsSimplexAgentPqKeypair {
   2533         public_key: keypair.public_key,
   2534         private_key: keypair.private_key,
   2535     }
   2536 }
   2537 
   2538 fn official_pq_keypair_from_agent(
   2539     keypair: RadrootsSimplexAgentPqKeypair,
   2540 ) -> RadrootsSimplexOfficialSntrup761Keypair {
   2541     RadrootsSimplexOfficialSntrup761Keypair {
   2542         public_key: keypair.public_key,
   2543         private_key: keypair.private_key,
   2544     }
   2545 }
   2546 
   2547 fn official_x3dh_params_from_parts(
   2548     key_1: &[u8],
   2549     key_2: &[u8],
   2550     pq_public_key: Option<Vec<u8>>,
   2551     pq_ciphertext: Option<Vec<u8>>,
   2552 ) -> Result<RadrootsSimplexOfficialX3dhParams, RadrootsSimplexAgentRuntimeError> {
   2553     Ok(RadrootsSimplexOfficialX3dhParams {
   2554         version_range: RadrootsSimplexSmpVersionRange::new(
   2555             RADROOTS_SIMPLEX_OFFICIAL_E2E_KDF_VERSION,
   2556             RADROOTS_SIMPLEX_OFFICIAL_E2E_CURRENT_VERSION,
   2557         )
   2558         .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?,
   2559         key_1: key_1.to_vec(),
   2560         key_2: key_2.to_vec(),
   2561         pq_public_key,
   2562         pq_ciphertext,
   2563     })
   2564 }
   2565 
   2566 fn prepare_short_invitation_link_data(
   2567     invitation: &RadrootsSimplexAgentConnectionLink,
   2568 ) -> Result<SimplexPreparedShortInvitationLinkData, RadrootsSimplexAgentRuntimeError> {
   2569     let root_keypair = RadrootsSimplexSmpEd25519Keypair::generate()
   2570         .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
   2571     let fixed_data = encode_short_invitation_fixed_data(&root_keypair.public_key, invitation)?;
   2572     let user_data = encode_short_invitation_user_data(invitation)?;
   2573     let (link_key, signed_link_data) = sign_short_link_data(&root_keypair, &fixed_data, &user_data)
   2574         .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
   2575     let link_data_key = derive_invitation_short_link_data_key(&link_key)
   2576         .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
   2577     let encrypted_link_data = encrypt_short_link_data(&link_data_key, &signed_link_data)
   2578         .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
   2579     Ok(SimplexPreparedShortInvitationLinkData {
   2580         link_key,
   2581         link_public_signature_key: root_keypair.public_key,
   2582         link_private_signature_key: root_keypair.private_key,
   2583         encrypted_link_data,
   2584     })
   2585 }
   2586 
   2587 fn short_invitation_server(
   2588     invitation: &RadrootsSimplexAgentShortInvitationLink,
   2589 ) -> Result<RadrootsSimplexSmpServerAddress, RadrootsSimplexAgentRuntimeError> {
   2590     if invitation.hosts.is_empty() {
   2591         return Err(RadrootsSimplexAgentRuntimeError::Runtime(
   2592             "SimpleX short invitation link does not include a relay host".into(),
   2593         ));
   2594     }
   2595     let server_key_hash = invitation.server_key_hash.as_ref().ok_or_else(|| {
   2596         RadrootsSimplexAgentRuntimeError::Runtime(
   2597             "SimpleX short invitation link does not include a server key hash".into(),
   2598         )
   2599     })?;
   2600     Ok(RadrootsSimplexSmpServerAddress {
   2601         server_identity: URL_SAFE_NO_PAD.encode(server_key_hash),
   2602         hosts: invitation.hosts.clone(),
   2603         port: invitation.port,
   2604     })
   2605 }
   2606 
   2607 fn decode_server_key_hash(
   2608     server_identity: &str,
   2609 ) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> {
   2610     URL_SAFE_NO_PAD
   2611         .decode(server_identity.as_bytes())
   2612         .or_else(|_| URL_SAFE.decode(server_identity.as_bytes()))
   2613         .map_err(|error| {
   2614             RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2615                 "failed to decode SimpleX server identity: {error}"
   2616             ))
   2617         })
   2618 }
   2619 
   2620 fn correlation_id_from_material(
   2621     label: &[u8],
   2622     parts: &[Vec<u8>],
   2623 ) -> RadrootsSimplexSmpCorrelationId {
   2624     let refs = parts.iter().map(Vec::as_slice).collect::<Vec<&[u8]>>();
   2625     let digest = derive_material(label, &refs);
   2626     let mut correlation = [0_u8; RadrootsSimplexSmpCorrelationId::LENGTH];
   2627     correlation.copy_from_slice(&digest[..RadrootsSimplexSmpCorrelationId::LENGTH]);
   2628     RadrootsSimplexSmpCorrelationId::new(correlation)
   2629 }
   2630 
   2631 fn short_link_sender_id(correlation_id: &RadrootsSimplexSmpCorrelationId) -> Vec<u8> {
   2632     let digest = Sha3_384::digest(correlation_id.as_bytes());
   2633     digest[..RadrootsSimplexSmpCorrelationId::LENGTH].to_vec()
   2634 }
   2635 
   2636 fn encode_queue_public_key(public_key: &[u8]) -> Result<String, RadrootsSimplexSmpCryptoError> {
   2637     Ok(URL_SAFE.encode(encode_x25519_public_key_x509(public_key)?))
   2638 }
   2639 
   2640 fn decode_queue_public_key(encoded: &str) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> {
   2641     let bytes = URL_SAFE
   2642         .decode(encoded.as_bytes())
   2643         .or_else(|_| URL_SAFE_NO_PAD.decode(encoded.as_bytes()))
   2644         .map_err(|error| {
   2645             RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2646                 "failed to decode SimpleX queue E2E public key: {error}"
   2647             ))
   2648         })?;
   2649     decode_x25519_public_key_x509(&bytes)
   2650         .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))
   2651 }
   2652 
   2653 fn placeholder_sender_id(seed_a: &[u8], seed_b: &[u8]) -> String {
   2654     let digest = derive_material(b"simplex-placeholder-sender-id", &[seed_a, seed_b]);
   2655     URL_SAFE_NO_PAD.encode(&digest[..18])
   2656 }
   2657 
   2658 fn queue_for_command(
   2659     command: &RadrootsSimplexAgentPendingCommand,
   2660 ) -> Option<RadrootsSimplexAgentQueueAddress> {
   2661     match &command.kind {
   2662         RadrootsSimplexAgentPendingCommandKind::CreateQueue { descriptor } => {
   2663             Some(descriptor.queue_address())
   2664         }
   2665         RadrootsSimplexAgentPendingCommandKind::SecureQueue { queue, .. }
   2666         | RadrootsSimplexAgentPendingCommandKind::SendEnvelope { queue, .. }
   2667         | RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue }
   2668         | RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { queue }
   2669         | RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { queue, .. }
   2670         | RadrootsSimplexAgentPendingCommandKind::SetQueueLinkData { queue, .. } => {
   2671             Some(queue.clone())
   2672         }
   2673         RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors } => descriptors
   2674             .first()
   2675             .map(RadrootsSimplexAgentQueueDescriptor::queue_address),
   2676         RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => queues.first().cloned(),
   2677         RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData { .. }
   2678         | RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData { .. } => None,
   2679     }
   2680 }
   2681 
   2682 fn pending_command_kind_label(command: &RadrootsSimplexAgentPendingCommand) -> &'static str {
   2683     match command.kind {
   2684         RadrootsSimplexAgentPendingCommandKind::CreateQueue { .. } => "create_queue",
   2685         RadrootsSimplexAgentPendingCommandKind::SecureQueue { .. } => "secure_queue",
   2686         RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { .. } => "subscribe_queue",
   2687         RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { .. } => "get_queue_message",
   2688         RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { .. } => "ack_inbox_message",
   2689         RadrootsSimplexAgentPendingCommandKind::SendEnvelope { .. } => "send_envelope",
   2690         RadrootsSimplexAgentPendingCommandKind::RotateQueues { .. } => "rotate_queues",
   2691         RadrootsSimplexAgentPendingCommandKind::TestQueues { .. } => "test_queues",
   2692         RadrootsSimplexAgentPendingCommandKind::SetQueueLinkData { .. } => "set_queue_link_data",
   2693         RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData { .. } => "get_queue_link_data",
   2694         RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData { .. } => {
   2695             "secure_get_queue_link_data"
   2696         }
   2697     }
   2698 }
   2699 
   2700 fn is_empty_queue_no_msg(
   2701     command: &RadrootsSimplexAgentPendingCommand,
   2702     error: &RadrootsSimplexSmpError,
   2703 ) -> bool {
   2704     matches!(
   2705         command.kind,
   2706         RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { .. }
   2707             | RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { .. }
   2708             | RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { .. }
   2709     ) && matches!(error, RadrootsSimplexSmpError::NoMsg)
   2710 }
   2711 
   2712 fn encode_client_message_envelope(
   2713     envelope: &SimplexClientMessageEnvelope,
   2714 ) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> {
   2715     let mut buffer = Vec::with_capacity(
   2716         2 + 1
   2717             + envelope
   2718                 .sender_public_key
   2719                 .as_ref()
   2720                 .map_or(0, |value| 1 + value.len())
   2721             + 24
   2722             + envelope.ciphertext.len(),
   2723     );
   2724     buffer.extend_from_slice(&RADROOTS_SIMPLEX_SMP_CURRENT_CLIENT_VERSION.to_be_bytes());
   2725     match envelope.sender_public_key.as_deref() {
   2726         Some(sender_public_key) => {
   2727             if sender_public_key.len() > u8::MAX as usize {
   2728                 return Err(RadrootsSimplexAgentRuntimeError::Runtime(
   2729                     "SimpleX sender public key exceeds short-field limit".into(),
   2730                 ));
   2731             }
   2732             buffer.push(b'1');
   2733             buffer.push(sender_public_key.len() as u8);
   2734             buffer.extend_from_slice(sender_public_key);
   2735         }
   2736         None => buffer.push(b'0'),
   2737     }
   2738     buffer.extend_from_slice(&envelope.nonce);
   2739     buffer.extend_from_slice(&envelope.ciphertext);
   2740     Ok(buffer)
   2741 }
   2742 
   2743 fn decode_client_message_envelope(
   2744     bytes: &[u8],
   2745 ) -> Result<SimplexClientMessageEnvelope, RadrootsSimplexAgentRuntimeError> {
   2746     if bytes.len() < 2 + 1 + RADROOTS_SIMPLEX_SMP_NONCE_LENGTH {
   2747         return Err(RadrootsSimplexAgentRuntimeError::Runtime(
   2748             "SimpleX client message envelope is truncated".into(),
   2749         ));
   2750     }
   2751     let _version = u16::from_be_bytes([bytes[0], bytes[1]]);
   2752     let mut index = 2;
   2753     let sender_public_key = match bytes[index] {
   2754         b'0' => {
   2755             index += 1;
   2756             None
   2757         }
   2758         b'1' => {
   2759             index += 1;
   2760             let length = *bytes.get(index).ok_or_else(|| {
   2761                 RadrootsSimplexAgentRuntimeError::Runtime(
   2762                     "SimpleX confirmation envelope is missing sender key length".into(),
   2763                 )
   2764             })? as usize;
   2765             index += 1;
   2766             let sender_public_key = bytes
   2767                 .get(index..index + length)
   2768                 .ok_or_else(|| {
   2769                     RadrootsSimplexAgentRuntimeError::Runtime(
   2770                         "SimpleX confirmation envelope is missing sender key bytes".into(),
   2771                     )
   2772                 })?
   2773                 .to_vec();
   2774             index += length;
   2775             Some(sender_public_key)
   2776         }
   2777         _ => {
   2778             return Err(RadrootsSimplexAgentRuntimeError::Runtime(
   2779                 "SimpleX client message envelope has an unknown public header".into(),
   2780             ));
   2781         }
   2782     };
   2783     let nonce_slice = bytes
   2784         .get(index..index + RADROOTS_SIMPLEX_SMP_NONCE_LENGTH)
   2785         .ok_or_else(|| {
   2786             RadrootsSimplexAgentRuntimeError::Runtime(
   2787                 "SimpleX client message envelope is missing nonce".into(),
   2788             )
   2789         })?;
   2790     let mut nonce = [0_u8; RADROOTS_SIMPLEX_SMP_NONCE_LENGTH];
   2791     nonce.copy_from_slice(nonce_slice);
   2792     index += RADROOTS_SIMPLEX_SMP_NONCE_LENGTH;
   2793     let ciphertext = bytes
   2794         .get(index..)
   2795         .ok_or_else(|| {
   2796             RadrootsSimplexAgentRuntimeError::Runtime(
   2797                 "SimpleX client message envelope is missing ciphertext".into(),
   2798             )
   2799         })?
   2800         .to_vec();
   2801     Ok(SimplexClientMessageEnvelope {
   2802         sender_public_key,
   2803         nonce,
   2804         ciphertext,
   2805     })
   2806 }
   2807 
   2808 fn decode_received_body(
   2809     bytes: &[u8],
   2810 ) -> Result<SimplexReceivedBody, RadrootsSimplexAgentRuntimeError> {
   2811     if let Some(timestamp_bytes) = bytes.strip_prefix(b"QUOTA ") {
   2812         let timestamp: [u8; 8] = timestamp_bytes.try_into().map_err(|_| {
   2813             RadrootsSimplexAgentRuntimeError::Runtime(
   2814                 "SimpleX quota notification has an invalid timestamp".into(),
   2815             )
   2816         })?;
   2817         return Ok(SimplexReceivedBody {
   2818             timestamp: u64::from_be_bytes(timestamp),
   2819             flags: RadrootsSimplexSmpMessageFlags::notifications_disabled(),
   2820             sent_body: Vec::new(),
   2821         });
   2822     }
   2823     if bytes.len() < 10 {
   2824         return Err(RadrootsSimplexAgentRuntimeError::Runtime(
   2825             "SimpleX received body is truncated".into(),
   2826         ));
   2827     }
   2828     let timestamp = u64::from_be_bytes(bytes[..8].try_into().map_err(|_| {
   2829         RadrootsSimplexAgentRuntimeError::Runtime(
   2830             "SimpleX received body is missing timestamp".into(),
   2831         )
   2832     })?);
   2833     let flags_offset = bytes[8..]
   2834         .iter()
   2835         .position(|byte| *byte == b' ')
   2836         .ok_or_else(|| {
   2837             RadrootsSimplexAgentRuntimeError::Runtime(
   2838                 "SimpleX received body is missing message flags separator".into(),
   2839             )
   2840         })?
   2841         + 8;
   2842     let flags_bytes = &bytes[8..flags_offset];
   2843     if flags_bytes.is_empty() {
   2844         return Err(RadrootsSimplexAgentRuntimeError::Runtime(
   2845             "SimpleX received body is missing message flags".into(),
   2846         ));
   2847     }
   2848     let flags = RadrootsSimplexSmpMessageFlags {
   2849         notification: match flags_bytes[0] {
   2850             b'F' => false,
   2851             b'T' => true,
   2852             other => {
   2853                 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
   2854                     "SimpleX received body has invalid notification flag `{other}`"
   2855                 )));
   2856             }
   2857         },
   2858         reserved: flags_bytes[1..].to_vec(),
   2859     };
   2860     Ok(SimplexReceivedBody {
   2861         timestamp,
   2862         flags,
   2863         sent_body: bytes[flags_offset + 1..].to_vec(),
   2864     })
   2865 }
   2866 
   2867 #[cfg(test)]
   2868 mod tests {
   2869     use super::*;
   2870     use alloc::collections::VecDeque;
   2871     use radroots_simplex_smp_crypto::prelude::{
   2872         RadrootsSimplexSmpQueueAuthorizationMaterial, RadrootsSimplexSmpQueueAuthorizationScope,
   2873         RadrootsSimplexSmpX25519Keypair,
   2874     };
   2875     use radroots_simplex_smp_proto::prelude::{
   2876         RadrootsSimplexSmpBrokerTransmission, RadrootsSimplexSmpError,
   2877         RadrootsSimplexSmpQueueIdsResponse, RadrootsSimplexSmpVersionRange,
   2878     };
   2879     use radroots_simplex_smp_transport::prelude::RadrootsSimplexSmpTransportBlock;
   2880 
   2881     fn invitation_queue() -> RadrootsSimplexSmpQueueUri {
   2882         RadrootsSimplexSmpQueueUri::parse(
   2883             "smp://BwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwc@relay.example/cXVldWU#/?v=4&dh=Zm9vYmFy&q=m",
   2884         )
   2885         .unwrap()
   2886     }
   2887 
   2888     fn reply_queue() -> RadrootsSimplexSmpQueueUri {
   2889         RadrootsSimplexSmpQueueUri::parse(
   2890             "smp://BwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwc@relay.example/cmVwbHk#/?v=4&dh=YmF6cXV4&q=m",
   2891         )
   2892         .unwrap()
   2893     }
   2894 
   2895     fn reply_descriptor() -> RadrootsSimplexAgentQueueDescriptor {
   2896         RadrootsSimplexAgentQueueDescriptor {
   2897             queue_uri: reply_queue(),
   2898             replaced_queue: None,
   2899             primary: true,
   2900             sender_key: None,
   2901         }
   2902     }
   2903 
   2904     fn hello_message(message_id: u64) -> RadrootsSimplexAgentDecryptedMessage {
   2905         RadrootsSimplexAgentDecryptedMessage::Message(RadrootsSimplexAgentMessageFrame {
   2906             header: RadrootsSimplexAgentMessageHeader {
   2907                 message_id,
   2908                 previous_message_hash: Vec::new(),
   2909             },
   2910             message: RadrootsSimplexAgentMessage::Hello,
   2911             padding: Vec::new(),
   2912         })
   2913     }
   2914 
   2915     fn user_message_frame(
   2916         message_id: u64,
   2917         previous_message_hash: Vec<u8>,
   2918         body: &[u8],
   2919     ) -> RadrootsSimplexAgentMessageFrame {
   2920         RadrootsSimplexAgentMessageFrame {
   2921             header: RadrootsSimplexAgentMessageHeader {
   2922                 message_id,
   2923                 previous_message_hash,
   2924             },
   2925             message: RadrootsSimplexAgentMessage::UserMessage(body.to_vec()),
   2926             padding: Vec::new(),
   2927         }
   2928     }
   2929 
   2930     fn agent_message_hash(frame: &RadrootsSimplexAgentMessageFrame) -> Vec<u8> {
   2931         let encoded = encode_decrypted_message(&RadrootsSimplexAgentDecryptedMessage::Message(
   2932             frame.clone(),
   2933         ))
   2934         .unwrap();
   2935         Sha256::digest(&encoded).to_vec()
   2936     }
   2937 
   2938     #[test]
   2939     fn received_body_decodes_official_message_flags() {
   2940         let mut body = 1_725_555_000_i64.to_be_bytes().to_vec();
   2941         body.extend_from_slice(b"T rr-synth-body");
   2942         let decoded = decode_received_body(&body).unwrap();
   2943         assert_eq!(decoded.timestamp, 1_725_555_000_u64);
   2944         assert!(decoded.flags.notification);
   2945         assert!(decoded.flags.reserved.is_empty());
   2946         assert_eq!(decoded.sent_body, b"rr-synth-body");
   2947 
   2948         body[8] = b'F';
   2949         let decoded = decode_received_body(&body).unwrap();
   2950         assert!(!decoded.flags.notification);
   2951     }
   2952 
   2953     fn receipt_message(
   2954         frame_message_id: u64,
   2955         message_id: u64,
   2956         message_hash: Vec<u8>,
   2957     ) -> RadrootsSimplexAgentDecryptedMessage {
   2958         RadrootsSimplexAgentDecryptedMessage::Message(RadrootsSimplexAgentMessageFrame {
   2959             header: RadrootsSimplexAgentMessageHeader {
   2960                 message_id: frame_message_id,
   2961                 previous_message_hash: Vec::new(),
   2962             },
   2963             message: RadrootsSimplexAgentMessage::Receipt(RadrootsSimplexAgentMessageReceipt {
   2964                 message_id,
   2965                 message_hash,
   2966                 receipt_info: Vec::new(),
   2967             }),
   2968             padding: Vec::new(),
   2969         })
   2970     }
   2971 
   2972     fn mark_connected(runtime: &mut RadrootsSimplexAgentRuntime, connection_id: &str) {
   2973         runtime
   2974             .store
   2975             .set_status(
   2976                 connection_id,
   2977                 RadrootsSimplexAgentConnectionStatus::Connected,
   2978             )
   2979             .unwrap();
   2980     }
   2981 
   2982     fn initialize_test_outbound_official_ratchet(
   2983         runtime: &mut RadrootsSimplexAgentRuntime,
   2984         connection_id: &str,
   2985     ) {
   2986         let local_key_1 = official_x448_keypair_from_seed(b"rr-synth-runtime-test-local-x3dh-1");
   2987         let local_key_2 = official_x448_keypair_from_seed(b"rr-synth-runtime-test-local-x3dh-2");
   2988         let remote_key_1 = official_x448_keypair_from_seed(b"rr-synth-runtime-test-remote-x3dh-1");
   2989         let remote_key_2 = official_x448_keypair_from_seed(b"rr-synth-runtime-test-remote-x3dh-2");
   2990         let remote_params = RadrootsSimplexOfficialX3dhParams {
   2991             version_range: RadrootsSimplexSmpVersionRange::new(
   2992                 RADROOTS_SIMPLEX_OFFICIAL_E2E_KDF_VERSION,
   2993                 RADROOTS_SIMPLEX_OFFICIAL_E2E_CURRENT_VERSION,
   2994             )
   2995             .unwrap(),
   2996             key_1: remote_key_1.public_key,
   2997             key_2: remote_key_2.public_key.clone(),
   2998             pq_public_key: None,
   2999             pq_ciphertext: None,
   3000         };
   3001         let sender_init =
   3002             official_x3dh_sender_init(&local_key_1, &local_key_2, &remote_params).unwrap();
   3003         let mut ratchet = RadrootsSimplexSmpRatchetState::responder(
   3004             local_key_2.public_key,
   3005             remote_key_2.public_key,
   3006             None,
   3007         )
   3008         .unwrap();
   3009         ratchet
   3010             .initialize_official_sender(local_key_2.private_key, sender_init)
   3011             .unwrap();
   3012         runtime
   3013             .store
   3014             .connection_mut(connection_id)
   3015             .unwrap()
   3016             .ratchet_state = Some(ratchet);
   3017     }
   3018 
   3019     fn ids_response(
   3020         recipient_id: &[u8],
   3021         sender_id: &[u8],
   3022         seed: &[u8],
   3023     ) -> RadrootsSimplexSmpBrokerMessage {
   3024         RadrootsSimplexSmpBrokerMessage::Ids(RadrootsSimplexSmpQueueIdsResponse {
   3025             recipient_id: recipient_id.to_vec(),
   3026             sender_id: sender_id.to_vec(),
   3027             server_dh_public_key: RadrootsSimplexSmpX25519Keypair::from_seed(seed).public_key,
   3028             queue_mode: Some(RadrootsSimplexSmpQueueMode::Messaging),
   3029             link_id: Some(synthetic_link_id(seed)),
   3030             service_id: None,
   3031             server_notification_credentials: None,
   3032         })
   3033     }
   3034 
   3035     fn synthetic_link_id(seed: &[u8]) -> Vec<u8> {
   3036         let mut hasher = Sha256::new();
   3037         hasher.update(b"rr-synth-runtime-link-id");
   3038         hasher.update(seed);
   3039         let digest = hasher.finalize();
   3040         digest[..24].to_vec()
   3041     }
   3042 
   3043     #[derive(Default)]
   3044     struct ScriptedTransport {
   3045         responses: VecDeque<RadrootsSimplexSmpBrokerMessage>,
   3046         subscription_responses: VecDeque<RadrootsSimplexSmpBrokerTransmission>,
   3047         requests: Vec<RadrootsSimplexSmpTransportRequest>,
   3048         subscription_requests: Vec<RadrootsSimplexSmpSubscriptionReceiveRequest>,
   3049     }
   3050 
   3051     impl ScriptedTransport {
   3052         fn with_responses(responses: Vec<RadrootsSimplexSmpBrokerMessage>) -> Self {
   3053             Self {
   3054                 responses: responses.into(),
   3055                 subscription_responses: VecDeque::new(),
   3056                 requests: Vec::new(),
   3057                 subscription_requests: Vec::new(),
   3058             }
   3059         }
   3060 
   3061         fn with_subscription_responses(
   3062             responses: Vec<RadrootsSimplexSmpBrokerTransmission>,
   3063         ) -> Self {
   3064             Self {
   3065                 responses: VecDeque::new(),
   3066                 subscription_responses: responses.into(),
   3067                 requests: Vec::new(),
   3068                 subscription_requests: Vec::new(),
   3069             }
   3070         }
   3071     }
   3072 
   3073     impl RadrootsSimplexSmpCommandTransport for ScriptedTransport {
   3074         type Error = String;
   3075 
   3076         fn execute(
   3077             &mut self,
   3078             request: RadrootsSimplexSmpTransportRequest,
   3079         ) -> Result<RadrootsSimplexSmpTransportResponse, Self::Error> {
   3080             let correlation_id = request
   3081                 .correlation_id
   3082                 .ok_or_else(|| "missing scripted transport correlation id".to_owned())?;
   3083             let scope = RadrootsSimplexSmpQueueAuthorizationScope::new(
   3084                 b"scripted-session".to_vec(),
   3085                 correlation_id,
   3086                 request.entity_id.clone(),
   3087             )
   3088             .map_err(|error| error.to_string())?;
   3089             let material = RadrootsSimplexSmpQueueAuthorizationMaterial::for_command(
   3090                 &scope,
   3091                 &request.command,
   3092                 request.transport_version,
   3093                 &request.authorization,
   3094             )
   3095             .map_err(|error| error.to_string())?;
   3096             let transmission =
   3097                 radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommandTransmission {
   3098                     authorization: material.authorization,
   3099                     correlation_id: Some(correlation_id),
   3100                     entity_id: request.entity_id.clone(),
   3101                     command: request.command.clone(),
   3102                 };
   3103             let block = RadrootsSimplexSmpTransportBlock::from_current_command_transmissions(&[
   3104                 transmission.clone(),
   3105             ])
   3106             .map_err(|error| error.to_string())?;
   3107             let encoded = block.encode().map_err(|error| error.to_string())?;
   3108             let decoded = RadrootsSimplexSmpTransportBlock::decode(&encoded)
   3109                 .map_err(|error| error.to_string())?;
   3110             let decoded_transmissions = decoded
   3111                 .decode_command_transmissions(request.transport_version)
   3112                 .map_err(|error| error.to_string())?;
   3113             assert_eq!(decoded_transmissions.len(), 1);
   3114             assert_eq!(decoded_transmissions[0], transmission);
   3115 
   3116             let response_message = self
   3117                 .responses
   3118                 .pop_front()
   3119                 .ok_or_else(|| "missing scripted transport response".to_owned())?;
   3120             let response_transmission = RadrootsSimplexSmpBrokerTransmission {
   3121                 authorization: Vec::new(),
   3122                 correlation_id: Some(correlation_id),
   3123                 entity_id: request.entity_id.clone(),
   3124                 message: response_message,
   3125             };
   3126             let response_block = RadrootsSimplexSmpTransportBlock::from_broker_transmissions(
   3127                 &[response_transmission.clone()],
   3128                 request.transport_version,
   3129             )
   3130             .map_err(|error| error.to_string())?;
   3131             let response_encoded = response_block.encode().map_err(|error| error.to_string())?;
   3132             self.requests.push(request.clone());
   3133             Ok(RadrootsSimplexSmpTransportResponse {
   3134                 server: request.server,
   3135                 transport_version: request.transport_version,
   3136                 transmission: response_transmission,
   3137                 transport_hash: Sha256::digest(&response_encoded).to_vec(),
   3138             })
   3139         }
   3140     }
   3141 
   3142     impl RadrootsSimplexSmpSubscriptionTransport for ScriptedTransport {
   3143         fn receive_subscription(
   3144             &mut self,
   3145             request: RadrootsSimplexSmpSubscriptionReceiveRequest,
   3146         ) -> Result<Option<RadrootsSimplexSmpTransportResponse>, Self::Error> {
   3147             self.subscription_requests.push(request.clone());
   3148             let Some(response_transmission) = self.subscription_responses.pop_front() else {
   3149                 return Ok(None);
   3150             };
   3151             let response_block = RadrootsSimplexSmpTransportBlock::from_broker_transmissions(
   3152                 &[response_transmission.clone()],
   3153                 RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
   3154             )
   3155             .map_err(|error| error.to_string())?;
   3156             let response_encoded = response_block.encode().map_err(|error| error.to_string())?;
   3157             Ok(Some(RadrootsSimplexSmpTransportResponse {
   3158                 server: request.server,
   3159                 transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
   3160                 transmission: response_transmission,
   3161                 transport_hash: Sha256::digest(&response_encoded).to_vec(),
   3162             }))
   3163         }
   3164     }
   3165 
   3166     #[test]
   3167     fn create_and_join_commands_execute_through_transport() {
   3168         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3169         let created = runtime
   3170             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3171             .unwrap();
   3172         let invitation = runtime
   3173             .store
   3174             .connection(&created)
   3175             .unwrap()
   3176             .invitation
   3177             .clone()
   3178             .unwrap();
   3179         let joined = runtime
   3180             .join_connection(invitation, reply_queue(), 20)
   3181             .unwrap();
   3182 
   3183         let mut transport = ScriptedTransport::with_responses(vec![
   3184             ids_response(b"recipient", b"sender", b"server-dh"),
   3185             RadrootsSimplexSmpBrokerMessage::Ok,
   3186             ids_response(b"recipient-2", b"sender-2", b"server-dh-2"),
   3187             RadrootsSimplexSmpBrokerMessage::Ok,
   3188             RadrootsSimplexSmpBrokerMessage::Ok,
   3189             RadrootsSimplexSmpBrokerMessage::Ok,
   3190         ]);
   3191         runtime
   3192             .execute_ready_commands(&mut transport, 30, 16)
   3193             .unwrap();
   3194 
   3195         let created_queue = runtime.store.receive_queues(&created).unwrap();
   3196         assert!(created_queue[0].subscribed);
   3197         assert_eq!(transport.requests.len(), 6);
   3198         let RadrootsSimplexSmpCommand::New(create_request) = &transport.requests[0].command else {
   3199             panic!("first request should create the invitation queue");
   3200         };
   3201         let Some(RadrootsSimplexSmpQueueRequestData::Messaging(Some(link_request))) =
   3202             create_request.queue_request_data.as_ref()
   3203         else {
   3204             panic!("invitation NEW should carry short-link messaging data");
   3205         };
   3206         let create_correlation = transport.requests[0]
   3207             .correlation_id
   3208             .as_ref()
   3209             .expect("create command should carry a correlation ID");
   3210         assert_eq!(
   3211             link_request.sender_id,
   3212             short_link_sender_id(create_correlation)
   3213         );
   3214         assert!(!link_request.link_data.fixed_data.is_empty());
   3215         assert!(!link_request.link_data.user_data.is_empty());
   3216         assert!(matches!(
   3217             transport.requests[3].command,
   3218             RadrootsSimplexSmpCommand::Sub
   3219         ));
   3220         assert_eq!(transport.requests[3].entity_id, b"recipient".to_vec());
   3221         assert!(matches!(
   3222             transport.requests[4].command,
   3223             RadrootsSimplexSmpCommand::Sub
   3224         ));
   3225         assert_eq!(transport.requests[4].entity_id, b"recipient-2".to_vec());
   3226         assert!(
   3227             !transport
   3228                 .requests
   3229                 .iter()
   3230                 .any(|request| matches!(request.command, RadrootsSimplexSmpCommand::Get))
   3231         );
   3232         let events = runtime.drain_events(16);
   3233         let Some(RadrootsSimplexAgentRuntimeEvent::InvitationReady { invitation, .. }) =
   3234             events.first()
   3235         else {
   3236             panic!("runtime should emit a short invitation event");
   3237         };
   3238         let rendered = invitation.render().unwrap();
   3239         assert!(rendered.starts_with("simplex:/i#"));
   3240         assert_eq!(
   3241             radroots_simplex_agent_proto::prelude::parse_short_invitation_link(&rendered).unwrap(),
   3242             invitation.clone()
   3243         );
   3244         let short_link = runtime
   3245             .store
   3246             .connection(&created)
   3247             .unwrap()
   3248             .short_link
   3249             .as_ref()
   3250             .unwrap();
   3251         assert_eq!(short_link.link_id, synthetic_link_id(b"server-dh"));
   3252         let link_data_key = derive_invitation_short_link_data_key(&short_link.link_key).unwrap();
   3253         let stored_link_data = RadrootsSimplexSmpQueueLinkData {
   3254             fixed_data: short_link.encrypted_fixed_data.clone().unwrap(),
   3255             user_data: short_link.encrypted_user_data.clone().unwrap(),
   3256         };
   3257         let verified = radroots_simplex_smp_crypto::prelude::decrypt_verify_short_link_data(
   3258             &short_link.link_key,
   3259             &link_data_key,
   3260             &short_link.link_public_signature_key,
   3261             &stored_link_data,
   3262         )
   3263         .unwrap();
   3264         let decoded = radroots_simplex_agent_proto::prelude::decode_short_invitation_fixed_data(
   3265             &verified.fixed_data,
   3266         )
   3267         .unwrap();
   3268         assert_eq!(
   3269             decoded.root_public_signature_key,
   3270             short_link.link_public_signature_key
   3271         );
   3272         assert!(decoded.invitation.connection_id.is_empty());
   3273         let decoded_user_data =
   3274             radroots_simplex_agent_proto::prelude::decode_short_invitation_user_data(
   3275                 &verified.user_data,
   3276             )
   3277             .unwrap();
   3278         assert_eq!(decoded_user_data.user_data, created.as_bytes().to_vec());
   3279         let decrypted_invitation =
   3280             decrypt_short_invitation_link_data(invitation, &stored_link_data).unwrap();
   3281         assert_eq!(
   3282             decrypted_invitation.connection_id,
   3283             created.as_bytes().to_vec()
   3284         );
   3285         assert_eq!(
   3286             runtime.store.connection(&joined).unwrap().status,
   3287             RadrootsSimplexAgentConnectionStatus::JoinPending
   3288         );
   3289     }
   3290 
   3291     #[test]
   3292     fn join_short_invitation_retrieves_link_data_and_continues_join() {
   3293         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3294         let created = runtime
   3295             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3296             .unwrap();
   3297         let mut setup_transport = ScriptedTransport::with_responses(vec![
   3298             ids_response(b"recipient", b"sender", b"server-dh"),
   3299             RadrootsSimplexSmpBrokerMessage::Ok,
   3300         ]);
   3301         runtime
   3302             .execute_ready_commands(&mut setup_transport, 30, 16)
   3303             .unwrap();
   3304         let events = runtime.drain_events(16);
   3305         let Some(RadrootsSimplexAgentRuntimeEvent::InvitationReady {
   3306             invitation: short_invitation,
   3307             ..
   3308         }) = events.first()
   3309         else {
   3310             panic!("runtime should emit a short invitation event");
   3311         };
   3312         let short_link = runtime
   3313             .store
   3314             .connection(&created)
   3315             .unwrap()
   3316             .short_link
   3317             .as_ref()
   3318             .unwrap();
   3319         let stored_link_data = RadrootsSimplexSmpQueueLinkData {
   3320             fixed_data: short_link.encrypted_fixed_data.clone().unwrap(),
   3321             user_data: short_link.encrypted_user_data.clone().unwrap(),
   3322         };
   3323         let joined = runtime
   3324             .join_short_invitation(short_invitation.clone(), reply_queue(), 40)
   3325             .unwrap();
   3326         let mut join_transport = ScriptedTransport::with_responses(vec![
   3327             RadrootsSimplexSmpBrokerMessage::Lnk {
   3328                 sender_id: b"sender".to_vec(),
   3329                 link_data: stored_link_data,
   3330             },
   3331             ids_response(b"recipient-2", b"sender-2", b"server-dh-2"),
   3332             RadrootsSimplexSmpBrokerMessage::Ok,
   3333             RadrootsSimplexSmpBrokerMessage::Ok,
   3334         ]);
   3335         runtime
   3336             .execute_ready_commands(&mut join_transport, 50, 16)
   3337             .unwrap();
   3338 
   3339         assert_eq!(join_transport.requests.len(), 4);
   3340         let RadrootsSimplexSmpCommand::LKey(sender_auth_key) = &join_transport.requests[0].command
   3341         else {
   3342             panic!("short invitation join should authorize link retrieval first");
   3343         };
   3344         let RadrootsSimplexSmpCommandAuthorization::Ed25519(lkey_auth) =
   3345             &join_transport.requests[0].authorization
   3346         else {
   3347             panic!("short invitation link retrieval should be signed");
   3348         };
   3349         assert_eq!(
   3350             sender_auth_key,
   3351             &encode_ed25519_public_key_x509(&lkey_auth.public_key).unwrap()
   3352         );
   3353         assert_eq!(
   3354             join_transport.requests[0].entity_id,
   3355             short_invitation.link_id.clone()
   3356         );
   3357         let RadrootsSimplexSmpCommand::New(_) = &join_transport.requests[1].command else {
   3358             panic!("short invitation join should create the reply queue");
   3359         };
   3360         assert!(matches!(
   3361             join_transport.requests[2].command,
   3362             RadrootsSimplexSmpCommand::Sub
   3363         ));
   3364         assert!(matches!(
   3365             join_transport.requests[3].command,
   3366             RadrootsSimplexSmpCommand::Send(_)
   3367         ));
   3368         let joined_connection = runtime.store.connection(&joined).unwrap();
   3369         assert_eq!(
   3370             joined_connection.status,
   3371             RadrootsSimplexAgentConnectionStatus::JoinPending
   3372         );
   3373         assert_eq!(
   3374             joined_connection.invitation.as_ref().unwrap().connection_id,
   3375             created.as_bytes().to_vec()
   3376         );
   3377         assert_eq!(
   3378             runtime
   3379                 .store
   3380                 .primary_send_queue(&joined)
   3381                 .unwrap()
   3382                 .descriptor
   3383                 .queue_uri
   3384                 .sender_id,
   3385             URL_SAFE_NO_PAD.encode(b"sender")
   3386         );
   3387         let send_auth = runtime
   3388             .store
   3389             .primary_send_queue(&joined)
   3390             .unwrap()
   3391             .auth_state
   3392             .unwrap();
   3393         assert_eq!(send_auth.public_key, lkey_auth.public_key);
   3394         let RadrootsSimplexSmpCommandAuthorization::Ed25519(send_auth_request) =
   3395             &join_transport.requests[3].authorization
   3396         else {
   3397             panic!("short invitation join confirmation should be signed");
   3398         };
   3399         assert_eq!(send_auth_request.public_key, send_auth.public_key);
   3400         assert!(runtime.drain_events(16).iter().any(|event| matches!(
   3401             event,
   3402             RadrootsSimplexAgentRuntimeEvent::ConfirmationRequired { connection_id }
   3403                 if connection_id == &joined
   3404         )));
   3405     }
   3406 
   3407     #[test]
   3408     fn join_confirmation_carries_sender_x3dh_params() {
   3409         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3410         let created = runtime
   3411             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3412             .unwrap();
   3413         let invitation = runtime
   3414             .store
   3415             .connection(&created)
   3416             .unwrap()
   3417             .invitation
   3418             .clone()
   3419             .unwrap();
   3420         let joined = runtime
   3421             .join_connection(invitation, reply_queue(), 20)
   3422             .unwrap();
   3423 
   3424         let mut transport = ScriptedTransport::with_responses(vec![
   3425             ids_response(b"recipient", b"sender", b"server-dh"),
   3426             RadrootsSimplexSmpBrokerMessage::Ok,
   3427             ids_response(b"recipient-2", b"sender-2", b"server-dh-2"),
   3428         ]);
   3429         runtime
   3430             .execute_ready_commands(&mut transport, 30, 3)
   3431             .unwrap();
   3432         let local_key_1 = runtime
   3433             .store
   3434             .connection(&joined)
   3435             .unwrap()
   3436             .local_x3dh_key_1
   3437             .clone()
   3438             .unwrap();
   3439         let local_key_2 = runtime
   3440             .store
   3441             .connection(&joined)
   3442             .unwrap()
   3443             .local_x3dh_key_2
   3444             .clone()
   3445             .unwrap();
   3446         let local_pq_keypair = runtime
   3447             .store
   3448             .connection(&joined)
   3449             .unwrap()
   3450             .local_pq_keypair
   3451             .clone()
   3452             .unwrap();
   3453         let ready = runtime.retry_pending(30, 16);
   3454         let confirmation_params = ready
   3455             .into_iter()
   3456             .find_map(|command| match command.kind {
   3457                 RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
   3458                     envelope:
   3459                         RadrootsSimplexAgentEnvelope::Confirmation {
   3460                             reply_queue: true,
   3461                             e2e_ratchet_params: Some(params),
   3462                             ..
   3463                         },
   3464                     ..
   3465                 } => Some(params),
   3466                 _ => None,
   3467             })
   3468             .unwrap();
   3469 
   3470         assert_eq!(confirmation_params.key_1, local_key_1.public_key);
   3471         assert_eq!(confirmation_params.key_2, local_key_2.public_key);
   3472         assert_eq!(
   3473             confirmation_params.pq_public_key,
   3474             Some(local_pq_keypair.public_key)
   3475         );
   3476         assert!(confirmation_params.pq_ciphertext.is_some());
   3477     }
   3478 
   3479     #[test]
   3480     fn confirmation_params_initialize_receiver_ratchet() {
   3481         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3482         let created = runtime
   3483             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3484             .unwrap();
   3485         let invitation = runtime
   3486             .store
   3487             .connection(&created)
   3488             .unwrap()
   3489             .invitation
   3490             .clone()
   3491             .unwrap();
   3492         let joined = runtime
   3493             .join_connection(invitation, reply_queue(), 20)
   3494             .unwrap();
   3495         let joined_connection = runtime.store.connection(&joined).unwrap();
   3496         let joined_key_1 = joined_connection.local_x3dh_key_1.as_ref().unwrap();
   3497         let joined_key_2 = joined_connection.local_x3dh_key_2.as_ref().unwrap();
   3498         let joined_ratchet = joined_connection.ratchet_state.as_ref().unwrap();
   3499         let e2e_ratchet_params = official_x3dh_params_from_parts(
   3500             &joined_key_1.public_key,
   3501             &joined_key_2.public_key,
   3502             joined_ratchet.current_pq_public_key.clone(),
   3503             joined_ratchet.pending_outbound_pq_ciphertext.clone(),
   3504         )
   3505         .unwrap();
   3506         let envelope = RadrootsSimplexAgentEnvelope::Confirmation {
   3507             reply_queue: true,
   3508             e2e_ratchet_params: Some(e2e_ratchet_params),
   3509             encrypted: RadrootsSimplexAgentEncryptedPayload {
   3510                 ratchet_header: None,
   3511                 official_message: Some(Vec::new()),
   3512                 ciphertext: Vec::new(),
   3513             },
   3514         };
   3515 
   3516         runtime
   3517             .initialize_receiver_ratchet_from_confirmation(&created, &envelope)
   3518             .unwrap();
   3519         let mut sender_ratchet = runtime
   3520             .store
   3521             .connection(&joined)
   3522             .unwrap()
   3523             .ratchet_state
   3524             .clone()
   3525             .unwrap();
   3526         let encrypted = sender_ratchet
   3527             .encrypt_official_payload(&[0_u8; 32], b"reply-info", 96)
   3528             .unwrap();
   3529         let receiver_ratchet = runtime
   3530             .store
   3531             .connection_mut(&created)
   3532             .unwrap()
   3533             .ratchet_state
   3534             .as_mut()
   3535             .unwrap();
   3536         let decrypted = receiver_ratchet
   3537             .decrypt_official_payload(&[0_u8; 32], &encrypted)
   3538             .unwrap();
   3539 
   3540         assert_eq!(decrypted, b"reply-info");
   3541         assert!(receiver_ratchet.official_sending_chain_key.is_some());
   3542         assert!(receiver_ratchet.official_receiving_chain_key.is_some());
   3543     }
   3544 
   3545     #[test]
   3546     fn explicit_get_connection_message_executes_smp_get() {
   3547         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3548         let created = runtime
   3549             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3550             .unwrap();
   3551 
   3552         let mut setup_transport = ScriptedTransport::with_responses(vec![
   3553             ids_response(b"recipient", b"sender", b"server-dh"),
   3554             RadrootsSimplexSmpBrokerMessage::Ok,
   3555         ]);
   3556         runtime
   3557             .execute_ready_commands(&mut setup_transport, 30, 16)
   3558             .unwrap();
   3559         assert!(matches!(
   3560             setup_transport.requests[1].command,
   3561             RadrootsSimplexSmpCommand::Sub
   3562         ));
   3563         assert_eq!(setup_transport.requests[1].entity_id, b"recipient".to_vec());
   3564         assert!(runtime.store.receive_queues(&created).unwrap()[0].subscribed);
   3565 
   3566         runtime.get_connection_message(&created, 40).unwrap();
   3567         let mut get_transport =
   3568             ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]);
   3569         runtime
   3570             .execute_ready_commands(&mut get_transport, 50, 16)
   3571             .unwrap();
   3572 
   3573         assert_eq!(get_transport.requests.len(), 1);
   3574         assert!(matches!(
   3575             get_transport.requests[0].command,
   3576             RadrootsSimplexSmpCommand::Get
   3577         ));
   3578         assert_eq!(get_transport.requests[0].entity_id, b"recipient".to_vec());
   3579         assert!(runtime.store.receive_queues(&created).unwrap()[0].subscribed);
   3580     }
   3581 
   3582     #[test]
   3583     fn get_no_msg_response_is_empty_queue_success() {
   3584         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3585         let created = runtime
   3586             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3587             .unwrap();
   3588 
   3589         let mut setup_transport = ScriptedTransport::with_responses(vec![
   3590             ids_response(b"recipient", b"sender", b"server-dh"),
   3591             RadrootsSimplexSmpBrokerMessage::Ok,
   3592         ]);
   3593         runtime
   3594             .execute_ready_commands(&mut setup_transport, 30, 16)
   3595             .unwrap();
   3596         let _ = runtime.drain_events(16);
   3597 
   3598         runtime.get_connection_message(&created, 40).unwrap();
   3599         let mut get_transport =
   3600             ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Err(
   3601                 RadrootsSimplexSmpError::NoMsg,
   3602             )]);
   3603         runtime
   3604             .execute_ready_commands(&mut get_transport, 50, 16)
   3605             .unwrap();
   3606 
   3607         assert_eq!(get_transport.requests.len(), 1);
   3608         assert!(matches!(
   3609             get_transport.requests[0].command,
   3610             RadrootsSimplexSmpCommand::Get
   3611         ));
   3612         assert!(runtime.retry_pending(50, 16).is_empty());
   3613         assert!(!runtime.drain_events(16).iter().any(|event| matches!(
   3614             event,
   3615             RadrootsSimplexAgentRuntimeEvent::Error { message, .. } if message.contains("NoMsg")
   3616         )));
   3617     }
   3618 
   3619     #[test]
   3620     fn subscription_receive_routes_broker_transmission_by_entity_id() {
   3621         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3622         let created = runtime
   3623             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3624             .unwrap();
   3625 
   3626         let mut setup_transport = ScriptedTransport::with_responses(vec![
   3627             ids_response(b"recipient", b"sender", b"server-dh"),
   3628             RadrootsSimplexSmpBrokerMessage::Ok,
   3629         ]);
   3630         runtime
   3631             .execute_ready_commands(&mut setup_transport, 30, 16)
   3632             .unwrap();
   3633         let receive_queue = runtime.store.receive_queues(&created).unwrap()[0].clone();
   3634         let _ = runtime.drain_events(16);
   3635 
   3636         let mut subscription_transport = ScriptedTransport::with_subscription_responses(vec![
   3637             RadrootsSimplexSmpBrokerTransmission {
   3638                 authorization: Vec::new(),
   3639                 correlation_id: None,
   3640                 entity_id: receive_queue.entity_id,
   3641                 message: RadrootsSimplexSmpBrokerMessage::Err(RadrootsSimplexSmpError::NoMsg),
   3642             },
   3643         ]);
   3644         runtime
   3645             .receive_subscription_messages(&mut subscription_transport, 4)
   3646             .unwrap();
   3647 
   3648         assert_eq!(subscription_transport.subscription_requests.len(), 2);
   3649         assert_eq!(
   3650             subscription_transport.subscription_requests[0].server,
   3651             receive_queue.descriptor.queue_uri.server
   3652         );
   3653         assert!(runtime.drain_events(16).is_empty());
   3654     }
   3655 
   3656     #[test]
   3657     fn subscribe_no_msg_response_marks_queue_subscribed() {
   3658         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3659         let created = runtime
   3660             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3661             .unwrap();
   3662 
   3663         let mut setup_transport = ScriptedTransport::with_responses(vec![
   3664             ids_response(b"recipient", b"sender", b"server-dh"),
   3665             RadrootsSimplexSmpBrokerMessage::Err(RadrootsSimplexSmpError::NoMsg),
   3666         ]);
   3667         runtime
   3668             .execute_ready_commands(&mut setup_transport, 30, 16)
   3669             .unwrap();
   3670 
   3671         assert_eq!(setup_transport.requests.len(), 2);
   3672         assert!(matches!(
   3673             setup_transport.requests[1].command,
   3674             RadrootsSimplexSmpCommand::Sub
   3675         ));
   3676         assert!(runtime.store.receive_queues(&created).unwrap()[0].subscribed);
   3677         assert!(!runtime.drain_events(16).iter().any(|event| matches!(
   3678             event,
   3679             RadrootsSimplexAgentRuntimeEvent::Error { message, .. } if message.contains("NoMsg")
   3680         )));
   3681     }
   3682 
   3683     #[test]
   3684     fn inbound_progress_accepts_exact_duplicate_for_latest_ack_target() {
   3685         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3686         let connection_id = runtime
   3687             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3688             .unwrap();
   3689         mark_connected(&mut runtime, &connection_id);
   3690         let first_queue = reply_descriptor().queue_address();
   3691         let second_queue = RadrootsSimplexAgentQueueAddress {
   3692             server: first_queue.server.clone(),
   3693             sender_id: b"second-duplicate-broker".to_vec(),
   3694         };
   3695         let frame = user_message_frame(1, Vec::new(), b"first");
   3696         let frame_hash = agent_message_hash(&frame);
   3697 
   3698         runtime
   3699             .validate_inbound_frame_progress(&connection_id, &frame, &frame_hash)
   3700             .unwrap();
   3701         runtime
   3702             .store
   3703             .record_inbound_message(
   3704                 &connection_id,
   3705                 first_queue,
   3706                 b"first-broker-message".to_vec(),
   3707                 frame.header.message_id,
   3708                 frame_hash.clone(),
   3709             )
   3710             .unwrap();
   3711         runtime
   3712             .validate_inbound_frame_progress(&connection_id, &frame, &frame_hash)
   3713             .unwrap();
   3714         runtime
   3715             .store
   3716             .record_inbound_message(
   3717                 &connection_id,
   3718                 second_queue.clone(),
   3719                 b"second-broker-message".to_vec(),
   3720                 frame.header.message_id,
   3721                 frame_hash,
   3722             )
   3723             .unwrap();
   3724 
   3725         assert_eq!(
   3726             runtime
   3727                 .store
   3728                 .inbound_ack_target(&connection_id, 1, &agent_message_hash(&frame))
   3729                 .unwrap(),
   3730             Some((second_queue, b"second-broker-message".to_vec()))
   3731         );
   3732     }
   3733 
   3734     #[test]
   3735     fn inbound_progress_rejects_gap_and_previous_hash_mismatch() {
   3736         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3737         let connection_id = runtime
   3738             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3739             .unwrap();
   3740         mark_connected(&mut runtime, &connection_id);
   3741         let queue = reply_descriptor().queue_address();
   3742         let first_frame = user_message_frame(1, Vec::new(), b"first");
   3743         let first_hash = agent_message_hash(&first_frame);
   3744         runtime
   3745             .store
   3746             .record_inbound_message(
   3747                 &connection_id,
   3748                 queue,
   3749                 b"first-broker-message".to_vec(),
   3750                 first_frame.header.message_id,
   3751                 first_hash.clone(),
   3752             )
   3753             .unwrap();
   3754 
   3755         let gap_frame = user_message_frame(3, first_hash.clone(), b"gap");
   3756         let gap_error = runtime
   3757             .validate_inbound_frame_progress(
   3758                 &connection_id,
   3759                 &gap_frame,
   3760                 &agent_message_hash(&gap_frame),
   3761             )
   3762             .unwrap_err();
   3763         assert!(gap_error.to_string().contains("skipped expected `2`"));
   3764 
   3765         let mismatch_frame = user_message_frame(2, b"wrong-previous-hash".to_vec(), b"second");
   3766         let mismatch_error = runtime
   3767             .validate_inbound_frame_progress(
   3768                 &connection_id,
   3769                 &mismatch_frame,
   3770                 &agent_message_hash(&mismatch_frame),
   3771             )
   3772             .unwrap_err();
   3773         assert!(
   3774             mismatch_error
   3775                 .to_string()
   3776                 .contains("unexpected previous-message hash")
   3777         );
   3778     }
   3779 
   3780     #[test]
   3781     fn inbound_progress_allows_first_visible_user_message_after_missing_peer_hello() {
   3782         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3783         let connection_id = runtime
   3784             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3785             .unwrap();
   3786         mark_connected(&mut runtime, &connection_id);
   3787         runtime
   3788             .store
   3789             .connection_mut(&connection_id)
   3790             .unwrap()
   3791             .hello_received = false;
   3792 
   3793         let frame = user_message_frame(2, b"peer-hello-hash".to_vec(), b"first visible");
   3794         let frame_hash = agent_message_hash(&frame);
   3795 
   3796         runtime
   3797             .validate_inbound_frame_progress(&connection_id, &frame, &frame_hash)
   3798             .unwrap();
   3799     }
   3800 
   3801     #[test]
   3802     fn inbound_progress_rejects_regression_after_accepted_next_message() {
   3803         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3804         let connection_id = runtime
   3805             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3806             .unwrap();
   3807         mark_connected(&mut runtime, &connection_id);
   3808         let queue = reply_descriptor().queue_address();
   3809         let first_frame = user_message_frame(1, Vec::new(), b"first");
   3810         let first_hash = agent_message_hash(&first_frame);
   3811         let second_frame = user_message_frame(2, first_hash.clone(), b"second");
   3812         let second_hash = agent_message_hash(&second_frame);
   3813         runtime
   3814             .store
   3815             .record_inbound_message(
   3816                 &connection_id,
   3817                 queue.clone(),
   3818                 b"first-broker-message".to_vec(),
   3819                 first_frame.header.message_id,
   3820                 first_hash,
   3821             )
   3822             .unwrap();
   3823         runtime
   3824             .validate_inbound_frame_progress(&connection_id, &second_frame, &second_hash)
   3825             .unwrap();
   3826         runtime
   3827             .store
   3828             .record_inbound_message(
   3829                 &connection_id,
   3830                 queue,
   3831                 b"second-broker-message".to_vec(),
   3832                 second_frame.header.message_id,
   3833                 second_hash,
   3834             )
   3835             .unwrap();
   3836 
   3837         let regression_frame = user_message_frame(1, Vec::new(), b"first");
   3838         let regression_error = runtime
   3839             .validate_inbound_frame_progress(
   3840                 &connection_id,
   3841                 &regression_frame,
   3842                 &agent_message_hash(&regression_frame),
   3843             )
   3844             .unwrap_err();
   3845         assert!(regression_error.to_string().contains("regressed below `2`"));
   3846     }
   3847 
   3848     #[test]
   3849     fn send_message_requires_connected_state() {
   3850         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3851         let created = runtime
   3852             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3853             .unwrap();
   3854         let invitation = runtime
   3855             .store
   3856             .connection(&created)
   3857             .unwrap()
   3858             .invitation
   3859             .clone()
   3860             .unwrap();
   3861         let joined = runtime
   3862             .join_connection(invitation, reply_queue(), 20)
   3863             .unwrap();
   3864 
   3865         let error = runtime
   3866             .send_message(&joined, b"blocked before connected".to_vec(), 30)
   3867             .unwrap_err();
   3868         assert!(error.to_string().contains("is not connected"));
   3869     }
   3870 
   3871     #[test]
   3872     fn allow_and_hello_lifecycle_reaches_connected() {
   3873         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3874         let created = runtime
   3875             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3876             .unwrap();
   3877         let mut setup_transport = ScriptedTransport::with_responses(vec![
   3878             ids_response(b"recipient", b"sender", b"server-dh"),
   3879             RadrootsSimplexSmpBrokerMessage::Ok,
   3880         ]);
   3881         runtime
   3882             .execute_ready_commands(&mut setup_transport, 30, 16)
   3883             .unwrap();
   3884         runtime
   3885             .store
   3886             .connection_mut(&created)
   3887             .unwrap()
   3888             .shared_secret = Some(vec![3_u8; 32]);
   3889 
   3890         runtime
   3891             .handle_inbound_decrypted_message(
   3892                 &created,
   3893                 RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply {
   3894                     reply_queues: vec![reply_descriptor()],
   3895                     info: b"peer-info".to_vec(),
   3896                 },
   3897                 b"reply-confirmation".to_vec(),
   3898             )
   3899             .unwrap();
   3900         assert_eq!(
   3901             runtime.store.connection(&created).unwrap().status,
   3902             RadrootsSimplexAgentConnectionStatus::AwaitingApproval
   3903         );
   3904         runtime
   3905             .handle_inbound_decrypted_message(
   3906                 &created,
   3907                 RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply {
   3908                     reply_queues: vec![reply_descriptor()],
   3909                     info: b"peer-info".to_vec(),
   3910                 },
   3911                 b"reply-confirmation-duplicate".to_vec(),
   3912             )
   3913             .unwrap();
   3914         initialize_test_outbound_official_ratchet(&mut runtime, &created);
   3915 
   3916         runtime
   3917             .allow_connection(&created, b"local-info".to_vec(), 40)
   3918             .unwrap();
   3919         let mut allow_transport = ScriptedTransport::with_responses(vec![
   3920             RadrootsSimplexSmpBrokerMessage::Ok,
   3921             RadrootsSimplexSmpBrokerMessage::Ok,
   3922             RadrootsSimplexSmpBrokerMessage::Ok,
   3923         ]);
   3924         runtime
   3925             .execute_ready_commands(&mut allow_transport, 50, 16)
   3926             .unwrap();
   3927         assert_eq!(allow_transport.requests.len(), 3);
   3928         assert!(matches!(
   3929             allow_transport.requests[0].command,
   3930             RadrootsSimplexSmpCommand::SKey(_)
   3931         ));
   3932         assert!(matches!(
   3933             allow_transport.requests[1].command,
   3934             RadrootsSimplexSmpCommand::Send(_)
   3935         ));
   3936         assert!(matches!(
   3937             allow_transport.requests[2].command,
   3938             RadrootsSimplexSmpCommand::Send(_)
   3939         ));
   3940         let connection = runtime.store.connection(&created).unwrap();
   3941         assert_eq!(
   3942             connection.status,
   3943             RadrootsSimplexAgentConnectionStatus::Connected
   3944         );
   3945         assert!(connection.hello_sent);
   3946         assert!(!connection.hello_received);
   3947 
   3948         runtime
   3949             .handle_inbound_decrypted_message(&created, hello_message(1), b"hello-in".to_vec())
   3950             .unwrap();
   3951         let connection = runtime.store.connection(&created).unwrap();
   3952         assert_eq!(
   3953             connection.status,
   3954             RadrootsSimplexAgentConnectionStatus::Connected
   3955         );
   3956         assert!(connection.hello_sent);
   3957         assert!(connection.hello_received);
   3958         assert!(runtime.drain_events(16).into_iter().any(|event| matches!(
   3959             event,
   3960             RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished { connection_id }
   3961                 if connection_id == created
   3962         )));
   3963         let mut hello_transport = ScriptedTransport::with_responses(vec![]);
   3964         runtime
   3965             .execute_ready_commands(&mut hello_transport, 60, 16)
   3966             .unwrap();
   3967         assert!(hello_transport.requests.is_empty());
   3968     }
   3969 
   3970     #[test]
   3971     fn delivered_send_confirms_cursor_only_after_transport_success() {
   3972         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   3973         let created = runtime
   3974             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   3975             .unwrap();
   3976         let invitation = runtime
   3977             .store
   3978             .connection(&created)
   3979             .unwrap()
   3980             .invitation
   3981             .clone()
   3982             .unwrap();
   3983         let joined = runtime
   3984             .join_connection(invitation, reply_queue(), 20)
   3985             .unwrap();
   3986 
   3987         let mut setup_transport = ScriptedTransport::with_responses(vec![
   3988             ids_response(b"recipient", b"sender", b"server-dh"),
   3989             RadrootsSimplexSmpBrokerMessage::Ok,
   3990             ids_response(b"recipient-2", b"sender-2", b"server-dh-2"),
   3991             RadrootsSimplexSmpBrokerMessage::Ok,
   3992             RadrootsSimplexSmpBrokerMessage::Ok,
   3993             RadrootsSimplexSmpBrokerMessage::Ok,
   3994         ]);
   3995         runtime
   3996             .execute_ready_commands(&mut setup_transport, 30, 16)
   3997             .unwrap();
   3998         mark_connected(&mut runtime, &joined);
   3999 
   4000         let message_id = runtime
   4001             .send_message(&joined, b"hello simplex".to_vec(), 40)
   4002             .unwrap();
   4003         assert_eq!(message_id, 1);
   4004         assert_eq!(
   4005             runtime
   4006                 .store
   4007                 .connection(&joined)
   4008                 .unwrap()
   4009                 .delivery_cursor
   4010                 .last_sent_message_id,
   4011             None
   4012         );
   4013 
   4014         let mut delivery_transport =
   4015             ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]);
   4016         runtime
   4017             .execute_ready_commands(&mut delivery_transport, 50, 16)
   4018             .unwrap();
   4019 
   4020         let cursor = &runtime.store.connection(&joined).unwrap().delivery_cursor;
   4021         assert_eq!(cursor.last_sent_message_id, Some(1));
   4022         assert!(cursor.last_sent_message_hash.is_some());
   4023         assert_eq!(
   4024             runtime
   4025                 .store
   4026                 .connection(&joined)
   4027                 .unwrap()
   4028                 .staged_outbound_message,
   4029             None
   4030         );
   4031         assert!(runtime.drain_events(64).into_iter().any(|event| matches!(
   4032             event,
   4033             RadrootsSimplexAgentRuntimeEvent::OutboundMessageDelivered {
   4034                 connection_id,
   4035                 message_id: 1,
   4036                 message_hash,
   4037             } if connection_id == joined && !message_hash.is_empty()
   4038         )));
   4039     }
   4040 
   4041     #[test]
   4042     fn peer_receipt_requires_stored_outbound_message_hash() {
   4043         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   4044         let connection_id = runtime
   4045             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   4046             .unwrap();
   4047         let prepared = runtime
   4048             .store
   4049             .prepare_outbound_message(&connection_id, b"outbound-message-hash".to_vec())
   4050             .unwrap();
   4051         runtime
   4052             .store
   4053             .confirm_outbound_message(&connection_id, prepared.message_id)
   4054             .unwrap();
   4055 
   4056         runtime
   4057             .handle_inbound_decrypted_message(
   4058                 &connection_id,
   4059                 receipt_message(1, prepared.message_id, b"outbound-message-hash".to_vec()),
   4060                 b"receipt-frame".to_vec(),
   4061             )
   4062             .unwrap();
   4063 
   4064         assert!(runtime.drain_events(16).into_iter().any(|event| matches!(
   4065             event,
   4066             RadrootsSimplexAgentRuntimeEvent::MessageAcknowledged {
   4067                 connection_id: event_connection_id,
   4068                 message_id,
   4069                 message_hash,
   4070             } if event_connection_id == connection_id
   4071                 && message_id == prepared.message_id
   4072                 && message_hash == b"outbound-message-hash".to_vec()
   4073         )));
   4074     }
   4075 
   4076     #[test]
   4077     fn peer_receipt_rejects_hash_mismatch() {
   4078         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   4079         let connection_id = runtime
   4080             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   4081             .unwrap();
   4082         let prepared = runtime
   4083             .store
   4084             .prepare_outbound_message(&connection_id, b"outbound-message-hash".to_vec())
   4085             .unwrap();
   4086         runtime
   4087             .store
   4088             .confirm_outbound_message(&connection_id, prepared.message_id)
   4089             .unwrap();
   4090 
   4091         let error = runtime
   4092             .handle_inbound_decrypted_message(
   4093                 &connection_id,
   4094                 receipt_message(1, prepared.message_id, b"wrong-hash".to_vec()),
   4095                 b"receipt-frame".to_vec(),
   4096             )
   4097             .unwrap_err();
   4098 
   4099         assert!(
   4100             error
   4101                 .to_string()
   4102                 .contains("did not match stored outbound message hash")
   4103         );
   4104         assert!(!runtime.drain_events(16).into_iter().any(|event| matches!(
   4105             event,
   4106             RadrootsSimplexAgentRuntimeEvent::MessageAcknowledged { .. }
   4107         )));
   4108     }
   4109 
   4110     #[test]
   4111     fn send_message_stores_opaque_encrypted_agent_payload() {
   4112         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   4113         let created = runtime
   4114             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   4115             .unwrap();
   4116         let invitation = runtime
   4117             .store
   4118             .connection(&created)
   4119             .unwrap()
   4120             .invitation
   4121             .clone()
   4122             .unwrap();
   4123         let joined = runtime
   4124             .join_connection(invitation, reply_queue(), 20)
   4125             .unwrap();
   4126 
   4127         let mut setup_transport = ScriptedTransport::with_responses(vec![
   4128             ids_response(b"recipient", b"sender", b"server-dh"),
   4129             RadrootsSimplexSmpBrokerMessage::Ok,
   4130             ids_response(b"recipient-2", b"sender-2", b"server-dh-2"),
   4131             RadrootsSimplexSmpBrokerMessage::Ok,
   4132             RadrootsSimplexSmpBrokerMessage::Ok,
   4133             RadrootsSimplexSmpBrokerMessage::Ok,
   4134         ]);
   4135         runtime
   4136             .execute_ready_commands(&mut setup_transport, 30, 16)
   4137             .unwrap();
   4138         mark_connected(&mut runtime, &joined);
   4139 
   4140         runtime
   4141             .send_message(&joined, b"hello simplex".to_vec(), 40)
   4142             .unwrap();
   4143         let command = runtime.retry_pending(40, 16).remove(0);
   4144         let RadrootsSimplexAgentPendingCommandKind::SendEnvelope { envelope, .. } = command.kind
   4145         else {
   4146             panic!("expected send envelope command");
   4147         };
   4148         let RadrootsSimplexAgentEnvelope::Message(encrypted) = envelope else {
   4149             panic!("expected encrypted message envelope");
   4150         };
   4151         let expected_plaintext = encode_decrypted_message(
   4152             &RadrootsSimplexAgentDecryptedMessage::Message(RadrootsSimplexAgentMessageFrame {
   4153                 header: RadrootsSimplexAgentMessageHeader {
   4154                     message_id: 1,
   4155                     previous_message_hash: Vec::new(),
   4156                 },
   4157                 message: RadrootsSimplexAgentMessage::UserMessage(b"hello simplex".to_vec()),
   4158                 padding: Vec::new(),
   4159             }),
   4160         )
   4161         .unwrap();
   4162 
   4163         assert!(encrypted.ratchet_header.is_none());
   4164         assert!(encrypted.ciphertext.is_empty());
   4165         let official_message = encrypted.official_message.as_ref().unwrap();
   4166         assert_ne!(official_message, &expected_plaintext);
   4167         assert_eq!(
   4168             official_message.len(),
   4169             2 + 124 + 16 + SIMPLEX_AGENT_E2E_MESSAGE_LENGTH
   4170         );
   4171     }
   4172 
   4173     #[test]
   4174     fn transport_retry_keeps_staged_outbound_message() {
   4175         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   4176         let created = runtime
   4177             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   4178             .unwrap();
   4179         let invitation = runtime
   4180             .store
   4181             .connection(&created)
   4182             .unwrap()
   4183             .invitation
   4184             .clone()
   4185             .unwrap();
   4186         let joined = runtime
   4187             .join_connection(invitation, reply_queue(), 20)
   4188             .unwrap();
   4189 
   4190         let mut setup_transport = ScriptedTransport::with_responses(vec![
   4191             ids_response(b"recipient", b"sender", b"server-dh"),
   4192             RadrootsSimplexSmpBrokerMessage::Ok,
   4193             ids_response(b"recipient-2", b"sender-2", b"server-dh-2"),
   4194             RadrootsSimplexSmpBrokerMessage::Ok,
   4195             RadrootsSimplexSmpBrokerMessage::Ok,
   4196             RadrootsSimplexSmpBrokerMessage::Ok,
   4197         ]);
   4198         runtime
   4199             .execute_ready_commands(&mut setup_transport, 30, 16)
   4200             .unwrap();
   4201         mark_connected(&mut runtime, &joined);
   4202 
   4203         runtime
   4204             .send_message(&joined, b"hello simplex".to_vec(), 40)
   4205             .unwrap();
   4206 
   4207         struct FailingTransport;
   4208         impl RadrootsSimplexSmpCommandTransport for FailingTransport {
   4209             type Error = String;
   4210             fn execute(
   4211                 &mut self,
   4212                 _request: RadrootsSimplexSmpTransportRequest,
   4213             ) -> Result<RadrootsSimplexSmpTransportResponse, Self::Error> {
   4214                 Err("synthetic failure".to_owned())
   4215             }
   4216         }
   4217 
   4218         runtime
   4219             .execute_ready_commands(&mut FailingTransport, 50, 16)
   4220             .unwrap();
   4221 
   4222         assert_eq!(
   4223             runtime
   4224                 .store
   4225                 .connection(&joined)
   4226                 .unwrap()
   4227                 .delivery_cursor
   4228                 .last_sent_message_id,
   4229             None
   4230         );
   4231         assert_eq!(
   4232             runtime
   4233                 .store
   4234                 .connection(&joined)
   4235                 .unwrap()
   4236                 .staged_outbound_message
   4237                 .as_ref()
   4238                 .map(|message| message.message_id),
   4239             Some(1)
   4240         );
   4241         let ready_again = runtime.retry_pending(50 + 5_000, 16);
   4242         assert_eq!(ready_again.len(), 1);
   4243     }
   4244 
   4245     #[cfg(feature = "std")]
   4246     #[test]
   4247     fn builder_opens_persistent_store_path() {
   4248         let tempdir = tempfile::tempdir().unwrap();
   4249         let path = tempdir.path().join("runtime-store.json");
   4250         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new()
   4251             .persistent_store_path(&path)
   4252             .build()
   4253             .unwrap();
   4254         runtime
   4255             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   4256             .unwrap();
   4257         assert!(path.exists());
   4258     }
   4259 
   4260     #[cfg(feature = "std")]
   4261     #[test]
   4262     fn resume_subscriptions_requeues_persisted_receive_queues_after_restart() {
   4263         let tempdir = tempfile::tempdir().unwrap();
   4264         let path = tempdir.path().join("runtime-store.json");
   4265         {
   4266             let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new()
   4267                 .persistent_store_path(&path)
   4268                 .build()
   4269                 .unwrap();
   4270             let created = runtime
   4271                 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   4272                 .unwrap();
   4273             let mut setup_transport = ScriptedTransport::with_responses(vec![
   4274                 ids_response(b"recipient", b"sender", b"server-dh"),
   4275                 RadrootsSimplexSmpBrokerMessage::Ok,
   4276             ]);
   4277             runtime
   4278                 .execute_ready_commands(&mut setup_transport, 30, 16)
   4279                 .unwrap();
   4280             assert!(runtime.store.receive_queues(&created).unwrap()[0].subscribed);
   4281         }
   4282 
   4283         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new()
   4284             .persistent_store_path(&path)
   4285             .build()
   4286             .unwrap();
   4287         assert_eq!(runtime.resume_subscriptions(40).unwrap(), 1);
   4288         assert_eq!(runtime.resume_subscriptions(41).unwrap(), 0);
   4289         let mut subscription_transport =
   4290             ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]);
   4291         runtime
   4292             .execute_ready_commands(&mut subscription_transport, 50, 16)
   4293             .unwrap();
   4294 
   4295         assert_eq!(subscription_transport.requests.len(), 1);
   4296         assert!(matches!(
   4297             subscription_transport.requests[0].command,
   4298             RadrootsSimplexSmpCommand::Sub
   4299         ));
   4300         assert_eq!(
   4301             subscription_transport.requests[0].entity_id,
   4302             b"recipient".to_vec()
   4303         );
   4304         assert!(runtime.drain_events(16).into_iter().any(|event| matches!(
   4305             event,
   4306             RadrootsSimplexAgentRuntimeEvent::SubscriptionQueued { .. }
   4307         )));
   4308     }
   4309 
   4310     #[test]
   4311     fn ack_no_msg_response_is_idempotently_delivered() {
   4312         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   4313         let created = runtime
   4314             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   4315             .unwrap();
   4316 
   4317         let mut setup_transport = ScriptedTransport::with_responses(vec![
   4318             ids_response(b"recipient", b"sender", b"server-dh"),
   4319             RadrootsSimplexSmpBrokerMessage::Ok,
   4320         ]);
   4321         runtime
   4322             .execute_ready_commands(&mut setup_transport, 30, 16)
   4323             .unwrap();
   4324         mark_connected(&mut runtime, &created);
   4325         let _ = runtime.drain_events(16);
   4326 
   4327         let receive_queue = runtime.store.receive_queues(&created).unwrap()[0]
   4328             .descriptor
   4329             .queue_address();
   4330         let frame = user_message_frame(7, Vec::new(), b"ack target");
   4331         let message_hash = agent_message_hash(&frame);
   4332         runtime
   4333             .store
   4334             .record_inbound_message(
   4335                 &created,
   4336                 receive_queue,
   4337                 b"already-acked-broker-message".to_vec(),
   4338                 frame.header.message_id,
   4339                 message_hash.clone(),
   4340             )
   4341             .unwrap();
   4342         runtime
   4343             .ack_message(
   4344                 &created,
   4345                 frame.header.message_id,
   4346                 message_hash.clone(),
   4347                 Vec::new(),
   4348                 40,
   4349             )
   4350             .unwrap();
   4351 
   4352         let mut ack_transport =
   4353             ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Err(
   4354                 RadrootsSimplexSmpError::NoMsg,
   4355             )]);
   4356         runtime
   4357             .execute_ready_commands(&mut ack_transport, 50, 16)
   4358             .unwrap();
   4359 
   4360         assert_eq!(ack_transport.requests.len(), 1);
   4361         assert!(matches!(
   4362             ack_transport.requests[0].command,
   4363             RadrootsSimplexSmpCommand::Ack(_)
   4364         ));
   4365         assert!(runtime.retry_pending(50, 16).is_empty());
   4366         let events = runtime.drain_events(16);
   4367         assert!(events.iter().any(|event| matches!(
   4368             event,
   4369             RadrootsSimplexAgentRuntimeEvent::InboundMessageAckDelivered {
   4370                 connection_id,
   4371                 message_id: 7,
   4372                 message_hash: delivered_hash,
   4373             } if connection_id == &created && delivered_hash == &message_hash
   4374         )));
   4375         assert!(!events.iter().any(|event| matches!(
   4376             event,
   4377             RadrootsSimplexAgentRuntimeEvent::Error { message, .. } if message.contains("NoMsg")
   4378         )));
   4379     }
   4380 
   4381     #[test]
   4382     fn manual_record_command_failure_clears_staged_delivery_state() {
   4383         let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
   4384         let created = runtime
   4385             .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
   4386             .unwrap();
   4387         let invitation = runtime
   4388             .store
   4389             .connection(&created)
   4390             .unwrap()
   4391             .invitation
   4392             .clone()
   4393             .unwrap();
   4394         let joined = runtime
   4395             .join_connection(invitation, reply_queue(), 20)
   4396             .unwrap();
   4397 
   4398         let mut setup_transport = ScriptedTransport::with_responses(vec![
   4399             ids_response(b"recipient", b"sender", b"server-dh"),
   4400             RadrootsSimplexSmpBrokerMessage::Ok,
   4401             ids_response(b"recipient-2", b"sender-2", b"server-dh-2"),
   4402             RadrootsSimplexSmpBrokerMessage::Ok,
   4403             RadrootsSimplexSmpBrokerMessage::Ok,
   4404             RadrootsSimplexSmpBrokerMessage::Ok,
   4405         ]);
   4406         runtime
   4407             .execute_ready_commands(&mut setup_transport, 30, 16)
   4408             .unwrap();
   4409         mark_connected(&mut runtime, &joined);
   4410 
   4411         runtime
   4412             .send_message(&joined, b"hello simplex".to_vec(), 40)
   4413             .unwrap();
   4414         let command = runtime.retry_pending(40, 16).remove(0);
   4415         runtime
   4416             .record_command_outcome(
   4417                 command.id,
   4418                 RadrootsSimplexAgentCommandOutcome::Failed {
   4419                     message: "synthetic failure".into(),
   4420                 },
   4421             )
   4422             .unwrap();
   4423         assert_eq!(
   4424             runtime
   4425                 .store
   4426                 .connection(&joined)
   4427                 .unwrap()
   4428                 .staged_outbound_message,
   4429             None
   4430         );
   4431     }
   4432 }