session.rs (77092B)
1 #![forbid(unsafe_code)] 2 3 use crate::{ 4 client_message::{RuntimeClientMessage, parse_runtime_client_message}, 5 errors::BaseRelayError, 6 event_bus::{TangleEventReceiveError, TangleEventReceiver}, 7 logging, 8 relay::{ 9 auth::{BaseAuthState, generate_auth_challenge}, 10 core::BaseRelay, 11 live::{CloseResult, LiveSubscriptionSet}, 12 outbound::RuntimeRelayMessage, 13 }, 14 resource_limits::{RelayResourceLimiter, RelaySubscriptionPermit}, 15 runtime::{ 16 RelayRuntimeHandle, TangleClientMessageMetricKind, TangleClientRateLimitContext, 17 TangleRuntimeLimits, 18 }, 19 }; 20 use axum::extract::ws::{CloseFrame, Message, Utf8Bytes, WebSocket}; 21 use std::{ 22 collections::BTreeMap, 23 net::IpAddr, 24 sync::atomic::{AtomicU64, Ordering}, 25 time::{Instant, SystemTime, UNIX_EPOCH}, 26 }; 27 use tangle_protocol::{RelayMessage, SubscriptionId, UnixTimestamp}; 28 use tangle_store_pocket::PocketOwnedFilter; 29 use tokio::sync::{mpsc, watch}; 30 31 #[derive(Debug)] 32 pub struct TangleWebSocketSession { 33 connection_id: u64, 34 peer_ip: Option<IpAddr>, 35 connected_at: Instant, 36 outbound: TangleOutboundSender, 37 outbound_receiver: mpsc::Receiver<Message>, 38 shutdown: watch::Receiver<bool>, 39 runtime: RelayRuntimeHandle, 40 limits: TangleRuntimeLimits, 41 auth: BaseAuthState, 42 subscriptions: LiveSubscriptionSet, 43 resource_limiter: Option<RelayResourceLimiter>, 44 subscription_permits: BTreeMap<SubscriptionId, RelaySubscriptionPermit>, 45 events: TangleEventReceiver, 46 } 47 48 static NEXT_TANGLE_CONNECTION_ID: AtomicU64 = AtomicU64::new(1); 49 50 impl TangleWebSocketSession { 51 pub fn new( 52 limits: TangleRuntimeLimits, 53 shutdown: watch::Receiver<bool>, 54 runtime: RelayRuntimeHandle, 55 auth: BaseAuthState, 56 events: TangleEventReceiver, 57 ) -> Result<Self, BaseRelayError> { 58 Self::new_with_peer(limits, shutdown, runtime, auth, events, None) 59 } 60 61 pub fn new_with_peer( 62 limits: TangleRuntimeLimits, 63 shutdown: watch::Receiver<bool>, 64 runtime: RelayRuntimeHandle, 65 auth: BaseAuthState, 66 events: TangleEventReceiver, 67 peer_ip: Option<IpAddr>, 68 ) -> Result<Self, BaseRelayError> { 69 Self::new_with_peer_and_resources(limits, shutdown, runtime, auth, events, peer_ip, None) 70 } 71 72 pub fn new_with_peer_and_resources( 73 limits: TangleRuntimeLimits, 74 shutdown: watch::Receiver<bool>, 75 runtime: RelayRuntimeHandle, 76 auth: BaseAuthState, 77 events: TangleEventReceiver, 78 peer_ip: Option<IpAddr>, 79 resource_limiter: Option<RelayResourceLimiter>, 80 ) -> Result<Self, BaseRelayError> { 81 let outbound_queue_capacity = limits.outbound_queue_capacity(); 82 let (sender, receiver) = mpsc::channel(outbound_queue_capacity); 83 let subscriptions = LiveSubscriptionSet::new( 84 limits.base_relay_limits().max_pending_events(), 85 limits.base_relay_limits().max_subscriptions(), 86 )?; 87 Ok(Self { 88 connection_id: NEXT_TANGLE_CONNECTION_ID.fetch_add(1, Ordering::Relaxed), 89 peer_ip, 90 connected_at: Instant::now(), 91 outbound: TangleOutboundSender { 92 sender, 93 capacity: outbound_queue_capacity, 94 }, 95 outbound_receiver: receiver, 96 shutdown, 97 runtime, 98 limits, 99 auth, 100 subscriptions, 101 resource_limiter, 102 subscription_permits: BTreeMap::new(), 103 events, 104 }) 105 } 106 107 pub fn connected_at(&self) -> Instant { 108 self.connected_at 109 } 110 111 pub fn outbound(&self) -> TangleOutboundSender { 112 self.outbound.clone() 113 } 114 115 pub fn shutdown_requested(&self) -> bool { 116 *self.shutdown.borrow() 117 } 118 119 #[cfg(test)] 120 fn active_subscription_count(&self) -> usize { 121 self.subscriptions.active_count() 122 } 123 124 pub async fn run(mut self, mut socket: WebSocket) { 125 let metrics = self.runtime.metrics(); 126 metrics.record_session_opened(); 127 logging::log_websocket_session_opened(self.connection_id, self.peer_ip); 128 if !self.issue_auth_challenge() { 129 let closed_subscriptions = self.close_all_subscriptions(); 130 metrics.record_subscriptions_closed(closed_subscriptions); 131 metrics.record_session_closed(); 132 metrics.record_event_bus_receivers( 133 metrics.event_bus_receivers_current().saturating_sub(1), 134 ); 135 logging::log_websocket_session_closed( 136 self.connection_id, 137 self.peer_ip, 138 closed_subscriptions, 139 ); 140 return; 141 } 142 loop { 143 if self.shutdown_requested() { 144 let _ = socket.send(Message::Close(None)).await; 145 break; 146 } 147 tokio::select! { 148 incoming = socket.recv() => { 149 match incoming { 150 Some(Ok(Message::Close(_))) | Some(Err(_)) | None => break, 151 Some(Ok(message)) => { 152 match self.handle_incoming_message(message).await { 153 TangleSessionControl::Continue => {} 154 TangleSessionControl::Close(message) => { 155 let _ = socket.send(message).await; 156 break; 157 } 158 TangleSessionControl::Stop => break, 159 } 160 } 161 } 162 } 163 outgoing = self.outbound_receiver.recv() => { 164 let Some(message) = outgoing else { 165 break; 166 }; 167 if socket.send(message).await.is_err() { 168 break; 169 } 170 } 171 event = self.events.recv() => { 172 match self.handle_event_receive_result(event).await { 173 TangleSessionControl::Continue => {} 174 TangleSessionControl::Close(message) => { 175 let _ = socket.send(message).await; 176 break; 177 } 178 TangleSessionControl::Stop => break, 179 } 180 } 181 changed = self.shutdown.changed() => { 182 if changed.is_err() || self.shutdown_requested() { 183 let _ = socket.send(Message::Close(None)).await; 184 break; 185 } 186 } 187 } 188 } 189 let closed_subscriptions = self.close_all_subscriptions(); 190 metrics.record_subscriptions_closed(closed_subscriptions); 191 metrics.record_session_closed(); 192 metrics.record_event_bus_receivers(metrics.event_bus_receivers_current().saturating_sub(1)); 193 logging::log_websocket_session_closed( 194 self.connection_id, 195 self.peer_ip, 196 closed_subscriptions, 197 ); 198 } 199 200 async fn handle_event_receive_result( 201 &mut self, 202 result: Result<tangle_groups::StoreOffset, TangleEventReceiveError>, 203 ) -> TangleSessionControl { 204 match result { 205 Ok(offset) => self.handle_event_offset(offset).await, 206 Err(TangleEventReceiveError::Lagged(skipped)) => { 207 self.runtime.metrics().record_event_bus_lagged(skipped); 208 TangleSessionControl::Close(event_stream_lag_close_message()) 209 } 210 Err(TangleEventReceiveError::Closed) => TangleSessionControl::Stop, 211 Err(TangleEventReceiveError::Empty) => TangleSessionControl::Continue, 212 } 213 } 214 215 async fn handle_event_offset( 216 &mut self, 217 offset: tangle_groups::StoreOffset, 218 ) -> TangleSessionControl { 219 let runtime = self.runtime.clone(); 220 let auth = self.auth.clone(); 221 let replies = match runtime 222 .fanout_event_offset(offset, &mut self.subscriptions, &auth) 223 .await 224 { 225 Ok(replies) => replies, 226 Err(error) => vec![RelayMessage::Notice(error.prefixed_message()).into()], 227 }; 228 for reply in replies { 229 if let Err(control) = self.enqueue_relay_message(reply) { 230 return control; 231 } 232 } 233 TangleSessionControl::Continue 234 } 235 236 async fn handle_incoming_message(&mut self, message: Message) -> TangleSessionControl { 237 match message { 238 Message::Text(raw) => self.dispatch_text(raw.as_str()).await, 239 Message::Binary(_) => self 240 .enqueue_relay_message( 241 RelayMessage::Notice("invalid: client message must be a text frame".to_owned()) 242 .into(), 243 ) 244 .map(|_| TangleSessionControl::Continue) 245 .unwrap_or_else(|control| control), 246 Message::Ping(_) | Message::Pong(_) => TangleSessionControl::Continue, 247 Message::Close(_) => TangleSessionControl::Stop, 248 } 249 } 250 251 fn issue_auth_challenge(&mut self) -> bool { 252 let message = generate_auth_challenge() 253 .and_then(|challenge| { 254 self.auth 255 .issue_challenge(challenge, current_unix_timestamp()) 256 }) 257 .unwrap_or_else(|error| RelayMessage::Notice(error.prefixed_message())); 258 self.send_relay_message(message.into()).is_ok() 259 } 260 261 async fn dispatch_text(&mut self, raw: &str) -> TangleSessionControl { 262 if raw.len() > self.limits.max_message_length() { 263 return self 264 .enqueue_relay_message( 265 RelayMessage::Notice(format!( 266 "invalid: client message length exceeds runtime max_message_length {}", 267 self.limits.max_message_length() 268 )) 269 .into(), 270 ) 271 .map(|_| TangleSessionControl::Continue) 272 .unwrap_or_else(|control| control); 273 } 274 let replies = match parse_runtime_client_message(raw) { 275 Ok(message) => match self.handle_client_message(message).await { 276 Ok(replies) => replies, 277 Err(error) => vec![RelayMessage::Notice(error.prefixed_message()).into()], 278 }, 279 Err(error) => vec![RelayMessage::Notice(format!("invalid: {error}")).into()], 280 }; 281 for reply in replies { 282 if let Err(control) = self.enqueue_relay_message(reply) { 283 return control; 284 } 285 } 286 TangleSessionControl::Continue 287 } 288 289 async fn handle_client_message( 290 &mut self, 291 message: RuntimeClientMessage, 292 ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { 293 match message { 294 RuntimeClientMessage::Req { 295 subscription_id, 296 filters, 297 search_present, 298 } => { 299 self.handle_req(subscription_id, filters, search_present) 300 .await 301 } 302 RuntimeClientMessage::Count { 303 subscription_id, 304 filters, 305 search_present, 306 } => { 307 let context = self.client_rate_limit_context(); 308 self.runtime 309 .handle_client_message_with_rate_limit_context( 310 RuntimeClientMessage::Count { 311 subscription_id, 312 filters, 313 search_present, 314 }, 315 &mut self.auth, 316 context, 317 current_unix_timestamp(), 318 ) 319 .await 320 } 321 RuntimeClientMessage::Close(subscription_id) => { 322 let metrics = self.runtime.metrics(); 323 metrics.record_client_message(TangleClientMessageMetricKind::Close); 324 self.limits 325 .base_relay_limits() 326 .validate_subscription_id(&subscription_id)?; 327 if self.subscriptions.close(&subscription_id) == CloseResult::Closed { 328 self.subscription_permits.remove(&subscription_id); 329 metrics.record_subscriptions_closed(1); 330 } 331 Ok(Vec::new()) 332 } 333 message => { 334 let context = self.client_rate_limit_context(); 335 self.runtime 336 .handle_client_message_with_rate_limit_context( 337 message, 338 &mut self.auth, 339 context, 340 current_unix_timestamp(), 341 ) 342 .await 343 } 344 } 345 } 346 347 async fn handle_req( 348 &mut self, 349 subscription_id: SubscriptionId, 350 filters: Vec<PocketOwnedFilter>, 351 search_present: bool, 352 ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { 353 let metrics = self.runtime.metrics(); 354 metrics.record_client_message(TangleClientMessageMetricKind::Req); 355 self.limits 356 .base_relay_limits() 357 .validate_subscription_id(&subscription_id)?; 358 self.limits 359 .base_relay_limits() 360 .validate_pocket_filters(&filters)?; 361 if let Some(message) = 362 BaseRelay::unsupported_search_present_closed(&subscription_id, search_present) 363 { 364 return Ok(vec![message.into()]); 365 } 366 if let Some(message) = self 367 .runtime 368 .rate_limit_req_pocket( 369 &subscription_id, 370 &filters, 371 &self.auth, 372 self.client_rate_limit_context(), 373 current_unix_timestamp(), 374 ) 375 .await 376 { 377 return Ok(vec![message.into()]); 378 } 379 let should_subscribe = !pocket_filters_are_complete(&filters); 380 let already_subscribed = self.subscriptions.contains(&subscription_id); 381 if should_subscribe { 382 self.subscriptions 383 .ensure_can_subscribe(&subscription_id, &filters)?; 384 } 385 let report = self 386 .runtime 387 .query_req_with_auth_report( 388 subscription_id.clone(), 389 filters.clone(), 390 search_present, 391 &self.auth, 392 ) 393 .await?; 394 let closes_subscription = report.group_read_denied(); 395 let replies = report.into_messages(); 396 if should_subscribe && !closes_subscription { 397 let host_permit = if already_subscribed { 398 None 399 } else { 400 self.resource_limiter 401 .as_ref() 402 .map(|resources| resources.try_open_subscriptions(1)) 403 .transpose()? 404 }; 405 self.subscriptions 406 .subscribe(subscription_id.clone(), filters)?; 407 if let Some(permit) = host_permit { 408 self.subscription_permits 409 .insert(subscription_id.clone(), permit); 410 } 411 metrics.record_subscription_opened(); 412 logging::log_subscription_opened(self.connection_id, &subscription_id); 413 } 414 Ok(replies) 415 } 416 417 fn close_all_subscriptions(&mut self) -> usize { 418 let closed = self.subscriptions.close_all(); 419 self.subscription_permits.clear(); 420 closed 421 } 422 423 fn client_rate_limit_context(&self) -> TangleClientRateLimitContext { 424 TangleClientRateLimitContext::new(self.peer_ip, Some(self.connection_id)) 425 } 426 427 fn send_relay_message(&self, message: RuntimeRelayMessage) -> Result<(), TangleSessionControl> { 428 let text = message 429 .encode() 430 .map_err(|_| TangleSessionControl::Close(outbound_encode_close_message()))?; 431 self.outbound 432 .try_send(Message::Text(text.into())) 433 .map_err(|error| self.outbound_queue_error_control(error)) 434 } 435 436 fn enqueue_relay_message( 437 &self, 438 message: RuntimeRelayMessage, 439 ) -> Result<(), TangleSessionControl> { 440 self.send_relay_message(message) 441 } 442 443 fn outbound_queue_error_control( 444 &self, 445 error: TangleOutboundQueueError, 446 ) -> TangleSessionControl { 447 match error { 448 TangleOutboundQueueError::Full => { 449 self.runtime.metrics().record_outbound_queue_full_close(); 450 TangleSessionControl::Close(outbound_queue_full_close_message()) 451 } 452 TangleOutboundQueueError::Closed => TangleSessionControl::Stop, 453 } 454 } 455 } 456 457 #[derive(Debug, Clone, PartialEq, Eq)] 458 enum TangleSessionControl { 459 Continue, 460 Close(Message), 461 Stop, 462 } 463 464 fn event_stream_lag_close_message() -> Message { 465 Message::Close(Some(CloseFrame { 466 code: 1008, 467 reason: Utf8Bytes::from_static("event stream lagged; reconnect required"), 468 })) 469 } 470 471 fn outbound_queue_full_close_message() -> Message { 472 Message::Close(Some(CloseFrame { 473 code: 1013, 474 reason: Utf8Bytes::from_static("outbound queue full; reconnect required"), 475 })) 476 } 477 478 fn outbound_encode_close_message() -> Message { 479 Message::Close(Some(CloseFrame { 480 code: 1011, 481 reason: Utf8Bytes::from_static("outbound relay message encode failed"), 482 })) 483 } 484 485 #[derive(Debug, Clone)] 486 pub struct TangleOutboundSender { 487 sender: mpsc::Sender<Message>, 488 capacity: usize, 489 } 490 491 impl TangleOutboundSender { 492 pub fn capacity(&self) -> usize { 493 self.capacity 494 } 495 496 pub fn try_send(&self, message: Message) -> Result<(), TangleOutboundQueueError> { 497 self.sender.try_send(message).map_err(Into::into) 498 } 499 } 500 501 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 502 pub enum TangleOutboundQueueError { 503 Full, 504 Closed, 505 } 506 507 impl From<mpsc::error::TrySendError<Message>> for TangleOutboundQueueError { 508 fn from(error: mpsc::error::TrySendError<Message>) -> Self { 509 match error { 510 mpsc::error::TrySendError::Full(_) => Self::Full, 511 mpsc::error::TrySendError::Closed(_) => Self::Closed, 512 } 513 } 514 } 515 516 fn current_unix_timestamp() -> UnixTimestamp { 517 UnixTimestamp::new( 518 SystemTime::now() 519 .duration_since(UNIX_EPOCH) 520 .map(|duration| duration.as_secs()) 521 .unwrap_or(0), 522 ) 523 } 524 525 fn pocket_filters_are_complete(filters: &[PocketOwnedFilter]) -> bool { 526 !filters.is_empty() && filters.iter().all(|filter| filter.completes()) 527 } 528 529 #[cfg(test)] 530 impl TangleWebSocketSession { 531 async fn handle_protocol_client_message_for_test( 532 &mut self, 533 message: tangle_protocol::ClientMessage, 534 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 535 let messages = self 536 .handle_client_message(protocol_client_message_to_runtime_for_session_test( 537 message, 538 )?) 539 .await?; 540 crate::relay::outbound::protocol_messages_for_test(messages) 541 } 542 } 543 544 #[cfg(test)] 545 fn protocol_client_message_to_runtime_for_session_test( 546 message: tangle_protocol::ClientMessage, 547 ) -> Result<RuntimeClientMessage, BaseRelayError> { 548 match message { 549 tangle_protocol::ClientMessage::Event(event) => Ok(RuntimeClientMessage::Event( 550 crate::pocket_conversion::tangle_event_to_pocket(&event)?, 551 )), 552 tangle_protocol::ClientMessage::Req { 553 subscription_id, 554 filters, 555 } => Ok(RuntimeClientMessage::Req { 556 subscription_id, 557 search_present: filters.iter().any(|filter| filter.search().is_some()), 558 filters: filters 559 .iter() 560 .map(crate::pocket_conversion::tangle_filter_to_pocket) 561 .collect::<Result<Vec<_>, _>>()?, 562 }), 563 tangle_protocol::ClientMessage::Count { 564 subscription_id, 565 filters, 566 } => Ok(RuntimeClientMessage::Count { 567 subscription_id, 568 search_present: filters.iter().any(|filter| filter.search().is_some()), 569 filters: filters 570 .iter() 571 .map(crate::pocket_conversion::tangle_filter_to_pocket) 572 .collect::<Result<Vec<_>, _>>()?, 573 }), 574 tangle_protocol::ClientMessage::Close(subscription_id) => { 575 Ok(RuntimeClientMessage::Close(subscription_id)) 576 } 577 tangle_protocol::ClientMessage::Auth(event) => Ok(RuntimeClientMessage::Auth( 578 crate::pocket_conversion::tangle_event_to_pocket(&event)?, 579 )), 580 tangle_protocol::ClientMessage::NegOpen { 581 subscription_id, 582 filter, 583 message, 584 } => Ok(RuntimeClientMessage::NegOpen { 585 subscription_id, 586 filter: crate::pocket_conversion::tangle_filter_to_pocket(&filter)?, 587 message, 588 }), 589 tangle_protocol::ClientMessage::NegMsg { 590 subscription_id, 591 message, 592 } => Ok(RuntimeClientMessage::NegMsg { 593 subscription_id, 594 message, 595 }), 596 tangle_protocol::ClientMessage::NegClose(subscription_id) => { 597 Ok(RuntimeClientMessage::NegClose(subscription_id)) 598 } 599 } 600 } 601 602 #[cfg(test)] 603 mod tests { 604 use super::{ 605 TangleOutboundQueueError, TangleSessionControl, TangleWebSocketSession, 606 current_unix_timestamp, event_stream_lag_close_message, outbound_queue_full_close_message, 607 }; 608 use crate::{ 609 config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, 610 errors::BaseRelayError, 611 event_bus::TangleEventReceiver, 612 rate_limits::{TangleRateLimitKey, TangleRateLimitScope}, 613 relay::core::{BaseRelayLimitSettings, BaseRelayLimits}, 614 runtime::{RelayRuntime, RelayRuntimeHandle, TangleRuntimeLimits, TangleShutdownSignal}, 615 }; 616 use axum::extract::ws::Message; 617 use serde_json::json; 618 use std::path::{Path, PathBuf}; 619 use tangle_crypto::RelaySigner; 620 use tangle_groups::{KIND_GROUP_CREATE_GROUP, StoreOffset}; 621 use tangle_protocol::{ 622 ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SignatureHex, 623 SubscriptionId, Tag, UnixTimestamp, UnsignedEvent, event_to_value, filter_from_value, 624 }; 625 use tangle_store_pocket::{ 626 PocketEvent, PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketTime, 627 }; 628 use tangle_test_support::FixtureKey; 629 630 #[test] 631 fn websocket_session_records_connection_time() { 632 let before = std::time::Instant::now(); 633 let shutdown = TangleShutdownSignal::new(); 634 let (runtime, auth, events) = session_runtime("records-connection-time"); 635 let session = TangleWebSocketSession::new( 636 session_limits(8), 637 shutdown.subscribe(), 638 runtime, 639 auth, 640 events, 641 ) 642 .expect("session"); 643 644 assert!(session.connected_at() >= before); 645 } 646 647 #[test] 648 fn websocket_session_limit_config_rejects_zero_outbound_capacity() { 649 assert!(session_limits_result(0).is_err()); 650 } 651 652 #[test] 653 fn websocket_session_observes_shutdown_request() { 654 let shutdown = TangleShutdownSignal::new(); 655 let (runtime, auth, events) = session_runtime("observes-shutdown"); 656 let session = TangleWebSocketSession::new( 657 session_limits(8), 658 shutdown.subscribe(), 659 runtime, 660 auth, 661 events, 662 ) 663 .expect("session"); 664 665 assert!(!session.shutdown_requested()); 666 667 shutdown.request_shutdown(); 668 669 assert!(session.shutdown_requested()); 670 } 671 672 #[tokio::test] 673 async fn websocket_session_rejects_overlong_text_before_parsing() { 674 let shutdown = TangleShutdownSignal::new(); 675 let (runtime, auth, events) = session_runtime("overlong-text"); 676 let mut session = TangleWebSocketSession::new( 677 session_limits_with_message_length(8, 8), 678 shutdown.subscribe(), 679 runtime, 680 auth, 681 events, 682 ) 683 .expect("session"); 684 685 assert_eq!( 686 session.dispatch_text("123456789").await, 687 TangleSessionControl::Continue 688 ); 689 let message = session.outbound_receiver.try_recv().expect("notice"); 690 let Message::Text(text) = message else { 691 panic!("expected text notice") 692 }; 693 assert_eq!( 694 text.as_str(), 695 "[\"NOTICE\",\"invalid: client message length exceeds runtime max_message_length 8\"]" 696 ); 697 } 698 699 #[tokio::test] 700 async fn websocket_session_preserves_chorus_malformed_message_parity() { 701 let shutdown = TangleShutdownSignal::new(); 702 let (runtime, auth, events) = session_runtime("chorus-malformed-parity"); 703 let mut session = TangleWebSocketSession::new( 704 session_limits(16), 705 shutdown.subscribe(), 706 runtime, 707 auth, 708 events, 709 ) 710 .expect("session"); 711 let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "parity") 712 .expect("event"); 713 for (raw, expected) in [ 714 ("{", None), 715 ( 716 "[\"NOTICE\",\"client\"]", 717 Some("[\"NOTICE\",\"invalid: client message command `NOTICE` is unsupported\"]"), 718 ), 719 ( 720 "[\"NEG-OPEN\",\"sub\",{}]", 721 Some( 722 "[\"NOTICE\",\"invalid: NEG-OPEN client message must contain a subscription id, filter, and message\"]", 723 ), 724 ), 725 ( 726 "[\"REQ\"]", 727 Some( 728 "[\"NOTICE\",\"invalid: REQ client message must contain a subscription id and filters\"]", 729 ), 730 ), 731 ( 732 "[\"CLOSE\",1]", 733 Some("[\"NOTICE\",\"invalid: CLOSE subscription id must be a string\"]"), 734 ), 735 ] { 736 assert_eq!( 737 session.dispatch_text(raw).await, 738 TangleSessionControl::Continue 739 ); 740 let text = take_outbound_text(&mut session); 741 if let Some(expected) = expected { 742 assert_eq!(text, expected); 743 } else { 744 assert!(text.starts_with("[\"NOTICE\",\"invalid: client message JSON is invalid:")); 745 } 746 } 747 748 assert_eq!( 749 session 750 .dispatch_text("[\"REQ\",\"sub-search\",{\"search\":\"carrots\"}]") 751 .await, 752 TangleSessionControl::Continue 753 ); 754 assert_eq!( 755 take_outbound_text(&mut session), 756 "[\"CLOSED\",\"sub-search\",\"unsupported: search filters are not supported\"]" 757 ); 758 759 assert_eq!( 760 session 761 .dispatch_text(&json!(["EVENT", event_to_value(&event)]).to_string()) 762 .await, 763 TangleSessionControl::Continue 764 ); 765 assert_eq!( 766 take_outbound_text(&mut session), 767 format!("[\"OK\",\"{}\",true,\"\"]", event.id().as_str()) 768 ); 769 } 770 771 #[tokio::test] 772 async fn websocket_session_returns_disabled_negentropy_errors() { 773 let shutdown = TangleShutdownSignal::new(); 774 let (runtime, auth, events) = session_runtime("disabled-negentropy"); 775 let mut session = TangleWebSocketSession::new( 776 session_limits(16), 777 shutdown.subscribe(), 778 runtime, 779 auth, 780 events, 781 ) 782 .expect("session"); 783 784 assert_eq!( 785 session 786 .dispatch_text("[\"NEG-OPEN\",\"neg-sub\",{\"kinds\":[1]},\"00\"]") 787 .await, 788 TangleSessionControl::Continue 789 ); 790 assert_eq!( 791 take_outbound_text(&mut session), 792 "[\"NEG-ERR\",\"neg-sub\",\"blocked: Negentropy sync is disabled\"]" 793 ); 794 assert_eq!( 795 session 796 .dispatch_text("[\"NEG-MSG\",\"neg-sub\",\"\"]") 797 .await, 798 TangleSessionControl::Continue 799 ); 800 assert_eq!( 801 take_outbound_text(&mut session), 802 "[\"NEG-ERR\",\"neg-sub\",\"blocked: Negentropy sync is disabled\"]" 803 ); 804 assert_eq!( 805 session.dispatch_text("[\"NEG-CLOSE\",\"neg-sub\"]").await, 806 TangleSessionControl::Continue 807 ); 808 assert!(session.outbound_receiver.try_recv().is_err()); 809 } 810 811 #[tokio::test] 812 async fn websocket_session_disabled_negentropy_privacy_response_omits_filter_material() { 813 let shutdown = TangleShutdownSignal::new(); 814 let (runtime, auth, events) = session_runtime("disabled-negentropy-privacy"); 815 let mut session = TangleWebSocketSession::new( 816 session_limits(16), 817 shutdown.subscribe(), 818 runtime, 819 auth, 820 events, 821 ) 822 .expect("session"); 823 let hidden_event_id = "a".repeat(64); 824 let private_group_id = "private-group-alpha"; 825 let raw = json!([ 826 "NEG-OPEN", 827 "neg-private", 828 {"ids": [hidden_event_id], "#h": [private_group_id]}, 829 "00" 830 ]) 831 .to_string(); 832 833 assert_eq!( 834 session.dispatch_text(&raw).await, 835 TangleSessionControl::Continue 836 ); 837 let text = take_outbound_text(&mut session); 838 839 assert_eq!( 840 text, 841 "[\"NEG-ERR\",\"neg-private\",\"blocked: Negentropy sync is disabled\"]" 842 ); 843 assert!(!text.contains(private_group_id)); 844 assert!(!text.contains(&hidden_event_id)); 845 assert!(!text.contains("inventory")); 846 assert!(!text.contains("#h")); 847 } 848 849 #[tokio::test] 850 async fn websocket_session_scopes_subscriptions_per_connection() { 851 let shutdown = TangleShutdownSignal::new(); 852 let root = temp_root("connection-scope"); 853 let _ = std::fs::remove_dir_all(&root); 854 let runtime = 855 RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root)).expect("runtime")); 856 let metrics = runtime.metrics(); 857 let auth_a = runtime.auth_state().await.expect("auth a"); 858 let auth_b = runtime.auth_state().await.expect("auth b"); 859 let events_a = runtime.subscribe_events().await; 860 let events_b = runtime.subscribe_events().await; 861 let mut first = TangleWebSocketSession::new( 862 session_limits(8), 863 shutdown.subscribe(), 864 runtime.clone(), 865 auth_a, 866 events_a, 867 ) 868 .expect("first"); 869 let mut second = TangleWebSocketSession::new( 870 session_limits(8), 871 shutdown.subscribe(), 872 runtime.clone(), 873 auth_b, 874 events_b, 875 ) 876 .expect("second"); 877 let subscription_id = SubscriptionId::new("shared").expect("subscription"); 878 879 assert_eq!( 880 first 881 .handle_protocol_client_message_for_test(req(subscription_id.clone())) 882 .await 883 .expect("first req"), 884 vec![RelayMessage::Eose(subscription_id.clone())] 885 ); 886 assert_eq!( 887 second 888 .handle_protocol_client_message_for_test(req(subscription_id.clone())) 889 .await 890 .expect("second req"), 891 vec![RelayMessage::Eose(subscription_id.clone())] 892 ); 893 assert_eq!(first.active_subscription_count(), 1); 894 assert_eq!(second.active_subscription_count(), 1); 895 896 assert_eq!( 897 first 898 .handle_protocol_client_message_for_test(ClientMessage::Close( 899 subscription_id.clone() 900 )) 901 .await 902 .expect("close first"), 903 Vec::<RelayMessage>::new() 904 ); 905 assert_eq!(first.active_subscription_count(), 0); 906 assert_eq!(second.active_subscription_count(), 1); 907 908 assert_eq!( 909 second 910 .handle_protocol_client_message_for_test(req(subscription_id.clone())) 911 .await 912 .expect("replace second"), 913 vec![RelayMessage::Eose(subscription_id.clone())] 914 ); 915 assert_eq!(first.active_subscription_count(), 0); 916 assert_eq!(second.active_subscription_count(), 1); 917 918 assert_eq!( 919 second 920 .handle_protocol_client_message_for_test(ClientMessage::Close(subscription_id)) 921 .await 922 .expect("close second"), 923 Vec::<RelayMessage>::new() 924 ); 925 assert_eq!(second.active_subscription_count(), 0); 926 let snapshot = metrics.snapshot(); 927 assert_eq!(snapshot.client_messages(), 5); 928 assert_eq!(snapshot.req_messages(), 3); 929 assert_eq!(snapshot.close_messages(), 2); 930 assert_eq!(snapshot.opened_subscriptions(), 3); 931 assert_eq!(snapshot.closed_subscriptions(), 2); 932 933 let _ = std::fs::remove_dir_all(root); 934 } 935 936 #[tokio::test] 937 async fn websocket_session_live_fanout_uses_current_auth() { 938 let shutdown = TangleShutdownSignal::new(); 939 let root = temp_root("current-auth-live"); 940 let _ = std::fs::remove_dir_all(&root); 941 let runtime = RelayRuntimeHandle::new( 942 RelayRuntime::open(runtime_config_with_groups(&root)).expect("runtime"), 943 ); 944 let mut owner_auth = runtime.auth_state().await.expect("owner auth"); 945 owner_auth 946 .issue_challenge("owner-live", UnixTimestamp::new(100)) 947 .expect("owner challenge"); 948 let owner_auth_event = 949 tangle_v2_auth_event(FixtureKey::Owner, "owner-live", 120).expect("owner auth event"); 950 assert_eq!( 951 runtime 952 .handle_protocol_client_message_for_test( 953 ClientMessage::Auth(owner_auth_event.clone()), 954 &mut owner_auth, 955 UnixTimestamp::new(120) 956 ) 957 .await 958 .expect("owner auth"), 959 vec![RelayMessage::Ok { 960 event_id: owner_auth_event.id().clone(), 961 accepted: true, 962 message: String::new() 963 }] 964 ); 965 let create = tangle_v2_group_create_event(FixtureKey::Owner, "LiveFarm", 121, &["private"]) 966 .expect("create"); 967 assert_eq!( 968 runtime 969 .handle_protocol_client_message_for_test( 970 ClientMessage::Event(create.clone()), 971 &mut owner_auth, 972 UnixTimestamp::new(121) 973 ) 974 .await 975 .expect("create"), 976 vec![RelayMessage::Ok { 977 event_id: create.id().clone(), 978 accepted: true, 979 message: String::new() 980 }] 981 ); 982 let session_auth = runtime.auth_state().await.expect("session auth"); 983 let events = runtime.subscribe_events().await; 984 let mut session = TangleWebSocketSession::new( 985 session_limits(8), 986 shutdown.subscribe(), 987 runtime.clone(), 988 session_auth, 989 events, 990 ) 991 .expect("session"); 992 let subscription_id = SubscriptionId::new("current-auth-live").expect("subscription"); 993 994 assert_eq!( 995 session 996 .handle_protocol_client_message_for_test(ClientMessage::Req { 997 subscription_id: subscription_id.clone(), 998 filters: vec![ 999 filter_from_value(&json!({"kinds":[1], "#h":["LiveFarm"]})) 1000 .expect("filter") 1001 ], 1002 }) 1003 .await 1004 .expect("req"), 1005 vec![RelayMessage::Eose(subscription_id.clone())] 1006 ); 1007 assert_eq!(session.active_subscription_count(), 1); 1008 let before_auth = 1009 tangle_v2_group_event(FixtureKey::Owner, "LiveFarm", 122, 1, "before auth") 1010 .expect("before auth"); 1011 let before_auth_id = before_auth.id().clone(); 1012 assert_eq!( 1013 runtime 1014 .handle_protocol_client_message_for_test( 1015 ClientMessage::Event(before_auth), 1016 &mut owner_auth, 1017 UnixTimestamp::new(122) 1018 ) 1019 .await 1020 .expect("before event"), 1021 vec![RelayMessage::Ok { 1022 event_id: before_auth_id, 1023 accepted: true, 1024 message: String::new() 1025 }] 1026 ); 1027 let offset = session.events.recv().await; 1028 assert_eq!( 1029 session.handle_event_receive_result(offset).await, 1030 TangleSessionControl::Continue 1031 ); 1032 assert!(session.outbound_receiver.try_recv().is_err()); 1033 1034 let session_now = current_unix_timestamp(); 1035 session 1036 .auth 1037 .issue_challenge("session-live", session_now) 1038 .expect("session challenge"); 1039 let session_auth_event = 1040 tangle_v2_auth_event(FixtureKey::Owner, "session-live", session_now.as_u64()) 1041 .expect("auth event"); 1042 assert_eq!( 1043 session 1044 .handle_protocol_client_message_for_test(ClientMessage::Auth( 1045 session_auth_event.clone() 1046 )) 1047 .await 1048 .expect("session auth"), 1049 vec![RelayMessage::Ok { 1050 event_id: session_auth_event.id().clone(), 1051 accepted: true, 1052 message: String::new() 1053 }] 1054 ); 1055 let after_auth = tangle_v2_group_event(FixtureKey::Owner, "LiveFarm", 132, 1, "after auth") 1056 .expect("after auth"); 1057 assert_eq!( 1058 runtime 1059 .handle_protocol_client_message_for_test( 1060 ClientMessage::Event(after_auth.clone()), 1061 &mut owner_auth, 1062 UnixTimestamp::new(132) 1063 ) 1064 .await 1065 .expect("after event"), 1066 vec![RelayMessage::Ok { 1067 event_id: after_auth.id().clone(), 1068 accepted: true, 1069 message: String::new() 1070 }] 1071 ); 1072 let offset = session.events.recv().await; 1073 assert_eq!( 1074 session.handle_event_receive_result(offset).await, 1075 TangleSessionControl::Continue 1076 ); 1077 assert_relay_message_text( 1078 &take_outbound_text(&mut session), 1079 RelayMessage::Event { 1080 subscription_id, 1081 event: after_auth, 1082 }, 1083 ); 1084 1085 let _ = std::fs::remove_dir_all(root); 1086 } 1087 1088 #[tokio::test] 1089 async fn websocket_session_complete_and_failed_reqs_do_not_subscribe() { 1090 let shutdown = TangleShutdownSignal::new(); 1091 let root = temp_root("complete-req-lifecycle"); 1092 let _ = std::fs::remove_dir_all(&root); 1093 let runtime = 1094 RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root)).expect("runtime")); 1095 let mut auth = runtime.auth_state().await.expect("auth"); 1096 let events = runtime.subscribe_events().await; 1097 let mut session = TangleWebSocketSession::new( 1098 session_limits(8), 1099 shutdown.subscribe(), 1100 runtime.clone(), 1101 runtime.auth_state().await.expect("session auth"), 1102 events, 1103 ) 1104 .expect("session"); 1105 let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "complete") 1106 .expect("event"); 1107 1108 assert_eq!( 1109 runtime 1110 .handle_protocol_client_message_for_test( 1111 ClientMessage::Event(event.clone()), 1112 &mut auth, 1113 UnixTimestamp::new(1_714_124_433) 1114 ) 1115 .await 1116 .expect("event"), 1117 vec![RelayMessage::Ok { 1118 event_id: event.id().clone(), 1119 accepted: true, 1120 message: String::new() 1121 }] 1122 ); 1123 let exact_id = SubscriptionId::new("exact-id").expect("subscription"); 1124 assert_eq!( 1125 session 1126 .handle_protocol_client_message_for_test(ClientMessage::Req { 1127 subscription_id: exact_id.clone(), 1128 filters: vec![ 1129 filter_from_value(&json!({"ids":[event.id().as_str()]})) 1130 .expect("exact filter") 1131 ], 1132 }) 1133 .await 1134 .expect("exact req"), 1135 vec![ 1136 RelayMessage::Event { 1137 subscription_id: exact_id.clone(), 1138 event: event.clone() 1139 }, 1140 RelayMessage::Eose(exact_id) 1141 ] 1142 ); 1143 assert_eq!(session.active_subscription_count(), 0); 1144 1145 let open = SubscriptionId::new("open").expect("subscription"); 1146 assert_eq!( 1147 session 1148 .handle_protocol_client_message_for_test(ClientMessage::Req { 1149 subscription_id: open.clone(), 1150 filters: vec![filter_from_value(&json!({"kinds":[1]})).expect("open filter")], 1151 }) 1152 .await 1153 .expect("open req"), 1154 vec![ 1155 RelayMessage::Event { 1156 subscription_id: open.clone(), 1157 event 1158 }, 1159 RelayMessage::Eose(open.clone()) 1160 ] 1161 ); 1162 assert_eq!(session.active_subscription_count(), 1); 1163 1164 let search = SubscriptionId::new("search").expect("subscription"); 1165 assert_eq!( 1166 session 1167 .handle_protocol_client_message_for_test(ClientMessage::Req { 1168 subscription_id: search.clone(), 1169 filters: vec![ 1170 filter_from_value(&json!({"search":"carrots"})).expect("search filter") 1171 ], 1172 }) 1173 .await 1174 .expect("search req"), 1175 vec![RelayMessage::Closed { 1176 subscription_id: search, 1177 message: "unsupported: search filters are not supported".to_owned() 1178 }] 1179 ); 1180 assert_eq!(session.active_subscription_count(), 1); 1181 1182 let invalid = SubscriptionId::new("invalid").expect("subscription"); 1183 let invalid_result = session 1184 .handle_protocol_client_message_for_test(ClientMessage::Req { 1185 subscription_id: invalid, 1186 filters: vec![Filter::empty(); 11], 1187 }) 1188 .await; 1189 assert!(invalid_result.is_err()); 1190 assert_eq!(session.active_subscription_count(), 1); 1191 1192 let _ = std::fs::remove_dir_all(root); 1193 } 1194 1195 #[tokio::test] 1196 async fn websocket_session_redacted_initial_req_closes_without_live_subscription() { 1197 let shutdown = TangleShutdownSignal::new(); 1198 let root = temp_root("redacted-req-close"); 1199 let _ = std::fs::remove_dir_all(&root); 1200 let runtime = RelayRuntimeHandle::new( 1201 RelayRuntime::open(runtime_config_with_groups(&root)).expect("runtime"), 1202 ); 1203 let mut owner_auth = runtime.auth_state().await.expect("owner auth"); 1204 owner_auth 1205 .issue_challenge("owner-redacted", UnixTimestamp::new(100)) 1206 .expect("owner challenge"); 1207 let owner_auth_event = tangle_v2_auth_event(FixtureKey::Owner, "owner-redacted", 120) 1208 .expect("owner auth event"); 1209 assert_eq!( 1210 runtime 1211 .handle_protocol_client_message_for_test( 1212 ClientMessage::Auth(owner_auth_event.clone()), 1213 &mut owner_auth, 1214 UnixTimestamp::new(120) 1215 ) 1216 .await 1217 .expect("owner auth"), 1218 vec![RelayMessage::Ok { 1219 event_id: owner_auth_event.id().clone(), 1220 accepted: true, 1221 message: String::new() 1222 }] 1223 ); 1224 let create = 1225 tangle_v2_group_create_event(FixtureKey::Owner, "RedactedFarm", 121, &["private"]) 1226 .expect("create"); 1227 assert_eq!( 1228 runtime 1229 .handle_protocol_client_message_for_test( 1230 ClientMessage::Event(create.clone()), 1231 &mut owner_auth, 1232 UnixTimestamp::new(121) 1233 ) 1234 .await 1235 .expect("create"), 1236 vec![RelayMessage::Ok { 1237 event_id: create.id().clone(), 1238 accepted: true, 1239 message: String::new() 1240 }] 1241 ); 1242 let public_event = 1243 tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "public") 1244 .expect("public"); 1245 assert_eq!( 1246 runtime 1247 .handle_protocol_client_message_for_test( 1248 ClientMessage::Event(public_event.clone()), 1249 &mut owner_auth, 1250 UnixTimestamp::new(122) 1251 ) 1252 .await 1253 .expect("public event"), 1254 vec![RelayMessage::Ok { 1255 event_id: public_event.id().clone(), 1256 accepted: true, 1257 message: String::new() 1258 }] 1259 ); 1260 let private_event = 1261 tangle_v2_group_event(FixtureKey::Owner, "RedactedFarm", 123, 1, "private") 1262 .expect("private"); 1263 assert_eq!( 1264 runtime 1265 .handle_protocol_client_message_for_test( 1266 ClientMessage::Event(private_event.clone()), 1267 &mut owner_auth, 1268 UnixTimestamp::new(123) 1269 ) 1270 .await 1271 .expect("private event"), 1272 vec![RelayMessage::Ok { 1273 event_id: private_event.id().clone(), 1274 accepted: true, 1275 message: String::new() 1276 }] 1277 ); 1278 1279 let events = runtime.subscribe_events().await; 1280 let mut session = TangleWebSocketSession::new( 1281 session_limits(8), 1282 shutdown.subscribe(), 1283 runtime.clone(), 1284 runtime.auth_state().await.expect("session auth"), 1285 events, 1286 ) 1287 .expect("session"); 1288 let subscription_id = SubscriptionId::new("redacted-req").expect("subscription"); 1289 assert_eq!( 1290 session 1291 .handle_protocol_client_message_for_test(ClientMessage::Req { 1292 subscription_id: subscription_id.clone(), 1293 filters: vec![filter_from_value(&json!({"kinds":[1]})).expect("filter")], 1294 }) 1295 .await 1296 .expect("redacted req"), 1297 vec![ 1298 RelayMessage::Event { 1299 subscription_id: subscription_id.clone(), 1300 event: public_event 1301 }, 1302 RelayMessage::Closed { 1303 subscription_id, 1304 message: "auth-required: authentication required to read group events" 1305 .to_owned() 1306 } 1307 ] 1308 ); 1309 assert_eq!(session.active_subscription_count(), 0); 1310 1311 let _ = std::fs::remove_dir_all(root); 1312 } 1313 1314 #[tokio::test] 1315 async fn websocket_session_preserves_chorus_close_scope_parity() { 1316 let shutdown = TangleShutdownSignal::new(); 1317 let root = temp_root("chorus-close-scope-parity"); 1318 let _ = std::fs::remove_dir_all(&root); 1319 let runtime = 1320 RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root)).expect("runtime")); 1321 let metrics = runtime.metrics(); 1322 let auth_a = runtime.auth_state().await.expect("auth a"); 1323 let auth_b = runtime.auth_state().await.expect("auth b"); 1324 let events_a = runtime.subscribe_events().await; 1325 let events_b = runtime.subscribe_events().await; 1326 let mut first = TangleWebSocketSession::new( 1327 session_limits(8), 1328 shutdown.subscribe(), 1329 runtime.clone(), 1330 auth_a, 1331 events_a, 1332 ) 1333 .expect("first"); 1334 let mut second = TangleWebSocketSession::new( 1335 session_limits(8), 1336 shutdown.subscribe(), 1337 runtime, 1338 auth_b, 1339 events_b, 1340 ) 1341 .expect("second"); 1342 let subscription_id = SubscriptionId::new("shared-close").expect("subscription"); 1343 let req_text = json!(["REQ", subscription_id.as_str(), {"kinds":[1]}]).to_string(); 1344 1345 assert_eq!( 1346 first.dispatch_text(&req_text).await, 1347 TangleSessionControl::Continue 1348 ); 1349 assert_eq!( 1350 take_outbound_text(&mut first), 1351 RelayMessage::Eose(subscription_id.clone()).encode() 1352 ); 1353 assert_eq!( 1354 second.dispatch_text(&req_text).await, 1355 TangleSessionControl::Continue 1356 ); 1357 assert_eq!( 1358 take_outbound_text(&mut second), 1359 RelayMessage::Eose(subscription_id.clone()).encode() 1360 ); 1361 assert_eq!(first.active_subscription_count(), 1); 1362 assert_eq!(second.active_subscription_count(), 1); 1363 1364 let close_text = json!(["CLOSE", subscription_id.as_str()]).to_string(); 1365 assert_eq!( 1366 first.dispatch_text(&close_text).await, 1367 TangleSessionControl::Continue 1368 ); 1369 assert!(first.outbound_receiver.try_recv().is_err()); 1370 assert_eq!( 1371 first.dispatch_text(&close_text).await, 1372 TangleSessionControl::Continue 1373 ); 1374 assert!(first.outbound_receiver.try_recv().is_err()); 1375 assert_eq!(first.active_subscription_count(), 0); 1376 assert_eq!(second.active_subscription_count(), 1); 1377 1378 let event = tangle_v2_event( 1379 FixtureKey::Member, 1380 1_714_124_433, 1381 1, 1382 Vec::new(), 1383 "close scope parity", 1384 ) 1385 .expect("event"); 1386 assert_eq!( 1387 first 1388 .dispatch_text(&json!(["EVENT", event_to_value(&event)]).to_string()) 1389 .await, 1390 TangleSessionControl::Continue 1391 ); 1392 assert_eq!( 1393 take_outbound_text(&mut first), 1394 RelayMessage::Ok { 1395 event_id: event.id().clone(), 1396 accepted: true, 1397 message: String::new() 1398 } 1399 .encode() 1400 ); 1401 1402 let first_offset = first.events.recv().await; 1403 let second_offset = second.events.recv().await; 1404 assert_eq!( 1405 first.handle_event_receive_result(first_offset).await, 1406 TangleSessionControl::Continue 1407 ); 1408 assert!(first.outbound_receiver.try_recv().is_err()); 1409 assert_eq!( 1410 second.handle_event_receive_result(second_offset).await, 1411 TangleSessionControl::Continue 1412 ); 1413 assert_relay_message_text( 1414 &take_outbound_text(&mut second), 1415 RelayMessage::Event { 1416 subscription_id: subscription_id.clone(), 1417 event, 1418 }, 1419 ); 1420 let snapshot = metrics.snapshot(); 1421 assert_eq!(snapshot.client_messages(), 5); 1422 assert_eq!(snapshot.event_messages(), 1); 1423 assert_eq!(snapshot.req_messages(), 2); 1424 assert_eq!(snapshot.close_messages(), 2); 1425 assert_eq!(snapshot.opened_subscriptions(), 2); 1426 assert_eq!(snapshot.closed_subscriptions(), 1); 1427 1428 let _ = std::fs::remove_dir_all(root); 1429 } 1430 1431 #[tokio::test] 1432 async fn websocket_session_rate_limited_req_does_not_subscribe() { 1433 let shutdown = TangleShutdownSignal::new(); 1434 let root = temp_root("rate-limited-req"); 1435 let _ = std::fs::remove_dir_all(&root); 1436 let runtime = RelayRuntime::open(runtime_config(&root)).expect("runtime"); 1437 let rule = runtime.config().rate_limits().req().per_connection(); 1438 let runtime = RelayRuntimeHandle::new(runtime); 1439 let auth = runtime.auth_state().await.expect("auth"); 1440 let events = runtime.subscribe_events().await; 1441 let now = current_unix_timestamp(); 1442 let mut session = TangleWebSocketSession::new( 1443 session_limits(8), 1444 shutdown.subscribe(), 1445 runtime.clone(), 1446 auth, 1447 events, 1448 ) 1449 .expect("session"); 1450 let key = TangleRateLimitKey::connection(TangleRateLimitScope::Req, session.connection_id); 1451 let limiter = runtime.rate_limiter().await; 1452 for _ in 0..rule.max_hits() { 1453 limiter.record(key.clone(), rule, now); 1454 } 1455 let subscription_id = SubscriptionId::new("limited").expect("subscription"); 1456 1457 assert_eq!( 1458 session 1459 .handle_protocol_client_message_for_test(ClientMessage::Req { 1460 subscription_id: subscription_id.clone(), 1461 filters: vec![ 1462 filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter") 1463 ] 1464 }) 1465 .await 1466 .expect("req"), 1467 vec![RelayMessage::Closed { 1468 subscription_id, 1469 message: format!( 1470 "rate-limited: req connection rate limit exceeded until {}", 1471 now.as_u64() + 60 1472 ) 1473 }] 1474 ); 1475 assert_eq!(session.active_subscription_count(), 0); 1476 let snapshot = runtime.metrics().snapshot(); 1477 assert_eq!(snapshot.client_messages(), 1); 1478 assert_eq!(snapshot.req_messages(), 1); 1479 assert_eq!(snapshot.opened_subscriptions(), 0); 1480 assert_eq!(snapshot.rate_limit_rejections(), 1); 1481 1482 let _ = std::fs::remove_dir_all(root); 1483 } 1484 1485 #[tokio::test] 1486 async fn websocket_session_closes_when_event_receiver_lags() { 1487 let shutdown = TangleShutdownSignal::new(); 1488 let root = temp_root("event-receiver-lag"); 1489 let _ = std::fs::remove_dir_all(&root); 1490 let runtime = 1491 RelayRuntime::open(runtime_config_with_outbound_queue(&root, 1)).expect("runtime"); 1492 let auth = runtime.auth_state().expect("auth"); 1493 let events = runtime.event_bus().subscribe(); 1494 assert_eq!(runtime.event_bus().publish(StoreOffset::new(1)), 1); 1495 assert_eq!(runtime.event_bus().publish(StoreOffset::new(2)), 1); 1496 let runtime = RelayRuntimeHandle::new(runtime); 1497 let metrics = runtime.metrics(); 1498 let mut session = TangleWebSocketSession::new( 1499 session_limits(1), 1500 shutdown.subscribe(), 1501 runtime, 1502 auth, 1503 events, 1504 ) 1505 .expect("session"); 1506 let event = session.events.recv().await; 1507 1508 assert_eq!( 1509 session.handle_event_receive_result(event).await, 1510 TangleSessionControl::Close(event_stream_lag_close_message()) 1511 ); 1512 assert_eq!(metrics.event_bus_lagged_receivers(), 1); 1513 assert_eq!(metrics.event_bus_lagged_offsets(), 1); 1514 1515 let _ = std::fs::remove_dir_all(root); 1516 } 1517 1518 #[tokio::test] 1519 async fn websocket_session_preserves_chorus_live_fanout_backpressure_parity() { 1520 let shutdown = TangleShutdownSignal::new(); 1521 let live_root = temp_root("chorus-live-fanout-parity"); 1522 let _ = std::fs::remove_dir_all(&live_root); 1523 let runtime = RelayRuntimeHandle::new( 1524 RelayRuntime::open(runtime_config_with_outbound_queue(&live_root, 1)).expect("runtime"), 1525 ); 1526 let metrics = runtime.metrics(); 1527 let auth = runtime.auth_state().await.expect("auth"); 1528 let events = runtime.subscribe_events().await; 1529 let mut session = TangleWebSocketSession::new( 1530 session_limits(1), 1531 shutdown.subscribe(), 1532 runtime, 1533 auth, 1534 events, 1535 ) 1536 .expect("session"); 1537 let subscription_id = SubscriptionId::new("chorus-live").expect("subscription"); 1538 let req_text = json!(["REQ", subscription_id.as_str(), {"kinds":[1]}]).to_string(); 1539 1540 assert_eq!( 1541 session.dispatch_text(&req_text).await, 1542 TangleSessionControl::Continue 1543 ); 1544 assert_eq!( 1545 take_outbound_text(&mut session), 1546 RelayMessage::Eose(subscription_id.clone()).encode() 1547 ); 1548 for index in 0..3 { 1549 let content = format!("chorus live {index}"); 1550 let event = tangle_v2_event( 1551 FixtureKey::Member, 1552 1_714_124_433 + index, 1553 1, 1554 Vec::new(), 1555 &content, 1556 ) 1557 .expect("event"); 1558 assert_eq!( 1559 session 1560 .dispatch_text(&json!(["EVENT", event_to_value(&event)]).to_string()) 1561 .await, 1562 TangleSessionControl::Continue 1563 ); 1564 assert_eq!( 1565 take_outbound_text(&mut session), 1566 RelayMessage::Ok { 1567 event_id: event.id().clone(), 1568 accepted: true, 1569 message: String::new() 1570 } 1571 .encode() 1572 ); 1573 let offset = session.events.recv().await; 1574 assert_eq!( 1575 session.handle_event_receive_result(offset).await, 1576 TangleSessionControl::Continue 1577 ); 1578 assert_relay_message_text( 1579 &take_outbound_text(&mut session), 1580 RelayMessage::Event { 1581 subscription_id: subscription_id.clone(), 1582 event, 1583 }, 1584 ); 1585 assert_eq!(session.active_subscription_count(), 1); 1586 } 1587 assert_eq!(metrics.outbound_queue_full_closes(), 0); 1588 assert_eq!(metrics.event_bus_lagged_receivers(), 0); 1589 assert_eq!(metrics.event_bus_lagged_offsets(), 0); 1590 let _ = std::fs::remove_dir_all(live_root); 1591 1592 let lag_root = temp_root("chorus-live-lag-parity"); 1593 let _ = std::fs::remove_dir_all(&lag_root); 1594 let runtime = 1595 RelayRuntime::open(runtime_config_with_outbound_queue(&lag_root, 1)).expect("runtime"); 1596 let auth = runtime.auth_state().expect("auth"); 1597 let events = runtime.event_bus().subscribe(); 1598 assert_eq!(runtime.event_bus().publish(StoreOffset::new(1)), 1); 1599 assert_eq!(runtime.event_bus().publish(StoreOffset::new(2)), 1); 1600 let runtime = RelayRuntimeHandle::new(runtime); 1601 let metrics = runtime.metrics(); 1602 let mut lagged = TangleWebSocketSession::new( 1603 session_limits(1), 1604 shutdown.subscribe(), 1605 runtime, 1606 auth, 1607 events, 1608 ) 1609 .expect("lagged"); 1610 let event = lagged.events.recv().await; 1611 assert_eq!( 1612 lagged.handle_event_receive_result(event).await, 1613 TangleSessionControl::Close(event_stream_lag_close_message()) 1614 ); 1615 assert_eq!(metrics.event_bus_lagged_receivers(), 1); 1616 assert_eq!(metrics.event_bus_lagged_offsets(), 1); 1617 let _ = std::fs::remove_dir_all(lag_root); 1618 1619 let (runtime, auth, events) = session_runtime("chorus-outbound-full-parity"); 1620 let metrics = runtime.metrics(); 1621 let mut blocked = TangleWebSocketSession::new( 1622 session_limits(1), 1623 shutdown.subscribe(), 1624 runtime, 1625 auth, 1626 events, 1627 ) 1628 .expect("blocked"); 1629 blocked 1630 .outbound() 1631 .try_send(Message::Text("blocked".into())) 1632 .expect("fill queue"); 1633 assert_eq!( 1634 blocked.dispatch_text("{").await, 1635 TangleSessionControl::Close(outbound_queue_full_close_message()) 1636 ); 1637 assert_eq!(metrics.outbound_queue_full_closes(), 1); 1638 } 1639 1640 #[test] 1641 fn outbound_queue_is_bounded() { 1642 let shutdown = TangleShutdownSignal::new(); 1643 let (runtime, auth, events) = session_runtime("outbound-queue"); 1644 let session = TangleWebSocketSession::new( 1645 session_limits(1), 1646 shutdown.subscribe(), 1647 runtime, 1648 auth, 1649 events, 1650 ) 1651 .expect("session"); 1652 let outbound = session.outbound(); 1653 1654 assert_eq!(outbound.capacity(), 1); 1655 outbound 1656 .try_send(Message::Text("first".into())) 1657 .expect("first"); 1658 assert_eq!( 1659 outbound 1660 .try_send(Message::Text("second".into())) 1661 .expect_err("full"), 1662 TangleOutboundQueueError::Full 1663 ); 1664 } 1665 1666 #[tokio::test] 1667 async fn websocket_session_closes_when_outbound_queue_is_full() { 1668 let shutdown = TangleShutdownSignal::new(); 1669 let (runtime, auth, events) = session_runtime("outbound-queue-full-close"); 1670 let metrics = runtime.metrics(); 1671 let mut session = TangleWebSocketSession::new( 1672 session_limits(1), 1673 shutdown.subscribe(), 1674 runtime, 1675 auth, 1676 events, 1677 ) 1678 .expect("session"); 1679 session 1680 .outbound() 1681 .try_send(Message::Text("blocked".into())) 1682 .expect("fill queue"); 1683 1684 assert_eq!( 1685 session.dispatch_text("{").await, 1686 TangleSessionControl::Close(outbound_queue_full_close_message()) 1687 ); 1688 assert_eq!(metrics.outbound_queue_full_closes(), 1); 1689 } 1690 1691 fn tangle_v2_event( 1692 key: FixtureKey, 1693 created_at: u64, 1694 kind: u64, 1695 tags: Vec<Tag>, 1696 content: &str, 1697 ) -> Result<Event, String> { 1698 let event = session_pocket_event(key, created_at, kind, tags, content); 1699 session_pocket_event_to_protocol(&event) 1700 } 1701 1702 fn tangle_v2_auth_event( 1703 key: FixtureKey, 1704 challenge: &str, 1705 created_at: u64, 1706 ) -> Result<Event, String> { 1707 tangle_v2_event( 1708 key, 1709 created_at, 1710 22_242, 1711 vec![ 1712 Tag::from_parts("relay", &["wss://relay.radroots.test"])?, 1713 Tag::from_parts("challenge", &[challenge])?, 1714 ], 1715 "", 1716 ) 1717 } 1718 1719 fn tangle_v2_group_create_event( 1720 key: FixtureKey, 1721 group_id: &str, 1722 created_at: u64, 1723 flags: &[&str], 1724 ) -> Result<Event, String> { 1725 let mut tags = vec![ 1726 Tag::from_parts("h", &[group_id])?, 1727 Tag::from_parts("name", &[group_id])?, 1728 ]; 1729 for flag in flags { 1730 tags.push(Tag::from_parts(flag, &[])?); 1731 } 1732 tangle_v2_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "") 1733 } 1734 1735 fn tangle_v2_group_event( 1736 key: FixtureKey, 1737 group_id: &str, 1738 created_at: u64, 1739 kind: u64, 1740 content: &str, 1741 ) -> Result<Event, String> { 1742 tangle_v2_event( 1743 key, 1744 created_at, 1745 kind, 1746 vec![Tag::from_parts("h", &[group_id])?], 1747 content, 1748 ) 1749 } 1750 1751 fn session_pocket_event( 1752 key: FixtureKey, 1753 created_at: u64, 1754 kind: u64, 1755 tags: Vec<Tag>, 1756 content: &str, 1757 ) -> PocketOwnedEvent { 1758 let tags = session_pocket_tags_from_protocol(&tags); 1759 let secret = format!("{:02x}", fixture_secret_byte(key)).repeat(32); 1760 RelaySigner::from_secret_hex(&secret) 1761 .expect("signer") 1762 .sign_pocket_event( 1763 PocketKind::from_u16(u16::try_from(kind).expect("pocket kind")), 1764 &tags, 1765 PocketTime::from_u64(created_at), 1766 content.as_bytes(), 1767 ) 1768 .expect("pocket event") 1769 } 1770 1771 fn session_pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags { 1772 let parts = tags 1773 .iter() 1774 .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>()) 1775 .collect::<Vec<_>>(); 1776 PocketOwnedTags::new(&parts).expect("pocket tags") 1777 } 1778 1779 fn session_pocket_event_to_protocol(event: &PocketEvent) -> Result<Event, String> { 1780 let tags = event 1781 .tags() 1782 .map_err(|error| error.to_string())? 1783 .iter() 1784 .map(|tag| { 1785 Tag::new( 1786 tag.map(|value| { 1787 std::str::from_utf8(value) 1788 .map(str::to_owned) 1789 .map_err(|error| error.to_string()) 1790 }) 1791 .collect::<Result<Vec<_>, _>>()?, 1792 ) 1793 .map_err(|error| error.to_string()) 1794 }) 1795 .collect::<Result<Vec<_>, _>>()?; 1796 Ok(Event::new( 1797 EventId::new(&event.id().as_hex_string()).map_err(|error| error.to_string())?, 1798 UnsignedEvent::new( 1799 PublicKeyHex::new(&event.pubkey().as_hex_string()) 1800 .map_err(|error| error.to_string())?, 1801 UnixTimestamp::new(event.created_at().as_u64()), 1802 Kind::new(u64::from(event.kind().as_u16())).map_err(|error| error.to_string())?, 1803 tags, 1804 std::str::from_utf8(event.content()).map_err(|error| error.to_string())?, 1805 ), 1806 SignatureHex::new(&event.sig().to_string()).map_err(|error| error.to_string())?, 1807 )) 1808 } 1809 1810 fn fixture_secret_byte(key: FixtureKey) -> u8 { 1811 match key { 1812 FixtureKey::Relay => 9, 1813 FixtureKey::Owner => 10, 1814 FixtureKey::Admin => 11, 1815 FixtureKey::Member => 12, 1816 FixtureKey::Outsider => 13, 1817 } 1818 } 1819 1820 fn session_runtime( 1821 name: &str, 1822 ) -> ( 1823 RelayRuntimeHandle, 1824 crate::relay::auth::BaseAuthState, 1825 TangleEventReceiver, 1826 ) { 1827 let root = temp_root(name); 1828 let _ = std::fs::remove_dir_all(&root); 1829 let runtime = RelayRuntime::open(runtime_config(&root)).expect("runtime"); 1830 let auth = runtime.auth_state().expect("auth"); 1831 let events = runtime.event_bus().subscribe(); 1832 (RelayRuntimeHandle::new(runtime), auth, events) 1833 } 1834 1835 fn req(subscription_id: SubscriptionId) -> ClientMessage { 1836 ClientMessage::Req { 1837 subscription_id, 1838 filters: vec![Filter::empty()], 1839 } 1840 } 1841 1842 fn take_outbound_text(session: &mut TangleWebSocketSession) -> String { 1843 let message = session.outbound_receiver.try_recv().expect("message"); 1844 let Message::Text(text) = message else { 1845 panic!("expected text message") 1846 }; 1847 text.to_string() 1848 } 1849 1850 fn assert_relay_message_text(actual: &str, expected: RelayMessage) { 1851 assert_eq!( 1852 serde_json::from_str::<serde_json::Value>(actual).expect("actual relay JSON"), 1853 serde_json::from_str::<serde_json::Value>(&expected.encode()) 1854 .expect("expected relay JSON") 1855 ); 1856 } 1857 1858 fn runtime_config(root: &Path) -> BaseRelayRuntimeConfig { 1859 runtime_config_with_outbound_queue(root, 8) 1860 } 1861 1862 fn runtime_config_with_groups(root: &Path) -> BaseRelayRuntimeConfig { 1863 let raw = json!({ 1864 "server": { 1865 "listen_addr": "127.0.0.1:0", 1866 "relay_url": "wss://relay.radroots.test" 1867 }, 1868 "pocket": { 1869 "data_directory": root.join("pocket"), 1870 "sync_policy": "flush_on_shutdown", 1871 "query": { 1872 "allow_scraping": false, 1873 "allow_scrape_if_limited_to": 100, 1874 "allow_scrape_if_max_seconds": 3600 1875 } 1876 }, 1877 "groups": { 1878 "enabled": true, 1879 "canonical_relay_url": "wss://relay.radroots.test", 1880 "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777", 1881 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()], 1882 "policy": { 1883 "public_join": false, 1884 "invites_enabled": false 1885 } 1886 }, 1887 "auth": { 1888 "challenge_ttl_seconds": 300, 1889 "created_at_skew_seconds": 600 1890 }, 1891 "limits": { 1892 "max_message_length": 1048576, 1893 "max_subid_length": 64, 1894 "max_subscriptions_per_connection": 64, 1895 "max_filters_per_request": 10, 1896 "max_tag_values_per_filter": 100, 1897 "max_query_complexity": 2048, 1898 "max_limit": 500, 1899 "default_limit": 100, 1900 "max_event_tags": 200, 1901 "max_content_length": 65536, 1902 "broadcast_channel_capacity": 8, 1903 "per_connection_outbound_queue": 8 1904 }, 1905 "rate_limits": { 1906 "auth": { 1907 "per_ip": {"window_seconds": 60, "max_hits": 120}, 1908 "per_pubkey": {"window_seconds": 60, "max_hits": 30}, 1909 "failures": {"window_seconds": 300, "max_hits": 5}, 1910 "failures_per_ip": {"window_seconds": 300, "max_hits": 20} 1911 }, 1912 "event": { 1913 "per_ip": {"window_seconds": 60, "max_hits": 600}, 1914 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 1915 "per_kind": {"window_seconds": 60, "max_hits": 1000} 1916 }, 1917 "group": { 1918 "write_per_ip": {"window_seconds": 60, "max_hits": 300}, 1919 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, 1920 "write_per_group": {"window_seconds": 60, "max_hits": 90}, 1921 "write_per_kind": {"window_seconds": 60, "max_hits": 300}, 1922 "join_flow": {"window_seconds": 300, "max_hits": 10}, 1923 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} 1924 }, 1925 "req": { 1926 "per_ip": {"window_seconds": 60, "max_hits": 600}, 1927 "per_connection": {"window_seconds": 60, "max_hits": 120}, 1928 "per_pubkey": {"window_seconds": 60, "max_hits": 240}, 1929 "per_group": {"window_seconds": 60, "max_hits": 240}, 1930 "per_kind": {"window_seconds": 60, "max_hits": 500}, 1931 "broad": {"window_seconds": 60, "max_hits": 30} 1932 }, 1933 "count": { 1934 "per_ip": {"window_seconds": 60, "max_hits": 300}, 1935 "per_connection": {"window_seconds": 60, "max_hits": 60}, 1936 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 1937 "per_group": {"window_seconds": 60, "max_hits": 120}, 1938 "per_kind": {"window_seconds": 60, "max_hits": 240}, 1939 "broad": {"window_seconds": 60, "max_hits": 20} 1940 } 1941 } 1942 }) 1943 .to_string(); 1944 parse_base_relay_runtime_config_json(&raw).expect("config") 1945 } 1946 1947 fn runtime_config_with_outbound_queue( 1948 root: &Path, 1949 per_connection_outbound_queue: usize, 1950 ) -> BaseRelayRuntimeConfig { 1951 let raw = json!({ 1952 "server": { 1953 "listen_addr": "127.0.0.1:0", 1954 "relay_url": "wss://relay.radroots.test" 1955 }, 1956 "pocket": { 1957 "data_directory": root.join("pocket"), 1958 "sync_policy": "flush_on_shutdown", 1959 "query": { 1960 "allow_scraping": false, 1961 "allow_scrape_if_limited_to": 100, 1962 "allow_scrape_if_max_seconds": 3600 1963 } 1964 }, 1965 "groups": { 1966 "enabled": false 1967 }, 1968 "auth": { 1969 "challenge_ttl_seconds": 300, 1970 "created_at_skew_seconds": 600 1971 }, 1972 "limits": { 1973 "max_message_length": 1048576, 1974 "max_subid_length": 64, 1975 "max_subscriptions_per_connection": 64, 1976 "max_filters_per_request": 10, 1977 "max_tag_values_per_filter": 100, 1978 "max_query_complexity": 2048, 1979 "max_limit": 500, 1980 "default_limit": 100, 1981 "max_event_tags": 200, 1982 "max_content_length": 65536, 1983 "broadcast_channel_capacity": per_connection_outbound_queue, 1984 "per_connection_outbound_queue": per_connection_outbound_queue 1985 }, 1986 "rate_limits": { 1987 "auth": { 1988 "per_ip": {"window_seconds": 60, "max_hits": 120}, 1989 "per_pubkey": {"window_seconds": 60, "max_hits": 30}, 1990 "failures": {"window_seconds": 300, "max_hits": 5}, 1991 "failures_per_ip": {"window_seconds": 300, "max_hits": 20} 1992 }, 1993 "event": { 1994 "per_ip": {"window_seconds": 60, "max_hits": 600}, 1995 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 1996 "per_kind": {"window_seconds": 60, "max_hits": 1000} 1997 }, 1998 "group": { 1999 "write_per_ip": {"window_seconds": 60, "max_hits": 300}, 2000 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, 2001 "write_per_group": {"window_seconds": 60, "max_hits": 90}, 2002 "write_per_kind": {"window_seconds": 60, "max_hits": 300}, 2003 "join_flow": {"window_seconds": 300, "max_hits": 10}, 2004 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} 2005 }, 2006 "req": { 2007 "per_ip": {"window_seconds": 60, "max_hits": 600}, 2008 "per_connection": {"window_seconds": 60, "max_hits": 120}, 2009 "per_pubkey": {"window_seconds": 60, "max_hits": 240}, 2010 "per_group": {"window_seconds": 60, "max_hits": 240}, 2011 "per_kind": {"window_seconds": 60, "max_hits": 500}, 2012 "broad": {"window_seconds": 60, "max_hits": 30} 2013 }, 2014 "count": { 2015 "per_ip": {"window_seconds": 60, "max_hits": 300}, 2016 "per_connection": {"window_seconds": 60, "max_hits": 60}, 2017 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 2018 "per_group": {"window_seconds": 60, "max_hits": 120}, 2019 "per_kind": {"window_seconds": 60, "max_hits": 240}, 2020 "broad": {"window_seconds": 60, "max_hits": 20} 2021 } 2022 } 2023 }) 2024 .to_string(); 2025 parse_base_relay_runtime_config_json(&raw).expect("config") 2026 } 2027 2028 fn session_limits(per_connection_outbound_queue: usize) -> TangleRuntimeLimits { 2029 session_limits_result(per_connection_outbound_queue).expect("limits") 2030 } 2031 2032 fn session_limits_with_message_length( 2033 max_message_length: usize, 2034 per_connection_outbound_queue: usize, 2035 ) -> TangleRuntimeLimits { 2036 TangleRuntimeLimits::new( 2037 max_message_length, 2038 BaseRelayLimits::new(BaseRelayLimitSettings { 2039 max_pending_events: per_connection_outbound_queue, 2040 max_subscription_id_length: 64, 2041 max_subscriptions: 64, 2042 max_filters_per_request: 10, 2043 max_tag_values_per_filter: 100, 2044 max_query_complexity: 610, 2045 max_event_tags: 200, 2046 max_content_length: 65_536, 2047 max_limit: 500, 2048 default_limit: 100, 2049 }) 2050 .expect("relay limits"), 2051 16, 2052 per_connection_outbound_queue, 2053 ) 2054 .expect("limits") 2055 } 2056 2057 fn session_limits_result( 2058 per_connection_outbound_queue: usize, 2059 ) -> Result<TangleRuntimeLimits, BaseRelayError> { 2060 TangleRuntimeLimits::new( 2061 1_048_576, 2062 BaseRelayLimits::new(BaseRelayLimitSettings { 2063 max_pending_events: per_connection_outbound_queue, 2064 max_subscription_id_length: 64, 2065 max_subscriptions: 64, 2066 max_filters_per_request: 10, 2067 max_tag_values_per_filter: 100, 2068 max_query_complexity: 610, 2069 max_event_tags: 200, 2070 max_content_length: 65_536, 2071 max_limit: 500, 2072 default_limit: 100, 2073 })?, 2074 16, 2075 per_connection_outbound_queue, 2076 ) 2077 } 2078 2079 fn temp_root(name: &str) -> PathBuf { 2080 std::env::temp_dir().join(format!("tangle-session-{name}-{}", std::process::id())) 2081 } 2082 }