commit 607a493d7e3066842327338c0dd4f5c45e285477
parent 4b33176e934b7f4d0f5dbabc621b5cd2e8caa1cc
Author: triesap <tyson@radroots.org>
Date: Tue, 23 Jun 2026 12:53:19 +0000
simplex: harden live dm interop runtime
- decode queue URI sender ids before queue-address matching
- align short-link auth, server identity, and hello lifecycle with live SMP flows
- treat broker replays and empty NoMsg polls as idempotent live transport outcomes
- extend ratchet, store, runtime, transport, and interop tests for the live-DM path
Diffstat:
9 files changed, 1025 insertions(+), 161 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -4611,6 +4611,7 @@ dependencies = [
"radroots_simplex_smp_proto",
"radroots_simplex_smp_transport",
"sha2",
+ "sha3",
"tempfile",
]
diff --git a/crates/simplex_agent_proto/src/model.rs b/crates/simplex_agent_proto/src/model.rs
@@ -1,4 +1,6 @@
use alloc::vec::Vec;
+use base64::Engine as _;
+use base64::engine::general_purpose::{URL_SAFE, URL_SAFE_NO_PAD};
use radroots_simplex_smp_crypto::prelude::{
RadrootsSimplexOfficialX3dhParams, RadrootsSimplexSmpRatchetHeader,
};
@@ -58,11 +60,18 @@ impl RadrootsSimplexAgentQueueDescriptor {
pub fn queue_address(&self) -> RadrootsSimplexAgentQueueAddress {
RadrootsSimplexAgentQueueAddress {
server: self.queue_uri.server.clone(),
- sender_id: self.queue_uri.sender_id.as_bytes().to_vec(),
+ sender_id: decode_queue_uri_sender_id(&self.queue_uri.sender_id),
}
}
}
+fn decode_queue_uri_sender_id(sender_id: &str) -> Vec<u8> {
+ URL_SAFE_NO_PAD
+ .decode(sender_id.as_bytes())
+ .or_else(|_| URL_SAFE.decode(sender_id.as_bytes()))
+ .expect("validated SimpleX queue URI sender id")
+}
+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RadrootsSimplexAgentQueueUseDecision {
pub queue_address: RadrootsSimplexAgentQueueAddress,
diff --git a/crates/simplex_agent_runtime/Cargo.toml b/crates/simplex_agent_runtime/Cargo.toml
@@ -21,6 +21,7 @@ std = [
"radroots_simplex_smp_proto/std",
"radroots_simplex_smp_transport/std",
"sha2/std",
+ "sha3/std",
]
[dependencies]
@@ -31,6 +32,7 @@ radroots_simplex_smp_crypto = { workspace = true, default-features = false }
radroots_simplex_smp_proto = { workspace = true, default-features = false }
radroots_simplex_smp_transport = { workspace = true, default-features = false }
sha2 = { workspace = true, default-features = false }
+sha3 = { workspace = true, default-features = false }
[dev-dependencies]
tempfile = { workspace = true }
diff --git a/crates/simplex_agent_runtime/src/error.rs b/crates/simplex_agent_runtime/src/error.rs
@@ -2,11 +2,13 @@ use alloc::string::String;
use core::fmt;
use radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentProtoError;
use radroots_simplex_agent_store::prelude::RadrootsSimplexAgentStoreError;
+use radroots_simplex_smp_crypto::prelude::RadrootsSimplexSmpCryptoError;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RadrootsSimplexAgentRuntimeError {
Proto(RadrootsSimplexAgentProtoError),
Store(RadrootsSimplexAgentStoreError),
+ Crypto(RadrootsSimplexSmpCryptoError),
MissingConfig(&'static str),
InvalidConfig(&'static str),
Runtime(String),
@@ -24,11 +26,18 @@ impl From<RadrootsSimplexAgentStoreError> for RadrootsSimplexAgentRuntimeError {
}
}
+impl From<RadrootsSimplexSmpCryptoError> for RadrootsSimplexAgentRuntimeError {
+ fn from(value: RadrootsSimplexSmpCryptoError) -> Self {
+ Self::Crypto(value)
+ }
+}
+
impl fmt::Display for RadrootsSimplexAgentRuntimeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Proto(error) => write!(f, "{error}"),
Self::Store(error) => write!(f, "{error}"),
+ Self::Crypto(error) => write!(f, "{error}"),
Self::MissingConfig(field) => {
write!(f, "missing SimpleX agent runtime config `{field}`")
}
diff --git a/crates/simplex_agent_runtime/src/runtime.rs b/crates/simplex_agent_runtime/src/runtime.rs
@@ -22,8 +22,9 @@ use radroots_simplex_agent_proto::prelude::{
use radroots_simplex_agent_store::prelude::{
RadrootsSimplexAgentOutboundMessage, RadrootsSimplexAgentPendingCommand,
RadrootsSimplexAgentPendingCommandKind, RadrootsSimplexAgentPqKeypair,
- RadrootsSimplexAgentQueueRole, RadrootsSimplexAgentShortLinkCredentials,
- RadrootsSimplexAgentStore, RadrootsSimplexAgentX3dhKeypair,
+ RadrootsSimplexAgentQueueAuthState, RadrootsSimplexAgentQueueRole,
+ RadrootsSimplexAgentShortLinkCredentials, RadrootsSimplexAgentStore,
+ RadrootsSimplexAgentX3dhKeypair,
};
use radroots_simplex_smp_crypto::prelude::{
RADROOTS_SIMPLEX_OFFICIAL_E2E_CURRENT_VERSION, RADROOTS_SIMPLEX_OFFICIAL_E2E_KDF_VERSION,
@@ -43,12 +44,12 @@ use radroots_simplex_smp_crypto::prelude::{
use radroots_simplex_smp_proto::prelude::{
RADROOTS_SIMPLEX_SMP_CURRENT_CLIENT_VERSION, RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
RadrootsSimplexSmpBrokerMessage, RadrootsSimplexSmpCommand, RadrootsSimplexSmpCorrelationId,
- RadrootsSimplexSmpMessageFlags, RadrootsSimplexSmpMessagingQueueRequest,
- RadrootsSimplexSmpNewQueueRequest, RadrootsSimplexSmpQueueIdsResponse,
- RadrootsSimplexSmpQueueLinkData, RadrootsSimplexSmpQueueMode,
- RadrootsSimplexSmpQueueRequestData, RadrootsSimplexSmpQueueUri, RadrootsSimplexSmpSendCommand,
- RadrootsSimplexSmpServerAddress, RadrootsSimplexSmpSubscriptionMode,
- RadrootsSimplexSmpVersionRange,
+ RadrootsSimplexSmpError, RadrootsSimplexSmpMessageFlags,
+ RadrootsSimplexSmpMessagingQueueRequest, RadrootsSimplexSmpNewQueueRequest,
+ RadrootsSimplexSmpQueueIdsResponse, RadrootsSimplexSmpQueueLinkData,
+ RadrootsSimplexSmpQueueMode, RadrootsSimplexSmpQueueRequestData, RadrootsSimplexSmpQueueUri,
+ RadrootsSimplexSmpSendCommand, RadrootsSimplexSmpServerAddress,
+ RadrootsSimplexSmpSubscriptionMode, RadrootsSimplexSmpVersionRange,
};
use radroots_simplex_smp_transport::prelude::{
RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpSubscriptionReceiveRequest,
@@ -56,6 +57,7 @@ use radroots_simplex_smp_transport::prelude::{
RadrootsSimplexSmpTransportResponse,
};
use sha2::{Digest, Sha256};
+use sha3::Sha3_384;
#[cfg(feature = "std")]
use std::path::{Path, PathBuf};
@@ -303,6 +305,7 @@ impl RadrootsSimplexAgentRuntime {
true,
receive_auth_state,
)?;
+ let server_key_hash = decode_server_key_hash(&descriptor.queue_uri.server.server_identity)?;
{
let connection = self.store.connection_mut(&connection.id)?;
connection.local_e2e_public_key = Some(e2e_keypair.public_key);
@@ -315,7 +318,7 @@ impl RadrootsSimplexAgentRuntime {
scheme: RadrootsSimplexAgentShortLinkScheme::Simplex,
hosts: descriptor.queue_uri.server.hosts.clone(),
port: descriptor.queue_uri.server.port,
- server_key_hash: None,
+ server_key_hash: Some(server_key_hash),
link_id: Vec::new(),
link_key: prepared.link_key,
link_public_signature_key: prepared.link_public_signature_key,
@@ -356,7 +359,7 @@ impl RadrootsSimplexAgentRuntime {
None,
);
let connection_id = connection.id.clone();
- self.prepare_join_connection(&connection_id, invitation, reply_queue, now)?;
+ self.prepare_join_connection(&connection_id, invitation, reply_queue, now, None)?;
self.flush_store()?;
Ok(connection_id)
}
@@ -374,11 +377,13 @@ impl RadrootsSimplexAgentRuntime {
None,
None,
);
+ let sender_auth_state = self.store.generate_queue_auth_state()?;
self.store.enqueue_command(
&connection.id,
RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
invitation,
reply_queue,
+ sender_auth_state,
},
now,
)?;
@@ -392,7 +397,9 @@ impl RadrootsSimplexAgentRuntime {
invitation: RadrootsSimplexAgentConnectionLink,
mut reply_queue: RadrootsSimplexSmpQueueUri,
now: u64,
+ secured_sender_auth_state: Option<RadrootsSimplexAgentQueueAuthState>,
) -> Result<(), RadrootsSimplexAgentRuntimeError> {
+ let send_queue_is_secured = secured_sender_auth_state.is_some();
let local_e2e_keypair = RadrootsSimplexSmpX25519Keypair::generate()
.map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
let invitation_e2e_public_key =
@@ -480,7 +487,11 @@ impl RadrootsSimplexAgentRuntime {
.map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))?;
None
};
- let send_auth_state = self.store.generate_queue_auth_state()?;
+ let send_auth_state = if let Some(sender_auth_state) = secured_sender_auth_state {
+ sender_auth_state
+ } else {
+ self.store.generate_queue_auth_state()?
+ };
let send_descriptor = RadrootsSimplexAgentQueueDescriptor {
queue_uri: invitation.invitation_queue.clone(),
replaced_queue: None,
@@ -538,14 +549,16 @@ impl RadrootsSimplexAgentRuntime {
})?;
queue.delivery_private_key = Some(delivery_keypair.private_key);
}
- self.store.enqueue_command(
- connection_id,
- RadrootsSimplexAgentPendingCommandKind::SecureQueue {
- queue: send_descriptor.queue_address(),
- sender_key: send_descriptor.sender_key.clone(),
- },
- now,
- )?;
+ if !send_queue_is_secured {
+ self.store.enqueue_command(
+ connection_id,
+ RadrootsSimplexAgentPendingCommandKind::SecureQueue {
+ queue: send_descriptor.queue_address(),
+ sender_key: send_descriptor.sender_key.clone(),
+ },
+ now,
+ )?;
+ }
self.store.enqueue_command(
connection_id,
RadrootsSimplexAgentPendingCommandKind::CreateQueue {
@@ -592,6 +605,7 @@ impl RadrootsSimplexAgentRuntime {
},
now,
)?;
+ self.enqueue_hello(connection_id, now)?;
self.flush_store()?;
Ok(())
}
@@ -776,11 +790,11 @@ impl RadrootsSimplexAgentRuntime {
RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
queue: receive_queue,
broker_message_id,
- receipt: RadrootsSimplexAgentMessageReceipt {
+ receipt: Some(RadrootsSimplexAgentMessageReceipt {
message_id,
message_hash,
receipt_info,
- },
+ }),
},
now,
)?;
@@ -809,6 +823,31 @@ impl RadrootsSimplexAgentRuntime {
self.ack_message(connection_id, message_id, message_hash, receipt_info, now)
}
+ fn ack_broker_message(
+ &mut self,
+ connection_id: &str,
+ queue: radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentQueueAddress,
+ broker_message_id: Vec<u8>,
+ now: u64,
+ ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
+ if self
+ .store
+ .has_pending_broker_ack(connection_id, &queue, &broker_message_id)
+ {
+ return Ok(());
+ }
+ self.store.enqueue_command(
+ connection_id,
+ RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
+ queue,
+ broker_message_id,
+ receipt: None,
+ },
+ now,
+ )?;
+ Ok(())
+ }
+
pub fn reconnect_connection(
&mut self,
connection_id: &str,
@@ -876,6 +915,13 @@ impl RadrootsSimplexAgentRuntime {
RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply { reply_queues, info } => {
let mut secure_queues = Vec::new();
for descriptor in reply_queues {
+ let queue_address = descriptor.queue_address();
+ if let Ok(existing_queue) =
+ self.store.queue_record(connection_id, &queue_address)
+ && existing_queue.role == RadrootsSimplexAgentQueueRole::Send
+ {
+ continue;
+ }
let auth_state = self.store.generate_queue_auth_state()?;
let mut descriptor = descriptor;
descriptor.sender_key = Some(auth_state.public_key.clone());
@@ -890,6 +936,16 @@ impl RadrootsSimplexAgentRuntime {
)?;
secure_queues.push((secure_queue, sender_key));
}
+ if secure_queues.is_empty()
+ && matches!(
+ self.store.connection(connection_id)?.status,
+ RadrootsSimplexAgentConnectionStatus::AwaitingApproval
+ | RadrootsSimplexAgentConnectionStatus::Allowed
+ | RadrootsSimplexAgentConnectionStatus::Connected
+ )
+ {
+ return Ok(());
+ }
self.store.set_status(
connection_id,
RadrootsSimplexAgentConnectionStatus::AwaitingApproval,
@@ -1169,15 +1225,28 @@ impl RadrootsSimplexAgentRuntime {
) -> Result<RadrootsSimplexSmpTransportRequest, RadrootsSimplexAgentRuntimeError> {
match &command.kind {
RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
- invitation, ..
+ invitation,
+ sender_auth_state,
+ ..
} => {
let server = short_invitation_server(invitation)?;
- return Ok(self.server_transport_request(
- command.id,
- &server,
- invitation.link_id.clone(),
- RadrootsSimplexSmpCommand::LKey(invitation.link_key.clone()),
- ));
+ return Ok(RadrootsSimplexSmpTransportRequest {
+ server,
+ transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
+ correlation_id: Some(self.command_correlation_id(command)?),
+ entity_id: invitation.link_id.clone(),
+ command: RadrootsSimplexSmpCommand::LKey(
+ encode_ed25519_public_key_x509(&sender_auth_state.public_key).map_err(
+ |error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()),
+ )?,
+ ),
+ authorization: RadrootsSimplexSmpCommandAuthorization::Ed25519(
+ radroots_simplex_smp_crypto::prelude::RadrootsSimplexSmpEd25519Keypair {
+ public_key: sender_auth_state.public_key.clone(),
+ private_key: sender_auth_state.private_key.clone(),
+ },
+ ),
+ });
}
RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData { invitation, .. } => {
let server = short_invitation_server(invitation)?;
@@ -1201,24 +1270,13 @@ impl RadrootsSimplexAgentRuntime {
),
)
})?;
- let correlation_id = correlation_id_for_command(command.id);
- let authorization = match &command.kind {
- RadrootsSimplexAgentPendingCommandKind::SendEnvelope { .. }
- if queue.role == RadrootsSimplexAgentQueueRole::Send
- && matches!(
- self.store.connection(&command.connection_id)?.status,
- RadrootsSimplexAgentConnectionStatus::JoinPending
- ) =>
- {
- RadrootsSimplexSmpCommandAuthorization::None
- }
- _ => RadrootsSimplexSmpCommandAuthorization::Ed25519(
- radroots_simplex_smp_crypto::prelude::RadrootsSimplexSmpEd25519Keypair {
- public_key: auth.public_key,
- private_key: auth.private_key,
- },
- ),
- };
+ let correlation_id = self.command_correlation_id(command)?;
+ let authorization = RadrootsSimplexSmpCommandAuthorization::Ed25519(
+ radroots_simplex_smp_crypto::prelude::RadrootsSimplexSmpEd25519Keypair {
+ public_key: auth.public_key,
+ private_key: auth.private_key,
+ },
+ );
Ok(RadrootsSimplexSmpTransportRequest {
server: queue.descriptor.queue_uri.server.clone(),
transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
@@ -1236,16 +1294,65 @@ impl RadrootsSimplexAgentRuntime {
entity_id: Vec<u8>,
command: RadrootsSimplexSmpCommand,
) -> RadrootsSimplexSmpTransportRequest {
+ let correlation_id = correlation_id_from_material(
+ b"simplex-server-command-correlation",
+ &[
+ command_id.to_be_bytes().to_vec(),
+ server.server_identity.as_bytes().to_vec(),
+ entity_id.clone(),
+ ],
+ );
RadrootsSimplexSmpTransportRequest {
server: server.clone(),
transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
- correlation_id: Some(correlation_id_for_command(command_id)),
+ correlation_id: Some(correlation_id),
entity_id,
command,
authorization: RadrootsSimplexSmpCommandAuthorization::None,
}
}
+ fn command_correlation_id(
+ &self,
+ command: &RadrootsSimplexAgentPendingCommand,
+ ) -> Result<RadrootsSimplexSmpCorrelationId, RadrootsSimplexAgentRuntimeError> {
+ let mut parts = vec![
+ command.id.to_be_bytes().to_vec(),
+ command.connection_id.as_bytes().to_vec(),
+ ];
+ if let Some(queue_address) = queue_for_command(command) {
+ parts.push(queue_address.server.server_identity.as_bytes().to_vec());
+ parts.push(queue_address.sender_id.clone());
+ parts.push(
+ self.store
+ .queue_auth_state(&command.connection_id, &queue_address)?
+ .public_key,
+ );
+ }
+ if matches!(
+ command.kind,
+ RadrootsSimplexAgentPendingCommandKind::CreateQueue { .. }
+ ) && let Some(short_link) = self
+ .store
+ .connection(&command.connection_id)?
+ .short_link
+ .as_ref()
+ {
+ parts.push(short_link.link_key.clone());
+ }
+ if let RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
+ sender_auth_state,
+ ..
+ } = &command.kind
+ {
+ parts.push(sender_auth_state.public_key.clone());
+ }
+ Ok(correlation_id_from_material(
+ b"simplex-command-correlation",
+ &parts,
+ ))
+ }
+
fn command_transport_parts(
&self,
command: &RadrootsSimplexAgentPendingCommand,
@@ -1259,6 +1366,7 @@ impl RadrootsSimplexAgentRuntime {
> {
match &command.kind {
RadrootsSimplexAgentPendingCommandKind::CreateQueue { descriptor } => {
+ let correlation_id = self.command_correlation_id(command)?;
let auth_state = self
.store
.queue_auth_state(&command.connection_id, &descriptor.queue_address())?;
@@ -1304,7 +1412,7 @@ impl RadrootsSimplexAgentRuntime {
RadrootsSimplexSmpQueueRequestData::Messaging(
self.short_link_messaging_queue_request(
&command.connection_id,
- descriptor,
+ &correlation_id,
)?,
)
}
@@ -1410,7 +1518,7 @@ impl RadrootsSimplexAgentRuntime {
fn short_link_messaging_queue_request(
&self,
connection_id: &str,
- descriptor: &RadrootsSimplexAgentQueueDescriptor,
+ correlation_id: &RadrootsSimplexSmpCorrelationId,
) -> Result<Option<RadrootsSimplexSmpMessagingQueueRequest>, RadrootsSimplexAgentRuntimeError>
{
let connection = self.store.connection(connection_id)?;
@@ -1431,7 +1539,7 @@ impl RadrootsSimplexAgentRuntime {
))
})?;
Ok(Some(RadrootsSimplexSmpMessagingQueueRequest {
- sender_id: descriptor.queue_address().sender_id,
+ sender_id: short_link_sender_id(correlation_id),
link_data: RadrootsSimplexSmpQueueLinkData {
fixed_data,
user_data,
@@ -1450,6 +1558,23 @@ impl RadrootsSimplexAgentRuntime {
reply_queue,
} = &command.kind
else {
+ if let RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
+ invitation,
+ reply_queue,
+ sender_auth_state,
+ } = &command.kind
+ {
+ let mut connection_link =
+ decrypt_short_invitation_link_data(invitation, &link_data)?;
+ connection_link.invitation_queue.sender_id = URL_SAFE_NO_PAD.encode(sender_id);
+ return self.prepare_join_connection(
+ &command.connection_id,
+ connection_link,
+ reply_queue.clone(),
+ command.ready_at,
+ Some(sender_auth_state.clone()),
+ );
+ }
return Err(RadrootsSimplexAgentRuntimeError::Runtime(
"SimpleX LNK response received for non-retrieval command".into(),
));
@@ -1461,6 +1586,7 @@ impl RadrootsSimplexAgentRuntime {
connection_link,
reply_queue.clone(),
command.ready_at,
+ None,
)
}
@@ -1470,12 +1596,22 @@ impl RadrootsSimplexAgentRuntime {
response: RadrootsSimplexSmpTransportResponse,
) -> Result<(), RadrootsSimplexAgentRuntimeError> {
match response.transmission.message {
+ RadrootsSimplexSmpBrokerMessage::Err(error)
+ if is_empty_queue_no_msg(command, &error) =>
+ {
+ self.record_command_outcome(
+ command.id,
+ RadrootsSimplexAgentCommandOutcome::Delivered,
+ )
+ }
RadrootsSimplexSmpBrokerMessage::Err(error) => self.record_command_outcome(
command.id,
RadrootsSimplexAgentCommandOutcome::Failed {
message: format!(
- "SimpleX broker rejected command `{}`: {:?}",
- command.id, error
+ "SimpleX broker rejected command `{}` ({}): {:?}",
+ command.id,
+ pending_command_kind_label(command),
+ error
),
},
),
@@ -1542,6 +1678,7 @@ impl RadrootsSimplexAgentRuntime {
message,
response.transport_hash,
),
+ RadrootsSimplexSmpBrokerMessage::Err(RadrootsSimplexSmpError::NoMsg) => Ok(()),
RadrootsSimplexSmpBrokerMessage::Err(error) => {
self.events
.push_back(RadrootsSimplexAgentRuntimeEvent::Error {
@@ -1576,6 +1713,21 @@ impl RadrootsSimplexAgentRuntime {
message_id: delivered.message_id,
message_hash: delivered.message_hash,
});
+ let connection = self.store.connection(&command.connection_id)?;
+ if connection.status == RadrootsSimplexAgentConnectionStatus::Allowed
+ && connection.hello_sent
+ && delivered.message_id == 1
+ {
+ self.store.set_status(
+ &command.connection_id,
+ RadrootsSimplexAgentConnectionStatus::Connected,
+ )?;
+ self.events.push_back(
+ RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished {
+ connection_id: command.connection_id.clone(),
+ },
+ );
+ }
}
RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue } => {
self.store
@@ -1587,7 +1739,10 @@ impl RadrootsSimplexAgentRuntime {
.mark_queue_tested(&command.connection_id, queue)?;
}
}
- RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { receipt, .. } => {
+ RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
+ receipt: Some(receipt),
+ ..
+ } => {
self.events.push_back(
RadrootsSimplexAgentRuntimeEvent::InboundMessageAckDelivered {
connection_id: command.connection_id.clone(),
@@ -1596,19 +1751,6 @@ impl RadrootsSimplexAgentRuntime {
},
);
}
- RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
- invitation,
- reply_queue,
- } => {
- self.store.enqueue_command(
- &command.connection_id,
- RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData {
- invitation: invitation.clone(),
- reply_queue: reply_queue.clone(),
- },
- command.ready_at,
- )?;
- }
_ => {}
}
Ok(())
@@ -1885,6 +2027,14 @@ impl RadrootsSimplexAgentRuntime {
message: radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpReceivedMessage,
transport_hash: Vec<u8>,
) -> Result<(), RadrootsSimplexAgentRuntimeError> {
+ let connection = self.store.connection(connection_id)?;
+ if connection.last_received_queue.as_ref() == Some(queue)
+ && connection.last_received_broker_message_id.as_deref()
+ == Some(message.message_id.as_slice())
+ {
+ self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
+ return Ok(());
+ }
let received = self.decode_received_message_body(connection_id, queue, &message)?;
if received.sent_body.is_empty() {
return Ok(());
@@ -1894,8 +2044,31 @@ impl RadrootsSimplexAgentRuntime {
if let Some(shared_secret) = derived_secret {
self.store.connection_mut(connection_id)?.shared_secret = Some(shared_secret);
}
+ if self.is_official_payload_replay(connection_id, &envelope)? {
+ {
+ let connection = self.store.connection_mut(connection_id)?;
+ connection.last_received_queue = Some(queue.clone());
+ connection.last_received_broker_message_id = Some(message.message_id.clone());
+ }
+ self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
+ return Ok(());
+ }
self.initialize_receiver_ratchet_from_confirmation(connection_id, &envelope)?;
- let decrypted = self.extract_decrypted_message(connection_id, &envelope)?;
+ let decrypted = match self.extract_decrypted_message(connection_id, &envelope) {
+ Ok(decrypted) => decrypted,
+ Err(RadrootsSimplexAgentRuntimeError::Crypto(
+ RadrootsSimplexSmpCryptoError::RatchetMessageRegression { .. },
+ )) => {
+ {
+ let connection = self.store.connection_mut(connection_id)?;
+ connection.last_received_queue = Some(queue.clone());
+ connection.last_received_broker_message_id = Some(message.message_id.clone());
+ }
+ self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
+ return Ok(());
+ }
+ Err(error) => return Err(error),
+ };
let agent_message_hash =
if let RadrootsSimplexAgentDecryptedMessage::Message(frame) = &decrypted {
let encoded = encode_decrypted_message(&decrypted)?;
@@ -1905,9 +2078,15 @@ impl RadrootsSimplexAgentRuntime {
} else {
None
};
+ let requires_app_ack = matches!(
+ &decrypted,
+ RadrootsSimplexAgentDecryptedMessage::Message(frame)
+ if matches!(frame.message, RadrootsSimplexAgentMessage::UserMessage(_))
+ );
{
let connection = self.store.connection_mut(connection_id)?;
connection.last_received_queue = Some(queue.clone());
+ connection.last_received_broker_message_id = Some(message.message_id.clone());
}
let _ = received.timestamp;
let _ = received.flags;
@@ -1920,11 +2099,40 @@ impl RadrootsSimplexAgentRuntime {
agent_message_hash.clone().unwrap_or_default(),
)?;
}
- self.handle_inbound_decrypted_message(
- connection_id,
- decrypted,
- agent_message_hash.unwrap_or(transport_hash),
- )
+ let message_hash = agent_message_hash.unwrap_or_else(|| transport_hash);
+ self.handle_inbound_decrypted_message(connection_id, decrypted, message_hash)?;
+ if !requires_app_ack {
+ self.ack_broker_message(connection_id, queue.clone(), message.message_id, 0)?;
+ }
+ Ok(())
+ }
+
+ fn is_official_payload_replay(
+ &self,
+ connection_id: &str,
+ envelope: &RadrootsSimplexAgentEnvelope,
+ ) -> Result<bool, RadrootsSimplexAgentRuntimeError> {
+ let official_message = match envelope {
+ RadrootsSimplexAgentEnvelope::Confirmation { encrypted, .. }
+ | RadrootsSimplexAgentEnvelope::Message(encrypted)
+ | RadrootsSimplexAgentEnvelope::RatchetKey { encrypted, .. } => {
+ encrypted.official_message.as_deref()
+ }
+ RadrootsSimplexAgentEnvelope::Invitation { .. } => None,
+ };
+ let Some(official_message) = official_message else {
+ return Ok(false);
+ };
+ let connection = self.store.connection(connection_id)?;
+ let Some(ratchet_state) = connection.ratchet_state.as_ref() else {
+ return Ok(false);
+ };
+ if ratchet_state.official_associated_data.is_none() {
+ return Ok(false);
+ }
+ ratchet_state
+ .is_official_payload_replay(official_message)
+ .map_err(Into::into)
}
fn validate_inbound_frame_progress(
@@ -1940,6 +2148,14 @@ impl RadrootsSimplexAgentRuntime {
}
let connection = self.store.connection(connection_id)?;
let Some(last_message_id) = connection.delivery_cursor.last_received_message_id else {
+ if connection.status == RadrootsSimplexAgentConnectionStatus::Connected
+ && !connection.hello_received
+ && frame.header.message_id == 2
+ && !frame.header.previous_message_hash.is_empty()
+ && matches!(frame.message, RadrootsSimplexAgentMessage::UserMessage(_))
+ {
+ return Ok(());
+ }
if frame.header.message_id != 1 {
return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
"SimpleX inbound message id for `{connection_id}` started at `{}` instead of `1`",
@@ -2218,7 +2434,7 @@ impl RadrootsSimplexAgentRuntime {
))
})?
.decrypt_official_payload(&shared_secret, official_message)
- .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()));
+ .map_err(Into::into);
}
let header = encrypted.ratchet_header.as_ref().ok_or_else(|| {
RadrootsSimplexAgentRuntimeError::Runtime(format!(
@@ -2235,7 +2451,7 @@ impl RadrootsSimplexAgentRuntime {
))
})?
.decrypt_payload(&shared_secret, header, &encrypted.ciphertext)
- .map_err(|error| RadrootsSimplexAgentRuntimeError::Runtime(error.to_string()))
+ .map_err(Into::into)
}
fn agent_payload_padded_len(
@@ -2367,25 +2583,52 @@ fn prepare_short_invitation_link_data(
fn short_invitation_server(
invitation: &RadrootsSimplexAgentShortInvitationLink,
) -> Result<RadrootsSimplexSmpServerAddress, RadrootsSimplexAgentRuntimeError> {
- let server_identity = invitation.hosts.first().cloned().ok_or_else(|| {
- RadrootsSimplexAgentRuntimeError::Runtime(
+ if invitation.hosts.is_empty() {
+ return Err(RadrootsSimplexAgentRuntimeError::Runtime(
"SimpleX short invitation link does not include a relay host".into(),
+ ));
+ }
+ let server_key_hash = invitation.server_key_hash.as_ref().ok_or_else(|| {
+ RadrootsSimplexAgentRuntimeError::Runtime(
+ "SimpleX short invitation link does not include a server key hash".into(),
)
})?;
Ok(RadrootsSimplexSmpServerAddress {
- server_identity,
+ server_identity: URL_SAFE_NO_PAD.encode(server_key_hash),
hosts: invitation.hosts.clone(),
port: invitation.port,
})
}
-fn correlation_id_for_command(command_id: u64) -> RadrootsSimplexSmpCorrelationId {
- let digest = derive_material(b"simplex-command-correlation", &[&command_id.to_be_bytes()]);
+fn decode_server_key_hash(
+ server_identity: &str,
+) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> {
+ URL_SAFE_NO_PAD
+ .decode(server_identity.as_bytes())
+ .or_else(|_| URL_SAFE.decode(server_identity.as_bytes()))
+ .map_err(|error| {
+ RadrootsSimplexAgentRuntimeError::Runtime(format!(
+ "failed to decode SimpleX server identity: {error}"
+ ))
+ })
+}
+
+fn correlation_id_from_material(
+ label: &[u8],
+ parts: &[Vec<u8>],
+) -> RadrootsSimplexSmpCorrelationId {
+ let refs = parts.iter().map(Vec::as_slice).collect::<Vec<&[u8]>>();
+ let digest = derive_material(label, &refs);
let mut correlation = [0_u8; RadrootsSimplexSmpCorrelationId::LENGTH];
correlation.copy_from_slice(&digest[..RadrootsSimplexSmpCorrelationId::LENGTH]);
RadrootsSimplexSmpCorrelationId::new(correlation)
}
+fn short_link_sender_id(correlation_id: &RadrootsSimplexSmpCorrelationId) -> Vec<u8> {
+ let digest = Sha3_384::digest(correlation_id.as_bytes());
+ digest[..RadrootsSimplexSmpCorrelationId::LENGTH].to_vec()
+}
+
fn encode_queue_public_key(public_key: &[u8]) -> Result<String, RadrootsSimplexSmpCryptoError> {
Ok(URL_SAFE.encode(encode_x25519_public_key_x509(public_key)?))
}
@@ -2432,6 +2675,36 @@ fn queue_for_command(
}
}
+fn pending_command_kind_label(command: &RadrootsSimplexAgentPendingCommand) -> &'static str {
+ match command.kind {
+ RadrootsSimplexAgentPendingCommandKind::CreateQueue { .. } => "create_queue",
+ RadrootsSimplexAgentPendingCommandKind::SecureQueue { .. } => "secure_queue",
+ RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { .. } => "subscribe_queue",
+ RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { .. } => "get_queue_message",
+ RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { .. } => "ack_inbox_message",
+ RadrootsSimplexAgentPendingCommandKind::SendEnvelope { .. } => "send_envelope",
+ RadrootsSimplexAgentPendingCommandKind::RotateQueues { .. } => "rotate_queues",
+ RadrootsSimplexAgentPendingCommandKind::TestQueues { .. } => "test_queues",
+ RadrootsSimplexAgentPendingCommandKind::SetQueueLinkData { .. } => "set_queue_link_data",
+ RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData { .. } => "get_queue_link_data",
+ RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData { .. } => {
+ "secure_get_queue_link_data"
+ }
+ }
+}
+
+fn is_empty_queue_no_msg(
+ command: &RadrootsSimplexAgentPendingCommand,
+ error: &RadrootsSimplexSmpError,
+) -> bool {
+ matches!(
+ command.kind,
+ RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { .. }
+ | RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { .. }
+ | RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { .. }
+ ) && matches!(error, RadrootsSimplexSmpError::NoMsg)
+}
+
fn encode_client_message_envelope(
envelope: &SimplexClientMessageEnvelope,
) -> Result<Vec<u8>, RadrootsSimplexAgentRuntimeError> {
@@ -2570,8 +2843,8 @@ fn decode_received_body(
}
let flags = RadrootsSimplexSmpMessageFlags {
notification: match flags_bytes[0] {
- 0 => false,
- 1 => true,
+ b'F' => false,
+ b'T' => true,
other => {
return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
"SimpleX received body has invalid notification flag `{other}`"
@@ -2603,14 +2876,14 @@ mod tests {
fn invitation_queue() -> RadrootsSimplexSmpQueueUri {
RadrootsSimplexSmpQueueUri::parse(
- "smp://aGVsbG8@relay.example/cXVldWU#/?v=4&dh=Zm9vYmFy&q=m",
+ "smp://BwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwc@relay.example/cXVldWU#/?v=4&dh=Zm9vYmFy&q=m",
)
.unwrap()
}
fn reply_queue() -> RadrootsSimplexSmpQueueUri {
RadrootsSimplexSmpQueueUri::parse(
- "smp://aGVsbG8@relay.example/cmVwbHk#/?v=4&dh=YmF6cXV4&q=m",
+ "smp://BwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwc@relay.example/cmVwbHk#/?v=4&dh=YmF6cXV4&q=m",
)
.unwrap()
}
@@ -2658,6 +2931,21 @@ mod tests {
Sha256::digest(&encoded).to_vec()
}
+ #[test]
+ fn received_body_decodes_official_message_flags() {
+ let mut body = 1_725_555_000_i64.to_be_bytes().to_vec();
+ body.extend_from_slice(b"T rr-synth-body");
+ let decoded = decode_received_body(&body).unwrap();
+ assert_eq!(decoded.timestamp, 1_725_555_000_u64);
+ assert!(decoded.flags.notification);
+ assert!(decoded.flags.reserved.is_empty());
+ assert_eq!(decoded.sent_body, b"rr-synth-body");
+
+ body[8] = b'F';
+ let decoded = decode_received_body(&body).unwrap();
+ assert!(!decoded.flags.notification);
+ }
+
fn receipt_message(
frame_message_id: u64,
message_id: u64,
@@ -2911,7 +3199,14 @@ mod tests {
else {
panic!("invitation NEW should carry short-link messaging data");
};
- assert!(!link_request.sender_id.is_empty());
+ let create_correlation = transport.requests[0]
+ .correlation_id
+ .as_ref()
+ .expect("create command should carry a correlation ID");
+ assert_eq!(
+ link_request.sender_id,
+ short_link_sender_id(create_correlation)
+ );
assert!(!link_request.link_data.fixed_data.is_empty());
assert!(!link_request.link_data.user_data.is_empty());
assert!(matches!(
@@ -3023,12 +3318,10 @@ mod tests {
.join_short_invitation(short_invitation.clone(), reply_queue(), 40)
.unwrap();
let mut join_transport = ScriptedTransport::with_responses(vec![
- RadrootsSimplexSmpBrokerMessage::Ok,
RadrootsSimplexSmpBrokerMessage::Lnk {
sender_id: b"sender".to_vec(),
link_data: stored_link_data,
},
- RadrootsSimplexSmpBrokerMessage::Ok,
ids_response(b"recipient-2", b"sender-2", b"server-dh-2"),
RadrootsSimplexSmpBrokerMessage::Ok,
RadrootsSimplexSmpBrokerMessage::Ok,
@@ -3037,25 +3330,35 @@ mod tests {
.execute_ready_commands(&mut join_transport, 50, 16)
.unwrap();
- assert_eq!(join_transport.requests.len(), 6);
- let RadrootsSimplexSmpCommand::LKey(link_key) = &join_transport.requests[0].command else {
+ assert_eq!(join_transport.requests.len(), 4);
+ let RadrootsSimplexSmpCommand::LKey(sender_auth_key) = &join_transport.requests[0].command
+ else {
panic!("short invitation join should authorize link retrieval first");
};
- assert_eq!(link_key, &short_invitation.link_key);
+ let RadrootsSimplexSmpCommandAuthorization::Ed25519(lkey_auth) =
+ &join_transport.requests[0].authorization
+ else {
+ panic!("short invitation link retrieval should be signed");
+ };
+ assert_eq!(
+ sender_auth_key,
+ &encode_ed25519_public_key_x509(&lkey_auth.public_key).unwrap()
+ );
assert_eq!(
join_transport.requests[0].entity_id,
short_invitation.link_id.clone()
);
- assert!(matches!(
- join_transport.requests[1].command,
- RadrootsSimplexSmpCommand::LGet
- ));
- let RadrootsSimplexSmpCommand::SKey(_) = &join_transport.requests[2].command else {
- panic!("short invitation join should secure the invitation send queue");
- };
- let RadrootsSimplexSmpCommand::New(_) = &join_transport.requests[3].command else {
+ let RadrootsSimplexSmpCommand::New(_) = &join_transport.requests[1].command else {
panic!("short invitation join should create the reply queue");
};
+ assert!(matches!(
+ join_transport.requests[2].command,
+ RadrootsSimplexSmpCommand::Sub
+ ));
+ assert!(matches!(
+ join_transport.requests[3].command,
+ RadrootsSimplexSmpCommand::Send(_)
+ ));
let joined_connection = runtime.store.connection(&joined).unwrap();
assert_eq!(
joined_connection.status,
@@ -3075,6 +3378,19 @@ mod tests {
.sender_id,
URL_SAFE_NO_PAD.encode(b"sender")
);
+ let send_auth = runtime
+ .store
+ .primary_send_queue(&joined)
+ .unwrap()
+ .auth_state
+ .unwrap();
+ assert_eq!(send_auth.public_key, lkey_auth.public_key);
+ let RadrootsSimplexSmpCommandAuthorization::Ed25519(send_auth_request) =
+ &join_transport.requests[3].authorization
+ else {
+ panic!("short invitation join confirmation should be signed");
+ };
+ assert_eq!(send_auth_request.public_key, send_auth.public_key);
assert!(runtime.drain_events(16).iter().any(|event| matches!(
event,
RadrootsSimplexAgentRuntimeEvent::ConfirmationRequired { connection_id }
@@ -3258,6 +3574,43 @@ mod tests {
}
#[test]
+ fn get_no_msg_response_is_empty_queue_success() {
+ let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
+ let created = runtime
+ .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
+ .unwrap();
+
+ let mut setup_transport = ScriptedTransport::with_responses(vec![
+ ids_response(b"recipient", b"sender", b"server-dh"),
+ RadrootsSimplexSmpBrokerMessage::Ok,
+ ]);
+ runtime
+ .execute_ready_commands(&mut setup_transport, 30, 16)
+ .unwrap();
+ let _ = runtime.drain_events(16);
+
+ runtime.get_connection_message(&created, 40).unwrap();
+ let mut get_transport =
+ ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Err(
+ RadrootsSimplexSmpError::NoMsg,
+ )]);
+ runtime
+ .execute_ready_commands(&mut get_transport, 50, 16)
+ .unwrap();
+
+ assert_eq!(get_transport.requests.len(), 1);
+ assert!(matches!(
+ get_transport.requests[0].command,
+ RadrootsSimplexSmpCommand::Get
+ ));
+ assert!(runtime.retry_pending(50, 16).is_empty());
+ assert!(!runtime.drain_events(16).iter().any(|event| matches!(
+ event,
+ RadrootsSimplexAgentRuntimeEvent::Error { message, .. } if message.contains("NoMsg")
+ )));
+ }
+
+ #[test]
fn subscription_receive_routes_broker_transmission_by_entity_id() {
let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
let created = runtime
@@ -3291,13 +3644,34 @@ mod tests {
subscription_transport.subscription_requests[0].server,
receive_queue.descriptor.queue_uri.server
);
+ assert!(runtime.drain_events(16).is_empty());
+ }
+
+ #[test]
+ fn subscribe_no_msg_response_marks_queue_subscribed() {
+ let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
+ let created = runtime
+ .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
+ .unwrap();
+
+ let mut setup_transport = ScriptedTransport::with_responses(vec![
+ ids_response(b"recipient", b"sender", b"server-dh"),
+ RadrootsSimplexSmpBrokerMessage::Err(RadrootsSimplexSmpError::NoMsg),
+ ]);
+ runtime
+ .execute_ready_commands(&mut setup_transport, 30, 16)
+ .unwrap();
+
+ assert_eq!(setup_transport.requests.len(), 2);
assert!(matches!(
- runtime.drain_events(16).first(),
- Some(RadrootsSimplexAgentRuntimeEvent::Error {
- connection_id: Some(connection_id),
- message,
- }) if connection_id == &created && message.contains("NoMsg")
+ setup_transport.requests[1].command,
+ RadrootsSimplexSmpCommand::Sub
));
+ assert!(runtime.store.receive_queues(&created).unwrap()[0].subscribed);
+ assert!(!runtime.drain_events(16).iter().any(|event| matches!(
+ event,
+ RadrootsSimplexAgentRuntimeEvent::Error { message, .. } if message.contains("NoMsg")
+ )));
}
#[test]
@@ -3398,6 +3772,27 @@ mod tests {
}
#[test]
+ fn inbound_progress_allows_first_visible_user_message_after_missing_peer_hello() {
+ let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
+ let connection_id = runtime
+ .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
+ .unwrap();
+ mark_connected(&mut runtime, &connection_id);
+ runtime
+ .store
+ .connection_mut(&connection_id)
+ .unwrap()
+ .hello_received = false;
+
+ let frame = user_message_frame(2, b"peer-hello-hash".to_vec(), b"first visible");
+ let frame_hash = agent_message_hash(&frame);
+
+ runtime
+ .validate_inbound_frame_progress(&connection_id, &frame, &frame_hash)
+ .unwrap();
+ }
+
+ #[test]
fn inbound_progress_rejects_regression_after_accepted_next_message() {
let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
let connection_id = runtime
@@ -3500,6 +3895,16 @@ mod tests {
runtime.store.connection(&created).unwrap().status,
RadrootsSimplexAgentConnectionStatus::AwaitingApproval
);
+ runtime
+ .handle_inbound_decrypted_message(
+ &created,
+ RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply {
+ reply_queues: vec![reply_descriptor()],
+ info: b"peer-info".to_vec(),
+ },
+ b"reply-confirmation-duplicate".to_vec(),
+ )
+ .unwrap();
initialize_test_outbound_official_ratchet(&mut runtime, &created);
runtime
@@ -3508,10 +3913,12 @@ mod tests {
let mut allow_transport = ScriptedTransport::with_responses(vec![
RadrootsSimplexSmpBrokerMessage::Ok,
RadrootsSimplexSmpBrokerMessage::Ok,
+ RadrootsSimplexSmpBrokerMessage::Ok,
]);
runtime
.execute_ready_commands(&mut allow_transport, 50, 16)
.unwrap();
+ assert_eq!(allow_transport.requests.len(), 3);
assert!(matches!(
allow_transport.requests[0].command,
RadrootsSimplexSmpCommand::SKey(_)
@@ -3520,7 +3927,17 @@ mod tests {
allow_transport.requests[1].command,
RadrootsSimplexSmpCommand::Send(_)
));
- assert!(!runtime.store.connection(&created).unwrap().hello_sent);
+ assert!(matches!(
+ allow_transport.requests[2].command,
+ RadrootsSimplexSmpCommand::Send(_)
+ ));
+ let connection = runtime.store.connection(&created).unwrap();
+ assert_eq!(
+ connection.status,
+ RadrootsSimplexAgentConnectionStatus::Connected
+ );
+ assert!(connection.hello_sent);
+ assert!(!connection.hello_received);
runtime
.handle_inbound_decrypted_message(&created, hello_message(1), b"hello-in".to_vec())
@@ -3537,17 +3954,11 @@ mod tests {
RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished { connection_id }
if connection_id == created
)));
-
- let mut hello_transport =
- ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]);
+ let mut hello_transport = ScriptedTransport::with_responses(vec![]);
runtime
.execute_ready_commands(&mut hello_transport, 60, 16)
.unwrap();
- assert_eq!(hello_transport.requests.len(), 1);
- assert!(matches!(
- hello_transport.requests[0].command,
- RadrootsSimplexSmpCommand::Send(_)
- ));
+ assert!(hello_transport.requests.is_empty());
}
#[test]
@@ -3891,6 +4302,77 @@ mod tests {
}
#[test]
+ fn ack_no_msg_response_is_idempotently_delivered() {
+ let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
+ let created = runtime
+ .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
+ .unwrap();
+
+ let mut setup_transport = ScriptedTransport::with_responses(vec![
+ ids_response(b"recipient", b"sender", b"server-dh"),
+ RadrootsSimplexSmpBrokerMessage::Ok,
+ ]);
+ runtime
+ .execute_ready_commands(&mut setup_transport, 30, 16)
+ .unwrap();
+ mark_connected(&mut runtime, &created);
+ let _ = runtime.drain_events(16);
+
+ let receive_queue = runtime.store.receive_queues(&created).unwrap()[0]
+ .descriptor
+ .queue_address();
+ let frame = user_message_frame(7, Vec::new(), b"ack target");
+ let message_hash = agent_message_hash(&frame);
+ runtime
+ .store
+ .record_inbound_message(
+ &created,
+ receive_queue,
+ b"already-acked-broker-message".to_vec(),
+ frame.header.message_id,
+ message_hash.clone(),
+ )
+ .unwrap();
+ runtime
+ .ack_message(
+ &created,
+ frame.header.message_id,
+ message_hash.clone(),
+ Vec::new(),
+ 40,
+ )
+ .unwrap();
+
+ let mut ack_transport =
+ ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Err(
+ RadrootsSimplexSmpError::NoMsg,
+ )]);
+ runtime
+ .execute_ready_commands(&mut ack_transport, 50, 16)
+ .unwrap();
+
+ assert_eq!(ack_transport.requests.len(), 1);
+ assert!(matches!(
+ ack_transport.requests[0].command,
+ RadrootsSimplexSmpCommand::Ack(_)
+ ));
+ assert!(runtime.retry_pending(50, 16).is_empty());
+ let events = runtime.drain_events(16);
+ assert!(events.iter().any(|event| matches!(
+ event,
+ RadrootsSimplexAgentRuntimeEvent::InboundMessageAckDelivered {
+ connection_id,
+ message_id: 7,
+ message_hash: delivered_hash,
+ } if connection_id == &created && delivered_hash == &message_hash
+ )));
+ assert!(!events.iter().any(|event| matches!(
+ event,
+ RadrootsSimplexAgentRuntimeEvent::Error { message, .. } if message.contains("NoMsg")
+ )));
+ }
+
+ #[test]
fn manual_record_command_failure_clears_staged_delivery_state() {
let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
let created = runtime
diff --git a/crates/simplex_agent_store/src/store.rs b/crates/simplex_agent_store/src/store.rs
@@ -260,7 +260,7 @@ pub enum RadrootsSimplexAgentPendingCommandKind {
AckInboxMessage {
queue: RadrootsSimplexAgentQueueAddress,
broker_message_id: Vec<u8>,
- receipt: RadrootsSimplexAgentMessageReceipt,
+ receipt: Option<RadrootsSimplexAgentMessageReceipt>,
},
RotateQueues {
descriptors: Vec<RadrootsSimplexAgentQueueDescriptor>,
@@ -276,6 +276,7 @@ pub enum RadrootsSimplexAgentPendingCommandKind {
SecureGetQueueLinkData {
invitation: RadrootsSimplexAgentShortInvitationLink,
reply_queue: RadrootsSimplexSmpQueueUri,
+ sender_auth_state: RadrootsSimplexAgentQueueAuthState,
},
GetQueueLinkData {
invitation: RadrootsSimplexAgentShortInvitationLink,
@@ -503,7 +504,7 @@ enum RadrootsSimplexAgentPendingCommandKindSnapshot {
AckInboxMessage {
queue: RadrootsSimplexAgentQueueAddressSnapshot,
broker_message_id: Vec<u8>,
- receipt: RadrootsSimplexAgentMessageReceiptSnapshot,
+ receipt: Option<RadrootsSimplexAgentMessageReceiptSnapshot>,
},
RotateQueues {
descriptors: Vec<RadrootsSimplexAgentQueueDescriptorSnapshot>,
@@ -519,6 +520,8 @@ enum RadrootsSimplexAgentPendingCommandKindSnapshot {
SecureGetQueueLinkData {
invitation: RadrootsSimplexAgentShortInvitationLinkSnapshot,
reply_queue: String,
+ sender_auth_public_key: Vec<u8>,
+ sender_auth_private_key: Vec<u8>,
},
GetQueueLinkData {
invitation: RadrootsSimplexAgentShortInvitationLinkSnapshot,
@@ -592,6 +595,7 @@ struct RadrootsSimplexAgentPendingCommandSecretsSnapshot {
id: u64,
connection_id: String,
short_invitation_link_key: Option<Vec<u8>>,
+ short_invitation_sender_auth_private_key: Option<Vec<u8>>,
}
#[cfg(feature = "std")]
@@ -1199,12 +1203,34 @@ impl RadrootsSimplexAgentStore {
command.connection_id == connection_id
&& matches!(
&command.kind,
- RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { receipt, .. }
+ RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
+ receipt: Some(receipt),
+ ..
+ }
if receipt.message_id == message_id && receipt.message_hash == message_hash
)
})
}
+ pub fn has_pending_broker_ack(
+ &self,
+ connection_id: &str,
+ queue_address: &RadrootsSimplexAgentQueueAddress,
+ broker_message_id: &[u8],
+ ) -> bool {
+ self.pending_commands.values().any(|command| {
+ command.connection_id == connection_id
+ && matches!(
+ &command.kind,
+ RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
+ queue,
+ broker_message_id: pending_broker_message_id,
+ ..
+ } if queue == queue_address && pending_broker_message_id == broker_message_id
+ )
+ })
+ }
+
pub fn has_pending_subscribe_queue(
&self,
connection_id: &str,
@@ -1769,6 +1795,8 @@ fn redact_pending_command_secrets(
short_invitation_link_key: redact_pending_command_short_invitation_link_key(
&mut command.kind,
),
+ short_invitation_sender_auth_private_key:
+ redact_pending_command_short_invitation_sender_auth_private_key(&mut command.kind),
}
}
@@ -1789,6 +1817,19 @@ fn redact_pending_command_short_invitation_link_key(
}
#[cfg(feature = "std")]
+fn redact_pending_command_short_invitation_sender_auth_private_key(
+ kind: &mut RadrootsSimplexAgentPendingCommandKindSnapshot,
+) -> Option<Vec<u8>> {
+ match kind {
+ RadrootsSimplexAgentPendingCommandKindSnapshot::SecureGetQueueLinkData {
+ sender_auth_private_key,
+ ..
+ } => take_non_empty_vec(sender_auth_private_key),
+ _ => None,
+ }
+}
+
+#[cfg(feature = "std")]
fn redact_x3dh_keypair_private(
keypair: &mut Option<RadrootsSimplexAgentX3dhKeypair>,
) -> Option<Vec<u8>> {
@@ -2382,9 +2423,23 @@ fn validate_public_pending_command_secret_posture(
match &command.kind {
RadrootsSimplexAgentPendingCommandKindSnapshot::SecureGetQueueLinkData {
invitation,
+ sender_auth_private_key,
..
+ } => {
+ reject_public_secret_vec(
+ invitation.link_key.as_slice(),
+ protected_secrets_configured,
+ "pending short-link invitation link key",
+ &command.connection_id,
+ )?;
+ reject_public_secret_vec(
+ sender_auth_private_key.as_slice(),
+ protected_secrets_configured,
+ "pending short-link sender auth private key",
+ &command.connection_id,
+ )
}
- | RadrootsSimplexAgentPendingCommandKindSnapshot::GetQueueLinkData { invitation, .. } => {
+ RadrootsSimplexAgentPendingCommandKindSnapshot::GetQueueLinkData { invitation, .. } => {
reject_public_secret_vec(
invitation.link_key.as_slice(),
protected_secrets_configured,
@@ -2667,17 +2722,29 @@ fn merge_pending_command_secrets(
))
})?;
- let Some(link_key) = secrets.short_invitation_link_key else {
- return Ok(());
- };
-
match &mut command.kind {
RadrootsSimplexAgentPendingCommandKindSnapshot::SecureGetQueueLinkData {
invitation,
+ sender_auth_private_key,
..
+ } => {
+ if let Some(link_key) = secrets.short_invitation_link_key {
+ invitation.link_key = link_key;
+ }
+ if let Some(private_key) = secrets.short_invitation_sender_auth_private_key {
+ *sender_auth_private_key = private_key;
+ }
+ Ok(())
}
- | RadrootsSimplexAgentPendingCommandKindSnapshot::GetQueueLinkData { invitation, .. } => {
- invitation.link_key = link_key;
+ RadrootsSimplexAgentPendingCommandKindSnapshot::GetQueueLinkData { invitation, .. } => {
+ if let Some(link_key) = secrets.short_invitation_link_key {
+ invitation.link_key = link_key;
+ }
+ Ok(())
+ }
+ _ if secrets.short_invitation_link_key.is_none()
+ && secrets.short_invitation_sender_auth_private_key.is_none() =>
+ {
Ok(())
}
_ => Err(RadrootsSimplexAgentStoreError::Persistence(format!(
@@ -3206,11 +3273,11 @@ fn command_kind_to_snapshot(
} => RadrootsSimplexAgentPendingCommandKindSnapshot::AckInboxMessage {
queue: queue_address_to_snapshot(queue),
broker_message_id,
- receipt: RadrootsSimplexAgentMessageReceiptSnapshot {
+ receipt: receipt.map(|receipt| RadrootsSimplexAgentMessageReceiptSnapshot {
message_id: receipt.message_id,
message_hash: receipt.message_hash,
receipt_info: receipt.receipt_info,
- },
+ }),
},
RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors } => {
RadrootsSimplexAgentPendingCommandKindSnapshot::RotateQueues {
@@ -3237,9 +3304,12 @@ fn command_kind_to_snapshot(
RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
invitation,
reply_queue,
+ sender_auth_state,
} => RadrootsSimplexAgentPendingCommandKindSnapshot::SecureGetQueueLinkData {
invitation: short_invitation_to_snapshot(invitation),
reply_queue: reply_queue.to_string(),
+ sender_auth_public_key: sender_auth_state.public_key,
+ sender_auth_private_key: sender_auth_state.private_key,
},
RadrootsSimplexAgentPendingCommandKind::GetQueueLinkData {
invitation,
@@ -3297,11 +3367,11 @@ fn command_kind_from_snapshot(
} => RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
queue: queue_address_from_snapshot(queue)?,
broker_message_id,
- receipt: RadrootsSimplexAgentMessageReceipt {
+ receipt: receipt.map(|receipt| RadrootsSimplexAgentMessageReceipt {
message_id: receipt.message_id,
message_hash: receipt.message_hash,
receipt_info: receipt.receipt_info,
- },
+ }),
},
RadrootsSimplexAgentPendingCommandKindSnapshot::RotateQueues { descriptors } => {
RadrootsSimplexAgentPendingCommandKind::RotateQueues {
@@ -3331,9 +3401,15 @@ fn command_kind_from_snapshot(
RadrootsSimplexAgentPendingCommandKindSnapshot::SecureGetQueueLinkData {
invitation,
reply_queue,
+ sender_auth_public_key,
+ sender_auth_private_key,
} => RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
invitation: short_invitation_from_snapshot(invitation)?,
reply_queue: queue_uri_from_string(&reply_queue)?,
+ sender_auth_state: RadrootsSimplexAgentQueueAuthState {
+ public_key: sender_auth_public_key,
+ private_key: sender_auth_private_key,
+ },
},
RadrootsSimplexAgentPendingCommandKindSnapshot::GetQueueLinkData {
invitation,
@@ -3814,6 +3890,7 @@ mod tests {
RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
invitation: secure_short_invitation.clone(),
reply_queue: short_reply_queue.clone(),
+ sender_auth_state: sample_auth_state(),
},
13,
)
@@ -4115,7 +4192,8 @@ mod tests {
&command.kind,
RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
invitation,
- reply_queue
+ reply_queue,
+ ..
} if invitation == &secure_short_invitation
&& reply_queue == &short_reply_queue
)));
@@ -4435,6 +4513,7 @@ mod tests {
RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
invitation: sample_short_invitation_link(vec![5_u8; 24]),
reply_queue: sample_descriptor(true).queue_uri,
+ sender_auth_state: sample_auth_state(),
},
10,
)
diff --git a/crates/simplex_interop_tests/src/fixtures.rs b/crates/simplex_interop_tests/src/fixtures.rs
@@ -12,14 +12,14 @@ pub const fn synthetic_connection_id() -> &'static str {
pub fn synthetic_invitation_queue() -> RadrootsSimplexSmpQueueUri {
RadrootsSimplexSmpQueueUri::parse(
- "smp://cnItc3ludGg@relay.synthetic.invalid/aW52aXRl#/?v=4&dh=cnItc3ludGgtZGg&q=m",
+ "smp://cnItc3ludGgtc2VydmVyLWtleS1oYXNoLTAwMDAwMDE@relay.synthetic.invalid/aW52aXRl#/?v=4&dh=cnItc3ludGgtZGg&q=m",
)
.unwrap()
}
pub fn synthetic_reply_queue() -> RadrootsSimplexSmpQueueUri {
RadrootsSimplexSmpQueueUri::parse(
- "smp://cnItc3ludGg@reply.synthetic.invalid/cmVwbHk#/?v=4&dh=cnItc3ludGgtcmVwbHk&q=m",
+ "smp://cnItc3ludGgtcmVwbHkta2V5LWhhc2gtMDAwMDAwMDE@reply.synthetic.invalid/cmVwbHk#/?v=4&dh=cnItc3ludGgtcmVwbHk&q=m",
)
.unwrap()
}
diff --git a/crates/simplex_smp_crypto/src/ratchet.rs b/crates/simplex_smp_crypto/src/ratchet.rs
@@ -461,6 +461,39 @@ impl RadrootsSimplexSmpRatchetState {
Ok(plaintext)
}
+ pub fn is_official_payload_replay(
+ &self,
+ encrypted_message: &[u8],
+ ) -> Result<bool, RadrootsSimplexSmpCryptoError> {
+ let message = decode_official_encrypted_message(encrypted_message)?;
+ let header = decode_official_encrypted_header(&message.encrypted_header)?;
+ let ratchet_ad = self.official_associated_data.clone().ok_or(
+ RadrootsSimplexSmpCryptoError::MissingRatchetKey("official_associated_data"),
+ )?;
+ for skipped in &self.official_skipped_message_keys {
+ if let Ok(ratchet_header) =
+ decrypt_official_header_with_key(&header, &skipped.header_key, &ratchet_ad)
+ && ratchet_header.message_number == skipped.message_number
+ {
+ return Ok(false);
+ }
+ }
+ if let Some(receiving_header_key) = self.official_receiving_header_key.as_ref()
+ && let Ok(ratchet_header) =
+ decrypt_official_header_with_key(&header, receiving_header_key, &ratchet_ad)
+ {
+ return Ok(ratchet_header.message_number < self.receiving_chain_length);
+ }
+ if let Some(next_receiving_header_key) = self.official_next_receiving_header_key.as_ref()
+ && let Ok(ratchet_header) =
+ decrypt_official_header_with_key(&header, next_receiving_header_key, &ratchet_ad)
+ {
+ return Ok(ratchet_header.message_number < self.receiving_chain_length
+ && ratchet_header.previous_sending_chain_length < self.receiving_chain_length);
+ }
+ Ok(false)
+ }
+
fn decrypt_official_skipped_payload(
&mut self,
header: &RadrootsSimplexOfficialEncryptedHeader,
@@ -1149,6 +1182,33 @@ mod tests {
}
#[test]
+ fn detects_official_payload_replay_without_consuming_skipped_messages() {
+ let (mut sender, mut receiver) = official_sender_receiver_ratchets();
+ let shared_secret = [14_u8; RADROOTS_SIMPLEX_SMP_SHARED_SECRET_LENGTH];
+ let first = sender
+ .encrypt_official_payload(&shared_secret, b"first", 96)
+ .unwrap();
+ let second = sender
+ .encrypt_official_payload(&shared_secret, b"second", 96)
+ .unwrap();
+
+ assert_eq!(
+ receiver
+ .decrypt_official_payload(&shared_secret, &second)
+ .unwrap(),
+ b"second"
+ );
+ assert!(!receiver.is_official_payload_replay(&first).unwrap());
+ assert_eq!(
+ receiver
+ .decrypt_official_payload(&shared_secret, &first)
+ .unwrap(),
+ b"first"
+ );
+ assert!(receiver.is_official_payload_replay(&first).unwrap());
+ }
+
+ #[test]
fn advances_official_pq_ratchet_in_both_directions() {
let (mut sender, mut receiver) = official_pq_sender_receiver_ratchets();
let shared_secret = [21_u8; RADROOTS_SIMPLEX_SMP_SHARED_SECRET_LENGTH];
diff --git a/crates/simplex_smp_transport/src/client.rs b/crates/simplex_smp_transport/src/client.rs
@@ -22,8 +22,9 @@ use radroots_simplex_smp_crypto::prelude::{
};
use radroots_simplex_smp_proto::prelude::{
RADROOTS_SIMPLEX_SMP_AUTH_COMMANDS_TRANSPORT_VERSION,
- RADROOTS_SIMPLEX_SMP_ENCRYPTED_BLOCK_TRANSPORT_VERSION, RadrootsSimplexSmpCommandTransmission,
- RadrootsSimplexSmpCorrelationId, RadrootsSimplexSmpServerAddress,
+ RADROOTS_SIMPLEX_SMP_ENCRYPTED_BLOCK_TRANSPORT_VERSION, RadrootsSimplexSmpBrokerMessage,
+ RadrootsSimplexSmpCommandTransmission, RadrootsSimplexSmpCorrelationId,
+ RadrootsSimplexSmpServerAddress,
};
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
@@ -32,7 +33,7 @@ use rustls::{
StreamOwned,
};
use sha2::{Digest, Sha256};
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, VecDeque};
use std::io::{ErrorKind, Read, Write};
use std::net::{IpAddr, TcpStream, ToSocketAddrs};
use std::sync::Arc;
@@ -44,6 +45,9 @@ pub struct RadrootsSimplexSmpTlsCommandTransport {
sessions: BTreeMap<String, RadrootsSimplexSmpLiveSession>,
}
+const LIVE_SESSION_TIMEOUT: Duration = Duration::from_secs(5);
+const LIVE_EMPTY_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_millis(150);
+
struct RadrootsSimplexSmpLiveSession {
stream: StreamOwned<ClientConnection, TcpStream>,
transport_version: u16,
@@ -51,6 +55,7 @@ struct RadrootsSimplexSmpLiveSession {
send_chain_key: Option<RadrootsSimplexSmpSecretBoxChainKey>,
receive_chain_key: Option<RadrootsSimplexSmpSecretBoxChainKey>,
debug_shared_secret: Option<Vec<u8>>,
+ pending_broker_responses: VecDeque<RadrootsSimplexSmpTransportResponse>,
}
impl RadrootsSimplexSmpTlsCommandTransport {
@@ -97,13 +102,20 @@ impl RadrootsSimplexSmpCommandTransport for RadrootsSimplexSmpTlsCommandTranspor
) -> Result<RadrootsSimplexSmpTransportResponse, Self::Error> {
let session_kind = session_kind_for_command(&request.command);
let key = Self::session_key(&request.server, session_kind);
- match execute_live_request(self.session_for(&request.server, session_kind)?, &request) {
+ let accepts_uncorrelated_subscription_response =
+ accepts_uncorrelated_subscription_response(&request.command);
+ match execute_live_request(
+ self.session_for(&request.server, session_kind)?,
+ &request,
+ accepts_uncorrelated_subscription_response,
+ ) {
Ok(response) => Ok(response),
Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error)) => {
self.sessions.remove(&key);
let response = execute_live_request(
self.session_for(&request.server, session_kind)?,
&request,
+ accepts_uncorrelated_subscription_response,
);
match response {
Ok(response) => Ok(response),
@@ -129,6 +141,7 @@ impl RadrootsSimplexSmpSubscriptionTransport for RadrootsSimplexSmpTlsCommandTra
&request.server,
None,
true,
+ None,
) {
Ok(response) => Ok(response),
Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error)) => {
@@ -155,9 +168,23 @@ fn session_kind_for_command(
}
}
+fn accepts_uncorrelated_subscription_response(
+ command: &radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand,
+) -> bool {
+ matches!(
+ command,
+ radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Sub
+ | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Subs
+ | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSub
+ | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSubs
+ | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Ack(_)
+ )
+}
+
fn execute_live_request(
session: &mut RadrootsSimplexSmpLiveSession,
request: &RadrootsSimplexSmpTransportRequest,
+ accept_uncorrelated_subscription_response: bool,
) -> Result<RadrootsSimplexSmpTransportResponse, RadrootsSimplexSmpTransportError> {
let correlation_id = request
.correlation_id
@@ -193,7 +220,16 @@ fn execute_live_request(
.flush()
.map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
- read_live_response(session, &request.server, Some(correlation_id), false)?.ok_or_else(|| {
+ let accepted_entity_id =
+ accept_uncorrelated_subscription_response.then_some(request.entity_id.as_slice());
+ read_live_response(
+ session,
+ &request.server,
+ Some(correlation_id),
+ false,
+ accepted_entity_id,
+ )?
+ .ok_or_else(|| {
RadrootsSimplexSmpTransportError::LiveTransportIo(
"SMP command response was not available before the read timeout".into(),
)
@@ -205,9 +241,30 @@ fn read_live_response(
server: &RadrootsSimplexSmpServerAddress,
expected_correlation_id: Option<RadrootsSimplexSmpCorrelationId>,
timeout_is_empty: bool,
+ accepted_subscription_entity_id: Option<&[u8]>,
) -> Result<Option<RadrootsSimplexSmpTransportResponse>, RadrootsSimplexSmpTransportError> {
+ if expected_correlation_id.is_none()
+ && let Some(response) = session.pending_broker_responses.pop_front()
+ {
+ return Ok(Some(response));
+ }
+ if let Some(entity_id) = accepted_subscription_entity_id
+ && let Some(position) = session
+ .pending_broker_responses
+ .iter()
+ .position(|response| is_subscription_response_for_entity(response, entity_id))
+ {
+ return Ok(session.pending_broker_responses.remove(position));
+ }
let mut response_block = vec![0_u8; RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE];
- if let Err(error) = session.stream.read_exact(&mut response_block) {
+ if timeout_is_empty {
+ set_live_read_timeout(session, LIVE_EMPTY_SUBSCRIPTION_TIMEOUT)?;
+ }
+ let read_result = session.stream.read_exact(&mut response_block);
+ if timeout_is_empty {
+ set_live_read_timeout(session, LIVE_SESSION_TIMEOUT)?;
+ }
+ if let Err(error) = read_result {
if timeout_is_empty && matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) {
return Ok(None);
}
@@ -218,25 +275,78 @@ fn read_live_response(
let response_hash = Sha256::digest(&response_block).to_vec();
let decoded = decode_live_transport_block(session, &response_block)?;
let transmissions = decoded.decode_broker_transmissions(session.transport_version)?;
- if transmissions.len() != 1 {
- return Err(
- RadrootsSimplexSmpTransportError::UnexpectedBrokerTransmissionCount(
- transmissions.len(),
- ),
- );
- }
- let transmission = transmissions.into_iter().next().expect("checked len");
- if let Some(expected_correlation_id) = expected_correlation_id
- && transmission.correlation_id != Some(expected_correlation_id)
- {
+ let responses = transmissions
+ .into_iter()
+ .map(|transmission| RadrootsSimplexSmpTransportResponse {
+ server: server.clone(),
+ transport_version: session.transport_version,
+ transmission,
+ transport_hash: response_hash.clone(),
+ })
+ .collect::<Vec<_>>();
+ select_live_response(
+ &mut session.pending_broker_responses,
+ responses,
+ expected_correlation_id,
+ accepted_subscription_entity_id,
+ )
+}
+
+fn select_live_response(
+ pending_broker_responses: &mut VecDeque<RadrootsSimplexSmpTransportResponse>,
+ mut responses: Vec<RadrootsSimplexSmpTransportResponse>,
+ expected_correlation_id: Option<RadrootsSimplexSmpCorrelationId>,
+ accepted_subscription_entity_id: Option<&[u8]>,
+) -> Result<Option<RadrootsSimplexSmpTransportResponse>, RadrootsSimplexSmpTransportError> {
+ if let Some(expected_correlation_id) = expected_correlation_id {
+ if let Some(position) = responses.iter().position(|response| {
+ response.transmission.correlation_id == Some(expected_correlation_id)
+ }) {
+ let matched_response = responses.remove(position);
+ pending_broker_responses.extend(responses);
+ return Ok(Some(matched_response));
+ }
+ if let Some(entity_id) = accepted_subscription_entity_id
+ && let Some(position) = responses
+ .iter()
+ .position(|response| is_subscription_response_for_entity(response, entity_id))
+ {
+ let matched_response = responses.remove(position);
+ pending_broker_responses.extend(responses);
+ return Ok(Some(matched_response));
+ }
+ pending_broker_responses.extend(responses);
return Err(RadrootsSimplexSmpTransportError::CorrelationIdMismatch);
}
- Ok(Some(RadrootsSimplexSmpTransportResponse {
- server: server.clone(),
- transport_version: session.transport_version,
- transmission,
- transport_hash: response_hash,
- }))
+ pending_broker_responses.extend(responses);
+ Ok(pending_broker_responses.pop_front())
+}
+
+fn is_subscription_response_for_entity(
+ response: &RadrootsSimplexSmpTransportResponse,
+ entity_id: &[u8],
+) -> bool {
+ response.transmission.entity_id == entity_id
+ && matches!(
+ response.transmission.message,
+ RadrootsSimplexSmpBrokerMessage::Msg(_)
+ | RadrootsSimplexSmpBrokerMessage::NMsg { .. }
+ | RadrootsSimplexSmpBrokerMessage::Sok(_)
+ | RadrootsSimplexSmpBrokerMessage::Soks(_)
+ | RadrootsSimplexSmpBrokerMessage::Ok
+ | RadrootsSimplexSmpBrokerMessage::Err(_)
+ )
+}
+
+fn set_live_read_timeout(
+ session: &mut RadrootsSimplexSmpLiveSession,
+ timeout: Duration,
+) -> Result<(), RadrootsSimplexSmpTransportError> {
+ session
+ .stream
+ .sock
+ .set_read_timeout(Some(timeout))
+ .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))
}
fn transport_debug_enabled() -> bool {
@@ -401,17 +511,16 @@ fn connect_live_session_host(
"failed to resolve SMP server host `{host}:{port}`"
))
})?;
- let tcp =
- TcpStream::connect_timeout(&socket_addr, Duration::from_secs(5)).map_err(|error| {
- RadrootsSimplexSmpTransportError::LiveTransportIo(format!(
- "failed to connect to SMP server `{host}:{port}`: {error}"
- ))
- })?;
+ let tcp = TcpStream::connect_timeout(&socket_addr, LIVE_SESSION_TIMEOUT).map_err(|error| {
+ RadrootsSimplexSmpTransportError::LiveTransportIo(format!(
+ "failed to connect to SMP server `{host}:{port}`: {error}"
+ ))
+ })?;
tcp.set_nodelay(true)
.map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
- tcp.set_read_timeout(Some(Duration::from_secs(5)))
+ tcp.set_read_timeout(Some(LIVE_SESSION_TIMEOUT))
.map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
- tcp.set_write_timeout(Some(Duration::from_secs(5)))
+ tcp.set_write_timeout(Some(LIVE_SESSION_TIMEOUT))
.map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
let server_name = match host.parse::<IpAddr>() {
@@ -530,6 +639,7 @@ fn connect_live_session_host(
send_chain_key,
receive_chain_key,
debug_shared_secret,
+ pending_broker_responses: VecDeque::new(),
})
}
@@ -843,9 +953,10 @@ mod tests {
use super::{
canonical_server_identity, decode_encrypted_transport_block,
decode_server_transport_public_key, encode_encrypted_transport_payload,
+ select_live_response,
};
use crate::handshake::RadrootsSimplexSmpTransportServerProof;
- use crate::prelude::RadrootsSimplexSmpTransportBlock;
+ use crate::prelude::{RadrootsSimplexSmpTransportBlock, RadrootsSimplexSmpTransportResponse};
use radroots_simplex_smp_crypto::prelude::{
RadrootsSimplexSmpX25519Keypair, encode_x25519_public_key_x509, init_secretbox_chain,
};
@@ -853,7 +964,9 @@ mod tests {
RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, RadrootsSimplexSmpBrokerMessage,
RadrootsSimplexSmpBrokerTransmission, RadrootsSimplexSmpCommand,
RadrootsSimplexSmpCommandTransmission, RadrootsSimplexSmpCorrelationId,
+ RadrootsSimplexSmpReceivedMessage, RadrootsSimplexSmpServerAddress,
};
+ use std::collections::VecDeque;
#[test]
fn canonicalizes_padded_and_unpadded_server_identity() {
@@ -966,6 +1079,93 @@ mod tests {
super::session_kind_for_command(&RadrootsSimplexSmpCommand::Ack(b"message".to_vec())),
"subscription"
);
+ assert!(super::accepts_uncorrelated_subscription_response(
+ &RadrootsSimplexSmpCommand::Ack(b"message".to_vec())
+ ));
+ assert!(super::accepts_uncorrelated_subscription_response(
+ &RadrootsSimplexSmpCommand::Sub
+ ));
+ }
+
+ #[test]
+ fn strict_command_selection_buffers_unmatched_response_and_errors() {
+ let mut pending = VecDeque::new();
+ let expected = RadrootsSimplexSmpCorrelationId::new([1_u8; 24]);
+ let unmatched = response(
+ Some(RadrootsSimplexSmpCorrelationId::new([2_u8; 24])),
+ b"rr-synth-entity",
+ RadrootsSimplexSmpBrokerMessage::Ok,
+ );
+
+ assert_eq!(
+ select_live_response(&mut pending, vec![unmatched.clone()], Some(expected), None)
+ .unwrap_err(),
+ crate::prelude::RadrootsSimplexSmpTransportError::CorrelationIdMismatch
+ );
+ assert_eq!(pending.into_iter().collect::<Vec<_>>(), vec![unmatched]);
+ }
+
+ #[test]
+ fn matched_response_wins_and_buffers_subscription_message() {
+ let mut pending = VecDeque::new();
+ let expected = RadrootsSimplexSmpCorrelationId::new([1_u8; 24]);
+ let message = response(
+ None,
+ b"rr-synth-entity",
+ RadrootsSimplexSmpBrokerMessage::Msg(RadrootsSimplexSmpReceivedMessage {
+ message_id: b"message-1".to_vec(),
+ encrypted_body: b"body".to_vec(),
+ }),
+ );
+ let matched = response(
+ Some(expected),
+ b"rr-synth-entity",
+ RadrootsSimplexSmpBrokerMessage::Sok(None),
+ );
+
+ let selected = select_live_response(
+ &mut pending,
+ vec![message.clone(), matched.clone()],
+ Some(expected),
+ Some(b"rr-synth-entity"),
+ )
+ .unwrap();
+
+ assert_eq!(selected, Some(matched));
+ assert_eq!(pending.into_iter().collect::<Vec<_>>(), vec![message]);
+ }
+
+ #[test]
+ fn subscription_selection_accepts_uncorrelated_message_for_entity() {
+ let mut pending = VecDeque::new();
+ let expected = RadrootsSimplexSmpCorrelationId::new([1_u8; 24]);
+ let message = response(
+ None,
+ b"rr-synth-entity",
+ RadrootsSimplexSmpBrokerMessage::Msg(RadrootsSimplexSmpReceivedMessage {
+ message_id: b"message-1".to_vec(),
+ encrypted_body: b"body".to_vec(),
+ }),
+ );
+ let other = response(
+ None,
+ b"rr-other-entity",
+ RadrootsSimplexSmpBrokerMessage::Msg(RadrootsSimplexSmpReceivedMessage {
+ message_id: b"message-2".to_vec(),
+ encrypted_body: b"other".to_vec(),
+ }),
+ );
+
+ let selected = select_live_response(
+ &mut pending,
+ vec![other.clone(), message.clone()],
+ Some(expected),
+ Some(b"rr-synth-entity"),
+ )
+ .unwrap();
+
+ assert_eq!(selected, Some(message));
+ assert_eq!(pending.into_iter().collect::<Vec<_>>(), vec![other]);
}
fn der_sequence<'a, I>(elements: I) -> Vec<u8>
@@ -1001,4 +1201,26 @@ mod tests {
buffer.push(0x80 | (bytes.len() as u8));
buffer.extend_from_slice(&bytes);
}
+
+ fn response(
+ correlation_id: Option<RadrootsSimplexSmpCorrelationId>,
+ entity_id: &[u8],
+ message: RadrootsSimplexSmpBrokerMessage,
+ ) -> RadrootsSimplexSmpTransportResponse {
+ RadrootsSimplexSmpTransportResponse {
+ server: RadrootsSimplexSmpServerAddress {
+ server_identity: "cnItc3ludGgtc2VydmVy".to_owned(),
+ hosts: vec!["127.0.0.1".to_owned()],
+ port: Some(5223),
+ },
+ transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
+ transmission: RadrootsSimplexSmpBrokerTransmission {
+ authorization: Vec::new(),
+ correlation_id,
+ entity_id: entity_id.to_vec(),
+ message,
+ },
+ transport_hash: vec![9_u8; 32],
+ }
+ }
}