commit 3b55068d79195f859bc789729aae3e8307ec8bff
parent 34c47f2e83f4a4f9977d1cb685ee5f181c5e087b
Author: triesap <tyson@radroots.org>
Date: Mon, 15 Jun 2026 14:33:49 -0700
runtime: parse req count through Pocket
- route REQ COUNT and CLOSE through the runtime parser boundary
- convert Pocket filters back into typed Tangle filters
- preserve unsupported search and malformed filter rejection behavior
- keep protocol parser usage outside runtime WebSocket source
Diffstat:
3 files changed, 503 insertions(+), 31 deletions(-)
diff --git a/crates/tangle_protocol/src/lib.rs b/crates/tangle_protocol/src/lib.rs
@@ -659,6 +659,40 @@ impl Filter {
}
}
+ #[allow(clippy::too_many_arguments)]
+ pub fn from_parts(
+ ids: Vec<EventId>,
+ authors: Vec<PublicKeyHex>,
+ kinds: Vec<Kind>,
+ tag_filters: BTreeMap<TagName, Vec<TagValue>>,
+ since: Option<UnixTimestamp>,
+ until: Option<UnixTimestamp>,
+ limit: Option<u64>,
+ search: Option<String>,
+ ) -> Result<Self, String> {
+ for (name, values) in &tag_filters {
+ if !name.is_indexable() {
+ return Err(format!(
+ "filter field `#{}` is invalid: tag name must be a single ASCII letter",
+ name.as_str()
+ ));
+ }
+ if values.is_empty() {
+ return Err(filter_array_error(&format!("#{}", name.as_str())));
+ }
+ }
+ Ok(Self {
+ ids,
+ authors,
+ kinds,
+ tag_filters,
+ since,
+ until,
+ limit,
+ search,
+ })
+ }
+
pub fn ids(&self) -> &[EventId] {
&self.ids
}
@@ -1363,7 +1397,7 @@ mod tests {
parse_client_message, parse_event_json, relay_message_to_value, too_long_error,
};
use core::str::FromStr;
- use std::collections::hash_map::DefaultHasher;
+ use std::collections::{BTreeMap, hash_map::DefaultHasher};
use std::hash::{Hash, Hasher};
#[test]
@@ -2313,6 +2347,46 @@ mod tests {
}
#[test]
+ fn filter_from_parts_builds_validated_filter_components() {
+ let name = TagName::new("t").expect("tag name");
+ let value = TagValue::new("market");
+ let filter = Filter::from_parts(
+ vec![EventId::new(&"a".repeat(EventId::HEX_LENGTH)).expect("id")],
+ vec![PublicKeyHex::new(&"b".repeat(PublicKeyHex::HEX_LENGTH)).expect("author")],
+ vec![Kind::new(1).expect("kind")],
+ BTreeMap::from([(name.clone(), vec![value.clone()])]),
+ Some(UnixTimestamp::new(10)),
+ Some(UnixTimestamp::new(20)),
+ Some(30),
+ Some("carrots".to_owned()),
+ )
+ .expect("filter");
+
+ assert_eq!(filter.ids().len(), 1);
+ assert_eq!(filter.authors().len(), 1);
+ assert_eq!(filter.kinds(), &[Kind::new(1).expect("kind")]);
+ assert_eq!(filter.tag_filters().get(&name), Some(&vec![value]));
+ assert_eq!(filter.since(), Some(UnixTimestamp::new(10)));
+ assert_eq!(filter.until(), Some(UnixTimestamp::new(20)));
+ assert_eq!(filter.limit(), Some(30));
+ assert_eq!(filter.search(), Some("carrots"));
+ assert_eq!(
+ Filter::from_parts(
+ Vec::new(),
+ Vec::new(),
+ Vec::new(),
+ BTreeMap::from([(TagName::new("ab").expect("tag"), Vec::new())]),
+ None,
+ None,
+ None,
+ None
+ )
+ .expect_err("invalid tag"),
+ "filter field `#ab` is invalid: tag name must be a single ASCII letter"
+ );
+ }
+
+ #[test]
fn filter_complete_semantics_are_exact_id_only() {
assert!(
filter_from_value(&serde_json::json!({
diff --git a/crates/tangle_runtime/src/client_message.rs b/crates/tangle_runtime/src/client_message.rs
@@ -1,24 +1,43 @@
#![forbid(unsafe_code)]
-use crate::{errors::BaseRelayError, pocket_conversion::pocket_event_to_tangle};
+use crate::{
+ errors::BaseRelayError,
+ pocket_conversion::{pocket_event_to_tangle, pocket_filter_to_tangle},
+};
+use serde::de::{self, Deserialize, Deserializer, MapAccess, Visitor};
use serde_json::value::RawValue;
-use tangle_protocol::{ClientMessage, parse_client_message};
-use tangle_store_pocket::parse_pocket_event_json;
+use std::collections::BTreeSet;
+use std::fmt;
+use tangle_protocol::{ClientMessage, Filter, SubscriptionId, TagName};
+use tangle_store_pocket::{PocketOwnedFilter, parse_pocket_event_json, parse_pocket_filter_json};
pub(crate) fn parse_runtime_client_message(raw: &str) -> Result<ClientMessage, String> {
- let Ok(values) = serde_json::from_str::<Vec<Box<RawValue>>>(raw) else {
- return parse_client_message(raw);
- };
- let Some(command) = values
- .first()
- .and_then(|value| serde_json::from_str::<String>(value.get()).ok())
- else {
- return parse_client_message(raw);
- };
+ let values = serde_json::from_str::<Vec<Box<RawValue>>>(raw)
+ .map_err(|source| format!("client message JSON is invalid: {source}"))?;
+ let command = parse_string_value(
+ values.first().map(Box::as_ref),
+ "client message command is missing",
+ "client message command must be a string",
+ )?;
match command.as_str() {
"EVENT" => parse_event_or_auth(&values, ClientMessage::Event),
"AUTH" => parse_event_or_auth(&values, ClientMessage::Auth),
- _ => parse_client_message(raw),
+ "REQ" => parse_req_or_count(&values, "REQ", |subscription_id, filters| {
+ ClientMessage::Req {
+ subscription_id,
+ filters,
+ }
+ }),
+ "COUNT" => parse_req_or_count(&values, "COUNT", |subscription_id, filters| {
+ ClientMessage::Count {
+ subscription_id,
+ filters,
+ }
+ }),
+ "CLOSE" => parse_close(&values),
+ unsupported => Err(format!(
+ "client message command `{unsupported}` is unsupported"
+ )),
}
}
@@ -35,15 +54,227 @@ fn parse_event_or_auth(
Ok(build(event))
}
+fn parse_req_or_count(
+ values: &[Box<RawValue>],
+ command: &'static str,
+ build: impl FnOnce(SubscriptionId, Vec<Filter>) -> ClientMessage,
+) -> Result<ClientMessage, String> {
+ if values.len() < 3 {
+ return Err(format!(
+ "{command} client message must contain a subscription id and filters"
+ ));
+ }
+ let subscription_id = parse_subscription_id(&values[1], command)?;
+ let filters = values[2..]
+ .iter()
+ .map(|value| parse_filter(value))
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(build(subscription_id, filters))
+}
+
+fn parse_close(values: &[Box<RawValue>]) -> Result<ClientMessage, String> {
+ if values.len() != 2 {
+ return Err("CLOSE client message must contain exactly 2 elements".to_owned());
+ }
+ parse_subscription_id(&values[1], "CLOSE").map(ClientMessage::Close)
+}
+
+fn parse_subscription_id(
+ value: &RawValue,
+ command: &'static str,
+) -> Result<SubscriptionId, String> {
+ serde_json::from_str::<String>(value.get())
+ .map_err(|_| format!("{command} subscription id must be a string"))
+ .and_then(|subscription_id| SubscriptionId::new(&subscription_id))
+}
+
+fn parse_filter(value: &RawValue) -> Result<Filter, String> {
+ let shape = inspect_filter_shape(value.get())?;
+ let search = shape.search;
+ let pocket_filter_json = serde_json::Value::Object(shape.pocket_fields).to_string();
+ let filter = parse_pocket_filter_json(pocket_filter_json.as_bytes())
+ .map_err(|error| error.message().to_owned())?;
+ build_filter(filter, search)
+}
+
+fn build_filter(filter: PocketOwnedFilter, search: Option<String>) -> Result<Filter, String> {
+ pocket_filter_to_tangle(&filter, search).map_err(base_relay_error_message)
+}
+
+fn parse_string_value(
+ value: Option<&RawValue>,
+ missing: &'static str,
+ invalid_type: &'static str,
+) -> Result<String, String> {
+ let Some(value) = value else {
+ return Err(missing.to_owned());
+ };
+ serde_json::from_str::<String>(value.get()).map_err(|_| invalid_type.to_owned())
+}
+
fn base_relay_error_message(error: BaseRelayError) -> String {
error.message().to_owned()
}
+fn inspect_filter_shape(raw: &str) -> Result<RuntimeFilterShape, String> {
+ let mut deserializer = serde_json::Deserializer::from_str(raw);
+ let shape =
+ RuntimeFilterShape::deserialize(&mut deserializer).map_err(filter_deserialize_error)?;
+ deserializer
+ .end()
+ .map_err(|source| format!("filter JSON is invalid: {source}"))?;
+ Ok(shape)
+}
+
+fn filter_deserialize_error(source: serde_json::Error) -> String {
+ if source.classify() == serde_json::error::Category::Data {
+ strip_json_location(&source.to_string()).to_owned()
+ } else {
+ format!("filter JSON is invalid: {source}")
+ }
+}
+
+fn strip_json_location(message: &str) -> &str {
+ message
+ .split_once(" at line ")
+ .map_or(message, |(head, _)| head)
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+struct RuntimeFilterShape {
+ search: Option<String>,
+ pocket_fields: serde_json::Map<String, serde_json::Value>,
+}
+
+impl<'de> Deserialize<'de> for RuntimeFilterShape {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ deserializer.deserialize_map(RuntimeFilterShapeVisitor)
+ }
+}
+
+struct RuntimeFilterShapeVisitor;
+
+impl<'de> Visitor<'de> for RuntimeFilterShapeVisitor {
+ type Value = RuntimeFilterShape;
+
+ fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+ formatter.write_str("a filter JSON object")
+ }
+
+ fn visit_map<A>(self, mut object: A) -> Result<Self::Value, A::Error>
+ where
+ A: MapAccess<'de>,
+ {
+ let mut fields = BTreeSet::new();
+ let mut shape = RuntimeFilterShape::default();
+ while let Some(field) = object.next_key::<String>()? {
+ if !fields.insert(field.clone()) {
+ return Err(de::Error::custom(format!(
+ "duplicate object field `{field}`"
+ )));
+ }
+ let value = object.next_value::<serde_json::Value>()?;
+ match field.as_str() {
+ "ids" | "authors" | "kinds" => {
+ validate_non_empty_array(&field, &value)?;
+ shape.pocket_fields.insert(field, value);
+ }
+ "since" | "until" => {
+ validate_u64_field(&field, &value)?;
+ shape.pocket_fields.insert(field, value);
+ }
+ "limit" => {
+ validate_limit_field(&field, &value)?;
+ shape.pocket_fields.insert(field, value);
+ }
+ "search" => {
+ shape.search = Some(
+ value
+ .as_str()
+ .ok_or_else(|| {
+ de::Error::custom(format!(
+ "filter field `{field}` must be a string"
+ ))
+ })?
+ .to_owned(),
+ );
+ }
+ tag_field if tag_field.starts_with('#') => {
+ validate_tag_filter_field(tag_field, &value)?;
+ shape.pocket_fields.insert(field, value);
+ }
+ unsupported => {
+ return Err(de::Error::custom(format!(
+ "filter field `{unsupported}` is unsupported"
+ )));
+ }
+ }
+ }
+ Ok(shape)
+ }
+}
+
+fn validate_non_empty_array<E>(field: &str, value: &serde_json::Value) -> Result<(), E>
+where
+ E: de::Error,
+{
+ match value.as_array() {
+ Some(items) if !items.is_empty() => Ok(()),
+ _ => Err(de::Error::custom(format!(
+ "filter field `{field}` must be a non-empty array"
+ ))),
+ }
+}
+
+fn validate_u64_field<E>(field: &str, value: &serde_json::Value) -> Result<(), E>
+where
+ E: de::Error,
+{
+ value.as_u64().map(|_| ()).ok_or_else(|| {
+ de::Error::custom(format!(
+ "filter field `{field}` must be an unsigned integer"
+ ))
+ })
+}
+
+fn validate_limit_field<E>(field: &str, value: &serde_json::Value) -> Result<(), E>
+where
+ E: de::Error,
+{
+ let limit = value.as_u64().ok_or_else(|| {
+ de::Error::custom(format!(
+ "filter field `{field}` must be an unsigned integer"
+ ))
+ })?;
+ u32::try_from(limit)
+ .map(|_| ())
+ .map_err(|_| de::Error::custom(format!("filter field `{field}` exceeds Pocket range")))
+}
+
+fn validate_tag_filter_field<E>(field: &str, value: &serde_json::Value) -> Result<(), E>
+where
+ E: de::Error,
+{
+ let name = &field[1..];
+ let tag_name = TagName::new(name).map_err(|reason| {
+ de::Error::custom(format!("filter field `{field}` is invalid: {reason}"))
+ })?;
+ if !tag_name.is_indexable() {
+ return Err(de::Error::custom(format!(
+ "filter field `{field}` is invalid: tag name must be a single ASCII letter"
+ )));
+ }
+ validate_non_empty_array(field, value)
+}
+
#[cfg(test)]
mod tests {
use super::parse_runtime_client_message;
use serde_json::json;
- use tangle_protocol::{ClientMessage, event_to_value};
+ use tangle_protocol::{ClientMessage, event_to_value, filter_from_value};
use tangle_test_support::{FixtureKey, tangle_v2_auth_event, tangle_v2_event};
#[test]
@@ -83,18 +314,84 @@ mod tests {
}
#[test]
- fn runtime_parser_delegates_req_count_and_close_until_filter_slice() {
- assert!(matches!(
- parse_runtime_client_message("[\"REQ\",\"sub\",{\"kinds\":[1]}]").expect("req"),
- ClientMessage::Req { .. }
- ));
- assert!(matches!(
- parse_runtime_client_message("[\"COUNT\",\"sub\",{\"kinds\":[1]}]").expect("count"),
- ClientMessage::Count { .. }
- ));
- assert!(matches!(
+ fn runtime_parser_maps_req_count_and_close_without_protocol_parser_delegation() {
+ let expected = filter_from_value(&json!({
+ "ids": ["a".repeat(64)],
+ "authors": ["b".repeat(64)],
+ "kinds": [1],
+ "#t": ["market"],
+ "since": 10,
+ "until": 20,
+ "limit": 30
+ }))
+ .expect("filter");
+ assert_eq!(
+ parse_runtime_client_message(
+ &json!(["REQ", "sub", json!({"ids":["a".repeat(64)],"authors":["b".repeat(64)],"kinds":[1],"#t":["market"],"since":10,"until":20,"limit":30})]).to_string()
+ )
+ .expect("req"),
+ ClientMessage::Req {
+ subscription_id: "sub".parse().expect("subscription"),
+ filters: vec![expected.clone()]
+ }
+ );
+ assert_eq!(
+ parse_runtime_client_message(
+ &json!(["COUNT", "sub", json!({"ids":["a".repeat(64)],"authors":["b".repeat(64)],"kinds":[1],"#t":["market"],"since":10,"until":20,"limit":30})]).to_string()
+ )
+ .expect("count"),
+ ClientMessage::Count {
+ subscription_id: "sub".parse().expect("subscription"),
+ filters: vec![expected]
+ }
+ );
+ assert_eq!(
parse_runtime_client_message("[\"CLOSE\",\"sub\"]").expect("close"),
- ClientMessage::Close(_)
- ));
+ ClientMessage::Close("sub".parse().expect("subscription"))
+ );
+ }
+
+ #[test]
+ fn runtime_parser_preserves_search_rejection_marker_before_dispatch() {
+ let ClientMessage::Req { filters, .. } =
+ parse_runtime_client_message("[\"REQ\",\"sub\",{\"search\":\"carrots\",\"limit\":1}]")
+ .expect("search req")
+ else {
+ panic!("req expected")
+ };
+ assert_eq!(filters[0].search(), Some("carrots"));
+ }
+
+ #[test]
+ fn runtime_parser_rejects_malformed_req_and_count_filters() {
+ for (raw, expected) in [
+ (
+ "[\"REQ\",\"sub\",{\"ids\":[]}]",
+ "filter field `ids` must be a non-empty array",
+ ),
+ (
+ "[\"REQ\",\"sub\",{\"unknown\":true}]",
+ "filter field `unknown` is unsupported",
+ ),
+ (
+ "[\"REQ\",\"sub\",{\"#aa\":[\"value\"]}]",
+ "filter field `#aa` is invalid: tag name must be a single ASCII letter",
+ ),
+ (
+ "[\"COUNT\",\"sub\",{\"limit\":4294967296}]",
+ "filter field `limit` exceeds Pocket range",
+ ),
+ (
+ "[\"COUNT\",\"sub\",{\"authors\":[\"BAD\"]}]",
+ "Too short reading pubkey",
+ ),
+ (
+ "[\"REQ\",\"sub\",{\"limit\":1,\"limit\":2}]",
+ "duplicate object field `limit`",
+ ),
+ ] {
+ let actual = parse_runtime_client_message(raw).expect_err(raw);
+ assert!(actual.contains(expected), "{actual}");
+ }
}
}
diff --git a/crates/tangle_runtime/src/pocket_conversion.rs b/crates/tangle_runtime/src/pocket_conversion.rs
@@ -1,13 +1,15 @@
#![forbid(unsafe_code)]
use crate::errors::BaseRelayError;
+use std::collections::BTreeMap;
use std::str;
use tangle_protocol::{
- Event, EventId, Filter, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent,
+ Event, EventId, Filter, Kind, PublicKeyHex, SignatureHex, Tag, TagName, TagValue,
+ UnixTimestamp, UnsignedEvent,
};
use tangle_store_pocket::{
- PocketEvent, PocketEventId, PocketKind, PocketOwnedEvent, PocketOwnedFilter, PocketOwnedTags,
- PocketPubkey, PocketSig, PocketTags, PocketTime,
+ PocketEvent, PocketEventId, PocketFilter, PocketKind, PocketOwnedEvent, PocketOwnedFilter,
+ PocketOwnedTags, PocketPubkey, PocketSig, PocketTags, PocketTime,
};
pub(crate) fn tangle_event_to_pocket(event: &Event) -> Result<PocketOwnedEvent, BaseRelayError> {
@@ -113,6 +115,41 @@ pub(crate) fn pocket_event_to_tangle(event: &PocketEvent) -> Result<Event, BaseR
))
}
+pub(crate) fn pocket_filter_to_tangle(
+ filter: &PocketFilter,
+ search: Option<String>,
+) -> Result<Filter, BaseRelayError> {
+ let ids = filter
+ .ids()
+ .map(|id| EventId::new(&id.as_hex_string()).map_err(BaseRelayError::error))
+ .collect::<Result<Vec<_>, _>>()?;
+ let authors = filter
+ .authors()
+ .map(|author| PublicKeyHex::new(&author.as_hex_string()).map_err(BaseRelayError::error))
+ .collect::<Result<Vec<_>, _>>()?;
+ let kinds = filter
+ .kinds()
+ .map(|kind| Kind::new(u64::from(kind.as_u16())).map_err(BaseRelayError::error))
+ .collect::<Result<Vec<_>, _>>()?;
+ let tag_filters = pocket_filter_tags_to_tangle(filter)?;
+ let since =
+ (filter.since() != PocketTime::min()).then(|| UnixTimestamp::new(filter.since().as_u64()));
+ let until =
+ (filter.until() != PocketTime::max()).then(|| UnixTimestamp::new(filter.until().as_u64()));
+ let limit = (filter.limit() != u32::MAX).then(|| u64::from(filter.limit()));
+ Filter::from_parts(
+ ids,
+ authors,
+ kinds,
+ tag_filters,
+ since,
+ until,
+ limit,
+ search,
+ )
+ .map_err(BaseRelayError::error)
+}
+
pub(crate) fn pocket_event_id(event_id: &EventId) -> Result<PocketEventId, BaseRelayError> {
PocketEventId::read_hex(event_id.as_str().as_bytes())
.map_err(|error| BaseRelayError::error(error.to_string()))
@@ -148,6 +185,43 @@ fn tangle_tags_to_pocket(tags: &[Tag]) -> Result<PocketOwnedTags, BaseRelayError
PocketOwnedTags::new(&parts).map_err(|error| BaseRelayError::error(error.to_string()))
}
+fn pocket_filter_tags_to_tangle(
+ filter: &PocketFilter,
+) -> Result<BTreeMap<TagName, Vec<TagValue>>, BaseRelayError> {
+ let tags = filter
+ .tags()
+ .map_err(|error| BaseRelayError::error(error.to_string()))?;
+ let mut tag_filters = BTreeMap::new();
+ for mut tag in tags.iter() {
+ let name = tag
+ .next()
+ .ok_or_else(|| BaseRelayError::invalid("filter tag must include a name"))
+ .and_then(tag_name_from_bytes)?;
+ let values = tag
+ .map(tag_value_from_bytes)
+ .collect::<Result<Vec<_>, _>>()?;
+ if values.is_empty() {
+ return Err(BaseRelayError::invalid(format!(
+ "filter field `#{}` must be a non-empty array",
+ name.as_str()
+ )));
+ }
+ tag_filters.insert(name, values);
+ }
+ Ok(tag_filters)
+}
+
+fn tag_name_from_bytes(bytes: &[u8]) -> Result<TagName, BaseRelayError> {
+ let name = str::from_utf8(bytes).map_err(|error| BaseRelayError::error(error.to_string()))?;
+ TagName::new(name).map_err(BaseRelayError::error)
+}
+
+fn tag_value_from_bytes(bytes: &[u8]) -> Result<TagValue, BaseRelayError> {
+ str::from_utf8(bytes)
+ .map(TagValue::new)
+ .map_err(|error| BaseRelayError::error(error.to_string()))
+}
+
fn ensure_tag_size(size: usize) -> Result<(), BaseRelayError> {
if size > usize::from(u16::MAX) {
return Err(BaseRelayError::invalid(format!(
@@ -199,7 +273,8 @@ fn ensure_event_size(tags_len: usize, content_len: usize) -> Result<(), BaseRela
#[cfg(test)]
mod tests {
use super::{
- pocket_event_id, pocket_event_to_tangle, tangle_event_to_pocket, tangle_filter_to_pocket,
+ pocket_event_id, pocket_event_to_tangle, pocket_filter_to_tangle, tangle_event_to_pocket,
+ tangle_filter_to_pocket,
};
use tangle_protocol::{
Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent,
@@ -301,4 +376,30 @@ mod tests {
);
}
}
+
+ #[test]
+ fn pocket_filter_conversion_builds_tangle_filter_with_search_marker() {
+ let value = serde_json::json!({
+ "ids": ["a".repeat(64)],
+ "authors": ["b".repeat(64)],
+ "kinds": [1],
+ "#t": ["market"],
+ "since": 10,
+ "until": 20,
+ "limit": 30
+ });
+ let filter = filter_from_value(&value).expect("filter");
+ let pocket_filter = tangle_filter_to_pocket(&filter).expect("pocket");
+ let converted =
+ pocket_filter_to_tangle(&pocket_filter, Some("carrots".to_owned())).expect("tangle");
+
+ assert_eq!(converted.ids(), filter.ids());
+ assert_eq!(converted.authors(), filter.authors());
+ assert_eq!(converted.kinds(), filter.kinds());
+ assert_eq!(converted.tag_filters(), filter.tag_filters());
+ assert_eq!(converted.since(), filter.since());
+ assert_eq!(converted.until(), filter.until());
+ assert_eq!(converted.limit(), filter.limit());
+ assert_eq!(converted.search(), Some("carrots"));
+ }
}