tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

host.rs (29188B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::{
      4     config::{TangleHostRuntimeConfigSet, TenantRuntimeConfig},
      5     errors::BaseRelayError,
      6     ops::BaseRelayReadinessCheckStatus,
      7     resource_limits::RelayResourceLimiter,
      8     runtime::{RelayRuntime, RelayRuntimeHandle, TangleShutdownSignal},
      9     tenant::{CanonicalHost, TenantId, TenantRelayUrl, TenantSchema},
     10 };
     11 use http::{HeaderMap, header};
     12 use std::collections::BTreeMap;
     13 use std::net::SocketAddr;
     14 
     15 #[derive(Debug, Clone)]
     16 pub struct TangleHostRuntime {
     17     config: TangleHostRuntimeConfigSet,
     18     registry: TenantRegistry,
     19     resources: RelayResourceLimiter,
     20     shutdown: TangleShutdownSignal,
     21 }
     22 
     23 impl TangleHostRuntime {
     24     pub fn open(config: TangleHostRuntimeConfigSet) -> Result<Self, BaseRelayError> {
     25         let limits = config.host().limits();
     26         let resources = RelayResourceLimiter::new(
     27             limits.max_total_connections(),
     28             limits.max_total_subscriptions(),
     29         );
     30         let registry = TenantRegistry::open(&config)?;
     31         Ok(Self {
     32             config,
     33             registry,
     34             resources,
     35             shutdown: TangleShutdownSignal::new(),
     36         })
     37     }
     38 
     39     pub fn config(&self) -> &TangleHostRuntimeConfigSet {
     40         &self.config
     41     }
     42 
     43     pub fn registry(&self) -> &TenantRegistry {
     44         &self.registry
     45     }
     46 
     47     pub fn resources(&self) -> RelayResourceLimiter {
     48         self.resources.clone()
     49     }
     50 
     51     pub fn shutdown_signal(&self) -> &TangleShutdownSignal {
     52         &self.shutdown
     53     }
     54 
     55     pub fn readiness_state(&self) -> TangleHostReadinessState {
     56         TangleHostReadinessState::new(
     57             BaseRelayReadinessCheckStatus::Ready,
     58             registry_ready(self.registry.active_tenant_count()),
     59             active_tenants_ready(&self.registry),
     60             self.shutdown.requested(),
     61         )
     62     }
     63 
     64     pub fn metrics_snapshot(&self) -> TangleHostMetricsSnapshot {
     65         TangleHostMetricsSnapshot::new(
     66             self.config.tenants().len(),
     67             self.registry.active_tenant_count(),
     68             self.config
     69                 .tenants()
     70                 .iter()
     71                 .filter(|tenant| tenant.inactive())
     72                 .count(),
     73             self.resources.active_connections(),
     74             self.resources.active_subscriptions(),
     75             self.config.host().limits().max_total_connections(),
     76             self.config.host().limits().max_total_subscriptions(),
     77         )
     78     }
     79 
     80     pub fn tenant_for_request(
     81         &self,
     82         headers: &HeaderMap,
     83         peer_addr: SocketAddr,
     84     ) -> Result<&TenantRuntimeEntry, HostResolutionError> {
     85         let host = resolve_request_host(headers, peer_addr, self.config.host().trusted_proxy())?;
     86         self.registry
     87             .tenant_by_host(&host)
     88             .ok_or(HostResolutionError::Unknown)
     89     }
     90 
     91     pub fn tenant_inventory(&self) -> Vec<TangleHostTenantInventoryItem> {
     92         let mut items = Vec::with_capacity(self.config.tenants().len());
     93         for tenant in self.config.tenants() {
     94             let runtime = self.registry.tenant_by_id(tenant.tenant_id());
     95             items.push(TangleHostTenantInventoryItem::new(
     96                 tenant.tenant_id().clone(),
     97                 tenant.tenant_schema().clone(),
     98                 tenant.host().clone(),
     99                 tenant.relay_url().clone(),
    100                 runtime.is_some(),
    101                 runtime.map(|entry| entry.runtime().readiness_handle().snapshot().is_ready()),
    102             ));
    103         }
    104         items
    105     }
    106 
    107     pub async fn shutdown(&self) -> Result<TangleHostShutdownReport, BaseRelayError> {
    108         self.shutdown.request_shutdown();
    109         let mut closed_subscriptions = 0;
    110         for tenant in self.registry.active_tenants() {
    111             closed_subscriptions += tenant.runtime().shutdown().await?.closed_subscriptions();
    112         }
    113         Ok(TangleHostShutdownReport::new(
    114             self.registry.active_tenant_count(),
    115             closed_subscriptions,
    116         ))
    117     }
    118 }
    119 
    120 #[derive(Debug, Clone, PartialEq, Eq)]
    121 pub struct TangleHostTenantInventoryItem {
    122     tenant_id: TenantId,
    123     tenant_schema: TenantSchema,
    124     host: CanonicalHost,
    125     relay_url: TenantRelayUrl,
    126     active: bool,
    127     ready: Option<bool>,
    128 }
    129 
    130 impl TangleHostTenantInventoryItem {
    131     pub fn new(
    132         tenant_id: TenantId,
    133         tenant_schema: TenantSchema,
    134         host: CanonicalHost,
    135         relay_url: TenantRelayUrl,
    136         active: bool,
    137         ready: Option<bool>,
    138     ) -> Self {
    139         Self {
    140             tenant_id,
    141             tenant_schema,
    142             host,
    143             relay_url,
    144             active,
    145             ready,
    146         }
    147     }
    148 
    149     pub fn tenant_id(&self) -> &TenantId {
    150         &self.tenant_id
    151     }
    152 
    153     pub fn tenant_schema(&self) -> &TenantSchema {
    154         &self.tenant_schema
    155     }
    156 
    157     pub fn host(&self) -> &CanonicalHost {
    158         &self.host
    159     }
    160 
    161     pub fn relay_url(&self) -> &TenantRelayUrl {
    162         &self.relay_url
    163     }
    164 
    165     pub fn active(&self) -> bool {
    166         self.active
    167     }
    168 
    169     pub fn ready(&self) -> bool {
    170         self.ready.unwrap_or(false)
    171     }
    172 }
    173 
    174 #[derive(Debug, Clone, PartialEq, Eq)]
    175 pub struct TangleHostReadinessState {
    176     config: BaseRelayReadinessCheckStatus,
    177     tenant_registry: BaseRelayReadinessCheckStatus,
    178     active_tenants: BaseRelayReadinessCheckStatus,
    179     shutdown_requested: bool,
    180 }
    181 
    182 impl TangleHostReadinessState {
    183     pub fn new(
    184         config: BaseRelayReadinessCheckStatus,
    185         tenant_registry: BaseRelayReadinessCheckStatus,
    186         active_tenants: BaseRelayReadinessCheckStatus,
    187         shutdown_requested: bool,
    188     ) -> Self {
    189         Self {
    190             config,
    191             tenant_registry,
    192             active_tenants,
    193             shutdown_requested,
    194         }
    195     }
    196 
    197     pub fn is_ready(&self) -> bool {
    198         !self.shutdown_requested
    199             && self.config.is_ready()
    200             && self.tenant_registry.is_ready()
    201             && self.active_tenants.is_ready()
    202     }
    203 
    204     pub fn config(&self) -> BaseRelayReadinessCheckStatus {
    205         self.config
    206     }
    207 
    208     pub fn tenant_registry(&self) -> BaseRelayReadinessCheckStatus {
    209         self.tenant_registry
    210     }
    211 
    212     pub fn active_tenants(&self) -> BaseRelayReadinessCheckStatus {
    213         self.active_tenants
    214     }
    215 
    216     pub fn shutdown_requested(&self) -> bool {
    217         self.shutdown_requested
    218     }
    219 }
    220 
    221 #[derive(Debug, Clone, PartialEq, Eq)]
    222 pub struct TangleHostMetricsSnapshot {
    223     configured_tenants: usize,
    224     active_tenants: usize,
    225     inactive_tenants: usize,
    226     active_connections: usize,
    227     active_subscriptions: usize,
    228     max_total_connections: usize,
    229     max_total_subscriptions: usize,
    230 }
    231 
    232 impl TangleHostMetricsSnapshot {
    233     pub fn new(
    234         configured_tenants: usize,
    235         active_tenants: usize,
    236         inactive_tenants: usize,
    237         active_connections: usize,
    238         active_subscriptions: usize,
    239         max_total_connections: usize,
    240         max_total_subscriptions: usize,
    241     ) -> Self {
    242         Self {
    243             configured_tenants,
    244             active_tenants,
    245             inactive_tenants,
    246             active_connections,
    247             active_subscriptions,
    248             max_total_connections,
    249             max_total_subscriptions,
    250         }
    251     }
    252 
    253     pub fn configured_tenants(&self) -> usize {
    254         self.configured_tenants
    255     }
    256 
    257     pub fn active_tenants(&self) -> usize {
    258         self.active_tenants
    259     }
    260 
    261     pub fn inactive_tenants(&self) -> usize {
    262         self.inactive_tenants
    263     }
    264 
    265     pub fn active_connections(&self) -> usize {
    266         self.active_connections
    267     }
    268 
    269     pub fn active_subscriptions(&self) -> usize {
    270         self.active_subscriptions
    271     }
    272 
    273     pub fn max_total_connections(&self) -> usize {
    274         self.max_total_connections
    275     }
    276 
    277     pub fn max_total_subscriptions(&self) -> usize {
    278         self.max_total_subscriptions
    279     }
    280 }
    281 
    282 #[derive(Debug, Clone, PartialEq, Eq)]
    283 pub struct TangleHostShutdownReport {
    284     tenants_shutdown: usize,
    285     closed_subscriptions: usize,
    286 }
    287 
    288 impl TangleHostShutdownReport {
    289     pub fn new(tenants_shutdown: usize, closed_subscriptions: usize) -> Self {
    290         Self {
    291             tenants_shutdown,
    292             closed_subscriptions,
    293         }
    294     }
    295 
    296     pub fn tenants_shutdown(&self) -> usize {
    297         self.tenants_shutdown
    298     }
    299 
    300     pub fn closed_subscriptions(&self) -> usize {
    301         self.closed_subscriptions
    302     }
    303 }
    304 
    305 #[derive(Debug, Clone)]
    306 pub struct TenantRegistry {
    307     tenants_by_host: BTreeMap<CanonicalHost, TenantRuntimeEntry>,
    308     host_by_tenant_id: BTreeMap<TenantId, CanonicalHost>,
    309 }
    310 
    311 impl TenantRegistry {
    312     pub fn open(config: &TangleHostRuntimeConfigSet) -> Result<Self, BaseRelayError> {
    313         let mut entries = Vec::new();
    314         let concurrency = config.host().limits().tenant_startup_concurrency();
    315         let active_tenants = config.active_tenants().cloned().collect::<Vec<_>>();
    316         for chunk in active_tenants.chunks(concurrency) {
    317             for tenant in chunk {
    318                 entries.push(open_tenant_runtime(config, tenant)?);
    319             }
    320         }
    321         Self::new(entries)
    322     }
    323 
    324     pub fn new(entries: Vec<TenantRuntimeEntry>) -> Result<Self, BaseRelayError> {
    325         let mut tenants_by_host = BTreeMap::new();
    326         let mut host_by_tenant_id = BTreeMap::new();
    327         for entry in entries {
    328             if tenants_by_host.contains_key(entry.host()) {
    329                 return Err(BaseRelayError::invalid(format!(
    330                     "duplicate active tenant host: {}",
    331                     entry.host()
    332                 )));
    333             }
    334             if host_by_tenant_id
    335                 .insert(entry.tenant_id().clone(), entry.host().clone())
    336                 .is_some()
    337             {
    338                 return Err(BaseRelayError::invalid(format!(
    339                     "duplicate active tenant id: {}",
    340                     entry.tenant_id()
    341                 )));
    342             }
    343             tenants_by_host.insert(entry.host().clone(), entry);
    344         }
    345         Ok(Self {
    346             tenants_by_host,
    347             host_by_tenant_id,
    348         })
    349     }
    350 
    351     pub fn active_tenant_count(&self) -> usize {
    352         self.tenants_by_host.len()
    353     }
    354 
    355     pub fn active_tenants(&self) -> impl Iterator<Item = &TenantRuntimeEntry> {
    356         self.tenants_by_host.values()
    357     }
    358 
    359     pub fn tenant_by_host(&self, host: &CanonicalHost) -> Option<&TenantRuntimeEntry> {
    360         self.tenants_by_host.get(host)
    361     }
    362 
    363     pub fn tenant_by_id(&self, tenant_id: &TenantId) -> Option<&TenantRuntimeEntry> {
    364         self.host_by_tenant_id
    365             .get(tenant_id)
    366             .and_then(|host| self.tenants_by_host.get(host))
    367     }
    368 }
    369 
    370 #[derive(Clone)]
    371 pub struct TenantRuntimeEntry {
    372     config: TenantRuntimeConfig,
    373     runtime: RelayRuntimeHandle,
    374 }
    375 
    376 impl TenantRuntimeEntry {
    377     pub fn new(config: TenantRuntimeConfig, runtime: RelayRuntimeHandle) -> Self {
    378         Self { config, runtime }
    379     }
    380 
    381     pub fn config(&self) -> &TenantRuntimeConfig {
    382         &self.config
    383     }
    384 
    385     pub fn tenant_id(&self) -> &TenantId {
    386         self.config.tenant_id()
    387     }
    388 
    389     pub fn tenant_schema(&self) -> &TenantSchema {
    390         self.config.tenant_schema()
    391     }
    392 
    393     pub fn host(&self) -> &CanonicalHost {
    394         self.config.host()
    395     }
    396 
    397     pub fn relay_url(&self) -> &TenantRelayUrl {
    398         self.config.relay_url()
    399     }
    400 
    401     pub fn runtime(&self) -> &RelayRuntimeHandle {
    402         &self.runtime
    403     }
    404 }
    405 
    406 impl std::fmt::Debug for TenantRuntimeEntry {
    407     fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    408         formatter
    409             .debug_struct("TenantRuntimeEntry")
    410             .field("tenant_id", self.tenant_id())
    411             .field("tenant_schema", self.tenant_schema())
    412             .field("host", self.host())
    413             .field("relay_url", self.relay_url())
    414             .finish_non_exhaustive()
    415     }
    416 }
    417 
/// Reasons a request could not be mapped to an active tenant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HostResolutionError {
    /// No usable host value was present in the request headers, or the first
    /// comma-separated entry of that value was empty.
    Missing,
    /// A host value was present but was rejected by `CanonicalHost::new`.
    Invalid,
    /// The host was well-formed but no active tenant is registered for it.
    Unknown,
}
    424 
    425 fn open_tenant_runtime(
    426     config: &TangleHostRuntimeConfigSet,
    427     tenant: &TenantRuntimeConfig,
    428 ) -> Result<TenantRuntimeEntry, BaseRelayError> {
    429     let runtime_config = tenant
    430         .to_base_relay_runtime_config(config.host().listen_addr(), config.host().tracing().clone());
    431     let runtime = RelayRuntime::open(runtime_config).map_err(|error| {
    432         BaseRelayError::error(format!(
    433             "failed to open active tenant `{}`: {}",
    434             tenant.tenant_id(),
    435             error.prefixed_message()
    436         ))
    437     })?;
    438     Ok(TenantRuntimeEntry::new(
    439         tenant.clone(),
    440         RelayRuntimeHandle::new(runtime),
    441     ))
    442 }
    443 
    444 fn registry_ready(active_tenant_count: usize) -> BaseRelayReadinessCheckStatus {
    445     if active_tenant_count == 0 {
    446         BaseRelayReadinessCheckStatus::NotReady
    447     } else {
    448         BaseRelayReadinessCheckStatus::Ready
    449     }
    450 }
    451 
    452 fn active_tenants_ready(registry: &TenantRegistry) -> BaseRelayReadinessCheckStatus {
    453     if registry
    454         .active_tenants()
    455         .all(|tenant| tenant.runtime().readiness_handle().snapshot().is_ready())
    456     {
    457         BaseRelayReadinessCheckStatus::Ready
    458     } else {
    459         BaseRelayReadinessCheckStatus::NotReady
    460     }
    461 }
    462 
    463 fn resolve_request_host(
    464     headers: &HeaderMap,
    465     peer_addr: SocketAddr,
    466     trusted_proxy: &crate::config::TangleTrustedProxyConfig,
    467 ) -> Result<CanonicalHost, HostResolutionError> {
    468     let forwarded_host = trusted_proxy_peer_enabled(trusted_proxy, peer_addr)
    469         .then(|| forwarded_host_header(headers))
    470         .flatten();
    471     let host = forwarded_host
    472         .or_else(|| {
    473             headers
    474                 .get(header::HOST)
    475                 .and_then(|value| value.to_str().ok())
    476         })
    477         .ok_or(HostResolutionError::Missing)?;
    478     let host = host
    479         .split(',')
    480         .next()
    481         .map(str::trim)
    482         .filter(|host| !host.is_empty())
    483         .ok_or(HostResolutionError::Missing)?;
    484     CanonicalHost::new(host).map_err(|_| HostResolutionError::Invalid)
    485 }
    486 
    487 fn trusted_proxy_peer_enabled(
    488     trusted_proxy: &crate::config::TangleTrustedProxyConfig,
    489     peer_addr: SocketAddr,
    490 ) -> bool {
    491     trusted_proxy.enabled()
    492         && trusted_proxy
    493             .trusted_peers()
    494             .iter()
    495             .any(|peer| peer == &peer_addr.ip().to_string() || peer == &peer_addr.to_string())
    496 }
    497 
    498 fn forwarded_host_header(headers: &HeaderMap) -> Option<&str> {
    499     headers
    500         .get("x-forwarded-host")
    501         .and_then(|value| value.to_str().ok())
    502         .or_else(|| {
    503             headers
    504                 .get("forwarded")
    505                 .and_then(|value| value.to_str().ok())
    506                 .and_then(forwarded_host_value)
    507         })
    508 }
    509 
    510 fn forwarded_host_value(value: &str) -> Option<&str> {
    511     value.split(';').find_map(|part| {
    512         let (name, value) = part.trim().split_once('=')?;
    513         name.eq_ignore_ascii_case("host")
    514             .then(|| value.trim_matches('"'))
    515     })
    516 }
    517 
    518 #[cfg(test)]
    519 mod tests {
    520     use super::TangleHostRuntime;
    521     use crate::{
    522         config::{
    523             TangleHostRuntimeConfigSet, parse_tangle_host_runtime_config_json,
    524             parse_tenant_runtime_config_json,
    525         },
    526         ops::BaseRelayReadinessCheckStatus,
    527         resource_limits::RelayResourceLimiter,
    528         tenant::{CanonicalHost, TenantId},
    529     };
    530     use serde_json::json;
    531     use std::{
    532         fs,
    533         path::{Path, PathBuf},
    534         time::{SystemTime, UNIX_EPOCH},
    535     };
    536     use tangle_test_support::FixtureKey;
    537 
    538     #[test]
    539     fn host_runtime_opens_two_active_tenants_with_distinct_serving_maps() {
    540         let root = temp_root("two-active-tenants");
    541         let _ = fs::remove_dir_all(&root);
    542         let config = config_set(
    543             &root,
    544             vec![
    545                 tenant_config(&root, "alpha", "alpha_schema", "alpha.relay.test", false, 9),
    546                 tenant_config(&root, "beta", "beta_schema", "beta.relay.test", false, 10),
    547             ],
    548         );
    549 
    550         let host = TangleHostRuntime::open(config).expect("host");
    551         let alpha_host = CanonicalHost::new("alpha.relay.test").expect("alpha host");
    552         let beta_id = TenantId::new("beta").expect("beta id");
    553 
    554         assert_eq!(host.registry().active_tenant_count(), 2);
    555         assert_eq!(
    556             host.registry()
    557                 .tenant_by_host(&alpha_host)
    558                 .expect("alpha")
    559                 .tenant_id()
    560                 .as_str(),
    561             "alpha"
    562         );
    563         assert_eq!(
    564             host.registry()
    565                 .tenant_by_id(&beta_id)
    566                 .expect("beta")
    567                 .host()
    568                 .as_str(),
    569             "beta.relay.test"
    570         );
    571         assert!(root.join("alpha/pocket").exists());
    572         assert!(root.join("beta/pocket").exists());
    573         assert_eq!(host.metrics_snapshot().configured_tenants(), 2);
    574         assert_eq!(host.metrics_snapshot().active_tenants(), 2);
    575         assert_eq!(host.metrics_snapshot().inactive_tenants(), 0);
    576 
    577         let _ = fs::remove_dir_all(root);
    578     }
    579 
    580     #[test]
    581     fn inactive_tenants_validate_without_serving_or_storage() {
    582         let root = temp_root("inactive-tenant");
    583         let _ = fs::remove_dir_all(&root);
    584         let config = config_set(
    585             &root,
    586             vec![
    587                 tenant_config(
    588                     &root,
    589                     "active",
    590                     "active_schema",
    591                     "active.relay.test",
    592                     false,
    593                     9,
    594                 ),
    595                 tenant_config(
    596                     &root,
    597                     "inactive",
    598                     "inactive_schema",
    599                     "inactive.relay.test",
    600                     true,
    601                     10,
    602                 ),
    603             ],
    604         );
    605 
    606         let host = TangleHostRuntime::open(config).expect("host");
    607         let inactive_host = CanonicalHost::new("inactive.relay.test").expect("inactive host");
    608         let inactive_id = TenantId::new("inactive").expect("inactive id");
    609         let active_id = TenantId::new("active").expect("active id");
    610 
    611         assert_eq!(host.registry().active_tenant_count(), 1);
    612         assert!(host.registry().tenant_by_host(&inactive_host).is_none());
    613         assert!(host.registry().tenant_by_id(&inactive_id).is_none());
    614         assert!(host.registry().tenant_by_id(&active_id).is_some());
    615         assert!(!root.join("inactive/pocket").exists());
    616         assert_eq!(host.metrics_snapshot().configured_tenants(), 2);
    617         assert_eq!(host.metrics_snapshot().inactive_tenants(), 1);
    618 
    619         let _ = fs::remove_dir_all(root);
    620     }
    621 
    622     #[test]
    623     fn active_tenant_open_failure_fails_host_startup() {
    624         let root = temp_root("active-open-failure");
    625         let _ = fs::remove_dir_all(&root);
    626         fs::create_dir_all(&root).expect("root");
    627         fs::write(root.join("broken-pocket"), "not a directory").expect("file");
    628         let config = config_set(
    629             &root,
    630             vec![tenant_config_with_pocket_path(
    631                 &root,
    632                 "broken",
    633                 "broken_schema",
    634                 "broken.relay.test",
    635                 false,
    636                 9,
    637                 root.join("broken-pocket"),
    638             )],
    639         );
    640 
    641         let error = TangleHostRuntime::open(config).expect_err("open failure");
    642         assert!(error.message().contains("failed to open active tenant"));
    643         assert!(error.message().contains("broken"));
    644 
    645         let _ = fs::remove_dir_all(root);
    646     }
    647 
    648     #[test]
    649     fn host_readiness_requires_all_active_tenants_and_ignores_inactive_tenants() {
    650         let root = temp_root("host-readiness");
    651         let _ = fs::remove_dir_all(&root);
    652         let config = config_set(
    653             &root,
    654             vec![
    655                 tenant_config(&root, "alpha", "alpha_schema", "alpha-ready.test", false, 9),
    656                 tenant_config(&root, "beta", "beta_schema", "beta-ready.test", false, 10),
    657                 tenant_config(
    658                     &root,
    659                     "paused",
    660                     "paused_schema",
    661                     "paused-ready.test",
    662                     true,
    663                     11,
    664                 ),
    665             ],
    666         );
    667         let host = TangleHostRuntime::open(config).expect("host");
    668         let alpha = TenantId::new("alpha").expect("alpha id");
    669         let beta = TenantId::new("beta").expect("beta id");
    670 
    671         assert!(!host.readiness_state().is_ready());
    672         host.registry()
    673             .tenant_by_id(&alpha)
    674             .expect("alpha")
    675             .runtime()
    676             .readiness_handle()
    677             .set_server_bind(BaseRelayReadinessCheckStatus::Ready);
    678         assert!(!host.readiness_state().is_ready());
    679         host.registry()
    680             .tenant_by_id(&beta)
    681             .expect("beta")
    682             .runtime()
    683             .readiness_handle()
    684             .set_server_bind(BaseRelayReadinessCheckStatus::Ready);
    685         let readiness = host.readiness_state();
    686         assert!(readiness.is_ready());
    687         assert_eq!(
    688             readiness.active_tenants(),
    689             BaseRelayReadinessCheckStatus::Ready
    690         );
    691 
    692         let _ = fs::remove_dir_all(root);
    693     }
    694 
    695     #[tokio::test]
    696     async fn host_shutdown_drains_all_active_tenant_runtimes() {
    697         let root = temp_root("host-shutdown");
    698         let _ = fs::remove_dir_all(&root);
    699         let config = config_set(
    700             &root,
    701             vec![
    702                 tenant_config(
    703                     &root,
    704                     "alpha",
    705                     "alpha_schema",
    706                     "alpha-shutdown.test",
    707                     false,
    708                     9,
    709                 ),
    710                 tenant_config(
    711                     &root,
    712                     "beta",
    713                     "beta_schema",
    714                     "beta-shutdown.test",
    715                     false,
    716                     10,
    717                 ),
    718             ],
    719         );
    720         let host = TangleHostRuntime::open(config).expect("host");
    721 
    722         let report = host.shutdown().await.expect("shutdown");
    723 
    724         assert!(host.shutdown_signal().requested());
    725         assert_eq!(report.tenants_shutdown(), 2);
    726         assert_eq!(report.closed_subscriptions(), 0);
    727         assert!(!host.readiness_state().is_ready());
    728 
    729         let _ = fs::remove_dir_all(root);
    730     }
    731 
    732     #[test]
    733     fn host_caps_reject_aggregate_connection_and_subscription_limits() {
    734         let resources = RelayResourceLimiter::new(1, 2);
    735 
    736         let connection = resources.try_open_connection().expect("connection");
    737         assert_eq!(resources.active_connections(), 1);
    738         assert!(resources.try_open_connection().is_err());
    739         drop(connection);
    740         assert_eq!(resources.active_connections(), 0);
    741         let connection = resources.try_open_connection().expect("connection again");
    742         connection.release();
    743         assert_eq!(resources.active_connections(), 0);
    744 
    745         let subscriptions = resources.try_open_subscriptions(2).expect("subscriptions");
    746         assert_eq!(resources.active_subscriptions(), 2);
    747         assert!(resources.try_open_subscriptions(1).is_err());
    748         drop(subscriptions);
    749         assert_eq!(resources.active_subscriptions(), 0);
    750         let subscriptions = resources
    751             .try_open_subscriptions(1)
    752             .expect("subscriptions again");
    753         subscriptions.release();
    754         assert_eq!(resources.active_subscriptions(), 0);
    755     }
    756 
    757     fn config_set(
    758         root: &Path,
    759         tenants: Vec<crate::config::TenantRuntimeConfig>,
    760     ) -> TangleHostRuntimeConfigSet {
    761         let host = parse_tangle_host_runtime_config_json(
    762             &json!({
    763                 "listen_addr": "127.0.0.1:0",
    764                 "tenant_config_dir": root.join("tenants"),
    765                 "limits": {
    766                     "max_total_connections": 16,
    767                     "max_total_subscriptions": 32,
    768                     "tenant_startup_concurrency": 2
    769                 },
    770                 "ops": {
    771                     "enabled": true,
    772                     "expose_tenant_inventory": true
    773                 },
    774                 "trusted_proxy": {
    775                     "enabled": false,
    776                     "trusted_peers": []
    777                 }
    778             })
    779             .to_string(),
    780         )
    781         .expect("host config");
    782         TangleHostRuntimeConfigSet::new(host, tenants).expect("config set")
    783     }
    784 
    785     fn tenant_config(
    786         root: &Path,
    787         tenant_id: &str,
    788         tenant_schema: &str,
    789         host: &str,
    790         inactive: bool,
    791         secret_byte: u8,
    792     ) -> crate::config::TenantRuntimeConfig {
    793         tenant_config_with_pocket_path(
    794             root,
    795             tenant_id,
    796             tenant_schema,
    797             host,
    798             inactive,
    799             secret_byte,
    800             root.join(tenant_id).join("pocket"),
    801         )
    802     }
    803 
    804     fn tenant_config_with_pocket_path(
    805         root: &Path,
    806         tenant_id: &str,
    807         tenant_schema: &str,
    808         host: &str,
    809         inactive: bool,
    810         secret_byte: u8,
    811         pocket_path: PathBuf,
    812     ) -> crate::config::TenantRuntimeConfig {
    813         let relay_url = format!("wss://{host}");
    814         let secret = format!("{secret_byte:02x}").repeat(32);
    815         let raw = json!({
    816             "tenant_id": tenant_id,
    817             "tenant_schema": tenant_schema,
    818             "host": host,
    819             "relay_url": relay_url,
    820             "inactive": inactive,
    821             "info": {
    822                 "name": format!("tenant {tenant_id}")
    823             },
    824             "pocket": {
    825                 "data_directory": pocket_path,
    826                 "sync_policy": "flush_on_shutdown"
    827             },
    828             "pocket_query": {
    829                 "allow_scraping": false,
    830                 "allow_scrape_if_limited_to": 100,
    831                 "allow_scrape_if_max_seconds": 3600
    832             },
    833             "groups": {
    834                 "enabled": true,
    835                 "canonical_relay_url": relay_url,
    836                 "relay_secret": secret,
    837                 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()]
    838             },
    839             "auth": {
    840                 "challenge_ttl_seconds": 300,
    841                 "created_at_skew_seconds": 600
    842             },
    843             "limits": {
    844                 "max_message_length": 1048576,
    845                 "max_subid_length": 64,
    846                 "max_subscriptions_per_connection": 64,
    847                 "max_filters_per_request": 10,
    848                 "max_tag_values_per_filter": 100,
    849                 "max_query_complexity": 2048,
    850                 "max_limit": 500,
    851                 "default_limit": 100,
    852                 "max_event_tags": 200,
    853                 "max_content_length": 65536,
    854                 "broadcast_channel_capacity": 8,
    855                 "per_connection_outbound_queue": 8
    856             },
    857             "rate_limits": {
    858                 "auth": {
    859                     "per_ip": {"window_seconds": 60, "max_hits": 120},
    860                     "per_pubkey": {"window_seconds": 60, "max_hits": 30},
    861                     "failures": {"window_seconds": 300, "max_hits": 5},
    862                     "failures_per_ip": {"window_seconds": 300, "max_hits": 20}
    863                 },
    864                 "event": {
    865                     "per_ip": {"window_seconds": 60, "max_hits": 600},
    866                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
    867                     "per_kind": {"window_seconds": 60, "max_hits": 1000}
    868                 },
    869                 "group": {
    870                     "write_per_ip": {"window_seconds": 60, "max_hits": 300},
    871                     "write_per_pubkey": {"window_seconds": 60, "max_hits": 60},
    872                     "write_per_group": {"window_seconds": 60, "max_hits": 90},
    873                     "write_per_kind": {"window_seconds": 60, "max_hits": 300},
    874                     "join_flow": {"window_seconds": 300, "max_hits": 10},
    875                     "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30}
    876                 },
    877                 "req": {
    878                     "per_ip": {"window_seconds": 60, "max_hits": 600},
    879                     "per_connection": {"window_seconds": 60, "max_hits": 120},
    880                     "per_pubkey": {"window_seconds": 60, "max_hits": 240},
    881                     "per_group": {"window_seconds": 60, "max_hits": 240},
    882                     "per_kind": {"window_seconds": 60, "max_hits": 500},
    883                     "broad": {"window_seconds": 60, "max_hits": 30}
    884                 },
    885                 "count": {
    886                     "per_ip": {"window_seconds": 60, "max_hits": 300},
    887                     "per_connection": {"window_seconds": 60, "max_hits": 60},
    888                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
    889                     "per_group": {"window_seconds": 60, "max_hits": 120},
    890                     "per_kind": {"window_seconds": 60, "max_hits": 240},
    891                     "broad": {"window_seconds": 60, "max_hits": 20}
    892                 }
    893             }
    894         })
    895         .to_string();
    896         let tenant = parse_tenant_runtime_config_json(&raw).expect("tenant config");
    897         assert!(tenant.pocket_config().data_directory().starts_with(root));
    898         tenant
    899     }
    900 
    /// Returns a fresh path under the system temp directory for a test fixture root.
    ///
    /// The final path component is `tangle-host-{name}-{nanos}-{pid}-{sequence}`, where
    /// `nanos` is the current time since the Unix epoch in nanoseconds, `pid` is the id of
    /// the current process and `sequence` is a counter incremented on every call within
    /// the process. The directory itself is not created.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the Unix epoch.
    fn temp_root(name: &str) -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};

        // The timestamp alone is not a reliable discriminator: clocks with coarse
        // resolution can hand the same value to two calls made back to back, and
        // separate test processes can observe the same instant. The process id and
        // a per-process counter make each returned path distinct in both cases.
        static SEQUENCE: AtomicU64 = AtomicU64::new(0);
        let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time")
            .as_nanos();
        std::env::temp_dir().join(format!("tangle-host-{name}-{nanos}-{pid}-{sequence}"))
    }
    908 }