lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit 74d6515696249db9d51b61293c6f8832a543ed67
parent ab432b108686980036251f268d060b8674d71f44
Author: triesap <tyson@radroots.org>
Date:   Fri, 12 Jun 2026 22:45:44 -0700

event_store: add sqlx raw event store

- add a SQLx SQLite event-store crate with raw events, tags, relay observations, heads, and cursors
- classify ingested events through the contract registry and event-head selector
- align SQLite native bindings and keep geocoder byte-backed opens safe on the shared rusqlite line
- list the new crate in rr-rs SDK metadata as deferred publication

Diffstat:
MCargo.lock | 240+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
MCargo.toml | 5++++-
Acrates/event_store/Cargo.toml | 31+++++++++++++++++++++++++++++++
Acrates/event_store/README | 3+++
Acrates/event_store/migrations/0001_event_store.down.sql | 5+++++
Acrates/event_store/migrations/0001_event_store.up.sql | 80+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/event_store/src/error.rs | 23+++++++++++++++++++++++
Acrates/event_store/src/lib.rs | 24++++++++++++++++++++++++
Acrates/event_store/src/migrations.rs | 3+++
Acrates/event_store/src/model.rs | 322+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/event_store/src/store.rs | 944+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/geocoder/Cargo.toml | 6++----
Mcrates/geocoder/src/error.rs | 2++
Mcrates/geocoder/src/geocoder.rs | 41++++++++++++++++++++++++++++++++---------
Mspec/README.md | 1+
Mspec/manifest.toml | 1+
16 files changed, 1694 insertions(+), 37 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -289,6 +289,15 @@ dependencies = [ ] [[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + +[[package]] name = "atomic" version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -775,6 +784,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" [[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + +[[package]] name = "config" version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -952,6 +970,21 @@ dependencies = [ ] [[package]] +name = "crc" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "217698eaf96b4a3f0bc4f3662aaa55bdf913cd54d7204591faa790070c6d0853" + +[[package]] name = "crc32fast" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1351,6 +1384,12 @@ dependencies = [ ] [[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + +[[package]] name = "downcast-rs" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1523,6 +1562,17 @@ dependencies = [ ] [[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] name = "eventsource-stream" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1640,6 +1690,17 @@ dependencies = [ ] [[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "spin", +] + +[[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1730,6 +1791,17 @@ dependencies = [ ] [[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot", +] + +[[package]] name = "futures-io" version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1952,6 +2024,24 @@ dependencies = [ ] [[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + +[[package]] +name = "hashlink" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown 0.15.5", +] + +[[package]] name = "heck" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2564,9 +2654,9 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.37.0" +version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f111c8c41e7c61a49cd34e44c7619462967221a6443b0ec299e0ac30cfb9b1" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" dependencies = [ "cc", "pkg-config", @@ -3470,6 +3560,12 @@ dependencies = [ ] [[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + +[[package]] name = "parking_lot" version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3912,6 +4008,18 @@ dependencies = [ ] [[package]] +name = "radroots_event_store" +version = "0.1.0-alpha.2" +dependencies = [ + "radroots_events", + "serde", + "serde_json", + "sqlx", + "thiserror 1.0.69", + "tokio", +] + +[[package]] name = "radroots_events" version = "0.1.0-alpha.2" dependencies = [ @@ -4783,27 +4891,17 @@ dependencies = [ ] [[package]] -name = "rsqlite-vfs" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a1f2315036ef6b1fbacd1972e8ee7688030b0a2121edfc2a6550febd41574d" -dependencies = [ - "hashbrown 0.16.1", - "thiserror 2.0.18", -] - -[[package]] name = "rusqlite" -version = "0.39.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0d2b0146dd9661bf67bb107c0bb2a55064d556eeb3fc314151b957f313bcd4e" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" dependencies = [ "bitflags 2.11.0", "fallible-iterator", "fallible-streaming-iterator", + "hashlink 0.9.1", "libsqlite3-sys", "smallvec", - "sqlite-wasm-rs", ] [[package]] @@ -6408,6 +6506,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spki" @@ -6420,15 +6521,107 @@ dependencies = [ ] [[package]] -name = "sqlite-wasm-rs" -version = "0.5.2" +name = "sqlx" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f4206ed3a67690b9c29b77d728f6acc3ce78f16bf846d83c94f76400320181b" +checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc" dependencies = [ - "cc", - "js-sys", - "rsqlite-vfs", - "wasm-bindgen", + "sqlx-core", + "sqlx-macros", + "sqlx-sqlite", +] + +[[package]] +name = "sqlx-core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" +dependencies = [ + "base64 0.22.1", + "bytes", + "crc", + "crossbeam-queue", + "either", + "event-listener", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashbrown 0.15.5", + "hashlink 0.10.0", + "indexmap 2.13.0", + "log", + "memchr", + "once_cell", + "percent-encoding", + "serde", + "sha2", + "smallvec", + "thiserror 2.0.18", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 2.0.117", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b" +dependencies = [ + "dotenvy", + "either", + "heck", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2", + "sqlx-core", + "sqlx-sqlite", + "syn 2.0.117", + "tokio", + "url", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea" +dependencies = [ + "atoi", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "serde_urlencoded", + "sqlx-core", + "thiserror 2.0.18", + "tracing", + "url", ] [[package]] @@ -7038,6 +7231,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -8141,7 +8335,7 @@ checksum = "8902160c4e6f2fb145dbe9d6760a75e3c9522d8bf796ed7047c85919ac7115f8" dependencies = [ "arraydeque", "encoding_rs", - "hashlink", + "hashlink 0.8.4", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "crates/core", "crates/events", + "crates/event_store", "crates/events_codec", "crates/events_codec_wasm", "crates/events_indexed", @@ -60,6 +61,7 @@ readme = "README" [workspace.dependencies] radroots_core = { path = "crates/core", version = "0.1.0-alpha.2", default-features = false } radroots_events = { path = "crates/events", version = "0.1.0-alpha.2", default-features = false } +radroots_event_store = { path = "crates/event_store", version = "0.1.0-alpha.2", default-features = false } radroots_events_codec = { path = "crates/events_codec", version = "0.1.0-alpha.2", default-features = false } radroots_events_indexed = { path = "crates/events_indexed", version = "0.1.0-alpha.2", default-features = false } radroots_geocoder = { path = "crates/geocoder", version = "0.1.0-alpha.2" } @@ -138,6 +140,7 @@ serde = { version = "1", default-features = false, features = [ serde_json = { version = "1", default-features = false, features = ["alloc"] } serde-wasm-bindgen = { version = "0.6" } sha2 = { version = "0.10", default-features = false } +sqlx = { version = "0.8.6", default-features = false } sp1-build = { version = "6.2.3" } sp1-sdk = { version = "6.2.3", default-features = false } sp1-zkvm = { version = "6.2.3" } @@ -172,4 +175,4 @@ uniffi_build = { version = "=0.29.4" } wasm-bindgen = { version = "0.2" } wasm-bindgen-futures = { version = "0.4" } wasm-bindgen-test = { version = "0.3" } -rusqlite = { version = "0.39", default-features = false } +rusqlite = { version = "0.32.1", default-features = false } diff --git a/crates/event_store/Cargo.toml b/crates/event_store/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "radroots_event_store" +publish = false +version = "0.1.0-alpha.2" +edition.workspace = true +authors = ["Tyson Lupul <tyson@radroots.org>"] +rust-version.workspace = true +license.workspace = true +description = "SQLx-backed canonical Nostr event store substrate" +repository.workspace = true +homepage.workspace = true +documentation = "https://docs.rs/radroots_event_store" +readme = "README" + +[features] +default = ["sqlite", "runtime-tokio"] +sqlite = ["dep:sqlx", "sqlx/sqlite"] +runtime-tokio = ["sqlx/runtime-tokio"] + +[dependencies] +radroots_events = { workspace = true, default-features = false, features = [ + "std", + "serde", +] } +serde = { workspace = true, features = ["std"] } +serde_json = { workspace = true, features = ["std"] } +sqlx = { workspace = true, optional = true, features = ["derive"] } +thiserror = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt"] } diff --git a/crates/event_store/README b/crates/event_store/README @@ -0,0 +1,3 @@ +# radroots_event_store + +SQLx-backed canonical raw Nostr event storage for Rad Roots protocol events. diff --git a/crates/event_store/migrations/0001_event_store.down.sql b/crates/event_store/migrations/0001_event_store.down.sql @@ -0,0 +1,5 @@ +DROP TABLE projection_cursor; +DROP TABLE nostr_event_head; +DROP TABLE relay_event_seen; +DROP TABLE nostr_event_tag; +DROP TABLE nostr_event; diff --git a/crates/event_store/migrations/0001_event_store.up.sql b/crates/event_store/migrations/0001_event_store.up.sql @@ -0,0 +1,80 @@ +CREATE TABLE nostr_event ( + event_id TEXT PRIMARY KEY NOT NULL, + pubkey TEXT NOT NULL, + created_at INTEGER NOT NULL, + kind INTEGER NOT NULL, + tags_json TEXT NOT NULL, + content TEXT NOT NULL, + sig TEXT NOT NULL, + raw_json TEXT NOT NULL, + verification_status TEXT NOT NULL, + contract_status TEXT NOT NULL, + contract_id TEXT, + event_class TEXT, + projection_eligible INTEGER NOT NULL, + inserted_at_ms INTEGER NOT NULL, + updated_at_ms INTEGER NOT NULL +); + +CREATE INDEX nostr_event_kind_created_idx ON nostr_event(kind, created_at, event_id); +CREATE INDEX nostr_event_contract_idx ON nostr_event(contract_id, created_at, event_id); +CREATE INDEX nostr_event_projection_idx ON nostr_event(projection_eligible, created_at, event_id); + +CREATE TABLE nostr_event_tag ( + event_id TEXT NOT NULL REFERENCES nostr_event(event_id) ON DELETE CASCADE, + tag_index INTEGER NOT NULL, + tag_name TEXT NOT NULL, + tag_value TEXT, + tag_json TEXT NOT NULL, + contract_semantic TEXT, + contract_value_type TEXT, + relay_indexed INTEGER NOT NULL, + PRIMARY KEY (event_id, tag_index) +); + +CREATE INDEX nostr_event_tag_lookup_idx ON nostr_event_tag(tag_name, tag_value, event_id); +CREATE INDEX nostr_event_tag_relay_idx ON nostr_event_tag(relay_indexed, tag_name, tag_value, event_id); + +CREATE TABLE relay_event_seen ( + event_id TEXT NOT NULL REFERENCES nostr_event(event_id) ON DELETE CASCADE, + relay_url TEXT NOT NULL, + observation_type TEXT NOT NULL, + first_seen_at_ms INTEGER NOT NULL, + last_seen_at_ms INTEGER NOT NULL, + observation_count INTEGER NOT NULL, + last_message TEXT, + PRIMARY KEY (event_id, relay_url, observation_type) +); + +CREATE INDEX relay_event_seen_relay_idx ON relay_event_seen(relay_url, last_seen_at_ms, event_id); + +CREATE TABLE nostr_event_head ( + coordinate_type TEXT NOT NULL, + kind INTEGER NOT NULL, + pubkey TEXT NOT NULL, + d_tag TEXT, + event_id TEXT NOT NULL REFERENCES nostr_event(event_id) ON DELETE CASCADE, + created_at INTEGER NOT NULL, + updated_at_ms INTEGER NOT NULL, + CHECK ( + (coordinate_type = 'replaceable' AND d_tag IS NULL) + OR (coordinate_type = 'addressable' AND d_tag IS NOT NULL) + ) +); + +CREATE UNIQUE INDEX nostr_event_head_replaceable_idx +ON nostr_event_head(kind, pubkey) +WHERE coordinate_type = 'replaceable'; + +CREATE UNIQUE INDEX nostr_event_head_addressable_idx +ON nostr_event_head(kind, pubkey, d_tag) +WHERE coordinate_type = 'addressable'; + +CREATE INDEX nostr_event_head_event_idx ON nostr_event_head(event_id); + +CREATE TABLE projection_cursor ( + projection_id TEXT PRIMARY KEY NOT NULL, + last_event_id TEXT, + last_created_at INTEGER NOT NULL, + updated_at_ms INTEGER NOT NULL +); diff --git a/crates/event_store/src/error.rs b/crates/event_store/src/error.rs @@ -0,0 +1,23 @@ +use radroots_events::contract::RadrootsContractMatchError; +use radroots_events::event_head::RadrootsEventHeadMalformed; +use radroots_events::ids::RadrootsIdParseError; + +#[derive(Debug, thiserror::Error)] +pub enum RadrootsEventStoreError { + #[error("sqlx error: {0}")] + Sqlx(#[from] sqlx::Error), + #[error("json error: {0}")] + Json(#[from] serde_json::Error), + #[error("contract match error: {0:?}")] + ContractMatch(RadrootsContractMatchError), + #[error("event-head malformed: {0:?}")] + EventHeadMalformed(RadrootsEventHeadMalformed), + #[error("identifier parse error: {0}")] + IdParse(#[from] RadrootsIdParseError), + #[error("stored event `{0}` was not found")] + MissingEvent(String), + #[error("invalid stored enum value `{value}` for {field}")] + InvalidStoredEnum { field: &'static str, value: String }, + #[error("integer value `{value}` is outside {field} range")] + IntegerRange { field: &'static str, value: i64 }, +} diff --git a/crates/event_store/src/lib.rs b/crates/event_store/src/lib.rs @@ -0,0 +1,24 @@ +#![forbid(unsafe_code)] + +#[cfg(feature = "sqlite")] +mod error; +#[cfg(feature = "sqlite")] +mod migrations; +#[cfg(feature = "sqlite")] +mod model; +#[cfg(feature = "sqlite")] +mod store; + +#[cfg(feature = "sqlite")] +pub use error::RadrootsEventStoreError; +#[cfg(feature = "sqlite")] +pub use migrations::{EVENT_STORE_MIGRATION_DOWN, EVENT_STORE_MIGRATION_UP}; +#[cfg(feature = "sqlite")] +pub use model::{ + RadrootsEventContractStatus, RadrootsEventHeadStoreDecision, RadrootsEventIngest, + RadrootsEventIngestReceipt, RadrootsEventVerificationStatus, RadrootsProjectionCursor, + RadrootsRelayObservation, RadrootsRelayObservationType, RadrootsStoredEvent, + RadrootsStoredEventHead, RadrootsStoredEventTag, StoredEventClass, +}; +#[cfg(feature = "sqlite")] +pub use store::RadrootsEventStore; diff --git a/crates/event_store/src/migrations.rs b/crates/event_store/src/migrations.rs @@ -0,0 +1,3 @@ +pub const EVENT_STORE_MIGRATION_UP: &str = include_str!("../migrations/0001_event_store.up.sql"); +pub const EVENT_STORE_MIGRATION_DOWN: &str = + include_str!("../migrations/0001_event_store.down.sql"); diff --git a/crates/event_store/src/model.rs b/crates/event_store/src/model.rs @@ -0,0 +1,322 @@ +use crate::RadrootsEventStoreError; +use radroots_events::RadrootsNostrEvent; +use radroots_events::contract::{ + RadrootsContractMatchError, RadrootsEventClass, RadrootsTagSemantic, RadrootsTagValueType, +}; +use radroots_events::event_head::RadrootsEventHeadDecision; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum RadrootsEventVerificationStatus { + Verified, + Unverified, + Invalid, +} + +impl RadrootsEventVerificationStatus { + pub fn as_str(self) -> &'static str { + match self { + Self::Verified => "verified", + Self::Unverified => "unverified", + Self::Invalid => "invalid", + } + } + + pub fn parse(value: &str) -> Result<Self, RadrootsEventStoreError> { + match value { + "verified" => Ok(Self::Verified), + "unverified" => Ok(Self::Unverified), + "invalid" => Ok(Self::Invalid), + _ => Err(RadrootsEventStoreError::InvalidStoredEnum { + field: "verification_status", + value: value.to_owned(), + }), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum RadrootsEventContractStatus { + Supported, + UnsupportedKind(u32), + UnsupportedShape(u32), + AmbiguousShape(u32), +} + +impl RadrootsEventContractStatus { + pub fn as_str(&self) -> &'static str { + match self { + Self::Supported => "supported", + Self::UnsupportedKind(_) => "unsupported_kind", + Self::UnsupportedShape(_) => "unsupported_shape", + Self::AmbiguousShape(_) => "ambiguous_shape", + } + } + + pub fn from_match_error(error: RadrootsContractMatchError) -> Self { + match error { + RadrootsContractMatchError::UnsupportedKind(kind) => Self::UnsupportedKind(kind), + RadrootsContractMatchError::UnsupportedShape(kind) => Self::UnsupportedShape(kind), + RadrootsContractMatchError::AmbiguousShape(kind) => Self::AmbiguousShape(kind), + } + } + + pub fn parse(value: &str, kind: u32) -> Result<Self, RadrootsEventStoreError> { + match value { + "supported" => Ok(Self::Supported), + "unsupported_kind" => Ok(Self::UnsupportedKind(kind)), + "unsupported_shape" => Ok(Self::UnsupportedShape(kind)), + "ambiguous_shape" => Ok(Self::AmbiguousShape(kind)), + _ => Err(RadrootsEventStoreError::InvalidStoredEnum { + field: "contract_status", + value: value.to_owned(), + }), + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum StoredEventClass { + Regular, + Replaceable, + Addressable, + Ephemeral, +} + +impl StoredEventClass { + pub fn as_str(self) -> &'static str { + match self { + Self::Regular => "regular", + Self::Replaceable => "replaceable", + Self::Addressable => "addressable", + Self::Ephemeral => "ephemeral", + } + } + + pub fn from_event_class(value: RadrootsEventClass) -> Self { + match value { + RadrootsEventClass::Regular => Self::Regular, + RadrootsEventClass::Replaceable => Self::Replaceable, + RadrootsEventClass::Addressable => Self::Addressable, + RadrootsEventClass::Ephemeral => Self::Ephemeral, + } + } + + pub fn parse(value: &str) -> Result<Self, RadrootsEventStoreError> { + match value { + "regular" => Ok(Self::Regular), + "replaceable" => Ok(Self::Replaceable), + "addressable" => Ok(Self::Addressable), + "ephemeral" => Ok(Self::Ephemeral), + _ => Err(RadrootsEventStoreError::InvalidStoredEnum { + field: "event_class", + value: value.to_owned(), + }), + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum RadrootsRelayObservationType { + Fetch, + Subscription, + PublishAck, + Import, +} + +impl RadrootsRelayObservationType { + pub fn as_str(self) -> &'static str { + match self { + Self::Fetch => "fetch", + Self::Subscription => "subscription", + Self::PublishAck => "publish_ack", + Self::Import => "import", + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsRelayObservation { + pub relay_url: String, + pub observation_type: RadrootsRelayObservationType, + pub observed_at_ms: i64, + pub message: Option<String>, +} + +impl RadrootsRelayObservation { + pub fn new( + relay_url: impl Into<String>, + observation_type: RadrootsRelayObservationType, + observed_at_ms: i64, + ) -> Self { + Self { + relay_url: relay_url.into(), + observation_type, + observed_at_ms, + message: None, + } + } + + pub fn with_message(mut self, message: impl Into<String>) -> Self { + self.message = Some(message.into()); + self + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsEventIngest { + pub event: RadrootsNostrEvent, + pub raw_json: Option<String>, + pub verification_status: RadrootsEventVerificationStatus, + pub observed_at_ms: i64, + pub relay_observation: Option<RadrootsRelayObservation>, +} + +impl RadrootsEventIngest { + pub fn verified(event: RadrootsNostrEvent, observed_at_ms: i64) -> Self { + Self { + event, + raw_json: None, + verification_status: RadrootsEventVerificationStatus::Verified, + observed_at_ms, + relay_observation: None, + } + } + + pub fn with_raw_json(mut self, raw_json: impl Into<String>) -> Self { + self.raw_json = Some(raw_json.into()); + self + } + + pub fn with_observation(mut self, observation: RadrootsRelayObservation) -> Self { + self.relay_observation = Some(observation); + self + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum RadrootsEventHeadStoreDecision { + Applied, + NotHeadSelected, + NotPersisted, + SkippedDuplicate, + SkippedOlder, + SkippedSameTimestampHigherEventId, + Malformed, + Unsupported, +} + +impl RadrootsEventHeadStoreDecision { + pub fn from_protocol(value: &RadrootsEventHeadDecision) -> Self { + match value { + RadrootsEventHeadDecision::Applied(_) => Self::Applied, + RadrootsEventHeadDecision::SkippedDuplicate => Self::SkippedDuplicate, + RadrootsEventHeadDecision::SkippedOlder => Self::SkippedOlder, + RadrootsEventHeadDecision::SkippedSameTimestampHigherEventId => { + Self::SkippedSameTimestampHigherEventId + } + RadrootsEventHeadDecision::CoordinateMismatch => Self::Malformed, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsEventIngestReceipt { + pub event_id: String, + pub inserted: bool, + pub verification_status: RadrootsEventVerificationStatus, + pub contract_status: RadrootsEventContractStatus, + pub contract_id: Option<String>, + pub projection_eligible: bool, + pub head_decision: RadrootsEventHeadStoreDecision, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsStoredEvent { + pub event_id: String, + pub pubkey: String, + pub created_at: u32, + pub kind: u32, + pub tags_json: String, + pub content: String, + pub sig: String, + pub raw_json: String, + pub verification_status: RadrootsEventVerificationStatus, + pub contract_status: RadrootsEventContractStatus, + pub contract_id: Option<String>, + pub event_class: Option<StoredEventClass>, + pub projection_eligible: bool, + pub inserted_at_ms: i64, + pub updated_at_ms: i64, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsStoredEventTag { + pub event_id: String, + pub tag_index: u32, + pub tag_name: String, + pub tag_value: Option<String>, + pub tag_json: String, + pub contract_semantic: Option<String>, + pub contract_value_type: Option<String>, + pub relay_indexed: bool, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsStoredEventHead { + pub coordinate_type: StoredEventClass, + pub kind: u32, + pub pubkey: String, + pub d_tag: Option<String>, + pub event_id: String, + pub created_at: u32, + pub updated_at_ms: i64, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsProjectionCursor { + pub projection_id: String, + pub last_event_id: Option<String>, + pub last_created_at: u32, + pub updated_at_ms: i64, +} + +pub fn tag_semantic_name(value: RadrootsTagSemantic) -> &'static str { + match value { + RadrootsTagSemantic::AddressableCoordinate => "addressable_coordinate", + RadrootsTagSemantic::Category => "category", + RadrootsTagSemantic::Counterparty => "counterparty", + RadrootsTagSemantic::EventPointer => "event_pointer", + RadrootsTagSemantic::GroupId => "group_id", + RadrootsTagSemantic::Identifier => "identifier", + RadrootsTagSemantic::Image => "image", + RadrootsTagSemantic::Kind => "kind", + RadrootsTagSemantic::ListingAddress => "listing_address", + RadrootsTagSemantic::ListingSnapshot => "listing_snapshot", + RadrootsTagSemantic::Location => "location", + RadrootsTagSemantic::PreviousEvent => "previous_event", + RadrootsTagSemantic::Price => "price", + RadrootsTagSemantic::PublishedAt => "published_at", + RadrootsTagSemantic::Relay => "relay", + RadrootsTagSemantic::RootEvent => "root_event", + RadrootsTagSemantic::ServiceInput => "service_input", + RadrootsTagSemantic::ServiceOutput => "service_output", + RadrootsTagSemantic::Status => "status", + RadrootsTagSemantic::Summary => "summary", + RadrootsTagSemantic::Title => "title", + RadrootsTagSemantic::Url => "url", + } +} + +pub fn tag_value_type_name(value: RadrootsTagValueType) -> &'static str { + match value { + RadrootsTagValueType::AddressableCoordinate => "addressable_coordinate", + RadrootsTagValueType::DTag => "d_tag", + RadrootsTagValueType::EventId => "event_id", + RadrootsTagValueType::Kind => "kind", + RadrootsTagValueType::PublicKey => "public_key", + RadrootsTagValueType::RelayUrl => "relay_url", + RadrootsTagValueType::Text => "text", + RadrootsTagValueType::UnixTimestamp => "unix_timestamp", + RadrootsTagValueType::Url => "url", + } +} diff --git a/crates/event_store/src/store.rs b/crates/event_store/src/store.rs @@ -0,0 +1,944 @@ +use crate::RadrootsEventStoreError; +use crate::migrations::{EVENT_STORE_MIGRATION_DOWN, EVENT_STORE_MIGRATION_UP}; +use crate::model::{ + RadrootsEventContractStatus, RadrootsEventHeadStoreDecision, RadrootsEventIngest, + RadrootsEventIngestReceipt, RadrootsEventVerificationStatus, RadrootsProjectionCursor, + RadrootsRelayObservation, RadrootsStoredEvent, RadrootsStoredEventHead, RadrootsStoredEventTag, + StoredEventClass, tag_semantic_name, tag_value_type_name, +}; +use radroots_events::RadrootsNostrEvent; +use radroots_events::contract::{ + RadrootsEventClass, RadrootsEventContract, identify_event_contract, +}; +use radroots_events::event_head::{ + RadrootsCurrentEventHead, RadrootsEventHeadCandidate, RadrootsEventHeadCandidateResult, + RadrootsEventHeadCoordinate, RadrootsEventHeadDecision, event_head_candidate_for_contract, + select_event_head, +}; +use radroots_events::ids::{RadrootsEventId, RadrootsEventSignature, RadrootsPublicKey}; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; +use sqlx::{Row, SqlitePool}; +use std::path::Path; +use std::str::FromStr; + +#[derive(Clone)] +pub struct RadrootsEventStore { + pool: SqlitePool, +} + +impl RadrootsEventStore { + pub async fn open_memory() -> Result<Self, RadrootsEventStoreError> { + let options = SqliteConnectOptions::from_str("sqlite::memory:")?; + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect_with(options) + .await?; + configure_connection(&pool, false).await?; + apply_up(&pool).await?; + Ok(Self { pool }) + } + + pub async fn open_file(path: impl AsRef<Path>) -> Result<Self, RadrootsEventStoreError> { + let options = SqliteConnectOptions::new() + .filename(path) + .create_if_missing(true); + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect_with(options) + .await?; + configure_connection(&pool, true).await?; + apply_up(&pool).await?; + Ok(Self { pool }) + } + + pub fn pool(&self) -> &SqlitePool { + &self.pool + } + + pub async fn migrate_down(&self) -> Result<(), RadrootsEventStoreError> { + apply_down(&self.pool).await + } + + pub async fn pragma_foreign_keys(&self) -> Result<i64, RadrootsEventStoreError> { + query_i64(&self.pool, "PRAGMA foreign_keys").await + } + + pub async fn pragma_busy_timeout(&self) -> Result<i64, RadrootsEventStoreError> { + query_i64(&self.pool, "PRAGMA busy_timeout").await + } + + pub async fn pragma_journal_mode(&self) -> Result<String, RadrootsEventStoreError> { + query_string(&self.pool, "PRAGMA journal_mode").await + } + + pub async fn ingest_event( + &self, + ingest: RadrootsEventIngest, + ) -> Result<RadrootsEventIngestReceipt, RadrootsEventStoreError> { + validate_event_identity(&ingest.event)?; + let classification = classify_event(&ingest.event); + let raw_json = ingest + .raw_json + .clone() + .map(Ok) + .unwrap_or_else(|| serde_json::to_string(&ingest.event))?; + let tags_json = serde_json::to_string(&ingest.event.tags)?; + let mut tx = self.pool.begin().await?; + let inserted = insert_raw_event( + &mut tx, + &ingest, + &classification, + raw_json.as_str(), + tags_json.as_str(), + ) + .await? + > 0; + let mut head_decision = RadrootsEventHeadStoreDecision::Unsupported; + let mut projection_eligible = + classification.base_projection_eligible(ingest.verification_status); + + if inserted { + insert_tags(&mut tx, &ingest.event, classification.contract).await?; + if let Some(contract) = classification.contract { + let head = + apply_event_head(&mut tx, &ingest.event, contract, ingest.observed_at_ms) + .await?; + projection_eligible = projection_eligible && head.projection_eligible; + head_decision = head.decision; + sqlx::query( + "UPDATE nostr_event SET projection_eligible = ?, updated_at_ms = ? WHERE event_id = ?", + ) + .bind(bool_i64(projection_eligible)) + .bind(ingest.observed_at_ms) + .bind(ingest.event.id.as_str()) + .execute(&mut *tx) + .await?; + } + } else if classification.contract.is_some() { + head_decision = RadrootsEventHeadStoreDecision::SkippedDuplicate; + projection_eligible = false; + } + + if let Some(observation) = ingest.relay_observation.as_ref() { + upsert_observation(&mut tx, ingest.event.id.as_str(), observation).await?; + } + + tx.commit().await?; + + Ok(RadrootsEventIngestReceipt { + event_id: ingest.event.id, + inserted, + verification_status: ingest.verification_status, + contract_status: classification.contract_status, + contract_id: classification + .contract + .map(|contract| contract.id.to_owned()), + projection_eligible, + head_decision, + }) + } + + pub async fn get_event( + &self, + event_id: &str, + ) -> Result<Option<RadrootsStoredEvent>, RadrootsEventStoreError> { + let row = sqlx::query( + "SELECT event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event WHERE event_id = ?", + ) + .bind(event_id) + .fetch_optional(&self.pool) + .await?; + row.map(stored_event_from_row).transpose() + } + + pub async fn tags_for_event( + &self, + event_id: &str, + ) -> Result<Vec<RadrootsStoredEventTag>, RadrootsEventStoreError> { + let rows = sqlx::query( + "SELECT event_id, tag_index, tag_name, tag_value, tag_json, contract_semantic, contract_value_type, relay_indexed FROM nostr_event_tag WHERE event_id = ? ORDER BY tag_index", + ) + .bind(event_id) + .fetch_all(&self.pool) + .await?; + rows.into_iter().map(stored_tag_from_row).collect() + } + + pub async fn observations_for_event( + &self, + event_id: &str, + ) -> Result<Vec<RadrootsRelayObservationRow>, RadrootsEventStoreError> { + let rows = sqlx::query( + "SELECT event_id, relay_url, observation_type, first_seen_at_ms, last_seen_at_ms, observation_count, last_message FROM relay_event_seen WHERE event_id = ? ORDER BY relay_url, observation_type", + ) + .bind(event_id) + .fetch_all(&self.pool) + .await?; + rows.into_iter().map(relay_observation_from_row).collect() + } + + pub async fn event_head( + &self, + coordinate: &RadrootsEventHeadCoordinate, + ) -> Result<Option<RadrootsStoredEventHead>, RadrootsEventStoreError> { + let row = match coordinate { + RadrootsEventHeadCoordinate::Replaceable { kind, pubkey } => { + sqlx::query( + "SELECT coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms FROM nostr_event_head WHERE coordinate_type = 'replaceable' AND kind = ? AND pubkey = ? AND d_tag IS NULL", + ) + .bind(i64::from(*kind)) + .bind(pubkey.as_str()) + .fetch_optional(&self.pool) + .await? + } + RadrootsEventHeadCoordinate::Addressable { + kind, + pubkey, + d_tag, + } => { + sqlx::query( + "SELECT coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms FROM nostr_event_head WHERE coordinate_type = 'addressable' AND kind = ? AND pubkey = ? AND d_tag = ?", + ) + .bind(i64::from(*kind)) + .bind(pubkey.as_str()) + .bind(d_tag.as_str()) + .fetch_optional(&self.pool) + .await? + } + }; + row.map(stored_head_from_row).transpose() + } + + pub async fn get_projection_cursor( + &self, + projection_id: &str, + ) -> Result<Option<RadrootsProjectionCursor>, RadrootsEventStoreError> { + let row = sqlx::query( + "SELECT projection_id, last_event_id, last_created_at, updated_at_ms FROM projection_cursor WHERE projection_id = ?", + ) + .bind(projection_id) + .fetch_optional(&self.pool) + .await?; + row.map(projection_cursor_from_row).transpose() + } + + pub async fn update_projection_cursor( + &self, + cursor: &RadrootsProjectionCursor, + ) -> Result<(), RadrootsEventStoreError> { + sqlx::query( + "INSERT INTO projection_cursor(projection_id, last_event_id, last_created_at, updated_at_ms) VALUES (?, ?, ?, ?) ON CONFLICT(projection_id) DO UPDATE SET last_event_id = excluded.last_event_id, last_created_at = excluded.last_created_at, updated_at_ms = excluded.updated_at_ms", + ) + .bind(cursor.projection_id.as_str()) + .bind(cursor.last_event_id.as_deref()) + .bind(i64::from(cursor.last_created_at)) + .bind(cursor.updated_at_ms) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn events_since_cursor( + &self, + projection_id: &str, + limit: u32, + ) -> Result<Vec<RadrootsStoredEvent>, RadrootsEventStoreError> { + let cursor = self.get_projection_cursor(projection_id).await?; + let last_created_at = cursor + .as_ref() + .map(|cursor| cursor.last_created_at) + .unwrap_or(0); + let last_event_id = cursor + .as_ref() + .and_then(|cursor| cursor.last_event_id.as_deref()) + .unwrap_or(""); + let rows = sqlx::query( + "SELECT event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event WHERE projection_eligible = 1 AND (created_at > ? OR (created_at = ? AND event_id > ?)) ORDER BY created_at, event_id LIMIT ?", + ) + .bind(i64::from(last_created_at)) + .bind(i64::from(last_created_at)) + .bind(last_event_id) + .bind(i64::from(limit)) + .fetch_all(&self.pool) + .await?; + rows.into_iter().map(stored_event_from_row).collect() + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsRelayObservationRow { + pub event_id: String, + pub relay_url: String, + pub observation_type: String, + pub first_seen_at_ms: i64, + pub last_seen_at_ms: i64, + pub observation_count: i64, + pub last_message: Option<String>, +} + +struct EventClassification { + contract_status: RadrootsEventContractStatus, + contract: Option<&'static RadrootsEventContract>, +} + +impl EventClassification { + fn base_projection_eligible(&self, verification: RadrootsEventVerificationStatus) -> bool { + verification == RadrootsEventVerificationStatus::Verified + && self + .contract + .map(|contract| contract.class != RadrootsEventClass::Ephemeral) + .unwrap_or(false) + } +} + +struct AppliedHead { + decision: RadrootsEventHeadStoreDecision, + projection_eligible: bool, +} + +async fn configure_connection( + pool: &SqlitePool, + file_backed: bool, +) -> Result<(), RadrootsEventStoreError> { + sqlx::query("PRAGMA foreign_keys = ON") + .execute(pool) + .await?; + sqlx::query("PRAGMA busy_timeout = 5000") + .execute(pool) + .await?; + if file_backed { + sqlx::query("PRAGMA journal_mode = WAL") + .execute(pool) + .await?; + } + Ok(()) +} + +async fn apply_up(pool: &SqlitePool) -> Result<(), RadrootsEventStoreError> { + sqlx::raw_sql(EVENT_STORE_MIGRATION_UP) + .execute(pool) + .await?; + Ok(()) +} + +async fn apply_down(pool: &SqlitePool) -> Result<(), RadrootsEventStoreError> { + sqlx::raw_sql(EVENT_STORE_MIGRATION_DOWN) + .execute(pool) + .await?; + Ok(()) +} + +async fn query_i64(pool: &SqlitePool, sql: &str) -> Result<i64, RadrootsEventStoreError> { + let row = sqlx::query(sql).fetch_one(pool).await?; + Ok(row.try_get(0)?) +} + +async fn query_string(pool: &SqlitePool, sql: &str) -> Result<String, RadrootsEventStoreError> { + let row = sqlx::query(sql).fetch_one(pool).await?; + Ok(row.try_get(0)?) +} + +fn validate_event_identity(event: &RadrootsNostrEvent) -> Result<(), RadrootsEventStoreError> { + RadrootsEventId::parse(event.id.as_str())?; + RadrootsPublicKey::parse(event.author.as_str())?; + RadrootsEventSignature::parse(event.sig.as_str())?; + Ok(()) +} + +fn classify_event(event: &RadrootsNostrEvent) -> EventClassification { + match identify_event_contract(event.kind, &event.tags, &event.content) { + Ok(contract) => EventClassification { + contract_status: RadrootsEventContractStatus::Supported, + contract: Some(contract), + }, + Err(error) => EventClassification { + contract_status: RadrootsEventContractStatus::from_match_error(error), + contract: None, + }, + } +} + +async fn insert_raw_event( + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + ingest: &RadrootsEventIngest, + classification: &EventClassification, + raw_json: &str, + tags_json: &str, +) -> Result<u64, RadrootsEventStoreError> { + let event = &ingest.event; + let contract_id = classification.contract.map(|contract| contract.id); + let event_class = classification + .contract + .map(|contract| StoredEventClass::from_event_class(contract.class).as_str()); + let projection_eligible = classification.base_projection_eligible(ingest.verification_status); + let result = sqlx::query( + "INSERT OR IGNORE INTO nostr_event(event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(event.id.as_str()) + .bind(event.author.as_str()) + .bind(i64::from(event.created_at)) + .bind(i64::from(event.kind)) + .bind(tags_json) + .bind(event.content.as_str()) + .bind(event.sig.as_str()) + .bind(raw_json) + .bind(ingest.verification_status.as_str()) + .bind(classification.contract_status.as_str()) + .bind(contract_id) + .bind(event_class) + .bind(bool_i64(projection_eligible)) + .bind(ingest.observed_at_ms) + .bind(ingest.observed_at_ms) + .execute(&mut **tx) + .await?; + Ok(result.rows_affected()) +} + +async fn insert_tags( + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + event: &RadrootsNostrEvent, + contract: Option<&'static RadrootsEventContract>, +) -> Result<(), RadrootsEventStoreError> { + for (index, tag) in event.tags.iter().enumerate() { + let tag_name = tag.first().map(String::as_str).unwrap_or(""); + let tag_value = tag.get(1).map(String::as_str); + let tag_json = serde_json::to_string(tag)?; + let tag_contract = contract.and_then(|contract| { + contract + .tags + .iter() + .find(|candidate| candidate.name == tag_name) + }); + let contract_semantic = tag_contract.map(|tag| tag_semantic_name(tag.semantic)); + let contract_value_type = tag_contract.map(|tag| tag_value_type_name(tag.value_type)); + let relay_indexed = tag_contract.map(|tag| tag.relay_indexed).unwrap_or(false); + sqlx::query( + "INSERT INTO nostr_event_tag(event_id, tag_index, tag_name, tag_value, tag_json, contract_semantic, contract_value_type, relay_indexed) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(event.id.as_str()) + .bind(i64::try_from(index).map_err(|_| RadrootsEventStoreError::IntegerRange { + field: "tag_index", + value: i64::MAX, + })?) + .bind(tag_name) + .bind(tag_value) + .bind(tag_json.as_str()) + .bind(contract_semantic) + .bind(contract_value_type) + .bind(bool_i64(relay_indexed)) + .execute(&mut **tx) + .await?; + } + Ok(()) +} + +async fn upsert_observation( + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + event_id: &str, + observation: &RadrootsRelayObservation, +) -> Result<(), RadrootsEventStoreError> { + sqlx::query( + "INSERT INTO relay_event_seen(event_id, relay_url, observation_type, first_seen_at_ms, last_seen_at_ms, observation_count, last_message) VALUES (?, ?, ?, ?, ?, 1, ?) ON CONFLICT(event_id, relay_url, observation_type) DO UPDATE SET last_seen_at_ms = excluded.last_seen_at_ms, observation_count = relay_event_seen.observation_count + 1, last_message = excluded.last_message", + ) + .bind(event_id) + .bind(observation.relay_url.as_str()) + .bind(observation.observation_type.as_str()) + .bind(observation.observed_at_ms) + .bind(observation.observed_at_ms) + .bind(observation.message.as_deref()) + .execute(&mut **tx) + .await?; + Ok(()) +} + +async fn apply_event_head( + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + event: &RadrootsNostrEvent, + contract: &RadrootsEventContract, + updated_at_ms: i64, +) -> Result<AppliedHead, RadrootsEventStoreError> { + let candidate = match event_head_candidate_for_contract(event, contract) { + RadrootsEventHeadCandidateResult::Candidate(candidate) => candidate, + RadrootsEventHeadCandidateResult::NotHeadSelected => { + return Ok(AppliedHead { + decision: RadrootsEventHeadStoreDecision::NotHeadSelected, + projection_eligible: true, + }); + } + RadrootsEventHeadCandidateResult::NotPersisted => { + return Ok(AppliedHead { + decision: RadrootsEventHeadStoreDecision::NotPersisted, + projection_eligible: false, + }); + } + RadrootsEventHeadCandidateResult::Malformed(_) => { + return Ok(AppliedHead { + decision: RadrootsEventHeadStoreDecision::Malformed, + projection_eligible: false, + }); + } + }; + let current = current_event_head(tx, &candidate.coordinate).await?; + let protocol_decision = select_event_head(candidate.clone(), current.as_ref()); + if let RadrootsEventHeadDecision::Applied(head) = &protocol_decision { + upsert_head(tx, &candidate, head, updated_at_ms).await?; + } + let projection_eligible = matches!(protocol_decision, RadrootsEventHeadDecision::Applied(_)); + Ok(AppliedHead { + decision: RadrootsEventHeadStoreDecision::from_protocol(&protocol_decision), + projection_eligible, + }) +} + +async fn current_event_head( + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + coordinate: &RadrootsEventHeadCoordinate, +) -> Result<Option<RadrootsCurrentEventHead>, RadrootsEventStoreError> { + let row = match coordinate { + RadrootsEventHeadCoordinate::Replaceable { kind, pubkey } => { + sqlx::query( + "SELECT event_id, created_at FROM nostr_event_head WHERE coordinate_type = 'replaceable' AND kind = ? AND pubkey = ? AND d_tag IS NULL", + ) + .bind(i64::from(*kind)) + .bind(pubkey.as_str()) + .fetch_optional(&mut **tx) + .await? + } + RadrootsEventHeadCoordinate::Addressable { + kind, + pubkey, + d_tag, + } => { + sqlx::query( + "SELECT event_id, created_at FROM nostr_event_head WHERE coordinate_type = 'addressable' AND kind = ? AND pubkey = ? AND d_tag = ?", + ) + .bind(i64::from(*kind)) + .bind(pubkey.as_str()) + .bind(d_tag.as_str()) + .fetch_optional(&mut **tx) + .await? + } + }; + row.map(|row| { + let event_id: String = row.try_get("event_id")?; + let created_at: i64 = row.try_get("created_at")?; + Ok(RadrootsCurrentEventHead { + coordinate: coordinate.clone(), + event_id: RadrootsEventId::parse(event_id)?, + created_at: u32_from_i64("created_at", created_at)?, + }) + }) + .transpose() +} + +async fn upsert_head( + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, + candidate: &RadrootsEventHeadCandidate, + head: &RadrootsCurrentEventHead, + updated_at_ms: i64, +) -> Result<(), RadrootsEventStoreError> { + match &head.coordinate { + RadrootsEventHeadCoordinate::Replaceable { kind, pubkey } => { + sqlx::query( + "DELETE FROM nostr_event_head WHERE coordinate_type = 'replaceable' AND kind = ? AND pubkey = ? AND d_tag IS NULL", + ) + .bind(i64::from(*kind)) + .bind(pubkey.as_str()) + .execute(&mut **tx) + .await?; + sqlx::query( + "INSERT INTO nostr_event_head(coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms) VALUES ('replaceable', ?, ?, NULL, ?, ?, ?)", + ) + .bind(i64::from(*kind)) + .bind(pubkey.as_str()) + .bind(candidate.event_id.as_str()) + .bind(i64::from(candidate.created_at)) + .bind(updated_at_ms) + .execute(&mut **tx) + .await?; + } + RadrootsEventHeadCoordinate::Addressable { + kind, + pubkey, + d_tag, + } => { + sqlx::query( + "DELETE FROM nostr_event_head WHERE coordinate_type = 'addressable' AND kind = ? AND pubkey = ? AND d_tag = ?", + ) + .bind(i64::from(*kind)) + .bind(pubkey.as_str()) + .bind(d_tag.as_str()) + .execute(&mut **tx) + .await?; + sqlx::query( + "INSERT INTO nostr_event_head(coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms) VALUES ('addressable', ?, ?, ?, ?, ?, ?)", + ) + .bind(i64::from(*kind)) + .bind(pubkey.as_str()) + .bind(d_tag.as_str()) + .bind(candidate.event_id.as_str()) + .bind(i64::from(candidate.created_at)) + .bind(updated_at_ms) + .execute(&mut **tx) + .await?; + } + } + Ok(()) +} + +fn stored_event_from_row( + row: sqlx::sqlite::SqliteRow, +) -> Result<RadrootsStoredEvent, RadrootsEventStoreError> { + let kind = u32_from_i64("kind", row.try_get("kind")?)?; + let created_at = u32_from_i64("created_at", row.try_get("created_at")?)?; + let verification_status = + RadrootsEventVerificationStatus::parse(row.try_get("verification_status")?)?; + let contract_status = + RadrootsEventContractStatus::parse(row.try_get("contract_status")?, kind)?; + let event_class = row + .try_get::<Option<String>, _>("event_class")? + .map(|value| StoredEventClass::parse(value.as_str())) + .transpose()?; + let projection_eligible = row.try_get::<i64, _>("projection_eligible")? != 0; + Ok(RadrootsStoredEvent { + event_id: row.try_get("event_id")?, + pubkey: row.try_get("pubkey")?, + created_at, + kind, + tags_json: row.try_get("tags_json")?, + content: row.try_get("content")?, + sig: row.try_get("sig")?, + raw_json: row.try_get("raw_json")?, + verification_status, + contract_status, + contract_id: row.try_get("contract_id")?, + event_class, + projection_eligible, + inserted_at_ms: row.try_get("inserted_at_ms")?, + updated_at_ms: row.try_get("updated_at_ms")?, + }) +} + +fn stored_tag_from_row( + row: sqlx::sqlite::SqliteRow, +) -> Result<RadrootsStoredEventTag, RadrootsEventStoreError> { + Ok(RadrootsStoredEventTag { + event_id: row.try_get("event_id")?, + tag_index: u32_from_i64("tag_index", row.try_get("tag_index")?)?, + tag_name: row.try_get("tag_name")?, + tag_value: row.try_get("tag_value")?, + tag_json: row.try_get("tag_json")?, + contract_semantic: row.try_get("contract_semantic")?, + contract_value_type: row.try_get("contract_value_type")?, + relay_indexed: row.try_get::<i64, _>("relay_indexed")? != 0, + }) +} + +fn stored_head_from_row( + row: sqlx::sqlite::SqliteRow, +) -> Result<RadrootsStoredEventHead, RadrootsEventStoreError> { + Ok(RadrootsStoredEventHead { + coordinate_type: StoredEventClass::parse(row.try_get("coordinate_type")?)?, + kind: u32_from_i64("kind", row.try_get("kind")?)?, + pubkey: row.try_get("pubkey")?, + d_tag: row.try_get("d_tag")?, + event_id: row.try_get("event_id")?, + created_at: u32_from_i64("created_at", row.try_get("created_at")?)?, + updated_at_ms: row.try_get("updated_at_ms")?, + }) +} + +fn projection_cursor_from_row( + row: sqlx::sqlite::SqliteRow, +) -> Result<RadrootsProjectionCursor, RadrootsEventStoreError> { + Ok(RadrootsProjectionCursor { + projection_id: row.try_get("projection_id")?, + last_event_id: row.try_get("last_event_id")?, + last_created_at: u32_from_i64("last_created_at", row.try_get("last_created_at")?)?, + updated_at_ms: row.try_get("updated_at_ms")?, + }) +} + +fn relay_observation_from_row( + row: sqlx::sqlite::SqliteRow, +) -> Result<RadrootsRelayObservationRow, RadrootsEventStoreError> { + Ok(RadrootsRelayObservationRow { + event_id: row.try_get("event_id")?, + relay_url: row.try_get("relay_url")?, + observation_type: row.try_get("observation_type")?, + first_seen_at_ms: row.try_get("first_seen_at_ms")?, + last_seen_at_ms: row.try_get("last_seen_at_ms")?, + observation_count: row.try_get("observation_count")?, + last_message: row.try_get("last_message")?, + }) +} + +fn u32_from_i64(field: &'static str, value: i64) -> Result<u32, RadrootsEventStoreError> { + u32::try_from(value).map_err(|_| RadrootsEventStoreError::IntegerRange { field, value }) +} + +fn bool_i64(value: bool) -> i64 { + if value { 1 } else { 0 } +} + +#[cfg(test)] +mod tests { + use super::*; + use radroots_events::event_head::event_head_candidate_for_event; + use radroots_events::kinds::{KIND_POST, KIND_PROFILE}; + + fn hex_64(character: char) -> String { + core::iter::repeat_n(character, 64).collect() + } + + fn sig() -> String { + "f".repeat(128) + } + + fn event( + kind: u32, + id: char, + author: char, + created_at: u32, + tags: Vec<Vec<String>>, + content: &str, + ) -> RadrootsNostrEvent { + RadrootsNostrEvent { + id: hex_64(id), + author: hex_64(author), + created_at, + kind, + tags, + content: content.to_owned(), + sig: sig(), + } + } + + #[tokio::test] + async fn constructor_enforces_sqlite_pragmas() { + let store = RadrootsEventStore::open_memory().await.expect("open"); + + assert_eq!(store.pragma_foreign_keys().await.expect("foreign_keys"), 1); + assert_eq!( + store.pragma_busy_timeout().await.expect("busy_timeout"), + 5000 + ); + } + + #[tokio::test] + async fn migration_can_run_down() { + let store = RadrootsEventStore::open_memory().await.expect("open"); + store.migrate_down().await.expect("down"); + + let missing = match sqlx::query("SELECT COUNT(*) FROM nostr_event") + .fetch_one(store.pool()) + .await + { + Ok(_) => panic!("table should be removed"), + Err(error) => error, + }; + assert!(missing.to_string().contains("nostr_event")); + } + + #[tokio::test] + async fn ingest_retains_raw_event_and_ignores_duplicate_rows() { + let store = RadrootsEventStore::open_memory().await.expect("open"); + let event = event( + KIND_POST, + '1', + '2', + 10, + vec![vec!["t".to_owned(), "soil".to_owned()]], + "hello", + ); + let ingest = + RadrootsEventIngest::verified(event.clone(), 1_000).with_raw_json("{\"fixture\":true}"); + + let first = store + .ingest_event(ingest.clone()) + .await + .expect("first ingest"); + let second = store.ingest_event(ingest).await.expect("second ingest"); + let stored = store + .get_event(event.id.as_str()) + .await + .expect("get") + .expect("stored"); + + assert!(first.inserted); + assert!(!second.inserted); + assert_eq!(stored.raw_json, "{\"fixture\":true}"); + assert_eq!(stored.content, "hello"); + assert_eq!(stored.tags_json, "[[\"t\",\"soil\"]]"); + assert_eq!( + stored.contract_status, + RadrootsEventContractStatus::Supported + ); + assert!(stored.projection_eligible); + assert_eq!( + store + .tags_for_event(event.id.as_str()) + .await + .expect("tags") + .len(), + 1 + ); + } + + #[tokio::test] + async fn unsupported_and_invalid_events_are_stored_but_not_projected() { + let store = RadrootsEventStore::open_memory().await.expect("open"); + let mut ingest = + RadrootsEventIngest::verified(event(999_999, '3', '4', 11, Vec::new(), ""), 2_000); + ingest.verification_status = RadrootsEventVerificationStatus::Invalid; + let receipt = store.ingest_event(ingest).await.expect("ingest"); + let stored = store + .get_event(hex_64('3').as_str()) + .await + .expect("get") + .expect("stored"); + + assert_eq!( + receipt.contract_status, + RadrootsEventContractStatus::UnsupportedKind(999_999) + ); + assert_eq!( + stored.verification_status, + RadrootsEventVerificationStatus::Invalid + ); + assert!(!stored.projection_eligible); + } + + #[tokio::test] + async fn tag_rows_preserve_order_and_contract_metadata() { + let store = RadrootsEventStore::open_memory().await.expect("open"); + let event = event( + KIND_PROFILE, + '5', + '6', + 12, + vec![ + vec!["p".to_owned(), hex_64('7')], + vec!["t".to_owned(), "harvest".to_owned()], + ], + "{}", + ); + + store + .ingest_event(RadrootsEventIngest::verified(event.clone(), 3_000)) + .await + .expect("ingest"); + let tags = store.tags_for_event(event.id.as_str()).await.expect("tags"); + + assert_eq!(tags[0].tag_index, 0); + assert_eq!(tags[0].tag_name, "p"); + assert_eq!(tags[0].contract_value_type.as_deref(), Some("public_key")); + assert!(tags[0].relay_indexed); + assert_eq!(tags[1].tag_index, 1); + assert_eq!(tags[1].tag_json, "[\"t\",\"harvest\"]"); + } + + #[tokio::test] + async fn relay_observations_upsert_separately_from_event_identity() { + let store = RadrootsEventStore::open_memory().await.expect("open"); + let event = event(KIND_POST, '8', '9', 13, Vec::new(), "hello"); + let observation = RadrootsRelayObservation::new( + "wss://relay.local", + crate::RadrootsRelayObservationType::Subscription, + 4_000, + ); + let ingest = + RadrootsEventIngest::verified(event.clone(), 4_000).with_observation(observation); + store.ingest_event(ingest).await.expect("first"); + let observation = RadrootsRelayObservation::new( + "wss://relay.local", + crate::RadrootsRelayObservationType::Subscription, + 4_100, + ) + .with_message("duplicate accepted"); + let ingest = + RadrootsEventIngest::verified(event.clone(), 4_100).with_observation(observation); + store.ingest_event(ingest).await.expect("second"); + + let observations = store + .observations_for_event(event.id.as_str()) + .await + .expect("observations"); + assert_eq!(observations.len(), 1); + assert_eq!(observations[0].observation_count, 2); + assert_eq!(observations[0].last_seen_at_ms, 4_100); + assert_eq!( + observations[0].last_message.as_deref(), + Some("duplicate accepted") + ); + } + + #[tokio::test] + async fn event_heads_use_protocol_tie_breaks() { + let store = RadrootsEventStore::open_memory().await.expect("open"); + let high = event(KIND_PROFILE, 'b', 'a', 20, Vec::new(), "{}"); + let low = event(KIND_PROFILE, 'a', 'a', 20, Vec::new(), "{}"); + + let first = store + .ingest_event(RadrootsEventIngest::verified(high.clone(), 5_000)) + .await + .expect("first"); + let second = store + .ingest_event(RadrootsEventIngest::verified(low.clone(), 5_100)) + .await + .expect("second"); + let RadrootsEventHeadCandidateResult::Candidate(candidate) = + event_head_candidate_for_event(&low).expect("candidate") + else { + panic!("profile should select a head"); + }; + let head = store + .event_head(&candidate.coordinate) + .await + .expect("head") + .expect("stored head"); + + assert_eq!(first.head_decision, RadrootsEventHeadStoreDecision::Applied); + assert_eq!( + second.head_decision, + RadrootsEventHeadStoreDecision::Applied + ); + assert_eq!(head.event_id, low.id); + } + + #[tokio::test] + async fn projection_cursors_drive_replay_without_json_extraction() { + let store = RadrootsEventStore::open_memory().await.expect("open"); + let first = event(KIND_POST, 'c', 'd', 30, Vec::new(), "one"); + let second = event(KIND_POST, 'd', 'd', 31, Vec::new(), "two"); + store + .ingest_event(RadrootsEventIngest::verified(first.clone(), 6_000)) + .await + .expect("first"); + store + .ingest_event(RadrootsEventIngest::verified(second.clone(), 6_100)) + .await + .expect("second"); + + let replay = store + .events_since_cursor("social", 10) + .await + .expect("initial replay"); + assert_eq!(replay.len(), 2); + store + .update_projection_cursor(&RadrootsProjectionCursor { + projection_id: "social".to_owned(), + last_event_id: Some(first.id), + last_created_at: 30, + updated_at_ms: 6_200, + }) + .await + .expect("cursor"); + let replay = store + .events_since_cursor("social", 10) + .await + .expect("next replay"); + assert_eq!(replay.len(), 1); + assert_eq!(replay[0].event_id, second.id); + } +} diff --git a/crates/geocoder/Cargo.toml b/crates/geocoder/Cargo.toml @@ -13,12 +13,10 @@ documentation = "https://docs.rs/radroots_geocoder" readme = "README" [dependencies] -rusqlite = { workspace = true, features = ["bundled", "serialize"] } +rusqlite = { workspace = true, features = ["bundled"] } serde = { workspace = true, features = ["derive"] } -thiserror = { workspace = true } - -[dev-dependencies] tempfile = { workspace = true } +thiserror = { workspace = true } [lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage_nightly)'] } diff --git a/crates/geocoder/src/error.rs b/crates/geocoder/src/error.rs @@ -4,6 +4,8 @@ use thiserror::Error; pub enum GeocoderError { #[error("sqlite error: {0}")] Sqlite(#[from] rusqlite::Error), + #[error("io error: {0}")] + Io(#[from] std::io::Error), #[error("country center not found for {country_id}")] CountryCenterNotFound { country_id: String }, } diff --git a/crates/geocoder/src/geocoder.rs b/crates/geocoder/src/geocoder.rs @@ -2,25 +2,36 @@ use crate::error::GeocoderError; use crate::model::{ GeocoderCountryListResult, GeocoderPoint, GeocoderReverseOptions, GeocoderReverseResult, }; -use rusqlite::{Connection, MAIN_DB, named_params}; +use rusqlite::{Connection, named_params}; +use std::io::Write; use std::path::Path; pub struct Geocoder { conn: Connection, + _temp_path: Option<tempfile::TempPath>, } impl Geocoder { #[cfg_attr(coverage_nightly, coverage(off))] pub fn open_path<P: AsRef<Path>>(path: P) -> Result<Self, GeocoderError> { let conn = Connection::open(path)?; - Ok(Self { conn }) + Ok(Self { + conn, + _temp_path: None, + }) } #[cfg_attr(coverage_nightly, coverage(off))] pub fn open_bytes(bytes: &[u8]) -> Result<Self, GeocoderError> { - let mut conn = Connection::open_in_memory()?; - conn.deserialize_read_exact(MAIN_DB, bytes, bytes.len(), true)?; - Ok(Self { conn }) + let mut temp = tempfile::NamedTempFile::new()?; + temp.as_file_mut().write_all(bytes)?; + let temp_path = temp.into_temp_path(); + let path: &Path = temp_path.as_ref(); + let conn = Connection::open(path)?; + Ok(Self { + conn, + _temp_path: Some(temp_path), + }) } pub fn reverse( @@ -590,7 +601,10 @@ mod tests { "#, ) .expect("create reverse/country execution error schema"); - Geocoder { conn } + Geocoder { + conn, + _temp_path: None, + } } fn geocoder_with_country_list_query_execution_error() -> Geocoder { @@ -607,7 +621,10 @@ mod tests { "#, ) .expect("create country_list execution error schema"); - Geocoder { conn } + Geocoder { + conn, + _temp_path: None, + } } fn geocoder_with_country_center_query_execution_error() -> Geocoder { @@ -623,7 +640,10 @@ mod tests { "#, ) .expect("create country_center execution error schema"); - Geocoder { conn } + Geocoder { + conn, + _temp_path: None, + } } fn geocoder_with_country_list_sql_row( @@ -647,7 +667,10 @@ mod tests { "#, )) .expect("create country_list field error schema"); - Geocoder { conn } + Geocoder { + conn, + _temp_path: None, + } } fn map_country_center_row_error(latitude_sql: &str, longitude_sql: &str) -> rusqlite::Error { diff --git a/spec/README.md b/spec/README.md @@ -96,6 +96,7 @@ The public Rust story is tiered explicitly. - `radroots_replica_sync` - Deferred crates.io publication: - `radroots_types` + - `radroots_event_store` - `radroots_events_codec_wasm` - `radroots_net` - `radroots_nostr_runtime` diff --git a/spec/manifest.toml b/spec/manifest.toml @@ -48,6 +48,7 @@ published_support = [ ] deferred_publication = [ "radroots_types", + "radroots_event_store", "radroots_events_codec_wasm", "radroots_net", "radroots_nostr_runtime",