store.rs (70136B)
1 use crate::RadrootsEventStoreError; 2 use crate::migrations::{EVENT_STORE_MIGRATION_DOWN, EVENT_STORE_MIGRATION_UP}; 3 use crate::model::{ 4 RadrootsEventContractStatus, RadrootsEventHeadStoreDecision, RadrootsEventIngest, 5 RadrootsEventIngestReceipt, RadrootsEventStoreStatusSummary, RadrootsEventVerificationStatus, 6 RadrootsProjectionCursor, RadrootsRelayObservation, RadrootsStoredEvent, 7 RadrootsStoredEventHead, RadrootsStoredEventTag, StoredEventClass, tag_semantic_name, 8 tag_value_type_name, 9 }; 10 use radroots_events::RadrootsNostrEvent; 11 use radroots_events::contract::{ 12 RadrootsEventClass, RadrootsEventContract, identify_event_contract, 13 }; 14 use radroots_events::event_head::{ 15 RadrootsCurrentEventHead, RadrootsEventHeadCandidate, RadrootsEventHeadCandidateResult, 16 RadrootsEventHeadCoordinate, RadrootsEventHeadDecision, event_head_candidate_for_contract, 17 select_event_head, 18 }; 19 use radroots_events::ids::{RadrootsEventId, RadrootsEventSignature, RadrootsPublicKey}; 20 use radroots_nostr::prelude::{RadrootsNostrEventVerification, radroots_nostr_verify_event}; 21 use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; 22 use sqlx::{Row, SqlitePool}; 23 use std::path::Path; 24 use std::str::FromStr; 25 26 pub const RADROOTS_EVENT_STORE_QUERY_LIMIT_MAX: u32 = 1_000; 27 pub const RADROOTS_EVENT_STORE_CONTRACT_QUERY_LIMIT_MAX: usize = 16; 28 29 #[derive(Clone)] 30 pub struct RadrootsEventStore { 31 pool: SqlitePool, 32 } 33 34 impl RadrootsEventStore { 35 pub async fn open_memory() -> Result<Self, RadrootsEventStoreError> { 36 let options = SqliteConnectOptions::from_str("sqlite::memory:")?; 37 let pool = SqlitePoolOptions::new() 38 .max_connections(1) 39 .connect_with(options) 40 .await?; 41 configure_connection(&pool, false).await?; 42 apply_up(&pool).await?; 43 Ok(Self { pool }) 44 } 45 46 pub async fn open_file(path: impl AsRef<Path>) -> Result<Self, RadrootsEventStoreError> { 47 let options = SqliteConnectOptions::new() 48 .filename(path) 49 .create_if_missing(true); 50 let pool = SqlitePoolOptions::new() 51 .max_connections(1) 52 .connect_with(options) 53 .await?; 54 configure_connection(&pool, true).await?; 55 apply_up(&pool).await?; 56 Ok(Self { pool }) 57 } 58 59 pub fn pool(&self) -> &SqlitePool { 60 &self.pool 61 } 62 63 pub async fn migrate_down(&self) -> Result<(), RadrootsEventStoreError> { 64 apply_down(&self.pool).await 65 } 66 67 pub async fn pragma_foreign_keys(&self) -> Result<i64, RadrootsEventStoreError> { 68 query_i64(&self.pool, "PRAGMA foreign_keys").await 69 } 70 71 pub async fn pragma_busy_timeout(&self) -> Result<i64, RadrootsEventStoreError> { 72 query_i64(&self.pool, "PRAGMA busy_timeout").await 73 } 74 75 pub async fn pragma_journal_mode(&self) -> Result<String, RadrootsEventStoreError> { 76 query_string(&self.pool, "PRAGMA journal_mode").await 77 } 78 79 pub async fn status_summary( 80 &self, 81 ) -> Result<RadrootsEventStoreStatusSummary, RadrootsEventStoreError> { 82 let row = sqlx::query( 83 "SELECT COUNT(*) AS total_events, COALESCE(SUM(CASE WHEN projection_eligible = 1 THEN 1 ELSE 0 END), 0) AS projection_eligible_events, MAX(seq) AS last_event_seq, MAX(updated_at_ms) AS last_event_updated_at_ms FROM nostr_event", 84 ) 85 .fetch_one(&self.pool) 86 .await?; 87 let relay_observations = 88 query_i64(&self.pool, "SELECT COUNT(*) FROM relay_event_seen").await?; 89 Ok(RadrootsEventStoreStatusSummary { 90 total_events: row.try_get("total_events")?, 91 projection_eligible_events: row.try_get("projection_eligible_events")?, 92 relay_observations, 93 last_event_seq: row.try_get("last_event_seq")?, 94 last_event_updated_at_ms: row.try_get("last_event_updated_at_ms")?, 95 }) 96 } 97 98 pub async fn ingest_event( 99 &self, 100 ingest: RadrootsEventIngest, 101 ) -> Result<RadrootsEventIngestReceipt, RadrootsEventStoreError> { 102 validate_event_identity(&ingest.event)?; 103 let verification_status = verify_event(&ingest.event); 104 let classification = classify_event(&ingest.event); 105 let raw_json = ingest 106 .raw_json 107 .clone() 108 .map(Ok) 109 .unwrap_or_else(|| serde_json::to_string(&ingest.event))?; 110 let tags_json = serde_json::to_string(&ingest.event.tags)?; 111 let mut tx = self.pool.begin().await?; 112 let insert = insert_raw_event( 113 &mut tx, 114 &ingest, 115 &classification, 116 verification_status, 117 raw_json.as_str(), 118 tags_json.as_str(), 119 ) 120 .await?; 121 let inserted = insert.inserted; 122 let mut head_decision = RadrootsEventHeadStoreDecision::Unsupported; 123 let mut projection_eligible = classification.base_projection_eligible(verification_status); 124 125 if inserted { 126 insert_tags(&mut tx, &ingest.event, classification.contract).await?; 127 if let Some(contract) = classification.contract { 128 if projection_eligible { 129 let head = 130 apply_event_head(&mut tx, &ingest.event, contract, ingest.observed_at_ms) 131 .await?; 132 projection_eligible = head.projection_eligible; 133 head_decision = head.decision; 134 sqlx::query( 135 "UPDATE nostr_event SET projection_eligible = ?, updated_at_ms = ? WHERE event_id = ?", 136 ) 137 .bind(bool_i64(projection_eligible)) 138 .bind(ingest.observed_at_ms) 139 .bind(ingest.event.id.as_str()) 140 .execute(&mut *tx) 141 .await?; 142 } else { 143 head_decision = RadrootsEventHeadStoreDecision::NotProjectionEligible; 144 } 145 } 146 } else if classification.contract.is_some() { 147 head_decision = RadrootsEventHeadStoreDecision::SkippedDuplicate; 148 projection_eligible = false; 149 } 150 151 if let Some(observation) = ingest.relay_observation.as_ref() { 152 upsert_observation(&mut tx, ingest.event.id.as_str(), observation).await?; 153 } 154 155 tx.commit().await?; 156 157 Ok(RadrootsEventIngestReceipt { 158 seq: insert.seq, 159 event_id: ingest.event.id, 160 inserted, 161 verification_status, 162 contract_status: classification.contract_status, 163 contract_id: classification 164 .contract 165 .map(|contract| contract.id.to_owned()), 166 projection_eligible, 167 head_decision, 168 }) 169 } 170 171 pub async fn get_event( 172 &self, 173 event_id: &str, 174 ) -> Result<Option<RadrootsStoredEvent>, RadrootsEventStoreError> { 175 let row = sqlx::query( 176 "SELECT seq, event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event WHERE event_id = ?", 177 ) 178 .bind(event_id) 179 .fetch_optional(&self.pool) 180 .await?; 181 row.map(stored_event_from_row).transpose() 182 } 183 184 pub async fn tags_for_event( 185 &self, 186 event_id: &str, 187 ) -> Result<Vec<RadrootsStoredEventTag>, RadrootsEventStoreError> { 188 let rows = sqlx::query( 189 "SELECT event_id, tag_index, tag_name, tag_value, tag_json, contract_semantic, contract_value_type, relay_indexed FROM nostr_event_tag WHERE event_id = ? ORDER BY tag_index", 190 ) 191 .bind(event_id) 192 .fetch_all(&self.pool) 193 .await?; 194 rows.into_iter().map(stored_tag_from_row).collect() 195 } 196 197 pub async fn observations_for_event( 198 &self, 199 event_id: &str, 200 ) -> Result<Vec<RadrootsRelayObservationRow>, RadrootsEventStoreError> { 201 let rows = sqlx::query( 202 "SELECT event_id, relay_url, observation_type, first_seen_at_ms, last_seen_at_ms, observation_count, last_message FROM relay_event_seen WHERE event_id = ? ORDER BY relay_url, observation_type", 203 ) 204 .bind(event_id) 205 .fetch_all(&self.pool) 206 .await?; 207 rows.into_iter().map(relay_observation_from_row).collect() 208 } 209 210 pub async fn event_head( 211 &self, 212 coordinate: &RadrootsEventHeadCoordinate, 213 ) -> Result<Option<RadrootsStoredEventHead>, RadrootsEventStoreError> { 214 let row = match coordinate { 215 RadrootsEventHeadCoordinate::Replaceable { kind, pubkey } => { 216 sqlx::query( 217 "SELECT coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms FROM nostr_event_head WHERE coordinate_type = 'replaceable' AND kind = ? AND pubkey = ? AND d_tag IS NULL", 218 ) 219 .bind(i64::from(*kind)) 220 .bind(pubkey.as_str()) 221 .fetch_optional(&self.pool) 222 .await? 223 } 224 RadrootsEventHeadCoordinate::Addressable { 225 kind, 226 pubkey, 227 d_tag, 228 } => { 229 sqlx::query( 230 "SELECT coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms FROM nostr_event_head WHERE coordinate_type = 'addressable' AND kind = ? AND pubkey = ? AND d_tag = ?", 231 ) 232 .bind(i64::from(*kind)) 233 .bind(pubkey.as_str()) 234 .bind(d_tag.as_str()) 235 .fetch_optional(&self.pool) 236 .await? 237 } 238 }; 239 row.map(stored_head_from_row).transpose() 240 } 241 242 pub async fn get_projection_cursor( 243 &self, 244 projection_id: &str, 245 ) -> Result<Option<RadrootsProjectionCursor>, RadrootsEventStoreError> { 246 let row = sqlx::query( 247 "SELECT projection_id, projection_version, last_event_seq, updated_at_ms FROM projection_cursor WHERE projection_id = ?", 248 ) 249 .bind(projection_id) 250 .fetch_optional(&self.pool) 251 .await?; 252 row.map(projection_cursor_from_row).transpose() 253 } 254 255 pub async fn update_projection_cursor( 256 &self, 257 cursor: &RadrootsProjectionCursor, 258 ) -> Result<(), RadrootsEventStoreError> { 259 sqlx::query( 260 "INSERT INTO projection_cursor(projection_id, projection_version, last_event_seq, updated_at_ms) VALUES (?, ?, ?, ?) ON CONFLICT(projection_id) DO UPDATE SET projection_version = excluded.projection_version, last_event_seq = excluded.last_event_seq, updated_at_ms = excluded.updated_at_ms", 261 ) 262 .bind(cursor.projection_id.as_str()) 263 .bind(i64::from(cursor.projection_version)) 264 .bind(cursor.last_event_seq) 265 .bind(cursor.updated_at_ms) 266 .execute(&self.pool) 267 .await?; 268 Ok(()) 269 } 270 271 pub async fn events_since_cursor( 272 &self, 273 projection_id: &str, 274 limit: u32, 275 ) -> Result<Vec<RadrootsStoredEvent>, RadrootsEventStoreError> { 276 let cursor = self.get_projection_cursor(projection_id).await?; 277 let last_event_seq = cursor 278 .as_ref() 279 .map(|cursor| cursor.last_event_seq) 280 .unwrap_or(0); 281 let rows = sqlx::query( 282 "SELECT seq, event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event WHERE projection_eligible = 1 AND seq > ? ORDER BY seq ASC LIMIT ?", 283 ) 284 .bind(last_event_seq) 285 .bind(i64::from(limit)) 286 .fetch_all(&self.pool) 287 .await?; 288 rows.into_iter().map(stored_event_from_row).collect() 289 } 290 291 pub async fn events_by_tag( 292 &self, 293 tag_name: &str, 294 tag_value: &str, 295 limit: u32, 296 ) -> Result<Vec<RadrootsStoredEvent>, RadrootsEventStoreError> { 297 validate_tag_query(tag_name, limit)?; 298 let rows = sqlx::query( 299 "SELECT seq, event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event AS event WHERE projection_eligible = 1 AND EXISTS (SELECT 1 FROM nostr_event_tag AS tag WHERE tag.event_id = event.event_id AND tag.tag_name = ? AND tag.tag_value = ?) ORDER BY event.seq ASC LIMIT ?", 300 ) 301 .bind(tag_name) 302 .bind(tag_value) 303 .bind(i64::from(limit)) 304 .fetch_all(&self.pool) 305 .await?; 306 rows.into_iter().map(stored_event_from_row).collect() 307 } 308 309 pub async fn events_by_contract_and_tag<S>( 310 &self, 311 contract_ids: &[S], 312 tag_name: &str, 313 tag_value: &str, 314 limit: u32, 315 ) -> Result<Vec<RadrootsStoredEvent>, RadrootsEventStoreError> 316 where 317 S: AsRef<str>, 318 { 319 validate_contract_tag_query(contract_ids, tag_name, limit)?; 320 let placeholders = core::iter::repeat_n("?", contract_ids.len()) 321 .collect::<Vec<_>>() 322 .join(", "); 323 let sql = format!( 324 "SELECT seq, event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms FROM nostr_event AS event WHERE projection_eligible = 1 AND contract_id IN ({placeholders}) AND EXISTS (SELECT 1 FROM nostr_event_tag AS tag WHERE tag.event_id = event.event_id AND tag.tag_name = ? AND tag.tag_value = ?) ORDER BY event.seq ASC LIMIT ?" 325 ); 326 let mut query = sqlx::query(sql.as_str()); 327 for contract_id in contract_ids { 328 query = query.bind(contract_id.as_ref()); 329 } 330 let rows = query 331 .bind(tag_name) 332 .bind(tag_value) 333 .bind(i64::from(limit)) 334 .fetch_all(&self.pool) 335 .await?; 336 rows.into_iter().map(stored_event_from_row).collect() 337 } 338 } 339 340 #[derive(Clone, Debug, PartialEq, Eq)] 341 pub struct RadrootsRelayObservationRow { 342 pub event_id: String, 343 pub relay_url: String, 344 pub observation_type: String, 345 pub first_seen_at_ms: i64, 346 pub last_seen_at_ms: i64, 347 pub observation_count: i64, 348 pub last_message: Option<String>, 349 } 350 351 struct EventClassification { 352 contract_status: RadrootsEventContractStatus, 353 contract: Option<&'static RadrootsEventContract>, 354 } 355 356 impl EventClassification { 357 fn base_projection_eligible(&self, verification: RadrootsEventVerificationStatus) -> bool { 358 verification == RadrootsEventVerificationStatus::Verified 359 && self 360 .contract 361 .map(|contract| contract.class != RadrootsEventClass::Ephemeral) 362 .unwrap_or(false) 363 } 364 } 365 366 struct AppliedHead { 367 decision: RadrootsEventHeadStoreDecision, 368 projection_eligible: bool, 369 } 370 371 struct InsertRawEventResult { 372 inserted: bool, 373 seq: i64, 374 } 375 376 async fn configure_connection( 377 pool: &SqlitePool, 378 file_backed: bool, 379 ) -> Result<(), RadrootsEventStoreError> { 380 sqlx::query("PRAGMA foreign_keys = ON") 381 .execute(pool) 382 .await?; 383 sqlx::query("PRAGMA busy_timeout = 5000") 384 .execute(pool) 385 .await?; 386 if file_backed { 387 sqlx::query("PRAGMA journal_mode = WAL") 388 .execute(pool) 389 .await?; 390 } 391 Ok(()) 392 } 393 394 #[cfg_attr(coverage_nightly, coverage(off))] 395 async fn apply_up(pool: &SqlitePool) -> Result<(), RadrootsEventStoreError> { 396 sqlx::raw_sql(EVENT_STORE_MIGRATION_UP) 397 .execute(pool) 398 .await?; 399 Ok(()) 400 } 401 402 #[cfg_attr(coverage_nightly, coverage(off))] 403 async fn apply_down(pool: &SqlitePool) -> Result<(), RadrootsEventStoreError> { 404 sqlx::raw_sql(EVENT_STORE_MIGRATION_DOWN) 405 .execute(pool) 406 .await?; 407 Ok(()) 408 } 409 410 #[cfg_attr(coverage_nightly, coverage(off))] 411 async fn query_i64(pool: &SqlitePool, sql: &str) -> Result<i64, RadrootsEventStoreError> { 412 let row = sqlx::query(sql).fetch_one(pool).await?; 413 Ok(row.try_get(0)?) 414 } 415 416 #[cfg_attr(coverage_nightly, coverage(off))] 417 async fn query_string(pool: &SqlitePool, sql: &str) -> Result<String, RadrootsEventStoreError> { 418 let row = sqlx::query(sql).fetch_one(pool).await?; 419 Ok(row.try_get(0)?) 420 } 421 422 fn validate_event_identity(event: &RadrootsNostrEvent) -> Result<(), RadrootsEventStoreError> { 423 RadrootsEventId::parse(event.id.as_str())?; 424 RadrootsPublicKey::parse(event.author.as_str())?; 425 RadrootsEventSignature::parse(event.sig.as_str())?; 426 Ok(()) 427 } 428 429 fn classify_event(event: &RadrootsNostrEvent) -> EventClassification { 430 match identify_event_contract(event.kind, &event.tags, &event.content) { 431 Ok(contract) => EventClassification { 432 contract_status: RadrootsEventContractStatus::Supported, 433 contract: Some(contract), 434 }, 435 Err(error) => EventClassification { 436 contract_status: RadrootsEventContractStatus::from_match_error(error), 437 contract: None, 438 }, 439 } 440 } 441 442 fn verify_event(event: &RadrootsNostrEvent) -> RadrootsEventVerificationStatus { 443 verification_status_from_nostr(radroots_nostr_verify_event(event)) 444 } 445 446 fn verification_status_from_nostr( 447 verification: RadrootsNostrEventVerification, 448 ) -> RadrootsEventVerificationStatus { 449 match verification { 450 RadrootsNostrEventVerification::Verified => RadrootsEventVerificationStatus::Verified, 451 RadrootsNostrEventVerification::IdVerified => RadrootsEventVerificationStatus::IdVerified, 452 RadrootsNostrEventVerification::IdMismatch => RadrootsEventVerificationStatus::IdMismatch, 453 RadrootsNostrEventVerification::SignatureInvalid => { 454 RadrootsEventVerificationStatus::SignatureInvalid 455 } 456 RadrootsNostrEventVerification::MalformedEnvelope => { 457 RadrootsEventVerificationStatus::MalformedEnvelope 458 } 459 } 460 } 461 462 async fn insert_raw_event( 463 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, 464 ingest: &RadrootsEventIngest, 465 classification: &EventClassification, 466 verification_status: RadrootsEventVerificationStatus, 467 raw_json: &str, 468 tags_json: &str, 469 ) -> Result<InsertRawEventResult, RadrootsEventStoreError> { 470 let event = &ingest.event; 471 let contract_id = classification.contract.map(|contract| contract.id); 472 let event_class = classification 473 .contract 474 .map(|contract| StoredEventClass::from_event_class(contract.class).as_str()); 475 let projection_eligible = classification.base_projection_eligible(verification_status); 476 let result = sqlx::query( 477 "INSERT OR IGNORE INTO nostr_event(event_id, pubkey, created_at, kind, tags_json, content, sig, raw_json, verification_status, contract_status, contract_id, event_class, projection_eligible, inserted_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 478 ) 479 .bind(event.id.as_str()) 480 .bind(event.author.as_str()) 481 .bind(i64::from(event.created_at)) 482 .bind(i64::from(event.kind)) 483 .bind(tags_json) 484 .bind(event.content.as_str()) 485 .bind(event.sig.as_str()) 486 .bind(raw_json) 487 .bind(verification_status.as_str()) 488 .bind(classification.contract_status.as_str()) 489 .bind(contract_id) 490 .bind(event_class) 491 .bind(bool_i64(projection_eligible)) 492 .bind(ingest.observed_at_ms) 493 .bind(ingest.observed_at_ms) 494 .execute(&mut **tx) 495 .await?; 496 let inserted = result.rows_affected() > 0; 497 let seq = event_seq(tx, event.id.as_str()).await?; 498 Ok(InsertRawEventResult { inserted, seq }) 499 } 500 501 #[cfg_attr(coverage_nightly, coverage(off))] 502 async fn event_seq( 503 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, 504 event_id: &str, 505 ) -> Result<i64, RadrootsEventStoreError> { 506 let row = sqlx::query("SELECT seq FROM nostr_event WHERE event_id = ?") 507 .bind(event_id) 508 .fetch_one(&mut **tx) 509 .await?; 510 row.try_get("seq").map_err(Into::into) 511 } 512 513 #[cfg_attr(coverage_nightly, coverage(off))] 514 async fn insert_tags( 515 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, 516 event: &RadrootsNostrEvent, 517 contract: Option<&'static RadrootsEventContract>, 518 ) -> Result<(), RadrootsEventStoreError> { 519 for (index, tag) in event.tags.iter().enumerate() { 520 let tag_name = tag.first().map(String::as_str).unwrap_or(""); 521 let tag_value = tag.get(1).map(String::as_str); 522 let tag_json = serde_json::to_string(tag)?; 523 let tag_contract = contract.and_then(|contract| { 524 contract 525 .tags 526 .iter() 527 .find(|candidate| candidate.name == tag_name) 528 }); 529 let contract_semantic = tag_contract.map(|tag| tag_semantic_name(tag.semantic)); 530 let contract_value_type = tag_contract.map(|tag| tag_value_type_name(tag.value_type)); 531 let relay_indexed = tag_contract.map(|tag| tag.relay_indexed).unwrap_or(false); 532 sqlx::query( 533 "INSERT INTO nostr_event_tag(event_id, tag_index, tag_name, tag_value, tag_json, contract_semantic, contract_value_type, relay_indexed) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", 534 ) 535 .bind(event.id.as_str()) 536 .bind(i64::try_from(index).map_err(|_| RadrootsEventStoreError::IntegerRange { 537 field: "tag_index", 538 value: i64::MAX, 539 })?) 540 .bind(tag_name) 541 .bind(tag_value) 542 .bind(tag_json.as_str()) 543 .bind(contract_semantic) 544 .bind(contract_value_type) 545 .bind(bool_i64(relay_indexed)) 546 .execute(&mut **tx) 547 .await?; 548 } 549 Ok(()) 550 } 551 552 #[cfg_attr(coverage_nightly, coverage(off))] 553 async fn upsert_observation( 554 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, 555 event_id: &str, 556 observation: &RadrootsRelayObservation, 557 ) -> Result<(), RadrootsEventStoreError> { 558 sqlx::query( 559 "INSERT INTO relay_event_seen(event_id, relay_url, observation_type, first_seen_at_ms, last_seen_at_ms, observation_count, last_message) VALUES (?, ?, ?, ?, ?, 1, ?) ON CONFLICT(event_id, relay_url, observation_type) DO UPDATE SET last_seen_at_ms = excluded.last_seen_at_ms, observation_count = relay_event_seen.observation_count + 1, last_message = excluded.last_message", 560 ) 561 .bind(event_id) 562 .bind(observation.relay_url.as_str()) 563 .bind(observation.observation_type.as_str()) 564 .bind(observation.observed_at_ms) 565 .bind(observation.observed_at_ms) 566 .bind(observation.message.as_deref()) 567 .execute(&mut **tx) 568 .await?; 569 Ok(()) 570 } 571 572 async fn apply_event_head( 573 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, 574 event: &RadrootsNostrEvent, 575 contract: &RadrootsEventContract, 576 updated_at_ms: i64, 577 ) -> Result<AppliedHead, RadrootsEventStoreError> { 578 let candidate = match event_head_candidate_for_contract(event, contract) { 579 RadrootsEventHeadCandidateResult::Candidate(candidate) => candidate, 580 RadrootsEventHeadCandidateResult::NotHeadSelected => { 581 return Ok(AppliedHead { 582 decision: RadrootsEventHeadStoreDecision::NotHeadSelected, 583 projection_eligible: true, 584 }); 585 } 586 RadrootsEventHeadCandidateResult::NotPersisted => { 587 return Ok(AppliedHead { 588 decision: RadrootsEventHeadStoreDecision::NotPersisted, 589 projection_eligible: false, 590 }); 591 } 592 RadrootsEventHeadCandidateResult::Malformed(_) => { 593 return Ok(AppliedHead { 594 decision: RadrootsEventHeadStoreDecision::Malformed, 595 projection_eligible: false, 596 }); 597 } 598 }; 599 let current = current_event_head(tx, &candidate.coordinate).await?; 600 let protocol_decision = select_event_head(candidate.clone(), current.as_ref()); 601 if let RadrootsEventHeadDecision::Applied(head) = &protocol_decision { 602 upsert_head(tx, &candidate, head, updated_at_ms).await?; 603 } 604 let projection_eligible = matches!(protocol_decision, RadrootsEventHeadDecision::Applied(_)); 605 Ok(AppliedHead { 606 decision: RadrootsEventHeadStoreDecision::from_protocol(&protocol_decision), 607 projection_eligible, 608 }) 609 } 610 611 async fn current_event_head( 612 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, 613 coordinate: &RadrootsEventHeadCoordinate, 614 ) -> Result<Option<RadrootsCurrentEventHead>, RadrootsEventStoreError> { 615 let row = match coordinate { 616 RadrootsEventHeadCoordinate::Replaceable { kind, pubkey } => { 617 sqlx::query( 618 "SELECT event_id, created_at FROM nostr_event_head WHERE coordinate_type = 'replaceable' AND kind = ? AND pubkey = ? AND d_tag IS NULL", 619 ) 620 .bind(i64::from(*kind)) 621 .bind(pubkey.as_str()) 622 .fetch_optional(&mut **tx) 623 .await? 624 } 625 RadrootsEventHeadCoordinate::Addressable { 626 kind, 627 pubkey, 628 d_tag, 629 } => { 630 sqlx::query( 631 "SELECT event_id, created_at FROM nostr_event_head WHERE coordinate_type = 'addressable' AND kind = ? AND pubkey = ? AND d_tag = ?", 632 ) 633 .bind(i64::from(*kind)) 634 .bind(pubkey.as_str()) 635 .bind(d_tag.as_str()) 636 .fetch_optional(&mut **tx) 637 .await? 638 } 639 }; 640 row.map(|row| { 641 let event_id: String = row.try_get("event_id")?; 642 let created_at: i64 = row.try_get("created_at")?; 643 Ok(RadrootsCurrentEventHead { 644 coordinate: coordinate.clone(), 645 event_id: RadrootsEventId::parse(event_id)?, 646 created_at: u32_from_i64("created_at", created_at)?, 647 }) 648 }) 649 .transpose() 650 } 651 652 #[cfg_attr(coverage_nightly, coverage(off))] 653 async fn upsert_head( 654 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, 655 candidate: &RadrootsEventHeadCandidate, 656 head: &RadrootsCurrentEventHead, 657 updated_at_ms: i64, 658 ) -> Result<(), RadrootsEventStoreError> { 659 match &head.coordinate { 660 RadrootsEventHeadCoordinate::Replaceable { kind, pubkey } => { 661 sqlx::query( 662 "DELETE FROM nostr_event_head WHERE coordinate_type = 'replaceable' AND kind = ? AND pubkey = ? AND d_tag IS NULL", 663 ) 664 .bind(i64::from(*kind)) 665 .bind(pubkey.as_str()) 666 .execute(&mut **tx) 667 .await?; 668 sqlx::query( 669 "INSERT INTO nostr_event_head(coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms) VALUES ('replaceable', ?, ?, NULL, ?, ?, ?)", 670 ) 671 .bind(i64::from(*kind)) 672 .bind(pubkey.as_str()) 673 .bind(candidate.event_id.as_str()) 674 .bind(i64::from(candidate.created_at)) 675 .bind(updated_at_ms) 676 .execute(&mut **tx) 677 .await?; 678 } 679 RadrootsEventHeadCoordinate::Addressable { 680 kind, 681 pubkey, 682 d_tag, 683 } => { 684 sqlx::query( 685 "DELETE FROM nostr_event_head WHERE coordinate_type = 'addressable' AND kind = ? AND pubkey = ? AND d_tag = ?", 686 ) 687 .bind(i64::from(*kind)) 688 .bind(pubkey.as_str()) 689 .bind(d_tag.as_str()) 690 .execute(&mut **tx) 691 .await?; 692 sqlx::query( 693 "INSERT INTO nostr_event_head(coordinate_type, kind, pubkey, d_tag, event_id, created_at, updated_at_ms) VALUES ('addressable', ?, ?, ?, ?, ?, ?)", 694 ) 695 .bind(i64::from(*kind)) 696 .bind(pubkey.as_str()) 697 .bind(d_tag.as_str()) 698 .bind(candidate.event_id.as_str()) 699 .bind(i64::from(candidate.created_at)) 700 .bind(updated_at_ms) 701 .execute(&mut **tx) 702 .await?; 703 } 704 } 705 Ok(()) 706 } 707 708 #[cfg_attr(coverage_nightly, coverage(off))] 709 fn stored_event_from_row( 710 row: sqlx::sqlite::SqliteRow, 711 ) -> Result<RadrootsStoredEvent, RadrootsEventStoreError> { 712 let kind = u32_from_i64("kind", row.try_get("kind")?)?; 713 let created_at = u32_from_i64("created_at", row.try_get("created_at")?)?; 714 let verification_status = 715 RadrootsEventVerificationStatus::parse(row.try_get("verification_status")?)?; 716 let contract_status = 717 RadrootsEventContractStatus::parse(row.try_get("contract_status")?, kind)?; 718 let event_class = row 719 .try_get::<Option<String>, _>("event_class")? 720 .map(|value| StoredEventClass::parse(value.as_str())) 721 .transpose()?; 722 let projection_eligible = row.try_get::<i64, _>("projection_eligible")? != 0; 723 Ok(RadrootsStoredEvent { 724 seq: row.try_get("seq")?, 725 event_id: row.try_get("event_id")?, 726 pubkey: row.try_get("pubkey")?, 727 created_at, 728 kind, 729 tags_json: row.try_get("tags_json")?, 730 content: row.try_get("content")?, 731 sig: row.try_get("sig")?, 732 raw_json: row.try_get("raw_json")?, 733 verification_status, 734 contract_status, 735 contract_id: row.try_get("contract_id")?, 736 event_class, 737 projection_eligible, 738 inserted_at_ms: row.try_get("inserted_at_ms")?, 739 updated_at_ms: row.try_get("updated_at_ms")?, 740 }) 741 } 742 743 #[cfg_attr(coverage_nightly, coverage(off))] 744 fn stored_tag_from_row( 745 row: sqlx::sqlite::SqliteRow, 746 ) -> Result<RadrootsStoredEventTag, RadrootsEventStoreError> { 747 Ok(RadrootsStoredEventTag { 748 event_id: row.try_get("event_id")?, 749 tag_index: u32_from_i64("tag_index", row.try_get("tag_index")?)?, 750 tag_name: row.try_get("tag_name")?, 751 tag_value: row.try_get("tag_value")?, 752 tag_json: row.try_get("tag_json")?, 753 contract_semantic: row.try_get("contract_semantic")?, 754 contract_value_type: row.try_get("contract_value_type")?, 755 relay_indexed: row.try_get::<i64, _>("relay_indexed")? != 0, 756 }) 757 } 758 759 #[cfg_attr(coverage_nightly, coverage(off))] 760 fn stored_head_from_row( 761 row: sqlx::sqlite::SqliteRow, 762 ) -> Result<RadrootsStoredEventHead, RadrootsEventStoreError> { 763 Ok(RadrootsStoredEventHead { 764 coordinate_type: StoredEventClass::parse(row.try_get("coordinate_type")?)?, 765 kind: u32_from_i64("kind", row.try_get("kind")?)?, 766 pubkey: row.try_get("pubkey")?, 767 d_tag: row.try_get("d_tag")?, 768 event_id: row.try_get("event_id")?, 769 created_at: u32_from_i64("created_at", row.try_get("created_at")?)?, 770 updated_at_ms: row.try_get("updated_at_ms")?, 771 }) 772 } 773 774 #[cfg_attr(coverage_nightly, coverage(off))] 775 fn projection_cursor_from_row( 776 row: sqlx::sqlite::SqliteRow, 777 ) -> Result<RadrootsProjectionCursor, RadrootsEventStoreError> { 778 Ok(RadrootsProjectionCursor { 779 projection_id: row.try_get("projection_id")?, 780 projection_version: u32_from_i64("projection_version", row.try_get("projection_version")?)?, 781 last_event_seq: row.try_get("last_event_seq")?, 782 updated_at_ms: row.try_get("updated_at_ms")?, 783 }) 784 } 785 786 #[cfg_attr(coverage_nightly, coverage(off))] 787 fn relay_observation_from_row( 788 row: sqlx::sqlite::SqliteRow, 789 ) -> Result<RadrootsRelayObservationRow, RadrootsEventStoreError> { 790 Ok(RadrootsRelayObservationRow { 791 event_id: row.try_get("event_id")?, 792 relay_url: row.try_get("relay_url")?, 793 observation_type: row.try_get("observation_type")?, 794 first_seen_at_ms: row.try_get("first_seen_at_ms")?, 795 last_seen_at_ms: row.try_get("last_seen_at_ms")?, 796 observation_count: row.try_get("observation_count")?, 797 last_message: row.try_get("last_message")?, 798 }) 799 } 800 801 #[cfg_attr(coverage_nightly, coverage(off))] 802 fn u32_from_i64(field: &'static str, value: i64) -> Result<u32, RadrootsEventStoreError> { 803 u32::try_from(value).map_err(|_| RadrootsEventStoreError::IntegerRange { field, value }) 804 } 805 806 fn bool_i64(value: bool) -> i64 { 807 if value { 1 } else { 0 } 808 } 809 810 fn validate_tag_query(tag_name: &str, limit: u32) -> Result<(), RadrootsEventStoreError> { 811 if tag_name.is_empty() { 812 return Err(RadrootsEventStoreError::EmptyTagName); 813 } 814 if !(1..=RADROOTS_EVENT_STORE_QUERY_LIMIT_MAX).contains(&limit) { 815 return Err(RadrootsEventStoreError::QueryLimitOutOfRange { 816 min: 1, 817 max: RADROOTS_EVENT_STORE_QUERY_LIMIT_MAX, 818 actual: limit, 819 }); 820 } 821 Ok(()) 822 } 823 824 fn validate_contract_tag_query<S>( 825 contract_ids: &[S], 826 tag_name: &str, 827 limit: u32, 828 ) -> Result<(), RadrootsEventStoreError> 829 where 830 S: AsRef<str>, 831 { 832 if contract_ids.is_empty() { 833 return Err(RadrootsEventStoreError::EmptyContractList); 834 } 835 if contract_ids.len() > RADROOTS_EVENT_STORE_CONTRACT_QUERY_LIMIT_MAX { 836 return Err(RadrootsEventStoreError::ContractListTooLarge { 837 max: RADROOTS_EVENT_STORE_CONTRACT_QUERY_LIMIT_MAX, 838 actual: contract_ids.len(), 839 }); 840 } 841 validate_tag_query(tag_name, limit) 842 } 843 844 #[cfg(test)] 845 mod tests { 846 use super::*; 847 use radroots_events::event_head::event_head_candidate_for_event; 848 use radroots_events::kinds::{ 849 KIND_GEOCHAT, KIND_LISTING, KIND_ORDER_REQUEST, KIND_POST, KIND_PROFILE, 850 }; 851 use radroots_nostr::prelude::{ 852 RadrootsNostrKeys, RadrootsNostrSecretKey, RadrootsNostrTimestamp, 853 radroots_event_from_nostr, radroots_nostr_build_event, 854 }; 855 856 const FIXTURE_ALICE_SECRET_KEY_HEX: &str = 857 "10c5304d6c9ae3a1a16f7860f1cc8f5e3a76225a2663b3a989a0d775919b7df5"; 858 const FIXTURE_ALICE_PUBLIC_KEY_HEX: &str = 859 "585591529da0bab31b3b1b1f986611cf5f435dca84f978c89ee8a40cca7103df"; 860 861 fn fixture_keys() -> RadrootsNostrKeys { 862 let secret_key = 863 RadrootsNostrSecretKey::from_hex(FIXTURE_ALICE_SECRET_KEY_HEX).expect("secret key"); 864 RadrootsNostrKeys::new(secret_key) 865 } 866 867 fn event_id(character: char) -> String { 868 core::iter::repeat_n(character, 64).collect() 869 } 870 871 fn signed_event( 872 kind: u32, 873 created_at: u32, 874 tags: Vec<Vec<String>>, 875 content: &str, 876 ) -> RadrootsNostrEvent { 877 let raw_event = radroots_nostr_build_event(kind, content, tags) 878 .expect("builder") 879 .custom_created_at(RadrootsNostrTimestamp::from_secs(u64::from(created_at))) 880 .sign_with_keys(&fixture_keys()) 881 .expect("signed event"); 882 radroots_event_from_nostr(&raw_event) 883 } 884 885 fn tamper_signature(event: &mut RadrootsNostrEvent) { 886 let replacement = if event.sig.starts_with('0') { "1" } else { "0" }; 887 event.sig.replace_range(0..1, replacement); 888 } 889 890 fn listing_tags(d_tag: &str) -> Vec<Vec<String>> { 891 vec![vec!["d".to_owned(), d_tag.to_owned()]] 892 } 893 894 fn head_coordinate_for_event(event: &RadrootsNostrEvent) -> RadrootsEventHeadCoordinate { 895 let RadrootsEventHeadCandidateResult::Candidate(candidate) = 896 event_head_candidate_for_event(event).expect("head candidate") 897 else { 898 panic!("event should select a head"); 899 }; 900 candidate.coordinate 901 } 902 903 fn profile_coordinate() -> RadrootsEventHeadCoordinate { 904 RadrootsEventHeadCoordinate::Replaceable { 905 kind: KIND_PROFILE, 906 pubkey: RadrootsPublicKey::parse(FIXTURE_ALICE_PUBLIC_KEY_HEX).expect("pubkey"), 907 } 908 } 909 910 #[test] 911 fn verification_status_values_round_trip() { 912 for status in [ 913 RadrootsEventVerificationStatus::NotChecked, 914 RadrootsEventVerificationStatus::IdVerified, 915 RadrootsEventVerificationStatus::Verified, 916 RadrootsEventVerificationStatus::IdMismatch, 917 RadrootsEventVerificationStatus::SignatureInvalid, 918 RadrootsEventVerificationStatus::MalformedEnvelope, 919 ] { 920 assert_eq!( 921 RadrootsEventVerificationStatus::parse(status.as_str()).expect("status"), 922 status 923 ); 924 } 925 assert!(RadrootsEventVerificationStatus::parse("invalid").is_err()); 926 } 927 928 #[test] 929 fn verification_status_mapper_covers_all_nostr_results() { 930 assert_eq!( 931 verification_status_from_nostr(RadrootsNostrEventVerification::Verified), 932 RadrootsEventVerificationStatus::Verified 933 ); 934 assert_eq!( 935 verification_status_from_nostr(RadrootsNostrEventVerification::IdVerified), 936 RadrootsEventVerificationStatus::IdVerified 937 ); 938 assert_eq!( 939 verification_status_from_nostr(RadrootsNostrEventVerification::IdMismatch), 940 RadrootsEventVerificationStatus::IdMismatch 941 ); 942 assert_eq!( 943 verification_status_from_nostr(RadrootsNostrEventVerification::SignatureInvalid), 944 RadrootsEventVerificationStatus::SignatureInvalid 945 ); 946 assert_eq!( 947 verification_status_from_nostr(RadrootsNostrEventVerification::MalformedEnvelope), 948 RadrootsEventVerificationStatus::MalformedEnvelope 949 ); 950 } 951 952 #[tokio::test] 953 async fn constructor_enforces_sqlite_pragmas() { 954 let store = RadrootsEventStore::open_memory().await.expect("open"); 955 956 assert_eq!(store.pragma_foreign_keys().await.expect("foreign_keys"), 1); 957 assert_eq!( 958 store.pragma_busy_timeout().await.expect("busy_timeout"), 959 5000 960 ); 961 assert_eq!( 962 store.pragma_journal_mode().await.expect("journal"), 963 "memory" 964 ); 965 } 966 967 #[tokio::test] 968 async fn status_summary_counts_events_projections_and_relay_observations() { 969 let store = RadrootsEventStore::open_memory().await.expect("open"); 970 971 let empty = store.status_summary().await.expect("empty status"); 972 assert_eq!(empty.total_events, 0); 973 assert_eq!(empty.projection_eligible_events, 0); 974 assert_eq!(empty.relay_observations, 0); 975 assert_eq!(empty.last_event_seq, None); 976 assert_eq!(empty.last_event_updated_at_ms, None); 977 978 let event = signed_event( 979 KIND_POST, 980 10, 981 vec![vec!["t".to_owned(), "soil".to_owned()]], 982 "hello", 983 ); 984 let observation = RadrootsRelayObservation::new( 985 "wss://relay.example.com", 986 crate::RadrootsRelayObservationType::PublishAck, 987 1_100, 988 ); 989 store 990 .ingest_event(RadrootsEventIngest::new(event.clone(), 1_000)) 991 .await 992 .expect("event ingest"); 993 store 994 .ingest_event(RadrootsEventIngest::new(event, 1_100).with_observation(observation)) 995 .await 996 .expect("observation ingest"); 997 998 let status = store.status_summary().await.expect("status"); 999 assert_eq!(status.total_events, 1); 1000 assert_eq!(status.projection_eligible_events, 1); 1001 assert_eq!(status.relay_observations, 1); 1002 assert_eq!(status.last_event_seq, Some(1)); 1003 assert_eq!(status.last_event_updated_at_ms, Some(1_000)); 1004 } 1005 1006 #[tokio::test] 1007 async fn file_store_reopens_existing_schema() { 1008 let tempdir = tempfile::tempdir().expect("tempdir"); 1009 let path = tempdir.path().join("event_store.sqlite"); 1010 1011 let first = RadrootsEventStore::open_file(&path).await.expect("first"); 1012 assert_eq!(first.pragma_foreign_keys().await.expect("foreign_keys"), 1); 1013 drop(first); 1014 1015 let second = RadrootsEventStore::open_file(&path).await.expect("second"); 1016 assert_eq!(second.pragma_foreign_keys().await.expect("foreign_keys"), 1); 1017 } 1018 1019 #[tokio::test] 1020 async fn migration_can_run_down() { 1021 let store = RadrootsEventStore::open_memory().await.expect("open"); 1022 store.migrate_down().await.expect("down"); 1023 1024 let missing = sqlx::query("SELECT COUNT(*) FROM nostr_event") 1025 .fetch_one(store.pool()) 1026 .await 1027 .err() 1028 .expect("table should be removed"); 1029 assert!(missing.to_string().contains("nostr_event")); 1030 } 1031 1032 #[tokio::test] 1033 async fn ingest_retains_raw_event_and_ignores_duplicate_rows() { 1034 let store = RadrootsEventStore::open_memory().await.expect("open"); 1035 let event = signed_event( 1036 KIND_POST, 1037 10, 1038 vec![vec!["t".to_owned(), "soil".to_owned()]], 1039 "hello", 1040 ); 1041 let ingest = 1042 RadrootsEventIngest::new(event.clone(), 1_000).with_raw_json("{\"fixture\":true}"); 1043 1044 let first = store 1045 .ingest_event(ingest.clone()) 1046 .await 1047 .expect("first ingest"); 1048 let second = store.ingest_event(ingest).await.expect("second ingest"); 1049 let stored = store 1050 .get_event(event.id.as_str()) 1051 .await 1052 .expect("get") 1053 .expect("stored"); 1054 1055 assert!(first.inserted); 1056 assert!(!second.inserted); 1057 assert_eq!(first.seq, second.seq); 1058 assert_eq!( 1059 second.head_decision, 1060 RadrootsEventHeadStoreDecision::SkippedDuplicate 1061 ); 1062 assert_eq!( 1063 first.verification_status, 1064 RadrootsEventVerificationStatus::Verified 1065 ); 1066 assert_eq!(stored.seq, first.seq); 1067 assert_eq!(stored.raw_json, "{\"fixture\":true}"); 1068 assert_eq!(stored.content, "hello"); 1069 assert_eq!(stored.tags_json, "[[\"t\",\"soil\"]]"); 1070 assert_eq!( 1071 stored.contract_status, 1072 RadrootsEventContractStatus::Supported 1073 ); 1074 assert!(stored.projection_eligible); 1075 assert_eq!( 1076 store 1077 .tags_for_event(event.id.as_str()) 1078 .await 1079 .expect("tags") 1080 .len(), 1081 1 1082 ); 1083 } 1084 1085 #[tokio::test] 1086 async fn unsupported_verified_events_are_stored_but_not_projected() { 1087 let store = RadrootsEventStore::open_memory().await.expect("open"); 1088 let event = signed_event(999, 11, Vec::new(), "unsupported"); 1089 let receipt = store 1090 .ingest_event(RadrootsEventIngest::new(event.clone(), 2_000)) 1091 .await 1092 .expect("ingest"); 1093 let stored = store 1094 .get_event(event.id.as_str()) 1095 .await 1096 .expect("get") 1097 .expect("stored"); 1098 1099 assert_eq!( 1100 receipt.contract_status, 1101 RadrootsEventContractStatus::UnsupportedKind(999) 1102 ); 1103 assert_eq!( 1104 stored.verification_status, 1105 RadrootsEventVerificationStatus::Verified 1106 ); 1107 assert!(!stored.projection_eligible); 1108 1109 let duplicate = store 1110 .ingest_event(RadrootsEventIngest::new(event, 2_100)) 1111 .await 1112 .expect("duplicate"); 1113 assert!(!duplicate.inserted); 1114 assert_eq!( 1115 duplicate.head_decision, 1116 RadrootsEventHeadStoreDecision::Unsupported 1117 ); 1118 } 1119 1120 #[test] 1121 fn test_helpers_cover_signature_and_non_head_branches() { 1122 let mut zero_sig = signed_event(KIND_POST, 12, Vec::new(), "zero"); 1123 zero_sig.sig.replace_range(0..1, "0"); 1124 tamper_signature(&mut zero_sig); 1125 assert!(zero_sig.sig.starts_with('1')); 1126 1127 let mut nonzero_sig = signed_event(KIND_POST, 12, Vec::new(), "nonzero"); 1128 nonzero_sig.sig.replace_range(0..1, "1"); 1129 tamper_signature(&mut nonzero_sig); 1130 assert!(nonzero_sig.sig.starts_with('0')); 1131 } 1132 1133 #[test] 1134 #[should_panic(expected = "event should select a head")] 1135 fn head_coordinate_helper_panics_for_regular_events() { 1136 let event = signed_event(KIND_POST, 12, Vec::new(), "regular"); 1137 let _ = head_coordinate_for_event(&event); 1138 } 1139 1140 #[tokio::test] 1141 async fn id_mismatch_events_are_stored_but_not_projected() { 1142 let store = RadrootsEventStore::open_memory().await.expect("open"); 1143 let mut event = signed_event(KIND_POST, 12, Vec::new(), "hello"); 1144 event.content = "tampered".to_owned(); 1145 let receipt = store 1146 .ingest_event(RadrootsEventIngest::new(event.clone(), 2_100)) 1147 .await 1148 .expect("ingest"); 1149 let stored = store 1150 .get_event(event.id.as_str()) 1151 .await 1152 .expect("get") 1153 .expect("stored"); 1154 1155 assert_eq!( 1156 receipt.contract_status, 1157 RadrootsEventContractStatus::Supported 1158 ); 1159 assert_eq!( 1160 receipt.verification_status, 1161 RadrootsEventVerificationStatus::IdMismatch 1162 ); 1163 assert_eq!( 1164 stored.verification_status, 1165 RadrootsEventVerificationStatus::IdMismatch 1166 ); 1167 assert!(!stored.projection_eligible); 1168 assert!( 1169 store 1170 .events_since_cursor("social", 10) 1171 .await 1172 .expect("events") 1173 .is_empty() 1174 ); 1175 } 1176 1177 #[tokio::test] 1178 async fn signature_invalid_events_are_stored_but_not_projected() { 1179 let store = RadrootsEventStore::open_memory().await.expect("open"); 1180 let mut event = signed_event(KIND_POST, 13, Vec::new(), "hello"); 1181 tamper_signature(&mut event); 1182 let receipt = store 1183 .ingest_event(RadrootsEventIngest::new(event.clone(), 2_200)) 1184 .await 1185 .expect("ingest"); 1186 let stored = store 1187 .get_event(event.id.as_str()) 1188 .await 1189 .expect("get") 1190 .expect("stored"); 1191 1192 assert_eq!( 1193 receipt.verification_status, 1194 RadrootsEventVerificationStatus::SignatureInvalid 1195 ); 1196 assert_eq!( 1197 stored.verification_status, 1198 RadrootsEventVerificationStatus::SignatureInvalid 1199 ); 1200 assert!(!stored.projection_eligible); 1201 assert!( 1202 store 1203 .events_since_cursor("social", 10) 1204 .await 1205 .expect("events") 1206 .is_empty() 1207 ); 1208 } 1209 1210 #[tokio::test] 1211 async fn malformed_envelope_events_are_stored_but_not_projected() { 1212 let store = RadrootsEventStore::open_memory().await.expect("open"); 1213 let mut event = signed_event(KIND_POST, 13, Vec::new(), "hello"); 1214 event.kind = u32::from(u16::MAX) + 1; 1215 1216 let receipt = store 1217 .ingest_event(RadrootsEventIngest::new(event.clone(), 2_250)) 1218 .await 1219 .expect("ingest"); 1220 let stored = store 1221 .get_event(event.id.as_str()) 1222 .await 1223 .expect("get") 1224 .expect("stored"); 1225 1226 assert_eq!( 1227 receipt.verification_status, 1228 RadrootsEventVerificationStatus::MalformedEnvelope 1229 ); 1230 assert_eq!( 1231 stored.verification_status, 1232 RadrootsEventVerificationStatus::MalformedEnvelope 1233 ); 1234 assert!(!stored.projection_eligible); 1235 } 1236 1237 #[tokio::test] 1238 async fn ephemeral_events_are_not_persisted_as_heads() { 1239 let store = RadrootsEventStore::open_memory().await.expect("open"); 1240 let event = signed_event(KIND_GEOCHAT, 15, Vec::new(), "hello"); 1241 1242 let receipt = store 1243 .ingest_event(RadrootsEventIngest::new(event.clone(), 2_260)) 1244 .await 1245 .expect("ingest"); 1246 let stored = store 1247 .get_event(event.id.as_str()) 1248 .await 1249 .expect("get") 1250 .expect("stored"); 1251 1252 assert_eq!( 1253 receipt.contract_status, 1254 RadrootsEventContractStatus::Supported 1255 ); 1256 assert_eq!( 1257 receipt.head_decision, 1258 RadrootsEventHeadStoreDecision::NotProjectionEligible 1259 ); 1260 assert!(!receipt.projection_eligible); 1261 assert!(!stored.projection_eligible); 1262 } 1263 1264 #[tokio::test] 1265 async fn event_head_helper_maps_not_persisted_candidates() { 1266 let store = RadrootsEventStore::open_memory().await.expect("open"); 1267 let event = signed_event(KIND_GEOCHAT, 17, Vec::new(), "hello"); 1268 let classification = classify_event(&event); 1269 let contract = classification.contract.expect("contract"); 1270 let mut tx = store.pool.begin().await.expect("tx"); 1271 1272 let head = apply_event_head(&mut tx, &event, contract, 2_280) 1273 .await 1274 .expect("head"); 1275 1276 assert_eq!(head.decision, RadrootsEventHeadStoreDecision::NotPersisted); 1277 assert!(!head.projection_eligible); 1278 } 1279 1280 #[tokio::test] 1281 async fn malformed_addressable_heads_are_not_projected() { 1282 let store = RadrootsEventStore::open_memory().await.expect("open"); 1283 let event = signed_event(KIND_LISTING, 16, Vec::new(), "{}"); 1284 1285 let receipt = store 1286 .ingest_event(RadrootsEventIngest::new(event.clone(), 2_270)) 1287 .await 1288 .expect("ingest"); 1289 let stored = store 1290 .get_event(event.id.as_str()) 1291 .await 1292 .expect("get") 1293 .expect("stored"); 1294 1295 assert_eq!( 1296 receipt.contract_status, 1297 RadrootsEventContractStatus::Supported 1298 ); 1299 assert_eq!( 1300 receipt.head_decision, 1301 RadrootsEventHeadStoreDecision::Malformed 1302 ); 1303 assert!(!receipt.projection_eligible); 1304 assert!(!stored.projection_eligible); 1305 } 1306 1307 #[tokio::test] 1308 async fn id_mismatch_addressable_events_do_not_update_heads() { 1309 let store = RadrootsEventStore::open_memory().await.expect("open"); 1310 let original = signed_event(KIND_LISTING, 17, listing_tags("listing-1"), "{}"); 1311 let first = store 1312 .ingest_event(RadrootsEventIngest::new(original.clone(), 2_300)) 1313 .await 1314 .expect("first"); 1315 let coordinate = head_coordinate_for_event(&original); 1316 let mut invalid = signed_event(KIND_LISTING, 18, listing_tags("listing-1"), "{}"); 1317 invalid.content = "{\"tampered\":true}".to_owned(); 1318 1319 let receipt = store 1320 .ingest_event(RadrootsEventIngest::new(invalid.clone(), 2_400)) 1321 .await 1322 .expect("invalid"); 1323 let stored = store 1324 .get_event(invalid.id.as_str()) 1325 .await 1326 .expect("get") 1327 .expect("stored"); 1328 let head = store 1329 .event_head(&coordinate) 1330 .await 1331 .expect("head") 1332 .expect("stored head"); 1333 1334 assert_eq!(first.head_decision, RadrootsEventHeadStoreDecision::Applied); 1335 assert_eq!( 1336 receipt.verification_status, 1337 RadrootsEventVerificationStatus::IdMismatch 1338 ); 1339 assert_eq!( 1340 receipt.head_decision, 1341 RadrootsEventHeadStoreDecision::NotProjectionEligible 1342 ); 1343 assert!(!receipt.projection_eligible); 1344 assert!(!stored.projection_eligible); 1345 assert_eq!(head.event_id, original.id); 1346 } 1347 1348 #[tokio::test] 1349 async fn signature_invalid_addressable_events_do_not_update_heads() { 1350 let store = RadrootsEventStore::open_memory().await.expect("open"); 1351 let original = signed_event(KIND_LISTING, 19, listing_tags("listing-2"), "{}"); 1352 store 1353 .ingest_event(RadrootsEventIngest::new(original.clone(), 2_500)) 1354 .await 1355 .expect("first"); 1356 let coordinate = head_coordinate_for_event(&original); 1357 let mut invalid = signed_event(KIND_LISTING, 20, listing_tags("listing-2"), "{}"); 1358 tamper_signature(&mut invalid); 1359 1360 let receipt = store 1361 .ingest_event(RadrootsEventIngest::new(invalid.clone(), 2_600)) 1362 .await 1363 .expect("invalid"); 1364 let head = store 1365 .event_head(&coordinate) 1366 .await 1367 .expect("head") 1368 .expect("stored head"); 1369 1370 assert_eq!( 1371 receipt.verification_status, 1372 RadrootsEventVerificationStatus::SignatureInvalid 1373 ); 1374 assert_eq!( 1375 receipt.head_decision, 1376 RadrootsEventHeadStoreDecision::NotProjectionEligible 1377 ); 1378 assert!(!receipt.projection_eligible); 1379 assert_eq!(head.event_id, original.id); 1380 } 1381 1382 #[tokio::test] 1383 async fn duplicate_invalid_addressable_events_do_not_update_heads() { 1384 let store = RadrootsEventStore::open_memory().await.expect("open"); 1385 let original = signed_event(KIND_LISTING, 21, listing_tags("listing-3"), "{}"); 1386 store 1387 .ingest_event(RadrootsEventIngest::new(original.clone(), 2_700)) 1388 .await 1389 .expect("original"); 1390 let coordinate = head_coordinate_for_event(&original); 1391 let mut invalid = signed_event(KIND_LISTING, 22, listing_tags("listing-3"), "{}"); 1392 invalid.content = "{\"tampered\":true}".to_owned(); 1393 1394 let first_invalid = store 1395 .ingest_event(RadrootsEventIngest::new(invalid.clone(), 2_800)) 1396 .await 1397 .expect("first invalid"); 1398 let second_invalid = store 1399 .ingest_event(RadrootsEventIngest::new(invalid.clone(), 2_900)) 1400 .await 1401 .expect("second invalid"); 1402 let head = store 1403 .event_head(&coordinate) 1404 .await 1405 .expect("head") 1406 .expect("stored head"); 1407 1408 assert!(first_invalid.inserted); 1409 assert!(!second_invalid.inserted); 1410 assert_eq!(first_invalid.seq, second_invalid.seq); 1411 assert_eq!( 1412 first_invalid.head_decision, 1413 RadrootsEventHeadStoreDecision::NotProjectionEligible 1414 ); 1415 assert_eq!( 1416 second_invalid.head_decision, 1417 RadrootsEventHeadStoreDecision::SkippedDuplicate 1418 ); 1419 assert_eq!(head.event_id, original.id); 1420 } 1421 1422 #[tokio::test] 1423 async fn duplicate_verified_addressable_events_preserve_heads() { 1424 let store = RadrootsEventStore::open_memory().await.expect("open"); 1425 let event = signed_event(KIND_LISTING, 23, listing_tags("listing-4"), "{}"); 1426 let coordinate = head_coordinate_for_event(&event); 1427 1428 let first = store 1429 .ingest_event(RadrootsEventIngest::new(event.clone(), 3_000)) 1430 .await 1431 .expect("first"); 1432 let second = store 1433 .ingest_event(RadrootsEventIngest::new(event.clone(), 3_100)) 1434 .await 1435 .expect("second"); 1436 let head = store 1437 .event_head(&coordinate) 1438 .await 1439 .expect("head") 1440 .expect("stored head"); 1441 1442 assert!(first.inserted); 1443 assert!(!second.inserted); 1444 assert_eq!(first.seq, second.seq); 1445 assert_eq!(first.head_decision, RadrootsEventHeadStoreDecision::Applied); 1446 assert_eq!( 1447 second.head_decision, 1448 RadrootsEventHeadStoreDecision::SkippedDuplicate 1449 ); 1450 assert_eq!(head.event_id, event.id); 1451 } 1452 1453 #[tokio::test] 1454 async fn verified_regular_events_remain_projection_eligible_without_head_selection() { 1455 let store = RadrootsEventStore::open_memory().await.expect("open"); 1456 let event = signed_event(KIND_POST, 24, Vec::new(), "hello"); 1457 1458 let receipt = store 1459 .ingest_event(RadrootsEventIngest::new(event.clone(), 3_200)) 1460 .await 1461 .expect("ingest"); 1462 let stored = store 1463 .get_event(event.id.as_str()) 1464 .await 1465 .expect("get") 1466 .expect("stored"); 1467 1468 assert_eq!( 1469 receipt.verification_status, 1470 RadrootsEventVerificationStatus::Verified 1471 ); 1472 assert_eq!( 1473 receipt.head_decision, 1474 RadrootsEventHeadStoreDecision::NotHeadSelected 1475 ); 1476 assert!(receipt.projection_eligible); 1477 assert!(stored.projection_eligible); 1478 } 1479 1480 #[tokio::test] 1481 async fn events_by_tag_validates_inputs_and_returns_projection_events_in_sequence_order() { 1482 let store = RadrootsEventStore::open_memory().await.expect("store"); 1483 1484 assert!(matches!( 1485 store.events_by_tag("", "soil", 1).await, 1486 Err(RadrootsEventStoreError::EmptyTagName) 1487 )); 1488 assert!(matches!( 1489 store.events_by_tag("t", "soil", 0).await, 1490 Err(RadrootsEventStoreError::QueryLimitOutOfRange { .. }) 1491 )); 1492 assert!(matches!( 1493 store 1494 .events_by_tag("t", "soil", RADROOTS_EVENT_STORE_QUERY_LIMIT_MAX + 1) 1495 .await, 1496 Err(RadrootsEventStoreError::QueryLimitOutOfRange { .. }) 1497 )); 1498 1499 let unsupported = signed_event( 1500 999, 1501 40, 1502 vec![vec!["t".to_owned(), "soil".to_owned()]], 1503 "unsupported", 1504 ); 1505 let high_created_at = signed_event( 1506 KIND_POST, 1507 60, 1508 vec![ 1509 vec!["t".to_owned(), "soil".to_owned()], 1510 vec!["t".to_owned(), "soil".to_owned()], 1511 ], 1512 "high-created-at", 1513 ); 1514 let low_created_at = signed_event( 1515 KIND_POST, 1516 50, 1517 vec![vec!["t".to_owned(), "soil".to_owned()]], 1518 "low-created-at", 1519 ); 1520 1521 store 1522 .ingest_event(RadrootsEventIngest::new(unsupported.clone(), 3_300)) 1523 .await 1524 .expect("unsupported ingest"); 1525 store 1526 .ingest_event(RadrootsEventIngest::new(high_created_at.clone(), 3_400)) 1527 .await 1528 .expect("high ingest"); 1529 store 1530 .ingest_event(RadrootsEventIngest::new(low_created_at.clone(), 3_500)) 1531 .await 1532 .expect("low ingest"); 1533 1534 let events = store 1535 .events_by_tag("t", "soil", 10) 1536 .await 1537 .expect("tag query"); 1538 assert_eq!(events.len(), 2); 1539 assert_eq!(events[0].event_id, high_created_at.id); 1540 assert_eq!(events[1].event_id, low_created_at.id); 1541 assert!(events.iter().all(|event| event.projection_eligible)); 1542 1543 let limited = store 1544 .events_by_tag("t", "soil", 1) 1545 .await 1546 .expect("limited tag query"); 1547 assert_eq!(limited.len(), 1); 1548 assert_eq!(limited[0].event_id, high_created_at.id); 1549 } 1550 1551 #[tokio::test] 1552 async fn events_by_contract_and_tag_enforces_contract_tag_and_projection_filters() { 1553 let store = RadrootsEventStore::open_memory().await.expect("store"); 1554 1555 assert!(matches!( 1556 store 1557 .events_by_contract_and_tag::<&str>(&[], "p", FIXTURE_ALICE_PUBLIC_KEY_HEX, 1) 1558 .await, 1559 Err(RadrootsEventStoreError::EmptyContractList) 1560 )); 1561 let too_many_contracts = 1562 vec!["radroots.order.request.v1"; RADROOTS_EVENT_STORE_CONTRACT_QUERY_LIMIT_MAX + 1]; 1563 assert!(matches!( 1564 store 1565 .events_by_contract_and_tag( 1566 too_many_contracts.as_slice(), 1567 "p", 1568 FIXTURE_ALICE_PUBLIC_KEY_HEX, 1569 1, 1570 ) 1571 .await, 1572 Err(RadrootsEventStoreError::ContractListTooLarge { .. }) 1573 )); 1574 1575 let matching_order = signed_event( 1576 KIND_ORDER_REQUEST, 1577 70, 1578 vec![ 1579 vec!["d".to_owned(), "order-1".to_owned()], 1580 vec!["p".to_owned(), FIXTURE_ALICE_PUBLIC_KEY_HEX.to_owned()], 1581 ], 1582 "{}", 1583 ); 1584 let wrong_tag_order = signed_event( 1585 KIND_ORDER_REQUEST, 1586 71, 1587 vec![ 1588 vec!["d".to_owned(), "order-2".to_owned()], 1589 vec!["p".to_owned(), event_id('b')], 1590 ], 1591 "{}", 1592 ); 1593 let same_tag_wrong_contract = signed_event( 1594 KIND_POST, 1595 72, 1596 vec![vec![ 1597 "p".to_owned(), 1598 FIXTURE_ALICE_PUBLIC_KEY_HEX.to_owned(), 1599 ]], 1600 "hello", 1601 ); 1602 let unsupported_same_tag = signed_event( 1603 999, 1604 73, 1605 vec![vec![ 1606 "p".to_owned(), 1607 FIXTURE_ALICE_PUBLIC_KEY_HEX.to_owned(), 1608 ]], 1609 "unsupported", 1610 ); 1611 1612 for (event, observed_at_ms) in [ 1613 (matching_order.clone(), 3_600), 1614 (wrong_tag_order, 3_700), 1615 (same_tag_wrong_contract, 3_800), 1616 (unsupported_same_tag, 3_900), 1617 ] { 1618 store 1619 .ingest_event(RadrootsEventIngest::new(event, observed_at_ms)) 1620 .await 1621 .expect("ingest"); 1622 } 1623 1624 let events = store 1625 .events_by_contract_and_tag( 1626 &["radroots.order.request.v1"], 1627 "p", 1628 FIXTURE_ALICE_PUBLIC_KEY_HEX, 1629 10, 1630 ) 1631 .await 1632 .expect("contract tag query"); 1633 assert_eq!(events.len(), 1); 1634 assert_eq!(events[0].event_id, matching_order.id); 1635 assert_eq!( 1636 events[0].contract_id.as_deref(), 1637 Some("radroots.order.request.v1") 1638 ); 1639 assert!(events[0].projection_eligible); 1640 } 1641 1642 #[tokio::test] 1643 async fn tag_rows_preserve_order_and_contract_metadata() { 1644 let store = RadrootsEventStore::open_memory().await.expect("open"); 1645 let event = signed_event( 1646 KIND_PROFILE, 1647 14, 1648 vec![ 1649 vec!["p".to_owned(), FIXTURE_ALICE_PUBLIC_KEY_HEX.to_owned()], 1650 vec!["t".to_owned(), "harvest".to_owned()], 1651 ], 1652 "{}", 1653 ); 1654 1655 store 1656 .ingest_event(RadrootsEventIngest::new(event.clone(), 3_000)) 1657 .await 1658 .expect("ingest"); 1659 let tags = store.tags_for_event(event.id.as_str()).await.expect("tags"); 1660 1661 assert_eq!(tags[0].tag_index, 0); 1662 assert_eq!(tags[0].tag_name, "p"); 1663 assert_eq!(tags[0].contract_value_type.as_deref(), Some("public_key")); 1664 assert!(tags[0].relay_indexed); 1665 assert_eq!(tags[1].tag_index, 1); 1666 assert_eq!(tags[1].tag_json, "[\"t\",\"harvest\"]"); 1667 } 1668 1669 #[tokio::test] 1670 async fn listing_event_tag_persists_event_pointer_contract_metadata() { 1671 let store = RadrootsEventStore::open_memory().await.expect("open"); 1672 let listing_event_id = event_id('f'); 1673 let event = signed_event( 1674 KIND_ORDER_REQUEST, 1675 16, 1676 vec![ 1677 vec!["d".to_owned(), "order-1".to_owned()], 1678 vec!["p".to_owned(), FIXTURE_ALICE_PUBLIC_KEY_HEX.to_owned()], 1679 vec![ 1680 "a".to_owned(), 1681 format!( 1682 "{KIND_LISTING}:{}:AAAAAAAAAAAAAAAAAAAAAg", 1683 FIXTURE_ALICE_PUBLIC_KEY_HEX 1684 ), 1685 ], 1686 vec![ 1687 "listing_event".to_owned(), 1688 listing_event_id.clone(), 1689 "wss://relay.example.com".to_owned(), 1690 ], 1691 ], 1692 "{}", 1693 ); 1694 1695 store 1696 .ingest_event(RadrootsEventIngest::new(event.clone(), 3_100)) 1697 .await 1698 .expect("ingest"); 1699 let tags = store.tags_for_event(event.id.as_str()).await.expect("tags"); 1700 let listing_tag = tags 1701 .iter() 1702 .find(|tag| tag.tag_name == "listing_event") 1703 .expect("listing event tag"); 1704 1705 assert_eq!( 1706 listing_tag.tag_value.as_deref(), 1707 Some(listing_event_id.as_str()) 1708 ); 1709 assert_eq!( 1710 listing_tag.contract_semantic.as_deref(), 1711 Some("listing_snapshot") 1712 ); 1713 assert_eq!( 1714 listing_tag.contract_value_type.as_deref(), 1715 Some("event_pointer") 1716 ); 1717 assert!(!listing_tag.relay_indexed); 1718 } 1719 1720 #[tokio::test] 1721 async fn relay_observations_upsert_separately_from_event_identity() { 1722 let store = RadrootsEventStore::open_memory().await.expect("open"); 1723 let event = signed_event(KIND_POST, 15, Vec::new(), "hello"); 1724 let observation = RadrootsRelayObservation::new( 1725 "wss://relay.local", 1726 crate::RadrootsRelayObservationType::Subscription, 1727 4_000, 1728 ); 1729 let ingest = RadrootsEventIngest::new(event.clone(), 4_000).with_observation(observation); 1730 store.ingest_event(ingest).await.expect("first"); 1731 let observation = RadrootsRelayObservation::new( 1732 "wss://relay.local", 1733 crate::RadrootsRelayObservationType::Subscription, 1734 4_100, 1735 ) 1736 .with_message("duplicate accepted"); 1737 let ingest = RadrootsEventIngest::new(event.clone(), 4_100).with_observation(observation); 1738 store.ingest_event(ingest).await.expect("second"); 1739 1740 let observations = store 1741 .observations_for_event(event.id.as_str()) 1742 .await 1743 .expect("observations"); 1744 assert_eq!(observations.len(), 1); 1745 assert_eq!(observations[0].observation_count, 2); 1746 assert_eq!(observations[0].last_seen_at_ms, 4_100); 1747 assert_eq!( 1748 observations[0].last_message.as_deref(), 1749 Some("duplicate accepted") 1750 ); 1751 } 1752 1753 #[tokio::test] 1754 async fn event_heads_use_protocol_tie_breaks() { 1755 let mut events = [ 1756 signed_event(KIND_PROFILE, 20, Vec::new(), "{\"name\":\"a\"}"), 1757 signed_event(KIND_PROFILE, 20, Vec::new(), "{\"name\":\"b\"}"), 1758 ]; 1759 events.sort_by(|left, right| left.id.cmp(&right.id)); 1760 let lower = events[0].clone(); 1761 let higher = events[1].clone(); 1762 1763 let store = RadrootsEventStore::open_memory().await.expect("open"); 1764 let first = store 1765 .ingest_event(RadrootsEventIngest::new(higher.clone(), 5_000)) 1766 .await 1767 .expect("first"); 1768 let second = store 1769 .ingest_event(RadrootsEventIngest::new(lower.clone(), 5_100)) 1770 .await 1771 .expect("second"); 1772 let head = store 1773 .event_head(&profile_coordinate()) 1774 .await 1775 .expect("head") 1776 .expect("stored head"); 1777 1778 assert_eq!(first.head_decision, RadrootsEventHeadStoreDecision::Applied); 1779 assert_eq!( 1780 second.head_decision, 1781 RadrootsEventHeadStoreDecision::Applied 1782 ); 1783 assert_eq!(head.event_id, lower.id); 1784 1785 let store = RadrootsEventStore::open_memory().await.expect("open"); 1786 store 1787 .ingest_event(RadrootsEventIngest::new(lower.clone(), 5_200)) 1788 .await 1789 .expect("first"); 1790 let second = store 1791 .ingest_event(RadrootsEventIngest::new(higher, 5_300)) 1792 .await 1793 .expect("second"); 1794 let head = store 1795 .event_head(&profile_coordinate()) 1796 .await 1797 .expect("head") 1798 .expect("stored head"); 1799 1800 assert_eq!( 1801 second.head_decision, 1802 RadrootsEventHeadStoreDecision::SkippedSameTimestampHigherEventId 1803 ); 1804 assert_eq!(head.event_id, lower.id); 1805 } 1806 1807 #[tokio::test] 1808 async fn projection_cursors_replay_by_store_sequence() { 1809 let store = RadrootsEventStore::open_memory().await.expect("open"); 1810 let first = signed_event(KIND_POST, 30, Vec::new(), "one"); 1811 let second = signed_event(KIND_POST, 30, Vec::new(), "two"); 1812 let first_receipt = store 1813 .ingest_event(RadrootsEventIngest::new(first.clone(), 6_000)) 1814 .await 1815 .expect("first"); 1816 let second_receipt = store 1817 .ingest_event(RadrootsEventIngest::new(second.clone(), 6_100)) 1818 .await 1819 .expect("second"); 1820 assert!(first_receipt.seq < second_receipt.seq); 1821 1822 let replay = store 1823 .events_since_cursor("social", 10) 1824 .await 1825 .expect("initial replay"); 1826 assert_eq!(replay.len(), 2); 1827 assert_eq!(replay[0].event_id, first.id); 1828 assert_eq!(replay[1].event_id, second.id); 1829 store 1830 .update_projection_cursor(&RadrootsProjectionCursor { 1831 projection_id: "social".to_owned(), 1832 projection_version: 1, 1833 last_event_seq: first_receipt.seq, 1834 updated_at_ms: 6_200, 1835 }) 1836 .await 1837 .expect("cursor"); 1838 let replay = store 1839 .events_since_cursor("social", 10) 1840 .await 1841 .expect("next replay"); 1842 assert_eq!(replay.len(), 1); 1843 assert_eq!(replay[0].event_id, second.id); 1844 } 1845 1846 #[tokio::test] 1847 async fn smoke_event_store_ingests_and_replays_ten_thousand_events() { 1848 let store = RadrootsEventStore::open_memory().await.expect("open"); 1849 for index in 0..10_000u32 { 1850 let event = signed_event( 1851 KIND_POST, 1852 10_000 + index, 1853 vec![vec!["t".to_owned(), "smoke".to_owned()]], 1854 format!("smoke-{index}").as_str(), 1855 ); 1856 let receipt = store 1857 .ingest_event(RadrootsEventIngest::new(event, 10_000 + i64::from(index))) 1858 .await 1859 .expect("ingest"); 1860 assert!(receipt.inserted); 1861 assert_eq!( 1862 receipt.verification_status, 1863 RadrootsEventVerificationStatus::Verified 1864 ); 1865 } 1866 1867 let replay = store 1868 .events_since_cursor("smoke", 10_000) 1869 .await 1870 .expect("replay"); 1871 assert_eq!(replay.len(), 10_000); 1872 assert_eq!(replay[0].seq, 1); 1873 assert_eq!(replay[9_999].seq, 10_000); 1874 1875 store 1876 .update_projection_cursor(&RadrootsProjectionCursor { 1877 projection_id: "smoke".to_owned(), 1878 projection_version: 1, 1879 last_event_seq: replay[4_999].seq, 1880 updated_at_ms: 25_000, 1881 }) 1882 .await 1883 .expect("cursor"); 1884 let replay = store 1885 .events_since_cursor("smoke", 10_000) 1886 .await 1887 .expect("replay after cursor"); 1888 assert_eq!(replay.len(), 5_000); 1889 assert_eq!(replay[0].seq, 5_001); 1890 assert_eq!(replay[4_999].seq, 10_000); 1891 } 1892 }