core.rs (175602B)
1 use crate::errors::{BaseRelayError, ok_accepted, ok_rejected}; 2 use crate::groups::{ 3 GroupEventWrite, GroupEventWriteError, GroupProjectionReadGuard, GroupServiceHandle, 4 }; 5 use crate::logging::{self, TangleModerationAuditResult}; 6 use crate::ops::BaseRelayReadinessState; 7 #[cfg(test)] 8 use crate::pocket_conversion::{tangle_event_to_pocket, tangle_filter_to_pocket}; 9 use crate::pocket_event_validation::{ 10 is_pocket_nip70_protected_event, pocket_event_id as pocket_runtime_event_id, pocket_event_kind, 11 pocket_event_pubkey, validate_pocket_event_shape, verify_pocket_event_signature, 12 }; 13 #[cfg(test)] 14 use crate::relay::outbound::protocol_messages_for_test; 15 use crate::relay::{ 16 auth::BaseAuthState, 17 live::{CloseResult, LiveSubscriptionSet}, 18 outbound::RuntimeRelayMessage, 19 }; 20 use std::{ 21 cell::{Cell, RefCell}, 22 collections::BTreeSet, 23 }; 24 use tangle_groups::{ 25 GroupAuthContext, GroupEventClass, GroupEventView, GroupId, GroupRuntimeConfig, 26 NIP29_RELAY_GENERATED_KIND_VALUES, StoreOffset, classify_group_event, 27 validate_client_group_event_structure, 28 }; 29 #[cfg(test)] 30 use tangle_protocol::{ClientMessage, Event, Filter}; 31 use tangle_protocol::{RelayMessage, SubscriptionId, UnixTimestamp}; 32 use tangle_store_pocket::{ 33 PocketEvent, PocketFilter, PocketHll8, PocketOwnedEvent, PocketOwnedFilter, PocketQueryConfig, 34 PocketScreenResult, PocketStoreConfig, PocketStoreHandle, 35 }; 36 37 pub(crate) const NEGENTROPY_DISABLED_MESSAGE: &str = "blocked: Negentropy sync is disabled"; 38 39 pub struct BaseRelay { 40 store: PocketStoreHandle, 41 subscriptions: LiveSubscriptionSet, 42 groups: Option<GroupServiceHandle>, 43 readiness: BaseRelayReadinessState, 44 limits: BaseRelayLimits, 45 query: PocketQueryConfig, 46 } 47 48 #[derive(Debug, Clone, PartialEq)] 49 pub(crate) struct BaseRelayEventWrite { 50 message: RelayMessage, 51 stored_offsets: Vec<StoreOffset>, 52 } 53 54 impl BaseRelayEventWrite { 55 fn stored(message: RelayMessage, stored_offsets: Vec<StoreOffset>) -> Self { 56 Self { 57 message, 58 stored_offsets, 59 } 60 } 61 62 fn unstored(message: RelayMessage) -> Self { 63 Self { 64 message, 65 stored_offsets: Vec::new(), 66 } 67 } 68 69 pub(crate) fn stored_offsets(&self) -> &[StoreOffset] { 70 &self.stored_offsets 71 } 72 73 pub(crate) fn into_message(self) -> RelayMessage { 74 self.message 75 } 76 } 77 78 #[derive(Debug, Clone, PartialEq)] 79 pub(crate) struct BaseRelayQueryReport { 80 messages: Vec<RuntimeRelayMessage>, 81 group_read_denied: bool, 82 query_metrics: BaseRelayQueryMetrics, 83 } 84 85 pub(crate) struct BaseRelayReqQuery<'a> { 86 subscription_id: SubscriptionId, 87 filters: Vec<PocketOwnedFilter>, 88 search_present: bool, 89 auth: &'a BaseAuthState, 90 } 91 92 impl<'a> BaseRelayReqQuery<'a> { 93 pub(crate) fn new( 94 subscription_id: SubscriptionId, 95 filters: Vec<PocketOwnedFilter>, 96 search_present: bool, 97 auth: &'a BaseAuthState, 98 ) -> Self { 99 Self { 100 subscription_id, 101 filters, 102 search_present, 103 auth, 104 } 105 } 106 } 107 108 struct BaseRelayGroupReqQuery<'a> { 109 subscription_id: SubscriptionId, 110 filters: Vec<PocketOwnedFilter>, 111 search_present: bool, 112 auth: &'a GroupAuthContext, 113 } 114 115 pub(crate) struct BaseRelayCountQuery<'a> { 116 subscription_id: SubscriptionId, 117 filters: Vec<PocketOwnedFilter>, 118 search_present: bool, 119 auth: &'a BaseAuthState, 120 } 121 122 impl<'a> BaseRelayCountQuery<'a> { 123 pub(crate) fn new( 124 subscription_id: SubscriptionId, 125 filters: Vec<PocketOwnedFilter>, 126 search_present: bool, 127 auth: &'a BaseAuthState, 128 ) -> Self { 129 Self { 130 subscription_id, 131 filters, 132 search_present, 133 auth, 134 } 135 } 136 } 137 138 struct BaseRelayGroupCountQuery<'a> { 139 subscription_id: SubscriptionId, 140 filters: Vec<PocketOwnedFilter>, 141 search_present: bool, 142 auth: &'a GroupAuthContext, 143 } 144 145 impl BaseRelayQueryReport { 146 fn new( 147 messages: Vec<RuntimeRelayMessage>, 148 group_read_denied: bool, 149 query_metrics: BaseRelayQueryMetrics, 150 ) -> Self { 151 Self { 152 messages, 153 group_read_denied, 154 query_metrics, 155 } 156 } 157 158 pub(crate) fn group_read_denied(&self) -> bool { 159 self.group_read_denied 160 } 161 162 pub(crate) fn query_metrics(&self) -> BaseRelayQueryMetrics { 163 self.query_metrics 164 } 165 166 pub(crate) fn into_messages(self) -> Vec<RuntimeRelayMessage> { 167 self.messages 168 } 169 } 170 171 #[derive(Debug, Clone, PartialEq)] 172 pub(crate) struct BaseRelayCountReport { 173 message: RelayMessage, 174 group_read_denied: bool, 175 query_metrics: BaseRelayQueryMetrics, 176 } 177 178 impl BaseRelayCountReport { 179 fn new( 180 message: RelayMessage, 181 group_read_denied: bool, 182 query_metrics: BaseRelayQueryMetrics, 183 ) -> Self { 184 Self { 185 message, 186 group_read_denied, 187 query_metrics, 188 } 189 } 190 191 pub(crate) fn group_read_denied(&self) -> bool { 192 self.group_read_denied 193 } 194 195 pub(crate) fn query_metrics(&self) -> BaseRelayQueryMetrics { 196 self.query_metrics 197 } 198 199 pub(crate) fn into_message(self) -> RelayMessage { 200 self.message 201 } 202 } 203 204 #[derive(Debug, Clone, PartialEq)] 205 struct BaseRelayEventQueryReport { 206 events: Vec<PocketOwnedEvent>, 207 group_read_denied: bool, 208 query_metrics: BaseRelayQueryMetrics, 209 } 210 211 impl BaseRelayEventQueryReport { 212 fn new( 213 events: Vec<PocketOwnedEvent>, 214 group_read_denied: bool, 215 query_metrics: BaseRelayQueryMetrics, 216 ) -> Self { 217 Self { 218 events, 219 group_read_denied, 220 query_metrics, 221 } 222 } 223 } 224 225 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] 226 pub(crate) struct BaseRelayQueryMetrics { 227 candidates_scanned: u64, 228 returned_events: u64, 229 redacted_events: u64, 230 } 231 232 impl BaseRelayQueryMetrics { 233 pub(crate) fn new(candidates_scanned: u64, returned_events: u64, redacted_events: u64) -> Self { 234 Self { 235 candidates_scanned, 236 returned_events, 237 redacted_events, 238 } 239 } 240 241 fn add(self, other: Self) -> Self { 242 Self { 243 candidates_scanned: self 244 .candidates_scanned 245 .saturating_add(other.candidates_scanned), 246 returned_events: self.returned_events.saturating_add(other.returned_events), 247 redacted_events: self.redacted_events.saturating_add(other.redacted_events), 248 } 249 } 250 251 fn with_returned_events(self, returned_events: usize) -> Self { 252 Self { 253 returned_events: u64::try_from(returned_events).expect("returned events fit in u64"), 254 ..self 255 } 256 } 257 258 pub(crate) fn candidates_scanned(self) -> u64 { 259 self.candidates_scanned 260 } 261 262 pub(crate) fn returned_events(self) -> u64 { 263 self.returned_events 264 } 265 266 pub(crate) fn redacted_events(self) -> u64 { 267 self.redacted_events 268 } 269 } 270 271 #[derive(Debug, Clone, PartialEq, Eq)] 272 struct BaseRelayCountEventsReport { 273 count: u64, 274 hll: Option<String>, 275 group_read_denied: bool, 276 query_metrics: BaseRelayQueryMetrics, 277 } 278 279 impl BaseRelayCountEventsReport { 280 fn new( 281 count: u64, 282 hll: Option<String>, 283 group_read_denied: bool, 284 query_metrics: BaseRelayQueryMetrics, 285 ) -> Self { 286 Self { 287 count, 288 hll, 289 group_read_denied, 290 query_metrics, 291 } 292 } 293 } 294 295 struct BaseRelayCountHll { 296 offset: Option<usize>, 297 hll: Option<PocketHll8>, 298 suppressed: bool, 299 } 300 301 #[derive(Debug, Clone, PartialEq, Eq)] 302 enum BaseRelayCountHllGroupTargets { 303 None, 304 Suppress, 305 Targets(Vec<GroupId>), 306 } 307 308 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 309 enum BaseRelayCountHllTargetPolicy { 310 Eligible, 311 Suppress, 312 } 313 314 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 315 enum BaseRelayCountHllDTagMode { 316 Ignore, 317 Target, 318 Suppress, 319 } 320 321 impl BaseRelayCountHll { 322 fn new(filters: &[PocketOwnedFilter]) -> Result<Self, BaseRelayError> { 323 let offset = BaseRelay::count_hll_offset(filters)?; 324 Ok(Self { 325 offset, 326 hll: offset.map(|_| PocketHll8::new()), 327 suppressed: false, 328 }) 329 } 330 331 fn suppress(&mut self) { 332 if self.offset.is_some() { 333 self.suppressed = true; 334 } 335 } 336 337 fn suppress_for_filter_targets( 338 &mut self, 339 groups: Option<&GroupServiceHandle>, 340 filters: &[PocketOwnedFilter], 341 ) { 342 if self.offset.is_none() { 343 return; 344 } 345 let [filter] = filters else { 346 return; 347 }; 348 if BaseRelay::count_hll_filter_target_policy(groups, filter) 349 == BaseRelayCountHllTargetPolicy::Suppress 350 { 351 self.suppress(); 352 } 353 } 354 355 fn observe( 356 &mut self, 357 groups: Option<&GroupServiceHandle>, 358 event: &PocketEvent, 359 ) -> Result<(), BaseRelayError> { 360 let Some(offset) = self.offset else { 361 return Ok(()); 362 }; 363 if BaseRelay::event_suppresses_count_hll(groups, event)? { 364 self.suppressed = true; 365 return Ok(()); 366 } 367 if let Some(hll) = &mut self.hll { 368 hll.add_element(event.pubkey().as_bytes(), offset) 369 .map_err(|error| BaseRelayError::error(error.to_string()))?; 370 } 371 Ok(()) 372 } 373 374 fn into_hex(self) -> Option<String> { 375 (!self.suppressed) 376 .then(|| self.hll.map(|value| value.to_hex_string())) 377 .flatten() 378 } 379 } 380 381 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 382 enum BaseRelayFilterLimitMode { 383 ApplyDefaultLimit, 384 PreserveCountLimitless, 385 } 386 387 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 388 pub struct BaseRelayShutdownReport { 389 closed_subscriptions: usize, 390 } 391 392 impl BaseRelayShutdownReport { 393 pub fn new(closed_subscriptions: usize) -> Self { 394 Self { 395 closed_subscriptions, 396 } 397 } 398 399 pub fn closed_subscriptions(self) -> usize { 400 self.closed_subscriptions 401 } 402 } 403 404 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 405 pub struct BaseRelayLimits { 406 max_pending_events: usize, 407 max_subscription_id_length: usize, 408 max_subscriptions: usize, 409 max_filters_per_request: usize, 410 max_tag_values_per_filter: usize, 411 max_query_complexity: usize, 412 max_event_tags: usize, 413 max_content_length: usize, 414 max_limit: u64, 415 default_limit: u64, 416 } 417 418 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 419 pub struct BaseRelayLimitSettings { 420 pub max_pending_events: usize, 421 pub max_subscription_id_length: usize, 422 pub max_subscriptions: usize, 423 pub max_filters_per_request: usize, 424 pub max_tag_values_per_filter: usize, 425 pub max_query_complexity: usize, 426 pub max_event_tags: usize, 427 pub max_content_length: usize, 428 pub max_limit: u64, 429 pub default_limit: u64, 430 } 431 432 impl BaseRelayLimits { 433 pub fn new(settings: BaseRelayLimitSettings) -> Result<Self, BaseRelayError> { 434 let max_pending_events = settings.max_pending_events; 435 let max_subscription_id_length = settings.max_subscription_id_length; 436 let max_subscriptions = settings.max_subscriptions; 437 let max_filters_per_request = settings.max_filters_per_request; 438 let max_tag_values_per_filter = settings.max_tag_values_per_filter; 439 let max_query_complexity = settings.max_query_complexity; 440 let max_event_tags = settings.max_event_tags; 441 let max_content_length = settings.max_content_length; 442 let max_limit = settings.max_limit; 443 let default_limit = settings.default_limit; 444 if max_pending_events == 0 { 445 return Err(BaseRelayError::invalid( 446 "runtime max pending events must be greater than zero", 447 )); 448 } 449 if max_subscription_id_length == 0 { 450 return Err(BaseRelayError::invalid( 451 "runtime max subscription id length must be greater than zero", 452 )); 453 } 454 if max_subscriptions == 0 { 455 return Err(BaseRelayError::invalid( 456 "runtime max subscriptions per connection must be greater than zero", 457 )); 458 } 459 if max_filters_per_request == 0 { 460 return Err(BaseRelayError::invalid( 461 "runtime max filters per request must be greater than zero", 462 )); 463 } 464 if max_tag_values_per_filter == 0 { 465 return Err(BaseRelayError::invalid( 466 "runtime max tag values per filter must be greater than zero", 467 )); 468 } 469 if max_query_complexity == 0 { 470 return Err(BaseRelayError::invalid( 471 "runtime max query complexity must be greater than zero", 472 )); 473 } 474 if max_event_tags == 0 { 475 return Err(BaseRelayError::invalid( 476 "runtime max event tags must be greater than zero", 477 )); 478 } 479 if max_content_length == 0 { 480 return Err(BaseRelayError::invalid( 481 "runtime max content length must be greater than zero", 482 )); 483 } 484 if max_limit == 0 { 485 return Err(BaseRelayError::invalid( 486 "runtime max filter limit must be greater than zero", 487 )); 488 } 489 if default_limit == 0 { 490 return Err(BaseRelayError::invalid( 491 "runtime default filter limit must be greater than zero", 492 )); 493 } 494 if default_limit > max_limit { 495 return Err(BaseRelayError::invalid( 496 "runtime default filter limit must not exceed max filter limit", 497 )); 498 } 499 if usize::try_from(default_limit).is_ok_and(|limit| limit > max_query_complexity) { 500 return Err(BaseRelayError::invalid( 501 "runtime default filter limit must not exceed max query complexity", 502 )); 503 } 504 Ok(Self { 505 max_pending_events, 506 max_subscription_id_length, 507 max_subscriptions, 508 max_filters_per_request, 509 max_tag_values_per_filter, 510 max_query_complexity, 511 max_event_tags, 512 max_content_length, 513 max_limit, 514 default_limit, 515 }) 516 } 517 518 pub fn max_pending_events(self) -> usize { 519 self.max_pending_events 520 } 521 522 pub fn max_subscription_id_length(self) -> usize { 523 self.max_subscription_id_length 524 } 525 526 pub fn max_subscriptions(self) -> usize { 527 self.max_subscriptions 528 } 529 530 pub fn max_filters_per_request(self) -> usize { 531 self.max_filters_per_request 532 } 533 534 pub fn max_tag_values_per_filter(self) -> usize { 535 self.max_tag_values_per_filter 536 } 537 538 pub fn max_query_complexity(self) -> usize { 539 self.max_query_complexity 540 } 541 542 pub fn max_event_tags(self) -> usize { 543 self.max_event_tags 544 } 545 546 pub fn max_content_length(self) -> usize { 547 self.max_content_length 548 } 549 550 pub fn max_limit(self) -> u64 { 551 self.max_limit 552 } 553 554 pub fn default_limit(self) -> u64 { 555 self.default_limit 556 } 557 558 #[cfg(test)] 559 fn validate_protocol_event_for_test(&self, event: &Event) -> Result<(), BaseRelayError> { 560 if event.unsigned().tags().len() > self.max_event_tags { 561 return Err(BaseRelayError::invalid(format!( 562 "event tag count exceeds runtime max_event_tags {}", 563 self.max_event_tags 564 ))); 565 } 566 if event.unsigned().content().len() > self.max_content_length { 567 return Err(BaseRelayError::invalid(format!( 568 "event content length exceeds runtime max_content_length {}", 569 self.max_content_length 570 ))); 571 } 572 Ok(()) 573 } 574 575 pub(crate) fn validate_pocket_event(&self, event: &PocketEvent) -> Result<(), BaseRelayError> { 576 validate_pocket_event_shape(event, self.max_event_tags, self.max_content_length) 577 } 578 579 pub fn validate_subscription_id( 580 &self, 581 subscription_id: &SubscriptionId, 582 ) -> Result<(), BaseRelayError> { 583 let actual = subscription_id.as_str().chars().count(); 584 if actual > self.max_subscription_id_length { 585 return Err(BaseRelayError::invalid(format!( 586 "subscription id length exceeds runtime max_subid_length {}", 587 self.max_subscription_id_length 588 ))); 589 } 590 Ok(()) 591 } 592 593 pub(crate) fn validate_pocket_filters( 594 &self, 595 filters: &[PocketOwnedFilter], 596 ) -> Result<(), BaseRelayError> { 597 if filters.is_empty() { 598 return Err(BaseRelayError::invalid( 599 "request must include at least one filter", 600 )); 601 } 602 if filters.len() > self.max_filters_per_request { 603 return Err(BaseRelayError::invalid(format!( 604 "filter count exceeds runtime max_filters_per_request {}", 605 self.max_filters_per_request 606 ))); 607 } 608 for filter in filters { 609 let tag_values = filter 610 .tags() 611 .map_err(|error| BaseRelayError::error(error.to_string()))? 612 .iter() 613 .map(|tag| tag.skip(1).count()) 614 .sum::<usize>(); 615 if tag_values > self.max_tag_values_per_filter { 616 return Err(BaseRelayError::invalid(format!( 617 "filter tag value count exceeds runtime max_tag_values_per_filter {}", 618 self.max_tag_values_per_filter 619 ))); 620 } 621 if filter.limit() != u32::MAX && u64::from(filter.limit()) > self.max_limit { 622 return Err(BaseRelayError::invalid(format!( 623 "filter limit exceeds runtime max_limit {}", 624 self.max_limit 625 ))); 626 } 627 } 628 self.validate_pocket_query_complexity(filters)?; 629 Ok(()) 630 } 631 632 fn effective_pocket_filter_limit(self, filter: &PocketFilter) -> usize { 633 if filter.limit() == u32::MAX { 634 usize::try_from(self.default_limit).unwrap_or(usize::MAX) 635 } else { 636 usize::try_from(filter.limit()).unwrap_or(usize::MAX) 637 } 638 } 639 640 fn validate_pocket_query_complexity( 641 &self, 642 filters: &[PocketOwnedFilter], 643 ) -> Result<(), BaseRelayError> { 644 let score = filters 645 .iter() 646 .map(|filter| self.pocket_filter_complexity(filter)) 647 .fold(0_usize, usize::saturating_add); 648 if score > self.max_query_complexity { 649 return Err(BaseRelayError::invalid(format!( 650 "query complexity {score} exceeds runtime max_query_complexity {}", 651 self.max_query_complexity 652 ))); 653 } 654 Ok(()) 655 } 656 657 fn pocket_filter_complexity(&self, filter: &PocketFilter) -> usize { 658 let tag_score = filter 659 .tags() 660 .map(|tags| { 661 tags.iter() 662 .map(|tag| 1_usize.saturating_add(tag.skip(1).count())) 663 .fold(0_usize, usize::saturating_add) 664 }) 665 .unwrap_or(usize::MAX); 666 1_usize 667 .saturating_add(filter.num_ids()) 668 .saturating_add(filter.num_authors()) 669 .saturating_add(filter.num_kinds()) 670 .saturating_add(tag_score) 671 .saturating_add(usize::from( 672 filter.since() != tangle_store_pocket::PocketTime::min(), 673 )) 674 .saturating_add(usize::from( 675 filter.until() != tangle_store_pocket::PocketTime::max(), 676 )) 677 .saturating_add(self.effective_pocket_filter_limit(filter)) 678 } 679 } 680 681 impl BaseRelay { 682 pub(crate) fn unsupported_search_present_closed( 683 subscription_id: &SubscriptionId, 684 search_present: bool, 685 ) -> Option<RelayMessage> { 686 search_present.then(|| RelayMessage::Closed { 687 subscription_id: subscription_id.clone(), 688 message: "unsupported: search filters are not supported".to_owned(), 689 }) 690 } 691 692 fn redacted_req_closed( 693 subscription_id: SubscriptionId, 694 auth: &GroupAuthContext, 695 ) -> RelayMessage { 696 let message = if auth.authenticated_pubkeys().is_empty() { 697 BaseRelayError::auth_required("authentication required to read group events") 698 .prefixed_message() 699 } else { 700 BaseRelayError::restricted("group is unavailable").prefixed_message() 701 }; 702 RelayMessage::Closed { 703 subscription_id, 704 message, 705 } 706 } 707 708 pub fn open( 709 config: &PocketStoreConfig, 710 limits: BaseRelayLimits, 711 query: PocketQueryConfig, 712 ) -> Result<Self, BaseRelayError> { 713 let store = PocketStoreHandle::open(config).map_err(BaseRelayError::from)?; 714 Self::new(store, limits, query) 715 } 716 717 pub fn open_with_groups( 718 config: &PocketStoreConfig, 719 limits: BaseRelayLimits, 720 groups: &GroupRuntimeConfig, 721 query: PocketQueryConfig, 722 ) -> Result<Self, BaseRelayError> { 723 let store = PocketStoreHandle::open(config).map_err(BaseRelayError::from)?; 724 Self::new_with_groups(store, limits, groups, query) 725 } 726 727 pub fn new( 728 store: PocketStoreHandle, 729 limits: BaseRelayLimits, 730 query: PocketQueryConfig, 731 ) -> Result<Self, BaseRelayError> { 732 Self::new_with_groups(store, limits, &GroupRuntimeConfig::disabled(), query) 733 } 734 735 pub fn new_with_groups( 736 store: PocketStoreHandle, 737 limits: BaseRelayLimits, 738 groups: &GroupRuntimeConfig, 739 query: PocketQueryConfig, 740 ) -> Result<Self, BaseRelayError> { 741 let groups = GroupServiceHandle::from_config(&store, groups)?; 742 let subscriptions = 743 LiveSubscriptionSet::new(limits.max_pending_events(), limits.max_subscriptions())?; 744 let readiness = BaseRelayReadinessState::runtime_ready_before_bind(); 745 Ok(Self { 746 store, 747 subscriptions, 748 groups, 749 readiness, 750 limits, 751 query, 752 }) 753 } 754 755 #[cfg(test)] 756 pub fn handle_client_message( 757 &mut self, 758 message: ClientMessage, 759 auth: &mut BaseAuthState, 760 now: UnixTimestamp, 761 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 762 match message { 763 ClientMessage::Event(event) => self 764 .handle_event_with_auth(event, auth) 765 .map(|message| vec![message]), 766 ClientMessage::Req { 767 subscription_id, 768 filters, 769 } => self.handle_protocol_req_with_auth_for_test(subscription_id, filters, auth), 770 ClientMessage::Count { 771 subscription_id, 772 filters, 773 } => { 774 let search_present = filters.iter().any(|filter| filter.search().is_some()); 775 let filters = filters 776 .iter() 777 .map(tangle_filter_to_pocket) 778 .collect::<Result<Vec<_>, _>>()?; 779 self.handle_count_with_group_auth_report( 780 subscription_id, 781 filters, 782 search_present, 783 &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), 784 ) 785 .map(|report| vec![report.into_message()]) 786 } 787 ClientMessage::Close(subscription_id) => { 788 self.handle_close(&subscription_id); 789 Ok(Vec::new()) 790 } 791 ClientMessage::Auth(event) => Ok(self.handle_auth_message(event, auth, now)), 792 ClientMessage::NegOpen { 793 subscription_id, .. 794 } 795 | ClientMessage::NegMsg { 796 subscription_id, .. 797 } => Ok(vec![Self::disabled_negentropy_message(subscription_id)]), 798 ClientMessage::NegClose(_) => Ok(Vec::new()), 799 } 800 } 801 802 pub(crate) fn disabled_negentropy_message(subscription_id: SubscriptionId) -> RelayMessage { 803 RelayMessage::NegErr { 804 subscription_id, 805 message: NEGENTROPY_DISABLED_MESSAGE.to_owned(), 806 } 807 } 808 809 pub(crate) fn query_req_with_shared_services( 810 store: &PocketStoreHandle, 811 groups: Option<&GroupServiceHandle>, 812 limits: BaseRelayLimits, 813 query: PocketQueryConfig, 814 request: BaseRelayReqQuery<'_>, 815 ) -> Result<BaseRelayQueryReport, BaseRelayError> { 816 let group_auth = 817 GroupAuthContext::new(request.auth.authenticated_pubkeys().iter().cloned()); 818 Self::query_req_with_group_auth_shared_services( 819 store, 820 groups, 821 limits, 822 query, 823 BaseRelayGroupReqQuery { 824 subscription_id: request.subscription_id, 825 filters: request.filters, 826 search_present: request.search_present, 827 auth: &group_auth, 828 }, 829 ) 830 } 831 832 fn event_by_offset(&self, offset: StoreOffset) -> Result<PocketOwnedEvent, BaseRelayError> { 833 self.store 834 .event_by_offset(offset.as_u64()) 835 .map_err(BaseRelayError::from) 836 } 837 838 pub fn event_by_offset_with_auth( 839 &self, 840 offset: StoreOffset, 841 auth: &BaseAuthState, 842 ) -> Result<Option<PocketOwnedEvent>, BaseRelayError> { 843 let event = self.event_by_offset(offset)?; 844 if Self::group_read_gate_visible_to_auth( 845 self.groups.as_ref(), 846 &event, 847 &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), 848 )? { 849 Ok(Some(event)) 850 } else { 851 Ok(None) 852 } 853 } 854 855 #[cfg(test)] 856 fn handle_auth_message( 857 &self, 858 event: Event, 859 auth: &mut BaseAuthState, 860 now: UnixTimestamp, 861 ) -> Vec<RelayMessage> { 862 Self::handle_auth_with_limits(self.limits, event, auth, now) 863 } 864 865 #[cfg(test)] 866 pub(crate) fn handle_auth_with_limits( 867 limits: BaseRelayLimits, 868 event: Event, 869 auth: &mut BaseAuthState, 870 now: UnixTimestamp, 871 ) -> Vec<RelayMessage> { 872 if let Err(error) = limits.validate_protocol_event_for_test(&event) { 873 return vec![RelayMessage::Ok { 874 event_id: event.id().clone(), 875 accepted: false, 876 message: error.prefixed_message(), 877 }]; 878 } 879 auth.authenticate(&event, now) 880 .map(|_| { 881 vec![RelayMessage::Ok { 882 event_id: event.id().clone(), 883 accepted: true, 884 message: String::new(), 885 }] 886 }) 887 .unwrap_or_else(|error| { 888 vec![RelayMessage::Ok { 889 event_id: event.id().clone(), 890 accepted: false, 891 message: error.prefixed_message(), 892 }] 893 }) 894 } 895 896 pub(crate) fn handle_pocket_auth_with_limits( 897 limits: BaseRelayLimits, 898 event: &PocketEvent, 899 auth: &mut BaseAuthState, 900 now: UnixTimestamp, 901 ) -> Vec<RelayMessage> { 902 let event_id = 903 pocket_runtime_event_id(event).expect("Pocket event id is valid hex by construction"); 904 if let Err(error) = limits.validate_pocket_event(event) { 905 return vec![RelayMessage::Ok { 906 event_id, 907 accepted: false, 908 message: error.prefixed_message(), 909 }]; 910 } 911 auth.authenticate_pocket(event, now) 912 .map(|_| { 913 vec![RelayMessage::Ok { 914 event_id: event_id.clone(), 915 accepted: true, 916 message: String::new(), 917 }] 918 }) 919 .unwrap_or_else(|error| { 920 vec![RelayMessage::Ok { 921 event_id, 922 accepted: false, 923 message: error.prefixed_message(), 924 }] 925 }) 926 } 927 928 #[cfg(test)] 929 pub fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError> { 930 self.handle_event_with_group_auth(event, &GroupAuthContext::unauthenticated()) 931 .map(BaseRelayEventWrite::into_message) 932 } 933 934 #[cfg(test)] 935 pub fn handle_event_with_auth( 936 &self, 937 event: Event, 938 auth: &BaseAuthState, 939 ) -> Result<RelayMessage, BaseRelayError> { 940 self.handle_event_with_auth_report(event, auth) 941 .map(BaseRelayEventWrite::into_message) 942 } 943 944 pub fn handle_pocket_event(&self, event: &PocketEvent) -> Result<RelayMessage, BaseRelayError> { 945 self.handle_pocket_event_with_group_auth(event, &GroupAuthContext::unauthenticated()) 946 .map(BaseRelayEventWrite::into_message) 947 } 948 949 pub fn handle_pocket_event_with_auth( 950 &self, 951 event: &PocketEvent, 952 auth: &BaseAuthState, 953 ) -> Result<RelayMessage, BaseRelayError> { 954 self.handle_pocket_event_with_auth_report(event, auth) 955 .map(BaseRelayEventWrite::into_message) 956 } 957 958 #[cfg(test)] 959 pub(crate) fn handle_event_with_auth_report( 960 &self, 961 event: Event, 962 auth: &BaseAuthState, 963 ) -> Result<BaseRelayEventWrite, BaseRelayError> { 964 Self::handle_event_with_shared_services( 965 &self.store, 966 self.groups.as_ref(), 967 self.limits, 968 event, 969 auth, 970 ) 971 } 972 973 #[cfg(test)] 974 pub(crate) fn handle_event_with_shared_services( 975 store: &PocketStoreHandle, 976 groups: Option<&GroupServiceHandle>, 977 limits: BaseRelayLimits, 978 event: Event, 979 auth: &BaseAuthState, 980 ) -> Result<BaseRelayEventWrite, BaseRelayError> { 981 Self::handle_event_with_group_auth_and_services( 982 store, 983 groups, 984 limits, 985 event, 986 &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), 987 ) 988 } 989 990 pub(crate) fn handle_pocket_event_with_auth_report( 991 &self, 992 event: &PocketEvent, 993 auth: &BaseAuthState, 994 ) -> Result<BaseRelayEventWrite, BaseRelayError> { 995 Self::handle_pocket_event_with_shared_services( 996 &self.store, 997 self.groups.as_ref(), 998 self.limits, 999 event, 1000 auth, 1001 ) 1002 } 1003 1004 pub(crate) fn handle_pocket_event_with_shared_services( 1005 store: &PocketStoreHandle, 1006 groups: Option<&GroupServiceHandle>, 1007 limits: BaseRelayLimits, 1008 event: &PocketEvent, 1009 auth: &BaseAuthState, 1010 ) -> Result<BaseRelayEventWrite, BaseRelayError> { 1011 Self::handle_pocket_event_with_group_auth_and_services( 1012 store, 1013 groups, 1014 limits, 1015 event, 1016 &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), 1017 ) 1018 } 1019 1020 pub fn groups_enabled(&self) -> bool { 1021 self.groups.is_some() 1022 } 1023 1024 pub(crate) fn store_handle(&self) -> PocketStoreHandle { 1025 self.store.clone() 1026 } 1027 1028 pub fn group_projection(&self) -> Option<GroupProjectionReadGuard<'_>> { 1029 self.groups.as_ref().map(GroupServiceHandle::projection) 1030 } 1031 1032 pub(crate) fn group_service_handle(&self) -> Option<GroupServiceHandle> { 1033 self.groups.clone() 1034 } 1035 1036 pub(crate) fn group_outbox_pending_events(&self) -> usize { 1037 self.groups 1038 .as_ref() 1039 .map(GroupServiceHandle::outbox_pending_events) 1040 .unwrap_or(0) 1041 } 1042 1043 pub fn readiness_state(&self) -> BaseRelayReadinessState { 1044 self.readiness.clone() 1045 } 1046 1047 pub fn shutdown(&mut self) -> Result<BaseRelayShutdownReport, BaseRelayError> { 1048 let closed = self.subscriptions.close_all(); 1049 self.store.sync()?; 1050 Ok(BaseRelayShutdownReport::new(closed)) 1051 } 1052 1053 #[cfg(test)] 1054 fn handle_event_with_group_auth( 1055 &self, 1056 event: Event, 1057 auth: &GroupAuthContext, 1058 ) -> Result<BaseRelayEventWrite, BaseRelayError> { 1059 Self::handle_event_with_group_auth_and_services( 1060 &self.store, 1061 self.groups.as_ref(), 1062 self.limits, 1063 event, 1064 auth, 1065 ) 1066 } 1067 1068 #[cfg(test)] 1069 fn handle_event_with_group_auth_and_services( 1070 store: &PocketStoreHandle, 1071 groups: Option<&GroupServiceHandle>, 1072 limits: BaseRelayLimits, 1073 event: Event, 1074 auth: &GroupAuthContext, 1075 ) -> Result<BaseRelayEventWrite, BaseRelayError> { 1076 let pocket_event = tangle_event_to_pocket(&event)?; 1077 Self::handle_pocket_event_with_group_auth_and_services( 1078 store, 1079 groups, 1080 limits, 1081 &pocket_event, 1082 auth, 1083 ) 1084 } 1085 1086 fn handle_pocket_event_with_group_auth( 1087 &self, 1088 event: &PocketEvent, 1089 auth: &GroupAuthContext, 1090 ) -> Result<BaseRelayEventWrite, BaseRelayError> { 1091 Self::handle_pocket_event_with_group_auth_and_services( 1092 &self.store, 1093 self.groups.as_ref(), 1094 self.limits, 1095 event, 1096 auth, 1097 ) 1098 } 1099 1100 fn handle_pocket_event_with_group_auth_and_services( 1101 store: &PocketStoreHandle, 1102 groups: Option<&GroupServiceHandle>, 1103 limits: BaseRelayLimits, 1104 event: &PocketEvent, 1105 auth: &GroupAuthContext, 1106 ) -> Result<BaseRelayEventWrite, BaseRelayError> { 1107 let event_id = pocket_runtime_event_id(event)?; 1108 if let Err(error) = limits.validate_pocket_event(event) { 1109 return Ok(BaseRelayEventWrite::unstored(ok_rejected( 1110 event_id, 1111 error.prefixed_message(), 1112 ))); 1113 } 1114 if let Err(error) = verify_pocket_event_signature(event) { 1115 return Ok(BaseRelayEventWrite::unstored(ok_rejected( 1116 event_id, 1117 error.prefixed_message(), 1118 ))); 1119 } 1120 let pubkey = pocket_event_pubkey(event)?; 1121 if is_pocket_nip70_protected_event(event)? && !auth.contains(&pubkey) { 1122 return Ok(BaseRelayEventWrite::unstored(ok_rejected( 1123 event_id, 1124 BaseRelayError::auth_required( 1125 "protected event requires authenticated event author", 1126 ) 1127 .prefixed_message(), 1128 ))); 1129 } 1130 let group_limits = groups.map(GroupServiceHandle::limits).unwrap_or_default(); 1131 let audit_class = classify_group_event(event, group_limits).ok(); 1132 let class = match validate_client_group_event_structure(event, group_limits) { 1133 Ok(class) => class, 1134 Err(error) => { 1135 if let Some(class) = audit_class.as_ref() { 1136 logging::log_group_moderation_audit( 1137 event, 1138 class, 1139 TangleModerationAuditResult::Rejected, 1140 ); 1141 } 1142 return Ok(BaseRelayEventWrite::unstored(ok_rejected( 1143 event_id, 1144 error.prefixed_message(), 1145 ))); 1146 } 1147 }; 1148 if !matches!(class, GroupEventClass::NonGroup) { 1149 let Some(groups) = groups else { 1150 logging::log_group_moderation_audit( 1151 event, 1152 &class, 1153 TangleModerationAuditResult::Rejected, 1154 ); 1155 return Ok(BaseRelayEventWrite::unstored(ok_rejected( 1156 event_id, 1157 "blocked: NIP-29 group events are not accepted before group service".to_owned(), 1158 ))); 1159 }; 1160 match groups.store_group_pocket_event(store, event, &class, auth) { 1161 Ok(GroupEventWrite::Stored(stored_offsets)) => { 1162 logging::log_group_moderation_audit( 1163 event, 1164 &class, 1165 TangleModerationAuditResult::Accepted, 1166 ); 1167 return Ok(BaseRelayEventWrite::stored( 1168 ok_accepted(event_id, String::new()), 1169 stored_offsets, 1170 )); 1171 } 1172 Ok(GroupEventWrite::Duplicate) => { 1173 logging::log_group_moderation_audit( 1174 event, 1175 &class, 1176 TangleModerationAuditResult::Accepted, 1177 ); 1178 return Ok(BaseRelayEventWrite::unstored(ok_accepted( 1179 event_id, 1180 "duplicate: already have this event".to_owned(), 1181 ))); 1182 } 1183 Err(GroupEventWriteError::Rejected(error)) => { 1184 logging::log_group_moderation_audit( 1185 event, 1186 &class, 1187 TangleModerationAuditResult::Rejected, 1188 ); 1189 return Ok(BaseRelayEventWrite::unstored(ok_rejected( 1190 event_id, 1191 error.prefixed_message(), 1192 ))); 1193 } 1194 Err(GroupEventWriteError::Storage(error)) => return Err(error), 1195 } 1196 } 1197 if pocket_event_kind(event)?.is_ephemeral() { 1198 return Ok(BaseRelayEventWrite::unstored(ok_accepted( 1199 event_id, 1200 String::new(), 1201 ))); 1202 } 1203 if store.event_by_id(event.id())?.is_some() { 1204 return Ok(BaseRelayEventWrite::unstored(ok_accepted( 1205 event_id, 1206 "duplicate: already have this event".to_owned(), 1207 ))); 1208 } 1209 let store_offset = StoreOffset::new(store.store_event(event)?); 1210 Ok(BaseRelayEventWrite::stored( 1211 ok_accepted(event_id, String::new()), 1212 vec![store_offset], 1213 )) 1214 } 1215 1216 pub fn handle_pocket_req( 1217 &mut self, 1218 subscription_id: SubscriptionId, 1219 filters: Vec<PocketOwnedFilter>, 1220 ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { 1221 self.handle_pocket_req_with_group_auth( 1222 subscription_id, 1223 filters, 1224 &GroupAuthContext::unauthenticated(), 1225 ) 1226 } 1227 1228 #[cfg(test)] 1229 pub fn handle_protocol_req_for_test( 1230 &mut self, 1231 subscription_id: SubscriptionId, 1232 filters: Vec<Filter>, 1233 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 1234 self.handle_protocol_req_with_group_auth_for_test( 1235 subscription_id, 1236 filters, 1237 &GroupAuthContext::unauthenticated(), 1238 ) 1239 } 1240 1241 #[cfg(test)] 1242 pub fn handle_protocol_req_with_auth_for_test( 1243 &mut self, 1244 subscription_id: SubscriptionId, 1245 filters: Vec<Filter>, 1246 auth: &BaseAuthState, 1247 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 1248 self.handle_protocol_req_with_group_auth_for_test( 1249 subscription_id, 1250 filters, 1251 &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), 1252 ) 1253 } 1254 1255 #[cfg(test)] 1256 fn handle_protocol_req_with_group_auth_for_test( 1257 &mut self, 1258 subscription_id: SubscriptionId, 1259 filters: Vec<Filter>, 1260 auth: &GroupAuthContext, 1261 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 1262 self.handle_protocol_req_with_group_auth_report_for_test(subscription_id, filters, auth) 1263 .map(BaseRelayQueryReport::into_messages) 1264 .and_then(protocol_messages_for_test) 1265 } 1266 1267 pub fn handle_pocket_req_with_auth( 1268 &mut self, 1269 subscription_id: SubscriptionId, 1270 filters: Vec<PocketOwnedFilter>, 1271 auth: &BaseAuthState, 1272 ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { 1273 self.handle_pocket_req_with_group_auth( 1274 subscription_id, 1275 filters, 1276 &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), 1277 ) 1278 } 1279 1280 fn handle_pocket_req_with_group_auth( 1281 &mut self, 1282 subscription_id: SubscriptionId, 1283 filters: Vec<PocketOwnedFilter>, 1284 auth: &GroupAuthContext, 1285 ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { 1286 self.handle_pocket_req_with_group_auth_report(subscription_id, filters, false, auth) 1287 .map(BaseRelayQueryReport::into_messages) 1288 } 1289 1290 #[cfg(test)] 1291 fn handle_protocol_req_with_group_auth_report_for_test( 1292 &mut self, 1293 subscription_id: SubscriptionId, 1294 filters: Vec<Filter>, 1295 auth: &GroupAuthContext, 1296 ) -> Result<BaseRelayQueryReport, BaseRelayError> { 1297 let search_present = filters.iter().any(|filter| filter.search().is_some()); 1298 let filters = filters 1299 .iter() 1300 .map(tangle_filter_to_pocket) 1301 .collect::<Result<Vec<_>, _>>()?; 1302 self.handle_pocket_req_with_group_auth_report( 1303 subscription_id, 1304 filters, 1305 search_present, 1306 auth, 1307 ) 1308 } 1309 1310 fn handle_pocket_req_with_group_auth_report( 1311 &mut self, 1312 subscription_id: SubscriptionId, 1313 filters: Vec<PocketOwnedFilter>, 1314 search_present: bool, 1315 auth: &GroupAuthContext, 1316 ) -> Result<BaseRelayQueryReport, BaseRelayError> { 1317 self.limits.validate_subscription_id(&subscription_id)?; 1318 self.limits.validate_pocket_filters(&filters)?; 1319 if let Some(message) = 1320 Self::unsupported_search_present_closed(&subscription_id, search_present) 1321 { 1322 return Ok(BaseRelayQueryReport::new( 1323 vec![message.into()], 1324 false, 1325 BaseRelayQueryMetrics::default(), 1326 )); 1327 } 1328 let should_subscribe = !pocket_filters_are_complete(&filters); 1329 if should_subscribe { 1330 self.subscriptions 1331 .ensure_can_subscribe(&subscription_id, &filters)?; 1332 let report = self.query_req_with_group_auth_report( 1333 subscription_id.clone(), 1334 filters.clone(), 1335 false, 1336 auth, 1337 )?; 1338 if !report.group_read_denied() { 1339 self.subscriptions.subscribe(subscription_id, filters)?; 1340 } 1341 return Ok(report); 1342 } 1343 self.query_req_with_group_auth_report(subscription_id, filters, false, auth) 1344 } 1345 1346 fn query_req_with_group_auth_report( 1347 &self, 1348 subscription_id: SubscriptionId, 1349 filters: Vec<PocketOwnedFilter>, 1350 search_present: bool, 1351 auth: &GroupAuthContext, 1352 ) -> Result<BaseRelayQueryReport, BaseRelayError> { 1353 Self::query_req_with_group_auth_shared_services( 1354 &self.store, 1355 self.groups.as_ref(), 1356 self.limits, 1357 self.query, 1358 BaseRelayGroupReqQuery { 1359 subscription_id, 1360 filters, 1361 search_present, 1362 auth, 1363 }, 1364 ) 1365 } 1366 1367 fn query_req_with_group_auth_shared_services( 1368 store: &PocketStoreHandle, 1369 groups: Option<&GroupServiceHandle>, 1370 limits: BaseRelayLimits, 1371 query: PocketQueryConfig, 1372 request: BaseRelayGroupReqQuery<'_>, 1373 ) -> Result<BaseRelayQueryReport, BaseRelayError> { 1374 let BaseRelayGroupReqQuery { 1375 subscription_id, 1376 filters, 1377 search_present, 1378 auth, 1379 } = request; 1380 limits.validate_subscription_id(&subscription_id)?; 1381 limits.validate_pocket_filters(&filters)?; 1382 if let Some(message) = 1383 Self::unsupported_search_present_closed(&subscription_id, search_present) 1384 { 1385 return Ok(BaseRelayQueryReport::new( 1386 vec![message.into()], 1387 false, 1388 BaseRelayQueryMetrics::default(), 1389 )); 1390 } 1391 let report = 1392 Self::query_events_report_with_services(store, groups, limits, query, &filters, auth)?; 1393 let group_read_denied = report.group_read_denied; 1394 let query_metrics = report.query_metrics; 1395 let mut messages = report 1396 .events 1397 .into_iter() 1398 .map(|event| RuntimeRelayMessage::event(subscription_id.clone(), event)) 1399 .collect::<Vec<_>>(); 1400 if group_read_denied { 1401 messages.push(Self::redacted_req_closed(subscription_id, auth).into()); 1402 } else { 1403 messages.push(RelayMessage::Eose(subscription_id).into()); 1404 } 1405 Ok(BaseRelayQueryReport::new( 1406 messages, 1407 group_read_denied, 1408 query_metrics, 1409 )) 1410 } 1411 1412 pub fn handle_count( 1413 &self, 1414 subscription_id: SubscriptionId, 1415 filters: Vec<PocketOwnedFilter>, 1416 ) -> Result<RelayMessage, BaseRelayError> { 1417 self.handle_count_with_group_auth( 1418 subscription_id, 1419 filters, 1420 &GroupAuthContext::unauthenticated(), 1421 ) 1422 } 1423 1424 pub fn handle_count_with_auth( 1425 &self, 1426 subscription_id: SubscriptionId, 1427 filters: Vec<PocketOwnedFilter>, 1428 auth: &BaseAuthState, 1429 ) -> Result<RelayMessage, BaseRelayError> { 1430 self.handle_count_with_auth_report(subscription_id, filters, auth) 1431 .map(BaseRelayCountReport::into_message) 1432 } 1433 1434 pub(crate) fn handle_count_with_auth_report( 1435 &self, 1436 subscription_id: SubscriptionId, 1437 filters: Vec<PocketOwnedFilter>, 1438 auth: &BaseAuthState, 1439 ) -> Result<BaseRelayCountReport, BaseRelayError> { 1440 Self::handle_count_with_shared_services( 1441 &self.store, 1442 self.groups.as_ref(), 1443 self.limits, 1444 self.query, 1445 BaseRelayCountQuery::new(subscription_id, filters, false, auth), 1446 ) 1447 } 1448 1449 pub(crate) fn handle_count_with_shared_services( 1450 store: &PocketStoreHandle, 1451 groups: Option<&GroupServiceHandle>, 1452 limits: BaseRelayLimits, 1453 query: PocketQueryConfig, 1454 request: BaseRelayCountQuery<'_>, 1455 ) -> Result<BaseRelayCountReport, BaseRelayError> { 1456 let group_auth = 1457 GroupAuthContext::new(request.auth.authenticated_pubkeys().iter().cloned()); 1458 Self::handle_count_with_group_auth_shared_services( 1459 store, 1460 groups, 1461 limits, 1462 query, 1463 BaseRelayGroupCountQuery { 1464 subscription_id: request.subscription_id, 1465 filters: request.filters, 1466 search_present: request.search_present, 1467 auth: &group_auth, 1468 }, 1469 ) 1470 } 1471 1472 fn handle_count_with_group_auth( 1473 &self, 1474 subscription_id: SubscriptionId, 1475 filters: Vec<PocketOwnedFilter>, 1476 auth: &GroupAuthContext, 1477 ) -> Result<RelayMessage, BaseRelayError> { 1478 self.handle_count_with_group_auth_report(subscription_id, filters, false, auth) 1479 .map(BaseRelayCountReport::into_message) 1480 } 1481 1482 fn handle_count_with_group_auth_report( 1483 &self, 1484 subscription_id: SubscriptionId, 1485 filters: Vec<PocketOwnedFilter>, 1486 search_present: bool, 1487 auth: &GroupAuthContext, 1488 ) -> Result<BaseRelayCountReport, BaseRelayError> { 1489 Self::handle_count_with_group_auth_shared_services( 1490 &self.store, 1491 self.groups.as_ref(), 1492 self.limits, 1493 self.query, 1494 BaseRelayGroupCountQuery { 1495 subscription_id, 1496 filters, 1497 search_present, 1498 auth, 1499 }, 1500 ) 1501 } 1502 1503 fn handle_count_with_group_auth_shared_services( 1504 store: &PocketStoreHandle, 1505 groups: Option<&GroupServiceHandle>, 1506 limits: BaseRelayLimits, 1507 query: PocketQueryConfig, 1508 request: BaseRelayGroupCountQuery<'_>, 1509 ) -> Result<BaseRelayCountReport, BaseRelayError> { 1510 let BaseRelayGroupCountQuery { 1511 subscription_id, 1512 filters, 1513 search_present, 1514 auth, 1515 } = request; 1516 limits.validate_subscription_id(&subscription_id)?; 1517 limits.validate_pocket_filters(&filters)?; 1518 if let Some(message) = 1519 Self::unsupported_search_present_closed(&subscription_id, search_present) 1520 { 1521 return Ok(BaseRelayCountReport::new( 1522 message, 1523 false, 1524 BaseRelayQueryMetrics::default(), 1525 )); 1526 } 1527 let report = 1528 Self::count_events_report_with_services(store, groups, limits, query, &filters, auth)?; 1529 Ok(BaseRelayCountReport::new( 1530 RelayMessage::Count { 1531 subscription_id, 1532 count: report.count, 1533 hll: report.hll, 1534 }, 1535 report.group_read_denied, 1536 report.query_metrics, 1537 )) 1538 } 1539 1540 pub fn handle_close(&mut self, subscription_id: &SubscriptionId) -> CloseResult { 1541 self.subscriptions.close(subscription_id) 1542 } 1543 1544 pub fn fanout_pocket(&mut self, event: &PocketEvent) -> Vec<RuntimeRelayMessage> { 1545 self.fanout_pocket_with_group_auth(event, &GroupAuthContext::unauthenticated()) 1546 } 1547 1548 pub fn fanout_pocket_with_group_auth( 1549 &mut self, 1550 event: &PocketEvent, 1551 auth: &GroupAuthContext, 1552 ) -> Vec<RuntimeRelayMessage> { 1553 let groups = self.groups.as_ref(); 1554 self.subscriptions 1555 .fanout(event, auth, |event, auth| { 1556 Self::group_read_gate_visible_to_auth(groups, event, auth).unwrap_or(false) 1557 }) 1558 .expect("Pocket live fanout must match") 1559 .into_iter() 1560 .map(|subscription_id| RuntimeRelayMessage::Event { 1561 subscription_id, 1562 event: event.to_owned(), 1563 }) 1564 .collect() 1565 } 1566 1567 #[cfg(test)] 1568 pub fn fanout_protocol_for_test(&mut self, event: &Event) -> Vec<RelayMessage> { 1569 self.fanout_protocol_with_group_auth_for_test(event, &GroupAuthContext::unauthenticated()) 1570 } 1571 1572 #[cfg(test)] 1573 pub fn fanout_protocol_with_group_auth_for_test( 1574 &mut self, 1575 event: &Event, 1576 auth: &GroupAuthContext, 1577 ) -> Vec<RelayMessage> { 1578 let pocket_event = tangle_event_to_pocket(event).expect("event must convert to Pocket"); 1579 protocol_messages_for_test(self.fanout_pocket_with_group_auth(&pocket_event, auth)) 1580 .expect("test protocol fanout must convert") 1581 } 1582 1583 pub fn active_subscription_count(&self) -> usize { 1584 self.subscriptions.active_count() 1585 } 1586 1587 fn query_events_report_with_services( 1588 store: &PocketStoreHandle, 1589 groups: Option<&GroupServiceHandle>, 1590 limits: BaseRelayLimits, 1591 query: PocketQueryConfig, 1592 filters: &[PocketOwnedFilter], 1593 auth: &GroupAuthContext, 1594 ) -> Result<BaseRelayEventQueryReport, BaseRelayError> { 1595 let mut output = Vec::new(); 1596 let mut group_read_denied = false; 1597 let mut query_metrics = BaseRelayQueryMetrics::default(); 1598 for filter in filters { 1599 let report = Self::query_filter_events_report_with_services( 1600 store, 1601 groups, 1602 limits, 1603 query, 1604 filter, 1605 auth, 1606 BaseRelayFilterLimitMode::ApplyDefaultLimit, 1607 )?; 1608 group_read_denied |= report.group_read_denied; 1609 query_metrics = query_metrics.add(report.query_metrics); 1610 let mut events = Self::sort_and_dedupe_query_events(report.events); 1611 events.truncate(limits.effective_pocket_filter_limit(filter)); 1612 output.extend(events); 1613 } 1614 let events = Self::sort_and_dedupe_query_events(output); 1615 query_metrics = query_metrics.with_returned_events(events.len()); 1616 Ok(BaseRelayEventQueryReport::new( 1617 events, 1618 group_read_denied, 1619 query_metrics, 1620 )) 1621 } 1622 1623 fn count_events_report_with_services( 1624 store: &PocketStoreHandle, 1625 groups: Option<&GroupServiceHandle>, 1626 limits: BaseRelayLimits, 1627 query: PocketQueryConfig, 1628 filters: &[PocketOwnedFilter], 1629 auth: &GroupAuthContext, 1630 ) -> Result<BaseRelayCountEventsReport, BaseRelayError> { 1631 let mut seen = BTreeSet::new(); 1632 let mut group_read_denied = false; 1633 let mut query_metrics = BaseRelayQueryMetrics::default(); 1634 let count_query = query.exact_count(); 1635 let mut hll = BaseRelayCountHll::new(filters)?; 1636 hll.suppress_for_filter_targets(groups, filters); 1637 for filter in filters { 1638 let report = Self::query_filter_events_report_with_services( 1639 store, 1640 groups, 1641 limits, 1642 count_query, 1643 filter, 1644 auth, 1645 BaseRelayFilterLimitMode::PreserveCountLimitless, 1646 )?; 1647 group_read_denied |= report.group_read_denied; 1648 if report.group_read_denied { 1649 hll.suppress(); 1650 } 1651 query_metrics = query_metrics.add(report.query_metrics); 1652 for event in report.events { 1653 let event: &PocketEvent = &event; 1654 hll.observe(groups, event)?; 1655 seen.insert(event.id()); 1656 } 1657 } 1658 let count = u64::try_from(seen.len()) 1659 .map_err(|_| BaseRelayError::error("visible event count overflow"))?; 1660 let hll = hll.into_hex(); 1661 Ok(BaseRelayCountEventsReport::new( 1662 count, 1663 hll, 1664 group_read_denied, 1665 query_metrics, 1666 )) 1667 } 1668 1669 fn count_hll_offset(filters: &[PocketOwnedFilter]) -> Result<Option<usize>, BaseRelayError> { 1670 let [filter] = filters else { 1671 return Ok(None); 1672 }; 1673 filter 1674 .hyperloglog_offset() 1675 .map_err(|error| BaseRelayError::error(error.to_string())) 1676 } 1677 1678 fn event_suppresses_count_hll( 1679 groups: Option<&GroupServiceHandle>, 1680 event: &PocketEvent, 1681 ) -> Result<bool, BaseRelayError> { 1682 let Some(groups) = groups else { 1683 return Ok(false); 1684 }; 1685 let class = classify_group_event(event, groups.limits()).map_err(BaseRelayError::from)?; 1686 let Some(group_id) = class.group_id() else { 1687 return Ok(false); 1688 }; 1689 let projection = groups.projection(); 1690 let Some(group) = projection.group(group_id) else { 1691 return Ok(true); 1692 }; 1693 Ok(projection.tombstone(group_id).is_some() 1694 || group.metadata().private() 1695 || group.metadata().hidden()) 1696 } 1697 1698 fn count_hll_filter_target_policy( 1699 groups: Option<&GroupServiceHandle>, 1700 filter: &PocketFilter, 1701 ) -> BaseRelayCountHllTargetPolicy { 1702 let Some(groups) = groups else { 1703 return if Self::count_hll_filter_has_group_target(filter) { 1704 BaseRelayCountHllTargetPolicy::Suppress 1705 } else { 1706 BaseRelayCountHllTargetPolicy::Eligible 1707 }; 1708 }; 1709 match Self::count_hll_group_targets( 1710 filter, 1711 usize::from(groups.limits().max_group_id_bytes()), 1712 ) { 1713 BaseRelayCountHllGroupTargets::None => BaseRelayCountHllTargetPolicy::Eligible, 1714 BaseRelayCountHllGroupTargets::Suppress => BaseRelayCountHllTargetPolicy::Suppress, 1715 BaseRelayCountHllGroupTargets::Targets(group_ids) => { 1716 let projection = groups.projection(); 1717 if group_ids.iter().all(|group_id| { 1718 projection.group(group_id).is_some_and(|group| { 1719 projection.tombstone(group_id).is_none() 1720 && !group.metadata().private() 1721 && !group.metadata().hidden() 1722 }) 1723 }) { 1724 BaseRelayCountHllTargetPolicy::Eligible 1725 } else { 1726 BaseRelayCountHllTargetPolicy::Suppress 1727 } 1728 } 1729 } 1730 } 1731 1732 fn count_hll_group_targets( 1733 filter: &PocketFilter, 1734 max_group_id_bytes: usize, 1735 ) -> BaseRelayCountHllGroupTargets { 1736 let Ok(tags) = filter.tags() else { 1737 return BaseRelayCountHllGroupTargets::Suppress; 1738 }; 1739 let d_tag_mode = Self::count_hll_filter_d_tag_mode(filter); 1740 let mut group_ids = Vec::new(); 1741 for tag in tags.iter() { 1742 let mut values = tag.into_iter(); 1743 let Some(name) = values.next() else { 1744 continue; 1745 }; 1746 if name == b"d" { 1747 match d_tag_mode { 1748 BaseRelayCountHllDTagMode::Ignore => continue, 1749 BaseRelayCountHllDTagMode::Suppress => { 1750 return BaseRelayCountHllGroupTargets::Suppress; 1751 } 1752 BaseRelayCountHllDTagMode::Target => {} 1753 } 1754 } else if name != b"h" { 1755 continue; 1756 } 1757 let mut found_value = false; 1758 for value in values { 1759 found_value = true; 1760 let Ok(value) = std::str::from_utf8(value) else { 1761 return BaseRelayCountHllGroupTargets::Suppress; 1762 }; 1763 let Ok(group_id) = GroupId::new_with_max_bytes(value, max_group_id_bytes) else { 1764 return BaseRelayCountHllGroupTargets::Suppress; 1765 }; 1766 group_ids.push(group_id); 1767 } 1768 if !found_value { 1769 return BaseRelayCountHllGroupTargets::Suppress; 1770 } 1771 } 1772 if group_ids.is_empty() { 1773 BaseRelayCountHllGroupTargets::None 1774 } else { 1775 group_ids.sort(); 1776 group_ids.dedup(); 1777 BaseRelayCountHllGroupTargets::Targets(group_ids) 1778 } 1779 } 1780 1781 fn count_hll_filter_has_group_target(filter: &PocketFilter) -> bool { 1782 let Ok(tags) = filter.tags() else { 1783 return true; 1784 }; 1785 let d_tag_mode = Self::count_hll_filter_d_tag_mode(filter); 1786 tags.iter().any(|tag| { 1787 let mut values = tag.into_iter(); 1788 let name = values.next(); 1789 matches!(name, Some(b"h")) 1790 || (matches!( 1791 d_tag_mode, 1792 BaseRelayCountHllDTagMode::Target | BaseRelayCountHllDTagMode::Suppress 1793 ) && matches!(name, Some(b"d"))) 1794 }) 1795 } 1796 1797 fn count_hll_filter_d_tag_mode(filter: &PocketFilter) -> BaseRelayCountHllDTagMode { 1798 if filter.num_kinds() == 0 { 1799 return BaseRelayCountHllDTagMode::Suppress; 1800 } 1801 if filter 1802 .kinds() 1803 .any(|kind| NIP29_RELAY_GENERATED_KIND_VALUES.contains(&u32::from(kind.as_u16()))) 1804 { 1805 BaseRelayCountHllDTagMode::Target 1806 } else { 1807 BaseRelayCountHllDTagMode::Ignore 1808 } 1809 } 1810 1811 fn query_filter_events_report_with_services( 1812 store: &PocketStoreHandle, 1813 groups: Option<&GroupServiceHandle>, 1814 limits: BaseRelayLimits, 1815 query: PocketQueryConfig, 1816 filter: &PocketFilter, 1817 auth: &GroupAuthContext, 1818 limit_mode: BaseRelayFilterLimitMode, 1819 ) -> Result<BaseRelayEventQueryReport, BaseRelayError> { 1820 let pocket_filter = Self::pocket_filter_with_limit_mode(limits, filter, limit_mode)?; 1821 let screen_error = RefCell::new(None); 1822 let candidates_scanned = Cell::new(0_u64); 1823 let redacted_events = Cell::new(0_u64); 1824 let screened = store.find_events_with_screen(&pocket_filter, query, |pocket_event| { 1825 candidates_scanned.set(candidates_scanned.get().saturating_add(1)); 1826 if screen_error.borrow().is_some() { 1827 return PocketScreenResult::Mismatch; 1828 } 1829 match pocket_filter.event_matches(pocket_event) { 1830 Ok(false) => PocketScreenResult::Mismatch, 1831 Ok(true) => { 1832 match Self::group_read_gate_visible_to_auth(groups, pocket_event, auth) { 1833 Ok(true) => PocketScreenResult::Match, 1834 Ok(false) => { 1835 redacted_events.set(redacted_events.get().saturating_add(1)); 1836 PocketScreenResult::Redacted 1837 } 1838 Err(error) => { 1839 *screen_error.borrow_mut() = Some(error); 1840 PocketScreenResult::Mismatch 1841 } 1842 } 1843 } 1844 Err(error) => { 1845 *screen_error.borrow_mut() = Some(BaseRelayError::error(error.to_string())); 1846 PocketScreenResult::Mismatch 1847 } 1848 } 1849 })?; 1850 if let Some(error) = screen_error.into_inner() { 1851 return Err(error); 1852 } 1853 let group_read_denied = screened.redacted(); 1854 let events = screened.into_events(); 1855 Ok(BaseRelayEventQueryReport::new( 1856 events, 1857 group_read_denied, 1858 BaseRelayQueryMetrics::new(candidates_scanned.get(), 0, redacted_events.get()), 1859 )) 1860 } 1861 1862 fn pocket_filter_with_limit_mode( 1863 limits: BaseRelayLimits, 1864 filter: &PocketFilter, 1865 limit_mode: BaseRelayFilterLimitMode, 1866 ) -> Result<PocketOwnedFilter, BaseRelayError> { 1867 let limit = match (limit_mode, filter.limit()) { 1868 (BaseRelayFilterLimitMode::ApplyDefaultLimit, u32::MAX) => { 1869 u32::try_from(limits.default_limit) 1870 .map_err(|_| BaseRelayError::invalid("default filter limit exceeds u32"))? 1871 } 1872 (BaseRelayFilterLimitMode::PreserveCountLimitless, _) => u32::MAX, 1873 (_, limit) => limit, 1874 }; 1875 let ids = filter.ids().collect::<Vec<_>>(); 1876 let authors = filter.authors().collect::<Vec<_>>(); 1877 let kinds = filter.kinds().collect::<Vec<_>>(); 1878 let since = 1879 (filter.since() != tangle_store_pocket::PocketTime::min()).then(|| filter.since()); 1880 let until = 1881 (filter.until() != tangle_store_pocket::PocketTime::max()).then(|| filter.until()); 1882 let limit = (limit != u32::MAX).then_some(limit); 1883 PocketOwnedFilter::new( 1884 &ids, 1885 &authors, 1886 &kinds, 1887 filter 1888 .tags() 1889 .map_err(|error| BaseRelayError::error(error.to_string()))?, 1890 since, 1891 until, 1892 limit, 1893 ) 1894 .map_err(|error| BaseRelayError::error(error.to_string())) 1895 } 1896 1897 fn sort_and_dedupe_query_events(mut events: Vec<PocketOwnedEvent>) -> Vec<PocketOwnedEvent> { 1898 events.sort_by(|left, right| { 1899 let left: &PocketEvent = left; 1900 let right: &PocketEvent = right; 1901 right 1902 .created_at() 1903 .cmp(&left.created_at()) 1904 .then_with(|| left.id().cmp(&right.id())) 1905 }); 1906 let mut seen = BTreeSet::new(); 1907 events 1908 .into_iter() 1909 .filter(|event| { 1910 let event: &PocketEvent = event; 1911 seen.insert(event.id()) 1912 }) 1913 .collect() 1914 } 1915 1916 pub(crate) fn group_read_gate_visible_to_auth( 1917 groups: Option<&GroupServiceHandle>, 1918 event: &(impl GroupEventView + ?Sized), 1919 auth: &GroupAuthContext, 1920 ) -> Result<bool, BaseRelayError> { 1921 groups 1922 .map(|groups| groups.event_visible_to_auth(event, auth)) 1923 .unwrap_or(Ok(true)) 1924 .map_err(BaseRelayError::from) 1925 } 1926 } 1927 1928 fn pocket_filters_are_complete(filters: &[PocketOwnedFilter]) -> bool { 1929 !filters.is_empty() && filters.iter().all(|filter| filter.completes()) 1930 } 1931 1932 #[cfg(test)] 1933 mod tests { 1934 use super::{ 1935 BaseRelay, BaseRelayCountHll, BaseRelayCountHllTargetPolicy, BaseRelayLimitSettings, 1936 BaseRelayLimits, NEGENTROPY_DISABLED_MESSAGE, 1937 }; 1938 use crate::pocket_conversion::{tangle_event_to_pocket, tangle_filter_to_pocket}; 1939 use crate::relay::auth::BaseAuthState; 1940 use crate::relay::live::CloseResult; 1941 use tangle_crypto::RelaySigner; 1942 use tangle_groups::{ 1943 GroupAuthContext, GroupId, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, 1944 KIND_GROUP_CREATE_INVITE, KIND_GROUP_DELETE_EVENT, KIND_GROUP_DELETE_GROUP, 1945 KIND_GROUP_EDIT_METADATA, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, 1946 KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, 1947 MemberStatus, NIP29_RELAY_GENERATED_KIND_VALUES, StoreOffset, 1948 parse_group_runtime_config_json, 1949 }; 1950 use tangle_protocol::{ 1951 ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SignatureHex, 1952 SubscriptionId, Tag, UnixTimestamp, UnsignedEvent, filter_from_value, 1953 }; 1954 use tangle_store_pocket::{ 1955 PocketEvent, PocketHll8, PocketKind, PocketOwnedEvent, PocketOwnedFilter, PocketOwnedTags, 1956 PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy, PocketTime, 1957 }; 1958 1959 trait BaseRelayCountTestExt { 1960 fn handle_count_protocol( 1961 &self, 1962 subscription_id: SubscriptionId, 1963 filters: Vec<Filter>, 1964 ) -> Result<RelayMessage, crate::errors::BaseRelayError>; 1965 1966 fn handle_count_with_auth_protocol( 1967 &self, 1968 subscription_id: SubscriptionId, 1969 filters: Vec<Filter>, 1970 auth: &BaseAuthState, 1971 ) -> Result<RelayMessage, crate::errors::BaseRelayError>; 1972 } 1973 1974 impl BaseRelayCountTestExt for BaseRelay { 1975 fn handle_count_protocol( 1976 &self, 1977 subscription_id: SubscriptionId, 1978 filters: Vec<Filter>, 1979 ) -> Result<RelayMessage, crate::errors::BaseRelayError> { 1980 let search_present = filters.iter().any(|filter| filter.search().is_some()); 1981 let filters = pocket_filters(filters)?; 1982 self.handle_count_with_group_auth_report( 1983 subscription_id, 1984 filters, 1985 search_present, 1986 &GroupAuthContext::unauthenticated(), 1987 ) 1988 .map(|report| report.into_message()) 1989 } 1990 1991 fn handle_count_with_auth_protocol( 1992 &self, 1993 subscription_id: SubscriptionId, 1994 filters: Vec<Filter>, 1995 auth: &BaseAuthState, 1996 ) -> Result<RelayMessage, crate::errors::BaseRelayError> { 1997 let search_present = filters.iter().any(|filter| filter.search().is_some()); 1998 let filters = pocket_filters(filters)?; 1999 let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()); 2000 self.handle_count_with_group_auth_report( 2001 subscription_id, 2002 filters, 2003 search_present, 2004 &group_auth, 2005 ) 2006 .map(|report| report.into_message()) 2007 } 2008 } 2009 2010 fn pocket_filters( 2011 filters: Vec<Filter>, 2012 ) -> Result<Vec<PocketOwnedFilter>, crate::errors::BaseRelayError> { 2013 filters.iter().map(tangle_filter_to_pocket).collect() 2014 } 2015 2016 #[test] 2017 fn base_relay_stores_queries_counts_closes_and_fans_out_public_events() { 2018 let mut relay = test_relay("base-relay-public", 4); 2019 let event = signed_public_event(7, 1, Vec::new(), "hello"); 2020 let subscription_id = SubscriptionId::new("sub-a").expect("sub"); 2021 let filter = filter_from_value(&serde_json::json!({"kinds":[1]})).expect("filter"); 2022 2023 assert_eq!( 2024 relay.handle_event(event.clone()).expect("event"), 2025 RelayMessage::Ok { 2026 event_id: event.id().clone(), 2027 accepted: true, 2028 message: String::new() 2029 } 2030 ); 2031 assert_eq!( 2032 relay.handle_event(event.clone()).expect("duplicate"), 2033 RelayMessage::Ok { 2034 event_id: event.id().clone(), 2035 accepted: true, 2036 message: "duplicate: already have this event".to_owned() 2037 } 2038 ); 2039 2040 let messages = relay 2041 .handle_protocol_req_for_test(subscription_id.clone(), vec![filter.clone()]) 2042 .expect("req"); 2043 assert!( 2044 matches!(&messages[0], RelayMessage::Event { event: found, .. } if found.id() == event.id()) 2045 ); 2046 assert_eq!(messages[1], RelayMessage::Eose(subscription_id.clone())); 2047 assert_eq!( 2048 relay 2049 .handle_count_protocol(subscription_id.clone(), vec![filter]) 2050 .expect("count"), 2051 RelayMessage::Count { 2052 subscription_id: subscription_id.clone(), 2053 count: 1, 2054 hll: None 2055 } 2056 ); 2057 assert!(matches!( 2058 relay.fanout_protocol_for_test(&event).as_slice(), 2059 [RelayMessage::Event { subscription_id: delivered, event: found }] 2060 if delivered == &subscription_id && found.id() == event.id() 2061 )); 2062 assert_eq!(relay.handle_close(&subscription_id), CloseResult::Closed); 2063 assert_eq!(relay.active_subscription_count(), 0); 2064 assert!(relay.fanout_protocol_for_test(&event).is_empty()); 2065 } 2066 2067 #[test] 2068 fn base_relay_uses_configured_pocket_query_scrape_controls() { 2069 let strict_config = test_store_config("base-relay-query-strict"); 2070 let mut strict = BaseRelay::open( 2071 &strict_config, 2072 relay_limits(4), 2073 PocketQueryConfig::new(false, 0, 0), 2074 ) 2075 .expect("strict"); 2076 let strict_event = signed_public_event(7, 1, Vec::new(), "strict"); 2077 let broad = filter_from_value(&serde_json::json!({"limit":1})).expect("filter"); 2078 2079 assert_accepted( 2080 strict 2081 .handle_event(strict_event.clone()) 2082 .expect("strict event"), 2083 &strict_event, 2084 ); 2085 assert!( 2086 strict 2087 .handle_protocol_req_for_test( 2088 SubscriptionId::new("strict").expect("sub"), 2089 vec![broad.clone()] 2090 ) 2091 .expect_err("strict scrape") 2092 .prefixed_message() 2093 .to_lowercase() 2094 .contains("scraper") 2095 ); 2096 2097 let limited_config = test_store_config("base-relay-query-limited"); 2098 let mut limited = BaseRelay::open( 2099 &limited_config, 2100 relay_limits(4), 2101 PocketQueryConfig::new(false, 1, 0), 2102 ) 2103 .expect("limited"); 2104 let limited_event = signed_public_event(8, 1, Vec::new(), "limited"); 2105 2106 assert_accepted( 2107 limited 2108 .handle_event(limited_event.clone()) 2109 .expect("limited event"), 2110 &limited_event, 2111 ); 2112 let messages = limited 2113 .handle_protocol_req_for_test(SubscriptionId::new("limited").expect("sub"), vec![broad]) 2114 .expect("limited scrape"); 2115 2116 assert!( 2117 matches!(&messages[0], RelayMessage::Event { event, .. } if event.id() == limited_event.id()) 2118 ); 2119 } 2120 2121 #[test] 2122 fn base_relay_rejects_search_req_and_count_as_unsupported() { 2123 let mut relay = test_relay("base-relay-search-unsupported", 4); 2124 let req_id = SubscriptionId::new("search-req").expect("req"); 2125 let count_id = SubscriptionId::new("search-count").expect("count"); 2126 let search = filter_from_value(&serde_json::json!({ 2127 "search": "fresh carrots", 2128 "limit": 1 2129 })) 2130 .expect("filter"); 2131 2132 assert_eq!( 2133 relay 2134 .handle_protocol_req_for_test(req_id.clone(), vec![search.clone()]) 2135 .expect("req"), 2136 vec![RelayMessage::Closed { 2137 subscription_id: req_id, 2138 message: "unsupported: search filters are not supported".to_owned() 2139 }] 2140 ); 2141 assert_eq!(relay.active_subscription_count(), 0); 2142 assert_eq!( 2143 relay 2144 .handle_count_protocol(count_id.clone(), vec![search]) 2145 .expect("count"), 2146 RelayMessage::Closed { 2147 subscription_id: count_id, 2148 message: "unsupported: search filters are not supported".to_owned() 2149 } 2150 ); 2151 } 2152 2153 #[test] 2154 fn base_relay_dispatch_returns_disabled_negentropy_surface() { 2155 let mut relay = test_relay("base-relay-negentropy-disabled", 4); 2156 let mut auth = 2157 BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state"); 2158 let subscription_id = SubscriptionId::new("neg-sub").expect("sub"); 2159 2160 assert_eq!( 2161 relay 2162 .handle_client_message( 2163 ClientMessage::NegOpen { 2164 subscription_id: subscription_id.clone(), 2165 filter: Filter::empty(), 2166 message: "00".to_owned() 2167 }, 2168 &mut auth, 2169 UnixTimestamp::new(100) 2170 ) 2171 .expect("neg open"), 2172 vec![RelayMessage::NegErr { 2173 subscription_id: subscription_id.clone(), 2174 message: NEGENTROPY_DISABLED_MESSAGE.to_owned() 2175 }] 2176 ); 2177 assert_eq!( 2178 relay 2179 .handle_client_message( 2180 ClientMessage::NegMsg { 2181 subscription_id: subscription_id.clone(), 2182 message: String::new() 2183 }, 2184 &mut auth, 2185 UnixTimestamp::new(101) 2186 ) 2187 .expect("neg msg"), 2188 vec![RelayMessage::NegErr { 2189 subscription_id: subscription_id.clone(), 2190 message: NEGENTROPY_DISABLED_MESSAGE.to_owned() 2191 }] 2192 ); 2193 assert_eq!( 2194 relay 2195 .handle_client_message( 2196 ClientMessage::NegClose(subscription_id), 2197 &mut auth, 2198 UnixTimestamp::new(102) 2199 ) 2200 .expect("neg close"), 2201 Vec::<RelayMessage>::new() 2202 ); 2203 } 2204 2205 #[test] 2206 fn base_relay_disabled_negentropy_does_not_validate_or_screen_filter() { 2207 let owner = signer(7).public_key().clone(); 2208 let owner_auth = authenticated_state(7); 2209 let mut auth = 2210 BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state"); 2211 let mut relay = test_relay_with_groups( 2212 "base-relay-negentropy-disabled-no-screen", 2213 4, 2214 &enabled_groups_for_owner(&owner), 2215 ); 2216 let private_create = signed_private_group_create_event(7, "PrivateNegentropy"); 2217 assert_accepted( 2218 relay 2219 .handle_event_with_auth(private_create.clone(), &owner_auth) 2220 .expect("private create"), 2221 &private_create, 2222 ); 2223 let private_event = signed_event_at( 2224 7, 2225 1, 2226 vec![h("PrivateNegentropy")], 2227 "private negentropy", 2228 1_714_124_434, 2229 ); 2230 assert_accepted( 2231 relay 2232 .handle_event_with_auth(private_event.clone(), &owner_auth) 2233 .expect("private event"), 2234 &private_event, 2235 ); 2236 let subscription_id = SubscriptionId::new("neg-noscreen").expect("sub"); 2237 let filter = filter_from_value(&serde_json::json!({ 2238 "kinds": [1], 2239 "#h": ["PrivateNegentropy"], 2240 "limit": 501 2241 })) 2242 .expect("filter"); 2243 2244 assert_eq!( 2245 relay 2246 .handle_client_message( 2247 ClientMessage::NegOpen { 2248 subscription_id: subscription_id.clone(), 2249 filter, 2250 message: "00".to_owned() 2251 }, 2252 &mut auth, 2253 UnixTimestamp::new(100) 2254 ) 2255 .expect("neg open"), 2256 vec![RelayMessage::NegErr { 2257 subscription_id: subscription_id.clone(), 2258 message: NEGENTROPY_DISABLED_MESSAGE.to_owned() 2259 }] 2260 ); 2261 assert_eq!( 2262 relay 2263 .handle_client_message( 2264 ClientMessage::NegMsg { 2265 subscription_id: subscription_id.clone(), 2266 message: "should-not-touch-storage".to_owned() 2267 }, 2268 &mut auth, 2269 UnixTimestamp::new(101) 2270 ) 2271 .expect("neg msg"), 2272 vec![RelayMessage::NegErr { 2273 subscription_id, 2274 message: NEGENTROPY_DISABLED_MESSAGE.to_owned() 2275 }] 2276 ); 2277 } 2278 2279 #[test] 2280 fn base_relay_fetches_events_by_store_offset() { 2281 let relay = test_relay("base-relay-offset-lookup", 4); 2282 let event = signed_public_event(7, 1, Vec::new(), "offset"); 2283 let pocket = tangle_event_to_pocket(&event).expect("pocket"); 2284 let offset = StoreOffset::new(relay.store.store_event(&pocket).expect("store")); 2285 2286 let found = relay.event_by_offset(offset).expect("offset"); 2287 let found: &PocketEvent = &found; 2288 assert_eq!(found.id().as_hex_string(), event.id().as_str()); 2289 } 2290 2291 #[test] 2292 fn base_relay_req_merges_filters_with_order_dedupe_and_limits() { 2293 let mut relay = test_relay("base-relay-req-order", 8); 2294 let market_tag = Tag::from_parts("t", &["market"]).expect("tag"); 2295 let old_market = 2296 signed_event_at(7, 1, vec![market_tag.clone()], "old market", 1_714_124_433); 2297 let tied_author = 2298 signed_event_at(7, 1, vec![market_tag.clone()], "tied author", 1_714_124_434); 2299 let tied_other = 2300 signed_event_at(8, 1, vec![market_tag.clone()], "tied other", 1_714_124_434); 2301 let kind_two = signed_event_at(7, 2, Vec::new(), "kind two", 1_714_124_435); 2302 let wrong_tag = signed_event_at( 2303 9, 2304 1, 2305 vec![Tag::from_parts("t", &["other"]).expect("tag")], 2306 "wrong tag", 2307 1_714_124_436, 2308 ); 2309 2310 for event in [ 2311 &old_market, 2312 &tied_other, 2313 &kind_two, 2314 &wrong_tag, 2315 &tied_author, 2316 ] { 2317 assert_accepted(relay.handle_event(event.clone()).expect("event"), event); 2318 } 2319 2320 let subscription_id = SubscriptionId::new("req-order").expect("sub"); 2321 let market_limit = 2322 filter_from_value(&serde_json::json!({"kinds":[1],"#t":["market"],"limit":2})) 2323 .expect("market filter"); 2324 let author_limit = filter_from_value(&serde_json::json!({ 2325 "authors":[tied_author.unsigned().pubkey().as_str()], 2326 "kinds":[1,2], 2327 "limit":2 2328 })) 2329 .expect("author filter"); 2330 let messages = relay 2331 .handle_protocol_req_for_test(subscription_id.clone(), vec![market_limit, author_limit]) 2332 .expect("req"); 2333 let mut tied = [tied_author.clone(), tied_other.clone()]; 2334 tied.sort_by(|left, right| left.id().cmp(right.id())); 2335 let expected = [kind_two.clone(), tied[0].clone(), tied[1].clone()]; 2336 2337 assert_eq!(messages.len(), expected.len() + 1); 2338 for (message, event) in messages.iter().zip(expected.iter()) { 2339 assert!(matches!( 2340 message, 2341 RelayMessage::Event { 2342 subscription_id: actual, 2343 event: found 2344 } if actual == &subscription_id && found.id() == event.id() 2345 )); 2346 } 2347 assert_eq!( 2348 messages.last(), 2349 Some(&RelayMessage::Eose(subscription_id.clone())) 2350 ); 2351 assert!(!messages.iter().any(|message| matches!( 2352 message, 2353 RelayMessage::Event { event, .. } 2354 if event.id() == old_market.id() || event.id() == wrong_tag.id() 2355 ))); 2356 } 2357 2358 #[test] 2359 fn base_relay_req_count_paths_preserve_chorus_parity() { 2360 let owner = signer(7).public_key().clone(); 2361 let auth = authenticated_state(7); 2362 let outsider_auth = authenticated_state(8); 2363 let mut relay = test_relay_with_groups( 2364 "base-relay-req-count-chorus-parity", 2365 8, 2366 &enabled_groups_for_owner(&owner), 2367 ); 2368 let market_tag = Tag::from_parts("t", &["market"]).expect("tag"); 2369 let old_market = 2370 signed_event_at(7, 1, vec![market_tag.clone()], "old market", 1_714_124_433); 2371 let tied_author = 2372 signed_event_at(7, 1, vec![market_tag.clone()], "tied author", 1_714_124_434); 2373 let tied_other = 2374 signed_event_at(8, 1, vec![market_tag.clone()], "tied other", 1_714_124_434); 2375 let kind_two = signed_event_at(7, 2, Vec::new(), "kind two", 1_714_124_435); 2376 let wrong_tag = signed_event_at( 2377 9, 2378 1, 2379 vec![Tag::from_parts("t", &["other"]).expect("tag")], 2380 "wrong tag", 2381 1_714_124_436, 2382 ); 2383 for event in [ 2384 &old_market, 2385 &tied_other, 2386 &kind_two, 2387 &wrong_tag, 2388 &tied_author, 2389 ] { 2390 assert_accepted(relay.handle_event(event.clone()).expect("event"), event); 2391 } 2392 relay 2393 .handle_event_with_auth(signed_private_group_create_event(7, "Private"), &auth) 2394 .expect("private create"); 2395 let private_market = signed_event_at( 2396 7, 2397 1, 2398 vec![h("Private"), market_tag.clone()], 2399 "private market", 2400 1_714_124_437, 2401 ); 2402 assert_accepted( 2403 relay 2404 .handle_event_with_auth(private_market.clone(), &auth) 2405 .expect("private event"), 2406 &private_market, 2407 ); 2408 2409 let subscription_id = SubscriptionId::new("req-count-parity").expect("sub"); 2410 let market_limit = 2411 filter_from_value(&serde_json::json!({"kinds":[1],"#t":["market"],"limit":2})) 2412 .expect("market filter"); 2413 let author_limit = filter_from_value(&serde_json::json!({ 2414 "authors":[tied_author.unsigned().pubkey().as_str()], 2415 "kinds":[1,2], 2416 "limit":2 2417 })) 2418 .expect("author filter"); 2419 let messages = relay 2420 .handle_protocol_req_for_test( 2421 subscription_id.clone(), 2422 vec![market_limit.clone(), author_limit.clone()], 2423 ) 2424 .expect("req"); 2425 let mut tied = [tied_author.clone(), tied_other.clone()]; 2426 tied.sort_by(|left, right| left.id().cmp(right.id())); 2427 let expected = [kind_two.clone(), tied[0].clone(), tied[1].clone()]; 2428 let event_ids = messages 2429 .iter() 2430 .filter_map(|message| match message { 2431 RelayMessage::Event { 2432 subscription_id: actual, 2433 event, 2434 } if actual == &subscription_id => Some(event.id().clone()), 2435 _ => None, 2436 }) 2437 .collect::<Vec<_>>(); 2438 let expected_ids = expected 2439 .iter() 2440 .map(|event| event.id().clone()) 2441 .collect::<Vec<_>>(); 2442 2443 assert_eq!(event_ids, expected_ids); 2444 assert_eq!( 2445 messages.last(), 2446 Some(&RelayMessage::Closed { 2447 subscription_id: subscription_id.clone(), 2448 message: "auth-required: authentication required to read group events".to_owned() 2449 }) 2450 ); 2451 assert!(!messages.iter().any( 2452 |message| matches!(message, RelayMessage::Eose(actual) if actual == &subscription_id) 2453 )); 2454 assert!(!event_ids.contains(private_market.id())); 2455 assert!(!event_ids.contains(old_market.id())); 2456 assert!(!event_ids.contains(wrong_tag.id())); 2457 assert_eq!(relay.active_subscription_count(), 0); 2458 2459 let restricted_sub = SubscriptionId::new("restricted-screened").expect("sub"); 2460 let restricted_messages = relay 2461 .handle_protocol_req_with_auth_for_test( 2462 restricted_sub.clone(), 2463 vec![market_limit.clone(), author_limit.clone()], 2464 &outsider_auth, 2465 ) 2466 .expect("restricted req"); 2467 let restricted_event_ids = restricted_messages 2468 .iter() 2469 .filter_map(|message| match message { 2470 RelayMessage::Event { 2471 subscription_id: actual, 2472 event, 2473 } if actual == &restricted_sub => Some(event.id().clone()), 2474 _ => None, 2475 }) 2476 .collect::<Vec<_>>(); 2477 assert_eq!(restricted_event_ids, expected_ids); 2478 assert_eq!( 2479 restricted_messages.last(), 2480 Some(&RelayMessage::Closed { 2481 subscription_id: restricted_sub.clone(), 2482 message: "restricted: group is unavailable".to_owned() 2483 }) 2484 ); 2485 assert!(!restricted_messages.iter().any( 2486 |message| matches!(message, RelayMessage::Eose(actual) if actual == &restricted_sub) 2487 )); 2488 assert_eq!(relay.active_subscription_count(), 0); 2489 2490 let private_sub = SubscriptionId::new("private-screened").expect("sub"); 2491 assert_eq!( 2492 relay 2493 .handle_protocol_req_for_test( 2494 private_sub.clone(), 2495 vec![filter_group_tag(1, "h", "Private")] 2496 ) 2497 .expect("private unauth req"), 2498 vec![RelayMessage::Closed { 2499 subscription_id: private_sub, 2500 message: "auth-required: authentication required to read group events".to_owned() 2501 }] 2502 ); 2503 assert_eq!(relay.active_subscription_count(), 0); 2504 let private_auth_sub = SubscriptionId::new("private-auth").expect("sub"); 2505 assert!(matches!( 2506 relay 2507 .handle_protocol_req_with_auth_for_test( 2508 private_auth_sub.clone(), 2509 vec![filter_group_tag(1, "h", "Private")], 2510 &auth 2511 ) 2512 .expect("private auth req") 2513 .as_slice(), 2514 [RelayMessage::Event { subscription_id, event }, RelayMessage::Eose(eose)] 2515 if subscription_id == &private_auth_sub && event.id() == private_market.id() && eose == &private_auth_sub 2516 )); 2517 2518 let market_notes = 2519 filter_from_value(&serde_json::json!({"kinds":[1],"#t":["market"],"limit":10})) 2520 .expect("market count filter"); 2521 let author_events = filter_from_value(&serde_json::json!({ 2522 "authors":[tied_author.unsigned().pubkey().as_str()], 2523 "kinds":[1,2], 2524 "limit":10 2525 })) 2526 .expect("author count filter"); 2527 assert_eq!( 2528 relay 2529 .handle_count_protocol( 2530 SubscriptionId::new("count-visible").expect("sub"), 2531 vec![market_notes.clone(), author_events.clone()] 2532 ) 2533 .expect("visible count"), 2534 RelayMessage::Count { 2535 subscription_id: SubscriptionId::new("count-visible").expect("sub"), 2536 count: 4, 2537 hll: None 2538 } 2539 ); 2540 assert_eq!( 2541 relay 2542 .handle_count_with_auth_protocol( 2543 SubscriptionId::new("count-auth").expect("sub"), 2544 vec![market_notes, author_events], 2545 &auth 2546 ) 2547 .expect("auth count"), 2548 RelayMessage::Count { 2549 subscription_id: SubscriptionId::new("count-auth").expect("sub"), 2550 count: 5, 2551 hll: None 2552 } 2553 ); 2554 2555 let too_large_limit = 2556 filter_from_value(&serde_json::json!({"limit":501})).expect("limit filter"); 2557 assert!( 2558 relay 2559 .handle_protocol_req_for_test( 2560 SubscriptionId::new("limit-req").expect("sub"), 2561 vec![too_large_limit.clone()] 2562 ) 2563 .expect_err("req limit") 2564 .prefixed_message() 2565 .contains("max_limit 500") 2566 ); 2567 assert!( 2568 relay 2569 .handle_count_protocol( 2570 SubscriptionId::new("limit-count").expect("sub"), 2571 vec![too_large_limit] 2572 ) 2573 .expect_err("count limit") 2574 .prefixed_message() 2575 .contains("max_limit 500") 2576 ); 2577 2578 let search = filter_from_value(&serde_json::json!({"search":"carrots","limit":1})) 2579 .expect("search filter"); 2580 let search_req = SubscriptionId::new("search-req").expect("sub"); 2581 assert_eq!( 2582 relay 2583 .handle_protocol_req_for_test(search_req.clone(), vec![search.clone()]) 2584 .expect("search req"), 2585 vec![RelayMessage::Closed { 2586 subscription_id: search_req, 2587 message: "unsupported: search filters are not supported".to_owned() 2588 }] 2589 ); 2590 let search_count = SubscriptionId::new("search-count").expect("sub"); 2591 assert_eq!( 2592 relay 2593 .handle_count_protocol(search_count.clone(), vec![search]) 2594 .expect("search count"), 2595 RelayMessage::Closed { 2596 subscription_id: search_count, 2597 message: "unsupported: search filters are not supported".to_owned() 2598 } 2599 ); 2600 } 2601 2602 #[test] 2603 fn base_relay_enforces_runtime_limits() { 2604 let config = test_store_config("base-relay-runtime-limits"); 2605 let mut relay = BaseRelay::open( 2606 &config, 2607 BaseRelayLimits::new(BaseRelayLimitSettings { 2608 max_pending_events: 2, 2609 max_subscription_id_length: 3, 2610 max_subscriptions: 1, 2611 max_filters_per_request: 1, 2612 max_tag_values_per_filter: 1, 2613 max_query_complexity: 4, 2614 max_event_tags: 1, 2615 max_content_length: 4, 2616 max_limit: 2, 2617 default_limit: 1, 2618 }) 2619 .expect("limits"), 2620 PocketQueryConfig::default(), 2621 ) 2622 .expect("relay"); 2623 let first = signed_event_at(7, 1, Vec::new(), "one", 1_714_124_430); 2624 let second = signed_event_at(8, 1, Vec::new(), "two", 1_714_124_431); 2625 2626 assert_accepted(relay.handle_event(first.clone()).expect("first"), &first); 2627 assert_accepted(relay.handle_event(second.clone()).expect("second"), &second); 2628 2629 let limited = relay 2630 .handle_protocol_req_for_test( 2631 SubscriptionId::new("lim").expect("sub"), 2632 vec![Filter::empty()], 2633 ) 2634 .expect("limited"); 2635 assert_eq!( 2636 limited 2637 .iter() 2638 .filter(|message| matches!(message, RelayMessage::Event { .. })) 2639 .count(), 2640 1 2641 ); 2642 assert_eq!( 2643 relay.handle_close(&SubscriptionId::new("lim").expect("sub")), 2644 CloseResult::Closed 2645 ); 2646 2647 assert!( 2648 relay 2649 .handle_protocol_req_for_test( 2650 SubscriptionId::new("long").expect("sub"), 2651 vec![Filter::empty()] 2652 ) 2653 .expect_err("subscription id length") 2654 .prefixed_message() 2655 .contains("max_subid_length 3") 2656 ); 2657 assert!( 2658 relay 2659 .handle_count_protocol( 2660 SubscriptionId::new("cnt").expect("sub"), 2661 vec![Filter::empty(), Filter::empty()] 2662 ) 2663 .expect_err("filter count") 2664 .prefixed_message() 2665 .contains("max_filters_per_request 1") 2666 ); 2667 assert!( 2668 relay 2669 .handle_count_protocol( 2670 SubscriptionId::new("tag").expect("sub"), 2671 vec![ 2672 filter_from_value(&serde_json::json!({"#t":["one", "two"]})) 2673 .expect("filter") 2674 ] 2675 ) 2676 .expect_err("tag values") 2677 .prefixed_message() 2678 .contains("max_tag_values_per_filter 1") 2679 ); 2680 assert!( 2681 relay 2682 .handle_count_protocol( 2683 SubscriptionId::new("max").expect("sub"), 2684 vec![filter_from_value(&serde_json::json!({"limit":3})).expect("filter")] 2685 ) 2686 .expect_err("max limit") 2687 .prefixed_message() 2688 .contains("max_limit 2") 2689 ); 2690 2691 let too_many_tags = signed_event_at( 2692 9, 2693 1, 2694 vec![ 2695 Tag::from_parts("t", &["one"]).expect("tag"), 2696 Tag::from_parts("p", &["two"]).expect("tag"), 2697 ], 2698 "ok", 2699 1_714_124_432, 2700 ); 2701 assert!(matches!( 2702 relay.handle_event(too_many_tags).expect("tags"), 2703 RelayMessage::Ok { accepted: false, message, .. } 2704 if message.contains("max_event_tags 1") 2705 )); 2706 2707 let too_much_content = signed_event_at(10, 1, Vec::new(), "12345", 1_714_124_433); 2708 assert!(matches!( 2709 relay.handle_event(too_much_content).expect("content"), 2710 RelayMessage::Ok { accepted: false, message, .. } 2711 if message.contains("max_content_length 4") 2712 )); 2713 } 2714 2715 #[test] 2716 fn base_relay_rejects_over_budget_req_and_count() { 2717 let config = test_store_config("base-relay-query-complexity"); 2718 let mut relay = BaseRelay::open( 2719 &config, 2720 BaseRelayLimits::new(BaseRelayLimitSettings { 2721 max_pending_events: 4, 2722 max_subscription_id_length: 64, 2723 max_subscriptions: 64, 2724 max_filters_per_request: 10, 2725 max_tag_values_per_filter: 10, 2726 max_query_complexity: 4, 2727 max_event_tags: 200, 2728 max_content_length: 65_536, 2729 max_limit: 10, 2730 default_limit: 1, 2731 }) 2732 .expect("limits"), 2733 PocketQueryConfig::default(), 2734 ) 2735 .expect("relay"); 2736 let complex = filter_from_value(&serde_json::json!({ 2737 "kinds": [1], 2738 "#t": ["market"], 2739 "limit": 2 2740 })) 2741 .expect("filter"); 2742 2743 assert!( 2744 relay 2745 .handle_protocol_req_for_test( 2746 SubscriptionId::new("req").expect("sub"), 2747 vec![complex.clone()] 2748 ) 2749 .expect_err("req complexity") 2750 .prefixed_message() 2751 .contains("max_query_complexity 4") 2752 ); 2753 assert_eq!(relay.active_subscription_count(), 0); 2754 assert!( 2755 relay 2756 .handle_count_protocol(SubscriptionId::new("cnt").expect("sub"), vec![complex]) 2757 .expect_err("count complexity") 2758 .prefixed_message() 2759 .contains("max_query_complexity 4") 2760 ); 2761 } 2762 2763 #[test] 2764 fn base_relay_count_dedupes_overlapping_visible_filters() { 2765 let relay = test_relay("base-relay-count-dedupe", 8); 2766 let market_tag = Tag::from_parts("t", &["market"]).expect("tag"); 2767 let first = signed_event_at(7, 1, vec![market_tag.clone()], "first", 1_714_124_433); 2768 let second = signed_event_at(8, 1, vec![market_tag], "second", 1_714_124_434); 2769 let third = signed_event_at(7, 2, Vec::new(), "third", 1_714_124_435); 2770 2771 for event in [&first, &second, &third] { 2772 assert_accepted(relay.handle_event(event.clone()).expect("event"), event); 2773 } 2774 2775 let market_notes = 2776 filter_from_value(&serde_json::json!({"kinds":[1],"#t":["market"],"limit":2})) 2777 .expect("market filter"); 2778 let author_events = filter_from_value(&serde_json::json!({ 2779 "authors":[first.unsigned().pubkey().as_str()], 2780 "kinds":[1,2], 2781 "limit":10 2782 })) 2783 .expect("author filter"); 2784 let limited_market = 2785 filter_from_value(&serde_json::json!({"kinds":[1],"#t":["market"],"limit":1})) 2786 .expect("limited filter"); 2787 2788 assert_eq!( 2789 relay 2790 .handle_count_protocol( 2791 SubscriptionId::new("count-limit").expect("sub"), 2792 vec![limited_market] 2793 ) 2794 .expect("count"), 2795 RelayMessage::Count { 2796 subscription_id: SubscriptionId::new("count-limit").expect("sub"), 2797 count: 2, 2798 hll: None 2799 } 2800 ); 2801 2802 assert_eq!( 2803 relay 2804 .handle_count_protocol( 2805 SubscriptionId::new("count-dedupe").expect("sub"), 2806 vec![market_notes, author_events] 2807 ) 2808 .expect("count"), 2809 RelayMessage::Count { 2810 subscription_id: SubscriptionId::new("count-dedupe").expect("sub"), 2811 count: 3, 2812 hll: None 2813 } 2814 ); 2815 } 2816 2817 #[test] 2818 fn base_relay_count_hll_emits_for_public_single_filter() { 2819 let relay = test_relay("base-relay-count-hll-public", 8); 2820 let target = "a".repeat(EventId::HEX_LENGTH); 2821 let target_tag = Tag::from_parts("e", &[&target]).expect("tag"); 2822 let first = signed_pocket_public_event(7, 7, vec![target_tag.clone()], "first reaction"); 2823 let second = signed_pocket_public_event(8, 7, vec![target_tag], "second reaction"); 2824 2825 for event in [&first, &second] { 2826 assert_pocket_accepted(relay.handle_pocket_event(event).expect("event"), event); 2827 } 2828 2829 let RelayMessage::Count { count, hll, .. } = relay 2830 .handle_count_protocol( 2831 SubscriptionId::new("count-hll-public").expect("sub"), 2832 vec![ 2833 filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target]})) 2834 .expect("filter"), 2835 ], 2836 ) 2837 .expect("count") 2838 else { 2839 panic!("count expected") 2840 }; 2841 let hll = hll.expect("hll"); 2842 2843 assert_eq!(count, 2); 2844 assert_eq!(hll.len(), 512); 2845 assert_ne!(hll, "00".repeat(256)); 2846 } 2847 2848 #[test] 2849 fn base_relay_count_hll_omits_for_private_hidden_unknown_limited_multi_and_redacted_counts() { 2850 let owner = signer(7).public_key().clone(); 2851 let owner_auth = authenticated_state(7); 2852 let unauth = BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state"); 2853 let relay = test_relay_with_groups( 2854 "base-relay-count-hll-omits", 2855 8, 2856 &enabled_groups_for_owner(&owner), 2857 ); 2858 let target = "b".repeat(EventId::HEX_LENGTH); 2859 let target_tag = Tag::from_parts("e", &[&target]).expect("tag"); 2860 let public = signed_pocket_public_event(8, 7, vec![target_tag.clone()], "public reaction"); 2861 2862 assert_pocket_accepted(relay.handle_pocket_event(&public).expect("public"), &public); 2863 let private_create = signed_pocket_private_group_create_event(7, "PrivateHll"); 2864 assert_pocket_accepted( 2865 relay 2866 .handle_pocket_event_with_auth(&private_create, &owner_auth) 2867 .expect("private create"), 2868 &private_create, 2869 ); 2870 let private = signed_pocket_event_at_tags( 2871 7, 2872 7, 2873 vec![h("PrivateHll"), target_tag.clone()], 2874 "private reaction", 2875 1_714_124_434, 2876 ); 2877 assert_pocket_accepted( 2878 relay 2879 .handle_pocket_event_with_auth(&private, &owner_auth) 2880 .expect("private reaction"), 2881 &private, 2882 ); 2883 let hidden_create = signed_pocket_group_create_event_with_tags( 2884 7, 2885 "HiddenHll", 2886 vec![hidden()], 2887 1_714_124_435, 2888 ); 2889 assert_pocket_accepted( 2890 relay 2891 .handle_pocket_event_with_auth(&hidden_create, &owner_auth) 2892 .expect("hidden create"), 2893 &hidden_create, 2894 ); 2895 let hidden = signed_pocket_event_at_tags( 2896 7, 2897 7, 2898 vec![h("HiddenHll"), target_tag.clone()], 2899 "hidden reaction", 2900 1_714_124_436, 2901 ); 2902 assert_pocket_accepted( 2903 relay 2904 .handle_pocket_event_with_auth(&hidden, &owner_auth) 2905 .expect("hidden reaction"), 2906 &hidden, 2907 ); 2908 let deleted_create = signed_pocket_group_create_event(7, "DeletedHll"); 2909 assert_pocket_accepted( 2910 relay 2911 .handle_pocket_event_with_auth(&deleted_create, &owner_auth) 2912 .expect("deleted create"), 2913 &deleted_create, 2914 ); 2915 let deleted = signed_pocket_event_at_tags( 2916 7, 2917 7, 2918 vec![h("DeletedHll"), target_tag.clone()], 2919 "deleted reaction", 2920 1_714_124_438, 2921 ); 2922 assert_pocket_accepted( 2923 relay 2924 .handle_pocket_event_with_auth(&deleted, &owner_auth) 2925 .expect("deleted reaction"), 2926 &deleted, 2927 ); 2928 let delete_group = signed_pocket_event_at_tags( 2929 7, 2930 KIND_GROUP_DELETE_GROUP, 2931 vec![h("DeletedHll")], 2932 "", 2933 1_714_124_439, 2934 ); 2935 assert_pocket_accepted( 2936 relay 2937 .handle_pocket_event_with_auth(&delete_group, &owner_auth) 2938 .expect("delete group"), 2939 &delete_group, 2940 ); 2941 let unknown = signed_pocket_event_at_tags( 2942 7, 2943 7, 2944 vec![h("UnknownHll"), target_tag.clone()], 2945 "unknown reaction", 2946 1_714_124_440, 2947 ); 2948 relay.store.store_event(&unknown).expect("store unknown"); 2949 2950 let authorized_private = relay 2951 .handle_count_with_auth_protocol( 2952 SubscriptionId::new("count-hll-authorized-private").expect("sub"), 2953 vec![ 2954 filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target.clone()]})) 2955 .expect("filter"), 2956 ], 2957 &owner_auth, 2958 ) 2959 .expect("authorized private count"); 2960 assert!(matches!( 2961 authorized_private, 2962 RelayMessage::Count { 2963 count: 3, 2964 hll: None, 2965 .. 2966 } 2967 )); 2968 2969 let multi_filter = relay 2970 .handle_count_with_auth_protocol( 2971 SubscriptionId::new("count-hll-multi-filter").expect("sub"), 2972 vec![ 2973 filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target.clone()]})) 2974 .expect("filter"), 2975 filter_from_value(&serde_json::json!({"kinds":[7],"#e":["c".repeat(64)]})) 2976 .expect("filter"), 2977 ], 2978 &owner_auth, 2979 ) 2980 .expect("multi count"); 2981 assert!(matches!( 2982 multi_filter, 2983 RelayMessage::Count { 2984 count: 3, 2985 hll: None, 2986 .. 2987 } 2988 )); 2989 2990 let limited = relay 2991 .handle_count_with_auth_protocol( 2992 SubscriptionId::new("count-hll-limited").expect("sub"), 2993 vec![ 2994 filter_from_value( 2995 &serde_json::json!({"kinds":[7],"#e":[target.clone()],"limit":1}), 2996 ) 2997 .expect("filter"), 2998 ], 2999 &owner_auth, 3000 ) 3001 .expect("limited count"); 3002 assert!(matches!( 3003 limited, 3004 RelayMessage::Count { 3005 count: 3, 3006 hll: None, 3007 .. 3008 } 3009 )); 3010 3011 let redacted = relay 3012 .handle_count_with_auth_protocol( 3013 SubscriptionId::new("count-hll-redacted").expect("sub"), 3014 vec![ 3015 filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target]})) 3016 .expect("filter"), 3017 ], 3018 &unauth, 3019 ) 3020 .expect("redacted count"); 3021 assert!(matches!( 3022 redacted, 3023 RelayMessage::Count { 3024 count: 1, 3025 hll: None, 3026 .. 3027 } 3028 )); 3029 3030 assert_count_without_hll( 3031 &relay, 3032 "count-hll-private-h-target", 3033 serde_json::json!({"kinds":[7],"#h":["PrivateHll"]}), 3034 None, 3035 0, 3036 ); 3037 assert_count_without_hll( 3038 &relay, 3039 "count-hll-hidden-h-target", 3040 serde_json::json!({"kinds":[7],"#h":["HiddenHll"]}), 3041 None, 3042 0, 3043 ); 3044 assert_count_without_hll( 3045 &relay, 3046 "count-hll-unknown-h-target", 3047 serde_json::json!({"kinds":[7],"#h":["UnknownHll"]}), 3048 None, 3049 0, 3050 ); 3051 assert_count_without_hll( 3052 &relay, 3053 "count-hll-deleted-h-target", 3054 serde_json::json!({"kinds":[7],"#h":["DeletedHll"]}), 3055 None, 3056 0, 3057 ); 3058 assert_count_without_hll( 3059 &relay, 3060 "count-hll-private-d-target", 3061 serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["PrivateHll"]}), 3062 None, 3063 1, 3064 ); 3065 assert_count_without_hll( 3066 &relay, 3067 "count-hll-hidden-d-target", 3068 serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["HiddenHll"]}), 3069 None, 3070 0, 3071 ); 3072 assert_count_without_hll( 3073 &relay, 3074 "count-hll-unknown-d-target", 3075 serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["UnknownHll"]}), 3076 None, 3077 0, 3078 ); 3079 assert_count_without_hll( 3080 &relay, 3081 "count-hll-deleted-d-target", 3082 serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["DeletedHll"]}), 3083 None, 3084 0, 3085 ); 3086 } 3087 3088 #[test] 3089 fn base_relay_count_hll_group_target_policy_classifies_h_and_d_targets() { 3090 let owner = signer(7).public_key().clone(); 3091 let owner_auth = authenticated_state(7); 3092 let relay = test_relay_with_groups( 3093 "base-relay-count-hll-target-policy", 3094 8, 3095 &enabled_groups_for_owner(&owner), 3096 ); 3097 for event in [ 3098 signed_pocket_group_create_event(7, "PublicHll"), 3099 signed_pocket_group_create_event(7, "SecondHll"), 3100 signed_pocket_private_group_create_event(7, "PrivateHll"), 3101 signed_pocket_group_create_event_with_tags( 3102 7, 3103 "HiddenHll", 3104 vec![hidden()], 3105 1_714_124_435, 3106 ), 3107 signed_pocket_group_create_event(7, "DeletedHll"), 3108 ] { 3109 assert_pocket_accepted( 3110 relay 3111 .handle_pocket_event_with_auth(&event, &owner_auth) 3112 .expect("group create"), 3113 &event, 3114 ); 3115 } 3116 let delete_group = signed_pocket_event_at_tags( 3117 7, 3118 KIND_GROUP_DELETE_GROUP, 3119 vec![h("DeletedHll")], 3120 "", 3121 1_714_124_436, 3122 ); 3123 assert_pocket_accepted( 3124 relay 3125 .handle_pocket_event_with_auth(&delete_group, &owner_auth) 3126 .expect("delete group"), 3127 &delete_group, 3128 ); 3129 3130 assert_eq!( 3131 hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":["PublicHll"]})), 3132 BaseRelayCountHllTargetPolicy::Eligible 3133 ); 3134 assert_eq!( 3135 hll_target_policy( 3136 &relay, 3137 serde_json::json!({"kinds":[7],"#h":["PublicHll","SecondHll"]}) 3138 ), 3139 BaseRelayCountHllTargetPolicy::Eligible 3140 ); 3141 assert_eq!( 3142 hll_target_policy( 3143 &relay, 3144 serde_json::json!({"kinds":[7],"#h":["PublicHll","PrivateHll"]}) 3145 ), 3146 BaseRelayCountHllTargetPolicy::Suppress 3147 ); 3148 assert_eq!( 3149 hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":["HiddenHll"]})), 3150 BaseRelayCountHllTargetPolicy::Suppress 3151 ); 3152 assert_eq!( 3153 hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":["DeletedHll"]})), 3154 BaseRelayCountHllTargetPolicy::Suppress 3155 ); 3156 assert_eq!( 3157 hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":["UnknownHll"]})), 3158 BaseRelayCountHllTargetPolicy::Suppress 3159 ); 3160 assert_eq!( 3161 hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":[""]})), 3162 BaseRelayCountHllTargetPolicy::Suppress 3163 ); 3164 assert_eq!( 3165 hll_target_policy( 3166 &relay, 3167 serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["PublicHll"]}) 3168 ), 3169 BaseRelayCountHllTargetPolicy::Eligible 3170 ); 3171 assert_eq!( 3172 hll_target_policy( 3173 &relay, 3174 serde_json::json!({"kinds":[KIND_GROUP_METADATA],"#d":["PrivateHll"]}) 3175 ), 3176 BaseRelayCountHllTargetPolicy::Suppress 3177 ); 3178 assert_eq!( 3179 hll_target_policy(&relay, serde_json::json!({"#d":["PublicHll"]})), 3180 BaseRelayCountHllTargetPolicy::Suppress 3181 ); 3182 assert_eq!( 3183 hll_target_policy( 3184 &relay, 3185 serde_json::json!({"kinds":[30023],"#d":["PrivateHll"]}) 3186 ), 3187 BaseRelayCountHllTargetPolicy::Eligible 3188 ); 3189 3190 let mut private_hll = count_hll_for_target_policy_test(); 3191 let private_filter = [pocket_filter_from_value( 3192 serde_json::json!({"kinds":[7],"#h":["PrivateHll"]}), 3193 )]; 3194 private_hll.suppress_for_filter_targets(relay.groups.as_ref(), &private_filter); 3195 assert!(private_hll.into_hex().is_none()); 3196 3197 let mut non_group_hll = count_hll_for_target_policy_test(); 3198 let non_group_filter = [pocket_filter_from_value( 3199 serde_json::json!({"kinds":[30023],"#d":["PrivateHll"]}), 3200 )]; 3201 non_group_hll.suppress_for_filter_targets(relay.groups.as_ref(), &non_group_filter); 3202 assert!(non_group_hll.into_hex().is_some()); 3203 } 3204 3205 #[test] 3206 fn base_relay_count_hll_group_target_policy_suppresses_unresolved_group_targets() { 3207 let relay = test_relay("base-relay-count-hll-target-policy-no-groups", 8); 3208 3209 assert_eq!( 3210 hll_target_policy(&relay, serde_json::json!({"kinds":[7],"#h":["PublicHll"]})), 3211 BaseRelayCountHllTargetPolicy::Suppress 3212 ); 3213 assert_eq!( 3214 hll_target_policy(&relay, serde_json::json!({"#d":["PublicHll"]})), 3215 BaseRelayCountHllTargetPolicy::Suppress 3216 ); 3217 assert_eq!( 3218 hll_target_policy( 3219 &relay, 3220 serde_json::json!({"kinds":[30023],"#d":["PublicHll"]}) 3221 ), 3222 BaseRelayCountHllTargetPolicy::Eligible 3223 ); 3224 } 3225 3226 #[test] 3227 fn base_relay_count_does_not_apply_default_or_client_limits() { 3228 let config = test_store_config("base-relay-count-no-default-limit"); 3229 let relay = BaseRelay::open( 3230 &config, 3231 BaseRelayLimits::new(BaseRelayLimitSettings { 3232 max_pending_events: 4, 3233 max_subscription_id_length: 64, 3234 max_subscriptions: 64, 3235 max_filters_per_request: 10, 3236 max_tag_values_per_filter: 10, 3237 max_query_complexity: 4, 3238 max_event_tags: 200, 3239 max_content_length: 65_536, 3240 max_limit: 10, 3241 default_limit: 1, 3242 }) 3243 .expect("limits"), 3244 PocketQueryConfig::default(), 3245 ) 3246 .expect("relay"); 3247 let first = signed_event_at(7, 1, Vec::new(), "first", 1_714_124_433); 3248 let second = signed_event_at(7, 1, Vec::new(), "second", 1_714_124_434); 3249 let third = signed_event_at(7, 1, Vec::new(), "third", 1_714_124_435); 3250 3251 for event in [&first, &second, &third] { 3252 assert_accepted(relay.handle_event(event.clone()).expect("event"), event); 3253 } 3254 3255 let unbounded = filter_from_value(&serde_json::json!({ 3256 "authors": [first.unsigned().pubkey().as_str()], 3257 "kinds": [1] 3258 })) 3259 .expect("unbounded"); 3260 let client_limited = filter_from_value(&serde_json::json!({ 3261 "authors": [first.unsigned().pubkey().as_str()], 3262 "kinds": [1], 3263 "limit": 1 3264 })) 3265 .expect("client limited"); 3266 3267 assert_eq!( 3268 relay 3269 .handle_count_protocol( 3270 SubscriptionId::new("count-unbounded").expect("sub"), 3271 vec![unbounded] 3272 ) 3273 .expect("count"), 3274 RelayMessage::Count { 3275 subscription_id: SubscriptionId::new("count-unbounded").expect("sub"), 3276 count: 3, 3277 hll: None 3278 } 3279 ); 3280 assert_eq!( 3281 relay 3282 .handle_count_protocol( 3283 SubscriptionId::new("count-client-limited").expect("sub"), 3284 vec![client_limited] 3285 ) 3286 .expect("count"), 3287 RelayMessage::Count { 3288 subscription_id: SubscriptionId::new("count-client-limited").expect("sub"), 3289 count: 3, 3290 hll: None 3291 } 3292 ); 3293 } 3294 3295 #[test] 3296 fn base_relay_event_path_rejects_invalid_signatures_and_skips_ephemeral_storage() { 3297 let relay = test_relay("base-relay-event-store-path", 8); 3298 let valid = signed_public_event(7, 1, Vec::new(), "valid"); 3299 let signature_source = signed_public_event(8, 1, Vec::new(), "signature source"); 3300 let invalid = Event::new( 3301 valid.id().clone(), 3302 valid.unsigned().clone(), 3303 signature_source.sig().clone(), 3304 ); 3305 let ephemeral = signed_public_event(7, 20_001, Vec::new(), "ephemeral"); 3306 3307 assert!(matches!( 3308 relay.handle_event(invalid.clone()).expect("invalid"), 3309 RelayMessage::Ok { 3310 event_id, 3311 accepted: false, 3312 message 3313 } if event_id == *invalid.id() 3314 && message.starts_with("invalid:") 3315 )); 3316 assert_eq!(count_kind(&relay, 1), 0); 3317 3318 assert_accepted(relay.handle_event(valid.clone()).expect("valid"), &valid); 3319 assert_eq!( 3320 relay.handle_event(valid.clone()).expect("duplicate"), 3321 RelayMessage::Ok { 3322 event_id: valid.id().clone(), 3323 accepted: true, 3324 message: "duplicate: already have this event".to_owned() 3325 } 3326 ); 3327 assert_eq!(count_kind(&relay, 1), 1); 3328 3329 assert_accepted( 3330 relay.handle_event(ephemeral.clone()).expect("ephemeral"), 3331 &ephemeral, 3332 ); 3333 assert_eq!(count_kind(&relay, 20_001), 0); 3334 } 3335 3336 #[test] 3337 fn base_relay_pocket_event_path_preserves_event_admission_behavior() { 3338 let relay = test_relay("base-relay-pocket-event-store-path", 8); 3339 let tags = PocketOwnedTags::empty(); 3340 let protected_tags = PocketOwnedTags::new(&[["-"]]).expect("protected tags"); 3341 let valid_pocket = signed_pocket_event(7, 1, &tags, b"valid"); 3342 let signature_source = signed_pocket_event(8, 1, &tags, b"valid"); 3343 let invalid_pocket = PocketOwnedEvent::new( 3344 valid_pocket.id(), 3345 valid_pocket.kind(), 3346 valid_pocket.pubkey(), 3347 signature_source.sig(), 3348 valid_pocket.tags().expect("tags"), 3349 valid_pocket.created_at(), 3350 valid_pocket.content(), 3351 ) 3352 .expect("invalid pocket"); 3353 let ephemeral_pocket = signed_pocket_event(7, 20_001, &tags, b"ephemeral"); 3354 let protected_pocket = signed_pocket_event(7, 1, &protected_tags, b"protected"); 3355 3356 assert!( 3357 rejected_message(relay.handle_pocket_event(&invalid_pocket).expect("invalid")) 3358 .starts_with("invalid:") 3359 ); 3360 assert_eq!(count_kind(&relay, 1), 0); 3361 3362 assert_pocket_accepted( 3363 relay 3364 .handle_pocket_event(&valid_pocket) 3365 .expect("valid pocket"), 3366 &valid_pocket, 3367 ); 3368 assert_eq!( 3369 relay.handle_pocket_event(&valid_pocket).expect("duplicate"), 3370 RelayMessage::Ok { 3371 event_id: pocket_event_id(&valid_pocket), 3372 accepted: true, 3373 message: "duplicate: already have this event".to_owned() 3374 } 3375 ); 3376 assert_eq!(count_kind(&relay, 1), 1); 3377 3378 assert_pocket_accepted( 3379 relay 3380 .handle_pocket_event(&ephemeral_pocket) 3381 .expect("ephemeral"), 3382 &ephemeral_pocket, 3383 ); 3384 assert_eq!(count_kind(&relay, 20_001), 0); 3385 3386 assert_eq!( 3387 rejected_message( 3388 relay 3389 .handle_pocket_event(&protected_pocket) 3390 .expect("protected") 3391 ), 3392 "auth-required: protected event requires authenticated event author" 3393 ); 3394 assert_pocket_accepted( 3395 relay 3396 .handle_pocket_event_with_auth(&protected_pocket, &authenticated_state(7)) 3397 .expect("protected auth"), 3398 &protected_pocket, 3399 ); 3400 } 3401 3402 #[test] 3403 fn group_write_source_uses_atomic_service_boundary() { 3404 let core_source = include_str!("core.rs"); 3405 let group_source = include_str!("../groups.rs"); 3406 3407 assert!(core_source.contains("groups.store_group_pocket_event")); 3408 assert!(!core_source.contains(concat!("groups.", "check_event"))); 3409 assert!(!core_source.contains(concat!("groups.", "after_source_event_stored"))); 3410 assert!(!core_source.contains(concat!( 3411 "let tangle_event = ", 3412 "pocket_event_to_tangle(event)?;" 3413 ))); 3414 assert!(!group_source.contains("pub(crate) fn check_event(")); 3415 assert!(!group_source.contains("pub(crate) fn after_source_event_stored(")); 3416 } 3417 3418 #[test] 3419 fn base_relay_event_path_preserves_chorus_parity() { 3420 let owner = signer(7).public_key().clone(); 3421 let relay = test_relay_with_groups( 3422 "base-relay-event-chorus-parity", 3423 8, 3424 &enabled_groups_for_owner(&owner), 3425 ); 3426 let valid = signed_public_event(7, 1, Vec::new(), "valid"); 3427 let signature_source = signed_public_event(8, 1, Vec::new(), "signature source"); 3428 let invalid = Event::new( 3429 valid.id().clone(), 3430 valid.unsigned().clone(), 3431 signature_source.sig().clone(), 3432 ); 3433 let ephemeral = signed_public_event(7, 20_001, Vec::new(), "ephemeral"); 3434 let protected = signed_public_event( 3435 7, 3436 1, 3437 vec![Tag::from_parts("-", &[]).expect("protected")], 3438 "protected", 3439 ); 3440 let group_create = signed_group_create_event(7, "ParityFarm"); 3441 let empty_auth = BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth"); 3442 3443 assert!( 3444 rejected_message(relay.handle_event(invalid.clone()).expect("invalid")) 3445 .starts_with("invalid:") 3446 ); 3447 assert_eq!(count_kind(&relay, 1), 0); 3448 3449 assert_accepted(relay.handle_event(valid.clone()).expect("valid"), &valid); 3450 assert_eq!(count_kind(&relay, 1), 1); 3451 assert_eq!( 3452 relay.handle_event(valid.clone()).expect("duplicate"), 3453 RelayMessage::Ok { 3454 event_id: valid.id().clone(), 3455 accepted: true, 3456 message: "duplicate: already have this event".to_owned() 3457 } 3458 ); 3459 assert_eq!(count_kind(&relay, 1), 1); 3460 3461 assert_accepted( 3462 relay.handle_event(ephemeral.clone()).expect("ephemeral"), 3463 &ephemeral, 3464 ); 3465 assert_eq!(count_kind(&relay, 20_001), 0); 3466 3467 assert_eq!( 3468 rejected_message(relay.handle_event(protected.clone()).expect("protected")), 3469 "auth-required: protected event requires authenticated event author" 3470 ); 3471 assert_eq!( 3472 rejected_message( 3473 relay 3474 .handle_event_with_auth(group_create.clone(), &empty_auth) 3475 .expect("group unauth") 3476 ), 3477 "auth-required: group event author must authenticate with AUTH" 3478 ); 3479 assert_eq!(count_kind(&relay, KIND_GROUP_CREATE_GROUP), 0); 3480 3481 assert_accepted( 3482 relay 3483 .handle_event_with_auth(group_create.clone(), &authenticated_state(7)) 3484 .expect("group auth"), 3485 &group_create, 3486 ); 3487 assert_eq!(count_kind(&relay, KIND_GROUP_CREATE_GROUP), 1); 3488 assert!( 3489 relay 3490 .group_projection() 3491 .expect("projection") 3492 .group(&GroupId::new("ParityFarm").expect("group")) 3493 .is_some() 3494 ); 3495 } 3496 3497 #[test] 3498 fn base_relay_enforces_nip70_protected_event_author_auth() { 3499 let relay = test_relay("base-relay-nip70-protected", 8); 3500 let protected = signed_public_event( 3501 7, 3502 1, 3503 vec![Tag::from_parts("-", &[]).expect("protected")], 3504 "protected", 3505 ); 3506 3507 assert_eq!( 3508 rejected_message(relay.handle_event(protected.clone()).expect("unauth")), 3509 "auth-required: protected event requires authenticated event author" 3510 ); 3511 assert_eq!(count_kind(&relay, 1), 0); 3512 assert_eq!( 3513 rejected_message( 3514 relay 3515 .handle_event_with_auth(protected.clone(), &authenticated_state(8)) 3516 .expect("wrong auth") 3517 ), 3518 "auth-required: protected event requires authenticated event author" 3519 ); 3520 assert_eq!(count_kind(&relay, 1), 0); 3521 assert_accepted( 3522 relay 3523 .handle_event_with_auth(protected.clone(), &authenticated_state(7)) 3524 .expect("author auth"), 3525 &protected, 3526 ); 3527 assert_eq!(count_kind(&relay, 1), 1); 3528 } 3529 3530 #[test] 3531 fn base_relay_rejects_group_marked_events_before_group_service() { 3532 let relay = test_relay("base-relay-group-reject", 4); 3533 let event = signed_public_event( 3534 7, 3535 1, 3536 vec![Tag::from_parts("h", &["public-group"]).expect("group")], 3537 "hello", 3538 ); 3539 3540 assert_eq!( 3541 relay.handle_event(event.clone()).expect("event"), 3542 RelayMessage::Ok { 3543 event_id: event.id().clone(), 3544 accepted: false, 3545 message: "blocked: NIP-29 group events are not accepted before group service" 3546 .to_owned() 3547 } 3548 ); 3549 } 3550 3551 #[test] 3552 fn base_relay_rejects_client_submitted_relay_generated_group_state() { 3553 let relay = test_relay("base-relay-generated-group-reject", 4); 3554 for kind in NIP29_RELAY_GENERATED_KIND_VALUES { 3555 let event = signed_public_event( 3556 7, 3557 kind.into(), 3558 vec![Tag::from_parts("d", &["public-group"]).expect("group")], 3559 "", 3560 ); 3561 3562 assert_eq!( 3563 relay.handle_event(event.clone()).expect("event"), 3564 RelayMessage::Ok { 3565 event_id: event.id().clone(), 3566 accepted: false, 3567 message: 3568 "blocked: relay-generated group state events cannot be submitted by clients" 3569 .to_owned() 3570 } 3571 ); 3572 } 3573 } 3574 3575 #[test] 3576 fn base_relay_initializes_group_service_from_config() { 3577 let owner = signer(7).public_key().clone(); 3578 let relay = test_relay_with_groups( 3579 "base-relay-groups-enabled", 3580 4, 3581 &enabled_groups_for_owner(&owner), 3582 ); 3583 let disabled = test_relay_with_groups("base-relay-groups-disabled", 4, &disabled_groups()); 3584 3585 assert!(relay.groups_enabled()); 3586 assert_eq!( 3587 relay 3588 .readiness_state() 3589 .response() 3590 .checks 3591 .group_outbox_replay, 3592 "ready" 3593 ); 3594 assert!( 3595 relay 3596 .group_projection() 3597 .expect("projection") 3598 .groups() 3599 .is_empty() 3600 ); 3601 assert!(!disabled.groups_enabled()); 3602 assert_eq!( 3603 disabled 3604 .readiness_state() 3605 .response() 3606 .checks 3607 .group_outbox_replay, 3608 "ready" 3609 ); 3610 assert!(disabled.group_projection().is_none()); 3611 } 3612 3613 #[test] 3614 fn group_event_write_requires_auth_before_storage() { 3615 let owner = signer(7).public_key().clone(); 3616 let relay = test_relay_with_groups( 3617 "base-relay-group-auth-required", 3618 4, 3619 &enabled_groups_for_owner(&owner), 3620 ); 3621 let auth = BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth"); 3622 let event = signed_group_create_event(7, "Farm"); 3623 3624 assert_eq!( 3625 relay 3626 .handle_event_with_auth(event.clone(), &auth) 3627 .expect("event"), 3628 RelayMessage::Ok { 3629 event_id: event.id().clone(), 3630 accepted: false, 3631 message: "auth-required: group event author must authenticate with AUTH".to_owned() 3632 } 3633 ); 3634 assert!( 3635 relay 3636 .group_projection() 3637 .expect("projection") 3638 .group(&GroupId::new("Farm").expect("group")) 3639 .is_none() 3640 ); 3641 assert_eq!(count_kind(&relay, KIND_GROUP_CREATE_GROUP), 0); 3642 } 3643 3644 #[test] 3645 fn group_create_updates_projection_and_stores_generated_snapshots() { 3646 let owner = signer(7).public_key().clone(); 3647 let relay = test_relay_with_groups( 3648 "base-relay-group-create", 3649 4, 3650 &enabled_groups_for_owner(&owner), 3651 ); 3652 let auth = authenticated_state(7); 3653 let event = signed_group_create_event(7, "Farm"); 3654 3655 assert_eq!( 3656 relay 3657 .handle_event_with_auth(event.clone(), &auth) 3658 .expect("event"), 3659 RelayMessage::Ok { 3660 event_id: event.id().clone(), 3661 accepted: true, 3662 message: String::new() 3663 } 3664 ); 3665 3666 let group_id = GroupId::new("Farm").expect("group"); 3667 assert!( 3668 relay 3669 .group_projection() 3670 .expect("projection") 3671 .group(&group_id) 3672 .is_some() 3673 ); 3674 assert_eq!(count_kind(&relay, KIND_GROUP_CREATE_GROUP), 1); 3675 assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1); 3676 assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1); 3677 } 3678 3679 #[test] 3680 fn group_join_materializes_relay_membership_event() { 3681 let owner = signer(7).public_key().clone(); 3682 let joiner = signer(8).public_key().clone(); 3683 let relay = test_relay_with_groups( 3684 "base-relay-group-join", 3685 4, 3686 &enabled_groups_for_owner_with_public_join(&owner), 3687 ); 3688 let create = signed_group_create_event(7, "Farm"); 3689 assert_accepted( 3690 relay 3691 .handle_event_with_auth(create.clone(), &authenticated_state(7)) 3692 .expect("create"), 3693 &create, 3694 ); 3695 let join = signed_event_at( 3696 8, 3697 KIND_GROUP_JOIN_REQUEST.into(), 3698 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 3699 "", 3700 1_714_124_434, 3701 ); 3702 3703 assert_eq!( 3704 relay 3705 .handle_event_with_auth(join.clone(), &authenticated_state(8)) 3706 .expect("join"), 3707 RelayMessage::Ok { 3708 event_id: join.id().clone(), 3709 accepted: true, 3710 message: String::new() 3711 } 3712 ); 3713 3714 assert_eq!(count_kind(&relay, KIND_GROUP_PUT_USER), 1); 3715 assert_eq!( 3716 relay 3717 .group_projection() 3718 .expect("projection") 3719 .member(&GroupId::new("Farm").expect("group"), &joiner) 3720 .expect("member") 3721 .status(), 3722 MemberStatus::Member 3723 ); 3724 } 3725 3726 #[test] 3727 fn group_join_requires_public_join_policy() { 3728 let owner = signer(7).public_key().clone(); 3729 let relay = test_relay_with_groups( 3730 "base-relay-group-join-default-deny", 3731 4, 3732 &enabled_groups_for_owner(&owner), 3733 ); 3734 let create = signed_group_create_event(7, "Farm"); 3735 relay 3736 .handle_event_with_auth(create, &authenticated_state(7)) 3737 .expect("create"); 3738 let join = signed_event_at( 3739 8, 3740 KIND_GROUP_JOIN_REQUEST.into(), 3741 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 3742 "", 3743 1_714_124_434, 3744 ); 3745 3746 assert_eq!( 3747 rejected_message( 3748 relay 3749 .handle_event_with_auth(join, &authenticated_state(8)) 3750 .expect("join") 3751 ), 3752 "restricted: group is unavailable" 3753 ); 3754 assert_eq!(count_kind(&relay, KIND_GROUP_PUT_USER), 0); 3755 } 3756 3757 #[test] 3758 fn group_metadata_edit_replaces_generated_metadata_snapshot() { 3759 let owner = signer(7).public_key().clone(); 3760 let mut relay = test_relay_with_groups( 3761 "base-relay-group-metadata-edit", 3762 4, 3763 &enabled_groups_for_owner(&owner), 3764 ); 3765 let auth = authenticated_state(7); 3766 let create = signed_group_create_event(7, "Farm"); 3767 assert_accepted( 3768 relay 3769 .handle_event_with_auth(create.clone(), &auth) 3770 .expect("create"), 3771 &create, 3772 ); 3773 let edit = signed_event_at( 3774 7, 3775 KIND_GROUP_EDIT_METADATA.into(), 3776 vec![h("Farm"), name("Market")], 3777 "", 3778 1_714_124_436, 3779 ); 3780 assert_accepted( 3781 relay 3782 .handle_event_with_auth(edit.clone(), &auth) 3783 .expect("edit"), 3784 &edit, 3785 ); 3786 3787 let group_id = GroupId::new("Farm").expect("group"); 3788 { 3789 let projection = relay.group_projection().expect("projection"); 3790 let group = projection.group(&group_id).expect("group"); 3791 assert_eq!(group.metadata().name(), Some("Market")); 3792 } 3793 let metadata = query_filter( 3794 &mut relay, 3795 "metadata-edit", 3796 filter_group_tag(KIND_GROUP_METADATA, "d", "Farm"), 3797 ); 3798 assert_eq!(metadata.len(), 1); 3799 assert!(has_tag(&metadata[0], "d", &["Farm"])); 3800 assert!(has_tag(&metadata[0], "name", &["Market"])); 3801 assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1); 3802 } 3803 3804 #[test] 3805 fn group_member_moderation_join_leave_and_snapshots_flow() { 3806 let owner = signer(7).public_key().clone(); 3807 let member = signer(8).public_key().clone(); 3808 let target = signer(9).public_key().clone(); 3809 let relay = test_relay_with_groups( 3810 "base-relay-group-member-flow", 3811 4, 3812 &enabled_groups_for_owner_with_public_join(&owner), 3813 ); 3814 let owner_auth = authenticated_state(7); 3815 let member_auth = authenticated_state(8); 3816 let target_auth = authenticated_state(9); 3817 relay 3818 .handle_event_with_auth(signed_group_create_event(7, "Farm"), &owner_auth) 3819 .expect("create"); 3820 let rejected_add = signed_event_at( 3821 9, 3822 KIND_GROUP_PUT_USER.into(), 3823 vec![h("Farm"), p(&target)], 3824 "", 3825 1_714_124_434, 3826 ); 3827 assert_eq!( 3828 rejected_message( 3829 relay 3830 .handle_event_with_auth(rejected_add.clone(), &target_auth) 3831 .expect("rejected add") 3832 ), 3833 "restricted: missing group capability manage_members" 3834 ); 3835 let add = signed_event_at( 3836 7, 3837 KIND_GROUP_PUT_USER.into(), 3838 vec![h("Farm"), p(&member)], 3839 "", 3840 1_714_124_435, 3841 ); 3842 assert_accepted( 3843 relay 3844 .handle_event_with_auth(add.clone(), &owner_auth) 3845 .expect("add"), 3846 &add, 3847 ); 3848 assert_member_status(&relay, "Farm", &member, MemberStatus::Member); 3849 assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1); 3850 3851 let remove = signed_event_at( 3852 7, 3853 KIND_GROUP_REMOVE_USER.into(), 3854 vec![h("Farm"), p(&member)], 3855 "", 3856 1_714_124_436, 3857 ); 3858 assert_accepted( 3859 relay 3860 .handle_event_with_auth(remove.clone(), &owner_auth) 3861 .expect("remove"), 3862 &remove, 3863 ); 3864 assert_member_status(&relay, "Farm", &member, MemberStatus::Removed); 3865 assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1); 3866 3867 let join = signed_event_at( 3868 8, 3869 KIND_GROUP_JOIN_REQUEST.into(), 3870 vec![h("Farm")], 3871 "", 3872 1_714_124_437, 3873 ); 3874 assert_accepted( 3875 relay 3876 .handle_event_with_auth(join.clone(), &member_auth) 3877 .expect("join"), 3878 &join, 3879 ); 3880 assert_member_status(&relay, "Farm", &member, MemberStatus::Member); 3881 let duplicate_join = signed_event_at( 3882 8, 3883 KIND_GROUP_JOIN_REQUEST.into(), 3884 vec![h("Farm")], 3885 "", 3886 1_714_124_438, 3887 ); 3888 assert_eq!( 3889 rejected_message( 3890 relay 3891 .handle_event_with_auth(duplicate_join, &member_auth) 3892 .expect("duplicate join") 3893 ), 3894 "duplicate: group member already exists" 3895 ); 3896 3897 let leave = signed_event_at( 3898 8, 3899 KIND_GROUP_LEAVE_REQUEST.into(), 3900 vec![h("Farm")], 3901 "", 3902 1_714_124_439, 3903 ); 3904 assert_accepted( 3905 relay 3906 .handle_event_with_auth(leave.clone(), &member_auth) 3907 .expect("leave"), 3908 &leave, 3909 ); 3910 assert_member_status(&relay, "Farm", &member, MemberStatus::Removed); 3911 assert_eq!(count_kind(&relay, KIND_GROUP_REMOVE_USER), 2); 3912 let duplicate_leave = signed_event_at( 3913 8, 3914 KIND_GROUP_LEAVE_REQUEST.into(), 3915 vec![h("Farm")], 3916 "", 3917 1_714_124_440, 3918 ); 3919 assert_eq!( 3920 rejected_message( 3921 relay 3922 .handle_event_with_auth(duplicate_leave, &member_auth) 3923 .expect("duplicate leave") 3924 ), 3925 "duplicate: group member does not exist" 3926 ); 3927 } 3928 3929 #[test] 3930 fn group_delete_event_moderation_hides_target_and_validates_group() { 3931 let owner = signer(7).public_key().clone(); 3932 let outsider_auth = authenticated_state(8); 3933 let owner_auth = authenticated_state(7); 3934 let relay = test_relay_with_groups( 3935 "base-relay-group-delete-event", 3936 4, 3937 &enabled_groups_for_owner(&owner), 3938 ); 3939 relay 3940 .handle_event_with_auth(signed_group_create_event(7, "Farm"), &owner_auth) 3941 .expect("create farm"); 3942 relay 3943 .handle_event_with_auth(signed_group_create_event(7, "Other"), &owner_auth) 3944 .expect("create other"); 3945 let target = signed_event_at(7, 1, vec![h("Farm")], "harvest", 1_714_124_434); 3946 let other = signed_event_at(7, 1, vec![h("Other")], "other", 1_714_124_435); 3947 relay 3948 .handle_event_with_auth(target.clone(), &owner_auth) 3949 .expect("target"); 3950 relay 3951 .handle_event_with_auth(other.clone(), &owner_auth) 3952 .expect("other"); 3953 3954 let wrong_group = signed_event_at( 3955 7, 3956 KIND_GROUP_DELETE_EVENT.into(), 3957 vec![h("Farm"), e(other.id())], 3958 "", 3959 1_714_124_436, 3960 ); 3961 assert_eq!( 3962 rejected_message( 3963 relay 3964 .handle_event_with_auth(wrong_group, &owner_auth) 3965 .expect("wrong group") 3966 ), 3967 "invalid: delete target event is not in group" 3968 ); 3969 let unauthorized = signed_event_at( 3970 8, 3971 KIND_GROUP_DELETE_EVENT.into(), 3972 vec![h("Farm"), e(target.id())], 3973 "", 3974 1_714_124_437, 3975 ); 3976 assert_eq!( 3977 rejected_message( 3978 relay 3979 .handle_event_with_auth(unauthorized, &outsider_auth) 3980 .expect("unauthorized") 3981 ), 3982 "restricted: missing group capability delete_events" 3983 ); 3984 assert_eq!( 3985 count_filter( 3986 &relay, 3987 "target-before-delete", 3988 filter_group_tag(1, "h", "Farm") 3989 ), 3990 1 3991 ); 3992 3993 let delete = signed_event_at( 3994 7, 3995 KIND_GROUP_DELETE_EVENT.into(), 3996 vec![h("Farm"), e(target.id())], 3997 "", 3998 1_714_124_438, 3999 ); 4000 assert_accepted( 4001 relay 4002 .handle_event_with_auth(delete.clone(), &owner_auth) 4003 .expect("delete"), 4004 &delete, 4005 ); 4006 4007 assert_eq!( 4008 count_filter( 4009 &relay, 4010 "target-after-delete", 4011 filter_group_tag(1, "h", "Farm") 4012 ), 4013 0 4014 ); 4015 assert_eq!( 4016 count_filter( 4017 &relay, 4018 "delete-event-marker", 4019 filter_group_tag(KIND_GROUP_DELETE_EVENT, "h", "Farm") 4020 ), 4021 1 4022 ); 4023 } 4024 4025 #[test] 4026 fn group_delete_tombstone_hides_events_and_rejects_future_writes() { 4027 let owner = signer(7).public_key().clone(); 4028 let auth = authenticated_state(7); 4029 let relay = test_relay_with_groups( 4030 "base-relay-group-delete-tombstone", 4031 4, 4032 &enabled_groups_for_owner(&owner), 4033 ); 4034 relay 4035 .handle_event_with_auth(signed_group_create_event(7, "Farm"), &auth) 4036 .expect("create"); 4037 let normal = signed_event_at(7, 1, vec![h("Farm")], "harvest", 1_714_124_434); 4038 relay.handle_event_with_auth(normal, &auth).expect("normal"); 4039 let delete_group = signed_event_at( 4040 7, 4041 KIND_GROUP_DELETE_GROUP.into(), 4042 vec![h("Farm")], 4043 "", 4044 1_714_124_435, 4045 ); 4046 assert_accepted( 4047 relay 4048 .handle_event_with_auth(delete_group.clone(), &auth) 4049 .expect("delete group"), 4050 &delete_group, 4051 ); 4052 4053 let future = signed_event_at(7, 1, vec![h("Farm")], "future", 1_714_124_436); 4054 assert_eq!( 4055 rejected_message(relay.handle_event_with_auth(future, &auth).expect("future")), 4056 "blocked: group is deleted" 4057 ); 4058 assert_eq!( 4059 count_filter( 4060 &relay, 4061 "deleted-group-normal", 4062 filter_group_tag(1, "h", "Farm") 4063 ), 4064 0 4065 ); 4066 assert_eq!( 4067 count_filter( 4068 &relay, 4069 "deleted-group-marker", 4070 filter_group_tag(KIND_GROUP_DELETE_GROUP, "h", "Farm") 4071 ), 4072 1 4073 ); 4074 } 4075 4076 #[test] 4077 fn strict_closed_restricted_hidden_and_disabled_invite_flows() { 4078 let owner = signer(7).public_key().clone(); 4079 let outsider_auth = authenticated_state(8); 4080 let owner_auth = authenticated_state(7); 4081 let relay = test_relay_with_groups( 4082 "base-relay-group-strict-policy-flow", 4083 4, 4084 &enabled_groups_for_owner(&owner), 4085 ); 4086 relay 4087 .handle_event_with_auth( 4088 signed_group_create_event_with_tags(7, "Restricted", vec![restricted()], 1), 4089 &owner_auth, 4090 ) 4091 .expect("restricted create"); 4092 let restricted_write = 4093 signed_event_at(8, 1, vec![h("Restricted")], "restricted", 1_714_124_434); 4094 assert_eq!( 4095 rejected_message( 4096 relay 4097 .handle_event_with_auth(restricted_write, &outsider_auth) 4098 .expect("restricted write") 4099 ), 4100 "restricted: group is unavailable" 4101 ); 4102 4103 relay 4104 .handle_event_with_auth( 4105 signed_group_create_event_with_tags(7, "Closed", vec![closed()], 2), 4106 &owner_auth, 4107 ) 4108 .expect("closed create"); 4109 let closed_join = signed_event_at( 4110 8, 4111 KIND_GROUP_JOIN_REQUEST.into(), 4112 vec![h("Closed")], 4113 "", 4114 1_714_124_435, 4115 ); 4116 assert_eq!( 4117 rejected_message( 4118 relay 4119 .handle_event_with_auth(closed_join, &outsider_auth) 4120 .expect("closed join") 4121 ), 4122 "restricted: group is unavailable" 4123 ); 4124 let closed_normal = signed_event_at(8, 1, vec![h("Closed")], "open", 1_714_124_436); 4125 assert_accepted( 4126 relay 4127 .handle_event_with_auth(closed_normal.clone(), &outsider_auth) 4128 .expect("closed normal"), 4129 &closed_normal, 4130 ); 4131 4132 relay 4133 .handle_event_with_auth( 4134 signed_group_create_event_with_tags(7, "Hidden", vec![hidden()], 3), 4135 &owner_auth, 4136 ) 4137 .expect("hidden create"); 4138 assert_eq!( 4139 count_filter( 4140 &relay, 4141 "hidden-unauth", 4142 filter_group_tag(KIND_GROUP_METADATA, "d", "Hidden") 4143 ), 4144 0 4145 ); 4146 assert_eq!( 4147 count_filter_with_auth( 4148 &relay, 4149 "hidden-owner", 4150 filter_group_tag(KIND_GROUP_METADATA, "d", "Hidden"), 4151 &owner_auth 4152 ), 4153 1 4154 ); 4155 4156 let invite = signed_event_at( 4157 7, 4158 KIND_GROUP_CREATE_INVITE.into(), 4159 vec![h("Closed")], 4160 "", 4161 1_714_124_437, 4162 ); 4163 assert_eq!( 4164 rejected_message( 4165 relay 4166 .handle_event_with_auth(invite, &owner_auth) 4167 .expect("invite") 4168 ), 4169 "restricted: invites not enabled" 4170 ); 4171 } 4172 4173 #[test] 4174 fn private_group_req_and_count_use_reader_auth() { 4175 let owner = signer(7).public_key().clone(); 4176 let auth = authenticated_state(7); 4177 let mut relay = test_relay_with_groups( 4178 "base-relay-private-read", 4179 4, 4180 &enabled_groups_for_owner(&owner), 4181 ); 4182 relay 4183 .handle_event_with_auth(signed_private_group_create_event(7, "Farm"), &auth) 4184 .expect("create"); 4185 let private_event = signed_event_at( 4186 7, 4187 1, 4188 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 4189 "private harvest", 4190 1_714_124_435, 4191 ); 4192 relay 4193 .handle_event_with_auth(private_event.clone(), &auth) 4194 .expect("private event"); 4195 4196 let unauth_sub = SubscriptionId::new("private-unauth").expect("sub"); 4197 let auth_sub = SubscriptionId::new("private-auth").expect("sub"); 4198 assert_eq!( 4199 relay 4200 .handle_protocol_req_for_test(unauth_sub.clone(), vec![filter_kind(1)]) 4201 .expect("unauth req"), 4202 vec![RelayMessage::Closed { 4203 subscription_id: unauth_sub, 4204 message: "auth-required: authentication required to read group events".to_owned() 4205 }] 4206 ); 4207 assert_eq!(relay.active_subscription_count(), 0); 4208 assert!(matches!( 4209 relay 4210 .handle_protocol_req_with_auth_for_test(auth_sub.clone(), vec![filter_kind(1)], &auth) 4211 .expect("auth req") 4212 .as_slice(), 4213 [RelayMessage::Event { subscription_id, event }, RelayMessage::Eose(eose)] 4214 if subscription_id == &auth_sub && event.id() == private_event.id() && eose == &auth_sub 4215 )); 4216 assert_eq!(count_kind(&relay, 1), 0); 4217 assert_eq!(count_kind_with_auth(&relay, 1, &auth), 1); 4218 assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1); 4219 assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1); 4220 assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 0); 4221 assert_eq!(count_kind_with_auth(&relay, KIND_GROUP_METADATA, &auth), 1); 4222 assert_eq!(count_kind_with_auth(&relay, KIND_GROUP_ADMINS, &auth), 1); 4223 } 4224 4225 #[test] 4226 fn private_and_hidden_group_offset_lookup_uses_reader_auth() { 4227 let owner = signer(7).public_key().clone(); 4228 let owner_auth = authenticated_state(7); 4229 let unauth = BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state"); 4230 let relay = test_relay_with_groups( 4231 "base-relay-private-offset-read", 4232 4, 4233 &enabled_groups_for_owner(&owner), 4234 ); 4235 relay 4236 .handle_event_with_auth(signed_private_group_create_event(7, "Farm"), &owner_auth) 4237 .expect("create"); 4238 let private_event = signed_event_at( 4239 7, 4240 1, 4241 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 4242 "private harvest", 4243 1_714_124_435, 4244 ); 4245 let pocket = tangle_event_to_pocket(&private_event).expect("pocket"); 4246 let offset = StoreOffset::new(relay.store.store_event(&pocket).expect("store")); 4247 4248 assert_eq!( 4249 relay 4250 .event_by_offset_with_auth(offset, &unauth) 4251 .expect("unauth offset"), 4252 None 4253 ); 4254 let visible = relay 4255 .event_by_offset_with_auth(offset, &owner_auth) 4256 .expect("owner offset") 4257 .expect("visible"); 4258 let visible: &PocketEvent = &visible; 4259 assert_eq!(visible.id().as_hex_string(), private_event.id().as_str()); 4260 4261 relay 4262 .handle_event_with_auth( 4263 signed_group_create_event_with_tags(7, "HiddenFarm", vec![hidden()], 1_714_124_436), 4264 &owner_auth, 4265 ) 4266 .expect("hidden create"); 4267 let hidden_event = signed_event_at( 4268 7, 4269 1, 4270 vec![Tag::from_parts("h", &["HiddenFarm"]).expect("h")], 4271 "hidden harvest", 4272 1_714_124_437, 4273 ); 4274 let pocket = tangle_event_to_pocket(&hidden_event).expect("hidden pocket"); 4275 let offset = StoreOffset::new(relay.store.store_event(&pocket).expect("store hidden")); 4276 4277 assert_eq!( 4278 relay 4279 .event_by_offset_with_auth(offset, &unauth) 4280 .expect("hidden unauth offset"), 4281 None 4282 ); 4283 let visible = relay 4284 .event_by_offset_with_auth(offset, &owner_auth) 4285 .expect("hidden owner offset") 4286 .expect("hidden visible"); 4287 let visible: &PocketEvent = &visible; 4288 assert_eq!(visible.id().as_hex_string(), hidden_event.id().as_str()); 4289 } 4290 4291 #[test] 4292 fn private_group_live_fanout_uses_current_auth() { 4293 let owner = signer(7).public_key().clone(); 4294 let auth = authenticated_state(7); 4295 let mut relay = test_relay_with_groups( 4296 "base-relay-private-fanout", 4297 4, 4298 &enabled_groups_for_owner(&owner), 4299 ); 4300 relay 4301 .handle_event_with_auth(signed_private_group_create_event(7, "Farm"), &auth) 4302 .expect("create"); 4303 let subscription_id = SubscriptionId::new("fanout-current-auth").expect("sub"); 4304 relay 4305 .handle_protocol_req_for_test(subscription_id.clone(), vec![filter_kind(1)]) 4306 .expect("sub"); 4307 let private_event = signed_event_at( 4308 7, 4309 1, 4310 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 4311 "private harvest", 4312 1_714_124_435, 4313 ); 4314 relay 4315 .handle_event_with_auth(private_event.clone(), &auth) 4316 .expect("private event"); 4317 4318 assert!(relay.fanout_protocol_for_test(&private_event).is_empty()); 4319 assert!(matches!( 4320 relay 4321 .fanout_protocol_with_group_auth_for_test( 4322 &private_event, 4323 &GroupAuthContext::new([owner]) 4324 ) 4325 .as_slice(), 4326 [RelayMessage::Event { 4327 subscription_id: delivered, 4328 event 4329 }] if delivered == &subscription_id && event.id() == private_event.id() 4330 )); 4331 } 4332 4333 #[test] 4334 fn live_subscription_delivery_volume_does_not_close_subscription() { 4335 let mut relay = test_relay("base-relay-delivery-volume", 1); 4336 let subscription_id = SubscriptionId::new("sub-volume").expect("sub"); 4337 let filter = filter_from_value(&serde_json::json!({"kinds":[1]})).expect("filter"); 4338 relay 4339 .handle_protocol_req_for_test(subscription_id.clone(), vec![filter]) 4340 .expect("req"); 4341 let first = signed_public_event(7, 1, Vec::new(), "first"); 4342 let second = signed_public_event(7, 1, Vec::new(), "second"); 4343 4344 assert!(matches!( 4345 relay.fanout_protocol_for_test(&first).as_slice(), 4346 [RelayMessage::Event { .. }] 4347 )); 4348 assert!(matches!( 4349 relay.fanout_protocol_for_test(&second).as_slice(), 4350 [RelayMessage::Event { .. }] 4351 )); 4352 assert_eq!(relay.active_subscription_count(), 1); 4353 } 4354 4355 #[test] 4356 fn base_relay_shutdown_closes_live_subscriptions_and_syncs_store() { 4357 let config = test_store_config("base-relay-shutdown"); 4358 let mut relay = 4359 BaseRelay::open(&config, relay_limits(4), PocketQueryConfig::default()).expect("relay"); 4360 let event = signed_public_event(7, 1, Vec::new(), "shutdown"); 4361 let subscription_id = SubscriptionId::new("sub-shutdown").expect("sub"); 4362 4363 assert_accepted(relay.handle_event(event.clone()).expect("event"), &event); 4364 relay 4365 .handle_protocol_req_for_test(subscription_id, vec![filter_kind(1)]) 4366 .expect("req"); 4367 4368 assert_eq!(relay.active_subscription_count(), 1); 4369 4370 let report = relay.shutdown().expect("shutdown"); 4371 4372 assert_eq!(report.closed_subscriptions(), 1); 4373 assert_eq!(relay.active_subscription_count(), 0); 4374 assert!(relay.fanout_protocol_for_test(&event).is_empty()); 4375 4376 let reopened = BaseRelay::open(&config, relay_limits(4), PocketQueryConfig::default()) 4377 .expect("reopened"); 4378 assert_eq!(count_kind(&reopened, 1), 1); 4379 } 4380 4381 #[test] 4382 fn base_relay_client_message_dispatch_handles_count_and_auth() { 4383 let mut relay = test_relay("base-relay-dispatch", 4); 4384 let mut auth = 4385 BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state"); 4386 auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) 4387 .expect("challenge"); 4388 let auth_event = signed_auth_event(7, "challenge-a", 120); 4389 let count_id = SubscriptionId::new("count-a").expect("sub"); 4390 4391 assert_eq!( 4392 relay 4393 .handle_client_message( 4394 ClientMessage::Auth(auth_event.clone()), 4395 &mut auth, 4396 UnixTimestamp::new(120) 4397 ) 4398 .expect("auth"), 4399 vec![RelayMessage::Ok { 4400 event_id: auth_event.id().clone(), 4401 accepted: true, 4402 message: String::new() 4403 }] 4404 ); 4405 assert_eq!( 4406 relay 4407 .handle_client_message( 4408 ClientMessage::Count { 4409 subscription_id: count_id.clone(), 4410 filters: vec![Filter::empty()] 4411 }, 4412 &mut auth, 4413 UnixTimestamp::new(130) 4414 ) 4415 .expect("count"), 4416 vec![RelayMessage::Count { 4417 subscription_id: count_id, 4418 count: 0, 4419 hll: None 4420 }] 4421 ); 4422 } 4423 4424 #[test] 4425 fn base_relay_enforces_event_and_filter_runtime_limits() { 4426 let config = test_store_config("base-relay-event-filter-runtime-limits"); 4427 let mut relay = 4428 BaseRelay::open(&config, strict_relay_limits(), PocketQueryConfig::default()) 4429 .expect("relay"); 4430 let first = signed_public_event(7, 1, Vec::new(), "a"); 4431 let second = signed_event_at(8, 1, Vec::new(), "b", 1_714_124_434); 4432 4433 assert_accepted(relay.handle_event(first.clone()).expect("first"), &first); 4434 assert_accepted(relay.handle_event(second.clone()).expect("second"), &second); 4435 assert_eq!( 4436 rejected_message( 4437 relay 4438 .handle_event(signed_public_event(7, 1, Vec::new(), "abcde")) 4439 .expect("content") 4440 ), 4441 "invalid: event content length exceeds runtime max_content_length 4" 4442 ); 4443 assert_eq!( 4444 rejected_message( 4445 relay 4446 .handle_event(signed_public_event( 4447 7, 4448 1, 4449 vec![ 4450 Tag::from_parts("t", &["one"]).expect("tag"), 4451 Tag::from_parts("r", &["two"]).expect("tag"), 4452 ], 4453 "", 4454 )) 4455 .expect("tags") 4456 ), 4457 "invalid: event tag count exceeds runtime max_event_tags 1" 4458 ); 4459 assert_eq!( 4460 relay 4461 .handle_protocol_req_for_test( 4462 SubscriptionId::new("a").expect("sub"), 4463 vec![Filter::empty()] 4464 ) 4465 .expect("default limit") 4466 .len(), 4467 2 4468 ); 4469 assert!( 4470 relay 4471 .handle_protocol_req_for_test( 4472 SubscriptionId::new("a").expect("sub"), 4473 vec![Filter::empty(), Filter::empty()], 4474 ) 4475 .expect_err("filter count") 4476 .prefixed_message() 4477 .contains("max_filters_per_request 1") 4478 ); 4479 assert!( 4480 relay 4481 .handle_count_protocol( 4482 SubscriptionId::new("a").expect("sub"), 4483 vec![ 4484 filter_from_value(&serde_json::json!({"#t":["one", "two"]})) 4485 .expect("filter"), 4486 ], 4487 ) 4488 .expect_err("tag values") 4489 .prefixed_message() 4490 .contains("max_tag_values_per_filter 1") 4491 ); 4492 assert!( 4493 relay 4494 .handle_protocol_req_for_test( 4495 SubscriptionId::new("a").expect("sub"), 4496 vec![filter_from_value(&serde_json::json!({"limit": 3})).expect("filter")], 4497 ) 4498 .expect_err("max limit") 4499 .prefixed_message() 4500 .contains("max_limit 2") 4501 ); 4502 } 4503 4504 #[test] 4505 fn base_relay_enforces_subscription_id_and_count_limits() { 4506 let config = test_store_config("base-relay-subscription-limits"); 4507 let mut relay = 4508 BaseRelay::open(&config, strict_relay_limits(), PocketQueryConfig::default()) 4509 .expect("relay"); 4510 4511 assert!( 4512 relay 4513 .handle_protocol_req_for_test( 4514 SubscriptionId::new("abcde").expect("sub"), 4515 vec![Filter::empty()], 4516 ) 4517 .expect_err("sub id length") 4518 .prefixed_message() 4519 .contains("max_subid_length 4") 4520 ); 4521 relay 4522 .handle_protocol_req_for_test( 4523 SubscriptionId::new("a").expect("sub"), 4524 vec![Filter::empty()], 4525 ) 4526 .expect("first subscription"); 4527 assert!( 4528 relay 4529 .handle_protocol_req_for_test( 4530 SubscriptionId::new("b").expect("sub"), 4531 vec![Filter::empty()] 4532 ) 4533 .expect_err("subscription count") 4534 .prefixed_message() 4535 .contains("connection subscription limit exceeded") 4536 ); 4537 relay 4538 .handle_protocol_req_for_test( 4539 SubscriptionId::new("a").expect("sub"), 4540 vec![Filter::empty()], 4541 ) 4542 .expect("replace subscription"); 4543 } 4544 4545 fn test_relay(name: &str, max_pending_events: usize) -> BaseRelay { 4546 let config = test_store_config(name); 4547 BaseRelay::open( 4548 &config, 4549 relay_limits(max_pending_events), 4550 PocketQueryConfig::default(), 4551 ) 4552 .expect("relay") 4553 } 4554 4555 fn test_relay_with_groups( 4556 name: &str, 4557 max_pending_events: usize, 4558 groups: &tangle_groups::GroupRuntimeConfig, 4559 ) -> BaseRelay { 4560 let config = test_store_config(name); 4561 BaseRelay::open_with_groups( 4562 &config, 4563 relay_limits(max_pending_events), 4564 groups, 4565 PocketQueryConfig::default(), 4566 ) 4567 .expect("relay") 4568 } 4569 4570 fn relay_limits(max_pending_events: usize) -> BaseRelayLimits { 4571 BaseRelayLimits::new(BaseRelayLimitSettings { 4572 max_pending_events, 4573 max_subscription_id_length: 64, 4574 max_subscriptions: 64, 4575 max_filters_per_request: 10, 4576 max_tag_values_per_filter: 100, 4577 max_query_complexity: 610, 4578 max_event_tags: 200, 4579 max_content_length: 65_536, 4580 max_limit: 500, 4581 default_limit: 100, 4582 }) 4583 .expect("limits") 4584 } 4585 4586 fn strict_relay_limits() -> BaseRelayLimits { 4587 BaseRelayLimits::new(BaseRelayLimitSettings { 4588 max_pending_events: 4, 4589 max_subscription_id_length: 4, 4590 max_subscriptions: 1, 4591 max_filters_per_request: 1, 4592 max_tag_values_per_filter: 1, 4593 max_query_complexity: 4, 4594 max_event_tags: 1, 4595 max_content_length: 4, 4596 max_limit: 2, 4597 default_limit: 1, 4598 }) 4599 .expect("limits") 4600 } 4601 4602 fn test_store_config(name: &str) -> PocketStoreConfig { 4603 let root = std::env::temp_dir().join(format!("tangle-{name}-{}", std::process::id())); 4604 let _ = std::fs::remove_dir_all(&root); 4605 PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown) 4606 .expect("config") 4607 } 4608 4609 fn enabled_groups_for_owner(owner: &PublicKeyHex) -> tangle_groups::GroupRuntimeConfig { 4610 parse_group_runtime_config_json(&format!( 4611 r#"{{ 4612 "enabled": true, 4613 "canonical_relay_url": "wss://relay.radroots.test", 4614 "relay_secret": "{}", 4615 "owner_pubkeys": ["{}"] 4616 }}"#, 4617 "7".repeat(64), 4618 owner.as_str() 4619 )) 4620 .expect("groups") 4621 } 4622 4623 fn enabled_groups_for_owner_with_public_join( 4624 owner: &PublicKeyHex, 4625 ) -> tangle_groups::GroupRuntimeConfig { 4626 parse_group_runtime_config_json(&format!( 4627 r#"{{ 4628 "enabled": true, 4629 "canonical_relay_url": "wss://relay.radroots.test", 4630 "relay_secret": "{}", 4631 "owner_pubkeys": ["{}"], 4632 "policy": {{"public_join": true, "invites_enabled": false}} 4633 }}"#, 4634 "7".repeat(64), 4635 owner.as_str() 4636 )) 4637 .expect("groups") 4638 } 4639 4640 fn disabled_groups() -> tangle_groups::GroupRuntimeConfig { 4641 parse_group_runtime_config_json(r#"{"enabled": false}"#).expect("groups") 4642 } 4643 4644 fn signed_auth_event(secret_byte: u8, challenge: &str, created_at: u64) -> Event { 4645 signed_tangle_event_at( 4646 secret_byte, 4647 22_242, 4648 vec![ 4649 Tag::from_parts("relay", &["wss://relay.radroots.test"]).expect("relay"), 4650 Tag::from_parts("challenge", &[challenge]).expect("challenge"), 4651 ], 4652 "", 4653 created_at, 4654 ) 4655 } 4656 4657 fn signed_public_event(secret_byte: u8, kind: u64, tags: Vec<Tag>, content: &str) -> Event { 4658 signed_event_at(secret_byte, kind, tags, content, 1_714_124_433) 4659 } 4660 4661 fn signed_pocket_event( 4662 secret_byte: u8, 4663 kind: u16, 4664 tags: &PocketOwnedTags, 4665 content: &[u8], 4666 ) -> PocketOwnedEvent { 4667 signed_pocket_event_at(secret_byte, kind, tags, content, 1_714_124_433) 4668 } 4669 4670 fn signed_pocket_event_at( 4671 secret_byte: u8, 4672 kind: u16, 4673 tags: &PocketOwnedTags, 4674 content: &[u8], 4675 created_at: u64, 4676 ) -> PocketOwnedEvent { 4677 let secret = format!("{secret_byte:02x}").repeat(32); 4678 RelaySigner::from_secret_hex(&secret) 4679 .expect("signer") 4680 .sign_pocket_event( 4681 PocketKind::from_u16(kind), 4682 tags, 4683 PocketTime::from_u64(created_at), 4684 content, 4685 ) 4686 .expect("pocket event") 4687 } 4688 4689 fn signed_pocket_public_event( 4690 secret_byte: u8, 4691 kind: u32, 4692 tags: Vec<Tag>, 4693 content: &str, 4694 ) -> PocketOwnedEvent { 4695 signed_pocket_event_at_tags(secret_byte, kind, tags, content, 1_714_124_433) 4696 } 4697 4698 fn signed_pocket_group_create_event(secret_byte: u8, group_id: &str) -> PocketOwnedEvent { 4699 signed_pocket_group_create_event_with_tags(secret_byte, group_id, Vec::new(), 1_714_124_433) 4700 } 4701 4702 fn signed_pocket_group_create_event_with_tags( 4703 secret_byte: u8, 4704 group_id: &str, 4705 mut extra_tags: Vec<Tag>, 4706 created_at: u64, 4707 ) -> PocketOwnedEvent { 4708 let mut tags = vec![h(group_id), name(group_id)]; 4709 tags.append(&mut extra_tags); 4710 signed_pocket_event_at_tags(secret_byte, KIND_GROUP_CREATE_GROUP, tags, "", created_at) 4711 } 4712 4713 fn signed_pocket_private_group_create_event( 4714 secret_byte: u8, 4715 group_id: &str, 4716 ) -> PocketOwnedEvent { 4717 signed_pocket_event_at_tags( 4718 secret_byte, 4719 KIND_GROUP_CREATE_GROUP, 4720 vec![h(group_id), name(group_id), private()], 4721 "", 4722 1_714_124_433, 4723 ) 4724 } 4725 4726 fn signed_pocket_event_at_tags( 4727 secret_byte: u8, 4728 kind: u32, 4729 tags: Vec<Tag>, 4730 content: &str, 4731 created_at: u64, 4732 ) -> PocketOwnedEvent { 4733 let tags = pocket_tags_from_protocol(&tags); 4734 signed_pocket_event_at( 4735 secret_byte, 4736 u16::try_from(kind).expect("pocket kind"), 4737 &tags, 4738 content.as_bytes(), 4739 created_at, 4740 ) 4741 } 4742 4743 fn pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags { 4744 let parts = tags 4745 .iter() 4746 .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>()) 4747 .collect::<Vec<_>>(); 4748 PocketOwnedTags::new(&parts).expect("pocket tags") 4749 } 4750 4751 fn signed_group_create_event(secret_byte: u8, group_id: &str) -> Event { 4752 signed_group_create_event_with_tags(secret_byte, group_id, Vec::new(), 1_714_124_433) 4753 } 4754 4755 fn signed_group_create_event_with_tags( 4756 secret_byte: u8, 4757 group_id: &str, 4758 mut extra_tags: Vec<Tag>, 4759 created_at: u64, 4760 ) -> Event { 4761 let mut tags = vec![h(group_id), name(group_id)]; 4762 tags.append(&mut extra_tags); 4763 signed_event_at( 4764 secret_byte, 4765 KIND_GROUP_CREATE_GROUP.into(), 4766 tags, 4767 "", 4768 created_at, 4769 ) 4770 } 4771 4772 fn signed_private_group_create_event(secret_byte: u8, group_id: &str) -> Event { 4773 signed_event_at( 4774 secret_byte, 4775 KIND_GROUP_CREATE_GROUP.into(), 4776 vec![h(group_id), name(group_id), private()], 4777 "", 4778 1_714_124_433, 4779 ) 4780 } 4781 4782 fn signed_event_at( 4783 secret_byte: u8, 4784 kind: u64, 4785 tags: Vec<Tag>, 4786 content: &str, 4787 created_at: u64, 4788 ) -> Event { 4789 let pocket = signed_pocket_event_at_tags( 4790 secret_byte, 4791 u32::try_from(kind).expect("kind"), 4792 tags, 4793 content, 4794 created_at, 4795 ); 4796 pocket_event_to_protocol(&pocket) 4797 } 4798 4799 fn signed_tangle_event_at( 4800 secret_byte: u8, 4801 kind: u64, 4802 tags: Vec<Tag>, 4803 content: &str, 4804 created_at: u64, 4805 ) -> Event { 4806 let secret = format!("{secret_byte:02x}").repeat(32); 4807 let signer = RelaySigner::from_secret_hex(&secret).expect("signer"); 4808 let unsigned = UnsignedEvent::new( 4809 signer.public_key().clone(), 4810 UnixTimestamp::new(created_at), 4811 Kind::new(kind).expect("kind"), 4812 tags, 4813 content, 4814 ); 4815 signer.sign_unsigned_event(unsigned) 4816 } 4817 4818 fn pocket_event_id(event: &PocketEvent) -> EventId { 4819 EventId::new(&event.id().as_hex_string()).expect("event id") 4820 } 4821 4822 fn pocket_event_to_protocol(event: &PocketEvent) -> Event { 4823 let tags = event 4824 .tags() 4825 .expect("tags") 4826 .iter() 4827 .map(|tag| { 4828 Tag::new( 4829 tag.map(|value| std::str::from_utf8(value).expect("utf8").to_owned()) 4830 .collect::<Vec<_>>(), 4831 ) 4832 .expect("tag") 4833 }) 4834 .collect::<Vec<_>>(); 4835 Event::new( 4836 pocket_event_id(event), 4837 tangle_protocol::UnsignedEvent::new( 4838 PublicKeyHex::new(&event.pubkey().as_hex_string()).expect("pubkey"), 4839 UnixTimestamp::new(event.created_at().as_u64()), 4840 tangle_protocol::Kind::new(u64::from(event.kind().as_u16())).expect("kind"), 4841 tags, 4842 std::str::from_utf8(event.content()).expect("content"), 4843 ), 4844 SignatureHex::new(&event.sig().to_string()).expect("sig"), 4845 ) 4846 } 4847 4848 fn authenticated_state(secret_byte: u8) -> BaseAuthState { 4849 let mut auth = 4850 BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state"); 4851 auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) 4852 .expect("challenge"); 4853 let event = signed_auth_event(secret_byte, "challenge-a", 120); 4854 auth.authenticate(&event, UnixTimestamp::new(120)) 4855 .expect("authenticate"); 4856 auth 4857 } 4858 4859 fn count_kind(relay: &BaseRelay, kind: u32) -> u64 { 4860 let subscription_id = SubscriptionId::new(&format!("count-{kind}")).expect("sub"); 4861 let filter = filter_kind(kind); 4862 match relay 4863 .handle_count_protocol(subscription_id, vec![filter]) 4864 .expect("count") 4865 { 4866 RelayMessage::Count { count, .. } => count, 4867 _ => panic!("count response expected"), 4868 } 4869 } 4870 4871 fn count_kind_with_auth(relay: &BaseRelay, kind: u32, auth: &BaseAuthState) -> u64 { 4872 let subscription_id = SubscriptionId::new(&format!("count-auth-{kind}")).expect("sub"); 4873 match relay 4874 .handle_count_with_auth_protocol(subscription_id, vec![filter_kind(kind)], auth) 4875 .expect("count") 4876 { 4877 RelayMessage::Count { count, .. } => count, 4878 _ => panic!("count response expected"), 4879 } 4880 } 4881 4882 fn count_filter(relay: &BaseRelay, subscription_id: &str, filter: Filter) -> u64 { 4883 match relay 4884 .handle_count_protocol( 4885 SubscriptionId::new(subscription_id).expect("sub"), 4886 vec![filter], 4887 ) 4888 .expect("count") 4889 { 4890 RelayMessage::Count { count, .. } => count, 4891 _ => panic!("count response expected"), 4892 } 4893 } 4894 4895 fn count_filter_with_auth( 4896 relay: &BaseRelay, 4897 subscription_id: &str, 4898 filter: Filter, 4899 auth: &BaseAuthState, 4900 ) -> u64 { 4901 match relay 4902 .handle_count_with_auth_protocol( 4903 SubscriptionId::new(subscription_id).expect("sub"), 4904 vec![filter], 4905 auth, 4906 ) 4907 .expect("count") 4908 { 4909 RelayMessage::Count { count, .. } => count, 4910 _ => panic!("count response expected"), 4911 } 4912 } 4913 4914 fn assert_count_without_hll( 4915 relay: &BaseRelay, 4916 subscription_id: &str, 4917 value: serde_json::Value, 4918 auth: Option<&BaseAuthState>, 4919 expected_count: u64, 4920 ) { 4921 let subscription_id = SubscriptionId::new(subscription_id).expect("sub"); 4922 let filter = filter_from_value(&value).expect("filter"); 4923 let message = match auth { 4924 Some(auth) => { 4925 relay.handle_count_with_auth_protocol(subscription_id.clone(), vec![filter], auth) 4926 } 4927 None => relay.handle_count_protocol(subscription_id.clone(), vec![filter]), 4928 } 4929 .expect("count"); 4930 assert_eq!( 4931 message, 4932 RelayMessage::Count { 4933 subscription_id, 4934 count: expected_count, 4935 hll: None 4936 } 4937 ); 4938 } 4939 4940 fn query_filter(relay: &mut BaseRelay, subscription_id: &str, filter: Filter) -> Vec<Event> { 4941 relay 4942 .handle_protocol_req_for_test( 4943 SubscriptionId::new(subscription_id).expect("sub"), 4944 vec![filter], 4945 ) 4946 .expect("query") 4947 .into_iter() 4948 .filter_map(|message| match message { 4949 RelayMessage::Event { event, .. } => Some(event), 4950 _ => None, 4951 }) 4952 .collect() 4953 } 4954 4955 fn filter_kind(kind: u32) -> Filter { 4956 filter_from_value(&serde_json::json!({"kinds":[kind]})).expect("filter") 4957 } 4958 4959 fn filter_group_tag(kind: u32, tag: &str, group_id: &str) -> Filter { 4960 let mut value = serde_json::json!({"kinds":[kind]}); 4961 value 4962 .as_object_mut() 4963 .expect("object") 4964 .insert(format!("#{tag}"), serde_json::json!([group_id])); 4965 filter_from_value(&value).expect("filter") 4966 } 4967 4968 fn pocket_filter_from_value(value: serde_json::Value) -> PocketOwnedFilter { 4969 tangle_filter_to_pocket(&filter_from_value(&value).expect("filter")).expect("pocket") 4970 } 4971 4972 fn hll_target_policy( 4973 relay: &BaseRelay, 4974 value: serde_json::Value, 4975 ) -> BaseRelayCountHllTargetPolicy { 4976 let filter = pocket_filter_from_value(value); 4977 BaseRelay::count_hll_filter_target_policy(relay.groups.as_ref(), &filter) 4978 } 4979 4980 fn count_hll_for_target_policy_test() -> BaseRelayCountHll { 4981 BaseRelayCountHll { 4982 offset: Some(0), 4983 hll: Some(PocketHll8::new()), 4984 suppressed: false, 4985 } 4986 } 4987 4988 fn assert_accepted(message: RelayMessage, event: &Event) { 4989 assert_eq!( 4990 message, 4991 RelayMessage::Ok { 4992 event_id: event.id().clone(), 4993 accepted: true, 4994 message: String::new() 4995 } 4996 ); 4997 } 4998 4999 fn assert_pocket_accepted(message: RelayMessage, event: &PocketEvent) { 5000 assert_eq!( 5001 message, 5002 RelayMessage::Ok { 5003 event_id: pocket_event_id(event), 5004 accepted: true, 5005 message: String::new() 5006 } 5007 ); 5008 } 5009 5010 fn rejected_message(message: RelayMessage) -> String { 5011 match message { 5012 RelayMessage::Ok { 5013 accepted: false, 5014 message, 5015 .. 5016 } => message, 5017 _ => panic!("rejected OK expected"), 5018 } 5019 } 5020 5021 fn assert_member_status( 5022 relay: &BaseRelay, 5023 group_id: &str, 5024 pubkey: &PublicKeyHex, 5025 status: MemberStatus, 5026 ) { 5027 assert_eq!( 5028 relay 5029 .group_projection() 5030 .expect("projection") 5031 .member(&GroupId::new(group_id).expect("group"), pubkey) 5032 .expect("member") 5033 .status(), 5034 status 5035 ); 5036 } 5037 5038 fn has_tag(event: &Event, name: &str, values: &[&str]) -> bool { 5039 event.unsigned().tags().iter().any(|tag| { 5040 tag.values().first().is_some_and(|value| value == name) 5041 && tag.values().len() == values.len() + 1 5042 && values.iter().enumerate().all(|(index, expected)| { 5043 tag.values() 5044 .get(index + 1) 5045 .is_some_and(|value| value == expected) 5046 }) 5047 }) 5048 } 5049 5050 fn h(group_id: &str) -> Tag { 5051 Tag::from_parts("h", &[group_id]).expect("h") 5052 } 5053 5054 fn p(pubkey: &PublicKeyHex) -> Tag { 5055 Tag::from_parts("p", &[pubkey.as_str()]).expect("p") 5056 } 5057 5058 fn e(event_id: &EventId) -> Tag { 5059 Tag::from_parts("e", &[event_id.as_str()]).expect("e") 5060 } 5061 5062 fn name(value: &str) -> Tag { 5063 Tag::from_parts("name", &[value]).expect("name") 5064 } 5065 5066 fn private() -> Tag { 5067 Tag::from_parts("private", &[]).expect("private") 5068 } 5069 5070 fn restricted() -> Tag { 5071 Tag::from_parts("restricted", &[]).expect("restricted") 5072 } 5073 5074 fn hidden() -> Tag { 5075 Tag::from_parts("hidden", &[]).expect("hidden") 5076 } 5077 5078 fn closed() -> Tag { 5079 Tag::from_parts("closed", &[]).expect("closed") 5080 } 5081 5082 fn signer(secret_byte: u8) -> RelaySigner { 5083 RelaySigner::from_secret_hex(&format!("{:02x}", secret_byte).repeat(32)).expect("signer") 5084 } 5085 }