host.rs (29188B)
1 #![forbid(unsafe_code)] 2 3 use crate::{ 4 config::{TangleHostRuntimeConfigSet, TenantRuntimeConfig}, 5 errors::BaseRelayError, 6 ops::BaseRelayReadinessCheckStatus, 7 resource_limits::RelayResourceLimiter, 8 runtime::{RelayRuntime, RelayRuntimeHandle, TangleShutdownSignal}, 9 tenant::{CanonicalHost, TenantId, TenantRelayUrl, TenantSchema}, 10 }; 11 use http::{HeaderMap, header}; 12 use std::collections::BTreeMap; 13 use std::net::SocketAddr; 14 15 #[derive(Debug, Clone)] 16 pub struct TangleHostRuntime { 17 config: TangleHostRuntimeConfigSet, 18 registry: TenantRegistry, 19 resources: RelayResourceLimiter, 20 shutdown: TangleShutdownSignal, 21 } 22 23 impl TangleHostRuntime { 24 pub fn open(config: TangleHostRuntimeConfigSet) -> Result<Self, BaseRelayError> { 25 let limits = config.host().limits(); 26 let resources = RelayResourceLimiter::new( 27 limits.max_total_connections(), 28 limits.max_total_subscriptions(), 29 ); 30 let registry = TenantRegistry::open(&config)?; 31 Ok(Self { 32 config, 33 registry, 34 resources, 35 shutdown: TangleShutdownSignal::new(), 36 }) 37 } 38 39 pub fn config(&self) -> &TangleHostRuntimeConfigSet { 40 &self.config 41 } 42 43 pub fn registry(&self) -> &TenantRegistry { 44 &self.registry 45 } 46 47 pub fn resources(&self) -> RelayResourceLimiter { 48 self.resources.clone() 49 } 50 51 pub fn shutdown_signal(&self) -> &TangleShutdownSignal { 52 &self.shutdown 53 } 54 55 pub fn readiness_state(&self) -> TangleHostReadinessState { 56 TangleHostReadinessState::new( 57 BaseRelayReadinessCheckStatus::Ready, 58 registry_ready(self.registry.active_tenant_count()), 59 active_tenants_ready(&self.registry), 60 self.shutdown.requested(), 61 ) 62 } 63 64 pub fn metrics_snapshot(&self) -> TangleHostMetricsSnapshot { 65 TangleHostMetricsSnapshot::new( 66 self.config.tenants().len(), 67 self.registry.active_tenant_count(), 68 self.config 69 .tenants() 70 .iter() 71 .filter(|tenant| tenant.inactive()) 72 .count(), 73 self.resources.active_connections(), 74 self.resources.active_subscriptions(), 75 self.config.host().limits().max_total_connections(), 76 self.config.host().limits().max_total_subscriptions(), 77 ) 78 } 79 80 pub fn tenant_for_request( 81 &self, 82 headers: &HeaderMap, 83 peer_addr: SocketAddr, 84 ) -> Result<&TenantRuntimeEntry, HostResolutionError> { 85 let host = resolve_request_host(headers, peer_addr, self.config.host().trusted_proxy())?; 86 self.registry 87 .tenant_by_host(&host) 88 .ok_or(HostResolutionError::Unknown) 89 } 90 91 pub fn tenant_inventory(&self) -> Vec<TangleHostTenantInventoryItem> { 92 let mut items = Vec::with_capacity(self.config.tenants().len()); 93 for tenant in self.config.tenants() { 94 let runtime = self.registry.tenant_by_id(tenant.tenant_id()); 95 items.push(TangleHostTenantInventoryItem::new( 96 tenant.tenant_id().clone(), 97 tenant.tenant_schema().clone(), 98 tenant.host().clone(), 99 tenant.relay_url().clone(), 100 runtime.is_some(), 101 runtime.map(|entry| entry.runtime().readiness_handle().snapshot().is_ready()), 102 )); 103 } 104 items 105 } 106 107 pub async fn shutdown(&self) -> Result<TangleHostShutdownReport, BaseRelayError> { 108 self.shutdown.request_shutdown(); 109 let mut closed_subscriptions = 0; 110 for tenant in self.registry.active_tenants() { 111 closed_subscriptions += tenant.runtime().shutdown().await?.closed_subscriptions(); 112 } 113 Ok(TangleHostShutdownReport::new( 114 self.registry.active_tenant_count(), 115 closed_subscriptions, 116 )) 117 } 118 } 119 120 #[derive(Debug, Clone, PartialEq, Eq)] 121 pub struct TangleHostTenantInventoryItem { 122 tenant_id: TenantId, 123 tenant_schema: TenantSchema, 124 host: CanonicalHost, 125 relay_url: TenantRelayUrl, 126 active: bool, 127 ready: Option<bool>, 128 } 129 130 impl TangleHostTenantInventoryItem { 131 pub fn new( 132 tenant_id: TenantId, 133 tenant_schema: TenantSchema, 134 host: CanonicalHost, 135 relay_url: TenantRelayUrl, 136 active: bool, 137 ready: Option<bool>, 138 ) -> Self { 139 Self { 140 tenant_id, 141 tenant_schema, 142 host, 143 relay_url, 144 active, 145 ready, 146 } 147 } 148 149 pub fn tenant_id(&self) -> &TenantId { 150 &self.tenant_id 151 } 152 153 pub fn tenant_schema(&self) -> &TenantSchema { 154 &self.tenant_schema 155 } 156 157 pub fn host(&self) -> &CanonicalHost { 158 &self.host 159 } 160 161 pub fn relay_url(&self) -> &TenantRelayUrl { 162 &self.relay_url 163 } 164 165 pub fn active(&self) -> bool { 166 self.active 167 } 168 169 pub fn ready(&self) -> bool { 170 self.ready.unwrap_or(false) 171 } 172 } 173 174 #[derive(Debug, Clone, PartialEq, Eq)] 175 pub struct TangleHostReadinessState { 176 config: BaseRelayReadinessCheckStatus, 177 tenant_registry: BaseRelayReadinessCheckStatus, 178 active_tenants: BaseRelayReadinessCheckStatus, 179 shutdown_requested: bool, 180 } 181 182 impl TangleHostReadinessState { 183 pub fn new( 184 config: BaseRelayReadinessCheckStatus, 185 tenant_registry: BaseRelayReadinessCheckStatus, 186 active_tenants: BaseRelayReadinessCheckStatus, 187 shutdown_requested: bool, 188 ) -> Self { 189 Self { 190 config, 191 tenant_registry, 192 active_tenants, 193 shutdown_requested, 194 } 195 } 196 197 pub fn is_ready(&self) -> bool { 198 !self.shutdown_requested 199 && self.config.is_ready() 200 && self.tenant_registry.is_ready() 201 && self.active_tenants.is_ready() 202 } 203 204 pub fn config(&self) -> BaseRelayReadinessCheckStatus { 205 self.config 206 } 207 208 pub fn tenant_registry(&self) -> BaseRelayReadinessCheckStatus { 209 self.tenant_registry 210 } 211 212 pub fn active_tenants(&self) -> BaseRelayReadinessCheckStatus { 213 self.active_tenants 214 } 215 216 pub fn shutdown_requested(&self) -> bool { 217 self.shutdown_requested 218 } 219 } 220 221 #[derive(Debug, Clone, PartialEq, Eq)] 222 pub struct TangleHostMetricsSnapshot { 223 configured_tenants: usize, 224 active_tenants: usize, 225 inactive_tenants: usize, 226 active_connections: usize, 227 active_subscriptions: usize, 228 max_total_connections: usize, 229 max_total_subscriptions: usize, 230 } 231 232 impl TangleHostMetricsSnapshot { 233 pub fn new( 234 configured_tenants: usize, 235 active_tenants: usize, 236 inactive_tenants: usize, 237 active_connections: usize, 238 active_subscriptions: usize, 239 max_total_connections: usize, 240 max_total_subscriptions: usize, 241 ) -> Self { 242 Self { 243 configured_tenants, 244 active_tenants, 245 inactive_tenants, 246 active_connections, 247 active_subscriptions, 248 max_total_connections, 249 max_total_subscriptions, 250 } 251 } 252 253 pub fn configured_tenants(&self) -> usize { 254 self.configured_tenants 255 } 256 257 pub fn active_tenants(&self) -> usize { 258 self.active_tenants 259 } 260 261 pub fn inactive_tenants(&self) -> usize { 262 self.inactive_tenants 263 } 264 265 pub fn active_connections(&self) -> usize { 266 self.active_connections 267 } 268 269 pub fn active_subscriptions(&self) -> usize { 270 self.active_subscriptions 271 } 272 273 pub fn max_total_connections(&self) -> usize { 274 self.max_total_connections 275 } 276 277 pub fn max_total_subscriptions(&self) -> usize { 278 self.max_total_subscriptions 279 } 280 } 281 282 #[derive(Debug, Clone, PartialEq, Eq)] 283 pub struct TangleHostShutdownReport { 284 tenants_shutdown: usize, 285 closed_subscriptions: usize, 286 } 287 288 impl TangleHostShutdownReport { 289 pub fn new(tenants_shutdown: usize, closed_subscriptions: usize) -> Self { 290 Self { 291 tenants_shutdown, 292 closed_subscriptions, 293 } 294 } 295 296 pub fn tenants_shutdown(&self) -> usize { 297 self.tenants_shutdown 298 } 299 300 pub fn closed_subscriptions(&self) -> usize { 301 self.closed_subscriptions 302 } 303 } 304 305 #[derive(Debug, Clone)] 306 pub struct TenantRegistry { 307 tenants_by_host: BTreeMap<CanonicalHost, TenantRuntimeEntry>, 308 host_by_tenant_id: BTreeMap<TenantId, CanonicalHost>, 309 } 310 311 impl TenantRegistry { 312 pub fn open(config: &TangleHostRuntimeConfigSet) -> Result<Self, BaseRelayError> { 313 let mut entries = Vec::new(); 314 let concurrency = config.host().limits().tenant_startup_concurrency(); 315 let active_tenants = config.active_tenants().cloned().collect::<Vec<_>>(); 316 for chunk in active_tenants.chunks(concurrency) { 317 for tenant in chunk { 318 entries.push(open_tenant_runtime(config, tenant)?); 319 } 320 } 321 Self::new(entries) 322 } 323 324 pub fn new(entries: Vec<TenantRuntimeEntry>) -> Result<Self, BaseRelayError> { 325 let mut tenants_by_host = BTreeMap::new(); 326 let mut host_by_tenant_id = BTreeMap::new(); 327 for entry in entries { 328 if tenants_by_host.contains_key(entry.host()) { 329 return Err(BaseRelayError::invalid(format!( 330 "duplicate active tenant host: {}", 331 entry.host() 332 ))); 333 } 334 if host_by_tenant_id 335 .insert(entry.tenant_id().clone(), entry.host().clone()) 336 .is_some() 337 { 338 return Err(BaseRelayError::invalid(format!( 339 "duplicate active tenant id: {}", 340 entry.tenant_id() 341 ))); 342 } 343 tenants_by_host.insert(entry.host().clone(), entry); 344 } 345 Ok(Self { 346 tenants_by_host, 347 host_by_tenant_id, 348 }) 349 } 350 351 pub fn active_tenant_count(&self) -> usize { 352 self.tenants_by_host.len() 353 } 354 355 pub fn active_tenants(&self) -> impl Iterator<Item = &TenantRuntimeEntry> { 356 self.tenants_by_host.values() 357 } 358 359 pub fn tenant_by_host(&self, host: &CanonicalHost) -> Option<&TenantRuntimeEntry> { 360 self.tenants_by_host.get(host) 361 } 362 363 pub fn tenant_by_id(&self, tenant_id: &TenantId) -> Option<&TenantRuntimeEntry> { 364 self.host_by_tenant_id 365 .get(tenant_id) 366 .and_then(|host| self.tenants_by_host.get(host)) 367 } 368 } 369 370 #[derive(Clone)] 371 pub struct TenantRuntimeEntry { 372 config: TenantRuntimeConfig, 373 runtime: RelayRuntimeHandle, 374 } 375 376 impl TenantRuntimeEntry { 377 pub fn new(config: TenantRuntimeConfig, runtime: RelayRuntimeHandle) -> Self { 378 Self { config, runtime } 379 } 380 381 pub fn config(&self) -> &TenantRuntimeConfig { 382 &self.config 383 } 384 385 pub fn tenant_id(&self) -> &TenantId { 386 self.config.tenant_id() 387 } 388 389 pub fn tenant_schema(&self) -> &TenantSchema { 390 self.config.tenant_schema() 391 } 392 393 pub fn host(&self) -> &CanonicalHost { 394 self.config.host() 395 } 396 397 pub fn relay_url(&self) -> &TenantRelayUrl { 398 self.config.relay_url() 399 } 400 401 pub fn runtime(&self) -> &RelayRuntimeHandle { 402 &self.runtime 403 } 404 } 405 406 impl std::fmt::Debug for TenantRuntimeEntry { 407 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 408 formatter 409 .debug_struct("TenantRuntimeEntry") 410 .field("tenant_id", self.tenant_id()) 411 .field("tenant_schema", self.tenant_schema()) 412 .field("host", self.host()) 413 .field("relay_url", self.relay_url()) 414 .finish_non_exhaustive() 415 } 416 } 417 418 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 419 pub enum HostResolutionError { 420 Missing, 421 Invalid, 422 Unknown, 423 } 424 425 fn open_tenant_runtime( 426 config: &TangleHostRuntimeConfigSet, 427 tenant: &TenantRuntimeConfig, 428 ) -> Result<TenantRuntimeEntry, BaseRelayError> { 429 let runtime_config = tenant 430 .to_base_relay_runtime_config(config.host().listen_addr(), config.host().tracing().clone()); 431 let runtime = RelayRuntime::open(runtime_config).map_err(|error| { 432 BaseRelayError::error(format!( 433 "failed to open active tenant `{}`: {}", 434 tenant.tenant_id(), 435 error.prefixed_message() 436 )) 437 })?; 438 Ok(TenantRuntimeEntry::new( 439 tenant.clone(), 440 RelayRuntimeHandle::new(runtime), 441 )) 442 } 443 444 fn registry_ready(active_tenant_count: usize) -> BaseRelayReadinessCheckStatus { 445 if active_tenant_count == 0 { 446 BaseRelayReadinessCheckStatus::NotReady 447 } else { 448 BaseRelayReadinessCheckStatus::Ready 449 } 450 } 451 452 fn active_tenants_ready(registry: &TenantRegistry) -> BaseRelayReadinessCheckStatus { 453 if registry 454 .active_tenants() 455 .all(|tenant| tenant.runtime().readiness_handle().snapshot().is_ready()) 456 { 457 BaseRelayReadinessCheckStatus::Ready 458 } else { 459 BaseRelayReadinessCheckStatus::NotReady 460 } 461 } 462 463 fn resolve_request_host( 464 headers: &HeaderMap, 465 peer_addr: SocketAddr, 466 trusted_proxy: &crate::config::TangleTrustedProxyConfig, 467 ) -> Result<CanonicalHost, HostResolutionError> { 468 let forwarded_host = trusted_proxy_peer_enabled(trusted_proxy, peer_addr) 469 .then(|| forwarded_host_header(headers)) 470 .flatten(); 471 let host = forwarded_host 472 .or_else(|| { 473 headers 474 .get(header::HOST) 475 .and_then(|value| value.to_str().ok()) 476 }) 477 .ok_or(HostResolutionError::Missing)?; 478 let host = host 479 .split(',') 480 .next() 481 .map(str::trim) 482 .filter(|host| !host.is_empty()) 483 .ok_or(HostResolutionError::Missing)?; 484 CanonicalHost::new(host).map_err(|_| HostResolutionError::Invalid) 485 } 486 487 fn trusted_proxy_peer_enabled( 488 trusted_proxy: &crate::config::TangleTrustedProxyConfig, 489 peer_addr: SocketAddr, 490 ) -> bool { 491 trusted_proxy.enabled() 492 && trusted_proxy 493 .trusted_peers() 494 .iter() 495 .any(|peer| peer == &peer_addr.ip().to_string() || peer == &peer_addr.to_string()) 496 } 497 498 fn forwarded_host_header(headers: &HeaderMap) -> Option<&str> { 499 headers 500 .get("x-forwarded-host") 501 .and_then(|value| value.to_str().ok()) 502 .or_else(|| { 503 headers 504 .get("forwarded") 505 .and_then(|value| value.to_str().ok()) 506 .and_then(forwarded_host_value) 507 }) 508 } 509 510 fn forwarded_host_value(value: &str) -> Option<&str> { 511 value.split(';').find_map(|part| { 512 let (name, value) = part.trim().split_once('=')?; 513 name.eq_ignore_ascii_case("host") 514 .then(|| value.trim_matches('"')) 515 }) 516 } 517 518 #[cfg(test)] 519 mod tests { 520 use super::TangleHostRuntime; 521 use crate::{ 522 config::{ 523 TangleHostRuntimeConfigSet, parse_tangle_host_runtime_config_json, 524 parse_tenant_runtime_config_json, 525 }, 526 ops::BaseRelayReadinessCheckStatus, 527 resource_limits::RelayResourceLimiter, 528 tenant::{CanonicalHost, TenantId}, 529 }; 530 use serde_json::json; 531 use std::{ 532 fs, 533 path::{Path, PathBuf}, 534 time::{SystemTime, UNIX_EPOCH}, 535 }; 536 use tangle_test_support::FixtureKey; 537 538 #[test] 539 fn host_runtime_opens_two_active_tenants_with_distinct_serving_maps() { 540 let root = temp_root("two-active-tenants"); 541 let _ = fs::remove_dir_all(&root); 542 let config = config_set( 543 &root, 544 vec![ 545 tenant_config(&root, "alpha", "alpha_schema", "alpha.relay.test", false, 9), 546 tenant_config(&root, "beta", "beta_schema", "beta.relay.test", false, 10), 547 ], 548 ); 549 550 let host = TangleHostRuntime::open(config).expect("host"); 551 let alpha_host = CanonicalHost::new("alpha.relay.test").expect("alpha host"); 552 let beta_id = TenantId::new("beta").expect("beta id"); 553 554 assert_eq!(host.registry().active_tenant_count(), 2); 555 assert_eq!( 556 host.registry() 557 .tenant_by_host(&alpha_host) 558 .expect("alpha") 559 .tenant_id() 560 .as_str(), 561 "alpha" 562 ); 563 assert_eq!( 564 host.registry() 565 .tenant_by_id(&beta_id) 566 .expect("beta") 567 .host() 568 .as_str(), 569 "beta.relay.test" 570 ); 571 assert!(root.join("alpha/pocket").exists()); 572 assert!(root.join("beta/pocket").exists()); 573 assert_eq!(host.metrics_snapshot().configured_tenants(), 2); 574 assert_eq!(host.metrics_snapshot().active_tenants(), 2); 575 assert_eq!(host.metrics_snapshot().inactive_tenants(), 0); 576 577 let _ = fs::remove_dir_all(root); 578 } 579 580 #[test] 581 fn inactive_tenants_validate_without_serving_or_storage() { 582 let root = temp_root("inactive-tenant"); 583 let _ = fs::remove_dir_all(&root); 584 let config = config_set( 585 &root, 586 vec![ 587 tenant_config( 588 &root, 589 "active", 590 "active_schema", 591 "active.relay.test", 592 false, 593 9, 594 ), 595 tenant_config( 596 &root, 597 "inactive", 598 "inactive_schema", 599 "inactive.relay.test", 600 true, 601 10, 602 ), 603 ], 604 ); 605 606 let host = TangleHostRuntime::open(config).expect("host"); 607 let inactive_host = CanonicalHost::new("inactive.relay.test").expect("inactive host"); 608 let inactive_id = TenantId::new("inactive").expect("inactive id"); 609 let active_id = TenantId::new("active").expect("active id"); 610 611 assert_eq!(host.registry().active_tenant_count(), 1); 612 assert!(host.registry().tenant_by_host(&inactive_host).is_none()); 613 assert!(host.registry().tenant_by_id(&inactive_id).is_none()); 614 assert!(host.registry().tenant_by_id(&active_id).is_some()); 615 assert!(!root.join("inactive/pocket").exists()); 616 assert_eq!(host.metrics_snapshot().configured_tenants(), 2); 617 assert_eq!(host.metrics_snapshot().inactive_tenants(), 1); 618 619 let _ = fs::remove_dir_all(root); 620 } 621 622 #[test] 623 fn active_tenant_open_failure_fails_host_startup() { 624 let root = temp_root("active-open-failure"); 625 let _ = fs::remove_dir_all(&root); 626 fs::create_dir_all(&root).expect("root"); 627 fs::write(root.join("broken-pocket"), "not a directory").expect("file"); 628 let config = config_set( 629 &root, 630 vec![tenant_config_with_pocket_path( 631 &root, 632 "broken", 633 "broken_schema", 634 "broken.relay.test", 635 false, 636 9, 637 root.join("broken-pocket"), 638 )], 639 ); 640 641 let error = TangleHostRuntime::open(config).expect_err("open failure"); 642 assert!(error.message().contains("failed to open active tenant")); 643 assert!(error.message().contains("broken")); 644 645 let _ = fs::remove_dir_all(root); 646 } 647 648 #[test] 649 fn host_readiness_requires_all_active_tenants_and_ignores_inactive_tenants() { 650 let root = temp_root("host-readiness"); 651 let _ = fs::remove_dir_all(&root); 652 let config = config_set( 653 &root, 654 vec![ 655 tenant_config(&root, "alpha", "alpha_schema", "alpha-ready.test", false, 9), 656 tenant_config(&root, "beta", "beta_schema", "beta-ready.test", false, 10), 657 tenant_config( 658 &root, 659 "paused", 660 "paused_schema", 661 "paused-ready.test", 662 true, 663 11, 664 ), 665 ], 666 ); 667 let host = TangleHostRuntime::open(config).expect("host"); 668 let alpha = TenantId::new("alpha").expect("alpha id"); 669 let beta = TenantId::new("beta").expect("beta id"); 670 671 assert!(!host.readiness_state().is_ready()); 672 host.registry() 673 .tenant_by_id(&alpha) 674 .expect("alpha") 675 .runtime() 676 .readiness_handle() 677 .set_server_bind(BaseRelayReadinessCheckStatus::Ready); 678 assert!(!host.readiness_state().is_ready()); 679 host.registry() 680 .tenant_by_id(&beta) 681 .expect("beta") 682 .runtime() 683 .readiness_handle() 684 .set_server_bind(BaseRelayReadinessCheckStatus::Ready); 685 let readiness = host.readiness_state(); 686 assert!(readiness.is_ready()); 687 assert_eq!( 688 readiness.active_tenants(), 689 BaseRelayReadinessCheckStatus::Ready 690 ); 691 692 let _ = fs::remove_dir_all(root); 693 } 694 695 #[tokio::test] 696 async fn host_shutdown_drains_all_active_tenant_runtimes() { 697 let root = temp_root("host-shutdown"); 698 let _ = fs::remove_dir_all(&root); 699 let config = config_set( 700 &root, 701 vec![ 702 tenant_config( 703 &root, 704 "alpha", 705 "alpha_schema", 706 "alpha-shutdown.test", 707 false, 708 9, 709 ), 710 tenant_config( 711 &root, 712 "beta", 713 "beta_schema", 714 "beta-shutdown.test", 715 false, 716 10, 717 ), 718 ], 719 ); 720 let host = TangleHostRuntime::open(config).expect("host"); 721 722 let report = host.shutdown().await.expect("shutdown"); 723 724 assert!(host.shutdown_signal().requested()); 725 assert_eq!(report.tenants_shutdown(), 2); 726 assert_eq!(report.closed_subscriptions(), 0); 727 assert!(!host.readiness_state().is_ready()); 728 729 let _ = fs::remove_dir_all(root); 730 } 731 732 #[test] 733 fn host_caps_reject_aggregate_connection_and_subscription_limits() { 734 let resources = RelayResourceLimiter::new(1, 2); 735 736 let connection = resources.try_open_connection().expect("connection"); 737 assert_eq!(resources.active_connections(), 1); 738 assert!(resources.try_open_connection().is_err()); 739 drop(connection); 740 assert_eq!(resources.active_connections(), 0); 741 let connection = resources.try_open_connection().expect("connection again"); 742 connection.release(); 743 assert_eq!(resources.active_connections(), 0); 744 745 let subscriptions = resources.try_open_subscriptions(2).expect("subscriptions"); 746 assert_eq!(resources.active_subscriptions(), 2); 747 assert!(resources.try_open_subscriptions(1).is_err()); 748 drop(subscriptions); 749 assert_eq!(resources.active_subscriptions(), 0); 750 let subscriptions = resources 751 .try_open_subscriptions(1) 752 .expect("subscriptions again"); 753 subscriptions.release(); 754 assert_eq!(resources.active_subscriptions(), 0); 755 } 756 757 fn config_set( 758 root: &Path, 759 tenants: Vec<crate::config::TenantRuntimeConfig>, 760 ) -> TangleHostRuntimeConfigSet { 761 let host = parse_tangle_host_runtime_config_json( 762 &json!({ 763 "listen_addr": "127.0.0.1:0", 764 "tenant_config_dir": root.join("tenants"), 765 "limits": { 766 "max_total_connections": 16, 767 "max_total_subscriptions": 32, 768 "tenant_startup_concurrency": 2 769 }, 770 "ops": { 771 "enabled": true, 772 "expose_tenant_inventory": true 773 }, 774 "trusted_proxy": { 775 "enabled": false, 776 "trusted_peers": [] 777 } 778 }) 779 .to_string(), 780 ) 781 .expect("host config"); 782 TangleHostRuntimeConfigSet::new(host, tenants).expect("config set") 783 } 784 785 fn tenant_config( 786 root: &Path, 787 tenant_id: &str, 788 tenant_schema: &str, 789 host: &str, 790 inactive: bool, 791 secret_byte: u8, 792 ) -> crate::config::TenantRuntimeConfig { 793 tenant_config_with_pocket_path( 794 root, 795 tenant_id, 796 tenant_schema, 797 host, 798 inactive, 799 secret_byte, 800 root.join(tenant_id).join("pocket"), 801 ) 802 } 803 804 fn tenant_config_with_pocket_path( 805 root: &Path, 806 tenant_id: &str, 807 tenant_schema: &str, 808 host: &str, 809 inactive: bool, 810 secret_byte: u8, 811 pocket_path: PathBuf, 812 ) -> crate::config::TenantRuntimeConfig { 813 let relay_url = format!("wss://{host}"); 814 let secret = format!("{secret_byte:02x}").repeat(32); 815 let raw = json!({ 816 "tenant_id": tenant_id, 817 "tenant_schema": tenant_schema, 818 "host": host, 819 "relay_url": relay_url, 820 "inactive": inactive, 821 "info": { 822 "name": format!("tenant {tenant_id}") 823 }, 824 "pocket": { 825 "data_directory": pocket_path, 826 "sync_policy": "flush_on_shutdown" 827 }, 828 "pocket_query": { 829 "allow_scraping": false, 830 "allow_scrape_if_limited_to": 100, 831 "allow_scrape_if_max_seconds": 3600 832 }, 833 "groups": { 834 "enabled": true, 835 "canonical_relay_url": relay_url, 836 "relay_secret": secret, 837 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()] 838 }, 839 "auth": { 840 "challenge_ttl_seconds": 300, 841 "created_at_skew_seconds": 600 842 }, 843 "limits": { 844 "max_message_length": 1048576, 845 "max_subid_length": 64, 846 "max_subscriptions_per_connection": 64, 847 "max_filters_per_request": 10, 848 "max_tag_values_per_filter": 100, 849 "max_query_complexity": 2048, 850 "max_limit": 500, 851 "default_limit": 100, 852 "max_event_tags": 200, 853 "max_content_length": 65536, 854 "broadcast_channel_capacity": 8, 855 "per_connection_outbound_queue": 8 856 }, 857 "rate_limits": { 858 "auth": { 859 "per_ip": {"window_seconds": 60, "max_hits": 120}, 860 "per_pubkey": {"window_seconds": 60, "max_hits": 30}, 861 "failures": {"window_seconds": 300, "max_hits": 5}, 862 "failures_per_ip": {"window_seconds": 300, "max_hits": 20} 863 }, 864 "event": { 865 "per_ip": {"window_seconds": 60, "max_hits": 600}, 866 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 867 "per_kind": {"window_seconds": 60, "max_hits": 1000} 868 }, 869 "group": { 870 "write_per_ip": {"window_seconds": 60, "max_hits": 300}, 871 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, 872 "write_per_group": {"window_seconds": 60, "max_hits": 90}, 873 "write_per_kind": {"window_seconds": 60, "max_hits": 300}, 874 "join_flow": {"window_seconds": 300, "max_hits": 10}, 875 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} 876 }, 877 "req": { 878 "per_ip": {"window_seconds": 60, "max_hits": 600}, 879 "per_connection": {"window_seconds": 60, "max_hits": 120}, 880 "per_pubkey": {"window_seconds": 60, "max_hits": 240}, 881 "per_group": {"window_seconds": 60, "max_hits": 240}, 882 "per_kind": {"window_seconds": 60, "max_hits": 500}, 883 "broad": {"window_seconds": 60, "max_hits": 30} 884 }, 885 "count": { 886 "per_ip": {"window_seconds": 60, "max_hits": 300}, 887 "per_connection": {"window_seconds": 60, "max_hits": 60}, 888 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 889 "per_group": {"window_seconds": 60, "max_hits": 120}, 890 "per_kind": {"window_seconds": 60, "max_hits": 240}, 891 "broad": {"window_seconds": 60, "max_hits": 20} 892 } 893 } 894 }) 895 .to_string(); 896 let tenant = parse_tenant_runtime_config_json(&raw).expect("tenant config"); 897 assert!(tenant.pocket_config().data_directory().starts_with(root)); 898 tenant 899 } 900 901 fn temp_root(name: &str) -> PathBuf { 902 let nanos = SystemTime::now() 903 .duration_since(UNIX_EPOCH) 904 .expect("time") 905 .as_nanos(); 906 std::env::temp_dir().join(format!("tangle-host-{name}-{nanos}")) 907 } 908 }