lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

store.rs (20448B)


      1 use radroots_local_events::{
      2     LocalEventRecordInput, LocalEventRecordUpdate, LocalEventsStore, LocalRecordFamily,
      3     LocalRecordStatus, MIGRATIONS, PublishOutboxStatus, RelayDeliveryEvidence, SourceRuntime,
      4 };
      5 use radroots_sql_core::migrations::migrations_run_all_up;
      6 use radroots_sql_core::{SqlExecutor, SqliteExecutor};
      7 use serde_json::json;
      8 
      9 fn store() -> LocalEventsStore<SqliteExecutor> {
     10     let executor = SqliteExecutor::open_memory().expect("open memory sqlite");
     11     let store = LocalEventsStore::new(executor);
     12     store.migrate_up().expect("migrate local events");
     13     store
     14 }
     15 
     16 fn local_work(record_id: &str) -> LocalEventRecordInput {
     17     LocalEventRecordInput {
     18         record_id: record_id.to_owned(),
     19         family: LocalRecordFamily::LocalWork,
     20         status: LocalRecordStatus::LocalSaved,
     21         source_runtime: SourceRuntime::Cli,
     22         created_at_ms: 1000,
     23         inserted_at_ms: 1001,
     24         owner_account_id: Some("seller-account".to_owned()),
     25         owner_pubkey: Some("seller-pubkey".to_owned()),
     26         farm_id: Some("farm-a".to_owned()),
     27         listing_addr: Some("listing-a".to_owned()),
     28         local_work_json: Some(json!({"kind":"listing","title":"Eggs"})),
     29         event_id: None,
     30         event_kind: None,
     31         event_pubkey: None,
     32         event_created_at: None,
     33         event_tags_json: None,
     34         event_content: None,
     35         event_sig: None,
     36         raw_event_json: None,
     37         outbox_status: PublishOutboxStatus::None,
     38         relay_set_fingerprint: None,
     39         relay_delivery_json: None,
     40     }
     41 }
     42 
     43 fn signed_event(record_id: &str) -> LocalEventRecordInput {
     44     LocalEventRecordInput {
     45         record_id: record_id.to_owned(),
     46         family: LocalRecordFamily::SignedEvent,
     47         status: LocalRecordStatus::PendingPublish,
     48         source_runtime: SourceRuntime::Cli,
     49         created_at_ms: 2000,
     50         inserted_at_ms: 2001,
     51         owner_account_id: Some("seller-account".to_owned()),
     52         owner_pubkey: Some("seller-pubkey".to_owned()),
     53         farm_id: Some("farm-a".to_owned()),
     54         listing_addr: Some("listing-a".to_owned()),
     55         local_work_json: None,
     56         event_id: Some("event-a".to_owned()),
     57         event_kind: Some(3421),
     58         event_pubkey: Some("seller-pubkey".to_owned()),
     59         event_created_at: Some(2000),
     60         event_tags_json: Some(json!([["d", "listing-a"]])),
     61         event_content: Some("{\"title\":\"Eggs\"}".to_owned()),
     62         event_sig: Some("sig-a".to_owned()),
     63         raw_event_json: Some(json!({"id":"event-a","kind":3421})),
     64         outbox_status: PublishOutboxStatus::Pending,
     65         relay_set_fingerprint: Some("relay-set-a".to_owned()),
     66         relay_delivery_json: Some(
     67             RelayDeliveryEvidence::pending(["ws://127.0.0.1:8080"])
     68                 .expect("pending delivery")
     69                 .to_json_value()
     70                 .expect("pending delivery json"),
     71         ),
     72     }
     73 }
     74 
     75 #[test]
     76 fn append_rejects_malformed_local_work_records() {
     77     let store = store();
     78     let mut input = local_work("local-a");
     79     input.local_work_json = None;
     80 
     81     let err = store.append_record(&input).expect_err("invalid record");
     82 
     83     assert!(err.to_string().contains("local_work_json"));
     84 }
     85 
     86 #[test]
     87 fn append_is_idempotent_by_record_id() {
     88     let store = store();
     89     let input = local_work("local-a");
     90 
     91     let first = store.append_record(&input).expect("append first");
     92     let second = store.append_record(&input).expect("append second");
     93     let rows = store.list_records_after_seq(0, 10).expect("list records");
     94 
     95     assert_eq!(first.seq, second.seq);
     96     assert_eq!(first.change_seq, second.change_seq);
     97     assert_eq!(rows.len(), 1);
     98     assert_eq!(rows[0].record_id, "local-a");
     99     assert_eq!(
    100         rows[0].local_work_json,
    101         Some(json!({"kind":"listing","title":"Eggs"}))
    102     );
    103 }
    104 
    105 #[test]
    106 fn source_runtime_network_round_trips() {
    107     let store = store();
    108     let mut input = signed_event("event-network-a");
    109     input.source_runtime = SourceRuntime::Network;
    110 
    111     let inserted = store.append_record(&input).expect("append network event");
    112     let rows = store
    113         .list_records_after_seq(0, 10)
    114         .expect("list network event");
    115 
    116     assert_eq!(SourceRuntime::Network.as_str(), "network");
    117     assert_eq!(
    118         SourceRuntime::parse("network").expect("parse network runtime"),
    119         SourceRuntime::Network
    120     );
    121     assert_eq!(inserted.source_runtime, SourceRuntime::Network);
    122     assert_eq!(rows.len(), 1);
    123     assert_eq!(rows[0].source_runtime, SourceRuntime::Network);
    124 }
    125 
    126 #[test]
    127 fn projection_cursor_advances_without_rewinding() {
    128     let store = store();
    129 
    130     let first = store
    131         .advance_cursor("app", 10, 100)
    132         .expect("advance cursor");
    133     let second = store.advance_cursor("app", 5, 200).expect("ignore rewind");
    134     let third = store.advance_cursor("app", 12, 300).expect("advance again");
    135 
    136     assert_eq!(first.last_change_seq, 10);
    137     assert_eq!(second.last_change_seq, 10);
    138     assert_eq!(third.last_change_seq, 12);
    139 }
    140 
    141 #[test]
    142 fn outbox_status_updates_signed_event_records() {
    143     let store = store();
    144     let input = signed_event("event-a");
    145     store.append_record(&input).expect("append signed event");
    146 
    147     let updated = store
    148         .update_outbox(&LocalEventRecordUpdate {
    149             record_id: "event-a".to_owned(),
    150             status: LocalRecordStatus::Published,
    151             outbox_status: PublishOutboxStatus::Acknowledged,
    152             relay_set_fingerprint: Some("relay-set-a".to_owned()),
    153             relay_delivery_json: Some(
    154                 RelayDeliveryEvidence::acknowledged(
    155                     ["ws://127.0.0.1:8080"],
    156                     ["ws://127.0.0.1:8080"],
    157                     ["ws://127.0.0.1:8080"],
    158                     Vec::new(),
    159                 )
    160                 .expect("acknowledged delivery")
    161                 .to_json_value()
    162                 .expect("acknowledged delivery json"),
    163             ),
    164             updated_at_ms: 3000,
    165         })
    166         .expect("update outbox");
    167 
    168     assert_eq!(updated.status, LocalRecordStatus::Published);
    169     assert_eq!(updated.outbox_status, PublishOutboxStatus::Acknowledged);
    170     assert_eq!(
    171         updated.relay_delivery_json,
    172         Some(json!({
    173             "state": "acknowledged",
    174             "target_relays": ["ws://127.0.0.1:8080"],
    175             "connected_relays": ["ws://127.0.0.1:8080"],
    176             "acknowledged_relays": ["ws://127.0.0.1:8080"],
    177             "failed_relays": []
    178         }))
    179     );
    180 }
    181 
    182 #[test]
    183 fn changed_after_uses_change_seq_for_appends_and_outbox_updates() {
    184     let store = store();
    185     let input = signed_event("event-a");
    186     let appended = store.append_record(&input).expect("append signed event");
    187     let initial_rows = store
    188         .list_records_changed_after(0, 10)
    189         .expect("list initial changes");
    190 
    191     assert_eq!(initial_rows.len(), 1);
    192     assert_eq!(initial_rows[0].record_id, "event-a");
    193     assert_eq!(initial_rows[0].seq, appended.seq);
    194     assert_eq!(initial_rows[0].change_seq, appended.change_seq);
    195 
    196     let updated = store
    197         .update_outbox(&LocalEventRecordUpdate {
    198             record_id: "event-a".to_owned(),
    199             status: LocalRecordStatus::Published,
    200             outbox_status: PublishOutboxStatus::Acknowledged,
    201             relay_set_fingerprint: Some("relay-set-a".to_owned()),
    202             relay_delivery_json: Some(
    203                 RelayDeliveryEvidence::acknowledged(
    204                     ["ws://127.0.0.1:8080"],
    205                     ["ws://127.0.0.1:8080"],
    206                     ["ws://127.0.0.1:8080"],
    207                     Vec::new(),
    208                 )
    209                 .expect("acknowledged delivery")
    210                 .to_json_value()
    211                 .expect("acknowledged delivery json"),
    212             ),
    213             updated_at_ms: 3000,
    214         })
    215         .expect("update outbox");
    216     let changed_rows = store
    217         .list_records_changed_after(appended.change_seq, 10)
    218         .expect("list changed rows");
    219 
    220     assert_eq!(updated.seq, appended.seq);
    221     assert!(updated.change_seq > appended.change_seq);
    222     assert_eq!(changed_rows.len(), 1);
    223     assert_eq!(changed_rows[0].record_id, "event-a");
    224     assert_eq!(changed_rows[0].change_seq, updated.change_seq);
    225 }
    226 
    227 #[test]
    228 fn changed_latest_lists_newest_records_first() {
    229     let store = store();
    230     let first = store
    231         .append_record(&local_work("local-a"))
    232         .expect("append first");
    233     let second = store
    234         .append_record(&local_work("local-b"))
    235         .expect("append second");
    236     let third = store
    237         .append_record(&local_work("local-c"))
    238         .expect("append third");
    239 
    240     let rows = store
    241         .list_records_changed_latest(2)
    242         .expect("list latest changed rows");
    243 
    244     assert_eq!(rows.len(), 2);
    245     assert_eq!(rows[0].record_id, "local-c");
    246     assert_eq!(rows[0].change_seq, third.change_seq);
    247     assert_eq!(rows[1].record_id, "local-b");
    248     assert_eq!(rows[1].change_seq, second.change_seq);
    249     assert!(rows[1].change_seq > first.change_seq);
    250 }
    251 
    252 #[test]
    253 fn changed_before_pages_newest_first_by_cursor() {
    254     let store = store();
    255     let _first = store
    256         .append_record(&local_work("local-a"))
    257         .expect("append first");
    258     let second = store
    259         .append_record(&local_work("local-b"))
    260         .expect("append second");
    261     let third = store
    262         .append_record(&local_work("local-c"))
    263         .expect("append third");
    264     let fourth = store
    265         .append_record(&local_work("local-d"))
    266         .expect("append fourth");
    267 
    268     let first_page = store
    269         .list_records_changed_latest(2)
    270         .expect("list first page");
    271     let cursor = first_page.last().expect("last first page");
    272     let second_page = store
    273         .list_records_changed_before(cursor.change_seq, cursor.seq, 2)
    274         .expect("list second page");
    275 
    276     assert_eq!(first_page.len(), 2);
    277     assert_eq!(first_page[0].record_id, "local-d");
    278     assert_eq!(first_page[0].change_seq, fourth.change_seq);
    279     assert_eq!(first_page[1].record_id, "local-c");
    280     assert_eq!(first_page[1].change_seq, third.change_seq);
    281     assert_eq!(second_page.len(), 2);
    282     assert_eq!(second_page[0].record_id, "local-b");
    283     assert_eq!(second_page[0].change_seq, second.change_seq);
    284     assert_eq!(second_page[1].record_id, "local-a");
    285 }
    286 
    287 #[test]
    288 fn changed_latest_is_not_blocked_by_older_record_volume() {
    289     let store = store();
    290     for index in 0..505 {
    291         store
    292             .append_record(&local_work(&format!("older-{index:03}")))
    293             .expect("append older record");
    294     }
    295     let current = store
    296         .append_record(&local_work("current-record"))
    297         .expect("append current record");
    298 
    299     let rows = store
    300         .list_records_changed_latest(1)
    301         .expect("list latest record");
    302 
    303     assert_eq!(rows.len(), 1);
    304     assert_eq!(rows[0].record_id, "current-record");
    305     assert_eq!(rows[0].change_seq, current.change_seq);
    306 }
    307 
    308 #[test]
    309 fn migration_assigns_existing_records_change_seq_from_insert_order() {
    310     let executor = SqliteExecutor::open_memory().expect("open memory sqlite");
    311     migrations_run_all_up(&executor, &MIGRATIONS[..1]).expect("apply initial migration");
    312     let first = insert_pre_change_tracking_record(&executor, "local-a");
    313     let second = insert_pre_change_tracking_record(&executor, "local-b");
    314     let store = LocalEventsStore::new(executor);
    315 
    316     store.migrate_up().expect("apply change tracking migration");
    317     let rows = store
    318         .list_records_changed_after(0, 10)
    319         .expect("list changed rows after migration");
    320 
    321     assert_eq!(rows.len(), 2);
    322     assert_eq!(rows[0].seq, first);
    323     assert_eq!(rows[0].change_seq, first);
    324     assert_eq!(rows[1].seq, second);
    325     assert_eq!(rows[1].change_seq, second);
    326 }
    327 
    328 #[test]
    329 fn migration_repairs_pre_network_source_runtime_constraint() {
    330     let executor = SqliteExecutor::open_memory().expect("open memory sqlite");
    331     create_pre_network_change_tracking_schema(&executor);
    332     let legacy_seq = insert_pre_network_change_tracking_record(&executor, "legacy-cli", 1);
    333     let store = LocalEventsStore::new(executor);
    334 
    335     store
    336         .migrate_up()
    337         .expect("apply network source repair migration");
    338     let mut input = signed_event("event-network-repaired");
    339     input.source_runtime = SourceRuntime::Network;
    340     input.event_id = Some("event-network-repaired".to_owned());
    341     input.raw_event_json = Some(json!({"id":"event-network-repaired","kind":3421}));
    342     let inserted = store
    343         .append_record(&input)
    344         .expect("append repaired network event");
    345     let rows = store
    346         .list_records_changed_after(0, 10)
    347         .expect("list changed rows after repair");
    348 
    349     assert_eq!(legacy_seq, 1);
    350     assert_eq!(rows.len(), 2);
    351     assert_eq!(rows[0].record_id, "legacy-cli");
    352     assert_eq!(rows[0].change_seq, 1);
    353     assert_eq!(rows[0].source_runtime, SourceRuntime::Cli);
    354     assert_eq!(rows[1].record_id, "event-network-repaired");
    355     assert_eq!(rows[1].seq, inserted.seq);
    356     assert_eq!(rows[1].source_runtime, SourceRuntime::Network);
    357 }
    358 
    359 fn insert_pre_change_tracking_record(executor: &SqliteExecutor, record_id: &str) -> i64 {
    360     let input = local_work(record_id);
    361     let params = json!([
    362         input.record_id,
    363         input.family.as_str(),
    364         input.status.as_str(),
    365         input.source_runtime.as_str(),
    366         input.created_at_ms,
    367         input.inserted_at_ms,
    368         input.inserted_at_ms,
    369         input.owner_account_id,
    370         input.owner_pubkey,
    371         input.farm_id,
    372         input.listing_addr,
    373         serde_json::to_string(&input.local_work_json).expect("encode local work"),
    374         input.event_id,
    375         input.event_kind,
    376         input.event_pubkey,
    377         input.event_created_at,
    378         input
    379             .event_tags_json
    380             .map(|value| serde_json::to_string(&value).expect("encode tags")),
    381         input.event_content,
    382         input.event_sig,
    383         input
    384             .raw_event_json
    385             .map(|value| serde_json::to_string(&value).expect("encode raw event")),
    386         input.outbox_status.as_str(),
    387         input.relay_set_fingerprint,
    388         input
    389             .relay_delivery_json
    390             .map(|value| serde_json::to_string(&value).expect("encode relay delivery")),
    391     ])
    392     .to_string();
    393     let outcome = executor
    394         .exec(
    395             "insert into local_event_record(
    396                 record_id,
    397                 family,
    398                 status,
    399                 source_runtime,
    400                 created_at_ms,
    401                 inserted_at_ms,
    402                 updated_at_ms,
    403                 owner_account_id,
    404                 owner_pubkey,
    405                 farm_id,
    406                 listing_addr,
    407                 local_work_json,
    408                 event_id,
    409                 event_kind,
    410                 event_pubkey,
    411                 event_created_at,
    412                 event_tags_json,
    413                 event_content,
    414                 event_sig,
    415                 raw_event_json,
    416                 outbox_status,
    417                 relay_set_fingerprint,
    418                 relay_delivery_json
    419             ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
    420             &params,
    421         )
    422         .expect("insert old local event record");
    423     outcome.last_insert_id
    424 }
    425 
    426 fn create_pre_network_change_tracking_schema(executor: &SqliteExecutor) {
    427     let schema = [
    428         "create table __migrations(id integer primary key, name text not null unique, applied_at text not null default (datetime('now')))",
    429         "create table local_event_record (
    430             seq integer primary key autoincrement,
    431             change_seq integer not null unique,
    432             record_id text not null unique,
    433             family text not null check (family in ('local_work', 'signed_event')),
    434             status text not null check (status in ('local_draft', 'local_saved', 'pending_publish', 'published', 'failed', 'conflict')),
    435             source_runtime text not null check (source_runtime in ('cli', 'app', 'service', 'worker', 'test')),
    436             created_at_ms integer not null,
    437             inserted_at_ms integer not null,
    438             updated_at_ms integer not null,
    439             owner_account_id text,
    440             owner_pubkey text,
    441             farm_id text,
    442             listing_addr text,
    443             local_work_json text,
    444             event_id text,
    445             event_kind integer,
    446             event_pubkey text,
    447             event_created_at integer,
    448             event_tags_json text,
    449             event_content text,
    450             event_sig text,
    451             raw_event_json text,
    452             outbox_status text not null check (outbox_status in ('none', 'pending', 'acknowledged', 'failed')),
    453             relay_set_fingerprint text,
    454             relay_delivery_json text,
    455             check (change_seq >= 1),
    456             check (trim(record_id) <> ''),
    457             check (family <> 'local_work' or local_work_json is not null),
    458             check (family <> 'local_work' or outbox_status = 'none'),
    459             check (family <> 'signed_event' or (event_id is not null and event_kind is not null and event_pubkey is not null and event_sig is not null and raw_event_json is not null))
    460         )",
    461         "create index local_event_record_change_seq_idx on local_event_record(change_seq)",
    462         "create index local_event_record_event_id_idx on local_event_record(event_id)",
    463         "create index local_event_record_listing_addr_idx on local_event_record(listing_addr)",
    464         "create index local_event_record_owner_pubkey_idx on local_event_record(owner_pubkey)",
    465         "create index local_event_record_status_idx on local_event_record(status)",
    466         "create table local_event_projection_cursor (
    467             consumer_id text primary key,
    468             last_change_seq integer not null,
    469             updated_at_ms integer not null,
    470             check (trim(consumer_id) <> ''),
    471             check (last_change_seq >= 0)
    472         )",
    473     ];
    474     for sql in schema {
    475         executor.exec(sql, "[]").expect("schema statement");
    476     }
    477     for name in ["0000_local_events", "0001_change_tracking"] {
    478         let params = json!([name]).to_string();
    479         executor
    480             .exec("insert into __migrations(name) values(?)", &params)
    481             .expect("migration marker");
    482     }
    483 }
    484 
    485 fn insert_pre_network_change_tracking_record(
    486     executor: &SqliteExecutor,
    487     record_id: &str,
    488     change_seq: i64,
    489 ) -> i64 {
    490     let input = local_work(record_id);
    491     let params = json!([
    492         change_seq,
    493         input.record_id,
    494         input.family.as_str(),
    495         input.status.as_str(),
    496         input.source_runtime.as_str(),
    497         input.created_at_ms,
    498         input.inserted_at_ms,
    499         input.inserted_at_ms,
    500         input.owner_account_id,
    501         input.owner_pubkey,
    502         input.farm_id,
    503         input.listing_addr,
    504         serde_json::to_string(&input.local_work_json).expect("encode local work"),
    505         input.event_id,
    506         input.event_kind,
    507         input.event_pubkey,
    508         input.event_created_at,
    509         input
    510             .event_tags_json
    511             .map(|value| serde_json::to_string(&value).expect("encode tags")),
    512         input.event_content,
    513         input.event_sig,
    514         input
    515             .raw_event_json
    516             .map(|value| serde_json::to_string(&value).expect("encode raw event")),
    517         input.outbox_status.as_str(),
    518         input.relay_set_fingerprint,
    519         input
    520             .relay_delivery_json
    521             .map(|value| serde_json::to_string(&value).expect("encode relay delivery")),
    522     ])
    523     .to_string();
    524     let outcome = executor
    525         .exec(
    526             "insert into local_event_record(
    527                 change_seq,
    528                 record_id,
    529                 family,
    530                 status,
    531                 source_runtime,
    532                 created_at_ms,
    533                 inserted_at_ms,
    534                 updated_at_ms,
    535                 owner_account_id,
    536                 owner_pubkey,
    537                 farm_id,
    538                 listing_addr,
    539                 local_work_json,
    540                 event_id,
    541                 event_kind,
    542                 event_pubkey,
    543                 event_created_at,
    544                 event_tags_json,
    545                 event_content,
    546                 event_sig,
    547                 raw_event_json,
    548                 outbox_status,
    549                 relay_set_fingerprint,
    550                 relay_delivery_json
    551             ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
    552             &params,
    553         )
    554         .expect("insert pre-network local event record");
    555     outcome.last_insert_id
    556 }