runtime.rs (179625B)
1 use crate::error::RadrootsSimplexAgentRuntimeError; 2 use crate::types::{RadrootsSimplexAgentCommandOutcome, RadrootsSimplexAgentRuntimeEvent}; 3 use alloc::collections::VecDeque; 4 use alloc::format; 5 use alloc::string::{String, ToString}; 6 use alloc::vec; 7 use alloc::vec::Vec; 8 use base64::Engine as _; 9 use base64::engine::general_purpose::{URL_SAFE, URL_SAFE_NO_PAD}; 10 use radroots_simplex_agent_proto::prelude::{ 11 RadrootsSimplexAgentConnectionLink, RadrootsSimplexAgentConnectionMode, 12 RadrootsSimplexAgentConnectionStatus, RadrootsSimplexAgentDecryptedMessage, 13 RadrootsSimplexAgentEncryptedPayload, RadrootsSimplexAgentEnvelope, 14 RadrootsSimplexAgentMessage, RadrootsSimplexAgentMessageFrame, 15 RadrootsSimplexAgentMessageHeader, RadrootsSimplexAgentMessageReceipt, 16 RadrootsSimplexAgentQueueAddress, RadrootsSimplexAgentQueueDescriptor, 17 RadrootsSimplexAgentShortInvitationLink, RadrootsSimplexAgentShortLinkScheme, 18 decode_decrypted_message, decode_envelope, decode_short_invitation_fixed_data, 19 decode_short_invitation_user_data, encode_decrypted_message, encode_envelope, 20 encode_short_invitation_fixed_data, encode_short_invitation_user_data, 21 }; 22 use radroots_simplex_agent_store::prelude::{ 23 RadrootsSimplexAgentOutboundMessage, RadrootsSimplexAgentPendingCommand, 24 RadrootsSimplexAgentPendingCommandKind, RadrootsSimplexAgentPqKeypair, 25 RadrootsSimplexAgentQueueAuthState, RadrootsSimplexAgentQueueRole, 26 RadrootsSimplexAgentShortLinkCredentials, RadrootsSimplexAgentStore, 27 RadrootsSimplexAgentX3dhKeypair, 28 }; 29 use radroots_simplex_smp_crypto::prelude::{ 30 RADROOTS_SIMPLEX_OFFICIAL_E2E_CURRENT_VERSION, RADROOTS_SIMPLEX_OFFICIAL_E2E_KDF_VERSION, 31 RADROOTS_SIMPLEX_SMP_NONCE_LENGTH, RADROOTS_SIMPLEX_SMP_SHORT_LINK_SIGNATURE_LENGTH, 32 RadrootsSimplexOfficialSntrup761Keypair, RadrootsSimplexOfficialX3dhParams, 33 RadrootsSimplexOfficialX448Keypair, RadrootsSimplexSmpCommandAuthorization, 34 RadrootsSimplexSmpCryptoError, 
RadrootsSimplexSmpEd25519Keypair, 35 RadrootsSimplexSmpRatchetState, RadrootsSimplexSmpX25519Keypair, decode_x25519_public_key_x509, 36 decrypt_padded, decrypt_short_link_data, derive_invitation_short_link_data_key, 37 derive_shared_secret, encode_ed25519_public_key_x509, encode_x25519_public_key_x509, 38 encrypt_padded, encrypt_short_link_data, official_sntrup761_keypair_from_seed, 39 official_x3dh_receiver_init, official_x3dh_receiver_init_accepting_pq, 40 official_x3dh_sender_init, official_x3dh_sender_init_accepting_pq, 41 official_x448_keypair_from_seed, random_nonce, sign_short_link_data, 42 verify_signed_short_link_data, 43 }; 44 use radroots_simplex_smp_proto::prelude::{ 45 RADROOTS_SIMPLEX_SMP_CURRENT_CLIENT_VERSION, RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, 46 RadrootsSimplexSmpBrokerMessage, RadrootsSimplexSmpCommand, RadrootsSimplexSmpCorrelationId, 47 RadrootsSimplexSmpError, RadrootsSimplexSmpMessageFlags, 48 RadrootsSimplexSmpMessagingQueueRequest, RadrootsSimplexSmpNewQueueRequest, 49 RadrootsSimplexSmpQueueIdsResponse, RadrootsSimplexSmpQueueLinkData, 50 RadrootsSimplexSmpQueueMode, RadrootsSimplexSmpQueueRequestData, RadrootsSimplexSmpQueueUri, 51 RadrootsSimplexSmpSendCommand, RadrootsSimplexSmpServerAddress, 52 RadrootsSimplexSmpSubscriptionMode, RadrootsSimplexSmpVersionRange, 53 }; 54 use radroots_simplex_smp_transport::prelude::{ 55 RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpSubscriptionReceiveRequest, 56 RadrootsSimplexSmpSubscriptionTransport, RadrootsSimplexSmpTransportRequest, 57 RadrootsSimplexSmpTransportResponse, 58 }; 59 use sha2::{Digest, Sha256}; 60 use sha3::Sha3_384; 61 #[cfg(feature = "std")] 62 use std::path::{Path, PathBuf}; 63 64 const SIMPLEX_E2E_CONFIRMATION_LENGTH: usize = 15_904; 65 const SIMPLEX_E2E_MESSAGE_LENGTH: usize = 16_000; 66 const SIMPLEX_AGENT_E2E_CONN_INFO_LENGTH: usize = 14_832; 67 const SIMPLEX_AGENT_E2E_CONN_INFO_PQ_LENGTH: usize = 11_106; 68 const SIMPLEX_AGENT_E2E_MESSAGE_LENGTH: usize = 
15_840;
const SIMPLEX_AGENT_E2E_MESSAGE_PQ_LENGTH: usize = 13_618;

/// Parts of a client message envelope: an optional sender public key, a fixed-size
/// nonce and the ciphertext bytes.
#[derive(Debug, Clone)]
struct SimplexClientMessageEnvelope {
    sender_public_key: Option<Vec<u8>>,
    nonce: [u8; RADROOTS_SIMPLEX_SMP_NONCE_LENGTH],
    ciphertext: Vec<u8>,
}

/// Tells connection-info payloads apart from regular message payloads.
#[derive(Debug, Clone, Copy)]
enum SimplexAgentPayloadKind {
    ConnectionInfo,
    Message,
}

/// A received body together with its timestamp and SMP message flags.
#[derive(Debug, Clone)]
struct SimplexReceivedBody {
    timestamp: u64,
    flags: RadrootsSimplexSmpMessageFlags,
    sent_body: Vec<u8>,
}

/// Key material and encrypted link data prepared for a short invitation link.
struct SimplexPreparedShortInvitationLinkData {
    link_key: Vec<u8>,
    link_public_signature_key: Vec<u8>,
    link_private_signature_key: Vec<u8>,
    encrypted_link_data: RadrootsSimplexSmpQueueLinkData,
}

/// Decrypts and verifies the link data behind a short invitation and returns the
/// full connection link it describes.
///
/// The data key is derived from `invitation.link_key`, the link data is decrypted,
/// the fixed payload (everything after the signature prefix) is decoded, and the
/// signed data is verified against the root signature key found in that payload.
/// The decoded user data then becomes the connection id of the returned link.
///
/// # Errors
///
/// Returns a `Runtime` error when key derivation, decryption or verification fails,
/// when the fixed data is not longer than the signature prefix, or when a non-empty
/// connection id in the fixed data differs from the decoded user data. Decoding
/// errors are propagated through `?`.
pub fn decrypt_short_invitation_link_data(
    invitation: &RadrootsSimplexAgentShortInvitationLink,
    encrypted_link_data: &RadrootsSimplexSmpQueueLinkData,
) -> Result<RadrootsSimplexAgentConnectionLink, RadrootsSimplexAgentRuntimeError> {
    // Crypto failures are all reported the same way: as a stringified runtime error.
    fn runtime_error<E: ToString>(cause: E) -> RadrootsSimplexAgentRuntimeError {
        RadrootsSimplexAgentRuntimeError::Runtime(cause.to_string())
    }

    let data_key =
        derive_invitation_short_link_data_key(&invitation.link_key).map_err(runtime_error)?;
    let signed = decrypt_short_link_data(&data_key, encrypted_link_data).map_err(runtime_error)?;

    // The fixed data must carry at least one payload byte after the signature prefix.
    let signature_length = RADROOTS_SIMPLEX_SMP_SHORT_LINK_SIGNATURE_LENGTH;
    if signed.fixed_data.len() <= signature_length {
        return Err(RadrootsSimplexAgentRuntimeError::Runtime(
            "SimpleX short invitation fixed data is missing its signed payload".into(),
        ));
    }
    let mut fixed = decode_short_invitation_fixed_data(&signed.fixed_data[signature_length..])?;

    let verified = verify_signed_short_link_data(
        &invitation.link_key,
        &fixed.root_public_signature_key,
        &signed,
    )
    .map_err(runtime_error)?;
    let decoded_user_data = decode_short_invitation_user_data(&verified.user_data)?;

    // An empty connection id in the fixed data is simply filled in; a non-empty one
    // has to agree with the user data.
    let connection_id_conflicts = !fixed.invitation.connection_id.is_empty()
        && fixed.invitation.connection_id != decoded_user_data.user_data;
    if connection_id_conflicts {
        return Err(RadrootsSimplexAgentRuntimeError::Runtime(
            "SimpleX short invitation user data does not match the fixed connection link".into(),
        ));
    }
    fixed.invitation.connection_id = decoded_user_data.user_data;
    Ok(fixed.invitation)
}

/// Builder for [`RadrootsSimplexAgentRuntime`].
pub struct RadrootsSimplexAgentRuntimeBuilder {
    store: Option<RadrootsSimplexAgentStore>,
    queue_capacity: usize,
    retry_delay_ms: u64,
    #[cfg(feature = "std")]
    persistent_store_path: Option<PathBuf>,
}

impl RadrootsSimplexAgentRuntimeBuilder {
    /// Event queue capacity used when none is configured.
    pub const DEFAULT_QUEUE_CAPACITY: usize = 2_048;
    /// Retry delay in milliseconds used when none is configured.
    pub const DEFAULT_RETRY_DELAY_MS: u64 = 5_000;

    /// Creates a builder with no store, the default queue capacity and the default
    /// retry delay.
    pub fn new() -> Self {
        Self {
            store: None,
            queue_capacity: Self::DEFAULT_QUEUE_CAPACITY,
            retry_delay_ms: Self::DEFAULT_RETRY_DELAY_MS,
            #[cfg(feature = "std")]
            persistent_store_path: None,
        }
    }

    /// Uses `store` as the runtime's store.
    pub fn store(self, store: RadrootsSimplexAgentStore) -> Self {
        Self {
            store: Some(store),
            ..self
        }
    }

    /// Records the path handed to the store for persistence when the runtime is built.
    #[cfg(feature = "std")]
    pub fn persistent_store_path(self, path: impl AsRef<Path>) -> Self {
        Self {
            persistent_store_path: Some(path.as_ref().to_path_buf()),
            ..self
        }
    }

    /// Sets the event queue capacity; `build` rejects zero.
    pub fn queue_capacity(self, queue_capacity: usize) -> Self {
        Self {
            queue_capacity,
            ..self
        }
    }

    /// Sets the delay, in milliseconds, stored on the runtime as its retry delay.
    pub fn retry_delay_ms(self, retry_delay_ms: u64) -> Self {
        Self {
            retry_delay_ms,
            ..self
        }
    }

    /// Validates the configuration and assembles the runtime.
    ///
    /// With the `std` feature, a configured store gets the persistence path applied
    /// when one is set, a path without a store opens the store at that path, and
    /// neither yields the default store.
    ///
    /// # Errors
    ///
    /// Returns `InvalidConfig("queue_capacity")` when the capacity is zero, and
    /// propagates a failure to open the store.
    pub fn build(self) -> Result<RadrootsSimplexAgentRuntime, RadrootsSimplexAgentRuntimeError> {
        if self.queue_capacity == 0 {
            return Err(RadrootsSimplexAgentRuntimeError::InvalidConfig(
                "queue_capacity",
            ));
        }
        #[cfg(feature = "std")]
        let store = match (self.store, self.persistent_store_path) {
            (Some(mut store),
Some(path)) => {
                store.set_persistence_path(path);
                store
            }
            (Some(store), None) => store,
            (None, Some(path)) => RadrootsSimplexAgentStore::open(path)?,
            (None, None) => RadrootsSimplexAgentStore::default(),
        };
        #[cfg(not(feature = "std"))]
        let store = self.store.unwrap_or_default();

        Ok(RadrootsSimplexAgentRuntime {
            store,
            // `queue_capacity` is used here only as the initial allocation of the
            // event queue.
            events: VecDeque::with_capacity(self.queue_capacity),
            retry_delay_ms: self.retry_delay_ms,
        })
    }
}

impl Default for RadrootsSimplexAgentRuntimeBuilder {
    fn default() -> Self {
        Self::new()
    }
}

/// Agent runtime state: the store, the queue of emitted runtime events and the
/// retry delay in milliseconds.
pub struct RadrootsSimplexAgentRuntime {
    store: RadrootsSimplexAgentStore,
    events: VecDeque<RadrootsSimplexAgentRuntimeEvent>,
    retry_delay_ms: u64,
}

impl RadrootsSimplexAgentRuntime {
    /// Creates a connection in `CreatePending` status and queues creation of its
    /// receive queue.
    ///
    /// The X25519 key pair is derived from `e2e_seed`. The two X448 key pairs and the
    /// sntrup761 key pair are derived from the textual form of `invitation_queue`
    /// (after its DH key and placeholder sender id are set), the X25519 public key and
    /// `now`. The queue is registered as the primary receive queue, short-link data is
    /// prepared unless `contact_address` is set, a `CreateQueue` command is enqueued
    /// and the store is flushed.
    ///
    /// Returns the id of the new connection.
    ///
    /// # Errors
    ///
    /// Returns a `Runtime` error when key encoding, version-range construction or
    /// delivery key generation fails, and propagates store, short-link and server
    /// key hash errors. The connection record is created before several of these
    /// steps, so it is already in the store when they fail.
    pub fn create_connection(
        &mut self,
        mut invitation_queue: RadrootsSimplexSmpQueueUri,
        e2e_seed: Vec<u8>,
        contact_address: bool,
        now: u64,
    ) -> Result<String, RadrootsSimplexAgentRuntimeError> {
        let e2e_keypair = RadrootsSimplexSmpX25519Keypair::from_seed(&e2e_seed);
        let now_bytes = now.to_be_bytes();
        invitation_queue.recipient_dh_public_key = encode_queue_public_key(&e2e_keypair.public_key)
            .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
        invitation_queue.sender_id = placeholder_sender_id(
            invitation_queue.server.server_identity.as_bytes(),
            &now_bytes,
        );
        // The queue is not modified again before the key derivations below, so its
        // textual form is rendered once and shared by all three seeds.
        let queue_text = invitation_queue.to_string();
        let seed_parts: [&[u8]; 3] = [
            queue_text.as_bytes(),
            &e2e_keypair.public_key,
            &now_bytes,
        ];
        let x3dh_key_1 = official_x448_keypair_from_seed(&derive_material(
            b"connection-create-x3dh-1",
            &seed_parts,
        ));
        let x3dh_key_2 = official_x448_keypair_from_seed(&derive_material(
            b"connection-create-x3dh-2",
            &seed_parts,
        ));
        let pq_keypair = official_sntrup761_keypair_from_seed(&derive_material(
            b"connection-create-pq-kem",
            &seed_parts,
        ));
        let e2e_ratchet_params = RadrootsSimplexOfficialX3dhParams {
            version_range: RadrootsSimplexSmpVersionRange::new(
                RADROOTS_SIMPLEX_OFFICIAL_E2E_KDF_VERSION,
                RADROOTS_SIMPLEX_OFFICIAL_E2E_CURRENT_VERSION,
            )
            .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?,
            key_1: x3dh_key_1.public_key.clone(),
            key_2: x3dh_key_2.public_key.clone(),
            pq_public_key: Some(pq_keypair.public_key.clone()),
            pq_ciphertext: None,
        };
        // NOTE(review): a failed initiator ratchet setup is discarded here and the
        // connection is stored without ratchet state; confirm this best-effort
        // behaviour is intended.
        let mut ratchet_state = RadrootsSimplexSmpRatchetState::initiator(
            x3dh_key_2.public_key.clone(),
            x3dh_key_1.public_key.clone(),
            None,
        )
        .ok();
        if let Some(ratchet_state) = ratchet_state.as_mut() {
            ratchet_state.current_pq_public_key = Some(pq_keypair.public_key.clone());
            ratchet_state.local_pq_private_key = Some(pq_keypair.private_key.clone());
        }
        let connection = self.store.create_connection(
            if contact_address {
                RadrootsSimplexAgentConnectionMode::ContactAddress
            } else {
                RadrootsSimplexAgentConnectionMode::Direct
            },
            RadrootsSimplexAgentConnectionStatus::CreatePending,
            None,
            ratchet_state,
        );
        let invitation = RadrootsSimplexAgentConnectionLink {
            invitation_queue: invitation_queue.clone(),
            connection_id: connection.id.as_bytes().to_vec(),
            e2e_ratchet_params,
            contact_address,
        };
        // Short-link data is only prepared for non-contact-address connections.
        let prepared_short_link = if contact_address {
            None
        } else {
            Some(prepare_short_invitation_link_data(&invitation)?)
        };
        self.store.connection_mut(&connection.id)?.invitation = Some(invitation);
        let receive_auth_state = self.store.generate_queue_auth_state()?;
        let delivery_keypair = RadrootsSimplexSmpX25519Keypair::generate()
            .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
        let descriptor = RadrootsSimplexAgentQueueDescriptor {
            queue_uri: invitation_queue,
            replaced_queue: None,
            primary: true,
            sender_key: None,
        };
        self.store.add_queue(
            &connection.id,
            descriptor.clone(),
            RadrootsSimplexAgentQueueRole::Receive,
            true,
            receive_auth_state,
        )?;
        let server_key_hash = decode_server_key_hash(&descriptor.queue_uri.server.server_identity)?;
        {
            let connection = self.store.connection_mut(&connection.id)?;
            connection.local_e2e_public_key = Some(e2e_keypair.public_key);
            connection.local_e2e_private_key = Some(e2e_keypair.private_key);
            connection.local_x3dh_key_1 = Some(agent_x3dh_keypair(x3dh_key_1));
            connection.local_x3dh_key_2 = Some(agent_x3dh_keypair(x3dh_key_2));
            connection.local_pq_keypair = Some(agent_pq_keypair(pq_keypair));
            connection.short_link =
                prepared_short_link.map(|prepared| RadrootsSimplexAgentShortLinkCredentials {
                    scheme: RadrootsSimplexAgentShortLinkScheme::Simplex,
                    hosts: descriptor.queue_uri.server.hosts.clone(),
                    port: descriptor.queue_uri.server.port,
                    server_key_hash: Some(server_key_hash),
                    link_id: Vec::new(),
                    link_key: prepared.link_key,
                    link_public_signature_key: prepared.link_public_signature_key,
                    link_private_signature_key: prepared.link_private_signature_key,
                    encrypted_fixed_data: Some(prepared.encrypted_link_data.fixed_data),
                    encrypted_user_data: Some(prepared.encrypted_link_data.user_data),
                });
            // Attach the delivery private key to the receive queue registered above.
            let queue = connection
                .queues
                .iter_mut()
                .find(|queue| queue.descriptor.queue_address() == descriptor.queue_address())
                .ok_or_else(|| {
                    RadrootsSimplexAgentRuntimeError::Runtime(
                        "SimpleX receive queue missing after create_connection".into(),
                    )
                })?;
            queue.delivery_private_key = Some(delivery_keypair.private_key);
        }
        self.store.enqueue_command(
            &connection.id,
            RadrootsSimplexAgentPendingCommandKind::CreateQueue { descriptor },
            now,
        )?;
        self.flush_store()?;
        Ok(connection.id)
    }

    /// Joins a connection from a full invitation link.
    ///
    /// Creates a `Direct` connection in `JoinPending` status, prepares it through
    /// `prepare_join_connection` without a pre-secured sender auth state, flushes the
    /// store and returns the new connection id.
    ///
    /// # Errors
    ///
    /// Propagates errors from preparing the join and from flushing the store.
    pub fn join_connection(
        &mut self,
        invitation: RadrootsSimplexAgentConnectionLink,
        reply_queue: RadrootsSimplexSmpQueueUri,
        now: u64,
    ) -> Result<String, RadrootsSimplexAgentRuntimeError> {
        let connection = self.store.create_connection(
            RadrootsSimplexAgentConnectionMode::Direct,
            RadrootsSimplexAgentConnectionStatus::JoinPending,
            None,
            None,
        );
        let connection_id = connection.id.clone();
        self.prepare_join_connection(&connection_id, invitation, reply_queue, now, None)?;
        self.flush_store()?;
        Ok(connection_id)
    }

    /// Starts joining a connection from a short invitation link.
    ///
    /// Creates a `Direct` connection in `JoinPending` status, generates a sender auth
    /// state and enqueues a `SecureGetQueueLinkData` command carrying the invitation,
    /// the reply queue and that auth state. Flushes the store and returns the new
    /// connection id.
    ///
    /// # Errors
    ///
    /// Fails before anything is created when `short_invitation_server` rejects the
    /// invitation, and propagates store errors.
    pub fn join_short_invitation(
        &mut self,
        invitation: RadrootsSimplexAgentShortInvitationLink,
        reply_queue: RadrootsSimplexSmpQueueUri,
        now: u64,
    ) -> Result<String, RadrootsSimplexAgentRuntimeError> {
        // Only the error of this call is used; the value itself is discarded.
        let _ = short_invitation_server(&invitation)?;
        let connection = self.store.create_connection(
            RadrootsSimplexAgentConnectionMode::Direct,
            RadrootsSimplexAgentConnectionStatus::JoinPending,
            None,
            None,
        );
        let sender_auth_state = self.store.generate_queue_auth_state()?;
        self.store.enqueue_command(
            &connection.id,
            RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
                invitation,
                reply_queue,
                sender_auth_state,
            },
            now,
        )?;
        self.flush_store()?;
        Ok(connection.id)
    }

    /// Fills in an existing connection record for joining via `invitation`: key
    /// material, ratchet state, the send and reply receive queues, and the pending
    /// queue commands. The caller is responsible for flushing the store.
    fn prepare_join_connection(
        &mut self,
        connection_id: &str,
        invitation: RadrootsSimplexAgentConnectionLink,
        mut reply_queue: RadrootsSimplexSmpQueueUri,
        now: u64,
        secured_sender_auth_state:
Option<RadrootsSimplexAgentQueueAuthState>,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        let send_queue_is_secured = secured_sender_auth_state.is_some();
        let local_e2e_keypair = RadrootsSimplexSmpX25519Keypair::generate()
            .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
        let invitation_e2e_public_key =
            decode_queue_public_key(&invitation.invitation_queue.recipient_dh_public_key)?;
        let shared_secret =
            derive_shared_secret(&local_e2e_keypair.private_key, &invitation_e2e_public_key)
                .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
        reply_queue.recipient_dh_public_key =
            encode_queue_public_key(&local_e2e_keypair.public_key)
                .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
        let now_bytes = now.to_be_bytes();
        reply_queue.sender_id =
            placeholder_sender_id(invitation.connection_id.as_slice(), &now_bytes);
        // The reply queue is not modified again before it is moved into its
        // descriptor, so its textual form is rendered once and shared by every
        // derivation below.
        let reply_queue_text = reply_queue.to_string();
        let seed_parts: [&[u8]; 3] = [
            invitation.connection_id.as_slice(),
            reply_queue_text.as_bytes(),
            &now_bytes,
        ];
        let local_x3dh_key_1 = official_x448_keypair_from_seed(&derive_material(
            b"connection-join-x3dh-1",
            &seed_parts,
        ));
        let local_x3dh_key_2 = official_x448_keypair_from_seed(&derive_material(
            b"connection-join-x3dh-2",
            &seed_parts,
        ));
        // A local PQ key pair is only derived when the invitation carries a PQ
        // public key.
        let local_pq_keypair = invitation
            .e2e_ratchet_params
            .pq_public_key
            .as_ref()
            .map(|_| {
                official_sntrup761_keypair_from_seed(&derive_material(
                    b"connection-join-pq-kem",
                    &seed_parts,
                ))
            });
        let mut ratchet_state = RadrootsSimplexSmpRatchetState::responder(
            local_x3dh_key_2.public_key.clone(),
            invitation.e2e_ratchet_params.key_2.clone(),
            local_pq_keypair
                .as_ref()
                .map(|keypair| keypair.public_key.clone()),
        )
        .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
        let local_pq_keypair = if let Some(local_pq_keypair) = local_pq_keypair {
            let sender_init = official_x3dh_sender_init_accepting_pq(
                &local_x3dh_key_1,
                &local_x3dh_key_2,
                local_pq_keypair,
                &invitation.e2e_ratchet_params,
                &derive_material(b"connection-join-pq-encapsulation", &seed_parts),
            )
            .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
            ratchet_state
                .initialize_official_sender(local_x3dh_key_2.private_key.clone(), sender_init.init)
                .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
            ratchet_state.current_pq_public_key = sender_init.sender_params.pq_public_key.clone();
            ratchet_state.pending_outbound_pq_ciphertext =
                sender_init.sender_params.pq_ciphertext.clone();
            ratchet_state.local_pq_private_key =
                Some(sender_init.local_pq_keypair.private_key.clone());
            Some(sender_init.local_pq_keypair)
        } else {
            let sender_init = official_x3dh_sender_init(
                &local_x3dh_key_1,
                &local_x3dh_key_2,
                &invitation.e2e_ratchet_params,
            )
            .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
            ratchet_state
                .initialize_official_sender(local_x3dh_key_2.private_key.clone(), sender_init)
                .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
            None
        };
        // Reuse the auth state supplied by the caller, or generate a fresh one.
        let send_auth_state = if let Some(sender_auth_state) = secured_sender_auth_state {
            sender_auth_state
        } else {
            self.store.generate_queue_auth_state()?
        };
        let send_descriptor = RadrootsSimplexAgentQueueDescriptor {
            queue_uri: invitation.invitation_queue.clone(),
            replaced_queue: None,
            primary: true,
            sender_key: Some(send_auth_state.public_key.clone()),
        };
        let receive_auth_state = self.store.generate_queue_auth_state()?;
        let delivery_keypair = RadrootsSimplexSmpX25519Keypair::generate()
            .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
        let receive_descriptor = RadrootsSimplexAgentQueueDescriptor {
            queue_uri: reply_queue,
            replaced_queue: None,
            primary: true,
            sender_key: None,
        };
        {
            let connection = self.store.connection_mut(connection_id)?;
            connection.mode = RadrootsSimplexAgentConnectionMode::Direct;
            connection.status = RadrootsSimplexAgentConnectionStatus::JoinPending;
            connection.invitation = Some(invitation);
            connection.ratchet_state = Some(ratchet_state);
        }
        self.store.add_queue(
            connection_id,
            send_descriptor.clone(),
            RadrootsSimplexAgentQueueRole::Send,
            true,
            send_auth_state,
        )?;
        self.store.add_queue(
            connection_id,
            receive_descriptor.clone(),
            RadrootsSimplexAgentQueueRole::Receive,
            true,
            receive_auth_state,
        )?;
        {
            let connection = self.store.connection_mut(connection_id)?;
            connection.local_e2e_public_key = Some(local_e2e_keypair.public_key.clone());
            connection.local_e2e_private_key = Some(local_e2e_keypair.private_key);
            connection.local_x3dh_key_1 = Some(agent_x3dh_keypair(local_x3dh_key_1));
            connection.local_x3dh_key_2 = Some(agent_x3dh_keypair(local_x3dh_key_2));
            connection.local_pq_keypair = local_pq_keypair.map(agent_pq_keypair);
            connection.shared_secret = Some(shared_secret);
            // Attach the delivery private key to the reply receive queue added above.
            let queue = connection
                .queues
                .iter_mut()
                .find(|queue| {
                    queue.descriptor.queue_address() == receive_descriptor.queue_address()
                })
                .ok_or_else(|| {
                    RadrootsSimplexAgentRuntimeError::Runtime(
                        "SimpleX reply receive queue missing after join_connection".into(),
                    )
                })?;
            queue.delivery_private_key = Some(delivery_keypair.private_key);
        }
        // A `SecureQueue` command is only needed when the caller did not pass an
        // auth state.
        if !send_queue_is_secured {
            self.store.enqueue_command(
                connection_id,
                RadrootsSimplexAgentPendingCommandKind::SecureQueue {
                    queue: send_descriptor.queue_address(),
                    sender_key: send_descriptor.sender_key.clone(),
                },
                now,
            )?;
        }
        self.store.enqueue_command(
            connection_id,
            RadrootsSimplexAgentPendingCommandKind::CreateQueue {
                descriptor: receive_descriptor.clone(),
            },
            now,
        )?;
        Ok(())
    }

    /// Approves a connection that is awaiting approval.
    ///
    /// Sets the status to `Allowed`, encrypts `local_info` as a connection-info
    /// payload, enqueues it as a `Confirmation` envelope on the primary send queue,
    /// enqueues a hello and flushes the store.
    ///
    /// # Errors
    ///
    /// Returns a `Runtime` error when the connection is not in `AwaitingApproval`
    /// status, and propagates store, encoding and encryption errors. The status is
    /// changed before the later steps run, so it stays `Allowed` if one of them fails.
    pub fn allow_connection(
        &mut self,
        connection_id: &str,
        local_info: Vec<u8>,
        now: u64,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        if self.store.connection(connection_id)?.status
            != RadrootsSimplexAgentConnectionStatus::AwaitingApproval
        {
            return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
                "SimpleX connection `{connection_id}` is not awaiting approval"
            )));
        }
        self.store
            .set_status(connection_id, RadrootsSimplexAgentConnectionStatus::Allowed)?;
        let send_queue = self.store.primary_send_queue(connection_id)?;
        let encrypted = self.next_encrypted_payload(
            connection_id,
            encode_decrypted_message(&RadrootsSimplexAgentDecryptedMessage::ConnectionInfo(
                local_info,
            ))?,
            SimplexAgentPayloadKind::ConnectionInfo,
        )?;
        self.store.enqueue_command(
            connection_id,
            RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
                queue: send_queue.descriptor.queue_address(),
                envelope: RadrootsSimplexAgentEnvelope::Confirmation {
                    reply_queue: false,
                    e2e_ratchet_params: None,
                    encrypted,
                },
                delivery: None,
            },
            now,
        )?;
        self.enqueue_hello(connection_id, now)?;
        self.flush_store()?;
        Ok(())
    }

    /// Enqueues a `SubscribeQueue` command for every receive queue of the connection,
    /// emits a `SubscriptionQueued` event and flushes the store.
    pub fn subscribe_connection(
        &mut self,
        connection_id: &str,
        now: u64,
    ) -> Result<(),
RadrootsSimplexAgentRuntimeError> {
        for queue in self.store.receive_queues(connection_id)? {
            self.store.enqueue_command(
                connection_id,
                RadrootsSimplexAgentPendingCommandKind::SubscribeQueue {
                    queue: queue.descriptor.queue_address(),
                },
                now,
            )?;
        }
        // The event is emitted even when the connection has no receive queues.
        self.events
            .push_back(RadrootsSimplexAgentRuntimeEvent::SubscriptionQueued {
                connection_id: connection_id.into(),
            });
        self.flush_store()?;
        Ok(())
    }

    /// Re-enqueues `SubscribeQueue` commands for the store's subscribed receive
    /// queues that have no such command pending.
    ///
    /// Emits one `SubscriptionQueued` event per affected connection, in first-seen
    /// order, and flushes the store only when something was queued.
    ///
    /// Returns the number of commands queued.
    ///
    /// # Errors
    ///
    /// Propagates store errors. An enqueue failure returns before any event is
    /// emitted or the store is flushed.
    pub fn resume_subscriptions(
        &mut self,
        now: u64,
    ) -> Result<usize, RadrootsSimplexAgentRuntimeError> {
        let mut queued = 0_usize;
        let mut event_connections = Vec::new();
        for (connection_id, queue) in self.store.subscribed_receive_queues() {
            if self
                .store
                .has_pending_subscribe_queue(&connection_id, &queue)
            {
                continue;
            }
            self.store.enqueue_command(
                &connection_id,
                RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue },
                now,
            )?;
            queued = queued.saturating_add(1);
            if !event_connections.contains(&connection_id) {
                event_connections.push(connection_id);
            }
        }
        for connection_id in event_connections {
            self.events
                .push_back(RadrootsSimplexAgentRuntimeEvent::SubscriptionQueued { connection_id });
        }
        if queued > 0 {
            self.flush_store()?;
        }
        Ok(queued)
    }

    /// Enqueues a `GetQueueMessage` command for every receive queue of the connection
    /// and flushes the store.
    pub fn get_connection_message(
        &mut self,
        connection_id: &str,
        now: u64,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        for queue in self.store.receive_queues(connection_id)? {
            self.store.enqueue_command(
                connection_id,
                RadrootsSimplexAgentPendingCommandKind::GetQueueMessage {
                    queue: queue.descriptor.queue_address(),
                },
                now,
            )?;
        }
        self.flush_store()?;
        Ok(())
    }

    /// Queues a user message for delivery on the connection's primary send queue.
    ///
    /// The message id is the last sent id plus one (starting at 1), and the frame
    /// carries the hash of the previously sent message (empty when there is none).
    /// The encoded frame is hashed with SHA-256, registered with the store as the
    /// prepared outbound message, encrypted, and enqueued as a `Message` envelope.
    /// A `MessageQueued` event is emitted and the store is flushed.
    ///
    /// Returns the id assigned to the message.
    ///
    /// # Errors
    ///
    /// Returns a `Runtime` error when the connection is not `Connected`, a
    /// `Store(PendingOutboundMessage)` error when an outbound message is already
    /// staged, and propagates store, encoding and encryption errors.
    pub fn send_message(
        &mut self,
        connection_id: &str,
        body: Vec<u8>,
        now: u64,
    ) -> Result<u64, RadrootsSimplexAgentRuntimeError> {
        let send_queue = self.store.primary_send_queue(connection_id)?;
        let connection = self.store.connection(connection_id)?;
        if connection.status != RadrootsSimplexAgentConnectionStatus::Connected {
            return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
                "SimpleX connection `{connection_id}` is not connected"
            )));
        }
        if connection.staged_outbound_message.is_some() {
            return Err(RadrootsSimplexAgentRuntimeError::Store(
                radroots_simplex_agent_store::prelude::RadrootsSimplexAgentStoreError::PendingOutboundMessage(
                    connection_id.into(),
                ),
            ));
        }
        let previous_hash = connection
            .delivery_cursor
            .last_sent_message_hash
            .clone()
            .unwrap_or_default();
        let message_id = connection
            .delivery_cursor
            .last_sent_message_id
            .unwrap_or(0)
            .saturating_add(1);
        let frame = RadrootsSimplexAgentMessageFrame {
            header: RadrootsSimplexAgentMessageHeader {
                message_id,
                previous_message_hash: previous_hash,
            },
            message: RadrootsSimplexAgentMessage::UserMessage(body),
            padding: Vec::new(),
        };
        // Despite its name, `ciphertext` holds the encoded frame before encryption;
        // the hash below is taken over these bytes.
        let ciphertext =
            encode_decrypted_message(&RadrootsSimplexAgentDecryptedMessage::Message(frame))?;
        let message_hash = Sha256::digest(&ciphertext).to_vec();
        // The hash is not needed again in this function, so it is moved rather than
        // cloned.
        let prepared = self
            .store
            .prepare_outbound_message(connection_id, message_hash)?;
        let encrypted = self.next_encrypted_payload(
            connection_id,
            ciphertext,
            SimplexAgentPayloadKind::Message,
        )?;
        self.store.enqueue_command(
            connection_id,
            RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
                queue: send_queue.descriptor.queue_address(),
                envelope: RadrootsSimplexAgentEnvelope::Message(encrypted),
                delivery: Some(RadrootsSimplexAgentOutboundMessage {
                    message_id: prepared.message_id,
                    message_hash: prepared.message_hash,
                }),
            },
            now,
        )?;
        self.events
            .push_back(RadrootsSimplexAgentRuntimeEvent::MessageQueued {
                connection_id: connection_id.into(),
                message_id,
            });
        self.flush_store()?;
        Ok(message_id)
    }

    /// Enqueues a hello for the connection and flushes the store.
    pub fn send_hello(
        &mut self,
        connection_id: &str,
        now: u64,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        self.enqueue_hello(connection_id, now)?;
        self.flush_store()?;
        Ok(())
    }

    /// Queues an acknowledgement, with a receipt, for a received message.
    ///
    /// Does nothing when a matching acknowledgement is already pending. Otherwise the
    /// store's ACK target for the message is looked up and an `AckInboxMessage`
    /// command carrying the receipt is enqueued, then the store is flushed.
    ///
    /// # Errors
    ///
    /// Returns a `Runtime` error when the store has no ACK target for the message,
    /// and propagates store errors.
    pub fn ack_message(
        &mut self,
        connection_id: &str,
        message_id: u64,
        message_hash: Vec<u8>,
        receipt_info: Vec<u8>,
        now: u64,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        if self
            .store
            .has_pending_ack_message(connection_id, message_id, &message_hash)
        {
            return Ok(());
        }
        let (receive_queue, broker_message_id) = self
            .store
            .inbound_ack_target(connection_id, message_id, &message_hash)?
.ok_or_else(|| {
                RadrootsSimplexAgentRuntimeError::Runtime(format!(
                    "SimpleX connection `{connection_id}` has no frame-specific ACK target for message `{message_id}`"
                ))
            })?;
        self.store.enqueue_command(
            connection_id,
            RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
                queue: receive_queue,
                broker_message_id,
                receipt: Some(RadrootsSimplexAgentMessageReceipt {
                    message_id,
                    message_hash,
                    receipt_info,
                }),
            },
            now,
        )?;
        self.flush_store()?;
        Ok(())
    }

    /// Acknowledges `message_id` using the hash of the last received message recorded
    /// in the connection's delivery cursor.
    ///
    /// # Errors
    ///
    /// Returns a `Runtime` error when no received message hash is recorded, and
    /// otherwise behaves like `ack_message`.
    pub fn ack_last_received_message(
        &mut self,
        connection_id: &str,
        message_id: u64,
        receipt_info: Vec<u8>,
        now: u64,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        let message_hash = self
            .store
            .connection(connection_id)?
            .delivery_cursor
            .last_received_message_hash
            .clone()
            .ok_or_else(|| {
                RadrootsSimplexAgentRuntimeError::Runtime(format!(
                    "SimpleX connection `{connection_id}` has no received message hash to acknowledge"
                ))
            })?;
        self.ack_message(connection_id, message_id, message_hash, receipt_info, now)
    }

    /// Enqueues a receipt-less `AckInboxMessage` command for a broker message unless
    /// one is already pending. Does not flush the store.
    fn ack_broker_message(
        &mut self,
        connection_id: &str,
        queue: RadrootsSimplexAgentQueueAddress,
        broker_message_id: Vec<u8>,
        now: u64,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        if self
            .store
            .has_pending_broker_ack(connection_id, &queue, &broker_message_id)
        {
            return Ok(());
        }
        self.store.enqueue_command(
            connection_id,
            RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
                queue,
                broker_message_id,
                receipt: None,
            },
            now,
        )?;
        Ok(())
    }

    /// Re-subscribes the connection and reschedules the commands that are ready.
    ///
    /// After `subscribe_connection`, every command the store reports as ready at
    /// `now` is marked for retry at `now + retry_delay_ms` (saturating at `u64::MAX`)
    /// and a `RetryQueued` event is emitted for it. The store is flushed at the end.
    ///
    /// # Errors
    ///
    /// Propagates store errors.
    pub fn reconnect_connection(
        &mut self,
        connection_id: &str,
        now: u64,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        self.subscribe_connection(connection_id, now)?;
        // Saturate instead of overflowing: the retry delay is caller-configurable
        // and `now` is caller-supplied.
        let retry_at = now.saturating_add(self.retry_delay_ms);
        let ready = self.store.take_ready_commands(now, usize::MAX);
        for command in ready {
            self.store.mark_command_retry(command.id, retry_at)?;
            // NOTE(review): the ready commands are taken without a connection filter
            // here, yet each event is attributed to `connection_id`; confirm
            // `take_ready_commands` cannot return other connections' commands.
            self.events
                .push_back(RadrootsSimplexAgentRuntimeEvent::RetryQueued {
                    connection_id: connection_id.into(),
                    command_id: command.id,
                });
        }
        self.flush_store()?;
        Ok(())
    }

    /// Marks the connection as `Rotating`, enqueues a `RotateQueues` command with
    /// `descriptors`, emits a `QueueRotationQueued` event and flushes the store.
    pub fn queue_rotation(
        &mut self,
        connection_id: &str,
        descriptors: Vec<RadrootsSimplexAgentQueueDescriptor>,
        now: u64,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        self.store.set_status(
            connection_id,
            RadrootsSimplexAgentConnectionStatus::Rotating,
        )?;
        self.store.enqueue_command(
            connection_id,
            RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors },
            now,
        )?;
        self.events
            .push_back(RadrootsSimplexAgentRuntimeEvent::QueueRotationQueued {
                connection_id: connection_id.into(),
            });
        self.flush_store()?;
        Ok(())
    }

    pub fn handle_inbound_decrypted_message(
        &mut self,
        connection_id: &str,
        message: RadrootsSimplexAgentDecryptedMessage,
        message_hash: Vec<u8>,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        match message {
            RadrootsSimplexAgentDecryptedMessage::ConnectionInfo(info) => {
                if self.store.connection(connection_id)?.status
                    != RadrootsSimplexAgentConnectionStatus::Connected
                {
                    self.store
                        .set_status(connection_id, RadrootsSimplexAgentConnectionStatus::Allowed)?;
                }
                self.enqueue_hello(connection_id, 0)?;
                self.events
                    .push_back(RadrootsSimplexAgentRuntimeEvent::ConnectionInfo {
                        connection_id: connection_id.into(),
                        info,
                    });
            }
            RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply { reply_queues, info } => {
                let mut secure_queues = Vec::new();
                for descriptor in reply_queues {
                    let queue_address = descriptor.queue_address();
                    if let Ok(existing_queue) =
                        self.store.queue_record(connection_id, &queue_address)
                        && existing_queue.role ==
RadrootsSimplexAgentQueueRole::Send 926 { 927 continue; 928 } 929 let auth_state = self.store.generate_queue_auth_state()?; 930 let mut descriptor = descriptor; 931 descriptor.sender_key = Some(auth_state.public_key.clone()); 932 let secure_queue = descriptor.queue_address(); 933 let sender_key = descriptor.sender_key.clone(); 934 self.store.add_queue( 935 connection_id, 936 descriptor, 937 RadrootsSimplexAgentQueueRole::Send, 938 true, 939 auth_state, 940 )?; 941 secure_queues.push((secure_queue, sender_key)); 942 } 943 if secure_queues.is_empty() 944 && matches!( 945 self.store.connection(connection_id)?.status, 946 RadrootsSimplexAgentConnectionStatus::AwaitingApproval 947 | RadrootsSimplexAgentConnectionStatus::Allowed 948 | RadrootsSimplexAgentConnectionStatus::Connected 949 ) 950 { 951 return Ok(()); 952 } 953 self.store.set_status( 954 connection_id, 955 RadrootsSimplexAgentConnectionStatus::AwaitingApproval, 956 )?; 957 for (queue, sender_key) in secure_queues { 958 self.store.enqueue_command( 959 connection_id, 960 RadrootsSimplexAgentPendingCommandKind::SecureQueue { queue, sender_key }, 961 0, 962 )?; 963 } 964 self.events 965 .push_back(RadrootsSimplexAgentRuntimeEvent::ConfirmationRequired { 966 connection_id: connection_id.into(), 967 }); 968 self.events 969 .push_back(RadrootsSimplexAgentRuntimeEvent::ConnectionInfo { 970 connection_id: connection_id.into(), 971 info, 972 }); 973 } 974 RadrootsSimplexAgentDecryptedMessage::RatchetInfo(info) => { 975 self.events 976 .push_back(RadrootsSimplexAgentRuntimeEvent::ConnectionInfo { 977 connection_id: connection_id.into(), 978 info, 979 }); 980 } 981 RadrootsSimplexAgentDecryptedMessage::Message(frame) => match frame.message { 982 RadrootsSimplexAgentMessage::Hello => { 983 let connection = self.store.connection(connection_id)?; 984 let was_connected = 985 connection.status == RadrootsSimplexAgentConnectionStatus::Connected; 986 let should_send_hello = !connection.hello_sent; 987 { 988 let connection = 
self.store.connection_mut(connection_id)?; 989 connection.hello_received = true; 990 } 991 if should_send_hello { 992 self.enqueue_hello(connection_id, 0)?; 993 } 994 if !was_connected { 995 self.store.set_status( 996 connection_id, 997 RadrootsSimplexAgentConnectionStatus::Connected, 998 )?; 999 self.events.push_back( 1000 RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished { 1001 connection_id: connection_id.into(), 1002 }, 1003 ); 1004 } 1005 } 1006 RadrootsSimplexAgentMessage::Receipt(receipt) => { 1007 let Some(stored_message_hash) = self 1008 .store 1009 .outbound_message_hash(connection_id, receipt.message_id)? 1010 else { 1011 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( 1012 "SimpleX receipt for `{connection_id}` referenced unknown outbound message `{}`", 1013 receipt.message_id 1014 ))); 1015 }; 1016 if stored_message_hash != receipt.message_hash { 1017 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( 1018 "SimpleX receipt for `{connection_id}` message `{}` did not match stored outbound message hash", 1019 receipt.message_id 1020 ))); 1021 } 1022 self.events 1023 .push_back(RadrootsSimplexAgentRuntimeEvent::MessageAcknowledged { 1024 connection_id: connection_id.into(), 1025 message_id: receipt.message_id, 1026 message_hash: receipt.message_hash, 1027 }); 1028 } 1029 RadrootsSimplexAgentMessage::QueueAdd(_) 1030 | RadrootsSimplexAgentMessage::QueueKey(_) 1031 | RadrootsSimplexAgentMessage::QueueUse(_) 1032 | RadrootsSimplexAgentMessage::QueueTest(_) 1033 | RadrootsSimplexAgentMessage::QueueContinue(_) => { 1034 self.events 1035 .push_back(RadrootsSimplexAgentRuntimeEvent::QueueRotationQueued { 1036 connection_id: connection_id.into(), 1037 }); 1038 } 1039 RadrootsSimplexAgentMessage::UserMessage(body) => { 1040 let broker_message_id_hash = self 1041 .store 1042 .connection(connection_id)? 
.last_received_broker_message_id
                        .as_ref()
                        .map(|broker_message_id| Sha256::digest(broker_message_id).to_vec())
                        .unwrap_or_default();
                    self.events
                        .push_back(RadrootsSimplexAgentRuntimeEvent::MessageReceived {
                            connection_id: connection_id.into(),
                            message_id: frame.header.message_id,
                            broker_message_id_hash,
                            message_hash,
                            body,
                        });
                }
                _ => {}
            },
        }
        self.flush_store()?;
        Ok(())
    }

    /// Records the final or intermediate outcome of command `command_id`.
    ///
    /// * `Delivered` marks the command delivered and runs its delivery side
    ///   effects.
    /// * `RetryAt` reschedules the command and emits `RetryQueued`.
    /// * `Failed` marks the command failed, runs its failure side effects, and
    ///   emits an `Error` event carrying `message`.
    ///
    /// The store is flushed after the outcome has been applied.
    ///
    /// # Errors
    ///
    /// Propagates errors from the store updates, the side effects, or the flush.
    pub fn record_command_outcome(
        &mut self,
        command_id: u64,
        outcome: RadrootsSimplexAgentCommandOutcome,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        match outcome {
            RadrootsSimplexAgentCommandOutcome::Delivered => {
                let command = self.store.mark_command_delivered(command_id)?;
                self.apply_delivery_side_effects(&command)?;
            }
            RadrootsSimplexAgentCommandOutcome::RetryAt { ready_at } => {
                let command = self.store.mark_command_retry(command_id, ready_at)?;
                self.events
                    .push_back(RadrootsSimplexAgentRuntimeEvent::RetryQueued {
                        connection_id: command.connection_id,
                        command_id,
                    });
            }
            RadrootsSimplexAgentCommandOutcome::Failed { message } => {
                let command = self.store.mark_command_failed(command_id)?;
                self.apply_failure_side_effects(&command)?;
                self.events
                    .push_back(RadrootsSimplexAgentRuntimeEvent::Error {
                        connection_id: Some(command.connection_id),
                        message,
                    });
            }
        }
        self.flush_store()?;
        Ok(())
    }

    /// Dispatches up to `limit` commands that are ready at `now` through
    /// `transport`, taking them from the store in batches until the budget is
    /// used up or no ready command remains, then flushes the store.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while dispatching a command (the final
    /// flush is skipped in that case) or the error from the flush itself.
    pub fn execute_ready_commands<T: RadrootsSimplexSmpCommandTransport>(
        &mut self,
        transport: &mut T,
        now: u64,
        limit: usize,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        let mut remaining = limit;
        while remaining > 0 {
            let ready = self.store.take_ready_commands(now, remaining);
            if ready.is_empty() {
                break;
            }
            remaining = remaining.saturating_sub(ready.len());
            for command in ready {
                self.dispatch_ready_command(transport, &command, now)?;
            }
        }
        self.flush_store()?;
        Ok(())
    }

    /// Polls `transport` for subscription messages on every subscribed receive
    /// server, applying at most `limit` responses in total.
    ///
    /// A transport error for one server is reported as an `Error` event and
    /// polling moves on to the next server; it does not abort the call.
    ///
    /// # Errors
    ///
    /// Propagates errors from applying a response or from flushing the store.
    pub fn receive_subscription_messages<T: RadrootsSimplexSmpSubscriptionTransport>(
        &mut self,
        transport: &mut T,
        limit: usize,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        let mut remaining = limit;
        for server in self.store.subscribed_receive_servers() {
            while remaining > 0 {
                match transport.receive_subscription(RadrootsSimplexSmpSubscriptionReceiveRequest {
                    server: server.clone(),
                }) {
                    Ok(Some(response)) => {
                        self.apply_subscription_response(response)?;
                        remaining = remaining.saturating_sub(1);
                    }
                    // Nothing more is waiting on this server.
                    Ok(None) => break,
                    Err(error) => {
                        self.events
                            .push_back(RadrootsSimplexAgentRuntimeEvent::Error {
                                connection_id: None,
                                message: format!(
                                    "SimpleX subscription receive failed for server `{}`: {error}",
                                    server.server_identity
                                ),
                            });
                        break;
                    }
                }
            }
            if remaining == 0 {
                break;
            }
        }
        self.flush_store()?;
        Ok(())
    }

    /// Takes up to `limit` commands that are ready at `now` from the store and
    /// hands them to the caller.
    pub fn retry_pending(
        &mut self,
        now: u64,
        limit: usize,
    ) -> Vec<RadrootsSimplexAgentPendingCommand> {
        self.store.take_ready_commands(now, limit)
    }

    /// Removes and returns up to `max` queued runtime events, oldest first.
    pub fn drain_events(&mut self, max: usize) -> Vec<RadrootsSimplexAgentRuntimeEvent> {
        // Clamp to the queue length so the drain range is always in bounds.
        let take = self.events.len().min(max);
        self.events.drain(..take).collect()
    }

    fn dispatch_ready_command<T: RadrootsSimplexSmpCommandTransport>(
        &mut self,
        transport: &mut T,
        command: &RadrootsSimplexAgentPendingCommand,
        now: u64,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        match &command.kind {
            RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors } => {
                for descriptor in descriptors.clone() {
                    let auth_state =
self.store.generate_queue_auth_state()?; 1178 self.store.add_queue( 1179 &command.connection_id, 1180 descriptor, 1181 RadrootsSimplexAgentQueueRole::Receive, 1182 true, 1183 auth_state, 1184 )?; 1185 } 1186 self.record_command_outcome( 1187 command.id, 1188 RadrootsSimplexAgentCommandOutcome::Delivered, 1189 ) 1190 } 1191 RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => { 1192 for queue in queues { 1193 self.store 1194 .mark_queue_tested(&command.connection_id, queue)?; 1195 } 1196 self.record_command_outcome( 1197 command.id, 1198 RadrootsSimplexAgentCommandOutcome::Delivered, 1199 ) 1200 } 1201 _ => { 1202 let request = self.build_transport_request(command)?; 1203 match transport.execute(request) { 1204 Ok(response) => self.apply_transport_response(command, response), 1205 Err(error) => { 1206 self.events 1207 .push_back(RadrootsSimplexAgentRuntimeEvent::Error { 1208 connection_id: Some(command.connection_id.clone()), 1209 message: format!( 1210 "SimpleX transport execution failed for command `{}`: {error}", 1211 command.id 1212 ), 1213 }); 1214 self.record_command_outcome( 1215 command.id, 1216 RadrootsSimplexAgentCommandOutcome::RetryAt { 1217 ready_at: now + self.retry_delay_ms, 1218 }, 1219 ) 1220 } 1221 } 1222 } 1223 } 1224 } 1225 1226 fn build_transport_request( 1227 &self, 1228 command: &RadrootsSimplexAgentPendingCommand, 1229 ) -> Result<RadrootsSimplexSmpTransportRequest, RadrootsSimplexAgentRuntimeError> { 1230 match &command.kind { 1231 RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData { 1232 invitation, 1233 sender_auth_state, 1234 .. 
1235 } => { 1236 let server = short_invitation_server(invitation)?; 1237 return Ok(RadrootsSimplexSmpTransportRequest { 1238 server, 1239 transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, 1240 correlation_id: Some(self.command_correlation_id(command)?), 1241 entity_id: invitation.link_id.clone(), 1242 command: RadrootsSimplexSmpCommand::LKey( 1243 encode_ed25519_public_key_x509(&sender_auth_state.public_key).map_err( 1244 |error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()), 1245 )?, 1246 ), 1247 authorization: RadrootsSimplexSmpCommandAuthorization::Ed25519( 1248 radroots_simplex_smp_crypto::prelude::RadrootsSimplexSmpEd25519Keypair { 1249 public_key: sender_auth_state.public_key.clone(), 1250 private_key: sender_auth_state.private_key.clone(), 1251 }, 1252 ), 1253 }); 1254 } 1255 RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData { invitation, .. } => { 1256 let server = short_invitation_server(invitation)?; 1257 return Ok(self.server_transport_request( 1258 command.id, 1259 &server, 1260 invitation.link_id.clone(), 1261 RadrootsSimplexSmpCommand::LGet, 1262 )); 1263 } 1264 _ => {} 1265 } 1266 let (queue_address, entity_id, smp_command) = self.command_transport_parts(command)?; 1267 let queue = self 1268 .store 1269 .queue_record(&command.connection_id, &queue_address)?; 1270 let auth = queue.auth_state.ok_or_else(|| { 1271 RadrootsSimplexAgentRuntimeError::Store( 1272 radroots_simplex_agent_store::prelude::RadrootsSimplexAgentStoreError::QueueAuthStateMissing( 1273 command.connection_id.clone(), 1274 ), 1275 ) 1276 })?; 1277 let correlation_id = self.command_correlation_id(command)?; 1278 let authorization = RadrootsSimplexSmpCommandAuthorization::Ed25519( 1279 radroots_simplex_smp_crypto::prelude::RadrootsSimplexSmpEd25519Keypair { 1280 public_key: auth.public_key, 1281 private_key: auth.private_key, 1282 }, 1283 ); 1284 Ok(RadrootsSimplexSmpTransportRequest { 1285 server: queue.descriptor.queue_uri.server.clone(), 1286 
transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, 1287 correlation_id: Some(correlation_id), 1288 entity_id, 1289 command: smp_command, 1290 authorization, 1291 }) 1292 } 1293 1294 fn server_transport_request( 1295 &self, 1296 command_id: u64, 1297 server: &RadrootsSimplexSmpServerAddress, 1298 entity_id: Vec<u8>, 1299 command: RadrootsSimplexSmpCommand, 1300 ) -> RadrootsSimplexSmpTransportRequest { 1301 let correlation_id = correlation_id_from_material( 1302 b"simplex-server-command-correlation", 1303 &[ 1304 command_id.to_be_bytes().to_vec(), 1305 server.server_identity.as_bytes().to_vec(), 1306 entity_id.clone(), 1307 ], 1308 ); 1309 RadrootsSimplexSmpTransportRequest { 1310 server: server.clone(), 1311 transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, 1312 correlation_id: Some(correlation_id), 1313 entity_id, 1314 command, 1315 authorization: RadrootsSimplexSmpCommandAuthorization::None, 1316 } 1317 } 1318 1319 fn command_correlation_id( 1320 &self, 1321 command: &RadrootsSimplexAgentPendingCommand, 1322 ) -> Result<RadrootsSimplexSmpCorrelationId, RadrootsSimplexAgentRuntimeError> { 1323 let mut parts = vec![ 1324 command.id.to_be_bytes().to_vec(), 1325 command.connection_id.as_bytes().to_vec(), 1326 ]; 1327 if let Some(queue_address) = queue_for_command(command) { 1328 parts.push(queue_address.server.server_identity.as_bytes().to_vec()); 1329 parts.push(queue_address.sender_id.clone()); 1330 parts.push( 1331 self.store 1332 .queue_auth_state(&command.connection_id, &queue_address)? 1333 .public_key, 1334 ); 1335 } 1336 if matches!( 1337 command.kind, 1338 RadrootsSimplexAgentPendingCommandKind::CreateQueue { .. } 1339 ) && let Some(short_link) = self 1340 .store 1341 .connection(&command.connection_id)? 1342 .short_link 1343 .as_ref() 1344 { 1345 parts.push(short_link.link_key.clone()); 1346 } 1347 if let RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData { 1348 sender_auth_state, 1349 .. 
1350 } = &command.kind 1351 { 1352 parts.push(sender_auth_state.public_key.clone()); 1353 } 1354 Ok(correlation_id_from_material( 1355 b"simplex-command-correlation", 1356 &parts, 1357 )) 1358 } 1359 1360 fn command_transport_parts( 1361 &self, 1362 command: &RadrootsSimplexAgentPendingCommand, 1363 ) -> Result< 1364 ( 1365 radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentQueueAddress, 1366 Vec<u8>, 1367 RadrootsSimplexSmpCommand, 1368 ), 1369 RadrootsSimplexAgentRuntimeError, 1370 > { 1371 match &command.kind { 1372 RadrootsSimplexAgentPendingCommandKind::CreateQueue { descriptor } => { 1373 let correlation_id = self.command_correlation_id(command)?; 1374 let auth_state = self 1375 .store 1376 .queue_auth_state(&command.connection_id, &descriptor.queue_address())?; 1377 let delivery_private_key = self 1378 .store 1379 .queue_record(&command.connection_id, &descriptor.queue_address())? 1380 .delivery_private_key 1381 .ok_or_else(|| { 1382 RadrootsSimplexAgentRuntimeError::Runtime( 1383 "SimpleX receive queue missing delivery private key".into(), 1384 ) 1385 })?; 1386 Ok(( 1387 descriptor.queue_address(), 1388 Vec::new(), 1389 RadrootsSimplexSmpCommand::New(RadrootsSimplexSmpNewQueueRequest { 1390 recipient_auth_public_key: encode_ed25519_public_key_x509( 1391 &auth_state.public_key, 1392 ) 1393 .map_err(|error| { 1394 RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()) 1395 })?, 1396 recipient_dh_public_key: encode_x25519_public_key_x509( 1397 &RadrootsSimplexSmpX25519Keypair::public_key_from_private( 1398 &delivery_private_key, 1399 ) 1400 .map_err(|error| { 1401 RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()) 1402 })?, 1403 ) 1404 .map_err(|error| { 1405 RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()) 1406 })?, 1407 basic_auth: None, 1408 subscription_mode: RadrootsSimplexSmpSubscriptionMode::OnlyCreate, 1409 queue_request_data: Some( 1410 match descriptor 1411 .queue_uri 1412 .queue_mode 1413 
.unwrap_or(RadrootsSimplexSmpQueueMode::Messaging) 1414 { 1415 RadrootsSimplexSmpQueueMode::Messaging => { 1416 RadrootsSimplexSmpQueueRequestData::Messaging( 1417 self.short_link_messaging_queue_request( 1418 &command.connection_id, 1419 &correlation_id, 1420 )?, 1421 ) 1422 } 1423 RadrootsSimplexSmpQueueMode::Contact => { 1424 RadrootsSimplexSmpQueueRequestData::Contact(None) 1425 } 1426 }, 1427 ), 1428 notifier_credentials: None, 1429 }), 1430 )) 1431 } 1432 RadrootsSimplexAgentPendingCommandKind::SecureQueue { queue, sender_key } => Ok(( 1433 queue.clone(), 1434 queue.sender_id.clone(), 1435 RadrootsSimplexSmpCommand::SKey( 1436 encode_ed25519_public_key_x509(sender_key.as_deref().unwrap_or_default()) 1437 .map_err(|error| { 1438 RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()) 1439 })?, 1440 ), 1441 )), 1442 RadrootsSimplexAgentPendingCommandKind::SendEnvelope { 1443 queue, envelope, .. 1444 } => Ok(( 1445 queue.clone(), 1446 queue.sender_id.clone(), 1447 RadrootsSimplexSmpCommand::Send(RadrootsSimplexSmpSendCommand { 1448 flags: RadrootsSimplexSmpMessageFlags::notifications_enabled(), 1449 message_body: self.encode_smp_message_body(&command.connection_id, envelope)?, 1450 }), 1451 )), 1452 RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue } => Ok(( 1453 queue.clone(), 1454 self.store 1455 .queue_record(&command.connection_id, queue)? 1456 .entity_id, 1457 RadrootsSimplexSmpCommand::Sub, 1458 )), 1459 RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { queue } => Ok(( 1460 queue.clone(), 1461 self.store 1462 .queue_record(&command.connection_id, queue)? 1463 .entity_id, 1464 RadrootsSimplexSmpCommand::Get, 1465 )), 1466 RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { 1467 queue, 1468 broker_message_id, 1469 .. 1470 } => Ok(( 1471 queue.clone(), 1472 self.store 1473 .queue_record(&command.connection_id, queue)? 
1474 .entity_id, 1475 RadrootsSimplexSmpCommand::Ack(broker_message_id.clone()), 1476 )), 1477 RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors } => { 1478 let address = descriptors 1479 .first() 1480 .ok_or_else(|| { 1481 RadrootsSimplexAgentRuntimeError::Runtime( 1482 "queue rotation command requires at least one descriptor".into(), 1483 ) 1484 })? 1485 .queue_address(); 1486 let entity_id = address.sender_id.clone(); 1487 Ok((address, entity_id, RadrootsSimplexSmpCommand::Que)) 1488 } 1489 RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => { 1490 let address = queues.first().cloned().ok_or_else(|| { 1491 RadrootsSimplexAgentRuntimeError::Runtime( 1492 "queue test command requires at least one queue".into(), 1493 ) 1494 })?; 1495 let entity_id = address.sender_id.clone(); 1496 Ok((address, entity_id, RadrootsSimplexSmpCommand::Ping)) 1497 } 1498 RadrootsSimplexAgentPendingCommandKind::SetQueueLinkData { 1499 queue, 1500 link_id, 1501 link_data, 1502 } => Ok(( 1503 queue.clone(), 1504 self.store 1505 .queue_record(&command.connection_id, queue)? 1506 .entity_id, 1507 RadrootsSimplexSmpCommand::LSet { 1508 link_id: link_id.clone(), 1509 link_data: link_data.clone(), 1510 }, 1511 )), 1512 RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData { .. } 1513 | RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData { .. 
} => {
                Err(RadrootsSimplexAgentRuntimeError::Runtime(
                    "SimpleX short-link retrieval commands require server transport dispatch"
                        .into(),
                ))
            }
        }
    }

    /// Builds the short-link payload attached to a messaging-queue creation
    /// request.
    ///
    /// Returns `Ok(None)` unless the connection is in `CreatePending` and has
    /// short-link state. Otherwise the sender id is derived from
    /// `correlation_id` and the stored encrypted fixed and user data are used as
    /// link data.
    ///
    /// # Errors
    ///
    /// Returns a runtime error when the encrypted fixed data or the encrypted
    /// user data is missing from the short-link state.
    fn short_link_messaging_queue_request(
        &self,
        connection_id: &str,
        correlation_id: &RadrootsSimplexSmpCorrelationId,
    ) -> Result<Option<RadrootsSimplexSmpMessagingQueueRequest>, RadrootsSimplexAgentRuntimeError>
    {
        let connection = self.store.connection(connection_id)?;
        if connection.status != RadrootsSimplexAgentConnectionStatus::CreatePending {
            return Ok(None);
        }
        let Some(short_link) = connection.short_link.as_ref() else {
            return Ok(None);
        };
        let fixed_data = short_link.encrypted_fixed_data.clone().ok_or_else(|| {
            RadrootsSimplexAgentRuntimeError::Runtime(format!(
                "SimpleX connection `{connection_id}` is missing encrypted short-link fixed data"
            ))
        })?;
        let user_data = short_link.encrypted_user_data.clone().ok_or_else(|| {
            RadrootsSimplexAgentRuntimeError::Runtime(format!(
                "SimpleX connection `{connection_id}` is missing encrypted short-link user data"
            ))
        })?;
        Ok(Some(RadrootsSimplexSmpMessagingQueueRequest {
            sender_id: short_link_sender_id(correlation_id),
            link_data: RadrootsSimplexSmpQueueLinkData {
                fixed_data,
                user_data,
            },
        }))
    }

    /// Handles a broker `LNK` response for a short-link retrieval command.
    ///
    /// The link data is decrypted with the command's invitation, the decoded
    /// invitation queue takes the broker-provided `sender_id` (URL-safe base64,
    /// unpadded), and the join is prepared. Only `SecureGetQueueLinkData`
    /// forwards its sender auth state; `GetQueueLinkData` passes `None`.
    ///
    /// # Errors
    ///
    /// Returns a runtime error when `command` is not a short-link retrieval
    /// command, and propagates decryption and join-preparation errors.
    fn process_short_link_response(
        &mut self,
        command: &RadrootsSimplexAgentPendingCommand,
        sender_id: Vec<u8>,
        link_data: RadrootsSimplexSmpQueueLinkData,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        // Both retrieval variants share the same flow; they differ only in
        // whether a sender auth state accompanies the join.
        let (invitation, reply_queue, sender_auth_state) = match &command.kind {
            RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData {
                invitation,
                reply_queue,
            } => (invitation, reply_queue, None),
            RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
                invitation,
                reply_queue,
                sender_auth_state,
            } => (invitation, reply_queue, Some(sender_auth_state)),
            _ => {
                return Err(RadrootsSimplexAgentRuntimeError::Runtime(
                    "SimpleX LNK response received for non-retrieval command".into(),
                ));
            }
        };
        let mut connection_link = decrypt_short_invitation_link_data(invitation, &link_data)?;
        connection_link.invitation_queue.sender_id = URL_SAFE_NO_PAD.encode(sender_id);
        self.prepare_join_connection(
            &command.connection_id,
            connection_link,
            reply_queue.clone(),
            command.ready_at,
            sender_auth_state.cloned(),
        )
    }

    fn apply_transport_response(
        &mut self,
        command: &RadrootsSimplexAgentPendingCommand,
        response: RadrootsSimplexSmpTransportResponse,
    ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
        match response.transmission.message {
            RadrootsSimplexSmpBrokerMessage::Err(error)
                if is_empty_queue_no_msg(command, &error) =>
            {
                self.record_command_outcome(
                    command.id,
                    RadrootsSimplexAgentCommandOutcome::Delivered,
                )
            }
            RadrootsSimplexSmpBrokerMessage::Err(error) => self.record_command_outcome(
                command.id,
                RadrootsSimplexAgentCommandOutcome::Failed {
                    message: format!(
                        "SimpleX broker rejected command `{}` ({}): {:?}",
                        command.id,
                        pending_command_kind_label(command),
                        error
                    ),
                },
            ),
            RadrootsSimplexSmpBrokerMessage::Ids(ids) => {
                self.process_queue_ids_response(command, ids)?;
                self.record_command_outcome(
                    command.id,
                    RadrootsSimplexAgentCommandOutcome::Delivered,
                )
            }
            RadrootsSimplexSmpBrokerMessage::Msg(message) => {
                let queue = queue_for_command(command).ok_or_else(|| {
RadrootsSimplexAgentRuntimeError::Runtime(format!( 1632 "SimpleX command `{}` has no queue context for broker message", 1633 command.id 1634 )) 1635 })?; 1636 self.process_received_message_response( 1637 &command.connection_id, 1638 &queue, 1639 message, 1640 response.transport_hash, 1641 )?; 1642 self.record_command_outcome( 1643 command.id, 1644 RadrootsSimplexAgentCommandOutcome::Delivered, 1645 ) 1646 } 1647 RadrootsSimplexSmpBrokerMessage::Lnk { 1648 sender_id, 1649 link_data, 1650 } => { 1651 self.process_short_link_response(command, sender_id, link_data)?; 1652 self.record_command_outcome( 1653 command.id, 1654 RadrootsSimplexAgentCommandOutcome::Delivered, 1655 ) 1656 } 1657 _ => self 1658 .record_command_outcome(command.id, RadrootsSimplexAgentCommandOutcome::Delivered), 1659 } 1660 } 1661 1662 fn apply_subscription_response( 1663 &mut self, 1664 response: RadrootsSimplexSmpTransportResponse, 1665 ) -> Result<(), RadrootsSimplexAgentRuntimeError> { 1666 let entity_id = response.transmission.entity_id.clone(); 1667 let (connection_id, queue) = self 1668 .store 1669 .receive_queue_by_entity_id(&response.server, &entity_id) 1670 .ok_or_else(|| { 1671 RadrootsSimplexAgentRuntimeError::Runtime(format!( 1672 "SimpleX subscription response for server `{}` used unknown queue entity `{}`", 1673 response.server.server_identity, 1674 URL_SAFE_NO_PAD.encode(&entity_id) 1675 )) 1676 })?; 1677 match response.transmission.message { 1678 RadrootsSimplexSmpBrokerMessage::Msg(message) => self 1679 .process_received_message_response( 1680 &connection_id, 1681 &queue, 1682 message, 1683 response.transport_hash, 1684 ), 1685 RadrootsSimplexSmpBrokerMessage::Err(RadrootsSimplexSmpError::NoMsg) => Ok(()), 1686 RadrootsSimplexSmpBrokerMessage::Err(error) => { 1687 self.events 1688 .push_back(RadrootsSimplexAgentRuntimeEvent::Error { 1689 connection_id: Some(connection_id), 1690 message: format!( 1691 "SimpleX subscription broker error for queue entity `{}`: {:?}", 1692 
URL_SAFE_NO_PAD.encode(&entity_id), 1693 error 1694 ), 1695 }); 1696 Ok(()) 1697 } 1698 _ => Ok(()), 1699 } 1700 } 1701 1702 fn apply_delivery_side_effects( 1703 &mut self, 1704 command: &RadrootsSimplexAgentPendingCommand, 1705 ) -> Result<(), RadrootsSimplexAgentRuntimeError> { 1706 match &command.kind { 1707 RadrootsSimplexAgentPendingCommandKind::SendEnvelope { 1708 delivery: Some(delivery), 1709 .. 1710 } => { 1711 let delivered = self 1712 .store 1713 .confirm_outbound_message(&command.connection_id, delivery.message_id)?; 1714 self.events 1715 .push_back(RadrootsSimplexAgentRuntimeEvent::OutboundMessageDelivered { 1716 connection_id: command.connection_id.clone(), 1717 message_id: delivered.message_id, 1718 message_hash: delivered.message_hash, 1719 }); 1720 let connection = self.store.connection(&command.connection_id)?; 1721 if connection.status == RadrootsSimplexAgentConnectionStatus::Allowed 1722 && connection.hello_sent 1723 && delivered.message_id == 1 1724 { 1725 self.store.set_status( 1726 &command.connection_id, 1727 RadrootsSimplexAgentConnectionStatus::Connected, 1728 )?; 1729 self.events.push_back( 1730 RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished { 1731 connection_id: command.connection_id.clone(), 1732 }, 1733 ); 1734 } 1735 } 1736 RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue } => { 1737 self.store 1738 .mark_queue_subscribed(&command.connection_id, queue)?; 1739 } 1740 RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => { 1741 for queue in queues { 1742 self.store 1743 .mark_queue_tested(&command.connection_id, queue)?; 1744 } 1745 } 1746 RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { 1747 receipt: Some(receipt), 1748 .. 
1749 } => { 1750 self.events.push_back( 1751 RadrootsSimplexAgentRuntimeEvent::InboundMessageAckDelivered { 1752 connection_id: command.connection_id.clone(), 1753 message_id: receipt.message_id, 1754 message_hash: receipt.message_hash.clone(), 1755 }, 1756 ); 1757 } 1758 _ => {} 1759 } 1760 Ok(()) 1761 } 1762 1763 fn apply_failure_side_effects( 1764 &mut self, 1765 command: &RadrootsSimplexAgentPendingCommand, 1766 ) -> Result<(), RadrootsSimplexAgentRuntimeError> { 1767 if let RadrootsSimplexAgentPendingCommandKind::SendEnvelope { 1768 delivery: Some(delivery), 1769 .. 1770 } = &command.kind 1771 { 1772 let _ = self 1773 .store 1774 .clear_staged_outbound_message(&command.connection_id, delivery.message_id)?; 1775 } 1776 Ok(()) 1777 } 1778 1779 fn enqueue_hello( 1780 &mut self, 1781 connection_id: &str, 1782 now: u64, 1783 ) -> Result<(), RadrootsSimplexAgentRuntimeError> { 1784 if self.store.connection(connection_id)?.hello_sent { 1785 return Ok(()); 1786 } 1787 let send_queue = self.store.primary_send_queue(connection_id)?; 1788 let connection = self.store.connection(connection_id)?; 1789 let previous_hash = connection 1790 .delivery_cursor 1791 .last_sent_message_hash 1792 .clone() 1793 .unwrap_or_default(); 1794 let message_id = connection 1795 .delivery_cursor 1796 .last_sent_message_id 1797 .unwrap_or(0) 1798 .saturating_add(1); 1799 let frame = RadrootsSimplexAgentMessageFrame { 1800 header: RadrootsSimplexAgentMessageHeader { 1801 message_id, 1802 previous_message_hash: previous_hash, 1803 }, 1804 message: RadrootsSimplexAgentMessage::Hello, 1805 padding: Vec::new(), 1806 }; 1807 let ciphertext = 1808 encode_decrypted_message(&RadrootsSimplexAgentDecryptedMessage::Message(frame))?; 1809 let message_hash = Sha256::digest(&ciphertext).to_vec(); 1810 let prepared = self 1811 .store 1812 .prepare_outbound_message(connection_id, message_hash)?; 1813 let encrypted = self.next_encrypted_payload( 1814 connection_id, 1815 ciphertext, 1816 
SimplexAgentPayloadKind::Message, 1817 )?; 1818 self.store.enqueue_command( 1819 connection_id, 1820 RadrootsSimplexAgentPendingCommandKind::SendEnvelope { 1821 queue: send_queue.descriptor.queue_address(), 1822 envelope: RadrootsSimplexAgentEnvelope::Message(encrypted), 1823 delivery: Some(RadrootsSimplexAgentOutboundMessage { 1824 message_id: prepared.message_id, 1825 message_hash: prepared.message_hash, 1826 }), 1827 }, 1828 now, 1829 )?; 1830 self.store.connection_mut(connection_id)?.hello_sent = true; 1831 Ok(()) 1832 } 1833 1834 fn encode_smp_message_body( 1835 &self, 1836 connection_id: &str, 1837 envelope: &RadrootsSimplexAgentEnvelope, 1838 ) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> { 1839 let shared_secret = self 1840 .store 1841 .connection(connection_id)? 1842 .shared_secret 1843 .clone() 1844 .ok_or_else(|| { 1845 RadrootsSimplexAgentRuntimeError::Runtime(format!( 1846 "SimpleX connection `{connection_id}` has no shared queue secret" 1847 )) 1848 })?; 1849 let sender_public_key = match envelope { 1850 RadrootsSimplexAgentEnvelope::Confirmation { 1851 reply_queue: true, .. 1852 } => Some( 1853 self.store 1854 .connection(connection_id)? 1855 .local_e2e_public_key 1856 .clone() 1857 .ok_or_else(|| { 1858 RadrootsSimplexAgentRuntimeError::Runtime(format!( 1859 "SimpleX connection `{connection_id}` is missing local E2E public key" 1860 )) 1861 })?, 1862 ), 1863 _ => None, 1864 }; 1865 let mut body = Vec::with_capacity(1 + 512); 1866 body.push(b'_'); 1867 body.extend_from_slice(&encode_envelope(envelope)?); 1868 let nonce = random_nonce() 1869 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; 1870 let padded_len = match envelope { 1871 RadrootsSimplexAgentEnvelope::Confirmation { .. 
} => SIMPLEX_E2E_CONFIRMATION_LENGTH, 1872 _ => SIMPLEX_E2E_MESSAGE_LENGTH, 1873 }; 1874 let ciphertext = encrypt_padded(&shared_secret, &nonce, &body, padded_len) 1875 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; 1876 encode_client_message_envelope(&SimplexClientMessageEnvelope { 1877 sender_public_key, 1878 nonce, 1879 ciphertext, 1880 }) 1881 } 1882 1883 fn process_queue_ids_response( 1884 &mut self, 1885 command: &RadrootsSimplexAgentPendingCommand, 1886 ids: RadrootsSimplexSmpQueueIdsResponse, 1887 ) -> Result<(), RadrootsSimplexAgentRuntimeError> { 1888 let RadrootsSimplexAgentPendingCommandKind::CreateQueue { descriptor } = &command.kind 1889 else { 1890 return Err(RadrootsSimplexAgentRuntimeError::Runtime( 1891 "SimpleX IDS response received for non-create command".into(), 1892 )); 1893 }; 1894 1895 let old_address = descriptor.queue_address(); 1896 let sender_id = URL_SAFE_NO_PAD.encode(&ids.sender_id); 1897 let mut invitation_event = None; 1898 let mut join_confirmation = None; 1899 let subscribe_queue; 1900 1901 { 1902 let connection = self.store.connection_mut(&command.connection_id)?; 1903 let queue = connection 1904 .queues 1905 .iter_mut() 1906 .find(|queue| queue.descriptor.queue_address() == old_address) 1907 .ok_or_else(|| { 1908 RadrootsSimplexAgentRuntimeError::Runtime(format!( 1909 "SimpleX connection `{}` missing receive queue for IDS", 1910 command.connection_id 1911 )) 1912 })?; 1913 let delivery_private_key = queue.delivery_private_key.clone().ok_or_else(|| { 1914 RadrootsSimplexAgentRuntimeError::Runtime( 1915 "SimpleX receive queue missing delivery private key".into(), 1916 ) 1917 })?; 1918 let server_dh_public_key = decode_x25519_public_key_x509(&ids.server_dh_public_key) 1919 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; 1920 queue.delivery_shared_secret = Some( 1921 derive_shared_secret(&delivery_private_key, &server_dh_public_key).map_err( 1922 |error| 
RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()), 1923 )?, 1924 ); 1925 queue.entity_id = ids.recipient_id.clone(); 1926 queue.descriptor.queue_uri.sender_id = sender_id; 1927 if let Some(queue_mode) = ids.queue_mode { 1928 queue.descriptor.queue_uri.queue_mode = Some(queue_mode); 1929 } 1930 let new_address = queue.descriptor.queue_address(); 1931 subscribe_queue = new_address.clone(); 1932 1933 if connection.status == RadrootsSimplexAgentConnectionStatus::CreatePending { 1934 connection.status = RadrootsSimplexAgentConnectionStatus::InvitationReady; 1935 if let Some(invitation) = connection.invitation.as_mut() { 1936 invitation.invitation_queue = queue.descriptor.queue_uri.clone(); 1937 } 1938 if let Some(short_link) = connection.short_link.as_mut() { 1939 short_link.link_id = ids.link_id.clone().ok_or_else(|| { 1940 RadrootsSimplexAgentRuntimeError::Runtime(format!( 1941 "SimpleX broker IDS response for `{}` did not include a short-link id", 1942 command.connection_id 1943 )) 1944 })?; 1945 short_link.hosts = queue.descriptor.queue_uri.server.hosts.clone(); 1946 short_link.port = queue.descriptor.queue_uri.server.port; 1947 invitation_event = Some(short_link.invitation_link()); 1948 } 1949 } else if connection.status == RadrootsSimplexAgentConnectionStatus::JoinPending { 1950 let local_x3dh_key_1 = connection.local_x3dh_key_1.as_ref().ok_or_else(|| { 1951 RadrootsSimplexAgentRuntimeError::Runtime(format!( 1952 "SimpleX connection `{}` missing local X3DH key 1", 1953 command.connection_id 1954 )) 1955 })?; 1956 let local_x3dh_key_2 = connection.local_x3dh_key_2.as_ref().ok_or_else(|| { 1957 RadrootsSimplexAgentRuntimeError::Runtime(format!( 1958 "SimpleX connection `{}` missing local X3DH key 2", 1959 command.connection_id 1960 )) 1961 })?; 1962 let ratchet_state = connection.ratchet_state.as_ref().ok_or_else(|| { 1963 RadrootsSimplexAgentRuntimeError::Runtime(format!( 1964 "SimpleX connection `{}` missing ratchet state", 1965 command.connection_id 
/// Handles one message delivered by the broker on `queue` for `connection_id`.
///
/// The broker body is decrypted, the agent envelope inside it is decoded and decrypted,
/// the delivery cursor fields on the connection are updated, and the decrypted message
/// is handed to `handle_inbound_decrypted_message`. Redeliveries, official payload
/// replays and ratchet regressions are acknowledged without being processed again.
///
/// `transport_hash` is used as the message hash only when the decrypted payload is not
/// a `Message` frame (frames are hashed from their re-encoded form instead).
///
/// # Errors
/// Propagates any store, decoding, decryption or frame-validation error, except
/// `RatchetMessageRegression`, which is treated as an already-seen message.
fn process_received_message_response(
    &mut self,
    connection_id: &str,
    queue: &radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentQueueAddress,
    message: radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpReceivedMessage,
    transport_hash: Vec<u8>,
) -> Result<(), RadrootsSimplexAgentRuntimeError> {
    // Same queue and same broker message id as the last recorded delivery: this is a
    // redelivery, so acknowledge it again and stop.
    let connection = self.store.connection(connection_id)?;
    if connection.last_received_queue.as_ref() == Some(queue)
        && connection.last_received_broker_message_id.as_deref()
            == Some(message.message_id.as_slice())
    {
        self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
        return Ok(());
    }
    let received = self.decode_received_message_body(connection_id, queue, &message)?;
    // An empty sent body is what `decode_received_body` produces for a `QUOTA `
    // notification; there is no agent envelope to process.
    // NOTE(review): this path neither acknowledges the broker message nor records its
    // id — confirm that leaving it unacknowledged is intentional.
    if received.sent_body.is_empty() {
        return Ok(());
    }
    let (envelope, derived_secret) =
        self.decode_agent_envelope_payload(connection_id, &received.sent_body)?;
    // A secret is returned only when it was freshly derived from the sender's key;
    // persist it so later messages can be decrypted without the key.
    if let Some(shared_secret) = derived_secret {
        self.store.connection_mut(connection_id)?.shared_secret = Some(shared_secret);
    }
    // Replayed official payload: remember the broker message and acknowledge it
    // without touching the ratchet.
    if self.is_official_payload_replay(connection_id, &envelope)? {
        {
            let connection = self.store.connection_mut(connection_id)?;
            connection.last_received_queue = Some(queue.clone());
            connection.last_received_broker_message_id = Some(message.message_id.clone());
        }
        self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
        return Ok(());
    }
    // No-op unless the envelope is a confirmation carrying E2E ratchet parameters.
    self.initialize_receiver_ratchet_from_confirmation(connection_id, &envelope)?;
    let decrypted = match self.extract_decrypted_message(connection_id, &envelope) {
        Ok(decrypted) => decrypted,
        // A ratchet regression is handled like a duplicate: record and acknowledge.
        Err(RadrootsSimplexAgentRuntimeError::Crypto(
            RadrootsSimplexSmpCryptoError::RatchetMessageRegression { .. },
        )) => {
            {
                let connection = self.store.connection_mut(connection_id)?;
                connection.last_received_queue = Some(queue.clone());
                connection.last_received_broker_message_id = Some(message.message_id.clone());
            }
            self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
            return Ok(());
        }
        Err(error) => return Err(error),
    };
    // Message frames are hashed (SHA-256 over the re-encoded decrypted message) and
    // checked against the delivery cursor before anything is recorded.
    let agent_message_hash =
        if let RadrootsSimplexAgentDecryptedMessage::Message(frame) = &decrypted {
            let encoded = encode_decrypted_message(&decrypted)?;
            let message_hash = Sha256::digest(&encoded).to_vec();
            self.validate_inbound_frame_progress(connection_id, frame, &message_hash)?;
            Some(message_hash)
        } else {
            None
        };
    // User messages are not acknowledged to the broker here.
    // NOTE(review): presumably the application acknowledges them later — confirm.
    let requires_app_ack = matches!(
        &decrypted,
        RadrootsSimplexAgentDecryptedMessage::Message(frame)
            if matches!(frame.message, RadrootsSimplexAgentMessage::UserMessage(_))
    );
    {
        let connection = self.store.connection_mut(connection_id)?;
        connection.last_received_queue = Some(queue.clone());
        connection.last_received_broker_message_id = Some(message.message_id.clone());
    }
    // These reads have no effect on the result.
    // NOTE(review): presumably kept so the fields count as used — confirm before removing.
    let _ = received.timestamp;
    let _ = received.flags;
    if let RadrootsSimplexAgentDecryptedMessage::Message(frame) = &decrypted {
        self.store.record_inbound_message(
            connection_id,
            queue.clone(),
            message.message_id.clone(),
            frame.header.message_id,
            agent_message_hash.clone().unwrap_or_default(),
        )?;
    }
    // Non-frame payloads have no agent hash, so the transport hash stands in for it.
    let message_hash = agent_message_hash.unwrap_or_else(|| transport_hash);
    self.handle_inbound_decrypted_message(connection_id, decrypted, message_hash)?;
    if !requires_app_ack {
        self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
    }
    Ok(())
}
RadrootsSimplexAgentRuntimeError> { 2119 let official_message = match envelope { 2120 RadrootsSimplexAgentEnvelope::Confirmation { encrypted, .. } 2121 | RadrootsSimplexAgentEnvelope::Message(encrypted) 2122 | RadrootsSimplexAgentEnvelope::RatchetKey { encrypted, .. } => { 2123 encrypted.official_message.as_deref() 2124 } 2125 RadrootsSimplexAgentEnvelope::Invitation { .. } => None, 2126 }; 2127 let Some(official_message) = official_message else { 2128 return Ok(false); 2129 }; 2130 let connection = self.store.connection(connection_id)?; 2131 let Some(ratchet_state) = connection.ratchet_state.as_ref() else { 2132 return Ok(false); 2133 }; 2134 if ratchet_state.official_associated_data.is_none() { 2135 return Ok(false); 2136 } 2137 ratchet_state 2138 .is_official_payload_replay(official_message) 2139 .map_err(Into::into) 2140 } 2141 2142 fn validate_inbound_frame_progress( 2143 &self, 2144 connection_id: &str, 2145 frame: &RadrootsSimplexAgentMessageFrame, 2146 message_hash: &[u8], 2147 ) -> Result<(), RadrootsSimplexAgentRuntimeError> { 2148 if frame.header.message_id == 0 { 2149 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( 2150 "SimpleX inbound message id for `{connection_id}` must start at 1" 2151 ))); 2152 } 2153 let connection = self.store.connection(connection_id)?; 2154 let Some(last_message_id) = connection.delivery_cursor.last_received_message_id else { 2155 if connection.status == RadrootsSimplexAgentConnectionStatus::Connected 2156 && !connection.hello_received 2157 && frame.header.message_id == 2 2158 && !frame.header.previous_message_hash.is_empty() 2159 && matches!(frame.message, RadrootsSimplexAgentMessage::UserMessage(_)) 2160 { 2161 return Ok(()); 2162 } 2163 if frame.header.message_id != 1 { 2164 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( 2165 "SimpleX inbound message id for `{connection_id}` started at `{}` instead of `1`", 2166 frame.header.message_id 2167 ))); 2168 } 2169 if 
!frame.header.previous_message_hash.is_empty() { 2170 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( 2171 "SimpleX first inbound message for `{connection_id}` carried a previous-message hash" 2172 ))); 2173 } 2174 return Ok(()); 2175 }; 2176 let last_message_hash = connection 2177 .delivery_cursor 2178 .last_received_message_hash 2179 .as_deref() 2180 .ok_or_else(|| { 2181 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2182 "SimpleX connection `{connection_id}` has a received message id without a message hash" 2183 )) 2184 })?; 2185 if frame.header.message_id == last_message_id { 2186 if message_hash == last_message_hash { 2187 return Ok(()); 2188 } 2189 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( 2190 "SimpleX inbound message id `{last_message_id}` for `{connection_id}` was replayed with a different message hash" 2191 ))); 2192 } 2193 if frame.header.message_id < last_message_id { 2194 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( 2195 "SimpleX inbound message id `{}` for `{connection_id}` regressed below `{last_message_id}`", 2196 frame.header.message_id 2197 ))); 2198 } 2199 let expected_message_id = last_message_id.checked_add(1).ok_or_else(|| { 2200 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2201 "SimpleX inbound message id for `{connection_id}` overflowed" 2202 )) 2203 })?; 2204 if frame.header.message_id != expected_message_id { 2205 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( 2206 "SimpleX inbound message id `{}` for `{connection_id}` skipped expected `{expected_message_id}`", 2207 frame.header.message_id 2208 ))); 2209 } 2210 if frame.header.previous_message_hash != last_message_hash { 2211 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( 2212 "SimpleX inbound message `{}` for `{connection_id}` carried an unexpected previous-message hash", 2213 frame.header.message_id 2214 ))); 2215 } 2216 Ok(()) 2217 } 2218 2219 fn decode_received_message_body( 2220 &mut self, 2221 
connection_id: &str, 2222 queue: &radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentQueueAddress, 2223 message: &radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpReceivedMessage, 2224 ) -> Result<SimplexReceivedBody, RadrootsSimplexAgentRuntimeError> { 2225 let queue_record = self.store.queue_record(connection_id, queue)?; 2226 let delivery_secret = queue_record.delivery_shared_secret.ok_or_else(|| { 2227 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2228 "SimpleX receive queue on `{connection_id}` is missing delivery secret" 2229 )) 2230 })?; 2231 let decrypted = decrypt_padded( 2232 &delivery_secret, 2233 &message.message_id, 2234 &message.encrypted_body, 2235 ) 2236 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; 2237 decode_received_body(&decrypted) 2238 } 2239 2240 fn decode_agent_envelope_payload( 2241 &self, 2242 connection_id: &str, 2243 payload: &[u8], 2244 ) -> Result<(RadrootsSimplexAgentEnvelope, Option<Vec<u8>>), RadrootsSimplexAgentRuntimeError> 2245 { 2246 let sent = decode_client_message_envelope(payload)?; 2247 let derived_secret = match self.store.connection(connection_id)?.shared_secret.clone() { 2248 Some(secret) => Some(secret), 2249 None => { 2250 let sender_public_key = sent.sender_public_key.as_deref().ok_or_else(|| { 2251 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2252 "SimpleX connection `{connection_id}` received encrypted body without sender key" 2253 )) 2254 })?; 2255 let private_key = self 2256 .store 2257 .connection(connection_id)? 
2258 .local_e2e_private_key 2259 .as_deref() 2260 .ok_or_else(|| { 2261 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2262 "SimpleX connection `{connection_id}` missing local E2E private key" 2263 )) 2264 })?; 2265 Some( 2266 derive_shared_secret(private_key, sender_public_key).map_err(|error| { 2267 RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()) 2268 })?, 2269 ) 2270 } 2271 }; 2272 let shared_secret = derived_secret.clone().ok_or_else(|| { 2273 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2274 "SimpleX connection `{connection_id}` has no shared secret" 2275 )) 2276 })?; 2277 let decrypted = decrypt_padded(&shared_secret, &sent.nonce, &sent.ciphertext) 2278 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; 2279 let (_, payload) = decrypted.split_first().ok_or_else(|| { 2280 RadrootsSimplexAgentRuntimeError::Runtime( 2281 "SimpleX decrypted client body is empty".into(), 2282 ) 2283 })?; 2284 let envelope = decode_envelope(payload)?; 2285 let should_store_secret = self 2286 .store 2287 .connection(connection_id)? 2288 .shared_secret 2289 .is_none() 2290 && sent.sender_public_key.is_some(); 2291 Ok(( 2292 envelope, 2293 if should_store_secret { 2294 derived_secret 2295 } else { 2296 None 2297 }, 2298 )) 2299 } 2300 2301 fn initialize_receiver_ratchet_from_confirmation( 2302 &mut self, 2303 connection_id: &str, 2304 envelope: &RadrootsSimplexAgentEnvelope, 2305 ) -> Result<(), RadrootsSimplexAgentRuntimeError> { 2306 let RadrootsSimplexAgentEnvelope::Confirmation { 2307 e2e_ratchet_params: Some(params), 2308 .. 
2309 } = envelope 2310 else { 2311 return Ok(()); 2312 }; 2313 let connection = self.store.connection(connection_id)?; 2314 let local_key_1 = connection.local_x3dh_key_1.clone().ok_or_else(|| { 2315 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2316 "SimpleX connection `{connection_id}` missing local X3DH key 1" 2317 )) 2318 })?; 2319 let local_key_2 = connection.local_x3dh_key_2.clone().ok_or_else(|| { 2320 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2321 "SimpleX connection `{connection_id}` missing local X3DH key 2" 2322 )) 2323 })?; 2324 let local_pq_keypair = connection.local_pq_keypair.clone(); 2325 let local_key_1 = official_x3dh_keypair_from_agent(local_key_1); 2326 let local_key_2 = official_x3dh_keypair_from_agent(local_key_2); 2327 let receiver_init = if params.pq_public_key.is_some() || params.pq_ciphertext.is_some() { 2328 let local_pq_keypair = local_pq_keypair.as_ref().ok_or_else(|| { 2329 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2330 "SimpleX connection `{connection_id}` missing local PQ keypair" 2331 )) 2332 })?; 2333 official_x3dh_receiver_init_accepting_pq( 2334 &local_key_1, 2335 &local_key_2, 2336 &official_pq_keypair_from_agent(local_pq_keypair.clone()), 2337 params, 2338 ) 2339 .map(|init| init.init) 2340 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))? 2341 } else { 2342 official_x3dh_receiver_init(&local_key_1, &local_key_2, params) 2343 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))? 
2344 }; 2345 let connection = self.store.connection_mut(connection_id)?; 2346 let ratchet_state = connection.ratchet_state.as_mut().ok_or_else(|| { 2347 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2348 "SimpleX connection `{connection_id}` has no ratchet state" 2349 )) 2350 })?; 2351 if let Some(local_pq_keypair) = local_pq_keypair { 2352 ratchet_state.current_pq_public_key = Some(local_pq_keypair.public_key); 2353 ratchet_state.local_pq_private_key = Some(local_pq_keypair.private_key); 2354 } 2355 ratchet_state 2356 .initialize_official_receiver(local_key_2.private_key, receiver_init) 2357 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string())) 2358 } 2359 2360 fn next_encrypted_payload( 2361 &mut self, 2362 connection_id: &str, 2363 plaintext: Vec<u8>, 2364 payload_kind: SimplexAgentPayloadKind, 2365 ) -> Result<RadrootsSimplexAgentEncryptedPayload, RadrootsSimplexAgentRuntimeError> { 2366 let shared_secret = self 2367 .store 2368 .connection(connection_id)? 2369 .shared_secret 2370 .clone() 2371 .ok_or_else(|| { 2372 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2373 "SimpleX connection `{connection_id}` has no shared secret" 2374 )) 2375 })?; 2376 let padded_len = self.agent_payload_padded_len(connection_id, payload_kind)?; 2377 let official_message = self 2378 .store 2379 .connection_mut(connection_id)? 2380 .ratchet_state 2381 .as_mut() 2382 .ok_or_else(|| { 2383 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2384 "SimpleX connection `{connection_id}` has no ratchet state" 2385 )) 2386 })? 
2387 .encrypt_official_payload(&shared_secret, &plaintext, padded_len) 2388 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; 2389 Ok(RadrootsSimplexAgentEncryptedPayload { 2390 ratchet_header: None, 2391 official_message: Some(official_message), 2392 ciphertext: Vec::new(), 2393 }) 2394 } 2395 2396 fn extract_decrypted_message( 2397 &mut self, 2398 connection_id: &str, 2399 envelope: &RadrootsSimplexAgentEnvelope, 2400 ) -> Result<RadrootsSimplexAgentDecryptedMessage, RadrootsSimplexAgentRuntimeError> { 2401 match envelope { 2402 RadrootsSimplexAgentEnvelope::Confirmation { encrypted, .. } 2403 | RadrootsSimplexAgentEnvelope::Message(encrypted) 2404 | RadrootsSimplexAgentEnvelope::RatchetKey { encrypted, .. } => { 2405 let plaintext = self.decrypt_agent_payload(connection_id, encrypted)?; 2406 decode_decrypted_message(&plaintext).map_err(Into::into) 2407 } 2408 RadrootsSimplexAgentEnvelope::Invitation { 2409 connection_info, .. 2410 } => decode_decrypted_message(connection_info).map_err(Into::into), 2411 } 2412 } 2413 2414 fn decrypt_agent_payload( 2415 &mut self, 2416 connection_id: &str, 2417 encrypted: &RadrootsSimplexAgentEncryptedPayload, 2418 ) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> { 2419 let shared_secret = self 2420 .store 2421 .connection(connection_id)? 2422 .shared_secret 2423 .clone() 2424 .ok_or_else(|| { 2425 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2426 "SimpleX connection `{connection_id}` has no shared secret" 2427 )) 2428 })?; 2429 if let Some(official_message) = encrypted.official_message.as_ref() { 2430 return self 2431 .store 2432 .connection_mut(connection_id)? 2433 .ratchet_state 2434 .as_mut() 2435 .ok_or_else(|| { 2436 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2437 "SimpleX connection `{connection_id}` has no ratchet state" 2438 )) 2439 })? 
2440 .decrypt_official_payload(&shared_secret, official_message) 2441 .map_err(Into::into); 2442 } 2443 let header = encrypted.ratchet_header.as_ref().ok_or_else(|| { 2444 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2445 "SimpleX connection `{connection_id}` received agent payload without ratchet header" 2446 )) 2447 })?; 2448 self.store 2449 .connection_mut(connection_id)? 2450 .ratchet_state 2451 .as_mut() 2452 .ok_or_else(|| { 2453 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2454 "SimpleX connection `{connection_id}` has no ratchet state" 2455 )) 2456 })? 2457 .decrypt_payload(&shared_secret, header, &encrypted.ciphertext) 2458 .map_err(Into::into) 2459 } 2460 2461 fn agent_payload_padded_len( 2462 &self, 2463 connection_id: &str, 2464 payload_kind: SimplexAgentPayloadKind, 2465 ) -> Result<usize, RadrootsSimplexAgentRuntimeError> { 2466 let ratchet = self 2467 .store 2468 .connection(connection_id)? 2469 .ratchet_state 2470 .as_ref() 2471 .ok_or_else(|| { 2472 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2473 "SimpleX connection `{connection_id}` has no ratchet state" 2474 )) 2475 })?; 2476 let pq_enabled = ratchet.current_pq_public_key.is_some() 2477 || ratchet.remote_pq_public_key.is_some() 2478 || ratchet.current_pq_shared_secret.is_some() 2479 || ratchet.local_pq_private_key.is_some(); 2480 Ok(match (payload_kind, pq_enabled) { 2481 (SimplexAgentPayloadKind::ConnectionInfo, true) => { 2482 SIMPLEX_AGENT_E2E_CONN_INFO_PQ_LENGTH 2483 } 2484 (SimplexAgentPayloadKind::ConnectionInfo, false) => SIMPLEX_AGENT_E2E_CONN_INFO_LENGTH, 2485 (SimplexAgentPayloadKind::Message, true) => SIMPLEX_AGENT_E2E_MESSAGE_PQ_LENGTH, 2486 (SimplexAgentPayloadKind::Message, false) => SIMPLEX_AGENT_E2E_MESSAGE_LENGTH, 2487 }) 2488 } 2489 2490 #[cfg(feature = "std")] 2491 fn flush_store(&self) -> Result<(), RadrootsSimplexAgentRuntimeError> { 2492 self.store.flush().map_err(Into::into) 2493 } 2494 2495 #[cfg(not(feature = "std"))] 2496 fn flush_store(&self) -> 
Result<(), RadrootsSimplexAgentRuntimeError> { 2497 Ok(()) 2498 } 2499 } 2500 2501 fn derive_material(label: &[u8], parts: &[&[u8]]) -> Vec<u8> { 2502 let mut hasher = Sha256::new(); 2503 hasher.update(label); 2504 for part in parts { 2505 hasher.update((*part).len().to_be_bytes()); 2506 hasher.update(*part); 2507 } 2508 hasher.finalize().to_vec() 2509 } 2510 2511 fn agent_x3dh_keypair( 2512 keypair: RadrootsSimplexOfficialX448Keypair, 2513 ) -> RadrootsSimplexAgentX3dhKeypair { 2514 RadrootsSimplexAgentX3dhKeypair { 2515 public_key: keypair.public_key, 2516 private_key: keypair.private_key, 2517 } 2518 } 2519 2520 fn official_x3dh_keypair_from_agent( 2521 keypair: RadrootsSimplexAgentX3dhKeypair, 2522 ) -> RadrootsSimplexOfficialX448Keypair { 2523 RadrootsSimplexOfficialX448Keypair { 2524 public_key: keypair.public_key, 2525 private_key: keypair.private_key, 2526 } 2527 } 2528 2529 fn agent_pq_keypair( 2530 keypair: RadrootsSimplexOfficialSntrup761Keypair, 2531 ) -> RadrootsSimplexAgentPqKeypair { 2532 RadrootsSimplexAgentPqKeypair { 2533 public_key: keypair.public_key, 2534 private_key: keypair.private_key, 2535 } 2536 } 2537 2538 fn official_pq_keypair_from_agent( 2539 keypair: RadrootsSimplexAgentPqKeypair, 2540 ) -> RadrootsSimplexOfficialSntrup761Keypair { 2541 RadrootsSimplexOfficialSntrup761Keypair { 2542 public_key: keypair.public_key, 2543 private_key: keypair.private_key, 2544 } 2545 } 2546 2547 fn official_x3dh_params_from_parts( 2548 key_1: &[u8], 2549 key_2: &[u8], 2550 pq_public_key: Option<Vec<u8>>, 2551 pq_ciphertext: Option<Vec<u8>>, 2552 ) -> Result<RadrootsSimplexOfficialX3dhParams, RadrootsSimplexAgentRuntimeError> { 2553 Ok(RadrootsSimplexOfficialX3dhParams { 2554 version_range: RadrootsSimplexSmpVersionRange::new( 2555 RADROOTS_SIMPLEX_OFFICIAL_E2E_KDF_VERSION, 2556 RADROOTS_SIMPLEX_OFFICIAL_E2E_CURRENT_VERSION, 2557 ) 2558 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?, 2559 key_1: key_1.to_vec(), 2560 key_2: 
key_2.to_vec(), 2561 pq_public_key, 2562 pq_ciphertext, 2563 }) 2564 } 2565 2566 fn prepare_short_invitation_link_data( 2567 invitation: &RadrootsSimplexAgentConnectionLink, 2568 ) -> Result<SimplexPreparedShortInvitationLinkData, RadrootsSimplexAgentRuntimeError> { 2569 let root_keypair = RadrootsSimplexSmpEd25519Keypair::generate() 2570 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; 2571 let fixed_data = encode_short_invitation_fixed_data(&root_keypair.public_key, invitation)?; 2572 let user_data = encode_short_invitation_user_data(invitation)?; 2573 let (link_key, signed_link_data) = sign_short_link_data(&root_keypair, &fixed_data, &user_data) 2574 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; 2575 let link_data_key = derive_invitation_short_link_data_key(&link_key) 2576 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; 2577 let encrypted_link_data = encrypt_short_link_data(&link_data_key, &signed_link_data) 2578 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?; 2579 Ok(SimplexPreparedShortInvitationLinkData { 2580 link_key, 2581 link_public_signature_key: root_keypair.public_key, 2582 link_private_signature_key: root_keypair.private_key, 2583 encrypted_link_data, 2584 }) 2585 } 2586 2587 fn short_invitation_server( 2588 invitation: &RadrootsSimplexAgentShortInvitationLink, 2589 ) -> Result<RadrootsSimplexSmpServerAddress, RadrootsSimplexAgentRuntimeError> { 2590 if invitation.hosts.is_empty() { 2591 return Err(RadrootsSimplexAgentRuntimeError::Runtime( 2592 "SimpleX short invitation link does not include a relay host".into(), 2593 )); 2594 } 2595 let server_key_hash = invitation.server_key_hash.as_ref().ok_or_else(|| { 2596 RadrootsSimplexAgentRuntimeError::Runtime( 2597 "SimpleX short invitation link does not include a server key hash".into(), 2598 ) 2599 })?; 2600 Ok(RadrootsSimplexSmpServerAddress { 2601 server_identity: 
URL_SAFE_NO_PAD.encode(server_key_hash), 2602 hosts: invitation.hosts.clone(), 2603 port: invitation.port, 2604 }) 2605 } 2606 2607 fn decode_server_key_hash( 2608 server_identity: &str, 2609 ) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> { 2610 URL_SAFE_NO_PAD 2611 .decode(server_identity.as_bytes()) 2612 .or_else(|_| URL_SAFE.decode(server_identity.as_bytes())) 2613 .map_err(|error| { 2614 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2615 "failed to decode SimpleX server identity: {error}" 2616 )) 2617 }) 2618 } 2619 2620 fn correlation_id_from_material( 2621 label: &[u8], 2622 parts: &[Vec<u8>], 2623 ) -> RadrootsSimplexSmpCorrelationId { 2624 let refs = parts.iter().map(Vec::as_slice).collect::<Vec<&[u8]>>(); 2625 let digest = derive_material(label, &refs); 2626 let mut correlation = [0_u8; RadrootsSimplexSmpCorrelationId::LENGTH]; 2627 correlation.copy_from_slice(&digest[..RadrootsSimplexSmpCorrelationId::LENGTH]); 2628 RadrootsSimplexSmpCorrelationId::new(correlation) 2629 } 2630 2631 fn short_link_sender_id(correlation_id: &RadrootsSimplexSmpCorrelationId) -> Vec<u8> { 2632 let digest = Sha3_384::digest(correlation_id.as_bytes()); 2633 digest[..RadrootsSimplexSmpCorrelationId::LENGTH].to_vec() 2634 } 2635 2636 fn encode_queue_public_key(public_key: &[u8]) -> Result<String, RadrootsSimplexSmpCryptoError> { 2637 Ok(URL_SAFE.encode(encode_x25519_public_key_x509(public_key)?)) 2638 } 2639 2640 fn decode_queue_public_key(encoded: &str) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> { 2641 let bytes = URL_SAFE 2642 .decode(encoded.as_bytes()) 2643 .or_else(|_| URL_SAFE_NO_PAD.decode(encoded.as_bytes())) 2644 .map_err(|error| { 2645 RadrootsSimplexAgentRuntimeError::Runtime(format!( 2646 "failed to decode SimpleX queue E2E public key: {error}" 2647 )) 2648 })?; 2649 decode_x25519_public_key_x509(&bytes) 2650 .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string())) 2651 } 2652 2653 fn placeholder_sender_id(seed_a: &[u8], 
seed_b: &[u8]) -> String { 2654 let digest = derive_material(b"simplex-placeholder-sender-id", &[seed_a, seed_b]); 2655 URL_SAFE_NO_PAD.encode(&digest[..18]) 2656 } 2657 2658 fn queue_for_command( 2659 command: &RadrootsSimplexAgentPendingCommand, 2660 ) -> Option<RadrootsSimplexAgentQueueAddress> { 2661 match &command.kind { 2662 RadrootsSimplexAgentPendingCommandKind::CreateQueue { descriptor } => { 2663 Some(descriptor.queue_address()) 2664 } 2665 RadrootsSimplexAgentPendingCommandKind::SecureQueue { queue, .. } 2666 | RadrootsSimplexAgentPendingCommandKind::SendEnvelope { queue, .. } 2667 | RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue } 2668 | RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { queue } 2669 | RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { queue, .. } 2670 | RadrootsSimplexAgentPendingCommandKind::SetQueueLinkData { queue, .. } => { 2671 Some(queue.clone()) 2672 } 2673 RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors } => descriptors 2674 .first() 2675 .map(RadrootsSimplexAgentQueueDescriptor::queue_address), 2676 RadrootsSimplexAgentPendingCommandKind::TestQueues { queues } => queues.first().cloned(), 2677 RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData { .. } 2678 | RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData { .. } => None, 2679 } 2680 } 2681 2682 fn pending_command_kind_label(command: &RadrootsSimplexAgentPendingCommand) -> &'static str { 2683 match command.kind { 2684 RadrootsSimplexAgentPendingCommandKind::CreateQueue { .. } => "create_queue", 2685 RadrootsSimplexAgentPendingCommandKind::SecureQueue { .. } => "secure_queue", 2686 RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { .. } => "subscribe_queue", 2687 RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { .. } => "get_queue_message", 2688 RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { .. } => "ack_inbox_message", 2689 RadrootsSimplexAgentPendingCommandKind::SendEnvelope { .. 
} => "send_envelope", 2690 RadrootsSimplexAgentPendingCommandKind::RotateQueues { .. } => "rotate_queues", 2691 RadrootsSimplexAgentPendingCommandKind::TestQueues { .. } => "test_queues", 2692 RadrootsSimplexAgentPendingCommandKind::SetQueueLinkData { .. } => "set_queue_link_data", 2693 RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData { .. } => "get_queue_link_data", 2694 RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData { .. } => { 2695 "secure_get_queue_link_data" 2696 } 2697 } 2698 } 2699 2700 fn is_empty_queue_no_msg( 2701 command: &RadrootsSimplexAgentPendingCommand, 2702 error: &RadrootsSimplexSmpError, 2703 ) -> bool { 2704 matches!( 2705 command.kind, 2706 RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { .. } 2707 | RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { .. } 2708 | RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { .. } 2709 ) && matches!(error, RadrootsSimplexSmpError::NoMsg) 2710 } 2711 2712 fn encode_client_message_envelope( 2713 envelope: &SimplexClientMessageEnvelope, 2714 ) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> { 2715 let mut buffer = Vec::with_capacity( 2716 2 + 1 2717 + envelope 2718 .sender_public_key 2719 .as_ref() 2720 .map_or(0, |value| 1 + value.len()) 2721 + 24 2722 + envelope.ciphertext.len(), 2723 ); 2724 buffer.extend_from_slice(&RADROOTS_SIMPLEX_SMP_CURRENT_CLIENT_VERSION.to_be_bytes()); 2725 match envelope.sender_public_key.as_deref() { 2726 Some(sender_public_key) => { 2727 if sender_public_key.len() > u8::MAX as usize { 2728 return Err(RadrootsSimplexAgentRuntimeError::Runtime( 2729 "SimpleX sender public key exceeds short-field limit".into(), 2730 )); 2731 } 2732 buffer.push(b'1'); 2733 buffer.push(sender_public_key.len() as u8); 2734 buffer.extend_from_slice(sender_public_key); 2735 } 2736 None => buffer.push(b'0'), 2737 } 2738 buffer.extend_from_slice(&envelope.nonce); 2739 buffer.extend_from_slice(&envelope.ciphertext); 2740 Ok(buffer) 2741 } 2742 2743 fn 
decode_client_message_envelope( 2744 bytes: &[u8], 2745 ) -> Result<SimplexClientMessageEnvelope, RadrootsSimplexAgentRuntimeError> { 2746 if bytes.len() < 2 + 1 + RADROOTS_SIMPLEX_SMP_NONCE_LENGTH { 2747 return Err(RadrootsSimplexAgentRuntimeError::Runtime( 2748 "SimpleX client message envelope is truncated".into(), 2749 )); 2750 } 2751 let _version = u16::from_be_bytes([bytes[0], bytes[1]]); 2752 let mut index = 2; 2753 let sender_public_key = match bytes[index] { 2754 b'0' => { 2755 index += 1; 2756 None 2757 } 2758 b'1' => { 2759 index += 1; 2760 let length = *bytes.get(index).ok_or_else(|| { 2761 RadrootsSimplexAgentRuntimeError::Runtime( 2762 "SimpleX confirmation envelope is missing sender key length".into(), 2763 ) 2764 })? as usize; 2765 index += 1; 2766 let sender_public_key = bytes 2767 .get(index..index + length) 2768 .ok_or_else(|| { 2769 RadrootsSimplexAgentRuntimeError::Runtime( 2770 "SimpleX confirmation envelope is missing sender key bytes".into(), 2771 ) 2772 })? 2773 .to_vec(); 2774 index += length; 2775 Some(sender_public_key) 2776 } 2777 _ => { 2778 return Err(RadrootsSimplexAgentRuntimeError::Runtime( 2779 "SimpleX client message envelope has an unknown public header".into(), 2780 )); 2781 } 2782 }; 2783 let nonce_slice = bytes 2784 .get(index..index + RADROOTS_SIMPLEX_SMP_NONCE_LENGTH) 2785 .ok_or_else(|| { 2786 RadrootsSimplexAgentRuntimeError::Runtime( 2787 "SimpleX client message envelope is missing nonce".into(), 2788 ) 2789 })?; 2790 let mut nonce = [0_u8; RADROOTS_SIMPLEX_SMP_NONCE_LENGTH]; 2791 nonce.copy_from_slice(nonce_slice); 2792 index += RADROOTS_SIMPLEX_SMP_NONCE_LENGTH; 2793 let ciphertext = bytes 2794 .get(index..) 2795 .ok_or_else(|| { 2796 RadrootsSimplexAgentRuntimeError::Runtime( 2797 "SimpleX client message envelope is missing ciphertext".into(), 2798 ) 2799 })? 
2800 .to_vec(); 2801 Ok(SimplexClientMessageEnvelope { 2802 sender_public_key, 2803 nonce, 2804 ciphertext, 2805 }) 2806 } 2807 2808 fn decode_received_body( 2809 bytes: &[u8], 2810 ) -> Result<SimplexReceivedBody, RadrootsSimplexAgentRuntimeError> { 2811 if let Some(timestamp_bytes) = bytes.strip_prefix(b"QUOTA ") { 2812 let timestamp: [u8; 8] = timestamp_bytes.try_into().map_err(|_| { 2813 RadrootsSimplexAgentRuntimeError::Runtime( 2814 "SimpleX quota notification has an invalid timestamp".into(), 2815 ) 2816 })?; 2817 return Ok(SimplexReceivedBody { 2818 timestamp: u64::from_be_bytes(timestamp), 2819 flags: RadrootsSimplexSmpMessageFlags::notifications_disabled(), 2820 sent_body: Vec::new(), 2821 }); 2822 } 2823 if bytes.len() < 10 { 2824 return Err(RadrootsSimplexAgentRuntimeError::Runtime( 2825 "SimpleX received body is truncated".into(), 2826 )); 2827 } 2828 let timestamp = u64::from_be_bytes(bytes[..8].try_into().map_err(|_| { 2829 RadrootsSimplexAgentRuntimeError::Runtime( 2830 "SimpleX received body is missing timestamp".into(), 2831 ) 2832 })?); 2833 let flags_offset = bytes[8..] 2834 .iter() 2835 .position(|byte| *byte == b' ') 2836 .ok_or_else(|| { 2837 RadrootsSimplexAgentRuntimeError::Runtime( 2838 "SimpleX received body is missing message flags separator".into(), 2839 ) 2840 })? 
2841 + 8; 2842 let flags_bytes = &bytes[8..flags_offset]; 2843 if flags_bytes.is_empty() { 2844 return Err(RadrootsSimplexAgentRuntimeError::Runtime( 2845 "SimpleX received body is missing message flags".into(), 2846 )); 2847 } 2848 let flags = RadrootsSimplexSmpMessageFlags { 2849 notification: match flags_bytes[0] { 2850 b'F' => false, 2851 b'T' => true, 2852 other => { 2853 return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( 2854 "SimpleX received body has invalid notification flag `{other}`" 2855 ))); 2856 } 2857 }, 2858 reserved: flags_bytes[1..].to_vec(), 2859 }; 2860 Ok(SimplexReceivedBody { 2861 timestamp, 2862 flags, 2863 sent_body: bytes[flags_offset + 1..].to_vec(), 2864 }) 2865 } 2866 2867 #[cfg(test)] 2868 mod tests { 2869 use super::*; 2870 use alloc::collections::VecDeque; 2871 use radroots_simplex_smp_crypto::prelude::{ 2872 RadrootsSimplexSmpQueueAuthorizationMaterial, RadrootsSimplexSmpQueueAuthorizationScope, 2873 RadrootsSimplexSmpX25519Keypair, 2874 }; 2875 use radroots_simplex_smp_proto::prelude::{ 2876 RadrootsSimplexSmpBrokerTransmission, RadrootsSimplexSmpError, 2877 RadrootsSimplexSmpQueueIdsResponse, RadrootsSimplexSmpVersionRange, 2878 }; 2879 use radroots_simplex_smp_transport::prelude::RadrootsSimplexSmpTransportBlock; 2880 2881 fn invitation_queue() -> RadrootsSimplexSmpQueueUri { 2882 RadrootsSimplexSmpQueueUri::parse( 2883 "smp://BwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwc@relay.example/cXVldWU#/?v=4&dh=Zm9vYmFy&q=m", 2884 ) 2885 .unwrap() 2886 } 2887 2888 fn reply_queue() -> RadrootsSimplexSmpQueueUri { 2889 RadrootsSimplexSmpQueueUri::parse( 2890 "smp://BwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwc@relay.example/cmVwbHk#/?v=4&dh=YmF6cXV4&q=m", 2891 ) 2892 .unwrap() 2893 } 2894 2895 fn reply_descriptor() -> RadrootsSimplexAgentQueueDescriptor { 2896 RadrootsSimplexAgentQueueDescriptor { 2897 queue_uri: reply_queue(), 2898 replaced_queue: None, 2899 primary: true, 2900 sender_key: None, 2901 } 2902 } 2903 2904 fn 
// Decodes a body made of an eight-byte timestamp, a single flag byte, a space and a payload,
// first with the `T` flag and then with the same bytes after the flag is flipped to `F`.
#[test]
fn received_body_decodes_official_message_flags() {
    let timestamp_bytes = 1_725_555_000_i64.to_be_bytes();
    let mut body = Vec::from(timestamp_bytes);
    body.extend_from_slice(b"T rr-synth-body");

    let with_notification = decode_received_body(&body).unwrap();
    assert_eq!(with_notification.timestamp, 1_725_555_000_u64);
    assert!(with_notification.flags.notification);
    assert!(with_notification.flags.reserved.is_empty());
    assert_eq!(with_notification.sent_body, b"rr-synth-body");

    // Byte 8 is the notification flag, directly after the timestamp.
    body[8] = b'F';
    let without_notification = decode_received_body(&body).unwrap();
    assert!(!without_notification.flags.notification);
}
// Test helper: builds a ratchet from fixed seeds and stores it as the `ratchet_state` of
// `connection_id`.
//
// Four X448 keypairs are derived from constant seed strings, so the resulting state is the
// same on every run. The "remote" public keys are wrapped in X3DH params without any PQ
// material, the sender-side X3DH init is run against them, and the result initializes the
// ratchet. Every fallible step is unwrapped, so a failure panics the calling test.
fn initialize_test_outbound_official_ratchet(
    runtime: &mut RadrootsSimplexAgentRuntime,
    connection_id: &str,
) {
    let local_key_1 = official_x448_keypair_from_seed(b"rr-synth-runtime-test-local-x3dh-1");
    let local_key_2 = official_x448_keypair_from_seed(b"rr-synth-runtime-test-local-x3dh-2");
    let remote_key_1 = official_x448_keypair_from_seed(b"rr-synth-runtime-test-remote-x3dh-1");
    let remote_key_2 = official_x448_keypair_from_seed(b"rr-synth-runtime-test-remote-x3dh-2");
    let remote_params = RadrootsSimplexOfficialX3dhParams {
        version_range: RadrootsSimplexSmpVersionRange::new(
            RADROOTS_SIMPLEX_OFFICIAL_E2E_KDF_VERSION,
            RADROOTS_SIMPLEX_OFFICIAL_E2E_CURRENT_VERSION,
        )
        .unwrap(),
        key_1: remote_key_1.public_key,
        // Cloned because the same public key is handed to the ratchet constructor below.
        key_2: remote_key_2.public_key.clone(),
        pq_public_key: None,
        pq_ciphertext: None,
    };
    let sender_init =
        official_x3dh_sender_init(&local_key_1, &local_key_2, &remote_params).unwrap();
    let mut ratchet = RadrootsSimplexSmpRatchetState::responder(
        local_key_2.public_key,
        remote_key_2.public_key,
        None,
    )
    .unwrap();
    ratchet
        .initialize_official_sender(local_key_2.private_key, sender_init)
        .unwrap();
    runtime
        .store
        .connection_mut(connection_id)
        .unwrap()
        .ratchet_state = Some(ratchet);
}
// Test double for the SMP transports: replies come from pre-loaded queues and every request
// it receives is recorded for later assertions.
#[derive(Default)]
struct ScriptedTransport {
    // Broker messages handed out, front first, one per `execute` call.
    responses: VecDeque<RadrootsSimplexSmpBrokerMessage>,
    // Broker transmissions handed out, front first, one per `receive_subscription` call.
    subscription_responses: VecDeque<RadrootsSimplexSmpBrokerTransmission>,
    // Requests passed to `execute` that were answered successfully, in call order.
    requests: Vec<RadrootsSimplexSmpTransportRequest>,
    // Every request passed to `receive_subscription`, in call order.
    subscription_requests: Vec<RadrootsSimplexSmpSubscriptionReceiveRequest>,
}

impl ScriptedTransport {
    // Creates a transport that answers commands from `responses` and has no subscription
    // traffic queued.
    fn with_responses(responses: Vec<RadrootsSimplexSmpBrokerMessage>) -> Self {
        Self {
            responses: responses.into(),
            subscription_responses: VecDeque::new(),
            requests: Vec::new(),
            subscription_requests: Vec::new(),
        }
    }

    // Creates a transport that delivers `responses` to subscription receives and has no
    // command replies queued.
    fn with_subscription_responses(
        responses: Vec<RadrootsSimplexSmpBrokerTransmission>,
    ) -> Self {
        Self {
            responses: VecDeque::new(),
            subscription_responses: responses.into(),
            requests: Vec::new(),
            subscription_requests: Vec::new(),
        }
    }
}

impl RadrootsSimplexSmpCommandTransport for ScriptedTransport {
    type Error = String;

    // Answers `request` with the next scripted broker message.
    //
    // Before replying, the command is authorized, wrapped in a transport block, encoded and
    // decoded again; the method asserts that the round trip yields exactly the original
    // transmission. Errors are reported as strings: a missing correlation id, an exhausted
    // script, or any failure from the authorization/encoding helpers. The request is recorded
    // in `requests` only after all of those steps succeeded.
    fn execute(
        &mut self,
        request: RadrootsSimplexSmpTransportRequest,
    ) -> Result<RadrootsSimplexSmpTransportResponse, Self::Error> {
        let correlation_id = request
            .correlation_id
            .ok_or_else(|| "missing scripted transport correlation id".to_owned())?;
        let scope = RadrootsSimplexSmpQueueAuthorizationScope::new(
            b"scripted-session".to_vec(),
            correlation_id,
            request.entity_id.clone(),
        )
        .map_err(|error| error.to_string())?;
        let material = RadrootsSimplexSmpQueueAuthorizationMaterial::for_command(
            &scope,
            &request.command,
            request.transport_version,
            &request.authorization,
        )
        .map_err(|error| error.to_string())?;
        let transmission =
            radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommandTransmission {
                authorization: material.authorization,
                correlation_id: Some(correlation_id),
                entity_id: request.entity_id.clone(),
                command: request.command.clone(),
            };
        // Encode/decode round trip of the outgoing command.
        let block = RadrootsSimplexSmpTransportBlock::from_current_command_transmissions(&[
            transmission.clone(),
        ])
        .map_err(|error| error.to_string())?;
        let encoded = block.encode().map_err(|error| error.to_string())?;
        let decoded = RadrootsSimplexSmpTransportBlock::decode(&encoded)
            .map_err(|error| error.to_string())?;
        let decoded_transmissions = decoded
            .decode_command_transmissions(request.transport_version)
            .map_err(|error| error.to_string())?;
        assert_eq!(decoded_transmissions.len(), 1);
        assert_eq!(decoded_transmissions[0], transmission);

        let response_message = self
            .responses
            .pop_front()
            .ok_or_else(|| "missing scripted transport response".to_owned())?;
        // The reply echoes the request's correlation id and entity id.
        let response_transmission = RadrootsSimplexSmpBrokerTransmission {
            authorization: Vec::new(),
            correlation_id: Some(correlation_id),
            entity_id: request.entity_id.clone(),
            message: response_message,
        };
        let response_block = RadrootsSimplexSmpTransportBlock::from_broker_transmissions(
            &[response_transmission.clone()],
            request.transport_version,
        )
        .map_err(|error| error.to_string())?;
        let response_encoded = response_block.encode().map_err(|error| error.to_string())?;
        self.requests.push(request.clone());
        Ok(RadrootsSimplexSmpTransportResponse {
            server: request.server,
            transport_version: request.transport_version,
            transmission: response_transmission,
            // SHA-256 over the encoded response block.
            transport_hash: Sha256::digest(&response_encoded).to_vec(),
        })
    }
}

impl RadrootsSimplexSmpSubscriptionTransport for ScriptedTransport {
    // Records `request` and returns the next scripted subscription transmission, or
    // `Ok(None)` once the script is exhausted. The response is built for
    // `RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION` and addressed to the request's server.
    fn receive_subscription(
        &mut self,
        request: RadrootsSimplexSmpSubscriptionReceiveRequest,
    ) -> Result<Option<RadrootsSimplexSmpTransportResponse>, Self::Error> {
        self.subscription_requests.push(request.clone());
        let Some(response_transmission) = self.subscription_responses.pop_front() else {
            return Ok(None);
        };
        let response_block = RadrootsSimplexSmpTransportBlock::from_broker_transmissions(
            &[response_transmission.clone()],
            RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
        )
        .map_err(|error| error.to_string())?;
        let response_encoded = response_block.encode().map_err(|error| error.to_string())?;
        Ok(Some(RadrootsSimplexSmpTransportResponse {
            server: request.server,
            transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
            transmission: response_transmission,
            transport_hash: Sha256::digest(&response_encoded).to_vec(),
        }))
    }
}
// Creates a connection, joins it from the same runtime through the stored invitation, and
// runs all ready commands against a scripted transport. Then checks the issued SMP commands,
// the emitted short invitation, the stored short-link data and the joiner's status.
#[test]
fn create_and_join_commands_execute_through_transport() {
    let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
    let created = runtime
        .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
        .unwrap();
    let invitation = runtime
        .store
        .connection(&created)
        .unwrap()
        .invitation
        .clone()
        .unwrap();
    let joined = runtime
        .join_connection(invitation, reply_queue(), 20)
        .unwrap();

    // Six scripted replies; the test asserts below that exactly six requests were sent.
    let mut transport = ScriptedTransport::with_responses(vec![
        ids_response(b"recipient", b"sender", b"server-dh"),
        RadrootsSimplexSmpBrokerMessage::Ok,
        ids_response(b"recipient-2", b"sender-2", b"server-dh-2"),
        RadrootsSimplexSmpBrokerMessage::Ok,
        RadrootsSimplexSmpBrokerMessage::Ok,
        RadrootsSimplexSmpBrokerMessage::Ok,
    ]);
    runtime
        .execute_ready_commands(&mut transport, 30, 16)
        .unwrap();

    let created_queue = runtime.store.receive_queues(&created).unwrap();
    assert!(created_queue[0].subscribed);
    assert_eq!(transport.requests.len(), 6);
    // The first request must be NEW carrying short-link messaging data.
    let RadrootsSimplexSmpCommand::New(create_request) = &transport.requests[0].command else {
        panic!("first request should create the invitation queue");
    };
    let Some(RadrootsSimplexSmpQueueRequestData::Messaging(Some(link_request))) =
        create_request.queue_request_data.as_ref()
    else {
        panic!("invitation NEW should carry short-link messaging data");
    };
    let create_correlation = transport.requests[0]
        .correlation_id
        .as_ref()
        .expect("create command should carry a correlation ID");
    // The link sender id must match the value derived from the request's correlation id.
    assert_eq!(
        link_request.sender_id,
        short_link_sender_id(create_correlation)
    );
    assert!(!link_request.link_data.fixed_data.is_empty());
    assert!(!link_request.link_data.user_data.is_empty());
    assert!(matches!(
        transport.requests[3].command,
        RadrootsSimplexSmpCommand::Sub
    ));
    assert_eq!(transport.requests[3].entity_id, b"recipient".to_vec());
    assert!(matches!(
        transport.requests[4].command,
        RadrootsSimplexSmpCommand::Sub
    ));
    assert_eq!(transport.requests[4].entity_id, b"recipient-2".to_vec());
    // No GET may be issued during this flow.
    assert!(
        !transport
            .requests
            .iter()
            .any(|request| matches!(request.command, RadrootsSimplexSmpCommand::Get))
    );
    let events = runtime.drain_events(16);
    let Some(RadrootsSimplexAgentRuntimeEvent::InvitationReady { invitation, .. }) =
        events.first()
    else {
        panic!("runtime should emit a short invitation event");
    };
    // The rendered link must parse back to the same invitation.
    let rendered = invitation.render().unwrap();
    assert!(rendered.starts_with("simplex:/i#"));
    assert_eq!(
        radroots_simplex_agent_proto::prelude::parse_short_invitation_link(&rendered).unwrap(),
        invitation.clone()
    );
    let short_link = runtime
        .store
        .connection(&created)
        .unwrap()
        .short_link
        .as_ref()
        .unwrap();
    assert_eq!(short_link.link_id, synthetic_link_id(b"server-dh"));
    // Decrypt and verify the stored link data, then inspect both decoded parts.
    let link_data_key = derive_invitation_short_link_data_key(&short_link.link_key).unwrap();
    let stored_link_data = RadrootsSimplexSmpQueueLinkData {
        fixed_data: short_link.encrypted_fixed_data.clone().unwrap(),
        user_data: short_link.encrypted_user_data.clone().unwrap(),
    };
    let verified = radroots_simplex_smp_crypto::prelude::decrypt_verify_short_link_data(
        &short_link.link_key,
        &link_data_key,
        &short_link.link_public_signature_key,
        &stored_link_data,
    )
    .unwrap();
    let decoded = radroots_simplex_agent_proto::prelude::decode_short_invitation_fixed_data(
        &verified.fixed_data,
    )
    .unwrap();
    assert_eq!(
        decoded.root_public_signature_key,
        short_link.link_public_signature_key
    );
    // The fixed data carries no connection id; it is expected in the user data instead.
    assert!(decoded.invitation.connection_id.is_empty());
    let decoded_user_data =
        radroots_simplex_agent_proto::prelude::decode_short_invitation_user_data(
            &verified.user_data,
        )
        .unwrap();
    assert_eq!(decoded_user_data.user_data, created.as_bytes().to_vec());
    let decrypted_invitation =
        decrypt_short_invitation_link_data(invitation, &stored_link_data).unwrap();
    assert_eq!(
        decrypted_invitation.connection_id,
        created.as_bytes().to_vec()
    );
    assert_eq!(
        runtime.store.connection(&joined).unwrap().status,
        RadrootsSimplexAgentConnectionStatus::JoinPending
    );
}
// Creates a connection, takes the short invitation from the emitted event, and joins through
// it. The scripted broker answers the link retrieval with the stored link data; the test then
// checks the four requests issued by the join and the resulting connection state.
#[test]
fn join_short_invitation_retrieves_link_data_and_continues_join() {
    let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
    let created = runtime
        .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
        .unwrap();
    let mut setup_transport = ScriptedTransport::with_responses(vec![
        ids_response(b"recipient", b"sender", b"server-dh"),
        RadrootsSimplexSmpBrokerMessage::Ok,
    ]);
    runtime
        .execute_ready_commands(&mut setup_transport, 30, 16)
        .unwrap();
    let events = runtime.drain_events(16);
    let Some(RadrootsSimplexAgentRuntimeEvent::InvitationReady {
        invitation: short_invitation,
        ..
    }) = events.first()
    else {
        panic!("runtime should emit a short invitation event");
    };
    let short_link = runtime
        .store
        .connection(&created)
        .unwrap()
        .short_link
        .as_ref()
        .unwrap();
    // Encrypted link data as stored by the creator; replayed below as the broker's LNK reply.
    let stored_link_data = RadrootsSimplexSmpQueueLinkData {
        fixed_data: short_link.encrypted_fixed_data.clone().unwrap(),
        user_data: short_link.encrypted_user_data.clone().unwrap(),
    };
    let joined = runtime
        .join_short_invitation(short_invitation.clone(), reply_queue(), 40)
        .unwrap();
    let mut join_transport = ScriptedTransport::with_responses(vec![
        RadrootsSimplexSmpBrokerMessage::Lnk {
            sender_id: b"sender".to_vec(),
            link_data: stored_link_data,
        },
        ids_response(b"recipient-2", b"sender-2", b"server-dh-2"),
        RadrootsSimplexSmpBrokerMessage::Ok,
        RadrootsSimplexSmpBrokerMessage::Ok,
    ]);
    runtime
        .execute_ready_commands(&mut join_transport, 50, 16)
        .unwrap();

    // Expected request order: LKEY, NEW, SUB, SEND.
    assert_eq!(join_transport.requests.len(), 4);
    let RadrootsSimplexSmpCommand::LKey(sender_auth_key) = &join_transport.requests[0].command
    else {
        panic!("short invitation join should authorize link retrieval first");
    };
    let RadrootsSimplexSmpCommandAuthorization::Ed25519(lkey_auth) =
        &join_transport.requests[0].authorization
    else {
        panic!("short invitation link retrieval should be signed");
    };
    // The key carried by LKEY is the X.509 encoding of the key that signed the request.
    assert_eq!(
        sender_auth_key,
        &encode_ed25519_public_key_x509(&lkey_auth.public_key).unwrap()
    );
    assert_eq!(
        join_transport.requests[0].entity_id,
        short_invitation.link_id.clone()
    );
    let RadrootsSimplexSmpCommand::New(_) = &join_transport.requests[1].command else {
        panic!("short invitation join should create the reply queue");
    };
    assert!(matches!(
        join_transport.requests[2].command,
        RadrootsSimplexSmpCommand::Sub
    ));
    assert!(matches!(
        join_transport.requests[3].command,
        RadrootsSimplexSmpCommand::Send(_)
    ));
    let joined_connection = runtime.store.connection(&joined).unwrap();
    assert_eq!(
        joined_connection.status,
        RadrootsSimplexAgentConnectionStatus::JoinPending
    );
    assert_eq!(
        joined_connection.invitation.as_ref().unwrap().connection_id,
        created.as_bytes().to_vec()
    );
    // The send queue must use the sender id returned in the LNK reply.
    assert_eq!(
        runtime
            .store
            .primary_send_queue(&joined)
            .unwrap()
            .descriptor
            .queue_uri
            .sender_id,
        URL_SAFE_NO_PAD.encode(b"sender")
    );
    let send_auth = runtime
        .store
        .primary_send_queue(&joined)
        .unwrap()
        .auth_state
        .unwrap();
    // The same key authorizes link retrieval and the later SEND.
    assert_eq!(send_auth.public_key, lkey_auth.public_key);
    let RadrootsSimplexSmpCommandAuthorization::Ed25519(send_auth_request) =
        &join_transport.requests[3].authorization
    else {
        panic!("short invitation join confirmation should be signed");
    };
    assert_eq!(send_auth_request.public_key, send_auth.public_key);
    assert!(runtime.drain_events(16).iter().any(|event| matches!(
        event,
        RadrootsSimplexAgentRuntimeEvent::ConfirmationRequired { connection_id }
            if connection_id == &joined
    )));
}
// After the first three scripted commands of a create+join flow, looks up the pending
// confirmation envelope for the joiner and checks that its ratchet params carry the joiner's
// local X3DH public keys and PQ material.
#[test]
fn join_confirmation_carries_sender_x3dh_params() {
    let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
    let created = runtime
        .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
        .unwrap();
    let invitation = runtime
        .store
        .connection(&created)
        .unwrap()
        .invitation
        .clone()
        .unwrap();
    let joined = runtime
        .join_connection(invitation, reply_queue(), 20)
        .unwrap();

    // Only three replies are scripted and the last argument is 3, so the confirmation is
    // expected to still be pending afterwards (see `retry_pending` below).
    let mut transport = ScriptedTransport::with_responses(vec![
        ids_response(b"recipient", b"sender", b"server-dh"),
        RadrootsSimplexSmpBrokerMessage::Ok,
        ids_response(b"recipient-2", b"sender-2", b"server-dh-2"),
    ]);
    runtime
        .execute_ready_commands(&mut transport, 30, 3)
        .unwrap();
    let local_key_1 = runtime
        .store
        .connection(&joined)
        .unwrap()
        .local_x3dh_key_1
        .clone()
        .unwrap();
    let local_key_2 = runtime
        .store
        .connection(&joined)
        .unwrap()
        .local_x3dh_key_2
        .clone()
        .unwrap();
    let local_pq_keypair = runtime
        .store
        .connection(&joined)
        .unwrap()
        .local_pq_keypair
        .clone()
        .unwrap();
    let ready = runtime.retry_pending(30, 16);
    // Pick the first pending reply-queue confirmation that carries ratchet params.
    let confirmation_params = ready
        .into_iter()
        .find_map(|command| match command.kind {
            RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
                envelope:
                    RadrootsSimplexAgentEnvelope::Confirmation {
                        reply_queue: true,
                        e2e_ratchet_params: Some(params),
                        ..
                    },
                ..
            } => Some(params),
            _ => None,
        })
        .unwrap();

    assert_eq!(confirmation_params.key_1, local_key_1.public_key);
    assert_eq!(confirmation_params.key_2, local_key_2.public_key);
    assert_eq!(
        confirmation_params.pq_public_key,
        Some(local_pq_keypair.public_key)
    );
    assert!(confirmation_params.pq_ciphertext.is_some());
}
// Sets up a subscribed receive queue, then requests a message explicitly and checks that
// exactly one GET is sent to that queue and that the queue is still marked subscribed.
#[test]
fn explicit_get_connection_message_executes_smp_get() {
    let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
    let created = runtime
        .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
        .unwrap();

    let mut setup_transport = ScriptedTransport::with_responses(vec![
        ids_response(b"recipient", b"sender", b"server-dh"),
        RadrootsSimplexSmpBrokerMessage::Ok,
    ]);
    runtime
        .execute_ready_commands(&mut setup_transport, 30, 16)
        .unwrap();
    // Setup ends with a SUB on the recipient id returned by the broker.
    assert!(matches!(
        setup_transport.requests[1].command,
        RadrootsSimplexSmpCommand::Sub
    ));
    assert_eq!(setup_transport.requests[1].entity_id, b"recipient".to_vec());
    assert!(runtime.store.receive_queues(&created).unwrap()[0].subscribed);

    runtime.get_connection_message(&created, 40).unwrap();
    let mut get_transport =
        ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]);
    runtime
        .execute_ready_commands(&mut get_transport, 50, 16)
        .unwrap();

    assert_eq!(get_transport.requests.len(), 1);
    assert!(matches!(
        get_transport.requests[0].command,
        RadrootsSimplexSmpCommand::Get
    ));
    assert_eq!(get_transport.requests[0].entity_id, b"recipient".to_vec());
    assert!(runtime.store.receive_queues(&created).unwrap()[0].subscribed);
}
// Answers an explicit GET with `ERR NoMsg` and checks that the command is neither left
// pending for retry nor reported through an error event mentioning `NoMsg`.
#[test]
fn get_no_msg_response_is_empty_queue_success() {
    let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
    let created = runtime
        .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
        .unwrap();

    let mut setup_transport = ScriptedTransport::with_responses(vec![
        ids_response(b"recipient", b"sender", b"server-dh"),
        RadrootsSimplexSmpBrokerMessage::Ok,
    ]);
    runtime
        .execute_ready_commands(&mut setup_transport, 30, 16)
        .unwrap();
    // Discard setup events so only events from the GET are inspected below.
    let _ = runtime.drain_events(16);

    runtime.get_connection_message(&created, 40).unwrap();
    let mut get_transport =
        ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Err(
            RadrootsSimplexSmpError::NoMsg,
        )]);
    runtime
        .execute_ready_commands(&mut get_transport, 50, 16)
        .unwrap();

    assert_eq!(get_transport.requests.len(), 1);
    assert!(matches!(
        get_transport.requests[0].command,
        RadrootsSimplexSmpCommand::Get
    ));
    assert!(runtime.retry_pending(50, 16).is_empty());
    assert!(!runtime.drain_events(16).iter().any(|event| matches!(
        event,
        RadrootsSimplexAgentRuntimeEvent::Error { message, .. } if message.contains("NoMsg")
    )));
}
// Feeds one uncorrelated `ERR NoMsg` transmission, addressed to the receive queue's entity
// id, through the subscription transport. Checks that the runtime polls the queue's server
// and emits no events for it.
#[test]
fn subscription_receive_routes_broker_transmission_by_entity_id() {
    let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
    let created = runtime
        .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
        .unwrap();

    let mut setup_transport = ScriptedTransport::with_responses(vec![
        ids_response(b"recipient", b"sender", b"server-dh"),
        RadrootsSimplexSmpBrokerMessage::Ok,
    ]);
    runtime
        .execute_ready_commands(&mut setup_transport, 30, 16)
        .unwrap();
    let receive_queue = runtime.store.receive_queues(&created).unwrap()[0].clone();
    // Discard setup events so the final emptiness check only covers the subscription receive.
    let _ = runtime.drain_events(16);

    let mut subscription_transport = ScriptedTransport::with_subscription_responses(vec![
        RadrootsSimplexSmpBrokerTransmission {
            authorization: Vec::new(),
            correlation_id: None,
            entity_id: receive_queue.entity_id,
            message: RadrootsSimplexSmpBrokerMessage::Err(RadrootsSimplexSmpError::NoMsg),
        },
    ]);
    runtime
        .receive_subscription_messages(&mut subscription_transport, 4)
        .unwrap();

    // Two receive calls are recorded: one that returned the scripted transmission and one
    // that found the script exhausted.
    assert_eq!(subscription_transport.subscription_requests.len(), 2);
    assert_eq!(
        subscription_transport.subscription_requests[0].server,
        receive_queue.descriptor.queue_uri.server
    );
    assert!(runtime.drain_events(16).is_empty());
}
// Validates and records the same frame twice, each time with a different queue and broker
// message id, and checks that the ack target for that frame is the second recording.
#[test]
fn inbound_progress_accepts_exact_duplicate_for_latest_ack_target() {
    let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
    let connection_id = runtime
        .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
        .unwrap();
    mark_connected(&mut runtime, &connection_id);
    let first_queue = reply_descriptor().queue_address();
    // Same server as the first queue, different sender id.
    let second_queue = RadrootsSimplexAgentQueueAddress {
        server: first_queue.server.clone(),
        sender_id: b"second-duplicate-broker".to_vec(),
    };
    let frame = user_message_frame(1, Vec::new(), b"first");
    let frame_hash = agent_message_hash(&frame);

    runtime
        .validate_inbound_frame_progress(&connection_id, &frame, &frame_hash)
        .unwrap();
    runtime
        .store
        .record_inbound_message(
            &connection_id,
            first_queue,
            b"first-broker-message".to_vec(),
            frame.header.message_id,
            frame_hash.clone(),
        )
        .unwrap();
    // The identical frame must still validate after it has been recorded once.
    runtime
        .validate_inbound_frame_progress(&connection_id, &frame, &frame_hash)
        .unwrap();
    runtime
        .store
        .record_inbound_message(
            &connection_id,
            second_queue.clone(),
            b"second-broker-message".to_vec(),
            frame.header.message_id,
            frame_hash,
        )
        .unwrap();

    assert_eq!(
        runtime
            .store
            .inbound_ack_target(&connection_id, 1, &agent_message_hash(&frame))
            .unwrap(),
        Some((second_queue, b"second-broker-message".to_vec()))
    );
}
// With message 1 recorded, checks two rejections: a frame numbered 3 (message 2 skipped), and
// a frame numbered 2 whose previous-message hash does not match message 1.
#[test]
fn inbound_progress_rejects_gap_and_previous_hash_mismatch() {
    let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
    let connection_id = runtime
        .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
        .unwrap();
    mark_connected(&mut runtime, &connection_id);
    let queue = reply_descriptor().queue_address();
    let first_frame = user_message_frame(1, Vec::new(), b"first");
    let first_hash = agent_message_hash(&first_frame);
    runtime
        .store
        .record_inbound_message(
            &connection_id,
            queue,
            b"first-broker-message".to_vec(),
            first_frame.header.message_id,
            first_hash.clone(),
        )
        .unwrap();

    // Correct previous hash, but the message id jumps from 1 to 3.
    let gap_frame = user_message_frame(3, first_hash.clone(), b"gap");
    let gap_error = runtime
        .validate_inbound_frame_progress(
            &connection_id,
            &gap_frame,
            &agent_message_hash(&gap_frame),
        )
        .unwrap_err();
    assert!(gap_error.to_string().contains("skipped expected `2`"));

    // Correct message id, but the previous hash is not the hash of message 1.
    let mismatch_frame = user_message_frame(2, b"wrong-previous-hash".to_vec(), b"second");
    let mismatch_error = runtime
        .validate_inbound_frame_progress(
            &connection_id,
            &mismatch_frame,
            &agent_message_hash(&mismatch_frame),
        )
        .unwrap_err();
    assert!(
        mismatch_error
            .to_string()
            .contains("unexpected previous-message hash")
    );
}
3794 let frame_hash = agent_message_hash(&frame); 3795 3796 runtime 3797 .validate_inbound_frame_progress(&connection_id, &frame, &frame_hash) 3798 .unwrap(); 3799 } 3800 3801 #[test] 3802 fn inbound_progress_rejects_regression_after_accepted_next_message() { 3803 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); 3804 let connection_id = runtime 3805 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 3806 .unwrap(); 3807 mark_connected(&mut runtime, &connection_id); 3808 let queue = reply_descriptor().queue_address(); 3809 let first_frame = user_message_frame(1, Vec::new(), b"first"); 3810 let first_hash = agent_message_hash(&first_frame); 3811 let second_frame = user_message_frame(2, first_hash.clone(), b"second"); 3812 let second_hash = agent_message_hash(&second_frame); 3813 runtime 3814 .store 3815 .record_inbound_message( 3816 &connection_id, 3817 queue.clone(), 3818 b"first-broker-message".to_vec(), 3819 first_frame.header.message_id, 3820 first_hash, 3821 ) 3822 .unwrap(); 3823 runtime 3824 .validate_inbound_frame_progress(&connection_id, &second_frame, &second_hash) 3825 .unwrap(); 3826 runtime 3827 .store 3828 .record_inbound_message( 3829 &connection_id, 3830 queue, 3831 b"second-broker-message".to_vec(), 3832 second_frame.header.message_id, 3833 second_hash, 3834 ) 3835 .unwrap(); 3836 3837 let regression_frame = user_message_frame(1, Vec::new(), b"first"); 3838 let regression_error = runtime 3839 .validate_inbound_frame_progress( 3840 &connection_id, 3841 ®ression_frame, 3842 &agent_message_hash(®ression_frame), 3843 ) 3844 .unwrap_err(); 3845 assert!(regression_error.to_string().contains("regressed below `2`")); 3846 } 3847 3848 #[test] 3849 fn send_message_requires_connected_state() { 3850 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); 3851 let created = runtime 3852 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 3853 .unwrap(); 3854 let invitation = runtime 
3855 .store 3856 .connection(&created) 3857 .unwrap() 3858 .invitation 3859 .clone() 3860 .unwrap(); 3861 let joined = runtime 3862 .join_connection(invitation, reply_queue(), 20) 3863 .unwrap(); 3864 3865 let error = runtime 3866 .send_message(&joined, b"blocked before connected".to_vec(), 30) 3867 .unwrap_err(); 3868 assert!(error.to_string().contains("is not connected")); 3869 } 3870 3871 #[test] 3872 fn allow_and_hello_lifecycle_reaches_connected() { 3873 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); 3874 let created = runtime 3875 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 3876 .unwrap(); 3877 let mut setup_transport = ScriptedTransport::with_responses(vec![ 3878 ids_response(b"recipient", b"sender", b"server-dh"), 3879 RadrootsSimplexSmpBrokerMessage::Ok, 3880 ]); 3881 runtime 3882 .execute_ready_commands(&mut setup_transport, 30, 16) 3883 .unwrap(); 3884 runtime 3885 .store 3886 .connection_mut(&created) 3887 .unwrap() 3888 .shared_secret = Some(vec![3_u8; 32]); 3889 3890 runtime 3891 .handle_inbound_decrypted_message( 3892 &created, 3893 RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply { 3894 reply_queues: vec![reply_descriptor()], 3895 info: b"peer-info".to_vec(), 3896 }, 3897 b"reply-confirmation".to_vec(), 3898 ) 3899 .unwrap(); 3900 assert_eq!( 3901 runtime.store.connection(&created).unwrap().status, 3902 RadrootsSimplexAgentConnectionStatus::AwaitingApproval 3903 ); 3904 runtime 3905 .handle_inbound_decrypted_message( 3906 &created, 3907 RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply { 3908 reply_queues: vec![reply_descriptor()], 3909 info: b"peer-info".to_vec(), 3910 }, 3911 b"reply-confirmation-duplicate".to_vec(), 3912 ) 3913 .unwrap(); 3914 initialize_test_outbound_official_ratchet(&mut runtime, &created); 3915 3916 runtime 3917 .allow_connection(&created, b"local-info".to_vec(), 40) 3918 .unwrap(); 3919 let mut allow_transport = ScriptedTransport::with_responses(vec![ 3920 
RadrootsSimplexSmpBrokerMessage::Ok, 3921 RadrootsSimplexSmpBrokerMessage::Ok, 3922 RadrootsSimplexSmpBrokerMessage::Ok, 3923 ]); 3924 runtime 3925 .execute_ready_commands(&mut allow_transport, 50, 16) 3926 .unwrap(); 3927 assert_eq!(allow_transport.requests.len(), 3); 3928 assert!(matches!( 3929 allow_transport.requests[0].command, 3930 RadrootsSimplexSmpCommand::SKey(_) 3931 )); 3932 assert!(matches!( 3933 allow_transport.requests[1].command, 3934 RadrootsSimplexSmpCommand::Send(_) 3935 )); 3936 assert!(matches!( 3937 allow_transport.requests[2].command, 3938 RadrootsSimplexSmpCommand::Send(_) 3939 )); 3940 let connection = runtime.store.connection(&created).unwrap(); 3941 assert_eq!( 3942 connection.status, 3943 RadrootsSimplexAgentConnectionStatus::Connected 3944 ); 3945 assert!(connection.hello_sent); 3946 assert!(!connection.hello_received); 3947 3948 runtime 3949 .handle_inbound_decrypted_message(&created, hello_message(1), b"hello-in".to_vec()) 3950 .unwrap(); 3951 let connection = runtime.store.connection(&created).unwrap(); 3952 assert_eq!( 3953 connection.status, 3954 RadrootsSimplexAgentConnectionStatus::Connected 3955 ); 3956 assert!(connection.hello_sent); 3957 assert!(connection.hello_received); 3958 assert!(runtime.drain_events(16).into_iter().any(|event| matches!( 3959 event, 3960 RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished { connection_id } 3961 if connection_id == created 3962 ))); 3963 let mut hello_transport = ScriptedTransport::with_responses(vec![]); 3964 runtime 3965 .execute_ready_commands(&mut hello_transport, 60, 16) 3966 .unwrap(); 3967 assert!(hello_transport.requests.is_empty()); 3968 } 3969 3970 #[test] 3971 fn delivered_send_confirms_cursor_only_after_transport_success() { 3972 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); 3973 let created = runtime 3974 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 3975 .unwrap(); 3976 let invitation = runtime 3977 .store 3978 
.connection(&created) 3979 .unwrap() 3980 .invitation 3981 .clone() 3982 .unwrap(); 3983 let joined = runtime 3984 .join_connection(invitation, reply_queue(), 20) 3985 .unwrap(); 3986 3987 let mut setup_transport = ScriptedTransport::with_responses(vec![ 3988 ids_response(b"recipient", b"sender", b"server-dh"), 3989 RadrootsSimplexSmpBrokerMessage::Ok, 3990 ids_response(b"recipient-2", b"sender-2", b"server-dh-2"), 3991 RadrootsSimplexSmpBrokerMessage::Ok, 3992 RadrootsSimplexSmpBrokerMessage::Ok, 3993 RadrootsSimplexSmpBrokerMessage::Ok, 3994 ]); 3995 runtime 3996 .execute_ready_commands(&mut setup_transport, 30, 16) 3997 .unwrap(); 3998 mark_connected(&mut runtime, &joined); 3999 4000 let message_id = runtime 4001 .send_message(&joined, b"hello simplex".to_vec(), 40) 4002 .unwrap(); 4003 assert_eq!(message_id, 1); 4004 assert_eq!( 4005 runtime 4006 .store 4007 .connection(&joined) 4008 .unwrap() 4009 .delivery_cursor 4010 .last_sent_message_id, 4011 None 4012 ); 4013 4014 let mut delivery_transport = 4015 ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]); 4016 runtime 4017 .execute_ready_commands(&mut delivery_transport, 50, 16) 4018 .unwrap(); 4019 4020 let cursor = &runtime.store.connection(&joined).unwrap().delivery_cursor; 4021 assert_eq!(cursor.last_sent_message_id, Some(1)); 4022 assert!(cursor.last_sent_message_hash.is_some()); 4023 assert_eq!( 4024 runtime 4025 .store 4026 .connection(&joined) 4027 .unwrap() 4028 .staged_outbound_message, 4029 None 4030 ); 4031 assert!(runtime.drain_events(64).into_iter().any(|event| matches!( 4032 event, 4033 RadrootsSimplexAgentRuntimeEvent::OutboundMessageDelivered { 4034 connection_id, 4035 message_id: 1, 4036 message_hash, 4037 } if connection_id == joined && !message_hash.is_empty() 4038 ))); 4039 } 4040 4041 #[test] 4042 fn peer_receipt_requires_stored_outbound_message_hash() { 4043 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); 4044 let connection_id = 
runtime 4045 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 4046 .unwrap(); 4047 let prepared = runtime 4048 .store 4049 .prepare_outbound_message(&connection_id, b"outbound-message-hash".to_vec()) 4050 .unwrap(); 4051 runtime 4052 .store 4053 .confirm_outbound_message(&connection_id, prepared.message_id) 4054 .unwrap(); 4055 4056 runtime 4057 .handle_inbound_decrypted_message( 4058 &connection_id, 4059 receipt_message(1, prepared.message_id, b"outbound-message-hash".to_vec()), 4060 b"receipt-frame".to_vec(), 4061 ) 4062 .unwrap(); 4063 4064 assert!(runtime.drain_events(16).into_iter().any(|event| matches!( 4065 event, 4066 RadrootsSimplexAgentRuntimeEvent::MessageAcknowledged { 4067 connection_id: event_connection_id, 4068 message_id, 4069 message_hash, 4070 } if event_connection_id == connection_id 4071 && message_id == prepared.message_id 4072 && message_hash == b"outbound-message-hash".to_vec() 4073 ))); 4074 } 4075 4076 #[test] 4077 fn peer_receipt_rejects_hash_mismatch() { 4078 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); 4079 let connection_id = runtime 4080 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 4081 .unwrap(); 4082 let prepared = runtime 4083 .store 4084 .prepare_outbound_message(&connection_id, b"outbound-message-hash".to_vec()) 4085 .unwrap(); 4086 runtime 4087 .store 4088 .confirm_outbound_message(&connection_id, prepared.message_id) 4089 .unwrap(); 4090 4091 let error = runtime 4092 .handle_inbound_decrypted_message( 4093 &connection_id, 4094 receipt_message(1, prepared.message_id, b"wrong-hash".to_vec()), 4095 b"receipt-frame".to_vec(), 4096 ) 4097 .unwrap_err(); 4098 4099 assert!( 4100 error 4101 .to_string() 4102 .contains("did not match stored outbound message hash") 4103 ); 4104 assert!(!runtime.drain_events(16).into_iter().any(|event| matches!( 4105 event, 4106 RadrootsSimplexAgentRuntimeEvent::MessageAcknowledged { .. 
} 4107 ))); 4108 } 4109 4110 #[test] 4111 fn send_message_stores_opaque_encrypted_agent_payload() { 4112 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); 4113 let created = runtime 4114 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 4115 .unwrap(); 4116 let invitation = runtime 4117 .store 4118 .connection(&created) 4119 .unwrap() 4120 .invitation 4121 .clone() 4122 .unwrap(); 4123 let joined = runtime 4124 .join_connection(invitation, reply_queue(), 20) 4125 .unwrap(); 4126 4127 let mut setup_transport = ScriptedTransport::with_responses(vec![ 4128 ids_response(b"recipient", b"sender", b"server-dh"), 4129 RadrootsSimplexSmpBrokerMessage::Ok, 4130 ids_response(b"recipient-2", b"sender-2", b"server-dh-2"), 4131 RadrootsSimplexSmpBrokerMessage::Ok, 4132 RadrootsSimplexSmpBrokerMessage::Ok, 4133 RadrootsSimplexSmpBrokerMessage::Ok, 4134 ]); 4135 runtime 4136 .execute_ready_commands(&mut setup_transport, 30, 16) 4137 .unwrap(); 4138 mark_connected(&mut runtime, &joined); 4139 4140 runtime 4141 .send_message(&joined, b"hello simplex".to_vec(), 40) 4142 .unwrap(); 4143 let command = runtime.retry_pending(40, 16).remove(0); 4144 let RadrootsSimplexAgentPendingCommandKind::SendEnvelope { envelope, .. 
} = command.kind 4145 else { 4146 panic!("expected send envelope command"); 4147 }; 4148 let RadrootsSimplexAgentEnvelope::Message(encrypted) = envelope else { 4149 panic!("expected encrypted message envelope"); 4150 }; 4151 let expected_plaintext = encode_decrypted_message( 4152 &RadrootsSimplexAgentDecryptedMessage::Message(RadrootsSimplexAgentMessageFrame { 4153 header: RadrootsSimplexAgentMessageHeader { 4154 message_id: 1, 4155 previous_message_hash: Vec::new(), 4156 }, 4157 message: RadrootsSimplexAgentMessage::UserMessage(b"hello simplex".to_vec()), 4158 padding: Vec::new(), 4159 }), 4160 ) 4161 .unwrap(); 4162 4163 assert!(encrypted.ratchet_header.is_none()); 4164 assert!(encrypted.ciphertext.is_empty()); 4165 let official_message = encrypted.official_message.as_ref().unwrap(); 4166 assert_ne!(official_message, &expected_plaintext); 4167 assert_eq!( 4168 official_message.len(), 4169 2 + 124 + 16 + SIMPLEX_AGENT_E2E_MESSAGE_LENGTH 4170 ); 4171 } 4172 4173 #[test] 4174 fn transport_retry_keeps_staged_outbound_message() { 4175 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); 4176 let created = runtime 4177 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 4178 .unwrap(); 4179 let invitation = runtime 4180 .store 4181 .connection(&created) 4182 .unwrap() 4183 .invitation 4184 .clone() 4185 .unwrap(); 4186 let joined = runtime 4187 .join_connection(invitation, reply_queue(), 20) 4188 .unwrap(); 4189 4190 let mut setup_transport = ScriptedTransport::with_responses(vec![ 4191 ids_response(b"recipient", b"sender", b"server-dh"), 4192 RadrootsSimplexSmpBrokerMessage::Ok, 4193 ids_response(b"recipient-2", b"sender-2", b"server-dh-2"), 4194 RadrootsSimplexSmpBrokerMessage::Ok, 4195 RadrootsSimplexSmpBrokerMessage::Ok, 4196 RadrootsSimplexSmpBrokerMessage::Ok, 4197 ]); 4198 runtime 4199 .execute_ready_commands(&mut setup_transport, 30, 16) 4200 .unwrap(); 4201 mark_connected(&mut runtime, &joined); 4202 4203 runtime 4204 
.send_message(&joined, b"hello simplex".to_vec(), 40) 4205 .unwrap(); 4206 4207 struct FailingTransport; 4208 impl RadrootsSimplexSmpCommandTransport for FailingTransport { 4209 type Error = String; 4210 fn execute( 4211 &mut self, 4212 _request: RadrootsSimplexSmpTransportRequest, 4213 ) -> Result<RadrootsSimplexSmpTransportResponse, Self::Error> { 4214 Err("synthetic failure".to_owned()) 4215 } 4216 } 4217 4218 runtime 4219 .execute_ready_commands(&mut FailingTransport, 50, 16) 4220 .unwrap(); 4221 4222 assert_eq!( 4223 runtime 4224 .store 4225 .connection(&joined) 4226 .unwrap() 4227 .delivery_cursor 4228 .last_sent_message_id, 4229 None 4230 ); 4231 assert_eq!( 4232 runtime 4233 .store 4234 .connection(&joined) 4235 .unwrap() 4236 .staged_outbound_message 4237 .as_ref() 4238 .map(|message| message.message_id), 4239 Some(1) 4240 ); 4241 let ready_again = runtime.retry_pending(50 + 5_000, 16); 4242 assert_eq!(ready_again.len(), 1); 4243 } 4244 4245 #[cfg(feature = "std")] 4246 #[test] 4247 fn builder_opens_persistent_store_path() { 4248 let tempdir = tempfile::tempdir().unwrap(); 4249 let path = tempdir.path().join("runtime-store.json"); 4250 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new() 4251 .persistent_store_path(&path) 4252 .build() 4253 .unwrap(); 4254 runtime 4255 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 4256 .unwrap(); 4257 assert!(path.exists()); 4258 } 4259 4260 #[cfg(feature = "std")] 4261 #[test] 4262 fn resume_subscriptions_requeues_persisted_receive_queues_after_restart() { 4263 let tempdir = tempfile::tempdir().unwrap(); 4264 let path = tempdir.path().join("runtime-store.json"); 4265 { 4266 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new() 4267 .persistent_store_path(&path) 4268 .build() 4269 .unwrap(); 4270 let created = runtime 4271 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 4272 .unwrap(); 4273 let mut setup_transport = ScriptedTransport::with_responses(vec![ 4274 
ids_response(b"recipient", b"sender", b"server-dh"), 4275 RadrootsSimplexSmpBrokerMessage::Ok, 4276 ]); 4277 runtime 4278 .execute_ready_commands(&mut setup_transport, 30, 16) 4279 .unwrap(); 4280 assert!(runtime.store.receive_queues(&created).unwrap()[0].subscribed); 4281 } 4282 4283 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new() 4284 .persistent_store_path(&path) 4285 .build() 4286 .unwrap(); 4287 assert_eq!(runtime.resume_subscriptions(40).unwrap(), 1); 4288 assert_eq!(runtime.resume_subscriptions(41).unwrap(), 0); 4289 let mut subscription_transport = 4290 ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]); 4291 runtime 4292 .execute_ready_commands(&mut subscription_transport, 50, 16) 4293 .unwrap(); 4294 4295 assert_eq!(subscription_transport.requests.len(), 1); 4296 assert!(matches!( 4297 subscription_transport.requests[0].command, 4298 RadrootsSimplexSmpCommand::Sub 4299 )); 4300 assert_eq!( 4301 subscription_transport.requests[0].entity_id, 4302 b"recipient".to_vec() 4303 ); 4304 assert!(runtime.drain_events(16).into_iter().any(|event| matches!( 4305 event, 4306 RadrootsSimplexAgentRuntimeEvent::SubscriptionQueued { .. 
} 4307 ))); 4308 } 4309 4310 #[test] 4311 fn ack_no_msg_response_is_idempotently_delivered() { 4312 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); 4313 let created = runtime 4314 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 4315 .unwrap(); 4316 4317 let mut setup_transport = ScriptedTransport::with_responses(vec![ 4318 ids_response(b"recipient", b"sender", b"server-dh"), 4319 RadrootsSimplexSmpBrokerMessage::Ok, 4320 ]); 4321 runtime 4322 .execute_ready_commands(&mut setup_transport, 30, 16) 4323 .unwrap(); 4324 mark_connected(&mut runtime, &created); 4325 let _ = runtime.drain_events(16); 4326 4327 let receive_queue = runtime.store.receive_queues(&created).unwrap()[0] 4328 .descriptor 4329 .queue_address(); 4330 let frame = user_message_frame(7, Vec::new(), b"ack target"); 4331 let message_hash = agent_message_hash(&frame); 4332 runtime 4333 .store 4334 .record_inbound_message( 4335 &created, 4336 receive_queue, 4337 b"already-acked-broker-message".to_vec(), 4338 frame.header.message_id, 4339 message_hash.clone(), 4340 ) 4341 .unwrap(); 4342 runtime 4343 .ack_message( 4344 &created, 4345 frame.header.message_id, 4346 message_hash.clone(), 4347 Vec::new(), 4348 40, 4349 ) 4350 .unwrap(); 4351 4352 let mut ack_transport = 4353 ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Err( 4354 RadrootsSimplexSmpError::NoMsg, 4355 )]); 4356 runtime 4357 .execute_ready_commands(&mut ack_transport, 50, 16) 4358 .unwrap(); 4359 4360 assert_eq!(ack_transport.requests.len(), 1); 4361 assert!(matches!( 4362 ack_transport.requests[0].command, 4363 RadrootsSimplexSmpCommand::Ack(_) 4364 )); 4365 assert!(runtime.retry_pending(50, 16).is_empty()); 4366 let events = runtime.drain_events(16); 4367 assert!(events.iter().any(|event| matches!( 4368 event, 4369 RadrootsSimplexAgentRuntimeEvent::InboundMessageAckDelivered { 4370 connection_id, 4371 message_id: 7, 4372 message_hash: delivered_hash, 4373 } if 
connection_id == &created && delivered_hash == &message_hash 4374 ))); 4375 assert!(!events.iter().any(|event| matches!( 4376 event, 4377 RadrootsSimplexAgentRuntimeEvent::Error { message, .. } if message.contains("NoMsg") 4378 ))); 4379 } 4380 4381 #[test] 4382 fn manual_record_command_failure_clears_staged_delivery_state() { 4383 let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); 4384 let created = runtime 4385 .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) 4386 .unwrap(); 4387 let invitation = runtime 4388 .store 4389 .connection(&created) 4390 .unwrap() 4391 .invitation 4392 .clone() 4393 .unwrap(); 4394 let joined = runtime 4395 .join_connection(invitation, reply_queue(), 20) 4396 .unwrap(); 4397 4398 let mut setup_transport = ScriptedTransport::with_responses(vec![ 4399 ids_response(b"recipient", b"sender", b"server-dh"), 4400 RadrootsSimplexSmpBrokerMessage::Ok, 4401 ids_response(b"recipient-2", b"sender-2", b"server-dh-2"), 4402 RadrootsSimplexSmpBrokerMessage::Ok, 4403 RadrootsSimplexSmpBrokerMessage::Ok, 4404 RadrootsSimplexSmpBrokerMessage::Ok, 4405 ]); 4406 runtime 4407 .execute_ready_commands(&mut setup_transport, 30, 16) 4408 .unwrap(); 4409 mark_connected(&mut runtime, &joined); 4410 4411 runtime 4412 .send_message(&joined, b"hello simplex".to_vec(), 40) 4413 .unwrap(); 4414 let command = runtime.retry_pending(40, 16).remove(0); 4415 runtime 4416 .record_command_outcome( 4417 command.id, 4418 RadrootsSimplexAgentCommandOutcome::Failed { 4419 message: "synthetic failure".into(), 4420 }, 4421 ) 4422 .unwrap(); 4423 assert_eq!( 4424 runtime 4425 .store 4426 .connection(&joined) 4427 .unwrap() 4428 .staged_outbound_message, 4429 None 4430 ); 4431 } 4432 }