base_relay_v2.rs (107974B)
1 #![forbid(unsafe_code)] 2 3 use std::{fs, panic, path::PathBuf}; 4 use tangle_crypto::RelaySigner; 5 use tangle_groups::{ 6 GroupAuthContext, GroupAuthority, GroupGeneratedEventBuilder, GroupId, GroupLimitsConfig, 7 GroupOutboxEffect, GroupOutboxKey, GroupOutboxRecord, GroupOutboxStatus, GroupProjection, 8 GroupRuntimeConfig, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, KIND_GROUP_DELETE_EVENT, 9 KIND_GROUP_DELETE_GROUP, KIND_GROUP_EDIT_METADATA, KIND_GROUP_JOIN_REQUEST, 10 KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, 11 KIND_GROUP_REMOVE_USER, MemberStatus, NIP29_RELAY_GENERATED_KIND_VALUES, 12 PERMANENT_RELAY_OVERRIDE_ROLE, ProjectionCheckpoint, StoreOffset, member_current_key, 13 parse_group_runtime_config_json, projection_checkpoint_key, 14 }; 15 use tangle_protocol::{ 16 Event, EventId, Filter, Kind, PublicKeyHex, RawEventJson, RelayMessage, SignatureHex, 17 SubscriptionId, Tag, UnixTimestamp, UnsignedEvent, event_from_value, event_to_value, 18 filter_from_value, filter_to_value, parse_client_message, parse_event_json, 19 }; 20 use tangle_runtime::{ 21 config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, 22 errors::BaseRelayError, 23 groups::{GroupCheckpointStatus, validate_group_extra_tables}, 24 nip11::BaseRelayInfoConfig, 25 relay::{ 26 auth::BaseAuthState, 27 core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits}, 28 live::CloseResult, 29 outbound::RuntimeRelayMessage, 30 }, 31 }; 32 use tangle_store_pocket::{ 33 PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketQueryConfig, PocketStoreConfig, 34 PocketStoreHandle, PocketSyncPolicy, PocketTime, TANGLE_GROUP_CHECKPOINT_TABLE, 35 TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE, parse_pocket_event_json, 36 parse_pocket_filter_json, 37 }; 38 use tangle_test_support::{ 39 FixtureKey, TANGLE_V2_RELAY_SECRET_HEX, TANGLE_V2_RELAY_URL, tangle_v2_group_config, 40 tangle_v2_group_tag, tangle_v2_pubkey_tag, tangle_v2_tag, 41 }; 42 43 trait BaseRelayEventTestExt { 44 fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError>; 45 46 fn handle_event_with_auth( 47 &self, 48 event: Event, 49 auth: &BaseAuthState, 50 ) -> Result<RelayMessage, BaseRelayError>; 51 52 fn fanout(&mut self, event: &Event) -> Vec<RelayMessage>; 53 54 fn fanout_with_group_auth( 55 &mut self, 56 event: &Event, 57 auth: &GroupAuthContext, 58 ) -> Vec<RelayMessage>; 59 } 60 61 impl BaseRelayEventTestExt for BaseRelay { 62 fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError> { 63 let raw = serde_json::to_vec(&event_to_value(&event)).expect("event JSON"); 64 let pocket = parse_pocket_event_json(&raw).expect("pocket event"); 65 self.handle_pocket_event(&pocket) 66 } 67 68 fn handle_event_with_auth( 69 &self, 70 event: Event, 71 auth: &BaseAuthState, 72 ) -> Result<RelayMessage, BaseRelayError> { 73 let raw = serde_json::to_vec(&event_to_value(&event)).expect("event JSON"); 74 let pocket = parse_pocket_event_json(&raw).expect("pocket event"); 75 self.handle_pocket_event_with_auth(&pocket, auth) 76 } 77 78 fn fanout(&mut self, event: &Event) -> Vec<RelayMessage> { 79 let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); 80 let pocket = parse_pocket_event_json(&raw).expect("pocket event"); 81 runtime_messages_to_protocol(self.fanout_pocket(&pocket)).expect("protocol fanout") 82 } 83 84 fn fanout_with_group_auth( 85 &mut self, 86 event: &Event, 87 auth: &GroupAuthContext, 88 ) -> Vec<RelayMessage> { 89 let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); 90 let pocket = parse_pocket_event_json(&raw).expect("pocket event"); 91 runtime_messages_to_protocol(self.fanout_pocket_with_group_auth(&pocket, auth)) 92 .expect("protocol fanout") 93 } 94 } 95 96 trait BaseRelayReqTestExt { 97 fn handle_req( 98 &mut self, 99 subscription_id: SubscriptionId, 100 filters: Vec<Filter>, 101 ) -> Result<Vec<RelayMessage>, BaseRelayError>; 102 103 fn handle_req_with_auth( 104 &mut self, 105 subscription_id: SubscriptionId, 106 filters: Vec<Filter>, 107 auth: &BaseAuthState, 108 ) -> Result<Vec<RelayMessage>, BaseRelayError>; 109 } 110 111 impl BaseRelayReqTestExt for BaseRelay { 112 fn handle_req( 113 &mut self, 114 subscription_id: SubscriptionId, 115 filters: Vec<Filter>, 116 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 117 runtime_messages_to_protocol( 118 self.handle_pocket_req(subscription_id, pocket_filters(filters))?, 119 ) 120 } 121 122 fn handle_req_with_auth( 123 &mut self, 124 subscription_id: SubscriptionId, 125 filters: Vec<Filter>, 126 auth: &BaseAuthState, 127 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 128 runtime_messages_to_protocol(self.handle_pocket_req_with_auth( 129 subscription_id, 130 pocket_filters(filters), 131 auth, 132 )?) 133 } 134 } 135 136 fn runtime_messages_to_protocol( 137 messages: Vec<RuntimeRelayMessage>, 138 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 139 messages 140 .into_iter() 141 .map(runtime_message_to_protocol) 142 .collect() 143 } 144 145 fn runtime_message_to_protocol( 146 message: RuntimeRelayMessage, 147 ) -> Result<RelayMessage, BaseRelayError> { 148 match message { 149 RuntimeRelayMessage::Protocol(message) => Ok(message), 150 RuntimeRelayMessage::Event { 151 subscription_id, 152 event, 153 } => { 154 let encoded = RuntimeRelayMessage::Event { 155 subscription_id: subscription_id.clone(), 156 event, 157 } 158 .encode()?; 159 let value = serde_json::from_str::<serde_json::Value>(&encoded) 160 .map_err(|error| BaseRelayError::error(error.to_string()))?; 161 let event = value 162 .as_array() 163 .and_then(|items| items.get(2)) 164 .ok_or_else(|| BaseRelayError::error("encoded EVENT frame is malformed")) 165 .and_then(|value| { 166 event_from_value(value) 167 .map_err(|error| BaseRelayError::error(error.to_string())) 168 })?; 169 Ok(RelayMessage::Event { 170 subscription_id, 171 event, 172 }) 173 } 174 } 175 } 176 177 trait BaseRelayCountTestExt { 178 fn handle_count_protocol( 179 &self, 180 subscription_id: SubscriptionId, 181 filters: Vec<Filter>, 182 ) -> Result<RelayMessage, BaseRelayError>; 183 184 fn handle_count_with_auth_protocol( 185 &self, 186 subscription_id: SubscriptionId, 187 filters: Vec<Filter>, 188 auth: &BaseAuthState, 189 ) -> Result<RelayMessage, BaseRelayError>; 190 } 191 192 impl BaseRelayCountTestExt for BaseRelay { 193 fn handle_count_protocol( 194 &self, 195 subscription_id: SubscriptionId, 196 filters: Vec<Filter>, 197 ) -> Result<RelayMessage, BaseRelayError> { 198 BaseRelay::handle_count(self, subscription_id, pocket_filters(filters)) 199 } 200 201 fn handle_count_with_auth_protocol( 202 &self, 203 subscription_id: SubscriptionId, 204 filters: Vec<Filter>, 205 auth: &BaseAuthState, 206 ) -> Result<RelayMessage, BaseRelayError> { 207 BaseRelay::handle_count_with_auth(self, subscription_id, pocket_filters(filters), auth) 208 } 209 } 210 211 fn pocket_filters(filters: Vec<Filter>) -> Vec<tangle_store_pocket::PocketOwnedFilter> { 212 filters 213 .iter() 214 .map(|filter| { 215 let raw = serde_json::to_vec(&filter_to_value(filter)).expect("filter JSON"); 216 parse_pocket_filter_json(&raw).expect("pocket filter") 217 }) 218 .collect() 219 } 220 221 fn authenticate_pocket_event_for_test( 222 auth: &mut BaseAuthState, 223 event: &Event, 224 now: UnixTimestamp, 225 ) -> Result<PublicKeyHex, BaseRelayError> { 226 let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); 227 let pocket = parse_pocket_event_json(&raw).expect("pocket event"); 228 auth.authenticate_pocket(&pocket, now) 229 } 230 231 fn pocket_event_for_test(event: &Event) -> PocketOwnedEvent { 232 let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); 233 parse_pocket_event_json(&raw).expect("pocket event") 234 } 235 236 fn pocket_protocol_event( 237 key: FixtureKey, 238 created_at: u64, 239 kind: u64, 240 tags: Vec<Tag>, 241 content: &str, 242 ) -> Event { 243 let tags = pocket_tags_from_protocol(&tags); 244 let pocket = signed_pocket_event( 245 fixture_secret_byte(key), 246 created_at, 247 u16::try_from(kind).expect("pocket kind"), 248 &tags, 249 content.as_bytes(), 250 ); 251 pocket_event_to_protocol(&pocket) 252 } 253 254 fn pocket_protocol_auth_event(key: FixtureKey, challenge: &str, created_at: u64) -> Event { 255 pocket_protocol_event( 256 key, 257 created_at, 258 22_242, 259 vec![ 260 Tag::from_parts("relay", &[TANGLE_V2_RELAY_URL]).expect("relay"), 261 Tag::from_parts("challenge", &[challenge]).expect("challenge"), 262 ], 263 "", 264 ) 265 } 266 267 fn pocket_protocol_group_create_event( 268 key: FixtureKey, 269 group_id: &str, 270 created_at: u64, 271 flags: &[&str], 272 ) -> Event { 273 let mut tags = vec![ 274 tangle_v2_group_tag(group_id).expect("group"), 275 tangle_v2_tag("name", &[group_id]).expect("name"), 276 ]; 277 for flag in flags { 278 tags.push(tangle_v2_tag(flag, &[]).expect("flag")); 279 } 280 pocket_protocol_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "") 281 } 282 283 fn pocket_protocol_group_event( 284 key: FixtureKey, 285 group_id: &str, 286 created_at: u64, 287 kind: u64, 288 content: &str, 289 ) -> Event { 290 pocket_protocol_event( 291 key, 292 created_at, 293 kind, 294 vec![tangle_v2_group_tag(group_id).expect("group")], 295 content, 296 ) 297 } 298 299 fn pocket_protocol_join_event(key: FixtureKey, group_id: &str, created_at: u64) -> Event { 300 pocket_protocol_group_event( 301 key, 302 group_id, 303 created_at, 304 KIND_GROUP_JOIN_REQUEST.into(), 305 "", 306 ) 307 } 308 309 fn tangle_v2_event( 310 key: FixtureKey, 311 created_at: u64, 312 kind: u64, 313 tags: Vec<Tag>, 314 content: &str, 315 ) -> Result<Event, String> { 316 Ok(pocket_protocol_event(key, created_at, kind, tags, content)) 317 } 318 319 fn tangle_v2_auth_event( 320 key: FixtureKey, 321 challenge: &str, 322 created_at: u64, 323 ) -> Result<Event, String> { 324 Ok(pocket_protocol_auth_event(key, challenge, created_at)) 325 } 326 327 fn tangle_v2_group_create_event( 328 key: FixtureKey, 329 group_id: &str, 330 created_at: u64, 331 flags: &[&str], 332 ) -> Result<Event, String> { 333 Ok(pocket_protocol_group_create_event( 334 key, group_id, created_at, flags, 335 )) 336 } 337 338 fn tangle_v2_group_metadata_event( 339 key: FixtureKey, 340 group_id: &str, 341 name: &str, 342 created_at: u64, 343 flags: &[&str], 344 ) -> Result<Event, String> { 345 let mut tags = vec![ 346 tangle_v2_group_tag(group_id)?, 347 tangle_v2_tag("name", &[name])?, 348 ]; 349 for flag in flags { 350 tags.push(tangle_v2_tag(flag, &[])?); 351 } 352 tangle_v2_event(key, created_at, KIND_GROUP_EDIT_METADATA.into(), tags, "") 353 } 354 355 fn tangle_v2_put_user_event( 356 key: FixtureKey, 357 group_id: &str, 358 target: FixtureKey, 359 created_at: u64, 360 ) -> Result<Event, String> { 361 tangle_v2_event( 362 key, 363 created_at, 364 KIND_GROUP_PUT_USER.into(), 365 vec![ 366 tangle_v2_group_tag(group_id)?, 367 tangle_v2_pubkey_tag(target)?, 368 ], 369 "", 370 ) 371 } 372 373 fn tangle_v2_remove_user_event( 374 key: FixtureKey, 375 group_id: &str, 376 target: FixtureKey, 377 created_at: u64, 378 ) -> Result<Event, String> { 379 tangle_v2_event( 380 key, 381 created_at, 382 KIND_GROUP_REMOVE_USER.into(), 383 vec![ 384 tangle_v2_group_tag(group_id)?, 385 tangle_v2_pubkey_tag(target)?, 386 ], 387 "", 388 ) 389 } 390 391 fn tangle_v2_join_event(key: FixtureKey, group_id: &str, created_at: u64) -> Result<Event, String> { 392 Ok(pocket_protocol_join_event(key, group_id, created_at)) 393 } 394 395 fn tangle_v2_leave_event( 396 key: FixtureKey, 397 group_id: &str, 398 created_at: u64, 399 ) -> Result<Event, String> { 400 tangle_v2_group_event( 401 key, 402 group_id, 403 created_at, 404 KIND_GROUP_LEAVE_REQUEST.into(), 405 "", 406 ) 407 } 408 409 fn tangle_v2_delete_event_event( 410 key: FixtureKey, 411 group_id: &str, 412 target: &Event, 413 created_at: u64, 414 ) -> Result<Event, String> { 415 tangle_v2_event( 416 key, 417 created_at, 418 KIND_GROUP_DELETE_EVENT.into(), 419 vec![ 420 tangle_v2_group_tag(group_id)?, 421 tangle_v2_tag("e", &[target.id().as_str()])?, 422 ], 423 "", 424 ) 425 } 426 427 fn tangle_v2_delete_group_event( 428 key: FixtureKey, 429 group_id: &str, 430 created_at: u64, 431 ) -> Result<Event, String> { 432 tangle_v2_group_event( 433 key, 434 group_id, 435 created_at, 436 KIND_GROUP_DELETE_GROUP.into(), 437 "", 438 ) 439 } 440 441 fn tangle_v2_group_event( 442 key: FixtureKey, 443 group_id: &str, 444 created_at: u64, 445 kind: u64, 446 content: &str, 447 ) -> Result<Event, String> { 448 Ok(pocket_protocol_group_event( 449 key, group_id, created_at, kind, content, 450 )) 451 } 452 453 fn signed_pocket_event( 454 secret_byte: u8, 455 created_at: u64, 456 kind: u16, 457 tags: &PocketOwnedTags, 458 content: &[u8], 459 ) -> PocketOwnedEvent { 460 let secret = format!("{secret_byte:02x}").repeat(32); 461 RelaySigner::from_secret_hex(&secret) 462 .expect("signer") 463 .sign_pocket_event( 464 PocketKind::from_u16(kind), 465 tags, 466 PocketTime::from_u64(created_at), 467 content, 468 ) 469 .expect("pocket event") 470 } 471 472 fn pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags { 473 let parts = tags 474 .iter() 475 .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>()) 476 .collect::<Vec<_>>(); 477 PocketOwnedTags::new(&parts).expect("pocket tags") 478 } 479 480 fn pocket_event_to_protocol(event: &PocketOwnedEvent) -> Event { 481 let tags = event 482 .tags() 483 .expect("tags") 484 .iter() 485 .map(|tag| { 486 Tag::new( 487 tag.map(|value| std::str::from_utf8(value).expect("utf8").to_owned()) 488 .collect::<Vec<_>>(), 489 ) 490 .expect("tag") 491 }) 492 .collect::<Vec<_>>(); 493 Event::new( 494 EventId::new(&event.id().as_hex_string()).expect("event id"), 495 UnsignedEvent::new( 496 PublicKeyHex::new(&event.pubkey().as_hex_string()).expect("pubkey"), 497 UnixTimestamp::new(event.created_at().as_u64()), 498 Kind::new(u64::from(event.kind().as_u16())).expect("kind"), 499 tags, 500 std::str::from_utf8(event.content()).expect("content"), 501 ), 502 SignatureHex::new(&event.sig().to_string()).expect("sig"), 503 ) 504 } 505 506 fn fixture_secret_byte(key: FixtureKey) -> u8 { 507 match key { 508 FixtureKey::Relay => 9, 509 FixtureKey::Owner => 10, 510 FixtureKey::Admin => 11, 511 FixtureKey::Member => 12, 512 FixtureKey::Outsider => 13, 513 } 514 } 515 516 #[test] 517 fn public_relay_smoke_stores_queries_counts_and_fans_out() { 518 let config = test_store_config("public-smoke"); 519 let mut relay = 520 BaseRelay::open(&config, relay_limits(4), PocketQueryConfig::default()).expect("relay"); 521 let first = pocket_protocol_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "hello"); 522 let query_id = subscription("public-query"); 523 524 assert_accepted_pocket(relay.handle_event(first.clone()).expect("event"), &first); 525 assert_event_query( 526 relay 527 .handle_req(query_id.clone(), vec![filter_kind(1)]) 528 .expect("query"), 529 &query_id, 530 &[&first], 531 ); 532 assert_count( 533 relay.handle_count_protocol(subscription("public-count"), vec![filter_kind(1)]), 534 1, 535 ); 536 assert_eq!(relay.handle_close(&query_id), CloseResult::Closed); 537 538 let live_id = subscription("public-live"); 539 relay 540 .handle_req(live_id.clone(), vec![filter_kind(1)]) 541 .expect("live"); 542 let second = pocket_protocol_event(FixtureKey::Member, 1_714_124_434, 1, Vec::new(), "again"); 543 assert_accepted_pocket(relay.handle_event(second.clone()).expect("event"), &second); 544 545 assert!(matches!( 546 relay.fanout(&second).as_slice(), 547 [RelayMessage::Event { subscription_id, event }] 548 if subscription_id == &live_id && event.id() == second.id() 549 )); 550 } 551 552 #[test] 553 fn nip11_integration_reports_group_contracts() { 554 let config = runtime_config(true); 555 let disabled_config = runtime_config(false); 556 let document = BaseRelayInfoConfig::new("tangle", &config) 557 .expect("config") 558 .build_document() 559 .expect("document"); 560 let disabled = BaseRelayInfoConfig::new("tangle", &disabled_config) 561 .expect("config") 562 .build_document() 563 .expect("disabled"); 564 565 assert_eq!(document.supported_nips, vec![1, 11, 29, 42, 45, 70]); 566 assert!(!document.supported_nips.contains(&50)); 567 assert!(!document.supported_nips.contains(&77)); 568 assert!(!document.supported_nips.contains(&99)); 569 assert!(document.relay_self().is_some()); 570 assert_eq!(document.limitation.max_message_length, 1_048_576); 571 assert_eq!(document.limitation.max_subscriptions, 64); 572 assert_eq!(document.limitation.max_filters, 10); 573 assert_eq!(document.limitation.max_limit, 500); 574 assert_eq!(document.limitation.max_query_complexity, 2_048); 575 assert_eq!(document.limitation.max_subid_length, 64); 576 assert_eq!(document.limitation.max_event_tags, 200); 577 assert_eq!(document.limitation.max_content_length, 65_536); 578 assert!(!document.limitation.auth_required); 579 assert!(!document.limitation.payment_required); 580 assert!(document.limitation.restricted_writes); 581 assert_eq!(document.limitation.default_limit, 100); 582 assert!(!document.retention.physical_erasure); 583 assert!(!document.retention.compaction_guarantee); 584 assert_eq!( 585 document.retention.group_visibility, 586 "private and hidden group policy gates visibility without implying physical deletion" 587 ); 588 assert_eq!(disabled.supported_nips, vec![1, 11, 42, 45, 70]); 589 assert!(disabled.relay_self().is_none()); 590 } 591 592 #[test] 593 fn auth_integration_covers_challenge_edges() { 594 let mut auth = BaseAuthState::new(TANGLE_V2_RELAY_URL, 20, 600).expect("auth"); 595 596 assert_eq!( 597 auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) 598 .expect("challenge"), 599 RelayMessage::Auth("challenge-a".to_owned()) 600 ); 601 602 let owner_event = tangle_v2_auth_event(FixtureKey::Owner, "challenge-a", 105).expect("owner"); 603 let admin_event = tangle_v2_auth_event(FixtureKey::Admin, "challenge-a", 110).expect("admin"); 604 605 let owner = 606 authenticate_pocket_event_for_test(&mut auth, &owner_event, UnixTimestamp::new(105)) 607 .expect("owner"); 608 let admin = 609 authenticate_pocket_event_for_test(&mut auth, &admin_event, UnixTimestamp::new(110)) 610 .expect("admin"); 611 612 assert_ne!(owner, admin); 613 assert!(auth.authenticated_pubkeys().contains(&owner)); 614 assert!(auth.authenticated_pubkeys().contains(&admin)); 615 assert_eq!( 616 authenticate_pocket_event_for_test( 617 &mut auth, 618 &tangle_v2_auth_event(FixtureKey::Member, "wrong", 111).expect("wrong"), 619 UnixTimestamp::new(111), 620 ) 621 .expect_err("wrong") 622 .prefixed_message(), 623 "auth-required: auth challenge does not match" 624 ); 625 626 let expired = BaseAuthState::new(TANGLE_V2_RELAY_URL, 1, 600).expect("expired"); 627 let mut expired = issue_challenge(expired, "challenge-b", 100); 628 assert_eq!( 629 authenticate_pocket_event_for_test( 630 &mut expired, 631 &tangle_v2_auth_event(FixtureKey::Owner, "challenge-b", 101).expect("expired"), 632 UnixTimestamp::new(102), 633 ) 634 .expect_err("expired") 635 .prefixed_message(), 636 "auth-required: auth challenge expired" 637 ); 638 639 let mut wrong_relay = BaseAuthState::new("wss://other.radroots.test", 20, 600).expect("relay"); 640 wrong_relay 641 .issue_challenge("challenge-a", UnixTimestamp::new(100)) 642 .expect("challenge"); 643 assert_eq!( 644 authenticate_pocket_event_for_test( 645 &mut wrong_relay, 646 &owner_event, 647 UnixTimestamp::new(105), 648 ) 649 .expect_err("relay") 650 .prefixed_message(), 651 "auth-required: auth relay does not match canonical relay URL" 652 ); 653 } 654 655 #[test] 656 fn group_auth_lifecycle_membership_and_flag_flows_pass_in_process() { 657 let config = test_store_config("group-flows"); 658 let groups = group_config_with_public_join(); 659 let relay = BaseRelay::open_with_groups( 660 &config, 661 relay_limits(8), 662 &groups, 663 PocketQueryConfig::default(), 664 ) 665 .expect("relay"); 666 let owner_auth = authenticated(FixtureKey::Owner); 667 let admin_auth = authenticated(FixtureKey::Admin); 668 let member_auth = authenticated(FixtureKey::Member); 669 let outsider_auth = authenticated(FixtureKey::Outsider); 670 let create = tangle_v2_group_create_event(FixtureKey::Owner, "Farm", 1, &[]).expect("create"); 671 672 assert_eq!( 673 rejected_message(relay.handle_event(create.clone()).expect("no auth")), 674 "auth-required: group event author must authenticate with AUTH" 675 ); 676 assert_eq!( 677 rejected_message( 678 relay 679 .handle_event_with_auth(create.clone(), &outsider_auth) 680 .expect("wrong auth") 681 ), 682 "auth-required: group event author must authenticate with AUTH" 683 ); 684 assert_accepted( 685 relay 686 .handle_event_with_auth(create.clone(), &owner_auth) 687 .expect("create"), 688 &create, 689 ); 690 691 let metadata = tangle_v2_group_metadata_event(FixtureKey::Admin, "Farm", "Market", 2, &[]) 692 .expect("metadata"); 693 assert_accepted( 694 relay 695 .handle_event_with_auth(metadata.clone(), &admin_auth) 696 .expect("metadata"), 697 &metadata, 698 ); 699 700 let put = 701 tangle_v2_put_user_event(FixtureKey::Admin, "Farm", FixtureKey::Member, 3).expect("put"); 702 assert_accepted( 703 relay 704 .handle_event_with_auth(put.clone(), &admin_auth) 705 .expect("put"), 706 &put, 707 ); 708 assert_eq!( 709 relay 710 .group_projection() 711 .expect("projection") 712 .member(&group("Farm"), &FixtureKey::Member.public_key()) 713 .expect("member") 714 .status(), 715 MemberStatus::Member 716 ); 717 718 let join = tangle_v2_join_event(FixtureKey::Outsider, "Farm", 4).expect("join"); 719 assert_accepted( 720 relay 721 .handle_event_with_auth(join.clone(), &outsider_auth) 722 .expect("join"), 723 &join, 724 ); 725 assert_eq!( 726 rejected_message( 727 relay 728 .handle_event_with_auth( 729 tangle_v2_join_event(FixtureKey::Outsider, "Farm", 5).expect("duplicate"), 730 &outsider_auth, 731 ) 732 .expect("duplicate") 733 ), 734 "duplicate: group member already exists" 735 ); 736 737 let leave = tangle_v2_leave_event(FixtureKey::Outsider, "Farm", 6).expect("leave"); 738 assert_accepted( 739 relay 740 .handle_event_with_auth(leave.clone(), &outsider_auth) 741 .expect("leave"), 742 &leave, 743 ); 744 assert_eq!( 745 rejected_message( 746 relay 747 .handle_event_with_auth( 748 tangle_v2_leave_event(FixtureKey::Admin, "Farm", 7).expect("admin leave"), 749 &admin_auth, 750 ) 751 .expect("admin leave") 752 ), 753 "duplicate: group member does not exist" 754 ); 755 756 let protected_remove = 757 tangle_v2_remove_user_event(FixtureKey::Admin, "Farm", FixtureKey::Admin, 8) 758 .expect("remove admin"); 759 assert_eq!( 760 rejected_message( 761 relay 762 .handle_event_with_auth(protected_remove, &admin_auth) 763 .expect("remove admin") 764 ), 765 "restricted: permanent group admins cannot be removed" 766 ); 767 768 let remove = tangle_v2_remove_user_event(FixtureKey::Admin, "Farm", FixtureKey::Member, 9) 769 .expect("remove member"); 770 assert_accepted( 771 relay 772 .handle_event_with_auth(remove.clone(), &admin_auth) 773 .expect("remove member"), 774 &remove, 775 ); 776 assert_count( 777 relay.handle_count_protocol( 778 subscription("members"), 779 vec![filter_kind(KIND_GROUP_MEMBERS)], 780 ), 781 1, 782 ); 783 assert_eq!(member_auth.authenticated_pubkeys().len(), 1); 784 } 785 786 #[test] 787 fn relay_override_role_changes_generate_admin_snapshots() { 788 let config = test_store_config("role-admin-snapshots"); 789 let mut relay = BaseRelay::open_with_groups( 790 &config, 791 relay_limits(8), 792 &group_config(), 793 PocketQueryConfig::default(), 794 ) 795 .expect("relay"); 796 let owner_auth = authenticated(FixtureKey::Owner); 797 let admin_auth = authenticated(FixtureKey::Admin); 798 let member = FixtureKey::Member.public_key().as_str().to_owned(); 799 let owner = FixtureKey::Owner.public_key().as_str().to_owned(); 800 let admin = FixtureKey::Admin.public_key().as_str().to_owned(); 801 802 accept_group_create(&mut relay, "RoleFarm", &[], 1, &owner_auth); 803 assert_eq!( 804 stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS).len(), 805 1 806 ); 807 808 let promote = tangle_v2_put_user_event_with_roles( 809 FixtureKey::Admin, 810 "RoleFarm", 811 FixtureKey::Member, 812 2, 813 &[PERMANENT_RELAY_OVERRIDE_ROLE], 814 ); 815 assert_accepted( 816 relay 817 .handle_event_with_auth(promote.clone(), &admin_auth) 818 .expect("promote"), 819 &promote, 820 ); 821 assert!( 822 relay 823 .group_projection() 824 .expect("projection") 825 .member(&group("RoleFarm"), &FixtureKey::Member.public_key()) 826 .expect("member") 827 .roles() 828 .iter() 829 .any(|role| role.as_str() == PERMANENT_RELAY_OVERRIDE_ROLE) 830 ); 831 assert_eq!(outbox_status_counts(&config).stored, 4); 832 assert_eq!( 833 stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS).len(), 834 2 835 ); 836 assert_eq!( 837 latest_admin_snapshot_pubkeys(&mut relay, "RoleFarm"), 838 sorted_strings([owner.clone(), admin.clone(), member.clone()]) 839 ); 840 841 let demote = tangle_v2_put_user_event_with_roles( 842 FixtureKey::Admin, 843 "RoleFarm", 844 FixtureKey::Member, 845 3, 846 &[], 847 ); 848 assert_accepted( 849 relay 850 .handle_event_with_auth(demote.clone(), &admin_auth) 851 .expect("demote"), 852 &demote, 853 ); 854 assert_eq!( 855 stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS).len(), 856 3 857 ); 858 assert_eq!( 859 latest_admin_snapshot_pubkeys(&mut relay, "RoleFarm"), 860 sorted_strings([owner, admin]) 861 ); 862 } 863 864 #[test] 865 fn group_join_requests_are_denied_by_default() { 866 let config = test_store_config("group-public-join-default"); 867 let relay = BaseRelay::open_with_groups( 868 &config, 869 relay_limits(8), 870 &group_config(), 871 PocketQueryConfig::default(), 872 ) 873 .expect("relay"); 874 let owner_auth = authenticated(FixtureKey::Owner); 875 let outsider_auth = authenticated(FixtureKey::Outsider); 876 let create = tangle_v2_group_create_event(FixtureKey::Owner, "Farm", 1, &[]).expect("create"); 877 assert_accepted( 878 relay 879 .handle_event_with_auth(create.clone(), &owner_auth) 880 .expect("create"), 881 &create, 882 ); 883 let join = tangle_v2_join_event(FixtureKey::Outsider, "Farm", 2).expect("join"); 884 885 assert_eq!( 886 rejected_message( 887 relay 888 .handle_event_with_auth(join, &outsider_auth) 889 .expect("join") 890 ), 891 "restricted: group is unavailable" 892 ); 893 assert!( 894 relay 895 .group_projection() 896 .expect("projection") 897 .member(&group("Farm"), &FixtureKey::Outsider.public_key()) 898 .is_none() 899 ); 900 assert_eq!(count_kind(&relay, KIND_GROUP_PUT_USER), 0); 901 } 902 903 #[test] 904 fn metadata_flags_and_read_privacy_cover_req_count_and_fanout() { 905 let config = test_store_config("privacy-flags"); 906 let mut relay = BaseRelay::open_with_groups( 907 &config, 908 relay_limits(8), 909 &group_config(), 910 PocketQueryConfig::default(), 911 ) 912 .expect("relay"); 913 let owner_auth = authenticated(FixtureKey::Owner); 914 let outsider_auth = authenticated(FixtureKey::Outsider); 915 916 accept_group_create(&mut relay, "PrivateFarm", &["private"], 1, &owner_auth); 917 let private_event = 918 pocket_protocol_group_event(FixtureKey::Owner, "PrivateFarm", 2, 1, "private harvest"); 919 assert_accepted_pocket( 920 relay 921 .handle_event_with_auth(private_event.clone(), &owner_auth) 922 .expect("private"), 923 &private_event, 924 ); 925 926 let unauth_id = subscription("private-unauth"); 927 assert_eq!( 928 relay 929 .handle_req( 930 unauth_id.clone(), 931 vec![filter_group_tag(1, "h", "PrivateFarm")] 932 ) 933 .expect("unauth"), 934 vec![RelayMessage::Closed { 935 subscription_id: unauth_id, 936 message: "auth-required: authentication required to read group events".to_owned() 937 }] 938 ); 939 assert_eq!(relay.active_subscription_count(), 0); 940 assert_count( 941 relay.handle_count_protocol( 942 subscription("private-count-unauth"), 943 vec![filter_group_tag(1, "h", "PrivateFarm")], 944 ), 945 0, 946 ); 947 assert_count( 948 relay.handle_count_protocol( 949 subscription("private-metadata-unauth"), 950 vec![filter_group_tag(KIND_GROUP_METADATA, "d", "PrivateFarm")], 951 ), 952 1, 953 ); 954 assert_count( 955 relay.handle_count_protocol( 956 subscription("private-admins-unauth"), 957 vec![filter_group_tag(KIND_GROUP_ADMINS, "d", "PrivateFarm")], 958 ), 959 1, 960 ); 961 assert_count( 962 relay.handle_count_protocol( 963 subscription("private-members-unauth"), 964 vec![filter_kind(KIND_GROUP_MEMBERS)], 965 ), 966 0, 967 ); 968 let owner_query_id = subscription("private-owner"); 969 assert_event_query( 970 relay 971 .handle_req_with_auth( 972 owner_query_id.clone(), 973 vec![filter_group_tag(1, "h", "PrivateFarm")], 974 &owner_auth, 975 ) 976 .expect("owner"), 977 &owner_query_id, 978 &[&private_event], 979 ); 980 assert_eq!(relay.handle_close(&owner_query_id), CloseResult::Closed); 981 assert_count( 982 relay.handle_count_with_auth_protocol( 983 subscription("private-count-owner"), 984 vec![filter_group_tag(1, "h", "PrivateFarm")], 985 &owner_auth, 986 ), 987 1, 988 ); 989 990 let live_unauth = subscription("live-private-unauth"); 991 let live_owner = subscription("live-private-owner"); 992 assert_eq!( 993 relay 994 .handle_req( 995 live_unauth.clone(), 996 vec![filter_group_tag(1, "h", "PrivateFarm")], 997 ) 998 .expect("live unauth"), 999 vec![RelayMessage::Closed { 1000 subscription_id: live_unauth, 1001 message: "auth-required: authentication required to read group events".to_owned() 1002 }] 1003 ); 1004 relay 1005 .handle_req_with_auth( 1006 live_owner.clone(), 1007 vec![filter_group_tag(1, "h", "PrivateFarm")], 1008 &owner_auth, 1009 ) 1010 .expect("live owner"); 1011 let second_private = 1012 pocket_protocol_group_event(FixtureKey::Owner, "PrivateFarm", 3, 1, "second"); 1013 assert_accepted_pocket( 1014 relay 1015 .handle_event_with_auth(second_private.clone(), &owner_auth) 1016 .expect("second"), 1017 &second_private, 1018 ); 1019 assert!(relay.fanout(&second_private).is_empty()); 1020 let owner_live = relay.fanout_with_group_auth( 1021 &second_private, 1022 &GroupAuthContext::new([FixtureKey::Owner.public_key()]), 1023 ); 1024 assert!(owner_live.iter().any(|message| { 1025 matches!( 1026 message, 1027 RelayMessage::Event { subscription_id, event } 1028 if subscription_id == &live_owner && event.id() == second_private.id() 1029 ) 1030 })); 1031 1032 accept_group_create(&mut relay, "HiddenFarm", &["hidden"], 10, &owner_auth); 1033 assert_count( 1034 relay.handle_count_protocol( 1035 subscription("hidden-unauth"), 1036 vec![filter_group_tag(KIND_GROUP_METADATA, "d", "HiddenFarm")], 1037 ), 1038 0, 1039 ); 1040 assert_count( 1041 relay.handle_count_with_auth_protocol( 1042 subscription("hidden-owner"), 1043 vec![filter_group_tag(KIND_GROUP_METADATA, "d", "HiddenFarm")], 1044 &owner_auth, 1045 ), 1046 1, 1047 ); 1048 1049 accept_group_create( 1050 &mut relay, 1051 "RestrictedFarm", 1052 &["restricted"], 1053 20, 1054 &owner_auth, 1055 ); 1056 assert_eq!( 1057 rejected_message( 1058 relay 1059 .handle_event_with_auth( 1060 pocket_protocol_group_event( 1061 FixtureKey::Outsider, 1062 "RestrictedFarm", 1063 21, 1064 1, 1065 "no", 1066 ), 1067 &outsider_auth, 1068 ) 1069 .expect("restricted") 1070 ), 1071 "restricted: group is unavailable" 1072 ); 1073 1074 accept_group_create(&mut relay, "ClosedFarm", &["closed"], 30, &owner_auth); 1075 assert_eq!( 1076 rejected_message( 1077 relay 1078 .handle_event_with_auth( 1079 pocket_protocol_join_event(FixtureKey::Outsider, "ClosedFarm", 31), 1080 &outsider_auth, 1081 ) 1082 .expect("closed join") 1083 ), 1084 "restricted: group is unavailable" 1085 ); 1086 let closed_normal = 1087 pocket_protocol_group_event(FixtureKey::Outsider, "ClosedFarm", 32, 1, "visible"); 1088 assert_accepted_pocket( 1089 relay 1090 .handle_event_with_auth(closed_normal.clone(), &outsider_auth) 1091 .expect("closed normal"), 1092 &closed_normal, 1093 ); 1094 } 1095 1096 #[test] 1097 fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { 1098 let config = test_store_config("nip29-leak-suite"); 1099 let mut relay = BaseRelay::open_with_groups( 1100 &config, 1101 relay_limits(16), 1102 &group_config(), 1103 PocketQueryConfig::default(), 1104 ) 1105 .expect("relay"); 1106 let owner_auth = authenticated(FixtureKey::Owner); 1107 let admin_auth = authenticated(FixtureKey::Admin); 1108 let member_auth = authenticated(FixtureKey::Member); 1109 let outsider_auth = authenticated(FixtureKey::Outsider); 1110 1111 let unauthorized_create = 1112 tangle_v2_group_create_event(FixtureKey::Owner, "UnauthorizedFarm", 1, &[]) 1113 .expect("unauthorized"); 1114 assert_eq!( 1115 rejected_message( 1116 relay 1117 .handle_event(unauthorized_create.clone()) 1118 .expect("no auth") 1119 ), 1120 "auth-required: group event author must authenticate with AUTH" 1121 ); 1122 assert_eq!( 1123 rejected_message( 1124 relay 1125 .handle_event_with_auth(unauthorized_create, &outsider_auth) 1126 .expect("wrong auth") 1127 ), 1128 "auth-required: group event author must authenticate with AUTH" 1129 ); 1130 assert_count( 1131 relay.handle_count_protocol( 1132 subscription("unauthorized-generated"), 1133 vec![filter_group_tag( 1134 KIND_GROUP_METADATA, 1135 "d", 1136 "UnauthorizedFarm", 1137 )], 1138 ), 1139 0, 1140 ); 1141 1142 accept_group_create(&mut relay, "LeakPrivate", &["private"], 10, &owner_auth); 1143 let put_member = 1144 tangle_v2_put_user_event(FixtureKey::Admin, "LeakPrivate", FixtureKey::Member, 11) 1145 .expect("put member"); 1146 assert_accepted( 1147 relay 1148 .handle_event_with_auth(put_member.clone(), &admin_auth) 1149 .expect("put member"), 1150 &put_member, 1151 ); 1152 let private_event = tangle_v2_group_event(FixtureKey::Member, "LeakPrivate", 12, 1, "private") 1153 .expect("private"); 1154 assert_accepted( 1155 relay 1156 .handle_event_with_auth(private_event.clone(), &member_auth) 1157 .expect("private"), 1158 &private_event, 1159 ); 1160 1161 let private_unauth = subscription("private-leak-unauth"); 1162 assert_eq!( 1163 relay 1164 .handle_req( 1165 private_unauth.clone(), 1166 vec![filter_group_tag(1, "h", "LeakPrivate")], 1167 ) 1168 .expect("private unauth"), 1169 vec![RelayMessage::Closed { 1170 subscription_id: private_unauth, 1171 message: "auth-required: authentication required to read group events".to_owned() 1172 }] 1173 ); 1174 assert_count( 1175 relay.handle_count_protocol( 1176 subscription("private-count-unauth"), 1177 vec![filter_group_tag(1, "h", "LeakPrivate")], 1178 ), 1179 0, 1180 ); 1181 let private_member = subscription("private-leak-member"); 1182 assert_event_query( 1183 relay 1184 .handle_req_with_auth( 1185 private_member.clone(), 1186 vec![filter_group_tag(1, "h", "LeakPrivate")], 1187 &member_auth, 1188 ) 1189 .expect("private member"), 1190 &private_member, 1191 &[&private_event], 1192 ); 1193 assert_eq!(relay.handle_close(&private_member), CloseResult::Closed); 1194 1195 let live_unauth = subscription("private-live-unauth"); 1196 let live_member = subscription("private-live-member"); 1197 assert_eq!( 1198 relay 1199 .handle_req( 1200 live_unauth.clone(), 1201 vec![filter_group_tag(1, "h", "LeakPrivate")], 1202 ) 1203 .expect("private live unauth"), 1204 vec![RelayMessage::Closed { 1205 subscription_id: live_unauth, 1206 message: "auth-required: authentication required to read group events".to_owned() 1207 }] 1208 ); 1209 relay 1210 .handle_req_with_auth( 1211 live_member.clone(), 1212 vec![filter_group_tag(1, "h", "LeakPrivate")], 1213 &member_auth, 1214 ) 1215 .expect("private live member"); 1216 let live_private = tangle_v2_group_event(FixtureKey::Member, "LeakPrivate", 13, 1, "live") 1217 .expect("live private"); 1218 assert_accepted( 1219 relay 1220 .handle_event_with_auth(live_private.clone(), &member_auth) 1221 .expect("live private"), 1222 &live_private, 1223 ); 1224 assert!(relay.fanout(&live_private).is_empty()); 1225 let member_live = relay.fanout_with_group_auth( 1226 &live_private, 1227 &GroupAuthContext::new([FixtureKey::Member.public_key()]), 1228 ); 1229 assert!(member_live.iter().any(|message| matches!( 1230 message, 1231 RelayMessage::Event { subscription_id, event } 1232 if subscription_id == &live_member && event.id() == live_private.id() 1233 ))); 1234 1235 assert_count( 1236 relay.handle_count_protocol( 1237 subscription("private-metadata-public"), 1238 vec![filter_group_tag(KIND_GROUP_METADATA, "d", "LeakPrivate")], 1239 ), 1240 1, 1241 ); 1242 assert_count( 1243 relay.handle_count_protocol( 1244 subscription("private-members-public"), 1245 vec![filter_group_tag(KIND_GROUP_MEMBERS, "d", "LeakPrivate")], 1246 ), 1247 0, 1248 ); 1249 1250 accept_group_create(&mut relay, "LeakHidden", &["hidden"], 20, &owner_auth); 1251 assert_count( 1252 relay.handle_count_protocol( 1253 subscription("hidden-metadata-public"), 1254 vec![filter_group_tag(KIND_GROUP_METADATA, "d", "LeakHidden")], 1255 ), 1256 0, 1257 ); 1258 assert_count( 1259 relay.handle_count_with_auth_protocol( 1260 subscription("hidden-metadata-owner"), 1261 vec![filter_group_tag(KIND_GROUP_METADATA, "d", "LeakHidden")], 1262 &owner_auth, 1263 ), 1264 1, 1265 ); 1266 1267 accept_group_create( 1268 &mut relay, 1269 "LeakRestricted", 1270 &["restricted"], 1271 30, 1272 &owner_auth, 1273 ); 1274 let restricted_event = 1275 tangle_v2_group_event(FixtureKey::Outsider, "LeakRestricted", 31, 1, "restricted") 1276 .expect("restricted"); 1277 assert_eq!( 1278 rejected_message( 1279 relay 1280 .handle_event_with_auth(restricted_event, &outsider_auth) 1281 .expect("restricted") 1282 ), 1283 "restricted: group is unavailable" 1284 ); 1285 assert_count( 1286 relay.handle_count_protocol( 1287 subscription("restricted-count"), 1288 vec![filter_group_tag(1, "h", "LeakRestricted")], 1289 ), 1290 0, 1291 ); 1292 1293 accept_group_create(&mut relay, "LeakClosed", &["closed"], 40, &owner_auth); 1294 let closed_join = 1295 tangle_v2_join_event(FixtureKey::Outsider, "LeakClosed", 41).expect("closed join"); 1296 assert_eq!( 1297 rejected_message( 1298 relay 1299 .handle_event_with_auth(closed_join, &outsider_auth) 1300 .expect("closed join") 1301 ), 1302 "restricted: group is unavailable" 1303 ); 1304 assert_count( 1305 relay.handle_count_protocol( 1306 subscription("closed-join-count"), 1307 vec![filter_group_tag(KIND_GROUP_JOIN_REQUEST, "h", "LeakClosed")], 1308 ), 1309 0, 1310 ); 1311 let closed_normal = 1312 tangle_v2_group_event(FixtureKey::Outsider, "LeakClosed", 42, 1, "closed normal") 1313 .expect("closed normal"); 1314 assert_accepted( 1315 relay 1316 .handle_event_with_auth(closed_normal.clone(), &outsider_auth) 1317 .expect("closed normal"), 1318 &closed_normal, 1319 ); 1320 assert_count( 1321 relay.handle_count_protocol( 1322 subscription("closed-normal-count"), 1323 vec![filter_group_tag(1, "h", "LeakClosed")], 1324 ), 1325 1, 1326 ); 1327 1328 let duplicate_join = 1329 tangle_v2_join_event(FixtureKey::Member, "LeakPrivate", 50).expect("duplicate join"); 1330 assert_eq!( 1331 rejected_message( 1332 relay 1333 .handle_event_with_auth(duplicate_join, &member_auth) 1334 .expect("duplicate join") 1335 ), 1336 "duplicate: group member already exists" 1337 ); 1338 assert_count( 1339 relay.handle_count_protocol( 1340 subscription("duplicate-join-count"), 1341 vec![filter_group_tag( 1342 KIND_GROUP_JOIN_REQUEST, 1343 "h", 1344 "LeakPrivate", 1345 )], 1346 ), 1347 0, 1348 ); 1349 let duplicate_leave = 1350 tangle_v2_leave_event(FixtureKey::Outsider, "LeakPrivate", 51).expect("duplicate leave"); 1351 assert_eq!( 1352 rejected_message( 1353 relay 1354 .handle_event_with_auth(duplicate_leave, &outsider_auth) 1355 .expect("duplicate leave") 1356 ), 1357 "duplicate: group member does not exist" 1358 ); 1359 assert_count( 1360 relay.handle_count_protocol( 1361 subscription("duplicate-leave-count"), 1362 vec![filter_group_tag( 1363 KIND_GROUP_LEAVE_REQUEST, 1364 "h", 1365 "LeakPrivate", 1366 )], 1367 ), 1368 0, 1369 ); 1370 1371 for (index, kind) in NIP29_RELAY_GENERATED_KIND_VALUES 1372 .iter() 1373 .copied() 1374 .enumerate() 1375 { 1376 let generated = tangle_v2_event( 1377 FixtureKey::Owner, 1378 60 + u64::try_from(index).expect("index"), 1379 u64::from(kind), 1380 vec![Tag::from_parts("d", &["ClientGenerated"]).expect("d")], 1381 "", 1382 ) 1383 .expect("generated"); 1384 assert_eq!( 1385 rejected_message( 1386 relay 1387 .handle_event_with_auth(generated, &owner_auth) 1388 .expect("generated") 1389 ), 1390 "blocked: relay-generated group state events cannot be submitted by clients" 1391 ); 1392 assert_count( 1393 relay.handle_count_protocol( 1394 subscription("client-generated-count"), 1395 vec![filter_group_tag(kind, "d", "ClientGenerated")], 1396 ), 1397 0, 1398 ); 1399 } 1400 1401 accept_group_create(&mut relay, "LeakDeleted", &[], 70, &owner_auth); 1402 let deleted_target = tangle_v2_group_event(FixtureKey::Owner, "LeakDeleted", 71, 1, "deleted") 1403 .expect("deleted target"); 1404 assert_accepted( 1405 relay 1406 .handle_event_with_auth(deleted_target.clone(), &owner_auth) 1407 .expect("deleted target"), 1408 &deleted_target, 1409 ); 1410 let delete_target = 1411 tangle_v2_delete_event_event(FixtureKey::Owner, "LeakDeleted", &deleted_target, 72) 1412 .expect("delete target"); 1413 assert_accepted( 1414 relay 1415 .handle_event_with_auth(delete_target.clone(), &owner_auth) 1416 .expect("delete target"), 1417 &delete_target, 1418 ); 1419 assert_count( 1420 relay.handle_count_protocol( 1421 subscription("deleted-target-count"), 1422 vec![filter_group_tag(1, "h", "LeakDeleted")], 1423 ), 1424 0, 1425 ); 1426 let deleted_query = subscription("deleted-target-query"); 1427 assert_eq!( 1428 relay 1429 .handle_req( 1430 deleted_query.clone(), 1431 vec![filter_group_tag(1, "h", "LeakDeleted")], 1432 ) 1433 .expect("deleted query"), 1434 vec![RelayMessage::Closed { 1435 subscription_id: deleted_query, 1436 message: "auth-required: authentication required to read group events".to_owned() 1437 }] 1438 ); 1439 let delete_group = 1440 tangle_v2_delete_group_event(FixtureKey::Owner, "LeakDeleted", 73).expect("delete group"); 1441 assert_accepted( 1442 relay 1443 .handle_event_with_auth(delete_group.clone(), &owner_auth) 1444 .expect("delete group"), 1445 &delete_group, 1446 ); 1447 assert_eq!( 1448 rejected_message( 1449 relay 1450 .handle_event_with_auth( 1451 tangle_v2_group_event(FixtureKey::Owner, "LeakDeleted", 74, 1, "late") 1452 .expect("late deleted"), 1453 &owner_auth, 1454 ) 1455 .expect("late deleted") 1456 ), 1457 "blocked: group is deleted" 1458 ); 1459 1460 accept_group_create( 1461 &mut relay, 1462 "LeakUnauthorizedCapability", 1463 &[], 1464 80, 1465 &owner_auth, 1466 ); 1467 let unauthorized_put = tangle_v2_put_user_event( 1468 FixtureKey::Outsider, 1469 "LeakUnauthorizedCapability", 1470 FixtureKey::Member, 1471 81, 1472 ) 1473 .expect("unauthorized put"); 1474 assert_eq!( 1475 rejected_message( 1476 relay 1477 .handle_event_with_auth(unauthorized_put, &outsider_auth) 1478 .expect("unauthorized put") 1479 ), 1480 "restricted: missing group capability manage_members" 1481 ); 1482 assert_count( 1483 relay.handle_count_protocol( 1484 subscription("unauthorized-put-count"), 1485 vec![filter_group_tag( 1486 KIND_GROUP_PUT_USER, 1487 "h", 1488 "LeakUnauthorizedCapability", 1489 )], 1490 ), 1491 0, 1492 ); 1493 } 1494 1495 #[test] 1496 fn delete_and_secondary_privacy_surfaces_are_read_gated_or_absent() { 1497 let config = test_store_config("delete-privacy"); 1498 let mut relay = BaseRelay::open_with_groups( 1499 &config, 1500 relay_limits(8), 1501 &group_config(), 1502 PocketQueryConfig::default(), 1503 ) 1504 .expect("relay"); 1505 let owner_auth = authenticated(FixtureKey::Owner); 1506 1507 accept_group_create(&mut relay, "DeleteFarm", &[], 1, &owner_auth); 1508 let target = 1509 tangle_v2_group_event(FixtureKey::Owner, "DeleteFarm", 2, 1, "target").expect("target"); 1510 assert_accepted( 1511 relay 1512 .handle_event_with_auth(target.clone(), &owner_auth) 1513 .expect("target"), 1514 &target, 1515 ); 1516 let delete = 1517 tangle_v2_delete_event_event(FixtureKey::Owner, "DeleteFarm", &target, 3).expect("delete"); 1518 assert_accepted( 1519 relay 1520 .handle_event_with_auth(delete.clone(), &owner_auth) 1521 .expect("delete"), 1522 &delete, 1523 ); 1524 1525 assert_count( 1526 relay.handle_count_protocol( 1527 subscription("deleted-target"), 1528 vec![filter_group_tag(1, "h", "DeleteFarm")], 1529 ), 1530 0, 1531 ); 1532 assert_count( 1533 relay.handle_count_protocol( 1534 subscription("delete-marker"), 1535 vec![filter_group_tag(KIND_GROUP_DELETE_GROUP, "h", "DeleteFarm")], 1536 ), 1537 0, 1538 ); 1539 let delete_group = 1540 tangle_v2_delete_group_event(FixtureKey::Owner, "DeleteFarm", 4).expect("delete group"); 1541 assert_accepted( 1542 relay 1543 .handle_event_with_auth(delete_group.clone(), &owner_auth) 1544 .expect("delete group"), 1545 &delete_group, 1546 ); 1547 assert_eq!( 1548 rejected_message( 1549 relay 1550 .handle_event_with_auth( 1551 tangle_v2_group_event(FixtureKey::Owner, "DeleteFarm", 5, 1, "late") 1552 .expect("late"), 1553 &owner_auth, 1554 ) 1555 .expect("late") 1556 ), 1557 "blocked: group is deleted" 1558 ); 1559 assert_count( 1560 relay.handle_count_protocol( 1561 subscription("group-marker"), 1562 vec![filter_group_tag(KIND_GROUP_DELETE_GROUP, "h", "DeleteFarm")], 1563 ), 1564 1, 1565 ); 1566 1567 let config = runtime_config(true); 1568 let document = BaseRelayInfoConfig::new("tangle", &config) 1569 .expect("config") 1570 .build_document() 1571 .expect("document"); 1572 assert!(!document.supported_nips.contains(&77)); 1573 assert!(!document.supported_nips.contains(&86)); 1574 assert!(!document.supported_nips.contains(&98)); 1575 } 1576 1577 #[test] 1578 fn group_tombstone_hides_prior_events_and_generated_snapshots() { 1579 let config = test_store_config("group-tombstone-visibility"); 1580 let mut relay = BaseRelay::open_with_groups( 1581 &config, 1582 relay_limits(8), 1583 &group_config(), 1584 PocketQueryConfig::default(), 1585 ) 1586 .expect("relay"); 1587 let owner_auth = authenticated(FixtureKey::Owner); 1588 let admin_auth = authenticated(FixtureKey::Admin); 1589 let member_auth = authenticated(FixtureKey::Member); 1590 1591 accept_group_create(&mut relay, "TombstoneFarm", &[], 1, &owner_auth); 1592 let put = tangle_v2_put_user_event(FixtureKey::Admin, "TombstoneFarm", FixtureKey::Member, 2) 1593 .expect("put"); 1594 assert_accepted( 1595 relay 1596 .handle_event_with_auth(put.clone(), &admin_auth) 1597 .expect("put"), 1598 &put, 1599 ); 1600 let note = 1601 tangle_v2_group_event(FixtureKey::Member, "TombstoneFarm", 3, 1, "harvest").expect("note"); 1602 assert_accepted( 1603 relay 1604 .handle_event_with_auth(note.clone(), &member_auth) 1605 .expect("note"), 1606 ¬e, 1607 ); 1608 1609 assert_count( 1610 relay.handle_count_protocol( 1611 subscription("tombstone-note-before"), 1612 vec![filter_group_tag(1, "h", "TombstoneFarm")], 1613 ), 1614 1, 1615 ); 1616 for (subscription_id, kind) in [ 1617 ("tombstone-metadata-before", KIND_GROUP_METADATA), 1618 ("tombstone-admins-before", KIND_GROUP_ADMINS), 1619 ("tombstone-members-before", KIND_GROUP_MEMBERS), 1620 ] { 1621 assert_count( 1622 relay.handle_count_protocol( 1623 subscription(subscription_id), 1624 vec![filter_group_tag(kind, "d", "TombstoneFarm")], 1625 ), 1626 1, 1627 ); 1628 } 1629 1630 let delete_group = 1631 tangle_v2_delete_group_event(FixtureKey::Owner, "TombstoneFarm", 4).expect("delete group"); 1632 assert_accepted( 1633 relay 1634 .handle_event_with_auth(delete_group.clone(), &owner_auth) 1635 .expect("delete group"), 1636 &delete_group, 1637 ); 1638 1639 assert_count( 1640 relay.handle_count_protocol( 1641 subscription("tombstone-note-after"), 1642 vec![filter_group_tag(1, "h", "TombstoneFarm")], 1643 ), 1644 0, 1645 ); 1646 assert_auth_required_redacted_query( 1647 &mut relay, 1648 "tombstone-note-query", 1649 vec![filter_group_tag(1, "h", "TombstoneFarm")], 1650 ); 1651 for (subscription_id, query_id, kind) in [ 1652 ( 1653 "tombstone-metadata-after", 1654 "tombstone-metadata-query", 1655 KIND_GROUP_METADATA, 1656 ), 1657 ( 1658 "tombstone-admins-after", 1659 "tombstone-admins-query", 1660 KIND_GROUP_ADMINS, 1661 ), 1662 ( 1663 "tombstone-members-after", 1664 "tombstone-members-query", 1665 KIND_GROUP_MEMBERS, 1666 ), 1667 ] { 1668 assert_count( 1669 relay.handle_count_protocol( 1670 subscription(subscription_id), 1671 vec![filter_group_tag(kind, "d", "TombstoneFarm")], 1672 ), 1673 0, 1674 ); 1675 assert_auth_required_redacted_query( 1676 &mut relay, 1677 query_id, 1678 vec![filter_group_tag(kind, "d", "TombstoneFarm")], 1679 ); 1680 } 1681 assert_count( 1682 relay.handle_count_protocol( 1683 subscription("tombstone-marker-after"), 1684 vec![filter_group_tag( 1685 KIND_GROUP_DELETE_GROUP, 1686 "h", 1687 "TombstoneFarm", 1688 )], 1689 ), 1690 1, 1691 ); 1692 } 1693 1694 #[test] 1695 fn projection_rebuild_after_restart_matches_live_state_and_outbox_is_idempotent() { 1696 let config = test_store_config("projection-restart"); 1697 let owner_auth = authenticated(FixtureKey::Owner); 1698 { 1699 let mut relay = BaseRelay::open_with_groups( 1700 &config, 1701 relay_limits(8), 1702 &group_config(), 1703 PocketQueryConfig::default(), 1704 ) 1705 .expect("relay"); 1706 accept_group_create(&mut relay, "RestartFarm", &[], 1, &owner_auth); 1707 let put = tangle_v2_put_user_event(FixtureKey::Admin, "RestartFarm", FixtureKey::Member, 2) 1708 .expect("put"); 1709 let admin_auth = authenticated(FixtureKey::Admin); 1710 assert_accepted( 1711 relay 1712 .handle_event_with_auth(put.clone(), &admin_auth) 1713 .expect("put"), 1714 &put, 1715 ); 1716 assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1); 1717 assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1); 1718 assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1); 1719 relay.shutdown().expect("shutdown"); 1720 } 1721 delete_group_extra_records(&config); 1722 1723 let relay = BaseRelay::open_with_groups( 1724 &config, 1725 relay_limits(8), 1726 &group_config(), 1727 PocketQueryConfig::default(), 1728 ) 1729 .expect("reopen"); 1730 assert_eq!( 1731 relay 1732 .readiness_state() 1733 .response() 1734 .checks 1735 .group_outbox_replay, 1736 "ready" 1737 ); 1738 assert!( 1739 relay 1740 .group_projection() 1741 .expect("projection") 1742 .group(&group("RestartFarm")) 1743 .is_some() 1744 ); 1745 assert_eq!( 1746 relay 1747 .group_projection() 1748 .expect("projection") 1749 .member(&group("RestartFarm"), &FixtureKey::Member.public_key()) 1750 .expect("member") 1751 .status(), 1752 MemberStatus::Member 1753 ); 1754 assert!( 1755 relay 1756 .group_projection() 1757 .expect("projection") 1758 .checkpoint() 1759 .is_some() 1760 ); 1761 assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1); 1762 assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1); 1763 assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1); 1764 let validation = group_extra_table_validation(&config); 1765 assert!(validation.projection_records() > 0); 1766 assert_eq!(validation.outbox_records(), 3); 1767 assert!(matches!( 1768 validation.checkpoint_status(), 1769 &GroupCheckpointStatus::Current { .. } 1770 )); 1771 1772 let relay = BaseRelay::open_with_groups( 1773 &config, 1774 relay_limits(8), 1775 &group_config(), 1776 PocketQueryConfig::default(), 1777 ) 1778 .expect("second reopen"); 1779 assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1); 1780 assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1); 1781 assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1); 1782 } 1783 1784 #[test] 1785 fn projection_applies_canonical_events_after_checkpoint_on_restart() { 1786 let config = test_store_config("projection-incremental"); 1787 let owner_auth = authenticated(FixtureKey::Owner); 1788 let admin_auth = authenticated(FixtureKey::Admin); 1789 let create = 1790 tangle_v2_group_create_event(FixtureKey::Owner, "IncrementalFarm", 1, &[]).expect("create"); 1791 let put = tangle_v2_put_user_event(FixtureKey::Admin, "IncrementalFarm", FixtureKey::Member, 2) 1792 .expect("put"); 1793 { 1794 let mut relay = BaseRelay::open_with_groups( 1795 &config, 1796 relay_limits(8), 1797 &group_config(), 1798 PocketQueryConfig::default(), 1799 ) 1800 .expect("relay"); 1801 assert_accepted( 1802 relay 1803 .handle_event_with_auth(create.clone(), &owner_auth) 1804 .expect("create"), 1805 &create, 1806 ); 1807 assert_accepted( 1808 relay 1809 .handle_event_with_auth(put.clone(), &admin_auth) 1810 .expect("put"), 1811 &put, 1812 ); 1813 relay.shutdown().expect("shutdown"); 1814 } 1815 let create_offset = stored_event_offset(&config, &create); 1816 regress_member_projection_to_checkpoint(&config, create_offset, "IncrementalFarm"); 1817 1818 let relay = BaseRelay::open_with_groups( 1819 &config, 1820 relay_limits(8), 1821 &group_config(), 1822 PocketQueryConfig::default(), 1823 ) 1824 .expect("reopen"); 1825 assert_eq!( 1826 relay 1827 .group_projection() 1828 .expect("projection") 1829 .member(&group("IncrementalFarm"), &FixtureKey::Member.public_key()) 1830 .expect("member") 1831 .status(), 1832 MemberStatus::Member 1833 ); 1834 let validation = group_extra_table_validation(&config); 1835 match validation.checkpoint_status() { 1836 GroupCheckpointStatus::Current { checkpoint } => assert!( 1837 checkpoint 1838 .last_offset() 1839 .is_some_and(|offset| offset.as_u64() > create_offset) 1840 ), 1841 status => panic!("expected current checkpoint, got {status:?}"), 1842 } 1843 } 1844 1845 #[test] 1846 fn source_store_crash_recovery_rebuilds_projection_outbox_and_generated_events() { 1847 let config = test_store_config("source-store-crash-recovery"); 1848 let create = 1849 tangle_v2_group_create_event(FixtureKey::Owner, "CrashFarm", 1, &[]).expect("create"); 1850 let put = tangle_v2_put_user_event(FixtureKey::Admin, "CrashFarm", FixtureKey::Member, 2) 1851 .expect("put"); 1852 1853 store_source_events(&config, &[create, put]); 1854 1855 let relay = BaseRelay::open_with_groups( 1856 &config, 1857 relay_limits(8), 1858 &group_config(), 1859 PocketQueryConfig::default(), 1860 ) 1861 .expect("reopen"); 1862 assert_eq!( 1863 relay 1864 .readiness_state() 1865 .response() 1866 .checks 1867 .group_outbox_replay, 1868 "ready" 1869 ); 1870 assert!( 1871 relay 1872 .group_projection() 1873 .expect("projection") 1874 .group(&group("CrashFarm")) 1875 .is_some() 1876 ); 1877 assert_eq!( 1878 relay 1879 .group_projection() 1880 .expect("projection") 1881 .member(&group("CrashFarm"), &FixtureKey::Member.public_key()) 1882 .expect("member") 1883 .status(), 1884 MemberStatus::Member 1885 ); 1886 assert_eq!( 1887 stored_event_ids_for_kind(&config, KIND_GROUP_METADATA).len(), 1888 1 1889 ); 1890 assert_eq!( 1891 stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS).len(), 1892 1 1893 ); 1894 assert_eq!( 1895 stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS).len(), 1896 1 1897 ); 1898 let counts = outbox_status_counts(&config); 1899 assert_eq!(counts.pending, 0); 1900 assert_eq!(counts.retryable, 0); 1901 assert_eq!(counts.stored, 3); 1902 } 1903 1904 #[test] 1905 fn source_before_outbox_recovery_derives_missing_records_without_duplicates() { 1906 let config = test_store_config("source-before-outbox-recovery"); 1907 let events = recovery_equivalence_events(); 1908 let offsets = store_source_events(&config, &events); 1909 assert_eq!(offsets.len(), events.len()); 1910 assert_eq!(outbox_status_counts(&config), OutboxStatusCounts::default()); 1911 1912 let mut recovered = BaseRelay::open_with_groups( 1913 &config, 1914 relay_limits(8), 1915 &group_config(), 1916 PocketQueryConfig::default(), 1917 ) 1918 .expect("recovered"); 1919 let first_summary = recovery_summary(&mut recovered, "CrashFarm"); 1920 let first_outbox = outbox_status_counts(&config); 1921 let first_metadata_ids = stored_event_ids_for_kind(&config, KIND_GROUP_METADATA); 1922 let first_admin_ids = stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS); 1923 let first_member_ids = stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS); 1924 1925 assert_eq!(first_outbox.pending, 0); 1926 assert_eq!(first_outbox.retryable, 0); 1927 assert_eq!(first_outbox.stored, 5); 1928 assert_eq!(first_summary.group_name.as_deref(), Some("Crash Market")); 1929 assert_eq!(first_summary.member_status, Some(MemberStatus::Member)); 1930 assert_eq!( 1931 first_summary.member_roles, 1932 vec![PERMANENT_RELAY_OVERRIDE_ROLE.to_owned()] 1933 ); 1934 assert_eq!(first_metadata_ids.len(), 2); 1935 assert_eq!(first_admin_ids.len(), 2); 1936 assert_eq!(first_member_ids.len(), 1); 1937 recovered.shutdown().expect("shutdown"); 1938 1939 let mut reopened = BaseRelay::open_with_groups( 1940 &config, 1941 relay_limits(8), 1942 &group_config(), 1943 PocketQueryConfig::default(), 1944 ) 1945 .expect("reopen"); 1946 assert_eq!(recovery_summary(&mut reopened, "CrashFarm"), first_summary); 1947 assert_eq!(outbox_status_counts(&config), first_outbox); 1948 assert_eq!( 1949 stored_event_ids_for_kind(&config, KIND_GROUP_METADATA), 1950 first_metadata_ids 1951 ); 1952 assert_eq!( 1953 stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS), 1954 first_admin_ids 1955 ); 1956 assert_eq!( 1957 stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS), 1958 first_member_ids 1959 ); 1960 } 1961 1962 #[test] 1963 fn outbox_before_generated_recovery_materializes_pending_records_without_duplicates() { 1964 let config = test_store_config("outbox-before-generated-recovery"); 1965 let events = recovery_equivalence_events(); 1966 let offsets = store_source_events(&config, &events); 1967 seed_pending_recovery_outbox_records(&config, &events, &offsets); 1968 assert_eq!( 1969 outbox_status_counts(&config), 1970 OutboxStatusCounts { 1971 pending: 5, 1972 retryable: 0, 1973 stored: 0, 1974 } 1975 ); 1976 assert!(stored_event_ids_for_kind(&config, KIND_GROUP_METADATA).is_empty()); 1977 assert!(stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS).is_empty()); 1978 assert!(stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS).is_empty()); 1979 1980 let mut recovered = BaseRelay::open_with_groups( 1981 &config, 1982 relay_limits(8), 1983 &group_config(), 1984 PocketQueryConfig::default(), 1985 ) 1986 .expect("recovered"); 1987 let first_summary = recovery_summary(&mut recovered, "CrashFarm"); 1988 let first_outbox = outbox_status_counts(&config); 1989 let first_metadata_ids = stored_event_ids_for_kind(&config, KIND_GROUP_METADATA); 1990 let first_admin_ids = stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS); 1991 let first_member_ids = stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS); 1992 1993 assert_eq!(first_outbox.pending, 0); 1994 assert_eq!(first_outbox.retryable, 0); 1995 assert_eq!(first_outbox.stored, 5); 1996 assert_eq!(first_summary.group_name.as_deref(), Some("Crash Market")); 1997 assert_eq!(first_summary.member_status, Some(MemberStatus::Member)); 1998 assert_eq!( 1999 first_summary.member_roles, 2000 vec![PERMANENT_RELAY_OVERRIDE_ROLE.to_owned()] 2001 ); 2002 assert_eq!(first_metadata_ids.len(), 2); 2003 assert_eq!(first_admin_ids.len(), 2); 2004 assert_eq!(first_member_ids.len(), 1); 2005 recovered.shutdown().expect("shutdown"); 2006 2007 let mut reopened = BaseRelay::open_with_groups( 2008 &config, 2009 relay_limits(8), 2010 &group_config(), 2011 PocketQueryConfig::default(), 2012 ) 2013 .expect("reopen"); 2014 assert_eq!(recovery_summary(&mut reopened, "CrashFarm"), first_summary); 2015 assert_eq!(outbox_status_counts(&config), first_outbox); 2016 assert_eq!( 2017 stored_event_ids_for_kind(&config, KIND_GROUP_METADATA), 2018 first_metadata_ids 2019 ); 2020 assert_eq!( 2021 stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS), 2022 first_admin_ids 2023 ); 2024 assert_eq!( 2025 stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS), 2026 first_member_ids 2027 ); 2028 } 2029 2030 #[test] 2031 fn generated_before_outbox_mark_recovery_marks_records_without_duplicates() { 2032 let config = test_store_config("generated-before-outbox-mark-recovery"); 2033 let events = recovery_equivalence_events(); 2034 let offsets = store_source_events(&config, &events); 2035 let records = seed_pending_recovery_outbox_records(&config, &events, &offsets); 2036 store_generated_events_for_pending_outbox_records(&config, &records); 2037 let generated_metadata_ids = stored_event_ids_for_kind(&config, KIND_GROUP_METADATA); 2038 let generated_admin_ids = stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS); 2039 let generated_member_ids = stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS); 2040 2041 assert_eq!( 2042 outbox_status_counts(&config), 2043 OutboxStatusCounts { 2044 pending: 5, 2045 retryable: 0, 2046 stored: 0, 2047 } 2048 ); 2049 assert_eq!(generated_metadata_ids.len(), 2); 2050 assert_eq!(generated_admin_ids.len(), 2); 2051 assert_eq!(generated_member_ids.len(), 1); 2052 2053 let mut recovered = BaseRelay::open_with_groups( 2054 &config, 2055 relay_limits(8), 2056 &group_config(), 2057 PocketQueryConfig::default(), 2058 ) 2059 .expect("recovered"); 2060 let first_summary = recovery_summary(&mut recovered, "CrashFarm"); 2061 let first_outbox = outbox_status_counts(&config); 2062 2063 assert_eq!(first_outbox.pending, 0); 2064 assert_eq!(first_outbox.retryable, 0); 2065 assert_eq!(first_outbox.stored, 5); 2066 assert_eq!(first_summary.group_name.as_deref(), Some("Crash Market")); 2067 assert_eq!(first_summary.member_status, Some(MemberStatus::Member)); 2068 assert_eq!( 2069 first_summary.member_roles, 2070 vec![PERMANENT_RELAY_OVERRIDE_ROLE.to_owned()] 2071 ); 2072 assert_eq!(first_summary.metadata_event_ids.len(), 1); 2073 assert_eq!(first_summary.admin_event_ids.len(), 1); 2074 assert_eq!(first_summary.member_event_ids.len(), 1); 2075 assert_eq!( 2076 stored_event_ids_for_kind(&config, KIND_GROUP_METADATA), 2077 generated_metadata_ids 2078 ); 2079 assert_eq!( 2080 stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS), 2081 generated_admin_ids 2082 ); 2083 assert_eq!( 2084 stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS), 2085 generated_member_ids 2086 ); 2087 recovered.shutdown().expect("shutdown"); 2088 2089 let mut reopened = BaseRelay::open_with_groups( 2090 &config, 2091 relay_limits(8), 2092 &group_config(), 2093 PocketQueryConfig::default(), 2094 ) 2095 .expect("reopen"); 2096 assert_eq!(recovery_summary(&mut reopened, "CrashFarm"), first_summary); 2097 assert_eq!(outbox_status_counts(&config), first_outbox); 2098 assert_eq!( 2099 stored_event_ids_for_kind(&config, KIND_GROUP_METADATA), 2100 generated_metadata_ids 2101 ); 2102 assert_eq!( 2103 stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS), 2104 generated_admin_ids 2105 ); 2106 assert_eq!( 2107 stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS), 2108 generated_member_ids 2109 ); 2110 } 2111 2112 #[test] 2113 fn rebuilt_projection_matches_live_projection_for_moderation_stream() { 2114 let config = test_store_config("projection-equivalence"); 2115 let owner_auth = authenticated(FixtureKey::Owner); 2116 let admin_auth = authenticated(FixtureKey::Admin); 2117 let member_auth = authenticated(FixtureKey::Member); 2118 let live_projection; 2119 let metadata_before; 2120 let admins_before; 2121 let members_before; 2122 2123 { 2124 let mut relay = BaseRelay::open_with_groups( 2125 &config, 2126 relay_limits(16), 2127 &group_config(), 2128 PocketQueryConfig::default(), 2129 ) 2130 .expect("relay"); 2131 accept_group_create(&mut relay, "EquivFarm", &[], 1, &owner_auth); 2132 let metadata = 2133 tangle_v2_group_metadata_event(FixtureKey::Admin, "EquivFarm", "Market", 2, &[]) 2134 .expect("metadata"); 2135 assert_accepted( 2136 relay 2137 .handle_event_with_auth(metadata.clone(), &admin_auth) 2138 .expect("metadata"), 2139 &metadata, 2140 ); 2141 let promote = tangle_v2_put_user_event_with_roles( 2142 FixtureKey::Admin, 2143 "EquivFarm", 2144 FixtureKey::Member, 2145 3, 2146 &[PERMANENT_RELAY_OVERRIDE_ROLE], 2147 ); 2148 assert_accepted( 2149 relay 2150 .handle_event_with_auth(promote.clone(), &admin_auth) 2151 .expect("promote"), 2152 &promote, 2153 ); 2154 let normal = tangle_v2_group_event(FixtureKey::Member, "EquivFarm", 4, 1, "harvest") 2155 .expect("normal"); 2156 assert_accepted( 2157 relay 2158 .handle_event_with_auth(normal.clone(), &member_auth) 2159 .expect("normal"), 2160 &normal, 2161 ); 2162 let delete_event = tangle_v2_delete_event_event(FixtureKey::Admin, "EquivFarm", &normal, 5) 2163 .expect("delete event"); 2164 assert_accepted( 2165 relay 2166 .handle_event_with_auth(delete_event.clone(), &admin_auth) 2167 .expect("delete event"), 2168 &delete_event, 2169 ); 2170 let demote = tangle_v2_put_user_event_with_roles( 2171 FixtureKey::Admin, 2172 "EquivFarm", 2173 FixtureKey::Member, 2174 6, 2175 &[], 2176 ); 2177 assert_accepted( 2178 relay 2179 .handle_event_with_auth(demote.clone(), &admin_auth) 2180 .expect("demote"), 2181 &demote, 2182 ); 2183 let remove = 2184 tangle_v2_remove_user_event(FixtureKey::Admin, "EquivFarm", FixtureKey::Member, 7) 2185 .expect("remove"); 2186 assert_accepted( 2187 relay 2188 .handle_event_with_auth(remove.clone(), &admin_auth) 2189 .expect("remove"), 2190 &remove, 2191 ); 2192 let delete_group = 2193 tangle_v2_delete_group_event(FixtureKey::Owner, "EquivFarm", 8).expect("delete group"); 2194 assert_accepted( 2195 relay 2196 .handle_event_with_auth(delete_group.clone(), &owner_auth) 2197 .expect("delete group"), 2198 &delete_group, 2199 ); 2200 live_projection = relay.group_projection().expect("projection").clone(); 2201 metadata_before = stored_event_ids_for_kind(&config, KIND_GROUP_METADATA); 2202 admins_before = stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS); 2203 members_before = stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS); 2204 relay.shutdown().expect("shutdown"); 2205 } 2206 2207 delete_group_extra_records(&config); 2208 2209 let relay = BaseRelay::open_with_groups( 2210 &config, 2211 relay_limits(16), 2212 &group_config(), 2213 PocketQueryConfig::default(), 2214 ) 2215 .expect("reopen"); 2216 let recovered_projection = relay.group_projection().expect("projection"); 2217 assert_projection_without_checkpoint_eq(&live_projection, &recovered_projection); 2218 assert_eq!( 2219 stored_event_ids_for_kind(&config, KIND_GROUP_METADATA), 2220 metadata_before 2221 ); 2222 assert_eq!( 2223 stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS), 2224 admins_before 2225 ); 2226 assert_eq!( 2227 stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS), 2228 members_before 2229 ); 2230 } 2231 2232 #[test] 2233 fn pending_and_retryable_group_outbox_records_materialize_on_restart() { 2234 let config = test_store_config("outbox-retryable-restart"); 2235 let owner_auth = authenticated(FixtureKey::Owner); 2236 { 2237 let mut relay = BaseRelay::open_with_groups( 2238 &config, 2239 relay_limits(8), 2240 &group_config(), 2241 PocketQueryConfig::default(), 2242 ) 2243 .expect("relay"); 2244 accept_group_create(&mut relay, "OutboxFarm", &[], 1, &owner_auth); 2245 relay.shutdown().expect("shutdown"); 2246 } 2247 regress_outbox_records_to_retryable(&config); 2248 assert_eq!(outbox_status_counts(&config).pending, 1); 2249 assert_eq!(outbox_status_counts(&config).retryable, 1); 2250 2251 let relay = BaseRelay::open_with_groups( 2252 &config, 2253 relay_limits(8), 2254 &group_config(), 2255 PocketQueryConfig::default(), 2256 ) 2257 .expect("reopen"); 2258 assert_eq!( 2259 relay 2260 .readiness_state() 2261 .response() 2262 .checks 2263 .group_outbox_replay, 2264 "ready" 2265 ); 2266 assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1); 2267 assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1); 2268 let counts = outbox_status_counts(&config); 2269 assert_eq!(counts.pending, 0); 2270 assert_eq!(counts.retryable, 0); 2271 assert!(counts.stored >= 2); 2272 } 2273 2274 #[test] 2275 fn max_outbox_replay_batch_one_drains_all_pending_generated_records() { 2276 let config = test_store_config("outbox-batch-one"); 2277 let owner_auth = authenticated(FixtureKey::Owner); 2278 let mut relay = BaseRelay::open_with_groups( 2279 &config, 2280 relay_limits(8), 2281 &group_config_with_outbox_batch(1), 2282 PocketQueryConfig::default(), 2283 ) 2284 .expect("relay"); 2285 2286 accept_group_create(&mut relay, "BatchFarm", &[], 1, &owner_auth); 2287 2288 assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1); 2289 assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1); 2290 let counts = outbox_status_counts(&config); 2291 assert_eq!(counts.pending, 0); 2292 assert_eq!(counts.retryable, 0); 2293 assert_eq!(counts.stored, 2); 2294 } 2295 2296 #[test] 2297 fn already_stored_generated_events_mark_outbox_stored_without_duplication_on_restart() { 2298 let config = test_store_config("outbox-generated-already-stored"); 2299 let owner_auth = authenticated(FixtureKey::Owner); 2300 { 2301 let mut relay = BaseRelay::open_with_groups( 2302 &config, 2303 relay_limits(8), 2304 &group_config(), 2305 PocketQueryConfig::default(), 2306 ) 2307 .expect("relay"); 2308 accept_group_create(&mut relay, "StoredGeneratedFarm", &[], 1, &owner_auth); 2309 relay.shutdown().expect("shutdown"); 2310 } 2311 let metadata_before = stored_event_ids_for_kind(&config, KIND_GROUP_METADATA); 2312 let admins_before = stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS); 2313 assert_eq!(metadata_before.len(), 1); 2314 assert_eq!(admins_before.len(), 1); 2315 2316 regress_outbox_records_to_pending(&config); 2317 assert_eq!(outbox_status_counts(&config).pending, 2); 2318 2319 let relay = BaseRelay::open_with_groups( 2320 &config, 2321 relay_limits(8), 2322 &group_config(), 2323 PocketQueryConfig::default(), 2324 ) 2325 .expect("reopen"); 2326 assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1); 2327 assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1); 2328 assert_eq!( 2329 stored_event_ids_for_kind(&config, KIND_GROUP_METADATA), 2330 metadata_before 2331 ); 2332 assert_eq!( 2333 stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS), 2334 admins_before 2335 ); 2336 let counts = outbox_status_counts(&config); 2337 assert_eq!(counts.pending, 0); 2338 assert_eq!(counts.stored, 2); 2339 } 2340 2341 #[test] 2342 fn crash_point_recovery_states_match_live_projection_and_generated_events() { 2343 let live_config = test_store_config("crash-equivalence-live"); 2344 let source_only_config = test_store_config("crash-equivalence-source-only"); 2345 let pending_outbox_config = test_store_config("crash-equivalence-pending-outbox"); 2346 let events = recovery_equivalence_events(); 2347 let expected = { 2348 let mut relay = BaseRelay::open_with_groups( 2349 &live_config, 2350 relay_limits(8), 2351 &group_config(), 2352 PocketQueryConfig::default(), 2353 ) 2354 .expect("live"); 2355 let owner_auth = authenticated(FixtureKey::Owner); 2356 let admin_auth = authenticated(FixtureKey::Admin); 2357 assert_accepted( 2358 relay 2359 .handle_event_with_auth(events[0].clone(), &owner_auth) 2360 .expect("create"), 2361 &events[0], 2362 ); 2363 for event in events.iter().skip(1) { 2364 assert_accepted( 2365 relay 2366 .handle_event_with_auth(event.clone(), &admin_auth) 2367 .expect("event"), 2368 event, 2369 ); 2370 } 2371 recovery_summary(&mut relay, "CrashFarm") 2372 }; 2373 2374 store_source_events(&source_only_config, &events); 2375 let mut source_only = BaseRelay::open_with_groups( 2376 &source_only_config, 2377 relay_limits(8), 2378 &group_config(), 2379 PocketQueryConfig::default(), 2380 ) 2381 .expect("source only"); 2382 assert_eq!(recovery_summary(&mut source_only, "CrashFarm"), expected); 2383 assert_eq!(outbox_status_counts(&source_only_config).stored, 5); 2384 2385 let offsets = store_source_events(&pending_outbox_config, &events); 2386 seed_pending_create_outbox_records(&pending_outbox_config, &events[0], offsets[0]); 2387 let mut pending_outbox = BaseRelay::open_with_groups( 2388 &pending_outbox_config, 2389 relay_limits(8), 2390 &group_config(), 2391 PocketQueryConfig::default(), 2392 ) 2393 .expect("pending outbox"); 2394 assert_eq!(recovery_summary(&mut pending_outbox, "CrashFarm"), expected); 2395 let counts = outbox_status_counts(&pending_outbox_config); 2396 assert_eq!(counts.pending, 0); 2397 assert_eq!(counts.retryable, 0); 2398 assert_eq!(counts.stored, 5); 2399 } 2400 2401 #[test] 2402 fn same_timestamp_conflicts_are_deterministic_across_ingest_order() { 2403 let first = tangle_v2_group_metadata_event(FixtureKey::Owner, "ClockFarm", "Alpha", 100, &[]) 2404 .expect("first"); 2405 let second = tangle_v2_group_metadata_event(FixtureKey::Owner, "ClockFarm", "Beta", 100, &[]) 2406 .expect("second"); 2407 let expected = if first.id() > second.id() { 2408 "Alpha" 2409 } else { 2410 "Beta" 2411 }; 2412 2413 assert_eq!( 2414 final_group_name_for_order("conflict-a", [&first, &second]), 2415 expected 2416 ); 2417 assert_eq!( 2418 final_group_name_for_order("conflict-b", [&second, &first]), 2419 expected 2420 ); 2421 } 2422 2423 #[test] 2424 fn malformed_input_fuzz_smoke_rejects_without_panic() { 2425 for raw in [ 2426 "", 2427 "[]", 2428 "[\"EVENT\"]", 2429 "[\"REQ\",\"sub\",{\"#h\":[1]}]", 2430 "[\"AUTH\",{}]", 2431 "[\"COUNT\",\"sub\",{\"kinds\":[4294967296]}]", 2432 ] { 2433 panic::catch_unwind(|| { 2434 let _ = parse_client_message(raw); 2435 }) 2436 .expect("client parser must not panic"); 2437 } 2438 2439 for raw in [ 2440 "{}", 2441 "{\"id\":\"bad\"}", 2442 "{\"id\":\"0000000000000000000000000000000000000000000000000000000000000000\",\"pubkey\":\"bad\",\"created_at\":0,\"kind\":1,\"tags\":[],\"content\":\"\",\"sig\":\"bad\"}", 2443 ] { 2444 panic::catch_unwind(|| { 2445 if let Ok(raw) = RawEventJson::new(raw) { 2446 let _ = parse_event_json(&raw); 2447 } 2448 }) 2449 .expect("event parser must not panic"); 2450 } 2451 2452 for value in [ 2453 serde_json::json!({"#h":[1]}), 2454 serde_json::json!({"ids":[1]}), 2455 serde_json::json!({"authors":[false]}), 2456 serde_json::json!({"kinds":["bad"]}), 2457 serde_json::json!({"limit":-1}), 2458 ] { 2459 panic::catch_unwind(|| { 2460 let _ = filter_from_value(&value); 2461 }) 2462 .expect("filter parser must not panic"); 2463 } 2464 2465 for values in [vec![], vec!["".to_owned()], vec!["h".to_owned()]] { 2466 panic::catch_unwind(|| { 2467 let _ = Tag::new(values); 2468 }) 2469 .expect("tag parser must not panic"); 2470 } 2471 } 2472 2473 fn accept_group_create( 2474 relay: &mut BaseRelay, 2475 group_id: &str, 2476 flags: &[&str], 2477 created_at: u64, 2478 auth: &BaseAuthState, 2479 ) { 2480 let event = pocket_protocol_group_create_event(FixtureKey::Owner, group_id, created_at, flags); 2481 assert_accepted_pocket( 2482 relay 2483 .handle_event_with_auth(event.clone(), auth) 2484 .expect("create"), 2485 &event, 2486 ); 2487 } 2488 2489 #[derive(Debug, Clone, PartialEq, Eq)] 2490 struct RecoverySummary { 2491 group_name: Option<String>, 2492 member_status: Option<MemberStatus>, 2493 member_roles: Vec<String>, 2494 metadata_event_ids: Vec<String>, 2495 admin_event_ids: Vec<String>, 2496 member_event_ids: Vec<String>, 2497 latest_admin_pubkeys: Vec<String>, 2498 } 2499 2500 fn recovery_equivalence_events() -> Vec<Event> { 2501 vec![ 2502 tangle_v2_group_create_event(FixtureKey::Owner, "CrashFarm", 1, &[]).expect("create"), 2503 tangle_v2_put_user_event_with_roles( 2504 FixtureKey::Admin, 2505 "CrashFarm", 2506 FixtureKey::Member, 2507 2, 2508 &[PERMANENT_RELAY_OVERRIDE_ROLE], 2509 ), 2510 tangle_v2_group_metadata_event(FixtureKey::Admin, "CrashFarm", "Crash Market", 3, &[]) 2511 .expect("metadata"), 2512 ] 2513 } 2514 2515 fn recovery_summary(relay: &mut BaseRelay, group_id: &str) -> RecoverySummary { 2516 let group_id_model = group(group_id); 2517 let (group_name, member_status, member_roles) = { 2518 let projection = relay.group_projection().expect("projection"); 2519 let group_state = projection.group(&group_id_model).expect("group"); 2520 let member = projection.member(&group_id_model, &FixtureKey::Member.public_key()); 2521 let mut roles = member 2522 .map(|member| { 2523 member 2524 .roles() 2525 .iter() 2526 .map(|role| role.as_str().to_owned()) 2527 .collect::<Vec<_>>() 2528 }) 2529 .unwrap_or_default(); 2530 roles.sort(); 2531 ( 2532 group_state.metadata().name().map(str::to_owned), 2533 member.map(|member| member.status()), 2534 roles, 2535 ) 2536 }; 2537 RecoverySummary { 2538 group_name, 2539 member_status, 2540 member_roles, 2541 metadata_event_ids: event_ids_for_group_kind(relay, group_id, KIND_GROUP_METADATA), 2542 admin_event_ids: event_ids_for_group_kind(relay, group_id, KIND_GROUP_ADMINS), 2543 member_event_ids: event_ids_for_group_kind(relay, group_id, KIND_GROUP_MEMBERS), 2544 latest_admin_pubkeys: latest_admin_snapshot_pubkeys(relay, group_id), 2545 } 2546 } 2547 2548 fn assert_projection_without_checkpoint_eq(left: &GroupProjection, right: &GroupProjection) { 2549 assert_eq!(left.groups(), right.groups()); 2550 assert_eq!(left.members(), right.members()); 2551 assert_eq!(left.roles(), right.roles()); 2552 assert_eq!(left.tombstones(), right.tombstones()); 2553 assert_eq!(left.event_deletions(), right.event_deletions()); 2554 } 2555 2556 fn event_ids_for_group_kind(relay: &mut BaseRelay, group_id: &str, kind: u32) -> Vec<String> { 2557 let mut ids = query_events( 2558 relay, 2559 &format!("summary-{group_id}-{kind}"), 2560 vec![filter_group_tag(kind, "d", group_id)], 2561 ) 2562 .into_iter() 2563 .map(|event| event.id().as_str().to_owned()) 2564 .collect::<Vec<_>>(); 2565 ids.sort(); 2566 ids 2567 } 2568 2569 fn store_source_events(config: &PocketStoreConfig, events: &[Event]) -> Vec<StoreOffset> { 2570 let store = PocketStoreHandle::open(config).expect("store"); 2571 let mut offsets = Vec::new(); 2572 for event in events { 2573 let pocket = pocket_event_for_test(event); 2574 offsets.push(StoreOffset::new(store.store_event(&pocket).expect("store"))); 2575 } 2576 store.sync().expect("sync"); 2577 offsets 2578 } 2579 2580 fn seed_pending_create_outbox_records( 2581 config: &PocketStoreConfig, 2582 create: &Event, 2583 create_offset: StoreOffset, 2584 ) { 2585 let store = PocketStoreHandle::open(config).expect("store"); 2586 let group_id = group("CrashFarm"); 2587 let mut projection = GroupProjection::new(); 2588 let create_pocket = pocket_event_for_test(create); 2589 projection 2590 .apply_canonical_event(&create_pocket, create_offset, GroupLimitsConfig::default()) 2591 .expect("projection"); 2592 let authority = GroupAuthority::new( 2593 [FixtureKey::Owner.public_key()], 2594 [FixtureKey::Admin.public_key()], 2595 ); 2596 let group_state = projection.group(&group_id).expect("group"); 2597 let records = [ 2598 GroupOutboxRecord::pending( 2599 GroupOutboxKey::new( 2600 create.id().clone(), 2601 GroupOutboxEffect::MetadataSnapshot, 2602 group_id.clone(), 2603 None, 2604 ), 2605 GroupGeneratedEventBuilder::metadata_snapshot_payload( 2606 group_state, 2607 create.unsigned().created_at(), 2608 ) 2609 .expect("metadata payload"), 2610 ), 2611 GroupOutboxRecord::pending( 2612 GroupOutboxKey::new( 2613 create.id().clone(), 2614 GroupOutboxEffect::AdminListSnapshot, 2615 group_id.clone(), 2616 None, 2617 ), 2618 GroupGeneratedEventBuilder::admin_list_snapshot_payload( 2619 &group_id, 2620 &projection, 2621 &authority, 2622 create.unsigned().created_at(), 2623 ) 2624 .expect("admin payload"), 2625 ), 2626 ]; 2627 for record in records { 2628 store 2629 .put_extra_record( 2630 TANGLE_GROUP_OUTBOX_TABLE, 2631 &record.key().storage_key(), 2632 &record.to_json_bytes().expect("record bytes"), 2633 ) 2634 .expect("outbox"); 2635 } 2636 store.sync().expect("sync"); 2637 } 2638 2639 fn seed_pending_recovery_outbox_records( 2640 config: &PocketStoreConfig, 2641 events: &[Event], 2642 offsets: &[StoreOffset], 2643 ) -> Vec<GroupOutboxRecord> { 2644 let records = pending_recovery_outbox_records(events, offsets); 2645 let store = PocketStoreHandle::open(config).expect("store"); 2646 for record in &records { 2647 store 2648 .put_extra_record( 2649 TANGLE_GROUP_OUTBOX_TABLE, 2650 &record.key().storage_key(), 2651 &record.to_json_bytes().expect("record bytes"), 2652 ) 2653 .expect("outbox"); 2654 } 2655 store.sync().expect("sync"); 2656 records 2657 } 2658 2659 fn pending_recovery_outbox_records( 2660 events: &[Event], 2661 offsets: &[StoreOffset], 2662 ) -> Vec<GroupOutboxRecord> { 2663 assert_eq!(events.len(), 3); 2664 assert_eq!(offsets.len(), events.len()); 2665 let group_id = group("CrashFarm"); 2666 let limits = GroupLimitsConfig::default(); 2667 let authority = GroupAuthority::new( 2668 [FixtureKey::Owner.public_key()], 2669 [FixtureKey::Admin.public_key()], 2670 ); 2671 let mut projection = GroupProjection::new(); 2672 let pocket_events = events.iter().map(pocket_event_for_test).collect::<Vec<_>>(); 2673 let mut records = Vec::new(); 2674 2675 projection 2676 .apply_canonical_event(&pocket_events[0], offsets[0], limits) 2677 .expect("create projection"); 2678 let create_group = projection.group(&group_id).expect("create group"); 2679 records.push(GroupOutboxRecord::pending( 2680 GroupOutboxKey::new( 2681 events[0].id().clone(), 2682 GroupOutboxEffect::MetadataSnapshot, 2683 group_id.clone(), 2684 None, 2685 ), 2686 GroupGeneratedEventBuilder::metadata_snapshot_payload( 2687 create_group, 2688 events[0].unsigned().created_at(), 2689 ) 2690 .expect("create metadata payload"), 2691 )); 2692 records.push(GroupOutboxRecord::pending( 2693 GroupOutboxKey::new( 2694 events[0].id().clone(), 2695 GroupOutboxEffect::AdminListSnapshot, 2696 group_id.clone(), 2697 None, 2698 ), 2699 GroupGeneratedEventBuilder::admin_list_snapshot_payload( 2700 &group_id, 2701 &projection, 2702 &authority, 2703 events[0].unsigned().created_at(), 2704 ) 2705 .expect("create admin payload"), 2706 )); 2707 2708 projection 2709 .apply_canonical_event(&pocket_events[1], offsets[1], limits) 2710 .expect("put projection"); 2711 records.push(GroupOutboxRecord::pending( 2712 GroupOutboxKey::new( 2713 events[1].id().clone(), 2714 GroupOutboxEffect::MemberListSnapshot, 2715 group_id.clone(), 2716 None, 2717 ), 2718 GroupGeneratedEventBuilder::member_list_snapshot_payload( 2719 &group_id, 2720 &projection, 2721 events[1].unsigned().created_at(), 2722 limits.max_member_list_pubkeys(), 2723 ) 2724 .expect("put member payload") 2725 .expect("put member snapshot"), 2726 )); 2727 records.push(GroupOutboxRecord::pending( 2728 GroupOutboxKey::new( 2729 events[1].id().clone(), 2730 GroupOutboxEffect::AdminListSnapshot, 2731 group_id.clone(), 2732 None, 2733 ), 2734 GroupGeneratedEventBuilder::admin_list_snapshot_payload( 2735 &group_id, 2736 &projection, 2737 &authority, 2738 events[1].unsigned().created_at(), 2739 ) 2740 .expect("put admin payload"), 2741 )); 2742 2743 projection 2744 .apply_canonical_event(&pocket_events[2], offsets[2], limits) 2745 .expect("metadata projection"); 2746 let metadata_group = projection.group(&group_id).expect("metadata group"); 2747 records.push(GroupOutboxRecord::pending( 2748 GroupOutboxKey::new( 2749 events[2].id().clone(), 2750 GroupOutboxEffect::MetadataSnapshot, 2751 group_id.clone(), 2752 None, 2753 ), 2754 GroupGeneratedEventBuilder::metadata_snapshot_payload( 2755 metadata_group, 2756 events[2].unsigned().created_at(), 2757 ) 2758 .expect("metadata payload"), 2759 )); 2760 2761 records 2762 } 2763 2764 fn store_generated_events_for_pending_outbox_records( 2765 config: &PocketStoreConfig, 2766 records: &[GroupOutboxRecord], 2767 ) { 2768 let store = PocketStoreHandle::open(config).expect("store"); 2769 let signer = RelaySigner::from_secret_hex(TANGLE_V2_RELAY_SECRET_HEX).expect("relay signer"); 2770 let builder = GroupGeneratedEventBuilder::new(signer); 2771 for record in records { 2772 let event = builder 2773 .sign_payload_pocket(record.payload()) 2774 .expect("generated event"); 2775 store.store_event(&event).expect("store generated"); 2776 } 2777 store.sync().expect("sync"); 2778 } 2779 2780 fn tangle_v2_put_user_event_with_roles( 2781 actor: FixtureKey, 2782 group_id: &str, 2783 target: FixtureKey, 2784 created_at: u64, 2785 roles: &[&str], 2786 ) -> Event { 2787 let mut tags = vec![ 2788 tangle_v2_group_tag(group_id).expect("group tag"), 2789 tangle_v2_pubkey_tag(target).expect("pubkey tag"), 2790 ]; 2791 for role in roles { 2792 tags.push(tangle_v2_tag("role", &[*role]).expect("role tag")); 2793 } 2794 tangle_v2_event(actor, created_at, KIND_GROUP_PUT_USER.into(), tags, "").expect("put user") 2795 } 2796 2797 fn latest_admin_snapshot_pubkeys(relay: &mut BaseRelay, group_id: &str) -> Vec<String> { 2798 let mut events = query_events( 2799 relay, 2800 "admin-snapshots", 2801 vec![filter_group_tag(KIND_GROUP_ADMINS, "d", group_id)], 2802 ); 2803 events.sort_by_key(|event| (event.unsigned().created_at(), event.id().clone())); 2804 let latest = events.last().expect("admin snapshot"); 2805 let mut pubkeys = latest 2806 .unsigned() 2807 .tags() 2808 .iter() 2809 .filter_map(|tag| match tag.values() { 2810 [name, pubkey, ..] if name == "p" => Some(pubkey.clone()), 2811 _ => None, 2812 }) 2813 .collect::<Vec<_>>(); 2814 pubkeys.sort(); 2815 pubkeys 2816 } 2817 2818 fn query_events(relay: &mut BaseRelay, subscription_id: &str, filters: Vec<Filter>) -> Vec<Event> { 2819 let subscription_id = subscription(subscription_id); 2820 let messages = relay 2821 .handle_req(subscription_id.clone(), filters) 2822 .expect("query"); 2823 let mut events = Vec::new(); 2824 for message in messages { 2825 match message { 2826 RelayMessage::Event { 2827 subscription_id: actual, 2828 event, 2829 } => { 2830 assert_eq!(actual, subscription_id); 2831 events.push(event); 2832 } 2833 RelayMessage::Eose(actual) => assert_eq!(actual, subscription_id), 2834 value => panic!("expected event or EOSE, got {value:?}"), 2835 } 2836 } 2837 events 2838 } 2839 2840 fn assert_auth_required_redacted_query( 2841 relay: &mut BaseRelay, 2842 subscription_id: &str, 2843 filters: Vec<Filter>, 2844 ) { 2845 let subscription_id = subscription(subscription_id); 2846 assert_eq!( 2847 relay 2848 .handle_req(subscription_id.clone(), filters) 2849 .expect("query"), 2850 vec![RelayMessage::Closed { 2851 subscription_id, 2852 message: "auth-required: authentication required to read group events".to_owned() 2853 }] 2854 ); 2855 } 2856 2857 fn sorted_strings(values: impl IntoIterator<Item = String>) -> Vec<String> { 2858 let mut values = values.into_iter().collect::<Vec<_>>(); 2859 values.sort(); 2860 values 2861 } 2862 2863 fn final_group_name_for_order(name: &str, edits: [&Event; 2]) -> String { 2864 let config = test_store_config(name); 2865 let mut relay = BaseRelay::open_with_groups( 2866 &config, 2867 relay_limits(8), 2868 &group_config(), 2869 PocketQueryConfig::default(), 2870 ) 2871 .expect("relay"); 2872 let auth = authenticated(FixtureKey::Owner); 2873 accept_group_create(&mut relay, "ClockFarm", &[], 1, &auth); 2874 for edit in edits { 2875 assert_accepted( 2876 relay 2877 .handle_event_with_auth(edit.clone(), &auth) 2878 .expect("edit"), 2879 edit, 2880 ); 2881 } 2882 relay 2883 .group_projection() 2884 .expect("projection") 2885 .group(&group("ClockFarm")) 2886 .expect("group") 2887 .metadata() 2888 .name() 2889 .expect("name") 2890 .to_owned() 2891 } 2892 2893 fn test_store_config(name: &str) -> PocketStoreConfig { 2894 let root = temp_root(name); 2895 let _ = fs::remove_dir_all(&root); 2896 PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown).expect("config") 2897 } 2898 2899 fn relay_limits(max_pending_events: usize) -> BaseRelayLimits { 2900 BaseRelayLimits::new(BaseRelayLimitSettings { 2901 max_pending_events, 2902 max_subscription_id_length: 64, 2903 max_subscriptions: 64, 2904 max_filters_per_request: 10, 2905 max_tag_values_per_filter: 100, 2906 max_query_complexity: 610, 2907 max_event_tags: 200, 2908 max_content_length: 65_536, 2909 max_limit: 500, 2910 default_limit: 100, 2911 }) 2912 .expect("limits") 2913 } 2914 2915 fn delete_group_extra_records(config: &PocketStoreConfig) { 2916 let store = PocketStoreHandle::open(config).expect("store"); 2917 for table in [ 2918 TANGLE_GROUP_PROJECTION_TABLE, 2919 TANGLE_GROUP_OUTBOX_TABLE, 2920 TANGLE_GROUP_CHECKPOINT_TABLE, 2921 ] { 2922 for (key, _) in store.scan_extra_records(table).expect("scan") { 2923 store.delete_extra_record(table, &key).expect("delete"); 2924 } 2925 } 2926 store.sync().expect("sync"); 2927 } 2928 2929 fn group_extra_table_validation( 2930 config: &PocketStoreConfig, 2931 ) -> tangle_runtime::groups::GroupExtraTableValidation { 2932 let store = PocketStoreHandle::open(config).expect("store"); 2933 validate_group_extra_tables(&store).expect("validation") 2934 } 2935 2936 fn stored_event_offset(config: &PocketStoreConfig, event: &Event) -> u64 { 2937 let store = PocketStoreHandle::open(config).expect("store"); 2938 store 2939 .scan_events() 2940 .expect("events") 2941 .into_iter() 2942 .find(|stored| stored.event().id().as_hex_string() == event.id().as_str()) 2943 .expect("stored event") 2944 .store_offset() 2945 } 2946 2947 fn regress_member_projection_to_checkpoint( 2948 config: &PocketStoreConfig, 2949 checkpoint_offset: u64, 2950 group_id: &str, 2951 ) { 2952 let store = PocketStoreHandle::open(config).expect("store"); 2953 let group_id = GroupId::new(group_id).expect("group"); 2954 let checkpoint = ProjectionCheckpoint::current( 2955 Some(StoreOffset::new(checkpoint_offset)), 2956 UnixTimestamp::new(1_714_999_999), 2957 ); 2958 store 2959 .put_extra_record( 2960 TANGLE_GROUP_CHECKPOINT_TABLE, 2961 &projection_checkpoint_key(), 2962 &checkpoint.to_json_bytes().expect("checkpoint"), 2963 ) 2964 .expect("checkpoint"); 2965 store 2966 .delete_extra_record( 2967 TANGLE_GROUP_PROJECTION_TABLE, 2968 &member_current_key(&group_id, &FixtureKey::Member.public_key()), 2969 ) 2970 .expect("delete member"); 2971 store.sync().expect("sync"); 2972 } 2973 2974 fn regress_outbox_records_to_retryable(config: &PocketStoreConfig) { 2975 let store = PocketStoreHandle::open(config).expect("store"); 2976 let records = store 2977 .scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE) 2978 .expect("outbox records"); 2979 assert!(records.len() >= 2); 2980 let mut first = GroupOutboxRecord::from_json_bytes(&records[0].1).expect("first outbox record"); 2981 let second = GroupOutboxRecord::from_json_bytes(&records[1].1).expect("second outbox record"); 2982 first.mark_failed(true, "retry on restart"); 2983 let pending = GroupOutboxRecord::pending(second.key().clone(), second.payload().clone()); 2984 store 2985 .put_extra_record( 2986 TANGLE_GROUP_OUTBOX_TABLE, 2987 &first.key().storage_key(), 2988 &first.to_json_bytes().expect("failed bytes"), 2989 ) 2990 .expect("put failed"); 2991 store 2992 .put_extra_record( 2993 TANGLE_GROUP_OUTBOX_TABLE, 2994 &pending.key().storage_key(), 2995 &pending.to_json_bytes().expect("pending bytes"), 2996 ) 2997 .expect("put pending"); 2998 store.sync().expect("sync"); 2999 } 3000 3001 fn regress_outbox_records_to_pending(config: &PocketStoreConfig) { 3002 let store = PocketStoreHandle::open(config).expect("store"); 3003 for (_, value) in store 3004 .scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE) 3005 .expect("outbox records") 3006 { 3007 let record = GroupOutboxRecord::from_json_bytes(&value).expect("outbox record"); 3008 let pending = GroupOutboxRecord::pending(record.key().clone(), record.payload().clone()); 3009 store 3010 .put_extra_record( 3011 TANGLE_GROUP_OUTBOX_TABLE, 3012 &pending.key().storage_key(), 3013 &pending.to_json_bytes().expect("pending bytes"), 3014 ) 3015 .expect("put pending"); 3016 } 3017 store.sync().expect("sync"); 3018 } 3019 3020 fn stored_event_ids_for_kind(config: &PocketStoreConfig, kind: u32) -> Vec<String> { 3021 let store = PocketStoreHandle::open(config).expect("store"); 3022 let mut ids = store 3023 .scan_events() 3024 .expect("events") 3025 .into_iter() 3026 .filter(|stored| u32::from(stored.event().kind().as_u16()) == kind) 3027 .map(|stored| stored.event().id().as_hex_string()) 3028 .collect::<Vec<_>>(); 3029 ids.sort(); 3030 ids 3031 } 3032 3033 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] 3034 struct OutboxStatusCounts { 3035 pending: usize, 3036 retryable: usize, 3037 stored: usize, 3038 } 3039 3040 fn outbox_status_counts(config: &PocketStoreConfig) -> OutboxStatusCounts { 3041 let store = PocketStoreHandle::open(config).expect("store"); 3042 let mut counts = OutboxStatusCounts::default(); 3043 for (_, value) in store 3044 .scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE) 3045 .expect("outbox records") 3046 { 3047 match GroupOutboxRecord::from_json_bytes(&value) 3048 .expect("outbox record") 3049 .status() 3050 { 3051 GroupOutboxStatus::Pending => counts.pending += 1, 3052 GroupOutboxStatus::Failed { retryable: true } => counts.retryable += 1, 3053 GroupOutboxStatus::Stored { .. } => counts.stored += 1, 3054 GroupOutboxStatus::Skipped { .. } | GroupOutboxStatus::Failed { retryable: false } => {} 3055 } 3056 } 3057 counts 3058 } 3059 3060 fn temp_root(name: &str) -> PathBuf { 3061 std::env::temp_dir().join(format!("tangle-rcld12-{name}-{}", std::process::id())) 3062 } 3063 3064 fn group_config() -> GroupRuntimeConfig { 3065 tangle_v2_group_config(FixtureKey::Owner, &[FixtureKey::Admin]).expect("groups") 3066 } 3067 3068 fn runtime_config(groups_enabled: bool) -> BaseRelayRuntimeConfig { 3069 let groups = if groups_enabled { 3070 serde_json::json!({ 3071 "enabled": true, 3072 "canonical_relay_url": TANGLE_V2_RELAY_URL, 3073 "relay_secret": TANGLE_V2_RELAY_SECRET_HEX, 3074 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()], 3075 "admin_pubkeys": [FixtureKey::Admin.public_key().as_str()] 3076 }) 3077 } else { 3078 serde_json::json!({"enabled": false}) 3079 }; 3080 parse_base_relay_runtime_config_json( 3081 &serde_json::json!({ 3082 "server": { 3083 "listen_addr": "127.0.0.1:0", 3084 "relay_url": TANGLE_V2_RELAY_URL 3085 }, 3086 "pocket": { 3087 "data_directory": "runtime/pocket", 3088 "sync_policy": "flush_on_shutdown", 3089 "query": { 3090 "allow_scraping": false, 3091 "allow_scrape_if_limited_to": 100, 3092 "allow_scrape_if_max_seconds": 3600 3093 } 3094 }, 3095 "groups": groups, 3096 "auth": { 3097 "challenge_ttl_seconds": 300, 3098 "created_at_skew_seconds": 600 3099 }, 3100 "limits": { 3101 "max_message_length": 1048576, 3102 "max_subid_length": 64, 3103 "max_subscriptions_per_connection": 64, 3104 "max_filters_per_request": 10, 3105 "max_tag_values_per_filter": 100, 3106 "max_query_complexity": 2048, 3107 "max_limit": 500, 3108 "default_limit": 100, 3109 "max_event_tags": 200, 3110 "max_content_length": 65536, 3111 "broadcast_channel_capacity": 4096, 3112 "per_connection_outbound_queue": 256 3113 }, 3114 "rate_limits": { 3115 "auth": { 3116 "per_ip": {"window_seconds": 60, "max_hits": 120}, 3117 "per_pubkey": {"window_seconds": 60, "max_hits": 30}, 3118 "failures": {"window_seconds": 300, "max_hits": 5}, 3119 "failures_per_ip": {"window_seconds": 300, "max_hits": 20} 3120 }, 3121 "event": { 3122 "per_ip": {"window_seconds": 60, "max_hits": 600}, 3123 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 3124 "per_kind": {"window_seconds": 60, "max_hits": 1000} 3125 }, 3126 "group": { 3127 "write_per_ip": {"window_seconds": 60, "max_hits": 300}, 3128 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, 3129 "write_per_group": {"window_seconds": 60, "max_hits": 90}, 3130 "write_per_kind": {"window_seconds": 60, "max_hits": 300}, 3131 "join_flow": {"window_seconds": 300, "max_hits": 10}, 3132 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} 3133 }, 3134 "req": { 3135 "per_ip": {"window_seconds": 60, "max_hits": 600}, 3136 "per_connection": {"window_seconds": 60, "max_hits": 120}, 3137 "per_pubkey": {"window_seconds": 60, "max_hits": 240}, 3138 "per_group": {"window_seconds": 60, "max_hits": 240}, 3139 "per_kind": {"window_seconds": 60, "max_hits": 500}, 3140 "broad": {"window_seconds": 60, "max_hits": 30} 3141 }, 3142 "count": { 3143 "per_ip": {"window_seconds": 60, "max_hits": 300}, 3144 "per_connection": {"window_seconds": 60, "max_hits": 60}, 3145 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 3146 "per_group": {"window_seconds": 60, "max_hits": 120}, 3147 "per_kind": {"window_seconds": 60, "max_hits": 240}, 3148 "broad": {"window_seconds": 60, "max_hits": 20} 3149 } 3150 } 3151 }) 3152 .to_string(), 3153 ) 3154 .expect("runtime config") 3155 } 3156 3157 fn group_config_with_public_join() -> GroupRuntimeConfig { 3158 parse_group_runtime_config_json(&format!( 3159 r#"{{ 3160 "enabled": true, 3161 "canonical_relay_url": "{TANGLE_V2_RELAY_URL}", 3162 "relay_secret": "{TANGLE_V2_RELAY_SECRET_HEX}", 3163 "owner_pubkeys": ["{}"], 3164 "admin_pubkeys": ["{}"], 3165 "policy": {{"public_join": true, "invites_enabled": false}} 3166 }}"#, 3167 FixtureKey::Owner.public_key().as_str(), 3168 FixtureKey::Admin.public_key().as_str() 3169 )) 3170 .expect("groups") 3171 } 3172 3173 fn group_config_with_outbox_batch(batch: u32) -> GroupRuntimeConfig { 3174 parse_group_runtime_config_json(&format!( 3175 r#"{{ 3176 "enabled": true, 3177 "canonical_relay_url": "{TANGLE_V2_RELAY_URL}", 3178 "relay_secret": "{TANGLE_V2_RELAY_SECRET_HEX}", 3179 "owner_pubkeys": ["{}"], 3180 "admin_pubkeys": ["{}"], 3181 "limits": {{"max_outbox_replay_batch": {batch}}} 3182 }}"#, 3183 FixtureKey::Owner.public_key().as_str(), 3184 FixtureKey::Admin.public_key().as_str() 3185 )) 3186 .expect("groups") 3187 } 3188 3189 fn authenticated(key: FixtureKey) -> BaseAuthState { 3190 let auth = BaseAuthState::new(TANGLE_V2_RELAY_URL, 60, 600).expect("auth"); 3191 let mut auth = issue_challenge(auth, "challenge-a", 100); 3192 let event = pocket_protocol_auth_event(key, "challenge-a", 120); 3193 authenticate_pocket_event_for_test(&mut auth, &event, UnixTimestamp::new(120)) 3194 .expect("authenticate"); 3195 auth 3196 } 3197 3198 fn issue_challenge(mut auth: BaseAuthState, challenge: &str, created_at: u64) -> BaseAuthState { 3199 auth.issue_challenge(challenge, UnixTimestamp::new(created_at)) 3200 .expect("challenge"); 3201 auth 3202 } 3203 3204 fn assert_accepted(message: RelayMessage, event: &Event) { 3205 assert_eq!( 3206 message, 3207 RelayMessage::Ok { 3208 event_id: event.id().clone(), 3209 accepted: true, 3210 message: String::new() 3211 } 3212 ); 3213 pocket_event_for_test(event) 3214 .verify() 3215 .expect("pocket verify"); 3216 } 3217 3218 fn assert_accepted_pocket(message: RelayMessage, event: &Event) { 3219 assert_eq!( 3220 message, 3221 RelayMessage::Ok { 3222 event_id: event.id().clone(), 3223 accepted: true, 3224 message: String::new() 3225 } 3226 ); 3227 pocket_event_for_test(event) 3228 .verify() 3229 .expect("pocket verify"); 3230 } 3231 3232 fn rejected_message(message: RelayMessage) -> String { 3233 match message { 3234 RelayMessage::Ok { 3235 accepted: false, 3236 message, 3237 .. 3238 } => message, 3239 value => panic!("expected rejected OK, got {value:?}"), 3240 } 3241 } 3242 3243 fn assert_event_query( 3244 messages: Vec<RelayMessage>, 3245 subscription_id: &SubscriptionId, 3246 events: &[&Event], 3247 ) { 3248 assert_eq!(messages.len(), events.len() + 1); 3249 for (message, expected) in messages.iter().zip(events.iter()) { 3250 match message { 3251 RelayMessage::Event { 3252 subscription_id: actual_subscription, 3253 event, 3254 } => { 3255 assert_eq!(actual_subscription, subscription_id); 3256 assert_eq!(event.id(), expected.id()); 3257 } 3258 value => panic!("expected event, got {value:?}"), 3259 } 3260 } 3261 assert_eq!( 3262 messages.last(), 3263 Some(&RelayMessage::Eose(subscription_id.clone())) 3264 ); 3265 } 3266 3267 fn assert_count( 3268 message: Result<RelayMessage, tangle_runtime::errors::BaseRelayError>, 3269 expected: u64, 3270 ) { 3271 let RelayMessage::Count { count, .. } = message.expect("count") else { 3272 panic!("expected count") 3273 }; 3274 assert_eq!(count, expected); 3275 } 3276 3277 fn count_kind(relay: &BaseRelay, kind: u32) -> u64 { 3278 let RelayMessage::Count { count, .. } = relay 3279 .handle_count_protocol(subscription("count-kind"), vec![filter_kind(kind)]) 3280 .expect("count") 3281 else { 3282 panic!("expected count") 3283 }; 3284 count 3285 } 3286 3287 fn filter_kind(kind: u32) -> Filter { 3288 filter_from_value(&serde_json::json!({"kinds":[kind]})).expect("filter") 3289 } 3290 3291 fn filter_group_tag(kind: u32, tag_name: &str, tag_value: &str) -> Filter { 3292 let mut value = serde_json::Map::new(); 3293 value.insert("kinds".to_owned(), serde_json::json!([kind])); 3294 value.insert(format!("#{tag_name}"), serde_json::json!([tag_value])); 3295 filter_from_value(&serde_json::Value::Object(value)).expect("filter") 3296 } 3297 3298 fn group(value: &str) -> GroupId { 3299 GroupId::new(value).expect("group") 3300 } 3301 3302 fn subscription(value: &str) -> SubscriptionId { 3303 SubscriptionId::new(value).expect("subscription") 3304 }