lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit fa89df195bf3925fafaad161735e33fb3e2a5dc0
parent fa4da70b2cfc81f04d02e65aa9adb78e5ab9cb86
Author: triesap <tyson@radroots.org>
Date:   Tue, 23 Jun 2026 07:54:42 +0000

simplex: add durable outbound app lifecycle

- add chat msgId and chat-item linkage to app-store message models

- create outbound chat item and outbox rows in one transaction

- generate 12-byte base64url chat msgIds for runtime sends

- cover outbound retry idempotency and feature builds

Diffstat:
MCargo.lock | 1+
Mcrates/simplex_app_store/Cargo.toml | 1+
Mcrates/simplex_app_store/src/error.rs | 4++++
Mcrates/simplex_app_store/src/lib.rs | 6++++--
Mcrates/simplex_app_store/src/model.rs | 17+++++++++++++++++
Mcrates/simplex_app_store/src/store.rs | 462+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
6 files changed, 446 insertions(+), 45 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -4628,6 +4628,7 @@ dependencies = [ name = "radroots_simplex_app_store" version = "0.1.0-alpha.2" dependencies = [ + "base64 0.22.1", "getrandom 0.2.17", "hex", "radroots_secret_vault", diff --git a/crates/simplex_app_store/Cargo.toml b/crates/simplex_app_store/Cargo.toml @@ -23,6 +23,7 @@ sqlcipher = ["std", "dep:rusqlite"] os-keyring = ["std", "radroots_secret_vault/os-keyring"] [dependencies] +base64 = { workspace = true } getrandom = { workspace = true } hex = { workspace = true } radroots_secret_vault = { workspace = true, default-features = false } diff --git a/crates/simplex_app_store/src/error.rs b/crates/simplex_app_store/src/error.rs @@ -8,6 +8,7 @@ pub enum RadrootsSimplexAppStoreError { InvalidDatabaseKey(String), EncryptionUnavailable, EncryptionKeyRejected, + MessageLifecycle(String), Schema(String), Sqlite(String), Io(String), @@ -34,6 +35,9 @@ impl fmt::Display for RadrootsSimplexAppStoreError { Self::EncryptionKeyRejected => { write!(formatter, "SimpleX app store encryption key was rejected") } + Self::MessageLifecycle(message) => { + write!(formatter, "SimpleX app message lifecycle error: {message}") + } Self::Schema(message) => write!(formatter, "SimpleX app store schema error: {message}"), Self::Sqlite(message) => write!(formatter, "SimpleX app sqlite error: {message}"), Self::Io(message) => write!(formatter, "SimpleX app store io error: {message}"), diff --git a/crates/simplex_app_store/src/lib.rs b/crates/simplex_app_store/src/lib.rs @@ -15,7 +15,8 @@ pub mod prelude { pub use crate::model::{ RadrootsSimplexAppChatDirection, RadrootsSimplexAppChatItem, RadrootsSimplexAppConnection, RadrootsSimplexAppContact, RadrootsSimplexAppConversation, RadrootsSimplexAppDiagnostics, - RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppOutboxMessage, + RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppOutboundTextDraft, + RadrootsSimplexAppOutboundTextRequest, RadrootsSimplexAppOutboxMessage, RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint, RadrootsSimplexAppUnsupportedProtocolEvent, }; @@ -27,7 +28,8 @@ pub use error::RadrootsSimplexAppStoreError; pub use model::{ RadrootsSimplexAppChatDirection, RadrootsSimplexAppChatItem, RadrootsSimplexAppConnection, RadrootsSimplexAppContact, RadrootsSimplexAppConversation, RadrootsSimplexAppDiagnostics, - RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppOutboxMessage, + RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppOutboundTextDraft, + RadrootsSimplexAppOutboundTextRequest, RadrootsSimplexAppOutboxMessage, RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint, RadrootsSimplexAppUnsupportedProtocolEvent, }; diff --git a/crates/simplex_app_store/src/model.rs b/crates/simplex_app_store/src/model.rs @@ -80,6 +80,7 @@ pub struct RadrootsSimplexAppChatItem { pub conversation_id: String, pub logical_order: i64, pub direction: RadrootsSimplexAppChatDirection, + pub chat_msg_id: Option<String>, pub body: String, pub delivery_status: String, pub created_at_unix: i64, @@ -99,8 +100,10 @@ pub struct RadrootsSimplexAppInboundMessageLogEntry { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RadrootsSimplexAppOutboxMessage { pub outbox_id: String, + pub chat_item_id: String, pub connection_id: String, pub conversation_id: Option<String>, + pub chat_msg_id: String, pub body: String, pub status: String, pub retry_after_unix: Option<i64>, @@ -108,6 +111,20 @@ pub struct RadrootsSimplexAppOutboxMessage { } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RadrootsSimplexAppOutboundTextRequest { + pub connection_id: String, + pub conversation_id: String, + pub body: String, + pub created_at_unix: i64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RadrootsSimplexAppOutboundTextDraft { + pub chat_item: RadrootsSimplexAppChatItem, + pub outbox_message: RadrootsSimplexAppOutboxMessage, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RadrootsSimplexAppUnsupportedProtocolEvent { pub event_id: String, pub connection_id: Option<String>, diff --git a/crates/simplex_app_store/src/store.rs b/crates/simplex_app_store/src/store.rs @@ -2,7 +2,8 @@ use crate::error::RadrootsSimplexAppStoreError; use crate::model::{ RadrootsSimplexAppChatDirection, RadrootsSimplexAppChatItem, RadrootsSimplexAppConnection, RadrootsSimplexAppContact, RadrootsSimplexAppConversation, RadrootsSimplexAppDiagnostics, - RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppOutboxMessage, + RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppOutboundTextDraft, + RadrootsSimplexAppOutboundTextRequest, RadrootsSimplexAppOutboxMessage, RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint, RadrootsSimplexAppUnsupportedProtocolEvent, }; @@ -10,6 +11,8 @@ use alloc::format; use alloc::string::String; use alloc::sync::Arc; use alloc::vec::Vec; +use base64::Engine as _; +use base64::engine::general_purpose::URL_SAFE_NO_PAD; use getrandom::getrandom; use radroots_secret_vault::RadrootsSecretVault; #[cfg(feature = "os-keyring")] @@ -21,9 +24,10 @@ use std::path::Path; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use zeroize::Zeroize; -const CURRENT_SCHEMA_VERSION: i64 = 1; +const CURRENT_SCHEMA_VERSION: i64 = 2; const DEFAULT_KEYCHAIN_SERVICE: &str = "org.radroots.simplex.app-store"; const DATABASE_KEY_BYTES: usize = 32; +const CHAT_MSG_ID_BYTES: usize = 12; pub struct RadrootsSimplexAppStore { connection: Connection, @@ -251,13 +255,14 @@ impl RadrootsSimplexAppStore { ) -> Result<(), RadrootsSimplexAppStoreError> { self.connection.execute( "INSERT INTO chat_items - (chat_item_id, conversation_id, logical_order, direction, body, delivery_status, created_at_unix) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + (chat_item_id, conversation_id, logical_order, direction, chat_msg_id, body, delivery_status, created_at_unix) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", params![ item.chat_item_id, item.conversation_id, item.logical_order, item.direction.as_str(), + item.chat_msg_id, item.body, item.delivery_status, item.created_at_unix @@ -266,13 +271,106 @@ impl RadrootsSimplexAppStore { Ok(()) } + pub fn create_outbound_text( + &self, + request: &RadrootsSimplexAppOutboundTextRequest, + ) -> Result<RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppStoreError> { + let chat_msg_id = generate_chat_msg_id()?; + self.create_outbound_text_with_msg_id(request, &chat_msg_id) + } + + #[cfg(test)] + fn create_outbound_text_with_test_msg_id( + &self, + request: &RadrootsSimplexAppOutboundTextRequest, + chat_msg_id: &str, + ) -> Result<RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppStoreError> { + self.create_outbound_text_with_msg_id(request, chat_msg_id) + } + + fn create_outbound_text_with_msg_id( + &self, + request: &RadrootsSimplexAppOutboundTextRequest, + chat_msg_id: &str, + ) -> Result<RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppStoreError> { + validate_outbound_text_request(request)?; + validate_chat_msg_id(chat_msg_id)?; + let transaction = self.connection.unchecked_transaction()?; + if let Some(existing) = + outbound_text_by_msg_id(&transaction, &request.connection_id, chat_msg_id)? + { + transaction.commit()?; + return Ok(existing); + } + let logical_order = next_logical_order(&transaction, &request.conversation_id)?; + let chat_item_id = derive_outbound_local_id("chat", &request.connection_id, chat_msg_id); + let outbox_id = derive_outbound_local_id("outbox", &request.connection_id, chat_msg_id); + let chat_item = RadrootsSimplexAppChatItem { + chat_item_id: chat_item_id.clone(), + conversation_id: request.conversation_id.clone(), + logical_order, + direction: RadrootsSimplexAppChatDirection::Outbound, + chat_msg_id: Some(chat_msg_id.to_owned()), + body: request.body.clone(), + delivery_status: "pending".to_owned(), + created_at_unix: request.created_at_unix, + }; + transaction.execute( + "INSERT INTO chat_items + (chat_item_id, conversation_id, logical_order, direction, chat_msg_id, body, delivery_status, created_at_unix) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + params![ + chat_item.chat_item_id, + chat_item.conversation_id, + chat_item.logical_order, + chat_item.direction.as_str(), + chat_item.chat_msg_id, + chat_item.body, + chat_item.delivery_status, + chat_item.created_at_unix + ], + )?; + let outbox_message = RadrootsSimplexAppOutboxMessage { + outbox_id, + chat_item_id, + connection_id: request.connection_id.clone(), + conversation_id: Some(request.conversation_id.clone()), + chat_msg_id: chat_msg_id.to_owned(), + body: request.body.clone(), + status: "pending".to_owned(), + retry_after_unix: None, + created_at_unix: request.created_at_unix, + }; + transaction.execute( + "INSERT INTO outbox_messages + (outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, retry_after_unix, created_at_unix) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + params![ + outbox_message.outbox_id, + outbox_message.chat_item_id, + outbox_message.connection_id, + outbox_message.conversation_id, + outbox_message.chat_msg_id, + outbox_message.body, + outbox_message.status, + outbox_message.retry_after_unix, + outbox_message.created_at_unix + ], + )?; + transaction.commit()?; + Ok(RadrootsSimplexAppOutboundTextDraft { + chat_item, + outbox_message, + }) + } + pub fn chat_page( &self, conversation_id: &str, limit: usize, ) -> Result<Vec<RadrootsSimplexAppChatItem>, RadrootsSimplexAppStoreError> { let mut statement = self.connection.prepare( - "SELECT chat_item_id, conversation_id, logical_order, direction, body, delivery_status, created_at_unix + "SELECT chat_item_id, conversation_id, logical_order, direction, chat_msg_id, body, delivery_status, created_at_unix FROM chat_items WHERE conversation_id = ?1 ORDER BY logical_order DESC, chat_item_id DESC @@ -322,12 +420,14 @@ impl RadrootsSimplexAppStore { ) -> Result<(), RadrootsSimplexAppStoreError> { self.connection.execute( "INSERT INTO outbox_messages - (outbox_id, connection_id, conversation_id, body, status, retry_after_unix, created_at_unix) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + (outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, retry_after_unix, created_at_unix) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", params![ message.outbox_id, + message.chat_item_id, message.connection_id, message.conversation_id, + message.chat_msg_id, message.body, message.status, message.retry_after_unix, @@ -341,7 +441,7 @@ impl RadrootsSimplexAppStore { &self, ) -> Result<Vec<RadrootsSimplexAppOutboxMessage>, RadrootsSimplexAppStoreError> { let mut statement = self.connection.prepare( - "SELECT outbox_id, connection_id, conversation_id, body, status, retry_after_unix, created_at_unix + "SELECT outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, retry_after_unix, created_at_unix FROM outbox_messages WHERE status IN ('pending', 'retryable') ORDER BY created_at_unix, outbox_id", @@ -481,34 +581,51 @@ fn migrate( "unsupported future schema version `{user_version}`" ))); } - if user_version == CURRENT_SCHEMA_VERSION { - return Ok(()); - } - if user_version != 0 { - return Err(RadrootsSimplexAppStoreError::Schema(format!( - "unsupported schema version `{user_version}`" - ))); + match user_version { + 0 => { + let transaction = connection.transaction()?; + apply_schema_v2(&transaction)?; + transaction.execute( + "INSERT INTO encryption_metadata + (id, key_slot_digest, key_source, cipher, created_at_unix) + VALUES (1, ?1, ?2, 'sqlcipher', ?3)", + params![key_slot_digest, key_source, now_unix_secs()], + )?; + transaction.execute( + "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) + VALUES (1, 'initial-simplex-app-store', ?1)", + params![now_unix_secs()], + )?; + transaction.execute( + "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) + VALUES (2, 'message-lifecycle-outbound', ?1)", + params![now_unix_secs()], + )?; + transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?; + transaction.commit()?; + } + 1 => { + let transaction = connection.transaction()?; + apply_migration_v2(&transaction)?; + transaction.execute( + "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) + VALUES (2, 'message-lifecycle-outbound', ?1)", + params![now_unix_secs()], + )?; + transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?; + transaction.commit()?; + } + CURRENT_SCHEMA_VERSION => {} + _ => { + return Err(RadrootsSimplexAppStoreError::Schema(format!( + "unsupported schema version `{user_version}`" + ))); + } } - - let transaction = connection.transaction()?; - apply_schema_v1(&transaction)?; - transaction.execute( - "INSERT INTO encryption_metadata - (id, key_slot_digest, key_source, cipher, created_at_unix) - VALUES (1, ?1, ?2, 'sqlcipher', ?3)", - params![key_slot_digest, key_source, now_unix_secs()], - )?; - transaction.execute( - "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) - VALUES (1, 'initial-simplex-app-store', ?1)", - params![now_unix_secs()], - )?; - transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?; - transaction.commit()?; Ok(()) } -fn apply_schema_v1(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> { +fn apply_schema_v2(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> { transaction.execute_batch( " CREATE TABLE encryption_metadata ( @@ -570,6 +687,7 @@ fn apply_schema_v1(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexA conversation_id TEXT NOT NULL REFERENCES conversations(conversation_id) ON DELETE CASCADE, logical_order INTEGER NOT NULL, direction TEXT NOT NULL, + chat_msg_id TEXT, body TEXT NOT NULL, delivery_status TEXT NOT NULL, created_at_unix INTEGER NOT NULL @@ -588,8 +706,10 @@ fn apply_schema_v1(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexA CREATE TABLE outbox_messages ( outbox_id TEXT PRIMARY KEY, + chat_item_id TEXT NOT NULL REFERENCES chat_items(chat_item_id) ON DELETE CASCADE, connection_id TEXT NOT NULL REFERENCES connections(connection_id) ON DELETE CASCADE, conversation_id TEXT REFERENCES conversations(conversation_id) ON DELETE SET NULL, + chat_msg_id TEXT NOT NULL, body TEXT NOT NULL, status TEXT NOT NULL, retry_after_unix INTEGER, @@ -607,6 +727,9 @@ fn apply_schema_v1(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexA CREATE INDEX chat_items_page_idx ON chat_items(conversation_id, logical_order DESC, chat_item_id DESC); + CREATE UNIQUE INDEX chat_items_conversation_msg_id_idx + ON chat_items(conversation_id, chat_msg_id) + WHERE chat_msg_id IS NOT NULL; CREATE UNIQUE INDEX inbound_message_log_sequence_hash_idx ON inbound_message_log(connection_id, inbound_sequence, message_hash) WHERE inbound_sequence IS NOT NULL; @@ -616,6 +739,10 @@ fn apply_schema_v1(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexA CREATE INDEX outbox_messages_pending_retryable_idx ON outbox_messages(connection_id, outbox_id) WHERE status IN ('pending', 'retryable'); + CREATE UNIQUE INDEX outbox_messages_connection_msg_id_idx + ON outbox_messages(connection_id, chat_msg_id); + CREATE UNIQUE INDEX outbox_messages_chat_item_idx + ON outbox_messages(chat_item_id); CREATE INDEX connections_state_idx ON connections(state); CREATE INDEX queue_endpoints_status_idx ON queue_endpoints(status); CREATE INDEX contacts_lifecycle_idx ON contacts(lifecycle); @@ -624,6 +751,30 @@ fn apply_schema_v1(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexA Ok(()) } +fn apply_migration_v2(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> { + transaction.execute_batch( + " + ALTER TABLE chat_items ADD COLUMN chat_msg_id TEXT; + ALTER TABLE outbox_messages ADD COLUMN chat_item_id TEXT NOT NULL DEFAULT ''; + ALTER TABLE outbox_messages ADD COLUMN chat_msg_id TEXT NOT NULL DEFAULT ''; + UPDATE outbox_messages + SET chat_item_id = outbox_id + WHERE chat_item_id = ''; + UPDATE outbox_messages + SET chat_msg_id = outbox_id + WHERE chat_msg_id = ''; + CREATE UNIQUE INDEX chat_items_conversation_msg_id_idx + ON chat_items(conversation_id, chat_msg_id) + WHERE chat_msg_id IS NOT NULL; + CREATE UNIQUE INDEX outbox_messages_connection_msg_id_idx + ON outbox_messages(connection_id, chat_msg_id); + CREATE UNIQUE INDEX outbox_messages_chat_item_idx + ON outbox_messages(chat_item_id); + ", + )?; + Ok(()) +} + fn verify_metadata( connection: &Connection, expected_key_slot_digest: &str, @@ -696,6 +847,105 @@ fn collect_rows<T>( rows.collect::<Result<Vec<_>, _>>().map_err(Into::into) } +fn generate_chat_msg_id() -> Result<String, RadrootsSimplexAppStoreError> { + let mut bytes = [0_u8; CHAT_MSG_ID_BYTES]; + getrandom(&mut bytes).map_err(|_| { + RadrootsSimplexAppStoreError::MessageLifecycle("entropy unavailable".into()) + })?; + Ok(URL_SAFE_NO_PAD.encode(bytes)) +} + +fn validate_chat_msg_id(value: &str) -> Result<(), RadrootsSimplexAppStoreError> { + let decoded = URL_SAFE_NO_PAD.decode(value.as_bytes()).map_err(|_| { + RadrootsSimplexAppStoreError::MessageLifecycle("chat msgId must be base64url".into()) + })?; + if decoded.len() != CHAT_MSG_ID_BYTES { + return Err(RadrootsSimplexAppStoreError::MessageLifecycle(format!( + "chat msgId must decode to {CHAT_MSG_ID_BYTES} bytes" + ))); + } + Ok(()) +} + +fn validate_outbound_text_request( + request: &RadrootsSimplexAppOutboundTextRequest, +) -> Result<(), RadrootsSimplexAppStoreError> { + if request.connection_id.is_empty() { + return Err(RadrootsSimplexAppStoreError::MessageLifecycle( + "connection id must not be empty".into(), + )); + } + if request.conversation_id.is_empty() { + return Err(RadrootsSimplexAppStoreError::MessageLifecycle( + "conversation id must not be empty".into(), + )); + } + if request.body.trim().is_empty() { + return Err(RadrootsSimplexAppStoreError::MessageLifecycle( + "outbound text must not be empty".into(), + )); + } + Ok(()) +} + +fn next_logical_order( + transaction: &Transaction<'_>, + conversation_id: &str, +) -> Result<i64, RadrootsSimplexAppStoreError> { + let current: Option<i64> = transaction.query_row( + "SELECT MAX(logical_order) FROM chat_items WHERE conversation_id = ?1", + params![conversation_id], + |row| row.get(0), + )?; + Ok(current.unwrap_or(0).saturating_add(1)) +} + +fn outbound_text_by_msg_id( + transaction: &Transaction<'_>, + connection_id: &str, + chat_msg_id: &str, +) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> { + transaction + .query_row( + "SELECT + c.chat_item_id, + c.conversation_id, + c.logical_order, + c.direction, + c.chat_msg_id, + c.body, + c.delivery_status, + c.created_at_unix, + o.outbox_id, + o.chat_item_id, + o.connection_id, + o.conversation_id, + o.chat_msg_id, + o.body, + o.status, + o.retry_after_unix, + o.created_at_unix + FROM outbox_messages o + JOIN chat_items c ON c.chat_item_id = o.chat_item_id + WHERE o.connection_id = ?1 AND o.chat_msg_id = ?2", + params![connection_id, chat_msg_id], + outbound_text_draft_from_row, + ) + .optional() + .map_err(Into::into) +} + +fn derive_outbound_local_id(prefix: &str, connection_id: &str, chat_msg_id: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(prefix.as_bytes()); + hasher.update([0]); + hasher.update(connection_id.as_bytes()); + hasher.update([0]); + hasher.update(chat_msg_id.as_bytes()); + let digest = hasher.finalize(); + format!("{prefix}_{}", hex::encode(&digest[..16])) +} + fn profile_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexAppProfile> { Ok(RadrootsSimplexAppProfile { profile_id: row.get(0)?, @@ -745,9 +995,40 @@ fn chat_item_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexAppChatI logical_order: row.get(2)?, direction: RadrootsSimplexAppChatDirection::parse(&direction) .map_err(|error| rusqlite::Error::ToSqlConversionFailure(error.into()))?, - body: row.get(4)?, - delivery_status: row.get(5)?, - created_at_unix: row.get(6)?, + chat_msg_id: row.get(4)?, + body: row.get(5)?, + delivery_status: row.get(6)?, + created_at_unix: row.get(7)?, + }) +} + +fn outbound_text_draft_from_row( + row: &Row<'_>, +) -> rusqlite::Result<RadrootsSimplexAppOutboundTextDraft> { + let direction: String = row.get(3)?; + Ok(RadrootsSimplexAppOutboundTextDraft { + chat_item: RadrootsSimplexAppChatItem { + chat_item_id: row.get(0)?, + conversation_id: row.get(1)?, + logical_order: row.get(2)?, + direction: RadrootsSimplexAppChatDirection::parse(&direction) + .map_err(|error| rusqlite::Error::ToSqlConversionFailure(error.into()))?, + chat_msg_id: row.get(4)?, + body: row.get(5)?, + delivery_status: row.get(6)?, + created_at_unix: row.get(7)?, + }, + outbox_message: RadrootsSimplexAppOutboxMessage { + outbox_id: row.get(8)?, + chat_item_id: row.get(9)?, + connection_id: row.get(10)?, + conversation_id: row.get(11)?, + chat_msg_id: row.get(12)?, + body: row.get(13)?, + status: row.get(14)?, + retry_after_unix: row.get(15)?, + created_at_unix: row.get(16)?, + }, }) } @@ -768,12 +1049,14 @@ fn inbound_message_from_row( fn outbox_message_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexAppOutboxMessage> { Ok(RadrootsSimplexAppOutboxMessage { outbox_id: row.get(0)?, - connection_id: row.get(1)?, - conversation_id: row.get(2)?, - body: row.get(3)?, - status: row.get(4)?, - retry_after_unix: row.get(5)?, - created_at_unix: row.get(6)?, + chat_item_id: row.get(1)?, + connection_id: row.get(2)?, + conversation_id: row.get(3)?, + chat_msg_id: row.get(4)?, + body: row.get(5)?, + status: row.get(6)?, + retry_after_unix: row.get(7)?, + created_at_unix: row.get(8)?, }) } @@ -863,6 +1146,15 @@ mod tests { .expect("conversation"); } + fn outbound_request() -> RadrootsSimplexAppOutboundTextRequest { + RadrootsSimplexAppOutboundTextRequest { + connection_id: "connection-1".into(), + conversation_id: "conversation-1".into(), + body: "hello encrypted iPhone".into(), + created_at_unix: 11, + } + } + #[test] fn empty_store_initializes_encrypted_schema() { let temp = tempfile::tempdir().expect("temp"); @@ -873,8 +1165,8 @@ mod tests { let diagnostics = store.diagnostics(); assert!(diagnostics.encrypted); assert!(!diagnostics.cipher.is_empty()); - assert_eq!(diagnostics.schema_version, 1); - assert_eq!(diagnostics.migration_count, 1); + assert_eq!(diagnostics.schema_version, 2); + assert_eq!(diagnostics.migration_count, 2); assert!(diagnostics.foreign_keys_enabled); assert!(diagnostics.wal_enabled); assert_eq!(diagnostics.key_source, "memory"); @@ -912,6 +1204,7 @@ mod tests { conversation_id: "conversation-1".into(), logical_order: 1, direction: RadrootsSimplexAppChatDirection::Outbound, + chat_msg_id: Some("AQIDBAUGBwgJCgsM".into()), body: "hello encrypted iPhone".into(), delivery_status: "sent".into(), created_at_unix: 6, @@ -923,6 +1216,7 @@ mod tests { conversation_id: "conversation-1".into(), logical_order: 2, direction: RadrootsSimplexAppChatDirection::Inbound, + chat_msg_id: None, body: "hello encrypted runtime".into(), delivery_status: "received".into(), created_at_unix: 7, @@ -952,8 +1246,10 @@ mod tests { store .enqueue_outbox_message(&RadrootsSimplexAppOutboxMessage { outbox_id: "outbox-1".into(), + chat_item_id: "chat-1".into(), connection_id: "connection-1".into(), conversation_id: Some("conversation-1".into()), + chat_msg_id: "AQIDBAUGBwgJCgsM".into(), body: "queued plaintext before encryption".into(), status: "retryable".into(), retry_after_unix: Some(9), @@ -997,6 +1293,7 @@ mod tests { conversation_id: "conversation-1".into(), logical_order: 1, direction: RadrootsSimplexAppChatDirection::Outbound, + chat_msg_id: Some("AQIDBAUGBwgJCgsM".into()), body: "plaintext should not appear in sqlite bytes".into(), delivery_status: "sent".into(), created_at_unix: 6, @@ -1109,4 +1406,83 @@ mod tests { }; assert!(store.record_inbound_message(&duplicate).is_err()); } + + #[test] + fn outbound_text_lifecycle_persists_chat_item_outbox_and_msg_id() { + let temp = tempfile::tempdir().expect("temp"); + let path = temp.path().join("simplex.sqlite"); + let vault = Arc::new(RadrootsSecretVaultMemory::new()); + let store = memory_store(&path, vault).expect("store"); + seed_store(&store); + + let draft = store + .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") + .expect("draft"); + + assert_eq!( + draft.chat_item.direction, + RadrootsSimplexAppChatDirection::Outbound + ); + assert_eq!( + draft.chat_item.chat_msg_id.as_deref(), + Some("AQIDBAUGBwgJCgsM") + ); + assert_eq!(draft.chat_item.delivery_status, "pending"); + assert_eq!( + draft.outbox_message.chat_item_id, + draft.chat_item.chat_item_id + ); + assert_eq!(draft.outbox_message.chat_msg_id, "AQIDBAUGBwgJCgsM"); + assert_eq!(draft.outbox_message.status, "pending"); + let page = store.chat_page("conversation-1", 10).expect("page"); + assert_eq!(page, vec![draft.chat_item]); + let pending = store.pending_outbox_messages().expect("pending"); + assert_eq!(pending, vec![draft.outbox_message]); + } + + #[test] + fn outbound_text_retry_preserves_msg_id_and_chat_item() { + let temp = tempfile::tempdir().expect("temp"); + let path = temp.path().join("simplex.sqlite"); + let vault = Arc::new(RadrootsSecretVaultMemory::new()); + let store = memory_store(&path, vault).expect("store"); + seed_store(&store); + + let first = store + .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") + .expect("first"); + let second = store + .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") + .expect("second"); + + assert_eq!(second, first); + assert_eq!( + store.chat_page("conversation-1", 10).expect("page").len(), + 1 + ); + assert_eq!(store.pending_outbox_messages().expect("pending").len(), 1); + } + + #[test] + fn outbound_text_generates_twelve_byte_base64url_msg_id() { + let temp = tempfile::tempdir().expect("temp"); + let path = temp.path().join("simplex.sqlite"); + let vault = Arc::new(RadrootsSecretVaultMemory::new()); + let store = memory_store(&path, vault).expect("store"); + seed_store(&store); + + let draft = store + .create_outbound_text(&outbound_request()) + .expect("draft"); + let chat_msg_id = draft.outbox_message.chat_msg_id; + let decoded = URL_SAFE_NO_PAD + .decode(chat_msg_id.as_bytes()) + .expect("base64url"); + + assert_eq!(decoded.len(), CHAT_MSG_ID_BYTES); + assert_eq!( + draft.chat_item.chat_msg_id.as_deref(), + Some(chat_msg_id.as_str()) + ); + } }