commit d551db1ace7612728a6b9ce5460cd0bd5a3e478f
parent ef3c6d9b2f56159c31a3e78386674fc32c46b97a
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 01:21:44 -0700
ws: handle req queries
Diffstat:
1 file changed, 269 insertions(+), 16 deletions(-)
diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs
@@ -10,14 +10,17 @@ use axum::{
use core::fmt;
use http::{HeaderMap, HeaderValue, StatusCode, header};
use serde::{Deserialize, Serialize};
+use std::collections::BTreeSet;
use tangle_core::{
AdmissionContext, AdmissionEffect, AuthChallengeState, EventValidator, FixedWindowRateLimiter,
MarketplaceListingStatus, MarketplaceQuery, MarketplaceQueryError, MarketplaceQuerySpec,
- MarketplaceSort, RateLimitConfig, RuntimeLimits, SubscriptionManager, SubscriptionMatcher,
+ MarketplaceSort, NostrFilterCompiler, QueryExecutionMode, RateLimitConfig, RuntimeLimits,
+ SubscriptionManager, SubscriptionMatcher,
};
use tangle_nips::{FulfillmentMethod, ListingUnit, parse_relay_auth_event};
use tangle_protocol::{
- ClientMessage, Event, EventId, PublicKeyHex, RelayMessage, UnixTimestamp, parse_client_message,
+ ClientMessage, Event, EventId, Filter, PublicKeyHex, RawEventJson, RelayMessage,
+ SubscriptionId, UnixTimestamp, parse_client_message, parse_event_json,
};
use tangle_store::{StoreEventOutcome, StoredEvent};
use tangle_store_surreal::{ListingProjectionQuery, SearchDocumentQuery, SurrealStore};
@@ -365,6 +368,148 @@ impl AuthMessageHandler {
}
}
+#[derive(Debug, Clone)]
+pub struct ReqMessageHandler {
+ store: SurrealStore,
+ compiler: NostrFilterCompiler,
+}
+
+impl ReqMessageHandler {
+ pub fn new(store: SurrealStore, compiler: NostrFilterCompiler) -> Self {
+ Self { store, compiler }
+ }
+
+ pub fn store(&self) -> &SurrealStore {
+ &self.store
+ }
+
+ pub fn compiler(&self) -> NostrFilterCompiler {
+ self.compiler
+ }
+
+ pub async fn handle_req(
+ &self,
+ connection: &mut RelayConnection,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ ) -> Vec<RelayMessage> {
+ let plan = match self
+ .compiler
+ .compile(&filters, QueryExecutionMode::HistoricalThenLive)
+ {
+ Ok(plan) => plan,
+ Err(error) => {
+ return vec![RelayMessage::Closed {
+ subscription_id,
+ message: format!("unsupported: {error}"),
+ }];
+ }
+ };
+ if let Err(error) = connection
+ .subscriptions_mut()
+ .subscribe(subscription_id.clone(), plan)
+ {
+ return vec![RelayMessage::Closed {
+ subscription_id,
+ message: format!("error: {error}"),
+ }];
+ }
+ let events = match self.query_historical_events(&filters).await {
+ Ok(events) => events,
+ Err(error) => {
+ return vec![RelayMessage::Closed {
+ subscription_id,
+ message: error.message().to_owned(),
+ }];
+ }
+ };
+ let mut messages = events
+ .into_iter()
+ .map(|event| RelayMessage::Event {
+ subscription_id: subscription_id.clone(),
+ event,
+ })
+ .collect::<Vec<_>>();
+ messages.push(RelayMessage::Eose(subscription_id));
+ messages
+ }
+
+ async fn query_historical_events(&self, filters: &[Filter]) -> Result<Vec<Event>, ApiError> {
+ let mut seen = BTreeSet::new();
+ let mut events = Vec::new();
+ for filter in filters {
+ for event in self.query_single_filter_events(filter).await? {
+ if seen.insert(event.id().clone()) {
+ events.push(event);
+ }
+ }
+ }
+ Ok(events)
+ }
+
+ async fn query_single_filter_events(&self, filter: &Filter) -> Result<Vec<Event>, ApiError> {
+ let rows = if filter.search().is_some() {
+ self.query_search_filter_rows(filter).await?
+ } else if filter.ids().is_empty()
+ && filter
+ .kinds()
+ .iter()
+ .any(|kind| kind.is_replaceable() || kind.is_addressable())
+ {
+ self.store
+ .query_current_events(filter)
+ .await
+ .map_err(|_| ApiError::internal())?
+ } else {
+ self.store
+ .query_raw_events(filter)
+ .await
+ .map_err(|_| ApiError::internal())?
+ };
+ rows.iter().map(event_from_store_row).collect()
+ }
+
+ async fn query_search_filter_rows(
+ &self,
+ filter: &Filter,
+ ) -> Result<Vec<serde_json::Value>, ApiError> {
+ let mut query = SearchDocumentQuery::new()
+ .with_doc_type("listing")
+ .with_visible(true);
+ if let Some(search) = filter.search() {
+ query = query.with_text(search);
+ }
+ if filter.kinds().len() == 1 {
+ query = query.with_kind(filter.kinds()[0].as_u32());
+ }
+ if filter.authors().len() == 1 {
+ query = query.with_pubkey(filter.authors()[0].as_str());
+ }
+ if let Some(limit) = filter.limit() {
+ query = query.with_limit(limit);
+ }
+ let docs = self
+ .store
+ .query_search_documents(&query)
+ .await
+ .map_err(|_| ApiError::internal())?;
+ let mut rows = Vec::new();
+ for doc in docs {
+ let event_id =
+ EventId::new(&string_field(&doc, "event_id")?).map_err(|_| ApiError::internal())?;
+ if let Some(row) = self
+ .store
+ .raw_event_row(&event_id)
+ .await
+ .map_err(|_| ApiError::internal())?
+ {
+ rows.push(row);
+ }
+ }
+ Ok(rows)
+ }
+}
+
fn admission_context(connection: &RelayConnection) -> AdmissionContext {
connection
.auth()
@@ -390,6 +535,12 @@ fn ok_rejected(event_id: EventId, message: String) -> RelayMessage {
}
}
+fn event_from_store_row(row: &serde_json::Value) -> Result<Event, ApiError> {
+ let raw =
+ RawEventJson::new(&string_field(row, "raw_json")?).map_err(|_| ApiError::internal())?;
+ parse_event_json(&raw).map_err(|_| ApiError::internal())
+}
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApiErrorCode {
InvalidRequest,
@@ -1553,19 +1704,22 @@ mod tests {
ApiError, ApiErrorBody, ApiErrorCode, ApiErrorEnvelope, AuthMessageHandler, ClientFrame,
ClientFrameOutcome, ClientMessageLoop, EventMessageHandler, ListingsHttpState,
ReadinessCheckStatus, ReadinessState, RelayConnection, RelayConnectionConfig,
- RelayConnectionId, RelayInfoDocument, TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS,
- WebSocketHttpState, health_router, listing_item_document, listing_projection_query,
- listings_router, parse_listing_query, parse_marketplace_search_query, relay_info_router,
- search_document_query, websocket_router,
+ RelayConnectionId, RelayInfoDocument, ReqMessageHandler, TANGLE_RELAY_SOFTWARE,
+ TANGLE_SUPPORTED_NIPS, WebSocketHttpState, health_router, listing_item_document,
+ listing_projection_query, listings_router, parse_listing_query,
+ parse_marketplace_search_query, relay_info_router, search_document_query, websocket_router,
};
use axum::{body::Body, response::IntoResponse};
use http::{HeaderValue, Request, StatusCode, header};
use tangle_core::{
AdmissionPolicy, EventValidator, MarketplaceListingStatus, MarketplaceSort,
- RateLimitConfig, RuntimeLimits,
+ NostrFilterCompiler, RateLimitConfig, RuntimeLimits,
};
use tangle_nips::{FulfillmentMethod, ListingUnit, parse_relay_auth_event};
- use tangle_protocol::{ClientMessage, RelayMessage, UnixTimestamp, event_to_value};
+ use tangle_protocol::{
+ ClientMessage, RelayMessage, SubscriptionId, UnixTimestamp, event_to_value,
+ filter_from_value,
+ };
use tangle_store::StoredEvent;
use tangle_store_surreal::{SurrealConnectionConfig, SurrealStore, base_migration_plan};
use tangle_test_support::{auth_event_spec, build_fixture_event, valid_public_listing_spec};
@@ -2033,6 +2187,104 @@ mod tests {
}
#[tokio::test]
+ async fn req_message_handler_returns_raw_events_and_eose() {
+ let store = runtime_memory_store().await;
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ store
+ .store_raw_event(&StoredEvent::new(
+ listing.clone(),
+ UnixTimestamp::new(1_714_125_300),
+ ))
+ .await
+ .expect("raw event");
+ let handler = ReqMessageHandler::new(store, NostrFilterCompiler::default());
+ let mut connection = runtime_connection("req-raw");
+ let subscription_id = SubscriptionId::new("sub-raw").expect("subscription");
+ let filter = filter_from_value(&serde_json::json!({
+ "ids": [listing.id().as_str()],
+ "limit": 10
+ }))
+ .expect("filter");
+
+ let messages = handler
+ .handle_req(&mut connection, subscription_id.clone(), vec![filter])
+ .await;
+
+ assert_eq!(messages.len(), 2);
+ match &messages[0] {
+ RelayMessage::Event {
+ subscription_id: id,
+ event,
+ } => {
+ assert_eq!(id, &subscription_id);
+ assert_eq!(event.id(), listing.id());
+ }
+ outcome => panic!("unexpected outcome: {outcome:?}"),
+ }
+ assert_eq!(messages[1], RelayMessage::Eose(subscription_id.clone()));
+ assert!(connection.subscriptions().plan(&subscription_id).is_some());
+ assert_eq!(handler.compiler(), NostrFilterCompiler::default());
+ }
+
+ #[tokio::test]
+ async fn req_message_handler_hydrates_search_results_and_closes_bad_requests() {
+ let store = runtime_memory_store().await;
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ store
+ .store_raw_event(&StoredEvent::new(
+ listing.clone(),
+ UnixTimestamp::new(1_714_125_300),
+ ))
+ .await
+ .expect("raw event");
+ store
+ .index_listing_search_document(&listing)
+ .await
+ .expect("search");
+ let handler = ReqMessageHandler::new(store.clone(), NostrFilterCompiler::default());
+ let mut connection = runtime_connection("req-search");
+ let search_id = SubscriptionId::new("sub-search").expect("subscription");
+ let search_filter = filter_from_value(&serde_json::json!({
+ "search": "carrot",
+ "kinds": [30402],
+ "authors": [listing.unsigned().pubkey().as_str()],
+ "limit": 5
+ }))
+ .expect("filter");
+
+ let messages = handler
+ .handle_req(&mut connection, search_id.clone(), vec![search_filter])
+ .await;
+
+ assert_eq!(messages.len(), 2);
+ match &messages[0] {
+ RelayMessage::Event { event, .. } => assert_eq!(event.id(), listing.id()),
+ outcome => panic!("unexpected outcome: {outcome:?}"),
+ }
+ assert_eq!(messages[1], RelayMessage::Eose(search_id));
+ let bad_id = SubscriptionId::new("sub-bad").expect("subscription");
+ let bad = handler
+ .handle_req(&mut connection, bad_id.clone(), Vec::new())
+ .await;
+ assert_eq!(
+ bad,
+ vec![RelayMessage::Closed {
+ subscription_id: bad_id,
+ message: "unsupported: query plan: query plan must include at least one branch"
+ .to_owned()
+ }]
+ );
+ assert!(
+ handler
+ .store()
+ .raw_event_row(listing.id())
+ .await
+ .expect("raw")
+ .is_some()
+ );
+ }
+
+ #[tokio::test]
async fn api_error_into_response_keeps_public_envelope_shape() {
let response = ApiError::not_found("listing not found").into_response();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
@@ -3071,19 +3323,20 @@ mod tests {
}
fn runtime_client_message_loop() -> ClientMessageLoop {
- let mut connection = RelayConnection::new(
- RelayConnectionId::new("client-loop").expect("connection id"),
- RelayConnectionConfig::default(),
- );
+ let mut connection = runtime_connection("client-loop");
connection.set_remote_addr("127.0.0.1:7777");
ClientMessageLoop::new(connection)
}
- fn authenticated_connection() -> RelayConnection {
- let mut connection = RelayConnection::new(
- RelayConnectionId::new("authenticated").expect("connection id"),
+ fn runtime_connection(id: &str) -> RelayConnection {
+ RelayConnection::new(
+ RelayConnectionId::new(id).expect("connection id"),
RelayConnectionConfig::default(),
- );
+ )
+ }
+
+ fn authenticated_connection() -> RelayConnection {
+ let mut connection = runtime_connection("authenticated");
let auth = build_fixture_event(&auth_event_spec()).expect("auth");
connection
.auth_mut()