tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit da6a212a025627f689a69e8e4a1c8a37b7d16e21
parent 67fc7598bdc72c50c1ba7db7ccef327918692b90
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 03:29:54 -0700

store-surreal: project reactions

Diffstat:
Mcrates/tangle_store_surreal/src/lib.rs | 466+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 458 insertions(+), 8 deletions(-)

diff --git a/crates/tangle_store_surreal/src/lib.rs b/crates/tangle_store_surreal/src/lib.rs @@ -7,8 +7,8 @@ use surrealdb::Surreal; use surrealdb::engine::local::{Db, Mem, RocksDb}; use tangle_nips::{ CommentEvent, DeletionTarget, ListingProjection, ListingProjectionEvaluation, - NIP99_DRAFT_LISTING_KIND, NIP99_PUBLIC_LISTING_KIND, evaluate_listing_projection, - parse_comment_event, parse_deletion_request, + NIP99_DRAFT_LISTING_KIND, NIP99_PUBLIC_LISTING_KIND, ReactionEvent, ReactionValue, + evaluate_listing_projection, parse_comment_event, parse_deletion_request, parse_reaction_event, }; use tangle_protocol::{AddressCoordinate, Event, EventId, Filter, UnixTimestamp, event_to_value}; use tangle_store::{StoreEventOutcome, StoredEvent}; @@ -786,6 +786,13 @@ pub enum CommentProjectionOutcome { } #[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ReactionProjectionOutcome { + NotReaction, + Ineligible, + Projected, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum HiddenEventOutcome { NotFound, Hidden, @@ -1517,8 +1524,12 @@ UPSERT type::record('deletion_marker', $marker_id) CONTENT { .check() .map_err(SurrealStoreError::from)?; if target_type == "event" { - self.mark_raw_event_deleted(&target_ref, event.unsigned().pubkey().as_str()) - .await?; + self.mark_raw_event_deleted( + &target_ref, + event.unsigned().pubkey().as_str(), + event.unsigned().created_at().as_u64(), + ) + .await?; } else { self.mark_address_deleted(&target_ref, event.unsigned().pubkey().as_str()) .await?; @@ -1926,6 +1937,93 @@ UPSERT type::record('comment_projection', $event_id) CONTENT { response.take(0).map_err(SurrealStoreError::from) } + pub async fn project_reaction( + &self, + event: &Event, + projected_at: UnixTimestamp, + ) -> Result<ReactionProjectionOutcome, SurrealStoreError> { + let reaction = match parse_reaction_event(event) { + Ok(Some(reaction)) => reaction, + Ok(None) => return Ok(ReactionProjectionOutcome::NotReaction), + Err(_) => return Ok(ReactionProjectionOutcome::Ineligible), + }; + let fields = reaction_projection_fields(&reaction, projected_at); + self.db + .query( + r#" +UPSERT type::record('reaction_projection', $event_id) CONTENT { + reaction_id: $reaction_id, + event_id: $event_id, + pubkey: $pubkey, + created_at: $created_at, + content: $content, + value_type: $value_type, + value: $value, + target_event_id: $target_event_id, + target_pubkey: $target_pubkey, + target_address: $target_address, + target_kind: $target_kind, + hidden: false, + deleted: false, + projected_at: $projected_at +}; +"#, + ) + .bind(("event_id", event.id().as_str())) + .bind(("reaction_id", fields.reaction_id)) + .bind(("pubkey", fields.pubkey)) + .bind(("created_at", fields.created_at)) + .bind(("content", fields.content)) + .bind(("value_type", fields.value_type)) + .bind(("value", fields.value)) + .bind(("target_event_id", fields.target_event_id.as_str())) + .bind(("target_pubkey", fields.target_pubkey)) + .bind(("target_address", fields.target_address)) + .bind(("target_kind", fields.target_kind.as_deref())) + .bind(("projected_at", fields.projected_at)) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + self.recompute_reaction_count( + &fields.target_event_id, + fields.target_kind, + projected_at.as_u64(), + ) + .await?; + Ok(ReactionProjectionOutcome::Projected) + } + + pub async fn reaction_projection_row( + &self, + event_id: &EventId, + ) -> Result<Option<serde_json::Value>, SurrealStoreError> { + let mut response = self + .db + .query("SELECT * FROM ONLY type::record('reaction_projection', $event_id);") + .bind(("event_id", event_id.as_str())) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + response.take(0).map_err(SurrealStoreError::from) + } + + pub async fn reaction_count_row( + &self, + target_event_id: &EventId, + ) -> Result<Option<serde_json::Value>, SurrealStoreError> { + let mut response = self + .db + .query("SELECT * FROM ONLY type::record('reaction_count', $target_event_id);") + .bind(("target_event_id", target_event_id.as_str())) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + response.take(0).map_err(SurrealStoreError::from) + } + pub async fn project_listing_helpers( &self, event: &Event, @@ -2222,6 +2320,7 @@ UPDATE nostr_event SET hidden = true WHERE event_id = $event_id; UPDATE event_current SET hidden = true WHERE event_id = $event_id; UPDATE listing_current SET hidden = true WHERE event_id = $event_id; UPDATE comment_projection SET hidden = true WHERE event_id = $event_id; +UPDATE reaction_projection SET hidden = true WHERE event_id = $event_id; UPDATE search_doc SET visible = false WHERE event_id = $event_id OR current_event_id = $event_id; "#, ) @@ -2238,6 +2337,8 @@ UPDATE search_doc SET visible = false WHERE event_id = $event_id OR current_even .map_err(SurrealStoreError::from)? .check() .map_err(SurrealStoreError::from)?; + self.refresh_reaction_count_for_event(event_id.as_str(), created_at.as_u64()) + .await?; Ok(HiddenEventOutcome::Hidden) } @@ -2270,6 +2371,7 @@ UPDATE nostr_event SET hidden = false WHERE event_id = $event_id; UPDATE event_current SET hidden = false WHERE event_id = $event_id; UPDATE listing_current SET hidden = false WHERE event_id = $event_id; UPDATE comment_projection SET hidden = false WHERE event_id = $event_id; +UPDATE reaction_projection SET hidden = false WHERE event_id = $event_id; UPDATE search_doc SET visible = true WHERE (event_id = $event_id OR current_event_id = $event_id) AND status = "active"; UPDATE search_doc SET visible = false WHERE (event_id = $event_id OR current_event_id = $event_id) AND status != "active"; "#, @@ -2291,6 +2393,8 @@ UPDATE search_doc SET visible = false WHERE (event_id = $event_id OR current_eve .map_err(SurrealStoreError::from)? .check() .map_err(SurrealStoreError::from)?; + self.refresh_reaction_count_for_event(event_id.as_str(), created_at.as_u64()) + .await?; Ok(HiddenEventOutcome::Unhidden) } @@ -2607,6 +2711,90 @@ UPSERT type::record('relay_user', $pubkey) CONTENT { Ok(()) } + async fn refresh_reaction_count_for_event( + &self, + event_id: &str, + updated_at: u64, + ) -> Result<(), SurrealStoreError> { + let mut response = self + .db + .query("SELECT * FROM ONLY type::record('reaction_projection', $event_id);") + .bind(("event_id", event_id)) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + let row: Option<serde_json::Value> = response.take(0).map_err(SurrealStoreError::from)?; + let Some(row) = row else { + return Ok(()); + }; + let target_event_id = string_row_field(&row, "target_event_id")?; + let target_kind = optional_string_row_field(&row, "target_kind")?; + self.recompute_reaction_count(&target_event_id, target_kind, updated_at) + .await + } + + async fn recompute_reaction_count( + &self, + target_event_id: &str, + target_kind: Option<String>, + updated_at: u64, + ) -> Result<(), SurrealStoreError> { + let mut response = self + .db + .query( + "SELECT value_type FROM reaction_projection WHERE target_event_id = $target_event_id AND hidden = false AND deleted = false;", + ) + .bind(("target_event_id", target_event_id)) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + let rows: Vec<serde_json::Value> = response.take(0).map_err(SurrealStoreError::from)?; + let mut like_count = 0_i64; + let mut dislike_count = 0_i64; + let mut emoji_count = 0_i64; + let mut text_count = 0_i64; + for row in rows { + match string_row_field(&row, "value_type")?.as_str() { + "like" => like_count += 1, + "dislike" => dislike_count += 1, + "emoji" => emoji_count += 1, + "text" => text_count += 1, + _ => {} + } + } + let total_count = like_count + dislike_count + emoji_count + text_count; + self.db + .query( + r#" +UPSERT type::record('reaction_count', $target_event_id) CONTENT { + target_event_id: $target_event_id, + target_kind: $target_kind, + like_count: $like_count, + dislike_count: $dislike_count, + emoji_count: $emoji_count, + text_count: $text_count, + total_count: $total_count, + updated_at: $updated_at +}; +"#, + ) + .bind(("target_event_id", target_event_id)) + .bind(("target_kind", target_kind.as_deref())) + .bind(("like_count", like_count)) + .bind(("dislike_count", dislike_count)) + .bind(("emoji_count", emoji_count)) + .bind(("text_count", text_count)) + .bind(("total_count", total_count)) + .bind(("updated_at", updated_at)) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + Ok(()) + } + async fn query_single_indexed_tag_event_ids( &self, tag: &str, @@ -2698,12 +2886,14 @@ UPSERT type::record('relay_user', $pubkey) CONTENT { &self, event_id: &str, author_pubkey: &str, + deleted_at: u64, ) -> Result<(), SurrealStoreError> { self.db .query( r#" UPDATE nostr_event SET deleted = true WHERE event_id = $event_id AND pubkey = $author_pubkey; UPDATE comment_projection SET deleted = true WHERE event_id = $event_id AND pubkey = $author_pubkey; +UPDATE reaction_projection SET deleted = true WHERE event_id = $event_id AND pubkey = $author_pubkey; "#, ) .bind(("event_id", event_id)) @@ -2712,6 +2902,8 @@ UPDATE comment_projection SET deleted = true WHERE event_id = $event_id AND pubk .map_err(SurrealStoreError::from)? .check() .map_err(SurrealStoreError::from)?; + self.refresh_reaction_count_for_event(event_id, deleted_at) + .await?; Ok(()) } @@ -2780,6 +2972,27 @@ fn required_policy_text(value: &str, field: &str) -> Result<String, SurrealStore Ok(value.to_owned()) } +fn string_row_field(row: &serde_json::Value, field: &str) -> Result<String, SurrealStoreError> { + row.get(field) + .and_then(serde_json::Value::as_str) + .map(str::to_owned) + .ok_or_else(|| SurrealStoreError::new(&format!("{field} row field must be a string"))) +} + +fn optional_string_row_field( + row: &serde_json::Value, + field: &str, +) -> Result<Option<String>, SurrealStoreError> { + match row.get(field) { + Some(value) if value.is_null() => Ok(None), + Some(value) => value + .as_str() + .map(|value| Some(value.to_owned())) + .ok_or_else(|| SurrealStoreError::new(&format!("{field} row field must be a string"))), + None => Ok(None), + } +} + fn moderation_action_id( action: &str, target_ref: &str, @@ -2944,6 +3157,20 @@ struct CommentProjectionFields { projected_at: u64, } +struct ReactionProjectionFields { + reaction_id: String, + pubkey: String, + created_at: u64, + content: String, + value_type: String, + value: String, + target_event_id: String, + target_pubkey: Option<String>, + target_address: Option<String>, + target_kind: Option<String>, + projected_at: u64, +} + struct ListingCurrentFields { listing_key: String, listing_key_hash: String, @@ -3012,6 +3239,37 @@ fn comment_projection_fields( } } +fn reaction_projection_fields( + reaction: &ReactionEvent, + projected_at: UnixTimestamp, +) -> ReactionProjectionFields { + ReactionProjectionFields { + reaction_id: reaction.event_id().as_str().to_owned(), + pubkey: reaction.pubkey().as_str().to_owned(), + created_at: reaction.created_at().as_u64(), + content: reaction.content().to_owned(), + value_type: reaction.value().canonical().to_owned(), + value: reaction_value_string(reaction.value()), + target_event_id: reaction.target_event_id().as_str().to_owned(), + target_pubkey: reaction + .target_pubkey() + .map(|pubkey| pubkey.as_str().to_owned()), + target_address: reaction + .target_address() + .map(|address| address.key().to_string()), + target_kind: reaction.target_kind().map(str::to_owned), + projected_at: projected_at.as_u64(), + } +} + +fn reaction_value_string(value: &ReactionValue) -> String { + match value { + ReactionValue::Like => "like".to_owned(), + ReactionValue::Dislike => "dislike".to_owned(), + ReactionValue::Emoji(value) | ReactionValue::Text(value) => value.clone(), + } +} + struct SearchDocumentFields { doc_key: String, address_key: Option<String>, @@ -3240,10 +3498,10 @@ mod tests { CommentProjectionOutcome, CommentProjectionQuery, CurrentEventOutcome, DeletionMarkerOutcome, DurableRateLimitDecision, HiddenEventOutcome, ListingCurrentOutcome, ListingHelperOutcome, ListingProjectionQuery, ListingRevisionOutcome, - MigrationApplyOutcome, SearchDocumentOutcome, SearchDocumentQuery, SurrealConfigError, - SurrealConnectionConfig, SurrealConnectionMode, SurrealMigration, SurrealMigrationError, - SurrealMigrationPlan, SurrealStore, SurrealStoreError, base_migration_plan, - migration_tracking_schema, + MigrationApplyOutcome, ReactionProjectionOutcome, SearchDocumentOutcome, + SearchDocumentQuery, SurrealConfigError, SurrealConnectionConfig, SurrealConnectionMode, + SurrealMigration, SurrealMigrationError, SurrealMigrationPlan, SurrealStore, + SurrealStoreError, base_migration_plan, migration_tracking_schema, }; use tangle_nips::ListingProjectionEvaluation; use tangle_protocol::{ @@ -5419,6 +5677,173 @@ mod tests { } #[tokio::test] + async fn project_reactions_persists_rows_and_aggregate_counts() { + let store = memory_store().await; + store + .apply_plan(&base_migration_plan()) + .await + .expect("apply plan"); + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let like = listing_reaction(&listing, 1_714_125_030, "+"); + let dislike = listing_reaction(&listing, 1_714_125_031, "-"); + let emoji = listing_reaction(&listing, 1_714_125_032, "⭐"); + let invalid = + build_fixture_event_from_parts(FixtureKey::Buyer, 1_714_125_029, 7, Vec::new(), "+") + .expect("invalid reaction"); + + assert_eq!( + store + .project_reaction(&listing, UnixTimestamp::new(1_714_125_033)) + .await + .expect("not reaction"), + ReactionProjectionOutcome::NotReaction + ); + assert_eq!( + store + .project_reaction(&invalid, UnixTimestamp::new(1_714_125_033)) + .await + .expect("invalid reaction"), + ReactionProjectionOutcome::Ineligible + ); + for reaction in [&like, &dislike, &emoji] { + assert_eq!( + store + .project_reaction(reaction, UnixTimestamp::new(1_714_125_033)) + .await + .expect("project reaction"), + ReactionProjectionOutcome::Projected + ); + } + + let row = store + .reaction_projection_row(like.id()) + .await + .expect("reaction row") + .expect("reaction row exists"); + assert_eq!(row["reaction_id"], like.id().as_str()); + assert_eq!(row["event_id"], like.id().as_str()); + assert_eq!(row["pubkey"], FixtureKey::Buyer.public_key().as_str()); + assert_eq!(row["content"], "+"); + assert_eq!(row["value_type"], "like"); + assert_eq!(row["value"], "like"); + assert_eq!(row["target_event_id"], listing.id().as_str()); + assert_eq!(row["target_pubkey"], listing.unsigned().pubkey().as_str()); + assert_eq!( + row["target_address"], + format!("30402:{}:listing-a", listing.unsigned().pubkey().as_str()) + ); + assert_eq!(row["target_kind"], "30402"); + assert_eq!(row["hidden"], false); + assert_eq!(row["deleted"], false); + + let count = store + .reaction_count_row(listing.id()) + .await + .expect("count row") + .expect("count row exists"); + assert_eq!(count["target_event_id"], listing.id().as_str()); + assert_eq!(count["target_kind"], "30402"); + assert_eq!(count["like_count"], 1_i64); + assert_eq!(count["dislike_count"], 1_i64); + assert_eq!(count["emoji_count"], 1_i64); + assert_eq!(count["text_count"], 0_i64); + assert_eq!(count["total_count"], 3_i64); + } + + #[tokio::test] + async fn reaction_counts_track_hidden_restored_and_deleted_reactions() { + let store = memory_store().await; + store + .apply_plan(&base_migration_plan()) + .await + .expect("apply plan"); + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let reaction = listing_reaction(&listing, 1_714_125_040, "+"); + let admin_pubkey = "a".repeat(PublicKeyHex::HEX_LENGTH); + store + .store_raw_event(&StoredEvent::new( + reaction.clone(), + UnixTimestamp::new(1_714_125_041), + )) + .await + .expect("raw reaction"); + store + .project_reaction(&reaction, UnixTimestamp::new(1_714_125_042)) + .await + .expect("project reaction"); + + assert_eq!( + store + .reaction_count_row(listing.id()) + .await + .expect("count") + .expect("count row")["like_count"], + 1_i64 + ); + store + .hide_event( + reaction.id(), + "reaction moderation", + "admin_api", + &admin_pubkey, + UnixTimestamp::new(1_714_125_043), + ) + .await + .expect("hide reaction"); + assert_eq!( + store + .reaction_count_row(listing.id()) + .await + .expect("count") + .expect("count row")["total_count"], + 0_i64 + ); + store + .unhide_event( + reaction.id(), + "reaction restored", + &admin_pubkey, + UnixTimestamp::new(1_714_125_044), + ) + .await + .expect("unhide reaction"); + assert_eq!( + store + .reaction_count_row(listing.id()) + .await + .expect("count") + .expect("count row")["total_count"], + 1_i64 + ); + let deletion = build_fixture_event_from_parts( + FixtureKey::Buyer, + 1_714_125_045, + 5, + vec![vec!["e".to_owned(), reaction.id().as_str().to_owned()]], + "", + ) + .expect("deletion event"); + store + .apply_deletion_markers(&deletion) + .await + .expect("delete reaction"); + let row = store + .reaction_projection_row(reaction.id()) + .await + .expect("reaction row") + .expect("reaction row exists"); + assert_eq!(row["deleted"], true); + assert_eq!( + store + .reaction_count_row(listing.id()) + .await + .expect("count") + .expect("count row")["total_count"], + 0_i64 + ); + } + + #[tokio::test] async fn hidden_event_overlay_excludes_events_from_public_read_models() { let store = memory_store().await; store @@ -5981,6 +6406,31 @@ mod tests { .expect("comment event") } + fn listing_reaction(listing: &Event, created_at: u64, content: &str) -> Event { + let listing_key = format!("30402:{}:listing-a", listing.unsigned().pubkey().as_str()); + build_fixture_event_from_parts( + FixtureKey::Buyer, + created_at, + 7, + vec![ + vec![ + "e".to_owned(), + listing.id().as_str().to_owned(), + "wss://relay.radroots.test".to_owned(), + listing.unsigned().pubkey().as_str().to_owned(), + ], + vec![ + "p".to_owned(), + listing.unsigned().pubkey().as_str().to_owned(), + ], + vec!["a".to_owned(), listing_key], + vec!["k".to_owned(), "30402".to_owned()], + ], + content, + ) + .expect("reaction event") + } + fn synthetic_event( id_digit: &str, sig_digit: &str,