client.rs (48380B)
1 #![cfg(feature = "std")] 2 3 use crate::error::RadrootsSimplexSmpTransportError; 4 use crate::executor::{ 5 RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpSubscriptionReceiveRequest, 6 RadrootsSimplexSmpSubscriptionTransport, RadrootsSimplexSmpTransportRequest, 7 RadrootsSimplexSmpTransportResponse, 8 }; 9 use crate::frame::{RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE, RadrootsSimplexSmpTransportBlock}; 10 use crate::handshake::{ 11 RADROOTS_SIMPLEX_SMP_TLS_ALPN_V1, RadrootsSimplexSmpClientHello, RadrootsSimplexSmpServerHello, 12 RadrootsSimplexSmpTlsHandshakeEvidence, RadrootsSimplexSmpTlsPolicy, 13 RadrootsSimplexSmpTransportServerProof, validate_tls_handshake, 14 }; 15 use base64::Engine as _; 16 use base64::engine::general_purpose::{URL_SAFE, URL_SAFE_NO_PAD}; 17 use radroots_simplex_smp_crypto::prelude::{ 18 RadrootsSimplexSmpQueueAuthorizationMaterial, RadrootsSimplexSmpQueueAuthorizationScope, 19 RadrootsSimplexSmpSecretBoxChainKey, RadrootsSimplexSmpX25519Keypair, advance_secretbox_chain, 20 decode_x25519_public_key_x509, derive_shared_secret, encode_x25519_public_key_x509, 21 encrypt_padded, init_secretbox_chain, verify_signature, 22 }; 23 use radroots_simplex_smp_proto::prelude::{ 24 RADROOTS_SIMPLEX_SMP_AUTH_COMMANDS_TRANSPORT_VERSION, 25 RADROOTS_SIMPLEX_SMP_ENCRYPTED_BLOCK_TRANSPORT_VERSION, RadrootsSimplexSmpBrokerMessage, 26 RadrootsSimplexSmpCommandTransmission, RadrootsSimplexSmpCorrelationId, 27 RadrootsSimplexSmpServerAddress, 28 }; 29 use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}; 30 use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; 31 use rustls::{ 32 ClientConfig, ClientConnection, DigitallySignedStruct, Error as RustlsError, SignatureScheme, 33 StreamOwned, 34 }; 35 use sha2::{Digest, Sha256}; 36 use std::collections::{BTreeMap, VecDeque}; 37 use std::io::{ErrorKind, Read, Write}; 38 use std::net::{IpAddr, TcpStream, ToSocketAddrs}; 39 use std::sync::Arc; 40 use 
std::time::Duration; 41 use x509_parser::prelude::FromDer; 42 43 #[derive(Default)] 44 pub struct RadrootsSimplexSmpTlsCommandTransport { 45 sessions: BTreeMap<String, RadrootsSimplexSmpLiveSession>, 46 } 47 48 const LIVE_SESSION_TIMEOUT: Duration = Duration::from_secs(5); 49 const LIVE_EMPTY_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_millis(150); 50 51 struct RadrootsSimplexSmpLiveSession { 52 stream: StreamOwned<ClientConnection, TcpStream>, 53 transport_version: u16, 54 session_identifier: Vec<u8>, 55 send_chain_key: Option<RadrootsSimplexSmpSecretBoxChainKey>, 56 receive_chain_key: Option<RadrootsSimplexSmpSecretBoxChainKey>, 57 debug_shared_secret: Option<Vec<u8>>, 58 pending_broker_responses: VecDeque<RadrootsSimplexSmpTransportResponse>, 59 } 60 61 impl RadrootsSimplexSmpTlsCommandTransport { 62 pub fn new() -> Self { 63 Self::default() 64 } 65 66 fn session_key(server: &RadrootsSimplexSmpServerAddress, kind: &str) -> String { 67 let mut key = server.server_identity.clone(); 68 key.push('@'); 69 key.push_str(&server.hosts.join(",")); 70 key.push(':'); 71 key.push_str(&server.port.unwrap_or(5223).to_string()); 72 key.push('#'); 73 key.push_str(kind); 74 key 75 } 76 77 fn session_for( 78 &mut self, 79 server: &RadrootsSimplexSmpServerAddress, 80 kind: &str, 81 ) -> Result<&mut RadrootsSimplexSmpLiveSession, RadrootsSimplexSmpTransportError> { 82 let key = Self::session_key(server, kind); 83 if !self.sessions.contains_key(&key) { 84 let session = connect_live_session(server)?; 85 self.sessions.insert(key.clone(), session); 86 } 87 self.sessions.get_mut(&key).ok_or_else(|| { 88 RadrootsSimplexSmpTransportError::InvalidServerAddress(format!( 89 "missing live SMP session for `{}`", 90 server.server_identity 91 )) 92 }) 93 } 94 } 95 96 impl RadrootsSimplexSmpCommandTransport for RadrootsSimplexSmpTlsCommandTransport { 97 type Error = RadrootsSimplexSmpTransportError; 98 99 fn execute( 100 &mut self, 101 request: RadrootsSimplexSmpTransportRequest, 102 ) -> 
Result<RadrootsSimplexSmpTransportResponse, Self::Error> { 103 let session_kind = session_kind_for_command(&request.command); 104 let key = Self::session_key(&request.server, session_kind); 105 let accepts_uncorrelated_subscription_response = 106 accepts_uncorrelated_subscription_response(&request.command); 107 match execute_live_request( 108 self.session_for(&request.server, session_kind)?, 109 &request, 110 accepts_uncorrelated_subscription_response, 111 ) { 112 Ok(response) => Ok(response), 113 Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error)) => { 114 self.sessions.remove(&key); 115 let response = execute_live_request( 116 self.session_for(&request.server, session_kind)?, 117 &request, 118 accepts_uncorrelated_subscription_response, 119 ); 120 match response { 121 Ok(response) => Ok(response), 122 Err(RadrootsSimplexSmpTransportError::LiveTransportIo(_)) => { 123 Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error)) 124 } 125 Err(error) => Err(error), 126 } 127 } 128 Err(error) => Err(error), 129 } 130 } 131 } 132 133 impl RadrootsSimplexSmpSubscriptionTransport for RadrootsSimplexSmpTlsCommandTransport { 134 fn receive_subscription( 135 &mut self, 136 request: RadrootsSimplexSmpSubscriptionReceiveRequest, 137 ) -> Result<Option<RadrootsSimplexSmpTransportResponse>, Self::Error> { 138 let key = Self::session_key(&request.server, "subscription"); 139 match read_live_response( 140 self.session_for(&request.server, "subscription")?, 141 &request.server, 142 None, 143 true, 144 None, 145 ) { 146 Ok(response) => Ok(response), 147 Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error)) => { 148 self.sessions.remove(&key); 149 Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error)) 150 } 151 Err(error) => Err(error), 152 } 153 } 154 } 155 156 fn session_kind_for_command( 157 command: &radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand, 158 ) -> &'static str { 159 match command { 160 
radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Sub 161 | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Subs 162 | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSub 163 | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSubs 164 | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Ack(_) => "subscription", 165 radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Get 166 | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::LGet => "poll", 167 _ => "command", 168 } 169 } 170 171 fn accepts_uncorrelated_subscription_response( 172 command: &radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand, 173 ) -> bool { 174 matches!( 175 command, 176 radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Sub 177 | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Subs 178 | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSub 179 | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSubs 180 | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Ack(_) 181 ) 182 } 183 184 fn execute_live_request( 185 session: &mut RadrootsSimplexSmpLiveSession, 186 request: &RadrootsSimplexSmpTransportRequest, 187 accept_uncorrelated_subscription_response: bool, 188 ) -> Result<RadrootsSimplexSmpTransportResponse, RadrootsSimplexSmpTransportError> { 189 let correlation_id = request 190 .correlation_id 191 .ok_or(RadrootsSimplexSmpTransportError::MissingCorrelationId)?; 192 let scope = RadrootsSimplexSmpQueueAuthorizationScope::new( 193 session.session_identifier.clone(), 194 correlation_id, 195 request.entity_id.clone(), 196 )?; 197 let material = RadrootsSimplexSmpQueueAuthorizationMaterial::for_command( 198 &scope, 199 &request.command, 200 session.transport_version, 201 &request.authorization, 202 )?; 203 let transmission = RadrootsSimplexSmpCommandTransmission { 204 authorization: material.authorization, 205 
correlation_id: Some(correlation_id), 206 entity_id: request.entity_id.clone(), 207 command: request.command.clone(), 208 }; 209 let block = RadrootsSimplexSmpTransportBlock::from_command_transmissions( 210 &[transmission], 211 session.transport_version, 212 )?; 213 let encoded = encode_live_transport_block(session, &block)?; 214 session 215 .stream 216 .write_all(&encoded) 217 .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?; 218 session 219 .stream 220 .flush() 221 .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?; 222 223 let accepted_entity_id = 224 accept_uncorrelated_subscription_response.then_some(request.entity_id.as_slice()); 225 read_live_response( 226 session, 227 &request.server, 228 Some(correlation_id), 229 false, 230 accepted_entity_id, 231 )? 232 .ok_or_else(|| { 233 RadrootsSimplexSmpTransportError::LiveTransportIo( 234 "SMP command response was not available before the read timeout".into(), 235 ) 236 }) 237 } 238 239 fn read_live_response( 240 session: &mut RadrootsSimplexSmpLiveSession, 241 server: &RadrootsSimplexSmpServerAddress, 242 expected_correlation_id: Option<RadrootsSimplexSmpCorrelationId>, 243 timeout_is_empty: bool, 244 accepted_subscription_entity_id: Option<&[u8]>, 245 ) -> Result<Option<RadrootsSimplexSmpTransportResponse>, RadrootsSimplexSmpTransportError> { 246 if expected_correlation_id.is_none() 247 && let Some(response) = session.pending_broker_responses.pop_front() 248 { 249 return Ok(Some(response)); 250 } 251 if let Some(entity_id) = accepted_subscription_entity_id 252 && let Some(position) = session 253 .pending_broker_responses 254 .iter() 255 .position(|response| is_subscription_response_for_entity(response, entity_id)) 256 { 257 return Ok(session.pending_broker_responses.remove(position)); 258 } 259 let mut response_block = vec![0_u8; RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE]; 260 if timeout_is_empty { 261 set_live_read_timeout(session, 
LIVE_EMPTY_SUBSCRIPTION_TIMEOUT)?; 262 } 263 let read_result = session.stream.read_exact(&mut response_block); 264 if timeout_is_empty { 265 set_live_read_timeout(session, LIVE_SESSION_TIMEOUT)?; 266 } 267 if let Err(error) = read_result { 268 if timeout_is_empty && matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) { 269 return Ok(None); 270 } 271 return Err(RadrootsSimplexSmpTransportError::LiveTransportIo( 272 error.to_string(), 273 )); 274 } 275 let response_hash = Sha256::digest(&response_block).to_vec(); 276 let decoded = decode_live_transport_block(session, &response_block)?; 277 let transmissions = decoded.decode_broker_transmissions(session.transport_version)?; 278 let responses = transmissions 279 .into_iter() 280 .map(|transmission| RadrootsSimplexSmpTransportResponse { 281 server: server.clone(), 282 transport_version: session.transport_version, 283 transmission, 284 transport_hash: response_hash.clone(), 285 }) 286 .collect::<Vec<_>>(); 287 select_live_response( 288 &mut session.pending_broker_responses, 289 responses, 290 expected_correlation_id, 291 accepted_subscription_entity_id, 292 ) 293 } 294 295 fn select_live_response( 296 pending_broker_responses: &mut VecDeque<RadrootsSimplexSmpTransportResponse>, 297 mut responses: Vec<RadrootsSimplexSmpTransportResponse>, 298 expected_correlation_id: Option<RadrootsSimplexSmpCorrelationId>, 299 accepted_subscription_entity_id: Option<&[u8]>, 300 ) -> Result<Option<RadrootsSimplexSmpTransportResponse>, RadrootsSimplexSmpTransportError> { 301 if let Some(expected_correlation_id) = expected_correlation_id { 302 if let Some(position) = responses.iter().position(|response| { 303 response.transmission.correlation_id == Some(expected_correlation_id) 304 }) { 305 let matched_response = responses.remove(position); 306 pending_broker_responses.extend(responses); 307 return Ok(Some(matched_response)); 308 } 309 if let Some(entity_id) = accepted_subscription_entity_id 310 && let Some(position) = 
responses 311 .iter() 312 .position(|response| is_subscription_response_for_entity(response, entity_id)) 313 { 314 let matched_response = responses.remove(position); 315 pending_broker_responses.extend(responses); 316 return Ok(Some(matched_response)); 317 } 318 pending_broker_responses.extend(responses); 319 return Err(RadrootsSimplexSmpTransportError::CorrelationIdMismatch); 320 } 321 pending_broker_responses.extend(responses); 322 Ok(pending_broker_responses.pop_front()) 323 } 324 325 fn is_subscription_response_for_entity( 326 response: &RadrootsSimplexSmpTransportResponse, 327 entity_id: &[u8], 328 ) -> bool { 329 response.transmission.entity_id == entity_id 330 && matches!( 331 response.transmission.message, 332 RadrootsSimplexSmpBrokerMessage::Msg(_) 333 | RadrootsSimplexSmpBrokerMessage::NMsg { .. } 334 | RadrootsSimplexSmpBrokerMessage::Sok(_) 335 | RadrootsSimplexSmpBrokerMessage::Soks(_) 336 | RadrootsSimplexSmpBrokerMessage::Ok 337 | RadrootsSimplexSmpBrokerMessage::Err(_) 338 ) 339 } 340 341 fn set_live_read_timeout( 342 session: &mut RadrootsSimplexSmpLiveSession, 343 timeout: Duration, 344 ) -> Result<(), RadrootsSimplexSmpTransportError> { 345 session 346 .stream 347 .sock 348 .set_read_timeout(Some(timeout)) 349 .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string())) 350 } 351 352 fn transport_debug_enabled() -> bool { 353 std::env::var_os("RADROOTS_SIMPLEX_DEBUG_TRANSPORT").is_some() 354 } 355 356 fn debug_sha256_label(label: &str, value: &[u8]) { 357 if transport_debug_enabled() { 358 eprintln!( 359 "[simplex-smp-transport] {label}: len={} sha256={}", 360 value.len(), 361 URL_SAFE_NO_PAD.encode(Sha256::digest(value)), 362 ); 363 } 364 } 365 366 fn encode_live_transport_block( 367 session: &mut RadrootsSimplexSmpLiveSession, 368 block: &RadrootsSimplexSmpTransportBlock, 369 ) -> Result<Vec<u8>, RadrootsSimplexSmpTransportError> { 370 if session.transport_version >= 
RADROOTS_SIMPLEX_SMP_ENCRYPTED_BLOCK_TRANSPORT_VERSION 371 && let Some(chain_key) = session.send_chain_key.as_mut() 372 { 373 return encode_encrypted_transport_payload(chain_key, &block.encode_payload()?); 374 } 375 block.encode() 376 } 377 378 fn encode_encrypted_transport_payload( 379 chain_key: &mut RadrootsSimplexSmpSecretBoxChainKey, 380 payload: &[u8], 381 ) -> Result<Vec<u8>, RadrootsSimplexSmpTransportError> { 382 let ((secretbox_key, nonce), next_chain_key) = advance_secretbox_chain(chain_key)?; 383 *chain_key = next_chain_key; 384 encrypt_padded( 385 &secretbox_key, 386 &nonce, 387 payload, 388 RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE - 16, 389 ) 390 .map_err(Into::into) 391 } 392 393 fn decode_live_transport_block( 394 session: &mut RadrootsSimplexSmpLiveSession, 395 bytes: &[u8], 396 ) -> Result<RadrootsSimplexSmpTransportBlock, RadrootsSimplexSmpTransportError> { 397 if session.transport_version >= RADROOTS_SIMPLEX_SMP_ENCRYPTED_BLOCK_TRANSPORT_VERSION 398 && let Some(chain_key) = session.receive_chain_key.as_mut() 399 { 400 match decode_encrypted_transport_block(chain_key, bytes) { 401 Ok(block) => { 402 let payload = block.encode_payload()?; 403 debug_sha256_label("live-response-payload", &payload); 404 return Ok(block); 405 } 406 Err(error) => { 407 if transport_debug_enabled() { 408 eprintln!("[simplex-smp-transport] live response decrypt failed: {error}"); 409 debug_sha256_label("live-response-ciphertext", bytes); 410 } 411 if let Some(send_chain_key) = session.send_chain_key.as_ref() { 412 let mut alternate_chain_key = send_chain_key.clone(); 413 if decode_encrypted_transport_block(&mut alternate_chain_key, bytes).is_ok() { 414 return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress( 415 "server response decrypted with the outbound chain key; live SMP block direction is assigned incorrectly".into(), 416 )); 417 } 418 } 419 debug_probe_transport_candidates(session, bytes); 420 if let Ok(block) = 
RadrootsSimplexSmpTransportBlock::decode(bytes) { 421 return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress( 422 format!( 423 "server returned plaintext SMP block while encrypted transport was expected: {:?}", 424 block.transmissions.first().map(|t| &t[..t.len().min(8)]) 425 ), 426 )); 427 } 428 return Err(error.into()); 429 } 430 } 431 } 432 RadrootsSimplexSmpTransportBlock::decode(bytes) 433 } 434 435 fn decode_encrypted_transport_block( 436 chain_key: &mut RadrootsSimplexSmpSecretBoxChainKey, 437 bytes: &[u8], 438 ) -> Result<RadrootsSimplexSmpTransportBlock, RadrootsSimplexSmpTransportError> { 439 let ((secretbox_key, nonce), next_chain_key) = advance_secretbox_chain(chain_key)?; 440 let payload = 441 radroots_simplex_smp_crypto::prelude::decrypt_padded(&secretbox_key, &nonce, bytes)?; 442 let block = RadrootsSimplexSmpTransportBlock::from_payload(&payload)?; 443 *chain_key = next_chain_key; 444 Ok(block) 445 } 446 447 fn debug_probe_transport_candidates(session: &mut RadrootsSimplexSmpLiveSession, bytes: &[u8]) { 448 if !transport_debug_enabled() { 449 return; 450 } 451 let Some(shared_secret) = session.debug_shared_secret.as_ref() else { 452 return; 453 }; 454 let Ok((first_chain_key, second_chain_key)) = 455 init_secretbox_chain(&session.session_identifier, shared_secret) 456 else { 457 return; 458 }; 459 for (label, chain_key) in [ 460 ("initial-first", first_chain_key), 461 ("initial-second", second_chain_key), 462 ] { 463 let Ok(((secretbox_key, nonce), _)) = advance_secretbox_chain(&chain_key) else { 464 continue; 465 }; 466 let result = 467 radroots_simplex_smp_crypto::prelude::decrypt_padded(&secretbox_key, &nonce, bytes); 468 match result { 469 Ok(payload) => { 470 eprintln!("[simplex-smp-transport] debug candidate {label} decrypted live block"); 471 debug_sha256_label("debug-candidate-payload", &payload); 472 } 473 Err(error) => { 474 eprintln!("[simplex-smp-transport] debug candidate {label} failed: {error}"); 475 } 476 } 477 } 478 } 479 
/// Opens a live session to `server`, trying each configured host in order.
///
/// Returns the first session that connects. When every host fails, the error of the last
/// attempted host is returned; when there are no hosts, an `InvalidServerAddress` error.
fn connect_live_session(
    server: &RadrootsSimplexSmpServerAddress,
) -> Result<RadrootsSimplexSmpLiveSession, RadrootsSimplexSmpTransportError> {
    let mut last_error = None;
    for host in &server.hosts {
        match connect_live_session_host(server, host) {
            Ok(session) => return Ok(session),
            Err(error) => last_error = Some(error),
        }
    }

    Err(last_error.unwrap_or_else(|| {
        RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
            "SMP server `{}` has no usable hosts",
            server.server_identity
        ))
    }))
}

/// Connects to one host of `server`, completes the TLS and SMP handshakes, and derives the
/// block-encryption chain keys when the negotiated transport version requires them.
///
/// Every socket address the host resolves to is tried in order (each with
/// `LIVE_SESSION_TIMEOUT`), so one unreachable address does not make the host unusable.
///
/// # Errors
/// * `InvalidServerAddress` when the host cannot be resolved or is not a valid server name.
/// * `LiveTransportIo` when no address accepts the connection or on socket/TLS I/O failure.
/// * `MissingPeerCertificates`, `MissingServerProof`, and any handshake validation or key
///   derivation error from the helpers called here.
fn connect_live_session_host(
    server: &RadrootsSimplexSmpServerAddress,
    host: &str,
) -> Result<RadrootsSimplexSmpLiveSession, RadrootsSimplexSmpTransportError> {
    let port = server.port.unwrap_or(5223);
    let addresses = (host, port).to_socket_addrs().map_err(|error| {
        RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
            "failed to resolve SMP server host `{host}:{port}`: {error}"
        ))
    })?;

    let mut connected = None;
    let mut last_connect_error = None;
    for socket_addr in addresses {
        match TcpStream::connect_timeout(&socket_addr, LIVE_SESSION_TIMEOUT) {
            Ok(stream) => {
                connected = Some(stream);
                break;
            }
            Err(error) => last_connect_error = Some(error),
        }
    }
    let tcp = match (connected, last_connect_error) {
        (Some(tcp), _) => tcp,
        (None, Some(error)) => {
            return Err(RadrootsSimplexSmpTransportError::LiveTransportIo(format!(
                "failed to connect to SMP server `{host}:{port}`: {error}"
            )));
        }
        // Resolution succeeded but produced no addresses at all.
        (None, None) => {
            return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
                format!("failed to resolve SMP server host `{host}:{port}`"),
            ));
        }
    };
    tcp.set_nodelay(true)
        .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    tcp.set_read_timeout(Some(LIVE_SESSION_TIMEOUT))
        .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    tcp.set_write_timeout(Some(LIVE_SESSION_TIMEOUT))
        .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;

    let server_name = match host.parse::<IpAddr>() {
        Ok(address) => ServerName::IpAddress(address.into()),
        Err(_) => ServerName::try_from(host.to_owned()).map_err(|_| {
            RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
                "invalid SMP server name `{host}`"
            ))
        })?,
    };
    // TLS-level certificate validation is delegated to a permissive verifier; the server is
    // authenticated below by comparing certificate fingerprints with the expected identity.
    let verifier = Arc::new(PermissiveSimplexServerVerifier);
    let mut config = ClientConfig::builder()
        .dangerous()
        .with_custom_certificate_verifier(verifier)
        .with_no_client_auth();
    config.alpn_protocols = vec![RADROOTS_SIMPLEX_SMP_TLS_ALPN_V1.as_bytes().to_vec()];

    let mut stream = StreamOwned::new(
        ClientConnection::new(Arc::new(config), server_name).map_err(|error| {
            RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string())
        })?,
        tcp,
    );
    // Drive the TLS handshake to completion so peer certificates and ALPN are available.
    while stream.conn.is_handshaking() {
        stream.conn.complete_io(&mut stream.sock).map_err(|error| {
            RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string())
        })?;
    }

    let peer_certs = stream
        .conn
        .peer_certificates()
        .ok_or(RadrootsSimplexSmpTransportError::MissingPeerCertificates)?
        .to_vec();
    let server_hello = read_server_hello(&mut stream)?;
    let actual_identity = matching_server_identity(&peer_certs, &server.server_identity)?;
    let expected_identity = canonical_server_identity(&server.server_identity)?;
    let mut policy = RadrootsSimplexSmpTlsPolicy::modern(expected_identity.clone());
    policy.require_tls_unique_binding = false;
    let transport_version = validate_tls_handshake(
        &policy,
        &server_hello,
        &RadrootsSimplexSmpTlsHandshakeEvidence {
            confirmed_alpn: stream
                .conn
                .alpn_protocol()
                .map(|value| String::from_utf8_lossy(value).into_owned()),
            session_resumed: false,
            certificate_chain_length: peer_certs.len(),
            online_certificate_fingerprint: actual_identity,
            tls_unique_channel_binding: None,
        },
    )?;
    let transport_keypair =
        if transport_version >= RADROOTS_SIMPLEX_SMP_AUTH_COMMANDS_TRANSPORT_VERSION {
            Some(RadrootsSimplexSmpX25519Keypair::generate()?)
        } else {
            None
        };
    let client_hello = RadrootsSimplexSmpClientHello {
        chosen_version: transport_version,
        server_key_hash: decode_server_identity(&expected_identity)?,
        client_key: transport_keypair
            .as_ref()
            .map(|keypair| encode_x25519_public_key_x509(&keypair.public_key))
            .transpose()?,
        proxy_server: false,
        ignored_part: Vec::new(),
    };
    let encoded_client_hello = client_hello.encode()?;
    if transport_debug_enabled() {
        debug_sha256_label("client-hello", &encoded_client_hello);
        debug_sha256_label("server-session-id", &server_hello.session_identifier);
    }
    stream
        .write_all(&encoded_client_hello)
        .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    stream
        .flush()
        .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;

    let mut debug_shared_secret = None;
    let (receive_chain_key, send_chain_key) =
        if transport_version >= RADROOTS_SIMPLEX_SMP_ENCRYPTED_BLOCK_TRANSPORT_VERSION {
            let server_key = decode_server_transport_public_key(
                server_hello
                    .server_proof
                    .as_ref()
                    .ok_or(RadrootsSimplexSmpTransportError::MissingServerProof)?,
            )?;
            let shared_secret = derive_shared_secret(
                &transport_keypair
                    .as_ref()
                    .ok_or(RadrootsSimplexSmpTransportError::MissingServerProof)?
                    .private_key,
                &server_key,
            )?;
            if transport_debug_enabled() {
                if let Some(keypair) = transport_keypair.as_ref() {
                    debug_sha256_label("client-transport-public-key", &keypair.public_key);
                }
                debug_sha256_label("server-transport-public-key", &server_key);
            }
            // Copy the secret only when debugging is actually enabled.
            debug_shared_secret = transport_debug_enabled().then(|| shared_secret.clone());
            let (receive_chain_key, send_chain_key) =
                init_secretbox_chain(&server_hello.session_identifier, &shared_secret)?;
            (Some(receive_chain_key), Some(send_chain_key))
        } else {
            (None, None)
        };

    Ok(RadrootsSimplexSmpLiveSession {
        stream,
        transport_version,
        session_identifier: server_hello.session_identifier,
        send_chain_key,
        receive_chain_key,
        debug_shared_secret,
        pending_broker_responses: VecDeque::new(),
    })
}

/// Extracts the server's X25519 transport public key from the handshake proof.
///
/// When the proof carries a certificate payload, the signature over the signed object is
/// verified with the public key of the first certificate in that payload. The key is then
/// decoded from the signed object itself or, failing that, from its first DER element.
///
/// NOTE(review): when `certificate_payload` is empty the signature is not verified at all,
/// and the proof certificate is not compared with the TLS peer chain here; confirm both are
/// intended.
fn decode_server_transport_public_key(
    proof: &RadrootsSimplexSmpTransportServerProof,
) -> Result<Vec<u8>, RadrootsSimplexSmpTransportError> {
    let (signed_object, signature) = decode_signed_server_key_parts(&proof.signed_server_key)?;
    if transport_debug_enabled() {
        eprintln!(
            "[simplex-smp-transport] signed-server-key: proof_len={} signed_object_len={} signature_len={}",
            proof.signed_server_key.len(),
            signed_object.len(),
            signature.len()
        );
    }
    if !proof.certificate_payload.is_empty() {
        let verify_key = decode_server_certificate_verify_key(&proof.certificate_payload)?;
        verify_signature(signed_object, &verify_key, signature).map_err(|error| {
            RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
                "failed to verify SMP server transport key signature: {error}"
            ))
        })?;
    }

    decode_x25519_public_key_x509(signed_object)
        .or_else(|_| {
            first_der_sequence_element(signed_object)
                .and_then(|candidate| decode_x25519_public_key_x509(candidate).map_err(Into::into))
        })
        .map_err(|error: RadrootsSimplexSmpTransportError| {
            RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
                "failed to decode verified SMP server transport key: {error}"
            ))
        })
}

/// Returns the complete first element (header included) inside a top-level DER sequence.
fn first_der_sequence_element(bytes: &[u8]) -> Result<&[u8], RadrootsSimplexSmpTransportError> {
    let (sequence_tag, _, sequence_header_end, sequence_content_end) = parse_der_element(bytes, 0)?;
    if sequence_tag != 0x30 {
        return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: expected DER sequence".into(),
        ));
    }
    let (_, element_start, _, element_end) = parse_der_element(bytes, sequence_header_end)?;
    if element_end > sequence_content_end {
        return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: first element exceeds sequence bounds".into(),
        ));
    }
    Ok(&bytes[element_start..element_end])
}

/// Splits a DER `SEQUENCE { signed object, algorithm, BIT STRING signature }` into the raw
/// signed object (header included) and the signature bytes (without the unused-bits octet).
///
/// # Errors
/// `InvalidServerAddress` when the structure is malformed, truncated, exceeds the outer
/// sequence, or the bit string declares unused bits.
fn decode_signed_server_key_parts(
    bytes: &[u8],
) -> Result<(&[u8], &[u8]), RadrootsSimplexSmpTransportError> {
    let (sequence_tag, _, sequence_header_end, sequence_content_end) = parse_der_element(bytes, 0)?;
    if sequence_tag != 0x30 {
        return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: signed key is not a DER sequence".into(),
        ));
    }

    let (_, signed_object_start, _, signed_object_end) =
        parse_der_element(bytes, sequence_header_end)?;
    let (_, _, _, algorithm_end) = parse_der_element(bytes, signed_object_end)?;
    if algorithm_end > sequence_content_end {
        return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: signature algorithm exceeds sequence bounds".into(),
        ));
    }
    let (signature_tag, _, signature_value_start, signature_end) =
        parse_der_element(bytes, algorithm_end)?;
    if signature_tag != 0x03 {
        return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: expected DER bit string signature".into(),
        ));
    }
    if signature_end > sequence_content_end {
        return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: signature exceeds sequence bounds".into(),
        ));
    }
    let signature_value = bytes
        .get(signature_value_start..signature_end)
        .ok_or_else(|| {
            RadrootsSimplexSmpTransportError::InvalidServerAddress(
                "invalid SMP server proof: truncated signature".into(),
            )
        })?;
    // The first content octet of a BIT STRING is the count of unused trailing bits.
    let (unused_bits, signature) = signature_value.split_first().ok_or_else(|| {
        RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: missing signature payload".into(),
        )
    })?;
    if *unused_bits != 0 {
        return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: unsupported signature bit padding".into(),
        ));
    }
    Ok((&bytes[signed_object_start..signed_object_end], signature))
}

/// Returns the subject public key bytes of the first certificate in the proof payload.
///
/// The payload starts with a one-byte certificate count followed by length-prefixed
/// certificates; only the first certificate is read.
fn decode_server_certificate_verify_key(
    certificate_payload: &[u8],
) -> Result<Vec<u8>, RadrootsSimplexSmpTransportError> {
    let Some(&cert_count) = certificate_payload.first() else {
        return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: missing certificate chain".into(),
        ));
    };
    if cert_count == 0 {
        return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: empty certificate chain".into(),
        ));
    }
    let (certificate_der, _) = read_large_handshake_field(certificate_payload, 1)?;
    let (_, certificate) = x509_parser::certificate::X509Certificate::from_der(&certificate_der)
        .map_err(|error| {
            RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
                "failed to parse SMP proof certificate: {error}"
            ))
        })?;
    Ok(certificate
        .tbs_certificate
        .subject_pki
        .subject_public_key
        .data
        .to_vec())
}

/// Reads a field prefixed by a big-endian `u16` length starting at `offset`.
///
/// Returns the field bytes and the offset just past the field.
fn read_large_handshake_field(
    bytes: &[u8],
    offset: usize,
) -> Result<(Vec<u8>, usize), RadrootsSimplexSmpTransportError> {
    let Some(length_bytes) = bytes.get(offset..offset + 2) else {
        return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: truncated certificate length".into(),
        ));
    };
    let length = u16::from_be_bytes([length_bytes[0], length_bytes[1]]) as usize;
    let start = offset + 2;
    let end = start + length;
    let value = bytes.get(start..end).ok_or_else(|| {
        RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: certificate exceeds payload".into(),
        )
    })?;
    Ok((value.to_vec(), end))
}

/// Parses the header of the DER element that starts at `offset`.
///
/// Returns `(tag, element_start, value_start, value_end)` as offsets into `bytes`. Supports
/// short-form lengths and long-form lengths of one to four bytes; the indefinite form and
/// longer length encodings are rejected.
///
/// # Errors
/// `InvalidServerAddress` when the header is truncated, the length encoding is unsupported,
/// or the declared value would extend past the end of `bytes`.
fn parse_der_element(
    bytes: &[u8],
    offset: usize,
) -> Result<(u8, usize, usize, usize), RadrootsSimplexSmpTransportError> {
    let tag = *bytes.get(offset).ok_or_else(|| {
        RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: truncated DER element".into(),
        )
    })?;
    let length_offset = offset + 1;
    let length_tag = *bytes.get(length_offset).ok_or_else(|| {
        RadrootsSimplexSmpTransportError::InvalidServerAddress(
            "invalid SMP server proof: missing DER length".into(),
        )
    })?;
    let (value_len, header_len) = if length_tag & 0x80 == 0 {
        (length_tag as usize, 2)
    } else {
        let length_bytes = (length_tag & 0x7f) as usize;
        if length_bytes == 0 || length_bytes > 4 {
            return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
                "invalid SMP server proof: unsupported DER length encoding".into(),
            ));
        }
        let length_start = length_offset + 1;
        let length_end = length_start + length_bytes;
        let encoded_length = bytes.get(length_start..length_end).ok_or_else(|| {
            RadrootsSimplexSmpTransportError::InvalidServerAddress(
                "invalid SMP server proof: truncated DER length".into(),
            )
        })?;
        let value_len = encoded_length
            .iter()
            .fold(0_usize, |acc, byte| (acc << 8) | (*byte as usize));
        (value_len, 2 + length_bytes)
    };
    let value_start = offset + header_len;
    // The length comes from the peer: use checked arithmetic so an oversized value cannot
    // wrap around on targets with a 32-bit `usize`.
    let value_end = value_start
        .checked_add(value_len)
        .filter(|end| *end <= bytes.len())
        .ok_or_else(|| {
            RadrootsSimplexSmpTransportError::InvalidServerAddress(
                "invalid SMP server proof: DER element exceeds input".into(),
            )
        })?;
    Ok((tag, offset, value_start, value_end))
}

/// Reads one full transport block from the TLS stream and decodes it as the server hello.
fn read_server_hello(
    stream: &mut StreamOwned<ClientConnection, TcpStream>,
) -> Result<RadrootsSimplexSmpServerHello, RadrootsSimplexSmpTransportError> {
    let mut block = vec![0_u8; RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE];
    stream
        .read_exact(&mut block)
        .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    RadrootsSimplexSmpServerHello::decode(&block)
}

/// Finds a certificate in `chain` whose fingerprint equals the expected server identity.
///
/// Returns the matching identity string. When none matches, `ServerIdentityMismatch` reports
/// the fingerprint of the first certificate (or an empty string for an empty chain).
fn matching_server_identity(
    chain: &[CertificateDer<'static>],
    expected_identity: &str,
) -> Result<String, RadrootsSimplexSmpTransportError> {
    let expected_identity = canonical_server_identity(expected_identity)?;
    for certificate in chain {
        let identity = server_identity_from_certificate(certificate.as_ref())?;
        if identity == expected_identity {
            return Ok(identity);
        }
    }
    Err(RadrootsSimplexSmpTransportError::ServerIdentityMismatch {
        expected: expected_identity,
        actual: chain
            .first()
            .map(|certificate| server_identity_from_certificate(certificate.as_ref()))
            .transpose()?
            .unwrap_or_default(),
    })
}

/// Computes a certificate's identity: the unpadded base64url SHA-256 of its DER encoding.
///
/// The DER is parsed first only to reject input that is not a well-formed certificate.
fn server_identity_from_certificate(
    der: &[u8],
) -> Result<String, RadrootsSimplexSmpTransportError> {
    x509_parser::certificate::X509Certificate::from_der(der).map_err(|error| {
        RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
            "failed to parse SMP certificate: {error}"
        ))
    })?;
    let digest = Sha256::digest(der);
    Ok(URL_SAFE_NO_PAD.encode(digest))
}

/// Normalises a base64url identity (padded or unpadded) to its unpadded form.
fn canonical_server_identity(value: &str) -> Result<String, RadrootsSimplexSmpTransportError> {
    decode_server_identity(value).map(|decoded| URL_SAFE_NO_PAD.encode(decoded))
}

/// Decodes a base64url identity, accepting both the unpadded and the padded alphabet.
fn decode_server_identity(value: &str) -> Result<Vec<u8>, RadrootsSimplexSmpTransportError> {
    URL_SAFE_NO_PAD
        .decode(value)
        .or_else(|_| URL_SAFE.decode(value))
        .map_err(|_| {
            RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
                "invalid base64url server identity `{value}`"
            ))
        })
}

/// TLS certificate verifier whose `verify_server_cert` accepts any presented certificate.
#[derive(Debug)]
struct PermissiveSimplexServerVerifier;

impl ServerCertVerifier for PermissiveSimplexServerVerifier {
    fn verify_server_cert(
        &self,
        _end_entity: &CertificateDer<'_>,
        _intermediates: &[CertificateDer<'_>],
        _server_name: &ServerName<'_>,
        _ocsp_response: &[u8],
        _now: UnixTime,
    ) -> Result<ServerCertVerified, RustlsError> {
        Ok(ServerCertVerified::assertion())
    }

    fn verify_tls12_signature(
        &self,
        _message: &[u8],
        _cert: &CertificateDer<'_>,
        _dss: &DigitallySignedStruct,
    ) -> Result<HandshakeSignatureValid, RustlsError> {
        Ok(HandshakeSignatureValid::assertion())
    }

    fn verify_tls13_signature(
        &self,
        _message: &[u8],
        _cert: &CertificateDer<'_>,
        _dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, RustlsError> { 935 Ok(HandshakeSignatureValid::assertion()) 936 } 937 938 fn supported_verify_schemes(&self) -> Vec<SignatureScheme> { 939 vec![ 940 SignatureScheme::ED25519, 941 SignatureScheme::ECDSA_NISTP256_SHA256, 942 SignatureScheme::ECDSA_NISTP384_SHA384, 943 SignatureScheme::RSA_PSS_SHA256, 944 SignatureScheme::RSA_PSS_SHA384, 945 SignatureScheme::RSA_PKCS1_SHA256, 946 SignatureScheme::RSA_PKCS1_SHA384, 947 ] 948 } 949 } 950 951 #[cfg(test)] 952 mod tests { 953 use super::{ 954 canonical_server_identity, decode_encrypted_transport_block, 955 decode_server_transport_public_key, encode_encrypted_transport_payload, 956 select_live_response, 957 }; 958 use crate::handshake::RadrootsSimplexSmpTransportServerProof; 959 use crate::prelude::{RadrootsSimplexSmpTransportBlock, RadrootsSimplexSmpTransportResponse}; 960 use radroots_simplex_smp_crypto::prelude::{ 961 RadrootsSimplexSmpX25519Keypair, encode_x25519_public_key_x509, init_secretbox_chain, 962 }; 963 use radroots_simplex_smp_proto::prelude::{ 964 RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, RadrootsSimplexSmpBrokerMessage, 965 RadrootsSimplexSmpBrokerTransmission, RadrootsSimplexSmpCommand, 966 RadrootsSimplexSmpCommandTransmission, RadrootsSimplexSmpCorrelationId, 967 RadrootsSimplexSmpReceivedMessage, RadrootsSimplexSmpServerAddress, 968 }; 969 use std::collections::VecDeque; 970 971 #[test] 972 fn canonicalizes_padded_and_unpadded_server_identity() { 973 assert_eq!(canonical_server_identity("YWJjZA").unwrap(), "YWJjZA"); 974 assert_eq!(canonical_server_identity("YWJjZA==").unwrap(), "YWJjZA"); 975 } 976 977 #[test] 978 fn extracts_spki_from_signed_server_key_sequence() { 979 let keypair = RadrootsSimplexSmpX25519Keypair::from_seed(b"transport-proof"); 980 let spki = encode_x25519_public_key_x509(&keypair.public_key).unwrap(); 981 let empty_sequence = der_sequence(core::iter::once(&[][..])); 982 let signature = [0x03, 0x01, 0x00]; 983 let signed_object = 
der_sequence([ 984 spki.as_slice(), 985 empty_sequence.as_slice(), 986 signature.as_slice(), 987 ]); 988 let proof = RadrootsSimplexSmpTransportServerProof { 989 certificate_payload: Vec::new(), 990 signed_server_key: signed_object, 991 }; 992 assert_eq!( 993 decode_server_transport_public_key(&proof).unwrap(), 994 keypair.public_key 995 ); 996 } 997 998 #[test] 999 fn encrypted_transport_blocks_use_upstream_client_chain_direction() { 1000 let session_identifier = b"rr-synth-session-id"; 1001 let shared_secret = b"rr-synth-shared-secret"; 1002 let (mut server_send_chain, mut server_receive_chain) = 1003 init_secretbox_chain(session_identifier, shared_secret).unwrap(); 1004 let (client_receive_chain, client_send_chain) = 1005 init_secretbox_chain(session_identifier, shared_secret).unwrap(); 1006 let mut client_receive_chain_for_response = client_receive_chain.clone(); 1007 let mut client_send_chain_for_request = client_send_chain.clone(); 1008 1009 let command_transmission = RadrootsSimplexSmpCommandTransmission { 1010 authorization: Vec::new(), 1011 correlation_id: Some(RadrootsSimplexSmpCorrelationId::new([3_u8; 24])), 1012 entity_id: b"rr-synth-queue".to_vec(), 1013 command: RadrootsSimplexSmpCommand::Ping, 1014 }; 1015 let command_block = RadrootsSimplexSmpTransportBlock::from_command_transmissions( 1016 &[command_transmission.clone()], 1017 RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, 1018 ) 1019 .unwrap(); 1020 let encrypted_command = encode_encrypted_transport_payload( 1021 &mut client_send_chain_for_request, 1022 &command_block.encode_payload().unwrap(), 1023 ) 1024 .unwrap(); 1025 assert_eq!( 1026 decode_encrypted_transport_block(&mut server_receive_chain, &encrypted_command) 1027 .unwrap() 1028 .decode_command_transmissions(RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION) 1029 .unwrap(), 1030 vec![command_transmission] 1031 ); 1032 1033 let broker_transmission = RadrootsSimplexSmpBrokerTransmission { 1034 authorization: Vec::new(), 1035 correlation_id: 
Some(RadrootsSimplexSmpCorrelationId::new([3_u8; 24])), 1036 entity_id: b"rr-synth-queue".to_vec(), 1037 message: RadrootsSimplexSmpBrokerMessage::Ok, 1038 }; 1039 let broker_block = RadrootsSimplexSmpTransportBlock::from_broker_transmissions( 1040 &[broker_transmission.clone()], 1041 RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, 1042 ) 1043 .unwrap(); 1044 let encrypted_broker = encode_encrypted_transport_payload( 1045 &mut server_send_chain, 1046 &broker_block.encode_payload().unwrap(), 1047 ) 1048 .unwrap(); 1049 assert_eq!( 1050 decode_encrypted_transport_block( 1051 &mut client_receive_chain_for_response, 1052 &encrypted_broker, 1053 ) 1054 .unwrap() 1055 .decode_broker_transmissions(RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION) 1056 .unwrap(), 1057 vec![broker_transmission] 1058 ); 1059 1060 let mut wrong_response_chain = client_send_chain; 1061 let wrong_direction_broker = encode_encrypted_transport_payload( 1062 &mut wrong_response_chain, 1063 &broker_block.encode_payload().unwrap(), 1064 ) 1065 .unwrap(); 1066 let mut fresh_client_receive_chain = client_receive_chain; 1067 assert!( 1068 decode_encrypted_transport_block( 1069 &mut fresh_client_receive_chain, 1070 &wrong_direction_broker 1071 ) 1072 .is_err() 1073 ); 1074 } 1075 1076 #[test] 1077 fn ack_uses_subscription_session_state() { 1078 assert_eq!( 1079 super::session_kind_for_command(&RadrootsSimplexSmpCommand::Ack(b"message".to_vec())), 1080 "subscription" 1081 ); 1082 assert!(super::accepts_uncorrelated_subscription_response( 1083 &RadrootsSimplexSmpCommand::Ack(b"message".to_vec()) 1084 )); 1085 assert!(super::accepts_uncorrelated_subscription_response( 1086 &RadrootsSimplexSmpCommand::Sub 1087 )); 1088 } 1089 1090 #[test] 1091 fn strict_command_selection_buffers_unmatched_response_and_errors() { 1092 let mut pending = VecDeque::new(); 1093 let expected = RadrootsSimplexSmpCorrelationId::new([1_u8; 24]); 1094 let unmatched = response( 1095 Some(RadrootsSimplexSmpCorrelationId::new([2_u8; 
24])), 1096 b"rr-synth-entity", 1097 RadrootsSimplexSmpBrokerMessage::Ok, 1098 ); 1099 1100 assert_eq!( 1101 select_live_response(&mut pending, vec![unmatched.clone()], Some(expected), None) 1102 .unwrap_err(), 1103 crate::prelude::RadrootsSimplexSmpTransportError::CorrelationIdMismatch 1104 ); 1105 assert_eq!(pending.into_iter().collect::<Vec<_>>(), vec![unmatched]); 1106 } 1107 1108 #[test] 1109 fn matched_response_wins_and_buffers_subscription_message() { 1110 let mut pending = VecDeque::new(); 1111 let expected = RadrootsSimplexSmpCorrelationId::new([1_u8; 24]); 1112 let message = response( 1113 None, 1114 b"rr-synth-entity", 1115 RadrootsSimplexSmpBrokerMessage::Msg(RadrootsSimplexSmpReceivedMessage { 1116 message_id: b"message-1".to_vec(), 1117 encrypted_body: b"body".to_vec(), 1118 }), 1119 ); 1120 let matched = response( 1121 Some(expected), 1122 b"rr-synth-entity", 1123 RadrootsSimplexSmpBrokerMessage::Sok(None), 1124 ); 1125 1126 let selected = select_live_response( 1127 &mut pending, 1128 vec![message.clone(), matched.clone()], 1129 Some(expected), 1130 Some(b"rr-synth-entity"), 1131 ) 1132 .unwrap(); 1133 1134 assert_eq!(selected, Some(matched)); 1135 assert_eq!(pending.into_iter().collect::<Vec<_>>(), vec![message]); 1136 } 1137 1138 #[test] 1139 fn subscription_selection_accepts_uncorrelated_message_for_entity() { 1140 let mut pending = VecDeque::new(); 1141 let expected = RadrootsSimplexSmpCorrelationId::new([1_u8; 24]); 1142 let message = response( 1143 None, 1144 b"rr-synth-entity", 1145 RadrootsSimplexSmpBrokerMessage::Msg(RadrootsSimplexSmpReceivedMessage { 1146 message_id: b"message-1".to_vec(), 1147 encrypted_body: b"body".to_vec(), 1148 }), 1149 ); 1150 let other = response( 1151 None, 1152 b"rr-other-entity", 1153 RadrootsSimplexSmpBrokerMessage::Msg(RadrootsSimplexSmpReceivedMessage { 1154 message_id: b"message-2".to_vec(), 1155 encrypted_body: b"other".to_vec(), 1156 }), 1157 ); 1158 1159 let selected = select_live_response( 1160 &mut 
pending, 1161 vec![other.clone(), message.clone()], 1162 Some(expected), 1163 Some(b"rr-synth-entity"), 1164 ) 1165 .unwrap(); 1166 1167 assert_eq!(selected, Some(message)); 1168 assert_eq!(pending.into_iter().collect::<Vec<_>>(), vec![other]); 1169 } 1170 1171 fn der_sequence<'a, I>(elements: I) -> Vec<u8> 1172 where 1173 I: IntoIterator<Item = &'a [u8]>, 1174 { 1175 let mut body = Vec::new(); 1176 for element in elements { 1177 if element.is_empty() { 1178 body.extend_from_slice(&[0x30, 0x00]); 1179 } else { 1180 body.extend_from_slice(element); 1181 } 1182 } 1183 let mut sequence = vec![0x30]; 1184 push_der_length(&mut sequence, body.len()); 1185 sequence.extend_from_slice(&body); 1186 sequence 1187 } 1188 1189 fn push_der_length(buffer: &mut Vec<u8>, len: usize) { 1190 if len < 0x80 { 1191 buffer.push(len as u8); 1192 return; 1193 } 1194 let mut bytes = Vec::new(); 1195 let mut remaining = len; 1196 while remaining > 0 { 1197 bytes.push((remaining & 0xff) as u8); 1198 remaining >>= 8; 1199 } 1200 bytes.reverse(); 1201 buffer.push(0x80 | (bytes.len() as u8)); 1202 buffer.extend_from_slice(&bytes); 1203 } 1204 1205 fn response( 1206 correlation_id: Option<RadrootsSimplexSmpCorrelationId>, 1207 entity_id: &[u8], 1208 message: RadrootsSimplexSmpBrokerMessage, 1209 ) -> RadrootsSimplexSmpTransportResponse { 1210 RadrootsSimplexSmpTransportResponse { 1211 server: RadrootsSimplexSmpServerAddress { 1212 server_identity: "cnItc3ludGgtc2VydmVy".to_owned(), 1213 hosts: vec!["127.0.0.1".to_owned()], 1214 port: Some(5223), 1215 }, 1216 transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, 1217 transmission: RadrootsSimplexSmpBrokerTransmission { 1218 authorization: Vec::new(), 1219 correlation_id, 1220 entity_id: entity_id.to_vec(), 1221 message, 1222 }, 1223 transport_hash: vec![9_u8; 32], 1224 } 1225 } 1226 }