commit 5dc82088e9e9c544314e679bf878421cff9b28c2
parent 052890803eca186461aa28f39870ba5461cdcd5c
Author: triesap <tyson@radroots.org>
Date: Tue, 23 Jun 2026 08:06:57 +0000
simplex: add durable inbound app commits
- add inbound text and unsupported-event commit APIs
- link inbound logs to committed app records with pending_ack state
- deduplicate redelivery by broker and protocol message identity
- cover inbound commit, duplicate, unsupported, and pre-commit failure paths
Diffstat:
3 files changed, 579 insertions(+), 23 deletions(-)
diff --git a/crates/simplex_app_store/src/lib.rs b/crates/simplex_app_store/src/lib.rs
@@ -15,10 +15,11 @@ pub mod prelude {
pub use crate::model::{
RadrootsSimplexAppChatDirection, RadrootsSimplexAppChatItem, RadrootsSimplexAppConnection,
RadrootsSimplexAppContact, RadrootsSimplexAppConversation, RadrootsSimplexAppDiagnostics,
- RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppOutboundTextDraft,
- RadrootsSimplexAppOutboundTextRequest, RadrootsSimplexAppOutboxMessage,
- RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint,
- RadrootsSimplexAppUnsupportedProtocolEvent,
+ RadrootsSimplexAppInboundCommit, RadrootsSimplexAppInboundMessageLogEntry,
+ RadrootsSimplexAppInboundTextRequest, RadrootsSimplexAppInboundUnsupportedEventRequest,
+ RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppOutboundTextRequest,
+ RadrootsSimplexAppOutboxMessage, RadrootsSimplexAppProfile,
+ RadrootsSimplexAppQueueEndpoint, RadrootsSimplexAppUnsupportedProtocolEvent,
};
#[cfg(all(feature = "std", feature = "sqlcipher"))]
pub use crate::store::RadrootsSimplexAppStore;
@@ -28,9 +29,10 @@ pub use error::RadrootsSimplexAppStoreError;
pub use model::{
RadrootsSimplexAppChatDirection, RadrootsSimplexAppChatItem, RadrootsSimplexAppConnection,
RadrootsSimplexAppContact, RadrootsSimplexAppConversation, RadrootsSimplexAppDiagnostics,
- RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppOutboundTextDraft,
- RadrootsSimplexAppOutboundTextRequest, RadrootsSimplexAppOutboxMessage,
- RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint,
+ RadrootsSimplexAppInboundCommit, RadrootsSimplexAppInboundMessageLogEntry,
+ RadrootsSimplexAppInboundTextRequest, RadrootsSimplexAppInboundUnsupportedEventRequest,
+ RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppOutboundTextRequest,
+ RadrootsSimplexAppOutboxMessage, RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint,
RadrootsSimplexAppUnsupportedProtocolEvent,
};
#[cfg(all(feature = "std", feature = "sqlcipher"))]
diff --git a/crates/simplex_app_store/src/model.rs b/crates/simplex_app_store/src/model.rs
@@ -94,6 +94,8 @@ pub struct RadrootsSimplexAppInboundMessageLogEntry {
pub inbound_sequence: Option<i64>,
pub message_hash: Vec<u8>,
pub ack_status: String,
+ pub app_record_kind: String,
+ pub app_record_id: String,
pub received_at_unix: i64,
}
@@ -125,6 +127,37 @@ pub struct RadrootsSimplexAppOutboundTextDraft {
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct RadrootsSimplexAppInboundTextRequest {
+ pub connection_id: String,
+ pub conversation_id: String,
+ pub broker_message_id_hash: Vec<u8>,
+ pub inbound_sequence: Option<i64>,
+ pub message_hash: Vec<u8>,
+ pub chat_msg_id: Option<String>,
+ pub body: String,
+ pub received_at_unix: i64,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct RadrootsSimplexAppInboundUnsupportedEventRequest {
+ pub connection_id: String,
+ pub broker_message_id_hash: Vec<u8>,
+ pub inbound_sequence: Option<i64>,
+ pub message_hash: Vec<u8>,
+ pub event_kind: String,
+ pub payload_json: String,
+ pub received_at_unix: i64,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct RadrootsSimplexAppInboundCommit {
+ pub inbound: RadrootsSimplexAppInboundMessageLogEntry,
+ pub chat_item: Option<RadrootsSimplexAppChatItem>,
+ pub unsupported_event: Option<RadrootsSimplexAppUnsupportedProtocolEvent>,
+ pub duplicate: bool,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RadrootsSimplexAppUnsupportedProtocolEvent {
pub event_id: String,
pub connection_id: Option<String>,
diff --git a/crates/simplex_app_store/src/store.rs b/crates/simplex_app_store/src/store.rs
@@ -2,9 +2,10 @@ use crate::error::RadrootsSimplexAppStoreError;
use crate::model::{
RadrootsSimplexAppChatDirection, RadrootsSimplexAppChatItem, RadrootsSimplexAppConnection,
RadrootsSimplexAppContact, RadrootsSimplexAppConversation, RadrootsSimplexAppDiagnostics,
- RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppOutboundTextDraft,
- RadrootsSimplexAppOutboundTextRequest, RadrootsSimplexAppOutboxMessage,
- RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint,
+ RadrootsSimplexAppInboundCommit, RadrootsSimplexAppInboundMessageLogEntry,
+ RadrootsSimplexAppInboundTextRequest, RadrootsSimplexAppInboundUnsupportedEventRequest,
+ RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppOutboundTextRequest,
+ RadrootsSimplexAppOutboxMessage, RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint,
RadrootsSimplexAppUnsupportedProtocolEvent,
};
use alloc::format;
@@ -24,7 +25,7 @@ use std::path::Path;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use zeroize::Zeroize;
-const CURRENT_SCHEMA_VERSION: i64 = 2;
+const CURRENT_SCHEMA_VERSION: i64 = 3;
const DEFAULT_KEYCHAIN_SERVICE: &str = "org.radroots.simplex.app-store";
const DATABASE_KEY_BYTES: usize = 32;
const CHAT_MSG_ID_BYTES: usize = 12;
@@ -387,8 +388,8 @@ impl RadrootsSimplexAppStore {
) -> Result<(), RadrootsSimplexAppStoreError> {
self.connection.execute(
"INSERT INTO inbound_message_log
- (inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, received_at_unix)
- VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
+ (inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
params![
entry.inbound_id,
entry.connection_id,
@@ -396,19 +397,166 @@ impl RadrootsSimplexAppStore {
entry.inbound_sequence,
entry.message_hash,
entry.ack_status,
+ entry.app_record_kind,
+ entry.app_record_id,
entry.received_at_unix
],
)?;
Ok(())
}
+ pub fn commit_inbound_text(
+ &self,
+ request: &RadrootsSimplexAppInboundTextRequest,
+ ) -> Result<RadrootsSimplexAppInboundCommit, RadrootsSimplexAppStoreError> {
+ validate_inbound_text_request(request)?;
+ let transaction = self.connection.unchecked_transaction()?;
+ if let Some(existing) = inbound_commit_by_identity(
+ &transaction,
+ &request.connection_id,
+ &request.broker_message_id_hash,
+ request.inbound_sequence,
+ &request.message_hash,
+ )? {
+ transaction.commit()?;
+ return Ok(existing);
+ }
+ let inbound_id = derive_inbound_local_id(
+ "inbound",
+ &request.connection_id,
+ &request.broker_message_id_hash,
+ &request.message_hash,
+ );
+ let chat_item_id = derive_inbound_local_id(
+ "chat",
+ &request.connection_id,
+ &request.broker_message_id_hash,
+ &request.message_hash,
+ );
+ let logical_order = next_logical_order(&transaction, &request.conversation_id)?;
+ let chat_item = RadrootsSimplexAppChatItem {
+ chat_item_id: chat_item_id.clone(),
+ conversation_id: request.conversation_id.clone(),
+ logical_order,
+ direction: RadrootsSimplexAppChatDirection::Inbound,
+ chat_msg_id: request.chat_msg_id.clone(),
+ body: request.body.clone(),
+ delivery_status: "received".to_owned(),
+ created_at_unix: request.received_at_unix,
+ };
+ transaction.execute(
+ "INSERT INTO chat_items
+ (chat_item_id, conversation_id, logical_order, direction, chat_msg_id, body, delivery_status, created_at_unix)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
+ params![
+ chat_item.chat_item_id,
+ chat_item.conversation_id,
+ chat_item.logical_order,
+ chat_item.direction.as_str(),
+ chat_item.chat_msg_id,
+ chat_item.body,
+ chat_item.delivery_status,
+ chat_item.created_at_unix
+ ],
+ )?;
+ let inbound = RadrootsSimplexAppInboundMessageLogEntry {
+ inbound_id,
+ connection_id: request.connection_id.clone(),
+ broker_message_id_hash: request.broker_message_id_hash.clone(),
+ inbound_sequence: request.inbound_sequence,
+ message_hash: request.message_hash.clone(),
+ ack_status: "pending_ack".to_owned(),
+ app_record_kind: "chat_item".to_owned(),
+ app_record_id: chat_item_id,
+ received_at_unix: request.received_at_unix,
+ };
+ insert_inbound_log(&transaction, &inbound)?;
+ transaction.commit()?;
+ Ok(RadrootsSimplexAppInboundCommit {
+ inbound,
+ chat_item: Some(chat_item),
+ unsupported_event: None,
+ duplicate: false,
+ })
+ }
+
+ pub fn commit_inbound_unsupported_event(
+ &self,
+ request: &RadrootsSimplexAppInboundUnsupportedEventRequest,
+ ) -> Result<RadrootsSimplexAppInboundCommit, RadrootsSimplexAppStoreError> {
+ validate_inbound_unsupported_request(request)?;
+ let transaction = self.connection.unchecked_transaction()?;
+ if let Some(existing) = inbound_commit_by_identity(
+ &transaction,
+ &request.connection_id,
+ &request.broker_message_id_hash,
+ request.inbound_sequence,
+ &request.message_hash,
+ )? {
+ transaction.commit()?;
+ return Ok(existing);
+ }
+ let inbound_id = derive_inbound_local_id(
+ "inbound",
+ &request.connection_id,
+ &request.broker_message_id_hash,
+ &request.message_hash,
+ );
+ let event_id = derive_inbound_local_id(
+ "unsupported",
+ &request.connection_id,
+ &request.broker_message_id_hash,
+ &request.message_hash,
+ );
+ let unsupported_event = RadrootsSimplexAppUnsupportedProtocolEvent {
+ event_id: event_id.clone(),
+ connection_id: Some(request.connection_id.clone()),
+ event_kind: request.event_kind.clone(),
+ payload_json: request.payload_json.clone(),
+ status: "stored".to_owned(),
+ received_at_unix: request.received_at_unix,
+ };
+ transaction.execute(
+ "INSERT INTO unsupported_protocol_events
+ (event_id, connection_id, event_kind, payload_json, status, received_at_unix)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
+ params![
+ unsupported_event.event_id,
+ unsupported_event.connection_id,
+ unsupported_event.event_kind,
+ unsupported_event.payload_json,
+ unsupported_event.status,
+ unsupported_event.received_at_unix
+ ],
+ )?;
+ let inbound = RadrootsSimplexAppInboundMessageLogEntry {
+ inbound_id,
+ connection_id: request.connection_id.clone(),
+ broker_message_id_hash: request.broker_message_id_hash.clone(),
+ inbound_sequence: request.inbound_sequence,
+ message_hash: request.message_hash.clone(),
+ ack_status: "pending_ack".to_owned(),
+ app_record_kind: "unsupported_event".to_owned(),
+ app_record_id: event_id,
+ received_at_unix: request.received_at_unix,
+ };
+ insert_inbound_log(&transaction, &inbound)?;
+ transaction.commit()?;
+ Ok(RadrootsSimplexAppInboundCommit {
+ inbound,
+ chat_item: None,
+ unsupported_event: Some(unsupported_event),
+ duplicate: false,
+ })
+ }
+
pub fn pending_ack_messages(
&self,
) -> Result<Vec<RadrootsSimplexAppInboundMessageLogEntry>, RadrootsSimplexAppStoreError> {
let mut statement = self.connection.prepare(
- "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, received_at_unix
+ "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix
FROM inbound_message_log
- WHERE ack_status = 'pending'
+ WHERE ack_status = 'pending_ack'
ORDER BY received_at_unix, inbound_id",
)?;
collect_rows(statement.query_map([], inbound_message_from_row)?)
@@ -584,7 +732,7 @@ fn migrate(
match user_version {
0 => {
let transaction = connection.transaction()?;
- apply_schema_v2(&transaction)?;
+ apply_schema_v3(&transaction)?;
transaction.execute(
"INSERT INTO encryption_metadata
(id, key_slot_digest, key_source, cipher, created_at_unix)
@@ -601,17 +749,39 @@ fn migrate(
VALUES (2, 'message-lifecycle-outbound', ?1)",
params![now_unix_secs()],
)?;
+ transaction.execute(
+ "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix)
+ VALUES (3, 'message-lifecycle-inbound', ?1)",
+ params![now_unix_secs()],
+ )?;
transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?;
transaction.commit()?;
}
1 => {
let transaction = connection.transaction()?;
apply_migration_v2(&transaction)?;
+ apply_migration_v3(&transaction)?;
transaction.execute(
"INSERT INTO simplex_schema_migrations (version, name, applied_at_unix)
VALUES (2, 'message-lifecycle-outbound', ?1)",
params![now_unix_secs()],
)?;
+ transaction.execute(
+ "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix)
+ VALUES (3, 'message-lifecycle-inbound', ?1)",
+ params![now_unix_secs()],
+ )?;
+ transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?;
+ transaction.commit()?;
+ }
+ 2 => {
+ let transaction = connection.transaction()?;
+ apply_migration_v3(&transaction)?;
+ transaction.execute(
+ "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix)
+ VALUES (3, 'message-lifecycle-inbound', ?1)",
+ params![now_unix_secs()],
+ )?;
transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?;
transaction.commit()?;
}
@@ -625,7 +795,7 @@ fn migrate(
Ok(())
}
-fn apply_schema_v2(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> {
+fn apply_schema_v3(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> {
transaction.execute_batch(
"
CREATE TABLE encryption_metadata (
@@ -700,6 +870,8 @@ fn apply_schema_v2(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexA
inbound_sequence INTEGER,
message_hash BLOB NOT NULL,
ack_status TEXT NOT NULL,
+ app_record_kind TEXT NOT NULL,
+ app_record_id TEXT NOT NULL,
received_at_unix INTEGER NOT NULL,
UNIQUE(connection_id, broker_message_id_hash)
);
@@ -735,7 +907,7 @@ fn apply_schema_v2(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexA
WHERE inbound_sequence IS NOT NULL;
CREATE INDEX inbound_message_log_pending_ack_idx
ON inbound_message_log(connection_id, inbound_id)
- WHERE ack_status = 'pending';
+ WHERE ack_status = 'pending_ack';
CREATE INDEX outbox_messages_pending_retryable_idx
ON outbox_messages(connection_id, outbox_id)
WHERE status IN ('pending', 'retryable');
@@ -775,6 +947,22 @@ fn apply_migration_v2(transaction: &Transaction<'_>) -> Result<(), RadrootsSimpl
Ok(())
}
+fn apply_migration_v3(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> {
+ transaction.execute_batch(
+ "
+ ALTER TABLE inbound_message_log ADD COLUMN app_record_kind TEXT NOT NULL DEFAULT 'inbound_log';
+ ALTER TABLE inbound_message_log ADD COLUMN app_record_id TEXT NOT NULL DEFAULT '';
+ UPDATE inbound_message_log
+ SET app_record_id = inbound_id
+ WHERE app_record_id = '';
+ UPDATE inbound_message_log
+ SET ack_status = 'pending_ack'
+ WHERE ack_status = 'pending';
+ ",
+ )?;
+ Ok(())
+}
+
fn verify_metadata(
connection: &Connection,
expected_key_slot_digest: &str,
@@ -847,6 +1035,29 @@ fn collect_rows<T>(
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
+fn insert_inbound_log(
+ transaction: &Transaction<'_>,
+ inbound: &RadrootsSimplexAppInboundMessageLogEntry,
+) -> Result<(), RadrootsSimplexAppStoreError> {
+ transaction.execute(
+ "INSERT INTO inbound_message_log
+ (inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
+ params![
+ inbound.inbound_id,
+ inbound.connection_id,
+ inbound.broker_message_id_hash,
+ inbound.inbound_sequence,
+ inbound.message_hash,
+ inbound.ack_status,
+ inbound.app_record_kind,
+ inbound.app_record_id,
+ inbound.received_at_unix
+ ],
+ )?;
+ Ok(())
+}
+
fn generate_chat_msg_id() -> Result<String, RadrootsSimplexAppStoreError> {
let mut bytes = [0_u8; CHAT_MSG_ID_BYTES];
getrandom(&mut bytes).map_err(|_| {
@@ -888,6 +1099,74 @@ fn validate_outbound_text_request(
Ok(())
}
+fn validate_inbound_text_request(
+ request: &RadrootsSimplexAppInboundTextRequest,
+) -> Result<(), RadrootsSimplexAppStoreError> {
+ validate_inbound_identity(
+ &request.connection_id,
+ &request.broker_message_id_hash,
+ &request.message_hash,
+ )?;
+ if request.conversation_id.is_empty() {
+ return Err(RadrootsSimplexAppStoreError::MessageLifecycle(
+ "conversation id must not be empty".into(),
+ ));
+ }
+ if request.body.trim().is_empty() {
+ return Err(RadrootsSimplexAppStoreError::MessageLifecycle(
+ "inbound text must not be empty".into(),
+ ));
+ }
+ if let Some(chat_msg_id) = &request.chat_msg_id {
+ validate_chat_msg_id(chat_msg_id)?;
+ }
+ Ok(())
+}
+
+fn validate_inbound_unsupported_request(
+ request: &RadrootsSimplexAppInboundUnsupportedEventRequest,
+) -> Result<(), RadrootsSimplexAppStoreError> {
+ validate_inbound_identity(
+ &request.connection_id,
+ &request.broker_message_id_hash,
+ &request.message_hash,
+ )?;
+ if request.event_kind.is_empty() {
+ return Err(RadrootsSimplexAppStoreError::MessageLifecycle(
+ "unsupported event kind must not be empty".into(),
+ ));
+ }
+ if request.payload_json.is_empty() {
+ return Err(RadrootsSimplexAppStoreError::MessageLifecycle(
+ "unsupported event payload must not be empty".into(),
+ ));
+ }
+ Ok(())
+}
+
+fn validate_inbound_identity(
+ connection_id: &str,
+ broker_message_id_hash: &[u8],
+ message_hash: &[u8],
+) -> Result<(), RadrootsSimplexAppStoreError> {
+ if connection_id.is_empty() {
+ return Err(RadrootsSimplexAppStoreError::MessageLifecycle(
+ "connection id must not be empty".into(),
+ ));
+ }
+ if broker_message_id_hash.is_empty() {
+ return Err(RadrootsSimplexAppStoreError::MessageLifecycle(
+ "broker message id hash must not be empty".into(),
+ ));
+ }
+ if message_hash.is_empty() {
+ return Err(RadrootsSimplexAppStoreError::MessageLifecycle(
+ "message hash must not be empty".into(),
+ ));
+ }
+ Ok(())
+}
+
fn next_logical_order(
transaction: &Transaction<'_>,
conversation_id: &str,
@@ -935,6 +1214,91 @@ fn outbound_text_by_msg_id(
.map_err(Into::into)
}
+fn inbound_commit_by_identity(
+ transaction: &Transaction<'_>,
+ connection_id: &str,
+ broker_message_id_hash: &[u8],
+ inbound_sequence: Option<i64>,
+ message_hash: &[u8],
+) -> Result<Option<RadrootsSimplexAppInboundCommit>, RadrootsSimplexAppStoreError> {
+ let inbound = match inbound_sequence {
+ Some(sequence) => transaction
+ .query_row(
+ "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix
+ FROM inbound_message_log
+ WHERE connection_id = ?1
+ AND (broker_message_id_hash = ?2 OR (inbound_sequence = ?3 AND message_hash = ?4))
+ ORDER BY received_at_unix, inbound_id
+ LIMIT 1",
+ params![connection_id, broker_message_id_hash, sequence, message_hash],
+ inbound_message_from_row,
+ )
+ .optional()?,
+ None => transaction
+ .query_row(
+ "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, ack_status, app_record_kind, app_record_id, received_at_unix
+ FROM inbound_message_log
+ WHERE connection_id = ?1 AND broker_message_id_hash = ?2
+ ORDER BY received_at_unix, inbound_id
+ LIMIT 1",
+ params![connection_id, broker_message_id_hash],
+ inbound_message_from_row,
+ )
+ .optional()?,
+ };
+ let Some(inbound) = inbound else {
+ return Ok(None);
+ };
+ let chat_item = if inbound.app_record_kind == "chat_item" {
+ chat_item_by_id(transaction, &inbound.app_record_id)?
+ } else {
+ None
+ };
+ let unsupported_event = if inbound.app_record_kind == "unsupported_event" {
+ unsupported_event_by_id(transaction, &inbound.app_record_id)?
+ } else {
+ None
+ };
+ Ok(Some(RadrootsSimplexAppInboundCommit {
+ inbound,
+ chat_item,
+ unsupported_event,
+ duplicate: true,
+ }))
+}
+
+fn chat_item_by_id(
+ transaction: &Transaction<'_>,
+ chat_item_id: &str,
+) -> Result<Option<RadrootsSimplexAppChatItem>, RadrootsSimplexAppStoreError> {
+ transaction
+ .query_row(
+ "SELECT chat_item_id, conversation_id, logical_order, direction, chat_msg_id, body, delivery_status, created_at_unix
+ FROM chat_items
+ WHERE chat_item_id = ?1",
+ params![chat_item_id],
+ chat_item_from_row,
+ )
+ .optional()
+ .map_err(Into::into)
+}
+
+fn unsupported_event_by_id(
+ transaction: &Transaction<'_>,
+ event_id: &str,
+) -> Result<Option<RadrootsSimplexAppUnsupportedProtocolEvent>, RadrootsSimplexAppStoreError> {
+ transaction
+ .query_row(
+ "SELECT event_id, connection_id, event_kind, payload_json, status, received_at_unix
+ FROM unsupported_protocol_events
+ WHERE event_id = ?1",
+ params![event_id],
+ unsupported_event_from_row,
+ )
+ .optional()
+ .map_err(Into::into)
+}
+
fn derive_outbound_local_id(prefix: &str, connection_id: &str, chat_msg_id: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(prefix.as_bytes());
@@ -946,6 +1310,24 @@ fn derive_outbound_local_id(prefix: &str, connection_id: &str, chat_msg_id: &str
format!("{prefix}_{}", hex::encode(&digest[..16]))
}
+fn derive_inbound_local_id(
+ prefix: &str,
+ connection_id: &str,
+ broker_message_id_hash: &[u8],
+ message_hash: &[u8],
+) -> String {
+ let mut hasher = Sha256::new();
+ hasher.update(prefix.as_bytes());
+ hasher.update([0]);
+ hasher.update(connection_id.as_bytes());
+ hasher.update([0]);
+ hasher.update(broker_message_id_hash);
+ hasher.update([0]);
+ hasher.update(message_hash);
+ let digest = hasher.finalize();
+ format!("{prefix}_{}", hex::encode(&digest[..16]))
+}
+
fn profile_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexAppProfile> {
Ok(RadrootsSimplexAppProfile {
profile_id: row.get(0)?,
@@ -1042,7 +1424,9 @@ fn inbound_message_from_row(
inbound_sequence: row.get(3)?,
message_hash: row.get(4)?,
ack_status: row.get(5)?,
- received_at_unix: row.get(6)?,
+ app_record_kind: row.get(6)?,
+ app_record_id: row.get(7)?,
+ received_at_unix: row.get(8)?,
})
}
@@ -1155,6 +1539,31 @@ mod tests {
}
}
+ fn inbound_text_request() -> RadrootsSimplexAppInboundTextRequest {
+ RadrootsSimplexAppInboundTextRequest {
+ connection_id: "connection-1".into(),
+ conversation_id: "conversation-1".into(),
+ broker_message_id_hash: b"broker-message-hash-1".to_vec(),
+ inbound_sequence: Some(21),
+ message_hash: b"agent-message-hash-1".to_vec(),
+ chat_msg_id: Some("AQIDBAUGBwgJCgsM".into()),
+ body: "hello from the iPhone".into(),
+ received_at_unix: 12,
+ }
+ }
+
+ fn inbound_unsupported_request() -> RadrootsSimplexAppInboundUnsupportedEventRequest {
+ RadrootsSimplexAppInboundUnsupportedEventRequest {
+ connection_id: "connection-1".into(),
+ broker_message_id_hash: b"broker-message-hash-2".to_vec(),
+ inbound_sequence: Some(22),
+ message_hash: b"agent-message-hash-2".to_vec(),
+ event_kind: "x.future.dm".into(),
+ payload_json: "{\"event\":\"x.future.dm\"}".into(),
+ received_at_unix: 13,
+ }
+ }
+
#[test]
fn empty_store_initializes_encrypted_schema() {
let temp = tempfile::tempdir().expect("temp");
@@ -1165,8 +1574,8 @@ mod tests {
let diagnostics = store.diagnostics();
assert!(diagnostics.encrypted);
assert!(!diagnostics.cipher.is_empty());
- assert_eq!(diagnostics.schema_version, 2);
- assert_eq!(diagnostics.migration_count, 2);
+ assert_eq!(diagnostics.schema_version, 3);
+ assert_eq!(diagnostics.migration_count, 3);
assert!(diagnostics.foreign_keys_enabled);
assert!(diagnostics.wal_enabled);
assert_eq!(diagnostics.key_source, "memory");
@@ -1234,7 +1643,9 @@ mod tests {
broker_message_id_hash: b"broker-hash".to_vec(),
inbound_sequence: Some(1),
message_hash: b"message-hash".to_vec(),
- ack_status: "pending".into(),
+ ack_status: "pending_ack".into(),
+ app_record_kind: "chat_item".into(),
+ app_record_id: "chat-2".into(),
received_at_unix: 8,
})
.expect("inbound");
@@ -1396,7 +1807,9 @@ mod tests {
broker_message_id_hash: b"dedupe".to_vec(),
inbound_sequence: Some(1),
message_hash: b"hash".to_vec(),
- ack_status: "pending".into(),
+ ack_status: "pending_ack".into(),
+ app_record_kind: "chat_item".into(),
+ app_record_id: "chat-1".into(),
received_at_unix: 8,
};
store.record_inbound_message(&inbound).expect("inbound");
@@ -1408,6 +1821,114 @@ mod tests {
}
#[test]
+ fn inbound_text_commit_persists_chat_item_and_pending_ack() {
+ let temp = tempfile::tempdir().expect("temp");
+ let path = temp.path().join("simplex.sqlite");
+ let vault = Arc::new(RadrootsSecretVaultMemory::new());
+ let store = memory_store(&path, vault).expect("store");
+ seed_store(&store);
+
+ let commit = store
+ .commit_inbound_text(&inbound_text_request())
+ .expect("commit");
+
+ assert!(!commit.duplicate);
+ assert_eq!(commit.inbound.ack_status, "pending_ack");
+ assert_eq!(commit.inbound.app_record_kind, "chat_item");
+ let chat_item = commit.chat_item.expect("chat item");
+ assert_eq!(commit.inbound.app_record_id, chat_item.chat_item_id);
+ assert_eq!(
+ chat_item.direction,
+ RadrootsSimplexAppChatDirection::Inbound
+ );
+ assert_eq!(chat_item.chat_msg_id.as_deref(), Some("AQIDBAUGBwgJCgsM"));
+ assert_eq!(chat_item.body, "hello from the iPhone");
+ assert_eq!(
+ store.chat_page("conversation-1", 10).expect("page"),
+ vec![chat_item]
+ );
+ assert_eq!(
+ store.pending_ack_messages().expect("pending ack"),
+ vec![commit.inbound]
+ );
+ }
+
+ #[test]
+ fn inbound_text_duplicate_redelivery_returns_prior_commit() {
+ let temp = tempfile::tempdir().expect("temp");
+ let path = temp.path().join("simplex.sqlite");
+ let vault = Arc::new(RadrootsSecretVaultMemory::new());
+ let store = memory_store(&path, vault).expect("store");
+ seed_store(&store);
+
+ let first = store
+ .commit_inbound_text(&inbound_text_request())
+ .expect("first");
+ let second = store
+ .commit_inbound_text(&inbound_text_request())
+ .expect("second");
+
+ assert!(second.duplicate);
+ assert_eq!(second.inbound, first.inbound);
+ assert_eq!(second.chat_item, first.chat_item);
+ assert_eq!(
+ store.chat_page("conversation-1", 10).expect("page").len(),
+ 1
+ );
+ assert_eq!(store.pending_ack_messages().expect("pending").len(), 1);
+ }
+
+ #[test]
+ fn inbound_unsupported_event_commit_persists_safe_record_and_pending_ack() {
+ let temp = tempfile::tempdir().expect("temp");
+ let path = temp.path().join("simplex.sqlite");
+ let vault = Arc::new(RadrootsSecretVaultMemory::new());
+ let store = memory_store(&path, vault).expect("store");
+ seed_store(&store);
+
+ let commit = store
+ .commit_inbound_unsupported_event(&inbound_unsupported_request())
+ .expect("commit");
+
+ assert!(!commit.duplicate);
+ assert_eq!(commit.inbound.ack_status, "pending_ack");
+ assert_eq!(commit.inbound.app_record_kind, "unsupported_event");
+ let unsupported = commit.unsupported_event.expect("unsupported event");
+ assert_eq!(commit.inbound.app_record_id, unsupported.event_id);
+ assert_eq!(unsupported.event_kind, "x.future.dm");
+ assert_eq!(unsupported.status, "stored");
+ assert_eq!(
+ store
+ .list_unsupported_protocol_events()
+ .expect("unsupported"),
+ vec![unsupported]
+ );
+ assert_eq!(store.pending_ack_messages().expect("pending").len(), 1);
+ }
+
+ #[test]
+ fn invalid_inbound_text_does_not_create_chat_or_pending_ack() {
+ let temp = tempfile::tempdir().expect("temp");
+ let path = temp.path().join("simplex.sqlite");
+ let vault = Arc::new(RadrootsSecretVaultMemory::new());
+ let store = memory_store(&path, vault).expect("store");
+ seed_store(&store);
+ let invalid = RadrootsSimplexAppInboundTextRequest {
+ body: " ".into(),
+ ..inbound_text_request()
+ };
+
+ assert!(store.commit_inbound_text(&invalid).is_err());
+ assert!(
+ store
+ .chat_page("conversation-1", 10)
+ .expect("page")
+ .is_empty()
+ );
+ assert!(store.pending_ack_messages().expect("pending").is_empty());
+ }
+
+ #[test]
fn outbound_text_lifecycle_persists_chat_item_outbox_and_msg_id() {
let temp = tempfile::tempdir().expect("temp");
let path = temp.path().join("simplex.sqlite");