tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

lib.rs (108173B)


      1 #![forbid(unsafe_code)]
      2 
      3 use pocket_types::json::json_escape;
      4 use pocket_types::secp256k1::{Keypair, Secp256k1, SecretKey};
      5 use serde_json::json;
      6 use sha2::{Digest, Sha256};
      7 use std::collections::BTreeMap;
      8 use std::fs;
      9 use std::path::{Path, PathBuf};
     10 use std::sync::atomic::{AtomicU64, Ordering};
     11 use std::time::Instant;
     12 use tangle_groups::{
     13     KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA,
     14     KIND_GROUP_PUT_USER, MemberStatus,
     15 };
     16 use tangle_protocol::{
     17     Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SignatureHex, SubscriptionId, Tag,
     18     UnixTimestamp, UnsignedEvent, event_to_value, filter_from_value, filter_to_value,
     19 };
     20 use tangle_runtime::{
     21     config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json},
     22     relay::{
     23         auth::BaseAuthState,
     24         core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits},
     25         outbound::RuntimeRelayMessage,
     26     },
     27     runtime::{RelayRuntime, RelayRuntimeHandle},
     28 };
     29 use tangle_store_pocket::{
     30     PocketEventId, PocketKind, PocketOwnedEvent, PocketOwnedFilter, PocketOwnedTags, PocketPubkey,
     31     PocketQueryConfig, PocketSig, PocketStoreConfig, PocketSyncPolicy, PocketTime,
     32     parse_pocket_event_json, parse_pocket_filter_json,
     33 };
     34 use tangle_test_support::{FixtureKey, TANGLE_V2_RELAY_URL, tangle_v2_group_config};
     35 
     36 static TEMP_ID: AtomicU64 = AtomicU64::new(0);
     37 
     38 pub const SCENARIO_POCKET_QUERY_VISIBLE_EVENTS: &str = "pocket_query_visible_events";
     39 pub const SCENARIO_GROUP_READ_GATE_OVERHEAD: &str = "group_read_gate_overhead";
     40 pub const SCENARIO_COUNT_RESOURCE_CONTROLS: &str = "count_resource_controls";
     41 pub const SCENARIO_PROJECTION_REBUILD: &str = "projection_rebuild";
     42 pub const SCENARIO_OUTBOX_REPLAY: &str = "outbox_replay";
     43 pub const SCENARIO_BROADCAST_LAG: &str = "broadcast_lag";
     44 pub const SCENARIO_MEMORY_PROFILE: &str = "memory_profile";
     45 pub const SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT: &str = "virtual_relay_fanout_1_percent";
     46 pub const SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT: &str = "virtual_relay_fanout_10_percent";
     47 pub const SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT: &str = "virtual_relay_fanout_100_percent";
     48 pub const POCKET_SOURCE_REPOSITORY: &str = "https://github.com/triesap/pocket";
     49 pub const POCKET_SOURCE_REVISION: &str = "329334f20948c796c6016b673b92551ac4855ad7";
     50 
     51 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     52 pub struct BenchDatasetConfig {
     53     pub group_count: usize,
     54     pub public_events_per_group: usize,
     55     pub private_events_per_group: usize,
     56     pub public_note_count: usize,
     57     pub member_count: usize,
     58 }
     59 
     60 impl BenchDatasetConfig {
     61     pub fn new(
     62         group_count: usize,
     63         public_events_per_group: usize,
     64         private_events_per_group: usize,
     65         public_note_count: usize,
     66         member_count: usize,
     67     ) -> Self {
     68         Self {
     69             group_count,
     70             public_events_per_group,
     71             private_events_per_group,
     72             public_note_count,
     73             member_count,
     74         }
     75     }
     76 
     77     pub fn smoke() -> Self {
     78         Self::new(6, 4, 3, 6, 3)
     79     }
     80 
     81     pub fn medium() -> Self {
     82         Self::new(24, 8, 6, 24, 5)
     83     }
     84 
     85     pub fn large_smoke() -> Self {
     86         Self::new(120, 24, 16, 120, 12)
     87     }
     88 
     89     pub fn proof_10m() -> Self {
     90         Self::new(30_000, 100, 100, 6_670_000, 10)
     91     }
     92 
     93     pub fn proof_large_group() -> Self {
     94         Self::new(3, 50, 50, 10_000, 100_000)
     95     }
     96 
     97     pub fn proof_join_storm() -> Self {
     98         Self::new(25_000, 1, 1, 25_000, 40)
     99     }
    100 
    101     pub fn proof_slow_client() -> Self {
    102         Self::new(50_000, 2, 1, 50_000, 2)
    103     }
    104 
    105     pub fn validate(self) -> Result<Self, String> {
    106         if self.group_count < 3 {
    107             return Err("group-count must be at least 3".to_owned());
    108         }
    109         if self.public_events_per_group == 0 {
    110             return Err("public-events-per-group must be greater than zero".to_owned());
    111         }
    112         if self.private_events_per_group == 0 {
    113             return Err("private-events-per-group must be greater than zero".to_owned());
    114         }
    115         if self.member_count == 0 {
    116             return Err("member-count must be greater than zero".to_owned());
    117         }
    118         Ok(self)
    119     }
    120 
    121     pub fn estimated_source_event_count(self) -> u64 {
    122         let public_groups = self.group_count.div_ceil(3);
    123         let private_and_hidden_groups = self.group_count - public_groups;
    124         let total = self.group_count
    125             + self.group_count * self.member_count
    126             + public_groups * self.public_events_per_group
    127             + private_and_hidden_groups * self.private_events_per_group
    128             + self.public_note_count;
    129         total.try_into().expect("estimated event count fits in u64")
    130     }
    131 }
    132 
    133 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    134 pub enum BenchmarkProfileName {
    135     Smoke,
    136     VirtualRelayTenancy,
    137     Medium,
    138     LargeSmoke,
    139     Proof10m,
    140     ProofLargeGroup,
    141     ProofJoinStorm,
    142     ProofSlowClient,
    143 }
    144 
    145 impl BenchmarkProfileName {
    146     pub fn parse(value: &str) -> Result<Self, String> {
    147         match value {
    148             "smoke" => Ok(Self::Smoke),
    149             "virtual-relay-tenancy" => Ok(Self::VirtualRelayTenancy),
    150             "medium" => Ok(Self::Medium),
    151             "large-smoke" => Ok(Self::LargeSmoke),
    152             "proof-10m" => Ok(Self::Proof10m),
    153             "proof-large-group" => Ok(Self::ProofLargeGroup),
    154             "proof-join-storm" => Ok(Self::ProofJoinStorm),
    155             "proof-slow-client" => Ok(Self::ProofSlowClient),
    156             _ => Err(format!(
    157                 "unknown benchmark profile `{value}`; expected smoke, virtual-relay-tenancy, medium, large-smoke, proof-10m, proof-large-group, proof-join-storm, or proof-slow-client"
    158             )),
    159         }
    160     }
    161 
    162     pub fn as_str(self) -> &'static str {
    163         match self {
    164             Self::Smoke => "smoke",
    165             Self::VirtualRelayTenancy => "virtual-relay-tenancy",
    166             Self::Medium => "medium",
    167             Self::LargeSmoke => "large-smoke",
    168             Self::Proof10m => "proof-10m",
    169             Self::ProofLargeGroup => "proof-large-group",
    170             Self::ProofJoinStorm => "proof-join-storm",
    171             Self::ProofSlowClient => "proof-slow-client",
    172         }
    173     }
    174 
    175     pub fn all() -> [Self; 8] {
    176         [
    177             Self::Smoke,
    178             Self::VirtualRelayTenancy,
    179             Self::Medium,
    180             Self::LargeSmoke,
    181             Self::Proof10m,
    182             Self::ProofLargeGroup,
    183             Self::ProofJoinStorm,
    184             Self::ProofSlowClient,
    185         ]
    186     }
    187 
    188     pub fn is_proof(self) -> bool {
    189         matches!(
    190             self,
    191             Self::Proof10m | Self::ProofLargeGroup | Self::ProofJoinStorm | Self::ProofSlowClient
    192         )
    193     }
    194 }
    195 
    196 #[derive(Debug, Clone, PartialEq, Eq)]
    197 pub struct BenchmarkProfile {
    198     name: BenchmarkProfileName,
    199     dataset_config: BenchDatasetConfig,
    200     thresholds: BenchmarkThresholds,
    201     threshold_source: String,
    202     target_hardware_evidence: Option<String>,
    203 }
    204 
    205 impl BenchmarkProfile {
    206     pub fn from_name(name: BenchmarkProfileName) -> Self {
    207         match name {
    208             BenchmarkProfileName::Smoke => Self::smoke(),
    209             BenchmarkProfileName::VirtualRelayTenancy => Self::virtual_relay_tenancy(),
    210             BenchmarkProfileName::Medium => Self::medium(),
    211             BenchmarkProfileName::LargeSmoke => Self::large_smoke(),
    212             BenchmarkProfileName::Proof10m => Self::proof_10m(),
    213             BenchmarkProfileName::ProofLargeGroup => Self::proof_large_group(),
    214             BenchmarkProfileName::ProofJoinStorm => Self::proof_join_storm(),
    215             BenchmarkProfileName::ProofSlowClient => Self::proof_slow_client(),
    216         }
    217     }
    218 
    219     pub fn smoke() -> Self {
    220         Self::new(
    221             BenchmarkProfileName::Smoke,
    222             BenchDatasetConfig::smoke(),
    223             BenchmarkThresholds::smoke(),
    224         )
    225     }
    226 
    227     pub fn virtual_relay_tenancy() -> Self {
    228         Self::new(
    229             BenchmarkProfileName::VirtualRelayTenancy,
    230             BenchDatasetConfig::smoke(),
    231             BenchmarkThresholds::large_smoke(),
    232         )
    233     }
    234 
    235     pub fn medium() -> Self {
    236         Self::new(
    237             BenchmarkProfileName::Medium,
    238             BenchDatasetConfig::medium(),
    239             BenchmarkThresholds::medium(),
    240         )
    241     }
    242 
    243     pub fn large_smoke() -> Self {
    244         Self::new(
    245             BenchmarkProfileName::LargeSmoke,
    246             BenchDatasetConfig::large_smoke(),
    247             BenchmarkThresholds::large_smoke(),
    248         )
    249     }
    250 
    251     pub fn proof_10m() -> Self {
    252         Self::new(
    253             BenchmarkProfileName::Proof10m,
    254             BenchDatasetConfig::proof_10m(),
    255             BenchmarkThresholds::proof_10m(),
    256         )
    257     }
    258 
    259     pub fn proof_large_group() -> Self {
    260         Self::new(
    261             BenchmarkProfileName::ProofLargeGroup,
    262             BenchDatasetConfig::proof_large_group(),
    263             BenchmarkThresholds::proof_large_group(),
    264         )
    265     }
    266 
    267     pub fn proof_join_storm() -> Self {
    268         Self::new(
    269             BenchmarkProfileName::ProofJoinStorm,
    270             BenchDatasetConfig::proof_join_storm(),
    271             BenchmarkThresholds::proof_join_storm(),
    272         )
    273     }
    274 
    275     pub fn proof_slow_client() -> Self {
    276         Self::new(
    277             BenchmarkProfileName::ProofSlowClient,
    278             BenchDatasetConfig::proof_slow_client(),
    279             BenchmarkThresholds::proof_slow_client(),
    280         )
    281     }
    282 
    283     fn new(
    284         name: BenchmarkProfileName,
    285         dataset_config: BenchDatasetConfig,
    286         thresholds: BenchmarkThresholds,
    287     ) -> Self {
    288         Self {
    289             name,
    290             dataset_config,
    291             thresholds,
    292             threshold_source: format!("builtin:{}", name.as_str()),
    293             target_hardware_evidence: None,
    294         }
    295     }
    296 
    297     pub fn name(&self) -> BenchmarkProfileName {
    298         self.name
    299     }
    300 
    301     pub fn dataset_config(&self) -> BenchDatasetConfig {
    302         self.dataset_config
    303     }
    304 
    305     pub fn thresholds(&self) -> BenchmarkThresholds {
    306         self.thresholds
    307     }
    308 
    309     pub fn threshold_source(&self) -> &str {
    310         &self.threshold_source
    311     }
    312 
    313     pub fn target_hardware_evidence(&self) -> Option<&str> {
    314         self.target_hardware_evidence.as_deref()
    315     }
    316 
    317     pub fn requires_target_hardware_evidence(&self) -> bool {
    318         self.name.is_proof()
    319     }
    320 
    321     pub fn validate_for_run(&self) -> Result<(), String> {
    322         if self.requires_target_hardware_evidence() && self.target_hardware_evidence.is_none() {
    323             return Err(format!(
    324                 "target hardware evidence is required for `{}` benchmark profile",
    325                 self.name.as_str()
    326             ));
    327         }
    328         Ok(())
    329     }
    330 
    331     pub fn with_dataset_config(mut self, config: BenchDatasetConfig) -> Result<Self, String> {
    332         self.dataset_config = config.validate()?;
    333         Ok(self)
    334     }
    335 
    336     pub fn with_thresholds(
    337         mut self,
    338         thresholds: BenchmarkThresholds,
    339         source: impl Into<String>,
    340     ) -> Result<Self, String> {
    341         let source = source.into();
    342         if source.is_empty() {
    343             return Err("benchmark threshold source must not be empty".to_owned());
    344         }
    345         self.thresholds = thresholds;
    346         self.threshold_source = source;
    347         Ok(self)
    348     }
    349 
    350     pub fn with_target_hardware_evidence(
    351         mut self,
    352         evidence: impl Into<String>,
    353     ) -> Result<Self, String> {
    354         let evidence = evidence.into();
    355         if evidence.is_empty() {
    356             return Err("target hardware evidence must not be empty".to_owned());
    357         }
    358         self.target_hardware_evidence = Some(evidence);
    359         Ok(self)
    360     }
    361 
    362     pub fn proof_claim_eligible(&self) -> bool {
    363         self.name.is_proof() && self.target_hardware_evidence.is_some()
    364     }
    365 }
    366 
    367 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    368 pub struct VirtualRelayTenancyConfig {
    369     pub tenant_count: usize,
    370     pub subscriptions_per_tenant: usize,
    371     pub max_pending_events_per_subscription: usize,
    372 }
    373 
    374 impl VirtualRelayTenancyConfig {
    375     pub fn mvp() -> Self {
    376         Self {
    377             tenant_count: 10,
    378             subscriptions_per_tenant: 2_000,
    379             max_pending_events_per_subscription: 16,
    380         }
    381     }
    382 
    383     fn validate(self) -> Result<Self, String> {
    384         if self.tenant_count < 10 {
    385             return Err("virtual relay tenancy benchmark requires at least 10 tenants".to_owned());
    386         }
    387         if self.subscriptions_per_tenant < 2_000 {
    388             return Err(
    389                 "virtual relay tenancy benchmark requires at least 2,000 subscriptions per busiest tenant"
    390                     .to_owned(),
    391             );
    392         }
    393         if self.aggregate_active_subscriptions() < 20_000 {
    394             return Err(
    395                 "virtual relay tenancy benchmark requires at least 20,000 aggregate subscriptions"
    396                     .to_owned(),
    397             );
    398         }
    399         if self.max_pending_events_per_subscription == 0 {
    400             return Err(
    401                 "virtual relay tenancy benchmark pending event capacity must be greater than zero"
    402                     .to_owned(),
    403             );
    404         }
    405         Ok(self)
    406     }
    407 
    408     pub fn aggregate_active_subscriptions(self) -> usize {
    409         self.tenant_count * self.subscriptions_per_tenant
    410     }
    411 
    412     pub fn busiest_tenant_active_subscriptions(self) -> usize {
    413         self.subscriptions_per_tenant
    414     }
    415 
    416     fn one_percent_fanout(self) -> usize {
    417         self.aggregate_active_subscriptions() / 100
    418     }
    419 
    420     fn ten_percent_fanout(self) -> usize {
    421         self.aggregate_active_subscriptions() / 10
    422     }
    423 
    424     fn to_json(self) -> serde_json::Value {
    425         json!({
    426             "tenant_count": self.tenant_count,
    427             "aggregate_active_subscriptions": self.aggregate_active_subscriptions(),
    428             "busiest_tenant_active_subscriptions": self.busiest_tenant_active_subscriptions(),
    429             "max_pending_events_per_subscription": self.max_pending_events_per_subscription
    430         })
    431     }
    432 }
    433 
    434 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    435 pub enum BenchGroupVisibility {
    436     Public,
    437     Private,
    438     Hidden,
    439 }
    440 
    441 impl BenchGroupVisibility {
    442     pub fn as_str(self) -> &'static str {
    443         match self {
    444             Self::Public => "public",
    445             Self::Private => "private",
    446             Self::Hidden => "hidden",
    447         }
    448     }
    449 
    450     fn flags(self) -> &'static [&'static str] {
    451         match self {
    452             Self::Public => &[],
    453             Self::Private => &["private"],
    454             Self::Hidden => &["hidden"],
    455         }
    456     }
    457 }
    458 
    459 #[derive(Debug, Clone, PartialEq, Eq)]
    460 pub struct BenchGroup {
    461     id: String,
    462     visibility: BenchGroupVisibility,
    463 }
    464 
    465 impl BenchGroup {
    466     pub fn id(&self) -> &str {
    467         &self.id
    468     }
    469 
    470     pub fn visibility(&self) -> BenchGroupVisibility {
    471         self.visibility
    472     }
    473 }
    474 
    475 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    476 pub enum BenchEventAuth {
    477     None,
    478     Owner,
    479     Admin,
    480 }
    481 
    482 #[derive(Debug, Clone, PartialEq, Eq)]
    483 pub struct BenchSourceEvent {
    484     event: Event,
    485     auth: BenchEventAuth,
    486 }
    487 
    488 impl BenchSourceEvent {
    489     pub fn event(&self) -> &Event {
    490         &self.event
    491     }
    492 
    493     pub fn auth(&self) -> BenchEventAuth {
    494         self.auth
    495     }
    496 }
    497 
    498 #[derive(Debug, Clone, PartialEq, Eq)]
    499 pub struct BenchDataset {
    500     config: BenchDatasetConfig,
    501     groups: Vec<BenchGroup>,
    502     group_create_events: Vec<BenchSourceEvent>,
    503     membership_events: Vec<BenchSourceEvent>,
    504     group_timeline_events: Vec<BenchSourceEvent>,
    505     public_note_events: Vec<BenchSourceEvent>,
    506 }
    507 
    508 impl BenchDataset {
    509     pub fn generate(config: BenchDatasetConfig) -> Result<Self, String> {
    510         let config = config.validate()?;
    511         let groups = (0..config.group_count)
    512             .map(|index| BenchGroup {
    513                 id: format!("BenchFarm{index:04}"),
    514                 visibility: group_visibility(index),
    515             })
    516             .collect::<Vec<_>>();
    517         let mut group_create_events = Vec::with_capacity(groups.len());
    518         let mut membership_events = Vec::with_capacity(groups.len() * config.member_count);
    519         let mut group_timeline_events = Vec::new();
    520         let mut public_note_events = Vec::with_capacity(config.public_note_count);
    521 
    522         for (group_index, group) in groups.iter().enumerate() {
    523             group_create_events.push(BenchSourceEvent {
    524                 event: pocket_protocol_group_create_event(
    525                     FixtureKey::Owner,
    526                     &group.id,
    527                     1_714_200_000 + u64::try_from(group_index).expect("group index fits in u64"),
    528                     group.visibility.flags(),
    529                 )?,
    530                 auth: BenchEventAuth::Owner,
    531             });
    532             for member_index in 0..config.member_count {
    533                 membership_events.push(BenchSourceEvent {
    534                     event: bench_member_event(&group.id, group_index, member_index, 1_714_300_000)?,
    535                     auth: BenchEventAuth::Admin,
    536                 });
    537             }
    538             let per_group = match group.visibility {
    539                 BenchGroupVisibility::Public => config.public_events_per_group,
    540                 BenchGroupVisibility::Private | BenchGroupVisibility::Hidden => {
    541                     config.private_events_per_group
    542                 }
    543             };
    544             for event_index in 0..per_group {
    545                 let created_at = 1_714_400_000
    546                     + u64::try_from(group_index * 10_000 + event_index)
    547                         .expect("event index fits in u64");
    548                 group_timeline_events.push(BenchSourceEvent {
    549                     event: pocket_protocol_group_event(
    550                         FixtureKey::Owner,
    551                         &group.id,
    552                         created_at,
    553                         1,
    554                         &format!(
    555                             "bench {} group event {group_index:04}-{event_index:04}",
    556                             group.visibility.as_str()
    557                         ),
    558                     )?,
    559                     auth: BenchEventAuth::Owner,
    560                 });
    561             }
    562         }
    563 
    564         for index in 0..config.public_note_count {
    565             public_note_events.push(BenchSourceEvent {
    566                 event: pocket_protocol_event(
    567                     FixtureKey::Outsider,
    568                     1_714_500_000 + u64::try_from(index).expect("note index fits in u64"),
    569                     1,
    570                     vec![
    571                         Tag::from_parts("t", &["tangle-bench"])
    572                             .map_err(|error| error.to_string())?,
    573                     ],
    574                     &format!("bench public note {index:04}"),
    575                 )?,
    576                 auth: BenchEventAuth::None,
    577             });
    578         }
    579 
    580         Ok(Self {
    581             config,
    582             groups,
    583             group_create_events,
    584             membership_events,
    585             group_timeline_events,
    586             public_note_events,
    587         })
    588     }
    589 
    590     pub fn config(&self) -> BenchDatasetConfig {
    591         self.config
    592     }
    593 
    594     pub fn groups(&self) -> &[BenchGroup] {
    595         &self.groups
    596     }
    597 
    598     pub fn source_events(&self) -> Vec<&BenchSourceEvent> {
    599         self.group_create_events
    600             .iter()
    601             .chain(self.membership_events.iter())
    602             .chain(self.group_timeline_events.iter())
    603             .chain(self.public_note_events.iter())
    604             .collect()
    605     }
    606 
    607     pub fn source_event_count(&self) -> u64 {
    608         self.source_events()
    609             .len()
    610             .try_into()
    611             .expect("source event count fits in u64")
    612     }
    613 
    614     pub fn group_event_count(&self) -> u64 {
    615         (self.group_create_events.len()
    616             + self.membership_events.len()
    617             + self.group_timeline_events.len())
    618         .try_into()
    619         .expect("group event count fits in u64")
    620     }
    621 
    622     pub fn membership_event_count(&self) -> u64 {
    623         self.membership_events
    624             .len()
    625             .try_into()
    626             .expect("membership event count fits in u64")
    627     }
    628 
    629     pub fn largest_group_members(&self) -> u64 {
    630         self.config
    631             .member_count
    632             .try_into()
    633             .expect("member count fits in u64")
    634     }
    635 
    636     pub fn dataset_digest(&self) -> Result<String, String> {
    637         let mut hasher = Sha256::new();
    638         for event in self.source_events() {
    639             let raw = serde_json::to_string(&event_to_value(event.event()))
    640                 .map_err(|error| error.to_string())?;
    641             hasher.update(raw.as_bytes());
    642             hasher.update(b"\n");
    643         }
    644         Ok(lower_hex(&hasher.finalize()))
    645     }
    646 
    647     pub fn source_events_jsonl(&self) -> Result<String, String> {
    648         let mut output = String::new();
    649         for source in self.source_events() {
    650             let raw = serde_json::to_string(&event_to_value(source.event()))
    651                 .map_err(|error| error.to_string())?;
    652             output.push_str(&raw);
    653             output.push('\n');
    654         }
    655         Ok(output)
    656     }
    657 
    658     fn first_group(&self, visibility: BenchGroupVisibility) -> Result<&BenchGroup, String> {
    659         self.groups
    660             .iter()
    661             .find(|group| group.visibility == visibility)
    662             .ok_or_else(|| format!("dataset does not include {} group", visibility.as_str()))
    663     }
    664 
    665     fn first_timeline_event(&self, visibility: BenchGroupVisibility) -> Result<&Event, String> {
    666         let group = self.first_group(visibility)?;
    667         self.group_timeline_events
    668             .iter()
    669             .find(|source| event_has_group(source.event(), group.id()))
    670             .map(BenchSourceEvent::event)
    671             .ok_or_else(|| {
    672                 format!(
    673                     "dataset does not include {} timeline event",
    674                     visibility.as_str()
    675                 )
    676             })
    677     }
    678 }
    679 
    680 #[derive(Debug, Clone, PartialEq, Eq)]
    681 pub struct DatasetProfile {
    682     pub total_events: u64,
    683     pub group_events: u64,
    684     pub groups: u64,
    685     pub memberships: u64,
    686     pub largest_group_members: u64,
    687     pub dataset_digest: String,
    688     pub fixture_family: String,
    689 }
    690 
    691 impl DatasetProfile {
    692     fn from_dataset(dataset: &BenchDataset) -> Result<Self, String> {
    693         Ok(Self {
    694             total_events: dataset.source_event_count(),
    695             group_events: dataset.group_event_count(),
    696             groups: dataset
    697                 .groups()
    698                 .len()
    699                 .try_into()
    700                 .expect("group count fits in u64"),
    701             memberships: dataset.membership_event_count(),
    702             largest_group_members: dataset.largest_group_members(),
    703             dataset_digest: dataset.dataset_digest()?,
    704             fixture_family: "synthetic repo-owned fixtures".to_owned(),
    705         })
    706     }
    707 
    708     fn to_json(&self) -> serde_json::Value {
    709         json!({
    710             "total_events": self.total_events,
    711             "group_events": self.group_events,
    712             "groups": self.groups,
    713             "memberships": self.memberships,
    714             "largest_group_members": self.largest_group_members,
    715             "dataset_digest": self.dataset_digest,
    716             "fixture_family": self.fixture_family
    717         })
    718     }
    719 }
    720 
    721 #[derive(Debug, Clone, PartialEq)]
    722 pub struct ScenarioReport {
    723     pub scenario: String,
    724     pub attempted: u64,
    725     pub accepted: u64,
    726     pub rejected: u64,
    727     pub elapsed_micros: u64,
    728     pub events_per_second: f64,
    729     pub p50_micros: u64,
    730     pub p95_micros: u64,
    731     pub p99_micros: u64,
    732     pub max_rss_bytes: u64,
    733     pub observations: BTreeMap<String, serde_json::Value>,
    734 }
    735 
    736 impl ScenarioReport {
    737     fn new(
    738         scenario: &str,
    739         attempted: u64,
    740         accepted: u64,
    741         rejected: u64,
    742         elapsed_micros: u64,
    743         mut samples: Vec<u64>,
    744         max_rss_bytes: u64,
    745     ) -> Self {
    746         samples.sort_unstable();
    747         let events_per_second = if elapsed_micros == 0 {
    748             0.0
    749         } else {
    750             attempted as f64 * 1_000_000.0 / elapsed_micros as f64
    751         };
    752         Self {
    753             scenario: scenario.to_owned(),
    754             attempted,
    755             accepted,
    756             rejected,
    757             elapsed_micros,
    758             events_per_second,
    759             p50_micros: percentile(&samples, 50),
    760             p95_micros: percentile(&samples, 95),
    761             p99_micros: percentile(&samples, 99),
    762             max_rss_bytes,
    763             observations: BTreeMap::new(),
    764         }
    765     }
    766 
    767     fn with_observations(mut self, observations: BTreeMap<String, serde_json::Value>) -> Self {
    768         self.observations = observations;
    769         self
    770     }
    771 
    772     fn pass_latency_gate(&self, p95_threshold_micros: u64) -> bool {
    773         self.rejected == 0
    774             && self.accepted == self.attempted
    775             && self.p95_micros <= p95_threshold_micros
    776     }
    777 
    778     fn pass_elapsed_gate(&self, elapsed_threshold_micros: u64) -> bool {
    779         self.rejected == 0
    780             && self.accepted == self.attempted
    781             && self.elapsed_micros <= elapsed_threshold_micros
    782     }
    783 
    784     fn pass_memory_gate(&self, max_bytes: u64) -> bool {
    785         self.rejected == 0 && self.accepted == self.attempted && self.max_rss_bytes <= max_bytes
    786     }
    787 
    788     fn to_json(&self) -> serde_json::Value {
    789         json!({
    790             "scenario": self.scenario,
    791             "status": status(self.accepted == self.attempted && self.rejected == 0),
    792             "attempted": self.attempted,
    793             "accepted": self.accepted,
    794             "rejected": self.rejected,
    795             "elapsed_micros": self.elapsed_micros,
    796             "events_per_second": self.events_per_second,
    797             "p50_micros": self.p50_micros,
    798             "p95_micros": self.p95_micros,
    799             "p99_micros": self.p99_micros,
    800             "max_rss_bytes": self.max_rss_bytes,
    801             "query_metrics": {
    802                 "candidates_scanned": self.attempted,
    803                 "events_returned": self.accepted,
    804                 "events_rejected": self.rejected
    805             },
    806             "memory": {
    807                 "max_rss_bytes": self.max_rss_bytes
    808             },
    809             "observations": &self.observations
    810         })
    811     }
    812 }
    813 
    814 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    815 pub struct BenchmarkThresholds {
    816     pub pocket_query_p95_micros: u64,
    817     pub read_gate_p95_micros: u64,
    818     pub count_resource_controls_p95_micros: u64,
    819     pub projection_rebuild_elapsed_micros: u64,
    820     pub outbox_replay_elapsed_micros: u64,
    821     pub broadcast_lag_p95_micros: u64,
    822     pub memory_profile_max_bytes: u64,
    823 }
    824 
    825 impl BenchmarkThresholds {
    826     pub fn smoke() -> Self {
    827         Self {
    828             pocket_query_p95_micros: 1_000_000,
    829             read_gate_p95_micros: 1_000_000,
    830             count_resource_controls_p95_micros: 1_000_000,
    831             projection_rebuild_elapsed_micros: 5_000_000,
    832             outbox_replay_elapsed_micros: 5_000_000,
    833             broadcast_lag_p95_micros: 1_000_000,
    834             memory_profile_max_bytes: 512 * 1024 * 1024,
    835         }
    836     }
    837 
    838     pub fn medium() -> Self {
    839         Self {
    840             pocket_query_p95_micros: 2_500_000,
    841             read_gate_p95_micros: 2_500_000,
    842             count_resource_controls_p95_micros: 2_500_000,
    843             projection_rebuild_elapsed_micros: 15_000_000,
    844             outbox_replay_elapsed_micros: 15_000_000,
    845             broadcast_lag_p95_micros: 2_500_000,
    846             memory_profile_max_bytes: 768 * 1024 * 1024,
    847         }
    848     }
    849 
    850     pub fn large_smoke() -> Self {
    851         Self {
    852             pocket_query_p95_micros: 5_000_000,
    853             read_gate_p95_micros: 5_000_000,
    854             count_resource_controls_p95_micros: 5_000_000,
    855             projection_rebuild_elapsed_micros: 60_000_000,
    856             outbox_replay_elapsed_micros: 60_000_000,
    857             broadcast_lag_p95_micros: 5_000_000,
    858             memory_profile_max_bytes: 1024 * 1024 * 1024,
    859         }
    860     }
    861 
    862     pub fn proof_10m() -> Self {
    863         Self {
    864             pocket_query_p95_micros: 10_000_000,
    865             read_gate_p95_micros: 10_000_000,
    866             count_resource_controls_p95_micros: 10_000_000,
    867             projection_rebuild_elapsed_micros: 300_000_000,
    868             outbox_replay_elapsed_micros: 300_000_000,
    869             broadcast_lag_p95_micros: 10_000_000,
    870             memory_profile_max_bytes: 16 * 1024 * 1024 * 1024,
    871         }
    872     }
    873 
    874     pub fn proof_large_group() -> Self {
    875         Self {
    876             pocket_query_p95_micros: 10_000_000,
    877             read_gate_p95_micros: 10_000_000,
    878             count_resource_controls_p95_micros: 10_000_000,
    879             projection_rebuild_elapsed_micros: 300_000_000,
    880             outbox_replay_elapsed_micros: 300_000_000,
    881             broadcast_lag_p95_micros: 10_000_000,
    882             memory_profile_max_bytes: 16 * 1024 * 1024 * 1024,
    883         }
    884     }
    885 
    886     pub fn proof_join_storm() -> Self {
    887         Self {
    888             pocket_query_p95_micros: 10_000_000,
    889             read_gate_p95_micros: 10_000_000,
    890             count_resource_controls_p95_micros: 10_000_000,
    891             projection_rebuild_elapsed_micros: 300_000_000,
    892             outbox_replay_elapsed_micros: 300_000_000,
    893             broadcast_lag_p95_micros: 10_000_000,
    894             memory_profile_max_bytes: 16 * 1024 * 1024 * 1024,
    895         }
    896     }
    897 
    898     pub fn proof_slow_client() -> Self {
    899         Self {
    900             pocket_query_p95_micros: 10_000_000,
    901             read_gate_p95_micros: 10_000_000,
    902             count_resource_controls_p95_micros: 10_000_000,
    903             projection_rebuild_elapsed_micros: 300_000_000,
    904             outbox_replay_elapsed_micros: 300_000_000,
    905             broadcast_lag_p95_micros: 10_000_000,
    906             memory_profile_max_bytes: 16 * 1024 * 1024 * 1024,
    907         }
    908     }
    909 
    910     pub fn from_json_str(raw: &str) -> Result<Self, String> {
    911         let value = serde_json::from_str::<serde_json::Value>(raw)
    912             .map_err(|error| format!("benchmark thresholds JSON is invalid: {error}"))?;
    913         Self::from_json_value(&value)
    914     }
    915 
    916     pub fn from_json_value(value: &serde_json::Value) -> Result<Self, String> {
    917         let object = value
    918             .as_object()
    919             .ok_or_else(|| "benchmark thresholds JSON must be an object".to_owned())?;
    920         for key in object.keys() {
    921             if !benchmark_threshold_fields().contains(&key.as_str()) {
    922                 return Err(format!("unknown benchmark threshold field `{key}`"));
    923             }
    924         }
    925         Ok(Self {
    926             pocket_query_p95_micros: threshold_u64(value, "pocket_query_p95_micros")?,
    927             read_gate_p95_micros: threshold_u64(value, "read_gate_p95_micros")?,
    928             count_resource_controls_p95_micros: threshold_u64(
    929                 value,
    930                 "count_resource_controls_p95_micros",
    931             )?,
    932             projection_rebuild_elapsed_micros: threshold_u64(
    933                 value,
    934                 "projection_rebuild_elapsed_micros",
    935             )?,
    936             outbox_replay_elapsed_micros: threshold_u64(value, "outbox_replay_elapsed_micros")?,
    937             broadcast_lag_p95_micros: threshold_u64(value, "broadcast_lag_p95_micros")?,
    938             memory_profile_max_bytes: threshold_u64(value, "memory_profile_max_bytes")?,
    939         })
    940     }
    941 
    942     pub fn to_json(self) -> serde_json::Value {
    943         json!({
    944             "pocket_query_p95_micros": self.pocket_query_p95_micros,
    945             "read_gate_p95_micros": self.read_gate_p95_micros,
    946             "count_resource_controls_p95_micros": self.count_resource_controls_p95_micros,
    947             "projection_rebuild_elapsed_micros": self.projection_rebuild_elapsed_micros,
    948             "outbox_replay_elapsed_micros": self.outbox_replay_elapsed_micros,
    949             "broadcast_lag_p95_micros": self.broadcast_lag_p95_micros,
    950             "memory_profile_max_bytes": self.memory_profile_max_bytes
    951         })
    952     }
    953 }
    954 
    955 fn benchmark_threshold_fields() -> [&'static str; 7] {
    956     [
    957         "pocket_query_p95_micros",
    958         "read_gate_p95_micros",
    959         "count_resource_controls_p95_micros",
    960         "projection_rebuild_elapsed_micros",
    961         "outbox_replay_elapsed_micros",
    962         "broadcast_lag_p95_micros",
    963         "memory_profile_max_bytes",
    964     ]
    965 }
    966 
    967 fn threshold_u64(value: &serde_json::Value, field: &str) -> Result<u64, String> {
    968     let value = value
    969         .get(field)
    970         .ok_or_else(|| format!("missing benchmark threshold field `{field}`"))?;
    971     let Some(value) = value.as_u64() else {
    972         return Err(format!(
    973             "benchmark threshold field `{field}` must be an unsigned integer"
    974         ));
    975     };
    976     if value == 0 {
    977         return Err(format!(
    978             "benchmark threshold field `{field}` must be greater than zero"
    979         ));
    980     }
    981     Ok(value)
    982 }
    983 
    984 #[derive(Debug, Clone, PartialEq)]
    985 pub struct BenchmarkRunReport {
    986     dataset: BenchDataset,
    987     dataset_profile: DatasetProfile,
    988     profile: BenchmarkProfile,
    989     scenarios: Vec<ScenarioReport>,
    990     validation_summary: BTreeMap<String, String>,
    991 }
    992 
    993 impl BenchmarkRunReport {
    994     pub fn run(profile: BenchmarkProfile) -> Result<Self, String> {
    995         profile.validate_for_run()?;
    996         let dataset = BenchDataset::generate(profile.dataset_config())?;
    997         let thresholds = profile.thresholds();
    998         let pocket_query = run_pocket_query_benchmark(&dataset)?;
    999         let read_gate = run_read_gate_benchmark(&dataset)?;
   1000         let count_resource_controls = run_count_resource_control_benchmark(&dataset)?;
   1001         let projection_rebuild = run_projection_rebuild_benchmark(&dataset)?;
   1002         let outbox_replay = run_outbox_replay_benchmark(&dataset)?;
   1003         let broadcast_lag = run_broadcast_lag_benchmark(&dataset)?;
   1004         let memory_profile = run_memory_profile_benchmark(&dataset)?;
   1005         let mut scenarios = vec![
   1006             pocket_query,
   1007             read_gate,
   1008             count_resource_controls,
   1009             projection_rebuild,
   1010             outbox_replay,
   1011             broadcast_lag,
   1012             memory_profile,
   1013         ];
   1014         if profile.name() == BenchmarkProfileName::VirtualRelayTenancy {
   1015             scenarios.extend(run_virtual_relay_tenancy_benchmarks(
   1016                 VirtualRelayTenancyConfig::mvp(),
   1017             )?);
   1018         }
   1019         let validation_summary = validation_summary(&scenarios, thresholds)?;
   1020         let dataset_profile = DatasetProfile::from_dataset(&dataset)?;
   1021         Ok(Self {
   1022             dataset,
   1023             dataset_profile,
   1024             profile,
   1025             scenarios,
   1026             validation_summary,
   1027         })
   1028     }
   1029 
   1030     pub fn dataset(&self) -> &BenchDataset {
   1031         &self.dataset
   1032     }
   1033 
   1034     pub fn dataset_profile(&self) -> &DatasetProfile {
   1035         &self.dataset_profile
   1036     }
   1037 
   1038     pub fn profile(&self) -> &BenchmarkProfile {
   1039         &self.profile
   1040     }
   1041 
   1042     pub fn scenarios(&self) -> &[ScenarioReport] {
   1043         &self.scenarios
   1044     }
   1045 
   1046     pub fn scenario(&self, name: &str) -> Option<&ScenarioReport> {
   1047         self.scenarios
   1048             .iter()
   1049             .find(|scenario| scenario.scenario == name)
   1050     }
   1051 
   1052     pub fn validation_summary(&self) -> &BTreeMap<String, String> {
   1053         &self.validation_summary
   1054     }
   1055 
   1056     pub fn summary_json(&self, run_id: &str, artifact_directory: &Path) -> serde_json::Value {
   1057         json!({
   1058             "schema": 2,
   1059             "run_id": run_id,
   1060             "artifact_directory": artifact_directory.to_string_lossy(),
   1061             "profile": self.profile.name().as_str(),
   1062             "dataset": self.dataset_profile.to_json(),
   1063             "dataset_profile": self.dataset_profile.to_json(),
   1064             "scenarios": self.scenarios.iter().map(ScenarioReport::to_json).collect::<Vec<_>>(),
   1065             "pocket_source": pocket_source_json(),
   1066             "threshold_source": self.profile.threshold_source(),
   1067             "thresholds": self.profile.thresholds().to_json(),
   1068             "validation_summary": self.validation_summary,
   1069             "pass_fail_summary": {
   1070                 "overall_status": validation_overall_status(&self.validation_summary),
   1071                 "passed_scenarios": self
   1072                     .validation_summary
   1073                     .values()
   1074                     .filter(|value| value.as_str() == "pass")
   1075                     .count(),
   1076                 "failed_scenarios": self
   1077                     .validation_summary
   1078                     .values()
   1079                     .filter(|value| value.as_str() == "fail")
   1080                     .count()
   1081             },
   1082             "proof_claim": {
   1083                 "eligible": self.profile.proof_claim_eligible(),
   1084                 "profile_required": "proof-*",
   1085                 "target_hardware_evidence": self
   1086                     .profile
   1087                     .target_hardware_evidence()
   1088                     .unwrap_or("absent")
   1089             },
   1090             "tangle_v1_mvp": {
   1091                 "benchmark_evidence": virtual_relay_tenancy_summary_json(
   1092                     self.profile.name(),
   1093                     &self.scenarios
   1094                 ),
   1095                 "external_integration": external_integration_json()
   1096             },
   1097             "artifacts": {
   1098                 "summary_json": "summary.json",
   1099                 "dataset_events_jsonl": "dataset-events.jsonl"
   1100             }
   1101         })
   1102     }
   1103 }
   1104 
   1105 fn pocket_source_json() -> serde_json::Value {
   1106     json!({
   1107         "repository": POCKET_SOURCE_REPOSITORY,
   1108         "revision": POCKET_SOURCE_REVISION,
   1109         "crates": ["pocket-db", "pocket-types"]
   1110     })
   1111 }
   1112 
   1113 fn virtual_relay_tenancy_summary_json(
   1114     profile_name: BenchmarkProfileName,
   1115     scenarios: &[ScenarioReport],
   1116 ) -> serde_json::Value {
   1117     let fanout_scenarios = [
   1118         SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT,
   1119         SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT,
   1120         SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT,
   1121     ]
   1122     .into_iter()
   1123     .filter_map(|name| scenarios.iter().find(|scenario| scenario.scenario == name))
   1124     .map(ScenarioReport::to_json)
   1125     .collect::<Vec<_>>();
   1126     let status = if profile_name == BenchmarkProfileName::VirtualRelayTenancy
   1127         && fanout_scenarios.len() == 3
   1128     {
   1129         "pass"
   1130     } else {
   1131         "not_run"
   1132     };
   1133     json!({
   1134         "status": status,
   1135         "required_profile": BenchmarkProfileName::VirtualRelayTenancy.as_str(),
   1136         "configured_profile": profile_name.as_str(),
   1137         "target": VirtualRelayTenancyConfig::mvp().to_json(),
   1138         "fanout_scenarios": fanout_scenarios
   1139     })
   1140 }
   1141 
   1142 fn external_integration_json() -> serde_json::Value {
   1143     json!({
   1144         "runner": "radroots_testing_tangle_integration",
   1145         "local_status": "unavailable",
   1146         "unavailable_command": "cargo test -p radroots_testing_tangle_integration -- tangle_v1_mvp",
   1147         "repo_boundary": "not present in the standalone Tangle workspace",
   1148         "required_scenarios": [
   1149             "host_routing",
   1150             "tenant_isolation",
   1151             "auth_realm_isolation",
   1152             "nip29_same_group_isolation",
   1153             "backup_export_boundaries"
   1154         ]
   1155     })
   1156 }
   1157 
   1158 fn validation_overall_status(summary: &BTreeMap<String, String>) -> &'static str {
   1159     if summary.values().all(|value| value == "pass") {
   1160         "pass"
   1161     } else {
   1162         "fail"
   1163     }
   1164 }
   1165 
   1166 struct MaterializedBenchRelay {
   1167     relay: BaseRelay,
   1168     store_config: PocketStoreConfig,
   1169     ingest_report: ScenarioReport,
   1170 }
   1171 
   1172 fn run_pocket_query_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> {
   1173     let mut materialized = materialize_dataset(dataset, "pocket-query", 128)?;
   1174     let public_group = dataset.first_group(BenchGroupVisibility::Public)?;
   1175     let public_event = dataset.first_timeline_event(BenchGroupVisibility::Public)?;
   1176     let owner_auth = authenticated(FixtureKey::Owner)?;
   1177     let owner = FixtureKey::Owner.public_key();
   1178     let created_at = public_event.unsigned().created_at().as_u64();
   1179     let operations = vec![
   1180         QueryOperation::new(
   1181             "pocket-h",
   1182             filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()], "limit": 50}))?,
   1183             QueryAuth::None,
   1184             QueryExpectation::AtLeast(1),
   1185         ),
   1186         QueryOperation::new(
   1187             "pocket-d",
   1188             filter_from_value(&json!({
   1189                 "kinds": [KIND_GROUP_METADATA],
   1190                 "#d": [public_group.id()],
   1191                 "limit": 10
   1192             }))?,
   1193             QueryAuth::None,
   1194             QueryExpectation::AtLeast(1),
   1195         ),
   1196         QueryOperation::new(
   1197             "pocket-kind-author-window-limit",
   1198             filter_from_value(&json!({
   1199                 "kinds": [1],
   1200                 "authors": [owner.as_str()],
   1201                 "since": created_at.saturating_sub(1),
   1202                 "until": created_at.saturating_add(100_000),
   1203                 "limit": 25
   1204             }))?,
   1205             QueryAuth::Owner,
   1206             QueryExpectation::AtLeast(1),
   1207         ),
   1208         QueryOperation::new(
   1209             "pocket-count",
   1210             filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?,
   1211             QueryAuth::None,
   1212             QueryExpectation::AtLeast(1),
   1213         ),
   1214     ];
   1215     let started = Instant::now();
   1216     let mut samples = Vec::with_capacity(operations.len());
   1217     let mut accepted = 0;
   1218     let mut rejected = 0;
   1219     for operation in operations {
   1220         let sample = Instant::now();
   1221         let observed = match operation.name {
   1222             "pocket-count" => count_for_operation(&materialized.relay, &operation, &owner_auth)?,
   1223             _ => query_for_operation(&mut materialized.relay, &operation, &owner_auth)?,
   1224         };
   1225         samples.push(elapsed_micros(sample));
   1226         if operation.expectation.matches(observed) {
   1227             accepted += 1;
   1228         } else {
   1229             rejected += 1;
   1230         }
   1231     }
   1232     Ok(ScenarioReport::new(
   1233         SCENARIO_POCKET_QUERY_VISIBLE_EVENTS,
   1234         accepted + rejected,
   1235         accepted,
   1236         rejected,
   1237         elapsed_micros(started),
   1238         samples,
   1239         materialized.ingest_report.max_rss_bytes,
   1240     ))
   1241 }
   1242 
   1243 fn run_read_gate_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> {
   1244     let mut materialized = materialize_dataset(dataset, "read-gate", 128)?;
   1245     let public_group = dataset.first_group(BenchGroupVisibility::Public)?;
   1246     let private_group = dataset.first_group(BenchGroupVisibility::Private)?;
   1247     let hidden_group = dataset.first_group(BenchGroupVisibility::Hidden)?;
   1248     let owner_auth = authenticated(FixtureKey::Owner)?;
   1249     let operations = vec![
   1250         QueryOperation::new(
   1251             "public-unauth",
   1252             filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?,
   1253             QueryAuth::None,
   1254             QueryExpectation::AtLeast(1),
   1255         ),
   1256         QueryOperation::new(
   1257             "private-unauth",
   1258             filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?,
   1259             QueryAuth::None,
   1260             QueryExpectation::Exactly(0),
   1261         ),
   1262         QueryOperation::new(
   1263             "private-owner",
   1264             filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?,
   1265             QueryAuth::Owner,
   1266             QueryExpectation::AtLeast(1),
   1267         ),
   1268         QueryOperation::new(
   1269             "hidden-metadata-unauth",
   1270             filter_from_value(&json!({"kinds": [KIND_GROUP_METADATA], "#d": [hidden_group.id()]}))?,
   1271             QueryAuth::None,
   1272             QueryExpectation::Exactly(0),
   1273         ),
   1274         QueryOperation::new(
   1275             "hidden-metadata-owner",
   1276             filter_from_value(&json!({"kinds": [KIND_GROUP_METADATA], "#d": [hidden_group.id()]}))?,
   1277             QueryAuth::Owner,
   1278             QueryExpectation::AtLeast(1),
   1279         ),
   1280         QueryOperation::new(
   1281             "private-count-unauth",
   1282             filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?,
   1283             QueryAuth::None,
   1284             QueryExpectation::Exactly(0),
   1285         ),
   1286         QueryOperation::new(
   1287             "private-count-owner",
   1288             filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?,
   1289             QueryAuth::Owner,
   1290             QueryExpectation::AtLeast(1),
   1291         ),
   1292     ];
   1293     let started = Instant::now();
   1294     let mut samples = Vec::with_capacity(operations.len());
   1295     let mut accepted = 0;
   1296     let mut rejected = 0;
   1297     for operation in operations {
   1298         let sample = Instant::now();
   1299         let observed = if operation.name.contains("count") {
   1300             count_for_operation(&materialized.relay, &operation, &owner_auth)?
   1301         } else {
   1302             query_for_operation(&mut materialized.relay, &operation, &owner_auth)?
   1303         };
   1304         samples.push(elapsed_micros(sample));
   1305         if operation.expectation.matches(observed) {
   1306             accepted += 1;
   1307         } else {
   1308             rejected += 1;
   1309         }
   1310     }
   1311     Ok(ScenarioReport::new(
   1312         SCENARIO_GROUP_READ_GATE_OVERHEAD,
   1313         accepted + rejected,
   1314         accepted,
   1315         rejected,
   1316         elapsed_micros(started),
   1317         samples,
   1318         materialized.ingest_report.max_rss_bytes,
   1319     ))
   1320 }
   1321 
   1322 fn run_count_resource_control_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> {
   1323     let materialized = materialize_dataset(dataset, "count-resource-controls", 128)?;
   1324     let public_group = dataset.first_group(BenchGroupVisibility::Public)?;
   1325     let private_group = dataset.first_group(BenchGroupVisibility::Private)?;
   1326     let owner_auth = authenticated(FixtureKey::Owner)?;
   1327     let operations = vec![
   1328         QueryOperation::new(
   1329             "bounded-public-count",
   1330             filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?,
   1331             QueryAuth::None,
   1332             QueryExpectation::Exactly(
   1333                 dataset
   1334                     .config
   1335                     .public_events_per_group
   1336                     .try_into()
   1337                     .expect("public event count fits in u64"),
   1338             ),
   1339         ),
   1340         QueryOperation::new(
   1341             "bounded-private-owner-count",
   1342             filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?,
   1343             QueryAuth::Owner,
   1344             QueryExpectation::Exactly(
   1345                 dataset
   1346                     .config
   1347                     .private_events_per_group
   1348                     .try_into()
   1349                     .expect("private event count fits in u64"),
   1350             ),
   1351         ),
   1352     ];
   1353     let started = Instant::now();
   1354     let mut samples = Vec::with_capacity(operations.len() + 3);
   1355     let mut accepted = 0;
   1356     let mut rejected = 0;
   1357     for operation in operations {
   1358         let sample = Instant::now();
   1359         let observed = count_for_operation(&materialized.relay, &operation, &owner_auth)?;
   1360         samples.push(elapsed_micros(sample));
   1361         if operation.expectation.matches(observed) {
   1362             accepted += 1;
   1363         } else {
   1364             rejected += 1;
   1365         }
   1366     }
   1367     let runtime = tokio::runtime::Builder::new_current_thread()
   1368         .build()
   1369         .map_err(|error| format!("failed to build count resource benchmark runtime: {error}"))?;
   1370     let probe = runtime.block_on(runtime_count_resource_control_probe())?;
   1371     samples.extend(probe.samples);
   1372     accepted += probe.accepted;
   1373     rejected += probe.rejected;
   1374     Ok(ScenarioReport::new(
   1375         SCENARIO_COUNT_RESOURCE_CONTROLS,
   1376         accepted + rejected,
   1377         accepted,
   1378         rejected,
   1379         elapsed_micros(started),
   1380         samples,
   1381         materialized.ingest_report.max_rss_bytes,
   1382     ))
   1383 }
   1384 
   1385 fn run_projection_rebuild_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> {
   1386     let mut materialized = materialize_dataset(dataset, "projection-rebuild", 128)?;
   1387     materialized
   1388         .relay
   1389         .shutdown()
   1390         .map_err(|error| error.to_string())?;
   1391     let started = Instant::now();
   1392     let reopened = BaseRelay::open_with_groups(
   1393         &materialized.store_config,
   1394         relay_limits(128),
   1395         &group_config()?,
   1396         PocketQueryConfig::default(),
   1397     )
   1398     .map_err(|error| error.to_string())?;
   1399     let elapsed = elapsed_micros(started);
   1400     let projection = reopened
   1401         .group_projection()
   1402         .ok_or_else(|| "group projection is unavailable".to_owned())?;
   1403     let groups_match = projection.groups().len() == dataset.groups().len();
   1404     let members_match = projection
   1405         .members()
   1406         .values()
   1407         .filter(|member| member.status() == MemberStatus::Member)
   1408         .count()
   1409         == usize::try_from(dataset.membership_event_count())
   1410             .expect("membership count fits in usize");
   1411     let accepted = u64::from(groups_match && members_match);
   1412     Ok(ScenarioReport::new(
   1413         SCENARIO_PROJECTION_REBUILD,
   1414         1,
   1415         accepted,
   1416         1 - accepted,
   1417         elapsed,
   1418         vec![elapsed],
   1419         estimate_memory_bytes(dataset),
   1420     ))
   1421 }
   1422 
   1423 fn run_outbox_replay_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> {
   1424     let mut materialized = materialize_dataset(dataset, "outbox-replay", 128)?;
   1425     let before = generated_state_counts(&materialized.relay)?;
   1426     materialized
   1427         .relay
   1428         .shutdown()
   1429         .map_err(|error| error.to_string())?;
   1430     let started = Instant::now();
   1431     let mut reopened = BaseRelay::open_with_groups(
   1432         &materialized.store_config,
   1433         relay_limits(128),
   1434         &group_config()?,
   1435         PocketQueryConfig::default(),
   1436     )
   1437     .map_err(|error| error.to_string())?;
   1438     let after_first = generated_state_counts(&reopened)?;
   1439     reopened.shutdown().map_err(|error| error.to_string())?;
   1440     let reopened = BaseRelay::open_with_groups(
   1441         &materialized.store_config,
   1442         relay_limits(128),
   1443         &group_config()?,
   1444         PocketQueryConfig::default(),
   1445     )
   1446     .map_err(|error| error.to_string())?;
   1447     let after_second = generated_state_counts(&reopened)?;
   1448     let elapsed = elapsed_micros(started);
   1449     let accepted = u64::from(before == after_first && before == after_second);
   1450     Ok(ScenarioReport::new(
   1451         SCENARIO_OUTBOX_REPLAY,
   1452         1,
   1453         accepted,
   1454         1 - accepted,
   1455         elapsed,
   1456         vec![elapsed],
   1457         estimate_memory_bytes(dataset),
   1458     ))
   1459 }
   1460 
   1461 fn run_broadcast_lag_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> {
   1462     let mut materialized = materialize_dataset(dataset, "broadcast-lag", 1)?;
   1463     let public_group = dataset.first_group(BenchGroupVisibility::Public)?;
   1464     let subscriber_count = dataset.config.group_count.max(4);
   1465     let filter = filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?;
   1466     let pocket_filter = pocket_filter(&filter)?;
   1467     for index in 0..subscriber_count {
   1468         materialized
   1469             .relay
   1470             .handle_pocket_req(
   1471                 subscription(&format!("lag-{index:04}"))?,
   1472                 vec![pocket_filter.clone()],
   1473             )
   1474             .map_err(|error| error.to_string())?;
   1475     }
   1476     let first = pocket_protocol_group_event(
   1477         FixtureKey::Owner,
   1478         public_group.id(),
   1479         1_714_600_000,
   1480         1,
   1481         "broadcast lag first",
   1482     )?;
   1483     let second = pocket_protocol_group_event(
   1484         FixtureKey::Owner,
   1485         public_group.id(),
   1486         1_714_600_001,
   1487         1,
   1488         "broadcast lag second",
   1489     )?;
   1490     let started = Instant::now();
   1491     let first_pocket = pocket_event(&first)?;
   1492     let second_pocket = pocket_event(&second)?;
   1493     let first_messages = materialized.relay.fanout_pocket(&first_pocket);
   1494     let second_messages = materialized.relay.fanout_pocket(&second_pocket);
   1495     let elapsed = elapsed_micros(started);
   1496     let first_events = first_messages
   1497         .iter()
   1498         .filter(|message| matches!(message, RuntimeRelayMessage::Event { .. }))
   1499         .count();
   1500     let second_events = second_messages
   1501         .iter()
   1502         .filter(|message| matches!(message, RuntimeRelayMessage::Event { .. }))
   1503         .count();
   1504     let accepted = if first_events == subscriber_count
   1505         && second_events == subscriber_count
   1506         && materialized.relay.active_subscription_count() == subscriber_count
   1507     {
   1508         subscriber_count
   1509     } else {
   1510         0
   1511     };
   1512     let attempted = subscriber_count
   1513         .try_into()
   1514         .expect("subscriber count fits in u64");
   1515     let accepted = accepted.try_into().expect("accepted fits in u64");
   1516     Ok(ScenarioReport::new(
   1517         SCENARIO_BROADCAST_LAG,
   1518         attempted,
   1519         accepted,
   1520         attempted - accepted,
   1521         elapsed,
   1522         vec![elapsed],
   1523         materialized.ingest_report.max_rss_bytes,
   1524     ))
   1525 }
   1526 
   1527 fn run_memory_profile_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> {
   1528     let started = Instant::now();
   1529     let estimated = estimate_memory_bytes(dataset);
   1530     let elapsed = elapsed_micros(started);
   1531     Ok(ScenarioReport::new(
   1532         SCENARIO_MEMORY_PROFILE,
   1533         1,
   1534         1,
   1535         0,
   1536         elapsed,
   1537         vec![elapsed],
   1538         estimated,
   1539     ))
   1540 }
   1541 
   1542 struct VirtualRelayTenantBench {
   1543     tenant_index: usize,
   1544     relay: BaseRelay,
   1545 }
   1546 
   1547 fn run_virtual_relay_tenancy_benchmarks(
   1548     config: VirtualRelayTenancyConfig,
   1549 ) -> Result<Vec<ScenarioReport>, String> {
   1550     let config = config.validate()?;
   1551     let mut tenants = materialize_virtual_relay_tenants(config)?;
   1552     let scenarios = [
   1553         (
   1554             SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT,
   1555             "fanout-001",
   1556             1,
   1557             config.one_percent_fanout(),
   1558         ),
   1559         (
   1560             SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT,
   1561             "fanout-010",
   1562             10,
   1563             config.ten_percent_fanout(),
   1564         ),
   1565         (
   1566             SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT,
   1567             "fanout-100",
   1568             100,
   1569             config.aggregate_active_subscriptions(),
   1570         ),
   1571     ];
   1572     let mut reports = Vec::with_capacity(scenarios.len());
   1573     for (scenario, tag_value, fanout_percent, expected_messages) in scenarios {
   1574         reports.push(run_virtual_relay_fanout_scenario(
   1575             &mut tenants,
   1576             config,
   1577             scenario,
   1578             tag_value,
   1579             fanout_percent,
   1580             expected_messages,
   1581         )?);
   1582     }
   1583     for tenant in &mut tenants {
   1584         tenant.relay.shutdown().map_err(|error| error.to_string())?;
   1585     }
   1586     Ok(reports)
   1587 }
   1588 
   1589 fn materialize_virtual_relay_tenants(
   1590     config: VirtualRelayTenancyConfig,
   1591 ) -> Result<Vec<VirtualRelayTenantBench>, String> {
   1592     let mut tenants = Vec::with_capacity(config.tenant_count);
   1593     for tenant_index in 0..config.tenant_count {
   1594         let store_config = bench_store_config(&format!("virtual-relay-tenancy-{tenant_index}"))?;
   1595         let mut relay = BaseRelay::open(
   1596             &store_config,
   1597             relay_limits_with_subscriptions(
   1598                 config.max_pending_events_per_subscription,
   1599                 config.subscriptions_per_tenant,
   1600             ),
   1601             PocketQueryConfig::default(),
   1602         )
   1603         .map_err(|error| error.to_string())?;
   1604         for subscription_index in 0..config.subscriptions_per_tenant {
   1605             let global_index = tenant_index * config.subscriptions_per_tenant + subscription_index;
   1606             let filter = virtual_relay_subscription_filter(config, tenant_index, global_index)?;
   1607             relay
   1608                 .handle_pocket_req(
   1609                     subscription(&format!("vr-{tenant_index:02}-{subscription_index:04}"))?,
   1610                     vec![filter],
   1611                 )
   1612                 .map_err(|error| error.to_string())?;
   1613         }
   1614         tenants.push(VirtualRelayTenantBench {
   1615             tenant_index,
   1616             relay,
   1617         });
   1618     }
   1619     Ok(tenants)
   1620 }
   1621 
   1622 fn virtual_relay_subscription_filter(
   1623     config: VirtualRelayTenancyConfig,
   1624     tenant_index: usize,
   1625     global_index: usize,
   1626 ) -> Result<PocketOwnedFilter, String> {
   1627     let mut tag_values = vec!["fanout-100"];
   1628     if tenant_index == 0 {
   1629         tag_values.push("fanout-010");
   1630     }
   1631     if global_index < config.one_percent_fanout() {
   1632         tag_values.push("fanout-001");
   1633     }
   1634     pocket_filter_from_value(&json!({"kinds": [1], "#t": tag_values}))
   1635 }
   1636 
   1637 fn run_virtual_relay_fanout_scenario(
   1638     tenants: &mut [VirtualRelayTenantBench],
   1639     config: VirtualRelayTenancyConfig,
   1640     scenario: &str,
   1641     tag_value: &str,
   1642     fanout_percent: u64,
   1643     expected_messages: usize,
   1644 ) -> Result<ScenarioReport, String> {
   1645     let started = Instant::now();
   1646     let mut samples = Vec::new();
   1647     let mut observed_messages = 0_usize;
   1648     let mut publish_tasks = 0_usize;
   1649     for tenant in tenants {
   1650         if !virtual_relay_tenant_participates(fanout_percent, tenant.tenant_index) {
   1651             continue;
   1652         }
   1653         let publish_started = Instant::now();
   1654         let event = pocket_protocol_event(
   1655             FixtureKey::Member,
   1656             1_714_800_000
   1657                 + u64::try_from(tenant.tenant_index).expect("tenant index fits in u64")
   1658                 + fanout_percent,
   1659             1,
   1660             vec![Tag::from_parts("t", &[tag_value]).map_err(|error| error.to_string())?],
   1661             &format!("virtual relay fanout {fanout_percent} percent"),
   1662         )?;
   1663         let pocket = pocket_event(&event)?;
   1664         let messages = tenant.relay.fanout_pocket(&pocket);
   1665         samples.push(elapsed_micros(publish_started));
   1666         observed_messages += messages
   1667             .iter()
   1668             .filter(|message| matches!(message, RuntimeRelayMessage::Event { .. }))
   1669             .count();
   1670         publish_tasks += 1;
   1671     }
   1672     let elapsed = elapsed_micros(started);
   1673     let attempted = u64::try_from(expected_messages).expect("expected message count fits in u64");
   1674     let accepted = u64::try_from(observed_messages).expect("observed message count fits in u64");
   1675     let rejected = attempted.saturating_sub(accepted) + accepted.saturating_sub(attempted);
   1676     let max_rss_bytes = estimate_virtual_relay_memory_bytes(config);
   1677     let mut observations = BTreeMap::new();
   1678     observations.insert("tenant_count".to_owned(), json!(config.tenant_count));
   1679     observations.insert(
   1680         "aggregate_active_subscriptions".to_owned(),
   1681         json!(config.aggregate_active_subscriptions()),
   1682     );
   1683     observations.insert(
   1684         "busiest_tenant_active_subscriptions".to_owned(),
   1685         json!(config.busiest_tenant_active_subscriptions()),
   1686     );
   1687     observations.insert("fanout_percent".to_owned(), json!(fanout_percent));
   1688     observations.insert("publish_task_count".to_owned(), json!(publish_tasks));
   1689     observations.insert(
   1690         "expected_enqueued_events".to_owned(),
   1691         json!(expected_messages),
   1692     );
   1693     observations.insert(
   1694         "observed_enqueued_events".to_owned(),
   1695         json!(observed_messages),
   1696     );
   1697     observations.insert(
   1698         "observable_queue_depth".to_owned(),
   1699         json!(observed_messages),
   1700     );
   1701     observations.insert(
   1702         "max_pending_events_per_subscription".to_owned(),
   1703         json!(config.max_pending_events_per_subscription),
   1704     );
   1705     observations.insert("memory_estimate_bytes".to_owned(), json!(max_rss_bytes));
   1706     observations.insert(
   1707         "tenant_publish_model".to_owned(),
   1708         json!("one publish per targeted virtual relay tenant"),
   1709     );
   1710     Ok(ScenarioReport::new(
   1711         scenario,
   1712         attempted,
   1713         accepted,
   1714         rejected,
   1715         elapsed,
   1716         samples,
   1717         max_rss_bytes,
   1718     )
   1719     .with_observations(observations))
   1720 }
   1721 
   1722 fn virtual_relay_tenant_participates(fanout_percent: u64, tenant_index: usize) -> bool {
   1723     match fanout_percent {
   1724         1 | 10 => tenant_index == 0,
   1725         100 => true,
   1726         _ => false,
   1727     }
   1728 }
   1729 
   1730 struct CountResourceControlProbe {
   1731     accepted: u64,
   1732     rejected: u64,
   1733     samples: Vec<u64>,
   1734 }
   1735 
   1736 async fn runtime_count_resource_control_probe() -> Result<CountResourceControlProbe, String> {
   1737     let root = bench_temp_root("count-resource-controls-runtime");
   1738     let _ = fs::remove_dir_all(&root);
   1739     let handle = RelayRuntimeHandle::new(
   1740         RelayRuntime::open(bench_runtime_config(&root)?).map_err(|error| error.to_string())?,
   1741     );
   1742     let mut auth = handle
   1743         .auth_state()
   1744         .await
   1745         .map_err(|error| error.to_string())?;
   1746     let cases = [
   1747         ("broad-empty-selector-count", json!({"limit": 1})),
   1748         ("broad-kind-only-count", json!({"kinds": [1], "limit": 1})),
   1749         (
   1750             "broad-high-limit-count",
   1751             json!({"kinds": [1], "#h": ["BenchFarm0000"], "limit": 500}),
   1752         ),
   1753     ];
   1754     let mut samples = Vec::with_capacity(cases.len());
   1755     let mut accepted = 0;
   1756     let mut rejected = 0;
   1757     for (name, filter) in cases {
   1758         let sample = Instant::now();
   1759         let subscription_id = subscription(name)?;
   1760         let pocket_filter = parse_pocket_filter_json(filter.to_string().as_bytes())
   1761             .map_err(|error| error.to_string())?;
   1762         let replies = handle
   1763             .handle_count_pocket(
   1764                 subscription_id.clone(),
   1765                 vec![pocket_filter],
   1766                 &mut auth,
   1767                 UnixTimestamp::new(1_714_700_000),
   1768             )
   1769             .await
   1770             .map_err(|error| error.to_string())?;
   1771         samples.push(elapsed_micros(sample));
   1772         if replies
   1773             == vec![RelayMessage::Closed {
   1774                 subscription_id,
   1775                 message: "restricted: count filters are too broad or expensive".to_owned(),
   1776             }]
   1777         {
   1778             accepted += 1;
   1779         } else {
   1780             rejected += 1;
   1781         }
   1782     }
   1783     let metrics = handle.metrics();
   1784     if metrics.count_refusals() != accepted || metrics.broad_query_rejections() != accepted {
   1785         rejected += 1;
   1786     }
   1787     let _ = fs::remove_dir_all(root);
   1788     Ok(CountResourceControlProbe {
   1789         accepted,
   1790         rejected,
   1791         samples,
   1792     })
   1793 }
   1794 
   1795 fn materialize_dataset(
   1796     dataset: &BenchDataset,
   1797     run_name: &str,
   1798     max_pending_events: usize,
   1799 ) -> Result<MaterializedBenchRelay, String> {
   1800     let store_config = bench_store_config(run_name)?;
   1801     let relay = BaseRelay::open_with_groups(
   1802         &store_config,
   1803         relay_limits(max_pending_events),
   1804         &group_config()?,
   1805         PocketQueryConfig::default(),
   1806     )
   1807     .map_err(|error| error.to_string())?;
   1808     let owner_auth = authenticated(FixtureKey::Owner)?;
   1809     let admin_auth = authenticated(FixtureKey::Admin)?;
   1810     let started = Instant::now();
   1811     let mut samples = Vec::with_capacity(dataset.source_events().len());
   1812     let mut accepted = 0;
   1813     let mut rejected = 0;
   1814     for source in dataset.source_events() {
   1815         let sample = Instant::now();
   1816         let pocket_event =
   1817             parse_pocket_event_json(event_to_value(source.event()).to_string().as_bytes())
   1818                 .map_err(|error| error.to_string())?;
   1819         let message = match source.auth() {
   1820             BenchEventAuth::None => relay
   1821                 .handle_pocket_event(&pocket_event)
   1822                 .map_err(|error| error.to_string())?,
   1823             BenchEventAuth::Owner => relay
   1824                 .handle_pocket_event_with_auth(&pocket_event, &owner_auth)
   1825                 .map_err(|error| error.to_string())?,
   1826             BenchEventAuth::Admin => relay
   1827                 .handle_pocket_event_with_auth(&pocket_event, &admin_auth)
   1828                 .map_err(|error| error.to_string())?,
   1829         };
   1830         samples.push(elapsed_micros(sample));
   1831         if ok_accepted(&message) {
   1832             accepted += 1;
   1833         } else {
   1834             rejected += 1;
   1835         }
   1836     }
   1837     let attempted = accepted + rejected;
   1838     let ingest_report = ScenarioReport::new(
   1839         "dataset_ingest",
   1840         attempted,
   1841         accepted,
   1842         rejected,
   1843         elapsed_micros(started),
   1844         samples,
   1845         estimate_memory_bytes(dataset),
   1846     );
   1847     Ok(MaterializedBenchRelay {
   1848         relay,
   1849         store_config,
   1850         ingest_report,
   1851     })
   1852 }
   1853 
   1854 fn relay_limits(max_pending_events: usize) -> BaseRelayLimits {
   1855     relay_limits_with_subscriptions(max_pending_events, 512)
   1856 }
   1857 
   1858 fn relay_limits_with_subscriptions(
   1859     max_pending_events: usize,
   1860     max_subscriptions: usize,
   1861 ) -> BaseRelayLimits {
   1862     BaseRelayLimits::new(BaseRelayLimitSettings {
   1863         max_pending_events,
   1864         max_subscription_id_length: 64,
   1865         max_subscriptions,
   1866         max_filters_per_request: 10,
   1867         max_tag_values_per_filter: 100,
   1868         max_query_complexity: 610,
   1869         max_event_tags: 200,
   1870         max_content_length: 65_536,
   1871         max_limit: 500,
   1872         default_limit: 100,
   1873     })
   1874     .expect("benchmark relay limits")
   1875 }
   1876 
   1877 #[derive(Clone)]
   1878 struct QueryOperation {
   1879     name: &'static str,
   1880     filter: Filter,
   1881     auth: QueryAuth,
   1882     expectation: QueryExpectation,
   1883 }
   1884 
   1885 impl QueryOperation {
   1886     fn new(
   1887         name: &'static str,
   1888         filter: Filter,
   1889         auth: QueryAuth,
   1890         expectation: QueryExpectation,
   1891     ) -> Self {
   1892         Self {
   1893             name,
   1894             filter,
   1895             auth,
   1896             expectation,
   1897         }
   1898     }
   1899 }
   1900 
   1901 #[derive(Clone, Copy)]
   1902 enum QueryAuth {
   1903     None,
   1904     Owner,
   1905 }
   1906 
   1907 #[derive(Clone, Copy)]
   1908 enum QueryExpectation {
   1909     Exactly(u64),
   1910     AtLeast(u64),
   1911 }
   1912 
   1913 impl QueryExpectation {
   1914     fn matches(self, observed: u64) -> bool {
   1915         match self {
   1916             Self::Exactly(expected) => observed == expected,
   1917             Self::AtLeast(expected) => observed >= expected,
   1918         }
   1919     }
   1920 }
   1921 
   1922 fn query_for_operation(
   1923     relay: &mut BaseRelay,
   1924     operation: &QueryOperation,
   1925     owner_auth: &BaseAuthState,
   1926 ) -> Result<u64, String> {
   1927     let subscription_id = subscription(operation.name)?;
   1928     let messages = match operation.auth {
   1929         QueryAuth::None => relay
   1930             .handle_pocket_req(
   1931                 subscription_id.clone(),
   1932                 vec![pocket_filter(&operation.filter)?],
   1933             )
   1934             .map_err(|error| error.to_string())?,
   1935         QueryAuth::Owner => relay
   1936             .handle_pocket_req_with_auth(
   1937                 subscription_id.clone(),
   1938                 vec![pocket_filter(&operation.filter)?],
   1939                 owner_auth,
   1940             )
   1941             .map_err(|error| error.to_string())?,
   1942     };
   1943     relay.handle_close(&subscription_id);
   1944     Ok(messages
   1945         .iter()
   1946         .filter(|message| matches!(message, RuntimeRelayMessage::Event { .. }))
   1947         .count()
   1948         .try_into()
   1949         .expect("message count fits in u64"))
   1950 }
   1951 
   1952 fn count_for_operation(
   1953     relay: &BaseRelay,
   1954     operation: &QueryOperation,
   1955     owner_auth: &BaseAuthState,
   1956 ) -> Result<u64, String> {
   1957     let subscription_id = subscription(operation.name)?;
   1958     let filter = pocket_filter(&operation.filter)?;
   1959     let message = match operation.auth {
   1960         QueryAuth::None => relay
   1961             .handle_count(subscription_id, vec![filter])
   1962             .map_err(|error| error.to_string())?,
   1963         QueryAuth::Owner => relay
   1964             .handle_count_with_auth(subscription_id, vec![filter], owner_auth)
   1965             .map_err(|error| error.to_string())?,
   1966     };
   1967     match message {
   1968         RelayMessage::Count { count, .. } => Ok(count),
   1969         value => Err(format!("expected COUNT message, got {value:?}")),
   1970     }
   1971 }
   1972 
   1973 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   1974 struct GeneratedStateCounts {
   1975     metadata: u64,
   1976     admins: u64,
   1977     members: u64,
   1978 }
   1979 
   1980 fn generated_state_counts(relay: &BaseRelay) -> Result<GeneratedStateCounts, String> {
   1981     Ok(GeneratedStateCounts {
   1982         metadata: count_kind(relay, KIND_GROUP_METADATA)?,
   1983         admins: count_kind(relay, KIND_GROUP_ADMINS)?,
   1984         members: count_kind(relay, KIND_GROUP_MEMBERS)?,
   1985     })
   1986 }
   1987 
   1988 fn count_kind(relay: &BaseRelay, kind: u32) -> Result<u64, String> {
   1989     let owner_auth = authenticated(FixtureKey::Owner)?;
   1990     let message = relay
   1991         .handle_count_with_auth(
   1992             subscription(&format!("count-{kind}"))?,
   1993             vec![pocket_filter_from_value(&json!({"kinds": [kind]}))?],
   1994             &owner_auth,
   1995         )
   1996         .map_err(|error| error.to_string())?;
   1997     match message {
   1998         RelayMessage::Count { count, .. } => Ok(count),
   1999         value => Err(format!("expected COUNT message, got {value:?}")),
   2000     }
   2001 }
   2002 
   2003 fn pocket_filter(filter: &Filter) -> Result<PocketOwnedFilter, String> {
   2004     let raw = serde_json::to_vec(&filter_to_value(filter)).map_err(|error| error.to_string())?;
   2005     parse_pocket_filter_json(&raw).map_err(|error| error.to_string())
   2006 }
   2007 
   2008 fn pocket_event(event: &Event) -> Result<PocketOwnedEvent, String> {
   2009     let raw = serde_json::to_vec(&event_to_value(event)).map_err(|error| error.to_string())?;
   2010     parse_pocket_event_json(&raw).map_err(|error| error.to_string())
   2011 }
   2012 
   2013 fn pocket_protocol_event(
   2014     key: FixtureKey,
   2015     created_at: u64,
   2016     kind: u64,
   2017     tags: Vec<Tag>,
   2018     content: &str,
   2019 ) -> Result<Event, String> {
   2020     let tags = pocket_tags_from_protocol(&tags)?;
   2021     let pocket = signed_pocket_event(
   2022         fixture_secret_byte(key),
   2023         created_at,
   2024         u16::try_from(kind).map_err(|error| error.to_string())?,
   2025         &tags,
   2026         content.as_bytes(),
   2027     )?;
   2028     pocket_event_to_protocol(&pocket)
   2029 }
   2030 
   2031 fn pocket_protocol_auth_event(
   2032     key: FixtureKey,
   2033     challenge: &str,
   2034     created_at: u64,
   2035 ) -> Result<Event, String> {
   2036     pocket_protocol_event(
   2037         key,
   2038         created_at,
   2039         22_242,
   2040         vec![
   2041             Tag::from_parts("relay", &[TANGLE_V2_RELAY_URL]).map_err(|error| error.to_string())?,
   2042             Tag::from_parts("challenge", &[challenge]).map_err(|error| error.to_string())?,
   2043         ],
   2044         "",
   2045     )
   2046 }
   2047 
   2048 fn pocket_protocol_group_create_event(
   2049     key: FixtureKey,
   2050     group_id: &str,
   2051     created_at: u64,
   2052     flags: &[&str],
   2053 ) -> Result<Event, String> {
   2054     let mut tags = vec![
   2055         Tag::from_parts("h", &[group_id]).map_err(|error| error.to_string())?,
   2056         Tag::from_parts("name", &[group_id]).map_err(|error| error.to_string())?,
   2057     ];
   2058     for flag in flags {
   2059         tags.push(Tag::from_parts(flag, &[]).map_err(|error| error.to_string())?);
   2060     }
   2061     pocket_protocol_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "")
   2062 }
   2063 
   2064 fn pocket_protocol_put_user_event(
   2065     key: FixtureKey,
   2066     group_id: &str,
   2067     target: FixtureKey,
   2068     created_at: u64,
   2069 ) -> Result<Event, String> {
   2070     let target_pubkey = target.public_key();
   2071     pocket_protocol_put_pubkey_event(key, group_id, target_pubkey.as_str(), created_at)
   2072 }
   2073 
   2074 fn pocket_protocol_put_pubkey_event(
   2075     key: FixtureKey,
   2076     group_id: &str,
   2077     target_pubkey: &str,
   2078     created_at: u64,
   2079 ) -> Result<Event, String> {
   2080     pocket_protocol_event(
   2081         key,
   2082         created_at,
   2083         KIND_GROUP_PUT_USER.into(),
   2084         vec![
   2085             Tag::from_parts("h", &[group_id]).map_err(|error| error.to_string())?,
   2086             Tag::from_parts("p", &[target_pubkey]).map_err(|error| error.to_string())?,
   2087         ],
   2088         "",
   2089     )
   2090 }
   2091 
   2092 fn pocket_protocol_group_event(
   2093     key: FixtureKey,
   2094     group_id: &str,
   2095     created_at: u64,
   2096     kind: u64,
   2097     content: &str,
   2098 ) -> Result<Event, String> {
   2099     pocket_protocol_event(
   2100         key,
   2101         created_at,
   2102         kind,
   2103         vec![Tag::from_parts("h", &[group_id]).map_err(|error| error.to_string())?],
   2104         content,
   2105     )
   2106 }
   2107 
   2108 fn signed_pocket_event(
   2109     secret_byte: u8,
   2110     created_at: u64,
   2111     kind: u16,
   2112     tags: &PocketOwnedTags,
   2113     content: &[u8],
   2114 ) -> Result<PocketOwnedEvent, String> {
   2115     let secp = Secp256k1::new();
   2116     let secret_key =
   2117         SecretKey::from_byte_array([secret_byte; 32]).map_err(|error| error.to_string())?;
   2118     let keypair = Keypair::from_secret_key(&secp, &secret_key);
   2119     let (xonlypubkey, _) = keypair.x_only_public_key();
   2120     let pubkey_bytes = xonlypubkey.serialize();
   2121     let pubkey = PocketPubkey::from_bytes(pubkey_bytes);
   2122     let pocket_kind = PocketKind::from_u16(kind);
   2123     let pocket_time = PocketTime::from_u64(created_at);
   2124     let escaped_content = json_escape(content, Vec::with_capacity(content.len() * 7 / 6))
   2125         .map_err(|error| error.to_string())?;
   2126     let escaped_content =
   2127         std::str::from_utf8(&escaped_content).map_err(|error| error.to_string())?;
   2128     let tags_json = tags.as_json();
   2129     let tags_json = std::str::from_utf8(&tags_json).map_err(|error| error.to_string())?;
   2130     let signable = format!(
   2131         r#"[0,"{}",{},{},{},"{}"]"#,
   2132         pubkey, pocket_time, pocket_kind, tags_json, escaped_content
   2133     );
   2134     let digest = Sha256::digest(signable.as_bytes());
   2135     let digest_slice: &[u8] = digest.as_ref();
   2136     let event_id: [u8; 32] = digest_slice
   2137         .try_into()
   2138         .expect("sha256 digest length is 32 bytes");
   2139     let signature = secp.sign_schnorr_no_aux_rand(&event_id, &keypair);
   2140     PocketOwnedEvent::new(
   2141         PocketEventId::from_bytes(event_id),
   2142         pocket_kind,
   2143         pubkey,
   2144         PocketSig::from_bytes(signature.to_byte_array()),
   2145         tags,
   2146         pocket_time,
   2147         content,
   2148     )
   2149     .map_err(|error| error.to_string())
   2150 }
   2151 
   2152 fn pocket_tags_from_protocol(tags: &[Tag]) -> Result<PocketOwnedTags, String> {
   2153     let parts = tags
   2154         .iter()
   2155         .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>())
   2156         .collect::<Vec<_>>();
   2157     PocketOwnedTags::new(&parts).map_err(|error| error.to_string())
   2158 }
   2159 
   2160 fn pocket_event_to_protocol(event: &PocketOwnedEvent) -> Result<Event, String> {
   2161     let tags = event
   2162         .tags()
   2163         .map_err(|error| error.to_string())?
   2164         .iter()
   2165         .map(|tag| {
   2166             Tag::new(
   2167                 tag.map(|value| {
   2168                     std::str::from_utf8(value)
   2169                         .map(str::to_owned)
   2170                         .map_err(|error| error.to_string())
   2171                 })
   2172                 .collect::<Result<Vec<_>, _>>()?,
   2173             )
   2174             .map_err(|error| error.to_string())
   2175         })
   2176         .collect::<Result<Vec<_>, _>>()?;
   2177     Ok(Event::new(
   2178         EventId::new(&event.id().as_hex_string()).map_err(|error| error.to_string())?,
   2179         UnsignedEvent::new(
   2180             PublicKeyHex::new(&event.pubkey().as_hex_string())
   2181                 .map_err(|error| error.to_string())?,
   2182             UnixTimestamp::new(event.created_at().as_u64()),
   2183             Kind::new(u64::from(event.kind().as_u16())).map_err(|error| error.to_string())?,
   2184             tags,
   2185             std::str::from_utf8(event.content()).map_err(|error| error.to_string())?,
   2186         ),
   2187         SignatureHex::new(&event.sig().to_string()).map_err(|error| error.to_string())?,
   2188     ))
   2189 }
   2190 
   2191 fn fixture_secret_byte(key: FixtureKey) -> u8 {
   2192     match key {
   2193         FixtureKey::Relay => 9,
   2194         FixtureKey::Owner => 10,
   2195         FixtureKey::Admin => 11,
   2196         FixtureKey::Member => 12,
   2197         FixtureKey::Outsider => 13,
   2198     }
   2199 }
   2200 
   2201 fn pocket_filter_from_value(value: &serde_json::Value) -> Result<PocketOwnedFilter, String> {
   2202     let filter = filter_from_value(value)?;
   2203     pocket_filter(&filter)
   2204 }
   2205 
   2206 fn validation_summary(
   2207     scenarios: &[ScenarioReport],
   2208     thresholds: BenchmarkThresholds,
   2209 ) -> Result<BTreeMap<String, String>, String> {
   2210     let mut summary = BTreeMap::new();
   2211     let mut failures = Vec::new();
   2212     if let Some(failure) = record_threshold_status(
   2213         &mut summary,
   2214         SCENARIO_POCKET_QUERY_VISIBLE_EVENTS,
   2215         scenario(scenarios, SCENARIO_POCKET_QUERY_VISIBLE_EVENTS)?
   2216             .pass_latency_gate(thresholds.pocket_query_p95_micros),
   2217     ) {
   2218         failures.push(failure);
   2219     }
   2220     if let Some(failure) = record_threshold_status(
   2221         &mut summary,
   2222         SCENARIO_GROUP_READ_GATE_OVERHEAD,
   2223         scenario(scenarios, SCENARIO_GROUP_READ_GATE_OVERHEAD)?
   2224             .pass_latency_gate(thresholds.read_gate_p95_micros),
   2225     ) {
   2226         failures.push(failure);
   2227     }
   2228     if let Some(failure) = record_threshold_status(
   2229         &mut summary,
   2230         SCENARIO_COUNT_RESOURCE_CONTROLS,
   2231         scenario(scenarios, SCENARIO_COUNT_RESOURCE_CONTROLS)?
   2232             .pass_latency_gate(thresholds.count_resource_controls_p95_micros),
   2233     ) {
   2234         failures.push(failure);
   2235     }
   2236     if let Some(failure) = record_threshold_status(
   2237         &mut summary,
   2238         SCENARIO_PROJECTION_REBUILD,
   2239         scenario(scenarios, SCENARIO_PROJECTION_REBUILD)?
   2240             .pass_elapsed_gate(thresholds.projection_rebuild_elapsed_micros),
   2241     ) {
   2242         failures.push(failure);
   2243     }
   2244     if let Some(failure) = record_threshold_status(
   2245         &mut summary,
   2246         SCENARIO_OUTBOX_REPLAY,
   2247         scenario(scenarios, SCENARIO_OUTBOX_REPLAY)?
   2248             .pass_elapsed_gate(thresholds.outbox_replay_elapsed_micros),
   2249     ) {
   2250         failures.push(failure);
   2251     }
   2252     if let Some(failure) = record_threshold_status(
   2253         &mut summary,
   2254         SCENARIO_BROADCAST_LAG,
   2255         scenario(scenarios, SCENARIO_BROADCAST_LAG)?
   2256             .pass_latency_gate(thresholds.broadcast_lag_p95_micros),
   2257     ) {
   2258         failures.push(failure);
   2259     }
   2260     if let Some(failure) = record_threshold_status(
   2261         &mut summary,
   2262         SCENARIO_MEMORY_PROFILE,
   2263         scenario(scenarios, SCENARIO_MEMORY_PROFILE)?
   2264             .pass_memory_gate(thresholds.memory_profile_max_bytes),
   2265     ) {
   2266         failures.push(failure);
   2267     }
   2268     for name in [
   2269         SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT,
   2270         SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT,
   2271         SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT,
   2272     ] {
   2273         let Some(report) = scenarios
   2274             .iter()
   2275             .find(|scenario| scenario.scenario.as_str() == name)
   2276         else {
   2277             continue;
   2278         };
   2279         if let Some(failure) = record_threshold_status(
   2280             &mut summary,
   2281             name,
   2282             report.pass_latency_gate(thresholds.broadcast_lag_p95_micros),
   2283         ) {
   2284             failures.push(failure);
   2285         }
   2286     }
   2287     if failures.is_empty() {
   2288         Ok(summary)
   2289     } else {
   2290         Err(failures.join("; "))
   2291     }
   2292 }
   2293 
   2294 fn record_threshold_status(
   2295     summary: &mut BTreeMap<String, String>,
   2296     name: &str,
   2297     passed: bool,
   2298 ) -> Option<String> {
   2299     summary.insert(name.to_owned(), status(passed));
   2300     if passed {
   2301         None
   2302     } else {
   2303         Some(format!("scenario `{name}` failed benchmark threshold"))
   2304     }
   2305 }
   2306 
   2307 fn scenario<'a>(scenarios: &'a [ScenarioReport], name: &str) -> Result<&'a ScenarioReport, String> {
   2308     scenarios
   2309         .iter()
   2310         .find(|scenario| scenario.scenario == name)
   2311         .ok_or_else(|| format!("scenario `{name}` was not run"))
   2312 }
   2313 
   2314 fn status(value: bool) -> String {
   2315     if value { "pass" } else { "fail" }.to_owned()
   2316 }
   2317 
   2318 fn bench_member_event(
   2319     group_id: &str,
   2320     group_index: usize,
   2321     member_index: usize,
   2322     base_created_at: u64,
   2323 ) -> Result<Event, String> {
   2324     if member_index == 0 {
   2325         return pocket_protocol_put_user_event(
   2326             FixtureKey::Admin,
   2327             group_id,
   2328             FixtureKey::Member,
   2329             base_created_at + u64::try_from(group_index * 10_000).expect("group index fits in u64"),
   2330         );
   2331     }
   2332     let pubkey = synthetic_member_pubkey(group_index, member_index);
   2333     pocket_protocol_put_pubkey_event(
   2334         FixtureKey::Admin,
   2335         group_id,
   2336         &pubkey,
   2337         base_created_at
   2338             + u64::try_from(group_index * 10_000 + member_index).expect("member index fits in u64"),
   2339     )
   2340 }
   2341 
   2342 fn synthetic_member_pubkey(group_index: usize, member_index: usize) -> String {
   2343     format!(
   2344         "{:064x}",
   2345         0x100000_u128 + (group_index as u128 * 10_000) + member_index as u128
   2346     )
   2347 }
   2348 
   2349 fn group_visibility(index: usize) -> BenchGroupVisibility {
   2350     match index % 3 {
   2351         0 => BenchGroupVisibility::Public,
   2352         1 => BenchGroupVisibility::Private,
   2353         _ => BenchGroupVisibility::Hidden,
   2354     }
   2355 }
   2356 
   2357 fn event_has_group(event: &Event, group_id: &str) -> bool {
   2358     event.unsigned().tags().iter().any(|tag| {
   2359         tag.indexed_pair()
   2360             .is_some_and(|(name, value)| name == "h" && value == group_id)
   2361     })
   2362 }
   2363 
   2364 fn group_config() -> Result<tangle_groups::GroupRuntimeConfig, String> {
   2365     tangle_v2_group_config(FixtureKey::Owner, &[FixtureKey::Admin])
   2366 }
   2367 
   2368 fn authenticated(key: FixtureKey) -> Result<BaseAuthState, String> {
   2369     let mut auth =
   2370         BaseAuthState::new(TANGLE_V2_RELAY_URL, 60, 600).map_err(|error| error.to_string())?;
   2371     auth.issue_challenge("challenge-a", tangle_protocol::UnixTimestamp::new(100))
   2372         .map_err(|error| error.to_string())?;
   2373     let event = pocket_protocol_auth_event(key, "challenge-a", 120)?;
   2374     let pocket = pocket_event(&event)?;
   2375     auth.authenticate_pocket(&pocket, tangle_protocol::UnixTimestamp::new(120))
   2376         .map_err(|error| error.to_string())?;
   2377     Ok(auth)
   2378 }
   2379 
   2380 fn bench_store_config(run_name: &str) -> Result<PocketStoreConfig, String> {
   2381     let root = bench_temp_root(run_name);
   2382     let _ = fs::remove_dir_all(&root);
   2383     PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
   2384         .map_err(|error| error.to_string())
   2385 }
   2386 
   2387 fn bench_runtime_config(root: &Path) -> Result<BaseRelayRuntimeConfig, String> {
   2388     let raw = json!({
   2389         "server": {
   2390             "listen_addr": "127.0.0.1:0",
   2391             "relay_url": TANGLE_V2_RELAY_URL
   2392         },
   2393         "pocket": {
   2394             "data_directory": root.join("pocket"),
   2395             "sync_policy": "flush_on_shutdown",
   2396             "query": {
   2397                 "allow_scraping": false,
   2398                 "allow_scrape_if_limited_to": 100,
   2399                 "allow_scrape_if_max_seconds": 3600
   2400             }
   2401         },
   2402         "groups": {
   2403             "enabled": true,
   2404             "canonical_relay_url": TANGLE_V2_RELAY_URL,
   2405             "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777",
   2406             "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()]
   2407         },
   2408         "auth": {
   2409             "challenge_ttl_seconds": 300,
   2410             "created_at_skew_seconds": 600
   2411         },
   2412         "limits": {
   2413             "max_message_length": 1048576,
   2414             "max_subid_length": 64,
   2415             "max_subscriptions_per_connection": 64,
   2416             "max_filters_per_request": 10,
   2417             "max_tag_values_per_filter": 100,
   2418             "max_query_complexity": 2048,
   2419             "max_limit": 500,
   2420             "default_limit": 100,
   2421             "max_event_tags": 200,
   2422             "max_content_length": 65536,
   2423             "broadcast_channel_capacity": 16,
   2424             "per_connection_outbound_queue": 8
   2425         },
   2426         "rate_limits": {
   2427             "auth": {
   2428                 "per_ip": {"window_seconds": 60, "max_hits": 120},
   2429                 "per_pubkey": {"window_seconds": 60, "max_hits": 30},
   2430                 "failures": {"window_seconds": 300, "max_hits": 5},
   2431                 "failures_per_ip": {"window_seconds": 300, "max_hits": 20}
   2432             },
   2433             "event": {
   2434                 "per_ip": {"window_seconds": 60, "max_hits": 600},
   2435                 "per_pubkey": {"window_seconds": 60, "max_hits": 120},
   2436                 "per_kind": {"window_seconds": 60, "max_hits": 1000}
   2437             },
   2438             "group": {
   2439                 "write_per_ip": {"window_seconds": 60, "max_hits": 300},
   2440                 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60},
   2441                 "write_per_group": {"window_seconds": 60, "max_hits": 90},
   2442                 "write_per_kind": {"window_seconds": 60, "max_hits": 300},
   2443                 "join_flow": {"window_seconds": 300, "max_hits": 10},
   2444                 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30}
   2445             },
   2446             "req": {
   2447                 "per_ip": {"window_seconds": 60, "max_hits": 600},
   2448                 "per_connection": {"window_seconds": 60, "max_hits": 120},
   2449                 "per_pubkey": {"window_seconds": 60, "max_hits": 240},
   2450                 "per_group": {"window_seconds": 60, "max_hits": 240},
   2451                 "per_kind": {"window_seconds": 60, "max_hits": 500},
   2452                 "broad": {"window_seconds": 60, "max_hits": 30}
   2453             },
   2454             "count": {
   2455                 "per_ip": {"window_seconds": 60, "max_hits": 300},
   2456                 "per_connection": {"window_seconds": 60, "max_hits": 60},
   2457                 "per_pubkey": {"window_seconds": 60, "max_hits": 120},
   2458                 "per_group": {"window_seconds": 60, "max_hits": 120},
   2459                 "per_kind": {"window_seconds": 60, "max_hits": 240},
   2460                 "broad": {"window_seconds": 60, "max_hits": 20}
   2461             }
   2462         }
   2463     })
   2464     .to_string();
   2465     parse_base_relay_runtime_config_json(&raw).map_err(|error| error.to_string())
   2466 }
   2467 
   2468 fn bench_temp_root(run_name: &str) -> PathBuf {
   2469     let id = TEMP_ID.fetch_add(1, Ordering::Relaxed);
   2470     std::env::temp_dir().join(format!(
   2471         "tangle-bench-{run_name}-{}-{id}",
   2472         std::process::id()
   2473     ))
   2474 }
   2475 
   2476 fn subscription(value: &str) -> Result<SubscriptionId, String> {
   2477     SubscriptionId::new(value).map_err(|error| error.to_string())
   2478 }
   2479 
   2480 fn ok_accepted(message: &RelayMessage) -> bool {
   2481     matches!(message, RelayMessage::Ok { accepted: true, .. })
   2482 }
   2483 
   2484 fn estimate_memory_bytes(dataset: &BenchDataset) -> u64 {
   2485     let event_bytes = dataset
   2486         .source_events()
   2487         .iter()
   2488         .map(|source| {
   2489             serde_json::to_string(&event_to_value(source.event()))
   2490                 .unwrap_or_default()
   2491                 .len()
   2492         })
   2493         .sum::<usize>();
   2494     let projection_bytes = dataset.groups().len() * 512
   2495         + usize::try_from(dataset.membership_event_count()).expect("member count fits in usize")
   2496             * 192;
   2497     (event_bytes + projection_bytes)
   2498         .try_into()
   2499         .expect("estimated memory fits in u64")
   2500 }
   2501 
   2502 fn estimate_virtual_relay_memory_bytes(config: VirtualRelayTenancyConfig) -> u64 {
   2503     let subscription_bytes = config.aggregate_active_subscriptions() * 512;
   2504     let tenant_bytes = config.tenant_count * 1024 * 1024;
   2505     (subscription_bytes + tenant_bytes)
   2506         .try_into()
   2507         .expect("estimated virtual relay memory fits in u64")
   2508 }
   2509 
   2510 fn percentile(samples: &[u64], percentile: u64) -> u64 {
   2511     if samples.is_empty() {
   2512         return 0;
   2513     }
   2514     let last = samples.len() - 1;
   2515     let index = (last as u64 * percentile).div_ceil(100);
   2516     samples[usize::try_from(index).expect("percentile index fits in usize")]
   2517 }
   2518 
   2519 fn elapsed_micros(started: Instant) -> u64 {
   2520     u64::try_from(started.elapsed().as_micros())
   2521         .unwrap_or(u64::MAX)
   2522         .max(1)
   2523 }
   2524 
   2525 fn lower_hex(bytes: &[u8]) -> String {
   2526     const HEX: &[u8; 16] = b"0123456789abcdef";
   2527     let mut output = String::with_capacity(bytes.len() * 2);
   2528     for byte in bytes {
   2529         output.push(char::from(HEX[usize::from(byte >> 4)]));
   2530         output.push(char::from(HEX[usize::from(byte & 0x0f)]));
   2531     }
   2532     output
   2533 }
   2534 
   2535 #[cfg(test)]
   2536 mod tests {
   2537     use super::{
   2538         BenchDataset, BenchDatasetConfig, BenchGroupVisibility, BenchmarkProfile,
   2539         BenchmarkProfileName, BenchmarkRunReport, BenchmarkThresholds, POCKET_SOURCE_REPOSITORY,
   2540         POCKET_SOURCE_REVISION, SCENARIO_BROADCAST_LAG, SCENARIO_COUNT_RESOURCE_CONTROLS,
   2541         SCENARIO_GROUP_READ_GATE_OVERHEAD, SCENARIO_MEMORY_PROFILE, SCENARIO_OUTBOX_REPLAY,
   2542         SCENARIO_POCKET_QUERY_VISIBLE_EVENTS, SCENARIO_PROJECTION_REBUILD,
   2543         SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT, SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT,
   2544         SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT, ScenarioReport, VirtualRelayTenancyConfig,
   2545         generated_state_counts, materialize_dataset,
   2546     };
   2547     use std::collections::BTreeSet;
   2548     use tangle_groups::{GroupId, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA};
   2549 
   2550     #[test]
   2551     fn deterministic_dataset_generator_produces_stable_group_events() {
   2552         let first =
   2553             BenchDataset::generate(BenchDatasetConfig::new(3, 2, 2, 2, 2)).expect("first dataset");
   2554         let second =
   2555             BenchDataset::generate(BenchDatasetConfig::new(3, 2, 2, 2, 2)).expect("second dataset");
   2556         let first_ids = first
   2557             .source_events()
   2558             .into_iter()
   2559             .map(|source| source.event().id().as_str().to_owned())
   2560             .collect::<Vec<_>>();
   2561         let second_ids = second
   2562             .source_events()
   2563             .into_iter()
   2564             .map(|source| source.event().id().as_str().to_owned())
   2565             .collect::<Vec<_>>();
   2566         let unique_ids = first_ids.iter().cloned().collect::<BTreeSet<_>>();
   2567 
   2568         assert_eq!(first_ids, second_ids);
   2569         assert_eq!(first.groups().len(), 3);
   2570         assert_eq!(first.source_event_count(), 17);
   2571         assert_eq!(first.group_event_count(), 15);
   2572         assert_eq!(first.membership_event_count(), 6);
   2573         assert_eq!(unique_ids.len(), first_ids.len());
   2574         assert_eq!(
   2575             first
   2576                 .groups()
   2577                 .iter()
   2578                 .map(|group| group.visibility())
   2579                 .collect::<Vec<_>>(),
   2580             vec![
   2581                 BenchGroupVisibility::Public,
   2582                 BenchGroupVisibility::Private,
   2583                 BenchGroupVisibility::Hidden
   2584             ]
   2585         );
   2586         assert_eq!(
   2587             first.dataset_digest().expect("first digest"),
   2588             second.dataset_digest().expect("second digest")
   2589         );
   2590         assert_eq!(
   2591             first.source_events_jsonl().expect("jsonl").lines().count(),
   2592             usize::try_from(first.source_event_count()).expect("count fits")
   2593         );
   2594     }
   2595 
   2596     #[test]
   2597     fn dataset_config_rejects_benchmark_shapes_without_privacy_coverage() {
   2598         assert!(BenchDataset::generate(BenchDatasetConfig::new(2, 1, 1, 0, 1)).is_err());
   2599         assert!(BenchDataset::generate(BenchDatasetConfig::new(3, 0, 1, 0, 1)).is_err());
   2600         assert!(BenchDataset::generate(BenchDatasetConfig::new(3, 1, 0, 0, 1)).is_err());
   2601         assert!(BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 0)).is_err());
   2602     }
   2603 
   2604     #[test]
   2605     fn materialized_dataset_populates_generated_group_state() {
   2606         let dataset =
   2607             BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 1, 1)).expect("dataset");
   2608         let materialized = materialize_dataset(&dataset, "test-generated-state", 16)
   2609             .expect("materialized dataset");
   2610         let counts = generated_state_counts(&materialized.relay).expect("state counts");
   2611 
   2612         assert_eq!(counts.metadata, 3);
   2613         assert_eq!(counts.admins, 3);
   2614         assert_eq!(counts.members, 3);
   2615         assert_eq!(
   2616             super::count_kind(&materialized.relay, KIND_GROUP_METADATA).expect("metadata"),
   2617             3
   2618         );
   2619         assert_eq!(
   2620             super::count_kind(&materialized.relay, KIND_GROUP_ADMINS).expect("admins"),
   2621             3
   2622         );
   2623         assert_eq!(
   2624             super::count_kind(&materialized.relay, KIND_GROUP_MEMBERS).expect("members"),
   2625             3
   2626         );
   2627     }
   2628 
   2629     #[test]
   2630     fn benchmark_suite_runs_all_required_v2_scenarios() {
   2631         let report = BenchmarkRunReport::run(smoke_profile(BenchDatasetConfig::new(3, 1, 1, 2, 1)))
   2632             .expect("report");
   2633 
   2634         for name in [
   2635             SCENARIO_POCKET_QUERY_VISIBLE_EVENTS,
   2636             SCENARIO_GROUP_READ_GATE_OVERHEAD,
   2637             SCENARIO_COUNT_RESOURCE_CONTROLS,
   2638             SCENARIO_PROJECTION_REBUILD,
   2639             SCENARIO_OUTBOX_REPLAY,
   2640             SCENARIO_BROADCAST_LAG,
   2641             SCENARIO_MEMORY_PROFILE,
   2642         ] {
   2643             let scenario = report.scenario(name).expect("scenario");
   2644             assert_eq!(scenario.rejected, 0, "{name} rejected operations");
   2645             assert_eq!(scenario.accepted, scenario.attempted, "{name} acceptance");
   2646             assert!(scenario.elapsed_micros > 0, "{name} elapsed");
   2647         }
   2648         assert_eq!(report.dataset_profile().groups, 3);
   2649         assert_eq!(report.validation_summary().len(), 7);
   2650         assert!(
   2651             report
   2652                 .validation_summary()
   2653                 .values()
   2654                 .all(|status| status == "pass")
   2655         );
   2656     }
   2657 
   2658     #[test]
   2659     fn local_smoke_profiles_run_without_hardware_evidence() {
   2660         for profile in [BenchmarkProfile::smoke(), BenchmarkProfile::large_smoke()] {
   2661             assert!(!profile.requires_target_hardware_evidence());
   2662             let report = BenchmarkRunReport::run(profile).expect("local profile report");
   2663             assert!(
   2664                 report
   2665                     .validation_summary()
   2666                     .values()
   2667                     .all(|status| status == "pass")
   2668             );
   2669         }
   2670     }
   2671 
   2672     #[test]
   2673     fn protocol_conversion_for_supported_profile_sizes_is_bounded() {
   2674         let dataset =
   2675             BenchDataset::generate(BenchDatasetConfig::new(4, 3, 3, 4, 3)).expect("dataset");
   2676         let mut total_event_json_bytes = 0_usize;
   2677         for source in dataset.source_events() {
   2678             let event_json = tangle_protocol::event_to_value(source.event()).to_string();
   2679             total_event_json_bytes += event_json.len();
   2680             assert!(
   2681                 tangle_protocol::parse_client_message(&format!("[\"EVENT\",{event_json}]")).is_ok()
   2682             );
   2683         }
   2684 
   2685         assert!(total_event_json_bytes < 1_000_000);
   2686     }
   2687 
   2688     #[test]
   2689     fn benchmark_profiles_are_explicit_and_unknown_profiles_fail_closed() {
   2690         assert_eq!(
   2691             BenchmarkProfileName::all()
   2692                 .iter()
   2693                 .map(|profile| profile.as_str())
   2694                 .collect::<Vec<_>>(),
   2695             vec![
   2696                 "smoke",
   2697                 "virtual-relay-tenancy",
   2698                 "medium",
   2699                 "large-smoke",
   2700                 "proof-10m",
   2701                 "proof-large-group",
   2702                 "proof-join-storm",
   2703                 "proof-slow-client"
   2704             ]
   2705         );
   2706         assert_eq!(
   2707             BenchmarkProfileName::parse("smoke")
   2708                 .expect("smoke")
   2709                 .as_str(),
   2710             "smoke"
   2711         );
   2712         assert_eq!(
   2713             BenchmarkProfileName::parse("virtual-relay-tenancy")
   2714                 .expect("virtual")
   2715                 .as_str(),
   2716             "virtual-relay-tenancy"
   2717         );
   2718         assert_eq!(
   2719             BenchmarkProfileName::parse("medium")
   2720                 .expect("medium")
   2721                 .as_str(),
   2722             "medium"
   2723         );
   2724         assert_eq!(
   2725             BenchmarkProfileName::parse("large-smoke")
   2726                 .expect("large-smoke")
   2727                 .as_str(),
   2728             "large-smoke"
   2729         );
   2730         assert!(BenchmarkProfileName::parse("production").is_err());
   2731         assert!(
   2732             BenchmarkProfileName::parse("local")
   2733                 .expect_err("unknown")
   2734                 .contains("unknown benchmark profile")
   2735         );
   2736         assert_eq!(
   2737             BenchmarkProfile::smoke().dataset_config(),
   2738             BenchDatasetConfig::smoke()
   2739         );
   2740         assert_eq!(
   2741             BenchmarkProfile::virtual_relay_tenancy().dataset_config(),
   2742             BenchDatasetConfig::smoke()
   2743         );
   2744         assert_eq!(
   2745             BenchmarkProfile::medium().dataset_config(),
   2746             BenchDatasetConfig::medium()
   2747         );
   2748         assert_eq!(
   2749             BenchmarkProfile::large_smoke().dataset_config(),
   2750             BenchDatasetConfig::large_smoke()
   2751         );
   2752         assert_eq!(
   2753             BenchmarkProfile::proof_10m().dataset_config(),
   2754             BenchDatasetConfig::proof_10m()
   2755         );
   2756         assert_eq!(
   2757             BenchmarkProfile::proof_large_group().dataset_config(),
   2758             BenchDatasetConfig::proof_large_group()
   2759         );
   2760         assert_eq!(
   2761             BenchmarkProfile::proof_join_storm().dataset_config(),
   2762             BenchDatasetConfig::proof_join_storm()
   2763         );
   2764         assert_eq!(
   2765             BenchmarkProfile::proof_slow_client().dataset_config(),
   2766             BenchDatasetConfig::proof_slow_client()
   2767         );
   2768     }
   2769 
   2770     #[test]
   2771     fn proof_profile_dataset_definitions_are_hardware_scale_without_materialization() {
   2772         assert_eq!(
   2773             BenchDatasetConfig::proof_10m().estimated_source_event_count(),
   2774             10_000_000
   2775         );
   2776         assert_eq!(
   2777             BenchDatasetConfig::proof_large_group().member_count,
   2778             100_000
   2779         );
   2780         assert_eq!(
   2781             BenchDatasetConfig::proof_join_storm().group_count
   2782                 * BenchDatasetConfig::proof_join_storm().member_count,
   2783             1_000_000
   2784         );
   2785         assert_eq!(BenchDatasetConfig::proof_slow_client().group_count, 50_000);
   2786         for profile in [
   2787             BenchmarkProfile::proof_10m(),
   2788             BenchmarkProfile::proof_large_group(),
   2789             BenchmarkProfile::proof_join_storm(),
   2790             BenchmarkProfile::proof_slow_client(),
   2791         ] {
   2792             assert!(profile.requires_target_hardware_evidence());
   2793             assert!(profile.validate_for_run().is_err());
   2794             assert!(!profile.proof_claim_eligible());
   2795             assert!(profile.dataset_config().validate().is_ok());
   2796         }
   2797     }
   2798 
   2799     #[test]
   2800     fn benchmark_threshold_json_rejects_missing_unknown_or_zero_fields() {
   2801         let valid = BenchmarkThresholds::from_json_value(&BenchmarkThresholds::smoke().to_json())
   2802             .expect("valid thresholds");
   2803         assert_eq!(valid, BenchmarkThresholds::smoke());
   2804 
   2805         let missing = serde_json::json!({
   2806             "pocket_query_p95_micros": 1,
   2807             "read_gate_p95_micros": 1,
   2808             "count_resource_controls_p95_micros": 1,
   2809             "projection_rebuild_elapsed_micros": 1,
   2810             "outbox_replay_elapsed_micros": 1,
   2811             "broadcast_lag_p95_micros": 1
   2812         });
   2813         assert!(
   2814             BenchmarkThresholds::from_json_value(&missing)
   2815                 .expect_err("missing")
   2816                 .contains("memory_profile_max_bytes")
   2817         );
   2818 
   2819         let unknown = serde_json::json!({
   2820             "pocket_query_p95_micros": 1,
   2821             "read_gate_p95_micros": 1,
   2822             "count_resource_controls_p95_micros": 1,
   2823             "projection_rebuild_elapsed_micros": 1,
   2824             "outbox_replay_elapsed_micros": 1,
   2825             "broadcast_lag_p95_micros": 1,
   2826             "memory_profile_max_bytes": 1,
   2827             "extra": 1
   2828         });
   2829         assert!(
   2830             BenchmarkThresholds::from_json_value(&unknown)
   2831                 .expect_err("unknown")
   2832                 .contains("unknown benchmark threshold field")
   2833         );
   2834 
   2835         let zero = serde_json::json!({
   2836             "pocket_query_p95_micros": 0,
   2837             "read_gate_p95_micros": 1,
   2838             "count_resource_controls_p95_micros": 1,
   2839             "projection_rebuild_elapsed_micros": 1,
   2840             "outbox_replay_elapsed_micros": 1,
   2841             "broadcast_lag_p95_micros": 1,
   2842             "memory_profile_max_bytes": 1
   2843         });
   2844         assert!(
   2845             BenchmarkThresholds::from_json_value(&zero)
   2846                 .expect_err("zero")
   2847                 .contains("greater than zero")
   2848         );
   2849     }
   2850 
   2851     #[test]
   2852     fn proof_claim_eligibility_requires_manual_proof_profile() {
   2853         assert!(!BenchmarkProfile::smoke().proof_claim_eligible());
   2854         assert!(
   2855             !BenchmarkProfile::smoke()
   2856                 .with_target_hardware_evidence("target-hardware:ci")
   2857                 .expect("evidence")
   2858                 .proof_claim_eligible()
   2859         );
   2860         assert!(!BenchmarkProfile::large_smoke().proof_claim_eligible());
   2861         assert!(
   2862             !BenchmarkProfile::large_smoke()
   2863                 .with_target_hardware_evidence("target-hardware:bench-node-001")
   2864                 .expect("evidence")
   2865                 .proof_claim_eligible()
   2866         );
   2867         assert!(!BenchmarkProfile::proof_10m().proof_claim_eligible());
   2868         assert!(
   2869             BenchmarkProfile::proof_10m()
   2870                 .with_target_hardware_evidence("target-hardware:proof-node-001")
   2871                 .expect("evidence")
   2872                 .proof_claim_eligible()
   2873         );
   2874     }
   2875 
   2876     #[test]
   2877     fn proof_profile_runs_fail_closed_without_hardware_evidence() {
   2878         for profile in [
   2879             BenchmarkProfile::proof_10m(),
   2880             BenchmarkProfile::proof_large_group(),
   2881             BenchmarkProfile::proof_join_storm(),
   2882             BenchmarkProfile::proof_slow_client(),
   2883         ] {
   2884             let error =
   2885                 BenchmarkRunReport::run(profile).expect_err("proof profile requires evidence");
   2886 
   2887             assert!(error.contains("target hardware evidence is required"));
   2888         }
   2889     }
   2890 
   2891     #[test]
   2892     fn benchmark_threshold_validation_rejects_missing_or_failed_scenarios() {
   2893         let scenarios = vec![
   2894             passing_scenario(SCENARIO_POCKET_QUERY_VISIBLE_EVENTS),
   2895             ScenarioReport::new(
   2896                 SCENARIO_GROUP_READ_GATE_OVERHEAD,
   2897                 1,
   2898                 1,
   2899                 0,
   2900                 10,
   2901                 vec![BenchmarkThresholds::smoke().read_gate_p95_micros + 1],
   2902                 128,
   2903             ),
   2904             passing_scenario(SCENARIO_COUNT_RESOURCE_CONTROLS),
   2905             passing_scenario(SCENARIO_PROJECTION_REBUILD),
   2906             passing_scenario(SCENARIO_OUTBOX_REPLAY),
   2907             passing_scenario(SCENARIO_BROADCAST_LAG),
   2908             passing_scenario(SCENARIO_MEMORY_PROFILE),
   2909         ];
   2910         let failed =
   2911             super::validation_summary(&scenarios, BenchmarkThresholds::smoke()).expect_err("fail");
   2912         assert!(failed.contains(SCENARIO_GROUP_READ_GATE_OVERHEAD));
   2913 
   2914         let missing = super::validation_summary(
   2915             &scenarios[..scenarios.len() - 1],
   2916             BenchmarkThresholds::smoke(),
   2917         )
   2918         .expect_err("missing");
   2919         assert!(missing.contains(SCENARIO_MEMORY_PROFILE));
   2920     }
   2921 
   2922     #[test]
   2923     fn benchmark_summary_json_matches_report_template_surface() {
   2924         let report = BenchmarkRunReport::run(smoke_profile(BenchDatasetConfig::new(3, 1, 1, 1, 1)))
   2925             .expect("report");
   2926         let summary = report.summary_json("unit-run", std::path::Path::new(".local/unit"));
   2927 
   2928         assert_eq!(summary["schema"], 2);
   2929         assert_eq!(summary["run_id"], "unit-run");
   2930         assert_eq!(summary["profile"], "smoke");
   2931         assert_eq!(summary["threshold_source"], "builtin:smoke");
   2932         assert_eq!(
   2933             summary["pocket_source"]["repository"],
   2934             POCKET_SOURCE_REPOSITORY
   2935         );
   2936         assert_eq!(summary["pocket_source"]["revision"], POCKET_SOURCE_REVISION);
   2937         assert_eq!(summary["proof_claim"]["eligible"], false);
   2938         assert_eq!(summary["proof_claim"]["target_hardware_evidence"], "absent");
   2939         assert_eq!(
   2940             summary["dataset"]["fixture_family"],
   2941             "synthetic repo-owned fixtures"
   2942         );
   2943         assert_eq!(
   2944             summary["dataset_profile"]["fixture_family"],
   2945             "synthetic repo-owned fixtures"
   2946         );
   2947         assert_eq!(summary["scenarios"].as_array().expect("scenarios").len(), 7);
   2948         let first_scenario = &summary["scenarios"]
   2949             .as_array()
   2950             .expect("scenarios")
   2951             .first()
   2952             .expect("first scenario");
   2953         assert_eq!(first_scenario["status"], "pass");
   2954         assert!(first_scenario["p50_micros"].as_u64().expect("p50") > 0);
   2955         assert!(first_scenario["p95_micros"].as_u64().expect("p95") > 0);
   2956         assert!(first_scenario["p99_micros"].as_u64().expect("p99") > 0);
   2957         assert!(
   2958             first_scenario["query_metrics"]["candidates_scanned"]
   2959                 .as_u64()
   2960                 .expect("candidates")
   2961                 > 0
   2962         );
   2963         assert!(
   2964             first_scenario["query_metrics"]["events_returned"]
   2965                 .as_u64()
   2966                 .expect("returned")
   2967                 > 0
   2968         );
   2969         assert!(
   2970             first_scenario["memory"]["max_rss_bytes"]
   2971                 .as_u64()
   2972                 .expect("memory")
   2973                 > 0
   2974         );
   2975         assert_eq!(
   2976             summary["validation_summary"][SCENARIO_POCKET_QUERY_VISIBLE_EVENTS],
   2977             "pass"
   2978         );
   2979         assert_eq!(summary["pass_fail_summary"]["overall_status"], "pass");
   2980         assert!(
   2981             summary["thresholds"]["read_gate_p95_micros"]
   2982                 .as_u64()
   2983                 .expect("threshold")
   2984                 > 0
   2985         );
   2986         assert_eq!(
   2987             summary["artifacts"]["dataset_events_jsonl"],
   2988             "dataset-events.jsonl"
   2989         );
   2990         assert_eq!(
   2991             summary["tangle_v1_mvp"]["benchmark_evidence"]["status"],
   2992             "not_run"
   2993         );
   2994         assert_eq!(
   2995             summary["tangle_v1_mvp"]["benchmark_evidence"]["required_profile"],
   2996             "virtual-relay-tenancy"
   2997         );
   2998         assert_eq!(
   2999             summary["tangle_v1_mvp"]["external_integration"]["runner"],
   3000             "radroots_testing_tangle_integration"
   3001         );
   3002         assert_eq!(
   3003             summary["tangle_v1_mvp"]["external_integration"]["local_status"],
   3004             "unavailable"
   3005         );
   3006     }
   3007 
   3008     #[test]
   3009     fn virtual_relay_tenancy_profile_contract_matches_mvp_target() {
   3010         let config = VirtualRelayTenancyConfig::mvp();
   3011 
   3012         assert_eq!(config.tenant_count, 10);
   3013         assert_eq!(config.aggregate_active_subscriptions(), 20_000);
   3014         assert_eq!(config.busiest_tenant_active_subscriptions(), 2_000);
   3015         assert_eq!(config.one_percent_fanout(), 200);
   3016         assert_eq!(config.ten_percent_fanout(), 2_000);
   3017         assert!(config.validate().is_ok());
   3018         assert_eq!(
   3019             BenchmarkProfile::virtual_relay_tenancy().name(),
   3020             BenchmarkProfileName::VirtualRelayTenancy
   3021         );
   3022     }
   3023 
   3024     #[test]
   3025     fn virtual_relay_tenancy_summary_requires_three_fanout_scenarios() {
   3026         let summary = super::virtual_relay_tenancy_summary_json(
   3027             BenchmarkProfileName::VirtualRelayTenancy,
   3028             &[
   3029                 passing_scenario(SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT),
   3030                 passing_scenario(SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT),
   3031                 passing_scenario(SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT),
   3032             ],
   3033         );
   3034 
   3035         assert_eq!(summary["status"], "pass");
   3036         assert_eq!(summary["target"]["tenant_count"], 10);
   3037         assert_eq!(summary["target"]["aggregate_active_subscriptions"], 20_000);
   3038         assert_eq!(
   3039             summary["target"]["busiest_tenant_active_subscriptions"],
   3040             2_000
   3041         );
   3042         assert_eq!(
   3043             summary["fanout_scenarios"]
   3044                 .as_array()
   3045                 .expect("fanout scenarios")
   3046                 .len(),
   3047             3
   3048         );
   3049     }
   3050 
   3051     #[test]
   3052     fn count_resource_controls_scenario_accepts_bounded_counts_and_refuses_broad_counts() {
   3053         let dataset =
   3054             BenchDataset::generate(BenchDatasetConfig::new(3, 101, 101, 0, 1)).expect("dataset");
   3055         let scenario =
   3056             super::run_count_resource_control_benchmark(&dataset).expect("count controls");
   3057 
   3058         assert_eq!(scenario.scenario, SCENARIO_COUNT_RESOURCE_CONTROLS);
   3059         assert_eq!(scenario.attempted, 5);
   3060         assert_eq!(scenario.accepted, scenario.attempted);
   3061         assert_eq!(scenario.rejected, 0);
   3062     }
   3063 
   3064     #[test]
   3065     fn benchmark_pocket_source_matches_store_boundary() {
   3066         assert_eq!(
   3067             POCKET_SOURCE_REPOSITORY,
   3068             tangle_store_pocket::POCKET_SOURCE_REPOSITORY
   3069         );
   3070         assert_eq!(
   3071             POCKET_SOURCE_REVISION,
   3072             tangle_store_pocket::POCKET_SOURCE_REVISION
   3073         );
   3074     }
   3075 
   3076     #[test]
   3077     fn projection_rebuild_scenario_recreates_groups_and_members() {
   3078         let dataset =
   3079             BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 2)).expect("dataset");
   3080         let scenario = super::run_projection_rebuild_benchmark(&dataset).expect("rebuild");
   3081 
   3082         assert_eq!(scenario.scenario, SCENARIO_PROJECTION_REBUILD);
   3083         assert_eq!(scenario.accepted, 1);
   3084         assert_eq!(scenario.rejected, 0);
   3085     }
   3086 
   3087     #[test]
   3088     fn outbox_replay_scenario_keeps_generated_state_idempotent() {
   3089         let dataset =
   3090             BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 1)).expect("dataset");
   3091         let scenario = super::run_outbox_replay_benchmark(&dataset).expect("outbox");
   3092 
   3093         assert_eq!(scenario.scenario, SCENARIO_OUTBOX_REPLAY);
   3094         assert_eq!(scenario.accepted, 1);
   3095         assert_eq!(scenario.rejected, 0);
   3096     }
   3097 
   3098     #[test]
   3099     fn broadcast_lag_scenario_keeps_healthy_subscriptions_open() {
   3100         let dataset =
   3101             BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 1)).expect("dataset");
   3102         let scenario = super::run_broadcast_lag_benchmark(&dataset).expect("lag");
   3103 
   3104         assert_eq!(scenario.scenario, SCENARIO_BROADCAST_LAG);
   3105         assert_eq!(scenario.accepted, scenario.attempted);
   3106         assert_eq!(scenario.rejected, 0);
   3107     }
   3108 
   3109     #[test]
   3110     fn percentile_helper_handles_empty_and_sorted_samples() {
   3111         assert_eq!(super::percentile(&[], 95), 0);
   3112         assert_eq!(super::percentile(&[1, 2, 3, 4, 5], 50), 3);
   3113         assert_eq!(super::percentile(&[1, 2, 3, 4, 5], 95), 5);
   3114         assert_eq!(super::lower_hex(&[0, 15, 16, 255]), "000f10ff");
   3115     }
   3116 
   3117     #[test]
   3118     fn group_id_helper_accepts_dataset_group_names() {
   3119         let dataset =
   3120             BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 1)).expect("dataset");
   3121 
   3122         for group in dataset.groups() {
   3123             GroupId::new(group.id()).expect("group id");
   3124         }
   3125     }
   3126 
   3127     fn passing_scenario(name: &str) -> ScenarioReport {
   3128         ScenarioReport::new(name, 1, 1, 0, 10, vec![1], 128)
   3129     }
   3130 
   3131     fn smoke_profile(config: BenchDatasetConfig) -> BenchmarkProfile {
   3132         BenchmarkProfile::smoke()
   3133             .with_dataset_config(config)
   3134             .expect("smoke profile")
   3135     }
   3136 }