lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

client.rs (48380B)


      1 #![cfg(feature = "std")]
      2 
      3 use crate::error::RadrootsSimplexSmpTransportError;
      4 use crate::executor::{
      5     RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpSubscriptionReceiveRequest,
      6     RadrootsSimplexSmpSubscriptionTransport, RadrootsSimplexSmpTransportRequest,
      7     RadrootsSimplexSmpTransportResponse,
      8 };
      9 use crate::frame::{RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE, RadrootsSimplexSmpTransportBlock};
     10 use crate::handshake::{
     11     RADROOTS_SIMPLEX_SMP_TLS_ALPN_V1, RadrootsSimplexSmpClientHello, RadrootsSimplexSmpServerHello,
     12     RadrootsSimplexSmpTlsHandshakeEvidence, RadrootsSimplexSmpTlsPolicy,
     13     RadrootsSimplexSmpTransportServerProof, validate_tls_handshake,
     14 };
     15 use base64::Engine as _;
     16 use base64::engine::general_purpose::{URL_SAFE, URL_SAFE_NO_PAD};
     17 use radroots_simplex_smp_crypto::prelude::{
     18     RadrootsSimplexSmpQueueAuthorizationMaterial, RadrootsSimplexSmpQueueAuthorizationScope,
     19     RadrootsSimplexSmpSecretBoxChainKey, RadrootsSimplexSmpX25519Keypair, advance_secretbox_chain,
     20     decode_x25519_public_key_x509, derive_shared_secret, encode_x25519_public_key_x509,
     21     encrypt_padded, init_secretbox_chain, verify_signature,
     22 };
     23 use radroots_simplex_smp_proto::prelude::{
     24     RADROOTS_SIMPLEX_SMP_AUTH_COMMANDS_TRANSPORT_VERSION,
     25     RADROOTS_SIMPLEX_SMP_ENCRYPTED_BLOCK_TRANSPORT_VERSION, RadrootsSimplexSmpBrokerMessage,
     26     RadrootsSimplexSmpCommandTransmission, RadrootsSimplexSmpCorrelationId,
     27     RadrootsSimplexSmpServerAddress,
     28 };
     29 use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
     30 use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
     31 use rustls::{
     32     ClientConfig, ClientConnection, DigitallySignedStruct, Error as RustlsError, SignatureScheme,
     33     StreamOwned,
     34 };
     35 use sha2::{Digest, Sha256};
     36 use std::collections::{BTreeMap, VecDeque};
     37 use std::io::{ErrorKind, Read, Write};
     38 use std::net::{IpAddr, TcpStream, ToSocketAddrs};
     39 use std::sync::Arc;
     40 use std::time::Duration;
     41 use x509_parser::prelude::FromDer;
     42 
     43 #[derive(Default)]
     44 pub struct RadrootsSimplexSmpTlsCommandTransport {
     45     sessions: BTreeMap<String, RadrootsSimplexSmpLiveSession>,
     46 }
     47 
     48 const LIVE_SESSION_TIMEOUT: Duration = Duration::from_secs(5);
     49 const LIVE_EMPTY_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_millis(150);
     50 
     51 struct RadrootsSimplexSmpLiveSession {
     52     stream: StreamOwned<ClientConnection, TcpStream>,
     53     transport_version: u16,
     54     session_identifier: Vec<u8>,
     55     send_chain_key: Option<RadrootsSimplexSmpSecretBoxChainKey>,
     56     receive_chain_key: Option<RadrootsSimplexSmpSecretBoxChainKey>,
     57     debug_shared_secret: Option<Vec<u8>>,
     58     pending_broker_responses: VecDeque<RadrootsSimplexSmpTransportResponse>,
     59 }
     60 
     61 impl RadrootsSimplexSmpTlsCommandTransport {
     62     pub fn new() -> Self {
     63         Self::default()
     64     }
     65 
     66     fn session_key(server: &RadrootsSimplexSmpServerAddress, kind: &str) -> String {
     67         let mut key = server.server_identity.clone();
     68         key.push('@');
     69         key.push_str(&server.hosts.join(","));
     70         key.push(':');
     71         key.push_str(&server.port.unwrap_or(5223).to_string());
     72         key.push('#');
     73         key.push_str(kind);
     74         key
     75     }
     76 
     77     fn session_for(
     78         &mut self,
     79         server: &RadrootsSimplexSmpServerAddress,
     80         kind: &str,
     81     ) -> Result<&mut RadrootsSimplexSmpLiveSession, RadrootsSimplexSmpTransportError> {
     82         let key = Self::session_key(server, kind);
     83         if !self.sessions.contains_key(&key) {
     84             let session = connect_live_session(server)?;
     85             self.sessions.insert(key.clone(), session);
     86         }
     87         self.sessions.get_mut(&key).ok_or_else(|| {
     88             RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
     89                 "missing live SMP session for `{}`",
     90                 server.server_identity
     91             ))
     92         })
     93     }
     94 }
     95 
     96 impl RadrootsSimplexSmpCommandTransport for RadrootsSimplexSmpTlsCommandTransport {
     97     type Error = RadrootsSimplexSmpTransportError;
     98 
     99     fn execute(
    100         &mut self,
    101         request: RadrootsSimplexSmpTransportRequest,
    102     ) -> Result<RadrootsSimplexSmpTransportResponse, Self::Error> {
    103         let session_kind = session_kind_for_command(&request.command);
    104         let key = Self::session_key(&request.server, session_kind);
    105         let accepts_uncorrelated_subscription_response =
    106             accepts_uncorrelated_subscription_response(&request.command);
    107         match execute_live_request(
    108             self.session_for(&request.server, session_kind)?,
    109             &request,
    110             accepts_uncorrelated_subscription_response,
    111         ) {
    112             Ok(response) => Ok(response),
    113             Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error)) => {
    114                 self.sessions.remove(&key);
    115                 let response = execute_live_request(
    116                     self.session_for(&request.server, session_kind)?,
    117                     &request,
    118                     accepts_uncorrelated_subscription_response,
    119                 );
    120                 match response {
    121                     Ok(response) => Ok(response),
    122                     Err(RadrootsSimplexSmpTransportError::LiveTransportIo(_)) => {
    123                         Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error))
    124                     }
    125                     Err(error) => Err(error),
    126                 }
    127             }
    128             Err(error) => Err(error),
    129         }
    130     }
    131 }
    132 
    133 impl RadrootsSimplexSmpSubscriptionTransport for RadrootsSimplexSmpTlsCommandTransport {
    134     fn receive_subscription(
    135         &mut self,
    136         request: RadrootsSimplexSmpSubscriptionReceiveRequest,
    137     ) -> Result<Option<RadrootsSimplexSmpTransportResponse>, Self::Error> {
    138         let key = Self::session_key(&request.server, "subscription");
    139         match read_live_response(
    140             self.session_for(&request.server, "subscription")?,
    141             &request.server,
    142             None,
    143             true,
    144             None,
    145         ) {
    146             Ok(response) => Ok(response),
    147             Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error)) => {
    148                 self.sessions.remove(&key);
    149                 Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error))
    150             }
    151             Err(error) => Err(error),
    152         }
    153     }
    154 }
    155 
    156 fn session_kind_for_command(
    157     command: &radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand,
    158 ) -> &'static str {
    159     match command {
    160         radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Sub
    161         | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Subs
    162         | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSub
    163         | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSubs
    164         | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Ack(_) => "subscription",
    165         radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Get
    166         | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::LGet => "poll",
    167         _ => "command",
    168     }
    169 }
    170 
    171 fn accepts_uncorrelated_subscription_response(
    172     command: &radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand,
    173 ) -> bool {
    174     matches!(
    175         command,
    176         radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Sub
    177             | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Subs
    178             | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSub
    179             | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSubs
    180             | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Ack(_)
    181     )
    182 }
    183 
    184 fn execute_live_request(
    185     session: &mut RadrootsSimplexSmpLiveSession,
    186     request: &RadrootsSimplexSmpTransportRequest,
    187     accept_uncorrelated_subscription_response: bool,
    188 ) -> Result<RadrootsSimplexSmpTransportResponse, RadrootsSimplexSmpTransportError> {
    189     let correlation_id = request
    190         .correlation_id
    191         .ok_or(RadrootsSimplexSmpTransportError::MissingCorrelationId)?;
    192     let scope = RadrootsSimplexSmpQueueAuthorizationScope::new(
    193         session.session_identifier.clone(),
    194         correlation_id,
    195         request.entity_id.clone(),
    196     )?;
    197     let material = RadrootsSimplexSmpQueueAuthorizationMaterial::for_command(
    198         &scope,
    199         &request.command,
    200         session.transport_version,
    201         &request.authorization,
    202     )?;
    203     let transmission = RadrootsSimplexSmpCommandTransmission {
    204         authorization: material.authorization,
    205         correlation_id: Some(correlation_id),
    206         entity_id: request.entity_id.clone(),
    207         command: request.command.clone(),
    208     };
    209     let block = RadrootsSimplexSmpTransportBlock::from_command_transmissions(
    210         &[transmission],
    211         session.transport_version,
    212     )?;
    213     let encoded = encode_live_transport_block(session, &block)?;
    214     session
    215         .stream
    216         .write_all(&encoded)
    217         .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    218     session
    219         .stream
    220         .flush()
    221         .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    222 
    223     let accepted_entity_id =
    224         accept_uncorrelated_subscription_response.then_some(request.entity_id.as_slice());
    225     read_live_response(
    226         session,
    227         &request.server,
    228         Some(correlation_id),
    229         false,
    230         accepted_entity_id,
    231     )?
    232     .ok_or_else(|| {
    233         RadrootsSimplexSmpTransportError::LiveTransportIo(
    234             "SMP command response was not available before the read timeout".into(),
    235         )
    236     })
    237 }
    238 
    239 fn read_live_response(
    240     session: &mut RadrootsSimplexSmpLiveSession,
    241     server: &RadrootsSimplexSmpServerAddress,
    242     expected_correlation_id: Option<RadrootsSimplexSmpCorrelationId>,
    243     timeout_is_empty: bool,
    244     accepted_subscription_entity_id: Option<&[u8]>,
    245 ) -> Result<Option<RadrootsSimplexSmpTransportResponse>, RadrootsSimplexSmpTransportError> {
    246     if expected_correlation_id.is_none()
    247         && let Some(response) = session.pending_broker_responses.pop_front()
    248     {
    249         return Ok(Some(response));
    250     }
    251     if let Some(entity_id) = accepted_subscription_entity_id
    252         && let Some(position) = session
    253             .pending_broker_responses
    254             .iter()
    255             .position(|response| is_subscription_response_for_entity(response, entity_id))
    256     {
    257         return Ok(session.pending_broker_responses.remove(position));
    258     }
    259     let mut response_block = vec![0_u8; RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE];
    260     if timeout_is_empty {
    261         set_live_read_timeout(session, LIVE_EMPTY_SUBSCRIPTION_TIMEOUT)?;
    262     }
    263     let read_result = session.stream.read_exact(&mut response_block);
    264     if timeout_is_empty {
    265         set_live_read_timeout(session, LIVE_SESSION_TIMEOUT)?;
    266     }
    267     if let Err(error) = read_result {
    268         if timeout_is_empty && matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) {
    269             return Ok(None);
    270         }
    271         return Err(RadrootsSimplexSmpTransportError::LiveTransportIo(
    272             error.to_string(),
    273         ));
    274     }
    275     let response_hash = Sha256::digest(&response_block).to_vec();
    276     let decoded = decode_live_transport_block(session, &response_block)?;
    277     let transmissions = decoded.decode_broker_transmissions(session.transport_version)?;
    278     let responses = transmissions
    279         .into_iter()
    280         .map(|transmission| RadrootsSimplexSmpTransportResponse {
    281             server: server.clone(),
    282             transport_version: session.transport_version,
    283             transmission,
    284             transport_hash: response_hash.clone(),
    285         })
    286         .collect::<Vec<_>>();
    287     select_live_response(
    288         &mut session.pending_broker_responses,
    289         responses,
    290         expected_correlation_id,
    291         accepted_subscription_entity_id,
    292     )
    293 }
    294 
    295 fn select_live_response(
    296     pending_broker_responses: &mut VecDeque<RadrootsSimplexSmpTransportResponse>,
    297     mut responses: Vec<RadrootsSimplexSmpTransportResponse>,
    298     expected_correlation_id: Option<RadrootsSimplexSmpCorrelationId>,
    299     accepted_subscription_entity_id: Option<&[u8]>,
    300 ) -> Result<Option<RadrootsSimplexSmpTransportResponse>, RadrootsSimplexSmpTransportError> {
    301     if let Some(expected_correlation_id) = expected_correlation_id {
    302         if let Some(position) = responses.iter().position(|response| {
    303             response.transmission.correlation_id == Some(expected_correlation_id)
    304         }) {
    305             let matched_response = responses.remove(position);
    306             pending_broker_responses.extend(responses);
    307             return Ok(Some(matched_response));
    308         }
    309         if let Some(entity_id) = accepted_subscription_entity_id
    310             && let Some(position) = responses
    311                 .iter()
    312                 .position(|response| is_subscription_response_for_entity(response, entity_id))
    313         {
    314             let matched_response = responses.remove(position);
    315             pending_broker_responses.extend(responses);
    316             return Ok(Some(matched_response));
    317         }
    318         pending_broker_responses.extend(responses);
    319         return Err(RadrootsSimplexSmpTransportError::CorrelationIdMismatch);
    320     }
    321     pending_broker_responses.extend(responses);
    322     Ok(pending_broker_responses.pop_front())
    323 }
    324 
    325 fn is_subscription_response_for_entity(
    326     response: &RadrootsSimplexSmpTransportResponse,
    327     entity_id: &[u8],
    328 ) -> bool {
    329     response.transmission.entity_id == entity_id
    330         && matches!(
    331             response.transmission.message,
    332             RadrootsSimplexSmpBrokerMessage::Msg(_)
    333                 | RadrootsSimplexSmpBrokerMessage::NMsg { .. }
    334                 | RadrootsSimplexSmpBrokerMessage::Sok(_)
    335                 | RadrootsSimplexSmpBrokerMessage::Soks(_)
    336                 | RadrootsSimplexSmpBrokerMessage::Ok
    337                 | RadrootsSimplexSmpBrokerMessage::Err(_)
    338         )
    339 }
    340 
    341 fn set_live_read_timeout(
    342     session: &mut RadrootsSimplexSmpLiveSession,
    343     timeout: Duration,
    344 ) -> Result<(), RadrootsSimplexSmpTransportError> {
    345     session
    346         .stream
    347         .sock
    348         .set_read_timeout(Some(timeout))
    349         .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))
    350 }
    351 
    352 fn transport_debug_enabled() -> bool {
    353     std::env::var_os("RADROOTS_SIMPLEX_DEBUG_TRANSPORT").is_some()
    354 }
    355 
    356 fn debug_sha256_label(label: &str, value: &[u8]) {
    357     if transport_debug_enabled() {
    358         eprintln!(
    359             "[simplex-smp-transport] {label}: len={} sha256={}",
    360             value.len(),
    361             URL_SAFE_NO_PAD.encode(Sha256::digest(value)),
    362         );
    363     }
    364 }
    365 
    366 fn encode_live_transport_block(
    367     session: &mut RadrootsSimplexSmpLiveSession,
    368     block: &RadrootsSimplexSmpTransportBlock,
    369 ) -> Result<Vec<u8>, RadrootsSimplexSmpTransportError> {
    370     if session.transport_version >= RADROOTS_SIMPLEX_SMP_ENCRYPTED_BLOCK_TRANSPORT_VERSION
    371         && let Some(chain_key) = session.send_chain_key.as_mut()
    372     {
    373         return encode_encrypted_transport_payload(chain_key, &block.encode_payload()?);
    374     }
    375     block.encode()
    376 }
    377 
    378 fn encode_encrypted_transport_payload(
    379     chain_key: &mut RadrootsSimplexSmpSecretBoxChainKey,
    380     payload: &[u8],
    381 ) -> Result<Vec<u8>, RadrootsSimplexSmpTransportError> {
    382     let ((secretbox_key, nonce), next_chain_key) = advance_secretbox_chain(chain_key)?;
    383     *chain_key = next_chain_key;
    384     encrypt_padded(
    385         &secretbox_key,
    386         &nonce,
    387         payload,
    388         RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE - 16,
    389     )
    390     .map_err(Into::into)
    391 }
    392 
    393 fn decode_live_transport_block(
    394     session: &mut RadrootsSimplexSmpLiveSession,
    395     bytes: &[u8],
    396 ) -> Result<RadrootsSimplexSmpTransportBlock, RadrootsSimplexSmpTransportError> {
    397     if session.transport_version >= RADROOTS_SIMPLEX_SMP_ENCRYPTED_BLOCK_TRANSPORT_VERSION
    398         && let Some(chain_key) = session.receive_chain_key.as_mut()
    399     {
    400         match decode_encrypted_transport_block(chain_key, bytes) {
    401             Ok(block) => {
    402                 let payload = block.encode_payload()?;
    403                 debug_sha256_label("live-response-payload", &payload);
    404                 return Ok(block);
    405             }
    406             Err(error) => {
    407                 if transport_debug_enabled() {
    408                     eprintln!("[simplex-smp-transport] live response decrypt failed: {error}");
    409                     debug_sha256_label("live-response-ciphertext", bytes);
    410                 }
    411                 if let Some(send_chain_key) = session.send_chain_key.as_ref() {
    412                     let mut alternate_chain_key = send_chain_key.clone();
    413                     if decode_encrypted_transport_block(&mut alternate_chain_key, bytes).is_ok() {
    414                         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    415                             "server response decrypted with the outbound chain key; live SMP block direction is assigned incorrectly".into(),
    416                         ));
    417                     }
    418                 }
    419                 debug_probe_transport_candidates(session, bytes);
    420                 if let Ok(block) = RadrootsSimplexSmpTransportBlock::decode(bytes) {
    421                     return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    422                         format!(
    423                             "server returned plaintext SMP block while encrypted transport was expected: {:?}",
    424                             block.transmissions.first().map(|t| &t[..t.len().min(8)])
    425                         ),
    426                     ));
    427                 }
    428                 return Err(error.into());
    429             }
    430         }
    431     }
    432     RadrootsSimplexSmpTransportBlock::decode(bytes)
    433 }
    434 
    435 fn decode_encrypted_transport_block(
    436     chain_key: &mut RadrootsSimplexSmpSecretBoxChainKey,
    437     bytes: &[u8],
    438 ) -> Result<RadrootsSimplexSmpTransportBlock, RadrootsSimplexSmpTransportError> {
    439     let ((secretbox_key, nonce), next_chain_key) = advance_secretbox_chain(chain_key)?;
    440     let payload =
    441         radroots_simplex_smp_crypto::prelude::decrypt_padded(&secretbox_key, &nonce, bytes)?;
    442     let block = RadrootsSimplexSmpTransportBlock::from_payload(&payload)?;
    443     *chain_key = next_chain_key;
    444     Ok(block)
    445 }
    446 
    447 fn debug_probe_transport_candidates(session: &mut RadrootsSimplexSmpLiveSession, bytes: &[u8]) {
    448     if !transport_debug_enabled() {
    449         return;
    450     }
    451     let Some(shared_secret) = session.debug_shared_secret.as_ref() else {
    452         return;
    453     };
    454     let Ok((first_chain_key, second_chain_key)) =
    455         init_secretbox_chain(&session.session_identifier, shared_secret)
    456     else {
    457         return;
    458     };
    459     for (label, chain_key) in [
    460         ("initial-first", first_chain_key),
    461         ("initial-second", second_chain_key),
    462     ] {
    463         let Ok(((secretbox_key, nonce), _)) = advance_secretbox_chain(&chain_key) else {
    464             continue;
    465         };
    466         let result =
    467             radroots_simplex_smp_crypto::prelude::decrypt_padded(&secretbox_key, &nonce, bytes);
    468         match result {
    469             Ok(payload) => {
    470                 eprintln!("[simplex-smp-transport] debug candidate {label} decrypted live block");
    471                 debug_sha256_label("debug-candidate-payload", &payload);
    472             }
    473             Err(error) => {
    474                 eprintln!("[simplex-smp-transport] debug candidate {label} failed: {error}");
    475             }
    476         }
    477     }
    478 }
    479 
    480 fn connect_live_session(
    481     server: &RadrootsSimplexSmpServerAddress,
    482 ) -> Result<RadrootsSimplexSmpLiveSession, RadrootsSimplexSmpTransportError> {
    483     let mut last_error = None;
    484     for host in &server.hosts {
    485         match connect_live_session_host(server, host) {
    486             Ok(session) => return Ok(session),
    487             Err(error) => last_error = Some(error),
    488         }
    489     }
    490 
    491     Err(last_error.unwrap_or_else(|| {
    492         RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
    493             "SMP server `{}` has no usable hosts",
    494             server.server_identity
    495         ))
    496     }))
    497 }
    498 
    499 fn connect_live_session_host(
    500     server: &RadrootsSimplexSmpServerAddress,
    501     host: &str,
    502 ) -> Result<RadrootsSimplexSmpLiveSession, RadrootsSimplexSmpTransportError> {
    503     let port = server.port.unwrap_or(5223);
    504     let mut addresses = (host, port).to_socket_addrs().map_err(|error| {
    505         RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
    506             "failed to resolve SMP server host `{host}:{port}`: {error}"
    507         ))
    508     })?;
    509     let socket_addr = addresses.next().ok_or_else(|| {
    510         RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
    511             "failed to resolve SMP server host `{host}:{port}`"
    512         ))
    513     })?;
    514     let tcp = TcpStream::connect_timeout(&socket_addr, LIVE_SESSION_TIMEOUT).map_err(|error| {
    515         RadrootsSimplexSmpTransportError::LiveTransportIo(format!(
    516             "failed to connect to SMP server `{host}:{port}`: {error}"
    517         ))
    518     })?;
    519     tcp.set_nodelay(true)
    520         .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    521     tcp.set_read_timeout(Some(LIVE_SESSION_TIMEOUT))
    522         .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    523     tcp.set_write_timeout(Some(LIVE_SESSION_TIMEOUT))
    524         .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    525 
    526     let server_name = match host.parse::<IpAddr>() {
    527         Ok(address) => ServerName::IpAddress(address.into()),
    528         Err(_) => ServerName::try_from(host.to_owned()).map_err(|_| {
    529             RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
    530                 "invalid SMP server name `{host}`"
    531             ))
    532         })?,
    533     };
    534     let verifier = Arc::new(PermissiveSimplexServerVerifier);
    535     let mut config = ClientConfig::builder()
    536         .dangerous()
    537         .with_custom_certificate_verifier(verifier)
    538         .with_no_client_auth();
    539     config.alpn_protocols = vec![RADROOTS_SIMPLEX_SMP_TLS_ALPN_V1.as_bytes().to_vec()];
    540 
    541     let mut stream = StreamOwned::new(
    542         ClientConnection::new(Arc::new(config), server_name).map_err(|error| {
    543             RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string())
    544         })?,
    545         tcp,
    546     );
    547     while stream.conn.is_handshaking() {
    548         stream.conn.complete_io(&mut stream.sock).map_err(|error| {
    549             RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string())
    550         })?;
    551     }
    552 
    553     let peer_certs = stream
    554         .conn
    555         .peer_certificates()
    556         .ok_or(RadrootsSimplexSmpTransportError::MissingPeerCertificates)?
    557         .to_vec();
    558     let server_hello = read_server_hello(&mut stream)?;
    559     let actual_identity = matching_server_identity(&peer_certs, &server.server_identity)?;
    560     let expected_identity = canonical_server_identity(&server.server_identity)?;
    561     let mut policy = RadrootsSimplexSmpTlsPolicy::modern(expected_identity.clone());
    562     policy.require_tls_unique_binding = false;
    563     let transport_version = validate_tls_handshake(
    564         &policy,
    565         &server_hello,
    566         &RadrootsSimplexSmpTlsHandshakeEvidence {
    567             confirmed_alpn: stream
    568                 .conn
    569                 .alpn_protocol()
    570                 .map(|value| String::from_utf8_lossy(value).into_owned()),
    571             session_resumed: false,
    572             certificate_chain_length: peer_certs.len(),
    573             online_certificate_fingerprint: actual_identity,
    574             tls_unique_channel_binding: None,
    575         },
    576     )?;
    577     let transport_keypair =
    578         if transport_version >= RADROOTS_SIMPLEX_SMP_AUTH_COMMANDS_TRANSPORT_VERSION {
    579             Some(RadrootsSimplexSmpX25519Keypair::generate()?)
    580         } else {
    581             None
    582         };
    583     let client_hello = RadrootsSimplexSmpClientHello {
    584         chosen_version: transport_version,
    585         server_key_hash: decode_server_identity(&expected_identity)?,
    586         client_key: transport_keypair
    587             .as_ref()
    588             .map(|keypair| encode_x25519_public_key_x509(&keypair.public_key))
    589             .transpose()?,
    590         proxy_server: false,
    591         ignored_part: Vec::new(),
    592     };
    593     let encoded_client_hello = client_hello.encode()?;
    594     if transport_debug_enabled() {
    595         debug_sha256_label("client-hello", &encoded_client_hello);
    596         debug_sha256_label("server-session-id", &server_hello.session_identifier);
    597     }
    598     stream
    599         .write_all(&encoded_client_hello)
    600         .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    601     stream
    602         .flush()
    603         .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    604 
    605     let mut debug_shared_secret = None;
    606     let (receive_chain_key, send_chain_key) =
    607         if transport_version >= RADROOTS_SIMPLEX_SMP_ENCRYPTED_BLOCK_TRANSPORT_VERSION {
    608             let server_key = decode_server_transport_public_key(
    609                 server_hello
    610                     .server_proof
    611                     .as_ref()
    612                     .ok_or(RadrootsSimplexSmpTransportError::MissingServerProof)?,
    613             )?;
    614             let shared_secret = derive_shared_secret(
    615                 &transport_keypair
    616                     .as_ref()
    617                     .ok_or(RadrootsSimplexSmpTransportError::MissingServerProof)?
    618                     .private_key,
    619                 &server_key,
    620             )?;
    621             if transport_debug_enabled() {
    622                 if let Some(keypair) = transport_keypair.as_ref() {
    623                     debug_sha256_label("client-transport-public-key", &keypair.public_key);
    624                 }
    625                 debug_sha256_label("server-transport-public-key", &server_key);
    626             }
    627             debug_shared_secret = transport_debug_enabled().then_some(shared_secret.clone());
    628             let (receive_chain_key, send_chain_key) =
    629                 init_secretbox_chain(&server_hello.session_identifier, &shared_secret)?;
    630             (Some(receive_chain_key), Some(send_chain_key))
    631         } else {
    632             (None, None)
    633         };
    634 
    635     Ok(RadrootsSimplexSmpLiveSession {
    636         stream,
    637         transport_version,
    638         session_identifier: server_hello.session_identifier,
    639         send_chain_key,
    640         receive_chain_key,
    641         debug_shared_secret,
    642         pending_broker_responses: VecDeque::new(),
    643     })
    644 }
    645 
    646 fn decode_server_transport_public_key(
    647     proof: &RadrootsSimplexSmpTransportServerProof,
    648 ) -> Result<Vec<u8>, RadrootsSimplexSmpTransportError> {
    649     let (signed_object, signature) = decode_signed_server_key_parts(&proof.signed_server_key)?;
    650     if transport_debug_enabled() {
    651         eprintln!(
    652             "[simplex-smp-transport] signed-server-key: proof_len={} signed_object_len={} signature_len={}",
    653             proof.signed_server_key.len(),
    654             signed_object.len(),
    655             signature.len()
    656         );
    657     }
    658     if !proof.certificate_payload.is_empty() {
    659         let verify_key = decode_server_certificate_verify_key(&proof.certificate_payload)?;
    660         verify_signature(signed_object, &verify_key, signature).map_err(|error| {
    661             RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
    662                 "failed to verify SMP server transport key signature: {error}"
    663             ))
    664         })?;
    665     }
    666 
    667     decode_x25519_public_key_x509(signed_object)
    668         .or_else(|_| {
    669             first_der_sequence_element(signed_object)
    670                 .and_then(|candidate| decode_x25519_public_key_x509(candidate).map_err(Into::into))
    671         })
    672         .map_err(|error: RadrootsSimplexSmpTransportError| {
    673             RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
    674                 "failed to decode verified SMP server transport key: {error}"
    675             ))
    676         })
    677 }
    678 
    679 fn first_der_sequence_element(bytes: &[u8]) -> Result<&[u8], RadrootsSimplexSmpTransportError> {
    680     let (sequence_tag, _, sequence_header_end, sequence_content_end) = parse_der_element(bytes, 0)?;
    681     if sequence_tag != 0x30 {
    682         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    683             "invalid SMP server proof: expected DER sequence".into(),
    684         ));
    685     }
    686     let (_, element_start, _, element_end) = parse_der_element(bytes, sequence_header_end)?;
    687     if element_end > sequence_content_end {
    688         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    689             "invalid SMP server proof: first element exceeds sequence bounds".into(),
    690         ));
    691     }
    692     Ok(&bytes[element_start..element_end])
    693 }
    694 
    695 fn decode_signed_server_key_parts(
    696     bytes: &[u8],
    697 ) -> Result<(&[u8], &[u8]), RadrootsSimplexSmpTransportError> {
    698     let (sequence_tag, _, sequence_header_end, sequence_content_end) = parse_der_element(bytes, 0)?;
    699     if sequence_tag != 0x30 {
    700         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    701             "invalid SMP server proof: signed key is not a DER sequence".into(),
    702         ));
    703     }
    704 
    705     let (_, signed_object_start, _, signed_object_end) =
    706         parse_der_element(bytes, sequence_header_end)?;
    707     let (_, _, _, algorithm_end) = parse_der_element(bytes, signed_object_end)?;
    708     if algorithm_end > sequence_content_end {
    709         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    710             "invalid SMP server proof: signature algorithm exceeds sequence bounds".into(),
    711         ));
    712     }
    713     let (signature_tag, _, signature_value_start, signature_end) =
    714         parse_der_element(bytes, algorithm_end)?;
    715     if signature_tag != 0x03 {
    716         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    717             "invalid SMP server proof: expected DER bit string signature".into(),
    718         ));
    719     }
    720     if signature_end > sequence_content_end {
    721         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    722             "invalid SMP server proof: signature exceeds sequence bounds".into(),
    723         ));
    724     }
    725     let signature_value = bytes
    726         .get(signature_value_start..signature_end)
    727         .ok_or_else(|| {
    728             RadrootsSimplexSmpTransportError::InvalidServerAddress(
    729                 "invalid SMP server proof: truncated signature".into(),
    730             )
    731         })?;
    732     let (unused_bits, signature) = signature_value.split_first().ok_or_else(|| {
    733         RadrootsSimplexSmpTransportError::InvalidServerAddress(
    734             "invalid SMP server proof: missing signature payload".into(),
    735         )
    736     })?;
    737     if *unused_bits != 0 {
    738         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    739             "invalid SMP server proof: unsupported signature bit padding".into(),
    740         ));
    741     }
    742     Ok((&bytes[signed_object_start..signed_object_end], signature))
    743 }
    744 
    745 fn decode_server_certificate_verify_key(
    746     certificate_payload: &[u8],
    747 ) -> Result<Vec<u8>, RadrootsSimplexSmpTransportError> {
    748     let Some(&cert_count) = certificate_payload.first() else {
    749         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    750             "invalid SMP server proof: missing certificate chain".into(),
    751         ));
    752     };
    753     if cert_count == 0 {
    754         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    755             "invalid SMP server proof: empty certificate chain".into(),
    756         ));
    757     }
    758     let (certificate_der, _) = read_large_handshake_field(certificate_payload, 1)?;
    759     let (_, certificate) = x509_parser::certificate::X509Certificate::from_der(&certificate_der)
    760         .map_err(|error| {
    761             RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
    762                 "failed to parse SMP proof certificate: {error}"
    763             ))
    764         })?;
    765     Ok(certificate
    766         .tbs_certificate
    767         .subject_pki
    768         .subject_public_key
    769         .data
    770         .to_vec())
    771 }
    772 
    773 fn read_large_handshake_field(
    774     bytes: &[u8],
    775     offset: usize,
    776 ) -> Result<(Vec<u8>, usize), RadrootsSimplexSmpTransportError> {
    777     let Some(length_bytes) = bytes.get(offset..offset + 2) else {
    778         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    779             "invalid SMP server proof: truncated certificate length".into(),
    780         ));
    781     };
    782     let length = u16::from_be_bytes([length_bytes[0], length_bytes[1]]) as usize;
    783     let start = offset + 2;
    784     let end = start + length;
    785     let value = bytes.get(start..end).ok_or_else(|| {
    786         RadrootsSimplexSmpTransportError::InvalidServerAddress(
    787             "invalid SMP server proof: certificate exceeds payload".into(),
    788         )
    789     })?;
    790     Ok((value.to_vec(), end))
    791 }
    792 
    793 fn parse_der_element(
    794     bytes: &[u8],
    795     offset: usize,
    796 ) -> Result<(u8, usize, usize, usize), RadrootsSimplexSmpTransportError> {
    797     let tag = *bytes.get(offset).ok_or_else(|| {
    798         RadrootsSimplexSmpTransportError::InvalidServerAddress(
    799             "invalid SMP server proof: truncated DER element".into(),
    800         )
    801     })?;
    802     let length_offset = offset + 1;
    803     let length_tag = *bytes.get(length_offset).ok_or_else(|| {
    804         RadrootsSimplexSmpTransportError::InvalidServerAddress(
    805             "invalid SMP server proof: missing DER length".into(),
    806         )
    807     })?;
    808     let (value_len, header_len) = if length_tag & 0x80 == 0 {
    809         (length_tag as usize, 2)
    810     } else {
    811         let length_bytes = (length_tag & 0x7f) as usize;
    812         if length_bytes == 0 || length_bytes > 4 {
    813             return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    814                 "invalid SMP server proof: unsupported DER length encoding".into(),
    815             ));
    816         }
    817         let length_start = length_offset + 1;
    818         let length_end = length_start + length_bytes;
    819         let encoded_length = bytes.get(length_start..length_end).ok_or_else(|| {
    820             RadrootsSimplexSmpTransportError::InvalidServerAddress(
    821                 "invalid SMP server proof: truncated DER length".into(),
    822             )
    823         })?;
    824         let value_len = encoded_length
    825             .iter()
    826             .fold(0_usize, |acc, byte| (acc << 8) | (*byte as usize));
    827         (value_len, 2 + length_bytes)
    828     };
    829     let value_start = offset + header_len;
    830     let value_end = value_start + value_len;
    831     if value_end > bytes.len() {
    832         return Err(RadrootsSimplexSmpTransportError::InvalidServerAddress(
    833             "invalid SMP server proof: DER element exceeds input".into(),
    834         ));
    835     }
    836     Ok((tag, offset, value_start, value_end))
    837 }
    838 
    839 fn read_server_hello(
    840     stream: &mut StreamOwned<ClientConnection, TcpStream>,
    841 ) -> Result<RadrootsSimplexSmpServerHello, RadrootsSimplexSmpTransportError> {
    842     let mut block = vec![0_u8; RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE];
    843     stream
    844         .read_exact(&mut block)
    845         .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
    846     RadrootsSimplexSmpServerHello::decode(&block)
    847 }
    848 
    849 fn matching_server_identity(
    850     chain: &[CertificateDer<'static>],
    851     expected_identity: &str,
    852 ) -> Result<String, RadrootsSimplexSmpTransportError> {
    853     let expected_identity = canonical_server_identity(expected_identity)?;
    854     for certificate in chain {
    855         let identity = server_identity_from_certificate(certificate.as_ref())?;
    856         if identity == expected_identity {
    857             return Ok(identity);
    858         }
    859     }
    860     Err(RadrootsSimplexSmpTransportError::ServerIdentityMismatch {
    861         expected: expected_identity,
    862         actual: chain
    863             .first()
    864             .map(|certificate| server_identity_from_certificate(certificate.as_ref()))
    865             .transpose()?
    866             .unwrap_or_default(),
    867     })
    868 }
    869 
    870 fn server_identity_from_certificate(
    871     der: &[u8],
    872 ) -> Result<String, RadrootsSimplexSmpTransportError> {
    873     x509_parser::certificate::X509Certificate::from_der(der).map_err(|error| {
    874         RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
    875             "failed to parse SMP certificate: {error}"
    876         ))
    877     })?;
    878     let digest = Sha256::digest(der);
    879     Ok(URL_SAFE_NO_PAD.encode(digest))
    880 }
    881 
    882 fn canonical_server_identity(value: &str) -> Result<String, RadrootsSimplexSmpTransportError> {
    883     URL_SAFE_NO_PAD
    884         .decode(value)
    885         .or_else(|_| URL_SAFE.decode(value))
    886         .map(|decoded| URL_SAFE_NO_PAD.encode(decoded))
    887         .map_err(|_| {
    888             RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
    889                 "invalid base64url server identity `{value}`"
    890             ))
    891         })
    892 }
    893 
    894 fn decode_server_identity(value: &str) -> Result<Vec<u8>, RadrootsSimplexSmpTransportError> {
    895     URL_SAFE_NO_PAD
    896         .decode(value)
    897         .or_else(|_| URL_SAFE.decode(value))
    898         .map_err(|_| {
    899             RadrootsSimplexSmpTransportError::InvalidServerAddress(format!(
    900                 "invalid base64url server identity `{value}`"
    901             ))
    902         })
    903 }
    904 
    905 #[derive(Debug)]
    906 struct PermissiveSimplexServerVerifier;
    907 
    908 impl ServerCertVerifier for PermissiveSimplexServerVerifier {
    909     fn verify_server_cert(
    910         &self,
    911         _end_entity: &CertificateDer<'_>,
    912         _intermediates: &[CertificateDer<'_>],
    913         _server_name: &ServerName<'_>,
    914         _ocsp_response: &[u8],
    915         _now: UnixTime,
    916     ) -> Result<ServerCertVerified, RustlsError> {
    917         Ok(ServerCertVerified::assertion())
    918     }
    919 
    920     fn verify_tls12_signature(
    921         &self,
    922         _message: &[u8],
    923         _cert: &CertificateDer<'_>,
    924         _dss: &DigitallySignedStruct,
    925     ) -> Result<HandshakeSignatureValid, RustlsError> {
    926         Ok(HandshakeSignatureValid::assertion())
    927     }
    928 
    929     fn verify_tls13_signature(
    930         &self,
    931         _message: &[u8],
    932         _cert: &CertificateDer<'_>,
    933         _dss: &DigitallySignedStruct,
    934     ) -> Result<HandshakeSignatureValid, RustlsError> {
    935         Ok(HandshakeSignatureValid::assertion())
    936     }
    937 
    938     fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
    939         vec![
    940             SignatureScheme::ED25519,
    941             SignatureScheme::ECDSA_NISTP256_SHA256,
    942             SignatureScheme::ECDSA_NISTP384_SHA384,
    943             SignatureScheme::RSA_PSS_SHA256,
    944             SignatureScheme::RSA_PSS_SHA384,
    945             SignatureScheme::RSA_PKCS1_SHA256,
    946             SignatureScheme::RSA_PKCS1_SHA384,
    947         ]
    948     }
    949 }
    950 
    951 #[cfg(test)]
    952 mod tests {
    953     use super::{
    954         canonical_server_identity, decode_encrypted_transport_block,
    955         decode_server_transport_public_key, encode_encrypted_transport_payload,
    956         select_live_response,
    957     };
    958     use crate::handshake::RadrootsSimplexSmpTransportServerProof;
    959     use crate::prelude::{RadrootsSimplexSmpTransportBlock, RadrootsSimplexSmpTransportResponse};
    960     use radroots_simplex_smp_crypto::prelude::{
    961         RadrootsSimplexSmpX25519Keypair, encode_x25519_public_key_x509, init_secretbox_chain,
    962     };
    963     use radroots_simplex_smp_proto::prelude::{
    964         RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, RadrootsSimplexSmpBrokerMessage,
    965         RadrootsSimplexSmpBrokerTransmission, RadrootsSimplexSmpCommand,
    966         RadrootsSimplexSmpCommandTransmission, RadrootsSimplexSmpCorrelationId,
    967         RadrootsSimplexSmpReceivedMessage, RadrootsSimplexSmpServerAddress,
    968     };
    969     use std::collections::VecDeque;
    970 
    971     #[test]
    972     fn canonicalizes_padded_and_unpadded_server_identity() {
    973         assert_eq!(canonical_server_identity("YWJjZA").unwrap(), "YWJjZA");
    974         assert_eq!(canonical_server_identity("YWJjZA==").unwrap(), "YWJjZA");
    975     }
    976 
    977     #[test]
    978     fn extracts_spki_from_signed_server_key_sequence() {
    979         let keypair = RadrootsSimplexSmpX25519Keypair::from_seed(b"transport-proof");
    980         let spki = encode_x25519_public_key_x509(&keypair.public_key).unwrap();
    981         let empty_sequence = der_sequence(core::iter::once(&[][..]));
    982         let signature = [0x03, 0x01, 0x00];
    983         let signed_object = der_sequence([
    984             spki.as_slice(),
    985             empty_sequence.as_slice(),
    986             signature.as_slice(),
    987         ]);
    988         let proof = RadrootsSimplexSmpTransportServerProof {
    989             certificate_payload: Vec::new(),
    990             signed_server_key: signed_object,
    991         };
    992         assert_eq!(
    993             decode_server_transport_public_key(&proof).unwrap(),
    994             keypair.public_key
    995         );
    996     }
    997 
    998     #[test]
    999     fn encrypted_transport_blocks_use_upstream_client_chain_direction() {
   1000         let session_identifier = b"rr-synth-session-id";
   1001         let shared_secret = b"rr-synth-shared-secret";
   1002         let (mut server_send_chain, mut server_receive_chain) =
   1003             init_secretbox_chain(session_identifier, shared_secret).unwrap();
   1004         let (client_receive_chain, client_send_chain) =
   1005             init_secretbox_chain(session_identifier, shared_secret).unwrap();
   1006         let mut client_receive_chain_for_response = client_receive_chain.clone();
   1007         let mut client_send_chain_for_request = client_send_chain.clone();
   1008 
   1009         let command_transmission = RadrootsSimplexSmpCommandTransmission {
   1010             authorization: Vec::new(),
   1011             correlation_id: Some(RadrootsSimplexSmpCorrelationId::new([3_u8; 24])),
   1012             entity_id: b"rr-synth-queue".to_vec(),
   1013             command: RadrootsSimplexSmpCommand::Ping,
   1014         };
   1015         let command_block = RadrootsSimplexSmpTransportBlock::from_command_transmissions(
   1016             &[command_transmission.clone()],
   1017             RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
   1018         )
   1019         .unwrap();
   1020         let encrypted_command = encode_encrypted_transport_payload(
   1021             &mut client_send_chain_for_request,
   1022             &command_block.encode_payload().unwrap(),
   1023         )
   1024         .unwrap();
   1025         assert_eq!(
   1026             decode_encrypted_transport_block(&mut server_receive_chain, &encrypted_command)
   1027                 .unwrap()
   1028                 .decode_command_transmissions(RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION)
   1029                 .unwrap(),
   1030             vec![command_transmission]
   1031         );
   1032 
   1033         let broker_transmission = RadrootsSimplexSmpBrokerTransmission {
   1034             authorization: Vec::new(),
   1035             correlation_id: Some(RadrootsSimplexSmpCorrelationId::new([3_u8; 24])),
   1036             entity_id: b"rr-synth-queue".to_vec(),
   1037             message: RadrootsSimplexSmpBrokerMessage::Ok,
   1038         };
   1039         let broker_block = RadrootsSimplexSmpTransportBlock::from_broker_transmissions(
   1040             &[broker_transmission.clone()],
   1041             RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
   1042         )
   1043         .unwrap();
   1044         let encrypted_broker = encode_encrypted_transport_payload(
   1045             &mut server_send_chain,
   1046             &broker_block.encode_payload().unwrap(),
   1047         )
   1048         .unwrap();
   1049         assert_eq!(
   1050             decode_encrypted_transport_block(
   1051                 &mut client_receive_chain_for_response,
   1052                 &encrypted_broker,
   1053             )
   1054             .unwrap()
   1055             .decode_broker_transmissions(RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION)
   1056             .unwrap(),
   1057             vec![broker_transmission]
   1058         );
   1059 
   1060         let mut wrong_response_chain = client_send_chain;
   1061         let wrong_direction_broker = encode_encrypted_transport_payload(
   1062             &mut wrong_response_chain,
   1063             &broker_block.encode_payload().unwrap(),
   1064         )
   1065         .unwrap();
   1066         let mut fresh_client_receive_chain = client_receive_chain;
   1067         assert!(
   1068             decode_encrypted_transport_block(
   1069                 &mut fresh_client_receive_chain,
   1070                 &wrong_direction_broker
   1071             )
   1072             .is_err()
   1073         );
   1074     }
   1075 
   1076     #[test]
   1077     fn ack_uses_subscription_session_state() {
   1078         assert_eq!(
   1079             super::session_kind_for_command(&RadrootsSimplexSmpCommand::Ack(b"message".to_vec())),
   1080             "subscription"
   1081         );
   1082         assert!(super::accepts_uncorrelated_subscription_response(
   1083             &RadrootsSimplexSmpCommand::Ack(b"message".to_vec())
   1084         ));
   1085         assert!(super::accepts_uncorrelated_subscription_response(
   1086             &RadrootsSimplexSmpCommand::Sub
   1087         ));
   1088     }
   1089 
   1090     #[test]
   1091     fn strict_command_selection_buffers_unmatched_response_and_errors() {
   1092         let mut pending = VecDeque::new();
   1093         let expected = RadrootsSimplexSmpCorrelationId::new([1_u8; 24]);
   1094         let unmatched = response(
   1095             Some(RadrootsSimplexSmpCorrelationId::new([2_u8; 24])),
   1096             b"rr-synth-entity",
   1097             RadrootsSimplexSmpBrokerMessage::Ok,
   1098         );
   1099 
   1100         assert_eq!(
   1101             select_live_response(&mut pending, vec![unmatched.clone()], Some(expected), None)
   1102                 .unwrap_err(),
   1103             crate::prelude::RadrootsSimplexSmpTransportError::CorrelationIdMismatch
   1104         );
   1105         assert_eq!(pending.into_iter().collect::<Vec<_>>(), vec![unmatched]);
   1106     }
   1107 
   1108     #[test]
   1109     fn matched_response_wins_and_buffers_subscription_message() {
   1110         let mut pending = VecDeque::new();
   1111         let expected = RadrootsSimplexSmpCorrelationId::new([1_u8; 24]);
   1112         let message = response(
   1113             None,
   1114             b"rr-synth-entity",
   1115             RadrootsSimplexSmpBrokerMessage::Msg(RadrootsSimplexSmpReceivedMessage {
   1116                 message_id: b"message-1".to_vec(),
   1117                 encrypted_body: b"body".to_vec(),
   1118             }),
   1119         );
   1120         let matched = response(
   1121             Some(expected),
   1122             b"rr-synth-entity",
   1123             RadrootsSimplexSmpBrokerMessage::Sok(None),
   1124         );
   1125 
   1126         let selected = select_live_response(
   1127             &mut pending,
   1128             vec![message.clone(), matched.clone()],
   1129             Some(expected),
   1130             Some(b"rr-synth-entity"),
   1131         )
   1132         .unwrap();
   1133 
   1134         assert_eq!(selected, Some(matched));
   1135         assert_eq!(pending.into_iter().collect::<Vec<_>>(), vec![message]);
   1136     }
   1137 
   1138     #[test]
   1139     fn subscription_selection_accepts_uncorrelated_message_for_entity() {
   1140         let mut pending = VecDeque::new();
   1141         let expected = RadrootsSimplexSmpCorrelationId::new([1_u8; 24]);
   1142         let message = response(
   1143             None,
   1144             b"rr-synth-entity",
   1145             RadrootsSimplexSmpBrokerMessage::Msg(RadrootsSimplexSmpReceivedMessage {
   1146                 message_id: b"message-1".to_vec(),
   1147                 encrypted_body: b"body".to_vec(),
   1148             }),
   1149         );
   1150         let other = response(
   1151             None,
   1152             b"rr-other-entity",
   1153             RadrootsSimplexSmpBrokerMessage::Msg(RadrootsSimplexSmpReceivedMessage {
   1154                 message_id: b"message-2".to_vec(),
   1155                 encrypted_body: b"other".to_vec(),
   1156             }),
   1157         );
   1158 
   1159         let selected = select_live_response(
   1160             &mut pending,
   1161             vec![other.clone(), message.clone()],
   1162             Some(expected),
   1163             Some(b"rr-synth-entity"),
   1164         )
   1165         .unwrap();
   1166 
   1167         assert_eq!(selected, Some(message));
   1168         assert_eq!(pending.into_iter().collect::<Vec<_>>(), vec![other]);
   1169     }
   1170 
   1171     fn der_sequence<'a, I>(elements: I) -> Vec<u8>
   1172     where
   1173         I: IntoIterator<Item = &'a [u8]>,
   1174     {
   1175         let mut body = Vec::new();
   1176         for element in elements {
   1177             if element.is_empty() {
   1178                 body.extend_from_slice(&[0x30, 0x00]);
   1179             } else {
   1180                 body.extend_from_slice(element);
   1181             }
   1182         }
   1183         let mut sequence = vec![0x30];
   1184         push_der_length(&mut sequence, body.len());
   1185         sequence.extend_from_slice(&body);
   1186         sequence
   1187     }
   1188 
   1189     fn push_der_length(buffer: &mut Vec<u8>, len: usize) {
   1190         if len < 0x80 {
   1191             buffer.push(len as u8);
   1192             return;
   1193         }
   1194         let mut bytes = Vec::new();
   1195         let mut remaining = len;
   1196         while remaining > 0 {
   1197             bytes.push((remaining & 0xff) as u8);
   1198             remaining >>= 8;
   1199         }
   1200         bytes.reverse();
   1201         buffer.push(0x80 | (bytes.len() as u8));
   1202         buffer.extend_from_slice(&bytes);
   1203     }
   1204 
   1205     fn response(
   1206         correlation_id: Option<RadrootsSimplexSmpCorrelationId>,
   1207         entity_id: &[u8],
   1208         message: RadrootsSimplexSmpBrokerMessage,
   1209     ) -> RadrootsSimplexSmpTransportResponse {
   1210         RadrootsSimplexSmpTransportResponse {
   1211             server: RadrootsSimplexSmpServerAddress {
   1212                 server_identity: "cnItc3ludGgtc2VydmVy".to_owned(),
   1213                 hosts: vec!["127.0.0.1".to_owned()],
   1214                 port: Some(5223),
   1215             },
   1216             transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
   1217             transmission: RadrootsSimplexSmpBrokerTransmission {
   1218                 authorization: Vec::new(),
   1219                 correlation_id,
   1220                 entity_id: entity_id.to_vec(),
   1221                 message,
   1222             },
   1223             transport_hash: vec![9_u8; 32],
   1224         }
   1225     }
   1226 }