lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit 81d50491fea677a29d8d7594169cc1d8eeafd953
parent f1c95fc48520492078d6d15f8bdd5b8e2a637c05
Author: triesap <tyson@radroots.org>
Date:   Tue, 23 Jun 2026 10:02:20 +0000

simplex: validate inbound frame progress

- use agent-message hashes for inbound runtime cursor state
- reject inbound sequence gaps, regressions, and hash mismatches
- preserve exact duplicate frames for app-store dedupe and broker ACK targeting
- cover replay validation and duplicate ACK target selection

Diffstat:
Mcrates/simplex_agent_runtime/src/runtime.rs | 366++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
1 file changed, 305 insertions(+), 61 deletions(-)

diff --git a/crates/simplex_agent_runtime/src/runtime.rs b/crates/simplex_agent_runtime/src/runtime.rs @@ -823,7 +823,7 @@ impl RadrootsSimplexAgentRuntime { &mut self, connection_id: &str, message: RadrootsSimplexAgentDecryptedMessage, - transport_hash: Vec<u8>, + message_hash: Vec<u8>, ) -> Result<(), RadrootsSimplexAgentRuntimeError> { match message { RadrootsSimplexAgentDecryptedMessage::ConnectionInfo(info) => { @@ -885,72 +885,67 @@ impl RadrootsSimplexAgentRuntime { info, }); } - RadrootsSimplexAgentDecryptedMessage::Message(frame) => { - let _ = transport_hash; - match frame.message { - RadrootsSimplexAgentMessage::Hello => { - let connection = self.store.connection(connection_id)?; - let was_connected = - connection.status == RadrootsSimplexAgentConnectionStatus::Connected; - let should_send_hello = !connection.hello_sent; - { - let connection = self.store.connection_mut(connection_id)?; - connection.hello_received = true; - } - if should_send_hello { - self.enqueue_hello(connection_id, 0)?; - } - if !was_connected { - self.store.set_status( - connection_id, - RadrootsSimplexAgentConnectionStatus::Connected, - )?; - self.events.push_back( - RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished { - connection_id: connection_id.into(), - }, - ); - } + RadrootsSimplexAgentDecryptedMessage::Message(frame) => match frame.message { + RadrootsSimplexAgentMessage::Hello => { + let connection = self.store.connection(connection_id)?; + let was_connected = + connection.status == RadrootsSimplexAgentConnectionStatus::Connected; + let should_send_hello = !connection.hello_sent; + { + let connection = self.store.connection_mut(connection_id)?; + connection.hello_received = true; } - RadrootsSimplexAgentMessage::Receipt(receipt) => { - self.events.push_back( - RadrootsSimplexAgentRuntimeEvent::MessageAcknowledged { - connection_id: connection_id.into(), - message_id: receipt.message_id, - }, - ); + if should_send_hello { + self.enqueue_hello(connection_id, 0)?; } - RadrootsSimplexAgentMessage::QueueAdd(_) - | RadrootsSimplexAgentMessage::QueueKey(_) - | RadrootsSimplexAgentMessage::QueueUse(_) - | RadrootsSimplexAgentMessage::QueueTest(_) - | RadrootsSimplexAgentMessage::QueueContinue(_) => { + if !was_connected { + self.store.set_status( + connection_id, + RadrootsSimplexAgentConnectionStatus::Connected, + )?; self.events.push_back( - RadrootsSimplexAgentRuntimeEvent::QueueRotationQueued { + RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished { connection_id: connection_id.into(), }, ); } - RadrootsSimplexAgentMessage::UserMessage(body) => { - let broker_message_id_hash = self - .store - .connection(connection_id)? - .last_received_broker_message_id - .as_ref() - .map(|broker_message_id| Sha256::digest(broker_message_id).to_vec()) - .unwrap_or_default(); - self.events - .push_back(RadrootsSimplexAgentRuntimeEvent::MessageReceived { - connection_id: connection_id.into(), - message_id: frame.header.message_id, - broker_message_id_hash, - message_hash: transport_hash, - body, - }); - } - _ => {} } - } + RadrootsSimplexAgentMessage::Receipt(receipt) => { + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::MessageAcknowledged { + connection_id: connection_id.into(), + message_id: receipt.message_id, + }); + } + RadrootsSimplexAgentMessage::QueueAdd(_) + | RadrootsSimplexAgentMessage::QueueKey(_) + | RadrootsSimplexAgentMessage::QueueUse(_) + | RadrootsSimplexAgentMessage::QueueTest(_) + | RadrootsSimplexAgentMessage::QueueContinue(_) => { + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::QueueRotationQueued { + connection_id: connection_id.into(), + }); + } + RadrootsSimplexAgentMessage::UserMessage(body) => { + let broker_message_id_hash = self + .store + .connection(connection_id)? + .last_received_broker_message_id + .as_ref() + .map(|broker_message_id| Sha256::digest(broker_message_id).to_vec()) + .unwrap_or_default(); + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::MessageReceived { + connection_id: connection_id.into(), + message_id: frame.header.message_id, + broker_message_id_hash, + message_hash, + body, + }); + } + _ => {} + }, } self.flush_store()?; Ok(()) @@ -1852,6 +1847,15 @@ impl RadrootsSimplexAgentRuntime { } self.initialize_receiver_ratchet_from_confirmation(connection_id, &envelope)?; let decrypted = self.extract_decrypted_message(connection_id, &envelope)?; + let agent_message_hash = + if let RadrootsSimplexAgentDecryptedMessage::Message(frame) = &decrypted { + let encoded = encode_decrypted_message(&decrypted)?; + let message_hash = Sha256::digest(&encoded).to_vec(); + self.validate_inbound_frame_progress(connection_id, frame, &message_hash)?; + Some(message_hash) + } else { + None + }; { let connection = self.store.connection_mut(connection_id)?; connection.last_received_queue = Some(queue.clone()); @@ -1864,10 +1868,83 @@ impl RadrootsSimplexAgentRuntime { queue.clone(), message.message_id.clone(), frame.header.message_id, - transport_hash.clone(), + agent_message_hash.clone().unwrap_or_default(), )?; } - self.handle_inbound_decrypted_message(connection_id, decrypted, transport_hash) + self.handle_inbound_decrypted_message( + connection_id, + decrypted, + agent_message_hash.unwrap_or(transport_hash), + ) + } + + fn validate_inbound_frame_progress( + &self, + connection_id: &str, + frame: &RadrootsSimplexAgentMessageFrame, + message_hash: &[u8], + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + if frame.header.message_id == 0 { + return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( + "SimpleX inbound message id for `{connection_id}` must start at 1" + ))); + } + let connection = self.store.connection(connection_id)?; + let Some(last_message_id) = connection.delivery_cursor.last_received_message_id else { + if frame.header.message_id != 1 { + return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( + "SimpleX inbound message id for `{connection_id}` started at `{}` instead of `1`", + frame.header.message_id + ))); + } + if !frame.header.previous_message_hash.is_empty() { + return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( + "SimpleX first inbound message for `{connection_id}` carried a previous-message hash" + ))); + } + return Ok(()); + }; + let last_message_hash = connection + .delivery_cursor + .last_received_message_hash + .as_deref() + .ok_or_else(|| { + RadrootsSimplexAgentRuntimeError::Runtime(format!( + "SimpleX connection `{connection_id}` has a received message id without a message hash" + )) + })?; + if frame.header.message_id == last_message_id { + if message_hash == last_message_hash { + return Ok(()); + } + return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( + "SimpleX inbound message id `{last_message_id}` for `{connection_id}` was replayed with a different message hash" + ))); + } + if frame.header.message_id < last_message_id { + return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( + "SimpleX inbound message id `{}` for `{connection_id}` regressed below `{last_message_id}`", + frame.header.message_id + ))); + } + let expected_message_id = last_message_id.checked_add(1).ok_or_else(|| { + RadrootsSimplexAgentRuntimeError::Runtime(format!( + "SimpleX inbound message id for `{connection_id}` overflowed" + )) + })?; + if frame.header.message_id != expected_message_id { + return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( + "SimpleX inbound message id `{}` for `{connection_id}` skipped expected `{expected_message_id}`", + frame.header.message_id + ))); + } + if frame.header.previous_message_hash != last_message_hash { + return Err(RadrootsSimplexAgentRuntimeError::Runtime(format!( + "SimpleX inbound message `{}` for `{connection_id}` carried an unexpected previous-message hash", + frame.header.message_id + ))); + } + Ok(()) } fn decode_received_message_body( @@ -2509,6 +2586,29 @@ mod tests { }) } + fn user_message_frame( + message_id: u64, + previous_message_hash: Vec<u8>, + body: &[u8], + ) -> RadrootsSimplexAgentMessageFrame { + RadrootsSimplexAgentMessageFrame { + header: RadrootsSimplexAgentMessageHeader { + message_id, + previous_message_hash, + }, + message: RadrootsSimplexAgentMessage::UserMessage(body.to_vec()), + padding: Vec::new(), + } + } + + fn agent_message_hash(frame: &RadrootsSimplexAgentMessageFrame) -> Vec<u8> { + let encoded = encode_decrypted_message(&RadrootsSimplexAgentDecryptedMessage::Message( + frame.clone(), + )) + .unwrap(); + Sha256::digest(&encoded).to_vec() + } + fn mark_connected(runtime: &mut RadrootsSimplexAgentRuntime, connection_id: &str) { runtime .store @@ -3133,6 +3233,150 @@ mod tests { } #[test] + fn inbound_progress_accepts_exact_duplicate_for_latest_ack_target() { + let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); + let connection_id = runtime + .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) + .unwrap(); + mark_connected(&mut runtime, &connection_id); + let first_queue = reply_descriptor().queue_address(); + let second_queue = RadrootsSimplexAgentQueueAddress { + server: first_queue.server.clone(), + sender_id: b"second-duplicate-broker".to_vec(), + }; + let frame = user_message_frame(1, Vec::new(), b"first"); + let frame_hash = agent_message_hash(&frame); + + runtime + .validate_inbound_frame_progress(&connection_id, &frame, &frame_hash) + .unwrap(); + runtime + .store + .record_inbound_message( + &connection_id, + first_queue, + b"first-broker-message".to_vec(), + frame.header.message_id, + frame_hash.clone(), + ) + .unwrap(); + runtime + .validate_inbound_frame_progress(&connection_id, &frame, &frame_hash) + .unwrap(); + runtime + .store + .record_inbound_message( + &connection_id, + second_queue.clone(), + b"second-broker-message".to_vec(), + frame.header.message_id, + frame_hash, + ) + .unwrap(); + + assert_eq!( + runtime + .store + .inbound_ack_target(&connection_id, 1, &agent_message_hash(&frame)) + .unwrap(), + Some((second_queue, b"second-broker-message".to_vec())) + ); + } + + #[test] + fn inbound_progress_rejects_gap_and_previous_hash_mismatch() { + let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); + let connection_id = runtime + .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) + .unwrap(); + mark_connected(&mut runtime, &connection_id); + let queue = reply_descriptor().queue_address(); + let first_frame = user_message_frame(1, Vec::new(), b"first"); + let first_hash = agent_message_hash(&first_frame); + runtime + .store + .record_inbound_message( + &connection_id, + queue, + b"first-broker-message".to_vec(), + first_frame.header.message_id, + first_hash.clone(), + ) + .unwrap(); + + let gap_frame = user_message_frame(3, first_hash.clone(), b"gap"); + let gap_error = runtime + .validate_inbound_frame_progress( + &connection_id, + &gap_frame, + &agent_message_hash(&gap_frame), + ) + .unwrap_err(); + assert!(gap_error.to_string().contains("skipped expected `2`")); + + let mismatch_frame = user_message_frame(2, b"wrong-previous-hash".to_vec(), b"second"); + let mismatch_error = runtime + .validate_inbound_frame_progress( + &connection_id, + &mismatch_frame, + &agent_message_hash(&mismatch_frame), + ) + .unwrap_err(); + assert!( + mismatch_error + .to_string() + .contains("unexpected previous-message hash") + ); + } + + #[test] + fn inbound_progress_rejects_regression_after_accepted_next_message() { + let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); + let connection_id = runtime + .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) + .unwrap(); + mark_connected(&mut runtime, &connection_id); + let queue = reply_descriptor().queue_address(); + let first_frame = user_message_frame(1, Vec::new(), b"first"); + let first_hash = agent_message_hash(&first_frame); + let second_frame = user_message_frame(2, first_hash.clone(), b"second"); + let second_hash = agent_message_hash(&second_frame); + runtime + .store + .record_inbound_message( + &connection_id, + queue.clone(), + b"first-broker-message".to_vec(), + first_frame.header.message_id, + first_hash, + ) + .unwrap(); + runtime + .validate_inbound_frame_progress(&connection_id, &second_frame, &second_hash) + .unwrap(); + runtime + .store + .record_inbound_message( + &connection_id, + queue, + b"second-broker-message".to_vec(), + second_frame.header.message_id, + second_hash, + ) + .unwrap(); + + let regression_frame = user_message_frame(1, Vec::new(), b"first"); + let regression_error = runtime + .validate_inbound_frame_progress( + &connection_id, + &regression_frame, + &agent_message_hash(&regression_frame), + ) + .unwrap_err(); + assert!(regression_error.to_string().contains("regressed below `2`")); + } + + #[test] fn send_message_requires_connected_state() { let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); let created = runtime