store.rs (20448B)
1 use radroots_local_events::{ 2 LocalEventRecordInput, LocalEventRecordUpdate, LocalEventsStore, LocalRecordFamily, 3 LocalRecordStatus, MIGRATIONS, PublishOutboxStatus, RelayDeliveryEvidence, SourceRuntime, 4 }; 5 use radroots_sql_core::migrations::migrations_run_all_up; 6 use radroots_sql_core::{SqlExecutor, SqliteExecutor}; 7 use serde_json::json; 8 9 fn store() -> LocalEventsStore<SqliteExecutor> { 10 let executor = SqliteExecutor::open_memory().expect("open memory sqlite"); 11 let store = LocalEventsStore::new(executor); 12 store.migrate_up().expect("migrate local events"); 13 store 14 } 15 16 fn local_work(record_id: &str) -> LocalEventRecordInput { 17 LocalEventRecordInput { 18 record_id: record_id.to_owned(), 19 family: LocalRecordFamily::LocalWork, 20 status: LocalRecordStatus::LocalSaved, 21 source_runtime: SourceRuntime::Cli, 22 created_at_ms: 1000, 23 inserted_at_ms: 1001, 24 owner_account_id: Some("seller-account".to_owned()), 25 owner_pubkey: Some("seller-pubkey".to_owned()), 26 farm_id: Some("farm-a".to_owned()), 27 listing_addr: Some("listing-a".to_owned()), 28 local_work_json: Some(json!({"kind":"listing","title":"Eggs"})), 29 event_id: None, 30 event_kind: None, 31 event_pubkey: None, 32 event_created_at: None, 33 event_tags_json: None, 34 event_content: None, 35 event_sig: None, 36 raw_event_json: None, 37 outbox_status: PublishOutboxStatus::None, 38 relay_set_fingerprint: None, 39 relay_delivery_json: None, 40 } 41 } 42 43 fn signed_event(record_id: &str) -> LocalEventRecordInput { 44 LocalEventRecordInput { 45 record_id: record_id.to_owned(), 46 family: LocalRecordFamily::SignedEvent, 47 status: LocalRecordStatus::PendingPublish, 48 source_runtime: SourceRuntime::Cli, 49 created_at_ms: 2000, 50 inserted_at_ms: 2001, 51 owner_account_id: Some("seller-account".to_owned()), 52 owner_pubkey: Some("seller-pubkey".to_owned()), 53 farm_id: Some("farm-a".to_owned()), 54 listing_addr: Some("listing-a".to_owned()), 55 local_work_json: None, 56 event_id: Some("event-a".to_owned()), 57 event_kind: Some(3421), 58 event_pubkey: Some("seller-pubkey".to_owned()), 59 event_created_at: Some(2000), 60 event_tags_json: Some(json!([["d", "listing-a"]])), 61 event_content: Some("{\"title\":\"Eggs\"}".to_owned()), 62 event_sig: Some("sig-a".to_owned()), 63 raw_event_json: Some(json!({"id":"event-a","kind":3421})), 64 outbox_status: PublishOutboxStatus::Pending, 65 relay_set_fingerprint: Some("relay-set-a".to_owned()), 66 relay_delivery_json: Some( 67 RelayDeliveryEvidence::pending(["ws://127.0.0.1:8080"]) 68 .expect("pending delivery") 69 .to_json_value() 70 .expect("pending delivery json"), 71 ), 72 } 73 } 74 75 #[test] 76 fn append_rejects_malformed_local_work_records() { 77 let store = store(); 78 let mut input = local_work("local-a"); 79 input.local_work_json = None; 80 81 let err = store.append_record(&input).expect_err("invalid record"); 82 83 assert!(err.to_string().contains("local_work_json")); 84 } 85 86 #[test] 87 fn append_is_idempotent_by_record_id() { 88 let store = store(); 89 let input = local_work("local-a"); 90 91 let first = store.append_record(&input).expect("append first"); 92 let second = store.append_record(&input).expect("append second"); 93 let rows = store.list_records_after_seq(0, 10).expect("list records"); 94 95 assert_eq!(first.seq, second.seq); 96 assert_eq!(first.change_seq, second.change_seq); 97 assert_eq!(rows.len(), 1); 98 assert_eq!(rows[0].record_id, "local-a"); 99 assert_eq!( 100 rows[0].local_work_json, 101 Some(json!({"kind":"listing","title":"Eggs"})) 102 ); 103 } 104 105 #[test] 106 fn source_runtime_network_round_trips() { 107 let store = store(); 108 let mut input = signed_event("event-network-a"); 109 input.source_runtime = SourceRuntime::Network; 110 111 let inserted = store.append_record(&input).expect("append network event"); 112 let rows = store 113 .list_records_after_seq(0, 10) 114 .expect("list network event"); 115 116 assert_eq!(SourceRuntime::Network.as_str(), "network"); 117 assert_eq!( 118 SourceRuntime::parse("network").expect("parse network runtime"), 119 SourceRuntime::Network 120 ); 121 assert_eq!(inserted.source_runtime, SourceRuntime::Network); 122 assert_eq!(rows.len(), 1); 123 assert_eq!(rows[0].source_runtime, SourceRuntime::Network); 124 } 125 126 #[test] 127 fn projection_cursor_advances_without_rewinding() { 128 let store = store(); 129 130 let first = store 131 .advance_cursor("app", 10, 100) 132 .expect("advance cursor"); 133 let second = store.advance_cursor("app", 5, 200).expect("ignore rewind"); 134 let third = store.advance_cursor("app", 12, 300).expect("advance again"); 135 136 assert_eq!(first.last_change_seq, 10); 137 assert_eq!(second.last_change_seq, 10); 138 assert_eq!(third.last_change_seq, 12); 139 } 140 141 #[test] 142 fn outbox_status_updates_signed_event_records() { 143 let store = store(); 144 let input = signed_event("event-a"); 145 store.append_record(&input).expect("append signed event"); 146 147 let updated = store 148 .update_outbox(&LocalEventRecordUpdate { 149 record_id: "event-a".to_owned(), 150 status: LocalRecordStatus::Published, 151 outbox_status: PublishOutboxStatus::Acknowledged, 152 relay_set_fingerprint: Some("relay-set-a".to_owned()), 153 relay_delivery_json: Some( 154 RelayDeliveryEvidence::acknowledged( 155 ["ws://127.0.0.1:8080"], 156 ["ws://127.0.0.1:8080"], 157 ["ws://127.0.0.1:8080"], 158 Vec::new(), 159 ) 160 .expect("acknowledged delivery") 161 .to_json_value() 162 .expect("acknowledged delivery json"), 163 ), 164 updated_at_ms: 3000, 165 }) 166 .expect("update outbox"); 167 168 assert_eq!(updated.status, LocalRecordStatus::Published); 169 assert_eq!(updated.outbox_status, PublishOutboxStatus::Acknowledged); 170 assert_eq!( 171 updated.relay_delivery_json, 172 Some(json!({ 173 "state": "acknowledged", 174 "target_relays": ["ws://127.0.0.1:8080"], 175 "connected_relays": ["ws://127.0.0.1:8080"], 176 "acknowledged_relays": ["ws://127.0.0.1:8080"], 177 "failed_relays": [] 178 })) 179 ); 180 } 181 182 #[test] 183 fn changed_after_uses_change_seq_for_appends_and_outbox_updates() { 184 let store = store(); 185 let input = signed_event("event-a"); 186 let appended = store.append_record(&input).expect("append signed event"); 187 let initial_rows = store 188 .list_records_changed_after(0, 10) 189 .expect("list initial changes"); 190 191 assert_eq!(initial_rows.len(), 1); 192 assert_eq!(initial_rows[0].record_id, "event-a"); 193 assert_eq!(initial_rows[0].seq, appended.seq); 194 assert_eq!(initial_rows[0].change_seq, appended.change_seq); 195 196 let updated = store 197 .update_outbox(&LocalEventRecordUpdate { 198 record_id: "event-a".to_owned(), 199 status: LocalRecordStatus::Published, 200 outbox_status: PublishOutboxStatus::Acknowledged, 201 relay_set_fingerprint: Some("relay-set-a".to_owned()), 202 relay_delivery_json: Some( 203 RelayDeliveryEvidence::acknowledged( 204 ["ws://127.0.0.1:8080"], 205 ["ws://127.0.0.1:8080"], 206 ["ws://127.0.0.1:8080"], 207 Vec::new(), 208 ) 209 .expect("acknowledged delivery") 210 .to_json_value() 211 .expect("acknowledged delivery json"), 212 ), 213 updated_at_ms: 3000, 214 }) 215 .expect("update outbox"); 216 let changed_rows = store 217 .list_records_changed_after(appended.change_seq, 10) 218 .expect("list changed rows"); 219 220 assert_eq!(updated.seq, appended.seq); 221 assert!(updated.change_seq > appended.change_seq); 222 assert_eq!(changed_rows.len(), 1); 223 assert_eq!(changed_rows[0].record_id, "event-a"); 224 assert_eq!(changed_rows[0].change_seq, updated.change_seq); 225 } 226 227 #[test] 228 fn changed_latest_lists_newest_records_first() { 229 let store = store(); 230 let first = store 231 .append_record(&local_work("local-a")) 232 .expect("append first"); 233 let second = store 234 .append_record(&local_work("local-b")) 235 .expect("append second"); 236 let third = store 237 .append_record(&local_work("local-c")) 238 .expect("append third"); 239 240 let rows = store 241 .list_records_changed_latest(2) 242 .expect("list latest changed rows"); 243 244 assert_eq!(rows.len(), 2); 245 assert_eq!(rows[0].record_id, "local-c"); 246 assert_eq!(rows[0].change_seq, third.change_seq); 247 assert_eq!(rows[1].record_id, "local-b"); 248 assert_eq!(rows[1].change_seq, second.change_seq); 249 assert!(rows[1].change_seq > first.change_seq); 250 } 251 252 #[test] 253 fn changed_before_pages_newest_first_by_cursor() { 254 let store = store(); 255 let _first = store 256 .append_record(&local_work("local-a")) 257 .expect("append first"); 258 let second = store 259 .append_record(&local_work("local-b")) 260 .expect("append second"); 261 let third = store 262 .append_record(&local_work("local-c")) 263 .expect("append third"); 264 let fourth = store 265 .append_record(&local_work("local-d")) 266 .expect("append fourth"); 267 268 let first_page = store 269 .list_records_changed_latest(2) 270 .expect("list first page"); 271 let cursor = first_page.last().expect("last first page"); 272 let second_page = store 273 .list_records_changed_before(cursor.change_seq, cursor.seq, 2) 274 .expect("list second page"); 275 276 assert_eq!(first_page.len(), 2); 277 assert_eq!(first_page[0].record_id, "local-d"); 278 assert_eq!(first_page[0].change_seq, fourth.change_seq); 279 assert_eq!(first_page[1].record_id, "local-c"); 280 assert_eq!(first_page[1].change_seq, third.change_seq); 281 assert_eq!(second_page.len(), 2); 282 assert_eq!(second_page[0].record_id, "local-b"); 283 assert_eq!(second_page[0].change_seq, second.change_seq); 284 assert_eq!(second_page[1].record_id, "local-a"); 285 } 286 287 #[test] 288 fn changed_latest_is_not_blocked_by_older_record_volume() { 289 let store = store(); 290 for index in 0..505 { 291 store 292 .append_record(&local_work(&format!("older-{index:03}"))) 293 .expect("append older record"); 294 } 295 let current = store 296 .append_record(&local_work("current-record")) 297 .expect("append current record"); 298 299 let rows = store 300 .list_records_changed_latest(1) 301 .expect("list latest record"); 302 303 assert_eq!(rows.len(), 1); 304 assert_eq!(rows[0].record_id, "current-record"); 305 assert_eq!(rows[0].change_seq, current.change_seq); 306 } 307 308 #[test] 309 fn migration_assigns_existing_records_change_seq_from_insert_order() { 310 let executor = SqliteExecutor::open_memory().expect("open memory sqlite"); 311 migrations_run_all_up(&executor, &MIGRATIONS[..1]).expect("apply initial migration"); 312 let first = insert_pre_change_tracking_record(&executor, "local-a"); 313 let second = insert_pre_change_tracking_record(&executor, "local-b"); 314 let store = LocalEventsStore::new(executor); 315 316 store.migrate_up().expect("apply change tracking migration"); 317 let rows = store 318 .list_records_changed_after(0, 10) 319 .expect("list changed rows after migration"); 320 321 assert_eq!(rows.len(), 2); 322 assert_eq!(rows[0].seq, first); 323 assert_eq!(rows[0].change_seq, first); 324 assert_eq!(rows[1].seq, second); 325 assert_eq!(rows[1].change_seq, second); 326 } 327 328 #[test] 329 fn migration_repairs_pre_network_source_runtime_constraint() { 330 let executor = SqliteExecutor::open_memory().expect("open memory sqlite"); 331 create_pre_network_change_tracking_schema(&executor); 332 let legacy_seq = insert_pre_network_change_tracking_record(&executor, "legacy-cli", 1); 333 let store = LocalEventsStore::new(executor); 334 335 store 336 .migrate_up() 337 .expect("apply network source repair migration"); 338 let mut input = signed_event("event-network-repaired"); 339 input.source_runtime = SourceRuntime::Network; 340 input.event_id = Some("event-network-repaired".to_owned()); 341 input.raw_event_json = Some(json!({"id":"event-network-repaired","kind":3421})); 342 let inserted = store 343 .append_record(&input) 344 .expect("append repaired network event"); 345 let rows = store 346 .list_records_changed_after(0, 10) 347 .expect("list changed rows after repair"); 348 349 assert_eq!(legacy_seq, 1); 350 assert_eq!(rows.len(), 2); 351 assert_eq!(rows[0].record_id, "legacy-cli"); 352 assert_eq!(rows[0].change_seq, 1); 353 assert_eq!(rows[0].source_runtime, SourceRuntime::Cli); 354 assert_eq!(rows[1].record_id, "event-network-repaired"); 355 assert_eq!(rows[1].seq, inserted.seq); 356 assert_eq!(rows[1].source_runtime, SourceRuntime::Network); 357 } 358 359 fn insert_pre_change_tracking_record(executor: &SqliteExecutor, record_id: &str) -> i64 { 360 let input = local_work(record_id); 361 let params = json!([ 362 input.record_id, 363 input.family.as_str(), 364 input.status.as_str(), 365 input.source_runtime.as_str(), 366 input.created_at_ms, 367 input.inserted_at_ms, 368 input.inserted_at_ms, 369 input.owner_account_id, 370 input.owner_pubkey, 371 input.farm_id, 372 input.listing_addr, 373 serde_json::to_string(&input.local_work_json).expect("encode local work"), 374 input.event_id, 375 input.event_kind, 376 input.event_pubkey, 377 input.event_created_at, 378 input 379 .event_tags_json 380 .map(|value| serde_json::to_string(&value).expect("encode tags")), 381 input.event_content, 382 input.event_sig, 383 input 384 .raw_event_json 385 .map(|value| serde_json::to_string(&value).expect("encode raw event")), 386 input.outbox_status.as_str(), 387 input.relay_set_fingerprint, 388 input 389 .relay_delivery_json 390 .map(|value| serde_json::to_string(&value).expect("encode relay delivery")), 391 ]) 392 .to_string(); 393 let outcome = executor 394 .exec( 395 "insert into local_event_record( 396 record_id, 397 family, 398 status, 399 source_runtime, 400 created_at_ms, 401 inserted_at_ms, 402 updated_at_ms, 403 owner_account_id, 404 owner_pubkey, 405 farm_id, 406 listing_addr, 407 local_work_json, 408 event_id, 409 event_kind, 410 event_pubkey, 411 event_created_at, 412 event_tags_json, 413 event_content, 414 event_sig, 415 raw_event_json, 416 outbox_status, 417 relay_set_fingerprint, 418 relay_delivery_json 419 ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", 420 ¶ms, 421 ) 422 .expect("insert old local event record"); 423 outcome.last_insert_id 424 } 425 426 fn create_pre_network_change_tracking_schema(executor: &SqliteExecutor) { 427 let schema = [ 428 "create table __migrations(id integer primary key, name text not null unique, applied_at text not null default (datetime('now')))", 429 "create table local_event_record ( 430 seq integer primary key autoincrement, 431 change_seq integer not null unique, 432 record_id text not null unique, 433 family text not null check (family in ('local_work', 'signed_event')), 434 status text not null check (status in ('local_draft', 'local_saved', 'pending_publish', 'published', 'failed', 'conflict')), 435 source_runtime text not null check (source_runtime in ('cli', 'app', 'service', 'worker', 'test')), 436 created_at_ms integer not null, 437 inserted_at_ms integer not null, 438 updated_at_ms integer not null, 439 owner_account_id text, 440 owner_pubkey text, 441 farm_id text, 442 listing_addr text, 443 local_work_json text, 444 event_id text, 445 event_kind integer, 446 event_pubkey text, 447 event_created_at integer, 448 event_tags_json text, 449 event_content text, 450 event_sig text, 451 raw_event_json text, 452 outbox_status text not null check (outbox_status in ('none', 'pending', 'acknowledged', 'failed')), 453 relay_set_fingerprint text, 454 relay_delivery_json text, 455 check (change_seq >= 1), 456 check (trim(record_id) <> ''), 457 check (family <> 'local_work' or local_work_json is not null), 458 check (family <> 'local_work' or outbox_status = 'none'), 459 check (family <> 'signed_event' or (event_id is not null and event_kind is not null and event_pubkey is not null and event_sig is not null and raw_event_json is not null)) 460 )", 461 "create index local_event_record_change_seq_idx on local_event_record(change_seq)", 462 "create index local_event_record_event_id_idx on local_event_record(event_id)", 463 "create index local_event_record_listing_addr_idx on local_event_record(listing_addr)", 464 "create index local_event_record_owner_pubkey_idx on local_event_record(owner_pubkey)", 465 "create index local_event_record_status_idx on local_event_record(status)", 466 "create table local_event_projection_cursor ( 467 consumer_id text primary key, 468 last_change_seq integer not null, 469 updated_at_ms integer not null, 470 check (trim(consumer_id) <> ''), 471 check (last_change_seq >= 0) 472 )", 473 ]; 474 for sql in schema { 475 executor.exec(sql, "[]").expect("schema statement"); 476 } 477 for name in ["0000_local_events", "0001_change_tracking"] { 478 let params = json!([name]).to_string(); 479 executor 480 .exec("insert into __migrations(name) values(?)", ¶ms) 481 .expect("migration marker"); 482 } 483 } 484 485 fn insert_pre_network_change_tracking_record( 486 executor: &SqliteExecutor, 487 record_id: &str, 488 change_seq: i64, 489 ) -> i64 { 490 let input = local_work(record_id); 491 let params = json!([ 492 change_seq, 493 input.record_id, 494 input.family.as_str(), 495 input.status.as_str(), 496 input.source_runtime.as_str(), 497 input.created_at_ms, 498 input.inserted_at_ms, 499 input.inserted_at_ms, 500 input.owner_account_id, 501 input.owner_pubkey, 502 input.farm_id, 503 input.listing_addr, 504 serde_json::to_string(&input.local_work_json).expect("encode local work"), 505 input.event_id, 506 input.event_kind, 507 input.event_pubkey, 508 input.event_created_at, 509 input 510 .event_tags_json 511 .map(|value| serde_json::to_string(&value).expect("encode tags")), 512 input.event_content, 513 input.event_sig, 514 input 515 .raw_event_json 516 .map(|value| serde_json::to_string(&value).expect("encode raw event")), 517 input.outbox_status.as_str(), 518 input.relay_set_fingerprint, 519 input 520 .relay_delivery_json 521 .map(|value| serde_json::to_string(&value).expect("encode relay delivery")), 522 ]) 523 .to_string(); 524 let outcome = executor 525 .exec( 526 "insert into local_event_record( 527 change_seq, 528 record_id, 529 family, 530 status, 531 source_runtime, 532 created_at_ms, 533 inserted_at_ms, 534 updated_at_ms, 535 owner_account_id, 536 owner_pubkey, 537 farm_id, 538 listing_addr, 539 local_work_json, 540 event_id, 541 event_kind, 542 event_pubkey, 543 event_created_at, 544 event_tags_json, 545 event_content, 546 event_sig, 547 raw_event_json, 548 outbox_status, 549 relay_set_fingerprint, 550 relay_delivery_json 551 ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", 552 ¶ms, 553 ) 554 .expect("insert pre-network local event record"); 555 outcome.last_insert_id 556 }