commit b357b27818a0ac898ecb6da1b0e83775715211a5
parent 6c55c2b65a60652decc87f5645641ce0605d2c26
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 16:38:57 -0700
runtime: remove event relay lock
Diffstat:
3 files changed, 84 insertions(+), 25 deletions(-)
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -596,7 +596,26 @@ impl BaseRelay {
event: Event,
auth: &BaseAuthState,
) -> Result<BaseRelayEventWrite, BaseRelayError> {
- self.handle_event_with_group_auth(
+ Self::handle_event_with_shared_services(
+ &self.store,
+ self.groups.as_ref(),
+ self.limits,
+ event,
+ auth,
+ )
+ }
+
+ pub(crate) fn handle_event_with_shared_services(
+ store: &PocketStoreHandle,
+ groups: Option<&GroupServiceHandle>,
+ limits: BaseRelayLimits,
+ event: Event,
+ auth: &BaseAuthState,
+ ) -> Result<BaseRelayEventWrite, BaseRelayError> {
+ Self::handle_event_with_group_auth_and_services(
+ store,
+ groups,
+ limits,
event,
&GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
)
@@ -640,8 +659,24 @@ impl BaseRelay {
event: Event,
auth: &GroupAuthContext,
) -> Result<BaseRelayEventWrite, BaseRelayError> {
+ Self::handle_event_with_group_auth_and_services(
+ &self.store,
+ self.groups.as_ref(),
+ self.limits,
+ event,
+ auth,
+ )
+ }
+
+ fn handle_event_with_group_auth_and_services(
+ store: &PocketStoreHandle,
+ groups: Option<&GroupServiceHandle>,
+ limits: BaseRelayLimits,
+ event: Event,
+ auth: &GroupAuthContext,
+ ) -> Result<BaseRelayEventWrite, BaseRelayError> {
let event_id = event.id().clone();
- if let Err(error) = self.limits.validate_event(&event) {
+ if let Err(error) = limits.validate_event(&event) {
return Ok(BaseRelayEventWrite::unstored(ok_rejected(
event_id,
error.prefixed_message(),
@@ -662,11 +697,7 @@ impl BaseRelay {
.prefixed_message(),
)));
}
- let group_limits = self
- .groups
- .as_ref()
- .map(GroupServiceHandle::limits)
- .unwrap_or_default();
+ let group_limits = groups.map(GroupServiceHandle::limits).unwrap_or_default();
let audit_class = classify_group_event(&event, group_limits).ok();
let class = match validate_client_group_event_structure(&event, group_limits) {
Ok(class) => class,
@@ -685,7 +716,7 @@ impl BaseRelay {
}
};
if !matches!(class, GroupEventClass::NonGroup) {
- let Some(groups) = self.groups.as_ref() else {
+ let Some(groups) = groups else {
logging::log_group_moderation_audit(
&event,
&class,
@@ -696,7 +727,7 @@ impl BaseRelay {
"blocked: NIP-29 group events are not accepted before group service".to_owned(),
)));
};
- if let Err(error) = groups.check_event(&self.store, &event, &class, auth) {
+ if let Err(error) = groups.check_event(store, &event, &class, auth) {
logging::log_group_moderation_audit(
&event,
&class,
@@ -719,24 +750,20 @@ impl BaseRelay {
String::new(),
)));
}
- if self
- .store
- .event_by_id(pocket_event_id(&event_id)?)?
- .is_some()
- {
+ if store.event_by_id(pocket_event_id(&event_id)?)?.is_some() {
return Ok(BaseRelayEventWrite::unstored(ok_accepted(
event_id,
"duplicate: already have this event".to_owned(),
)));
}
let pocket_event = tangle_event_to_pocket(&event)?;
- let store_offset = StoreOffset::new(self.store.store_event(&pocket_event)?);
+ let store_offset = StoreOffset::new(store.store_event(&pocket_event)?);
let mut stored_offsets = vec![store_offset];
if !matches!(class, GroupEventClass::NonGroup)
- && let Some(groups) = self.groups.as_ref()
+ && let Some(groups) = groups
{
stored_offsets.extend(groups.after_source_event_stored(
- &self.store,
+ store,
&event,
&class,
store_offset,
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -14,7 +14,7 @@ use crate::{
},
relay::{
auth::BaseAuthState,
- core::{BaseRelay, BaseRelayLimits, BaseRelayShutdownReport},
+ core::{BaseRelay, BaseRelayEventWrite, BaseRelayLimits, BaseRelayShutdownReport},
live::LiveSubscriptionSet,
},
};
@@ -372,6 +372,27 @@ impl TangleRuntimeShared {
.is_ok_and(|class| !matches!(class, GroupEventClass::NonGroup))
}
+ fn handle_event_with_auth_report(
+ &self,
+ event: Event,
+ auth: &BaseAuthState,
+ ) -> Result<BaseRelayEventWrite, BaseRelayError> {
+ BaseRelay::handle_event_with_shared_services(
+ &self.store,
+ self.groups.as_ref(),
+ self.limits.base_relay_limits(),
+ event,
+ auth,
+ )
+ }
+
+ fn group_outbox_pending_events(&self) -> usize {
+ self.groups
+ .as_ref()
+ .map(GroupServiceHandle::outbox_pending_events)
+ .unwrap_or(0)
+ }
+
fn rate_limit_req(
&self,
subscription_id: &SubscriptionId,
@@ -605,13 +626,9 @@ impl TangleRuntimeHandle {
record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at);
return Ok(vec![message]);
}
- let (result, group_outbox_pending_events) = {
- let relay = self.inner.relay.lock().await;
- let result = relay.handle_event_with_auth_report(event, auth)?;
- let pending_events =
- is_group_event.then(|| relay.group_outbox_pending_events());
- (result, pending_events)
- };
+ let result = self.inner.handle_event_with_auth_report(event, auth)?;
+ let group_outbox_pending_events =
+ is_group_event.then(|| self.inner.group_outbox_pending_events());
if is_group_event {
for _ in 0..result.stored_offsets().len().saturating_sub(1) {
self.inner.metrics.record_outbox_replayed_event();
diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs
@@ -1323,6 +1323,21 @@ fn req_count_and_live_fanout_share_one_group_read_gate() {
}
#[test]
+fn runtime_event_handling_does_not_lock_relay_state() {
+ let runtime = include_str!("../src/runtime.rs");
+ let event_branch = runtime
+ .split("ClientMessage::Event(event) => {")
+ .nth(1)
+ .expect("event branch")
+ .split("ClientMessage::Req")
+ .next()
+ .expect("req branch");
+
+ assert!(!event_branch.contains("relay.lock().await"));
+ assert!(event_branch.contains("self.inner.handle_event_with_auth_report(event, auth)?"));
+}
+
+#[test]
fn runtime_hot_path_does_not_stringify_and_reparse_events() {
let conversion_boundary = include_str!("../src/pocket_conversion.rs");
for forbidden in [