tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

runtime.rs (202887B)


      1 #![forbid(unsafe_code)]
      2 
      3 #[cfg(test)]
      4 use crate::relay::outbound::protocol_messages_for_test;
      5 use crate::{
      6     client_message::RuntimeClientMessage,
      7     config::BaseRelayRuntimeConfig,
      8     errors::BaseRelayError,
      9     event_bus::{TangleEventBus, TangleEventReceiver},
     10     groups::GroupServiceHandle,
     11     logging,
     12     ops::{BaseRelayReadinessHandle, BaseRelayReadinessState},
     13     pocket_event_validation::{pocket_event_id, pocket_event_kind, pocket_event_pubkey},
     14     rate_limits::{
     15         TangleQueryRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey,
     16         TangleRateLimitQueryClass, TangleRateLimitRule, TangleRateLimitScope, TangleRateLimiter,
     17     },
     18     relay::{
     19         auth::BaseAuthState,
     20         core::{
     21             BaseRelay, BaseRelayCountQuery, BaseRelayCountReport, BaseRelayEventWrite,
     22             BaseRelayLimits, BaseRelayQueryMetrics, BaseRelayQueryReport, BaseRelayReqQuery,
     23             BaseRelayShutdownReport,
     24         },
     25         live::LiveSubscriptionSet,
     26         outbound::{RuntimeRelayMessage, protocol_control_messages},
     27     },
     28 };
     29 use serde::{Deserialize, Serialize};
     30 use std::{
     31     collections::BTreeSet,
     32     fmt, fs,
     33     net::IpAddr,
     34     path::Path,
     35     str,
     36     sync::{
     37         Arc,
     38         atomic::{AtomicU64, AtomicUsize, Ordering},
     39     },
     40     time::Instant,
     41 };
     42 use tangle_groups::{
     43     GroupAuthContext, GroupEventClass, GroupId, KIND_GROUP_JOIN_REQUEST, StoreOffset,
     44     validate_client_group_event_structure,
     45 };
     46 use tangle_protocol::{Kind, RelayMessage, SubscriptionId, UnixTimestamp};
     47 use tangle_store_pocket::{
     48     PocketEvent, PocketFilter, PocketOwnedEvent, PocketOwnedFilter, PocketStoreHandle, PocketTime,
     49 };
     50 use tokio::sync::watch;
     51 
     52 pub struct RelayRuntime {
     53     config: BaseRelayRuntimeConfig,
     54     relay: BaseRelay,
     55     readiness: BaseRelayReadinessHandle,
     56     limits: TangleRuntimeLimits,
     57     event_bus: TangleEventBus,
     58     rate_limiter: TangleRateLimiter,
     59     metrics: TangleRuntimeMetrics,
     60     shutdown: TangleShutdownSignal,
     61     hooks: Arc<dyn RelayRuntimeHooks>,
     62 }
     63 
     64 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
     65 pub struct TangleClientRateLimitContext {
     66     peer_ip: Option<IpAddr>,
     67     connection_id: Option<u64>,
     68 }
     69 
     70 impl TangleClientRateLimitContext {
     71     pub fn new(peer_ip: Option<IpAddr>, connection_id: Option<u64>) -> Self {
     72         Self {
     73             peer_ip,
     74             connection_id,
     75         }
     76     }
     77 
     78     pub fn peer_ip(self) -> Option<IpAddr> {
     79         self.peer_ip
     80     }
     81 
     82     pub fn connection_id(self) -> Option<u64> {
     83         self.connection_id
     84     }
     85 }
     86 
     87 pub trait RelayRuntimeHooks: Send + Sync {
     88     fn admit_event(&self, _context: &RelayEventAdmissionContext) -> EventAdmissionDecision {
     89         EventAdmissionDecision::Accept
     90     }
     91 
     92     fn event_stored(&self, _context: &RelayEventStoredContext) {}
     93 }
     94 
     95 #[derive(Debug, Default)]
     96 pub struct NoopRelayRuntimeHooks;
     97 
     98 impl RelayRuntimeHooks for NoopRelayRuntimeHooks {}
     99 
    100 #[derive(Debug, Clone, PartialEq, Eq)]
    101 pub enum EventAdmissionDecision {
    102     Accept,
    103     Reject { message: String },
    104 }
    105 
    106 impl EventAdmissionDecision {
    107     pub fn reject(message: impl Into<String>) -> Self {
    108         Self::Reject {
    109             message: message.into(),
    110         }
    111     }
    112 }
    113 
    114 #[derive(Debug, Clone, PartialEq, Eq)]
    115 pub struct RelayEventContext {
    116     event_id: String,
    117     pubkey: String,
    118     created_at: u64,
    119     kind: u32,
    120     tags: Vec<Vec<String>>,
    121     content: String,
    122 }
    123 
    124 impl RelayEventContext {
    125     pub fn new(
    126         event_id: String,
    127         pubkey: String,
    128         created_at: u64,
    129         kind: u32,
    130         tags: Vec<Vec<String>>,
    131         content: String,
    132     ) -> Self {
    133         Self {
    134             event_id,
    135             pubkey,
    136             created_at,
    137             kind,
    138             tags,
    139             content,
    140         }
    141     }
    142 
    143     fn from_pocket_event(event: &PocketEvent) -> Result<Self, BaseRelayError> {
    144         let tags = event
    145             .tags()
    146             .map_err(|error| BaseRelayError::invalid(error.to_string()))?
    147             .iter()
    148             .map(|tag| {
    149                 tag.map(|value| {
    150                     str::from_utf8(value)
    151                         .map(str::to_owned)
    152                         .map_err(|error| BaseRelayError::invalid(error.to_string()))
    153                 })
    154                 .collect::<Result<Vec<_>, _>>()
    155             })
    156             .collect::<Result<Vec<_>, _>>()?;
    157         let content = str::from_utf8(event.content())
    158             .map(str::to_owned)
    159             .map_err(|error| BaseRelayError::invalid(error.to_string()))?;
    160         Ok(Self {
    161             event_id: event.id().to_string(),
    162             pubkey: event.pubkey().to_string(),
    163             created_at: event.created_at().as_u64(),
    164             kind: u32::from(event.kind().as_u16()),
    165             tags,
    166             content,
    167         })
    168     }
    169 
    170     pub fn event_id(&self) -> &str {
    171         &self.event_id
    172     }
    173 
    174     pub fn pubkey(&self) -> &str {
    175         &self.pubkey
    176     }
    177 
    178     pub fn created_at(&self) -> u64 {
    179         self.created_at
    180     }
    181 
    182     pub fn kind(&self) -> u32 {
    183         self.kind
    184     }
    185 
    186     pub fn tags(&self) -> &[Vec<String>] {
    187         &self.tags
    188     }
    189 
    190     pub fn content(&self) -> &str {
    191         &self.content
    192     }
    193 
    194     pub fn has_tag(&self, name: &str, value: &str) -> bool {
    195         self.tags.iter().any(|tag| {
    196             tag.first().is_some_and(|tag_name| tag_name == name)
    197                 && tag.iter().skip(1).any(|tag_value| tag_value == value)
    198         })
    199     }
    200 }
    201 
    202 #[derive(Debug, Clone, PartialEq, Eq)]
    203 pub struct RelayEventAdmissionContext {
    204     event: RelayEventContext,
    205     authenticated_pubkeys: Vec<String>,
    206     peer_ip: Option<IpAddr>,
    207     connection_id: Option<u64>,
    208     now: u64,
    209 }
    210 
    211 impl RelayEventAdmissionContext {
    212     pub fn new(
    213         event: RelayEventContext,
    214         authenticated_pubkeys: Vec<String>,
    215         peer_ip: Option<IpAddr>,
    216         connection_id: Option<u64>,
    217         now: u64,
    218     ) -> Self {
    219         Self {
    220             event,
    221             authenticated_pubkeys,
    222             peer_ip,
    223             connection_id,
    224             now,
    225         }
    226     }
    227 
    228     pub fn event(&self) -> &RelayEventContext {
    229         &self.event
    230     }
    231 
    232     pub fn authenticated_pubkeys(&self) -> &[String] {
    233         &self.authenticated_pubkeys
    234     }
    235 
    236     pub fn peer_ip(&self) -> Option<IpAddr> {
    237         self.peer_ip
    238     }
    239 
    240     pub fn connection_id(&self) -> Option<u64> {
    241         self.connection_id
    242     }
    243 
    244     pub fn now(&self) -> u64 {
    245         self.now
    246     }
    247 }
    248 
    249 #[derive(Debug, Clone, PartialEq, Eq)]
    250 pub struct RelayEventStoredContext {
    251     event: RelayEventContext,
    252     store_offsets: Vec<u64>,
    253 }
    254 
    255 impl RelayEventStoredContext {
    256     pub fn new(event: RelayEventContext, store_offsets: Vec<u64>) -> Self {
    257         Self {
    258             event,
    259             store_offsets,
    260         }
    261     }
    262 
    263     pub fn event(&self) -> &RelayEventContext {
    264         &self.event
    265     }
    266 
    267     pub fn store_offsets(&self) -> &[u64] {
    268         &self.store_offsets
    269     }
    270 }
    271 
    272 struct TanglePocketQueryRateLimitRequest<'a> {
    273     scope: TangleRateLimitScope,
    274     rules: TangleQueryRateLimitConfig,
    275     label: &'static str,
    276     subscription_id: &'a SubscriptionId,
    277     filters: &'a [PocketOwnedFilter],
    278     auth: &'a BaseAuthState,
    279     context: TangleClientRateLimitContext,
    280     now: UnixTimestamp,
    281 }
    282 
    283 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    284 enum TangleQueryClassification {
    285     Bounded,
    286     Broad(TangleBroadQueryReason),
    287 }
    288 
    289 impl TangleQueryClassification {
    290     fn is_broad(self) -> bool {
    291         matches!(self, Self::Broad(_))
    292     }
    293 }
    294 
    295 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    296 enum TangleBroadQueryReason {
    297     EmptyFilters,
    298     MissingPrimaryConstraint,
    299     MissingBoundedSelector,
    300     HighLimit,
    301     BroadTimeWindow,
    302 }
    303 
    304 #[derive(Debug, Clone, Copy)]
    305 struct TangleQueryClassifier {
    306     limits: BaseRelayLimits,
    307 }
    308 
    309 const BROAD_QUERY_TIME_WINDOW_SECONDS: u64 = 31 * 24 * 60 * 60;
    310 
    311 impl TangleQueryClassifier {
    312     fn new(limits: BaseRelayLimits) -> Self {
    313         Self { limits }
    314     }
    315 
    316     fn classify_pocket_query(self, filters: &[PocketOwnedFilter]) -> TangleQueryClassification {
    317         if filters.is_empty() {
    318             return TangleQueryClassification::Broad(TangleBroadQueryReason::EmptyFilters);
    319         }
    320         filters
    321             .iter()
    322             .map(|filter| self.classify_pocket_query_filter(filter))
    323             .find(|classification| classification.is_broad())
    324             .unwrap_or(TangleQueryClassification::Bounded)
    325     }
    326 
    327     fn classify_pocket_count(self, filters: &[PocketOwnedFilter]) -> TangleQueryClassification {
    328         if filters.is_empty() {
    329             return TangleQueryClassification::Broad(TangleBroadQueryReason::EmptyFilters);
    330         }
    331         filters
    332             .iter()
    333             .map(|filter| self.classify_pocket_count_filter(filter))
    334             .find(|classification| classification.is_broad())
    335             .unwrap_or(TangleQueryClassification::Bounded)
    336     }
    337 
    338     fn classify_pocket_query_filter(self, filter: &PocketFilter) -> TangleQueryClassification {
    339         if !self.has_pocket_primary_constraint(filter) {
    340             return TangleQueryClassification::Broad(
    341                 TangleBroadQueryReason::MissingPrimaryConstraint,
    342             );
    343         }
    344         if self.has_pocket_high_limit(filter) {
    345             return TangleQueryClassification::Broad(TangleBroadQueryReason::HighLimit);
    346         }
    347         if self.has_pocket_broad_time_window(filter) && !self.has_pocket_strong_constraint(filter) {
    348             return TangleQueryClassification::Broad(TangleBroadQueryReason::BroadTimeWindow);
    349         }
    350         TangleQueryClassification::Bounded
    351     }
    352 
    353     fn classify_pocket_count_filter(self, filter: &PocketFilter) -> TangleQueryClassification {
    354         if !self.has_pocket_primary_constraint(filter) {
    355             return TangleQueryClassification::Broad(
    356                 TangleBroadQueryReason::MissingPrimaryConstraint,
    357             );
    358         }
    359         if self.has_pocket_high_limit(filter) {
    360             return TangleQueryClassification::Broad(TangleBroadQueryReason::HighLimit);
    361         }
    362         if self.has_pocket_broad_time_window(filter) {
    363             return TangleQueryClassification::Broad(TangleBroadQueryReason::BroadTimeWindow);
    364         }
    365         if !self.has_pocket_count_bounded_selector(filter) {
    366             return TangleQueryClassification::Broad(
    367                 TangleBroadQueryReason::MissingBoundedSelector,
    368             );
    369         }
    370         TangleQueryClassification::Bounded
    371     }
    372 
    373     fn has_pocket_primary_constraint(self, filter: &PocketFilter) -> bool {
    374         filter.num_ids() > 0
    375             || filter.num_authors() > 0
    376             || filter.num_kinds() > 0
    377             || self.has_pocket_group_constraint(filter)
    378     }
    379 
    380     fn has_pocket_strong_constraint(self, filter: &PocketFilter) -> bool {
    381         filter.num_ids() > 0 || filter.num_authors() > 0 || self.has_pocket_group_constraint(filter)
    382     }
    383 
    384     fn has_pocket_count_bounded_selector(self, filter: &PocketFilter) -> bool {
    385         self.has_pocket_strong_constraint(filter)
    386             || (filter.num_kinds() > 0 && self.has_pocket_bounded_time_window(filter))
    387             || self.has_pocket_hll_count_selector(filter)
    388     }
    389 
    390     fn has_pocket_hll_count_selector(self, filter: &PocketFilter) -> bool {
    391         filter
    392             .hyperloglog_offset()
    393             .is_ok_and(|offset| offset.is_some())
    394     }
    395 
    396     fn has_pocket_group_constraint(self, filter: &PocketFilter) -> bool {
    397         filter
    398             .tags()
    399             .map(|tags| {
    400                 tags.iter().any(|mut tag| {
    401                     let name = tag.next();
    402                     let has_value = tag.next().is_some();
    403                     matches!(name, Some(value) if matches!(value, b"h" | b"d")) && has_value
    404                 })
    405             })
    406             .unwrap_or(false)
    407     }
    408 
    409     fn has_pocket_high_limit(self, filter: &PocketFilter) -> bool {
    410         let limit = if filter.limit() == u32::MAX {
    411             self.limits.default_limit()
    412         } else {
    413             u64::from(filter.limit())
    414         };
    415         limit >= self.limits.max_limit()
    416     }
    417 
    418     fn has_pocket_bounded_time_window(self, filter: &PocketFilter) -> bool {
    419         if filter.since() == PocketTime::min() || filter.until() == PocketTime::max() {
    420             return false;
    421         }
    422         filter
    423             .until()
    424             .as_ref()
    425             .saturating_sub(*filter.since().as_ref())
    426             <= BROAD_QUERY_TIME_WINDOW_SECONDS
    427     }
    428 
    429     fn has_pocket_broad_time_window(self, filter: &PocketFilter) -> bool {
    430         if filter.since() == PocketTime::min() || filter.until() == PocketTime::max() {
    431             return false;
    432         }
    433         filter
    434             .until()
    435             .as_ref()
    436             .saturating_sub(*filter.since().as_ref())
    437             > BROAD_QUERY_TIME_WINDOW_SECONDS
    438     }
    439 }
    440 
    441 impl RelayRuntime {
    442     pub fn open(config: BaseRelayRuntimeConfig) -> Result<Self, BaseRelayError> {
    443         Self::open_with_hooks(config, Arc::new(NoopRelayRuntimeHooks))
    444     }
    445 
    446     pub fn open_with_hooks(
    447         config: BaseRelayRuntimeConfig,
    448         hooks: Arc<dyn RelayRuntimeHooks>,
    449     ) -> Result<Self, BaseRelayError> {
    450         let limits = TangleRuntimeLimits::from_config(&config)?;
    451         let relay = config.open_relay()?;
    452         let readiness = BaseRelayReadinessHandle::new(relay.readiness_state());
    453         let event_bus = TangleEventBus::new(limits.event_bus_capacity())?;
    454         let rate_limiter = TangleRateLimiter::new();
    455         let metrics = TangleRuntimeMetrics::new();
    456         metrics.record_disk_used_bytes(directory_size_bytes(
    457             config.pocket_config().data_directory(),
    458         ));
    459         metrics.record_event_bus_receivers(event_bus.receiver_count());
    460         metrics.record_outbox_pending_events(relay.group_outbox_pending_events());
    461         logging::log_runtime_opened(&config);
    462         Ok(Self {
    463             config,
    464             relay,
    465             readiness,
    466             event_bus,
    467             rate_limiter,
    468             metrics,
    469             limits,
    470             shutdown: TangleShutdownSignal::new(),
    471             hooks,
    472         })
    473     }
    474 
    475     pub fn config(&self) -> &BaseRelayRuntimeConfig {
    476         &self.config
    477     }
    478 
    479     pub fn relay(&self) -> &BaseRelay {
    480         &self.relay
    481     }
    482 
    483     pub fn relay_mut(&mut self) -> &mut BaseRelay {
    484         &mut self.relay
    485     }
    486 
    487     pub fn auth_state(&self) -> Result<BaseAuthState, BaseRelayError> {
    488         self.config.auth_state()
    489     }
    490 
    491     pub fn readiness_state(&self) -> BaseRelayReadinessState {
    492         self.readiness.snapshot()
    493     }
    494 
    495     pub fn readiness_handle(&self) -> BaseRelayReadinessHandle {
    496         self.readiness.clone()
    497     }
    498 
    499     pub fn limits(&self) -> TangleRuntimeLimits {
    500         self.limits
    501     }
    502 
    503     pub fn event_bus(&self) -> &TangleEventBus {
    504         &self.event_bus
    505     }
    506 
    507     pub fn rate_limiter(&self) -> &TangleRateLimiter {
    508         &self.rate_limiter
    509     }
    510 
    511     pub fn metrics(&self) -> &TangleRuntimeMetrics {
    512         &self.metrics
    513     }
    514 
    515     pub fn shutdown_signal(&self) -> &TangleShutdownSignal {
    516         &self.shutdown
    517     }
    518 
    519     pub fn shutdown(&mut self) -> Result<BaseRelayShutdownReport, BaseRelayError> {
    520         self.shutdown.request_shutdown();
    521         self.relay.shutdown()
    522     }
    523 }
    524 
    525 struct RelayRuntimeShared {
    526     config: Arc<BaseRelayRuntimeConfig>,
    527     store: PocketStoreHandle,
    528     groups: Option<GroupServiceHandle>,
    529     readiness: BaseRelayReadinessHandle,
    530     limits: TangleRuntimeLimits,
    531     event_bus: TangleEventBus,
    532     rate_limiter: TangleRateLimiter,
    533     metrics: TangleRuntimeMetrics,
    534     shutdown: TangleShutdownSignal,
    535     hooks: Arc<dyn RelayRuntimeHooks>,
    536 }
    537 
    538 impl RelayRuntimeShared {
    539     fn from_runtime(runtime: RelayRuntime) -> Self {
    540         let RelayRuntime {
    541             config,
    542             relay,
    543             readiness,
    544             limits,
    545             event_bus,
    546             rate_limiter,
    547             metrics,
    548             shutdown,
    549             hooks,
    550         } = runtime;
    551         let store = relay.store_handle();
    552         let groups = relay.group_service_handle();
    553         Self {
    554             config: Arc::new(config),
    555             store,
    556             groups,
    557             readiness,
    558             limits,
    559             event_bus,
    560             rate_limiter,
    561             metrics,
    562             shutdown,
    563             hooks,
    564         }
    565     }
    566 
    567     fn rate_limit_event_pocket(
    568         &self,
    569         event: &PocketEvent,
    570         context: TangleClientRateLimitContext,
    571         now: UnixTimestamp,
    572     ) -> Result<Option<RelayMessage>, BaseRelayError> {
    573         let rules = self.config.rate_limits().event();
    574         if let Some(peer_ip) = context.peer_ip
    575             && let Some(message) = self.rate_limit_ok_pocket(
    576                 event,
    577                 TangleRateLimitKey::ip(TangleRateLimitScope::Event, peer_ip),
    578                 rules.per_ip(),
    579                 "event ip",
    580                 now,
    581             )?
    582         {
    583             return Ok(Some(message));
    584         }
    585         self.rate_limit_ok_pocket(
    586             event,
    587             TangleRateLimitKey::pubkey(TangleRateLimitScope::Event, pocket_event_pubkey(event)?),
    588             rules.per_pubkey(),
    589             "event pubkey",
    590             now,
    591         )
    592         .and_then(|message| {
    593             if message.is_some() {
    594                 return Ok(message);
    595             }
    596             self.rate_limit_ok_pocket(
    597                 event,
    598                 TangleRateLimitKey::kind(TangleRateLimitScope::Event, pocket_event_kind(event)?),
    599                 rules.per_kind(),
    600                 "event kind",
    601                 now,
    602             )
    603         })
    604     }
    605 
    606     fn rate_limit_auth_attempt_pocket(
    607         &self,
    608         event: &PocketEvent,
    609         context: TangleClientRateLimitContext,
    610         now: UnixTimestamp,
    611     ) -> Result<Option<RelayMessage>, BaseRelayError> {
    612         let rules = self.config.rate_limits().auth();
    613         if let Some(peer_ip) = context.peer_ip
    614             && let Some(message) = self.rate_limit_ok_pocket(
    615                 event,
    616                 TangleRateLimitKey::ip(TangleRateLimitScope::Auth, peer_ip),
    617                 rules.per_ip(),
    618                 "auth ip",
    619                 now,
    620             )?
    621         {
    622             return Ok(Some(message));
    623         }
    624         self.rate_limit_ok_pocket(
    625             event,
    626             TangleRateLimitKey::pubkey(TangleRateLimitScope::Auth, pocket_event_pubkey(event)?),
    627             rules.per_pubkey(),
    628             "auth pubkey",
    629             now,
    630         )
    631     }
    632 
    633     fn rate_limit_auth_failure_pocket(
    634         &self,
    635         event: &PocketEvent,
    636         context: TangleClientRateLimitContext,
    637         now: UnixTimestamp,
    638     ) -> Result<Option<RelayMessage>, BaseRelayError> {
    639         let rules = self.config.rate_limits().auth();
    640         if let Some(peer_ip) = context.peer_ip
    641             && let Some(message) = self.rate_limit_ok_pocket(
    642                 event,
    643                 TangleRateLimitKey::auth_failure(Some(peer_ip), None),
    644                 rules.failures_per_ip(),
    645                 "auth failure ip",
    646                 now,
    647             )?
    648         {
    649             return Ok(Some(message));
    650         }
    651         self.rate_limit_ok_pocket(
    652             event,
    653             TangleRateLimitKey::auth_failure(None, Some(pocket_event_pubkey(event)?)),
    654             rules.failures(),
    655             "auth failure",
    656             now,
    657         )
    658     }
    659 
    660     fn rate_limit_group_write_pocket(
    661         &self,
    662         event: &PocketEvent,
    663         context: TangleClientRateLimitContext,
    664         now: UnixTimestamp,
    665     ) -> Result<Option<RelayMessage>, BaseRelayError> {
    666         if !self.config.groups().enabled() {
    667             return Ok(None);
    668         }
    669         let class =
    670             validate_client_group_event_structure(event, self.config.groups().limits()).ok();
    671         let Some(class) = class else {
    672             return Ok(None);
    673         };
    674         let Some(group_id) = class.group_id().cloned() else {
    675             return Ok(None);
    676         };
    677         let rules = self.config.rate_limits().group();
    678         let kind = pocket_event_kind(event)?;
    679         let pubkey = pocket_event_pubkey(event)?;
    680         if kind.as_u32() == KIND_GROUP_JOIN_REQUEST {
    681             if let Some(peer_ip) = context.peer_ip
    682                 && let Some(message) = self.rate_limit_ok_pocket(
    683                     event,
    684                     TangleRateLimitKey::join_flow_ip(group_id.clone(), peer_ip),
    685                     rules.join_flow_per_ip(),
    686                     "group join ip",
    687                     now,
    688                 )?
    689             {
    690                 return Ok(Some(message));
    691             }
    692             if let Some(message) = self.rate_limit_ok_pocket(
    693                 event,
    694                 TangleRateLimitKey::join_flow(group_id.clone(), pubkey.clone()),
    695                 rules.join_flow(),
    696                 "group join",
    697                 now,
    698             )? {
    699                 return Ok(Some(message));
    700             }
    701         }
    702         if let Some(peer_ip) = context.peer_ip
    703             && let Some(message) = self.rate_limit_ok_pocket(
    704                 event,
    705                 TangleRateLimitKey::ip(TangleRateLimitScope::GroupWrite, peer_ip),
    706                 rules.write_per_ip(),
    707                 "group ip",
    708                 now,
    709             )?
    710         {
    711             return Ok(Some(message));
    712         }
    713         if let Some(message) = self.rate_limit_ok_pocket(
    714             event,
    715             TangleRateLimitKey::pubkey(TangleRateLimitScope::GroupWrite, pubkey),
    716             rules.write_per_pubkey(),
    717             "group pubkey",
    718             now,
    719         )? {
    720             return Ok(Some(message));
    721         }
    722         if let Some(message) = self.rate_limit_ok_pocket(
    723             event,
    724             TangleRateLimitKey::group(TangleRateLimitScope::GroupWrite, group_id),
    725             rules.write_per_group(),
    726             "group write",
    727             now,
    728         )? {
    729             return Ok(Some(message));
    730         }
    731         self.rate_limit_ok_pocket(
    732             event,
    733             TangleRateLimitKey::kind(TangleRateLimitScope::GroupWrite, kind),
    734             rules.write_per_kind(),
    735             "group kind",
    736             now,
    737         )
    738     }
    739 
    740     fn is_group_event_pocket(&self, event: &PocketEvent) -> bool {
    741         self.config.groups().enabled()
    742             && validate_client_group_event_structure(event, self.config.groups().limits())
    743                 .is_ok_and(|class| !matches!(class, GroupEventClass::NonGroup))
    744     }
    745 
    746     fn handle_pocket_event_with_auth_report(
    747         &self,
    748         event: &PocketEvent,
    749         auth: &BaseAuthState,
    750     ) -> Result<BaseRelayEventWrite, BaseRelayError> {
    751         BaseRelay::handle_pocket_event_with_shared_services(
    752             &self.store,
    753             self.groups.as_ref(),
    754             self.limits.base_relay_limits(),
    755             event,
    756             auth,
    757         )
    758     }
    759 
    760     fn group_outbox_pending_events(&self) -> usize {
    761         self.groups
    762             .as_ref()
    763             .map(GroupServiceHandle::outbox_pending_events)
    764             .unwrap_or(0)
    765     }
    766 
    767     fn query_req_with_auth_report(
    768         &self,
    769         subscription_id: SubscriptionId,
    770         filters: Vec<PocketOwnedFilter>,
    771         search_present: bool,
    772         auth: &BaseAuthState,
    773     ) -> Result<BaseRelayQueryReport, BaseRelayError> {
    774         BaseRelay::query_req_with_shared_services(
    775             &self.store,
    776             self.groups.as_ref(),
    777             self.limits.base_relay_limits(),
    778             self.config.pocket_query_config(),
    779             BaseRelayReqQuery::new(subscription_id, filters, search_present, auth),
    780         )
    781     }
    782 
    783     fn handle_count_with_auth_report(
    784         &self,
    785         subscription_id: SubscriptionId,
    786         filters: Vec<PocketOwnedFilter>,
    787         search_present: bool,
    788         auth: &BaseAuthState,
    789     ) -> Result<BaseRelayCountReport, BaseRelayError> {
    790         BaseRelay::handle_count_with_shared_services(
    791             &self.store,
    792             self.groups.as_ref(),
    793             self.limits.base_relay_limits(),
    794             self.config.pocket_query_config(),
    795             BaseRelayCountQuery::new(subscription_id, filters, search_present, auth),
    796         )
    797     }
    798 
    799     fn rate_limit_req_pocket(
    800         &self,
    801         subscription_id: &SubscriptionId,
    802         filters: &[PocketOwnedFilter],
    803         auth: &BaseAuthState,
    804         context: TangleClientRateLimitContext,
    805         now: UnixTimestamp,
    806     ) -> Option<RelayMessage> {
    807         self.rate_limit_pocket_query(TanglePocketQueryRateLimitRequest {
    808             scope: TangleRateLimitScope::Req,
    809             rules: self.config.rate_limits().req(),
    810             label: "req",
    811             subscription_id,
    812             filters,
    813             auth,
    814             context,
    815             now,
    816         })
    817     }
    818 
    819     fn rate_limit_count_pocket(
    820         &self,
    821         subscription_id: &SubscriptionId,
    822         filters: &[PocketOwnedFilter],
    823         auth: &BaseAuthState,
    824         context: TangleClientRateLimitContext,
    825         now: UnixTimestamp,
    826     ) -> Option<RelayMessage> {
    827         self.rate_limit_pocket_query(TanglePocketQueryRateLimitRequest {
    828             scope: TangleRateLimitScope::Count,
    829             rules: self.config.rate_limits().count(),
    830             label: "count",
    831             subscription_id,
    832             filters,
    833             auth,
    834             context,
    835             now,
    836         })
    837     }
    838 
    839     fn refuse_broad_count(
    840         &self,
    841         subscription_id: &SubscriptionId,
    842         filters: &[PocketOwnedFilter],
    843     ) -> Option<RelayMessage> {
    844         if TangleQueryClassifier::new(self.limits.base_relay_limits())
    845             .classify_pocket_count(filters)
    846             .is_broad()
    847         {
    848             self.metrics.record_count_refusal();
    849             self.metrics.record_broad_query_rejection();
    850             return Some(RelayMessage::Closed {
    851                 subscription_id: subscription_id.clone(),
    852                 message: BaseRelayError::restricted("count filters are too broad or expensive")
    853                     .prefixed_message(),
    854             });
    855         }
    856         None
    857     }
    858 
    859     fn rate_limit_pocket_query(
    860         &self,
    861         request: TanglePocketQueryRateLimitRequest<'_>,
    862     ) -> Option<RelayMessage> {
    863         if let Some(peer_ip) = request.context.peer_ip
    864             && let Some(message) = self.rate_limit_closed(
    865                 request.subscription_id,
    866                 TangleRateLimitKey::ip(request.scope, peer_ip),
    867                 request.rules.per_ip(),
    868                 request.label,
    869                 "ip",
    870                 request.now,
    871             )
    872         {
    873             return Some(message);
    874         }
    875         if let Some(connection_id) = request.context.connection_id
    876             && let Some(message) = self.rate_limit_closed(
    877                 request.subscription_id,
    878                 TangleRateLimitKey::connection(request.scope, connection_id),
    879                 request.rules.per_connection(),
    880                 request.label,
    881                 "connection",
    882                 request.now,
    883             )
    884         {
    885             return Some(message);
    886         }
    887         for pubkey in request.auth.authenticated_pubkeys() {
    888             if let Some(message) = self.rate_limit_closed(
    889                 request.subscription_id,
    890                 TangleRateLimitKey::pubkey(request.scope, pubkey.clone()),
    891                 request.rules.per_pubkey(),
    892                 request.label,
    893                 "pubkey",
    894                 request.now,
    895             ) {
    896                 return Some(message);
    897             }
    898         }
    899         for group_id in pocket_filter_group_ids(request.filters) {
    900             if let Some(message) = self.rate_limit_closed(
    901                 request.subscription_id,
    902                 TangleRateLimitKey::group(request.scope, group_id),
    903                 request.rules.per_group(),
    904                 request.label,
    905                 "group",
    906                 request.now,
    907             ) {
    908                 return Some(message);
    909             }
    910         }
    911         for kind in pocket_filter_kinds(request.filters) {
    912             if let Some(message) = self.rate_limit_closed(
    913                 request.subscription_id,
    914                 TangleRateLimitKey::kind(request.scope, kind),
    915                 request.rules.per_kind(),
    916                 request.label,
    917                 "kind",
    918                 request.now,
    919             ) {
    920                 return Some(message);
    921             }
    922         }
    923         let classifier = TangleQueryClassifier::new(self.limits.base_relay_limits());
    924         let query_classification = match request.scope {
    925             TangleRateLimitScope::Req => classifier.classify_pocket_query(request.filters),
    926             TangleRateLimitScope::Count => classifier.classify_pocket_count(request.filters),
    927             TangleRateLimitScope::Auth
    928             | TangleRateLimitScope::Event
    929             | TangleRateLimitScope::GroupWrite => classifier.classify_pocket_query(request.filters),
    930         };
    931         if query_classification.is_broad()
    932             && let Some(message) = self.rate_limit_closed(
    933                 request.subscription_id,
    934                 TangleRateLimitKey::query_class(request.scope, TangleRateLimitQueryClass::Broad),
    935                 request.rules.broad(),
    936                 request.label,
    937                 "broad",
    938                 request.now,
    939             )
    940         {
    941             self.metrics.record_broad_query_rejection();
    942             return Some(message);
    943         }
    944         None
    945     }
    946 
    947     fn rate_limit_closed(
    948         &self,
    949         subscription_id: &SubscriptionId,
    950         key: TangleRateLimitKey,
    951         rule: TangleRateLimitRule,
    952         label: &'static str,
    953         dimension: &'static str,
    954         now: UnixTimestamp,
    955     ) -> Option<RelayMessage> {
    956         match self.rate_limiter.record(key, rule, now) {
    957             TangleRateLimitDecision::Allowed { .. } => None,
    958             TangleRateLimitDecision::Rejected { reset_at } => {
    959                 self.metrics.record_rate_limit_rejection();
    960                 logging::log_rate_limit_rejected(label, dimension, reset_at);
    961                 Some(RelayMessage::Closed {
    962                     subscription_id: subscription_id.clone(),
    963                     message: BaseRelayError::rate_limited(format!(
    964                         "{label} {dimension} rate limit exceeded until {reset_at}"
    965                     ))
    966                     .prefixed_message(),
    967                 })
    968             }
    969         }
    970     }
    971 
    972     fn rate_limit_ok_pocket(
    973         &self,
    974         event: &PocketEvent,
    975         key: TangleRateLimitKey,
    976         rule: TangleRateLimitRule,
    977         label: &'static str,
    978         now: UnixTimestamp,
    979     ) -> Result<Option<RelayMessage>, BaseRelayError> {
    980         Ok(match self.rate_limiter.record(key, rule, now) {
    981             TangleRateLimitDecision::Allowed { .. } => None,
    982             TangleRateLimitDecision::Rejected { reset_at } => {
    983                 self.metrics.record_rate_limit_rejection();
    984                 logging::log_rate_limit_rejected(label, "event", reset_at);
    985                 Some(RelayMessage::Ok {
    986                     event_id: pocket_event_id(event)?,
    987                     accepted: false,
    988                     message: BaseRelayError::rate_limited(format!(
    989                         "{label} rate limit exceeded until {reset_at}"
    990                     ))
    991                     .prefixed_message(),
    992                 })
    993             }
    994         })
    995     }
    996 }
    997 
    998 #[derive(Clone)]
    999 pub struct RelayRuntimeHandle {
   1000     inner: Arc<RelayRuntimeShared>,
   1001 }
   1002 
   1003 impl RelayRuntimeHandle {
   1004     pub fn new(runtime: RelayRuntime) -> Self {
   1005         Self {
   1006             inner: Arc::new(RelayRuntimeShared::from_runtime(runtime)),
   1007         }
   1008     }
   1009 
   1010     pub fn metrics(&self) -> TangleRuntimeMetrics {
   1011         self.inner.metrics.clone()
   1012     }
   1013 
   1014     pub fn readiness_handle(&self) -> BaseRelayReadinessHandle {
   1015         self.inner.readiness.clone()
   1016     }
   1017 
   1018     pub fn limits(&self) -> TangleRuntimeLimits {
   1019         self.inner.limits
   1020     }
   1021 
   1022     pub async fn auth_state(&self) -> Result<BaseAuthState, BaseRelayError> {
   1023         self.inner.config.auth_state()
   1024     }
   1025 
   1026     pub async fn handle_count_pocket(
   1027         &self,
   1028         subscription_id: SubscriptionId,
   1029         filters: Vec<PocketOwnedFilter>,
   1030         auth: &mut BaseAuthState,
   1031         now: UnixTimestamp,
   1032     ) -> Result<Vec<RelayMessage>, BaseRelayError> {
   1033         let messages = self
   1034             .handle_client_message_with_rate_limit_context(
   1035                 RuntimeClientMessage::Count {
   1036                     subscription_id,
   1037                     filters,
   1038                     search_present: false,
   1039                 },
   1040                 auth,
   1041                 TangleClientRateLimitContext::default(),
   1042                 now,
   1043             )
   1044             .await?;
   1045         protocol_control_messages(messages)
   1046     }
   1047 
   1048     #[cfg(test)]
   1049     pub(crate) async fn handle_client_message(
   1050         &self,
   1051         message: RuntimeClientMessage,
   1052         auth: &mut BaseAuthState,
   1053         now: UnixTimestamp,
   1054     ) -> Result<Vec<RelayMessage>, BaseRelayError> {
   1055         let messages = self
   1056             .handle_client_message_with_rate_limit_context(
   1057                 message,
   1058                 auth,
   1059                 TangleClientRateLimitContext::default(),
   1060                 now,
   1061             )
   1062             .await?;
   1063         protocol_messages_for_test(messages)
   1064     }
   1065 
   1066     #[cfg(test)]
   1067     pub(crate) async fn handle_protocol_client_message_for_test(
   1068         &self,
   1069         message: tangle_protocol::ClientMessage,
   1070         auth: &mut BaseAuthState,
   1071         now: UnixTimestamp,
   1072     ) -> Result<Vec<RelayMessage>, BaseRelayError> {
   1073         self.handle_client_message(
   1074             protocol_client_message_to_runtime_for_test(message)?,
   1075             auth,
   1076             now,
   1077         )
   1078         .await
   1079     }
   1080 
   1081     #[cfg(test)]
   1082     pub(crate) async fn handle_protocol_client_message_with_rate_limit_context_for_test(
   1083         &self,
   1084         message: tangle_protocol::ClientMessage,
   1085         auth: &mut BaseAuthState,
   1086         rate_limit_context: TangleClientRateLimitContext,
   1087         now: UnixTimestamp,
   1088     ) -> Result<Vec<RelayMessage>, BaseRelayError> {
   1089         let messages = self
   1090             .handle_client_message_with_rate_limit_context(
   1091                 protocol_client_message_to_runtime_for_test(message)?,
   1092                 auth,
   1093                 rate_limit_context,
   1094                 now,
   1095             )
   1096             .await?;
   1097         protocol_messages_for_test(messages)
   1098     }
   1099 
   1100     pub(crate) async fn handle_client_message_with_rate_limit_context(
   1101         &self,
   1102         message: RuntimeClientMessage,
   1103         auth: &mut BaseAuthState,
   1104         rate_limit_context: TangleClientRateLimitContext,
   1105         now: UnixTimestamp,
   1106     ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> {
   1107         self.inner
   1108             .metrics
   1109             .record_client_message(runtime_client_message_metric_kind(&message));
   1110         match message {
   1111             RuntimeClientMessage::Event(pocket_event) => {
   1112                 let started_at = Instant::now();
   1113                 let event_id = pocket_event_id(&pocket_event)?;
   1114                 let event_context = RelayEventContext::from_pocket_event(&pocket_event)?;
   1115                 let is_group_event = self.inner.is_group_event_pocket(&pocket_event);
   1116                 if let Some(message) =
   1117                     self.inner
   1118                         .rate_limit_event_pocket(&pocket_event, rate_limit_context, now)?
   1119                 {
   1120                     record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at);
   1121                     return Ok(vec![message.into()]);
   1122                 }
   1123                 if let Some(message) = self.inner.rate_limit_group_write_pocket(
   1124                     &pocket_event,
   1125                     rate_limit_context,
   1126                     now,
   1127                 )? {
   1128                     record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at);
   1129                     return Ok(vec![message.into()]);
   1130                 }
   1131                 let authenticated_pubkeys = auth
   1132                     .authenticated_pubkeys()
   1133                     .iter()
   1134                     .map(ToString::to_string)
   1135                     .collect();
   1136                 let admission = RelayEventAdmissionContext::new(
   1137                     event_context.clone(),
   1138                     authenticated_pubkeys,
   1139                     rate_limit_context.peer_ip(),
   1140                     rate_limit_context.connection_id(),
   1141                     now.as_u64(),
   1142                 );
   1143                 if let EventAdmissionDecision::Reject { message } =
   1144                     self.inner.hooks.admit_event(&admission)
   1145                 {
   1146                     let message = RelayMessage::Ok {
   1147                         event_id,
   1148                         accepted: false,
   1149                         message: BaseRelayError::restricted(message).prefixed_message(),
   1150                     };
   1151                     record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at);
   1152                     return Ok(vec![message.into()]);
   1153                 }
   1154                 let result = self
   1155                     .inner
   1156                     .handle_pocket_event_with_auth_report(&pocket_event, auth)?;
   1157                 let group_outbox_pending_events =
   1158                     is_group_event.then(|| self.inner.group_outbox_pending_events());
   1159                 if is_group_event {
   1160                     for _ in 0..result.stored_offsets().len().saturating_sub(1) {
   1161                         self.inner.metrics.record_outbox_replayed_event();
   1162                     }
   1163                     self.inner
   1164                         .metrics
   1165                         .record_outbox_pending_events(group_outbox_pending_events.unwrap_or(0));
   1166                 }
   1167                 for offset in result.stored_offsets() {
   1168                     self.inner.metrics.record_stored_event_offset();
   1169                     let receivers = self.inner.event_bus.publish(*offset);
   1170                     self.inner.metrics.record_event_bus_publish(receivers);
   1171                 }
   1172                 if !result.stored_offsets().is_empty() {
   1173                     logging::log_event_stored(
   1174                         &event_id,
   1175                         result.stored_offsets().len(),
   1176                         self.inner.metrics.stored_event_offsets(),
   1177                     );
   1178                     self.inner.hooks.event_stored(&RelayEventStoredContext::new(
   1179                         event_context,
   1180                         result
   1181                             .stored_offsets()
   1182                             .iter()
   1183                             .map(|offset| offset.as_u64())
   1184                             .collect(),
   1185                     ));
   1186                 }
   1187                 let message = result.into_message();
   1188                 record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at);
   1189                 Ok(vec![message.into()])
   1190             }
   1191             RuntimeClientMessage::Req {
   1192                 subscription_id,
   1193                 filters,
   1194                 search_present,
   1195             } => {
   1196                 let started_at = Instant::now();
   1197                 self.inner
   1198                     .limits
   1199                     .base_relay_limits()
   1200                     .validate_subscription_id(&subscription_id)?;
   1201                 self.inner
   1202                     .limits
   1203                     .base_relay_limits()
   1204                     .validate_pocket_filters(&filters)?;
   1205                 if let Some(message) =
   1206                     BaseRelay::unsupported_search_present_closed(&subscription_id, search_present)
   1207                 {
   1208                     self.inner
   1209                         .metrics
   1210                         .record_query_latency(elapsed_micros(started_at));
   1211                     return Ok(vec![message.into()]);
   1212                 }
   1213                 if let Some(message) = self.inner.rate_limit_req_pocket(
   1214                     &subscription_id,
   1215                     &filters,
   1216                     auth,
   1217                     rate_limit_context,
   1218                     now,
   1219                 ) {
   1220                     self.inner
   1221                         .metrics
   1222                         .record_query_latency(elapsed_micros(started_at));
   1223                     return Ok(vec![message.into()]);
   1224                 }
   1225                 let report = self.inner.query_req_with_auth_report(
   1226                     subscription_id,
   1227                     filters,
   1228                     search_present,
   1229                     auth,
   1230                 )?;
   1231                 self.inner
   1232                     .metrics
   1233                     .record_query_metrics(report.query_metrics());
   1234                 if report.group_read_denied() {
   1235                     self.inner.metrics.record_group_read_denial();
   1236                 }
   1237                 self.inner
   1238                     .metrics
   1239                     .record_query_latency(elapsed_micros(started_at));
   1240                 Ok(report.into_messages())
   1241             }
   1242             RuntimeClientMessage::Count {
   1243                 subscription_id,
   1244                 filters,
   1245                 search_present,
   1246             } => {
   1247                 let started_at = Instant::now();
   1248                 self.inner
   1249                     .limits
   1250                     .base_relay_limits()
   1251                     .validate_subscription_id(&subscription_id)?;
   1252                 self.inner
   1253                     .limits
   1254                     .base_relay_limits()
   1255                     .validate_pocket_filters(&filters)?;
   1256                 if let Some(message) =
   1257                     BaseRelay::unsupported_search_present_closed(&subscription_id, search_present)
   1258                 {
   1259                     self.inner
   1260                         .metrics
   1261                         .record_query_latency(elapsed_micros(started_at));
   1262                     return Ok(vec![message.into()]);
   1263                 }
   1264                 if let Some(message) = self.inner.refuse_broad_count(&subscription_id, &filters) {
   1265                     self.inner
   1266                         .metrics
   1267                         .record_query_latency(elapsed_micros(started_at));
   1268                     return Ok(vec![message.into()]);
   1269                 }
   1270                 if let Some(message) = self.inner.rate_limit_count_pocket(
   1271                     &subscription_id,
   1272                     &filters,
   1273                     auth,
   1274                     rate_limit_context,
   1275                     now,
   1276                 ) {
   1277                     self.inner
   1278                         .metrics
   1279                         .record_query_latency(elapsed_micros(started_at));
   1280                     return Ok(vec![message.into()]);
   1281                 }
   1282                 let report = self.inner.handle_count_with_auth_report(
   1283                     subscription_id,
   1284                     filters,
   1285                     search_present,
   1286                     auth,
   1287                 )?;
   1288                 self.inner
   1289                     .metrics
   1290                     .record_query_metrics(report.query_metrics());
   1291                 if report.group_read_denied() {
   1292                     self.inner.metrics.record_group_read_denial();
   1293                 }
   1294                 self.inner
   1295                     .metrics
   1296                     .record_query_latency(elapsed_micros(started_at));
   1297                 Ok(vec![report.into_message().into()])
   1298             }
   1299             RuntimeClientMessage::Auth(pocket_event) => {
   1300                 let event_id = pocket_event_id(&pocket_event)?;
   1301                 if let Err(error) = self
   1302                     .inner
   1303                     .limits
   1304                     .base_relay_limits()
   1305                     .validate_pocket_event(&pocket_event)
   1306                 {
   1307                     self.inner.metrics.record_auth_failure();
   1308                     return Ok(vec![RuntimeRelayMessage::from(RelayMessage::Ok {
   1309                         event_id,
   1310                         accepted: false,
   1311                         message: error.prefixed_message(),
   1312                     })]);
   1313                 }
   1314                 if let Some(message) = self.inner.rate_limit_auth_attempt_pocket(
   1315                     &pocket_event,
   1316                     rate_limit_context,
   1317                     now,
   1318                 )? {
   1319                     self.inner.metrics.record_auth_failure();
   1320                     return Ok(vec![message.into()]);
   1321                 }
   1322                 let event_for_failure = pocket_event.clone();
   1323                 let replies = BaseRelay::handle_pocket_auth_with_limits(
   1324                     self.inner.limits.base_relay_limits(),
   1325                     &pocket_event,
   1326                     auth,
   1327                     now,
   1328                 );
   1329                 if auth_response_failed(&replies) {
   1330                     self.inner.metrics.record_auth_failure();
   1331                     if let Some(message) = self.inner.rate_limit_auth_failure_pocket(
   1332                         &event_for_failure,
   1333                         rate_limit_context,
   1334                         now,
   1335                     )? {
   1336                         return Ok(vec![message.into()]);
   1337                     }
   1338                 } else {
   1339                     self.inner.metrics.record_auth_success();
   1340                 }
   1341                 Ok(replies.into_iter().map(Into::into).collect())
   1342             }
   1343             RuntimeClientMessage::Close(subscription_id) => {
   1344                 self.inner
   1345                     .limits
   1346                     .base_relay_limits()
   1347                     .validate_subscription_id(&subscription_id)?;
   1348                 Ok(Vec::new())
   1349             }
   1350             RuntimeClientMessage::NegOpen {
   1351                 subscription_id, ..
   1352             }
   1353             | RuntimeClientMessage::NegMsg {
   1354                 subscription_id, ..
   1355             } => {
   1356                 self.inner
   1357                     .limits
   1358                     .base_relay_limits()
   1359                     .validate_subscription_id(&subscription_id)?;
   1360                 Ok(vec![
   1361                     BaseRelay::disabled_negentropy_message(subscription_id).into(),
   1362                 ])
   1363             }
   1364             RuntimeClientMessage::NegClose(subscription_id) => {
   1365                 self.inner
   1366                     .limits
   1367                     .base_relay_limits()
   1368                     .validate_subscription_id(&subscription_id)?;
   1369                 Ok(Vec::new())
   1370             }
   1371         }
   1372     }
   1373 
   1374     pub async fn subscribe_events(&self) -> TangleEventReceiver {
   1375         let receiver = self.inner.event_bus.subscribe();
   1376         self.inner
   1377             .metrics
   1378             .record_event_bus_receivers(self.inner.event_bus.receiver_count());
   1379         receiver
   1380     }
   1381 
   1382     pub async fn rate_limiter(&self) -> TangleRateLimiter {
   1383         self.inner.rate_limiter.clone()
   1384     }
   1385 
   1386     pub(crate) async fn rate_limit_req_pocket(
   1387         &self,
   1388         subscription_id: &SubscriptionId,
   1389         filters: &[PocketOwnedFilter],
   1390         auth: &BaseAuthState,
   1391         rate_limit_context: TangleClientRateLimitContext,
   1392         now: UnixTimestamp,
   1393     ) -> Option<RelayMessage> {
   1394         self.inner
   1395             .rate_limit_req_pocket(subscription_id, filters, auth, rate_limit_context, now)
   1396     }
   1397 
   1398     pub(crate) async fn query_req_with_auth_report(
   1399         &self,
   1400         subscription_id: SubscriptionId,
   1401         filters: Vec<PocketOwnedFilter>,
   1402         search_present: bool,
   1403         auth: &BaseAuthState,
   1404     ) -> Result<BaseRelayQueryReport, BaseRelayError> {
   1405         let started_at = Instant::now();
   1406         let report = self.inner.query_req_with_auth_report(
   1407             subscription_id,
   1408             filters,
   1409             search_present,
   1410             auth,
   1411         )?;
   1412         if report.group_read_denied() {
   1413             self.inner.metrics.record_group_read_denial();
   1414         }
   1415         self.inner
   1416             .metrics
   1417             .record_query_latency(elapsed_micros(started_at));
   1418         Ok(report)
   1419     }
   1420 
   1421     pub async fn event_by_offset_with_auth(
   1422         &self,
   1423         offset: StoreOffset,
   1424         auth: &BaseAuthState,
   1425     ) -> Result<Option<PocketOwnedEvent>, BaseRelayError> {
   1426         let pocket_event = self.inner.store.event_by_offset(offset.as_u64())?;
   1427         let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned());
   1428         let visible = BaseRelay::group_read_gate_visible_to_auth(
   1429             self.inner.groups.as_ref(),
   1430             &pocket_event,
   1431             &group_auth,
   1432         )?;
   1433         if !visible {
   1434             self.inner.metrics.record_group_read_denial();
   1435             return Ok(None);
   1436         }
   1437         Ok(Some(pocket_event))
   1438     }
   1439 
   1440     pub(crate) async fn fanout_event_offset(
   1441         &self,
   1442         offset: StoreOffset,
   1443         subscriptions: &mut LiveSubscriptionSet,
   1444         auth: &BaseAuthState,
   1445     ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> {
   1446         let pocket_event = self.inner.store.event_by_offset(offset.as_u64())?;
   1447         let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned());
   1448         let subscriptions = subscriptions.fanout(&pocket_event, &group_auth, |event, auth| {
   1449             BaseRelay::group_read_gate_visible_to_auth(self.inner.groups.as_ref(), event, auth)
   1450                 .unwrap_or(false)
   1451         })?;
   1452         Ok(subscriptions
   1453             .into_iter()
   1454             .map(|subscription_id| {
   1455                 RuntimeRelayMessage::event(subscription_id, pocket_event.clone())
   1456             })
   1457             .collect())
   1458     }
   1459 
   1460     pub async fn shutdown(&self) -> Result<BaseRelayShutdownReport, BaseRelayError> {
   1461         self.inner.shutdown.request_shutdown();
   1462         self.inner.store.sync()?;
   1463         Ok(BaseRelayShutdownReport::new(0))
   1464     }
   1465 }
   1466 
   1467 fn auth_response_failed(replies: &[RelayMessage]) -> bool {
   1468     replies.iter().any(|reply| {
   1469         matches!(
   1470             reply,
   1471             RelayMessage::Ok {
   1472                 accepted: false,
   1473                 ..
   1474             }
   1475         )
   1476     })
   1477 }
   1478 
   1479 fn record_event_metrics(
   1480     metrics: &TangleRuntimeMetrics,
   1481     message: &RelayMessage,
   1482     is_group_event: bool,
   1483     started_at: Instant,
   1484 ) {
   1485     metrics.record_event_admission_latency(elapsed_micros(started_at));
   1486     if let RelayMessage::Ok { accepted, .. } = message {
   1487         if *accepted {
   1488             metrics.record_event_admission();
   1489         } else {
   1490             metrics.record_event_rejection();
   1491             if is_group_event {
   1492                 metrics.record_group_write_denial();
   1493             }
   1494         }
   1495     }
   1496 }
   1497 
   1498 fn elapsed_micros(started_at: Instant) -> u64 {
   1499     u64::try_from(started_at.elapsed().as_micros()).unwrap_or(u64::MAX)
   1500 }
   1501 
   1502 fn directory_size_bytes(path: &Path) -> u64 {
   1503     let Ok(metadata) = fs::metadata(path) else {
   1504         return 0;
   1505     };
   1506     if metadata.is_file() {
   1507         return metadata.len();
   1508     }
   1509     if !metadata.is_dir() {
   1510         return 0;
   1511     }
   1512     let Ok(entries) = fs::read_dir(path) else {
   1513         return 0;
   1514     };
   1515     entries
   1516         .filter_map(Result::ok)
   1517         .map(|entry| directory_size_bytes(&entry.path()))
   1518         .sum()
   1519 }
   1520 
   1521 #[cfg(test)]
   1522 fn protocol_client_message_to_runtime_for_test(
   1523     message: tangle_protocol::ClientMessage,
   1524 ) -> Result<RuntimeClientMessage, BaseRelayError> {
   1525     match message {
   1526         tangle_protocol::ClientMessage::Event(event) => Ok(RuntimeClientMessage::Event(
   1527             crate::pocket_conversion::tangle_event_to_pocket(&event)?,
   1528         )),
   1529         tangle_protocol::ClientMessage::Req {
   1530             subscription_id,
   1531             filters,
   1532         } => Ok(RuntimeClientMessage::Req {
   1533             subscription_id,
   1534             search_present: filters.iter().any(|filter| filter.search().is_some()),
   1535             filters: filters
   1536                 .iter()
   1537                 .map(crate::pocket_conversion::tangle_filter_to_pocket)
   1538                 .collect::<Result<Vec<_>, _>>()?,
   1539         }),
   1540         tangle_protocol::ClientMessage::Count {
   1541             subscription_id,
   1542             filters,
   1543         } => Ok(RuntimeClientMessage::Count {
   1544             subscription_id,
   1545             search_present: filters.iter().any(|filter| filter.search().is_some()),
   1546             filters: filters
   1547                 .iter()
   1548                 .map(crate::pocket_conversion::tangle_filter_to_pocket)
   1549                 .collect::<Result<Vec<_>, _>>()?,
   1550         }),
   1551         tangle_protocol::ClientMessage::Close(subscription_id) => {
   1552             Ok(RuntimeClientMessage::Close(subscription_id))
   1553         }
   1554         tangle_protocol::ClientMessage::Auth(event) => Ok(RuntimeClientMessage::Auth(
   1555             crate::pocket_conversion::tangle_event_to_pocket(&event)?,
   1556         )),
   1557         tangle_protocol::ClientMessage::NegOpen {
   1558             subscription_id,
   1559             filter,
   1560             message,
   1561         } => Ok(RuntimeClientMessage::NegOpen {
   1562             subscription_id,
   1563             filter: crate::pocket_conversion::tangle_filter_to_pocket(&filter)?,
   1564             message,
   1565         }),
   1566         tangle_protocol::ClientMessage::NegMsg {
   1567             subscription_id,
   1568             message,
   1569         } => Ok(RuntimeClientMessage::NegMsg {
   1570             subscription_id,
   1571             message,
   1572         }),
   1573         tangle_protocol::ClientMessage::NegClose(subscription_id) => {
   1574             Ok(RuntimeClientMessage::NegClose(subscription_id))
   1575         }
   1576     }
   1577 }
   1578 
   1579 fn runtime_client_message_metric_kind(
   1580     message: &RuntimeClientMessage,
   1581 ) -> TangleClientMessageMetricKind {
   1582     match message {
   1583         RuntimeClientMessage::Event(_) => TangleClientMessageMetricKind::Event,
   1584         RuntimeClientMessage::Req { .. } => TangleClientMessageMetricKind::Req,
   1585         RuntimeClientMessage::Count { .. } => TangleClientMessageMetricKind::Count,
   1586         RuntimeClientMessage::Auth(_) => TangleClientMessageMetricKind::Auth,
   1587         RuntimeClientMessage::Close(_) => TangleClientMessageMetricKind::Close,
   1588         RuntimeClientMessage::NegOpen { .. }
   1589         | RuntimeClientMessage::NegMsg { .. }
   1590         | RuntimeClientMessage::NegClose(_) => TangleClientMessageMetricKind::Negentropy,
   1591     }
   1592 }
   1593 
   1594 fn pocket_filter_group_ids(filters: &[PocketOwnedFilter]) -> Vec<GroupId> {
   1595     let mut group_ids = BTreeSet::new();
   1596     for filter in filters {
   1597         let Ok(tags) = filter.tags() else {
   1598             continue;
   1599         };
   1600         for mut tag in tags.iter() {
   1601             let name = tag.next();
   1602             if !matches!(name, Some(value) if matches!(value, b"h" | b"d")) {
   1603                 continue;
   1604             }
   1605             for value in tag {
   1606                 if let Ok(value) = std::str::from_utf8(value)
   1607                     && let Ok(group_id) = GroupId::new(value)
   1608                 {
   1609                     group_ids.insert(group_id);
   1610                 }
   1611             }
   1612         }
   1613     }
   1614     group_ids.into_iter().collect()
   1615 }
   1616 
   1617 fn pocket_filter_kinds(filters: &[PocketOwnedFilter]) -> Vec<Kind> {
   1618     filters
   1619         .iter()
   1620         .flat_map(|filter| filter.kinds())
   1621         .filter_map(|kind| Kind::new(u64::from(kind.as_u16())).ok())
   1622         .collect::<BTreeSet<_>>()
   1623         .into_iter()
   1624         .collect()
   1625 }
   1626 
   1627 impl fmt::Debug for RelayRuntimeHandle {
   1628     fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
   1629         formatter.write_str("RelayRuntimeHandle")
   1630     }
   1631 }
   1632 
   1633 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   1634 pub struct TangleRuntimeLimits {
   1635     max_message_length: usize,
   1636     base_relay_limits: BaseRelayLimits,
   1637     event_bus_capacity: usize,
   1638     outbound_queue_capacity: usize,
   1639 }
   1640 
   1641 impl TangleRuntimeLimits {
   1642     pub fn new(
   1643         max_message_length: usize,
   1644         base_relay_limits: BaseRelayLimits,
   1645         event_bus_capacity: usize,
   1646         outbound_queue_capacity: usize,
   1647     ) -> Result<Self, BaseRelayError> {
   1648         if max_message_length == 0 {
   1649             return Err(BaseRelayError::invalid(
   1650                 "runtime max message length must be greater than zero",
   1651             ));
   1652         }
   1653         if event_bus_capacity == 0 {
   1654             return Err(BaseRelayError::invalid(
   1655                 "runtime event bus capacity must be greater than zero",
   1656             ));
   1657         }
   1658         if outbound_queue_capacity == 0 {
   1659             return Err(BaseRelayError::invalid(
   1660                 "runtime outbound queue capacity must be greater than zero",
   1661             ));
   1662         }
   1663         Ok(Self {
   1664             max_message_length,
   1665             base_relay_limits,
   1666             event_bus_capacity,
   1667             outbound_queue_capacity,
   1668         })
   1669     }
   1670 
   1671     pub fn from_config(config: &BaseRelayRuntimeConfig) -> Result<Self, BaseRelayError> {
   1672         let limits = config.limits();
   1673         Self::new(
   1674             limits.max_message_length(),
   1675             limits.base_relay_limits()?,
   1676             limits.broadcast_channel_capacity(),
   1677             limits.per_connection_outbound_queue(),
   1678         )
   1679     }
   1680 
   1681     pub fn max_message_length(self) -> usize {
   1682         self.max_message_length
   1683     }
   1684 
   1685     pub fn base_relay_limits(self) -> BaseRelayLimits {
   1686         self.base_relay_limits
   1687     }
   1688 
   1689     pub fn max_pending_events(self) -> usize {
   1690         self.base_relay_limits.max_pending_events()
   1691     }
   1692 
   1693     pub fn event_bus_capacity(self) -> usize {
   1694         self.event_bus_capacity
   1695     }
   1696 
   1697     pub fn outbound_queue_capacity(self) -> usize {
   1698         self.outbound_queue_capacity
   1699     }
   1700 }
   1701 
   1702 #[derive(Debug, Clone)]
   1703 pub struct TangleRuntimeMetrics {
   1704     inner: Arc<TangleRuntimeMetricsInner>,
   1705 }
   1706 
   1707 #[derive(Debug)]
   1708 struct TangleRuntimeMetricsInner {
   1709     started_at: Instant,
   1710     active_sessions: AtomicUsize,
   1711     total_sessions: AtomicU64,
   1712     client_messages: AtomicU64,
   1713     event_messages: AtomicU64,
   1714     req_messages: AtomicU64,
   1715     count_messages: AtomicU64,
   1716     auth_messages: AtomicU64,
   1717     close_messages: AtomicU64,
   1718     opened_subscriptions: AtomicU64,
   1719     closed_subscriptions: AtomicU64,
   1720     stored_event_offsets: AtomicU64,
   1721     rate_limit_rejections: AtomicU64,
   1722     auth_successes: AtomicU64,
   1723     auth_failures: AtomicU64,
   1724     event_admissions: AtomicU64,
   1725     event_rejections: AtomicU64,
   1726     group_read_denials: AtomicU64,
   1727     group_write_denials: AtomicU64,
   1728     event_bus_receivers_current: AtomicUsize,
   1729     event_bus_published_offsets: AtomicU64,
   1730     event_bus_lagged_receivers: AtomicU64,
   1731     event_bus_lagged_offsets: AtomicU64,
   1732     outbound_queue_full_closes: AtomicU64,
   1733     outbox_pending_events: AtomicUsize,
   1734     outbox_replayed_events: AtomicU64,
   1735     disk_used_bytes: AtomicU64,
   1736     event_admission_latency_total_micros: AtomicU64,
   1737     event_admission_latency_count: AtomicU64,
   1738     query_latency_total_micros: AtomicU64,
   1739     query_latency_count: AtomicU64,
   1740     query_candidates_scanned: AtomicU64,
   1741     query_returned_events: AtomicU64,
   1742     query_redacted_events: AtomicU64,
   1743     count_refusals: AtomicU64,
   1744     broad_query_rejections: AtomicU64,
   1745 }
   1746 
   1747 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   1748 pub enum TangleClientMessageMetricKind {
   1749     Event,
   1750     Req,
   1751     Count,
   1752     Auth,
   1753     Close,
   1754     Negentropy,
   1755 }
   1756 
   1757 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
   1758 pub struct TangleRuntimeMetricsSnapshot {
   1759     tangle_runtime_uptime_seconds: u64,
   1760     tangle_readiness_ready: bool,
   1761     tangle_ws_connections_current: usize,
   1762     tangle_ws_connections_total: u64,
   1763     tangle_client_messages_total: u64,
   1764     tangle_event_messages_total: u64,
   1765     tangle_req_messages_total: u64,
   1766     tangle_count_messages_total: u64,
   1767     tangle_auth_messages_total: u64,
   1768     tangle_close_messages_total: u64,
   1769     tangle_subscriptions_opened_total: u64,
   1770     tangle_subscriptions_closed_total: u64,
   1771     tangle_stored_event_offsets_total: u64,
   1772     tangle_rate_limit_rejections_total: u64,
   1773     tangle_auth_success_total: u64,
   1774     tangle_auth_failure_total: u64,
   1775     tangle_event_admitted_total: u64,
   1776     tangle_event_rejected_total: u64,
   1777     tangle_group_read_denied_total: u64,
   1778     tangle_group_write_denied_total: u64,
   1779     tangle_event_bus_receivers_current: usize,
   1780     tangle_event_bus_published_offsets_total: u64,
   1781     tangle_event_bus_lagged_receivers_total: u64,
   1782     tangle_event_bus_lagged_offsets_total: u64,
   1783     tangle_outbound_queue_full_closes_total: u64,
   1784     tangle_outbox_pending_events: usize,
   1785     tangle_outbox_replayed_events_total: u64,
   1786     tangle_disk_used_bytes: u64,
   1787     tangle_event_admission_latency_total_micros: u64,
   1788     tangle_event_admission_latency_count: u64,
   1789     tangle_query_latency_total_micros: u64,
   1790     tangle_query_latency_count: u64,
   1791     tangle_query_candidates_scanned_total: u64,
   1792     tangle_query_returned_events_total: u64,
   1793     tangle_query_redacted_events_total: u64,
   1794     tangle_count_refusals_total: u64,
   1795     tangle_broad_query_rejections_total: u64,
   1796 }
   1797 
   1798 impl TangleRuntimeMetricsSnapshot {
   1799     pub fn active_sessions(&self) -> usize {
   1800         self.tangle_ws_connections_current
   1801     }
   1802 
   1803     pub fn total_sessions(&self) -> u64 {
   1804         self.tangle_ws_connections_total
   1805     }
   1806 
   1807     pub fn client_messages(&self) -> u64 {
   1808         self.tangle_client_messages_total
   1809     }
   1810 
   1811     pub fn event_messages(&self) -> u64 {
   1812         self.tangle_event_messages_total
   1813     }
   1814 
   1815     pub fn req_messages(&self) -> u64 {
   1816         self.tangle_req_messages_total
   1817     }
   1818 
   1819     pub fn count_messages(&self) -> u64 {
   1820         self.tangle_count_messages_total
   1821     }
   1822 
   1823     pub fn auth_messages(&self) -> u64 {
   1824         self.tangle_auth_messages_total
   1825     }
   1826 
   1827     pub fn close_messages(&self) -> u64 {
   1828         self.tangle_close_messages_total
   1829     }
   1830 
   1831     pub fn opened_subscriptions(&self) -> u64 {
   1832         self.tangle_subscriptions_opened_total
   1833     }
   1834 
   1835     pub fn closed_subscriptions(&self) -> u64 {
   1836         self.tangle_subscriptions_closed_total
   1837     }
   1838 
   1839     pub fn stored_event_offsets(&self) -> u64 {
   1840         self.tangle_stored_event_offsets_total
   1841     }
   1842 
   1843     pub fn rate_limit_rejections(&self) -> u64 {
   1844         self.tangle_rate_limit_rejections_total
   1845     }
   1846 }
   1847 
   1848 impl TangleRuntimeMetrics {
   1849     pub fn new() -> Self {
   1850         Self {
   1851             inner: Arc::new(TangleRuntimeMetricsInner {
   1852                 started_at: Instant::now(),
   1853                 active_sessions: AtomicUsize::new(0),
   1854                 total_sessions: AtomicU64::new(0),
   1855                 client_messages: AtomicU64::new(0),
   1856                 event_messages: AtomicU64::new(0),
   1857                 req_messages: AtomicU64::new(0),
   1858                 count_messages: AtomicU64::new(0),
   1859                 auth_messages: AtomicU64::new(0),
   1860                 close_messages: AtomicU64::new(0),
   1861                 opened_subscriptions: AtomicU64::new(0),
   1862                 closed_subscriptions: AtomicU64::new(0),
   1863                 stored_event_offsets: AtomicU64::new(0),
   1864                 rate_limit_rejections: AtomicU64::new(0),
   1865                 auth_successes: AtomicU64::new(0),
   1866                 auth_failures: AtomicU64::new(0),
   1867                 event_admissions: AtomicU64::new(0),
   1868                 event_rejections: AtomicU64::new(0),
   1869                 group_read_denials: AtomicU64::new(0),
   1870                 group_write_denials: AtomicU64::new(0),
   1871                 event_bus_receivers_current: AtomicUsize::new(0),
   1872                 event_bus_published_offsets: AtomicU64::new(0),
   1873                 event_bus_lagged_receivers: AtomicU64::new(0),
   1874                 event_bus_lagged_offsets: AtomicU64::new(0),
   1875                 outbound_queue_full_closes: AtomicU64::new(0),
   1876                 outbox_pending_events: AtomicUsize::new(0),
   1877                 outbox_replayed_events: AtomicU64::new(0),
   1878                 disk_used_bytes: AtomicU64::new(0),
   1879                 event_admission_latency_total_micros: AtomicU64::new(0),
   1880                 event_admission_latency_count: AtomicU64::new(0),
   1881                 query_latency_total_micros: AtomicU64::new(0),
   1882                 query_latency_count: AtomicU64::new(0),
   1883                 query_candidates_scanned: AtomicU64::new(0),
   1884                 query_returned_events: AtomicU64::new(0),
   1885                 query_redacted_events: AtomicU64::new(0),
   1886                 count_refusals: AtomicU64::new(0),
   1887                 broad_query_rejections: AtomicU64::new(0),
   1888             }),
   1889         }
   1890     }
   1891 
   1892     pub fn snapshot(&self) -> TangleRuntimeMetricsSnapshot {
   1893         self.snapshot_with_readiness(false)
   1894     }
   1895 
   1896     pub fn snapshot_with_readiness(&self, readiness_ready: bool) -> TangleRuntimeMetricsSnapshot {
   1897         TangleRuntimeMetricsSnapshot {
   1898             tangle_runtime_uptime_seconds: self.started_at().elapsed().as_secs(),
   1899             tangle_readiness_ready: readiness_ready,
   1900             tangle_ws_connections_current: self.active_sessions(),
   1901             tangle_ws_connections_total: self.total_sessions(),
   1902             tangle_client_messages_total: self.client_messages(),
   1903             tangle_event_messages_total: self.event_messages(),
   1904             tangle_req_messages_total: self.req_messages(),
   1905             tangle_count_messages_total: self.count_messages(),
   1906             tangle_auth_messages_total: self.auth_messages(),
   1907             tangle_close_messages_total: self.close_messages(),
   1908             tangle_subscriptions_opened_total: self.opened_subscriptions(),
   1909             tangle_subscriptions_closed_total: self.closed_subscriptions(),
   1910             tangle_stored_event_offsets_total: self.stored_event_offsets(),
   1911             tangle_rate_limit_rejections_total: self.rate_limit_rejections(),
   1912             tangle_auth_success_total: self.auth_successes(),
   1913             tangle_auth_failure_total: self.auth_failures(),
   1914             tangle_event_admitted_total: self.event_admissions(),
   1915             tangle_event_rejected_total: self.event_rejections(),
   1916             tangle_group_read_denied_total: self.group_read_denials(),
   1917             tangle_group_write_denied_total: self.group_write_denials(),
   1918             tangle_event_bus_receivers_current: self.event_bus_receivers_current(),
   1919             tangle_event_bus_published_offsets_total: self.event_bus_published_offsets(),
   1920             tangle_event_bus_lagged_receivers_total: self.event_bus_lagged_receivers(),
   1921             tangle_event_bus_lagged_offsets_total: self.event_bus_lagged_offsets(),
   1922             tangle_outbound_queue_full_closes_total: self.outbound_queue_full_closes(),
   1923             tangle_outbox_pending_events: self.outbox_pending_events(),
   1924             tangle_outbox_replayed_events_total: self.outbox_replayed_events(),
   1925             tangle_disk_used_bytes: self.disk_used_bytes(),
   1926             tangle_event_admission_latency_total_micros: self
   1927                 .event_admission_latency_total_micros(),
   1928             tangle_event_admission_latency_count: self.event_admission_latency_count(),
   1929             tangle_query_latency_total_micros: self.query_latency_total_micros(),
   1930             tangle_query_latency_count: self.query_latency_count(),
   1931             tangle_query_candidates_scanned_total: self.query_candidates_scanned(),
   1932             tangle_query_returned_events_total: self.query_returned_events(),
   1933             tangle_query_redacted_events_total: self.query_redacted_events(),
   1934             tangle_count_refusals_total: self.count_refusals(),
   1935             tangle_broad_query_rejections_total: self.broad_query_rejections(),
   1936         }
   1937     }
   1938 
   1939     pub fn started_at(&self) -> Instant {
   1940         self.inner.started_at
   1941     }
   1942 
   1943     pub fn active_sessions(&self) -> usize {
   1944         self.inner.active_sessions.load(Ordering::Relaxed)
   1945     }
   1946 
   1947     pub fn total_sessions(&self) -> u64 {
   1948         self.inner.total_sessions.load(Ordering::Relaxed)
   1949     }
   1950 
   1951     pub fn client_messages(&self) -> u64 {
   1952         self.inner.client_messages.load(Ordering::Relaxed)
   1953     }
   1954 
   1955     pub fn event_messages(&self) -> u64 {
   1956         self.inner.event_messages.load(Ordering::Relaxed)
   1957     }
   1958 
   1959     pub fn req_messages(&self) -> u64 {
   1960         self.inner.req_messages.load(Ordering::Relaxed)
   1961     }
   1962 
   1963     pub fn count_messages(&self) -> u64 {
   1964         self.inner.count_messages.load(Ordering::Relaxed)
   1965     }
   1966 
   1967     pub fn auth_messages(&self) -> u64 {
   1968         self.inner.auth_messages.load(Ordering::Relaxed)
   1969     }
   1970 
   1971     pub fn close_messages(&self) -> u64 {
   1972         self.inner.close_messages.load(Ordering::Relaxed)
   1973     }
   1974 
   1975     pub fn opened_subscriptions(&self) -> u64 {
   1976         self.inner.opened_subscriptions.load(Ordering::Relaxed)
   1977     }
   1978 
   1979     pub fn closed_subscriptions(&self) -> u64 {
   1980         self.inner.closed_subscriptions.load(Ordering::Relaxed)
   1981     }
   1982 
   1983     pub fn stored_event_offsets(&self) -> u64 {
   1984         self.inner.stored_event_offsets.load(Ordering::Relaxed)
   1985     }
   1986 
   1987     pub fn rate_limit_rejections(&self) -> u64 {
   1988         self.inner.rate_limit_rejections.load(Ordering::Relaxed)
   1989     }
   1990 
   1991     pub fn auth_successes(&self) -> u64 {
   1992         self.inner.auth_successes.load(Ordering::Relaxed)
   1993     }
   1994 
   1995     pub fn auth_failures(&self) -> u64 {
   1996         self.inner.auth_failures.load(Ordering::Relaxed)
   1997     }
   1998 
   1999     pub fn event_admissions(&self) -> u64 {
   2000         self.inner.event_admissions.load(Ordering::Relaxed)
   2001     }
   2002 
   2003     pub fn event_rejections(&self) -> u64 {
   2004         self.inner.event_rejections.load(Ordering::Relaxed)
   2005     }
   2006 
   2007     pub fn group_read_denials(&self) -> u64 {
   2008         self.inner.group_read_denials.load(Ordering::Relaxed)
   2009     }
   2010 
   2011     pub fn group_write_denials(&self) -> u64 {
   2012         self.inner.group_write_denials.load(Ordering::Relaxed)
   2013     }
   2014 
   2015     pub fn event_bus_receivers_current(&self) -> usize {
   2016         self.inner
   2017             .event_bus_receivers_current
   2018             .load(Ordering::Relaxed)
   2019     }
   2020 
   2021     pub fn event_bus_published_offsets(&self) -> u64 {
   2022         self.inner
   2023             .event_bus_published_offsets
   2024             .load(Ordering::Relaxed)
   2025     }
   2026 
   2027     pub fn event_bus_lagged_receivers(&self) -> u64 {
   2028         self.inner
   2029             .event_bus_lagged_receivers
   2030             .load(Ordering::Relaxed)
   2031     }
   2032 
   2033     pub fn event_bus_lagged_offsets(&self) -> u64 {
   2034         self.inner.event_bus_lagged_offsets.load(Ordering::Relaxed)
   2035     }
   2036 
   2037     pub fn outbound_queue_full_closes(&self) -> u64 {
   2038         self.inner
   2039             .outbound_queue_full_closes
   2040             .load(Ordering::Relaxed)
   2041     }
   2042 
   2043     pub fn outbox_pending_events(&self) -> usize {
   2044         self.inner.outbox_pending_events.load(Ordering::Relaxed)
   2045     }
   2046 
   2047     pub fn outbox_replayed_events(&self) -> u64 {
   2048         self.inner.outbox_replayed_events.load(Ordering::Relaxed)
   2049     }
   2050 
   2051     pub fn disk_used_bytes(&self) -> u64 {
   2052         self.inner.disk_used_bytes.load(Ordering::Relaxed)
   2053     }
   2054 
   2055     pub fn event_admission_latency_total_micros(&self) -> u64 {
   2056         self.inner
   2057             .event_admission_latency_total_micros
   2058             .load(Ordering::Relaxed)
   2059     }
   2060 
   2061     pub fn event_admission_latency_count(&self) -> u64 {
   2062         self.inner
   2063             .event_admission_latency_count
   2064             .load(Ordering::Relaxed)
   2065     }
   2066 
   2067     pub fn query_latency_total_micros(&self) -> u64 {
   2068         self.inner
   2069             .query_latency_total_micros
   2070             .load(Ordering::Relaxed)
   2071     }
   2072 
   2073     pub fn query_latency_count(&self) -> u64 {
   2074         self.inner.query_latency_count.load(Ordering::Relaxed)
   2075     }
   2076 
   2077     pub fn query_candidates_scanned(&self) -> u64 {
   2078         self.inner.query_candidates_scanned.load(Ordering::Relaxed)
   2079     }
   2080 
   2081     pub fn query_returned_events(&self) -> u64 {
   2082         self.inner.query_returned_events.load(Ordering::Relaxed)
   2083     }
   2084 
   2085     pub fn query_redacted_events(&self) -> u64 {
   2086         self.inner.query_redacted_events.load(Ordering::Relaxed)
   2087     }
   2088 
   2089     pub fn count_refusals(&self) -> u64 {
   2090         self.inner.count_refusals.load(Ordering::Relaxed)
   2091     }
   2092 
   2093     pub fn broad_query_rejections(&self) -> u64 {
   2094         self.inner.broad_query_rejections.load(Ordering::Relaxed)
   2095     }
   2096 
   2097     pub fn record_session_opened(&self) -> usize {
   2098         self.inner.total_sessions.fetch_add(1, Ordering::Relaxed);
   2099         self.inner.active_sessions.fetch_add(1, Ordering::Relaxed) + 1
   2100     }
   2101 
   2102     pub fn record_session_closed(&self) -> usize {
   2103         let mut current = self.inner.active_sessions.load(Ordering::Relaxed);
   2104         loop {
   2105             if current == 0 {
   2106                 return 0;
   2107             }
   2108             match self.inner.active_sessions.compare_exchange(
   2109                 current,
   2110                 current - 1,
   2111                 Ordering::Relaxed,
   2112                 Ordering::Relaxed,
   2113             ) {
   2114                 Ok(_) => return current - 1,
   2115                 Err(actual) => current = actual,
   2116             }
   2117         }
   2118     }
   2119 
   2120     pub fn record_client_message(&self, kind: TangleClientMessageMetricKind) -> u64 {
   2121         let total = self.inner.client_messages.fetch_add(1, Ordering::Relaxed) + 1;
   2122         match kind {
   2123             TangleClientMessageMetricKind::Event => {
   2124                 self.inner.event_messages.fetch_add(1, Ordering::Relaxed);
   2125             }
   2126             TangleClientMessageMetricKind::Req => {
   2127                 self.inner.req_messages.fetch_add(1, Ordering::Relaxed);
   2128             }
   2129             TangleClientMessageMetricKind::Count => {
   2130                 self.inner.count_messages.fetch_add(1, Ordering::Relaxed);
   2131             }
   2132             TangleClientMessageMetricKind::Auth => {
   2133                 self.inner.auth_messages.fetch_add(1, Ordering::Relaxed);
   2134             }
   2135             TangleClientMessageMetricKind::Close => {
   2136                 self.inner.close_messages.fetch_add(1, Ordering::Relaxed);
   2137             }
   2138             TangleClientMessageMetricKind::Negentropy => {}
   2139         };
   2140         total
   2141     }
   2142 
   2143     pub fn record_subscription_opened(&self) -> u64 {
   2144         self.inner
   2145             .opened_subscriptions
   2146             .fetch_add(1, Ordering::Relaxed)
   2147             + 1
   2148     }
   2149 
   2150     pub fn record_subscriptions_closed(&self, count: usize) -> u64 {
   2151         self.inner.closed_subscriptions.fetch_add(
   2152             u64::try_from(count).expect("subscription count fits in u64"),
   2153             Ordering::Relaxed,
   2154         ) + u64::try_from(count).expect("subscription count fits in u64")
   2155     }
   2156 
   2157     pub fn record_stored_event_offset(&self) -> u64 {
   2158         self.inner
   2159             .stored_event_offsets
   2160             .fetch_add(1, Ordering::Relaxed)
   2161             + 1
   2162     }
   2163 
   2164     pub fn record_rate_limit_rejection(&self) -> u64 {
   2165         self.inner
   2166             .rate_limit_rejections
   2167             .fetch_add(1, Ordering::Relaxed)
   2168             + 1
   2169     }
   2170 
   2171     pub fn record_auth_success(&self) -> u64 {
   2172         self.inner.auth_successes.fetch_add(1, Ordering::Relaxed) + 1
   2173     }
   2174 
   2175     pub fn record_auth_failure(&self) -> u64 {
   2176         self.inner.auth_failures.fetch_add(1, Ordering::Relaxed) + 1
   2177     }
   2178 
   2179     pub fn record_event_admission(&self) -> u64 {
   2180         self.inner.event_admissions.fetch_add(1, Ordering::Relaxed) + 1
   2181     }
   2182 
   2183     pub fn record_event_rejection(&self) -> u64 {
   2184         self.inner.event_rejections.fetch_add(1, Ordering::Relaxed) + 1
   2185     }
   2186 
   2187     pub fn record_group_read_denial(&self) -> u64 {
   2188         self.inner
   2189             .group_read_denials
   2190             .fetch_add(1, Ordering::Relaxed)
   2191             + 1
   2192     }
   2193 
   2194     pub fn record_group_write_denial(&self) -> u64 {
   2195         self.inner
   2196             .group_write_denials
   2197             .fetch_add(1, Ordering::Relaxed)
   2198             + 1
   2199     }
   2200 
   2201     pub fn record_event_bus_receivers(&self, count: usize) {
   2202         self.inner
   2203             .event_bus_receivers_current
   2204             .store(count, Ordering::Relaxed);
   2205     }
   2206 
   2207     pub fn record_event_bus_publish(&self, receivers: usize) -> u64 {
   2208         self.record_event_bus_receivers(receivers);
   2209         self.inner
   2210             .event_bus_published_offsets
   2211             .fetch_add(1, Ordering::Relaxed)
   2212             + 1
   2213     }
   2214 
   2215     pub fn record_event_bus_lagged(&self, skipped: u64) {
   2216         self.inner
   2217             .event_bus_lagged_receivers
   2218             .fetch_add(1, Ordering::Relaxed);
   2219         self.inner
   2220             .event_bus_lagged_offsets
   2221             .fetch_add(skipped, Ordering::Relaxed);
   2222     }
   2223 
   2224     pub fn record_outbound_queue_full_close(&self) -> u64 {
   2225         self.inner
   2226             .outbound_queue_full_closes
   2227             .fetch_add(1, Ordering::Relaxed)
   2228             + 1
   2229     }
   2230 
   2231     pub fn record_outbox_pending_events(&self, count: usize) {
   2232         self.inner
   2233             .outbox_pending_events
   2234             .store(count, Ordering::Relaxed);
   2235     }
   2236 
   2237     pub fn record_outbox_replayed_event(&self) -> u64 {
   2238         self.inner
   2239             .outbox_replayed_events
   2240             .fetch_add(1, Ordering::Relaxed)
   2241             + 1
   2242     }
   2243 
   2244     pub fn record_disk_used_bytes(&self, bytes: u64) {
   2245         self.inner.disk_used_bytes.store(bytes, Ordering::Relaxed);
   2246     }
   2247 
   2248     pub fn record_event_admission_latency(&self, micros: u64) {
   2249         self.inner
   2250             .event_admission_latency_total_micros
   2251             .fetch_add(micros, Ordering::Relaxed);
   2252         self.inner
   2253             .event_admission_latency_count
   2254             .fetch_add(1, Ordering::Relaxed);
   2255     }
   2256 
   2257     pub fn record_query_latency(&self, micros: u64) {
   2258         self.inner
   2259             .query_latency_total_micros
   2260             .fetch_add(micros, Ordering::Relaxed);
   2261         self.inner
   2262             .query_latency_count
   2263             .fetch_add(1, Ordering::Relaxed);
   2264     }
   2265 
   2266     pub(crate) fn record_query_metrics(&self, metrics: BaseRelayQueryMetrics) {
   2267         self.inner
   2268             .query_candidates_scanned
   2269             .fetch_add(metrics.candidates_scanned(), Ordering::Relaxed);
   2270         self.inner
   2271             .query_returned_events
   2272             .fetch_add(metrics.returned_events(), Ordering::Relaxed);
   2273         self.inner
   2274             .query_redacted_events
   2275             .fetch_add(metrics.redacted_events(), Ordering::Relaxed);
   2276     }
   2277 
   2278     pub fn record_count_refusal(&self) -> u64 {
   2279         self.inner.count_refusals.fetch_add(1, Ordering::Relaxed) + 1
   2280     }
   2281 
   2282     pub fn record_broad_query_rejection(&self) -> u64 {
   2283         self.inner
   2284             .broad_query_rejections
   2285             .fetch_add(1, Ordering::Relaxed)
   2286             + 1
   2287     }
   2288 }
   2289 
   2290 impl Default for TangleRuntimeMetrics {
   2291     fn default() -> Self {
   2292         Self::new()
   2293     }
   2294 }
   2295 
   2296 #[derive(Debug, Clone)]
   2297 pub struct TangleShutdownSignal {
   2298     sender: watch::Sender<bool>,
   2299 }
   2300 
   2301 impl TangleShutdownSignal {
   2302     pub fn new() -> Self {
   2303         let (sender, _) = watch::channel(false);
   2304         Self { sender }
   2305     }
   2306 
   2307     pub fn subscribe(&self) -> watch::Receiver<bool> {
   2308         self.sender.subscribe()
   2309     }
   2310 
   2311     pub fn request_shutdown(&self) {
   2312         self.sender.send_replace(true);
   2313     }
   2314 
   2315     pub fn requested(&self) -> bool {
   2316         *self.sender.borrow()
   2317     }
   2318 }
   2319 
   2320 impl Default for TangleShutdownSignal {
   2321     fn default() -> Self {
   2322         Self::new()
   2323     }
   2324 }
   2325 
   2326 #[cfg(test)]
   2327 mod tests {
   2328     use super::{
   2329         BROAD_QUERY_TIME_WINDOW_SECONDS, EventAdmissionDecision, RelayEventAdmissionContext,
   2330         RelayEventStoredContext, RelayRuntime, RelayRuntimeHandle, RelayRuntimeHooks,
   2331         RuntimeClientMessage, TangleBroadQueryReason, TangleClientRateLimitContext,
   2332         TangleQueryClassification, TangleQueryClassifier, TangleRuntimeLimits,
   2333     };
   2334     use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json};
   2335     use crate::event_bus::{TangleEventBus, TangleEventReceiveError, TangleEventReceiver};
   2336     use crate::rate_limits::{TangleRateLimitKey, TangleRateLimitQueryClass, TangleRateLimitScope};
   2337     use crate::relay::auth::BaseAuthState;
   2338     use crate::relay::core::{BaseRelayLimitSettings, BaseRelayLimits, BaseRelayQueryMetrics};
   2339     use crate::relay::live::LiveSubscriptionSet;
   2340     use crate::relay::outbound::RuntimeRelayMessage;
   2341     use serde_json::json;
   2342     use std::{
   2343         collections::{BTreeMap, BTreeSet},
   2344         net::{IpAddr, Ipv4Addr},
   2345         path::{Path, PathBuf},
   2346         sync::{Arc, Mutex},
   2347         time::Duration,
   2348     };
   2349     use tangle_crypto::RelaySigner;
   2350     use tangle_groups::{
   2351         CanonicalGroupEvent, GroupEventClass, GroupId, GroupProjection, KIND_GROUP_ADMINS,
   2352         KIND_GROUP_CREATE_GROUP, KIND_GROUP_DELETE_GROUP, KIND_GROUP_JOIN_REQUEST,
   2353         KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER,
   2354         KIND_GROUP_REMOVE_USER, MemberStatus, StoreOffset, rebuild_group_projection,
   2355     };
   2356     use tangle_protocol::{
   2357         ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SignatureHex,
   2358         SubscriptionId, Tag, UnixTimestamp, UnsignedEvent, filter_from_value,
   2359     };
   2360     use tangle_store_pocket::{
   2361         PocketEvent, PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketTime,
   2362     };
   2363     use tangle_test_support::FixtureKey;
   2364 
   2365     #[test]
   2366     fn tangle_runtime_opens_owned_process_shell_from_config() {
   2367         let root = temp_root("owned-runtime");
   2368         let _ = std::fs::remove_dir_all(&root);
   2369         let config = runtime_config(&root, 8);
   2370 
   2371         let mut runtime = RelayRuntime::open(config).expect("runtime");
   2372         let mut offsets = runtime.event_bus().subscribe();
   2373         let shutdown = runtime.shutdown_signal().subscribe();
   2374 
   2375         assert_eq!(runtime.config().relay_url(), "wss://relay.radroots.test");
   2376         assert_eq!(runtime.config().listen_addr().to_string(), "127.0.0.1:0");
   2377         assert_eq!(runtime.limits().max_pending_events(), 8);
   2378         assert_eq!(runtime.limits().max_message_length(), 1_048_576);
   2379         assert_eq!(runtime.limits().event_bus_capacity(), 16);
   2380         assert_eq!(runtime.limits().outbound_queue_capacity(), 8);
   2381         assert_eq!(runtime.event_bus().capacity(), 16);
   2382         assert_eq!(runtime.event_bus().receiver_count(), 1);
   2383         assert_eq!(runtime.rate_limiter().tracked_key_count(), 0);
   2384         assert_eq!(runtime.metrics().active_sessions(), 0);
   2385         assert_eq!(runtime.metrics().stored_event_offsets(), 0);
   2386         assert!(runtime.relay().groups_enabled());
   2387         assert!(!runtime.readiness_state().is_ready());
   2388         assert_eq!(
   2389             runtime.readiness_state().response().checks.server_bind,
   2390             "not_ready"
   2391         );
   2392         assert_eq!(
   2393             runtime
   2394                 .readiness_state()
   2395                 .response()
   2396                 .checks
   2397                 .group_outbox_replay,
   2398             "ready"
   2399         );
   2400         assert_eq!(
   2401             runtime.readiness_state().response().checks.event_bus,
   2402             "ready"
   2403         );
   2404         assert!(!*shutdown.borrow());
   2405 
   2406         assert_eq!(runtime.event_bus().publish(StoreOffset::new(42)), 1);
   2407         assert_eq!(offsets.try_recv().expect("offset"), StoreOffset::new(42));
   2408         assert_eq!(
   2409             runtime
   2410                 .auth_state()
   2411                 .expect("auth")
   2412                 .authenticated_pubkeys()
   2413                 .len(),
   2414             0
   2415         );
   2416         assert_eq!(
   2417             runtime.config().pocket_config().data_directory(),
   2418             Path::new(&root).join("pocket")
   2419         );
   2420 
   2421         assert_eq!(runtime.metrics().record_session_opened(), 1);
   2422         assert_eq!(runtime.metrics().active_sessions(), 1);
   2423         assert_eq!(runtime.metrics().total_sessions(), 1);
   2424         assert_eq!(runtime.metrics().record_session_closed(), 0);
   2425         assert_eq!(runtime.metrics().active_sessions(), 0);
   2426         assert_eq!(runtime.metrics().total_sessions(), 1);
   2427         assert_eq!(
   2428             runtime
   2429                 .metrics()
   2430                 .record_client_message(super::TangleClientMessageMetricKind::Req),
   2431             1
   2432         );
   2433         assert_eq!(runtime.metrics().client_messages(), 1);
   2434         assert_eq!(runtime.metrics().req_messages(), 1);
   2435         assert_eq!(runtime.metrics().record_subscription_opened(), 1);
   2436         assert_eq!(runtime.metrics().opened_subscriptions(), 1);
   2437         assert_eq!(runtime.metrics().record_subscriptions_closed(1), 1);
   2438         assert_eq!(runtime.metrics().closed_subscriptions(), 1);
   2439         assert_eq!(runtime.metrics().record_stored_event_offset(), 1);
   2440         assert_eq!(runtime.metrics().stored_event_offsets(), 1);
   2441         assert_eq!(runtime.metrics().record_rate_limit_rejection(), 1);
   2442         assert_eq!(runtime.metrics().rate_limit_rejections(), 1);
   2443         assert_eq!(runtime.metrics().record_auth_success(), 1);
   2444         assert_eq!(runtime.metrics().record_auth_failure(), 1);
   2445         assert_eq!(runtime.metrics().record_event_admission(), 1);
   2446         assert_eq!(runtime.metrics().record_event_rejection(), 1);
   2447         assert_eq!(runtime.metrics().record_group_read_denial(), 1);
   2448         assert_eq!(runtime.metrics().record_group_write_denial(), 1);
   2449         runtime.metrics().record_event_bus_receivers(3);
   2450         assert_eq!(runtime.metrics().record_event_bus_publish(3), 1);
   2451         runtime.metrics().record_event_bus_lagged(4);
   2452         assert_eq!(runtime.metrics().record_outbound_queue_full_close(), 1);
   2453         runtime.metrics().record_outbox_pending_events(2);
   2454         assert_eq!(runtime.metrics().record_outbox_replayed_event(), 1);
   2455         runtime.metrics().record_disk_used_bytes(5);
   2456         runtime.metrics().record_event_admission_latency(13);
   2457         runtime.metrics().record_query_latency(17);
   2458         runtime
   2459             .metrics()
   2460             .record_query_metrics(BaseRelayQueryMetrics::new(5, 3, 2));
   2461         assert_eq!(runtime.metrics().record_count_refusal(), 1);
   2462         assert_eq!(runtime.metrics().record_broad_query_rejection(), 1);
   2463         let snapshot = runtime.metrics().snapshot_with_readiness(true);
   2464         assert_eq!(snapshot.active_sessions(), 0);
   2465         assert_eq!(snapshot.total_sessions(), 1);
   2466         assert_eq!(snapshot.client_messages(), 1);
   2467         assert_eq!(snapshot.req_messages(), 1);
   2468         assert_eq!(snapshot.opened_subscriptions(), 1);
   2469         assert_eq!(snapshot.closed_subscriptions(), 1);
   2470         assert_eq!(snapshot.stored_event_offsets(), 1);
   2471         assert_eq!(snapshot.rate_limit_rejections(), 1);
   2472         let snapshot_value = serde_json::to_value(snapshot).expect("snapshot json");
   2473         assert_eq!(snapshot_value["tangle_readiness_ready"], true);
   2474         assert_eq!(snapshot_value["tangle_auth_success_total"], 1);
   2475         assert_eq!(snapshot_value["tangle_auth_failure_total"], 1);
   2476         assert_eq!(snapshot_value["tangle_event_admitted_total"], 1);
   2477         assert_eq!(snapshot_value["tangle_event_rejected_total"], 1);
   2478         assert_eq!(snapshot_value["tangle_group_read_denied_total"], 1);
   2479         assert_eq!(snapshot_value["tangle_group_write_denied_total"], 1);
   2480         assert_eq!(snapshot_value["tangle_event_bus_receivers_current"], 3);
   2481         assert_eq!(
   2482             snapshot_value["tangle_event_bus_published_offsets_total"],
   2483             1
   2484         );
   2485         assert_eq!(snapshot_value["tangle_event_bus_lagged_receivers_total"], 1);
   2486         assert_eq!(snapshot_value["tangle_event_bus_lagged_offsets_total"], 4);
   2487         assert_eq!(snapshot_value["tangle_outbound_queue_full_closes_total"], 1);
   2488         assert_eq!(snapshot_value["tangle_outbox_pending_events"], 2);
   2489         assert_eq!(snapshot_value["tangle_outbox_replayed_events_total"], 1);
   2490         assert_eq!(snapshot_value["tangle_disk_used_bytes"], 5);
   2491         assert_eq!(
   2492             snapshot_value["tangle_event_admission_latency_total_micros"],
   2493             13
   2494         );
   2495         assert_eq!(snapshot_value["tangle_event_admission_latency_count"], 1);
   2496         assert_eq!(snapshot_value["tangle_query_latency_total_micros"], 17);
   2497         assert_eq!(snapshot_value["tangle_query_latency_count"], 1);
   2498         assert_eq!(snapshot_value["tangle_query_candidates_scanned_total"], 5);
   2499         assert_eq!(snapshot_value["tangle_query_returned_events_total"], 3);
   2500         assert_eq!(snapshot_value["tangle_query_redacted_events_total"], 2);
   2501         assert_eq!(snapshot_value["tangle_count_refusals_total"], 1);
   2502         assert_eq!(snapshot_value["tangle_broad_query_rejections_total"], 1);
   2503 
   2504         let report = runtime.shutdown().expect("shutdown");
   2505 
   2506         assert_eq!(report.closed_subscriptions(), 0);
   2507         assert!(runtime.shutdown_signal().requested());
   2508         assert!(*shutdown.borrow());
   2509 
   2510         let _ = std::fs::remove_dir_all(root);
   2511     }
   2512 
   2513     #[test]
   2514     fn runtime_metrics_snapshot_serializes_tangle_contract_keys() {
   2515         let metrics = super::TangleRuntimeMetrics::new();
   2516         metrics.record_session_opened();
   2517         metrics.record_auth_success();
   2518         metrics.record_event_admission();
   2519         metrics.record_event_bus_publish(1);
   2520         metrics.record_disk_used_bytes(42);
   2521         let snapshot = metrics.snapshot_with_readiness(true);
   2522         let value = serde_json::to_value(snapshot).expect("snapshot");
   2523 
   2524         assert_eq!(value["tangle_readiness_ready"], true);
   2525         assert_eq!(value["tangle_ws_connections_current"], 1);
   2526         assert_eq!(value["tangle_auth_success_total"], 1);
   2527         assert_eq!(value["tangle_event_admitted_total"], 1);
   2528         assert_eq!(value["tangle_event_bus_published_offsets_total"], 1);
   2529         assert_eq!(value["tangle_disk_used_bytes"], 42);
   2530         assert_eq!(value["tangle_outbound_queue_full_closes_total"], 0);
   2531         assert_eq!(value["tangle_query_candidates_scanned_total"], 0);
   2532         assert_eq!(value["tangle_query_returned_events_total"], 0);
   2533         assert_eq!(value["tangle_query_redacted_events_total"], 0);
   2534         assert_eq!(value["tangle_count_refusals_total"], 0);
   2535         assert_eq!(value["tangle_broad_query_rejections_total"], 0);
   2536         assert!(value.get("active_sessions").is_none());
   2537         assert!(value.get("stored_event_offsets").is_none());
   2538     }
   2539 
   2540     #[test]
   2541     fn runtime_limits_and_event_bus_reject_zero_capacity() {
   2542         assert!(TangleRuntimeLimits::new(0, runtime_relay_limits(1), 1, 1).is_err());
   2543         assert!(TangleRuntimeLimits::new(1, runtime_relay_limits(1), 0, 1).is_err());
   2544         assert!(TangleRuntimeLimits::new(1, runtime_relay_limits(1), 1, 0).is_err());
   2545         assert!(TangleEventBus::new(0).is_err());
   2546     }
   2547 
   2548     #[tokio::test]
   2549     async fn runtime_publishes_stored_event_offsets_for_live_fanout() {
   2550         let root = temp_root("runtime-offset-fanout");
   2551         let _ = std::fs::remove_dir_all(&root);
   2552         let handle =
   2553             RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"));
   2554         let mut offsets = handle.subscribe_events().await;
   2555         let mut auth = handle.auth_state().await.expect("auth");
   2556         let mut subscriptions = LiveSubscriptionSet::new(8, 64).expect("subscriptions");
   2557         let subscription_id = SubscriptionId::new("live-offset").expect("subscription");
   2558         subscriptions
   2559             .subscribe(
   2560                 subscription_id.clone(),
   2561                 vec![pocket_filter(json!({"kinds":[1]}))],
   2562             )
   2563             .expect("subscribe");
   2564         let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "live")
   2565             .expect("event");
   2566 
   2567         assert_eq!(
   2568             handle
   2569                 .handle_protocol_client_message_for_test(
   2570                     ClientMessage::Event(event.clone()),
   2571                     &mut auth,
   2572                     UnixTimestamp::new(1_714_124_433)
   2573                 )
   2574                 .await
   2575                 .expect("event"),
   2576             vec![RelayMessage::Ok {
   2577                 event_id: event.id().clone(),
   2578                 accepted: true,
   2579                 message: String::new()
   2580             }]
   2581         );
   2582         let offset = offsets.try_recv().expect("offset");
   2583         assert!(matches!(
   2584             handle
   2585                 .fanout_event_offset(offset, &mut subscriptions, &auth)
   2586                 .await
   2587                 .expect("fanout")
   2588                 .as_slice(),
   2589             [RuntimeRelayMessage::Event {
   2590                 subscription_id: delivered,
   2591                 event: found
   2592             }] if delivered == &subscription_id && found.id().as_hex_string() == event.id().as_str()
   2593         ));
   2594 
   2595         assert_eq!(
   2596             handle
   2597                 .handle_protocol_client_message_for_test(
   2598                     ClientMessage::Event(event.clone()),
   2599                     &mut auth,
   2600                     UnixTimestamp::new(1_714_124_434)
   2601                 )
   2602                 .await
   2603                 .expect("duplicate"),
   2604             vec![RelayMessage::Ok {
   2605                 event_id: event.id().clone(),
   2606                 accepted: true,
   2607                 message: "duplicate: already have this event".to_owned()
   2608             }]
   2609         );
   2610         assert_eq!(
   2611             offsets.try_recv().expect_err("no duplicate offset"),
   2612             TangleEventReceiveError::Empty
   2613         );
   2614         let snapshot = handle.metrics().snapshot();
   2615         assert_eq!(snapshot.client_messages(), 2);
   2616         assert_eq!(snapshot.event_messages(), 2);
   2617         assert_eq!(snapshot.stored_event_offsets(), 1);
   2618         assert_eq!(handle.metrics().event_admissions(), 2);
   2619         assert_eq!(handle.metrics().event_bus_receivers_current(), 1);
   2620         assert_eq!(handle.metrics().event_bus_published_offsets(), 1);
   2621         assert_eq!(handle.metrics().event_admission_latency_count(), 2);
   2622 
   2623         let _ = std::fs::remove_dir_all(root);
   2624     }
   2625 
   2626     #[tokio::test]
   2627     async fn runtime_hooks_reject_events_and_observe_stored_offsets() {
   2628         let root = temp_root("runtime-hooks");
   2629         let _ = std::fs::remove_dir_all(&root);
   2630         let hooks = Arc::new(RecordingHooks::default());
   2631         let handle = RelayRuntimeHandle::new(
   2632             RelayRuntime::open_with_hooks(runtime_config(&root, 8), hooks.clone())
   2633                 .expect("runtime"),
   2634         );
   2635         let mut offsets = handle.subscribe_events().await;
   2636         let mut auth = handle.auth_state().await.expect("auth");
   2637         let rejected = tangle_v2_event(
   2638             FixtureKey::Member,
   2639             1_714_124_433,
   2640             1,
   2641             vec![Tag::from_parts("policy", &["reject"]).expect("policy")],
   2642             "rejected",
   2643         )
   2644         .expect("rejected event");
   2645 
   2646         assert_eq!(
   2647             handle
   2648                 .handle_protocol_client_message_for_test(
   2649                     ClientMessage::Event(rejected.clone()),
   2650                     &mut auth,
   2651                     UnixTimestamp::new(1_714_124_433)
   2652                 )
   2653                 .await
   2654                 .expect("rejected"),
   2655             vec![RelayMessage::Ok {
   2656                 event_id: rejected.id().clone(),
   2657                 accepted: false,
   2658                 message: "restricted: hook rejected event".to_owned()
   2659             }]
   2660         );
   2661         assert_eq!(
   2662             offsets.try_recv().expect_err("no rejected offset"),
   2663             TangleEventReceiveError::Empty
   2664         );
   2665 
   2666         let accepted = tangle_v2_event(
   2667             FixtureKey::Member,
   2668             1_714_124_434,
   2669             1,
   2670             vec![Tag::from_parts("policy", &["accept"]).expect("policy")],
   2671             "accepted",
   2672         )
   2673         .expect("accepted event");
   2674 
   2675         assert_eq!(
   2676             handle
   2677                 .handle_protocol_client_message_for_test(
   2678                     ClientMessage::Event(accepted.clone()),
   2679                     &mut auth,
   2680                     UnixTimestamp::new(1_714_124_434)
   2681                 )
   2682                 .await
   2683                 .expect("accepted"),
   2684             vec![RelayMessage::Ok {
   2685                 event_id: accepted.id().clone(),
   2686                 accepted: true,
   2687                 message: String::new()
   2688             }]
   2689         );
   2690         assert!(offsets.try_recv().is_ok());
   2691         let admissions = hooks.admissions.lock().expect("admissions");
   2692         assert_eq!(admissions.len(), 2);
   2693         assert_eq!(admissions[0].event().event_id(), rejected.id().as_str());
   2694         assert_eq!(admissions[0].event().created_at(), 1_714_124_433);
   2695         assert_eq!(admissions[1].event().event_id(), accepted.id().as_str());
   2696         assert_eq!(admissions[1].event().created_at(), 1_714_124_434);
   2697         drop(admissions);
   2698         let stored = hooks.stored.lock().expect("stored");
   2699         assert_eq!(stored.len(), 1);
   2700         assert_eq!(stored[0].event().event_id(), accepted.id().as_str());
   2701         assert_eq!(stored[0].event().created_at(), 1_714_124_434);
   2702         assert_eq!(stored[0].store_offsets().len(), 1);
   2703 
   2704         let _ = std::fs::remove_dir_all(root);
   2705     }
   2706 
   2707     #[tokio::test]
   2708     async fn runtime_rate_limits_event_pubkeys_before_storage() {
   2709         let root = temp_root("runtime-event-rate-limit");
   2710         let _ = std::fs::remove_dir_all(&root);
   2711         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   2712         let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "limited")
   2713             .expect("event");
   2714         let rule = runtime.config().rate_limits().event().per_pubkey();
   2715         let key = TangleRateLimitKey::pubkey(
   2716             TangleRateLimitScope::Event,
   2717             event.unsigned().pubkey().clone(),
   2718         );
   2719         for _ in 0..rule.max_hits() {
   2720             runtime
   2721                 .rate_limiter()
   2722                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   2723         }
   2724         let handle = RelayRuntimeHandle::new(runtime);
   2725         let mut auth = handle.auth_state().await.expect("auth");
   2726 
   2727         assert_eq!(
   2728             handle
   2729                 .handle_protocol_client_message_for_test(
   2730                     ClientMessage::Event(event.clone()),
   2731                     &mut auth,
   2732                     UnixTimestamp::new(1_714_124_433)
   2733                 )
   2734                 .await
   2735                 .expect("event"),
   2736             vec![RelayMessage::Ok {
   2737                 event_id: event.id().clone(),
   2738                 accepted: false,
   2739                 message: "rate-limited: event pubkey rate limit exceeded until 1714124493"
   2740                     .to_owned()
   2741             }]
   2742         );
   2743 
   2744         let _ = std::fs::remove_dir_all(root);
   2745     }
   2746 
   2747     #[tokio::test]
   2748     async fn runtime_rate_limits_event_kinds_before_storage() {
   2749         let root = temp_root("runtime-event-kind-rate-limit");
   2750         let _ = std::fs::remove_dir_all(&root);
   2751         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   2752         let event = tangle_v2_event(FixtureKey::Admin, 1_714_124_433, 1, Vec::new(), "limited")
   2753             .expect("event");
   2754         let rule = runtime.config().rate_limits().event().per_kind();
   2755         let key = TangleRateLimitKey::kind(TangleRateLimitScope::Event, event.unsigned().kind());
   2756         for _ in 0..rule.max_hits() {
   2757             runtime
   2758                 .rate_limiter()
   2759                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   2760         }
   2761         let handle = RelayRuntimeHandle::new(runtime);
   2762         let mut auth = handle.auth_state().await.expect("auth");
   2763 
   2764         assert_eq!(
   2765             handle
   2766                 .handle_protocol_client_message_for_test(
   2767                     ClientMessage::Event(event.clone()),
   2768                     &mut auth,
   2769                     UnixTimestamp::new(1_714_124_433)
   2770                 )
   2771                 .await
   2772                 .expect("event"),
   2773             vec![RelayMessage::Ok {
   2774                 event_id: event.id().clone(),
   2775                 accepted: false,
   2776                 message: "rate-limited: event kind rate limit exceeded until 1714124493".to_owned()
   2777             }]
   2778         );
   2779 
   2780         let _ = std::fs::remove_dir_all(root);
   2781     }
   2782 
   2783     #[tokio::test]
   2784     async fn runtime_rate_limits_event_peer_ips_partition_peers_and_precede_identity_keys() {
   2785         let root = temp_root("runtime-event-ip-rate-limit");
   2786         let _ = std::fs::remove_dir_all(&root);
   2787         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   2788         let rule = runtime.config().rate_limits().event().per_ip();
   2789         let saturated_peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 20));
   2790         let other_peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 21));
   2791         let key = TangleRateLimitKey::ip(TangleRateLimitScope::Event, saturated_peer_ip);
   2792         for _ in 0..rule.max_hits() {
   2793             runtime
   2794                 .rate_limiter()
   2795                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   2796         }
   2797         let limited_event =
   2798             tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "limited")
   2799                 .expect("limited event");
   2800         let rotated_event =
   2801             tangle_v2_event(FixtureKey::Admin, 1_714_124_434, 2, Vec::new(), "rotated")
   2802                 .expect("rotated event");
   2803         let allowed_event =
   2804             tangle_v2_event(FixtureKey::Owner, 1_714_124_435, 2, Vec::new(), "allowed")
   2805                 .expect("allowed event");
   2806         let handle = RelayRuntimeHandle::new(runtime);
   2807         let mut auth = handle.auth_state().await.expect("auth");
   2808 
   2809         assert_eq!(
   2810             handle
   2811                 .handle_protocol_client_message_with_rate_limit_context_for_test(
   2812                     ClientMessage::Event(limited_event.clone()),
   2813                     &mut auth,
   2814                     TangleClientRateLimitContext::new(Some(saturated_peer_ip), None),
   2815                     UnixTimestamp::new(1_714_124_433)
   2816                 )
   2817                 .await
   2818                 .expect("event"),
   2819             vec![RelayMessage::Ok {
   2820                 event_id: limited_event.id().clone(),
   2821                 accepted: false,
   2822                 message: "rate-limited: event ip rate limit exceeded until 1714124493".to_owned()
   2823             }]
   2824         );
   2825         assert_eq!(
   2826             handle
   2827                 .handle_protocol_client_message_with_rate_limit_context_for_test(
   2828                     ClientMessage::Event(rotated_event.clone()),
   2829                     &mut auth,
   2830                     TangleClientRateLimitContext::new(Some(saturated_peer_ip), None),
   2831                     UnixTimestamp::new(1_714_124_433)
   2832                 )
   2833                 .await
   2834                 .expect("event"),
   2835             vec![RelayMessage::Ok {
   2836                 event_id: rotated_event.id().clone(),
   2837                 accepted: false,
   2838                 message: "rate-limited: event ip rate limit exceeded until 1714124493".to_owned()
   2839             }]
   2840         );
   2841         assert_eq!(
   2842             handle
   2843                 .handle_protocol_client_message_with_rate_limit_context_for_test(
   2844                     ClientMessage::Event(allowed_event.clone()),
   2845                     &mut auth,
   2846                     TangleClientRateLimitContext::new(Some(other_peer_ip), None),
   2847                     UnixTimestamp::new(1_714_124_433)
   2848                 )
   2849                 .await
   2850                 .expect("event"),
   2851             vec![RelayMessage::Ok {
   2852                 event_id: allowed_event.id().clone(),
   2853                 accepted: true,
   2854                 message: String::new()
   2855             }]
   2856         );
   2857         assert_eq!(handle.metrics().rate_limit_rejections(), 2);
   2858         assert_eq!(handle.metrics().event_rejections(), 2);
   2859         assert_eq!(handle.metrics().event_admissions(), 1);
   2860 
   2861         let _ = std::fs::remove_dir_all(root);
   2862     }
   2863 
   2864     #[tokio::test]
   2865     async fn runtime_rate_limits_auth_pubkeys_before_authentication() {
   2866         let root = temp_root("runtime-auth-pubkey-rate-limit");
   2867         let _ = std::fs::remove_dir_all(&root);
   2868         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   2869         let auth_event =
   2870             tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 120).expect("auth event");
   2871         let rule = runtime.config().rate_limits().auth().per_pubkey();
   2872         let key = TangleRateLimitKey::pubkey(
   2873             TangleRateLimitScope::Auth,
   2874             auth_event.unsigned().pubkey().clone(),
   2875         );
   2876         for _ in 0..rule.max_hits() {
   2877             runtime
   2878                 .rate_limiter()
   2879                 .record(key.clone(), rule, UnixTimestamp::new(120));
   2880         }
   2881         let handle = RelayRuntimeHandle::new(runtime);
   2882         let mut auth = handle.auth_state().await.expect("auth");
   2883         auth.issue_challenge("challenge-a", UnixTimestamp::new(100))
   2884             .expect("challenge");
   2885 
   2886         assert_eq!(
   2887             handle
   2888                 .handle_protocol_client_message_for_test(
   2889                     ClientMessage::Auth(auth_event.clone()),
   2890                     &mut auth,
   2891                     UnixTimestamp::new(120)
   2892                 )
   2893                 .await
   2894                 .expect("auth"),
   2895             vec![RelayMessage::Ok {
   2896                 event_id: auth_event.id().clone(),
   2897                 accepted: false,
   2898                 message: "rate-limited: auth pubkey rate limit exceeded until 180".to_owned()
   2899             }]
   2900         );
   2901         assert!(auth.authenticated_pubkeys().is_empty());
   2902 
   2903         let _ = std::fs::remove_dir_all(root);
   2904     }
   2905 
   2906     #[tokio::test]
   2907     async fn runtime_rate_limits_auth_peer_ips_before_authentication() {
   2908         let root = temp_root("runtime-auth-ip-rate-limit");
   2909         let _ = std::fs::remove_dir_all(&root);
   2910         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   2911         let auth_event =
   2912             tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 120).expect("auth event");
   2913         let rule = runtime.config().rate_limits().auth().per_ip();
   2914         let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 30));
   2915         let key = TangleRateLimitKey::ip(TangleRateLimitScope::Auth, peer_ip);
   2916         for _ in 0..rule.max_hits() {
   2917             runtime
   2918                 .rate_limiter()
   2919                 .record(key.clone(), rule, UnixTimestamp::new(120));
   2920         }
   2921         let handle = RelayRuntimeHandle::new(runtime);
   2922         let mut auth = handle.auth_state().await.expect("auth");
   2923         auth.issue_challenge("challenge-a", UnixTimestamp::new(100))
   2924             .expect("challenge");
   2925 
   2926         assert_eq!(
   2927             handle
   2928                 .handle_protocol_client_message_with_rate_limit_context_for_test(
   2929                     ClientMessage::Auth(auth_event.clone()),
   2930                     &mut auth,
   2931                     TangleClientRateLimitContext::new(Some(peer_ip), None),
   2932                     UnixTimestamp::new(120)
   2933                 )
   2934                 .await
   2935                 .expect("auth"),
   2936             vec![RelayMessage::Ok {
   2937                 event_id: auth_event.id().clone(),
   2938                 accepted: false,
   2939                 message: "rate-limited: auth ip rate limit exceeded until 180".to_owned()
   2940             }]
   2941         );
   2942         assert!(auth.authenticated_pubkeys().is_empty());
   2943         assert_eq!(handle.metrics().rate_limit_rejections(), 1);
   2944         assert_eq!(handle.metrics().auth_failures(), 1);
   2945 
   2946         let _ = std::fs::remove_dir_all(root);
   2947     }
   2948 
   2949     #[tokio::test]
   2950     async fn runtime_rate_limits_auth_failures() {
   2951         let root = temp_root("runtime-auth-failure-rate-limit");
   2952         let _ = std::fs::remove_dir_all(&root);
   2953         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   2954         let auth_event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 22_242, Vec::new(), "")
   2955             .expect("auth event");
   2956         let key =
   2957             TangleRateLimitKey::auth_failure(None, Some(auth_event.unsigned().pubkey().clone()));
   2958         let rule = runtime.config().rate_limits().auth().failures();
   2959         for _ in 0..rule.max_hits() {
   2960             runtime
   2961                 .rate_limiter()
   2962                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   2963         }
   2964         let handle = RelayRuntimeHandle::new(runtime);
   2965         let mut auth = handle.auth_state().await.expect("auth");
   2966 
   2967         assert_eq!(
   2968             handle
   2969                 .handle_protocol_client_message_for_test(
   2970                     ClientMessage::Auth(auth_event.clone()),
   2971                     &mut auth,
   2972                     UnixTimestamp::new(1_714_124_433)
   2973                 )
   2974                 .await
   2975                 .expect("auth"),
   2976             vec![RelayMessage::Ok {
   2977                 event_id: auth_event.id().clone(),
   2978                 accepted: false,
   2979                 message: "rate-limited: auth failure rate limit exceeded until 1714124733"
   2980                     .to_owned()
   2981             }]
   2982         );
   2983 
   2984         let _ = std::fs::remove_dir_all(root);
   2985     }
   2986 
   2987     #[tokio::test]
   2988     async fn runtime_rate_limits_auth_failures_by_peer_ip() {
   2989         let root = temp_root("runtime-auth-failure-ip-rate-limit");
   2990         let _ = std::fs::remove_dir_all(&root);
   2991         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   2992         let auth_event = tangle_v2_event(FixtureKey::Admin, 1_714_124_433, 22_242, Vec::new(), "")
   2993             .expect("auth event");
   2994         let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 31));
   2995         let key = TangleRateLimitKey::auth_failure(Some(peer_ip), None);
   2996         let rule = runtime.config().rate_limits().auth().failures_per_ip();
   2997         for _ in 0..rule.max_hits() {
   2998             runtime
   2999                 .rate_limiter()
   3000                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3001         }
   3002         let handle = RelayRuntimeHandle::new(runtime);
   3003         let mut auth = handle.auth_state().await.expect("auth");
   3004 
   3005         assert_eq!(
   3006             handle
   3007                 .handle_protocol_client_message_with_rate_limit_context_for_test(
   3008                     ClientMessage::Auth(auth_event.clone()),
   3009                     &mut auth,
   3010                     TangleClientRateLimitContext::new(Some(peer_ip), None),
   3011                     UnixTimestamp::new(1_714_124_433)
   3012                 )
   3013                 .await
   3014                 .expect("auth"),
   3015             vec![RelayMessage::Ok {
   3016                 event_id: auth_event.id().clone(),
   3017                 accepted: false,
   3018                 message: "rate-limited: auth failure ip rate limit exceeded until 1714124733"
   3019                     .to_owned()
   3020             }]
   3021         );
   3022         assert_eq!(handle.metrics().rate_limit_rejections(), 1);
   3023         assert_eq!(handle.metrics().auth_failures(), 1);
   3024 
   3025         let _ = std::fs::remove_dir_all(root);
   3026     }
   3027 
   3028     #[tokio::test]
   3029     async fn runtime_preserves_chorus_auth_failure_rate_limit_parity() {
   3030         let root = temp_root("runtime-chorus-auth-rate-limit-parity");
   3031         let _ = std::fs::remove_dir_all(&root);
   3032         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3033         let pubkey_event =
   3034             tangle_v2_event(FixtureKey::Member, 1_714_124_433, 22_242, Vec::new(), "")
   3035                 .expect("pubkey auth event");
   3036         let pubkey_rule = runtime.config().rate_limits().auth().failures();
   3037         let pubkey_key =
   3038             TangleRateLimitKey::auth_failure(None, Some(pubkey_event.unsigned().pubkey().clone()));
   3039         for _ in 0..pubkey_rule.max_hits() {
   3040             runtime.rate_limiter().record(
   3041                 pubkey_key.clone(),
   3042                 pubkey_rule,
   3043                 UnixTimestamp::new(1_714_124_433),
   3044             );
   3045         }
   3046         let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 41));
   3047         let peer_event = tangle_v2_event(FixtureKey::Admin, 1_714_124_434, 22_242, Vec::new(), "")
   3048             .expect("peer auth event");
   3049         let peer_rule = runtime.config().rate_limits().auth().failures_per_ip();
   3050         let peer_key = TangleRateLimitKey::auth_failure(Some(peer_ip), None);
   3051         for _ in 0..peer_rule.max_hits() {
   3052             runtime.rate_limiter().record(
   3053                 peer_key.clone(),
   3054                 peer_rule,
   3055                 UnixTimestamp::new(1_714_124_434),
   3056             );
   3057         }
   3058         let handle = RelayRuntimeHandle::new(runtime);
   3059         let mut auth = handle.auth_state().await.expect("auth");
   3060 
   3061         assert_eq!(
   3062             handle
   3063                 .handle_protocol_client_message_for_test(
   3064                     ClientMessage::Auth(pubkey_event.clone()),
   3065                     &mut auth,
   3066                     UnixTimestamp::new(1_714_124_433)
   3067                 )
   3068                 .await
   3069                 .expect("pubkey failure"),
   3070             vec![RelayMessage::Ok {
   3071                 event_id: pubkey_event.id().clone(),
   3072                 accepted: false,
   3073                 message: "rate-limited: auth failure rate limit exceeded until 1714124733"
   3074                     .to_owned()
   3075             }]
   3076         );
   3077         assert_eq!(
   3078             handle
   3079                 .handle_protocol_client_message_with_rate_limit_context_for_test(
   3080                     ClientMessage::Auth(peer_event.clone()),
   3081                     &mut auth,
   3082                     TangleClientRateLimitContext::new(Some(peer_ip), None),
   3083                     UnixTimestamp::new(1_714_124_434)
   3084                 )
   3085                 .await
   3086                 .expect("peer failure"),
   3087             vec![RelayMessage::Ok {
   3088                 event_id: peer_event.id().clone(),
   3089                 accepted: false,
   3090                 message: "rate-limited: auth failure ip rate limit exceeded until 1714124734"
   3091                     .to_owned()
   3092             }]
   3093         );
   3094         assert!(auth.authenticated_pubkeys().is_empty());
   3095         let snapshot = handle.metrics().snapshot();
   3096         assert_eq!(snapshot.client_messages(), 2);
   3097         assert_eq!(snapshot.auth_messages(), 2);
   3098         assert_eq!(snapshot.rate_limit_rejections(), 2);
   3099         assert_eq!(handle.metrics().auth_successes(), 0);
   3100         assert_eq!(handle.metrics().auth_failures(), 2);
   3101 
   3102         let _ = std::fs::remove_dir_all(root);
   3103     }
   3104 
   3105     #[tokio::test]
   3106     async fn runtime_rate_limits_group_writes_by_pubkey() {
   3107         let root = temp_root("runtime-group-pubkey-rate-limit");
   3108         let _ = std::fs::remove_dir_all(&root);
   3109         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3110         let event = tangle_v2_event(
   3111             FixtureKey::Member,
   3112             1_714_124_433,
   3113             1,
   3114             vec![Tag::from_parts("h", &["Farm"]).expect("h")],
   3115             "limited",
   3116         )
   3117         .expect("event");
   3118         let rule = runtime.config().rate_limits().group().write_per_pubkey();
   3119         let key = TangleRateLimitKey::pubkey(
   3120             TangleRateLimitScope::GroupWrite,
   3121             event.unsigned().pubkey().clone(),
   3122         );
   3123         for _ in 0..rule.max_hits() {
   3124             runtime
   3125                 .rate_limiter()
   3126                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3127         }
   3128         let handle = RelayRuntimeHandle::new(runtime);
   3129         let mut auth = handle.auth_state().await.expect("auth");
   3130 
   3131         assert_eq!(
   3132             handle
   3133                 .handle_protocol_client_message_for_test(
   3134                     ClientMessage::Event(event.clone()),
   3135                     &mut auth,
   3136                     UnixTimestamp::new(1_714_124_433)
   3137                 )
   3138                 .await
   3139                 .expect("event"),
   3140             vec![RelayMessage::Ok {
   3141                 event_id: event.id().clone(),
   3142                 accepted: false,
   3143                 message: "rate-limited: group pubkey rate limit exceeded until 1714124493"
   3144                     .to_owned()
   3145             }]
   3146         );
   3147 
   3148         let _ = std::fs::remove_dir_all(root);
   3149     }
   3150 
   3151     #[tokio::test]
   3152     async fn runtime_rate_limits_group_writes_by_peer_ip() {
   3153         let root = temp_root("runtime-group-ip-rate-limit");
   3154         let _ = std::fs::remove_dir_all(&root);
   3155         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3156         let event = tangle_v2_event(
   3157             FixtureKey::Member,
   3158             1_714_124_433,
   3159             1,
   3160             vec![Tag::from_parts("h", &["Farm"]).expect("h")],
   3161             "limited",
   3162         )
   3163         .expect("event");
   3164         let rule = runtime.config().rate_limits().group().write_per_ip();
   3165         let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 40));
   3166         let key = TangleRateLimitKey::ip(TangleRateLimitScope::GroupWrite, peer_ip);
   3167         for _ in 0..rule.max_hits() {
   3168             runtime
   3169                 .rate_limiter()
   3170                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3171         }
   3172         let handle = RelayRuntimeHandle::new(runtime);
   3173         let mut auth = handle.auth_state().await.expect("auth");
   3174 
   3175         assert_eq!(
   3176             handle
   3177                 .handle_protocol_client_message_with_rate_limit_context_for_test(
   3178                     ClientMessage::Event(event.clone()),
   3179                     &mut auth,
   3180                     TangleClientRateLimitContext::new(Some(peer_ip), None),
   3181                     UnixTimestamp::new(1_714_124_433)
   3182                 )
   3183                 .await
   3184                 .expect("event"),
   3185             vec![RelayMessage::Ok {
   3186                 event_id: event.id().clone(),
   3187                 accepted: false,
   3188                 message: "rate-limited: group ip rate limit exceeded until 1714124493".to_owned()
   3189             }]
   3190         );
   3191         assert_eq!(handle.metrics().rate_limit_rejections(), 1);
   3192         assert_eq!(handle.metrics().event_rejections(), 1);
   3193         assert_eq!(handle.metrics().group_write_denials(), 1);
   3194 
   3195         let _ = std::fs::remove_dir_all(root);
   3196     }
   3197 
   3198     #[tokio::test]
   3199     async fn runtime_rate_limits_group_writes_by_group_id() {
   3200         let root = temp_root("runtime-group-write-rate-limit");
   3201         let _ = std::fs::remove_dir_all(&root);
   3202         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3203         let group_id = GroupId::new("Farm").expect("group");
   3204         let event = tangle_v2_event(
   3205             FixtureKey::Member,
   3206             1_714_124_433,
   3207             1,
   3208             vec![Tag::from_parts("h", &[group_id.as_str()]).expect("h")],
   3209             "limited",
   3210         )
   3211         .expect("event");
   3212         let rule = runtime.config().rate_limits().group().write_per_group();
   3213         let key = TangleRateLimitKey::group(TangleRateLimitScope::GroupWrite, group_id);
   3214         for _ in 0..rule.max_hits() {
   3215             runtime
   3216                 .rate_limiter()
   3217                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3218         }
   3219         let handle = RelayRuntimeHandle::new(runtime);
   3220         let mut auth = handle.auth_state().await.expect("auth");
   3221 
   3222         assert_eq!(
   3223             handle
   3224                 .handle_protocol_client_message_for_test(
   3225                     ClientMessage::Event(event.clone()),
   3226                     &mut auth,
   3227                     UnixTimestamp::new(1_714_124_433)
   3228                 )
   3229                 .await
   3230                 .expect("event"),
   3231             vec![RelayMessage::Ok {
   3232                 event_id: event.id().clone(),
   3233                 accepted: false,
   3234                 message: "rate-limited: group write rate limit exceeded until 1714124493"
   3235                     .to_owned()
   3236             }]
   3237         );
   3238 
   3239         let _ = std::fs::remove_dir_all(root);
   3240     }
   3241 
   3242     #[tokio::test]
   3243     async fn runtime_rate_limits_group_writes_by_kind() {
   3244         let root = temp_root("runtime-group-kind-rate-limit");
   3245         let _ = std::fs::remove_dir_all(&root);
   3246         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3247         let event = tangle_v2_event(
   3248             FixtureKey::Member,
   3249             1_714_124_433,
   3250             1,
   3251             vec![Tag::from_parts("h", &["Farm"]).expect("h")],
   3252             "limited",
   3253         )
   3254         .expect("event");
   3255         let rule = runtime.config().rate_limits().group().write_per_kind();
   3256         let key =
   3257             TangleRateLimitKey::kind(TangleRateLimitScope::GroupWrite, event.unsigned().kind());
   3258         for _ in 0..rule.max_hits() {
   3259             runtime
   3260                 .rate_limiter()
   3261                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3262         }
   3263         let handle = RelayRuntimeHandle::new(runtime);
   3264         let mut auth = handle.auth_state().await.expect("auth");
   3265 
   3266         assert_eq!(
   3267             handle
   3268                 .handle_protocol_client_message_for_test(
   3269                     ClientMessage::Event(event.clone()),
   3270                     &mut auth,
   3271                     UnixTimestamp::new(1_714_124_433)
   3272                 )
   3273                 .await
   3274                 .expect("event"),
   3275             vec![RelayMessage::Ok {
   3276                 event_id: event.id().clone(),
   3277                 accepted: false,
   3278                 message: "rate-limited: group kind rate limit exceeded until 1714124493".to_owned()
   3279             }]
   3280         );
   3281 
   3282         let _ = std::fs::remove_dir_all(root);
   3283     }
   3284 
   3285     #[tokio::test]
   3286     async fn runtime_rate_limits_group_join_flows() {
   3287         let root = temp_root("runtime-group-join-rate-limit");
   3288         let _ = std::fs::remove_dir_all(&root);
   3289         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3290         let group_id = GroupId::new("Farm").expect("group");
   3291         let event = tangle_v2_event(
   3292             FixtureKey::Member,
   3293             1_714_124_433,
   3294             KIND_GROUP_JOIN_REQUEST.into(),
   3295             vec![Tag::from_parts("h", &[group_id.as_str()]).expect("h")],
   3296             "",
   3297         )
   3298         .expect("event");
   3299         let rule = runtime.config().rate_limits().group().join_flow();
   3300         let key = TangleRateLimitKey::join_flow(group_id, event.unsigned().pubkey().clone());
   3301         for _ in 0..rule.max_hits() {
   3302             runtime
   3303                 .rate_limiter()
   3304                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3305         }
   3306         let handle = RelayRuntimeHandle::new(runtime);
   3307         let mut auth = handle.auth_state().await.expect("auth");
   3308 
   3309         assert_eq!(
   3310             handle
   3311                 .handle_protocol_client_message_for_test(
   3312                     ClientMessage::Event(event.clone()),
   3313                     &mut auth,
   3314                     UnixTimestamp::new(1_714_124_433)
   3315                 )
   3316                 .await
   3317                 .expect("event"),
   3318             vec![RelayMessage::Ok {
   3319                 event_id: event.id().clone(),
   3320                 accepted: false,
   3321                 message: "rate-limited: group join rate limit exceeded until 1714124733".to_owned()
   3322             }]
   3323         );
   3324 
   3325         let _ = std::fs::remove_dir_all(root);
   3326     }
   3327 
   3328     #[tokio::test]
   3329     async fn runtime_rate_limits_group_join_flows_by_peer_ip() {
   3330         let root = temp_root("runtime-group-join-ip-rate-limit");
   3331         let _ = std::fs::remove_dir_all(&root);
   3332         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3333         let group_id = GroupId::new("Farm").expect("group");
   3334         let event = tangle_v2_event(
   3335             FixtureKey::Member,
   3336             1_714_124_433,
   3337             KIND_GROUP_JOIN_REQUEST.into(),
   3338             vec![Tag::from_parts("h", &[group_id.as_str()]).expect("h")],
   3339             "",
   3340         )
   3341         .expect("event");
   3342         let rule = runtime.config().rate_limits().group().join_flow_per_ip();
   3343         let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 41));
   3344         let key = TangleRateLimitKey::join_flow_ip(group_id, peer_ip);
   3345         for _ in 0..rule.max_hits() {
   3346             runtime
   3347                 .rate_limiter()
   3348                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3349         }
   3350         let handle = RelayRuntimeHandle::new(runtime);
   3351         let mut auth = handle.auth_state().await.expect("auth");
   3352 
   3353         assert_eq!(
   3354             handle
   3355                 .handle_protocol_client_message_with_rate_limit_context_for_test(
   3356                     ClientMessage::Event(event.clone()),
   3357                     &mut auth,
   3358                     TangleClientRateLimitContext::new(Some(peer_ip), None),
   3359                     UnixTimestamp::new(1_714_124_433)
   3360                 )
   3361                 .await
   3362                 .expect("event"),
   3363             vec![RelayMessage::Ok {
   3364                 event_id: event.id().clone(),
   3365                 accepted: false,
   3366                 message: "rate-limited: group join ip rate limit exceeded until 1714124733"
   3367                     .to_owned()
   3368             }]
   3369         );
   3370         assert_eq!(handle.metrics().rate_limit_rejections(), 1);
   3371         assert_eq!(handle.metrics().event_rejections(), 1);
   3372         assert_eq!(handle.metrics().group_write_denials(), 1);
   3373 
   3374         let _ = std::fs::remove_dir_all(root);
   3375     }
   3376 
   3377     #[tokio::test]
   3378     async fn runtime_rate_limits_req_authenticated_pubkeys() {
   3379         let root = temp_root("runtime-req-pubkey-rate-limit");
   3380         let _ = std::fs::remove_dir_all(&root);
   3381         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3382         let rule = runtime.config().rate_limits().req().per_pubkey();
   3383         let handle = RelayRuntimeHandle::new(runtime);
   3384         let mut auth = handle.auth_state().await.expect("auth");
   3385         auth.issue_challenge("challenge-a", UnixTimestamp::new(100))
   3386             .expect("challenge");
   3387         let auth_event =
   3388             tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 120).expect("auth event");
   3389 
   3390         assert_eq!(
   3391             handle
   3392                 .handle_protocol_client_message_for_test(
   3393                     ClientMessage::Auth(auth_event.clone()),
   3394                     &mut auth,
   3395                     UnixTimestamp::new(120)
   3396                 )
   3397                 .await
   3398                 .expect("auth"),
   3399             vec![RelayMessage::Ok {
   3400                 event_id: auth_event.id().clone(),
   3401                 accepted: true,
   3402                 message: String::new()
   3403             }]
   3404         );
   3405         let key =
   3406             TangleRateLimitKey::pubkey(TangleRateLimitScope::Req, FixtureKey::Member.public_key());
   3407         let limiter = handle.rate_limiter().await;
   3408         for _ in 0..rule.max_hits() {
   3409             limiter.record(key.clone(), rule, UnixTimestamp::new(120));
   3410         }
   3411         let subscription_id = SubscriptionId::new("limited-req-pubkey").expect("subscription");
   3412         let filters = vec![filter_from_value(&json!({"limit": 1})).expect("filter")];
   3413 
   3414         assert_eq!(
   3415             handle
   3416                 .handle_protocol_client_message_for_test(
   3417                     ClientMessage::Req {
   3418                         subscription_id: subscription_id.clone(),
   3419                         filters
   3420                     },
   3421                     &mut auth,
   3422                     UnixTimestamp::new(120)
   3423                 )
   3424                 .await
   3425                 .expect("req"),
   3426             vec![RelayMessage::Closed {
   3427                 subscription_id,
   3428                 message: "rate-limited: req pubkey rate limit exceeded until 180".to_owned()
   3429             }]
   3430         );
   3431 
   3432         let _ = std::fs::remove_dir_all(root);
   3433     }
   3434 
   3435     #[tokio::test]
   3436     async fn runtime_rate_limits_req_connections() {
   3437         let root = temp_root("runtime-req-connection-rate-limit");
   3438         let _ = std::fs::remove_dir_all(&root);
   3439         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3440         let rule = runtime.config().rate_limits().req().per_connection();
   3441         let key = TangleRateLimitKey::connection(TangleRateLimitScope::Req, 77);
   3442         for _ in 0..rule.max_hits() {
   3443             runtime
   3444                 .rate_limiter()
   3445                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3446         }
   3447         let handle = RelayRuntimeHandle::new(runtime);
   3448         let mut auth = handle.auth_state().await.expect("auth");
   3449         let subscription_id = SubscriptionId::new("limited-req-connection").expect("subscription");
   3450         let filters = vec![filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter")];
   3451 
   3452         assert_eq!(
   3453             handle
   3454                 .handle_protocol_client_message_with_rate_limit_context_for_test(
   3455                     ClientMessage::Req {
   3456                         subscription_id: subscription_id.clone(),
   3457                         filters
   3458                     },
   3459                     &mut auth,
   3460                     TangleClientRateLimitContext::new(None, Some(77)),
   3461                     UnixTimestamp::new(1_714_124_433)
   3462                 )
   3463                 .await
   3464                 .expect("req"),
   3465             vec![RelayMessage::Closed {
   3466                 subscription_id,
   3467                 message: "rate-limited: req connection rate limit exceeded until 1714124493"
   3468                     .to_owned()
   3469             }]
   3470         );
   3471 
   3472         let _ = std::fs::remove_dir_all(root);
   3473     }
   3474 
   3475     #[tokio::test]
   3476     async fn runtime_rate_limits_req_filter_groups() {
   3477         let root = temp_root("runtime-req-group-rate-limit");
   3478         let _ = std::fs::remove_dir_all(&root);
   3479         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3480         let group_id = GroupId::new("Farm").expect("group");
   3481         let rule = runtime.config().rate_limits().req().per_group();
   3482         let key = TangleRateLimitKey::group(TangleRateLimitScope::Req, group_id);
   3483         for _ in 0..rule.max_hits() {
   3484             runtime
   3485                 .rate_limiter()
   3486                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3487         }
   3488         let handle = RelayRuntimeHandle::new(runtime);
   3489         let mut auth = handle.auth_state().await.expect("auth");
   3490         let subscription_id = SubscriptionId::new("limited-req-group").expect("subscription");
   3491         let filters =
   3492             vec![filter_from_value(&json!({"#h": ["Farm"], "limit": 1})).expect("filter")];
   3493 
   3494         assert_eq!(
   3495             handle
   3496                 .handle_protocol_client_message_for_test(
   3497                     ClientMessage::Req {
   3498                         subscription_id: subscription_id.clone(),
   3499                         filters
   3500                     },
   3501                     &mut auth,
   3502                     UnixTimestamp::new(1_714_124_433)
   3503                 )
   3504                 .await
   3505                 .expect("req"),
   3506             vec![RelayMessage::Closed {
   3507                 subscription_id,
   3508                 message: "rate-limited: req group rate limit exceeded until 1714124493".to_owned()
   3509             }]
   3510         );
   3511 
   3512         let _ = std::fs::remove_dir_all(root);
   3513     }
   3514 
   3515     #[test]
   3516     fn query_classifier_identifies_broad_count_shapes() {
   3517         let classifier = TangleQueryClassifier::new(runtime_relay_limits(8));
   3518         let empty_filter = pocket_filter(json!({}));
   3519         let tag_only_filter = pocket_filter(json!({"#t": ["market"], "limit": 1}));
   3520         let kind_only_filter = pocket_filter(json!({"kinds": [1], "limit": 1}));
   3521         let high_limit_filter = pocket_filter(json!({"kinds": [1], "#h": ["Farm"], "limit": 500}));
   3522         let broad_time_filter = pocket_filter(json!({
   3523             "kinds": [1],
   3524             "since": 1,
   3525             "until": BROAD_QUERY_TIME_WINDOW_SECONDS + 2,
   3526             "limit": 1
   3527         }));
   3528         let bounded_group_filter = pocket_filter(json!({"kinds": [1], "#h": ["Farm"], "limit": 1}));
   3529         let bounded_time_filter = pocket_filter(json!({
   3530             "kinds": [1],
   3531             "since": 1,
   3532             "until": BROAD_QUERY_TIME_WINDOW_SECONDS,
   3533             "limit": 1
   3534         }));
   3535         let hll_reaction_filter = pocket_filter(json!({"kinds": [7], "#e": ["a".repeat(64)]}));
   3536 
   3537         assert_eq!(
   3538             classifier.classify_pocket_count(&[]),
   3539             TangleQueryClassification::Broad(TangleBroadQueryReason::EmptyFilters)
   3540         );
   3541         assert_eq!(
   3542             classifier.classify_pocket_count(&[empty_filter]),
   3543             TangleQueryClassification::Broad(TangleBroadQueryReason::MissingPrimaryConstraint)
   3544         );
   3545         assert_eq!(
   3546             classifier.classify_pocket_count(&[tag_only_filter]),
   3547             TangleQueryClassification::Broad(TangleBroadQueryReason::MissingPrimaryConstraint)
   3548         );
   3549         assert_eq!(
   3550             classifier.classify_pocket_count(&[kind_only_filter]),
   3551             TangleQueryClassification::Broad(TangleBroadQueryReason::MissingBoundedSelector)
   3552         );
   3553         assert_eq!(
   3554             classifier.classify_pocket_count(&[high_limit_filter]),
   3555             TangleQueryClassification::Broad(TangleBroadQueryReason::HighLimit)
   3556         );
   3557         assert_eq!(
   3558             classifier.classify_pocket_count(&[broad_time_filter]),
   3559             TangleQueryClassification::Broad(TangleBroadQueryReason::BroadTimeWindow)
   3560         );
   3561         assert_eq!(
   3562             classifier.classify_pocket_count(&[bounded_group_filter]),
   3563             TangleQueryClassification::Bounded
   3564         );
   3565         assert_eq!(
   3566             classifier.classify_pocket_count(&[bounded_time_filter]),
   3567             TangleQueryClassification::Bounded
   3568         );
   3569         assert_eq!(
   3570             classifier.classify_pocket_count(&[hll_reaction_filter]),
   3571             TangleQueryClassification::Bounded
   3572         );
   3573     }
   3574 
   3575     #[tokio::test]
   3576     async fn runtime_count_hll_accepts_public_pocket_selector() {
   3577         let root = temp_root("runtime-count-hll");
   3578         let _ = std::fs::remove_dir_all(&root);
   3579         let handle =
   3580             RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"));
   3581         let mut auth = handle.auth_state().await.expect("auth");
   3582         let target = "c".repeat(64);
   3583         let tags = PocketOwnedTags::new(&[["e", target.as_str()]]).expect("tags");
   3584         let first = signed_pocket_event(12, 1_714_124_433, 7, &tags, b"first reaction");
   3585         let second = signed_pocket_event(11, 1_714_124_434, 7, &tags, b"second reaction");
   3586 
   3587         assert_accepted_pocket_reply(
   3588             runtime_pocket_event_reply(&handle, &first, &mut auth),
   3589             &first,
   3590         );
   3591         assert_accepted_pocket_reply(
   3592             runtime_pocket_event_reply(&handle, &second, &mut auth),
   3593             &second,
   3594         );
   3595 
   3596         let subscription_id = SubscriptionId::new("count-hll-runtime").expect("subscription");
   3597         let replies = handle
   3598             .handle_protocol_client_message_for_test(
   3599                 ClientMessage::Count {
   3600                     subscription_id: subscription_id.clone(),
   3601                     filters: vec![
   3602                         filter_from_value(&json!({"kinds":[7],"#e":[target]})).expect("filter"),
   3603                     ],
   3604                 },
   3605                 &mut auth,
   3606                 UnixTimestamp::new(1_714_124_437),
   3607             )
   3608             .await
   3609             .expect("count");
   3610         let [
   3611             RelayMessage::Count {
   3612                 subscription_id: actual,
   3613                 count,
   3614                 hll: Some(hll),
   3615             },
   3616         ] = replies.as_slice()
   3617         else {
   3618             panic!("count hll expected: {replies:?}")
   3619         };
   3620 
   3621         assert_eq!(actual, &subscription_id);
   3622         assert_eq!(*count, 2);
   3623         assert_eq!(hll.len(), 512);
   3624         assert_ne!(hll, &"00".repeat(256));
   3625 
   3626         let _ = std::fs::remove_dir_all(root);
   3627     }
   3628 
   3629     #[test]
   3630     fn runtime_count_source_stays_exact() {
   3631         let sources = [
   3632             include_str!("runtime.rs"),
   3633             include_str!("relay/core.rs"),
   3634             include_str!("../../tangle_protocol/src/lib.rs"),
   3635         ];
   3636         let forbidden = [
   3637             concat!("approximate", "_count"),
   3638             concat!("approx", "_count"),
   3639             concat!("estimated", "_count"),
   3640             concat!("count", "_estimate"),
   3641             concat!("private", "_count", "_estimate"),
   3642         ];
   3643 
   3644         for source in sources {
   3645             for needle in forbidden {
   3646                 assert!(!source.contains(needle));
   3647             }
   3648         }
   3649     }
   3650 
   3651     #[tokio::test]
   3652     async fn runtime_rate_limits_count_peer_ips() {
   3653         let root = temp_root("runtime-count-ip-rate-limit");
   3654         let _ = std::fs::remove_dir_all(&root);
   3655         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3656         let rule = runtime.config().rate_limits().count().per_ip();
   3657         let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 9));
   3658         let key = TangleRateLimitKey::ip(TangleRateLimitScope::Count, peer_ip);
   3659         for _ in 0..rule.max_hits() {
   3660             runtime
   3661                 .rate_limiter()
   3662                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3663         }
   3664         let handle = RelayRuntimeHandle::new(runtime);
   3665         let mut auth = handle.auth_state().await.expect("auth");
   3666         let subscription_id = SubscriptionId::new("limited-count-ip").expect("subscription");
   3667         let filters = vec![
   3668             filter_from_value(&json!({"kinds": [1], "#h": ["Farm"], "limit": 1})).expect("filter"),
   3669         ];
   3670 
   3671         assert_eq!(
   3672             handle
   3673                 .handle_protocol_client_message_with_rate_limit_context_for_test(
   3674                     ClientMessage::Count {
   3675                         subscription_id: subscription_id.clone(),
   3676                         filters
   3677                     },
   3678                     &mut auth,
   3679                     TangleClientRateLimitContext::new(Some(peer_ip), None),
   3680                     UnixTimestamp::new(1_714_124_433)
   3681                 )
   3682                 .await
   3683                 .expect("count"),
   3684             vec![RelayMessage::Closed {
   3685                 subscription_id,
   3686                 message: "rate-limited: count ip rate limit exceeded until 1714124493".to_owned()
   3687             }]
   3688         );
   3689 
   3690         let _ = std::fs::remove_dir_all(root);
   3691     }
   3692 
   3693     #[tokio::test]
   3694     async fn runtime_rejects_search_req_and_count_as_unsupported() {
   3695         let root = temp_root("runtime-search-unsupported");
   3696         let _ = std::fs::remove_dir_all(&root);
   3697         let handle =
   3698             RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"));
   3699         let mut auth = handle.auth_state().await.expect("auth");
   3700         let req_id = SubscriptionId::new("search-req").expect("req");
   3701         let count_id = SubscriptionId::new("search-count").expect("count");
   3702         let search =
   3703             filter_from_value(&json!({"search": "fresh carrots", "limit": 1})).expect("filter");
   3704 
   3705         assert_eq!(
   3706             handle
   3707                 .handle_protocol_client_message_for_test(
   3708                     ClientMessage::Req {
   3709                         subscription_id: req_id.clone(),
   3710                         filters: vec![search.clone()]
   3711                     },
   3712                     &mut auth,
   3713                     UnixTimestamp::new(1_714_124_433)
   3714                 )
   3715                 .await
   3716                 .expect("req"),
   3717             vec![RelayMessage::Closed {
   3718                 subscription_id: req_id,
   3719                 message: "unsupported: search filters are not supported".to_owned()
   3720             }]
   3721         );
   3722         assert_eq!(
   3723             handle
   3724                 .handle_protocol_client_message_for_test(
   3725                     ClientMessage::Count {
   3726                         subscription_id: count_id.clone(),
   3727                         filters: vec![search]
   3728                     },
   3729                     &mut auth,
   3730                     UnixTimestamp::new(1_714_124_434)
   3731                 )
   3732                 .await
   3733                 .expect("count"),
   3734             vec![RelayMessage::Closed {
   3735                 subscription_id: count_id,
   3736                 message: "unsupported: search filters are not supported".to_owned()
   3737             }]
   3738         );
   3739 
   3740         let _ = std::fs::remove_dir_all(root);
   3741     }
   3742 
   3743     #[tokio::test]
   3744     async fn runtime_rate_limits_count_filter_kinds() {
   3745         let root = temp_root("runtime-count-kind-rate-limit");
   3746         let _ = std::fs::remove_dir_all(&root);
   3747         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3748         let kind = Kind::new(1).expect("kind");
   3749         let rule = runtime.config().rate_limits().count().per_kind();
   3750         let key = TangleRateLimitKey::kind(TangleRateLimitScope::Count, kind);
   3751         for _ in 0..rule.max_hits() {
   3752             runtime
   3753                 .rate_limiter()
   3754                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3755         }
   3756         let handle = RelayRuntimeHandle::new(runtime);
   3757         let mut auth = handle.auth_state().await.expect("auth");
   3758         let subscription_id = SubscriptionId::new("limited-count-kind").expect("subscription");
   3759         let filters = vec![
   3760             filter_from_value(&json!({"kinds": [1], "#h": ["Farm"], "limit": 1})).expect("filter"),
   3761         ];
   3762 
   3763         assert_eq!(
   3764             handle
   3765                 .handle_protocol_client_message_for_test(
   3766                     ClientMessage::Count {
   3767                         subscription_id: subscription_id.clone(),
   3768                         filters
   3769                     },
   3770                     &mut auth,
   3771                     UnixTimestamp::new(1_714_124_433)
   3772                 )
   3773                 .await
   3774                 .expect("count"),
   3775             vec![RelayMessage::Closed {
   3776                 subscription_id,
   3777                 message: "rate-limited: count kind rate limit exceeded until 1714124493".to_owned()
   3778             }]
   3779         );
   3780 
   3781         let _ = std::fs::remove_dir_all(root);
   3782     }
   3783 
   3784     #[tokio::test]
   3785     async fn runtime_refuses_broad_count_queries_before_rate_limits() {
   3786         let root = temp_root("runtime-count-broad-refusal");
   3787         let _ = std::fs::remove_dir_all(&root);
   3788         let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime");
   3789         let rule = runtime.config().rate_limits().count().broad();
   3790         let key = TangleRateLimitKey::query_class(
   3791             TangleRateLimitScope::Count,
   3792             TangleRateLimitQueryClass::Broad,
   3793         );
   3794         for _ in 0..rule.max_hits() {
   3795             runtime
   3796                 .rate_limiter()
   3797                 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
   3798         }
   3799         let handle = RelayRuntimeHandle::new(runtime);
   3800         let mut auth = handle.auth_state().await.expect("auth");
   3801         let subscription_id = SubscriptionId::new("limited-count-broad").expect("subscription");
   3802         let filters = vec![filter_from_value(&json!({"limit": 1})).expect("filter")];
   3803 
   3804         assert_eq!(
   3805             handle
   3806                 .handle_protocol_client_message_for_test(
   3807                     ClientMessage::Count {
   3808                         subscription_id: subscription_id.clone(),
   3809                         filters
   3810                     },
   3811                     &mut auth,
   3812                     UnixTimestamp::new(1_714_124_433)
   3813                 )
   3814                 .await
   3815                 .expect("count"),
   3816             vec![RelayMessage::Closed {
   3817                 subscription_id,
   3818                 message: "restricted: count filters are too broad or expensive".to_owned()
   3819             }]
   3820         );
   3821         assert_eq!(handle.metrics().count_refusals(), 1);
   3822         assert_eq!(handle.metrics().broad_query_rejections(), 1);
   3823 
   3824         let _ = std::fs::remove_dir_all(root);
   3825     }
   3826 
   3827     #[tokio::test]
   3828     async fn runtime_refuses_expensive_count_queries_deterministically() {
   3829         let root = temp_root("runtime-count-expensive-refusal");
   3830         let _ = std::fs::remove_dir_all(&root);
   3831         let handle =
   3832             RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"));
   3833         let mut auth = handle.auth_state().await.expect("auth");
   3834         let cases = [
   3835             ("missing-selector", json!({"kinds": [1], "limit": 1})),
   3836             (
   3837                 "high-limit",
   3838                 json!({"kinds": [1], "#h": ["Farm"], "limit": 500}),
   3839             ),
   3840             (
   3841                 "broad-window",
   3842                 json!({
   3843                     "kinds": [1],
   3844                     "since": 1,
   3845                     "until": BROAD_QUERY_TIME_WINDOW_SECONDS + 2,
   3846                     "limit": 1
   3847                 }),
   3848             ),
   3849         ];
   3850 
   3851         for (name, value) in cases {
   3852             let subscription_id = SubscriptionId::new(name).expect("subscription");
   3853             let filters = vec![filter_from_value(&value).expect("filter")];
   3854 
   3855             assert_eq!(
   3856                 handle
   3857                     .handle_protocol_client_message_for_test(
   3858                         ClientMessage::Count {
   3859                             subscription_id: subscription_id.clone(),
   3860                             filters
   3861                         },
   3862                         &mut auth,
   3863                         UnixTimestamp::new(1_714_124_433)
   3864                     )
   3865                     .await
   3866                     .expect("count"),
   3867                 vec![RelayMessage::Closed {
   3868                     subscription_id,
   3869                     message: "restricted: count filters are too broad or expensive".to_owned()
   3870                 }]
   3871             );
   3872         }
   3873         assert_eq!(handle.metrics().count_refusals(), 3);
   3874         assert_eq!(handle.metrics().broad_query_rejections(), 3);
   3875 
   3876         let _ = std::fs::remove_dir_all(root);
   3877     }
   3878 
   3879     #[tokio::test]
   3880     async fn runtime_publishes_generated_group_event_offsets_for_live_fanout() {
   3881         let root = temp_root("runtime-generated-offset-fanout");
   3882         let _ = std::fs::remove_dir_all(&root);
   3883         let handle =
   3884             RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"));
   3885         let mut offsets = handle.subscribe_events().await;
   3886         let mut auth = handle.auth_state().await.expect("auth");
   3887         auth.issue_challenge("challenge-a", UnixTimestamp::new(100))
   3888             .expect("challenge");
   3889         let auth_event =
   3890             tangle_v2_auth_event(FixtureKey::Owner, "challenge-a", 120).expect("auth event");
   3891         let create = tangle_v2_group_create_event(FixtureKey::Owner, "RuntimeFarm", 121, &[])
   3892             .expect("create");
   3893         let mut subscriptions = LiveSubscriptionSet::new(8, 64).expect("subscriptions");
   3894         let subscription_id = SubscriptionId::new("generated-offsets").expect("subscription");
   3895         subscriptions
   3896             .subscribe(
   3897                 subscription_id.clone(),
   3898                 vec![pocket_filter(json!({
   3899                     "kinds":[KIND_GROUP_METADATA, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS],
   3900                     "#d":["RuntimeFarm"]
   3901                 }))],
   3902             )
   3903             .expect("subscribe");
   3904 
   3905         assert_eq!(
   3906             handle
   3907                 .handle_protocol_client_message_for_test(
   3908                     ClientMessage::Auth(auth_event.clone()),
   3909                     &mut auth,
   3910                     UnixTimestamp::new(120)
   3911                 )
   3912                 .await
   3913                 .expect("auth"),
   3914             vec![RelayMessage::Ok {
   3915                 event_id: auth_event.id().clone(),
   3916                 accepted: true,
   3917                 message: String::new()
   3918             }]
   3919         );
   3920         assert_eq!(
   3921             handle
   3922                 .handle_protocol_client_message_for_test(
   3923                     ClientMessage::Event(create.clone()),
   3924                     &mut auth,
   3925                     UnixTimestamp::new(121)
   3926                 )
   3927                 .await
   3928                 .expect("create"),
   3929             vec![RelayMessage::Ok {
   3930                 event_id: create.id().clone(),
   3931                 accepted: true,
   3932                 message: String::new()
   3933             }]
   3934         );
   3935         let source_offset = offsets.try_recv().expect("source offset");
   3936         let generated_offsets = [
   3937             offsets.try_recv().expect("first generated offset"),
   3938             offsets.try_recv().expect("second generated offset"),
   3939         ];
   3940         assert!(source_offset < generated_offsets[0]);
   3941         assert!(generated_offsets[0] < generated_offsets[1]);
   3942         let put_member =
   3943             tangle_v2_put_user_event(FixtureKey::Owner, "RuntimeFarm", FixtureKey::Member, 122)
   3944                 .expect("put member");
   3945         assert_eq!(
   3946             handle
   3947                 .handle_protocol_client_message_for_test(
   3948                     ClientMessage::Event(put_member.clone()),
   3949                     &mut auth,
   3950                     UnixTimestamp::new(122)
   3951                 )
   3952                 .await
   3953                 .expect("put member"),
   3954             vec![RelayMessage::Ok {
   3955                 event_id: put_member.id().clone(),
   3956                 accepted: true,
   3957                 message: String::new()
   3958             }]
   3959         );
   3960         let put_source_offset = offsets.try_recv().expect("put source offset");
   3961         let member_generated_offset = offsets.try_recv().expect("member generated offset");
   3962         assert!(generated_offsets[1] < put_source_offset);
   3963         assert!(put_source_offset < member_generated_offset);
   3964         let generated_offsets = [
   3965             generated_offsets[0],
   3966             generated_offsets[1],
   3967             member_generated_offset,
   3968         ];
   3969         let mut generated_kinds = BTreeSet::new();
   3970         for offset in generated_offsets {
   3971             let messages = handle
   3972                 .fanout_event_offset(offset, &mut subscriptions, &auth)
   3973                 .await
   3974                 .expect("fanout");
   3975             assert!(matches!(
   3976                 messages.as_slice(),
   3977                 [RuntimeRelayMessage::Event {
   3978                     subscription_id: delivered,
   3979                     event
   3980                 }] if delivered == &subscription_id
   3981                     && generated_kinds.insert(u32::from(event.kind().as_u16()))
   3982             ));
   3983         }
   3984         assert_eq!(
   3985             generated_kinds,
   3986             BTreeSet::from([KIND_GROUP_METADATA, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS])
   3987         );
   3988         assert_eq!(handle.metrics().outbox_replayed_events(), 3);
   3989         assert_eq!(handle.metrics().outbox_pending_events(), 0);
   3990         assert_eq!(handle.metrics().event_bus_published_offsets(), 5);
   3991         assert_eq!(
   3992             offsets.try_recv().expect_err("only source plus generated"),
   3993             TangleEventReceiveError::Empty
   3994         );
   3995 
   3996         let _ = std::fs::remove_dir_all(root);
   3997     }
   3998 
   3999     #[tokio::test]
   4000     async fn runtime_group_concurrency_duplicate_create_accepts_one_projection() {
   4001         let root = temp_root("runtime-group-concurrency-duplicate-create");
   4002         let _ = std::fs::remove_dir_all(&root);
   4003         let handle = RelayRuntimeHandle::new(
   4004             RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"),
   4005         );
   4006         let mut offsets = handle.subscribe_events().await;
   4007         let owner_auth =
   4008             authenticated_runtime_state(&handle, FixtureKey::Owner, "owner-create", 1_714_126_100)
   4009                 .await;
   4010         let first =
   4011             tangle_v2_group_create_event(FixtureKey::Owner, "RaceCreate", 1_714_126_101, &[])
   4012                 .expect("first create");
   4013         let second =
   4014             tangle_v2_group_create_event(FixtureKey::Owner, "RaceCreate", 1_714_126_102, &[])
   4015                 .expect("second create");
   4016         let first_task = {
   4017             let handle = handle.clone();
   4018             let mut auth = owner_auth.clone();
   4019             let event = first.clone();
   4020             tokio::spawn(async move {
   4021                 runtime_event_reply(&handle, event, &mut auth, 1_714_126_101).await
   4022             })
   4023         };
   4024         let second_task = {
   4025             let handle = handle.clone();
   4026             let mut auth = owner_auth.clone();
   4027             let event = second.clone();
   4028             tokio::spawn(async move {
   4029                 runtime_event_reply(&handle, event, &mut auth, 1_714_126_102).await
   4030             })
   4031         };
   4032         let replies = tokio::time::timeout(Duration::from_secs(3), async {
   4033             vec![
   4034                 first_task.await.expect("first task"),
   4035                 second_task.await.expect("second task"),
   4036             ]
   4037         })
   4038         .await
   4039         .expect("duplicate create race");
   4040 
   4041         assert_eq!(accepted_count(&replies), 1);
   4042         assert_eq!(
   4043             rejected_messages(&replies),
   4044             vec!["invalid: group already exists".to_owned()]
   4045         );
   4046         assert_eq!(drain_offsets(&mut offsets, 3).await.len(), 3);
   4047         assert_eq!(
   4048             offsets
   4049                 .try_recv()
   4050                 .expect_err("one create source plus generated"),
   4051             TangleEventReceiveError::Empty
   4052         );
   4053         let mut auth = owner_auth.clone();
   4054         assert_eq!(
   4055             runtime_group_count(
   4056                 &handle,
   4057                 "duplicate-create-count",
   4058                 "RaceCreate",
   4059                 KIND_GROUP_METADATA,
   4060                 "d",
   4061                 &mut auth,
   4062                 1_714_126_103,
   4063             )
   4064             .await,
   4065             1
   4066         );
   4067         assert_live_projection_matches_rebuild(&handle, "RaceCreate");
   4068 
   4069         let _ = std::fs::remove_dir_all(root);
   4070     }
   4071 
   4072     #[tokio::test]
   4073     async fn runtime_group_concurrency_duplicate_join_accepts_one_membership() {
   4074         let root = temp_root("runtime-group-concurrency-duplicate-join");
   4075         let _ = std::fs::remove_dir_all(&root);
   4076         let handle = RelayRuntimeHandle::new(
   4077             RelayRuntime::open(runtime_config_with_public_join(&root, 32)).expect("runtime"),
   4078         );
   4079         let mut offsets = handle.subscribe_events().await;
   4080         let mut owner_auth =
   4081             authenticated_runtime_state(&handle, FixtureKey::Owner, "owner-join", 1_714_126_200)
   4082                 .await;
   4083         let member_auth =
   4084             authenticated_runtime_state(&handle, FixtureKey::Member, "member-join", 1_714_126_201)
   4085                 .await;
   4086         let create =
   4087             tangle_v2_group_create_event(FixtureKey::Owner, "RaceJoin", 1_714_126_202, &[])
   4088                 .expect("create");
   4089         assert_accepted_reply(
   4090             runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_202).await,
   4091             &create,
   4092         );
   4093         assert_eq!(drain_offsets(&mut offsets, 3).await.len(), 3);
   4094         let join_a =
   4095             tangle_v2_join_event(FixtureKey::Member, "RaceJoin", 1_714_126_203).expect("join a");
   4096         let join_b =
   4097             tangle_v2_join_event(FixtureKey::Member, "RaceJoin", 1_714_126_204).expect("join b");
   4098         let first_task = {
   4099             let handle = handle.clone();
   4100             let mut auth = member_auth.clone();
   4101             let event = join_a.clone();
   4102             tokio::spawn(async move {
   4103                 runtime_event_reply(&handle, event, &mut auth, 1_714_126_203).await
   4104             })
   4105         };
   4106         let second_task = {
   4107             let handle = handle.clone();
   4108             let mut auth = member_auth.clone();
   4109             let event = join_b.clone();
   4110             tokio::spawn(async move {
   4111                 runtime_event_reply(&handle, event, &mut auth, 1_714_126_204).await
   4112             })
   4113         };
   4114         let replies = tokio::time::timeout(Duration::from_secs(3), async {
   4115             vec![
   4116                 first_task.await.expect("first task"),
   4117                 second_task.await.expect("second task"),
   4118             ]
   4119         })
   4120         .await
   4121         .expect("duplicate join race");
   4122 
   4123         assert_eq!(accepted_count(&replies), 1);
   4124         assert_eq!(
   4125             rejected_messages(&replies),
   4126             vec!["duplicate: group member already exists".to_owned()]
   4127         );
   4128         assert_eq!(drain_offsets(&mut offsets, 2).await.len(), 2);
   4129         assert_runtime_member_status(
   4130             &handle,
   4131             "RaceJoin",
   4132             &FixtureKey::Member.public_key(),
   4133             MemberStatus::Member,
   4134         );
   4135         assert_live_projection_matches_rebuild(&handle, "RaceJoin");
   4136 
   4137         let _ = std::fs::remove_dir_all(root);
   4138     }
   4139 
   4140     #[tokio::test]
   4141     async fn runtime_group_concurrency_join_and_leave_match_rebuild() {
   4142         let root = temp_root("runtime-group-concurrency-join-leave");
   4143         let _ = std::fs::remove_dir_all(&root);
   4144         let handle = RelayRuntimeHandle::new(
   4145             RelayRuntime::open(runtime_config_with_public_join(&root, 32)).expect("runtime"),
   4146         );
   4147         let mut owner_auth = authenticated_runtime_state(
   4148             &handle,
   4149             FixtureKey::Owner,
   4150             "owner-join-leave",
   4151             1_714_126_300,
   4152         )
   4153         .await;
   4154         let member_auth = authenticated_runtime_state(
   4155             &handle,
   4156             FixtureKey::Member,
   4157             "member-join-leave",
   4158             1_714_126_301,
   4159         )
   4160         .await;
   4161         let create =
   4162             tangle_v2_group_create_event(FixtureKey::Owner, "RaceJoinLeave", 1_714_126_302, &[])
   4163                 .expect("create");
   4164         let put_member = tangle_v2_put_user_event(
   4165             FixtureKey::Owner,
   4166             "RaceJoinLeave",
   4167             FixtureKey::Member,
   4168             1_714_126_303,
   4169         )
   4170         .expect("put member");
   4171         assert_accepted_reply(
   4172             runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_302).await,
   4173             &create,
   4174         );
   4175         assert_accepted_reply(
   4176             runtime_event_reply(&handle, put_member.clone(), &mut owner_auth, 1_714_126_303).await,
   4177             &put_member,
   4178         );
   4179         let leave = tangle_v2_leave_event(FixtureKey::Member, "RaceJoinLeave", 1_714_126_304)
   4180             .expect("leave");
   4181         let join =
   4182             tangle_v2_join_event(FixtureKey::Member, "RaceJoinLeave", 1_714_126_305).expect("join");
   4183         let leave_task = {
   4184             let handle = handle.clone();
   4185             let mut auth = member_auth.clone();
   4186             let event = leave.clone();
   4187             tokio::spawn(async move {
   4188                 runtime_event_reply(&handle, event, &mut auth, 1_714_126_304).await
   4189             })
   4190         };
   4191         let join_task = {
   4192             let handle = handle.clone();
   4193             let mut auth = member_auth.clone();
   4194             let event = join.clone();
   4195             tokio::spawn(async move {
   4196                 runtime_event_reply(&handle, event, &mut auth, 1_714_126_305).await
   4197             })
   4198         };
   4199         let replies = tokio::time::timeout(Duration::from_secs(3), async {
   4200             vec![
   4201                 leave_task.await.expect("leave task"),
   4202                 join_task.await.expect("join task"),
   4203             ]
   4204         })
   4205         .await
   4206         .expect("join leave race");
   4207         let join_accepted = reply_is_accepted(&replies[1]);
   4208 
   4209         assert_eq!(accepted_count(&replies), if join_accepted { 2 } else { 1 });
   4210         if join_accepted {
   4211             assert!(rejected_messages(&replies).is_empty());
   4212         } else {
   4213             assert_eq!(
   4214                 rejected_messages(&replies),
   4215                 vec!["duplicate: group member already exists".to_owned()]
   4216             );
   4217         }
   4218         assert_runtime_member_status(
   4219             &handle,
   4220             "RaceJoinLeave",
   4221             &FixtureKey::Member.public_key(),
   4222             if join_accepted {
   4223                 MemberStatus::Member
   4224             } else {
   4225                 MemberStatus::Removed
   4226             },
   4227         );
   4228         assert_live_projection_matches_rebuild(&handle, "RaceJoinLeave");
   4229 
   4230         let _ = std::fs::remove_dir_all(root);
   4231     }
   4232 
   4233     #[tokio::test]
   4234     async fn runtime_group_concurrency_delete_tombstone_blocks_normal_write() {
   4235         let root = temp_root("runtime-group-concurrency-delete-write");
   4236         let _ = std::fs::remove_dir_all(&root);
   4237         let handle = RelayRuntimeHandle::new(
   4238             RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"),
   4239         );
   4240         let mut owner_auth =
   4241             authenticated_runtime_state(&handle, FixtureKey::Owner, "owner-delete", 1_714_126_400)
   4242                 .await;
   4243         let create =
   4244             tangle_v2_group_create_event(FixtureKey::Owner, "RaceDelete", 1_714_126_401, &[])
   4245                 .expect("create");
   4246         assert_accepted_reply(
   4247             runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_401).await,
   4248             &create,
   4249         );
   4250         let normal =
   4251             tangle_v2_group_event(FixtureKey::Owner, "RaceDelete", 1_714_126_402, 1, "normal")
   4252                 .expect("normal");
   4253         let delete = tangle_v2_delete_group_event(FixtureKey::Owner, "RaceDelete", 1_714_126_403)
   4254             .expect("delete");
   4255         let normal_task = {
   4256             let handle = handle.clone();
   4257             let mut auth = owner_auth.clone();
   4258             let event = normal.clone();
   4259             tokio::spawn(async move {
   4260                 runtime_event_reply(&handle, event, &mut auth, 1_714_126_402).await
   4261             })
   4262         };
   4263         let delete_task = {
   4264             let handle = handle.clone();
   4265             let mut auth = owner_auth.clone();
   4266             let event = delete.clone();
   4267             tokio::spawn(async move {
   4268                 runtime_event_reply(&handle, event, &mut auth, 1_714_126_403).await
   4269             })
   4270         };
   4271         let replies = tokio::time::timeout(Duration::from_secs(3), async {
   4272             vec![
   4273                 normal_task.await.expect("normal task"),
   4274                 delete_task.await.expect("delete task"),
   4275             ]
   4276         })
   4277         .await
   4278         .expect("delete write race");
   4279         let delete_reply = &replies[1];
   4280 
   4281         assert!(reply_is_accepted(delete_reply));
   4282         assert!(
   4283             reply_is_accepted(&replies[0])
   4284                 || rejected_messages(&replies) == vec!["blocked: group is deleted".to_owned()]
   4285         );
   4286         let mut auth = owner_auth.clone();
   4287         assert_eq!(
   4288             runtime_group_count(
   4289                 &handle,
   4290                 "deleted-normal-count",
   4291                 "RaceDelete",
   4292                 1,
   4293                 "h",
   4294                 &mut auth,
   4295                 1_714_126_404,
   4296             )
   4297             .await,
   4298             0
   4299         );
   4300         assert_eq!(
   4301             runtime_group_count(
   4302                 &handle,
   4303                 "deleted-marker-count",
   4304                 "RaceDelete",
   4305                 KIND_GROUP_DELETE_GROUP,
   4306                 "h",
   4307                 &mut auth,
   4308                 1_714_126_405,
   4309             )
   4310             .await,
   4311             1
   4312         );
   4313         assert_live_projection_matches_rebuild(&handle, "RaceDelete");
   4314 
   4315         let _ = std::fs::remove_dir_all(root);
   4316     }
   4317 
   4318     #[tokio::test]
   4319     async fn runtime_group_concurrency_membership_mutation_matches_rebuild() {
   4320         let root = temp_root("runtime-group-concurrency-membership-mutation");
   4321         let _ = std::fs::remove_dir_all(&root);
   4322         let handle = RelayRuntimeHandle::new(
   4323             RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"),
   4324         );
   4325         let mut owner_auth = authenticated_runtime_state(
   4326             &handle,
   4327             FixtureKey::Owner,
   4328             "owner-membership",
   4329             1_714_126_500,
   4330         )
   4331         .await;
   4332         let create =
   4333             tangle_v2_group_create_event(FixtureKey::Owner, "RaceMember", 1_714_126_501, &[])
   4334                 .expect("create");
   4335         assert_accepted_reply(
   4336             runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_501).await,
   4337             &create,
   4338         );
   4339         let put_member = tangle_v2_put_user_event(
   4340             FixtureKey::Owner,
   4341             "RaceMember",
   4342             FixtureKey::Member,
   4343             1_714_126_502,
   4344         )
   4345         .expect("put member");
   4346         let remove_member = tangle_v2_remove_user_event(
   4347             FixtureKey::Owner,
   4348             "RaceMember",
   4349             FixtureKey::Member,
   4350             1_714_126_503,
   4351         )
   4352         .expect("remove member");
   4353         let put_task = {
   4354             let handle = handle.clone();
   4355             let mut auth = owner_auth.clone();
   4356             let event = put_member.clone();
   4357             tokio::spawn(async move {
   4358                 runtime_event_reply(&handle, event, &mut auth, 1_714_126_502).await
   4359             })
   4360         };
   4361         let remove_task = {
   4362             let handle = handle.clone();
   4363             let mut auth = owner_auth.clone();
   4364             let event = remove_member.clone();
   4365             tokio::spawn(async move {
   4366                 runtime_event_reply(&handle, event, &mut auth, 1_714_126_503).await
   4367             })
   4368         };
   4369         let replies = tokio::time::timeout(Duration::from_secs(3), async {
   4370             vec![
   4371                 put_task.await.expect("put task"),
   4372                 remove_task.await.expect("remove task"),
   4373             ]
   4374         })
   4375         .await
   4376         .expect("membership mutation race");
   4377         let remove_accepted = reply_is_accepted(&replies[1]);
   4378 
   4379         assert!(reply_is_accepted(&replies[0]));
   4380         if remove_accepted {
   4381             assert!(rejected_messages(&replies).is_empty());
   4382         } else {
   4383             assert_eq!(
   4384                 rejected_messages(&replies),
   4385                 vec!["duplicate: group member does not exist".to_owned()]
   4386             );
   4387         }
   4388         assert_runtime_member_status(
   4389             &handle,
   4390             "RaceMember",
   4391             &FixtureKey::Member.public_key(),
   4392             if remove_accepted {
   4393                 MemberStatus::Removed
   4394             } else {
   4395                 MemberStatus::Member
   4396             },
   4397         );
   4398         assert_live_projection_matches_rebuild(&handle, "RaceMember");
   4399 
   4400         let _ = std::fs::remove_dir_all(root);
   4401     }
   4402 
   4403     #[tokio::test]
   4404     async fn runtime_shared_services_progress_under_concurrent_event_query_count_and_fanout() {
   4405         let root = temp_root("runtime-shared-concurrency");
   4406         let _ = std::fs::remove_dir_all(&root);
   4407         let handle = RelayRuntimeHandle::new(
   4408             RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"),
   4409         );
   4410         let base_time = 1_714_126_000;
   4411         let mut owner_auth = handle.auth_state().await.expect("owner auth");
   4412         owner_auth
   4413             .issue_challenge("owner-stress", UnixTimestamp::new(base_time))
   4414             .expect("owner challenge");
   4415         let owner_auth_event =
   4416             runtime_pocket_auth_event(FixtureKey::Owner, "owner-stress", base_time);
   4417         assert_eq!(
   4418             handle
   4419                 .handle_client_message(
   4420                     RuntimeClientMessage::Auth(owner_auth_event.clone()),
   4421                     &mut owner_auth,
   4422                     UnixTimestamp::new(base_time)
   4423                 )
   4424                 .await
   4425                 .expect("owner auth"),
   4426             vec![RelayMessage::Ok {
   4427                 event_id: runtime_pocket_event_id(&owner_auth_event),
   4428                 accepted: true,
   4429                 message: String::new()
   4430             }]
   4431         );
   4432         let create = runtime_pocket_group_create_event(
   4433             FixtureKey::Owner,
   4434             "StressPrivate",
   4435             base_time + 1,
   4436             &["private"],
   4437         );
   4438         assert_eq!(
   4439             handle
   4440                 .handle_client_message(
   4441                     RuntimeClientMessage::Event(create.clone()),
   4442                     &mut owner_auth,
   4443                     UnixTimestamp::new(base_time + 1)
   4444                 )
   4445                 .await
   4446                 .expect("create"),
   4447             vec![RelayMessage::Ok {
   4448                 event_id: runtime_pocket_event_id(&create),
   4449                 accepted: true,
   4450                 message: String::new()
   4451             }]
   4452         );
   4453         let put_member = runtime_pocket_put_user_event(
   4454             FixtureKey::Owner,
   4455             "StressPrivate",
   4456             FixtureKey::Member,
   4457             base_time + 2,
   4458         );
   4459         assert_eq!(
   4460             handle
   4461                 .handle_client_message(
   4462                     RuntimeClientMessage::Event(put_member.clone()),
   4463                     &mut owner_auth,
   4464                     UnixTimestamp::new(base_time + 2)
   4465                 )
   4466                 .await
   4467                 .expect("put member"),
   4468             vec![RelayMessage::Ok {
   4469                 event_id: runtime_pocket_event_id(&put_member),
   4470                 accepted: true,
   4471                 message: String::new()
   4472             }]
   4473         );
   4474         let mut member_auth = handle.auth_state().await.expect("member auth");
   4475         member_auth
   4476             .issue_challenge("member-stress", UnixTimestamp::new(base_time + 3))
   4477             .expect("member challenge");
   4478         let member_auth_event =
   4479             runtime_pocket_auth_event(FixtureKey::Member, "member-stress", base_time + 3);
   4480         assert_eq!(
   4481             handle
   4482                 .handle_client_message(
   4483                     RuntimeClientMessage::Auth(member_auth_event.clone()),
   4484                     &mut member_auth,
   4485                     UnixTimestamp::new(base_time + 3)
   4486                 )
   4487                 .await
   4488                 .expect("member auth"),
   4489             vec![RelayMessage::Ok {
   4490                 event_id: runtime_pocket_event_id(&member_auth_event),
   4491                 accepted: true,
   4492                 message: String::new()
   4493             }]
   4494         );
   4495         let public_auth = handle.auth_state().await.expect("public auth");
   4496         let mut offsets = handle.subscribe_events().await;
   4497         let group_write_count = 6_usize;
   4498         let public_write_count = 4_usize;
   4499         let mut write_tasks = Vec::new();
   4500         for index in 0..group_write_count {
   4501             let handle = handle.clone();
   4502             let mut auth = member_auth.clone();
   4503             write_tasks.push(tokio::spawn(async move {
   4504                 let event = runtime_pocket_group_event(
   4505                     FixtureKey::Member,
   4506                     "StressPrivate",
   4507                     base_time + 10 + u64::try_from(index).expect("index"),
   4508                     1,
   4509                     &format!("private stress {index}"),
   4510                 );
   4511                 assert_eq!(
   4512                     handle
   4513                         .handle_client_message(
   4514                             RuntimeClientMessage::Event(event.clone()),
   4515                             &mut auth,
   4516                             UnixTimestamp::new(
   4517                                 base_time + 10 + u64::try_from(index).expect("index")
   4518                             )
   4519                         )
   4520                         .await
   4521                         .expect("group write"),
   4522                     vec![RelayMessage::Ok {
   4523                         event_id: runtime_pocket_event_id(&event),
   4524                         accepted: true,
   4525                         message: String::new()
   4526                     }]
   4527                 );
   4528                 (true, runtime_pocket_event_id(&event))
   4529             }));
   4530         }
   4531         for index in 0..public_write_count {
   4532             let handle = handle.clone();
   4533             let mut auth = public_auth.clone();
   4534             write_tasks.push(tokio::spawn(async move {
   4535                 let event = runtime_pocket_event(
   4536                     FixtureKey::Admin,
   4537                     base_time + 40 + u64::try_from(index).expect("index"),
   4538                     1,
   4539                     Vec::new(),
   4540                     &format!("public stress {index}"),
   4541                 );
   4542                 assert_eq!(
   4543                     handle
   4544                         .handle_client_message(
   4545                             RuntimeClientMessage::Event(event.clone()),
   4546                             &mut auth,
   4547                             UnixTimestamp::new(
   4548                                 base_time + 40 + u64::try_from(index).expect("index")
   4549                             )
   4550                         )
   4551                         .await
   4552                         .expect("public write"),
   4553                     vec![RelayMessage::Ok {
   4554                         event_id: runtime_pocket_event_id(&event),
   4555                         accepted: true,
   4556                         message: String::new()
   4557                     }]
   4558                 );
   4559                 (false, runtime_pocket_event_id(&event))
   4560             }));
   4561         }
   4562         let stored_events = tokio::time::timeout(Duration::from_secs(3), async {
   4563             let mut stored_events = Vec::new();
   4564             for task in write_tasks {
   4565                 stored_events.push(task.await.expect("write task"));
   4566             }
   4567             stored_events
   4568         })
   4569         .await
   4570         .expect("write concurrency timeout");
   4571         assert_eq!(
   4572             stored_events
   4573                 .iter()
   4574                 .filter(|(is_group, _)| *is_group)
   4575                 .count(),
   4576             group_write_count
   4577         );
   4578         assert_eq!(
   4579             stored_events
   4580                 .iter()
   4581                 .filter(|(is_group, _)| !*is_group)
   4582                 .count(),
   4583             public_write_count
   4584         );
   4585         let group_event_ids = stored_events
   4586             .iter()
   4587             .filter(|(is_group, _)| *is_group)
   4588             .map(|(_, event_id)| event_id.clone())
   4589             .collect::<BTreeSet<_>>();
   4590         let mut published_offsets = Vec::new();
   4591         for _ in 0..stored_events.len() {
   4592             published_offsets.push(
   4593                 tokio::time::timeout(Duration::from_secs(1), offsets.recv())
   4594                     .await
   4595                     .expect("offset timeout")
   4596                     .expect("offset"),
   4597             );
   4598         }
   4599         assert_eq!(
   4600             offsets.try_recv().expect_err("no extra offsets"),
   4601             TangleEventReceiveError::Empty
   4602         );
   4603         let mut visibility_tasks = Vec::new();
   4604         for offset in published_offsets.iter().copied() {
   4605             let handle = handle.clone();
   4606             let member_auth = member_auth.clone();
   4607             let public_auth = public_auth.clone();
   4608             let group_event_ids = group_event_ids.clone();
   4609             visibility_tasks.push(tokio::spawn(async move {
   4610                 let member_event = handle
   4611                     .event_by_offset_with_auth(offset, &member_auth)
   4612                     .await
   4613                     .expect("member offset")
   4614                     .expect("member visible");
   4615                 let public_event = handle
   4616                     .event_by_offset_with_auth(offset, &public_auth)
   4617                     .await
   4618                     .expect("public offset");
   4619                 let member_event_id =
   4620                     EventId::new(&member_event.id().as_hex_string()).expect("pocket id");
   4621                 let is_group_event = group_event_ids.contains(&member_event_id);
   4622                 if is_group_event {
   4623                     assert!(public_event.is_none());
   4624                 } else {
   4625                     assert!(public_event.is_some());
   4626                 }
   4627                 is_group_event
   4628             }));
   4629         }
   4630         let visible_group_offsets = tokio::time::timeout(Duration::from_secs(3), async {
   4631             let mut visible_group_offsets = 0;
   4632             for task in visibility_tasks {
   4633                 if task.await.expect("visibility task") {
   4634                     visible_group_offsets += 1;
   4635                 }
   4636             }
   4637             visible_group_offsets
   4638         })
   4639         .await
   4640         .expect("visibility timeout");
   4641         assert_eq!(visible_group_offsets, group_write_count);
   4642         let member_subscription = SubscriptionId::new("member-stress-live").expect("subscription");
   4643         let public_subscription = SubscriptionId::new("public-stress-live").expect("subscription");
   4644         let mut member_subscriptions = LiveSubscriptionSet::new(32, 64).expect("member live set");
   4645         let mut public_subscriptions = LiveSubscriptionSet::new(32, 64).expect("public live set");
   4646         let stress_filter = pocket_filter(json!({"kinds":[1], "#h":["StressPrivate"]}));
   4647         member_subscriptions
   4648             .subscribe(member_subscription.clone(), vec![stress_filter.clone()])
   4649             .expect("member subscribe");
   4650         public_subscriptions
   4651             .subscribe(public_subscription, vec![stress_filter])
   4652             .expect("public subscribe");
   4653         let mut member_fanout_count = 0;
   4654         for offset in &published_offsets {
   4655             let public_replies = handle
   4656                 .fanout_event_offset(*offset, &mut public_subscriptions, &public_auth)
   4657                 .await
   4658                 .expect("public fanout");
   4659             assert!(public_replies.is_empty());
   4660             let member_replies = handle
   4661                 .fanout_event_offset(*offset, &mut member_subscriptions, &member_auth)
   4662                 .await
   4663                 .expect("member fanout");
   4664             for reply in member_replies {
   4665                 match reply {
   4666                     RuntimeRelayMessage::Event {
   4667                         subscription_id,
   4668                         event,
   4669                     } => {
   4670                         assert_eq!(subscription_id, member_subscription);
   4671                         let event_id =
   4672                             EventId::new(&event.id().as_hex_string()).expect("pocket id");
   4673                         assert!(group_event_ids.contains(&event_id));
   4674                         member_fanout_count += 1;
   4675                     }
   4676                     other => panic!("unexpected fanout reply {other:?}"),
   4677                 }
   4678             }
   4679         }
   4680         assert_eq!(member_fanout_count, group_write_count);
   4681         let mut query_tasks = Vec::new();
   4682         for index in 0..3_u64 {
   4683             let member_req_handle = handle.clone();
   4684             let mut auth = member_auth.clone();
   4685             let group_event_ids = group_event_ids.clone();
   4686             query_tasks.push(tokio::spawn(async move {
   4687                 let subscription_id =
   4688                     SubscriptionId::new(&format!("member-req-{index}")).expect("subscription");
   4689                 let replies = member_req_handle
   4690                     .handle_protocol_client_message_for_test(
   4691                         ClientMessage::Req {
   4692                             subscription_id: subscription_id.clone(),
   4693                             filters: vec![
   4694                                 filter_from_value(&json!({
   4695                                     "kinds":[1],
   4696                                     "#h":["StressPrivate"],
   4697                                     "limit": 20
   4698                                 }))
   4699                                 .expect("filter"),
   4700                             ],
   4701                         },
   4702                         &mut auth,
   4703                         UnixTimestamp::new(base_time + 100 + index),
   4704                     )
   4705                     .await
   4706                     .expect("member req");
   4707                 assert_eq!(
   4708                     replies
   4709                         .iter()
   4710                         .filter(|reply| matches!(
   4711                             reply,
   4712                             RelayMessage::Event {
   4713                                 subscription_id: delivered,
   4714                                 event
   4715                             } if delivered == &subscription_id && group_event_ids.contains(event.id())
   4716                         ))
   4717                         .count(),
   4718                     group_event_ids.len()
   4719                 );
   4720                 assert!(matches!(
   4721                     replies.last(),
   4722                     Some(RelayMessage::Eose(delivered)) if delivered == &subscription_id
   4723                 ));
   4724             }));
   4725             let public_req_handle = handle.clone();
   4726             let mut auth = public_auth.clone();
   4727             query_tasks.push(tokio::spawn(async move {
   4728                 let subscription_id =
   4729                     SubscriptionId::new(&format!("public-req-{index}")).expect("subscription");
   4730                 let replies = public_req_handle
   4731                     .handle_protocol_client_message_for_test(
   4732                         ClientMessage::Req {
   4733                             subscription_id: subscription_id.clone(),
   4734                             filters: vec![
   4735                                 filter_from_value(&json!({
   4736                                     "kinds":[1],
   4737                                     "#h":["StressPrivate"],
   4738                                     "limit": 20
   4739                                 }))
   4740                                 .expect("filter"),
   4741                             ],
   4742                         },
   4743                         &mut auth,
   4744                         UnixTimestamp::new(base_time + 110 + index),
   4745                     )
   4746                     .await
   4747                     .expect("public req");
   4748                 assert_eq!(
   4749                     replies,
   4750                     vec![RelayMessage::Closed {
   4751                         subscription_id,
   4752                         message: "auth-required: authentication required to read group events"
   4753                             .to_owned()
   4754                     }]
   4755                 );
   4756             }));
   4757             let member_count_handle = handle.clone();
   4758             let mut auth = member_auth.clone();
   4759             query_tasks.push(tokio::spawn(async move {
   4760                 let subscription_id =
   4761                     SubscriptionId::new(&format!("member-count-{index}")).expect("subscription");
   4762                 let replies = member_count_handle
   4763                     .handle_protocol_client_message_for_test(
   4764                         ClientMessage::Count {
   4765                             subscription_id: subscription_id.clone(),
   4766                             filters: vec![
   4767                                 filter_from_value(&json!({
   4768                                     "kinds":[1],
   4769                                     "#h":["StressPrivate"]
   4770                                 }))
   4771                                 .expect("filter"),
   4772                             ],
   4773                         },
   4774                         &mut auth,
   4775                         UnixTimestamp::new(base_time + 120 + index),
   4776                     )
   4777                     .await
   4778                     .expect("member count");
   4779                 assert_eq!(
   4780                     replies,
   4781                     vec![RelayMessage::Count {
   4782                         subscription_id,
   4783                         count: u64::try_from(group_write_count).expect("group count"),
   4784                         hll: None
   4785                     }]
   4786                 );
   4787             }));
   4788             let public_count_handle = handle.clone();
   4789             let mut auth = public_auth.clone();
   4790             query_tasks.push(tokio::spawn(async move {
   4791                 let subscription_id =
   4792                     SubscriptionId::new(&format!("public-count-{index}")).expect("subscription");
   4793                 let replies = public_count_handle
   4794                     .handle_protocol_client_message_for_test(
   4795                         ClientMessage::Count {
   4796                             subscription_id: subscription_id.clone(),
   4797                             filters: vec![
   4798                                 filter_from_value(&json!({
   4799                                     "kinds":[1],
   4800                                     "#h":["StressPrivate"]
   4801                                 }))
   4802                                 .expect("filter"),
   4803                             ],
   4804                         },
   4805                         &mut auth,
   4806                         UnixTimestamp::new(base_time + 130 + index),
   4807                     )
   4808                     .await
   4809                     .expect("public count");
   4810                 assert_eq!(
   4811                     replies,
   4812                     vec![RelayMessage::Count {
   4813                         subscription_id,
   4814                         count: 0,
   4815                         hll: None
   4816                     }]
   4817                 );
   4818             }));
   4819         }
   4820         tokio::time::timeout(Duration::from_secs(3), async {
   4821             for task in query_tasks {
   4822                 task.await.expect("query task");
   4823             }
   4824         })
   4825         .await
   4826         .expect("query concurrency timeout");
   4827         assert!(handle.metrics().query_candidates_scanned() > 0);
   4828         assert!(
   4829             handle.metrics().query_returned_events()
   4830                 >= u64::try_from(group_write_count * 3).expect("returned event count")
   4831         );
   4832         assert!(handle.metrics().query_redacted_events() > 0);
   4833         handle.shutdown().await.expect("shutdown");
   4834 
   4835         let _ = std::fs::remove_dir_all(root);
   4836     }
   4837 
   4838     fn runtime_config(root: &Path, per_connection_outbound_queue: usize) -> BaseRelayRuntimeConfig {
   4839         runtime_config_with_group_policy(root, per_connection_outbound_queue, false)
   4840     }
   4841 
   4842     fn runtime_config_with_public_join(
   4843         root: &Path,
   4844         per_connection_outbound_queue: usize,
   4845     ) -> BaseRelayRuntimeConfig {
   4846         runtime_config_with_group_policy(root, per_connection_outbound_queue, true)
   4847     }
   4848 
   4849     #[derive(Default)]
   4850     struct RecordingHooks {
   4851         admissions: Mutex<Vec<RelayEventAdmissionContext>>,
   4852         stored: Mutex<Vec<RelayEventStoredContext>>,
   4853     }
   4854 
   4855     impl RelayRuntimeHooks for RecordingHooks {
   4856         fn admit_event(&self, context: &RelayEventAdmissionContext) -> EventAdmissionDecision {
   4857             self.admissions
   4858                 .lock()
   4859                 .expect("admissions")
   4860                 .push(context.clone());
   4861             if context.event().has_tag("policy", "reject") {
   4862                 EventAdmissionDecision::reject("hook rejected event")
   4863             } else {
   4864                 EventAdmissionDecision::Accept
   4865             }
   4866         }
   4867 
   4868         fn event_stored(&self, context: &RelayEventStoredContext) {
   4869             self.stored.lock().expect("stored").push(context.clone());
   4870         }
   4871     }
   4872 
   4873     fn runtime_config_with_group_policy(
   4874         root: &Path,
   4875         per_connection_outbound_queue: usize,
   4876         public_join: bool,
   4877     ) -> BaseRelayRuntimeConfig {
   4878         let raw = json!({
   4879             "server": {
   4880                 "listen_addr": "127.0.0.1:0",
   4881                 "relay_url": "wss://relay.radroots.test"
   4882             },
   4883             "pocket": {
   4884                 "data_directory": root.join("pocket"),
   4885                 "sync_policy": "flush_on_shutdown",
   4886                 "query": {
   4887                   "allow_scraping": false,
   4888                   "allow_scrape_if_limited_to": 100,
   4889                   "allow_scrape_if_max_seconds": 3600
   4890                 }
   4891             },
   4892             "groups": {
   4893                 "enabled": true,
   4894                 "canonical_relay_url": "wss://relay.radroots.test",
   4895                 "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777",
   4896                 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()],
   4897                 "policy": {
   4898                     "public_join": public_join,
   4899                     "invites_enabled": false
   4900                 }
   4901             },
   4902             "auth": {
   4903                 "challenge_ttl_seconds": 300,
   4904                 "created_at_skew_seconds": 600
   4905             },
   4906             "limits": {
   4907                 "max_message_length": 1048576,
   4908                 "max_subid_length": 64,
   4909                 "max_subscriptions_per_connection": 64,
   4910                 "max_filters_per_request": 10,
   4911                 "max_tag_values_per_filter": 100,
   4912                 "max_query_complexity": 2048,
   4913                 "max_limit": 500,
   4914                 "default_limit": 100,
   4915                 "max_event_tags": 200,
   4916                 "max_content_length": 65536,
   4917                 "broadcast_channel_capacity": 16,
   4918                 "per_connection_outbound_queue": per_connection_outbound_queue
   4919             },
   4920             "rate_limits": {
   4921                 "auth": {
   4922                     "per_ip": {"window_seconds": 60, "max_hits": 120},
   4923                     "per_pubkey": {"window_seconds": 60, "max_hits": 30},
   4924                     "failures": {"window_seconds": 300, "max_hits": 5},
   4925                     "failures_per_ip": {"window_seconds": 300, "max_hits": 20}
   4926                 },
   4927                 "event": {
   4928                     "per_ip": {"window_seconds": 60, "max_hits": 600},
   4929                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
   4930                     "per_kind": {"window_seconds": 60, "max_hits": 1000}
   4931                 },
   4932                 "group": {
   4933                     "write_per_ip": {"window_seconds": 60, "max_hits": 300},
   4934                     "write_per_pubkey": {"window_seconds": 60, "max_hits": 60},
   4935                     "write_per_group": {"window_seconds": 60, "max_hits": 90},
   4936                     "write_per_kind": {"window_seconds": 60, "max_hits": 300},
   4937                     "join_flow": {"window_seconds": 300, "max_hits": 10},
   4938                     "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30}
   4939                 },
   4940                 "req": {
   4941                     "per_ip": {"window_seconds": 60, "max_hits": 600},
   4942                     "per_connection": {"window_seconds": 60, "max_hits": 120},
   4943                     "per_pubkey": {"window_seconds": 60, "max_hits": 240},
   4944                     "per_group": {"window_seconds": 60, "max_hits": 240},
   4945                     "per_kind": {"window_seconds": 60, "max_hits": 500},
   4946                     "broad": {"window_seconds": 60, "max_hits": 30}
   4947                 },
   4948                 "count": {
   4949                     "per_ip": {"window_seconds": 60, "max_hits": 300},
   4950                     "per_connection": {"window_seconds": 60, "max_hits": 60},
   4951                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
   4952                     "per_group": {"window_seconds": 60, "max_hits": 120},
   4953                     "per_kind": {"window_seconds": 60, "max_hits": 240},
   4954                     "broad": {"window_seconds": 60, "max_hits": 20}
   4955                 }
   4956             }
   4957         })
   4958         .to_string();
   4959         parse_base_relay_runtime_config_json(&raw).expect("config")
   4960     }
   4961 
   4962     async fn authenticated_runtime_state(
   4963         handle: &RelayRuntimeHandle,
   4964         key: FixtureKey,
   4965         challenge: &str,
   4966         now: u64,
   4967     ) -> BaseAuthState {
   4968         let mut auth = handle.auth_state().await.expect("auth");
   4969         auth.issue_challenge(challenge, UnixTimestamp::new(now))
   4970             .expect("challenge");
   4971         let event = tangle_v2_auth_event(key, challenge, now).expect("auth event");
   4972         let replies = handle
   4973             .handle_protocol_client_message_for_test(
   4974                 ClientMessage::Auth(event.clone()),
   4975                 &mut auth,
   4976                 UnixTimestamp::new(now),
   4977             )
   4978             .await
   4979             .expect("auth message");
   4980 
   4981         assert_eq!(
   4982             replies,
   4983             vec![RelayMessage::Ok {
   4984                 event_id: event.id().clone(),
   4985                 accepted: true,
   4986                 message: String::new()
   4987             }]
   4988         );
   4989         auth
   4990     }
   4991 
   4992     async fn runtime_event_reply(
   4993         handle: &RelayRuntimeHandle,
   4994         event: Event,
   4995         auth: &mut BaseAuthState,
   4996         now: u64,
   4997     ) -> RelayMessage {
   4998         let replies = handle
   4999             .handle_protocol_client_message_for_test(
   5000                 ClientMessage::Event(event),
   5001                 auth,
   5002                 UnixTimestamp::new(now),
   5003             )
   5004             .await
   5005             .expect("event message");
   5006 
   5007         assert_eq!(replies.len(), 1);
   5008         replies.into_iter().next().expect("reply")
   5009     }
   5010 
   5011     fn runtime_pocket_event_reply(
   5012         handle: &RelayRuntimeHandle,
   5013         event: &PocketEvent,
   5014         auth: &mut BaseAuthState,
   5015     ) -> RelayMessage {
   5016         handle
   5017             .inner
   5018             .handle_pocket_event_with_auth_report(event, auth)
   5019             .expect("event message")
   5020             .into_message()
   5021     }
   5022 
   5023     async fn runtime_group_count(
   5024         handle: &RelayRuntimeHandle,
   5025         subscription_id: &str,
   5026         group_id: &str,
   5027         kind: u32,
   5028         tag_name: &str,
   5029         auth: &mut BaseAuthState,
   5030         now: u64,
   5031     ) -> u64 {
   5032         let replies = handle
   5033             .handle_protocol_client_message_for_test(
   5034                 ClientMessage::Count {
   5035                     subscription_id: SubscriptionId::new(subscription_id).expect("subscription"),
   5036                     filters: vec![runtime_group_filter(group_id, kind, tag_name)],
   5037                 },
   5038                 auth,
   5039                 UnixTimestamp::new(now),
   5040             )
   5041             .await
   5042             .expect("count");
   5043 
   5044         match replies.as_slice() {
   5045             [RelayMessage::Count { count, .. }] => *count,
   5046             other => panic!("count reply expected, got {other:?}"),
   5047         }
   5048     }
   5049 
   5050     fn runtime_group_filter(group_id: &str, kind: u32, tag_name: &str) -> Filter {
   5051         let mut value = json!({"kinds": [kind]});
   5052         value
   5053             .as_object_mut()
   5054             .expect("filter")
   5055             .insert(format!("#{tag_name}"), json!([group_id]));
   5056         filter_from_value(&value).expect("filter")
   5057     }
   5058 
   5059     async fn drain_offsets(receiver: &mut TangleEventReceiver, count: usize) -> Vec<StoreOffset> {
   5060         let mut offsets = Vec::with_capacity(count);
   5061         for _ in 0..count {
   5062             offsets.push(
   5063                 tokio::time::timeout(Duration::from_secs(1), receiver.recv())
   5064                     .await
   5065                     .expect("offset timeout")
   5066                     .expect("offset"),
   5067             );
   5068         }
   5069         offsets
   5070     }
   5071 
   5072     fn accepted_count(replies: &[RelayMessage]) -> usize {
   5073         replies
   5074             .iter()
   5075             .filter(|reply| reply_is_accepted(reply))
   5076             .count()
   5077     }
   5078 
   5079     fn reply_is_accepted(reply: &RelayMessage) -> bool {
   5080         matches!(
   5081             reply,
   5082             RelayMessage::Ok {
   5083                 accepted: true,
   5084                 message,
   5085                 ..
   5086             } if message.is_empty()
   5087         )
   5088     }
   5089 
   5090     fn rejected_messages(replies: &[RelayMessage]) -> Vec<String> {
   5091         replies
   5092             .iter()
   5093             .filter_map(|reply| match reply {
   5094                 RelayMessage::Ok {
   5095                     accepted: false,
   5096                     message,
   5097                     ..
   5098                 } => Some(message.clone()),
   5099                 _ => None,
   5100             })
   5101             .collect()
   5102     }
   5103 
   5104     fn assert_accepted_reply(reply: RelayMessage, event: &Event) {
   5105         assert_eq!(
   5106             reply,
   5107             RelayMessage::Ok {
   5108                 event_id: event.id().clone(),
   5109                 accepted: true,
   5110                 message: String::new()
   5111             }
   5112         );
   5113     }
   5114 
   5115     fn assert_accepted_pocket_reply(reply: RelayMessage, event: &PocketEvent) {
   5116         assert_eq!(
   5117             reply,
   5118             RelayMessage::Ok {
   5119                 event_id: runtime_pocket_event_id(event),
   5120                 accepted: true,
   5121                 message: String::new()
   5122             }
   5123         );
   5124     }
   5125 
   5126     fn runtime_pocket_event_id(event: &PocketEvent) -> EventId {
   5127         EventId::new(&event.id().as_hex_string()).expect("event id")
   5128     }
   5129 
   5130     fn assert_runtime_member_status(
   5131         handle: &RelayRuntimeHandle,
   5132         group_id: &str,
   5133         pubkey: &PublicKeyHex,
   5134         status: MemberStatus,
   5135     ) {
   5136         let group_id = GroupId::new(group_id).expect("group");
   5137         let groups = handle.inner.groups.as_ref().expect("groups");
   5138         let projection = groups.projection();
   5139 
   5140         assert_eq!(
   5141             projection
   5142                 .member(&group_id, pubkey)
   5143                 .expect("member")
   5144                 .status(),
   5145             status
   5146         );
   5147     }
   5148 
   5149     fn assert_live_projection_matches_rebuild(handle: &RelayRuntimeHandle, group_id: &str) {
   5150         let group_id = GroupId::new(group_id).expect("group");
   5151         let groups = handle.inner.groups.as_ref().expect("groups");
   5152         let live = groups.projection();
   5153         let rebuilt = rebuilt_projection(handle);
   5154         let live_group = live.group(&group_id);
   5155         let rebuilt_group = rebuilt.group(&group_id);
   5156 
   5157         assert_eq!(
   5158             live_group.map(|group| group.lifecycle()),
   5159             rebuilt_group.map(|group| group.lifecycle())
   5160         );
   5161         assert_eq!(
   5162             live_group.map(|group| group.metadata()),
   5163             rebuilt_group.map(|group| group.metadata())
   5164         );
   5165         assert_eq!(
   5166             live_group.and_then(|group| group.delete_event_id()),
   5167             rebuilt_group.and_then(|group| group.delete_event_id())
   5168         );
   5169         assert_eq!(live.tombstone(&group_id), rebuilt.tombstone(&group_id));
   5170         assert_eq!(
   5171             projection_member_statuses(&live, &group_id),
   5172             projection_member_statuses(&rebuilt, &group_id)
   5173         );
   5174     }
   5175 
   5176     fn rebuilt_projection(handle: &RelayRuntimeHandle) -> GroupProjection {
   5177         let groups = handle.inner.groups.as_ref().expect("groups");
   5178         let limits = groups.limits();
   5179         let events = handle
   5180             .inner
   5181             .store
   5182             .scan_events()
   5183             .expect("scan")
   5184             .into_iter()
   5185             .filter_map(|stored| {
   5186                 let store_offset = StoreOffset::new(stored.store_offset());
   5187                 match tangle_groups::classify_group_event(stored.event(), limits).expect("classify")
   5188                 {
   5189                     GroupEventClass::NonGroup => None,
   5190                     _ => Some(CanonicalGroupEvent::new(stored.into_event(), store_offset)),
   5191                 }
   5192             })
   5193             .collect::<Vec<_>>();
   5194 
   5195         rebuild_group_projection(events, limits, UnixTimestamp::new(1_714_199_999))
   5196             .expect("rebuild")
   5197             .into_projection()
   5198     }
   5199 
   5200     fn projection_member_statuses(
   5201         projection: &GroupProjection,
   5202         group_id: &GroupId,
   5203     ) -> BTreeMap<String, MemberStatus> {
   5204         projection
   5205             .members()
   5206             .iter()
   5207             .filter(|((candidate, _), _)| candidate == group_id)
   5208             .map(|((_, pubkey), member)| (pubkey.as_str().to_owned(), member.status()))
   5209             .collect()
   5210     }
   5211 
   5212     fn runtime_relay_limits(max_pending_events: usize) -> BaseRelayLimits {
   5213         BaseRelayLimits::new(BaseRelayLimitSettings {
   5214             max_pending_events,
   5215             max_subscription_id_length: 64,
   5216             max_subscriptions: 64,
   5217             max_filters_per_request: 10,
   5218             max_tag_values_per_filter: 100,
   5219             max_query_complexity: 610,
   5220             max_event_tags: 200,
   5221             max_content_length: 65_536,
   5222             max_limit: 500,
   5223             default_limit: 100,
   5224         })
   5225         .expect("limits")
   5226     }
   5227 
   5228     fn pocket_filter(value: serde_json::Value) -> tangle_store_pocket::PocketOwnedFilter {
   5229         let filter = filter_from_value(&value).expect("filter");
   5230         crate::pocket_conversion::tangle_filter_to_pocket(&filter).expect("pocket filter")
   5231     }
   5232 
   5233     fn tangle_v2_event(
   5234         key: FixtureKey,
   5235         created_at: u64,
   5236         kind: u64,
   5237         tags: Vec<Tag>,
   5238         content: &str,
   5239     ) -> Result<Event, String> {
   5240         let event = runtime_pocket_event(key, created_at, kind, tags, content);
   5241         runtime_pocket_event_to_protocol(&event)
   5242     }
   5243 
   5244     fn tangle_v2_auth_event(
   5245         key: FixtureKey,
   5246         challenge: &str,
   5247         created_at: u64,
   5248     ) -> Result<Event, String> {
   5249         tangle_v2_event(
   5250             key,
   5251             created_at,
   5252             22_242,
   5253             vec![
   5254                 Tag::from_parts("relay", &["wss://relay.radroots.test"])?,
   5255                 Tag::from_parts("challenge", &[challenge])?,
   5256             ],
   5257             "",
   5258         )
   5259     }
   5260 
   5261     fn tangle_v2_group_create_event(
   5262         key: FixtureKey,
   5263         group_id: &str,
   5264         created_at: u64,
   5265         flags: &[&str],
   5266     ) -> Result<Event, String> {
   5267         let mut tags = vec![
   5268             Tag::from_parts("h", &[group_id])?,
   5269             Tag::from_parts("name", &[group_id])?,
   5270         ];
   5271         for flag in flags {
   5272             tags.push(Tag::from_parts(flag, &[])?);
   5273         }
   5274         tangle_v2_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "")
   5275     }
   5276 
   5277     fn tangle_v2_put_user_event(
   5278         key: FixtureKey,
   5279         group_id: &str,
   5280         target: FixtureKey,
   5281         created_at: u64,
   5282     ) -> Result<Event, String> {
   5283         let target_pubkey = target.public_key();
   5284         tangle_v2_event(
   5285             key,
   5286             created_at,
   5287             KIND_GROUP_PUT_USER.into(),
   5288             vec![
   5289                 Tag::from_parts("h", &[group_id])?,
   5290                 Tag::from_parts("p", &[target_pubkey.as_str()])?,
   5291             ],
   5292             "",
   5293         )
   5294     }
   5295 
   5296     fn tangle_v2_remove_user_event(
   5297         key: FixtureKey,
   5298         group_id: &str,
   5299         target: FixtureKey,
   5300         created_at: u64,
   5301     ) -> Result<Event, String> {
   5302         let target_pubkey = target.public_key();
   5303         tangle_v2_event(
   5304             key,
   5305             created_at,
   5306             KIND_GROUP_REMOVE_USER.into(),
   5307             vec![
   5308                 Tag::from_parts("h", &[group_id])?,
   5309                 Tag::from_parts("p", &[target_pubkey.as_str()])?,
   5310             ],
   5311             "",
   5312         )
   5313     }
   5314 
   5315     fn tangle_v2_join_event(
   5316         key: FixtureKey,
   5317         group_id: &str,
   5318         created_at: u64,
   5319     ) -> Result<Event, String> {
   5320         tangle_v2_group_event(
   5321             key,
   5322             group_id,
   5323             created_at,
   5324             KIND_GROUP_JOIN_REQUEST.into(),
   5325             "",
   5326         )
   5327     }
   5328 
   5329     fn tangle_v2_leave_event(
   5330         key: FixtureKey,
   5331         group_id: &str,
   5332         created_at: u64,
   5333     ) -> Result<Event, String> {
   5334         tangle_v2_group_event(
   5335             key,
   5336             group_id,
   5337             created_at,
   5338             KIND_GROUP_LEAVE_REQUEST.into(),
   5339             "",
   5340         )
   5341     }
   5342 
   5343     fn tangle_v2_delete_group_event(
   5344         key: FixtureKey,
   5345         group_id: &str,
   5346         created_at: u64,
   5347     ) -> Result<Event, String> {
   5348         tangle_v2_group_event(
   5349             key,
   5350             group_id,
   5351             created_at,
   5352             KIND_GROUP_DELETE_GROUP.into(),
   5353             "",
   5354         )
   5355     }
   5356 
   5357     fn tangle_v2_group_event(
   5358         key: FixtureKey,
   5359         group_id: &str,
   5360         created_at: u64,
   5361         kind: u64,
   5362         content: &str,
   5363     ) -> Result<Event, String> {
   5364         tangle_v2_event(
   5365             key,
   5366             created_at,
   5367             kind,
   5368             vec![Tag::from_parts("h", &[group_id])?],
   5369             content,
   5370         )
   5371     }
   5372 
   5373     fn runtime_pocket_group_create_event(
   5374         key: FixtureKey,
   5375         group_id: &str,
   5376         created_at: u64,
   5377         flags: &[&str],
   5378     ) -> PocketOwnedEvent {
   5379         let mut tags = vec![
   5380             Tag::from_parts("h", &[group_id]).expect("h"),
   5381             Tag::from_parts("name", &[group_id]).expect("name"),
   5382         ];
   5383         for flag in flags {
   5384             tags.push(Tag::from_parts(flag, &[]).expect("flag"));
   5385         }
   5386         runtime_pocket_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "")
   5387     }
   5388 
   5389     fn runtime_pocket_auth_event(
   5390         key: FixtureKey,
   5391         challenge: &str,
   5392         created_at: u64,
   5393     ) -> PocketOwnedEvent {
   5394         runtime_pocket_event(
   5395             key,
   5396             created_at,
   5397             22_242,
   5398             vec![
   5399                 Tag::from_parts("relay", &["wss://relay.radroots.test"]).expect("relay"),
   5400                 Tag::from_parts("challenge", &[challenge]).expect("challenge"),
   5401             ],
   5402             "",
   5403         )
   5404     }
   5405 
   5406     fn runtime_pocket_put_user_event(
   5407         key: FixtureKey,
   5408         group_id: &str,
   5409         target: FixtureKey,
   5410         created_at: u64,
   5411     ) -> PocketOwnedEvent {
   5412         let target_pubkey = target.public_key();
   5413         runtime_pocket_event(
   5414             key,
   5415             created_at,
   5416             KIND_GROUP_PUT_USER.into(),
   5417             vec![
   5418                 Tag::from_parts("h", &[group_id]).expect("h"),
   5419                 Tag::from_parts("p", &[target_pubkey.as_str()]).expect("p"),
   5420             ],
   5421             "",
   5422         )
   5423     }
   5424 
   5425     fn runtime_pocket_group_event(
   5426         key: FixtureKey,
   5427         group_id: &str,
   5428         created_at: u64,
   5429         kind: u64,
   5430         content: &str,
   5431     ) -> PocketOwnedEvent {
   5432         runtime_pocket_event(
   5433             key,
   5434             created_at,
   5435             kind,
   5436             vec![Tag::from_parts("h", &[group_id]).expect("h")],
   5437             content,
   5438         )
   5439     }
   5440 
   5441     fn runtime_pocket_event(
   5442         key: FixtureKey,
   5443         created_at: u64,
   5444         kind: u64,
   5445         tags: Vec<Tag>,
   5446         content: &str,
   5447     ) -> PocketOwnedEvent {
   5448         let tags = pocket_tags_from_protocol(&tags);
   5449         signed_pocket_event(
   5450             fixture_secret_byte(key),
   5451             created_at,
   5452             u16::try_from(kind).expect("pocket kind"),
   5453             &tags,
   5454             content.as_bytes(),
   5455         )
   5456     }
   5457 
   5458     fn runtime_pocket_event_to_protocol(event: &PocketEvent) -> Result<Event, String> {
   5459         let tags = event
   5460             .tags()
   5461             .map_err(|error| error.to_string())?
   5462             .iter()
   5463             .map(|tag| {
   5464                 Tag::new(
   5465                     tag.map(|value| {
   5466                         std::str::from_utf8(value)
   5467                             .map(str::to_owned)
   5468                             .map_err(|error| error.to_string())
   5469                     })
   5470                     .collect::<Result<Vec<_>, _>>()?,
   5471                 )
   5472                 .map_err(|error| error.to_string())
   5473             })
   5474             .collect::<Result<Vec<_>, _>>()?;
   5475         Ok(Event::new(
   5476             EventId::new(&event.id().as_hex_string()).map_err(|error| error.to_string())?,
   5477             UnsignedEvent::new(
   5478                 PublicKeyHex::new(&event.pubkey().as_hex_string())
   5479                     .map_err(|error| error.to_string())?,
   5480                 UnixTimestamp::new(event.created_at().as_u64()),
   5481                 Kind::new(u64::from(event.kind().as_u16())).map_err(|error| error.to_string())?,
   5482                 tags,
   5483                 std::str::from_utf8(event.content()).map_err(|error| error.to_string())?,
   5484             ),
   5485             SignatureHex::new(&event.sig().to_string()).map_err(|error| error.to_string())?,
   5486         ))
   5487     }
   5488 
   5489     fn pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags {
   5490         let parts = tags
   5491             .iter()
   5492             .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>())
   5493             .collect::<Vec<_>>();
   5494         PocketOwnedTags::new(&parts).expect("pocket tags")
   5495     }
   5496 
   5497     fn fixture_secret_byte(key: FixtureKey) -> u8 {
   5498         match key {
   5499             FixtureKey::Relay => 9,
   5500             FixtureKey::Admin => 11,
   5501             FixtureKey::Member => 12,
   5502             FixtureKey::Outsider => 13,
   5503             FixtureKey::Owner => 10,
   5504         }
   5505     }
   5506 
   5507     fn signed_pocket_event(
   5508         secret_byte: u8,
   5509         created_at: u64,
   5510         kind: u16,
   5511         tags: &PocketOwnedTags,
   5512         content: &[u8],
   5513     ) -> PocketOwnedEvent {
   5514         let secret = format!("{secret_byte:02x}").repeat(32);
   5515         RelaySigner::from_secret_hex(&secret)
   5516             .expect("signer")
   5517             .sign_pocket_event(
   5518                 PocketKind::from_u16(kind),
   5519                 tags,
   5520                 PocketTime::from_u64(created_at),
   5521                 content,
   5522             )
   5523             .expect("pocket event")
   5524     }
   5525 
   5526     fn temp_root(name: &str) -> PathBuf {
   5527         std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id()))
   5528     }
   5529 }