tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

client_message.rs (18987B)


      1 #![forbid(unsafe_code)]
      2 
      3 use serde::de::{self, Deserialize, Deserializer, MapAccess, Visitor};
      4 use serde_json::value::RawValue;
      5 use std::collections::BTreeSet;
      6 use std::fmt;
      7 use tangle_protocol::{SubscriptionId, TagName};
      8 use tangle_store_pocket::{
      9     PocketOwnedEvent, PocketOwnedFilter, parse_pocket_event_json, parse_pocket_filter_json,
     10 };
     11 
/// A client message after it has been parsed and validated by
/// [`parse_runtime_client_message`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum RuntimeClientMessage {
    /// `EVENT` command carrying exactly one parsed event.
    Event(PocketOwnedEvent),
    /// `AUTH` command carrying exactly one parsed event.
    Auth(PocketOwnedEvent),
    /// `REQ` command: a subscription id followed by at least one filter.
    Req {
        subscription_id: SubscriptionId,
        filters: Vec<PocketOwnedFilter>,
        /// True when at least one filter contained a `search` field. The
        /// `search` value itself is not forwarded into `filters`.
        search_present: bool,
    },
    /// `COUNT` command: same shape as `REQ`.
    Count {
        subscription_id: SubscriptionId,
        filters: Vec<PocketOwnedFilter>,
        /// True when at least one filter contained a `search` field. The
        /// `search` value itself is not forwarded into `filters`.
        search_present: bool,
    },
    /// `CLOSE` command naming a subscription id.
    Close(SubscriptionId),
    /// `NEG-OPEN` command: subscription id, a single filter, and a message.
    NegOpen {
        subscription_id: SubscriptionId,
        filter: PocketOwnedFilter,
        /// Lowercase, even-length hex string (may be empty).
        message: String,
    },
    /// `NEG-MSG` command: subscription id and a message.
    NegMsg {
        subscription_id: SubscriptionId,
        /// Lowercase, even-length hex string (may be empty).
        message: String,
    },
    /// `NEG-CLOSE` command naming a subscription id.
    NegClose(SubscriptionId),
}
     38 
     39 pub(crate) fn parse_runtime_client_message(raw: &str) -> Result<RuntimeClientMessage, String> {
     40     let values = serde_json::from_str::<Vec<Box<RawValue>>>(raw)
     41         .map_err(|source| format!("client message JSON is invalid: {source}"))?;
     42     let command = parse_string_value(
     43         values.first().map(Box::as_ref),
     44         "client message command is missing",
     45         "client message command must be a string",
     46     )?;
     47     match command.as_str() {
     48         "EVENT" => parse_event_or_auth(&values, RuntimeClientMessage::Event),
     49         "AUTH" => parse_event_or_auth(&values, RuntimeClientMessage::Auth),
     50         "REQ" => parse_req_or_count(&values, "REQ", |subscription_id, filters| {
     51             RuntimeClientMessage::Req {
     52                 subscription_id,
     53                 search_present: filters.iter().any(|filter| filter.search_present),
     54                 filters: filters
     55                     .into_iter()
     56                     .map(|filter| filter.filter)
     57                     .collect::<Vec<_>>(),
     58             }
     59         }),
     60         "COUNT" => parse_req_or_count(&values, "COUNT", |subscription_id, filters| {
     61             RuntimeClientMessage::Count {
     62                 subscription_id,
     63                 search_present: filters.iter().any(|filter| filter.search_present),
     64                 filters: filters
     65                     .into_iter()
     66                     .map(|filter| filter.filter)
     67                     .collect::<Vec<_>>(),
     68             }
     69         }),
     70         "CLOSE" => parse_close(&values),
     71         "NEG-OPEN" => parse_neg_open(&values),
     72         "NEG-MSG" => parse_neg_msg(&values),
     73         "NEG-CLOSE" => parse_neg_close(&values),
     74         unsupported => Err(format!(
     75             "client message command `{unsupported}` is unsupported"
     76         )),
     77     }
     78 }
     79 
     80 fn parse_event_or_auth(
     81     values: &[Box<RawValue>],
     82     build: impl FnOnce(PocketOwnedEvent) -> RuntimeClientMessage,
     83 ) -> Result<RuntimeClientMessage, String> {
     84     if values.len() != 2 {
     85         return Err("EVENT and AUTH client messages must contain exactly one event".to_owned());
     86     }
     87     let event = parse_pocket_event_json(values[1].get().as_bytes())
     88         .map_err(|error| error.message().to_owned())?;
     89     Ok(build(event))
     90 }
     91 
     92 fn parse_req_or_count(
     93     values: &[Box<RawValue>],
     94     command: &'static str,
     95     build: impl FnOnce(SubscriptionId, Vec<RuntimeParsedFilter>) -> RuntimeClientMessage,
     96 ) -> Result<RuntimeClientMessage, String> {
     97     if values.len() < 3 {
     98         return Err(format!(
     99             "{command} client message must contain a subscription id and filters"
    100         ));
    101     }
    102     let subscription_id = parse_subscription_id(&values[1], command)?;
    103     let filters = values[2..]
    104         .iter()
    105         .map(|value| parse_filter(value))
    106         .collect::<Result<Vec<_>, _>>()?;
    107     Ok(build(subscription_id, filters))
    108 }
    109 
    110 fn parse_close(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> {
    111     if values.len() != 2 {
    112         return Err("CLOSE client message must contain exactly 2 elements".to_owned());
    113     }
    114     parse_subscription_id(&values[1], "CLOSE").map(RuntimeClientMessage::Close)
    115 }
    116 
    117 fn parse_neg_open(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> {
    118     if values.len() != 4 {
    119         return Err(
    120             "NEG-OPEN client message must contain a subscription id, filter, and message"
    121                 .to_owned(),
    122         );
    123     }
    124     Ok(RuntimeClientMessage::NegOpen {
    125         subscription_id: parse_subscription_id(&values[1], "NEG-OPEN")?,
    126         filter: parse_filter(&values[2])?.filter,
    127         message: parse_negentropy_message(&values[3], "NEG-OPEN")?,
    128     })
    129 }
    130 
    131 fn parse_neg_msg(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> {
    132     if values.len() != 3 {
    133         return Err("NEG-MSG client message must contain a subscription id and message".to_owned());
    134     }
    135     Ok(RuntimeClientMessage::NegMsg {
    136         subscription_id: parse_subscription_id(&values[1], "NEG-MSG")?,
    137         message: parse_negentropy_message(&values[2], "NEG-MSG")?,
    138     })
    139 }
    140 
    141 fn parse_neg_close(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> {
    142     if values.len() != 2 {
    143         return Err("NEG-CLOSE client message must contain exactly 2 elements".to_owned());
    144     }
    145     parse_subscription_id(&values[1], "NEG-CLOSE").map(RuntimeClientMessage::NegClose)
    146 }
    147 
    148 fn parse_subscription_id(
    149     value: &RawValue,
    150     command: &'static str,
    151 ) -> Result<SubscriptionId, String> {
    152     serde_json::from_str::<String>(value.get())
    153         .map_err(|_| format!("{command} subscription id must be a string"))
    154         .and_then(|subscription_id| SubscriptionId::new(&subscription_id))
    155 }
    156 
    157 fn parse_negentropy_message(value: &RawValue, command: &'static str) -> Result<String, String> {
    158     let message = serde_json::from_str::<String>(value.get())
    159         .map_err(|_| format!("{command} message must be a string"))?;
    160     if message.len() % 2 == 0
    161         && message
    162             .bytes()
    163             .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f'))
    164     {
    165         Ok(message)
    166     } else {
    167         Err(format!(
    168             "{command} message must be a lowercase even-length hex string"
    169         ))
    170     }
    171 }
    172 
    173 fn parse_filter(value: &RawValue) -> Result<RuntimeParsedFilter, String> {
    174     let shape = inspect_filter_shape(value.get())?;
    175     let search_present = shape.search;
    176     let pocket_filter_json = serde_json::Value::Object(shape.pocket_fields).to_string();
    177     let filter = parse_pocket_filter_json(pocket_filter_json.as_bytes())
    178         .map_err(|error| error.message().to_owned())?;
    179     Ok(RuntimeParsedFilter {
    180         filter,
    181         search_present,
    182     })
    183 }
    184 
    185 fn parse_string_value(
    186     value: Option<&RawValue>,
    187     missing: &'static str,
    188     invalid_type: &'static str,
    189 ) -> Result<String, String> {
    190     let Some(value) = value else {
    191         return Err(missing.to_owned());
    192     };
    193     serde_json::from_str::<String>(value.get()).map_err(|_| invalid_type.to_owned())
    194 }
    195 
    196 fn inspect_filter_shape(raw: &str) -> Result<RuntimeFilterShape, String> {
    197     let mut deserializer = serde_json::Deserializer::from_str(raw);
    198     let shape =
    199         RuntimeFilterShape::deserialize(&mut deserializer).map_err(filter_deserialize_error)?;
    200     deserializer
    201         .end()
    202         .map_err(|source| format!("filter JSON is invalid: {source}"))?;
    203     Ok(shape)
    204 }
    205 
    206 fn filter_deserialize_error(source: serde_json::Error) -> String {
    207     if source.classify() == serde_json::error::Category::Data {
    208         strip_json_location(&source.to_string()).to_owned()
    209     } else {
    210         format!("filter JSON is invalid: {source}")
    211     }
    212 }
    213 
    214 fn strip_json_location(message: &str) -> &str {
    215     message
    216         .split_once(" at line ")
    217         .map_or(message, |(head, _)| head)
    218 }
    219 
/// Result of validating the top-level fields of one raw filter object.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct RuntimeFilterShape {
    /// True when the filter contained a `search` field.
    search: bool,
    /// Validated fields that are forwarded to the Pocket filter parser; the
    /// `search` field is never stored here.
    pocket_fields: serde_json::Map<String, serde_json::Value>,
}
    225 
/// A parsed Pocket filter together with a flag recording whether the
/// original JSON filter contained a `search` field.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RuntimeParsedFilter {
    /// Filter produced by the Pocket filter parser.
    filter: PocketOwnedFilter,
    /// True when the source filter had a `search` field.
    search_present: bool,
}
    231 
    232 impl<'de> Deserialize<'de> for RuntimeFilterShape {
    233     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    234     where
    235         D: Deserializer<'de>,
    236     {
    237         deserializer.deserialize_map(RuntimeFilterShapeVisitor)
    238     }
    239 }
    240 
/// Serde visitor that validates a filter JSON object field by field and
/// produces a [`RuntimeFilterShape`].
struct RuntimeFilterShapeVisitor;
    242 
    243 impl<'de> Visitor<'de> for RuntimeFilterShapeVisitor {
    244     type Value = RuntimeFilterShape;
    245 
    246     fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
    247         formatter.write_str("a filter JSON object")
    248     }
    249 
    250     fn visit_map<A>(self, mut object: A) -> Result<Self::Value, A::Error>
    251     where
    252         A: MapAccess<'de>,
    253     {
    254         let mut fields = BTreeSet::new();
    255         let mut shape = RuntimeFilterShape::default();
    256         while let Some(field) = object.next_key::<String>()? {
    257             if !fields.insert(field.clone()) {
    258                 return Err(de::Error::custom(format!(
    259                     "duplicate object field `{field}`"
    260                 )));
    261             }
    262             let value = object.next_value::<serde_json::Value>()?;
    263             match field.as_str() {
    264                 "ids" | "authors" | "kinds" => {
    265                     validate_non_empty_array(&field, &value)?;
    266                     shape.pocket_fields.insert(field, value);
    267                 }
    268                 "since" | "until" => {
    269                     validate_u64_field(&field, &value)?;
    270                     shape.pocket_fields.insert(field, value);
    271                 }
    272                 "limit" => {
    273                     validate_limit_field(&field, &value)?;
    274                     shape.pocket_fields.insert(field, value);
    275                 }
    276                 "search" => {
    277                     value.as_str().ok_or_else(|| {
    278                         de::Error::custom(format!("filter field `{field}` must be a string"))
    279                     })?;
    280                     shape.search = true;
    281                 }
    282                 tag_field if tag_field.starts_with('#') => {
    283                     validate_tag_filter_field(tag_field, &value)?;
    284                     shape.pocket_fields.insert(field, value);
    285                 }
    286                 unsupported => {
    287                     return Err(de::Error::custom(format!(
    288                         "filter field `{unsupported}` is unsupported"
    289                     )));
    290                 }
    291             }
    292         }
    293         Ok(shape)
    294     }
    295 }
    296 
    297 fn validate_non_empty_array<E>(field: &str, value: &serde_json::Value) -> Result<(), E>
    298 where
    299     E: de::Error,
    300 {
    301     match value.as_array() {
    302         Some(items) if !items.is_empty() => Ok(()),
    303         _ => Err(de::Error::custom(format!(
    304             "filter field `{field}` must be a non-empty array"
    305         ))),
    306     }
    307 }
    308 
    309 fn validate_u64_field<E>(field: &str, value: &serde_json::Value) -> Result<(), E>
    310 where
    311     E: de::Error,
    312 {
    313     value.as_u64().map(|_| ()).ok_or_else(|| {
    314         de::Error::custom(format!(
    315             "filter field `{field}` must be an unsigned integer"
    316         ))
    317     })
    318 }
    319 
    320 fn validate_limit_field<E>(field: &str, value: &serde_json::Value) -> Result<(), E>
    321 where
    322     E: de::Error,
    323 {
    324     let limit = value.as_u64().ok_or_else(|| {
    325         de::Error::custom(format!(
    326             "filter field `{field}` must be an unsigned integer"
    327         ))
    328     })?;
    329     u32::try_from(limit)
    330         .map(|_| ())
    331         .map_err(|_| de::Error::custom(format!("filter field `{field}` exceeds Pocket range")))
    332 }
    333 
    334 fn validate_tag_filter_field<E>(field: &str, value: &serde_json::Value) -> Result<(), E>
    335 where
    336     E: de::Error,
    337 {
    338     let name = &field[1..];
    339     let tag_name = TagName::new(name).map_err(|reason| {
    340         de::Error::custom(format!("filter field `{field}` is invalid: {reason}"))
    341     })?;
    342     if !tag_name.is_indexable() {
    343         return Err(de::Error::custom(format!(
    344             "filter field `{field}` is invalid: tag name must be a single ASCII letter"
    345         )));
    346     }
    347     validate_non_empty_array(field, value)
    348 }
    349 
// Unit tests for `parse_runtime_client_message`: one success-path test per
// command family and one rejection test per family of malformed input.
#[cfg(test)]
mod tests {
    use super::{RuntimeClientMessage, parse_runtime_client_message};
    use serde_json::json;
    use tangle_protocol::event_to_value;
    use tangle_test_support::{FixtureKey, tangle_v2_auth_event, tangle_v2_event};

    // EVENT and AUTH both carry one event; the parsed event keeps the fixture's id.
    #[test]
    fn runtime_parser_maps_event_and_auth_through_pocket_event_json() {
        let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "hello")
            .expect("event");
        let RuntimeClientMessage::Event(parsed_event) =
            parse_runtime_client_message(&json!(["EVENT", event_to_value(&event)]).to_string())
                .expect("event")
        else {
            panic!("event expected")
        };
        assert_eq!(parsed_event.id().as_hex_string(), event.id().as_str());

        let auth =
            tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 1_714_124_434).expect("auth");
        let RuntimeClientMessage::Auth(parsed_auth) =
            parse_runtime_client_message(&json!(["AUTH", event_to_value(&auth)]).to_string())
                .expect("auth")
        else {
            panic!("auth expected")
        };
        assert_eq!(parsed_auth.id().as_hex_string(), auth.id().as_str());
    }

    // Wrong element counts give the fixed arity message; an invalid event
    // body must give some non-empty error.
    #[test]
    fn runtime_parser_rejects_malformed_event_and_auth_payloads() {
        assert_eq!(
            parse_runtime_client_message("[\"EVENT\"]").expect_err("missing event"),
            "EVENT and AUTH client messages must contain exactly one event"
        );
        assert_eq!(
            parse_runtime_client_message("[\"AUTH\",{},{}]").expect_err("too many auth values"),
            "EVENT and AUTH client messages must contain exactly one event"
        );
        assert!(
            !parse_runtime_client_message("[\"EVENT\",{\"id\":5}]")
                .expect_err("invalid event")
                .is_empty()
        );
    }

    // REQ and COUNT accept the same filter; CLOSE carries only a subscription id.
    #[test]
    fn runtime_parser_maps_req_count_and_close_without_protocol_parser_delegation() {
        // A filter using every supported field except `search`.
        let filter = json!({"ids":["a".repeat(64)],"authors":["b".repeat(64)],"kinds":[1],"#t":["market"],"since":10,"until":20,"limit":30});
        let RuntimeClientMessage::Req {
            subscription_id,
            filters,
            search_present,
        } = parse_runtime_client_message(&json!(["REQ", "sub", filter.clone()]).to_string())
            .expect("req")
        else {
            panic!("req expected")
        };
        assert_eq!(subscription_id.as_str(), "sub");
        assert_eq!(filters.len(), 1);
        assert!(!search_present);

        let RuntimeClientMessage::Count {
            subscription_id,
            filters,
            search_present,
        } = parse_runtime_client_message(&json!(["COUNT", "sub", filter]).to_string())
            .expect("count")
        else {
            panic!("count expected")
        };
        assert_eq!(subscription_id.as_str(), "sub");
        assert_eq!(filters.len(), 1);
        assert!(!search_present);
        assert_eq!(
            parse_runtime_client_message("[\"CLOSE\",\"sub\"]").expect("close"),
            RuntimeClientMessage::Close("sub".parse().expect("subscription"))
        );
    }

    // NEG-OPEN, NEG-MSG and NEG-CLOSE success paths; an empty hex message is valid.
    #[test]
    fn runtime_parser_maps_negentropy_commands_without_protocol_parser_delegation() {
        let RuntimeClientMessage::NegOpen {
            subscription_id,
            filter,
            message,
        } = parse_runtime_client_message(
            &json!(["NEG-OPEN", "neg-sub", json!({"kinds": [1]}), "00ff"]).to_string(),
        )
        .expect("neg open")
        else {
            panic!("neg open expected")
        };
        assert_eq!(subscription_id.as_str(), "neg-sub");
        assert_eq!(filter.kinds().count(), 1);
        assert_eq!(message, "00ff");
        assert_eq!(
            parse_runtime_client_message("[\"NEG-MSG\",\"neg-sub\",\"\"]").expect("neg msg"),
            RuntimeClientMessage::NegMsg {
                subscription_id: "neg-sub".parse().expect("subscription"),
                message: String::new()
            }
        );
        assert_eq!(
            parse_runtime_client_message("[\"NEG-CLOSE\",\"neg-sub\"]").expect("neg close"),
            RuntimeClientMessage::NegClose("neg-sub".parse().expect("subscription"))
        );
    }

    // A filter with a `search` field still parses, and sets `search_present`.
    #[test]
    fn runtime_parser_preserves_search_rejection_marker_before_dispatch() {
        let RuntimeClientMessage::Req { search_present, .. } =
            parse_runtime_client_message("[\"REQ\",\"sub\",{\"search\":\"carrots\",\"limit\":1}]")
                .expect("search req")
        else {
            panic!("req expected")
        };
        assert!(search_present);
    }

    // Each malformed filter must produce an error containing the expected text.
    #[test]
    fn runtime_parser_rejects_malformed_req_and_count_filters() {
        for (raw, expected) in [
            (
                "[\"REQ\",\"sub\",{\"ids\":[]}]",
                "filter field `ids` must be a non-empty array",
            ),
            (
                "[\"REQ\",\"sub\",{\"unknown\":true}]",
                "filter field `unknown` is unsupported",
            ),
            (
                "[\"REQ\",\"sub\",{\"#aa\":[\"value\"]}]",
                "filter field `#aa` is invalid: tag name must be a single ASCII letter",
            ),
            (
                "[\"COUNT\",\"sub\",{\"limit\":4294967296}]",
                "filter field `limit` exceeds Pocket range",
            ),
            (
                "[\"COUNT\",\"sub\",{\"authors\":[\"BAD\"]}]",
                "Too short reading pubkey",
            ),
            (
                "[\"REQ\",\"sub\",{\"limit\":1,\"limit\":2}]",
                "duplicate object field `limit`",
            ),
        ] {
            let actual = parse_runtime_client_message(raw).expect_err(raw);
            assert!(actual.contains(expected), "{actual}");
        }
    }

    // Arity, type and hex-format violations for the NEG-* commands.
    #[test]
    fn runtime_parser_rejects_malformed_negentropy_commands() {
        for (raw, expected) in [
            (
                "[\"NEG-OPEN\",\"sub\",{}]",
                "NEG-OPEN client message must contain a subscription id, filter, and message",
            ),
            (
                "[\"NEG-OPEN\",1,{},\"00\"]",
                "NEG-OPEN subscription id must be a string",
            ),
            (
                "[\"NEG-OPEN\",\"sub\",1,\"00\"]",
                "expected a filter JSON object",
            ),
            (
                "[\"NEG-OPEN\",\"sub\",{},1]",
                "NEG-OPEN message must be a string",
            ),
            (
                "[\"NEG-MSG\",\"sub\",\"0\"]",
                "NEG-MSG message must be a lowercase even-length hex string",
            ),
            (
                "[\"NEG-MSG\",\"sub\",\"0G\"]",
                "NEG-MSG message must be a lowercase even-length hex string",
            ),
            (
                "[\"NEG-CLOSE\",\"sub\",{}]",
                "NEG-CLOSE client message must contain exactly 2 elements",
            ),
        ] {
            let actual = parse_runtime_client_message(raw).expect_err(raw);
            assert!(actual.contains(expected), "{actual}");
        }
    }
}
    540 }