commit d05f5d169c85a0207db6faaae0065d0f6d0ad52c
parent 67cb8a1a310e12f06b83b28654a8b34462b04c64
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 18:11:16 -0700
runtime: refuse broad count queries
Diffstat:
3 files changed, 94 insertions(+), 8 deletions(-)
diff --git a/crates/tangle_runtime/src/errors.rs b/crates/tangle_runtime/src/errors.rs
@@ -32,6 +32,13 @@ impl BaseRelayError {
}
}
+ pub fn restricted(message: impl Into<String>) -> Self {
+ Self {
+ prefix: "restricted",
+ message: message.into(),
+ }
+ }
+
pub fn error(message: impl Into<String>) -> Self {
Self {
prefix: "error",
@@ -103,6 +110,10 @@ mod tests {
"rate-limited: slow down"
);
assert_eq!(
+ BaseRelayError::restricted("nope").prefixed_message(),
+ "restricted: nope"
+ );
+ assert_eq!(
BaseRelayError::error("store").prefixed_message(),
"error: store"
);
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -621,6 +621,21 @@ impl TangleRuntimeShared {
})
}
+ fn refuse_broad_count(
+ &self,
+ subscription_id: &SubscriptionId,
+ filters: &[Filter],
+ ) -> Option<RelayMessage> {
+ TangleQueryClassifier::new(self.limits.base_relay_limits())
+ .classify_count(filters)
+ .is_broad()
+ .then(|| RelayMessage::Closed {
+ subscription_id: subscription_id.clone(),
+ message: BaseRelayError::restricted("count filters are too broad or expensive")
+ .prefixed_message(),
+ })
+ }
+
fn rate_limit_query(&self, request: TangleQueryRateLimitRequest<'_>) -> Option<RelayMessage> {
if let Some(peer_ip) = request.context.peer_ip
&& let Some(message) = self.rate_limit_closed(
@@ -908,6 +923,12 @@ impl TangleRuntimeHandle {
.record_query_latency(elapsed_micros(started_at));
return Ok(vec![message]);
}
+ if let Some(message) = self.inner.refuse_broad_count(&subscription_id, &filters) {
+ self.inner
+ .metrics
+ .record_query_latency(elapsed_micros(started_at));
+ return Ok(vec![message]);
+ }
if let Some(message) = self.inner.rate_limit_count(
&subscription_id,
&filters,
@@ -2946,7 +2967,9 @@ mod tests {
let handle = TangleRuntimeHandle::new(runtime);
let mut auth = handle.auth_state().await.expect("auth");
let subscription_id = SubscriptionId::new("limited-count-ip").expect("subscription");
- let filters = vec![filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter")];
+ let filters = vec![
+ filter_from_value(&json!({"kinds": [1], "#h": ["Farm"], "limit": 1})).expect("filter"),
+ ];
assert_eq!(
handle
@@ -3037,7 +3060,9 @@ mod tests {
let handle = TangleRuntimeHandle::new(runtime);
let mut auth = handle.auth_state().await.expect("auth");
let subscription_id = SubscriptionId::new("limited-count-kind").expect("subscription");
- let filters = vec![filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter")];
+ let filters = vec![
+ filter_from_value(&json!({"kinds": [1], "#h": ["Farm"], "limit": 1})).expect("filter"),
+ ];
assert_eq!(
handle
@@ -3061,8 +3086,8 @@ mod tests {
}
#[tokio::test]
- async fn runtime_rate_limits_count_broad_queries() {
- let root = temp_root("runtime-count-broad-rate-limit");
+ async fn runtime_refuses_broad_count_queries_before_rate_limits() {
+ let root = temp_root("runtime-count-broad-refusal");
let _ = std::fs::remove_dir_all(&root);
let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime");
let rule = runtime.config().rate_limits().count().broad();
@@ -3094,8 +3119,7 @@ mod tests {
.expect("count"),
vec![RelayMessage::Closed {
subscription_id,
- message: "rate-limited: count broad rate limit exceeded until 1714124493"
- .to_owned()
+ message: "restricted: count filters are too broad or expensive".to_owned()
}]
);
@@ -3103,6 +3127,57 @@ mod tests {
}
#[tokio::test]
+ async fn runtime_refuses_expensive_count_queries_deterministically() {
+ let root = temp_root("runtime-count-expensive-refusal");
+ let _ = std::fs::remove_dir_all(&root);
+ let handle = TangleRuntimeHandle::new(
+ TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"),
+ );
+ let mut auth = handle.auth_state().await.expect("auth");
+ let cases = [
+ ("missing-selector", json!({"kinds": [1], "limit": 1})),
+ (
+ "high-limit",
+ json!({"kinds": [1], "#h": ["Farm"], "limit": 500}),
+ ),
+ (
+ "broad-window",
+ json!({
+ "kinds": [1],
+ "since": 1,
+ "until": BROAD_QUERY_TIME_WINDOW_SECONDS + 2,
+ "limit": 1
+ }),
+ ),
+ ];
+
+ for (name, value) in cases {
+ let subscription_id = SubscriptionId::new(name).expect("subscription");
+ let filters = vec![filter_from_value(&value).expect("filter")];
+
+ assert_eq!(
+ handle
+ .handle_client_message(
+ ClientMessage::Count {
+ subscription_id: subscription_id.clone(),
+ filters
+ },
+ &mut auth,
+ UnixTimestamp::new(1_714_124_433)
+ )
+ .await
+ .expect("count"),
+ vec![RelayMessage::Closed {
+ subscription_id,
+ message: "restricted: count filters are too broad or expensive".to_owned()
+ }]
+ );
+ }
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
async fn runtime_publishes_generated_group_event_offsets_for_live_fanout() {
let root = temp_root("runtime-generated-offset-fanout");
let _ = std::fs::remove_dir_all(&root);
diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs
@@ -364,7 +364,7 @@ async fn websocket_public_relay_covers_query_count_ephemeral_and_rejection_flows
send_client_value(
&mut publisher,
- json!(["COUNT", "count-kind-one", {"kinds":[1]}]),
+ json!(["COUNT", "count-kind-one", {"kinds":[1], "since": 1_714_124_433, "until": 1_714_124_435}]),
)
.await;
assert_eq!(
@@ -374,7 +374,7 @@ async fn websocket_public_relay_covers_query_count_ephemeral_and_rejection_flows
send_client_value(
&mut publisher,
- json!(["COUNT", "count-ephemeral", {"kinds":[20001]}]),
+ json!(["COUNT", "count-ephemeral", {"kinds":[20001], "since": 1_714_124_437, "until": 1_714_124_437}]),
)
.await;
assert_eq!(