commit 5868a5a46979d1c7acd588a8c760b3dfde88067e
parent 15d74820da9bb4fbcfc60ce69c4937e9a2a43f2b
Author: triesap <tyson@radroots.org>
Date: Mon, 15 Jun 2026 18:45:22 -0700
runtime: dispatch sessions through runtime messages
- remove the session protocol message bridge
- route session and runtime dispatch through RuntimeClientMessage
- keep protocol fixtures behind test-only runtime adapters
- retarget source-shape guards to runtime message branches
Diffstat:
3 files changed, 311 insertions(+), 184 deletions(-)
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -1,13 +1,14 @@
#![forbid(unsafe_code)]
use crate::{
+ client_message::RuntimeClientMessage,
config::BaseRelayRuntimeConfig,
errors::BaseRelayError,
event_bus::{TangleEventBus, TangleEventReceiver},
groups::GroupServiceHandle,
logging,
ops::{BaseRelayReadinessHandle, BaseRelayReadinessState},
- pocket_conversion::pocket_event_to_tangle,
+ pocket_conversion::{pocket_event_to_tangle, pocket_filter_to_tangle},
rate_limits::{
TangleQueryRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey,
TangleRateLimitQueryClass, TangleRateLimitRule, TangleRateLimitScope, TangleRateLimiter,
@@ -38,8 +39,7 @@ use tangle_groups::{
validate_client_group_event_structure,
};
use tangle_protocol::{
- ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId,
- UnixTimestamp,
+ Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, UnixTimestamp,
};
use tangle_store_pocket::PocketStoreHandle;
use tokio::sync::watch;
@@ -815,9 +815,10 @@ impl TangleRuntimeHandle {
self.inner.config.auth_state()
}
- pub async fn handle_client_message(
+ #[cfg(test)]
+ pub(crate) async fn handle_client_message(
&self,
- message: ClientMessage,
+ message: RuntimeClientMessage,
auth: &mut BaseAuthState,
now: UnixTimestamp,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
@@ -830,18 +831,51 @@ impl TangleRuntimeHandle {
.await
}
- pub async fn handle_client_message_with_rate_limit_context(
+ #[cfg(test)]
+ pub(crate) async fn handle_protocol_client_message_for_test(
&self,
- message: ClientMessage,
+ message: tangle_protocol::ClientMessage,
+ auth: &mut BaseAuthState,
+ now: UnixTimestamp,
+ ) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.handle_client_message(
+ protocol_client_message_to_runtime_for_test(message)?,
+ auth,
+ now,
+ )
+ .await
+ }
+
+ #[cfg(test)]
+ pub(crate) async fn handle_protocol_client_message_with_rate_limit_context_for_test(
+ &self,
+ message: tangle_protocol::ClientMessage,
+ auth: &mut BaseAuthState,
+ rate_limit_context: TangleClientRateLimitContext,
+ now: UnixTimestamp,
+ ) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.handle_client_message_with_rate_limit_context(
+ protocol_client_message_to_runtime_for_test(message)?,
+ auth,
+ rate_limit_context,
+ now,
+ )
+ .await
+ }
+
+ pub(crate) async fn handle_client_message_with_rate_limit_context(
+ &self,
+ message: RuntimeClientMessage,
auth: &mut BaseAuthState,
rate_limit_context: TangleClientRateLimitContext,
now: UnixTimestamp,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
self.inner
.metrics
- .record_client_message(client_message_metric_kind(&message));
+ .record_client_message(runtime_client_message_metric_kind(&message));
match message {
- ClientMessage::Event(event) => {
+ RuntimeClientMessage::Event(pocket_event) => {
+ let event = pocket_event_to_tangle(&pocket_event)?;
let started_at = Instant::now();
let event_id = event.id().clone();
let is_group_event = self.inner.is_group_event(&event);
@@ -884,10 +918,12 @@ impl TangleRuntimeHandle {
record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at);
Ok(vec![message])
}
- ClientMessage::Req {
+ RuntimeClientMessage::Req {
subscription_id,
filters,
+ search_present,
} => {
+ let filters = runtime_filters_to_protocol(filters, search_present)?;
let started_at = Instant::now();
self.inner
.limits
@@ -931,10 +967,12 @@ impl TangleRuntimeHandle {
.record_query_latency(elapsed_micros(started_at));
Ok(report.into_messages())
}
- ClientMessage::Count {
+ RuntimeClientMessage::Count {
subscription_id,
filters,
+ search_present,
} => {
+ let filters = runtime_filters_to_protocol(filters, search_present)?;
let started_at = Instant::now();
self.inner
.limits
@@ -984,7 +1022,8 @@ impl TangleRuntimeHandle {
.record_query_latency(elapsed_micros(started_at));
Ok(vec![report.into_message()])
}
- ClientMessage::Auth(event) => {
+ RuntimeClientMessage::Auth(pocket_event) => {
+ let event = pocket_event_to_tangle(&pocket_event)?;
if let Err(error) = self.inner.limits.base_relay_limits().validate_event(&event) {
self.inner.metrics.record_auth_failure();
return Ok(vec![RelayMessage::Ok {
@@ -1021,17 +1060,17 @@ impl TangleRuntimeHandle {
}
Ok(replies)
}
- ClientMessage::Close(subscription_id) => {
+ RuntimeClientMessage::Close(subscription_id) => {
self.inner
.limits
.base_relay_limits()
.validate_subscription_id(&subscription_id)?;
Ok(Vec::new())
}
- ClientMessage::NegOpen {
+ RuntimeClientMessage::NegOpen {
subscription_id, ..
}
- | ClientMessage::NegMsg {
+ | RuntimeClientMessage::NegMsg {
subscription_id, ..
} => {
self.inner
@@ -1042,7 +1081,7 @@ impl TangleRuntimeHandle {
subscription_id,
)])
}
- ClientMessage::NegClose(subscription_id) => {
+ RuntimeClientMessage::NegClose(subscription_id) => {
self.inner
.limits
.base_relay_limits()
@@ -1191,16 +1230,90 @@ fn directory_size_bytes(path: &Path) -> u64 {
.sum()
}
-fn client_message_metric_kind(message: &ClientMessage) -> TangleClientMessageMetricKind {
+fn runtime_filters_to_protocol(
+ filters: Vec<tangle_store_pocket::PocketOwnedFilter>,
+ search_present: bool,
+) -> Result<Vec<Filter>, BaseRelayError> {
+ filters
+ .into_iter()
+ .enumerate()
+ .map(|(index, filter)| {
+ let search = (search_present && index == 0).then(String::new);
+ pocket_filter_to_tangle(&filter, search)
+ })
+ .collect()
+}
+
+#[cfg(test)]
+fn protocol_client_message_to_runtime_for_test(
+ message: tangle_protocol::ClientMessage,
+) -> Result<RuntimeClientMessage, BaseRelayError> {
match message {
- ClientMessage::Event(_) => TangleClientMessageMetricKind::Event,
- ClientMessage::Req { .. } => TangleClientMessageMetricKind::Req,
- ClientMessage::Count { .. } => TangleClientMessageMetricKind::Count,
- ClientMessage::Auth(_) => TangleClientMessageMetricKind::Auth,
- ClientMessage::Close(_) => TangleClientMessageMetricKind::Close,
- ClientMessage::NegOpen { .. }
- | ClientMessage::NegMsg { .. }
- | ClientMessage::NegClose(_) => TangleClientMessageMetricKind::Negentropy,
+ tangle_protocol::ClientMessage::Event(event) => Ok(RuntimeClientMessage::Event(
+ crate::pocket_conversion::tangle_event_to_pocket(&event)?,
+ )),
+ tangle_protocol::ClientMessage::Req {
+ subscription_id,
+ filters,
+ } => Ok(RuntimeClientMessage::Req {
+ subscription_id,
+ search_present: filters.iter().any(|filter| filter.search().is_some()),
+ filters: filters
+ .iter()
+ .map(crate::pocket_conversion::tangle_filter_to_pocket)
+ .collect::<Result<Vec<_>, _>>()?,
+ }),
+ tangle_protocol::ClientMessage::Count {
+ subscription_id,
+ filters,
+ } => Ok(RuntimeClientMessage::Count {
+ subscription_id,
+ search_present: filters.iter().any(|filter| filter.search().is_some()),
+ filters: filters
+ .iter()
+ .map(crate::pocket_conversion::tangle_filter_to_pocket)
+ .collect::<Result<Vec<_>, _>>()?,
+ }),
+ tangle_protocol::ClientMessage::Close(subscription_id) => {
+ Ok(RuntimeClientMessage::Close(subscription_id))
+ }
+ tangle_protocol::ClientMessage::Auth(event) => Ok(RuntimeClientMessage::Auth(
+ crate::pocket_conversion::tangle_event_to_pocket(&event)?,
+ )),
+ tangle_protocol::ClientMessage::NegOpen {
+ subscription_id,
+ filter,
+ message,
+ } => Ok(RuntimeClientMessage::NegOpen {
+ subscription_id,
+ filter: crate::pocket_conversion::tangle_filter_to_pocket(&filter)?,
+ message,
+ }),
+ tangle_protocol::ClientMessage::NegMsg {
+ subscription_id,
+ message,
+ } => Ok(RuntimeClientMessage::NegMsg {
+ subscription_id,
+ message,
+ }),
+ tangle_protocol::ClientMessage::NegClose(subscription_id) => {
+ Ok(RuntimeClientMessage::NegClose(subscription_id))
+ }
+ }
+}
+
+fn runtime_client_message_metric_kind(
+ message: &RuntimeClientMessage,
+) -> TangleClientMessageMetricKind {
+ match message {
+ RuntimeClientMessage::Event(_) => TangleClientMessageMetricKind::Event,
+ RuntimeClientMessage::Req { .. } => TangleClientMessageMetricKind::Req,
+ RuntimeClientMessage::Count { .. } => TangleClientMessageMetricKind::Count,
+ RuntimeClientMessage::Auth(_) => TangleClientMessageMetricKind::Auth,
+ RuntimeClientMessage::Close(_) => TangleClientMessageMetricKind::Close,
+ RuntimeClientMessage::NegOpen { .. }
+ | RuntimeClientMessage::NegMsg { .. }
+ | RuntimeClientMessage::NegClose(_) => TangleClientMessageMetricKind::Negentropy,
}
}
@@ -2166,7 +2279,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
UnixTimestamp::new(1_714_124_433)
@@ -2194,7 +2307,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
UnixTimestamp::new(1_714_124_434)
@@ -2245,7 +2358,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
UnixTimestamp::new(1_714_124_433)
@@ -2282,7 +2395,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
UnixTimestamp::new(1_714_124_433)
@@ -2327,7 +2440,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message_with_rate_limit_context(
+ .handle_protocol_client_message_with_rate_limit_context_for_test(
ClientMessage::Event(limited_event.clone()),
&mut auth,
TangleClientRateLimitContext::new(Some(saturated_peer_ip), None),
@@ -2343,7 +2456,7 @@ mod tests {
);
assert_eq!(
handle
- .handle_client_message_with_rate_limit_context(
+ .handle_protocol_client_message_with_rate_limit_context_for_test(
ClientMessage::Event(rotated_event.clone()),
&mut auth,
TangleClientRateLimitContext::new(Some(saturated_peer_ip), None),
@@ -2359,7 +2472,7 @@ mod tests {
);
assert_eq!(
handle
- .handle_client_message_with_rate_limit_context(
+ .handle_protocol_client_message_with_rate_limit_context_for_test(
ClientMessage::Event(allowed_event.clone()),
&mut auth,
TangleClientRateLimitContext::new(Some(other_peer_ip), None),
@@ -2404,7 +2517,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Auth(auth_event.clone()),
&mut auth,
UnixTimestamp::new(120)
@@ -2444,7 +2557,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message_with_rate_limit_context(
+ .handle_protocol_client_message_with_rate_limit_context_for_test(
ClientMessage::Auth(auth_event.clone()),
&mut auth,
TangleClientRateLimitContext::new(Some(peer_ip), None),
@@ -2485,7 +2598,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Auth(auth_event.clone()),
&mut auth,
UnixTimestamp::new(1_714_124_433)
@@ -2523,7 +2636,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message_with_rate_limit_context(
+ .handle_protocol_client_message_with_rate_limit_context_for_test(
ClientMessage::Auth(auth_event.clone()),
&mut auth,
TangleClientRateLimitContext::new(Some(peer_ip), None),
@@ -2579,7 +2692,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Auth(pubkey_event.clone()),
&mut auth,
UnixTimestamp::new(1_714_124_433)
@@ -2595,7 +2708,7 @@ mod tests {
);
assert_eq!(
handle
- .handle_client_message_with_rate_limit_context(
+ .handle_protocol_client_message_with_rate_limit_context_for_test(
ClientMessage::Auth(peer_event.clone()),
&mut auth,
TangleClientRateLimitContext::new(Some(peer_ip), None),
@@ -2649,7 +2762,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
UnixTimestamp::new(1_714_124_433)
@@ -2693,7 +2806,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message_with_rate_limit_context(
+ .handle_protocol_client_message_with_rate_limit_context_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
TangleClientRateLimitContext::new(Some(peer_ip), None),
@@ -2740,7 +2853,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
UnixTimestamp::new(1_714_124_433)
@@ -2784,7 +2897,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
UnixTimestamp::new(1_714_124_433)
@@ -2827,7 +2940,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
UnixTimestamp::new(1_714_124_433)
@@ -2871,7 +2984,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message_with_rate_limit_context(
+ .handle_protocol_client_message_with_rate_limit_context_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
TangleClientRateLimitContext::new(Some(peer_ip), None),
@@ -2908,7 +3021,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Auth(auth_event.clone()),
&mut auth,
UnixTimestamp::new(120)
@@ -2932,7 +3045,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Req {
subscription_id: subscription_id.clone(),
filters
@@ -2970,7 +3083,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message_with_rate_limit_context(
+ .handle_protocol_client_message_with_rate_limit_context_for_test(
ClientMessage::Req {
subscription_id: subscription_id.clone(),
filters
@@ -3012,7 +3125,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Req {
subscription_id: subscription_id.clone(),
filters
@@ -3136,7 +3249,7 @@ mod tests {
let subscription_id = SubscriptionId::new("count-hll-runtime").expect("subscription");
let replies = handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Count {
subscription_id: subscription_id.clone(),
filters: vec![
@@ -3211,7 +3324,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message_with_rate_limit_context(
+ .handle_protocol_client_message_with_rate_limit_context_for_test(
ClientMessage::Count {
subscription_id: subscription_id.clone(),
filters
@@ -3246,7 +3359,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Req {
subscription_id: req_id.clone(),
filters: vec![search.clone()]
@@ -3263,7 +3376,7 @@ mod tests {
);
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Count {
subscription_id: count_id.clone(),
filters: vec![search]
@@ -3304,7 +3417,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Count {
subscription_id: subscription_id.clone(),
filters
@@ -3345,7 +3458,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Count {
subscription_id: subscription_id.clone(),
filters
@@ -3397,7 +3510,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Count {
subscription_id: subscription_id.clone(),
filters
@@ -3451,7 +3564,7 @@ mod tests {
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Auth(auth_event.clone()),
&mut auth,
UnixTimestamp::new(120)
@@ -3466,7 +3579,7 @@ mod tests {
);
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(create.clone()),
&mut auth,
UnixTimestamp::new(121)
@@ -3491,7 +3604,7 @@ mod tests {
.expect("put member");
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(put_member.clone()),
&mut auth,
UnixTimestamp::new(122)
@@ -3963,7 +4076,7 @@ mod tests {
.expect("owner auth event");
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Auth(owner_auth_event.clone()),
&mut owner_auth,
UnixTimestamp::new(base_time)
@@ -3985,7 +4098,7 @@ mod tests {
.expect("create");
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(create.clone()),
&mut owner_auth,
UnixTimestamp::new(base_time + 1)
@@ -4007,7 +4120,7 @@ mod tests {
.expect("put member");
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(put_member.clone()),
&mut owner_auth,
UnixTimestamp::new(base_time + 2)
@@ -4029,7 +4142,7 @@ mod tests {
.expect("member auth event");
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Auth(member_auth_event.clone()),
&mut member_auth,
UnixTimestamp::new(base_time + 3)
@@ -4061,7 +4174,7 @@ mod tests {
.expect("group event");
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
UnixTimestamp::new(
@@ -4093,7 +4206,7 @@ mod tests {
.expect("public event");
assert_eq!(
handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
UnixTimestamp::new(
@@ -4236,7 +4349,7 @@ mod tests {
let subscription_id =
SubscriptionId::new(&format!("member-req-{index}")).expect("subscription");
let replies = member_req_handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Req {
subscription_id: subscription_id.clone(),
filters: vec![
@@ -4277,7 +4390,7 @@ mod tests {
let subscription_id =
SubscriptionId::new(&format!("public-req-{index}")).expect("subscription");
let replies = public_req_handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Req {
subscription_id: subscription_id.clone(),
filters: vec![
@@ -4309,7 +4422,7 @@ mod tests {
let subscription_id =
SubscriptionId::new(&format!("member-count-{index}")).expect("subscription");
let replies = member_count_handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Count {
subscription_id: subscription_id.clone(),
filters: vec![
@@ -4340,7 +4453,7 @@ mod tests {
let subscription_id =
SubscriptionId::new(&format!("public-count-{index}")).expect("subscription");
let replies = public_count_handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Count {
subscription_id: subscription_id.clone(),
filters: vec![
@@ -4495,7 +4608,7 @@ mod tests {
.expect("challenge");
let event = tangle_v2_auth_event(key, challenge, now).expect("auth event");
let replies = handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Auth(event.clone()),
&mut auth,
UnixTimestamp::new(now),
@@ -4521,7 +4634,11 @@ mod tests {
now: u64,
) -> RelayMessage {
let replies = handle
- .handle_client_message(ClientMessage::Event(event), auth, UnixTimestamp::new(now))
+ .handle_protocol_client_message_for_test(
+ ClientMessage::Event(event),
+ auth,
+ UnixTimestamp::new(now),
+ )
.await
.expect("event message");
@@ -4539,7 +4656,7 @@ mod tests {
now: u64,
) -> u64 {
let replies = handle
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Count {
subscription_id: SubscriptionId::new(subscription_id).expect("subscription"),
filters: vec![runtime_group_filter(group_id, kind, tag_name)],
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -5,7 +5,7 @@ use crate::{
errors::BaseRelayError,
event_bus::{TangleEventReceiveError, TangleEventReceiver},
logging,
- pocket_conversion::{pocket_event_to_tangle, pocket_filter_to_tangle},
+ pocket_conversion::pocket_filter_to_tangle,
relay::{
auth::{BaseAuthState, generate_auth_challenge},
core::BaseRelay,
@@ -22,7 +22,8 @@ use std::{
sync::atomic::{AtomicU64, Ordering},
time::{Instant, SystemTime, UNIX_EPOCH},
};
-use tangle_protocol::{ClientMessage, Filter, RelayMessage, SubscriptionId, UnixTimestamp};
+use tangle_protocol::{Filter, RelayMessage, SubscriptionId, UnixTimestamp};
+use tangle_store_pocket::PocketOwnedFilter;
use tokio::sync::{mpsc, watch};
#[derive(Debug)]
@@ -265,23 +266,32 @@ impl TangleWebSocketSession {
async fn handle_client_message(
&mut self,
- message: impl Into<SessionClientMessage>,
+ message: RuntimeClientMessage,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
- match message.into().into_protocol_message()? {
- ClientMessage::Req {
+ match message {
+ RuntimeClientMessage::Req {
subscription_id,
filters,
- } => self.handle_req(subscription_id, filters).await,
- ClientMessage::Count {
+ search_present,
+ } => {
+ self.handle_req(
+ subscription_id,
+ runtime_filters_to_protocol(filters, search_present)?,
+ )
+ .await
+ }
+ RuntimeClientMessage::Count {
subscription_id,
filters,
+ search_present,
} => {
let context = self.client_rate_limit_context();
self.runtime
.handle_client_message_with_rate_limit_context(
- ClientMessage::Count {
+ RuntimeClientMessage::Count {
subscription_id,
filters,
+ search_present,
},
&mut self.auth,
context,
@@ -289,7 +299,7 @@ impl TangleWebSocketSession {
)
.await
}
- ClientMessage::Close(subscription_id) => {
+ RuntimeClientMessage::Close(subscription_id) => {
let metrics = self.runtime.metrics();
metrics.record_client_message(TangleClientMessageMetricKind::Close);
self.limits
@@ -389,83 +399,8 @@ impl TangleWebSocketSession {
}
}
-enum SessionClientMessage {
- Runtime(RuntimeClientMessage),
- Protocol(ClientMessage),
-}
-
-impl From<RuntimeClientMessage> for SessionClientMessage {
- fn from(message: RuntimeClientMessage) -> Self {
- Self::Runtime(message)
- }
-}
-
-impl From<ClientMessage> for SessionClientMessage {
- fn from(message: ClientMessage) -> Self {
- Self::Protocol(message)
- }
-}
-
-impl SessionClientMessage {
- fn into_protocol_message(self) -> Result<ClientMessage, BaseRelayError> {
- match self {
- Self::Protocol(message) => Ok(message),
- Self::Runtime(message) => runtime_message_to_protocol(message),
- }
- }
-}
-
-fn runtime_message_to_protocol(
- message: RuntimeClientMessage,
-) -> Result<ClientMessage, BaseRelayError> {
- match message {
- RuntimeClientMessage::Event(event) => {
- pocket_event_to_tangle(&event).map(ClientMessage::Event)
- }
- RuntimeClientMessage::Auth(event) => {
- pocket_event_to_tangle(&event).map(ClientMessage::Auth)
- }
- RuntimeClientMessage::Req {
- subscription_id,
- filters,
- search_present,
- } => Ok(ClientMessage::Req {
- subscription_id,
- filters: runtime_filters_to_protocol(filters, search_present)?,
- }),
- RuntimeClientMessage::Count {
- subscription_id,
- filters,
- search_present,
- } => Ok(ClientMessage::Count {
- subscription_id,
- filters: runtime_filters_to_protocol(filters, search_present)?,
- }),
- RuntimeClientMessage::Close(subscription_id) => Ok(ClientMessage::Close(subscription_id)),
- RuntimeClientMessage::NegOpen {
- subscription_id,
- filter,
- message,
- } => Ok(ClientMessage::NegOpen {
- subscription_id,
- filter: pocket_filter_to_tangle(&filter, None)?,
- message,
- }),
- RuntimeClientMessage::NegMsg {
- subscription_id,
- message,
- } => Ok(ClientMessage::NegMsg {
- subscription_id,
- message,
- }),
- RuntimeClientMessage::NegClose(subscription_id) => {
- Ok(ClientMessage::NegClose(subscription_id))
- }
- }
-}
-
fn runtime_filters_to_protocol(
- filters: Vec<tangle_store_pocket::PocketOwnedFilter>,
+ filters: Vec<PocketOwnedFilter>,
search_present: bool,
) -> Result<Vec<Filter>, BaseRelayError> {
filters
@@ -544,6 +479,77 @@ fn filters_are_complete(filters: &[Filter]) -> bool {
}
#[cfg(test)]
+impl TangleWebSocketSession {
+ async fn handle_protocol_client_message_for_test(
+ &mut self,
+ message: tangle_protocol::ClientMessage,
+ ) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.handle_client_message(protocol_client_message_to_runtime_for_session_test(
+ message,
+ )?)
+ .await
+ }
+}
+
+#[cfg(test)]
+fn protocol_client_message_to_runtime_for_session_test(
+ message: tangle_protocol::ClientMessage,
+) -> Result<RuntimeClientMessage, BaseRelayError> {
+ match message {
+ tangle_protocol::ClientMessage::Event(event) => Ok(RuntimeClientMessage::Event(
+ crate::pocket_conversion::tangle_event_to_pocket(&event)?,
+ )),
+ tangle_protocol::ClientMessage::Req {
+ subscription_id,
+ filters,
+ } => Ok(RuntimeClientMessage::Req {
+ subscription_id,
+ search_present: filters.iter().any(|filter| filter.search().is_some()),
+ filters: filters
+ .iter()
+ .map(crate::pocket_conversion::tangle_filter_to_pocket)
+ .collect::<Result<Vec<_>, _>>()?,
+ }),
+ tangle_protocol::ClientMessage::Count {
+ subscription_id,
+ filters,
+ } => Ok(RuntimeClientMessage::Count {
+ subscription_id,
+ search_present: filters.iter().any(|filter| filter.search().is_some()),
+ filters: filters
+ .iter()
+ .map(crate::pocket_conversion::tangle_filter_to_pocket)
+ .collect::<Result<Vec<_>, _>>()?,
+ }),
+ tangle_protocol::ClientMessage::Close(subscription_id) => {
+ Ok(RuntimeClientMessage::Close(subscription_id))
+ }
+ tangle_protocol::ClientMessage::Auth(event) => Ok(RuntimeClientMessage::Auth(
+ crate::pocket_conversion::tangle_event_to_pocket(&event)?,
+ )),
+ tangle_protocol::ClientMessage::NegOpen {
+ subscription_id,
+ filter,
+ message,
+ } => Ok(RuntimeClientMessage::NegOpen {
+ subscription_id,
+ filter: crate::pocket_conversion::tangle_filter_to_pocket(&filter)?,
+ message,
+ }),
+ tangle_protocol::ClientMessage::NegMsg {
+ subscription_id,
+ message,
+ } => Ok(RuntimeClientMessage::NegMsg {
+ subscription_id,
+ message,
+ }),
+ tangle_protocol::ClientMessage::NegClose(subscription_id) => {
+ Ok(RuntimeClientMessage::NegClose(subscription_id))
+ }
+ }
+}
+
+#[cfg(test)]
mod tests {
use super::{
TangleOutboundQueueError, TangleSessionControl, TangleWebSocketSession,
@@ -821,14 +827,14 @@ mod tests {
assert_eq!(
first
- .handle_client_message(req(subscription_id.clone()))
+ .handle_protocol_client_message_for_test(req(subscription_id.clone()))
.await
.expect("first req"),
vec![RelayMessage::Eose(subscription_id.clone())]
);
assert_eq!(
second
- .handle_client_message(req(subscription_id.clone()))
+ .handle_protocol_client_message_for_test(req(subscription_id.clone()))
.await
.expect("second req"),
vec![RelayMessage::Eose(subscription_id.clone())]
@@ -838,7 +844,9 @@ mod tests {
assert_eq!(
first
- .handle_client_message(ClientMessage::Close(subscription_id.clone()))
+ .handle_protocol_client_message_for_test(ClientMessage::Close(
+ subscription_id.clone()
+ ))
.await
.expect("close first"),
Vec::<RelayMessage>::new()
@@ -848,7 +856,7 @@ mod tests {
assert_eq!(
second
- .handle_client_message(req(subscription_id.clone()))
+ .handle_protocol_client_message_for_test(req(subscription_id.clone()))
.await
.expect("replace second"),
vec![RelayMessage::Eose(subscription_id.clone())]
@@ -858,7 +866,7 @@ mod tests {
assert_eq!(
second
- .handle_client_message(ClientMessage::Close(subscription_id))
+ .handle_protocol_client_message_for_test(ClientMessage::Close(subscription_id))
.await
.expect("close second"),
Vec::<RelayMessage>::new()
@@ -890,7 +898,7 @@ mod tests {
tangle_v2_auth_event(FixtureKey::Owner, "owner-live", 120).expect("owner auth event");
assert_eq!(
runtime
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Auth(owner_auth_event.clone()),
&mut owner_auth,
UnixTimestamp::new(120)
@@ -907,7 +915,7 @@ mod tests {
.expect("create");
assert_eq!(
runtime
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(create.clone()),
&mut owner_auth,
UnixTimestamp::new(121)
@@ -934,7 +942,7 @@ mod tests {
assert_eq!(
session
- .handle_client_message(ClientMessage::Req {
+ .handle_protocol_client_message_for_test(ClientMessage::Req {
subscription_id: subscription_id.clone(),
filters: vec![
filter_from_value(&json!({"kinds":[1], "#h":["LiveFarm"]}))
@@ -952,7 +960,7 @@ mod tests {
let before_auth_id = before_auth.id().clone();
assert_eq!(
runtime
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(before_auth),
&mut owner_auth,
UnixTimestamp::new(122)
@@ -982,7 +990,9 @@ mod tests {
.expect("auth event");
assert_eq!(
session
- .handle_client_message(ClientMessage::Auth(session_auth_event.clone()))
+ .handle_protocol_client_message_for_test(ClientMessage::Auth(
+ session_auth_event.clone()
+ ))
.await
.expect("session auth"),
vec![RelayMessage::Ok {
@@ -995,7 +1005,7 @@ mod tests {
.expect("after auth");
assert_eq!(
runtime
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(after_auth.clone()),
&mut owner_auth,
UnixTimestamp::new(132)
@@ -1047,7 +1057,7 @@ mod tests {
assert_eq!(
runtime
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(event.clone()),
&mut auth,
UnixTimestamp::new(1_714_124_433)
@@ -1063,7 +1073,7 @@ mod tests {
let exact_id = SubscriptionId::new("exact-id").expect("subscription");
assert_eq!(
session
- .handle_client_message(ClientMessage::Req {
+ .handle_protocol_client_message_for_test(ClientMessage::Req {
subscription_id: exact_id.clone(),
filters: vec![
filter_from_value(&json!({"ids":[event.id().as_str()]}))
@@ -1085,7 +1095,7 @@ mod tests {
let open = SubscriptionId::new("open").expect("subscription");
assert_eq!(
session
- .handle_client_message(ClientMessage::Req {
+ .handle_protocol_client_message_for_test(ClientMessage::Req {
subscription_id: open.clone(),
filters: vec![filter_from_value(&json!({"kinds":[1]})).expect("open filter")],
})
@@ -1104,7 +1114,7 @@ mod tests {
let search = SubscriptionId::new("search").expect("subscription");
assert_eq!(
session
- .handle_client_message(ClientMessage::Req {
+ .handle_protocol_client_message_for_test(ClientMessage::Req {
subscription_id: search.clone(),
filters: vec![
filter_from_value(&json!({"search":"carrots"})).expect("search filter")
@@ -1121,7 +1131,7 @@ mod tests {
let invalid = SubscriptionId::new("invalid").expect("subscription");
let invalid_result = session
- .handle_client_message(ClientMessage::Req {
+ .handle_protocol_client_message_for_test(ClientMessage::Req {
subscription_id: invalid,
filters: vec![Filter::empty(); 11],
})
@@ -1148,7 +1158,7 @@ mod tests {
.expect("owner auth event");
assert_eq!(
runtime
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Auth(owner_auth_event.clone()),
&mut owner_auth,
UnixTimestamp::new(120)
@@ -1166,7 +1176,7 @@ mod tests {
.expect("create");
assert_eq!(
runtime
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(create.clone()),
&mut owner_auth,
UnixTimestamp::new(121)
@@ -1184,7 +1194,7 @@ mod tests {
.expect("public");
assert_eq!(
runtime
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(public_event.clone()),
&mut owner_auth,
UnixTimestamp::new(122)
@@ -1202,7 +1212,7 @@ mod tests {
.expect("private");
assert_eq!(
runtime
- .handle_client_message(
+ .handle_protocol_client_message_for_test(
ClientMessage::Event(private_event.clone()),
&mut owner_auth,
UnixTimestamp::new(123)
@@ -1228,7 +1238,7 @@ mod tests {
let subscription_id = SubscriptionId::new("redacted-req").expect("subscription");
assert_eq!(
session
- .handle_client_message(ClientMessage::Req {
+ .handle_protocol_client_message_for_test(ClientMessage::Req {
subscription_id: subscription_id.clone(),
filters: vec![filter_from_value(&json!({"kinds":[1]})).expect("filter")],
})
@@ -1397,7 +1407,7 @@ mod tests {
assert_eq!(
session
- .handle_client_message(ClientMessage::Req {
+ .handle_protocol_client_message_for_test(ClientMessage::Req {
subscription_id: subscription_id.clone(),
filters: vec![
filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter")
diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs
@@ -1572,10 +1572,10 @@ fn req_count_and_live_fanout_share_one_group_read_gate() {
fn runtime_event_handling_does_not_lock_relay_state() {
let runtime = include_str!("../src/runtime.rs");
let event_branch = runtime
- .split("ClientMessage::Event(event) => {")
+ .split("RuntimeClientMessage::Event(pocket_event) => {")
.nth(1)
.expect("event branch")
- .split("ClientMessage::Req")
+ .split("RuntimeClientMessage::Req")
.next()
.expect("req branch");
@@ -1587,10 +1587,10 @@ fn runtime_event_handling_does_not_lock_relay_state() {
fn runtime_req_handling_does_not_lock_relay_state() {
let runtime = include_str!("../src/runtime.rs");
let req_branch = runtime
- .split("ClientMessage::Req {")
+ .split("RuntimeClientMessage::Req {")
.nth(1)
.expect("req branch")
- .split("ClientMessage::Count")
+ .split("RuntimeClientMessage::Count")
.next()
.expect("count branch");
let query_helper = runtime
@@ -1611,10 +1611,10 @@ fn runtime_req_handling_does_not_lock_relay_state() {
fn runtime_count_handling_does_not_lock_relay_state() {
let runtime = include_str!("../src/runtime.rs");
let count_branch = runtime
- .split("ClientMessage::Count {")
+ .split("RuntimeClientMessage::Count {")
.nth(1)
.expect("count branch")
- .split("ClientMessage::Auth")
+ .split("RuntimeClientMessage::Auth")
.next()
.expect("auth branch");