tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 42bb1ef88508d320d723d0925f86f14e3593bfee
parent c5492a5795298358b2963f121789ca488458ac91
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 18:49:30 -0700

bench: cover count resource controls

Diffstat:
MCargo.lock | 1+
Mcrates/tangle_bench/Cargo.toml | 1+
Mcrates/tangle_bench/src/lib.rs | 261++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 4++--
4 files changed, 255 insertions(+), 12 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -1283,6 +1283,7 @@ dependencies = [ "tangle_runtime", "tangle_store_pocket", "tangle_test_support", + "tokio", ] [[package]] diff --git a/crates/tangle_bench/Cargo.toml b/crates/tangle_bench/Cargo.toml @@ -19,6 +19,7 @@ tangle_protocol = { path = "../tangle_protocol" } tangle_runtime = { path = "../tangle_runtime" } tangle_store_pocket = { path = "../tangle_store_pocket" } tangle_test_support = { path = "../tangle_test_support" } +tokio = { version = "1", features = ["rt"] } [lints] workspace = true diff --git a/crates/tangle_bench/src/lib.rs b/crates/tangle_bench/src/lib.rs @@ -9,11 +9,16 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; use tangle_groups::{KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, MemberStatus}; use tangle_protocol::{ - Event, Filter, RelayMessage, SubscriptionId, event_to_value, filter_from_value, + ClientMessage, Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp, event_to_value, + filter_from_value, }; -use tangle_runtime::relay::{ - auth::BaseAuthState, - core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits}, +use tangle_runtime::{ + config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, + relay::{ + auth::BaseAuthState, + core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits}, + }, + runtime::{TangleRuntime, TangleRuntimeHandle}, }; use tangle_store_pocket::{PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy}; use tangle_test_support::{ @@ -25,6 +30,7 @@ static TEMP_ID: AtomicU64 = AtomicU64::new(0); pub const SCENARIO_POCKET_QUERY_VISIBLE_EVENTS: &str = "pocket_query_visible_events"; pub const SCENARIO_GROUP_READ_GATE_OVERHEAD: &str = "group_read_gate_overhead"; +pub const SCENARIO_COUNT_RESOURCE_CONTROLS: &str = "count_resource_controls"; pub const SCENARIO_PROJECTION_REBUILD: &str = "projection_rebuild"; pub const SCENARIO_OUTBOX_REPLAY: &str = "outbox_replay"; pub const SCENARIO_BROADCAST_LAG: &str = "broadcast_lag"; @@ -593,6 +599,7 @@ impl ScenarioReport { pub struct BenchmarkThresholds { pub pocket_query_p95_micros: u64, pub read_gate_p95_micros: u64, + pub count_resource_controls_p95_micros: u64, pub projection_rebuild_elapsed_micros: u64, pub outbox_replay_elapsed_micros: u64, pub broadcast_lag_p95_micros: u64, @@ -604,6 +611,7 @@ impl BenchmarkThresholds { Self { pocket_query_p95_micros: 1_000_000, read_gate_p95_micros: 1_000_000, + count_resource_controls_p95_micros: 1_000_000, projection_rebuild_elapsed_micros: 5_000_000, outbox_replay_elapsed_micros: 5_000_000, broadcast_lag_p95_micros: 1_000_000, @@ -615,6 +623,7 @@ impl BenchmarkThresholds { Self { pocket_query_p95_micros: 2_500_000, read_gate_p95_micros: 2_500_000, + count_resource_controls_p95_micros: 2_500_000, projection_rebuild_elapsed_micros: 15_000_000, outbox_replay_elapsed_micros: 15_000_000, broadcast_lag_p95_micros: 2_500_000, @@ -626,6 +635,7 @@ impl BenchmarkThresholds { Self { pocket_query_p95_micros: 5_000_000, read_gate_p95_micros: 5_000_000, + count_resource_controls_p95_micros: 5_000_000, projection_rebuild_elapsed_micros: 60_000_000, outbox_replay_elapsed_micros: 60_000_000, broadcast_lag_p95_micros: 5_000_000, @@ -651,6 +661,10 @@ impl BenchmarkThresholds { Ok(Self { pocket_query_p95_micros: threshold_u64(value, "pocket_query_p95_micros")?, read_gate_p95_micros: threshold_u64(value, "read_gate_p95_micros")?, + count_resource_controls_p95_micros: threshold_u64( + value, + "count_resource_controls_p95_micros", + )?, projection_rebuild_elapsed_micros: threshold_u64( value, "projection_rebuild_elapsed_micros", @@ -665,6 +679,7 @@ impl BenchmarkThresholds { json!({ "pocket_query_p95_micros": self.pocket_query_p95_micros, "read_gate_p95_micros": self.read_gate_p95_micros, + "count_resource_controls_p95_micros": self.count_resource_controls_p95_micros, "projection_rebuild_elapsed_micros": self.projection_rebuild_elapsed_micros, "outbox_replay_elapsed_micros": self.outbox_replay_elapsed_micros, "broadcast_lag_p95_micros": self.broadcast_lag_p95_micros, @@ -673,10 +688,11 @@ impl BenchmarkThresholds { } } -fn benchmark_threshold_fields() -> [&'static str; 6] { +fn benchmark_threshold_fields() -> [&'static str; 7] { [ "pocket_query_p95_micros", "read_gate_p95_micros", + "count_resource_controls_p95_micros", "projection_rebuild_elapsed_micros", "outbox_replay_elapsed_micros", "broadcast_lag_p95_micros", @@ -716,6 +732,7 @@ impl BenchmarkRunReport { let thresholds = profile.thresholds(); let pocket_query = run_pocket_query_benchmark(&dataset)?; let read_gate = run_read_gate_benchmark(&dataset)?; + let count_resource_controls = run_count_resource_control_benchmark(&dataset)?; let projection_rebuild = run_projection_rebuild_benchmark(&dataset)?; let outbox_replay = run_outbox_replay_benchmark(&dataset)?; let broadcast_lag = run_broadcast_lag_benchmark(&dataset)?; @@ -723,6 +740,7 @@ impl BenchmarkRunReport { let scenarios = vec![ pocket_query, read_gate, + count_resource_controls, projection_rebuild, outbox_replay, broadcast_lag, @@ -948,6 +966,57 @@ fn run_read_gate_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, Str )) } +fn run_count_resource_control_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { + let materialized = materialize_dataset(dataset, "count-resource-controls", 128)?; + let public_group = dataset.first_group(BenchGroupVisibility::Public)?; + let private_group = dataset.first_group(BenchGroupVisibility::Private)?; + let owner_auth = authenticated(FixtureKey::Owner)?; + let operations = vec![ + QueryOperation::new( + "bounded-public-count", + filter_from_value(&json!({"kinds": [1], "#h": [public_group.id()], "limit": 25}))?, + QueryAuth::None, + QueryExpectation::AtLeast(1), + ), + QueryOperation::new( + "bounded-private-owner-count", + filter_from_value(&json!({"kinds": [1], "#h": [private_group.id()], "limit": 25}))?, + QueryAuth::Owner, + QueryExpectation::AtLeast(1), + ), + ]; + let started = Instant::now(); + let mut samples = Vec::with_capacity(operations.len() + 3); + let mut accepted = 0; + let mut rejected = 0; + for operation in operations { + let sample = Instant::now(); + let observed = count_for_operation(&materialized.relay, &operation, &owner_auth)?; + samples.push(elapsed_micros(sample)); + if operation.expectation.matches(observed) { + accepted += 1; + } else { + rejected += 1; + } + } + let runtime = tokio::runtime::Builder::new_current_thread() + .build() + .map_err(|error| format!("failed to build count resource benchmark runtime: {error}"))?; + let probe = runtime.block_on(runtime_count_resource_control_probe())?; + samples.extend(probe.samples); + accepted += probe.accepted; + rejected += probe.rejected; + Ok(ScenarioReport::new( + SCENARIO_COUNT_RESOURCE_CONTROLS, + accepted + rejected, + accepted, + rejected, + elapsed_micros(started), + samples, + materialized.ingest_report.max_rss_bytes, + )) +} + fn run_projection_rebuild_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, String> { let mut materialized = materialize_dataset(dataset, "projection-rebuild", 128)?; materialized @@ -1102,6 +1171,71 @@ fn run_memory_profile_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport )) } +struct CountResourceControlProbe { + accepted: u64, + rejected: u64, + samples: Vec<u64>, +} + +async fn runtime_count_resource_control_probe() -> Result<CountResourceControlProbe, String> { + let root = bench_temp_root("count-resource-controls-runtime"); + let _ = fs::remove_dir_all(&root); + let handle = TangleRuntimeHandle::new( + TangleRuntime::open(bench_runtime_config(&root)?).map_err(|error| error.to_string())?, + ); + let mut auth = handle + .auth_state() + .await + .map_err(|error| error.to_string())?; + let cases = [ + ("broad-empty-selector-count", json!({"limit": 1})), + ("broad-kind-only-count", json!({"kinds": [1], "limit": 1})), + ( + "broad-high-limit-count", + json!({"kinds": [1], "#h": ["BenchFarm0000"], "limit": 500}), + ), + ]; + let mut samples = Vec::with_capacity(cases.len()); + let mut accepted = 0; + let mut rejected = 0; + for (name, filter) in cases { + let sample = Instant::now(); + let subscription_id = subscription(name)?; + let replies = handle + .handle_client_message( + ClientMessage::Count { + subscription_id: subscription_id.clone(), + filters: vec![filter_from_value(&filter)?], + }, + &mut auth, + UnixTimestamp::new(1_714_700_000), + ) + .await + .map_err(|error| error.to_string())?; + samples.push(elapsed_micros(sample)); + if replies + == vec![RelayMessage::Closed { + subscription_id, + message: "restricted: count filters are too broad or expensive".to_owned(), + }] + { + accepted += 1; + } else { + rejected += 1; + } + } + let metrics = handle.metrics(); + if metrics.count_refusals() != accepted || metrics.broad_query_rejections() != accepted { + rejected += 1; + } + let _ = fs::remove_dir_all(root); + Ok(CountResourceControlProbe { + accepted, + rejected, + samples, + }) +} + fn materialize_dataset( dataset: &BenchDataset, run_name: &str, @@ -1320,6 +1454,14 @@ fn validation_summary( } if let Some(failure) = record_threshold_status( &mut summary, + SCENARIO_COUNT_RESOURCE_CONTROLS, + scenario(scenarios, SCENARIO_COUNT_RESOURCE_CONTROLS)? + .pass_latency_gate(thresholds.count_resource_controls_p95_micros), + ) { + failures.push(failure); + } + if let Some(failure) = record_threshold_status( + &mut summary, SCENARIO_PROJECTION_REBUILD, scenario(scenarios, SCENARIO_PROJECTION_REBUILD)? .pass_elapsed_gate(thresholds.projection_rebuild_elapsed_micros), @@ -1453,6 +1595,87 @@ fn bench_store_config(run_name: &str) -> Result<PocketStoreConfig, String> { .map_err(|error| error.to_string()) } +fn bench_runtime_config(root: &Path) -> Result<BaseRelayRuntimeConfig, String> { + let raw = json!({ + "server": { + "listen_addr": "127.0.0.1:0", + "relay_url": TANGLE_V2_RELAY_URL + }, + "pocket": { + "data_directory": root.join("pocket"), + "sync_policy": "flush_on_shutdown", + "query": { + "allow_scraping": false, + "allow_scrape_if_limited_to": 100, + "allow_scrape_if_max_seconds": 3600 + } + }, + "groups": { + "enabled": true, + "canonical_relay_url": TANGLE_V2_RELAY_URL, + "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777", + "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()] + }, + "auth": { + "challenge_ttl_seconds": 300, + "created_at_skew_seconds": 600 + }, + "limits": { + "max_message_length": 1048576, + "max_subid_length": 64, + "max_subscriptions_per_connection": 64, + "max_filters_per_request": 10, + "max_tag_values_per_filter": 100, + "max_query_complexity": 2048, + "max_limit": 500, + "default_limit": 100, + "max_event_tags": 200, + "max_content_length": 65536, + "broadcast_channel_capacity": 16, + "per_connection_outbound_queue": 8 + }, + "rate_limits": { + "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} + }, + "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} + }, + "req": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, + "per_connection": {"window_seconds": 60, "max_hits": 120}, + "per_pubkey": {"window_seconds": 60, "max_hits": 240}, + "per_group": {"window_seconds": 60, "max_hits": 240}, + "per_kind": {"window_seconds": 60, "max_hits": 500}, + "broad": {"window_seconds": 60, "max_hits": 30} + }, + "count": { + "per_ip": {"window_seconds": 60, "max_hits": 300}, + "per_connection": {"window_seconds": 60, "max_hits": 60}, + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_group": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 240}, + "broad": {"window_seconds": 60, "max_hits": 20} + } + } + }) + .to_string(); + parse_base_relay_runtime_config_json(&raw).map_err(|error| error.to_string()) +} + fn bench_temp_root(run_name: &str) -> PathBuf { let id = TEMP_ID.fetch_add(1, Ordering::Relaxed); std::env::temp_dir().join(format!( @@ -1517,9 +1740,9 @@ mod tests { use super::{ BenchDataset, BenchDatasetConfig, BenchGroupVisibility, BenchmarkProfile, BenchmarkProfileName, BenchmarkRunReport, BenchmarkThresholds, SCENARIO_BROADCAST_LAG, - SCENARIO_GROUP_READ_GATE_OVERHEAD, SCENARIO_MEMORY_PROFILE, SCENARIO_OUTBOX_REPLAY, - SCENARIO_POCKET_QUERY_VISIBLE_EVENTS, SCENARIO_PROJECTION_REBUILD, ScenarioReport, - generated_state_counts, materialize_dataset, + SCENARIO_COUNT_RESOURCE_CONTROLS, SCENARIO_GROUP_READ_GATE_OVERHEAD, + SCENARIO_MEMORY_PROFILE, SCENARIO_OUTBOX_REPLAY, SCENARIO_POCKET_QUERY_VISIBLE_EVENTS, + SCENARIO_PROJECTION_REBUILD, ScenarioReport, generated_state_counts, materialize_dataset, }; use std::collections::BTreeSet; use tangle_groups::{GroupId, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA}; @@ -1611,6 +1834,7 @@ mod tests { for name in [ SCENARIO_POCKET_QUERY_VISIBLE_EVENTS, SCENARIO_GROUP_READ_GATE_OVERHEAD, + SCENARIO_COUNT_RESOURCE_CONTROLS, SCENARIO_PROJECTION_REBUILD, SCENARIO_OUTBOX_REPLAY, SCENARIO_BROADCAST_LAG, @@ -1622,7 +1846,7 @@ mod tests { assert!(scenario.elapsed_micros > 0, "{name} elapsed"); } assert_eq!(report.dataset_profile().groups, 3); - assert_eq!(report.validation_summary().len(), 6); + assert_eq!(report.validation_summary().len(), 7); assert!( report .validation_summary() @@ -1702,6 +1926,7 @@ mod tests { let missing = serde_json::json!({ "pocket_query_p95_micros": 1, "read_gate_p95_micros": 1, + "count_resource_controls_p95_micros": 1, "projection_rebuild_elapsed_micros": 1, "outbox_replay_elapsed_micros": 1, "broadcast_lag_p95_micros": 1 @@ -1715,6 +1940,7 @@ mod tests { let unknown = serde_json::json!({ "pocket_query_p95_micros": 1, "read_gate_p95_micros": 1, + "count_resource_controls_p95_micros": 1, "projection_rebuild_elapsed_micros": 1, "outbox_replay_elapsed_micros": 1, "broadcast_lag_p95_micros": 1, @@ -1730,6 +1956,7 @@ mod tests { let zero = serde_json::json!({ "pocket_query_p95_micros": 0, "read_gate_p95_micros": 1, + "count_resource_controls_p95_micros": 1, "projection_rebuild_elapsed_micros": 1, "outbox_replay_elapsed_micros": 1, "broadcast_lag_p95_micros": 1, @@ -1773,6 +2000,7 @@ mod tests { vec![BenchmarkThresholds::smoke().read_gate_p95_micros + 1], 128, ), + passing_scenario(SCENARIO_COUNT_RESOURCE_CONTROLS), passing_scenario(SCENARIO_PROJECTION_REBUILD), passing_scenario(SCENARIO_OUTBOX_REPLAY), passing_scenario(SCENARIO_BROADCAST_LAG), @@ -1809,7 +2037,7 @@ mod tests { summary["dataset"]["fixture_family"], "synthetic repo-owned fixtures" ); - assert_eq!(summary["scenarios"].as_array().expect("scenarios").len(), 6); + assert_eq!(summary["scenarios"].as_array().expect("scenarios").len(), 7); assert_eq!( summary["validation_summary"][SCENARIO_POCKET_QUERY_VISIBLE_EVENTS], "pass" @@ -1827,6 +2055,19 @@ mod tests { } #[test] + fn count_resource_controls_scenario_accepts_bounded_counts_and_refuses_broad_counts() { + let dataset = + BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 1)).expect("dataset"); + let scenario = + super::run_count_resource_control_benchmark(&dataset).expect("count controls"); + + assert_eq!(scenario.scenario, SCENARIO_COUNT_RESOURCE_CONTROLS); + assert_eq!(scenario.attempted, 5); + assert_eq!(scenario.accepted, scenario.attempted); + assert_eq!(scenario.rejected, 0); + } + + #[test] fn projection_rebuild_scenario_recreates_groups_and_members() { let dataset = BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 2)).expect("dataset"); diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -184,7 +184,7 @@ async fn websocket_clients_use_nip01_nip42_and_nip45_flows() { send_client_value( &mut first, - json!(["COUNT", "count-websocket", {"kinds":[1]}]), + json!(["COUNT", "count-websocket", {"kinds":[1], "since": 1_714_124_433, "until": 1_714_124_433}]), ) .await; assert_eq!( @@ -493,7 +493,7 @@ async fn websocket_healthy_subscriber_receives_more_than_outbound_capacity() { } send_client_value( &mut subscriber, - json!(["COUNT", "healthy-count", {"kinds":[1]}]), + json!(["COUNT", "healthy-count", {"kinds":[1], "since": 1_714_124_500, "until": 1_714_124_509}]), ) .await; assert_eq!(