tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit a98d7814e294d2cf5a6e57c5912ae57ec6b4f785
parent 7afebc5970d67d7bce9ac25d0006927b056ee2ed
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 03:48:41 -0700

store-surreal: project forum threads

Diffstat:
Mcrates/tangle_store_surreal/src/lib.rs | 599+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 591 insertions(+), 8 deletions(-)

diff --git a/crates/tangle_store_surreal/src/lib.rs b/crates/tangle_store_surreal/src/lib.rs @@ -6,10 +6,10 @@ use std::collections::BTreeSet; use surrealdb::Surreal; use surrealdb::engine::local::{Db, Mem, RocksDb}; use tangle_nips::{ - CommentEvent, DeletionTarget, ListingProjection, ListingProjectionEvaluation, LongFormEvent, - LongFormKind, NIP99_DRAFT_LISTING_KIND, NIP99_PUBLIC_LISTING_KIND, ReactionEvent, - ReactionValue, evaluate_listing_projection, parse_comment_event, parse_deletion_request, - parse_long_form_event, parse_reaction_event, + CommentEvent, DeletionTarget, ForumThreadEvent, ListingProjection, ListingProjectionEvaluation, + LongFormEvent, LongFormKind, NIP99_DRAFT_LISTING_KIND, NIP99_PUBLIC_LISTING_KIND, + ReactionEvent, ReactionValue, evaluate_listing_projection, parse_comment_event, + parse_deletion_request, parse_forum_thread_event, parse_long_form_event, parse_reaction_event, }; use tangle_protocol::{AddressCoordinate, Event, EventId, Filter, UnixTimestamp, event_to_value}; use tangle_store::{StoreEventOutcome, StoredEvent}; @@ -882,6 +882,13 @@ pub enum LongFormProjectionOutcome { } #[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ForumThreadProjectionOutcome { + NotForumThread, + Ineligible, + Projected, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum HiddenEventOutcome { NotFound, Hidden, @@ -1099,6 +1106,34 @@ impl LongFormProjectionQuery { } } +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct ForumThreadProjectionQuery { + pubkey: Option<String>, + topic: Option<String>, + limit: Option<u64>, +} + +impl ForumThreadProjectionQuery { + pub fn new() -> Self { + Self::default() + } + + pub fn with_pubkey(mut self, pubkey: &str) -> Self { + self.pubkey = Some(pubkey.to_owned()); + self + } + + pub fn with_topic(mut self, topic: &str) -> Self { + self.topic = Some(topic.to_owned()); + self + } + + pub fn with_limit(mut self, limit: u64) -> Self { + self.limit = Some(limit); + self + } +} + #[derive(Clone)] pub struct SurrealStore { db: Surreal<Db>, @@ -2324,6 +2359,169 @@ UPSERT type::record('search_doc', $long_form_key) CONTENT { response.take(0).map_err(SurrealStoreError::from) } + pub async fn project_forum_thread( + &self, + event: &Event, + projected_at: UnixTimestamp, + ) -> Result<ForumThreadProjectionOutcome, SurrealStoreError> { + let thread = match parse_forum_thread_event(event) { + Ok(Some(thread)) => thread, + Ok(None) => return Ok(ForumThreadProjectionOutcome::NotForumThread), + Err(_) => return Ok(ForumThreadProjectionOutcome::Ineligible), + }; + let fields = forum_thread_projection_fields(&thread, projected_at); + self.db + .query( + r#" +UPSERT type::record('forum_thread_projection', $thread_id) CONTENT { + thread_id: $thread_id, + event_id: $event_id, + pubkey: $pubkey, + created_at: $created_at, + updated_at: $updated_at, + title: $title, + content: $content, + tags: $tags, + referenced_events: $referenced_events, + referenced_pubkeys: $referenced_pubkeys, + hidden: false, + deleted: false, + projected_at: $projected_at +}; +UPSERT type::record('search_doc', $thread_id) CONTENT { + doc_key: $thread_id, + event_id: $event_id, + current_event_id: $event_id, + doc_type: "forum_thread", + kind: $kind, + pubkey: $pubkey, + address_key: NONE, + title: $search_title, + summary: $title, + body: $content, + category_text: $category_text, + location_text: NONE, + tags: $tags, + categories: [], + created_at: $created_at, + updated_at: $updated_at, + visible: true, + status: "open", + seller_trust_score: NONE +}; +"#, + ) + .bind(("thread_id", fields.thread_id.as_str())) + .bind(("event_id", fields.event_id.as_str())) + .bind(("pubkey", fields.pubkey.as_str())) + .bind(("created_at", fields.created_at)) + .bind(("updated_at", fields.updated_at)) + .bind(("title", fields.title.as_deref())) + .bind(("content", fields.content.as_str())) + .bind(("tags", fields.tags.clone())) + .bind(("referenced_events", fields.referenced_events.clone())) + .bind(("referenced_pubkeys", fields.referenced_pubkeys.clone())) + .bind(("projected_at", fields.projected_at)) + .bind(("kind", fields.kind)) + .bind(("search_title", fields.search_title.as_str())) + .bind(("category_text", fields.tags.join(" "))) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + self.replace_forum_thread_topic_rows(&fields).await?; + Ok(ForumThreadProjectionOutcome::Projected) + } + + pub async fn forum_thread_row( + &self, + thread_id: &EventId, + ) -> Result<Option<serde_json::Value>, SurrealStoreError> { + let mut response = self + .db + .query("SELECT * FROM ONLY type::record('forum_thread_projection', $thread_id);") + .bind(("thread_id", thread_id.as_str())) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + response.take(0).map_err(SurrealStoreError::from) + } + + pub async fn forum_thread_topic_rows( + &self, + thread_id: &EventId, + ) -> Result<Vec<serde_json::Value>, SurrealStoreError> { + let mut response = self + .db + .query( + "SELECT * FROM forum_thread_topic WHERE thread_id = $thread_id ORDER BY topic ASC;", + ) + .bind(("thread_id", thread_id.as_str())) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + response.take(0).map_err(SurrealStoreError::from) + } + + pub async fn query_forum_threads( + &self, + query: &ForumThreadProjectionQuery, + ) -> Result<Vec<serde_json::Value>, SurrealStoreError> { + let topic_thread_ids = match query.topic.as_deref() { + Some(topic) => { + let normalized = topic.trim().to_ascii_lowercase(); + let mut response = self + .db + .query( + "SELECT VALUE thread_id FROM forum_thread_topic WHERE topic = $topic AND hidden = false AND deleted = false ORDER BY updated_at DESC, event_id ASC;", + ) + .bind(("topic", normalized.as_str())) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + let ids: Vec<String> = response.take(0).map_err(SurrealStoreError::from)?; + if ids.is_empty() { + return Ok(Vec::new()); + } + Some(ids) + } + None => None, + }; + let mut statement = + "SELECT * FROM forum_thread_projection WHERE hidden = false AND deleted = false" + .to_owned(); + if query.pubkey.is_some() { + statement.push_str(" AND pubkey = $pubkey"); + } + if topic_thread_ids.is_some() { + statement.push_str(" AND thread_id IN $topic_thread_ids"); + } + statement.push_str(" ORDER BY updated_at DESC, event_id ASC"); + if query.limit.is_some() { + statement.push_str(" LIMIT $limit"); + } + statement.push(';'); + let mut surreal_query = self.db.query(statement); + if let Some(value) = &query.pubkey { + surreal_query = surreal_query.bind(("pubkey", value.as_str())); + } + if let Some(ids) = topic_thread_ids { + surreal_query = surreal_query.bind(("topic_thread_ids", ids)); + } + if let Some(value) = query.limit { + surreal_query = surreal_query.bind(("limit", value)); + } + let mut response = surreal_query + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + response.take(0).map_err(SurrealStoreError::from) + } + pub async fn project_listing_helpers( &self, event: &Event, @@ -2623,6 +2821,8 @@ UPDATE comment_projection SET hidden = true WHERE event_id = $event_id; UPDATE reaction_projection SET hidden = true WHERE event_id = $event_id; UPDATE long_form_current SET hidden = true WHERE event_id = $event_id; UPDATE long_form_topic SET hidden = true WHERE event_id = $event_id; +UPDATE forum_thread_projection SET hidden = true WHERE event_id = $event_id; +UPDATE forum_thread_topic SET hidden = true WHERE event_id = $event_id; UPDATE search_doc SET visible = false WHERE event_id = $event_id OR current_event_id = $event_id; "#, ) @@ -2676,8 +2876,10 @@ UPDATE comment_projection SET hidden = false WHERE event_id = $event_id; UPDATE reaction_projection SET hidden = false WHERE event_id = $event_id; UPDATE long_form_current SET hidden = false WHERE event_id = $event_id; UPDATE long_form_topic SET hidden = false WHERE event_id = $event_id; -UPDATE search_doc SET visible = true WHERE (event_id = $event_id OR current_event_id = $event_id) AND (status = "active" OR status = "published"); -UPDATE search_doc SET visible = false WHERE (event_id = $event_id OR current_event_id = $event_id) AND status != "active" AND status != "published"; +UPDATE forum_thread_projection SET hidden = false WHERE event_id = $event_id; +UPDATE forum_thread_topic SET hidden = false WHERE event_id = $event_id; +UPDATE search_doc SET visible = true WHERE (event_id = $event_id OR current_event_id = $event_id) AND (status = "active" OR status = "published" OR status = "open"); +UPDATE search_doc SET visible = false WHERE (event_id = $event_id OR current_event_id = $event_id) AND status != "active" AND status != "published" AND status != "open"; "#, ) .bind(("event_id", event_id.as_str())) @@ -2985,6 +3187,43 @@ CREATE long_form_topic CONTENT { Ok(()) } + async fn replace_forum_thread_topic_rows( + &self, + fields: &ForumThreadProjectionFields, + ) -> Result<(), SurrealStoreError> { + self.db + .query("DELETE forum_thread_topic WHERE thread_id = $thread_id;") + .bind(("thread_id", fields.thread_id.as_str())) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + for topic in &fields.tags { + self.db + .query( + r#" +CREATE forum_thread_topic CONTENT { + thread_id: $thread_id, + topic: $topic, + updated_at: $updated_at, + event_id: $event_id, + hidden: false, + deleted: false +}; +"#, + ) + .bind(("thread_id", fields.thread_id.as_str())) + .bind(("topic", topic.as_str())) + .bind(("updated_at", fields.updated_at)) + .bind(("event_id", fields.event_id.as_str())) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + } + Ok(()) + } + async fn upsert_rate_limit_state( &self, key: &str, @@ -3237,6 +3476,8 @@ UPDATE comment_projection SET deleted = true WHERE event_id = $event_id AND pubk UPDATE reaction_projection SET deleted = true WHERE event_id = $event_id AND pubkey = $author_pubkey; UPDATE long_form_current SET deleted = true WHERE event_id = $event_id AND author_pubkey = $author_pubkey; UPDATE long_form_topic SET deleted = true WHERE event_id = $event_id; +UPDATE forum_thread_projection SET deleted = true WHERE event_id = $event_id AND pubkey = $author_pubkey; +UPDATE forum_thread_topic SET deleted = true WHERE event_id = $event_id; UPDATE search_doc SET visible = false WHERE event_id = $event_id OR current_event_id = $event_id; "#, ) @@ -3541,6 +3782,22 @@ struct LongFormProjectionFields { search_title: String, } +struct ForumThreadProjectionFields { + thread_id: String, + event_id: String, + pubkey: String, + created_at: u64, + updated_at: u64, + title: Option<String>, + content: String, + tags: Vec<String>, + referenced_events: Vec<String>, + referenced_pubkeys: Vec<String>, + projected_at: u64, + kind: u32, + search_title: String, +} + struct ListingCurrentFields { listing_key: String, listing_key_hash: String, @@ -3688,6 +3945,46 @@ fn long_form_current_should_replace(article: &LongFormEvent, row: &serde_json::V && article.event_id().as_str() > existing_event_id) } +fn forum_thread_projection_fields( + thread: &ForumThreadEvent, + projected_at: UnixTimestamp, +) -> ForumThreadProjectionFields { + ForumThreadProjectionFields { + thread_id: thread.event_id().as_str().to_owned(), + event_id: thread.event_id().as_str().to_owned(), + pubkey: thread.pubkey().as_str().to_owned(), + created_at: thread.created_at().as_u64(), + updated_at: thread.created_at().as_u64(), + title: thread.title().map(str::to_owned), + content: thread.content().to_owned(), + tags: thread.topics().to_vec(), + referenced_events: thread + .referenced_events() + .iter() + .map(|event_id| event_id.as_str().to_owned()) + .collect(), + referenced_pubkeys: thread + .referenced_pubkeys() + .iter() + .map(|pubkey| pubkey.as_str().to_owned()) + .collect(), + projected_at: projected_at.as_u64(), + kind: 11, + search_title: thread + .title() + .map(str::to_owned) + .unwrap_or_else(|| fallback_thread_title(thread)), + } +} + +fn fallback_thread_title(thread: &ForumThreadEvent) -> String { + let fallback = thread.content().chars().take(80).collect::<String>(); + if fallback.is_empty() { + return thread.event_id().as_str().to_owned(); + } + fallback +} + struct SearchDocumentFields { doc_key: String, address_key: Option<String>, @@ -3914,7 +4211,8 @@ impl From<surrealdb::Error> for SurrealStoreError { mod tests { use super::{ CommentProjectionOutcome, CommentProjectionQuery, CurrentEventOutcome, - DeletionMarkerOutcome, DurableRateLimitDecision, HiddenEventOutcome, ListingCurrentOutcome, + DeletionMarkerOutcome, DurableRateLimitDecision, ForumThreadProjectionOutcome, + ForumThreadProjectionQuery, HiddenEventOutcome, ListingCurrentOutcome, ListingHelperOutcome, ListingProjectionQuery, ListingRevisionOutcome, LongFormProjectionOutcome, LongFormProjectionQuery, MigrationApplyOutcome, ReactionProjectionOutcome, SearchDocumentOutcome, SearchDocumentQuery, SurrealConfigError, @@ -3923,7 +4221,8 @@ mod tests { migration_tracking_schema, }; use tangle_nips::{ - ListingProjectionEvaluation, NIP23_LONG_FORM_DRAFT_KIND, NIP23_LONG_FORM_KIND, + ListingProjectionEvaluation, NIP7D_THREAD_KIND, NIP23_LONG_FORM_DRAFT_KIND, + NIP23_LONG_FORM_KIND, }; use tangle_protocol::{ Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent, @@ -6695,6 +6994,264 @@ mod tests { } #[tokio::test] + async fn project_forum_threads_persists_projection_topic_and_search_rows() { + let store = memory_store().await; + store + .apply_plan(&base_migration_plan()) + .await + .expect("apply plan"); + let thread = forum_thread(1_714_125_080, Some("Market day thread"), &["market", "CSA"]); + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let invalid = build_fixture_event_from_parts( + FixtureKey::Buyer, + 1_714_125_079, + u64::from(NIP7D_THREAD_KIND), + vec![vec!["p".to_owned(), "bad".to_owned()]], + "Invalid thread.", + ) + .expect("invalid thread"); + + assert_eq!( + store + .project_forum_thread(&listing, UnixTimestamp::new(1_714_125_081)) + .await + .expect("not forum"), + ForumThreadProjectionOutcome::NotForumThread + ); + assert_eq!( + store + .project_forum_thread(&invalid, UnixTimestamp::new(1_714_125_081)) + .await + .expect("invalid forum"), + ForumThreadProjectionOutcome::Ineligible + ); + assert_eq!( + store + .project_forum_thread(&thread, UnixTimestamp::new(1_714_125_082)) + .await + .expect("project thread"), + ForumThreadProjectionOutcome::Projected + ); + + let row = store + .forum_thread_row(thread.id()) + .await + .expect("thread row") + .expect("thread row exists"); + assert_eq!(row["thread_id"], thread.id().as_str()); + assert_eq!(row["event_id"], thread.id().as_str()); + assert_eq!(row["pubkey"], FixtureKey::Buyer.public_key().as_str()); + assert_eq!(row["created_at"], 1_714_125_080_u64); + assert_eq!(row["updated_at"], 1_714_125_080_u64); + assert_eq!(row["title"], "Market day thread"); + assert_eq!(row["content"], "What is everyone bringing this weekend?"); + assert_eq!(row["tags"][0], "csa"); + assert_eq!(row["tags"][1], "market"); + assert_eq!(row["referenced_events"][0], "5".repeat(EventId::HEX_LENGTH)); + assert_eq!( + row["referenced_pubkeys"][0], + FixtureKey::Seller.public_key().as_str() + ); + assert_eq!(row["hidden"], false); + assert_eq!(row["deleted"], false); + + let topics = store + .forum_thread_topic_rows(thread.id()) + .await + .expect("topic rows"); + assert_eq!(topics.len(), 2); + assert_eq!(topics[0]["topic"], "csa"); + assert_eq!(topics[1]["topic"], "market"); + assert_eq!( + store + .query_forum_threads( + &ForumThreadProjectionQuery::new() + .with_pubkey(FixtureKey::Buyer.public_key().as_str()) + .with_topic("Market") + .with_limit(5), + ) + .await + .expect("forum query") + .len(), + 1 + ); + + let search = store + .search_document_row(thread.id().as_str()) + .await + .expect("search row") + .expect("search row exists"); + assert_eq!(search["doc_type"], "forum_thread"); + assert_eq!(search["kind"], u64::from(NIP7D_THREAD_KIND)); + assert_eq!(search["title"], "Market day thread"); + assert_eq!(search["body"], "What is everyone bringing this weekend?"); + assert_eq!(search["status"], "open"); + assert_eq!(search["visible"], true); + assert_eq!( + store + .query_search_documents( + &SearchDocumentQuery::new() + .with_text("bringing") + .with_doc_type("forum_thread") + .with_visible(true), + ) + .await + .expect("search query") + .len(), + 1 + ); + } + + #[tokio::test] + async fn forum_thread_projection_tracks_moderation_and_deletion() { + let store = memory_store().await; + store + .apply_plan(&base_migration_plan()) + .await + .expect("apply plan"); + let thread = forum_thread(1_714_125_090, None, &["market"]); + let admin_pubkey = "a".repeat(PublicKeyHex::HEX_LENGTH); + store + .store_raw_event(&StoredEvent::new( + thread.clone(), + UnixTimestamp::new(1_714_125_091), + )) + .await + .expect("raw thread"); + store + .project_forum_thread(&thread, UnixTimestamp::new(1_714_125_092)) + .await + .expect("project thread"); + + assert_eq!( + store + .search_document_row(thread.id().as_str()) + .await + .expect("search") + .expect("search row")["title"], + "What is everyone bringing this weekend?" + ); + assert_eq!( + store + .hide_event( + thread.id(), + "forum moderation", + "admin_api", + &admin_pubkey, + UnixTimestamp::new(1_714_125_093), + ) + .await + .expect("hide thread"), + HiddenEventOutcome::Hidden + ); + assert!( + store + .query_forum_threads(&ForumThreadProjectionQuery::new()) + .await + .expect("hidden query") + .is_empty() + ); + assert_eq!( + store + .forum_thread_row(thread.id()) + .await + .expect("row") + .expect("row exists")["hidden"], + true + ); + assert_eq!( + store + .forum_thread_topic_rows(thread.id()) + .await + .expect("topics")[0]["hidden"], + true + ); + assert_eq!( + store + .search_document_row(thread.id().as_str()) + .await + .expect("search") + .expect("search row")["visible"], + false + ); + + assert_eq!( + store + .unhide_event( + thread.id(), + "forum restored", + &admin_pubkey, + UnixTimestamp::new(1_714_125_094), + ) + .await + .expect("unhide thread"), + HiddenEventOutcome::Unhidden + ); + assert_eq!( + store + .query_forum_threads(&ForumThreadProjectionQuery::new()) + .await + .expect("visible query") + .len(), + 1 + ); + assert_eq!( + store + .search_document_row(thread.id().as_str()) + .await + .expect("search") + .expect("search row")["visible"], + true + ); + + let deletion = build_fixture_event_from_parts( + FixtureKey::Buyer, + 1_714_125_095, + 5, + vec![vec!["e".to_owned(), thread.id().as_str().to_owned()]], + "", + ) + .expect("deletion event"); + assert_eq!( + store + .apply_deletion_markers(&deletion) + .await + .expect("delete thread"), + DeletionMarkerOutcome::Applied { targets: 1 } + ); + assert!( + store + .query_forum_threads(&ForumThreadProjectionQuery::new()) + .await + .expect("deleted query") + .is_empty() + ); + assert_eq!( + store + .forum_thread_row(thread.id()) + .await + .expect("row") + .expect("row exists")["deleted"], + true + ); + assert_eq!( + store + .forum_thread_topic_rows(thread.id()) + .await + .expect("topics")[0]["deleted"], + true + ); + assert_eq!( + store + .search_document_row(thread.id().as_str()) + .await + .expect("search") + .expect("search row")["visible"], + false + ); + } + + #[tokio::test] async fn hidden_event_overlay_excludes_events_from_public_read_models() { let store = memory_store().await; store @@ -7322,6 +7879,32 @@ mod tests { .expect("long-form article") } + fn forum_thread(created_at: u64, title: Option<&str>, topics: &[&str]) -> Event { + let mut tags = vec![ + vec!["e".to_owned(), "5".repeat(EventId::HEX_LENGTH)], + vec![ + "p".to_owned(), + FixtureKey::Seller.public_key().as_str().to_owned(), + ], + ]; + if let Some(title) = title { + tags.push(vec!["title".to_owned(), title.to_owned()]); + } + tags.extend( + topics + .iter() + .map(|topic| vec!["t".to_owned(), (*topic).to_owned()]), + ); + build_fixture_event_from_parts( + FixtureKey::Buyer, + created_at, + u64::from(NIP7D_THREAD_KIND), + tags, + "What is everyone bringing this weekend?", + ) + .expect("forum thread") + } + fn synthetic_event( id_digit: &str, sig_digit: &str,