tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 0b71a5e01dcdfd558812b16057c65e9736a236dc
parent da6a212a025627f689a69e8e4a1c8a37b7d16e21
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 03:34:12 -0700

http: expose reaction counts

Diffstat:
Mcrates/tangle/tests/run_integration.rs | 68++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle_runtime/src/lib.rs | 210+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
2 files changed, 275 insertions(+), 3 deletions(-)

diff --git a/crates/tangle/tests/run_integration.rs b/crates/tangle/tests/run_integration.rs @@ -53,6 +53,7 @@ async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); let comment = listing_comment(&listing, 1_714_124_436, "Can I pickup Saturday?"); + let reaction = listing_reaction(&listing, 1_714_124_437, "+"); let auth = build_fixture_event(&auth_event_spec()).expect("auth"); let seller = FixtureKey::Seller.public_key(); @@ -163,6 +164,29 @@ async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { assert_eq!(fetched_comment[2]["id"], comment.id().as_str()); assert_eq!(next_label(&mut publisher).await, "EOSE"); + publisher + .send(Message::Text( + serde_json::json!(["EVENT", event_to_value(&reaction)]) + .to_string() + .into(), + )) + .await + .expect("reaction send"); + assert_ok(&next_json(&mut publisher).await, true); + publisher + .send(Message::Text( + serde_json::json!(["REQ", "sub-reaction", { "ids": [reaction.id().as_str()] }]) + .to_string() + .into(), + )) + .await + .expect("reaction fetch send"); + let fetched_reaction = next_json(&mut publisher).await; + assert_eq!(fetched_reaction[0], "EVENT"); + assert_eq!(fetched_reaction[1], "sub-reaction"); + assert_eq!(fetched_reaction[2]["id"], reaction.id().as_str()); + assert_eq!(next_label(&mut publisher).await, "EOSE"); + subscriber .send(Message::Text( serde_json::json!(["CLOSE", "sub-live"]).to_string().into(), @@ -191,6 +215,13 @@ async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { assert!(comments.contains("200 OK")); assert!(comments.contains(comment.id().as_str())); assert!(comments.contains("Can I pickup Saturday?")); + let reactions = http_get( + port, + &format!("/api/listings/{}/listing-a/reactions", seller.as_str()), + ); + assert!(reactions.contains("200 OK")); + assert!(reactions.contains("\"like_count\":1")); + assert!(reactions.contains("\"total_count\":1")); let search = http_get(port, "/api/search?q=carrots&limit=5"); assert!(search.contains("200 OK")); assert!(search.contains(listing.id().as_str())); @@ -234,6 +265,14 @@ async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { assert_eq!(comment_row["event_id"], comment.id().as_str()); assert_eq!(comment_row["root_ref"], listing_key); assert_eq!(comment_row["content"], "Can I pickup Saturday?"); + let reaction_count = store + .reaction_count_row(listing.id()) + .await + .expect("reaction count") + .expect("reaction count exists"); + assert_eq!(reaction_count["target_event_id"], listing.id().as_str()); + assert_eq!(reaction_count["like_count"], 1_i64); + assert_eq!(reaction_count["total_count"], 1_i64); assert!( store .search_document_row(&listing_key) @@ -823,6 +862,35 @@ fn listing_comment( .expect("comment event") } +fn listing_reaction( + listing: &tangle_protocol::Event, + created_at: u64, + content: &str, +) -> tangle_protocol::Event { + let listing_key = format!("30402:{}:listing-a", listing.unsigned().pubkey().as_str()); + build_fixture_event_from_parts( + FixtureKey::Seller, + created_at, + 7, + vec![ + vec![ + "e".to_owned(), + listing.id().as_str().to_owned(), + "wss://relay.radroots.test".to_owned(), + listing.unsigned().pubkey().as_str().to_owned(), + ], + vec![ + "p".to_owned(), + listing.unsigned().pubkey().as_str().to_owned(), + ], + vec!["a".to_owned(), listing_key], + vec!["k".to_owned(), "30402".to_owned()], + ], + content, + ) + .expect("reaction event") +} + fn stop_relay(mut relay: Child) { stop_child(&mut relay); let status = relay.wait().expect("relay exit"); diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -36,8 +36,8 @@ use tangle_protocol::{ use tangle_store::{StoreEventOutcome, StoredEvent}; use tangle_store_surreal::{ CommentProjectionOutcome, CommentProjectionQuery, DurableRateLimitDecision, - ListingProjectionQuery, MigrationApplyOutcome, SearchDocumentQuery, SurrealConnectionConfig, - SurrealConnectionMode, SurrealStore, base_migration_plan, + ListingProjectionQuery, MigrationApplyOutcome, ReactionProjectionOutcome, SearchDocumentQuery, + SurrealConnectionConfig, SurrealConnectionMode, SurrealStore, base_migration_plan, }; use tokio::net::TcpListener; use tokio::sync::broadcast; @@ -597,6 +597,11 @@ async fn project_stored_event( Ok(CommentProjectionOutcome::NotComment | CommentProjectionOutcome::Ineligible) => false, Err(_) => return Err(RuntimeCommandError::store("event projection failed")), }; + let reaction_projected = match store.project_reaction(event, now).await { + Ok(ReactionProjectionOutcome::Projected) => true, + Ok(ReactionProjectionOutcome::NotReaction | ReactionProjectionOutcome::Ineligible) => false, + Err(_) => return Err(RuntimeCommandError::store("event projection failed")), + }; if effect == AdmissionEffect::StoreRawAndProjectPublicListing { if store.project_current_listing(event, now).await.is_err() || store.project_listing_helpers(event).await.is_err() @@ -606,7 +611,7 @@ async fn project_stored_event( } return Ok(true); } - Ok(comment_projected) + Ok(comment_projected || reaction_projected) } fn parse_event_import_document(raw: &str) -> Result<Vec<Event>, RuntimeCommandError> { @@ -963,6 +968,10 @@ fn runtime_router( "/api/listings/{pubkey}/{d}/comments", get(runtime_listing_comments), ) + .route( + "/api/listings/{pubkey}/{d}/reactions", + get(runtime_listing_reactions), + ) .route("/api/search", get(runtime_marketplace_search)) .route("/api/sellers/{pubkey}", get(runtime_seller_detail)) .route( @@ -1055,6 +1064,20 @@ async fn runtime_listing_comments( .await } +async fn runtime_listing_reactions( + State(state): State<RuntimeRelayState>, + Path((pubkey, d)): Path<(String, String)>, +) -> Result<Json<ReactionCountsDocument>, ApiError> { + listing_reactions( + State(ListingsHttpState::new( + state.store.clone(), + state.config.limits(), + )), + Path((pubkey, d)), + ) + .await +} + async fn runtime_marketplace_search( State(state): State<RuntimeRelayState>, RawQuery(query): RawQuery, @@ -1887,6 +1910,7 @@ impl EventMessageHandler { .await .is_err() || self.store.project_comment(&event, now).await.is_err() + || self.store.project_reaction(&event, now).await.is_err() { return ok_rejected(event_id, "error: projection failed".to_owned()); } @@ -2568,6 +2592,18 @@ pub struct CommentReferenceDocument { } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ReactionCountsDocument { + pub target_event_id: String, + pub target_kind: Option<String>, + pub like_count: u64, + pub dislike_count: u64, + pub emoji_count: u64, + pub text_count: u64, + pub total_count: u64, + pub updated_at: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct SellerDocument { pub pubkey: String, pub approved: bool, @@ -2771,6 +2807,10 @@ pub fn listings_router(state: ListingsHttpState) -> Router { .route("/api/listings", get(listings)) .route("/api/listings/{pubkey}/{d}", get(listing_detail)) .route("/api/listings/{pubkey}/{d}/comments", get(listing_comments)) + .route( + "/api/listings/{pubkey}/{d}/reactions", + get(listing_reactions), + ) .route("/api/search", get(marketplace_search)) .route("/api/sellers/{pubkey}", get(seller_detail)) .with_state(state) @@ -2912,6 +2952,36 @@ async fn listing_comments( })) } +async fn listing_reactions( + State(state): State<ListingsHttpState>, + Path((pubkey, d)): Path<(String, String)>, +) -> Result<Json<ReactionCountsDocument>, ApiError> { + let pubkey = parse_pubkey("pubkey", &pubkey)?; + let d = required_value("d", &d)?; + let listing_key = format!("30402:{}:{d}", pubkey.as_str()); + let listing = state + .store + .listing_current_row(&listing_key) + .await + .map_err(|_| ApiError::internal())? + .ok_or_else(|| ApiError::not_found("listing not found"))?; + if bool_field(&listing, "hidden")? || bool_field(&listing, "deleted")? { + return Err(ApiError::not_found("listing not found")); + } + let event_id = + EventId::new(&string_field(&listing, "event_id")?).map_err(|_| ApiError::internal())?; + let row = state + .store + .reaction_count_row(&event_id) + .await + .map_err(|_| ApiError::internal())?; + Ok(Json(reaction_counts_document( + row.as_ref(), + event_id.as_str(), + Some("30402"), + )?)) +} + async fn marketplace_search( State(state): State<ListingsHttpState>, RawQuery(query): RawQuery, @@ -3189,6 +3259,35 @@ fn comment_item_document(row: &serde_json::Value) -> Result<CommentItemDocument, }) } +fn reaction_counts_document( + row: Option<&serde_json::Value>, + target_event_id: &str, + target_kind: Option<&str>, +) -> Result<ReactionCountsDocument, ApiError> { + match row { + Some(row) => Ok(ReactionCountsDocument { + target_event_id: string_field(row, "target_event_id")?, + target_kind: optional_string_field(row, "target_kind")?, + like_count: u64_field(row, "like_count")?, + dislike_count: u64_field(row, "dislike_count")?, + emoji_count: u64_field(row, "emoji_count")?, + text_count: u64_field(row, "text_count")?, + total_count: u64_field(row, "total_count")?, + updated_at: u64_field(row, "updated_at")?, + }), + None => Ok(ReactionCountsDocument { + target_event_id: target_event_id.to_owned(), + target_kind: target_kind.map(str::to_owned), + like_count: 0, + dislike_count: 0, + emoji_count: 0, + text_count: 0, + total_count: 0, + updated_at: 0, + }), + } +} + fn fulfillment_document(row: &serde_json::Value) -> Result<Vec<String>, ApiError> { let mut fulfillment = Vec::new(); if bool_field(row, "pickup_available")? { @@ -5532,6 +5631,82 @@ mod tests { } #[tokio::test] + async fn listing_reactions_endpoint_returns_aggregate_counts() { + let store = runtime_memory_store().await; + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let reaction = listing_reaction(&listing, 1_714_125_420, "+"); + store + .project_current_listing(&listing, UnixTimestamp::new(1_714_125_419)) + .await + .expect("project listing"); + + let uri = format!( + "/api/listings/{}/listing-a/reactions", + listing.unsigned().pubkey().as_str() + ); + let empty = listings_router(ListingsHttpState::new( + store.clone(), + RuntimeLimits::default(), + )) + .oneshot( + Request::builder() + .uri(uri.as_str()) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + assert_eq!(empty.status(), StatusCode::OK); + let body = axum::body::to_bytes(empty.into_body(), usize::MAX) + .await + .expect("body"); + assert_eq!( + serde_json::from_slice::<serde_json::Value>(&body).expect("json"), + serde_json::json!({ + "target_event_id": listing.id().as_str(), + "target_kind": "30402", + "like_count": 0, + "dislike_count": 0, + "emoji_count": 0, + "text_count": 0, + "total_count": 0, + "updated_at": 0 + }) + ); + + store + .project_reaction(&reaction, UnixTimestamp::new(1_714_125_421)) + .await + .expect("project reaction"); + let response = listings_router(ListingsHttpState::new(store, RuntimeLimits::default())) + .oneshot( + Request::builder() + .uri(uri.as_str()) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + assert_eq!(response.status(), StatusCode::OK); + let body = axum::body::to_bytes(response.into_body(), usize::MAX) + .await + .expect("body"); + assert_eq!( + serde_json::from_slice::<serde_json::Value>(&body).expect("json"), + serde_json::json!({ + "target_event_id": listing.id().as_str(), + "target_kind": "30402", + "like_count": 1, + "dislike_count": 0, + "emoji_count": 0, + "text_count": 0, + "total_count": 1, + "updated_at": 1714125421 + }) + ); + } + + #[tokio::test] async fn listing_detail_endpoint_rejects_invalid_or_missing_listing() { let store = runtime_memory_store().await; let response = listings_router(ListingsHttpState::new( @@ -5793,6 +5968,35 @@ mod tests { .expect("comment event") } + fn listing_reaction( + listing: &tangle_protocol::Event, + created_at: u64, + content: &str, + ) -> tangle_protocol::Event { + let listing_key = format!("30402:{}:listing-a", listing.unsigned().pubkey().as_str()); + build_fixture_event_from_parts( + FixtureKey::Buyer, + created_at, + 7, + vec![ + vec![ + "e".to_owned(), + listing.id().as_str().to_owned(), + "wss://relay.radroots.test".to_owned(), + listing.unsigned().pubkey().as_str().to_owned(), + ], + vec![ + "p".to_owned(), + listing.unsigned().pubkey().as_str().to_owned(), + ], + vec!["a".to_owned(), listing_key], + vec!["k".to_owned(), "30402".to_owned()], + ], + content, + ) + .expect("reaction event") + } + fn runtime_client_message_loop() -> ClientMessageLoop { let mut connection = runtime_connection("client-loop"); connection.set_remote_addr("127.0.0.1:7777");