projection.rs (64557B)
1 use std::collections::{BTreeMap, BTreeSet}; 2 3 use crate::{ 4 CapabilitySet, GroupError, GroupErrorKind, GroupEventClass, GroupId, GroupLimitsConfig, 5 GroupMetadata, GroupMetadataFlags, GroupMetadataText, RoleDefinition, RoleName, SupportedKinds, 6 classify_group_event, event_view::GroupEventView, parse_group_metadata, 7 }; 8 use pocket_types::{Event as PocketEvent, OwnedEvent as PocketOwnedEvent}; 9 use serde::{Deserialize, Serialize}; 10 use tangle_protocol::{EventId, Kind, PublicKeyHex, UnixTimestamp}; 11 12 pub const GROUP_PROJECTION_SCHEMA_VERSION: u32 = 1; 13 pub const GROUP_POLICY_VERSION: u32 = 1; 14 15 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] 16 pub struct StoreOffset(u64); 17 18 impl StoreOffset { 19 pub fn new(value: u64) -> Self { 20 Self(value) 21 } 22 23 pub fn as_u64(self) -> u64 { 24 self.0 25 } 26 } 27 28 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] 29 pub struct ProjectionOrderTuple { 30 created_at: UnixTimestamp, 31 event_id: EventId, 32 store_offset: StoreOffset, 33 } 34 35 impl ProjectionOrderTuple { 36 pub fn new(created_at: UnixTimestamp, event_id: EventId, store_offset: StoreOffset) -> Self { 37 Self { 38 created_at, 39 event_id, 40 store_offset, 41 } 42 } 43 44 pub fn from_event_view( 45 event: &(impl GroupEventView + ?Sized), 46 store_offset: StoreOffset, 47 ) -> Result<Self, GroupError> { 48 Ok(Self::new(event.created_at(), event.id()?, store_offset)) 49 } 50 51 pub fn created_at(&self) -> UnixTimestamp { 52 self.created_at 53 } 54 55 pub fn event_id(&self) -> &EventId { 56 &self.event_id 57 } 58 59 pub fn store_offset(&self) -> StoreOffset { 60 self.store_offset 61 } 62 } 63 64 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 65 pub enum GroupLifecycleState { 66 Active, 67 Deleted, 68 } 69 70 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 71 pub enum MemberStatus { 72 Member, 73 Removed, 74 } 75 76 #[derive(Debug, Clone, PartialEq, Eq)] 77 pub struct GroupSnapshotIds { 78 metadata: Option<EventId>, 79 admins: Option<EventId>, 80 members: Option<EventId>, 81 roles: Option<EventId>, 82 state_39004: Option<EventId>, 83 } 84 85 impl GroupSnapshotIds { 86 pub fn empty() -> Self { 87 Self { 88 metadata: None, 89 admins: None, 90 members: None, 91 roles: None, 92 state_39004: None, 93 } 94 } 95 96 pub fn metadata(&self) -> Option<&EventId> { 97 self.metadata.as_ref() 98 } 99 100 pub fn admins(&self) -> Option<&EventId> { 101 self.admins.as_ref() 102 } 103 104 pub fn members(&self) -> Option<&EventId> { 105 self.members.as_ref() 106 } 107 108 pub fn roles(&self) -> Option<&EventId> { 109 self.roles.as_ref() 110 } 111 112 pub fn state_39004(&self) -> Option<&EventId> { 113 self.state_39004.as_ref() 114 } 115 116 fn set_for_kind(&mut self, kind: Kind, event_id: EventId) { 117 match kind.as_u32() { 118 crate::KIND_GROUP_METADATA => self.metadata = Some(event_id), 119 crate::KIND_GROUP_ADMINS => self.admins = Some(event_id), 120 crate::KIND_GROUP_MEMBERS => self.members = Some(event_id), 121 crate::KIND_GROUP_ROLES => self.roles = Some(event_id), 122 crate::KIND_GROUP_STATE_39004 => self.state_39004 = Some(event_id), 123 _ => {} 124 } 125 } 126 } 127 128 #[derive(Debug, Clone, PartialEq, Eq)] 129 pub struct GroupState { 130 id: GroupId, 131 lifecycle: GroupLifecycleState, 132 metadata: GroupMetadata, 133 created_by: PublicKeyHex, 134 created_event_id: EventId, 135 created_tuple: ProjectionOrderTuple, 136 metadata_tuple: ProjectionOrderTuple, 137 deleted_at: Option<UnixTimestamp>, 138 delete_event_id: Option<EventId>, 139 deleted_tuple: Option<ProjectionOrderTuple>, 140 snapshots: GroupSnapshotIds, 141 } 142 143 impl GroupState { 144 pub fn new( 145 id: GroupId, 146 metadata: GroupMetadata, 147 created_by: PublicKeyHex, 148 created_event_id: EventId, 149 created_tuple: ProjectionOrderTuple, 150 ) -> Self { 151 Self { 152 id, 153 lifecycle: GroupLifecycleState::Active, 154 metadata, 155 created_by, 156 created_event_id, 157 metadata_tuple: created_tuple.clone(), 158 created_tuple, 159 deleted_at: None, 160 delete_event_id: None, 161 deleted_tuple: None, 162 snapshots: GroupSnapshotIds::empty(), 163 } 164 } 165 166 pub fn id(&self) -> &GroupId { 167 &self.id 168 } 169 170 pub fn lifecycle(&self) -> GroupLifecycleState { 171 self.lifecycle 172 } 173 174 pub fn metadata(&self) -> &GroupMetadata { 175 &self.metadata 176 } 177 178 pub fn created_by(&self) -> &PublicKeyHex { 179 &self.created_by 180 } 181 182 pub fn created_event_id(&self) -> &EventId { 183 &self.created_event_id 184 } 185 186 pub fn created_tuple(&self) -> &ProjectionOrderTuple { 187 &self.created_tuple 188 } 189 190 pub fn metadata_tuple(&self) -> &ProjectionOrderTuple { 191 &self.metadata_tuple 192 } 193 194 pub fn deleted_at(&self) -> Option<UnixTimestamp> { 195 self.deleted_at 196 } 197 198 pub fn delete_event_id(&self) -> Option<&EventId> { 199 self.delete_event_id.as_ref() 200 } 201 202 pub fn snapshots(&self) -> &GroupSnapshotIds { 203 &self.snapshots 204 } 205 206 pub fn to_json_bytes(&self) -> Result<Vec<u8>, GroupError> { 207 serde_json::to_vec(&GroupStateDocument::from_state(self)).map_err(|error| { 208 GroupError::internal(format!("group state JSON encode failed: {error}")) 209 }) 210 } 211 212 pub fn from_json_bytes(raw: &[u8]) -> Result<Self, GroupError> { 213 let document = serde_json::from_slice::<GroupStateDocument>(raw).map_err(|error| { 214 GroupError::internal(format!("group state JSON decode failed: {error}")) 215 })?; 216 document.into_state() 217 } 218 219 fn update_metadata(&mut self, metadata: GroupMetadata, tuple: ProjectionOrderTuple) { 220 if tuple >= self.metadata_tuple { 221 self.metadata = metadata; 222 self.metadata_tuple = tuple; 223 } 224 } 225 226 fn mark_deleted( 227 &mut self, 228 deleted_at: UnixTimestamp, 229 delete_event_id: EventId, 230 tuple: ProjectionOrderTuple, 231 ) { 232 if self 233 .deleted_tuple 234 .as_ref() 235 .is_none_or(|current| &tuple >= current) 236 { 237 self.lifecycle = GroupLifecycleState::Deleted; 238 self.deleted_at = Some(deleted_at); 239 self.delete_event_id = Some(delete_event_id); 240 self.deleted_tuple = Some(tuple); 241 } 242 } 243 } 244 245 #[derive(Debug, Clone, PartialEq, Eq)] 246 pub struct MemberState { 247 pubkey: PublicKeyHex, 248 status: MemberStatus, 249 roles: BTreeSet<RoleName>, 250 last_event_id: EventId, 251 last_tuple: ProjectionOrderTuple, 252 } 253 254 impl MemberState { 255 pub fn new( 256 pubkey: PublicKeyHex, 257 status: MemberStatus, 258 roles: BTreeSet<RoleName>, 259 last_event_id: EventId, 260 last_tuple: ProjectionOrderTuple, 261 ) -> Self { 262 Self { 263 pubkey, 264 status, 265 roles, 266 last_event_id, 267 last_tuple, 268 } 269 } 270 271 pub fn pubkey(&self) -> &PublicKeyHex { 272 &self.pubkey 273 } 274 275 pub fn status(&self) -> MemberStatus { 276 self.status 277 } 278 279 pub fn roles(&self) -> &BTreeSet<RoleName> { 280 &self.roles 281 } 282 283 pub fn last_event_id(&self) -> &EventId { 284 &self.last_event_id 285 } 286 287 pub fn last_tuple(&self) -> &ProjectionOrderTuple { 288 &self.last_tuple 289 } 290 291 pub fn to_json_bytes(&self) -> Result<Vec<u8>, GroupError> { 292 serde_json::to_vec(&MemberStateDocument::from_state(self)).map_err(|error| { 293 GroupError::internal(format!("member state JSON encode failed: {error}")) 294 }) 295 } 296 297 pub fn from_json_bytes(raw: &[u8]) -> Result<Self, GroupError> { 298 let document = serde_json::from_slice::<MemberStateDocument>(raw).map_err(|error| { 299 GroupError::internal(format!("member state JSON decode failed: {error}")) 300 })?; 301 document.into_state() 302 } 303 } 304 305 #[derive(Debug, Clone, PartialEq, Eq)] 306 pub struct ProjectedRoleDefinition { 307 definition: RoleDefinition, 308 last_event_id: EventId, 309 last_tuple: ProjectionOrderTuple, 310 } 311 312 impl ProjectedRoleDefinition { 313 pub fn new( 314 definition: RoleDefinition, 315 last_event_id: EventId, 316 last_tuple: ProjectionOrderTuple, 317 ) -> Self { 318 Self { 319 definition, 320 last_event_id, 321 last_tuple, 322 } 323 } 324 325 pub fn definition(&self) -> &RoleDefinition { 326 &self.definition 327 } 328 329 pub fn last_event_id(&self) -> &EventId { 330 &self.last_event_id 331 } 332 333 pub fn last_tuple(&self) -> &ProjectionOrderTuple { 334 &self.last_tuple 335 } 336 337 pub fn to_json_bytes(&self) -> Result<Vec<u8>, GroupError> { 338 serde_json::to_vec(&ProjectedRoleDefinitionDocument::from_role(self)).map_err(|error| { 339 GroupError::internal(format!("role state JSON encode failed: {error}")) 340 }) 341 } 342 343 pub fn from_json_bytes(raw: &[u8]) -> Result<Self, GroupError> { 344 let document = 345 serde_json::from_slice::<ProjectedRoleDefinitionDocument>(raw).map_err(|error| { 346 GroupError::internal(format!("role state JSON decode failed: {error}")) 347 })?; 348 document.into_role() 349 } 350 } 351 352 #[derive(Debug, Clone, PartialEq, Eq)] 353 pub struct GroupTombstone { 354 group_id: GroupId, 355 delete_event_id: EventId, 356 deleted_at: UnixTimestamp, 357 deleted_by: PublicKeyHex, 358 last_tuple: ProjectionOrderTuple, 359 } 360 361 impl GroupTombstone { 362 pub fn new( 363 group_id: GroupId, 364 delete_event_id: EventId, 365 deleted_at: UnixTimestamp, 366 deleted_by: PublicKeyHex, 367 last_tuple: ProjectionOrderTuple, 368 ) -> Self { 369 Self { 370 group_id, 371 delete_event_id, 372 deleted_at, 373 deleted_by, 374 last_tuple, 375 } 376 } 377 378 pub fn group_id(&self) -> &GroupId { 379 &self.group_id 380 } 381 382 pub fn delete_event_id(&self) -> &EventId { 383 &self.delete_event_id 384 } 385 386 pub fn deleted_at(&self) -> UnixTimestamp { 387 self.deleted_at 388 } 389 390 pub fn deleted_by(&self) -> &PublicKeyHex { 391 &self.deleted_by 392 } 393 394 pub fn last_tuple(&self) -> &ProjectionOrderTuple { 395 &self.last_tuple 396 } 397 398 pub fn to_json_bytes(&self) -> Result<Vec<u8>, GroupError> { 399 serde_json::to_vec(&GroupTombstoneDocument::from_tombstone(self)) 400 .map_err(|error| GroupError::internal(format!("tombstone JSON encode failed: {error}"))) 401 } 402 403 pub fn from_json_bytes(raw: &[u8]) -> Result<Self, GroupError> { 404 let document = serde_json::from_slice::<GroupTombstoneDocument>(raw).map_err(|error| { 405 GroupError::internal(format!("tombstone JSON decode failed: {error}")) 406 })?; 407 document.into_tombstone() 408 } 409 } 410 411 #[derive(Debug, Clone, PartialEq, Eq)] 412 pub struct GroupEventDeletion { 413 group_id: GroupId, 414 target_event_id: EventId, 415 delete_event_id: EventId, 416 deleted_at: UnixTimestamp, 417 deleted_by: PublicKeyHex, 418 last_tuple: ProjectionOrderTuple, 419 } 420 421 impl GroupEventDeletion { 422 pub fn new( 423 group_id: GroupId, 424 target_event_id: EventId, 425 delete_event_id: EventId, 426 deleted_at: UnixTimestamp, 427 deleted_by: PublicKeyHex, 428 last_tuple: ProjectionOrderTuple, 429 ) -> Self { 430 Self { 431 group_id, 432 target_event_id, 433 delete_event_id, 434 deleted_at, 435 deleted_by, 436 last_tuple, 437 } 438 } 439 440 pub fn group_id(&self) -> &GroupId { 441 &self.group_id 442 } 443 444 pub fn target_event_id(&self) -> &EventId { 445 &self.target_event_id 446 } 447 448 pub fn delete_event_id(&self) -> &EventId { 449 &self.delete_event_id 450 } 451 452 pub fn deleted_at(&self) -> UnixTimestamp { 453 self.deleted_at 454 } 455 456 pub fn deleted_by(&self) -> &PublicKeyHex { 457 &self.deleted_by 458 } 459 460 pub fn last_tuple(&self) -> &ProjectionOrderTuple { 461 &self.last_tuple 462 } 463 464 pub fn to_json_bytes(&self) -> Result<Vec<u8>, GroupError> { 465 serde_json::to_vec(&GroupEventDeletionDocument::from_deletion(self)).map_err(|error| { 466 GroupError::internal(format!("event deletion JSON encode failed: {error}")) 467 }) 468 } 469 470 pub fn from_json_bytes(raw: &[u8]) -> Result<Self, GroupError> { 471 let document = 472 serde_json::from_slice::<GroupEventDeletionDocument>(raw).map_err(|error| { 473 GroupError::internal(format!("event deletion JSON decode failed: {error}")) 474 })?; 475 document.into_deletion() 476 } 477 } 478 479 #[derive(Debug, Clone, PartialEq, Eq)] 480 pub struct ProjectionCheckpoint { 481 projection_version: u32, 482 policy_version: u32, 483 last_offset: Option<StoreOffset>, 484 rebuilt_at: UnixTimestamp, 485 } 486 487 impl ProjectionCheckpoint { 488 pub fn new( 489 projection_version: u32, 490 policy_version: u32, 491 last_offset: Option<StoreOffset>, 492 rebuilt_at: UnixTimestamp, 493 ) -> Self { 494 Self { 495 projection_version, 496 policy_version, 497 last_offset, 498 rebuilt_at, 499 } 500 } 501 502 pub fn current(last_offset: Option<StoreOffset>, rebuilt_at: UnixTimestamp) -> Self { 503 Self::new( 504 GROUP_PROJECTION_SCHEMA_VERSION, 505 GROUP_POLICY_VERSION, 506 last_offset, 507 rebuilt_at, 508 ) 509 } 510 511 pub fn projection_version(&self) -> u32 { 512 self.projection_version 513 } 514 515 pub fn policy_version(&self) -> u32 { 516 self.policy_version 517 } 518 519 pub fn last_offset(&self) -> Option<StoreOffset> { 520 self.last_offset 521 } 522 523 pub fn rebuilt_at(&self) -> UnixTimestamp { 524 self.rebuilt_at 525 } 526 527 pub fn matches_current_versions(&self) -> bool { 528 self.projection_version == GROUP_PROJECTION_SCHEMA_VERSION 529 && self.policy_version == GROUP_POLICY_VERSION 530 } 531 532 pub fn to_json_bytes(&self) -> Result<Vec<u8>, GroupError> { 533 serde_json::to_vec(&ProjectionCheckpointDocument::from_checkpoint(self)).map_err(|error| { 534 GroupError::internal(format!("checkpoint JSON encode failed: {error}")) 535 }) 536 } 537 538 pub fn from_json_bytes(raw: &[u8]) -> Result<Self, GroupError> { 539 let document = 540 serde_json::from_slice::<ProjectionCheckpointDocument>(raw).map_err(|error| { 541 GroupError::internal(format!("checkpoint JSON decode failed: {error}")) 542 })?; 543 Ok(document.into_checkpoint()) 544 } 545 } 546 547 #[derive(Debug, Clone, PartialEq, Eq, Default)] 548 pub struct GroupProjection { 549 groups: BTreeMap<GroupId, GroupState>, 550 members: BTreeMap<(GroupId, PublicKeyHex), MemberState>, 551 roles: BTreeMap<(GroupId, RoleName), ProjectedRoleDefinition>, 552 tombstones: BTreeMap<GroupId, GroupTombstone>, 553 event_deletions: BTreeMap<EventId, GroupEventDeletion>, 554 checkpoint: Option<ProjectionCheckpoint>, 555 } 556 557 impl GroupProjection { 558 pub fn new() -> Self { 559 Self::default() 560 } 561 562 pub fn group(&self, group_id: &GroupId) -> Option<&GroupState> { 563 self.groups.get(group_id) 564 } 565 566 pub fn member(&self, group_id: &GroupId, pubkey: &PublicKeyHex) -> Option<&MemberState> { 567 self.members.get(&(group_id.clone(), pubkey.clone())) 568 } 569 570 pub fn role( 571 &self, 572 group_id: &GroupId, 573 role_name: &RoleName, 574 ) -> Option<&ProjectedRoleDefinition> { 575 self.roles.get(&(group_id.clone(), role_name.clone())) 576 } 577 578 pub fn tombstone(&self, group_id: &GroupId) -> Option<&GroupTombstone> { 579 self.tombstones.get(group_id) 580 } 581 582 pub fn event_deletion(&self, event_id: &EventId) -> Option<&GroupEventDeletion> { 583 self.event_deletions.get(event_id) 584 } 585 586 pub fn groups(&self) -> &BTreeMap<GroupId, GroupState> { 587 &self.groups 588 } 589 590 pub fn members(&self) -> &BTreeMap<(GroupId, PublicKeyHex), MemberState> { 591 &self.members 592 } 593 594 pub fn roles(&self) -> &BTreeMap<(GroupId, RoleName), ProjectedRoleDefinition> { 595 &self.roles 596 } 597 598 pub fn tombstones(&self) -> &BTreeMap<GroupId, GroupTombstone> { 599 &self.tombstones 600 } 601 602 pub fn event_deletions(&self) -> &BTreeMap<EventId, GroupEventDeletion> { 603 &self.event_deletions 604 } 605 606 pub fn checkpoint(&self) -> Option<&ProjectionCheckpoint> { 607 self.checkpoint.as_ref() 608 } 609 610 pub fn set_checkpoint(&mut self, checkpoint: ProjectionCheckpoint) { 611 self.checkpoint = Some(checkpoint); 612 } 613 614 pub fn put_group(&mut self, state: GroupState) { 615 self.groups.insert(state.id().clone(), state); 616 } 617 618 pub fn put_member(&mut self, group_id: GroupId, state: MemberState) { 619 let key = (group_id, state.pubkey().clone()); 620 if self 621 .members 622 .get(&key) 623 .is_none_or(|current| state.last_tuple() >= current.last_tuple()) 624 { 625 self.members.insert(key, state); 626 } 627 } 628 629 pub fn put_role(&mut self, group_id: GroupId, role: ProjectedRoleDefinition) { 630 let key = (group_id, role.definition().name().clone()); 631 if self 632 .roles 633 .get(&key) 634 .is_none_or(|current| role.last_tuple() >= current.last_tuple()) 635 { 636 self.roles.insert(key, role); 637 } 638 } 639 640 pub fn put_tombstone(&mut self, tombstone: GroupTombstone) { 641 if self 642 .tombstones 643 .get(tombstone.group_id()) 644 .is_none_or(|current| tombstone.last_tuple() >= current.last_tuple()) 645 { 646 self.tombstones 647 .insert(tombstone.group_id().clone(), tombstone); 648 } 649 } 650 651 pub fn put_event_deletion(&mut self, deletion: GroupEventDeletion) { 652 if self 653 .event_deletions 654 .get(deletion.target_event_id()) 655 .is_none_or(|current| deletion.last_tuple() >= current.last_tuple()) 656 { 657 self.event_deletions 658 .insert(deletion.target_event_id().clone(), deletion); 659 } 660 } 661 662 pub fn apply_canonical_event( 663 &mut self, 664 event: &(impl GroupEventView + ?Sized), 665 store_offset: StoreOffset, 666 limits: GroupLimitsConfig, 667 ) -> Result<ProjectionApplyOutcome, GroupError> { 668 let class = classify_group_event(event, limits)?; 669 let tuple = ProjectionOrderTuple::from_event_view(event, store_offset)?; 670 match class { 671 GroupEventClass::NonGroup => Ok(ProjectionApplyOutcome::Skipped), 672 GroupEventClass::Normal { .. } => Ok(ProjectionApplyOutcome::Ignored), 673 GroupEventClass::Moderation { group_id, .. } => { 674 self.apply_moderation_event(group_id, event, tuple, limits) 675 } 676 GroupEventClass::RelayGeneratedSnapshot { kind, group_id } => { 677 self.apply_snapshot_event(group_id, kind, event, tuple, limits) 678 } 679 } 680 } 681 682 fn apply_moderation_event( 683 &mut self, 684 group_id: GroupId, 685 event: &(impl GroupEventView + ?Sized), 686 tuple: ProjectionOrderTuple, 687 limits: GroupLimitsConfig, 688 ) -> Result<ProjectionApplyOutcome, GroupError> { 689 match event.kind_u32() { 690 crate::KIND_GROUP_CREATE_GROUP => { 691 let state = GroupState::new( 692 group_id.clone(), 693 parse_group_metadata(event, limits)?, 694 event.pubkey()?, 695 event.id()?, 696 tuple, 697 ); 698 if self 699 .group(&group_id) 700 .is_none_or(|current| state.created_tuple() >= current.created_tuple()) 701 { 702 self.put_group(state); 703 Ok(ProjectionApplyOutcome::Applied) 704 } else { 705 Ok(ProjectionApplyOutcome::Ignored) 706 } 707 } 708 crate::KIND_GROUP_EDIT_METADATA => { 709 let Some(group) = self.groups.get_mut(&group_id) else { 710 return Ok(ProjectionApplyOutcome::Ignored); 711 }; 712 group.update_metadata(parse_group_metadata(event, limits)?, tuple); 713 Ok(ProjectionApplyOutcome::Applied) 714 } 715 crate::KIND_GROUP_PUT_USER => { 716 self.apply_member_status(group_id, event, tuple, MemberStatus::Member) 717 } 718 crate::KIND_GROUP_REMOVE_USER => { 719 self.apply_member_status(group_id, event, tuple, MemberStatus::Removed) 720 } 721 crate::KIND_GROUP_DELETE_EVENT => { 722 let target_event_id = 723 EventId::new(&first_tag_value(event, "e")?).map_err(|reason| { 724 GroupError::invalid( 725 GroupErrorKind::MalformedTargetTag, 726 format!("malformed e target tag: {reason}"), 727 ) 728 })?; 729 let deletion = GroupEventDeletion::new( 730 group_id, 731 target_event_id, 732 event.id()?, 733 event.created_at(), 734 event.pubkey()?, 735 tuple, 736 ); 737 self.put_event_deletion(deletion); 738 Ok(ProjectionApplyOutcome::Applied) 739 } 740 crate::KIND_GROUP_DELETE_GROUP => { 741 let tombstone = GroupTombstone::new( 742 group_id.clone(), 743 event.id()?, 744 event.created_at(), 745 event.pubkey()?, 746 tuple.clone(), 747 ); 748 if let Some(group) = self.groups.get_mut(&group_id) { 749 group.mark_deleted(event.created_at(), event.id()?, tuple.clone()); 750 } 751 if self 752 .tombstones 753 .get(&group_id) 754 .is_none_or(|current| &tuple >= current.last_tuple()) 755 { 756 self.tombstones.insert(group_id, tombstone); 757 Ok(ProjectionApplyOutcome::Applied) 758 } else { 759 Ok(ProjectionApplyOutcome::Ignored) 760 } 761 } 762 _ => Ok(ProjectionApplyOutcome::Ignored), 763 } 764 } 765 766 fn apply_snapshot_event( 767 &mut self, 768 group_id: GroupId, 769 kind: Kind, 770 event: &(impl GroupEventView + ?Sized), 771 tuple: ProjectionOrderTuple, 772 limits: GroupLimitsConfig, 773 ) -> Result<ProjectionApplyOutcome, GroupError> { 774 if kind.as_u32() == crate::KIND_GROUP_METADATA { 775 let metadata = parse_group_metadata(event, limits)?; 776 let event_id = event.id()?; 777 if let Some(group) = self.groups.get_mut(&group_id) { 778 group.update_metadata(metadata, tuple.clone()); 779 group.snapshots.set_for_kind(kind, event_id); 780 } else { 781 let mut state = GroupState::new( 782 group_id.clone(), 783 metadata, 784 event.pubkey()?, 785 event_id.clone(), 786 tuple.clone(), 787 ); 788 state.snapshots.set_for_kind(kind, event_id); 789 self.put_group(state); 790 } 791 } else if let Some(group) = self.groups.get_mut(&group_id) { 792 group.snapshots.set_for_kind(kind, event.id()?); 793 } 794 Ok(ProjectionApplyOutcome::Applied) 795 } 796 797 fn apply_member_status( 798 &mut self, 799 group_id: GroupId, 800 event: &(impl GroupEventView + ?Sized), 801 tuple: ProjectionOrderTuple, 802 status: MemberStatus, 803 ) -> Result<ProjectionApplyOutcome, GroupError> { 804 let target = first_tag_value(event, "p")?; 805 let pubkey = PublicKeyHex::new(&target).map_err(|reason| { 806 GroupError::invalid( 807 GroupErrorKind::MalformedTargetTag, 808 format!("malformed p target tag: {reason}"), 809 ) 810 })?; 811 let roles = role_tags(event)?; 812 let state = MemberState::new(pubkey, status, roles, event.id()?, tuple); 813 self.put_member(group_id, state); 814 Ok(ProjectionApplyOutcome::Applied) 815 } 816 } 817 818 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 819 pub enum ProjectionApplyOutcome { 820 Applied, 821 Ignored, 822 Skipped, 823 } 824 825 #[derive(Debug, Clone, PartialEq, Eq)] 826 pub struct CanonicalGroupEvent { 827 event: PocketOwnedEvent, 828 store_offset: StoreOffset, 829 } 830 831 impl CanonicalGroupEvent { 832 pub fn new(event: PocketOwnedEvent, store_offset: StoreOffset) -> Self { 833 Self { 834 event, 835 store_offset, 836 } 837 } 838 839 pub fn event(&self) -> &PocketEvent { 840 &self.event 841 } 842 843 pub fn store_offset(&self) -> StoreOffset { 844 self.store_offset 845 } 846 847 pub fn tuple(&self) -> ProjectionOrderTuple { 848 ProjectionOrderTuple::from_event_view(&self.event, self.store_offset).expect("tuple") 849 } 850 } 851 852 #[derive(Debug, Clone, PartialEq, Eq)] 853 pub struct ProjectionRebuildReport { 854 projection: GroupProjection, 855 applied_events: usize, 856 ignored_events: usize, 857 skipped_events: usize, 858 last_offset: Option<StoreOffset>, 859 } 860 861 impl ProjectionRebuildReport { 862 pub fn projection(&self) -> &GroupProjection { 863 &self.projection 864 } 865 866 pub fn into_projection(self) -> GroupProjection { 867 self.projection 868 } 869 870 pub fn applied_events(&self) -> usize { 871 self.applied_events 872 } 873 874 pub fn ignored_events(&self) -> usize { 875 self.ignored_events 876 } 877 878 pub fn skipped_events(&self) -> usize { 879 self.skipped_events 880 } 881 882 pub fn last_offset(&self) -> Option<StoreOffset> { 883 self.last_offset 884 } 885 } 886 887 #[derive(Debug, Clone, PartialEq, Eq)] 888 pub enum GroupRecoveryReadiness { 889 Ready, 890 FailedClosed { reason: String }, 891 } 892 893 impl GroupRecoveryReadiness { 894 pub fn from_projection_result<T>(result: &Result<T, GroupError>) -> Self { 895 match result { 896 Ok(_) => Self::Ready, 897 Err(error) => Self::FailedClosed { 898 reason: error.prefixed_message(), 899 }, 900 } 901 } 902 } 903 904 pub fn rebuild_group_projection( 905 events: impl IntoIterator<Item = CanonicalGroupEvent>, 906 limits: GroupLimitsConfig, 907 rebuilt_at: UnixTimestamp, 908 ) -> Result<ProjectionRebuildReport, GroupError> { 909 let mut events = events.into_iter().collect::<Vec<_>>(); 910 events.sort_by_key(CanonicalGroupEvent::tuple); 911 let mut projection = GroupProjection::new(); 912 let mut applied_events = 0; 913 let mut ignored_events = 0; 914 let mut skipped_events = 0; 915 let mut last_offset = None; 916 for item in events { 917 last_offset = Some( 918 last_offset 919 .map(|current: StoreOffset| current.max(item.store_offset())) 920 .unwrap_or_else(|| item.store_offset()), 921 ); 922 match projection.apply_canonical_event(item.event(), item.store_offset(), limits)? { 923 ProjectionApplyOutcome::Applied => applied_events += 1, 924 ProjectionApplyOutcome::Ignored => ignored_events += 1, 925 ProjectionApplyOutcome::Skipped => skipped_events += 1, 926 } 927 } 928 projection.set_checkpoint(ProjectionCheckpoint::current(last_offset, rebuilt_at)); 929 Ok(ProjectionRebuildReport { 930 projection, 931 applied_events, 932 ignored_events, 933 skipped_events, 934 last_offset, 935 }) 936 } 937 938 pub fn group_current_key(group_id: &GroupId) -> Vec<u8> { 939 prefixed_key("group", group_id.as_str(), None) 940 } 941 942 pub fn member_current_key(group_id: &GroupId, pubkey: &PublicKeyHex) -> Vec<u8> { 943 prefixed_key("member", group_id.as_str(), Some(pubkey.as_str())) 944 } 945 946 pub fn role_current_key(group_id: &GroupId, role_name: &RoleName) -> Vec<u8> { 947 prefixed_key("role", group_id.as_str(), Some(role_name.as_str())) 948 } 949 950 pub fn tombstone_key(group_id: &GroupId) -> Vec<u8> { 951 prefixed_key("tombstone", group_id.as_str(), None) 952 } 953 954 pub fn event_deletion_key(event_id: &EventId) -> Vec<u8> { 955 prefixed_key("event_deletion", event_id.as_str(), None) 956 } 957 958 pub fn projection_checkpoint_key() -> Vec<u8> { 959 b"checkpoint\0groups".to_vec() 960 } 961 962 fn prefixed_key(prefix: &str, first: &str, second: Option<&str>) -> Vec<u8> { 963 let mut key = Vec::new(); 964 key.extend_from_slice(prefix.as_bytes()); 965 key.push(0); 966 key.extend_from_slice(first.as_bytes()); 967 if let Some(second) = second { 968 key.push(0); 969 key.extend_from_slice(second.as_bytes()); 970 } 971 key 972 } 973 974 fn first_tag_value( 975 event: &(impl GroupEventView + ?Sized), 976 name: &str, 977 ) -> Result<String, GroupError> { 978 let mut found = None; 979 event.visit_tags(|tag| { 980 if tag.first_value().is_none_or(|tag_name| tag_name != name) { 981 return Ok(()); 982 } 983 let Some((_, value)) = tag.indexed_pair() else { 984 return Err(GroupError::invalid( 985 GroupErrorKind::MalformedTargetTag, 986 format!("malformed {name} target tag"), 987 )); 988 }; 989 found = Some(value.to_owned()); 990 Ok(()) 991 })?; 992 found.ok_or_else(|| { 993 GroupError::invalid( 994 GroupErrorKind::MissingTargetTag, 995 format!("missing {name} target tag"), 996 ) 997 }) 998 } 999 1000 fn role_tags(event: &(impl GroupEventView + ?Sized)) -> Result<BTreeSet<RoleName>, GroupError> { 1001 let mut roles = BTreeSet::new(); 1002 event.visit_tags(|tag| { 1003 if tag.first_value().is_some_and(|name| name == "role") 1004 && let Some(value) = tag.value(1) 1005 { 1006 roles.insert(RoleName::new(value)?); 1007 } 1008 Ok(()) 1009 })?; 1010 Ok(roles) 1011 } 1012 1013 #[derive(Debug, Clone, Serialize, Deserialize)] 1014 struct TupleDocument { 1015 created_at: u64, 1016 event_id: String, 1017 store_offset: u64, 1018 } 1019 1020 impl TupleDocument { 1021 fn from_tuple(tuple: &ProjectionOrderTuple) -> Self { 1022 Self { 1023 created_at: tuple.created_at().as_u64(), 1024 event_id: tuple.event_id().as_str().to_owned(), 1025 store_offset: tuple.store_offset().as_u64(), 1026 } 1027 } 1028 1029 fn into_tuple(self) -> Result<ProjectionOrderTuple, GroupError> { 1030 Ok(ProjectionOrderTuple::new( 1031 UnixTimestamp::new(self.created_at), 1032 EventId::new(&self.event_id).map_err(GroupError::internal)?, 1033 StoreOffset::new(self.store_offset), 1034 )) 1035 } 1036 } 1037 1038 #[derive(Debug, Clone, Serialize, Deserialize)] 1039 struct MetadataDocument { 1040 name: Option<String>, 1041 picture: Option<String>, 1042 about: Option<String>, 1043 private: bool, 1044 restricted: bool, 1045 hidden: bool, 1046 closed: bool, 1047 supported_kinds: SupportedKindsDocument, 1048 } 1049 1050 impl MetadataDocument { 1051 fn from_metadata(metadata: &GroupMetadata) -> Self { 1052 Self { 1053 name: metadata.name().map(str::to_owned), 1054 picture: metadata.picture().map(str::to_owned), 1055 about: metadata.about().map(str::to_owned), 1056 private: metadata.private(), 1057 restricted: metadata.restricted(), 1058 hidden: metadata.hidden(), 1059 closed: metadata.closed(), 1060 supported_kinds: SupportedKindsDocument::from_supported(metadata.supported_kinds()), 1061 } 1062 } 1063 1064 fn into_metadata(self) -> Result<GroupMetadata, GroupError> { 1065 Ok(GroupMetadata::from_parts( 1066 GroupMetadataText::new(self.name, self.picture, self.about), 1067 GroupMetadataFlags::new(self.private, self.restricted, self.hidden, self.closed), 1068 self.supported_kinds.into_supported()?, 1069 )) 1070 } 1071 } 1072 1073 #[derive(Debug, Clone, Serialize, Deserialize)] 1074 #[serde(tag = "mode", content = "kinds")] 1075 enum SupportedKindsDocument { 1076 UnspecifiedAll, 1077 None, 1078 Only(Vec<u32>), 1079 } 1080 1081 impl SupportedKindsDocument { 1082 fn from_supported(value: &SupportedKinds) -> Self { 1083 match value { 1084 SupportedKinds::UnspecifiedAll => Self::UnspecifiedAll, 1085 SupportedKinds::None => Self::None, 1086 SupportedKinds::Only(kinds) => { 1087 Self::Only(kinds.iter().map(|kind| kind.as_u32()).collect()) 1088 } 1089 } 1090 } 1091 1092 fn into_supported(self) -> Result<SupportedKinds, GroupError> { 1093 match self { 1094 Self::UnspecifiedAll => Ok(SupportedKinds::UnspecifiedAll), 1095 Self::None => Ok(SupportedKinds::None), 1096 Self::Only(values) => values 1097 .into_iter() 1098 .map(|value| { 1099 Kind::new(value.into()).map_err(|reason| { 1100 GroupError::invalid( 1101 GroupErrorKind::UnsupportedGroupKind, 1102 format!("supported kind is invalid: {reason}"), 1103 ) 1104 }) 1105 }) 1106 .collect::<Result<BTreeSet<_>, _>>() 1107 .map(SupportedKinds::Only), 1108 } 1109 } 1110 } 1111 1112 #[derive(Debug, Clone, Serialize, Deserialize)] 1113 struct SnapshotIdsDocument { 1114 metadata: Option<String>, 1115 admins: Option<String>, 1116 members: Option<String>, 1117 roles: Option<String>, 1118 state_39004: Option<String>, 1119 } 1120 1121 impl SnapshotIdsDocument { 1122 fn from_snapshots(value: &GroupSnapshotIds) -> Self { 1123 Self { 1124 metadata: value.metadata().map(|id| id.as_str().to_owned()), 1125 admins: value.admins().map(|id| id.as_str().to_owned()), 1126 members: value.members().map(|id| id.as_str().to_owned()), 1127 roles: value.roles().map(|id| id.as_str().to_owned()), 1128 state_39004: value.state_39004().map(|id| id.as_str().to_owned()), 1129 } 1130 } 1131 1132 fn into_snapshots(self) -> Result<GroupSnapshotIds, GroupError> { 1133 Ok(GroupSnapshotIds { 1134 metadata: parse_optional_event_id(self.metadata)?, 1135 admins: parse_optional_event_id(self.admins)?, 1136 members: parse_optional_event_id(self.members)?, 1137 roles: parse_optional_event_id(self.roles)?, 1138 state_39004: parse_optional_event_id(self.state_39004)?, 1139 }) 1140 } 1141 } 1142 1143 #[derive(Debug, Clone, Serialize, Deserialize)] 1144 struct GroupStateDocument { 1145 id: String, 1146 lifecycle: String, 1147 metadata: MetadataDocument, 1148 created_by: String, 1149 created_event_id: String, 1150 created_tuple: TupleDocument, 1151 metadata_tuple: TupleDocument, 1152 deleted_at: Option<u64>, 1153 delete_event_id: Option<String>, 1154 deleted_tuple: Option<TupleDocument>, 1155 snapshots: SnapshotIdsDocument, 1156 } 1157 1158 impl GroupStateDocument { 1159 fn from_state(state: &GroupState) -> Self { 1160 Self { 1161 id: state.id().as_str().to_owned(), 1162 lifecycle: lifecycle_label(state.lifecycle()), 1163 metadata: MetadataDocument::from_metadata(state.metadata()), 1164 created_by: state.created_by().as_str().to_owned(), 1165 created_event_id: state.created_event_id().as_str().to_owned(), 1166 created_tuple: TupleDocument::from_tuple(state.created_tuple()), 1167 metadata_tuple: TupleDocument::from_tuple(state.metadata_tuple()), 1168 deleted_at: state.deleted_at().map(UnixTimestamp::as_u64), 1169 delete_event_id: state.delete_event_id().map(|id| id.as_str().to_owned()), 1170 deleted_tuple: state.deleted_tuple.as_ref().map(TupleDocument::from_tuple), 1171 snapshots: SnapshotIdsDocument::from_snapshots(state.snapshots()), 1172 } 1173 } 1174 1175 fn into_state(self) -> Result<GroupState, GroupError> { 1176 Ok(GroupState { 1177 id: GroupId::new(&self.id)?, 1178 lifecycle: parse_lifecycle(&self.lifecycle)?, 1179 metadata: self.metadata.into_metadata()?, 1180 created_by: PublicKeyHex::new(&self.created_by).map_err(GroupError::internal)?, 1181 created_event_id: EventId::new(&self.created_event_id).map_err(GroupError::internal)?, 1182 created_tuple: self.created_tuple.into_tuple()?, 1183 metadata_tuple: self.metadata_tuple.into_tuple()?, 1184 deleted_at: self.deleted_at.map(UnixTimestamp::new), 1185 delete_event_id: parse_optional_event_id(self.delete_event_id)?, 1186 deleted_tuple: self 1187 .deleted_tuple 1188 .map(TupleDocument::into_tuple) 1189 .transpose()?, 1190 snapshots: self.snapshots.into_snapshots()?, 1191 }) 1192 } 1193 } 1194 1195 #[derive(Debug, Clone, Serialize, Deserialize)] 1196 struct MemberStateDocument { 1197 pubkey: String, 1198 status: String, 1199 roles: Vec<String>, 1200 last_event_id: String, 1201 last_tuple: TupleDocument, 1202 } 1203 1204 impl MemberStateDocument { 1205 fn from_state(state: &MemberState) -> Self { 1206 Self { 1207 pubkey: state.pubkey().as_str().to_owned(), 1208 status: member_status_label(state.status()), 1209 roles: state 1210 .roles() 1211 .iter() 1212 .map(|role| role.as_str().to_owned()) 1213 .collect(), 1214 last_event_id: state.last_event_id().as_str().to_owned(), 1215 last_tuple: TupleDocument::from_tuple(state.last_tuple()), 1216 } 1217 } 1218 1219 fn into_state(self) -> Result<MemberState, GroupError> { 1220 Ok(MemberState::new( 1221 PublicKeyHex::new(&self.pubkey).map_err(GroupError::internal)?, 1222 parse_member_status(&self.status)?, 1223 self.roles 1224 .into_iter() 1225 .map(|role| RoleName::new(&role)) 1226 .collect::<Result<BTreeSet<_>, _>>()?, 1227 EventId::new(&self.last_event_id).map_err(GroupError::internal)?, 1228 self.last_tuple.into_tuple()?, 1229 )) 1230 } 1231 } 1232 1233 #[derive(Debug, Clone, Serialize, Deserialize)] 1234 struct ProjectedRoleDefinitionDocument { 1235 name: String, 1236 capabilities: Vec<String>, 1237 description: Option<String>, 1238 last_event_id: String, 1239 last_tuple: TupleDocument, 1240 } 1241 1242 impl ProjectedRoleDefinitionDocument { 1243 fn from_role(role: &ProjectedRoleDefinition) -> Self { 1244 Self { 1245 name: role.definition().name().as_str().to_owned(), 1246 capabilities: role 1247 .definition() 1248 .capabilities() 1249 .labels() 1250 .into_iter() 1251 .map(str::to_owned) 1252 .collect(), 1253 description: role.definition().description().map(str::to_owned), 1254 last_event_id: role.last_event_id().as_str().to_owned(), 1255 last_tuple: TupleDocument::from_tuple(role.last_tuple()), 1256 } 1257 } 1258 1259 fn into_role(self) -> Result<ProjectedRoleDefinition, GroupError> { 1260 Ok(ProjectedRoleDefinition::new( 1261 RoleDefinition::new( 1262 RoleName::new(&self.name)?, 1263 CapabilitySet::from_labels(&self.capabilities)?, 1264 self.description, 1265 ), 1266 EventId::new(&self.last_event_id).map_err(GroupError::internal)?, 1267 self.last_tuple.into_tuple()?, 1268 )) 1269 } 1270 } 1271 1272 #[derive(Debug, Clone, Serialize, Deserialize)] 1273 struct GroupTombstoneDocument { 1274 group_id: String, 1275 delete_event_id: String, 1276 deleted_at: u64, 1277 deleted_by: String, 1278 last_tuple: TupleDocument, 1279 } 1280 1281 impl GroupTombstoneDocument { 1282 fn from_tombstone(tombstone: &GroupTombstone) -> Self { 1283 Self { 1284 group_id: tombstone.group_id().as_str().to_owned(), 1285 delete_event_id: tombstone.delete_event_id().as_str().to_owned(), 1286 deleted_at: tombstone.deleted_at().as_u64(), 1287 deleted_by: tombstone.deleted_by().as_str().to_owned(), 1288 last_tuple: TupleDocument::from_tuple(tombstone.last_tuple()), 1289 } 1290 } 1291 1292 fn into_tombstone(self) -> Result<GroupTombstone, GroupError> { 1293 Ok(GroupTombstone::new( 1294 GroupId::new(&self.group_id)?, 1295 EventId::new(&self.delete_event_id).map_err(GroupError::internal)?, 1296 UnixTimestamp::new(self.deleted_at), 1297 PublicKeyHex::new(&self.deleted_by).map_err(GroupError::internal)?, 1298 self.last_tuple.into_tuple()?, 1299 )) 1300 } 1301 } 1302 1303 #[derive(Debug, Clone, Serialize, Deserialize)] 1304 struct GroupEventDeletionDocument { 1305 group_id: String, 1306 target_event_id: String, 1307 delete_event_id: String, 1308 deleted_at: u64, 1309 deleted_by: String, 1310 last_tuple: TupleDocument, 1311 } 1312 1313 impl GroupEventDeletionDocument { 1314 fn from_deletion(deletion: &GroupEventDeletion) -> Self { 1315 Self { 1316 group_id: deletion.group_id().as_str().to_owned(), 1317 target_event_id: deletion.target_event_id().as_str().to_owned(), 1318 delete_event_id: deletion.delete_event_id().as_str().to_owned(), 1319 deleted_at: deletion.deleted_at().as_u64(), 1320 deleted_by: deletion.deleted_by().as_str().to_owned(), 1321 last_tuple: TupleDocument::from_tuple(deletion.last_tuple()), 1322 } 1323 } 1324 1325 fn into_deletion(self) -> Result<GroupEventDeletion, GroupError> { 1326 Ok(GroupEventDeletion::new( 1327 GroupId::new(&self.group_id)?, 1328 EventId::new(&self.target_event_id).map_err(GroupError::internal)?, 1329 EventId::new(&self.delete_event_id).map_err(GroupError::internal)?, 1330 UnixTimestamp::new(self.deleted_at), 1331 PublicKeyHex::new(&self.deleted_by).map_err(GroupError::internal)?, 1332 self.last_tuple.into_tuple()?, 1333 )) 1334 } 1335 } 1336 1337 #[derive(Debug, Clone, Serialize, Deserialize)] 1338 struct ProjectionCheckpointDocument { 1339 projection_version: u32, 1340 policy_version: u32, 1341 last_offset: Option<u64>, 1342 rebuilt_at: u64, 1343 } 1344 1345 impl ProjectionCheckpointDocument { 1346 fn from_checkpoint(checkpoint: &ProjectionCheckpoint) -> Self { 1347 Self { 1348 projection_version: checkpoint.projection_version(), 1349 policy_version: checkpoint.policy_version(), 1350 last_offset: checkpoint.last_offset().map(StoreOffset::as_u64), 1351 rebuilt_at: checkpoint.rebuilt_at().as_u64(), 1352 } 1353 } 1354 1355 fn into_checkpoint(self) -> ProjectionCheckpoint { 1356 ProjectionCheckpoint::new( 1357 self.projection_version, 1358 self.policy_version, 1359 self.last_offset.map(StoreOffset::new), 1360 UnixTimestamp::new(self.rebuilt_at), 1361 ) 1362 } 1363 } 1364 1365 fn lifecycle_label(value: GroupLifecycleState) -> String { 1366 match value { 1367 GroupLifecycleState::Active => "active", 1368 GroupLifecycleState::Deleted => "deleted", 1369 } 1370 .to_owned() 1371 } 1372 1373 fn parse_lifecycle(value: &str) -> Result<GroupLifecycleState, GroupError> { 1374 match value { 1375 "active" => Ok(GroupLifecycleState::Active), 1376 "deleted" => Ok(GroupLifecycleState::Deleted), 1377 _ => Err(GroupError::internal(format!( 1378 "unknown group lifecycle {value}" 1379 ))), 1380 } 1381 } 1382 1383 fn member_status_label(value: MemberStatus) -> String { 1384 match value { 1385 MemberStatus::Member => "member", 1386 MemberStatus::Removed => "removed", 1387 } 1388 .to_owned() 1389 } 1390 1391 fn parse_member_status(value: &str) -> Result<MemberStatus, GroupError> { 1392 match value { 1393 "member" => Ok(MemberStatus::Member), 1394 "removed" => Ok(MemberStatus::Removed), 1395 _ => Err(GroupError::internal(format!( 1396 "unknown member status {value}" 1397 ))), 1398 } 1399 } 1400 1401 fn parse_optional_event_id(value: Option<String>) -> Result<Option<EventId>, GroupError> { 1402 value 1403 .as_deref() 1404 .map(EventId::new) 1405 .transpose() 1406 .map_err(GroupError::internal) 1407 } 1408 1409 #[cfg(test)] 1410 mod tests { 1411 use super::{ 1412 CanonicalGroupEvent, GroupEventDeletion, GroupLifecycleState, GroupProjection, 1413 GroupTombstone, MemberStatus, ProjectedRoleDefinition, ProjectionCheckpoint, 1414 ProjectionOrderTuple, StoreOffset, event_deletion_key, group_current_key, 1415 member_current_key, projection_checkpoint_key, rebuild_group_projection, role_current_key, 1416 tombstone_key, 1417 }; 1418 use crate::{ 1419 Capability, CapabilitySet, GroupId, GroupLimitsConfig, KIND_GROUP_CREATE_GROUP, 1420 KIND_GROUP_DELETE_EVENT, KIND_GROUP_DELETE_GROUP, KIND_GROUP_EDIT_METADATA, 1421 KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, RoleDefinition, RoleName, SupportedKinds, 1422 }; 1423 use pocket_types::{Event as PocketEvent, OwnedEvent as PocketOwnedEvent}; 1424 use tangle_protocol::{ 1425 Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent, 1426 event_to_value, 1427 }; 1428 1429 #[test] 1430 fn projection_order_tuple_sorts_by_created_event_and_offset() { 1431 let mut tuples = [ 1432 tuple(2, "b", 1), 1433 tuple(1, "c", 2), 1434 tuple(1, "a", 3), 1435 tuple(1, "a", 1), 1436 ]; 1437 1438 tuples.sort(); 1439 1440 assert_eq!(tuples[0], tuple(1, "a", 1)); 1441 assert_eq!(tuples[1], tuple(1, "a", 3)); 1442 assert_eq!(tuples[2], tuple(1, "c", 2)); 1443 assert_eq!(tuples[3], tuple(2, "b", 1)); 1444 } 1445 1446 #[test] 1447 fn projection_applies_create_metadata_member_delete_and_snapshots() { 1448 let mut projection = GroupProjection::new(); 1449 projection 1450 .apply_canonical_event( 1451 &event( 1452 KIND_GROUP_CREATE_GROUP, 1453 "10", 1454 10, 1455 vec![ 1456 Tag::from_parts("h", &["Farm"]).expect("h"), 1457 Tag::from_parts("name", &["Farmers"]).expect("name"), 1458 ], 1459 ), 1460 StoreOffset::new(1), 1461 GroupLimitsConfig::default(), 1462 ) 1463 .expect("create"); 1464 projection 1465 .apply_canonical_event( 1466 &event( 1467 KIND_GROUP_EDIT_METADATA, 1468 "20", 1469 20, 1470 vec![ 1471 Tag::from_parts("h", &["Farm"]).expect("h"), 1472 Tag::from_parts("name", &["Market"]).expect("name"), 1473 ], 1474 ), 1475 StoreOffset::new(2), 1476 GroupLimitsConfig::default(), 1477 ) 1478 .expect("metadata"); 1479 projection 1480 .apply_canonical_event( 1481 &event( 1482 KIND_GROUP_PUT_USER, 1483 "30", 1484 30, 1485 vec![ 1486 Tag::from_parts("h", &["Farm"]).expect("h"), 1487 Tag::from_parts("p", &[&"8".repeat(64)]).expect("p"), 1488 Tag::from_parts("role", &["moderator"]).expect("role"), 1489 ], 1490 ), 1491 StoreOffset::new(3), 1492 GroupLimitsConfig::default(), 1493 ) 1494 .expect("member"); 1495 projection 1496 .apply_canonical_event( 1497 &event( 1498 KIND_GROUP_METADATA, 1499 "40", 1500 40, 1501 vec![ 1502 Tag::from_parts("d", &["Farm"]).expect("d"), 1503 Tag::from_parts("name", &["Snapshot"]).expect("name"), 1504 ], 1505 ), 1506 StoreOffset::new(4), 1507 GroupLimitsConfig::default(), 1508 ) 1509 .expect("snapshot"); 1510 projection 1511 .apply_canonical_event( 1512 &event( 1513 KIND_GROUP_DELETE_EVENT, 1514 "45", 1515 45, 1516 vec![ 1517 Tag::from_parts("h", &["Farm"]).expect("h"), 1518 Tag::from_parts("e", &[id("30")]).expect("e"), 1519 ], 1520 ), 1521 StoreOffset::new(5), 1522 GroupLimitsConfig::default(), 1523 ) 1524 .expect("event deletion"); 1525 projection 1526 .apply_canonical_event( 1527 &event( 1528 KIND_GROUP_DELETE_GROUP, 1529 "50", 1530 50, 1531 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 1532 ), 1533 StoreOffset::new(6), 1534 GroupLimitsConfig::default(), 1535 ) 1536 .expect("delete"); 1537 1538 let group = projection 1539 .group(&GroupId::new("Farm").expect("group")) 1540 .expect("group"); 1541 assert_eq!(group.metadata().name(), Some("Snapshot")); 1542 assert_eq!(group.lifecycle(), GroupLifecycleState::Deleted); 1543 assert_eq!( 1544 group.snapshots().metadata().expect("snapshot").as_str(), 1545 id("40") 1546 ); 1547 let member = projection 1548 .member( 1549 &GroupId::new("Farm").expect("group"), 1550 &PublicKeyHex::new(&"8".repeat(64)).expect("pubkey"), 1551 ) 1552 .expect("member"); 1553 assert_eq!(member.status(), MemberStatus::Member); 1554 assert!( 1555 member 1556 .roles() 1557 .contains(&crate::RoleName::new("moderator").expect("role")) 1558 ); 1559 assert!( 1560 projection 1561 .tombstone(&GroupId::new("Farm").expect("group")) 1562 .is_some() 1563 ); 1564 assert!( 1565 projection 1566 .event_deletion(&EventId::new(id("30")).expect("event")) 1567 .is_some() 1568 ); 1569 } 1570 1571 #[test] 1572 fn projection_applies_pocket_event_views_equivalent_to_protocol_events() { 1573 let limits = GroupLimitsConfig::default(); 1574 let events = projection_event_stream(); 1575 let mut protocol_projection = GroupProjection::new(); 1576 let mut pocket_projection = GroupProjection::new(); 1577 1578 for (event, offset) in &events { 1579 protocol_projection 1580 .apply_canonical_event(event, *offset, limits) 1581 .expect("protocol event"); 1582 let pocket = pocket_event(event); 1583 pocket_projection 1584 .apply_canonical_event(&pocket, *offset, limits) 1585 .expect("pocket event"); 1586 } 1587 1588 assert_eq!(protocol_projection.groups(), pocket_projection.groups()); 1589 assert_eq!(protocol_projection.members(), pocket_projection.members()); 1590 assert_eq!(protocol_projection.roles(), pocket_projection.roles()); 1591 assert_eq!( 1592 protocol_projection.tombstones(), 1593 pocket_projection.tombstones() 1594 ); 1595 assert_eq!( 1596 protocol_projection.event_deletions(), 1597 pocket_projection.event_deletions() 1598 ); 1599 } 1600 1601 #[test] 1602 fn projection_rebuild_sorts_before_applying_last_tuple_wins() { 1603 let report = rebuild_group_projection( 1604 [ 1605 canonical_event( 1606 KIND_GROUP_EDIT_METADATA, 1607 "30", 1608 30, 1609 vec![ 1610 Tag::from_parts("h", &["Farm"]).expect("h"), 1611 Tag::from_parts("name", &["New"]).expect("name"), 1612 ], 1613 3, 1614 ), 1615 canonical_event( 1616 1, 1617 "40", 1618 5, 1619 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 1620 99, 1621 ), 1622 canonical_event( 1623 KIND_GROUP_CREATE_GROUP, 1624 "10", 1625 10, 1626 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 1627 1, 1628 ), 1629 canonical_event( 1630 KIND_GROUP_EDIT_METADATA, 1631 "20", 1632 20, 1633 vec![ 1634 Tag::from_parts("h", &["Farm"]).expect("h"), 1635 Tag::from_parts("name", &["Old"]).expect("name"), 1636 ], 1637 2, 1638 ), 1639 ], 1640 GroupLimitsConfig::default(), 1641 UnixTimestamp::new(100), 1642 ) 1643 .expect("rebuild"); 1644 1645 let group = report 1646 .projection() 1647 .group(&GroupId::new("Farm").expect("group")) 1648 .expect("group"); 1649 1650 assert_eq!(group.metadata().name(), Some("New")); 1651 assert_eq!(report.applied_events(), 3); 1652 assert_eq!(report.ignored_events(), 1); 1653 assert_eq!(report.last_offset(), Some(StoreOffset::new(99))); 1654 assert!( 1655 report 1656 .projection() 1657 .checkpoint() 1658 .expect("checkpoint") 1659 .matches_current_versions() 1660 ); 1661 } 1662 1663 #[test] 1664 fn projection_rebuild_matches_incremental_projection_for_full_event_stream() { 1665 let limits = GroupLimitsConfig::default(); 1666 let events = vec![ 1667 canonical_event( 1668 KIND_GROUP_EDIT_METADATA, 1669 "20", 1670 20, 1671 vec![ 1672 Tag::from_parts("h", &["Farm"]).expect("h"), 1673 Tag::from_parts("name", &["Market"]).expect("name"), 1674 ], 1675 2, 1676 ), 1677 canonical_event( 1678 1, 1679 "b", 1680 15, 1681 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 1682 7, 1683 ), 1684 canonical_event( 1685 KIND_GROUP_DELETE_GROUP, 1686 "50", 1687 50, 1688 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 1689 6, 1690 ), 1691 canonical_event( 1692 KIND_GROUP_CREATE_GROUP, 1693 "10", 1694 10, 1695 vec![ 1696 Tag::from_parts("h", &["Farm"]).expect("h"), 1697 Tag::from_parts("name", &["Farmers"]).expect("name"), 1698 ], 1699 1, 1700 ), 1701 canonical_event( 1702 KIND_GROUP_PUT_USER, 1703 "30", 1704 30, 1705 vec![ 1706 Tag::from_parts("h", &["Farm"]).expect("h"), 1707 Tag::from_parts("p", &[&"8".repeat(64)]).expect("p"), 1708 Tag::from_parts("role", &["moderator"]).expect("role"), 1709 ], 1710 3, 1711 ), 1712 canonical_event(1, "a", 5, Vec::new(), 8), 1713 canonical_event( 1714 KIND_GROUP_METADATA, 1715 "40", 1716 40, 1717 vec![ 1718 Tag::from_parts("d", &["Farm"]).expect("d"), 1719 Tag::from_parts("name", &["Snapshot"]).expect("name"), 1720 ], 1721 4, 1722 ), 1723 canonical_event( 1724 KIND_GROUP_DELETE_EVENT, 1725 "45", 1726 45, 1727 vec![ 1728 Tag::from_parts("h", &["Farm"]).expect("h"), 1729 Tag::from_parts("e", &[id("30")]).expect("e"), 1730 ], 1731 5, 1732 ), 1733 ]; 1734 let mut incremental_events = events.clone(); 1735 incremental_events.sort_by_key(CanonicalGroupEvent::tuple); 1736 let mut incremental = GroupProjection::new(); 1737 for item in &incremental_events { 1738 incremental 1739 .apply_canonical_event(item.event(), item.store_offset(), limits) 1740 .expect("incremental"); 1741 } 1742 1743 let report = 1744 rebuild_group_projection(events, limits, UnixTimestamp::new(100)).expect("rebuild"); 1745 let rebuilt = report.projection(); 1746 1747 assert_eq!(report.applied_events(), 6); 1748 assert_eq!(report.ignored_events(), 1); 1749 assert_eq!(report.skipped_events(), 1); 1750 assert_eq!(report.last_offset(), Some(StoreOffset::new(8))); 1751 assert_eq!(incremental.groups(), rebuilt.groups()); 1752 assert_eq!(incremental.members(), rebuilt.members()); 1753 assert_eq!(incremental.roles(), rebuilt.roles()); 1754 assert_eq!(incremental.tombstones(), rebuilt.tombstones()); 1755 assert_eq!(incremental.event_deletions(), rebuilt.event_deletions()); 1756 assert_eq!( 1757 rebuilt 1758 .checkpoint() 1759 .and_then(ProjectionCheckpoint::last_offset), 1760 Some(StoreOffset::new(8)) 1761 ); 1762 } 1763 1764 #[test] 1765 fn projection_records_round_trip_for_persistence() { 1766 let base_tuple = tuple(10, "10", 1); 1767 let state = super::GroupState::new( 1768 GroupId::new("Farm").expect("group"), 1769 crate::GroupMetadata::from_parts( 1770 crate::GroupMetadataText::new(Some("Farmers".to_owned()), None, None), 1771 crate::GroupMetadataFlags::new(true, false, false, false), 1772 SupportedKinds::UnspecifiedAll, 1773 ), 1774 PublicKeyHex::new(&"1".repeat(64)).expect("pubkey"), 1775 EventId::new(id("10")).expect("id"), 1776 base_tuple.clone(), 1777 ); 1778 let member = super::MemberState::new( 1779 PublicKeyHex::new(&"2".repeat(64)).expect("pubkey"), 1780 MemberStatus::Member, 1781 [RoleName::new("moderator").expect("role")] 1782 .into_iter() 1783 .collect(), 1784 EventId::new(id("20")).expect("id"), 1785 base_tuple, 1786 ); 1787 let role = ProjectedRoleDefinition::new( 1788 RoleDefinition::new( 1789 RoleName::new("moderator").expect("role"), 1790 CapabilitySet::new([Capability::ManageMembers]), 1791 Some("member manager".to_owned()), 1792 ), 1793 EventId::new(id("30")).expect("id"), 1794 tuple(10, "30", 3), 1795 ); 1796 let tombstone = GroupTombstone::new( 1797 GroupId::new("Farm").expect("group"), 1798 EventId::new(id("40")).expect("id"), 1799 UnixTimestamp::new(40), 1800 PublicKeyHex::new(&"3".repeat(64)).expect("pubkey"), 1801 tuple(40, "40", 4), 1802 ); 1803 let deletion = GroupEventDeletion::new( 1804 GroupId::new("Farm").expect("group"), 1805 EventId::new(id("30")).expect("target"), 1806 EventId::new(id("45")).expect("delete"), 1807 UnixTimestamp::new(45), 1808 PublicKeyHex::new(&"3".repeat(64)).expect("pubkey"), 1809 tuple(45, "45", 5), 1810 ); 1811 let checkpoint = 1812 ProjectionCheckpoint::current(Some(StoreOffset::new(25)), UnixTimestamp::new(99)); 1813 1814 assert_eq!( 1815 super::GroupState::from_json_bytes(&state.to_json_bytes().expect("bytes")) 1816 .expect("state"), 1817 state 1818 ); 1819 assert_eq!( 1820 super::MemberState::from_json_bytes(&member.to_json_bytes().expect("bytes")) 1821 .expect("member"), 1822 member 1823 ); 1824 assert_eq!( 1825 ProjectedRoleDefinition::from_json_bytes(&role.to_json_bytes().expect("bytes")) 1826 .expect("role"), 1827 role 1828 ); 1829 assert_eq!( 1830 GroupTombstone::from_json_bytes(&tombstone.to_json_bytes().expect("bytes")) 1831 .expect("tombstone"), 1832 tombstone 1833 ); 1834 assert_eq!( 1835 GroupEventDeletion::from_json_bytes(&deletion.to_json_bytes().expect("bytes")) 1836 .expect("deletion"), 1837 deletion 1838 ); 1839 assert_eq!( 1840 ProjectionCheckpoint::from_json_bytes(&checkpoint.to_json_bytes().expect("bytes")) 1841 .expect("checkpoint"), 1842 checkpoint 1843 ); 1844 } 1845 1846 #[test] 1847 fn projection_storage_keys_are_deterministic() { 1848 let group = GroupId::new("Farm").expect("group"); 1849 let pubkey = PublicKeyHex::new(&"4".repeat(64)).expect("pubkey"); 1850 1851 assert_eq!(group_current_key(&group), b"group\0Farm".to_vec()); 1852 assert_eq!( 1853 member_current_key(&group, &pubkey), 1854 format!("member\0Farm\0{}", "4".repeat(64)).into_bytes() 1855 ); 1856 assert_eq!( 1857 role_current_key(&group, &RoleName::new("moderator").expect("role")), 1858 b"role\0Farm\0moderator".to_vec() 1859 ); 1860 assert_eq!(tombstone_key(&group), b"tombstone\0Farm".to_vec()); 1861 assert_eq!( 1862 event_deletion_key(&EventId::new(id("30")).expect("event")), 1863 format!("event_deletion\0{}", id("30")).into_bytes() 1864 ); 1865 assert_eq!(projection_checkpoint_key(), b"checkpoint\0groups".to_vec()); 1866 } 1867 1868 fn tuple(created_at: u64, event_suffix: &str, offset: u64) -> ProjectionOrderTuple { 1869 ProjectionOrderTuple::new( 1870 UnixTimestamp::new(created_at), 1871 EventId::new(id(event_suffix)).expect("id"), 1872 StoreOffset::new(offset), 1873 ) 1874 } 1875 1876 fn event(kind: u32, suffix: &str, created_at: u64, tags: Vec<Tag>) -> Event { 1877 Event::new( 1878 EventId::new(id(suffix)).expect("id"), 1879 UnsignedEvent::new( 1880 PublicKeyHex::new(&"1".repeat(64)).expect("pubkey"), 1881 UnixTimestamp::new(created_at), 1882 Kind::new(kind.into()).expect("kind"), 1883 tags, 1884 "", 1885 ), 1886 SignatureHex::new(&"2".repeat(128)).expect("sig"), 1887 ) 1888 } 1889 1890 fn projection_event_stream() -> Vec<(Event, StoreOffset)> { 1891 vec![ 1892 ( 1893 event( 1894 KIND_GROUP_CREATE_GROUP, 1895 "10", 1896 10, 1897 vec![ 1898 Tag::from_parts("h", &["Farm"]).expect("h"), 1899 Tag::from_parts("name", &["Farmers"]).expect("name"), 1900 ], 1901 ), 1902 StoreOffset::new(1), 1903 ), 1904 ( 1905 event( 1906 KIND_GROUP_EDIT_METADATA, 1907 "20", 1908 20, 1909 vec![ 1910 Tag::from_parts("h", &["Farm"]).expect("h"), 1911 Tag::from_parts("name", &["Market"]).expect("name"), 1912 ], 1913 ), 1914 StoreOffset::new(2), 1915 ), 1916 ( 1917 event( 1918 KIND_GROUP_PUT_USER, 1919 "30", 1920 30, 1921 vec![ 1922 Tag::from_parts("h", &["Farm"]).expect("h"), 1923 Tag::from_parts("p", &[&"8".repeat(64)]).expect("p"), 1924 Tag::from_parts("role", &["moderator"]).expect("role"), 1925 ], 1926 ), 1927 StoreOffset::new(3), 1928 ), 1929 ( 1930 event( 1931 KIND_GROUP_METADATA, 1932 "40", 1933 40, 1934 vec![ 1935 Tag::from_parts("d", &["Farm"]).expect("d"), 1936 Tag::from_parts("name", &["Snapshot"]).expect("name"), 1937 ], 1938 ), 1939 StoreOffset::new(4), 1940 ), 1941 ( 1942 event( 1943 KIND_GROUP_DELETE_EVENT, 1944 "45", 1945 45, 1946 vec![ 1947 Tag::from_parts("h", &["Farm"]).expect("h"), 1948 Tag::from_parts("e", &[id("30")]).expect("e"), 1949 ], 1950 ), 1951 StoreOffset::new(5), 1952 ), 1953 ( 1954 event( 1955 KIND_GROUP_DELETE_GROUP, 1956 "50", 1957 50, 1958 vec![Tag::from_parts("h", &["Farm"]).expect("h")], 1959 ), 1960 StoreOffset::new(6), 1961 ), 1962 ] 1963 } 1964 1965 fn canonical_event( 1966 kind: u32, 1967 suffix: &str, 1968 created_at: u64, 1969 tags: Vec<Tag>, 1970 offset: u64, 1971 ) -> CanonicalGroupEvent { 1972 let event = event(kind, suffix, created_at, tags); 1973 CanonicalGroupEvent::new(pocket_event(&event), StoreOffset::new(offset)) 1974 } 1975 1976 fn pocket_event(event: &Event) -> PocketOwnedEvent { 1977 let raw = event_to_value(event).to_string(); 1978 let mut buffer = vec![0; 4096]; 1979 let (_, pocket) = PocketEvent::from_json(raw.as_bytes(), &mut buffer).expect("pocket"); 1980 pocket.to_owned() 1981 } 1982 1983 fn id(suffix: &str) -> &'static str { 1984 match suffix { 1985 "10" => "0000000000000000000000000000000000000000000000000000000000000010", 1986 "20" => "0000000000000000000000000000000000000000000000000000000000000020", 1987 "30" => "0000000000000000000000000000000000000000000000000000000000000030", 1988 "40" => "0000000000000000000000000000000000000000000000000000000000000040", 1989 "45" => "0000000000000000000000000000000000000000000000000000000000000045", 1990 "50" => "0000000000000000000000000000000000000000000000000000000000000050", 1991 "a" => "000000000000000000000000000000000000000000000000000000000000000a", 1992 "b" => "000000000000000000000000000000000000000000000000000000000000000b", 1993 "c" => "000000000000000000000000000000000000000000000000000000000000000c", 1994 _ => "0000000000000000000000000000000000000000000000000000000000000001", 1995 } 1996 } 1997 }