commit 1a94b73948b7fbd18636114b7ce70e706d89d4b9
parent d230245c5721d9bff853893b2aa421479ca417d1
Author: triesap <tyson@radroots.org>
Date: Mon, 22 Jun 2026 22:35:50 +0000
simplex: complete duplex hello lifecycle
- gate user messages on connected runtime state
- persist sent and received HELLO lifecycle flags
- enqueue SKEY and HELLO commands during approval flow
- cover connected-state and restart persistence behavior
Diffstat:
3 files changed, 286 insertions(+), 11 deletions(-)
diff --git a/crates/simplex_agent_runtime/src/runtime.rs b/crates/simplex_agent_runtime/src/runtime.rs
@@ -339,6 +339,13 @@ impl RadrootsSimplexAgentRuntime {
local_info: Vec<u8>,
now: u64,
) -> Result<(), RadrootsSimplexAgentRuntimeError> {
+ if self.store.connection(connection_id)?.status
+ != RadrootsSimplexAgentConnectionStatus::AwaitingApproval
+ {
+ return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
+ "SimpleX connection `{connection_id}` is not awaiting approval"
+ )));
+ }
self.store
.set_status(connection_id, RadrootsSimplexAgentConnectionStatus::Allowed)?;
let send_queue = self.store.primary_send_queue(connection_id)?;
@@ -413,6 +420,11 @@ impl RadrootsSimplexAgentRuntime {
) -> Result<u64, RadrootsSimplexAgentRuntimeError> {
let send_queue = self.store.primary_send_queue(connection_id)?;
let connection = self.store.connection(connection_id)?;
+ if connection.status != RadrootsSimplexAgentConnectionStatus::Connected {
+ return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!(
+ "SimpleX connection `{connection_id}` is not connected"
+ )));
+ }
if connection.staged_outbound_message.is_some() {
return Err(RadrootsSimplexAgentRuntimeError::Store(
radroots_simplex_agent_store::prelude::RadrootsSimplexAgentStoreError::PendingOutboundMessage(
@@ -466,6 +478,16 @@ impl RadrootsSimplexAgentRuntime {
Ok(message_id)
}
+ pub fn send_hello(
+ &mut self,
+ connection_id: &str,
+ now: u64,
+ ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
+ self.enqueue_hello(connection_id, now)?;
+ self.flush_store()?;
+ Ok(())
+ }
+
pub fn ack_message(
&mut self,
connection_id: &str,
@@ -583,6 +605,13 @@ impl RadrootsSimplexAgentRuntime {
) -> Result<(), RadrootsSimplexAgentRuntimeError> {
match message {
RadrootsSimplexAgentDecryptedMessage::ConnectionInfo(info) => {
+ if self.store.connection(connection_id)?.status
+ != RadrootsSimplexAgentConnectionStatus::Connected
+ {
+ self.store
+ .set_status(connection_id, RadrootsSimplexAgentConnectionStatus::Allowed)?;
+ }
+ self.enqueue_hello(connection_id, 0)?;
self.events
.push_back(RadrootsSimplexAgentRuntimeEvent::ConnectionInfo {
connection_id: connection_id.into(),
@@ -590,10 +619,13 @@ impl RadrootsSimplexAgentRuntime {
});
}
RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply { reply_queues, info } => {
+ let mut secure_queues = Vec::new();
for descriptor in reply_queues {
let auth_state = self.store.generate_queue_auth_state()?;
let mut descriptor = descriptor;
descriptor.sender_key = Some(auth_state.public_key.clone());
+ let secure_queue = descriptor.queue_address();
+ let sender_key = descriptor.sender_key.clone();
self.store.add_queue(
connection_id,
descriptor,
@@ -601,11 +633,19 @@ impl RadrootsSimplexAgentRuntime {
true,
auth_state,
)?;
+ secure_queues.push((secure_queue, sender_key));
}
self.store.set_status(
connection_id,
RadrootsSimplexAgentConnectionStatus::AwaitingApproval,
)?;
+ for (queue, sender_key) in secure_queues {
+ self.store.enqueue_command(
+ connection_id,
+ RadrootsSimplexAgentPendingCommandKind::SecureQueue { queue, sender_key },
+ 0,
+ )?;
+ }
self.events
.push_back(RadrootsSimplexAgentRuntimeEvent::ConfirmationRequired {
connection_id: connection_id.into(),
@@ -627,15 +667,28 @@ impl RadrootsSimplexAgentRuntime {
let _ = transport_hash;
match frame.message {
RadrootsSimplexAgentMessage::Hello => {
- self.store.set_status(
- connection_id,
- RadrootsSimplexAgentConnectionStatus::Connected,
- )?;
- self.events.push_back(
- RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished {
- connection_id: connection_id.into(),
- },
- );
+ let connection = self.store.connection(connection_id)?;
+ let was_connected =
+ connection.status == RadrootsSimplexAgentConnectionStatus::Connected;
+ let should_send_hello = !connection.hello_sent;
+ {
+ let connection = self.store.connection_mut(connection_id)?;
+ connection.hello_received = true;
+ }
+ if should_send_hello {
+ self.enqueue_hello(connection_id, 0)?;
+ }
+ if !was_connected {
+ self.store.set_status(
+ connection_id,
+ RadrootsSimplexAgentConnectionStatus::Connected,
+ )?;
+ self.events.push_back(
+ RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished {
+ connection_id: connection_id.into(),
+ },
+ );
+ }
}
RadrootsSimplexAgentMessage::Receipt(receipt) => {
self.events.push_back(
@@ -1141,6 +1194,57 @@ impl RadrootsSimplexAgentRuntime {
Ok(())
}
+ fn enqueue_hello(
+ &mut self,
+ connection_id: &str,
+ now: u64,
+ ) -> Result<(), RadrootsSimplexAgentRuntimeError> {
+ if self.store.connection(connection_id)?.hello_sent {
+ return Ok(());
+ }
+ let send_queue = self.store.primary_send_queue(connection_id)?;
+ let connection = self.store.connection(connection_id)?;
+ let previous_hash = connection
+ .delivery_cursor
+ .last_sent_message_hash
+ .clone()
+ .unwrap_or_default();
+ let message_id = connection
+ .delivery_cursor
+ .last_sent_message_id
+ .unwrap_or(0)
+ .saturating_add(1);
+ let frame = RadrootsSimplexAgentMessageFrame {
+ header: RadrootsSimplexAgentMessageHeader {
+ message_id,
+ previous_message_hash: previous_hash,
+ },
+ message: RadrootsSimplexAgentMessage::Hello,
+ padding: Vec::new(),
+ };
+ let ciphertext =
+ encode_decrypted_message(&RadrootsSimplexAgentDecryptedMessage::Message(frame))?;
+ let message_hash = Sha256::digest(&ciphertext).to_vec();
+ let prepared = self
+ .store
+ .prepare_outbound_message(connection_id, message_hash)?;
+ let encrypted = self.next_encrypted_payload(connection_id, ciphertext, None)?;
+ self.store.enqueue_command(
+ connection_id,
+ RadrootsSimplexAgentPendingCommandKind::SendEnvelope {
+ queue: send_queue.descriptor.queue_address(),
+ envelope: RadrootsSimplexAgentEnvelope::Message(encrypted),
+ delivery: Some(RadrootsSimplexAgentOutboundMessage {
+ message_id: prepared.message_id,
+ message_hash: prepared.message_hash,
+ }),
+ },
+ now,
+ )?;
+ self.store.connection_mut(connection_id)?.hello_sent = true;
+ Ok(())
+ }
+
fn encode_smp_message_body(
&self,
connection_id: &str,
@@ -1708,6 +1812,36 @@ mod tests {
.unwrap()
}
+ fn reply_descriptor() -> RadrootsSimplexAgentQueueDescriptor {
+ RadrootsSimplexAgentQueueDescriptor {
+ queue_uri: reply_queue(),
+ replaced_queue: None,
+ primary: true,
+ sender_key: None,
+ }
+ }
+
+ fn hello_message(message_id: u64) -> RadrootsSimplexAgentDecryptedMessage {
+ RadrootsSimplexAgentDecryptedMessage::Message(RadrootsSimplexAgentMessageFrame {
+ header: RadrootsSimplexAgentMessageHeader {
+ message_id,
+ previous_message_hash: Vec::new(),
+ },
+ message: RadrootsSimplexAgentMessage::Hello,
+ padding: Vec::new(),
+ })
+ }
+
+ fn mark_connected(runtime: &mut RadrootsSimplexAgentRuntime, connection_id: &str) {
+ runtime
+ .store
+ .set_status(
+ connection_id,
+ RadrootsSimplexAgentConnectionStatus::Connected,
+ )
+ .unwrap();
+ }
+
fn ids_response(
recipient_id: &[u8],
sender_id: &[u8],
@@ -1986,6 +2120,111 @@ mod tests {
}
#[test]
+ fn send_message_requires_connected_state() {
+ let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
+ let created = runtime
+ .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
+ .unwrap();
+ let invitation = runtime
+ .store
+ .connection(&created)
+ .unwrap()
+ .invitation
+ .clone()
+ .unwrap();
+ let joined = runtime
+ .join_connection(invitation, reply_queue(), 20)
+ .unwrap();
+
+ let error = runtime
+ .send_message(&joined, b"blocked before connected".to_vec(), 30)
+ .unwrap_err();
+ assert!(error.to_string().contains("is not connected"));
+ }
+
+ #[test]
+ fn allow_and_hello_lifecycle_reaches_connected() {
+ let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
+ let created = runtime
+ .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
+ .unwrap();
+ let mut setup_transport = ScriptedTransport::with_responses(vec![
+ ids_response(b"recipient", b"sender", b"server-dh"),
+ RadrootsSimplexSmpBrokerMessage::Ok,
+ ]);
+ runtime
+ .execute_ready_commands(&mut setup_transport, 30, 16)
+ .unwrap();
+ runtime
+ .store
+ .connection_mut(&created)
+ .unwrap()
+ .shared_secret = Some(vec![3_u8; 32]);
+
+ runtime
+ .handle_inbound_decrypted_message(
+ &created,
+ RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply {
+ reply_queues: vec![reply_descriptor()],
+ info: b"peer-info".to_vec(),
+ },
+ b"reply-confirmation".to_vec(),
+ )
+ .unwrap();
+ assert_eq!(
+ runtime.store.connection(&created).unwrap().status,
+ RadrootsSimplexAgentConnectionStatus::AwaitingApproval
+ );
+
+ runtime
+ .allow_connection(&created, b"local-info".to_vec(), 40)
+ .unwrap();
+ let mut allow_transport = ScriptedTransport::with_responses(vec![
+ RadrootsSimplexSmpBrokerMessage::Ok,
+ RadrootsSimplexSmpBrokerMessage::Ok,
+ ]);
+ runtime
+ .execute_ready_commands(&mut allow_transport, 50, 16)
+ .unwrap();
+ assert!(matches!(
+ allow_transport.requests[0].command,
+ RadrootsSimplexSmpCommand::SKey(_)
+ ));
+ assert!(matches!(
+ allow_transport.requests[1].command,
+ RadrootsSimplexSmpCommand::Send(_)
+ ));
+ assert!(!runtime.store.connection(&created).unwrap().hello_sent);
+
+ runtime
+ .handle_inbound_decrypted_message(&created, hello_message(1), b"hello-in".to_vec())
+ .unwrap();
+ let connection = runtime.store.connection(&created).unwrap();
+ assert_eq!(
+ connection.status,
+ RadrootsSimplexAgentConnectionStatus::Connected
+ );
+ assert!(connection.hello_sent);
+ assert!(connection.hello_received);
+ assert!(runtime.drain_events(16).into_iter().any(|event| matches!(
+ event,
+ RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished { connection_id }
+ if connection_id == created
+ )));
+
+ let mut hello_transport =
+ ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]);
+ runtime
+ .execute_ready_commands(&mut hello_transport, 60, 16)
+ .unwrap();
+ assert_eq!(hello_transport.requests.len(), 1);
+ assert!(matches!(
+ hello_transport.requests[0].command,
+ RadrootsSimplexSmpCommand::Send(_)
+ ));
+ }
+
+ #[test]
fn delivered_send_confirms_cursor_only_after_transport_success() {
let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
let created = runtime
@@ -2013,6 +2252,7 @@ mod tests {
runtime
.execute_ready_commands(&mut setup_transport, 30, 16)
.unwrap();
+ mark_connected(&mut runtime, &joined);
let message_id = runtime
.send_message(&joined, b"hello simplex".to_vec(), 40)
@@ -2075,6 +2315,7 @@ mod tests {
runtime
.execute_ready_commands(&mut setup_transport, 30, 16)
.unwrap();
+ mark_connected(&mut runtime, &joined);
runtime
.send_message(&joined, b"hello simplex".to_vec(), 40)
@@ -2161,6 +2402,7 @@ mod tests {
runtime
.execute_ready_commands(&mut setup_transport, 30, 16)
.unwrap();
+ mark_connected(&mut runtime, &joined);
runtime
.send_message(&joined, b"hello simplex".to_vec(), 40)
diff --git a/crates/simplex_agent_store/src/store.rs b/crates/simplex_agent_store/src/store.rs
@@ -137,6 +137,8 @@ pub struct RadrootsSimplexAgentConnectionRecord {
pub last_received_broker_message_id: Option<Vec<u8>>,
pub recent_messages: Vec<RadrootsSimplexAgentRecentMessageRecord>,
pub staged_outbound_message: Option<RadrootsSimplexAgentOutboundMessage>,
+ pub hello_sent: bool,
+ pub hello_received: bool,
}
#[cfg(feature = "std")]
@@ -165,6 +167,8 @@ struct RadrootsSimplexAgentConnectionSnapshot {
last_received_broker_message_id: Option<Vec<u8>>,
recent_messages: Vec<RadrootsSimplexAgentRecentMessageRecord>,
staged_outbound_message: Option<RadrootsSimplexAgentOutboundMessage>,
+ hello_sent: bool,
+ hello_received: bool,
}
#[cfg(feature = "std")]
@@ -378,6 +382,8 @@ impl RadrootsSimplexAgentStore {
last_received_broker_message_id: None,
recent_messages: Vec::new(),
staged_outbound_message: None,
+ hello_sent: false,
+ hello_received: false,
};
self.connections.insert(id, record.clone());
record
@@ -846,6 +852,8 @@ fn connection_to_snapshot(
last_received_broker_message_id: record.last_received_broker_message_id,
recent_messages: record.recent_messages,
staged_outbound_message: record.staged_outbound_message,
+ hello_sent: record.hello_sent,
+ hello_received: record.hello_received,
})
}
@@ -888,6 +896,8 @@ fn connection_from_snapshot(
last_received_broker_message_id: snapshot.last_received_broker_message_id,
recent_messages: snapshot.recent_messages,
staged_outbound_message: snapshot.staged_outbound_message,
+ hello_sent: snapshot.hello_sent,
+ hello_received: snapshot.hello_received,
})
}
@@ -1449,6 +1459,11 @@ mod tests {
11,
)
.unwrap();
+ {
+ let connection = store.connection_mut(&connection.id).unwrap();
+ connection.hello_sent = true;
+ connection.hello_received = true;
+ }
store.flush().unwrap();
let loaded = RadrootsSimplexAgentStore::open(&path).unwrap();
@@ -1460,6 +1475,8 @@ mod tests {
message_hash: b"persisted".to_vec(),
})
);
+ assert!(loaded_connection.hello_sent);
+ assert!(loaded_connection.hello_received);
assert_eq!(loaded.pending_commands.len(), 2);
assert!(loaded.pending_commands.values().any(|command| matches!(
&command.kind,
diff --git a/crates/simplex_interop_tests/src/lib.rs b/crates/simplex_interop_tests/src/lib.rs
@@ -317,12 +317,28 @@ mod tests {
.execute_ready_commands(&mut join_transport, 40, 16)
.unwrap();
runtime
- .allow_connection(&joined, b"rr-synth-info".to_vec(), 50)
+ .handle_inbound_decrypted_message(
+ &joined,
+ RadrootsSimplexAgentDecryptedMessage::Message(RadrootsSimplexAgentMessageFrame {
+ header: RadrootsSimplexAgentMessageHeader {
+ message_id: 1,
+ previous_message_hash: Vec::new(),
+ },
+ message: RadrootsSimplexAgentMessage::Hello,
+ padding: Vec::new(),
+ }),
+ b"rr-synth-hello".to_vec(),
+ )
+ .unwrap();
+ let mut hello_transport =
+ ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]);
+ runtime
+ .execute_ready_commands(&mut hello_transport, 50, 16)
.unwrap();
let message_id = runtime
.send_message(&joined, b"rr-synth-chat".to_vec(), 60)
.unwrap();
- assert_eq!(message_id, 1);
+ assert_eq!(message_id, 2);
runtime.reconnect_connection(&joined, 70).unwrap();
assert!(!runtime.retry_pending(70 + 5_000, 64).is_empty());
assert!(created.starts_with("conn-"));