commit 78edbcd5c77e36ba7fcbeaf1441dad6133c3149c
parent ba1ebbb8c81cc72eb5831899f40fff9236b337cd
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 16:53:10 -0700
runtime: remove count relay lock
Diffstat:
3 files changed, 96 insertions(+), 30 deletions(-)
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -907,7 +907,31 @@ impl BaseRelay {
filters: Vec<Filter>,
auth: &BaseAuthState,
) -> Result<BaseRelayCountReport, BaseRelayError> {
- self.handle_count_with_group_auth_report(
+ Self::handle_count_with_shared_services(
+ &self.store,
+ self.groups.as_ref(),
+ self.limits,
+ self.query,
+ subscription_id,
+ filters,
+ auth,
+ )
+ }
+
+ pub(crate) fn handle_count_with_shared_services(
+ store: &PocketStoreHandle,
+ groups: Option<&GroupServiceHandle>,
+ limits: BaseRelayLimits,
+ query: PocketQueryConfig,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: &BaseAuthState,
+ ) -> Result<BaseRelayCountReport, BaseRelayError> {
+ Self::handle_count_with_group_auth_shared_services(
+ store,
+ groups,
+ limits,
+ query,
subscription_id,
filters,
&GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
@@ -930,12 +954,33 @@ impl BaseRelay {
filters: Vec<Filter>,
auth: &GroupAuthContext,
) -> Result<BaseRelayCountReport, BaseRelayError> {
- self.limits.validate_subscription_id(&subscription_id)?;
- self.limits.validate_filters(&filters)?;
+ Self::handle_count_with_group_auth_shared_services(
+ &self.store,
+ self.groups.as_ref(),
+ self.limits,
+ self.query,
+ subscription_id,
+ filters,
+ auth,
+ )
+ }
+
+ fn handle_count_with_group_auth_shared_services(
+ store: &PocketStoreHandle,
+ groups: Option<&GroupServiceHandle>,
+ limits: BaseRelayLimits,
+ query: PocketQueryConfig,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: &GroupAuthContext,
+ ) -> Result<BaseRelayCountReport, BaseRelayError> {
+ limits.validate_subscription_id(&subscription_id)?;
+ limits.validate_filters(&filters)?;
if let Some(message) = Self::unsupported_search_closed(&subscription_id, &filters) {
return Ok(BaseRelayCountReport::new(message, false));
}
- let report = self.count_events_report(&filters, auth)?;
+ let report =
+ Self::count_events_report_with_services(store, groups, limits, query, &filters, auth)?;
Ok(BaseRelayCountReport::new(
RelayMessage::Count {
subscription_id,
@@ -1013,8 +1058,11 @@ impl BaseRelay {
))
}
- fn count_events_report(
- &self,
+ fn count_events_report_with_services(
+ store: &PocketStoreHandle,
+ groups: Option<&GroupServiceHandle>,
+ limits: BaseRelayLimits,
+ query: PocketQueryConfig,
filters: &[Filter],
auth: &GroupAuthContext,
) -> Result<BaseRelayCountEventsReport, BaseRelayError> {
@@ -1022,7 +1070,9 @@ impl BaseRelay {
let mut group_read_denied = false;
for filter in filters {
let filter = filter.without_limit();
- let report = self.query_filter_events_report(&filter, auth)?;
+ let report = Self::query_filter_events_report_with_services(
+ store, groups, limits, query, &filter, auth,
+ )?;
group_read_denied |= report.group_read_denied;
for event in report.events {
seen.insert(event.id().clone());
@@ -1033,21 +1083,6 @@ impl BaseRelay {
Ok(BaseRelayCountEventsReport::new(count, group_read_denied))
}
- fn query_filter_events_report(
- &self,
- filter: &Filter,
- auth: &GroupAuthContext,
- ) -> Result<BaseRelayEventQueryReport, BaseRelayError> {
- Self::query_filter_events_report_with_services(
- &self.store,
- self.groups.as_ref(),
- self.limits,
- self.query,
- filter,
- auth,
- )
- }
-
fn query_filter_events_report_with_services(
store: &PocketStoreHandle,
groups: Option<&GroupServiceHandle>,
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -15,8 +15,8 @@ use crate::{
relay::{
auth::BaseAuthState,
core::{
- BaseRelay, BaseRelayEventWrite, BaseRelayLimits, BaseRelayQueryReport,
- BaseRelayShutdownReport,
+ BaseRelay, BaseRelayCountReport, BaseRelayEventWrite, BaseRelayLimits,
+ BaseRelayQueryReport, BaseRelayShutdownReport,
},
live::LiveSubscriptionSet,
},
@@ -413,6 +413,23 @@ impl TangleRuntimeShared {
)
}
+ fn handle_count_with_auth_report(
+ &self,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: &BaseAuthState,
+ ) -> Result<BaseRelayCountReport, BaseRelayError> {
+ BaseRelay::handle_count_with_shared_services(
+ &self.store,
+ self.groups.as_ref(),
+ self.limits.base_relay_limits(),
+ self.config.pocket_query_config(),
+ subscription_id,
+ filters,
+ auth,
+ )
+ }
+
fn rate_limit_req(
&self,
subscription_id: &SubscriptionId,
@@ -750,12 +767,9 @@ impl TangleRuntimeHandle {
.record_query_latency(elapsed_micros(started_at));
return Ok(vec![message]);
}
- let report = self
- .inner
- .relay
- .lock()
- .await
- .handle_count_with_auth_report(subscription_id, filters, auth)?;
+ let report =
+ self.inner
+ .handle_count_with_auth_report(subscription_id, filters, auth)?;
if report.group_read_denied() {
self.inner.metrics.record_group_read_denial();
}
diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs
@@ -1362,6 +1362,23 @@ fn runtime_req_handling_does_not_lock_relay_state() {
}
#[test]
+fn runtime_count_handling_does_not_lock_relay_state() {
+ let runtime = include_str!("../src/runtime.rs");
+ let count_branch = runtime
+ .split("ClientMessage::Count {")
+ .nth(1)
+ .expect("count branch")
+ .split("ClientMessage::Auth")
+ .next()
+ .expect("auth branch");
+
+ assert!(!count_branch.contains("relay.lock().await"));
+ assert!(
+ count_branch.contains("handle_count_with_auth_report(subscription_id, filters, auth)?")
+ );
+}
+
+#[test]
fn runtime_hot_path_does_not_stringify_and_reparse_events() {
let conversion_boundary = include_str!("../src/pocket_conversion.rs");
for forbidden in [