store.rs (34416B)
1 #![forbid(unsafe_code)] 2 3 use radroots_sql_core::SqlExecutor; 4 use radroots_sql_core::error::SqlError; 5 use serde::Deserialize; 6 use serde_json::{Value, json}; 7 8 use crate::migrations; 9 use crate::models::validate_non_empty; 10 use crate::{ 11 LocalEventRecord, LocalEventRecordInput, LocalEventRecordUpdate, LocalEventsCursor, 12 LocalEventsError, LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus, SourceRuntime, 13 }; 14 15 pub struct LocalEventsStore<E: SqlExecutor> { 16 executor: E, 17 } 18 19 impl<E: SqlExecutor> LocalEventsStore<E> { 20 pub fn new(executor: E) -> Self { 21 Self { executor } 22 } 23 24 pub fn executor(&self) -> &E { 25 &self.executor 26 } 27 28 pub fn migrate_up(&self) -> Result<(), SqlError> { 29 migrations::run_all_up(self.executor()) 30 } 31 32 pub fn migrate_down(&self) -> Result<(), SqlError> { 33 migrations::run_all_down(self.executor()) 34 } 35 36 pub fn append_record( 37 &self, 38 input: &LocalEventRecordInput, 39 ) -> Result<LocalEventRecord, LocalEventsError> { 40 input.validate()?; 41 self.executor.begin()?; 42 let result = (|| -> Result<(), LocalEventsError> { 43 let change_seq = self.next_change_seq()?; 44 let params = json!([ 45 change_seq, 46 input.record_id, 47 input.family.as_str(), 48 input.status.as_str(), 49 input.source_runtime.as_str(), 50 input.created_at_ms, 51 input.inserted_at_ms, 52 input.inserted_at_ms, 53 input.owner_account_id, 54 input.owner_pubkey, 55 input.farm_id, 56 input.listing_addr, 57 encode_json(input.local_work_json.as_ref()), 58 input.event_id, 59 input.event_kind, 60 input.event_pubkey, 61 input.event_created_at, 62 encode_json(input.event_tags_json.as_ref()), 63 input.event_content, 64 input.event_sig, 65 encode_json(input.raw_event_json.as_ref()), 66 input.outbox_status.as_str(), 67 input.relay_set_fingerprint, 68 encode_json(input.relay_delivery_json.as_ref()) 69 ]) 70 .to_string(); 71 let sql = "insert or ignore into local_event_record( 72 change_seq, 73 record_id, 74 family, 75 status, 76 source_runtime, 77 created_at_ms, 78 inserted_at_ms, 79 updated_at_ms, 80 owner_account_id, 81 owner_pubkey, 82 farm_id, 83 listing_addr, 84 local_work_json, 85 event_id, 86 event_kind, 87 event_pubkey, 88 event_created_at, 89 event_tags_json, 90 event_content, 91 event_sig, 92 raw_event_json, 93 outbox_status, 94 relay_set_fingerprint, 95 relay_delivery_json 96 ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; 97 let _ = self.executor.exec(sql, ¶ms)?; 98 Ok(()) 99 })(); 100 match result { 101 Ok(()) => self.executor.commit()?, 102 Err(err) => { 103 let _ = self.executor.rollback(); 104 return Err(err); 105 } 106 } 107 self.get_record(&input.record_id)? 108 .ok_or_else(|| LocalEventsError::InvalidRecord("record append failed".to_owned())) 109 } 110 111 pub fn get_record( 112 &self, 113 record_id: &str, 114 ) -> Result<Option<LocalEventRecord>, LocalEventsError> { 115 validate_non_empty("record_id", record_id)?; 116 let params = json!([record_id]).to_string(); 117 let rows = self.query_records( 118 "select * from local_event_record where record_id = ? limit 1", 119 ¶ms, 120 )?; 121 Ok(rows.into_iter().next()) 122 } 123 124 pub fn list_records_after_seq( 125 &self, 126 after_seq: i64, 127 limit: u32, 128 ) -> Result<Vec<LocalEventRecord>, LocalEventsError> { 129 let params = json!([after_seq, i64::from(limit)]).to_string(); 130 self.query_records( 131 "select * from local_event_record where seq > ? order by seq asc limit ?", 132 ¶ms, 133 ) 134 } 135 136 pub fn list_records_changed_after( 137 &self, 138 after_change_seq: i64, 139 limit: u32, 140 ) -> Result<Vec<LocalEventRecord>, LocalEventsError> { 141 let params = json!([after_change_seq, i64::from(limit)]).to_string(); 142 self.query_records( 143 "select * from local_event_record where change_seq > ? order by change_seq asc, seq asc limit ?", 144 ¶ms, 145 ) 146 } 147 148 pub fn list_records_changed_latest( 149 &self, 150 limit: u32, 151 ) -> Result<Vec<LocalEventRecord>, LocalEventsError> { 152 let params = json!([i64::from(limit)]).to_string(); 153 self.query_records( 154 "select * from local_event_record order by change_seq desc, seq desc, record_id asc limit ?", 155 ¶ms, 156 ) 157 } 158 159 pub fn list_records_changed_before( 160 &self, 161 before_change_seq: i64, 162 before_seq: i64, 163 limit: u32, 164 ) -> Result<Vec<LocalEventRecord>, LocalEventsError> { 165 let params = json!([ 166 before_change_seq, 167 before_change_seq, 168 before_seq, 169 i64::from(limit) 170 ]) 171 .to_string(); 172 self.query_records( 173 "select * from local_event_record 174 where change_seq < ? or (change_seq = ? and seq < ?) 175 order by change_seq desc, seq desc, record_id asc 176 limit ?", 177 ¶ms, 178 ) 179 } 180 181 pub fn update_outbox( 182 &self, 183 update: &LocalEventRecordUpdate, 184 ) -> Result<LocalEventRecord, LocalEventsError> { 185 validate_non_empty("record_id", &update.record_id)?; 186 self.executor.begin()?; 187 let result = (|| -> Result<i64, LocalEventsError> { 188 let change_seq = self.next_change_seq()?; 189 let params = json!([ 190 change_seq, 191 update.status.as_str(), 192 update.outbox_status.as_str(), 193 update.relay_set_fingerprint, 194 encode_json(update.relay_delivery_json.as_ref()), 195 update.updated_at_ms, 196 update.record_id 197 ]) 198 .to_string(); 199 let outcome = self.executor.exec( 200 "update local_event_record 201 set change_seq = ?, 202 status = ?, 203 outbox_status = ?, 204 relay_set_fingerprint = ?, 205 relay_delivery_json = ?, 206 updated_at_ms = ? 207 where record_id = ?", 208 ¶ms, 209 )?; 210 Ok(outcome.changes) 211 })(); 212 let changes = match result { 213 Ok(changes) => { 214 self.executor.commit()?; 215 changes 216 } 217 Err(err) => { 218 let _ = self.executor.rollback(); 219 return Err(err); 220 } 221 }; 222 if changes == 0 { 223 return Err(LocalEventsError::Sql(SqlError::NotFound( 224 update.record_id.clone(), 225 ))); 226 } 227 self.get_record(&update.record_id)? 228 .ok_or_else(|| LocalEventsError::Sql(SqlError::NotFound(update.record_id.clone()))) 229 } 230 231 pub fn get_cursor( 232 &self, 233 consumer_id: &str, 234 ) -> Result<Option<LocalEventsCursor>, LocalEventsError> { 235 validate_non_empty("consumer_id", consumer_id)?; 236 let params = json!([consumer_id]).to_string(); 237 let raw = self.executor.query_raw( 238 "select consumer_id, last_change_seq, updated_at_ms from local_event_projection_cursor where consumer_id = ? limit 1", 239 ¶ms, 240 )?; 241 let rows: Vec<CursorRow> = serde_json::from_str(&raw)?; 242 Ok(rows.into_iter().next().map(Into::into)) 243 } 244 245 pub fn advance_cursor( 246 &self, 247 consumer_id: &str, 248 last_change_seq: i64, 249 updated_at_ms: i64, 250 ) -> Result<LocalEventsCursor, LocalEventsError> { 251 validate_non_empty("consumer_id", consumer_id)?; 252 let params = json!([consumer_id, last_change_seq, updated_at_ms]).to_string(); 253 self.executor.exec( 254 "insert into local_event_projection_cursor(consumer_id, last_change_seq, updated_at_ms) 255 values(?,?,?) 256 on conflict(consumer_id) do update set 257 last_change_seq = max(local_event_projection_cursor.last_change_seq, excluded.last_change_seq), 258 updated_at_ms = excluded.updated_at_ms", 259 ¶ms, 260 )?; 261 self.get_cursor(consumer_id)? 262 .ok_or_else(|| LocalEventsError::InvalidRecord("cursor advance failed".to_owned())) 263 } 264 265 fn query_records( 266 &self, 267 sql: &str, 268 params: &str, 269 ) -> Result<Vec<LocalEventRecord>, LocalEventsError> { 270 let raw = self.executor.query_raw(sql, params)?; 271 let rows: Vec<RecordRow> = serde_json::from_str(&raw)?; 272 rows.into_iter().map(TryInto::try_into).collect() 273 } 274 275 fn next_change_seq(&self) -> Result<i64, LocalEventsError> { 276 let raw = self.executor.query_raw( 277 "select coalesce(max(change_seq), 0) + 1 as change_seq from local_event_record", 278 "[]", 279 )?; 280 let rows: Vec<ChangeSeqRow> = serde_json::from_str(&raw)?; 281 rows.into_iter() 282 .next() 283 .map(|row| row.change_seq) 284 .ok_or_else(|| { 285 LocalEventsError::InvalidRecord("change sequence unavailable".to_owned()) 286 }) 287 } 288 } 289 290 #[derive(Debug, Deserialize)] 291 struct RecordRow { 292 seq: i64, 293 change_seq: i64, 294 record_id: String, 295 family: String, 296 status: String, 297 source_runtime: String, 298 created_at_ms: i64, 299 inserted_at_ms: i64, 300 updated_at_ms: i64, 301 owner_account_id: Option<String>, 302 owner_pubkey: Option<String>, 303 farm_id: Option<String>, 304 listing_addr: Option<String>, 305 local_work_json: Option<String>, 306 event_id: Option<String>, 307 event_kind: Option<i64>, 308 event_pubkey: Option<String>, 309 event_created_at: Option<i64>, 310 event_tags_json: Option<String>, 311 event_content: Option<String>, 312 event_sig: Option<String>, 313 raw_event_json: Option<String>, 314 outbox_status: String, 315 relay_set_fingerprint: Option<String>, 316 relay_delivery_json: Option<String>, 317 } 318 319 impl TryFrom<RecordRow> for LocalEventRecord { 320 type Error = LocalEventsError; 321 322 fn try_from(row: RecordRow) -> Result<Self, Self::Error> { 323 Ok(Self { 324 seq: row.seq, 325 change_seq: row.change_seq, 326 record_id: row.record_id, 327 family: LocalRecordFamily::parse(&row.family)?, 328 status: LocalRecordStatus::parse(&row.status)?, 329 source_runtime: SourceRuntime::parse(&row.source_runtime)?, 330 created_at_ms: row.created_at_ms, 331 inserted_at_ms: row.inserted_at_ms, 332 updated_at_ms: row.updated_at_ms, 333 owner_account_id: row.owner_account_id, 334 owner_pubkey: row.owner_pubkey, 335 farm_id: row.farm_id, 336 listing_addr: row.listing_addr, 337 local_work_json: decode_json(row.local_work_json)?, 338 event_id: row.event_id, 339 event_kind: row.event_kind, 340 event_pubkey: row.event_pubkey, 341 event_created_at: row.event_created_at, 342 event_tags_json: decode_json(row.event_tags_json)?, 343 event_content: row.event_content, 344 event_sig: row.event_sig, 345 raw_event_json: decode_json(row.raw_event_json)?, 346 outbox_status: PublishOutboxStatus::parse(&row.outbox_status)?, 347 relay_set_fingerprint: row.relay_set_fingerprint, 348 relay_delivery_json: decode_json(row.relay_delivery_json)?, 349 }) 350 } 351 } 352 353 #[derive(Debug, Deserialize)] 354 struct CursorRow { 355 consumer_id: String, 356 last_change_seq: i64, 357 updated_at_ms: i64, 358 } 359 360 impl From<CursorRow> for LocalEventsCursor { 361 fn from(row: CursorRow) -> Self { 362 Self { 363 consumer_id: row.consumer_id, 364 last_change_seq: row.last_change_seq, 365 updated_at_ms: row.updated_at_ms, 366 } 367 } 368 } 369 370 #[derive(Debug, Deserialize)] 371 struct ChangeSeqRow { 372 change_seq: i64, 373 } 374 375 fn encode_json(value: Option<&Value>) -> Option<String> { 376 value.map(Value::to_string) 377 } 378 379 fn decode_json(value: Option<String>) -> Result<Option<Value>, LocalEventsError> { 380 value 381 .map(|value| serde_json::from_str(&value)) 382 .transpose() 383 .map_err(Into::into) 384 } 385 386 #[cfg(test)] 387 mod tests { 388 use std::collections::VecDeque; 389 use std::sync::Mutex; 390 use std::sync::atomic::{AtomicUsize, Ordering}; 391 392 use radroots_sql_core::{ExecOutcome, SqlExecutor, SqliteExecutor}; 393 use serde_json::json; 394 395 use super::*; 396 397 fn store() -> LocalEventsStore<SqliteExecutor> { 398 let executor = SqliteExecutor::open_memory().expect("open memory sqlite"); 399 let store = LocalEventsStore::new(executor); 400 store.migrate_up().expect("migrate up"); 401 store 402 } 403 404 fn local_work(record_id: &str) -> LocalEventRecordInput { 405 LocalEventRecordInput { 406 record_id: record_id.to_owned(), 407 family: LocalRecordFamily::LocalWork, 408 status: LocalRecordStatus::LocalSaved, 409 source_runtime: SourceRuntime::Cli, 410 created_at_ms: 1000, 411 inserted_at_ms: 1001, 412 owner_account_id: Some("seller-account".to_owned()), 413 owner_pubkey: Some("seller-pubkey".to_owned()), 414 farm_id: Some("farm-a".to_owned()), 415 listing_addr: Some("listing-a".to_owned()), 416 local_work_json: Some(json!({"kind":"listing","title":"Eggs"})), 417 event_id: None, 418 event_kind: None, 419 event_pubkey: None, 420 event_created_at: None, 421 event_tags_json: None, 422 event_content: None, 423 event_sig: None, 424 raw_event_json: None, 425 outbox_status: PublishOutboxStatus::None, 426 relay_set_fingerprint: None, 427 relay_delivery_json: None, 428 } 429 } 430 431 fn signed_event(record_id: &str) -> LocalEventRecordInput { 432 LocalEventRecordInput { 433 record_id: record_id.to_owned(), 434 family: LocalRecordFamily::SignedEvent, 435 status: LocalRecordStatus::PendingPublish, 436 source_runtime: SourceRuntime::Cli, 437 created_at_ms: 2000, 438 inserted_at_ms: 2001, 439 owner_account_id: Some("seller-account".to_owned()), 440 owner_pubkey: Some("seller-pubkey".to_owned()), 441 farm_id: Some("farm-a".to_owned()), 442 listing_addr: Some("listing-a".to_owned()), 443 local_work_json: None, 444 event_id: Some(record_id.to_owned()), 445 event_kind: Some(3421), 446 event_pubkey: Some("seller-pubkey".to_owned()), 447 event_created_at: Some(2000), 448 event_tags_json: Some(json!([["d", "listing-a"]])), 449 event_content: Some("{\"title\":\"Eggs\"}".to_owned()), 450 event_sig: Some("sig-a".to_owned()), 451 raw_event_json: Some(json!({"id":record_id,"kind":3421})), 452 outbox_status: PublishOutboxStatus::Pending, 453 relay_set_fingerprint: Some("relay-set-a".to_owned()), 454 relay_delivery_json: Some(json!({ 455 "state": "pending", 456 "target_relays": ["ws://127.0.0.1:8080"], 457 "connected_relays": [], 458 "acknowledged_relays": [], 459 "failed_relays": [] 460 })), 461 } 462 } 463 464 #[derive(Debug)] 465 struct ScriptedExecutor { 466 begin_result: Mutex<Result<(), SqlError>>, 467 commit_result: Mutex<Result<(), SqlError>>, 468 exec_results: Mutex<VecDeque<Result<ExecOutcome, SqlError>>>, 469 query_results: Mutex<VecDeque<Result<String, SqlError>>>, 470 rollbacks: AtomicUsize, 471 } 472 473 impl ScriptedExecutor { 474 fn new( 475 exec_results: Vec<Result<ExecOutcome, SqlError>>, 476 query_results: Vec<Result<String, SqlError>>, 477 ) -> Self { 478 Self { 479 begin_result: Mutex::new(Ok(())), 480 commit_result: Mutex::new(Ok(())), 481 exec_results: Mutex::new(exec_results.into()), 482 query_results: Mutex::new(query_results.into()), 483 rollbacks: AtomicUsize::new(0), 484 } 485 } 486 487 fn with_begin_error(error: SqlError) -> Self { 488 let executor = Self::new(Vec::new(), Vec::new()); 489 *executor.begin_result.lock().expect("begin result") = Err(error); 490 executor 491 } 492 493 fn with_commit_error(error: SqlError) -> Self { 494 let executor = Self::new( 495 vec![Ok(ExecOutcome { 496 changes: 1, 497 last_insert_id: 0, 498 })], 499 vec![Ok(r#"[{"change_seq":1}]"#.to_owned())], 500 ); 501 *executor.commit_result.lock().expect("commit result") = Err(error); 502 executor 503 } 504 } 505 506 impl SqlExecutor for ScriptedExecutor { 507 fn exec(&self, _sql: &str, _params_json: &str) -> Result<ExecOutcome, SqlError> { 508 self.exec_results 509 .lock() 510 .expect("exec results") 511 .pop_front() 512 .unwrap_or(Ok(ExecOutcome { 513 changes: 1, 514 last_insert_id: 0, 515 })) 516 } 517 518 fn query_raw(&self, _sql: &str, _params_json: &str) -> Result<String, SqlError> { 519 self.query_results 520 .lock() 521 .expect("query results") 522 .pop_front() 523 .unwrap_or_else(|| Ok("[]".to_owned())) 524 } 525 526 fn begin(&self) -> Result<(), SqlError> { 527 self.begin_result.lock().expect("begin result").clone() 528 } 529 530 fn commit(&self) -> Result<(), SqlError> { 531 self.commit_result.lock().expect("commit result").clone() 532 } 533 534 fn rollback(&self) -> Result<(), SqlError> { 535 self.rollbacks.fetch_add(1, Ordering::SeqCst); 536 Ok(()) 537 } 538 } 539 540 fn record_row_with(field: &str, value: serde_json::Value) -> String { 541 let mut row = json!({ 542 "seq": 1, 543 "change_seq": 1, 544 "record_id": "record-a", 545 "family": "signed_event", 546 "status": "pending_publish", 547 "source_runtime": "cli", 548 "created_at_ms": 1000, 549 "inserted_at_ms": 1001, 550 "updated_at_ms": 1001, 551 "owner_account_id": "seller-account", 552 "owner_pubkey": "seller-pubkey", 553 "farm_id": "farm-a", 554 "listing_addr": "listing-a", 555 "local_work_json": null, 556 "event_id": "event-a", 557 "event_kind": 3421, 558 "event_pubkey": "seller-pubkey", 559 "event_created_at": 1000, 560 "event_tags_json": "[[\"d\",\"listing-a\"]]", 561 "event_content": "{}", 562 "event_sig": "sig-a", 563 "raw_event_json": "{\"id\":\"event-a\",\"kind\":3421}", 564 "outbox_status": "pending", 565 "relay_set_fingerprint": "relay-set-a", 566 "relay_delivery_json": "{\"state\":\"pending\",\"target_relays\":[\"ws://127.0.0.1:8080\"],\"connected_relays\":[],\"acknowledged_relays\":[],\"failed_relays\":[]}" 567 }); 568 row[field] = value; 569 json!([row]).to_string() 570 } 571 572 #[test] 573 fn store_methods_round_trip_records_and_cursors() { 574 let store = store(); 575 576 assert!( 577 store 578 .executor() 579 .query_raw("select 1 as value", "[]") 580 .is_ok() 581 ); 582 assert!(store.get_record("missing").expect("get missing").is_none()); 583 assert!(store.get_cursor("app").expect("cursor missing").is_none()); 584 585 let local = store 586 .append_record(&local_work("local-a")) 587 .expect("append local work"); 588 let event = store 589 .append_record(&signed_event("event-a")) 590 .expect("append signed event"); 591 592 assert_eq!( 593 store 594 .get_record("local-a") 595 .expect("get local") 596 .expect("local record") 597 .record_id, 598 local.record_id 599 ); 600 assert_eq!( 601 store 602 .list_records_after_seq(0, 10) 603 .expect("list after seq") 604 .len(), 605 2 606 ); 607 assert_eq!( 608 store 609 .list_records_changed_after(local.change_seq, 10) 610 .expect("list changed after")[0] 611 .record_id, 612 event.record_id 613 ); 614 assert_eq!( 615 store.list_records_changed_latest(1).expect("list latest")[0].record_id, 616 event.record_id 617 ); 618 assert_eq!( 619 store 620 .list_records_changed_before(event.change_seq, event.seq, 10) 621 .expect("list before")[0] 622 .record_id, 623 local.record_id 624 ); 625 626 let cursor = store 627 .advance_cursor("app", event.change_seq, 3000) 628 .expect("advance cursor"); 629 assert_eq!(cursor.consumer_id, "app"); 630 assert_eq!( 631 store 632 .get_cursor("app") 633 .expect("get cursor") 634 .expect("cursor") 635 .last_change_seq, 636 event.change_seq 637 ); 638 639 let updated = store 640 .update_outbox(&LocalEventRecordUpdate { 641 record_id: "event-a".to_owned(), 642 status: LocalRecordStatus::Published, 643 outbox_status: PublishOutboxStatus::Acknowledged, 644 relay_set_fingerprint: Some("relay-set-a".to_owned()), 645 relay_delivery_json: Some(json!({ 646 "state": "acknowledged", 647 "target_relays": ["ws://127.0.0.1:8080"], 648 "connected_relays": ["ws://127.0.0.1:8080"], 649 "acknowledged_relays": ["ws://127.0.0.1:8080"], 650 "failed_relays": [] 651 })), 652 updated_at_ms: 4000, 653 }) 654 .expect("update outbox"); 655 656 assert_eq!(updated.status, LocalRecordStatus::Published); 657 assert_eq!(updated.outbox_status, PublishOutboxStatus::Acknowledged); 658 store.migrate_down().expect("migrate down"); 659 } 660 661 #[test] 662 fn store_reports_missing_updates_and_decode_errors() { 663 let store = store(); 664 assert!( 665 store 666 .get_record(" ") 667 .expect_err("empty record id") 668 .to_string() 669 .contains("record_id") 670 ); 671 assert!( 672 store 673 .get_cursor(" ") 674 .expect_err("empty consumer id") 675 .to_string() 676 .contains("consumer_id") 677 ); 678 assert!( 679 store 680 .advance_cursor(" ", 1, 1000) 681 .expect_err("empty cursor consumer") 682 .to_string() 683 .contains("consumer_id") 684 ); 685 assert!( 686 store 687 .update_outbox(&LocalEventRecordUpdate { 688 record_id: " ".to_owned(), 689 status: LocalRecordStatus::Published, 690 outbox_status: PublishOutboxStatus::Acknowledged, 691 relay_set_fingerprint: None, 692 relay_delivery_json: None, 693 updated_at_ms: 4000, 694 }) 695 .expect_err("empty update record id") 696 .to_string() 697 .contains("record_id") 698 ); 699 700 let missing_update = store 701 .update_outbox(&LocalEventRecordUpdate { 702 record_id: "missing-event".to_owned(), 703 status: LocalRecordStatus::Published, 704 outbox_status: PublishOutboxStatus::Acknowledged, 705 relay_set_fingerprint: None, 706 relay_delivery_json: None, 707 updated_at_ms: 4000, 708 }) 709 .expect_err("missing record update"); 710 711 assert!(missing_update.to_string().contains("missing-event")); 712 713 store 714 .append_record(&local_work("local-a")) 715 .expect("append local"); 716 let params = json!(["{", "local-a"]).to_string(); 717 store 718 .executor() 719 .exec( 720 "update local_event_record set local_work_json = ? where record_id = ?", 721 ¶ms, 722 ) 723 .expect("corrupt local work json"); 724 let decode_error = store.get_record("local-a").expect_err("decode error"); 725 726 assert!(decode_error.to_string().contains("EOF")); 727 } 728 729 #[test] 730 fn store_rolls_back_when_change_sequence_is_unavailable() { 731 let append_store = 732 LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("[]".to_owned())])); 733 let append_error = append_store 734 .append_record(&local_work("local-a")) 735 .expect_err("append error"); 736 737 assert!(append_error.to_string().contains("change sequence")); 738 assert_eq!(append_store.executor().rollbacks.load(Ordering::SeqCst), 1); 739 740 let update_store = 741 LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("[]".to_owned())])); 742 let update_error = update_store 743 .update_outbox(&LocalEventRecordUpdate { 744 record_id: "event-a".to_owned(), 745 status: LocalRecordStatus::Published, 746 outbox_status: PublishOutboxStatus::Acknowledged, 747 relay_set_fingerprint: None, 748 relay_delivery_json: None, 749 updated_at_ms: 4000, 750 }) 751 .expect_err("update error"); 752 753 assert!(update_error.to_string().contains("change sequence")); 754 assert_eq!(update_store.executor().rollbacks.load(Ordering::SeqCst), 1); 755 } 756 757 #[test] 758 fn store_reports_cursor_advance_without_returned_cursor() { 759 let store = LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), Vec::new())); 760 761 assert!(store.get_cursor("app").expect("missing cursor").is_none()); 762 let cursor_error = store 763 .advance_cursor("app", 1, 1000) 764 .expect_err("cursor advance error"); 765 766 assert!(cursor_error.to_string().contains("cursor advance failed")); 767 } 768 769 #[test] 770 fn store_reports_executor_and_decode_failures() { 771 let begin_store = LocalEventsStore::new(ScriptedExecutor::with_begin_error( 772 SqlError::InvalidQuery("begin failed".to_owned()), 773 )); 774 assert!( 775 begin_store 776 .append_record(&local_work("local-a")) 777 .expect_err("begin failure") 778 .to_string() 779 .contains("begin failed") 780 ); 781 782 let exec_store = LocalEventsStore::new(ScriptedExecutor::new( 783 vec![Err(SqlError::InvalidQuery("insert failed".to_owned()))], 784 vec![Ok(r#"[{"change_seq":1}]"#.to_owned())], 785 )); 786 assert!( 787 exec_store 788 .append_record(&local_work("local-a")) 789 .expect_err("exec failure") 790 .to_string() 791 .contains("insert failed") 792 ); 793 assert_eq!(exec_store.executor().rollbacks.load(Ordering::SeqCst), 1); 794 795 let commit_store = LocalEventsStore::new(ScriptedExecutor::with_commit_error( 796 SqlError::InvalidQuery("commit failed".to_owned()), 797 )); 798 assert!( 799 commit_store 800 .append_record(&local_work("local-a")) 801 .expect_err("commit failure") 802 .to_string() 803 .contains("commit failed") 804 ); 805 806 let query_error_store = LocalEventsStore::new(ScriptedExecutor::new( 807 Vec::new(), 808 vec![Err(SqlError::InvalidQuery("query failed".to_owned()))], 809 )); 810 assert!( 811 query_error_store 812 .get_record("record-a") 813 .expect_err("query failure") 814 .to_string() 815 .contains("query failed") 816 ); 817 818 let invalid_rows_store = 819 LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("{".to_owned())])); 820 let _ = invalid_rows_store 821 .get_record("record-a") 822 .expect_err("invalid rows"); 823 824 let cursor_rows_store = 825 LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("{".to_owned())])); 826 let _ = cursor_rows_store 827 .get_cursor("app") 828 .expect_err("invalid cursor rows"); 829 830 let change_rows_store = 831 LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("{".to_owned())])); 832 let _ = change_rows_store 833 .append_record(&local_work("local-a")) 834 .expect_err("invalid change rows"); 835 836 let cursor_exec_store = LocalEventsStore::new(ScriptedExecutor::new( 837 vec![Err(SqlError::InvalidQuery("cursor failed".to_owned()))], 838 Vec::new(), 839 )); 840 assert!( 841 cursor_exec_store 842 .advance_cursor("app", 1, 1000) 843 .expect_err("cursor exec failure") 844 .to_string() 845 .contains("cursor failed") 846 ); 847 848 let append_lookup_store = LocalEventsStore::new(ScriptedExecutor::new( 849 vec![Ok(ExecOutcome { 850 changes: 1, 851 last_insert_id: 0, 852 })], 853 vec![Ok(r#"[{"change_seq":1}]"#.to_owned()), Ok("[]".to_owned())], 854 )); 855 assert!( 856 append_lookup_store 857 .append_record(&local_work("local-a")) 858 .expect_err("append lookup failure") 859 .to_string() 860 .contains("record append failed") 861 ); 862 863 let update_lookup_store = LocalEventsStore::new(ScriptedExecutor::new( 864 vec![Ok(ExecOutcome { 865 changes: 1, 866 last_insert_id: 0, 867 })], 868 vec![Ok(r#"[{"change_seq":1}]"#.to_owned()), Ok("[]".to_owned())], 869 )); 870 assert!( 871 update_lookup_store 872 .update_outbox(&LocalEventRecordUpdate { 873 record_id: "event-a".to_owned(), 874 status: LocalRecordStatus::Published, 875 outbox_status: PublishOutboxStatus::Acknowledged, 876 relay_set_fingerprint: None, 877 relay_delivery_json: None, 878 updated_at_ms: 4000, 879 }) 880 .expect_err("update lookup failure") 881 .to_string() 882 .contains("event-a") 883 ); 884 885 let cursor_query_store = LocalEventsStore::new(ScriptedExecutor::new( 886 Vec::new(), 887 vec![Err(SqlError::InvalidQuery( 888 "cursor query failed".to_owned(), 889 ))], 890 )); 891 assert!( 892 cursor_query_store 893 .get_cursor("app") 894 .expect_err("cursor query failure") 895 .to_string() 896 .contains("cursor query failed") 897 ); 898 899 let advance_cursor_query_store = LocalEventsStore::new(ScriptedExecutor::new( 900 vec![Ok(ExecOutcome { 901 changes: 1, 902 last_insert_id: 0, 903 })], 904 vec![Err(SqlError::InvalidQuery( 905 "advanced cursor query failed".to_owned(), 906 ))], 907 )); 908 assert!( 909 advance_cursor_query_store 910 .advance_cursor("app", 1, 1000) 911 .expect_err("advance cursor query failure") 912 .to_string() 913 .contains("advanced cursor query failed") 914 ); 915 916 let change_query_store = LocalEventsStore::new(ScriptedExecutor::new( 917 Vec::new(), 918 vec![Err(SqlError::InvalidQuery( 919 "change query failed".to_owned(), 920 ))], 921 )); 922 assert!( 923 change_query_store 924 .append_record(&local_work("local-a")) 925 .expect_err("change query failure") 926 .to_string() 927 .contains("change query failed") 928 ); 929 } 930 931 #[test] 932 fn store_reports_record_row_conversion_failures() { 933 for (field, value, expected) in [ 934 ("family", json!("bad_family"), "family"), 935 ("status", json!("bad_status"), "status"), 936 ("source_runtime", json!("bad_runtime"), "runtime"), 937 ("event_tags_json", json!("{"), "EOF"), 938 ("raw_event_json", json!("{"), "EOF"), 939 ("outbox_status", json!("bad_outbox"), "outbox"), 940 ("relay_delivery_json", json!("{"), "EOF"), 941 ] { 942 let store = LocalEventsStore::new(ScriptedExecutor::new( 943 Vec::new(), 944 vec![Ok(record_row_with(field, value))], 945 )); 946 let error = store.get_record("record-a").expect_err("conversion error"); 947 948 assert!( 949 error.to_string().contains(expected), 950 "expected error to contain {expected}, got {error}" 951 ); 952 } 953 954 let store = LocalEventsStore::new(ScriptedExecutor::new( 955 vec![Ok(ExecOutcome { 956 changes: 1, 957 last_insert_id: 0, 958 })], 959 vec![ 960 Ok(r#"[{"change_seq":1}]"#.to_owned()), 961 Ok(record_row_with("status", json!("published"))), 962 ], 963 )); 964 let updated = store 965 .update_outbox(&LocalEventRecordUpdate { 966 record_id: "record-a".to_owned(), 967 status: LocalRecordStatus::Published, 968 outbox_status: PublishOutboxStatus::Acknowledged, 969 relay_set_fingerprint: None, 970 relay_delivery_json: None, 971 updated_at_ms: 4000, 972 }) 973 .expect("scripted update"); 974 assert_eq!(updated.status, LocalRecordStatus::Published); 975 } 976 }