lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

store.rs (70136B)


      1 use crate::RadrootsEventStoreError;
      2 use crate::migrations::{EVENT_STORE_MIGRATION_DOWN, EVENT_STORE_MIGRATION_UP};
      3 use crate::model::{
      4     RadrootsEventContractStatus, RadrootsEventHeadStoreDecision, RadrootsEventIngest,
      5     RadrootsEventIngestReceipt, RadrootsEventStoreStatusSummary, RadrootsEventVerificationStatus,
      6     RadrootsProjectionCursor, RadrootsRelayObservation, RadrootsStoredEvent,
      7     RadrootsStoredEventHead, RadrootsStoredEventTag, StoredEventClass, tag_semantic_name,
      8     tag_value_type_name,
      9 };
     10 use radroots_events::RadrootsNostrEvent;
     11 use radroots_events::contract::{
     12     RadrootsEventClass, RadrootsEventContract, identify_event_contract,
     13 };
     14 use radroots_events::event_head::{
     15     RadrootsCurrentEventHead, RadrootsEventHeadCandidate, RadrootsEventHeadCandidateResult,
     16     RadrootsEventHeadCoordinate, RadrootsEventHeadDecision, event_head_candidate_for_contract,
     17     select_event_head,
     18 };
     19 use radroots_events::ids::{RadrootsEventId, RadrootsEventSignature, RadrootsPublicKey};
     20 use radroots_nostr::prelude::{RadrootsNostrEventVerification, radroots_nostr_verify_event};
     21 use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
     22 use sqlx::{Row, SqlitePool};
     23 use std::path::Path;
     24 use std::str::FromStr;
     25 
     26 pub const RADROOTS_EVENT_STORE_QUERY_LIMIT_MAX: u32 = 1_000;
     27 pub const RADROOTS_EVENT_STORE_CONTRACT_QUERY_LIMIT_MAX: usize = 16;
     28 
     29 #[derive(Clone)]
     30 pub struct RadrootsEventStore {
     31     pool: SqlitePool,
     32 }
     33 
     34 impl RadrootsEventStore {
     35     pub async fn open_memory() -> Result<Self, RadrootsEventStoreError> {
     36         let options = SqliteConnectOptions::from_str("sqlite::memory:")?;
     37         let pool = SqlitePoolOptions::new()
     38             .max_connections(1)
     39             .connect_with(options)
     40             .await?;
     41         configure_connection(&pool, false).await?;
     42         apply_up(&pool).await?;
     43         Ok(Self { pool })
     44     }
     45 
     46     pub async fn open_file(path: impl AsRef<Path>) -> Result<Self, RadrootsEventStoreError> {
     47         let options = SqliteConnectOptions::new()
     48             .filename(path)
     49             .create_if_missing(true);
     50         let pool = SqlitePoolOptions::new()
     51             .max_connections(1)
     52             .connect_with(options)
     53             .await?;
     54         configure_connection(&pool, true).await?;
     55         apply_up(&pool).await?;
     56         Ok(Self { pool })
     57     }
     58 
     59     pub fn pool(&self) -> &SqlitePool {
     60         &self.pool
     61     }
     62 
     63     pub async fn migrate_down(&self) -> Result<(), RadrootsEventStoreError> {
     64         apply_down(&self.pool).await
     65     }
     66 
     67     pub async fn pragma_foreign_keys(&self) -> Result<i64, RadrootsEventStoreError> {
     68         query_i64(&self.pool, "PRAGMA foreign_keys").await
     69     }
     70 
     71     pub async fn pragma_busy_timeout(&self) -> Result<i64, RadrootsEventStoreError> {
     72         query_i64(&self.pool, "PRAGMA busy_timeout").await
     73     }
     74 
     75     pub async fn pragma_journal_mode(&self) -> Result<String, RadrootsEventStoreError> {
     76         query_string(&self.pool, "PRAGMA journal_mode").await
     77     }
     78 
     79     pub async fn status_summary(
     80         &self,
     81     ) -> Result<RadrootsEventStoreStatusSummary, RadrootsEventStoreError> {
     82         let row = sqlx::query(
     83             "SELECT COUNT(*) AS total_events, COALESCE(SUM(CASE WHEN projection_eligible = 1 THEN 1 ELSE 0 END), 0) AS projection_eligible_events, MAX(seq) AS last_event_seq, MAX(updated_at_ms) AS last_event_updated_at_ms FROM nostr_event",
     84         )
     85         .fetch_one(&self.pool)
     86         .await?;
     87         let relay_observations =
     88             query_i64(&self.pool, "SELECT COUNT(*) FROM relay_event_seen").await?;
     89         Ok(RadrootsEventStoreStatusSummary {
     90             total_events: row.try_get("total_events")?,
     91             projection_eligible_events: row.try_get("projection_eligible_events")?,
     92             relay_observations,
     93             last_event_seq: row.try_get("last_event_seq")?,
     94             last_event_updated_at_ms: row.try_get("last_event_updated_at_ms")?,
     95         })
     96     }
     97 
     98     pub async fn ingest_event(
     99         &self,
    100         ingest: RadrootsEventIngest,
    101     ) -> Result<RadrootsEventIngestReceipt, RadrootsEventStoreError> {
    102         validate_event_identity(&ingest.event)?;
    103         let verification_status = verify_event(&ingest.event);
    104         let classification = classify_event(&ingest.event);
    105         let raw_json = ingest
    106             .raw_json
    107             .clone()
    108             .map(Ok)
    109             .unwrap_or_else(|| serde_json::to_string(&ingest.event))?;
    110         let tags_json = serde_json::to_string(&ingest.event.tags)?;
    111         let mut tx = self.pool.begin().await?;
    112         let insert = insert_raw_event(
    113             &mut tx,
    114             &ingest,
    115             &classification,
    116             verification_status,
    117             raw_json.as_str(),
    118             tags_json.as_str(),
    119         )
    120         .await?;
    121         let inserted = insert.inserted;
    122         let mut head_decision = RadrootsEventHeadStoreDecision::Unsupported;
    123         let mut projection_eligible = classification.base_projection_eligible(verification_status);
    124 
    125         if inserted {
    126             insert_tags(&mut tx, &ingest.event, classification.contract).await?;
    127             if let Some(contract) = classification.contract {
    128                 if projection_eligible {
    129                     let head =
    130                         apply_event_head(&mut tx, &ingest.event, contract, ingest.observed_at_ms)
    131                             .await?;
    132                     projection_eligible = head.projection_eligible;
    133                     head_decision = head.decision;
    134                     sqlx::query(
    135                         "UPDATE nostr_event SET projection_eligible = ?, updated_at_ms = ? WHERE event_id = ?",
    136                     )
    137                     .bind(bool_i64(projection_eligible))
    138                     .bind(ingest.observed_at_ms)
    139                     .bind(ingest.event.id.as_str())
    140                     .execute(&mut *tx)
    141                     .await?;
    142                 } else {
    143                     head_decision = RadrootsEventHeadStoreDecision::NotProjectionEligible;
    144                 }
    145             }
    146         } else if classification.contract.is_some() {
    147             head_decision = RadrootsEventHeadStoreDecision::SkippedDuplicate;
    148             projection_eligible = false;
    149         }
    150 
    151         if let Some(observation) = ingest.relay_observation.as_ref() {
    152             upsert_observation(&mut tx, ingest.event.id.as_str(), observation).await?;
    153         }
    154 
    155         tx.commit().await?;
    156 
    157         Ok(RadrootsEventIngestReceipt {
    158             seq: insert.seq,
    159             event_id: ingest.event.id,
    160             inserted,
    161             verification_status,
    162             contract_status: classification.contract_status,
    163             contract_id: classification
    164                 .contract
    165                 .map(|contract| contract.id.to_owned()),
    166             projection_eligible,
    167             head_decision,
    168         })
    169     }
    170 
    171     pub async fn get_event(
    172         &self,
    173         event_id: &str,
    174     ) -> Result<Option<RadrootsStoredEvent>, RadrootsEventStoreError> {
    175         let row = sqlx::query(
    176             "SELECT seq, event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event WHERE event_id = ?",
    177         )
    178         .bind(event_id)
    179         .fetch_optional(&self.pool)
    180         .await?;
    181         row.map(stored_event_from_row).transpose()
    182     }
    183 
    184     pub async fn tags_for_event(
    185         &self,
    186         event_id: &str,
    187     ) -> Result<Vec<RadrootsStoredEventTag>, RadrootsEventStoreError> {
    188         let rows = sqlx::query(
    189             "SELECT event_id, tag_index, tag_name, tag_value, tag_json, contract_semantic, contract_value_type, relay_indexed FROM nostr_event_tag WHERE event_id = ? ORDER BY tag_index",
    190         )
    191         .bind(event_id)
    192         .fetch_all(&self.pool)
    193         .await?;
    194         rows.into_iter().map(stored_tag_from_row).collect()
    195     }
    196 
    197     pub async fn observations_for_event(
    198         &self,
    199         event_id: &str,
    200     ) -> Result<Vec<RadrootsRelayObservationRow>, RadrootsEventStoreError> {
    201         let rows = sqlx::query(
    202             "SELECT event_id, relay_url, observation_type, first_seen_at_ms, last_seen_at_ms, observation_count, last_message FROM relay_event_seen WHERE event_id = ? ORDER BY relay_url, observation_type",
    203         )
    204         .bind(event_id)
    205         .fetch_all(&self.pool)
    206         .await?;
    207         rows.into_iter().map(relay_observation_from_row).collect()
    208     }
    209 
    210     pub async fn event_head(
    211         &self,
    212         coordinate: &RadrootsEventHeadCoordinate,
    213     ) -> Result<Option<RadrootsStoredEventHead>, RadrootsEventStoreError> {
    214         let row = match coordinate {
    215             RadrootsEventHeadCoordinate::Replaceable { kind, pubkey } => {
    216                 sqlx::query(
    217                     "SELECT coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms FROM nostr_event_head WHERE coordinate_type = 'replaceable' AND kind = ? AND pubkey = ? AND d_tag IS NULL",
    218                 )
    219                 .bind(i64::from(*kind))
    220                 .bind(pubkey.as_str())
    221                 .fetch_optional(&self.pool)
    222                 .await?
    223             }
    224             RadrootsEventHeadCoordinate::Addressable {
    225                 kind,
    226                 pubkey,
    227                 d_tag,
    228             } => {
    229                 sqlx::query(
    230                     "SELECT coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms FROM nostr_event_head WHERE coordinate_type = 'addressable' AND kind = ? AND pubkey = ? AND d_tag = ?",
    231                 )
    232                 .bind(i64::from(*kind))
    233                 .bind(pubkey.as_str())
    234                 .bind(d_tag.as_str())
    235                 .fetch_optional(&self.pool)
    236                 .await?
    237             }
    238         };
    239         row.map(stored_head_from_row).transpose()
    240     }
    241 
    242     pub async fn get_projection_cursor(
    243         &self,
    244         projection_id: &str,
    245     ) -> Result<Option<RadrootsProjectionCursor>, RadrootsEventStoreError> {
    246         let row = sqlx::query(
    247             "SELECT projection_id, projection_version, last_event_seq, updated_at_ms FROM projection_cursor WHERE projection_id = ?",
    248         )
    249         .bind(projection_id)
    250         .fetch_optional(&self.pool)
    251         .await?;
    252         row.map(projection_cursor_from_row).transpose()
    253     }
    254 
    255     pub async fn update_projection_cursor(
    256         &self,
    257         cursor: &RadrootsProjectionCursor,
    258     ) -> Result<(), RadrootsEventStoreError> {
    259         sqlx::query(
    260             "INSERT INTO projection_cursor(projection_id, projection_version, last_event_seq, updated_at_ms) VALUES (?, ?, ?, ?) ON CONFLICT(projection_id) DO UPDATE SET projection_version = excluded.projection_version, last_event_seq = excluded.last_event_seq, updated_at_ms = excluded.updated_at_ms",
    261         )
    262         .bind(cursor.projection_id.as_str())
    263         .bind(i64::from(cursor.projection_version))
    264         .bind(cursor.last_event_seq)
    265         .bind(cursor.updated_at_ms)
    266         .execute(&self.pool)
    267         .await?;
    268         Ok(())
    269     }
    270 
    271     pub async fn events_since_cursor(
    272         &self,
    273         projection_id: &str,
    274         limit: u32,
    275     ) -> Result<Vec<RadrootsStoredEvent>, RadrootsEventStoreError> {
    276         let cursor = self.get_projection_cursor(projection_id).await?;
    277         let last_event_seq = cursor
    278             .as_ref()
    279             .map(|cursor| cursor.last_event_seq)
    280             .unwrap_or(0);
    281         let rows = sqlx::query(
    282             "SELECT seq, event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event WHERE projection_eligible = 1 AND seq > ? ORDER BY seq ASC LIMIT ?",
    283         )
    284         .bind(last_event_seq)
    285         .bind(i64::from(limit))
    286         .fetch_all(&self.pool)
    287         .await?;
    288         rows.into_iter().map(stored_event_from_row).collect()
    289     }
    290 
    291     pub async fn events_by_tag(
    292         &self,
    293         tag_name: &str,
    294         tag_value: &str,
    295         limit: u32,
    296     ) -> Result<Vec<RadrootsStoredEvent>, RadrootsEventStoreError> {
    297         validate_tag_query(tag_name, limit)?;
    298         let rows = sqlx::query(
    299             "SELECT seq, event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event AS event WHERE projection_eligible = 1 AND EXISTS (SELECT 1 FROM nostr_event_tag AS tag WHERE tag.event_id = event.event_id AND tag.tag_name = ? AND tag.tag_value = ?) ORDER BY event.seq ASC LIMIT ?",
    300         )
    301         .bind(tag_name)
    302         .bind(tag_value)
    303         .bind(i64::from(limit))
    304         .fetch_all(&self.pool)
    305         .await?;
    306         rows.into_iter().map(stored_event_from_row).collect()
    307     }
    308 
    309     pub async fn events_by_contract_and_tag<S>(
    310         &self,
    311         contract_ids: &[S],
    312         tag_name: &str,
    313         tag_value: &str,
    314         limit: u32,
    315     ) -> Result<Vec<RadrootsStoredEvent>, RadrootsEventStoreError>
    316     where
    317         S: AsRef<str>,
    318     {
    319         validate_contract_tag_query(contract_ids, tag_name, limit)?;
    320         let placeholders = core::iter::repeat_n("?", contract_ids.len())
    321             .collect::<Vec<_>>()
    322             .join(", ");
    323         let sql = format!(
    324             "SELECT seq, event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event AS event WHERE projection_eligible = 1 AND contract_id IN ({placeholders}) AND EXISTS (SELECT 1 FROM nostr_event_tag AS tag WHERE tag.event_id = event.event_id AND tag.tag_name = ? AND tag.tag_value = ?) ORDER BY event.seq ASC LIMIT ?"
    325         );
    326         let mut query = sqlx::query(sql.as_str());
    327         for contract_id in contract_ids {
    328             query = query.bind(contract_id.as_ref());
    329         }
    330         let rows = query
    331             .bind(tag_name)
    332             .bind(tag_value)
    333             .bind(i64::from(limit))
    334             .fetch_all(&self.pool)
    335             .await?;
    336         rows.into_iter().map(stored_event_from_row).collect()
    337     }
    338 }
    339 
    340 #[derive(Clone, Debug, PartialEq, Eq)]
    341 pub struct RadrootsRelayObservationRow {
    342     pub event_id: String,
    343     pub relay_url: String,
    344     pub observation_type: String,
    345     pub first_seen_at_ms: i64,
    346     pub last_seen_at_ms: i64,
    347     pub observation_count: i64,
    348     pub last_message: Option<String>,
    349 }
    350 
    351 struct EventClassification {
    352     contract_status: RadrootsEventContractStatus,
    353     contract: Option<&'static RadrootsEventContract>,
    354 }
    355 
    356 impl EventClassification {
    357     fn base_projection_eligible(&self, verification: RadrootsEventVerificationStatus) -> bool {
    358         verification == RadrootsEventVerificationStatus::Verified
    359             && self
    360                 .contract
    361                 .map(|contract| contract.class != RadrootsEventClass::Ephemeral)
    362                 .unwrap_or(false)
    363     }
    364 }
    365 
    366 struct AppliedHead {
    367     decision: RadrootsEventHeadStoreDecision,
    368     projection_eligible: bool,
    369 }
    370 
    371 struct InsertRawEventResult {
    372     inserted: bool,
    373     seq: i64,
    374 }
    375 
    376 async fn configure_connection(
    377     pool: &SqlitePool,
    378     file_backed: bool,
    379 ) -> Result<(), RadrootsEventStoreError> {
    380     sqlx::query("PRAGMA foreign_keys = ON")
    381         .execute(pool)
    382         .await?;
    383     sqlx::query("PRAGMA busy_timeout = 5000")
    384         .execute(pool)
    385         .await?;
    386     if file_backed {
    387         sqlx::query("PRAGMA journal_mode = WAL")
    388             .execute(pool)
    389             .await?;
    390     }
    391     Ok(())
    392 }
    393 
    394 #[cfg_attr(coverage_nightly, coverage(off))]
    395 async fn apply_up(pool: &SqlitePool) -> Result<(), RadrootsEventStoreError> {
    396     sqlx::raw_sql(EVENT_STORE_MIGRATION_UP)
    397         .execute(pool)
    398         .await?;
    399     Ok(())
    400 }
    401 
    402 #[cfg_attr(coverage_nightly, coverage(off))]
    403 async fn apply_down(pool: &SqlitePool) -> Result<(), RadrootsEventStoreError> {
    404     sqlx::raw_sql(EVENT_STORE_MIGRATION_DOWN)
    405         .execute(pool)
    406         .await?;
    407     Ok(())
    408 }
    409 
    410 #[cfg_attr(coverage_nightly, coverage(off))]
    411 async fn query_i64(pool: &SqlitePool, sql: &str) -> Result<i64, RadrootsEventStoreError> {
    412     let row = sqlx::query(sql).fetch_one(pool).await?;
    413     Ok(row.try_get(0)?)
    414 }
    415 
    416 #[cfg_attr(coverage_nightly, coverage(off))]
    417 async fn query_string(pool: &SqlitePool, sql: &str) -> Result<String, RadrootsEventStoreError> {
    418     let row = sqlx::query(sql).fetch_one(pool).await?;
    419     Ok(row.try_get(0)?)
    420 }
    421 
    422 fn validate_event_identity(event: &RadrootsNostrEvent) -> Result<(), RadrootsEventStoreError> {
    423     RadrootsEventId::parse(event.id.as_str())?;
    424     RadrootsPublicKey::parse(event.author.as_str())?;
    425     RadrootsEventSignature::parse(event.sig.as_str())?;
    426     Ok(())
    427 }
    428 
    429 fn classify_event(event: &RadrootsNostrEvent) -> EventClassification {
    430     match identify_event_contract(event.kind, &event.tags, &event.content) {
    431         Ok(contract) => EventClassification {
    432             contract_status: RadrootsEventContractStatus::Supported,
    433             contract: Some(contract),
    434         },
    435         Err(error) => EventClassification {
    436             contract_status: RadrootsEventContractStatus::from_match_error(error),
    437             contract: None,
    438         },
    439     }
    440 }
    441 
    442 fn verify_event(event: &RadrootsNostrEvent) -> RadrootsEventVerificationStatus {
    443     verification_status_from_nostr(radroots_nostr_verify_event(event))
    444 }
    445 
    446 fn verification_status_from_nostr(
    447     verification: RadrootsNostrEventVerification,
    448 ) -> RadrootsEventVerificationStatus {
    449     match verification {
    450         RadrootsNostrEventVerification::Verified => RadrootsEventVerificationStatus::Verified,
    451         RadrootsNostrEventVerification::IdVerified => RadrootsEventVerificationStatus::IdVerified,
    452         RadrootsNostrEventVerification::IdMismatch => RadrootsEventVerificationStatus::IdMismatch,
    453         RadrootsNostrEventVerification::SignatureInvalid => {
    454             RadrootsEventVerificationStatus::SignatureInvalid
    455         }
    456         RadrootsNostrEventVerification::MalformedEnvelope => {
    457             RadrootsEventVerificationStatus::MalformedEnvelope
    458         }
    459     }
    460 }
    461 
    462 async fn insert_raw_event(
    463     tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    464     ingest: &RadrootsEventIngest,
    465     classification: &EventClassification,
    466     verification_status: RadrootsEventVerificationStatus,
    467     raw_json: &str,
    468     tags_json: &str,
    469 ) -> Result<InsertRawEventResult, RadrootsEventStoreError> {
    470     let event = &ingest.event;
    471     let contract_id = classification.contract.map(|contract| contract.id);
    472     let event_class = classification
    473         .contract
    474         .map(|contract| StoredEventClass::from_event_class(contract.class).as_str());
    475     let projection_eligible = classification.base_projection_eligible(verification_status);
    476     let result = sqlx::query(
    477         "INSERT OR IGNORE INTO nostr_event(event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    478     )
    479     .bind(event.id.as_str())
    480     .bind(event.author.as_str())
    481     .bind(i64::from(event.created_at))
    482     .bind(i64::from(event.kind))
    483     .bind(tags_json)
    484     .bind(event.content.as_str())
    485     .bind(event.sig.as_str())
    486     .bind(raw_json)
    487     .bind(verification_status.as_str())
    488     .bind(classification.contract_status.as_str())
    489     .bind(contract_id)
    490     .bind(event_class)
    491     .bind(bool_i64(projection_eligible))
    492     .bind(ingest.observed_at_ms)
    493     .bind(ingest.observed_at_ms)
    494     .execute(&mut **tx)
    495     .await?;
    496     let inserted = result.rows_affected() > 0;
    497     let seq = event_seq(tx, event.id.as_str()).await?;
    498     Ok(InsertRawEventResult { inserted, seq })
    499 }
    500 
    501 #[cfg_attr(coverage_nightly, coverage(off))]
    502 async fn event_seq(
    503     tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    504     event_id: &str,
    505 ) -> Result<i64, RadrootsEventStoreError> {
    506     let row = sqlx::query("SELECT seq FROM nostr_event WHERE event_id = ?")
    507         .bind(event_id)
    508         .fetch_one(&mut **tx)
    509         .await?;
    510     row.try_get("seq").map_err(Into::into)
    511 }
    512 
    513 #[cfg_attr(coverage_nightly, coverage(off))]
    514 async fn insert_tags(
    515     tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    516     event: &RadrootsNostrEvent,
    517     contract: Option<&'static RadrootsEventContract>,
    518 ) -> Result<(), RadrootsEventStoreError> {
    519     for (index, tag) in event.tags.iter().enumerate() {
    520         let tag_name = tag.first().map(String::as_str).unwrap_or("");
    521         let tag_value = tag.get(1).map(String::as_str);
    522         let tag_json = serde_json::to_string(tag)?;
    523         let tag_contract = contract.and_then(|contract| {
    524             contract
    525                 .tags
    526                 .iter()
    527                 .find(|candidate| candidate.name == tag_name)
    528         });
    529         let contract_semantic = tag_contract.map(|tag| tag_semantic_name(tag.semantic));
    530         let contract_value_type = tag_contract.map(|tag| tag_value_type_name(tag.value_type));
    531         let relay_indexed = tag_contract.map(|tag| tag.relay_indexed).unwrap_or(false);
    532         sqlx::query(
    533             "INSERT INTO nostr_event_tag(event_id, tag_index, tag_name, tag_value, tag_json, contract_semantic, contract_value_type, relay_indexed) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
    534         )
    535         .bind(event.id.as_str())
    536         .bind(i64::try_from(index).map_err(|_| RadrootsEventStoreError::IntegerRange {
    537             field: "tag_index",
    538             value: i64::MAX,
    539         })?)
    540         .bind(tag_name)
    541         .bind(tag_value)
    542         .bind(tag_json.as_str())
    543         .bind(contract_semantic)
    544         .bind(contract_value_type)
    545         .bind(bool_i64(relay_indexed))
    546         .execute(&mut **tx)
    547         .await?;
    548     }
    549     Ok(())
    550 }
    551 
    552 #[cfg_attr(coverage_nightly, coverage(off))]
    553 async fn upsert_observation(
    554     tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    555     event_id: &str,
    556     observation: &RadrootsRelayObservation,
    557 ) -> Result<(), RadrootsEventStoreError> {
    558     sqlx::query(
    559         "INSERT INTO relay_event_seen(event_id, relay_url, observation_type, first_seen_at_ms, last_seen_at_ms, observation_count, last_message) VALUES (?, ?, ?, ?, ?, 1, ?) ON CONFLICT(event_id, relay_url, observation_type) DO UPDATE SET last_seen_at_ms = excluded.last_seen_at_ms, observation_count = relay_event_seen.observation_count + 1, last_message = excluded.last_message",
    560     )
    561     .bind(event_id)
    562     .bind(observation.relay_url.as_str())
    563     .bind(observation.observation_type.as_str())
    564     .bind(observation.observed_at_ms)
    565     .bind(observation.observed_at_ms)
    566     .bind(observation.message.as_deref())
    567     .execute(&mut **tx)
    568     .await?;
    569     Ok(())
    570 }
    571 
    572 async fn apply_event_head(
    573     tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    574     event: &RadrootsNostrEvent,
    575     contract: &RadrootsEventContract,
    576     updated_at_ms: i64,
    577 ) -> Result<AppliedHead, RadrootsEventStoreError> {
    578     let candidate = match event_head_candidate_for_contract(event, contract) {
    579         RadrootsEventHeadCandidateResult::Candidate(candidate) => candidate,
    580         RadrootsEventHeadCandidateResult::NotHeadSelected => {
    581             return Ok(AppliedHead {
    582                 decision: RadrootsEventHeadStoreDecision::NotHeadSelected,
    583                 projection_eligible: true,
    584             });
    585         }
    586         RadrootsEventHeadCandidateResult::NotPersisted => {
    587             return Ok(AppliedHead {
    588                 decision: RadrootsEventHeadStoreDecision::NotPersisted,
    589                 projection_eligible: false,
    590             });
    591         }
    592         RadrootsEventHeadCandidateResult::Malformed(_) => {
    593             return Ok(AppliedHead {
    594                 decision: RadrootsEventHeadStoreDecision::Malformed,
    595                 projection_eligible: false,
    596             });
    597         }
    598     };
    599     let current = current_event_head(tx, &candidate.coordinate).await?;
    600     let protocol_decision = select_event_head(candidate.clone(), current.as_ref());
    601     if let RadrootsEventHeadDecision::Applied(head) = &protocol_decision {
    602         upsert_head(tx, &candidate, head, updated_at_ms).await?;
    603     }
    604     let projection_eligible = matches!(protocol_decision, RadrootsEventHeadDecision::Applied(_));
    605     Ok(AppliedHead {
    606         decision: RadrootsEventHeadStoreDecision::from_protocol(&protocol_decision),
    607         projection_eligible,
    608     })
    609 }
    610 
    611 async fn current_event_head(
    612     tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    613     coordinate: &RadrootsEventHeadCoordinate,
    614 ) -> Result<Option<RadrootsCurrentEventHead>, RadrootsEventStoreError> {
    615     let row = match coordinate {
    616         RadrootsEventHeadCoordinate::Replaceable { kind, pubkey } => {
    617             sqlx::query(
    618                 "SELECT event_id, created_at FROM nostr_event_head WHERE coordinate_type = 'replaceable' AND kind = ? AND pubkey = ? AND d_tag IS NULL",
    619             )
    620             .bind(i64::from(*kind))
    621             .bind(pubkey.as_str())
    622             .fetch_optional(&mut **tx)
    623             .await?
    624         }
    625         RadrootsEventHeadCoordinate::Addressable {
    626             kind,
    627             pubkey,
    628             d_tag,
    629         } => {
    630             sqlx::query(
    631                 "SELECT event_id, created_at FROM nostr_event_head WHERE coordinate_type = 'addressable' AND kind = ? AND pubkey = ? AND d_tag = ?",
    632             )
    633             .bind(i64::from(*kind))
    634             .bind(pubkey.as_str())
    635             .bind(d_tag.as_str())
    636             .fetch_optional(&mut **tx)
    637             .await?
    638         }
    639     };
    640     row.map(|row| {
    641         let event_id: String = row.try_get("event_id")?;
    642         let created_at: i64 = row.try_get("created_at")?;
    643         Ok(RadrootsCurrentEventHead {
    644             coordinate: coordinate.clone(),
    645             event_id: RadrootsEventId::parse(event_id)?,
    646             created_at: u32_from_i64("created_at", created_at)?,
    647         })
    648     })
    649     .transpose()
    650 }
    651 
    652 #[cfg_attr(coverage_nightly, coverage(off))]
    653 async fn upsert_head(
    654     tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    655     candidate: &RadrootsEventHeadCandidate,
    656     head: &RadrootsCurrentEventHead,
    657     updated_at_ms: i64,
    658 ) -> Result<(), RadrootsEventStoreError> {
    659     match &head.coordinate {
    660         RadrootsEventHeadCoordinate::Replaceable { kind, pubkey } => {
    661             sqlx::query(
    662                 "DELETE FROM nostr_event_head WHERE coordinate_type = 'replaceable' AND kind = ? AND pubkey = ? AND d_tag IS NULL",
    663             )
    664             .bind(i64::from(*kind))
    665             .bind(pubkey.as_str())
    666             .execute(&mut **tx)
    667             .await?;
    668             sqlx::query(
    669                 "INSERT INTO nostr_event_head(coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms) VALUES ('replaceable', ?, ?, NULL, ?, ?, ?)",
    670             )
    671             .bind(i64::from(*kind))
    672             .bind(pubkey.as_str())
    673             .bind(candidate.event_id.as_str())
    674             .bind(i64::from(candidate.created_at))
    675             .bind(updated_at_ms)
    676             .execute(&mut **tx)
    677             .await?;
    678         }
    679         RadrootsEventHeadCoordinate::Addressable {
    680             kind,
    681             pubkey,
    682             d_tag,
    683         } => {
    684             sqlx::query(
    685                 "DELETE FROM nostr_event_head WHERE coordinate_type = 'addressable' AND kind = ? AND pubkey = ? AND d_tag = ?",
    686             )
    687             .bind(i64::from(*kind))
    688             .bind(pubkey.as_str())
    689             .bind(d_tag.as_str())
    690             .execute(&mut **tx)
    691             .await?;
    692             sqlx::query(
    693                 "INSERT INTO nostr_event_head(coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms) VALUES ('addressable', ?, ?, ?, ?, ?, ?)",
    694             )
    695             .bind(i64::from(*kind))
    696             .bind(pubkey.as_str())
    697             .bind(d_tag.as_str())
    698             .bind(candidate.event_id.as_str())
    699             .bind(i64::from(candidate.created_at))
    700             .bind(updated_at_ms)
    701             .execute(&mut **tx)
    702             .await?;
    703         }
    704     }
    705     Ok(())
    706 }
    707 
    708 #[cfg_attr(coverage_nightly, coverage(off))]
    709 fn stored_event_from_row(
    710     row: sqlx::sqlite::SqliteRow,
    711 ) -> Result<RadrootsStoredEvent, RadrootsEventStoreError> {
    712     let kind = u32_from_i64("kind", row.try_get("kind")?)?;
    713     let created_at = u32_from_i64("created_at", row.try_get("created_at")?)?;
    714     let verification_status =
    715         RadrootsEventVerificationStatus::parse(row.try_get("verification_status")?)?;
    716     let contract_status =
    717         RadrootsEventContractStatus::parse(row.try_get("contract_status")?, kind)?;
    718     let event_class = row
    719         .try_get::<Option<String>, _>("event_class")?
    720         .map(|value| StoredEventClass::parse(value.as_str()))
    721         .transpose()?;
    722     let projection_eligible = row.try_get::<i64, _>("projection_eligible")? != 0;
    723     Ok(RadrootsStoredEvent {
    724         seq: row.try_get("seq")?,
    725         event_id: row.try_get("event_id")?,
    726         pubkey: row.try_get("pubkey")?,
    727         created_at,
    728         kind,
    729         tags_json: row.try_get("tags_json")?,
    730         content: row.try_get("content")?,
    731         sig: row.try_get("sig")?,
    732         raw_json: row.try_get("raw_json")?,
    733         verification_status,
    734         contract_status,
    735         contract_id: row.try_get("contract_id")?,
    736         event_class,
    737         projection_eligible,
    738         inserted_at_ms: row.try_get("inserted_at_ms")?,
    739         updated_at_ms: row.try_get("updated_at_ms")?,
    740     })
    741 }
    742 
    743 #[cfg_attr(coverage_nightly, coverage(off))]
    744 fn stored_tag_from_row(
    745     row: sqlx::sqlite::SqliteRow,
    746 ) -> Result<RadrootsStoredEventTag, RadrootsEventStoreError> {
    747     Ok(RadrootsStoredEventTag {
    748         event_id: row.try_get("event_id")?,
    749         tag_index: u32_from_i64("tag_index", row.try_get("tag_index")?)?,
    750         tag_name: row.try_get("tag_name")?,
    751         tag_value: row.try_get("tag_value")?,
    752         tag_json: row.try_get("tag_json")?,
    753         contract_semantic: row.try_get("contract_semantic")?,
    754         contract_value_type: row.try_get("contract_value_type")?,
    755         relay_indexed: row.try_get::<i64, _>("relay_indexed")? != 0,
    756     })
    757 }
    758 
    759 #[cfg_attr(coverage_nightly, coverage(off))]
    760 fn stored_head_from_row(
    761     row: sqlx::sqlite::SqliteRow,
    762 ) -> Result<RadrootsStoredEventHead, RadrootsEventStoreError> {
    763     Ok(RadrootsStoredEventHead {
    764         coordinate_type: StoredEventClass::parse(row.try_get("coordinate_type")?)?,
    765         kind: u32_from_i64("kind", row.try_get("kind")?)?,
    766         pubkey: row.try_get("pubkey")?,
    767         d_tag: row.try_get("d_tag")?,
    768         event_id: row.try_get("event_id")?,
    769         created_at: u32_from_i64("created_at", row.try_get("created_at")?)?,
    770         updated_at_ms: row.try_get("updated_at_ms")?,
    771     })
    772 }
    773 
    774 #[cfg_attr(coverage_nightly, coverage(off))]
    775 fn projection_cursor_from_row(
    776     row: sqlx::sqlite::SqliteRow,
    777 ) -> Result<RadrootsProjectionCursor, RadrootsEventStoreError> {
    778     Ok(RadrootsProjectionCursor {
    779         projection_id: row.try_get("projection_id")?,
    780         projection_version: u32_from_i64("projection_version", row.try_get("projection_version")?)?,
    781         last_event_seq: row.try_get("last_event_seq")?,
    782         updated_at_ms: row.try_get("updated_at_ms")?,
    783     })
    784 }
    785 
    786 #[cfg_attr(coverage_nightly, coverage(off))]
    787 fn relay_observation_from_row(
    788     row: sqlx::sqlite::SqliteRow,
    789 ) -> Result<RadrootsRelayObservationRow, RadrootsEventStoreError> {
    790     Ok(RadrootsRelayObservationRow {
    791         event_id: row.try_get("event_id")?,
    792         relay_url: row.try_get("relay_url")?,
    793         observation_type: row.try_get("observation_type")?,
    794         first_seen_at_ms: row.try_get("first_seen_at_ms")?,
    795         last_seen_at_ms: row.try_get("last_seen_at_ms")?,
    796         observation_count: row.try_get("observation_count")?,
    797         last_message: row.try_get("last_message")?,
    798     })
    799 }
    800 
    801 #[cfg_attr(coverage_nightly, coverage(off))]
    802 fn u32_from_i64(field: &'static str, value: i64) -> Result<u32, RadrootsEventStoreError> {
    803     u32::try_from(value).map_err(|_| RadrootsEventStoreError::IntegerRange { field, value })
    804 }
    805 
    806 fn bool_i64(value: bool) -> i64 {
    807     if value { 1 } else { 0 }
    808 }
    809 
    810 fn validate_tag_query(tag_name: &str, limit: u32) -> Result<(), RadrootsEventStoreError> {
    811     if tag_name.is_empty() {
    812         return Err(RadrootsEventStoreError::EmptyTagName);
    813     }
    814     if !(1..=RADROOTS_EVENT_STORE_QUERY_LIMIT_MAX).contains(&limit) {
    815         return Err(RadrootsEventStoreError::QueryLimitOutOfRange {
    816             min: 1,
    817             max: RADROOTS_EVENT_STORE_QUERY_LIMIT_MAX,
    818             actual: limit,
    819         });
    820     }
    821     Ok(())
    822 }
    823 
    824 fn validate_contract_tag_query<S>(
    825     contract_ids: &[S],
    826     tag_name: &str,
    827     limit: u32,
    828 ) -> Result<(), RadrootsEventStoreError>
    829 where
    830     S: AsRef<str>,
    831 {
    832     if contract_ids.is_empty() {
    833         return Err(RadrootsEventStoreError::EmptyContractList);
    834     }
    835     if contract_ids.len() > RADROOTS_EVENT_STORE_CONTRACT_QUERY_LIMIT_MAX {
    836         return Err(RadrootsEventStoreError::ContractListTooLarge {
    837             max: RADROOTS_EVENT_STORE_CONTRACT_QUERY_LIMIT_MAX,
    838             actual: contract_ids.len(),
    839         });
    840     }
    841     validate_tag_query(tag_name, limit)
    842 }
    843 
    844 #[cfg(test)]
    845 mod tests {
    846     use super::*;
    847     use radroots_events::event_head::event_head_candidate_for_event;
    848     use radroots_events::kinds::{
    849         KIND_GEOCHAT, KIND_LISTING, KIND_ORDER_REQUEST, KIND_POST, KIND_PROFILE,
    850     };
    851     use radroots_nostr::prelude::{
    852         RadrootsNostrKeys, RadrootsNostrSecretKey, RadrootsNostrTimestamp,
    853         radroots_event_from_nostr, radroots_nostr_build_event,
    854     };
    855 
    // Hex-encoded secret key that `fixture_keys` loads to sign every fixture event.
    const FIXTURE_ALICE_SECRET_KEY_HEX: &str =
        "10c5304d6c9ae3a1a16f7860f1cc8f5e3a76225a2663b3a989a0d775919b7df5";
    // Hex-encoded public key used to build the expected profile coordinate.
    // NOTE(review): presumably the public half of the secret key above — confirm.
    const FIXTURE_ALICE_PUBLIC_KEY_HEX: &str =
        "585591529da0bab31b3b1b1f986611cf5f435dca84f978c89ee8a40cca7103df";
    860 
    861     fn fixture_keys() -> RadrootsNostrKeys {
    862         let secret_key =
    863             RadrootsNostrSecretKey::from_hex(FIXTURE_ALICE_SECRET_KEY_HEX).expect("secret key");
    864         RadrootsNostrKeys::new(secret_key)
    865     }
    866 
    /// Returns a 64-character string made of `character` repeated.
    fn event_id(character: char) -> String {
        character.to_string().repeat(64)
    }
    870 
    871     fn signed_event(
    872         kind: u32,
    873         created_at: u32,
    874         tags: Vec<Vec<String>>,
    875         content: &str,
    876     ) -> RadrootsNostrEvent {
    877         let raw_event = radroots_nostr_build_event(kind, content, tags)
    878             .expect("builder")
    879             .custom_created_at(RadrootsNostrTimestamp::from_secs(u64::from(created_at)))
    880             .sign_with_keys(&fixture_keys())
    881             .expect("signed event");
    882         radroots_event_from_nostr(&raw_event)
    883     }
    884 
    885     fn tamper_signature(event: &mut RadrootsNostrEvent) {
    886         let replacement = if event.sig.starts_with('0') { "1" } else { "0" };
    887         event.sig.replace_range(0..1, replacement);
    888     }
    889 
    /// Returns a tag list holding a single `["d", d_tag]` tag.
    fn listing_tags(d_tag: &str) -> Vec<Vec<String>> {
        let identifier_tag = vec![String::from("d"), String::from(d_tag)];
        vec![identifier_tag]
    }
    893 
    894     fn head_coordinate_for_event(event: &RadrootsNostrEvent) -> RadrootsEventHeadCoordinate {
    895         let RadrootsEventHeadCandidateResult::Candidate(candidate) =
    896             event_head_candidate_for_event(event).expect("head candidate")
    897         else {
    898             panic!("event should select a head");
    899         };
    900         candidate.coordinate
    901     }
    902 
    903     fn profile_coordinate() -> RadrootsEventHeadCoordinate {
    904         RadrootsEventHeadCoordinate::Replaceable {
    905             kind: KIND_PROFILE,
    906             pubkey: RadrootsPublicKey::parse(FIXTURE_ALICE_PUBLIC_KEY_HEX).expect("pubkey"),
    907         }
    908     }
    909 
    // Each listed verification status must parse back from its own string form, and
    // an unknown string must be rejected.
    #[test]
    fn verification_status_values_round_trip() {
        let statuses = [
            RadrootsEventVerificationStatus::NotChecked,
            RadrootsEventVerificationStatus::IdVerified,
            RadrootsEventVerificationStatus::Verified,
            RadrootsEventVerificationStatus::IdMismatch,
            RadrootsEventVerificationStatus::SignatureInvalid,
            RadrootsEventVerificationStatus::MalformedEnvelope,
        ];
        for status in statuses {
            let parsed = RadrootsEventVerificationStatus::parse(status.as_str()).expect("status");
            assert_eq!(parsed, status);
        }

        let unknown = RadrootsEventVerificationStatus::parse("invalid");
        assert!(unknown.is_err());
    }
    927 
    // Each listed nostr verification result must map to the store status of the same
    // name.
    #[test]
    fn verification_status_mapper_covers_all_nostr_results() {
        let cases = [
            (
                RadrootsNostrEventVerification::Verified,
                RadrootsEventVerificationStatus::Verified,
            ),
            (
                RadrootsNostrEventVerification::IdVerified,
                RadrootsEventVerificationStatus::IdVerified,
            ),
            (
                RadrootsNostrEventVerification::IdMismatch,
                RadrootsEventVerificationStatus::IdMismatch,
            ),
            (
                RadrootsNostrEventVerification::SignatureInvalid,
                RadrootsEventVerificationStatus::SignatureInvalid,
            ),
            (
                RadrootsNostrEventVerification::MalformedEnvelope,
                RadrootsEventVerificationStatus::MalformedEnvelope,
            ),
        ];
        for (nostr_result, expected) in cases {
            assert_eq!(verification_status_from_nostr(nostr_result), expected);
        }
    }
    951 
    // A freshly opened in-memory store must report foreign keys enabled, a busy
    // timeout of 5000, and the "memory" journal mode.
    #[tokio::test]
    async fn constructor_enforces_sqlite_pragmas() {
        let store = RadrootsEventStore::open_memory().await.expect("open");

        assert_eq!(store.pragma_foreign_keys().await.expect("foreign_keys"), 1);
        assert_eq!(
            store.pragma_busy_timeout().await.expect("busy_timeout"),
            5000
        );
        assert_eq!(
            store.pragma_journal_mode().await.expect("journal"),
            "memory"
        );
    }
    966 
    // The status summary starts out empty and, after one event is ingested twice
    // (the second time with a relay observation), reports one event, one
    // projection-eligible event and one relay observation.
    #[tokio::test]
    async fn status_summary_counts_events_projections_and_relay_observations() {
        let store = RadrootsEventStore::open_memory().await.expect("open");

        // Baseline: nothing stored yet.
        let empty = store.status_summary().await.expect("empty status");
        assert_eq!(empty.total_events, 0);
        assert_eq!(empty.projection_eligible_events, 0);
        assert_eq!(empty.relay_observations, 0);
        assert_eq!(empty.last_event_seq, None);
        assert_eq!(empty.last_event_updated_at_ms, None);

        let event = signed_event(
            KIND_POST,
            10,
            vec![vec!["t".to_owned(), "soil".to_owned()]],
            "hello",
        );
        let observation = RadrootsRelayObservation::new(
            "wss://relay.example.com",
            crate::RadrootsRelayObservationType::PublishAck,
            1_100,
        );
        store
            .ingest_event(RadrootsEventIngest::new(event.clone(), 1_000))
            .await
            .expect("event ingest");
        // Same event again, this time carrying the relay observation.
        store
            .ingest_event(RadrootsEventIngest::new(event, 1_100).with_observation(observation))
            .await
            .expect("observation ingest");

        let status = store.status_summary().await.expect("status");
        assert_eq!(status.total_events, 1);
        assert_eq!(status.projection_eligible_events, 1);
        assert_eq!(status.relay_observations, 1);
        assert_eq!(status.last_event_seq, Some(1));
        // The expected timestamp is the first ingest's (1_000), not the duplicate's (1_100).
        assert_eq!(status.last_event_updated_at_ms, Some(1_000));
    }
   1005 
    // Opening a file-backed store twice at the same path must succeed both times and
    // report foreign keys enabled on each handle.
    #[tokio::test]
    async fn file_store_reopens_existing_schema() {
        let tempdir = tempfile::tempdir().expect("tempdir");
        let path = tempdir.path().join("event_store.sqlite");

        let first = RadrootsEventStore::open_file(&path).await.expect("first");
        assert_eq!(first.pragma_foreign_keys().await.expect("foreign_keys"), 1);
        // Drop the first handle before opening the same file again.
        drop(first);

        let second = RadrootsEventStore::open_file(&path).await.expect("second");
        assert_eq!(second.pragma_foreign_keys().await.expect("foreign_keys"), 1);
    }
   1018 
    // After the down migration, querying `nostr_event` must fail with an error that
    // mentions the table name.
    #[tokio::test]
    async fn migration_can_run_down() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        store.migrate_down().await.expect("down");

        // The query is expected to error; `.err().expect(..)` panics if it succeeds.
        let missing = sqlx::query("SELECT COUNT(*) FROM nostr_event")
            .fetch_one(store.pool())
            .await
            .err()
            .expect("table should be removed");
        assert!(missing.to_string().contains("nostr_event"));
    }
   1031 
    // Ingesting the same event twice must insert it once, reuse the sequence number
    // for the duplicate, and keep the supplied raw JSON, content and tags.
    #[tokio::test]
    async fn ingest_retains_raw_event_and_ignores_duplicate_rows() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        let event = signed_event(
            KIND_POST,
            10,
            vec![vec!["t".to_owned(), "soil".to_owned()]],
            "hello",
        );
        // The raw JSON is an arbitrary marker so the test can tell it was stored as given.
        let ingest =
            RadrootsEventIngest::new(event.clone(), 1_000).with_raw_json("{\"fixture\":true}");

        let first = store
            .ingest_event(ingest.clone())
            .await
            .expect("first ingest");
        let second = store.ingest_event(ingest).await.expect("second ingest");
        let stored = store
            .get_event(event.id.as_str())
            .await
            .expect("get")
            .expect("stored");

        assert!(first.inserted);
        assert!(!second.inserted);
        assert_eq!(first.seq, second.seq);
        assert_eq!(
            second.head_decision,
            RadrootsEventHeadStoreDecision::SkippedDuplicate
        );
        assert_eq!(
            first.verification_status,
            RadrootsEventVerificationStatus::Verified
        );
        assert_eq!(stored.seq, first.seq);
        assert_eq!(stored.raw_json, "{\"fixture\":true}");
        assert_eq!(stored.content, "hello");
        assert_eq!(stored.tags_json, "[[\"t\",\"soil\"]]");
        assert_eq!(
            stored.contract_status,
            RadrootsEventContractStatus::Supported
        );
        assert!(stored.projection_eligible);
        // Exactly one tag row is expected for the event's single tag.
        assert_eq!(
            store
                .tags_for_event(event.id.as_str())
                .await
                .expect("tags")
                .len(),
            1
        );
    }
   1084 
    // A correctly signed event of kind 999 must be stored as verified with an
    // `UnsupportedKind(999)` contract status and must not be projection eligible.
    // Re-ingesting it reports an `Unsupported` head decision.
    #[tokio::test]
    async fn unsupported_verified_events_are_stored_but_not_projected() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        let event = signed_event(999, 11, Vec::new(), "unsupported");
        let receipt = store
            .ingest_event(RadrootsEventIngest::new(event.clone(), 2_000))
            .await
            .expect("ingest");
        let stored = store
            .get_event(event.id.as_str())
            .await
            .expect("get")
            .expect("stored");

        assert_eq!(
            receipt.contract_status,
            RadrootsEventContractStatus::UnsupportedKind(999)
        );
        assert_eq!(
            stored.verification_status,
            RadrootsEventVerificationStatus::Verified
        );
        assert!(!stored.projection_eligible);

        // Second ingest of the same event: not inserted again.
        let duplicate = store
            .ingest_event(RadrootsEventIngest::new(event, 2_100))
            .await
            .expect("duplicate");
        assert!(!duplicate.inserted);
        assert_eq!(
            duplicate.head_decision,
            RadrootsEventHeadStoreDecision::Unsupported
        );
    }
   1119 
   1120     #[test]
   1121     fn test_helpers_cover_signature_and_non_head_branches() {
   1122         let mut zero_sig = signed_event(KIND_POST, 12, Vec::new(), "zero");
   1123         zero_sig.sig.replace_range(0..1, "0");
   1124         tamper_signature(&mut zero_sig);
   1125         assert!(zero_sig.sig.starts_with('1'));
   1126 
   1127         let mut nonzero_sig = signed_event(KIND_POST, 12, Vec::new(), "nonzero");
   1128         nonzero_sig.sig.replace_range(0..1, "1");
   1129         tamper_signature(&mut nonzero_sig);
   1130         assert!(nonzero_sig.sig.starts_with('0'));
   1131     }
   1132 
    // `head_coordinate_for_event` must panic with its "event should select a head"
    // message when given a `KIND_POST` event.
    #[test]
    #[should_panic(expected = "event should select a head")]
    fn head_coordinate_helper_panics_for_regular_events() {
        let event = signed_event(KIND_POST, 12, Vec::new(), "regular");
        let _ = head_coordinate_for_event(&event);
    }
   1139 
    // An event whose content was changed after signing must be stored with an
    // `IdMismatch` verification status, must not be projection eligible, and must
    // not be returned by `events_since_cursor`.
    #[tokio::test]
    async fn id_mismatch_events_are_stored_but_not_projected() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        let mut event = signed_event(KIND_POST, 12, Vec::new(), "hello");
        // Changing the content after signing leaves the original id in place.
        event.content = "tampered".to_owned();
        let receipt = store
            .ingest_event(RadrootsEventIngest::new(event.clone(), 2_100))
            .await
            .expect("ingest");
        let stored = store
            .get_event(event.id.as_str())
            .await
            .expect("get")
            .expect("stored");

        assert_eq!(
            receipt.contract_status,
            RadrootsEventContractStatus::Supported
        );
        assert_eq!(
            receipt.verification_status,
            RadrootsEventVerificationStatus::IdMismatch
        );
        assert_eq!(
            stored.verification_status,
            RadrootsEventVerificationStatus::IdMismatch
        );
        assert!(!stored.projection_eligible);
        assert!(
            store
                .events_since_cursor("social", 10)
                .await
                .expect("events")
                .is_empty()
        );
    }
   1176 
    // An event with a tampered signature must be stored with a `SignatureInvalid`
    // verification status, must not be projection eligible, and must not be returned
    // by `events_since_cursor`.
    #[tokio::test]
    async fn signature_invalid_events_are_stored_but_not_projected() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        let mut event = signed_event(KIND_POST, 13, Vec::new(), "hello");
        tamper_signature(&mut event);
        let receipt = store
            .ingest_event(RadrootsEventIngest::new(event.clone(), 2_200))
            .await
            .expect("ingest");
        let stored = store
            .get_event(event.id.as_str())
            .await
            .expect("get")
            .expect("stored");

        assert_eq!(
            receipt.verification_status,
            RadrootsEventVerificationStatus::SignatureInvalid
        );
        assert_eq!(
            stored.verification_status,
            RadrootsEventVerificationStatus::SignatureInvalid
        );
        assert!(!stored.projection_eligible);
        assert!(
            store
                .events_since_cursor("social", 10)
                .await
                .expect("events")
                .is_empty()
        );
    }
   1209 
    // An event whose kind is overwritten with a value just above `u16::MAX` must be
    // stored with a `MalformedEnvelope` verification status and must not be
    // projection eligible.
    #[tokio::test]
    async fn malformed_envelope_events_are_stored_but_not_projected() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        let mut event = signed_event(KIND_POST, 13, Vec::new(), "hello");
        // One past the largest value representable as a u16.
        event.kind = u32::from(u16::MAX) + 1;

        let receipt = store
            .ingest_event(RadrootsEventIngest::new(event.clone(), 2_250))
            .await
            .expect("ingest");
        let stored = store
            .get_event(event.id.as_str())
            .await
            .expect("get")
            .expect("stored");

        assert_eq!(
            receipt.verification_status,
            RadrootsEventVerificationStatus::MalformedEnvelope
        );
        assert_eq!(
            stored.verification_status,
            RadrootsEventVerificationStatus::MalformedEnvelope
        );
        assert!(!stored.projection_eligible);
    }
   1236 
    // A `KIND_GEOCHAT` event must be stored with a supported contract but reported
    // as `NotProjectionEligible`, both on the receipt and on the stored row.
    // NOTE(review): the test name implies `KIND_GEOCHAT` is an ephemeral kind — confirm.
    #[tokio::test]
    async fn ephemeral_events_are_not_persisted_as_heads() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        let event = signed_event(KIND_GEOCHAT, 15, Vec::new(), "hello");

        let receipt = store
            .ingest_event(RadrootsEventIngest::new(event.clone(), 2_260))
            .await
            .expect("ingest");
        let stored = store
            .get_event(event.id.as_str())
            .await
            .expect("get")
            .expect("stored");

        assert_eq!(
            receipt.contract_status,
            RadrootsEventContractStatus::Supported
        );
        assert_eq!(
            receipt.head_decision,
            RadrootsEventHeadStoreDecision::NotProjectionEligible
        );
        assert!(!receipt.projection_eligible);
        assert!(!stored.projection_eligible);
    }
   1263 
    // Calling `apply_event_head` directly for a `KIND_GEOCHAT` event must yield a
    // `NotPersisted` decision that is not projection eligible.
    #[tokio::test]
    async fn event_head_helper_maps_not_persisted_candidates() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        let event = signed_event(KIND_GEOCHAT, 17, Vec::new(), "hello");
        let classification = classify_event(&event);
        let contract = classification.contract.expect("contract");
        // The helper operates on a transaction, so open one on the store's pool.
        let mut tx = store.pool.begin().await.expect("tx");

        let head = apply_event_head(&mut tx, &event, contract, 2_280)
            .await
            .expect("head");

        assert_eq!(head.decision, RadrootsEventHeadStoreDecision::NotPersisted);
        assert!(!head.projection_eligible);
    }
   1279 
    // A `KIND_LISTING` event with no tags must be stored with a supported contract
    // but a `Malformed` head decision, and must not be projection eligible.
    #[tokio::test]
    async fn malformed_addressable_heads_are_not_projected() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        // No tags at all, so the listing carries no `d` tag.
        let event = signed_event(KIND_LISTING, 16, Vec::new(), "{}");

        let receipt = store
            .ingest_event(RadrootsEventIngest::new(event.clone(), 2_270))
            .await
            .expect("ingest");
        let stored = store
            .get_event(event.id.as_str())
            .await
            .expect("get")
            .expect("stored");

        assert_eq!(
            receipt.contract_status,
            RadrootsEventContractStatus::Supported
        );
        assert_eq!(
            receipt.head_decision,
            RadrootsEventHeadStoreDecision::Malformed
        );
        assert!(!receipt.projection_eligible);
        assert!(!stored.projection_eligible);
    }
   1306 
    // A later listing for the same `d` tag whose content was changed after signing
    // must be stored as `IdMismatch` and must leave the head pointing at the
    // original event.
    #[tokio::test]
    async fn id_mismatch_addressable_events_do_not_update_heads() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        let original = signed_event(KIND_LISTING, 17, listing_tags("listing-1"), "{}");
        let first = store
            .ingest_event(RadrootsEventIngest::new(original.clone(), 2_300))
            .await
            .expect("first");
        let coordinate = head_coordinate_for_event(&original);
        // Newer timestamp, same `d` tag, content altered after signing.
        let mut invalid = signed_event(KIND_LISTING, 18, listing_tags("listing-1"), "{}");
        invalid.content = "{\"tampered\":true}".to_owned();

        let receipt = store
            .ingest_event(RadrootsEventIngest::new(invalid.clone(), 2_400))
            .await
            .expect("invalid");
        let stored = store
            .get_event(invalid.id.as_str())
            .await
            .expect("get")
            .expect("stored");
        let head = store
            .event_head(&coordinate)
            .await
            .expect("head")
            .expect("stored head");

        assert_eq!(first.head_decision, RadrootsEventHeadStoreDecision::Applied);
        assert_eq!(
            receipt.verification_status,
            RadrootsEventVerificationStatus::IdMismatch
        );
        assert_eq!(
            receipt.head_decision,
            RadrootsEventHeadStoreDecision::NotProjectionEligible
        );
        assert!(!receipt.projection_eligible);
        assert!(!stored.projection_eligible);
        assert_eq!(head.event_id, original.id);
    }
   1347 
    // A later listing for the same `d` tag with a tampered signature must be
    // reported as `SignatureInvalid` and must leave the head pointing at the
    // original event.
    #[tokio::test]
    async fn signature_invalid_addressable_events_do_not_update_heads() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        let original = signed_event(KIND_LISTING, 19, listing_tags("listing-2"), "{}");
        store
            .ingest_event(RadrootsEventIngest::new(original.clone(), 2_500))
            .await
            .expect("first");
        let coordinate = head_coordinate_for_event(&original);
        // Newer timestamp, same `d` tag, signature altered after signing.
        let mut invalid = signed_event(KIND_LISTING, 20, listing_tags("listing-2"), "{}");
        tamper_signature(&mut invalid);

        let receipt = store
            .ingest_event(RadrootsEventIngest::new(invalid.clone(), 2_600))
            .await
            .expect("invalid");
        let head = store
            .event_head(&coordinate)
            .await
            .expect("head")
            .expect("stored head");

        assert_eq!(
            receipt.verification_status,
            RadrootsEventVerificationStatus::SignatureInvalid
        );
        assert_eq!(
            receipt.head_decision,
            RadrootsEventHeadStoreDecision::NotProjectionEligible
        );
        assert!(!receipt.projection_eligible);
        assert_eq!(head.event_id, original.id);
    }
   1381 
    // Ingesting the same invalid listing twice must insert it once, report the
    // second attempt as `SkippedDuplicate`, and leave the head pointing at the
    // original event.
    #[tokio::test]
    async fn duplicate_invalid_addressable_events_do_not_update_heads() {
        let store = RadrootsEventStore::open_memory().await.expect("open");
        let original = signed_event(KIND_LISTING, 21, listing_tags("listing-3"), "{}");
        store
            .ingest_event(RadrootsEventIngest::new(original.clone(), 2_700))
            .await
            .expect("original");
        let coordinate = head_coordinate_for_event(&original);
        // Newer timestamp, same `d` tag, content altered after signing.
        let mut invalid = signed_event(KIND_LISTING, 22, listing_tags("listing-3"), "{}");
        invalid.content = "{\"tampered\":true}".to_owned();

        let first_invalid = store
            .ingest_event(RadrootsEventIngest::new(invalid.clone(), 2_800))
            .await
            .expect("first invalid");
        let second_invalid = store
            .ingest_event(RadrootsEventIngest::new(invalid.clone(), 2_900))
            .await
            .expect("second invalid");
        let head = store
            .event_head(&coordinate)
            .await
            .expect("head")
            .expect("stored head");

        assert!(first_invalid.inserted);
        assert!(!second_invalid.inserted);
        assert_eq!(first_invalid.seq, second_invalid.seq);
        assert_eq!(
            first_invalid.head_decision,
            RadrootsEventHeadStoreDecision::NotProjectionEligible
        );
        assert_eq!(
            second_invalid.head_decision,
            RadrootsEventHeadStoreDecision::SkippedDuplicate
        );
        assert_eq!(head.event_id, original.id);
    }
   1421 
   1422     #[tokio::test]
   1423     async fn duplicate_verified_addressable_events_preserve_heads() {
   1424         let store = RadrootsEventStore::open_memory().await.expect("open");
   1425         let event = signed_event(KIND_LISTING, 23, listing_tags("listing-4"), "{}");
   1426         let coordinate = head_coordinate_for_event(&event);
   1427 
   1428         let first = store
   1429             .ingest_event(RadrootsEventIngest::new(event.clone(), 3_000))
   1430             .await
   1431             .expect("first");
   1432         let second = store
   1433             .ingest_event(RadrootsEventIngest::new(event.clone(), 3_100))
   1434             .await
   1435             .expect("second");
   1436         let head = store
   1437             .event_head(&coordinate)
   1438             .await
   1439             .expect("head")
   1440             .expect("stored head");
   1441 
   1442         assert!(first.inserted);
   1443         assert!(!second.inserted);
   1444         assert_eq!(first.seq, second.seq);
   1445         assert_eq!(first.head_decision, RadrootsEventHeadStoreDecision::Applied);
   1446         assert_eq!(
   1447             second.head_decision,
   1448             RadrootsEventHeadStoreDecision::SkippedDuplicate
   1449         );
   1450         assert_eq!(head.event_id, event.id);
   1451     }
   1452 
   1453     #[tokio::test]
   1454     async fn verified_regular_events_remain_projection_eligible_without_head_selection() {
   1455         let store = RadrootsEventStore::open_memory().await.expect("open");
   1456         let event = signed_event(KIND_POST, 24, Vec::new(), "hello");
   1457 
   1458         let receipt = store
   1459             .ingest_event(RadrootsEventIngest::new(event.clone(), 3_200))
   1460             .await
   1461             .expect("ingest");
   1462         let stored = store
   1463             .get_event(event.id.as_str())
   1464             .await
   1465             .expect("get")
   1466             .expect("stored");
   1467 
   1468         assert_eq!(
   1469             receipt.verification_status,
   1470             RadrootsEventVerificationStatus::Verified
   1471         );
   1472         assert_eq!(
   1473             receipt.head_decision,
   1474             RadrootsEventHeadStoreDecision::NotHeadSelected
   1475         );
   1476         assert!(receipt.projection_eligible);
   1477         assert!(stored.projection_eligible);
   1478     }
   1479 
   1480     #[tokio::test]
   1481     async fn events_by_tag_validates_inputs_and_returns_projection_events_in_sequence_order() {
   1482         let store = RadrootsEventStore::open_memory().await.expect("store");
   1483 
   1484         assert!(matches!(
   1485             store.events_by_tag("", "soil", 1).await,
   1486             Err(RadrootsEventStoreError::EmptyTagName)
   1487         ));
   1488         assert!(matches!(
   1489             store.events_by_tag("t", "soil", 0).await,
   1490             Err(RadrootsEventStoreError::QueryLimitOutOfRange { .. })
   1491         ));
   1492         assert!(matches!(
   1493             store
   1494                 .events_by_tag("t", "soil", RADROOTS_EVENT_STORE_QUERY_LIMIT_MAX + 1)
   1495                 .await,
   1496             Err(RadrootsEventStoreError::QueryLimitOutOfRange { .. })
   1497         ));
   1498 
   1499         let unsupported = signed_event(
   1500             999,
   1501             40,
   1502             vec![vec!["t".to_owned(), "soil".to_owned()]],
   1503             "unsupported",
   1504         );
   1505         let high_created_at = signed_event(
   1506             KIND_POST,
   1507             60,
   1508             vec![
   1509                 vec!["t".to_owned(), "soil".to_owned()],
   1510                 vec!["t".to_owned(), "soil".to_owned()],
   1511             ],
   1512             "high-created-at",
   1513         );
   1514         let low_created_at = signed_event(
   1515             KIND_POST,
   1516             50,
   1517             vec![vec!["t".to_owned(), "soil".to_owned()]],
   1518             "low-created-at",
   1519         );
   1520 
   1521         store
   1522             .ingest_event(RadrootsEventIngest::new(unsupported.clone(), 3_300))
   1523             .await
   1524             .expect("unsupported ingest");
   1525         store
   1526             .ingest_event(RadrootsEventIngest::new(high_created_at.clone(), 3_400))
   1527             .await
   1528             .expect("high ingest");
   1529         store
   1530             .ingest_event(RadrootsEventIngest::new(low_created_at.clone(), 3_500))
   1531             .await
   1532             .expect("low ingest");
   1533 
   1534         let events = store
   1535             .events_by_tag("t", "soil", 10)
   1536             .await
   1537             .expect("tag query");
   1538         assert_eq!(events.len(), 2);
   1539         assert_eq!(events[0].event_id, high_created_at.id);
   1540         assert_eq!(events[1].event_id, low_created_at.id);
   1541         assert!(events.iter().all(|event| event.projection_eligible));
   1542 
   1543         let limited = store
   1544             .events_by_tag("t", "soil", 1)
   1545             .await
   1546             .expect("limited tag query");
   1547         assert_eq!(limited.len(), 1);
   1548         assert_eq!(limited[0].event_id, high_created_at.id);
   1549     }
   1550 
   1551     #[tokio::test]
   1552     async fn events_by_contract_and_tag_enforces_contract_tag_and_projection_filters() {
   1553         let store = RadrootsEventStore::open_memory().await.expect("store");
   1554 
   1555         assert!(matches!(
   1556             store
   1557                 .events_by_contract_and_tag::<&str>(&[], "p", FIXTURE_ALICE_PUBLIC_KEY_HEX, 1)
   1558                 .await,
   1559             Err(RadrootsEventStoreError::EmptyContractList)
   1560         ));
   1561         let too_many_contracts =
   1562             vec!["radroots.order.request.v1"; RADROOTS_EVENT_STORE_CONTRACT_QUERY_LIMIT_MAX + 1];
   1563         assert!(matches!(
   1564             store
   1565                 .events_by_contract_and_tag(
   1566                     too_many_contracts.as_slice(),
   1567                     "p",
   1568                     FIXTURE_ALICE_PUBLIC_KEY_HEX,
   1569                     1,
   1570                 )
   1571                 .await,
   1572             Err(RadrootsEventStoreError::ContractListTooLarge { .. })
   1573         ));
   1574 
   1575         let matching_order = signed_event(
   1576             KIND_ORDER_REQUEST,
   1577             70,
   1578             vec![
   1579                 vec!["d".to_owned(), "order-1".to_owned()],
   1580                 vec!["p".to_owned(), FIXTURE_ALICE_PUBLIC_KEY_HEX.to_owned()],
   1581             ],
   1582             "{}",
   1583         );
   1584         let wrong_tag_order = signed_event(
   1585             KIND_ORDER_REQUEST,
   1586             71,
   1587             vec![
   1588                 vec!["d".to_owned(), "order-2".to_owned()],
   1589                 vec!["p".to_owned(), event_id('b')],
   1590             ],
   1591             "{}",
   1592         );
   1593         let same_tag_wrong_contract = signed_event(
   1594             KIND_POST,
   1595             72,
   1596             vec![vec![
   1597                 "p".to_owned(),
   1598                 FIXTURE_ALICE_PUBLIC_KEY_HEX.to_owned(),
   1599             ]],
   1600             "hello",
   1601         );
   1602         let unsupported_same_tag = signed_event(
   1603             999,
   1604             73,
   1605             vec![vec![
   1606                 "p".to_owned(),
   1607                 FIXTURE_ALICE_PUBLIC_KEY_HEX.to_owned(),
   1608             ]],
   1609             "unsupported",
   1610         );
   1611 
   1612         for (event, observed_at_ms) in [
   1613             (matching_order.clone(), 3_600),
   1614             (wrong_tag_order, 3_700),
   1615             (same_tag_wrong_contract, 3_800),
   1616             (unsupported_same_tag, 3_900),
   1617         ] {
   1618             store
   1619                 .ingest_event(RadrootsEventIngest::new(event, observed_at_ms))
   1620                 .await
   1621                 .expect("ingest");
   1622         }
   1623 
   1624         let events = store
   1625             .events_by_contract_and_tag(
   1626                 &["radroots.order.request.v1"],
   1627                 "p",
   1628                 FIXTURE_ALICE_PUBLIC_KEY_HEX,
   1629                 10,
   1630             )
   1631             .await
   1632             .expect("contract tag query");
   1633         assert_eq!(events.len(), 1);
   1634         assert_eq!(events[0].event_id, matching_order.id);
   1635         assert_eq!(
   1636             events[0].contract_id.as_deref(),
   1637             Some("radroots.order.request.v1")
   1638         );
   1639         assert!(events[0].projection_eligible);
   1640     }
   1641 
   1642     #[tokio::test]
   1643     async fn tag_rows_preserve_order_and_contract_metadata() {
   1644         let store = RadrootsEventStore::open_memory().await.expect("open");
   1645         let event = signed_event(
   1646             KIND_PROFILE,
   1647             14,
   1648             vec![
   1649                 vec!["p".to_owned(), FIXTURE_ALICE_PUBLIC_KEY_HEX.to_owned()],
   1650                 vec!["t".to_owned(), "harvest".to_owned()],
   1651             ],
   1652             "{}",
   1653         );
   1654 
   1655         store
   1656             .ingest_event(RadrootsEventIngest::new(event.clone(), 3_000))
   1657             .await
   1658             .expect("ingest");
   1659         let tags = store.tags_for_event(event.id.as_str()).await.expect("tags");
   1660 
   1661         assert_eq!(tags[0].tag_index, 0);
   1662         assert_eq!(tags[0].tag_name, "p");
   1663         assert_eq!(tags[0].contract_value_type.as_deref(), Some("public_key"));
   1664         assert!(tags[0].relay_indexed);
   1665         assert_eq!(tags[1].tag_index, 1);
   1666         assert_eq!(tags[1].tag_json, "[\"t\",\"harvest\"]");
   1667     }
   1668 
   1669     #[tokio::test]
   1670     async fn listing_event_tag_persists_event_pointer_contract_metadata() {
   1671         let store = RadrootsEventStore::open_memory().await.expect("open");
   1672         let listing_event_id = event_id('f');
   1673         let event = signed_event(
   1674             KIND_ORDER_REQUEST,
   1675             16,
   1676             vec![
   1677                 vec!["d".to_owned(), "order-1".to_owned()],
   1678                 vec!["p".to_owned(), FIXTURE_ALICE_PUBLIC_KEY_HEX.to_owned()],
   1679                 vec![
   1680                     "a".to_owned(),
   1681                     format!(
   1682                         "{KIND_LISTING}:{}:AAAAAAAAAAAAAAAAAAAAAg",
   1683                         FIXTURE_ALICE_PUBLIC_KEY_HEX
   1684                     ),
   1685                 ],
   1686                 vec![
   1687                     "listing_event".to_owned(),
   1688                     listing_event_id.clone(),
   1689                     "wss://relay.example.com".to_owned(),
   1690                 ],
   1691             ],
   1692             "{}",
   1693         );
   1694 
   1695         store
   1696             .ingest_event(RadrootsEventIngest::new(event.clone(), 3_100))
   1697             .await
   1698             .expect("ingest");
   1699         let tags = store.tags_for_event(event.id.as_str()).await.expect("tags");
   1700         let listing_tag = tags
   1701             .iter()
   1702             .find(|tag| tag.tag_name == "listing_event")
   1703             .expect("listing event tag");
   1704 
   1705         assert_eq!(
   1706             listing_tag.tag_value.as_deref(),
   1707             Some(listing_event_id.as_str())
   1708         );
   1709         assert_eq!(
   1710             listing_tag.contract_semantic.as_deref(),
   1711             Some("listing_snapshot")
   1712         );
   1713         assert_eq!(
   1714             listing_tag.contract_value_type.as_deref(),
   1715             Some("event_pointer")
   1716         );
   1717         assert!(!listing_tag.relay_indexed);
   1718     }
   1719 
   1720     #[tokio::test]
   1721     async fn relay_observations_upsert_separately_from_event_identity() {
   1722         let store = RadrootsEventStore::open_memory().await.expect("open");
   1723         let event = signed_event(KIND_POST, 15, Vec::new(), "hello");
   1724         let observation = RadrootsRelayObservation::new(
   1725             "wss://relay.local",
   1726             crate::RadrootsRelayObservationType::Subscription,
   1727             4_000,
   1728         );
   1729         let ingest = RadrootsEventIngest::new(event.clone(), 4_000).with_observation(observation);
   1730         store.ingest_event(ingest).await.expect("first");
   1731         let observation = RadrootsRelayObservation::new(
   1732             "wss://relay.local",
   1733             crate::RadrootsRelayObservationType::Subscription,
   1734             4_100,
   1735         )
   1736         .with_message("duplicate accepted");
   1737         let ingest = RadrootsEventIngest::new(event.clone(), 4_100).with_observation(observation);
   1738         store.ingest_event(ingest).await.expect("second");
   1739 
   1740         let observations = store
   1741             .observations_for_event(event.id.as_str())
   1742             .await
   1743             .expect("observations");
   1744         assert_eq!(observations.len(), 1);
   1745         assert_eq!(observations[0].observation_count, 2);
   1746         assert_eq!(observations[0].last_seen_at_ms, 4_100);
   1747         assert_eq!(
   1748             observations[0].last_message.as_deref(),
   1749             Some("duplicate accepted")
   1750         );
   1751     }
   1752 
   1753     #[tokio::test]
   1754     async fn event_heads_use_protocol_tie_breaks() {
   1755         let mut events = [
   1756             signed_event(KIND_PROFILE, 20, Vec::new(), "{\"name\":\"a\"}"),
   1757             signed_event(KIND_PROFILE, 20, Vec::new(), "{\"name\":\"b\"}"),
   1758         ];
   1759         events.sort_by(|left, right| left.id.cmp(&right.id));
   1760         let lower = events[0].clone();
   1761         let higher = events[1].clone();
   1762 
   1763         let store = RadrootsEventStore::open_memory().await.expect("open");
   1764         let first = store
   1765             .ingest_event(RadrootsEventIngest::new(higher.clone(), 5_000))
   1766             .await
   1767             .expect("first");
   1768         let second = store
   1769             .ingest_event(RadrootsEventIngest::new(lower.clone(), 5_100))
   1770             .await
   1771             .expect("second");
   1772         let head = store
   1773             .event_head(&profile_coordinate())
   1774             .await
   1775             .expect("head")
   1776             .expect("stored head");
   1777 
   1778         assert_eq!(first.head_decision, RadrootsEventHeadStoreDecision::Applied);
   1779         assert_eq!(
   1780             second.head_decision,
   1781             RadrootsEventHeadStoreDecision::Applied
   1782         );
   1783         assert_eq!(head.event_id, lower.id);
   1784 
   1785         let store = RadrootsEventStore::open_memory().await.expect("open");
   1786         store
   1787             .ingest_event(RadrootsEventIngest::new(lower.clone(), 5_200))
   1788             .await
   1789             .expect("first");
   1790         let second = store
   1791             .ingest_event(RadrootsEventIngest::new(higher, 5_300))
   1792             .await
   1793             .expect("second");
   1794         let head = store
   1795             .event_head(&profile_coordinate())
   1796             .await
   1797             .expect("head")
   1798             .expect("stored head");
   1799 
   1800         assert_eq!(
   1801             second.head_decision,
   1802             RadrootsEventHeadStoreDecision::SkippedSameTimestampHigherEventId
   1803         );
   1804         assert_eq!(head.event_id, lower.id);
   1805     }
   1806 
   1807     #[tokio::test]
   1808     async fn projection_cursors_replay_by_store_sequence() {
   1809         let store = RadrootsEventStore::open_memory().await.expect("open");
   1810         let first = signed_event(KIND_POST, 30, Vec::new(), "one");
   1811         let second = signed_event(KIND_POST, 30, Vec::new(), "two");
   1812         let first_receipt = store
   1813             .ingest_event(RadrootsEventIngest::new(first.clone(), 6_000))
   1814             .await
   1815             .expect("first");
   1816         let second_receipt = store
   1817             .ingest_event(RadrootsEventIngest::new(second.clone(), 6_100))
   1818             .await
   1819             .expect("second");
   1820         assert!(first_receipt.seq < second_receipt.seq);
   1821 
   1822         let replay = store
   1823             .events_since_cursor("social", 10)
   1824             .await
   1825             .expect("initial replay");
   1826         assert_eq!(replay.len(), 2);
   1827         assert_eq!(replay[0].event_id, first.id);
   1828         assert_eq!(replay[1].event_id, second.id);
   1829         store
   1830             .update_projection_cursor(&RadrootsProjectionCursor {
   1831                 projection_id: "social".to_owned(),
   1832                 projection_version: 1,
   1833                 last_event_seq: first_receipt.seq,
   1834                 updated_at_ms: 6_200,
   1835             })
   1836             .await
   1837             .expect("cursor");
   1838         let replay = store
   1839             .events_since_cursor("social", 10)
   1840             .await
   1841             .expect("next replay");
   1842         assert_eq!(replay.len(), 1);
   1843         assert_eq!(replay[0].event_id, second.id);
   1844     }
   1845 
   1846     #[tokio::test]
   1847     async fn smoke_event_store_ingests_and_replays_ten_thousand_events() {
   1848         let store = RadrootsEventStore::open_memory().await.expect("open");
   1849         for index in 0..10_000u32 {
   1850             let event = signed_event(
   1851                 KIND_POST,
   1852                 10_000 + index,
   1853                 vec![vec!["t".to_owned(), "smoke".to_owned()]],
   1854                 format!("smoke-{index}").as_str(),
   1855             );
   1856             let receipt = store
   1857                 .ingest_event(RadrootsEventIngest::new(event, 10_000 + i64::from(index)))
   1858                 .await
   1859                 .expect("ingest");
   1860             assert!(receipt.inserted);
   1861             assert_eq!(
   1862                 receipt.verification_status,
   1863                 RadrootsEventVerificationStatus::Verified
   1864             );
   1865         }
   1866 
   1867         let replay = store
   1868             .events_since_cursor("smoke", 10_000)
   1869             .await
   1870             .expect("replay");
   1871         assert_eq!(replay.len(), 10_000);
   1872         assert_eq!(replay[0].seq, 1);
   1873         assert_eq!(replay[9_999].seq, 10_000);
   1874 
   1875         store
   1876             .update_projection_cursor(&RadrootsProjectionCursor {
   1877                 projection_id: "smoke".to_owned(),
   1878                 projection_version: 1,
   1879                 last_event_seq: replay[4_999].seq,
   1880                 updated_at_ms: 25_000,
   1881             })
   1882             .await
   1883             .expect("cursor");
   1884         let replay = store
   1885             .events_since_cursor("smoke", 10_000)
   1886             .await
   1887             .expect("replay after cursor");
   1888         assert_eq!(replay.len(), 5_000);
   1889         assert_eq!(replay[0].seq, 5_001);
   1890         assert_eq!(replay[4_999].seq, 10_000);
   1891     }
   1892 }