commit d1c414f253691e598db239869e2c32b0ae37f709
parent 5dc82088e9e9c544314e679bf878421cff9b28c2
Author: triesap <tyson@radroots.org>
Date: Tue, 23 Jun 2026 08:22:15 +0000
simplex: reconcile inbound ack lifecycle
- surface inbound message hashes and delivered ACK events from the runtime
- dedupe pending ACK command enqueueing in the agent store
- let the app store mark inbound ACK records as delivered
Diffstat:
4 files changed, 126 insertions(+), 0 deletions(-)
diff --git a/crates/simplex_agent_runtime/src/runtime.rs b/crates/simplex_agent_runtime/src/runtime.rs
@@ -724,6 +724,12 @@ impl RadrootsSimplexAgentRuntime {
receipt_info: Vec<u8>,
now: u64,
) -> Result<(), RadrootsSimplexAgentRuntimeError> {
+ if self
+ .store
+ .has_pending_ack_message(connection_id, message_id, &message_hash)
+ {
+ return Ok(());
+ }
let receive_queue = self
.store
.connection(connection_id)?
@@ -938,10 +944,19 @@ impl RadrootsSimplexAgentRuntime {
);
}
RadrootsSimplexAgentMessage::UserMessage(body) => {
+ let broker_message_id_hash = self
+ .store
+ .connection(connection_id)?
+ .last_received_broker_message_id
+ .as_ref()
+ .map(|broker_message_id| Sha256::digest(broker_message_id).to_vec())
+ .unwrap_or_default();
self.events
.push_back(RadrootsSimplexAgentRuntimeEvent::MessageReceived {
connection_id: connection_id.into(),
message_id: frame.header.message_id,
+ broker_message_id_hash,
+ message_hash: transport_hash,
body,
});
}
@@ -1534,6 +1549,15 @@ impl RadrootsSimplexAgentRuntime {
.mark_queue_tested(&command.connection_id, queue)?;
}
}
+ RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { receipt, .. } => {
+ self.events.push_back(
+ RadrootsSimplexAgentRuntimeEvent::InboundMessageAckDelivered {
+ connection_id: command.connection_id.clone(),
+ message_id: receipt.message_id,
+ message_hash: receipt.message_hash.clone(),
+ },
+ );
+ }
RadrootsSimplexAgentPendingCommandKind::SecureGetQueueLinkData {
invitation,
reply_queue,
diff --git a/crates/simplex_agent_runtime/src/types.rs b/crates/simplex_agent_runtime/src/types.rs
@@ -25,8 +25,15 @@ pub enum RadrootsSimplexAgentRuntimeEvent {
MessageReceived {
connection_id: String,
message_id: u64,
+ broker_message_id_hash: Vec<u8>,
+ message_hash: Vec<u8>,
body: Vec<u8>,
},
+ InboundMessageAckDelivered {
+ connection_id: String,
+ message_id: u64,
+ message_hash: Vec<u8>,
+ },
MessageAcknowledged {
connection_id: String,
message_id: u64,
diff --git a/crates/simplex_agent_store/src/store.rs b/crates/simplex_agent_store/src/store.rs
@@ -1130,6 +1130,22 @@ impl RadrootsSimplexAgentStore {
Ok(command)
}
+ pub fn has_pending_ack_message(
+ &self,
+ connection_id: &str,
+ message_id: RadrootsSimplexAgentMessageId,
+ message_hash: &[u8],
+ ) -> bool {
+ self.pending_commands.values().any(|command| {
+ command.connection_id == connection_id
+ && matches!(
+ &command.kind,
+ RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { receipt, .. }
+ if receipt.message_id == message_id && receipt.message_hash == message_hash
+ )
+ })
+ }
+
pub fn take_ready_commands(
&mut self,
now: u64,
diff --git a/crates/simplex_app_store/src/store.rs b/crates/simplex_app_store/src/store.rs
@@ -562,6 +562,50 @@ impl RadrootsSimplexAppStore {
collect_rows(statement.query_map([], inbound_message_from_row)?)
}
+ pub fn mark_inbound_ack_delivered(
+ &self,
+ connection_id: &str,
+ inbound_sequence: i64,
+ message_hash: &[u8],
+ ) -> Result<Option<RadrootsSimplexAppInboundMessageLogEntry>, RadrootsSimplexAppStoreError>
+ {
+ if connection_id.is_empty() {
+ return Err(RadrootsSimplexAppStoreError::MessageLifecycle(
+ "connection id must not be empty".into(),
+ ));
+ }
+ if inbound_sequence < 0 {
+ return Err(RadrootsSimplexAppStoreError::MessageLifecycle(
+ "inbound sequence must not be negative".into(),
+ ));
+ }
+ if message_hash.is_empty() {
+ return Err(RadrootsSimplexAppStoreError::MessageLifecycle(
+ "message hash must not be empty".into(),
+ ));
+ }
+ self.connection.execute(
+ "UPDATE inbound_message_log
+ SET ack_status = 'acked'
+ WHERE connection_id = ?1
+ AND inbound_sequence = ?2
+ AND message_hash = ?3
+ AND ack_status = 'pending_ack'",
+ params![connection_id, inbound_sequence, message_hash],
+ )?;
+ self.connection
+ .query_row(
+ "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix
+ FROM inbound_message_log
+ WHERE connection_id = ?1 AND inbound_sequence = ?2 AND message_hash = ?3
+ LIMIT 1",
+ params![connection_id, inbound_sequence, message_hash],
+ inbound_message_from_row,
+ )
+ .optional()
+ .map_err(Into::into)
+ }
+
pub fn enqueue_outbox_message(
&self,
message: &RadrootsSimplexAppOutboxMessage,
@@ -1907,6 +1951,41 @@ mod tests {
}
#[test]
+ fn inbound_ack_delivery_marks_pending_row_acked() {
+ let temp = tempfile::tempdir().expect("temp");
+ let path = temp.path().join("simplex.sqlite");
+ let vault = Arc::new(RadrootsSecretVaultMemory::new());
+ let store = memory_store(&path, vault).expect("store");
+ seed_store(&store);
+
+ let commit = store
+ .commit_inbound_text(&inbound_text_request())
+ .expect("commit");
+ let acked = store
+ .mark_inbound_ack_delivered("connection-1", 21, b"agent-message-hash-1")
+ .expect("ack")
+ .expect("row");
+
+ assert_eq!(acked.inbound_id, commit.inbound.inbound_id);
+ assert_eq!(acked.ack_status, "acked");
+ assert!(store.pending_ack_messages().expect("pending").is_empty());
+ assert_eq!(
+ store
+ .mark_inbound_ack_delivered("connection-1", 21, b"agent-message-hash-1")
+ .expect("idempotent")
+ .expect("row")
+ .ack_status,
+ "acked"
+ );
+ assert!(
+ store
+ .mark_inbound_ack_delivered("connection-1", 21, b"wrong-hash")
+ .expect("missing")
+ .is_none()
+ );
+ }
+
+ #[test]
fn invalid_inbound_text_does_not_create_chat_or_pending_ack() {
let temp = tempfile::tempdir().expect("temp");
let path = temp.path().join("simplex.sqlite");