runtime.rs (202887B)
1 #![forbid(unsafe_code)] 2 3 #[cfg(test)] 4 use crate::relay::outbound::protocol_messages_for_test; 5 use crate::{ 6 client_message::RuntimeClientMessage, 7 config::BaseRelayRuntimeConfig, 8 errors::BaseRelayError, 9 event_bus::{TangleEventBus, TangleEventReceiver}, 10 groups::GroupServiceHandle, 11 logging, 12 ops::{BaseRelayReadinessHandle, BaseRelayReadinessState}, 13 pocket_event_validation::{pocket_event_id, pocket_event_kind, pocket_event_pubkey}, 14 rate_limits::{ 15 TangleQueryRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey, 16 TangleRateLimitQueryClass, TangleRateLimitRule, TangleRateLimitScope, TangleRateLimiter, 17 }, 18 relay::{ 19 auth::BaseAuthState, 20 core::{ 21 BaseRelay, BaseRelayCountQuery, BaseRelayCountReport, BaseRelayEventWrite, 22 BaseRelayLimits, BaseRelayQueryMetrics, BaseRelayQueryReport, BaseRelayReqQuery, 23 BaseRelayShutdownReport, 24 }, 25 live::LiveSubscriptionSet, 26 outbound::{RuntimeRelayMessage, protocol_control_messages}, 27 }, 28 }; 29 use serde::{Deserialize, Serialize}; 30 use std::{ 31 collections::BTreeSet, 32 fmt, fs, 33 net::IpAddr, 34 path::Path, 35 str, 36 sync::{ 37 Arc, 38 atomic::{AtomicU64, AtomicUsize, Ordering}, 39 }, 40 time::Instant, 41 }; 42 use tangle_groups::{ 43 GroupAuthContext, GroupEventClass, GroupId, KIND_GROUP_JOIN_REQUEST, StoreOffset, 44 validate_client_group_event_structure, 45 }; 46 use tangle_protocol::{Kind, RelayMessage, SubscriptionId, UnixTimestamp}; 47 use tangle_store_pocket::{ 48 PocketEvent, PocketFilter, PocketOwnedEvent, PocketOwnedFilter, PocketStoreHandle, PocketTime, 49 }; 50 use tokio::sync::watch; 51 52 pub struct RelayRuntime { 53 config: BaseRelayRuntimeConfig, 54 relay: BaseRelay, 55 readiness: BaseRelayReadinessHandle, 56 limits: TangleRuntimeLimits, 57 event_bus: TangleEventBus, 58 rate_limiter: TangleRateLimiter, 59 metrics: TangleRuntimeMetrics, 60 shutdown: TangleShutdownSignal, 61 hooks: Arc<dyn RelayRuntimeHooks>, 62 } 63 64 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] 65 pub struct TangleClientRateLimitContext { 66 peer_ip: Option<IpAddr>, 67 connection_id: Option<u64>, 68 } 69 70 impl TangleClientRateLimitContext { 71 pub fn new(peer_ip: Option<IpAddr>, connection_id: Option<u64>) -> Self { 72 Self { 73 peer_ip, 74 connection_id, 75 } 76 } 77 78 pub fn peer_ip(self) -> Option<IpAddr> { 79 self.peer_ip 80 } 81 82 pub fn connection_id(self) -> Option<u64> { 83 self.connection_id 84 } 85 } 86 87 pub trait RelayRuntimeHooks: Send + Sync { 88 fn admit_event(&self, _context: &RelayEventAdmissionContext) -> EventAdmissionDecision { 89 EventAdmissionDecision::Accept 90 } 91 92 fn event_stored(&self, _context: &RelayEventStoredContext) {} 93 } 94 95 #[derive(Debug, Default)] 96 pub struct NoopRelayRuntimeHooks; 97 98 impl RelayRuntimeHooks for NoopRelayRuntimeHooks {} 99 100 #[derive(Debug, Clone, PartialEq, Eq)] 101 pub enum EventAdmissionDecision { 102 Accept, 103 Reject { message: String }, 104 } 105 106 impl EventAdmissionDecision { 107 pub fn reject(message: impl Into<String>) -> Self { 108 Self::Reject { 109 message: message.into(), 110 } 111 } 112 } 113 114 #[derive(Debug, Clone, PartialEq, Eq)] 115 pub struct RelayEventContext { 116 event_id: String, 117 pubkey: String, 118 created_at: u64, 119 kind: u32, 120 tags: Vec<Vec<String>>, 121 content: String, 122 } 123 124 impl RelayEventContext { 125 pub fn new( 126 event_id: String, 127 pubkey: String, 128 created_at: u64, 129 kind: u32, 130 tags: Vec<Vec<String>>, 131 content: String, 132 ) -> Self { 133 Self { 134 event_id, 135 pubkey, 136 created_at, 137 kind, 138 tags, 139 content, 140 } 141 } 142 143 fn from_pocket_event(event: &PocketEvent) -> Result<Self, BaseRelayError> { 144 let tags = event 145 .tags() 146 .map_err(|error| BaseRelayError::invalid(error.to_string()))? 147 .iter() 148 .map(|tag| { 149 tag.map(|value| { 150 str::from_utf8(value) 151 .map(str::to_owned) 152 .map_err(|error| BaseRelayError::invalid(error.to_string())) 153 }) 154 .collect::<Result<Vec<_>, _>>() 155 }) 156 .collect::<Result<Vec<_>, _>>()?; 157 let content = str::from_utf8(event.content()) 158 .map(str::to_owned) 159 .map_err(|error| BaseRelayError::invalid(error.to_string()))?; 160 Ok(Self { 161 event_id: event.id().to_string(), 162 pubkey: event.pubkey().to_string(), 163 created_at: event.created_at().as_u64(), 164 kind: u32::from(event.kind().as_u16()), 165 tags, 166 content, 167 }) 168 } 169 170 pub fn event_id(&self) -> &str { 171 &self.event_id 172 } 173 174 pub fn pubkey(&self) -> &str { 175 &self.pubkey 176 } 177 178 pub fn created_at(&self) -> u64 { 179 self.created_at 180 } 181 182 pub fn kind(&self) -> u32 { 183 self.kind 184 } 185 186 pub fn tags(&self) -> &[Vec<String>] { 187 &self.tags 188 } 189 190 pub fn content(&self) -> &str { 191 &self.content 192 } 193 194 pub fn has_tag(&self, name: &str, value: &str) -> bool { 195 self.tags.iter().any(|tag| { 196 tag.first().is_some_and(|tag_name| tag_name == name) 197 && tag.iter().skip(1).any(|tag_value| tag_value == value) 198 }) 199 } 200 } 201 202 #[derive(Debug, Clone, PartialEq, Eq)] 203 pub struct RelayEventAdmissionContext { 204 event: RelayEventContext, 205 authenticated_pubkeys: Vec<String>, 206 peer_ip: Option<IpAddr>, 207 connection_id: Option<u64>, 208 now: u64, 209 } 210 211 impl RelayEventAdmissionContext { 212 pub fn new( 213 event: RelayEventContext, 214 authenticated_pubkeys: Vec<String>, 215 peer_ip: Option<IpAddr>, 216 connection_id: Option<u64>, 217 now: u64, 218 ) -> Self { 219 Self { 220 event, 221 authenticated_pubkeys, 222 peer_ip, 223 connection_id, 224 now, 225 } 226 } 227 228 pub fn event(&self) -> &RelayEventContext { 229 &self.event 230 } 231 232 pub fn authenticated_pubkeys(&self) -> &[String] { 233 &self.authenticated_pubkeys 234 } 235 236 pub fn peer_ip(&self) -> Option<IpAddr> { 237 self.peer_ip 238 } 239 240 pub fn connection_id(&self) -> Option<u64> { 241 self.connection_id 242 } 243 244 pub fn now(&self) -> u64 { 245 self.now 246 } 247 } 248 249 #[derive(Debug, Clone, PartialEq, Eq)] 250 pub struct RelayEventStoredContext { 251 event: RelayEventContext, 252 store_offsets: Vec<u64>, 253 } 254 255 impl RelayEventStoredContext { 256 pub fn new(event: RelayEventContext, store_offsets: Vec<u64>) -> Self { 257 Self { 258 event, 259 store_offsets, 260 } 261 } 262 263 pub fn event(&self) -> &RelayEventContext { 264 &self.event 265 } 266 267 pub fn store_offsets(&self) -> &[u64] { 268 &self.store_offsets 269 } 270 } 271 272 struct TanglePocketQueryRateLimitRequest<'a> { 273 scope: TangleRateLimitScope, 274 rules: TangleQueryRateLimitConfig, 275 label: &'static str, 276 subscription_id: &'a SubscriptionId, 277 filters: &'a [PocketOwnedFilter], 278 auth: &'a BaseAuthState, 279 context: TangleClientRateLimitContext, 280 now: UnixTimestamp, 281 } 282 283 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 284 enum TangleQueryClassification { 285 Bounded, 286 Broad(TangleBroadQueryReason), 287 } 288 289 impl TangleQueryClassification { 290 fn is_broad(self) -> bool { 291 matches!(self, Self::Broad(_)) 292 } 293 } 294 295 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 296 enum TangleBroadQueryReason { 297 EmptyFilters, 298 MissingPrimaryConstraint, 299 MissingBoundedSelector, 300 HighLimit, 301 BroadTimeWindow, 302 } 303 304 #[derive(Debug, Clone, Copy)] 305 struct TangleQueryClassifier { 306 limits: BaseRelayLimits, 307 } 308 309 const BROAD_QUERY_TIME_WINDOW_SECONDS: u64 = 31 * 24 * 60 * 60; 310 311 impl TangleQueryClassifier { 312 fn new(limits: BaseRelayLimits) -> Self { 313 Self { limits } 314 } 315 316 fn classify_pocket_query(self, filters: &[PocketOwnedFilter]) -> TangleQueryClassification { 317 if filters.is_empty() { 318 return TangleQueryClassification::Broad(TangleBroadQueryReason::EmptyFilters); 319 } 320 filters 321 .iter() 322 .map(|filter| self.classify_pocket_query_filter(filter)) 323 .find(|classification| classification.is_broad()) 324 .unwrap_or(TangleQueryClassification::Bounded) 325 } 326 327 fn classify_pocket_count(self, filters: &[PocketOwnedFilter]) -> TangleQueryClassification { 328 if filters.is_empty() { 329 return TangleQueryClassification::Broad(TangleBroadQueryReason::EmptyFilters); 330 } 331 filters 332 .iter() 333 .map(|filter| self.classify_pocket_count_filter(filter)) 334 .find(|classification| classification.is_broad()) 335 .unwrap_or(TangleQueryClassification::Bounded) 336 } 337 338 fn classify_pocket_query_filter(self, filter: &PocketFilter) -> TangleQueryClassification { 339 if !self.has_pocket_primary_constraint(filter) { 340 return TangleQueryClassification::Broad( 341 TangleBroadQueryReason::MissingPrimaryConstraint, 342 ); 343 } 344 if self.has_pocket_high_limit(filter) { 345 return TangleQueryClassification::Broad(TangleBroadQueryReason::HighLimit); 346 } 347 if self.has_pocket_broad_time_window(filter) && !self.has_pocket_strong_constraint(filter) { 348 return TangleQueryClassification::Broad(TangleBroadQueryReason::BroadTimeWindow); 349 } 350 TangleQueryClassification::Bounded 351 } 352 353 fn classify_pocket_count_filter(self, filter: &PocketFilter) -> TangleQueryClassification { 354 if !self.has_pocket_primary_constraint(filter) { 355 return TangleQueryClassification::Broad( 356 TangleBroadQueryReason::MissingPrimaryConstraint, 357 ); 358 } 359 if self.has_pocket_high_limit(filter) { 360 return TangleQueryClassification::Broad(TangleBroadQueryReason::HighLimit); 361 } 362 if self.has_pocket_broad_time_window(filter) { 363 return TangleQueryClassification::Broad(TangleBroadQueryReason::BroadTimeWindow); 364 } 365 if !self.has_pocket_count_bounded_selector(filter) { 366 return TangleQueryClassification::Broad( 367 TangleBroadQueryReason::MissingBoundedSelector, 368 ); 369 } 370 TangleQueryClassification::Bounded 371 } 372 373 fn has_pocket_primary_constraint(self, filter: &PocketFilter) -> bool { 374 filter.num_ids() > 0 375 || filter.num_authors() > 0 376 || filter.num_kinds() > 0 377 || self.has_pocket_group_constraint(filter) 378 } 379 380 fn has_pocket_strong_constraint(self, filter: &PocketFilter) -> bool { 381 filter.num_ids() > 0 || filter.num_authors() > 0 || self.has_pocket_group_constraint(filter) 382 } 383 384 fn has_pocket_count_bounded_selector(self, filter: &PocketFilter) -> bool { 385 self.has_pocket_strong_constraint(filter) 386 || (filter.num_kinds() > 0 && self.has_pocket_bounded_time_window(filter)) 387 || self.has_pocket_hll_count_selector(filter) 388 } 389 390 fn has_pocket_hll_count_selector(self, filter: &PocketFilter) -> bool { 391 filter 392 .hyperloglog_offset() 393 .is_ok_and(|offset| offset.is_some()) 394 } 395 396 fn has_pocket_group_constraint(self, filter: &PocketFilter) -> bool { 397 filter 398 .tags() 399 .map(|tags| { 400 tags.iter().any(|mut tag| { 401 let name = tag.next(); 402 let has_value = tag.next().is_some(); 403 matches!(name, Some(value) if matches!(value, b"h" | b"d")) && has_value 404 }) 405 }) 406 .unwrap_or(false) 407 } 408 409 fn has_pocket_high_limit(self, filter: &PocketFilter) -> bool { 410 let limit = if filter.limit() == u32::MAX { 411 self.limits.default_limit() 412 } else { 413 u64::from(filter.limit()) 414 }; 415 limit >= self.limits.max_limit() 416 } 417 418 fn has_pocket_bounded_time_window(self, filter: &PocketFilter) -> bool { 419 if filter.since() == PocketTime::min() || filter.until() == PocketTime::max() { 420 return false; 421 } 422 filter 423 .until() 424 .as_ref() 425 .saturating_sub(*filter.since().as_ref()) 426 <= BROAD_QUERY_TIME_WINDOW_SECONDS 427 } 428 429 fn has_pocket_broad_time_window(self, filter: &PocketFilter) -> bool { 430 if filter.since() == PocketTime::min() || filter.until() == PocketTime::max() { 431 return false; 432 } 433 filter 434 .until() 435 .as_ref() 436 .saturating_sub(*filter.since().as_ref()) 437 > BROAD_QUERY_TIME_WINDOW_SECONDS 438 } 439 } 440 441 impl RelayRuntime { 442 pub fn open(config: BaseRelayRuntimeConfig) -> Result<Self, BaseRelayError> { 443 Self::open_with_hooks(config, Arc::new(NoopRelayRuntimeHooks)) 444 } 445 446 pub fn open_with_hooks( 447 config: BaseRelayRuntimeConfig, 448 hooks: Arc<dyn RelayRuntimeHooks>, 449 ) -> Result<Self, BaseRelayError> { 450 let limits = TangleRuntimeLimits::from_config(&config)?; 451 let relay = config.open_relay()?; 452 let readiness = BaseRelayReadinessHandle::new(relay.readiness_state()); 453 let event_bus = TangleEventBus::new(limits.event_bus_capacity())?; 454 let rate_limiter = TangleRateLimiter::new(); 455 let metrics = TangleRuntimeMetrics::new(); 456 metrics.record_disk_used_bytes(directory_size_bytes( 457 config.pocket_config().data_directory(), 458 )); 459 metrics.record_event_bus_receivers(event_bus.receiver_count()); 460 metrics.record_outbox_pending_events(relay.group_outbox_pending_events()); 461 logging::log_runtime_opened(&config); 462 Ok(Self { 463 config, 464 relay, 465 readiness, 466 event_bus, 467 rate_limiter, 468 metrics, 469 limits, 470 shutdown: TangleShutdownSignal::new(), 471 hooks, 472 }) 473 } 474 475 pub fn config(&self) -> &BaseRelayRuntimeConfig { 476 &self.config 477 } 478 479 pub fn relay(&self) -> &BaseRelay { 480 &self.relay 481 } 482 483 pub fn relay_mut(&mut self) -> &mut BaseRelay { 484 &mut self.relay 485 } 486 487 pub fn auth_state(&self) -> Result<BaseAuthState, BaseRelayError> { 488 self.config.auth_state() 489 } 490 491 pub fn readiness_state(&self) -> BaseRelayReadinessState { 492 self.readiness.snapshot() 493 } 494 495 pub fn readiness_handle(&self) -> BaseRelayReadinessHandle { 496 self.readiness.clone() 497 } 498 499 pub fn limits(&self) -> TangleRuntimeLimits { 500 self.limits 501 } 502 503 pub fn event_bus(&self) -> &TangleEventBus { 504 &self.event_bus 505 } 506 507 pub fn rate_limiter(&self) -> &TangleRateLimiter { 508 &self.rate_limiter 509 } 510 511 pub fn metrics(&self) -> &TangleRuntimeMetrics { 512 &self.metrics 513 } 514 515 pub fn shutdown_signal(&self) -> &TangleShutdownSignal { 516 &self.shutdown 517 } 518 519 pub fn shutdown(&mut self) -> Result<BaseRelayShutdownReport, BaseRelayError> { 520 self.shutdown.request_shutdown(); 521 self.relay.shutdown() 522 } 523 } 524 525 struct RelayRuntimeShared { 526 config: Arc<BaseRelayRuntimeConfig>, 527 store: PocketStoreHandle, 528 groups: Option<GroupServiceHandle>, 529 readiness: BaseRelayReadinessHandle, 530 limits: TangleRuntimeLimits, 531 event_bus: TangleEventBus, 532 rate_limiter: TangleRateLimiter, 533 metrics: TangleRuntimeMetrics, 534 shutdown: TangleShutdownSignal, 535 hooks: Arc<dyn RelayRuntimeHooks>, 536 } 537 538 impl RelayRuntimeShared { 539 fn from_runtime(runtime: RelayRuntime) -> Self { 540 let RelayRuntime { 541 config, 542 relay, 543 readiness, 544 limits, 545 event_bus, 546 rate_limiter, 547 metrics, 548 shutdown, 549 hooks, 550 } = runtime; 551 let store = relay.store_handle(); 552 let groups = relay.group_service_handle(); 553 Self { 554 config: Arc::new(config), 555 store, 556 groups, 557 readiness, 558 limits, 559 event_bus, 560 rate_limiter, 561 metrics, 562 shutdown, 563 hooks, 564 } 565 } 566 567 fn rate_limit_event_pocket( 568 &self, 569 event: &PocketEvent, 570 context: TangleClientRateLimitContext, 571 now: UnixTimestamp, 572 ) -> Result<Option<RelayMessage>, BaseRelayError> { 573 let rules = self.config.rate_limits().event(); 574 if let Some(peer_ip) = context.peer_ip 575 && let Some(message) = self.rate_limit_ok_pocket( 576 event, 577 TangleRateLimitKey::ip(TangleRateLimitScope::Event, peer_ip), 578 rules.per_ip(), 579 "event ip", 580 now, 581 )? 582 { 583 return Ok(Some(message)); 584 } 585 self.rate_limit_ok_pocket( 586 event, 587 TangleRateLimitKey::pubkey(TangleRateLimitScope::Event, pocket_event_pubkey(event)?), 588 rules.per_pubkey(), 589 "event pubkey", 590 now, 591 ) 592 .and_then(|message| { 593 if message.is_some() { 594 return Ok(message); 595 } 596 self.rate_limit_ok_pocket( 597 event, 598 TangleRateLimitKey::kind(TangleRateLimitScope::Event, pocket_event_kind(event)?), 599 rules.per_kind(), 600 "event kind", 601 now, 602 ) 603 }) 604 } 605 606 fn rate_limit_auth_attempt_pocket( 607 &self, 608 event: &PocketEvent, 609 context: TangleClientRateLimitContext, 610 now: UnixTimestamp, 611 ) -> Result<Option<RelayMessage>, BaseRelayError> { 612 let rules = self.config.rate_limits().auth(); 613 if let Some(peer_ip) = context.peer_ip 614 && let Some(message) = self.rate_limit_ok_pocket( 615 event, 616 TangleRateLimitKey::ip(TangleRateLimitScope::Auth, peer_ip), 617 rules.per_ip(), 618 "auth ip", 619 now, 620 )? 621 { 622 return Ok(Some(message)); 623 } 624 self.rate_limit_ok_pocket( 625 event, 626 TangleRateLimitKey::pubkey(TangleRateLimitScope::Auth, pocket_event_pubkey(event)?), 627 rules.per_pubkey(), 628 "auth pubkey", 629 now, 630 ) 631 } 632 633 fn rate_limit_auth_failure_pocket( 634 &self, 635 event: &PocketEvent, 636 context: TangleClientRateLimitContext, 637 now: UnixTimestamp, 638 ) -> Result<Option<RelayMessage>, BaseRelayError> { 639 let rules = self.config.rate_limits().auth(); 640 if let Some(peer_ip) = context.peer_ip 641 && let Some(message) = self.rate_limit_ok_pocket( 642 event, 643 TangleRateLimitKey::auth_failure(Some(peer_ip), None), 644 rules.failures_per_ip(), 645 "auth failure ip", 646 now, 647 )? 648 { 649 return Ok(Some(message)); 650 } 651 self.rate_limit_ok_pocket( 652 event, 653 TangleRateLimitKey::auth_failure(None, Some(pocket_event_pubkey(event)?)), 654 rules.failures(), 655 "auth failure", 656 now, 657 ) 658 } 659 660 fn rate_limit_group_write_pocket( 661 &self, 662 event: &PocketEvent, 663 context: TangleClientRateLimitContext, 664 now: UnixTimestamp, 665 ) -> Result<Option<RelayMessage>, BaseRelayError> { 666 if !self.config.groups().enabled() { 667 return Ok(None); 668 } 669 let class = 670 validate_client_group_event_structure(event, self.config.groups().limits()).ok(); 671 let Some(class) = class else { 672 return Ok(None); 673 }; 674 let Some(group_id) = class.group_id().cloned() else { 675 return Ok(None); 676 }; 677 let rules = self.config.rate_limits().group(); 678 let kind = pocket_event_kind(event)?; 679 let pubkey = pocket_event_pubkey(event)?; 680 if kind.as_u32() == KIND_GROUP_JOIN_REQUEST { 681 if let Some(peer_ip) = context.peer_ip 682 && let Some(message) = self.rate_limit_ok_pocket( 683 event, 684 TangleRateLimitKey::join_flow_ip(group_id.clone(), peer_ip), 685 rules.join_flow_per_ip(), 686 "group join ip", 687 now, 688 )? 689 { 690 return Ok(Some(message)); 691 } 692 if let Some(message) = self.rate_limit_ok_pocket( 693 event, 694 TangleRateLimitKey::join_flow(group_id.clone(), pubkey.clone()), 695 rules.join_flow(), 696 "group join", 697 now, 698 )? { 699 return Ok(Some(message)); 700 } 701 } 702 if let Some(peer_ip) = context.peer_ip 703 && let Some(message) = self.rate_limit_ok_pocket( 704 event, 705 TangleRateLimitKey::ip(TangleRateLimitScope::GroupWrite, peer_ip), 706 rules.write_per_ip(), 707 "group ip", 708 now, 709 )? 710 { 711 return Ok(Some(message)); 712 } 713 if let Some(message) = self.rate_limit_ok_pocket( 714 event, 715 TangleRateLimitKey::pubkey(TangleRateLimitScope::GroupWrite, pubkey), 716 rules.write_per_pubkey(), 717 "group pubkey", 718 now, 719 )? { 720 return Ok(Some(message)); 721 } 722 if let Some(message) = self.rate_limit_ok_pocket( 723 event, 724 TangleRateLimitKey::group(TangleRateLimitScope::GroupWrite, group_id), 725 rules.write_per_group(), 726 "group write", 727 now, 728 )? { 729 return Ok(Some(message)); 730 } 731 self.rate_limit_ok_pocket( 732 event, 733 TangleRateLimitKey::kind(TangleRateLimitScope::GroupWrite, kind), 734 rules.write_per_kind(), 735 "group kind", 736 now, 737 ) 738 } 739 740 fn is_group_event_pocket(&self, event: &PocketEvent) -> bool { 741 self.config.groups().enabled() 742 && validate_client_group_event_structure(event, self.config.groups().limits()) 743 .is_ok_and(|class| !matches!(class, GroupEventClass::NonGroup)) 744 } 745 746 fn handle_pocket_event_with_auth_report( 747 &self, 748 event: &PocketEvent, 749 auth: &BaseAuthState, 750 ) -> Result<BaseRelayEventWrite, BaseRelayError> { 751 BaseRelay::handle_pocket_event_with_shared_services( 752 &self.store, 753 self.groups.as_ref(), 754 self.limits.base_relay_limits(), 755 event, 756 auth, 757 ) 758 } 759 760 fn group_outbox_pending_events(&self) -> usize { 761 self.groups 762 .as_ref() 763 .map(GroupServiceHandle::outbox_pending_events) 764 .unwrap_or(0) 765 } 766 767 fn query_req_with_auth_report( 768 &self, 769 subscription_id: SubscriptionId, 770 filters: Vec<PocketOwnedFilter>, 771 search_present: bool, 772 auth: &BaseAuthState, 773 ) -> Result<BaseRelayQueryReport, BaseRelayError> { 774 BaseRelay::query_req_with_shared_services( 775 &self.store, 776 self.groups.as_ref(), 777 self.limits.base_relay_limits(), 778 self.config.pocket_query_config(), 779 BaseRelayReqQuery::new(subscription_id, filters, search_present, auth), 780 ) 781 } 782 783 fn handle_count_with_auth_report( 784 &self, 785 subscription_id: SubscriptionId, 786 filters: Vec<PocketOwnedFilter>, 787 search_present: bool, 788 auth: &BaseAuthState, 789 ) -> Result<BaseRelayCountReport, BaseRelayError> { 790 BaseRelay::handle_count_with_shared_services( 791 &self.store, 792 self.groups.as_ref(), 793 self.limits.base_relay_limits(), 794 self.config.pocket_query_config(), 795 BaseRelayCountQuery::new(subscription_id, filters, search_present, auth), 796 ) 797 } 798 799 fn rate_limit_req_pocket( 800 &self, 801 subscription_id: &SubscriptionId, 802 filters: &[PocketOwnedFilter], 803 auth: &BaseAuthState, 804 context: TangleClientRateLimitContext, 805 now: UnixTimestamp, 806 ) -> Option<RelayMessage> { 807 self.rate_limit_pocket_query(TanglePocketQueryRateLimitRequest { 808 scope: TangleRateLimitScope::Req, 809 rules: self.config.rate_limits().req(), 810 label: "req", 811 subscription_id, 812 filters, 813 auth, 814 context, 815 now, 816 }) 817 } 818 819 fn rate_limit_count_pocket( 820 &self, 821 subscription_id: &SubscriptionId, 822 filters: &[PocketOwnedFilter], 823 auth: &BaseAuthState, 824 context: TangleClientRateLimitContext, 825 now: UnixTimestamp, 826 ) -> Option<RelayMessage> { 827 self.rate_limit_pocket_query(TanglePocketQueryRateLimitRequest { 828 scope: TangleRateLimitScope::Count, 829 rules: self.config.rate_limits().count(), 830 label: "count", 831 subscription_id, 832 filters, 833 auth, 834 context, 835 now, 836 }) 837 } 838 839 fn refuse_broad_count( 840 &self, 841 subscription_id: &SubscriptionId, 842 filters: &[PocketOwnedFilter], 843 ) -> Option<RelayMessage> { 844 if TangleQueryClassifier::new(self.limits.base_relay_limits()) 845 .classify_pocket_count(filters) 846 .is_broad() 847 { 848 self.metrics.record_count_refusal(); 849 self.metrics.record_broad_query_rejection(); 850 return Some(RelayMessage::Closed { 851 subscription_id: subscription_id.clone(), 852 message: BaseRelayError::restricted("count filters are too broad or expensive") 853 .prefixed_message(), 854 }); 855 } 856 None 857 } 858 859 fn rate_limit_pocket_query( 860 &self, 861 request: TanglePocketQueryRateLimitRequest<'_>, 862 ) -> Option<RelayMessage> { 863 if let Some(peer_ip) = request.context.peer_ip 864 && let Some(message) = self.rate_limit_closed( 865 request.subscription_id, 866 TangleRateLimitKey::ip(request.scope, peer_ip), 867 request.rules.per_ip(), 868 request.label, 869 "ip", 870 request.now, 871 ) 872 { 873 return Some(message); 874 } 875 if let Some(connection_id) = request.context.connection_id 876 && let Some(message) = self.rate_limit_closed( 877 request.subscription_id, 878 TangleRateLimitKey::connection(request.scope, connection_id), 879 request.rules.per_connection(), 880 request.label, 881 "connection", 882 request.now, 883 ) 884 { 885 return Some(message); 886 } 887 for pubkey in request.auth.authenticated_pubkeys() { 888 if let Some(message) = self.rate_limit_closed( 889 request.subscription_id, 890 TangleRateLimitKey::pubkey(request.scope, pubkey.clone()), 891 request.rules.per_pubkey(), 892 request.label, 893 "pubkey", 894 request.now, 895 ) { 896 return Some(message); 897 } 898 } 899 for group_id in pocket_filter_group_ids(request.filters) { 900 if let Some(message) = self.rate_limit_closed( 901 request.subscription_id, 902 TangleRateLimitKey::group(request.scope, group_id), 903 request.rules.per_group(), 904 request.label, 905 "group", 906 request.now, 907 ) { 908 return Some(message); 909 } 910 } 911 for kind in pocket_filter_kinds(request.filters) { 912 if let Some(message) = self.rate_limit_closed( 913 request.subscription_id, 914 TangleRateLimitKey::kind(request.scope, kind), 915 request.rules.per_kind(), 916 request.label, 917 "kind", 918 request.now, 919 ) { 920 return Some(message); 921 } 922 } 923 let classifier = TangleQueryClassifier::new(self.limits.base_relay_limits()); 924 let query_classification = match request.scope { 925 TangleRateLimitScope::Req => classifier.classify_pocket_query(request.filters), 926 TangleRateLimitScope::Count => classifier.classify_pocket_count(request.filters), 927 TangleRateLimitScope::Auth 928 | TangleRateLimitScope::Event 929 | TangleRateLimitScope::GroupWrite => classifier.classify_pocket_query(request.filters), 930 }; 931 if query_classification.is_broad() 932 && let Some(message) = self.rate_limit_closed( 933 request.subscription_id, 934 TangleRateLimitKey::query_class(request.scope, TangleRateLimitQueryClass::Broad), 935 request.rules.broad(), 936 request.label, 937 "broad", 938 request.now, 939 ) 940 { 941 self.metrics.record_broad_query_rejection(); 942 return Some(message); 943 } 944 None 945 } 946 947 fn rate_limit_closed( 948 &self, 949 subscription_id: &SubscriptionId, 950 key: TangleRateLimitKey, 951 rule: TangleRateLimitRule, 952 label: &'static str, 953 dimension: &'static str, 954 now: UnixTimestamp, 955 ) -> Option<RelayMessage> { 956 match self.rate_limiter.record(key, rule, now) { 957 TangleRateLimitDecision::Allowed { .. } => None, 958 TangleRateLimitDecision::Rejected { reset_at } => { 959 self.metrics.record_rate_limit_rejection(); 960 logging::log_rate_limit_rejected(label, dimension, reset_at); 961 Some(RelayMessage::Closed { 962 subscription_id: subscription_id.clone(), 963 message: BaseRelayError::rate_limited(format!( 964 "{label} {dimension} rate limit exceeded until {reset_at}" 965 )) 966 .prefixed_message(), 967 }) 968 } 969 } 970 } 971 972 fn rate_limit_ok_pocket( 973 &self, 974 event: &PocketEvent, 975 key: TangleRateLimitKey, 976 rule: TangleRateLimitRule, 977 label: &'static str, 978 now: UnixTimestamp, 979 ) -> Result<Option<RelayMessage>, BaseRelayError> { 980 Ok(match self.rate_limiter.record(key, rule, now) { 981 TangleRateLimitDecision::Allowed { .. } => None, 982 TangleRateLimitDecision::Rejected { reset_at } => { 983 self.metrics.record_rate_limit_rejection(); 984 logging::log_rate_limit_rejected(label, "event", reset_at); 985 Some(RelayMessage::Ok { 986 event_id: pocket_event_id(event)?, 987 accepted: false, 988 message: BaseRelayError::rate_limited(format!( 989 "{label} rate limit exceeded until {reset_at}" 990 )) 991 .prefixed_message(), 992 }) 993 } 994 }) 995 } 996 } 997 998 #[derive(Clone)] 999 pub struct RelayRuntimeHandle { 1000 inner: Arc<RelayRuntimeShared>, 1001 } 1002 1003 impl RelayRuntimeHandle { 1004 pub fn new(runtime: RelayRuntime) -> Self { 1005 Self { 1006 inner: Arc::new(RelayRuntimeShared::from_runtime(runtime)), 1007 } 1008 } 1009 1010 pub fn metrics(&self) -> TangleRuntimeMetrics { 1011 self.inner.metrics.clone() 1012 } 1013 1014 pub fn readiness_handle(&self) -> BaseRelayReadinessHandle { 1015 self.inner.readiness.clone() 1016 } 1017 1018 pub fn limits(&self) -> TangleRuntimeLimits { 1019 self.inner.limits 1020 } 1021 1022 pub async fn auth_state(&self) -> Result<BaseAuthState, BaseRelayError> { 1023 self.inner.config.auth_state() 1024 } 1025 1026 pub async fn handle_count_pocket( 1027 &self, 1028 subscription_id: SubscriptionId, 1029 filters: Vec<PocketOwnedFilter>, 1030 auth: &mut BaseAuthState, 1031 now: UnixTimestamp, 1032 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 1033 let messages = self 1034 .handle_client_message_with_rate_limit_context( 1035 RuntimeClientMessage::Count { 1036 subscription_id, 1037 filters, 1038 search_present: false, 1039 }, 1040 auth, 1041 TangleClientRateLimitContext::default(), 1042 now, 1043 ) 1044 .await?; 1045 protocol_control_messages(messages) 1046 } 1047 1048 #[cfg(test)] 1049 pub(crate) async fn handle_client_message( 1050 &self, 1051 message: RuntimeClientMessage, 1052 auth: &mut BaseAuthState, 1053 now: UnixTimestamp, 1054 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 1055 let messages = self 1056 .handle_client_message_with_rate_limit_context( 1057 message, 1058 auth, 1059 TangleClientRateLimitContext::default(), 1060 now, 1061 ) 1062 .await?; 1063 protocol_messages_for_test(messages) 1064 } 1065 1066 #[cfg(test)] 1067 pub(crate) async fn handle_protocol_client_message_for_test( 1068 &self, 1069 message: tangle_protocol::ClientMessage, 1070 auth: &mut BaseAuthState, 1071 now: UnixTimestamp, 1072 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 1073 self.handle_client_message( 1074 protocol_client_message_to_runtime_for_test(message)?, 1075 auth, 1076 now, 1077 ) 1078 .await 1079 } 1080 1081 #[cfg(test)] 1082 pub(crate) async fn handle_protocol_client_message_with_rate_limit_context_for_test( 1083 &self, 1084 message: tangle_protocol::ClientMessage, 1085 auth: &mut BaseAuthState, 1086 rate_limit_context: TangleClientRateLimitContext, 1087 now: UnixTimestamp, 1088 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 1089 let messages = self 1090 .handle_client_message_with_rate_limit_context( 1091 protocol_client_message_to_runtime_for_test(message)?, 1092 auth, 1093 rate_limit_context, 1094 now, 1095 ) 1096 .await?; 1097 protocol_messages_for_test(messages) 1098 } 1099 1100 pub(crate) async fn handle_client_message_with_rate_limit_context( 1101 &self, 1102 message: RuntimeClientMessage, 1103 auth: &mut BaseAuthState, 1104 rate_limit_context: TangleClientRateLimitContext, 1105 now: UnixTimestamp, 1106 ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { 1107 self.inner 1108 .metrics 1109 .record_client_message(runtime_client_message_metric_kind(&message)); 1110 match message { 1111 RuntimeClientMessage::Event(pocket_event) => { 1112 let started_at = Instant::now(); 1113 let event_id = pocket_event_id(&pocket_event)?; 1114 let event_context = RelayEventContext::from_pocket_event(&pocket_event)?; 1115 let is_group_event = self.inner.is_group_event_pocket(&pocket_event); 1116 if let Some(message) = 1117 self.inner 1118 .rate_limit_event_pocket(&pocket_event, rate_limit_context, now)? 1119 { 1120 record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); 1121 return Ok(vec![message.into()]); 1122 } 1123 if let Some(message) = self.inner.rate_limit_group_write_pocket( 1124 &pocket_event, 1125 rate_limit_context, 1126 now, 1127 )? { 1128 record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); 1129 return Ok(vec![message.into()]); 1130 } 1131 let authenticated_pubkeys = auth 1132 .authenticated_pubkeys() 1133 .iter() 1134 .map(ToString::to_string) 1135 .collect(); 1136 let admission = RelayEventAdmissionContext::new( 1137 event_context.clone(), 1138 authenticated_pubkeys, 1139 rate_limit_context.peer_ip(), 1140 rate_limit_context.connection_id(), 1141 now.as_u64(), 1142 ); 1143 if let EventAdmissionDecision::Reject { message } = 1144 self.inner.hooks.admit_event(&admission) 1145 { 1146 let message = RelayMessage::Ok { 1147 event_id, 1148 accepted: false, 1149 message: BaseRelayError::restricted(message).prefixed_message(), 1150 }; 1151 record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); 1152 return Ok(vec![message.into()]); 1153 } 1154 let result = self 1155 .inner 1156 .handle_pocket_event_with_auth_report(&pocket_event, auth)?; 1157 let group_outbox_pending_events = 1158 is_group_event.then(|| self.inner.group_outbox_pending_events()); 1159 if is_group_event { 1160 for _ in 0..result.stored_offsets().len().saturating_sub(1) { 1161 self.inner.metrics.record_outbox_replayed_event(); 1162 } 1163 self.inner 1164 .metrics 1165 .record_outbox_pending_events(group_outbox_pending_events.unwrap_or(0)); 1166 } 1167 for offset in result.stored_offsets() { 1168 self.inner.metrics.record_stored_event_offset(); 1169 let receivers = self.inner.event_bus.publish(*offset); 1170 self.inner.metrics.record_event_bus_publish(receivers); 1171 } 1172 if !result.stored_offsets().is_empty() { 1173 logging::log_event_stored( 1174 &event_id, 1175 result.stored_offsets().len(), 1176 self.inner.metrics.stored_event_offsets(), 1177 ); 1178 self.inner.hooks.event_stored(&RelayEventStoredContext::new( 1179 event_context, 1180 result 1181 .stored_offsets() 1182 .iter() 1183 .map(|offset| offset.as_u64()) 1184 .collect(), 1185 )); 1186 } 1187 let message = result.into_message(); 1188 record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); 1189 Ok(vec![message.into()]) 1190 } 1191 RuntimeClientMessage::Req { 1192 subscription_id, 1193 filters, 1194 search_present, 1195 } => { 1196 let started_at = Instant::now(); 1197 self.inner 1198 .limits 1199 .base_relay_limits() 1200 .validate_subscription_id(&subscription_id)?; 1201 self.inner 1202 .limits 1203 .base_relay_limits() 1204 .validate_pocket_filters(&filters)?; 1205 if let Some(message) = 1206 BaseRelay::unsupported_search_present_closed(&subscription_id, search_present) 1207 { 1208 self.inner 1209 .metrics 1210 .record_query_latency(elapsed_micros(started_at)); 1211 return Ok(vec![message.into()]); 1212 } 1213 if let Some(message) = self.inner.rate_limit_req_pocket( 1214 &subscription_id, 1215 &filters, 1216 auth, 1217 rate_limit_context, 1218 now, 1219 ) { 1220 self.inner 1221 .metrics 1222 .record_query_latency(elapsed_micros(started_at)); 1223 return Ok(vec![message.into()]); 1224 } 1225 let report = self.inner.query_req_with_auth_report( 1226 subscription_id, 1227 filters, 1228 search_present, 1229 auth, 1230 )?; 1231 self.inner 1232 .metrics 1233 .record_query_metrics(report.query_metrics()); 1234 if report.group_read_denied() { 1235 self.inner.metrics.record_group_read_denial(); 1236 } 1237 self.inner 1238 .metrics 1239 .record_query_latency(elapsed_micros(started_at)); 1240 Ok(report.into_messages()) 1241 } 1242 RuntimeClientMessage::Count { 1243 subscription_id, 1244 filters, 1245 search_present, 1246 } => { 1247 let started_at = Instant::now(); 1248 self.inner 1249 .limits 1250 .base_relay_limits() 1251 .validate_subscription_id(&subscription_id)?; 1252 self.inner 1253 .limits 1254 .base_relay_limits() 1255 .validate_pocket_filters(&filters)?; 1256 if let Some(message) = 1257 BaseRelay::unsupported_search_present_closed(&subscription_id, search_present) 1258 { 1259 self.inner 1260 .metrics 1261 .record_query_latency(elapsed_micros(started_at)); 1262 return Ok(vec![message.into()]); 1263 } 1264 if let Some(message) = self.inner.refuse_broad_count(&subscription_id, &filters) { 1265 self.inner 1266 .metrics 1267 .record_query_latency(elapsed_micros(started_at)); 1268 return Ok(vec![message.into()]); 1269 } 1270 if let Some(message) = self.inner.rate_limit_count_pocket( 1271 &subscription_id, 1272 &filters, 1273 auth, 1274 rate_limit_context, 1275 now, 1276 ) { 1277 self.inner 1278 .metrics 1279 .record_query_latency(elapsed_micros(started_at)); 1280 return Ok(vec![message.into()]); 1281 } 1282 let report = self.inner.handle_count_with_auth_report( 1283 subscription_id, 1284 filters, 1285 search_present, 1286 auth, 1287 )?; 1288 self.inner 1289 .metrics 1290 .record_query_metrics(report.query_metrics()); 1291 if report.group_read_denied() { 1292 self.inner.metrics.record_group_read_denial(); 1293 } 1294 self.inner 1295 .metrics 1296 .record_query_latency(elapsed_micros(started_at)); 1297 Ok(vec![report.into_message().into()]) 1298 } 1299 RuntimeClientMessage::Auth(pocket_event) => { 1300 let event_id = pocket_event_id(&pocket_event)?; 1301 if let Err(error) = self 1302 .inner 1303 .limits 1304 .base_relay_limits() 1305 .validate_pocket_event(&pocket_event) 1306 { 1307 self.inner.metrics.record_auth_failure(); 1308 return Ok(vec![RuntimeRelayMessage::from(RelayMessage::Ok { 1309 event_id, 1310 accepted: false, 1311 message: error.prefixed_message(), 1312 })]); 1313 } 1314 if let Some(message) = self.inner.rate_limit_auth_attempt_pocket( 1315 &pocket_event, 1316 rate_limit_context, 1317 now, 1318 )? { 1319 self.inner.metrics.record_auth_failure(); 1320 return Ok(vec![message.into()]); 1321 } 1322 let event_for_failure = pocket_event.clone(); 1323 let replies = BaseRelay::handle_pocket_auth_with_limits( 1324 self.inner.limits.base_relay_limits(), 1325 &pocket_event, 1326 auth, 1327 now, 1328 ); 1329 if auth_response_failed(&replies) { 1330 self.inner.metrics.record_auth_failure(); 1331 if let Some(message) = self.inner.rate_limit_auth_failure_pocket( 1332 &event_for_failure, 1333 rate_limit_context, 1334 now, 1335 )? { 1336 return Ok(vec![message.into()]); 1337 } 1338 } else { 1339 self.inner.metrics.record_auth_success(); 1340 } 1341 Ok(replies.into_iter().map(Into::into).collect()) 1342 } 1343 RuntimeClientMessage::Close(subscription_id) => { 1344 self.inner 1345 .limits 1346 .base_relay_limits() 1347 .validate_subscription_id(&subscription_id)?; 1348 Ok(Vec::new()) 1349 } 1350 RuntimeClientMessage::NegOpen { 1351 subscription_id, .. 1352 } 1353 | RuntimeClientMessage::NegMsg { 1354 subscription_id, .. 1355 } => { 1356 self.inner 1357 .limits 1358 .base_relay_limits() 1359 .validate_subscription_id(&subscription_id)?; 1360 Ok(vec![ 1361 BaseRelay::disabled_negentropy_message(subscription_id).into(), 1362 ]) 1363 } 1364 RuntimeClientMessage::NegClose(subscription_id) => { 1365 self.inner 1366 .limits 1367 .base_relay_limits() 1368 .validate_subscription_id(&subscription_id)?; 1369 Ok(Vec::new()) 1370 } 1371 } 1372 } 1373 1374 pub async fn subscribe_events(&self) -> TangleEventReceiver { 1375 let receiver = self.inner.event_bus.subscribe(); 1376 self.inner 1377 .metrics 1378 .record_event_bus_receivers(self.inner.event_bus.receiver_count()); 1379 receiver 1380 } 1381 1382 pub async fn rate_limiter(&self) -> TangleRateLimiter { 1383 self.inner.rate_limiter.clone() 1384 } 1385 1386 pub(crate) async fn rate_limit_req_pocket( 1387 &self, 1388 subscription_id: &SubscriptionId, 1389 filters: &[PocketOwnedFilter], 1390 auth: &BaseAuthState, 1391 rate_limit_context: TangleClientRateLimitContext, 1392 now: UnixTimestamp, 1393 ) -> Option<RelayMessage> { 1394 self.inner 1395 .rate_limit_req_pocket(subscription_id, filters, auth, rate_limit_context, now) 1396 } 1397 1398 pub(crate) async fn query_req_with_auth_report( 1399 &self, 1400 subscription_id: SubscriptionId, 1401 filters: Vec<PocketOwnedFilter>, 1402 search_present: bool, 1403 auth: &BaseAuthState, 1404 ) -> Result<BaseRelayQueryReport, BaseRelayError> { 1405 let started_at = Instant::now(); 1406 let report = self.inner.query_req_with_auth_report( 1407 subscription_id, 1408 filters, 1409 search_present, 1410 auth, 1411 )?; 1412 if report.group_read_denied() { 1413 self.inner.metrics.record_group_read_denial(); 1414 } 1415 self.inner 1416 .metrics 1417 .record_query_latency(elapsed_micros(started_at)); 1418 Ok(report) 1419 } 1420 1421 pub async fn event_by_offset_with_auth( 1422 &self, 1423 offset: StoreOffset, 1424 auth: &BaseAuthState, 1425 ) -> Result<Option<PocketOwnedEvent>, BaseRelayError> { 1426 let pocket_event = self.inner.store.event_by_offset(offset.as_u64())?; 1427 let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()); 1428 let visible = BaseRelay::group_read_gate_visible_to_auth( 1429 self.inner.groups.as_ref(), 1430 &pocket_event, 1431 &group_auth, 1432 )?; 1433 if !visible { 1434 self.inner.metrics.record_group_read_denial(); 1435 return Ok(None); 1436 } 1437 Ok(Some(pocket_event)) 1438 } 1439 1440 pub(crate) async fn fanout_event_offset( 1441 &self, 1442 offset: StoreOffset, 1443 subscriptions: &mut LiveSubscriptionSet, 1444 auth: &BaseAuthState, 1445 ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { 1446 let pocket_event = self.inner.store.event_by_offset(offset.as_u64())?; 1447 let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()); 1448 let subscriptions = subscriptions.fanout(&pocket_event, &group_auth, |event, auth| { 1449 BaseRelay::group_read_gate_visible_to_auth(self.inner.groups.as_ref(), event, auth) 1450 .unwrap_or(false) 1451 })?; 1452 Ok(subscriptions 1453 .into_iter() 1454 .map(|subscription_id| { 1455 RuntimeRelayMessage::event(subscription_id, pocket_event.clone()) 1456 }) 1457 .collect()) 1458 } 1459 1460 pub async fn shutdown(&self) -> Result<BaseRelayShutdownReport, BaseRelayError> { 1461 self.inner.shutdown.request_shutdown(); 1462 self.inner.store.sync()?; 1463 Ok(BaseRelayShutdownReport::new(0)) 1464 } 1465 } 1466 1467 fn auth_response_failed(replies: &[RelayMessage]) -> bool { 1468 replies.iter().any(|reply| { 1469 matches!( 1470 reply, 1471 RelayMessage::Ok { 1472 accepted: false, 1473 .. 1474 } 1475 ) 1476 }) 1477 } 1478 1479 fn record_event_metrics( 1480 metrics: &TangleRuntimeMetrics, 1481 message: &RelayMessage, 1482 is_group_event: bool, 1483 started_at: Instant, 1484 ) { 1485 metrics.record_event_admission_latency(elapsed_micros(started_at)); 1486 if let RelayMessage::Ok { accepted, .. } = message { 1487 if *accepted { 1488 metrics.record_event_admission(); 1489 } else { 1490 metrics.record_event_rejection(); 1491 if is_group_event { 1492 metrics.record_group_write_denial(); 1493 } 1494 } 1495 } 1496 } 1497 1498 fn elapsed_micros(started_at: Instant) -> u64 { 1499 u64::try_from(started_at.elapsed().as_micros()).unwrap_or(u64::MAX) 1500 } 1501 1502 fn directory_size_bytes(path: &Path) -> u64 { 1503 let Ok(metadata) = fs::metadata(path) else { 1504 return 0; 1505 }; 1506 if metadata.is_file() { 1507 return metadata.len(); 1508 } 1509 if !metadata.is_dir() { 1510 return 0; 1511 } 1512 let Ok(entries) = fs::read_dir(path) else { 1513 return 0; 1514 }; 1515 entries 1516 .filter_map(Result::ok) 1517 .map(|entry| directory_size_bytes(&entry.path())) 1518 .sum() 1519 } 1520 1521 #[cfg(test)] 1522 fn protocol_client_message_to_runtime_for_test( 1523 message: tangle_protocol::ClientMessage, 1524 ) -> Result<RuntimeClientMessage, BaseRelayError> { 1525 match message { 1526 tangle_protocol::ClientMessage::Event(event) => Ok(RuntimeClientMessage::Event( 1527 crate::pocket_conversion::tangle_event_to_pocket(&event)?, 1528 )), 1529 tangle_protocol::ClientMessage::Req { 1530 subscription_id, 1531 filters, 1532 } => Ok(RuntimeClientMessage::Req { 1533 subscription_id, 1534 search_present: filters.iter().any(|filter| filter.search().is_some()), 1535 filters: filters 1536 .iter() 1537 .map(crate::pocket_conversion::tangle_filter_to_pocket) 1538 .collect::<Result<Vec<_>, _>>()?, 1539 }), 1540 tangle_protocol::ClientMessage::Count { 1541 subscription_id, 1542 filters, 1543 } => Ok(RuntimeClientMessage::Count { 1544 subscription_id, 1545 search_present: filters.iter().any(|filter| filter.search().is_some()), 1546 filters: filters 1547 .iter() 1548 .map(crate::pocket_conversion::tangle_filter_to_pocket) 1549 .collect::<Result<Vec<_>, _>>()?, 1550 }), 1551 tangle_protocol::ClientMessage::Close(subscription_id) => { 1552 Ok(RuntimeClientMessage::Close(subscription_id)) 1553 } 1554 tangle_protocol::ClientMessage::Auth(event) => Ok(RuntimeClientMessage::Auth( 1555 crate::pocket_conversion::tangle_event_to_pocket(&event)?, 1556 )), 1557 tangle_protocol::ClientMessage::NegOpen { 1558 subscription_id, 1559 filter, 1560 message, 1561 } => Ok(RuntimeClientMessage::NegOpen { 1562 subscription_id, 1563 filter: crate::pocket_conversion::tangle_filter_to_pocket(&filter)?, 1564 message, 1565 }), 1566 tangle_protocol::ClientMessage::NegMsg { 1567 subscription_id, 1568 message, 1569 } => Ok(RuntimeClientMessage::NegMsg { 1570 subscription_id, 1571 message, 1572 }), 1573 tangle_protocol::ClientMessage::NegClose(subscription_id) => { 1574 Ok(RuntimeClientMessage::NegClose(subscription_id)) 1575 } 1576 } 1577 } 1578 1579 fn runtime_client_message_metric_kind( 1580 message: &RuntimeClientMessage, 1581 ) -> TangleClientMessageMetricKind { 1582 match message { 1583 RuntimeClientMessage::Event(_) => TangleClientMessageMetricKind::Event, 1584 RuntimeClientMessage::Req { .. } => TangleClientMessageMetricKind::Req, 1585 RuntimeClientMessage::Count { .. } => TangleClientMessageMetricKind::Count, 1586 RuntimeClientMessage::Auth(_) => TangleClientMessageMetricKind::Auth, 1587 RuntimeClientMessage::Close(_) => TangleClientMessageMetricKind::Close, 1588 RuntimeClientMessage::NegOpen { .. } 1589 | RuntimeClientMessage::NegMsg { .. } 1590 | RuntimeClientMessage::NegClose(_) => TangleClientMessageMetricKind::Negentropy, 1591 } 1592 } 1593 1594 fn pocket_filter_group_ids(filters: &[PocketOwnedFilter]) -> Vec<GroupId> { 1595 let mut group_ids = BTreeSet::new(); 1596 for filter in filters { 1597 let Ok(tags) = filter.tags() else { 1598 continue; 1599 }; 1600 for mut tag in tags.iter() { 1601 let name = tag.next(); 1602 if !matches!(name, Some(value) if matches!(value, b"h" | b"d")) { 1603 continue; 1604 } 1605 for value in tag { 1606 if let Ok(value) = std::str::from_utf8(value) 1607 && let Ok(group_id) = GroupId::new(value) 1608 { 1609 group_ids.insert(group_id); 1610 } 1611 } 1612 } 1613 } 1614 group_ids.into_iter().collect() 1615 } 1616 1617 fn pocket_filter_kinds(filters: &[PocketOwnedFilter]) -> Vec<Kind> { 1618 filters 1619 .iter() 1620 .flat_map(|filter| filter.kinds()) 1621 .filter_map(|kind| Kind::new(u64::from(kind.as_u16())).ok()) 1622 .collect::<BTreeSet<_>>() 1623 .into_iter() 1624 .collect() 1625 } 1626 1627 impl fmt::Debug for RelayRuntimeHandle { 1628 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { 1629 formatter.write_str("RelayRuntimeHandle") 1630 } 1631 } 1632 1633 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 1634 pub struct TangleRuntimeLimits { 1635 max_message_length: usize, 1636 base_relay_limits: BaseRelayLimits, 1637 event_bus_capacity: usize, 1638 outbound_queue_capacity: usize, 1639 } 1640 1641 impl TangleRuntimeLimits { 1642 pub fn new( 1643 max_message_length: usize, 1644 base_relay_limits: BaseRelayLimits, 1645 event_bus_capacity: usize, 1646 outbound_queue_capacity: usize, 1647 ) -> Result<Self, BaseRelayError> { 1648 if max_message_length == 0 { 1649 return Err(BaseRelayError::invalid( 1650 "runtime max message length must be greater than zero", 1651 )); 1652 } 1653 if event_bus_capacity == 0 { 1654 return Err(BaseRelayError::invalid( 1655 "runtime event bus capacity must be greater than zero", 1656 )); 1657 } 1658 if outbound_queue_capacity == 0 { 1659 return Err(BaseRelayError::invalid( 1660 "runtime outbound queue capacity must be greater than zero", 1661 )); 1662 } 1663 Ok(Self { 1664 max_message_length, 1665 base_relay_limits, 1666 event_bus_capacity, 1667 outbound_queue_capacity, 1668 }) 1669 } 1670 1671 pub fn from_config(config: &BaseRelayRuntimeConfig) -> Result<Self, BaseRelayError> { 1672 let limits = config.limits(); 1673 Self::new( 1674 limits.max_message_length(), 1675 limits.base_relay_limits()?, 1676 limits.broadcast_channel_capacity(), 1677 limits.per_connection_outbound_queue(), 1678 ) 1679 } 1680 1681 pub fn max_message_length(self) -> usize { 1682 self.max_message_length 1683 } 1684 1685 pub fn base_relay_limits(self) -> BaseRelayLimits { 1686 self.base_relay_limits 1687 } 1688 1689 pub fn max_pending_events(self) -> usize { 1690 self.base_relay_limits.max_pending_events() 1691 } 1692 1693 pub fn event_bus_capacity(self) -> usize { 1694 self.event_bus_capacity 1695 } 1696 1697 pub fn outbound_queue_capacity(self) -> usize { 1698 self.outbound_queue_capacity 1699 } 1700 } 1701 1702 #[derive(Debug, Clone)] 1703 pub struct TangleRuntimeMetrics { 1704 inner: Arc<TangleRuntimeMetricsInner>, 1705 } 1706 1707 #[derive(Debug)] 1708 struct TangleRuntimeMetricsInner { 1709 started_at: Instant, 1710 active_sessions: AtomicUsize, 1711 total_sessions: AtomicU64, 1712 client_messages: AtomicU64, 1713 event_messages: AtomicU64, 1714 req_messages: AtomicU64, 1715 count_messages: AtomicU64, 1716 auth_messages: AtomicU64, 1717 close_messages: AtomicU64, 1718 opened_subscriptions: AtomicU64, 1719 closed_subscriptions: AtomicU64, 1720 stored_event_offsets: AtomicU64, 1721 rate_limit_rejections: AtomicU64, 1722 auth_successes: AtomicU64, 1723 auth_failures: AtomicU64, 1724 event_admissions: AtomicU64, 1725 event_rejections: AtomicU64, 1726 group_read_denials: AtomicU64, 1727 group_write_denials: AtomicU64, 1728 event_bus_receivers_current: AtomicUsize, 1729 event_bus_published_offsets: AtomicU64, 1730 event_bus_lagged_receivers: AtomicU64, 1731 event_bus_lagged_offsets: AtomicU64, 1732 outbound_queue_full_closes: AtomicU64, 1733 outbox_pending_events: AtomicUsize, 1734 outbox_replayed_events: AtomicU64, 1735 disk_used_bytes: AtomicU64, 1736 event_admission_latency_total_micros: AtomicU64, 1737 event_admission_latency_count: AtomicU64, 1738 query_latency_total_micros: AtomicU64, 1739 query_latency_count: AtomicU64, 1740 query_candidates_scanned: AtomicU64, 1741 query_returned_events: AtomicU64, 1742 query_redacted_events: AtomicU64, 1743 count_refusals: AtomicU64, 1744 broad_query_rejections: AtomicU64, 1745 } 1746 1747 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 1748 pub enum TangleClientMessageMetricKind { 1749 Event, 1750 Req, 1751 Count, 1752 Auth, 1753 Close, 1754 Negentropy, 1755 } 1756 1757 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 1758 pub struct TangleRuntimeMetricsSnapshot { 1759 tangle_runtime_uptime_seconds: u64, 1760 tangle_readiness_ready: bool, 1761 tangle_ws_connections_current: usize, 1762 tangle_ws_connections_total: u64, 1763 tangle_client_messages_total: u64, 1764 tangle_event_messages_total: u64, 1765 tangle_req_messages_total: u64, 1766 tangle_count_messages_total: u64, 1767 tangle_auth_messages_total: u64, 1768 tangle_close_messages_total: u64, 1769 tangle_subscriptions_opened_total: u64, 1770 tangle_subscriptions_closed_total: u64, 1771 tangle_stored_event_offsets_total: u64, 1772 tangle_rate_limit_rejections_total: u64, 1773 tangle_auth_success_total: u64, 1774 tangle_auth_failure_total: u64, 1775 tangle_event_admitted_total: u64, 1776 tangle_event_rejected_total: u64, 1777 tangle_group_read_denied_total: u64, 1778 tangle_group_write_denied_total: u64, 1779 tangle_event_bus_receivers_current: usize, 1780 tangle_event_bus_published_offsets_total: u64, 1781 tangle_event_bus_lagged_receivers_total: u64, 1782 tangle_event_bus_lagged_offsets_total: u64, 1783 tangle_outbound_queue_full_closes_total: u64, 1784 tangle_outbox_pending_events: usize, 1785 tangle_outbox_replayed_events_total: u64, 1786 tangle_disk_used_bytes: u64, 1787 tangle_event_admission_latency_total_micros: u64, 1788 tangle_event_admission_latency_count: u64, 1789 tangle_query_latency_total_micros: u64, 1790 tangle_query_latency_count: u64, 1791 tangle_query_candidates_scanned_total: u64, 1792 tangle_query_returned_events_total: u64, 1793 tangle_query_redacted_events_total: u64, 1794 tangle_count_refusals_total: u64, 1795 tangle_broad_query_rejections_total: u64, 1796 } 1797 1798 impl TangleRuntimeMetricsSnapshot { 1799 pub fn active_sessions(&self) -> usize { 1800 self.tangle_ws_connections_current 1801 } 1802 1803 pub fn total_sessions(&self) -> u64 { 1804 self.tangle_ws_connections_total 1805 } 1806 1807 pub fn client_messages(&self) -> u64 { 1808 self.tangle_client_messages_total 1809 } 1810 1811 pub fn event_messages(&self) -> u64 { 1812 self.tangle_event_messages_total 1813 } 1814 1815 pub fn req_messages(&self) -> u64 { 1816 self.tangle_req_messages_total 1817 } 1818 1819 pub fn count_messages(&self) -> u64 { 1820 self.tangle_count_messages_total 1821 } 1822 1823 pub fn auth_messages(&self) -> u64 { 1824 self.tangle_auth_messages_total 1825 } 1826 1827 pub fn close_messages(&self) -> u64 { 1828 self.tangle_close_messages_total 1829 } 1830 1831 pub fn opened_subscriptions(&self) -> u64 { 1832 self.tangle_subscriptions_opened_total 1833 } 1834 1835 pub fn closed_subscriptions(&self) -> u64 { 1836 self.tangle_subscriptions_closed_total 1837 } 1838 1839 pub fn stored_event_offsets(&self) -> u64 { 1840 self.tangle_stored_event_offsets_total 1841 } 1842 1843 pub fn rate_limit_rejections(&self) -> u64 { 1844 self.tangle_rate_limit_rejections_total 1845 } 1846 } 1847 1848 impl TangleRuntimeMetrics { 1849 pub fn new() -> Self { 1850 Self { 1851 inner: Arc::new(TangleRuntimeMetricsInner { 1852 started_at: Instant::now(), 1853 active_sessions: AtomicUsize::new(0), 1854 total_sessions: AtomicU64::new(0), 1855 client_messages: AtomicU64::new(0), 1856 event_messages: AtomicU64::new(0), 1857 req_messages: AtomicU64::new(0), 1858 count_messages: AtomicU64::new(0), 1859 auth_messages: AtomicU64::new(0), 1860 close_messages: AtomicU64::new(0), 1861 opened_subscriptions: AtomicU64::new(0), 1862 closed_subscriptions: AtomicU64::new(0), 1863 stored_event_offsets: AtomicU64::new(0), 1864 rate_limit_rejections: AtomicU64::new(0), 1865 auth_successes: AtomicU64::new(0), 1866 auth_failures: AtomicU64::new(0), 1867 event_admissions: AtomicU64::new(0), 1868 event_rejections: AtomicU64::new(0), 1869 group_read_denials: AtomicU64::new(0), 1870 group_write_denials: AtomicU64::new(0), 1871 event_bus_receivers_current: AtomicUsize::new(0), 1872 event_bus_published_offsets: AtomicU64::new(0), 1873 event_bus_lagged_receivers: AtomicU64::new(0), 1874 event_bus_lagged_offsets: AtomicU64::new(0), 1875 outbound_queue_full_closes: AtomicU64::new(0), 1876 outbox_pending_events: AtomicUsize::new(0), 1877 outbox_replayed_events: AtomicU64::new(0), 1878 disk_used_bytes: AtomicU64::new(0), 1879 event_admission_latency_total_micros: AtomicU64::new(0), 1880 event_admission_latency_count: AtomicU64::new(0), 1881 query_latency_total_micros: AtomicU64::new(0), 1882 query_latency_count: AtomicU64::new(0), 1883 query_candidates_scanned: AtomicU64::new(0), 1884 query_returned_events: AtomicU64::new(0), 1885 query_redacted_events: AtomicU64::new(0), 1886 count_refusals: AtomicU64::new(0), 1887 broad_query_rejections: AtomicU64::new(0), 1888 }), 1889 } 1890 } 1891 1892 pub fn snapshot(&self) -> TangleRuntimeMetricsSnapshot { 1893 self.snapshot_with_readiness(false) 1894 } 1895 1896 pub fn snapshot_with_readiness(&self, readiness_ready: bool) -> TangleRuntimeMetricsSnapshot { 1897 TangleRuntimeMetricsSnapshot { 1898 tangle_runtime_uptime_seconds: self.started_at().elapsed().as_secs(), 1899 tangle_readiness_ready: readiness_ready, 1900 tangle_ws_connections_current: self.active_sessions(), 1901 tangle_ws_connections_total: self.total_sessions(), 1902 tangle_client_messages_total: self.client_messages(), 1903 tangle_event_messages_total: self.event_messages(), 1904 tangle_req_messages_total: self.req_messages(), 1905 tangle_count_messages_total: self.count_messages(), 1906 tangle_auth_messages_total: self.auth_messages(), 1907 tangle_close_messages_total: self.close_messages(), 1908 tangle_subscriptions_opened_total: self.opened_subscriptions(), 1909 tangle_subscriptions_closed_total: self.closed_subscriptions(), 1910 tangle_stored_event_offsets_total: self.stored_event_offsets(), 1911 tangle_rate_limit_rejections_total: self.rate_limit_rejections(), 1912 tangle_auth_success_total: self.auth_successes(), 1913 tangle_auth_failure_total: self.auth_failures(), 1914 tangle_event_admitted_total: self.event_admissions(), 1915 tangle_event_rejected_total: self.event_rejections(), 1916 tangle_group_read_denied_total: self.group_read_denials(), 1917 tangle_group_write_denied_total: self.group_write_denials(), 1918 tangle_event_bus_receivers_current: self.event_bus_receivers_current(), 1919 tangle_event_bus_published_offsets_total: self.event_bus_published_offsets(), 1920 tangle_event_bus_lagged_receivers_total: self.event_bus_lagged_receivers(), 1921 tangle_event_bus_lagged_offsets_total: self.event_bus_lagged_offsets(), 1922 tangle_outbound_queue_full_closes_total: self.outbound_queue_full_closes(), 1923 tangle_outbox_pending_events: self.outbox_pending_events(), 1924 tangle_outbox_replayed_events_total: self.outbox_replayed_events(), 1925 tangle_disk_used_bytes: self.disk_used_bytes(), 1926 tangle_event_admission_latency_total_micros: self 1927 .event_admission_latency_total_micros(), 1928 tangle_event_admission_latency_count: self.event_admission_latency_count(), 1929 tangle_query_latency_total_micros: self.query_latency_total_micros(), 1930 tangle_query_latency_count: self.query_latency_count(), 1931 tangle_query_candidates_scanned_total: self.query_candidates_scanned(), 1932 tangle_query_returned_events_total: self.query_returned_events(), 1933 tangle_query_redacted_events_total: self.query_redacted_events(), 1934 tangle_count_refusals_total: self.count_refusals(), 1935 tangle_broad_query_rejections_total: self.broad_query_rejections(), 1936 } 1937 } 1938 1939 pub fn started_at(&self) -> Instant { 1940 self.inner.started_at 1941 } 1942 1943 pub fn active_sessions(&self) -> usize { 1944 self.inner.active_sessions.load(Ordering::Relaxed) 1945 } 1946 1947 pub fn total_sessions(&self) -> u64 { 1948 self.inner.total_sessions.load(Ordering::Relaxed) 1949 } 1950 1951 pub fn client_messages(&self) -> u64 { 1952 self.inner.client_messages.load(Ordering::Relaxed) 1953 } 1954 1955 pub fn event_messages(&self) -> u64 { 1956 self.inner.event_messages.load(Ordering::Relaxed) 1957 } 1958 1959 pub fn req_messages(&self) -> u64 { 1960 self.inner.req_messages.load(Ordering::Relaxed) 1961 } 1962 1963 pub fn count_messages(&self) -> u64 { 1964 self.inner.count_messages.load(Ordering::Relaxed) 1965 } 1966 1967 pub fn auth_messages(&self) -> u64 { 1968 self.inner.auth_messages.load(Ordering::Relaxed) 1969 } 1970 1971 pub fn close_messages(&self) -> u64 { 1972 self.inner.close_messages.load(Ordering::Relaxed) 1973 } 1974 1975 pub fn opened_subscriptions(&self) -> u64 { 1976 self.inner.opened_subscriptions.load(Ordering::Relaxed) 1977 } 1978 1979 pub fn closed_subscriptions(&self) -> u64 { 1980 self.inner.closed_subscriptions.load(Ordering::Relaxed) 1981 } 1982 1983 pub fn stored_event_offsets(&self) -> u64 { 1984 self.inner.stored_event_offsets.load(Ordering::Relaxed) 1985 } 1986 1987 pub fn rate_limit_rejections(&self) -> u64 { 1988 self.inner.rate_limit_rejections.load(Ordering::Relaxed) 1989 } 1990 1991 pub fn auth_successes(&self) -> u64 { 1992 self.inner.auth_successes.load(Ordering::Relaxed) 1993 } 1994 1995 pub fn auth_failures(&self) -> u64 { 1996 self.inner.auth_failures.load(Ordering::Relaxed) 1997 } 1998 1999 pub fn event_admissions(&self) -> u64 { 2000 self.inner.event_admissions.load(Ordering::Relaxed) 2001 } 2002 2003 pub fn event_rejections(&self) -> u64 { 2004 self.inner.event_rejections.load(Ordering::Relaxed) 2005 } 2006 2007 pub fn group_read_denials(&self) -> u64 { 2008 self.inner.group_read_denials.load(Ordering::Relaxed) 2009 } 2010 2011 pub fn group_write_denials(&self) -> u64 { 2012 self.inner.group_write_denials.load(Ordering::Relaxed) 2013 } 2014 2015 pub fn event_bus_receivers_current(&self) -> usize { 2016 self.inner 2017 .event_bus_receivers_current 2018 .load(Ordering::Relaxed) 2019 } 2020 2021 pub fn event_bus_published_offsets(&self) -> u64 { 2022 self.inner 2023 .event_bus_published_offsets 2024 .load(Ordering::Relaxed) 2025 } 2026 2027 pub fn event_bus_lagged_receivers(&self) -> u64 { 2028 self.inner 2029 .event_bus_lagged_receivers 2030 .load(Ordering::Relaxed) 2031 } 2032 2033 pub fn event_bus_lagged_offsets(&self) -> u64 { 2034 self.inner.event_bus_lagged_offsets.load(Ordering::Relaxed) 2035 } 2036 2037 pub fn outbound_queue_full_closes(&self) -> u64 { 2038 self.inner 2039 .outbound_queue_full_closes 2040 .load(Ordering::Relaxed) 2041 } 2042 2043 pub fn outbox_pending_events(&self) -> usize { 2044 self.inner.outbox_pending_events.load(Ordering::Relaxed) 2045 } 2046 2047 pub fn outbox_replayed_events(&self) -> u64 { 2048 self.inner.outbox_replayed_events.load(Ordering::Relaxed) 2049 } 2050 2051 pub fn disk_used_bytes(&self) -> u64 { 2052 self.inner.disk_used_bytes.load(Ordering::Relaxed) 2053 } 2054 2055 pub fn event_admission_latency_total_micros(&self) -> u64 { 2056 self.inner 2057 .event_admission_latency_total_micros 2058 .load(Ordering::Relaxed) 2059 } 2060 2061 pub fn event_admission_latency_count(&self) -> u64 { 2062 self.inner 2063 .event_admission_latency_count 2064 .load(Ordering::Relaxed) 2065 } 2066 2067 pub fn query_latency_total_micros(&self) -> u64 { 2068 self.inner 2069 .query_latency_total_micros 2070 .load(Ordering::Relaxed) 2071 } 2072 2073 pub fn query_latency_count(&self) -> u64 { 2074 self.inner.query_latency_count.load(Ordering::Relaxed) 2075 } 2076 2077 pub fn query_candidates_scanned(&self) -> u64 { 2078 self.inner.query_candidates_scanned.load(Ordering::Relaxed) 2079 } 2080 2081 pub fn query_returned_events(&self) -> u64 { 2082 self.inner.query_returned_events.load(Ordering::Relaxed) 2083 } 2084 2085 pub fn query_redacted_events(&self) -> u64 { 2086 self.inner.query_redacted_events.load(Ordering::Relaxed) 2087 } 2088 2089 pub fn count_refusals(&self) -> u64 { 2090 self.inner.count_refusals.load(Ordering::Relaxed) 2091 } 2092 2093 pub fn broad_query_rejections(&self) -> u64 { 2094 self.inner.broad_query_rejections.load(Ordering::Relaxed) 2095 } 2096 2097 pub fn record_session_opened(&self) -> usize { 2098 self.inner.total_sessions.fetch_add(1, Ordering::Relaxed); 2099 self.inner.active_sessions.fetch_add(1, Ordering::Relaxed) + 1 2100 } 2101 2102 pub fn record_session_closed(&self) -> usize { 2103 let mut current = self.inner.active_sessions.load(Ordering::Relaxed); 2104 loop { 2105 if current == 0 { 2106 return 0; 2107 } 2108 match self.inner.active_sessions.compare_exchange( 2109 current, 2110 current - 1, 2111 Ordering::Relaxed, 2112 Ordering::Relaxed, 2113 ) { 2114 Ok(_) => return current - 1, 2115 Err(actual) => current = actual, 2116 } 2117 } 2118 } 2119 2120 pub fn record_client_message(&self, kind: TangleClientMessageMetricKind) -> u64 { 2121 let total = self.inner.client_messages.fetch_add(1, Ordering::Relaxed) + 1; 2122 match kind { 2123 TangleClientMessageMetricKind::Event => { 2124 self.inner.event_messages.fetch_add(1, Ordering::Relaxed); 2125 } 2126 TangleClientMessageMetricKind::Req => { 2127 self.inner.req_messages.fetch_add(1, Ordering::Relaxed); 2128 } 2129 TangleClientMessageMetricKind::Count => { 2130 self.inner.count_messages.fetch_add(1, Ordering::Relaxed); 2131 } 2132 TangleClientMessageMetricKind::Auth => { 2133 self.inner.auth_messages.fetch_add(1, Ordering::Relaxed); 2134 } 2135 TangleClientMessageMetricKind::Close => { 2136 self.inner.close_messages.fetch_add(1, Ordering::Relaxed); 2137 } 2138 TangleClientMessageMetricKind::Negentropy => {} 2139 }; 2140 total 2141 } 2142 2143 pub fn record_subscription_opened(&self) -> u64 { 2144 self.inner 2145 .opened_subscriptions 2146 .fetch_add(1, Ordering::Relaxed) 2147 + 1 2148 } 2149 2150 pub fn record_subscriptions_closed(&self, count: usize) -> u64 { 2151 self.inner.closed_subscriptions.fetch_add( 2152 u64::try_from(count).expect("subscription count fits in u64"), 2153 Ordering::Relaxed, 2154 ) + u64::try_from(count).expect("subscription count fits in u64") 2155 } 2156 2157 pub fn record_stored_event_offset(&self) -> u64 { 2158 self.inner 2159 .stored_event_offsets 2160 .fetch_add(1, Ordering::Relaxed) 2161 + 1 2162 } 2163 2164 pub fn record_rate_limit_rejection(&self) -> u64 { 2165 self.inner 2166 .rate_limit_rejections 2167 .fetch_add(1, Ordering::Relaxed) 2168 + 1 2169 } 2170 2171 pub fn record_auth_success(&self) -> u64 { 2172 self.inner.auth_successes.fetch_add(1, Ordering::Relaxed) + 1 2173 } 2174 2175 pub fn record_auth_failure(&self) -> u64 { 2176 self.inner.auth_failures.fetch_add(1, Ordering::Relaxed) + 1 2177 } 2178 2179 pub fn record_event_admission(&self) -> u64 { 2180 self.inner.event_admissions.fetch_add(1, Ordering::Relaxed) + 1 2181 } 2182 2183 pub fn record_event_rejection(&self) -> u64 { 2184 self.inner.event_rejections.fetch_add(1, Ordering::Relaxed) + 1 2185 } 2186 2187 pub fn record_group_read_denial(&self) -> u64 { 2188 self.inner 2189 .group_read_denials 2190 .fetch_add(1, Ordering::Relaxed) 2191 + 1 2192 } 2193 2194 pub fn record_group_write_denial(&self) -> u64 { 2195 self.inner 2196 .group_write_denials 2197 .fetch_add(1, Ordering::Relaxed) 2198 + 1 2199 } 2200 2201 pub fn record_event_bus_receivers(&self, count: usize) { 2202 self.inner 2203 .event_bus_receivers_current 2204 .store(count, Ordering::Relaxed); 2205 } 2206 2207 pub fn record_event_bus_publish(&self, receivers: usize) -> u64 { 2208 self.record_event_bus_receivers(receivers); 2209 self.inner 2210 .event_bus_published_offsets 2211 .fetch_add(1, Ordering::Relaxed) 2212 + 1 2213 } 2214 2215 pub fn record_event_bus_lagged(&self, skipped: u64) { 2216 self.inner 2217 .event_bus_lagged_receivers 2218 .fetch_add(1, Ordering::Relaxed); 2219 self.inner 2220 .event_bus_lagged_offsets 2221 .fetch_add(skipped, Ordering::Relaxed); 2222 } 2223 2224 pub fn record_outbound_queue_full_close(&self) -> u64 { 2225 self.inner 2226 .outbound_queue_full_closes 2227 .fetch_add(1, Ordering::Relaxed) 2228 + 1 2229 } 2230 2231 pub fn record_outbox_pending_events(&self, count: usize) { 2232 self.inner 2233 .outbox_pending_events 2234 .store(count, Ordering::Relaxed); 2235 } 2236 2237 pub fn record_outbox_replayed_event(&self) -> u64 { 2238 self.inner 2239 .outbox_replayed_events 2240 .fetch_add(1, Ordering::Relaxed) 2241 + 1 2242 } 2243 2244 pub fn record_disk_used_bytes(&self, bytes: u64) { 2245 self.inner.disk_used_bytes.store(bytes, Ordering::Relaxed); 2246 } 2247 2248 pub fn record_event_admission_latency(&self, micros: u64) { 2249 self.inner 2250 .event_admission_latency_total_micros 2251 .fetch_add(micros, Ordering::Relaxed); 2252 self.inner 2253 .event_admission_latency_count 2254 .fetch_add(1, Ordering::Relaxed); 2255 } 2256 2257 pub fn record_query_latency(&self, micros: u64) { 2258 self.inner 2259 .query_latency_total_micros 2260 .fetch_add(micros, Ordering::Relaxed); 2261 self.inner 2262 .query_latency_count 2263 .fetch_add(1, Ordering::Relaxed); 2264 } 2265 2266 pub(crate) fn record_query_metrics(&self, metrics: BaseRelayQueryMetrics) { 2267 self.inner 2268 .query_candidates_scanned 2269 .fetch_add(metrics.candidates_scanned(), Ordering::Relaxed); 2270 self.inner 2271 .query_returned_events 2272 .fetch_add(metrics.returned_events(), Ordering::Relaxed); 2273 self.inner 2274 .query_redacted_events 2275 .fetch_add(metrics.redacted_events(), Ordering::Relaxed); 2276 } 2277 2278 pub fn record_count_refusal(&self) -> u64 { 2279 self.inner.count_refusals.fetch_add(1, Ordering::Relaxed) + 1 2280 } 2281 2282 pub fn record_broad_query_rejection(&self) -> u64 { 2283 self.inner 2284 .broad_query_rejections 2285 .fetch_add(1, Ordering::Relaxed) 2286 + 1 2287 } 2288 } 2289 2290 impl Default for TangleRuntimeMetrics { 2291 fn default() -> Self { 2292 Self::new() 2293 } 2294 } 2295 2296 #[derive(Debug, Clone)] 2297 pub struct TangleShutdownSignal { 2298 sender: watch::Sender<bool>, 2299 } 2300 2301 impl TangleShutdownSignal { 2302 pub fn new() -> Self { 2303 let (sender, _) = watch::channel(false); 2304 Self { sender } 2305 } 2306 2307 pub fn subscribe(&self) -> watch::Receiver<bool> { 2308 self.sender.subscribe() 2309 } 2310 2311 pub fn request_shutdown(&self) { 2312 self.sender.send_replace(true); 2313 } 2314 2315 pub fn requested(&self) -> bool { 2316 *self.sender.borrow() 2317 } 2318 } 2319 2320 impl Default for TangleShutdownSignal { 2321 fn default() -> Self { 2322 Self::new() 2323 } 2324 } 2325 2326 #[cfg(test)] 2327 mod tests { 2328 use super::{ 2329 BROAD_QUERY_TIME_WINDOW_SECONDS, EventAdmissionDecision, RelayEventAdmissionContext, 2330 RelayEventStoredContext, RelayRuntime, RelayRuntimeHandle, RelayRuntimeHooks, 2331 RuntimeClientMessage, TangleBroadQueryReason, TangleClientRateLimitContext, 2332 TangleQueryClassification, TangleQueryClassifier, TangleRuntimeLimits, 2333 }; 2334 use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}; 2335 use crate::event_bus::{TangleEventBus, TangleEventReceiveError, TangleEventReceiver}; 2336 use crate::rate_limits::{TangleRateLimitKey, TangleRateLimitQueryClass, TangleRateLimitScope}; 2337 use crate::relay::auth::BaseAuthState; 2338 use crate::relay::core::{BaseRelayLimitSettings, BaseRelayLimits, BaseRelayQueryMetrics}; 2339 use crate::relay::live::LiveSubscriptionSet; 2340 use crate::relay::outbound::RuntimeRelayMessage; 2341 use serde_json::json; 2342 use std::{ 2343 collections::{BTreeMap, BTreeSet}, 2344 net::{IpAddr, Ipv4Addr}, 2345 path::{Path, PathBuf}, 2346 sync::{Arc, Mutex}, 2347 time::Duration, 2348 }; 2349 use tangle_crypto::RelaySigner; 2350 use tangle_groups::{ 2351 CanonicalGroupEvent, GroupEventClass, GroupId, GroupProjection, KIND_GROUP_ADMINS, 2352 KIND_GROUP_CREATE_GROUP, KIND_GROUP_DELETE_GROUP, KIND_GROUP_JOIN_REQUEST, 2353 KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, 2354 KIND_GROUP_REMOVE_USER, MemberStatus, StoreOffset, rebuild_group_projection, 2355 }; 2356 use tangle_protocol::{ 2357 ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SignatureHex, 2358 SubscriptionId, Tag, UnixTimestamp, UnsignedEvent, filter_from_value, 2359 }; 2360 use tangle_store_pocket::{ 2361 PocketEvent, PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketTime, 2362 }; 2363 use tangle_test_support::FixtureKey; 2364 2365 #[test] 2366 fn tangle_runtime_opens_owned_process_shell_from_config() { 2367 let root = temp_root("owned-runtime"); 2368 let _ = std::fs::remove_dir_all(&root); 2369 let config = runtime_config(&root, 8); 2370 2371 let mut runtime = RelayRuntime::open(config).expect("runtime"); 2372 let mut offsets = runtime.event_bus().subscribe(); 2373 let shutdown = runtime.shutdown_signal().subscribe(); 2374 2375 assert_eq!(runtime.config().relay_url(), "wss://relay.radroots.test"); 2376 assert_eq!(runtime.config().listen_addr().to_string(), "127.0.0.1:0"); 2377 assert_eq!(runtime.limits().max_pending_events(), 8); 2378 assert_eq!(runtime.limits().max_message_length(), 1_048_576); 2379 assert_eq!(runtime.limits().event_bus_capacity(), 16); 2380 assert_eq!(runtime.limits().outbound_queue_capacity(), 8); 2381 assert_eq!(runtime.event_bus().capacity(), 16); 2382 assert_eq!(runtime.event_bus().receiver_count(), 1); 2383 assert_eq!(runtime.rate_limiter().tracked_key_count(), 0); 2384 assert_eq!(runtime.metrics().active_sessions(), 0); 2385 assert_eq!(runtime.metrics().stored_event_offsets(), 0); 2386 assert!(runtime.relay().groups_enabled()); 2387 assert!(!runtime.readiness_state().is_ready()); 2388 assert_eq!( 2389 runtime.readiness_state().response().checks.server_bind, 2390 "not_ready" 2391 ); 2392 assert_eq!( 2393 runtime 2394 .readiness_state() 2395 .response() 2396 .checks 2397 .group_outbox_replay, 2398 "ready" 2399 ); 2400 assert_eq!( 2401 runtime.readiness_state().response().checks.event_bus, 2402 "ready" 2403 ); 2404 assert!(!*shutdown.borrow()); 2405 2406 assert_eq!(runtime.event_bus().publish(StoreOffset::new(42)), 1); 2407 assert_eq!(offsets.try_recv().expect("offset"), StoreOffset::new(42)); 2408 assert_eq!( 2409 runtime 2410 .auth_state() 2411 .expect("auth") 2412 .authenticated_pubkeys() 2413 .len(), 2414 0 2415 ); 2416 assert_eq!( 2417 runtime.config().pocket_config().data_directory(), 2418 Path::new(&root).join("pocket") 2419 ); 2420 2421 assert_eq!(runtime.metrics().record_session_opened(), 1); 2422 assert_eq!(runtime.metrics().active_sessions(), 1); 2423 assert_eq!(runtime.metrics().total_sessions(), 1); 2424 assert_eq!(runtime.metrics().record_session_closed(), 0); 2425 assert_eq!(runtime.metrics().active_sessions(), 0); 2426 assert_eq!(runtime.metrics().total_sessions(), 1); 2427 assert_eq!( 2428 runtime 2429 .metrics() 2430 .record_client_message(super::TangleClientMessageMetricKind::Req), 2431 1 2432 ); 2433 assert_eq!(runtime.metrics().client_messages(), 1); 2434 assert_eq!(runtime.metrics().req_messages(), 1); 2435 assert_eq!(runtime.metrics().record_subscription_opened(), 1); 2436 assert_eq!(runtime.metrics().opened_subscriptions(), 1); 2437 assert_eq!(runtime.metrics().record_subscriptions_closed(1), 1); 2438 assert_eq!(runtime.metrics().closed_subscriptions(), 1); 2439 assert_eq!(runtime.metrics().record_stored_event_offset(), 1); 2440 assert_eq!(runtime.metrics().stored_event_offsets(), 1); 2441 assert_eq!(runtime.metrics().record_rate_limit_rejection(), 1); 2442 assert_eq!(runtime.metrics().rate_limit_rejections(), 1); 2443 assert_eq!(runtime.metrics().record_auth_success(), 1); 2444 assert_eq!(runtime.metrics().record_auth_failure(), 1); 2445 assert_eq!(runtime.metrics().record_event_admission(), 1); 2446 assert_eq!(runtime.metrics().record_event_rejection(), 1); 2447 assert_eq!(runtime.metrics().record_group_read_denial(), 1); 2448 assert_eq!(runtime.metrics().record_group_write_denial(), 1); 2449 runtime.metrics().record_event_bus_receivers(3); 2450 assert_eq!(runtime.metrics().record_event_bus_publish(3), 1); 2451 runtime.metrics().record_event_bus_lagged(4); 2452 assert_eq!(runtime.metrics().record_outbound_queue_full_close(), 1); 2453 runtime.metrics().record_outbox_pending_events(2); 2454 assert_eq!(runtime.metrics().record_outbox_replayed_event(), 1); 2455 runtime.metrics().record_disk_used_bytes(5); 2456 runtime.metrics().record_event_admission_latency(13); 2457 runtime.metrics().record_query_latency(17); 2458 runtime 2459 .metrics() 2460 .record_query_metrics(BaseRelayQueryMetrics::new(5, 3, 2)); 2461 assert_eq!(runtime.metrics().record_count_refusal(), 1); 2462 assert_eq!(runtime.metrics().record_broad_query_rejection(), 1); 2463 let snapshot = runtime.metrics().snapshot_with_readiness(true); 2464 assert_eq!(snapshot.active_sessions(), 0); 2465 assert_eq!(snapshot.total_sessions(), 1); 2466 assert_eq!(snapshot.client_messages(), 1); 2467 assert_eq!(snapshot.req_messages(), 1); 2468 assert_eq!(snapshot.opened_subscriptions(), 1); 2469 assert_eq!(snapshot.closed_subscriptions(), 1); 2470 assert_eq!(snapshot.stored_event_offsets(), 1); 2471 assert_eq!(snapshot.rate_limit_rejections(), 1); 2472 let snapshot_value = serde_json::to_value(snapshot).expect("snapshot json"); 2473 assert_eq!(snapshot_value["tangle_readiness_ready"], true); 2474 assert_eq!(snapshot_value["tangle_auth_success_total"], 1); 2475 assert_eq!(snapshot_value["tangle_auth_failure_total"], 1); 2476 assert_eq!(snapshot_value["tangle_event_admitted_total"], 1); 2477 assert_eq!(snapshot_value["tangle_event_rejected_total"], 1); 2478 assert_eq!(snapshot_value["tangle_group_read_denied_total"], 1); 2479 assert_eq!(snapshot_value["tangle_group_write_denied_total"], 1); 2480 assert_eq!(snapshot_value["tangle_event_bus_receivers_current"], 3); 2481 assert_eq!( 2482 snapshot_value["tangle_event_bus_published_offsets_total"], 2483 1 2484 ); 2485 assert_eq!(snapshot_value["tangle_event_bus_lagged_receivers_total"], 1); 2486 assert_eq!(snapshot_value["tangle_event_bus_lagged_offsets_total"], 4); 2487 assert_eq!(snapshot_value["tangle_outbound_queue_full_closes_total"], 1); 2488 assert_eq!(snapshot_value["tangle_outbox_pending_events"], 2); 2489 assert_eq!(snapshot_value["tangle_outbox_replayed_events_total"], 1); 2490 assert_eq!(snapshot_value["tangle_disk_used_bytes"], 5); 2491 assert_eq!( 2492 snapshot_value["tangle_event_admission_latency_total_micros"], 2493 13 2494 ); 2495 assert_eq!(snapshot_value["tangle_event_admission_latency_count"], 1); 2496 assert_eq!(snapshot_value["tangle_query_latency_total_micros"], 17); 2497 assert_eq!(snapshot_value["tangle_query_latency_count"], 1); 2498 assert_eq!(snapshot_value["tangle_query_candidates_scanned_total"], 5); 2499 assert_eq!(snapshot_value["tangle_query_returned_events_total"], 3); 2500 assert_eq!(snapshot_value["tangle_query_redacted_events_total"], 2); 2501 assert_eq!(snapshot_value["tangle_count_refusals_total"], 1); 2502 assert_eq!(snapshot_value["tangle_broad_query_rejections_total"], 1); 2503 2504 let report = runtime.shutdown().expect("shutdown"); 2505 2506 assert_eq!(report.closed_subscriptions(), 0); 2507 assert!(runtime.shutdown_signal().requested()); 2508 assert!(*shutdown.borrow()); 2509 2510 let _ = std::fs::remove_dir_all(root); 2511 } 2512 2513 #[test] 2514 fn runtime_metrics_snapshot_serializes_tangle_contract_keys() { 2515 let metrics = super::TangleRuntimeMetrics::new(); 2516 metrics.record_session_opened(); 2517 metrics.record_auth_success(); 2518 metrics.record_event_admission(); 2519 metrics.record_event_bus_publish(1); 2520 metrics.record_disk_used_bytes(42); 2521 let snapshot = metrics.snapshot_with_readiness(true); 2522 let value = serde_json::to_value(snapshot).expect("snapshot"); 2523 2524 assert_eq!(value["tangle_readiness_ready"], true); 2525 assert_eq!(value["tangle_ws_connections_current"], 1); 2526 assert_eq!(value["tangle_auth_success_total"], 1); 2527 assert_eq!(value["tangle_event_admitted_total"], 1); 2528 assert_eq!(value["tangle_event_bus_published_offsets_total"], 1); 2529 assert_eq!(value["tangle_disk_used_bytes"], 42); 2530 assert_eq!(value["tangle_outbound_queue_full_closes_total"], 0); 2531 assert_eq!(value["tangle_query_candidates_scanned_total"], 0); 2532 assert_eq!(value["tangle_query_returned_events_total"], 0); 2533 assert_eq!(value["tangle_query_redacted_events_total"], 0); 2534 assert_eq!(value["tangle_count_refusals_total"], 0); 2535 assert_eq!(value["tangle_broad_query_rejections_total"], 0); 2536 assert!(value.get("active_sessions").is_none()); 2537 assert!(value.get("stored_event_offsets").is_none()); 2538 } 2539 2540 #[test] 2541 fn runtime_limits_and_event_bus_reject_zero_capacity() { 2542 assert!(TangleRuntimeLimits::new(0, runtime_relay_limits(1), 1, 1).is_err()); 2543 assert!(TangleRuntimeLimits::new(1, runtime_relay_limits(1), 0, 1).is_err()); 2544 assert!(TangleRuntimeLimits::new(1, runtime_relay_limits(1), 1, 0).is_err()); 2545 assert!(TangleEventBus::new(0).is_err()); 2546 } 2547 2548 #[tokio::test] 2549 async fn runtime_publishes_stored_event_offsets_for_live_fanout() { 2550 let root = temp_root("runtime-offset-fanout"); 2551 let _ = std::fs::remove_dir_all(&root); 2552 let handle = 2553 RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime")); 2554 let mut offsets = handle.subscribe_events().await; 2555 let mut auth = handle.auth_state().await.expect("auth"); 2556 let mut subscriptions = LiveSubscriptionSet::new(8, 64).expect("subscriptions"); 2557 let subscription_id = SubscriptionId::new("live-offset").expect("subscription"); 2558 subscriptions 2559 .subscribe( 2560 subscription_id.clone(), 2561 vec![pocket_filter(json!({"kinds":[1]}))], 2562 ) 2563 .expect("subscribe"); 2564 let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "live") 2565 .expect("event"); 2566 2567 assert_eq!( 2568 handle 2569 .handle_protocol_client_message_for_test( 2570 ClientMessage::Event(event.clone()), 2571 &mut auth, 2572 UnixTimestamp::new(1_714_124_433) 2573 ) 2574 .await 2575 .expect("event"), 2576 vec![RelayMessage::Ok { 2577 event_id: event.id().clone(), 2578 accepted: true, 2579 message: String::new() 2580 }] 2581 ); 2582 let offset = offsets.try_recv().expect("offset"); 2583 assert!(matches!( 2584 handle 2585 .fanout_event_offset(offset, &mut subscriptions, &auth) 2586 .await 2587 .expect("fanout") 2588 .as_slice(), 2589 [RuntimeRelayMessage::Event { 2590 subscription_id: delivered, 2591 event: found 2592 }] if delivered == &subscription_id && found.id().as_hex_string() == event.id().as_str() 2593 )); 2594 2595 assert_eq!( 2596 handle 2597 .handle_protocol_client_message_for_test( 2598 ClientMessage::Event(event.clone()), 2599 &mut auth, 2600 UnixTimestamp::new(1_714_124_434) 2601 ) 2602 .await 2603 .expect("duplicate"), 2604 vec![RelayMessage::Ok { 2605 event_id: event.id().clone(), 2606 accepted: true, 2607 message: "duplicate: already have this event".to_owned() 2608 }] 2609 ); 2610 assert_eq!( 2611 offsets.try_recv().expect_err("no duplicate offset"), 2612 TangleEventReceiveError::Empty 2613 ); 2614 let snapshot = handle.metrics().snapshot(); 2615 assert_eq!(snapshot.client_messages(), 2); 2616 assert_eq!(snapshot.event_messages(), 2); 2617 assert_eq!(snapshot.stored_event_offsets(), 1); 2618 assert_eq!(handle.metrics().event_admissions(), 2); 2619 assert_eq!(handle.metrics().event_bus_receivers_current(), 1); 2620 assert_eq!(handle.metrics().event_bus_published_offsets(), 1); 2621 assert_eq!(handle.metrics().event_admission_latency_count(), 2); 2622 2623 let _ = std::fs::remove_dir_all(root); 2624 } 2625 2626 #[tokio::test] 2627 async fn runtime_hooks_reject_events_and_observe_stored_offsets() { 2628 let root = temp_root("runtime-hooks"); 2629 let _ = std::fs::remove_dir_all(&root); 2630 let hooks = Arc::new(RecordingHooks::default()); 2631 let handle = RelayRuntimeHandle::new( 2632 RelayRuntime::open_with_hooks(runtime_config(&root, 8), hooks.clone()) 2633 .expect("runtime"), 2634 ); 2635 let mut offsets = handle.subscribe_events().await; 2636 let mut auth = handle.auth_state().await.expect("auth"); 2637 let rejected = tangle_v2_event( 2638 FixtureKey::Member, 2639 1_714_124_433, 2640 1, 2641 vec![Tag::from_parts("policy", &["reject"]).expect("policy")], 2642 "rejected", 2643 ) 2644 .expect("rejected event"); 2645 2646 assert_eq!( 2647 handle 2648 .handle_protocol_client_message_for_test( 2649 ClientMessage::Event(rejected.clone()), 2650 &mut auth, 2651 UnixTimestamp::new(1_714_124_433) 2652 ) 2653 .await 2654 .expect("rejected"), 2655 vec![RelayMessage::Ok { 2656 event_id: rejected.id().clone(), 2657 accepted: false, 2658 message: "restricted: hook rejected event".to_owned() 2659 }] 2660 ); 2661 assert_eq!( 2662 offsets.try_recv().expect_err("no rejected offset"), 2663 TangleEventReceiveError::Empty 2664 ); 2665 2666 let accepted = tangle_v2_event( 2667 FixtureKey::Member, 2668 1_714_124_434, 2669 1, 2670 vec![Tag::from_parts("policy", &["accept"]).expect("policy")], 2671 "accepted", 2672 ) 2673 .expect("accepted event"); 2674 2675 assert_eq!( 2676 handle 2677 .handle_protocol_client_message_for_test( 2678 ClientMessage::Event(accepted.clone()), 2679 &mut auth, 2680 UnixTimestamp::new(1_714_124_434) 2681 ) 2682 .await 2683 .expect("accepted"), 2684 vec![RelayMessage::Ok { 2685 event_id: accepted.id().clone(), 2686 accepted: true, 2687 message: String::new() 2688 }] 2689 ); 2690 assert!(offsets.try_recv().is_ok()); 2691 let admissions = hooks.admissions.lock().expect("admissions"); 2692 assert_eq!(admissions.len(), 2); 2693 assert_eq!(admissions[0].event().event_id(), rejected.id().as_str()); 2694 assert_eq!(admissions[0].event().created_at(), 1_714_124_433); 2695 assert_eq!(admissions[1].event().event_id(), accepted.id().as_str()); 2696 assert_eq!(admissions[1].event().created_at(), 1_714_124_434); 2697 drop(admissions); 2698 let stored = hooks.stored.lock().expect("stored"); 2699 assert_eq!(stored.len(), 1); 2700 assert_eq!(stored[0].event().event_id(), accepted.id().as_str()); 2701 assert_eq!(stored[0].event().created_at(), 1_714_124_434); 2702 assert_eq!(stored[0].store_offsets().len(), 1); 2703 2704 let _ = std::fs::remove_dir_all(root); 2705 } 2706 2707 #[tokio::test] 2708 async fn runtime_rate_limits_event_pubkeys_before_storage() { 2709 let root = temp_root("runtime-event-rate-limit"); 2710 let _ = std::fs::remove_dir_all(&root); 2711 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 2712 let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "limited") 2713 .expect("event"); 2714 let rule = runtime.config().rate_limits().event().per_pubkey(); 2715 let key = TangleRateLimitKey::pubkey( 2716 TangleRateLimitScope::Event, 2717 event.unsigned().pubkey().clone(), 2718 ); 2719 for _ in 0..rule.max_hits() { 2720 runtime 2721 .rate_limiter() 2722 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 2723 } 2724 let handle = RelayRuntimeHandle::new(runtime); 2725 let mut auth = handle.auth_state().await.expect("auth"); 2726 2727 assert_eq!( 2728 handle 2729 .handle_protocol_client_message_for_test( 2730 ClientMessage::Event(event.clone()), 2731 &mut auth, 2732 UnixTimestamp::new(1_714_124_433) 2733 ) 2734 .await 2735 .expect("event"), 2736 vec![RelayMessage::Ok { 2737 event_id: event.id().clone(), 2738 accepted: false, 2739 message: "rate-limited: event pubkey rate limit exceeded until 1714124493" 2740 .to_owned() 2741 }] 2742 ); 2743 2744 let _ = std::fs::remove_dir_all(root); 2745 } 2746 2747 #[tokio::test] 2748 async fn runtime_rate_limits_event_kinds_before_storage() { 2749 let root = temp_root("runtime-event-kind-rate-limit"); 2750 let _ = std::fs::remove_dir_all(&root); 2751 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 2752 let event = tangle_v2_event(FixtureKey::Admin, 1_714_124_433, 1, Vec::new(), "limited") 2753 .expect("event"); 2754 let rule = runtime.config().rate_limits().event().per_kind(); 2755 let key = TangleRateLimitKey::kind(TangleRateLimitScope::Event, event.unsigned().kind()); 2756 for _ in 0..rule.max_hits() { 2757 runtime 2758 .rate_limiter() 2759 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 2760 } 2761 let handle = RelayRuntimeHandle::new(runtime); 2762 let mut auth = handle.auth_state().await.expect("auth"); 2763 2764 assert_eq!( 2765 handle 2766 .handle_protocol_client_message_for_test( 2767 ClientMessage::Event(event.clone()), 2768 &mut auth, 2769 UnixTimestamp::new(1_714_124_433) 2770 ) 2771 .await 2772 .expect("event"), 2773 vec![RelayMessage::Ok { 2774 event_id: event.id().clone(), 2775 accepted: false, 2776 message: "rate-limited: event kind rate limit exceeded until 1714124493".to_owned() 2777 }] 2778 ); 2779 2780 let _ = std::fs::remove_dir_all(root); 2781 } 2782 2783 #[tokio::test] 2784 async fn runtime_rate_limits_event_peer_ips_partition_peers_and_precede_identity_keys() { 2785 let root = temp_root("runtime-event-ip-rate-limit"); 2786 let _ = std::fs::remove_dir_all(&root); 2787 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 2788 let rule = runtime.config().rate_limits().event().per_ip(); 2789 let saturated_peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 20)); 2790 let other_peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 21)); 2791 let key = TangleRateLimitKey::ip(TangleRateLimitScope::Event, saturated_peer_ip); 2792 for _ in 0..rule.max_hits() { 2793 runtime 2794 .rate_limiter() 2795 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 2796 } 2797 let limited_event = 2798 tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "limited") 2799 .expect("limited event"); 2800 let rotated_event = 2801 tangle_v2_event(FixtureKey::Admin, 1_714_124_434, 2, Vec::new(), "rotated") 2802 .expect("rotated event"); 2803 let allowed_event = 2804 tangle_v2_event(FixtureKey::Owner, 1_714_124_435, 2, Vec::new(), "allowed") 2805 .expect("allowed event"); 2806 let handle = RelayRuntimeHandle::new(runtime); 2807 let mut auth = handle.auth_state().await.expect("auth"); 2808 2809 assert_eq!( 2810 handle 2811 .handle_protocol_client_message_with_rate_limit_context_for_test( 2812 ClientMessage::Event(limited_event.clone()), 2813 &mut auth, 2814 TangleClientRateLimitContext::new(Some(saturated_peer_ip), None), 2815 UnixTimestamp::new(1_714_124_433) 2816 ) 2817 .await 2818 .expect("event"), 2819 vec![RelayMessage::Ok { 2820 event_id: limited_event.id().clone(), 2821 accepted: false, 2822 message: "rate-limited: event ip rate limit exceeded until 1714124493".to_owned() 2823 }] 2824 ); 2825 assert_eq!( 2826 handle 2827 .handle_protocol_client_message_with_rate_limit_context_for_test( 2828 ClientMessage::Event(rotated_event.clone()), 2829 &mut auth, 2830 TangleClientRateLimitContext::new(Some(saturated_peer_ip), None), 2831 UnixTimestamp::new(1_714_124_433) 2832 ) 2833 .await 2834 .expect("event"), 2835 vec![RelayMessage::Ok { 2836 event_id: rotated_event.id().clone(), 2837 accepted: false, 2838 message: "rate-limited: event ip rate limit exceeded until 1714124493".to_owned() 2839 }] 2840 ); 2841 assert_eq!( 2842 handle 2843 .handle_protocol_client_message_with_rate_limit_context_for_test( 2844 ClientMessage::Event(allowed_event.clone()), 2845 &mut auth, 2846 TangleClientRateLimitContext::new(Some(other_peer_ip), None), 2847 UnixTimestamp::new(1_714_124_433) 2848 ) 2849 .await 2850 .expect("event"), 2851 vec![RelayMessage::Ok { 2852 event_id: allowed_event.id().clone(), 2853 accepted: true, 2854 message: String::new() 2855 }] 2856 ); 2857 assert_eq!(handle.metrics().rate_limit_rejections(), 2); 2858 assert_eq!(handle.metrics().event_rejections(), 2); 2859 assert_eq!(handle.metrics().event_admissions(), 1); 2860 2861 let _ = std::fs::remove_dir_all(root); 2862 } 2863 2864 #[tokio::test] 2865 async fn runtime_rate_limits_auth_pubkeys_before_authentication() { 2866 let root = temp_root("runtime-auth-pubkey-rate-limit"); 2867 let _ = std::fs::remove_dir_all(&root); 2868 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 2869 let auth_event = 2870 tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 120).expect("auth event"); 2871 let rule = runtime.config().rate_limits().auth().per_pubkey(); 2872 let key = TangleRateLimitKey::pubkey( 2873 TangleRateLimitScope::Auth, 2874 auth_event.unsigned().pubkey().clone(), 2875 ); 2876 for _ in 0..rule.max_hits() { 2877 runtime 2878 .rate_limiter() 2879 .record(key.clone(), rule, UnixTimestamp::new(120)); 2880 } 2881 let handle = RelayRuntimeHandle::new(runtime); 2882 let mut auth = handle.auth_state().await.expect("auth"); 2883 auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) 2884 .expect("challenge"); 2885 2886 assert_eq!( 2887 handle 2888 .handle_protocol_client_message_for_test( 2889 ClientMessage::Auth(auth_event.clone()), 2890 &mut auth, 2891 UnixTimestamp::new(120) 2892 ) 2893 .await 2894 .expect("auth"), 2895 vec![RelayMessage::Ok { 2896 event_id: auth_event.id().clone(), 2897 accepted: false, 2898 message: "rate-limited: auth pubkey rate limit exceeded until 180".to_owned() 2899 }] 2900 ); 2901 assert!(auth.authenticated_pubkeys().is_empty()); 2902 2903 let _ = std::fs::remove_dir_all(root); 2904 } 2905 2906 #[tokio::test] 2907 async fn runtime_rate_limits_auth_peer_ips_before_authentication() { 2908 let root = temp_root("runtime-auth-ip-rate-limit"); 2909 let _ = std::fs::remove_dir_all(&root); 2910 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 2911 let auth_event = 2912 tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 120).expect("auth event"); 2913 let rule = runtime.config().rate_limits().auth().per_ip(); 2914 let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 30)); 2915 let key = TangleRateLimitKey::ip(TangleRateLimitScope::Auth, peer_ip); 2916 for _ in 0..rule.max_hits() { 2917 runtime 2918 .rate_limiter() 2919 .record(key.clone(), rule, UnixTimestamp::new(120)); 2920 } 2921 let handle = RelayRuntimeHandle::new(runtime); 2922 let mut auth = handle.auth_state().await.expect("auth"); 2923 auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) 2924 .expect("challenge"); 2925 2926 assert_eq!( 2927 handle 2928 .handle_protocol_client_message_with_rate_limit_context_for_test( 2929 ClientMessage::Auth(auth_event.clone()), 2930 &mut auth, 2931 TangleClientRateLimitContext::new(Some(peer_ip), None), 2932 UnixTimestamp::new(120) 2933 ) 2934 .await 2935 .expect("auth"), 2936 vec![RelayMessage::Ok { 2937 event_id: auth_event.id().clone(), 2938 accepted: false, 2939 message: "rate-limited: auth ip rate limit exceeded until 180".to_owned() 2940 }] 2941 ); 2942 assert!(auth.authenticated_pubkeys().is_empty()); 2943 assert_eq!(handle.metrics().rate_limit_rejections(), 1); 2944 assert_eq!(handle.metrics().auth_failures(), 1); 2945 2946 let _ = std::fs::remove_dir_all(root); 2947 } 2948 2949 #[tokio::test] 2950 async fn runtime_rate_limits_auth_failures() { 2951 let root = temp_root("runtime-auth-failure-rate-limit"); 2952 let _ = std::fs::remove_dir_all(&root); 2953 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 2954 let auth_event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 22_242, Vec::new(), "") 2955 .expect("auth event"); 2956 let key = 2957 TangleRateLimitKey::auth_failure(None, Some(auth_event.unsigned().pubkey().clone())); 2958 let rule = runtime.config().rate_limits().auth().failures(); 2959 for _ in 0..rule.max_hits() { 2960 runtime 2961 .rate_limiter() 2962 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 2963 } 2964 let handle = RelayRuntimeHandle::new(runtime); 2965 let mut auth = handle.auth_state().await.expect("auth"); 2966 2967 assert_eq!( 2968 handle 2969 .handle_protocol_client_message_for_test( 2970 ClientMessage::Auth(auth_event.clone()), 2971 &mut auth, 2972 UnixTimestamp::new(1_714_124_433) 2973 ) 2974 .await 2975 .expect("auth"), 2976 vec![RelayMessage::Ok { 2977 event_id: auth_event.id().clone(), 2978 accepted: false, 2979 message: "rate-limited: auth failure rate limit exceeded until 1714124733" 2980 .to_owned() 2981 }] 2982 ); 2983 2984 let _ = std::fs::remove_dir_all(root); 2985 } 2986 2987 #[tokio::test] 2988 async fn runtime_rate_limits_auth_failures_by_peer_ip() { 2989 let root = temp_root("runtime-auth-failure-ip-rate-limit"); 2990 let _ = std::fs::remove_dir_all(&root); 2991 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 2992 let auth_event = tangle_v2_event(FixtureKey::Admin, 1_714_124_433, 22_242, Vec::new(), "") 2993 .expect("auth event"); 2994 let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 31)); 2995 let key = TangleRateLimitKey::auth_failure(Some(peer_ip), None); 2996 let rule = runtime.config().rate_limits().auth().failures_per_ip(); 2997 for _ in 0..rule.max_hits() { 2998 runtime 2999 .rate_limiter() 3000 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3001 } 3002 let handle = RelayRuntimeHandle::new(runtime); 3003 let mut auth = handle.auth_state().await.expect("auth"); 3004 3005 assert_eq!( 3006 handle 3007 .handle_protocol_client_message_with_rate_limit_context_for_test( 3008 ClientMessage::Auth(auth_event.clone()), 3009 &mut auth, 3010 TangleClientRateLimitContext::new(Some(peer_ip), None), 3011 UnixTimestamp::new(1_714_124_433) 3012 ) 3013 .await 3014 .expect("auth"), 3015 vec![RelayMessage::Ok { 3016 event_id: auth_event.id().clone(), 3017 accepted: false, 3018 message: "rate-limited: auth failure ip rate limit exceeded until 1714124733" 3019 .to_owned() 3020 }] 3021 ); 3022 assert_eq!(handle.metrics().rate_limit_rejections(), 1); 3023 assert_eq!(handle.metrics().auth_failures(), 1); 3024 3025 let _ = std::fs::remove_dir_all(root); 3026 } 3027 3028 #[tokio::test] 3029 async fn runtime_preserves_chorus_auth_failure_rate_limit_parity() { 3030 let root = temp_root("runtime-chorus-auth-rate-limit-parity"); 3031 let _ = std::fs::remove_dir_all(&root); 3032 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3033 let pubkey_event = 3034 tangle_v2_event(FixtureKey::Member, 1_714_124_433, 22_242, Vec::new(), "") 3035 .expect("pubkey auth event"); 3036 let pubkey_rule = runtime.config().rate_limits().auth().failures(); 3037 let pubkey_key = 3038 TangleRateLimitKey::auth_failure(None, Some(pubkey_event.unsigned().pubkey().clone())); 3039 for _ in 0..pubkey_rule.max_hits() { 3040 runtime.rate_limiter().record( 3041 pubkey_key.clone(), 3042 pubkey_rule, 3043 UnixTimestamp::new(1_714_124_433), 3044 ); 3045 } 3046 let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 41)); 3047 let peer_event = tangle_v2_event(FixtureKey::Admin, 1_714_124_434, 22_242, Vec::new(), "") 3048 .expect("peer auth event"); 3049 let peer_rule = runtime.config().rate_limits().auth().failures_per_ip(); 3050 let peer_key = TangleRateLimitKey::auth_failure(Some(peer_ip), None); 3051 for _ in 0..peer_rule.max_hits() { 3052 runtime.rate_limiter().record( 3053 peer_key.clone(), 3054 peer_rule, 3055 UnixTimestamp::new(1_714_124_434), 3056 ); 3057 } 3058 let handle = RelayRuntimeHandle::new(runtime); 3059 let mut auth = handle.auth_state().await.expect("auth"); 3060 3061 assert_eq!( 3062 handle 3063 .handle_protocol_client_message_for_test( 3064 ClientMessage::Auth(pubkey_event.clone()), 3065 &mut auth, 3066 UnixTimestamp::new(1_714_124_433) 3067 ) 3068 .await 3069 .expect("pubkey failure"), 3070 vec![RelayMessage::Ok { 3071 event_id: pubkey_event.id().clone(), 3072 accepted: false, 3073 message: "rate-limited: auth failure rate limit exceeded until 1714124733" 3074 .to_owned() 3075 }] 3076 ); 3077 assert_eq!( 3078 handle 3079 .handle_protocol_client_message_with_rate_limit_context_for_test( 3080 ClientMessage::Auth(peer_event.clone()), 3081 &mut auth, 3082 TangleClientRateLimitContext::new(Some(peer_ip), None), 3083 UnixTimestamp::new(1_714_124_434) 3084 ) 3085 .await 3086 .expect("peer failure"), 3087 vec![RelayMessage::Ok { 3088 event_id: peer_event.id().clone(), 3089 accepted: false, 3090 message: "rate-limited: auth failure ip rate limit exceeded until 1714124734" 3091 .to_owned() 3092 }] 3093 ); 3094 assert!(auth.authenticated_pubkeys().is_empty()); 3095 let snapshot = handle.metrics().snapshot(); 3096 assert_eq!(snapshot.client_messages(), 2); 3097 assert_eq!(snapshot.auth_messages(), 2); 3098 assert_eq!(snapshot.rate_limit_rejections(), 2); 3099 assert_eq!(handle.metrics().auth_successes(), 0); 3100 assert_eq!(handle.metrics().auth_failures(), 2); 3101 3102 let _ = std::fs::remove_dir_all(root); 3103 } 3104 3105 #[tokio::test] 3106 async fn runtime_rate_limits_group_writes_by_pubkey() { 3107 let root = temp_root("runtime-group-pubkey-rate-limit"); 3108 let _ = std::fs::remove_dir_all(&root); 3109 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3110 let event = tangle_v2_event( 3111 FixtureKey::Member, 3112 1_714_124_433, 3113 1, 3114 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 3115 "limited", 3116 ) 3117 .expect("event"); 3118 let rule = runtime.config().rate_limits().group().write_per_pubkey(); 3119 let key = TangleRateLimitKey::pubkey( 3120 TangleRateLimitScope::GroupWrite, 3121 event.unsigned().pubkey().clone(), 3122 ); 3123 for _ in 0..rule.max_hits() { 3124 runtime 3125 .rate_limiter() 3126 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3127 } 3128 let handle = RelayRuntimeHandle::new(runtime); 3129 let mut auth = handle.auth_state().await.expect("auth"); 3130 3131 assert_eq!( 3132 handle 3133 .handle_protocol_client_message_for_test( 3134 ClientMessage::Event(event.clone()), 3135 &mut auth, 3136 UnixTimestamp::new(1_714_124_433) 3137 ) 3138 .await 3139 .expect("event"), 3140 vec![RelayMessage::Ok { 3141 event_id: event.id().clone(), 3142 accepted: false, 3143 message: "rate-limited: group pubkey rate limit exceeded until 1714124493" 3144 .to_owned() 3145 }] 3146 ); 3147 3148 let _ = std::fs::remove_dir_all(root); 3149 } 3150 3151 #[tokio::test] 3152 async fn runtime_rate_limits_group_writes_by_peer_ip() { 3153 let root = temp_root("runtime-group-ip-rate-limit"); 3154 let _ = std::fs::remove_dir_all(&root); 3155 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3156 let event = tangle_v2_event( 3157 FixtureKey::Member, 3158 1_714_124_433, 3159 1, 3160 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 3161 "limited", 3162 ) 3163 .expect("event"); 3164 let rule = runtime.config().rate_limits().group().write_per_ip(); 3165 let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 40)); 3166 let key = TangleRateLimitKey::ip(TangleRateLimitScope::GroupWrite, peer_ip); 3167 for _ in 0..rule.max_hits() { 3168 runtime 3169 .rate_limiter() 3170 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3171 } 3172 let handle = RelayRuntimeHandle::new(runtime); 3173 let mut auth = handle.auth_state().await.expect("auth"); 3174 3175 assert_eq!( 3176 handle 3177 .handle_protocol_client_message_with_rate_limit_context_for_test( 3178 ClientMessage::Event(event.clone()), 3179 &mut auth, 3180 TangleClientRateLimitContext::new(Some(peer_ip), None), 3181 UnixTimestamp::new(1_714_124_433) 3182 ) 3183 .await 3184 .expect("event"), 3185 vec![RelayMessage::Ok { 3186 event_id: event.id().clone(), 3187 accepted: false, 3188 message: "rate-limited: group ip rate limit exceeded until 1714124493".to_owned() 3189 }] 3190 ); 3191 assert_eq!(handle.metrics().rate_limit_rejections(), 1); 3192 assert_eq!(handle.metrics().event_rejections(), 1); 3193 assert_eq!(handle.metrics().group_write_denials(), 1); 3194 3195 let _ = std::fs::remove_dir_all(root); 3196 } 3197 3198 #[tokio::test] 3199 async fn runtime_rate_limits_group_writes_by_group_id() { 3200 let root = temp_root("runtime-group-write-rate-limit"); 3201 let _ = std::fs::remove_dir_all(&root); 3202 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3203 let group_id = GroupId::new("Farm").expect("group"); 3204 let event = tangle_v2_event( 3205 FixtureKey::Member, 3206 1_714_124_433, 3207 1, 3208 vec![Tag::from_parts("h", &[group_id.as_str()]).expect("h")], 3209 "limited", 3210 ) 3211 .expect("event"); 3212 let rule = runtime.config().rate_limits().group().write_per_group(); 3213 let key = TangleRateLimitKey::group(TangleRateLimitScope::GroupWrite, group_id); 3214 for _ in 0..rule.max_hits() { 3215 runtime 3216 .rate_limiter() 3217 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3218 } 3219 let handle = RelayRuntimeHandle::new(runtime); 3220 let mut auth = handle.auth_state().await.expect("auth"); 3221 3222 assert_eq!( 3223 handle 3224 .handle_protocol_client_message_for_test( 3225 ClientMessage::Event(event.clone()), 3226 &mut auth, 3227 UnixTimestamp::new(1_714_124_433) 3228 ) 3229 .await 3230 .expect("event"), 3231 vec![RelayMessage::Ok { 3232 event_id: event.id().clone(), 3233 accepted: false, 3234 message: "rate-limited: group write rate limit exceeded until 1714124493" 3235 .to_owned() 3236 }] 3237 ); 3238 3239 let _ = std::fs::remove_dir_all(root); 3240 } 3241 3242 #[tokio::test] 3243 async fn runtime_rate_limits_group_writes_by_kind() { 3244 let root = temp_root("runtime-group-kind-rate-limit"); 3245 let _ = std::fs::remove_dir_all(&root); 3246 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3247 let event = tangle_v2_event( 3248 FixtureKey::Member, 3249 1_714_124_433, 3250 1, 3251 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 3252 "limited", 3253 ) 3254 .expect("event"); 3255 let rule = runtime.config().rate_limits().group().write_per_kind(); 3256 let key = 3257 TangleRateLimitKey::kind(TangleRateLimitScope::GroupWrite, event.unsigned().kind()); 3258 for _ in 0..rule.max_hits() { 3259 runtime 3260 .rate_limiter() 3261 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3262 } 3263 let handle = RelayRuntimeHandle::new(runtime); 3264 let mut auth = handle.auth_state().await.expect("auth"); 3265 3266 assert_eq!( 3267 handle 3268 .handle_protocol_client_message_for_test( 3269 ClientMessage::Event(event.clone()), 3270 &mut auth, 3271 UnixTimestamp::new(1_714_124_433) 3272 ) 3273 .await 3274 .expect("event"), 3275 vec![RelayMessage::Ok { 3276 event_id: event.id().clone(), 3277 accepted: false, 3278 message: "rate-limited: group kind rate limit exceeded until 1714124493".to_owned() 3279 }] 3280 ); 3281 3282 let _ = std::fs::remove_dir_all(root); 3283 } 3284 3285 #[tokio::test] 3286 async fn runtime_rate_limits_group_join_flows() { 3287 let root = temp_root("runtime-group-join-rate-limit"); 3288 let _ = std::fs::remove_dir_all(&root); 3289 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3290 let group_id = GroupId::new("Farm").expect("group"); 3291 let event = tangle_v2_event( 3292 FixtureKey::Member, 3293 1_714_124_433, 3294 KIND_GROUP_JOIN_REQUEST.into(), 3295 vec![Tag::from_parts("h", &[group_id.as_str()]).expect("h")], 3296 "", 3297 ) 3298 .expect("event"); 3299 let rule = runtime.config().rate_limits().group().join_flow(); 3300 let key = TangleRateLimitKey::join_flow(group_id, event.unsigned().pubkey().clone()); 3301 for _ in 0..rule.max_hits() { 3302 runtime 3303 .rate_limiter() 3304 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3305 } 3306 let handle = RelayRuntimeHandle::new(runtime); 3307 let mut auth = handle.auth_state().await.expect("auth"); 3308 3309 assert_eq!( 3310 handle 3311 .handle_protocol_client_message_for_test( 3312 ClientMessage::Event(event.clone()), 3313 &mut auth, 3314 UnixTimestamp::new(1_714_124_433) 3315 ) 3316 .await 3317 .expect("event"), 3318 vec![RelayMessage::Ok { 3319 event_id: event.id().clone(), 3320 accepted: false, 3321 message: "rate-limited: group join rate limit exceeded until 1714124733".to_owned() 3322 }] 3323 ); 3324 3325 let _ = std::fs::remove_dir_all(root); 3326 } 3327 3328 #[tokio::test] 3329 async fn runtime_rate_limits_group_join_flows_by_peer_ip() { 3330 let root = temp_root("runtime-group-join-ip-rate-limit"); 3331 let _ = std::fs::remove_dir_all(&root); 3332 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3333 let group_id = GroupId::new("Farm").expect("group"); 3334 let event = tangle_v2_event( 3335 FixtureKey::Member, 3336 1_714_124_433, 3337 KIND_GROUP_JOIN_REQUEST.into(), 3338 vec![Tag::from_parts("h", &[group_id.as_str()]).expect("h")], 3339 "", 3340 ) 3341 .expect("event"); 3342 let rule = runtime.config().rate_limits().group().join_flow_per_ip(); 3343 let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 41)); 3344 let key = TangleRateLimitKey::join_flow_ip(group_id, peer_ip); 3345 for _ in 0..rule.max_hits() { 3346 runtime 3347 .rate_limiter() 3348 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3349 } 3350 let handle = RelayRuntimeHandle::new(runtime); 3351 let mut auth = handle.auth_state().await.expect("auth"); 3352 3353 assert_eq!( 3354 handle 3355 .handle_protocol_client_message_with_rate_limit_context_for_test( 3356 ClientMessage::Event(event.clone()), 3357 &mut auth, 3358 TangleClientRateLimitContext::new(Some(peer_ip), None), 3359 UnixTimestamp::new(1_714_124_433) 3360 ) 3361 .await 3362 .expect("event"), 3363 vec![RelayMessage::Ok { 3364 event_id: event.id().clone(), 3365 accepted: false, 3366 message: "rate-limited: group join ip rate limit exceeded until 1714124733" 3367 .to_owned() 3368 }] 3369 ); 3370 assert_eq!(handle.metrics().rate_limit_rejections(), 1); 3371 assert_eq!(handle.metrics().event_rejections(), 1); 3372 assert_eq!(handle.metrics().group_write_denials(), 1); 3373 3374 let _ = std::fs::remove_dir_all(root); 3375 } 3376 3377 #[tokio::test] 3378 async fn runtime_rate_limits_req_authenticated_pubkeys() { 3379 let root = temp_root("runtime-req-pubkey-rate-limit"); 3380 let _ = std::fs::remove_dir_all(&root); 3381 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3382 let rule = runtime.config().rate_limits().req().per_pubkey(); 3383 let handle = RelayRuntimeHandle::new(runtime); 3384 let mut auth = handle.auth_state().await.expect("auth"); 3385 auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) 3386 .expect("challenge"); 3387 let auth_event = 3388 tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 120).expect("auth event"); 3389 3390 assert_eq!( 3391 handle 3392 .handle_protocol_client_message_for_test( 3393 ClientMessage::Auth(auth_event.clone()), 3394 &mut auth, 3395 UnixTimestamp::new(120) 3396 ) 3397 .await 3398 .expect("auth"), 3399 vec![RelayMessage::Ok { 3400 event_id: auth_event.id().clone(), 3401 accepted: true, 3402 message: String::new() 3403 }] 3404 ); 3405 let key = 3406 TangleRateLimitKey::pubkey(TangleRateLimitScope::Req, FixtureKey::Member.public_key()); 3407 let limiter = handle.rate_limiter().await; 3408 for _ in 0..rule.max_hits() { 3409 limiter.record(key.clone(), rule, UnixTimestamp::new(120)); 3410 } 3411 let subscription_id = SubscriptionId::new("limited-req-pubkey").expect("subscription"); 3412 let filters = vec![filter_from_value(&json!({"limit": 1})).expect("filter")]; 3413 3414 assert_eq!( 3415 handle 3416 .handle_protocol_client_message_for_test( 3417 ClientMessage::Req { 3418 subscription_id: subscription_id.clone(), 3419 filters 3420 }, 3421 &mut auth, 3422 UnixTimestamp::new(120) 3423 ) 3424 .await 3425 .expect("req"), 3426 vec![RelayMessage::Closed { 3427 subscription_id, 3428 message: "rate-limited: req pubkey rate limit exceeded until 180".to_owned() 3429 }] 3430 ); 3431 3432 let _ = std::fs::remove_dir_all(root); 3433 } 3434 3435 #[tokio::test] 3436 async fn runtime_rate_limits_req_connections() { 3437 let root = temp_root("runtime-req-connection-rate-limit"); 3438 let _ = std::fs::remove_dir_all(&root); 3439 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3440 let rule = runtime.config().rate_limits().req().per_connection(); 3441 let key = TangleRateLimitKey::connection(TangleRateLimitScope::Req, 77); 3442 for _ in 0..rule.max_hits() { 3443 runtime 3444 .rate_limiter() 3445 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3446 } 3447 let handle = RelayRuntimeHandle::new(runtime); 3448 let mut auth = handle.auth_state().await.expect("auth"); 3449 let subscription_id = SubscriptionId::new("limited-req-connection").expect("subscription"); 3450 let filters = vec![filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter")]; 3451 3452 assert_eq!( 3453 handle 3454 .handle_protocol_client_message_with_rate_limit_context_for_test( 3455 ClientMessage::Req { 3456 subscription_id: subscription_id.clone(), 3457 filters 3458 }, 3459 &mut auth, 3460 TangleClientRateLimitContext::new(None, Some(77)), 3461 UnixTimestamp::new(1_714_124_433) 3462 ) 3463 .await 3464 .expect("req"), 3465 vec![RelayMessage::Closed { 3466 subscription_id, 3467 message: "rate-limited: req connection rate limit exceeded until 1714124493" 3468 .to_owned() 3469 }] 3470 ); 3471 3472 let _ = std::fs::remove_dir_all(root); 3473 } 3474 3475 #[tokio::test] 3476 async fn runtime_rate_limits_req_filter_groups() { 3477 let root = temp_root("runtime-req-group-rate-limit"); 3478 let _ = std::fs::remove_dir_all(&root); 3479 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3480 let group_id = GroupId::new("Farm").expect("group"); 3481 let rule = runtime.config().rate_limits().req().per_group(); 3482 let key = TangleRateLimitKey::group(TangleRateLimitScope::Req, group_id); 3483 for _ in 0..rule.max_hits() { 3484 runtime 3485 .rate_limiter() 3486 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3487 } 3488 let handle = RelayRuntimeHandle::new(runtime); 3489 let mut auth = handle.auth_state().await.expect("auth"); 3490 let subscription_id = SubscriptionId::new("limited-req-group").expect("subscription"); 3491 let filters = 3492 vec![filter_from_value(&json!({"#h": ["Farm"], "limit": 1})).expect("filter")]; 3493 3494 assert_eq!( 3495 handle 3496 .handle_protocol_client_message_for_test( 3497 ClientMessage::Req { 3498 subscription_id: subscription_id.clone(), 3499 filters 3500 }, 3501 &mut auth, 3502 UnixTimestamp::new(1_714_124_433) 3503 ) 3504 .await 3505 .expect("req"), 3506 vec![RelayMessage::Closed { 3507 subscription_id, 3508 message: "rate-limited: req group rate limit exceeded until 1714124493".to_owned() 3509 }] 3510 ); 3511 3512 let _ = std::fs::remove_dir_all(root); 3513 } 3514 3515 #[test] 3516 fn query_classifier_identifies_broad_count_shapes() { 3517 let classifier = TangleQueryClassifier::new(runtime_relay_limits(8)); 3518 let empty_filter = pocket_filter(json!({})); 3519 let tag_only_filter = pocket_filter(json!({"#t": ["market"], "limit": 1})); 3520 let kind_only_filter = pocket_filter(json!({"kinds": [1], "limit": 1})); 3521 let high_limit_filter = pocket_filter(json!({"kinds": [1], "#h": ["Farm"], "limit": 500})); 3522 let broad_time_filter = pocket_filter(json!({ 3523 "kinds": [1], 3524 "since": 1, 3525 "until": BROAD_QUERY_TIME_WINDOW_SECONDS + 2, 3526 "limit": 1 3527 })); 3528 let bounded_group_filter = pocket_filter(json!({"kinds": [1], "#h": ["Farm"], "limit": 1})); 3529 let bounded_time_filter = pocket_filter(json!({ 3530 "kinds": [1], 3531 "since": 1, 3532 "until": BROAD_QUERY_TIME_WINDOW_SECONDS, 3533 "limit": 1 3534 })); 3535 let hll_reaction_filter = pocket_filter(json!({"kinds": [7], "#e": ["a".repeat(64)]})); 3536 3537 assert_eq!( 3538 classifier.classify_pocket_count(&[]), 3539 TangleQueryClassification::Broad(TangleBroadQueryReason::EmptyFilters) 3540 ); 3541 assert_eq!( 3542 classifier.classify_pocket_count(&[empty_filter]), 3543 TangleQueryClassification::Broad(TangleBroadQueryReason::MissingPrimaryConstraint) 3544 ); 3545 assert_eq!( 3546 classifier.classify_pocket_count(&[tag_only_filter]), 3547 TangleQueryClassification::Broad(TangleBroadQueryReason::MissingPrimaryConstraint) 3548 ); 3549 assert_eq!( 3550 classifier.classify_pocket_count(&[kind_only_filter]), 3551 TangleQueryClassification::Broad(TangleBroadQueryReason::MissingBoundedSelector) 3552 ); 3553 assert_eq!( 3554 classifier.classify_pocket_count(&[high_limit_filter]), 3555 TangleQueryClassification::Broad(TangleBroadQueryReason::HighLimit) 3556 ); 3557 assert_eq!( 3558 classifier.classify_pocket_count(&[broad_time_filter]), 3559 TangleQueryClassification::Broad(TangleBroadQueryReason::BroadTimeWindow) 3560 ); 3561 assert_eq!( 3562 classifier.classify_pocket_count(&[bounded_group_filter]), 3563 TangleQueryClassification::Bounded 3564 ); 3565 assert_eq!( 3566 classifier.classify_pocket_count(&[bounded_time_filter]), 3567 TangleQueryClassification::Bounded 3568 ); 3569 assert_eq!( 3570 classifier.classify_pocket_count(&[hll_reaction_filter]), 3571 TangleQueryClassification::Bounded 3572 ); 3573 } 3574 3575 #[tokio::test] 3576 async fn runtime_count_hll_accepts_public_pocket_selector() { 3577 let root = temp_root("runtime-count-hll"); 3578 let _ = std::fs::remove_dir_all(&root); 3579 let handle = 3580 RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime")); 3581 let mut auth = handle.auth_state().await.expect("auth"); 3582 let target = "c".repeat(64); 3583 let tags = PocketOwnedTags::new(&[["e", target.as_str()]]).expect("tags"); 3584 let first = signed_pocket_event(12, 1_714_124_433, 7, &tags, b"first reaction"); 3585 let second = signed_pocket_event(11, 1_714_124_434, 7, &tags, b"second reaction"); 3586 3587 assert_accepted_pocket_reply( 3588 runtime_pocket_event_reply(&handle, &first, &mut auth), 3589 &first, 3590 ); 3591 assert_accepted_pocket_reply( 3592 runtime_pocket_event_reply(&handle, &second, &mut auth), 3593 &second, 3594 ); 3595 3596 let subscription_id = SubscriptionId::new("count-hll-runtime").expect("subscription"); 3597 let replies = handle 3598 .handle_protocol_client_message_for_test( 3599 ClientMessage::Count { 3600 subscription_id: subscription_id.clone(), 3601 filters: vec![ 3602 filter_from_value(&json!({"kinds":[7],"#e":[target]})).expect("filter"), 3603 ], 3604 }, 3605 &mut auth, 3606 UnixTimestamp::new(1_714_124_437), 3607 ) 3608 .await 3609 .expect("count"); 3610 let [ 3611 RelayMessage::Count { 3612 subscription_id: actual, 3613 count, 3614 hll: Some(hll), 3615 }, 3616 ] = replies.as_slice() 3617 else { 3618 panic!("count hll expected: {replies:?}") 3619 }; 3620 3621 assert_eq!(actual, &subscription_id); 3622 assert_eq!(*count, 2); 3623 assert_eq!(hll.len(), 512); 3624 assert_ne!(hll, &"00".repeat(256)); 3625 3626 let _ = std::fs::remove_dir_all(root); 3627 } 3628 3629 #[test] 3630 fn runtime_count_source_stays_exact() { 3631 let sources = [ 3632 include_str!("runtime.rs"), 3633 include_str!("relay/core.rs"), 3634 include_str!("../../tangle_protocol/src/lib.rs"), 3635 ]; 3636 let forbidden = [ 3637 concat!("approximate", "_count"), 3638 concat!("approx", "_count"), 3639 concat!("estimated", "_count"), 3640 concat!("count", "_estimate"), 3641 concat!("private", "_count", "_estimate"), 3642 ]; 3643 3644 for source in sources { 3645 for needle in forbidden { 3646 assert!(!source.contains(needle)); 3647 } 3648 } 3649 } 3650 3651 #[tokio::test] 3652 async fn runtime_rate_limits_count_peer_ips() { 3653 let root = temp_root("runtime-count-ip-rate-limit"); 3654 let _ = std::fs::remove_dir_all(&root); 3655 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3656 let rule = runtime.config().rate_limits().count().per_ip(); 3657 let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 9)); 3658 let key = TangleRateLimitKey::ip(TangleRateLimitScope::Count, peer_ip); 3659 for _ in 0..rule.max_hits() { 3660 runtime 3661 .rate_limiter() 3662 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3663 } 3664 let handle = RelayRuntimeHandle::new(runtime); 3665 let mut auth = handle.auth_state().await.expect("auth"); 3666 let subscription_id = SubscriptionId::new("limited-count-ip").expect("subscription"); 3667 let filters = vec![ 3668 filter_from_value(&json!({"kinds": [1], "#h": ["Farm"], "limit": 1})).expect("filter"), 3669 ]; 3670 3671 assert_eq!( 3672 handle 3673 .handle_protocol_client_message_with_rate_limit_context_for_test( 3674 ClientMessage::Count { 3675 subscription_id: subscription_id.clone(), 3676 filters 3677 }, 3678 &mut auth, 3679 TangleClientRateLimitContext::new(Some(peer_ip), None), 3680 UnixTimestamp::new(1_714_124_433) 3681 ) 3682 .await 3683 .expect("count"), 3684 vec![RelayMessage::Closed { 3685 subscription_id, 3686 message: "rate-limited: count ip rate limit exceeded until 1714124493".to_owned() 3687 }] 3688 ); 3689 3690 let _ = std::fs::remove_dir_all(root); 3691 } 3692 3693 #[tokio::test] 3694 async fn runtime_rejects_search_req_and_count_as_unsupported() { 3695 let root = temp_root("runtime-search-unsupported"); 3696 let _ = std::fs::remove_dir_all(&root); 3697 let handle = 3698 RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime")); 3699 let mut auth = handle.auth_state().await.expect("auth"); 3700 let req_id = SubscriptionId::new("search-req").expect("req"); 3701 let count_id = SubscriptionId::new("search-count").expect("count"); 3702 let search = 3703 filter_from_value(&json!({"search": "fresh carrots", "limit": 1})).expect("filter"); 3704 3705 assert_eq!( 3706 handle 3707 .handle_protocol_client_message_for_test( 3708 ClientMessage::Req { 3709 subscription_id: req_id.clone(), 3710 filters: vec![search.clone()] 3711 }, 3712 &mut auth, 3713 UnixTimestamp::new(1_714_124_433) 3714 ) 3715 .await 3716 .expect("req"), 3717 vec![RelayMessage::Closed { 3718 subscription_id: req_id, 3719 message: "unsupported: search filters are not supported".to_owned() 3720 }] 3721 ); 3722 assert_eq!( 3723 handle 3724 .handle_protocol_client_message_for_test( 3725 ClientMessage::Count { 3726 subscription_id: count_id.clone(), 3727 filters: vec![search] 3728 }, 3729 &mut auth, 3730 UnixTimestamp::new(1_714_124_434) 3731 ) 3732 .await 3733 .expect("count"), 3734 vec![RelayMessage::Closed { 3735 subscription_id: count_id, 3736 message: "unsupported: search filters are not supported".to_owned() 3737 }] 3738 ); 3739 3740 let _ = std::fs::remove_dir_all(root); 3741 } 3742 3743 #[tokio::test] 3744 async fn runtime_rate_limits_count_filter_kinds() { 3745 let root = temp_root("runtime-count-kind-rate-limit"); 3746 let _ = std::fs::remove_dir_all(&root); 3747 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3748 let kind = Kind::new(1).expect("kind"); 3749 let rule = runtime.config().rate_limits().count().per_kind(); 3750 let key = TangleRateLimitKey::kind(TangleRateLimitScope::Count, kind); 3751 for _ in 0..rule.max_hits() { 3752 runtime 3753 .rate_limiter() 3754 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3755 } 3756 let handle = RelayRuntimeHandle::new(runtime); 3757 let mut auth = handle.auth_state().await.expect("auth"); 3758 let subscription_id = SubscriptionId::new("limited-count-kind").expect("subscription"); 3759 let filters = vec![ 3760 filter_from_value(&json!({"kinds": [1], "#h": ["Farm"], "limit": 1})).expect("filter"), 3761 ]; 3762 3763 assert_eq!( 3764 handle 3765 .handle_protocol_client_message_for_test( 3766 ClientMessage::Count { 3767 subscription_id: subscription_id.clone(), 3768 filters 3769 }, 3770 &mut auth, 3771 UnixTimestamp::new(1_714_124_433) 3772 ) 3773 .await 3774 .expect("count"), 3775 vec![RelayMessage::Closed { 3776 subscription_id, 3777 message: "rate-limited: count kind rate limit exceeded until 1714124493".to_owned() 3778 }] 3779 ); 3780 3781 let _ = std::fs::remove_dir_all(root); 3782 } 3783 3784 #[tokio::test] 3785 async fn runtime_refuses_broad_count_queries_before_rate_limits() { 3786 let root = temp_root("runtime-count-broad-refusal"); 3787 let _ = std::fs::remove_dir_all(&root); 3788 let runtime = RelayRuntime::open(runtime_config(&root, 8)).expect("runtime"); 3789 let rule = runtime.config().rate_limits().count().broad(); 3790 let key = TangleRateLimitKey::query_class( 3791 TangleRateLimitScope::Count, 3792 TangleRateLimitQueryClass::Broad, 3793 ); 3794 for _ in 0..rule.max_hits() { 3795 runtime 3796 .rate_limiter() 3797 .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); 3798 } 3799 let handle = RelayRuntimeHandle::new(runtime); 3800 let mut auth = handle.auth_state().await.expect("auth"); 3801 let subscription_id = SubscriptionId::new("limited-count-broad").expect("subscription"); 3802 let filters = vec![filter_from_value(&json!({"limit": 1})).expect("filter")]; 3803 3804 assert_eq!( 3805 handle 3806 .handle_protocol_client_message_for_test( 3807 ClientMessage::Count { 3808 subscription_id: subscription_id.clone(), 3809 filters 3810 }, 3811 &mut auth, 3812 UnixTimestamp::new(1_714_124_433) 3813 ) 3814 .await 3815 .expect("count"), 3816 vec![RelayMessage::Closed { 3817 subscription_id, 3818 message: "restricted: count filters are too broad or expensive".to_owned() 3819 }] 3820 ); 3821 assert_eq!(handle.metrics().count_refusals(), 1); 3822 assert_eq!(handle.metrics().broad_query_rejections(), 1); 3823 3824 let _ = std::fs::remove_dir_all(root); 3825 } 3826 3827 #[tokio::test] 3828 async fn runtime_refuses_expensive_count_queries_deterministically() { 3829 let root = temp_root("runtime-count-expensive-refusal"); 3830 let _ = std::fs::remove_dir_all(&root); 3831 let handle = 3832 RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime")); 3833 let mut auth = handle.auth_state().await.expect("auth"); 3834 let cases = [ 3835 ("missing-selector", json!({"kinds": [1], "limit": 1})), 3836 ( 3837 "high-limit", 3838 json!({"kinds": [1], "#h": ["Farm"], "limit": 500}), 3839 ), 3840 ( 3841 "broad-window", 3842 json!({ 3843 "kinds": [1], 3844 "since": 1, 3845 "until": BROAD_QUERY_TIME_WINDOW_SECONDS + 2, 3846 "limit": 1 3847 }), 3848 ), 3849 ]; 3850 3851 for (name, value) in cases { 3852 let subscription_id = SubscriptionId::new(name).expect("subscription"); 3853 let filters = vec![filter_from_value(&value).expect("filter")]; 3854 3855 assert_eq!( 3856 handle 3857 .handle_protocol_client_message_for_test( 3858 ClientMessage::Count { 3859 subscription_id: subscription_id.clone(), 3860 filters 3861 }, 3862 &mut auth, 3863 UnixTimestamp::new(1_714_124_433) 3864 ) 3865 .await 3866 .expect("count"), 3867 vec![RelayMessage::Closed { 3868 subscription_id, 3869 message: "restricted: count filters are too broad or expensive".to_owned() 3870 }] 3871 ); 3872 } 3873 assert_eq!(handle.metrics().count_refusals(), 3); 3874 assert_eq!(handle.metrics().broad_query_rejections(), 3); 3875 3876 let _ = std::fs::remove_dir_all(root); 3877 } 3878 3879 #[tokio::test] 3880 async fn runtime_publishes_generated_group_event_offsets_for_live_fanout() { 3881 let root = temp_root("runtime-generated-offset-fanout"); 3882 let _ = std::fs::remove_dir_all(&root); 3883 let handle = 3884 RelayRuntimeHandle::new(RelayRuntime::open(runtime_config(&root, 8)).expect("runtime")); 3885 let mut offsets = handle.subscribe_events().await; 3886 let mut auth = handle.auth_state().await.expect("auth"); 3887 auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) 3888 .expect("challenge"); 3889 let auth_event = 3890 tangle_v2_auth_event(FixtureKey::Owner, "challenge-a", 120).expect("auth event"); 3891 let create = tangle_v2_group_create_event(FixtureKey::Owner, "RuntimeFarm", 121, &[]) 3892 .expect("create"); 3893 let mut subscriptions = LiveSubscriptionSet::new(8, 64).expect("subscriptions"); 3894 let subscription_id = SubscriptionId::new("generated-offsets").expect("subscription"); 3895 subscriptions 3896 .subscribe( 3897 subscription_id.clone(), 3898 vec![pocket_filter(json!({ 3899 "kinds":[KIND_GROUP_METADATA, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS], 3900 "#d":["RuntimeFarm"] 3901 }))], 3902 ) 3903 .expect("subscribe"); 3904 3905 assert_eq!( 3906 handle 3907 .handle_protocol_client_message_for_test( 3908 ClientMessage::Auth(auth_event.clone()), 3909 &mut auth, 3910 UnixTimestamp::new(120) 3911 ) 3912 .await 3913 .expect("auth"), 3914 vec![RelayMessage::Ok { 3915 event_id: auth_event.id().clone(), 3916 accepted: true, 3917 message: String::new() 3918 }] 3919 ); 3920 assert_eq!( 3921 handle 3922 .handle_protocol_client_message_for_test( 3923 ClientMessage::Event(create.clone()), 3924 &mut auth, 3925 UnixTimestamp::new(121) 3926 ) 3927 .await 3928 .expect("create"), 3929 vec![RelayMessage::Ok { 3930 event_id: create.id().clone(), 3931 accepted: true, 3932 message: String::new() 3933 }] 3934 ); 3935 let source_offset = offsets.try_recv().expect("source offset"); 3936 let generated_offsets = [ 3937 offsets.try_recv().expect("first generated offset"), 3938 offsets.try_recv().expect("second generated offset"), 3939 ]; 3940 assert!(source_offset < generated_offsets[0]); 3941 assert!(generated_offsets[0] < generated_offsets[1]); 3942 let put_member = 3943 tangle_v2_put_user_event(FixtureKey::Owner, "RuntimeFarm", FixtureKey::Member, 122) 3944 .expect("put member"); 3945 assert_eq!( 3946 handle 3947 .handle_protocol_client_message_for_test( 3948 ClientMessage::Event(put_member.clone()), 3949 &mut auth, 3950 UnixTimestamp::new(122) 3951 ) 3952 .await 3953 .expect("put member"), 3954 vec![RelayMessage::Ok { 3955 event_id: put_member.id().clone(), 3956 accepted: true, 3957 message: String::new() 3958 }] 3959 ); 3960 let put_source_offset = offsets.try_recv().expect("put source offset"); 3961 let member_generated_offset = offsets.try_recv().expect("member generated offset"); 3962 assert!(generated_offsets[1] < put_source_offset); 3963 assert!(put_source_offset < member_generated_offset); 3964 let generated_offsets = [ 3965 generated_offsets[0], 3966 generated_offsets[1], 3967 member_generated_offset, 3968 ]; 3969 let mut generated_kinds = BTreeSet::new(); 3970 for offset in generated_offsets { 3971 let messages = handle 3972 .fanout_event_offset(offset, &mut subscriptions, &auth) 3973 .await 3974 .expect("fanout"); 3975 assert!(matches!( 3976 messages.as_slice(), 3977 [RuntimeRelayMessage::Event { 3978 subscription_id: delivered, 3979 event 3980 }] if delivered == &subscription_id 3981 && generated_kinds.insert(u32::from(event.kind().as_u16())) 3982 )); 3983 } 3984 assert_eq!( 3985 generated_kinds, 3986 BTreeSet::from([KIND_GROUP_METADATA, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS]) 3987 ); 3988 assert_eq!(handle.metrics().outbox_replayed_events(), 3); 3989 assert_eq!(handle.metrics().outbox_pending_events(), 0); 3990 assert_eq!(handle.metrics().event_bus_published_offsets(), 5); 3991 assert_eq!( 3992 offsets.try_recv().expect_err("only source plus generated"), 3993 TangleEventReceiveError::Empty 3994 ); 3995 3996 let _ = std::fs::remove_dir_all(root); 3997 } 3998 3999 #[tokio::test] 4000 async fn runtime_group_concurrency_duplicate_create_accepts_one_projection() { 4001 let root = temp_root("runtime-group-concurrency-duplicate-create"); 4002 let _ = std::fs::remove_dir_all(&root); 4003 let handle = RelayRuntimeHandle::new( 4004 RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"), 4005 ); 4006 let mut offsets = handle.subscribe_events().await; 4007 let owner_auth = 4008 authenticated_runtime_state(&handle, FixtureKey::Owner, "owner-create", 1_714_126_100) 4009 .await; 4010 let first = 4011 tangle_v2_group_create_event(FixtureKey::Owner, "RaceCreate", 1_714_126_101, &[]) 4012 .expect("first create"); 4013 let second = 4014 tangle_v2_group_create_event(FixtureKey::Owner, "RaceCreate", 1_714_126_102, &[]) 4015 .expect("second create"); 4016 let first_task = { 4017 let handle = handle.clone(); 4018 let mut auth = owner_auth.clone(); 4019 let event = first.clone(); 4020 tokio::spawn(async move { 4021 runtime_event_reply(&handle, event, &mut auth, 1_714_126_101).await 4022 }) 4023 }; 4024 let second_task = { 4025 let handle = handle.clone(); 4026 let mut auth = owner_auth.clone(); 4027 let event = second.clone(); 4028 tokio::spawn(async move { 4029 runtime_event_reply(&handle, event, &mut auth, 1_714_126_102).await 4030 }) 4031 }; 4032 let replies = tokio::time::timeout(Duration::from_secs(3), async { 4033 vec![ 4034 first_task.await.expect("first task"), 4035 second_task.await.expect("second task"), 4036 ] 4037 }) 4038 .await 4039 .expect("duplicate create race"); 4040 4041 assert_eq!(accepted_count(&replies), 1); 4042 assert_eq!( 4043 rejected_messages(&replies), 4044 vec!["invalid: group already exists".to_owned()] 4045 ); 4046 assert_eq!(drain_offsets(&mut offsets, 3).await.len(), 3); 4047 assert_eq!( 4048 offsets 4049 .try_recv() 4050 .expect_err("one create source plus generated"), 4051 TangleEventReceiveError::Empty 4052 ); 4053 let mut auth = owner_auth.clone(); 4054 assert_eq!( 4055 runtime_group_count( 4056 &handle, 4057 "duplicate-create-count", 4058 "RaceCreate", 4059 KIND_GROUP_METADATA, 4060 "d", 4061 &mut auth, 4062 1_714_126_103, 4063 ) 4064 .await, 4065 1 4066 ); 4067 assert_live_projection_matches_rebuild(&handle, "RaceCreate"); 4068 4069 let _ = std::fs::remove_dir_all(root); 4070 } 4071 4072 #[tokio::test] 4073 async fn runtime_group_concurrency_duplicate_join_accepts_one_membership() { 4074 let root = temp_root("runtime-group-concurrency-duplicate-join"); 4075 let _ = std::fs::remove_dir_all(&root); 4076 let handle = RelayRuntimeHandle::new( 4077 RelayRuntime::open(runtime_config_with_public_join(&root, 32)).expect("runtime"), 4078 ); 4079 let mut offsets = handle.subscribe_events().await; 4080 let mut owner_auth = 4081 authenticated_runtime_state(&handle, FixtureKey::Owner, "owner-join", 1_714_126_200) 4082 .await; 4083 let member_auth = 4084 authenticated_runtime_state(&handle, FixtureKey::Member, "member-join", 1_714_126_201) 4085 .await; 4086 let create = 4087 tangle_v2_group_create_event(FixtureKey::Owner, "RaceJoin", 1_714_126_202, &[]) 4088 .expect("create"); 4089 assert_accepted_reply( 4090 runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_202).await, 4091 &create, 4092 ); 4093 assert_eq!(drain_offsets(&mut offsets, 3).await.len(), 3); 4094 let join_a = 4095 tangle_v2_join_event(FixtureKey::Member, "RaceJoin", 1_714_126_203).expect("join a"); 4096 let join_b = 4097 tangle_v2_join_event(FixtureKey::Member, "RaceJoin", 1_714_126_204).expect("join b"); 4098 let first_task = { 4099 let handle = handle.clone(); 4100 let mut auth = member_auth.clone(); 4101 let event = join_a.clone(); 4102 tokio::spawn(async move { 4103 runtime_event_reply(&handle, event, &mut auth, 1_714_126_203).await 4104 }) 4105 }; 4106 let second_task = { 4107 let handle = handle.clone(); 4108 let mut auth = member_auth.clone(); 4109 let event = join_b.clone(); 4110 tokio::spawn(async move { 4111 runtime_event_reply(&handle, event, &mut auth, 1_714_126_204).await 4112 }) 4113 }; 4114 let replies = tokio::time::timeout(Duration::from_secs(3), async { 4115 vec![ 4116 first_task.await.expect("first task"), 4117 second_task.await.expect("second task"), 4118 ] 4119 }) 4120 .await 4121 .expect("duplicate join race"); 4122 4123 assert_eq!(accepted_count(&replies), 1); 4124 assert_eq!( 4125 rejected_messages(&replies), 4126 vec!["duplicate: group member already exists".to_owned()] 4127 ); 4128 assert_eq!(drain_offsets(&mut offsets, 2).await.len(), 2); 4129 assert_runtime_member_status( 4130 &handle, 4131 "RaceJoin", 4132 &FixtureKey::Member.public_key(), 4133 MemberStatus::Member, 4134 ); 4135 assert_live_projection_matches_rebuild(&handle, "RaceJoin"); 4136 4137 let _ = std::fs::remove_dir_all(root); 4138 } 4139 4140 #[tokio::test] 4141 async fn runtime_group_concurrency_join_and_leave_match_rebuild() { 4142 let root = temp_root("runtime-group-concurrency-join-leave"); 4143 let _ = std::fs::remove_dir_all(&root); 4144 let handle = RelayRuntimeHandle::new( 4145 RelayRuntime::open(runtime_config_with_public_join(&root, 32)).expect("runtime"), 4146 ); 4147 let mut owner_auth = authenticated_runtime_state( 4148 &handle, 4149 FixtureKey::Owner, 4150 "owner-join-leave", 4151 1_714_126_300, 4152 ) 4153 .await; 4154 let member_auth = authenticated_runtime_state( 4155 &handle, 4156 FixtureKey::Member, 4157 "member-join-leave", 4158 1_714_126_301, 4159 ) 4160 .await; 4161 let create = 4162 tangle_v2_group_create_event(FixtureKey::Owner, "RaceJoinLeave", 1_714_126_302, &[]) 4163 .expect("create"); 4164 let put_member = tangle_v2_put_user_event( 4165 FixtureKey::Owner, 4166 "RaceJoinLeave", 4167 FixtureKey::Member, 4168 1_714_126_303, 4169 ) 4170 .expect("put member"); 4171 assert_accepted_reply( 4172 runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_302).await, 4173 &create, 4174 ); 4175 assert_accepted_reply( 4176 runtime_event_reply(&handle, put_member.clone(), &mut owner_auth, 1_714_126_303).await, 4177 &put_member, 4178 ); 4179 let leave = tangle_v2_leave_event(FixtureKey::Member, "RaceJoinLeave", 1_714_126_304) 4180 .expect("leave"); 4181 let join = 4182 tangle_v2_join_event(FixtureKey::Member, "RaceJoinLeave", 1_714_126_305).expect("join"); 4183 let leave_task = { 4184 let handle = handle.clone(); 4185 let mut auth = member_auth.clone(); 4186 let event = leave.clone(); 4187 tokio::spawn(async move { 4188 runtime_event_reply(&handle, event, &mut auth, 1_714_126_304).await 4189 }) 4190 }; 4191 let join_task = { 4192 let handle = handle.clone(); 4193 let mut auth = member_auth.clone(); 4194 let event = join.clone(); 4195 tokio::spawn(async move { 4196 runtime_event_reply(&handle, event, &mut auth, 1_714_126_305).await 4197 }) 4198 }; 4199 let replies = tokio::time::timeout(Duration::from_secs(3), async { 4200 vec![ 4201 leave_task.await.expect("leave task"), 4202 join_task.await.expect("join task"), 4203 ] 4204 }) 4205 .await 4206 .expect("join leave race"); 4207 let join_accepted = reply_is_accepted(&replies[1]); 4208 4209 assert_eq!(accepted_count(&replies), if join_accepted { 2 } else { 1 }); 4210 if join_accepted { 4211 assert!(rejected_messages(&replies).is_empty()); 4212 } else { 4213 assert_eq!( 4214 rejected_messages(&replies), 4215 vec!["duplicate: group member already exists".to_owned()] 4216 ); 4217 } 4218 assert_runtime_member_status( 4219 &handle, 4220 "RaceJoinLeave", 4221 &FixtureKey::Member.public_key(), 4222 if join_accepted { 4223 MemberStatus::Member 4224 } else { 4225 MemberStatus::Removed 4226 }, 4227 ); 4228 assert_live_projection_matches_rebuild(&handle, "RaceJoinLeave"); 4229 4230 let _ = std::fs::remove_dir_all(root); 4231 } 4232 4233 #[tokio::test] 4234 async fn runtime_group_concurrency_delete_tombstone_blocks_normal_write() { 4235 let root = temp_root("runtime-group-concurrency-delete-write"); 4236 let _ = std::fs::remove_dir_all(&root); 4237 let handle = RelayRuntimeHandle::new( 4238 RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"), 4239 ); 4240 let mut owner_auth = 4241 authenticated_runtime_state(&handle, FixtureKey::Owner, "owner-delete", 1_714_126_400) 4242 .await; 4243 let create = 4244 tangle_v2_group_create_event(FixtureKey::Owner, "RaceDelete", 1_714_126_401, &[]) 4245 .expect("create"); 4246 assert_accepted_reply( 4247 runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_401).await, 4248 &create, 4249 ); 4250 let normal = 4251 tangle_v2_group_event(FixtureKey::Owner, "RaceDelete", 1_714_126_402, 1, "normal") 4252 .expect("normal"); 4253 let delete = tangle_v2_delete_group_event(FixtureKey::Owner, "RaceDelete", 1_714_126_403) 4254 .expect("delete"); 4255 let normal_task = { 4256 let handle = handle.clone(); 4257 let mut auth = owner_auth.clone(); 4258 let event = normal.clone(); 4259 tokio::spawn(async move { 4260 runtime_event_reply(&handle, event, &mut auth, 1_714_126_402).await 4261 }) 4262 }; 4263 let delete_task = { 4264 let handle = handle.clone(); 4265 let mut auth = owner_auth.clone(); 4266 let event = delete.clone(); 4267 tokio::spawn(async move { 4268 runtime_event_reply(&handle, event, &mut auth, 1_714_126_403).await 4269 }) 4270 }; 4271 let replies = tokio::time::timeout(Duration::from_secs(3), async { 4272 vec![ 4273 normal_task.await.expect("normal task"), 4274 delete_task.await.expect("delete task"), 4275 ] 4276 }) 4277 .await 4278 .expect("delete write race"); 4279 let delete_reply = &replies[1]; 4280 4281 assert!(reply_is_accepted(delete_reply)); 4282 assert!( 4283 reply_is_accepted(&replies[0]) 4284 || rejected_messages(&replies) == vec!["blocked: group is deleted".to_owned()] 4285 ); 4286 let mut auth = owner_auth.clone(); 4287 assert_eq!( 4288 runtime_group_count( 4289 &handle, 4290 "deleted-normal-count", 4291 "RaceDelete", 4292 1, 4293 "h", 4294 &mut auth, 4295 1_714_126_404, 4296 ) 4297 .await, 4298 0 4299 ); 4300 assert_eq!( 4301 runtime_group_count( 4302 &handle, 4303 "deleted-marker-count", 4304 "RaceDelete", 4305 KIND_GROUP_DELETE_GROUP, 4306 "h", 4307 &mut auth, 4308 1_714_126_405, 4309 ) 4310 .await, 4311 1 4312 ); 4313 assert_live_projection_matches_rebuild(&handle, "RaceDelete"); 4314 4315 let _ = std::fs::remove_dir_all(root); 4316 } 4317 4318 #[tokio::test] 4319 async fn runtime_group_concurrency_membership_mutation_matches_rebuild() { 4320 let root = temp_root("runtime-group-concurrency-membership-mutation"); 4321 let _ = std::fs::remove_dir_all(&root); 4322 let handle = RelayRuntimeHandle::new( 4323 RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"), 4324 ); 4325 let mut owner_auth = authenticated_runtime_state( 4326 &handle, 4327 FixtureKey::Owner, 4328 "owner-membership", 4329 1_714_126_500, 4330 ) 4331 .await; 4332 let create = 4333 tangle_v2_group_create_event(FixtureKey::Owner, "RaceMember", 1_714_126_501, &[]) 4334 .expect("create"); 4335 assert_accepted_reply( 4336 runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_501).await, 4337 &create, 4338 ); 4339 let put_member = tangle_v2_put_user_event( 4340 FixtureKey::Owner, 4341 "RaceMember", 4342 FixtureKey::Member, 4343 1_714_126_502, 4344 ) 4345 .expect("put member"); 4346 let remove_member = tangle_v2_remove_user_event( 4347 FixtureKey::Owner, 4348 "RaceMember", 4349 FixtureKey::Member, 4350 1_714_126_503, 4351 ) 4352 .expect("remove member"); 4353 let put_task = { 4354 let handle = handle.clone(); 4355 let mut auth = owner_auth.clone(); 4356 let event = put_member.clone(); 4357 tokio::spawn(async move { 4358 runtime_event_reply(&handle, event, &mut auth, 1_714_126_502).await 4359 }) 4360 }; 4361 let remove_task = { 4362 let handle = handle.clone(); 4363 let mut auth = owner_auth.clone(); 4364 let event = remove_member.clone(); 4365 tokio::spawn(async move { 4366 runtime_event_reply(&handle, event, &mut auth, 1_714_126_503).await 4367 }) 4368 }; 4369 let replies = tokio::time::timeout(Duration::from_secs(3), async { 4370 vec![ 4371 put_task.await.expect("put task"), 4372 remove_task.await.expect("remove task"), 4373 ] 4374 }) 4375 .await 4376 .expect("membership mutation race"); 4377 let remove_accepted = reply_is_accepted(&replies[1]); 4378 4379 assert!(reply_is_accepted(&replies[0])); 4380 if remove_accepted { 4381 assert!(rejected_messages(&replies).is_empty()); 4382 } else { 4383 assert_eq!( 4384 rejected_messages(&replies), 4385 vec!["duplicate: group member does not exist".to_owned()] 4386 ); 4387 } 4388 assert_runtime_member_status( 4389 &handle, 4390 "RaceMember", 4391 &FixtureKey::Member.public_key(), 4392 if remove_accepted { 4393 MemberStatus::Removed 4394 } else { 4395 MemberStatus::Member 4396 }, 4397 ); 4398 assert_live_projection_matches_rebuild(&handle, "RaceMember"); 4399 4400 let _ = std::fs::remove_dir_all(root); 4401 } 4402 4403 #[tokio::test] 4404 async fn runtime_shared_services_progress_under_concurrent_event_query_count_and_fanout() { 4405 let root = temp_root("runtime-shared-concurrency"); 4406 let _ = std::fs::remove_dir_all(&root); 4407 let handle = RelayRuntimeHandle::new( 4408 RelayRuntime::open(runtime_config(&root, 32)).expect("runtime"), 4409 ); 4410 let base_time = 1_714_126_000; 4411 let mut owner_auth = handle.auth_state().await.expect("owner auth"); 4412 owner_auth 4413 .issue_challenge("owner-stress", UnixTimestamp::new(base_time)) 4414 .expect("owner challenge"); 4415 let owner_auth_event = 4416 runtime_pocket_auth_event(FixtureKey::Owner, "owner-stress", base_time); 4417 assert_eq!( 4418 handle 4419 .handle_client_message( 4420 RuntimeClientMessage::Auth(owner_auth_event.clone()), 4421 &mut owner_auth, 4422 UnixTimestamp::new(base_time) 4423 ) 4424 .await 4425 .expect("owner auth"), 4426 vec![RelayMessage::Ok { 4427 event_id: runtime_pocket_event_id(&owner_auth_event), 4428 accepted: true, 4429 message: String::new() 4430 }] 4431 ); 4432 let create = runtime_pocket_group_create_event( 4433 FixtureKey::Owner, 4434 "StressPrivate", 4435 base_time + 1, 4436 &["private"], 4437 ); 4438 assert_eq!( 4439 handle 4440 .handle_client_message( 4441 RuntimeClientMessage::Event(create.clone()), 4442 &mut owner_auth, 4443 UnixTimestamp::new(base_time + 1) 4444 ) 4445 .await 4446 .expect("create"), 4447 vec![RelayMessage::Ok { 4448 event_id: runtime_pocket_event_id(&create), 4449 accepted: true, 4450 message: String::new() 4451 }] 4452 ); 4453 let put_member = runtime_pocket_put_user_event( 4454 FixtureKey::Owner, 4455 "StressPrivate", 4456 FixtureKey::Member, 4457 base_time + 2, 4458 ); 4459 assert_eq!( 4460 handle 4461 .handle_client_message( 4462 RuntimeClientMessage::Event(put_member.clone()), 4463 &mut owner_auth, 4464 UnixTimestamp::new(base_time + 2) 4465 ) 4466 .await 4467 .expect("put member"), 4468 vec![RelayMessage::Ok { 4469 event_id: runtime_pocket_event_id(&put_member), 4470 accepted: true, 4471 message: String::new() 4472 }] 4473 ); 4474 let mut member_auth = handle.auth_state().await.expect("member auth"); 4475 member_auth 4476 .issue_challenge("member-stress", UnixTimestamp::new(base_time + 3)) 4477 .expect("member challenge"); 4478 let member_auth_event = 4479 runtime_pocket_auth_event(FixtureKey::Member, "member-stress", base_time + 3); 4480 assert_eq!( 4481 handle 4482 .handle_client_message( 4483 RuntimeClientMessage::Auth(member_auth_event.clone()), 4484 &mut member_auth, 4485 UnixTimestamp::new(base_time + 3) 4486 ) 4487 .await 4488 .expect("member auth"), 4489 vec![RelayMessage::Ok { 4490 event_id: runtime_pocket_event_id(&member_auth_event), 4491 accepted: true, 4492 message: String::new() 4493 }] 4494 ); 4495 let public_auth = handle.auth_state().await.expect("public auth"); 4496 let mut offsets = handle.subscribe_events().await; 4497 let group_write_count = 6_usize; 4498 let public_write_count = 4_usize; 4499 let mut write_tasks = Vec::new(); 4500 for index in 0..group_write_count { 4501 let handle = handle.clone(); 4502 let mut auth = member_auth.clone(); 4503 write_tasks.push(tokio::spawn(async move { 4504 let event = runtime_pocket_group_event( 4505 FixtureKey::Member, 4506 "StressPrivate", 4507 base_time + 10 + u64::try_from(index).expect("index"), 4508 1, 4509 &format!("private stress {index}"), 4510 ); 4511 assert_eq!( 4512 handle 4513 .handle_client_message( 4514 RuntimeClientMessage::Event(event.clone()), 4515 &mut auth, 4516 UnixTimestamp::new( 4517 base_time + 10 + u64::try_from(index).expect("index") 4518 ) 4519 ) 4520 .await 4521 .expect("group write"), 4522 vec![RelayMessage::Ok { 4523 event_id: runtime_pocket_event_id(&event), 4524 accepted: true, 4525 message: String::new() 4526 }] 4527 ); 4528 (true, runtime_pocket_event_id(&event)) 4529 })); 4530 } 4531 for index in 0..public_write_count { 4532 let handle = handle.clone(); 4533 let mut auth = public_auth.clone(); 4534 write_tasks.push(tokio::spawn(async move { 4535 let event = runtime_pocket_event( 4536 FixtureKey::Admin, 4537 base_time + 40 + u64::try_from(index).expect("index"), 4538 1, 4539 Vec::new(), 4540 &format!("public stress {index}"), 4541 ); 4542 assert_eq!( 4543 handle 4544 .handle_client_message( 4545 RuntimeClientMessage::Event(event.clone()), 4546 &mut auth, 4547 UnixTimestamp::new( 4548 base_time + 40 + u64::try_from(index).expect("index") 4549 ) 4550 ) 4551 .await 4552 .expect("public write"), 4553 vec![RelayMessage::Ok { 4554 event_id: runtime_pocket_event_id(&event), 4555 accepted: true, 4556 message: String::new() 4557 }] 4558 ); 4559 (false, runtime_pocket_event_id(&event)) 4560 })); 4561 } 4562 let stored_events = tokio::time::timeout(Duration::from_secs(3), async { 4563 let mut stored_events = Vec::new(); 4564 for task in write_tasks { 4565 stored_events.push(task.await.expect("write task")); 4566 } 4567 stored_events 4568 }) 4569 .await 4570 .expect("write concurrency timeout"); 4571 assert_eq!( 4572 stored_events 4573 .iter() 4574 .filter(|(is_group, _)| *is_group) 4575 .count(), 4576 group_write_count 4577 ); 4578 assert_eq!( 4579 stored_events 4580 .iter() 4581 .filter(|(is_group, _)| !*is_group) 4582 .count(), 4583 public_write_count 4584 ); 4585 let group_event_ids = stored_events 4586 .iter() 4587 .filter(|(is_group, _)| *is_group) 4588 .map(|(_, event_id)| event_id.clone()) 4589 .collect::<BTreeSet<_>>(); 4590 let mut published_offsets = Vec::new(); 4591 for _ in 0..stored_events.len() { 4592 published_offsets.push( 4593 tokio::time::timeout(Duration::from_secs(1), offsets.recv()) 4594 .await 4595 .expect("offset timeout") 4596 .expect("offset"), 4597 ); 4598 } 4599 assert_eq!( 4600 offsets.try_recv().expect_err("no extra offsets"), 4601 TangleEventReceiveError::Empty 4602 ); 4603 let mut visibility_tasks = Vec::new(); 4604 for offset in published_offsets.iter().copied() { 4605 let handle = handle.clone(); 4606 let member_auth = member_auth.clone(); 4607 let public_auth = public_auth.clone(); 4608 let group_event_ids = group_event_ids.clone(); 4609 visibility_tasks.push(tokio::spawn(async move { 4610 let member_event = handle 4611 .event_by_offset_with_auth(offset, &member_auth) 4612 .await 4613 .expect("member offset") 4614 .expect("member visible"); 4615 let public_event = handle 4616 .event_by_offset_with_auth(offset, &public_auth) 4617 .await 4618 .expect("public offset"); 4619 let member_event_id = 4620 EventId::new(&member_event.id().as_hex_string()).expect("pocket id"); 4621 let is_group_event = group_event_ids.contains(&member_event_id); 4622 if is_group_event { 4623 assert!(public_event.is_none()); 4624 } else { 4625 assert!(public_event.is_some()); 4626 } 4627 is_group_event 4628 })); 4629 } 4630 let visible_group_offsets = tokio::time::timeout(Duration::from_secs(3), async { 4631 let mut visible_group_offsets = 0; 4632 for task in visibility_tasks { 4633 if task.await.expect("visibility task") { 4634 visible_group_offsets += 1; 4635 } 4636 } 4637 visible_group_offsets 4638 }) 4639 .await 4640 .expect("visibility timeout"); 4641 assert_eq!(visible_group_offsets, group_write_count); 4642 let member_subscription = SubscriptionId::new("member-stress-live").expect("subscription"); 4643 let public_subscription = SubscriptionId::new("public-stress-live").expect("subscription"); 4644 let mut member_subscriptions = LiveSubscriptionSet::new(32, 64).expect("member live set"); 4645 let mut public_subscriptions = LiveSubscriptionSet::new(32, 64).expect("public live set"); 4646 let stress_filter = pocket_filter(json!({"kinds":[1], "#h":["StressPrivate"]})); 4647 member_subscriptions 4648 .subscribe(member_subscription.clone(), vec![stress_filter.clone()]) 4649 .expect("member subscribe"); 4650 public_subscriptions 4651 .subscribe(public_subscription, vec![stress_filter]) 4652 .expect("public subscribe"); 4653 let mut member_fanout_count = 0; 4654 for offset in &published_offsets { 4655 let public_replies = handle 4656 .fanout_event_offset(*offset, &mut public_subscriptions, &public_auth) 4657 .await 4658 .expect("public fanout"); 4659 assert!(public_replies.is_empty()); 4660 let member_replies = handle 4661 .fanout_event_offset(*offset, &mut member_subscriptions, &member_auth) 4662 .await 4663 .expect("member fanout"); 4664 for reply in member_replies { 4665 match reply { 4666 RuntimeRelayMessage::Event { 4667 subscription_id, 4668 event, 4669 } => { 4670 assert_eq!(subscription_id, member_subscription); 4671 let event_id = 4672 EventId::new(&event.id().as_hex_string()).expect("pocket id"); 4673 assert!(group_event_ids.contains(&event_id)); 4674 member_fanout_count += 1; 4675 } 4676 other => panic!("unexpected fanout reply {other:?}"), 4677 } 4678 } 4679 } 4680 assert_eq!(member_fanout_count, group_write_count); 4681 let mut query_tasks = Vec::new(); 4682 for index in 0..3_u64 { 4683 let member_req_handle = handle.clone(); 4684 let mut auth = member_auth.clone(); 4685 let group_event_ids = group_event_ids.clone(); 4686 query_tasks.push(tokio::spawn(async move { 4687 let subscription_id = 4688 SubscriptionId::new(&format!("member-req-{index}")).expect("subscription"); 4689 let replies = member_req_handle 4690 .handle_protocol_client_message_for_test( 4691 ClientMessage::Req { 4692 subscription_id: subscription_id.clone(), 4693 filters: vec![ 4694 filter_from_value(&json!({ 4695 "kinds":[1], 4696 "#h":["StressPrivate"], 4697 "limit": 20 4698 })) 4699 .expect("filter"), 4700 ], 4701 }, 4702 &mut auth, 4703 UnixTimestamp::new(base_time + 100 + index), 4704 ) 4705 .await 4706 .expect("member req"); 4707 assert_eq!( 4708 replies 4709 .iter() 4710 .filter(|reply| matches!( 4711 reply, 4712 RelayMessage::Event { 4713 subscription_id: delivered, 4714 event 4715 } if delivered == &subscription_id && group_event_ids.contains(event.id()) 4716 )) 4717 .count(), 4718 group_event_ids.len() 4719 ); 4720 assert!(matches!( 4721 replies.last(), 4722 Some(RelayMessage::Eose(delivered)) if delivered == &subscription_id 4723 )); 4724 })); 4725 let public_req_handle = handle.clone(); 4726 let mut auth = public_auth.clone(); 4727 query_tasks.push(tokio::spawn(async move { 4728 let subscription_id = 4729 SubscriptionId::new(&format!("public-req-{index}")).expect("subscription"); 4730 let replies = public_req_handle 4731 .handle_protocol_client_message_for_test( 4732 ClientMessage::Req { 4733 subscription_id: subscription_id.clone(), 4734 filters: vec![ 4735 filter_from_value(&json!({ 4736 "kinds":[1], 4737 "#h":["StressPrivate"], 4738 "limit": 20 4739 })) 4740 .expect("filter"), 4741 ], 4742 }, 4743 &mut auth, 4744 UnixTimestamp::new(base_time + 110 + index), 4745 ) 4746 .await 4747 .expect("public req"); 4748 assert_eq!( 4749 replies, 4750 vec![RelayMessage::Closed { 4751 subscription_id, 4752 message: "auth-required: authentication required to read group events" 4753 .to_owned() 4754 }] 4755 ); 4756 })); 4757 let member_count_handle = handle.clone(); 4758 let mut auth = member_auth.clone(); 4759 query_tasks.push(tokio::spawn(async move { 4760 let subscription_id = 4761 SubscriptionId::new(&format!("member-count-{index}")).expect("subscription"); 4762 let replies = member_count_handle 4763 .handle_protocol_client_message_for_test( 4764 ClientMessage::Count { 4765 subscription_id: subscription_id.clone(), 4766 filters: vec![ 4767 filter_from_value(&json!({ 4768 "kinds":[1], 4769 "#h":["StressPrivate"] 4770 })) 4771 .expect("filter"), 4772 ], 4773 }, 4774 &mut auth, 4775 UnixTimestamp::new(base_time + 120 + index), 4776 ) 4777 .await 4778 .expect("member count"); 4779 assert_eq!( 4780 replies, 4781 vec![RelayMessage::Count { 4782 subscription_id, 4783 count: u64::try_from(group_write_count).expect("group count"), 4784 hll: None 4785 }] 4786 ); 4787 })); 4788 let public_count_handle = handle.clone(); 4789 let mut auth = public_auth.clone(); 4790 query_tasks.push(tokio::spawn(async move { 4791 let subscription_id = 4792 SubscriptionId::new(&format!("public-count-{index}")).expect("subscription"); 4793 let replies = public_count_handle 4794 .handle_protocol_client_message_for_test( 4795 ClientMessage::Count { 4796 subscription_id: subscription_id.clone(), 4797 filters: vec![ 4798 filter_from_value(&json!({ 4799 "kinds":[1], 4800 "#h":["StressPrivate"] 4801 })) 4802 .expect("filter"), 4803 ], 4804 }, 4805 &mut auth, 4806 UnixTimestamp::new(base_time + 130 + index), 4807 ) 4808 .await 4809 .expect("public count"); 4810 assert_eq!( 4811 replies, 4812 vec![RelayMessage::Count { 4813 subscription_id, 4814 count: 0, 4815 hll: None 4816 }] 4817 ); 4818 })); 4819 } 4820 tokio::time::timeout(Duration::from_secs(3), async { 4821 for task in query_tasks { 4822 task.await.expect("query task"); 4823 } 4824 }) 4825 .await 4826 .expect("query concurrency timeout"); 4827 assert!(handle.metrics().query_candidates_scanned() > 0); 4828 assert!( 4829 handle.metrics().query_returned_events() 4830 >= u64::try_from(group_write_count * 3).expect("returned event count") 4831 ); 4832 assert!(handle.metrics().query_redacted_events() > 0); 4833 handle.shutdown().await.expect("shutdown"); 4834 4835 let _ = std::fs::remove_dir_all(root); 4836 } 4837 4838 fn runtime_config(root: &Path, per_connection_outbound_queue: usize) -> BaseRelayRuntimeConfig { 4839 runtime_config_with_group_policy(root, per_connection_outbound_queue, false) 4840 } 4841 4842 fn runtime_config_with_public_join( 4843 root: &Path, 4844 per_connection_outbound_queue: usize, 4845 ) -> BaseRelayRuntimeConfig { 4846 runtime_config_with_group_policy(root, per_connection_outbound_queue, true) 4847 } 4848 4849 #[derive(Default)] 4850 struct RecordingHooks { 4851 admissions: Mutex<Vec<RelayEventAdmissionContext>>, 4852 stored: Mutex<Vec<RelayEventStoredContext>>, 4853 } 4854 4855 impl RelayRuntimeHooks for RecordingHooks { 4856 fn admit_event(&self, context: &RelayEventAdmissionContext) -> EventAdmissionDecision { 4857 self.admissions 4858 .lock() 4859 .expect("admissions") 4860 .push(context.clone()); 4861 if context.event().has_tag("policy", "reject") { 4862 EventAdmissionDecision::reject("hook rejected event") 4863 } else { 4864 EventAdmissionDecision::Accept 4865 } 4866 } 4867 4868 fn event_stored(&self, context: &RelayEventStoredContext) { 4869 self.stored.lock().expect("stored").push(context.clone()); 4870 } 4871 } 4872 4873 fn runtime_config_with_group_policy( 4874 root: &Path, 4875 per_connection_outbound_queue: usize, 4876 public_join: bool, 4877 ) -> BaseRelayRuntimeConfig { 4878 let raw = json!({ 4879 "server": { 4880 "listen_addr": "127.0.0.1:0", 4881 "relay_url": "wss://relay.radroots.test" 4882 }, 4883 "pocket": { 4884 "data_directory": root.join("pocket"), 4885 "sync_policy": "flush_on_shutdown", 4886 "query": { 4887 "allow_scraping": false, 4888 "allow_scrape_if_limited_to": 100, 4889 "allow_scrape_if_max_seconds": 3600 4890 } 4891 }, 4892 "groups": { 4893 "enabled": true, 4894 "canonical_relay_url": "wss://relay.radroots.test", 4895 "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777", 4896 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()], 4897 "policy": { 4898 "public_join": public_join, 4899 "invites_enabled": false 4900 } 4901 }, 4902 "auth": { 4903 "challenge_ttl_seconds": 300, 4904 "created_at_skew_seconds": 600 4905 }, 4906 "limits": { 4907 "max_message_length": 1048576, 4908 "max_subid_length": 64, 4909 "max_subscriptions_per_connection": 64, 4910 "max_filters_per_request": 10, 4911 "max_tag_values_per_filter": 100, 4912 "max_query_complexity": 2048, 4913 "max_limit": 500, 4914 "default_limit": 100, 4915 "max_event_tags": 200, 4916 "max_content_length": 65536, 4917 "broadcast_channel_capacity": 16, 4918 "per_connection_outbound_queue": per_connection_outbound_queue 4919 }, 4920 "rate_limits": { 4921 "auth": { 4922 "per_ip": {"window_seconds": 60, "max_hits": 120}, 4923 "per_pubkey": {"window_seconds": 60, "max_hits": 30}, 4924 "failures": {"window_seconds": 300, "max_hits": 5}, 4925 "failures_per_ip": {"window_seconds": 300, "max_hits": 20} 4926 }, 4927 "event": { 4928 "per_ip": {"window_seconds": 60, "max_hits": 600}, 4929 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 4930 "per_kind": {"window_seconds": 60, "max_hits": 1000} 4931 }, 4932 "group": { 4933 "write_per_ip": {"window_seconds": 60, "max_hits": 300}, 4934 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, 4935 "write_per_group": {"window_seconds": 60, "max_hits": 90}, 4936 "write_per_kind": {"window_seconds": 60, "max_hits": 300}, 4937 "join_flow": {"window_seconds": 300, "max_hits": 10}, 4938 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} 4939 }, 4940 "req": { 4941 "per_ip": {"window_seconds": 60, "max_hits": 600}, 4942 "per_connection": {"window_seconds": 60, "max_hits": 120}, 4943 "per_pubkey": {"window_seconds": 60, "max_hits": 240}, 4944 "per_group": {"window_seconds": 60, "max_hits": 240}, 4945 "per_kind": {"window_seconds": 60, "max_hits": 500}, 4946 "broad": {"window_seconds": 60, "max_hits": 30} 4947 }, 4948 "count": { 4949 "per_ip": {"window_seconds": 60, "max_hits": 300}, 4950 "per_connection": {"window_seconds": 60, "max_hits": 60}, 4951 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 4952 "per_group": {"window_seconds": 60, "max_hits": 120}, 4953 "per_kind": {"window_seconds": 60, "max_hits": 240}, 4954 "broad": {"window_seconds": 60, "max_hits": 20} 4955 } 4956 } 4957 }) 4958 .to_string(); 4959 parse_base_relay_runtime_config_json(&raw).expect("config") 4960 } 4961 4962 async fn authenticated_runtime_state( 4963 handle: &RelayRuntimeHandle, 4964 key: FixtureKey, 4965 challenge: &str, 4966 now: u64, 4967 ) -> BaseAuthState { 4968 let mut auth = handle.auth_state().await.expect("auth"); 4969 auth.issue_challenge(challenge, UnixTimestamp::new(now)) 4970 .expect("challenge"); 4971 let event = tangle_v2_auth_event(key, challenge, now).expect("auth event"); 4972 let replies = handle 4973 .handle_protocol_client_message_for_test( 4974 ClientMessage::Auth(event.clone()), 4975 &mut auth, 4976 UnixTimestamp::new(now), 4977 ) 4978 .await 4979 .expect("auth message"); 4980 4981 assert_eq!( 4982 replies, 4983 vec![RelayMessage::Ok { 4984 event_id: event.id().clone(), 4985 accepted: true, 4986 message: String::new() 4987 }] 4988 ); 4989 auth 4990 } 4991 4992 async fn runtime_event_reply( 4993 handle: &RelayRuntimeHandle, 4994 event: Event, 4995 auth: &mut BaseAuthState, 4996 now: u64, 4997 ) -> RelayMessage { 4998 let replies = handle 4999 .handle_protocol_client_message_for_test( 5000 ClientMessage::Event(event), 5001 auth, 5002 UnixTimestamp::new(now), 5003 ) 5004 .await 5005 .expect("event message"); 5006 5007 assert_eq!(replies.len(), 1); 5008 replies.into_iter().next().expect("reply") 5009 } 5010 5011 fn runtime_pocket_event_reply( 5012 handle: &RelayRuntimeHandle, 5013 event: &PocketEvent, 5014 auth: &mut BaseAuthState, 5015 ) -> RelayMessage { 5016 handle 5017 .inner 5018 .handle_pocket_event_with_auth_report(event, auth) 5019 .expect("event message") 5020 .into_message() 5021 } 5022 5023 async fn runtime_group_count( 5024 handle: &RelayRuntimeHandle, 5025 subscription_id: &str, 5026 group_id: &str, 5027 kind: u32, 5028 tag_name: &str, 5029 auth: &mut BaseAuthState, 5030 now: u64, 5031 ) -> u64 { 5032 let replies = handle 5033 .handle_protocol_client_message_for_test( 5034 ClientMessage::Count { 5035 subscription_id: SubscriptionId::new(subscription_id).expect("subscription"), 5036 filters: vec![runtime_group_filter(group_id, kind, tag_name)], 5037 }, 5038 auth, 5039 UnixTimestamp::new(now), 5040 ) 5041 .await 5042 .expect("count"); 5043 5044 match replies.as_slice() { 5045 [RelayMessage::Count { count, .. }] => *count, 5046 other => panic!("count reply expected, got {other:?}"), 5047 } 5048 } 5049 5050 fn runtime_group_filter(group_id: &str, kind: u32, tag_name: &str) -> Filter { 5051 let mut value = json!({"kinds": [kind]}); 5052 value 5053 .as_object_mut() 5054 .expect("filter") 5055 .insert(format!("#{tag_name}"), json!([group_id])); 5056 filter_from_value(&value).expect("filter") 5057 } 5058 5059 async fn drain_offsets(receiver: &mut TangleEventReceiver, count: usize) -> Vec<StoreOffset> { 5060 let mut offsets = Vec::with_capacity(count); 5061 for _ in 0..count { 5062 offsets.push( 5063 tokio::time::timeout(Duration::from_secs(1), receiver.recv()) 5064 .await 5065 .expect("offset timeout") 5066 .expect("offset"), 5067 ); 5068 } 5069 offsets 5070 } 5071 5072 fn accepted_count(replies: &[RelayMessage]) -> usize { 5073 replies 5074 .iter() 5075 .filter(|reply| reply_is_accepted(reply)) 5076 .count() 5077 } 5078 5079 fn reply_is_accepted(reply: &RelayMessage) -> bool { 5080 matches!( 5081 reply, 5082 RelayMessage::Ok { 5083 accepted: true, 5084 message, 5085 .. 5086 } if message.is_empty() 5087 ) 5088 } 5089 5090 fn rejected_messages(replies: &[RelayMessage]) -> Vec<String> { 5091 replies 5092 .iter() 5093 .filter_map(|reply| match reply { 5094 RelayMessage::Ok { 5095 accepted: false, 5096 message, 5097 .. 5098 } => Some(message.clone()), 5099 _ => None, 5100 }) 5101 .collect() 5102 } 5103 5104 fn assert_accepted_reply(reply: RelayMessage, event: &Event) { 5105 assert_eq!( 5106 reply, 5107 RelayMessage::Ok { 5108 event_id: event.id().clone(), 5109 accepted: true, 5110 message: String::new() 5111 } 5112 ); 5113 } 5114 5115 fn assert_accepted_pocket_reply(reply: RelayMessage, event: &PocketEvent) { 5116 assert_eq!( 5117 reply, 5118 RelayMessage::Ok { 5119 event_id: runtime_pocket_event_id(event), 5120 accepted: true, 5121 message: String::new() 5122 } 5123 ); 5124 } 5125 5126 fn runtime_pocket_event_id(event: &PocketEvent) -> EventId { 5127 EventId::new(&event.id().as_hex_string()).expect("event id") 5128 } 5129 5130 fn assert_runtime_member_status( 5131 handle: &RelayRuntimeHandle, 5132 group_id: &str, 5133 pubkey: &PublicKeyHex, 5134 status: MemberStatus, 5135 ) { 5136 let group_id = GroupId::new(group_id).expect("group"); 5137 let groups = handle.inner.groups.as_ref().expect("groups"); 5138 let projection = groups.projection(); 5139 5140 assert_eq!( 5141 projection 5142 .member(&group_id, pubkey) 5143 .expect("member") 5144 .status(), 5145 status 5146 ); 5147 } 5148 5149 fn assert_live_projection_matches_rebuild(handle: &RelayRuntimeHandle, group_id: &str) { 5150 let group_id = GroupId::new(group_id).expect("group"); 5151 let groups = handle.inner.groups.as_ref().expect("groups"); 5152 let live = groups.projection(); 5153 let rebuilt = rebuilt_projection(handle); 5154 let live_group = live.group(&group_id); 5155 let rebuilt_group = rebuilt.group(&group_id); 5156 5157 assert_eq!( 5158 live_group.map(|group| group.lifecycle()), 5159 rebuilt_group.map(|group| group.lifecycle()) 5160 ); 5161 assert_eq!( 5162 live_group.map(|group| group.metadata()), 5163 rebuilt_group.map(|group| group.metadata()) 5164 ); 5165 assert_eq!( 5166 live_group.and_then(|group| group.delete_event_id()), 5167 rebuilt_group.and_then(|group| group.delete_event_id()) 5168 ); 5169 assert_eq!(live.tombstone(&group_id), rebuilt.tombstone(&group_id)); 5170 assert_eq!( 5171 projection_member_statuses(&live, &group_id), 5172 projection_member_statuses(&rebuilt, &group_id) 5173 ); 5174 } 5175 5176 fn rebuilt_projection(handle: &RelayRuntimeHandle) -> GroupProjection { 5177 let groups = handle.inner.groups.as_ref().expect("groups"); 5178 let limits = groups.limits(); 5179 let events = handle 5180 .inner 5181 .store 5182 .scan_events() 5183 .expect("scan") 5184 .into_iter() 5185 .filter_map(|stored| { 5186 let store_offset = StoreOffset::new(stored.store_offset()); 5187 match tangle_groups::classify_group_event(stored.event(), limits).expect("classify") 5188 { 5189 GroupEventClass::NonGroup => None, 5190 _ => Some(CanonicalGroupEvent::new(stored.into_event(), store_offset)), 5191 } 5192 }) 5193 .collect::<Vec<_>>(); 5194 5195 rebuild_group_projection(events, limits, UnixTimestamp::new(1_714_199_999)) 5196 .expect("rebuild") 5197 .into_projection() 5198 } 5199 5200 fn projection_member_statuses( 5201 projection: &GroupProjection, 5202 group_id: &GroupId, 5203 ) -> BTreeMap<String, MemberStatus> { 5204 projection 5205 .members() 5206 .iter() 5207 .filter(|((candidate, _), _)| candidate == group_id) 5208 .map(|((_, pubkey), member)| (pubkey.as_str().to_owned(), member.status())) 5209 .collect() 5210 } 5211 5212 fn runtime_relay_limits(max_pending_events: usize) -> BaseRelayLimits { 5213 BaseRelayLimits::new(BaseRelayLimitSettings { 5214 max_pending_events, 5215 max_subscription_id_length: 64, 5216 max_subscriptions: 64, 5217 max_filters_per_request: 10, 5218 max_tag_values_per_filter: 100, 5219 max_query_complexity: 610, 5220 max_event_tags: 200, 5221 max_content_length: 65_536, 5222 max_limit: 500, 5223 default_limit: 100, 5224 }) 5225 .expect("limits") 5226 } 5227 5228 fn pocket_filter(value: serde_json::Value) -> tangle_store_pocket::PocketOwnedFilter { 5229 let filter = filter_from_value(&value).expect("filter"); 5230 crate::pocket_conversion::tangle_filter_to_pocket(&filter).expect("pocket filter") 5231 } 5232 5233 fn tangle_v2_event( 5234 key: FixtureKey, 5235 created_at: u64, 5236 kind: u64, 5237 tags: Vec<Tag>, 5238 content: &str, 5239 ) -> Result<Event, String> { 5240 let event = runtime_pocket_event(key, created_at, kind, tags, content); 5241 runtime_pocket_event_to_protocol(&event) 5242 } 5243 5244 fn tangle_v2_auth_event( 5245 key: FixtureKey, 5246 challenge: &str, 5247 created_at: u64, 5248 ) -> Result<Event, String> { 5249 tangle_v2_event( 5250 key, 5251 created_at, 5252 22_242, 5253 vec![ 5254 Tag::from_parts("relay", &["wss://relay.radroots.test"])?, 5255 Tag::from_parts("challenge", &[challenge])?, 5256 ], 5257 "", 5258 ) 5259 } 5260 5261 fn tangle_v2_group_create_event( 5262 key: FixtureKey, 5263 group_id: &str, 5264 created_at: u64, 5265 flags: &[&str], 5266 ) -> Result<Event, String> { 5267 let mut tags = vec![ 5268 Tag::from_parts("h", &[group_id])?, 5269 Tag::from_parts("name", &[group_id])?, 5270 ]; 5271 for flag in flags { 5272 tags.push(Tag::from_parts(flag, &[])?); 5273 } 5274 tangle_v2_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "") 5275 } 5276 5277 fn tangle_v2_put_user_event( 5278 key: FixtureKey, 5279 group_id: &str, 5280 target: FixtureKey, 5281 created_at: u64, 5282 ) -> Result<Event, String> { 5283 let target_pubkey = target.public_key(); 5284 tangle_v2_event( 5285 key, 5286 created_at, 5287 KIND_GROUP_PUT_USER.into(), 5288 vec![ 5289 Tag::from_parts("h", &[group_id])?, 5290 Tag::from_parts("p", &[target_pubkey.as_str()])?, 5291 ], 5292 "", 5293 ) 5294 } 5295 5296 fn tangle_v2_remove_user_event( 5297 key: FixtureKey, 5298 group_id: &str, 5299 target: FixtureKey, 5300 created_at: u64, 5301 ) -> Result<Event, String> { 5302 let target_pubkey = target.public_key(); 5303 tangle_v2_event( 5304 key, 5305 created_at, 5306 KIND_GROUP_REMOVE_USER.into(), 5307 vec![ 5308 Tag::from_parts("h", &[group_id])?, 5309 Tag::from_parts("p", &[target_pubkey.as_str()])?, 5310 ], 5311 "", 5312 ) 5313 } 5314 5315 fn tangle_v2_join_event( 5316 key: FixtureKey, 5317 group_id: &str, 5318 created_at: u64, 5319 ) -> Result<Event, String> { 5320 tangle_v2_group_event( 5321 key, 5322 group_id, 5323 created_at, 5324 KIND_GROUP_JOIN_REQUEST.into(), 5325 "", 5326 ) 5327 } 5328 5329 fn tangle_v2_leave_event( 5330 key: FixtureKey, 5331 group_id: &str, 5332 created_at: u64, 5333 ) -> Result<Event, String> { 5334 tangle_v2_group_event( 5335 key, 5336 group_id, 5337 created_at, 5338 KIND_GROUP_LEAVE_REQUEST.into(), 5339 "", 5340 ) 5341 } 5342 5343 fn tangle_v2_delete_group_event( 5344 key: FixtureKey, 5345 group_id: &str, 5346 created_at: u64, 5347 ) -> Result<Event, String> { 5348 tangle_v2_group_event( 5349 key, 5350 group_id, 5351 created_at, 5352 KIND_GROUP_DELETE_GROUP.into(), 5353 "", 5354 ) 5355 } 5356 5357 fn tangle_v2_group_event( 5358 key: FixtureKey, 5359 group_id: &str, 5360 created_at: u64, 5361 kind: u64, 5362 content: &str, 5363 ) -> Result<Event, String> { 5364 tangle_v2_event( 5365 key, 5366 created_at, 5367 kind, 5368 vec![Tag::from_parts("h", &[group_id])?], 5369 content, 5370 ) 5371 } 5372 5373 fn runtime_pocket_group_create_event( 5374 key: FixtureKey, 5375 group_id: &str, 5376 created_at: u64, 5377 flags: &[&str], 5378 ) -> PocketOwnedEvent { 5379 let mut tags = vec![ 5380 Tag::from_parts("h", &[group_id]).expect("h"), 5381 Tag::from_parts("name", &[group_id]).expect("name"), 5382 ]; 5383 for flag in flags { 5384 tags.push(Tag::from_parts(flag, &[]).expect("flag")); 5385 } 5386 runtime_pocket_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "") 5387 } 5388 5389 fn runtime_pocket_auth_event( 5390 key: FixtureKey, 5391 challenge: &str, 5392 created_at: u64, 5393 ) -> PocketOwnedEvent { 5394 runtime_pocket_event( 5395 key, 5396 created_at, 5397 22_242, 5398 vec![ 5399 Tag::from_parts("relay", &["wss://relay.radroots.test"]).expect("relay"), 5400 Tag::from_parts("challenge", &[challenge]).expect("challenge"), 5401 ], 5402 "", 5403 ) 5404 } 5405 5406 fn runtime_pocket_put_user_event( 5407 key: FixtureKey, 5408 group_id: &str, 5409 target: FixtureKey, 5410 created_at: u64, 5411 ) -> PocketOwnedEvent { 5412 let target_pubkey = target.public_key(); 5413 runtime_pocket_event( 5414 key, 5415 created_at, 5416 KIND_GROUP_PUT_USER.into(), 5417 vec![ 5418 Tag::from_parts("h", &[group_id]).expect("h"), 5419 Tag::from_parts("p", &[target_pubkey.as_str()]).expect("p"), 5420 ], 5421 "", 5422 ) 5423 } 5424 5425 fn runtime_pocket_group_event( 5426 key: FixtureKey, 5427 group_id: &str, 5428 created_at: u64, 5429 kind: u64, 5430 content: &str, 5431 ) -> PocketOwnedEvent { 5432 runtime_pocket_event( 5433 key, 5434 created_at, 5435 kind, 5436 vec![Tag::from_parts("h", &[group_id]).expect("h")], 5437 content, 5438 ) 5439 } 5440 5441 fn runtime_pocket_event( 5442 key: FixtureKey, 5443 created_at: u64, 5444 kind: u64, 5445 tags: Vec<Tag>, 5446 content: &str, 5447 ) -> PocketOwnedEvent { 5448 let tags = pocket_tags_from_protocol(&tags); 5449 signed_pocket_event( 5450 fixture_secret_byte(key), 5451 created_at, 5452 u16::try_from(kind).expect("pocket kind"), 5453 &tags, 5454 content.as_bytes(), 5455 ) 5456 } 5457 5458 fn runtime_pocket_event_to_protocol(event: &PocketEvent) -> Result<Event, String> { 5459 let tags = event 5460 .tags() 5461 .map_err(|error| error.to_string())? 5462 .iter() 5463 .map(|tag| { 5464 Tag::new( 5465 tag.map(|value| { 5466 std::str::from_utf8(value) 5467 .map(str::to_owned) 5468 .map_err(|error| error.to_string()) 5469 }) 5470 .collect::<Result<Vec<_>, _>>()?, 5471 ) 5472 .map_err(|error| error.to_string()) 5473 }) 5474 .collect::<Result<Vec<_>, _>>()?; 5475 Ok(Event::new( 5476 EventId::new(&event.id().as_hex_string()).map_err(|error| error.to_string())?, 5477 UnsignedEvent::new( 5478 PublicKeyHex::new(&event.pubkey().as_hex_string()) 5479 .map_err(|error| error.to_string())?, 5480 UnixTimestamp::new(event.created_at().as_u64()), 5481 Kind::new(u64::from(event.kind().as_u16())).map_err(|error| error.to_string())?, 5482 tags, 5483 std::str::from_utf8(event.content()).map_err(|error| error.to_string())?, 5484 ), 5485 SignatureHex::new(&event.sig().to_string()).map_err(|error| error.to_string())?, 5486 )) 5487 } 5488 5489 fn pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags { 5490 let parts = tags 5491 .iter() 5492 .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>()) 5493 .collect::<Vec<_>>(); 5494 PocketOwnedTags::new(&parts).expect("pocket tags") 5495 } 5496 5497 fn fixture_secret_byte(key: FixtureKey) -> u8 { 5498 match key { 5499 FixtureKey::Relay => 9, 5500 FixtureKey::Admin => 11, 5501 FixtureKey::Member => 12, 5502 FixtureKey::Outsider => 13, 5503 FixtureKey::Owner => 10, 5504 } 5505 } 5506 5507 fn signed_pocket_event( 5508 secret_byte: u8, 5509 created_at: u64, 5510 kind: u16, 5511 tags: &PocketOwnedTags, 5512 content: &[u8], 5513 ) -> PocketOwnedEvent { 5514 let secret = format!("{secret_byte:02x}").repeat(32); 5515 RelaySigner::from_secret_hex(&secret) 5516 .expect("signer") 5517 .sign_pocket_event( 5518 PocketKind::from_u16(kind), 5519 tags, 5520 PocketTime::from_u64(created_at), 5521 content, 5522 ) 5523 .expect("pocket event") 5524 } 5525 5526 fn temp_root(name: &str) -> PathBuf { 5527 std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id())) 5528 } 5529 }