commit 0894b5b2d6d16379508dcdfd28ad08e230ae68c9
parent 04f92812fb215f960922d0f49b9dbb38b58d3661
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 01:26:07 -0700
ws: add live event fanout
Diffstat:
1 file changed, 69 insertions(+), 5 deletions(-)
diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs
@@ -532,6 +532,23 @@ impl CloseMessageHandler {
}
}
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub struct LiveEventFanout;
+
+impl LiveEventFanout {
+ pub fn fanout(&self, connection: &RelayConnection, event: &Event) -> Vec<RelayMessage> {
+ connection
+ .subscriptions()
+ .match_event(event)
+ .into_iter()
+ .map(|matched| RelayMessage::Event {
+ subscription_id: matched.subscription_id,
+ event: event.clone(),
+ })
+ .collect()
+ }
+}
+
fn admission_context(connection: &RelayConnection) -> AdmissionContext {
connection
.auth()
@@ -1725,11 +1742,11 @@ mod tests {
use super::{
ApiError, ApiErrorBody, ApiErrorCode, ApiErrorEnvelope, AuthMessageHandler, ClientFrame,
ClientFrameOutcome, ClientMessageLoop, CloseMessageHandler, CloseMessageOutcome,
- EventMessageHandler, ListingsHttpState, ReadinessCheckStatus, ReadinessState,
- RelayConnection, RelayConnectionConfig, RelayConnectionId, RelayInfoDocument,
- ReqMessageHandler, TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS, WebSocketHttpState,
- health_router, listing_item_document, listing_projection_query, listings_router,
- parse_listing_query, parse_marketplace_search_query, relay_info_router,
+ EventMessageHandler, ListingsHttpState, LiveEventFanout, ReadinessCheckStatus,
+ ReadinessState, RelayConnection, RelayConnectionConfig, RelayConnectionId,
+ RelayInfoDocument, ReqMessageHandler, TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS,
+ WebSocketHttpState, health_router, listing_item_document, listing_projection_query,
+ listings_router, parse_listing_query, parse_marketplace_search_query, relay_info_router,
search_document_query, websocket_router,
};
use axum::{body::Body, response::IntoResponse};
@@ -2344,6 +2361,53 @@ mod tests {
}
#[tokio::test]
+ async fn live_event_fanout_delivers_matching_subscription_events() {
+ let store = runtime_memory_store().await;
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let handler = ReqMessageHandler::new(store, NostrFilterCompiler::default());
+ let fanout = LiveEventFanout;
+ let mut connection = runtime_connection("fanout");
+ let matching_id = SubscriptionId::new("sub-matching").expect("matching subscription");
+ let miss_id = SubscriptionId::new("sub-miss").expect("miss subscription");
+ let matching_filter = filter_from_value(&serde_json::json!({
+ "kinds": [30402],
+ "authors": [listing.unsigned().pubkey().as_str()]
+ }))
+ .expect("matching filter");
+ let miss_filter = filter_from_value(&serde_json::json!({
+ "ids": ["cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"]
+ }))
+ .expect("miss filter");
+
+ handler
+ .handle_req(&mut connection, matching_id.clone(), vec![matching_filter])
+ .await;
+ handler
+ .handle_req(&mut connection, miss_id, vec![miss_filter])
+ .await;
+
+ let messages = fanout.fanout(&connection, &listing);
+
+ assert_eq!(messages.len(), 1);
+ assert_eq!(
+ messages,
+ vec![RelayMessage::Event {
+ subscription_id: matching_id,
+ event: listing
+ }]
+ );
+ }
+
+ #[test]
+ fn live_event_fanout_ignores_connections_without_matches() {
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let fanout = LiveEventFanout;
+ let connection = runtime_connection("fanout-empty");
+
+ assert_eq!(fanout.fanout(&connection, &listing), Vec::new());
+ }
+
+ #[tokio::test]
async fn api_error_into_response_keeps_public_envelope_shape() {
let response = ApiError::not_found("listing not found").into_response();
assert_eq!(response.status(), StatusCode::NOT_FOUND);