lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit f1c95fc48520492078d6d15f8bdd5b8e2a637c05
parent 1fb597064489e6b5f8655506196e407b077f103f
Author: triesap <tyson@radroots.org>
Date:   Tue, 23 Jun 2026 09:54:27 +0000

simplex: harden message lifecycle state

- split inbound frame ACK state from decoded app child events
- add durable runtime correlation for queued outbox messages
- replay inbound ACKs from frame-specific protected runtime targets
- cover multi-child frames, ACK handles, and delivery transitions

Diffstat:
Mcrates/simplex_agent_runtime/src/runtime.rs | 18+++---------------
Mcrates/simplex_agent_store/src/lib.rs | 5+++--
Mcrates/simplex_agent_store/src/store.rs | 130+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mcrates/simplex_app_store/src/lib.rs | 20+++++++++++---------
Mcrates/simplex_app_store/src/model.rs | 19+++++++++++++++++++
Mcrates/simplex_app_store/src/store.rs | 578++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
6 files changed, 649 insertions(+), 121 deletions(-)

diff --git a/crates/simplex_agent_runtime/src/runtime.rs b/crates/simplex_agent_runtime/src/runtime.rs @@ -730,24 +730,12 @@ impl RadrootsSimplexAgentRuntime { { return Ok(()); } - let receive_queue = self + let (receive_queue, broker_message_id) = self .store - .connection(connection_id)? - .last_received_queue - .clone() - .ok_or_else(|| { - RadrootsSimplexAgentRuntimeError::Runtime(format!( - "SimpleX connection `{connection_id}` has no received queue to acknowledge" - )) - })?; - let broker_message_id = self - .store - .connection(connection_id)? - .last_received_broker_message_id - .clone() + .inbound_ack_target(connection_id, message_id, &message_hash)? .ok_or_else(|| { RadrootsSimplexAgentRuntimeError::Runtime(format!( - "SimpleX connection `{connection_id}` has no broker message id to acknowledge" + "SimpleX connection `{connection_id}` has no frame-specific ACK target for message `{message_id}`" )) })?; self.store.enqueue_command( diff --git a/crates/simplex_agent_store/src/lib.rs b/crates/simplex_agent_store/src/lib.rs @@ -16,7 +16,8 @@ pub mod prelude { RadrootsSimplexAgentPendingCommandKind, RadrootsSimplexAgentPqKeypair, RadrootsSimplexAgentPreparedOutboundMessage, RadrootsSimplexAgentQueueAuthState, RadrootsSimplexAgentQueueRecord, RadrootsSimplexAgentQueueRole, - RadrootsSimplexAgentRecentMessageRecord, RadrootsSimplexAgentShortLinkCredentials, - RadrootsSimplexAgentStore, RadrootsSimplexAgentX3dhKeypair, + RadrootsSimplexAgentRecentMessageRecord, RadrootsSimplexAgentRecentQueueAddress, + RadrootsSimplexAgentShortLinkCredentials, RadrootsSimplexAgentStore, + RadrootsSimplexAgentX3dhKeypair, }; } diff --git a/crates/simplex_agent_store/src/store.rs b/crates/simplex_agent_store/src/store.rs @@ -139,6 +139,47 @@ pub struct RadrootsSimplexAgentDeliveryCursor { pub struct RadrootsSimplexAgentRecentMessageRecord { pub message_id: RadrootsSimplexAgentMessageId, pub message_hash: Vec<u8>, + #[cfg_attr( + feature = "std", + serde(default, skip_serializing_if = "Option::is_none") + )] + pub inbound_queue: Option<RadrootsSimplexAgentRecentQueueAddress>, + #[cfg_attr( + feature = "std", + serde(default, skip_serializing_if = "Option::is_none") + )] + pub inbound_broker_message_id: Option<Vec<u8>>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize))] +pub struct RadrootsSimplexAgentRecentQueueAddress { + pub server_identity: String, + pub hosts: Vec<String>, + pub port: Option<u16>, + pub sender_id: Vec<u8>, +} + +impl RadrootsSimplexAgentRecentQueueAddress { + fn from_queue_address(queue: &RadrootsSimplexAgentQueueAddress) -> Self { + Self { + server_identity: queue.server.server_identity.clone(), + hosts: queue.server.hosts.clone(), + port: queue.server.port, + sender_id: queue.sender_id.clone(), + } + } + + fn into_queue_address(self) -> RadrootsSimplexAgentQueueAddress { + RadrootsSimplexAgentQueueAddress { + server: RadrootsSimplexSmpServerAddress { + server_identity: self.server_identity, + hosts: self.hosts, + port: self.port, + }, + sender_id: self.sender_id, + } + } } #[derive(Debug, Clone, PartialEq, Eq)] @@ -1062,6 +1103,8 @@ impl RadrootsSimplexAgentStore { .push(RadrootsSimplexAgentRecentMessageRecord { message_id: staged.message_id, message_hash: staged.message_hash.clone(), + inbound_queue: None, + inbound_broker_message_id: None, }); Ok(staged) } @@ -1099,13 +1142,17 @@ impl RadrootsSimplexAgentStore { let connection = self.connection_mut(connection_id)?; connection.delivery_cursor.last_received_message_id = Some(message_id); connection.delivery_cursor.last_received_message_hash = Some(message_hash.clone()); - connection.last_received_queue = Some(queue_address); - connection.last_received_broker_message_id = Some(broker_message_id); + connection.last_received_queue = Some(queue_address.clone()); + connection.last_received_broker_message_id = Some(broker_message_id.clone()); connection .recent_messages .push(RadrootsSimplexAgentRecentMessageRecord { message_id, message_hash, + inbound_queue: Some(RadrootsSimplexAgentRecentQueueAddress::from_queue_address( + &queue_address, + )), + inbound_broker_message_id: Some(broker_message_id), }); Ok(()) } @@ -1146,6 +1193,29 @@ impl RadrootsSimplexAgentStore { }) } + pub fn inbound_ack_target( + &self, + connection_id: &str, + message_id: RadrootsSimplexAgentMessageId, + message_hash: &[u8], + ) -> Result<Option<(RadrootsSimplexAgentQueueAddress, Vec<u8>)>, RadrootsSimplexAgentStoreError> + { + let connection = self.connection(connection_id)?; + Ok(connection + .recent_messages + .iter() + .rev() + .find(|message| { + message.message_id == message_id && message.message_hash == message_hash + }) + .and_then(|message| { + Some(( + message.inbound_queue.clone()?.into_queue_address(), + message.inbound_broker_message_id.clone()?, + )) + })) + } + pub fn take_ready_commands( &mut self, now: u64, @@ -3558,6 +3628,62 @@ mod tests { assert_eq!(cursor.last_sent_message_hash, Some(b"ciphertext".to_vec())); } + #[test] + fn inbound_ack_target_uses_frame_specific_queue_after_cursor_moves() { + let mut store = RadrootsSimplexAgentStore::new(); + let connection = store.create_connection( + RadrootsSimplexAgentConnectionMode::Direct, + RadrootsSimplexAgentConnectionStatus::Connected, + None, + None, + ); + let first_queue = sample_descriptor(true).queue_address(); + let second_queue = sample_descriptor_with_uri( + "smp://aGVsbG8@relay.example/c2Vjb25k#/?v=4&dh=Zm9vYmFy&q=m", + true, + ) + .queue_address(); + + store + .record_inbound_message( + &connection.id, + first_queue.clone(), + b"first-broker-message".to_vec(), + 7, + b"first-message-hash".to_vec(), + ) + .unwrap(); + store + .record_inbound_message( + &connection.id, + second_queue.clone(), + b"second-broker-message".to_vec(), + 8, + b"second-message-hash".to_vec(), + ) + .unwrap(); + + assert_eq!( + store + .connection(&connection.id) + .unwrap() + .last_received_queue, + Some(second_queue.clone()) + ); + assert_eq!( + store + .inbound_ack_target(&connection.id, 7, b"first-message-hash") + .unwrap(), + Some((first_queue, b"first-broker-message".to_vec())) + ); + assert_eq!( + store + .inbound_ack_target(&connection.id, 8, b"second-message-hash") + .unwrap(), + Some((second_queue, b"second-broker-message".to_vec())) + ); + } + #[cfg(feature = "std")] #[test] fn flush_and_reopen_persisted_store_state() { diff --git a/crates/simplex_app_store/src/lib.rs b/crates/simplex_app_store/src/lib.rs @@ -15,11 +15,12 @@ pub mod prelude { pub use crate::model::{ RadrootsSimplexAppChatDirection, RadrootsSimplexAppChatItem, RadrootsSimplexAppConnection, RadrootsSimplexAppContact, RadrootsSimplexAppConversation, RadrootsSimplexAppDiagnostics, - RadrootsSimplexAppInboundCommit, RadrootsSimplexAppInboundMessageLogEntry, - RadrootsSimplexAppInboundTextRequest, RadrootsSimplexAppInboundUnsupportedEventRequest, - RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppOutboundTextRequest, - RadrootsSimplexAppOutboxMessage, RadrootsSimplexAppProfile, - RadrootsSimplexAppQueueEndpoint, RadrootsSimplexAppUnsupportedProtocolEvent, + RadrootsSimplexAppInboundChildEvent, RadrootsSimplexAppInboundCommit, + RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppInboundTextRequest, + RadrootsSimplexAppInboundUnsupportedEventRequest, RadrootsSimplexAppOutboundTextDraft, + RadrootsSimplexAppOutboundTextRequest, RadrootsSimplexAppOutboxMessage, + RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint, + RadrootsSimplexAppUnsupportedProtocolEvent, }; #[cfg(all(feature = "std", feature = "sqlcipher"))] pub use crate::store::RadrootsSimplexAppStore; @@ -29,10 +30,11 @@ pub use error::RadrootsSimplexAppStoreError; pub use model::{ RadrootsSimplexAppChatDirection, RadrootsSimplexAppChatItem, RadrootsSimplexAppConnection, RadrootsSimplexAppContact, RadrootsSimplexAppConversation, RadrootsSimplexAppDiagnostics, - RadrootsSimplexAppInboundCommit, RadrootsSimplexAppInboundMessageLogEntry, - RadrootsSimplexAppInboundTextRequest, RadrootsSimplexAppInboundUnsupportedEventRequest, - RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppOutboundTextRequest, - RadrootsSimplexAppOutboxMessage, RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint, + RadrootsSimplexAppInboundChildEvent, RadrootsSimplexAppInboundCommit, + RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppInboundTextRequest, + RadrootsSimplexAppInboundUnsupportedEventRequest, RadrootsSimplexAppOutboundTextDraft, + RadrootsSimplexAppOutboundTextRequest, RadrootsSimplexAppOutboxMessage, + RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint, RadrootsSimplexAppUnsupportedProtocolEvent, }; #[cfg(all(feature = "std", feature = "sqlcipher"))] diff --git a/crates/simplex_app_store/src/model.rs b/crates/simplex_app_store/src/model.rs @@ -93,6 +93,7 @@ pub struct RadrootsSimplexAppInboundMessageLogEntry { pub broker_message_id_hash: Vec<u8>, pub inbound_sequence: Option<i64>, pub message_hash: Vec<u8>, + pub runtime_ack_handle: String, pub ack_status: String, pub app_record_kind: String, pub app_record_id: String, @@ -100,6 +101,18 @@ pub struct RadrootsSimplexAppInboundMessageLogEntry { } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RadrootsSimplexAppInboundChildEvent { + pub child_event_id: String, + pub inbound_id: String, + pub child_ordinal: u32, + pub app_record_kind: String, + pub app_record_id: String, + pub event_kind: String, + pub chat_msg_id: Option<String>, + pub received_at_unix: i64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RadrootsSimplexAppOutboxMessage { pub outbox_id: String, pub chat_item_id: String, @@ -108,6 +121,7 @@ pub struct RadrootsSimplexAppOutboxMessage { pub chat_msg_id: String, pub body: String, pub status: String, + pub runtime_message_id: Option<i64>, pub retry_after_unix: Option<i64>, pub created_at_unix: i64, } @@ -133,6 +147,8 @@ pub struct RadrootsSimplexAppInboundTextRequest { pub broker_message_id_hash: Vec<u8>, pub inbound_sequence: Option<i64>, pub message_hash: Vec<u8>, + pub runtime_ack_handle: String, + pub child_ordinal: u32, pub chat_msg_id: Option<String>, pub body: String, pub received_at_unix: i64, @@ -144,6 +160,8 @@ pub struct RadrootsSimplexAppInboundUnsupportedEventRequest { pub broker_message_id_hash: Vec<u8>, pub inbound_sequence: Option<i64>, pub message_hash: Vec<u8>, + pub runtime_ack_handle: String, + pub child_ordinal: u32, pub event_kind: String, pub payload_json: String, pub received_at_unix: i64, @@ -152,6 +170,7 @@ pub struct RadrootsSimplexAppInboundUnsupportedEventRequest { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RadrootsSimplexAppInboundCommit { pub inbound: RadrootsSimplexAppInboundMessageLogEntry, + pub child_event: RadrootsSimplexAppInboundChildEvent, pub chat_item: Option<RadrootsSimplexAppChatItem>, pub unsupported_event: Option<RadrootsSimplexAppUnsupportedProtocolEvent>, pub duplicate: bool, diff --git a/crates/simplex_app_store/src/store.rs b/crates/simplex_app_store/src/store.rs @@ -2,10 +2,11 @@ use crate::error::RadrootsSimplexAppStoreError; use crate::model::{ RadrootsSimplexAppChatDirection, RadrootsSimplexAppChatItem, RadrootsSimplexAppConnection, RadrootsSimplexAppContact, RadrootsSimplexAppConversation, RadrootsSimplexAppDiagnostics, - RadrootsSimplexAppInboundCommit, RadrootsSimplexAppInboundMessageLogEntry, - RadrootsSimplexAppInboundTextRequest, RadrootsSimplexAppInboundUnsupportedEventRequest, - RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppOutboundTextRequest, - RadrootsSimplexAppOutboxMessage, RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint, + RadrootsSimplexAppInboundChildEvent, RadrootsSimplexAppInboundCommit, + RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppInboundTextRequest, + RadrootsSimplexAppInboundUnsupportedEventRequest, RadrootsSimplexAppOutboundTextDraft, + RadrootsSimplexAppOutboundTextRequest, RadrootsSimplexAppOutboxMessage, + RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint, RadrootsSimplexAppUnsupportedProtocolEvent, }; use alloc::format; @@ -25,7 +26,7 @@ use std::path::Path; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use zeroize::Zeroize; -const CURRENT_SCHEMA_VERSION: i64 = 3; +const CURRENT_SCHEMA_VERSION: i64 = 4; const DEFAULT_KEYCHAIN_SERVICE: &str = "org.radroots.simplex.app-store"; const DATABASE_KEY_BYTES: usize = 32; const CHAT_MSG_ID_BYTES: usize = 12; @@ -339,13 +340,14 @@ impl RadrootsSimplexAppStore { chat_msg_id: chat_msg_id.to_owned(), body: request.body.clone(), status: "pending".to_owned(), + runtime_message_id: None, retry_after_unix: None, created_at_unix: request.created_at_unix, }; transaction.execute( "INSERT INTO outbox_messages - (outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, retry_after_unix, created_at_unix) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + (outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, runtime_message_id, retry_after_unix, created_at_unix) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", params![ outbox_message.outbox_id, outbox_message.chat_item_id, @@ -354,6 +356,7 @@ impl RadrootsSimplexAppStore { outbox_message.chat_msg_id, outbox_message.body, outbox_message.status, + outbox_message.runtime_message_id, outbox_message.retry_after_unix, outbox_message.created_at_unix ], @@ -388,14 +391,15 @@ impl RadrootsSimplexAppStore { ) -> Result<(), RadrootsSimplexAppStoreError> { self.connection.execute( "INSERT INTO inbound_message_log - (inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + (inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", params![ entry.inbound_id, entry.connection_id, entry.broker_message_id_hash, entry.inbound_sequence, entry.message_hash, + entry.runtime_ack_handle, entry.ack_status, entry.app_record_kind, entry.app_record_id, @@ -411,27 +415,26 @@ impl RadrootsSimplexAppStore { ) -> Result<RadrootsSimplexAppInboundCommit, RadrootsSimplexAppStoreError> { validate_inbound_text_request(request)?; let transaction = self.connection.unchecked_transaction()?; - if let Some(existing) = inbound_commit_by_identity( + let inbound = ensure_inbound_frame( &transaction, &request.connection_id, &request.broker_message_id_hash, request.inbound_sequence, &request.message_hash, - )? { + &request.runtime_ack_handle, + request.received_at_unix, + )?; + if let Some(existing) = + inbound_child_commit_by_ordinal(&transaction, &inbound, request.child_ordinal)? + { transaction.commit()?; return Ok(existing); } - let inbound_id = derive_inbound_local_id( - "inbound", - &request.connection_id, - &request.broker_message_id_hash, - &request.message_hash, - ); - let chat_item_id = derive_inbound_local_id( + let chat_item_id = derive_inbound_child_local_id( "chat", - &request.connection_id, - &request.broker_message_id_hash, - &request.message_hash, + &inbound.inbound_id, + request.child_ordinal, + request.chat_msg_id.as_deref().unwrap_or(""), ); let logical_order = next_logical_order(&transaction, &request.conversation_id)?; let chat_item = RadrootsSimplexAppChatItem { @@ -459,21 +462,26 @@ impl RadrootsSimplexAppStore { chat_item.created_at_unix ], )?; - let inbound = RadrootsSimplexAppInboundMessageLogEntry { - inbound_id, - connection_id: request.connection_id.clone(), - broker_message_id_hash: request.broker_message_id_hash.clone(), - inbound_sequence: request.inbound_sequence, - message_hash: request.message_hash.clone(), - ack_status: "pending_ack".to_owned(), + let child_event = RadrootsSimplexAppInboundChildEvent { + child_event_id: derive_inbound_child_local_id( + "child", + &inbound.inbound_id, + request.child_ordinal, + request.chat_msg_id.as_deref().unwrap_or(""), + ), + inbound_id: inbound.inbound_id.clone(), + child_ordinal: request.child_ordinal, app_record_kind: "chat_item".to_owned(), app_record_id: chat_item_id, + event_kind: "x.msg.new".to_owned(), + chat_msg_id: request.chat_msg_id.clone(), received_at_unix: request.received_at_unix, }; - insert_inbound_log(&transaction, &inbound)?; + insert_inbound_child_event(&transaction, &child_event)?; transaction.commit()?; Ok(RadrootsSimplexAppInboundCommit { inbound, + child_event, chat_item: Some(chat_item), unsupported_event: None, duplicate: false, @@ -486,27 +494,26 @@ impl RadrootsSimplexAppStore { ) -> Result<RadrootsSimplexAppInboundCommit, RadrootsSimplexAppStoreError> { validate_inbound_unsupported_request(request)?; let transaction = self.connection.unchecked_transaction()?; - if let Some(existing) = inbound_commit_by_identity( + let inbound = ensure_inbound_frame( &transaction, &request.connection_id, &request.broker_message_id_hash, request.inbound_sequence, &request.message_hash, - )? { + &request.runtime_ack_handle, + request.received_at_unix, + )?; + if let Some(existing) = + inbound_child_commit_by_ordinal(&transaction, &inbound, request.child_ordinal)? + { transaction.commit()?; return Ok(existing); } - let inbound_id = derive_inbound_local_id( - "inbound", - &request.connection_id, - &request.broker_message_id_hash, - &request.message_hash, - ); - let event_id = derive_inbound_local_id( + let event_id = derive_inbound_child_local_id( "unsupported", - &request.connection_id, - &request.broker_message_id_hash, - &request.message_hash, + &inbound.inbound_id, + request.child_ordinal, + &request.event_kind, ); let unsupported_event = RadrootsSimplexAppUnsupportedProtocolEvent { event_id: event_id.clone(), @@ -529,21 +536,26 @@ impl RadrootsSimplexAppStore { unsupported_event.received_at_unix ], )?; - let inbound = RadrootsSimplexAppInboundMessageLogEntry { - inbound_id, - connection_id: request.connection_id.clone(), - broker_message_id_hash: request.broker_message_id_hash.clone(), - inbound_sequence: request.inbound_sequence, - message_hash: request.message_hash.clone(), - ack_status: "pending_ack".to_owned(), + let child_event = RadrootsSimplexAppInboundChildEvent { + child_event_id: derive_inbound_child_local_id( + "child", + &inbound.inbound_id, + request.child_ordinal, + &request.event_kind, + ), + inbound_id: inbound.inbound_id.clone(), + child_ordinal: request.child_ordinal, app_record_kind: "unsupported_event".to_owned(), app_record_id: event_id, + event_kind: request.event_kind.clone(), + chat_msg_id: None, received_at_unix: request.received_at_unix, }; - insert_inbound_log(&transaction, &inbound)?; + insert_inbound_child_event(&transaction, &child_event)?; transaction.commit()?; Ok(RadrootsSimplexAppInboundCommit { inbound, + child_event, chat_item: None, unsupported_event: Some(unsupported_event), duplicate: false, @@ -554,7 +566,7 @@ impl RadrootsSimplexAppStore { &self, ) -> Result<Vec<RadrootsSimplexAppInboundMessageLogEntry>, RadrootsSimplexAppStoreError> { let mut statement = self.connection.prepare( - "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix + "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix FROM inbound_message_log WHERE ack_status = 'pending_ack' ORDER BY received_at_unix, inbound_id", @@ -584,22 +596,45 @@ impl RadrootsSimplexAppStore { "message hash must not be empty".into(), )); } + let Some(inbound) = self + .connection + .query_row( + "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix + FROM inbound_message_log + WHERE connection_id = ?1 AND inbound_sequence = ?2 AND message_hash = ?3 + LIMIT 1", + params![connection_id, inbound_sequence, message_hash], + inbound_message_from_row, + ) + .optional()? else { + return Ok(None); + }; + self.mark_inbound_ack_delivered_by_handle(&inbound.runtime_ack_handle) + } + + pub fn mark_inbound_ack_delivered_by_handle( + &self, + runtime_ack_handle: &str, + ) -> Result<Option<RadrootsSimplexAppInboundMessageLogEntry>, RadrootsSimplexAppStoreError> + { + if runtime_ack_handle.is_empty() { + return Err(RadrootsSimplexAppStoreError::MessageLifecycle( + "runtime ack handle must not be empty".into(), + )); + } self.connection.execute( "UPDATE inbound_message_log SET ack_status = 'acked' - WHERE connection_id = ?1 - AND inbound_sequence = ?2 - AND message_hash = ?3 - AND ack_status = 'pending_ack'", - params![connection_id, inbound_sequence, message_hash], + WHERE runtime_ack_handle = ?1 AND ack_status = 'pending_ack'", + params![runtime_ack_handle], )?; self.connection .query_row( - "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix + "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix FROM inbound_message_log - WHERE connection_id = ?1 AND inbound_sequence = ?2 AND message_hash = ?3 + WHERE runtime_ack_handle = ?1 LIMIT 1", - params![connection_id, inbound_sequence, message_hash], + params![runtime_ack_handle], inbound_message_from_row, ) .optional() @@ -612,8 +647,8 @@ impl RadrootsSimplexAppStore { ) -> Result<(), RadrootsSimplexAppStoreError> { self.connection.execute( "INSERT INTO outbox_messages - (outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, retry_after_unix, created_at_unix) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + (outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, runtime_message_id, retry_after_unix, created_at_unix) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", params![ message.outbox_id, message.chat_item_id, @@ -622,6 +657,7 @@ impl RadrootsSimplexAppStore { message.chat_msg_id, message.body, message.status, + message.runtime_message_id, message.retry_after_unix, message.created_at_unix ], @@ -633,14 +669,53 @@ impl RadrootsSimplexAppStore { &self, ) -> Result<Vec<RadrootsSimplexAppOutboxMessage>, RadrootsSimplexAppStoreError> { let mut statement = self.connection.prepare( - "SELECT outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, retry_after_unix, created_at_unix + "SELECT outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, runtime_message_id, retry_after_unix, created_at_unix FROM outbox_messages - WHERE status IN ('pending', 'retryable') + WHERE status IN ('pending', 'retryable') AND runtime_message_id IS NULL ORDER BY created_at_unix, outbox_id", )?; collect_rows(statement.query_map([], outbox_message_from_row)?) } + pub fn mark_outbox_message_queued( + &self, + outbox_id: &str, + runtime_message_id: u64, + ) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> { + if outbox_id.is_empty() { + return Err(RadrootsSimplexAppStoreError::MessageLifecycle( + "outbox id must not be empty".into(), + )); + } + let transaction = self.connection.unchecked_transaction()?; + let Some(current) = outbound_text_by_outbox_id(&transaction, outbox_id)? else { + transaction.commit()?; + return Ok(None); + }; + match current.outbox_message.status.as_str() { + "pending" | "retryable" => {} + other => { + return Err(RadrootsSimplexAppStoreError::MessageLifecycle(format!( + "cannot queue outbound message `{outbox_id}` from `{other}`" + ))); + } + } + let runtime_message_id = i64::try_from(runtime_message_id).map_err(|_| { + RadrootsSimplexAppStoreError::MessageLifecycle(format!( + "runtime message id `{runtime_message_id}` exceeds app-store range" + )) + })?; + transaction.execute( + "UPDATE outbox_messages + SET runtime_message_id = ?2 + WHERE outbox_id = ?1", + params![outbox_id, runtime_message_id], + )?; + let updated = outbound_text_by_outbox_id(&transaction, outbox_id)?; + transaction.commit()?; + Ok(updated) + } + pub fn mark_outbox_message_sent( &self, outbox_id: &str, @@ -667,6 +742,20 @@ impl RadrootsSimplexAppStore { )); } let transaction = self.connection.unchecked_transaction()?; + let Some(current) = outbound_text_by_outbox_id(&transaction, outbox_id)? else { + transaction.commit()?; + return Ok(None); + }; + match (current.outbox_message.status.as_str(), status) { + ("pending" | "retryable" | "sent", "sent") + | ("sent" | "acknowledged", "acknowledged") + | ("acknowledged", "sent") => {} + (current_status, next_status) => { + return Err(RadrootsSimplexAppStoreError::MessageLifecycle(format!( + "cannot transition outbound message `{outbox_id}` from `{current_status}` to `{next_status}`" + ))); + } + } if terminal { transaction.execute( "UPDATE outbox_messages SET status = ?2 WHERE outbox_id = ?1", @@ -839,7 +928,7 @@ fn migrate( match user_version { 0 => { let transaction = connection.transaction()?; - apply_schema_v3(&transaction)?; + apply_schema_v4(&transaction)?; transaction.execute( "INSERT INTO encryption_metadata (id, key_slot_digest, key_source, cipher, created_at_unix) @@ -861,6 +950,11 @@ fn migrate( VALUES (3, 'message-lifecycle-inbound', ?1)", params![now_unix_secs()], )?; + transaction.execute( + "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) + VALUES (4, 'message-lifecycle-frame-children', ?1)", + params![now_unix_secs()], + )?; transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?; transaction.commit()?; } @@ -868,6 +962,7 @@ fn migrate( let transaction = connection.transaction()?; apply_migration_v2(&transaction)?; apply_migration_v3(&transaction)?; + apply_migration_v4(&transaction)?; transaction.execute( "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) VALUES (2, 'message-lifecycle-outbound', ?1)", @@ -878,17 +973,39 @@ fn migrate( VALUES (3, 'message-lifecycle-inbound', ?1)", params![now_unix_secs()], )?; + transaction.execute( + "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) + VALUES (4, 'message-lifecycle-frame-children', ?1)", + params![now_unix_secs()], + )?; transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?; transaction.commit()?; } 2 => { let transaction = connection.transaction()?; apply_migration_v3(&transaction)?; + apply_migration_v4(&transaction)?; transaction.execute( "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) VALUES (3, 'message-lifecycle-inbound', ?1)", params![now_unix_secs()], )?; + transaction.execute( + "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) + VALUES (4, 'message-lifecycle-frame-children', ?1)", + params![now_unix_secs()], + )?; + transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?; + transaction.commit()?; + } + 3 => { + let transaction = connection.transaction()?; + apply_migration_v4(&transaction)?; + transaction.execute( + "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) + VALUES (4, 'message-lifecycle-frame-children', ?1)", + params![now_unix_secs()], + )?; transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?; transaction.commit()?; } @@ -902,7 +1019,7 @@ fn migrate( Ok(()) } -fn apply_schema_v3(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> { +fn apply_schema_v4(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> { transaction.execute_batch( " CREATE TABLE encryption_metadata ( @@ -976,6 +1093,7 @@ fn apply_schema_v3(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexA broker_message_id_hash BLOB NOT NULL, inbound_sequence INTEGER, message_hash BLOB NOT NULL, + runtime_ack_handle TEXT NOT NULL, ack_status TEXT NOT NULL, app_record_kind TEXT NOT NULL, app_record_id TEXT NOT NULL, @@ -983,6 +1101,18 @@ fn apply_schema_v3(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexA UNIQUE(connection_id, broker_message_id_hash) ); + CREATE TABLE inbound_child_events ( + child_event_id TEXT PRIMARY KEY, + inbound_id TEXT NOT NULL REFERENCES inbound_message_log(inbound_id) ON DELETE CASCADE, + child_ordinal INTEGER NOT NULL, + app_record_kind TEXT NOT NULL, + app_record_id TEXT NOT NULL, + event_kind TEXT NOT NULL, + chat_msg_id TEXT, + received_at_unix INTEGER NOT NULL, + UNIQUE(inbound_id, child_ordinal) + ); + CREATE TABLE outbox_messages ( outbox_id TEXT PRIMARY KEY, chat_item_id TEXT NOT NULL REFERENCES chat_items(chat_item_id) ON DELETE CASCADE, @@ -991,6 +1121,7 @@ fn apply_schema_v3(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexA chat_msg_id TEXT NOT NULL, body TEXT NOT NULL, status TEXT NOT NULL, + runtime_message_id INTEGER, retry_after_unix INTEGER, created_at_unix INTEGER NOT NULL ); @@ -1015,9 +1146,11 @@ fn apply_schema_v3(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexA CREATE INDEX inbound_message_log_pending_ack_idx ON inbound_message_log(connection_id, inbound_id) WHERE ack_status = 'pending_ack'; + CREATE INDEX inbound_child_events_frame_idx + ON inbound_child_events(inbound_id, child_ordinal); CREATE INDEX outbox_messages_pending_retryable_idx ON outbox_messages(connection_id, outbox_id) - WHERE status IN ('pending', 'retryable'); + WHERE status IN ('pending', 'retryable') AND runtime_message_id IS NULL; CREATE UNIQUE INDEX outbox_messages_connection_msg_id_idx ON outbox_messages(connection_id, chat_msg_id); CREATE UNIQUE INDEX outbox_messages_chat_item_idx @@ -1070,6 +1203,48 @@ fn apply_migration_v3(transaction: &Transaction<'_>) -> Result<(), RadrootsSimpl Ok(()) } +fn apply_migration_v4(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> { + transaction.execute_batch( + " + ALTER TABLE inbound_message_log ADD COLUMN runtime_ack_handle TEXT NOT NULL DEFAULT ''; + ALTER TABLE outbox_messages ADD COLUMN runtime_message_id INTEGER; + CREATE TABLE inbound_child_events ( + child_event_id TEXT PRIMARY KEY, + inbound_id TEXT NOT NULL REFERENCES inbound_message_log(inbound_id) ON DELETE CASCADE, + child_ordinal INTEGER NOT NULL, + app_record_kind TEXT NOT NULL, + app_record_id TEXT NOT NULL, + event_kind TEXT NOT NULL, + chat_msg_id TEXT, + received_at_unix INTEGER NOT NULL, + UNIQUE(inbound_id, child_ordinal) + ); + INSERT INTO inbound_child_events + (child_event_id, inbound_id, child_ordinal, app_record_kind, app_record_id, event_kind, chat_msg_id, received_at_unix) + SELECT + 'child_' || inbound_id, + inbound_id, + 0, + app_record_kind, + app_record_id, + app_record_kind, + NULL, + received_at_unix + FROM inbound_message_log + WHERE app_record_id <> ''; + UPDATE inbound_message_log + SET runtime_ack_handle = 'legacy:' || connection_id || ':' || COALESCE(CAST(inbound_sequence AS TEXT), inbound_id); + CREATE INDEX inbound_child_events_frame_idx + ON inbound_child_events(inbound_id, child_ordinal); + DROP INDEX IF EXISTS outbox_messages_pending_retryable_idx; + CREATE INDEX outbox_messages_pending_retryable_idx + ON outbox_messages(connection_id, outbox_id) + WHERE status IN ('pending', 'retryable') AND runtime_message_id IS NULL; + ", + )?; + Ok(()) +} + fn verify_metadata( connection: &Connection, expected_key_slot_digest: &str, @@ -1148,14 +1323,15 @@ fn insert_inbound_log( ) -> Result<(), RadrootsSimplexAppStoreError> { transaction.execute( "INSERT INTO inbound_message_log - (inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + (inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", params![ inbound.inbound_id, inbound.connection_id, inbound.broker_message_id_hash, inbound.inbound_sequence, inbound.message_hash, + inbound.runtime_ack_handle, inbound.ack_status, inbound.app_record_kind, inbound.app_record_id, @@ -1213,6 +1389,7 @@ fn validate_inbound_text_request( &request.connection_id, &request.broker_message_id_hash, &request.message_hash, + &request.runtime_ack_handle, )?; if request.conversation_id.is_empty() { return Err(RadrootsSimplexAppStoreError::MessageLifecycle( @@ -1237,6 +1414,7 @@ fn validate_inbound_unsupported_request( &request.connection_id, &request.broker_message_id_hash, &request.message_hash, + &request.runtime_ack_handle, )?; if request.event_kind.is_empty() { return Err(RadrootsSimplexAppStoreError::MessageLifecycle( @@ -1255,6 +1433,7 @@ fn validate_inbound_identity( connection_id: &str, broker_message_id_hash: &[u8], message_hash: &[u8], + runtime_ack_handle: &str, ) -> Result<(), RadrootsSimplexAppStoreError> { if connection_id.is_empty() { return Err(RadrootsSimplexAppStoreError::MessageLifecycle( @@ -1271,6 +1450,11 @@ fn validate_inbound_identity( "message hash must not be empty".into(), )); } + if runtime_ack_handle.is_empty() { + return Err(RadrootsSimplexAppStoreError::MessageLifecycle( + "runtime ack handle must not be empty".into(), + )); + } Ok(()) } @@ -1309,6 +1493,7 @@ fn outbound_text_by_msg_id( o.chat_msg_id, o.body, o.status, + o.runtime_message_id, o.retry_after_unix, o.created_at_unix FROM outbox_messages o @@ -1343,6 +1528,7 @@ fn outbound_text_by_outbox_id( o.chat_msg_id, o.body, o.status, + o.runtime_message_id, o.retry_after_unix, o.created_at_unix FROM outbox_messages o @@ -1355,17 +1541,17 @@ fn outbound_text_by_outbox_id( .map_err(Into::into) } -fn inbound_commit_by_identity( +fn inbound_frame_by_identity( transaction: &Transaction<'_>, connection_id: &str, broker_message_id_hash: &[u8], inbound_sequence: Option<i64>, message_hash: &[u8], -) -> Result<Option<RadrootsSimplexAppInboundCommit>, RadrootsSimplexAppStoreError> { - let inbound = match inbound_sequence { +) -> Result<Option<RadrootsSimplexAppInboundMessageLogEntry>, RadrootsSimplexAppStoreError> { + Ok(match inbound_sequence { Some(sequence) => transaction .query_row( - "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix + "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix FROM inbound_message_log WHERE connection_id = ?1 AND (broker_message_id_hash = ?2 OR (inbound_sequence = ?3 AND message_hash = ?4)) @@ -1377,7 +1563,7 @@ fn inbound_commit_by_identity( .optional()?, None => transaction .query_row( - "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix + "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix FROM inbound_message_log WHERE connection_id = ?1 AND broker_message_id_hash = ?2 ORDER BY received_at_unix, inbound_id @@ -1386,28 +1572,108 @@ fn inbound_commit_by_identity( inbound_message_from_row, ) .optional()?, + }) +} + +fn ensure_inbound_frame( + transaction: &Transaction<'_>, + connection_id: &str, + broker_message_id_hash: &[u8], + inbound_sequence: Option<i64>, + message_hash: &[u8], + runtime_ack_handle: &str, + received_at_unix: i64, +) -> Result<RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppStoreError> { + if let Some(existing) = inbound_frame_by_identity( + transaction, + connection_id, + broker_message_id_hash, + inbound_sequence, + message_hash, + )? { + return Ok(existing); + } + let inbound_id = derive_inbound_frame_local_id( + "inbound", + connection_id, + broker_message_id_hash, + message_hash, + ); + let inbound = RadrootsSimplexAppInboundMessageLogEntry { + inbound_id: inbound_id.clone(), + connection_id: connection_id.to_owned(), + broker_message_id_hash: broker_message_id_hash.to_vec(), + inbound_sequence, + message_hash: message_hash.to_vec(), + runtime_ack_handle: runtime_ack_handle.to_owned(), + ack_status: "pending_ack".to_owned(), + app_record_kind: "frame".to_owned(), + app_record_id: inbound_id, + received_at_unix, }; - let Some(inbound) = inbound else { + insert_inbound_log(transaction, &inbound)?; + Ok(inbound) +} + +fn inbound_child_commit_by_ordinal( + transaction: &Transaction<'_>, + inbound: &RadrootsSimplexAppInboundMessageLogEntry, + child_ordinal: u32, +) -> Result<Option<RadrootsSimplexAppInboundCommit>, RadrootsSimplexAppStoreError> { + let child_event = transaction + .query_row( + "SELECT child_event_id, inbound_id, child_ordinal, app_record_kind, app_record_id, event_kind, chat_msg_id, received_at_unix + FROM inbound_child_events + WHERE inbound_id = ?1 AND child_ordinal = ?2 + LIMIT 1", + params![inbound.inbound_id, i64::from(child_ordinal)], + inbound_child_event_from_row, + ) + .optional()?; + let Some(child_event) = child_event else { return Ok(None); }; - let chat_item = if inbound.app_record_kind == "chat_item" { - chat_item_by_id(transaction, &inbound.app_record_id)? + let chat_item = if child_event.app_record_kind == "chat_item" { + chat_item_by_id(transaction, &child_event.app_record_id)? } else { None }; - let unsupported_event = if inbound.app_record_kind == "unsupported_event" { - unsupported_event_by_id(transaction, &inbound.app_record_id)? + let unsupported_event = if child_event.app_record_kind == "unsupported_event" { + unsupported_event_by_id(transaction, &child_event.app_record_id)? } else { None }; Ok(Some(RadrootsSimplexAppInboundCommit { - inbound, + inbound: inbound.clone(), + child_event, chat_item, unsupported_event, duplicate: true, })) } +fn insert_inbound_child_event( + transaction: &Transaction<'_>, + child_event: &RadrootsSimplexAppInboundChildEvent, +) -> Result<(), RadrootsSimplexAppStoreError> { + transaction.execute( + "INSERT INTO inbound_child_events + (child_event_id, inbound_id, child_ordinal, app_record_kind, app_record_id, event_kind, chat_msg_id, received_at_unix) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + params![ + child_event.child_event_id, + child_event.inbound_id, + i64::from(child_event.child_ordinal), + child_event.app_record_kind, + child_event.app_record_id, + child_event.event_kind, + child_event.chat_msg_id, + child_event.received_at_unix + ], + )?; + Ok(()) +} + fn chat_item_by_id( transaction: &Transaction<'_>, chat_item_id: &str, @@ -1451,7 +1717,7 @@ fn derive_outbound_local_id(prefix: &str, connection_id: &str, chat_msg_id: &str format!("{prefix}_{}", hex::encode(&digest[..16])) } -fn derive_inbound_local_id( +fn derive_inbound_frame_local_id( prefix: &str, connection_id: &str, broker_message_id_hash: &[u8], @@ -1469,6 +1735,24 @@ fn derive_inbound_local_id( format!("{prefix}_{}", hex::encode(&digest[..16])) } +fn derive_inbound_child_local_id( + prefix: &str, + inbound_id: &str, + child_ordinal: u32, + key: &str, +) -> String { + let mut hasher = Sha256::new(); + hasher.update(prefix.as_bytes()); + hasher.update([0]); + hasher.update(inbound_id.as_bytes()); + hasher.update([0]); + hasher.update(child_ordinal.to_be_bytes()); + hasher.update([0]); + hasher.update(key.as_bytes()); + let digest = hasher.finalize(); + format!("{prefix}_{}", hex::encode(&digest[..16])) +} + fn profile_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexAppProfile> { Ok(RadrootsSimplexAppProfile { profile_id: row.get(0)?, @@ -1549,8 +1833,9 @@ fn outbound_text_draft_from_row( chat_msg_id: row.get(12)?, body: row.get(13)?, status: row.get(14)?, - retry_after_unix: row.get(15)?, - created_at_unix: row.get(16)?, + runtime_message_id: row.get(15)?, + retry_after_unix: row.get(16)?, + created_at_unix: row.get(17)?, }, }) } @@ -1564,10 +1849,34 @@ fn inbound_message_from_row( broker_message_id_hash: row.get(2)?, inbound_sequence: row.get(3)?, message_hash: row.get(4)?, - ack_status: row.get(5)?, - app_record_kind: row.get(6)?, - app_record_id: row.get(7)?, - received_at_unix: row.get(8)?, + runtime_ack_handle: row.get(5)?, + ack_status: row.get(6)?, + app_record_kind: row.get(7)?, + app_record_id: row.get(8)?, + received_at_unix: row.get(9)?, + }) +} + +fn inbound_child_event_from_row( + row: &Row<'_>, +) -> rusqlite::Result<RadrootsSimplexAppInboundChildEvent> { + let child_ordinal: i64 = row.get(2)?; + let child_ordinal = u32::try_from(child_ordinal).map_err(|error| { + rusqlite::Error::FromSqlConversionFailure( + 2, + rusqlite::types::Type::Integer, + Box::new(error), + ) + })?; + Ok(RadrootsSimplexAppInboundChildEvent { + child_event_id: row.get(0)?, + inbound_id: row.get(1)?, + child_ordinal, + app_record_kind: row.get(3)?, + app_record_id: row.get(4)?, + event_kind: row.get(5)?, + chat_msg_id: row.get(6)?, + received_at_unix: row.get(7)?, }) } @@ -1580,8 +1889,9 @@ fn outbox_message_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexApp chat_msg_id: row.get(4)?, body: row.get(5)?, status: row.get(6)?, - retry_after_unix: row.get(7)?, - created_at_unix: row.get(8)?, + runtime_message_id: row.get(7)?, + retry_after_unix: row.get(8)?, + created_at_unix: row.get(9)?, }) } @@ -1687,6 +1997,8 @@ mod tests { broker_message_id_hash: b"broker-message-hash-1".to_vec(), inbound_sequence: Some(21), message_hash: b"agent-message-hash-1".to_vec(), + runtime_ack_handle: "ack-handle-1".into(), + child_ordinal: 0, chat_msg_id: Some("AQIDBAUGBwgJCgsM".into()), body: "hello from the iPhone".into(), received_at_unix: 12, @@ -1699,6 +2011,8 @@ mod tests { broker_message_id_hash: b"broker-message-hash-2".to_vec(), inbound_sequence: Some(22), message_hash: b"agent-message-hash-2".to_vec(), + runtime_ack_handle: "ack-handle-2".into(), + child_ordinal: 0, event_kind: "x.future.dm".into(), payload_json: "{\"event\":\"x.future.dm\"}".into(), received_at_unix: 13, @@ -1715,8 +2029,8 @@ mod tests { let diagnostics = store.diagnostics(); assert!(diagnostics.encrypted); assert!(!diagnostics.cipher.is_empty()); - assert_eq!(diagnostics.schema_version, 3); - assert_eq!(diagnostics.migration_count, 3); + assert_eq!(diagnostics.schema_version, 4); + assert_eq!(diagnostics.migration_count, 4); assert!(diagnostics.foreign_keys_enabled); assert!(diagnostics.wal_enabled); assert_eq!(diagnostics.key_source, "memory"); @@ -1784,6 +2098,7 @@ mod tests { broker_message_id_hash: b"broker-hash".to_vec(), inbound_sequence: Some(1), message_hash: b"message-hash".to_vec(), + runtime_ack_handle: "ack-handle-manual".into(), ack_status: "pending_ack".into(), app_record_kind: "chat_item".into(), app_record_id: "chat-2".into(), @@ -1804,6 +2119,7 @@ mod tests { chat_msg_id: "AQIDBAUGBwgJCgsM".into(), body: "queued plaintext before encryption".into(), status: "retryable".into(), + runtime_message_id: None, retry_after_unix: Some(9), created_at_unix: 9, }) @@ -1948,6 +2264,7 @@ mod tests { broker_message_id_hash: b"dedupe".to_vec(), inbound_sequence: Some(1), message_hash: b"hash".to_vec(), + runtime_ack_handle: "ack-handle-dedupe".into(), ack_status: "pending_ack".into(), app_record_kind: "chat_item".into(), app_record_id: "chat-1".into(), @@ -1975,9 +2292,11 @@ mod tests { assert!(!commit.duplicate); assert_eq!(commit.inbound.ack_status, "pending_ack"); - assert_eq!(commit.inbound.app_record_kind, "chat_item"); + assert_eq!(commit.inbound.app_record_kind, "frame"); + assert_eq!(commit.inbound.runtime_ack_handle, "ack-handle-1"); let chat_item = commit.chat_item.expect("chat item"); - assert_eq!(commit.inbound.app_record_id, chat_item.chat_item_id); + assert_eq!(commit.child_event.app_record_kind, "chat_item"); + assert_eq!(commit.child_event.app_record_id, chat_item.chat_item_id); assert_eq!( chat_item.direction, RadrootsSimplexAppChatDirection::Inbound @@ -2020,6 +2339,36 @@ mod tests { } #[test] + fn inbound_frame_persists_multiple_child_events_with_one_pending_ack() { + let temp = tempfile::tempdir().expect("temp"); + let path = temp.path().join("simplex.sqlite"); + let vault = Arc::new(RadrootsSecretVaultMemory::new()); + let store = memory_store(&path, vault).expect("store"); + seed_store(&store); + + let first = store + .commit_inbound_text(&inbound_text_request()) + .expect("first"); + let second = store + .commit_inbound_text(&RadrootsSimplexAppInboundTextRequest { + child_ordinal: 1, + chat_msg_id: Some("AgIDBAUGBwgJCgsM".into()), + body: "second child event".into(), + ..inbound_text_request() + }) + .expect("second"); + + assert_eq!(first.inbound.inbound_id, second.inbound.inbound_id); + assert_eq!(first.child_event.child_ordinal, 0); + assert_eq!(second.child_event.child_ordinal, 1); + assert_eq!(store.pending_ack_messages().expect("pending").len(), 1); + assert_eq!( + store.chat_page("conversation-1", 10).expect("page").len(), + 2 + ); + } + + #[test] fn inbound_unsupported_event_commit_persists_safe_record_and_pending_ack() { let temp = tempfile::tempdir().expect("temp"); let path = temp.path().join("simplex.sqlite"); @@ -2033,9 +2382,10 @@ mod tests { assert!(!commit.duplicate); assert_eq!(commit.inbound.ack_status, "pending_ack"); - assert_eq!(commit.inbound.app_record_kind, "unsupported_event"); + assert_eq!(commit.inbound.app_record_kind, "frame"); let unsupported = commit.unsupported_event.expect("unsupported event"); - assert_eq!(commit.inbound.app_record_id, unsupported.event_id); + assert_eq!(commit.child_event.app_record_kind, "unsupported_event"); + assert_eq!(commit.child_event.app_record_id, unsupported.event_id); assert_eq!(unsupported.event_kind, "x.future.dm"); assert_eq!(unsupported.status, "stored"); assert_eq!( @@ -2161,6 +2511,26 @@ mod tests { } #[test] + fn outbound_runtime_correlation_removes_message_from_retry_queue() { + let temp = tempfile::tempdir().expect("temp"); + let path = temp.path().join("simplex.sqlite"); + let vault = Arc::new(RadrootsSecretVaultMemory::new()); + let store = memory_store(&path, vault).expect("store"); + seed_store(&store); + + let draft = store + .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") + .expect("draft"); + let queued = store + .mark_outbox_message_queued(&draft.outbox_message.outbox_id, 42) + .expect("queued") + .expect("queued row"); + + assert_eq!(queued.outbox_message.runtime_message_id, Some(42)); + assert!(store.pending_outbox_messages().expect("pending").is_empty()); + } + + #[test] fn outbound_delivery_state_updates_are_idempotent() { let temp = tempfile::tempdir().expect("temp"); let path = temp.path().join("simplex.sqlite"); @@ -2213,6 +2583,28 @@ mod tests { } #[test] + fn outbound_delivery_transitions_fail_closed() { + let temp = tempfile::tempdir().expect("temp"); + let path = temp.path().join("simplex.sqlite"); + let vault = Arc::new(RadrootsSecretVaultMemory::new()); + let store = memory_store(&path, vault).expect("store"); + seed_store(&store); + + let draft = store + .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") + .expect("draft"); + let error = store + .mark_outbox_message_acknowledged(&draft.outbox_message.outbox_id) + .err() + .expect("transition error"); + + assert!(matches!( + error, + RadrootsSimplexAppStoreError::MessageLifecycle(_) + )); + } + + #[test] fn outbound_text_generates_twelve_byte_base64url_msg_id() { let temp = tempfile::tempdir().expect("temp"); let path = temp.path().join("simplex.sqlite");