tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 3303026a759d61933d9c4ab3bc90885def8abae4
parent 2c1b7c56e99354aec0800de6dbd6f4d4b6018a36
Author: triesap <tyson@radroots.org>
Date:   Sat, 13 Jun 2026 19:51:32 -0700

bench: add tangle v2 benchmark harness

Diffstat:
M Cargo.lock | 6 ++----
M crates/tangle_bench/Cargo.toml | 6 ++----
M crates/tangle_bench/src/bin/tangle_benchmark_report.rs | 218 ++++++++++++++++++++++++++++---------------------------------------------------
M crates/tangle_bench/src/lib.rs | 2070 ++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------
4 files changed, 1331 insertions(+), 969 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -4321,13 +4321,11 @@ version = "0.1.0" dependencies = [ "serde_json", "sha2", - "tangle_nips", + "tangle_groups", "tangle_protocol", "tangle_runtime", - "tangle_store", - "tangle_store_surreal", + "tangle_store_pocket", "tangle_test_support", - "tokio", ] [[package]] diff --git a/crates/tangle_bench/Cargo.toml b/crates/tangle_bench/Cargo.toml @@ -14,13 +14,11 @@ path = "src/bin/tangle_benchmark_report.rs" [dependencies] serde_json = "1" sha2 = "0.10" -tangle_nips = { path = "../tangle_nips" } +tangle_groups = { path = "../tangle_groups" } tangle_protocol = { path = "../tangle_protocol" } tangle_runtime = { path = "../tangle_runtime" } -tangle_store = { path = "../tangle_store" } -tangle_store_surreal = { path = "../tangle_store_surreal" } +tangle_store_pocket = { path = "../tangle_store_pocket" } tangle_test_support = { path = "../tangle_test_support" } -tokio = { version = "1", features = ["macros", "rt"] } [lints] workspace = true diff --git a/crates/tangle_bench/src/bin/tangle_benchmark_report.rs b/crates/tangle_bench/src/bin/tangle_benchmark_report.rs @@ -1,151 +1,71 @@ #![forbid(unsafe_code)] -use serde_json::json; use std::env; use std::fs; use std::path::{Path, PathBuf}; use std::process::Command; use std::time::{SystemTime, UNIX_EPOCH}; -use tangle_bench::{ - BenchDatasetConfig, capture_query_plans, run_ingest_benchmark, run_listing_query_benchmark, - run_rebuild_benchmark, run_restore_drill_smoke, run_search_benchmark, -}; +use tangle_bench::{BenchDatasetConfig, BenchmarkRunReport}; use tangle_runtime::TANGLE_SUPPORTED_NIPS; struct BenchmarkReportArgs { output_root: PathBuf, run_id: String, - listing_count: usize, - note_count: usize, + config: BenchDatasetConfig, } fn main() { - let result = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|error| error.to_string()) - .and_then(|runtime| runtime.block_on(run())); - if let Err(error) = result { - eprintln!("{error}"); - 
std::process::exit(1); + match run() { + Ok(Some(artifact_dir)) => println!("{}", path_string(&artifact_dir)), + Ok(None) => {} + Err(error) => { + eprintln!("{error}"); + std::process::exit(1); + } } } -async fn run() -> Result<(), String> { - let args = BenchmarkReportArgs::parse(env::args().skip(1))?; +fn run() -> Result<Option<PathBuf>, String> { + let Some(args) = BenchmarkReportArgs::parse(env::args().skip(1))? else { + println!("{}", help_text()); + return Ok(None); + }; let artifact_dir = args.output_root.join(&args.run_id); fs::create_dir_all(&artifact_dir).map_err(|error| error.to_string())?; - let config = BenchDatasetConfig::new(args.listing_count, args.note_count); - - let ingest = run_ingest_benchmark(config).await?; - let listing = run_listing_query_benchmark(config).await?; - let search = run_search_benchmark(config).await?; - let query_plans = capture_query_plans(config).await?; - let rebuild = run_rebuild_benchmark(config).await?; - let restore = run_restore_drill_smoke(config).await?; - - let listing_plan_path = artifact_dir.join("listing-query-plan.txt"); - let search_plan_path = artifact_dir.join("search-query-plan.txt"); - fs::write(&listing_plan_path, &query_plans.listing_plan_text) - .map_err(|error| error.to_string())?; - fs::write(&search_plan_path, &query_plans.search_plan_text) - .map_err(|error| error.to_string())?; - - let expected_events = (args.listing_count + args.note_count) as u64; - let benchmark_smoke = ingest.attempted == expected_events - && ingest.inserted == expected_events - && listing.listing_rows == args.listing_count as u64 - && listing.limited_rows <= listing.listing_rows - && search.indexed == args.listing_count as u64 - && search.text_results > 0 - && search.browse_results > 0 - && rebuild.scanned == args.listing_count as u64 - && rebuild.rebuilt == rebuild.scanned - && rebuild.projected == rebuild.scanned - && rebuild.listing_rows == args.listing_count as u64 - && rebuild.checksum.len() == 64; - let 
query_plan_capture = - query_plans.listing_plan_steps > 0 && query_plans.search_plan_steps > 0; - let restore_drill_smoke = restore.exported == args.listing_count as u64 - && restore.restored == restore.exported - && restore.checksum_matches; - - let summary = json!({ - "schema": 1, - "run_id": args.run_id, - "artifact_directory": path_string(&artifact_dir), - "surrealdb_mode": "memory", - "dataset": { - "listing_count": args.listing_count, - "note_count": args.note_count, - "fixture_builder_family": "tangle_test_support canonical event builders" - }, - "artifacts": { - "summary_json": "summary.json", - "listing_query_plan": "listing-query-plan.txt", - "search_query_plan": "search-query-plan.txt" - }, - "ingest": { - "attempted": ingest.attempted, - "inserted": ingest.inserted, - "elapsed_micros": elapsed(ingest.elapsed_micros) - }, - "listing_query": { - "listing_rows": listing.listing_rows, - "limited_rows": listing.limited_rows, - "elapsed_micros": elapsed(listing.elapsed_micros) - }, - "search": { - "indexed": search.indexed, - "text_results": search.text_results, - "browse_results": search.browse_results, - "elapsed_micros": elapsed(search.elapsed_micros) - }, - "query_plan_capture": { - "listing_plan_steps": query_plans.listing_plan_steps, - "search_plan_steps": query_plans.search_plan_steps, - "listing_plan_text": "listing-query-plan.txt", - "search_plan_text": "search-query-plan.txt" - }, - "rebuild": { - "scanned": rebuild.scanned, - "rebuilt": rebuild.rebuilt, - "projected": rebuild.projected, - "listing_rows": rebuild.listing_rows, - "checksum": rebuild.checksum, - "elapsed_micros": elapsed(rebuild.elapsed_micros) - }, - "restore_drill": { - "exported": restore.exported, - "restored": restore.restored, - "source_checksum": restore.source_checksum, - "restored_checksum": restore.restored_checksum, - "checksum_matches": restore.checksum_matches - }, - "supported_nips_audit": { - "supported_nips": TANGLE_SUPPORTED_NIPS, - "count": 
TANGLE_SUPPORTED_NIPS.len() - }, - "validation_summary": { - "benchmark_smoke": status(benchmark_smoke), - "restore_drill_smoke": status(restore_drill_smoke), - "query_plan_capture": status(query_plan_capture), - "coverage_diagnostic": "not_run_by_release_acceptance" - } + + let report = BenchmarkRunReport::run(args.config)?; + let dataset_path = artifact_dir.join("dataset-events.jsonl"); + fs::write( + &dataset_path, + report + .dataset() + .source_events_jsonl() + .map_err(|error| error.to_string())?, + ) + .map_err(|error| error.to_string())?; + + let mut summary = report.summary_json(&args.run_id, &artifact_dir); + summary["supported_nips_audit"] = serde_json::json!({ + "supported_nips": TANGLE_SUPPORTED_NIPS, + "count": TANGLE_SUPPORTED_NIPS.len() }); + summary["run_identity"] = serde_json::json!({ + "git_commit": git_short_commit(), + "rust_toolchain": rust_toolchain(), + "host_profile": host_profile() + }); + let summary_path = artifact_dir.join("summary.json"); let raw = serde_json::to_string_pretty(&summary).map_err(|error| error.to_string())?; fs::write(&summary_path, format!("{raw}\n")).map_err(|error| error.to_string())?; - println!("{}", path_string(&artifact_dir)); - Ok(()) + Ok(Some(artifact_dir)) } impl BenchmarkReportArgs { - fn parse(args: impl IntoIterator<Item = String>) -> Result<Self, String> { + fn parse(args: impl IntoIterator<Item = String>) -> Result<Option<Self>, String> { let mut output_root = PathBuf::from(".local/tangle/benchmarks"); let mut run_id = None; - let mut listing_count = 12; - let mut note_count = 4; + let mut config = BenchDatasetConfig::smoke(); let mut args = args.into_iter(); while let Some(arg) = args.next() { match arg.as_str() { @@ -155,24 +75,35 @@ impl BenchmarkReportArgs { "--run-id" => { run_id = Some(require_value("--run-id", args.next())?); } - "--listing-count" => { - listing_count = parse_count("--listing-count", args.next())?; + "--group-count" => { + config.group_count = parse_count("--group-count", 
args.next())?; + } + "--public-events-per-group" => { + config.public_events_per_group = + parse_count("--public-events-per-group", args.next())?; + } + "--private-events-per-group" => { + config.private_events_per_group = + parse_count("--private-events-per-group", args.next())?; } - "--note-count" => { - note_count = parse_count("--note-count", args.next())?; + "--public-note-count" => { + config.public_note_count = parse_count("--public-note-count", args.next())?; } - "--help" => return Err(help_text()), + "--member-count" => { + config.member_count = parse_count("--member-count", args.next())?; + } + "--help" => return Ok(None), other => return Err(format!("unsupported argument `{other}`")), } } let run_id = run_id.unwrap_or_else(default_run_id); validate_run_id(&run_id)?; - Ok(Self { + let config = config.validate()?; + Ok(Some(Self { output_root, run_id, - listing_count, - note_count, - }) + config, + })) } } @@ -205,23 +136,28 @@ fn unix_seconds() -> u64 { } fn git_short_commit() -> String { - Command::new("git") - .args(["rev-parse", "--short", "HEAD"]) + command_text("git", &["rev-parse", "--short", "HEAD"]).unwrap_or_else(|| "unknown".to_owned()) +} + +fn rust_toolchain() -> String { + command_text("rustc", &["--version"]).unwrap_or_else(|| "unknown".to_owned()) +} + +fn host_profile() -> String { + let os = env::consts::OS; + let arch = env::consts::ARCH; + format!("{os}-{arch}") +} + +fn command_text(command: &str, args: &[&str]) -> Option<String> { + Command::new(command) + .args(args) .output() .ok() .filter(|output| output.status.success()) .and_then(|output| String::from_utf8(output.stdout).ok()) .map(|value| value.trim().to_owned()) .filter(|value| !value.is_empty()) - .unwrap_or_else(|| "unknown".to_owned()) -} - -fn status(value: bool) -> &'static str { - if value { "pass" } else { "fail" } -} - -fn elapsed(value: u128) -> u64 { - u64::try_from(value).unwrap_or(u64::MAX) } fn path_string(path: &Path) -> String { @@ -231,7 +167,9 @@ fn 
path_string(path: &Path) -> String { fn help_text() -> String { [ "usage: tangle-benchmark-report [--output-root PATH] [--run-id ID]", - " [--listing-count COUNT] [--note-count COUNT]", + " [--group-count COUNT] [--public-events-per-group COUNT]", + " [--private-events-per-group COUNT] [--public-note-count COUNT]", + " [--member-count COUNT]", ] .join("\n") } diff --git a/crates/tangle_bench/src/lib.rs b/crates/tangle_bench/src/lib.rs @@ -1,984 +1,1412 @@ #![forbid(unsafe_code)] +use serde_json::json; use sha2::{Digest, Sha256}; +use std::collections::BTreeMap; +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; -use tangle_protocol::{Event, Filter, RawEventJson, filter_from_value, parse_event_json}; -use tangle_store::{StoreEventOutcome, StoredEvent}; -use tangle_store_surreal::{ - ListingCurrentOutcome, ListingProjectionQuery, SearchDocumentOutcome, SearchDocumentQuery, - SurrealConnectionConfig, SurrealStore, base_migration_plan, +use tangle_groups::{KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, MemberStatus}; +use tangle_protocol::{ + Event, Filter, RelayMessage, SubscriptionId, event_to_value, filter_from_value, }; -use tangle_test_support::{FixtureKey, build_fixture_event_from_parts}; +use tangle_runtime::base_relay::{BaseAuthState, BaseRelay}; +use tangle_store_pocket::{PocketStoreConfig, PocketSyncPolicy}; +use tangle_test_support::{ + FixtureKey, TANGLE_V2_RELAY_URL, tangle_v2_auth_event, tangle_v2_event, tangle_v2_group_config, + tangle_v2_group_create_event, tangle_v2_group_event, tangle_v2_put_user_event, tangle_v2_tag, +}; + +static TEMP_ID: AtomicU64 = AtomicU64::new(0); + +pub const SCENARIO_POCKET_QUERY_VISIBLE_EVENTS: &str = "pocket_query_visible_events"; +pub const SCENARIO_GROUP_READ_GATE_OVERHEAD: &str = "group_read_gate_overhead"; +pub const SCENARIO_PROJECTION_REBUILD: &str = "projection_rebuild"; +pub const SCENARIO_OUTBOX_REPLAY: &str = "outbox_replay"; +pub 
const SCENARIO_BROADCAST_LAG: &str = "broadcast_lag"; +pub const SCENARIO_MEMORY_PROFILE: &str = "memory_profile"; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct BenchDatasetConfig { - pub listing_count: usize, - pub note_count: usize, + pub group_count: usize, + pub public_events_per_group: usize, + pub private_events_per_group: usize, + pub public_note_count: usize, + pub member_count: usize, } impl BenchDatasetConfig { - pub fn new(listing_count: usize, note_count: usize) -> Self { + pub fn new( + group_count: usize, + public_events_per_group: usize, + private_events_per_group: usize, + public_note_count: usize, + member_count: usize, + ) -> Self { Self { - listing_count, - note_count, + group_count, + public_events_per_group, + private_events_per_group, + public_note_count, + member_count, + } + } + + pub fn smoke() -> Self { + Self::new(6, 4, 3, 6, 3) + } + + pub fn validate(self) -> Result<Self, String> { + if self.group_count < 3 { + return Err("group-count must be at least 3".to_owned()); + } + if self.public_events_per_group == 0 { + return Err("public-events-per-group must be greater than zero".to_owned()); + } + if self.private_events_per_group == 0 { + return Err("private-events-per-group must be greater than zero".to_owned()); + } + if self.member_count == 0 { + return Err("member-count must be greater than zero".to_owned()); + } + Ok(self) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BenchGroupVisibility { + Public, + Private, + Hidden, +} + +impl BenchGroupVisibility { + pub fn as_str(self) -> &'static str { + match self { + Self::Public => "public", + Self::Private => "private", + Self::Hidden => "hidden", + } + } + + fn flags(self) -> &'static [&'static str] { + match self { + Self::Public => &[], + Self::Private => &["private"], + Self::Hidden => &["hidden"], } } } #[derive(Debug, Clone, PartialEq, Eq)] +pub struct BenchGroup { + id: String, + visibility: BenchGroupVisibility, +} + +impl BenchGroup { + pub fn id(&self) 
-> &str { + &self.id + } + + pub fn visibility(&self) -> BenchGroupVisibility { + self.visibility + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BenchEventAuth { + None, + Owner, + Admin, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BenchSourceEvent { + event: Event, + auth: BenchEventAuth, +} + +impl BenchSourceEvent { + pub fn event(&self) -> &Event { + &self.event + } + + pub fn auth(&self) -> BenchEventAuth { + self.auth + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] pub struct BenchDataset { - listings: Vec<Event>, - notes: Vec<Event>, + config: BenchDatasetConfig, + groups: Vec<BenchGroup>, + group_create_events: Vec<BenchSourceEvent>, + membership_events: Vec<BenchSourceEvent>, + group_timeline_events: Vec<BenchSourceEvent>, + public_note_events: Vec<BenchSourceEvent>, } impl BenchDataset { - pub fn generate(config: BenchDatasetConfig) -> Self { - let mut listings = Vec::with_capacity(config.listing_count); - for index in 0..config.listing_count { - listings.push(bench_listing(index)); + pub fn generate(config: BenchDatasetConfig) -> Result<Self, String> { + let config = config.validate()?; + let groups = (0..config.group_count) + .map(|index| BenchGroup { + id: format!("BenchFarm{index:04}"), + visibility: group_visibility(index), + }) + .collect::<Vec<_>>(); + let mut group_create_events = Vec::with_capacity(groups.len()); + let mut membership_events = Vec::with_capacity(groups.len() * config.member_count); + let mut group_timeline_events = Vec::new(); + let mut public_note_events = Vec::with_capacity(config.public_note_count); + + for (group_index, group) in groups.iter().enumerate() { + group_create_events.push(BenchSourceEvent { + event: tangle_v2_group_create_event( + FixtureKey::Owner, + &group.id, + 1_714_200_000 + u64::try_from(group_index).expect("group index fits in u64"), + group.visibility.flags(), + )?, + auth: BenchEventAuth::Owner, + }); + for member_index in 0..config.member_count { + 
membership_events.push(BenchSourceEvent { + event: bench_member_event(&group.id, group_index, member_index, 1_714_300_000)?, + auth: BenchEventAuth::Admin, + }); + } + let per_group = match group.visibility { + BenchGroupVisibility::Public => config.public_events_per_group, + BenchGroupVisibility::Private | BenchGroupVisibility::Hidden => { + config.private_events_per_group + } + }; + for event_index in 0..per_group { + let created_at = 1_714_400_000 + + u64::try_from(group_index * 10_000 + event_index) + .expect("event index fits in u64"); + group_timeline_events.push(BenchSourceEvent { + event: tangle_v2_group_event( + FixtureKey::Owner, + &group.id, + created_at, + 1, + &format!( + "bench {} group event {group_index:04}-{event_index:04}", + group.visibility.as_str() + ), + )?, + auth: BenchEventAuth::Owner, + }); + } } - let mut notes = Vec::with_capacity(config.note_count); - for index in 0..config.note_count { - notes.push(bench_note(index)); + + for index in 0..config.public_note_count { + public_note_events.push(BenchSourceEvent { + event: tangle_v2_event( + FixtureKey::Outsider, + 1_714_500_000 + u64::try_from(index).expect("note index fits in u64"), + 1, + vec![tangle_v2_tag("t", &["tangle-bench"])?], + &format!("bench public note {index:04}"), + )?, + auth: BenchEventAuth::None, + }); } - Self { listings, notes } + + Ok(Self { + config, + groups, + group_create_events, + membership_events, + group_timeline_events, + public_note_events, + }) } - pub fn listings(&self) -> &[Event] { - &self.listings + pub fn config(&self) -> BenchDatasetConfig { + self.config } - pub fn notes(&self) -> &[Event] { - &self.notes + pub fn groups(&self) -> &[BenchGroup] { + &self.groups } - pub fn events(&self) -> Vec<Event> { - self.listings + pub fn source_events(&self) -> Vec<&BenchSourceEvent> { + self.group_create_events .iter() - .chain(self.notes.iter()) - .cloned() + .chain(self.membership_events.iter()) + .chain(self.group_timeline_events.iter()) + 
.chain(self.public_note_events.iter()) .collect() } -} -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct ListingWorkloadReport { - pub attempted: u64, - pub inserted: u64, - pub projected: u64, - pub listing_rows: u64, -} + pub fn source_event_count(&self) -> u64 { + self.source_events() + .len() + .try_into() + .expect("source event count fits in u64") + } -#[derive(Debug, Clone)] -pub struct MaterializedListingWorkload { - store: SurrealStore, - report: ListingWorkloadReport, -} + pub fn group_event_count(&self) -> u64 { + (self.group_create_events.len() + + self.membership_events.len() + + self.group_timeline_events.len()) + .try_into() + .expect("group event count fits in u64") + } -impl MaterializedListingWorkload { - pub fn store(&self) -> &SurrealStore { - &self.store + pub fn membership_event_count(&self) -> u64 { + self.membership_events + .len() + .try_into() + .expect("membership event count fits in u64") } - pub fn report(&self) -> ListingWorkloadReport { - self.report + pub fn largest_group_members(&self) -> u64 { + self.config + .member_count + .try_into() + .expect("member count fits in u64") } -} -pub async fn materialize_listing_workload( - dataset: &BenchDataset, -) -> Result<MaterializedListingWorkload, String> { - let store = bench_memory_store("listing_workload").await?; - let mut inserted = 0; - let mut projected = 0; - for event in dataset.listings() { - let now = event.unsigned().created_at(); - if store - .store_raw_event(&StoredEvent::new(event.clone(), now)) - .await - .map_err(|error| error.to_string())? 
- == StoreEventOutcome::Inserted - { - inserted += 1; + pub fn dataset_digest(&self) -> Result<String, String> { + let mut hasher = Sha256::new(); + for event in self.source_events() { + let raw = serde_json::to_string(&event_to_value(event.event())) + .map_err(|error| error.to_string())?; + hasher.update(raw.as_bytes()); + hasher.update(b"\n"); } - store - .index_event_tags(event) - .await - .map_err(|error| error.to_string())?; - store - .maintain_current_event(event) - .await - .map_err(|error| error.to_string())?; - store - .store_listing_revision(event, now) - .await - .map_err(|error| error.to_string())?; - if store - .project_current_listing(event, now) - .await - .map_err(|error| error.to_string())? - == ListingCurrentOutcome::Projected - { - projected += 1; + Ok(lower_hex(&hasher.finalize())) + } + + pub fn source_events_jsonl(&self) -> Result<String, String> { + let mut output = String::new(); + for source in self.source_events() { + let raw = serde_json::to_string(&event_to_value(source.event())) + .map_err(|error| error.to_string())?; + output.push_str(&raw); + output.push('\n'); } - store - .project_listing_helpers(event) - .await - .map_err(|error| error.to_string())?; + Ok(output) + } + + fn first_group(&self, visibility: BenchGroupVisibility) -> Result<&BenchGroup, String> { + self.groups + .iter() + .find(|group| group.visibility == visibility) + .ok_or_else(|| format!("dataset does not include {} group", visibility.as_str())) + } + + fn first_timeline_event(&self, visibility: BenchGroupVisibility) -> Result<&Event, String> { + let group = self.first_group(visibility)?; + self.group_timeline_events + .iter() + .find(|source| event_has_group(source.event(), group.id())) + .map(BenchSourceEvent::event) + .ok_or_else(|| { + format!( + "dataset does not include {} timeline event", + visibility.as_str() + ) + }) } - let listing_rows = store - .query_current_listings(&ListingProjectionQuery::new().with_effective_status("active")) - .await - 
.map_err(|error| error.to_string())? - .len() as u64; - Ok(MaterializedListingWorkload { - store, - report: ListingWorkloadReport { - attempted: dataset.listings().len() as u64, - inserted, - projected, - listing_rows, - }, - }) } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct SearchWorkloadReport { - pub indexed: u64, - pub carrot_results: u64, - pub browse_results: u64, -} - -pub async fn run_search_workload(dataset: &BenchDataset) -> Result<SearchWorkloadReport, String> { - let materialized = materialize_listing_workload(dataset).await?; - let store = materialized.store(); - let mut indexed = 0; - for event in dataset.listings() { - if store - .index_listing_search_document(event) - .await - .map_err(|error| error.to_string())? - == SearchDocumentOutcome::Indexed - { - indexed += 1; - } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DatasetProfile { + pub total_events: u64, + pub group_events: u64, + pub groups: u64, + pub memberships: u64, + pub largest_group_members: u64, + pub dataset_digest: String, + pub fixture_family: String, +} + +impl DatasetProfile { + fn from_dataset(dataset: &BenchDataset) -> Result<Self, String> { + Ok(Self { + total_events: dataset.source_event_count(), + group_events: dataset.group_event_count(), + groups: dataset + .groups() + .len() + .try_into() + .expect("group count fits in u64"), + memberships: dataset.membership_event_count(), + largest_group_members: dataset.largest_group_members(), + dataset_digest: dataset.dataset_digest()?, + fixture_family: "synthetic repo-owned fixtures".to_owned(), + }) + } + + fn to_json(&self) -> serde_json::Value { + json!({ + "total_events": self.total_events, + "group_events": self.group_events, + "groups": self.groups, + "memberships": self.memberships, + "largest_group_members": self.largest_group_members, + "dataset_digest": self.dataset_digest, + "fixture_family": self.fixture_family + }) } - let carrot_results = store - .query_search_documents( - &SearchDocumentQuery::new() 
- .with_text("carrots") - .with_doc_type("listing") - .with_visible(true), - ) - .await - .map_err(|error| error.to_string())? - .len() as u64; - let browse_results = store - .query_search_documents( - &SearchDocumentQuery::new() - .with_doc_type("listing") - .with_visible(true) - .with_limit(5), - ) - .await - .map_err(|error| error.to_string())? - .len() as u64; - Ok(SearchWorkloadReport { - indexed, - carrot_results, - browse_results, - }) } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct GenericRelayWorkloadReport { +#[derive(Debug, Clone, PartialEq)] +pub struct ScenarioReport { + pub scenario: String, pub attempted: u64, - pub inserted: u64, - pub note_results: u64, - pub all_results: u64, + pub accepted: u64, + pub rejected: u64, + pub elapsed_micros: u64, + pub events_per_second: f64, + pub p50_micros: u64, + pub p95_micros: u64, + pub p99_micros: u64, + pub max_rss_bytes: u64, } -pub async fn run_generic_relay_workload( - dataset: &BenchDataset, -) -> Result<GenericRelayWorkloadReport, String> { - let store = bench_memory_store("generic_relay_workload").await?; - let events = dataset.events(); - let mut inserted = 0; - for event in &events { - if store - .store_raw_event(&StoredEvent::new( - event.clone(), - event.unsigned().created_at(), - )) - .await - .map_err(|error| error.to_string())? 
- == StoreEventOutcome::Inserted - { - inserted += 1; +impl ScenarioReport { + fn new( + scenario: &str, + attempted: u64, + accepted: u64, + rejected: u64, + elapsed_micros: u64, + mut samples: Vec<u64>, + max_rss_bytes: u64, + ) -> Self { + samples.sort_unstable(); + let events_per_second = if elapsed_micros == 0 { + 0.0 + } else { + attempted as f64 * 1_000_000.0 / elapsed_micros as f64 + }; + Self { + scenario: scenario.to_owned(), + attempted, + accepted, + rejected, + elapsed_micros, + events_per_second, + p50_micros: percentile(&samples, 50), + p95_micros: percentile(&samples, 95), + p99_micros: percentile(&samples, 99), + max_rss_bytes, } - store - .index_event_tags(event) - .await - .map_err(|error| error.to_string())?; - store - .maintain_current_event(event) - .await - .map_err(|error| error.to_string())?; } - let note_filter = filter_from_value(&serde_json::json!({ - "kinds": [1], - "limit": dataset.notes().len() - }))?; - let note_results = store - .query_raw_events(&note_filter) - .await - .map_err(|error| error.to_string())? - .len() as u64; - let all_results = store - .query_raw_events(&tangle_protocol::Filter::empty()) - .await - .map_err(|error| error.to_string())? 
- .len() as u64; - Ok(GenericRelayWorkloadReport { - attempted: events.len() as u64, - inserted, - note_results, - all_results, - }) + + fn pass_latency_gate(&self, p95_threshold_micros: u64) -> bool { + self.rejected == 0 + && self.accepted == self.attempted + && self.p95_micros <= p95_threshold_micros + } + + fn pass_elapsed_gate(&self, elapsed_threshold_micros: u64) -> bool { + self.rejected == 0 + && self.accepted == self.attempted + && self.elapsed_micros <= elapsed_threshold_micros + } + + fn pass_memory_gate(&self, max_bytes: u64) -> bool { + self.rejected == 0 && self.accepted == self.attempted && self.max_rss_bytes <= max_bytes + } + + fn to_json(&self) -> serde_json::Value { + json!({ + "scenario": self.scenario, + "attempted": self.attempted, + "accepted": self.accepted, + "rejected": self.rejected, + "elapsed_micros": self.elapsed_micros, + "events_per_second": self.events_per_second, + "p50_micros": self.p50_micros, + "p95_micros": self.p95_micros, + "p99_micros": self.p99_micros, + "max_rss_bytes": self.max_rss_bytes + }) + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct IngestBenchmarkReport { - pub attempted: u64, - pub inserted: u64, - pub elapsed_micros: u128, +pub struct BenchmarkThresholds { + pub pocket_query_p95_micros: u64, + pub read_gate_p95_micros: u64, + pub projection_rebuild_elapsed_micros: u64, + pub outbox_replay_elapsed_micros: u64, + pub broadcast_lag_p95_micros: u64, + pub memory_profile_max_bytes: u64, } -pub async fn run_ingest_benchmark( - config: BenchDatasetConfig, -) -> Result<IngestBenchmarkReport, String> { - let dataset = BenchDataset::generate(config); - let started = Instant::now(); - let report = run_generic_relay_workload(&dataset).await?; - Ok(IngestBenchmarkReport { - attempted: report.attempted, - inserted: report.inserted, - elapsed_micros: started.elapsed().as_micros(), - }) +impl BenchmarkThresholds { + pub fn smoke() -> Self { + Self { + pocket_query_p95_micros: 1_000_000, + read_gate_p95_micros: 
1_000_000, + projection_rebuild_elapsed_micros: 5_000_000, + outbox_replay_elapsed_micros: 5_000_000, + broadcast_lag_p95_micros: 1_000_000, + memory_profile_max_bytes: 512 * 1024 * 1024, + } + } + + fn to_json(self) -> serde_json::Value { + json!({ + "pocket_query_p95_micros": self.pocket_query_p95_micros, + "read_gate_p95_micros": self.read_gate_p95_micros, + "projection_rebuild_elapsed_micros": self.projection_rebuild_elapsed_micros, + "outbox_replay_elapsed_micros": self.outbox_replay_elapsed_micros, + "broadcast_lag_p95_micros": self.broadcast_lag_p95_micros, + "memory_profile_max_bytes": self.memory_profile_max_bytes + }) + } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct ListingQueryBenchmarkReport { - pub listing_rows: u64, - pub limited_rows: u64, - pub elapsed_micros: u128, +#[derive(Debug, Clone, PartialEq)] +pub struct BenchmarkRunReport { + dataset: BenchDataset, + dataset_profile: DatasetProfile, + scenarios: Vec<ScenarioReport>, + thresholds: BenchmarkThresholds, + validation_summary: BTreeMap<String, String>, } -pub async fn run_listing_query_benchmark( - config: BenchDatasetConfig, -) -> Result<ListingQueryBenchmarkReport, String> { - let dataset = BenchDataset::generate(config); - let materialized = materialize_listing_workload(&dataset).await?; - let started = Instant::now(); - let listing_rows = materialized - .store() - .query_current_listings(&ListingProjectionQuery::new().with_effective_status("active")) - .await - .map_err(|error| error.to_string())? - .len() as u64; - let limited_rows = materialized - .store() - .query_current_listings( - &ListingProjectionQuery::new() - .with_effective_status("active") - .with_limit(7), - ) - .await - .map_err(|error| error.to_string())? 
- .len() as u64; - Ok(ListingQueryBenchmarkReport { - listing_rows, - limited_rows, - elapsed_micros: started.elapsed().as_micros(), - }) +impl BenchmarkRunReport { + pub fn run(config: BenchDatasetConfig) -> Result<Self, String> { + let dataset = BenchDataset::generate(config)?; + let thresholds = BenchmarkThresholds::smoke(); + let pocket_query = run_pocket_query_benchmark(&dataset)?; + let read_gate = run_read_gate_benchmark(&dataset)?; + let projection_rebuild = run_projection_rebuild_benchmark(&dataset)?; + let outbox_replay = run_outbox_replay_benchmark(&dataset)?; + let broadcast_lag = run_broadcast_lag_benchmark(&dataset)?; + let memory_profile = run_memory_profile_benchmark(&dataset)?; + let scenarios = vec![ + pocket_query, + read_gate, + projection_rebuild, + outbox_replay, + broadcast_lag, + memory_profile, + ]; + let validation_summary = validation_summary(&scenarios, thresholds)?; + let dataset_profile = DatasetProfile::from_dataset(&dataset)?; + Ok(Self { + dataset, + dataset_profile, + scenarios, + thresholds, + validation_summary, + }) + } + + pub fn dataset(&self) -> &BenchDataset { + &self.dataset + } + + pub fn dataset_profile(&self) -> &DatasetProfile { + &self.dataset_profile + } + + pub fn scenarios(&self) -> &[ScenarioReport] { + &self.scenarios + } + + pub fn scenario(&self, name: &str) -> Option<&ScenarioReport> { + self.scenarios + .iter() + .find(|scenario| scenario.scenario == name) + } + + pub fn validation_summary(&self) -> &BTreeMap<String, String> { + &self.validation_summary + } + + pub fn summary_json(&self, run_id: &str, artifact_directory: &Path) -> serde_json::Value { + json!({ + "schema": 1, + "run_id": run_id, + "artifact_directory": artifact_directory.to_string_lossy(), + "dataset": self.dataset_profile.to_json(), + "scenarios": self.scenarios.iter().map(ScenarioReport::to_json).collect::<Vec<_>>(), + "thresholds": self.thresholds.to_json(), + "validation_summary": self.validation_summary, + "artifacts": { + "summary_json": 
"summary.json", + "dataset_events_jsonl": "dataset-events.jsonl" + } + }) + } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct SearchBenchmarkReport { - pub indexed: u64, - pub text_results: u64, - pub browse_results: u64, - pub elapsed_micros: u128, +struct MaterializedBenchRelay { + relay: BaseRelay, + store_config: PocketStoreConfig, + ingest_report: ScenarioReport, } -pub async fn run_search_benchmark( - config: BenchDatasetConfig, -) -> Result<SearchBenchmarkReport, String> { - let dataset = BenchDataset::generate(config); - let materialized = materialize_listing_workload(&dataset).await?; - let store = materialized.store(); - let mut indexed = 0; - for event in dataset.listings() { - if store - .index_listing_search_document(event) - .await - .map_err(|error| error.to_string())? - == SearchDocumentOutcome::Indexed - { - indexed += 1; +fn run_pocket_query_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { + let mut materialized = materialize_dataset(dataset, "pocket-query", 128)?; + let public_group = dataset.first_group(BenchGroupVisibility::Public)?; + let public_event = dataset.first_timeline_event(BenchGroupVisibility::Public)?; + let owner_auth = authenticated(FixtureKey::Owner)?; + let owner = FixtureKey::Owner.public_key(); + let created_at = public_event.unsigned().created_at().as_u64(); + let operations = vec![ + QueryOperation::new( + "pocket-h", + filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()], "limit": 50}))?, + QueryAuth::None, + QueryExpectation::AtLeast(1), + ), + QueryOperation::new( + "pocket-d", + filter_from_value(&json!({ + "kinds": [KIND_GROUP_METADATA], + "#d": [public_group.id()], + "limit": 10 + }))?, + QueryAuth::None, + QueryExpectation::AtLeast(1), + ), + QueryOperation::new( + "pocket-kind-author-window-limit", + filter_from_value(&json!({ + "kinds": [1], + "authors": [owner.as_str()], + "since": created_at.saturating_sub(1), + "until": created_at.saturating_add(100_000), + "limit": 
25 + }))?, + QueryAuth::Owner, + QueryExpectation::AtLeast(1), + ), + QueryOperation::new( + "pocket-count", + filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?, + QueryAuth::None, + QueryExpectation::AtLeast(1), + ), + ]; + let started = Instant::now(); + let mut samples = Vec::with_capacity(operations.len()); + let mut accepted = 0; + let mut rejected = 0; + for operation in operations { + let sample = Instant::now(); + let observed = match operation.name { + "pocket-count" => count_for_operation(&materialized.relay, &operation, &owner_auth)?, + _ => query_for_operation(&mut materialized.relay, &operation, &owner_auth)?, + }; + samples.push(elapsed_micros(sample)); + if operation.expectation.matches(observed) { + accepted += 1; + } else { + rejected += 1; } } + Ok(ScenarioReport::new( + SCENARIO_POCKET_QUERY_VISIBLE_EVENTS, + accepted + rejected, + accepted, + rejected, + elapsed_micros(started), + samples, + materialized.ingest_report.max_rss_bytes, + )) +} + +fn run_read_gate_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { + let mut materialized = materialize_dataset(dataset, "read-gate", 128)?; + let public_group = dataset.first_group(BenchGroupVisibility::Public)?; + let private_group = dataset.first_group(BenchGroupVisibility::Private)?; + let hidden_group = dataset.first_group(BenchGroupVisibility::Hidden)?; + let owner_auth = authenticated(FixtureKey::Owner)?; + let operations = vec![ + QueryOperation::new( + "public-unauth", + filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?, + QueryAuth::None, + QueryExpectation::AtLeast(1), + ), + QueryOperation::new( + "private-unauth", + filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?, + QueryAuth::None, + QueryExpectation::Exactly(0), + ), + QueryOperation::new( + "private-owner", + filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?, + QueryAuth::Owner, + QueryExpectation::AtLeast(1), + ), + QueryOperation::new( 
+ "hidden-metadata-unauth", + filter_from_value(&json!({"kinds": [KIND_GROUP_METADATA], "#d": [hidden_group.id()]}))?, + QueryAuth::None, + QueryExpectation::Exactly(0), + ), + QueryOperation::new( + "hidden-metadata-owner", + filter_from_value(&json!({"kinds": [KIND_GROUP_METADATA], "#d": [hidden_group.id()]}))?, + QueryAuth::Owner, + QueryExpectation::AtLeast(1), + ), + QueryOperation::new( + "private-count-unauth", + filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?, + QueryAuth::None, + QueryExpectation::Exactly(0), + ), + QueryOperation::new( + "private-count-owner", + filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()]}))?, + QueryAuth::Owner, + QueryExpectation::AtLeast(1), + ), + ]; let started = Instant::now(); - let text_results = store - .query_search_documents( - &SearchDocumentQuery::new() - .with_text("carrots") - .with_doc_type("listing") - .with_visible(true), - ) - .await - .map_err(|error| error.to_string())? - .len() as u64; - let browse_results = store - .query_search_documents( - &SearchDocumentQuery::new() - .with_doc_type("listing") - .with_visible(true) - .with_limit(9), - ) - .await - .map_err(|error| error.to_string())? - .len() as u64; - Ok(SearchBenchmarkReport { - indexed, - text_results, - browse_results, - elapsed_micros: started.elapsed().as_micros(), - }) + let mut samples = Vec::with_capacity(operations.len()); + let mut accepted = 0; + let mut rejected = 0; + for operation in operations { + let sample = Instant::now(); + let observed = if operation.name.contains("count") { + count_for_operation(&materialized.relay, &operation, &owner_auth)? + } else { + query_for_operation(&mut materialized.relay, &operation, &owner_auth)? 
+ }; + samples.push(elapsed_micros(sample)); + if operation.expectation.matches(observed) { + accepted += 1; + } else { + rejected += 1; + } + } + Ok(ScenarioReport::new( + SCENARIO_GROUP_READ_GATE_OVERHEAD, + accepted + rejected, + accepted, + rejected, + elapsed_micros(started), + samples, + materialized.ingest_report.max_rss_bytes, + )) } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct QueryPlanCaptureReport { - pub listing_plan_steps: u64, - pub search_plan_steps: u64, - pub listing_plan_text: String, - pub search_plan_text: String, +fn run_projection_rebuild_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { + let mut materialized = materialize_dataset(dataset, "projection-rebuild", 128)?; + materialized + .relay + .shutdown() + .map_err(|error| error.to_string())?; + let started = Instant::now(); + let reopened = BaseRelay::open_with_groups(&materialized.store_config, 128, &group_config()?) + .map_err(|error| error.to_string())?; + let elapsed = elapsed_micros(started); + let projection = reopened + .group_projection() + .ok_or_else(|| "group projection is unavailable".to_owned())?; + let groups_match = projection.groups().len() == dataset.groups().len(); + let members_match = projection + .members() + .values() + .filter(|member| member.status() == MemberStatus::Member) + .count() + == usize::try_from(dataset.membership_event_count()) + .expect("membership count fits in usize"); + let accepted = u64::from(groups_match && members_match); + Ok(ScenarioReport::new( + SCENARIO_PROJECTION_REBUILD, + 1, + accepted, + 1 - accepted, + elapsed, + vec![elapsed], + estimate_memory_bytes(dataset), + )) } -pub async fn capture_query_plans( - config: BenchDatasetConfig, -) -> Result<QueryPlanCaptureReport, String> { - let dataset = BenchDataset::generate(config); - let materialized = materialize_listing_workload(&dataset).await?; - for event in dataset.listings() { +fn run_outbox_replay_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, 
String> { + let mut materialized = materialize_dataset(dataset, "outbox-replay", 128)?; + let before = generated_state_counts(&materialized.relay)?; + materialized + .relay + .shutdown() + .map_err(|error| error.to_string())?; + let started = Instant::now(); + let mut reopened = + BaseRelay::open_with_groups(&materialized.store_config, 128, &group_config()?) + .map_err(|error| error.to_string())?; + let after_first = generated_state_counts(&reopened)?; + reopened.shutdown().map_err(|error| error.to_string())?; + let reopened = BaseRelay::open_with_groups(&materialized.store_config, 128, &group_config()?) + .map_err(|error| error.to_string())?; + let after_second = generated_state_counts(&reopened)?; + let elapsed = elapsed_micros(started); + let accepted = u64::from(before == after_first && before == after_second); + Ok(ScenarioReport::new( + SCENARIO_OUTBOX_REPLAY, + 1, + accepted, + 1 - accepted, + elapsed, + vec![elapsed], + estimate_memory_bytes(dataset), + )) +} + +fn run_broadcast_lag_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { + let mut materialized = materialize_dataset(dataset, "broadcast-lag", 1)?; + let public_group = dataset.first_group(BenchGroupVisibility::Public)?; + let subscriber_count = dataset.config.group_count.max(4); + let filter = filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()]}))?; + for index in 0..subscriber_count { materialized - .store() - .index_listing_search_document(event) - .await + .relay + .handle_req( + subscription(&format!("lag-{index:04}"))?, + vec![filter.clone()], + ) .map_err(|error| error.to_string())?; } - let listing_query = "SELECT * FROM listing_current WHERE hidden = false AND deleted = false AND effective_status = 'active' ORDER BY updated_at DESC, event_id ASC LIMIT 10 EXPLAIN;"; - let search_query = "SELECT * FROM search_doc WHERE doc_type = 'listing' AND visible = true ORDER BY updated_at DESC, event_id ASC LIMIT 10 EXPLAIN;"; - let listing_plan = 
explain_query(materialized.store(), listing_query).await?; - let search_plan = explain_query(materialized.store(), search_query).await?; - Ok(QueryPlanCaptureReport { - listing_plan_steps: listing_plan.len() as u64, - search_plan_steps: search_plan.len() as u64, - listing_plan_text: format!("{listing_query}\n{listing_plan:?}"), - search_plan_text: format!("{search_query}\n{search_plan:?}"), - }) + let first = tangle_v2_group_event( + FixtureKey::Owner, + public_group.id(), + 1_714_600_000, + 1, + "broadcast lag first", + )?; + let second = tangle_v2_group_event( + FixtureKey::Owner, + public_group.id(), + 1_714_600_001, + 1, + "broadcast lag second", + )?; + let started = Instant::now(); + let first_messages = materialized.relay.fanout(&first); + let second_messages = materialized.relay.fanout(&second); + let elapsed = elapsed_micros(started); + let first_events = first_messages + .iter() + .filter(|message| matches!(message, RelayMessage::Event { .. })) + .count(); + let closed = second_messages + .iter() + .filter(|message| matches!(message, RelayMessage::Closed { .. 
})) + .count(); + let accepted = if first_events == subscriber_count + && closed == subscriber_count + && materialized.relay.active_subscription_count() == 0 + { + subscriber_count + } else { + 0 + }; + let attempted = subscriber_count + .try_into() + .expect("subscriber count fits in u64"); + let accepted = accepted.try_into().expect("accepted fits in u64"); + Ok(ScenarioReport::new( + SCENARIO_BROADCAST_LAG, + attempted, + accepted, + attempted - accepted, + elapsed, + vec![elapsed], + materialized.ingest_report.max_rss_bytes, + )) } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct RebuildBenchmarkReport { - pub scanned: u64, - pub rebuilt: u64, - pub projected: u64, - pub listing_rows: u64, - pub checksum: String, - pub elapsed_micros: u128, +fn run_memory_profile_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { + let started = Instant::now(); + let estimated = estimate_memory_bytes(dataset); + let elapsed = elapsed_micros(started); + Ok(ScenarioReport::new( + SCENARIO_MEMORY_PROFILE, + 1, + 1, + 0, + elapsed, + vec![elapsed], + estimated, + )) } -pub async fn run_rebuild_benchmark( - config: BenchDatasetConfig, -) -> Result<RebuildBenchmarkReport, String> { - let dataset = BenchDataset::generate(config); - let materialized = materialize_listing_workload(&dataset).await?; - clear_projection_tables(materialized.store()).await?; - let started = Instant::now(); - let rows = materialized - .store() - .query_raw_events(&Filter::empty()) - .await - .map_err(|error| error.to_string())?; - let mut rebuilt = 0; - let mut projected = 0; - for row in &rows { - let raw = row - .get("raw_json") - .and_then(serde_json::Value::as_str) - .ok_or_else(|| "raw event row is missing raw_json".to_owned())?; - let raw = RawEventJson::new(raw).map_err(|error| error.to_string())?; - let event = parse_event_json(&raw) - .map_err(|error| format!("stored event JSON is invalid: {error}"))?; - let now = event.unsigned().created_at(); - materialized - .store() - 
.maintain_current_event(&event) - .await - .map_err(|error| error.to_string())?; - materialized - .store() - .store_listing_revision(&event, now) - .await +fn materialize_dataset( + dataset: &BenchDataset, + run_name: &str, + max_pending_events: usize, +) -> Result<MaterializedBenchRelay, String> { + let store_config = bench_store_config(run_name)?; + let mut relay = + BaseRelay::open_with_groups(&store_config, max_pending_events, &group_config()?) .map_err(|error| error.to_string())?; - if materialized - .store() - .project_current_listing(&event, now) - .await - .map_err(|error| error.to_string())? - == ListingCurrentOutcome::Projected - { - projected += 1; + let owner_auth = authenticated(FixtureKey::Owner)?; + let admin_auth = authenticated(FixtureKey::Admin)?; + let started = Instant::now(); + let mut samples = Vec::with_capacity(dataset.source_events().len()); + let mut accepted = 0; + let mut rejected = 0; + for source in dataset.source_events() { + let sample = Instant::now(); + let message = match source.auth() { + BenchEventAuth::None => relay + .handle_event(source.event().clone()) + .map_err(|error| error.to_string())?, + BenchEventAuth::Owner => relay + .handle_event_with_auth(source.event().clone(), &owner_auth) + .map_err(|error| error.to_string())?, + BenchEventAuth::Admin => relay + .handle_event_with_auth(source.event().clone(), &admin_auth) + .map_err(|error| error.to_string())?, + }; + samples.push(elapsed_micros(sample)); + if ok_accepted(&message) { + accepted += 1; + } else { + rejected += 1; } - materialized - .store() - .project_listing_helpers(&event) - .await - .map_err(|error| error.to_string())?; - rebuilt += 1; } - let listing_rows = materialized - .store() - .query_current_listings(&ListingProjectionQuery::new().with_effective_status("active")) - .await - .map_err(|error| error.to_string())?; - Ok(RebuildBenchmarkReport { - scanned: rows.len() as u64, - rebuilt, - projected, - listing_rows: listing_rows.len() as u64, - checksum: 
listing_rows_checksum(&listing_rows), - elapsed_micros: started.elapsed().as_micros(), + let attempted = accepted + rejected; + let ingest_report = ScenarioReport::new( + "dataset_ingest", + attempted, + accepted, + rejected, + elapsed_micros(started), + samples, + estimate_memory_bytes(dataset), + ); + Ok(MaterializedBenchRelay { + relay, + store_config, + ingest_report, }) } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct RestoreDrillSmokeReport { - pub exported: u64, - pub restored: u64, - pub source_checksum: String, - pub restored_checksum: String, - pub checksum_matches: bool, +#[derive(Clone)] +struct QueryOperation { + name: &'static str, + filter: Filter, + auth: QueryAuth, + expectation: QueryExpectation, } -pub async fn run_restore_drill_smoke( - config: BenchDatasetConfig, -) -> Result<RestoreDrillSmokeReport, String> { - let dataset = BenchDataset::generate(config); - let source = materialize_listing_workload(&dataset).await?; - let source_rows = source - .store() - .query_current_listings(&ListingProjectionQuery::new().with_effective_status("active")) - .await - .map_err(|error| error.to_string())?; - let source_checksum = listing_rows_checksum(&source_rows); - let exported_rows = source - .store() - .query_raw_events(&Filter::empty()) - .await - .map_err(|error| error.to_string())?; - let exported = exported_rows - .iter() - .map(|row| { - row.get("raw_json") - .and_then(serde_json::Value::as_str) - .map(str::to_owned) - .ok_or_else(|| "raw event row is missing raw_json".to_owned()) - }) - .collect::<Result<Vec<_>, _>>()?; - let restored_store = bench_memory_store("restore_drill_smoke").await?; - let mut restored = 0; - for raw in &exported { - let raw = RawEventJson::new(raw).map_err(|error| error.to_string())?; - let event = parse_event_json(&raw) - .map_err(|error| format!("restored event JSON is invalid: {error}"))?; - let now = event.unsigned().created_at(); - if restored_store - .store_raw_event(&StoredEvent::new(event.clone(), now)) - 
.await - .map_err(|error| error.to_string())? - == StoreEventOutcome::Inserted - { - restored += 1; +impl QueryOperation { + fn new( + name: &'static str, + filter: Filter, + auth: QueryAuth, + expectation: QueryExpectation, + ) -> Self { + Self { + name, + filter, + auth, + expectation, } - restored_store - .maintain_current_event(&event) - .await - .map_err(|error| error.to_string())?; - restored_store - .store_listing_revision(&event, now) - .await - .map_err(|error| error.to_string())?; - restored_store - .project_current_listing(&event, now) - .await - .map_err(|error| error.to_string())?; - restored_store - .project_listing_helpers(&event) - .await - .map_err(|error| error.to_string())?; } - let restored_rows = restored_store - .query_current_listings(&ListingProjectionQuery::new().with_effective_status("active")) - .await - .map_err(|error| error.to_string())?; - let restored_checksum = listing_rows_checksum(&restored_rows); - Ok(RestoreDrillSmokeReport { - exported: exported.len() as u64, - restored, - checksum_matches: source_checksum == restored_checksum, - source_checksum, - restored_checksum, +} + +#[derive(Clone, Copy)] +enum QueryAuth { + None, + Owner, +} + +#[derive(Clone, Copy)] +enum QueryExpectation { + Exactly(u64), + AtLeast(u64), +} + +impl QueryExpectation { + fn matches(self, observed: u64) -> bool { + match self { + Self::Exactly(expected) => observed == expected, + Self::AtLeast(expected) => observed >= expected, + } + } +} + +fn query_for_operation( + relay: &mut BaseRelay, + operation: &QueryOperation, + owner_auth: &BaseAuthState, +) -> Result<u64, String> { + let subscription_id = subscription(operation.name)?; + let messages = match operation.auth { + QueryAuth::None => relay + .handle_req(subscription_id.clone(), vec![operation.filter.clone()]) + .map_err(|error| error.to_string())?, + QueryAuth::Owner => relay + .handle_req_with_auth( + subscription_id.clone(), + vec![operation.filter.clone()], + owner_auth, + ) + .map_err(|error| 
error.to_string())?, + }; + relay.handle_close(&subscription_id); + Ok(messages + .iter() + .filter(|message| matches!(message, RelayMessage::Event { .. })) + .count() + .try_into() + .expect("message count fits in u64")) +} + +fn count_for_operation( + relay: &BaseRelay, + operation: &QueryOperation, + owner_auth: &BaseAuthState, +) -> Result<u64, String> { + let subscription_id = subscription(operation.name)?; + let message = match operation.auth { + QueryAuth::None => relay + .handle_count(subscription_id, vec![operation.filter.clone()]) + .map_err(|error| error.to_string())?, + QueryAuth::Owner => relay + .handle_count_with_auth(subscription_id, vec![operation.filter.clone()], owner_auth) + .map_err(|error| error.to_string())?, + }; + match message { + RelayMessage::Count { count, .. } => Ok(count), + value => Err(format!("expected COUNT message, got {value:?}")), + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct GeneratedStateCounts { + metadata: u64, + admins: u64, + members: u64, +} + +fn generated_state_counts(relay: &BaseRelay) -> Result<GeneratedStateCounts, String> { + Ok(GeneratedStateCounts { + metadata: count_kind(relay, KIND_GROUP_METADATA)?, + admins: count_kind(relay, KIND_GROUP_ADMINS)?, + members: count_kind(relay, KIND_GROUP_MEMBERS)?, }) } -async fn explain_query( - store: &SurrealStore, - query: &str, -) -> Result<Vec<serde_json::Value>, String> { - let mut response = store - .database() - .query(query) - .await - .map_err(|error| error.to_string())? 
- .check() - .map_err(|error| error.to_string())?; - response.take(0).map_err(|error| error.to_string()) -} - -async fn clear_projection_tables(store: &SurrealStore) -> Result<(), String> { - store - .database() - .query( - r#" -DELETE event_current; -DELETE listing_revision; -DELETE listing_current; -DELETE listing_category; -DELETE listing_fulfillment; -DELETE listing_tag; -DELETE listing_practice; -DELETE listing_certification; -DELETE search_doc; -"#, +fn count_kind(relay: &BaseRelay, kind: u32) -> Result<u64, String> { + let owner_auth = authenticated(FixtureKey::Owner)?; + let message = relay + .handle_count_with_auth( + subscription(&format!("count-{kind}"))?, + vec![filter_from_value(&json!({"kinds": [kind]}))?], + &owner_auth, ) - .await - .map_err(|error| error.to_string())? - .check() .map_err(|error| error.to_string())?; - Ok(()) + match message { + RelayMessage::Count { count, .. } => Ok(count), + value => Err(format!("expected COUNT message, got {value:?}")), + } } -fn listing_rows_checksum(rows: &[serde_json::Value]) -> String { - let mut parts = rows +fn validation_summary( + scenarios: &[ScenarioReport], + thresholds: BenchmarkThresholds, +) -> Result<BTreeMap<String, String>, String> { + let mut summary = BTreeMap::new(); + summary.insert( + SCENARIO_POCKET_QUERY_VISIBLE_EVENTS.to_owned(), + status( + scenario(scenarios, SCENARIO_POCKET_QUERY_VISIBLE_EVENTS)? + .pass_latency_gate(thresholds.pocket_query_p95_micros), + ), + ); + summary.insert( + SCENARIO_GROUP_READ_GATE_OVERHEAD.to_owned(), + status( + scenario(scenarios, SCENARIO_GROUP_READ_GATE_OVERHEAD)? + .pass_latency_gate(thresholds.read_gate_p95_micros), + ), + ); + summary.insert( + SCENARIO_PROJECTION_REBUILD.to_owned(), + status( + scenario(scenarios, SCENARIO_PROJECTION_REBUILD)? + .pass_elapsed_gate(thresholds.projection_rebuild_elapsed_micros), + ), + ); + summary.insert( + SCENARIO_OUTBOX_REPLAY.to_owned(), + status( + scenario(scenarios, SCENARIO_OUTBOX_REPLAY)? 
+ .pass_elapsed_gate(thresholds.outbox_replay_elapsed_micros), + ), + ); + summary.insert( + SCENARIO_BROADCAST_LAG.to_owned(), + status( + scenario(scenarios, SCENARIO_BROADCAST_LAG)? + .pass_latency_gate(thresholds.broadcast_lag_p95_micros), + ), + ); + summary.insert( + SCENARIO_MEMORY_PROFILE.to_owned(), + status( + scenario(scenarios, SCENARIO_MEMORY_PROFILE)? + .pass_memory_gate(thresholds.memory_profile_max_bytes), + ), + ); + Ok(summary) +} + +fn scenario<'a>(scenarios: &'a [ScenarioReport], name: &str) -> Result<&'a ScenarioReport, String> { + scenarios .iter() - .map(|row| { - format!( - "{}:{}", - row.get("listing_key") - .and_then(serde_json::Value::as_str) - .unwrap_or_default(), - row.get("event_id") - .and_then(serde_json::Value::as_str) - .unwrap_or_default() - ) - }) - .collect::<Vec<_>>(); - parts.sort(); - let digest = Sha256::digest(parts.join("\n").as_bytes()); - lower_hex(&digest) + .find(|scenario| scenario.scenario == name) + .ok_or_else(|| format!("scenario `{name}` was not run")) } -fn lower_hex(bytes: &[u8]) -> String { - const HEX: &[u8; 16] = b"0123456789abcdef"; - let mut output = String::with_capacity(bytes.len() * 2); - for byte in bytes { - output.push(char::from(HEX[usize::from(byte >> 4)])); - output.push(char::from(HEX[usize::from(byte & 0x0f)])); - } - output +fn status(value: bool) -> String { + if value { "pass" } else { "fail" }.to_owned() } -async fn bench_memory_store(database: &str) -> Result<SurrealStore, String> { - let config = SurrealConnectionConfig::memory("tangle_bench", database) - .map_err(|error| error.to_string())?; - let store = SurrealStore::connect_memory(&config) - .await - .map_err(|error| error.to_string())?; - store - .apply_plan(&base_migration_plan()) - .await - .map_err(|error| error.to_string())?; - Ok(store) -} - -fn bench_listing(index: usize) -> Event { - let created_at = 1_714_200_000 + index as u64; - let price_major = 10 + (index % 50); - let price_minor = (index * 7) % 100; - let d = 
format!("bench-listing-{index:04}"); - let title = format!("Bench carrots {index:04}"); - let content = format!("Deterministic bench listing body {index:04}"); - build_fixture_event_from_parts( - FixtureKey::Seller, - created_at, - 30_402, +fn bench_member_event( + group_id: &str, + group_index: usize, + member_index: usize, + base_created_at: u64, +) -> Result<Event, String> { + if member_index == 0 { + return tangle_v2_put_user_event( + FixtureKey::Admin, + group_id, + FixtureKey::Member, + base_created_at + u64::try_from(group_index * 10_000).expect("group index fits in u64"), + ); + } + let pubkey = synthetic_member_pubkey(group_index, member_index); + tangle_v2_event( + FixtureKey::Admin, + base_created_at + + u64::try_from(group_index * 10_000 + member_index).expect("member index fits in u64"), + 9_000, vec![ - vec!["d".to_owned(), d], - vec!["title".to_owned(), title], - vec![ - "price".to_owned(), - format!("{price_major}.{price_minor:02}"), - "USD".to_owned(), - ], - vec!["unit".to_owned(), "lb".to_owned()], - vec!["fulfillment".to_owned(), "pickup".to_owned()], - vec!["g".to_owned(), format!("c22yzu{}", index % 10)], - vec!["category".to_owned(), bench_category(index).to_owned()], - vec!["t".to_owned(), bench_topic(index).to_owned()], - vec!["practice".to_owned(), "no spray".to_owned()], - vec!["certification".to_owned(), "organic".to_owned()], + tangle_v2_tag("h", &[group_id])?, + tangle_v2_tag("p", &[pubkey.as_str()])?, ], - &content, + "", ) - .expect("deterministic benchmark listing fixture builds") } -fn bench_note(index: usize) -> Event { - build_fixture_event_from_parts( - FixtureKey::Buyer, - 1_714_300_000 + index as u64, - 1, - vec![vec!["t".to_owned(), "bench".to_owned()]], - &format!("Deterministic generic relay note {index:04}"), +fn synthetic_member_pubkey(group_index: usize, member_index: usize) -> String { + format!( + "{:064x}", + 0x100000_u128 + (group_index as u128 * 10_000) + member_index as u128 ) - .expect("deterministic benchmark 
note fixture builds") } -fn bench_category(index: usize) -> &'static str { +fn group_visibility(index: usize) -> BenchGroupVisibility { match index % 3 { - 0 => "vegetables", - 1 => "fruit", - _ => "herbs", + 0 => BenchGroupVisibility::Public, + 1 => BenchGroupVisibility::Private, + _ => BenchGroupVisibility::Hidden, } } -fn bench_topic(index: usize) -> &'static str { - match index % 4 { - 0 => "carrots", - 1 => "greens", - 2 => "apples", - _ => "basil", +fn event_has_group(event: &Event, group_id: &str) -> bool { + event.unsigned().tags().iter().any(|tag| { + tag.indexed_pair() + .is_some_and(|(name, value)| name == "h" && value == group_id) + }) +} + +fn group_config() -> Result<tangle_groups::GroupRuntimeConfig, String> { + tangle_v2_group_config(FixtureKey::Owner, &[FixtureKey::Admin]) +} + +fn authenticated(key: FixtureKey) -> Result<BaseAuthState, String> { + let mut auth = + BaseAuthState::new(TANGLE_V2_RELAY_URL, 60).map_err(|error| error.to_string())?; + auth.issue_challenge("challenge-a", tangle_protocol::UnixTimestamp::new(100)) + .map_err(|error| error.to_string())?; + let event = tangle_v2_auth_event(key, "challenge-a", 120)?; + auth.authenticate(&event, tangle_protocol::UnixTimestamp::new(120)) + .map_err(|error| error.to_string())?; + Ok(auth) +} + +fn bench_store_config(run_name: &str) -> Result<PocketStoreConfig, String> { + let root = bench_temp_root(run_name); + let _ = fs::remove_dir_all(&root); + PocketStoreConfig::new( + root.join("pocket"), + 1024 * 1024 * 1024, + 128, + PocketSyncPolicy::FlushOnShutdown, + ) + .map_err(|error| error.to_string()) +} + +fn bench_temp_root(run_name: &str) -> PathBuf { + let id = TEMP_ID.fetch_add(1, Ordering::Relaxed); + std::env::temp_dir().join(format!( + "tangle-bench-{run_name}-{}-{id}", + std::process::id() + )) +} + +fn subscription(value: &str) -> Result<SubscriptionId, String> { + SubscriptionId::new(value).map_err(|error| error.to_string()) +} + +fn ok_accepted(message: &RelayMessage) -> bool { + 
matches!(message, RelayMessage::Ok { accepted: true, .. }) +} + +fn estimate_memory_bytes(dataset: &BenchDataset) -> u64 { + let event_bytes = dataset + .source_events() + .iter() + .map(|source| { + serde_json::to_string(&event_to_value(source.event())) + .unwrap_or_default() + .len() + }) + .sum::<usize>(); + let projection_bytes = dataset.groups().len() * 512 + + usize::try_from(dataset.membership_event_count()).expect("member count fits in usize") + * 192; + (event_bytes + projection_bytes) + .try_into() + .expect("estimated memory fits in u64") +} + +fn percentile(samples: &[u64], percentile: u64) -> u64 { + if samples.is_empty() { + return 0; } + let last = samples.len() - 1; + let index = (last as u64 * percentile).div_ceil(100); + samples[usize::try_from(index).expect("percentile index fits in usize")] +} + +fn elapsed_micros(started: Instant) -> u64 { + u64::try_from(started.elapsed().as_micros()) + .unwrap_or(u64::MAX) + .max(1) +} + +fn lower_hex(bytes: &[u8]) -> String { + const HEX: &[u8; 16] = b"0123456789abcdef"; + let mut output = String::with_capacity(bytes.len() * 2); + for byte in bytes { + output.push(char::from(HEX[usize::from(byte >> 4)])); + output.push(char::from(HEX[usize::from(byte & 0x0f)])); + } + output } #[cfg(test)] mod tests { - use super::{BenchDataset, BenchDatasetConfig}; + use super::{ + BenchDataset, BenchDatasetConfig, BenchGroupVisibility, BenchmarkRunReport, + SCENARIO_BROADCAST_LAG, SCENARIO_GROUP_READ_GATE_OVERHEAD, SCENARIO_MEMORY_PROFILE, + SCENARIO_OUTBOX_REPLAY, SCENARIO_POCKET_QUERY_VISIBLE_EVENTS, SCENARIO_PROJECTION_REBUILD, + generated_state_counts, materialize_dataset, + }; use std::collections::BTreeSet; - use tangle_nips::{ListingProjectionEvaluation, evaluate_listing_projection}; - use tangle_test_support::{build_fixture_event, projection_ineligible_listing_spec}; + use tangle_groups::{GroupId, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA}; #[test] - fn 
deterministic_dataset_generator_produces_stable_signed_events() { - let first = BenchDataset::generate(BenchDatasetConfig::new(4, 2)); - let second = BenchDataset::generate(BenchDatasetConfig::new(4, 2)); - let listing_ids = first - .listings() - .iter() - .map(|event| event.id().as_str()) - .collect::<BTreeSet<_>>(); - let note_ids = first - .notes() - .iter() - .map(|event| event.id().as_str()) - .collect::<BTreeSet<_>>(); - + fn deterministic_dataset_generator_produces_stable_group_events() { + let first = + BenchDataset::generate(BenchDatasetConfig::new(3, 2, 2, 2, 2)).expect("first dataset"); + let second = + BenchDataset::generate(BenchDatasetConfig::new(3, 2, 2, 2, 2)).expect("second dataset"); + let first_ids = first + .source_events() + .into_iter() + .map(|source| source.event().id().as_str().to_owned()) + .collect::<Vec<_>>(); + let second_ids = second + .source_events() + .into_iter() + .map(|source| source.event().id().as_str().to_owned()) + .collect::<Vec<_>>(); + let unique_ids = first_ids.iter().cloned().collect::<BTreeSet<_>>(); + + assert_eq!(first_ids, second_ids); + assert_eq!(first.groups().len(), 3); + assert_eq!(first.source_event_count(), 17); + assert_eq!(first.group_event_count(), 15); + assert_eq!(first.membership_event_count(), 6); + assert_eq!(unique_ids.len(), first_ids.len()); assert_eq!( first - .events() + .groups() .iter() - .map(|event| event.id().as_str().to_owned()) + .map(|group| group.visibility()) .collect::<Vec<_>>(), - second - .events() - .iter() - .map(|event| event.id().as_str().to_owned()) - .collect::<Vec<_>>() + vec![ + BenchGroupVisibility::Public, + BenchGroupVisibility::Private, + BenchGroupVisibility::Hidden + ] ); - assert_eq!(first.listings().len(), 4); - assert_eq!(first.notes().len(), 2); - assert_eq!(listing_ids.len(), 4); - assert_eq!(note_ids.len(), 2); - assert!(first.listings().iter().all(|event| matches!( - evaluate_listing_projection(event), - ListingProjectionEvaluation::Eligible(_) - ))); - assert!( - 
first - .notes() - .iter() - .all(|event| event.unsigned().kind().as_u32() == 1) + assert_eq!( + first.dataset_digest().expect("first digest"), + second.dataset_digest().expect("second digest") + ); + assert_eq!( + first.source_events_jsonl().expect("jsonl").lines().count(), + usize::try_from(first.source_event_count()).expect("count fits") ); } #[test] - fn deterministic_dataset_generator_handles_empty_edges() { - let empty = BenchDataset::generate(BenchDatasetConfig::new(0, 0)); - let listings_only = BenchDataset::generate(BenchDatasetConfig::new(2, 0)); - let notes_only = BenchDataset::generate(BenchDatasetConfig::new(0, 3)); - - assert!(empty.events().is_empty()); - assert_eq!(listings_only.events().len(), 2); - assert_eq!(notes_only.events().len(), 3); - assert!(listings_only.notes().is_empty()); - assert!(notes_only.listings().is_empty()); - } - - #[tokio::test] - async fn listing_workload_materializes_projected_listing_rows() { - let dataset = BenchDataset::generate(BenchDatasetConfig::new(8, 3)); - let materialized = super::materialize_listing_workload(&dataset) - .await - .expect("listing workload"); - let report = materialized.report(); + fn dataset_config_rejects_benchmark_shapes_without_privacy_coverage() { + assert!(BenchDataset::generate(BenchDatasetConfig::new(2, 1, 1, 0, 1)).is_err()); + assert!(BenchDataset::generate(BenchDatasetConfig::new(3, 0, 1, 0, 1)).is_err()); + assert!(BenchDataset::generate(BenchDatasetConfig::new(3, 1, 0, 0, 1)).is_err()); + assert!(BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 0)).is_err()); + } + #[test] + fn materialized_dataset_populates_generated_group_state() { + let dataset = + BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 1, 1)).expect("dataset"); + let materialized = materialize_dataset(&dataset, "test-generated-state", 16) + .expect("materialized dataset"); + let counts = generated_state_counts(&materialized.relay).expect("state counts"); + + assert_eq!(counts.metadata, 3); + 
assert_eq!(counts.admins, 3); + assert_eq!(counts.members, 3); assert_eq!( - report, - super::ListingWorkloadReport { - attempted: 8, - inserted: 8, - projected: 8, - listing_rows: 8 - } + super::count_kind(&materialized.relay, KIND_GROUP_METADATA).expect("metadata"), + 3 ); assert_eq!( - materialized - .store() - .query_current_listings( - &tangle_store_surreal::ListingProjectionQuery::new() - .with_effective_status("active") - .with_limit(3) - ) - .await - .expect("listing rows") - .len(), + super::count_kind(&materialized.relay, KIND_GROUP_ADMINS).expect("admins"), 3 ); - } - - #[tokio::test] - async fn listing_workload_counts_duplicates_and_ineligible_events() { - let listing = super::bench_listing(0); - let ineligible = - build_fixture_event(&projection_ineligible_listing_spec()).expect("ineligible listing"); - let dataset = BenchDataset { - listings: vec![listing.clone(), listing, ineligible], - notes: Vec::new(), - }; - let materialized = super::materialize_listing_workload(&dataset) - .await - .expect("listing workload"); - assert_eq!( - materialized.report(), - super::ListingWorkloadReport { - attempted: 3, - inserted: 2, - projected: 2, - listing_rows: 1 - } + super::count_kind(&materialized.relay, KIND_GROUP_MEMBERS).expect("members"), + 3 ); } - #[tokio::test] - async fn search_workload_indexes_and_queries_listing_documents() { - let dataset = BenchDataset::generate(BenchDatasetConfig::new(12, 2)); - let report = super::run_search_workload(&dataset) - .await - .expect("search workload"); - - assert_eq!( - report, - super::SearchWorkloadReport { - indexed: 12, - carrot_results: 12, - browse_results: 5 - } + #[test] + fn benchmark_suite_runs_all_required_v2_scenarios() { + let report = + BenchmarkRunReport::run(BenchDatasetConfig::new(3, 1, 1, 2, 1)).expect("report"); + + for name in [ + SCENARIO_POCKET_QUERY_VISIBLE_EVENTS, + SCENARIO_GROUP_READ_GATE_OVERHEAD, + SCENARIO_PROJECTION_REBUILD, + SCENARIO_OUTBOX_REPLAY, + SCENARIO_BROADCAST_LAG, + 
SCENARIO_MEMORY_PROFILE, + ] { + let scenario = report.scenario(name).expect("scenario"); + assert_eq!(scenario.rejected, 0, "{name} rejected operations"); + assert_eq!(scenario.accepted, scenario.attempted, "{name} acceptance"); + assert!(scenario.elapsed_micros > 0, "{name} elapsed"); + } + assert_eq!(report.dataset_profile().groups, 3); + assert_eq!(report.validation_summary().len(), 6); + assert!( + report + .validation_summary() + .values() + .all(|status| status == "pass") ); } - #[tokio::test] - async fn search_workload_ignores_ineligible_listing_documents() { - let listing = super::bench_listing(0); - let ineligible = - build_fixture_event(&projection_ineligible_listing_spec()).expect("ineligible listing"); - let dataset = BenchDataset { - listings: vec![listing, ineligible], - notes: Vec::new(), - }; - let report = super::run_search_workload(&dataset) - .await - .expect("search workload"); + #[test] + fn benchmark_summary_json_matches_report_template_surface() { + let report = + BenchmarkRunReport::run(BenchDatasetConfig::new(3, 1, 1, 1, 1)).expect("report"); + let summary = report.summary_json("unit-run", std::path::Path::new(".local/unit")); + assert_eq!(summary["schema"], 1); + assert_eq!(summary["run_id"], "unit-run"); assert_eq!( - report, - super::SearchWorkloadReport { - indexed: 1, - carrot_results: 1, - browse_results: 1 - } + summary["dataset"]["fixture_family"], + "synthetic repo-owned fixtures" ); - } - - #[tokio::test] - async fn generic_relay_workload_stores_and_queries_non_marketplace_events() { - let dataset = BenchDataset::generate(BenchDatasetConfig::new(5, 7)); - let report = super::run_generic_relay_workload(&dataset) - .await - .expect("generic workload"); - + assert_eq!(summary["scenarios"].as_array().expect("scenarios").len(), 6); assert_eq!( - report, - super::GenericRelayWorkloadReport { - attempted: 12, - inserted: 12, - note_results: 7, - all_results: 12 - } + summary["validation_summary"][SCENARIO_POCKET_QUERY_VISIBLE_EVENTS], + 
"pass" + ); + assert!( + summary["thresholds"]["read_gate_p95_micros"] + .as_u64() + .expect("threshold") + > 0 ); - } - - #[tokio::test] - async fn generic_relay_workload_counts_duplicate_raw_events() { - let listing = super::bench_listing(0); - let note = super::bench_note(0); - let dataset = BenchDataset { - listings: vec![listing.clone(), listing], - notes: vec![note.clone(), note], - }; - let report = super::run_generic_relay_workload(&dataset) - .await - .expect("generic workload"); - assert_eq!( - report, - super::GenericRelayWorkloadReport { - attempted: 4, - inserted: 2, - note_results: 1, - all_results: 2 - } + summary["artifacts"]["dataset_events_jsonl"], + "dataset-events.jsonl" ); } - #[tokio::test] - async fn ingest_benchmark_reports_deterministic_event_counts() { - let report = super::run_ingest_benchmark(BenchDatasetConfig::new(6, 4)) - .await - .expect("ingest benchmark"); - - assert_eq!(report.attempted, 10); - assert_eq!(report.inserted, 10); - assert!(report.elapsed_micros > 0); - } - - #[tokio::test] - async fn listing_query_benchmark_reports_deterministic_row_counts() { - let report = super::run_listing_query_benchmark(BenchDatasetConfig::new(18, 0)) - .await - .expect("listing query benchmark"); - - assert_eq!(report.listing_rows, 18); - assert_eq!(report.limited_rows, 7); - assert!(report.elapsed_micros > 0); - } - - #[tokio::test] - async fn search_benchmark_reports_deterministic_query_counts() { - let report = super::run_search_benchmark(BenchDatasetConfig::new(16, 0)) - .await - .expect("search benchmark"); - - assert_eq!(report.indexed, 16); - assert_eq!(report.text_results, 16); - assert_eq!(report.browse_results, 9); - assert!(report.elapsed_micros > 0); - } - - #[tokio::test] - async fn query_plan_capture_records_listing_and_search_plans() { - let report = super::capture_query_plans(BenchDatasetConfig::new(4, 0)) - .await - .expect("query plan capture"); - - assert!(report.listing_plan_steps > 0); - assert!(report.search_plan_steps > 
0); - assert!(report.listing_plan_text.contains("listing_current")); - assert!(report.search_plan_text.contains("search_doc")); + #[test] + fn projection_rebuild_scenario_recreates_groups_and_members() { + let dataset = + BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 2)).expect("dataset"); + let scenario = super::run_projection_rebuild_benchmark(&dataset).expect("rebuild"); + + assert_eq!(scenario.scenario, SCENARIO_PROJECTION_REBUILD); + assert_eq!(scenario.accepted, 1); + assert_eq!(scenario.rejected, 0); } - #[tokio::test] - async fn query_plan_capture_reports_invalid_query_errors() { - let store = super::bench_memory_store("invalid_query_plan") - .await - .expect("store"); - let error = super::explain_query(&store, "SELECT * FROM").await; - - assert!(!error.expect_err("query error").is_empty()); + #[test] + fn outbox_replay_scenario_keeps_generated_state_idempotent() { + let dataset = + BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 1)).expect("dataset"); + let scenario = super::run_outbox_replay_benchmark(&dataset).expect("outbox"); + + assert_eq!(scenario.scenario, SCENARIO_OUTBOX_REPLAY); + assert_eq!(scenario.accepted, 1); + assert_eq!(scenario.rejected, 0); } - #[tokio::test] - async fn rebuild_benchmark_replays_raw_events_into_projection_rows() { - let first = super::run_rebuild_benchmark(BenchDatasetConfig::new(7, 0)) - .await - .expect("first rebuild"); - let second = super::run_rebuild_benchmark(BenchDatasetConfig::new(7, 0)) - .await - .expect("second rebuild"); - - assert_eq!(first.scanned, 7); - assert_eq!(first.rebuilt, 7); - assert_eq!(first.projected, 7); - assert_eq!(first.listing_rows, 7); - assert_eq!(first.checksum, second.checksum); - assert!(first.elapsed_micros > 0); + #[test] + fn broadcast_lag_scenario_closes_slow_subscriptions() { + let dataset = + BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 1)).expect("dataset"); + let scenario = super::run_broadcast_lag_benchmark(&dataset).expect("lag"); + + 
assert_eq!(scenario.scenario, SCENARIO_BROADCAST_LAG); + assert_eq!(scenario.accepted, scenario.attempted); + assert_eq!(scenario.rejected, 0); } - #[tokio::test] - async fn restore_drill_smoke_replays_exported_raw_events() { - let report = super::run_restore_drill_smoke(BenchDatasetConfig::new(6, 0)) - .await - .expect("restore drill"); - - assert_eq!(report.exported, 6); - assert_eq!(report.restored, 6); - assert_eq!(report.source_checksum, report.restored_checksum); - assert!(report.checksum_matches); + #[test] + fn percentile_helper_handles_empty_and_sorted_samples() { + assert_eq!(super::percentile(&[], 95), 0); + assert_eq!(super::percentile(&[1, 2, 3, 4, 5], 50), 3); + assert_eq!(super::percentile(&[1, 2, 3, 4, 5], 95), 5); + assert_eq!(super::lower_hex(&[0, 15, 16, 255]), "000f10ff"); } #[test] - fn checksum_helpers_handle_empty_and_missing_fields() { - let empty = super::listing_rows_checksum(&[]); - let sparse = super::listing_rows_checksum(&[ - serde_json::json!({"listing_key": "b"}), - serde_json::json!({"event_id": "a"}), - ]); - - assert_eq!(empty.len(), 64); - assert_eq!(sparse.len(), 64); - assert_ne!(empty, sparse); - assert_eq!(super::lower_hex(&[0, 15, 16, 255]), "000f10ff"); + fn group_id_helper_accepts_dataset_group_names() { + let dataset = + BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 1)).expect("dataset"); + + for group in dataset.groups() { + GroupId::new(group.id()).expect("group id"); + } } }