commit 817b3031a3e44860144e331fb3e67d41b2f4af3a
parent 9ef58e4f38c26d4affdaea768d4fccdd935028d6
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 17:00:39 -0700
runtime: remove base relay mutex
Diffstat:
3 files changed, 49 insertions(+), 14 deletions(-)
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -560,7 +560,16 @@ impl BaseRelay {
auth: &mut BaseAuthState,
now: UnixTimestamp,
) -> Vec<RelayMessage> {
- if let Err(error) = self.limits.validate_event(&event) {
+ Self::handle_auth_with_limits(self.limits, event, auth, now)
+ }
+
+ pub(crate) fn handle_auth_with_limits(
+ limits: BaseRelayLimits,
+ event: Event,
+ auth: &mut BaseAuthState,
+ now: UnixTimestamp,
+ ) -> Vec<RelayMessage> {
+ if let Err(error) = limits.validate_event(&event) {
return vec![RelayMessage::Ok {
event_id: event.id().clone(),
accepted: false,
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -41,7 +41,7 @@ use tangle_protocol::{
ClientMessage, Event, Filter, Kind, RelayMessage, SubscriptionId, UnixTimestamp,
};
use tangle_store_pocket::PocketStoreHandle;
-use tokio::sync::{Mutex, watch};
+use tokio::sync::watch;
pub struct TangleRuntime {
config: BaseRelayRuntimeConfig,
@@ -160,7 +160,6 @@ struct TangleRuntimeShared {
config: Arc<BaseRelayRuntimeConfig>,
store: PocketStoreHandle,
groups: Option<GroupServiceHandle>,
- relay: Mutex<BaseRelay>,
readiness: BaseRelayReadinessHandle,
limits: TangleRuntimeLimits,
event_bus: TangleEventBus,
@@ -187,7 +186,6 @@ impl TangleRuntimeShared {
config: Arc::new(config),
store,
groups,
- relay: Mutex::new(relay),
readiness,
limits,
event_bus,
@@ -795,11 +793,12 @@ impl TangleRuntimeHandle {
return Ok(vec![message]);
}
let event_for_failure = event.clone();
- let replies = self.inner.relay.lock().await.handle_client_message(
- ClientMessage::Auth(event),
+ let replies = BaseRelay::handle_auth_with_limits(
+ self.inner.limits.base_relay_limits(),
+ event,
auth,
now,
- )?;
+ );
if auth_response_failed(&replies) {
self.inner.metrics.record_auth_failure();
if let Some(message) = self.inner.rate_limit_auth_failure(
@@ -814,12 +813,13 @@ impl TangleRuntimeHandle {
}
Ok(replies)
}
- message => self
- .inner
- .relay
- .lock()
- .await
- .handle_client_message(message, auth, now),
+ ClientMessage::Close(subscription_id) => {
+ self.inner
+ .limits
+ .base_relay_limits()
+ .validate_subscription_id(&subscription_id)?;
+ Ok(Vec::new())
+ }
}
}
@@ -901,7 +901,8 @@ impl TangleRuntimeHandle {
pub async fn shutdown(&self) -> Result<BaseRelayShutdownReport, BaseRelayError> {
self.inner.shutdown.request_shutdown();
- self.inner.relay.lock().await.shutdown()
+ self.inner.store.sync()?;
+ Ok(BaseRelayShutdownReport::new(0))
}
}
diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs
@@ -1395,6 +1395,31 @@ fn runtime_live_fanout_offset_lookup_does_not_lock_relay_state() {
}
#[test]
+fn runtime_shared_shell_does_not_keep_transitional_base_relay_mutex() {
+ let runtime = include_str!("../src/runtime.rs");
+ let shared_shell = runtime
+ .split("struct TangleRuntimeShared {")
+ .nth(1)
+ .expect("shared shell")
+ .split("impl TangleRuntimeShared")
+ .next()
+ .expect("shared shell fields");
+ let handle_impl = runtime
+ .split("impl TangleRuntimeHandle")
+ .nth(1)
+ .expect("runtime handle")
+ .split("fn auth_response_failed")
+ .next()
+ .expect("runtime handle body");
+
+ assert!(!runtime.contains("Mutex<BaseRelay>"));
+ assert!(!runtime.contains("relay.lock().await"));
+ assert!(!shared_shell.contains("relay:"));
+ assert!(handle_impl.contains("BaseRelay::handle_auth_with_limits"));
+ assert!(handle_impl.contains("self.inner.store.sync()?"));
+}
+
+#[test]
fn runtime_hot_path_does_not_stringify_and_reparse_events() {
let conversion_boundary = include_str!("../src/pocket_conversion.rs");
for forbidden in [