commit 7d8bcc4a6a8ddfbcf586eebce6b012e728e23484
parent 0357954d7db7702064c35905509abe8bd00ca090
Author: triesap <tyson@radroots.org>
Date: Tue, 26 May 2026 10:14:21 +0000
local-events: repair network source migration
- add the forward network source runtime repair migration
- preserve existing change-tracked local event rows
- cover pre-network sqlite schema migration
- validate local-events crate checks
Diffstat:
4 files changed, 266 insertions(+), 0 deletions(-)
diff --git a/crates/local_events/migrations/0002_network_source_runtime.down.sql b/crates/local_events/migrations/0002_network_source_runtime.down.sql
@@ -0,0 +1 @@
+select 1;
diff --git a/crates/local_events/migrations/0002_network_source_runtime.up.sql b/crates/local_events/migrations/0002_network_source_runtime.up.sql
@@ -0,0 +1,97 @@
+create table local_event_record_network_source_next (
+ seq integer primary key autoincrement,
+ change_seq integer not null unique,
+ record_id text not null unique,
+ family text not null check (family in ('local_work', 'signed_event')),
+ status text not null check (status in ('local_draft', 'local_saved', 'pending_publish', 'published', 'failed', 'conflict')),
+ source_runtime text not null check (source_runtime in ('cli', 'app', 'network', 'service', 'worker', 'test')),
+ created_at_ms integer not null,
+ inserted_at_ms integer not null,
+ updated_at_ms integer not null,
+ owner_account_id text,
+ owner_pubkey text,
+ farm_id text,
+ listing_addr text,
+ local_work_json text,
+ event_id text,
+ event_kind integer,
+ event_pubkey text,
+ event_created_at integer,
+ event_tags_json text,
+ event_content text,
+ event_sig text,
+ raw_event_json text,
+ outbox_status text not null check (outbox_status in ('none', 'pending', 'acknowledged', 'failed')),
+ relay_set_fingerprint text,
+ relay_delivery_json text,
+ check (change_seq >= 1),
+ check (trim(record_id) <> ''),
+ check (family <> 'local_work' or local_work_json is not null),
+ check (family <> 'local_work' or outbox_status = 'none'),
+ check (family <> 'signed_event' or (event_id is not null and event_kind is not null and event_pubkey is not null and event_sig is not null and raw_event_json is not null))
+);
+
+insert into local_event_record_network_source_next(
+ seq,
+ change_seq,
+ record_id,
+ family,
+ status,
+ source_runtime,
+ created_at_ms,
+ inserted_at_ms,
+ updated_at_ms,
+ owner_account_id,
+ owner_pubkey,
+ farm_id,
+ listing_addr,
+ local_work_json,
+ event_id,
+ event_kind,
+ event_pubkey,
+ event_created_at,
+ event_tags_json,
+ event_content,
+ event_sig,
+ raw_event_json,
+ outbox_status,
+ relay_set_fingerprint,
+ relay_delivery_json
+)
+select
+ seq,
+ change_seq,
+ record_id,
+ family,
+ status,
+ source_runtime,
+ created_at_ms,
+ inserted_at_ms,
+ updated_at_ms,
+ owner_account_id,
+ owner_pubkey,
+ farm_id,
+ listing_addr,
+ local_work_json,
+ event_id,
+ event_kind,
+ event_pubkey,
+ event_created_at,
+ event_tags_json,
+ event_content,
+ event_sig,
+ raw_event_json,
+ outbox_status,
+ relay_set_fingerprint,
+ relay_delivery_json
+from local_event_record
+order by seq asc;
+
+drop table local_event_record;
+alter table local_event_record_network_source_next rename to local_event_record;
+
+create index local_event_record_change_seq_idx on local_event_record(change_seq);
+create index local_event_record_event_id_idx on local_event_record(event_id);
+create index local_event_record_listing_addr_idx on local_event_record(listing_addr);
+create index local_event_record_owner_pubkey_idx on local_event_record(owner_pubkey);
+create index local_event_record_status_idx on local_event_record(status);
diff --git a/crates/local_events/src/migrations.rs b/crates/local_events/src/migrations.rs
@@ -15,6 +15,11 @@ pub static MIGRATIONS: &[Migration] = &[
up_sql: include_str!("../migrations/0001_change_tracking.up.sql"),
down_sql: include_str!("../migrations/0001_change_tracking.down.sql"),
},
+ Migration {
+ name: "0002_network_source_runtime",
+ up_sql: include_str!("../migrations/0002_network_source_runtime.up.sql"),
+ down_sql: include_str!("../migrations/0002_network_source_runtime.down.sql"),
+ },
];
pub fn run_all_up<E>(executor: &E) -> Result<(), SqlError>
diff --git a/crates/local_events/tests/store.rs b/crates/local_events/tests/store.rs
@@ -325,6 +325,37 @@ fn migration_assigns_existing_records_change_seq_from_insert_order() {
assert_eq!(rows[1].change_seq, second);
}
+#[test]
+fn migration_repairs_pre_network_source_runtime_constraint() {
+ let executor = SqliteExecutor::open_memory().expect("open memory sqlite");
+ create_pre_network_change_tracking_schema(&executor);
+ let legacy_seq = insert_pre_network_change_tracking_record(&executor, "legacy-cli", 1);
+ let store = LocalEventsStore::new(executor);
+
+ store
+ .migrate_up()
+ .expect("apply network source repair migration");
+ let mut input = signed_event("event-network-repaired");
+ input.source_runtime = SourceRuntime::Network;
+ input.event_id = Some("event-network-repaired".to_owned());
+ input.raw_event_json = Some(json!({"id":"event-network-repaired","kind":3421}));
+ let inserted = store
+ .append_record(&input)
+ .expect("append repaired network event");
+ let rows = store
+ .list_records_changed_after(0, 10)
+ .expect("list changed rows after repair");
+
+ assert_eq!(legacy_seq, 1);
+ assert_eq!(rows.len(), 2);
+ assert_eq!(rows[0].record_id, "legacy-cli");
+ assert_eq!(rows[0].change_seq, 1);
+ assert_eq!(rows[0].source_runtime, SourceRuntime::Cli);
+ assert_eq!(rows[1].record_id, "event-network-repaired");
+ assert_eq!(rows[1].seq, inserted.seq);
+ assert_eq!(rows[1].source_runtime, SourceRuntime::Network);
+}
+
fn insert_pre_change_tracking_record(executor: &SqliteExecutor, record_id: &str) -> i64 {
let input = local_work(record_id);
let params = json!([
@@ -391,3 +422,135 @@ fn insert_pre_change_tracking_record(executor: &SqliteExecutor, record_id: &str)
.expect("insert old local event record");
outcome.last_insert_id
}
+
+fn create_pre_network_change_tracking_schema(executor: &SqliteExecutor) {
+ let schema = [
+ "create table __migrations(id integer primary key, name text not null unique, applied_at text not null default (datetime('now')))",
+ "create table local_event_record (
+ seq integer primary key autoincrement,
+ change_seq integer not null unique,
+ record_id text not null unique,
+ family text not null check (family in ('local_work', 'signed_event')),
+ status text not null check (status in ('local_draft', 'local_saved', 'pending_publish', 'published', 'failed', 'conflict')),
+ source_runtime text not null check (source_runtime in ('cli', 'app', 'service', 'worker', 'test')),
+ created_at_ms integer not null,
+ inserted_at_ms integer not null,
+ updated_at_ms integer not null,
+ owner_account_id text,
+ owner_pubkey text,
+ farm_id text,
+ listing_addr text,
+ local_work_json text,
+ event_id text,
+ event_kind integer,
+ event_pubkey text,
+ event_created_at integer,
+ event_tags_json text,
+ event_content text,
+ event_sig text,
+ raw_event_json text,
+ outbox_status text not null check (outbox_status in ('none', 'pending', 'acknowledged', 'failed')),
+ relay_set_fingerprint text,
+ relay_delivery_json text,
+ check (change_seq >= 1),
+ check (trim(record_id) <> ''),
+ check (family <> 'local_work' or local_work_json is not null),
+ check (family <> 'local_work' or outbox_status = 'none'),
+ check (family <> 'signed_event' or (event_id is not null and event_kind is not null and event_pubkey is not null and event_sig is not null and raw_event_json is not null))
+ )",
+ "create index local_event_record_change_seq_idx on local_event_record(change_seq)",
+ "create index local_event_record_event_id_idx on local_event_record(event_id)",
+ "create index local_event_record_listing_addr_idx on local_event_record(listing_addr)",
+ "create index local_event_record_owner_pubkey_idx on local_event_record(owner_pubkey)",
+ "create index local_event_record_status_idx on local_event_record(status)",
+ "create table local_event_projection_cursor (
+ consumer_id text primary key,
+ last_change_seq integer not null,
+ updated_at_ms integer not null,
+ check (trim(consumer_id) <> ''),
+ check (last_change_seq >= 0)
+ )",
+ ];
+ for sql in schema {
+ executor.exec(sql, "[]").expect("schema statement");
+ }
+ for name in ["0000_local_events", "0001_change_tracking"] {
+ let params = json!([name]).to_string();
+ executor
+ .exec("insert into __migrations(name) values(?)", ¶ms)
+ .expect("migration marker");
+ }
+}
+
+fn insert_pre_network_change_tracking_record(
+ executor: &SqliteExecutor,
+ record_id: &str,
+ change_seq: i64,
+) -> i64 {
+ let input = local_work(record_id);
+ let params = json!([
+ change_seq,
+ input.record_id,
+ input.family.as_str(),
+ input.status.as_str(),
+ input.source_runtime.as_str(),
+ input.created_at_ms,
+ input.inserted_at_ms,
+ input.inserted_at_ms,
+ input.owner_account_id,
+ input.owner_pubkey,
+ input.farm_id,
+ input.listing_addr,
+ serde_json::to_string(&input.local_work_json).expect("encode local work"),
+ input.event_id,
+ input.event_kind,
+ input.event_pubkey,
+ input.event_created_at,
+ input
+ .event_tags_json
+ .map(|value| serde_json::to_string(&value).expect("encode tags")),
+ input.event_content,
+ input.event_sig,
+ input
+ .raw_event_json
+ .map(|value| serde_json::to_string(&value).expect("encode raw event")),
+ input.outbox_status.as_str(),
+ input.relay_set_fingerprint,
+ input
+ .relay_delivery_json
+ .map(|value| serde_json::to_string(&value).expect("encode relay delivery")),
+ ])
+ .to_string();
+ let outcome = executor
+ .exec(
+ "insert into local_event_record(
+ change_seq,
+ record_id,
+ family,
+ status,
+ source_runtime,
+ created_at_ms,
+ inserted_at_ms,
+ updated_at_ms,
+ owner_account_id,
+ owner_pubkey,
+ farm_id,
+ listing_addr,
+ local_work_json,
+ event_id,
+ event_kind,
+ event_pubkey,
+ event_created_at,
+ event_tags_json,
+ event_content,
+ event_sig,
+ raw_event_json,
+ outbox_status,
+ relay_set_fingerprint,
+ relay_delivery_json
+ ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
+ ¶ms,
+ )
+ .expect("insert pre-network local event record");
+ outcome.last_insert_id
+}