commit 70cd3ac0838d031745623c6a2e03e4236c6d3534
parent 6ff22a49f409d7559f66d8ff3dbe1d2f5c299452
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 02:59:57 -0700
bench: add rebuild benchmark
Diffstat:
3 files changed, 148 insertions(+), 1 deletion(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -4044,6 +4044,7 @@ name = "tangle_bench"
version = "0.1.0"
dependencies = [
"serde_json",
+ "sha2",
"tangle_nips",
"tangle_protocol",
"tangle_store",
diff --git a/crates/tangle_bench/Cargo.toml b/crates/tangle_bench/Cargo.toml
@@ -9,6 +9,7 @@ description = "Deterministic benchmark and proof-gate harnesses for tangle"
[dependencies]
serde_json = "1"
+sha2 = "0.10"
tangle_nips = { path = "../tangle_nips" }
tangle_protocol = { path = "../tangle_protocol" }
tangle_store = { path = "../tangle_store" }
diff --git a/crates/tangle_bench/src/lib.rs b/crates/tangle_bench/src/lib.rs
@@ -1,7 +1,8 @@
#![forbid(unsafe_code)]
+use sha2::{Digest, Sha256};
use std::time::Instant;
-use tangle_protocol::{Event, filter_from_value};
+use tangle_protocol::{Event, Filter, RawEventJson, filter_from_value, parse_event_json};
use tangle_store::{StoreEventOutcome, StoredEvent};
use tangle_store_surreal::{
ListingCurrentOutcome, ListingProjectionQuery, SearchDocumentOutcome, SearchDocumentQuery,
@@ -388,6 +389,80 @@ pub async fn capture_query_plans(
})
}
+/// Summary of one `run_rebuild_benchmark` run.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RebuildBenchmarkReport {
+ /// Number of raw event rows returned by the raw-event scan.
+ pub scanned: u64,
+ /// Number of scanned rows that went through every replay step.
+ pub rebuilt: u64,
+ /// Number of events whose current-listing projection reported
+ /// `ListingCurrentOutcome::Projected`.
+ pub projected: u64,
+ /// Number of rows returned by the post-rebuild query for listings with
+ /// effective status `"active"`.
+ pub listing_rows: u64,
+ /// Lowercase hex SHA-256 over the sorted `listing_key:event_id` pairs of
+ /// those listing rows.
+ pub checksum: String,
+ /// Wall-clock time reported by the benchmark, in microseconds.
+ pub elapsed_micros: u128,
+}
+
+/// Benchmarks a full projection rebuild by replaying stored raw events.
+///
+/// Generates the dataset for `config`, materializes the listing workload,
+/// clears the projection tables, then reads every raw event row back and
+/// re-applies the current-event, listing-revision, current-listing and
+/// listing-helper steps to it. Each event's own `created_at` is passed as the
+/// `now` argument of the steps that take one.
+///
+/// `elapsed_micros` covers the raw-event scan and the replay loop. The
+/// follow-up query for active listings and the checksum over its rows are
+/// verification work and are kept outside the timed window.
+///
+/// # Errors
+///
+/// Returns the stringified error of the first step that fails, or a
+/// descriptive message when a raw event row has no string `raw_json` field
+/// or its JSON does not parse as an event.
+pub async fn run_rebuild_benchmark(
+    config: BenchDatasetConfig,
+) -> Result<RebuildBenchmarkReport, String> {
+    let dataset = BenchDataset::generate(config)?;
+    let materialized = materialize_listing_workload(&dataset).await?;
+    let store = materialized.store();
+    clear_projection_tables(store).await?;
+    let started = Instant::now();
+    let rows = store
+        .query_raw_events(&Filter::empty())
+        .await
+        .map_err(|error| error.to_string())?;
+    let mut rebuilt = 0;
+    let mut projected = 0;
+    for row in &rows {
+        let raw = row
+            .get("raw_json")
+            .and_then(serde_json::Value::as_str)
+            .ok_or_else(|| "raw event row is missing raw_json".to_owned())?;
+        let raw = RawEventJson::new(raw).map_err(|error| error.to_string())?;
+        let event = parse_event_json(&raw)
+            .map_err(|error| format!("stored event JSON is invalid: {error}"))?;
+        let now = event.unsigned().created_at();
+        store
+            .maintain_current_event(&event)
+            .await
+            .map_err(|error| error.to_string())?;
+        store
+            .store_listing_revision(&event, now)
+            .await
+            .map_err(|error| error.to_string())?;
+        let outcome = store
+            .project_current_listing(&event, now)
+            .await
+            .map_err(|error| error.to_string())?;
+        if outcome == ListingCurrentOutcome::Projected {
+            projected += 1;
+        }
+        store
+            .project_listing_helpers(&event)
+            .await
+            .map_err(|error| error.to_string())?;
+        rebuilt += 1;
+    }
+    // Stop the clock before verification: struct fields are evaluated in
+    // source order, so reading the timer inside the report literal would
+    // also time the listing query and the SHA-256 checksum below.
+    let elapsed_micros = started.elapsed().as_micros();
+    let listing_rows = store
+        .query_current_listings(&ListingProjectionQuery::new().with_effective_status("active"))
+        .await
+        .map_err(|error| error.to_string())?;
+    Ok(RebuildBenchmarkReport {
+        scanned: rows.len() as u64,
+        rebuilt,
+        projected,
+        listing_rows: listing_rows.len() as u64,
+        checksum: listing_rows_checksum(&listing_rows),
+        elapsed_micros,
+    })
+}
+
async fn explain_query(
store: &SurrealStore,
query: &str,
@@ -402,6 +477,59 @@ async fn explain_query(
response.take(0).map_err(|error| error.to_string())
}
+/// Deletes every row from the projection tables so they can be rebuilt.
+///
+/// All `DELETE` statements are sent as one query; the response is then
+/// checked so that a failure in any single statement surfaces as an error.
+///
+/// # Errors
+///
+/// Returns the stringified database error if the query cannot be executed or
+/// if the response check reports a failure.
+async fn clear_projection_tables(store: &SurrealStore) -> Result<(), String> {
+    const CLEAR_PROJECTIONS: &str = r#"
+DELETE event_current;
+DELETE listing_revision;
+DELETE listing_current;
+DELETE listing_category;
+DELETE listing_fulfillment;
+DELETE listing_tag;
+DELETE listing_practice;
+DELETE listing_certification;
+DELETE search_doc;
+"#;
+    let response = store
+        .database()
+        .query(CLEAR_PROJECTIONS)
+        .await
+        .map_err(|error| error.to_string())?;
+    match response.check() {
+        Ok(_) => Ok(()),
+        Err(error) => Err(error.to_string()),
+    }
+}
+
+/// Computes a lowercase-hex SHA-256 fingerprint of listing projection rows.
+///
+/// Every row contributes one `listing_key:event_id` line; a field that is
+/// missing or not a JSON string contributes an empty string. Lines are sorted
+/// before hashing so the result does not depend on the order of `rows`, and
+/// they are separated by a single `\n` with no trailing newline.
+fn listing_rows_checksum(rows: &[serde_json::Value]) -> String {
+    /// Reads `key` from `row` as a string slice, defaulting to `""`.
+    fn text_field<'a>(row: &'a serde_json::Value, key: &str) -> &'a str {
+        row.get(key)
+            .and_then(serde_json::Value::as_str)
+            .unwrap_or_default()
+    }
+    let mut lines: Vec<String> = Vec::with_capacity(rows.len());
+    for row in rows {
+        let listing_key = text_field(row, "listing_key");
+        let event_id = text_field(row, "event_id");
+        lines.push(format!("{listing_key}:{event_id}"));
+    }
+    lines.sort();
+    // Feeding the lines one by one hashes the same bytes as the joined text.
+    let mut hasher = Sha256::new();
+    for (index, line) in lines.iter().enumerate() {
+        if index > 0 {
+            hasher.update(b"\n");
+        }
+        hasher.update(line.as_bytes());
+    }
+    lower_hex(&hasher.finalize())
+}
+
+/// Encodes `bytes` as lowercase hexadecimal, two digits per byte.
+///
+/// The high nibble of each byte is written first; an empty slice yields an
+/// empty string.
+fn lower_hex(bytes: &[u8]) -> String {
+    const DIGITS: [char; 16] = [
+        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
+    ];
+    bytes
+        .iter()
+        .flat_map(|&byte| [byte >> 4, byte & 0x0f])
+        .map(|nibble| DIGITS[usize::from(nibble)])
+        .collect()
+}
+
async fn bench_memory_store(database: &str) -> Result<SurrealStore, String> {
let config = SurrealConnectionConfig::memory("tangle_bench", database)
.map_err(|error| error.to_string())?;
@@ -634,4 +762,21 @@ mod tests {
assert!(report.listing_plan_text.contains("listing_current"));
assert!(report.search_plan_text.contains("search_doc"));
}
+
+    /// Runs the rebuild benchmark twice on `BenchDatasetConfig::new(7, 0)` and
+    /// checks the row counts of the first run plus checksum stability across
+    /// both runs.
+    #[tokio::test]
+    async fn rebuild_benchmark_replays_raw_events_into_projection_rows() {
+        let baseline = super::run_rebuild_benchmark(BenchDatasetConfig::new(7, 0))
+            .await
+            .expect("first rebuild");
+        let repeat = super::run_rebuild_benchmark(BenchDatasetConfig::new(7, 0))
+            .await
+            .expect("second rebuild");
+
+        let super::RebuildBenchmarkReport {
+            scanned,
+            rebuilt,
+            projected,
+            listing_rows,
+            checksum,
+            elapsed_micros,
+        } = baseline;
+        assert_eq!(scanned, 7);
+        assert_eq!(rebuilt, 7);
+        assert_eq!(projected, 7);
+        assert_eq!(listing_rows, 7);
+        assert_eq!(checksum, repeat.checksum);
+        assert!(elapsed_micros > 0);
+    }
}