commit ba1ebbb8c81cc72eb5831899f40fff9236b337cd
parent b357b27818a0ac898ecb6da1b0e83775715211a5
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 16:44:21 -0700
runtime: remove req relay lock
Diffstat:
3 files changed, 146 insertions(+), 45 deletions(-)
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -500,13 +500,20 @@ impl BaseRelay {
}
}
- pub(crate) fn query_req_with_auth_report(
- &self,
+ pub(crate) fn query_req_with_shared_services(
+ store: &PocketStoreHandle,
+ groups: Option<&GroupServiceHandle>,
+ limits: BaseRelayLimits,
+ query: PocketQueryConfig,
subscription_id: SubscriptionId,
filters: Vec<Filter>,
auth: &BaseAuthState,
) -> Result<BaseRelayQueryReport, BaseRelayError> {
- self.query_req_with_group_auth_report(
+ Self::query_req_with_group_auth_shared_services(
+ store,
+ groups,
+ limits,
+ query,
subscription_id,
filters,
&GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
@@ -832,12 +839,33 @@ impl BaseRelay {
filters: Vec<Filter>,
auth: &GroupAuthContext,
) -> Result<BaseRelayQueryReport, BaseRelayError> {
- self.limits.validate_subscription_id(&subscription_id)?;
- self.limits.validate_filters(&filters)?;
+ Self::query_req_with_group_auth_shared_services(
+ &self.store,
+ self.groups.as_ref(),
+ self.limits,
+ self.query,
+ subscription_id,
+ filters,
+ auth,
+ )
+ }
+
+ fn query_req_with_group_auth_shared_services(
+ store: &PocketStoreHandle,
+ groups: Option<&GroupServiceHandle>,
+ limits: BaseRelayLimits,
+ query: PocketQueryConfig,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: &GroupAuthContext,
+ ) -> Result<BaseRelayQueryReport, BaseRelayError> {
+ limits.validate_subscription_id(&subscription_id)?;
+ limits.validate_filters(&filters)?;
if let Some(message) = Self::unsupported_search_closed(&subscription_id, &filters) {
return Ok(BaseRelayQueryReport::new(vec![message], false));
}
- let report = self.query_events_report(&filters, auth)?;
+ let report =
+ Self::query_events_report_with_services(store, groups, limits, query, &filters, auth)?;
let group_read_denied = report.group_read_denied;
let mut messages = report
.events
@@ -950,13 +978,33 @@ impl BaseRelay {
filters: &[Filter],
auth: &GroupAuthContext,
) -> Result<BaseRelayEventQueryReport, BaseRelayError> {
+ Self::query_events_report_with_services(
+ &self.store,
+ self.groups.as_ref(),
+ self.limits,
+ self.query,
+ filters,
+ auth,
+ )
+ }
+
+ fn query_events_report_with_services(
+ store: &PocketStoreHandle,
+ groups: Option<&GroupServiceHandle>,
+ limits: BaseRelayLimits,
+ query: PocketQueryConfig,
+ filters: &[Filter],
+ auth: &GroupAuthContext,
+ ) -> Result<BaseRelayEventQueryReport, BaseRelayError> {
let mut output = Vec::new();
let mut group_read_denied = false;
for filter in filters {
- let report = self.query_filter_events_report(filter, auth)?;
+ let report = Self::query_filter_events_report_with_services(
+ store, groups, limits, query, filter, auth,
+ )?;
group_read_denied |= report.group_read_denied;
let mut events = Self::sort_and_dedupe_query_events(report.events);
- events.truncate(self.limits.effective_filter_limit(filter));
+ events.truncate(limits.effective_filter_limit(filter));
output.extend(events);
}
Ok(BaseRelayEventQueryReport::new(
@@ -990,36 +1038,49 @@ impl BaseRelay {
filter: &Filter,
auth: &GroupAuthContext,
) -> Result<BaseRelayEventQueryReport, BaseRelayError> {
- let effective_filter = self.filter_with_effective_limit(filter);
+ Self::query_filter_events_report_with_services(
+ &self.store,
+ self.groups.as_ref(),
+ self.limits,
+ self.query,
+ filter,
+ auth,
+ )
+ }
+
+ fn query_filter_events_report_with_services(
+ store: &PocketStoreHandle,
+ groups: Option<&GroupServiceHandle>,
+ limits: BaseRelayLimits,
+ query: PocketQueryConfig,
+ filter: &Filter,
+ auth: &GroupAuthContext,
+ ) -> Result<BaseRelayEventQueryReport, BaseRelayError> {
+ let effective_filter = Self::filter_with_limits(limits, filter);
let pocket_filter = tangle_filter_to_pocket(&effective_filter)?;
let screen_error = RefCell::new(None);
- let screened =
- self.store
- .find_events_with_screen(&pocket_filter, self.query, |pocket_event| {
- if screen_error.borrow().is_some() {
- return PocketScreenResult::Mismatch;
- }
- match pocket_filter.event_matches(pocket_event) {
- Ok(false) => PocketScreenResult::Mismatch,
- Ok(true) => match Self::group_read_gate_visible_to_auth(
- self.groups.as_ref(),
- pocket_event,
- auth,
- ) {
- Ok(true) => PocketScreenResult::Match,
- Ok(false) => PocketScreenResult::Redacted,
- Err(error) => {
- *screen_error.borrow_mut() = Some(error);
- PocketScreenResult::Mismatch
- }
- },
+ let screened = store.find_events_with_screen(&pocket_filter, query, |pocket_event| {
+ if screen_error.borrow().is_some() {
+ return PocketScreenResult::Mismatch;
+ }
+ match pocket_filter.event_matches(pocket_event) {
+ Ok(false) => PocketScreenResult::Mismatch,
+ Ok(true) => {
+ match Self::group_read_gate_visible_to_auth(groups, pocket_event, auth) {
+ Ok(true) => PocketScreenResult::Match,
+ Ok(false) => PocketScreenResult::Redacted,
Err(error) => {
- *screen_error.borrow_mut() =
- Some(BaseRelayError::error(error.to_string()));
+ *screen_error.borrow_mut() = Some(error);
PocketScreenResult::Mismatch
}
}
- })?;
+ }
+ Err(error) => {
+ *screen_error.borrow_mut() = Some(BaseRelayError::error(error.to_string()));
+ PocketScreenResult::Mismatch
+ }
+ }
+ })?;
if let Some(error) = screen_error.into_inner() {
return Err(error);
}
@@ -1032,10 +1093,10 @@ impl BaseRelay {
Ok(BaseRelayEventQueryReport::new(events, group_read_denied))
}
- fn filter_with_effective_limit(&self, filter: &Filter) -> Filter {
+ fn filter_with_limits(limits: BaseRelayLimits, filter: &Filter) -> Filter {
match filter.limit() {
Some(_) => filter.clone(),
- None => filter.with_limit(self.limits.default_limit()),
+ None => filter.with_limit(limits.default_limit()),
}
}
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -14,7 +14,10 @@ use crate::{
},
relay::{
auth::BaseAuthState,
- core::{BaseRelay, BaseRelayEventWrite, BaseRelayLimits, BaseRelayShutdownReport},
+ core::{
+ BaseRelay, BaseRelayEventWrite, BaseRelayLimits, BaseRelayQueryReport,
+ BaseRelayShutdownReport,
+ },
live::LiveSubscriptionSet,
},
};
@@ -393,6 +396,23 @@ impl TangleRuntimeShared {
.unwrap_or(0)
}
+ fn query_req_with_auth_report(
+ &self,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: &BaseAuthState,
+ ) -> Result<BaseRelayQueryReport, BaseRelayError> {
+ BaseRelay::query_req_with_shared_services(
+ &self.store,
+ self.groups.as_ref(),
+ self.limits.base_relay_limits(),
+ self.config.pocket_query_config(),
+ subscription_id,
+ filters,
+ auth,
+ )
+ }
+
fn rate_limit_req(
&self,
subscription_id: &SubscriptionId,
@@ -686,11 +706,9 @@ impl TangleRuntimeHandle {
.record_query_latency(elapsed_micros(started_at));
return Ok(vec![message]);
}
- let report = self.inner.relay.lock().await.query_req_with_auth_report(
- subscription_id,
- filters,
- auth,
- )?;
+ let report =
+ self.inner
+ .query_req_with_auth_report(subscription_id, filters, auth)?;
if report.group_read_denied() {
self.inner.metrics.record_group_read_denial();
}
@@ -822,11 +840,9 @@ impl TangleRuntimeHandle {
auth: &BaseAuthState,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
let started_at = Instant::now();
- let report = self.inner.relay.lock().await.query_req_with_auth_report(
- subscription_id,
- filters,
- auth,
- )?;
+ let report = self
+ .inner
+ .query_req_with_auth_report(subscription_id, filters, auth)?;
if report.group_read_denied() {
self.inner.metrics.record_group_read_denial();
}
diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs
@@ -1338,6 +1338,30 @@ fn runtime_event_handling_does_not_lock_relay_state() {
}
#[test]
+fn runtime_req_handling_does_not_lock_relay_state() {
+ let runtime = include_str!("../src/runtime.rs");
+ let req_branch = runtime
+ .split("ClientMessage::Req {")
+ .nth(1)
+ .expect("req branch")
+ .split("ClientMessage::Count")
+ .next()
+ .expect("count branch");
+ let query_helper = runtime
+ .split("pub(crate) async fn query_req_with_auth")
+ .nth(1)
+ .expect("query helper")
+ .split("pub async fn event_by_offset_with_auth")
+ .next()
+ .expect("offset helper");
+
+ assert!(!req_branch.contains("relay.lock().await"));
+ assert!(!query_helper.contains("relay.lock().await"));
+ assert!(req_branch.contains("query_req_with_auth_report(subscription_id, filters, auth)?"));
+ assert!(query_helper.contains("query_req_with_auth_report(subscription_id, filters, auth)?"));
+}
+
+#[test]
fn runtime_hot_path_does_not_stringify_and_reparse_events() {
let conversion_boundary = include_str!("../src/pocket_conversion.rs");
for forbidden in [