tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit ee4e7d1ba5a4eafe27fe6f230f3ff13a992d45a1
parent 6ac248f91364861379dc18f458afaf63aefc2db3
Author: triesap <tyson@radroots.org>
Date:   Mon, 15 Jun 2026 20:08:50 -0700

runtime: query reqs with Pocket filters

- Validate and execute REQ filters as Pocket-owned filters through the shared query path.
- Move REQ rate-limit classification and live subscription decisions onto Pocket filters.
- Preserve protocol conversion only at protocol helper and COUNT/HLL boundaries.
- Validate the req, complete, redacted, workspace check, source scan, and clippy lanes.

Diffstat:
Mcrates/tangle_runtime/src/relay/core.rs | 281+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
Mcrates/tangle_runtime/src/runtime.rs | 270++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
Mcrates/tangle_runtime/src/session.rs | 52++++++++++++++++++++++------------------------------
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 7+++++--
4 files changed, 514 insertions(+), 96 deletions(-)

diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs @@ -32,8 +32,8 @@ use tangle_groups::{ use tangle_protocol::ClientMessage; use tangle_protocol::{Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp}; use tangle_store_pocket::{ - PocketEvent, PocketHll8, PocketOwnedEvent, PocketQueryConfig, PocketScreenResult, - PocketStoreConfig, PocketStoreHandle, + PocketEvent, PocketFilter, PocketHll8, PocketOwnedEvent, PocketOwnedFilter, PocketQueryConfig, + PocketScreenResult, PocketStoreConfig, PocketStoreHandle, }; pub(crate) const NEGENTROPY_DISABLED_MESSAGE: &str = "blocked: Negentropy sync is disabled"; @@ -84,6 +84,36 @@ pub(crate) struct BaseRelayQueryReport { query_metrics: BaseRelayQueryMetrics, } +pub(crate) struct BaseRelayReqQuery<'a> { + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + search_present: bool, + auth: &'a BaseAuthState, +} + +impl<'a> BaseRelayReqQuery<'a> { + pub(crate) fn new( + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + search_present: bool, + auth: &'a BaseAuthState, + ) -> Self { + Self { + subscription_id, + filters, + search_present, + auth, + } + } +} + +struct BaseRelayGroupReqQuery<'a> { + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + search_present: bool, + auth: &'a GroupAuthContext, +} + impl BaseRelayQueryReport { fn new( messages: Vec<RuntimeRelayMessage>, @@ -489,10 +519,57 @@ impl BaseRelayLimits { Ok(()) } + pub(crate) fn validate_pocket_filters( + &self, + filters: &[PocketOwnedFilter], + ) -> Result<(), BaseRelayError> { + if filters.is_empty() { + return Err(BaseRelayError::invalid( + "request must include at least one filter", + )); + } + if filters.len() > self.max_filters_per_request { + return Err(BaseRelayError::invalid(format!( + "filter count exceeds runtime max_filters_per_request {}", + self.max_filters_per_request + ))); + } + for filter in filters { + let tag_values = filter + .tags() + .map_err(|error| BaseRelayError::error(error.to_string()))? + .iter() + .map(|tag| tag.skip(1).count()) + .sum::<usize>(); + if tag_values > self.max_tag_values_per_filter { + return Err(BaseRelayError::invalid(format!( + "filter tag value count exceeds runtime max_tag_values_per_filter {}", + self.max_tag_values_per_filter + ))); + } + if filter.limit() != u32::MAX && u64::from(filter.limit()) > self.max_limit { + return Err(BaseRelayError::invalid(format!( + "filter limit exceeds runtime max_limit {}", + self.max_limit + ))); + } + } + self.validate_pocket_query_complexity(filters)?; + Ok(()) + } + fn effective_filter_limit(self, filter: &Filter) -> usize { usize::try_from(filter.limit().unwrap_or(self.default_limit)).unwrap_or(usize::MAX) } + fn effective_pocket_filter_limit(self, filter: &PocketFilter) -> usize { + if filter.limit() == u32::MAX { + usize::try_from(self.default_limit).unwrap_or(usize::MAX) + } else { + usize::try_from(filter.limit()).unwrap_or(usize::MAX) + } + } + fn validate_query_complexity(&self, filters: &[Filter]) -> Result<(), BaseRelayError> { let score = filters .iter() @@ -507,6 +584,23 @@ impl BaseRelayLimits { Ok(()) } + fn validate_pocket_query_complexity( + &self, + filters: &[PocketOwnedFilter], + ) -> Result<(), BaseRelayError> { + let score = filters + .iter() + .map(|filter| self.pocket_filter_complexity(filter)) + .fold(0_usize, usize::saturating_add); + if score > self.max_query_complexity { + return Err(BaseRelayError::invalid(format!( + "query complexity {score} exceeds runtime max_query_complexity {}", + self.max_query_complexity + ))); + } + Ok(()) + } + fn filter_complexity(&self, filter: &Filter) -> usize { let tag_score = filter .tag_filters() @@ -523,6 +617,29 @@ impl BaseRelayLimits { .saturating_add(filter.search().map(str::len).unwrap_or(0)) .saturating_add(self.effective_filter_limit(filter)) } + + fn pocket_filter_complexity(&self, filter: &PocketFilter) -> usize { + let tag_score = filter + .tags() + .map(|tags| { + tags.iter() + .map(|tag| 1_usize.saturating_add(tag.skip(1).count())) + .fold(0_usize, usize::saturating_add) + }) + .unwrap_or(usize::MAX); + 1_usize + .saturating_add(filter.num_ids()) + .saturating_add(filter.num_authors()) + .saturating_add(filter.num_kinds()) + .saturating_add(tag_score) + .saturating_add(usize::from( + filter.since() != tangle_store_pocket::PocketTime::min(), + )) + .saturating_add(usize::from( + filter.until() != tangle_store_pocket::PocketTime::max(), + )) + .saturating_add(self.effective_pocket_filter_limit(filter)) + } } impl BaseRelay { @@ -539,6 +656,16 @@ impl BaseRelay { }) } + pub(crate) fn unsupported_search_present_closed( + subscription_id: &SubscriptionId, + search_present: bool, + ) -> Option<RelayMessage> { + search_present.then(|| RelayMessage::Closed { + subscription_id: subscription_id.clone(), + message: "unsupported: search filters are not supported".to_owned(), + }) + } + fn redacted_req_closed( subscription_id: SubscriptionId, auth: &GroupAuthContext, @@ -650,18 +777,21 @@ impl BaseRelay { groups: Option<&GroupServiceHandle>, limits: BaseRelayLimits, query: PocketQueryConfig, - subscription_id: SubscriptionId, - filters: Vec<Filter>, - auth: &BaseAuthState, + request: BaseRelayReqQuery<'_>, ) -> Result<BaseRelayQueryReport, BaseRelayError> { + let group_auth = + GroupAuthContext::new(request.auth.authenticated_pubkeys().iter().cloned()); Self::query_req_with_group_auth_shared_services( store, groups, limits, query, - subscription_id, - filters, - &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), + BaseRelayGroupReqQuery { + subscription_id: request.subscription_id, + filters: request.filters, + search_present: request.search_present, + auth: &group_auth, + }, ) } @@ -1201,38 +1331,60 @@ impl BaseRelay { filters: Vec<Filter>, auth: &GroupAuthContext, ) -> Result<BaseRelayQueryReport, BaseRelayError> { + let search_present = filters.iter().any(|filter| filter.search().is_some()); + let filters = filters + .iter() + .map(tangle_filter_to_pocket) + .collect::<Result<Vec<_>, _>>()?; + self.handle_pocket_req_with_group_auth_report( + subscription_id, + filters, + search_present, + auth, + ) + } + + fn handle_pocket_req_with_group_auth_report( + &mut self, + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + search_present: bool, + auth: &GroupAuthContext, + ) -> Result<BaseRelayQueryReport, BaseRelayError> { self.limits.validate_subscription_id(&subscription_id)?; - self.limits.validate_filters(&filters)?; - if let Some(message) = Self::unsupported_search_closed(&subscription_id, &filters) { + self.limits.validate_pocket_filters(&filters)?; + if let Some(message) = + Self::unsupported_search_present_closed(&subscription_id, search_present) + { return Ok(BaseRelayQueryReport::new( vec![message.into()], false, BaseRelayQueryMetrics::default(), )); } - let should_subscribe = !filters_are_complete(&filters); + let should_subscribe = !pocket_filters_are_complete(&filters); if should_subscribe { - let pocket_filters = filters - .iter() - .map(tangle_filter_to_pocket) - .collect::<Result<Vec<_>, _>>()?; self.subscriptions - .ensure_can_subscribe(&subscription_id, &pocket_filters)?; - let report = - self.query_req_with_group_auth_report(subscription_id.clone(), filters, auth)?; + .ensure_can_subscribe(&subscription_id, &filters)?; + let report = self.query_req_with_group_auth_report( + subscription_id.clone(), + filters.clone(), + false, + auth, + )?; if !report.group_read_denied() { - self.subscriptions - .subscribe(subscription_id, pocket_filters)?; + self.subscriptions.subscribe(subscription_id, filters)?; } return Ok(report); } - self.query_req_with_group_auth_report(subscription_id, filters, auth) + self.query_req_with_group_auth_report(subscription_id, filters, false, auth) } fn query_req_with_group_auth_report( &self, subscription_id: SubscriptionId, - filters: Vec<Filter>, + filters: Vec<PocketOwnedFilter>, + search_present: bool, auth: &GroupAuthContext, ) -> Result<BaseRelayQueryReport, BaseRelayError> { Self::query_req_with_group_auth_shared_services( @@ -1240,9 +1392,12 @@ impl BaseRelay { self.groups.as_ref(), self.limits, self.query, - subscription_id, - filters, - auth, + BaseRelayGroupReqQuery { + subscription_id, + filters, + search_present, + auth, + }, ) } @@ -1251,13 +1406,19 @@ impl BaseRelay { groups: Option<&GroupServiceHandle>, limits: BaseRelayLimits, query: PocketQueryConfig, - subscription_id: SubscriptionId, - filters: Vec<Filter>, - auth: &GroupAuthContext, + request: BaseRelayGroupReqQuery<'_>, ) -> Result<BaseRelayQueryReport, BaseRelayError> { + let BaseRelayGroupReqQuery { + subscription_id, + filters, + search_present, + auth, + } = request; limits.validate_subscription_id(&subscription_id)?; - limits.validate_filters(&filters)?; - if let Some(message) = Self::unsupported_search_closed(&subscription_id, &filters) { + limits.validate_pocket_filters(&filters)?; + if let Some(message) = + Self::unsupported_search_present_closed(&subscription_id, search_present) + { return Ok(BaseRelayQueryReport::new( vec![message.into()], false, @@ -1439,7 +1600,12 @@ impl BaseRelay { filters: &[Filter], auth: &GroupAuthContext, ) -> Result<Vec<Event>, BaseRelayError> { - self.query_events_report(filters, auth).and_then(|report| { + self.limits.validate_filters(filters)?; + let filters = filters + .iter() + .map(tangle_filter_to_pocket) + .collect::<Result<Vec<_>, _>>()?; + self.query_events_report(&filters, auth).and_then(|report| { report .events .into_iter() @@ -1450,7 +1616,7 @@ impl BaseRelay { fn query_events_report( &self, - filters: &[Filter], + filters: &[PocketOwnedFilter], auth: &GroupAuthContext, ) -> Result<BaseRelayEventQueryReport, BaseRelayError> { Self::query_events_report_with_services( @@ -1468,7 +1634,7 @@ impl BaseRelay { groups: Option<&GroupServiceHandle>, limits: BaseRelayLimits, query: PocketQueryConfig, - filters: &[Filter], + filters: &[PocketOwnedFilter], auth: &GroupAuthContext, ) -> Result<BaseRelayEventQueryReport, BaseRelayError> { let mut output = Vec::new(); @@ -1487,7 +1653,7 @@ impl BaseRelay { group_read_denied |= report.group_read_denied; query_metrics = query_metrics.add(report.query_metrics); let mut events = Self::sort_and_dedupe_query_events(report.events); - events.truncate(limits.effective_filter_limit(filter)); + events.truncate(limits.effective_pocket_filter_limit(filter)); output.extend(events); } let events = Self::sort_and_dedupe_query_events(output); @@ -1515,6 +1681,7 @@ impl BaseRelay { let mut hll = hll_offset.map(|_| PocketHll8::new()); for filter in filters { let filter = filter.without_limit(); + let filter = tangle_filter_to_pocket(&filter)?; let report = Self::query_filter_events_report_with_services( store, groups, @@ -1563,12 +1730,11 @@ impl BaseRelay { groups: Option<&GroupServiceHandle>, limits: BaseRelayLimits, query: PocketQueryConfig, - filter: &Filter, + filter: &PocketFilter, auth: &GroupAuthContext, limit_mode: BaseRelayFilterLimitMode, ) -> Result<BaseRelayEventQueryReport, BaseRelayError> { - let effective_filter = Self::filter_with_limit_mode(limits, filter, limit_mode); - let pocket_filter = tangle_filter_to_pocket(&effective_filter)?; + let pocket_filter = Self::pocket_filter_with_limit_mode(limits, filter, limit_mode)?; let screen_error = RefCell::new(None); let candidates_scanned = Cell::new(0_u64); let redacted_events = Cell::new(0_u64); @@ -1610,17 +1776,38 @@ impl BaseRelay { )) } - fn filter_with_limit_mode( + fn pocket_filter_with_limit_mode( limits: BaseRelayLimits, - filter: &Filter, + filter: &PocketFilter, limit_mode: BaseRelayFilterLimitMode, - ) -> Filter { - match (limit_mode, filter.limit()) { - (BaseRelayFilterLimitMode::ApplyDefaultLimit, None) => { - filter.with_limit(limits.default_limit()) + ) -> Result<PocketOwnedFilter, BaseRelayError> { + let limit = match (limit_mode, filter.limit()) { + (BaseRelayFilterLimitMode::ApplyDefaultLimit, u32::MAX) => { + u32::try_from(limits.default_limit) + .map_err(|_| BaseRelayError::invalid("default filter limit exceeds u32"))? } - _ => filter.clone(), - } + (_, limit) => limit, + }; + let ids = filter.ids().collect::<Vec<_>>(); + let authors = filter.authors().collect::<Vec<_>>(); + let kinds = filter.kinds().collect::<Vec<_>>(); + let since = + (filter.since() != tangle_store_pocket::PocketTime::min()).then(|| filter.since()); + let until = + (filter.until() != tangle_store_pocket::PocketTime::max()).then(|| filter.until()); + let limit = (limit != u32::MAX).then_some(limit); + PocketOwnedFilter::new( + &ids, + &authors, + &kinds, + filter + .tags() + .map_err(|error| BaseRelayError::error(error.to_string()))?, + since, + until, + limit, + ) + .map_err(|error| BaseRelayError::error(error.to_string())) } fn sort_and_dedupe_query_events(mut events: Vec<PocketOwnedEvent>) -> Vec<PocketOwnedEvent> { @@ -1654,8 +1841,8 @@ impl BaseRelay { } } -fn filters_are_complete(filters: &[Filter]) -> bool { - !filters.is_empty() && filters.iter().all(Filter::is_complete) +fn pocket_filters_are_complete(filters: &[PocketOwnedFilter]) -> bool { + !filters.is_empty() && filters.iter().all(|filter| filter.completes()) } #[cfg(test)] diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -18,7 +18,8 @@ use crate::{ auth::BaseAuthState, core::{ BaseRelay, BaseRelayCountReport, BaseRelayEventWrite, BaseRelayLimits, - BaseRelayQueryMetrics, BaseRelayQueryReport, BaseRelayShutdownReport, + BaseRelayQueryMetrics, BaseRelayQueryReport, BaseRelayReqQuery, + BaseRelayShutdownReport, }, live::LiveSubscriptionSet, outbound::{RuntimeRelayMessage, protocol_messages}, @@ -43,7 +44,9 @@ use tangle_groups::{ use tangle_protocol::{ EventId, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, UnixTimestamp, }; -use tangle_store_pocket::{PocketEvent, PocketOwnedEvent, PocketOwnedFilter, PocketStoreHandle}; +use tangle_store_pocket::{ + PocketEvent, PocketFilter, PocketOwnedEvent, PocketOwnedFilter, PocketStoreHandle, PocketTime, +}; use tokio::sync::watch; pub struct TangleRuntime { @@ -83,6 +86,17 @@ struct TangleQueryRateLimitRequest<'a> { now: UnixTimestamp, } +struct TanglePocketQueryRateLimitRequest<'a> { + scope: TangleRateLimitScope, + rules: TangleQueryRateLimitConfig, + label: &'static str, + subscription_id: &'a SubscriptionId, + filters: &'a [PocketOwnedFilter], + auth: &'a BaseAuthState, + context: TangleClientRateLimitContext, + now: UnixTimestamp, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum TangleQueryClassification { Bounded, @@ -138,6 +152,17 @@ impl TangleQueryClassifier { self.classify_filters(filters, Self::classify_count_filter) } + fn classify_pocket_query(self, filters: &[PocketOwnedFilter]) -> TangleQueryClassification { + if filters.is_empty() { + return TangleQueryClassification::Broad(TangleBroadQueryReason::EmptyFilters); + } + filters + .iter() + .map(|filter| self.classify_pocket_query_filter(filter)) + .find(|classification| classification.is_broad()) + .unwrap_or(TangleQueryClassification::Bounded) + } + fn classify_filters( self, filters: &[Filter], @@ -188,6 +213,21 @@ impl TangleQueryClassifier { TangleQueryClassification::Bounded } + fn classify_pocket_query_filter(self, filter: &PocketFilter) -> TangleQueryClassification { + if !self.has_pocket_primary_constraint(filter) { + return TangleQueryClassification::Broad( + TangleBroadQueryReason::MissingPrimaryConstraint, + ); + } + if self.has_pocket_high_limit(filter) { + return TangleQueryClassification::Broad(TangleBroadQueryReason::HighLimit); + } + if self.has_pocket_broad_time_window(filter) && !self.has_pocket_strong_constraint(filter) { + return TangleQueryClassification::Broad(TangleBroadQueryReason::BroadTimeWindow); + } + TangleQueryClassification::Bounded + } + fn has_primary_constraint(self, filter: &Filter) -> bool { !filter.ids().is_empty() || !filter.authors().is_empty() @@ -253,6 +293,50 @@ impl TangleQueryClassifier { _ => false, } } + + fn has_pocket_primary_constraint(self, filter: &PocketFilter) -> bool { + filter.num_ids() > 0 + || filter.num_authors() > 0 + || filter.num_kinds() > 0 + || self.has_pocket_group_constraint(filter) + } + + fn has_pocket_strong_constraint(self, filter: &PocketFilter) -> bool { + filter.num_ids() > 0 || filter.num_authors() > 0 || self.has_pocket_group_constraint(filter) + } + + fn has_pocket_group_constraint(self, filter: &PocketFilter) -> bool { + filter + .tags() + .map(|tags| { + tags.iter().any(|mut tag| { + let name = tag.next(); + let has_value = tag.next().is_some(); + matches!(name, Some(value) if matches!(value, b"h" | b"d")) && has_value + }) + }) + .unwrap_or(false) + } + + fn has_pocket_high_limit(self, filter: &PocketFilter) -> bool { + let limit = if filter.limit() == u32::MAX { + self.limits.default_limit() + } else { + u64::from(filter.limit()) + }; + limit >= self.limits.max_limit() + } + + fn has_pocket_broad_time_window(self, filter: &PocketFilter) -> bool { + if filter.since() == PocketTime::min() || filter.until() == PocketTime::max() { + return false; + } + filter + .until() + .as_ref() + .saturating_sub(*filter.since().as_ref()) + > BROAD_QUERY_TIME_WINDOW_SECONDS + } } impl TangleRuntime { @@ -573,7 +657,8 @@ impl TangleRuntimeShared { fn query_req_with_auth_report( &self, subscription_id: SubscriptionId, - filters: Vec<Filter>, + filters: Vec<PocketOwnedFilter>, + search_present: bool, auth: &BaseAuthState, ) -> Result<BaseRelayQueryReport, BaseRelayError> { BaseRelay::query_req_with_shared_services( @@ -581,9 +666,7 @@ impl TangleRuntimeShared { self.groups.as_ref(), self.limits.base_relay_limits(), self.config.pocket_query_config(), - subscription_id, - filters, - auth, + BaseRelayReqQuery::new(subscription_id, filters, search_present, auth), ) } @@ -624,6 +707,26 @@ impl TangleRuntimeShared { }) } + fn rate_limit_req_pocket( + &self, + subscription_id: &SubscriptionId, + filters: &[PocketOwnedFilter], + auth: &BaseAuthState, + context: TangleClientRateLimitContext, + now: UnixTimestamp, + ) -> Option<RelayMessage> { + self.rate_limit_pocket_query(TanglePocketQueryRateLimitRequest { + scope: TangleRateLimitScope::Req, + rules: self.config.rate_limits().req(), + label: "req", + subscription_id, + filters, + auth, + context, + now, + }) + } + fn rate_limit_count( &self, subscription_id: &SubscriptionId, @@ -743,6 +846,88 @@ impl TangleRuntimeShared { None } + fn rate_limit_pocket_query( + &self, + request: TanglePocketQueryRateLimitRequest<'_>, + ) -> Option<RelayMessage> { + if let Some(peer_ip) = request.context.peer_ip + && let Some(message) = self.rate_limit_closed( + request.subscription_id, + TangleRateLimitKey::ip(request.scope, peer_ip), + request.rules.per_ip(), + request.label, + "ip", + request.now, + ) + { + return Some(message); + } + if let Some(connection_id) = request.context.connection_id + && let Some(message) = self.rate_limit_closed( + request.subscription_id, + TangleRateLimitKey::connection(request.scope, connection_id), + request.rules.per_connection(), + request.label, + "connection", + request.now, + ) + { + return Some(message); + } + for pubkey in request.auth.authenticated_pubkeys() { + if let Some(message) = self.rate_limit_closed( + request.subscription_id, + TangleRateLimitKey::pubkey(request.scope, pubkey.clone()), + request.rules.per_pubkey(), + request.label, + "pubkey", + request.now, + ) { + return Some(message); + } + } + for group_id in pocket_filter_group_ids(request.filters) { + if let Some(message) = self.rate_limit_closed( + request.subscription_id, + TangleRateLimitKey::group(request.scope, group_id), + request.rules.per_group(), + request.label, + "group", + request.now, + ) { + return Some(message); + } + } + for kind in pocket_filter_kinds(request.filters) { + if let Some(message) = self.rate_limit_closed( + request.subscription_id, + TangleRateLimitKey::kind(request.scope, kind), + request.rules.per_kind(), + request.label, + "kind", + request.now, + ) { + return Some(message); + } + } + let query_classification = TangleQueryClassifier::new(self.limits.base_relay_limits()) + .classify_pocket_query(request.filters); + if query_classification.is_broad() + && let Some(message) = self.rate_limit_closed( + request.subscription_id, + TangleRateLimitKey::query_class(request.scope, TangleRateLimitQueryClass::Broad), + request.rules.broad(), + request.label, + "broad", + request.now, + ) + { + self.metrics.record_broad_query_rejection(); + return Some(message); + } + None + } + fn rate_limit_closed( &self, subscription_id: &SubscriptionId, @@ -956,7 +1141,6 @@ impl TangleRuntimeHandle { filters, search_present, } => { - let filters = runtime_filters_to_protocol(filters, search_present)?; let started_at = Instant::now(); self.inner .limits @@ -965,16 +1149,16 @@ impl TangleRuntimeHandle { self.inner .limits .base_relay_limits() - .validate_filters(&filters)?; + .validate_pocket_filters(&filters)?; if let Some(message) = - BaseRelay::unsupported_search_closed(&subscription_id, &filters) + BaseRelay::unsupported_search_present_closed(&subscription_id, search_present) { self.inner .metrics .record_query_latency(elapsed_micros(started_at)); return Ok(vec![message.into()]); } - if let Some(message) = self.inner.rate_limit_req( + if let Some(message) = self.inner.rate_limit_req_pocket( &subscription_id, &filters, auth, @@ -986,9 +1170,12 @@ impl TangleRuntimeHandle { .record_query_latency(elapsed_micros(started_at)); return Ok(vec![message.into()]); } - let report = - self.inner - .query_req_with_auth_report(subscription_id, filters, auth)?; + let report = self.inner.query_req_with_auth_report( + subscription_id, + filters, + search_present, + auth, + )?; self.inner .metrics .record_query_metrics(report.query_metrics()); @@ -1154,16 +1341,32 @@ impl TangleRuntimeHandle { .rate_limit_req(subscription_id, filters, auth, rate_limit_context, now) } + pub(crate) async fn rate_limit_req_pocket( + &self, + subscription_id: &SubscriptionId, + filters: &[PocketOwnedFilter], + auth: &BaseAuthState, + rate_limit_context: TangleClientRateLimitContext, + now: UnixTimestamp, + ) -> Option<RelayMessage> { + self.inner + .rate_limit_req_pocket(subscription_id, filters, auth, rate_limit_context, now) + } + pub(crate) async fn query_req_with_auth_report( &self, subscription_id: SubscriptionId, - filters: Vec<Filter>, + filters: Vec<PocketOwnedFilter>, + search_present: bool, auth: &BaseAuthState, ) -> Result<BaseRelayQueryReport, BaseRelayError> { let started_at = Instant::now(); - let report = self - .inner - .query_req_with_auth_report(subscription_id, filters, auth)?; + let report = self.inner.query_req_with_auth_report( + subscription_id, + filters, + search_present, + auth, + )?; if report.group_read_denied() { self.inner.metrics.record_group_read_denial(); } @@ -1382,6 +1585,39 @@ fn filter_kinds(filters: &[Filter]) -> Vec<Kind> { .collect() } +fn pocket_filter_group_ids(filters: &[PocketOwnedFilter]) -> Vec<GroupId> { + let mut group_ids = BTreeSet::new(); + for filter in filters { + let Ok(tags) = filter.tags() else { + continue; + }; + for mut tag in tags.iter() { + let name = tag.next(); + if !matches!(name, Some(value) if matches!(value, b"h" | b"d")) { + continue; + } + for value in tag { + if let Ok(value) = std::str::from_utf8(value) + && let Ok(group_id) = GroupId::new(value) + { + group_ids.insert(group_id); + } + } + } + } + group_ids.into_iter().collect() +} + +fn pocket_filter_kinds(filters: &[PocketOwnedFilter]) -> Vec<Kind> { + filters + .iter() + .flat_map(|filter| filter.kinds()) + .filter_map(|kind| Kind::new(u64::from(kind.as_u16())).ok()) + .collect::<BTreeSet<_>>() + .into_iter() + .collect() +} + impl fmt::Debug for TangleRuntimeHandle { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { formatter.write_str("TangleRuntimeHandle") diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -5,7 +5,6 @@ use crate::{ errors::BaseRelayError, event_bus::{TangleEventReceiveError, TangleEventReceiver}, logging, - pocket_conversion::pocket_filter_to_tangle, relay::{ auth::{BaseAuthState, generate_auth_challenge}, core::BaseRelay, @@ -23,7 +22,7 @@ use std::{ sync::atomic::{AtomicU64, Ordering}, time::{Instant, SystemTime, UNIX_EPOCH}, }; -use tangle_protocol::{Filter, RelayMessage, SubscriptionId, UnixTimestamp}; +use tangle_protocol::{RelayMessage, SubscriptionId, UnixTimestamp}; use tangle_store_pocket::PocketOwnedFilter; use tokio::sync::{mpsc, watch}; @@ -279,9 +278,7 @@ impl TangleWebSocketSession { filters, search_present, } => { - let protocol_filters = - runtime_filters_to_protocol(filters.clone(), search_present)?; - self.handle_req(subscription_id, protocol_filters, filters) + self.handle_req(subscription_id, filters, search_present) .await } RuntimeClientMessage::Count { @@ -331,21 +328,25 @@ impl TangleWebSocketSession { async fn handle_req( &mut self, subscription_id: SubscriptionId, - filters: Vec<Filter>, - pocket_filters: Vec<PocketOwnedFilter>, + filters: Vec<PocketOwnedFilter>, + search_present: bool, ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { let metrics = self.runtime.metrics(); metrics.record_client_message(TangleClientMessageMetricKind::Req); self.limits .base_relay_limits() .validate_subscription_id(&subscription_id)?; - self.limits.base_relay_limits().validate_filters(&filters)?; - if let Some(message) = BaseRelay::unsupported_search_closed(&subscription_id, &filters) { + self.limits + .base_relay_limits() + .validate_pocket_filters(&filters)?; + if let Some(message) = + BaseRelay::unsupported_search_present_closed(&subscription_id, search_present) + { return Ok(vec![message.into()]); } if let Some(message) = self .runtime - .rate_limit_req( + .rate_limit_req_pocket( &subscription_id, &filters, &self.auth, @@ -356,20 +357,25 @@ impl TangleWebSocketSession { { return Ok(vec![message.into()]); } - let should_subscribe = !filters_are_complete(&filters); + let should_subscribe = !pocket_filters_are_complete(&filters); if should_subscribe { self.subscriptions - .ensure_can_subscribe(&subscription_id, &pocket_filters)?; + .ensure_can_subscribe(&subscription_id, &filters)?; } let report = self .runtime - .query_req_with_auth_report(subscription_id.clone(), filters.clone(), &self.auth) + .query_req_with_auth_report( + subscription_id.clone(), + filters.clone(), + search_present, + &self.auth, + ) .await?; let closes_subscription = report.group_read_denied(); let replies = report.into_messages(); if should_subscribe && !closes_subscription { self.subscriptions - .subscribe(subscription_id.clone(), pocket_filters)?; + .subscribe(subscription_id.clone(), filters)?; metrics.record_subscription_opened(); logging::log_subscription_opened(self.connection_id, &subscription_id); } @@ -410,20 +416,6 @@ impl TangleWebSocketSession { } } -fn runtime_filters_to_protocol( - filters: Vec<PocketOwnedFilter>, - search_present: bool, -) -> Result<Vec<Filter>, BaseRelayError> { - filters - .into_iter() - .enumerate() - .map(|(index, filter)| { - let search = (search_present && index == 0).then(String::new); - pocket_filter_to_tangle(&filter, search) - }) - .collect() -} - #[derive(Debug, Clone, PartialEq, Eq)] enum TangleSessionControl { Continue, @@ -492,8 +484,8 @@ fn current_unix_timestamp() -> UnixTimestamp { ) } -fn filters_are_complete(filters: &[Filter]) -> bool { - !filters.is_empty() && filters.iter().all(Filter::is_complete) +fn pocket_filters_are_complete(filters: &[PocketOwnedFilter]) -> bool { + !filters.is_empty() && filters.iter().all(|filter| filter.completes()) } #[cfg(test)] diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -1633,8 +1633,11 @@ fn runtime_req_handling_does_not_lock_relay_state() { assert!(!req_branch.contains("relay.lock().await")); assert!(!query_helper.contains("relay.lock().await")); - assert!(req_branch.contains("query_req_with_auth_report(subscription_id, filters, auth)?")); - assert!(query_helper.contains("query_req_with_auth_report(subscription_id, filters, auth)?")); + assert!(!req_branch.contains("runtime_filters_to_protocol(filters, search_present)?")); + assert!(req_branch.contains("validate_pocket_filters(&filters)?")); + assert!(req_branch.contains("rate_limit_req_pocket(")); + assert!(req_branch.contains("query_req_with_auth_report(")); + assert!(query_helper.contains("query_req_with_auth_report(")); } #[test]