lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit 5d65ed2a2ad73bf31737411a5439a65185d83c81
parent acbc2b3ad1ba26422349008748df2d484f57b611
Author: triesap <tyson@radroots.org>
Date:   Sat, 23 May 2026 02:27:35 +0000

local_events: add shared store foundation

- add the radroots_local_events workspace crate and lockfile entry
- define local work, signed event, outbox, and cursor models
- add SQLite migrations and typed store APIs for append and projection reads
- cover malformed records, idempotent append, cursor movement, and outbox updates

Diffstat:
MCargo.lock | 10++++++++++
MCargo.toml | 2++
Acrates/local_events/Cargo.toml | 29+++++++++++++++++++++++++++++
Acrates/local_events/README | 8++++++++
Acrates/local_events/migrations/0000_local_events.down.sql | 2++
Acrates/local_events/migrations/0000_local_events.up.sql | 43+++++++++++++++++++++++++++++++++++++++++++
Acrates/local_events/src/error.rs | 14++++++++++++++
Acrates/local_events/src/lib.rs | 14++++++++++++++
Acrates/local_events/src/migrations.rs | 25+++++++++++++++++++++++++
Acrates/local_events/src/models.rs | 274+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/local_events/src/store.rs | 290+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/local_events/tests/store.rs | 136+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
12 files changed, 847 insertions(+), 0 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -3987,6 +3987,16 @@ dependencies = [ ] [[package]] +name = "radroots_local_events" +version = "0.1.0-alpha.2" +dependencies = [ + "radroots_sql_core", + "serde", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] name = "radroots_log" version = "0.1.0-alpha.2" dependencies = [ diff --git a/Cargo.toml b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/events_indexed", "crates/geocoder", "crates/identity", + "crates/local_events", "crates/log", "crates/net", "crates/nostr", @@ -63,6 +64,7 @@ radroots_events_codec = { path = "crates/events_codec", version = "0.1.0-alpha.2 radroots_events_indexed = { path = "crates/events_indexed", version = "0.1.0-alpha.2", default-features = false } radroots_geocoder = { path = "crates/geocoder", version = "0.1.0-alpha.2" } radroots_identity = { path = "crates/identity", version = "0.1.0-alpha.2", default-features = false } +radroots_local_events = { path = "crates/local_events", version = "0.1.0-alpha.2", default-features = false } radroots_nostr = { path = "crates/nostr", version = "0.1.0-alpha.2", default-features = false } radroots_nostr_accounts = { path = "crates/nostr_accounts", version = "0.1.0-alpha.2", default-features = false } radroots_nostr_connect = { path = "crates/nostr_connect", version = "0.1.0-alpha.2", default-features = false } diff --git a/crates/local_events/Cargo.toml b/crates/local_events/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "radroots_local_events" +publish = ["crates-io"] +version = "0.1.0-alpha.2" +edition.workspace = true +authors = ["Tyson Lupul <tyson@radroots.org>"] +rust-version.workspace = true +license.workspace = true +description = "Shared local event and work-store primitives for Radroots runtimes" +repository.workspace = true +homepage.workspace = true +documentation = "https://docs.rs/radroots_local_events" +readme = "README" + +[lib] +crate-type = ["rlib"] + +[features] +default = [] +native = ["radroots_sql_core/native"] + +[dependencies] +radroots_sql_core = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } + +[dev-dependencies] +radroots_sql_core = { workspace = true, features = ["native"] } diff --git a/crates/local_events/README b/crates/local_events/README @@ -0,0 +1,8 @@ +# radroots_local_events + +Shared local event and work-store primitives for same-host Rad Roots runtimes. + +This crate owns the SQLite schema and typed API for the `shared/local_events` +namespace. It is an interop store for local work records, signed event records, +publish outbox status, relay delivery metadata, and projection cursors. It is +not an application primary database. diff --git a/crates/local_events/migrations/0000_local_events.down.sql b/crates/local_events/migrations/0000_local_events.down.sql @@ -0,0 +1,2 @@ +drop table if exists local_event_projection_cursor; +drop table if exists local_event_record; diff --git a/crates/local_events/migrations/0000_local_events.up.sql b/crates/local_events/migrations/0000_local_events.up.sql @@ -0,0 +1,43 @@ +create table if not exists local_event_record ( + seq integer primary key autoincrement, + record_id text not null unique, + family text not null check (family in ('local_work', 'signed_event')), + status text not null check (status in ('local_draft', 'local_saved', 'pending_publish', 'published', 'failed', 'conflict')), + source_runtime text not null check (source_runtime in ('cli', 'app', 'service', 'worker', 'test')), + created_at_ms integer not null, + inserted_at_ms integer not null, + updated_at_ms integer not null, + owner_account_id text, + owner_pubkey text, + farm_id text, + listing_addr text, + local_work_json text, + event_id text, + event_kind integer, + event_pubkey text, + event_created_at integer, + event_tags_json text, + event_content text, + event_sig text, + raw_event_json text, + outbox_status text not null check (outbox_status in ('none', 'pending', 'acknowledged', 'failed')), + relay_set_fingerprint text, + relay_delivery_json text, + check (trim(record_id) <> ''), + check (family <> 'local_work' or local_work_json is not null), + check (family <> 'local_work' or outbox_status = 'none'), + check (family <> 'signed_event' or (event_id is not null and event_kind is not null and event_pubkey is not null and event_sig is not null and raw_event_json is not null)) +); + +create index if not exists local_event_record_event_id_idx on local_event_record(event_id); +create index if not exists local_event_record_listing_addr_idx on local_event_record(listing_addr); +create index if not exists local_event_record_owner_pubkey_idx on local_event_record(owner_pubkey); +create index if not exists local_event_record_status_idx on local_event_record(status); + +create table if not exists local_event_projection_cursor ( + consumer_id text primary key, + last_seq integer not null, + updated_at_ms integer not null, + check (trim(consumer_id) <> ''), + check (last_seq >= 0) +); diff --git a/crates/local_events/src/error.rs b/crates/local_events/src/error.rs @@ -0,0 +1,14 @@ +#![forbid(unsafe_code)] + +use radroots_sql_core::error::SqlError; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum LocalEventsError { + #[error("invalid local event record: {0}")] + InvalidRecord(String), + #[error("sql error: {0}")] + Sql(#[from] SqlError), + #[error("serialization error: {0}")] + Serialization(#[from] serde_json::Error), +} diff --git a/crates/local_events/src/lib.rs b/crates/local_events/src/lib.rs @@ -0,0 +1,14 @@ +#![forbid(unsafe_code)] + +mod error; +mod migrations; +mod models; +mod store; + +pub use error::LocalEventsError; +pub use migrations::{MIGRATIONS, run_all_down, run_all_up}; +pub use models::{ + LocalEventRecord, LocalEventRecordInput, LocalEventRecordUpdate, LocalEventsCursor, + LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus, SourceRuntime, +}; +pub use store::LocalEventsStore; diff --git a/crates/local_events/src/migrations.rs b/crates/local_events/src/migrations.rs @@ -0,0 +1,25 @@ +#![forbid(unsafe_code)] + +use radroots_sql_core::SqlExecutor; +use radroots_sql_core::error::SqlError; +use radroots_sql_core::migrations::{Migration, migrations_run_all_down, migrations_run_all_up}; + +pub static MIGRATIONS: &[Migration] = &[Migration { + name: "0000_local_events", + up_sql: include_str!("../migrations/0000_local_events.up.sql"), + down_sql: include_str!("../migrations/0000_local_events.down.sql"), +}]; + +pub fn run_all_up<E>(executor: &E) -> Result<(), SqlError> +where + E: SqlExecutor, +{ + migrations_run_all_up(executor, MIGRATIONS) +} + +pub fn run_all_down<E>(executor: &E) -> Result<(), SqlError> +where + E: SqlExecutor, +{ + migrations_run_all_down(executor, MIGRATIONS) +} diff --git a/crates/local_events/src/models.rs b/crates/local_events/src/models.rs @@ -0,0 +1,274 @@ +#![forbid(unsafe_code)] + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::LocalEventsError; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum LocalRecordFamily { + LocalWork, + SignedEvent, +} + +impl LocalRecordFamily { + pub fn as_str(self) -> &'static str { + match self { + Self::LocalWork => "local_work", + Self::SignedEvent => "signed_event", + } + } + + pub fn parse(value: &str) -> Result<Self, LocalEventsError> { + match value { + "local_work" => Ok(Self::LocalWork), + "signed_event" => Ok(Self::SignedEvent), + other => Err(LocalEventsError::InvalidRecord(format!( + "unknown record family `{other}`" + ))), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum LocalRecordStatus { + LocalDraft, + LocalSaved, + PendingPublish, + Published, + Failed, + Conflict, +} + +impl LocalRecordStatus { + pub fn as_str(self) -> &'static str { + match self { + Self::LocalDraft => "local_draft", + Self::LocalSaved => "local_saved", + Self::PendingPublish => "pending_publish", + Self::Published => "published", + Self::Failed => "failed", + Self::Conflict => "conflict", + } + } + + pub fn parse(value: &str) -> Result<Self, LocalEventsError> { + match value { + "local_draft" => Ok(Self::LocalDraft), + "local_saved" => Ok(Self::LocalSaved), + "pending_publish" => Ok(Self::PendingPublish), + "published" => Ok(Self::Published), + "failed" => Ok(Self::Failed), + "conflict" => Ok(Self::Conflict), + other => Err(LocalEventsError::InvalidRecord(format!( + "unknown record status `{other}`" + ))), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum PublishOutboxStatus { + None, + Pending, + Acknowledged, + Failed, +} + +impl PublishOutboxStatus { + pub fn as_str(self) -> &'static str { + match self { + Self::None => "none", + Self::Pending => "pending", + Self::Acknowledged => "acknowledged", + Self::Failed => "failed", + } + } + + pub fn parse(value: &str) -> Result<Self, LocalEventsError> { + match value { + "none" => Ok(Self::None), + "pending" => Ok(Self::Pending), + "acknowledged" => Ok(Self::Acknowledged), + "failed" => Ok(Self::Failed), + other => Err(LocalEventsError::InvalidRecord(format!( + "unknown outbox status `{other}`" + ))), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SourceRuntime { + Cli, + App, + Service, + Worker, + Test, +} + +impl SourceRuntime { + pub fn as_str(self) -> &'static str { + match self { + Self::Cli => "cli", + Self::App => "app", + Self::Service => "service", + Self::Worker => "worker", + Self::Test => "test", + } + } + + pub fn parse(value: &str) -> Result<Self, LocalEventsError> { + match value { + "cli" => Ok(Self::Cli), + "app" => Ok(Self::App), + "service" => Ok(Self::Service), + "worker" => Ok(Self::Worker), + "test" => Ok(Self::Test), + other => Err(LocalEventsError::InvalidRecord(format!( + "unknown source runtime `{other}`" + ))), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct LocalEventRecordInput { + pub record_id: String, + pub family: LocalRecordFamily, + pub status: LocalRecordStatus, + pub source_runtime: SourceRuntime, + pub created_at_ms: i64, + pub inserted_at_ms: i64, + pub owner_account_id: Option<String>, + pub owner_pubkey: Option<String>, + pub farm_id: Option<String>, + pub listing_addr: Option<String>, + pub local_work_json: Option<Value>, + pub event_id: Option<String>, + pub event_kind: Option<i64>, + pub event_pubkey: Option<String>, + pub event_created_at: Option<i64>, + pub event_tags_json: Option<Value>, + pub event_content: Option<String>, + pub event_sig: Option<String>, + pub raw_event_json: Option<Value>, + pub outbox_status: PublishOutboxStatus, + pub relay_set_fingerprint: Option<String>, + pub relay_delivery_json: Option<Value>, +} + +impl LocalEventRecordInput { + pub fn validate(&self) -> Result<(), LocalEventsError> { + validate_non_empty("record_id", &self.record_id)?; + if let Some(value) = self.owner_account_id.as_deref() { + validate_non_empty("owner_account_id", value)?; + } + if let Some(value) = self.owner_pubkey.as_deref() { + validate_non_empty("owner_pubkey", value)?; + } + if let Some(value) = self.farm_id.as_deref() { + validate_non_empty("farm_id", value)?; + } + if let Some(value) = self.listing_addr.as_deref() { + validate_non_empty("listing_addr", value)?; + } + match self.family { + LocalRecordFamily::LocalWork => { + if self.local_work_json.is_none() { + return Err(LocalEventsError::InvalidRecord( + "local work records require local_work_json".to_owned(), + )); + } + if self.outbox_status != PublishOutboxStatus::None { + return Err(LocalEventsError::InvalidRecord( + "local work records must use outbox status none".to_owned(), + )); + } + } + LocalRecordFamily::SignedEvent => { + validate_required("event_id", self.event_id.as_deref())?; + validate_required("event_pubkey", self.event_pubkey.as_deref())?; + validate_required("event_sig", self.event_sig.as_deref())?; + if self.event_kind.is_none() { + return Err(LocalEventsError::InvalidRecord( + "signed event records require event_kind".to_owned(), + )); + } + if self.raw_event_json.is_none() { + return Err(LocalEventsError::InvalidRecord( + "signed event records require raw_event_json".to_owned(), + )); + } + } + } + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct LocalEventRecord { + pub seq: i64, + pub record_id: String, + pub family: LocalRecordFamily, + pub status: LocalRecordStatus, + pub source_runtime: SourceRuntime, + pub created_at_ms: i64, + pub inserted_at_ms: i64, + pub updated_at_ms: i64, + pub owner_account_id: Option<String>, + pub owner_pubkey: Option<String>, + pub farm_id: Option<String>, + pub listing_addr: Option<String>, + pub local_work_json: Option<Value>, + pub event_id: Option<String>, + pub event_kind: Option<i64>, + pub event_pubkey: Option<String>, + pub event_created_at: Option<i64>, + pub event_tags_json: Option<Value>, + pub event_content: Option<String>, + pub event_sig: Option<String>, + pub raw_event_json: Option<Value>, + pub outbox_status: PublishOutboxStatus, + pub relay_set_fingerprint: Option<String>, + pub relay_delivery_json: Option<Value>, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct LocalEventRecordUpdate { + pub record_id: String, + pub status: LocalRecordStatus, + pub outbox_status: PublishOutboxStatus, + pub relay_set_fingerprint: Option<String>, + pub relay_delivery_json: Option<Value>, + pub updated_at_ms: i64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct LocalEventsCursor { + pub consumer_id: String, + pub last_seq: i64, + pub updated_at_ms: i64, +} + +pub(crate) fn validate_non_empty(field: &str, value: &str) -> Result<(), LocalEventsError> { + if value.trim().is_empty() { + return Err(LocalEventsError::InvalidRecord(format!( + "{field} must not be empty" + ))); + } + Ok(()) +} + +fn validate_required(field: &str, value: Option<&str>) -> Result<(), LocalEventsError> { + match value { + Some(value) => validate_non_empty(field, value), + None => Err(LocalEventsError::InvalidRecord(format!( + "{field} is required" + ))), + } +} diff --git a/crates/local_events/src/store.rs b/crates/local_events/src/store.rs @@ -0,0 +1,290 @@ +#![forbid(unsafe_code)] + +use radroots_sql_core::SqlExecutor; +use radroots_sql_core::error::SqlError; +use serde::Deserialize; +use serde_json::{Value, json}; + +use crate::migrations; +use crate::models::validate_non_empty; +use crate::{ + LocalEventRecord, LocalEventRecordInput, LocalEventRecordUpdate, LocalEventsCursor, + LocalEventsError, LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus, SourceRuntime, +}; + +pub struct LocalEventsStore<E: SqlExecutor> { + executor: E, +} + +impl<E: SqlExecutor> LocalEventsStore<E> { + pub fn new(executor: E) -> Self { + Self { executor } + } + + pub fn executor(&self) -> &E { + &self.executor + } + + pub fn migrate_up(&self) -> Result<(), SqlError> { + migrations::run_all_up(self.executor()) + } + + pub fn migrate_down(&self) -> Result<(), SqlError> { + migrations::run_all_down(self.executor()) + } + + pub fn append_record( + &self, + input: &LocalEventRecordInput, + ) -> Result<LocalEventRecord, LocalEventsError> { + input.validate()?; + let params = json!([ + input.record_id, + input.family.as_str(), + input.status.as_str(), + input.source_runtime.as_str(), + input.created_at_ms, + input.inserted_at_ms, + input.inserted_at_ms, + input.owner_account_id, + input.owner_pubkey, + input.farm_id, + input.listing_addr, + encode_json(input.local_work_json.as_ref())?, + input.event_id, + input.event_kind, + input.event_pubkey, + input.event_created_at, + encode_json(input.event_tags_json.as_ref())?, + input.event_content, + input.event_sig, + encode_json(input.raw_event_json.as_ref())?, + input.outbox_status.as_str(), + input.relay_set_fingerprint, + encode_json(input.relay_delivery_json.as_ref())? + ]) + .to_string(); + let sql = "insert or ignore into local_event_record( + record_id, + family, + status, + source_runtime, + created_at_ms, + inserted_at_ms, + updated_at_ms, + owner_account_id, + owner_pubkey, + farm_id, + listing_addr, + local_work_json, + event_id, + event_kind, + event_pubkey, + event_created_at, + event_tags_json, + event_content, + event_sig, + raw_event_json, + outbox_status, + relay_set_fingerprint, + relay_delivery_json + ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + self.executor.exec(sql, &params)?; + self.get_record(&input.record_id)? + .ok_or_else(|| LocalEventsError::InvalidRecord("record append failed".to_owned())) + } + + pub fn get_record( + &self, + record_id: &str, + ) -> Result<Option<LocalEventRecord>, LocalEventsError> { + validate_non_empty("record_id", record_id)?; + let params = json!([record_id]).to_string(); + let rows = self.query_records( + "select * from local_event_record where record_id = ? limit 1", + &params, + )?; + Ok(rows.into_iter().next()) + } + + pub fn list_records_after( + &self, + after_seq: i64, + limit: u32, + ) -> Result<Vec<LocalEventRecord>, LocalEventsError> { + let params = json!([after_seq, i64::from(limit)]).to_string(); + self.query_records( + "select * from local_event_record where seq > ? order by seq asc limit ?", + &params, + ) + } + + pub fn update_outbox( + &self, + update: &LocalEventRecordUpdate, + ) -> Result<LocalEventRecord, LocalEventsError> { + validate_non_empty("record_id", &update.record_id)?; + let params = json!([ + update.status.as_str(), + update.outbox_status.as_str(), + update.relay_set_fingerprint, + encode_json(update.relay_delivery_json.as_ref())?, + update.updated_at_ms, + update.record_id + ]) + .to_string(); + let outcome = self.executor.exec( + "update local_event_record + set status = ?, + outbox_status = ?, + relay_set_fingerprint = ?, + relay_delivery_json = ?, + updated_at_ms = ? + where record_id = ?", + &params, + )?; + if outcome.changes == 0 { + return Err(LocalEventsError::Sql(SqlError::NotFound( + update.record_id.clone(), + ))); + } + self.get_record(&update.record_id)? + .ok_or_else(|| LocalEventsError::Sql(SqlError::NotFound(update.record_id.clone()))) + } + + pub fn get_cursor( + &self, + consumer_id: &str, + ) -> Result<Option<LocalEventsCursor>, LocalEventsError> { + validate_non_empty("consumer_id", consumer_id)?; + let params = json!([consumer_id]).to_string(); + let raw = self.executor.query_raw( + "select consumer_id, last_seq, updated_at_ms from local_event_projection_cursor where consumer_id = ? limit 1", + &params, + )?; + let rows: Vec<CursorRow> = serde_json::from_str(&raw)?; + Ok(rows.into_iter().next().map(Into::into)) + } + + pub fn advance_cursor( + &self, + consumer_id: &str, + last_seq: i64, + updated_at_ms: i64, + ) -> Result<LocalEventsCursor, LocalEventsError> { + validate_non_empty("consumer_id", consumer_id)?; + let params = json!([consumer_id, last_seq, updated_at_ms]).to_string(); + self.executor.exec( + "insert into local_event_projection_cursor(consumer_id, last_seq, updated_at_ms) + values(?,?,?) + on conflict(consumer_id) do update set + last_seq = max(local_event_projection_cursor.last_seq, excluded.last_seq), + updated_at_ms = excluded.updated_at_ms", + &params, + )?; + self.get_cursor(consumer_id)? + .ok_or_else(|| LocalEventsError::InvalidRecord("cursor advance failed".to_owned())) + } + + fn query_records( + &self, + sql: &str, + params: &str, + ) -> Result<Vec<LocalEventRecord>, LocalEventsError> { + let raw = self.executor.query_raw(sql, params)?; + let rows: Vec<RecordRow> = serde_json::from_str(&raw)?; + rows.into_iter().map(TryInto::try_into).collect() + } +} + +#[derive(Debug, Deserialize)] +struct RecordRow { + seq: i64, + record_id: String, + family: String, + status: String, + source_runtime: String, + created_at_ms: i64, + inserted_at_ms: i64, + updated_at_ms: i64, + owner_account_id: Option<String>, + owner_pubkey: Option<String>, + farm_id: Option<String>, + listing_addr: Option<String>, + local_work_json: Option<String>, + event_id: Option<String>, + event_kind: Option<i64>, + event_pubkey: Option<String>, + event_created_at: Option<i64>, + event_tags_json: Option<String>, + event_content: Option<String>, + event_sig: Option<String>, + raw_event_json: Option<String>, + outbox_status: String, + relay_set_fingerprint: Option<String>, + relay_delivery_json: Option<String>, +} + +impl TryFrom<RecordRow> for LocalEventRecord { + type Error = LocalEventsError; + + fn try_from(row: RecordRow) -> Result<Self, Self::Error> { + Ok(Self { + seq: row.seq, + record_id: row.record_id, + family: LocalRecordFamily::parse(&row.family)?, + status: LocalRecordStatus::parse(&row.status)?, + source_runtime: SourceRuntime::parse(&row.source_runtime)?, + created_at_ms: row.created_at_ms, + inserted_at_ms: row.inserted_at_ms, + updated_at_ms: row.updated_at_ms, + owner_account_id: row.owner_account_id, + owner_pubkey: row.owner_pubkey, + farm_id: row.farm_id, + listing_addr: row.listing_addr, + local_work_json: decode_json(row.local_work_json)?, + event_id: row.event_id, + event_kind: row.event_kind, + event_pubkey: row.event_pubkey, + event_created_at: row.event_created_at, + event_tags_json: decode_json(row.event_tags_json)?, + event_content: row.event_content, + event_sig: row.event_sig, + raw_event_json: decode_json(row.raw_event_json)?, + outbox_status: PublishOutboxStatus::parse(&row.outbox_status)?, + relay_set_fingerprint: row.relay_set_fingerprint, + relay_delivery_json: decode_json(row.relay_delivery_json)?, + }) + } +} + +#[derive(Debug, Deserialize)] +struct CursorRow { + consumer_id: String, + last_seq: i64, + updated_at_ms: i64, +} + +impl From<CursorRow> for LocalEventsCursor { + fn from(row: CursorRow) -> Self { + Self { + consumer_id: row.consumer_id, + last_seq: row.last_seq, + updated_at_ms: row.updated_at_ms, + } + } +} + +fn encode_json(value: Option<&Value>) -> Result<Option<String>, LocalEventsError> { + value + .map(serde_json::to_string) + .transpose() + .map_err(Into::into) +} + +fn decode_json(value: Option<String>) -> Result<Option<Value>, LocalEventsError> { + value + .map(|value| serde_json::from_str(&value)) + .transpose() + .map_err(Into::into) +} diff --git a/crates/local_events/tests/store.rs b/crates/local_events/tests/store.rs @@ -0,0 +1,136 @@ +use radroots_local_events::{ + LocalEventRecordInput, LocalEventRecordUpdate, LocalEventsStore, LocalRecordFamily, + LocalRecordStatus, PublishOutboxStatus, SourceRuntime, +}; +use radroots_sql_core::SqliteExecutor; +use serde_json::json; + +fn store() -> LocalEventsStore<SqliteExecutor> { + let executor = SqliteExecutor::open_memory().expect("open memory sqlite"); + let store = LocalEventsStore::new(executor); + store.migrate_up().expect("migrate local events"); + store +} + +fn local_work(record_id: &str) -> LocalEventRecordInput { + LocalEventRecordInput { + record_id: record_id.to_owned(), + family: LocalRecordFamily::LocalWork, + status: LocalRecordStatus::LocalSaved, + source_runtime: SourceRuntime::Cli, + created_at_ms: 1000, + inserted_at_ms: 1001, + owner_account_id: Some("seller-account".to_owned()), + owner_pubkey: Some("seller-pubkey".to_owned()), + farm_id: Some("farm-a".to_owned()), + listing_addr: Some("listing-a".to_owned()), + local_work_json: Some(json!({"kind":"listing","title":"Eggs"})), + event_id: None, + event_kind: None, + event_pubkey: None, + event_created_at: None, + event_tags_json: None, + event_content: None, + event_sig: None, + raw_event_json: None, + outbox_status: PublishOutboxStatus::None, + relay_set_fingerprint: None, + relay_delivery_json: None, + } +} + +fn signed_event(record_id: &str) -> LocalEventRecordInput { + LocalEventRecordInput { + record_id: record_id.to_owned(), + family: LocalRecordFamily::SignedEvent, + status: LocalRecordStatus::PendingPublish, + source_runtime: SourceRuntime::Cli, + created_at_ms: 2000, + inserted_at_ms: 2001, + owner_account_id: Some("seller-account".to_owned()), + owner_pubkey: Some("seller-pubkey".to_owned()), + farm_id: Some("farm-a".to_owned()), + listing_addr: Some("listing-a".to_owned()), + local_work_json: None, + event_id: Some("event-a".to_owned()), + event_kind: Some(3421), + event_pubkey: Some("seller-pubkey".to_owned()), + event_created_at: Some(2000), + event_tags_json: Some(json!([["d", "listing-a"]])), + event_content: Some("{\"title\":\"Eggs\"}".to_owned()), + event_sig: Some("sig-a".to_owned()), + raw_event_json: Some(json!({"id":"event-a","kind":3421})), + outbox_status: PublishOutboxStatus::Pending, + relay_set_fingerprint: Some("relay-set-a".to_owned()), + relay_delivery_json: Some(json!({"pending":["ws://127.0.0.1:8080"]})), + } +} + +#[test] +fn append_rejects_malformed_local_work_records() { + let store = store(); + let mut input = local_work("local-a"); + input.local_work_json = None; + + let err = store.append_record(&input).expect_err("invalid record"); + + assert!(err.to_string().contains("local_work_json")); +} + +#[test] +fn append_is_idempotent_by_record_id() { + let store = store(); + let input = local_work("local-a"); + + let first = store.append_record(&input).expect("append first"); + let second = store.append_record(&input).expect("append second"); + let rows = store.list_records_after(0, 10).expect("list records"); + + assert_eq!(first.seq, second.seq); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].record_id, "local-a"); + assert_eq!( + rows[0].local_work_json, + Some(json!({"kind":"listing","title":"Eggs"})) + ); +} + +#[test] +fn projection_cursor_advances_without_rewinding() { + let store = store(); + + let first = store + .advance_cursor("app", 10, 100) + .expect("advance cursor"); + let second = store.advance_cursor("app", 5, 200).expect("ignore rewind"); + let third = store.advance_cursor("app", 12, 300).expect("advance again"); + + assert_eq!(first.last_seq, 10); + assert_eq!(second.last_seq, 10); + assert_eq!(third.last_seq, 12); +} + +#[test] +fn outbox_status_updates_signed_event_records() { + let store = store(); + let input = signed_event("event-a"); + store.append_record(&input).expect("append signed event"); + + let updated = store + .update_outbox(&LocalEventRecordUpdate { + record_id: "event-a".to_owned(), + status: LocalRecordStatus::Published, + outbox_status: PublishOutboxStatus::Acknowledged, + relay_set_fingerprint: Some("relay-set-a".to_owned()), + relay_delivery_json: Some(json!({"acked":["ws://127.0.0.1:8080"]})), + updated_at_ms: 3000, + }) + .expect("update outbox"); + + assert_eq!(updated.status, LocalRecordStatus::Published); + assert_eq!(updated.outbox_status, PublishOutboxStatus::Acknowledged); + assert_eq!( + updated.relay_delivery_json, + Some(json!({"acked":["ws://127.0.0.1:8080"]})) + ); +}