tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 04f92812fb215f960922d0f49b9dbb38b58d3661
parent d551db1ace7612728a6b9ce5460cd0bd5a3e478f
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 01:23:36 -0700

ws: handle close messages

Diffstat:
Mcrates/tangle_runtime/src/lib.rs | 73++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 66 insertions(+), 7 deletions(-)

diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -15,7 +15,7 @@ use tangle_core::{ AdmissionContext, AdmissionEffect, AuthChallengeState, EventValidator, FixedWindowRateLimiter, MarketplaceListingStatus, MarketplaceQuery, MarketplaceQueryError, MarketplaceQuerySpec, MarketplaceSort, NostrFilterCompiler, QueryExecutionMode, RateLimitConfig, RuntimeLimits, - SubscriptionManager, SubscriptionMatcher, + SubscriptionCloseOutcome, SubscriptionManager, SubscriptionMatcher, }; use tangle_nips::{FulfillmentMethod, ListingUnit, parse_relay_auth_event}; use tangle_protocol::{ @@ -510,6 +510,28 @@ impl ReqMessageHandler { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CloseMessageOutcome { + Closed, + NotFound, +} + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct CloseMessageHandler; + +impl CloseMessageHandler { + pub fn handle_close( + &self, + connection: &mut RelayConnection, + subscription_id: &SubscriptionId, + ) -> CloseMessageOutcome { + match connection.subscriptions_mut().close(subscription_id) { + SubscriptionCloseOutcome::Closed => CloseMessageOutcome::Closed, + SubscriptionCloseOutcome::NotFound => CloseMessageOutcome::NotFound, + } + } +} + fn admission_context(connection: &RelayConnection) -> AdmissionContext { connection .auth() @@ -1702,12 +1724,13 @@ fn invalid_parameter(field: &'static str, requirement: &str) -> ApiError { mod tests { use super::{ ApiError, ApiErrorBody, ApiErrorCode, ApiErrorEnvelope, AuthMessageHandler, ClientFrame, - ClientFrameOutcome, ClientMessageLoop, EventMessageHandler, ListingsHttpState, - ReadinessCheckStatus, ReadinessState, RelayConnection, RelayConnectionConfig, - RelayConnectionId, RelayInfoDocument, ReqMessageHandler, TANGLE_RELAY_SOFTWARE, - TANGLE_SUPPORTED_NIPS, WebSocketHttpState, health_router, listing_item_document, - listing_projection_query, listings_router, parse_listing_query, - parse_marketplace_search_query, relay_info_router, search_document_query, websocket_router, + ClientFrameOutcome, ClientMessageLoop, CloseMessageHandler, CloseMessageOutcome, + EventMessageHandler, ListingsHttpState, ReadinessCheckStatus, ReadinessState, + RelayConnection, RelayConnectionConfig, RelayConnectionId, RelayInfoDocument, + ReqMessageHandler, TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS, WebSocketHttpState, + health_router, listing_item_document, listing_projection_query, listings_router, + parse_listing_query, parse_marketplace_search_query, relay_info_router, + search_document_query, websocket_router, }; use axum::{body::Body, response::IntoResponse}; use http::{HeaderValue, Request, StatusCode, header}; @@ -2285,6 +2308,42 @@ mod tests { } #[tokio::test] + async fn close_message_handler_removes_subscriptions() { + let store = runtime_memory_store().await; + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + store + .store_raw_event(&StoredEvent::new( + listing.clone(), + UnixTimestamp::new(1_714_125_300), + )) + .await + .expect("raw event"); + let req_handler = ReqMessageHandler::new(store, NostrFilterCompiler::default()); + let close_handler = CloseMessageHandler; + let mut connection = runtime_connection("close"); + let subscription_id = SubscriptionId::new("sub-close").expect("subscription"); + let filter = filter_from_value(&serde_json::json!({ + "ids": [listing.id().as_str()] + })) + .expect("filter"); + + req_handler + .handle_req(&mut connection, subscription_id.clone(), vec![filter]) + .await; + + assert_eq!(connection.subscriptions().active_count(), 1); + assert_eq!( + close_handler.handle_close(&mut connection, &subscription_id), + CloseMessageOutcome::Closed + ); + assert_eq!(connection.subscriptions().active_count(), 0); + assert_eq!( + close_handler.handle_close(&mut connection, &subscription_id), + CloseMessageOutcome::NotFound + ); + } + + #[tokio::test] async fn api_error_into_response_keeps_public_envelope_shape() { let response = ApiError::not_found("listing not found").into_response(); assert_eq!(response.status(), StatusCode::NOT_FOUND);