tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 513ac06a6db8c1d7e37a6e235b0a7f66c96f3ada
parent cc8c614a33051151a58fa7722f36648b781ba732
Author: triesap <tyson@radroots.org>
Date:   Mon, 15 Jun 2026 20:46:12 -0700

runtime: close protocol relay boundaries

- Add Pocket-native direct relay REQ and fanout APIs for production callers.
- Quarantine old protocol relay helpers behind explicit test-only names.
- Move benchmark and integration adapters to Pocket conversion boundaries.
- Validate runtime, workspace test, workspace check, source scans, and clippy lanes.

Diffstat:
Mcrates/tangle_bench/src/lib.rs | 27+++++++++++++++++++--------
Mcrates/tangle_runtime/src/pocket_conversion.rs | 13++++++++++---
Mcrates/tangle_runtime/src/relay/core.rs | 274+++++++++++++++++++++++++++++++------------------------------------------------
Mcrates/tangle_runtime/src/session.rs | 9++++-----
Mcrates/tangle_runtime/tests/base_relay_v2.rs | 58++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 19+++++++++++++++++++
6 files changed, 217 insertions(+), 183 deletions(-)

diff --git a/crates/tangle_bench/src/lib.rs b/crates/tangle_bench/src/lib.rs @@ -21,7 +21,7 @@ use tangle_runtime::{ runtime::{TangleRuntime, TangleRuntimeHandle}, }; use tangle_store_pocket::{ - PocketOwnedFilter, PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy, + PocketOwnedEvent, PocketOwnedFilter, PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy, parse_pocket_event_json, parse_pocket_filter_json, }; use tangle_test_support::{ @@ -1308,12 +1308,13 @@ fn run_broadcast_lag_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, let public_group = dataset.first_group(BenchGroupVisibility::Public)?; let subscriber_count = dataset.config.group_count.max(4); let filter = filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?; + let pocket_filter = pocket_filter(&filter)?; for index in 0..subscriber_count { materialized .relay - .handle_req( + .handle_pocket_req( subscription(&format!("lag-{index:04}"))?, - vec![filter.clone()], + vec![pocket_filter.clone()], ) .map_err(|error| error.to_string())?; } @@ -1332,8 +1333,10 @@ fn run_broadcast_lag_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, "broadcast lag second", )?; let started = Instant::now(); - let first_messages = materialized.relay.fanout(&first); - let second_messages = materialized.relay.fanout(&second); + let first_pocket = pocket_event(&first)?; + let second_pocket = pocket_event(&second)?; + let first_messages = materialized.relay.fanout_pocket(&first_pocket); + let second_messages = materialized.relay.fanout_pocket(&second_pocket); let elapsed = elapsed_micros(started); let first_events = first_messages .iter() @@ -1574,12 +1577,15 @@ fn query_for_operation( let subscription_id = subscription(operation.name)?; let messages = match operation.auth { QueryAuth::None => relay - .handle_req(subscription_id.clone(), vec![operation.filter.clone()]) + .handle_pocket_req( + subscription_id.clone(), + vec![pocket_filter(&operation.filter)?], + ) .map_err(|error| error.to_string())?, QueryAuth::Owner => relay - .handle_req_with_auth( + .handle_pocket_req_with_auth( subscription_id.clone(), - vec![operation.filter.clone()], + vec![pocket_filter(&operation.filter)?], owner_auth, ) .map_err(|error| error.to_string())?, @@ -1649,6 +1655,11 @@ fn pocket_filter(filter: &Filter) -> Result<PocketOwnedFilter, String> { parse_pocket_filter_json(&raw).map_err(|error| error.to_string()) } +fn pocket_event(event: &Event) -> Result<PocketOwnedEvent, String> { + let raw = serde_json::to_vec(&event_to_value(event)).map_err(|error| error.to_string())?; + parse_pocket_event_json(&raw).map_err(|error| error.to_string()) +} + fn pocket_filter_from_value(value: &serde_json::Value) -> Result<PocketOwnedFilter, String> { let filter = filter_from_value(value)?; pocket_filter(&filter) diff --git a/crates/tangle_runtime/src/pocket_conversion.rs b/crates/tangle_runtime/src/pocket_conversion.rs @@ -2,12 +2,16 @@ use crate::errors::BaseRelayError; use std::str; +#[cfg(test)] +use tangle_protocol::Filter; use tangle_protocol::{ - Event, EventId, Filter, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent, + Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent, }; +#[cfg(test)] +use tangle_store_pocket::PocketOwnedFilter; use tangle_store_pocket::{ - PocketEvent, PocketEventId, PocketKind, PocketOwnedEvent, PocketOwnedFilter, PocketOwnedTags, - PocketPubkey, PocketSig, PocketTags, PocketTime, + PocketEvent, PocketEventId, PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketPubkey, + PocketSig, PocketTags, PocketTime, }; pub(crate) fn tangle_event_to_pocket(event: &Event) -> Result<PocketOwnedEvent, BaseRelayError> { @@ -25,6 +29,7 @@ pub(crate) fn tangle_event_to_pocket(event: &Event) -> Result<PocketOwnedEvent, .map_err(|error| BaseRelayError::error(error.to_string())) } +#[cfg(test)] pub(crate) fn tangle_filter_to_pocket( filter: &Filter, ) -> Result<PocketOwnedFilter, BaseRelayError> { @@ -157,6 +162,7 @@ fn ensure_tag_size(size: usize) -> Result<(), BaseRelayError> { Ok(()) } +#[cfg(test)] fn ensure_filter_array_len(name: &str, len: usize) -> Result<(), BaseRelayError> { if len > usize::from(u16::MAX) { return Err(BaseRelayError::invalid(format!( @@ -166,6 +172,7 @@ fn ensure_filter_array_len(name: &str, len: usize) -> Result<(), BaseRelayError> Ok(()) } +#[cfg(test)] fn ensure_filter_size( ids: &[PocketEventId], authors: &[PocketPubkey], diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs @@ -4,11 +4,9 @@ use crate::groups::{ }; use crate::logging::{self, TangleModerationAuditResult}; use crate::ops::BaseRelayReadinessState; +use crate::pocket_conversion::pocket_event_to_tangle; #[cfg(test)] -use crate::pocket_conversion::pocket_event_id; -use crate::pocket_conversion::{ - pocket_event_to_tangle, tangle_event_to_pocket, tangle_filter_to_pocket, -}; +use crate::pocket_conversion::{pocket_event_id, tangle_event_to_pocket, tangle_filter_to_pocket}; use crate::pocket_event_validation::{ is_pocket_nip70_protected_event, pocket_event_id as pocket_runtime_event_id, pocket_event_kind, pocket_event_pubkey, validate_pocket_event_shape, verify_pocket_event_signature, @@ -29,8 +27,8 @@ use tangle_groups::{ classify_group_event, validate_client_group_event_structure, }; #[cfg(test)] -use tangle_protocol::ClientMessage; -use tangle_protocol::{Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp}; +use tangle_protocol::{ClientMessage, Event, Filter}; +use tangle_protocol::{RelayMessage, SubscriptionId, UnixTimestamp}; use tangle_store_pocket::{ PocketEvent, PocketFilter, PocketHll8, PocketOwnedEvent, PocketOwnedFilter, PocketQueryConfig, PocketScreenResult, PocketStoreConfig, PocketStoreHandle, @@ -484,7 +482,8 @@ impl BaseRelayLimits { self.default_limit } - pub fn validate_event(&self, event: &Event) -> Result<(), BaseRelayError> { + #[cfg(test)] + fn validate_protocol_event_for_test(&self, event: &Event) -> Result<(), BaseRelayError> { if event.unsigned().tags().len() > self.max_event_tags { return Err(BaseRelayError::invalid(format!( "event tag count exceeds runtime max_event_tags {}", @@ -518,37 +517,6 @@ impl BaseRelayLimits { Ok(()) } - pub fn validate_filters(&self, filters: &[Filter]) -> Result<(), BaseRelayError> { - if filters.is_empty() { - return Err(BaseRelayError::invalid( - "request must include at least one filter", - )); - } - if filters.len() > self.max_filters_per_request { - return Err(BaseRelayError::invalid(format!( - "filter count exceeds runtime max_filters_per_request {}", - self.max_filters_per_request - ))); - } - for filter in filters { - let tag_values = filter.tag_filters().values().map(Vec::len).sum::<usize>(); - if tag_values > self.max_tag_values_per_filter { - return Err(BaseRelayError::invalid(format!( - "filter tag value count exceeds runtime max_tag_values_per_filter {}", - self.max_tag_values_per_filter - ))); - } - if filter.limit().is_some_and(|limit| limit > self.max_limit) { - return Err(BaseRelayError::invalid(format!( - "filter limit exceeds runtime max_limit {}", - self.max_limit - ))); - } - } - self.validate_query_complexity(filters)?; - Ok(()) - } - pub(crate) fn validate_pocket_filters( &self, filters: &[PocketOwnedFilter], @@ -588,10 +556,6 @@ impl BaseRelayLimits { Ok(()) } - fn effective_filter_limit(self, filter: &Filter) -> usize { - usize::try_from(filter.limit().unwrap_or(self.default_limit)).unwrap_or(usize::MAX) - } - fn effective_pocket_filter_limit(self, filter: &PocketFilter) -> usize { if filter.limit() == u32::MAX { usize::try_from(self.default_limit).unwrap_or(usize::MAX) @@ -600,20 +564,6 @@ impl BaseRelayLimits { } } - fn validate_query_complexity(&self, filters: &[Filter]) -> Result<(), BaseRelayError> { - let score = filters - .iter() - .map(|filter| self.filter_complexity(filter)) - .fold(0_usize, usize::saturating_add); - if score > self.max_query_complexity { - return Err(BaseRelayError::invalid(format!( - "query complexity {score} exceeds runtime max_query_complexity {}", - self.max_query_complexity - ))); - } - Ok(()) - } - fn validate_pocket_query_complexity( &self, filters: &[PocketOwnedFilter], @@ -631,23 +581,6 @@ impl BaseRelayLimits { Ok(()) } - fn filter_complexity(&self, filter: &Filter) -> usize { - let tag_score = filter - .tag_filters() - .values() - .map(|values| 1_usize.saturating_add(values.len())) - .fold(0_usize, usize::saturating_add); - 1_usize - .saturating_add(filter.ids().len()) - .saturating_add(filter.authors().len()) - .saturating_add(filter.kinds().len()) - .saturating_add(tag_score) - .saturating_add(usize::from(filter.since().is_some())) - .saturating_add(usize::from(filter.until().is_some())) - .saturating_add(filter.search().map(str::len).unwrap_or(0)) - .saturating_add(self.effective_filter_limit(filter)) - } - fn pocket_filter_complexity(&self, filter: &PocketFilter) -> usize { let tag_score = filter .tags() @@ -760,7 +693,7 @@ impl BaseRelay { ClientMessage::Req { subscription_id, filters, - } => self.handle_req_with_auth(subscription_id, filters, auth), + } => self.handle_protocol_req_with_auth_for_test(subscription_id, filters, auth), ClientMessage::Count { subscription_id, filters, @@ -846,18 +779,6 @@ impl BaseRelay { } } - pub fn query_events_with_auth( - &self, - filters: &[Filter], - auth: &BaseAuthState, - ) -> Result<Vec<Event>, BaseRelayError> { - self.limits.validate_filters(filters)?; - self.query_events( - filters, - &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), - ) - } - #[cfg(test)] fn handle_auth_message( &self, @@ -875,7 +796,7 @@ impl BaseRelay { auth: &mut BaseAuthState, now: UnixTimestamp, ) -> Vec<RelayMessage> { - if let Err(error) = limits.validate_event(&event) { + if let Err(error) = limits.validate_protocol_event_for_test(&event) { return vec![RelayMessage::Ok { event_id: event.id().clone(), accepted: false, @@ -1080,7 +1001,7 @@ impl BaseRelay { auth: &GroupAuthContext, ) -> Result<BaseRelayEventWrite, BaseRelayError> { let event_id = event.id().clone(); - if let Err(error) = limits.validate_event(&event) { + if let Err(error) = limits.validate_protocol_event_for_test(&event) { return Ok(BaseRelayEventWrite::unstored(ok_rejected( event_id, error.prefixed_message(), @@ -1318,42 +1239,81 @@ impl BaseRelay { )) } - pub fn handle_req( + pub fn handle_pocket_req( + &mut self, + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + ) -> Result<Vec<RelayMessage>, BaseRelayError> { + self.handle_pocket_req_with_group_auth( + subscription_id, + filters, + &GroupAuthContext::unauthenticated(), + ) + } + + #[cfg(test)] + pub fn handle_protocol_req_for_test( &mut self, subscription_id: SubscriptionId, filters: Vec<Filter>, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - self.handle_req_with_group_auth( + self.handle_protocol_req_with_group_auth_for_test( subscription_id, filters, &GroupAuthContext::unauthenticated(), ) } - pub fn handle_req_with_auth( + #[cfg(test)] + pub fn handle_protocol_req_with_auth_for_test( &mut self, subscription_id: SubscriptionId, filters: Vec<Filter>, auth: &BaseAuthState, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - self.handle_req_with_group_auth( + self.handle_protocol_req_with_group_auth_for_test( subscription_id, filters, &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), ) } - fn handle_req_with_group_auth( + #[cfg(test)] + fn handle_protocol_req_with_group_auth_for_test( &mut self, subscription_id: SubscriptionId, filters: Vec<Filter>, auth: &GroupAuthContext, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - self.handle_req_with_group_auth_report(subscription_id, filters, auth) + self.handle_protocol_req_with_group_auth_report_for_test(subscription_id, filters, auth) + .and_then(BaseRelayQueryReport::into_protocol_messages) + } + + pub fn handle_pocket_req_with_auth( + &mut self, + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + auth: &BaseAuthState, + ) -> Result<Vec<RelayMessage>, BaseRelayError> { + self.handle_pocket_req_with_group_auth( + subscription_id, + filters, + &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), + ) + } + + fn handle_pocket_req_with_group_auth( + &mut self, + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + auth: &GroupAuthContext, + ) -> Result<Vec<RelayMessage>, BaseRelayError> { + self.handle_pocket_req_with_group_auth_report(subscription_id, filters, false, auth) .and_then(BaseRelayQueryReport::into_protocol_messages) } - fn handle_req_with_group_auth_report( + #[cfg(test)] + fn handle_protocol_req_with_group_auth_report_for_test( &mut self, subscription_id: SubscriptionId, filters: Vec<Filter>, @@ -1606,66 +1566,46 @@ impl BaseRelay { self.subscriptions.close(subscription_id) } - pub fn fanout(&mut self, event: &Event) -> Vec<RelayMessage> { - self.fanout_with_group_auth(event, &GroupAuthContext::unauthenticated()) + pub fn fanout_pocket(&mut self, event: &PocketEvent) -> Vec<RelayMessage> { + self.fanout_pocket_with_group_auth(event, &GroupAuthContext::unauthenticated()) } - pub fn fanout_with_group_auth( + pub fn fanout_pocket_with_group_auth( &mut self, - event: &Event, + event: &PocketEvent, auth: &GroupAuthContext, ) -> Vec<RelayMessage> { let groups = self.groups.as_ref(); - let pocket_event = tangle_event_to_pocket(event).expect("event must convert to Pocket"); self.subscriptions - .fanout(&pocket_event, auth, |event, auth| { + .fanout(event, auth, |event, auth| { Self::group_read_gate_visible_to_auth(groups, event, auth).unwrap_or(false) }) .expect("Pocket live fanout must match") .into_iter() .map(|subscription_id| RelayMessage::Event { subscription_id, - event: event.clone(), + event: pocket_event_to_tangle(event).expect("Pocket fanout event must convert"), }) .collect() } - pub fn active_subscription_count(&self) -> usize { - self.subscriptions.active_count() + #[cfg(test)] + pub fn fanout_protocol_for_test(&mut self, event: &Event) -> Vec<RelayMessage> { + self.fanout_protocol_with_group_auth_for_test(event, &GroupAuthContext::unauthenticated()) } - fn query_events( - &self, - filters: &[Filter], + #[cfg(test)] + pub fn fanout_protocol_with_group_auth_for_test( + &mut self, + event: &Event, auth: &GroupAuthContext, - ) -> Result<Vec<Event>, BaseRelayError> { - self.limits.validate_filters(filters)?; - let filters = filters - .iter() - .map(tangle_filter_to_pocket) - .collect::<Result<Vec<_>, _>>()?; - self.query_events_report(&filters, auth).and_then(|report| { - report - .events - .into_iter() - .map(|event| pocket_event_to_tangle(&event)) - .collect() - }) + ) -> Vec<RelayMessage> { + let pocket_event = tangle_event_to_pocket(event).expect("event must convert to Pocket"); + self.fanout_pocket_with_group_auth(&pocket_event, auth) } - fn query_events_report( - &self, - filters: &[PocketOwnedFilter], - auth: &GroupAuthContext, - ) -> Result<BaseRelayEventQueryReport, BaseRelayError> { - Self::query_events_report_with_services( - &self.store, - self.groups.as_ref(), - self.limits, - self.query, - filters, - auth, - ) + pub fn active_subscription_count(&self) -> usize { + self.subscriptions.active_count() } fn query_events_report_with_services( @@ -1987,7 +1927,7 @@ mod tests { ); let messages = relay - .handle_req(subscription_id.clone(), vec![filter.clone()]) + .handle_protocol_req_for_test(subscription_id.clone(), vec![filter.clone()]) .expect("req"); assert!( matches!(&messages[0], RelayMessage::Event { event: found, .. } if found.id() == event.id()) @@ -2004,13 +1944,13 @@ mod tests { } ); assert!(matches!( - relay.fanout(&event).as_slice(), + relay.fanout_protocol_for_test(&event).as_slice(), [RelayMessage::Event { subscription_id: delivered, event: found }] if delivered == &subscription_id && found.id() == event.id() )); assert_eq!(relay.handle_close(&subscription_id), CloseResult::Closed); assert_eq!(relay.active_subscription_count(), 0); - assert!(relay.fanout(&event).is_empty()); + assert!(relay.fanout_protocol_for_test(&event).is_empty()); } #[test] @@ -2033,7 +1973,7 @@ mod tests { ); assert!( strict - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("strict").expect("sub"), vec![broad.clone()] ) @@ -2059,7 +1999,7 @@ mod tests { &limited_event, ); let messages = limited - .handle_req(SubscriptionId::new("limited").expect("sub"), vec![broad]) + .handle_protocol_req_for_test(SubscriptionId::new("limited").expect("sub"), vec![broad]) .expect("limited scrape"); assert!( @@ -2080,7 +2020,7 @@ mod tests { assert_eq!( relay - .handle_req(req_id.clone(), vec![search.clone()]) + .handle_protocol_req_for_test(req_id.clone(), vec![search.clone()]) .expect("req"), vec![RelayMessage::Closed { subscription_id: req_id, @@ -2203,7 +2143,7 @@ mod tests { })) .expect("author filter"); let messages = relay - .handle_req(subscription_id.clone(), vec![market_limit, author_limit]) + .handle_protocol_req_for_test(subscription_id.clone(), vec![market_limit, author_limit]) .expect("req"); let mut tied = [tied_author.clone(), tied_other.clone()]; tied.sort_by(|left, right| left.id().cmp(right.id())); @@ -2292,7 +2232,7 @@ mod tests { })) .expect("author filter"); let messages = relay - .handle_req( + .handle_protocol_req_for_test( subscription_id.clone(), vec![market_limit.clone(), author_limit.clone()], ) @@ -2333,7 +2273,7 @@ mod tests { let restricted_sub = SubscriptionId::new("restricted-screened").expect("sub"); let restricted_messages = relay - .handle_req_with_auth( + .handle_protocol_req_with_auth_for_test( restricted_sub.clone(), vec![market_limit.clone(), author_limit.clone()], &outsider_auth, @@ -2365,7 +2305,7 @@ mod tests { let private_sub = SubscriptionId::new("private-screened").expect("sub"); assert_eq!( relay - .handle_req( + .handle_protocol_req_for_test( private_sub.clone(), vec![filter_group_tag(1, "h", "Private")] ) @@ -2379,7 +2319,7 @@ mod tests { let private_auth_sub = SubscriptionId::new("private-auth").expect("sub"); assert!(matches!( relay - .handle_req_with_auth( + .handle_protocol_req_with_auth_for_test( private_auth_sub.clone(), vec![filter_group_tag(1, "h", "Private")], &auth @@ -2431,7 +2371,7 @@ mod tests { filter_from_value(&serde_json::json!({"limit":501})).expect("limit filter"); assert!( relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("limit-req").expect("sub"), vec![too_large_limit.clone()] ) @@ -2455,7 +2395,7 @@ mod tests { let search_req = SubscriptionId::new("search-req").expect("sub"); assert_eq!( relay - .handle_req(search_req.clone(), vec![search.clone()]) + .handle_protocol_req_for_test(search_req.clone(), vec![search.clone()]) .expect("search req"), vec![RelayMessage::Closed { subscription_id: search_req, @@ -2502,7 +2442,7 @@ mod tests { assert_accepted(relay.handle_event(second.clone()).expect("second"), &second); let limited = relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("lim").expect("sub"), vec![Filter::empty()], ) @@ -2521,7 +2461,7 @@ mod tests { assert!( relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("long").expect("sub"), vec![Filter::empty()] ) @@ -2617,7 +2557,7 @@ mod tests { assert!( relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("req").expect("sub"), vec![complex.clone()] ) @@ -3773,7 +3713,7 @@ mod tests { let auth_sub = SubscriptionId::new("private-auth").expect("sub"); assert_eq!( relay - .handle_req(unauth_sub.clone(), vec![filter_kind(1)]) + .handle_protocol_req_for_test(unauth_sub.clone(), vec![filter_kind(1)]) .expect("unauth req"), vec![RelayMessage::Closed { subscription_id: unauth_sub, @@ -3783,7 +3723,7 @@ mod tests { assert_eq!(relay.active_subscription_count(), 0); assert!(matches!( relay - .handle_req_with_auth(auth_sub.clone(), vec![filter_kind(1)], &auth) + .handle_protocol_req_with_auth_for_test(auth_sub.clone(), vec![filter_kind(1)], &auth) .expect("auth req") .as_slice(), [RelayMessage::Event { subscription_id, event }, RelayMessage::Eose(eose)] @@ -3878,7 +3818,7 @@ mod tests { .expect("create"); let subscription_id = SubscriptionId::new("fanout-current-auth").expect("sub"); relay - .handle_req(subscription_id.clone(), vec![filter_kind(1)]) + .handle_protocol_req_for_test(subscription_id.clone(), vec![filter_kind(1)]) .expect("sub"); let private_event = signed_event_at( 7, @@ -3891,10 +3831,10 @@ mod tests { .handle_event_with_auth(private_event.clone(), &auth) .expect("private event"); - assert!(relay.fanout(&private_event).is_empty()); + assert!(relay.fanout_protocol_for_test(&private_event).is_empty()); assert!(matches!( relay - .fanout_with_group_auth( + .fanout_protocol_with_group_auth_for_test( &private_event, &GroupAuthContext::new([owner]) ) @@ -3912,17 +3852,17 @@ mod tests { let subscription_id = SubscriptionId::new("sub-volume").expect("sub"); let filter = filter_from_value(&serde_json::json!({"kinds":[1]})).expect("filter"); relay - .handle_req(subscription_id.clone(), vec![filter]) + .handle_protocol_req_for_test(subscription_id.clone(), vec![filter]) .expect("req"); let first = signed_public_event(7, 1, Vec::new(), "first"); let second = signed_public_event(7, 1, Vec::new(), "second"); assert!(matches!( - relay.fanout(&first).as_slice(), + relay.fanout_protocol_for_test(&first).as_slice(), [RelayMessage::Event { .. }] )); assert!(matches!( - relay.fanout(&second).as_slice(), + relay.fanout_protocol_for_test(&second).as_slice(), [RelayMessage::Event { .. }] )); assert_eq!(relay.active_subscription_count(), 1); @@ -3938,7 +3878,7 @@ mod tests { assert_accepted(relay.handle_event(event.clone()).expect("event"), &event); relay - .handle_req(subscription_id, vec![filter_kind(1)]) + .handle_protocol_req_for_test(subscription_id, vec![filter_kind(1)]) .expect("req"); assert_eq!(relay.active_subscription_count(), 1); @@ -3947,7 +3887,7 @@ mod tests { assert_eq!(report.closed_subscriptions(), 1); assert_eq!(relay.active_subscription_count(), 0); - assert!(relay.fanout(&event).is_empty()); + assert!(relay.fanout_protocol_for_test(&event).is_empty()); let reopened = BaseRelay::open(&config, relay_limits(4), PocketQueryConfig::default()) .expect("reopened"); @@ -4034,7 +3974,7 @@ mod tests { ); assert_eq!( relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("a").expect("sub"), vec![Filter::empty()] ) @@ -4044,7 +3984,7 @@ mod tests { ); assert!( relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("a").expect("sub"), vec![Filter::empty(), Filter::empty()], ) @@ -4067,7 +4007,7 @@ mod tests { ); assert!( relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("a").expect("sub"), vec![filter_from_value(&serde_json::json!({"limit": 3})).expect("filter")], ) @@ -4086,7 +4026,7 @@ mod tests { assert!( relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("abcde").expect("sub"), vec![Filter::empty()], ) @@ -4095,14 +4035,14 @@ mod tests { .contains("max_subid_length 4") ); relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("a").expect("sub"), vec![Filter::empty()], ) .expect("first subscription"); assert!( relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("b").expect("sub"), vec![Filter::empty()] ) @@ -4111,7 +4051,7 @@ mod tests { .contains("connection subscription limit exceeded") ); relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new("a").expect("sub"), vec![Filter::empty()], ) @@ -4352,7 +4292,7 @@ mod tests { fn query_filter(relay: &mut BaseRelay, subscription_id: &str, filter: Filter) -> Vec<Event> { relay - .handle_req( + .handle_protocol_req_for_test( SubscriptionId::new(subscription_id).expect("sub"), vec![filter], ) diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -1371,13 +1371,12 @@ mod tests { second.handle_event_receive_result(second_offset).await, TangleSessionControl::Continue ); - assert_eq!( - take_outbound_text(&mut second), + assert_relay_message_text( + &take_outbound_text(&mut second), RelayMessage::Event { subscription_id: subscription_id.clone(), - event - } - .encode() + event, + }, ); let snapshot = metrics.snapshot(); assert_eq!(snapshot.client_messages(), 5); diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs @@ -48,6 +48,14 @@ trait BaseRelayEventTestExt { event: Event, auth: &BaseAuthState, ) -> Result<RelayMessage, BaseRelayError>; + + fn fanout(&mut self, event: &Event) -> Vec<RelayMessage>; + + fn fanout_with_group_auth( + &mut self, + event: &Event, + auth: &GroupAuthContext, + ) -> Vec<RelayMessage>; } impl BaseRelayEventTestExt for BaseRelay { @@ -66,6 +74,56 @@ impl BaseRelayEventTestExt for BaseRelay { let pocket = parse_pocket_event_json(&raw).expect("pocket event"); self.handle_pocket_event_with_auth(&pocket, auth) } + + fn fanout(&mut self, event: &Event) -> Vec<RelayMessage> { + let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); + let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + self.fanout_pocket(&pocket) + } + + fn fanout_with_group_auth( + &mut self, + event: &Event, + auth: &GroupAuthContext, + ) -> Vec<RelayMessage> { + let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); + let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + self.fanout_pocket_with_group_auth(&pocket, auth) + } +} + +trait BaseRelayReqTestExt { + fn handle_req( + &mut self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + ) -> Result<Vec<RelayMessage>, BaseRelayError>; + + fn handle_req_with_auth( + &mut self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + auth: &BaseAuthState, + ) -> Result<Vec<RelayMessage>, BaseRelayError>; +} + +impl BaseRelayReqTestExt for BaseRelay { + fn handle_req( + &mut self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + ) -> Result<Vec<RelayMessage>, BaseRelayError> { + self.handle_pocket_req(subscription_id, pocket_filters(filters)) + } + + fn handle_req_with_auth( + &mut self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + auth: &BaseAuthState, + ) -> Result<Vec<RelayMessage>, BaseRelayError> { + self.handle_pocket_req_with_auth(subscription_id, pocket_filters(filters), auth) + } } trait BaseRelayCountTestExt { diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -1690,6 +1690,25 @@ fn runtime_count_handling_does_not_lock_relay_state() { } #[test] +fn relay_core_exposes_pocket_native_req_and_fanout_boundaries() { + let relay_core = include_str!("../src/relay/core.rs"); + + assert!(relay_core.contains("pub fn handle_pocket_req(")); + assert!(relay_core.contains("pub fn handle_pocket_req_with_auth(")); + assert!(relay_core.contains("pub fn fanout_pocket(")); + assert!(relay_core.contains("pub fn fanout_pocket_with_group_auth(")); + assert!(!relay_core.contains("pub fn handle_req(")); + assert!(!relay_core.contains("pub fn handle_req_with_auth(")); + assert!(!relay_core.contains("pub fn fanout(")); + assert!(!relay_core.contains("pub fn fanout_with_group_auth(")); + assert!(!relay_core.contains("pub fn query_events_with_auth(")); + assert!(!relay_core.contains("pub fn validate_event(")); + assert!(!relay_core.contains("pub fn validate_filters(")); + assert!(relay_core.contains("handle_protocol_req_for_test")); + assert!(relay_core.contains("fanout_protocol_for_test")); +} + +#[test] fn runtime_live_fanout_offset_lookup_does_not_lock_relay_state() { let runtime = include_str!("../src/runtime.rs"); let fanout_helper = runtime