commit 760ccc9c7b90dac616a489aa41e6757791d11a25
parent 1bddd8afa8465dcb0e22ec119197aad851cb0a0d
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 01:13:31 -0700
ws: add client message loop
Diffstat:
1 file changed, 147 insertions(+), 9 deletions(-)
diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs
@@ -16,7 +16,7 @@ use tangle_core::{
SubscriptionManager, SubscriptionMatcher,
};
use tangle_nips::{FulfillmentMethod, ListingUnit};
-use tangle_protocol::{EventId, PublicKeyHex};
+use tangle_protocol::{ClientMessage, EventId, PublicKeyHex, RelayMessage, parse_client_message};
use tangle_store_surreal::{ListingProjectionQuery, SearchDocumentQuery, SurrealStore};
use url::form_urlencoded;
@@ -192,6 +192,57 @@ impl Default for WebSocketHttpState {
}
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum ClientFrame {
+ Text(String),
+ Binary(Vec<u8>),
+ Ping(Vec<u8>),
+ Pong(Vec<u8>),
+ Close,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum ClientFrameOutcome {
+ Message(ClientMessage),
+ Reject(RelayMessage),
+ Ignore,
+ Close,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ClientMessageLoop {
+ connection: RelayConnection,
+}
+
+impl ClientMessageLoop {
+ pub fn new(connection: RelayConnection) -> Self {
+ Self { connection }
+ }
+
+ pub fn connection(&self) -> &RelayConnection {
+ &self.connection
+ }
+
+ pub fn connection_mut(&mut self) -> &mut RelayConnection {
+ &mut self.connection
+ }
+
+ pub fn handle_frame(&mut self, frame: ClientFrame) -> ClientFrameOutcome {
+ match frame {
+ ClientFrame::Text(raw) => parse_client_message(&raw)
+ .map(ClientFrameOutcome::Message)
+ .unwrap_or_else(|error| {
+ ClientFrameOutcome::Reject(RelayMessage::Notice(format!("invalid: {error}")))
+ }),
+ ClientFrame::Binary(_) => ClientFrameOutcome::Reject(RelayMessage::Notice(
+ "unsupported: binary websocket messages are not supported".to_owned(),
+ )),
+ ClientFrame::Ping(_) | ClientFrame::Pong(_) => ClientFrameOutcome::Ignore,
+ ClientFrame::Close => ClientFrameOutcome::Close,
+ }
+ }
+}
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApiErrorCode {
InvalidRequest,
@@ -1352,21 +1403,21 @@ fn invalid_parameter(field: &'static str, requirement: &str) -> ApiError {
#[cfg(test)]
mod tests {
use super::{
- ApiError, ApiErrorBody, ApiErrorCode, ApiErrorEnvelope, ListingsHttpState,
- ReadinessCheckStatus, ReadinessState, RelayConnection, RelayConnectionConfig,
- RelayConnectionId, RelayInfoDocument, TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS,
- WebSocketHttpState, health_router, listing_item_document, listing_projection_query,
- listings_router, parse_listing_query, parse_marketplace_search_query, relay_info_router,
- search_document_query, websocket_router,
+ ApiError, ApiErrorBody, ApiErrorCode, ApiErrorEnvelope, ClientFrame, ClientFrameOutcome,
+ ClientMessageLoop, ListingsHttpState, ReadinessCheckStatus, ReadinessState,
+ RelayConnection, RelayConnectionConfig, RelayConnectionId, RelayInfoDocument,
+ TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS, WebSocketHttpState, health_router,
+ listing_item_document, listing_projection_query, listings_router, parse_listing_query,
+ parse_marketplace_search_query, relay_info_router, search_document_query, websocket_router,
};
use axum::{body::Body, response::IntoResponse};
use http::{HeaderValue, Request, StatusCode, header};
use tangle_core::{MarketplaceListingStatus, MarketplaceSort, RateLimitConfig, RuntimeLimits};
use tangle_nips::{FulfillmentMethod, ListingUnit};
- use tangle_protocol::{UnixTimestamp, event_to_value};
+ use tangle_protocol::{ClientMessage, RelayMessage, UnixTimestamp, event_to_value};
use tangle_store::StoredEvent;
use tangle_store_surreal::{SurrealConnectionConfig, SurrealStore, base_migration_plan};
- use tangle_test_support::{build_fixture_event, valid_public_listing_spec};
+ use tangle_test_support::{auth_event_spec, build_fixture_event, valid_public_listing_spec};
use tower::ServiceExt;
#[test]
@@ -1582,6 +1633,84 @@ mod tests {
assert_eq!(response.status(), StatusCode::UPGRADE_REQUIRED);
}
+ #[test]
+ fn client_message_loop_dispatches_supported_text_messages() {
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let auth = build_fixture_event(&auth_event_spec()).expect("auth");
+ let mut loop_state = runtime_client_message_loop();
+
+ match loop_state.handle_frame(ClientFrame::Text(
+ serde_json::json!(["EVENT", event_to_value(&listing)]).to_string(),
+ )) {
+ ClientFrameOutcome::Message(ClientMessage::Event(event)) => {
+ assert_eq!(event.id(), listing.id());
+ }
+ outcome => panic!("unexpected outcome: {outcome:?}"),
+ }
+ match loop_state.handle_frame(ClientFrame::Text(
+ serde_json::json!(["AUTH", event_to_value(&auth)]).to_string(),
+ )) {
+ ClientFrameOutcome::Message(ClientMessage::Auth(event)) => {
+ assert_eq!(event.id(), auth.id());
+ }
+ outcome => panic!("unexpected outcome: {outcome:?}"),
+ }
+ match loop_state.handle_frame(ClientFrame::Text(
+ r#"["REQ","sub-a",{"kinds":[30402],"limit":1}]"#.to_owned(),
+ )) {
+ ClientFrameOutcome::Message(ClientMessage::Req {
+ subscription_id,
+ filters,
+ }) => {
+ assert_eq!(subscription_id.as_str(), "sub-a");
+ assert_eq!(filters.len(), 1);
+ assert_eq!(filters[0].limit(), Some(1));
+ }
+ outcome => panic!("unexpected outcome: {outcome:?}"),
+ }
+ match loop_state.handle_frame(ClientFrame::Text(r#"["CLOSE","sub-a"]"#.to_owned())) {
+ ClientFrameOutcome::Message(ClientMessage::Close(subscription_id)) => {
+ assert_eq!(subscription_id.as_str(), "sub-a");
+ }
+ outcome => panic!("unexpected outcome: {outcome:?}"),
+ }
+ assert_eq!(loop_state.connection().id().as_str(), "client-loop");
+ assert_eq!(
+ loop_state.connection_mut().remote_addr(),
+ Some("127.0.0.1:7777")
+ );
+ }
+
+ #[test]
+ fn client_message_loop_rejects_or_ignores_non_message_frames() {
+ let mut loop_state = runtime_client_message_loop();
+
+ match loop_state.handle_frame(ClientFrame::Text("not json".to_owned())) {
+ ClientFrameOutcome::Reject(RelayMessage::Notice(message)) => {
+ assert!(message.starts_with("invalid: client message JSON is invalid:"));
+ }
+ outcome => panic!("unexpected outcome: {outcome:?}"),
+ }
+ assert_eq!(
+ loop_state.handle_frame(ClientFrame::Binary(vec![1, 2, 3])),
+ ClientFrameOutcome::Reject(RelayMessage::Notice(
+ "unsupported: binary websocket messages are not supported".to_owned()
+ ))
+ );
+ assert_eq!(
+ loop_state.handle_frame(ClientFrame::Ping(vec![1])),
+ ClientFrameOutcome::Ignore
+ );
+ assert_eq!(
+ loop_state.handle_frame(ClientFrame::Pong(vec![2])),
+ ClientFrameOutcome::Ignore
+ );
+ assert_eq!(
+ loop_state.handle_frame(ClientFrame::Close),
+ ClientFrameOutcome::Close
+ );
+ }
+
#[tokio::test]
async fn api_error_into_response_keeps_public_envelope_shape() {
let response = ApiError::not_found("listing not found").into_response();
@@ -2619,4 +2748,13 @@ mod tests {
.expect("apply plan");
store
}
+
+ fn runtime_client_message_loop() -> ClientMessageLoop {
+ let mut connection = RelayConnection::new(
+ RelayConnectionId::new("client-loop").expect("connection id"),
+ RelayConnectionConfig::default(),
+ );
+ connection.set_remote_addr("127.0.0.1:7777");
+ ClientMessageLoop::new(connection)
+ }
}