commit d10f2335f2e4dab6e346591d14525078d9283216
parent 1dcee031c8bc4a6fd8a4db00a80ce7ebd980d67b
Author: triesap <tyson@radroots.org>
Date: Sat, 13 Jun 2026 03:48:35 -0700
event_store: replay projections by sequence
- add monotonic sequence storage for persisted Nostr events
- move projection cursors to versioned last-event sequence state
- return event sequence data from ingest and stored event APIs
- cover duplicate ingest and same-timestamp replay ordering
Diffstat:
3 files changed, 60 insertions(+), 36 deletions(-)
diff --git a/crates/event_store/migrations/0001_event_store.up.sql b/crates/event_store/migrations/0001_event_store.up.sql
@@ -1,5 +1,6 @@
CREATE TABLE nostr_event (
- event_id TEXT PRIMARY KEY NOT NULL,
+ seq INTEGER PRIMARY KEY AUTOINCREMENT,
+ event_id TEXT NOT NULL UNIQUE,
pubkey TEXT NOT NULL,
created_at INTEGER NOT NULL,
kind INTEGER NOT NULL,
@@ -17,8 +18,10 @@ CREATE TABLE nostr_event (
);
CREATE INDEX nostr_event_kind_created_idx ON nostr_event(kind, created_at, event_id);
-CREATE INDEX nostr_event_contract_idx ON nostr_event(contract_id, created_at, event_id);
-CREATE INDEX nostr_event_projection_idx ON nostr_event(projection_eligible, created_at, event_id);
+CREATE INDEX nostr_event_contract_idx ON nostr_event(contract_id, seq);
+CREATE INDEX nostr_event_projection_idx ON nostr_event(projection_eligible, seq);
+CREATE INDEX nostr_event_verification_contract_idx
+ON nostr_event(verification_status, contract_status, seq);
CREATE TABLE nostr_event_tag (
event_id TEXT NOT NULL REFERENCES nostr_event(event_id) ON DELETE CASCADE,
@@ -74,7 +77,7 @@ CREATE INDEX nostr_event_head_event_idx ON nostr_event_head(event_id);
CREATE TABLE projection_cursor (
projection_id TEXT PRIMARY KEY NOT NULL,
- last_event_id TEXT,
- last_created_at INTEGER NOT NULL,
+ projection_version INTEGER NOT NULL DEFAULT 1,
+ last_event_seq INTEGER NOT NULL DEFAULT 0,
updated_at_ms INTEGER NOT NULL
);
diff --git a/crates/event_store/src/model.rs b/crates/event_store/src/model.rs
@@ -221,6 +221,7 @@ impl RadrootsEventHeadStoreDecision {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RadrootsEventIngestReceipt {
+ pub seq: i64,
pub event_id: String,
pub inserted: bool,
pub verification_status: RadrootsEventVerificationStatus,
@@ -232,6 +233,7 @@ pub struct RadrootsEventIngestReceipt {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RadrootsStoredEvent {
+ pub seq: i64,
pub event_id: String,
pub pubkey: String,
pub created_at: u32,
@@ -275,8 +277,8 @@ pub struct RadrootsStoredEventHead {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RadrootsProjectionCursor {
pub projection_id: String,
- pub last_event_id: Option<String>,
- pub last_created_at: u32,
+ pub projection_version: u32,
+ pub last_event_seq: i64,
pub updated_at_ms: i64,
}
diff --git a/crates/event_store/src/store.rs b/crates/event_store/src/store.rs
@@ -84,15 +84,15 @@ impl RadrootsEventStore {
.unwrap_or_else(|| serde_json::to_string(&ingest.event))?;
let tags_json = serde_json::to_string(&ingest.event.tags)?;
let mut tx = self.pool.begin().await?;
- let inserted = insert_raw_event(
+ let insert = insert_raw_event(
&mut tx,
&ingest,
&classification,
raw_json.as_str(),
tags_json.as_str(),
)
- .await?
- > 0;
+ .await?;
+ let inserted = insert.inserted;
let mut head_decision = RadrootsEventHeadStoreDecision::Unsupported;
let mut projection_eligible =
classification.base_projection_eligible(ingest.verification_status);
@@ -126,6 +126,7 @@ impl RadrootsEventStore {
tx.commit().await?;
Ok(RadrootsEventIngestReceipt {
+ seq: insert.seq,
event_id: ingest.event.id,
inserted,
verification_status: ingest.verification_status,
@@ -143,7 +144,7 @@ impl RadrootsEventStore {
event_id: &str,
) -> Result<Option<RadrootsStoredEvent>, RadrootsEventStoreError> {
let row = sqlx::query(
- "SELECT event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event WHERE event_id = ?",
+ "SELECT seq, event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event WHERE event_id = ?",
)
.bind(event_id)
.fetch_optional(&self.pool)
@@ -214,7 +215,7 @@ impl RadrootsEventStore {
projection_id: &str,
) -> Result<Option<RadrootsProjectionCursor>, RadrootsEventStoreError> {
let row = sqlx::query(
- "SELECT projection_id, last_event_id, last_created_at, updated_at_ms FROM projection_cursor WHERE projection_id = ?",
+ "SELECT projection_id, projection_version, last_event_seq, updated_at_ms FROM projection_cursor WHERE projection_id = ?",
)
.bind(projection_id)
.fetch_optional(&self.pool)
@@ -227,11 +228,11 @@ impl RadrootsEventStore {
cursor: &RadrootsProjectionCursor,
) -> Result<(), RadrootsEventStoreError> {
sqlx::query(
- "INSERT INTO projection_cursor(projection_id, last_event_id, last_created_at, updated_at_ms) VALUES (?, ?, ?, ?) ON CONFLICT(projection_id) DO UPDATE SET last_event_id = excluded.last_event_id, last_created_at = excluded.last_created_at, updated_at_ms = excluded.updated_at_ms",
+ "INSERT INTO projection_cursor(projection_id, projection_version, last_event_seq, updated_at_ms) VALUES (?, ?, ?, ?) ON CONFLICT(projection_id) DO UPDATE SET projection_version = excluded.projection_version, last_event_seq = excluded.last_event_seq, updated_at_ms = excluded.updated_at_ms",
)
.bind(cursor.projection_id.as_str())
- .bind(cursor.last_event_id.as_deref())
- .bind(i64::from(cursor.last_created_at))
+ .bind(i64::from(cursor.projection_version))
+ .bind(cursor.last_event_seq)
.bind(cursor.updated_at_ms)
.execute(&self.pool)
.await?;
@@ -244,20 +245,14 @@ impl RadrootsEventStore {
limit: u32,
) -> Result<Vec<RadrootsStoredEvent>, RadrootsEventStoreError> {
let cursor = self.get_projection_cursor(projection_id).await?;
- let last_created_at = cursor
+ let last_event_seq = cursor
.as_ref()
- .map(|cursor| cursor.last_created_at)
+ .map(|cursor| cursor.last_event_seq)
.unwrap_or(0);
- let last_event_id = cursor
- .as_ref()
- .and_then(|cursor| cursor.last_event_id.as_deref())
- .unwrap_or("");
let rows = sqlx::query(
- "SELECT event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event WHERE projection_eligible = 1 AND (created_at > ? OR (created_at = ? AND event_id > ?)) ORDER BY created_at, event_id LIMIT ?",
+ "SELECT seq, event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event WHERE projection_eligible = 1 AND seq > ? ORDER BY seq ASC LIMIT ?",
)
- .bind(i64::from(last_created_at))
- .bind(i64::from(last_created_at))
- .bind(last_event_id)
+ .bind(last_event_seq)
.bind(i64::from(limit))
.fetch_all(&self.pool)
.await?;
@@ -296,6 +291,11 @@ struct AppliedHead {
projection_eligible: bool,
}
+struct InsertRawEventResult {
+ inserted: bool,
+ seq: i64,
+}
+
async fn configure_connection(
pool: &SqlitePool,
file_backed: bool,
@@ -364,7 +364,7 @@ async fn insert_raw_event(
classification: &EventClassification,
raw_json: &str,
tags_json: &str,
-) -> Result<u64, RadrootsEventStoreError> {
+) -> Result<InsertRawEventResult, RadrootsEventStoreError> {
let event = &ingest.event;
let contract_id = classification.contract.map(|contract| contract.id);
let event_class = classification
@@ -391,7 +391,20 @@ async fn insert_raw_event(
.bind(ingest.observed_at_ms)
.execute(&mut **tx)
.await?;
- Ok(result.rows_affected())
+ let inserted = result.rows_affected() > 0;
+ let seq = event_seq(tx, event.id.as_str()).await?;
+ Ok(InsertRawEventResult { inserted, seq })
+}
+
+async fn event_seq(
+ tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
+ event_id: &str,
+) -> Result<i64, RadrootsEventStoreError> {
+ let row = sqlx::query("SELECT seq FROM nostr_event WHERE event_id = ?")
+ .bind(event_id)
+ .fetch_one(&mut **tx)
+ .await?;
+ row.try_get("seq").map_err(Into::into)
}
async fn insert_tags(
@@ -601,6 +614,7 @@ fn stored_event_from_row(
.transpose()?;
let projection_eligible = row.try_get::<i64, _>("projection_eligible")? != 0;
Ok(RadrootsStoredEvent {
+ seq: row.try_get("seq")?,
event_id: row.try_get("event_id")?,
pubkey: row.try_get("pubkey")?,
created_at,
@@ -653,8 +667,8 @@ fn projection_cursor_from_row(
) -> Result<RadrootsProjectionCursor, RadrootsEventStoreError> {
Ok(RadrootsProjectionCursor {
projection_id: row.try_get("projection_id")?,
- last_event_id: row.try_get("last_event_id")?,
- last_created_at: u32_from_i64("last_created_at", row.try_get("last_created_at")?)?,
+ projection_version: u32_from_i64("projection_version", row.try_get("projection_version")?)?,
+ last_event_seq: row.try_get("last_event_seq")?,
updated_at_ms: row.try_get("updated_at_ms")?,
})
}
@@ -767,6 +781,8 @@ mod tests {
assert!(first.inserted);
assert!(!second.inserted);
+ assert_eq!(first.seq, second.seq);
+ assert_eq!(stored.seq, first.seq);
assert_eq!(stored.raw_json, "{\"fixture\":true}");
assert_eq!(stored.content, "hello");
assert_eq!(stored.tags_json, "[[\"t\",\"soil\"]]");
@@ -907,29 +923,32 @@ mod tests {
}
#[tokio::test]
- async fn projection_cursors_drive_replay_without_json_extraction() {
+ async fn projection_cursors_replay_by_store_sequence() {
let store = RadrootsEventStore::open_memory().await.expect("open");
- let first = event(KIND_POST, 'c', 'd', 30, Vec::new(), "one");
- let second = event(KIND_POST, 'd', 'd', 31, Vec::new(), "two");
- store
+ let first = event(KIND_POST, 'e', 'd', 30, Vec::new(), "one");
+ let second = event(KIND_POST, 'd', 'd', 30, Vec::new(), "two");
+ let first_receipt = store
.ingest_event(RadrootsEventIngest::verified(first.clone(), 6_000))
.await
.expect("first");
- store
+ let second_receipt = store
.ingest_event(RadrootsEventIngest::verified(second.clone(), 6_100))
.await
.expect("second");
+ assert!(first_receipt.seq < second_receipt.seq);
let replay = store
.events_since_cursor("social", 10)
.await
.expect("initial replay");
assert_eq!(replay.len(), 2);
+ assert_eq!(replay[0].event_id, first.id);
+ assert_eq!(replay[1].event_id, second.id);
store
.update_projection_cursor(&RadrootsProjectionCursor {
projection_id: "social".to_owned(),
- last_event_id: Some(first.id),
- last_created_at: 30,
+ projection_version: 1,
+ last_event_seq: first_receipt.seq,
updated_at_ms: 6_200,
})
.await