commit 8cf8aa7c6cbd3654751e4cfed60777b75c365f08
parent d1c414f253691e598db239869e2c32b0ae37f709
Author: triesap <tyson@radroots.org>
Date: Tue, 23 Jun 2026 08:30:12 +0000
simplex: add outbound delivery state events
- emit runtime events when outbound send commands are broker-delivered
- add idempotent app-store sent and acknowledged outbox transitions
- cover outbound delivery status ordering with focused tests
Diffstat:
3 files changed, 169 insertions(+), 1 deletion(-)
diff --git a/crates/simplex_agent_runtime/src/runtime.rs b/crates/simplex_agent_runtime/src/runtime.rs
@@ -1535,9 +1535,15 @@ impl RadrootsSimplexAgentRuntime {
delivery: Some(delivery),
..
} => {
- let _ = self
+ let delivered = self
.store
.confirm_outbound_message(&command.connection_id, delivery.message_id)?;
+ self.events
+ .push_back(RadrootsSimplexAgentRuntimeEvent::OutboundMessageDelivered {
+ connection_id: command.connection_id.clone(),
+ message_id: delivered.message_id,
+ message_hash: delivered.message_hash,
+ });
}
RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue } => {
self.store
@@ -3305,6 +3311,14 @@ mod tests {
.staged_outbound_message,
None
);
+ assert!(runtime.drain_events(64).into_iter().any(|event| matches!(
+ event,
+ RadrootsSimplexAgentRuntimeEvent::OutboundMessageDelivered {
+ connection_id,
+ message_id: 1,
+ message_hash,
+ } if connection_id == joined && !message_hash.is_empty()
+ )));
}
#[test]
diff --git a/crates/simplex_agent_runtime/src/types.rs b/crates/simplex_agent_runtime/src/types.rs
@@ -22,6 +22,11 @@ pub enum RadrootsSimplexAgentRuntimeEvent {
connection_id: String,
message_id: u64,
},
+ OutboundMessageDelivered {
+ connection_id: String,
+ message_id: u64,
+ message_hash: Vec<u8>,
+ },
MessageReceived {
connection_id: String,
message_id: u64,
diff --git a/crates/simplex_app_store/src/store.rs b/crates/simplex_app_store/src/store.rs
@@ -641,6 +641,69 @@ impl RadrootsSimplexAppStore {
collect_rows(statement.query_map([], outbox_message_from_row)?)
}
+ pub fn mark_outbox_message_sent(
+ &self,
+ outbox_id: &str,
+ ) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> {
+ self.mark_outbox_message_delivery_status(outbox_id, "sent", false)
+ }
+
+ pub fn mark_outbox_message_acknowledged(
+ &self,
+ outbox_id: &str,
+ ) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> {
+ self.mark_outbox_message_delivery_status(outbox_id, "acknowledged", true)
+ }
+
+ fn mark_outbox_message_delivery_status(
+ &self,
+ outbox_id: &str,
+ status: &str,
+ terminal: bool,
+ ) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> {
+ if outbox_id.is_empty() {
+ return Err(RadrootsSimplexAppStoreError::MessageLifecycle(
+ "outbox id must not be empty".into(),
+ ));
+ }
+ let transaction = self.connection.unchecked_transaction()?;
+ if terminal {
+ transaction.execute(
+ "UPDATE outbox_messages SET status = ?2 WHERE outbox_id = ?1",
+ params![outbox_id, status],
+ )?;
+ transaction.execute(
+ "UPDATE chat_items
+ SET delivery_status = ?2
+ WHERE chat_item_id = (
+ SELECT chat_item_id FROM outbox_messages WHERE outbox_id = ?1
+ )",
+ params![outbox_id, status],
+ )?;
+ } else {
+ transaction.execute(
+ "UPDATE outbox_messages
+ SET status = CASE WHEN status = 'acknowledged' THEN status ELSE ?2 END
+ WHERE outbox_id = ?1",
+ params![outbox_id, status],
+ )?;
+ transaction.execute(
+ "UPDATE chat_items
+ SET delivery_status = CASE
+ WHEN delivery_status = 'acknowledged' THEN delivery_status
+ ELSE ?2
+ END
+ WHERE chat_item_id = (
+ SELECT chat_item_id FROM outbox_messages WHERE outbox_id = ?1
+ )",
+ params![outbox_id, status],
+ )?;
+ }
+ let updated = outbound_text_by_outbox_id(&transaction, outbox_id)?;
+ transaction.commit()?;
+ Ok(updated)
+ }
+
pub fn record_unsupported_protocol_event(
&self,
event: &RadrootsSimplexAppUnsupportedProtocolEvent,
@@ -1258,6 +1321,40 @@ fn outbound_text_by_msg_id(
.map_err(Into::into)
}
+fn outbound_text_by_outbox_id(
+ transaction: &Transaction<'_>,
+ outbox_id: &str,
+) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> {
+ transaction
+ .query_row(
+ "SELECT
+ c.chat_item_id,
+ c.conversation_id,
+ c.logical_order,
+ c.direction,
+ c.chat_msg_id,
+ c.body,
+ c.delivery_status,
+ c.created_at_unix,
+ o.outbox_id,
+ o.chat_item_id,
+ o.connection_id,
+ o.conversation_id,
+ o.chat_msg_id,
+ o.body,
+ o.status,
+ o.retry_after_unix,
+ o.created_at_unix
+ FROM outbox_messages o
+ JOIN chat_items c ON c.chat_item_id = o.chat_item_id
+ WHERE o.outbox_id = ?1",
+ params![outbox_id],
+ outbound_text_draft_from_row,
+ )
+ .optional()
+ .map_err(Into::into)
+}
+
fn inbound_commit_by_identity(
transaction: &Transaction<'_>,
connection_id: &str,
@@ -2064,6 +2161,58 @@ mod tests {
}
#[test]
+ fn outbound_delivery_state_updates_are_idempotent() {
+ let temp = tempfile::tempdir().expect("temp");
+ let path = temp.path().join("simplex.sqlite");
+ let vault = Arc::new(RadrootsSecretVaultMemory::new());
+ let store = memory_store(&path, vault).expect("store");
+ seed_store(&store);
+
+ let draft = store
+ .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM")
+ .expect("draft");
+ let sent = store
+ .mark_outbox_message_sent(&draft.outbox_message.outbox_id)
+ .expect("sent")
+ .expect("sent row");
+
+ assert_eq!(sent.outbox_message.status, "sent");
+ assert_eq!(sent.chat_item.delivery_status, "sent");
+ assert!(store.pending_outbox_messages().expect("pending").is_empty());
+ assert_eq!(
+ store
+ .mark_outbox_message_sent(&draft.outbox_message.outbox_id)
+ .expect("sent again")
+ .expect("sent row")
+ .outbox_message
+ .status,
+ "sent"
+ );
+
+ let acknowledged = store
+ .mark_outbox_message_acknowledged(&draft.outbox_message.outbox_id)
+ .expect("acknowledged")
+ .expect("acknowledged row");
+ assert_eq!(acknowledged.outbox_message.status, "acknowledged");
+ assert_eq!(acknowledged.chat_item.delivery_status, "acknowledged");
+ assert_eq!(
+ store
+ .mark_outbox_message_sent(&draft.outbox_message.outbox_id)
+ .expect("sent after acknowledged")
+ .expect("row")
+ .outbox_message
+ .status,
+ "acknowledged"
+ );
+ assert!(
+ store
+ .mark_outbox_message_acknowledged("missing-outbox")
+ .expect("missing")
+ .is_none()
+ );
+ }
+
+ #[test]
fn outbound_text_generates_twelve_byte_base64url_msg_id() {
let temp = tempfile::tempdir().expect("temp");
let path = temp.path().join("simplex.sqlite");