commit d02d3229a04235379fa05322ba514782bd42d5ce
parent 760ccc9c7b90dac616a489aa41e6757791d11a25
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 01:17:38 -0700
ws: handle event messages
Diffstat:
2 files changed, 234 insertions(+), 12 deletions(-)
diff --git a/crates/tangle_runtime/Cargo.toml b/crates/tangle_runtime/Cargo.toml
@@ -15,11 +15,11 @@ serde_json = "1"
tangle_core = { path = "../tangle_core" }
tangle_nips = { path = "../tangle_nips" }
tangle_protocol = { path = "../tangle_protocol" }
+tangle_store = { path = "../tangle_store" }
tangle_store_surreal = { path = "../tangle_store_surreal" }
url = "2"
[dev-dependencies]
-tangle_store = { path = "../tangle_store" }
tangle_test_support = { path = "../tangle_test_support" }
tokio = { version = "1", features = ["macros", "rt"] }
tower = { version = "0.5", features = ["util"] }
diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs
@@ -11,12 +11,15 @@ use core::fmt;
use http::{HeaderMap, HeaderValue, StatusCode, header};
use serde::{Deserialize, Serialize};
use tangle_core::{
- AuthChallengeState, FixedWindowRateLimiter, MarketplaceListingStatus, MarketplaceQuery,
- MarketplaceQueryError, MarketplaceQuerySpec, MarketplaceSort, RateLimitConfig, RuntimeLimits,
- SubscriptionManager, SubscriptionMatcher,
+ AdmissionContext, AdmissionEffect, AuthChallengeState, EventValidator, FixedWindowRateLimiter,
+ MarketplaceListingStatus, MarketplaceQuery, MarketplaceQueryError, MarketplaceQuerySpec,
+ MarketplaceSort, RateLimitConfig, RuntimeLimits, SubscriptionManager, SubscriptionMatcher,
};
use tangle_nips::{FulfillmentMethod, ListingUnit};
-use tangle_protocol::{ClientMessage, EventId, PublicKeyHex, RelayMessage, parse_client_message};
+use tangle_protocol::{
+ ClientMessage, Event, EventId, PublicKeyHex, RelayMessage, UnixTimestamp, parse_client_message,
+};
+use tangle_store::{StoreEventOutcome, StoredEvent};
use tangle_store_surreal::{ListingProjectionQuery, SearchDocumentQuery, SurrealStore};
use url::form_urlencoded;
@@ -243,6 +246,110 @@ impl ClientMessageLoop {
}
}
+#[derive(Debug, Clone)]
+pub struct EventMessageHandler {
+ store: SurrealStore,
+ validator: EventValidator,
+}
+
+impl EventMessageHandler {
+ pub fn new(store: SurrealStore, validator: EventValidator) -> Self {
+ Self { store, validator }
+ }
+
+ pub fn store(&self) -> &SurrealStore {
+ &self.store
+ }
+
+ pub fn validator(&self) -> &EventValidator {
+ &self.validator
+ }
+
+ pub async fn handle_event(
+ &self,
+ connection: &RelayConnection,
+ event: Event,
+ received_at: UnixTimestamp,
+ now: UnixTimestamp,
+ ) -> RelayMessage {
+ let event_id = event.id().clone();
+ let context = admission_context(connection);
+ let validated = match self.validator.validate(&event, &context, now) {
+ Ok(validated) => validated,
+ Err(error) => return ok_rejected(event_id, format!("invalid: {error}")),
+ };
+ if validated.admission().effect() == AdmissionEffect::AuthenticateOnly {
+ return ok_rejected(event_id, "invalid: auth events must use AUTH".to_owned());
+ }
+ if event.unsigned().kind().is_ephemeral() {
+ return ok_accepted(event_id);
+ }
+ let raw_outcome = match self
+ .store
+ .store_raw_event(&StoredEvent::new(event.clone(), received_at))
+ .await
+ {
+ Ok(outcome) => outcome,
+ Err(_) => return ok_rejected(event_id, "error: store unavailable".to_owned()),
+ };
+ if raw_outcome == StoreEventOutcome::Duplicate {
+ return ok_accepted(event_id);
+ }
+ if self.store.index_event_tags(&event).await.is_err()
+ || self.store.maintain_current_event(&event).await.is_err()
+ || self.store.apply_deletion_markers(&event).await.is_err()
+ || self
+ .store
+ .store_listing_revision(&event, now)
+ .await
+ .is_err()
+ {
+ return ok_rejected(event_id, "error: projection failed".to_owned());
+ }
+ if validated.admission().effect() == AdmissionEffect::StoreRawAndProjectPublicListing
+ && (self
+ .store
+ .project_current_listing(&event, now)
+ .await
+ .is_err()
+ || self.store.project_listing_helpers(&event).await.is_err()
+ || self
+ .store
+ .index_listing_search_document(&event)
+ .await
+ .is_err())
+ {
+ return ok_rejected(event_id, "error: projection failed".to_owned());
+ }
+ ok_accepted(event_id)
+ }
+}
+
+fn admission_context(connection: &RelayConnection) -> AdmissionContext {
+ connection
+ .auth()
+ .authenticated_pubkey()
+ .cloned()
+ .map(AdmissionContext::authenticated)
+ .unwrap_or_else(AdmissionContext::unauthenticated)
+}
+
+fn ok_accepted(event_id: EventId) -> RelayMessage {
+ RelayMessage::Ok {
+ event_id,
+ accepted: true,
+ message: String::new(),
+ }
+}
+
+fn ok_rejected(event_id: EventId, message: String) -> RelayMessage {
+ RelayMessage::Ok {
+ event_id,
+ accepted: false,
+ message,
+ }
+}
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApiErrorCode {
InvalidRequest,
@@ -1404,16 +1511,20 @@ fn invalid_parameter(field: &'static str, requirement: &str) -> ApiError {
mod tests {
use super::{
ApiError, ApiErrorBody, ApiErrorCode, ApiErrorEnvelope, ClientFrame, ClientFrameOutcome,
- ClientMessageLoop, ListingsHttpState, ReadinessCheckStatus, ReadinessState,
- RelayConnection, RelayConnectionConfig, RelayConnectionId, RelayInfoDocument,
- TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS, WebSocketHttpState, health_router,
- listing_item_document, listing_projection_query, listings_router, parse_listing_query,
- parse_marketplace_search_query, relay_info_router, search_document_query, websocket_router,
+ ClientMessageLoop, EventMessageHandler, ListingsHttpState, ReadinessCheckStatus,
+ ReadinessState, RelayConnection, RelayConnectionConfig, RelayConnectionId,
+ RelayInfoDocument, TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS, WebSocketHttpState,
+ health_router, listing_item_document, listing_projection_query, listings_router,
+ parse_listing_query, parse_marketplace_search_query, relay_info_router,
+ search_document_query, websocket_router,
};
use axum::{body::Body, response::IntoResponse};
use http::{HeaderValue, Request, StatusCode, header};
- use tangle_core::{MarketplaceListingStatus, MarketplaceSort, RateLimitConfig, RuntimeLimits};
- use tangle_nips::{FulfillmentMethod, ListingUnit};
+ use tangle_core::{
+ AdmissionPolicy, EventValidator, MarketplaceListingStatus, MarketplaceSort,
+ RateLimitConfig, RuntimeLimits,
+ };
+ use tangle_nips::{FulfillmentMethod, ListingUnit, parse_relay_auth_event};
use tangle_protocol::{ClientMessage, RelayMessage, UnixTimestamp, event_to_value};
use tangle_store::StoredEvent;
use tangle_store_surreal::{SurrealConnectionConfig, SurrealStore, base_migration_plan};
@@ -1712,6 +1823,97 @@ mod tests {
}
#[tokio::test]
+ async fn event_message_handler_stores_and_projects_authenticated_listing() {
+ let store = runtime_memory_store().await;
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let mut connection = authenticated_connection();
+ let handler = EventMessageHandler::new(
+ store.clone(),
+ EventValidator::new(
+ RuntimeLimits::default(),
+ AdmissionPolicy::new().approve_seller(listing.unsigned().pubkey().clone()),
+ ),
+ );
+ let listing_key = format!("30402:{}:listing-a", listing.unsigned().pubkey().as_str());
+
+ let outcome = handler
+ .handle_event(
+ &connection,
+ listing.clone(),
+ UnixTimestamp::new(1_714_125_300),
+ UnixTimestamp::new(1_714_125_400),
+ )
+ .await;
+
+ assert_eq!(
+ outcome,
+ RelayMessage::Ok {
+ event_id: listing.id().clone(),
+ accepted: true,
+ message: String::new()
+ }
+ );
+ assert_eq!(
+ handler
+ .store()
+ .raw_event_row(listing.id())
+ .await
+ .expect("raw")
+ .expect("raw exists")["event_id"],
+ listing.id().as_str()
+ );
+ assert_eq!(
+ store
+ .listing_current_row(&listing_key)
+ .await
+ .expect("current")
+ .expect("current exists")["event_id"],
+ listing.id().as_str()
+ );
+ assert_eq!(
+ store
+ .search_document_row(&listing_key)
+ .await
+ .expect("search")
+ .expect("search exists")["event_id"],
+ listing.id().as_str()
+ );
+ assert_eq!(
+ handler
+ .handle_event(
+ &connection,
+ listing.clone(),
+ UnixTimestamp::new(1_714_125_301),
+ UnixTimestamp::new(1_714_125_401),
+ )
+ .await,
+ RelayMessage::Ok {
+ event_id: listing.id().clone(),
+ accepted: true,
+ message: String::new()
+ }
+ );
+ assert_eq!(handler.validator().limits(), RuntimeLimits::default());
+ connection.auth_mut().clear_authentication();
+ let rejected = handler
+ .handle_event(
+ &connection,
+ listing.clone(),
+ UnixTimestamp::new(1_714_125_302),
+ UnixTimestamp::new(1_714_125_402),
+ )
+ .await;
+ match rejected {
+ RelayMessage::Ok {
+ accepted: false,
+ message,
+ ..
+ } => assert!(message.contains("write authentication required")),
+ outcome => panic!("unexpected outcome: {outcome:?}"),
+ }
+ }
+
+ #[tokio::test]
async fn api_error_into_response_keeps_public_envelope_shape() {
let response = ApiError::not_found("listing not found").into_response();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
@@ -2757,4 +2959,24 @@ mod tests {
connection.set_remote_addr("127.0.0.1:7777");
ClientMessageLoop::new(connection)
}
+
+ fn authenticated_connection() -> RelayConnection {
+ let mut connection = RelayConnection::new(
+ RelayConnectionId::new("authenticated").expect("connection id"),
+ RelayConnectionConfig::default(),
+ );
+ let auth = build_fixture_event(&auth_event_spec()).expect("auth");
+ connection
+ .auth_mut()
+ .issue_challenge("challenge-001", UnixTimestamp::new(1_714_124_430))
+ .expect("challenge");
+ let auth = parse_relay_auth_event(&auth)
+ .expect("auth parses")
+ .expect("auth event");
+ connection
+ .auth_mut()
+ .authenticate(&auth, UnixTimestamp::new(1_714_124_435))
+ .expect("authenticate");
+ connection
+ }
}