tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 15d74820da9bb4fbcfc60ce69c4937e9a2a43f2b
parent ae6ea890ea3dba4b974be661bfccc101ee74ecb5
Author: triesap <tyson@radroots.org>
Date:   Mon, 15 Jun 2026 18:38:16 -0700

runtime: add pocket-native client message boundary

- add RuntimeClientMessage with Pocket-owned event and filter payloads
- parse EVENT and AUTH directly through Pocket event JSON
- parse REQ COUNT and NEG-OPEN filters through Pocket JSON
- keep session behavior stable for the parser boundary checkpoint

Diffstat:
Mcrates/tangle_runtime/src/client_message.rs | 243++++++++++++++++++++++++++++++++++++++++++++-----------------------------------
Mcrates/tangle_runtime/src/session.rs | 96++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
2 files changed, 229 insertions(+), 110 deletions(-)

diff --git a/crates/tangle_runtime/src/client_message.rs b/crates/tangle_runtime/src/client_message.rs @@ -1,17 +1,42 @@ #![forbid(unsafe_code)] -use crate::{ - errors::BaseRelayError, - pocket_conversion::{pocket_event_to_tangle, pocket_filter_to_tangle}, -}; use serde::de::{self, Deserialize, Deserializer, MapAccess, Visitor}; use serde_json::value::RawValue; use std::collections::BTreeSet; use std::fmt; -use tangle_protocol::{ClientMessage, Filter, SubscriptionId, TagName}; -use tangle_store_pocket::{PocketOwnedFilter, parse_pocket_event_json, parse_pocket_filter_json}; +use tangle_protocol::{SubscriptionId, TagName}; +use tangle_store_pocket::{ + PocketOwnedEvent, PocketOwnedFilter, parse_pocket_event_json, parse_pocket_filter_json, +}; -pub(crate) fn parse_runtime_client_message(raw: &str) -> Result<ClientMessage, String> { +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum RuntimeClientMessage { + Event(PocketOwnedEvent), + Auth(PocketOwnedEvent), + Req { + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + search_present: bool, + }, + Count { + subscription_id: SubscriptionId, + filters: Vec<PocketOwnedFilter>, + search_present: bool, + }, + Close(SubscriptionId), + NegOpen { + subscription_id: SubscriptionId, + filter: PocketOwnedFilter, + message: String, + }, + NegMsg { + subscription_id: SubscriptionId, + message: String, + }, + NegClose(SubscriptionId), +} + +pub(crate) fn parse_runtime_client_message(raw: &str) -> Result<RuntimeClientMessage, String> { let values = serde_json::from_str::<Vec<Box<RawValue>>>(raw) .map_err(|source| format!("client message JSON is invalid: {source}"))?; let command = parse_string_value( @@ -20,18 +45,26 @@ pub(crate) fn parse_runtime_client_message(raw: &str) -> Result<ClientMessage, S "client message command must be a string", )?; match command.as_str() { - "EVENT" => parse_event_or_auth(&values, ClientMessage::Event), - "AUTH" => parse_event_or_auth(&values, ClientMessage::Auth), + "EVENT" => parse_event_or_auth(&values, RuntimeClientMessage::Event), + "AUTH" => parse_event_or_auth(&values, RuntimeClientMessage::Auth), "REQ" => parse_req_or_count(&values, "REQ", |subscription_id, filters| { - ClientMessage::Req { + RuntimeClientMessage::Req { subscription_id, - filters, + search_present: filters.iter().any(|filter| filter.search_present), + filters: filters + .into_iter() + .map(|filter| filter.filter) + .collect::<Vec<_>>(), } }), "COUNT" => parse_req_or_count(&values, "COUNT", |subscription_id, filters| { - ClientMessage::Count { + RuntimeClientMessage::Count { subscription_id, - filters, + search_present: filters.iter().any(|filter| filter.search_present), + filters: filters + .into_iter() + .map(|filter| filter.filter) + .collect::<Vec<_>>(), } }), "CLOSE" => parse_close(&values), @@ -46,22 +79,21 @@ pub(crate) fn parse_runtime_client_message(raw: &str) -> Result<ClientMessage, S fn parse_event_or_auth( values: &[Box<RawValue>], - build: impl FnOnce(tangle_protocol::Event) -> ClientMessage, -) -> Result<ClientMessage, String> { + build: impl FnOnce(PocketOwnedEvent) -> RuntimeClientMessage, +) -> Result<RuntimeClientMessage, String> { if values.len() != 2 { return Err("EVENT and AUTH client messages must contain exactly one event".to_owned()); } let event = parse_pocket_event_json(values[1].get().as_bytes()) - .map_err(|error| error.message().to_owned()) - .and_then(|event| pocket_event_to_tangle(&event).map_err(base_relay_error_message))?; + .map_err(|error| error.message().to_owned())?; Ok(build(event)) } fn parse_req_or_count( values: &[Box<RawValue>], command: &'static str, - build: impl FnOnce(SubscriptionId, Vec<Filter>) -> ClientMessage, -) -> Result<ClientMessage, String> { + build: impl FnOnce(SubscriptionId, Vec<RuntimeParsedFilter>) -> RuntimeClientMessage, +) -> Result<RuntimeClientMessage, String> { if values.len() < 3 { return Err(format!( "{command} client message must contain a subscription id and filters" @@ -75,42 +107,42 @@ fn parse_req_or_count( Ok(build(subscription_id, filters)) } -fn parse_close(values: &[Box<RawValue>]) -> Result<ClientMessage, String> { +fn parse_close(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> { if values.len() != 2 { return Err("CLOSE client message must contain exactly 2 elements".to_owned()); } - parse_subscription_id(&values[1], "CLOSE").map(ClientMessage::Close) + parse_subscription_id(&values[1], "CLOSE").map(RuntimeClientMessage::Close) } -fn parse_neg_open(values: &[Box<RawValue>]) -> Result<ClientMessage, String> { +fn parse_neg_open(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> { if values.len() != 4 { return Err( "NEG-OPEN client message must contain a subscription id, filter, and message" .to_owned(), ); } - Ok(ClientMessage::NegOpen { + Ok(RuntimeClientMessage::NegOpen { subscription_id: parse_subscription_id(&values[1], "NEG-OPEN")?, - filter: parse_filter(&values[2])?, + filter: parse_filter(&values[2])?.filter, message: parse_negentropy_message(&values[3], "NEG-OPEN")?, }) } -fn parse_neg_msg(values: &[Box<RawValue>]) -> Result<ClientMessage, String> { +fn parse_neg_msg(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> { if values.len() != 3 { return Err("NEG-MSG client message must contain a subscription id and message".to_owned()); } - Ok(ClientMessage::NegMsg { + Ok(RuntimeClientMessage::NegMsg { subscription_id: parse_subscription_id(&values[1], "NEG-MSG")?, message: parse_negentropy_message(&values[2], "NEG-MSG")?, }) } -fn parse_neg_close(values: &[Box<RawValue>]) -> Result<ClientMessage, String> { +fn parse_neg_close(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> { if values.len() != 2 { return Err("NEG-CLOSE client message must contain exactly 2 elements".to_owned()); } - parse_subscription_id(&values[1], "NEG-CLOSE").map(ClientMessage::NegClose) + parse_subscription_id(&values[1], "NEG-CLOSE").map(RuntimeClientMessage::NegClose) } fn parse_subscription_id( @@ -138,17 +170,16 @@ fn parse_negentropy_message(value: &RawValue, command: &'static str) -> Result<S } } -fn parse_filter(value: &RawValue) -> Result<Filter, String> { +fn parse_filter(value: &RawValue) -> Result<RuntimeParsedFilter, String> { let shape = inspect_filter_shape(value.get())?; - let search = shape.search; + let search_present = shape.search; let pocket_filter_json = serde_json::Value::Object(shape.pocket_fields).to_string(); let filter = parse_pocket_filter_json(pocket_filter_json.as_bytes()) .map_err(|error| error.message().to_owned())?; - build_filter(filter, search) -} - -fn build_filter(filter: PocketOwnedFilter, search: Option<String>) -> Result<Filter, String> { - pocket_filter_to_tangle(&filter, search).map_err(base_relay_error_message) + Ok(RuntimeParsedFilter { + filter, + search_present, + }) } fn parse_string_value( @@ -162,10 +193,6 @@ fn parse_string_value( serde_json::from_str::<String>(value.get()).map_err(|_| invalid_type.to_owned()) } -fn base_relay_error_message(error: BaseRelayError) -> String { - error.message().to_owned() -} - fn inspect_filter_shape(raw: &str) -> Result<RuntimeFilterShape, String> { let mut deserializer = serde_json::Deserializer::from_str(raw); let shape = @@ -192,10 +219,16 @@ fn strip_json_location(message: &str) -> &str { #[derive(Debug, Clone, PartialEq, Eq, Default)] struct RuntimeFilterShape { - search: Option<String>, + search: bool, pocket_fields: serde_json::Map<String, serde_json::Value>, } +#[derive(Debug, Clone, PartialEq, Eq)] +struct RuntimeParsedFilter { + filter: PocketOwnedFilter, + search_present: bool, +} + impl<'de> Deserialize<'de> for RuntimeFilterShape { fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where @@ -241,16 +274,10 @@ impl<'de> Visitor<'de> for RuntimeFilterShapeVisitor { shape.pocket_fields.insert(field, value); } "search" => { - shape.search = Some( - value - .as_str() - .ok_or_else(|| { - de::Error::custom(format!( - "filter field `{field}` must be a string" - )) - })? - .to_owned(), - ); + value.as_str().ok_or_else(|| { + de::Error::custom(format!("filter field `{field}` must be a string")) + })?; + shape.search = true; } tag_field if tag_field.starts_with('#') => { validate_tag_filter_field(tag_field, &value)?; @@ -322,28 +349,32 @@ where #[cfg(test)] mod tests { - use super::parse_runtime_client_message; + use super::{RuntimeClientMessage, parse_runtime_client_message}; use serde_json::json; - use tangle_protocol::{ClientMessage, event_to_value, filter_from_value}; + use tangle_protocol::event_to_value; use tangle_test_support::{FixtureKey, tangle_v2_auth_event, tangle_v2_event}; #[test] fn runtime_parser_maps_event_and_auth_through_pocket_event_json() { let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "hello") .expect("event"); - assert_eq!( + let RuntimeClientMessage::Event(parsed_event) = parse_runtime_client_message(&json!(["EVENT", event_to_value(&event)]).to_string()) - .expect("event"), - ClientMessage::Event(event.clone()) - ); + .expect("event") + else { + panic!("event expected") + }; + assert_eq!(parsed_event.id().as_hex_string(), event.id().as_str()); let auth = tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 1_714_124_434).expect("auth"); - assert_eq!( + let RuntimeClientMessage::Auth(parsed_auth) = parse_runtime_client_message(&json!(["AUTH", event_to_value(&auth)]).to_string()) - .expect("auth"), - ClientMessage::Auth(auth) - ); + .expect("auth") + else { + panic!("auth expected") + }; + assert_eq!(parsed_auth.id().as_hex_string(), auth.id().as_str()); } #[test] @@ -365,78 +396,76 @@ mod tests { #[test] fn runtime_parser_maps_req_count_and_close_without_protocol_parser_delegation() { - let expected = filter_from_value(&json!({ - "ids": ["a".repeat(64)], - "authors": ["b".repeat(64)], - "kinds": [1], - "#t": ["market"], - "since": 10, - "until": 20, - "limit": 30 - })) - .expect("filter"); - assert_eq!( - parse_runtime_client_message( - &json!(["REQ", "sub", json!({"ids":["a".repeat(64)],"authors":["b".repeat(64)],"kinds":[1],"#t":["market"],"since":10,"until":20,"limit":30})]).to_string() - ) - .expect("req"), - ClientMessage::Req { - subscription_id: "sub".parse().expect("subscription"), - filters: vec![expected.clone()] - } - ); - assert_eq!( - parse_runtime_client_message( - &json!(["COUNT", "sub", json!({"ids":["a".repeat(64)],"authors":["b".repeat(64)],"kinds":[1],"#t":["market"],"since":10,"until":20,"limit":30})]).to_string() - ) - .expect("count"), - ClientMessage::Count { - subscription_id: "sub".parse().expect("subscription"), - filters: vec![expected] - } - ); + let filter = json!({"ids":["a".repeat(64)],"authors":["b".repeat(64)],"kinds":[1],"#t":["market"],"since":10,"until":20,"limit":30}); + let RuntimeClientMessage::Req { + subscription_id, + filters, + search_present, + } = parse_runtime_client_message(&json!(["REQ", "sub", filter.clone()]).to_string()) + .expect("req") + else { + panic!("req expected") + }; + assert_eq!(subscription_id.as_str(), "sub"); + assert_eq!(filters.len(), 1); + assert!(!search_present); + + let RuntimeClientMessage::Count { + subscription_id, + filters, + search_present, + } = parse_runtime_client_message(&json!(["COUNT", "sub", filter]).to_string()) + .expect("count") + else { + panic!("count expected") + }; + assert_eq!(subscription_id.as_str(), "sub"); + assert_eq!(filters.len(), 1); + assert!(!search_present); assert_eq!( parse_runtime_client_message("[\"CLOSE\",\"sub\"]").expect("close"), - ClientMessage::Close("sub".parse().expect("subscription")) + RuntimeClientMessage::Close("sub".parse().expect("subscription")) ); } #[test] fn runtime_parser_maps_negentropy_commands_without_protocol_parser_delegation() { - let filter = filter_from_value(&json!({"kinds": [1]})).expect("filter"); - assert_eq!( - parse_runtime_client_message( - &json!(["NEG-OPEN", "neg-sub", json!({"kinds": [1]}), "00ff"]).to_string() - ) - .expect("neg open"), - ClientMessage::NegOpen { - subscription_id: "neg-sub".parse().expect("subscription"), - filter, - message: "00ff".to_owned() - } - ); + let RuntimeClientMessage::NegOpen { + subscription_id, + filter, + message, + } = parse_runtime_client_message( + &json!(["NEG-OPEN", "neg-sub", json!({"kinds": [1]}), "00ff"]).to_string(), + ) + .expect("neg open") + else { + panic!("neg open expected") + }; + assert_eq!(subscription_id.as_str(), "neg-sub"); + assert_eq!(filter.kinds().count(), 1); + assert_eq!(message, "00ff"); assert_eq!( parse_runtime_client_message("[\"NEG-MSG\",\"neg-sub\",\"\"]").expect("neg msg"), - ClientMessage::NegMsg { + RuntimeClientMessage::NegMsg { subscription_id: "neg-sub".parse().expect("subscription"), message: String::new() } ); assert_eq!( parse_runtime_client_message("[\"NEG-CLOSE\",\"neg-sub\"]").expect("neg close"), - ClientMessage::NegClose("neg-sub".parse().expect("subscription")) + RuntimeClientMessage::NegClose("neg-sub".parse().expect("subscription")) ); } #[test] fn runtime_parser_preserves_search_rejection_marker_before_dispatch() { - let ClientMessage::Req { filters, .. } = + let RuntimeClientMessage::Req { search_present, .. } = parse_runtime_client_message("[\"REQ\",\"sub\",{\"search\":\"carrots\",\"limit\":1}]") .expect("search req") else { panic!("req expected") }; - assert_eq!(filters[0].search(), Some("carrots")); + assert!(search_present); } #[test] diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -1,10 +1,11 @@ #![forbid(unsafe_code)] use crate::{ - client_message::parse_runtime_client_message, + client_message::{RuntimeClientMessage, parse_runtime_client_message}, errors::BaseRelayError, event_bus::{TangleEventReceiveError, TangleEventReceiver}, logging, + pocket_conversion::{pocket_event_to_tangle, pocket_filter_to_tangle}, relay::{ auth::{BaseAuthState, generate_auth_challenge}, core::BaseRelay, @@ -264,9 +265,9 @@ impl TangleWebSocketSession { async fn handle_client_message( &mut self, - message: ClientMessage, + message: impl Into<SessionClientMessage>, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - match message { + match message.into().into_protocol_message()? { ClientMessage::Req { subscription_id, filters, @@ -388,6 +389,95 @@ impl TangleWebSocketSession { } } +enum SessionClientMessage { + Runtime(RuntimeClientMessage), + Protocol(ClientMessage), +} + +impl From<RuntimeClientMessage> for SessionClientMessage { + fn from(message: RuntimeClientMessage) -> Self { + Self::Runtime(message) + } +} + +impl From<ClientMessage> for SessionClientMessage { + fn from(message: ClientMessage) -> Self { + Self::Protocol(message) + } +} + +impl SessionClientMessage { + fn into_protocol_message(self) -> Result<ClientMessage, BaseRelayError> { + match self { + Self::Protocol(message) => Ok(message), + Self::Runtime(message) => runtime_message_to_protocol(message), + } + } +} + +fn runtime_message_to_protocol( + message: RuntimeClientMessage, +) -> Result<ClientMessage, BaseRelayError> { + match message { + RuntimeClientMessage::Event(event) => { + pocket_event_to_tangle(&event).map(ClientMessage::Event) + } + RuntimeClientMessage::Auth(event) => { + pocket_event_to_tangle(&event).map(ClientMessage::Auth) + } + RuntimeClientMessage::Req { + subscription_id, + filters, + search_present, + } => Ok(ClientMessage::Req { + subscription_id, + filters: runtime_filters_to_protocol(filters, search_present)?, + }), + RuntimeClientMessage::Count { + subscription_id, + filters, + search_present, + } => Ok(ClientMessage::Count { + subscription_id, + filters: runtime_filters_to_protocol(filters, search_present)?, + }), + RuntimeClientMessage::Close(subscription_id) => Ok(ClientMessage::Close(subscription_id)), + RuntimeClientMessage::NegOpen { + subscription_id, + filter, + message, + } => Ok(ClientMessage::NegOpen { + subscription_id, + filter: pocket_filter_to_tangle(&filter, None)?, + message, + }), + RuntimeClientMessage::NegMsg { + subscription_id, + message, + } => Ok(ClientMessage::NegMsg { + subscription_id, + message, + }), + RuntimeClientMessage::NegClose(subscription_id) => { + Ok(ClientMessage::NegClose(subscription_id)) + } + } +} + +fn runtime_filters_to_protocol( + filters: Vec<tangle_store_pocket::PocketOwnedFilter>, + search_present: bool, +) -> Result<Vec<Filter>, BaseRelayError> { + filters + .into_iter() + .enumerate() + .map(|(index, filter)| { + let search = (search_present && index == 0).then(String::new); + pocket_filter_to_tangle(&filter, search) + }) + .collect() +} + #[derive(Debug, Clone, PartialEq, Eq)] enum TangleSessionControl { Continue,