tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

session.rs (77092B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::{
      4     client_message::{RuntimeClientMessage, parse_runtime_client_message},
      5     errors::BaseRelayError,
      6     event_bus::{TangleEventReceiveError, TangleEventReceiver},
      7     logging,
      8     relay::{
      9         auth::{BaseAuthState, generate_auth_challenge},
     10         core::BaseRelay,
     11         live::{CloseResult, LiveSubscriptionSet},
     12         outbound::RuntimeRelayMessage,
     13     },
     14     resource_limits::{RelayResourceLimiter, RelaySubscriptionPermit},
     15     runtime::{
     16         RelayRuntimeHandle, TangleClientMessageMetricKind, TangleClientRateLimitContext,
     17         TangleRuntimeLimits,
     18     },
     19 };
     20 use axum::extract::ws::{CloseFrame, Message, Utf8Bytes, WebSocket};
     21 use std::{
     22     collections::BTreeMap,
     23     net::IpAddr,
     24     sync::atomic::{AtomicU64, Ordering},
     25     time::{Instant, SystemTime, UNIX_EPOCH},
     26 };
     27 use tangle_protocol::{RelayMessage, SubscriptionId, UnixTimestamp};
     28 use tangle_store_pocket::PocketOwnedFilter;
     29 use tokio::sync::{mpsc, watch};
     30 
/// State for one relay WebSocket connection; consumed by [`TangleWebSocketSession::run`].
#[derive(Debug)]
pub struct TangleWebSocketSession {
    /// Identifier taken from `NEXT_TANGLE_CONNECTION_ID` when the session is constructed.
    connection_id: u64,
    /// Peer address when the constructor was given one, `None` otherwise.
    peer_ip: Option<IpAddr>,
    /// Instant captured while the session value was being built.
    connected_at: Instant,
    /// Sending half of the bounded outbound queue; `outbound()` hands out clones of it.
    outbound: TangleOutboundSender,
    /// Receiving half of the outbound queue; `run` forwards its messages to the socket.
    outbound_receiver: mpsc::Receiver<Message>,
    /// Shutdown flag; a `true` value makes `run` send a close frame and stop.
    shutdown: watch::Receiver<bool>,
    /// Runtime handle used for metrics, rate limiting, queries and event fan-out.
    runtime: RelayRuntimeHandle,
    /// Limits read by this session: outbound queue capacity, max message length, relay limits.
    limits: TangleRuntimeLimits,
    /// Authentication state of this connection; it receives the initial challenge.
    auth: BaseAuthState,
    /// Live subscriptions registered by `REQ` messages on this connection.
    subscriptions: LiveSubscriptionSet,
    /// Optional limiter from which a permit is requested when a new subscription id is opened.
    resource_limiter: Option<RelayResourceLimiter>,
    /// Permits obtained from `resource_limiter`, keyed by the subscription they were taken for.
    subscription_permits: BTreeMap<SubscriptionId, RelaySubscriptionPermit>,
    /// Receiver polled by `run` for store offsets to fan out to the live subscriptions.
    events: TangleEventReceiver,
}
     47 
/// Counter behind `connection_id`: starts at 1 and is advanced by one per constructed session.
static NEXT_TANGLE_CONNECTION_ID: AtomicU64 = AtomicU64::new(1);
     49 
     50 impl TangleWebSocketSession {
     51     pub fn new(
     52         limits: TangleRuntimeLimits,
     53         shutdown: watch::Receiver<bool>,
     54         runtime: RelayRuntimeHandle,
     55         auth: BaseAuthState,
     56         events: TangleEventReceiver,
     57     ) -> Result<Self, BaseRelayError> {
     58         Self::new_with_peer(limits, shutdown, runtime, auth, events, None)
     59     }
     60 
     61     pub fn new_with_peer(
     62         limits: TangleRuntimeLimits,
     63         shutdown: watch::Receiver<bool>,
     64         runtime: RelayRuntimeHandle,
     65         auth: BaseAuthState,
     66         events: TangleEventReceiver,
     67         peer_ip: Option<IpAddr>,
     68     ) -> Result<Self, BaseRelayError> {
     69         Self::new_with_peer_and_resources(limits, shutdown, runtime, auth, events, peer_ip, None)
     70     }
     71 
     72     pub fn new_with_peer_and_resources(
     73         limits: TangleRuntimeLimits,
     74         shutdown: watch::Receiver<bool>,
     75         runtime: RelayRuntimeHandle,
     76         auth: BaseAuthState,
     77         events: TangleEventReceiver,
     78         peer_ip: Option<IpAddr>,
     79         resource_limiter: Option<RelayResourceLimiter>,
     80     ) -> Result<Self, BaseRelayError> {
     81         let outbound_queue_capacity = limits.outbound_queue_capacity();
     82         let (sender, receiver) = mpsc::channel(outbound_queue_capacity);
     83         let subscriptions = LiveSubscriptionSet::new(
     84             limits.base_relay_limits().max_pending_events(),
     85             limits.base_relay_limits().max_subscriptions(),
     86         )?;
     87         Ok(Self {
     88             connection_id: NEXT_TANGLE_CONNECTION_ID.fetch_add(1, Ordering::Relaxed),
     89             peer_ip,
     90             connected_at: Instant::now(),
     91             outbound: TangleOutboundSender {
     92                 sender,
     93                 capacity: outbound_queue_capacity,
     94             },
     95             outbound_receiver: receiver,
     96             shutdown,
     97             runtime,
     98             limits,
     99             auth,
    100             subscriptions,
    101             resource_limiter,
    102             subscription_permits: BTreeMap::new(),
    103             events,
    104         })
    105     }
    106 
    /// Returns the instant recorded when this session value was constructed.
    pub fn connected_at(&self) -> Instant {
        self.connected_at
    }
    110 
    /// Returns a clone of the sender for this session's outbound queue.
    pub fn outbound(&self) -> TangleOutboundSender {
        self.outbound.clone()
    }
    114 
    115     pub fn shutdown_requested(&self) -> bool {
    116         *self.shutdown.borrow()
    117     }
    118 
    /// Test-only view of how many subscriptions the live set currently reports as active.
    #[cfg(test)]
    fn active_subscription_count(&self) -> usize {
        self.subscriptions.active_count()
    }
    123 
    124     pub async fn run(mut self, mut socket: WebSocket) {
    125         let metrics = self.runtime.metrics();
    126         metrics.record_session_opened();
    127         logging::log_websocket_session_opened(self.connection_id, self.peer_ip);
    128         if !self.issue_auth_challenge() {
    129             let closed_subscriptions = self.close_all_subscriptions();
    130             metrics.record_subscriptions_closed(closed_subscriptions);
    131             metrics.record_session_closed();
    132             metrics.record_event_bus_receivers(
    133                 metrics.event_bus_receivers_current().saturating_sub(1),
    134             );
    135             logging::log_websocket_session_closed(
    136                 self.connection_id,
    137                 self.peer_ip,
    138                 closed_subscriptions,
    139             );
    140             return;
    141         }
    142         loop {
    143             if self.shutdown_requested() {
    144                 let _ = socket.send(Message::Close(None)).await;
    145                 break;
    146             }
    147             tokio::select! {
    148                 incoming = socket.recv() => {
    149                     match incoming {
    150                         Some(Ok(Message::Close(_))) | Some(Err(_)) | None => break,
    151                         Some(Ok(message)) => {
    152                             match self.handle_incoming_message(message).await {
    153                                 TangleSessionControl::Continue => {}
    154                                 TangleSessionControl::Close(message) => {
    155                                     let _ = socket.send(message).await;
    156                                     break;
    157                                 }
    158                                 TangleSessionControl::Stop => break,
    159                             }
    160                         }
    161                     }
    162                 }
    163                 outgoing = self.outbound_receiver.recv() => {
    164                     let Some(message) = outgoing else {
    165                         break;
    166                     };
    167                     if socket.send(message).await.is_err() {
    168                         break;
    169                     }
    170                 }
    171                 event = self.events.recv() => {
    172                     match self.handle_event_receive_result(event).await {
    173                         TangleSessionControl::Continue => {}
    174                         TangleSessionControl::Close(message) => {
    175                             let _ = socket.send(message).await;
    176                             break;
    177                         }
    178                         TangleSessionControl::Stop => break,
    179                     }
    180                 }
    181                 changed = self.shutdown.changed() => {
    182                     if changed.is_err() || self.shutdown_requested() {
    183                         let _ = socket.send(Message::Close(None)).await;
    184                         break;
    185                     }
    186                 }
    187             }
    188         }
    189         let closed_subscriptions = self.close_all_subscriptions();
    190         metrics.record_subscriptions_closed(closed_subscriptions);
    191         metrics.record_session_closed();
    192         metrics.record_event_bus_receivers(metrics.event_bus_receivers_current().saturating_sub(1));
    193         logging::log_websocket_session_closed(
    194             self.connection_id,
    195             self.peer_ip,
    196             closed_subscriptions,
    197         );
    198     }
    199 
    200     async fn handle_event_receive_result(
    201         &mut self,
    202         result: Result<tangle_groups::StoreOffset, TangleEventReceiveError>,
    203     ) -> TangleSessionControl {
    204         match result {
    205             Ok(offset) => self.handle_event_offset(offset).await,
    206             Err(TangleEventReceiveError::Lagged(skipped)) => {
    207                 self.runtime.metrics().record_event_bus_lagged(skipped);
    208                 TangleSessionControl::Close(event_stream_lag_close_message())
    209             }
    210             Err(TangleEventReceiveError::Closed) => TangleSessionControl::Stop,
    211             Err(TangleEventReceiveError::Empty) => TangleSessionControl::Continue,
    212         }
    213     }
    214 
    215     async fn handle_event_offset(
    216         &mut self,
    217         offset: tangle_groups::StoreOffset,
    218     ) -> TangleSessionControl {
    219         let runtime = self.runtime.clone();
    220         let auth = self.auth.clone();
    221         let replies = match runtime
    222             .fanout_event_offset(offset, &mut self.subscriptions, &auth)
    223             .await
    224         {
    225             Ok(replies) => replies,
    226             Err(error) => vec![RelayMessage::Notice(error.prefixed_message()).into()],
    227         };
    228         for reply in replies {
    229             if let Err(control) = self.enqueue_relay_message(reply) {
    230                 return control;
    231             }
    232         }
    233         TangleSessionControl::Continue
    234     }
    235 
    236     async fn handle_incoming_message(&mut self, message: Message) -> TangleSessionControl {
    237         match message {
    238             Message::Text(raw) => self.dispatch_text(raw.as_str()).await,
    239             Message::Binary(_) => self
    240                 .enqueue_relay_message(
    241                     RelayMessage::Notice("invalid: client message must be a text frame".to_owned())
    242                         .into(),
    243                 )
    244                 .map(|_| TangleSessionControl::Continue)
    245                 .unwrap_or_else(|control| control),
    246             Message::Ping(_) | Message::Pong(_) => TangleSessionControl::Continue,
    247             Message::Close(_) => TangleSessionControl::Stop,
    248         }
    249     }
    250 
    251     fn issue_auth_challenge(&mut self) -> bool {
    252         let message = generate_auth_challenge()
    253             .and_then(|challenge| {
    254                 self.auth
    255                     .issue_challenge(challenge, current_unix_timestamp())
    256             })
    257             .unwrap_or_else(|error| RelayMessage::Notice(error.prefixed_message()));
    258         self.send_relay_message(message.into()).is_ok()
    259     }
    260 
    261     async fn dispatch_text(&mut self, raw: &str) -> TangleSessionControl {
    262         if raw.len() > self.limits.max_message_length() {
    263             return self
    264                 .enqueue_relay_message(
    265                     RelayMessage::Notice(format!(
    266                         "invalid: client message length exceeds runtime max_message_length {}",
    267                         self.limits.max_message_length()
    268                     ))
    269                     .into(),
    270                 )
    271                 .map(|_| TangleSessionControl::Continue)
    272                 .unwrap_or_else(|control| control);
    273         }
    274         let replies = match parse_runtime_client_message(raw) {
    275             Ok(message) => match self.handle_client_message(message).await {
    276                 Ok(replies) => replies,
    277                 Err(error) => vec![RelayMessage::Notice(error.prefixed_message()).into()],
    278             },
    279             Err(error) => vec![RelayMessage::Notice(format!("invalid: {error}")).into()],
    280         };
    281         for reply in replies {
    282             if let Err(control) = self.enqueue_relay_message(reply) {
    283                 return control;
    284             }
    285         }
    286         TangleSessionControl::Continue
    287     }
    288 
    289     async fn handle_client_message(
    290         &mut self,
    291         message: RuntimeClientMessage,
    292     ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> {
    293         match message {
    294             RuntimeClientMessage::Req {
    295                 subscription_id,
    296                 filters,
    297                 search_present,
    298             } => {
    299                 self.handle_req(subscription_id, filters, search_present)
    300                     .await
    301             }
    302             RuntimeClientMessage::Count {
    303                 subscription_id,
    304                 filters,
    305                 search_present,
    306             } => {
    307                 let context = self.client_rate_limit_context();
    308                 self.runtime
    309                     .handle_client_message_with_rate_limit_context(
    310                         RuntimeClientMessage::Count {
    311                             subscription_id,
    312                             filters,
    313                             search_present,
    314                         },
    315                         &mut self.auth,
    316                         context,
    317                         current_unix_timestamp(),
    318                     )
    319                     .await
    320             }
    321             RuntimeClientMessage::Close(subscription_id) => {
    322                 let metrics = self.runtime.metrics();
    323                 metrics.record_client_message(TangleClientMessageMetricKind::Close);
    324                 self.limits
    325                     .base_relay_limits()
    326                     .validate_subscription_id(&subscription_id)?;
    327                 if self.subscriptions.close(&subscription_id) == CloseResult::Closed {
    328                     self.subscription_permits.remove(&subscription_id);
    329                     metrics.record_subscriptions_closed(1);
    330                 }
    331                 Ok(Vec::new())
    332             }
    333             message => {
    334                 let context = self.client_rate_limit_context();
    335                 self.runtime
    336                     .handle_client_message_with_rate_limit_context(
    337                         message,
    338                         &mut self.auth,
    339                         context,
    340                         current_unix_timestamp(),
    341                     )
    342                     .await
    343             }
    344         }
    345     }
    346 
    347     async fn handle_req(
    348         &mut self,
    349         subscription_id: SubscriptionId,
    350         filters: Vec<PocketOwnedFilter>,
    351         search_present: bool,
    352     ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> {
    353         let metrics = self.runtime.metrics();
    354         metrics.record_client_message(TangleClientMessageMetricKind::Req);
    355         self.limits
    356             .base_relay_limits()
    357             .validate_subscription_id(&subscription_id)?;
    358         self.limits
    359             .base_relay_limits()
    360             .validate_pocket_filters(&filters)?;
    361         if let Some(message) =
    362             BaseRelay::unsupported_search_present_closed(&subscription_id, search_present)
    363         {
    364             return Ok(vec![message.into()]);
    365         }
    366         if let Some(message) = self
    367             .runtime
    368             .rate_limit_req_pocket(
    369                 &subscription_id,
    370                 &filters,
    371                 &self.auth,
    372                 self.client_rate_limit_context(),
    373                 current_unix_timestamp(),
    374             )
    375             .await
    376         {
    377             return Ok(vec![message.into()]);
    378         }
    379         let should_subscribe = !pocket_filters_are_complete(&filters);
    380         let already_subscribed = self.subscriptions.contains(&subscription_id);
    381         if should_subscribe {
    382             self.subscriptions
    383                 .ensure_can_subscribe(&subscription_id, &filters)?;
    384         }
    385         let report = self
    386             .runtime
    387             .query_req_with_auth_report(
    388                 subscription_id.clone(),
    389                 filters.clone(),
    390                 search_present,
    391                 &self.auth,
    392             )
    393             .await?;
    394         let closes_subscription = report.group_read_denied();
    395         let replies = report.into_messages();
    396         if should_subscribe && !closes_subscription {
    397             let host_permit = if already_subscribed {
    398                 None
    399             } else {
    400                 self.resource_limiter
    401                     .as_ref()
    402                     .map(|resources| resources.try_open_subscriptions(1))
    403                     .transpose()?
    404             };
    405             self.subscriptions
    406                 .subscribe(subscription_id.clone(), filters)?;
    407             if let Some(permit) = host_permit {
    408                 self.subscription_permits
    409                     .insert(subscription_id.clone(), permit);
    410             }
    411             metrics.record_subscription_opened();
    412             logging::log_subscription_opened(self.connection_id, &subscription_id);
    413         }
    414         Ok(replies)
    415     }
    416 
    417     fn close_all_subscriptions(&mut self) -> usize {
    418         let closed = self.subscriptions.close_all();
    419         self.subscription_permits.clear();
    420         closed
    421     }
    422 
    423     fn client_rate_limit_context(&self) -> TangleClientRateLimitContext {
    424         TangleClientRateLimitContext::new(self.peer_ip, Some(self.connection_id))
    425     }
    426 
    427     fn send_relay_message(&self, message: RuntimeRelayMessage) -> Result<(), TangleSessionControl> {
    428         let text = message
    429             .encode()
    430             .map_err(|_| TangleSessionControl::Close(outbound_encode_close_message()))?;
    431         self.outbound
    432             .try_send(Message::Text(text.into()))
    433             .map_err(|error| self.outbound_queue_error_control(error))
    434     }
    435 
    /// Queues a reply for the peer; currently a direct alias of [`Self::send_relay_message`]
    /// and returns exactly what that method returns.
    fn enqueue_relay_message(
        &self,
        message: RuntimeRelayMessage,
    ) -> Result<(), TangleSessionControl> {
        self.send_relay_message(message)
    }
    442 
    443     fn outbound_queue_error_control(
    444         &self,
    445         error: TangleOutboundQueueError,
    446     ) -> TangleSessionControl {
    447         match error {
    448             TangleOutboundQueueError::Full => {
    449                 self.runtime.metrics().record_outbound_queue_full_close();
    450                 TangleSessionControl::Close(outbound_queue_full_close_message())
    451             }
    452             TangleOutboundQueueError::Closed => TangleSessionControl::Stop,
    453         }
    454     }
    455 }
    456 
/// What the session loop should do after handling one input.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TangleSessionControl {
    /// Keep serving the connection.
    Continue,
    /// Send the carried message to the peer (best effort), then end the session.
    Close(Message),
    /// End the session without sending anything further.
    Stop,
}
    463 
    464 fn event_stream_lag_close_message() -> Message {
    465     Message::Close(Some(CloseFrame {
    466         code: 1008,
    467         reason: Utf8Bytes::from_static("event stream lagged; reconnect required"),
    468     }))
    469 }
    470 
    471 fn outbound_queue_full_close_message() -> Message {
    472     Message::Close(Some(CloseFrame {
    473         code: 1013,
    474         reason: Utf8Bytes::from_static("outbound queue full; reconnect required"),
    475     }))
    476 }
    477 
    478 fn outbound_encode_close_message() -> Message {
    479     Message::Close(Some(CloseFrame {
    480         code: 1011,
    481         reason: Utf8Bytes::from_static("outbound relay message encode failed"),
    482     }))
    483 }
    484 
/// Cloneable handle for placing frames on a session's bounded outbound queue.
#[derive(Debug, Clone)]
pub struct TangleOutboundSender {
    /// Sending half of the session's outbound channel.
    sender: mpsc::Sender<Message>,
    /// Capacity the channel was created with; reported by `capacity()`.
    capacity: usize,
}
    490 
    491 impl TangleOutboundSender {
    492     pub fn capacity(&self) -> usize {
    493         self.capacity
    494     }
    495 
    496     pub fn try_send(&self, message: Message) -> Result<(), TangleOutboundQueueError> {
    497         self.sender.try_send(message).map_err(Into::into)
    498     }
    499 }
    500 
/// Reason a frame could not be placed on the outbound queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TangleOutboundQueueError {
    /// The channel reported `TrySendError::Full`.
    Full,
    /// The channel reported `TrySendError::Closed`.
    Closed,
}
    506 
    507 impl From<mpsc::error::TrySendError<Message>> for TangleOutboundQueueError {
    508     fn from(error: mpsc::error::TrySendError<Message>) -> Self {
    509         match error {
    510             mpsc::error::TrySendError::Full(_) => Self::Full,
    511             mpsc::error::TrySendError::Closed(_) => Self::Closed,
    512         }
    513     }
    514 }
    515 
    516 fn current_unix_timestamp() -> UnixTimestamp {
    517     UnixTimestamp::new(
    518         SystemTime::now()
    519             .duration_since(UNIX_EPOCH)
    520             .map(|duration| duration.as_secs())
    521             .unwrap_or(0),
    522     )
    523 }
    524 
    525 fn pocket_filters_are_complete(filters: &[PocketOwnedFilter]) -> bool {
    526     !filters.is_empty() && filters.iter().all(|filter| filter.completes())
    527 }
    528 
    529 #[cfg(test)]
    530 impl TangleWebSocketSession {
    531     async fn handle_protocol_client_message_for_test(
    532         &mut self,
    533         message: tangle_protocol::ClientMessage,
    534     ) -> Result<Vec<RelayMessage>, BaseRelayError> {
    535         let messages = self
    536             .handle_client_message(protocol_client_message_to_runtime_for_session_test(
    537                 message,
    538             )?)
    539             .await?;
    540         crate::relay::outbound::protocol_messages_for_test(messages)
    541     }
    542 }
    543 
    544 #[cfg(test)]
    545 fn protocol_client_message_to_runtime_for_session_test(
    546     message: tangle_protocol::ClientMessage,
    547 ) -> Result<RuntimeClientMessage, BaseRelayError> {
    548     match message {
    549         tangle_protocol::ClientMessage::Event(event) => Ok(RuntimeClientMessage::Event(
    550             crate::pocket_conversion::tangle_event_to_pocket(&event)?,
    551         )),
    552         tangle_protocol::ClientMessage::Req {
    553             subscription_id,
    554             filters,
    555         } => Ok(RuntimeClientMessage::Req {
    556             subscription_id,
    557             search_present: filters.iter().any(|filter| filter.search().is_some()),
    558             filters: filters
    559                 .iter()
    560                 .map(crate::pocket_conversion::tangle_filter_to_pocket)
    561                 .collect::<Result<Vec<_>, _>>()?,
    562         }),
    563         tangle_protocol::ClientMessage::Count {
    564             subscription_id,
    565             filters,
    566         } => Ok(RuntimeClientMessage::Count {
    567             subscription_id,
    568             search_present: filters.iter().any(|filter| filter.search().is_some()),
    569             filters: filters
    570                 .iter()
    571                 .map(crate::pocket_conversion::tangle_filter_to_pocket)
    572                 .collect::<Result<Vec<_>, _>>()?,
    573         }),
    574         tangle_protocol::ClientMessage::Close(subscription_id) => {
    575             Ok(RuntimeClientMessage::Close(subscription_id))
    576         }
    577         tangle_protocol::ClientMessage::Auth(event) => Ok(RuntimeClientMessage::Auth(
    578             crate::pocket_conversion::tangle_event_to_pocket(&event)?,
    579         )),
    580         tangle_protocol::ClientMessage::NegOpen {
    581             subscription_id,
    582             filter,
    583             message,
    584         } => Ok(RuntimeClientMessage::NegOpen {
    585             subscription_id,
    586             filter: crate::pocket_conversion::tangle_filter_to_pocket(&filter)?,
    587             message,
    588         }),
    589         tangle_protocol::ClientMessage::NegMsg {
    590             subscription_id,
    591             message,
    592         } => Ok(RuntimeClientMessage::NegMsg {
    593             subscription_id,
    594             message,
    595         }),
    596         tangle_protocol::ClientMessage::NegClose(subscription_id) => {
    597             Ok(RuntimeClientMessage::NegClose(subscription_id))
    598         }
    599     }
    600 }
    601 
    602 #[cfg(test)]
    603 mod tests {
    604     use super::{
    605         TangleOutboundQueueError, TangleSessionControl, TangleWebSocketSession,
    606         current_unix_timestamp, event_stream_lag_close_message, outbound_queue_full_close_message,
    607     };
    608     use crate::{
    609         config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json},
    610         errors::BaseRelayError,
    611         event_bus::TangleEventReceiver,
    612         rate_limits::{TangleRateLimitKey, TangleRateLimitScope},
    613         relay::core::{BaseRelayLimitSettings, BaseRelayLimits},
    614         runtime::{RelayRuntime, RelayRuntimeHandle, TangleRuntimeLimits, TangleShutdownSignal},
    615     };
    616     use axum::extract::ws::Message;
    617     use serde_json::json;
    618     use std::path::{Path, PathBuf};
    619     use tangle_crypto::RelaySigner;
    620     use tangle_groups::{KIND_GROUP_CREATE_GROUP, StoreOffset};
    621     use tangle_protocol::{
    622         ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SignatureHex,
    623         SubscriptionId, Tag, UnixTimestamp, UnsignedEvent, event_to_value, filter_from_value,
    624     };
    625     use tangle_store_pocket::{
    626         PocketEvent, PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketTime,
    627     };
    628     use tangle_test_support::FixtureKey;
    629 
    /// A new session reports a connection instant no earlier than one taken just before it
    /// was built.
    #[test]
    fn websocket_session_records_connection_time() {
        let before = std::time::Instant::now();
        let shutdown = TangleShutdownSignal::new();
        let (runtime, auth, events) = session_runtime("records-connection-time");

        let limits = session_limits(8);
        let shutdown_receiver = shutdown.subscribe();
        let session =
            TangleWebSocketSession::new(limits, shutdown_receiver, runtime, auth, events)
                .expect("session");

        let connected_at = session.connected_at();
        assert!(connected_at >= before);
    }
    646 
    /// Building session limits with an outbound capacity of zero must fail.
    #[test]
    fn websocket_session_limit_config_rejects_zero_outbound_capacity() {
        let outcome = session_limits_result(0);
        assert!(outcome.is_err());
    }
    651 
    /// The session's shutdown flag flips from false to true once shutdown is requested.
    #[test]
    fn websocket_session_observes_shutdown_request() {
        let shutdown = TangleShutdownSignal::new();
        let (runtime, auth, events) = session_runtime("observes-shutdown");

        let limits = session_limits(8);
        let shutdown_receiver = shutdown.subscribe();
        let session =
            TangleWebSocketSession::new(limits, shutdown_receiver, runtime, auth, events)
                .expect("session");

        let requested_before = session.shutdown_requested();
        assert!(!requested_before);

        shutdown.request_shutdown();

        let requested_after = session.shutdown_requested();
        assert!(requested_after);
    }
    671 
    672     #[tokio::test]
    673     async fn websocket_session_rejects_overlong_text_before_parsing() {
    674         let shutdown = TangleShutdownSignal::new();
    675         let (runtime, auth, events) = session_runtime("overlong-text");
    676         let mut session = TangleWebSocketSession::new(
    677             session_limits_with_message_length(8, 8),
    678             shutdown.subscribe(),
    679             runtime,
    680             auth,
    681             events,
    682         )
    683         .expect("session");
    684 
    685         assert_eq!(
    686             session.dispatch_text("123456789").await,
    687             TangleSessionControl::Continue
    688         );
    689         let message = session.outbound_receiver.try_recv().expect("notice");
    690         let Message::Text(text) = message else {
    691             panic!("expected text notice")
    692         };
    693         assert_eq!(
    694             text.as_str(),
    695             "[\"NOTICE\",\"invalid: client message length exceeds runtime max_message_length 8\"]"
    696         );
    697     }
    698 
    699     #[tokio::test]
    700     async fn websocket_session_preserves_chorus_malformed_message_parity() {
    701         let shutdown = TangleShutdownSignal::new();
    702         let (runtime, auth, events) = session_runtime("chorus-malformed-parity");
    703         let mut session = TangleWebSocketSession::new(
    704             session_limits(16),
    705             shutdown.subscribe(),
    706             runtime,
    707             auth,
    708             events,
    709         )
    710         .expect("session");
    711         let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "parity")
    712             .expect("event");
    713         for (raw, expected) in [
    714             ("{", None),
    715             (
    716                 "[\"NOTICE\",\"client\"]",
    717                 Some("[\"NOTICE\",\"invalid: client message command `NOTICE` is unsupported\"]"),
    718             ),
    719             (
    720                 "[\"NEG-OPEN\",\"sub\",{}]",
    721                 Some(
    722                     "[\"NOTICE\",\"invalid: NEG-OPEN client message must contain a subscription id, filter, and message\"]",
    723                 ),
    724             ),
    725             (
    726                 "[\"REQ\"]",
    727                 Some(
    728                     "[\"NOTICE\",\"invalid: REQ client message must contain a subscription id and filters\"]",
    729                 ),
    730             ),
    731             (
    732                 "[\"CLOSE\",1]",
    733                 Some("[\"NOTICE\",\"invalid: CLOSE subscription id must be a string\"]"),
    734             ),
    735         ] {
    736             assert_eq!(
    737                 session.dispatch_text(raw).await,
    738                 TangleSessionControl::Continue
    739             );
    740             let text = take_outbound_text(&mut session);
    741             if let Some(expected) = expected {
    742                 assert_eq!(text, expected);
    743             } else {
    744                 assert!(text.starts_with("[\"NOTICE\",\"invalid: client message JSON is invalid:"));
    745             }
    746         }
    747 
    748         assert_eq!(
    749             session
    750                 .dispatch_text("[\"REQ\",\"sub-search\",{\"search\":\"carrots\"}]")
    751                 .await,
    752             TangleSessionControl::Continue
    753         );
    754         assert_eq!(
    755             take_outbound_text(&mut session),
    756             "[\"CLOSED\",\"sub-search\",\"unsupported: search filters are not supported\"]"
    757         );
    758 
    759         assert_eq!(
    760             session
    761                 .dispatch_text(&json!(["EVENT", event_to_value(&event)]).to_string())
    762                 .await,
    763             TangleSessionControl::Continue
    764         );
    765         assert_eq!(
    766             take_outbound_text(&mut session),
    767             format!("[\"OK\",\"{}\",true,\"\"]", event.id().as_str())
    768         );
    769     }
    770 
    771     #[tokio::test]
    772     async fn websocket_session_returns_disabled_negentropy_errors() {
    773         let shutdown = TangleShutdownSignal::new();
    774         let (runtime, auth, events) = session_runtime("disabled-negentropy");
    775         let mut session = TangleWebSocketSession::new(
    776             session_limits(16),
    777             shutdown.subscribe(),
    778             runtime,
    779             auth,
    780             events,
    781         )
    782         .expect("session");
    783 
    784         assert_eq!(
    785             session
    786                 .dispatch_text("[\"NEG-OPEN\",\"neg-sub\",{\"kinds\":[1]},\"00\"]")
    787                 .await,
    788             TangleSessionControl::Continue
    789         );
    790         assert_eq!(
    791             take_outbound_text(&mut session),
    792             "[\"NEG-ERR\",\"neg-sub\",\"blocked: Negentropy sync is disabled\"]"
    793         );
    794         assert_eq!(
    795             session
    796                 .dispatch_text("[\"NEG-MSG\",\"neg-sub\",\"\"]")
    797                 .await,
    798             TangleSessionControl::Continue
    799         );
    800         assert_eq!(
    801             take_outbound_text(&mut session),
    802             "[\"NEG-ERR\",\"neg-sub\",\"blocked: Negentropy sync is disabled\"]"
    803         );
    804         assert_eq!(
    805             session.dispatch_text("[\"NEG-CLOSE\",\"neg-sub\"]").await,
    806             TangleSessionControl::Continue
    807         );
    808         assert!(session.outbound_receiver.try_recv().is_err());
    809     }
    810 
    811     #[tokio::test]
    812     async fn websocket_session_disabled_negentropy_privacy_response_omits_filter_material() {
    813         let shutdown = TangleShutdownSignal::new();
    814         let (runtime, auth, events) = session_runtime("disabled-negentropy-privacy");
    815         let mut session = TangleWebSocketSession::new(
    816             session_limits(16),
    817             shutdown.subscribe(),
    818             runtime,
    819             auth,
    820             events,
    821         )
    822         .expect("session");
    823         let hidden_event_id = "a".repeat(64);
    824         let private_group_id = "private-group-alpha";
    825         let raw = json!([
    826             "NEG-OPEN",
    827             "neg-private",
    828             {"ids": [hidden_event_id], "#h": [private_group_id]},
    829             "00"
    830         ])
    831         .to_string();
    832 
    833         assert_eq!(
    834             session.dispatch_text(&raw).await,
    835             TangleSessionControl::Continue
    836         );
    837         let text = take_outbound_text(&mut session);
    838 
    839         assert_eq!(
    840             text,
    841             "[\"NEG-ERR\",\"neg-private\",\"blocked: Negentropy sync is disabled\"]"
    842         );
    843         assert!(!text.contains(private_group_id));
    844         assert!(!text.contains(&hidden_event_id));
    845         assert!(!text.contains("inventory"));
    846         assert!(!text.contains("#h"));
    847     }
    848 
    849     #[tokio::test]
    850     async fn websocket_session_scopes_subscriptions_per_connection() {
    851         let shutdown = TangleShutdownSignal::new();
    852         let root = temp_root("connection-scope");
    853         let _ = std::fs::remove_dir_all(&root);
    854         let runtime =
    855             RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root)).expect("runtime"));
    856         let metrics = runtime.metrics();
    857         let auth_a = runtime.auth_state().await.expect("auth a");
    858         let auth_b = runtime.auth_state().await.expect("auth b");
    859         let events_a = runtime.subscribe_events().await;
    860         let events_b = runtime.subscribe_events().await;
    861         let mut first = TangleWebSocketSession::new(
    862             session_limits(8),
    863             shutdown.subscribe(),
    864             runtime.clone(),
    865             auth_a,
    866             events_a,
    867         )
    868         .expect("first");
    869         let mut second = TangleWebSocketSession::new(
    870             session_limits(8),
    871             shutdown.subscribe(),
    872             runtime.clone(),
    873             auth_b,
    874             events_b,
    875         )
    876         .expect("second");
    877         let subscription_id = SubscriptionId::new("shared").expect("subscription");
    878 
    879         assert_eq!(
    880             first
    881                 .handle_protocol_client_message_for_test(req(subscription_id.clone()))
    882                 .await
    883                 .expect("first req"),
    884             vec![RelayMessage::Eose(subscription_id.clone())]
    885         );
    886         assert_eq!(
    887             second
    888                 .handle_protocol_client_message_for_test(req(subscription_id.clone()))
    889                 .await
    890                 .expect("second req"),
    891             vec![RelayMessage::Eose(subscription_id.clone())]
    892         );
    893         assert_eq!(first.active_subscription_count(), 1);
    894         assert_eq!(second.active_subscription_count(), 1);
    895 
    896         assert_eq!(
    897             first
    898                 .handle_protocol_client_message_for_test(ClientMessage::Close(
    899                     subscription_id.clone()
    900                 ))
    901                 .await
    902                 .expect("close first"),
    903             Vec::<RelayMessage>::new()
    904         );
    905         assert_eq!(first.active_subscription_count(), 0);
    906         assert_eq!(second.active_subscription_count(), 1);
    907 
    908         assert_eq!(
    909             second
    910                 .handle_protocol_client_message_for_test(req(subscription_id.clone()))
    911                 .await
    912                 .expect("replace second"),
    913             vec![RelayMessage::Eose(subscription_id.clone())]
    914         );
    915         assert_eq!(first.active_subscription_count(), 0);
    916         assert_eq!(second.active_subscription_count(), 1);
    917 
    918         assert_eq!(
    919             second
    920                 .handle_protocol_client_message_for_test(ClientMessage::Close(subscription_id))
    921                 .await
    922                 .expect("close second"),
    923             Vec::<RelayMessage>::new()
    924         );
    925         assert_eq!(second.active_subscription_count(), 0);
    926         let snapshot = metrics.snapshot();
    927         assert_eq!(snapshot.client_messages(), 5);
    928         assert_eq!(snapshot.req_messages(), 3);
    929         assert_eq!(snapshot.close_messages(), 2);
    930         assert_eq!(snapshot.opened_subscriptions(), 3);
    931         assert_eq!(snapshot.closed_subscriptions(), 2);
    932 
    933         let _ = std::fs::remove_dir_all(root);
    934     }
    935 
    936     #[tokio::test]
    937     async fn websocket_session_live_fanout_uses_current_auth() {
    938         let shutdown = TangleShutdownSignal::new();
    939         let root = temp_root("current-auth-live");
    940         let _ = std::fs::remove_dir_all(&root);
    941         let runtime = RelayRuntimeHandle::new(
    942             RelayRuntime::open(runtime_config_with_groups(&root)).expect("runtime"),
    943         );
    944         let mut owner_auth = runtime.auth_state().await.expect("owner auth");
    945         owner_auth
    946             .issue_challenge("owner-live", UnixTimestamp::new(100))
    947             .expect("owner challenge");
    948         let owner_auth_event =
    949             tangle_v2_auth_event(FixtureKey::Owner, "owner-live", 120).expect("owner auth event");
    950         assert_eq!(
    951             runtime
    952                 .handle_protocol_client_message_for_test(
    953                     ClientMessage::Auth(owner_auth_event.clone()),
    954                     &mut owner_auth,
    955                     UnixTimestamp::new(120)
    956                 )
    957                 .await
    958                 .expect("owner auth"),
    959             vec![RelayMessage::Ok {
    960                 event_id: owner_auth_event.id().clone(),
    961                 accepted: true,
    962                 message: String::new()
    963             }]
    964         );
    965         let create = tangle_v2_group_create_event(FixtureKey::Owner, "LiveFarm", 121, &["private"])
    966             .expect("create");
    967         assert_eq!(
    968             runtime
    969                 .handle_protocol_client_message_for_test(
    970                     ClientMessage::Event(create.clone()),
    971                     &mut owner_auth,
    972                     UnixTimestamp::new(121)
    973                 )
    974                 .await
    975                 .expect("create"),
    976             vec![RelayMessage::Ok {
    977                 event_id: create.id().clone(),
    978                 accepted: true,
    979                 message: String::new()
    980             }]
    981         );
    982         let session_auth = runtime.auth_state().await.expect("session auth");
    983         let events = runtime.subscribe_events().await;
    984         let mut session = TangleWebSocketSession::new(
    985             session_limits(8),
    986             shutdown.subscribe(),
    987             runtime.clone(),
    988             session_auth,
    989             events,
    990         )
    991         .expect("session");
    992         let subscription_id = SubscriptionId::new("current-auth-live").expect("subscription");
    993 
    994         assert_eq!(
    995             session
    996                 .handle_protocol_client_message_for_test(ClientMessage::Req {
    997                     subscription_id: subscription_id.clone(),
    998                     filters: vec![
    999                         filter_from_value(&json!({"kinds":[1], "#h":["LiveFarm"]}))
   1000                             .expect("filter")
   1001                     ],
   1002                 })
   1003                 .await
   1004                 .expect("req"),
   1005             vec![RelayMessage::Eose(subscription_id.clone())]
   1006         );
   1007         assert_eq!(session.active_subscription_count(), 1);
   1008         let before_auth =
   1009             tangle_v2_group_event(FixtureKey::Owner, "LiveFarm", 122, 1, "before auth")
   1010                 .expect("before auth");
   1011         let before_auth_id = before_auth.id().clone();
   1012         assert_eq!(
   1013             runtime
   1014                 .handle_protocol_client_message_for_test(
   1015                     ClientMessage::Event(before_auth),
   1016                     &mut owner_auth,
   1017                     UnixTimestamp::new(122)
   1018                 )
   1019                 .await
   1020                 .expect("before event"),
   1021             vec![RelayMessage::Ok {
   1022                 event_id: before_auth_id,
   1023                 accepted: true,
   1024                 message: String::new()
   1025             }]
   1026         );
   1027         let offset = session.events.recv().await;
   1028         assert_eq!(
   1029             session.handle_event_receive_result(offset).await,
   1030             TangleSessionControl::Continue
   1031         );
   1032         assert!(session.outbound_receiver.try_recv().is_err());
   1033 
   1034         let session_now = current_unix_timestamp();
   1035         session
   1036             .auth
   1037             .issue_challenge("session-live", session_now)
   1038             .expect("session challenge");
   1039         let session_auth_event =
   1040             tangle_v2_auth_event(FixtureKey::Owner, "session-live", session_now.as_u64())
   1041                 .expect("auth event");
   1042         assert_eq!(
   1043             session
   1044                 .handle_protocol_client_message_for_test(ClientMessage::Auth(
   1045                     session_auth_event.clone()
   1046                 ))
   1047                 .await
   1048                 .expect("session auth"),
   1049             vec![RelayMessage::Ok {
   1050                 event_id: session_auth_event.id().clone(),
   1051                 accepted: true,
   1052                 message: String::new()
   1053             }]
   1054         );
   1055         let after_auth = tangle_v2_group_event(FixtureKey::Owner, "LiveFarm", 132, 1, "after auth")
   1056             .expect("after auth");
   1057         assert_eq!(
   1058             runtime
   1059                 .handle_protocol_client_message_for_test(
   1060                     ClientMessage::Event(after_auth.clone()),
   1061                     &mut owner_auth,
   1062                     UnixTimestamp::new(132)
   1063                 )
   1064                 .await
   1065                 .expect("after event"),
   1066             vec![RelayMessage::Ok {
   1067                 event_id: after_auth.id().clone(),
   1068                 accepted: true,
   1069                 message: String::new()
   1070             }]
   1071         );
   1072         let offset = session.events.recv().await;
   1073         assert_eq!(
   1074             session.handle_event_receive_result(offset).await,
   1075             TangleSessionControl::Continue
   1076         );
   1077         assert_relay_message_text(
   1078             &take_outbound_text(&mut session),
   1079             RelayMessage::Event {
   1080                 subscription_id,
   1081                 event: after_auth,
   1082             },
   1083         );
   1084 
   1085         let _ = std::fs::remove_dir_all(root);
   1086     }
   1087 
   1088     #[tokio::test]
   1089     async fn websocket_session_complete_and_failed_reqs_do_not_subscribe() {
   1090         let shutdown = TangleShutdownSignal::new();
   1091         let root = temp_root("complete-req-lifecycle");
   1092         let _ = std::fs::remove_dir_all(&root);
   1093         let runtime =
   1094             RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root)).expect("runtime"));
   1095         let mut auth = runtime.auth_state().await.expect("auth");
   1096         let events = runtime.subscribe_events().await;
   1097         let mut session = TangleWebSocketSession::new(
   1098             session_limits(8),
   1099             shutdown.subscribe(),
   1100             runtime.clone(),
   1101             runtime.auth_state().await.expect("session auth"),
   1102             events,
   1103         )
   1104         .expect("session");
   1105         let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "complete")
   1106             .expect("event");
   1107 
   1108         assert_eq!(
   1109             runtime
   1110                 .handle_protocol_client_message_for_test(
   1111                     ClientMessage::Event(event.clone()),
   1112                     &mut auth,
   1113                     UnixTimestamp::new(1_714_124_433)
   1114                 )
   1115                 .await
   1116                 .expect("event"),
   1117             vec![RelayMessage::Ok {
   1118                 event_id: event.id().clone(),
   1119                 accepted: true,
   1120                 message: String::new()
   1121             }]
   1122         );
   1123         let exact_id = SubscriptionId::new("exact-id").expect("subscription");
   1124         assert_eq!(
   1125             session
   1126                 .handle_protocol_client_message_for_test(ClientMessage::Req {
   1127                     subscription_id: exact_id.clone(),
   1128                     filters: vec![
   1129                         filter_from_value(&json!({"ids":[event.id().as_str()]}))
   1130                             .expect("exact filter")
   1131                     ],
   1132                 })
   1133                 .await
   1134                 .expect("exact req"),
   1135             vec![
   1136                 RelayMessage::Event {
   1137                     subscription_id: exact_id.clone(),
   1138                     event: event.clone()
   1139                 },
   1140                 RelayMessage::Eose(exact_id)
   1141             ]
   1142         );
   1143         assert_eq!(session.active_subscription_count(), 0);
   1144 
   1145         let open = SubscriptionId::new("open").expect("subscription");
   1146         assert_eq!(
   1147             session
   1148                 .handle_protocol_client_message_for_test(ClientMessage::Req {
   1149                     subscription_id: open.clone(),
   1150                     filters: vec![filter_from_value(&json!({"kinds":[1]})).expect("open filter")],
   1151                 })
   1152                 .await
   1153                 .expect("open req"),
   1154             vec![
   1155                 RelayMessage::Event {
   1156                     subscription_id: open.clone(),
   1157                     event
   1158                 },
   1159                 RelayMessage::Eose(open.clone())
   1160             ]
   1161         );
   1162         assert_eq!(session.active_subscription_count(), 1);
   1163 
   1164         let search = SubscriptionId::new("search").expect("subscription");
   1165         assert_eq!(
   1166             session
   1167                 .handle_protocol_client_message_for_test(ClientMessage::Req {
   1168                     subscription_id: search.clone(),
   1169                     filters: vec![
   1170                         filter_from_value(&json!({"search":"carrots"})).expect("search filter")
   1171                     ],
   1172                 })
   1173                 .await
   1174                 .expect("search req"),
   1175             vec![RelayMessage::Closed {
   1176                 subscription_id: search,
   1177                 message: "unsupported: search filters are not supported".to_owned()
   1178             }]
   1179         );
   1180         assert_eq!(session.active_subscription_count(), 1);
   1181 
   1182         let invalid = SubscriptionId::new("invalid").expect("subscription");
   1183         let invalid_result = session
   1184             .handle_protocol_client_message_for_test(ClientMessage::Req {
   1185                 subscription_id: invalid,
   1186                 filters: vec![Filter::empty(); 11],
   1187             })
   1188             .await;
   1189         assert!(invalid_result.is_err());
   1190         assert_eq!(session.active_subscription_count(), 1);
   1191 
   1192         let _ = std::fs::remove_dir_all(root);
   1193     }
   1194 
   1195     #[tokio::test]
   1196     async fn websocket_session_redacted_initial_req_closes_without_live_subscription() {
   1197         let shutdown = TangleShutdownSignal::new();
   1198         let root = temp_root("redacted-req-close");
   1199         let _ = std::fs::remove_dir_all(&root);
   1200         let runtime = RelayRuntimeHandle::new(
   1201             RelayRuntime::open(runtime_config_with_groups(&root)).expect("runtime"),
   1202         );
   1203         let mut owner_auth = runtime.auth_state().await.expect("owner auth");
   1204         owner_auth
   1205             .issue_challenge("owner-redacted", UnixTimestamp::new(100))
   1206             .expect("owner challenge");
   1207         let owner_auth_event = tangle_v2_auth_event(FixtureKey::Owner, "owner-redacted", 120)
   1208             .expect("owner auth event");
   1209         assert_eq!(
   1210             runtime
   1211                 .handle_protocol_client_message_for_test(
   1212                     ClientMessage::Auth(owner_auth_event.clone()),
   1213                     &mut owner_auth,
   1214                     UnixTimestamp::new(120)
   1215                 )
   1216                 .await
   1217                 .expect("owner auth"),
   1218             vec![RelayMessage::Ok {
   1219                 event_id: owner_auth_event.id().clone(),
   1220                 accepted: true,
   1221                 message: String::new()
   1222             }]
   1223         );
   1224         let create =
   1225             tangle_v2_group_create_event(FixtureKey::Owner, "RedactedFarm", 121, &["private"])
   1226                 .expect("create");
   1227         assert_eq!(
   1228             runtime
   1229                 .handle_protocol_client_message_for_test(
   1230                     ClientMessage::Event(create.clone()),
   1231                     &mut owner_auth,
   1232                     UnixTimestamp::new(121)
   1233                 )
   1234                 .await
   1235                 .expect("create"),
   1236             vec![RelayMessage::Ok {
   1237                 event_id: create.id().clone(),
   1238                 accepted: true,
   1239                 message: String::new()
   1240             }]
   1241         );
   1242         let public_event =
   1243             tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "public")
   1244                 .expect("public");
   1245         assert_eq!(
   1246             runtime
   1247                 .handle_protocol_client_message_for_test(
   1248                     ClientMessage::Event(public_event.clone()),
   1249                     &mut owner_auth,
   1250                     UnixTimestamp::new(122)
   1251                 )
   1252                 .await
   1253                 .expect("public event"),
   1254             vec![RelayMessage::Ok {
   1255                 event_id: public_event.id().clone(),
   1256                 accepted: true,
   1257                 message: String::new()
   1258             }]
   1259         );
   1260         let private_event =
   1261             tangle_v2_group_event(FixtureKey::Owner, "RedactedFarm", 123, 1, "private")
   1262                 .expect("private");
   1263         assert_eq!(
   1264             runtime
   1265                 .handle_protocol_client_message_for_test(
   1266                     ClientMessage::Event(private_event.clone()),
   1267                     &mut owner_auth,
   1268                     UnixTimestamp::new(123)
   1269                 )
   1270                 .await
   1271                 .expect("private event"),
   1272             vec![RelayMessage::Ok {
   1273                 event_id: private_event.id().clone(),
   1274                 accepted: true,
   1275                 message: String::new()
   1276             }]
   1277         );
   1278 
   1279         let events = runtime.subscribe_events().await;
   1280         let mut session = TangleWebSocketSession::new(
   1281             session_limits(8),
   1282             shutdown.subscribe(),
   1283             runtime.clone(),
   1284             runtime.auth_state().await.expect("session auth"),
   1285             events,
   1286         )
   1287         .expect("session");
   1288         let subscription_id = SubscriptionId::new("redacted-req").expect("subscription");
   1289         assert_eq!(
   1290             session
   1291                 .handle_protocol_client_message_for_test(ClientMessage::Req {
   1292                     subscription_id: subscription_id.clone(),
   1293                     filters: vec![filter_from_value(&json!({"kinds":[1]})).expect("filter")],
   1294                 })
   1295                 .await
   1296                 .expect("redacted req"),
   1297             vec![
   1298                 RelayMessage::Event {
   1299                     subscription_id: subscription_id.clone(),
   1300                     event: public_event
   1301                 },
   1302                 RelayMessage::Closed {
   1303                     subscription_id,
   1304                     message: "auth-required: authentication required to read group events"
   1305                         .to_owned()
   1306                 }
   1307             ]
   1308         );
   1309         assert_eq!(session.active_subscription_count(), 0);
   1310 
   1311         let _ = std::fs::remove_dir_all(root);
   1312     }
   1313 
   1314     #[tokio::test]
   1315     async fn websocket_session_preserves_chorus_close_scope_parity() {
   1316         let shutdown = TangleShutdownSignal::new();
   1317         let root = temp_root("chorus-close-scope-parity");
   1318         let _ = std::fs::remove_dir_all(&root);
   1319         let runtime =
   1320             RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root)).expect("runtime"));
   1321         let metrics = runtime.metrics();
   1322         let auth_a = runtime.auth_state().await.expect("auth a");
   1323         let auth_b = runtime.auth_state().await.expect("auth b");
   1324         let events_a = runtime.subscribe_events().await;
   1325         let events_b = runtime.subscribe_events().await;
   1326         let mut first = TangleWebSocketSession::new(
   1327             session_limits(8),
   1328             shutdown.subscribe(),
   1329             runtime.clone(),
   1330             auth_a,
   1331             events_a,
   1332         )
   1333         .expect("first");
   1334         let mut second = TangleWebSocketSession::new(
   1335             session_limits(8),
   1336             shutdown.subscribe(),
   1337             runtime,
   1338             auth_b,
   1339             events_b,
   1340         )
   1341         .expect("second");
   1342         let subscription_id = SubscriptionId::new("shared-close").expect("subscription");
   1343         let req_text = json!(["REQ", subscription_id.as_str(), {"kinds":[1]}]).to_string();
   1344 
   1345         assert_eq!(
   1346             first.dispatch_text(&req_text).await,
   1347             TangleSessionControl::Continue
   1348         );
   1349         assert_eq!(
   1350             take_outbound_text(&mut first),
   1351             RelayMessage::Eose(subscription_id.clone()).encode()
   1352         );
   1353         assert_eq!(
   1354             second.dispatch_text(&req_text).await,
   1355             TangleSessionControl::Continue
   1356         );
   1357         assert_eq!(
   1358             take_outbound_text(&mut second),
   1359             RelayMessage::Eose(subscription_id.clone()).encode()
   1360         );
   1361         assert_eq!(first.active_subscription_count(), 1);
   1362         assert_eq!(second.active_subscription_count(), 1);
   1363 
   1364         let close_text = json!(["CLOSE", subscription_id.as_str()]).to_string();
   1365         assert_eq!(
   1366             first.dispatch_text(&close_text).await,
   1367             TangleSessionControl::Continue
   1368         );
   1369         assert!(first.outbound_receiver.try_recv().is_err());
   1370         assert_eq!(
   1371             first.dispatch_text(&close_text).await,
   1372             TangleSessionControl::Continue
   1373         );
   1374         assert!(first.outbound_receiver.try_recv().is_err());
   1375         assert_eq!(first.active_subscription_count(), 0);
   1376         assert_eq!(second.active_subscription_count(), 1);
   1377 
   1378         let event = tangle_v2_event(
   1379             FixtureKey::Member,
   1380             1_714_124_433,
   1381             1,
   1382             Vec::new(),
   1383             "close scope parity",
   1384         )
   1385         .expect("event");
   1386         assert_eq!(
   1387             first
   1388                 .dispatch_text(&json!(["EVENT", event_to_value(&event)]).to_string())
   1389                 .await,
   1390             TangleSessionControl::Continue
   1391         );
   1392         assert_eq!(
   1393             take_outbound_text(&mut first),
   1394             RelayMessage::Ok {
   1395                 event_id: event.id().clone(),
   1396                 accepted: true,
   1397                 message: String::new()
   1398             }
   1399             .encode()
   1400         );
   1401 
   1402         let first_offset = first.events.recv().await;
   1403         let second_offset = second.events.recv().await;
   1404         assert_eq!(
   1405             first.handle_event_receive_result(first_offset).await,
   1406             TangleSessionControl::Continue
   1407         );
   1408         assert!(first.outbound_receiver.try_recv().is_err());
   1409         assert_eq!(
   1410             second.handle_event_receive_result(second_offset).await,
   1411             TangleSessionControl::Continue
   1412         );
   1413         assert_relay_message_text(
   1414             &take_outbound_text(&mut second),
   1415             RelayMessage::Event {
   1416                 subscription_id: subscription_id.clone(),
   1417                 event,
   1418             },
   1419         );
   1420         let snapshot = metrics.snapshot();
   1421         assert_eq!(snapshot.client_messages(), 5);
   1422         assert_eq!(snapshot.event_messages(), 1);
   1423         assert_eq!(snapshot.req_messages(), 2);
   1424         assert_eq!(snapshot.close_messages(), 2);
   1425         assert_eq!(snapshot.opened_subscriptions(), 2);
   1426         assert_eq!(snapshot.closed_subscriptions(), 1);
   1427 
   1428         let _ = std::fs::remove_dir_all(root);
   1429     }
   1430 
   1431     #[tokio::test]
   1432     async fn websocket_session_rate_limited_req_does_not_subscribe() {
   1433         let shutdown = TangleShutdownSignal::new();
   1434         let root = temp_root("rate-limited-req");
   1435         let _ = std::fs::remove_dir_all(&root);
   1436         let runtime = RelayRuntime::open(runtime_config(&root)).expect("runtime");
   1437         let rule = runtime.config().rate_limits().req().per_connection();
   1438         let runtime = RelayRuntimeHandle::new(runtime);
   1439         let auth = runtime.auth_state().await.expect("auth");
   1440         let events = runtime.subscribe_events().await;
   1441         let now = current_unix_timestamp();
   1442         let mut session = TangleWebSocketSession::new(
   1443             session_limits(8),
   1444             shutdown.subscribe(),
   1445             runtime.clone(),
   1446             auth,
   1447             events,
   1448         )
   1449         .expect("session");
   1450         let key = TangleRateLimitKey::connection(TangleRateLimitScope::Req, session.connection_id);
   1451         let limiter = runtime.rate_limiter().await;
   1452         for _ in 0..rule.max_hits() {
   1453             limiter.record(key.clone(), rule, now);
   1454         }
   1455         let subscription_id = SubscriptionId::new("limited").expect("subscription");
   1456 
   1457         assert_eq!(
   1458             session
   1459                 .handle_protocol_client_message_for_test(ClientMessage::Req {
   1460                     subscription_id: subscription_id.clone(),
   1461                     filters: vec![
   1462                         filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter")
   1463                     ]
   1464                 })
   1465                 .await
   1466                 .expect("req"),
   1467             vec![RelayMessage::Closed {
   1468                 subscription_id,
   1469                 message: format!(
   1470                     "rate-limited: req connection rate limit exceeded until {}",
   1471                     now.as_u64() + 60
   1472                 )
   1473             }]
   1474         );
   1475         assert_eq!(session.active_subscription_count(), 0);
   1476         let snapshot = runtime.metrics().snapshot();
   1477         assert_eq!(snapshot.client_messages(), 1);
   1478         assert_eq!(snapshot.req_messages(), 1);
   1479         assert_eq!(snapshot.opened_subscriptions(), 0);
   1480         assert_eq!(snapshot.rate_limit_rejections(), 1);
   1481 
   1482         let _ = std::fs::remove_dir_all(root);
   1483     }
   1484 
   1485     #[tokio::test]
   1486     async fn websocket_session_closes_when_event_receiver_lags() {
   1487         let shutdown = TangleShutdownSignal::new();
   1488         let root = temp_root("event-receiver-lag");
   1489         let _ = std::fs::remove_dir_all(&root);
   1490         let runtime =
   1491             RelayRuntime::open(runtime_config_with_outbound_queue(&root, 1)).expect("runtime");
   1492         let auth = runtime.auth_state().expect("auth");
   1493         let events = runtime.event_bus().subscribe();
   1494         assert_eq!(runtime.event_bus().publish(StoreOffset::new(1)), 1);
   1495         assert_eq!(runtime.event_bus().publish(StoreOffset::new(2)), 1);
   1496         let runtime = RelayRuntimeHandle::new(runtime);
   1497         let metrics = runtime.metrics();
   1498         let mut session = TangleWebSocketSession::new(
   1499             session_limits(1),
   1500             shutdown.subscribe(),
   1501             runtime,
   1502             auth,
   1503             events,
   1504         )
   1505         .expect("session");
   1506         let event = session.events.recv().await;
   1507 
   1508         assert_eq!(
   1509             session.handle_event_receive_result(event).await,
   1510             TangleSessionControl::Close(event_stream_lag_close_message())
   1511         );
   1512         assert_eq!(metrics.event_bus_lagged_receivers(), 1);
   1513         assert_eq!(metrics.event_bus_lagged_offsets(), 1);
   1514 
   1515         let _ = std::fs::remove_dir_all(root);
   1516     }
   1517 
   1518     #[tokio::test]
   1519     async fn websocket_session_preserves_chorus_live_fanout_backpressure_parity() {
   1520         let shutdown = TangleShutdownSignal::new();
   1521         let live_root = temp_root("chorus-live-fanout-parity");
   1522         let _ = std::fs::remove_dir_all(&live_root);
   1523         let runtime = RelayRuntimeHandle::new(
   1524             RelayRuntime::open(runtime_config_with_outbound_queue(&live_root, 1)).expect("runtime"),
   1525         );
   1526         let metrics = runtime.metrics();
   1527         let auth = runtime.auth_state().await.expect("auth");
   1528         let events = runtime.subscribe_events().await;
   1529         let mut session = TangleWebSocketSession::new(
   1530             session_limits(1),
   1531             shutdown.subscribe(),
   1532             runtime,
   1533             auth,
   1534             events,
   1535         )
   1536         .expect("session");
   1537         let subscription_id = SubscriptionId::new("chorus-live").expect("subscription");
   1538         let req_text = json!(["REQ", subscription_id.as_str(), {"kinds":[1]}]).to_string();
   1539 
   1540         assert_eq!(
   1541             session.dispatch_text(&req_text).await,
   1542             TangleSessionControl::Continue
   1543         );
   1544         assert_eq!(
   1545             take_outbound_text(&mut session),
   1546             RelayMessage::Eose(subscription_id.clone()).encode()
   1547         );
   1548         for index in 0..3 {
   1549             let content = format!("chorus live {index}");
   1550             let event = tangle_v2_event(
   1551                 FixtureKey::Member,
   1552                 1_714_124_433 + index,
   1553                 1,
   1554                 Vec::new(),
   1555                 &content,
   1556             )
   1557             .expect("event");
   1558             assert_eq!(
   1559                 session
   1560                     .dispatch_text(&json!(["EVENT", event_to_value(&event)]).to_string())
   1561                     .await,
   1562                 TangleSessionControl::Continue
   1563             );
   1564             assert_eq!(
   1565                 take_outbound_text(&mut session),
   1566                 RelayMessage::Ok {
   1567                     event_id: event.id().clone(),
   1568                     accepted: true,
   1569                     message: String::new()
   1570                 }
   1571                 .encode()
   1572             );
   1573             let offset = session.events.recv().await;
   1574             assert_eq!(
   1575                 session.handle_event_receive_result(offset).await,
   1576                 TangleSessionControl::Continue
   1577             );
   1578             assert_relay_message_text(
   1579                 &take_outbound_text(&mut session),
   1580                 RelayMessage::Event {
   1581                     subscription_id: subscription_id.clone(),
   1582                     event,
   1583                 },
   1584             );
   1585             assert_eq!(session.active_subscription_count(), 1);
   1586         }
   1587         assert_eq!(metrics.outbound_queue_full_closes(), 0);
   1588         assert_eq!(metrics.event_bus_lagged_receivers(), 0);
   1589         assert_eq!(metrics.event_bus_lagged_offsets(), 0);
   1590         let _ = std::fs::remove_dir_all(live_root);
   1591 
   1592         let lag_root = temp_root("chorus-live-lag-parity");
   1593         let _ = std::fs::remove_dir_all(&lag_root);
   1594         let runtime =
   1595             RelayRuntime::open(runtime_config_with_outbound_queue(&lag_root, 1)).expect("runtime");
   1596         let auth = runtime.auth_state().expect("auth");
   1597         let events = runtime.event_bus().subscribe();
   1598         assert_eq!(runtime.event_bus().publish(StoreOffset::new(1)), 1);
   1599         assert_eq!(runtime.event_bus().publish(StoreOffset::new(2)), 1);
   1600         let runtime = RelayRuntimeHandle::new(runtime);
   1601         let metrics = runtime.metrics();
   1602         let mut lagged = TangleWebSocketSession::new(
   1603             session_limits(1),
   1604             shutdown.subscribe(),
   1605             runtime,
   1606             auth,
   1607             events,
   1608         )
   1609         .expect("lagged");
   1610         let event = lagged.events.recv().await;
   1611         assert_eq!(
   1612             lagged.handle_event_receive_result(event).await,
   1613             TangleSessionControl::Close(event_stream_lag_close_message())
   1614         );
   1615         assert_eq!(metrics.event_bus_lagged_receivers(), 1);
   1616         assert_eq!(metrics.event_bus_lagged_offsets(), 1);
   1617         let _ = std::fs::remove_dir_all(lag_root);
   1618 
   1619         let (runtime, auth, events) = session_runtime("chorus-outbound-full-parity");
   1620         let metrics = runtime.metrics();
   1621         let mut blocked = TangleWebSocketSession::new(
   1622             session_limits(1),
   1623             shutdown.subscribe(),
   1624             runtime,
   1625             auth,
   1626             events,
   1627         )
   1628         .expect("blocked");
   1629         blocked
   1630             .outbound()
   1631             .try_send(Message::Text("blocked".into()))
   1632             .expect("fill queue");
   1633         assert_eq!(
   1634             blocked.dispatch_text("{").await,
   1635             TangleSessionControl::Close(outbound_queue_full_close_message())
   1636         );
   1637         assert_eq!(metrics.outbound_queue_full_closes(), 1);
   1638     }
   1639 
   1640     #[test]
   1641     fn outbound_queue_is_bounded() {
   1642         let shutdown = TangleShutdownSignal::new();
   1643         let (runtime, auth, events) = session_runtime("outbound-queue");
   1644         let session = TangleWebSocketSession::new(
   1645             session_limits(1),
   1646             shutdown.subscribe(),
   1647             runtime,
   1648             auth,
   1649             events,
   1650         )
   1651         .expect("session");
   1652         let outbound = session.outbound();
   1653 
   1654         assert_eq!(outbound.capacity(), 1);
   1655         outbound
   1656             .try_send(Message::Text("first".into()))
   1657             .expect("first");
   1658         assert_eq!(
   1659             outbound
   1660                 .try_send(Message::Text("second".into()))
   1661                 .expect_err("full"),
   1662             TangleOutboundQueueError::Full
   1663         );
   1664     }
   1665 
   1666     #[tokio::test]
   1667     async fn websocket_session_closes_when_outbound_queue_is_full() {
   1668         let shutdown = TangleShutdownSignal::new();
   1669         let (runtime, auth, events) = session_runtime("outbound-queue-full-close");
   1670         let metrics = runtime.metrics();
   1671         let mut session = TangleWebSocketSession::new(
   1672             session_limits(1),
   1673             shutdown.subscribe(),
   1674             runtime,
   1675             auth,
   1676             events,
   1677         )
   1678         .expect("session");
   1679         session
   1680             .outbound()
   1681             .try_send(Message::Text("blocked".into()))
   1682             .expect("fill queue");
   1683 
   1684         assert_eq!(
   1685             session.dispatch_text("{").await,
   1686             TangleSessionControl::Close(outbound_queue_full_close_message())
   1687         );
   1688         assert_eq!(metrics.outbound_queue_full_closes(), 1);
   1689     }
   1690 
   1691     fn tangle_v2_event(
   1692         key: FixtureKey,
   1693         created_at: u64,
   1694         kind: u64,
   1695         tags: Vec<Tag>,
   1696         content: &str,
   1697     ) -> Result<Event, String> {
   1698         let event = session_pocket_event(key, created_at, kind, tags, content);
   1699         session_pocket_event_to_protocol(&event)
   1700     }
   1701 
   1702     fn tangle_v2_auth_event(
   1703         key: FixtureKey,
   1704         challenge: &str,
   1705         created_at: u64,
   1706     ) -> Result<Event, String> {
   1707         tangle_v2_event(
   1708             key,
   1709             created_at,
   1710             22_242,
   1711             vec![
   1712                 Tag::from_parts("relay", &["wss://relay.radroots.test"])?,
   1713                 Tag::from_parts("challenge", &[challenge])?,
   1714             ],
   1715             "",
   1716         )
   1717     }
   1718 
   1719     fn tangle_v2_group_create_event(
   1720         key: FixtureKey,
   1721         group_id: &str,
   1722         created_at: u64,
   1723         flags: &[&str],
   1724     ) -> Result<Event, String> {
   1725         let mut tags = vec![
   1726             Tag::from_parts("h", &[group_id])?,
   1727             Tag::from_parts("name", &[group_id])?,
   1728         ];
   1729         for flag in flags {
   1730             tags.push(Tag::from_parts(flag, &[])?);
   1731         }
   1732         tangle_v2_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "")
   1733     }
   1734 
   1735     fn tangle_v2_group_event(
   1736         key: FixtureKey,
   1737         group_id: &str,
   1738         created_at: u64,
   1739         kind: u64,
   1740         content: &str,
   1741     ) -> Result<Event, String> {
   1742         tangle_v2_event(
   1743             key,
   1744             created_at,
   1745             kind,
   1746             vec![Tag::from_parts("h", &[group_id])?],
   1747             content,
   1748         )
   1749     }
   1750 
   1751     fn session_pocket_event(
   1752         key: FixtureKey,
   1753         created_at: u64,
   1754         kind: u64,
   1755         tags: Vec<Tag>,
   1756         content: &str,
   1757     ) -> PocketOwnedEvent {
   1758         let tags = session_pocket_tags_from_protocol(&tags);
   1759         let secret = format!("{:02x}", fixture_secret_byte(key)).repeat(32);
   1760         RelaySigner::from_secret_hex(&secret)
   1761             .expect("signer")
   1762             .sign_pocket_event(
   1763                 PocketKind::from_u16(u16::try_from(kind).expect("pocket kind")),
   1764                 &tags,
   1765                 PocketTime::from_u64(created_at),
   1766                 content.as_bytes(),
   1767             )
   1768             .expect("pocket event")
   1769     }
   1770 
   1771     fn session_pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags {
   1772         let parts = tags
   1773             .iter()
   1774             .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>())
   1775             .collect::<Vec<_>>();
   1776         PocketOwnedTags::new(&parts).expect("pocket tags")
   1777     }
   1778 
   1779     fn session_pocket_event_to_protocol(event: &PocketEvent) -> Result<Event, String> {
   1780         let tags = event
   1781             .tags()
   1782             .map_err(|error| error.to_string())?
   1783             .iter()
   1784             .map(|tag| {
   1785                 Tag::new(
   1786                     tag.map(|value| {
   1787                         std::str::from_utf8(value)
   1788                             .map(str::to_owned)
   1789                             .map_err(|error| error.to_string())
   1790                     })
   1791                     .collect::<Result<Vec<_>, _>>()?,
   1792                 )
   1793                 .map_err(|error| error.to_string())
   1794             })
   1795             .collect::<Result<Vec<_>, _>>()?;
   1796         Ok(Event::new(
   1797             EventId::new(&event.id().as_hex_string()).map_err(|error| error.to_string())?,
   1798             UnsignedEvent::new(
   1799                 PublicKeyHex::new(&event.pubkey().as_hex_string())
   1800                     .map_err(|error| error.to_string())?,
   1801                 UnixTimestamp::new(event.created_at().as_u64()),
   1802                 Kind::new(u64::from(event.kind().as_u16())).map_err(|error| error.to_string())?,
   1803                 tags,
   1804                 std::str::from_utf8(event.content()).map_err(|error| error.to_string())?,
   1805             ),
   1806             SignatureHex::new(&event.sig().to_string()).map_err(|error| error.to_string())?,
   1807         ))
   1808     }
   1809 
   1810     fn fixture_secret_byte(key: FixtureKey) -> u8 {
   1811         match key {
   1812             FixtureKey::Relay => 9,
   1813             FixtureKey::Owner => 10,
   1814             FixtureKey::Admin => 11,
   1815             FixtureKey::Member => 12,
   1816             FixtureKey::Outsider => 13,
   1817         }
   1818     }
   1819 
   1820     fn session_runtime(
   1821         name: &str,
   1822     ) -> (
   1823         RelayRuntimeHandle,
   1824         crate::relay::auth::BaseAuthState,
   1825         TangleEventReceiver,
   1826     ) {
   1827         let root = temp_root(name);
   1828         let _ = std::fs::remove_dir_all(&root);
   1829         let runtime = RelayRuntime::open(runtime_config(&root)).expect("runtime");
   1830         let auth = runtime.auth_state().expect("auth");
   1831         let events = runtime.event_bus().subscribe();
   1832         (RelayRuntimeHandle::new(runtime), auth, events)
   1833     }
   1834 
   1835     fn req(subscription_id: SubscriptionId) -> ClientMessage {
   1836         ClientMessage::Req {
   1837             subscription_id,
   1838             filters: vec![Filter::empty()],
   1839         }
   1840     }
   1841 
   1842     fn take_outbound_text(session: &mut TangleWebSocketSession) -> String {
   1843         let message = session.outbound_receiver.try_recv().expect("message");
   1844         let Message::Text(text) = message else {
   1845             panic!("expected text message")
   1846         };
   1847         text.to_string()
   1848     }
   1849 
   1850     fn assert_relay_message_text(actual: &str, expected: RelayMessage) {
   1851         assert_eq!(
   1852             serde_json::from_str::<serde_json::Value>(actual).expect("actual relay JSON"),
   1853             serde_json::from_str::<serde_json::Value>(&expected.encode())
   1854                 .expect("expected relay JSON")
   1855         );
   1856     }
   1857 
   1858     fn runtime_config(root: &Path) -> BaseRelayRuntimeConfig {
   1859         runtime_config_with_outbound_queue(root, 8)
   1860     }
   1861 
   1862     fn runtime_config_with_groups(root: &Path) -> BaseRelayRuntimeConfig {
   1863         let raw = json!({
   1864             "server": {
   1865                 "listen_addr": "127.0.0.1:0",
   1866                 "relay_url": "wss://relay.radroots.test"
   1867             },
   1868             "pocket": {
   1869                 "data_directory": root.join("pocket"),
   1870                 "sync_policy": "flush_on_shutdown",
   1871                 "query": {
   1872                   "allow_scraping": false,
   1873                   "allow_scrape_if_limited_to": 100,
   1874                   "allow_scrape_if_max_seconds": 3600
   1875                 }
   1876             },
   1877             "groups": {
   1878                 "enabled": true,
   1879                 "canonical_relay_url": "wss://relay.radroots.test",
   1880                 "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777",
   1881                 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()],
   1882                 "policy": {
   1883                     "public_join": false,
   1884                     "invites_enabled": false
   1885                 }
   1886             },
   1887             "auth": {
   1888                 "challenge_ttl_seconds": 300,
   1889                 "created_at_skew_seconds": 600
   1890             },
   1891             "limits": {
   1892                 "max_message_length": 1048576,
   1893                 "max_subid_length": 64,
   1894                 "max_subscriptions_per_connection": 64,
   1895                 "max_filters_per_request": 10,
   1896                 "max_tag_values_per_filter": 100,
   1897                 "max_query_complexity": 2048,
   1898                 "max_limit": 500,
   1899                 "default_limit": 100,
   1900                 "max_event_tags": 200,
   1901                 "max_content_length": 65536,
   1902                 "broadcast_channel_capacity": 8,
   1903                 "per_connection_outbound_queue": 8
   1904             },
   1905             "rate_limits": {
   1906                 "auth": {
   1907                     "per_ip": {"window_seconds": 60, "max_hits": 120},
   1908                     "per_pubkey": {"window_seconds": 60, "max_hits": 30},
   1909                     "failures": {"window_seconds": 300, "max_hits": 5},
   1910                     "failures_per_ip": {"window_seconds": 300, "max_hits": 20}
   1911                 },
   1912                 "event": {
   1913                     "per_ip": {"window_seconds": 60, "max_hits": 600},
   1914                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
   1915                     "per_kind": {"window_seconds": 60, "max_hits": 1000}
   1916                 },
   1917                 "group": {
   1918                     "write_per_ip": {"window_seconds": 60, "max_hits": 300},
   1919                     "write_per_pubkey": {"window_seconds": 60, "max_hits": 60},
   1920                     "write_per_group": {"window_seconds": 60, "max_hits": 90},
   1921                     "write_per_kind": {"window_seconds": 60, "max_hits": 300},
   1922                     "join_flow": {"window_seconds": 300, "max_hits": 10},
   1923                     "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30}
   1924                 },
   1925                 "req": {
   1926                     "per_ip": {"window_seconds": 60, "max_hits": 600},
   1927                     "per_connection": {"window_seconds": 60, "max_hits": 120},
   1928                     "per_pubkey": {"window_seconds": 60, "max_hits": 240},
   1929                     "per_group": {"window_seconds": 60, "max_hits": 240},
   1930                     "per_kind": {"window_seconds": 60, "max_hits": 500},
   1931                     "broad": {"window_seconds": 60, "max_hits": 30}
   1932                 },
   1933                 "count": {
   1934                     "per_ip": {"window_seconds": 60, "max_hits": 300},
   1935                     "per_connection": {"window_seconds": 60, "max_hits": 60},
   1936                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
   1937                     "per_group": {"window_seconds": 60, "max_hits": 120},
   1938                     "per_kind": {"window_seconds": 60, "max_hits": 240},
   1939                     "broad": {"window_seconds": 60, "max_hits": 20}
   1940                 }
   1941             }
   1942         })
   1943         .to_string();
   1944         parse_base_relay_runtime_config_json(&raw).expect("config")
   1945     }
   1946 
   1947     fn runtime_config_with_outbound_queue(
   1948         root: &Path,
   1949         per_connection_outbound_queue: usize,
   1950     ) -> BaseRelayRuntimeConfig {
   1951         let raw = json!({
   1952             "server": {
   1953                 "listen_addr": "127.0.0.1:0",
   1954                 "relay_url": "wss://relay.radroots.test"
   1955             },
   1956             "pocket": {
   1957                 "data_directory": root.join("pocket"),
   1958                 "sync_policy": "flush_on_shutdown",
   1959                 "query": {
   1960                   "allow_scraping": false,
   1961                   "allow_scrape_if_limited_to": 100,
   1962                   "allow_scrape_if_max_seconds": 3600
   1963                 }
   1964             },
   1965             "groups": {
   1966                 "enabled": false
   1967             },
   1968             "auth": {
   1969                 "challenge_ttl_seconds": 300,
   1970                 "created_at_skew_seconds": 600
   1971             },
   1972             "limits": {
   1973                 "max_message_length": 1048576,
   1974                 "max_subid_length": 64,
   1975                 "max_subscriptions_per_connection": 64,
   1976                 "max_filters_per_request": 10,
   1977                 "max_tag_values_per_filter": 100,
   1978                 "max_query_complexity": 2048,
   1979                 "max_limit": 500,
   1980                 "default_limit": 100,
   1981                 "max_event_tags": 200,
   1982                 "max_content_length": 65536,
   1983                 "broadcast_channel_capacity": per_connection_outbound_queue,
   1984                 "per_connection_outbound_queue": per_connection_outbound_queue
   1985             },
   1986             "rate_limits": {
   1987                 "auth": {
   1988                     "per_ip": {"window_seconds": 60, "max_hits": 120},
   1989                     "per_pubkey": {"window_seconds": 60, "max_hits": 30},
   1990                     "failures": {"window_seconds": 300, "max_hits": 5},
   1991                     "failures_per_ip": {"window_seconds": 300, "max_hits": 20}
   1992                 },
   1993                 "event": {
   1994                     "per_ip": {"window_seconds": 60, "max_hits": 600},
   1995                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
   1996                     "per_kind": {"window_seconds": 60, "max_hits": 1000}
   1997                 },
   1998                 "group": {
   1999                     "write_per_ip": {"window_seconds": 60, "max_hits": 300},
   2000                     "write_per_pubkey": {"window_seconds": 60, "max_hits": 60},
   2001                     "write_per_group": {"window_seconds": 60, "max_hits": 90},
   2002                     "write_per_kind": {"window_seconds": 60, "max_hits": 300},
   2003                     "join_flow": {"window_seconds": 300, "max_hits": 10},
   2004                     "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30}
   2005                 },
   2006                 "req": {
   2007                     "per_ip": {"window_seconds": 60, "max_hits": 600},
   2008                     "per_connection": {"window_seconds": 60, "max_hits": 120},
   2009                     "per_pubkey": {"window_seconds": 60, "max_hits": 240},
   2010                     "per_group": {"window_seconds": 60, "max_hits": 240},
   2011                     "per_kind": {"window_seconds": 60, "max_hits": 500},
   2012                     "broad": {"window_seconds": 60, "max_hits": 30}
   2013                 },
   2014                 "count": {
   2015                     "per_ip": {"window_seconds": 60, "max_hits": 300},
   2016                     "per_connection": {"window_seconds": 60, "max_hits": 60},
   2017                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
   2018                     "per_group": {"window_seconds": 60, "max_hits": 120},
   2019                     "per_kind": {"window_seconds": 60, "max_hits": 240},
   2020                     "broad": {"window_seconds": 60, "max_hits": 20}
   2021                 }
   2022             }
   2023         })
   2024         .to_string();
   2025         parse_base_relay_runtime_config_json(&raw).expect("config")
   2026     }
   2027 
   2028     fn session_limits(per_connection_outbound_queue: usize) -> TangleRuntimeLimits {
   2029         session_limits_result(per_connection_outbound_queue).expect("limits")
   2030     }
   2031 
   2032     fn session_limits_with_message_length(
   2033         max_message_length: usize,
   2034         per_connection_outbound_queue: usize,
   2035     ) -> TangleRuntimeLimits {
   2036         TangleRuntimeLimits::new(
   2037             max_message_length,
   2038             BaseRelayLimits::new(BaseRelayLimitSettings {
   2039                 max_pending_events: per_connection_outbound_queue,
   2040                 max_subscription_id_length: 64,
   2041                 max_subscriptions: 64,
   2042                 max_filters_per_request: 10,
   2043                 max_tag_values_per_filter: 100,
   2044                 max_query_complexity: 610,
   2045                 max_event_tags: 200,
   2046                 max_content_length: 65_536,
   2047                 max_limit: 500,
   2048                 default_limit: 100,
   2049             })
   2050             .expect("relay limits"),
   2051             16,
   2052             per_connection_outbound_queue,
   2053         )
   2054         .expect("limits")
   2055     }
   2056 
   2057     fn session_limits_result(
   2058         per_connection_outbound_queue: usize,
   2059     ) -> Result<TangleRuntimeLimits, BaseRelayError> {
   2060         TangleRuntimeLimits::new(
   2061             1_048_576,
   2062             BaseRelayLimits::new(BaseRelayLimitSettings {
   2063                 max_pending_events: per_connection_outbound_queue,
   2064                 max_subscription_id_length: 64,
   2065                 max_subscriptions: 64,
   2066                 max_filters_per_request: 10,
   2067                 max_tag_values_per_filter: 100,
   2068                 max_query_complexity: 610,
   2069                 max_event_tags: 200,
   2070                 max_content_length: 65_536,
   2071                 max_limit: 500,
   2072                 default_limit: 100,
   2073             })?,
   2074             16,
   2075             per_connection_outbound_queue,
   2076         )
   2077     }
   2078 
   2079     fn temp_root(name: &str) -> PathBuf {
   2080         std::env::temp_dir().join(format!("tangle-session-{name}-{}", std::process::id()))
   2081     }
   2082 }