commit 6867a6e95211c67dd03349a5723bf17cb0e33fcf
parent 3b2f8393a6dbbc34e02d61cca7aee129a02a1cb8
Author: triesap <tyson@radroots.org>
Date: Sat, 13 Jun 2026 13:29:34 -0700
event_store: gate event heads by projection eligibility
Diffstat:
2 files changed, 133 insertions(+), 13 deletions(-)
diff --git a/crates/event_store/src/model.rs b/crates/event_store/src/model.rs
@@ -205,6 +205,7 @@ pub enum RadrootsEventHeadStoreDecision {
Applied,
NotHeadSelected,
NotPersisted,
+ NotProjectionEligible,
SkippedDuplicate,
SkippedOlder,
SkippedSameTimestampHigherEventId,
diff --git a/crates/event_store/src/store.rs b/crates/event_store/src/store.rs
@@ -102,19 +102,23 @@ impl RadrootsEventStore {
if inserted {
insert_tags(&mut tx, &ingest.event, classification.contract).await?;
if let Some(contract) = classification.contract {
- let head =
- apply_event_head(&mut tx, &ingest.event, contract, ingest.observed_at_ms)
- .await?;
- projection_eligible = projection_eligible && head.projection_eligible;
- head_decision = head.decision;
- sqlx::query(
- "UPDATE nostr_event SET projection_eligible = ?, updated_at_ms = ? WHERE event_id = ?",
- )
- .bind(bool_i64(projection_eligible))
- .bind(ingest.observed_at_ms)
- .bind(ingest.event.id.as_str())
- .execute(&mut *tx)
- .await?;
+ if projection_eligible {
+ let head =
+ apply_event_head(&mut tx, &ingest.event, contract, ingest.observed_at_ms)
+ .await?;
+ projection_eligible = head.projection_eligible;
+ head_decision = head.decision;
+ sqlx::query(
+ "UPDATE nostr_event SET projection_eligible = ?, updated_at_ms = ? WHERE event_id = ?",
+ )
+ .bind(bool_i64(projection_eligible))
+ .bind(ingest.observed_at_ms)
+ .bind(ingest.event.id.as_str())
+ .execute(&mut *tx)
+ .await?;
+ } else {
+ head_decision = RadrootsEventHeadStoreDecision::NotProjectionEligible;
+ }
}
} else if classification.contract.is_some() {
head_decision = RadrootsEventHeadStoreDecision::SkippedDuplicate;
@@ -756,6 +760,19 @@ mod tests {
event.sig.replace_range(0..1, replacement);
}
+ fn listing_tags(d_tag: &str) -> Vec<Vec<String>> {
+ vec![vec!["d".to_owned(), d_tag.to_owned()]]
+ }
+
+ fn head_coordinate_for_event(event: &RadrootsNostrEvent) -> RadrootsEventHeadCoordinate {
+ let RadrootsEventHeadCandidateResult::Candidate(candidate) =
+ event_head_candidate_for_event(event).expect("head candidate")
+ else {
+ panic!("event should select a head");
+ };
+ candidate.coordinate
+ }
+
#[test]
fn verification_status_values_round_trip() {
for status in [
@@ -945,6 +962,108 @@ mod tests {
}
#[tokio::test]
+ async fn id_mismatch_addressable_events_do_not_update_heads() {
+ let store = RadrootsEventStore::open_memory().await.expect("open");
+ let original = signed_event(KIND_LISTING, 17, listing_tags("listing-1"), "{}");
+ let first = store
+ .ingest_event(RadrootsEventIngest::new(original.clone(), 2_300))
+ .await
+ .expect("first");
+ let coordinate = head_coordinate_for_event(&original);
+ let mut invalid = signed_event(KIND_LISTING, 18, listing_tags("listing-1"), "{}");
+ invalid.content = "{\"tampered\":true}".to_owned();
+
+ let receipt = store
+ .ingest_event(RadrootsEventIngest::new(invalid.clone(), 2_400))
+ .await
+ .expect("invalid");
+ let stored = store
+ .get_event(invalid.id.as_str())
+ .await
+ .expect("get")
+ .expect("stored");
+ let head = store
+ .event_head(&coordinate)
+ .await
+ .expect("head")
+ .expect("stored head");
+
+ assert_eq!(first.head_decision, RadrootsEventHeadStoreDecision::Applied);
+ assert_eq!(
+ receipt.verification_status,
+ RadrootsEventVerificationStatus::IdMismatch
+ );
+ assert_eq!(
+ receipt.head_decision,
+ RadrootsEventHeadStoreDecision::NotProjectionEligible
+ );
+ assert!(!receipt.projection_eligible);
+ assert!(!stored.projection_eligible);
+ assert_eq!(head.event_id, original.id);
+ }
+
+ #[tokio::test]
+ async fn signature_invalid_addressable_events_do_not_update_heads() {
+ let store = RadrootsEventStore::open_memory().await.expect("open");
+ let original = signed_event(KIND_LISTING, 19, listing_tags("listing-2"), "{}");
+ store
+ .ingest_event(RadrootsEventIngest::new(original.clone(), 2_500))
+ .await
+ .expect("first");
+ let coordinate = head_coordinate_for_event(&original);
+ let mut invalid = signed_event(KIND_LISTING, 20, listing_tags("listing-2"), "{}");
+ tamper_signature(&mut invalid);
+
+ let receipt = store
+ .ingest_event(RadrootsEventIngest::new(invalid.clone(), 2_600))
+ .await
+ .expect("invalid");
+ let head = store
+ .event_head(&coordinate)
+ .await
+ .expect("head")
+ .expect("stored head");
+
+ assert_eq!(
+ receipt.verification_status,
+ RadrootsEventVerificationStatus::SignatureInvalid
+ );
+ assert_eq!(
+ receipt.head_decision,
+ RadrootsEventHeadStoreDecision::NotProjectionEligible
+ );
+ assert!(!receipt.projection_eligible);
+ assert_eq!(head.event_id, original.id);
+ }
+
+ #[tokio::test]
+ async fn verified_regular_events_remain_projection_eligible_without_head_selection() {
+ let store = RadrootsEventStore::open_memory().await.expect("open");
+ let event = signed_event(KIND_POST, 21, Vec::new(), "hello");
+
+ let receipt = store
+ .ingest_event(RadrootsEventIngest::new(event.clone(), 2_700))
+ .await
+ .expect("ingest");
+ let stored = store
+ .get_event(event.id.as_str())
+ .await
+ .expect("get")
+ .expect("stored");
+
+ assert_eq!(
+ receipt.verification_status,
+ RadrootsEventVerificationStatus::Verified
+ );
+ assert_eq!(
+ receipt.head_decision,
+ RadrootsEventHeadStoreDecision::NotHeadSelected
+ );
+ assert!(receipt.projection_eligible);
+ assert!(stored.projection_eligible);
+ }
+
+ #[tokio::test]
async fn tag_rows_preserve_order_and_contract_metadata() {
let store = RadrootsEventStore::open_memory().await.expect("open");
let event = signed_event(