tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

core.rs (175602B)


      1 use crate::errors::{BaseRelayError, ok_accepted, ok_rejected};
      2 use crate::groups::{
      3     GroupEventWrite, GroupEventWriteError, GroupProjectionReadGuard, GroupServiceHandle,
      4 };
      5 use crate::logging::{self, TangleModerationAuditResult};
      6 use crate::ops::BaseRelayReadinessState;
      7 #[cfg(test)]
      8 use crate::pocket_conversion::{tangle_event_to_pocket, tangle_filter_to_pocket};
      9 use crate::pocket_event_validation::{
     10     is_pocket_nip70_protected_event, pocket_event_id as pocket_runtime_event_id, pocket_event_kind,
     11     pocket_event_pubkey, validate_pocket_event_shape, verify_pocket_event_signature,
     12 };
     13 #[cfg(test)]
     14 use crate::relay::outbound::protocol_messages_for_test;
     15 use crate::relay::{
     16     auth::BaseAuthState,
     17     live::{CloseResult, LiveSubscriptionSet},
     18     outbound::RuntimeRelayMessage,
     19 };
     20 use std::{
     21     cell::{Cell, RefCell},
     22     collections::BTreeSet,
     23 };
     24 use tangle_groups::{
     25     GroupAuthContext, GroupEventClass, GroupEventView, GroupId, GroupRuntimeConfig,
     26     NIP29_RELAY_GENERATED_KIND_VALUES, StoreOffset, classify_group_event,
     27     validate_client_group_event_structure,
     28 };
     29 #[cfg(test)]
     30 use tangle_protocol::{ClientMessage, Event, Filter};
     31 use tangle_protocol::{RelayMessage, SubscriptionId, UnixTimestamp};
     32 use tangle_store_pocket::{
     33     PocketEvent, PocketFilter, PocketHll8, PocketOwnedEvent, PocketOwnedFilter, PocketQueryConfig,
     34     PocketScreenResult, PocketStoreConfig, PocketStoreHandle,
     35 };
     36 
     37 pub(crate) const NEGENTROPY_DISABLED_MESSAGE: &str = "blocked: Negentropy sync is disabled";
     38 
     39 pub struct BaseRelay {
     40     store: PocketStoreHandle,
     41     subscriptions: LiveSubscriptionSet,
     42     groups: Option<GroupServiceHandle>,
     43     readiness: BaseRelayReadinessState,
     44     limits: BaseRelayLimits,
     45     query: PocketQueryConfig,
     46 }
     47 
     48 #[derive(Debug, Clone, PartialEq)]
     49 pub(crate) struct BaseRelayEventWrite {
     50     message: RelayMessage,
     51     stored_offsets: Vec<StoreOffset>,
     52 }
     53 
     54 impl BaseRelayEventWrite {
     55     fn stored(message: RelayMessage, stored_offsets: Vec<StoreOffset>) -> Self {
     56         Self {
     57             message,
     58             stored_offsets,
     59         }
     60     }
     61 
     62     fn unstored(message: RelayMessage) -> Self {
     63         Self {
     64             message,
     65             stored_offsets: Vec::new(),
     66         }
     67     }
     68 
     69     pub(crate) fn stored_offsets(&self) -> &[StoreOffset] {
     70         &self.stored_offsets
     71     }
     72 
     73     pub(crate) fn into_message(self) -> RelayMessage {
     74         self.message
     75     }
     76 }
     77 
     78 #[derive(Debug, Clone, PartialEq)]
     79 pub(crate) struct BaseRelayQueryReport {
     80     messages: Vec<RuntimeRelayMessage>,
     81     group_read_denied: bool,
     82     query_metrics: BaseRelayQueryMetrics,
     83 }
     84 
     85 pub(crate) struct BaseRelayReqQuery<'a> {
     86     subscription_id: SubscriptionId,
     87     filters: Vec<PocketOwnedFilter>,
     88     search_present: bool,
     89     auth: &'a BaseAuthState,
     90 }
     91 
     92 impl<'a> BaseRelayReqQuery<'a> {
     93     pub(crate) fn new(
     94         subscription_id: SubscriptionId,
     95         filters: Vec<PocketOwnedFilter>,
     96         search_present: bool,
     97         auth: &'a BaseAuthState,
     98     ) -> Self {
     99         Self {
    100             subscription_id,
    101             filters,
    102             search_present,
    103             auth,
    104         }
    105     }
    106 }
    107 
    108 struct BaseRelayGroupReqQuery<'a> {
    109     subscription_id: SubscriptionId,
    110     filters: Vec<PocketOwnedFilter>,
    111     search_present: bool,
    112     auth: &'a GroupAuthContext,
    113 }
    114 
    115 pub(crate) struct BaseRelayCountQuery<'a> {
    116     subscription_id: SubscriptionId,
    117     filters: Vec<PocketOwnedFilter>,
    118     search_present: bool,
    119     auth: &'a BaseAuthState,
    120 }
    121 
    122 impl<'a> BaseRelayCountQuery<'a> {
    123     pub(crate) fn new(
    124         subscription_id: SubscriptionId,
    125         filters: Vec<PocketOwnedFilter>,
    126         search_present: bool,
    127         auth: &'a BaseAuthState,
    128     ) -> Self {
    129         Self {
    130             subscription_id,
    131             filters,
    132             search_present,
    133             auth,
    134         }
    135     }
    136 }
    137 
    138 struct BaseRelayGroupCountQuery<'a> {
    139     subscription_id: SubscriptionId,
    140     filters: Vec<PocketOwnedFilter>,
    141     search_present: bool,
    142     auth: &'a GroupAuthContext,
    143 }
    144 
    145 impl BaseRelayQueryReport {
    146     fn new(
    147         messages: Vec<RuntimeRelayMessage>,
    148         group_read_denied: bool,
    149         query_metrics: BaseRelayQueryMetrics,
    150     ) -> Self {
    151         Self {
    152             messages,
    153             group_read_denied,
    154             query_metrics,
    155         }
    156     }
    157 
    158     pub(crate) fn group_read_denied(&self) -> bool {
    159         self.group_read_denied
    160     }
    161 
    162     pub(crate) fn query_metrics(&self) -> BaseRelayQueryMetrics {
    163         self.query_metrics
    164     }
    165 
    166     pub(crate) fn into_messages(self) -> Vec<RuntimeRelayMessage> {
    167         self.messages
    168     }
    169 }
    170 
    171 #[derive(Debug, Clone, PartialEq)]
    172 pub(crate) struct BaseRelayCountReport {
    173     message: RelayMessage,
    174     group_read_denied: bool,
    175     query_metrics: BaseRelayQueryMetrics,
    176 }
    177 
    178 impl BaseRelayCountReport {
    179     fn new(
    180         message: RelayMessage,
    181         group_read_denied: bool,
    182         query_metrics: BaseRelayQueryMetrics,
    183     ) -> Self {
    184         Self {
    185             message,
    186             group_read_denied,
    187             query_metrics,
    188         }
    189     }
    190 
    191     pub(crate) fn group_read_denied(&self) -> bool {
    192         self.group_read_denied
    193     }
    194 
    195     pub(crate) fn query_metrics(&self) -> BaseRelayQueryMetrics {
    196         self.query_metrics
    197     }
    198 
    199     pub(crate) fn into_message(self) -> RelayMessage {
    200         self.message
    201     }
    202 }
    203 
    204 #[derive(Debug, Clone, PartialEq)]
    205 struct BaseRelayEventQueryReport {
    206     events: Vec<PocketOwnedEvent>,
    207     group_read_denied: bool,
    208     query_metrics: BaseRelayQueryMetrics,
    209 }
    210 
    211 impl BaseRelayEventQueryReport {
    212     fn new(
    213         events: Vec<PocketOwnedEvent>,
    214         group_read_denied: bool,
    215         query_metrics: BaseRelayQueryMetrics,
    216     ) -> Self {
    217         Self {
    218             events,
    219             group_read_denied,
    220             query_metrics,
    221         }
    222     }
    223 }
    224 
    225 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
    226 pub(crate) struct BaseRelayQueryMetrics {
    227     candidates_scanned: u64,
    228     returned_events: u64,
    229     redacted_events: u64,
    230 }
    231 
    232 impl BaseRelayQueryMetrics {
    233     pub(crate) fn new(candidates_scanned: u64, returned_events: u64, redacted_events: u64) -> Self {
    234         Self {
    235             candidates_scanned,
    236             returned_events,
    237             redacted_events,
    238         }
    239     }
    240 
    241     fn add(self, other: Self) -> Self {
    242         Self {
    243             candidates_scanned: self
    244                 .candidates_scanned
    245                 .saturating_add(other.candidates_scanned),
    246             returned_events: self.returned_events.saturating_add(other.returned_events),
    247             redacted_events: self.redacted_events.saturating_add(other.redacted_events),
    248         }
    249     }
    250 
    251     fn with_returned_events(self, returned_events: usize) -> Self {
    252         Self {
    253             returned_events: u64::try_from(returned_events).expect("returned events fit in u64"),
    254             ..self
    255         }
    256     }
    257 
    258     pub(crate) fn candidates_scanned(self) -> u64 {
    259         self.candidates_scanned
    260     }
    261 
    262     pub(crate) fn returned_events(self) -> u64 {
    263         self.returned_events
    264     }
    265 
    266     pub(crate) fn redacted_events(self) -> u64 {
    267         self.redacted_events
    268     }
    269 }
    270 
    271 #[derive(Debug, Clone, PartialEq, Eq)]
    272 struct BaseRelayCountEventsReport {
    273     count: u64,
    274     hll: Option<String>,
    275     group_read_denied: bool,
    276     query_metrics: BaseRelayQueryMetrics,
    277 }
    278 
    279 impl BaseRelayCountEventsReport {
    280     fn new(
    281         count: u64,
    282         hll: Option<String>,
    283         group_read_denied: bool,
    284         query_metrics: BaseRelayQueryMetrics,
    285     ) -> Self {
    286         Self {
    287             count,
    288             hll,
    289             group_read_denied,
    290             query_metrics,
    291         }
    292     }
    293 }
    294 
    295 struct BaseRelayCountHll {
    296     offset: Option<usize>,
    297     hll: Option<PocketHll8>,
    298     suppressed: bool,
    299 }
    300 
    301 #[derive(Debug, Clone, PartialEq, Eq)]
    302 enum BaseRelayCountHllGroupTargets {
    303     None,
    304     Suppress,
    305     Targets(Vec<GroupId>),
    306 }
    307 
    308 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    309 enum BaseRelayCountHllTargetPolicy {
    310     Eligible,
    311     Suppress,
    312 }
    313 
    314 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    315 enum BaseRelayCountHllDTagMode {
    316     Ignore,
    317     Target,
    318     Suppress,
    319 }
    320 
    321 impl BaseRelayCountHll {
    322     fn new(filters: &[PocketOwnedFilter]) -> Result<Self, BaseRelayError> {
    323         let offset = BaseRelay::count_hll_offset(filters)?;
    324         Ok(Self {
    325             offset,
    326             hll: offset.map(|_| PocketHll8::new()),
    327             suppressed: false,
    328         })
    329     }
    330 
    331     fn suppress(&mut self) {
    332         if self.offset.is_some() {
    333             self.suppressed = true;
    334         }
    335     }
    336 
    337     fn suppress_for_filter_targets(
    338         &mut self,
    339         groups: Option<&GroupServiceHandle>,
    340         filters: &[PocketOwnedFilter],
    341     ) {
    342         if self.offset.is_none() {
    343             return;
    344         }
    345         let [filter] = filters else {
    346             return;
    347         };
    348         if BaseRelay::count_hll_filter_target_policy(groups, filter)
    349             == BaseRelayCountHllTargetPolicy::Suppress
    350         {
    351             self.suppress();
    352         }
    353     }
    354 
    355     fn observe(
    356         &mut self,
    357         groups: Option<&GroupServiceHandle>,
    358         event: &PocketEvent,
    359     ) -> Result<(), BaseRelayError> {
    360         let Some(offset) = self.offset else {
    361             return Ok(());
    362         };
    363         if BaseRelay::event_suppresses_count_hll(groups, event)? {
    364             self.suppressed = true;
    365             return Ok(());
    366         }
    367         if let Some(hll) = &mut self.hll {
    368             hll.add_element(event.pubkey().as_bytes(), offset)
    369                 .map_err(|error| BaseRelayError::error(error.to_string()))?;
    370         }
    371         Ok(())
    372     }
    373 
    374     fn into_hex(self) -> Option<String> {
    375         (!self.suppressed)
    376             .then(|| self.hll.map(|value| value.to_hex_string()))
    377             .flatten()
    378     }
    379 }
    380 
    381 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    382 enum BaseRelayFilterLimitMode {
    383     ApplyDefaultLimit,
    384     PreserveCountLimitless,
    385 }
    386 
    387 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    388 pub struct BaseRelayShutdownReport {
    389     closed_subscriptions: usize,
    390 }
    391 
    392 impl BaseRelayShutdownReport {
    393     pub fn new(closed_subscriptions: usize) -> Self {
    394         Self {
    395             closed_subscriptions,
    396         }
    397     }
    398 
    399     pub fn closed_subscriptions(self) -> usize {
    400         self.closed_subscriptions
    401     }
    402 }
    403 
    404 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    405 pub struct BaseRelayLimits {
    406     max_pending_events: usize,
    407     max_subscription_id_length: usize,
    408     max_subscriptions: usize,
    409     max_filters_per_request: usize,
    410     max_tag_values_per_filter: usize,
    411     max_query_complexity: usize,
    412     max_event_tags: usize,
    413     max_content_length: usize,
    414     max_limit: u64,
    415     default_limit: u64,
    416 }
    417 
    418 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    419 pub struct BaseRelayLimitSettings {
    420     pub max_pending_events: usize,
    421     pub max_subscription_id_length: usize,
    422     pub max_subscriptions: usize,
    423     pub max_filters_per_request: usize,
    424     pub max_tag_values_per_filter: usize,
    425     pub max_query_complexity: usize,
    426     pub max_event_tags: usize,
    427     pub max_content_length: usize,
    428     pub max_limit: u64,
    429     pub default_limit: u64,
    430 }
    431 
    432 impl BaseRelayLimits {
    433     pub fn new(settings: BaseRelayLimitSettings) -> Result<Self, BaseRelayError> {
    434         let max_pending_events = settings.max_pending_events;
    435         let max_subscription_id_length = settings.max_subscription_id_length;
    436         let max_subscriptions = settings.max_subscriptions;
    437         let max_filters_per_request = settings.max_filters_per_request;
    438         let max_tag_values_per_filter = settings.max_tag_values_per_filter;
    439         let max_query_complexity = settings.max_query_complexity;
    440         let max_event_tags = settings.max_event_tags;
    441         let max_content_length = settings.max_content_length;
    442         let max_limit = settings.max_limit;
    443         let default_limit = settings.default_limit;
    444         if max_pending_events == 0 {
    445             return Err(BaseRelayError::invalid(
    446                 "runtime max pending events must be greater than zero",
    447             ));
    448         }
    449         if max_subscription_id_length == 0 {
    450             return Err(BaseRelayError::invalid(
    451                 "runtime max subscription id length must be greater than zero",
    452             ));
    453         }
    454         if max_subscriptions == 0 {
    455             return Err(BaseRelayError::invalid(
    456                 "runtime max subscriptions per connection must be greater than zero",
    457             ));
    458         }
    459         if max_filters_per_request == 0 {
    460             return Err(BaseRelayError::invalid(
    461                 "runtime max filters per request must be greater than zero",
    462             ));
    463         }
    464         if max_tag_values_per_filter == 0 {
    465             return Err(BaseRelayError::invalid(
    466                 "runtime max tag values per filter must be greater than zero",
    467             ));
    468         }
    469         if max_query_complexity == 0 {
    470             return Err(BaseRelayError::invalid(
    471                 "runtime max query complexity must be greater than zero",
    472             ));
    473         }
    474         if max_event_tags == 0 {
    475             return Err(BaseRelayError::invalid(
    476                 "runtime max event tags must be greater than zero",
    477             ));
    478         }
    479         if max_content_length == 0 {
    480             return Err(BaseRelayError::invalid(
    481                 "runtime max content length must be greater than zero",
    482             ));
    483         }
    484         if max_limit == 0 {
    485             return Err(BaseRelayError::invalid(
    486                 "runtime max filter limit must be greater than zero",
    487             ));
    488         }
    489         if default_limit == 0 {
    490             return Err(BaseRelayError::invalid(
    491                 "runtime default filter limit must be greater than zero",
    492             ));
    493         }
    494         if default_limit > max_limit {
    495             return Err(BaseRelayError::invalid(
    496                 "runtime default filter limit must not exceed max filter limit",
    497             ));
    498         }
    499         if usize::try_from(default_limit).is_ok_and(|limit| limit > max_query_complexity) {
    500             return Err(BaseRelayError::invalid(
    501                 "runtime default filter limit must not exceed max query complexity",
    502             ));
    503         }
    504         Ok(Self {
    505             max_pending_events,
    506             max_subscription_id_length,
    507             max_subscriptions,
    508             max_filters_per_request,
    509             max_tag_values_per_filter,
    510             max_query_complexity,
    511             max_event_tags,
    512             max_content_length,
    513             max_limit,
    514             default_limit,
    515         })
    516     }
    517 
    518     pub fn max_pending_events(self) -> usize {
    519         self.max_pending_events
    520     }
    521 
    522     pub fn max_subscription_id_length(self) -> usize {
    523         self.max_subscription_id_length
    524     }
    525 
    526     pub fn max_subscriptions(self) -> usize {
    527         self.max_subscriptions
    528     }
    529 
    530     pub fn max_filters_per_request(self) -> usize {
    531         self.max_filters_per_request
    532     }
    533 
    534     pub fn max_tag_values_per_filter(self) -> usize {
    535         self.max_tag_values_per_filter
    536     }
    537 
    538     pub fn max_query_complexity(self) -> usize {
    539         self.max_query_complexity
    540     }
    541 
    542     pub fn max_event_tags(self) -> usize {
    543         self.max_event_tags
    544     }
    545 
    546     pub fn max_content_length(self) -> usize {
    547         self.max_content_length
    548     }
    549 
    550     pub fn max_limit(self) -> u64 {
    551         self.max_limit
    552     }
    553 
    554     pub fn default_limit(self) -> u64 {
    555         self.default_limit
    556     }
    557 
    /// Test-only check of a protocol `Event` against the tag-count and content-length
    /// limits. The tag count is checked first.
    #[cfg(test)]
    fn validate_protocol_event_for_test(&self, event: &Event) -> Result<(), BaseRelayError> {
        let tag_count = event.unsigned().tags().len();
        if tag_count > self.max_event_tags {
            let message = format!(
                "event tag count exceeds runtime max_event_tags {}",
                self.max_event_tags
            );
            return Err(BaseRelayError::invalid(message));
        }

        let content_length = event.unsigned().content().len();
        if content_length > self.max_content_length {
            let message = format!(
                "event content length exceeds runtime max_content_length {}",
                self.max_content_length
            );
            return Err(BaseRelayError::invalid(message));
        }

        Ok(())
    }
    574 
    575     pub(crate) fn validate_pocket_event(&self, event: &PocketEvent) -> Result<(), BaseRelayError> {
    576         validate_pocket_event_shape(event, self.max_event_tags, self.max_content_length)
    577     }
    578 
    579     pub fn validate_subscription_id(
    580         &self,
    581         subscription_id: &SubscriptionId,
    582     ) -> Result<(), BaseRelayError> {
    583         let actual = subscription_id.as_str().chars().count();
    584         if actual > self.max_subscription_id_length {
    585             return Err(BaseRelayError::invalid(format!(
    586                 "subscription id length exceeds runtime max_subid_length {}",
    587                 self.max_subscription_id_length
    588             )));
    589         }
    590         Ok(())
    591     }
    592 
    593     pub(crate) fn validate_pocket_filters(
    594         &self,
    595         filters: &[PocketOwnedFilter],
    596     ) -> Result<(), BaseRelayError> {
    597         if filters.is_empty() {
    598             return Err(BaseRelayError::invalid(
    599                 "request must include at least one filter",
    600             ));
    601         }
    602         if filters.len() > self.max_filters_per_request {
    603             return Err(BaseRelayError::invalid(format!(
    604                 "filter count exceeds runtime max_filters_per_request {}",
    605                 self.max_filters_per_request
    606             )));
    607         }
    608         for filter in filters {
    609             let tag_values = filter
    610                 .tags()
    611                 .map_err(|error| BaseRelayError::error(error.to_string()))?
    612                 .iter()
    613                 .map(|tag| tag.skip(1).count())
    614                 .sum::<usize>();
    615             if tag_values > self.max_tag_values_per_filter {
    616                 return Err(BaseRelayError::invalid(format!(
    617                     "filter tag value count exceeds runtime max_tag_values_per_filter {}",
    618                     self.max_tag_values_per_filter
    619                 )));
    620             }
    621             if filter.limit() != u32::MAX && u64::from(filter.limit()) > self.max_limit {
    622                 return Err(BaseRelayError::invalid(format!(
    623                     "filter limit exceeds runtime max_limit {}",
    624                     self.max_limit
    625                 )));
    626             }
    627         }
    628         self.validate_pocket_query_complexity(filters)?;
    629         Ok(())
    630     }
    631 
    632     fn effective_pocket_filter_limit(self, filter: &PocketFilter) -> usize {
    633         if filter.limit() == u32::MAX {
    634             usize::try_from(self.default_limit).unwrap_or(usize::MAX)
    635         } else {
    636             usize::try_from(filter.limit()).unwrap_or(usize::MAX)
    637         }
    638     }
    639 
    640     fn validate_pocket_query_complexity(
    641         &self,
    642         filters: &[PocketOwnedFilter],
    643     ) -> Result<(), BaseRelayError> {
    644         let score = filters
    645             .iter()
    646             .map(|filter| self.pocket_filter_complexity(filter))
    647             .fold(0_usize, usize::saturating_add);
    648         if score > self.max_query_complexity {
    649             return Err(BaseRelayError::invalid(format!(
    650                 "query complexity {score} exceeds runtime max_query_complexity {}",
    651                 self.max_query_complexity
    652             )));
    653         }
    654         Ok(())
    655     }
    656 
    657     fn pocket_filter_complexity(&self, filter: &PocketFilter) -> usize {
    658         let tag_score = filter
    659             .tags()
    660             .map(|tags| {
    661                 tags.iter()
    662                     .map(|tag| 1_usize.saturating_add(tag.skip(1).count()))
    663                     .fold(0_usize, usize::saturating_add)
    664             })
    665             .unwrap_or(usize::MAX);
    666         1_usize
    667             .saturating_add(filter.num_ids())
    668             .saturating_add(filter.num_authors())
    669             .saturating_add(filter.num_kinds())
    670             .saturating_add(tag_score)
    671             .saturating_add(usize::from(
    672                 filter.since() != tangle_store_pocket::PocketTime::min(),
    673             ))
    674             .saturating_add(usize::from(
    675                 filter.until() != tangle_store_pocket::PocketTime::max(),
    676             ))
    677             .saturating_add(self.effective_pocket_filter_limit(filter))
    678     }
    679 }
    680 
    681 impl BaseRelay {
    682     pub(crate) fn unsupported_search_present_closed(
    683         subscription_id: &SubscriptionId,
    684         search_present: bool,
    685     ) -> Option<RelayMessage> {
    686         search_present.then(|| RelayMessage::Closed {
    687             subscription_id: subscription_id.clone(),
    688             message: "unsupported: search filters are not supported".to_owned(),
    689         })
    690     }
    691 
    692     fn redacted_req_closed(
    693         subscription_id: SubscriptionId,
    694         auth: &GroupAuthContext,
    695     ) -> RelayMessage {
    696         let message = if auth.authenticated_pubkeys().is_empty() {
    697             BaseRelayError::auth_required("authentication required to read group events")
    698                 .prefixed_message()
    699         } else {
    700             BaseRelayError::restricted("group is unavailable").prefixed_message()
    701         };
    702         RelayMessage::Closed {
    703             subscription_id,
    704             message,
    705         }
    706     }
    707 
    708     pub fn open(
    709         config: &PocketStoreConfig,
    710         limits: BaseRelayLimits,
    711         query: PocketQueryConfig,
    712     ) -> Result<Self, BaseRelayError> {
    713         let store = PocketStoreHandle::open(config).map_err(BaseRelayError::from)?;
    714         Self::new(store, limits, query)
    715     }
    716 
    717     pub fn open_with_groups(
    718         config: &PocketStoreConfig,
    719         limits: BaseRelayLimits,
    720         groups: &GroupRuntimeConfig,
    721         query: PocketQueryConfig,
    722     ) -> Result<Self, BaseRelayError> {
    723         let store = PocketStoreHandle::open(config).map_err(BaseRelayError::from)?;
    724         Self::new_with_groups(store, limits, groups, query)
    725     }
    726 
    727     pub fn new(
    728         store: PocketStoreHandle,
    729         limits: BaseRelayLimits,
    730         query: PocketQueryConfig,
    731     ) -> Result<Self, BaseRelayError> {
    732         Self::new_with_groups(store, limits, &GroupRuntimeConfig::disabled(), query)
    733     }
    734 
    735     pub fn new_with_groups(
    736         store: PocketStoreHandle,
    737         limits: BaseRelayLimits,
    738         groups: &GroupRuntimeConfig,
    739         query: PocketQueryConfig,
    740     ) -> Result<Self, BaseRelayError> {
    741         let groups = GroupServiceHandle::from_config(&store, groups)?;
    742         let subscriptions =
    743             LiveSubscriptionSet::new(limits.max_pending_events(), limits.max_subscriptions())?;
    744         let readiness = BaseRelayReadinessState::runtime_ready_before_bind();
    745         Ok(Self {
    746             store,
    747             subscriptions,
    748             groups,
    749             readiness,
    750             limits,
    751             query,
    752         })
    753     }
    754 
    /// Test-only dispatcher that routes one client message to its handler and collects
    /// the replies.
    ///
    /// `Close` and `NegClose` produce no replies; `NegOpen` and `NegMsg` are answered with
    /// the disabled-Negentropy message.
    #[cfg(test)]
    pub fn handle_client_message(
        &mut self,
        message: ClientMessage,
        auth: &mut BaseAuthState,
        now: UnixTimestamp,
    ) -> Result<Vec<RelayMessage>, BaseRelayError> {
        match message {
            ClientMessage::Event(event) => {
                let reply = self.handle_event_with_auth(event, auth)?;
                Ok(vec![reply])
            }
            ClientMessage::Req {
                subscription_id,
                filters,
            } => self.handle_protocol_req_with_auth_for_test(subscription_id, filters, auth),
            ClientMessage::Count {
                subscription_id,
                filters,
            } => {
                // The search flag is taken from the protocol filters before they are
                // converted to store filters.
                let search_present = filters.iter().any(|filter| filter.search().is_some());
                let pocket_filters = filters
                    .iter()
                    .map(tangle_filter_to_pocket)
                    .collect::<Result<Vec<_>, _>>()?;
                let group_auth =
                    GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned());
                let report = self.handle_count_with_group_auth_report(
                    subscription_id,
                    pocket_filters,
                    search_present,
                    &group_auth,
                )?;
                Ok(vec![report.into_message()])
            }
            ClientMessage::Close(subscription_id) => {
                self.handle_close(&subscription_id);
                Ok(Vec::new())
            }
            ClientMessage::Auth(event) => {
                let replies = self.handle_auth_message(event, auth, now);
                Ok(replies)
            }
            ClientMessage::NegOpen {
                subscription_id, ..
            }
            | ClientMessage::NegMsg {
                subscription_id, ..
            } => {
                let reply = Self::disabled_negentropy_message(subscription_id);
                Ok(vec![reply])
            }
            ClientMessage::NegClose(_) => Ok(Vec::new()),
        }
    }
    801 
    802     pub(crate) fn disabled_negentropy_message(subscription_id: SubscriptionId) -> RelayMessage {
    803         RelayMessage::NegErr {
    804             subscription_id,
    805             message: NEGENTROPY_DISABLED_MESSAGE.to_owned(),
    806         }
    807     }
    808 
    809     pub(crate) fn query_req_with_shared_services(
    810         store: &PocketStoreHandle,
    811         groups: Option<&GroupServiceHandle>,
    812         limits: BaseRelayLimits,
    813         query: PocketQueryConfig,
    814         request: BaseRelayReqQuery<'_>,
    815     ) -> Result<BaseRelayQueryReport, BaseRelayError> {
    816         let group_auth =
    817             GroupAuthContext::new(request.auth.authenticated_pubkeys().iter().cloned());
    818         Self::query_req_with_group_auth_shared_services(
    819             store,
    820             groups,
    821             limits,
    822             query,
    823             BaseRelayGroupReqQuery {
    824                 subscription_id: request.subscription_id,
    825                 filters: request.filters,
    826                 search_present: request.search_present,
    827                 auth: &group_auth,
    828             },
    829         )
    830     }
    831 
    832     fn event_by_offset(&self, offset: StoreOffset) -> Result<PocketOwnedEvent, BaseRelayError> {
    833         self.store
    834             .event_by_offset(offset.as_u64())
    835             .map_err(BaseRelayError::from)
    836     }
    837 
    838     pub fn event_by_offset_with_auth(
    839         &self,
    840         offset: StoreOffset,
    841         auth: &BaseAuthState,
    842     ) -> Result<Option<PocketOwnedEvent>, BaseRelayError> {
    843         let event = self.event_by_offset(offset)?;
    844         if Self::group_read_gate_visible_to_auth(
    845             self.groups.as_ref(),
    846             &event,
    847             &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
    848         )? {
    849             Ok(Some(event))
    850         } else {
    851             Ok(None)
    852         }
    853     }
    854 
    /// Test-only: handles an AUTH event using this relay's own limits.
    #[cfg(test)]
    fn handle_auth_message(
        &self,
        event: Event,
        auth: &mut BaseAuthState,
        now: UnixTimestamp,
    ) -> Vec<RelayMessage> {
        let limits = self.limits;
        Self::handle_auth_with_limits(limits, event, auth, now)
    }
    864 
    /// Test-only: validates and authenticates a protocol AUTH event.
    ///
    /// Always returns exactly one `RelayMessage::Ok` for the event id. It is
    /// accepted with an empty message when both the limit validation and
    /// `auth.authenticate` succeed; otherwise it is rejected with the failing
    /// step's prefixed message. Authentication is not attempted when the
    /// limit validation fails.
    #[cfg(test)]
    pub(crate) fn handle_auth_with_limits(
        limits: BaseRelayLimits,
        event: Event,
        auth: &mut BaseAuthState,
        now: UnixTimestamp,
    ) -> Vec<RelayMessage> {
        // `Some(text)` carries the rejection reason; `None` means accepted.
        let rejection = match limits.validate_protocol_event_for_test(&event) {
            Err(error) => Some(error.prefixed_message()),
            Ok(_) => match auth.authenticate(&event, now) {
                Ok(_) => None,
                Err(error) => Some(error.prefixed_message()),
            },
        };
        let accepted = rejection.is_none();
        vec![RelayMessage::Ok {
            event_id: event.id().clone(),
            accepted,
            message: rejection.unwrap_or_default(),
        }]
    }
    895 
    896     pub(crate) fn handle_pocket_auth_with_limits(
    897         limits: BaseRelayLimits,
    898         event: &PocketEvent,
    899         auth: &mut BaseAuthState,
    900         now: UnixTimestamp,
    901     ) -> Vec<RelayMessage> {
    902         let event_id =
    903             pocket_runtime_event_id(event).expect("Pocket event id is valid hex by construction");
    904         if let Err(error) = limits.validate_pocket_event(event) {
    905             return vec![RelayMessage::Ok {
    906                 event_id,
    907                 accepted: false,
    908                 message: error.prefixed_message(),
    909             }];
    910         }
    911         auth.authenticate_pocket(event, now)
    912             .map(|_| {
    913                 vec![RelayMessage::Ok {
    914                     event_id: event_id.clone(),
    915                     accepted: true,
    916                     message: String::new(),
    917                 }]
    918             })
    919             .unwrap_or_else(|error| {
    920                 vec![RelayMessage::Ok {
    921                     event_id,
    922                     accepted: false,
    923                     message: error.prefixed_message(),
    924                 }]
    925             })
    926     }
    927 
    /// Test-only: submits a protocol event with an unauthenticated group
    /// context and returns only the resulting relay message.
    #[cfg(test)]
    pub fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError> {
        let anonymous = GroupAuthContext::unauthenticated();
        let write = self.handle_event_with_group_auth(event, &anonymous)?;
        Ok(write.into_message())
    }
    933 
    /// Test-only: submits a protocol event on behalf of `auth` and returns
    /// only the resulting relay message.
    #[cfg(test)]
    pub fn handle_event_with_auth(
        &self,
        event: Event,
        auth: &BaseAuthState,
    ) -> Result<RelayMessage, BaseRelayError> {
        let write = self.handle_event_with_auth_report(event, auth)?;
        Ok(write.into_message())
    }
    943 
    944     pub fn handle_pocket_event(&self, event: &PocketEvent) -> Result<RelayMessage, BaseRelayError> {
    945         self.handle_pocket_event_with_group_auth(event, &GroupAuthContext::unauthenticated())
    946             .map(BaseRelayEventWrite::into_message)
    947     }
    948 
    949     pub fn handle_pocket_event_with_auth(
    950         &self,
    951         event: &PocketEvent,
    952         auth: &BaseAuthState,
    953     ) -> Result<RelayMessage, BaseRelayError> {
    954         self.handle_pocket_event_with_auth_report(event, auth)
    955             .map(BaseRelayEventWrite::into_message)
    956     }
    957 
    /// Test-only: submits a protocol event on behalf of `auth` using this
    /// relay's store, group service and limits, returning the full write
    /// report.
    #[cfg(test)]
    pub(crate) fn handle_event_with_auth_report(
        &self,
        event: Event,
        auth: &BaseAuthState,
    ) -> Result<BaseRelayEventWrite, BaseRelayError> {
        let (store, groups, limits) = (&self.store, self.groups.as_ref(), self.limits);
        Self::handle_event_with_shared_services(store, groups, limits, event, auth)
    }
    972 
    /// Test-only: submits a protocol event against explicitly supplied
    /// services, deriving the group auth context from the pubkeys
    /// authenticated in `auth`.
    #[cfg(test)]
    pub(crate) fn handle_event_with_shared_services(
        store: &PocketStoreHandle,
        groups: Option<&GroupServiceHandle>,
        limits: BaseRelayLimits,
        event: Event,
        auth: &BaseAuthState,
    ) -> Result<BaseRelayEventWrite, BaseRelayError> {
        let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned());
        Self::handle_event_with_group_auth_and_services(store, groups, limits, event, &group_auth)
    }
    989 
    990     pub(crate) fn handle_pocket_event_with_auth_report(
    991         &self,
    992         event: &PocketEvent,
    993         auth: &BaseAuthState,
    994     ) -> Result<BaseRelayEventWrite, BaseRelayError> {
    995         Self::handle_pocket_event_with_shared_services(
    996             &self.store,
    997             self.groups.as_ref(),
    998             self.limits,
    999             event,
   1000             auth,
   1001         )
   1002     }
   1003 
   1004     pub(crate) fn handle_pocket_event_with_shared_services(
   1005         store: &PocketStoreHandle,
   1006         groups: Option<&GroupServiceHandle>,
   1007         limits: BaseRelayLimits,
   1008         event: &PocketEvent,
   1009         auth: &BaseAuthState,
   1010     ) -> Result<BaseRelayEventWrite, BaseRelayError> {
   1011         Self::handle_pocket_event_with_group_auth_and_services(
   1012             store,
   1013             groups,
   1014             limits,
   1015             event,
   1016             &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
   1017         )
   1018     }
   1019 
   1020     pub fn groups_enabled(&self) -> bool {
   1021         self.groups.is_some()
   1022     }
   1023 
   1024     pub(crate) fn store_handle(&self) -> PocketStoreHandle {
   1025         self.store.clone()
   1026     }
   1027 
   1028     pub fn group_projection(&self) -> Option<GroupProjectionReadGuard<'_>> {
   1029         self.groups.as_ref().map(GroupServiceHandle::projection)
   1030     }
   1031 
   1032     pub(crate) fn group_service_handle(&self) -> Option<GroupServiceHandle> {
   1033         self.groups.clone()
   1034     }
   1035 
   1036     pub(crate) fn group_outbox_pending_events(&self) -> usize {
   1037         self.groups
   1038             .as_ref()
   1039             .map(GroupServiceHandle::outbox_pending_events)
   1040             .unwrap_or(0)
   1041     }
   1042 
   1043     pub fn readiness_state(&self) -> BaseRelayReadinessState {
   1044         self.readiness.clone()
   1045     }
   1046 
   1047     pub fn shutdown(&mut self) -> Result<BaseRelayShutdownReport, BaseRelayError> {
   1048         let closed = self.subscriptions.close_all();
   1049         self.store.sync()?;
   1050         Ok(BaseRelayShutdownReport::new(closed))
   1051     }
   1052 
   1053     #[cfg(test)]
   1054     fn handle_event_with_group_auth(
   1055         &self,
   1056         event: Event,
   1057         auth: &GroupAuthContext,
   1058     ) -> Result<BaseRelayEventWrite, BaseRelayError> {
   1059         Self::handle_event_with_group_auth_and_services(
   1060             &self.store,
   1061             self.groups.as_ref(),
   1062             self.limits,
   1063             event,
   1064             auth,
   1065         )
   1066     }
   1067 
   1068     #[cfg(test)]
   1069     fn handle_event_with_group_auth_and_services(
   1070         store: &PocketStoreHandle,
   1071         groups: Option<&GroupServiceHandle>,
   1072         limits: BaseRelayLimits,
   1073         event: Event,
   1074         auth: &GroupAuthContext,
   1075     ) -> Result<BaseRelayEventWrite, BaseRelayError> {
   1076         let pocket_event = tangle_event_to_pocket(&event)?;
   1077         Self::handle_pocket_event_with_group_auth_and_services(
   1078             store,
   1079             groups,
   1080             limits,
   1081             &pocket_event,
   1082             auth,
   1083         )
   1084     }
   1085 
   1086     fn handle_pocket_event_with_group_auth(
   1087         &self,
   1088         event: &PocketEvent,
   1089         auth: &GroupAuthContext,
   1090     ) -> Result<BaseRelayEventWrite, BaseRelayError> {
   1091         Self::handle_pocket_event_with_group_auth_and_services(
   1092             &self.store,
   1093             self.groups.as_ref(),
   1094             self.limits,
   1095             event,
   1096             auth,
   1097         )
   1098     }
   1099 
   1100     fn handle_pocket_event_with_group_auth_and_services(
   1101         store: &PocketStoreHandle,
   1102         groups: Option<&GroupServiceHandle>,
   1103         limits: BaseRelayLimits,
   1104         event: &PocketEvent,
   1105         auth: &GroupAuthContext,
   1106     ) -> Result<BaseRelayEventWrite, BaseRelayError> {
   1107         let event_id = pocket_runtime_event_id(event)?;
   1108         if let Err(error) = limits.validate_pocket_event(event) {
   1109             return Ok(BaseRelayEventWrite::unstored(ok_rejected(
   1110                 event_id,
   1111                 error.prefixed_message(),
   1112             )));
   1113         }
   1114         if let Err(error) = verify_pocket_event_signature(event) {
   1115             return Ok(BaseRelayEventWrite::unstored(ok_rejected(
   1116                 event_id,
   1117                 error.prefixed_message(),
   1118             )));
   1119         }
   1120         let pubkey = pocket_event_pubkey(event)?;
   1121         if is_pocket_nip70_protected_event(event)? && !auth.contains(&pubkey) {
   1122             return Ok(BaseRelayEventWrite::unstored(ok_rejected(
   1123                 event_id,
   1124                 BaseRelayError::auth_required(
   1125                     "protected event requires authenticated event author",
   1126                 )
   1127                 .prefixed_message(),
   1128             )));
   1129         }
   1130         let group_limits = groups.map(GroupServiceHandle::limits).unwrap_or_default();
   1131         let audit_class = classify_group_event(event, group_limits).ok();
   1132         let class = match validate_client_group_event_structure(event, group_limits) {
   1133             Ok(class) => class,
   1134             Err(error) => {
   1135                 if let Some(class) = audit_class.as_ref() {
   1136                     logging::log_group_moderation_audit(
   1137                         event,
   1138                         class,
   1139                         TangleModerationAuditResult::Rejected,
   1140                     );
   1141                 }
   1142                 return Ok(BaseRelayEventWrite::unstored(ok_rejected(
   1143                     event_id,
   1144                     error.prefixed_message(),
   1145                 )));
   1146             }
   1147         };
   1148         if !matches!(class, GroupEventClass::NonGroup) {
   1149             let Some(groups) = groups else {
   1150                 logging::log_group_moderation_audit(
   1151                     event,
   1152                     &class,
   1153                     TangleModerationAuditResult::Rejected,
   1154                 );
   1155                 return Ok(BaseRelayEventWrite::unstored(ok_rejected(
   1156                     event_id,
   1157                     "blocked: NIP-29 group events are not accepted before group service".to_owned(),
   1158                 )));
   1159             };
   1160             match groups.store_group_pocket_event(store, event, &class, auth) {
   1161                 Ok(GroupEventWrite::Stored(stored_offsets)) => {
   1162                     logging::log_group_moderation_audit(
   1163                         event,
   1164                         &class,
   1165                         TangleModerationAuditResult::Accepted,
   1166                     );
   1167                     return Ok(BaseRelayEventWrite::stored(
   1168                         ok_accepted(event_id, String::new()),
   1169                         stored_offsets,
   1170                     ));
   1171                 }
   1172                 Ok(GroupEventWrite::Duplicate) => {
   1173                     logging::log_group_moderation_audit(
   1174                         event,
   1175                         &class,
   1176                         TangleModerationAuditResult::Accepted,
   1177                     );
   1178                     return Ok(BaseRelayEventWrite::unstored(ok_accepted(
   1179                         event_id,
   1180                         "duplicate: already have this event".to_owned(),
   1181                     )));
   1182                 }
   1183                 Err(GroupEventWriteError::Rejected(error)) => {
   1184                     logging::log_group_moderation_audit(
   1185                         event,
   1186                         &class,
   1187                         TangleModerationAuditResult::Rejected,
   1188                     );
   1189                     return Ok(BaseRelayEventWrite::unstored(ok_rejected(
   1190                         event_id,
   1191                         error.prefixed_message(),
   1192                     )));
   1193                 }
   1194                 Err(GroupEventWriteError::Storage(error)) => return Err(error),
   1195             }
   1196         }
   1197         if pocket_event_kind(event)?.is_ephemeral() {
   1198             return Ok(BaseRelayEventWrite::unstored(ok_accepted(
   1199                 event_id,
   1200                 String::new(),
   1201             )));
   1202         }
   1203         if store.event_by_id(event.id())?.is_some() {
   1204             return Ok(BaseRelayEventWrite::unstored(ok_accepted(
   1205                 event_id,
   1206                 "duplicate: already have this event".to_owned(),
   1207             )));
   1208         }
   1209         let store_offset = StoreOffset::new(store.store_event(event)?);
   1210         Ok(BaseRelayEventWrite::stored(
   1211             ok_accepted(event_id, String::new()),
   1212             vec![store_offset],
   1213         ))
   1214     }
   1215 
   1216     pub fn handle_pocket_req(
   1217         &mut self,
   1218         subscription_id: SubscriptionId,
   1219         filters: Vec<PocketOwnedFilter>,
   1220     ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> {
   1221         self.handle_pocket_req_with_group_auth(
   1222             subscription_id,
   1223             filters,
   1224             &GroupAuthContext::unauthenticated(),
   1225         )
   1226     }
   1227 
   1228     #[cfg(test)]
   1229     pub fn handle_protocol_req_for_test(
   1230         &mut self,
   1231         subscription_id: SubscriptionId,
   1232         filters: Vec<Filter>,
   1233     ) -> Result<Vec<RelayMessage>, BaseRelayError> {
   1234         self.handle_protocol_req_with_group_auth_for_test(
   1235             subscription_id,
   1236             filters,
   1237             &GroupAuthContext::unauthenticated(),
   1238         )
   1239     }
   1240 
   1241     #[cfg(test)]
   1242     pub fn handle_protocol_req_with_auth_for_test(
   1243         &mut self,
   1244         subscription_id: SubscriptionId,
   1245         filters: Vec<Filter>,
   1246         auth: &BaseAuthState,
   1247     ) -> Result<Vec<RelayMessage>, BaseRelayError> {
   1248         self.handle_protocol_req_with_group_auth_for_test(
   1249             subscription_id,
   1250             filters,
   1251             &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
   1252         )
   1253     }
   1254 
   1255     #[cfg(test)]
   1256     fn handle_protocol_req_with_group_auth_for_test(
   1257         &mut self,
   1258         subscription_id: SubscriptionId,
   1259         filters: Vec<Filter>,
   1260         auth: &GroupAuthContext,
   1261     ) -> Result<Vec<RelayMessage>, BaseRelayError> {
   1262         self.handle_protocol_req_with_group_auth_report_for_test(subscription_id, filters, auth)
   1263             .map(BaseRelayQueryReport::into_messages)
   1264             .and_then(protocol_messages_for_test)
   1265     }
   1266 
   1267     pub fn handle_pocket_req_with_auth(
   1268         &mut self,
   1269         subscription_id: SubscriptionId,
   1270         filters: Vec<PocketOwnedFilter>,
   1271         auth: &BaseAuthState,
   1272     ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> {
   1273         self.handle_pocket_req_with_group_auth(
   1274             subscription_id,
   1275             filters,
   1276             &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
   1277         )
   1278     }
   1279 
   1280     fn handle_pocket_req_with_group_auth(
   1281         &mut self,
   1282         subscription_id: SubscriptionId,
   1283         filters: Vec<PocketOwnedFilter>,
   1284         auth: &GroupAuthContext,
   1285     ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> {
   1286         self.handle_pocket_req_with_group_auth_report(subscription_id, filters, false, auth)
   1287             .map(BaseRelayQueryReport::into_messages)
   1288     }
   1289 
   1290     #[cfg(test)]
   1291     fn handle_protocol_req_with_group_auth_report_for_test(
   1292         &mut self,
   1293         subscription_id: SubscriptionId,
   1294         filters: Vec<Filter>,
   1295         auth: &GroupAuthContext,
   1296     ) -> Result<BaseRelayQueryReport, BaseRelayError> {
   1297         let search_present = filters.iter().any(|filter| filter.search().is_some());
   1298         let filters = filters
   1299             .iter()
   1300             .map(tangle_filter_to_pocket)
   1301             .collect::<Result<Vec<_>, _>>()?;
   1302         self.handle_pocket_req_with_group_auth_report(
   1303             subscription_id,
   1304             filters,
   1305             search_present,
   1306             auth,
   1307         )
   1308     }
   1309 
   1310     fn handle_pocket_req_with_group_auth_report(
   1311         &mut self,
   1312         subscription_id: SubscriptionId,
   1313         filters: Vec<PocketOwnedFilter>,
   1314         search_present: bool,
   1315         auth: &GroupAuthContext,
   1316     ) -> Result<BaseRelayQueryReport, BaseRelayError> {
   1317         self.limits.validate_subscription_id(&subscription_id)?;
   1318         self.limits.validate_pocket_filters(&filters)?;
   1319         if let Some(message) =
   1320             Self::unsupported_search_present_closed(&subscription_id, search_present)
   1321         {
   1322             return Ok(BaseRelayQueryReport::new(
   1323                 vec![message.into()],
   1324                 false,
   1325                 BaseRelayQueryMetrics::default(),
   1326             ));
   1327         }
   1328         let should_subscribe = !pocket_filters_are_complete(&filters);
   1329         if should_subscribe {
   1330             self.subscriptions
   1331                 .ensure_can_subscribe(&subscription_id, &filters)?;
   1332             let report = self.query_req_with_group_auth_report(
   1333                 subscription_id.clone(),
   1334                 filters.clone(),
   1335                 false,
   1336                 auth,
   1337             )?;
   1338             if !report.group_read_denied() {
   1339                 self.subscriptions.subscribe(subscription_id, filters)?;
   1340             }
   1341             return Ok(report);
   1342         }
   1343         self.query_req_with_group_auth_report(subscription_id, filters, false, auth)
   1344     }
   1345 
   1346     fn query_req_with_group_auth_report(
   1347         &self,
   1348         subscription_id: SubscriptionId,
   1349         filters: Vec<PocketOwnedFilter>,
   1350         search_present: bool,
   1351         auth: &GroupAuthContext,
   1352     ) -> Result<BaseRelayQueryReport, BaseRelayError> {
   1353         Self::query_req_with_group_auth_shared_services(
   1354             &self.store,
   1355             self.groups.as_ref(),
   1356             self.limits,
   1357             self.query,
   1358             BaseRelayGroupReqQuery {
   1359                 subscription_id,
   1360                 filters,
   1361                 search_present,
   1362                 auth,
   1363             },
   1364         )
   1365     }
   1366 
   1367     fn query_req_with_group_auth_shared_services(
   1368         store: &PocketStoreHandle,
   1369         groups: Option<&GroupServiceHandle>,
   1370         limits: BaseRelayLimits,
   1371         query: PocketQueryConfig,
   1372         request: BaseRelayGroupReqQuery<'_>,
   1373     ) -> Result<BaseRelayQueryReport, BaseRelayError> {
   1374         let BaseRelayGroupReqQuery {
   1375             subscription_id,
   1376             filters,
   1377             search_present,
   1378             auth,
   1379         } = request;
   1380         limits.validate_subscription_id(&subscription_id)?;
   1381         limits.validate_pocket_filters(&filters)?;
   1382         if let Some(message) =
   1383             Self::unsupported_search_present_closed(&subscription_id, search_present)
   1384         {
   1385             return Ok(BaseRelayQueryReport::new(
   1386                 vec![message.into()],
   1387                 false,
   1388                 BaseRelayQueryMetrics::default(),
   1389             ));
   1390         }
   1391         let report =
   1392             Self::query_events_report_with_services(store, groups, limits, query, &filters, auth)?;
   1393         let group_read_denied = report.group_read_denied;
   1394         let query_metrics = report.query_metrics;
   1395         let mut messages = report
   1396             .events
   1397             .into_iter()
   1398             .map(|event| RuntimeRelayMessage::event(subscription_id.clone(), event))
   1399             .collect::<Vec<_>>();
   1400         if group_read_denied {
   1401             messages.push(Self::redacted_req_closed(subscription_id, auth).into());
   1402         } else {
   1403             messages.push(RelayMessage::Eose(subscription_id).into());
   1404         }
   1405         Ok(BaseRelayQueryReport::new(
   1406             messages,
   1407             group_read_denied,
   1408             query_metrics,
   1409         ))
   1410     }
   1411 
   1412     pub fn handle_count(
   1413         &self,
   1414         subscription_id: SubscriptionId,
   1415         filters: Vec<PocketOwnedFilter>,
   1416     ) -> Result<RelayMessage, BaseRelayError> {
   1417         self.handle_count_with_group_auth(
   1418             subscription_id,
   1419             filters,
   1420             &GroupAuthContext::unauthenticated(),
   1421         )
   1422     }
   1423 
   1424     pub fn handle_count_with_auth(
   1425         &self,
   1426         subscription_id: SubscriptionId,
   1427         filters: Vec<PocketOwnedFilter>,
   1428         auth: &BaseAuthState,
   1429     ) -> Result<RelayMessage, BaseRelayError> {
   1430         self.handle_count_with_auth_report(subscription_id, filters, auth)
   1431             .map(BaseRelayCountReport::into_message)
   1432     }
   1433 
   1434     pub(crate) fn handle_count_with_auth_report(
   1435         &self,
   1436         subscription_id: SubscriptionId,
   1437         filters: Vec<PocketOwnedFilter>,
   1438         auth: &BaseAuthState,
   1439     ) -> Result<BaseRelayCountReport, BaseRelayError> {
   1440         Self::handle_count_with_shared_services(
   1441             &self.store,
   1442             self.groups.as_ref(),
   1443             self.limits,
   1444             self.query,
   1445             BaseRelayCountQuery::new(subscription_id, filters, false, auth),
   1446         )
   1447     }
   1448 
   1449     pub(crate) fn handle_count_with_shared_services(
   1450         store: &PocketStoreHandle,
   1451         groups: Option<&GroupServiceHandle>,
   1452         limits: BaseRelayLimits,
   1453         query: PocketQueryConfig,
   1454         request: BaseRelayCountQuery<'_>,
   1455     ) -> Result<BaseRelayCountReport, BaseRelayError> {
   1456         let group_auth =
   1457             GroupAuthContext::new(request.auth.authenticated_pubkeys().iter().cloned());
   1458         Self::handle_count_with_group_auth_shared_services(
   1459             store,
   1460             groups,
   1461             limits,
   1462             query,
   1463             BaseRelayGroupCountQuery {
   1464                 subscription_id: request.subscription_id,
   1465                 filters: request.filters,
   1466                 search_present: request.search_present,
   1467                 auth: &group_auth,
   1468             },
   1469         )
   1470     }
   1471 
   1472     fn handle_count_with_group_auth(
   1473         &self,
   1474         subscription_id: SubscriptionId,
   1475         filters: Vec<PocketOwnedFilter>,
   1476         auth: &GroupAuthContext,
   1477     ) -> Result<RelayMessage, BaseRelayError> {
   1478         self.handle_count_with_group_auth_report(subscription_id, filters, false, auth)
   1479             .map(BaseRelayCountReport::into_message)
   1480     }
   1481 
   1482     fn handle_count_with_group_auth_report(
   1483         &self,
   1484         subscription_id: SubscriptionId,
   1485         filters: Vec<PocketOwnedFilter>,
   1486         search_present: bool,
   1487         auth: &GroupAuthContext,
   1488     ) -> Result<BaseRelayCountReport, BaseRelayError> {
   1489         Self::handle_count_with_group_auth_shared_services(
   1490             &self.store,
   1491             self.groups.as_ref(),
   1492             self.limits,
   1493             self.query,
   1494             BaseRelayGroupCountQuery {
   1495                 subscription_id,
   1496                 filters,
   1497                 search_present,
   1498                 auth,
   1499             },
   1500         )
   1501     }
   1502 
   1503     fn handle_count_with_group_auth_shared_services(
   1504         store: &PocketStoreHandle,
   1505         groups: Option<&GroupServiceHandle>,
   1506         limits: BaseRelayLimits,
   1507         query: PocketQueryConfig,
   1508         request: BaseRelayGroupCountQuery<'_>,
   1509     ) -> Result<BaseRelayCountReport, BaseRelayError> {
   1510         let BaseRelayGroupCountQuery {
   1511             subscription_id,
   1512             filters,
   1513             search_present,
   1514             auth,
   1515         } = request;
   1516         limits.validate_subscription_id(&subscription_id)?;
   1517         limits.validate_pocket_filters(&filters)?;
   1518         if let Some(message) =
   1519             Self::unsupported_search_present_closed(&subscription_id, search_present)
   1520         {
   1521             return Ok(BaseRelayCountReport::new(
   1522                 message,
   1523                 false,
   1524                 BaseRelayQueryMetrics::default(),
   1525             ));
   1526         }
   1527         let report =
   1528             Self::count_events_report_with_services(store, groups, limits, query, &filters, auth)?;
   1529         Ok(BaseRelayCountReport::new(
   1530             RelayMessage::Count {
   1531                 subscription_id,
   1532                 count: report.count,
   1533                 hll: report.hll,
   1534             },
   1535             report.group_read_denied,
   1536             report.query_metrics,
   1537         ))
   1538     }
   1539 
   1540     pub fn handle_close(&mut self, subscription_id: &SubscriptionId) -> CloseResult {
   1541         self.subscriptions.close(subscription_id)
   1542     }
   1543 
   1544     pub fn fanout_pocket(&mut self, event: &PocketEvent) -> Vec<RuntimeRelayMessage> {
   1545         self.fanout_pocket_with_group_auth(event, &GroupAuthContext::unauthenticated())
   1546     }
   1547 
   1548     pub fn fanout_pocket_with_group_auth(
   1549         &mut self,
   1550         event: &PocketEvent,
   1551         auth: &GroupAuthContext,
   1552     ) -> Vec<RuntimeRelayMessage> {
   1553         let groups = self.groups.as_ref();
   1554         self.subscriptions
   1555             .fanout(event, auth, |event, auth| {
   1556                 Self::group_read_gate_visible_to_auth(groups, event, auth).unwrap_or(false)
   1557             })
   1558             .expect("Pocket live fanout must match")
   1559             .into_iter()
   1560             .map(|subscription_id| RuntimeRelayMessage::Event {
   1561                 subscription_id,
   1562                 event: event.to_owned(),
   1563             })
   1564             .collect()
   1565     }
   1566 
   1567     #[cfg(test)]
   1568     pub fn fanout_protocol_for_test(&mut self, event: &Event) -> Vec<RelayMessage> {
   1569         self.fanout_protocol_with_group_auth_for_test(event, &GroupAuthContext::unauthenticated())
   1570     }
   1571 
   1572     #[cfg(test)]
   1573     pub fn fanout_protocol_with_group_auth_for_test(
   1574         &mut self,
   1575         event: &Event,
   1576         auth: &GroupAuthContext,
   1577     ) -> Vec<RelayMessage> {
   1578         let pocket_event = tangle_event_to_pocket(event).expect("event must convert to Pocket");
   1579         protocol_messages_for_test(self.fanout_pocket_with_group_auth(&pocket_event, auth))
   1580             .expect("test protocol fanout must convert")
   1581     }
   1582 
   1583     pub fn active_subscription_count(&self) -> usize {
   1584         self.subscriptions.active_count()
   1585     }
   1586 
   1587     fn query_events_report_with_services(
   1588         store: &PocketStoreHandle,
   1589         groups: Option<&GroupServiceHandle>,
   1590         limits: BaseRelayLimits,
   1591         query: PocketQueryConfig,
   1592         filters: &[PocketOwnedFilter],
   1593         auth: &GroupAuthContext,
   1594     ) -> Result<BaseRelayEventQueryReport, BaseRelayError> {
   1595         let mut output = Vec::new();
   1596         let mut group_read_denied = false;
   1597         let mut query_metrics = BaseRelayQueryMetrics::default();
   1598         for filter in filters {
   1599             let report = Self::query_filter_events_report_with_services(
   1600                 store,
   1601                 groups,
   1602                 limits,
   1603                 query,
   1604                 filter,
   1605                 auth,
   1606                 BaseRelayFilterLimitMode::ApplyDefaultLimit,
   1607             )?;
   1608             group_read_denied |= report.group_read_denied;
   1609             query_metrics = query_metrics.add(report.query_metrics);
   1610             let mut events = Self::sort_and_dedupe_query_events(report.events);
   1611             events.truncate(limits.effective_pocket_filter_limit(filter));
   1612             output.extend(events);
   1613         }
   1614         let events = Self::sort_and_dedupe_query_events(output);
   1615         query_metrics = query_metrics.with_returned_events(events.len());
   1616         Ok(BaseRelayEventQueryReport::new(
   1617             events,
   1618             group_read_denied,
   1619             query_metrics,
   1620         ))
   1621     }
   1622 
   1623     fn count_events_report_with_services(
   1624         store: &PocketStoreHandle,
   1625         groups: Option<&GroupServiceHandle>,
   1626         limits: BaseRelayLimits,
   1627         query: PocketQueryConfig,
   1628         filters: &[PocketOwnedFilter],
   1629         auth: &GroupAuthContext,
   1630     ) -> Result<BaseRelayCountEventsReport, BaseRelayError> {
   1631         let mut seen = BTreeSet::new();
   1632         let mut group_read_denied = false;
   1633         let mut query_metrics = BaseRelayQueryMetrics::default();
   1634         let count_query = query.exact_count();
   1635         let mut hll = BaseRelayCountHll::new(filters)?;
   1636         hll.suppress_for_filter_targets(groups, filters);
   1637         for filter in filters {
   1638             let report = Self::query_filter_events_report_with_services(
   1639                 store,
   1640                 groups,
   1641                 limits,
   1642                 count_query,
   1643                 filter,
   1644                 auth,
   1645                 BaseRelayFilterLimitMode::PreserveCountLimitless,
   1646             )?;
   1647             group_read_denied |= report.group_read_denied;
   1648             if report.group_read_denied {
   1649                 hll.suppress();
   1650             }
   1651             query_metrics = query_metrics.add(report.query_metrics);
   1652             for event in report.events {
   1653                 let event: &PocketEvent = &event;
   1654                 hll.observe(groups, event)?;
   1655                 seen.insert(event.id());
   1656             }
   1657         }
   1658         let count = u64::try_from(seen.len())
   1659             .map_err(|_| BaseRelayError::error("visible event count overflow"))?;
   1660         let hll = hll.into_hex();
   1661         Ok(BaseRelayCountEventsReport::new(
   1662             count,
   1663             hll,
   1664             group_read_denied,
   1665             query_metrics,
   1666         ))
   1667     }
   1668 
   1669     fn count_hll_offset(filters: &[PocketOwnedFilter]) -> Result<Option<usize>, BaseRelayError> {
   1670         let [filter] = filters else {
   1671             return Ok(None);
   1672         };
   1673         filter
   1674             .hyperloglog_offset()
   1675             .map_err(|error| BaseRelayError::error(error.to_string()))
   1676     }
   1677 
   1678     fn event_suppresses_count_hll(
   1679         groups: Option<&GroupServiceHandle>,
   1680         event: &PocketEvent,
   1681     ) -> Result<bool, BaseRelayError> {
   1682         let Some(groups) = groups else {
   1683             return Ok(false);
   1684         };
   1685         let class = classify_group_event(event, groups.limits()).map_err(BaseRelayError::from)?;
   1686         let Some(group_id) = class.group_id() else {
   1687             return Ok(false);
   1688         };
   1689         let projection = groups.projection();
   1690         let Some(group) = projection.group(group_id) else {
   1691             return Ok(true);
   1692         };
   1693         Ok(projection.tombstone(group_id).is_some()
   1694             || group.metadata().private()
   1695             || group.metadata().hidden())
   1696     }
   1697 
   1698     fn count_hll_filter_target_policy(
   1699         groups: Option<&GroupServiceHandle>,
   1700         filter: &PocketFilter,
   1701     ) -> BaseRelayCountHllTargetPolicy {
   1702         let Some(groups) = groups else {
   1703             return if Self::count_hll_filter_has_group_target(filter) {
   1704                 BaseRelayCountHllTargetPolicy::Suppress
   1705             } else {
   1706                 BaseRelayCountHllTargetPolicy::Eligible
   1707             };
   1708         };
   1709         match Self::count_hll_group_targets(
   1710             filter,
   1711             usize::from(groups.limits().max_group_id_bytes()),
   1712         ) {
   1713             BaseRelayCountHllGroupTargets::None => BaseRelayCountHllTargetPolicy::Eligible,
   1714             BaseRelayCountHllGroupTargets::Suppress => BaseRelayCountHllTargetPolicy::Suppress,
   1715             BaseRelayCountHllGroupTargets::Targets(group_ids) => {
   1716                 let projection = groups.projection();
   1717                 if group_ids.iter().all(|group_id| {
   1718                     projection.group(group_id).is_some_and(|group| {
   1719                         projection.tombstone(group_id).is_none()
   1720                             && !group.metadata().private()
   1721                             && !group.metadata().hidden()
   1722                     })
   1723                 }) {
   1724                     BaseRelayCountHllTargetPolicy::Eligible
   1725                 } else {
   1726                     BaseRelayCountHllTargetPolicy::Suppress
   1727                 }
   1728             }
   1729         }
   1730     }
   1731 
   1732     fn count_hll_group_targets(
   1733         filter: &PocketFilter,
   1734         max_group_id_bytes: usize,
   1735     ) -> BaseRelayCountHllGroupTargets {
   1736         let Ok(tags) = filter.tags() else {
   1737             return BaseRelayCountHllGroupTargets::Suppress;
   1738         };
   1739         let d_tag_mode = Self::count_hll_filter_d_tag_mode(filter);
   1740         let mut group_ids = Vec::new();
   1741         for tag in tags.iter() {
   1742             let mut values = tag.into_iter();
   1743             let Some(name) = values.next() else {
   1744                 continue;
   1745             };
   1746             if name == b"d" {
   1747                 match d_tag_mode {
   1748                     BaseRelayCountHllDTagMode::Ignore => continue,
   1749                     BaseRelayCountHllDTagMode::Suppress => {
   1750                         return BaseRelayCountHllGroupTargets::Suppress;
   1751                     }
   1752                     BaseRelayCountHllDTagMode::Target => {}
   1753                 }
   1754             } else if name != b"h" {
   1755                 continue;
   1756             }
   1757             let mut found_value = false;
   1758             for value in values {
   1759                 found_value = true;
   1760                 let Ok(value) = std::str::from_utf8(value) else {
   1761                     return BaseRelayCountHllGroupTargets::Suppress;
   1762                 };
   1763                 let Ok(group_id) = GroupId::new_with_max_bytes(value, max_group_id_bytes) else {
   1764                     return BaseRelayCountHllGroupTargets::Suppress;
   1765                 };
   1766                 group_ids.push(group_id);
   1767             }
   1768             if !found_value {
   1769                 return BaseRelayCountHllGroupTargets::Suppress;
   1770             }
   1771         }
   1772         if group_ids.is_empty() {
   1773             BaseRelayCountHllGroupTargets::None
   1774         } else {
   1775             group_ids.sort();
   1776             group_ids.dedup();
   1777             BaseRelayCountHllGroupTargets::Targets(group_ids)
   1778         }
   1779     }
   1780 
   1781     fn count_hll_filter_has_group_target(filter: &PocketFilter) -> bool {
   1782         let Ok(tags) = filter.tags() else {
   1783             return true;
   1784         };
   1785         let d_tag_mode = Self::count_hll_filter_d_tag_mode(filter);
   1786         tags.iter().any(|tag| {
   1787             let mut values = tag.into_iter();
   1788             let name = values.next();
   1789             matches!(name, Some(b"h"))
   1790                 || (matches!(
   1791                     d_tag_mode,
   1792                     BaseRelayCountHllDTagMode::Target | BaseRelayCountHllDTagMode::Suppress
   1793                 ) && matches!(name, Some(b"d")))
   1794         })
   1795     }
   1796 
   1797     fn count_hll_filter_d_tag_mode(filter: &PocketFilter) -> BaseRelayCountHllDTagMode {
   1798         if filter.num_kinds() == 0 {
   1799             return BaseRelayCountHllDTagMode::Suppress;
   1800         }
   1801         if filter
   1802             .kinds()
   1803             .any(|kind| NIP29_RELAY_GENERATED_KIND_VALUES.contains(&u32::from(kind.as_u16())))
   1804         {
   1805             BaseRelayCountHllDTagMode::Target
   1806         } else {
   1807             BaseRelayCountHllDTagMode::Ignore
   1808         }
   1809     }
   1810 
    /// Queries the store for a single filter and screens every candidate for
    /// group read visibility.
    ///
    /// The filter is first rebuilt with the limit implied by `limit_mode`.
    /// The store then calls the screening closure for each candidate: events
    /// that do not match the filter are `Mismatch`, matching events visible to
    /// `auth` are `Match`, and matching events that are not visible are
    /// `Redacted`.
    ///
    /// The returned report holds the screened events, whether the store
    /// reported a redaction, and metrics with the number of candidates scanned
    /// and the number of redacted events.
    ///
    /// # Errors
    ///
    /// Fails if the filter cannot be rebuilt, if the store query fails, or if
    /// filter matching or the visibility check failed for any candidate.
    fn query_filter_events_report_with_services(
        store: &PocketStoreHandle,
        groups: Option<&GroupServiceHandle>,
        limits: BaseRelayLimits,
        query: PocketQueryConfig,
        filter: &PocketFilter,
        auth: &GroupAuthContext,
        limit_mode: BaseRelayFilterLimitMode,
    ) -> Result<BaseRelayEventQueryReport, BaseRelayError> {
        let pocket_filter = Self::pocket_filter_with_limit_mode(limits, filter, limit_mode)?;
        // The screening closure returns a `PocketScreenResult`, not a `Result`,
        // so the first failure is parked here and surfaced after the query.
        let screen_error = RefCell::new(None);
        let candidates_scanned = Cell::new(0_u64);
        let redacted_events = Cell::new(0_u64);
        let screened = store.find_events_with_screen(&pocket_filter, query, |pocket_event| {
            // Every candidate handed to the closure is counted, including those
            // skipped below after an error.
            candidates_scanned.set(candidates_scanned.get().saturating_add(1));
            // Once an error has been recorded, reject all remaining candidates
            // without evaluating them.
            if screen_error.borrow().is_some() {
                return PocketScreenResult::Mismatch;
            }
            match pocket_filter.event_matches(pocket_event) {
                Ok(false) => PocketScreenResult::Mismatch,
                Ok(true) => {
                    match Self::group_read_gate_visible_to_auth(groups, pocket_event, auth) {
                        Ok(true) => PocketScreenResult::Match,
                        Ok(false) => {
                            redacted_events.set(redacted_events.get().saturating_add(1));
                            PocketScreenResult::Redacted
                        }
                        Err(error) => {
                            *screen_error.borrow_mut() = Some(error);
                            PocketScreenResult::Mismatch
                        }
                    }
                }
                Err(error) => {
                    *screen_error.borrow_mut() = Some(BaseRelayError::error(error.to_string()));
                    PocketScreenResult::Mismatch
                }
            }
        })?;
        // A parked screening error takes precedence over the screened results.
        if let Some(error) = screen_error.into_inner() {
            return Err(error);
        }
        let group_read_denied = screened.redacted();
        let events = screened.into_events();
        Ok(BaseRelayEventQueryReport::new(
            events,
            group_read_denied,
            // NOTE(review): the middle argument is fixed at 0 here; presumably it
            // is the returned-event count that callers fill in later via
            // `with_returned_events` — confirm against `BaseRelayQueryMetrics`.
            BaseRelayQueryMetrics::new(candidates_scanned.get(), 0, redacted_events.get()),
        ))
    }
   1861 
   1862     fn pocket_filter_with_limit_mode(
   1863         limits: BaseRelayLimits,
   1864         filter: &PocketFilter,
   1865         limit_mode: BaseRelayFilterLimitMode,
   1866     ) -> Result<PocketOwnedFilter, BaseRelayError> {
   1867         let limit = match (limit_mode, filter.limit()) {
   1868             (BaseRelayFilterLimitMode::ApplyDefaultLimit, u32::MAX) => {
   1869                 u32::try_from(limits.default_limit)
   1870                     .map_err(|_| BaseRelayError::invalid("default filter limit exceeds u32"))?
   1871             }
   1872             (BaseRelayFilterLimitMode::PreserveCountLimitless, _) => u32::MAX,
   1873             (_, limit) => limit,
   1874         };
   1875         let ids = filter.ids().collect::<Vec<_>>();
   1876         let authors = filter.authors().collect::<Vec<_>>();
   1877         let kinds = filter.kinds().collect::<Vec<_>>();
   1878         let since =
   1879             (filter.since() != tangle_store_pocket::PocketTime::min()).then(|| filter.since());
   1880         let until =
   1881             (filter.until() != tangle_store_pocket::PocketTime::max()).then(|| filter.until());
   1882         let limit = (limit != u32::MAX).then_some(limit);
   1883         PocketOwnedFilter::new(
   1884             &ids,
   1885             &authors,
   1886             &kinds,
   1887             filter
   1888                 .tags()
   1889                 .map_err(|error| BaseRelayError::error(error.to_string()))?,
   1890             since,
   1891             until,
   1892             limit,
   1893         )
   1894         .map_err(|error| BaseRelayError::error(error.to_string()))
   1895     }
   1896 
   1897     fn sort_and_dedupe_query_events(mut events: Vec<PocketOwnedEvent>) -> Vec<PocketOwnedEvent> {
   1898         events.sort_by(|left, right| {
   1899             let left: &PocketEvent = left;
   1900             let right: &PocketEvent = right;
   1901             right
   1902                 .created_at()
   1903                 .cmp(&left.created_at())
   1904                 .then_with(|| left.id().cmp(&right.id()))
   1905         });
   1906         let mut seen = BTreeSet::new();
   1907         events
   1908             .into_iter()
   1909             .filter(|event| {
   1910                 let event: &PocketEvent = event;
   1911                 seen.insert(event.id())
   1912             })
   1913             .collect()
   1914     }
   1915 
   1916     pub(crate) fn group_read_gate_visible_to_auth(
   1917         groups: Option<&GroupServiceHandle>,
   1918         event: &(impl GroupEventView + ?Sized),
   1919         auth: &GroupAuthContext,
   1920     ) -> Result<bool, BaseRelayError> {
   1921         groups
   1922             .map(|groups| groups.event_visible_to_auth(event, auth))
   1923             .unwrap_or(Ok(true))
   1924             .map_err(BaseRelayError::from)
   1925     }
   1926 }
   1927 
   1928 fn pocket_filters_are_complete(filters: &[PocketOwnedFilter]) -> bool {
   1929     !filters.is_empty() && filters.iter().all(|filter| filter.completes())
   1930 }
   1931 
   1932 #[cfg(test)]
   1933 mod tests {
   1934     use super::{
   1935         BaseRelay, BaseRelayCountHll, BaseRelayCountHllTargetPolicy, BaseRelayLimitSettings,
   1936         BaseRelayLimits, NEGENTROPY_DISABLED_MESSAGE,
   1937     };
   1938     use crate::pocket_conversion::{tangle_event_to_pocket, tangle_filter_to_pocket};
   1939     use crate::relay::auth::BaseAuthState;
   1940     use crate::relay::live::CloseResult;
   1941     use tangle_crypto::RelaySigner;
   1942     use tangle_groups::{
   1943         GroupAuthContext, GroupId, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP,
   1944         KIND_GROUP_CREATE_INVITE, KIND_GROUP_DELETE_EVENT, KIND_GROUP_DELETE_GROUP,
   1945         KIND_GROUP_EDIT_METADATA, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST,
   1946         KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER,
   1947         MemberStatus, NIP29_RELAY_GENERATED_KIND_VALUES, StoreOffset,
   1948         parse_group_runtime_config_json,
   1949     };
   1950     use tangle_protocol::{
   1951         ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SignatureHex,
   1952         SubscriptionId, Tag, UnixTimestamp, UnsignedEvent, filter_from_value,
   1953     };
   1954     use tangle_store_pocket::{
   1955         PocketEvent, PocketHll8, PocketKind, PocketOwnedEvent, PocketOwnedFilter, PocketOwnedTags,
   1956         PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy, PocketTime,
   1957     };
   1958 
    /// Test-only extension methods for issuing COUNT requests with
    /// protocol-level [`Filter`]s and receiving a protocol [`RelayMessage`].
    trait BaseRelayCountTestExt {
        /// Handles a COUNT request for `subscription_id` over `filters`
        /// without any authentication state.
        fn handle_count_protocol(
            &self,
            subscription_id: SubscriptionId,
            filters: Vec<Filter>,
        ) -> Result<RelayMessage, crate::errors::BaseRelayError>;

        /// Handles a COUNT request for `subscription_id` over `filters` on
        /// behalf of the connection described by `auth`.
        fn handle_count_with_auth_protocol(
            &self,
            subscription_id: SubscriptionId,
            filters: Vec<Filter>,
            auth: &BaseAuthState,
        ) -> Result<RelayMessage, crate::errors::BaseRelayError>;
    }
   1973 
   1974     impl BaseRelayCountTestExt for BaseRelay {
   1975         fn handle_count_protocol(
   1976             &self,
   1977             subscription_id: SubscriptionId,
   1978             filters: Vec<Filter>,
   1979         ) -> Result<RelayMessage, crate::errors::BaseRelayError> {
   1980             let search_present = filters.iter().any(|filter| filter.search().is_some());
   1981             let filters = pocket_filters(filters)?;
   1982             self.handle_count_with_group_auth_report(
   1983                 subscription_id,
   1984                 filters,
   1985                 search_present,
   1986                 &GroupAuthContext::unauthenticated(),
   1987             )
   1988             .map(|report| report.into_message())
   1989         }
   1990 
   1991         fn handle_count_with_auth_protocol(
   1992             &self,
   1993             subscription_id: SubscriptionId,
   1994             filters: Vec<Filter>,
   1995             auth: &BaseAuthState,
   1996         ) -> Result<RelayMessage, crate::errors::BaseRelayError> {
   1997             let search_present = filters.iter().any(|filter| filter.search().is_some());
   1998             let filters = pocket_filters(filters)?;
   1999             let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned());
   2000             self.handle_count_with_group_auth_report(
   2001                 subscription_id,
   2002                 filters,
   2003                 search_present,
   2004                 &group_auth,
   2005             )
   2006             .map(|report| report.into_message())
   2007         }
   2008     }
   2009 
   2010     fn pocket_filters(
   2011         filters: Vec<Filter>,
   2012     ) -> Result<Vec<PocketOwnedFilter>, crate::errors::BaseRelayError> {
   2013         filters.iter().map(tangle_filter_to_pocket).collect()
   2014     }
   2015 
   2016     #[test]
   2017     fn base_relay_stores_queries_counts_closes_and_fans_out_public_events() {
   2018         let mut relay = test_relay("base-relay-public", 4);
   2019         let event = signed_public_event(7, 1, Vec::new(), "hello");
   2020         let subscription_id = SubscriptionId::new("sub-a").expect("sub");
   2021         let filter = filter_from_value(&serde_json::json!({"kinds":[1]})).expect("filter");
   2022 
   2023         assert_eq!(
   2024             relay.handle_event(event.clone()).expect("event"),
   2025             RelayMessage::Ok {
   2026                 event_id: event.id().clone(),
   2027                 accepted: true,
   2028                 message: String::new()
   2029             }
   2030         );
   2031         assert_eq!(
   2032             relay.handle_event(event.clone()).expect("duplicate"),
   2033             RelayMessage::Ok {
   2034                 event_id: event.id().clone(),
   2035                 accepted: true,
   2036                 message: "duplicate: already have this event".to_owned()
   2037             }
   2038         );
   2039 
   2040         let messages = relay
   2041             .handle_protocol_req_for_test(subscription_id.clone(), vec![filter.clone()])
   2042             .expect("req");
   2043         assert!(
   2044             matches!(&messages[0], RelayMessage::Event { event: found, .. } if found.id() == event.id())
   2045         );
   2046         assert_eq!(messages[1], RelayMessage::Eose(subscription_id.clone()));
   2047         assert_eq!(
   2048             relay
   2049                 .handle_count_protocol(subscription_id.clone(), vec![filter])
   2050                 .expect("count"),
   2051             RelayMessage::Count {
   2052                 subscription_id: subscription_id.clone(),
   2053                 count: 1,
   2054                 hll: None
   2055             }
   2056         );
   2057         assert!(matches!(
   2058             relay.fanout_protocol_for_test(&event).as_slice(),
   2059             [RelayMessage::Event { subscription_id: delivered, event: found }]
   2060                 if delivered == &subscription_id && found.id() == event.id()
   2061         ));
   2062         assert_eq!(relay.handle_close(&subscription_id), CloseResult::Closed);
   2063         assert_eq!(relay.active_subscription_count(), 0);
   2064         assert!(relay.fanout_protocol_for_test(&event).is_empty());
   2065     }
   2066 
   2067     #[test]
   2068     fn base_relay_uses_configured_pocket_query_scrape_controls() {
   2069         let strict_config = test_store_config("base-relay-query-strict");
   2070         let mut strict = BaseRelay::open(
   2071             &strict_config,
   2072             relay_limits(4),
   2073             PocketQueryConfig::new(false, 0, 0),
   2074         )
   2075         .expect("strict");
   2076         let strict_event = signed_public_event(7, 1, Vec::new(), "strict");
   2077         let broad = filter_from_value(&serde_json::json!({"limit":1})).expect("filter");
   2078 
   2079         assert_accepted(
   2080             strict
   2081                 .handle_event(strict_event.clone())
   2082                 .expect("strict event"),
   2083             &strict_event,
   2084         );
   2085         assert!(
   2086             strict
   2087                 .handle_protocol_req_for_test(
   2088                     SubscriptionId::new("strict").expect("sub"),
   2089                     vec![broad.clone()]
   2090                 )
   2091                 .expect_err("strict scrape")
   2092                 .prefixed_message()
   2093                 .to_lowercase()
   2094                 .contains("scraper")
   2095         );
   2096 
   2097         let limited_config = test_store_config("base-relay-query-limited");
   2098         let mut limited = BaseRelay::open(
   2099             &limited_config,
   2100             relay_limits(4),
   2101             PocketQueryConfig::new(false, 1, 0),
   2102         )
   2103         .expect("limited");
   2104         let limited_event = signed_public_event(8, 1, Vec::new(), "limited");
   2105 
   2106         assert_accepted(
   2107             limited
   2108                 .handle_event(limited_event.clone())
   2109                 .expect("limited event"),
   2110             &limited_event,
   2111         );
   2112         let messages = limited
   2113             .handle_protocol_req_for_test(SubscriptionId::new("limited").expect("sub"), vec![broad])
   2114             .expect("limited scrape");
   2115 
   2116         assert!(
   2117             matches!(&messages[0], RelayMessage::Event { event, .. } if event.id() == limited_event.id())
   2118         );
   2119     }
   2120 
   2121     #[test]
   2122     fn base_relay_rejects_search_req_and_count_as_unsupported() {
   2123         let mut relay = test_relay("base-relay-search-unsupported", 4);
   2124         let req_id = SubscriptionId::new("search-req").expect("req");
   2125         let count_id = SubscriptionId::new("search-count").expect("count");
   2126         let search = filter_from_value(&serde_json::json!({
   2127             "search": "fresh carrots",
   2128             "limit": 1
   2129         }))
   2130         .expect("filter");
   2131 
   2132         assert_eq!(
   2133             relay
   2134                 .handle_protocol_req_for_test(req_id.clone(), vec![search.clone()])
   2135                 .expect("req"),
   2136             vec![RelayMessage::Closed {
   2137                 subscription_id: req_id,
   2138                 message: "unsupported: search filters are not supported".to_owned()
   2139             }]
   2140         );
   2141         assert_eq!(relay.active_subscription_count(), 0);
   2142         assert_eq!(
   2143             relay
   2144                 .handle_count_protocol(count_id.clone(), vec![search])
   2145                 .expect("count"),
   2146             RelayMessage::Closed {
   2147                 subscription_id: count_id,
   2148                 message: "unsupported: search filters are not supported".to_owned()
   2149             }
   2150         );
   2151     }
   2152 
   2153     #[test]
   2154     fn base_relay_dispatch_returns_disabled_negentropy_surface() {
   2155         let mut relay = test_relay("base-relay-negentropy-disabled", 4);
   2156         let mut auth =
   2157             BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state");
   2158         let subscription_id = SubscriptionId::new("neg-sub").expect("sub");
   2159 
   2160         assert_eq!(
   2161             relay
   2162                 .handle_client_message(
   2163                     ClientMessage::NegOpen {
   2164                         subscription_id: subscription_id.clone(),
   2165                         filter: Filter::empty(),
   2166                         message: "00".to_owned()
   2167                     },
   2168                     &mut auth,
   2169                     UnixTimestamp::new(100)
   2170                 )
   2171                 .expect("neg open"),
   2172             vec![RelayMessage::NegErr {
   2173                 subscription_id: subscription_id.clone(),
   2174                 message: NEGENTROPY_DISABLED_MESSAGE.to_owned()
   2175             }]
   2176         );
   2177         assert_eq!(
   2178             relay
   2179                 .handle_client_message(
   2180                     ClientMessage::NegMsg {
   2181                         subscription_id: subscription_id.clone(),
   2182                         message: String::new()
   2183                     },
   2184                     &mut auth,
   2185                     UnixTimestamp::new(101)
   2186                 )
   2187                 .expect("neg msg"),
   2188             vec![RelayMessage::NegErr {
   2189                 subscription_id: subscription_id.clone(),
   2190                 message: NEGENTROPY_DISABLED_MESSAGE.to_owned()
   2191             }]
   2192         );
   2193         assert_eq!(
   2194             relay
   2195                 .handle_client_message(
   2196                     ClientMessage::NegClose(subscription_id),
   2197                     &mut auth,
   2198                     UnixTimestamp::new(102)
   2199                 )
   2200                 .expect("neg close"),
   2201             Vec::<RelayMessage>::new()
   2202         );
   2203     }
   2204 
   2205     #[test]
   2206     fn base_relay_disabled_negentropy_does_not_validate_or_screen_filter() {
   2207         let owner = signer(7).public_key().clone();
   2208         let owner_auth = authenticated_state(7);
   2209         let mut auth =
   2210             BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state");
   2211         let mut relay = test_relay_with_groups(
   2212             "base-relay-negentropy-disabled-no-screen",
   2213             4,
   2214             &enabled_groups_for_owner(&owner),
   2215         );
   2216         let private_create = signed_private_group_create_event(7, "PrivateNegentropy");
   2217         assert_accepted(
   2218             relay
   2219                 .handle_event_with_auth(private_create.clone(), &owner_auth)
   2220                 .expect("private create"),
   2221             &private_create,
   2222         );
   2223         let private_event = signed_event_at(
   2224             7,
   2225             1,
   2226             vec![h("PrivateNegentropy")],
   2227             "private negentropy",
   2228             1_714_124_434,
   2229         );
   2230         assert_accepted(
   2231             relay
   2232                 .handle_event_with_auth(private_event.clone(), &owner_auth)
   2233                 .expect("private event"),
   2234             &private_event,
   2235         );
   2236         let subscription_id = SubscriptionId::new("neg-noscreen").expect("sub");
   2237         let filter = filter_from_value(&serde_json::json!({
   2238             "kinds": [1],
   2239             "#h": ["PrivateNegentropy"],
   2240             "limit": 501
   2241         }))
   2242         .expect("filter");
   2243 
   2244         assert_eq!(
   2245             relay
   2246                 .handle_client_message(
   2247                     ClientMessage::NegOpen {
   2248                         subscription_id: subscription_id.clone(),
   2249                         filter,
   2250                         message: "00".to_owned()
   2251                     },
   2252                     &mut auth,
   2253                     UnixTimestamp::new(100)
   2254                 )
   2255                 .expect("neg open"),
   2256             vec![RelayMessage::NegErr {
   2257                 subscription_id: subscription_id.clone(),
   2258                 message: NEGENTROPY_DISABLED_MESSAGE.to_owned()
   2259             }]
   2260         );
   2261         assert_eq!(
   2262             relay
   2263                 .handle_client_message(
   2264                     ClientMessage::NegMsg {
   2265                         subscription_id: subscription_id.clone(),
   2266                         message: "should-not-touch-storage".to_owned()
   2267                     },
   2268                     &mut auth,
   2269                     UnixTimestamp::new(101)
   2270                 )
   2271                 .expect("neg msg"),
   2272             vec![RelayMessage::NegErr {
   2273                 subscription_id,
   2274                 message: NEGENTROPY_DISABLED_MESSAGE.to_owned()
   2275             }]
   2276         );
   2277     }
   2278 
   2279     #[test]
   2280     fn base_relay_fetches_events_by_store_offset() {
   2281         let relay = test_relay("base-relay-offset-lookup", 4);
   2282         let event = signed_public_event(7, 1, Vec::new(), "offset");
   2283         let pocket = tangle_event_to_pocket(&event).expect("pocket");
   2284         let offset = StoreOffset::new(relay.store.store_event(&pocket).expect("store"));
   2285 
   2286         let found = relay.event_by_offset(offset).expect("offset");
   2287         let found: &PocketEvent = &found;
   2288         assert_eq!(found.id().as_hex_string(), event.id().as_str());
   2289     }
   2290 
   2291     #[test]
   2292     fn base_relay_req_merges_filters_with_order_dedupe_and_limits() {
   2293         let mut relay = test_relay("base-relay-req-order", 8);
   2294         let market_tag = Tag::from_parts("t", &["market"]).expect("tag");
   2295         let old_market =
   2296             signed_event_at(7, 1, vec![market_tag.clone()], "old market", 1_714_124_433);
   2297         let tied_author =
   2298             signed_event_at(7, 1, vec![market_tag.clone()], "tied author", 1_714_124_434);
   2299         let tied_other =
   2300             signed_event_at(8, 1, vec![market_tag.clone()], "tied other", 1_714_124_434);
   2301         let kind_two = signed_event_at(7, 2, Vec::new(), "kind two", 1_714_124_435);
   2302         let wrong_tag = signed_event_at(
   2303             9,
   2304             1,
   2305             vec![Tag::from_parts("t", &["other"]).expect("tag")],
   2306             "wrong tag",
   2307             1_714_124_436,
   2308         );
   2309 
   2310         for event in [
   2311             &old_market,
   2312             &tied_other,
   2313             &kind_two,
   2314             &wrong_tag,
   2315             &tied_author,
   2316         ] {
   2317             assert_accepted(relay.handle_event(event.clone()).expect("event"), event);
   2318         }
   2319 
   2320         let subscription_id = SubscriptionId::new("req-order").expect("sub");
   2321         let market_limit =
   2322             filter_from_value(&serde_json::json!({"kinds":[1],"#t":["market"],"limit":2}))
   2323                 .expect("market filter");
   2324         let author_limit = filter_from_value(&serde_json::json!({
   2325             "authors":[tied_author.unsigned().pubkey().as_str()],
   2326             "kinds":[1,2],
   2327             "limit":2
   2328         }))
   2329         .expect("author filter");
   2330         let messages = relay
   2331             .handle_protocol_req_for_test(subscription_id.clone(), vec![market_limit, author_limit])
   2332             .expect("req");
   2333         let mut tied = [tied_author.clone(), tied_other.clone()];
   2334         tied.sort_by(|left, right| left.id().cmp(right.id()));
   2335         let expected = [kind_two.clone(), tied[0].clone(), tied[1].clone()];
   2336 
   2337         assert_eq!(messages.len(), expected.len() + 1);
   2338         for (message, event) in messages.iter().zip(expected.iter()) {
   2339             assert!(matches!(
   2340                 message,
   2341                 RelayMessage::Event {
   2342                     subscription_id: actual,
   2343                     event: found
   2344                 } if actual == &subscription_id && found.id() == event.id()
   2345             ));
   2346         }
   2347         assert_eq!(
   2348             messages.last(),
   2349             Some(&RelayMessage::Eose(subscription_id.clone()))
   2350         );
   2351         assert!(!messages.iter().any(|message| matches!(
   2352             message,
   2353             RelayMessage::Event { event, .. }
   2354                 if event.id() == old_market.id() || event.id() == wrong_tag.id()
   2355         )));
   2356     }
   2357 
   2358     #[test]
   2359     fn base_relay_req_count_paths_preserve_chorus_parity() {
   2360         let owner = signer(7).public_key().clone();
   2361         let auth = authenticated_state(7);
   2362         let outsider_auth = authenticated_state(8);
   2363         let mut relay = test_relay_with_groups(
   2364             "base-relay-req-count-chorus-parity",
   2365             8,
   2366             &enabled_groups_for_owner(&owner),
   2367         );
   2368         let market_tag = Tag::from_parts("t", &["market"]).expect("tag");
   2369         let old_market =
   2370             signed_event_at(7, 1, vec![market_tag.clone()], "old market", 1_714_124_433);
   2371         let tied_author =
   2372             signed_event_at(7, 1, vec![market_tag.clone()], "tied author", 1_714_124_434);
   2373         let tied_other =
   2374             signed_event_at(8, 1, vec![market_tag.clone()], "tied other", 1_714_124_434);
   2375         let kind_two = signed_event_at(7, 2, Vec::new(), "kind two", 1_714_124_435);
   2376         let wrong_tag = signed_event_at(
   2377             9,
   2378             1,
   2379             vec![Tag::from_parts("t", &["other"]).expect("tag")],
   2380             "wrong tag",
   2381             1_714_124_436,
   2382         );
   2383         for event in [
   2384             &old_market,
   2385             &tied_other,
   2386             &kind_two,
   2387             &wrong_tag,
   2388             &tied_author,
   2389         ] {
   2390             assert_accepted(relay.handle_event(event.clone()).expect("event"), event);
   2391         }
   2392         relay
   2393             .handle_event_with_auth(signed_private_group_create_event(7, "Private"), &auth)
   2394             .expect("private create");
   2395         let private_market = signed_event_at(
   2396             7,
   2397             1,
   2398             vec![h("Private"), market_tag.clone()],
   2399             "private market",
   2400             1_714_124_437,
   2401         );
   2402         assert_accepted(
   2403             relay
   2404                 .handle_event_with_auth(private_market.clone(), &auth)
   2405                 .expect("private event"),
   2406             &private_market,
   2407         );
   2408 
   2409         let subscription_id = SubscriptionId::new("req-count-parity").expect("sub");
   2410         let market_limit =
   2411             filter_from_value(&serde_json::json!({"kinds":[1],"#t":["market"],"limit":2}))
   2412                 .expect("market filter");
   2413         let author_limit = filter_from_value(&serde_json::json!({
   2414             "authors":[tied_author.unsigned().pubkey().as_str()],
   2415             "kinds":[1,2],
   2416             "limit":2
   2417         }))
   2418         .expect("author filter");
   2419         let messages = relay
   2420             .handle_protocol_req_for_test(
   2421                 subscription_id.clone(),
   2422                 vec![market_limit.clone(), author_limit.clone()],
   2423             )
   2424             .expect("req");
   2425         let mut tied = [tied_author.clone(), tied_other.clone()];
   2426         tied.sort_by(|left, right| left.id().cmp(right.id()));
   2427         let expected = [kind_two.clone(), tied[0].clone(), tied[1].clone()];
   2428         let event_ids = messages
   2429             .iter()
   2430             .filter_map(|message| match message {
   2431                 RelayMessage::Event {
   2432                     subscription_id: actual,
   2433                     event,
   2434                 } if actual == &subscription_id => Some(event.id().clone()),
   2435                 _ => None,
   2436             })
   2437             .collect::<Vec<_>>();
   2438         let expected_ids = expected
   2439             .iter()
   2440             .map(|event| event.id().clone())
   2441             .collect::<Vec<_>>();
   2442 
   2443         assert_eq!(event_ids, expected_ids);
   2444         assert_eq!(
   2445             messages.last(),
   2446             Some(&RelayMessage::Closed {
   2447                 subscription_id: subscription_id.clone(),
   2448                 message: "auth-required: authentication required to read group events".to_owned()
   2449             })
   2450         );
   2451         assert!(!messages.iter().any(
   2452             |message| matches!(message, RelayMessage::Eose(actual) if actual == &subscription_id)
   2453         ));
   2454         assert!(!event_ids.contains(private_market.id()));
   2455         assert!(!event_ids.contains(old_market.id()));
   2456         assert!(!event_ids.contains(wrong_tag.id()));
   2457         assert_eq!(relay.active_subscription_count(), 0);
   2458 
   2459         let restricted_sub = SubscriptionId::new("restricted-screened").expect("sub");
   2460         let restricted_messages = relay
   2461             .handle_protocol_req_with_auth_for_test(
   2462                 restricted_sub.clone(),
   2463                 vec![market_limit.clone(), author_limit.clone()],
   2464                 &outsider_auth,
   2465             )
   2466             .expect("restricted req");
   2467         let restricted_event_ids = restricted_messages
   2468             .iter()
   2469             .filter_map(|message| match message {
   2470                 RelayMessage::Event {
   2471                     subscription_id: actual,
   2472                     event,
   2473                 } if actual == &restricted_sub => Some(event.id().clone()),
   2474                 _ => None,
   2475             })
   2476             .collect::<Vec<_>>();
   2477         assert_eq!(restricted_event_ids, expected_ids);
   2478         assert_eq!(
   2479             restricted_messages.last(),
   2480             Some(&RelayMessage::Closed {
   2481                 subscription_id: restricted_sub.clone(),
   2482                 message: "restricted: group is unavailable".to_owned()
   2483             })
   2484         );
   2485         assert!(!restricted_messages.iter().any(
   2486             |message| matches!(message, RelayMessage::Eose(actual) if actual == &restricted_sub)
   2487         ));
   2488         assert_eq!(relay.active_subscription_count(), 0);
   2489 
   2490         let private_sub = SubscriptionId::new("private-screened").expect("sub");
   2491         assert_eq!(
   2492             relay
   2493                 .handle_protocol_req_for_test(
   2494                     private_sub.clone(),
   2495                     vec![filter_group_tag(1, "h", "Private")]
   2496                 )
   2497                 .expect("private unauth req"),
   2498             vec![RelayMessage::Closed {
   2499                 subscription_id: private_sub,
   2500                 message: "auth-required: authentication required to read group events".to_owned()
   2501             }]
   2502         );
   2503         assert_eq!(relay.active_subscription_count(), 0);
   2504         let private_auth_sub = SubscriptionId::new("private-auth").expect("sub");
   2505         assert!(matches!(
   2506             relay
   2507                 .handle_protocol_req_with_auth_for_test(
   2508                     private_auth_sub.clone(),
   2509                     vec![filter_group_tag(1, "h", "Private")],
   2510                     &auth
   2511                 )
   2512                 .expect("private auth req")
   2513                 .as_slice(),
   2514             [RelayMessage::Event { subscription_id, event }, RelayMessage::Eose(eose)]
   2515                 if subscription_id == &private_auth_sub && event.id() == private_market.id() && eose == &private_auth_sub
   2516         ));
   2517 
   2518         let market_notes =
   2519             filter_from_value(&serde_json::json!({"kinds":[1],"#t":["market"],"limit":10}))
   2520                 .expect("market count filter");
   2521         let author_events = filter_from_value(&serde_json::json!({
   2522             "authors":[tied_author.unsigned().pubkey().as_str()],
   2523             "kinds":[1,2],
   2524             "limit":10
   2525         }))
   2526         .expect("author count filter");
   2527         assert_eq!(
   2528             relay
   2529                 .handle_count_protocol(
   2530                     SubscriptionId::new("count-visible").expect("sub"),
   2531                     vec![market_notes.clone(), author_events.clone()]
   2532                 )
   2533                 .expect("visible count"),
   2534             RelayMessage::Count {
   2535                 subscription_id: SubscriptionId::new("count-visible").expect("sub"),
   2536                 count: 4,
   2537                 hll: None
   2538             }
   2539         );
   2540         assert_eq!(
   2541             relay
   2542                 .handle_count_with_auth_protocol(
   2543                     SubscriptionId::new("count-auth").expect("sub"),
   2544                     vec![market_notes, author_events],
   2545                     &auth
   2546                 )
   2547                 .expect("auth count"),
   2548             RelayMessage::Count {
   2549                 subscription_id: SubscriptionId::new("count-auth").expect("sub"),
   2550                 count: 5,
   2551                 hll: None
   2552             }
   2553         );
   2554 
   2555         let too_large_limit =
   2556             filter_from_value(&serde_json::json!({"limit":501})).expect("limit filter");
   2557         assert!(
   2558             relay
   2559                 .handle_protocol_req_for_test(
   2560                     SubscriptionId::new("limit-req").expect("sub"),
   2561                     vec![too_large_limit.clone()]
   2562                 )
   2563                 .expect_err("req limit")
   2564                 .prefixed_message()
   2565                 .contains("max_limit 500")
   2566         );
   2567         assert!(
   2568             relay
   2569                 .handle_count_protocol(
   2570                     SubscriptionId::new("limit-count").expect("sub"),
   2571                     vec![too_large_limit]
   2572                 )
   2573                 .expect_err("count limit")
   2574                 .prefixed_message()
   2575                 .contains("max_limit 500")
   2576         );
   2577 
   2578         let search = filter_from_value(&serde_json::json!({"search":"carrots","limit":1}))
   2579             .expect("search filter");
   2580         let search_req = SubscriptionId::new("search-req").expect("sub");
   2581         assert_eq!(
   2582             relay
   2583                 .handle_protocol_req_for_test(search_req.clone(), vec![search.clone()])
   2584                 .expect("search req"),
   2585             vec![RelayMessage::Closed {
   2586                 subscription_id: search_req,
   2587                 message: "unsupported: search filters are not supported".to_owned()
   2588             }]
   2589         );
   2590         let search_count = SubscriptionId::new("search-count").expect("sub");
   2591         assert_eq!(
   2592             relay
   2593                 .handle_count_protocol(search_count.clone(), vec![search])
   2594                 .expect("search count"),
   2595             RelayMessage::Closed {
   2596                 subscription_id: search_count,
   2597                 message: "unsupported: search filters are not supported".to_owned()
   2598             }
   2599         );
   2600     }
   2601 
   2602     #[test]
   2603     fn base_relay_enforces_runtime_limits() {
   2604         let config = test_store_config("base-relay-runtime-limits");
   2605         let mut relay = BaseRelay::open(
   2606             &config,
   2607             BaseRelayLimits::new(BaseRelayLimitSettings {
   2608                 max_pending_events: 2,
   2609                 max_subscription_id_length: 3,
   2610                 max_subscriptions: 1,
   2611                 max_filters_per_request: 1,
   2612                 max_tag_values_per_filter: 1,
   2613                 max_query_complexity: 4,
   2614                 max_event_tags: 1,
   2615                 max_content_length: 4,
   2616                 max_limit: 2,
   2617                 default_limit: 1,
   2618             })
   2619             .expect("limits"),
   2620             PocketQueryConfig::default(),
   2621         )
   2622         .expect("relay");
   2623         let first = signed_event_at(7, 1, Vec::new(), "one", 1_714_124_430);
   2624         let second = signed_event_at(8, 1, Vec::new(), "two", 1_714_124_431);
   2625 
   2626         assert_accepted(relay.handle_event(first.clone()).expect("first"), &first);
   2627         assert_accepted(relay.handle_event(second.clone()).expect("second"), &second);
   2628 
   2629         let limited = relay
   2630             .handle_protocol_req_for_test(
   2631                 SubscriptionId::new("lim").expect("sub"),
   2632                 vec![Filter::empty()],
   2633             )
   2634             .expect("limited");
   2635         assert_eq!(
   2636             limited
   2637                 .iter()
   2638                 .filter(|message| matches!(message, RelayMessage::Event { .. }))
   2639                 .count(),
   2640             1
   2641         );
   2642         assert_eq!(
   2643             relay.handle_close(&SubscriptionId::new("lim").expect("sub")),
   2644             CloseResult::Closed
   2645         );
   2646 
   2647         assert!(
   2648             relay
   2649                 .handle_protocol_req_for_test(
   2650                     SubscriptionId::new("long").expect("sub"),
   2651                     vec![Filter::empty()]
   2652                 )
   2653                 .expect_err("subscription id length")
   2654                 .prefixed_message()
   2655                 .contains("max_subid_length 3")
   2656         );
   2657         assert!(
   2658             relay
   2659                 .handle_count_protocol(
   2660                     SubscriptionId::new("cnt").expect("sub"),
   2661                     vec![Filter::empty(), Filter::empty()]
   2662                 )
   2663                 .expect_err("filter count")
   2664                 .prefixed_message()
   2665                 .contains("max_filters_per_request 1")
   2666         );
   2667         assert!(
   2668             relay
   2669                 .handle_count_protocol(
   2670                     SubscriptionId::new("tag").expect("sub"),
   2671                     vec![
   2672                         filter_from_value(&serde_json::json!({"#t":["one", "two"]}))
   2673                             .expect("filter")
   2674                     ]
   2675                 )
   2676                 .expect_err("tag values")
   2677                 .prefixed_message()
   2678                 .contains("max_tag_values_per_filter 1")
   2679         );
   2680         assert!(
   2681             relay
   2682                 .handle_count_protocol(
   2683                     SubscriptionId::new("max").expect("sub"),
   2684                     vec![filter_from_value(&serde_json::json!({"limit":3})).expect("filter")]
   2685                 )
   2686                 .expect_err("max limit")
   2687                 .prefixed_message()
   2688                 .contains("max_limit 2")
   2689         );
   2690 
   2691         let too_many_tags = signed_event_at(
   2692             9,
   2693             1,
   2694             vec![
   2695                 Tag::from_parts("t", &["one"]).expect("tag"),
   2696                 Tag::from_parts("p", &["two"]).expect("tag"),
   2697             ],
   2698             "ok",
   2699             1_714_124_432,
   2700         );
   2701         assert!(matches!(
   2702             relay.handle_event(too_many_tags).expect("tags"),
   2703             RelayMessage::Ok { accepted: false, message, .. }
   2704                 if message.contains("max_event_tags 1")
   2705         ));
   2706 
   2707         let too_much_content = signed_event_at(10, 1, Vec::new(), "12345", 1_714_124_433);
   2708         assert!(matches!(
   2709             relay.handle_event(too_much_content).expect("content"),
   2710             RelayMessage::Ok { accepted: false, message, .. }
   2711                 if message.contains("max_content_length 4")
   2712         ));
   2713     }
   2714 
   2715     #[test]
   2716     fn base_relay_rejects_over_budget_req_and_count() {
   2717         let config = test_store_config("base-relay-query-complexity");
   2718         let mut relay = BaseRelay::open(
   2719             &config,
   2720             BaseRelayLimits::new(BaseRelayLimitSettings {
   2721                 max_pending_events: 4,
   2722                 max_subscription_id_length: 64,
   2723                 max_subscriptions: 64,
   2724                 max_filters_per_request: 10,
   2725                 max_tag_values_per_filter: 10,
   2726                 max_query_complexity: 4,
   2727                 max_event_tags: 200,
   2728                 max_content_length: 65_536,
   2729                 max_limit: 10,
   2730                 default_limit: 1,
   2731             })
   2732             .expect("limits"),
   2733             PocketQueryConfig::default(),
   2734         )
   2735         .expect("relay");
   2736         let complex = filter_from_value(&serde_json::json!({
   2737             "kinds": [1],
   2738             "#t": ["market"],
   2739             "limit": 2
   2740         }))
   2741         .expect("filter");
   2742 
   2743         assert!(
   2744             relay
   2745                 .handle_protocol_req_for_test(
   2746                     SubscriptionId::new("req").expect("sub"),
   2747                     vec![complex.clone()]
   2748                 )
   2749                 .expect_err("req complexity")
   2750                 .prefixed_message()
   2751                 .contains("max_query_complexity 4")
   2752         );
   2753         assert_eq!(relay.active_subscription_count(), 0);
   2754         assert!(
   2755             relay
   2756                 .handle_count_protocol(SubscriptionId::new("cnt").expect("sub"), vec![complex])
   2757                 .expect_err("count complexity")
   2758                 .prefixed_message()
   2759                 .contains("max_query_complexity 4")
   2760         );
   2761     }
   2762 
   2763     #[test]
   2764     fn base_relay_count_dedupes_overlapping_visible_filters() {
   2765         let relay = test_relay("base-relay-count-dedupe", 8);
   2766         let market_tag = Tag::from_parts("t", &["market"]).expect("tag");
   2767         let first = signed_event_at(7, 1, vec![market_tag.clone()], "first", 1_714_124_433);
   2768         let second = signed_event_at(8, 1, vec![market_tag], "second", 1_714_124_434);
   2769         let third = signed_event_at(7, 2, Vec::new(), "third", 1_714_124_435);
   2770 
   2771         for event in [&first, &second, &third] {
   2772             assert_accepted(relay.handle_event(event.clone()).expect("event"), event);
   2773         }
   2774 
   2775         let market_notes =
   2776             filter_from_value(&serde_json::json!({"kinds":[1],"#t":["market"],"limit":2}))
   2777                 .expect("market filter");
   2778         let author_events = filter_from_value(&serde_json::json!({
   2779             "authors":[first.unsigned().pubkey().as_str()],
   2780             "kinds":[1,2],
   2781             "limit":10
   2782         }))
   2783         .expect("author filter");
   2784         let limited_market =
   2785             filter_from_value(&serde_json::json!({"kinds":[1],"#t":["market"],"limit":1}))
   2786                 .expect("limited filter");
   2787 
   2788         assert_eq!(
   2789             relay
   2790                 .handle_count_protocol(
   2791                     SubscriptionId::new("count-limit").expect("sub"),
   2792                     vec![limited_market]
   2793                 )
   2794                 .expect("count"),
   2795             RelayMessage::Count {
   2796                 subscription_id: SubscriptionId::new("count-limit").expect("sub"),
   2797                 count: 2,
   2798                 hll: None
   2799             }
   2800         );
   2801 
   2802         assert_eq!(
   2803             relay
   2804                 .handle_count_protocol(
   2805                     SubscriptionId::new("count-dedupe").expect("sub"),
   2806                     vec![market_notes, author_events]
   2807                 )
   2808                 .expect("count"),
   2809             RelayMessage::Count {
   2810                 subscription_id: SubscriptionId::new("count-dedupe").expect("sub"),
   2811                 count: 3,
   2812                 hll: None
   2813             }
   2814         );
   2815     }
   2816 
   2817     #[test]
   2818     fn base_relay_count_hll_emits_for_public_single_filter() {
   2819         let relay = test_relay("base-relay-count-hll-public", 8);
   2820         let target = "a".repeat(EventId::HEX_LENGTH);
   2821         let target_tag = Tag::from_parts("e", &[&target]).expect("tag");
   2822         let first = signed_pocket_public_event(7, 7, vec![target_tag.clone()], "first reaction");
   2823         let second = signed_pocket_public_event(8, 7, vec![target_tag], "second reaction");
   2824 
   2825         for event in [&first, &second] {
   2826             assert_pocket_accepted(relay.handle_pocket_event(event).expect("event"), event);
   2827         }
   2828 
   2829         let RelayMessage::Count { count, hll, .. } = relay
   2830             .handle_count_protocol(
   2831                 SubscriptionId::new("count-hll-public").expect("sub"),
   2832                 vec![
   2833                     filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target]}))
   2834                         .expect("filter"),
   2835                 ],
   2836             )
   2837             .expect("count")
   2838         else {
   2839             panic!("count expected")
   2840         };
   2841         let hll = hll.expect("hll");
   2842 
   2843         assert_eq!(count, 2);
   2844         assert_eq!(hll.len(), 512);
   2845         assert_ne!(hll, "00".repeat(256));
   2846     }
   2847 
   2848     #[test]
   2849     fn base_relay_count_hll_omits_for_private_hidden_unknown_limited_multi_and_redacted_counts() {
   2850         let owner = signer(7).public_key().clone();
   2851         let owner_auth = authenticated_state(7);
   2852         let unauth = BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state");
   2853         let relay = test_relay_with_groups(
   2854             "base-relay-count-hll-omits",
   2855             8,
   2856             &enabled_groups_for_owner(&owner),
   2857         );
   2858         let target = "b".repeat(EventId::HEX_LENGTH);
   2859         let target_tag = Tag::from_parts("e", &[&target]).expect("tag");
   2860         let public = signed_pocket_public_event(8, 7, vec![target_tag.clone()], "public reaction");
   2861 
   2862         assert_pocket_accepted(relay.handle_pocket_event(&public).expect("public"), &public);
   2863         let private_create = signed_pocket_private_group_create_event(7, "PrivateHll");
   2864         assert_pocket_accepted(
   2865             relay
   2866                 .handle_pocket_event_with_auth(&private_create, &owner_auth)
   2867                 .expect("private create"),
   2868             &private_create,
   2869         );
   2870         let private = signed_pocket_event_at_tags(
   2871             7,
   2872             7,
   2873             vec![h("PrivateHll"), target_tag.clone()],
   2874             "private reaction",
   2875             1_714_124_434,
   2876         );
   2877         assert_pocket_accepted(
   2878             relay
   2879                 .handle_pocket_event_with_auth(&private, &owner_auth)
   2880                 .expect("private reaction"),
   2881             &private,
   2882         );
   2883         let hidden_create = signed_pocket_group_create_event_with_tags(
   2884             7,
   2885             "HiddenHll",
   2886             vec![hidden()],
   2887             1_714_124_435,
   2888         );
   2889         assert_pocket_accepted(
   2890             relay
   2891                 .handle_pocket_event_with_auth(&hidden_create, &owner_auth)
   2892                 .expect("hidden create"),
   2893             &hidden_create,
   2894         );
   2895         let hidden = signed_pocket_event_at_tags(
   2896             7,
   2897             7,
   2898             vec![h("HiddenHll"), target_tag.clone()],
   2899             "hidden reaction",
   2900             1_714_124_436,
   2901         );
   2902         assert_pocket_accepted(
   2903             relay
   2904                 .handle_pocket_event_with_auth(&hidden, &owner_auth)
   2905                 .expect("hidden reaction"),
   2906             &hidden,
   2907         );
   2908         let deleted_create = signed_pocket_group_create_event(7, "DeletedHll");
   2909         assert_pocket_accepted(
   2910             relay
   2911                 .handle_pocket_event_with_auth(&deleted_create, &owner_auth)
   2912                 .expect("deleted create"),
   2913             &deleted_create,
   2914         );
   2915         let deleted = signed_pocket_event_at_tags(
   2916             7,
   2917             7,
   2918             vec![h("DeletedHll"), target_tag.clone()],
   2919             "deleted reaction",
   2920             1_714_124_438,
   2921         );
   2922         assert_pocket_accepted(
   2923             relay
   2924                 .handle_pocket_event_with_auth(&deleted, &owner_auth)
   2925                 .expect("deleted reaction"),
   2926             &deleted,
   2927         );
   2928         let delete_group = signed_pocket_event_at_tags(
   2929             7,
   2930             KIND_GROUP_DELETE_GROUP,
   2931             vec![h("DeletedHll")],
   2932             "",
   2933             1_714_124_439,
   2934         );
   2935         assert_pocket_accepted(
   2936             relay
   2937                 .handle_pocket_event_with_auth(&delete_group, &owner_auth)
   2938                 .expect("delete group"),
   2939             &delete_group,
   2940         );
   2941         let unknown = signed_pocket_event_at_tags(
   2942             7,
   2943             7,
   2944             vec![h("UnknownHll"), target_tag.clone()],
   2945             "unknown reaction",
   2946             1_714_124_440,
   2947         );
   2948         relay.store.store_event(&unknown).expect("store unknown");
   2949 
   2950         let authorized_private = relay
   2951             .handle_count_with_auth_protocol(
   2952                 SubscriptionId::new("count-hll-authorized-private").expect("sub"),
   2953                 vec![
   2954                     filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target.clone()]}))
   2955                         .expect("filter"),
   2956                 ],
   2957                 &owner_auth,
   2958             )
   2959             .expect("authorized private count");
   2960         assert!(matches!(
   2961             authorized_private,
   2962             RelayMessage::Count {
   2963                 count: 3,
   2964                 hll: None,
   2965                 ..
   2966             }
   2967         ));
   2968 
   2969         let multi_filter = relay
   2970             .handle_count_with_auth_protocol(
   2971                 SubscriptionId::new("count-hll-multi-filter").expect("sub"),
   2972                 vec![
   2973                     filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target.clone()]}))
   2974                         .expect("filter"),
   2975                     filter_from_value(&serde_json::json!({"kinds":[7],"#e":["c".repeat(64)]}))
   2976                         .expect("filter"),
   2977                 ],
   2978                 &owner_auth,
   2979             )
   2980             .expect("multi count");
   2981         assert!(matches!(
   2982             multi_filter,
   2983             RelayMessage::Count {
   2984                 count: 3,
   2985                 hll: None,
   2986                 ..
   2987             }
   2988         ));
   2989 
   2990         let limited = relay
   2991             .handle_count_with_auth_protocol(
   2992                 SubscriptionId::new("count-hll-limited").expect("sub"),
   2993                 vec![
   2994                     filter_from_value(
   2995                         &serde_json::json!({"kinds":[7],"#e":[target.clone()],"limit":1}),
   2996                     )
   2997                     .expect("filter"),
   2998                 ],
   2999                 &owner_auth,
   3000             )
   3001             .expect("limited count");
   3002         assert!(matches!(
   3003             limited,
   3004             RelayMessage::Count {
   3005                 count: 3,
   3006                 hll: None,
   3007                 ..
   3008             }
   3009         ));
   3010 
   3011         let redacted = relay
   3012             .handle_count_with_auth_protocol(
   3013                 SubscriptionId::new("count-hll-redacted").expect("sub"),
   3014                 vec![
   3015                     filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target]}))
   3016                         .expect("filter"),
   3017                 ],
   3018                 &unauth,
   3019             )
   3020             .expect("redacted count");
   3021         assert!(matches!(
   3022             redacted,
   3023             RelayMessage::Count {
   3024                 count: 1,
   3025                 hll: None,
   3026                 ..
   3027             }
   3028         ));
   3029 
   3030         assert_count_without_hll(
   3031             &relay,
   3032             "count-hll-private-h-target",
   3033             serde_json::json!({"kinds":[7],"#h":["PrivateHll"]}),
   3034             None,
   3035             0,
   3036         );
   3037         assert_count_without_hll(
   3038             &relay,
   3039             "count-hll-hidden-h-target",
   3040             serde_json::json!({"kinds":[7],"#h":["HiddenHll"]}),
   3041             None,
   3042             0,
   3043         );
   3044         assert_count_without_hll(
   3045             &relay,
   3046             "count-hll-unknown-h-target",
   3047             serde_json::json!({"kinds":[7],"#h":["UnknownHll"]}),
   3048             None,
   3049             0,
   3050         );
   3051         assert_count_without_hll(
   3052             &relay,
   3053             "count-hll-deleted-h-target",
   3054             serde_json::json!({"kinds":[7],"#h":["DeletedHll"]}),
   3055             None,
   3056             0,
   3057         );
   3058         assert_count_without_hll(
   3059             &relay,
   3060             "count-hll-private-d-target",
   3061             serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["PrivateHll"]}),
   3062             None,
   3063             1,
   3064         );
   3065         assert_count_without_hll(
   3066             &relay,
   3067             "count-hll-hidden-d-target",
   3068             serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["HiddenHll"]}),
   3069             None,
   3070             0,
   3071         );
   3072         assert_count_without_hll(
   3073             &relay,
   3074             "count-hll-unknown-d-target",
   3075             serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["UnknownHll"]}),
   3076             None,
   3077             0,
   3078         );
   3079         assert_count_without_hll(
   3080             &relay,
   3081             "count-hll-deleted-d-target",
   3082             serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["DeletedHll"]}),
   3083             None,
   3084             0,
   3085         );
   3086     }
   3087 
   3088     #[test]
   3089     fn base_relay_count_hll_group_target_policy_classifies_h_and_d_targets() {
   3090         let owner = signer(7).public_key().clone();
   3091         let owner_auth = authenticated_state(7);
   3092         let relay = test_relay_with_groups(
   3093             "base-relay-count-hll-target-policy",
   3094             8,
   3095             &enabled_groups_for_owner(&owner),
   3096         );
   3097         for event in [
   3098             signed_pocket_group_create_event(7, "PublicHll"),
   3099             signed_pocket_group_create_event(7, "SecondHll"),
   3100             signed_pocket_private_group_create_event(7, "PrivateHll"),
   3101             signed_pocket_group_create_event_with_tags(
   3102                 7,
   3103                 "HiddenHll",
   3104                 vec![hidden()],
   3105                 1_714_124_435,
   3106             ),
   3107             signed_pocket_group_create_event(7, "DeletedHll"),
   3108         ] {
   3109             assert_pocket_accepted(
   3110                 relay
   3111                     .handle_pocket_event_with_auth(&event, &owner_auth)
   3112                     .expect("group create"),
   3113                 &event,
   3114             );
   3115         }
   3116         let delete_group = signed_pocket_event_at_tags(
   3117             7,
   3118             KIND_GROUP_DELETE_GROUP,
   3119             vec![h("DeletedHll")],
   3120             "",
   3121             1_714_124_436,
   3122         );
   3123         assert_pocket_accepted(
   3124             relay
   3125                 .handle_pocket_event_with_auth(&delete_group, &owner_auth)
   3126                 .expect("delete group"),
   3127             &delete_group,
   3128         );
   3129 
   3130         assert_eq!(
   3131             hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":["PublicHll"]})),
   3132             BaseRelayCountHllTargetPolicy::Eligible
   3133         );
   3134         assert_eq!(
   3135             hll_target_policy(
   3136                 &relay,
   3137                 serde_json::json!({"kinds":[7],"#h":["PublicHll","SecondHll"]})
   3138             ),
   3139             BaseRelayCountHllTargetPolicy::Eligible
   3140         );
   3141         assert_eq!(
   3142             hll_target_policy(
   3143                 &relay,
   3144                 serde_json::json!({"kinds":[7],"#h":["PublicHll","PrivateHll"]})
   3145             ),
   3146             BaseRelayCountHllTargetPolicy::Suppress
   3147         );
   3148         assert_eq!(
   3149             hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":["HiddenHll"]})),
   3150             BaseRelayCountHllTargetPolicy::Suppress
   3151         );
   3152         assert_eq!(
   3153             hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":["DeletedHll"]})),
   3154             BaseRelayCountHllTargetPolicy::Suppress
   3155         );
   3156         assert_eq!(
   3157             hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":["UnknownHll"]})),
   3158             BaseRelayCountHllTargetPolicy::Suppress
   3159         );
   3160         assert_eq!(
   3161             hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":[""]})),
   3162             BaseRelayCountHllTargetPolicy::Suppress
   3163         );
   3164         assert_eq!(
   3165             hll_target_policy(
   3166                 &relay,
   3167                 serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["PublicHll"]})
   3168             ),
   3169             BaseRelayCountHllTargetPolicy::Eligible
   3170         );
   3171         assert_eq!(
   3172             hll_target_policy(
   3173                 &relay,
   3174                 serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["PrivateHll"]})
   3175             ),
   3176             BaseRelayCountHllTargetPolicy::Suppress
   3177         );
   3178         assert_eq!(
   3179             hll_target_policy(&relay, serde_json::json!({"#d":["PublicHll"]})),
   3180             BaseRelayCountHllTargetPolicy::Suppress
   3181         );
   3182         assert_eq!(
   3183             hll_target_policy(
   3184                 &relay,
   3185                 serde_json::json!({"kinds":[30023],"#d":["PrivateHll"]})
   3186             ),
   3187             BaseRelayCountHllTargetPolicy::Eligible
   3188         );
   3189 
   3190         let mut private_hll = count_hll_for_target_policy_test();
   3191         let private_filter = [pocket_filter_from_value(
   3192             serde_json::json!({"kinds":[7],"#h":["PrivateHll"]}),
   3193         )];
   3194         private_hll.suppress_for_filter_targets(relay.groups.as_ref(), &private_filter);
   3195         assert!(private_hll.into_hex().is_none());
   3196 
   3197         let mut non_group_hll = count_hll_for_target_policy_test();
   3198         let non_group_filter = [pocket_filter_from_value(
   3199             serde_json::json!({"kinds":[30023],"#d":["PrivateHll"]}),
   3200         )];
   3201         non_group_hll.suppress_for_filter_targets(relay.groups.as_ref(), &non_group_filter);
   3202         assert!(non_group_hll.into_hex().is_some());
   3203     }
   3204 
   3205     #[test]
   3206     fn base_relay_count_hll_group_target_policy_suppresses_unresolved_group_targets() {
   3207         let relay = test_relay("base-relay-count-hll-target-policy-no-groups", 8);
   3208 
   3209         assert_eq!(
   3210             hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":["PublicHll"]})),
   3211             BaseRelayCountHllTargetPolicy::Suppress
   3212         );
   3213         assert_eq!(
   3214             hll_target_policy(&relay, serde_json::json!({"#d":["PublicHll"]})),
   3215             BaseRelayCountHllTargetPolicy::Suppress
   3216         );
   3217         assert_eq!(
   3218             hll_target_policy(
   3219                 &relay,
   3220                 serde_json::json!({"kinds":[30023],"#d":["PublicHll"]})
   3221             ),
   3222             BaseRelayCountHllTargetPolicy::Eligible
   3223         );
   3224     }
   3225 
   3226     #[test]
   3227     fn base_relay_count_does_not_apply_default_or_client_limits() {
   3228         let config = test_store_config("base-relay-count-no-default-limit");
   3229         let relay = BaseRelay::open(
   3230             &config,
   3231             BaseRelayLimits::new(BaseRelayLimitSettings {
   3232                 max_pending_events: 4,
   3233                 max_subscription_id_length: 64,
   3234                 max_subscriptions: 64,
   3235                 max_filters_per_request: 10,
   3236                 max_tag_values_per_filter: 10,
   3237                 max_query_complexity: 4,
   3238                 max_event_tags: 200,
   3239                 max_content_length: 65_536,
   3240                 max_limit: 10,
   3241                 default_limit: 1,
   3242             })
   3243             .expect("limits"),
   3244             PocketQueryConfig::default(),
   3245         )
   3246         .expect("relay");
   3247         let first = signed_event_at(7, 1, Vec::new(), "first", 1_714_124_433);
   3248         let second = signed_event_at(7, 1, Vec::new(), "second", 1_714_124_434);
   3249         let third = signed_event_at(7, 1, Vec::new(), "third", 1_714_124_435);
   3250 
   3251         for event in [&first, &second, &third] {
   3252             assert_accepted(relay.handle_event(event.clone()).expect("event"), event);
   3253         }
   3254 
   3255         let unbounded = filter_from_value(&serde_json::json!({
   3256             "authors": [first.unsigned().pubkey().as_str()],
   3257             "kinds": [1]
   3258         }))
   3259         .expect("unbounded");
   3260         let client_limited = filter_from_value(&serde_json::json!({
   3261             "authors": [first.unsigned().pubkey().as_str()],
   3262             "kinds": [1],
   3263             "limit": 1
   3264         }))
   3265         .expect("client limited");
   3266 
   3267         assert_eq!(
   3268             relay
   3269                 .handle_count_protocol(
   3270                     SubscriptionId::new("count-unbounded").expect("sub"),
   3271                     vec![unbounded]
   3272                 )
   3273                 .expect("count"),
   3274             RelayMessage::Count {
   3275                 subscription_id: SubscriptionId::new("count-unbounded").expect("sub"),
   3276                 count: 3,
   3277                 hll: None
   3278             }
   3279         );
   3280         assert_eq!(
   3281             relay
   3282                 .handle_count_protocol(
   3283                     SubscriptionId::new("count-client-limited").expect("sub"),
   3284                     vec![client_limited]
   3285                 )
   3286                 .expect("count"),
   3287             RelayMessage::Count {
   3288                 subscription_id: SubscriptionId::new("count-client-limited").expect("sub"),
   3289                 count: 3,
   3290                 hll: None
   3291             }
   3292         );
   3293     }
   3294 
   3295     #[test]
   3296     fn base_relay_event_path_rejects_invalid_signatures_and_skips_ephemeral_storage() {
   3297         let relay = test_relay("base-relay-event-store-path", 8);
   3298         let valid = signed_public_event(7, 1, Vec::new(), "valid");
   3299         let signature_source = signed_public_event(8, 1, Vec::new(), "signature source");
   3300         let invalid = Event::new(
   3301             valid.id().clone(),
   3302             valid.unsigned().clone(),
   3303             signature_source.sig().clone(),
   3304         );
   3305         let ephemeral = signed_public_event(7, 20_001, Vec::new(), "ephemeral");
   3306 
   3307         assert!(matches!(
   3308             relay.handle_event(invalid.clone()).expect("invalid"),
   3309             RelayMessage::Ok {
   3310                 event_id,
   3311                 accepted: false,
   3312                 message
   3313             } if event_id == *invalid.id()
   3314                 && message.starts_with("invalid:")
   3315         ));
   3316         assert_eq!(count_kind(&relay, 1), 0);
   3317 
   3318         assert_accepted(relay.handle_event(valid.clone()).expect("valid"), &valid);
   3319         assert_eq!(
   3320             relay.handle_event(valid.clone()).expect("duplicate"),
   3321             RelayMessage::Ok {
   3322                 event_id: valid.id().clone(),
   3323                 accepted: true,
   3324                 message: "duplicate: already have this event".to_owned()
   3325             }
   3326         );
   3327         assert_eq!(count_kind(&relay, 1), 1);
   3328 
   3329         assert_accepted(
   3330             relay.handle_event(ephemeral.clone()).expect("ephemeral"),
   3331             &ephemeral,
   3332         );
   3333         assert_eq!(count_kind(&relay, 20_001), 0);
   3334     }
   3335 
   3336     #[test]
   3337     fn base_relay_pocket_event_path_preserves_event_admission_behavior() {
   3338         let relay = test_relay("base-relay-pocket-event-store-path", 8);
   3339         let tags = PocketOwnedTags::empty();
   3340         let protected_tags = PocketOwnedTags::new(&[["-"]]).expect("protected tags");
   3341         let valid_pocket = signed_pocket_event(7, 1, &tags, b"valid");
   3342         let signature_source = signed_pocket_event(8, 1, &tags, b"valid");
   3343         let invalid_pocket = PocketOwnedEvent::new(
   3344             valid_pocket.id(),
   3345             valid_pocket.kind(),
   3346             valid_pocket.pubkey(),
   3347             signature_source.sig(),
   3348             valid_pocket.tags().expect("tags"),
   3349             valid_pocket.created_at(),
   3350             valid_pocket.content(),
   3351         )
   3352         .expect("invalid pocket");
   3353         let ephemeral_pocket = signed_pocket_event(7, 20_001, &tags, b"ephemeral");
   3354         let protected_pocket = signed_pocket_event(7, 1, &protected_tags, b"protected");
   3355 
   3356         assert!(
   3357             rejected_message(relay.handle_pocket_event(&invalid_pocket).expect("invalid"))
   3358                 .starts_with("invalid:")
   3359         );
   3360         assert_eq!(count_kind(&relay, 1), 0);
   3361 
   3362         assert_pocket_accepted(
   3363             relay
   3364                 .handle_pocket_event(&valid_pocket)
   3365                 .expect("valid pocket"),
   3366             &valid_pocket,
   3367         );
   3368         assert_eq!(
   3369             relay.handle_pocket_event(&valid_pocket).expect("duplicate"),
   3370             RelayMessage::Ok {
   3371                 event_id: pocket_event_id(&valid_pocket),
   3372                 accepted: true,
   3373                 message: "duplicate: already have this event".to_owned()
   3374             }
   3375         );
   3376         assert_eq!(count_kind(&relay, 1), 1);
   3377 
   3378         assert_pocket_accepted(
   3379             relay
   3380                 .handle_pocket_event(&ephemeral_pocket)
   3381                 .expect("ephemeral"),
   3382             &ephemeral_pocket,
   3383         );
   3384         assert_eq!(count_kind(&relay, 20_001), 0);
   3385 
   3386         assert_eq!(
   3387             rejected_message(
   3388                 relay
   3389                     .handle_pocket_event(&protected_pocket)
   3390                     .expect("protected")
   3391             ),
   3392             "auth-required: protected event requires authenticated event author"
   3393         );
   3394         assert_pocket_accepted(
   3395             relay
   3396                 .handle_pocket_event_with_auth(&protected_pocket, &authenticated_state(7))
   3397                 .expect("protected auth"),
   3398             &protected_pocket,
   3399         );
   3400     }
   3401 
   3402     #[test]
   3403     fn group_write_source_uses_atomic_service_boundary() {
   3404         let core_source = include_str!("core.rs");
   3405         let group_source = include_str!("../groups.rs");
   3406 
   3407         assert!(core_source.contains("groups.store_group_pocket_event"));
   3408         assert!(!core_source.contains(concat!("groups.", "check_event")));
   3409         assert!(!core_source.contains(concat!("groups.", "after_source_event_stored")));
   3410         assert!(!core_source.contains(concat!(
   3411             "let tangle_event = ",
   3412             "pocket_event_to_tangle(event)?;"
   3413         )));
   3414         assert!(!group_source.contains("pub(crate) fn check_event("));
   3415         assert!(!group_source.contains("pub(crate) fn after_source_event_stored("));
   3416     }
   3417 
   3418     #[test]
   3419     fn base_relay_event_path_preserves_chorus_parity() {
   3420         let owner = signer(7).public_key().clone();
   3421         let relay = test_relay_with_groups(
   3422             "base-relay-event-chorus-parity",
   3423             8,
   3424             &enabled_groups_for_owner(&owner),
   3425         );
   3426         let valid = signed_public_event(7, 1, Vec::new(), "valid");
   3427         let signature_source = signed_public_event(8, 1, Vec::new(), "signature source");
   3428         let invalid = Event::new(
   3429             valid.id().clone(),
   3430             valid.unsigned().clone(),
   3431             signature_source.sig().clone(),
   3432         );
   3433         let ephemeral = signed_public_event(7, 20_001, Vec::new(), "ephemeral");
   3434         let protected = signed_public_event(
   3435             7,
   3436             1,
   3437             vec![Tag::from_parts("-", &[]).expect("protected")],
   3438             "protected",
   3439         );
   3440         let group_create = signed_group_create_event(7, "ParityFarm");
   3441         let empty_auth = BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth");
   3442 
   3443         assert!(
   3444             rejected_message(relay.handle_event(invalid.clone()).expect("invalid"))
   3445                 .starts_with("invalid:")
   3446         );
   3447         assert_eq!(count_kind(&relay, 1), 0);
   3448 
   3449         assert_accepted(relay.handle_event(valid.clone()).expect("valid"), &valid);
   3450         assert_eq!(count_kind(&relay, 1), 1);
   3451         assert_eq!(
   3452             relay.handle_event(valid.clone()).expect("duplicate"),
   3453             RelayMessage::Ok {
   3454                 event_id: valid.id().clone(),
   3455                 accepted: true,
   3456                 message: "duplicate: already have this event".to_owned()
   3457             }
   3458         );
   3459         assert_eq!(count_kind(&relay, 1), 1);
   3460 
   3461         assert_accepted(
   3462             relay.handle_event(ephemeral.clone()).expect("ephemeral"),
   3463             &ephemeral,
   3464         );
   3465         assert_eq!(count_kind(&relay, 20_001), 0);
   3466 
   3467         assert_eq!(
   3468             rejected_message(relay.handle_event(protected.clone()).expect("protected")),
   3469             "auth-required: protected event requires authenticated event author"
   3470         );
   3471         assert_eq!(
   3472             rejected_message(
   3473                 relay
   3474                     .handle_event_with_auth(group_create.clone(), &empty_auth)
   3475                     .expect("group unauth")
   3476             ),
   3477             "auth-required: group event author must authenticate with AUTH"
   3478         );
   3479         assert_eq!(count_kind(&relay, KIND_GROUP_CREATE_GROUP), 0);
   3480 
   3481         assert_accepted(
   3482             relay
   3483                 .handle_event_with_auth(group_create.clone(), &authenticated_state(7))
   3484                 .expect("group auth"),
   3485             &group_create,
   3486         );
   3487         assert_eq!(count_kind(&relay, KIND_GROUP_CREATE_GROUP), 1);
   3488         assert!(
   3489             relay
   3490                 .group_projection()
   3491                 .expect("projection")
   3492                 .group(&GroupId::new("ParityFarm").expect("group"))
   3493                 .is_some()
   3494         );
   3495     }
   3496 
   3497     #[test]
   3498     fn base_relay_enforces_nip70_protected_event_author_auth() {
   3499         let relay = test_relay("base-relay-nip70-protected", 8);
   3500         let protected = signed_public_event(
   3501             7,
   3502             1,
   3503             vec![Tag::from_parts("-", &[]).expect("protected")],
   3504             "protected",
   3505         );
   3506 
   3507         assert_eq!(
   3508             rejected_message(relay.handle_event(protected.clone()).expect("unauth")),
   3509             "auth-required: protected event requires authenticated event author"
   3510         );
   3511         assert_eq!(count_kind(&relay, 1), 0);
   3512         assert_eq!(
   3513             rejected_message(
   3514                 relay
   3515                     .handle_event_with_auth(protected.clone(), &authenticated_state(8))
   3516                     .expect("wrong auth")
   3517             ),
   3518             "auth-required: protected event requires authenticated event author"
   3519         );
   3520         assert_eq!(count_kind(&relay, 1), 0);
   3521         assert_accepted(
   3522             relay
   3523                 .handle_event_with_auth(protected.clone(), &authenticated_state(7))
   3524                 .expect("author auth"),
   3525             &protected,
   3526         );
   3527         assert_eq!(count_kind(&relay, 1), 1);
   3528     }
   3529 
   3530     #[test]
   3531     fn base_relay_rejects_group_marked_events_before_group_service() {
   3532         let relay = test_relay("base-relay-group-reject", 4);
   3533         let event = signed_public_event(
   3534             7,
   3535             1,
   3536             vec![Tag::from_parts("h", &["public-group"]).expect("group")],
   3537             "hello",
   3538         );
   3539 
   3540         assert_eq!(
   3541             relay.handle_event(event.clone()).expect("event"),
   3542             RelayMessage::Ok {
   3543                 event_id: event.id().clone(),
   3544                 accepted: false,
   3545                 message: "blocked: NIP-29 group events are not accepted before group service"
   3546                     .to_owned()
   3547             }
   3548         );
   3549     }
   3550 
   3551     #[test]
   3552     fn base_relay_rejects_client_submitted_relay_generated_group_state() {
   3553         let relay = test_relay("base-relay-generated-group-reject", 4);
   3554         for kind in NIP29_RELAY_GENERATED_KIND_VALUES {
   3555             let event = signed_public_event(
   3556                 7,
   3557                 kind.into(),
   3558                 vec![Tag::from_parts("d", &["public-group"]).expect("group")],
   3559                 "",
   3560             );
   3561 
   3562             assert_eq!(
   3563                 relay.handle_event(event.clone()).expect("event"),
   3564                 RelayMessage::Ok {
   3565                     event_id: event.id().clone(),
   3566                     accepted: false,
   3567                     message:
   3568                         "blocked: relay-generated group state events cannot be submitted by clients"
   3569                             .to_owned()
   3570                 }
   3571             );
   3572         }
   3573     }
   3574 
   3575     #[test]
   3576     fn base_relay_initializes_group_service_from_config() {
   3577         let owner = signer(7).public_key().clone();
   3578         let relay = test_relay_with_groups(
   3579             "base-relay-groups-enabled",
   3580             4,
   3581             &enabled_groups_for_owner(&owner),
   3582         );
   3583         let disabled = test_relay_with_groups("base-relay-groups-disabled", 4, &disabled_groups());
   3584 
   3585         assert!(relay.groups_enabled());
   3586         assert_eq!(
   3587             relay
   3588                 .readiness_state()
   3589                 .response()
   3590                 .checks
   3591                 .group_outbox_replay,
   3592             "ready"
   3593         );
   3594         assert!(
   3595             relay
   3596                 .group_projection()
   3597                 .expect("projection")
   3598                 .groups()
   3599                 .is_empty()
   3600         );
   3601         assert!(!disabled.groups_enabled());
   3602         assert_eq!(
   3603             disabled
   3604                 .readiness_state()
   3605                 .response()
   3606                 .checks
   3607                 .group_outbox_replay,
   3608             "ready"
   3609         );
   3610         assert!(disabled.group_projection().is_none());
   3611     }
   3612 
   3613     #[test]
   3614     fn group_event_write_requires_auth_before_storage() {
   3615         let owner = signer(7).public_key().clone();
   3616         let relay = test_relay_with_groups(
   3617             "base-relay-group-auth-required",
   3618             4,
   3619             &enabled_groups_for_owner(&owner),
   3620         );
   3621         let auth = BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth");
   3622         let event = signed_group_create_event(7, "Farm");
   3623 
   3624         assert_eq!(
   3625             relay
   3626                 .handle_event_with_auth(event.clone(), &auth)
   3627                 .expect("event"),
   3628             RelayMessage::Ok {
   3629                 event_id: event.id().clone(),
   3630                 accepted: false,
   3631                 message: "auth-required: group event author must authenticate with AUTH".to_owned()
   3632             }
   3633         );
   3634         assert!(
   3635             relay
   3636                 .group_projection()
   3637                 .expect("projection")
   3638                 .group(&GroupId::new("Farm").expect("group"))
   3639                 .is_none()
   3640         );
   3641         assert_eq!(count_kind(&relay, KIND_GROUP_CREATE_GROUP), 0);
   3642     }
   3643 
   3644     #[test]
   3645     fn group_create_updates_projection_and_stores_generated_snapshots() {
   3646         let owner = signer(7).public_key().clone();
   3647         let relay = test_relay_with_groups(
   3648             "base-relay-group-create",
   3649             4,
   3650             &enabled_groups_for_owner(&owner),
   3651         );
   3652         let auth = authenticated_state(7);
   3653         let event = signed_group_create_event(7, "Farm");
   3654 
   3655         assert_eq!(
   3656             relay
   3657                 .handle_event_with_auth(event.clone(), &auth)
   3658                 .expect("event"),
   3659             RelayMessage::Ok {
   3660                 event_id: event.id().clone(),
   3661                 accepted: true,
   3662                 message: String::new()
   3663             }
   3664         );
   3665 
   3666         let group_id = GroupId::new("Farm").expect("group");
   3667         assert!(
   3668             relay
   3669                 .group_projection()
   3670                 .expect("projection")
   3671                 .group(&group_id)
   3672                 .is_some()
   3673         );
   3674         assert_eq!(count_kind(&relay, KIND_GROUP_CREATE_GROUP), 1);
   3675         assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1);
   3676         assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1);
   3677     }
   3678 
   3679     #[test]
   3680     fn group_join_materializes_relay_membership_event() {
   3681         let owner = signer(7).public_key().clone();
   3682         let joiner = signer(8).public_key().clone();
   3683         let relay = test_relay_with_groups(
   3684             "base-relay-group-join",
   3685             4,
   3686             &enabled_groups_for_owner_with_public_join(&owner),
   3687         );
   3688         let create = signed_group_create_event(7, "Farm");
   3689         assert_accepted(
   3690             relay
   3691                 .handle_event_with_auth(create.clone(), &authenticated_state(7))
   3692                 .expect("create"),
   3693             &create,
   3694         );
   3695         let join = signed_event_at(
   3696             8,
   3697             KIND_GROUP_JOIN_REQUEST.into(),
   3698             vec![Tag::from_parts("h", &["Farm"]).expect("h")],
   3699             "",
   3700             1_714_124_434,
   3701         );
   3702 
   3703         assert_eq!(
   3704             relay
   3705                 .handle_event_with_auth(join.clone(), &authenticated_state(8))
   3706                 .expect("join"),
   3707             RelayMessage::Ok {
   3708                 event_id: join.id().clone(),
   3709                 accepted: true,
   3710                 message: String::new()
   3711             }
   3712         );
   3713 
   3714         assert_eq!(count_kind(&relay, KIND_GROUP_PUT_USER), 1);
   3715         assert_eq!(
   3716             relay
   3717                 .group_projection()
   3718                 .expect("projection")
   3719                 .member(&GroupId::new("Farm").expect("group"), &joiner)
   3720                 .expect("member")
   3721                 .status(),
   3722             MemberStatus::Member
   3723         );
   3724     }
   3725 
   3726     #[test]
   3727     fn group_join_requires_public_join_policy() {
   3728         let owner = signer(7).public_key().clone();
   3729         let relay = test_relay_with_groups(
   3730             "base-relay-group-join-default-deny",
   3731             4,
   3732             &enabled_groups_for_owner(&owner),
   3733         );
   3734         let create = signed_group_create_event(7, "Farm");
   3735         relay
   3736             .handle_event_with_auth(create, &authenticated_state(7))
   3737             .expect("create");
   3738         let join = signed_event_at(
   3739             8,
   3740             KIND_GROUP_JOIN_REQUEST.into(),
   3741             vec![Tag::from_parts("h", &["Farm"]).expect("h")],
   3742             "",
   3743             1_714_124_434,
   3744         );
   3745 
   3746         assert_eq!(
   3747             rejected_message(
   3748                 relay
   3749                     .handle_event_with_auth(join, &authenticated_state(8))
   3750                     .expect("join")
   3751             ),
   3752             "restricted: group is unavailable"
   3753         );
   3754         assert_eq!(count_kind(&relay, KIND_GROUP_PUT_USER), 0);
   3755     }
   3756 
   3757     #[test]
   3758     fn group_metadata_edit_replaces_generated_metadata_snapshot() {
   3759         let owner = signer(7).public_key().clone();
   3760         let mut relay = test_relay_with_groups(
   3761             "base-relay-group-metadata-edit",
   3762             4,
   3763             &enabled_groups_for_owner(&owner),
   3764         );
   3765         let auth = authenticated_state(7);
   3766         let create = signed_group_create_event(7, "Farm");
   3767         assert_accepted(
   3768             relay
   3769                 .handle_event_with_auth(create.clone(), &auth)
   3770                 .expect("create"),
   3771             &create,
   3772         );
   3773         let edit = signed_event_at(
   3774             7,
   3775             KIND_GROUP_EDIT_METADATA.into(),
   3776             vec![h("Farm"), name("Market")],
   3777             "",
   3778             1_714_124_436,
   3779         );
   3780         assert_accepted(
   3781             relay
   3782                 .handle_event_with_auth(edit.clone(), &auth)
   3783                 .expect("edit"),
   3784             &edit,
   3785         );
   3786 
   3787         let group_id = GroupId::new("Farm").expect("group");
   3788         {
   3789             let projection = relay.group_projection().expect("projection");
   3790             let group = projection.group(&group_id).expect("group");
   3791             assert_eq!(group.metadata().name(), Some("Market"));
   3792         }
   3793         let metadata = query_filter(
   3794             &mut relay,
   3795             "metadata-edit",
   3796             filter_group_tag(KIND_GROUP_METADATA, "d", "Farm"),
   3797         );
   3798         assert_eq!(metadata.len(), 1);
   3799         assert!(has_tag(&metadata[0], "d", &["Farm"]));
   3800         assert!(has_tag(&metadata[0], "name", &["Market"]));
   3801         assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1);
   3802     }
   3803 
   3804     #[test]
   3805     fn group_member_moderation_join_leave_and_snapshots_flow() {
   3806         let owner = signer(7).public_key().clone();
   3807         let member = signer(8).public_key().clone();
   3808         let target = signer(9).public_key().clone();
   3809         let relay = test_relay_with_groups(
   3810             "base-relay-group-member-flow",
   3811             4,
   3812             &enabled_groups_for_owner_with_public_join(&owner),
   3813         );
   3814         let owner_auth = authenticated_state(7);
   3815         let member_auth = authenticated_state(8);
   3816         let target_auth = authenticated_state(9);
   3817         relay
   3818             .handle_event_with_auth(signed_group_create_event(7, "Farm"), &owner_auth)
   3819             .expect("create");
   3820         let rejected_add = signed_event_at(
   3821             9,
   3822             KIND_GROUP_PUT_USER.into(),
   3823             vec![h("Farm"), p(&target)],
   3824             "",
   3825             1_714_124_434,
   3826         );
   3827         assert_eq!(
   3828             rejected_message(
   3829                 relay
   3830                     .handle_event_with_auth(rejected_add.clone(), &target_auth)
   3831                     .expect("rejected add")
   3832             ),
   3833             "restricted: missing group capability manage_members"
   3834         );
   3835         let add = signed_event_at(
   3836             7,
   3837             KIND_GROUP_PUT_USER.into(),
   3838             vec![h("Farm"), p(&member)],
   3839             "",
   3840             1_714_124_435,
   3841         );
   3842         assert_accepted(
   3843             relay
   3844                 .handle_event_with_auth(add.clone(), &owner_auth)
   3845                 .expect("add"),
   3846             &add,
   3847         );
   3848         assert_member_status(&relay, "Farm", &member, MemberStatus::Member);
   3849         assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1);
   3850 
   3851         let remove = signed_event_at(
   3852             7,
   3853             KIND_GROUP_REMOVE_USER.into(),
   3854             vec![h("Farm"), p(&member)],
   3855             "",
   3856             1_714_124_436,
   3857         );
   3858         assert_accepted(
   3859             relay
   3860                 .handle_event_with_auth(remove.clone(), &owner_auth)
   3861                 .expect("remove"),
   3862             &remove,
   3863         );
   3864         assert_member_status(&relay, "Farm", &member, MemberStatus::Removed);
   3865         assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1);
   3866 
   3867         let join = signed_event_at(
   3868             8,
   3869             KIND_GROUP_JOIN_REQUEST.into(),
   3870             vec![h("Farm")],
   3871             "",
   3872             1_714_124_437,
   3873         );
   3874         assert_accepted(
   3875             relay
   3876                 .handle_event_with_auth(join.clone(), &member_auth)
   3877                 .expect("join"),
   3878             &join,
   3879         );
   3880         assert_member_status(&relay, "Farm", &member, MemberStatus::Member);
   3881         let duplicate_join = signed_event_at(
   3882             8,
   3883             KIND_GROUP_JOIN_REQUEST.into(),
   3884             vec![h("Farm")],
   3885             "",
   3886             1_714_124_438,
   3887         );
   3888         assert_eq!(
   3889             rejected_message(
   3890                 relay
   3891                     .handle_event_with_auth(duplicate_join, &member_auth)
   3892                     .expect("duplicate join")
   3893             ),
   3894             "duplicate: group member already exists"
   3895         );
   3896 
   3897         let leave = signed_event_at(
   3898             8,
   3899             KIND_GROUP_LEAVE_REQUEST.into(),
   3900             vec![h("Farm")],
   3901             "",
   3902             1_714_124_439,
   3903         );
   3904         assert_accepted(
   3905             relay
   3906                 .handle_event_with_auth(leave.clone(), &member_auth)
   3907                 .expect("leave"),
   3908             &leave,
   3909         );
   3910         assert_member_status(&relay, "Farm", &member, MemberStatus::Removed);
   3911         assert_eq!(count_kind(&relay, KIND_GROUP_REMOVE_USER), 2);
   3912         let duplicate_leave = signed_event_at(
   3913             8,
   3914             KIND_GROUP_LEAVE_REQUEST.into(),
   3915             vec![h("Farm")],
   3916             "",
   3917             1_714_124_440,
   3918         );
   3919         assert_eq!(
   3920             rejected_message(
   3921                 relay
   3922                     .handle_event_with_auth(duplicate_leave, &member_auth)
   3923                     .expect("duplicate leave")
   3924             ),
   3925             "duplicate: group member does not exist"
   3926         );
   3927     }
   3928 
   3929     #[test]
   3930     fn group_delete_event_moderation_hides_target_and_validates_group() {
   3931         let owner = signer(7).public_key().clone();
   3932         let outsider_auth = authenticated_state(8);
   3933         let owner_auth = authenticated_state(7);
   3934         let relay = test_relay_with_groups(
   3935             "base-relay-group-delete-event",
   3936             4,
   3937             &enabled_groups_for_owner(&owner),
   3938         );
   3939         relay
   3940             .handle_event_with_auth(signed_group_create_event(7, "Farm"), &owner_auth)
   3941             .expect("create farm");
   3942         relay
   3943             .handle_event_with_auth(signed_group_create_event(7, "Other"), &owner_auth)
   3944             .expect("create other");
   3945         let target = signed_event_at(7, 1, vec![h("Farm")], "harvest", 1_714_124_434);
   3946         let other = signed_event_at(7, 1, vec![h("Other")], "other", 1_714_124_435);
   3947         relay
   3948             .handle_event_with_auth(target.clone(), &owner_auth)
   3949             .expect("target");
   3950         relay
   3951             .handle_event_with_auth(other.clone(), &owner_auth)
   3952             .expect("other");
   3953 
   3954         let wrong_group = signed_event_at(
   3955             7,
   3956             KIND_GROUP_DELETE_EVENT.into(),
   3957             vec![h("Farm"), e(other.id())],
   3958             "",
   3959             1_714_124_436,
   3960         );
   3961         assert_eq!(
   3962             rejected_message(
   3963                 relay
   3964                     .handle_event_with_auth(wrong_group, &owner_auth)
   3965                     .expect("wrong group")
   3966             ),
   3967             "invalid: delete target event is not in group"
   3968         );
   3969         let unauthorized = signed_event_at(
   3970             8,
   3971             KIND_GROUP_DELETE_EVENT.into(),
   3972             vec![h("Farm"), e(target.id())],
   3973             "",
   3974             1_714_124_437,
   3975         );
   3976         assert_eq!(
   3977             rejected_message(
   3978                 relay
   3979                     .handle_event_with_auth(unauthorized, &outsider_auth)
   3980                     .expect("unauthorized")
   3981             ),
   3982             "restricted: missing group capability delete_events"
   3983         );
   3984         assert_eq!(
   3985             count_filter(
   3986                 &relay,
   3987                 "target-before-delete",
   3988                 filter_group_tag(1, "h", "Farm")
   3989             ),
   3990             1
   3991         );
   3992 
   3993         let delete = signed_event_at(
   3994             7,
   3995             KIND_GROUP_DELETE_EVENT.into(),
   3996             vec![h("Farm"), e(target.id())],
   3997             "",
   3998             1_714_124_438,
   3999         );
   4000         assert_accepted(
   4001             relay
   4002                 .handle_event_with_auth(delete.clone(), &owner_auth)
   4003                 .expect("delete"),
   4004             &delete,
   4005         );
   4006 
   4007         assert_eq!(
   4008             count_filter(
   4009                 &relay,
   4010                 "target-after-delete",
   4011                 filter_group_tag(1, "h", "Farm")
   4012             ),
   4013             0
   4014         );
   4015         assert_eq!(
   4016             count_filter(
   4017                 &relay,
   4018                 "delete-event-marker",
   4019                 filter_group_tag(KIND_GROUP_DELETE_EVENT, "h", "Farm")
   4020             ),
   4021             1
   4022         );
   4023     }
   4024 
   4025     #[test]
   4026     fn group_delete_tombstone_hides_events_and_rejects_future_writes() {
   4027         let owner = signer(7).public_key().clone();
   4028         let auth = authenticated_state(7);
   4029         let relay = test_relay_with_groups(
   4030             "base-relay-group-delete-tombstone",
   4031             4,
   4032             &enabled_groups_for_owner(&owner),
   4033         );
   4034         relay
   4035             .handle_event_with_auth(signed_group_create_event(7, "Farm"), &auth)
   4036             .expect("create");
   4037         let normal = signed_event_at(7, 1, vec![h("Farm")], "harvest", 1_714_124_434);
   4038         relay.handle_event_with_auth(normal, &auth).expect("normal");
   4039         let delete_group = signed_event_at(
   4040             7,
   4041             KIND_GROUP_DELETE_GROUP.into(),
   4042             vec![h("Farm")],
   4043             "",
   4044             1_714_124_435,
   4045         );
   4046         assert_accepted(
   4047             relay
   4048                 .handle_event_with_auth(delete_group.clone(), &auth)
   4049                 .expect("delete group"),
   4050             &delete_group,
   4051         );
   4052 
   4053         let future = signed_event_at(7, 1, vec![h("Farm")], "future", 1_714_124_436);
   4054         assert_eq!(
   4055             rejected_message(relay.handle_event_with_auth(future, &auth).expect("future")),
   4056             "blocked: group is deleted"
   4057         );
   4058         assert_eq!(
   4059             count_filter(
   4060                 &relay,
   4061                 "deleted-group-normal",
   4062                 filter_group_tag(1, "h", "Farm")
   4063             ),
   4064             0
   4065         );
   4066         assert_eq!(
   4067             count_filter(
   4068                 &relay,
   4069                 "deleted-group-marker",
   4070                 filter_group_tag(KIND_GROUP_DELETE_GROUP, "h", "Farm")
   4071             ),
   4072             1
   4073         );
   4074     }
   4075 
   4076     #[test]
   4077     fn strict_closed_restricted_hidden_and_disabled_invite_flows() {
   4078         let owner = signer(7).public_key().clone();
   4079         let outsider_auth = authenticated_state(8);
   4080         let owner_auth = authenticated_state(7);
   4081         let relay = test_relay_with_groups(
   4082             "base-relay-group-strict-policy-flow",
   4083             4,
   4084             &enabled_groups_for_owner(&owner),
   4085         );
   4086         relay
   4087             .handle_event_with_auth(
   4088                 signed_group_create_event_with_tags(7, "Restricted", vec![restricted()], 1),
   4089                 &owner_auth,
   4090             )
   4091             .expect("restricted create");
   4092         let restricted_write =
   4093             signed_event_at(8, 1, vec![h("Restricted")], "restricted", 1_714_124_434);
   4094         assert_eq!(
   4095             rejected_message(
   4096                 relay
   4097                     .handle_event_with_auth(restricted_write, &outsider_auth)
   4098                     .expect("restricted write")
   4099             ),
   4100             "restricted: group is unavailable"
   4101         );
   4102 
   4103         relay
   4104             .handle_event_with_auth(
   4105                 signed_group_create_event_with_tags(7, "Closed", vec![closed()], 2),
   4106                 &owner_auth,
   4107             )
   4108             .expect("closed create");
   4109         let closed_join = signed_event_at(
   4110             8,
   4111             KIND_GROUP_JOIN_REQUEST.into(),
   4112             vec![h("Closed")],
   4113             "",
   4114             1_714_124_435,
   4115         );
   4116         assert_eq!(
   4117             rejected_message(
   4118                 relay
   4119                     .handle_event_with_auth(closed_join, &outsider_auth)
   4120                     .expect("closed join")
   4121             ),
   4122             "restricted: group is unavailable"
   4123         );
   4124         let closed_normal = signed_event_at(8, 1, vec![h("Closed")], "open", 1_714_124_436);
   4125         assert_accepted(
   4126             relay
   4127                 .handle_event_with_auth(closed_normal.clone(), &outsider_auth)
   4128                 .expect("closed normal"),
   4129             &closed_normal,
   4130         );
   4131 
   4132         relay
   4133             .handle_event_with_auth(
   4134                 signed_group_create_event_with_tags(7, "Hidden", vec![hidden()], 3),
   4135                 &owner_auth,
   4136             )
   4137             .expect("hidden create");
   4138         assert_eq!(
   4139             count_filter(
   4140                 &relay,
   4141                 "hidden-unauth",
   4142                 filter_group_tag(KIND_GROUP_METADATA, "d", "Hidden")
   4143             ),
   4144             0
   4145         );
   4146         assert_eq!(
   4147             count_filter_with_auth(
   4148                 &relay,
   4149                 "hidden-owner",
   4150                 filter_group_tag(KIND_GROUP_METADATA, "d", "Hidden"),
   4151                 &owner_auth
   4152             ),
   4153             1
   4154         );
   4155 
   4156         let invite = signed_event_at(
   4157             7,
   4158             KIND_GROUP_CREATE_INVITE.into(),
   4159             vec![h("Closed")],
   4160             "",
   4161             1_714_124_437,
   4162         );
   4163         assert_eq!(
   4164             rejected_message(
   4165                 relay
   4166                     .handle_event_with_auth(invite, &owner_auth)
   4167                     .expect("invite")
   4168             ),
   4169             "restricted: invites not enabled"
   4170         );
   4171     }
   4172 
   4173     #[test]
   4174     fn private_group_req_and_count_use_reader_auth() {
   4175         let owner = signer(7).public_key().clone();
   4176         let auth = authenticated_state(7);
   4177         let mut relay = test_relay_with_groups(
   4178             "base-relay-private-read",
   4179             4,
   4180             &enabled_groups_for_owner(&owner),
   4181         );
   4182         relay
   4183             .handle_event_with_auth(signed_private_group_create_event(7, "Farm"), &auth)
   4184             .expect("create");
   4185         let private_event = signed_event_at(
   4186             7,
   4187             1,
   4188             vec![Tag::from_parts("h", &["Farm"]).expect("h")],
   4189             "private harvest",
   4190             1_714_124_435,
   4191         );
   4192         relay
   4193             .handle_event_with_auth(private_event.clone(), &auth)
   4194             .expect("private event");
   4195 
   4196         let unauth_sub = SubscriptionId::new("private-unauth").expect("sub");
   4197         let auth_sub = SubscriptionId::new("private-auth").expect("sub");
   4198         assert_eq!(
   4199             relay
   4200                 .handle_protocol_req_for_test(unauth_sub.clone(), vec![filter_kind(1)])
   4201                 .expect("unauth req"),
   4202             vec![RelayMessage::Closed {
   4203                 subscription_id: unauth_sub,
   4204                 message: "auth-required: authentication required to read group events".to_owned()
   4205             }]
   4206         );
   4207         assert_eq!(relay.active_subscription_count(), 0);
   4208         assert!(matches!(
   4209             relay
   4210                 .handle_protocol_req_with_auth_for_test(auth_sub.clone(), vec![filter_kind(1)], &auth)
   4211                 .expect("auth req")
   4212                 .as_slice(),
   4213             [RelayMessage::Event { subscription_id, event }, RelayMessage::Eose(eose)]
   4214                 if subscription_id == &auth_sub && event.id() == private_event.id() && eose == &auth_sub
   4215         ));
   4216         assert_eq!(count_kind(&relay, 1), 0);
   4217         assert_eq!(count_kind_with_auth(&relay, 1, &auth), 1);
   4218         assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1);
   4219         assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1);
   4220         assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 0);
   4221         assert_eq!(count_kind_with_auth(&relay, KIND_GROUP_METADATA, &auth), 1);
   4222         assert_eq!(count_kind_with_auth(&relay, KIND_GROUP_ADMINS, &auth), 1);
   4223     }
   4224 
   4225     #[test]
   4226     fn private_and_hidden_group_offset_lookup_uses_reader_auth() {
   4227         let owner = signer(7).public_key().clone();
   4228         let owner_auth = authenticated_state(7);
   4229         let unauth = BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state");
   4230         let relay = test_relay_with_groups(
   4231             "base-relay-private-offset-read",
   4232             4,
   4233             &enabled_groups_for_owner(&owner),
   4234         );
   4235         relay
   4236             .handle_event_with_auth(signed_private_group_create_event(7, "Farm"), &owner_auth)
   4237             .expect("create");
   4238         let private_event = signed_event_at(
   4239             7,
   4240             1,
   4241             vec![Tag::from_parts("h", &["Farm"]).expect("h")],
   4242             "private harvest",
   4243             1_714_124_435,
   4244         );
   4245         let pocket = tangle_event_to_pocket(&private_event).expect("pocket");
   4246         let offset = StoreOffset::new(relay.store.store_event(&pocket).expect("store"));
   4247 
   4248         assert_eq!(
   4249             relay
   4250                 .event_by_offset_with_auth(offset, &unauth)
   4251                 .expect("unauth offset"),
   4252             None
   4253         );
   4254         let visible = relay
   4255             .event_by_offset_with_auth(offset, &owner_auth)
   4256             .expect("owner offset")
   4257             .expect("visible");
   4258         let visible: &PocketEvent = &visible;
   4259         assert_eq!(visible.id().as_hex_string(), private_event.id().as_str());
   4260 
   4261         relay
   4262             .handle_event_with_auth(
   4263                 signed_group_create_event_with_tags(7, "HiddenFarm", vec![hidden()], 1_714_124_436),
   4264                 &owner_auth,
   4265             )
   4266             .expect("hidden create");
   4267         let hidden_event = signed_event_at(
   4268             7,
   4269             1,
   4270             vec![Tag::from_parts("h", &["HiddenFarm"]).expect("h")],
   4271             "hidden harvest",
   4272             1_714_124_437,
   4273         );
   4274         let pocket = tangle_event_to_pocket(&hidden_event).expect("hidden pocket");
   4275         let offset = StoreOffset::new(relay.store.store_event(&pocket).expect("store hidden"));
   4276 
   4277         assert_eq!(
   4278             relay
   4279                 .event_by_offset_with_auth(offset, &unauth)
   4280                 .expect("hidden unauth offset"),
   4281             None
   4282         );
   4283         let visible = relay
   4284             .event_by_offset_with_auth(offset, &owner_auth)
   4285             .expect("hidden owner offset")
   4286             .expect("hidden visible");
   4287         let visible: &PocketEvent = &visible;
   4288         assert_eq!(visible.id().as_hex_string(), hidden_event.id().as_str());
   4289     }
   4290 
   4291     #[test]
   4292     fn private_group_live_fanout_uses_current_auth() {
   4293         let owner = signer(7).public_key().clone();
   4294         let auth = authenticated_state(7);
   4295         let mut relay = test_relay_with_groups(
   4296             "base-relay-private-fanout",
   4297             4,
   4298             &enabled_groups_for_owner(&owner),
   4299         );
   4300         relay
   4301             .handle_event_with_auth(signed_private_group_create_event(7, "Farm"), &auth)
   4302             .expect("create");
   4303         let subscription_id = SubscriptionId::new("fanout-current-auth").expect("sub");
   4304         relay
   4305             .handle_protocol_req_for_test(subscription_id.clone(), vec![filter_kind(1)])
   4306             .expect("sub");
   4307         let private_event = signed_event_at(
   4308             7,
   4309             1,
   4310             vec![Tag::from_parts("h", &["Farm"]).expect("h")],
   4311             "private harvest",
   4312             1_714_124_435,
   4313         );
   4314         relay
   4315             .handle_event_with_auth(private_event.clone(), &auth)
   4316             .expect("private event");
   4317 
   4318         assert!(relay.fanout_protocol_for_test(&private_event).is_empty());
   4319         assert!(matches!(
   4320             relay
   4321                 .fanout_protocol_with_group_auth_for_test(
   4322                     &private_event,
   4323                     &GroupAuthContext::new([owner])
   4324                 )
   4325                 .as_slice(),
   4326             [RelayMessage::Event {
   4327                 subscription_id: delivered,
   4328                 event
   4329             }] if delivered == &subscription_id && event.id() == private_event.id()
   4330         ));
   4331     }
   4332 
   4333     #[test]
   4334     fn live_subscription_delivery_volume_does_not_close_subscription() {
   4335         let mut relay = test_relay("base-relay-delivery-volume", 1);
   4336         let subscription_id = SubscriptionId::new("sub-volume").expect("sub");
   4337         let filter = filter_from_value(&serde_json::json!({"kinds":[1]})).expect("filter");
   4338         relay
   4339             .handle_protocol_req_for_test(subscription_id.clone(), vec![filter])
   4340             .expect("req");
   4341         let first = signed_public_event(7, 1, Vec::new(), "first");
   4342         let second = signed_public_event(7, 1, Vec::new(), "second");
   4343 
   4344         assert!(matches!(
   4345             relay.fanout_protocol_for_test(&first).as_slice(),
   4346             [RelayMessage::Event { .. }]
   4347         ));
   4348         assert!(matches!(
   4349             relay.fanout_protocol_for_test(&second).as_slice(),
   4350             [RelayMessage::Event { .. }]
   4351         ));
   4352         assert_eq!(relay.active_subscription_count(), 1);
   4353     }
   4354 
   4355     #[test]
   4356     fn base_relay_shutdown_closes_live_subscriptions_and_syncs_store() {
   4357         let config = test_store_config("base-relay-shutdown");
   4358         let mut relay =
   4359             BaseRelay::open(&config, relay_limits(4), PocketQueryConfig::default()).expect("relay");
   4360         let event = signed_public_event(7, 1, Vec::new(), "shutdown");
   4361         let subscription_id = SubscriptionId::new("sub-shutdown").expect("sub");
   4362 
   4363         assert_accepted(relay.handle_event(event.clone()).expect("event"), &event);
   4364         relay
   4365             .handle_protocol_req_for_test(subscription_id, vec![filter_kind(1)])
   4366             .expect("req");
   4367 
   4368         assert_eq!(relay.active_subscription_count(), 1);
   4369 
   4370         let report = relay.shutdown().expect("shutdown");
   4371 
   4372         assert_eq!(report.closed_subscriptions(), 1);
   4373         assert_eq!(relay.active_subscription_count(), 0);
   4374         assert!(relay.fanout_protocol_for_test(&event).is_empty());
   4375 
   4376         let reopened = BaseRelay::open(&config, relay_limits(4), PocketQueryConfig::default())
   4377             .expect("reopened");
   4378         assert_eq!(count_kind(&reopened, 1), 1);
   4379     }
   4380 
   4381     #[test]
   4382     fn base_relay_client_message_dispatch_handles_count_and_auth() {
   4383         let mut relay = test_relay("base-relay-dispatch", 4);
   4384         let mut auth =
   4385             BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state");
   4386         auth.issue_challenge("challenge-a", UnixTimestamp::new(100))
   4387             .expect("challenge");
   4388         let auth_event = signed_auth_event(7, "challenge-a", 120);
   4389         let count_id = SubscriptionId::new("count-a").expect("sub");
   4390 
   4391         assert_eq!(
   4392             relay
   4393                 .handle_client_message(
   4394                     ClientMessage::Auth(auth_event.clone()),
   4395                     &mut auth,
   4396                     UnixTimestamp::new(120)
   4397                 )
   4398                 .expect("auth"),
   4399             vec![RelayMessage::Ok {
   4400                 event_id: auth_event.id().clone(),
   4401                 accepted: true,
   4402                 message: String::new()
   4403             }]
   4404         );
   4405         assert_eq!(
   4406             relay
   4407                 .handle_client_message(
   4408                     ClientMessage::Count {
   4409                         subscription_id: count_id.clone(),
   4410                         filters: vec![Filter::empty()]
   4411                     },
   4412                     &mut auth,
   4413                     UnixTimestamp::new(130)
   4414                 )
   4415                 .expect("count"),
   4416             vec![RelayMessage::Count {
   4417                 subscription_id: count_id,
   4418                 count: 0,
   4419                 hll: None
   4420             }]
   4421         );
   4422     }
   4423 
   4424     #[test]
   4425     fn base_relay_enforces_event_and_filter_runtime_limits() {
   4426         let config = test_store_config("base-relay-event-filter-runtime-limits");
   4427         let mut relay =
   4428             BaseRelay::open(&config, strict_relay_limits(), PocketQueryConfig::default())
   4429                 .expect("relay");
   4430         let first = signed_public_event(7, 1, Vec::new(), "a");
   4431         let second = signed_event_at(8, 1, Vec::new(), "b", 1_714_124_434);
   4432 
   4433         assert_accepted(relay.handle_event(first.clone()).expect("first"), &first);
   4434         assert_accepted(relay.handle_event(second.clone()).expect("second"), &second);
   4435         assert_eq!(
   4436             rejected_message(
   4437                 relay
   4438                     .handle_event(signed_public_event(7, 1, Vec::new(), "abcde"))
   4439                     .expect("content")
   4440             ),
   4441             "invalid: event content length exceeds runtime max_content_length 4"
   4442         );
   4443         assert_eq!(
   4444             rejected_message(
   4445                 relay
   4446                     .handle_event(signed_public_event(
   4447                         7,
   4448                         1,
   4449                         vec![
   4450                             Tag::from_parts("t", &["one"]).expect("tag"),
   4451                             Tag::from_parts("r", &["two"]).expect("tag"),
   4452                         ],
   4453                         "",
   4454                     ))
   4455                     .expect("tags")
   4456             ),
   4457             "invalid: event tag count exceeds runtime max_event_tags 1"
   4458         );
   4459         assert_eq!(
   4460             relay
   4461                 .handle_protocol_req_for_test(
   4462                     SubscriptionId::new("a").expect("sub"),
   4463                     vec![Filter::empty()]
   4464                 )
   4465                 .expect("default limit")
   4466                 .len(),
   4467             2
   4468         );
   4469         assert!(
   4470             relay
   4471                 .handle_protocol_req_for_test(
   4472                     SubscriptionId::new("a").expect("sub"),
   4473                     vec![Filter::empty(), Filter::empty()],
   4474                 )
   4475                 .expect_err("filter count")
   4476                 .prefixed_message()
   4477                 .contains("max_filters_per_request 1")
   4478         );
   4479         assert!(
   4480             relay
   4481                 .handle_count_protocol(
   4482                     SubscriptionId::new("a").expect("sub"),
   4483                     vec![
   4484                         filter_from_value(&serde_json::json!({"#t":["one", "two"]}))
   4485                             .expect("filter"),
   4486                     ],
   4487                 )
   4488                 .expect_err("tag values")
   4489                 .prefixed_message()
   4490                 .contains("max_tag_values_per_filter 1")
   4491         );
   4492         assert!(
   4493             relay
   4494                 .handle_protocol_req_for_test(
   4495                     SubscriptionId::new("a").expect("sub"),
   4496                     vec![filter_from_value(&serde_json::json!({"limit": 3})).expect("filter")],
   4497                 )
   4498                 .expect_err("max limit")
   4499                 .prefixed_message()
   4500                 .contains("max_limit 2")
   4501         );
   4502     }
   4503 
   4504     #[test]
   4505     fn base_relay_enforces_subscription_id_and_count_limits() {
   4506         let config = test_store_config("base-relay-subscription-limits");
   4507         let mut relay =
   4508             BaseRelay::open(&config, strict_relay_limits(), PocketQueryConfig::default())
   4509                 .expect("relay");
   4510 
   4511         assert!(
   4512             relay
   4513                 .handle_protocol_req_for_test(
   4514                     SubscriptionId::new("abcde").expect("sub"),
   4515                     vec![Filter::empty()],
   4516                 )
   4517                 .expect_err("sub id length")
   4518                 .prefixed_message()
   4519                 .contains("max_subid_length 4")
   4520         );
   4521         relay
   4522             .handle_protocol_req_for_test(
   4523                 SubscriptionId::new("a").expect("sub"),
   4524                 vec![Filter::empty()],
   4525             )
   4526             .expect("first subscription");
   4527         assert!(
   4528             relay
   4529                 .handle_protocol_req_for_test(
   4530                     SubscriptionId::new("b").expect("sub"),
   4531                     vec![Filter::empty()]
   4532                 )
   4533                 .expect_err("subscription count")
   4534                 .prefixed_message()
   4535                 .contains("connection subscription limit exceeded")
   4536         );
   4537         relay
   4538             .handle_protocol_req_for_test(
   4539                 SubscriptionId::new("a").expect("sub"),
   4540                 vec![Filter::empty()],
   4541             )
   4542             .expect("replace subscription");
   4543     }
   4544 
   4545     fn test_relay(name: &str, max_pending_events: usize) -> BaseRelay {
   4546         let config = test_store_config(name);
   4547         BaseRelay::open(
   4548             &config,
   4549             relay_limits(max_pending_events),
   4550             PocketQueryConfig::default(),
   4551         )
   4552         .expect("relay")
   4553     }
   4554 
   4555     fn test_relay_with_groups(
   4556         name: &str,
   4557         max_pending_events: usize,
   4558         groups: &tangle_groups::GroupRuntimeConfig,
   4559     ) -> BaseRelay {
   4560         let config = test_store_config(name);
   4561         BaseRelay::open_with_groups(
   4562             &config,
   4563             relay_limits(max_pending_events),
   4564             groups,
   4565             PocketQueryConfig::default(),
   4566         )
   4567         .expect("relay")
   4568     }
   4569 
   4570     fn relay_limits(max_pending_events: usize) -> BaseRelayLimits {
   4571         BaseRelayLimits::new(BaseRelayLimitSettings {
   4572             max_pending_events,
   4573             max_subscription_id_length: 64,
   4574             max_subscriptions: 64,
   4575             max_filters_per_request: 10,
   4576             max_tag_values_per_filter: 100,
   4577             max_query_complexity: 610,
   4578             max_event_tags: 200,
   4579             max_content_length: 65_536,
   4580             max_limit: 500,
   4581             default_limit: 100,
   4582         })
   4583         .expect("limits")
   4584     }
   4585 
   4586     fn strict_relay_limits() -> BaseRelayLimits {
   4587         BaseRelayLimits::new(BaseRelayLimitSettings {
   4588             max_pending_events: 4,
   4589             max_subscription_id_length: 4,
   4590             max_subscriptions: 1,
   4591             max_filters_per_request: 1,
   4592             max_tag_values_per_filter: 1,
   4593             max_query_complexity: 4,
   4594             max_event_tags: 1,
   4595             max_content_length: 4,
   4596             max_limit: 2,
   4597             default_limit: 1,
   4598         })
   4599         .expect("limits")
   4600     }
   4601 
   4602     fn test_store_config(name: &str) -> PocketStoreConfig {
   4603         let root = std::env::temp_dir().join(format!("tangle-{name}-{}", std::process::id()));
   4604         let _ = std::fs::remove_dir_all(&root);
   4605         PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
   4606             .expect("config")
   4607     }
   4608 
   4609     fn enabled_groups_for_owner(owner: &PublicKeyHex) -> tangle_groups::GroupRuntimeConfig {
   4610         parse_group_runtime_config_json(&format!(
   4611             r#"{{
   4612                 "enabled": true,
   4613                 "canonical_relay_url": "wss://relay.radroots.test",
   4614                 "relay_secret": "{}",
   4615                 "owner_pubkeys": ["{}"]
   4616             }}"#,
   4617             "7".repeat(64),
   4618             owner.as_str()
   4619         ))
   4620         .expect("groups")
   4621     }
   4622 
   4623     fn enabled_groups_for_owner_with_public_join(
   4624         owner: &PublicKeyHex,
   4625     ) -> tangle_groups::GroupRuntimeConfig {
   4626         parse_group_runtime_config_json(&format!(
   4627             r#"{{
   4628                 "enabled": true,
   4629                 "canonical_relay_url": "wss://relay.radroots.test",
   4630                 "relay_secret": "{}",
   4631                 "owner_pubkeys": ["{}"],
   4632                 "policy": {{"public_join": true, "invites_enabled": false}}
   4633             }}"#,
   4634             "7".repeat(64),
   4635             owner.as_str()
   4636         ))
   4637         .expect("groups")
   4638     }
   4639 
   4640     fn disabled_groups() -> tangle_groups::GroupRuntimeConfig {
   4641         parse_group_runtime_config_json(r#"{"enabled": false}"#).expect("groups")
   4642     }
   4643 
   4644     fn signed_auth_event(secret_byte: u8, challenge: &str, created_at: u64) -> Event {
   4645         signed_tangle_event_at(
   4646             secret_byte,
   4647             22_242,
   4648             vec![
   4649                 Tag::from_parts("relay", &["wss://relay.radroots.test"]).expect("relay"),
   4650                 Tag::from_parts("challenge", &[challenge]).expect("challenge"),
   4651             ],
   4652             "",
   4653             created_at,
   4654         )
   4655     }
   4656 
   4657     fn signed_public_event(secret_byte: u8, kind: u64, tags: Vec<Tag>, content: &str) -> Event {
   4658         signed_event_at(secret_byte, kind, tags, content, 1_714_124_433)
   4659     }
   4660 
   4661     fn signed_pocket_event(
   4662         secret_byte: u8,
   4663         kind: u16,
   4664         tags: &PocketOwnedTags,
   4665         content: &[u8],
   4666     ) -> PocketOwnedEvent {
   4667         signed_pocket_event_at(secret_byte, kind, tags, content, 1_714_124_433)
   4668     }
   4669 
   4670     fn signed_pocket_event_at(
   4671         secret_byte: u8,
   4672         kind: u16,
   4673         tags: &PocketOwnedTags,
   4674         content: &[u8],
   4675         created_at: u64,
   4676     ) -> PocketOwnedEvent {
   4677         let secret = format!("{secret_byte:02x}").repeat(32);
   4678         RelaySigner::from_secret_hex(&secret)
   4679             .expect("signer")
   4680             .sign_pocket_event(
   4681                 PocketKind::from_u16(kind),
   4682                 tags,
   4683                 PocketTime::from_u64(created_at),
   4684                 content,
   4685             )
   4686             .expect("pocket event")
   4687     }
   4688 
   4689     fn signed_pocket_public_event(
   4690         secret_byte: u8,
   4691         kind: u32,
   4692         tags: Vec<Tag>,
   4693         content: &str,
   4694     ) -> PocketOwnedEvent {
   4695         signed_pocket_event_at_tags(secret_byte, kind, tags, content, 1_714_124_433)
   4696     }
   4697 
   4698     fn signed_pocket_group_create_event(secret_byte: u8, group_id: &str) -> PocketOwnedEvent {
   4699         signed_pocket_group_create_event_with_tags(secret_byte, group_id, Vec::new(), 1_714_124_433)
   4700     }
   4701 
   4702     fn signed_pocket_group_create_event_with_tags(
   4703         secret_byte: u8,
   4704         group_id: &str,
   4705         mut extra_tags: Vec<Tag>,
   4706         created_at: u64,
   4707     ) -> PocketOwnedEvent {
   4708         let mut tags = vec![h(group_id), name(group_id)];
   4709         tags.append(&mut extra_tags);
   4710         signed_pocket_event_at_tags(secret_byte, KIND_GROUP_CREATE_GROUP, tags, "", created_at)
   4711     }
   4712 
   4713     fn signed_pocket_private_group_create_event(
   4714         secret_byte: u8,
   4715         group_id: &str,
   4716     ) -> PocketOwnedEvent {
   4717         signed_pocket_event_at_tags(
   4718             secret_byte,
   4719             KIND_GROUP_CREATE_GROUP,
   4720             vec![h(group_id), name(group_id), private()],
   4721             "",
   4722             1_714_124_433,
   4723         )
   4724     }
   4725 
   4726     fn signed_pocket_event_at_tags(
   4727         secret_byte: u8,
   4728         kind: u32,
   4729         tags: Vec<Tag>,
   4730         content: &str,
   4731         created_at: u64,
   4732     ) -> PocketOwnedEvent {
   4733         let tags = pocket_tags_from_protocol(&tags);
   4734         signed_pocket_event_at(
   4735             secret_byte,
   4736             u16::try_from(kind).expect("pocket kind"),
   4737             &tags,
   4738             content.as_bytes(),
   4739             created_at,
   4740         )
   4741     }
   4742 
   4743     fn pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags {
   4744         let parts = tags
   4745             .iter()
   4746             .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>())
   4747             .collect::<Vec<_>>();
   4748         PocketOwnedTags::new(&parts).expect("pocket tags")
   4749     }
   4750 
   4751     fn signed_group_create_event(secret_byte: u8, group_id: &str) -> Event {
   4752         signed_group_create_event_with_tags(secret_byte, group_id, Vec::new(), 1_714_124_433)
   4753     }
   4754 
   4755     fn signed_group_create_event_with_tags(
   4756         secret_byte: u8,
   4757         group_id: &str,
   4758         mut extra_tags: Vec<Tag>,
   4759         created_at: u64,
   4760     ) -> Event {
   4761         let mut tags = vec![h(group_id), name(group_id)];
   4762         tags.append(&mut extra_tags);
   4763         signed_event_at(
   4764             secret_byte,
   4765             KIND_GROUP_CREATE_GROUP.into(),
   4766             tags,
   4767             "",
   4768             created_at,
   4769         )
   4770     }
   4771 
   4772     fn signed_private_group_create_event(secret_byte: u8, group_id: &str) -> Event {
   4773         signed_event_at(
   4774             secret_byte,
   4775             KIND_GROUP_CREATE_GROUP.into(),
   4776             vec![h(group_id), name(group_id), private()],
   4777             "",
   4778             1_714_124_433,
   4779         )
   4780     }
   4781 
   4782     fn signed_event_at(
   4783         secret_byte: u8,
   4784         kind: u64,
   4785         tags: Vec<Tag>,
   4786         content: &str,
   4787         created_at: u64,
   4788     ) -> Event {
   4789         let pocket = signed_pocket_event_at_tags(
   4790             secret_byte,
   4791             u32::try_from(kind).expect("kind"),
   4792             tags,
   4793             content,
   4794             created_at,
   4795         );
   4796         pocket_event_to_protocol(&pocket)
   4797     }
   4798 
   4799     fn signed_tangle_event_at(
   4800         secret_byte: u8,
   4801         kind: u64,
   4802         tags: Vec<Tag>,
   4803         content: &str,
   4804         created_at: u64,
   4805     ) -> Event {
   4806         let secret = format!("{secret_byte:02x}").repeat(32);
   4807         let signer = RelaySigner::from_secret_hex(&secret).expect("signer");
   4808         let unsigned = UnsignedEvent::new(
   4809             signer.public_key().clone(),
   4810             UnixTimestamp::new(created_at),
   4811             Kind::new(kind).expect("kind"),
   4812             tags,
   4813             content,
   4814         );
   4815         signer.sign_unsigned_event(unsigned)
   4816     }
   4817 
   4818     fn pocket_event_id(event: &PocketEvent) -> EventId {
   4819         EventId::new(&event.id().as_hex_string()).expect("event id")
   4820     }
   4821 
   4822     fn pocket_event_to_protocol(event: &PocketEvent) -> Event {
   4823         let tags = event
   4824             .tags()
   4825             .expect("tags")
   4826             .iter()
   4827             .map(|tag| {
   4828                 Tag::new(
   4829                     tag.map(|value| std::str::from_utf8(value).expect("utf8").to_owned())
   4830                         .collect::<Vec<_>>(),
   4831                 )
   4832                 .expect("tag")
   4833             })
   4834             .collect::<Vec<_>>();
   4835         Event::new(
   4836             pocket_event_id(event),
   4837             tangle_protocol::UnsignedEvent::new(
   4838                 PublicKeyHex::new(&event.pubkey().as_hex_string()).expect("pubkey"),
   4839                 UnixTimestamp::new(event.created_at().as_u64()),
   4840                 tangle_protocol::Kind::new(u64::from(event.kind().as_u16())).expect("kind"),
   4841                 tags,
   4842                 std::str::from_utf8(event.content()).expect("content"),
   4843             ),
   4844             SignatureHex::new(&event.sig().to_string()).expect("sig"),
   4845         )
   4846     }
   4847 
   4848     fn authenticated_state(secret_byte: u8) -> BaseAuthState {
   4849         let mut auth =
   4850             BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state");
   4851         auth.issue_challenge("challenge-a", UnixTimestamp::new(100))
   4852             .expect("challenge");
   4853         let event = signed_auth_event(secret_byte, "challenge-a", 120);
   4854         auth.authenticate(&event, UnixTimestamp::new(120))
   4855             .expect("authenticate");
   4856         auth
   4857     }
   4858 
   4859     fn count_kind(relay: &BaseRelay, kind: u32) -> u64 {
   4860         let subscription_id = SubscriptionId::new(&format!("count-{kind}")).expect("sub");
   4861         let filter = filter_kind(kind);
   4862         match relay
   4863             .handle_count_protocol(subscription_id, vec![filter])
   4864             .expect("count")
   4865         {
   4866             RelayMessage::Count { count, .. } => count,
   4867             _ => panic!("count response expected"),
   4868         }
   4869     }
   4870 
   4871     fn count_kind_with_auth(relay: &BaseRelay, kind: u32, auth: &BaseAuthState) -> u64 {
   4872         let subscription_id = SubscriptionId::new(&format!("count-auth-{kind}")).expect("sub");
   4873         match relay
   4874             .handle_count_with_auth_protocol(subscription_id, vec![filter_kind(kind)], auth)
   4875             .expect("count")
   4876         {
   4877             RelayMessage::Count { count, .. } => count,
   4878             _ => panic!("count response expected"),
   4879         }
   4880     }
   4881 
   4882     fn count_filter(relay: &BaseRelay, subscription_id: &str, filter: Filter) -> u64 {
   4883         match relay
   4884             .handle_count_protocol(
   4885                 SubscriptionId::new(subscription_id).expect("sub"),
   4886                 vec![filter],
   4887             )
   4888             .expect("count")
   4889         {
   4890             RelayMessage::Count { count, .. } => count,
   4891             _ => panic!("count response expected"),
   4892         }
   4893     }
   4894 
   4895     fn count_filter_with_auth(
   4896         relay: &BaseRelay,
   4897         subscription_id: &str,
   4898         filter: Filter,
   4899         auth: &BaseAuthState,
   4900     ) -> u64 {
   4901         match relay
   4902             .handle_count_with_auth_protocol(
   4903                 SubscriptionId::new(subscription_id).expect("sub"),
   4904                 vec![filter],
   4905                 auth,
   4906             )
   4907             .expect("count")
   4908         {
   4909             RelayMessage::Count { count, .. } => count,
   4910             _ => panic!("count response expected"),
   4911         }
   4912     }
   4913 
   4914     fn assert_count_without_hll(
   4915         relay: &BaseRelay,
   4916         subscription_id: &str,
   4917         value: serde_json::Value,
   4918         auth: Option<&BaseAuthState>,
   4919         expected_count: u64,
   4920     ) {
   4921         let subscription_id = SubscriptionId::new(subscription_id).expect("sub");
   4922         let filter = filter_from_value(&value).expect("filter");
   4923         let message = match auth {
   4924             Some(auth) => {
   4925                 relay.handle_count_with_auth_protocol(subscription_id.clone(), vec![filter], auth)
   4926             }
   4927             None => relay.handle_count_protocol(subscription_id.clone(), vec![filter]),
   4928         }
   4929         .expect("count");
   4930         assert_eq!(
   4931             message,
   4932             RelayMessage::Count {
   4933                 subscription_id,
   4934                 count: expected_count,
   4935                 hll: None
   4936             }
   4937         );
   4938     }
   4939 
   4940     fn query_filter(relay: &mut BaseRelay, subscription_id: &str, filter: Filter) -> Vec<Event> {
   4941         relay
   4942             .handle_protocol_req_for_test(
   4943                 SubscriptionId::new(subscription_id).expect("sub"),
   4944                 vec![filter],
   4945             )
   4946             .expect("query")
   4947             .into_iter()
   4948             .filter_map(|message| match message {
   4949                 RelayMessage::Event { event, .. } => Some(event),
   4950                 _ => None,
   4951             })
   4952             .collect()
   4953     }
   4954 
   4955     fn filter_kind(kind: u32) -> Filter {
   4956         filter_from_value(&serde_json::json!({"kinds":[kind]})).expect("filter")
   4957     }
   4958 
   4959     fn filter_group_tag(kind: u32, tag: &str, group_id: &str) -> Filter {
   4960         let mut value = serde_json::json!({"kinds":[kind]});
   4961         value
   4962             .as_object_mut()
   4963             .expect("object")
   4964             .insert(format!("#{tag}"), serde_json::json!([group_id]));
   4965         filter_from_value(&value).expect("filter")
   4966     }
   4967 
   4968     fn pocket_filter_from_value(value: serde_json::Value) -> PocketOwnedFilter {
   4969         tangle_filter_to_pocket(&filter_from_value(&value).expect("filter")).expect("pocket")
   4970     }
   4971 
   4972     fn hll_target_policy(
   4973         relay: &BaseRelay,
   4974         value: serde_json::Value,
   4975     ) -> BaseRelayCountHllTargetPolicy {
   4976         let filter = pocket_filter_from_value(value);
   4977         BaseRelay::count_hll_filter_target_policy(relay.groups.as_ref(), &filter)
   4978     }
   4979 
   4980     fn count_hll_for_target_policy_test() -> BaseRelayCountHll {
   4981         BaseRelayCountHll {
   4982             offset: Some(0),
   4983             hll: Some(PocketHll8::new()),
   4984             suppressed: false,
   4985         }
   4986     }
   4987 
   4988     fn assert_accepted(message: RelayMessage, event: &Event) {
   4989         assert_eq!(
   4990             message,
   4991             RelayMessage::Ok {
   4992                 event_id: event.id().clone(),
   4993                 accepted: true,
   4994                 message: String::new()
   4995             }
   4996         );
   4997     }
   4998 
   4999     fn assert_pocket_accepted(message: RelayMessage, event: &PocketEvent) {
   5000         assert_eq!(
   5001             message,
   5002             RelayMessage::Ok {
   5003                 event_id: pocket_event_id(event),
   5004                 accepted: true,
   5005                 message: String::new()
   5006             }
   5007         );
   5008     }
   5009 
   5010     fn rejected_message(message: RelayMessage) -> String {
   5011         match message {
   5012             RelayMessage::Ok {
   5013                 accepted: false,
   5014                 message,
   5015                 ..
   5016             } => message,
   5017             _ => panic!("rejected OK expected"),
   5018         }
   5019     }
   5020 
   5021     fn assert_member_status(
   5022         relay: &BaseRelay,
   5023         group_id: &str,
   5024         pubkey: &PublicKeyHex,
   5025         status: MemberStatus,
   5026     ) {
   5027         assert_eq!(
   5028             relay
   5029                 .group_projection()
   5030                 .expect("projection")
   5031                 .member(&GroupId::new(group_id).expect("group"), pubkey)
   5032                 .expect("member")
   5033                 .status(),
   5034             status
   5035         );
   5036     }
   5037 
   5038     fn has_tag(event: &Event, name: &str, values: &[&str]) -> bool {
   5039         event.unsigned().tags().iter().any(|tag| {
   5040             tag.values().first().is_some_and(|value| value == name)
   5041                 && tag.values().len() == values.len() + 1
   5042                 && values.iter().enumerate().all(|(index, expected)| {
   5043                     tag.values()
   5044                         .get(index + 1)
   5045                         .is_some_and(|value| value == expected)
   5046                 })
   5047         })
   5048     }
   5049 
   5050     fn h(group_id: &str) -> Tag {
   5051         Tag::from_parts("h", &[group_id]).expect("h")
   5052     }
   5053 
   5054     fn p(pubkey: &PublicKeyHex) -> Tag {
   5055         Tag::from_parts("p", &[pubkey.as_str()]).expect("p")
   5056     }
   5057 
   5058     fn e(event_id: &EventId) -> Tag {
   5059         Tag::from_parts("e", &[event_id.as_str()]).expect("e")
   5060     }
   5061 
   5062     fn name(value: &str) -> Tag {
   5063         Tag::from_parts("name", &[value]).expect("name")
   5064     }
   5065 
   5066     fn private() -> Tag {
   5067         Tag::from_parts("private", &[]).expect("private")
   5068     }
   5069 
   5070     fn restricted() -> Tag {
   5071         Tag::from_parts("restricted", &[]).expect("restricted")
   5072     }
   5073 
   5074     fn hidden() -> Tag {
   5075         Tag::from_parts("hidden", &[]).expect("hidden")
   5076     }
   5077 
   5078     fn closed() -> Tag {
   5079         Tag::from_parts("closed", &[]).expect("closed")
   5080     }
   5081 
   5082     fn signer(secret_byte: u8) -> RelaySigner {
   5083         RelaySigner::from_secret_hex(&format!("{:02x}", secret_byte).repeat(32)).expect("signer")
   5084     }
   5085 }