tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 7421ae582d1d6c82ef8ec8ee654fa5e286adfd76
parent 16a0af31a3a49caee23c41b3add4e98c75ff9359
Author: triesap <tyson@radroots.org>
Date:   Mon, 15 Jun 2026 19:44:39 -0700

runtime: encode outbound events from Pocket

- Add a runtime relay message boundary for Pocket-backed outbound EVENT replies.
- Encode outbound EVENT frames from Pocket event JSON while preserving protocol encoders for non-event replies.
- Keep offset lookup and live offset fanout payloads Pocket-owned through session enqueueing.
- Validate the offset, outbound, protocol relay-message, workspace check, and clippy lanes.

Diffstat:
Mcrates/tangle_runtime/src/relay/core.rs | 94+++++++++++++++++++++++++++++++++++++++++++++----------------------------------
Mcrates/tangle_runtime/src/relay/mod.rs | 1+
Acrates/tangle_runtime/src/relay/outbound.rs | 125+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle_runtime/src/runtime.rs | 142+++++++++++++++++++++++++++++++++++++++++++++----------------------------------
Mcrates/tangle_runtime/src/session.rs | 66+++++++++++++++++++++++++++++++++++++++++++-----------------------
5 files changed, 304 insertions(+), 124 deletions(-)

diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs @@ -6,7 +6,7 @@ use crate::logging::{self, TangleModerationAuditResult}; use crate::ops::BaseRelayReadinessState; #[cfg(test)] use crate::pocket_conversion::{pocket_event_id, tangle_event_to_pocket}; -use crate::pocket_conversion::{pocket_event_to_tangle, pocket_pubkey, tangle_filter_to_pocket}; +use crate::pocket_conversion::{pocket_event_to_tangle, tangle_filter_to_pocket}; use crate::pocket_event_validation::{ is_pocket_nip70_protected_event, pocket_event_id as pocket_runtime_event_id, pocket_event_kind, pocket_event_pubkey, validate_pocket_event_shape, verify_pocket_event_signature, @@ -14,6 +14,7 @@ use crate::pocket_event_validation::{ use crate::relay::{ auth::BaseAuthState, live::{CloseResult, LiveSubscriptionSet}, + outbound::{RuntimeRelayMessage, protocol_messages}, }; use std::{ cell::{Cell, RefCell}, @@ -29,8 +30,8 @@ use tangle_groups::{ use tangle_protocol::ClientMessage; use tangle_protocol::{Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp}; use tangle_store_pocket::{ - PocketEvent, PocketHll8, PocketQueryConfig, PocketScreenResult, PocketStoreConfig, - PocketStoreHandle, + PocketEvent, PocketHll8, PocketOwnedEvent, PocketQueryConfig, PocketScreenResult, + PocketStoreConfig, PocketStoreHandle, }; pub(crate) const NEGENTROPY_DISABLED_MESSAGE: &str = "blocked: Negentropy sync is disabled"; @@ -76,14 +77,14 @@ impl BaseRelayEventWrite { #[derive(Debug, Clone, PartialEq)] pub(crate) struct BaseRelayQueryReport { - messages: Vec<RelayMessage>, + messages: Vec<RuntimeRelayMessage>, group_read_denied: bool, query_metrics: BaseRelayQueryMetrics, } impl BaseRelayQueryReport { fn new( - messages: Vec<RelayMessage>, + messages: Vec<RuntimeRelayMessage>, group_read_denied: bool, query_metrics: BaseRelayQueryMetrics, ) -> Self { @@ -102,9 +103,13 @@ impl BaseRelayQueryReport { self.query_metrics } - pub(crate) fn into_messages(self) -> Vec<RelayMessage> { + pub(crate) fn into_messages(self) -> Vec<RuntimeRelayMessage> { self.messages } + + pub(crate) fn into_protocol_messages(self) -> Result<Vec<RelayMessage>, BaseRelayError> { + protocol_messages(self.messages) + } } #[derive(Debug, Clone, PartialEq)] @@ -142,14 +147,14 @@ impl BaseRelayCountReport { #[derive(Debug, Clone, PartialEq)] struct BaseRelayEventQueryReport { - events: Vec<Event>, + events: Vec<PocketOwnedEvent>, group_read_denied: bool, query_metrics: BaseRelayQueryMetrics, } impl BaseRelayEventQueryReport { fn new( - events: Vec<Event>, + events: Vec<PocketOwnedEvent>, group_read_denied: bool, query_metrics: BaseRelayQueryMetrics, ) -> Self { @@ -658,16 +663,17 @@ impl BaseRelay { ) } - fn event_by_offset(&self, offset: StoreOffset) -> Result<Event, BaseRelayError> { - let event = self.store.event_by_offset(offset.as_u64())?; - pocket_event_to_tangle(&event) + fn event_by_offset(&self, offset: StoreOffset) -> Result<PocketOwnedEvent, BaseRelayError> { + self.store + .event_by_offset(offset.as_u64()) + .map_err(BaseRelayError::from) } pub fn event_by_offset_with_auth( &self, offset: StoreOffset, auth: &BaseAuthState, - ) -> Result<Option<Event>, BaseRelayError> { + ) -> Result<Option<PocketOwnedEvent>, BaseRelayError> { let event = self.event_by_offset(offset)?; if Self::group_read_gate_visible_to_auth( self.groups.as_ref(), @@ -1184,7 +1190,7 @@ impl BaseRelay { auth: &GroupAuthContext, ) -> Result<Vec<RelayMessage>, BaseRelayError> { self.handle_req_with_group_auth_report(subscription_id, filters, auth) - .map(BaseRelayQueryReport::into_messages) + .and_then(BaseRelayQueryReport::into_protocol_messages) } fn handle_req_with_group_auth_report( @@ -1197,7 +1203,7 @@ impl BaseRelay { self.limits.validate_filters(&filters)?; if let Some(message) = Self::unsupported_search_closed(&subscription_id, &filters) { return Ok(BaseRelayQueryReport::new( - vec![message], + vec![message.into()], false, BaseRelayQueryMetrics::default(), )); @@ -1245,7 +1251,7 @@ impl BaseRelay { limits.validate_filters(&filters)?; if let Some(message) = Self::unsupported_search_closed(&subscription_id, &filters) { return Ok(BaseRelayQueryReport::new( - vec![message], + vec![message.into()], false, BaseRelayQueryMetrics::default(), )); @@ -1257,15 +1263,12 @@ impl BaseRelay { let mut messages = report .events .into_iter() - .map(|event| RelayMessage::Event { - subscription_id: subscription_id.clone(), - event, - }) + .map(|event| RuntimeRelayMessage::event(subscription_id.clone(), event)) .collect::<Vec<_>>(); if group_read_denied { - messages.push(Self::redacted_req_closed(subscription_id, auth)); + messages.push(Self::redacted_req_closed(subscription_id, auth).into()); } else { - messages.push(RelayMessage::Eose(subscription_id)); + messages.push(RelayMessage::Eose(subscription_id).into()); } Ok(BaseRelayQueryReport::new( messages, @@ -1419,8 +1422,13 @@ impl BaseRelay { filters: &[Filter], auth: &GroupAuthContext, ) -> Result<Vec<Event>, BaseRelayError> { - self.query_events_report(filters, auth) - .map(|report| report.events) + self.query_events_report(filters, auth).and_then(|report| { + report + .events + .into_iter() + .map(|event| pocket_event_to_tangle(&event)) + .collect() + }) } fn query_events_report( @@ -1502,12 +1510,12 @@ impl BaseRelay { group_read_denied |= report.group_read_denied; query_metrics = query_metrics.add(report.query_metrics); for event in report.events { + let event: &PocketEvent = &event; if let (Some(hll), Some(offset)) = (&mut hll, hll_offset) { - let pubkey = pocket_pubkey(event.unsigned().pubkey())?; - hll.add_element(pubkey.as_bytes(), offset) + hll.add_element(event.pubkey().as_bytes(), offset) .map_err(|error| BaseRelayError::error(error.to_string()))?; } - seen.insert(event.id().clone()); + seen.insert(event.id()); } } let count = u64::try_from(seen.len()) @@ -1577,11 +1585,7 @@ impl BaseRelay { return Err(error); } let group_read_denied = screened.redacted(); - let events = screened - .into_events() - .into_iter() - .map(|pocket_event| pocket_event_to_tangle(&pocket_event)) - .collect::<Result<Vec<_>, _>>()?; + let events = screened.into_events(); Ok(BaseRelayEventQueryReport::new( events, group_read_denied, @@ -1602,18 +1606,22 @@ impl BaseRelay { } } - fn sort_and_dedupe_query_events(mut events: Vec<Event>) -> Vec<Event> { + fn sort_and_dedupe_query_events(mut events: Vec<PocketOwnedEvent>) -> Vec<PocketOwnedEvent> { events.sort_by(|left, right| { + let left: &PocketEvent = left; + let right: &PocketEvent = right; right - .unsigned() .created_at() - .cmp(&left.unsigned().created_at()) - .then_with(|| left.id().cmp(right.id())) + .cmp(&left.created_at()) + .then_with(|| left.id().cmp(&right.id())) }); let mut seen = BTreeSet::new(); events .into_iter() - .filter(|event| seen.insert(event.id().clone())) + .filter(|event| { + let event: &PocketEvent = event; + seen.insert(event.id()) + }) .collect() } @@ -1652,7 +1660,9 @@ mod tests { ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, Tag, UnixTimestamp, UnsignedEvent, filter_from_value, }; - use tangle_store_pocket::{PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy}; + use tangle_store_pocket::{ + PocketEvent, PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy, + }; #[test] fn base_relay_stores_queries_counts_closes_and_fans_out_public_events() { let mut relay = test_relay("base-relay-public", 4); @@ -1849,7 +1859,9 @@ mod tests { let pocket = tangle_event_to_pocket(&event).expect("pocket"); let offset = StoreOffset::new(relay.store.store_event(&pocket).expect("store")); - assert_eq!(relay.event_by_offset(offset).expect("offset"), event); + let found = relay.event_by_offset(offset).expect("offset"); + let found: &PocketEvent = &found; + assert_eq!(found.id().as_hex_string(), event.id().as_str()); } #[test] @@ -3520,7 +3532,8 @@ mod tests { .event_by_offset_with_auth(offset, &owner_auth) .expect("owner offset") .expect("visible"); - assert_eq!(visible.id(), private_event.id()); + let visible: &PocketEvent = &visible; + assert_eq!(visible.id().as_hex_string(), private_event.id().as_str()); relay .handle_event_with_auth( @@ -3548,7 +3561,8 @@ mod tests { .event_by_offset_with_auth(offset, &owner_auth) .expect("hidden owner offset") .expect("hidden visible"); - assert_eq!(visible.id(), hidden_event.id()); + let visible: &PocketEvent = &visible; + assert_eq!(visible.id().as_hex_string(), hidden_event.id().as_str()); } #[test] diff --git a/crates/tangle_runtime/src/relay/mod.rs b/crates/tangle_runtime/src/relay/mod.rs @@ -3,3 +3,4 @@ pub mod auth; pub mod core; pub mod live; +pub mod outbound; diff --git a/crates/tangle_runtime/src/relay/outbound.rs b/crates/tangle_runtime/src/relay/outbound.rs @@ -0,0 +1,125 @@ +#![forbid(unsafe_code)] + +use crate::{errors::BaseRelayError, pocket_conversion::pocket_event_to_tangle}; +use std::str; +use tangle_protocol::{RelayMessage, SubscriptionId}; +use tangle_store_pocket::PocketOwnedEvent; + +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum RuntimeRelayMessage { + Event { + subscription_id: SubscriptionId, + event: PocketOwnedEvent, + }, + Protocol(RelayMessage), +} + +impl RuntimeRelayMessage { + pub(crate) fn event(subscription_id: SubscriptionId, event: PocketOwnedEvent) -> Self { + Self::Event { + subscription_id, + event, + } + } + + pub(crate) fn encode(&self) -> Result<String, BaseRelayError> { + match self { + Self::Event { + subscription_id, + event, + } => encode_pocket_event_message(subscription_id, event), + Self::Protocol(message) => Ok(message.encode()), + } + } + + pub(crate) fn into_protocol_message(self) -> Result<RelayMessage, BaseRelayError> { + match self { + Self::Event { + subscription_id, + event, + } => Ok(RelayMessage::Event { + subscription_id, + event: pocket_event_to_tangle(&event)?, + }), + Self::Protocol(message) => Ok(message), + } + } +} + +impl From<RelayMessage> for RuntimeRelayMessage { + fn from(message: RelayMessage) -> Self { + Self::Protocol(message) + } +} + +pub(crate) fn protocol_messages( + messages: Vec<RuntimeRelayMessage>, +) -> Result<Vec<RelayMessage>, BaseRelayError> { + messages + .into_iter() + .map(RuntimeRelayMessage::into_protocol_message) + .collect() +} + +fn encode_pocket_event_message( + subscription_id: &SubscriptionId, + event: &PocketOwnedEvent, +) -> Result<String, BaseRelayError> { + let subscription = serde_json::to_string(subscription_id.as_str()).map_err(|error| { + BaseRelayError::error(format!("outbound subscription encode failed: {error}")) + })?; + let event_json = event.as_json().map_err(|error| { + BaseRelayError::error(format!("outbound Pocket event encode failed: {error}")) + })?; + let event_json = str::from_utf8(&event_json).map_err(|error| { + BaseRelayError::error(format!("outbound Pocket event JSON is not UTF-8: {error}")) + })?; + Ok(format!(r#"["EVENT",{subscription},{event_json}]"#)) +} + +#[cfg(test)] +mod tests { + use super::RuntimeRelayMessage; + use crate::pocket_conversion::tangle_event_to_pocket; + use serde_json::json; + use tangle_protocol::{RelayMessage, SubscriptionId, event_to_value, relay_message_to_value}; + use tangle_test_support::{FixtureKey, tangle_v2_event}; + + #[test] + fn outbound_pocket_event_encoding_preserves_event_fields() { + let event = tangle_v2_event( + FixtureKey::Member, + 1_714_124_433, + 1, + vec![tangle_protocol::Tag::from_parts("t", &["market"]).expect("tag")], + "fresh carrots", + ) + .expect("event"); + let pocket = tangle_event_to_pocket(&event).expect("pocket"); + let subscription_id = SubscriptionId::new("outbound-event").expect("subscription"); + let encoded = RuntimeRelayMessage::event(subscription_id.clone(), pocket) + .encode() + .expect("encoded"); + + assert_eq!( + serde_json::from_str::<serde_json::Value>(&encoded).expect("json"), + json!(["EVENT", subscription_id.as_str(), event_to_value(&event)]) + ); + } + + #[test] + fn outbound_protocol_messages_still_use_protocol_encoder() { + let subscription_id = SubscriptionId::new("outbound-eose").expect("subscription"); + let message = RelayMessage::Eose(subscription_id); + + assert_eq!( + serde_json::from_str::<serde_json::Value>( + &RuntimeRelayMessage::from(message.clone()) + .encode() + .expect("encoded") + ) + .expect("json"), + relay_message_to_value(&message) + ); + } +} diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -21,6 +21,7 @@ use crate::{ BaseRelayQueryMetrics, BaseRelayQueryReport, BaseRelayShutdownReport, }, live::LiveSubscriptionSet, + outbound::{RuntimeRelayMessage, protocol_messages}, }, }; use serde::{Deserialize, Serialize}; @@ -40,9 +41,9 @@ use tangle_groups::{ validate_client_group_event_structure, }; use tangle_protocol::{ - Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, UnixTimestamp, + EventId, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, UnixTimestamp, }; -use tangle_store_pocket::{PocketEvent, PocketOwnedFilter, PocketStoreHandle}; +use tangle_store_pocket::{PocketEvent, PocketOwnedEvent, PocketOwnedFilter, PocketStoreHandle}; use tokio::sync::watch; pub struct TangleRuntime { @@ -824,17 +825,19 @@ impl TangleRuntimeHandle { auth: &mut BaseAuthState, now: UnixTimestamp, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - self.handle_client_message_with_rate_limit_context( - RuntimeClientMessage::Count { - subscription_id, - filters, - search_present: false, - }, - auth, - TangleClientRateLimitContext::default(), - now, - ) - .await + let messages = self + .handle_client_message_with_rate_limit_context( + RuntimeClientMessage::Count { + subscription_id, + filters, + search_present: false, + }, + auth, + TangleClientRateLimitContext::default(), + now, + ) + .await?; + protocol_messages(messages) } #[cfg(test)] @@ -844,13 +847,15 @@ impl TangleRuntimeHandle { auth: &mut BaseAuthState, now: UnixTimestamp, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - self.handle_client_message_with_rate_limit_context( - message, - auth, - TangleClientRateLimitContext::default(), - now, - ) - .await + let messages = self + .handle_client_message_with_rate_limit_context( + message, + auth, + TangleClientRateLimitContext::default(), + now, + ) + .await?; + protocol_messages(messages) } #[cfg(test)] @@ -876,13 +881,15 @@ impl TangleRuntimeHandle { rate_limit_context: TangleClientRateLimitContext, now: UnixTimestamp, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - self.handle_client_message_with_rate_limit_context( - protocol_client_message_to_runtime_for_test(message)?, - auth, - rate_limit_context, - now, - ) - .await + let messages = self + .handle_client_message_with_rate_limit_context( + protocol_client_message_to_runtime_for_test(message)?, + auth, + rate_limit_context, + now, + ) + .await?; + protocol_messages(messages) } pub(crate) async fn handle_client_message_with_rate_limit_context( @@ -891,7 +898,7 @@ impl TangleRuntimeHandle { auth: &mut BaseAuthState, rate_limit_context: TangleClientRateLimitContext, now: UnixTimestamp, - ) -> Result<Vec<RelayMessage>, BaseRelayError> { + ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { self.inner .metrics .record_client_message(runtime_client_message_metric_kind(&message)); @@ -905,7 +912,7 @@ impl TangleRuntimeHandle { .rate_limit_event_pocket(&pocket_event, rate_limit_context, now)? { record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); - return Ok(vec![message]); + return Ok(vec![message.into()]); } if let Some(message) = self.inner.rate_limit_group_write_pocket( &pocket_event, @@ -913,7 +920,7 @@ impl TangleRuntimeHandle { now, )? { record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); - return Ok(vec![message]); + return Ok(vec![message.into()]); } let result = self .inner @@ -942,7 +949,7 @@ impl TangleRuntimeHandle { } let message = result.into_message(); record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); - Ok(vec![message]) + Ok(vec![message.into()]) } RuntimeClientMessage::Req { subscription_id, @@ -965,7 +972,7 @@ impl TangleRuntimeHandle { self.inner .metrics .record_query_latency(elapsed_micros(started_at)); - return Ok(vec![message]); + return Ok(vec![message.into()]); } if let Some(message) = self.inner.rate_limit_req( &subscription_id, @@ -977,7 +984,7 @@ impl TangleRuntimeHandle { self.inner .metrics .record_query_latency(elapsed_micros(started_at)); - return Ok(vec![message]); + return Ok(vec![message.into()]); } let report = self.inner @@ -1014,13 +1021,13 @@ impl TangleRuntimeHandle { self.inner .metrics .record_query_latency(elapsed_micros(started_at)); - return Ok(vec![message]); + return Ok(vec![message.into()]); } if let Some(message) = self.inner.refuse_broad_count(&subscription_id, &filters) { self.inner .metrics .record_query_latency(elapsed_micros(started_at)); - return Ok(vec![message]); + return Ok(vec![message.into()]); } if let Some(message) = self.inner.rate_limit_count( &subscription_id, @@ -1032,7 +1039,7 @@ impl TangleRuntimeHandle { self.inner .metrics .record_query_latency(elapsed_micros(started_at)); - return Ok(vec![message]); + return Ok(vec![message.into()]); } let report = self.inner @@ -1046,7 +1053,7 @@ impl TangleRuntimeHandle { self.inner .metrics .record_query_latency(elapsed_micros(started_at)); - Ok(vec![report.into_message()]) + Ok(vec![report.into_message().into()]) } RuntimeClientMessage::Auth(pocket_event) => { let event_id = pocket_event_id(&pocket_event)?; @@ -1057,11 +1064,11 @@ impl TangleRuntimeHandle { .validate_pocket_event(&pocket_event) { self.inner.metrics.record_auth_failure(); - return Ok(vec![RelayMessage::Ok { + return Ok(vec![RuntimeRelayMessage::from(RelayMessage::Ok { event_id, accepted: false, message: error.prefixed_message(), - }]); + })]); } if let Some(message) = self.inner.rate_limit_auth_attempt_pocket( &pocket_event, @@ -1069,7 +1076,7 @@ impl TangleRuntimeHandle { now, )? { self.inner.metrics.record_auth_failure(); - return Ok(vec![message]); + return Ok(vec![message.into()]); } let event_for_failure = pocket_event.clone(); let replies = BaseRelay::handle_pocket_auth_with_limits( @@ -1085,12 +1092,12 @@ impl TangleRuntimeHandle { rate_limit_context, now, )? { - return Ok(vec![message]); + return Ok(vec![message.into()]); } } else { self.inner.metrics.record_auth_success(); } - Ok(replies) + Ok(replies.into_iter().map(Into::into).collect()) } RuntimeClientMessage::Close(subscription_id) => { self.inner @@ -1109,9 +1116,9 @@ impl TangleRuntimeHandle { .limits .base_relay_limits() .validate_subscription_id(&subscription_id)?; - Ok(vec![BaseRelay::disabled_negentropy_message( - subscription_id, - )]) + Ok(vec![ + BaseRelay::disabled_negentropy_message(subscription_id).into(), + ]) } RuntimeClientMessage::NegClose(subscription_id) => { self.inner @@ -1170,20 +1177,19 @@ impl TangleRuntimeHandle { &self, offset: StoreOffset, auth: &BaseAuthState, - ) -> Result<Option<Event>, BaseRelayError> { + ) -> Result<Option<PocketOwnedEvent>, BaseRelayError> { let pocket_event = self.inner.store.event_by_offset(offset.as_u64())?; - let event = pocket_event_to_tangle(&pocket_event)?; let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()); let visible = BaseRelay::group_read_gate_visible_to_auth( self.inner.groups.as_ref(), - &event, + &pocket_event, &group_auth, )?; if !visible { self.inner.metrics.record_group_read_denial(); return Ok(None); } - Ok(Some(event)) + Ok(Some(pocket_event)) } pub(crate) async fn fanout_event_offset( @@ -1191,14 +1197,23 @@ impl TangleRuntimeHandle { offset: StoreOffset, subscriptions: &mut LiveSubscriptionSet, auth: &BaseAuthState, - ) -> Result<Vec<RelayMessage>, BaseRelayError> { + ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { let pocket_event = self.inner.store.event_by_offset(offset.as_u64())?; let event = pocket_event_to_tangle(&pocket_event)?; let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()); - Ok(subscriptions.fanout(&event, &group_auth, |event, auth| { + let messages = subscriptions.fanout(&event, &group_auth, |event, auth| { BaseRelay::group_read_gate_visible_to_auth(self.inner.groups.as_ref(), event, auth) .unwrap_or(false) - })) + }); + Ok(messages + .into_iter() + .map(|message| match message { + RelayMessage::Event { + subscription_id, .. + } => RuntimeRelayMessage::event(subscription_id, pocket_event.clone()), + message => message.into(), + }) + .collect()) } pub async fn shutdown(&self) -> Result<BaseRelayShutdownReport, BaseRelayError> { @@ -2084,6 +2099,7 @@ mod tests { use crate::relay::auth::BaseAuthState; use crate::relay::core::{BaseRelayLimitSettings, BaseRelayLimits, BaseRelayQueryMetrics}; use crate::relay::live::LiveSubscriptionSet; + use crate::relay::outbound::RuntimeRelayMessage; use serde_json::json; use std::{ collections::{BTreeMap, BTreeSet}, @@ -2097,8 +2113,8 @@ mod tests { MemberStatus, StoreOffset, rebuild_group_projection, }; use tangle_protocol::{ - ClientMessage, Event, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, Tag, - UnixTimestamp, filter_from_value, + ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, + Tag, UnixTimestamp, filter_from_value, }; use tangle_test_support::{ FixtureKey, tangle_v2_auth_event, tangle_v2_delete_group_event, tangle_v2_event, @@ -2331,10 +2347,10 @@ mod tests { .await .expect("fanout") .as_slice(), - [RelayMessage::Event { + [RuntimeRelayMessage::Event { subscription_id: delivered, event: found - }] if delivered == &subscription_id && found.id() == event.id() + }] if delivered == &subscription_id && found.id().as_hex_string() == event.id().as_str() )); assert_eq!( @@ -3666,11 +3682,11 @@ mod tests { .expect("fanout"); assert!(matches!( messages.as_slice(), - [RelayMessage::Event { + [RuntimeRelayMessage::Event { subscription_id: delivered, event }] if delivered == &subscription_id - && generated_kinds.insert(event.unsigned().kind().as_u32()) + && generated_kinds.insert(u32::from(event.kind().as_u16())) )); } assert_eq!( @@ -4313,7 +4329,9 @@ mod tests { .event_by_offset_with_auth(offset, &public_auth) .await .expect("public offset"); - let is_group_event = group_event_ids.contains(member_event.id()); + let member_event_id = + EventId::new(&member_event.id().as_hex_string()).expect("pocket id"); + let is_group_event = group_event_ids.contains(&member_event_id); if is_group_event { assert!(public_event.is_none()); } else { @@ -4359,12 +4377,14 @@ mod tests { .expect("member fanout"); for reply in member_replies { match reply { - RelayMessage::Event { + RuntimeRelayMessage::Event { subscription_id, event, } => { assert_eq!(subscription_id, member_subscription); - assert!(group_event_ids.contains(event.id())); + let event_id = + EventId::new(&event.id().as_hex_string()).expect("pocket id"); + assert!(group_event_ids.contains(&event_id)); member_fanout_count += 1; } other => panic!("unexpected fanout reply {other:?}"), diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -10,6 +10,7 @@ use crate::{ auth::{BaseAuthState, generate_auth_challenge}, core::BaseRelay, live::{CloseResult, LiveSubscriptionSet}, + outbound::RuntimeRelayMessage, }, runtime::{ TangleClientMessageMetricKind, TangleClientRateLimitContext, TangleRuntimeHandle, @@ -205,7 +206,7 @@ impl TangleWebSocketSession { .await { Ok(replies) => replies, - Err(error) => vec![RelayMessage::Notice(error.prefixed_message())], + Err(error) => vec![RelayMessage::Notice(error.prefixed_message()).into()], }; for reply in replies { if let Err(control) = self.enqueue_relay_message(reply) { @@ -219,9 +220,10 @@ impl TangleWebSocketSession { match message { Message::Text(raw) => self.dispatch_text(raw.as_str()).await, Message::Binary(_) => self - .enqueue_relay_message(RelayMessage::Notice( - "invalid: client message must be a text frame".to_owned(), - )) + .enqueue_relay_message( + RelayMessage::Notice("invalid: client message must be a text frame".to_owned()) + .into(), + ) .map(|_| TangleSessionControl::Continue) .unwrap_or_else(|control| control), Message::Ping(_) | Message::Pong(_) => TangleSessionControl::Continue, @@ -236,25 +238,28 @@ impl TangleWebSocketSession { .issue_challenge(challenge, current_unix_timestamp()) }) .unwrap_or_else(|error| RelayMessage::Notice(error.prefixed_message())); - self.send_relay_message(message).is_ok() + self.send_relay_message(message.into()).is_ok() } async fn dispatch_text(&mut self, raw: &str) -> TangleSessionControl { if raw.len() > self.limits.max_message_length() { return self - .enqueue_relay_message(RelayMessage::Notice(format!( - "invalid: client message length exceeds runtime max_message_length {}", - self.limits.max_message_length() - ))) + .enqueue_relay_message( + RelayMessage::Notice(format!( + "invalid: client message length exceeds runtime max_message_length {}", + self.limits.max_message_length() + )) + .into(), + ) .map(|_| TangleSessionControl::Continue) .unwrap_or_else(|control| control); } let replies = match parse_runtime_client_message(raw) { Ok(message) => match self.handle_client_message(message).await { Ok(replies) => replies, - Err(error) => vec![RelayMessage::Notice(error.prefixed_message())], + Err(error) => vec![RelayMessage::Notice(error.prefixed_message()).into()], }, - Err(error) => vec![RelayMessage::Notice(format!("invalid: {error}"))], + Err(error) => vec![RelayMessage::Notice(format!("invalid: {error}")).into()], }; for reply in replies { if let Err(control) = self.enqueue_relay_message(reply) { @@ -267,7 +272,7 @@ impl TangleWebSocketSession { async fn handle_client_message( &mut self, message: RuntimeClientMessage, - ) -> Result<Vec<RelayMessage>, BaseRelayError> { + ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { match message { RuntimeClientMessage::Req { subscription_id, @@ -328,7 +333,7 @@ impl TangleWebSocketSession { &mut self, subscription_id: SubscriptionId, filters: Vec<Filter>, - ) -> Result<Vec<RelayMessage>, BaseRelayError> { + ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { let metrics = self.runtime.metrics(); metrics.record_client_message(TangleClientMessageMetricKind::Req); self.limits @@ -336,7 +341,7 @@ impl TangleWebSocketSession { .validate_subscription_id(&subscription_id)?; self.limits.base_relay_limits().validate_filters(&filters)?; if let Some(message) = BaseRelay::unsupported_search_closed(&subscription_id, &filters) { - return Ok(vec![message]); + return Ok(vec![message.into()]); } if let Some(message) = self .runtime @@ -349,7 +354,7 @@ impl TangleWebSocketSession { ) .await { - return Ok(vec![message]); + return Ok(vec![message.into()]); } let should_subscribe = !filters_are_complete(&filters); if should_subscribe { @@ -375,14 +380,20 @@ impl TangleWebSocketSession { TangleClientRateLimitContext::new(self.peer_ip, Some(self.connection_id)) } - fn send_relay_message(&self, message: RelayMessage) -> Result<(), TangleOutboundQueueError> { + fn send_relay_message(&self, message: RuntimeRelayMessage) -> Result<(), TangleSessionControl> { + let text = message + .encode() + .map_err(|_| TangleSessionControl::Close(outbound_encode_close_message()))?; self.outbound - .try_send(Message::Text(message.encode().into())) + .try_send(Message::Text(text.into())) + .map_err(|error| self.outbound_queue_error_control(error)) } - fn enqueue_relay_message(&self, message: RelayMessage) -> Result<(), TangleSessionControl> { + fn enqueue_relay_message( + &self, + message: RuntimeRelayMessage, + ) -> Result<(), TangleSessionControl> { self.send_relay_message(message) - .map_err(|error| self.outbound_queue_error_control(error)) } fn outbound_queue_error_control( @@ -434,6 +445,13 @@ fn outbound_queue_full_close_message() -> Message { })) } +fn outbound_encode_close_message() -> Message { + Message::Close(Some(CloseFrame { + code: 1011, + reason: Utf8Bytes::from_static("outbound relay message encode failed"), + })) +} + #[derive(Debug, Clone)] pub struct TangleOutboundSender { sender: mpsc::Sender<Message>, @@ -484,10 +502,12 @@ impl TangleWebSocketSession { &mut self, message: tangle_protocol::ClientMessage, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - self.handle_client_message(protocol_client_message_to_runtime_for_session_test( - message, - )?) - .await + let messages = self + .handle_client_message(protocol_client_message_to_runtime_for_session_test( + message, + )?) + .await?; + crate::relay::outbound::protocol_messages(messages) } }