lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit 3a2fb61198689ff1584c436ae8a56c39468a1f4d
parent 5d65ed2a2ad73bf31737411a5439a65185d83c81
Author: triesap <tyson@radroots.org>
Date:   Sat, 23 May 2026 09:13:18 +0000

local_events: add change tracking

- add change_seq migration and changed-after queries for shared records
- move projection cursors from insert seq to explicit change seq
- advance change state for appends and outbox status updates
- cover idempotent append, migration, cursor, and update behavior

Diffstat:
Acrates/local_events/migrations/0001_change_tracking.down.sql | 114+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/local_events/migrations/0001_change_tracking.up.sql | 119+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/local_events/src/migrations.rs | 17++++++++++++-----
Mcrates/local_events/src/models.rs | 3++-
Mcrates/local_events/src/store.rs | 224+++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------
Mcrates/local_events/tests/store.rs | 136+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
6 files changed, 521 insertions(+), 92 deletions(-)

diff --git a/crates/local_events/migrations/0001_change_tracking.down.sql b/crates/local_events/migrations/0001_change_tracking.down.sql @@ -0,0 +1,114 @@ +create table local_event_projection_cursor_previous ( + consumer_id text primary key, + last_seq integer not null, + updated_at_ms integer not null, + check (trim(consumer_id) <> ''), + check (last_seq >= 0) +); + +insert into local_event_projection_cursor_previous( + consumer_id, + last_seq, + updated_at_ms +) +select + consumer_id, + last_change_seq, + updated_at_ms +from local_event_projection_cursor; + +drop table local_event_projection_cursor; +alter table local_event_projection_cursor_previous rename to local_event_projection_cursor; + +create table local_event_record_previous ( + seq integer primary key autoincrement, + record_id text not null unique, + family text not null check (family in ('local_work', 'signed_event')), + status text not null check (status in ('local_draft', 'local_saved', 'pending_publish', 'published', 'failed', 'conflict')), + source_runtime text not null check (source_runtime in ('cli', 'app', 'service', 'worker', 'test')), + created_at_ms integer not null, + inserted_at_ms integer not null, + updated_at_ms integer not null, + owner_account_id text, + owner_pubkey text, + farm_id text, + listing_addr text, + local_work_json text, + event_id text, + event_kind integer, + event_pubkey text, + event_created_at integer, + event_tags_json text, + event_content text, + event_sig text, + raw_event_json text, + outbox_status text not null check (outbox_status in ('none', 'pending', 'acknowledged', 'failed')), + relay_set_fingerprint text, + relay_delivery_json text, + check (trim(record_id) <> ''), + check (family <> 'local_work' or local_work_json is not null), + check (family <> 'local_work' or outbox_status = 'none'), + check (family <> 'signed_event' or (event_id is not null and event_kind is not null and event_pubkey is not null and event_sig is not null and raw_event_json is not null)) +); + +insert into local_event_record_previous( + seq, + record_id, + family, + status, + source_runtime, + created_at_ms, + inserted_at_ms, + updated_at_ms, + owner_account_id, + owner_pubkey, + farm_id, + listing_addr, + local_work_json, + event_id, + event_kind, + event_pubkey, + event_created_at, + event_tags_json, + event_content, + event_sig, + raw_event_json, + outbox_status, + relay_set_fingerprint, + relay_delivery_json +) +select + seq, + record_id, + family, + status, + source_runtime, + created_at_ms, + inserted_at_ms, + updated_at_ms, + owner_account_id, + owner_pubkey, + farm_id, + listing_addr, + local_work_json, + event_id, + event_kind, + event_pubkey, + event_created_at, + event_tags_json, + event_content, + event_sig, + raw_event_json, + outbox_status, + relay_set_fingerprint, + relay_delivery_json +from local_event_record +order by seq asc; + +drop table local_event_record; +alter table local_event_record_previous rename to local_event_record; + +create index local_event_record_event_id_idx on local_event_record(event_id); +create index local_event_record_listing_addr_idx on local_event_record(listing_addr); +create index local_event_record_owner_pubkey_idx on local_event_record(owner_pubkey); +create index local_event_record_status_idx on local_event_record(status); diff --git a/crates/local_events/migrations/0001_change_tracking.up.sql b/crates/local_events/migrations/0001_change_tracking.up.sql @@ -0,0 +1,119 @@ +create table local_event_record_next ( + seq integer primary key autoincrement, + change_seq integer not null unique, + record_id text not null unique, + family text not null check (family in ('local_work', 'signed_event')), + status text not null check (status in ('local_draft', 'local_saved', 'pending_publish', 'published', 'failed', 'conflict')), + source_runtime text not null check (source_runtime in ('cli', 'app', 'service', 'worker', 'test')), + created_at_ms integer not null, + inserted_at_ms integer not null, + updated_at_ms integer not null, + owner_account_id text, + owner_pubkey text, + farm_id text, + listing_addr text, + local_work_json text, + event_id text, + event_kind integer, + event_pubkey text, + event_created_at integer, + event_tags_json text, + event_content text, + event_sig text, + raw_event_json text, + outbox_status text not null check (outbox_status in ('none', 'pending', 'acknowledged', 'failed')), + relay_set_fingerprint text, + relay_delivery_json text, + check (change_seq >= 1), + check (trim(record_id) <> ''), + check (family <> 'local_work' or local_work_json is not null), + check (family <> 'local_work' or outbox_status = 'none'), + check (family <> 'signed_event' or (event_id is not null and event_kind is not null and event_pubkey is not null and event_sig is not null and raw_event_json is not null)) +); + +insert into local_event_record_next( + seq, + change_seq, + record_id, + family, + status, + source_runtime, + created_at_ms, + inserted_at_ms, + updated_at_ms, + owner_account_id, + owner_pubkey, + farm_id, + listing_addr, + local_work_json, + event_id, + event_kind, + event_pubkey, + event_created_at, + event_tags_json, + event_content, + event_sig, + raw_event_json, + outbox_status, + relay_set_fingerprint, + relay_delivery_json +) +select + seq, + seq, + record_id, + family, + status, + source_runtime, + created_at_ms, + inserted_at_ms, + updated_at_ms, + owner_account_id, + owner_pubkey, + farm_id, + listing_addr, + local_work_json, + event_id, + event_kind, + event_pubkey, + event_created_at, + event_tags_json, + event_content, + event_sig, + raw_event_json, + outbox_status, + relay_set_fingerprint, + relay_delivery_json +from local_event_record +order by seq asc; + +drop table local_event_record; +alter table local_event_record_next rename to local_event_record; + +create index local_event_record_change_seq_idx on local_event_record(change_seq); +create index local_event_record_event_id_idx on local_event_record(event_id); +create index local_event_record_listing_addr_idx on local_event_record(listing_addr); +create index local_event_record_owner_pubkey_idx on local_event_record(owner_pubkey); +create index local_event_record_status_idx on local_event_record(status); + +create table local_event_projection_cursor_next ( + consumer_id text primary key, + last_change_seq integer not null, + updated_at_ms integer not null, + check (trim(consumer_id) <> ''), + check (last_change_seq >= 0) +); + +insert into local_event_projection_cursor_next( + consumer_id, + last_change_seq, + updated_at_ms +) +select + consumer_id, + last_seq, + updated_at_ms +from local_event_projection_cursor; + +drop table local_event_projection_cursor; +alter table local_event_projection_cursor_next rename to local_event_projection_cursor; diff --git a/crates/local_events/src/migrations.rs b/crates/local_events/src/migrations.rs @@ -4,11 +4,18 @@ use radroots_sql_core::SqlExecutor; use radroots_sql_core::error::SqlError; use radroots_sql_core::migrations::{Migration, migrations_run_all_down, migrations_run_all_up}; -pub static MIGRATIONS: &[Migration] = &[Migration { - name: "0000_local_events", - up_sql: include_str!("../migrations/0000_local_events.up.sql"), - down_sql: include_str!("../migrations/0000_local_events.down.sql"), -}]; +pub static MIGRATIONS: &[Migration] = &[ + Migration { + name: "0000_local_events", + up_sql: include_str!("../migrations/0000_local_events.up.sql"), + down_sql: include_str!("../migrations/0000_local_events.down.sql"), + }, + Migration { + name: "0001_change_tracking", + up_sql: include_str!("../migrations/0001_change_tracking.up.sql"), + down_sql: include_str!("../migrations/0001_change_tracking.down.sql"), + }, +]; pub fn run_all_up<E>(executor: &E) -> Result<(), SqlError> where diff --git a/crates/local_events/src/models.rs b/crates/local_events/src/models.rs @@ -213,6 +213,7 @@ impl LocalEventRecordInput { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct LocalEventRecord { pub seq: i64, + pub change_seq: i64, pub record_id: String, pub family: LocalRecordFamily, pub status: LocalRecordStatus, @@ -251,7 +252,7 @@ pub struct LocalEventRecordUpdate { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct LocalEventsCursor { pub consumer_id: String, - pub last_seq: i64, + pub last_change_seq: i64, pub updated_at_ms: i64, } diff --git a/crates/local_events/src/store.rs b/crates/local_events/src/store.rs @@ -38,58 +38,72 @@ impl<E: SqlExecutor> LocalEventsStore<E> { input: &LocalEventRecordInput, ) -> Result<LocalEventRecord, LocalEventsError> { input.validate()?; - let params = json!([ - input.record_id, - input.family.as_str(), - input.status.as_str(), - input.source_runtime.as_str(), - input.created_at_ms, - input.inserted_at_ms, - input.inserted_at_ms, - input.owner_account_id, - input.owner_pubkey, - input.farm_id, - input.listing_addr, - encode_json(input.local_work_json.as_ref())?, - input.event_id, - input.event_kind, - input.event_pubkey, - input.event_created_at, - encode_json(input.event_tags_json.as_ref())?, - input.event_content, - input.event_sig, - encode_json(input.raw_event_json.as_ref())?, - input.outbox_status.as_str(), - input.relay_set_fingerprint, - encode_json(input.relay_delivery_json.as_ref())? - ]) - .to_string(); - let sql = "insert or ignore into local_event_record( - record_id, - family, - status, - source_runtime, - created_at_ms, - inserted_at_ms, - updated_at_ms, - owner_account_id, - owner_pubkey, - farm_id, - listing_addr, - local_work_json, - event_id, - event_kind, - event_pubkey, - event_created_at, - event_tags_json, - event_content, - event_sig, - raw_event_json, - outbox_status, - relay_set_fingerprint, - relay_delivery_json - ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; - self.executor.exec(sql, &params)?; + self.executor.begin()?; + let result = (|| -> Result<(), LocalEventsError> { + let change_seq = self.next_change_seq()?; + let params = json!([ + change_seq, + input.record_id, + input.family.as_str(), + input.status.as_str(), + input.source_runtime.as_str(), + input.created_at_ms, + input.inserted_at_ms, + input.inserted_at_ms, + input.owner_account_id, + input.owner_pubkey, + input.farm_id, + input.listing_addr, + encode_json(input.local_work_json.as_ref())?, + input.event_id, + input.event_kind, + input.event_pubkey, + input.event_created_at, + encode_json(input.event_tags_json.as_ref())?, + input.event_content, + input.event_sig, + encode_json(input.raw_event_json.as_ref())?, + input.outbox_status.as_str(), + input.relay_set_fingerprint, + encode_json(input.relay_delivery_json.as_ref())? + ]) + .to_string(); + let sql = "insert or ignore into local_event_record( + change_seq, + record_id, + family, + status, + source_runtime, + created_at_ms, + inserted_at_ms, + updated_at_ms, + owner_account_id, + owner_pubkey, + farm_id, + listing_addr, + local_work_json, + event_id, + event_kind, + event_pubkey, + event_created_at, + event_tags_json, + event_content, + event_sig, + raw_event_json, + outbox_status, + relay_set_fingerprint, + relay_delivery_json + ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + let _ = self.executor.exec(sql, &params)?; + Ok(()) + })(); + match result { + Ok(()) => self.executor.commit()?, + Err(err) => { + let _ = self.executor.rollback(); + return Err(err); + } + } self.get_record(&input.record_id)? .ok_or_else(|| LocalEventsError::InvalidRecord("record append failed".to_owned())) } @@ -107,7 +121,7 @@ impl<E: SqlExecutor> LocalEventsStore<E> { Ok(rows.into_iter().next()) } - pub fn list_records_after( + pub fn list_records_after_seq( &self, after_seq: i64, limit: u32, @@ -119,31 +133,60 @@ impl<E: SqlExecutor> LocalEventsStore<E> { ) } + pub fn list_records_changed_after( + &self, + after_change_seq: i64, + limit: u32, + ) -> Result<Vec<LocalEventRecord>, LocalEventsError> { + let params = json!([after_change_seq, i64::from(limit)]).to_string(); + self.query_records( + "select * from local_event_record where change_seq > ? order by change_seq asc, seq asc limit ?", + &params, + ) + } + pub fn update_outbox( &self, update: &LocalEventRecordUpdate, ) -> Result<LocalEventRecord, LocalEventsError> { validate_non_empty("record_id", &update.record_id)?; - let params = json!([ - update.status.as_str(), - update.outbox_status.as_str(), - update.relay_set_fingerprint, - encode_json(update.relay_delivery_json.as_ref())?, - update.updated_at_ms, - update.record_id - ]) - .to_string(); - let outcome = self.executor.exec( - "update local_event_record - set status = ?, - outbox_status = ?, - relay_set_fingerprint = ?, - relay_delivery_json = ?, - updated_at_ms = ? - where record_id = ?", - &params, - )?; - if outcome.changes == 0 { + self.executor.begin()?; + let result = (|| -> Result<i64, LocalEventsError> { + let change_seq = self.next_change_seq()?; + let params = json!([ + change_seq, + update.status.as_str(), + update.outbox_status.as_str(), + update.relay_set_fingerprint, + encode_json(update.relay_delivery_json.as_ref())?, + update.updated_at_ms, + update.record_id + ]) + .to_string(); + let outcome = self.executor.exec( + "update local_event_record + set change_seq = ?, + status = ?, + outbox_status = ?, + relay_set_fingerprint = ?, + relay_delivery_json = ?, + updated_at_ms = ? + where record_id = ?", + &params, + )?; + Ok(outcome.changes) + })(); + let changes = match result { + Ok(changes) => { + self.executor.commit()?; + changes + } + Err(err) => { + let _ = self.executor.rollback(); + return Err(err); + } + }; + if changes == 0 { return Err(LocalEventsError::Sql(SqlError::NotFound( update.record_id.clone(), ))); @@ -159,7 +202,7 @@ impl<E: SqlExecutor> LocalEventsStore<E> { validate_non_empty("consumer_id", consumer_id)?; let params = json!([consumer_id]).to_string(); let raw = self.executor.query_raw( - "select consumer_id, last_seq, updated_at_ms from local_event_projection_cursor where consumer_id = ? limit 1", + "select consumer_id, last_change_seq, updated_at_ms from local_event_projection_cursor where consumer_id = ? limit 1", &params, )?; let rows: Vec<CursorRow> = serde_json::from_str(&raw)?; @@ -169,16 +212,16 @@ impl<E: SqlExecutor> LocalEventsStore<E> { pub fn advance_cursor( &self, consumer_id: &str, - last_seq: i64, + last_change_seq: i64, updated_at_ms: i64, ) -> Result<LocalEventsCursor, LocalEventsError> { validate_non_empty("consumer_id", consumer_id)?; - let params = json!([consumer_id, last_seq, updated_at_ms]).to_string(); + let params = json!([consumer_id, last_change_seq, updated_at_ms]).to_string(); self.executor.exec( - "insert into local_event_projection_cursor(consumer_id, last_seq, updated_at_ms) + "insert into local_event_projection_cursor(consumer_id, last_change_seq, updated_at_ms) values(?,?,?) on conflict(consumer_id) do update set - last_seq = max(local_event_projection_cursor.last_seq, excluded.last_seq), + last_change_seq = max(local_event_projection_cursor.last_change_seq, excluded.last_change_seq), updated_at_ms = excluded.updated_at_ms", &params, )?; @@ -195,11 +238,26 @@ impl<E: SqlExecutor> LocalEventsStore<E> { let rows: Vec<RecordRow> = serde_json::from_str(&raw)?; rows.into_iter().map(TryInto::try_into).collect() } + + fn next_change_seq(&self) -> Result<i64, LocalEventsError> { + let raw = self.executor.query_raw( + "select coalesce(max(change_seq), 0) + 1 as change_seq from local_event_record", + "[]", + )?; + let rows: Vec<ChangeSeqRow> = serde_json::from_str(&raw)?; + rows.into_iter() + .next() + .map(|row| row.change_seq) + .ok_or_else(|| { + LocalEventsError::InvalidRecord("change sequence unavailable".to_owned()) + }) + } } #[derive(Debug, Deserialize)] struct RecordRow { seq: i64, + change_seq: i64, record_id: String, family: String, status: String, @@ -231,6 +289,7 @@ impl TryFrom<RecordRow> for LocalEventRecord { fn try_from(row: RecordRow) -> Result<Self, Self::Error> { Ok(Self { seq: row.seq, + change_seq: row.change_seq, record_id: row.record_id, family: LocalRecordFamily::parse(&row.family)?, status: LocalRecordStatus::parse(&row.status)?, @@ -261,7 +320,7 @@ impl TryFrom<RecordRow> for LocalEventRecord { #[derive(Debug, Deserialize)] struct CursorRow { consumer_id: String, - last_seq: i64, + last_change_seq: i64, updated_at_ms: i64, } @@ -269,12 +328,17 @@ impl From<CursorRow> for LocalEventsCursor { fn from(row: CursorRow) -> Self { Self { consumer_id: row.consumer_id, - last_seq: row.last_seq, + last_change_seq: row.last_change_seq, updated_at_ms: row.updated_at_ms, } } } +#[derive(Debug, Deserialize)] +struct ChangeSeqRow { + change_seq: i64, +} + fn encode_json(value: Option<&Value>) -> Result<Option<String>, LocalEventsError> { value .map(serde_json::to_string) diff --git a/crates/local_events/tests/store.rs b/crates/local_events/tests/store.rs @@ -1,8 +1,9 @@ use radroots_local_events::{ LocalEventRecordInput, LocalEventRecordUpdate, LocalEventsStore, LocalRecordFamily, - LocalRecordStatus, PublishOutboxStatus, SourceRuntime, + LocalRecordStatus, MIGRATIONS, PublishOutboxStatus, SourceRuntime, }; -use radroots_sql_core::SqliteExecutor; +use radroots_sql_core::migrations::migrations_run_all_up; +use radroots_sql_core::{SqlExecutor, SqliteExecutor}; use serde_json::json; fn store() -> LocalEventsStore<SqliteExecutor> { @@ -84,9 +85,10 @@ fn append_is_idempotent_by_record_id() { let first = store.append_record(&input).expect("append first"); let second = store.append_record(&input).expect("append second"); - let rows = store.list_records_after(0, 10).expect("list records"); + let rows = store.list_records_after_seq(0, 10).expect("list records"); assert_eq!(first.seq, second.seq); + assert_eq!(first.change_seq, second.change_seq); assert_eq!(rows.len(), 1); assert_eq!(rows[0].record_id, "local-a"); assert_eq!( @@ -105,9 +107,9 @@ fn projection_cursor_advances_without_rewinding() { let second = store.advance_cursor("app", 5, 200).expect("ignore rewind"); let third = store.advance_cursor("app", 12, 300).expect("advance again"); - assert_eq!(first.last_seq, 10); - assert_eq!(second.last_seq, 10); - assert_eq!(third.last_seq, 12); + assert_eq!(first.last_change_seq, 10); + assert_eq!(second.last_change_seq, 10); + assert_eq!(third.last_change_seq, 12); } #[test] @@ -134,3 +136,125 @@ fn outbox_status_updates_signed_event_records() { Some(json!({"acked":["ws://127.0.0.1:8080"]})) ); } + +#[test] +fn changed_after_uses_change_seq_for_appends_and_outbox_updates() { + let store = store(); + let input = signed_event("event-a"); + let appended = store.append_record(&input).expect("append signed event"); + let initial_rows = store + .list_records_changed_after(0, 10) + .expect("list initial changes"); + + assert_eq!(initial_rows.len(), 1); + assert_eq!(initial_rows[0].record_id, "event-a"); + assert_eq!(initial_rows[0].seq, appended.seq); + assert_eq!(initial_rows[0].change_seq, appended.change_seq); + + let updated = store + .update_outbox(&LocalEventRecordUpdate { + record_id: "event-a".to_owned(), + status: LocalRecordStatus::Published, + outbox_status: PublishOutboxStatus::Acknowledged, + relay_set_fingerprint: Some("relay-set-a".to_owned()), + relay_delivery_json: Some(json!({"acked":["ws://127.0.0.1:8080"]})), + updated_at_ms: 3000, + }) + .expect("update outbox"); + let changed_rows = store + .list_records_changed_after(appended.change_seq, 10) + .expect("list changed rows"); + + assert_eq!(updated.seq, appended.seq); + assert!(updated.change_seq > appended.change_seq); + assert_eq!(changed_rows.len(), 1); + assert_eq!(changed_rows[0].record_id, "event-a"); + assert_eq!(changed_rows[0].change_seq, updated.change_seq); +} + +#[test] +fn migration_assigns_existing_records_change_seq_from_insert_order() { + let executor = SqliteExecutor::open_memory().expect("open memory sqlite"); + migrations_run_all_up(&executor, &MIGRATIONS[..1]).expect("apply initial migration"); + let first = insert_pre_change_tracking_record(&executor, "local-a"); + let second = insert_pre_change_tracking_record(&executor, "local-b"); + let store = LocalEventsStore::new(executor); + + store.migrate_up().expect("apply change tracking migration"); + let rows = store + .list_records_changed_after(0, 10) + .expect("list changed rows after migration"); + + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].seq, first); + assert_eq!(rows[0].change_seq, first); + assert_eq!(rows[1].seq, second); + assert_eq!(rows[1].change_seq, second); +} + +fn insert_pre_change_tracking_record(executor: &SqliteExecutor, record_id: &str) -> i64 { + let input = local_work(record_id); + let params = json!([ + input.record_id, + input.family.as_str(), + input.status.as_str(), + input.source_runtime.as_str(), + input.created_at_ms, + input.inserted_at_ms, + input.inserted_at_ms, + input.owner_account_id, + input.owner_pubkey, + input.farm_id, + input.listing_addr, + serde_json::to_string(&input.local_work_json).expect("encode local work"), + input.event_id, + input.event_kind, + input.event_pubkey, + input.event_created_at, + input + .event_tags_json + .map(|value| serde_json::to_string(&value).expect("encode tags")), + input.event_content, + input.event_sig, + input + .raw_event_json + .map(|value| serde_json::to_string(&value).expect("encode raw event")), + input.outbox_status.as_str(), + input.relay_set_fingerprint, + input + .relay_delivery_json + .map(|value| serde_json::to_string(&value).expect("encode relay delivery")), + ]) + .to_string(); + let outcome = executor + .exec( + "insert into local_event_record( + record_id, + family, + status, + source_runtime, + created_at_ms, + inserted_at_ms, + updated_at_ms, + owner_account_id, + owner_pubkey, + farm_id, + listing_addr, + local_work_json, + event_id, + event_kind, + event_pubkey, + event_created_at, + event_tags_json, + event_content, + event_sig, + raw_event_json, + outbox_status, + relay_set_fingerprint, + relay_delivery_json + ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", + &params, + ) + .expect("insert old local event record"); + outcome.last_insert_id +}