groups.rs (49951B)
1 #![forbid(unsafe_code)] 2 3 use crate::{errors::BaseRelayError, pocket_conversion::pocket_event_id}; 4 use std::{ 5 ops::Deref, 6 str, 7 sync::{Arc, RwLock, RwLockReadGuard}, 8 time::{SystemTime, UNIX_EPOCH}, 9 }; 10 use tangle_crypto::RelaySigner; 11 use tangle_groups::{ 12 CanonicalGroupEvent, GroupAuthContext, GroupAuthority, GroupError, GroupErrorKind, 13 GroupEventClass, GroupEventDeletion, GroupGeneratedEventBuilder, GroupId, GroupLimitsConfig, 14 GroupOutbox, GroupOutboxEffect, GroupOutboxKey, GroupOutboxPayload, GroupOutboxRecord, 15 GroupPolicyConfig, GroupProjection, GroupReadDecision, GroupReadGate, GroupRuntimeConfig, 16 GroupState, GroupTombstone, KIND_GROUP_CREATE_GROUP, KIND_GROUP_DELETE_EVENT, 17 KIND_GROUP_EDIT_METADATA, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, 18 KIND_GROUP_MEMBERS, KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, MemberState, MemberStatus, 19 ProjectedRoleDefinition, ProjectionCheckpoint, RoleName, StoreOffset, event_deletion_key, 20 event_view::GroupEventView, group_current_key, member_current_key, projection_checkpoint_key, 21 rebuild_group_projection, role_current_key, tombstone_key, 22 }; 23 use tangle_protocol::{EventId, PublicKeyHex, UnixTimestamp}; 24 use tangle_store_pocket::{ 25 PocketEvent, PocketEventId, PocketOwnedEvent, PocketStoreHandle, TANGLE_GROUP_CHECKPOINT_TABLE, 26 TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE, 27 }; 28 29 #[derive(Clone)] 30 pub(crate) struct GroupServiceHandle { 31 state: Arc<RwLock<GroupServiceState>>, 32 } 33 34 pub(crate) enum GroupEventWrite { 35 Stored(Vec<StoreOffset>), 36 Duplicate, 37 } 38 39 pub(crate) enum GroupEventWriteError { 40 Rejected(GroupError), 41 Storage(BaseRelayError), 42 } 43 44 struct GeneratedGroupStorageEvent { 45 event: PocketOwnedEvent, 46 } 47 48 impl GeneratedGroupStorageEvent { 49 fn build( 50 builder: &GroupGeneratedEventBuilder, 51 payload: &GroupOutboxPayload, 52 ) -> Result<Self, BaseRelayError> { 53 let event = 
builder.sign_payload_pocket(payload)?; 54 Ok(Self { event }) 55 } 56 57 fn event(&self) -> &PocketEvent { 58 &self.event 59 } 60 61 fn event_id(&self) -> Result<EventId, BaseRelayError> { 62 EventId::new(&self.event().id().as_hex_string()).map_err(BaseRelayError::error) 63 } 64 } 65 66 impl From<BaseRelayError> for GroupEventWriteError { 67 fn from(error: BaseRelayError) -> Self { 68 Self::Storage(error) 69 } 70 } 71 72 pub struct GroupProjectionReadGuard<'a> { 73 state: RwLockReadGuard<'a, GroupServiceState>, 74 } 75 76 impl Deref for GroupProjectionReadGuard<'_> { 77 type Target = GroupProjection; 78 79 fn deref(&self) -> &Self::Target { 80 &self.state.projection 81 } 82 } 83 84 pub(crate) struct GroupServiceState { 85 builder: GroupGeneratedEventBuilder, 86 authority: GroupAuthority, 87 projection: GroupProjection, 88 outbox: GroupOutbox, 89 policy: GroupPolicyConfig, 90 limits: GroupLimitsConfig, 91 member_snapshot_cap: u32, 92 outbox_replay_batch_cap: u32, 93 } 94 95 impl GroupServiceHandle { 96 pub(crate) fn from_config( 97 store: &PocketStoreHandle, 98 config: &GroupRuntimeConfig, 99 ) -> Result<Option<Self>, BaseRelayError> { 100 GroupServiceState::from_config(store, config).map(|state| { 101 state.map(|state| Self { 102 state: Arc::new(RwLock::new(state)), 103 }) 104 }) 105 } 106 107 pub(crate) fn projection(&self) -> GroupProjectionReadGuard<'_> { 108 GroupProjectionReadGuard { 109 state: self 110 .state 111 .read() 112 .expect("group service state lock is not poisoned"), 113 } 114 } 115 116 pub(crate) fn limits(&self) -> GroupLimitsConfig { 117 self.state 118 .read() 119 .expect("group service state lock is not poisoned") 120 .limits() 121 } 122 123 pub(crate) fn outbox_pending_events(&self) -> usize { 124 self.state 125 .read() 126 .expect("group service state lock is not poisoned") 127 .outbox_pending_events() 128 } 129 130 pub(crate) fn event_visible_to_auth( 131 &self, 132 event: &(impl GroupEventView + ?Sized), 133 auth: &GroupAuthContext, 134 ) -> 
Result<bool, GroupError> { 135 self.state 136 .read() 137 .map_err(|_| GroupError::internal("group service state lock is poisoned"))? 138 .event_visible_to_auth(event, auth) 139 } 140 141 pub(crate) fn store_group_pocket_event( 142 &self, 143 store: &PocketStoreHandle, 144 event: &PocketEvent, 145 class: &GroupEventClass, 146 auth: &GroupAuthContext, 147 ) -> Result<GroupEventWrite, GroupEventWriteError> { 148 self.state 149 .write() 150 .map_err(|_| BaseRelayError::error("group service state lock is poisoned"))? 151 .store_group_pocket_event(store, event, class, auth) 152 } 153 } 154 155 impl GroupServiceState { 156 fn from_config( 157 store: &PocketStoreHandle, 158 config: &GroupRuntimeConfig, 159 ) -> Result<Option<Self>, BaseRelayError> { 160 if !config.enabled() { 161 return Ok(None); 162 } 163 let relay_secret = config 164 .relay_secret() 165 .ok_or_else(|| BaseRelayError::invalid("groups.relay_secret is required"))?; 166 let signer = RelaySigner::from_secret_hex(relay_secret.expose_for_signing()) 167 .map_err(BaseRelayError::invalid)?; 168 let storage = load_group_storage(store, config.limits())?; 169 let mut state = Self { 170 builder: GroupGeneratedEventBuilder::new(signer), 171 authority: GroupAuthority::new( 172 config.owner_pubkeys().iter().cloned(), 173 config.admin_pubkeys().iter().cloned(), 174 ), 175 projection: storage.projection, 176 outbox: storage.outbox, 177 policy: config.policy(), 178 limits: config.limits(), 179 member_snapshot_cap: config.limits().max_member_list_pubkeys(), 180 outbox_replay_batch_cap: config.limits().max_outbox_replay_batch(), 181 }; 182 state.derive_missing_outbox_records(store)?; 183 state.materialize_outbox(store)?; 184 Ok(Some(state)) 185 } 186 187 fn limits(&self) -> GroupLimitsConfig { 188 self.limits 189 } 190 191 fn outbox_pending_events(&self) -> usize { 192 self.outbox.replay_plan().records().len() 193 } 194 195 fn check_event( 196 &self, 197 store: &PocketStoreHandle, 198 event: &(impl GroupEventView + ?Sized), 
199 class: &GroupEventClass, 200 auth: &GroupAuthContext, 201 ) -> Result<(), GroupError> { 202 tangle_groups::GroupWritePolicy::new(&self.projection, &self.authority, self.policy) 203 .check_event(event, class, auth) 204 .map(|_| ())?; 205 self.check_runtime_write_constraints(store, event, class) 206 } 207 208 fn check_runtime_write_constraints( 209 &self, 210 store: &PocketStoreHandle, 211 event: &(impl GroupEventView + ?Sized), 212 class: &GroupEventClass, 213 ) -> Result<(), GroupError> { 214 if let GroupEventClass::Moderation { kind, group_id } = class 215 && kind.as_u32() == KIND_GROUP_DELETE_EVENT 216 { 217 self.check_delete_event_target(store, event, group_id)?; 218 } 219 Ok(()) 220 } 221 222 fn check_delete_event_target( 223 &self, 224 store: &PocketStoreHandle, 225 event: &(impl GroupEventView + ?Sized), 226 group_id: &GroupId, 227 ) -> Result<(), GroupError> { 228 let target_id = delete_target_event_id(event)?; 229 let Some(target) = store 230 .event_by_id( 231 pocket_event_id(&target_id) 232 .map_err(|error| GroupError::internal(error.prefixed_message()))?, 233 ) 234 .map_err(|error| GroupError::internal(error.to_string()))? 235 else { 236 return Err(GroupError::invalid( 237 GroupErrorKind::MalformedTargetTag, 238 "delete target event is unavailable", 239 )); 240 }; 241 let target_class = tangle_groups::classify_group_event(&target, self.limits)?; 242 if target_class.group_id() != Some(group_id) { 243 return Err(GroupError::invalid( 244 GroupErrorKind::MalformedTargetTag, 245 "delete target event is not in group", 246 )); 247 } 248 Ok(()) 249 } 250 251 fn store_group_pocket_event( 252 &mut self, 253 store: &PocketStoreHandle, 254 event: &PocketEvent, 255 class: &GroupEventClass, 256 auth: &GroupAuthContext, 257 ) -> Result<GroupEventWrite, GroupEventWriteError> { 258 self.check_event(store, event, class, auth) 259 .map_err(GroupEventWriteError::Rejected)?; 260 if store 261 .event_by_id(event.id()) 262 .map_err(BaseRelayError::from)? 
263 .is_some() 264 { 265 return Ok(GroupEventWrite::Duplicate); 266 } 267 let store_offset = 268 StoreOffset::new(store.store_event(event).map_err(BaseRelayError::from)?); 269 let mut stored_offsets = vec![store_offset]; 270 stored_offsets.extend(self.after_source_event_stored(store, event, class, store_offset)?); 271 Ok(GroupEventWrite::Stored(stored_offsets)) 272 } 273 274 fn event_visible_to_auth( 275 &self, 276 event: &(impl GroupEventView + ?Sized), 277 auth: &GroupAuthContext, 278 ) -> Result<bool, GroupError> { 279 let gate = GroupReadGate::new(&self.projection, &self.authority); 280 if auth.authenticated_pubkeys().is_empty() { 281 return gate 282 .screen_event(event, None, self.limits) 283 .map(|decision| decision == GroupReadDecision::Visible); 284 } 285 for pubkey in auth.authenticated_pubkeys() { 286 if gate.screen_event(event, Some(pubkey), self.limits)? == GroupReadDecision::Visible { 287 return Ok(true); 288 } 289 } 290 Ok(false) 291 } 292 293 fn after_source_event_stored( 294 &mut self, 295 store: &PocketStoreHandle, 296 event: &(impl GroupEventView + ?Sized), 297 class: &GroupEventClass, 298 store_offset: StoreOffset, 299 ) -> Result<Vec<StoreOffset>, BaseRelayError> { 300 let before_membership_admin = 301 membership_admin_snapshot_state(&self.projection, event, class)?; 302 self.projection 303 .apply_canonical_event(event, store_offset, self.limits)?; 304 if let Some(group_id) = class_group_id(class) { 305 self.persist_group_projection(store, group_id)?; 306 } 307 for record in self.plan_outbox_records(event, class, before_membership_admin)? 
{ 308 let inserted = self.outbox.merge_idempotent(record.clone())?; 309 if inserted { 310 persist_outbox_record(store, &record)?; 311 } 312 } 313 if let Some(group_id) = class_group_id(class) { 314 return self.materialize_outbox_for_group(store, group_id); 315 } 316 Ok(Vec::new()) 317 } 318 319 fn plan_outbox_records( 320 &self, 321 event: &(impl GroupEventView + ?Sized), 322 class: &GroupEventClass, 323 before_membership_admin: Option<bool>, 324 ) -> Result<Vec<GroupOutboxRecord>, GroupError> { 325 plan_group_outbox_records( 326 event, 327 class, 328 &self.projection, 329 &self.authority, 330 self.member_snapshot_cap, 331 before_membership_admin, 332 ) 333 } 334 335 fn derive_missing_outbox_records( 336 &mut self, 337 store: &PocketStoreHandle, 338 ) -> Result<(), BaseRelayError> { 339 let relay_pubkey = self.builder.relay_pubkey().clone(); 340 let scan = scan_canonical_group_events(store, self.limits)?; 341 let mut projection = GroupProjection::new(); 342 let mut events = scan.into_events(); 343 events.sort_by_key(CanonicalGroupEvent::tuple); 344 for item in events { 345 let class = tangle_groups::classify_group_event(item.event(), self.limits)?; 346 let before_membership_admin = 347 membership_admin_snapshot_state(&projection, item.event(), &class)?; 348 projection.apply_canonical_event(item.event(), item.store_offset(), self.limits)?; 349 if item.event().pubkey().as_hex_string() == relay_pubkey.as_str() { 350 continue; 351 } 352 for record in plan_group_outbox_records( 353 item.event(), 354 &class, 355 &projection, 356 &self.authority, 357 self.member_snapshot_cap, 358 before_membership_admin, 359 )? 
{ 360 let inserted = self.outbox.merge_idempotent(record.clone())?; 361 if inserted { 362 persist_outbox_record(store, &record)?; 363 } 364 } 365 } 366 Ok(()) 367 } 368 369 fn materialize_outbox( 370 &mut self, 371 store: &PocketStoreHandle, 372 ) -> Result<Vec<StoreOffset>, BaseRelayError> { 373 let mut stored_offsets = Vec::new(); 374 loop { 375 let records = self 376 .outbox 377 .replay_plan() 378 .records() 379 .iter() 380 .take(self.outbox_replay_batch_cap()) 381 .cloned() 382 .collect::<Vec<_>>(); 383 if records.is_empty() { 384 break; 385 } 386 stored_offsets.extend(self.materialize_records(store, records)?); 387 } 388 Ok(stored_offsets) 389 } 390 391 fn materialize_outbox_for_group( 392 &mut self, 393 store: &PocketStoreHandle, 394 group_id: &GroupId, 395 ) -> Result<Vec<StoreOffset>, BaseRelayError> { 396 let mut stored_offsets = Vec::new(); 397 loop { 398 let records = self 399 .outbox 400 .replay_plan_for_group(group_id) 401 .records() 402 .iter() 403 .take(self.outbox_replay_batch_cap()) 404 .cloned() 405 .collect::<Vec<_>>(); 406 if records.is_empty() { 407 break; 408 } 409 stored_offsets.extend(self.materialize_records(store, records)?); 410 } 411 Ok(stored_offsets) 412 } 413 414 fn outbox_replay_batch_cap(&self) -> usize { 415 usize::try_from(self.outbox_replay_batch_cap) 416 .expect("u32 outbox replay batch cap fits usize") 417 } 418 419 fn materialize_records( 420 &mut self, 421 store: &PocketStoreHandle, 422 records: Vec<GroupOutboxRecord>, 423 ) -> Result<Vec<StoreOffset>, BaseRelayError> { 424 let mut stored_offsets = Vec::new(); 425 for record in records { 426 if let Some(offset) = self.materialize_record(store, record)? 
{ 427 stored_offsets.push(offset); 428 } 429 } 430 Ok(stored_offsets) 431 } 432 433 fn materialize_record( 434 &mut self, 435 store: &PocketStoreHandle, 436 mut record: GroupOutboxRecord, 437 ) -> Result<Option<StoreOffset>, BaseRelayError> { 438 if matches!( 439 record.key().effect(), 440 GroupOutboxEffect::RoleListSnapshot | GroupOutboxEffect::State39004Snapshot 441 ) { 442 record.mark_skipped("generated group effect is not supported"); 443 self.outbox.update(record.clone()); 444 persist_outbox_record(store, &record)?; 445 return Ok(None); 446 } 447 match self.store_generated_event(store, &record) { 448 Ok((generated_event_id, stored_offset)) => { 449 record.mark_stored(generated_event_id); 450 self.outbox.update(record.clone()); 451 persist_outbox_record(store, &record)?; 452 Ok(stored_offset) 453 } 454 Err(error) => { 455 record.mark_failed(true, error.prefixed_message()); 456 self.outbox.update(record.clone()); 457 persist_outbox_record(store, &record)?; 458 Err(error) 459 } 460 } 461 } 462 463 fn store_generated_event( 464 &mut self, 465 store: &PocketStoreHandle, 466 record: &GroupOutboxRecord, 467 ) -> Result<(EventId, Option<StoreOffset>), BaseRelayError> { 468 let generated = GeneratedGroupStorageEvent::build(&self.builder, record.payload())?; 469 let event_id = generated.event_id()?; 470 if generated_event_already_stored(store, generated.event().id())? 
{ 471 return Ok((event_id, None)); 472 } 473 let offset = StoreOffset::new(store.store_event(generated.event())?); 474 self.projection 475 .apply_canonical_event(generated.event(), offset, self.limits)?; 476 self.persist_group_projection(store, record.key().group_id())?; 477 Ok((event_id, Some(offset))) 478 } 479 480 fn persist_group_projection( 481 &self, 482 store: &PocketStoreHandle, 483 group_id: &GroupId, 484 ) -> Result<(), BaseRelayError> { 485 if let Some(group) = self.projection.group(group_id) { 486 store.put_extra_record( 487 TANGLE_GROUP_PROJECTION_TABLE, 488 &group_current_key(group_id), 489 &group.to_json_bytes()?, 490 )?; 491 } 492 for ((candidate_group, pubkey), member) in self.projection.members() { 493 if candidate_group == group_id { 494 store.put_extra_record( 495 TANGLE_GROUP_PROJECTION_TABLE, 496 &member_current_key(group_id, pubkey), 497 &member.to_json_bytes()?, 498 )?; 499 } 500 } 501 for ((candidate_group, role_name), role) in self.projection.roles() { 502 if candidate_group == group_id { 503 store.put_extra_record( 504 TANGLE_GROUP_PROJECTION_TABLE, 505 &role_current_key(group_id, role_name), 506 &role.to_json_bytes()?, 507 )?; 508 } 509 } 510 if let Some(tombstone) = self.projection.tombstone(group_id) { 511 store.put_extra_record( 512 TANGLE_GROUP_PROJECTION_TABLE, 513 &tombstone_key(group_id), 514 &tombstone.to_json_bytes()?, 515 )?; 516 } 517 for (target_event_id, deletion) in self.projection.event_deletions() { 518 if deletion.group_id() == group_id { 519 store.put_extra_record( 520 TANGLE_GROUP_PROJECTION_TABLE, 521 &event_deletion_key(target_event_id), 522 &deletion.to_json_bytes()?, 523 )?; 524 } 525 } 526 Ok(()) 527 } 528 } 529 530 fn plan_group_outbox_records( 531 event: &(impl GroupEventView + ?Sized), 532 class: &GroupEventClass, 533 projection: &GroupProjection, 534 authority: &GroupAuthority, 535 member_snapshot_cap: u32, 536 before_membership_admin: Option<bool>, 537 ) -> Result<Vec<GroupOutboxRecord>, GroupError> { 538 let 
created_at = event.created_at(); 539 match class { 540 GroupEventClass::Moderation { kind, group_id } => match kind.as_u32() { 541 KIND_GROUP_CREATE_GROUP => { 542 let group = require_projected_group(projection, group_id)?; 543 Ok(vec![ 544 pending_record( 545 event, 546 GroupOutboxEffect::MetadataSnapshot, 547 group_id, 548 None, 549 GroupGeneratedEventBuilder::metadata_snapshot_payload(group, created_at)?, 550 )?, 551 pending_record( 552 event, 553 GroupOutboxEffect::AdminListSnapshot, 554 group_id, 555 None, 556 GroupGeneratedEventBuilder::admin_list_snapshot_payload( 557 group_id, projection, authority, created_at, 558 )?, 559 )?, 560 ]) 561 } 562 KIND_GROUP_EDIT_METADATA => { 563 let group = require_projected_group(projection, group_id)?; 564 Ok(vec![pending_record( 565 event, 566 GroupOutboxEffect::MetadataSnapshot, 567 group_id, 568 None, 569 GroupGeneratedEventBuilder::metadata_snapshot_payload(group, created_at)?, 570 )?]) 571 } 572 KIND_GROUP_PUT_USER | KIND_GROUP_REMOVE_USER => member_snapshot_records( 573 event, 574 group_id, 575 projection, 576 authority, 577 created_at, 578 member_snapshot_cap, 579 before_membership_admin, 580 ), 581 _ => Ok(Vec::new()), 582 }, 583 GroupEventClass::Normal { group_id } => match event.kind_u32() { 584 KIND_GROUP_JOIN_REQUEST => Ok(vec![pending_record( 585 event, 586 GroupOutboxEffect::JoinAccepted, 587 group_id, 588 Some(event.pubkey()?), 589 GroupGeneratedEventBuilder::join_accepted_payload( 590 group_id, 591 &event.pubkey()?, 592 created_at, 593 ), 594 )?]), 595 KIND_GROUP_LEAVE_REQUEST => Ok(vec![pending_record( 596 event, 597 GroupOutboxEffect::LeaveAccepted, 598 group_id, 599 Some(event.pubkey()?), 600 GroupGeneratedEventBuilder::leave_accepted_payload( 601 group_id, 602 &event.pubkey()?, 603 created_at, 604 ), 605 )?]), 606 _ => Ok(Vec::new()), 607 }, 608 GroupEventClass::NonGroup | GroupEventClass::RelayGeneratedSnapshot { .. 
} => { 609 Ok(Vec::new()) 610 } 611 } 612 } 613 614 fn member_snapshot_records( 615 event: &(impl GroupEventView + ?Sized), 616 group_id: &GroupId, 617 projection: &GroupProjection, 618 authority: &GroupAuthority, 619 created_at: UnixTimestamp, 620 member_snapshot_cap: u32, 621 before_membership_admin: Option<bool>, 622 ) -> Result<Vec<GroupOutboxRecord>, GroupError> { 623 let mut records = 624 member_snapshot_record(event, group_id, projection, created_at, member_snapshot_cap)?; 625 if let Some(before) = before_membership_admin { 626 let target = membership_target_pubkey(event)?; 627 let after = member_is_relay_override_admin(projection, group_id, &target); 628 if before != after { 629 records.push(pending_record( 630 event, 631 GroupOutboxEffect::AdminListSnapshot, 632 group_id, 633 None, 634 GroupGeneratedEventBuilder::admin_list_snapshot_payload( 635 group_id, projection, authority, created_at, 636 )?, 637 )?); 638 } 639 } 640 Ok(records) 641 } 642 643 fn member_snapshot_record( 644 event: &(impl GroupEventView + ?Sized), 645 group_id: &GroupId, 646 projection: &GroupProjection, 647 created_at: UnixTimestamp, 648 member_snapshot_cap: u32, 649 ) -> Result<Vec<GroupOutboxRecord>, GroupError> { 650 let key = GroupOutboxKey::new( 651 event.id()?, 652 GroupOutboxEffect::MemberListSnapshot, 653 group_id.clone(), 654 None, 655 ); 656 let payload = GroupGeneratedEventBuilder::member_list_snapshot_payload( 657 group_id, 658 projection, 659 created_at, 660 member_snapshot_cap, 661 )?; 662 Ok(vec![match payload { 663 Some(payload) => GroupOutboxRecord::pending(key, payload), 664 None => { 665 let mut record = GroupOutboxRecord::pending( 666 key, 667 GroupOutboxPayload::new( 668 KIND_GROUP_MEMBERS, 669 created_at, 670 vec![vec!["d".to_owned(), group_id.as_str().to_owned()]], 671 "", 672 ), 673 ); 674 record.mark_skipped("member snapshot exceeds configured cap"); 675 record 676 } 677 }]) 678 } 679 680 fn membership_admin_snapshot_state( 681 projection: &GroupProjection, 682 
event: &(impl GroupEventView + ?Sized), 683 class: &GroupEventClass, 684 ) -> Result<Option<bool>, GroupError> { 685 match class { 686 GroupEventClass::Moderation { kind, group_id } 687 if matches!(kind.as_u32(), KIND_GROUP_PUT_USER | KIND_GROUP_REMOVE_USER) => 688 { 689 let target = membership_target_pubkey(event)?; 690 Ok(Some(member_is_relay_override_admin( 691 projection, group_id, &target, 692 ))) 693 } 694 _ => Ok(None), 695 } 696 } 697 698 fn member_is_relay_override_admin( 699 projection: &GroupProjection, 700 group_id: &GroupId, 701 pubkey: &PublicKeyHex, 702 ) -> bool { 703 projection 704 .member(group_id, pubkey) 705 .filter(|member| member.status() == MemberStatus::Member) 706 .is_some_and(|member| { 707 member 708 .roles() 709 .contains(&RoleName::permanent_relay_override()) 710 }) 711 } 712 713 fn membership_target_pubkey( 714 event: &(impl GroupEventView + ?Sized), 715 ) -> Result<PublicKeyHex, GroupError> { 716 let mut target = None; 717 event.visit_tags(|tag| { 718 if tag.first_value().is_none_or(|name| name != "p") { 719 return Ok(()); 720 } 721 let Some((_, value)) = tag.indexed_pair() else { 722 return Err(GroupError::invalid( 723 GroupErrorKind::MalformedTargetTag, 724 "malformed p target tag", 725 )); 726 }; 727 target = Some(PublicKeyHex::new(value).map_err(|reason| { 728 GroupError::invalid( 729 GroupErrorKind::MalformedTargetTag, 730 format!("malformed p target tag: {reason}"), 731 ) 732 })?); 733 Ok(()) 734 })?; 735 target.ok_or_else(|| { 736 GroupError::invalid(GroupErrorKind::MissingTargetTag, "missing p target tag") 737 }) 738 } 739 740 fn pending_record( 741 event: &(impl GroupEventView + ?Sized), 742 effect: GroupOutboxEffect, 743 group_id: &GroupId, 744 target_pubkey: Option<PublicKeyHex>, 745 payload: GroupOutboxPayload, 746 ) -> Result<GroupOutboxRecord, GroupError> { 747 Ok(GroupOutboxRecord::pending( 748 GroupOutboxKey::new(event.id()?, effect, group_id.clone(), target_pubkey), 749 payload, 750 )) 751 } 752 753 fn 
require_projected_group<'a>( 754 projection: &'a GroupProjection, 755 group_id: &GroupId, 756 ) -> Result<&'a GroupState, GroupError> { 757 projection 758 .group(group_id) 759 .ok_or_else(|| GroupError::internal("group projection is missing after accepted write")) 760 } 761 762 #[derive(Debug, Clone, PartialEq, Eq)] 763 struct GroupStorageState { 764 projection: GroupProjection, 765 outbox: GroupOutbox, 766 } 767 768 fn load_group_storage( 769 store: &PocketStoreHandle, 770 limits: GroupLimitsConfig, 771 ) -> Result<GroupStorageState, BaseRelayError> { 772 let checkpoint_status = validate_group_checkpoint(store)?; 773 let outbox_records = store.scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)?; 774 if checkpoint_status.requires_rebuild() { 775 let scan = scan_canonical_group_events(store, limits)?; 776 let report = 777 rebuild_group_projection(scan.into_events(), limits, projection_rebuilt_at()?)?; 778 persist_group_projection_snapshot(store, report.projection())?; 779 validate_rebuilt_group_projection(store)?; 780 return Ok(GroupStorageState { 781 projection: report.into_projection(), 782 outbox: load_group_outbox(outbox_records)?, 783 }); 784 } 785 let checkpoint = checkpoint_status.checkpoint().cloned(); 786 let projection_records = store.scan_extra_records(TANGLE_GROUP_PROJECTION_TABLE)?; 787 let mut projection = load_group_projection(projection_records, checkpoint)?; 788 apply_canonical_events_after_checkpoint(store, &mut projection, limits)?; 789 Ok(GroupStorageState { 790 projection, 791 outbox: load_group_outbox(outbox_records)?, 792 }) 793 } 794 795 fn load_group_projection( 796 records: Vec<(Vec<u8>, Vec<u8>)>, 797 checkpoint: Option<ProjectionCheckpoint>, 798 ) -> Result<GroupProjection, BaseRelayError> { 799 let mut projection = GroupProjection::new(); 800 for (key, value) in records { 801 match projection_key_parts(&key)?.as_slice() { 802 ["group", _] => projection.put_group(GroupState::from_json_bytes(&value)?), 803 ["member", group_id, _] => 
projection.put_member( 804 GroupId::new(group_id)?, 805 MemberState::from_json_bytes(&value)?, 806 ), 807 ["role", group_id, _] => projection.put_role( 808 GroupId::new(group_id)?, 809 ProjectedRoleDefinition::from_json_bytes(&value)?, 810 ), 811 ["tombstone", _] => projection.put_tombstone(GroupTombstone::from_json_bytes(&value)?), 812 ["event_deletion", _] => { 813 projection.put_event_deletion(GroupEventDeletion::from_json_bytes(&value)?) 814 } 815 _ => { 816 return Err(BaseRelayError::error(format!( 817 "unknown group projection extra-table key: {}", 818 projection_key_label(&key) 819 ))); 820 } 821 } 822 } 823 if let Some(checkpoint) = checkpoint { 824 projection.set_checkpoint(checkpoint); 825 } 826 Ok(projection) 827 } 828 829 fn load_group_outbox(records: Vec<(Vec<u8>, Vec<u8>)>) -> Result<GroupOutbox, BaseRelayError> { 830 let mut outbox = GroupOutbox::new(); 831 for (_, value) in records { 832 outbox.update(GroupOutboxRecord::from_json_bytes(&value)?); 833 } 834 Ok(outbox) 835 } 836 837 fn apply_canonical_events_after_checkpoint( 838 store: &PocketStoreHandle, 839 projection: &mut GroupProjection, 840 limits: GroupLimitsConfig, 841 ) -> Result<(), BaseRelayError> { 842 let last_offset = projection 843 .checkpoint() 844 .and_then(ProjectionCheckpoint::last_offset); 845 let scan = scan_canonical_group_events_after(store, last_offset, limits)?; 846 if scan.events().is_empty() { 847 return Ok(()); 848 } 849 let mut events = scan.into_events(); 850 let next_offset = events.iter().map(CanonicalGroupEvent::store_offset).max(); 851 events.sort_by_key(CanonicalGroupEvent::tuple); 852 for item in events { 853 projection.apply_canonical_event(item.event(), item.store_offset(), limits)?; 854 } 855 projection.set_checkpoint(ProjectionCheckpoint::current( 856 next_offset, 857 projection_rebuilt_at()?, 858 )); 859 persist_group_projection_snapshot(store, projection)?; 860 validate_rebuilt_group_projection(store) 861 } 862 863 fn persist_group_projection_snapshot( 864 
store: &PocketStoreHandle, 865 projection: &GroupProjection, 866 ) -> Result<(), BaseRelayError> { 867 clear_extra_table(store, TANGLE_GROUP_PROJECTION_TABLE)?; 868 for (group_id, group) in projection.groups() { 869 store.put_extra_record( 870 TANGLE_GROUP_PROJECTION_TABLE, 871 &group_current_key(group_id), 872 &group.to_json_bytes()?, 873 )?; 874 } 875 for ((group_id, pubkey), member) in projection.members() { 876 store.put_extra_record( 877 TANGLE_GROUP_PROJECTION_TABLE, 878 &member_current_key(group_id, pubkey), 879 &member.to_json_bytes()?, 880 )?; 881 } 882 for ((group_id, role_name), role) in projection.roles() { 883 store.put_extra_record( 884 TANGLE_GROUP_PROJECTION_TABLE, 885 &role_current_key(group_id, role_name), 886 &role.to_json_bytes()?, 887 )?; 888 } 889 for (group_id, tombstone) in projection.tombstones() { 890 store.put_extra_record( 891 TANGLE_GROUP_PROJECTION_TABLE, 892 &tombstone_key(group_id), 893 &tombstone.to_json_bytes()?, 894 )?; 895 } 896 for (target_event_id, deletion) in projection.event_deletions() { 897 store.put_extra_record( 898 TANGLE_GROUP_PROJECTION_TABLE, 899 &event_deletion_key(target_event_id), 900 &deletion.to_json_bytes()?, 901 )?; 902 } 903 let checkpoint = projection 904 .checkpoint() 905 .ok_or_else(|| BaseRelayError::error("group projection rebuild checkpoint is missing"))?; 906 store.put_extra_record( 907 TANGLE_GROUP_CHECKPOINT_TABLE, 908 &projection_checkpoint_key(), 909 &checkpoint.to_json_bytes()?, 910 )?; 911 Ok(()) 912 } 913 914 fn clear_extra_table(store: &PocketStoreHandle, table: &'static str) -> Result<(), BaseRelayError> { 915 for (key, _) in store.scan_extra_records(table)? 
{ 916 store.delete_extra_record(table, &key)?; 917 } 918 Ok(()) 919 } 920 921 fn validate_rebuilt_group_projection(store: &PocketStoreHandle) -> Result<(), BaseRelayError> { 922 let validation = validate_group_extra_tables(store)?; 923 if validation.checkpoint_status().requires_rebuild() { 924 return Err(BaseRelayError::error( 925 "group projection checkpoint is not current after rebuild", 926 )); 927 } 928 Ok(()) 929 } 930 931 #[derive(Debug, Clone, PartialEq, Eq)] 932 pub struct GroupExtraTableValidation { 933 projection_records: usize, 934 outbox_records: usize, 935 checkpoint_status: GroupCheckpointStatus, 936 } 937 938 impl GroupExtraTableValidation { 939 pub fn projection_records(&self) -> usize { 940 self.projection_records 941 } 942 943 pub fn outbox_records(&self) -> usize { 944 self.outbox_records 945 } 946 947 pub fn checkpoint_status(&self) -> &GroupCheckpointStatus { 948 &self.checkpoint_status 949 } 950 } 951 952 #[derive(Debug, Clone, PartialEq, Eq)] 953 pub enum GroupCheckpointStatus { 954 Missing, 955 Current { checkpoint: ProjectionCheckpoint }, 956 Stale { checkpoint: ProjectionCheckpoint }, 957 } 958 959 impl GroupCheckpointStatus { 960 pub fn requires_rebuild(&self) -> bool { 961 !matches!(self, Self::Current { .. 
})
    }

    /// Returns the checkpoint carried by the `Current` or `Stale` variant,
    /// or `None` when the status is `Missing`.
    pub fn checkpoint(&self) -> Option<&ProjectionCheckpoint> {
        match self {
            Self::Missing => None,
            Self::Current { checkpoint } | Self::Stale { checkpoint } => Some(checkpoint),
        }
    }
}

/// Validates the group projection, outbox, and checkpoint extra tables.
///
/// The tables are checked in that order and the first failure is returned;
/// later tables are not inspected once one of them fails.
///
/// # Errors
///
/// Returns an error when a table cannot be read, when a projection key has an
/// unknown shape, or when any stored value fails to decode.
pub fn validate_group_extra_tables(
    store: &PocketStoreHandle,
) -> Result<GroupExtraTableValidation, BaseRelayError> {
    let projection_records = validate_group_projection_records(store)?;
    let outbox_records = validate_group_outbox_records(store)?;
    let checkpoint_status = validate_group_checkpoint(store)?;
    Ok(GroupExtraTableValidation {
        projection_records,
        outbox_records,
        checkpoint_status,
    })
}

/// Decodes every record of the projection table and returns how many records
/// the table holds.
///
/// Keys are split on NUL bytes. The first segment together with the number of
/// segments selects the record type to decode; the remaining segments are not
/// inspected here.
fn validate_group_projection_records(store: &PocketStoreHandle) -> Result<usize, BaseRelayError> {
    let records = store.scan_extra_records(TANGLE_GROUP_PROJECTION_TABLE)?;
    let count = records.len();
    for (key, value) in records {
        match projection_key_parts(&key)?.as_slice() {
            ["group", _] => {
                GroupState::from_json_bytes(&value)?;
            }
            ["member", _, _] => {
                MemberState::from_json_bytes(&value)?;
            }
            ["role", _, _] => {
                ProjectedRoleDefinition::from_json_bytes(&value)?;
            }
            ["tombstone", _] => {
                GroupTombstone::from_json_bytes(&value)?;
            }
            ["event_deletion", _] => {
                GroupEventDeletion::from_json_bytes(&value)?;
            }
            // Any other prefix or segment count is a schema violation.
            _ => {
                return Err(BaseRelayError::error(format!(
                    "unknown group projection extra-table key: {}",
                    projection_key_label(&key)
                )));
            }
        }
    }
    Ok(count)
}

/// Decodes every record of the outbox table and returns how many records the
/// table holds. Keys are ignored; only the values are checked.
fn validate_group_outbox_records(store: &PocketStoreHandle) -> Result<usize, BaseRelayError> {
    let records = store.scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)?;
    let count = records.len();
    for (_, value) in records {
        GroupOutboxRecord::from_json_bytes(&value)?;
    }
    Ok(count)
}

/// Loads the projection checkpoint record and classifies it.
///
/// Returns `Missing` when no record is stored, `Current` when the decoded
/// checkpoint reports `matches_current_versions()`, and `Stale` otherwise.
fn validate_group_checkpoint(
    store: &PocketStoreHandle,
) -> Result<GroupCheckpointStatus, BaseRelayError> {
    let Some(raw) =
        store.get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, &projection_checkpoint_key())?
    else {
        return Ok(GroupCheckpointStatus::Missing);
    };
    let checkpoint = ProjectionCheckpoint::from_json_bytes(&raw)?;
    let status = if checkpoint.matches_current_versions() {
        GroupCheckpointStatus::Current { checkpoint }
    } else {
        GroupCheckpointStatus::Stale { checkpoint }
    };
    Ok(status)
}

/// Returns the current wall-clock time as whole seconds since the UNIX epoch.
///
/// # Errors
///
/// Fails when the system clock reports a time before the UNIX epoch.
fn projection_rebuilt_at() -> Result<UnixTimestamp, BaseRelayError> {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|error| {
            BaseRelayError::error(format!("system clock is before UNIX epoch: {error}"))
        })?;
    Ok(UnixTimestamp::new(since_epoch.as_secs()))
}

/// Result of scanning the event store for group events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanonicalGroupEventScan {
    /// Group events in the order the store returned them.
    events: Vec<CanonicalGroupEvent>,
    /// Total number of stored events the scan looked at.
    scanned_events: usize,
    /// Number of scanned events classified as non-group.
    skipped_events: usize,
}

impl CanonicalGroupEventScan {
    /// Borrows the group events found by the scan.
    pub fn events(&self) -> &[CanonicalGroupEvent] {
        &self.events
    }

    /// Consumes the scan and returns the group events it found.
    pub fn into_events(self) -> Vec<CanonicalGroupEvent> {
        self.events
    }

    /// Total number of stored events examined, group and non-group alike.
    pub fn scanned_events(&self) -> usize {
        self.scanned_events
    }

    /// Number of examined events that were classified as non-group.
    pub fn skipped_events(&self) -> usize {
        self.skipped_events
    }
}

/// Scans the whole event store for group events.
///
/// Equivalent to [`scan_canonical_group_events_after`] with no starting
/// offset.
pub fn scan_canonical_group_events(
    store: &PocketStoreHandle,
    limits: GroupLimitsConfig,
) -> Result<CanonicalGroupEventScan, BaseRelayError> {
    scan_canonical_group_events_after(store, None, limits)
}

/// Scans the events the store returns for `last_offset` and keeps those that
/// classify as group events, paired with their store offsets.
///
/// `last_offset` is forwarded to the store's `scan_events_after`; `None`
/// requests a scan without a starting offset.
///
/// # Errors
///
/// Returns an error when the store scan fails or when classifying any event
/// fails; a classification failure aborts the scan.
pub fn scan_canonical_group_events_after(
    store: &PocketStoreHandle,
    last_offset: Option<StoreOffset>,
    limits: GroupLimitsConfig,
) -> Result<CanonicalGroupEventScan, BaseRelayError> {
    let stored_events = store.scan_events_after(last_offset.map(StoreOffset::as_u64))?;
    let scanned_events = stored_events.len();
    let mut events = Vec::new();
    let mut skipped_events = 0;
    for stored in stored_events {
        match tangle_groups::classify_group_event(stored.event(), limits)? {
            GroupEventClass::NonGroup => skipped_events += 1,
            GroupEventClass::Normal { .. }
            | GroupEventClass::Moderation { .. }
            | GroupEventClass::RelayGeneratedSnapshot { .. } => {
                // Read the offset before `into_event` consumes the record.
                let store_offset = StoreOffset::new(stored.store_offset());
                events.push(CanonicalGroupEvent::new(stored.into_event(), store_offset));
            }
        }
    }
    Ok(CanonicalGroupEventScan {
        events,
        scanned_events,
        skipped_events,
    })
}

/// Splits a projection-table key into its NUL-separated segments.
///
/// # Errors
///
/// Fails when the key is not valid UTF-8.
fn projection_key_parts(key: &[u8]) -> Result<Vec<&str>, BaseRelayError> {
    let key = str::from_utf8(key).map_err(|error| BaseRelayError::error(error.to_string()))?;
    Ok(key.split('\0').collect())
}

/// Renders a projection-table key for error messages: invalid UTF-8 is
/// replaced lossily and each NUL byte is shown as the two characters `\0`.
fn projection_key_label(key: &[u8]) -> String {
    String::from_utf8_lossy(key).replace('\0', "\\0")
}

/// Writes `record` to the outbox table under the record key's storage key,
/// serialized as JSON bytes.
fn persist_outbox_record(
    store: &PocketStoreHandle,
    record: &GroupOutboxRecord,
) -> Result<(), BaseRelayError> {
    store.put_extra_record(
        TANGLE_GROUP_OUTBOX_TABLE,
        &record.key().storage_key(),
        &record.to_json_bytes()?,
    )?;
    Ok(())
}

/// Reports whether an event with `event_id` is already in the store.
///
/// The ID lookup is tried first; when it finds nothing, every stored event is
/// scanned and compared by ID.
fn generated_event_already_stored(
    store: &PocketStoreHandle,
    event_id: PocketEventId,
) -> Result<bool, BaseRelayError> {
    if store.event_by_id(event_id)?.is_some() {
        return Ok(true);
    }
    // NOTE(review): this fallback is linear in the number of stored events.
    // Presumably it covers events the ID lookup cannot see yet -- confirm
    // before removing it.
    let found = store
        .scan_events()?
        .into_iter()
        .any(|stored| stored.event().id() == event_id);
    Ok(found)
}

/// Returns the group ID carried by a classified event, or `None` for
/// `NonGroup`.
fn class_group_id(class: &GroupEventClass) -> Option<&GroupId> {
    match class {
        GroupEventClass::Moderation { group_id, .. }
        | GroupEventClass::Normal { group_id }
        | GroupEventClass::RelayGeneratedSnapshot { group_id, .. } => Some(group_id),
        GroupEventClass::NonGroup => None,
    }
}

/// Extracts the target event ID from the `e` tags of a delete request.
///
/// Tags whose first value is not `e` are ignored. When several `e` tags are
/// present, each one overwrites the previous target, so the last one wins.
///
/// # Errors
///
/// * `MalformedTargetTag` when an `e` tag has no indexed pair or its value is
///   not a valid event ID.
/// * `MissingTargetTag` when the event carries no `e` tag at all.
fn delete_target_event_id(event: &(impl GroupEventView + ?Sized)) -> Result<EventId, GroupError> {
    let mut target = None;
    event.visit_tags(|tag| {
        if tag.first_value().is_none_or(|name| name != "e") {
            return Ok(());
        }
        let Some((_, value)) = tag.indexed_pair() else {
            return Err(GroupError::invalid(
                GroupErrorKind::MalformedTargetTag,
                "malformed e target tag",
            ));
        };
        target = Some(EventId::new(value).map_err(|reason| {
            GroupError::invalid(
                GroupErrorKind::MalformedTargetTag,
                format!("malformed e target tag: {reason}"),
            )
        })?);
        Ok(())
    })?;
    target.ok_or_else(|| {
        GroupError::invalid(GroupErrorKind::MissingTargetTag, "missing e target tag")
    })
}

#[cfg(test)]
mod tests {
    use super::{
        GeneratedGroupStorageEvent, GroupCheckpointStatus, GroupServiceHandle,
        scan_canonical_group_events, scan_canonical_group_events_after,
        validate_group_extra_tables,
    };
    use crate::pocket_conversion::tangle_event_to_pocket;
    use tangle_crypto::RelaySigner;
    use tangle_groups::{
        GroupGeneratedEventBuilder, GroupId, GroupRuntimeConfig, KIND_GROUP_METADATA,
        KIND_GROUP_PUT_USER, ProjectionCheckpoint, StoreOffset, projection_checkpoint_key,
    };
    use tangle_protocol::{Tag, UnixTimestamp};
    use tangle_store_pocket::{
        PocketEvent, PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy,
        TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_PROJECTION_TABLE,
    };
    use tangle_test_support::{
        FixtureKey, tangle_v2_event, tangle_v2_group_create_event, tangle_v2_group_event,
    };

    #[test]
    fn generated_group_storage_event_adapter_preserves_pocket_id_signature_and_tags() {
        let builder = GroupGeneratedEventBuilder::new(
            RelaySigner::from_secret_hex(&"7".repeat(64)).expect("key"),
        );
        let group_id = GroupId::new("PocketFarm").expect("group");
        let member = FixtureKey::Member.public_key();
        let payload = GroupGeneratedEventBuilder::join_accepted_payload(
            &group_id,
            &member,
            UnixTimestamp::new(1_714_124_433),
        );
        let generated = GeneratedGroupStorageEvent::build(&builder, &payload).expect("generated");

        assert_eq!(
            generated.event().id().as_hex_string(),
            generated.event_id().expect("event id").as_str()
        );
        assert_eq!(
            generated.event().pubkey().as_hex_string(),
            builder.relay_pubkey().as_str()
        );
        assert_eq!(
            u32::from(generated.event().kind().as_u16()),
            KIND_GROUP_PUT_USER
        );
        assert!(has_pocket_tag(generated.event(), &["h", "PocketFarm"]));
        assert!(has_pocket_tag(generated.event(), &["p", member.as_str()]));
        generated.event().verify().expect("signature");
    }

    #[test]
    fn group_service_from_disabled_config_is_absent() {
        let (root, store) = test_store("tangle-group-service-disabled");

        let service = GroupServiceHandle::from_config(&store, &GroupRuntimeConfig::disabled())
            .expect("service");
        assert!(service.is_none());

        // Best-effort cleanup, matching the other store-backed tests.
        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn canonical_group_event_scanner_returns_group_events_with_offsets() {
        let (root, store) = test_store("tangle-canonical-group-scan");
        let public =
            tangle_v2_event(FixtureKey::Member, 1, 1, Vec::new(), "public").expect("public");
        let normal =
            tangle_v2_group_event(FixtureKey::Member, "ScanFarm", 2, 1, "normal").expect("normal");
        let group =
            tangle_v2_group_create_event(FixtureKey::Owner, "ScanFarm", 3, &[]).expect("group");
        let generated = tangle_v2_event(
            FixtureKey::Owner,
            4,
            KIND_GROUP_METADATA.into(),
            vec![Tag::from_parts("d", &["ScanFarm"]).expect("d")],
            "",
        )
        .expect("generated");
        let public_offset = store
            .store_event(&tangle_event_to_pocket(&public).expect("public pocket"))
            .expect("store public");
        let normal_offset = store
            .store_event(&tangle_event_to_pocket(&normal).expect("normal pocket"))
            .expect("store normal");
        let group_offset = store
            .store_event(&tangle_event_to_pocket(&group).expect("group pocket"))
            .expect("store group");
        let generated_offset = store
            .store_event(&tangle_event_to_pocket(&generated).expect("generated pocket"))
            .expect("store generated");

        let scan = scan_canonical_group_events(&store, Default::default()).expect("scan");
        let after_public = scan_canonical_group_events_after(
            &store,
            Some(StoreOffset::new(public_offset)),
            Default::default(),
        )
        .expect("after public");

        assert_eq!(scan.scanned_events(), 4);
        assert_eq!(scan.skipped_events(), 1);
        assert_eq!(
            scan.events()
                .iter()
                .map(|event| event.event().id().as_hex_string())
                .collect::<Vec<_>>(),
            vec![
                normal.id().as_str().to_owned(),
                group.id().as_str().to_owned(),
                generated.id().as_str().to_owned(),
            ]
        );
        assert_eq!(
            scan.events()
                .iter()
                .map(|event| event.store_offset())
                .collect::<Vec<_>>(),
            vec![
                StoreOffset::new(normal_offset),
                StoreOffset::new(group_offset),
                StoreOffset::new(generated_offset),
            ]
        );
        assert_eq!(after_public.scanned_events(), 3);
        assert_eq!(after_public.skipped_events(), 0);
        assert_eq!(
            after_public
                .events()
                .iter()
                .map(|event| event.event().id().as_hex_string())
                .collect::<Vec<_>>(),
            vec![
                normal.id().as_str().to_owned(),
                group.id().as_str().to_owned(),
                generated.id().as_str().to_owned(),
            ]
        );

        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn group_extra_table_validation_reports_checkpoint_version_status() {
        let (root, store) = test_store("tangle-group-extra-version");
        let missing = validate_group_extra_tables(&store).expect("missing");

        assert_eq!(missing.projection_records(), 0);
        assert_eq!(missing.outbox_records(), 0);
        assert_eq!(missing.checkpoint_status(), &GroupCheckpointStatus::Missing);
        assert!(missing.checkpoint_status().requires_rebuild());

        let current =
            ProjectionCheckpoint::current(Some(StoreOffset::new(42)), UnixTimestamp::new(100));
        store
            .put_extra_record(
                TANGLE_GROUP_CHECKPOINT_TABLE,
                &projection_checkpoint_key(),
                &current.to_json_bytes().expect("current bytes"),
            )
            .expect("put current");
        let current_validation = validate_group_extra_tables(&store).expect("current");
        assert_eq!(
            current_validation.checkpoint_status(),
            &GroupCheckpointStatus::Current {
                checkpoint: current.clone()
            }
        );
        assert!(!current_validation.checkpoint_status().requires_rebuild());
        assert_eq!(
            current_validation.checkpoint_status().checkpoint(),
            Some(&current)
        );

        let stale =
            ProjectionCheckpoint::new(0, 0, Some(StoreOffset::new(42)), UnixTimestamp::new(101));
        store
            .put_extra_record(
                TANGLE_GROUP_CHECKPOINT_TABLE,
                &projection_checkpoint_key(),
                &stale.to_json_bytes().expect("stale bytes"),
            )
            .expect("put stale");
        let stale_validation = validate_group_extra_tables(&store).expect("stale");
        assert_eq!(
            stale_validation.checkpoint_status(),
            &GroupCheckpointStatus::Stale {
                checkpoint: stale.clone()
            }
        );
        assert!(stale_validation.checkpoint_status().requires_rebuild());

        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn group_extra_table_validation_rejects_bad_projection_schema() {
        let (unknown_root, unknown_store) = test_store("tangle-group-extra-unknown");
        unknown_store
            .put_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"unknown\0Farm", b"{}")
            .expect("put unknown");
        assert_eq!(
            validate_group_extra_tables(&unknown_store)
                .expect_err("unknown")
                .prefixed_message(),
            "error: unknown group projection extra-table key: unknown\\0Farm"
        );
        let _ = std::fs::remove_dir_all(unknown_root);

        let (corrupt_root, corrupt_store) = test_store("tangle-group-extra-corrupt");
        corrupt_store
            .put_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm", b"not-json")
            .expect("put corrupt");
        assert!(
            validate_group_extra_tables(&corrupt_store)
                .expect_err("corrupt")
                .prefixed_message()
                .contains("group state JSON decode failed")
        );
        let _ = std::fs::remove_dir_all(corrupt_root);
    }

    /// Opens a fresh store under a per-process temp directory named after
    /// `name`, removing any leftover directory first. Returns the directory so
    /// the caller can delete it when done.
    fn test_store(name: &str) -> (std::path::PathBuf, PocketStoreHandle) {
        let root = std::env::temp_dir().join(format!("{}-{}", name, std::process::id()));
        let _ = std::fs::remove_dir_all(&root);
        let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
            .expect("config");
        let store = PocketStoreHandle::open(&config).expect("store");
        (root, store)
    }

    /// Returns true when `event` has a tag whose values equal `expected`
    /// element for element.
    fn has_pocket_tag(event: &PocketEvent, expected: &[&str]) -> bool {
        event.tags().expect("tags").iter().any(|tag| {
            tag.map(|value| std::str::from_utf8(value).expect("utf8"))
                .eq(expected.iter().copied())
        })
    }
}