tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 7d173dc8bd37dc7fc6aedbb451541a289c974cc5
parent ee4e7d1ba5a4eafe27fe6f230f3ff13a992d45a1
Author: triesap <tyson@radroots.org>
Date:   Mon, 15 Jun 2026 20:26:03 -0700

runtime: count events with Pocket filters

- Validate and execute COUNT filters as Pocket-owned filters through runtime and relay core.
- Compute exact counts and HLL from Pocket events while preserving privacy gates and limitless count queries.
- Remove the obsolete Pocket-to-protocol filter conversion and protocol rate-limit path.
- Validate count, hll, workspace check, source scan, and clippy lanes.

Diffstat:
Mcrates/tangle_bench/src/lib.rs | 22+++++++++++++++++-----
Mcrates/tangle_runtime/src/pocket_conversion.rs | 109+++----------------------------------------------------------------------------
Mcrates/tangle_runtime/src/relay/core.rs | 235+++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------
Mcrates/tangle_runtime/src/runtime.rs | 408++++++++++++++++---------------------------------------------------------------
Mcrates/tangle_runtime/tests/base_relay_v2.rs | 112+++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 54+++++++++++++++++++++++++++++++++++++++++++-----------
6 files changed, 388 insertions(+), 552 deletions(-)

diff --git a/crates/tangle_bench/src/lib.rs b/crates/tangle_bench/src/lib.rs @@ -10,6 +10,7 @@ use std::time::Instant; use tangle_groups::{KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, MemberStatus}; use tangle_protocol::{ Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp, event_to_value, filter_from_value, + filter_to_value, }; use tangle_runtime::{ config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, @@ -20,8 +21,8 @@ use tangle_runtime::{ runtime::{TangleRuntime, TangleRuntimeHandle}, }; use tangle_store_pocket::{ - PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy, parse_pocket_event_json, - parse_pocket_filter_json, + PocketOwnedFilter, PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy, + parse_pocket_event_json, parse_pocket_filter_json, }; use tangle_test_support::{ FixtureKey, TANGLE_V2_RELAY_URL, tangle_v2_auth_event, tangle_v2_event, tangle_v2_group_config, @@ -1598,12 +1599,13 @@ fn count_for_operation( owner_auth: &BaseAuthState, ) -> Result<u64, String> { let subscription_id = subscription(operation.name)?; + let filter = pocket_filter(&operation.filter)?; let message = match operation.auth { QueryAuth::None => relay - .handle_count(subscription_id, vec![operation.filter.clone()]) + .handle_count(subscription_id, vec![filter]) .map_err(|error| error.to_string())?, QueryAuth::Owner => relay - .handle_count_with_auth(subscription_id, vec![operation.filter.clone()], owner_auth) + .handle_count_with_auth(subscription_id, vec![filter], owner_auth) .map_err(|error| error.to_string())?, }; match message { @@ -1632,7 +1634,7 @@ fn count_kind(relay: &BaseRelay, kind: u32) -> Result<u64, String> { let message = relay .handle_count_with_auth( subscription(&format!("count-{kind}"))?, - vec![filter_from_value(&json!({"kinds": [kind]}))?], + vec![pocket_filter_from_value(&json!({"kinds": [kind]}))?], &owner_auth, ) .map_err(|error| error.to_string())?; @@ -1642,6 +1644,16 @@ fn count_kind(relay: &BaseRelay, kind: u32) -> Result<u64, String> { } } +fn pocket_filter(filter: &Filter) -> Result<PocketOwnedFilter, String> { + let raw = serde_json::to_vec(&filter_to_value(filter)).map_err(|error| error.to_string())?; + parse_pocket_filter_json(&raw).map_err(|error| error.to_string()) +} + +fn pocket_filter_from_value(value: &serde_json::Value) -> Result<PocketOwnedFilter, String> { + let filter = filter_from_value(value)?; + pocket_filter(&filter) +} + fn validation_summary( scenarios: &[ScenarioReport], thresholds: BenchmarkThresholds, diff --git a/crates/tangle_runtime/src/pocket_conversion.rs b/crates/tangle_runtime/src/pocket_conversion.rs @@ -1,15 +1,13 @@ #![forbid(unsafe_code)] use crate::errors::BaseRelayError; -use std::collections::BTreeMap; use std::str; use tangle_protocol::{ - Event, EventId, Filter, Kind, PublicKeyHex, SignatureHex, Tag, TagName, TagValue, - UnixTimestamp, UnsignedEvent, + Event, EventId, Filter, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent, }; use tangle_store_pocket::{ - PocketEvent, PocketEventId, PocketFilter, PocketKind, PocketOwnedEvent, PocketOwnedFilter, - PocketOwnedTags, PocketPubkey, PocketSig, PocketTags, PocketTime, + PocketEvent, PocketEventId, PocketKind, PocketOwnedEvent, PocketOwnedFilter, PocketOwnedTags, + PocketPubkey, PocketSig, PocketTags, PocketTime, }; pub(crate) fn tangle_event_to_pocket(event: &Event) -> Result<PocketOwnedEvent, BaseRelayError> { @@ -115,41 +113,6 @@ pub(crate) fn pocket_event_to_tangle(event: &PocketEvent) -> Result<Event, BaseR )) } -pub(crate) fn pocket_filter_to_tangle( - filter: &PocketFilter, - search: Option<String>, -) -> Result<Filter, BaseRelayError> { - let ids = filter - .ids() - .map(|id| EventId::new(&id.as_hex_string()).map_err(BaseRelayError::error)) - .collect::<Result<Vec<_>, _>>()?; - let authors = filter - .authors() - .map(|author| PublicKeyHex::new(&author.as_hex_string()).map_err(BaseRelayError::error)) - .collect::<Result<Vec<_>, _>>()?; - let kinds = filter - .kinds() - .map(|kind| Kind::new(u64::from(kind.as_u16())).map_err(BaseRelayError::error)) - .collect::<Result<Vec<_>, _>>()?; - let tag_filters = pocket_filter_tags_to_tangle(filter)?; - let since = - (filter.since() != PocketTime::min()).then(|| UnixTimestamp::new(filter.since().as_u64())); - let until = - (filter.until() != PocketTime::max()).then(|| UnixTimestamp::new(filter.until().as_u64())); - let limit = (filter.limit() != u32::MAX).then(|| u64::from(filter.limit())); - Filter::from_parts( - ids, - authors, - kinds, - tag_filters, - since, - until, - limit, - search, - ) - .map_err(BaseRelayError::error) -} - pub(crate) fn pocket_event_id(event_id: &EventId) -> Result<PocketEventId, BaseRelayError> { PocketEventId::read_hex(event_id.as_str().as_bytes()) .map_err(|error| BaseRelayError::error(error.to_string())) @@ -185,43 +148,6 @@ fn tangle_tags_to_pocket(tags: &[Tag]) -> Result<PocketOwnedTags, BaseRelayError PocketOwnedTags::new(&parts).map_err(|error| BaseRelayError::error(error.to_string())) } -fn pocket_filter_tags_to_tangle( - filter: &PocketFilter, -) -> Result<BTreeMap<TagName, Vec<TagValue>>, BaseRelayError> { - let tags = filter - .tags() - .map_err(|error| BaseRelayError::error(error.to_string()))?; - let mut tag_filters = BTreeMap::new(); - for mut tag in tags.iter() { - let name = tag - .next() - .ok_or_else(|| BaseRelayError::invalid("filter tag must include a name")) - .and_then(tag_name_from_bytes)?; - let values = tag - .map(tag_value_from_bytes) - .collect::<Result<Vec<_>, _>>()?; - if values.is_empty() { - return Err(BaseRelayError::invalid(format!( - "filter field `#{}` must be a non-empty array", - name.as_str() - ))); - } - tag_filters.insert(name, values); - } - Ok(tag_filters) -} - -fn tag_name_from_bytes(bytes: &[u8]) -> Result<TagName, BaseRelayError> { - let name = str::from_utf8(bytes).map_err(|error| BaseRelayError::error(error.to_string()))?; - TagName::new(name).map_err(BaseRelayError::error) -} - -fn tag_value_from_bytes(bytes: &[u8]) -> Result<TagValue, BaseRelayError> { - str::from_utf8(bytes) - .map(TagValue::new) - .map_err(|error| BaseRelayError::error(error.to_string())) -} - fn ensure_tag_size(size: usize) -> Result<(), BaseRelayError> { if size > usize::from(u16::MAX) { return Err(BaseRelayError::invalid(format!( @@ -273,8 +199,7 @@ fn ensure_event_size(tags_len: usize, content_len: usize) -> Result<(), BaseRela #[cfg(test)] mod tests { use super::{ - pocket_event_id, pocket_event_to_tangle, pocket_filter_to_tangle, tangle_event_to_pocket, - tangle_filter_to_pocket, + pocket_event_id, pocket_event_to_tangle, tangle_event_to_pocket, tangle_filter_to_pocket, }; use tangle_protocol::{ Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent, @@ -376,30 +301,4 @@ mod tests { ); } } - - #[test] - fn pocket_filter_conversion_builds_tangle_filter_with_search_marker() { - let value = serde_json::json!({ - "ids": ["a".repeat(64)], - "authors": ["b".repeat(64)], - "kinds": [1], - "#t": ["market"], - "since": 10, - "until": 20, - "limit": 30 - }); - let filter = filter_from_value(&value).expect("filter"); - let pocket_filter = tangle_filter_to_pocket(&filter).expect("pocket"); - let converted = - pocket_filter_to_tangle(&pocket_filter, Some("carrots".to_owned())).expect("tangle"); - - assert_eq!(converted.ids(), filter.ids()); - assert_eq!(converted.authors(), filter.authors()); - assert_eq!(converted.kinds(), filter.kinds()); - assert_eq!(converted.tag_filters(), filter.tag_filters()); - assert_eq!(converted.since(), filter.since()); - assert_eq!(converted.until(), filter.until()); - assert_eq!(converted.limit(), filter.limit()); - assert_eq!(converted.search(), Some("carrots")); - } } diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs @@ -114,6 +114,36 @@ struct BaseRelayGroupReqQuery<'a> { auth: &'a GroupAuthContext, } +pub(crate) struct BaseRelayCountQuery<'a> { + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + search_present: bool, + auth: &'a BaseAuthState, +} + +impl<'a> BaseRelayCountQuery<'a> { + pub(crate) fn new( + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + search_present: bool, + auth: &'a BaseAuthState, + ) -> Self { + Self { + subscription_id, + filters, + search_present, + auth, + } + } +} + +struct BaseRelayGroupCountQuery<'a> { + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + search_present: bool, + auth: &'a GroupAuthContext, +} + impl BaseRelayQueryReport { fn new( messages: Vec<RuntimeRelayMessage>, @@ -643,19 +673,6 @@ impl BaseRelayLimits { } impl BaseRelay { - pub(crate) fn unsupported_search_closed( - subscription_id: &SubscriptionId, - filters: &[Filter], - ) -> Option<RelayMessage> { - filters - .iter() - .any(|filter| filter.search().is_some()) - .then(|| RelayMessage::Closed { - subscription_id: subscription_id.clone(), - message: "unsupported: search filters are not supported".to_owned(), - }) - } - pub(crate) fn unsupported_search_present_closed( subscription_id: &SubscriptionId, search_present: bool, @@ -747,9 +764,20 @@ impl BaseRelay { ClientMessage::Count { subscription_id, filters, - } => self - .handle_count_with_auth(subscription_id, filters, auth) - .map(|message| vec![message]), + } => { + let search_present = filters.iter().any(|filter| filter.search().is_some()); + let filters = filters + .iter() + .map(tangle_filter_to_pocket) + .collect::<Result<Vec<_>, _>>()?; + self.handle_count_with_group_auth_report( + subscription_id, + filters, + search_present, + &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), + ) + .map(|report| vec![report.into_message()]) + } ClientMessage::Close(subscription_id) => { self.handle_close(&subscription_id); Ok(Vec::new()) @@ -1449,7 +1477,7 @@ impl BaseRelay { pub fn handle_count( &self, subscription_id: SubscriptionId, - filters: Vec<Filter>, + filters: Vec<PocketOwnedFilter>, ) -> Result<RelayMessage, BaseRelayError> { self.handle_count_with_group_auth( subscription_id, @@ -1461,7 +1489,7 @@ impl BaseRelay { pub fn handle_count_with_auth( &self, subscription_id: SubscriptionId, - filters: Vec<Filter>, + filters: Vec<PocketOwnedFilter>, auth: &BaseAuthState, ) -> Result<RelayMessage, BaseRelayError> { self.handle_count_with_auth_report(subscription_id, filters, auth) @@ -1471,7 +1499,7 @@ impl BaseRelay { pub(crate) fn handle_count_with_auth_report( &self, subscription_id: SubscriptionId, - filters: Vec<Filter>, + filters: Vec<PocketOwnedFilter>, auth: &BaseAuthState, ) -> Result<BaseRelayCountReport, BaseRelayError> { Self::handle_count_with_shared_services( @@ -1479,9 +1507,7 @@ impl BaseRelay { self.groups.as_ref(), self.limits, self.query, - subscription_id, - filters, - auth, + BaseRelayCountQuery::new(subscription_id, filters, false, auth), ) } @@ -1490,35 +1516,39 @@ impl BaseRelay { groups: Option<&GroupServiceHandle>, limits: BaseRelayLimits, query: PocketQueryConfig, - subscription_id: SubscriptionId, - filters: Vec<Filter>, - auth: &BaseAuthState, + request: BaseRelayCountQuery<'_>, ) -> Result<BaseRelayCountReport, BaseRelayError> { + let group_auth = + GroupAuthContext::new(request.auth.authenticated_pubkeys().iter().cloned()); Self::handle_count_with_group_auth_shared_services( store, groups, limits, query, - subscription_id, - filters, - &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), + BaseRelayGroupCountQuery { + subscription_id: request.subscription_id, + filters: request.filters, + search_present: request.search_present, + auth: &group_auth, + }, ) } fn handle_count_with_group_auth( &self, subscription_id: SubscriptionId, - filters: Vec<Filter>, + filters: Vec<PocketOwnedFilter>, auth: &GroupAuthContext, ) -> Result<RelayMessage, BaseRelayError> { - self.handle_count_with_group_auth_report(subscription_id, filters, auth) + self.handle_count_with_group_auth_report(subscription_id, filters, false, auth) .map(BaseRelayCountReport::into_message) } fn handle_count_with_group_auth_report( &self, subscription_id: SubscriptionId, - filters: Vec<Filter>, + filters: Vec<PocketOwnedFilter>, + search_present: bool, auth: &GroupAuthContext, ) -> Result<BaseRelayCountReport, BaseRelayError> { Self::handle_count_with_group_auth_shared_services( @@ -1526,9 +1556,12 @@ impl BaseRelay { self.groups.as_ref(), self.limits, self.query, - subscription_id, - filters, - auth, + BaseRelayGroupCountQuery { + subscription_id, + filters, + search_present, + auth, + }, ) } @@ -1537,13 +1570,19 @@ impl BaseRelay { groups: Option<&GroupServiceHandle>, limits: BaseRelayLimits, query: PocketQueryConfig, - subscription_id: SubscriptionId, - filters: Vec<Filter>, - auth: &GroupAuthContext, + request: BaseRelayGroupCountQuery<'_>, ) -> Result<BaseRelayCountReport, BaseRelayError> { + let BaseRelayGroupCountQuery { + subscription_id, + filters, + search_present, + auth, + } = request; limits.validate_subscription_id(&subscription_id)?; - limits.validate_filters(&filters)?; - if let Some(message) = Self::unsupported_search_closed(&subscription_id, &filters) { + limits.validate_pocket_filters(&filters)?; + if let Some(message) = + Self::unsupported_search_present_closed(&subscription_id, search_present) + { return Ok(BaseRelayCountReport::new( message, false, @@ -1670,7 +1709,7 @@ impl BaseRelay { groups: Option<&GroupServiceHandle>, limits: BaseRelayLimits, query: PocketQueryConfig, - filters: &[Filter], + filters: &[PocketOwnedFilter], auth: &GroupAuthContext, ) -> Result<BaseRelayCountEventsReport, BaseRelayError> { let mut seen = BTreeSet::new(); @@ -1680,14 +1719,12 @@ impl BaseRelay { let hll_offset = Self::count_hll_offset(filters)?; let mut hll = hll_offset.map(|_| PocketHll8::new()); for filter in filters { - let filter = filter.without_limit(); - let filter = tangle_filter_to_pocket(&filter)?; let report = Self::query_filter_events_report_with_services( store, groups, limits, count_query, - &filter, + filter, auth, BaseRelayFilterLimitMode::PreserveCountLimitless, )?; @@ -1715,12 +1752,11 @@ impl BaseRelay { )) } - fn count_hll_offset(filters: &[Filter]) -> Result<Option<usize>, BaseRelayError> { + fn count_hll_offset(filters: &[PocketOwnedFilter]) -> Result<Option<usize>, BaseRelayError> { let [filter] = filters else { return Ok(None); }; - let pocket_filter = tangle_filter_to_pocket(filter)?; - pocket_filter + filter .hyperloglog_offset() .map_err(|error| BaseRelayError::error(error.to_string())) } @@ -1786,6 +1822,7 @@ impl BaseRelay { u32::try_from(limits.default_limit) .map_err(|_| BaseRelayError::invalid("default filter limit exceeds u32"))? } + (BaseRelayFilterLimitMode::PreserveCountLimitless, _) => u32::MAX, (_, limit) => limit, }; let ids = filter.ids().collect::<Vec<_>>(); @@ -1848,7 +1885,7 @@ fn pocket_filters_are_complete(filters: &[PocketOwnedFilter]) -> bool { #[cfg(test)] mod tests { use super::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits, NEGENTROPY_DISABLED_MESSAGE}; - use crate::pocket_conversion::tangle_event_to_pocket; + use crate::pocket_conversion::{tangle_event_to_pocket, tangle_filter_to_pocket}; use crate::relay::auth::BaseAuthState; use crate::relay::live::CloseResult; use tangle_crypto::RelaySigner; @@ -1865,8 +1902,66 @@ mod tests { Tag, UnixTimestamp, UnsignedEvent, filter_from_value, }; use tangle_store_pocket::{ - PocketEvent, PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy, + PocketEvent, PocketOwnedFilter, PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy, }; + + trait BaseRelayCountTestExt { + fn handle_count_protocol( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + ) -> Result<RelayMessage, crate::errors::BaseRelayError>; + + fn handle_count_with_auth_protocol( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + auth: &BaseAuthState, + ) -> Result<RelayMessage, crate::errors::BaseRelayError>; + } + + impl BaseRelayCountTestExt for BaseRelay { + fn handle_count_protocol( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + ) -> Result<RelayMessage, crate::errors::BaseRelayError> { + let search_present = filters.iter().any(|filter| filter.search().is_some()); + let filters = pocket_filters(filters)?; + self.handle_count_with_group_auth_report( + subscription_id, + filters, + search_present, + &GroupAuthContext::unauthenticated(), + ) + .map(|report| report.into_message()) + } + + fn handle_count_with_auth_protocol( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + auth: &BaseAuthState, + ) -> Result<RelayMessage, crate::errors::BaseRelayError> { + let search_present = filters.iter().any(|filter| filter.search().is_some()); + let filters = pocket_filters(filters)?; + let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()); + self.handle_count_with_group_auth_report( + subscription_id, + filters, + search_present, + &group_auth, + ) + .map(|report| report.into_message()) + } + } + + fn pocket_filters( + filters: Vec<Filter>, + ) -> Result<Vec<PocketOwnedFilter>, crate::errors::BaseRelayError> { + filters.iter().map(tangle_filter_to_pocket).collect() + } + #[test] fn base_relay_stores_queries_counts_closes_and_fans_out_public_events() { let mut relay = test_relay("base-relay-public", 4); @@ -1900,7 +1995,7 @@ mod tests { assert_eq!(messages[1], RelayMessage::Eose(subscription_id.clone())); assert_eq!( relay - .handle_count(subscription_id.clone(), vec![filter]) + .handle_count_protocol(subscription_id.clone(), vec![filter]) .expect("count"), RelayMessage::Count { subscription_id: subscription_id.clone(), @@ -1995,7 +2090,7 @@ mod tests { assert_eq!(relay.active_subscription_count(), 0); assert_eq!( relay - .handle_count(count_id.clone(), vec![search]) + .handle_count_protocol(count_id.clone(), vec![search]) .expect("count"), RelayMessage::Closed { subscription_id: count_id, @@ -2306,7 +2401,7 @@ mod tests { .expect("author count filter"); assert_eq!( relay - .handle_count( + .handle_count_protocol( SubscriptionId::new("count-visible").expect("sub"), vec![market_notes.clone(), author_events.clone()] ) @@ -2319,7 +2414,7 @@ mod tests { ); assert_eq!( relay - .handle_count_with_auth( + .handle_count_with_auth_protocol( SubscriptionId::new("count-auth").expect("sub"), vec![market_notes, author_events], &auth @@ -2346,7 +2441,7 @@ mod tests { ); assert!( relay - .handle_count( + .handle_count_protocol( SubscriptionId::new("limit-count").expect("sub"), vec![too_large_limit] ) @@ -2370,7 +2465,7 @@ mod tests { let search_count = SubscriptionId::new("search-count").expect("sub"); assert_eq!( relay - .handle_count(search_count.clone(), vec![search]) + .handle_count_protocol(search_count.clone(), vec![search]) .expect("search count"), RelayMessage::Closed { subscription_id: search_count, @@ -2436,7 +2531,7 @@ mod tests { ); assert!( relay - .handle_count( + .handle_count_protocol( SubscriptionId::new("cnt").expect("sub"), vec![Filter::empty(), Filter::empty()] ) @@ -2446,7 +2541,7 @@ mod tests { ); assert!( relay - .handle_count( + .handle_count_protocol( SubscriptionId::new("tag").expect("sub"), vec![ filter_from_value(&serde_json::json!({"#t":["one", "two"]})) @@ -2459,7 +2554,7 @@ mod tests { ); assert!( relay - .handle_count( + .handle_count_protocol( SubscriptionId::new("max").expect("sub"), vec![filter_from_value(&serde_json::json!({"limit":3})).expect("filter")] ) @@ -2533,7 +2628,7 @@ mod tests { assert_eq!(relay.active_subscription_count(), 0); assert!( relay - .handle_count(SubscriptionId::new("cnt").expect("sub"), vec![complex]) + .handle_count_protocol(SubscriptionId::new("cnt").expect("sub"), vec![complex]) .expect_err("count complexity") .prefixed_message() .contains("max_query_complexity 4") @@ -2567,7 +2662,7 @@ mod tests { assert_eq!( relay - .handle_count( + .handle_count_protocol( SubscriptionId::new("count-limit").expect("sub"), vec![limited_market] ) @@ -2581,7 +2676,7 @@ mod tests { assert_eq!( relay - .handle_count( + .handle_count_protocol( SubscriptionId::new("count-dedupe").expect("sub"), vec![market_notes, author_events] ) @@ -2607,7 +2702,7 @@ mod tests { } let RelayMessage::Count { count, hll, .. } = relay - .handle_count( + .handle_count_protocol( SubscriptionId::new("count-hll-public").expect("sub"), vec![ filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target]})) @@ -2662,7 +2757,7 @@ mod tests { ); let limited = relay - .handle_count_with_auth( + .handle_count_with_auth_protocol( SubscriptionId::new("count-hll-limited").expect("sub"), vec![ filter_from_value( @@ -2683,7 +2778,7 @@ mod tests { )); let redacted = relay - .handle_count_with_auth( + .handle_count_with_auth_protocol( SubscriptionId::new("count-hll-redacted").expect("sub"), vec![ filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target]})) @@ -2745,7 +2840,7 @@ mod tests { assert_eq!( relay - .handle_count( + .handle_count_protocol( SubscriptionId::new("count-unbounded").expect("sub"), vec![unbounded] ) @@ -2758,7 +2853,7 @@ mod tests { ); assert_eq!( relay - .handle_count( + .handle_count_protocol( SubscriptionId::new("count-client-limited").expect("sub"), vec![client_limited] ) @@ -3959,7 +4054,7 @@ mod tests { ); assert!( relay - .handle_count( + .handle_count_protocol( SubscriptionId::new("a").expect("sub"), vec![ filter_from_value(&serde_json::json!({"#t":["one", "two"]})) @@ -4204,7 +4299,7 @@ mod tests { let subscription_id = SubscriptionId::new(&format!("count-{kind}")).expect("sub"); let filter = filter_kind(kind); match relay - .handle_count(subscription_id, vec![filter]) + .handle_count_protocol(subscription_id, vec![filter]) .expect("count") { RelayMessage::Count { count, .. } => count, @@ -4215,7 +4310,7 @@ mod tests { fn count_kind_with_auth(relay: &BaseRelay, kind: u32, auth: &BaseAuthState) -> u64 { let subscription_id = SubscriptionId::new(&format!("count-auth-{kind}")).expect("sub"); match relay - .handle_count_with_auth(subscription_id, vec![filter_kind(kind)], auth) + .handle_count_with_auth_protocol(subscription_id, vec![filter_kind(kind)], auth) .expect("count") { RelayMessage::Count { count, .. } => count, @@ -4225,7 +4320,7 @@ mod tests { fn count_filter(relay: &BaseRelay, subscription_id: &str, filter: Filter) -> u64 { match relay - .handle_count( + .handle_count_protocol( SubscriptionId::new(subscription_id).expect("sub"), vec![filter], ) @@ -4243,7 +4338,7 @@ mod tests { auth: &BaseAuthState, ) -> u64 { match relay - .handle_count_with_auth( + .handle_count_with_auth_protocol( SubscriptionId::new(subscription_id).expect("sub"), vec![filter], auth, diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -8,7 +8,6 @@ use crate::{ groups::GroupServiceHandle, logging, ops::{BaseRelayReadinessHandle, BaseRelayReadinessState}, - pocket_conversion::pocket_filter_to_tangle, pocket_event_validation::{pocket_event_id, pocket_event_kind, pocket_event_pubkey}, rate_limits::{ TangleQueryRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey, @@ -17,8 +16,8 @@ use crate::{ relay::{ auth::BaseAuthState, core::{ - BaseRelay, BaseRelayCountReport, BaseRelayEventWrite, BaseRelayLimits, - BaseRelayQueryMetrics, BaseRelayQueryReport, BaseRelayReqQuery, + BaseRelay, BaseRelayCountQuery, BaseRelayCountReport, BaseRelayEventWrite, + BaseRelayLimits, BaseRelayQueryMetrics, BaseRelayQueryReport, BaseRelayReqQuery, BaseRelayShutdownReport, }, live::LiveSubscriptionSet, @@ -41,9 +40,7 @@ use tangle_groups::{ GroupAuthContext, GroupEventClass, GroupId, KIND_GROUP_JOIN_REQUEST, StoreOffset, validate_client_group_event_structure, }; -use tangle_protocol::{ - EventId, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, UnixTimestamp, -}; +use tangle_protocol::{Kind, RelayMessage, SubscriptionId, UnixTimestamp}; use tangle_store_pocket::{ PocketEvent, PocketFilter, PocketOwnedEvent, PocketOwnedFilter, PocketStoreHandle, PocketTime, }; @@ -75,17 +72,6 @@ impl TangleClientRateLimitContext { } } -struct TangleQueryRateLimitRequest<'a> { - scope: TangleRateLimitScope, - rules: TangleQueryRateLimitConfig, - label: &'static str, - subscription_id: &'a SubscriptionId, - filters: &'a [Filter], - auth: &'a BaseAuthState, - context: TangleClientRateLimitContext, - now: UnixTimestamp, -} - struct TanglePocketQueryRateLimitRequest<'a> { scope: TangleRateLimitScope, rules: TangleQueryRateLimitConfig, @@ -130,28 +116,6 @@ impl TangleQueryClassifier { Self { limits } } - fn classify( - self, - scope: TangleRateLimitScope, - filters: &[Filter], - ) -> TangleQueryClassification { - match scope { - TangleRateLimitScope::Req => self.classify_query(filters), - TangleRateLimitScope::Count => self.classify_count(filters), - TangleRateLimitScope::Auth - | TangleRateLimitScope::Event - | TangleRateLimitScope::GroupWrite => self.classify_query(filters), - } - } - - fn classify_query(self, filters: &[Filter]) -> TangleQueryClassification { - self.classify_filters(filters, Self::classify_query_filter) - } - - fn classify_count(self, filters: &[Filter]) -> TangleQueryClassification { - self.classify_filters(filters, Self::classify_count_filter) - } - fn classify_pocket_query(self, filters: &[PocketOwnedFilter]) -> TangleQueryClassification { if filters.is_empty() { return TangleQueryClassification::Broad(TangleBroadQueryReason::EmptyFilters); @@ -163,49 +127,45 @@ impl TangleQueryClassifier { .unwrap_or(TangleQueryClassification::Bounded) } - fn classify_filters( - self, - filters: &[Filter], - classify_filter: fn(Self, &Filter) -> TangleQueryClassification, - ) -> TangleQueryClassification { + fn classify_pocket_count(self, filters: &[PocketOwnedFilter]) -> TangleQueryClassification { if filters.is_empty() { return TangleQueryClassification::Broad(TangleBroadQueryReason::EmptyFilters); } filters .iter() - .map(|filter| classify_filter(self, filter)) + .map(|filter| self.classify_pocket_count_filter(filter)) .find(|classification| classification.is_broad()) .unwrap_or(TangleQueryClassification::Bounded) } - fn classify_query_filter(self, filter: &Filter) -> TangleQueryClassification { - if !self.has_primary_constraint(filter) { + fn classify_pocket_query_filter(self, filter: &PocketFilter) -> TangleQueryClassification { + if !self.has_pocket_primary_constraint(filter) { return TangleQueryClassification::Broad( TangleBroadQueryReason::MissingPrimaryConstraint, ); } - if self.has_high_limit(filter) { + if self.has_pocket_high_limit(filter) { return TangleQueryClassification::Broad(TangleBroadQueryReason::HighLimit); } - if self.has_broad_time_window(filter) && !self.has_strong_constraint(filter) { + if self.has_pocket_broad_time_window(filter) && !self.has_pocket_strong_constraint(filter) { return TangleQueryClassification::Broad(TangleBroadQueryReason::BroadTimeWindow); } TangleQueryClassification::Bounded } - fn classify_count_filter(self, filter: &Filter) -> TangleQueryClassification { - if !self.has_primary_constraint(filter) { + fn classify_pocket_count_filter(self, filter: &PocketFilter) -> TangleQueryClassification { + if !self.has_pocket_primary_constraint(filter) { return TangleQueryClassification::Broad( TangleBroadQueryReason::MissingPrimaryConstraint, ); } - if self.has_high_limit(filter) { + if self.has_pocket_high_limit(filter) { return TangleQueryClassification::Broad(TangleBroadQueryReason::HighLimit); } - if self.has_broad_time_window(filter) { + if self.has_pocket_broad_time_window(filter) { return TangleQueryClassification::Broad(TangleBroadQueryReason::BroadTimeWindow); } - if !self.has_count_bounded_selector(filter) { + if !self.has_pocket_count_bounded_selector(filter) { return TangleQueryClassification::Broad( TangleBroadQueryReason::MissingBoundedSelector, ); @@ -213,87 +173,6 @@ impl TangleQueryClassifier { TangleQueryClassification::Bounded } - fn classify_pocket_query_filter(self, filter: &PocketFilter) -> TangleQueryClassification { - if !self.has_pocket_primary_constraint(filter) { - return TangleQueryClassification::Broad( - TangleBroadQueryReason::MissingPrimaryConstraint, - ); - } - if self.has_pocket_high_limit(filter) { - return TangleQueryClassification::Broad(TangleBroadQueryReason::HighLimit); - } - if self.has_pocket_broad_time_window(filter) && !self.has_pocket_strong_constraint(filter) { - return TangleQueryClassification::Broad(TangleBroadQueryReason::BroadTimeWindow); - } - TangleQueryClassification::Bounded - } - - fn has_primary_constraint(self, filter: &Filter) -> bool { - !filter.ids().is_empty() - || !filter.authors().is_empty() - || !filter.kinds().is_empty() - || self.has_group_constraint(filter) - } - - fn has_strong_constraint(self, filter: &Filter) -> bool { - !filter.ids().is_empty() - || !filter.authors().is_empty() - || self.has_group_constraint(filter) - } - - fn has_count_bounded_selector(self, filter: &Filter) -> bool { - self.has_strong_constraint(filter) - || (!filter.kinds().is_empty() && self.has_bounded_time_window(filter)) - || self.has_hll_count_selector(filter) - } - - fn has_hll_count_selector(self, filter: &Filter) -> bool { - let [kind] = filter.kinds() else { - return false; - }; - let mut tags = filter.tag_filters().iter(); - let Some((name, values)) = tags.next() else { - return false; - }; - if tags.next().is_some() || values.len() != 1 { - return false; - } - match (kind.as_u32(), name.as_str()) { - (3, "p") => PublicKeyHex::new(values[0].as_str()).is_ok(), - (7, "e") => EventId::new(values[0].as_str()).is_ok(), - _ => false, - } - } - - fn has_group_constraint(self, filter: &Filter) -> bool { - filter - .tag_filters() - .iter() - .any(|(name, values)| matches!(name.as_str(), "h" | "d") && !values.is_empty()) - } - - fn has_high_limit(self, filter: &Filter) -> bool { - filter.limit().unwrap_or(self.limits.default_limit()) >= self.limits.max_limit() - } - - fn has_bounded_time_window(self, filter: &Filter) -> bool { - match (filter.since(), filter.until()) { - (Some(since), Some(until)) => { - until.as_u64().saturating_sub(since.as_u64()) <= BROAD_QUERY_TIME_WINDOW_SECONDS - } - _ => false, - } - } - - fn has_broad_time_window(self, filter: &Filter) -> bool { - match (filter.since(), filter.until()) { - (Some(since), Some(until)) => { - until.as_u64().saturating_sub(since.as_u64()) > BROAD_QUERY_TIME_WINDOW_SECONDS - } - _ => false, - } - } - fn has_pocket_primary_constraint(self, filter: &PocketFilter) -> bool { filter.num_ids() > 0 || filter.num_authors() > 0 @@ -305,6 +184,18 @@ impl TangleQueryClassifier { filter.num_ids() > 0 || filter.num_authors() > 0 || self.has_pocket_group_constraint(filter) } + fn has_pocket_count_bounded_selector(self, filter: &PocketFilter) -> bool { + self.has_pocket_strong_constraint(filter) + || (filter.num_kinds() > 0 && self.has_pocket_bounded_time_window(filter)) + || self.has_pocket_hll_count_selector(filter) + } + + fn has_pocket_hll_count_selector(self, filter: &PocketFilter) -> bool { + filter + .hyperloglog_offset() + .is_ok_and(|offset| offset.is_some()) + } + fn has_pocket_group_constraint(self, filter: &PocketFilter) -> bool { filter .tags() @@ -327,6 +218,17 @@ impl TangleQueryClassifier { limit >= self.limits.max_limit() } + fn has_pocket_bounded_time_window(self, filter: &PocketFilter) -> bool { + if filter.since() == PocketTime::min() || filter.until() == PocketTime::max() { + return false; + } + filter + .until() + .as_ref() + .saturating_sub(*filter.since().as_ref()) + <= BROAD_QUERY_TIME_WINDOW_SECONDS + } + fn has_pocket_broad_time_window(self, filter: &PocketFilter) -> bool { if filter.since() == PocketTime::min() || filter.until() == PocketTime::max() { return false; @@ -673,7 +575,8 @@ impl TangleRuntimeShared { fn handle_count_with_auth_report( &self, subscription_id: SubscriptionId, - filters: Vec<Filter>, + filters: Vec<PocketOwnedFilter>, + search_present: bool, auth: &BaseAuthState, ) -> Result<BaseRelayCountReport, BaseRelayError> { BaseRelay::handle_count_with_shared_services( @@ -681,32 +584,10 @@ impl TangleRuntimeShared { self.groups.as_ref(), self.limits.base_relay_limits(), self.config.pocket_query_config(), - subscription_id, - filters, - auth, + BaseRelayCountQuery::new(subscription_id, filters, search_present, auth), ) } - fn rate_limit_req( - &self, - subscription_id: &SubscriptionId, - filters: &[Filter], - auth: &BaseAuthState, - context: TangleClientRateLimitContext, - now: UnixTimestamp, - ) -> Option<RelayMessage> { - self.rate_limit_query(TangleQueryRateLimitRequest { - scope: TangleRateLimitScope::Req, - rules: self.config.rate_limits().req(), - label: "req", - subscription_id, - filters, - auth, - context, - now, - }) - } - fn rate_limit_req_pocket( &self, subscription_id: &SubscriptionId, @@ -727,15 +608,15 @@ impl TangleRuntimeShared { }) } - fn rate_limit_count( + fn rate_limit_count_pocket( &self, subscription_id: &SubscriptionId, - filters: &[Filter], + filters: &[PocketOwnedFilter], auth: &BaseAuthState, context: TangleClientRateLimitContext, now: UnixTimestamp, ) -> Option<RelayMessage> { - self.rate_limit_query(TangleQueryRateLimitRequest { + self.rate_limit_pocket_query(TanglePocketQueryRateLimitRequest { scope: TangleRateLimitScope::Count, rules: self.config.rate_limits().count(), label: "count", @@ -750,10 +631,10 @@ impl TangleRuntimeShared { fn refuse_broad_count( &self, subscription_id: &SubscriptionId, - filters: &[Filter], + filters: &[PocketOwnedFilter], ) -> Option<RelayMessage> { if TangleQueryClassifier::new(self.limits.base_relay_limits()) - .classify_count(filters) + .classify_pocket_count(filters) .is_broad() { self.metrics.record_count_refusal(); @@ -767,85 +648,6 @@ impl TangleRuntimeShared { None } - fn rate_limit_query(&self, request: TangleQueryRateLimitRequest<'_>) -> Option<RelayMessage> { - if let Some(peer_ip) = request.context.peer_ip - && let Some(message) = self.rate_limit_closed( - request.subscription_id, - TangleRateLimitKey::ip(request.scope, peer_ip), - request.rules.per_ip(), - request.label, - "ip", - request.now, - ) - { - return Some(message); - } - if let Some(connection_id) = request.context.connection_id - && let Some(message) = self.rate_limit_closed( - request.subscription_id, - TangleRateLimitKey::connection(request.scope, connection_id), - request.rules.per_connection(), - request.label, - "connection", - request.now, - ) - { - return Some(message); - } - for pubkey in request.auth.authenticated_pubkeys() { - if let Some(message) = self.rate_limit_closed( - request.subscription_id, - TangleRateLimitKey::pubkey(request.scope, pubkey.clone()), - request.rules.per_pubkey(), - request.label, - "pubkey", - request.now, - ) { - return Some(message); - } - } - for group_id in filter_group_ids(request.filters) { - if let Some(message) = self.rate_limit_closed( - request.subscription_id, - TangleRateLimitKey::group(request.scope, group_id), - request.rules.per_group(), - request.label, - "group", - request.now, - ) { - return Some(message); - } - } - for kind in filter_kinds(request.filters) { - if let Some(message) = self.rate_limit_closed( - request.subscription_id, - TangleRateLimitKey::kind(request.scope, kind), - request.rules.per_kind(), - request.label, - "kind", - request.now, - ) { - return Some(message); - } - } - let query_classification = TangleQueryClassifier::new(self.limits.base_relay_limits()) - .classify(request.scope, request.filters); - if query_classification.is_broad() - && let Some(message) = self.rate_limit_closed( - request.subscription_id, - TangleRateLimitKey::query_class(request.scope, TangleRateLimitQueryClass::Broad), - request.rules.broad(), - request.label, - "broad", - request.now, - ) - { - self.metrics.record_broad_query_rejection(); - return Some(message); - } - None - } - fn rate_limit_pocket_query( &self, request: TanglePocketQueryRateLimitRequest<'_>, @@ -910,8 +712,14 @@ impl TangleRuntimeShared { return Some(message); } } - let query_classification = TangleQueryClassifier::new(self.limits.base_relay_limits()) - .classify_pocket_query(request.filters); + let classifier = TangleQueryClassifier::new(self.limits.base_relay_limits()); + let query_classification = match request.scope { + TangleRateLimitScope::Req => classifier.classify_pocket_query(request.filters), + TangleRateLimitScope::Count => classifier.classify_pocket_count(request.filters), + TangleRateLimitScope::Auth + | TangleRateLimitScope::Event + | TangleRateLimitScope::GroupWrite => classifier.classify_pocket_query(request.filters), + }; if query_classification.is_broad() && let Some(message) = self.rate_limit_closed( request.subscription_id, @@ -1192,7 +1000,6 @@ impl TangleRuntimeHandle { filters, search_present, } => { - let filters = runtime_filters_to_protocol(filters, search_present)?; let started_at = Instant::now(); self.inner .limits @@ -1201,9 +1008,9 @@ impl TangleRuntimeHandle { self.inner .limits .base_relay_limits() - .validate_filters(&filters)?; + .validate_pocket_filters(&filters)?; if let Some(message) = - BaseRelay::unsupported_search_closed(&subscription_id, &filters) + BaseRelay::unsupported_search_present_closed(&subscription_id, search_present) { self.inner .metrics @@ -1216,7 +1023,7 @@ impl TangleRuntimeHandle { .record_query_latency(elapsed_micros(started_at)); return Ok(vec![message.into()]); } - if let Some(message) = self.inner.rate_limit_count( + if let Some(message) = self.inner.rate_limit_count_pocket( &subscription_id, &filters, auth, @@ -1228,9 +1035,12 @@ impl TangleRuntimeHandle { .record_query_latency(elapsed_micros(started_at)); return Ok(vec![message.into()]); } - let report = - self.inner - .handle_count_with_auth_report(subscription_id, filters, auth)?; + let report = self.inner.handle_count_with_auth_report( + subscription_id, + filters, + search_present, + auth, + )?; self.inner .metrics .record_query_metrics(report.query_metrics()); @@ -1329,18 +1139,6 @@ impl TangleRuntimeHandle { self.inner.rate_limiter.clone() } - pub async fn rate_limit_req( - &self, - subscription_id: &SubscriptionId, - filters: &[Filter], - auth: &BaseAuthState, - rate_limit_context: TangleClientRateLimitContext, - now: UnixTimestamp, - ) -> Option<RelayMessage> { - self.inner - .rate_limit_req(subscription_id, filters, auth, rate_limit_context, now) - } - pub(crate) async fn rate_limit_req_pocket( &self, subscription_id: &SubscriptionId, @@ -1476,20 +1274,6 @@ fn directory_size_bytes(path: &Path) -> u64 { .sum() } -fn runtime_filters_to_protocol( - filters: Vec<tangle_store_pocket::PocketOwnedFilter>, - search_present: bool, -) -> Result<Vec<Filter>, BaseRelayError> { - filters - .into_iter() - .enumerate() - .map(|(index, filter)| { - let search = (search_present && index == 0).then(String::new); - pocket_filter_to_tangle(&filter, search) - }) - .collect() -} - #[cfg(test)] fn protocol_client_message_to_runtime_for_test( message: tangle_protocol::ClientMessage, @@ -1563,28 +1347,6 @@ fn runtime_client_message_metric_kind( } } -fn filter_group_ids(filters: &[Filter]) -> Vec<GroupId> { - filters - .iter() - .flat_map(|filter| filter.tag_filters()) - .filter(|(name, _)| matches!(name.as_str(), "h" | "d")) - .flat_map(|(_, values)| values) - .filter_map(|value| GroupId::new(value.as_str()).ok()) - .collect::<BTreeSet<_>>() - .into_iter() - .collect() -} - -fn filter_kinds(filters: &[Filter]) -> Vec<Kind> { - filters - .iter() - .flat_map(Filter::kinds) - .copied() - .collect::<BTreeSet<_>>() - .into_iter() - .collect() -} - fn pocket_filter_group_ids(filters: &[PocketOwnedFilter]) -> Vec<GroupId> { let mut group_ids = BTreeSet::new(); for filter in filters { @@ -3427,67 +3189,59 @@ mod tests { #[test] fn query_classifier_identifies_broad_count_shapes() { let classifier = TangleQueryClassifier::new(runtime_relay_limits(8)); - let empty_filter = filter_from_value(&json!({})).expect("filter"); - let tag_only_filter = - filter_from_value(&json!({"#t": ["market"], "limit": 1})).expect("filter"); - let kind_only_filter = - filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter"); - let high_limit_filter = - filter_from_value(&json!({"kinds": [1], "#h": ["Farm"], "limit": 500})) - .expect("filter"); - let broad_time_filter = filter_from_value(&json!({ + let empty_filter = pocket_filter(json!({})); + let tag_only_filter = pocket_filter(json!({"#t": ["market"], "limit": 1})); + let kind_only_filter = pocket_filter(json!({"kinds": [1], "limit": 1})); + let high_limit_filter = pocket_filter(json!({"kinds": [1], "#h": ["Farm"], "limit": 500})); + let broad_time_filter = pocket_filter(json!({ "kinds": [1], "since": 1, "until": BROAD_QUERY_TIME_WINDOW_SECONDS + 2, "limit": 1 - })) - .expect("filter"); - let bounded_group_filter = - filter_from_value(&json!({"kinds": [1], "#h": ["Farm"], "limit": 1})).expect("filter"); - let bounded_time_filter = filter_from_value(&json!({ + })); + let bounded_group_filter = pocket_filter(json!({"kinds": [1], "#h": ["Farm"], "limit": 1})); + let bounded_time_filter = pocket_filter(json!({ "kinds": [1], "since": 1, "until": BROAD_QUERY_TIME_WINDOW_SECONDS, "limit": 1 - })) - .expect("filter"); - let hll_reaction_filter = - filter_from_value(&json!({"kinds": [7], "#e": ["a".repeat(64)]})).expect("filter"); + })); + let hll_reaction_filter = pocket_filter(json!({"kinds": [7], "#e": ["a".repeat(64)]})); assert_eq!( - classifier.classify_count(&[]), + classifier.classify_pocket_count(&[]), TangleQueryClassification::Broad(TangleBroadQueryReason::EmptyFilters) ); assert_eq!( - classifier.classify_count(&[empty_filter]), + classifier.classify_pocket_count(&[empty_filter]), TangleQueryClassification::Broad(TangleBroadQueryReason::MissingPrimaryConstraint) ); assert_eq!( - classifier.classify_count(&[tag_only_filter]), + classifier.classify_pocket_count(&[tag_only_filter]), TangleQueryClassification::Broad(TangleBroadQueryReason::MissingPrimaryConstraint) ); assert_eq!( - classifier.classify_count(&[kind_only_filter]), + classifier.classify_pocket_count(&[kind_only_filter]), TangleQueryClassification::Broad(TangleBroadQueryReason::MissingBoundedSelector) ); assert_eq!( - classifier.classify_count(&[high_limit_filter]), + classifier.classify_pocket_count(&[high_limit_filter]), TangleQueryClassification::Broad(TangleBroadQueryReason::HighLimit) ); assert_eq!( - classifier.classify_count(&[broad_time_filter]), + classifier.classify_pocket_count(&[broad_time_filter]), TangleQueryClassification::Broad(TangleBroadQueryReason::BroadTimeWindow) ); assert_eq!( - classifier.classify_count(&[bounded_group_filter]), + classifier.classify_pocket_count(&[bounded_group_filter]), TangleQueryClassification::Bounded ); assert_eq!( - classifier.classify_count(&[bounded_time_filter]), + classifier.classify_pocket_count(&[bounded_time_filter]), TangleQueryClassification::Bounded ); assert_eq!( - classifier.classify_count(&[hll_reaction_filter]), + classifier.classify_pocket_count(&[hll_reaction_filter]), TangleQueryClassification::Bounded ); } diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs @@ -13,7 +13,7 @@ use tangle_groups::{ }; use tangle_protocol::{ Event, Filter, RawEventJson, RelayMessage, SubscriptionId, Tag, UnixTimestamp, event_to_value, - filter_from_value, parse_client_message, parse_event_json, + filter_from_value, filter_to_value, parse_client_message, parse_event_json, }; use tangle_runtime::{ config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, @@ -29,7 +29,7 @@ use tangle_runtime::{ use tangle_store_pocket::{ PocketQueryConfig, PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy, TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE, - parse_pocket_event_json, + parse_pocket_event_json, parse_pocket_filter_json, }; use tangle_test_support::{ FixtureKey, TANGLE_V2_RELAY_SECRET_HEX, TANGLE_V2_RELAY_URL, tangle_v2_auth_event, @@ -68,6 +68,50 @@ impl BaseRelayEventTestExt for BaseRelay { } } +trait BaseRelayCountTestExt { + fn handle_count_protocol( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + ) -> Result<RelayMessage, BaseRelayError>; + + fn handle_count_with_auth_protocol( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + auth: &BaseAuthState, + ) -> Result<RelayMessage, BaseRelayError>; +} + +impl BaseRelayCountTestExt for BaseRelay { + fn handle_count_protocol( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + ) -> Result<RelayMessage, BaseRelayError> { + BaseRelay::handle_count(self, subscription_id, pocket_filters(filters)) + } + + fn handle_count_with_auth_protocol( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + auth: &BaseAuthState, + ) -> Result<RelayMessage, BaseRelayError> { + BaseRelay::handle_count_with_auth(self, subscription_id, pocket_filters(filters), auth) + } +} + +fn pocket_filters(filters: Vec<Filter>) -> Vec<tangle_store_pocket::PocketOwnedFilter> { + filters + .iter() + .map(|filter| { + let raw = serde_json::to_vec(&filter_to_value(filter)).expect("filter JSON"); + parse_pocket_filter_json(&raw).expect("pocket filter") + }) + .collect() +} + #[test] fn public_relay_smoke_stores_queries_counts_and_fans_out() { let config = test_store_config("public-smoke"); @@ -86,7 +130,7 @@ fn public_relay_smoke_stores_queries_counts_and_fans_out() { &[&first], ); assert_count( - relay.handle_count(subscription("public-count"), vec![filter_kind(1)]), + relay.handle_count_protocol(subscription("public-count"), vec![filter_kind(1)]), 1, ); assert_eq!(relay.handle_close(&query_id), CloseResult::Closed); @@ -327,7 +371,7 @@ fn group_auth_lifecycle_membership_and_flag_flows_pass_in_process() { &remove, ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("members"), vec![filter_kind(KIND_GROUP_MEMBERS)], ), @@ -492,28 +536,28 @@ fn metadata_flags_and_read_privacy_cover_req_count_and_fanout() { ); assert_eq!(relay.active_subscription_count(), 0); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("private-count-unauth"), vec![filter_group_tag(1, "h", "PrivateFarm")], ), 0, ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("private-metadata-unauth"), vec![filter_group_tag(KIND_GROUP_METADATA, "d", "PrivateFarm")], ), 1, ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("private-admins-unauth"), vec![filter_group_tag(KIND_GROUP_ADMINS, "d", "PrivateFarm")], ), 1, ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("private-members-unauth"), vec![filter_kind(KIND_GROUP_MEMBERS)], ), @@ -533,7 +577,7 @@ fn metadata_flags_and_read_privacy_cover_req_count_and_fanout() { ); assert_eq!(relay.handle_close(&owner_query_id), CloseResult::Closed); assert_count( - relay.handle_count_with_auth( + relay.handle_count_with_auth_protocol( subscription("private-count-owner"), vec![filter_group_tag(1, "h", "PrivateFarm")], &owner_auth, @@ -585,14 +629,14 @@ fn metadata_flags_and_read_privacy_cover_req_count_and_fanout() { accept_group_create(&mut relay, "HiddenFarm", &["hidden"], 10, &owner_auth); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("hidden-unauth"), vec![filter_group_tag(KIND_GROUP_METADATA, "d", "HiddenFarm")], ), 0, ); assert_count( - relay.handle_count_with_auth( + relay.handle_count_with_auth_protocol( subscription("hidden-owner"), vec![filter_group_tag(KIND_GROUP_METADATA, "d", "HiddenFarm")], &owner_auth, @@ -678,7 +722,7 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { "auth-required: group event author must authenticate with AUTH" ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("unauthorized-generated"), vec![filter_group_tag( KIND_GROUP_METADATA, @@ -722,7 +766,7 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { }] ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("private-count-unauth"), vec![filter_group_tag(1, "h", "LeakPrivate")], ), @@ -783,14 +827,14 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { ))); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("private-metadata-public"), vec![filter_group_tag(KIND_GROUP_METADATA, "d", "LeakPrivate")], ), 1, ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("private-members-public"), vec![filter_group_tag(KIND_GROUP_MEMBERS, "d", "LeakPrivate")], ), @@ -799,14 +843,14 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { accept_group_create(&mut relay, "LeakHidden", &["hidden"], 20, &owner_auth); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("hidden-metadata-public"), vec![filter_group_tag(KIND_GROUP_METADATA, "d", "LeakHidden")], ), 0, ); assert_count( - relay.handle_count_with_auth( + relay.handle_count_with_auth_protocol( subscription("hidden-metadata-owner"), vec![filter_group_tag(KIND_GROUP_METADATA, "d", "LeakHidden")], &owner_auth, @@ -833,7 +877,7 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { "restricted: group is unavailable" ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("restricted-count"), vec![filter_group_tag(1, "h", "LeakRestricted")], ), @@ -852,7 +896,7 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { "restricted: group is unavailable" ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("closed-join-count"), vec![filter_group_tag(KIND_GROUP_JOIN_REQUEST, "h", "LeakClosed")], ), @@ -868,7 +912,7 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { &closed_normal, ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("closed-normal-count"), vec![filter_group_tag(1, "h", "LeakClosed")], ), @@ -886,7 +930,7 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { "duplicate: group member already exists" ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("duplicate-join-count"), vec![filter_group_tag( KIND_GROUP_JOIN_REQUEST, @@ -907,7 +951,7 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { "duplicate: group member does not exist" ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("duplicate-leave-count"), vec![filter_group_tag( KIND_GROUP_LEAVE_REQUEST, @@ -940,7 +984,7 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { "blocked: relay-generated group state events cannot be submitted by clients" ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("client-generated-count"), vec![filter_group_tag(kind, "d", "ClientGenerated")], ), @@ -971,7 +1015,7 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { &delete_target, ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("deleted-target-count"), vec![filter_group_tag(1, "h", "LeakDeleted")], ), @@ -1034,7 +1078,7 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { "restricted: missing group capability manage_members" ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("unauthorized-put-count"), vec![filter_group_tag( KIND_GROUP_PUT_USER, @@ -1082,14 +1126,14 @@ fn delete_and_secondary_privacy_surfaces_are_read_gated_or_absent() { ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("deleted-target"), vec![filter_group_tag(1, "h", "DeleteFarm")], ), 0, ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("delete-marker"), vec![filter_group_tag(KIND_GROUP_DELETE_GROUP, "h", "DeleteFarm")], ), @@ -1116,7 +1160,7 @@ fn delete_and_secondary_privacy_surfaces_are_read_gated_or_absent() { "blocked: group is deleted" ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("group-marker"), vec![filter_group_tag(KIND_GROUP_DELETE_GROUP, "h", "DeleteFarm")], ), @@ -1166,7 +1210,7 @@ fn group_tombstone_hides_prior_events_and_generated_snapshots() { ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("tombstone-note-before"), vec![filter_group_tag(1, "h", "TombstoneFarm")], ), @@ -1178,7 +1222,7 @@ fn group_tombstone_hides_prior_events_and_generated_snapshots() { ("tombstone-members-before", KIND_GROUP_MEMBERS), ] { assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription(subscription_id), vec![filter_group_tag(kind, "d", "TombstoneFarm")], ), @@ -1196,7 +1240,7 @@ fn group_tombstone_hides_prior_events_and_generated_snapshots() { ); assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("tombstone-note-after"), vec![filter_group_tag(1, "h", "TombstoneFarm")], ), @@ -1225,7 +1269,7 @@ fn group_tombstone_hides_prior_events_and_generated_snapshots() { ), ] { assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription(subscription_id), vec![filter_group_tag(kind, "d", "TombstoneFarm")], ), @@ -1238,7 +1282,7 @@ fn group_tombstone_hides_prior_events_and_generated_snapshots() { ); } assert_count( - relay.handle_count( + relay.handle_count_protocol( subscription("tombstone-marker-after"), vec![filter_group_tag( KIND_GROUP_DELETE_GROUP, @@ -2822,7 +2866,7 @@ fn assert_count( fn count_kind(relay: &BaseRelay, kind: u32) -> u64 { let RelayMessage::Count { count, .. } = relay - .handle_count(subscription("count-kind"), vec![filter_kind(kind)]) + .handle_count_protocol(subscription("count-kind"), vec![filter_kind(kind)]) .expect("count") else { panic!("expected count") diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -19,7 +19,7 @@ use tangle_groups::{ }; use tangle_protocol::{ Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SignatureHex, SubscriptionId, Tag, - UnixTimestamp, UnsignedEvent, event_to_value, filter_from_value, + UnixTimestamp, UnsignedEvent, event_to_value, filter_from_value, filter_to_value, }; use tangle_runtime::{ config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, @@ -31,7 +31,7 @@ use tangle_runtime::{ }; use tangle_store_pocket::{ PocketStoreConfig, PocketStoreHandle, TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, - TANGLE_GROUP_PROJECTION_TABLE, parse_pocket_event_json, + TANGLE_GROUP_PROJECTION_TABLE, parse_pocket_event_json, parse_pocket_filter_json, }; use tangle_test_support::{ FixtureKey, TANGLE_V2_RELAY_SECRET_HEX, TANGLE_V2_RELAY_URL, tangle_v2_auth_event, @@ -69,6 +69,34 @@ impl BaseRelayEventTestExt for BaseRelay { } } +trait BaseRelayCountTestExt { + fn handle_count_protocol( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + ) -> Result<RelayMessage, BaseRelayError>; +} + +impl BaseRelayCountTestExt for BaseRelay { + fn handle_count_protocol( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + ) -> Result<RelayMessage, BaseRelayError> { + BaseRelay::handle_count(self, subscription_id, pocket_filters(filters)) + } +} + +fn pocket_filters(filters: Vec<Filter>) -> Vec<tangle_store_pocket::PocketOwnedFilter> { + filters + .iter() + .map(|filter| { + let raw = serde_json::to_vec(&filter_to_value(filter)).expect("filter JSON"); + parse_pocket_filter_json(&raw).expect("pocket filter") + }) + .collect() +} + #[tokio::test] async fn tangle_run_serves_until_shutdown() { let root = temp_root("acceptance-server"); @@ -1644,7 +1672,9 @@ fn runtime_req_handling_does_not_lock_relay_state() { fn runtime_count_handling_does_not_lock_relay_state() { let runtime = include_str!("../src/runtime.rs"); let count_branch = runtime - .split("RuntimeClientMessage::Count {") + .split( + " RuntimeClientMessage::Count {\n subscription_id,\n filters,\n search_present,\n } => {", + ) .nth(1) .expect("count branch") .split("RuntimeClientMessage::Auth") @@ -1652,9 +1682,11 @@ fn runtime_count_handling_does_not_lock_relay_state() { .expect("auth branch"); assert!(!count_branch.contains("relay.lock().await")); - assert!( - count_branch.contains("handle_count_with_auth_report(subscription_id, filters, auth)?") - ); + assert!(!count_branch.contains("runtime_filters_to_protocol(")); + assert!(count_branch.contains("validate_pocket_filters(&filters)?")); + assert!(count_branch.contains("rate_limit_count_pocket(")); + assert!(count_branch.contains("handle_count_with_auth_report(")); + assert!(count_branch.contains("search_present")); } #[test] @@ -1783,7 +1815,7 @@ fn projection_and_outbox_recover_from_canonical_pocket_events() { assert_relay_count( runtime .relay() - .handle_count( + .handle_count_protocol( subscription_id("pre-recovery-members"), vec![relay_filter( json!({"kinds":[KIND_GROUP_MEMBERS], "#d":["RecoverSocket"]}), @@ -1814,7 +1846,7 @@ fn projection_and_outbox_recover_from_canonical_pocket_events() { assert_relay_count( recovered .relay() - .handle_count( + .handle_count_protocol( subscription_id("recovered-metadata"), vec![relay_filter( json!({"kinds":[KIND_GROUP_METADATA], "#d":["RecoverSocket"]}), @@ -1827,7 +1859,7 @@ fn projection_and_outbox_recover_from_canonical_pocket_events() { assert_relay_count( recovered .relay() - .handle_count( + .handle_count_protocol( subscription_id("recovered-admins"), vec![relay_filter( json!({"kinds":[KIND_GROUP_ADMINS], "#d":["RecoverSocket"]}), @@ -1840,7 +1872,7 @@ fn projection_and_outbox_recover_from_canonical_pocket_events() { assert_relay_count( recovered .relay() - .handle_count( + .handle_count_protocol( subscription_id("recovered-members"), vec![relay_filter( json!({"kinds":[KIND_GROUP_MEMBERS], "#d":["RecoverSocket"]}), @@ -1853,7 +1885,7 @@ fn projection_and_outbox_recover_from_canonical_pocket_events() { assert_relay_count( recovered .relay() - .handle_count( + .handle_count_protocol( subscription_id("recovered-note"), vec![relay_filter(json!({"kinds":[1], "#h":["RecoverSocket"]}))], )