tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 1a2c9857097e5446a920f5afeec745486630fa12
parent 56cc5dc4589285464b4fb2076228082ee87a13ec
Author: triesap <tyson@radroots.org>
Date:   Fri, 19 Jun 2026 15:55:51 -0700

runtime: extract neutral relay runtime boundary

- rename per-relay runtime handles away from tenant-owned terminology
- move aggregate resource permits into a neutral runtime module
- route host and websocket sessions through neutral relay resources
- add source invariants for relay runtime and session admission boundaries

Diffstat:
Mcrates/tangle/tests/source_invariant.rs | 20+++++++++++++++++---
Mcrates/tangle_bench/src/lib.rs | 6+++---
Mcrates/tangle_runtime/src/host.rs | 196+++++++++----------------------------------------------------------------------
Mcrates/tangle_runtime/src/lib.rs | 1+
Acrates/tangle_runtime/src/resource_limits.rs | 160+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle_runtime/src/runtime.rs | 177++++++++++++++++++++++++++++++++++++++-----------------------------------------
Mcrates/tangle_runtime/src/server.rs | 2+-
Mcrates/tangle_runtime/src/session.rs | 77+++++++++++++++++++++++++++++++++++++----------------------------------------
Mcrates/tangle_runtime/tests/ops_truthfulness.rs | 4++--
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 12++++++------
10 files changed, 334 insertions(+), 321 deletions(-)

diff --git a/crates/tangle/tests/source_invariant.rs b/crates/tangle/tests/source_invariant.rs @@ -43,7 +43,7 @@ fn tangle_v1_mvp_authority_requires_virtual_relay_tenancy() { } #[test] -fn tenant_runtime_surface_has_no_stale_single_runtime_api_names() { +fn relay_runtime_surface_has_no_stale_single_runtime_api_names() { let workspace_root = workspace_root(); let runtime_source = fs::read_to_string(workspace_root.join("crates/tangle_runtime/src/runtime.rs")) @@ -54,6 +54,9 @@ fn tenant_runtime_surface_has_no_stale_single_runtime_api_names() { "pub struct TangleRuntime {", "TangleRuntimeHandle", "TangleRuntimeShared", + "pub struct TenantRuntime {", + "TenantRuntimeHandle", + "TenantRuntimeShared", "load_base_relay_runtime_config", "open_tangle_runtime_from_config_path", ] { @@ -62,8 +65,8 @@ fn tenant_runtime_surface_has_no_stale_single_runtime_api_names() { "stale single-runtime API name remains: {forbidden}" ); } - assert!(runtime_source.contains("pub struct TenantRuntime")); - assert!(runtime_source.contains("pub struct TenantRuntimeHandle")); + assert!(runtime_source.contains("pub struct RelayRuntime")); + assert!(runtime_source.contains("pub struct RelayRuntimeHandle")); } #[test] @@ -77,6 +80,9 @@ fn tangle_v1_mvp_source_invariants_guard_tenancy_boundaries() { .expect("server source"); let host_source = fs::read_to_string(workspace_root.join("crates/tangle_runtime/src/host.rs")) .expect("host source"); + let session_source = + fs::read_to_string(workspace_root.join("crates/tangle_runtime/src/session.rs")) + .expect("session source"); assert!( config_source.contains("fn reject_legacy_single_relay_config"), @@ -107,6 +113,14 @@ fn tangle_v1_mvp_source_invariants_guard_tenancy_boundaries() { .contains(".tenant_by_host(&host)\n .ok_or(HostResolutionError::Unknown)"), "relay request routing must fail closed when the host is not a configured tenant" ); + assert!( + session_source.contains("resource_limits::{RelayResourceLimiter, RelaySubscriptionPermit}"), + "websocket sessions must depend on neutral relay resource admission types" + ); + assert!( + !session_source.contains("host::{RelayResourceLimiter, RelaySubscriptionPermit}"), + "websocket sessions must not import host-layer resource permits" + ); let tenant_resolution = server_source .find("let tenant = match resolve_tenant") .expect("tenant resolution"); diff --git a/crates/tangle_bench/src/lib.rs b/crates/tangle_bench/src/lib.rs @@ -24,7 +24,7 @@ use tangle_runtime::{ core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits}, outbound::RuntimeRelayMessage, }, - runtime::{TenantRuntime, TenantRuntimeHandle}, + runtime::{RelayRuntime, RelayRuntimeHandle}, }; use tangle_store_pocket::{ PocketEventId, PocketKind, PocketOwnedEvent, PocketOwnedFilter, PocketOwnedTags, PocketPubkey, @@ -1736,8 +1736,8 @@ struct CountResourceControlProbe { async fn runtime_count_resource_control_probe() -> Result<CountResourceControlProbe, String> { let root = bench_temp_root("count-resource-controls-runtime"); let _ = fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(bench_runtime_config(&root)?).map_err(|error| error.to_string())?, + let handle = RelayRuntimeHandle::new( + RelayRuntime::open(bench_runtime_config(&root)?).map_err(|error| error.to_string())?, ); let mut auth = handle .auth_state() diff --git a/crates/tangle_runtime/src/host.rs b/crates/tangle_runtime/src/host.rs @@ -1,31 +1,30 @@ #![forbid(unsafe_code)] use crate::{ - config::{TangleHostLimitsConfig, TangleHostRuntimeConfigSet, TenantRuntimeConfig}, + config::{TangleHostRuntimeConfigSet, TenantRuntimeConfig}, errors::BaseRelayError, ops::BaseRelayReadinessCheckStatus, - runtime::{TangleShutdownSignal, TenantRuntime, TenantRuntimeHandle}, + resource_limits::RelayResourceLimiter, + runtime::{RelayRuntime, RelayRuntimeHandle, TangleShutdownSignal}, tenant::{CanonicalHost, TenantId, TenantRelayUrl, TenantSchema}, }; -use std::{ - collections::BTreeMap, - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, -}; +use std::collections::BTreeMap; #[derive(Debug, Clone)] pub struct TangleHostRuntime { config: TangleHostRuntimeConfigSet, registry: TenantRegistry, - resources: TangleHostResourceLimiter, + resources: RelayResourceLimiter, shutdown: TangleShutdownSignal, } impl TangleHostRuntime { pub fn open(config: TangleHostRuntimeConfigSet) -> Result<Self, BaseRelayError> { - let resources = TangleHostResourceLimiter::new(config.host().limits()); + let limits = config.host().limits(); + let resources = RelayResourceLimiter::new( + limits.max_total_connections(), + limits.max_total_subscriptions(), + ); let registry = TenantRegistry::open(&config)?; Ok(Self { config, @@ -43,7 +42,7 @@ impl TangleHostRuntime { &self.registry } - pub fn resources(&self) -> TangleHostResourceLimiter { + pub fn resources(&self) -> RelayResourceLimiter { self.resources.clone() } @@ -288,11 +287,11 @@ impl TenantRegistry { #[derive(Clone)] pub struct TenantRuntimeEntry { config: TenantRuntimeConfig, - runtime: TenantRuntimeHandle, + runtime: RelayRuntimeHandle, } impl TenantRuntimeEntry { - pub fn new(config: TenantRuntimeConfig, runtime: TenantRuntimeHandle) -> Self { + pub fn new(config: TenantRuntimeConfig, runtime: RelayRuntimeHandle) -> Self { Self { config, runtime } } @@ -316,7 +315,7 @@ impl TenantRuntimeEntry { self.config.relay_url() } - pub fn runtime(&self) -> &TenantRuntimeHandle { + pub fn runtime(&self) -> &RelayRuntimeHandle { &self.runtime } } @@ -333,145 +332,13 @@ impl std::fmt::Debug for TenantRuntimeEntry { } } -#[derive(Debug, Clone)] -pub struct TangleHostResourceLimiter { - inner: Arc<TangleHostResourceLimiterInner>, -} - -#[derive(Debug)] -struct TangleHostResourceLimiterInner { - max_total_connections: usize, - max_total_subscriptions: usize, - active_connections: AtomicUsize, - active_subscriptions: AtomicUsize, -} - -impl TangleHostResourceLimiter { - pub fn new(limits: TangleHostLimitsConfig) -> Self { - Self { - inner: Arc::new(TangleHostResourceLimiterInner { - max_total_connections: limits.max_total_connections(), - max_total_subscriptions: limits.max_total_subscriptions(), - active_connections: AtomicUsize::new(0), - active_subscriptions: AtomicUsize::new(0), - }), - } - } - - pub fn try_open_connection(&self) -> Result<TangleHostConnectionPermit, BaseRelayError> { - increment_with_limit( - &self.inner.active_connections, - 1, - self.inner.max_total_connections, - "host total connection limit exceeded", - )?; - Ok(TangleHostConnectionPermit { - resources: self.inner.clone(), - released: false, - }) - } - - pub fn try_open_subscriptions( - &self, - count: usize, - ) -> Result<TangleHostSubscriptionPermit, BaseRelayError> { - if count == 0 { - return Err(BaseRelayError::invalid( - "subscription reservation count must be greater than zero", - )); - } - increment_with_limit( - &self.inner.active_subscriptions, - count, - self.inner.max_total_subscriptions, - "host total subscription limit exceeded", - )?; - Ok(TangleHostSubscriptionPermit { - resources: self.inner.clone(), - count, - released: false, - }) - } - - pub fn active_connections(&self) -> usize { - self.inner.active_connections.load(Ordering::Relaxed) - } - - pub fn active_subscriptions(&self) -> usize { - self.inner.active_subscriptions.load(Ordering::Relaxed) - } - - pub fn max_total_connections(&self) -> usize { - self.inner.max_total_connections - } - - pub fn max_total_subscriptions(&self) -> usize { - self.inner.max_total_subscriptions - } -} - -#[derive(Debug)] -pub struct TangleHostConnectionPermit { - resources: Arc<TangleHostResourceLimiterInner>, - released: bool, -} - -impl TangleHostConnectionPermit { - pub fn release(mut self) { - self.release_inner(); - } - - fn release_inner(&mut self) { - if !self.released { - self.resources - .active_connections - .fetch_sub(1, Ordering::Relaxed); - self.released = true; - } - } -} - -impl Drop for TangleHostConnectionPermit { - fn drop(&mut self) { - self.release_inner(); - } -} - -#[derive(Debug)] -pub struct TangleHostSubscriptionPermit { - resources: Arc<TangleHostResourceLimiterInner>, - count: usize, - released: bool, -} - -impl TangleHostSubscriptionPermit { - pub fn release(mut self) { - self.release_inner(); - } - - fn release_inner(&mut self) { - if !self.released { - self.resources - .active_subscriptions - .fetch_sub(self.count, Ordering::Relaxed); - self.released = true; - } - } -} - -impl Drop for TangleHostSubscriptionPermit { - fn drop(&mut self) { - self.release_inner(); - } -} - fn open_tenant_runtime( config: &TangleHostRuntimeConfigSet, tenant: &TenantRuntimeConfig, ) -> Result<TenantRuntimeEntry, BaseRelayError> { let runtime_config = tenant .to_base_relay_runtime_config(config.host().listen_addr(), config.host().tracing().clone()); - let runtime = TenantRuntime::open(runtime_config).map_err(|error| { + let runtime = RelayRuntime::open(runtime_config).map_err(|error| { BaseRelayError::error(format!( "failed to open active tenant `{}`: {}", tenant.tenant_id(), @@ -480,7 +347,7 @@ fn open_tenant_runtime( })?; Ok(TenantRuntimeEntry::new( tenant.clone(), - TenantRuntimeHandle::new(runtime), + RelayRuntimeHandle::new(runtime), )) } @@ -503,36 +370,16 @@ fn active_tenants_ready(registry: &TenantRegistry) -> BaseRelayReadinessCheckSta } } -fn increment_with_limit( - counter: &AtomicUsize, - amount: usize, - limit: usize, - message: &'static str, -) -> Result<(), BaseRelayError> { - let mut current = counter.load(Ordering::Relaxed); - loop { - let Some(next) = current.checked_add(amount) else { - return Err(BaseRelayError::restricted(message)); - }; - if next > limit { - return Err(BaseRelayError::restricted(message)); - } - match counter.compare_exchange(current, next, Ordering::Relaxed, Ordering::Relaxed) { - Ok(_) => return Ok(()), - Err(actual) => current = actual, - } - } -} - #[cfg(test)] mod tests { - use super::{TangleHostResourceLimiter, TangleHostRuntime}; + use super::TangleHostRuntime; use crate::{ config::{ - TangleHostLimitsConfig, TangleHostRuntimeConfigSet, - parse_tangle_host_runtime_config_json, parse_tenant_runtime_config_json, + TangleHostRuntimeConfigSet, parse_tangle_host_runtime_config_json, + parse_tenant_runtime_config_json, }, ops::BaseRelayReadinessCheckStatus, + resource_limits::RelayResourceLimiter, tenant::{CanonicalHost, TenantId}, }; use serde_json::json; @@ -739,8 +586,7 @@ mod tests { #[test] fn host_caps_reject_aggregate_connection_and_subscription_limits() { - let resources = - TangleHostResourceLimiter::new(TangleHostLimitsConfig::new(1, 2, 4).expect("limits")); + let resources = RelayResourceLimiter::new(1, 2); let connection = resources.try_open_connection().expect("connection"); assert_eq!(resources.active_connections(), 1); diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -15,6 +15,7 @@ pub(crate) mod pocket_conversion; pub(crate) mod pocket_event_validation; pub mod rate_limits; pub mod relay; +pub mod resource_limits; pub mod runtime; pub mod server; pub mod session; diff --git a/crates/tangle_runtime/src/resource_limits.rs b/crates/tangle_runtime/src/resource_limits.rs @@ -0,0 +1,160 @@ +#![forbid(unsafe_code)] + +use crate::errors::BaseRelayError; +use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, +}; + +#[derive(Debug, Clone)] +pub struct RelayResourceLimiter { + inner: Arc<RelayResourceLimiterInner>, +} + +#[derive(Debug)] +struct RelayResourceLimiterInner { + max_connections: usize, + max_subscriptions: usize, + active_connections: AtomicUsize, + active_subscriptions: AtomicUsize, +} + +impl RelayResourceLimiter { + pub fn new(max_connections: usize, max_subscriptions: usize) -> Self { + Self { + inner: Arc::new(RelayResourceLimiterInner { + max_connections, + max_subscriptions, + active_connections: AtomicUsize::new(0), + active_subscriptions: AtomicUsize::new(0), + }), + } + } + + pub fn try_open_connection(&self) -> Result<RelayConnectionPermit, BaseRelayError> { + increment_with_limit( + &self.inner.active_connections, + 1, + self.inner.max_connections, + "host total connection limit exceeded", + )?; + Ok(RelayConnectionPermit { + resources: self.inner.clone(), + released: false, + }) + } + + pub fn try_open_subscriptions( + &self, + count: usize, + ) -> Result<RelaySubscriptionPermit, BaseRelayError> { + if count == 0 { + return Err(BaseRelayError::invalid( + "subscription reservation count must be greater than zero", + )); + } + increment_with_limit( + &self.inner.active_subscriptions, + count, + self.inner.max_subscriptions, + "host total subscription limit exceeded", + )?; + Ok(RelaySubscriptionPermit { + resources: self.inner.clone(), + count, + released: false, + }) + } + + pub fn active_connections(&self) -> usize { + self.inner.active_connections.load(Ordering::Relaxed) + } + + pub fn active_subscriptions(&self) -> usize { + self.inner.active_subscriptions.load(Ordering::Relaxed) + } + + pub fn max_connections(&self) -> usize { + self.inner.max_connections + } + + pub fn max_subscriptions(&self) -> usize { + self.inner.max_subscriptions + } +} + +#[derive(Debug)] +pub struct RelayConnectionPermit { + resources: Arc<RelayResourceLimiterInner>, + released: bool, +} + +impl RelayConnectionPermit { + pub fn release(mut self) { + self.release_inner(); + } + + fn release_inner(&mut self) { + if !self.released { + self.resources + .active_connections + .fetch_sub(1, Ordering::Relaxed); + self.released = true; + } + } +} + +impl Drop for RelayConnectionPermit { + fn drop(&mut self) { + self.release_inner(); + } +} + +#[derive(Debug)] +pub struct RelaySubscriptionPermit { + resources: Arc<RelayResourceLimiterInner>, + count: usize, + released: bool, +} + +impl RelaySubscriptionPermit { + pub fn release(mut self) { + self.release_inner(); + } + + fn release_inner(&mut self) { + if !self.released { + self.resources + .active_subscriptions + .fetch_sub(self.count, Ordering::Relaxed); + self.released = true; + } + } +} + +impl Drop for RelaySubscriptionPermit { + fn drop(&mut self) { + self.release_inner(); + } +} + +fn increment_with_limit( + counter: &AtomicUsize, + amount: usize, + limit: usize, + message: &'static str, +) -> Result<(), BaseRelayError> { + let mut current = counter.load(Ordering::Relaxed); + loop { + let Some(next) = current.checked_add(amount) else { + return Err(BaseRelayError::restricted(message)); + }; + if next > limit { + return Err(BaseRelayError::restricted(message)); + } + match counter.compare_exchange(current, next, Ordering::Relaxed, Ordering::Relaxed) { + Ok(_) => return Ok(()), + Err(actual) => current = actual, + } + } +} diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -48,7 +48,7 @@ use tangle_store_pocket::{ }; use tokio::sync::watch; -pub struct TenantRuntime { +pub struct RelayRuntime { config: BaseRelayRuntimeConfig, relay: BaseRelay, readiness: BaseRelayReadinessHandle, @@ -243,7 +243,7 @@ impl TangleQueryClassifier { } } -impl TenantRuntime { +impl RelayRuntime { pub fn open(config: BaseRelayRuntimeConfig) -> Result<Self, BaseRelayError> { let limits = TangleRuntimeLimits::from_config(&config)?; let relay = config.open_relay()?; @@ -319,7 +319,7 @@ impl TenantRuntime { } } -struct TenantRuntimeShared { +struct RelayRuntimeShared { config: Arc<BaseRelayRuntimeConfig>, store: PocketStoreHandle, groups: Option<GroupServiceHandle>, @@ -331,9 +331,9 @@ struct TenantRuntimeShared { shutdown: TangleShutdownSignal, } -impl TenantRuntimeShared { - fn from_runtime(runtime: TenantRuntime) -> Self { - let TenantRuntime { +impl RelayRuntimeShared { + fn from_runtime(runtime: RelayRuntime) -> Self { + let RelayRuntime { config, relay, readiness, @@ -790,14 +790,14 @@ impl TenantRuntimeShared { } #[derive(Clone)] -pub struct TenantRuntimeHandle { - inner: Arc<TenantRuntimeShared>, +pub struct RelayRuntimeHandle { + inner: Arc<RelayRuntimeShared>, } -impl TenantRuntimeHandle { - pub fn new(runtime: TenantRuntime) -> Self { +impl RelayRuntimeHandle { + pub fn new(runtime: RelayRuntime) -> Self { Self { - inner: Arc::new(TenantRuntimeShared::from_runtime(runtime)), + inner: Arc::new(RelayRuntimeShared::from_runtime(runtime)), } } @@ -1386,9 +1386,9 @@ fn pocket_filter_kinds(filters: &[PocketOwnedFilter]) -> Vec<Kind> { .collect() } -impl fmt::Debug for TenantRuntimeHandle { +impl fmt::Debug for RelayRuntimeHandle { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - formatter.write_str("TenantRuntimeHandle") + formatter.write_str("RelayRuntimeHandle") } } @@ -2088,9 +2088,9 @@ impl Default for TangleShutdownSignal { #[cfg(test)] mod tests { use super::{ - BROAD_QUERY_TIME_WINDOW_SECONDS, RuntimeClientMessage, TangleBroadQueryReason, - TangleClientRateLimitContext, TangleQueryClassification, TangleQueryClassifier, - TangleRuntimeLimits, TenantRuntime, TenantRuntimeHandle, + BROAD_QUERY_TIME_WINDOW_SECONDS, RelayRuntime, RelayRuntimeHandle, RuntimeClientMessage, + TangleBroadQueryReason, TangleClientRateLimitContext, TangleQueryClassification, + TangleQueryClassifier, TangleRuntimeLimits, }; use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}; use crate::event_bus::{TangleEventBus, TangleEventReceiveError, TangleEventReceiver}; @@ -2128,7 +2128,7 @@ mod tests { let _ = std::fs::remove_dir_all(&root); let config = runtime_config(&root, 8); - let mut runtime = TenantRuntime::open(config).expect("runtime"); + let mut runtime = RelayRuntime::open(config).expect("runtime"); let mut offsets = runtime.event_bus().subscribe(); let shutdown = runtime.shutdown_signal().subscribe(); @@ -2309,9 +2309,8 @@ mod tests { async fn runtime_publishes_stored_event_offsets_for_live_fanout() { let root = temp_root("runtime-offset-fanout"); let _ = std::fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"), - ); + let handle = + RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime")); let mut offsets = handle.subscribe_events().await; let mut auth = handle.auth_state().await.expect("auth"); let mut subscriptions = LiveSubscriptionSet::new(8, 64).expect("subscriptions"); @@ -2388,7 +2387,7 @@ mod tests { async fn runtime_rate_limits_event_pubkeys_before_storage() { let root = temp_root("runtime-event-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "limited") .expect("event"); let rule = runtime.config().rate_limits().event().per_pubkey(); @@ -2401,7 +2400,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -2428,7 +2427,7 @@ mod tests { async fn runtime_rate_limits_event_kinds_before_storage() { let root = temp_root("runtime-event-kind-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let event = tangle_v2_event(FixtureKey::Admin, 1_714_124_433, 1, Vec::new(), "limited") .expect("event"); let rule = runtime.config().rate_limits().event().per_kind(); @@ -2438,7 +2437,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -2464,7 +2463,7 @@ mod tests { async fn runtime_rate_limits_event_peer_ips_partition_peers_and_precede_identity_keys() { let root = temp_root("runtime-event-ip-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let rule = runtime.config().rate_limits().event().per_ip(); let saturated_peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 20)); let other_peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 21)); @@ -2483,7 +2482,7 @@ mod tests { let allowed_event = tangle_v2_event(FixtureKey::Owner, 1_714_124_435, 2, Vec::new(), "allowed") .expect("allowed event"); - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -2545,7 +2544,7 @@ mod tests { async fn runtime_rate_limits_auth_pubkeys_before_authentication() { let root = temp_root("runtime-auth-pubkey-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let auth_event = tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 120).expect("auth event"); let rule = runtime.config().rate_limits().auth().per_pubkey(); @@ -2558,7 +2557,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(120)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) .expect("challenge"); @@ -2587,7 +2586,7 @@ mod tests { async fn runtime_rate_limits_auth_peer_ips_before_authentication() { let root = temp_root("runtime-auth-ip-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let auth_event = tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 120).expect("auth event"); let rule = runtime.config().rate_limits().auth().per_ip(); @@ -2598,7 +2597,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(120)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) .expect("challenge"); @@ -2630,7 +2629,7 @@ mod tests { async fn runtime_rate_limits_auth_failures() { let root = temp_root("runtime-auth-failure-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let auth_event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 22_242, Vec::new(), "") .expect("auth event"); let key = @@ -2641,7 +2640,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -2668,7 +2667,7 @@ mod tests { async fn runtime_rate_limits_auth_failures_by_peer_ip() { let root = temp_root("runtime-auth-failure-ip-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let auth_event = tangle_v2_event(FixtureKey::Admin, 1_714_124_433, 22_242, Vec::new(), "") .expect("auth event"); let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 31)); @@ -2679,7 +2678,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -2709,7 +2708,7 @@ mod tests { async fn runtime_preserves_chorus_auth_failure_rate_limit_parity() { let root = temp_root("runtime-chorus-auth-rate-limit-parity"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let pubkey_event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 22_242, Vec::new(), "") .expect("pubkey auth event"); @@ -2735,7 +2734,7 @@ mod tests { UnixTimestamp::new(1_714_124_434), ); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -2786,7 +2785,7 @@ mod tests { async fn runtime_rate_limits_group_writes_by_pubkey() { let root = temp_root("runtime-group-pubkey-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let event = tangle_v2_event( FixtureKey::Member, 1_714_124_433, @@ -2805,7 +2804,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -2832,7 +2831,7 @@ mod tests { async fn runtime_rate_limits_group_writes_by_peer_ip() { let root = temp_root("runtime-group-ip-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let event = tangle_v2_event( FixtureKey::Member, 1_714_124_433, @@ -2849,7 +2848,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -2879,7 +2878,7 @@ mod tests { async fn runtime_rate_limits_group_writes_by_group_id() { let root = temp_root("runtime-group-write-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let group_id = GroupId::new("Farm").expect("group"); let event = tangle_v2_event( FixtureKey::Member, @@ -2896,7 +2895,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -2923,7 +2922,7 @@ mod tests { async fn runtime_rate_limits_group_writes_by_kind() { let root = temp_root("runtime-group-kind-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let event = tangle_v2_event( FixtureKey::Member, 1_714_124_433, @@ -2940,7 +2939,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -2966,7 +2965,7 @@ mod tests { async fn runtime_rate_limits_group_join_flows() { let root = temp_root("runtime-group-join-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let group_id = GroupId::new("Farm").expect("group"); let event = tangle_v2_event( FixtureKey::Member, @@ -2983,7 +2982,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -3009,7 +3008,7 @@ mod tests { async fn runtime_rate_limits_group_join_flows_by_peer_ip() { let root = temp_root("runtime-group-join-ip-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let group_id = GroupId::new("Farm").expect("group"); let event = tangle_v2_event( FixtureKey::Member, @@ -3027,7 +3026,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); assert_eq!( @@ -3058,9 +3057,9 @@ mod tests { async fn runtime_rate_limits_req_authenticated_pubkeys() { let root = temp_root("runtime-req-pubkey-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let rule = runtime.config().rate_limits().req().per_pubkey(); - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) .expect("challenge"); @@ -3116,7 +3115,7 @@ mod tests { async fn runtime_rate_limits_req_connections() { let root = temp_root("runtime-req-connection-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let rule = runtime.config().rate_limits().req().per_connection(); let key = TangleRateLimitKey::connection(TangleRateLimitScope::Req, 77); for _ in 0..rule.max_hits() { @@ -3124,7 +3123,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); let subscription_id = SubscriptionId::new("limited-req-connection").expect("subscription"); let filters = vec![filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter")]; @@ -3156,7 +3155,7 @@ mod tests { async fn runtime_rate_limits_req_filter_groups() { let root = temp_root("runtime-req-group-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let group_id = GroupId::new("Farm").expect("group"); let rule = runtime.config().rate_limits().req().per_group(); let key = TangleRateLimitKey::group(TangleRateLimitScope::Req, group_id); @@ -3165,7 +3164,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); let subscription_id = SubscriptionId::new("limited-req-group").expect("subscription"); let filters = @@ -3256,9 +3255,8 @@ mod tests { async fn runtime_count_hll_accepts_public_pocket_selector() { let root = temp_root("runtime-count-hll"); let _ = std::fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"), - ); + let handle = + RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime")); let mut auth = handle.auth_state().await.expect("auth"); let target = "c".repeat(64); let tags = PocketOwnedTags::new(&[["e", target.as_str()]]).expect("tags"); @@ -3333,7 +3331,7 @@ mod tests { async fn runtime_rate_limits_count_peer_ips() { let root = temp_root("runtime-count-ip-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let rule = runtime.config().rate_limits().count().per_ip(); let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 9)); let key = TangleRateLimitKey::ip(TangleRateLimitScope::Count, peer_ip); @@ -3342,7 +3340,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); let subscription_id = SubscriptionId::new("limited-count-ip").expect("subscription"); let filters = vec![ @@ -3375,9 +3373,8 @@ mod tests { async fn runtime_rejects_search_req_and_count_as_unsupported() { let root = temp_root("runtime-search-unsupported"); let _ = std::fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"), - ); + let handle = + RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime")); let mut auth = handle.auth_state().await.expect("auth"); let req_id = SubscriptionId::new("search-req").expect("req"); let count_id = SubscriptionId::new("search-count").expect("count"); @@ -3426,7 +3423,7 @@ mod tests { async fn runtime_rate_limits_count_filter_kinds() { let root = temp_root("runtime-count-kind-rate-limit"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let kind = Kind::new(1).expect("kind"); let rule = runtime.config().rate_limits().count().per_kind(); let key = TangleRateLimitKey::kind(TangleRateLimitScope::Count, kind); @@ -3435,7 +3432,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); let subscription_id = SubscriptionId::new("limited-count-kind").expect("subscription"); let filters = vec![ @@ -3467,7 +3464,7 @@ mod tests { async fn runtime_refuses_broad_count_queries_before_rate_limits() { let root = temp_root("runtime-count-broad-refusal"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); let rule = runtime.config().rate_limits().count().broad(); let key = TangleRateLimitKey::query_class( TangleRateLimitScope::Count, @@ -3478,7 +3475,7 @@ mod tests { .rate_limiter() .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); } - let handle = TenantRuntimeHandle::new(runtime); + let handle = RelayRuntimeHandle::new(runtime); let mut auth = handle.auth_state().await.expect("auth"); let subscription_id = SubscriptionId::new("limited-count-broad").expect("subscription"); let filters = vec![filter_from_value(&json!({"limit": 1})).expect("filter")]; @@ -3510,9 +3507,8 @@ mod tests { async fn runtime_refuses_expensive_count_queries_deterministically() { let root = temp_root("runtime-count-expensive-refusal"); let _ = std::fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"), - ); + let handle = + RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime")); let mut auth = handle.auth_state().await.expect("auth"); let cases = [ ("missing-selector", json!({"kinds": [1], "limit": 1})), @@ -3563,9 +3559,8 @@ mod tests { async fn runtime_publishes_generated_group_event_offsets_for_live_fanout() { let root = temp_root("runtime-generated-offset-fanout"); let _ = std::fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config(&root, 8)).expect("runtime"), - ); + let handle = + RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime")); let mut offsets = handle.subscribe_events().await; let mut auth = handle.auth_state().await.expect("auth"); auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) @@ -3684,8 +3679,8 @@ mod tests { async fn runtime_group_concurrency_duplicate_create_accepts_one_projection() { let root = temp_root("runtime-group-concurrency-duplicate-create"); let _ = std::fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config(&root, 32)).expect("runtime"), + let handle = RelayRuntimeHandle::new( + RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"), ); let mut offsets = handle.subscribe_events().await; let owner_auth = @@ -3757,8 +3752,8 @@ mod tests { async fn runtime_group_concurrency_duplicate_join_accepts_one_membership() { let root = temp_root("runtime-group-concurrency-duplicate-join"); let _ = std::fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config_with_public_join(&root, 32)).expect("runtime"), + let handle = RelayRuntimeHandle::new( + RelayRuntime::open(runtime_config_with_public_join(&root, 32)).expect("runtime"), ); let mut offsets = handle.subscribe_events().await; let mut owner_auth = @@ -3825,8 +3820,8 @@ mod tests { async fn runtime_group_concurrency_join_and_leave_match_rebuild() { let root = temp_root("runtime-group-concurrency-join-leave"); let _ = std::fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config_with_public_join(&root, 32)).expect("runtime"), + let handle = RelayRuntimeHandle::new( + RelayRuntime::open(runtime_config_with_public_join(&root, 32)).expect("runtime"), ); let mut owner_auth = authenticated_runtime_state( &handle, @@ -3918,8 +3913,8 @@ mod tests { async fn runtime_group_concurrency_delete_tombstone_blocks_normal_write() { let root = temp_root("runtime-group-concurrency-delete-write"); let _ = std::fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config(&root, 32)).expect("runtime"), + let handle = RelayRuntimeHandle::new( + RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"), ); let mut owner_auth = authenticated_runtime_state(&handle, FixtureKey::Owner, "owner-delete", 1_714_126_400) @@ -4003,8 +3998,8 @@ mod tests { async fn runtime_group_concurrency_membership_mutation_matches_rebuild() { let root = temp_root("runtime-group-concurrency-membership-mutation"); let _ = std::fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config(&root, 32)).expect("runtime"), + let handle = RelayRuntimeHandle::new( + RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"), ); let mut owner_auth = authenticated_runtime_state( &handle, @@ -4088,8 +4083,8 @@ mod tests { async fn runtime_shared_services_progress_under_concurrent_event_query_count_and_fanout() { let root = temp_root("runtime-shared-concurrency"); let _ = std::fs::remove_dir_all(&root); - let handle = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config(&root, 32)).expect("runtime"), + let handle = RelayRuntimeHandle::new( + RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"), ); let base_time = 1_714_126_000; let mut owner_auth = handle.auth_state().await.expect("owner auth"); @@ -4620,7 +4615,7 @@ mod tests { } async fn authenticated_runtime_state( - handle: &TenantRuntimeHandle, + handle: &RelayRuntimeHandle, key: FixtureKey, challenge: &str, now: u64, @@ -4650,7 +4645,7 @@ mod tests { } async fn runtime_event_reply( - handle: &TenantRuntimeHandle, + handle: &RelayRuntimeHandle, event: Event, auth: &mut BaseAuthState, now: u64, @@ -4669,7 +4664,7 @@ mod tests { } fn runtime_pocket_event_reply( - handle: &TenantRuntimeHandle, + handle: &RelayRuntimeHandle, event: &PocketEvent, auth: &mut BaseAuthState, ) -> RelayMessage { @@ -4681,7 +4676,7 @@ mod tests { } async fn runtime_group_count( - handle: &TenantRuntimeHandle, + handle: &RelayRuntimeHandle, subscription_id: &str, group_id: &str, kind: u32, @@ -4788,7 +4783,7 @@ mod tests { } fn assert_runtime_member_status( - handle: &TenantRuntimeHandle, + handle: &RelayRuntimeHandle, group_id: &str, pubkey: &PublicKeyHex, status: MemberStatus, @@ -4806,7 +4801,7 @@ mod tests { ); } - fn assert_live_projection_matches_rebuild(handle: &TenantRuntimeHandle, group_id: &str) { + fn assert_live_projection_matches_rebuild(handle: &RelayRuntimeHandle, group_id: &str) { let group_id = GroupId::new(group_id).expect("group"); let groups = handle.inner.groups.as_ref().expect("groups"); let live = groups.projection(); @@ -4833,7 +4828,7 @@ mod tests { ); } - fn rebuilt_projection(handle: &TenantRuntimeHandle) -> GroupProjection { + fn rebuilt_projection(handle: &RelayRuntimeHandle) -> GroupProjection { let groups = handle.inner.groups.as_ref().expect("groups"); let limits = groups.limits(); let events = handle diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs @@ -130,7 +130,7 @@ async fn tangle_root( }; let tenant_runtime = tenant.runtime().clone(); let session = match tenant_runtime.auth_state().await { - Ok(auth) => TangleWebSocketSession::new_with_peer_and_host_resources( + Ok(auth) => TangleWebSocketSession::new_with_peer_and_resources( tenant_runtime.limits(), state.runtime.shutdown_signal().subscribe(), tenant_runtime.clone(), diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -4,7 +4,6 @@ use crate::{ client_message::{RuntimeClientMessage, parse_runtime_client_message}, errors::BaseRelayError, event_bus::{TangleEventReceiveError, TangleEventReceiver}, - host::{TangleHostResourceLimiter, TangleHostSubscriptionPermit}, logging, relay::{ auth::{BaseAuthState, generate_auth_challenge}, @@ -12,9 +11,10 @@ use crate::{ live::{CloseResult, LiveSubscriptionSet}, outbound::RuntimeRelayMessage, }, + resource_limits::{RelayResourceLimiter, RelaySubscriptionPermit}, runtime::{ - TangleClientMessageMetricKind, TangleClientRateLimitContext, TangleRuntimeLimits, - TenantRuntimeHandle, + RelayRuntimeHandle, TangleClientMessageMetricKind, TangleClientRateLimitContext, + TangleRuntimeLimits, }, }; use axum::extract::ws::{CloseFrame, Message, Utf8Bytes, WebSocket}; @@ -36,12 +36,12 @@ pub struct TangleWebSocketSession { outbound: TangleOutboundSender, outbound_receiver: mpsc::Receiver<Message>, shutdown: watch::Receiver<bool>, - runtime: TenantRuntimeHandle, + runtime: RelayRuntimeHandle, limits: TangleRuntimeLimits, auth: BaseAuthState, subscriptions: LiveSubscriptionSet, - host_resources: Option<TangleHostResourceLimiter>, - host_subscription_permits: BTreeMap<SubscriptionId, TangleHostSubscriptionPermit>, + resource_limiter: Option<RelayResourceLimiter>, + subscription_permits: BTreeMap<SubscriptionId, RelaySubscriptionPermit>, events: TangleEventReceiver, } @@ -51,7 +51,7 @@ impl TangleWebSocketSession { pub fn new( limits: TangleRuntimeLimits, shutdown: watch::Receiver<bool>, - runtime: TenantRuntimeHandle, + runtime: RelayRuntimeHandle, auth: BaseAuthState, events: TangleEventReceiver, ) -> Result<Self, BaseRelayError> { @@ -61,24 +61,22 @@ impl TangleWebSocketSession { pub fn new_with_peer( limits: TangleRuntimeLimits, shutdown: watch::Receiver<bool>, - runtime: TenantRuntimeHandle, + runtime: RelayRuntimeHandle, auth: BaseAuthState, events: TangleEventReceiver, peer_ip: Option<IpAddr>, ) -> Result<Self, BaseRelayError> { - Self::new_with_peer_and_host_resources( - limits, shutdown, runtime, auth, events, peer_ip, None, - ) + Self::new_with_peer_and_resources(limits, shutdown, runtime, auth, events, peer_ip, None) } - pub fn new_with_peer_and_host_resources( + pub fn new_with_peer_and_resources( limits: TangleRuntimeLimits, shutdown: watch::Receiver<bool>, - runtime: TenantRuntimeHandle, + runtime: RelayRuntimeHandle, auth: BaseAuthState, events: TangleEventReceiver, peer_ip: Option<IpAddr>, - host_resources: Option<TangleHostResourceLimiter>, + resource_limiter: Option<RelayResourceLimiter>, ) -> Result<Self, BaseRelayError> { let outbound_queue_capacity = limits.outbound_queue_capacity(); let (sender, receiver) = mpsc::channel(outbound_queue_capacity); @@ -100,8 +98,8 @@ impl TangleWebSocketSession { limits, auth, subscriptions, - host_resources, - host_subscription_permits: BTreeMap::new(), + resource_limiter, + subscription_permits: BTreeMap::new(), events, }) } @@ -327,7 +325,7 @@ impl TangleWebSocketSession { .base_relay_limits() .validate_subscription_id(&subscription_id)?; if self.subscriptions.close(&subscription_id) == CloseResult::Closed { - self.host_subscription_permits.remove(&subscription_id); + self.subscription_permits.remove(&subscription_id); metrics.record_subscriptions_closed(1); } Ok(Vec::new()) @@ -399,7 +397,7 @@ impl TangleWebSocketSession { let host_permit = if already_subscribed { None } else { - self.host_resources + self.resource_limiter .as_ref() .map(|resources| resources.try_open_subscriptions(1)) .transpose()? @@ -407,7 +405,7 @@ impl TangleWebSocketSession { self.subscriptions .subscribe(subscription_id.clone(), filters)?; if let Some(permit) = host_permit { - self.host_subscription_permits + self.subscription_permits .insert(subscription_id.clone(), permit); } metrics.record_subscription_opened(); @@ -418,7 +416,7 @@ impl TangleWebSocketSession { fn close_all_subscriptions(&mut self) -> usize { let closed = self.subscriptions.close_all(); - self.host_subscription_permits.clear(); + self.subscription_permits.clear(); closed } @@ -613,7 +611,7 @@ mod tests { event_bus::TangleEventReceiver, rate_limits::{TangleRateLimitKey, TangleRateLimitScope}, relay::core::{BaseRelayLimitSettings, BaseRelayLimits}, - runtime::{TangleRuntimeLimits, TangleShutdownSignal, TenantRuntime, TenantRuntimeHandle}, + runtime::{RelayRuntime, RelayRuntimeHandle, TangleRuntimeLimits, TangleShutdownSignal}, }; use axum::extract::ws::Message; use serde_json::json; @@ -854,7 +852,7 @@ mod tests { let root = temp_root("connection-scope"); let _ = std::fs::remove_dir_all(&root); let runtime = - TenantRuntimeHandle::new(TenantRuntime::open(runtime_config(&root)).expect("runtime")); + RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root)).expect("runtime")); let metrics = runtime.metrics(); let auth_a = runtime.auth_state().await.expect("auth a"); let auth_b = runtime.auth_state().await.expect("auth b"); @@ -940,8 +938,8 @@ mod tests { let shutdown = TangleShutdownSignal::new(); let root = temp_root("current-auth-live"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config_with_groups(&root)).expect("runtime"), + let runtime = RelayRuntimeHandle::new( + RelayRuntime::open(runtime_config_with_groups(&root)).expect("runtime"), ); let mut owner_auth = runtime.auth_state().await.expect("owner auth"); owner_auth @@ -1093,7 +1091,7 @@ mod tests { let root = temp_root("complete-req-lifecycle"); let _ = std::fs::remove_dir_all(&root); let runtime = - TenantRuntimeHandle::new(TenantRuntime::open(runtime_config(&root)).expect("runtime")); + RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root)).expect("runtime")); let mut auth = runtime.auth_state().await.expect("auth"); let events = runtime.subscribe_events().await; let mut session = TangleWebSocketSession::new( @@ -1199,8 +1197,8 @@ mod tests { let shutdown = TangleShutdownSignal::new(); let root = temp_root("redacted-req-close"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config_with_groups(&root)).expect("runtime"), + let runtime = RelayRuntimeHandle::new( + RelayRuntime::open(runtime_config_with_groups(&root)).expect("runtime"), ); let mut owner_auth = runtime.auth_state().await.expect("owner auth"); owner_auth @@ -1319,7 +1317,7 @@ mod tests { let root = temp_root("chorus-close-scope-parity"); let _ = std::fs::remove_dir_all(&root); let runtime = - TenantRuntimeHandle::new(TenantRuntime::open(runtime_config(&root)).expect("runtime")); + RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root)).expect("runtime")); let metrics = runtime.metrics(); let auth_a = runtime.auth_state().await.expect("auth a"); let auth_b = runtime.auth_state().await.expect("auth b"); @@ -1435,9 +1433,9 @@ mod tests { let shutdown = TangleShutdownSignal::new(); let root = temp_root("rate-limited-req"); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root)).expect("runtime"); let rule = runtime.config().rate_limits().req().per_connection(); - let runtime = TenantRuntimeHandle::new(runtime); + let runtime = RelayRuntimeHandle::new(runtime); let auth = runtime.auth_state().await.expect("auth"); let events = runtime.subscribe_events().await; let now = current_unix_timestamp(); @@ -1490,12 +1488,12 @@ mod tests { let root = temp_root("event-receiver-lag"); let _ = std::fs::remove_dir_all(&root); let runtime = - TenantRuntime::open(runtime_config_with_outbound_queue(&root, 1)).expect("runtime"); + RelayRuntime::open(runtime_config_with_outbound_queue(&root, 1)).expect("runtime"); let auth = runtime.auth_state().expect("auth"); let events = runtime.event_bus().subscribe(); assert_eq!(runtime.event_bus().publish(StoreOffset::new(1)), 1); assert_eq!(runtime.event_bus().publish(StoreOffset::new(2)), 1); - let runtime = TenantRuntimeHandle::new(runtime); + let runtime = RelayRuntimeHandle::new(runtime); let metrics = runtime.metrics(); let mut session = TangleWebSocketSession::new( session_limits(1), @@ -1522,9 +1520,8 @@ mod tests { let shutdown = TangleShutdownSignal::new(); let live_root = temp_root("chorus-live-fanout-parity"); let _ = std::fs::remove_dir_all(&live_root); - let runtime = TenantRuntimeHandle::new( - TenantRuntime::open(runtime_config_with_outbound_queue(&live_root, 1)) - .expect("runtime"), + let runtime = RelayRuntimeHandle::new( + RelayRuntime::open(runtime_config_with_outbound_queue(&live_root, 1)).expect("runtime"), ); let metrics = runtime.metrics(); let auth = runtime.auth_state().await.expect("auth"); @@ -1595,12 +1592,12 @@ mod tests { let lag_root = temp_root("chorus-live-lag-parity"); let _ = std::fs::remove_dir_all(&lag_root); let runtime = - TenantRuntime::open(runtime_config_with_outbound_queue(&lag_root, 1)).expect("runtime"); + RelayRuntime::open(runtime_config_with_outbound_queue(&lag_root, 1)).expect("runtime"); let auth = runtime.auth_state().expect("auth"); let events = runtime.event_bus().subscribe(); assert_eq!(runtime.event_bus().publish(StoreOffset::new(1)), 1); assert_eq!(runtime.event_bus().publish(StoreOffset::new(2)), 1); - let runtime = TenantRuntimeHandle::new(runtime); + let runtime = RelayRuntimeHandle::new(runtime); let metrics = runtime.metrics(); let mut lagged = TangleWebSocketSession::new( session_limits(1), @@ -1823,16 +1820,16 @@ mod tests { fn session_runtime( name: &str, ) -> ( - TenantRuntimeHandle, + RelayRuntimeHandle, crate::relay::auth::BaseAuthState, TangleEventReceiver, ) { let root = temp_root(name); let _ = std::fs::remove_dir_all(&root); - let runtime = TenantRuntime::open(runtime_config(&root)).expect("runtime"); + let runtime = RelayRuntime::open(runtime_config(&root)).expect("runtime"); let auth = runtime.auth_state().expect("auth"); let events = runtime.event_bus().subscribe(); - (TenantRuntimeHandle::new(runtime), auth, events) + (RelayRuntimeHandle::new(runtime), auth, events) } fn req(subscription_id: SubscriptionId) -> ClientMessage { diff --git a/crates/tangle_runtime/tests/ops_truthfulness.rs b/crates/tangle_runtime/tests/ops_truthfulness.rs @@ -15,7 +15,7 @@ use tangle_runtime::{ ops::BaseRelayReadinessCheckStatus, rate_limits::{TangleRateLimitKey, TangleRateLimitScope, TangleRateLimiter}, relay::{auth::BaseAuthState, core::BaseRelay}, - runtime::TenantRuntime, + runtime::RelayRuntime, }; use tangle_store_pocket::parse_pocket_event_json; use tangle_store_pocket::{PocketEvent, PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketTime}; @@ -210,7 +210,7 @@ fn operations_surfaces_match_enforced_runtime_contracts() { .is_allowed() ); - let runtime = TenantRuntime::open(config.clone()).expect("runtime"); + let runtime = RelayRuntime::open(config.clone()).expect("runtime"); let pre_bind = runtime.readiness_state().response(); assert_eq!(pre_bind.status, "not_ready"); assert_eq!(pre_bind.checks.server_bind, "not_ready"); diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -32,7 +32,7 @@ use tangle_runtime::{ host::TangleHostRuntime, nip11::BaseRelayInfoConfig, relay::{auth::BaseAuthState, core::BaseRelay}, - runtime::TenantRuntime, + runtime::RelayRuntime, server::serve_listener_until_shutdown, }; use tangle_store_pocket::{ @@ -1984,14 +1984,14 @@ fn runtime_live_fanout_offset_lookup_does_not_lock_relay_state() { fn runtime_shared_shell_does_not_keep_transitional_base_relay_mutex() { let runtime = include_str!("../src/runtime.rs"); let shared_shell = runtime - .split("struct TenantRuntimeShared {") + .split("struct RelayRuntimeShared {") .nth(1) .expect("shared shell") - .split("impl TenantRuntimeShared") + .split("impl RelayRuntimeShared") .next() .expect("shared shell fields"); let handle_impl = runtime - .split("impl TenantRuntimeHandle") + .split("impl RelayRuntimeHandle") .nth(1) .expect("runtime handle") .split("fn auth_response_failed") @@ -2059,7 +2059,7 @@ fn projection_and_outbox_recover_from_canonical_pocket_events() { .expect("note"); { - let mut runtime = TenantRuntime::open(config.clone()).expect("runtime"); + let mut runtime = RelayRuntime::open(config.clone()).expect("runtime"); assert_relay_ok( runtime .relay_mut() @@ -2105,7 +2105,7 @@ fn projection_and_outbox_recover_from_canonical_pocket_events() { delete_group_extra_records(config.pocket_config()); - let recovered = TenantRuntime::open(config.clone()).expect("recovered"); + let recovered = RelayRuntime::open(config.clone()).expect("recovered"); let readiness = recovered.readiness_state().response(); assert_eq!(readiness.checks.group_projection, "ready"); assert_eq!(readiness.checks.group_outbox_replay, "ready");