commit d230245c5721d9bff853893b2aa421479ca417d1
parent 26aef8dd1bfca8fcbc2eca51e0f29e4f6bbf9db6
Author: triesap <tyson@radroots.org>
Date: Mon, 22 Jun 2026 22:27:41 +0000
simplex: split receive subscriptions from polling
- route receive subscriptions through SMP SUB sessions
- add explicit one-message GET runtime commands
- keep subscription and poll transport sessions separate
- cover async subscription routing and opt-in live receive tests
Diffstat:
7 files changed, 545 insertions(+), 31 deletions(-)
diff --git a/crates/simplex_agent_runtime/src/runtime.rs b/crates/simplex_agent_runtime/src/runtime.rs
@@ -35,7 +35,8 @@ use radroots_simplex_smp_proto::prelude::{
RadrootsSimplexSmpSubscriptionMode,
};
use radroots_simplex_smp_transport::prelude::{
- RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpTransportRequest,
+ RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpSubscriptionReceiveRequest,
+ RadrootsSimplexSmpSubscriptionTransport, RadrootsSimplexSmpTransportRequest,
RadrootsSimplexSmpTransportResponse,
};
use sha2::{Digest, Sha256};
@@ -386,6 +387,24 @@ impl RadrootsSimplexAgentRuntime {
Ok(())
}
+ pub fn get_connection_message(
+ &mut self,
+ connection_id: &str,
+ now: u64,
+ ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
+ for queue in self.store.receive_queues(connection_id)? {
+ self.store.enqueue_command(
+ connection_id,
+ RadrootsSimplexAgentPendingCommandKind::GetQueueMessage {
+ queue: queue.descriptor.queue_address(),
+ },
+ now,
+ )?;
+ }
+ self.flush_store()?;
+ Ok(())
+ }
+
pub fn send_message(
&mut self,
connection_id: &str,
@@ -706,6 +725,43 @@ impl RadrootsSimplexAgentRuntime {
Ok(())
}
+ pub fn receive_subscription_messages<T: RadrootsSimplexSmpSubscriptionTransport>(
+ &mut self,
+ transport: &mut T,
+ limit: usize,
+ ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
+ let mut remaining = limit;
+ for server in self.store.subscribed_receive_servers() {
+ while remaining > 0 {
+ match transport.receive_subscription(RadrootsSimplexSmpSubscriptionReceiveRequest {
+ server: server.clone(),
+ }) {
+ Ok(Some(response)) => {
+ self.apply_subscription_response(response)?;
+ remaining = remaining.saturating_sub(1);
+ }
+ Ok(None) => break,
+ Err(error) => {
+ self.events
+ .push_back(RadrootsSimplexAgentRuntimeEvent::Error {
+ connection_id: None,
+ message: format!(
+ "SimpleX subscription receive failed for server `{}`: {error}",
+ server.server_identity
+ ),
+ });
+ break;
+ }
+ }
+ }
+ if remaining == 0 {
+ break;
+ }
+ }
+ self.flush_store()?;
+ Ok(())
+ }
+
pub fn retry_pending(
&mut self,
now: u64,
@@ -869,7 +925,7 @@ impl RadrootsSimplexAgentRuntime {
RadrootsSimplexAgentRuntimeError::Runtime(error.to_string())
})?,
basic_auth: None,
- subscription_mode: RadrootsSimplexSmpSubscriptionMode::Subscribe,
+ subscription_mode: RadrootsSimplexSmpSubscriptionMode::OnlyCreate,
queue_request_data: Some(
match descriptor
.queue_uri
@@ -910,7 +966,16 @@ impl RadrootsSimplexAgentRuntime {
)),
RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue } => Ok((
queue.clone(),
- queue.sender_id.clone(),
+ self.store
+ .queue_record(&command.connection_id, queue)?
+ .entity_id,
+ RadrootsSimplexSmpCommand::Sub,
+ )),
+ RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { queue } => Ok((
+ queue.clone(),
+ self.store
+ .queue_record(&command.connection_id, queue)?
+ .entity_id,
RadrootsSimplexSmpCommand::Get,
)),
RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
@@ -919,7 +984,9 @@ impl RadrootsSimplexAgentRuntime {
..
} => Ok((
queue.clone(),
- queue.sender_id.clone(),
+ self.store
+ .queue_record(&command.connection_id, queue)?
+ .entity_id,
RadrootsSimplexSmpCommand::Ack(broker_message_id.clone()),
)),
RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors } => {
@@ -991,6 +1058,45 @@ impl RadrootsSimplexAgentRuntime {
}
}
+ fn apply_subscription_response(
+ &mut self,
+ response: RadrootsSimplexSmpTransportResponse,
+ ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
+ let entity_id = response.transmission.entity_id.clone();
+ let (connection_id, queue) = self
+ .store
+ .receive_queue_by_entity_id(&response.server, &entity_id)
+ .ok_or_else(|| {
+ RadrootsSimplexAgentRuntimeError::Runtime(format!(
+ "SimpleX subscription response for server `{}` used unknown queue entity `{}`",
+ response.server.server_identity,
+ URL_SAFE_NO_PAD.encode(&entity_id)
+ ))
+ })?;
+ match response.transmission.message {
+ RadrootsSimplexSmpBrokerMessage::Msg(message) => self
+ .process_received_message_response(
+ &connection_id,
+ &queue,
+ message,
+ response.transport_hash,
+ ),
+ RadrootsSimplexSmpBrokerMessage::Err(error) => {
+ self.events
+ .push_back(RadrootsSimplexAgentRuntimeEvent::Error {
+ connection_id: Some(connection_id),
+ message: format!(
+ "SimpleX subscription broker error for queue entity `{}`: {:?}",
+ URL_SAFE_NO_PAD.encode(&entity_id),
+ error
+ ),
+ });
+ Ok(())
+ }
+ _ => Ok(()),
+ }
+ }
+
fn apply_delivery_side_effects(
&mut self,
command: &RadrootsSimplexAgentPendingCommand,
@@ -1393,6 +1499,7 @@ fn queue_for_command(
RadrootsSimplexAgentPendingCommandKind::SecureQueue { queue, .. }
| RadrootsSimplexAgentPendingCommandKind::SendEnvelope { queue, .. }
| RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue }
+ | RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { queue }
| RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { queue, .. } => {
Some(queue.clone())
}
@@ -1582,7 +1689,8 @@ mod tests {
RadrootsSimplexSmpX25519Keypair,
};
use radroots_simplex_smp_proto::prelude::{
- RadrootsSimplexSmpBrokerTransmission, RadrootsSimplexSmpQueueIdsResponse,
+ RadrootsSimplexSmpBrokerTransmission, RadrootsSimplexSmpError,
+ RadrootsSimplexSmpQueueIdsResponse,
};
use radroots_simplex_smp_transport::prelude::RadrootsSimplexSmpTransportBlock;
@@ -1619,14 +1727,29 @@ mod tests {
#[derive(Default)]
struct ScriptedTransport {
responses: VecDeque<RadrootsSimplexSmpBrokerMessage>,
+ subscription_responses: VecDeque<RadrootsSimplexSmpBrokerTransmission>,
requests: Vec<RadrootsSimplexSmpTransportRequest>,
+ subscription_requests: Vec<RadrootsSimplexSmpSubscriptionReceiveRequest>,
}
impl ScriptedTransport {
fn with_responses(responses: Vec<RadrootsSimplexSmpBrokerMessage>) -> Self {
Self {
responses: responses.into(),
+ subscription_responses: VecDeque::new(),
+ requests: Vec::new(),
+ subscription_requests: Vec::new(),
+ }
+ }
+
+ fn with_subscription_responses(
+ responses: Vec<RadrootsSimplexSmpBrokerTransmission>,
+ ) -> Self {
+ Self {
+ responses: VecDeque::new(),
+ subscription_responses: responses.into(),
requests: Vec::new(),
+ subscription_requests: Vec::new(),
}
}
}
@@ -1700,6 +1823,30 @@ mod tests {
}
}
+ impl RadrootsSimplexSmpSubscriptionTransport for ScriptedTransport {
+ fn receive_subscription(
+ &mut self,
+ request: RadrootsSimplexSmpSubscriptionReceiveRequest,
+ ) -> Result<Option<RadrootsSimplexSmpTransportResponse>, Self::Error> {
+ self.subscription_requests.push(request.clone());
+ let Some(response_transmission) = self.subscription_responses.pop_front() else {
+ return Ok(None);
+ };
+ let response_block = RadrootsSimplexSmpTransportBlock::from_broker_transmissions(
+ &[response_transmission.clone()],
+ RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
+ )
+ .map_err(|error| error.to_string())?;
+ let response_encoded = response_block.encode().map_err(|error| error.to_string())?;
+ Ok(Some(RadrootsSimplexSmpTransportResponse {
+ server: request.server,
+ transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
+ transmission: response_transmission,
+ transport_hash: Sha256::digest(&response_encoded).to_vec(),
+ }))
+ }
+ }
+
#[test]
fn create_and_join_commands_execute_through_transport() {
let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
@@ -1733,6 +1880,22 @@ mod tests {
assert!(created_queue[0].subscribed);
assert_eq!(transport.requests.len(), 6);
assert!(matches!(
+ transport.requests[3].command,
+ RadrootsSimplexSmpCommand::Sub
+ ));
+ assert_eq!(transport.requests[3].entity_id, b"recipient".to_vec());
+ assert!(matches!(
+ transport.requests[4].command,
+ RadrootsSimplexSmpCommand::Sub
+ ));
+ assert_eq!(transport.requests[4].entity_id, b"recipient-2".to_vec());
+ assert!(
+ !transport
+ .requests
+ .iter()
+ .any(|request| matches!(request.command, RadrootsSimplexSmpCommand::Get))
+ );
+ assert!(matches!(
runtime.drain_events(16).first(),
Some(RadrootsSimplexAgentRuntimeEvent::InvitationReady { .. })
));
@@ -1743,6 +1906,86 @@ mod tests {
}
#[test]
+ fn explicit_get_connection_message_executes_smp_get() {
+ let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
+ let created = runtime
+ .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
+ .unwrap();
+
+ let mut setup_transport = ScriptedTransport::with_responses(vec![
+ ids_response(b"recipient", b"sender", b"server-dh"),
+ RadrootsSimplexSmpBrokerMessage::Ok,
+ ]);
+ runtime
+ .execute_ready_commands(&mut setup_transport, 30, 16)
+ .unwrap();
+ assert!(matches!(
+ setup_transport.requests[1].command,
+ RadrootsSimplexSmpCommand::Sub
+ ));
+ assert_eq!(setup_transport.requests[1].entity_id, b"recipient".to_vec());
+ assert!(runtime.store.receive_queues(&created).unwrap()[0].subscribed);
+
+ runtime.get_connection_message(&created, 40).unwrap();
+ let mut get_transport =
+ ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]);
+ runtime
+ .execute_ready_commands(&mut get_transport, 50, 16)
+ .unwrap();
+
+ assert_eq!(get_transport.requests.len(), 1);
+ assert!(matches!(
+ get_transport.requests[0].command,
+ RadrootsSimplexSmpCommand::Get
+ ));
+ assert_eq!(get_transport.requests[0].entity_id, b"recipient".to_vec());
+ assert!(runtime.store.receive_queues(&created).unwrap()[0].subscribed);
+ }
+
+ #[test]
+ fn subscription_receive_routes_broker_transmission_by_entity_id() {
+ let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
+ let created = runtime
+ .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
+ .unwrap();
+
+ let mut setup_transport = ScriptedTransport::with_responses(vec![
+ ids_response(b"recipient", b"sender", b"server-dh"),
+ RadrootsSimplexSmpBrokerMessage::Ok,
+ ]);
+ runtime
+ .execute_ready_commands(&mut setup_transport, 30, 16)
+ .unwrap();
+ let receive_queue = runtime.store.receive_queues(&created).unwrap()[0].clone();
+ let _ = runtime.drain_events(16);
+
+ let mut subscription_transport = ScriptedTransport::with_subscription_responses(vec![
+ RadrootsSimplexSmpBrokerTransmission {
+ authorization: Vec::new(),
+ correlation_id: None,
+ entity_id: receive_queue.entity_id,
+ message: RadrootsSimplexSmpBrokerMessage::Err(RadrootsSimplexSmpError::NoMsg),
+ },
+ ]);
+ runtime
+ .receive_subscription_messages(&mut subscription_transport, 4)
+ .unwrap();
+
+ assert_eq!(subscription_transport.subscription_requests.len(), 2);
+ assert_eq!(
+ subscription_transport.subscription_requests[0].server,
+ receive_queue.descriptor.queue_uri.server
+ );
+ assert!(matches!(
+ runtime.drain_events(16).first(),
+ Some(RadrootsSimplexAgentRuntimeEvent::Error {
+ connection_id: Some(connection_id),
+ message,
+ }) if connection_id == &created && message.contains("NoMsg")
+ ));
+ }
+
+ #[test]
fn delivered_send_confirms_cursor_only_after_transport_success() {
let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
let created = runtime
diff --git a/crates/simplex_agent_store/src/store.rs b/crates/simplex_agent_store/src/store.rs
@@ -95,6 +95,9 @@ pub enum RadrootsSimplexAgentPendingCommandKind {
SubscribeQueue {
queue: RadrootsSimplexAgentQueueAddress,
},
+ GetQueueMessage {
+ queue: RadrootsSimplexAgentQueueAddress,
+ },
AckInboxMessage {
queue: RadrootsSimplexAgentQueueAddress,
broker_message_id: Vec<u8>,
@@ -243,6 +246,9 @@ enum RadrootsSimplexAgentPendingCommandKindSnapshot {
SubscribeQueue {
queue: RadrootsSimplexAgentQueueAddressSnapshot,
},
+ GetQueueMessage {
+ queue: RadrootsSimplexAgentQueueAddressSnapshot,
+ },
AckInboxMessage {
queue: RadrootsSimplexAgentQueueAddressSnapshot,
broker_message_id: Vec<u8>,
@@ -534,6 +540,39 @@ impl RadrootsSimplexAgentStore {
.collect())
}
+ pub fn subscribed_receive_servers(&self) -> Vec<RadrootsSimplexSmpServerAddress> {
+ let mut servers = Vec::new();
+ for connection in self.connections.values() {
+ for queue in &connection.queues {
+ if queue.role == RadrootsSimplexAgentQueueRole::Receive
+ && queue.subscribed
+ && !servers.contains(&queue.descriptor.queue_uri.server)
+ {
+ servers.push(queue.descriptor.queue_uri.server.clone());
+ }
+ }
+ }
+ servers
+ }
+
+ pub fn receive_queue_by_entity_id(
+ &self,
+ server: &RadrootsSimplexSmpServerAddress,
+ entity_id: &[u8],
+ ) -> Option<(String, RadrootsSimplexAgentQueueAddress)> {
+ for connection in self.connections.values() {
+ for queue in &connection.queues {
+ if queue.role == RadrootsSimplexAgentQueueRole::Receive
+ && queue.descriptor.queue_uri.server == *server
+ && queue.entity_id == entity_id
+ {
+ return Some((connection.id.clone(), queue.descriptor.queue_address()));
+ }
+ }
+ }
+ None
+ }
+
pub fn queue_auth_state(
&self,
connection_id: &str,
@@ -1075,6 +1114,11 @@ fn command_kind_to_snapshot(
queue: queue_address_to_snapshot(queue),
}
}
+ RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { queue } => {
+ RadrootsSimplexAgentPendingCommandKindSnapshot::GetQueueMessage {
+ queue: queue_address_to_snapshot(queue),
+ }
+ }
RadrootsSimplexAgentPendingCommandKind::AckInboxMessage {
queue,
broker_message_id,
@@ -1138,6 +1182,11 @@ fn command_kind_from_snapshot(
queue: queue_address_from_snapshot(queue)?,
}
}
+ RadrootsSimplexAgentPendingCommandKindSnapshot::GetQueueMessage { queue } => {
+ RadrootsSimplexAgentPendingCommandKind::GetQueueMessage {
+ queue: queue_address_from_snapshot(queue)?,
+ }
+ }
RadrootsSimplexAgentPendingCommandKindSnapshot::AckInboxMessage {
queue,
broker_message_id,
@@ -1373,11 +1422,12 @@ mod tests {
let prepared = store
.prepare_outbound_message(&connection.id, b"persisted".to_vec())
.unwrap();
+ let queue = sample_descriptor(true).queue_address();
store
.enqueue_command(
&connection.id,
RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
- queue: sample_descriptor(true).queue_address(),
+ queue: queue.clone(),
envelope: RadrootsSimplexAgentEnvelope::Invitation {
request: b"req".to_vec(),
connection_info: b"info".to_vec(),
@@ -1390,6 +1440,15 @@ mod tests {
10,
)
.unwrap();
+ store
+ .enqueue_command(
+ &connection.id,
+ RadrootsSimplexAgentPendingCommandKind::GetQueueMessage {
+ queue: queue.clone(),
+ },
+ 11,
+ )
+ .unwrap();
store.flush().unwrap();
let loaded = RadrootsSimplexAgentStore::open(&path).unwrap();
@@ -1401,7 +1460,12 @@ mod tests {
message_hash: b"persisted".to_vec(),
})
);
- assert_eq!(loaded.pending_commands.len(), 1);
+ assert_eq!(loaded.pending_commands.len(), 2);
+ assert!(loaded.pending_commands.values().any(|command| matches!(
+ &command.kind,
+ RadrootsSimplexAgentPendingCommandKind::GetQueueMessage { queue: persisted_queue }
+ if persisted_queue == &queue
+ )));
assert!(
loaded
.primary_send_queue(&connection.id)
diff --git a/crates/simplex_interop_tests/src/lib.rs b/crates/simplex_interop_tests/src/lib.rs
@@ -40,19 +40,25 @@ mod tests {
};
use radroots_simplex_chat_proto::prelude::{decode_messages, encode_compressed_batch};
use radroots_simplex_smp_crypto::prelude::{
- RadrootsSimplexSmpCommandAuthorization, RadrootsSimplexSmpQueueAuthorizationMaterial,
- RadrootsSimplexSmpQueueAuthorizationScope, RadrootsSimplexSmpX25519Keypair,
+ RadrootsSimplexSmpCommandAuthorization, RadrootsSimplexSmpEd25519Keypair,
+ RadrootsSimplexSmpQueueAuthorizationMaterial, RadrootsSimplexSmpQueueAuthorizationScope,
+ RadrootsSimplexSmpX25519Keypair, encode_ed25519_public_key_x509,
+ encode_x25519_public_key_x509,
};
use radroots_simplex_smp_proto::prelude::{
RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION, RadrootsSimplexSmpBrokerMessage,
RadrootsSimplexSmpBrokerTransmission, RadrootsSimplexSmpCommand,
RadrootsSimplexSmpCommandTransmission, RadrootsSimplexSmpCorrelationId,
- RadrootsSimplexSmpMessageFlags, RadrootsSimplexSmpQueueIdsResponse,
- RadrootsSimplexSmpQueueMode, RadrootsSimplexSmpSendCommand,
+ RadrootsSimplexSmpMessageFlags, RadrootsSimplexSmpNewQueueRequest,
+ RadrootsSimplexSmpQueueIdsResponse, RadrootsSimplexSmpQueueMode,
+ RadrootsSimplexSmpQueueRequestData, RadrootsSimplexSmpSendCommand,
+ RadrootsSimplexSmpServerAddress, RadrootsSimplexSmpSubscriptionMode,
};
use radroots_simplex_smp_transport::prelude::{
- RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpTransportBlock,
- RadrootsSimplexSmpTransportRequest, RadrootsSimplexSmpTransportResponse,
+ RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpSubscriptionReceiveRequest,
+ RadrootsSimplexSmpSubscriptionTransport, RadrootsSimplexSmpTlsCommandTransport,
+ RadrootsSimplexSmpTransportBlock, RadrootsSimplexSmpTransportRequest,
+ RadrootsSimplexSmpTransportResponse,
};
fn ids_response(
@@ -71,6 +77,27 @@ mod tests {
})
}
+ fn correlation_id(byte: u8) -> RadrootsSimplexSmpCorrelationId {
+ RadrootsSimplexSmpCorrelationId::new([byte; RadrootsSimplexSmpCorrelationId::LENGTH])
+ }
+
+ fn live_transport_request(
+ server: RadrootsSimplexSmpServerAddress,
+ correlation_id: RadrootsSimplexSmpCorrelationId,
+ entity_id: Vec<u8>,
+ command: RadrootsSimplexSmpCommand,
+ authorization: RadrootsSimplexSmpCommandAuthorization,
+ ) -> RadrootsSimplexSmpTransportRequest {
+ RadrootsSimplexSmpTransportRequest {
+ server,
+ transport_version: RADROOTS_SIMPLEX_SMP_CURRENT_TRANSPORT_VERSION,
+ correlation_id: Some(correlation_id),
+ entity_id,
+ command,
+ authorization,
+ }
+ }
+
#[derive(Default)]
struct ScriptedTransport {
responses: VecDeque<RadrootsSimplexSmpBrokerMessage>,
@@ -309,4 +336,88 @@ mod tests {
};
target.assert_reachable().unwrap();
}
+
+ #[cfg(feature = "std")]
+ #[test]
+ fn local_upstream_subscribe_receives_sent_message_when_identity_is_configured() {
+ let Some(target) = RadrootsSimplexInteropLocalUpstream::from_env() else {
+ return;
+ };
+ let Some(server) = target.server_address() else {
+ return;
+ };
+
+ let recipient_auth = RadrootsSimplexSmpEd25519Keypair::generate().unwrap();
+ let recipient_dh = RadrootsSimplexSmpX25519Keypair::generate().unwrap();
+ let mut recipient_transport = RadrootsSimplexSmpTlsCommandTransport::new();
+ let create_response = recipient_transport
+ .execute(live_transport_request(
+ server.clone(),
+ correlation_id(1),
+ Vec::new(),
+ RadrootsSimplexSmpCommand::New(RadrootsSimplexSmpNewQueueRequest {
+ recipient_auth_public_key: encode_ed25519_public_key_x509(
+ &recipient_auth.public_key,
+ )
+ .unwrap(),
+ recipient_dh_public_key: encode_x25519_public_key_x509(
+ &recipient_dh.public_key,
+ )
+ .unwrap(),
+ basic_auth: None,
+ subscription_mode: RadrootsSimplexSmpSubscriptionMode::OnlyCreate,
+ queue_request_data: Some(RadrootsSimplexSmpQueueRequestData::Messaging(None)),
+ notifier_credentials: None,
+ }),
+ RadrootsSimplexSmpCommandAuthorization::Ed25519(recipient_auth.clone()),
+ ))
+ .unwrap();
+ let RadrootsSimplexSmpBrokerMessage::Ids(ids) = create_response.transmission.message else {
+ panic!("expected IDS response from live SMP queue creation");
+ };
+
+ let subscribe_response = recipient_transport
+ .execute(live_transport_request(
+ server.clone(),
+ correlation_id(2),
+ ids.recipient_id.clone(),
+ RadrootsSimplexSmpCommand::Sub,
+ RadrootsSimplexSmpCommandAuthorization::Ed25519(recipient_auth),
+ ))
+ .unwrap();
+ assert!(matches!(
+ subscribe_response.transmission.message,
+ RadrootsSimplexSmpBrokerMessage::Ok | RadrootsSimplexSmpBrokerMessage::Msg(_)
+ ));
+
+ let mut sender_transport = RadrootsSimplexSmpTlsCommandTransport::new();
+ let send_response = sender_transport
+ .execute(live_transport_request(
+ server.clone(),
+ correlation_id(3),
+ ids.sender_id,
+ RadrootsSimplexSmpCommand::Send(RadrootsSimplexSmpSendCommand {
+ flags: RadrootsSimplexSmpMessageFlags::notifications_enabled(),
+ message_body: b"rr-synth-live-subscribe-message".to_vec(),
+ }),
+ RadrootsSimplexSmpCommandAuthorization::None,
+ ))
+ .unwrap();
+ assert!(matches!(
+ send_response.transmission.message,
+ RadrootsSimplexSmpBrokerMessage::Ok
+ ));
+
+ let subscription_response = recipient_transport
+ .receive_subscription(RadrootsSimplexSmpSubscriptionReceiveRequest { server })
+ .unwrap()
+ .expect("expected live SMP subscription message");
+ let RadrootsSimplexSmpBrokerMessage::Msg(message) =
+ subscription_response.transmission.message
+ else {
+ panic!("expected MSG response from live SMP subscription");
+ };
+ assert!(!message.message_id.is_empty());
+ assert!(!message.encrypted_body.is_empty());
+ }
}
diff --git a/crates/simplex_interop_tests/src/policy.rs b/crates/simplex_interop_tests/src/policy.rs
@@ -1,4 +1,6 @@
use alloc::string::{String, ToString};
+#[cfg(feature = "std")]
+use alloc::vec;
use core::fmt;
use radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpQueueUri;
@@ -46,6 +48,7 @@ impl RadrootsSimplexInteropFixturePolicy {
pub struct RadrootsSimplexInteropLocalUpstream {
pub host: String,
pub port: u16,
+ pub server_identity: Option<String>,
}
#[cfg(feature = "std")]
@@ -56,7 +59,24 @@ impl RadrootsSimplexInteropLocalUpstream {
.ok()?
.parse::<u16>()
.ok()?;
- Some(Self { host, port })
+ let server_identity = std::env::var("RADROOTS_SIMPLEX_INTEROP_SMP_IDENTITY").ok();
+ Some(Self {
+ host,
+ port,
+ server_identity,
+ })
+ }
+
+ pub fn server_address(
+ &self,
+ ) -> Option<radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpServerAddress> {
+ Some(
+ radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpServerAddress {
+ server_identity: self.server_identity.clone()?,
+ hosts: vec![self.host.clone()],
+ port: Some(self.port),
+ },
+ )
}
pub fn assert_reachable(&self) -> Result<(), RadrootsSimplexInteropPolicyError> {
diff --git a/crates/simplex_smp_transport/src/client.rs b/crates/simplex_smp_transport/src/client.rs
@@ -2,7 +2,8 @@
use crate::error::RadrootsSimplexSmpTransportError;
use crate::executor::{
- RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpTransportRequest,
+ RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpSubscriptionReceiveRequest,
+ RadrootsSimplexSmpSubscriptionTransport, RadrootsSimplexSmpTransportRequest,
RadrootsSimplexSmpTransportResponse,
};
use crate::frame::{RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE, RadrootsSimplexSmpTransportBlock};
@@ -22,7 +23,7 @@ use radroots_simplex_smp_crypto::prelude::{
use radroots_simplex_smp_proto::prelude::{
RADROOTS_SIMPLEX_SMP_AUTH_COMMANDS_TRANSPORT_VERSION,
RADROOTS_SIMPLEX_SMP_ENCRYPTED_BLOCK_TRANSPORT_VERSION, RadrootsSimplexSmpCommandTransmission,
- RadrootsSimplexSmpServerAddress,
+ RadrootsSimplexSmpCorrelationId, RadrootsSimplexSmpServerAddress,
};
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
@@ -32,7 +33,7 @@ use rustls::{
};
use sha2::{Digest, Sha256};
use std::collections::BTreeMap;
-use std::io::{Read, Write};
+use std::io::{ErrorKind, Read, Write};
use std::net::{IpAddr, TcpStream, ToSocketAddrs};
use std::sync::Arc;
use std::time::Duration;
@@ -57,20 +58,23 @@ impl RadrootsSimplexSmpTlsCommandTransport {
Self::default()
}
- fn session_key(server: &RadrootsSimplexSmpServerAddress) -> String {
+ fn session_key(server: &RadrootsSimplexSmpServerAddress, kind: &str) -> String {
let mut key = server.server_identity.clone();
key.push('@');
key.push_str(&server.hosts.join(","));
key.push(':');
key.push_str(&server.port.unwrap_or(5223).to_string());
+ key.push('#');
+ key.push_str(kind);
key
}
fn session_for(
&mut self,
server: &RadrootsSimplexSmpServerAddress,
+ kind: &str,
) -> Result<&mut RadrootsSimplexSmpLiveSession, RadrootsSimplexSmpTransportError> {
- let key = Self::session_key(server);
+ let key = Self::session_key(server, kind);
if !self.sessions.contains_key(&key) {
let session = connect_live_session(server)?;
self.sessions.insert(key.clone(), session);
@@ -91,12 +95,16 @@ impl RadrootsSimplexSmpCommandTransport for RadrootsSimplexSmpTlsCommandTranspor
&mut self,
request: RadrootsSimplexSmpTransportRequest,
) -> Result<RadrootsSimplexSmpTransportResponse, Self::Error> {
- let key = Self::session_key(&request.server);
- match execute_live_request(self.session_for(&request.server)?, &request) {
+ let session_kind = session_kind_for_command(&request.command);
+ let key = Self::session_key(&request.server, session_kind);
+ match execute_live_request(self.session_for(&request.server, session_kind)?, &request) {
Ok(response) => Ok(response),
Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error)) => {
self.sessions.remove(&key);
- let response = execute_live_request(self.session_for(&request.server)?, &request);
+ let response = execute_live_request(
+ self.session_for(&request.server, session_kind)?,
+ &request,
+ );
match response {
Ok(response) => Ok(response),
Err(RadrootsSimplexSmpTransportError::LiveTransportIo(_)) => {
@@ -110,6 +118,42 @@ impl RadrootsSimplexSmpCommandTransport for RadrootsSimplexSmpTlsCommandTranspor
}
}
+impl RadrootsSimplexSmpSubscriptionTransport for RadrootsSimplexSmpTlsCommandTransport {
+ fn receive_subscription(
+ &mut self,
+ request: RadrootsSimplexSmpSubscriptionReceiveRequest,
+ ) -> Result<Option<RadrootsSimplexSmpTransportResponse>, Self::Error> {
+ let key = Self::session_key(&request.server, "subscription");
+ match read_live_response(
+ self.session_for(&request.server, "subscription")?,
+ &request.server,
+ None,
+ true,
+ ) {
+ Ok(response) => Ok(response),
+ Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error)) => {
+ self.sessions.remove(&key);
+ Err(RadrootsSimplexSmpTransportError::LiveTransportIo(error))
+ }
+ Err(error) => Err(error),
+ }
+ }
+}
+
+fn session_kind_for_command(
+ command: &radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand,
+) -> &'static str {
+ match command {
+ radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Sub
+ | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Subs
+ | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSub
+ | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::NSubs => "subscription",
+ radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::Get
+ | radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpCommand::LGet => "poll",
+ _ => "command",
+ }
+}
+
fn execute_live_request(
session: &mut RadrootsSimplexSmpLiveSession,
request: &RadrootsSimplexSmpTransportRequest,
@@ -148,11 +192,28 @@ fn execute_live_request(
.flush()
.map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
+ read_live_response(session, &request.server, Some(correlation_id), false)?.ok_or_else(|| {
+ RadrootsSimplexSmpTransportError::LiveTransportIo(
+ "SMP command response was not available before the read timeout".into(),
+ )
+ })
+}
+
+fn read_live_response(
+ session: &mut RadrootsSimplexSmpLiveSession,
+ server: &RadrootsSimplexSmpServerAddress,
+ expected_correlation_id: Option<RadrootsSimplexSmpCorrelationId>,
+ timeout_is_empty: bool,
+) -> Result<Option<RadrootsSimplexSmpTransportResponse>, RadrootsSimplexSmpTransportError> {
let mut response_block = vec![0_u8; RADROOTS_SIMPLEX_SMP_TRANSPORT_BLOCK_SIZE];
- session
- .stream
- .read_exact(&mut response_block)
- .map_err(|error| RadrootsSimplexSmpTransportError::LiveTransportIo(error.to_string()))?;
+ if let Err(error) = session.stream.read_exact(&mut response_block) {
+ if timeout_is_empty && matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) {
+ return Ok(None);
+ }
+ return Err(RadrootsSimplexSmpTransportError::LiveTransportIo(
+ error.to_string(),
+ ));
+ }
let response_hash = Sha256::digest(&response_block).to_vec();
let decoded = decode_live_transport_block(session, &response_block)?;
let transmissions = decoded.decode_broker_transmissions(session.transport_version)?;
@@ -164,15 +225,17 @@ fn execute_live_request(
);
}
let transmission = transmissions.into_iter().next().expect("checked len");
- if transmission.correlation_id != Some(correlation_id) {
+ if let Some(expected_correlation_id) = expected_correlation_id
+ && transmission.correlation_id != Some(expected_correlation_id)
+ {
return Err(RadrootsSimplexSmpTransportError::CorrelationIdMismatch);
}
- Ok(RadrootsSimplexSmpTransportResponse {
- server: request.server.clone(),
+ Ok(Some(RadrootsSimplexSmpTransportResponse {
+ server: server.clone(),
transport_version: session.transport_version,
transmission,
transport_hash: response_hash,
- })
+ }))
}
fn transport_debug_enabled() -> bool {
diff --git a/crates/simplex_smp_transport/src/executor.rs b/crates/simplex_smp_transport/src/executor.rs
@@ -23,6 +23,11 @@ pub struct RadrootsSimplexSmpTransportResponse {
pub transport_hash: Vec<u8>,
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RadrootsSimplexSmpSubscriptionReceiveRequest {
+ pub server: RadrootsSimplexSmpServerAddress,
+}
+
pub trait RadrootsSimplexSmpCommandTransport {
type Error: core::fmt::Display;
@@ -31,3 +36,10 @@ pub trait RadrootsSimplexSmpCommandTransport {
request: RadrootsSimplexSmpTransportRequest,
) -> Result<RadrootsSimplexSmpTransportResponse, Self::Error>;
}
+
+pub trait RadrootsSimplexSmpSubscriptionTransport: RadrootsSimplexSmpCommandTransport {
+ fn receive_subscription(
+ &mut self,
+ request: RadrootsSimplexSmpSubscriptionReceiveRequest,
+ ) -> Result<Option<RadrootsSimplexSmpTransportResponse>, Self::Error>;
+}
diff --git a/crates/simplex_smp_transport/src/lib.rs b/crates/simplex_smp_transport/src/lib.rs
@@ -15,7 +15,8 @@ pub mod prelude {
pub use crate::client::RadrootsSimplexSmpTlsCommandTransport;
pub use crate::error::RadrootsSimplexSmpTransportError;
pub use crate::executor::{
- RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpTransportRequest,
+ RadrootsSimplexSmpCommandTransport, RadrootsSimplexSmpSubscriptionReceiveRequest,
+ RadrootsSimplexSmpSubscriptionTransport, RadrootsSimplexSmpTransportRequest,
RadrootsSimplexSmpTransportResponse,
};
pub use crate::frame::{