client_message.rs (18987B)
1 #![forbid(unsafe_code)] 2 3 use serde::de::{self, Deserialize, Deserializer, MapAccess, Visitor}; 4 use serde_json::value::RawValue; 5 use std::collections::BTreeSet; 6 use std::fmt; 7 use tangle_protocol::{SubscriptionId, TagName}; 8 use tangle_store_pocket::{ 9 PocketOwnedEvent, PocketOwnedFilter, parse_pocket_event_json, parse_pocket_filter_json, 10 }; 11 12 #[derive(Debug, Clone, PartialEq, Eq)] 13 pub(crate) enum RuntimeClientMessage { 14 Event(PocketOwnedEvent), 15 Auth(PocketOwnedEvent), 16 Req { 17 subscription_id: SubscriptionId, 18 filters: Vec<PocketOwnedFilter>, 19 search_present: bool, 20 }, 21 Count { 22 subscription_id: SubscriptionId, 23 filters: Vec<PocketOwnedFilter>, 24 search_present: bool, 25 }, 26 Close(SubscriptionId), 27 NegOpen { 28 subscription_id: SubscriptionId, 29 filter: PocketOwnedFilter, 30 message: String, 31 }, 32 NegMsg { 33 subscription_id: SubscriptionId, 34 message: String, 35 }, 36 NegClose(SubscriptionId), 37 } 38 39 pub(crate) fn parse_runtime_client_message(raw: &str) -> Result<RuntimeClientMessage, String> { 40 let values = serde_json::from_str::<Vec<Box<RawValue>>>(raw) 41 .map_err(|source| format!("client message JSON is invalid: {source}"))?; 42 let command = parse_string_value( 43 values.first().map(Box::as_ref), 44 "client message command is missing", 45 "client message command must be a string", 46 )?; 47 match command.as_str() { 48 "EVENT" => parse_event_or_auth(&values, RuntimeClientMessage::Event), 49 "AUTH" => parse_event_or_auth(&values, RuntimeClientMessage::Auth), 50 "REQ" => parse_req_or_count(&values, "REQ", |subscription_id, filters| { 51 RuntimeClientMessage::Req { 52 subscription_id, 53 search_present: filters.iter().any(|filter| filter.search_present), 54 filters: filters 55 .into_iter() 56 .map(|filter| filter.filter) 57 .collect::<Vec<_>>(), 58 } 59 }), 60 "COUNT" => parse_req_or_count(&values, "COUNT", |subscription_id, filters| { 61 RuntimeClientMessage::Count { 62 subscription_id, 63 search_present: filters.iter().any(|filter| filter.search_present), 64 filters: filters 65 .into_iter() 66 .map(|filter| filter.filter) 67 .collect::<Vec<_>>(), 68 } 69 }), 70 "CLOSE" => parse_close(&values), 71 "NEG-OPEN" => parse_neg_open(&values), 72 "NEG-MSG" => parse_neg_msg(&values), 73 "NEG-CLOSE" => parse_neg_close(&values), 74 unsupported => Err(format!( 75 "client message command `{unsupported}` is unsupported" 76 )), 77 } 78 } 79 80 fn parse_event_or_auth( 81 values: &[Box<RawValue>], 82 build: impl FnOnce(PocketOwnedEvent) -> RuntimeClientMessage, 83 ) -> Result<RuntimeClientMessage, String> { 84 if values.len() != 2 { 85 return Err("EVENT and AUTH client messages must contain exactly one event".to_owned()); 86 } 87 let event = parse_pocket_event_json(values[1].get().as_bytes()) 88 .map_err(|error| error.message().to_owned())?; 89 Ok(build(event)) 90 } 91 92 fn parse_req_or_count( 93 values: &[Box<RawValue>], 94 command: &'static str, 95 build: impl FnOnce(SubscriptionId, Vec<RuntimeParsedFilter>) -> RuntimeClientMessage, 96 ) -> Result<RuntimeClientMessage, String> { 97 if values.len() < 3 { 98 return Err(format!( 99 "{command} client message must contain a subscription id and filters" 100 )); 101 } 102 let subscription_id = parse_subscription_id(&values[1], command)?; 103 let filters = values[2..] 104 .iter() 105 .map(|value| parse_filter(value)) 106 .collect::<Result<Vec<_>, _>>()?; 107 Ok(build(subscription_id, filters)) 108 } 109 110 fn parse_close(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> { 111 if values.len() != 2 { 112 return Err("CLOSE client message must contain exactly 2 elements".to_owned()); 113 } 114 parse_subscription_id(&values[1], "CLOSE").map(RuntimeClientMessage::Close) 115 } 116 117 fn parse_neg_open(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> { 118 if values.len() != 4 { 119 return Err( 120 "NEG-OPEN client message must contain a subscription id, filter, and message" 121 .to_owned(), 122 ); 123 } 124 Ok(RuntimeClientMessage::NegOpen { 125 subscription_id: parse_subscription_id(&values[1], "NEG-OPEN")?, 126 filter: parse_filter(&values[2])?.filter, 127 message: parse_negentropy_message(&values[3], "NEG-OPEN")?, 128 }) 129 } 130 131 fn parse_neg_msg(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> { 132 if values.len() != 3 { 133 return Err("NEG-MSG client message must contain a subscription id and message".to_owned()); 134 } 135 Ok(RuntimeClientMessage::NegMsg { 136 subscription_id: parse_subscription_id(&values[1], "NEG-MSG")?, 137 message: parse_negentropy_message(&values[2], "NEG-MSG")?, 138 }) 139 } 140 141 fn parse_neg_close(values: &[Box<RawValue>]) -> Result<RuntimeClientMessage, String> { 142 if values.len() != 2 { 143 return Err("NEG-CLOSE client message must contain exactly 2 elements".to_owned()); 144 } 145 parse_subscription_id(&values[1], "NEG-CLOSE").map(RuntimeClientMessage::NegClose) 146 } 147 148 fn parse_subscription_id( 149 value: &RawValue, 150 command: &'static str, 151 ) -> Result<SubscriptionId, String> { 152 serde_json::from_str::<String>(value.get()) 153 .map_err(|_| format!("{command} subscription id must be a string")) 154 .and_then(|subscription_id| SubscriptionId::new(&subscription_id)) 155 } 156 157 fn parse_negentropy_message(value: &RawValue, command: &'static str) -> Result<String, String> { 158 let message = serde_json::from_str::<String>(value.get()) 159 .map_err(|_| format!("{command} message must be a string"))?; 160 if message.len() % 2 == 0 161 && message 162 .bytes() 163 .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f')) 164 { 165 Ok(message) 166 } else { 167 Err(format!( 168 "{command} message must be a lowercase even-length hex string" 169 )) 170 } 171 } 172 173 fn parse_filter(value: &RawValue) -> Result<RuntimeParsedFilter, String> { 174 let shape = inspect_filter_shape(value.get())?; 175 let search_present = shape.search; 176 let pocket_filter_json = serde_json::Value::Object(shape.pocket_fields).to_string(); 177 let filter = parse_pocket_filter_json(pocket_filter_json.as_bytes()) 178 .map_err(|error| error.message().to_owned())?; 179 Ok(RuntimeParsedFilter { 180 filter, 181 search_present, 182 }) 183 } 184 185 fn parse_string_value( 186 value: Option<&RawValue>, 187 missing: &'static str, 188 invalid_type: &'static str, 189 ) -> Result<String, String> { 190 let Some(value) = value else { 191 return Err(missing.to_owned()); 192 }; 193 serde_json::from_str::<String>(value.get()).map_err(|_| invalid_type.to_owned()) 194 } 195 196 fn inspect_filter_shape(raw: &str) -> Result<RuntimeFilterShape, String> { 197 let mut deserializer = serde_json::Deserializer::from_str(raw); 198 let shape = 199 RuntimeFilterShape::deserialize(&mut deserializer).map_err(filter_deserialize_error)?; 200 deserializer 201 .end() 202 .map_err(|source| format!("filter JSON is invalid: {source}"))?; 203 Ok(shape) 204 } 205 206 fn filter_deserialize_error(source: serde_json::Error) -> String { 207 if source.classify() == serde_json::error::Category::Data { 208 strip_json_location(&source.to_string()).to_owned() 209 } else { 210 format!("filter JSON is invalid: {source}") 211 } 212 } 213 214 fn strip_json_location(message: &str) -> &str { 215 message 216 .split_once(" at line ") 217 .map_or(message, |(head, _)| head) 218 } 219 220 #[derive(Debug, Clone, PartialEq, Eq, Default)] 221 struct RuntimeFilterShape { 222 search: bool, 223 pocket_fields: serde_json::Map<String, serde_json::Value>, 224 } 225 226 #[derive(Debug, Clone, PartialEq, Eq)] 227 struct RuntimeParsedFilter { 228 filter: PocketOwnedFilter, 229 search_present: bool, 230 } 231 232 impl<'de> Deserialize<'de> for RuntimeFilterShape { 233 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 234 where 235 D: Deserializer<'de>, 236 { 237 deserializer.deserialize_map(RuntimeFilterShapeVisitor) 238 } 239 } 240 241 struct RuntimeFilterShapeVisitor; 242 243 impl<'de> Visitor<'de> for RuntimeFilterShapeVisitor { 244 type Value = RuntimeFilterShape; 245 246 fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { 247 formatter.write_str("a filter JSON object") 248 } 249 250 fn visit_map<A>(self, mut object: A) -> Result<Self::Value, A::Error> 251 where 252 A: MapAccess<'de>, 253 { 254 let mut fields = BTreeSet::new(); 255 let mut shape = RuntimeFilterShape::default(); 256 while let Some(field) = object.next_key::<String>()? { 257 if !fields.insert(field.clone()) { 258 return Err(de::Error::custom(format!( 259 "duplicate object field `{field}`" 260 ))); 261 } 262 let value = object.next_value::<serde_json::Value>()?; 263 match field.as_str() { 264 "ids" | "authors" | "kinds" => { 265 validate_non_empty_array(&field, &value)?; 266 shape.pocket_fields.insert(field, value); 267 } 268 "since" | "until" => { 269 validate_u64_field(&field, &value)?; 270 shape.pocket_fields.insert(field, value); 271 } 272 "limit" => { 273 validate_limit_field(&field, &value)?; 274 shape.pocket_fields.insert(field, value); 275 } 276 "search" => { 277 value.as_str().ok_or_else(|| { 278 de::Error::custom(format!("filter field `{field}` must be a string")) 279 })?; 280 shape.search = true; 281 } 282 tag_field if tag_field.starts_with('#') => { 283 validate_tag_filter_field(tag_field, &value)?; 284 shape.pocket_fields.insert(field, value); 285 } 286 unsupported => { 287 return Err(de::Error::custom(format!( 288 "filter field `{unsupported}` is unsupported" 289 ))); 290 } 291 } 292 } 293 Ok(shape) 294 } 295 } 296 297 fn validate_non_empty_array<E>(field: &str, value: &serde_json::Value) -> Result<(), E> 298 where 299 E: de::Error, 300 { 301 match value.as_array() { 302 Some(items) if !items.is_empty() => Ok(()), 303 _ => Err(de::Error::custom(format!( 304 "filter field `{field}` must be a non-empty array" 305 ))), 306 } 307 } 308 309 fn validate_u64_field<E>(field: &str, value: &serde_json::Value) -> Result<(), E> 310 where 311 E: de::Error, 312 { 313 value.as_u64().map(|_| ()).ok_or_else(|| { 314 de::Error::custom(format!( 315 "filter field `{field}` must be an unsigned integer" 316 )) 317 }) 318 } 319 320 fn validate_limit_field<E>(field: &str, value: &serde_json::Value) -> Result<(), E> 321 where 322 E: de::Error, 323 { 324 let limit = value.as_u64().ok_or_else(|| { 325 de::Error::custom(format!( 326 "filter field `{field}` must be an unsigned integer" 327 )) 328 })?; 329 u32::try_from(limit) 330 .map(|_| ()) 331 .map_err(|_| de::Error::custom(format!("filter field `{field}` exceeds Pocket range"))) 332 } 333 334 fn validate_tag_filter_field<E>(field: &str, value: &serde_json::Value) -> Result<(), E> 335 where 336 E: de::Error, 337 { 338 let name = &field[1..]; 339 let tag_name = TagName::new(name).map_err(|reason| { 340 de::Error::custom(format!("filter field `{field}` is invalid: {reason}")) 341 })?; 342 if !tag_name.is_indexable() { 343 return Err(de::Error::custom(format!( 344 "filter field `{field}` is invalid: tag name must be a single ASCII letter" 345 ))); 346 } 347 validate_non_empty_array(field, value) 348 } 349 350 #[cfg(test)] 351 mod tests { 352 use super::{RuntimeClientMessage, parse_runtime_client_message}; 353 use serde_json::json; 354 use tangle_protocol::event_to_value; 355 use tangle_test_support::{FixtureKey, tangle_v2_auth_event, tangle_v2_event}; 356 357 #[test] 358 fn runtime_parser_maps_event_and_auth_through_pocket_event_json() { 359 let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "hello") 360 .expect("event"); 361 let RuntimeClientMessage::Event(parsed_event) = 362 parse_runtime_client_message(&json!(["EVENT", event_to_value(&event)]).to_string()) 363 .expect("event") 364 else { 365 panic!("event expected") 366 }; 367 assert_eq!(parsed_event.id().as_hex_string(), event.id().as_str()); 368 369 let auth = 370 tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 1_714_124_434).expect("auth"); 371 let RuntimeClientMessage::Auth(parsed_auth) = 372 parse_runtime_client_message(&json!(["AUTH", event_to_value(&auth)]).to_string()) 373 .expect("auth") 374 else { 375 panic!("auth expected") 376 }; 377 assert_eq!(parsed_auth.id().as_hex_string(), auth.id().as_str()); 378 } 379 380 #[test] 381 fn runtime_parser_rejects_malformed_event_and_auth_payloads() { 382 assert_eq!( 383 parse_runtime_client_message("[\"EVENT\"]").expect_err("missing event"), 384 "EVENT and AUTH client messages must contain exactly one event" 385 ); 386 assert_eq!( 387 parse_runtime_client_message("[\"AUTH\",{},{}]").expect_err("too many auth values"), 388 "EVENT and AUTH client messages must contain exactly one event" 389 ); 390 assert!( 391 !parse_runtime_client_message("[\"EVENT\",{\"id\":5}]") 392 .expect_err("invalid event") 393 .is_empty() 394 ); 395 } 396 397 #[test] 398 fn runtime_parser_maps_req_count_and_close_without_protocol_parser_delegation() { 399 let filter = json!({"ids":["a".repeat(64)],"authors":["b".repeat(64)],"kinds":[1],"#t":["market"],"since":10,"until":20,"limit":30}); 400 let RuntimeClientMessage::Req { 401 subscription_id, 402 filters, 403 search_present, 404 } = parse_runtime_client_message(&json!(["REQ", "sub", filter.clone()]).to_string()) 405 .expect("req") 406 else { 407 panic!("req expected") 408 }; 409 assert_eq!(subscription_id.as_str(), "sub"); 410 assert_eq!(filters.len(), 1); 411 assert!(!search_present); 412 413 let RuntimeClientMessage::Count { 414 subscription_id, 415 filters, 416 search_present, 417 } = parse_runtime_client_message(&json!(["COUNT", "sub", filter]).to_string()) 418 .expect("count") 419 else { 420 panic!("count expected") 421 }; 422 assert_eq!(subscription_id.as_str(), "sub"); 423 assert_eq!(filters.len(), 1); 424 assert!(!search_present); 425 assert_eq!( 426 parse_runtime_client_message("[\"CLOSE\",\"sub\"]").expect("close"), 427 RuntimeClientMessage::Close("sub".parse().expect("subscription")) 428 ); 429 } 430 431 #[test] 432 fn runtime_parser_maps_negentropy_commands_without_protocol_parser_delegation() { 433 let RuntimeClientMessage::NegOpen { 434 subscription_id, 435 filter, 436 message, 437 } = parse_runtime_client_message( 438 &json!(["NEG-OPEN", "neg-sub", json!({"kinds": [1]}), "00ff"]).to_string(), 439 ) 440 .expect("neg open") 441 else { 442 panic!("neg open expected") 443 }; 444 assert_eq!(subscription_id.as_str(), "neg-sub"); 445 assert_eq!(filter.kinds().count(), 1); 446 assert_eq!(message, "00ff"); 447 assert_eq!( 448 parse_runtime_client_message("[\"NEG-MSG\",\"neg-sub\",\"\"]").expect("neg msg"), 449 RuntimeClientMessage::NegMsg { 450 subscription_id: "neg-sub".parse().expect("subscription"), 451 message: String::new() 452 } 453 ); 454 assert_eq!( 455 parse_runtime_client_message("[\"NEG-CLOSE\",\"neg-sub\"]").expect("neg close"), 456 RuntimeClientMessage::NegClose("neg-sub".parse().expect("subscription")) 457 ); 458 } 459 460 #[test] 461 fn runtime_parser_preserves_search_rejection_marker_before_dispatch() { 462 let RuntimeClientMessage::Req { search_present, .. } = 463 parse_runtime_client_message("[\"REQ\",\"sub\",{\"search\":\"carrots\",\"limit\":1}]") 464 .expect("search req") 465 else { 466 panic!("req expected") 467 }; 468 assert!(search_present); 469 } 470 471 #[test] 472 fn runtime_parser_rejects_malformed_req_and_count_filters() { 473 for (raw, expected) in [ 474 ( 475 "[\"REQ\",\"sub\",{\"ids\":[]}]", 476 "filter field `ids` must be a non-empty array", 477 ), 478 ( 479 "[\"REQ\",\"sub\",{\"unknown\":true}]", 480 "filter field `unknown` is unsupported", 481 ), 482 ( 483 "[\"REQ\",\"sub\",{\"#aa\":[\"value\"]}]", 484 "filter field `#aa` is invalid: tag name must be a single ASCII letter", 485 ), 486 ( 487 "[\"COUNT\",\"sub\",{\"limit\":4294967296}]", 488 "filter field `limit` exceeds Pocket range", 489 ), 490 ( 491 "[\"COUNT\",\"sub\",{\"authors\":[\"BAD\"]}]", 492 "Too short reading pubkey", 493 ), 494 ( 495 "[\"REQ\",\"sub\",{\"limit\":1,\"limit\":2}]", 496 "duplicate object field `limit`", 497 ), 498 ] { 499 let actual = parse_runtime_client_message(raw).expect_err(raw); 500 assert!(actual.contains(expected), "{actual}"); 501 } 502 } 503 504 #[test] 505 fn runtime_parser_rejects_malformed_negentropy_commands() { 506 for (raw, expected) in [ 507 ( 508 "[\"NEG-OPEN\",\"sub\",{}]", 509 "NEG-OPEN client message must contain a subscription id, filter, and message", 510 ), 511 ( 512 "[\"NEG-OPEN\",1,{},\"00\"]", 513 "NEG-OPEN subscription id must be a string", 514 ), 515 ( 516 "[\"NEG-OPEN\",\"sub\",1,\"00\"]", 517 "expected a filter JSON object", 518 ), 519 ( 520 "[\"NEG-OPEN\",\"sub\",{},1]", 521 "NEG-OPEN message must be a string", 522 ), 523 ( 524 "[\"NEG-MSG\",\"sub\",\"0\"]", 525 "NEG-MSG message must be a lowercase even-length hex string", 526 ), 527 ( 528 "[\"NEG-MSG\",\"sub\",\"0G\"]", 529 "NEG-MSG message must be a lowercase even-length hex string", 530 ), 531 ( 532 "[\"NEG-CLOSE\",\"sub\",{}]", 533 "NEG-CLOSE client message must contain exactly 2 elements", 534 ), 535 ] { 536 let actual = parse_runtime_client_message(raw).expect_err(raw); 537 assert!(actual.contains(expected), "{actual}"); 538 } 539 } 540 }