tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 2897f545c1705ef43a29ac5d53094b7e93df7237
parent 23fb3e1ceba3fec8fadfad293ede580b1e6f91fb
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 16:14:51 -0700

runtime: introduce shared handle state

Diffstat:
Mcrates/tangle_runtime/src/runtime.rs | 237++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------
1 file changed, 143 insertions(+), 94 deletions(-)

diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -148,6 +148,42 @@ impl TangleRuntime { self.shutdown.request_shutdown(); self.relay.shutdown() } +} + +struct TangleRuntimeShared { + config: Arc<BaseRelayRuntimeConfig>, + relay: Mutex<BaseRelay>, + readiness: BaseRelayReadinessHandle, + limits: TangleRuntimeLimits, + event_bus: TangleEventBus, + rate_limiter: TangleRateLimiter, + metrics: TangleRuntimeMetrics, + shutdown: TangleShutdownSignal, +} + +impl TangleRuntimeShared { + fn from_runtime(runtime: TangleRuntime) -> Self { + let TangleRuntime { + config, + relay, + readiness, + limits, + event_bus, + rate_limiter, + metrics, + shutdown, + } = runtime; + Self { + config: Arc::new(config), + relay: Mutex::new(relay), + readiness, + limits, + event_bus, + rate_limiter, + metrics, + shutdown, + } + } fn rate_limit_event( &self, @@ -496,25 +532,26 @@ impl TangleRuntime { #[derive(Clone)] pub struct TangleRuntimeHandle { - inner: Arc<Mutex<TangleRuntime>>, - metrics: TangleRuntimeMetrics, + inner: Arc<TangleRuntimeShared>, } impl TangleRuntimeHandle { pub fn new(runtime: TangleRuntime) -> Self { - let metrics = runtime.metrics().clone(); Self { - inner: Arc::new(Mutex::new(runtime)), - metrics, + inner: Arc::new(TangleRuntimeShared::from_runtime(runtime)), } } pub fn metrics(&self) -> TangleRuntimeMetrics { - self.metrics.clone() + self.inner.metrics.clone() + } + + pub fn readiness_handle(&self) -> BaseRelayReadinessHandle { + self.inner.readiness.clone() } pub async fn auth_state(&self) -> Result<BaseAuthState, BaseRelayError> { - self.inner.lock().await.auth_state() + self.inner.config.auth_state() } pub async fn handle_client_message( @@ -539,49 +576,55 @@ impl TangleRuntimeHandle { rate_limit_context: TangleClientRateLimitContext, now: UnixTimestamp, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - self.metrics + self.inner + .metrics .record_client_message(client_message_metric_kind(&message)); - let mut runtime = self.inner.lock().await; match message { ClientMessage::Event(event) => { let started_at = Instant::now(); let event_id = event.id().clone(); - let is_group_event = runtime.is_group_event(&event); - if let Some(message) = runtime.rate_limit_event(&event, rate_limit_context, now) { - record_event_metrics(runtime.metrics(), &message, is_group_event, started_at); + let is_group_event = self.inner.is_group_event(&event); + if let Some(message) = self.inner.rate_limit_event(&event, rate_limit_context, now) + { + record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); return Ok(vec![message]); } if let Some(message) = - runtime.rate_limit_group_write(&event, rate_limit_context, now) + self.inner + .rate_limit_group_write(&event, rate_limit_context, now) { - record_event_metrics(runtime.metrics(), &message, is_group_event, started_at); + record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); return Ok(vec![message]); } - let result = runtime - .relay_mut() - .handle_event_with_auth_report(event, auth)?; + let (result, group_outbox_pending_events) = { + let mut relay = self.inner.relay.lock().await; + let result = relay.handle_event_with_auth_report(event, auth)?; + let pending_events = + is_group_event.then(|| relay.group_outbox_pending_events()); + (result, pending_events) + }; if is_group_event { for _ in 0..result.stored_offsets().len().saturating_sub(1) { - runtime.metrics().record_outbox_replayed_event(); + self.inner.metrics.record_outbox_replayed_event(); } - runtime.metrics().record_outbox_pending_events( - runtime.relay().group_outbox_pending_events(), - ); + self.inner + .metrics + .record_outbox_pending_events(group_outbox_pending_events.unwrap_or(0)); } for offset in result.stored_offsets() { - runtime.metrics().record_stored_event_offset(); - let receivers = runtime.event_bus().publish(*offset); - runtime.metrics().record_event_bus_publish(receivers); + self.inner.metrics.record_stored_event_offset(); + let receivers = self.inner.event_bus.publish(*offset); + self.inner.metrics.record_event_bus_publish(receivers); } if !result.stored_offsets().is_empty() { logging::log_event_stored( &event_id, result.stored_offsets().len(), - runtime.metrics().stored_event_offsets(), + self.inner.metrics.stored_event_offsets(), ); } let message = result.into_message(); - record_event_metrics(runtime.metrics(), &message, is_group_event, started_at); + record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); Ok(vec![message]) } ClientMessage::Req { @@ -589,44 +632,44 @@ impl TangleRuntimeHandle { filters, } => { let started_at = Instant::now(); - runtime - .limits() + self.inner + .limits .base_relay_limits() .validate_subscription_id(&subscription_id)?; - runtime - .limits() + self.inner + .limits .base_relay_limits() .validate_filters(&filters)?; if let Some(message) = BaseRelay::unsupported_search_closed(&subscription_id, &filters) { - runtime - .metrics() + self.inner + .metrics .record_query_latency(elapsed_micros(started_at)); return Ok(vec![message]); } - if let Some(message) = runtime.rate_limit_req( + if let Some(message) = self.inner.rate_limit_req( &subscription_id, &filters, auth, rate_limit_context, now, ) { - runtime - .metrics() + self.inner + .metrics .record_query_latency(elapsed_micros(started_at)); return Ok(vec![message]); } - let report = runtime.relay_mut().query_req_with_auth_report( + let report = self.inner.relay.lock().await.query_req_with_auth_report( subscription_id, filters, auth, )?; if report.group_read_denied() { - runtime.metrics().record_group_read_denial(); + self.inner.metrics.record_group_read_denial(); } - runtime - .metrics() + self.inner + .metrics .record_query_latency(elapsed_micros(started_at)); Ok(report.into_messages()) } @@ -635,50 +678,51 @@ impl TangleRuntimeHandle { filters, } => { let started_at = Instant::now(); - runtime - .limits() + self.inner + .limits .base_relay_limits() .validate_subscription_id(&subscription_id)?; - runtime - .limits() + self.inner + .limits .base_relay_limits() .validate_filters(&filters)?; if let Some(message) = BaseRelay::unsupported_search_closed(&subscription_id, &filters) { - runtime - .metrics() + self.inner + .metrics .record_query_latency(elapsed_micros(started_at)); return Ok(vec![message]); } - if let Some(message) = runtime.rate_limit_count( + if let Some(message) = self.inner.rate_limit_count( &subscription_id, &filters, auth, rate_limit_context, now, ) { - runtime - .metrics() + self.inner + .metrics .record_query_latency(elapsed_micros(started_at)); return Ok(vec![message]); } - let report = runtime.relay_mut().handle_count_with_auth_report( - subscription_id, - filters, - auth, - )?; + let report = self + .inner + .relay + .lock() + .await + .handle_count_with_auth_report(subscription_id, filters, auth)?; if report.group_read_denied() { - runtime.metrics().record_group_read_denial(); + self.inner.metrics.record_group_read_denial(); } - runtime - .metrics() + self.inner + .metrics .record_query_latency(elapsed_micros(started_at)); Ok(vec![report.into_message()]) } ClientMessage::Auth(event) => { - if let Err(error) = runtime.limits().base_relay_limits().validate_event(&event) { - runtime.metrics().record_auth_failure(); + if let Err(error) = self.inner.limits.base_relay_limits().validate_event(&event) { + self.inner.metrics.record_auth_failure(); return Ok(vec![RelayMessage::Ok { event_id: event.id().clone(), accepted: false, @@ -686,46 +730,51 @@ impl TangleRuntimeHandle { }]); } if let Some(message) = - runtime.rate_limit_auth_attempt(&event, rate_limit_context, now) + self.inner + .rate_limit_auth_attempt(&event, rate_limit_context, now) { - runtime.metrics().record_auth_failure(); + self.inner.metrics.record_auth_failure(); return Ok(vec![message]); } let event_for_failure = event.clone(); - let replies = runtime.relay_mut().handle_client_message( + let replies = self.inner.relay.lock().await.handle_client_message( ClientMessage::Auth(event), auth, now, )?; if auth_response_failed(&replies) { - runtime.metrics().record_auth_failure(); - if let Some(message) = - runtime.rate_limit_auth_failure(&event_for_failure, rate_limit_context, now) - { + self.inner.metrics.record_auth_failure(); + if let Some(message) = self.inner.rate_limit_auth_failure( + &event_for_failure, + rate_limit_context, + now, + ) { return Ok(vec![message]); } } else { - runtime.metrics().record_auth_success(); + self.inner.metrics.record_auth_success(); } Ok(replies) } - message => runtime - .relay_mut() + message => self + .inner + .relay + .lock() + .await .handle_client_message(message, auth, now), } } pub async fn subscribe_events(&self) -> TangleEventReceiver { - let runtime = self.inner.lock().await; - let receiver = runtime.event_bus().subscribe(); - runtime - .metrics() - .record_event_bus_receivers(runtime.event_bus().receiver_count()); + let receiver = self.inner.event_bus.subscribe(); + self.inner + .metrics + .record_event_bus_receivers(self.inner.event_bus.receiver_count()); receiver } pub async fn rate_limiter(&self) -> TangleRateLimiter { - self.inner.lock().await.rate_limiter().clone() + self.inner.rate_limiter.clone() } pub async fn rate_limit_req( @@ -736,13 +785,8 @@ impl TangleRuntimeHandle { rate_limit_context: TangleClientRateLimitContext, now: UnixTimestamp, ) -> Option<RelayMessage> { - self.inner.lock().await.rate_limit_req( - subscription_id, - filters, - auth, - rate_limit_context, - now, - ) + self.inner + .rate_limit_req(subscription_id, filters, auth, rate_limit_context, now) } pub(crate) async fn query_req_with_auth( @@ -752,16 +796,16 @@ impl TangleRuntimeHandle { auth: &BaseAuthState, ) -> Result<Vec<RelayMessage>, BaseRelayError> { let started_at = Instant::now(); - let mut runtime = self.inner.lock().await; - let report = - runtime - .relay_mut() - .query_req_with_auth_report(subscription_id, filters, auth)?; + let report = self.inner.relay.lock().await.query_req_with_auth_report( + subscription_id, + filters, + auth, + )?; if report.group_read_denied() { - runtime.metrics().record_group_read_denial(); + self.inner.metrics.record_group_read_denial(); } - runtime - .metrics() + self.inner + .metrics .record_query_latency(elapsed_micros(started_at)); Ok(report.into_messages()) } @@ -771,10 +815,14 @@ impl TangleRuntimeHandle { offset: StoreOffset, auth: &BaseAuthState, ) -> Result<Option<Event>, BaseRelayError> { - let runtime = self.inner.lock().await; - let event = runtime.relay().event_by_offset_with_auth(offset, auth)?; + let event = self + .inner + .relay + .lock() + .await + .event_by_offset_with_auth(offset, auth)?; if event.is_none() { - runtime.metrics().record_group_read_denial(); + self.inner.metrics.record_group_read_denial(); } Ok(event) } @@ -785,14 +833,15 @@ impl TangleRuntimeHandle { subscriptions: &mut LiveSubscriptionSet, ) -> Result<Vec<RelayMessage>, BaseRelayError> { self.inner + .relay .lock() .await - .relay() .fanout_offset(offset, subscriptions) } pub async fn shutdown(&self) -> Result<BaseRelayShutdownReport, BaseRelayError> { - self.inner.lock().await.shutdown() + self.inner.shutdown.request_shutdown(); + self.inner.relay.lock().await.shutdown() } }