store.rs (103820B)
1 use crate::error::RadrootsSimplexAppStoreError; 2 use crate::model::{ 3 RadrootsSimplexAppChatDirection, RadrootsSimplexAppChatItem, RadrootsSimplexAppConnection, 4 RadrootsSimplexAppContact, RadrootsSimplexAppConversation, RadrootsSimplexAppDiagnostics, 5 RadrootsSimplexAppInboundChildEvent, RadrootsSimplexAppInboundCommit, 6 RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppInboundTextRequest, 7 RadrootsSimplexAppInboundUnsupportedEventRequest, RadrootsSimplexAppOutboundTextDraft, 8 RadrootsSimplexAppOutboundTextRequest, RadrootsSimplexAppOutboxMessage, 9 RadrootsSimplexAppProfile, RadrootsSimplexAppQueueEndpoint, 10 RadrootsSimplexAppUnsupportedProtocolEvent, 11 }; 12 use alloc::format; 13 use alloc::string::String; 14 use alloc::sync::Arc; 15 use alloc::vec::Vec; 16 use base64::Engine as _; 17 use base64::engine::general_purpose::URL_SAFE_NO_PAD; 18 use getrandom::getrandom; 19 use radroots_secret_vault::RadrootsSecretVault; 20 #[cfg(feature = "os-keyring")] 21 use radroots_secret_vault::RadrootsSecretVaultOsKeyring; 22 use rusqlite::{Connection, OpenFlags, OptionalExtension, Row, Transaction, params}; 23 use sha2::{Digest, Sha256}; 24 use std::fs; 25 use std::path::Path; 26 use std::time::{Duration, SystemTime, UNIX_EPOCH}; 27 use zeroize::Zeroize; 28 29 const CURRENT_SCHEMA_VERSION: i64 = 4; 30 const DEFAULT_KEYCHAIN_SERVICE: &str = "org.radroots.simplex.app-store"; 31 const DATABASE_KEY_BYTES: usize = 32; 32 const CHAT_MSG_ID_BYTES: usize = 12; 33 34 pub struct RadrootsSimplexAppStore { 35 connection: Connection, 36 diagnostics: RadrootsSimplexAppDiagnostics, 37 } 38 39 impl RadrootsSimplexAppStore { 40 #[cfg(feature = "os-keyring")] 41 pub fn open_keychain_backed( 42 path: impl AsRef<Path>, 43 ) -> Result<Self, RadrootsSimplexAppStoreError> { 44 let path = path.as_ref(); 45 let key_slot = derived_key_slot(path); 46 Self::open_with_vault( 47 path, 48 Arc::new(RadrootsSecretVaultOsKeyring::new(DEFAULT_KEYCHAIN_SERVICE)), 49 key_slot, 50 "host_vault", 51 ) 52 } 53 54 pub fn open_with_vault( 55 path: impl AsRef<Path>, 56 vault: Arc<dyn RadrootsSecretVault>, 57 key_slot: impl Into<String>, 58 key_source: impl Into<String>, 59 ) -> Result<Self, RadrootsSimplexAppStoreError> { 60 let path = path.as_ref(); 61 let key_slot = key_slot.into(); 62 let key_source = key_source.into(); 63 let existed = path.exists(); 64 if let Some(parent) = path.parent() 65 && !parent.as_os_str().is_empty() 66 { 67 fs::create_dir_all(parent).map_err(|error| { 68 RadrootsSimplexAppStoreError::Io(format!( 69 "failed to create SimpleX app store directory: {error}" 70 )) 71 })?; 72 } 73 74 let mut key_hex = load_or_create_database_key(vault.as_ref(), &key_slot, existed)?; 75 let key_slot_digest = key_slot_digest(&key_slot); 76 let mut connection = open_keyed_connection(path, &key_hex)?; 77 key_hex.zeroize(); 78 let cipher = verify_encryption(&connection)?; 79 configure_connection(&connection)?; 80 migrate(&mut connection, &key_slot_digest, &key_source)?; 81 verify_metadata(&connection, &key_slot_digest)?; 82 let diagnostics = diagnostics_for(&connection, cipher, key_source, key_slot_digest)?; 83 Ok(Self { 84 connection, 85 diagnostics, 86 }) 87 } 88 89 pub fn diagnostics(&self) -> &RadrootsSimplexAppDiagnostics { 90 &self.diagnostics 91 } 92 93 pub fn upsert_profile( 94 &self, 95 profile: &RadrootsSimplexAppProfile, 96 ) -> Result<(), RadrootsSimplexAppStoreError> { 97 self.connection.execute( 98 "INSERT INTO profiles (profile_id, display_name, created_at_unix) 99 VALUES (?1, ?2, ?3) 100 ON CONFLICT(profile_id) DO UPDATE SET display_name = excluded.display_name", 101 params![ 102 profile.profile_id, 103 profile.display_name, 104 profile.created_at_unix 105 ], 106 )?; 107 Ok(()) 108 } 109 110 pub fn get_profile( 111 &self, 112 profile_id: &str, 113 ) -> Result<Option<RadrootsSimplexAppProfile>, RadrootsSimplexAppStoreError> { 114 self.connection 115 .query_row( 116 "SELECT profile_id, display_name, created_at_unix FROM profiles WHERE profile_id = ?1", 117 params![profile_id], 118 profile_from_row, 119 ) 120 .optional() 121 .map_err(Into::into) 122 } 123 124 pub fn list_profiles( 125 &self, 126 ) -> Result<Vec<RadrootsSimplexAppProfile>, RadrootsSimplexAppStoreError> { 127 let mut statement = self.connection.prepare( 128 "SELECT profile_id, display_name, created_at_unix FROM profiles ORDER BY profile_id", 129 )?; 130 collect_rows(statement.query_map([], profile_from_row)?) 131 } 132 133 pub fn upsert_contact( 134 &self, 135 contact: &RadrootsSimplexAppContact, 136 ) -> Result<(), RadrootsSimplexAppStoreError> { 137 self.connection.execute( 138 "INSERT INTO contacts (contact_id, profile_id, display_name, lifecycle, created_at_unix) 139 VALUES (?1, ?2, ?3, ?4, ?5) 140 ON CONFLICT(contact_id) DO UPDATE SET 141 display_name = excluded.display_name, 142 lifecycle = excluded.lifecycle", 143 params![ 144 contact.contact_id, 145 contact.profile_id, 146 contact.display_name, 147 contact.lifecycle, 148 contact.created_at_unix 149 ], 150 )?; 151 Ok(()) 152 } 153 154 pub fn list_contacts( 155 &self, 156 ) -> Result<Vec<RadrootsSimplexAppContact>, RadrootsSimplexAppStoreError> { 157 let mut statement = self.connection.prepare( 158 "SELECT contact_id, profile_id, display_name, lifecycle, created_at_unix 159 FROM contacts ORDER BY contact_id", 160 )?; 161 collect_rows(statement.query_map([], contact_from_row)?) 162 } 163 164 pub fn upsert_connection( 165 &self, 166 connection: &RadrootsSimplexAppConnection, 167 ) -> Result<(), RadrootsSimplexAppStoreError> { 168 self.connection.execute( 169 "INSERT INTO connections 170 (connection_id, profile_id, contact_id, state, agent_connection_id, created_at_unix) 171 VALUES (?1, ?2, ?3, ?4, ?5, ?6) 172 ON CONFLICT(connection_id) DO UPDATE SET 173 contact_id = excluded.contact_id, 174 state = excluded.state, 175 agent_connection_id = excluded.agent_connection_id", 176 params![ 177 connection.connection_id, 178 connection.profile_id, 179 connection.contact_id, 180 connection.state, 181 connection.agent_connection_id, 182 connection.created_at_unix 183 ], 184 )?; 185 Ok(()) 186 } 187 188 pub fn list_connections_by_state( 189 &self, 190 state: &str, 191 ) -> Result<Vec<RadrootsSimplexAppConnection>, RadrootsSimplexAppStoreError> { 192 let mut statement = self.connection.prepare( 193 "SELECT connection_id, profile_id, contact_id, state, agent_connection_id, created_at_unix 194 FROM connections WHERE state = ?1 ORDER BY connection_id", 195 )?; 196 collect_rows(statement.query_map(params![state], connection_from_row)?) 197 } 198 199 pub fn upsert_queue_endpoint( 200 &self, 201 queue: &RadrootsSimplexAppQueueEndpoint, 202 ) -> Result<(), RadrootsSimplexAppStoreError> { 203 self.connection.execute( 204 "INSERT INTO queue_endpoints 205 (queue_endpoint_id, connection_id, role, server, sender_id, status, created_at_unix) 206 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) 207 ON CONFLICT(queue_endpoint_id) DO UPDATE SET 208 role = excluded.role, 209 server = excluded.server, 210 sender_id = excluded.sender_id, 211 status = excluded.status", 212 params![ 213 queue.queue_endpoint_id, 214 queue.connection_id, 215 queue.role, 216 queue.server, 217 queue.sender_id, 218 queue.status, 219 queue.created_at_unix 220 ], 221 )?; 222 Ok(()) 223 } 224 225 pub fn list_queues_by_status( 226 &self, 227 status: &str, 228 ) -> Result<Vec<RadrootsSimplexAppQueueEndpoint>, RadrootsSimplexAppStoreError> { 229 let mut statement = self.connection.prepare( 230 "SELECT queue_endpoint_id, connection_id, role, server, sender_id, status, created_at_unix 231 FROM queue_endpoints WHERE status = ?1 ORDER BY queue_endpoint_id", 232 )?; 233 collect_rows(statement.query_map(params![status], queue_endpoint_from_row)?) 234 } 235 236 pub fn upsert_conversation( 237 &self, 238 conversation: &RadrootsSimplexAppConversation, 239 ) -> Result<(), RadrootsSimplexAppStoreError> { 240 self.connection.execute( 241 "INSERT INTO conversations (conversation_id, profile_id, contact_id, created_at_unix) 242 VALUES (?1, ?2, ?3, ?4) 243 ON CONFLICT(conversation_id) DO UPDATE SET contact_id = excluded.contact_id", 244 params![ 245 conversation.conversation_id, 246 conversation.profile_id, 247 conversation.contact_id, 248 conversation.created_at_unix 249 ], 250 )?; 251 Ok(()) 252 } 253 254 pub fn append_chat_item( 255 &self, 256 item: &RadrootsSimplexAppChatItem, 257 ) -> Result<(), RadrootsSimplexAppStoreError> { 258 self.connection.execute( 259 "INSERT INTO chat_items 260 (chat_item_id, conversation_id, logical_order, direction, chat_msg_id, body, delivery_status, created_at_unix) 261 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", 262 params![ 263 item.chat_item_id, 264 item.conversation_id, 265 item.logical_order, 266 item.direction.as_str(), 267 item.chat_msg_id, 268 item.body, 269 item.delivery_status, 270 item.created_at_unix 271 ], 272 )?; 273 Ok(()) 274 } 275 276 pub fn create_outbound_text( 277 &self, 278 request: &RadrootsSimplexAppOutboundTextRequest, 279 ) -> Result<RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppStoreError> { 280 let chat_msg_id = generate_chat_msg_id()?; 281 self.create_outbound_text_with_msg_id(request, &chat_msg_id) 282 } 283 284 #[cfg(test)] 285 fn create_outbound_text_with_test_msg_id( 286 &self, 287 request: &RadrootsSimplexAppOutboundTextRequest, 288 chat_msg_id: &str, 289 ) -> Result<RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppStoreError> { 290 self.create_outbound_text_with_msg_id(request, chat_msg_id) 291 } 292 293 fn create_outbound_text_with_msg_id( 294 &self, 295 request: &RadrootsSimplexAppOutboundTextRequest, 296 chat_msg_id: &str, 297 ) -> Result<RadrootsSimplexAppOutboundTextDraft, RadrootsSimplexAppStoreError> { 298 validate_outbound_text_request(request)?; 299 validate_chat_msg_id(chat_msg_id)?; 300 let transaction = self.connection.unchecked_transaction()?; 301 if let Some(existing) = 302 outbound_text_by_msg_id(&transaction, &request.connection_id, chat_msg_id)? 303 { 304 transaction.commit()?; 305 return Ok(existing); 306 } 307 let logical_order = next_logical_order(&transaction, &request.conversation_id)?; 308 let chat_item_id = derive_outbound_local_id("chat", &request.connection_id, chat_msg_id); 309 let outbox_id = derive_outbound_local_id("outbox", &request.connection_id, chat_msg_id); 310 let chat_item = RadrootsSimplexAppChatItem { 311 chat_item_id: chat_item_id.clone(), 312 conversation_id: request.conversation_id.clone(), 313 logical_order, 314 direction: RadrootsSimplexAppChatDirection::Outbound, 315 chat_msg_id: Some(chat_msg_id.to_owned()), 316 body: request.body.clone(), 317 delivery_status: "pending".to_owned(), 318 created_at_unix: request.created_at_unix, 319 }; 320 transaction.execute( 321 "INSERT INTO chat_items 322 (chat_item_id, conversation_id, logical_order, direction, chat_msg_id, body, delivery_status, created_at_unix) 323 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", 324 params![ 325 chat_item.chat_item_id, 326 chat_item.conversation_id, 327 chat_item.logical_order, 328 chat_item.direction.as_str(), 329 chat_item.chat_msg_id, 330 chat_item.body, 331 chat_item.delivery_status, 332 chat_item.created_at_unix 333 ], 334 )?; 335 let outbox_message = RadrootsSimplexAppOutboxMessage { 336 outbox_id, 337 chat_item_id, 338 connection_id: request.connection_id.clone(), 339 conversation_id: Some(request.conversation_id.clone()), 340 chat_msg_id: chat_msg_id.to_owned(), 341 body: request.body.clone(), 342 status: "pending".to_owned(), 343 runtime_message_id: None, 344 retry_after_unix: None, 345 created_at_unix: request.created_at_unix, 346 }; 347 transaction.execute( 348 "INSERT INTO outbox_messages 349 (outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, runtime_message_id, retry_after_unix, created_at_unix) 350 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", 351 params![ 352 outbox_message.outbox_id, 353 outbox_message.chat_item_id, 354 outbox_message.connection_id, 355 outbox_message.conversation_id, 356 outbox_message.chat_msg_id, 357 outbox_message.body, 358 outbox_message.status, 359 outbox_message.runtime_message_id, 360 outbox_message.retry_after_unix, 361 outbox_message.created_at_unix 362 ], 363 )?; 364 transaction.commit()?; 365 Ok(RadrootsSimplexAppOutboundTextDraft { 366 chat_item, 367 outbox_message, 368 }) 369 } 370 371 pub fn chat_page( 372 &self, 373 conversation_id: &str, 374 limit: usize, 375 ) -> Result<Vec<RadrootsSimplexAppChatItem>, RadrootsSimplexAppStoreError> { 376 let mut statement = self.connection.prepare( 377 "SELECT chat_item_id, conversation_id, logical_order, direction, chat_msg_id, body, delivery_status, created_at_unix 378 FROM chat_items 379 WHERE conversation_id = ?1 380 ORDER BY logical_order DESC, chat_item_id DESC 381 LIMIT ?2", 382 )?; 383 collect_rows( 384 statement.query_map(params![conversation_id, limit as i64], chat_item_from_row)?, 385 ) 386 } 387 388 pub fn record_inbound_message( 389 &self, 390 entry: &RadrootsSimplexAppInboundMessageLogEntry, 391 ) -> Result<(), RadrootsSimplexAppStoreError> { 392 self.connection.execute( 393 "INSERT INTO inbound_message_log 394 (inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix) 395 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", 396 params![ 397 entry.inbound_id, 398 entry.connection_id, 399 entry.broker_message_id_hash, 400 entry.inbound_sequence, 401 entry.message_hash, 402 entry.runtime_ack_handle, 403 entry.ack_status, 404 entry.app_record_kind, 405 entry.app_record_id, 406 entry.received_at_unix 407 ], 408 )?; 409 Ok(()) 410 } 411 412 pub fn commit_inbound_text( 413 &self, 414 request: &RadrootsSimplexAppInboundTextRequest, 415 ) -> Result<RadrootsSimplexAppInboundCommit, RadrootsSimplexAppStoreError> { 416 validate_inbound_text_request(request)?; 417 let transaction = self.connection.unchecked_transaction()?; 418 let inbound = ensure_inbound_frame( 419 &transaction, 420 &request.connection_id, 421 &request.broker_message_id_hash, 422 request.inbound_sequence, 423 &request.message_hash, 424 &request.runtime_ack_handle, 425 request.received_at_unix, 426 )?; 427 if let Some(existing) = 428 inbound_child_commit_by_ordinal(&transaction, &inbound, request.child_ordinal)? 429 { 430 transaction.commit()?; 431 return Ok(existing); 432 } 433 let chat_item_id = derive_inbound_child_local_id( 434 "chat", 435 &inbound.inbound_id, 436 request.child_ordinal, 437 request.chat_msg_id.as_deref().unwrap_or(""), 438 ); 439 let logical_order = next_logical_order(&transaction, &request.conversation_id)?; 440 let chat_item = RadrootsSimplexAppChatItem { 441 chat_item_id: chat_item_id.clone(), 442 conversation_id: request.conversation_id.clone(), 443 logical_order, 444 direction: RadrootsSimplexAppChatDirection::Inbound, 445 chat_msg_id: request.chat_msg_id.clone(), 446 body: request.body.clone(), 447 delivery_status: "received".to_owned(), 448 created_at_unix: request.received_at_unix, 449 }; 450 transaction.execute( 451 "INSERT INTO chat_items 452 (chat_item_id, conversation_id, logical_order, direction, chat_msg_id, body, delivery_status, created_at_unix) 453 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", 454 params![ 455 chat_item.chat_item_id, 456 chat_item.conversation_id, 457 chat_item.logical_order, 458 chat_item.direction.as_str(), 459 chat_item.chat_msg_id, 460 chat_item.body, 461 chat_item.delivery_status, 462 chat_item.created_at_unix 463 ], 464 )?; 465 let child_event = RadrootsSimplexAppInboundChildEvent { 466 child_event_id: derive_inbound_child_local_id( 467 "child", 468 &inbound.inbound_id, 469 request.child_ordinal, 470 request.chat_msg_id.as_deref().unwrap_or(""), 471 ), 472 inbound_id: inbound.inbound_id.clone(), 473 child_ordinal: request.child_ordinal, 474 app_record_kind: "chat_item".to_owned(), 475 app_record_id: chat_item_id, 476 event_kind: "x.msg.new".to_owned(), 477 chat_msg_id: request.chat_msg_id.clone(), 478 received_at_unix: request.received_at_unix, 479 }; 480 insert_inbound_child_event(&transaction, &child_event)?; 481 transaction.commit()?; 482 Ok(RadrootsSimplexAppInboundCommit { 483 inbound, 484 child_event, 485 chat_item: Some(chat_item), 486 unsupported_event: None, 487 duplicate: false, 488 }) 489 } 490 491 pub fn commit_inbound_unsupported_event( 492 &self, 493 request: &RadrootsSimplexAppInboundUnsupportedEventRequest, 494 ) -> Result<RadrootsSimplexAppInboundCommit, RadrootsSimplexAppStoreError> { 495 validate_inbound_unsupported_request(request)?; 496 let transaction = self.connection.unchecked_transaction()?; 497 let inbound = ensure_inbound_frame( 498 &transaction, 499 &request.connection_id, 500 &request.broker_message_id_hash, 501 request.inbound_sequence, 502 &request.message_hash, 503 &request.runtime_ack_handle, 504 request.received_at_unix, 505 )?; 506 if let Some(existing) = 507 inbound_child_commit_by_ordinal(&transaction, &inbound, request.child_ordinal)? 508 { 509 transaction.commit()?; 510 return Ok(existing); 511 } 512 let event_id = derive_inbound_child_local_id( 513 "unsupported", 514 &inbound.inbound_id, 515 request.child_ordinal, 516 &request.event_kind, 517 ); 518 let unsupported_event = RadrootsSimplexAppUnsupportedProtocolEvent { 519 event_id: event_id.clone(), 520 connection_id: Some(request.connection_id.clone()), 521 event_kind: request.event_kind.clone(), 522 payload_json: request.payload_json.clone(), 523 status: "stored".to_owned(), 524 received_at_unix: request.received_at_unix, 525 }; 526 transaction.execute( 527 "INSERT INTO unsupported_protocol_events 528 (event_id, connection_id, event_kind, payload_json, status, received_at_unix) 529 VALUES (?1, ?2, ?3, ?4, ?5, ?6)", 530 params![ 531 unsupported_event.event_id, 532 unsupported_event.connection_id, 533 unsupported_event.event_kind, 534 unsupported_event.payload_json, 535 unsupported_event.status, 536 unsupported_event.received_at_unix 537 ], 538 )?; 539 let child_event = RadrootsSimplexAppInboundChildEvent { 540 child_event_id: derive_inbound_child_local_id( 541 "child", 542 &inbound.inbound_id, 543 request.child_ordinal, 544 &request.event_kind, 545 ), 546 inbound_id: inbound.inbound_id.clone(), 547 child_ordinal: request.child_ordinal, 548 app_record_kind: "unsupported_event".to_owned(), 549 app_record_id: event_id, 550 event_kind: request.event_kind.clone(), 551 chat_msg_id: None, 552 received_at_unix: request.received_at_unix, 553 }; 554 insert_inbound_child_event(&transaction, &child_event)?; 555 transaction.commit()?; 556 Ok(RadrootsSimplexAppInboundCommit { 557 inbound, 558 child_event, 559 chat_item: None, 560 unsupported_event: Some(unsupported_event), 561 duplicate: false, 562 }) 563 } 564 565 pub fn pending_ack_messages( 566 &self, 567 ) -> Result<Vec<RadrootsSimplexAppInboundMessageLogEntry>, RadrootsSimplexAppStoreError> { 568 let mut statement = self.connection.prepare( 569 "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix 570 FROM inbound_message_log 571 WHERE ack_status = 'pending_ack' 572 ORDER BY received_at_unix, inbound_id", 573 )?; 574 collect_rows(statement.query_map([], inbound_message_from_row)?) 575 } 576 577 pub fn mark_inbound_ack_delivered( 578 &self, 579 connection_id: &str, 580 inbound_sequence: i64, 581 message_hash: &[u8], 582 ) -> Result<Option<RadrootsSimplexAppInboundMessageLogEntry>, RadrootsSimplexAppStoreError> 583 { 584 if connection_id.is_empty() { 585 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 586 "connection id must not be empty".into(), 587 )); 588 } 589 if inbound_sequence < 0 { 590 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 591 "inbound sequence must not be negative".into(), 592 )); 593 } 594 if message_hash.is_empty() { 595 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 596 "message hash must not be empty".into(), 597 )); 598 } 599 let Some(inbound) = self 600 .connection 601 .query_row( 602 "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix 603 FROM inbound_message_log 604 WHERE connection_id = ?1 AND inbound_sequence = ?2 AND message_hash = ?3 605 LIMIT 1", 606 params![connection_id, inbound_sequence, message_hash], 607 inbound_message_from_row, 608 ) 609 .optional()? else { 610 return Ok(None); 611 }; 612 self.mark_inbound_ack_delivered_by_handle(&inbound.runtime_ack_handle) 613 } 614 615 pub fn mark_inbound_ack_delivered_by_handle( 616 &self, 617 runtime_ack_handle: &str, 618 ) -> Result<Option<RadrootsSimplexAppInboundMessageLogEntry>, RadrootsSimplexAppStoreError> 619 { 620 if runtime_ack_handle.is_empty() { 621 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 622 "runtime ack handle must not be empty".into(), 623 )); 624 } 625 self.connection.execute( 626 "UPDATE inbound_message_log 627 SET ack_status = 'acked' 628 WHERE runtime_ack_handle = ?1 AND ack_status = 'pending_ack'", 629 params![runtime_ack_handle], 630 )?; 631 self.connection 632 .query_row( 633 "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix 634 FROM inbound_message_log 635 WHERE runtime_ack_handle = ?1 636 LIMIT 1", 637 params![runtime_ack_handle], 638 inbound_message_from_row, 639 ) 640 .optional() 641 .map_err(Into::into) 642 } 643 644 pub fn enqueue_outbox_message( 645 &self, 646 message: &RadrootsSimplexAppOutboxMessage, 647 ) -> Result<(), RadrootsSimplexAppStoreError> { 648 self.connection.execute( 649 "INSERT INTO outbox_messages 650 (outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, runtime_message_id, retry_after_unix, created_at_unix) 651 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", 652 params![ 653 message.outbox_id, 654 message.chat_item_id, 655 message.connection_id, 656 message.conversation_id, 657 message.chat_msg_id, 658 message.body, 659 message.status, 660 message.runtime_message_id, 661 message.retry_after_unix, 662 message.created_at_unix 663 ], 664 )?; 665 Ok(()) 666 } 667 668 pub fn pending_outbox_messages( 669 &self, 670 ) -> Result<Vec<RadrootsSimplexAppOutboxMessage>, RadrootsSimplexAppStoreError> { 671 let mut statement = self.connection.prepare( 672 "SELECT outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, runtime_message_id, retry_after_unix, created_at_unix 673 FROM outbox_messages 674 WHERE status IN ('pending', 'retryable') AND runtime_message_id IS NULL 675 ORDER BY created_at_unix, outbox_id", 676 )?; 677 collect_rows(statement.query_map([], outbox_message_from_row)?) 678 } 679 680 pub fn list_outbox_messages( 681 &self, 682 ) -> Result<Vec<RadrootsSimplexAppOutboxMessage>, RadrootsSimplexAppStoreError> { 683 let mut statement = self.connection.prepare( 684 "SELECT outbox_id, chat_item_id, connection_id, conversation_id, chat_msg_id, body, status, runtime_message_id, retry_after_unix, created_at_unix 685 FROM outbox_messages 686 ORDER BY created_at_unix, outbox_id", 687 )?; 688 collect_rows(statement.query_map([], outbox_message_from_row)?) 689 } 690 691 pub fn mark_outbox_message_queued( 692 &self, 693 outbox_id: &str, 694 runtime_message_id: u64, 695 ) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> { 696 if outbox_id.is_empty() { 697 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 698 "outbox id must not be empty".into(), 699 )); 700 } 701 let transaction = self.connection.unchecked_transaction()?; 702 let Some(current) = outbound_text_by_outbox_id(&transaction, outbox_id)? else { 703 transaction.commit()?; 704 return Ok(None); 705 }; 706 match current.outbox_message.status.as_str() { 707 "pending" | "retryable" => {} 708 other => { 709 return Err(RadrootsSimplexAppStoreError::MessageLifecycle(format!( 710 "cannot queue outbound message `{outbox_id}` from `{other}`" 711 ))); 712 } 713 } 714 let runtime_message_id = i64::try_from(runtime_message_id).map_err(|_| { 715 RadrootsSimplexAppStoreError::MessageLifecycle(format!( 716 "runtime message id `{runtime_message_id}` exceeds app-store range" 717 )) 718 })?; 719 transaction.execute( 720 "UPDATE outbox_messages 721 SET runtime_message_id = ?2 722 WHERE outbox_id = ?1", 723 params![outbox_id, runtime_message_id], 724 )?; 725 let updated = outbound_text_by_outbox_id(&transaction, outbox_id)?; 726 transaction.commit()?; 727 Ok(updated) 728 } 729 730 pub fn mark_outbox_message_sent( 731 &self, 732 outbox_id: &str, 733 ) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> { 734 self.mark_outbox_message_delivery_status(outbox_id, "sent", false) 735 } 736 737 pub fn mark_outbox_message_acknowledged( 738 &self, 739 outbox_id: &str, 740 ) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> { 741 self.mark_outbox_message_delivery_status(outbox_id, "acknowledged", true) 742 } 743 744 fn mark_outbox_message_delivery_status( 745 &self, 746 outbox_id: &str, 747 status: &str, 748 terminal: bool, 749 ) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> { 750 if outbox_id.is_empty() { 751 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 752 "outbox id must not be empty".into(), 753 )); 754 } 755 let transaction = self.connection.unchecked_transaction()?; 756 let Some(current) = outbound_text_by_outbox_id(&transaction, outbox_id)? else { 757 transaction.commit()?; 758 return Ok(None); 759 }; 760 match (current.outbox_message.status.as_str(), status) { 761 ("pending" | "retryable" | "sent", "sent") 762 | ("sent" | "acknowledged", "acknowledged") 763 | ("acknowledged", "sent") => {} 764 (current_status, next_status) => { 765 return Err(RadrootsSimplexAppStoreError::MessageLifecycle(format!( 766 "cannot transition outbound message `{outbox_id}` from `{current_status}` to `{next_status}`" 767 ))); 768 } 769 } 770 if terminal { 771 transaction.execute( 772 "UPDATE outbox_messages SET status = ?2 WHERE outbox_id = ?1", 773 params![outbox_id, status], 774 )?; 775 transaction.execute( 776 "UPDATE chat_items 777 SET delivery_status = ?2 778 WHERE chat_item_id = ( 779 SELECT chat_item_id FROM outbox_messages WHERE outbox_id = ?1 780 )", 781 params![outbox_id, status], 782 )?; 783 } else { 784 transaction.execute( 785 "UPDATE outbox_messages 786 SET status = CASE WHEN status = 'acknowledged' THEN status ELSE ?2 END 787 WHERE outbox_id = ?1", 788 params![outbox_id, status], 789 )?; 790 transaction.execute( 791 "UPDATE chat_items 792 SET delivery_status = CASE 793 WHEN delivery_status = 'acknowledged' THEN delivery_status 794 ELSE ?2 795 END 796 WHERE chat_item_id = ( 797 SELECT chat_item_id FROM outbox_messages WHERE outbox_id = ?1 798 )", 799 params![outbox_id, status], 800 )?; 801 } 802 let updated = outbound_text_by_outbox_id(&transaction, outbox_id)?; 803 transaction.commit()?; 804 Ok(updated) 805 } 806 807 pub fn record_unsupported_protocol_event( 808 &self, 809 event: &RadrootsSimplexAppUnsupportedProtocolEvent, 810 ) -> Result<(), RadrootsSimplexAppStoreError> { 811 self.connection.execute( 812 "INSERT INTO unsupported_protocol_events 813 (event_id, connection_id, event_kind, payload_json, status, received_at_unix) 814 VALUES (?1, ?2, ?3, ?4, ?5, ?6)", 815 params![ 816 event.event_id, 817 event.connection_id, 818 event.event_kind, 819 event.payload_json, 820 event.status, 821 event.received_at_unix 822 ], 823 )?; 824 Ok(()) 825 } 826 827 pub fn list_unsupported_protocol_events( 828 &self, 829 ) -> Result<Vec<RadrootsSimplexAppUnsupportedProtocolEvent>, RadrootsSimplexAppStoreError> { 830 let mut statement = self.connection.prepare( 831 "SELECT event_id, connection_id, event_kind, payload_json, status, received_at_unix 832 FROM unsupported_protocol_events ORDER BY received_at_unix, event_id", 833 )?; 834 collect_rows(statement.query_map([], unsupported_event_from_row)?) 835 } 836 837 pub fn reset_disposable_runtime_state(&self) -> Result<(), RadrootsSimplexAppStoreError> { 838 let transaction = self.connection.unchecked_transaction()?; 839 transaction.execute("DELETE FROM unsupported_protocol_events", [])?; 840 transaction.execute("DELETE FROM inbound_child_events", [])?; 841 transaction.execute("DELETE FROM inbound_message_log", [])?; 842 transaction.execute("DELETE FROM outbox_messages", [])?; 843 transaction.execute("DELETE FROM chat_items", [])?; 844 transaction.execute("DELETE FROM queue_endpoints", [])?; 845 transaction.execute("DELETE FROM conversations", [])?; 846 transaction.execute("DELETE FROM connections", [])?; 847 transaction.execute("DELETE FROM contacts", [])?; 848 transaction.commit()?; 849 Ok(()) 850 } 851 } 852 853 fn load_or_create_database_key( 854 vault: &dyn RadrootsSecretVault, 855 key_slot: &str, 856 database_exists: bool, 857 ) -> Result<String, RadrootsSimplexAppStoreError> { 858 match vault.load_secret(key_slot)? { 859 Some(secret) => validate_database_key(secret), 860 None if database_exists => Err(RadrootsSimplexAppStoreError::MissingDatabaseKey), 861 None => { 862 let key = generate_database_key_hex()?; 863 vault.store_secret(key_slot, &key)?; 864 Ok(key) 865 } 866 } 867 } 868 869 fn generate_database_key_hex() -> Result<String, RadrootsSimplexAppStoreError> { 870 let mut key = [0_u8; DATABASE_KEY_BYTES]; 871 getrandom(&mut key).map_err(|_| { 872 RadrootsSimplexAppStoreError::InvalidDatabaseKey("entropy unavailable".into()) 873 })?; 874 let hex = hex::encode(key); 875 key.zeroize(); 876 Ok(hex) 877 } 878 879 fn validate_database_key(secret: String) -> Result<String, RadrootsSimplexAppStoreError> { 880 if secret.len() != DATABASE_KEY_BYTES * 2 { 881 return Err(RadrootsSimplexAppStoreError::InvalidDatabaseKey( 882 "expected 32-byte hex key".into(), 883 )); 884 } 885 if !secret.as_bytes().iter().all(u8::is_ascii_hexdigit) { 886 return Err(RadrootsSimplexAppStoreError::InvalidDatabaseKey( 887 "key is not hex encoded".into(), 888 )); 889 } 890 Ok(secret) 891 } 892 893 fn open_keyed_connection( 894 path: &Path, 895 key_hex: &str, 896 ) -> Result<Connection, RadrootsSimplexAppStoreError> { 897 let connection = Connection::open_with_flags( 898 path, 899 OpenFlags::SQLITE_OPEN_READ_WRITE 900 | OpenFlags::SQLITE_OPEN_CREATE 901 | OpenFlags::SQLITE_OPEN_FULL_MUTEX, 902 )?; 903 connection.busy_timeout(Duration::from_secs(5))?; 904 connection.execute_batch(&format!("PRAGMA key = \"x'{key_hex}'\";"))?; 905 match connection.query_row("SELECT count(*) FROM sqlite_schema", [], |_| Ok(())) { 906 Ok(()) => Ok(connection), 907 Err(_) => Err(RadrootsSimplexAppStoreError::EncryptionKeyRejected), 908 } 909 } 910 911 fn verify_encryption(connection: &Connection) -> Result<String, RadrootsSimplexAppStoreError> { 912 let cipher = connection 913 .query_row("PRAGMA cipher_version", [], |row| row.get::<_, String>(0)) 914 .optional()? 915 .ok_or(RadrootsSimplexAppStoreError::EncryptionUnavailable)?; 916 if cipher.trim().is_empty() { 917 return Err(RadrootsSimplexAppStoreError::EncryptionUnavailable); 918 } 919 Ok(cipher) 920 } 921 922 fn configure_connection(connection: &Connection) -> Result<(), RadrootsSimplexAppStoreError> { 923 connection.pragma_update(None, "foreign_keys", true)?; 924 let foreign_keys: i64 = 925 connection.pragma_query_value(None, "foreign_keys", |row| row.get(0))?; 926 if foreign_keys != 1 { 927 return Err(RadrootsSimplexAppStoreError::Schema( 928 "foreign keys did not enable".into(), 929 )); 930 } 931 let journal_mode: String = 932 connection.pragma_update_and_check(None, "journal_mode", "WAL", |row| row.get(0))?; 933 if !journal_mode.eq_ignore_ascii_case("wal") { 934 return Err(RadrootsSimplexAppStoreError::Schema(format!( 935 "WAL journal mode unavailable: {journal_mode}" 936 ))); 937 } 938 connection.pragma_update(None, "synchronous", "NORMAL")?; 939 Ok(()) 940 } 941 942 fn migrate( 943 connection: &mut Connection, 944 key_slot_digest: &str, 945 key_source: &str, 946 ) -> Result<(), RadrootsSimplexAppStoreError> { 947 let user_version: i64 = 948 connection.pragma_query_value(None, "user_version", |row| row.get(0))?; 949 if user_version > CURRENT_SCHEMA_VERSION { 950 return Err(RadrootsSimplexAppStoreError::Schema(format!( 951 "unsupported future schema version `{user_version}`" 952 ))); 953 } 954 match user_version { 955 0 => { 956 let transaction = connection.transaction()?; 957 apply_schema_v4(&transaction)?; 958 transaction.execute( 959 "INSERT INTO encryption_metadata 960 (id, key_slot_digest, key_source, cipher, created_at_unix) 961 VALUES (1, ?1, ?2, 'sqlcipher', ?3)", 962 params![key_slot_digest, key_source, now_unix_secs()], 963 )?; 964 transaction.execute( 965 "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) 966 VALUES (1, 'initial-simplex-app-store', ?1)", 967 params![now_unix_secs()], 968 )?; 969 transaction.execute( 970 "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) 971 VALUES (2, 'message-lifecycle-outbound', ?1)", 972 params![now_unix_secs()], 973 )?; 974 transaction.execute( 975 "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) 976 VALUES (3, 'message-lifecycle-inbound', ?1)", 977 params![now_unix_secs()], 978 )?; 979 transaction.execute( 980 "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) 981 VALUES (4, 'message-lifecycle-frame-children', ?1)", 982 params![now_unix_secs()], 983 )?; 984 transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?; 985 transaction.commit()?; 986 } 987 1 => { 988 let transaction = connection.transaction()?; 989 apply_migration_v2(&transaction)?; 990 apply_migration_v3(&transaction)?; 991 apply_migration_v4(&transaction)?; 992 transaction.execute( 993 "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) 994 VALUES (2, 'message-lifecycle-outbound', ?1)", 995 params![now_unix_secs()], 996 )?; 997 transaction.execute( 998 "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) 999 VALUES (3, 'message-lifecycle-inbound', ?1)", 1000 params![now_unix_secs()], 1001 )?; 1002 transaction.execute( 1003 "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) 1004 VALUES (4, 'message-lifecycle-frame-children', ?1)", 1005 params![now_unix_secs()], 1006 )?; 1007 transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?; 1008 transaction.commit()?; 1009 } 1010 2 => { 1011 let transaction = connection.transaction()?; 1012 apply_migration_v3(&transaction)?; 1013 apply_migration_v4(&transaction)?; 1014 transaction.execute( 1015 "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) 1016 VALUES (3, 'message-lifecycle-inbound', ?1)", 1017 params![now_unix_secs()], 1018 )?; 1019 transaction.execute( 1020 "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) 1021 VALUES (4, 'message-lifecycle-frame-children', ?1)", 1022 params![now_unix_secs()], 1023 )?; 1024 transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?; 1025 transaction.commit()?; 1026 } 1027 3 => { 1028 let transaction = connection.transaction()?; 1029 apply_migration_v4(&transaction)?; 1030 transaction.execute( 1031 "INSERT INTO simplex_schema_migrations (version, name, applied_at_unix) 1032 VALUES (4, 'message-lifecycle-frame-children', ?1)", 1033 params![now_unix_secs()], 1034 )?; 1035 transaction.pragma_update(None, "user_version", CURRENT_SCHEMA_VERSION)?; 1036 transaction.commit()?; 1037 } 1038 CURRENT_SCHEMA_VERSION => {} 1039 _ => { 1040 return Err(RadrootsSimplexAppStoreError::Schema(format!( 1041 "unsupported schema version `{user_version}`" 1042 ))); 1043 } 1044 } 1045 Ok(()) 1046 } 1047 1048 fn apply_schema_v4(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> { 1049 transaction.execute_batch( 1050 " 1051 CREATE TABLE encryption_metadata ( 1052 id INTEGER PRIMARY KEY CHECK (id = 1), 1053 key_slot_digest TEXT NOT NULL, 1054 key_source TEXT NOT NULL, 1055 cipher TEXT NOT NULL, 1056 created_at_unix INTEGER NOT NULL 1057 ); 1058 1059 CREATE TABLE simplex_schema_migrations ( 1060 version INTEGER PRIMARY KEY, 1061 name TEXT NOT NULL, 1062 applied_at_unix INTEGER NOT NULL 1063 ); 1064 1065 CREATE TABLE profiles ( 1066 profile_id TEXT PRIMARY KEY, 1067 display_name TEXT NOT NULL, 1068 created_at_unix INTEGER NOT NULL 1069 ); 1070 1071 CREATE TABLE contacts ( 1072 contact_id TEXT PRIMARY KEY, 1073 profile_id TEXT NOT NULL REFERENCES profiles(profile_id) ON DELETE CASCADE, 1074 display_name TEXT NOT NULL, 1075 lifecycle TEXT NOT NULL, 1076 created_at_unix INTEGER NOT NULL 1077 ); 1078 1079 CREATE TABLE connections ( 1080 connection_id TEXT PRIMARY KEY, 1081 profile_id TEXT NOT NULL REFERENCES profiles(profile_id) ON DELETE CASCADE, 1082 contact_id TEXT REFERENCES contacts(contact_id) ON DELETE SET NULL, 1083 state TEXT NOT NULL, 1084 agent_connection_id TEXT, 1085 created_at_unix INTEGER NOT NULL 1086 ); 1087 1088 CREATE TABLE queue_endpoints ( 1089 queue_endpoint_id TEXT PRIMARY KEY, 1090 connection_id TEXT NOT NULL REFERENCES connections(connection_id) ON DELETE CASCADE, 1091 role TEXT NOT NULL, 1092 server TEXT NOT NULL, 1093 sender_id BLOB NOT NULL, 1094 status TEXT NOT NULL, 1095 created_at_unix INTEGER NOT NULL 1096 ); 1097 1098 CREATE TABLE conversations ( 1099 conversation_id TEXT PRIMARY KEY, 1100 profile_id TEXT NOT NULL REFERENCES profiles(profile_id) ON DELETE CASCADE, 1101 contact_id TEXT REFERENCES contacts(contact_id) ON DELETE SET NULL, 1102 created_at_unix INTEGER NOT NULL 1103 ); 1104 1105 CREATE TABLE chat_items ( 1106 chat_item_id TEXT PRIMARY KEY, 1107 conversation_id TEXT NOT NULL REFERENCES conversations(conversation_id) ON DELETE CASCADE, 1108 logical_order INTEGER NOT NULL, 1109 direction TEXT NOT NULL, 1110 chat_msg_id TEXT, 1111 body TEXT NOT NULL, 1112 delivery_status TEXT NOT NULL, 1113 created_at_unix INTEGER NOT NULL 1114 ); 1115 1116 CREATE TABLE inbound_message_log ( 1117 inbound_id TEXT PRIMARY KEY, 1118 connection_id TEXT NOT NULL REFERENCES connections(connection_id) ON DELETE CASCADE, 1119 broker_message_id_hash BLOB NOT NULL, 1120 inbound_sequence INTEGER, 1121 message_hash BLOB NOT NULL, 1122 runtime_ack_handle TEXT NOT NULL, 1123 ack_status TEXT NOT NULL, 1124 app_record_kind TEXT NOT NULL, 1125 app_record_id TEXT NOT NULL, 1126 received_at_unix INTEGER NOT NULL, 1127 UNIQUE(connection_id, broker_message_id_hash) 1128 ); 1129 1130 CREATE TABLE inbound_child_events ( 1131 child_event_id TEXT PRIMARY KEY, 1132 inbound_id TEXT NOT NULL REFERENCES inbound_message_log(inbound_id) ON DELETE CASCADE, 1133 child_ordinal INTEGER NOT NULL, 1134 app_record_kind TEXT NOT NULL, 1135 app_record_id TEXT NOT NULL, 1136 event_kind TEXT NOT NULL, 1137 chat_msg_id TEXT, 1138 received_at_unix INTEGER NOT NULL, 1139 UNIQUE(inbound_id, child_ordinal) 1140 ); 1141 1142 CREATE TABLE outbox_messages ( 1143 outbox_id TEXT PRIMARY KEY, 1144 chat_item_id TEXT NOT NULL REFERENCES chat_items(chat_item_id) ON DELETE CASCADE, 1145 connection_id TEXT NOT NULL REFERENCES connections(connection_id) ON DELETE CASCADE, 1146 conversation_id TEXT REFERENCES conversations(conversation_id) ON DELETE SET NULL, 1147 chat_msg_id TEXT NOT NULL, 1148 body TEXT NOT NULL, 1149 status TEXT NOT NULL, 1150 runtime_message_id INTEGER, 1151 retry_after_unix INTEGER, 1152 created_at_unix INTEGER NOT NULL 1153 ); 1154 1155 CREATE TABLE unsupported_protocol_events ( 1156 event_id TEXT PRIMARY KEY, 1157 connection_id TEXT REFERENCES connections(connection_id) ON DELETE SET NULL, 1158 event_kind TEXT NOT NULL, 1159 payload_json TEXT NOT NULL, 1160 status TEXT NOT NULL, 1161 received_at_unix INTEGER NOT NULL 1162 ); 1163 1164 CREATE INDEX chat_items_page_idx 1165 ON chat_items(conversation_id, logical_order DESC, chat_item_id DESC); 1166 CREATE UNIQUE INDEX chat_items_conversation_msg_id_idx 1167 ON chat_items(conversation_id, chat_msg_id) 1168 WHERE chat_msg_id IS NOT NULL; 1169 CREATE UNIQUE INDEX inbound_message_log_sequence_hash_idx 1170 ON inbound_message_log(connection_id, inbound_sequence, message_hash) 1171 WHERE inbound_sequence IS NOT NULL; 1172 CREATE INDEX inbound_message_log_pending_ack_idx 1173 ON inbound_message_log(connection_id, inbound_id) 1174 WHERE ack_status = 'pending_ack'; 1175 CREATE INDEX inbound_child_events_frame_idx 1176 ON inbound_child_events(inbound_id, child_ordinal); 1177 CREATE INDEX outbox_messages_pending_retryable_idx 1178 ON outbox_messages(connection_id, outbox_id) 1179 WHERE status IN ('pending', 'retryable') AND runtime_message_id IS NULL; 1180 CREATE UNIQUE INDEX outbox_messages_connection_msg_id_idx 1181 ON outbox_messages(connection_id, chat_msg_id); 1182 CREATE UNIQUE INDEX outbox_messages_chat_item_idx 1183 ON outbox_messages(chat_item_id); 1184 CREATE INDEX connections_state_idx ON connections(state); 1185 CREATE INDEX queue_endpoints_status_idx ON queue_endpoints(status); 1186 CREATE INDEX contacts_lifecycle_idx ON contacts(lifecycle); 1187 ", 1188 )?; 1189 Ok(()) 1190 } 1191 1192 fn apply_migration_v2(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> { 1193 transaction.execute_batch( 1194 " 1195 ALTER TABLE chat_items ADD COLUMN chat_msg_id TEXT; 1196 ALTER TABLE outbox_messages ADD COLUMN chat_item_id TEXT NOT NULL DEFAULT ''; 1197 ALTER TABLE outbox_messages ADD COLUMN chat_msg_id TEXT NOT NULL DEFAULT ''; 1198 UPDATE outbox_messages 1199 SET chat_item_id = outbox_id 1200 WHERE chat_item_id = ''; 1201 UPDATE outbox_messages 1202 SET chat_msg_id = outbox_id 1203 WHERE chat_msg_id = ''; 1204 CREATE UNIQUE INDEX chat_items_conversation_msg_id_idx 1205 ON chat_items(conversation_id, chat_msg_id) 1206 WHERE chat_msg_id IS NOT NULL; 1207 CREATE UNIQUE INDEX outbox_messages_connection_msg_id_idx 1208 ON outbox_messages(connection_id, chat_msg_id); 1209 CREATE UNIQUE INDEX outbox_messages_chat_item_idx 1210 ON outbox_messages(chat_item_id); 1211 ", 1212 )?; 1213 Ok(()) 1214 } 1215 1216 fn apply_migration_v3(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> { 1217 transaction.execute_batch( 1218 " 1219 ALTER TABLE inbound_message_log ADD COLUMN app_record_kind TEXT NOT NULL DEFAULT 'inbound_log'; 1220 ALTER TABLE inbound_message_log ADD COLUMN app_record_id TEXT NOT NULL DEFAULT ''; 1221 UPDATE inbound_message_log 1222 SET app_record_id = inbound_id 1223 WHERE app_record_id = ''; 1224 UPDATE inbound_message_log 1225 SET ack_status = 'pending_ack' 1226 WHERE ack_status = 'pending'; 1227 ", 1228 )?; 1229 Ok(()) 1230 } 1231 1232 fn apply_migration_v4(transaction: &Transaction<'_>) -> Result<(), RadrootsSimplexAppStoreError> { 1233 transaction.execute_batch( 1234 " 1235 ALTER TABLE inbound_message_log ADD COLUMN runtime_ack_handle TEXT NOT NULL DEFAULT ''; 1236 ALTER TABLE outbox_messages ADD COLUMN runtime_message_id INTEGER; 1237 CREATE TABLE inbound_child_events ( 1238 child_event_id TEXT PRIMARY KEY, 1239 inbound_id TEXT NOT NULL REFERENCES inbound_message_log(inbound_id) ON DELETE CASCADE, 1240 child_ordinal INTEGER NOT NULL, 1241 app_record_kind TEXT NOT NULL, 1242 app_record_id TEXT NOT NULL, 1243 event_kind TEXT NOT NULL, 1244 chat_msg_id TEXT, 1245 received_at_unix INTEGER NOT NULL, 1246 UNIQUE(inbound_id, child_ordinal) 1247 ); 1248 INSERT INTO inbound_child_events 1249 (child_event_id, inbound_id, child_ordinal, app_record_kind, app_record_id, event_kind, chat_msg_id, received_at_unix) 1250 SELECT 1251 'child_' || inbound_id, 1252 inbound_id, 1253 0, 1254 app_record_kind, 1255 app_record_id, 1256 app_record_kind, 1257 NULL, 1258 received_at_unix 1259 FROM inbound_message_log 1260 WHERE app_record_id <> ''; 1261 UPDATE inbound_message_log 1262 SET runtime_ack_handle = 'legacy:' || connection_id || ':' || COALESCE(CAST(inbound_sequence AS TEXT), inbound_id); 1263 CREATE INDEX inbound_child_events_frame_idx 1264 ON inbound_child_events(inbound_id, child_ordinal); 1265 DROP INDEX IF EXISTS outbox_messages_pending_retryable_idx; 1266 CREATE INDEX outbox_messages_pending_retryable_idx 1267 ON outbox_messages(connection_id, outbox_id) 1268 WHERE status IN ('pending', 'retryable') AND runtime_message_id IS NULL; 1269 ", 1270 )?; 1271 Ok(()) 1272 } 1273 1274 fn verify_metadata( 1275 connection: &Connection, 1276 expected_key_slot_digest: &str, 1277 ) -> Result<(), RadrootsSimplexAppStoreError> { 1278 let actual_key_slot_digest: String = connection.query_row( 1279 "SELECT key_slot_digest FROM encryption_metadata WHERE id = 1", 1280 [], 1281 |row| row.get(0), 1282 )?; 1283 if actual_key_slot_digest != expected_key_slot_digest { 1284 return Err(RadrootsSimplexAppStoreError::EncryptionKeyRejected); 1285 } 1286 Ok(()) 1287 } 1288 1289 fn diagnostics_for( 1290 connection: &Connection, 1291 cipher: String, 1292 key_source: String, 1293 key_slot_digest: String, 1294 ) -> Result<RadrootsSimplexAppDiagnostics, RadrootsSimplexAppStoreError> { 1295 let schema_version: i64 = 1296 connection.pragma_query_value(None, "user_version", |row| row.get(0))?; 1297 let migration_count: i64 = connection.query_row( 1298 "SELECT count(*) FROM simplex_schema_migrations", 1299 [], 1300 |row| row.get(0), 1301 )?; 1302 let foreign_keys: i64 = 1303 connection.pragma_query_value(None, "foreign_keys", |row| row.get(0))?; 1304 let journal_mode: String = 1305 connection.pragma_query_value(None, "journal_mode", |row| row.get(0))?; 1306 Ok(RadrootsSimplexAppDiagnostics { 1307 encrypted: true, 1308 cipher, 1309 schema_version: schema_version as u32, 1310 migration_count: migration_count as usize, 1311 foreign_keys_enabled: foreign_keys == 1, 1312 wal_enabled: journal_mode.eq_ignore_ascii_case("wal"), 1313 key_source, 1314 key_slot_digest, 1315 }) 1316 } 1317 1318 fn key_slot_digest(key_slot: &str) -> String { 1319 let mut hasher = Sha256::new(); 1320 hasher.update(key_slot.as_bytes()); 1321 hex::encode(hasher.finalize()) 1322 } 1323 1324 fn derived_key_slot(path: &Path) -> String { 1325 let mut hasher = Sha256::new(); 1326 hasher.update(path.as_os_str().as_encoded_bytes()); 1327 format!( 1328 "radroots_simplex_app_store_{}", 1329 hex::encode(hasher.finalize()) 1330 ) 1331 } 1332 1333 fn now_unix_secs() -> i64 { 1334 SystemTime::now() 1335 .duration_since(UNIX_EPOCH) 1336 .map(|duration| i64::try_from(duration.as_secs()).unwrap_or(i64::MAX)) 1337 .unwrap_or(0) 1338 } 1339 1340 fn collect_rows<T>( 1341 rows: rusqlite::MappedRows<'_, impl FnMut(&Row<'_>) -> rusqlite::Result<T>>, 1342 ) -> Result<Vec<T>, RadrootsSimplexAppStoreError> { 1343 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into) 1344 } 1345 1346 fn insert_inbound_log( 1347 transaction: &Transaction<'_>, 1348 inbound: &RadrootsSimplexAppInboundMessageLogEntry, 1349 ) -> Result<(), RadrootsSimplexAppStoreError> { 1350 transaction.execute( 1351 "INSERT INTO inbound_message_log 1352 (inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix) 1353 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", 1354 params![ 1355 inbound.inbound_id, 1356 inbound.connection_id, 1357 inbound.broker_message_id_hash, 1358 inbound.inbound_sequence, 1359 inbound.message_hash, 1360 inbound.runtime_ack_handle, 1361 inbound.ack_status, 1362 inbound.app_record_kind, 1363 inbound.app_record_id, 1364 inbound.received_at_unix 1365 ], 1366 )?; 1367 Ok(()) 1368 } 1369 1370 fn generate_chat_msg_id() -> Result<String, RadrootsSimplexAppStoreError> { 1371 let mut bytes = [0_u8; CHAT_MSG_ID_BYTES]; 1372 getrandom(&mut bytes).map_err(|_| { 1373 RadrootsSimplexAppStoreError::MessageLifecycle("entropy unavailable".into()) 1374 })?; 1375 Ok(URL_SAFE_NO_PAD.encode(bytes)) 1376 } 1377 1378 fn validate_chat_msg_id(value: &str) -> Result<(), RadrootsSimplexAppStoreError> { 1379 let decoded = URL_SAFE_NO_PAD.decode(value.as_bytes()).map_err(|_| { 1380 RadrootsSimplexAppStoreError::MessageLifecycle("chat msgId must be base64url".into()) 1381 })?; 1382 if decoded.len() != CHAT_MSG_ID_BYTES { 1383 return Err(RadrootsSimplexAppStoreError::MessageLifecycle(format!( 1384 "chat msgId must decode to {CHAT_MSG_ID_BYTES} bytes" 1385 ))); 1386 } 1387 Ok(()) 1388 } 1389 1390 fn validate_outbound_text_request( 1391 request: &RadrootsSimplexAppOutboundTextRequest, 1392 ) -> Result<(), RadrootsSimplexAppStoreError> { 1393 if request.connection_id.is_empty() { 1394 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 1395 "connection id must not be empty".into(), 1396 )); 1397 } 1398 if request.conversation_id.is_empty() { 1399 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 1400 "conversation id must not be empty".into(), 1401 )); 1402 } 1403 if request.body.trim().is_empty() { 1404 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 1405 "outbound text must not be empty".into(), 1406 )); 1407 } 1408 Ok(()) 1409 } 1410 1411 fn validate_inbound_text_request( 1412 request: &RadrootsSimplexAppInboundTextRequest, 1413 ) -> Result<(), RadrootsSimplexAppStoreError> { 1414 validate_inbound_identity( 1415 &request.connection_id, 1416 &request.broker_message_id_hash, 1417 &request.message_hash, 1418 &request.runtime_ack_handle, 1419 )?; 1420 if request.conversation_id.is_empty() { 1421 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 1422 "conversation id must not be empty".into(), 1423 )); 1424 } 1425 if request.body.trim().is_empty() { 1426 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 1427 "inbound text must not be empty".into(), 1428 )); 1429 } 1430 if let Some(chat_msg_id) = &request.chat_msg_id { 1431 validate_chat_msg_id(chat_msg_id)?; 1432 } 1433 Ok(()) 1434 } 1435 1436 fn validate_inbound_unsupported_request( 1437 request: &RadrootsSimplexAppInboundUnsupportedEventRequest, 1438 ) -> Result<(), RadrootsSimplexAppStoreError> { 1439 validate_inbound_identity( 1440 &request.connection_id, 1441 &request.broker_message_id_hash, 1442 &request.message_hash, 1443 &request.runtime_ack_handle, 1444 )?; 1445 if request.event_kind.is_empty() { 1446 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 1447 "unsupported event kind must not be empty".into(), 1448 )); 1449 } 1450 if request.payload_json.is_empty() { 1451 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 1452 "unsupported event payload must not be empty".into(), 1453 )); 1454 } 1455 Ok(()) 1456 } 1457 1458 fn validate_inbound_identity( 1459 connection_id: &str, 1460 broker_message_id_hash: &[u8], 1461 message_hash: &[u8], 1462 runtime_ack_handle: &str, 1463 ) -> Result<(), RadrootsSimplexAppStoreError> { 1464 if connection_id.is_empty() { 1465 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 1466 "connection id must not be empty".into(), 1467 )); 1468 } 1469 if broker_message_id_hash.is_empty() { 1470 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 1471 "broker message id hash must not be empty".into(), 1472 )); 1473 } 1474 if message_hash.is_empty() { 1475 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 1476 "message hash must not be empty".into(), 1477 )); 1478 } 1479 if runtime_ack_handle.is_empty() { 1480 return Err(RadrootsSimplexAppStoreError::MessageLifecycle( 1481 "runtime ack handle must not be empty".into(), 1482 )); 1483 } 1484 Ok(()) 1485 } 1486 1487 fn next_logical_order( 1488 transaction: &Transaction<'_>, 1489 conversation_id: &str, 1490 ) -> Result<i64, RadrootsSimplexAppStoreError> { 1491 let current: Option<i64> = transaction.query_row( 1492 "SELECT MAX(logical_order) FROM chat_items WHERE conversation_id = ?1", 1493 params![conversation_id], 1494 |row| row.get(0), 1495 )?; 1496 Ok(current.unwrap_or(0).saturating_add(1)) 1497 } 1498 1499 fn outbound_text_by_msg_id( 1500 transaction: &Transaction<'_>, 1501 connection_id: &str, 1502 chat_msg_id: &str, 1503 ) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> { 1504 transaction 1505 .query_row( 1506 "SELECT 1507 c.chat_item_id, 1508 c.conversation_id, 1509 c.logical_order, 1510 c.direction, 1511 c.chat_msg_id, 1512 c.body, 1513 c.delivery_status, 1514 c.created_at_unix, 1515 o.outbox_id, 1516 o.chat_item_id, 1517 o.connection_id, 1518 o.conversation_id, 1519 o.chat_msg_id, 1520 o.body, 1521 o.status, 1522 o.runtime_message_id, 1523 o.retry_after_unix, 1524 o.created_at_unix 1525 FROM outbox_messages o 1526 JOIN chat_items c ON c.chat_item_id = o.chat_item_id 1527 WHERE o.connection_id = ?1 AND o.chat_msg_id = ?2", 1528 params![connection_id, chat_msg_id], 1529 outbound_text_draft_from_row, 1530 ) 1531 .optional() 1532 .map_err(Into::into) 1533 } 1534 1535 fn outbound_text_by_outbox_id( 1536 transaction: &Transaction<'_>, 1537 outbox_id: &str, 1538 ) -> Result<Option<RadrootsSimplexAppOutboundTextDraft>, RadrootsSimplexAppStoreError> { 1539 transaction 1540 .query_row( 1541 "SELECT 1542 c.chat_item_id, 1543 c.conversation_id, 1544 c.logical_order, 1545 c.direction, 1546 c.chat_msg_id, 1547 c.body, 1548 c.delivery_status, 1549 c.created_at_unix, 1550 o.outbox_id, 1551 o.chat_item_id, 1552 o.connection_id, 1553 o.conversation_id, 1554 o.chat_msg_id, 1555 o.body, 1556 o.status, 1557 o.runtime_message_id, 1558 o.retry_after_unix, 1559 o.created_at_unix 1560 FROM outbox_messages o 1561 JOIN chat_items c ON c.chat_item_id = o.chat_item_id 1562 WHERE o.outbox_id = ?1", 1563 params![outbox_id], 1564 outbound_text_draft_from_row, 1565 ) 1566 .optional() 1567 .map_err(Into::into) 1568 } 1569 1570 fn inbound_frame_by_identity( 1571 transaction: &Transaction<'_>, 1572 connection_id: &str, 1573 broker_message_id_hash: &[u8], 1574 inbound_sequence: Option<i64>, 1575 message_hash: &[u8], 1576 ) -> Result<Option<RadrootsSimplexAppInboundMessageLogEntry>, RadrootsSimplexAppStoreError> { 1577 Ok(match inbound_sequence { 1578 Some(sequence) => transaction 1579 .query_row( 1580 "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix 1581 FROM inbound_message_log 1582 WHERE connection_id = ?1 1583 AND (broker_message_id_hash = ?2 OR (inbound_sequence = ?3 AND message_hash = ?4)) 1584 ORDER BY received_at_unix, inbound_id 1585 LIMIT 1", 1586 params![connection_id, broker_message_id_hash, sequence, message_hash], 1587 inbound_message_from_row, 1588 ) 1589 .optional()?, 1590 None => transaction 1591 .query_row( 1592 "SELECT inbound_id, connection_id, broker_message_id_hash, inbound_sequence, message_hash, runtime_ack_handle, ack_status, app_record_kind, app_record_id, received_at_unix 1593 FROM inbound_message_log 1594 WHERE connection_id = ?1 AND broker_message_id_hash = ?2 1595 ORDER BY received_at_unix, inbound_id 1596 LIMIT 1", 1597 params![connection_id, broker_message_id_hash], 1598 inbound_message_from_row, 1599 ) 1600 .optional()?, 1601 }) 1602 } 1603 1604 fn ensure_inbound_frame( 1605 transaction: &Transaction<'_>, 1606 connection_id: &str, 1607 broker_message_id_hash: &[u8], 1608 inbound_sequence: Option<i64>, 1609 message_hash: &[u8], 1610 runtime_ack_handle: &str, 1611 received_at_unix: i64, 1612 ) -> Result<RadrootsSimplexAppInboundMessageLogEntry, RadrootsSimplexAppStoreError> { 1613 if let Some(existing) = inbound_frame_by_identity( 1614 transaction, 1615 connection_id, 1616 broker_message_id_hash, 1617 inbound_sequence, 1618 message_hash, 1619 )? { 1620 return Ok(existing); 1621 } 1622 let inbound_id = derive_inbound_frame_local_id( 1623 "inbound", 1624 connection_id, 1625 broker_message_id_hash, 1626 message_hash, 1627 ); 1628 let inbound = RadrootsSimplexAppInboundMessageLogEntry { 1629 inbound_id: inbound_id.clone(), 1630 connection_id: connection_id.to_owned(), 1631 broker_message_id_hash: broker_message_id_hash.to_vec(), 1632 inbound_sequence, 1633 message_hash: message_hash.to_vec(), 1634 runtime_ack_handle: runtime_ack_handle.to_owned(), 1635 ack_status: "pending_ack".to_owned(), 1636 app_record_kind: "frame".to_owned(), 1637 app_record_id: inbound_id, 1638 received_at_unix, 1639 }; 1640 insert_inbound_log(transaction, &inbound)?; 1641 Ok(inbound) 1642 } 1643 1644 fn inbound_child_commit_by_ordinal( 1645 transaction: &Transaction<'_>, 1646 inbound: &RadrootsSimplexAppInboundMessageLogEntry, 1647 child_ordinal: u32, 1648 ) -> Result<Option<RadrootsSimplexAppInboundCommit>, RadrootsSimplexAppStoreError> { 1649 let child_event = transaction 1650 .query_row( 1651 "SELECT child_event_id, inbound_id, child_ordinal, app_record_kind, app_record_id, event_kind, chat_msg_id, received_at_unix 1652 FROM inbound_child_events 1653 WHERE inbound_id = ?1 AND child_ordinal = ?2 1654 LIMIT 1", 1655 params![inbound.inbound_id, i64::from(child_ordinal)], 1656 inbound_child_event_from_row, 1657 ) 1658 .optional()?; 1659 let Some(child_event) = child_event else { 1660 return Ok(None); 1661 }; 1662 let chat_item = if child_event.app_record_kind == "chat_item" { 1663 chat_item_by_id(transaction, &child_event.app_record_id)? 1664 } else { 1665 None 1666 }; 1667 let unsupported_event = if child_event.app_record_kind == "unsupported_event" { 1668 unsupported_event_by_id(transaction, &child_event.app_record_id)? 1669 } else { 1670 None 1671 }; 1672 Ok(Some(RadrootsSimplexAppInboundCommit { 1673 inbound: inbound.clone(), 1674 child_event, 1675 chat_item, 1676 unsupported_event, 1677 duplicate: true, 1678 })) 1679 } 1680 1681 fn insert_inbound_child_event( 1682 transaction: &Transaction<'_>, 1683 child_event: &RadrootsSimplexAppInboundChildEvent, 1684 ) -> Result<(), RadrootsSimplexAppStoreError> { 1685 transaction.execute( 1686 "INSERT INTO inbound_child_events 1687 (child_event_id, inbound_id, child_ordinal, app_record_kind, app_record_id, event_kind, chat_msg_id, received_at_unix) 1688 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", 1689 params![ 1690 child_event.child_event_id, 1691 child_event.inbound_id, 1692 i64::from(child_event.child_ordinal), 1693 child_event.app_record_kind, 1694 child_event.app_record_id, 1695 child_event.event_kind, 1696 child_event.chat_msg_id, 1697 child_event.received_at_unix 1698 ], 1699 )?; 1700 Ok(()) 1701 } 1702 1703 fn chat_item_by_id( 1704 transaction: &Transaction<'_>, 1705 chat_item_id: &str, 1706 ) -> Result<Option<RadrootsSimplexAppChatItem>, RadrootsSimplexAppStoreError> { 1707 transaction 1708 .query_row( 1709 "SELECT chat_item_id, conversation_id, logical_order, direction, chat_msg_id, body, delivery_status, created_at_unix 1710 FROM chat_items 1711 WHERE chat_item_id = ?1", 1712 params![chat_item_id], 1713 chat_item_from_row, 1714 ) 1715 .optional() 1716 .map_err(Into::into) 1717 } 1718 1719 fn unsupported_event_by_id( 1720 transaction: &Transaction<'_>, 1721 event_id: &str, 1722 ) -> Result<Option<RadrootsSimplexAppUnsupportedProtocolEvent>, RadrootsSimplexAppStoreError> { 1723 transaction 1724 .query_row( 1725 "SELECT event_id, connection_id, event_kind, payload_json, status, received_at_unix 1726 FROM unsupported_protocol_events 1727 WHERE event_id = ?1", 1728 params![event_id], 1729 unsupported_event_from_row, 1730 ) 1731 .optional() 1732 .map_err(Into::into) 1733 } 1734 1735 fn derive_outbound_local_id(prefix: &str, connection_id: &str, chat_msg_id: &str) -> String { 1736 let mut hasher = Sha256::new(); 1737 hasher.update(prefix.as_bytes()); 1738 hasher.update([0]); 1739 hasher.update(connection_id.as_bytes()); 1740 hasher.update([0]); 1741 hasher.update(chat_msg_id.as_bytes()); 1742 let digest = hasher.finalize(); 1743 format!("{prefix}_{}", hex::encode(&digest[..16])) 1744 } 1745 1746 fn derive_inbound_frame_local_id( 1747 prefix: &str, 1748 connection_id: &str, 1749 broker_message_id_hash: &[u8], 1750 message_hash: &[u8], 1751 ) -> String { 1752 let mut hasher = Sha256::new(); 1753 hasher.update(prefix.as_bytes()); 1754 hasher.update([0]); 1755 hasher.update(connection_id.as_bytes()); 1756 hasher.update([0]); 1757 hasher.update(broker_message_id_hash); 1758 hasher.update([0]); 1759 hasher.update(message_hash); 1760 let digest = hasher.finalize(); 1761 format!("{prefix}_{}", hex::encode(&digest[..16])) 1762 } 1763 1764 fn derive_inbound_child_local_id( 1765 prefix: &str, 1766 inbound_id: &str, 1767 child_ordinal: u32, 1768 key: &str, 1769 ) -> String { 1770 let mut hasher = Sha256::new(); 1771 hasher.update(prefix.as_bytes()); 1772 hasher.update([0]); 1773 hasher.update(inbound_id.as_bytes()); 1774 hasher.update([0]); 1775 hasher.update(child_ordinal.to_be_bytes()); 1776 hasher.update([0]); 1777 hasher.update(key.as_bytes()); 1778 let digest = hasher.finalize(); 1779 format!("{prefix}_{}", hex::encode(&digest[..16])) 1780 } 1781 1782 fn profile_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexAppProfile> { 1783 Ok(RadrootsSimplexAppProfile { 1784 profile_id: row.get(0)?, 1785 display_name: row.get(1)?, 1786 created_at_unix: row.get(2)?, 1787 }) 1788 } 1789 1790 fn contact_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexAppContact> { 1791 Ok(RadrootsSimplexAppContact { 1792 contact_id: row.get(0)?, 1793 profile_id: row.get(1)?, 1794 display_name: row.get(2)?, 1795 lifecycle: row.get(3)?, 1796 created_at_unix: row.get(4)?, 1797 }) 1798 } 1799 1800 fn connection_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexAppConnection> { 1801 Ok(RadrootsSimplexAppConnection { 1802 connection_id: row.get(0)?, 1803 profile_id: row.get(1)?, 1804 contact_id: row.get(2)?, 1805 state: row.get(3)?, 1806 agent_connection_id: row.get(4)?, 1807 created_at_unix: row.get(5)?, 1808 }) 1809 } 1810 1811 fn queue_endpoint_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexAppQueueEndpoint> { 1812 Ok(RadrootsSimplexAppQueueEndpoint { 1813 queue_endpoint_id: row.get(0)?, 1814 connection_id: row.get(1)?, 1815 role: row.get(2)?, 1816 server: row.get(3)?, 1817 sender_id: row.get(4)?, 1818 status: row.get(5)?, 1819 created_at_unix: row.get(6)?, 1820 }) 1821 } 1822 1823 fn chat_item_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexAppChatItem> { 1824 let direction: String = row.get(3)?; 1825 Ok(RadrootsSimplexAppChatItem { 1826 chat_item_id: row.get(0)?, 1827 conversation_id: row.get(1)?, 1828 logical_order: row.get(2)?, 1829 direction: RadrootsSimplexAppChatDirection::parse(&direction) 1830 .map_err(|error| rusqlite::Error::ToSqlConversionFailure(error.into()))?, 1831 chat_msg_id: row.get(4)?, 1832 body: row.get(5)?, 1833 delivery_status: row.get(6)?, 1834 created_at_unix: row.get(7)?, 1835 }) 1836 } 1837 1838 fn outbound_text_draft_from_row( 1839 row: &Row<'_>, 1840 ) -> rusqlite::Result<RadrootsSimplexAppOutboundTextDraft> { 1841 let direction: String = row.get(3)?; 1842 Ok(RadrootsSimplexAppOutboundTextDraft { 1843 chat_item: RadrootsSimplexAppChatItem { 1844 chat_item_id: row.get(0)?, 1845 conversation_id: row.get(1)?, 1846 logical_order: row.get(2)?, 1847 direction: RadrootsSimplexAppChatDirection::parse(&direction) 1848 .map_err(|error| rusqlite::Error::ToSqlConversionFailure(error.into()))?, 1849 chat_msg_id: row.get(4)?, 1850 body: row.get(5)?, 1851 delivery_status: row.get(6)?, 1852 created_at_unix: row.get(7)?, 1853 }, 1854 outbox_message: RadrootsSimplexAppOutboxMessage { 1855 outbox_id: row.get(8)?, 1856 chat_item_id: row.get(9)?, 1857 connection_id: row.get(10)?, 1858 conversation_id: row.get(11)?, 1859 chat_msg_id: row.get(12)?, 1860 body: row.get(13)?, 1861 status: row.get(14)?, 1862 runtime_message_id: row.get(15)?, 1863 retry_after_unix: row.get(16)?, 1864 created_at_unix: row.get(17)?, 1865 }, 1866 }) 1867 } 1868 1869 fn inbound_message_from_row( 1870 row: &Row<'_>, 1871 ) -> rusqlite::Result<RadrootsSimplexAppInboundMessageLogEntry> { 1872 Ok(RadrootsSimplexAppInboundMessageLogEntry { 1873 inbound_id: row.get(0)?, 1874 connection_id: row.get(1)?, 1875 broker_message_id_hash: row.get(2)?, 1876 inbound_sequence: row.get(3)?, 1877 message_hash: row.get(4)?, 1878 runtime_ack_handle: row.get(5)?, 1879 ack_status: row.get(6)?, 1880 app_record_kind: row.get(7)?, 1881 app_record_id: row.get(8)?, 1882 received_at_unix: row.get(9)?, 1883 }) 1884 } 1885 1886 fn inbound_child_event_from_row( 1887 row: &Row<'_>, 1888 ) -> rusqlite::Result<RadrootsSimplexAppInboundChildEvent> { 1889 let child_ordinal: i64 = row.get(2)?; 1890 let child_ordinal = u32::try_from(child_ordinal).map_err(|error| { 1891 rusqlite::Error::FromSqlConversionFailure( 1892 2, 1893 rusqlite::types::Type::Integer, 1894 Box::new(error), 1895 ) 1896 })?; 1897 Ok(RadrootsSimplexAppInboundChildEvent { 1898 child_event_id: row.get(0)?, 1899 inbound_id: row.get(1)?, 1900 child_ordinal, 1901 app_record_kind: row.get(3)?, 1902 app_record_id: row.get(4)?, 1903 event_kind: row.get(5)?, 1904 chat_msg_id: row.get(6)?, 1905 received_at_unix: row.get(7)?, 1906 }) 1907 } 1908 1909 fn outbox_message_from_row(row: &Row<'_>) -> rusqlite::Result<RadrootsSimplexAppOutboxMessage> { 1910 Ok(RadrootsSimplexAppOutboxMessage { 1911 outbox_id: row.get(0)?, 1912 chat_item_id: row.get(1)?, 1913 connection_id: row.get(2)?, 1914 conversation_id: row.get(3)?, 1915 chat_msg_id: row.get(4)?, 1916 body: row.get(5)?, 1917 status: row.get(6)?, 1918 runtime_message_id: row.get(7)?, 1919 retry_after_unix: row.get(8)?, 1920 created_at_unix: row.get(9)?, 1921 }) 1922 } 1923 1924 fn unsupported_event_from_row( 1925 row: &Row<'_>, 1926 ) -> rusqlite::Result<RadrootsSimplexAppUnsupportedProtocolEvent> { 1927 Ok(RadrootsSimplexAppUnsupportedProtocolEvent { 1928 event_id: row.get(0)?, 1929 connection_id: row.get(1)?, 1930 event_kind: row.get(2)?, 1931 payload_json: row.get(3)?, 1932 status: row.get(4)?, 1933 received_at_unix: row.get(5)?, 1934 }) 1935 } 1936 1937 #[cfg(test)] 1938 mod tests { 1939 use super::*; 1940 use radroots_secret_vault::{RadrootsSecretVault, RadrootsSecretVaultMemory}; 1941 use std::sync::Arc; 1942 1943 fn memory_store( 1944 path: &Path, 1945 vault: Arc<RadrootsSecretVaultMemory>, 1946 ) -> Result<RadrootsSimplexAppStore, RadrootsSimplexAppStoreError> { 1947 RadrootsSimplexAppStore::open_with_vault(path, vault, "test-simplex-app-store", "memory") 1948 } 1949 1950 fn profile() -> RadrootsSimplexAppProfile { 1951 RadrootsSimplexAppProfile { 1952 profile_id: "profile-1".into(), 1953 display_name: "Local Profile".into(), 1954 created_at_unix: 1, 1955 } 1956 } 1957 1958 fn contact() -> RadrootsSimplexAppContact { 1959 RadrootsSimplexAppContact { 1960 contact_id: "contact-1".into(), 1961 profile_id: "profile-1".into(), 1962 display_name: "Phone Contact".into(), 1963 lifecycle: "active".into(), 1964 created_at_unix: 2, 1965 } 1966 } 1967 1968 fn connection() -> RadrootsSimplexAppConnection { 1969 RadrootsSimplexAppConnection { 1970 connection_id: "connection-1".into(), 1971 profile_id: "profile-1".into(), 1972 contact_id: Some("contact-1".into()), 1973 state: "connected".into(), 1974 agent_connection_id: Some("agent-connection-1".into()), 1975 created_at_unix: 3, 1976 } 1977 } 1978 1979 fn queue() -> RadrootsSimplexAppQueueEndpoint { 1980 RadrootsSimplexAppQueueEndpoint { 1981 queue_endpoint_id: "queue-1".into(), 1982 connection_id: "connection-1".into(), 1983 role: "receive".into(), 1984 server: "smp.example".into(), 1985 sender_id: b"sender-id".to_vec(), 1986 status: "active".into(), 1987 created_at_unix: 4, 1988 } 1989 } 1990 1991 fn conversation() -> RadrootsSimplexAppConversation { 1992 RadrootsSimplexAppConversation { 1993 conversation_id: "conversation-1".into(), 1994 profile_id: "profile-1".into(), 1995 contact_id: Some("contact-1".into()), 1996 created_at_unix: 5, 1997 } 1998 } 1999 2000 fn seed_store(store: &RadrootsSimplexAppStore) { 2001 store.upsert_profile(&profile()).expect("profile"); 2002 store.upsert_contact(&contact()).expect("contact"); 2003 store.upsert_connection(&connection()).expect("connection"); 2004 store.upsert_queue_endpoint(&queue()).expect("queue"); 2005 store 2006 .upsert_conversation(&conversation()) 2007 .expect("conversation"); 2008 } 2009 2010 fn outbound_request() -> RadrootsSimplexAppOutboundTextRequest { 2011 RadrootsSimplexAppOutboundTextRequest { 2012 connection_id: "connection-1".into(), 2013 conversation_id: "conversation-1".into(), 2014 body: "hello encrypted iPhone".into(), 2015 created_at_unix: 11, 2016 } 2017 } 2018 2019 fn inbound_text_request() -> RadrootsSimplexAppInboundTextRequest { 2020 RadrootsSimplexAppInboundTextRequest { 2021 connection_id: "connection-1".into(), 2022 conversation_id: "conversation-1".into(), 2023 broker_message_id_hash: b"broker-message-hash-1".to_vec(), 2024 inbound_sequence: Some(21), 2025 message_hash: b"agent-message-hash-1".to_vec(), 2026 runtime_ack_handle: "ack-handle-1".into(), 2027 child_ordinal: 0, 2028 chat_msg_id: Some("AQIDBAUGBwgJCgsM".into()), 2029 body: "hello from the iPhone".into(), 2030 received_at_unix: 12, 2031 } 2032 } 2033 2034 fn inbound_unsupported_request() -> RadrootsSimplexAppInboundUnsupportedEventRequest { 2035 RadrootsSimplexAppInboundUnsupportedEventRequest { 2036 connection_id: "connection-1".into(), 2037 broker_message_id_hash: b"broker-message-hash-2".to_vec(), 2038 inbound_sequence: Some(22), 2039 message_hash: b"agent-message-hash-2".to_vec(), 2040 runtime_ack_handle: "ack-handle-2".into(), 2041 child_ordinal: 0, 2042 event_kind: "x.future.dm".into(), 2043 payload_json: "{\"event\":\"x.future.dm\"}".into(), 2044 received_at_unix: 13, 2045 } 2046 } 2047 2048 #[test] 2049 fn empty_store_initializes_encrypted_schema() { 2050 let temp = tempfile::tempdir().expect("temp"); 2051 let path = temp.path().join("simplex.sqlite"); 2052 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2053 let store = memory_store(&path, vault).expect("store"); 2054 2055 let diagnostics = store.diagnostics(); 2056 assert!(diagnostics.encrypted); 2057 assert!(!diagnostics.cipher.is_empty()); 2058 assert_eq!(diagnostics.schema_version, 4); 2059 assert_eq!(diagnostics.migration_count, 4); 2060 assert!(diagnostics.foreign_keys_enabled); 2061 assert!(diagnostics.wal_enabled); 2062 assert_eq!(diagnostics.key_source, "memory"); 2063 assert_eq!(diagnostics.key_slot_digest.len(), 64); 2064 } 2065 2066 #[test] 2067 fn typed_repositories_round_trip_and_indexes_support_queries() { 2068 let temp = tempfile::tempdir().expect("temp"); 2069 let path = temp.path().join("simplex.sqlite"); 2070 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2071 let store = memory_store(&path, vault).expect("store"); 2072 seed_store(&store); 2073 2074 assert_eq!( 2075 store.get_profile("profile-1").expect("profile"), 2076 Some(profile()) 2077 ); 2078 assert_eq!(store.list_profiles().expect("profiles"), vec![profile()]); 2079 assert_eq!(store.list_contacts().expect("contacts"), vec![contact()]); 2080 assert_eq!( 2081 store 2082 .list_connections_by_state("connected") 2083 .expect("connections"), 2084 vec![connection()] 2085 ); 2086 assert_eq!( 2087 store.list_queues_by_status("active").expect("queues"), 2088 vec![queue()] 2089 ); 2090 2091 store 2092 .append_chat_item(&RadrootsSimplexAppChatItem { 2093 chat_item_id: "chat-1".into(), 2094 conversation_id: "conversation-1".into(), 2095 logical_order: 1, 2096 direction: RadrootsSimplexAppChatDirection::Outbound, 2097 chat_msg_id: Some("AQIDBAUGBwgJCgsM".into()), 2098 body: "hello encrypted iPhone".into(), 2099 delivery_status: "sent".into(), 2100 created_at_unix: 6, 2101 }) 2102 .expect("chat 1"); 2103 store 2104 .append_chat_item(&RadrootsSimplexAppChatItem { 2105 chat_item_id: "chat-2".into(), 2106 conversation_id: "conversation-1".into(), 2107 logical_order: 2, 2108 direction: RadrootsSimplexAppChatDirection::Inbound, 2109 chat_msg_id: None, 2110 body: "hello encrypted runtime".into(), 2111 delivery_status: "received".into(), 2112 created_at_unix: 7, 2113 }) 2114 .expect("chat 2"); 2115 2116 let page = store.chat_page("conversation-1", 10).expect("page"); 2117 assert_eq!(page[0].chat_item_id, "chat-2"); 2118 assert_eq!(page[1].chat_item_id, "chat-1"); 2119 2120 store 2121 .record_inbound_message(&RadrootsSimplexAppInboundMessageLogEntry { 2122 inbound_id: "inbound-1".into(), 2123 connection_id: "connection-1".into(), 2124 broker_message_id_hash: b"broker-hash".to_vec(), 2125 inbound_sequence: Some(1), 2126 message_hash: b"message-hash".to_vec(), 2127 runtime_ack_handle: "ack-handle-manual".into(), 2128 ack_status: "pending_ack".into(), 2129 app_record_kind: "chat_item".into(), 2130 app_record_id: "chat-2".into(), 2131 received_at_unix: 8, 2132 }) 2133 .expect("inbound"); 2134 assert_eq!( 2135 store.pending_ack_messages().expect("pending ack")[0].inbound_id, 2136 "inbound-1" 2137 ); 2138 2139 store 2140 .enqueue_outbox_message(&RadrootsSimplexAppOutboxMessage { 2141 outbox_id: "outbox-1".into(), 2142 chat_item_id: "chat-1".into(), 2143 connection_id: "connection-1".into(), 2144 conversation_id: Some("conversation-1".into()), 2145 chat_msg_id: "AQIDBAUGBwgJCgsM".into(), 2146 body: "queued plaintext before encryption".into(), 2147 status: "retryable".into(), 2148 runtime_message_id: None, 2149 retry_after_unix: Some(9), 2150 created_at_unix: 9, 2151 }) 2152 .expect("outbox"); 2153 assert_eq!( 2154 store.pending_outbox_messages().expect("outbox")[0].outbox_id, 2155 "outbox-1" 2156 ); 2157 2158 store 2159 .record_unsupported_protocol_event(&RadrootsSimplexAppUnsupportedProtocolEvent { 2160 event_id: "event-1".into(), 2161 connection_id: Some("connection-1".into()), 2162 event_kind: "future_event".into(), 2163 payload_json: "{\"field\":\"value\"}".into(), 2164 status: "stored".into(), 2165 received_at_unix: 10, 2166 }) 2167 .expect("unsupported"); 2168 assert_eq!( 2169 store 2170 .list_unsupported_protocol_events() 2171 .expect("unsupported")[0] 2172 .event_id, 2173 "event-1" 2174 ); 2175 } 2176 2177 #[test] 2178 fn database_bytes_do_not_expose_message_or_profile_text() { 2179 let temp = tempfile::tempdir().expect("temp"); 2180 let path = temp.path().join("simplex.sqlite"); 2181 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2182 let store = memory_store(&path, vault).expect("store"); 2183 seed_store(&store); 2184 store 2185 .append_chat_item(&RadrootsSimplexAppChatItem { 2186 chat_item_id: "chat-1".into(), 2187 conversation_id: "conversation-1".into(), 2188 logical_order: 1, 2189 direction: RadrootsSimplexAppChatDirection::Outbound, 2190 chat_msg_id: Some("AQIDBAUGBwgJCgsM".into()), 2191 body: "plaintext should not appear in sqlite bytes".into(), 2192 delivery_status: "sent".into(), 2193 created_at_unix: 6, 2194 }) 2195 .expect("chat"); 2196 drop(store); 2197 2198 let raw = fs::read(&path).expect("read database"); 2199 let raw_text = String::from_utf8_lossy(&raw); 2200 assert!(!raw_text.contains("Local Profile")); 2201 assert!(!raw_text.contains("plaintext should not appear")); 2202 } 2203 2204 #[test] 2205 fn existing_store_reopens_with_same_vault_key() { 2206 let temp = tempfile::tempdir().expect("temp"); 2207 let path = temp.path().join("simplex.sqlite"); 2208 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2209 let store = memory_store(&path, vault.clone()).expect("store"); 2210 seed_store(&store); 2211 drop(store); 2212 2213 let reopened = memory_store(&path, vault).expect("reopen"); 2214 assert_eq!( 2215 reopened.get_profile("profile-1").expect("profile"), 2216 Some(profile()) 2217 ); 2218 } 2219 2220 #[test] 2221 fn missing_key_for_existing_store_fails_closed() { 2222 let temp = tempfile::tempdir().expect("temp"); 2223 let path = temp.path().join("simplex.sqlite"); 2224 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2225 let store = memory_store(&path, vault).expect("store"); 2226 seed_store(&store); 2227 drop(store); 2228 2229 let missing_vault = Arc::new(RadrootsSecretVaultMemory::new()); 2230 let error = memory_store(&path, missing_vault) 2231 .err() 2232 .expect("missing key error"); 2233 assert_eq!(error, RadrootsSimplexAppStoreError::MissingDatabaseKey); 2234 } 2235 2236 #[test] 2237 fn corrupt_key_fails_before_database_open() { 2238 let temp = tempfile::tempdir().expect("temp"); 2239 let path = temp.path().join("simplex.sqlite"); 2240 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2241 vault 2242 .store_secret("test-simplex-app-store", "not-a-valid-key") 2243 .expect("secret"); 2244 2245 let error = memory_store(&path, vault).err().expect("invalid key error"); 2246 assert!(matches!( 2247 error, 2248 RadrootsSimplexAppStoreError::InvalidDatabaseKey(_) 2249 )); 2250 } 2251 2252 #[test] 2253 fn wrong_key_for_existing_store_fails_closed() { 2254 let temp = tempfile::tempdir().expect("temp"); 2255 let path = temp.path().join("simplex.sqlite"); 2256 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2257 let store = memory_store(&path, vault).expect("store"); 2258 seed_store(&store); 2259 drop(store); 2260 2261 let wrong_vault = Arc::new(RadrootsSecretVaultMemory::new()); 2262 wrong_vault 2263 .store_secret( 2264 "test-simplex-app-store", 2265 "0000000000000000000000000000000000000000000000000000000000000000", 2266 ) 2267 .expect("wrong"); 2268 let error = memory_store(&path, wrong_vault) 2269 .err() 2270 .expect("wrong key error"); 2271 assert_eq!(error, RadrootsSimplexAppStoreError::EncryptionKeyRejected); 2272 } 2273 2274 #[test] 2275 fn foreign_keys_and_unique_dedupe_fail_closed() { 2276 let temp = tempfile::tempdir().expect("temp"); 2277 let path = temp.path().join("simplex.sqlite"); 2278 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2279 let store = memory_store(&path, vault).expect("store"); 2280 let invalid_contact = RadrootsSimplexAppContact { 2281 profile_id: "missing-profile".into(), 2282 ..contact() 2283 }; 2284 assert!(store.upsert_contact(&invalid_contact).is_err()); 2285 2286 seed_store(&store); 2287 let inbound = RadrootsSimplexAppInboundMessageLogEntry { 2288 inbound_id: "inbound-1".into(), 2289 connection_id: "connection-1".into(), 2290 broker_message_id_hash: b"dedupe".to_vec(), 2291 inbound_sequence: Some(1), 2292 message_hash: b"hash".to_vec(), 2293 runtime_ack_handle: "ack-handle-dedupe".into(), 2294 ack_status: "pending_ack".into(), 2295 app_record_kind: "chat_item".into(), 2296 app_record_id: "chat-1".into(), 2297 received_at_unix: 8, 2298 }; 2299 store.record_inbound_message(&inbound).expect("inbound"); 2300 let duplicate = RadrootsSimplexAppInboundMessageLogEntry { 2301 inbound_id: "inbound-2".into(), 2302 ..inbound 2303 }; 2304 assert!(store.record_inbound_message(&duplicate).is_err()); 2305 } 2306 2307 #[test] 2308 fn inbound_text_commit_persists_chat_item_and_pending_ack() { 2309 let temp = tempfile::tempdir().expect("temp"); 2310 let path = temp.path().join("simplex.sqlite"); 2311 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2312 let store = memory_store(&path, vault).expect("store"); 2313 seed_store(&store); 2314 2315 let commit = store 2316 .commit_inbound_text(&inbound_text_request()) 2317 .expect("commit"); 2318 2319 assert!(!commit.duplicate); 2320 assert_eq!(commit.inbound.ack_status, "pending_ack"); 2321 assert_eq!(commit.inbound.app_record_kind, "frame"); 2322 assert_eq!(commit.inbound.runtime_ack_handle, "ack-handle-1"); 2323 let chat_item = commit.chat_item.expect("chat item"); 2324 assert_eq!(commit.child_event.app_record_kind, "chat_item"); 2325 assert_eq!(commit.child_event.app_record_id, chat_item.chat_item_id); 2326 assert_eq!( 2327 chat_item.direction, 2328 RadrootsSimplexAppChatDirection::Inbound 2329 ); 2330 assert_eq!(chat_item.chat_msg_id.as_deref(), Some("AQIDBAUGBwgJCgsM")); 2331 assert_eq!(chat_item.body, "hello from the iPhone"); 2332 assert_eq!( 2333 store.chat_page("conversation-1", 10).expect("page"), 2334 vec![chat_item] 2335 ); 2336 assert_eq!( 2337 store.pending_ack_messages().expect("pending ack"), 2338 vec![commit.inbound] 2339 ); 2340 } 2341 2342 #[test] 2343 fn inbound_text_duplicate_redelivery_returns_prior_commit() { 2344 let temp = tempfile::tempdir().expect("temp"); 2345 let path = temp.path().join("simplex.sqlite"); 2346 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2347 let store = memory_store(&path, vault).expect("store"); 2348 seed_store(&store); 2349 2350 let first = store 2351 .commit_inbound_text(&inbound_text_request()) 2352 .expect("first"); 2353 let second = store 2354 .commit_inbound_text(&inbound_text_request()) 2355 .expect("second"); 2356 2357 assert!(second.duplicate); 2358 assert_eq!(second.inbound, first.inbound); 2359 assert_eq!(second.chat_item, first.chat_item); 2360 assert_eq!( 2361 store.chat_page("conversation-1", 10).expect("page").len(), 2362 1 2363 ); 2364 assert_eq!(store.pending_ack_messages().expect("pending").len(), 1); 2365 } 2366 2367 #[test] 2368 fn inbound_frame_persists_multiple_child_events_with_one_pending_ack() { 2369 let temp = tempfile::tempdir().expect("temp"); 2370 let path = temp.path().join("simplex.sqlite"); 2371 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2372 let store = memory_store(&path, vault).expect("store"); 2373 seed_store(&store); 2374 2375 let first = store 2376 .commit_inbound_text(&inbound_text_request()) 2377 .expect("first"); 2378 let second = store 2379 .commit_inbound_text(&RadrootsSimplexAppInboundTextRequest { 2380 child_ordinal: 1, 2381 chat_msg_id: Some("AgIDBAUGBwgJCgsM".into()), 2382 body: "second child event".into(), 2383 ..inbound_text_request() 2384 }) 2385 .expect("second"); 2386 2387 assert_eq!(first.inbound.inbound_id, second.inbound.inbound_id); 2388 assert_eq!(first.child_event.child_ordinal, 0); 2389 assert_eq!(second.child_event.child_ordinal, 1); 2390 assert_eq!(store.pending_ack_messages().expect("pending").len(), 1); 2391 assert_eq!( 2392 store.chat_page("conversation-1", 10).expect("page").len(), 2393 2 2394 ); 2395 } 2396 2397 #[test] 2398 fn inbound_unsupported_event_commit_persists_safe_record_and_pending_ack() { 2399 let temp = tempfile::tempdir().expect("temp"); 2400 let path = temp.path().join("simplex.sqlite"); 2401 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2402 let store = memory_store(&path, vault).expect("store"); 2403 seed_store(&store); 2404 2405 let commit = store 2406 .commit_inbound_unsupported_event(&inbound_unsupported_request()) 2407 .expect("commit"); 2408 2409 assert!(!commit.duplicate); 2410 assert_eq!(commit.inbound.ack_status, "pending_ack"); 2411 assert_eq!(commit.inbound.app_record_kind, "frame"); 2412 let unsupported = commit.unsupported_event.expect("unsupported event"); 2413 assert_eq!(commit.child_event.app_record_kind, "unsupported_event"); 2414 assert_eq!(commit.child_event.app_record_id, unsupported.event_id); 2415 assert_eq!(unsupported.event_kind, "x.future.dm"); 2416 assert_eq!(unsupported.status, "stored"); 2417 assert_eq!( 2418 store 2419 .list_unsupported_protocol_events() 2420 .expect("unsupported"), 2421 vec![unsupported] 2422 ); 2423 assert_eq!(store.pending_ack_messages().expect("pending").len(), 1); 2424 } 2425 2426 #[test] 2427 fn inbound_ack_delivery_marks_pending_row_acked() { 2428 let temp = tempfile::tempdir().expect("temp"); 2429 let path = temp.path().join("simplex.sqlite"); 2430 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2431 let store = memory_store(&path, vault).expect("store"); 2432 seed_store(&store); 2433 2434 let commit = store 2435 .commit_inbound_text(&inbound_text_request()) 2436 .expect("commit"); 2437 let acked = store 2438 .mark_inbound_ack_delivered("connection-1", 21, b"agent-message-hash-1") 2439 .expect("ack") 2440 .expect("row"); 2441 2442 assert_eq!(acked.inbound_id, commit.inbound.inbound_id); 2443 assert_eq!(acked.ack_status, "acked"); 2444 assert!(store.pending_ack_messages().expect("pending").is_empty()); 2445 assert_eq!( 2446 store 2447 .mark_inbound_ack_delivered("connection-1", 21, b"agent-message-hash-1") 2448 .expect("idempotent") 2449 .expect("row") 2450 .ack_status, 2451 "acked" 2452 ); 2453 assert!( 2454 store 2455 .mark_inbound_ack_delivered("connection-1", 21, b"wrong-hash") 2456 .expect("missing") 2457 .is_none() 2458 ); 2459 } 2460 2461 #[test] 2462 fn invalid_inbound_text_does_not_create_chat_or_pending_ack() { 2463 let temp = tempfile::tempdir().expect("temp"); 2464 let path = temp.path().join("simplex.sqlite"); 2465 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2466 let store = memory_store(&path, vault).expect("store"); 2467 seed_store(&store); 2468 let invalid = RadrootsSimplexAppInboundTextRequest { 2469 body: " ".into(), 2470 ..inbound_text_request() 2471 }; 2472 2473 assert!(store.commit_inbound_text(&invalid).is_err()); 2474 assert!( 2475 store 2476 .chat_page("conversation-1", 10) 2477 .expect("page") 2478 .is_empty() 2479 ); 2480 assert!(store.pending_ack_messages().expect("pending").is_empty()); 2481 } 2482 2483 #[test] 2484 fn outbound_text_lifecycle_persists_chat_item_outbox_and_msg_id() { 2485 let temp = tempfile::tempdir().expect("temp"); 2486 let path = temp.path().join("simplex.sqlite"); 2487 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2488 let store = memory_store(&path, vault).expect("store"); 2489 seed_store(&store); 2490 2491 let draft = store 2492 .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") 2493 .expect("draft"); 2494 2495 assert_eq!( 2496 draft.chat_item.direction, 2497 RadrootsSimplexAppChatDirection::Outbound 2498 ); 2499 assert_eq!( 2500 draft.chat_item.chat_msg_id.as_deref(), 2501 Some("AQIDBAUGBwgJCgsM") 2502 ); 2503 assert_eq!(draft.chat_item.delivery_status, "pending"); 2504 assert_eq!( 2505 draft.outbox_message.chat_item_id, 2506 draft.chat_item.chat_item_id 2507 ); 2508 assert_eq!(draft.outbox_message.chat_msg_id, "AQIDBAUGBwgJCgsM"); 2509 assert_eq!(draft.outbox_message.status, "pending"); 2510 let page = store.chat_page("conversation-1", 10).expect("page"); 2511 assert_eq!(page, vec![draft.chat_item]); 2512 let pending = store.pending_outbox_messages().expect("pending"); 2513 assert_eq!(pending, vec![draft.outbox_message.clone()]); 2514 assert_eq!( 2515 store.list_outbox_messages().expect("outbox"), 2516 vec![draft.outbox_message] 2517 ); 2518 } 2519 2520 #[test] 2521 fn reset_disposable_runtime_state_preserves_profiles_and_clears_messages() { 2522 let temp = tempfile::tempdir().expect("temp"); 2523 let path = temp.path().join("simplex.sqlite"); 2524 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2525 let store = memory_store(&path, vault).expect("store"); 2526 seed_store(&store); 2527 2528 let draft = store 2529 .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") 2530 .expect("draft"); 2531 let commit = store 2532 .commit_inbound_text(&RadrootsSimplexAppInboundTextRequest { 2533 chat_msg_id: Some("AgIDBAUGBwgJCgsM".into()), 2534 broker_message_id_hash: b"reset-broker-hash".to_vec(), 2535 message_hash: b"reset-message-hash".to_vec(), 2536 runtime_ack_handle: "ack-handle-reset".into(), 2537 ..inbound_text_request() 2538 }) 2539 .expect("inbound"); 2540 store 2541 .record_unsupported_protocol_event(&RadrootsSimplexAppUnsupportedProtocolEvent { 2542 event_id: "unsupported-1".into(), 2543 connection_id: Some("connection-1".into()), 2544 event_kind: "x.future".into(), 2545 payload_json: "{}".into(), 2546 status: "stored".into(), 2547 received_at_unix: 11, 2548 }) 2549 .expect("unsupported"); 2550 2551 assert_eq!(store.pending_outbox_messages().expect("outbox").len(), 1); 2552 assert_eq!( 2553 store.pending_ack_messages().expect("acks"), 2554 vec![commit.inbound] 2555 ); 2556 assert_eq!( 2557 store 2558 .list_unsupported_protocol_events() 2559 .expect("unsupported") 2560 .len(), 2561 1 2562 ); 2563 2564 store 2565 .reset_disposable_runtime_state() 2566 .expect("reset disposable state"); 2567 2568 assert_eq!( 2569 store.get_profile("profile-1").expect("profile"), 2570 Some(profile()) 2571 ); 2572 assert!(store.pending_outbox_messages().expect("outbox").is_empty()); 2573 assert!(store.list_outbox_messages().expect("outbox").is_empty()); 2574 assert!(store.pending_ack_messages().expect("acks").is_empty()); 2575 assert!( 2576 store 2577 .chat_page("conversation-1", 10) 2578 .expect("chat") 2579 .is_empty() 2580 ); 2581 assert!( 2582 store 2583 .list_unsupported_protocol_events() 2584 .expect("unsupported") 2585 .is_empty() 2586 ); 2587 assert!( 2588 store 2589 .mark_outbox_message_sent(&draft.outbox_message.outbox_id) 2590 .expect("missing") 2591 .is_none() 2592 ); 2593 } 2594 2595 #[test] 2596 fn outbound_text_retry_preserves_msg_id_and_chat_item() { 2597 let temp = tempfile::tempdir().expect("temp"); 2598 let path = temp.path().join("simplex.sqlite"); 2599 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2600 let store = memory_store(&path, vault).expect("store"); 2601 seed_store(&store); 2602 2603 let first = store 2604 .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") 2605 .expect("first"); 2606 let second = store 2607 .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") 2608 .expect("second"); 2609 2610 assert_eq!(second, first); 2611 assert_eq!( 2612 store.chat_page("conversation-1", 10).expect("page").len(), 2613 1 2614 ); 2615 assert_eq!(store.pending_outbox_messages().expect("pending").len(), 1); 2616 } 2617 2618 #[test] 2619 fn outbound_runtime_correlation_removes_message_from_retry_queue() { 2620 let temp = tempfile::tempdir().expect("temp"); 2621 let path = temp.path().join("simplex.sqlite"); 2622 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2623 let store = memory_store(&path, vault).expect("store"); 2624 seed_store(&store); 2625 2626 let draft = store 2627 .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") 2628 .expect("draft"); 2629 let queued = store 2630 .mark_outbox_message_queued(&draft.outbox_message.outbox_id, 42) 2631 .expect("queued") 2632 .expect("queued row"); 2633 2634 assert_eq!(queued.outbox_message.runtime_message_id, Some(42)); 2635 assert!(store.pending_outbox_messages().expect("pending").is_empty()); 2636 } 2637 2638 #[test] 2639 fn outbound_delivery_state_updates_are_idempotent() { 2640 let temp = tempfile::tempdir().expect("temp"); 2641 let path = temp.path().join("simplex.sqlite"); 2642 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2643 let store = memory_store(&path, vault).expect("store"); 2644 seed_store(&store); 2645 2646 let draft = store 2647 .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") 2648 .expect("draft"); 2649 let sent = store 2650 .mark_outbox_message_sent(&draft.outbox_message.outbox_id) 2651 .expect("sent") 2652 .expect("sent row"); 2653 2654 assert_eq!(sent.outbox_message.status, "sent"); 2655 assert_eq!(sent.chat_item.delivery_status, "sent"); 2656 assert!(store.pending_outbox_messages().expect("pending").is_empty()); 2657 assert_eq!( 2658 store 2659 .mark_outbox_message_sent(&draft.outbox_message.outbox_id) 2660 .expect("sent again") 2661 .expect("sent row") 2662 .outbox_message 2663 .status, 2664 "sent" 2665 ); 2666 2667 let acknowledged = store 2668 .mark_outbox_message_acknowledged(&draft.outbox_message.outbox_id) 2669 .expect("acknowledged") 2670 .expect("acknowledged row"); 2671 assert_eq!(acknowledged.outbox_message.status, "acknowledged"); 2672 assert_eq!(acknowledged.chat_item.delivery_status, "acknowledged"); 2673 assert_eq!( 2674 store 2675 .mark_outbox_message_sent(&draft.outbox_message.outbox_id) 2676 .expect("sent after acknowledged") 2677 .expect("row") 2678 .outbox_message 2679 .status, 2680 "acknowledged" 2681 ); 2682 assert!( 2683 store 2684 .mark_outbox_message_acknowledged("missing-outbox") 2685 .expect("missing") 2686 .is_none() 2687 ); 2688 } 2689 2690 #[test] 2691 fn outbound_delivery_transitions_fail_closed() { 2692 let temp = tempfile::tempdir().expect("temp"); 2693 let path = temp.path().join("simplex.sqlite"); 2694 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2695 let store = memory_store(&path, vault).expect("store"); 2696 seed_store(&store); 2697 2698 let draft = store 2699 .create_outbound_text_with_test_msg_id(&outbound_request(), "AQIDBAUGBwgJCgsM") 2700 .expect("draft"); 2701 let error = store 2702 .mark_outbox_message_acknowledged(&draft.outbox_message.outbox_id) 2703 .err() 2704 .expect("transition error"); 2705 2706 assert!(matches!( 2707 error, 2708 RadrootsSimplexAppStoreError::MessageLifecycle(_) 2709 )); 2710 } 2711 2712 #[test] 2713 fn outbound_text_generates_twelve_byte_base64url_msg_id() { 2714 let temp = tempfile::tempdir().expect("temp"); 2715 let path = temp.path().join("simplex.sqlite"); 2716 let vault = Arc::new(RadrootsSecretVaultMemory::new()); 2717 let store = memory_store(&path, vault).expect("store"); 2718 seed_store(&store); 2719 2720 let draft = store 2721 .create_outbound_text(&outbound_request()) 2722 .expect("draft"); 2723 let chat_msg_id = draft.outbox_message.chat_msg_id; 2724 let decoded = URL_SAFE_NO_PAD 2725 .decode(chat_msg_id.as_bytes()) 2726 .expect("base64url"); 2727 2728 assert_eq!(decoded.len(), CHAT_MSG_ID_BYTES); 2729 assert_eq!( 2730 draft.chat_item.chat_msg_id.as_deref(), 2731 Some(chat_msg_id.as_str()) 2732 ); 2733 } 2734 }