lib.rs (108173B)
1 #![forbid(unsafe_code)] 2 3 use pocket_types::json::json_escape; 4 use pocket_types::secp256k1::{Keypair, Secp256k1, SecretKey}; 5 use serde_json::json; 6 use sha2::{Digest, Sha256}; 7 use std::collections::BTreeMap; 8 use std::fs; 9 use std::path::{Path, PathBuf}; 10 use std::sync::atomic::{AtomicU64, Ordering}; 11 use std::time::Instant; 12 use tangle_groups::{ 13 KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, 14 KIND_GROUP_PUT_USER, MemberStatus, 15 }; 16 use tangle_protocol::{ 17 Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SignatureHex, SubscriptionId, Tag, 18 UnixTimestamp, UnsignedEvent, event_to_value, filter_from_value, filter_to_value, 19 }; 20 use tangle_runtime::{ 21 config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, 22 relay::{ 23 auth::BaseAuthState, 24 core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits}, 25 outbound::RuntimeRelayMessage, 26 }, 27 runtime::{RelayRuntime, RelayRuntimeHandle}, 28 }; 29 use tangle_store_pocket::{ 30 PocketEventId, PocketKind, PocketOwnedEvent, PocketOwnedFilter, PocketOwnedTags, PocketPubkey, 31 PocketQueryConfig, PocketSig, PocketStoreConfig, PocketSyncPolicy, PocketTime, 32 parse_pocket_event_json, parse_pocket_filter_json, 33 }; 34 use tangle_test_support::{FixtureKey, TANGLE_V2_RELAY_URL, tangle_v2_group_config}; 35 36 static TEMP_ID: AtomicU64 = AtomicU64::new(0); 37 38 pub const SCENARIO_POCKET_QUERY_VISIBLE_EVENTS: &str = "pocket_query_visible_events"; 39 pub const SCENARIO_GROUP_READ_GATE_OVERHEAD: &str = "group_read_gate_overhead"; 40 pub const SCENARIO_COUNT_RESOURCE_CONTROLS: &str = "count_resource_controls"; 41 pub const SCENARIO_PROJECTION_REBUILD: &str = "projection_rebuild"; 42 pub const SCENARIO_OUTBOX_REPLAY: &str = "outbox_replay"; 43 pub const SCENARIO_BROADCAST_LAG: &str = "broadcast_lag"; 44 pub const SCENARIO_MEMORY_PROFILE: &str = "memory_profile"; 45 pub const SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT: &str = "virtual_relay_fanout_1_percent"; 46 pub const SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT: &str = "virtual_relay_fanout_10_percent"; 47 pub const SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT: &str = "virtual_relay_fanout_100_percent"; 48 pub const POCKET_SOURCE_REPOSITORY: &str = "https://github.com/triesap/pocket"; 49 pub const POCKET_SOURCE_REVISION: &str = "329334f20948c796c6016b673b92551ac4855ad7"; 50 51 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 52 pub struct BenchDatasetConfig { 53 pub group_count: usize, 54 pub public_events_per_group: usize, 55 pub private_events_per_group: usize, 56 pub public_note_count: usize, 57 pub member_count: usize, 58 } 59 60 impl BenchDatasetConfig { 61 pub fn new( 62 group_count: usize, 63 public_events_per_group: usize, 64 private_events_per_group: usize, 65 public_note_count: usize, 66 member_count: usize, 67 ) -> Self { 68 Self { 69 group_count, 70 public_events_per_group, 71 private_events_per_group, 72 public_note_count, 73 member_count, 74 } 75 } 76 77 pub fn smoke() -> Self { 78 Self::new(6, 4, 3, 6, 3) 79 } 80 81 pub fn medium() -> Self { 82 Self::new(24, 8, 6, 24, 5) 83 } 84 85 pub fn large_smoke() -> Self { 86 Self::new(120, 24, 16, 120, 12) 87 } 88 89 pub fn proof_10m() -> Self { 90 Self::new(30_000, 100, 100, 6_670_000, 10) 91 } 92 93 pub fn proof_large_group() -> Self { 94 Self::new(3, 50, 50, 10_000, 100_000) 95 } 96 97 pub fn proof_join_storm() -> Self { 98 Self::new(25_000, 1, 1, 25_000, 40) 99 } 100 101 pub fn proof_slow_client() -> Self { 102 Self::new(50_000, 2, 1, 50_000, 2) 103 } 104 105 pub fn validate(self) -> Result<Self, String> { 106 if self.group_count < 3 { 107 return Err("group-count must be at least 3".to_owned()); 108 } 109 if self.public_events_per_group == 0 { 110 return Err("public-events-per-group must be greater than zero".to_owned()); 111 } 112 if self.private_events_per_group == 0 { 113 return Err("private-events-per-group must be greater than zero".to_owned()); 114 } 115 if self.member_count == 0 { 116 return Err("member-count must be greater than zero".to_owned()); 117 } 118 Ok(self) 119 } 120 121 pub fn estimated_source_event_count(self) -> u64 { 122 let public_groups = self.group_count.div_ceil(3); 123 let private_and_hidden_groups = self.group_count - public_groups; 124 let total = self.group_count 125 + self.group_count * self.member_count 126 + public_groups * self.public_events_per_group 127 + private_and_hidden_groups * self.private_events_per_group 128 + self.public_note_count; 129 total.try_into().expect("estimated event count fits in u64") 130 } 131 } 132 133 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 134 pub enum BenchmarkProfileName { 135 Smoke, 136 VirtualRelayTenancy, 137 Medium, 138 LargeSmoke, 139 Proof10m, 140 ProofLargeGroup, 141 ProofJoinStorm, 142 ProofSlowClient, 143 } 144 145 impl BenchmarkProfileName { 146 pub fn parse(value: &str) -> Result<Self, String> { 147 match value { 148 "smoke" => Ok(Self::Smoke), 149 "virtual-relay-tenancy" => Ok(Self::VirtualRelayTenancy), 150 "medium" => Ok(Self::Medium), 151 "large-smoke" => Ok(Self::LargeSmoke), 152 "proof-10m" => Ok(Self::Proof10m), 153 "proof-large-group" => Ok(Self::ProofLargeGroup), 154 "proof-join-storm" => Ok(Self::ProofJoinStorm), 155 "proof-slow-client" => Ok(Self::ProofSlowClient), 156 _ => Err(format!( 157 "unknown benchmark profile `{value}`; expected smoke, virtual-relay-tenancy, medium, large-smoke, proof-10m, proof-large-group, proof-join-storm, or proof-slow-client" 158 )), 159 } 160 } 161 162 pub fn as_str(self) -> &'static str { 163 match self { 164 Self::Smoke => "smoke", 165 Self::VirtualRelayTenancy => "virtual-relay-tenancy", 166 Self::Medium => "medium", 167 Self::LargeSmoke => "large-smoke", 168 Self::Proof10m => "proof-10m", 169 Self::ProofLargeGroup => "proof-large-group", 170 Self::ProofJoinStorm => "proof-join-storm", 171 Self::ProofSlowClient => "proof-slow-client", 172 } 173 } 174 175 pub fn all() -> [Self; 8] { 176 [ 177 Self::Smoke, 178 Self::VirtualRelayTenancy, 179 Self::Medium, 180 Self::LargeSmoke, 181 Self::Proof10m, 182 Self::ProofLargeGroup, 183 Self::ProofJoinStorm, 184 Self::ProofSlowClient, 185 ] 186 } 187 188 pub fn is_proof(self) -> bool { 189 matches!( 190 self, 191 Self::Proof10m | Self::ProofLargeGroup | Self::ProofJoinStorm | Self::ProofSlowClient 192 ) 193 } 194 } 195 196 #[derive(Debug, Clone, PartialEq, Eq)] 197 pub struct BenchmarkProfile { 198 name: BenchmarkProfileName, 199 dataset_config: BenchDatasetConfig, 200 thresholds: BenchmarkThresholds, 201 threshold_source: String, 202 target_hardware_evidence: Option<String>, 203 } 204 205 impl BenchmarkProfile { 206 pub fn from_name(name: BenchmarkProfileName) -> Self { 207 match name { 208 BenchmarkProfileName::Smoke => Self::smoke(), 209 BenchmarkProfileName::VirtualRelayTenancy => Self::virtual_relay_tenancy(), 210 BenchmarkProfileName::Medium => Self::medium(), 211 BenchmarkProfileName::LargeSmoke => Self::large_smoke(), 212 BenchmarkProfileName::Proof10m => Self::proof_10m(), 213 BenchmarkProfileName::ProofLargeGroup => Self::proof_large_group(), 214 BenchmarkProfileName::ProofJoinStorm => Self::proof_join_storm(), 215 BenchmarkProfileName::ProofSlowClient => Self::proof_slow_client(), 216 } 217 } 218 219 pub fn smoke() -> Self { 220 Self::new( 221 BenchmarkProfileName::Smoke, 222 BenchDatasetConfig::smoke(), 223 BenchmarkThresholds::smoke(), 224 ) 225 } 226 227 pub fn virtual_relay_tenancy() -> Self { 228 Self::new( 229 BenchmarkProfileName::VirtualRelayTenancy, 230 BenchDatasetConfig::smoke(), 231 BenchmarkThresholds::large_smoke(), 232 ) 233 } 234 235 pub fn medium() -> Self { 236 Self::new( 237 BenchmarkProfileName::Medium, 238 BenchDatasetConfig::medium(), 239 BenchmarkThresholds::medium(), 240 ) 241 } 242 243 pub fn large_smoke() -> Self { 244 Self::new( 245 BenchmarkProfileName::LargeSmoke, 246 BenchDatasetConfig::large_smoke(), 247 BenchmarkThresholds::large_smoke(), 248 ) 249 } 250 251 pub fn proof_10m() -> Self { 252 Self::new( 253 BenchmarkProfileName::Proof10m, 254 BenchDatasetConfig::proof_10m(), 255 BenchmarkThresholds::proof_10m(), 256 ) 257 } 258 259 pub fn proof_large_group() -> Self { 260 Self::new( 261 BenchmarkProfileName::ProofLargeGroup, 262 BenchDatasetConfig::proof_large_group(), 263 BenchmarkThresholds::proof_large_group(), 264 ) 265 } 266 267 pub fn proof_join_storm() -> Self { 268 Self::new( 269 BenchmarkProfileName::ProofJoinStorm, 270 BenchDatasetConfig::proof_join_storm(), 271 BenchmarkThresholds::proof_join_storm(), 272 ) 273 } 274 275 pub fn proof_slow_client() -> Self { 276 Self::new( 277 BenchmarkProfileName::ProofSlowClient, 278 BenchDatasetConfig::proof_slow_client(), 279 BenchmarkThresholds::proof_slow_client(), 280 ) 281 } 282 283 fn new( 284 name: BenchmarkProfileName, 285 dataset_config: BenchDatasetConfig, 286 thresholds: BenchmarkThresholds, 287 ) -> Self { 288 Self { 289 name, 290 dataset_config, 291 thresholds, 292 threshold_source: format!("builtin:{}", name.as_str()), 293 target_hardware_evidence: None, 294 } 295 } 296 297 pub fn name(&self) -> BenchmarkProfileName { 298 self.name 299 } 300 301 pub fn dataset_config(&self) -> BenchDatasetConfig { 302 self.dataset_config 303 } 304 305 pub fn thresholds(&self) -> BenchmarkThresholds { 306 self.thresholds 307 } 308 309 pub fn threshold_source(&self) -> &str { 310 &self.threshold_source 311 } 312 313 pub fn target_hardware_evidence(&self) -> Option<&str> { 314 self.target_hardware_evidence.as_deref() 315 } 316 317 pub fn requires_target_hardware_evidence(&self) -> bool { 318 self.name.is_proof() 319 } 320 321 pub fn validate_for_run(&self) -> Result<(), String> { 322 if self.requires_target_hardware_evidence() && self.target_hardware_evidence.is_none() { 323 return Err(format!( 324 "target hardware evidence is required for `{}` benchmark profile", 325 self.name.as_str() 326 )); 327 } 328 Ok(()) 329 } 330 331 pub fn with_dataset_config(mut self, config: BenchDatasetConfig) -> Result<Self, String> { 332 self.dataset_config = config.validate()?; 333 Ok(self) 334 } 335 336 pub fn with_thresholds( 337 mut self, 338 thresholds: BenchmarkThresholds, 339 source: impl Into<String>, 340 ) -> Result<Self, String> { 341 let source = source.into(); 342 if source.is_empty() { 343 return Err("benchmark threshold source must not be empty".to_owned()); 344 } 345 self.thresholds = thresholds; 346 self.threshold_source = source; 347 Ok(self) 348 } 349 350 pub fn with_target_hardware_evidence( 351 mut self, 352 evidence: impl Into<String>, 353 ) -> Result<Self, String> { 354 let evidence = evidence.into(); 355 if evidence.is_empty() { 356 return Err("target hardware evidence must not be empty".to_owned()); 357 } 358 self.target_hardware_evidence = Some(evidence); 359 Ok(self) 360 } 361 362 pub fn proof_claim_eligible(&self) -> bool { 363 self.name.is_proof() && self.target_hardware_evidence.is_some() 364 } 365 } 366 367 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 368 pub struct VirtualRelayTenancyConfig { 369 pub tenant_count: usize, 370 pub subscriptions_per_tenant: usize, 371 pub max_pending_events_per_subscription: usize, 372 } 373 374 impl VirtualRelayTenancyConfig { 375 pub fn mvp() -> Self { 376 Self { 377 tenant_count: 10, 378 subscriptions_per_tenant: 2_000, 379 max_pending_events_per_subscription: 16, 380 } 381 } 382 383 fn validate(self) -> Result<Self, String> { 384 if self.tenant_count < 10 { 385 return Err("virtual relay tenancy benchmark requires at least 10 tenants".to_owned()); 386 } 387 if self.subscriptions_per_tenant < 2_000 { 388 return Err( 389 "virtual relay tenancy benchmark requires at least 2,000 subscriptions per busiest tenant" 390 .to_owned(), 391 ); 392 } 393 if self.aggregate_active_subscriptions() < 20_000 { 394 return Err( 395 "virtual relay tenancy benchmark requires at least 20,000 aggregate subscriptions" 396 .to_owned(), 397 ); 398 } 399 if self.max_pending_events_per_subscription == 0 { 400 return Err( 401 "virtual relay tenancy benchmark pending event capacity must be greater than zero" 402 .to_owned(), 403 ); 404 } 405 Ok(self) 406 } 407 408 pub fn aggregate_active_subscriptions(self) -> usize { 409 self.tenant_count * self.subscriptions_per_tenant 410 } 411 412 pub fn busiest_tenant_active_subscriptions(self) -> usize { 413 self.subscriptions_per_tenant 414 } 415 416 fn one_percent_fanout(self) -> usize { 417 self.aggregate_active_subscriptions() / 100 418 } 419 420 fn ten_percent_fanout(self) -> usize { 421 self.aggregate_active_subscriptions() / 10 422 } 423 424 fn to_json(self) -> serde_json::Value { 425 json!({ 426 "tenant_count": self.tenant_count, 427 "aggregate_active_subscriptions": self.aggregate_active_subscriptions(), 428 "busiest_tenant_active_subscriptions": self.busiest_tenant_active_subscriptions(), 429 "max_pending_events_per_subscription": self.max_pending_events_per_subscription 430 }) 431 } 432 } 433 434 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 435 pub enum BenchGroupVisibility { 436 Public, 437 Private, 438 Hidden, 439 } 440 441 impl BenchGroupVisibility { 442 pub fn as_str(self) -> &'static str { 443 match self { 444 Self::Public => "public", 445 Self::Private => "private", 446 Self::Hidden => "hidden", 447 } 448 } 449 450 fn flags(self) -> &'static [&'static str] { 451 match self { 452 Self::Public => &[], 453 Self::Private => &["private"], 454 Self::Hidden => &["hidden"], 455 } 456 } 457 } 458 459 #[derive(Debug, Clone, PartialEq, Eq)] 460 pub struct BenchGroup { 461 id: String, 462 visibility: BenchGroupVisibility, 463 } 464 465 impl BenchGroup { 466 pub fn id(&self) -> &str { 467 &self.id 468 } 469 470 pub fn visibility(&self) -> BenchGroupVisibility { 471 self.visibility 472 } 473 } 474 475 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 476 pub enum BenchEventAuth { 477 None, 478 Owner, 479 Admin, 480 } 481 482 #[derive(Debug, Clone, PartialEq, Eq)] 483 pub struct BenchSourceEvent { 484 event: Event, 485 auth: BenchEventAuth, 486 } 487 488 impl BenchSourceEvent { 489 pub fn event(&self) -> &Event { 490 &self.event 491 } 492 493 pub fn auth(&self) -> BenchEventAuth { 494 self.auth 495 } 496 } 497 498 #[derive(Debug, Clone, PartialEq, Eq)] 499 pub struct BenchDataset { 500 config: BenchDatasetConfig, 501 groups: Vec<BenchGroup>, 502 group_create_events: Vec<BenchSourceEvent>, 503 membership_events: Vec<BenchSourceEvent>, 504 group_timeline_events: Vec<BenchSourceEvent>, 505 public_note_events: Vec<BenchSourceEvent>, 506 } 507 508 impl BenchDataset { 509 pub fn generate(config: BenchDatasetConfig) -> Result<Self, String> { 510 let config = config.validate()?; 511 let groups = (0..config.group_count) 512 .map(|index| BenchGroup { 513 id: format!("BenchFarm{index:04}"), 514 visibility: group_visibility(index), 515 }) 516 .collect::<Vec<_>>(); 517 let mut group_create_events = Vec::with_capacity(groups.len()); 518 let mut membership_events = Vec::with_capacity(groups.len() * config.member_count); 519 let mut group_timeline_events = Vec::new(); 520 let mut public_note_events = Vec::with_capacity(config.public_note_count); 521 522 for (group_index, group) in groups.iter().enumerate() { 523 group_create_events.push(BenchSourceEvent { 524 event: pocket_protocol_group_create_event( 525 FixtureKey::Owner, 526 &group.id, 527 1_714_200_000 + u64::try_from(group_index).expect("group index fits in u64"), 528 group.visibility.flags(), 529 )?, 530 auth: BenchEventAuth::Owner, 531 }); 532 for member_index in 0..config.member_count { 533 membership_events.push(BenchSourceEvent { 534 event: bench_member_event(&group.id, group_index, member_index, 1_714_300_000)?, 535 auth: BenchEventAuth::Admin, 536 }); 537 } 538 let per_group = match group.visibility { 539 BenchGroupVisibility::Public => config.public_events_per_group, 540 BenchGroupVisibility::Private | BenchGroupVisibility::Hidden => { 541 config.private_events_per_group 542 } 543 }; 544 for event_index in 0..per_group { 545 let created_at = 1_714_400_000 546 + u64::try_from(group_index * 10_000 + event_index) 547 .expect("event index fits in u64"); 548 group_timeline_events.push(BenchSourceEvent { 549 event: pocket_protocol_group_event( 550 FixtureKey::Owner, 551 &group.id, 552 created_at, 553 1, 554 &format!( 555 "bench {} group event {group_index:04}-{event_index:04}", 556 group.visibility.as_str() 557 ), 558 )?, 559 auth: BenchEventAuth::Owner, 560 }); 561 } 562 } 563 564 for index in 0..config.public_note_count { 565 public_note_events.push(BenchSourceEvent { 566 event: pocket_protocol_event( 567 FixtureKey::Outsider, 568 1_714_500_000 + u64::try_from(index).expect("note index fits in u64"), 569 1, 570 vec![ 571 Tag::from_parts("t", &["tangle-bench"]) 572 .map_err(|error| error.to_string())?, 573 ], 574 &format!("bench public note {index:04}"), 575 )?, 576 auth: BenchEventAuth::None, 577 }); 578 } 579 580 Ok(Self { 581 config, 582 groups, 583 group_create_events, 584 membership_events, 585 group_timeline_events, 586 public_note_events, 587 }) 588 } 589 590 pub fn config(&self) -> BenchDatasetConfig { 591 self.config 592 } 593 594 pub fn groups(&self) -> &[BenchGroup] { 595 &self.groups 596 } 597 598 pub fn source_events(&self) -> Vec<&BenchSourceEvent> { 599 self.group_create_events 600 .iter() 601 .chain(self.membership_events.iter()) 602 .chain(self.group_timeline_events.iter()) 603 .chain(self.public_note_events.iter()) 604 .collect() 605 } 606 607 pub fn source_event_count(&self) -> u64 { 608 self.source_events() 609 .len() 610 .try_into() 611 .expect("source event count fits in u64") 612 } 613 614 pub fn group_event_count(&self) -> u64 { 615 (self.group_create_events.len() 616 + self.membership_events.len() 617 + self.group_timeline_events.len()) 618 .try_into() 619 .expect("group event count fits in u64") 620 } 621 622 pub fn membership_event_count(&self) -> u64 { 623 self.membership_events 624 .len() 625 .try_into() 626 .expect("membership event count fits in u64") 627 } 628 629 pub fn largest_group_members(&self) -> u64 { 630 self.config 631 .member_count 632 .try_into() 633 .expect("member count fits in u64") 634 } 635 636 pub fn dataset_digest(&self) -> Result<String, String> { 637 let mut hasher = Sha256::new(); 638 for event in self.source_events() { 639 let raw = serde_json::to_string(&event_to_value(event.event())) 640 .map_err(|error| error.to_string())?; 641 hasher.update(raw.as_bytes()); 642 hasher.update(b"\n"); 643 } 644 Ok(lower_hex(&hasher.finalize())) 645 } 646 647 pub fn source_events_jsonl(&self) -> Result<String, String> { 648 let mut output = String::new(); 649 for source in self.source_events() { 650 let raw = serde_json::to_string(&event_to_value(source.event())) 651 .map_err(|error| error.to_string())?; 652 output.push_str(&raw); 653 output.push('\n'); 654 } 655 Ok(output) 656 } 657 658 fn first_group(&self, visibility: BenchGroupVisibility) -> Result<&BenchGroup, String> { 659 self.groups 660 .iter() 661 .find(|group| group.visibility == visibility) 662 .ok_or_else(|| format!("dataset does not include {} group", visibility.as_str())) 663 } 664 665 fn first_timeline_event(&self, visibility: BenchGroupVisibility) -> Result<&Event, String> { 666 let group = self.first_group(visibility)?; 667 self.group_timeline_events 668 .iter() 669 .find(|source| event_has_group(source.event(), group.id())) 670 .map(BenchSourceEvent::event) 671 .ok_or_else(|| { 672 format!( 673 "dataset does not include {} timeline event", 674 visibility.as_str() 675 ) 676 }) 677 } 678 } 679 680 #[derive(Debug, Clone, PartialEq, Eq)] 681 pub struct DatasetProfile { 682 pub total_events: u64, 683 pub group_events: u64, 684 pub groups: u64, 685 pub memberships: u64, 686 pub largest_group_members: u64, 687 pub dataset_digest: String, 688 pub fixture_family: String, 689 } 690 691 impl DatasetProfile { 692 fn from_dataset(dataset: &BenchDataset) -> Result<Self, String> { 693 Ok(Self { 694 total_events: dataset.source_event_count(), 695 group_events: dataset.group_event_count(), 696 groups: dataset 697 .groups() 698 .len() 699 .try_into() 700 .expect("group count fits in u64"), 701 memberships: dataset.membership_event_count(), 702 largest_group_members: dataset.largest_group_members(), 703 dataset_digest: dataset.dataset_digest()?, 704 fixture_family: "synthetic repo-owned fixtures".to_owned(), 705 }) 706 } 707 708 fn to_json(&self) -> serde_json::Value { 709 json!({ 710 "total_events": self.total_events, 711 "group_events": self.group_events, 712 "groups": self.groups, 713 "memberships": self.memberships, 714 "largest_group_members": self.largest_group_members, 715 "dataset_digest": self.dataset_digest, 716 "fixture_family": self.fixture_family 717 }) 718 } 719 } 720 721 #[derive(Debug, Clone, PartialEq)] 722 pub struct ScenarioReport { 723 pub scenario: String, 724 pub attempted: u64, 725 pub accepted: u64, 726 pub rejected: u64, 727 pub elapsed_micros: u64, 728 pub events_per_second: f64, 729 pub p50_micros: u64, 730 pub p95_micros: u64, 731 pub p99_micros: u64, 732 pub max_rss_bytes: u64, 733 pub observations: BTreeMap<String, serde_json::Value>, 734 } 735 736 impl ScenarioReport { 737 fn new( 738 scenario: &str, 739 attempted: u64, 740 accepted: u64, 741 rejected: u64, 742 elapsed_micros: u64, 743 mut samples: Vec<u64>, 744 max_rss_bytes: u64, 745 ) -> Self { 746 samples.sort_unstable(); 747 let events_per_second = if elapsed_micros == 0 { 748 0.0 749 } else { 750 attempted as f64 * 1_000_000.0 / elapsed_micros as f64 751 }; 752 Self { 753 scenario: scenario.to_owned(), 754 attempted, 755 accepted, 756 rejected, 757 elapsed_micros, 758 events_per_second, 759 p50_micros: percentile(&samples, 50), 760 p95_micros: percentile(&samples, 95), 761 p99_micros: percentile(&samples, 99), 762 max_rss_bytes, 763 observations: BTreeMap::new(), 764 } 765 } 766 767 fn with_observations(mut self, observations: BTreeMap<String, serde_json::Value>) -> Self { 768 self.observations = observations; 769 self 770 } 771 772 fn pass_latency_gate(&self, p95_threshold_micros: u64) -> bool { 773 self.rejected == 0 774 && self.accepted == self.attempted 775 && self.p95_micros <= p95_threshold_micros 776 } 777 778 fn pass_elapsed_gate(&self, elapsed_threshold_micros: u64) -> bool { 779 self.rejected == 0 780 && self.accepted == self.attempted 781 && self.elapsed_micros <= elapsed_threshold_micros 782 } 783 784 fn pass_memory_gate(&self, max_bytes: u64) -> bool { 785 self.rejected == 0 && self.accepted == self.attempted && self.max_rss_bytes <= max_bytes 786 } 787 788 fn to_json(&self) -> serde_json::Value { 789 json!({ 790 "scenario": self.scenario, 791 "status": status(self.accepted == self.attempted && self.rejected == 0), 792 "attempted": self.attempted, 793 "accepted": self.accepted, 794 "rejected": self.rejected, 795 "elapsed_micros": self.elapsed_micros, 796 "events_per_second": self.events_per_second, 797 "p50_micros": self.p50_micros, 798 "p95_micros": self.p95_micros, 799 "p99_micros": self.p99_micros, 800 "max_rss_bytes": self.max_rss_bytes, 801 "query_metrics": { 802 "candidates_scanned": self.attempted, 803 "events_returned": self.accepted, 804 "events_rejected": self.rejected 805 }, 806 "memory": { 807 "max_rss_bytes": self.max_rss_bytes 808 }, 809 "observations": &self.observations 810 }) 811 } 812 } 813 814 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 815 pub struct BenchmarkThresholds { 816 pub pocket_query_p95_micros: u64, 817 pub read_gate_p95_micros: u64, 818 pub count_resource_controls_p95_micros: u64, 819 pub projection_rebuild_elapsed_micros: u64, 820 pub outbox_replay_elapsed_micros: u64, 821 pub broadcast_lag_p95_micros: u64, 822 pub memory_profile_max_bytes: u64, 823 } 824 825 impl BenchmarkThresholds { 826 pub fn smoke() -> Self { 827 Self { 828 pocket_query_p95_micros: 1_000_000, 829 read_gate_p95_micros: 1_000_000, 830 count_resource_controls_p95_micros: 1_000_000, 831 projection_rebuild_elapsed_micros: 5_000_000, 832 outbox_replay_elapsed_micros: 5_000_000, 833 broadcast_lag_p95_micros: 1_000_000, 834 memory_profile_max_bytes: 512 * 1024 * 1024, 835 } 836 } 837 838 pub fn medium() -> Self { 839 Self { 840 pocket_query_p95_micros: 2_500_000, 841 read_gate_p95_micros: 2_500_000, 842 count_resource_controls_p95_micros: 2_500_000, 843 projection_rebuild_elapsed_micros: 15_000_000, 844 outbox_replay_elapsed_micros: 15_000_000, 845 broadcast_lag_p95_micros: 2_500_000, 846 memory_profile_max_bytes: 768 * 1024 * 1024, 847 } 848 } 849 850 pub fn large_smoke() -> Self { 851 Self { 852 pocket_query_p95_micros: 5_000_000, 853 read_gate_p95_micros: 5_000_000, 854 count_resource_controls_p95_micros: 5_000_000, 855 projection_rebuild_elapsed_micros: 60_000_000, 856 outbox_replay_elapsed_micros: 60_000_000, 857 broadcast_lag_p95_micros: 5_000_000, 858 memory_profile_max_bytes: 1024 * 1024 * 1024, 859 } 860 } 861 862 pub fn proof_10m() -> Self { 863 Self { 864 pocket_query_p95_micros: 10_000_000, 865 read_gate_p95_micros: 10_000_000, 866 count_resource_controls_p95_micros: 10_000_000, 867 projection_rebuild_elapsed_micros: 300_000_000, 868 outbox_replay_elapsed_micros: 300_000_000, 869 broadcast_lag_p95_micros: 10_000_000, 870 memory_profile_max_bytes: 16 * 1024 * 1024 * 1024, 871 } 872 } 873 874 pub fn proof_large_group() -> Self { 875 Self { 876 pocket_query_p95_micros: 10_000_000, 877 read_gate_p95_micros: 10_000_000, 878 count_resource_controls_p95_micros: 10_000_000, 879 projection_rebuild_elapsed_micros: 300_000_000, 880 outbox_replay_elapsed_micros: 300_000_000, 881 broadcast_lag_p95_micros: 10_000_000, 882 memory_profile_max_bytes: 16 * 1024 * 1024 * 1024, 883 } 884 } 885 886 pub fn proof_join_storm() -> Self { 887 Self { 888 pocket_query_p95_micros: 10_000_000, 889 read_gate_p95_micros: 10_000_000, 890 count_resource_controls_p95_micros: 10_000_000, 891 projection_rebuild_elapsed_micros: 300_000_000, 892 outbox_replay_elapsed_micros: 300_000_000, 893 broadcast_lag_p95_micros: 10_000_000, 894 memory_profile_max_bytes: 16 * 1024 * 1024 * 1024, 895 } 896 } 897 898 pub fn proof_slow_client() -> Self { 899 Self { 900 pocket_query_p95_micros: 10_000_000, 901 read_gate_p95_micros: 10_000_000, 902 count_resource_controls_p95_micros: 10_000_000, 903 projection_rebuild_elapsed_micros: 300_000_000, 904 outbox_replay_elapsed_micros: 300_000_000, 905 broadcast_lag_p95_micros: 10_000_000, 906 memory_profile_max_bytes: 16 * 1024 * 1024 * 1024, 907 } 908 } 909 910 pub fn from_json_str(raw: &str) -> Result<Self, String> { 911 let value = serde_json::from_str::<serde_json::Value>(raw) 912 .map_err(|error| format!("benchmark thresholds JSON is invalid: {error}"))?; 913 Self::from_json_value(&value) 914 } 915 916 pub fn from_json_value(value: &serde_json::Value) -> Result<Self, String> { 917 let object = value 918 .as_object() 919 .ok_or_else(|| "benchmark thresholds JSON must be an object".to_owned())?; 920 for key in object.keys() { 921 if !benchmark_threshold_fields().contains(&key.as_str()) { 922 return Err(format!("unknown benchmark threshold field `{key}`")); 923 } 924 } 925 Ok(Self { 926 pocket_query_p95_micros: threshold_u64(value, "pocket_query_p95_micros")?, 927 read_gate_p95_micros: threshold_u64(value, "read_gate_p95_micros")?, 928 count_resource_controls_p95_micros: threshold_u64( 929 value, 930 "count_resource_controls_p95_micros", 931 )?, 932 projection_rebuild_elapsed_micros: threshold_u64( 933 value, 934 "projection_rebuild_elapsed_micros", 935 )?, 936 outbox_replay_elapsed_micros: threshold_u64(value, "outbox_replay_elapsed_micros")?, 937 broadcast_lag_p95_micros: threshold_u64(value, "broadcast_lag_p95_micros")?, 938 memory_profile_max_bytes: threshold_u64(value, "memory_profile_max_bytes")?, 939 }) 940 } 941 942 pub fn to_json(self) -> serde_json::Value { 943 json!({ 944 "pocket_query_p95_micros": self.pocket_query_p95_micros, 945 "read_gate_p95_micros": self.read_gate_p95_micros, 946 "count_resource_controls_p95_micros": self.count_resource_controls_p95_micros, 947 "projection_rebuild_elapsed_micros": self.projection_rebuild_elapsed_micros, 948 "outbox_replay_elapsed_micros": self.outbox_replay_elapsed_micros, 949 "broadcast_lag_p95_micros": self.broadcast_lag_p95_micros, 950 "memory_profile_max_bytes": self.memory_profile_max_bytes 951 }) 952 } 953 } 954 955 fn benchmark_threshold_fields() -> [&'static str; 7] { 956 [ 957 "pocket_query_p95_micros", 958 "read_gate_p95_micros", 959 "count_resource_controls_p95_micros", 960 "projection_rebuild_elapsed_micros", 961 "outbox_replay_elapsed_micros", 962 "broadcast_lag_p95_micros", 963 "memory_profile_max_bytes", 964 ] 965 } 966 967 fn threshold_u64(value: &serde_json::Value, field: &str) -> Result<u64, String> { 968 let value = value 969 .get(field) 970 .ok_or_else(|| format!("missing benchmark threshold field `{field}`"))?; 971 let Some(value) = value.as_u64() else { 972 return Err(format!( 973 "benchmark threshold field `{field}` must be an unsigned integer" 974 )); 975 }; 976 if value == 0 { 977 return Err(format!( 978 "benchmark threshold field `{field}` must be greater than zero" 979 )); 980 } 981 Ok(value) 982 } 983 984 #[derive(Debug, Clone, PartialEq)] 985 pub struct BenchmarkRunReport { 986 dataset: BenchDataset, 987 dataset_profile: DatasetProfile, 988 profile: BenchmarkProfile, 989 scenarios: Vec<ScenarioReport>, 990 validation_summary: BTreeMap<String, String>, 991 } 992 993 impl BenchmarkRunReport { 994 pub fn run(profile: BenchmarkProfile) -> Result<Self, String> { 995 profile.validate_for_run()?; 996 let dataset = BenchDataset::generate(profile.dataset_config())?; 997 let thresholds = profile.thresholds(); 998 let pocket_query = run_pocket_query_benchmark(&dataset)?; 999 let read_gate = run_read_gate_benchmark(&dataset)?; 1000 let count_resource_controls = run_count_resource_control_benchmark(&dataset)?; 1001 let projection_rebuild = run_projection_rebuild_benchmark(&dataset)?; 1002 let outbox_replay = run_outbox_replay_benchmark(&dataset)?; 1003 let broadcast_lag = run_broadcast_lag_benchmark(&dataset)?; 1004 let memory_profile = run_memory_profile_benchmark(&dataset)?; 1005 let mut scenarios = vec![ 1006 pocket_query, 1007 read_gate, 1008 count_resource_controls, 1009 projection_rebuild, 1010 outbox_replay, 1011 broadcast_lag, 1012 memory_profile, 1013 ]; 1014 if profile.name() == BenchmarkProfileName::VirtualRelayTenancy { 1015 scenarios.extend(run_virtual_relay_tenancy_benchmarks( 1016 VirtualRelayTenancyConfig::mvp(), 1017 )?); 1018 } 1019 let validation_summary = validation_summary(&scenarios, thresholds)?; 1020 let dataset_profile = DatasetProfile::from_dataset(&dataset)?; 1021 Ok(Self { 1022 dataset, 1023 dataset_profile, 1024 profile, 1025 scenarios, 1026 validation_summary, 1027 }) 1028 } 1029 1030 pub fn dataset(&self) -> &BenchDataset { 1031 &self.dataset 1032 } 1033 1034 pub fn dataset_profile(&self) -> &DatasetProfile { 1035 &self.dataset_profile 1036 } 1037 1038 pub fn profile(&self) -> &BenchmarkProfile { 1039 &self.profile 1040 } 1041 1042 pub fn scenarios(&self) -> &[ScenarioReport] { 1043 &self.scenarios 1044 } 1045 1046 pub fn scenario(&self, name: &str) -> Option<&ScenarioReport> { 1047 self.scenarios 1048 .iter() 1049 .find(|scenario| scenario.scenario == name) 1050 } 1051 1052 pub fn validation_summary(&self) -> &BTreeMap<String, String> { 1053 &self.validation_summary 1054 } 1055 1056 pub fn summary_json(&self, run_id: &str, artifact_directory: &Path) -> serde_json::Value { 1057 json!({ 1058 "schema": 2, 1059 "run_id": run_id, 1060 "artifact_directory": artifact_directory.to_string_lossy(), 1061 "profile": self.profile.name().as_str(), 1062 "dataset": self.dataset_profile.to_json(), 1063 "dataset_profile": self.dataset_profile.to_json(), 1064 "scenarios": self.scenarios.iter().map(ScenarioReport::to_json).collect::<Vec<_>>(), 1065 "pocket_source": pocket_source_json(), 1066 "threshold_source": self.profile.threshold_source(), 1067 "thresholds": self.profile.thresholds().to_json(), 1068 "validation_summary": self.validation_summary, 1069 "pass_fail_summary": { 1070 "overall_status": validation_overall_status(&self.validation_summary), 1071 "passed_scenarios": self 1072 .validation_summary 1073 .values() 1074 .filter(|value| value.as_str() == "pass") 1075 .count(), 1076 "failed_scenarios": self 1077 .validation_summary 1078 .values() 1079 .filter(|value| value.as_str() == "fail") 1080 .count() 1081 }, 1082 "proof_claim": { 1083 "eligible": self.profile.proof_claim_eligible(), 1084 "profile_required": "proof-*", 1085 "target_hardware_evidence": self 1086 .profile 1087 .target_hardware_evidence() 1088 .unwrap_or("absent") 1089 }, 1090 "tangle_v1_mvp": { 1091 "benchmark_evidence": virtual_relay_tenancy_summary_json( 1092 self.profile.name(), 1093 &self.scenarios 1094 ), 1095 "external_integration": external_integration_json() 1096 }, 1097 "artifacts": { 1098 "summary_json": "summary.json", 1099 "dataset_events_jsonl": "dataset-events.jsonl" 1100 } 1101 }) 1102 } 1103 } 1104 1105 fn pocket_source_json() -> serde_json::Value { 1106 json!({ 1107 "repository": POCKET_SOURCE_REPOSITORY, 1108 "revision": POCKET_SOURCE_REVISION, 1109 "crates": ["pocket-db", "pocket-types"] 1110 }) 1111 } 1112 1113 fn virtual_relay_tenancy_summary_json( 1114 profile_name: BenchmarkProfileName, 1115 scenarios: &[ScenarioReport], 1116 ) -> serde_json::Value { 1117 let fanout_scenarios = [ 1118 SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT, 1119 SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT, 1120 SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT, 1121 ] 1122 .into_iter() 1123 .filter_map(|name| scenarios.iter().find(|scenario| scenario.scenario == name)) 1124 .map(ScenarioReport::to_json) 1125 .collect::<Vec<_>>(); 1126 let status = if profile_name == BenchmarkProfileName::VirtualRelayTenancy 1127 && fanout_scenarios.len() == 3 1128 { 1129 "pass" 1130 } else { 1131 "not_run" 1132 }; 1133 json!({ 1134 "status": status, 1135 "required_profile": BenchmarkProfileName::VirtualRelayTenancy.as_str(), 1136 "configured_profile": profile_name.as_str(), 1137 "target": VirtualRelayTenancyConfig::mvp().to_json(), 1138 "fanout_scenarios": fanout_scenarios 1139 }) 1140 } 1141 1142 fn external_integration_json() -> serde_json::Value { 1143 json!({ 1144 "runner": "radroots_testing_tangle_integration", 1145 "local_status": "unavailable", 1146 "unavailable_command": "cargo test -p radroots_testing_tangle_integration -- tangle_v1_mvp", 1147 "repo_boundary": "not present in the standalone Tangle workspace", 1148 "required_scenarios": [ 1149 "host_routing", 1150 "tenant_isolation", 1151 "auth_realm_isolation", 1152 "nip29_same_group_isolation", 1153 "backup_export_boundaries" 1154 ] 1155 }) 1156 } 1157 1158 fn validation_overall_status(summary: &BTreeMap<String, String>) -> &'static str { 1159 if summary.values().all(|value| value == "pass") { 1160 "pass" 1161 } else { 1162 "fail" 1163 } 1164 } 1165 1166 struct MaterializedBenchRelay { 1167 relay: BaseRelay, 1168 store_config: PocketStoreConfig, 1169 ingest_report: ScenarioReport, 1170 } 1171 1172 fn run_pocket_query_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { 1173 let mut materialized = materialize_dataset(dataset, "pocket-query", 128)?; 1174 let public_group = dataset.first_group(BenchGroupVisibility::Public)?; 1175 let public_event = dataset.first_timeline_event(BenchGroupVisibility::Public)?; 1176 let owner_auth = authenticated(FixtureKey::Owner)?; 1177 let owner = FixtureKey::Owner.public_key(); 1178 let created_at = public_event.unsigned().created_at().as_u64(); 1179 let operations = vec![ 1180 QueryOperation::new( 1181 "pocket-h", 1182 filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()], "limit": 50}))?, 1183 QueryAuth::None, 1184 QueryExpectation::AtLeast(1), 1185 ), 1186 QueryOperation::new( 1187 "pocket-d", 1188 filter_from_value(&json!({ 1189 "kinds": [KIND_GROUP_METADATA], 1190 "#d": [public_group.id()], 1191 "limit": 10 1192 }))?, 1193 QueryAuth::None, 1194 QueryExpectation::AtLeast(1), 1195 ), 1196 QueryOperation::new( 1197 "pocket-kind-author-window-limit", 1198 filter_from_value(&json!({ 1199 "kinds": [1], 1200 "authors": [owner.as_str()], 1201 "since": created_at.saturating_sub(1), 1202 "until": created_at.saturating_add(100_000), 1203 "limit": 25 1204 }))?, 1205 QueryAuth::Owner, 1206 QueryExpectation::AtLeast(1), 1207 ), 1208 QueryOperation::new( 1209 "pocket-count", 1210 filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?, 1211 QueryAuth::None, 1212 QueryExpectation::AtLeast(1), 1213 ), 1214 ]; 1215 let started = Instant::now(); 1216 let mut samples = Vec::with_capacity(operations.len()); 1217 let mut accepted = 0; 1218 let mut rejected = 0; 1219 for operation in operations { 1220 let sample = Instant::now(); 1221 let observed = match operation.name { 1222 "pocket-count" => count_for_operation(&materialized.relay, &operation, &owner_auth)?, 1223 _ => query_for_operation(&mut materialized.relay, &operation, &owner_auth)?, 1224 }; 1225 samples.push(elapsed_micros(sample)); 1226 if operation.expectation.matches(observed) { 1227 accepted += 1; 1228 } else { 1229 rejected += 1; 1230 } 1231 } 1232 Ok(ScenarioReport::new( 1233 SCENARIO_POCKET_QUERY_VISIBLE_EVENTS, 1234 accepted + rejected, 1235 accepted, 1236 rejected, 1237 elapsed_micros(started), 1238 samples, 1239 materialized.ingest_report.max_rss_bytes, 1240 )) 1241 } 1242 1243 fn run_read_gate_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { 1244 let mut materialized = materialize_dataset(dataset, "read-gate", 128)?; 1245 let public_group = dataset.first_group(BenchGroupVisibility::Public)?; 1246 let private_group = dataset.first_group(BenchGroupVisibility::Private)?; 1247 let hidden_group = dataset.first_group(BenchGroupVisibility::Hidden)?; 1248 let owner_auth = authenticated(FixtureKey::Owner)?; 1249 let operations = vec![ 1250 QueryOperation::new( 1251 "public-unauth", 1252 filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?, 1253 QueryAuth::None, 1254 QueryExpectation::AtLeast(1), 1255 ), 1256 QueryOperation::new( 1257 "private-unauth", 1258 filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?, 1259 QueryAuth::None, 1260 QueryExpectation::Exactly(0), 1261 ), 1262 QueryOperation::new( 1263 "private-owner", 1264 filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?, 1265 QueryAuth::Owner, 1266 QueryExpectation::AtLeast(1), 1267 ), 1268 QueryOperation::new( 1269 "hidden-metadata-unauth", 1270 filter_from_value(&json!({"kinds": [KIND_GROUP_METADATA], "#d": [hidden_group.id()]}))?, 1271 QueryAuth::None, 1272 QueryExpectation::Exactly(0), 1273 ), 1274 QueryOperation::new( 1275 "hidden-metadata-owner", 1276 filter_from_value(&json!({"kinds": [KIND_GROUP_METADATA], "#d": [hidden_group.id()]}))?, 1277 QueryAuth::Owner, 1278 QueryExpectation::AtLeast(1), 1279 ), 1280 QueryOperation::new( 1281 "private-count-unauth", 1282 filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?, 1283 QueryAuth::None, 1284 QueryExpectation::Exactly(0), 1285 ), 1286 QueryOperation::new( 1287 "private-count-owner", 1288 filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?, 1289 QueryAuth::Owner, 1290 QueryExpectation::AtLeast(1), 1291 ), 1292 ]; 1293 let started = Instant::now(); 1294 let mut samples = Vec::with_capacity(operations.len()); 1295 let mut accepted = 0; 1296 let mut rejected = 0; 1297 for operation in operations { 1298 let sample = Instant::now(); 1299 let observed = if operation.name.contains("count") { 1300 count_for_operation(&materialized.relay, &operation, &owner_auth)? 1301 } else { 1302 query_for_operation(&mut materialized.relay, &operation, &owner_auth)? 1303 }; 1304 samples.push(elapsed_micros(sample)); 1305 if operation.expectation.matches(observed) { 1306 accepted += 1; 1307 } else { 1308 rejected += 1; 1309 } 1310 } 1311 Ok(ScenarioReport::new( 1312 SCENARIO_GROUP_READ_GATE_OVERHEAD, 1313 accepted + rejected, 1314 accepted, 1315 rejected, 1316 elapsed_micros(started), 1317 samples, 1318 materialized.ingest_report.max_rss_bytes, 1319 )) 1320 } 1321 1322 fn run_count_resource_control_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { 1323 let materialized = materialize_dataset(dataset, "count-resource-controls", 128)?; 1324 let public_group = dataset.first_group(BenchGroupVisibility::Public)?; 1325 let private_group = dataset.first_group(BenchGroupVisibility::Private)?; 1326 let owner_auth = authenticated(FixtureKey::Owner)?; 1327 let operations = vec![ 1328 QueryOperation::new( 1329 "bounded-public-count", 1330 filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?, 1331 QueryAuth::None, 1332 QueryExpectation::Exactly( 1333 dataset 1334 .config 1335 .public_events_per_group 1336 .try_into() 1337 .expect("public event count fits in u64"), 1338 ), 1339 ), 1340 QueryOperation::new( 1341 "bounded-private-owner-count", 1342 filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?, 1343 QueryAuth::Owner, 1344 QueryExpectation::Exactly( 1345 dataset 1346 .config 1347 .private_events_per_group 1348 .try_into() 1349 .expect("private event count fits in u64"), 1350 ), 1351 ), 1352 ]; 1353 let started = Instant::now(); 1354 let mut samples = Vec::with_capacity(operations.len() + 3); 1355 let mut accepted = 0; 1356 let mut rejected = 0; 1357 for operation in operations { 1358 let sample = Instant::now(); 1359 let observed = count_for_operation(&materialized.relay, &operation, &owner_auth)?; 1360 samples.push(elapsed_micros(sample)); 1361 if operation.expectation.matches(observed) { 1362 accepted += 1; 1363 } else { 1364 rejected += 1; 1365 } 1366 } 1367 let runtime = tokio::runtime::Builder::new_current_thread() 1368 .build() 1369 .map_err(|error| format!("failed to build count resource benchmark runtime: {error}"))?; 1370 let probe = runtime.block_on(runtime_count_resource_control_probe())?; 1371 samples.extend(probe.samples); 1372 accepted += probe.accepted; 1373 rejected += probe.rejected; 1374 Ok(ScenarioReport::new( 1375 SCENARIO_COUNT_RESOURCE_CONTROLS, 1376 accepted + rejected, 1377 accepted, 1378 rejected, 1379 elapsed_micros(started), 1380 samples, 1381 materialized.ingest_report.max_rss_bytes, 1382 )) 1383 } 1384 1385 fn run_projection_rebuild_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { 1386 let mut materialized = materialize_dataset(dataset, "projection-rebuild", 128)?; 1387 materialized 1388 .relay 1389 .shutdown() 1390 .map_err(|error| error.to_string())?; 1391 let started = Instant::now(); 1392 let reopened = BaseRelay::open_with_groups( 1393 &materialized.store_config, 1394 relay_limits(128), 1395 &group_config()?, 1396 PocketQueryConfig::default(), 1397 ) 1398 .map_err(|error| error.to_string())?; 1399 let elapsed = elapsed_micros(started); 1400 let projection = reopened 1401 .group_projection() 1402 .ok_or_else(|| "group projection is unavailable".to_owned())?; 1403 let groups_match = projection.groups().len() == dataset.groups().len(); 1404 let members_match = projection 1405 .members() 1406 .values() 1407 .filter(|member| member.status() == MemberStatus::Member) 1408 .count() 1409 == usize::try_from(dataset.membership_event_count()) 1410 .expect("membership count fits in usize"); 1411 let accepted = u64::from(groups_match && members_match); 1412 Ok(ScenarioReport::new( 1413 SCENARIO_PROJECTION_REBUILD, 1414 1, 1415 accepted, 1416 1 - accepted, 1417 elapsed, 1418 vec![elapsed], 1419 estimate_memory_bytes(dataset), 1420 )) 1421 } 1422 1423 fn run_outbox_replay_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { 1424 let mut materialized = materialize_dataset(dataset, "outbox-replay", 128)?; 1425 let before = generated_state_counts(&materialized.relay)?; 1426 materialized 1427 .relay 1428 .shutdown() 1429 .map_err(|error| error.to_string())?; 1430 let started = Instant::now(); 1431 let mut reopened = BaseRelay::open_with_groups( 1432 &materialized.store_config, 1433 relay_limits(128), 1434 &group_config()?, 1435 PocketQueryConfig::default(), 1436 ) 1437 .map_err(|error| error.to_string())?; 1438 let after_first = generated_state_counts(&reopened)?; 1439 reopened.shutdown().map_err(|error| error.to_string())?; 1440 let reopened = BaseRelay::open_with_groups( 1441 &materialized.store_config, 1442 relay_limits(128), 1443 &group_config()?, 1444 PocketQueryConfig::default(), 1445 ) 1446 .map_err(|error| error.to_string())?; 1447 let after_second = generated_state_counts(&reopened)?; 1448 let elapsed = elapsed_micros(started); 1449 let accepted = u64::from(before == after_first && before == after_second); 1450 Ok(ScenarioReport::new( 1451 SCENARIO_OUTBOX_REPLAY, 1452 1, 1453 accepted, 1454 1 - accepted, 1455 elapsed, 1456 vec![elapsed], 1457 estimate_memory_bytes(dataset), 1458 )) 1459 } 1460 1461 fn run_broadcast_lag_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { 1462 let mut materialized = materialize_dataset(dataset, "broadcast-lag", 1)?; 1463 let public_group = dataset.first_group(BenchGroupVisibility::Public)?; 1464 let subscriber_count = dataset.config.group_count.max(4); 1465 let filter = filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?; 1466 let pocket_filter = pocket_filter(&filter)?; 1467 for index in 0..subscriber_count { 1468 materialized 1469 .relay 1470 .handle_pocket_req( 1471 subscription(&format!("lag-{index:04}"))?, 1472 vec![pocket_filter.clone()], 1473 ) 1474 .map_err(|error| error.to_string())?; 1475 } 1476 let first = pocket_protocol_group_event( 1477 FixtureKey::Owner, 1478 public_group.id(), 1479 1_714_600_000, 1480 1, 1481 "broadcast lag first", 1482 )?; 1483 let second = pocket_protocol_group_event( 1484 FixtureKey::Owner, 1485 public_group.id(), 1486 1_714_600_001, 1487 1, 1488 "broadcast lag second", 1489 )?; 1490 let started = Instant::now(); 1491 let first_pocket = pocket_event(&first)?; 1492 let second_pocket = pocket_event(&second)?; 1493 let first_messages = materialized.relay.fanout_pocket(&first_pocket); 1494 let second_messages = materialized.relay.fanout_pocket(&second_pocket); 1495 let elapsed = elapsed_micros(started); 1496 let first_events = first_messages 1497 .iter() 1498 .filter(|message| matches!(message, RuntimeRelayMessage::Event { .. })) 1499 .count(); 1500 let second_events = second_messages 1501 .iter() 1502 .filter(|message| matches!(message, RuntimeRelayMessage::Event { .. })) 1503 .count(); 1504 let accepted = if first_events == subscriber_count 1505 && second_events == subscriber_count 1506 && materialized.relay.active_subscription_count() == subscriber_count 1507 { 1508 subscriber_count 1509 } else { 1510 0 1511 }; 1512 let attempted = subscriber_count 1513 .try_into() 1514 .expect("subscriber count fits in u64"); 1515 let accepted = accepted.try_into().expect("accepted fits in u64"); 1516 Ok(ScenarioReport::new( 1517 SCENARIO_BROADCAST_LAG, 1518 attempted, 1519 accepted, 1520 attempted - accepted, 1521 elapsed, 1522 vec![elapsed], 1523 materialized.ingest_report.max_rss_bytes, 1524 )) 1525 } 1526 1527 fn run_memory_profile_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { 1528 let started = Instant::now(); 1529 let estimated = estimate_memory_bytes(dataset); 1530 let elapsed = elapsed_micros(started); 1531 Ok(ScenarioReport::new( 1532 SCENARIO_MEMORY_PROFILE, 1533 1, 1534 1, 1535 0, 1536 elapsed, 1537 vec![elapsed], 1538 estimated, 1539 )) 1540 } 1541 1542 struct VirtualRelayTenantBench { 1543 tenant_index: usize, 1544 relay: BaseRelay, 1545 } 1546 1547 fn run_virtual_relay_tenancy_benchmarks( 1548 config: VirtualRelayTenancyConfig, 1549 ) -> Result<Vec<ScenarioReport>, String> { 1550 let config = config.validate()?; 1551 let mut tenants = materialize_virtual_relay_tenants(config)?; 1552 let scenarios = [ 1553 ( 1554 SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT, 1555 "fanout-001", 1556 1, 1557 config.one_percent_fanout(), 1558 ), 1559 ( 1560 SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT, 1561 "fanout-010", 1562 10, 1563 config.ten_percent_fanout(), 1564 ), 1565 ( 1566 SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT, 1567 "fanout-100", 1568 100, 1569 config.aggregate_active_subscriptions(), 1570 ), 1571 ]; 1572 let mut reports = Vec::with_capacity(scenarios.len()); 1573 for (scenario, tag_value, fanout_percent, expected_messages) in scenarios { 1574 reports.push(run_virtual_relay_fanout_scenario( 1575 &mut tenants, 1576 config, 1577 scenario, 1578 tag_value, 1579 fanout_percent, 1580 expected_messages, 1581 )?); 1582 } 1583 for tenant in &mut tenants { 1584 tenant.relay.shutdown().map_err(|error| error.to_string())?; 1585 } 1586 Ok(reports) 1587 } 1588 1589 fn materialize_virtual_relay_tenants( 1590 config: VirtualRelayTenancyConfig, 1591 ) -> Result<Vec<VirtualRelayTenantBench>, String> { 1592 let mut tenants = Vec::with_capacity(config.tenant_count); 1593 for tenant_index in 0..config.tenant_count { 1594 let store_config = bench_store_config(&format!("virtual-relay-tenancy-{tenant_index}"))?; 1595 let mut relay = BaseRelay::open( 1596 &store_config, 1597 relay_limits_with_subscriptions( 1598 config.max_pending_events_per_subscription, 1599 config.subscriptions_per_tenant, 1600 ), 1601 PocketQueryConfig::default(), 1602 ) 1603 .map_err(|error| error.to_string())?; 1604 for subscription_index in 0..config.subscriptions_per_tenant { 1605 let global_index = tenant_index * config.subscriptions_per_tenant + subscription_index; 1606 let filter = virtual_relay_subscription_filter(config, tenant_index, global_index)?; 1607 relay 1608 .handle_pocket_req( 1609 subscription(&format!("vr-{tenant_index:02}-{subscription_index:04}"))?, 1610 vec![filter], 1611 ) 1612 .map_err(|error| error.to_string())?; 1613 } 1614 tenants.push(VirtualRelayTenantBench { 1615 tenant_index, 1616 relay, 1617 }); 1618 } 1619 Ok(tenants) 1620 } 1621 1622 fn virtual_relay_subscription_filter( 1623 config: VirtualRelayTenancyConfig, 1624 tenant_index: usize, 1625 global_index: usize, 1626 ) -> Result<PocketOwnedFilter, String> { 1627 let mut tag_values = vec!["fanout-100"]; 1628 if tenant_index == 0 { 1629 tag_values.push("fanout-010"); 1630 } 1631 if global_index < config.one_percent_fanout() { 1632 tag_values.push("fanout-001"); 1633 } 1634 pocket_filter_from_value(&json!({"kinds": [1], "#t": tag_values})) 1635 } 1636 1637 fn run_virtual_relay_fanout_scenario( 1638 tenants: &mut [VirtualRelayTenantBench], 1639 config: VirtualRelayTenancyConfig, 1640 scenario: &str, 1641 tag_value: &str, 1642 fanout_percent: u64, 1643 expected_messages: usize, 1644 ) -> Result<ScenarioReport, String> { 1645 let started = Instant::now(); 1646 let mut samples = Vec::new(); 1647 let mut observed_messages = 0_usize; 1648 let mut publish_tasks = 0_usize; 1649 for tenant in tenants { 1650 if !virtual_relay_tenant_participates(fanout_percent, tenant.tenant_index) { 1651 continue; 1652 } 1653 let publish_started = Instant::now(); 1654 let event = pocket_protocol_event( 1655 FixtureKey::Member, 1656 1_714_800_000 1657 + u64::try_from(tenant.tenant_index).expect("tenant index fits in u64") 1658 + fanout_percent, 1659 1, 1660 vec![Tag::from_parts("t", &[tag_value]).map_err(|error| error.to_string())?], 1661 &format!("virtual relay fanout {fanout_percent} percent"), 1662 )?; 1663 let pocket = pocket_event(&event)?; 1664 let messages = tenant.relay.fanout_pocket(&pocket); 1665 samples.push(elapsed_micros(publish_started)); 1666 observed_messages += messages 1667 .iter() 1668 .filter(|message| matches!(message, RuntimeRelayMessage::Event { .. })) 1669 .count(); 1670 publish_tasks += 1; 1671 } 1672 let elapsed = elapsed_micros(started); 1673 let attempted = u64::try_from(expected_messages).expect("expected message count fits in u64"); 1674 let accepted = u64::try_from(observed_messages).expect("observed message count fits in u64"); 1675 let rejected = attempted.saturating_sub(accepted) + accepted.saturating_sub(attempted); 1676 let max_rss_bytes = estimate_virtual_relay_memory_bytes(config); 1677 let mut observations = BTreeMap::new(); 1678 observations.insert("tenant_count".to_owned(), json!(config.tenant_count)); 1679 observations.insert( 1680 "aggregate_active_subscriptions".to_owned(), 1681 json!(config.aggregate_active_subscriptions()), 1682 ); 1683 observations.insert( 1684 "busiest_tenant_active_subscriptions".to_owned(), 1685 json!(config.busiest_tenant_active_subscriptions()), 1686 ); 1687 observations.insert("fanout_percent".to_owned(), json!(fanout_percent)); 1688 observations.insert("publish_task_count".to_owned(), json!(publish_tasks)); 1689 observations.insert( 1690 "expected_enqueued_events".to_owned(), 1691 json!(expected_messages), 1692 ); 1693 observations.insert( 1694 "observed_enqueued_events".to_owned(), 1695 json!(observed_messages), 1696 ); 1697 observations.insert( 1698 "observable_queue_depth".to_owned(), 1699 json!(observed_messages), 1700 ); 1701 observations.insert( 1702 "max_pending_events_per_subscription".to_owned(), 1703 json!(config.max_pending_events_per_subscription), 1704 ); 1705 observations.insert("memory_estimate_bytes".to_owned(), json!(max_rss_bytes)); 1706 observations.insert( 1707 "tenant_publish_model".to_owned(), 1708 json!("one publish per targeted virtual relay tenant"), 1709 ); 1710 Ok(ScenarioReport::new( 1711 scenario, 1712 attempted, 1713 accepted, 1714 rejected, 1715 elapsed, 1716 samples, 1717 max_rss_bytes, 1718 ) 1719 .with_observations(observations)) 1720 } 1721 1722 fn virtual_relay_tenant_participates(fanout_percent: u64, tenant_index: usize) -> bool { 1723 match fanout_percent { 1724 1 | 10 => tenant_index == 0, 1725 100 => true, 1726 _ => false, 1727 } 1728 } 1729 1730 struct CountResourceControlProbe { 1731 accepted: u64, 1732 rejected: u64, 1733 samples: Vec<u64>, 1734 } 1735 1736 async fn runtime_count_resource_control_probe() -> Result<CountResourceControlProbe, String> { 1737 let root = bench_temp_root("count-resource-controls-runtime"); 1738 let _ = fs::remove_dir_all(&root); 1739 let handle = RelayRuntimeHandle::new( 1740 RelayRuntime::open(bench_runtime_config(&root)?).map_err(|error| error.to_string())?, 1741 ); 1742 let mut auth = handle 1743 .auth_state() 1744 .await 1745 .map_err(|error| error.to_string())?; 1746 let cases = [ 1747 ("broad-empty-selector-count", json!({"limit": 1})), 1748 ("broad-kind-only-count", json!({"kinds": [1], "limit": 1})), 1749 ( 1750 "broad-high-limit-count", 1751 json!({"kinds": [1], "#h": ["BenchFarm0000"], "limit": 500}), 1752 ), 1753 ]; 1754 let mut samples = Vec::with_capacity(cases.len()); 1755 let mut accepted = 0; 1756 let mut rejected = 0; 1757 for (name, filter) in cases { 1758 let sample = Instant::now(); 1759 let subscription_id = subscription(name)?; 1760 let pocket_filter = parse_pocket_filter_json(filter.to_string().as_bytes()) 1761 .map_err(|error| error.to_string())?; 1762 let replies = handle 1763 .handle_count_pocket( 1764 subscription_id.clone(), 1765 vec![pocket_filter], 1766 &mut auth, 1767 UnixTimestamp::new(1_714_700_000), 1768 ) 1769 .await 1770 .map_err(|error| error.to_string())?; 1771 samples.push(elapsed_micros(sample)); 1772 if replies 1773 == vec![RelayMessage::Closed { 1774 subscription_id, 1775 message: "restricted: count filters are too broad or expensive".to_owned(), 1776 }] 1777 { 1778 accepted += 1; 1779 } else { 1780 rejected += 1; 1781 } 1782 } 1783 let metrics = handle.metrics(); 1784 if metrics.count_refusals() != accepted || metrics.broad_query_rejections() != accepted { 1785 rejected += 1; 1786 } 1787 let _ = fs::remove_dir_all(root); 1788 Ok(CountResourceControlProbe { 1789 accepted, 1790 rejected, 1791 samples, 1792 }) 1793 } 1794 1795 fn materialize_dataset( 1796 dataset: &BenchDataset, 1797 run_name: &str, 1798 max_pending_events: usize, 1799 ) -> Result<MaterializedBenchRelay, String> { 1800 let store_config = bench_store_config(run_name)?; 1801 let relay = BaseRelay::open_with_groups( 1802 &store_config, 1803 relay_limits(max_pending_events), 1804 &group_config()?, 1805 PocketQueryConfig::default(), 1806 ) 1807 .map_err(|error| error.to_string())?; 1808 let owner_auth = authenticated(FixtureKey::Owner)?; 1809 let admin_auth = authenticated(FixtureKey::Admin)?; 1810 let started = Instant::now(); 1811 let mut samples = Vec::with_capacity(dataset.source_events().len()); 1812 let mut accepted = 0; 1813 let mut rejected = 0; 1814 for source in dataset.source_events() { 1815 let sample = Instant::now(); 1816 let pocket_event = 1817 parse_pocket_event_json(event_to_value(source.event()).to_string().as_bytes()) 1818 .map_err(|error| error.to_string())?; 1819 let message = match source.auth() { 1820 BenchEventAuth::None => relay 1821 .handle_pocket_event(&pocket_event) 1822 .map_err(|error| error.to_string())?, 1823 BenchEventAuth::Owner => relay 1824 .handle_pocket_event_with_auth(&pocket_event, &owner_auth) 1825 .map_err(|error| error.to_string())?, 1826 BenchEventAuth::Admin => relay 1827 .handle_pocket_event_with_auth(&pocket_event, &admin_auth) 1828 .map_err(|error| error.to_string())?, 1829 }; 1830 samples.push(elapsed_micros(sample)); 1831 if ok_accepted(&message) { 1832 accepted += 1; 1833 } else { 1834 rejected += 1; 1835 } 1836 } 1837 let attempted = accepted + rejected; 1838 let ingest_report = ScenarioReport::new( 1839 "dataset_ingest", 1840 attempted, 1841 accepted, 1842 rejected, 1843 elapsed_micros(started), 1844 samples, 1845 estimate_memory_bytes(dataset), 1846 ); 1847 Ok(MaterializedBenchRelay { 1848 relay, 1849 store_config, 1850 ingest_report, 1851 }) 1852 } 1853 1854 fn relay_limits(max_pending_events: usize) -> BaseRelayLimits { 1855 relay_limits_with_subscriptions(max_pending_events, 512) 1856 } 1857 1858 fn relay_limits_with_subscriptions( 1859 max_pending_events: usize, 1860 max_subscriptions: usize, 1861 ) -> BaseRelayLimits { 1862 BaseRelayLimits::new(BaseRelayLimitSettings { 1863 max_pending_events, 1864 max_subscription_id_length: 64, 1865 max_subscriptions, 1866 max_filters_per_request: 10, 1867 max_tag_values_per_filter: 100, 1868 max_query_complexity: 610, 1869 max_event_tags: 200, 1870 max_content_length: 65_536, 1871 max_limit: 500, 1872 default_limit: 100, 1873 }) 1874 .expect("benchmark relay limits") 1875 } 1876 1877 #[derive(Clone)] 1878 struct QueryOperation { 1879 name: &'static str, 1880 filter: Filter, 1881 auth: QueryAuth, 1882 expectation: QueryExpectation, 1883 } 1884 1885 impl QueryOperation { 1886 fn new( 1887 name: &'static str, 1888 filter: Filter, 1889 auth: QueryAuth, 1890 expectation: QueryExpectation, 1891 ) -> Self { 1892 Self { 1893 name, 1894 filter, 1895 auth, 1896 expectation, 1897 } 1898 } 1899 } 1900 1901 #[derive(Clone, Copy)] 1902 enum QueryAuth { 1903 None, 1904 Owner, 1905 } 1906 1907 #[derive(Clone, Copy)] 1908 enum QueryExpectation { 1909 Exactly(u64), 1910 AtLeast(u64), 1911 } 1912 1913 impl QueryExpectation { 1914 fn matches(self, observed: u64) -> bool { 1915 match self { 1916 Self::Exactly(expected) => observed == expected, 1917 Self::AtLeast(expected) => observed >= expected, 1918 } 1919 } 1920 } 1921 1922 fn query_for_operation( 1923 relay: &mut BaseRelay, 1924 operation: &QueryOperation, 1925 owner_auth: &BaseAuthState, 1926 ) -> Result<u64, String> { 1927 let subscription_id = subscription(operation.name)?; 1928 let messages = match operation.auth { 1929 QueryAuth::None => relay 1930 .handle_pocket_req( 1931 subscription_id.clone(), 1932 vec![pocket_filter(&operation.filter)?], 1933 ) 1934 .map_err(|error| error.to_string())?, 1935 QueryAuth::Owner => relay 1936 .handle_pocket_req_with_auth( 1937 subscription_id.clone(), 1938 vec![pocket_filter(&operation.filter)?], 1939 owner_auth, 1940 ) 1941 .map_err(|error| error.to_string())?, 1942 }; 1943 relay.handle_close(&subscription_id); 1944 Ok(messages 1945 .iter() 1946 .filter(|message| matches!(message, RuntimeRelayMessage::Event { .. })) 1947 .count() 1948 .try_into() 1949 .expect("message count fits in u64")) 1950 } 1951 1952 fn count_for_operation( 1953 relay: &BaseRelay, 1954 operation: &QueryOperation, 1955 owner_auth: &BaseAuthState, 1956 ) -> Result<u64, String> { 1957 let subscription_id = subscription(operation.name)?; 1958 let filter = pocket_filter(&operation.filter)?; 1959 let message = match operation.auth { 1960 QueryAuth::None => relay 1961 .handle_count(subscription_id, vec![filter]) 1962 .map_err(|error| error.to_string())?, 1963 QueryAuth::Owner => relay 1964 .handle_count_with_auth(subscription_id, vec![filter], owner_auth) 1965 .map_err(|error| error.to_string())?, 1966 }; 1967 match message { 1968 RelayMessage::Count { count, .. } => Ok(count), 1969 value => Err(format!("expected COUNT message, got {value:?}")), 1970 } 1971 } 1972 1973 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 1974 struct GeneratedStateCounts { 1975 metadata: u64, 1976 admins: u64, 1977 members: u64, 1978 } 1979 1980 fn generated_state_counts(relay: &BaseRelay) -> Result<GeneratedStateCounts, String> { 1981 Ok(GeneratedStateCounts { 1982 metadata: count_kind(relay, KIND_GROUP_METADATA)?, 1983 admins: count_kind(relay, KIND_GROUP_ADMINS)?, 1984 members: count_kind(relay, KIND_GROUP_MEMBERS)?, 1985 }) 1986 } 1987 1988 fn count_kind(relay: &BaseRelay, kind: u32) -> Result<u64, String> { 1989 let owner_auth = authenticated(FixtureKey::Owner)?; 1990 let message = relay 1991 .handle_count_with_auth( 1992 subscription(&format!("count-{kind}"))?, 1993 vec![pocket_filter_from_value(&json!({"kinds": [kind]}))?], 1994 &owner_auth, 1995 ) 1996 .map_err(|error| error.to_string())?; 1997 match message { 1998 RelayMessage::Count { count, .. } => Ok(count), 1999 value => Err(format!("expected COUNT message, got {value:?}")), 2000 } 2001 } 2002 2003 fn pocket_filter(filter: &Filter) -> Result<PocketOwnedFilter, String> { 2004 let raw = serde_json::to_vec(&filter_to_value(filter)).map_err(|error| error.to_string())?; 2005 parse_pocket_filter_json(&raw).map_err(|error| error.to_string()) 2006 } 2007 2008 fn pocket_event(event: &Event) -> Result<PocketOwnedEvent, String> { 2009 let raw = serde_json::to_vec(&event_to_value(event)).map_err(|error| error.to_string())?; 2010 parse_pocket_event_json(&raw).map_err(|error| error.to_string()) 2011 } 2012 2013 fn pocket_protocol_event( 2014 key: FixtureKey, 2015 created_at: u64, 2016 kind: u64, 2017 tags: Vec<Tag>, 2018 content: &str, 2019 ) -> Result<Event, String> { 2020 let tags = pocket_tags_from_protocol(&tags)?; 2021 let pocket = signed_pocket_event( 2022 fixture_secret_byte(key), 2023 created_at, 2024 u16::try_from(kind).map_err(|error| error.to_string())?, 2025 &tags, 2026 content.as_bytes(), 2027 )?; 2028 pocket_event_to_protocol(&pocket) 2029 } 2030 2031 fn pocket_protocol_auth_event( 2032 key: FixtureKey, 2033 challenge: &str, 2034 created_at: u64, 2035 ) -> Result<Event, String> { 2036 pocket_protocol_event( 2037 key, 2038 created_at, 2039 22_242, 2040 vec![ 2041 Tag::from_parts("relay", &[TANGLE_V2_RELAY_URL]).map_err(|error| error.to_string())?, 2042 Tag::from_parts("challenge", &[challenge]).map_err(|error| error.to_string())?, 2043 ], 2044 "", 2045 ) 2046 } 2047 2048 fn pocket_protocol_group_create_event( 2049 key: FixtureKey, 2050 group_id: &str, 2051 created_at: u64, 2052 flags: &[&str], 2053 ) -> Result<Event, String> { 2054 let mut tags = vec![ 2055 Tag::from_parts("h", &[group_id]).map_err(|error| error.to_string())?, 2056 Tag::from_parts("name", &[group_id]).map_err(|error| error.to_string())?, 2057 ]; 2058 for flag in flags { 2059 tags.push(Tag::from_parts(flag, &[]).map_err(|error| error.to_string())?); 2060 } 2061 pocket_protocol_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "") 2062 } 2063 2064 fn pocket_protocol_put_user_event( 2065 key: FixtureKey, 2066 group_id: &str, 2067 target: FixtureKey, 2068 created_at: u64, 2069 ) -> Result<Event, String> { 2070 let target_pubkey = target.public_key(); 2071 pocket_protocol_put_pubkey_event(key, group_id, target_pubkey.as_str(), created_at) 2072 } 2073 2074 fn pocket_protocol_put_pubkey_event( 2075 key: FixtureKey, 2076 group_id: &str, 2077 target_pubkey: &str, 2078 created_at: u64, 2079 ) -> Result<Event, String> { 2080 pocket_protocol_event( 2081 key, 2082 created_at, 2083 KIND_GROUP_PUT_USER.into(), 2084 vec![ 2085 Tag::from_parts("h", &[group_id]).map_err(|error| error.to_string())?, 2086 Tag::from_parts("p", &[target_pubkey]).map_err(|error| error.to_string())?, 2087 ], 2088 "", 2089 ) 2090 } 2091 2092 fn pocket_protocol_group_event( 2093 key: FixtureKey, 2094 group_id: &str, 2095 created_at: u64, 2096 kind: u64, 2097 content: &str, 2098 ) -> Result<Event, String> { 2099 pocket_protocol_event( 2100 key, 2101 created_at, 2102 kind, 2103 vec![Tag::from_parts("h", &[group_id]).map_err(|error| error.to_string())?], 2104 content, 2105 ) 2106 } 2107 2108 fn signed_pocket_event( 2109 secret_byte: u8, 2110 created_at: u64, 2111 kind: u16, 2112 tags: &PocketOwnedTags, 2113 content: &[u8], 2114 ) -> Result<PocketOwnedEvent, String> { 2115 let secp = Secp256k1::new(); 2116 let secret_key = 2117 SecretKey::from_byte_array([secret_byte; 32]).map_err(|error| error.to_string())?; 2118 let keypair = Keypair::from_secret_key(&secp, &secret_key); 2119 let (xonlypubkey, _) = keypair.x_only_public_key(); 2120 let pubkey_bytes = xonlypubkey.serialize(); 2121 let pubkey = PocketPubkey::from_bytes(pubkey_bytes); 2122 let pocket_kind = PocketKind::from_u16(kind); 2123 let pocket_time = PocketTime::from_u64(created_at); 2124 let escaped_content = json_escape(content, Vec::with_capacity(content.len() * 7 / 6)) 2125 .map_err(|error| error.to_string())?; 2126 let escaped_content = 2127 std::str::from_utf8(&escaped_content).map_err(|error| error.to_string())?; 2128 let tags_json = tags.as_json(); 2129 let tags_json = std::str::from_utf8(&tags_json).map_err(|error| error.to_string())?; 2130 let signable = format!( 2131 r#"[0,"{}",{},{},{},"{}"]"#, 2132 pubkey, pocket_time, pocket_kind, tags_json, escaped_content 2133 ); 2134 let digest = Sha256::digest(signable.as_bytes()); 2135 let digest_slice: &[u8] = digest.as_ref(); 2136 let event_id: [u8; 32] = digest_slice 2137 .try_into() 2138 .expect("sha256 digest length is 32 bytes"); 2139 let signature = secp.sign_schnorr_no_aux_rand(&event_id, &keypair); 2140 PocketOwnedEvent::new( 2141 PocketEventId::from_bytes(event_id), 2142 pocket_kind, 2143 pubkey, 2144 PocketSig::from_bytes(signature.to_byte_array()), 2145 tags, 2146 pocket_time, 2147 content, 2148 ) 2149 .map_err(|error| error.to_string()) 2150 } 2151 2152 fn pocket_tags_from_protocol(tags: &[Tag]) -> Result<PocketOwnedTags, String> { 2153 let parts = tags 2154 .iter() 2155 .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>()) 2156 .collect::<Vec<_>>(); 2157 PocketOwnedTags::new(&parts).map_err(|error| error.to_string()) 2158 } 2159 2160 fn pocket_event_to_protocol(event: &PocketOwnedEvent) -> Result<Event, String> { 2161 let tags = event 2162 .tags() 2163 .map_err(|error| error.to_string())? 2164 .iter() 2165 .map(|tag| { 2166 Tag::new( 2167 tag.map(|value| { 2168 std::str::from_utf8(value) 2169 .map(str::to_owned) 2170 .map_err(|error| error.to_string()) 2171 }) 2172 .collect::<Result<Vec<_>, _>>()?, 2173 ) 2174 .map_err(|error| error.to_string()) 2175 }) 2176 .collect::<Result<Vec<_>, _>>()?; 2177 Ok(Event::new( 2178 EventId::new(&event.id().as_hex_string()).map_err(|error| error.to_string())?, 2179 UnsignedEvent::new( 2180 PublicKeyHex::new(&event.pubkey().as_hex_string()) 2181 .map_err(|error| error.to_string())?, 2182 UnixTimestamp::new(event.created_at().as_u64()), 2183 Kind::new(u64::from(event.kind().as_u16())).map_err(|error| error.to_string())?, 2184 tags, 2185 std::str::from_utf8(event.content()).map_err(|error| error.to_string())?, 2186 ), 2187 SignatureHex::new(&event.sig().to_string()).map_err(|error| error.to_string())?, 2188 )) 2189 } 2190 2191 fn fixture_secret_byte(key: FixtureKey) -> u8 { 2192 match key { 2193 FixtureKey::Relay => 9, 2194 FixtureKey::Owner => 10, 2195 FixtureKey::Admin => 11, 2196 FixtureKey::Member => 12, 2197 FixtureKey::Outsider => 13, 2198 } 2199 } 2200 2201 fn pocket_filter_from_value(value: &serde_json::Value) -> Result<PocketOwnedFilter, String> { 2202 let filter = filter_from_value(value)?; 2203 pocket_filter(&filter) 2204 } 2205 2206 fn validation_summary( 2207 scenarios: &[ScenarioReport], 2208 thresholds: BenchmarkThresholds, 2209 ) -> Result<BTreeMap<String, String>, String> { 2210 let mut summary = BTreeMap::new(); 2211 let mut failures = Vec::new(); 2212 if let Some(failure) = record_threshold_status( 2213 &mut summary, 2214 SCENARIO_POCKET_QUERY_VISIBLE_EVENTS, 2215 scenario(scenarios, SCENARIO_POCKET_QUERY_VISIBLE_EVENTS)? 2216 .pass_latency_gate(thresholds.pocket_query_p95_micros), 2217 ) { 2218 failures.push(failure); 2219 } 2220 if let Some(failure) = record_threshold_status( 2221 &mut summary, 2222 SCENARIO_GROUP_READ_GATE_OVERHEAD, 2223 scenario(scenarios, SCENARIO_GROUP_READ_GATE_OVERHEAD)? 2224 .pass_latency_gate(thresholds.read_gate_p95_micros), 2225 ) { 2226 failures.push(failure); 2227 } 2228 if let Some(failure) = record_threshold_status( 2229 &mut summary, 2230 SCENARIO_COUNT_RESOURCE_CONTROLS, 2231 scenario(scenarios, SCENARIO_COUNT_RESOURCE_CONTROLS)? 2232 .pass_latency_gate(thresholds.count_resource_controls_p95_micros), 2233 ) { 2234 failures.push(failure); 2235 } 2236 if let Some(failure) = record_threshold_status( 2237 &mut summary, 2238 SCENARIO_PROJECTION_REBUILD, 2239 scenario(scenarios, SCENARIO_PROJECTION_REBUILD)? 2240 .pass_elapsed_gate(thresholds.projection_rebuild_elapsed_micros), 2241 ) { 2242 failures.push(failure); 2243 } 2244 if let Some(failure) = record_threshold_status( 2245 &mut summary, 2246 SCENARIO_OUTBOX_REPLAY, 2247 scenario(scenarios, SCENARIO_OUTBOX_REPLAY)? 2248 .pass_elapsed_gate(thresholds.outbox_replay_elapsed_micros), 2249 ) { 2250 failures.push(failure); 2251 } 2252 if let Some(failure) = record_threshold_status( 2253 &mut summary, 2254 SCENARIO_BROADCAST_LAG, 2255 scenario(scenarios, SCENARIO_BROADCAST_LAG)? 2256 .pass_latency_gate(thresholds.broadcast_lag_p95_micros), 2257 ) { 2258 failures.push(failure); 2259 } 2260 if let Some(failure) = record_threshold_status( 2261 &mut summary, 2262 SCENARIO_MEMORY_PROFILE, 2263 scenario(scenarios, SCENARIO_MEMORY_PROFILE)? 2264 .pass_memory_gate(thresholds.memory_profile_max_bytes), 2265 ) { 2266 failures.push(failure); 2267 } 2268 for name in [ 2269 SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT, 2270 SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT, 2271 SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT, 2272 ] { 2273 let Some(report) = scenarios 2274 .iter() 2275 .find(|scenario| scenario.scenario.as_str() == name) 2276 else { 2277 continue; 2278 }; 2279 if let Some(failure) = record_threshold_status( 2280 &mut summary, 2281 name, 2282 report.pass_latency_gate(thresholds.broadcast_lag_p95_micros), 2283 ) { 2284 failures.push(failure); 2285 } 2286 } 2287 if failures.is_empty() { 2288 Ok(summary) 2289 } else { 2290 Err(failures.join("; ")) 2291 } 2292 } 2293 2294 fn record_threshold_status( 2295 summary: &mut BTreeMap<String, String>, 2296 name: &str, 2297 passed: bool, 2298 ) -> Option<String> { 2299 summary.insert(name.to_owned(), status(passed)); 2300 if passed { 2301 None 2302 } else { 2303 Some(format!("scenario `{name}` failed benchmark threshold")) 2304 } 2305 } 2306 2307 fn scenario<'a>(scenarios: &'a [ScenarioReport], name: &str) -> Result<&'a ScenarioReport, String> { 2308 scenarios 2309 .iter() 2310 .find(|scenario| scenario.scenario == name) 2311 .ok_or_else(|| format!("scenario `{name}` was not run")) 2312 } 2313 2314 fn status(value: bool) -> String { 2315 if value { "pass" } else { "fail" }.to_owned() 2316 } 2317 2318 fn bench_member_event( 2319 group_id: &str, 2320 group_index: usize, 2321 member_index: usize, 2322 base_created_at: u64, 2323 ) -> Result<Event, String> { 2324 if member_index == 0 { 2325 return pocket_protocol_put_user_event( 2326 FixtureKey::Admin, 2327 group_id, 2328 FixtureKey::Member, 2329 base_created_at + u64::try_from(group_index * 10_000).expect("group index fits in u64"), 2330 ); 2331 } 2332 let pubkey = synthetic_member_pubkey(group_index, member_index); 2333 pocket_protocol_put_pubkey_event( 2334 FixtureKey::Admin, 2335 group_id, 2336 &pubkey, 2337 base_created_at 2338 + u64::try_from(group_index * 10_000 + member_index).expect("member index fits in u64"), 2339 ) 2340 } 2341 2342 fn synthetic_member_pubkey(group_index: usize, member_index: usize) -> String { 2343 format!( 2344 "{:064x}", 2345 0x100000_u128 + (group_index as u128 * 10_000) + member_index as u128 2346 ) 2347 } 2348 2349 fn group_visibility(index: usize) -> BenchGroupVisibility { 2350 match index % 3 { 2351 0 => BenchGroupVisibility::Public, 2352 1 => BenchGroupVisibility::Private, 2353 _ => BenchGroupVisibility::Hidden, 2354 } 2355 } 2356 2357 fn event_has_group(event: &Event, group_id: &str) -> bool { 2358 event.unsigned().tags().iter().any(|tag| { 2359 tag.indexed_pair() 2360 .is_some_and(|(name, value)| name == "h" && value == group_id) 2361 }) 2362 } 2363 2364 fn group_config() -> Result<tangle_groups::GroupRuntimeConfig, String> { 2365 tangle_v2_group_config(FixtureKey::Owner, &[FixtureKey::Admin]) 2366 } 2367 2368 fn authenticated(key: FixtureKey) -> Result<BaseAuthState, String> { 2369 let mut auth = 2370 BaseAuthState::new(TANGLE_V2_RELAY_URL, 60, 600).map_err(|error| error.to_string())?; 2371 auth.issue_challenge("challenge-a", tangle_protocol::UnixTimestamp::new(100)) 2372 .map_err(|error| error.to_string())?; 2373 let event = pocket_protocol_auth_event(key, "challenge-a", 120)?; 2374 let pocket = pocket_event(&event)?; 2375 auth.authenticate_pocket(&pocket, tangle_protocol::UnixTimestamp::new(120)) 2376 .map_err(|error| error.to_string())?; 2377 Ok(auth) 2378 } 2379 2380 fn bench_store_config(run_name: &str) -> Result<PocketStoreConfig, String> { 2381 let root = bench_temp_root(run_name); 2382 let _ = fs::remove_dir_all(&root); 2383 PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown) 2384 .map_err(|error| error.to_string()) 2385 } 2386 2387 fn bench_runtime_config(root: &Path) -> Result<BaseRelayRuntimeConfig, String> { 2388 let raw = json!({ 2389 "server": { 2390 "listen_addr": "127.0.0.1:0", 2391 "relay_url": TANGLE_V2_RELAY_URL 2392 }, 2393 "pocket": { 2394 "data_directory": root.join("pocket"), 2395 "sync_policy": "flush_on_shutdown", 2396 "query": { 2397 "allow_scraping": false, 2398 "allow_scrape_if_limited_to": 100, 2399 "allow_scrape_if_max_seconds": 3600 2400 } 2401 }, 2402 "groups": { 2403 "enabled": true, 2404 "canonical_relay_url": TANGLE_V2_RELAY_URL, 2405 "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777", 2406 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()] 2407 }, 2408 "auth": { 2409 "challenge_ttl_seconds": 300, 2410 "created_at_skew_seconds": 600 2411 }, 2412 "limits": { 2413 "max_message_length": 1048576, 2414 "max_subid_length": 64, 2415 "max_subscriptions_per_connection": 64, 2416 "max_filters_per_request": 10, 2417 "max_tag_values_per_filter": 100, 2418 "max_query_complexity": 2048, 2419 "max_limit": 500, 2420 "default_limit": 100, 2421 "max_event_tags": 200, 2422 "max_content_length": 65536, 2423 "broadcast_channel_capacity": 16, 2424 "per_connection_outbound_queue": 8 2425 }, 2426 "rate_limits": { 2427 "auth": { 2428 "per_ip": {"window_seconds": 60, "max_hits": 120}, 2429 "per_pubkey": {"window_seconds": 60, "max_hits": 30}, 2430 "failures": {"window_seconds": 300, "max_hits": 5}, 2431 "failures_per_ip": {"window_seconds": 300, "max_hits": 20} 2432 }, 2433 "event": { 2434 "per_ip": {"window_seconds": 60, "max_hits": 600}, 2435 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 2436 "per_kind": {"window_seconds": 60, "max_hits": 1000} 2437 }, 2438 "group": { 2439 "write_per_ip": {"window_seconds": 60, "max_hits": 300}, 2440 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, 2441 "write_per_group": {"window_seconds": 60, "max_hits": 90}, 2442 "write_per_kind": {"window_seconds": 60, "max_hits": 300}, 2443 "join_flow": {"window_seconds": 300, "max_hits": 10}, 2444 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} 2445 }, 2446 "req": { 2447 "per_ip": {"window_seconds": 60, "max_hits": 600}, 2448 "per_connection": {"window_seconds": 60, "max_hits": 120}, 2449 "per_pubkey": {"window_seconds": 60, "max_hits": 240}, 2450 "per_group": {"window_seconds": 60, "max_hits": 240}, 2451 "per_kind": {"window_seconds": 60, "max_hits": 500}, 2452 "broad": {"window_seconds": 60, "max_hits": 30} 2453 }, 2454 "count": { 2455 "per_ip": {"window_seconds": 60, "max_hits": 300}, 2456 "per_connection": {"window_seconds": 60, "max_hits": 60}, 2457 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 2458 "per_group": {"window_seconds": 60, "max_hits": 120}, 2459 "per_kind": {"window_seconds": 60, "max_hits": 240}, 2460 "broad": {"window_seconds": 60, "max_hits": 20} 2461 } 2462 } 2463 }) 2464 .to_string(); 2465 parse_base_relay_runtime_config_json(&raw).map_err(|error| error.to_string()) 2466 } 2467 2468 fn bench_temp_root(run_name: &str) -> PathBuf { 2469 let id = TEMP_ID.fetch_add(1, Ordering::Relaxed); 2470 std::env::temp_dir().join(format!( 2471 "tangle-bench-{run_name}-{}-{id}", 2472 std::process::id() 2473 )) 2474 } 2475 2476 fn subscription(value: &str) -> Result<SubscriptionId, String> { 2477 SubscriptionId::new(value).map_err(|error| error.to_string()) 2478 } 2479 2480 fn ok_accepted(message: &RelayMessage) -> bool { 2481 matches!(message, RelayMessage::Ok { accepted: true, .. }) 2482 } 2483 2484 fn estimate_memory_bytes(dataset: &BenchDataset) -> u64 { 2485 let event_bytes = dataset 2486 .source_events() 2487 .iter() 2488 .map(|source| { 2489 serde_json::to_string(&event_to_value(source.event())) 2490 .unwrap_or_default() 2491 .len() 2492 }) 2493 .sum::<usize>(); 2494 let projection_bytes = dataset.groups().len() * 512 2495 + usize::try_from(dataset.membership_event_count()).expect("member count fits in usize") 2496 * 192; 2497 (event_bytes + projection_bytes) 2498 .try_into() 2499 .expect("estimated memory fits in u64") 2500 } 2501 2502 fn estimate_virtual_relay_memory_bytes(config: VirtualRelayTenancyConfig) -> u64 { 2503 let subscription_bytes = config.aggregate_active_subscriptions() * 512; 2504 let tenant_bytes = config.tenant_count * 1024 * 1024; 2505 (subscription_bytes + tenant_bytes) 2506 .try_into() 2507 .expect("estimated virtual relay memory fits in u64") 2508 } 2509 2510 fn percentile(samples: &[u64], percentile: u64) -> u64 { 2511 if samples.is_empty() { 2512 return 0; 2513 } 2514 let last = samples.len() - 1; 2515 let index = (last as u64 * percentile).div_ceil(100); 2516 samples[usize::try_from(index).expect("percentile index fits in usize")] 2517 } 2518 2519 fn elapsed_micros(started: Instant) -> u64 { 2520 u64::try_from(started.elapsed().as_micros()) 2521 .unwrap_or(u64::MAX) 2522 .max(1) 2523 } 2524 2525 fn lower_hex(bytes: &[u8]) -> String { 2526 const HEX: &[u8; 16] = b"0123456789abcdef"; 2527 let mut output = String::with_capacity(bytes.len() * 2); 2528 for byte in bytes { 2529 output.push(char::from(HEX[usize::from(byte >> 4)])); 2530 output.push(char::from(HEX[usize::from(byte & 0x0f)])); 2531 } 2532 output 2533 } 2534 2535 #[cfg(test)] 2536 mod tests { 2537 use super::{ 2538 BenchDataset, BenchDatasetConfig, BenchGroupVisibility, BenchmarkProfile, 2539 BenchmarkProfileName, BenchmarkRunReport, BenchmarkThresholds, POCKET_SOURCE_REPOSITORY, 2540 POCKET_SOURCE_REVISION, SCENARIO_BROADCAST_LAG, SCENARIO_COUNT_RESOURCE_CONTROLS, 2541 SCENARIO_GROUP_READ_GATE_OVERHEAD, SCENARIO_MEMORY_PROFILE, SCENARIO_OUTBOX_REPLAY, 2542 SCENARIO_POCKET_QUERY_VISIBLE_EVENTS, SCENARIO_PROJECTION_REBUILD, 2543 SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT, SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT, 2544 SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT, ScenarioReport, VirtualRelayTenancyConfig, 2545 generated_state_counts, materialize_dataset, 2546 }; 2547 use std::collections::BTreeSet; 2548 use tangle_groups::{GroupId, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA}; 2549 2550 #[test] 2551 fn deterministic_dataset_generator_produces_stable_group_events() { 2552 let first = 2553 BenchDataset::generate(BenchDatasetConfig::new(3, 2, 2, 2, 2)).expect("first dataset"); 2554 let second = 2555 BenchDataset::generate(BenchDatasetConfig::new(3, 2, 2, 2, 2)).expect("second dataset"); 2556 let first_ids = first 2557 .source_events() 2558 .into_iter() 2559 .map(|source| source.event().id().as_str().to_owned()) 2560 .collect::<Vec<_>>(); 2561 let second_ids = second 2562 .source_events() 2563 .into_iter() 2564 .map(|source| source.event().id().as_str().to_owned()) 2565 .collect::<Vec<_>>(); 2566 let unique_ids = first_ids.iter().cloned().collect::<BTreeSet<_>>(); 2567 2568 assert_eq!(first_ids, second_ids); 2569 assert_eq!(first.groups().len(), 3); 2570 assert_eq!(first.source_event_count(), 17); 2571 assert_eq!(first.group_event_count(), 15); 2572 assert_eq!(first.membership_event_count(), 6); 2573 assert_eq!(unique_ids.len(), first_ids.len()); 2574 assert_eq!( 2575 first 2576 .groups() 2577 .iter() 2578 .map(|group| group.visibility()) 2579 .collect::<Vec<_>>(), 2580 vec![ 2581 BenchGroupVisibility::Public, 2582 BenchGroupVisibility::Private, 2583 BenchGroupVisibility::Hidden 2584 ] 2585 ); 2586 assert_eq!( 2587 first.dataset_digest().expect("first digest"), 2588 second.dataset_digest().expect("second digest") 2589 ); 2590 assert_eq!( 2591 first.source_events_jsonl().expect("jsonl").lines().count(), 2592 usize::try_from(first.source_event_count()).expect("count fits") 2593 ); 2594 } 2595 2596 #[test] 2597 fn dataset_config_rejects_benchmark_shapes_without_privacy_coverage() { 2598 assert!(BenchDataset::generate(BenchDatasetConfig::new(2, 1, 1, 0, 1)).is_err()); 2599 assert!(BenchDataset::generate(BenchDatasetConfig::new(3, 0, 1, 0, 1)).is_err()); 2600 assert!(BenchDataset::generate(BenchDatasetConfig::new(3, 1, 0, 0, 1)).is_err()); 2601 assert!(BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 0)).is_err()); 2602 } 2603 2604 #[test] 2605 fn materialized_dataset_populates_generated_group_state() { 2606 let dataset = 2607 BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 1, 1)).expect("dataset"); 2608 let materialized = materialize_dataset(&dataset, "test-generated-state", 16) 2609 .expect("materialized dataset"); 2610 let counts = generated_state_counts(&materialized.relay).expect("state counts"); 2611 2612 assert_eq!(counts.metadata, 3); 2613 assert_eq!(counts.admins, 3); 2614 assert_eq!(counts.members, 3); 2615 assert_eq!( 2616 super::count_kind(&materialized.relay, KIND_GROUP_METADATA).expect("metadata"), 2617 3 2618 ); 2619 assert_eq!( 2620 super::count_kind(&materialized.relay, KIND_GROUP_ADMINS).expect("admins"), 2621 3 2622 ); 2623 assert_eq!( 2624 super::count_kind(&materialized.relay, KIND_GROUP_MEMBERS).expect("members"), 2625 3 2626 ); 2627 } 2628 2629 #[test] 2630 fn benchmark_suite_runs_all_required_v2_scenarios() { 2631 let report = BenchmarkRunReport::run(smoke_profile(BenchDatasetConfig::new(3, 1, 1, 2, 1))) 2632 .expect("report"); 2633 2634 for name in [ 2635 SCENARIO_POCKET_QUERY_VISIBLE_EVENTS, 2636 SCENARIO_GROUP_READ_GATE_OVERHEAD, 2637 SCENARIO_COUNT_RESOURCE_CONTROLS, 2638 SCENARIO_PROJECTION_REBUILD, 2639 SCENARIO_OUTBOX_REPLAY, 2640 SCENARIO_BROADCAST_LAG, 2641 SCENARIO_MEMORY_PROFILE, 2642 ] { 2643 let scenario = report.scenario(name).expect("scenario"); 2644 assert_eq!(scenario.rejected, 0, "{name} rejected operations"); 2645 assert_eq!(scenario.accepted, scenario.attempted, "{name} acceptance"); 2646 assert!(scenario.elapsed_micros > 0, "{name} elapsed"); 2647 } 2648 assert_eq!(report.dataset_profile().groups, 3); 2649 assert_eq!(report.validation_summary().len(), 7); 2650 assert!( 2651 report 2652 .validation_summary() 2653 .values() 2654 .all(|status| status == "pass") 2655 ); 2656 } 2657 2658 #[test] 2659 fn local_smoke_profiles_run_without_hardware_evidence() { 2660 for profile in [BenchmarkProfile::smoke(), BenchmarkProfile::large_smoke()] { 2661 assert!(!profile.requires_target_hardware_evidence()); 2662 let report = BenchmarkRunReport::run(profile).expect("local profile report"); 2663 assert!( 2664 report 2665 .validation_summary() 2666 .values() 2667 .all(|status| status == "pass") 2668 ); 2669 } 2670 } 2671 2672 #[test] 2673 fn protocol_conversion_for_supported_profile_sizes_is_bounded() { 2674 let dataset = 2675 BenchDataset::generate(BenchDatasetConfig::new(4, 3, 3, 4, 3)).expect("dataset"); 2676 let mut total_event_json_bytes = 0_usize; 2677 for source in dataset.source_events() { 2678 let event_json = tangle_protocol::event_to_value(source.event()).to_string(); 2679 total_event_json_bytes += event_json.len(); 2680 assert!( 2681 tangle_protocol::parse_client_message(&format!("[\"EVENT\",{event_json}]")).is_ok() 2682 ); 2683 } 2684 2685 assert!(total_event_json_bytes < 1_000_000); 2686 } 2687 2688 #[test] 2689 fn benchmark_profiles_are_explicit_and_unknown_profiles_fail_closed() { 2690 assert_eq!( 2691 BenchmarkProfileName::all() 2692 .iter() 2693 .map(|profile| profile.as_str()) 2694 .collect::<Vec<_>>(), 2695 vec![ 2696 "smoke", 2697 "virtual-relay-tenancy", 2698 "medium", 2699 "large-smoke", 2700 "proof-10m", 2701 "proof-large-group", 2702 "proof-join-storm", 2703 "proof-slow-client" 2704 ] 2705 ); 2706 assert_eq!( 2707 BenchmarkProfileName::parse("smoke") 2708 .expect("smoke") 2709 .as_str(), 2710 "smoke" 2711 ); 2712 assert_eq!( 2713 BenchmarkProfileName::parse("virtual-relay-tenancy") 2714 .expect("virtual") 2715 .as_str(), 2716 "virtual-relay-tenancy" 2717 ); 2718 assert_eq!( 2719 BenchmarkProfileName::parse("medium") 2720 .expect("medium") 2721 .as_str(), 2722 "medium" 2723 ); 2724 assert_eq!( 2725 BenchmarkProfileName::parse("large-smoke") 2726 .expect("large-smoke") 2727 .as_str(), 2728 "large-smoke" 2729 ); 2730 assert!(BenchmarkProfileName::parse("production").is_err()); 2731 assert!( 2732 BenchmarkProfileName::parse("local") 2733 .expect_err("unknown") 2734 .contains("unknown benchmark profile") 2735 ); 2736 assert_eq!( 2737 BenchmarkProfile::smoke().dataset_config(), 2738 BenchDatasetConfig::smoke() 2739 ); 2740 assert_eq!( 2741 BenchmarkProfile::virtual_relay_tenancy().dataset_config(), 2742 BenchDatasetConfig::smoke() 2743 ); 2744 assert_eq!( 2745 BenchmarkProfile::medium().dataset_config(), 2746 BenchDatasetConfig::medium() 2747 ); 2748 assert_eq!( 2749 BenchmarkProfile::large_smoke().dataset_config(), 2750 BenchDatasetConfig::large_smoke() 2751 ); 2752 assert_eq!( 2753 BenchmarkProfile::proof_10m().dataset_config(), 2754 BenchDatasetConfig::proof_10m() 2755 ); 2756 assert_eq!( 2757 BenchmarkProfile::proof_large_group().dataset_config(), 2758 BenchDatasetConfig::proof_large_group() 2759 ); 2760 assert_eq!( 2761 BenchmarkProfile::proof_join_storm().dataset_config(), 2762 BenchDatasetConfig::proof_join_storm() 2763 ); 2764 assert_eq!( 2765 BenchmarkProfile::proof_slow_client().dataset_config(), 2766 BenchDatasetConfig::proof_slow_client() 2767 ); 2768 } 2769 2770 #[test] 2771 fn proof_profile_dataset_definitions_are_hardware_scale_without_materialization() { 2772 assert_eq!( 2773 BenchDatasetConfig::proof_10m().estimated_source_event_count(), 2774 10_000_000 2775 ); 2776 assert_eq!( 2777 BenchDatasetConfig::proof_large_group().member_count, 2778 100_000 2779 ); 2780 assert_eq!( 2781 BenchDatasetConfig::proof_join_storm().group_count 2782 * BenchDatasetConfig::proof_join_storm().member_count, 2783 1_000_000 2784 ); 2785 assert_eq!(BenchDatasetConfig::proof_slow_client().group_count, 50_000); 2786 for profile in [ 2787 BenchmarkProfile::proof_10m(), 2788 BenchmarkProfile::proof_large_group(), 2789 BenchmarkProfile::proof_join_storm(), 2790 BenchmarkProfile::proof_slow_client(), 2791 ] { 2792 assert!(profile.requires_target_hardware_evidence()); 2793 assert!(profile.validate_for_run().is_err()); 2794 assert!(!profile.proof_claim_eligible()); 2795 assert!(profile.dataset_config().validate().is_ok()); 2796 } 2797 } 2798 2799 #[test] 2800 fn benchmark_threshold_json_rejects_missing_unknown_or_zero_fields() { 2801 let valid = BenchmarkThresholds::from_json_value(&BenchmarkThresholds::smoke().to_json()) 2802 .expect("valid thresholds"); 2803 assert_eq!(valid, BenchmarkThresholds::smoke()); 2804 2805 let missing = serde_json::json!({ 2806 "pocket_query_p95_micros": 1, 2807 "read_gate_p95_micros": 1, 2808 "count_resource_controls_p95_micros": 1, 2809 "projection_rebuild_elapsed_micros": 1, 2810 "outbox_replay_elapsed_micros": 1, 2811 "broadcast_lag_p95_micros": 1 2812 }); 2813 assert!( 2814 BenchmarkThresholds::from_json_value(&missing) 2815 .expect_err("missing") 2816 .contains("memory_profile_max_bytes") 2817 ); 2818 2819 let unknown = serde_json::json!({ 2820 "pocket_query_p95_micros": 1, 2821 "read_gate_p95_micros": 1, 2822 "count_resource_controls_p95_micros": 1, 2823 "projection_rebuild_elapsed_micros": 1, 2824 "outbox_replay_elapsed_micros": 1, 2825 "broadcast_lag_p95_micros": 1, 2826 "memory_profile_max_bytes": 1, 2827 "extra": 1 2828 }); 2829 assert!( 2830 BenchmarkThresholds::from_json_value(&unknown) 2831 .expect_err("unknown") 2832 .contains("unknown benchmark threshold field") 2833 ); 2834 2835 let zero = serde_json::json!({ 2836 "pocket_query_p95_micros": 0, 2837 "read_gate_p95_micros": 1, 2838 "count_resource_controls_p95_micros": 1, 2839 "projection_rebuild_elapsed_micros": 1, 2840 "outbox_replay_elapsed_micros": 1, 2841 "broadcast_lag_p95_micros": 1, 2842 "memory_profile_max_bytes": 1 2843 }); 2844 assert!( 2845 BenchmarkThresholds::from_json_value(&zero) 2846 .expect_err("zero") 2847 .contains("greater than zero") 2848 ); 2849 } 2850 2851 #[test] 2852 fn proof_claim_eligibility_requires_manual_proof_profile() { 2853 assert!(!BenchmarkProfile::smoke().proof_claim_eligible()); 2854 assert!( 2855 !BenchmarkProfile::smoke() 2856 .with_target_hardware_evidence("target-hardware:ci") 2857 .expect("evidence") 2858 .proof_claim_eligible() 2859 ); 2860 assert!(!BenchmarkProfile::large_smoke().proof_claim_eligible()); 2861 assert!( 2862 !BenchmarkProfile::large_smoke() 2863 .with_target_hardware_evidence("target-hardware:bench-node-001") 2864 .expect("evidence") 2865 .proof_claim_eligible() 2866 ); 2867 assert!(!BenchmarkProfile::proof_10m().proof_claim_eligible()); 2868 assert!( 2869 BenchmarkProfile::proof_10m() 2870 .with_target_hardware_evidence("target-hardware:proof-node-001") 2871 .expect("evidence") 2872 .proof_claim_eligible() 2873 ); 2874 } 2875 2876 #[test] 2877 fn proof_profile_runs_fail_closed_without_hardware_evidence() { 2878 for profile in [ 2879 BenchmarkProfile::proof_10m(), 2880 BenchmarkProfile::proof_large_group(), 2881 BenchmarkProfile::proof_join_storm(), 2882 BenchmarkProfile::proof_slow_client(), 2883 ] { 2884 let error = 2885 BenchmarkRunReport::run(profile).expect_err("proof profile requires evidence"); 2886 2887 assert!(error.contains("target hardware evidence is required")); 2888 } 2889 } 2890 2891 #[test] 2892 fn benchmark_threshold_validation_rejects_missing_or_failed_scenarios() { 2893 let scenarios = vec![ 2894 passing_scenario(SCENARIO_POCKET_QUERY_VISIBLE_EVENTS), 2895 ScenarioReport::new( 2896 SCENARIO_GROUP_READ_GATE_OVERHEAD, 2897 1, 2898 1, 2899 0, 2900 10, 2901 vec![BenchmarkThresholds::smoke().read_gate_p95_micros + 1], 2902 128, 2903 ), 2904 passing_scenario(SCENARIO_COUNT_RESOURCE_CONTROLS), 2905 passing_scenario(SCENARIO_PROJECTION_REBUILD), 2906 passing_scenario(SCENARIO_OUTBOX_REPLAY), 2907 passing_scenario(SCENARIO_BROADCAST_LAG), 2908 passing_scenario(SCENARIO_MEMORY_PROFILE), 2909 ]; 2910 let failed = 2911 super::validation_summary(&scenarios, BenchmarkThresholds::smoke()).expect_err("fail"); 2912 assert!(failed.contains(SCENARIO_GROUP_READ_GATE_OVERHEAD)); 2913 2914 let missing = super::validation_summary( 2915 &scenarios[..scenarios.len() - 1], 2916 BenchmarkThresholds::smoke(), 2917 ) 2918 .expect_err("missing"); 2919 assert!(missing.contains(SCENARIO_MEMORY_PROFILE)); 2920 } 2921 2922 #[test] 2923 fn benchmark_summary_json_matches_report_template_surface() { 2924 let report = BenchmarkRunReport::run(smoke_profile(BenchDatasetConfig::new(3, 1, 1, 1, 1))) 2925 .expect("report"); 2926 let summary = report.summary_json("unit-run", std::path::Path::new(".local/unit")); 2927 2928 assert_eq!(summary["schema"], 2); 2929 assert_eq!(summary["run_id"], "unit-run"); 2930 assert_eq!(summary["profile"], "smoke"); 2931 assert_eq!(summary["threshold_source"], "builtin:smoke"); 2932 assert_eq!( 2933 summary["pocket_source"]["repository"], 2934 POCKET_SOURCE_REPOSITORY 2935 ); 2936 assert_eq!(summary["pocket_source"]["revision"], POCKET_SOURCE_REVISION); 2937 assert_eq!(summary["proof_claim"]["eligible"], false); 2938 assert_eq!(summary["proof_claim"]["target_hardware_evidence"], "absent"); 2939 assert_eq!( 2940 summary["dataset"]["fixture_family"], 2941 "synthetic repo-owned fixtures" 2942 ); 2943 assert_eq!( 2944 summary["dataset_profile"]["fixture_family"], 2945 "synthetic repo-owned fixtures" 2946 ); 2947 assert_eq!(summary["scenarios"].as_array().expect("scenarios").len(), 7); 2948 let first_scenario = &summary["scenarios"] 2949 .as_array() 2950 .expect("scenarios") 2951 .first() 2952 .expect("first scenario"); 2953 assert_eq!(first_scenario["status"], "pass"); 2954 assert!(first_scenario["p50_micros"].as_u64().expect("p50") > 0); 2955 assert!(first_scenario["p95_micros"].as_u64().expect("p95") > 0); 2956 assert!(first_scenario["p99_micros"].as_u64().expect("p99") > 0); 2957 assert!( 2958 first_scenario["query_metrics"]["candidates_scanned"] 2959 .as_u64() 2960 .expect("candidates") 2961 > 0 2962 ); 2963 assert!( 2964 first_scenario["query_metrics"]["events_returned"] 2965 .as_u64() 2966 .expect("returned") 2967 > 0 2968 ); 2969 assert!( 2970 first_scenario["memory"]["max_rss_bytes"] 2971 .as_u64() 2972 .expect("memory") 2973 > 0 2974 ); 2975 assert_eq!( 2976 summary["validation_summary"][SCENARIO_POCKET_QUERY_VISIBLE_EVENTS], 2977 "pass" 2978 ); 2979 assert_eq!(summary["pass_fail_summary"]["overall_status"], "pass"); 2980 assert!( 2981 summary["thresholds"]["read_gate_p95_micros"] 2982 .as_u64() 2983 .expect("threshold") 2984 > 0 2985 ); 2986 assert_eq!( 2987 summary["artifacts"]["dataset_events_jsonl"], 2988 "dataset-events.jsonl" 2989 ); 2990 assert_eq!( 2991 summary["tangle_v1_mvp"]["benchmark_evidence"]["status"], 2992 "not_run" 2993 ); 2994 assert_eq!( 2995 summary["tangle_v1_mvp"]["benchmark_evidence"]["required_profile"], 2996 "virtual-relay-tenancy" 2997 ); 2998 assert_eq!( 2999 summary["tangle_v1_mvp"]["external_integration"]["runner"], 3000 "radroots_testing_tangle_integration" 3001 ); 3002 assert_eq!( 3003 summary["tangle_v1_mvp"]["external_integration"]["local_status"], 3004 "unavailable" 3005 ); 3006 } 3007 3008 #[test] 3009 fn virtual_relay_tenancy_profile_contract_matches_mvp_target() { 3010 let config = VirtualRelayTenancyConfig::mvp(); 3011 3012 assert_eq!(config.tenant_count, 10); 3013 assert_eq!(config.aggregate_active_subscriptions(), 20_000); 3014 assert_eq!(config.busiest_tenant_active_subscriptions(), 2_000); 3015 assert_eq!(config.one_percent_fanout(), 200); 3016 assert_eq!(config.ten_percent_fanout(), 2_000); 3017 assert!(config.validate().is_ok()); 3018 assert_eq!( 3019 BenchmarkProfile::virtual_relay_tenancy().name(), 3020 BenchmarkProfileName::VirtualRelayTenancy 3021 ); 3022 } 3023 3024 #[test] 3025 fn virtual_relay_tenancy_summary_requires_three_fanout_scenarios() { 3026 let summary = super::virtual_relay_tenancy_summary_json( 3027 BenchmarkProfileName::VirtualRelayTenancy, 3028 &[ 3029 passing_scenario(SCENARIO_VIRTUAL_RELAY_FANOUT_1_PERCENT), 3030 passing_scenario(SCENARIO_VIRTUAL_RELAY_FANOUT_10_PERCENT), 3031 passing_scenario(SCENARIO_VIRTUAL_RELAY_FANOUT_100_PERCENT), 3032 ], 3033 ); 3034 3035 assert_eq!(summary["status"], "pass"); 3036 assert_eq!(summary["target"]["tenant_count"], 10); 3037 assert_eq!(summary["target"]["aggregate_active_subscriptions"], 20_000); 3038 assert_eq!( 3039 summary["target"]["busiest_tenant_active_subscriptions"], 3040 2_000 3041 ); 3042 assert_eq!( 3043 summary["fanout_scenarios"] 3044 .as_array() 3045 .expect("fanout scenarios") 3046 .len(), 3047 3 3048 ); 3049 } 3050 3051 #[test] 3052 fn count_resource_controls_scenario_accepts_bounded_counts_and_refuses_broad_counts() { 3053 let dataset = 3054 BenchDataset::generate(BenchDatasetConfig::new(3, 101, 101, 0, 1)).expect("dataset"); 3055 let scenario = 3056 super::run_count_resource_control_benchmark(&dataset).expect("count controls"); 3057 3058 assert_eq!(scenario.scenario, SCENARIO_COUNT_RESOURCE_CONTROLS); 3059 assert_eq!(scenario.attempted, 5); 3060 assert_eq!(scenario.accepted, scenario.attempted); 3061 assert_eq!(scenario.rejected, 0); 3062 } 3063 3064 #[test] 3065 fn benchmark_pocket_source_matches_store_boundary() { 3066 assert_eq!( 3067 POCKET_SOURCE_REPOSITORY, 3068 tangle_store_pocket::POCKET_SOURCE_REPOSITORY 3069 ); 3070 assert_eq!( 3071 POCKET_SOURCE_REVISION, 3072 tangle_store_pocket::POCKET_SOURCE_REVISION 3073 ); 3074 } 3075 3076 #[test] 3077 fn projection_rebuild_scenario_recreates_groups_and_members() { 3078 let dataset = 3079 BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 2)).expect("dataset"); 3080 let scenario = super::run_projection_rebuild_benchmark(&dataset).expect("rebuild"); 3081 3082 assert_eq!(scenario.scenario, SCENARIO_PROJECTION_REBUILD); 3083 assert_eq!(scenario.accepted, 1); 3084 assert_eq!(scenario.rejected, 0); 3085 } 3086 3087 #[test] 3088 fn outbox_replay_scenario_keeps_generated_state_idempotent() { 3089 let dataset = 3090 BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 1)).expect("dataset"); 3091 let scenario = super::run_outbox_replay_benchmark(&dataset).expect("outbox"); 3092 3093 assert_eq!(scenario.scenario, SCENARIO_OUTBOX_REPLAY); 3094 assert_eq!(scenario.accepted, 1); 3095 assert_eq!(scenario.rejected, 0); 3096 } 3097 3098 #[test] 3099 fn broadcast_lag_scenario_keeps_healthy_subscriptions_open() { 3100 let dataset = 3101 BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 1)).expect("dataset"); 3102 let scenario = super::run_broadcast_lag_benchmark(&dataset).expect("lag"); 3103 3104 assert_eq!(scenario.scenario, SCENARIO_BROADCAST_LAG); 3105 assert_eq!(scenario.accepted, scenario.attempted); 3106 assert_eq!(scenario.rejected, 0); 3107 } 3108 3109 #[test] 3110 fn percentile_helper_handles_empty_and_sorted_samples() { 3111 assert_eq!(super::percentile(&[], 95), 0); 3112 assert_eq!(super::percentile(&[1, 2, 3, 4, 5], 50), 3); 3113 assert_eq!(super::percentile(&[1, 2, 3, 4, 5], 95), 5); 3114 assert_eq!(super::lower_hex(&[0, 15, 16, 255]), "000f10ff"); 3115 } 3116 3117 #[test] 3118 fn group_id_helper_accepts_dataset_group_names() { 3119 let dataset = 3120 BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 1)).expect("dataset"); 3121 3122 for group in dataset.groups() { 3123 GroupId::new(group.id()).expect("group id"); 3124 } 3125 } 3126 3127 fn passing_scenario(name: &str) -> ScenarioReport { 3128 ScenarioReport::new(name, 1, 1, 0, 10, vec![1], 128) 3129 } 3130 3131 fn smoke_profile(config: BenchDatasetConfig) -> BenchmarkProfile { 3132 BenchmarkProfile::smoke() 3133 .with_dataset_config(config) 3134 .expect("smoke profile") 3135 } 3136 }