commit 3fc747450e3381a1d577d5eac72c397c4dd600e8
parent 3a2fb61198689ff1584c436ae8a56c39468a1f4d
Author: triesap <tyson@radroots.org>
Date: Sat, 23 May 2026 10:31:42 +0000
local-events: add newest-first change reads
- add latest changed-record listing support
- add before-cursor changed-record pagination
- preserve ascending changed-after cursor behavior
- cover ordering cursor and older-record volume cases
Diffstat:
2 files changed, 114 insertions(+), 0 deletions(-)
diff --git a/crates/local_events/src/store.rs b/crates/local_events/src/store.rs
@@ -145,6 +145,39 @@ impl<E: SqlExecutor> LocalEventsStore<E> {
)
}
+ pub fn list_records_changed_latest(
+ &self,
+ limit: u32,
+ ) -> Result<Vec<LocalEventRecord>, LocalEventsError> {
+ let params = json!([i64::from(limit)]).to_string();
+ self.query_records(
+ "select * from local_event_record order by change_seq desc, seq desc, record_id asc limit ?",
+ ¶ms,
+ )
+ }
+
+ pub fn list_records_changed_before(
+ &self,
+ before_change_seq: i64,
+ before_seq: i64,
+ limit: u32,
+ ) -> Result<Vec<LocalEventRecord>, LocalEventsError> {
+ let params = json!([
+ before_change_seq,
+ before_change_seq,
+ before_seq,
+ i64::from(limit)
+ ])
+ .to_string();
+ self.query_records(
+ "select * from local_event_record
+ where change_seq < ? or (change_seq = ? and seq < ?)
+ order by change_seq desc, seq desc, record_id asc
+ limit ?",
+ ¶ms,
+ )
+ }
+
pub fn update_outbox(
&self,
update: &LocalEventRecordUpdate,
diff --git a/crates/local_events/tests/store.rs b/crates/local_events/tests/store.rs
@@ -173,6 +173,87 @@ fn changed_after_uses_change_seq_for_appends_and_outbox_updates() {
}
#[test]
+fn changed_latest_lists_newest_records_first() {
+ let store = store();
+ let first = store
+ .append_record(&local_work("local-a"))
+ .expect("append first");
+ let second = store
+ .append_record(&local_work("local-b"))
+ .expect("append second");
+ let third = store
+ .append_record(&local_work("local-c"))
+ .expect("append third");
+
+ let rows = store
+ .list_records_changed_latest(2)
+ .expect("list latest changed rows");
+
+ assert_eq!(rows.len(), 2);
+ assert_eq!(rows[0].record_id, "local-c");
+ assert_eq!(rows[0].change_seq, third.change_seq);
+ assert_eq!(rows[1].record_id, "local-b");
+ assert_eq!(rows[1].change_seq, second.change_seq);
+ assert!(rows[1].change_seq > first.change_seq);
+}
+
+#[test]
+fn changed_before_pages_newest_first_by_cursor() {
+ let store = store();
+ let _first = store
+ .append_record(&local_work("local-a"))
+ .expect("append first");
+ let second = store
+ .append_record(&local_work("local-b"))
+ .expect("append second");
+ let third = store
+ .append_record(&local_work("local-c"))
+ .expect("append third");
+ let fourth = store
+ .append_record(&local_work("local-d"))
+ .expect("append fourth");
+
+ let first_page = store
+ .list_records_changed_latest(2)
+ .expect("list first page");
+ let cursor = first_page.last().expect("last first page");
+ let second_page = store
+ .list_records_changed_before(cursor.change_seq, cursor.seq, 2)
+ .expect("list second page");
+
+ assert_eq!(first_page.len(), 2);
+ assert_eq!(first_page[0].record_id, "local-d");
+ assert_eq!(first_page[0].change_seq, fourth.change_seq);
+ assert_eq!(first_page[1].record_id, "local-c");
+ assert_eq!(first_page[1].change_seq, third.change_seq);
+ assert_eq!(second_page.len(), 2);
+ assert_eq!(second_page[0].record_id, "local-b");
+ assert_eq!(second_page[0].change_seq, second.change_seq);
+ assert_eq!(second_page[1].record_id, "local-a");
+}
+
+#[test]
+fn changed_latest_is_not_blocked_by_older_record_volume() {
+ let store = store();
+ for index in 0..505 {
+ store
+ .append_record(&local_work(&format!("older-{index:03}")))
+ .expect("append older record");
+ }
+ let current = store
+ .append_record(&local_work("current-record"))
+ .expect("append current record");
+
+ let rows = store
+ .list_records_changed_latest(1)
+ .expect("list latest record");
+
+ assert_eq!(rows.len(), 1);
+ assert_eq!(rows[0].record_id, "current-record");
+ assert_eq!(rows[0].change_seq, current.change_seq);
+}
+
+#[test]
fn migration_assigns_existing_records_change_seq_from_insert_order() {
let executor = SqliteExecutor::open_memory().expect("open memory sqlite");
migrations_run_all_up(&executor, &MIGRATIONS[..1]).expect("apply initial migration");