tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit b14904ce5945208641eef0ea6205a94ed14c8200
parent dda7f1726eb94011f90e55346f6f6be7f387617f
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 04:21:01 -0700

http: add moderation review endpoints

Diffstat:
Mcrates/tangle/tests/run_integration.rs | 163+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle_runtime/src/lib.rs | 276+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
2 files changed, 434 insertions(+), 5 deletions(-)

diff --git a/crates/tangle/tests/run_integration.rs b/crates/tangle/tests/run_integration.rs @@ -27,12 +27,14 @@ async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { let db_path = root.join("surrealdb"); let config_path = root.join("runtime.json"); fs::create_dir_all(&root).expect("runtime root"); + let admin = FixtureKey::Relay.public_key(); write_runtime_config( &config_path, &db_path, port, "tangle_it", serde_json::json!({ + "admin_pubkeys": [admin.as_str()], "approved_sellers": [FixtureKey::Seller.public_key().as_str()], "write_rate_limit": { "limit": 10, @@ -60,6 +62,8 @@ async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { let reaction = listing_reaction(&listing, 1_714_124_437, "+"); let thread = forum_thread(1_714_124_438, Some("Market day thread"), &["market", "csa"]); let thread_comment = forum_thread_comment(&thread, 1_714_124_439, "I can bring greens."); + let label = listing_label(&listing, 1_714_124_440, "reviewed"); + let report = listing_report(&listing, 1_714_124_441, "spam"); let auth = build_fixture_event(&auth_event_spec()).expect("auth"); let seller = FixtureKey::Seller.public_key(); @@ -242,6 +246,52 @@ async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { ); assert_eq!(next_label(&mut publisher).await, "EOSE"); + publisher + .send(Message::Text( + serde_json::json!(["EVENT", event_to_value(&label)]) + .to_string() + .into(), + )) + .await + .expect("label send"); + assert_ok(&next_json(&mut publisher).await, true); + publisher + .send(Message::Text( + serde_json::json!(["REQ", "sub-label", { "ids": [label.id().as_str()] }]) + .to_string() + .into(), + )) + .await + .expect("label fetch send"); + let fetched_label = next_json(&mut publisher).await; + assert_eq!(fetched_label[0], "EVENT"); + assert_eq!(fetched_label[1], "sub-label"); + assert_eq!(fetched_label[2]["id"], label.id().as_str()); + assert_eq!(next_label(&mut publisher).await, "EOSE"); + + publisher + .send(Message::Text( + serde_json::json!(["EVENT", event_to_value(&report)]) + .to_string() + .into(), + )) + .await + .expect("report send"); + assert_ok(&next_json(&mut publisher).await, true); + publisher + .send(Message::Text( + serde_json::json!(["REQ", "sub-report", { "ids": [report.id().as_str()] }]) + .to_string() + .into(), + )) + .await + .expect("report fetch send"); + let fetched_report = next_json(&mut publisher).await; + assert_eq!(fetched_report[0], "EVENT"); + assert_eq!(fetched_report[1], "sub-report"); + assert_eq!(fetched_report[2]["id"], report.id().as_str()); + assert_eq!(next_label(&mut publisher).await, "EOSE"); + subscriber .send(Message::Text( serde_json::json!(["CLOSE", "sub-live"]).to_string().into(), @@ -298,6 +348,28 @@ async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { assert!(forum_comments.contains("200 OK")); assert!(forum_comments.contains(thread_comment.id().as_str())); assert!(forum_comments.contains("I can bring greens.")); + let moderation_labels = http_get_admin( + port, + &format!( + "/api/admin/moderation/labels?target_type=event&target_ref={}&namespace=com.radroots.moderation&label=reviewed&limit=5", + listing.id().as_str() + ), + Some(admin.as_str()), + ); + assert!(moderation_labels.contains("200 OK")); + assert!(moderation_labels.contains(label.id().as_str())); + assert!(moderation_labels.contains("\"label\":\"reviewed\"")); + let moderation_reports = http_get_admin( + port, + &format!( + "/api/admin/moderation/reports?target_type=event&target_ref={}&report_type=spam&limit=5", + listing.id().as_str() + ), + Some(admin.as_str()), + ); + assert!(moderation_reports.contains("200 OK")); + assert!(moderation_reports.contains(report.id().as_str())); + assert!(moderation_reports.contains("\"report_type\":\"spam\"")); let search = http_get(port, "/api/search?q=carrots&limit=5"); assert!(search.contains("200 OK")); assert!(search.contains(listing.id().as_str())); @@ -370,6 +442,29 @@ async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { .expect("thread search row") .is_some() ); + let label_rows = store + .label_projection_rows(label.id()) + .await + .expect("label rows"); + assert_eq!(label_rows.len(), 2); + let event_label = label_rows + .iter() + .find(|row| row["target_type"] == "event") + .expect("event label row"); + assert_eq!(event_label["event_id"], label.id().as_str()); + assert_eq!(event_label["target_ref"], listing.id().as_str()); + assert_eq!(event_label["namespace"], "com.radroots.moderation"); + assert_eq!(event_label["label"], "reviewed"); + let report_rows = store + .report_projection_rows(report.id()) + .await + .expect("report rows"); + assert_eq!(report_rows.len(), 1); + assert_eq!(report_rows[0]["event_id"], report.id().as_str()); + assert_eq!(report_rows[0]["target_type"], "event"); + assert_eq!(report_rows[0]["target_ref"], listing.id().as_str()); + assert_eq!(report_rows[0]["report_type"], "spam"); + assert_eq!(report_rows[0]["reported_pubkeys"][0], seller.as_str()); assert!( store .search_document_row(&listing_key) @@ -850,6 +945,27 @@ fn http_get(port: u16, path: &str) -> String { try_http_get(port, path).expect("http get") } +fn http_get_admin(port: u16, path: &str, admin_pubkey: Option<&str>) -> String { + let mut stream = TcpStream::connect(("127.0.0.1", port)).expect("http connect"); + stream + .set_read_timeout(Some(Duration::from_secs(2))) + .expect("read timeout"); + stream + .set_write_timeout(Some(Duration::from_secs(2))) + .expect("write timeout"); + let admin_header = admin_pubkey + .map(|pubkey| format!("x-tangle-admin-pubkey: {pubkey}\r\n")) + .unwrap_or_default(); + write!( + stream, + "GET {path} HTTP/1.1\r\nHost: 127.0.0.1:{port}\r\nAccept: application/json\r\n{admin_header}Connection: close\r\n\r\n" + ) + .expect("http get"); + let mut response = String::new(); + stream.read_to_string(&mut response).expect("http read"); + response +} + fn http_post_json(port: u16, path: &str, admin_pubkey: Option<&str>, body: Value) -> String { let body = body.to_string(); let mut stream = TcpStream::connect(("127.0.0.1", port)).expect("http connect"); @@ -988,6 +1104,53 @@ fn listing_reaction( .expect("reaction event") } +fn listing_label( + listing: &tangle_protocol::Event, + created_at: u64, + label: &str, +) -> tangle_protocol::Event { + let listing_key = format!("30402:{}:listing-a", listing.unsigned().pubkey().as_str()); + let namespace = "com.radroots.moderation"; + build_fixture_event_from_parts( + FixtureKey::Seller, + created_at, + 1_985, + vec![ + vec!["L".to_owned(), namespace.to_owned()], + vec!["l".to_owned(), label.to_owned(), namespace.to_owned()], + vec!["e".to_owned(), listing.id().as_str().to_owned()], + vec!["a".to_owned(), listing_key], + ], + "moderator label", + ) + .expect("label event") +} + +fn listing_report( + listing: &tangle_protocol::Event, + created_at: u64, + report_type: &str, +) -> tangle_protocol::Event { + build_fixture_event_from_parts( + FixtureKey::Seller, + created_at, + 1_984, + vec![ + vec![ + "p".to_owned(), + listing.unsigned().pubkey().as_str().to_owned(), + ], + vec![ + "e".to_owned(), + listing.id().as_str().to_owned(), + report_type.to_owned(), + ], + ], + "moderator report", + ) + .expect("report event") +} + fn forum_thread(created_at: u64, title: Option<&str>, topics: &[&str]) -> tangle_protocol::Event { let mut tags = vec![ vec!["e".to_owned(), "5".repeat(EventId::HEX_LENGTH)], diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -36,10 +36,10 @@ use tangle_protocol::{ use tangle_store::{StoreEventOutcome, StoredEvent}; use tangle_store_surreal::{ CommentProjectionOutcome, CommentProjectionQuery, DurableRateLimitDecision, - ForumThreadProjectionOutcome, ForumThreadProjectionQuery, ListingProjectionQuery, - LongFormProjectionOutcome, MigrationApplyOutcome, ReactionProjectionOutcome, - SearchDocumentQuery, SurrealConnectionConfig, SurrealConnectionMode, SurrealStore, - base_migration_plan, + ForumThreadProjectionOutcome, ForumThreadProjectionQuery, LabelProjectionOutcome, + LabelProjectionQuery, ListingProjectionQuery, LongFormProjectionOutcome, MigrationApplyOutcome, + ReactionProjectionOutcome, ReportProjectionOutcome, ReportProjectionQuery, SearchDocumentQuery, + SurrealConnectionConfig, SurrealConnectionMode, SurrealStore, base_migration_plan, }; use tokio::net::TcpListener; use tokio::sync::broadcast; @@ -616,6 +616,16 @@ async fn project_stored_event( ) => false, Err(_) => return Err(RuntimeCommandError::store("event projection failed")), }; + let label_projected = match store.project_label(event, now).await { + Ok(LabelProjectionOutcome::Projected) => true, + Ok(LabelProjectionOutcome::NotLabel | LabelProjectionOutcome::Ineligible) => false, + Err(_) => return Err(RuntimeCommandError::store("event projection failed")), + }; + let report_projected = match store.project_report(event, now).await { + Ok(ReportProjectionOutcome::Projected) => true, + Ok(ReportProjectionOutcome::NotReport | ReportProjectionOutcome::Ineligible) => false, + Err(_) => return Err(RuntimeCommandError::store("event projection failed")), + }; if effect == AdmissionEffect::StoreRawAndProjectPublicListing { if store.project_current_listing(event, now).await.is_err() || store.project_listing_helpers(event).await.is_err() @@ -625,7 +635,12 @@ async fn project_stored_event( } return Ok(true); } - Ok(comment_projected || reaction_projected || long_form_projected || forum_thread_projected) + Ok(comment_projected + || reaction_projected + || long_form_projected + || forum_thread_projected + || label_projected + || report_projected) } fn parse_event_import_document(raw: &str) -> Result<Vec<Event>, RuntimeCommandError> { @@ -1013,6 +1028,14 @@ fn runtime_router( "/api/admin/events/{event_id}/unhide", post(runtime_admin_unhide_event), ) + .route( + "/api/admin/moderation/labels", + get(runtime_admin_moderation_labels), + ) + .route( + "/api/admin/moderation/reports", + get(runtime_admin_moderation_reports), + ) .with_state(state) } @@ -1273,6 +1296,50 @@ async fn runtime_admin_unhide_event( } } +async fn runtime_admin_moderation_labels( + State(state): State<RuntimeRelayState>, + headers: HeaderMap, + RawQuery(query): RawQuery, +) -> Result<Json<ModerationLabelsDocument>, ApiError> { + let _admin = require_admin_pubkey(&state.config, &headers)?; + let query = label_projection_query(query.as_deref().unwrap_or_default())?; + let rows = state + .store + .query_label_projections(&query) + .await + .map_err(|_| ApiError::internal())?; + let items = rows + .iter() + .map(moderation_label_document) + .collect::<Result<Vec<_>, _>>()?; + Ok(Json(ModerationLabelsDocument { + items, + next_cursor: None, + })) +} + +async fn runtime_admin_moderation_reports( + State(state): State<RuntimeRelayState>, + headers: HeaderMap, + RawQuery(query): RawQuery, +) -> Result<Json<ModerationReportsDocument>, ApiError> { + let _admin = require_admin_pubkey(&state.config, &headers)?; + let query = report_projection_query(query.as_deref().unwrap_or_default())?; + let rows = state + .store + .query_report_projections(&query) + .await + .map_err(|_| ApiError::internal())?; + let items = rows + .iter() + .map(moderation_report_document) + .collect::<Result<Vec<_>, _>>()?; + Ok(Json(ModerationReportsDocument { + items, + next_cursor: None, + })) +} + async fn handle_websocket(mut socket: WebSocket, state: RuntimeRelayState) { let mut shutdown = state.shutdown_signal.subscribe(); let mut event_rx = state.event_tx.subscribe(); @@ -1980,6 +2047,8 @@ impl EventMessageHandler { || self.store.project_reaction(&event, now).await.is_err() || self.store.project_long_form(&event, now).await.is_err() || self.store.project_forum_thread(&event, now).await.is_err() + || self.store.project_label(&event, now).await.is_err() + || self.store.project_report(&event, now).await.is_err() { return ok_rejected(event_id, "error: projection failed".to_owned()); } @@ -2720,6 +2789,47 @@ impl AdminPolicyDocument { } } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ModerationLabelsDocument { + pub items: Vec<ModerationLabelDocument>, + pub next_cursor: Option<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ModerationLabelDocument { + pub label_id: String, + pub event_id: String, + pub pubkey: String, + pub created_at: u64, + pub content: String, + pub namespace: String, + pub label: String, + pub target_type: String, + pub target_ref: String, + pub projected_at: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ModerationReportsDocument { + pub items: Vec<ModerationReportDocument>, + pub next_cursor: Option<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ModerationReportDocument { + pub report_id: String, + pub event_id: String, + pub pubkey: String, + pub created_at: u64, + pub content: String, + pub target_type: String, + pub target_ref: String, + pub report_type: String, + pub reported_pubkeys: Vec<String>, + pub server_urls: Vec<String>, + pub projected_at: u64, +} + #[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)] pub struct AdminEventPolicyRequest { pub reason: Option<String>, @@ -3418,6 +3528,129 @@ fn forum_thread_query(raw: &str) -> Result<ForumThreadProjectionQuery, ApiError> Ok(query) } +fn label_projection_query(raw: &str) -> Result<LabelProjectionQuery, ApiError> { + let mut target_type = None; + let mut target_ref = None; + let mut namespace = None; + let mut label = None; + let mut pubkey = None; + let mut limit = None; + for (key, value) in form_urlencoded::parse(raw.as_bytes()) { + let value = value.into_owned(); + match key.as_ref() { + "target_type" => set_once( + "target_type", + &mut target_type, + required_value("target_type", &value)?, + )?, + "target_ref" => set_once( + "target_ref", + &mut target_ref, + required_value("target_ref", &value)?, + )?, + "namespace" => set_once( + "namespace", + &mut namespace, + required_value("namespace", &value)?, + )?, + "label" => set_once("label", &mut label, required_value("label", &value)?)?, + "pubkey" => set_once("pubkey", &mut pubkey, parse_pubkey("pubkey", &value)?)?, + "limit" => set_once("limit", &mut limit, parse_limit(&value)?)?, + "cursor" => { + return Err(invalid_parameter( + "cursor", + "signed cursor decoding is not implemented", + )); + } + "" => {} + unsupported => { + return Err(ApiError::invalid_request(format!( + "query parameter `{unsupported}` is unsupported" + ))); + } + } + } + if target_type.is_some() != target_ref.is_some() { + return Err(invalid_parameter( + "target", + "target_type and target_ref must be provided together", + )); + } + let mut query = LabelProjectionQuery::new().with_limit(limit.unwrap_or(50)); + if let (Some(target_type), Some(target_ref)) = (target_type, target_ref) { + query = query.with_target(&target_type, &target_ref); + } + if let Some(namespace) = namespace { + query = query.with_namespace(&namespace); + } + if let Some(label) = label { + query = query.with_label(&label); + } + if let Some(pubkey) = pubkey { + query = query.with_pubkey(pubkey.as_str()); + } + Ok(query) +} + +fn report_projection_query(raw: &str) -> Result<ReportProjectionQuery, ApiError> { + let mut target_type = None; + let mut target_ref = None; + let mut report_type = None; + let mut pubkey = None; + let mut limit = None; + for (key, value) in form_urlencoded::parse(raw.as_bytes()) { + let value = value.into_owned(); + match key.as_ref() { + "target_type" => set_once( + "target_type", + &mut target_type, + required_value("target_type", &value)?, + )?, + "target_ref" => set_once( + "target_ref", + &mut target_ref, + required_value("target_ref", &value)?, + )?, + "report_type" => set_once( + "report_type", + &mut report_type, + required_value("report_type", &value)?, + )?, + "pubkey" => set_once("pubkey", &mut pubkey, parse_pubkey("pubkey", &value)?)?, + "limit" => set_once("limit", &mut limit, parse_limit(&value)?)?, + "cursor" => { + return Err(invalid_parameter( + "cursor", + "signed cursor decoding is not implemented", + )); + } + "" => {} + unsupported => { + return Err(ApiError::invalid_request(format!( + "query parameter `{unsupported}` is unsupported" + ))); + } + } + } + if target_type.is_some() != target_ref.is_some() { + return Err(invalid_parameter( + "target", + "target_type and target_ref must be provided together", + )); + } + let mut query = ReportProjectionQuery::new().with_limit(limit.unwrap_or(50)); + if let (Some(target_type), Some(target_ref)) = (target_type, target_ref) { + query = query.with_target(&target_type, &target_ref); + } + if let Some(report_type) = report_type { + query = query.with_report_type(&report_type); + } + if let Some(pubkey) = pubkey { + query = query.with_pubkey(pubkey.as_str()); + } + Ok(query) +} + fn parse_comment_query(raw: &str) -> Result<u64, ApiError> { let mut limit = None; for (key, value) in form_urlencoded::parse(raw.as_bytes()) { @@ -3493,6 +3726,39 @@ fn comment_item_document(row: &serde_json::Value) -> Result<CommentItemDocument, }) } +fn moderation_label_document(row: &serde_json::Value) -> Result<ModerationLabelDocument, ApiError> { + Ok(ModerationLabelDocument { + label_id: string_field(row, "label_id")?, + event_id: string_field(row, "event_id")?, + pubkey: string_field(row, "pubkey")?, + created_at: u64_field(row, "created_at")?, + content: string_field(row, "content")?, + namespace: string_field(row, "namespace")?, + label: string_field(row, "label")?, + target_type: string_field(row, "target_type")?, + target_ref: string_field(row, "target_ref")?, + projected_at: u64_field(row, "projected_at")?, + }) +} + +fn moderation_report_document( + row: &serde_json::Value, +) -> Result<ModerationReportDocument, ApiError> { + Ok(ModerationReportDocument { + report_id: string_field(row, "report_id")?, + event_id: string_field(row, "event_id")?, + pubkey: string_field(row, "pubkey")?, + created_at: u64_field(row, "created_at")?, + content: string_field(row, "content")?, + target_type: string_field(row, "target_type")?, + target_ref: string_field(row, "target_ref")?, + report_type: string_field(row, "report_type")?, + reported_pubkeys: string_array_field(row, "reported_pubkeys")?, + server_urls: string_array_field(row, "server_urls")?, + projected_at: u64_field(row, "projected_at")?, + }) +} + fn reaction_counts_document( row: Option<&serde_json::Value>, target_event_id: &str,