lib.rs (35091B)
1 #![forbid(unsafe_code)] 2 3 use core::fmt; 4 use pocket_db::{ 5 ScreenResult, Store, 6 heed::{Database, types::Bytes}, 7 }; 8 use pocket_types::{ 9 Event, Filter, Hll8, Id, Kind, OwnedEvent, OwnedFilter, OwnedTags, Pubkey, Sig, Tags, Time, 10 }; 11 use std::{ 12 io, 13 path::{Path, PathBuf}, 14 sync::Arc, 15 }; 16 17 pub const POCKET_SOURCE_REPOSITORY: &str = "https://github.com/triesap/pocket"; 18 pub const POCKET_SOURCE_REVISION: &str = "329334f20948c796c6016b673b92551ac4855ad7"; 19 20 pub type PocketEvent = Event; 21 pub type PocketEventId = Id; 22 pub type PocketFilter = Filter; 23 pub type PocketHll8 = Hll8; 24 pub type PocketKind = Kind; 25 pub type PocketOwnedEvent = OwnedEvent; 26 pub type PocketOwnedFilter = OwnedFilter; 27 pub type PocketOwnedTags = OwnedTags; 28 pub type PocketPubkey = Pubkey; 29 pub type PocketSig = Sig; 30 pub type PocketTags = Tags; 31 pub type PocketTime = Time; 32 pub type PocketScreenResult = ScreenResult; 33 pub type PocketStore = Store; 34 pub type PocketExtraRecord = (Vec<u8>, Vec<u8>); 35 pub type PocketExtraRecords = Vec<PocketExtraRecord>; 36 37 #[derive(Debug, Clone, PartialEq, Eq)] 38 pub struct PocketStoredEvent { 39 store_offset: u64, 40 event: PocketOwnedEvent, 41 } 42 43 impl PocketStoredEvent { 44 pub fn new(store_offset: u64, event: PocketOwnedEvent) -> Self { 45 Self { 46 store_offset, 47 event, 48 } 49 } 50 51 pub fn store_offset(&self) -> u64 { 52 self.store_offset 53 } 54 55 pub fn event(&self) -> &PocketEvent { 56 &self.event 57 } 58 59 pub fn into_event(self) -> PocketOwnedEvent { 60 self.event 61 } 62 } 63 64 #[derive(Debug, Clone, PartialEq, Eq)] 65 pub struct PocketScreenedEvents { 66 events: Vec<PocketOwnedEvent>, 67 redacted: bool, 68 } 69 70 impl PocketScreenedEvents { 71 pub fn new(events: Vec<PocketOwnedEvent>, redacted: bool) -> Self { 72 Self { events, redacted } 73 } 74 75 pub fn events(&self) -> &[PocketOwnedEvent] { 76 &self.events 77 } 78 79 pub fn redacted(&self) -> bool { 80 self.redacted 81 } 82 83 pub fn into_events(self) -> Vec<PocketOwnedEvent> { 84 self.events 85 } 86 } 87 88 pub const TANGLE_GROUP_PROJECTION_TABLE: &str = "group_projection"; 89 pub const TANGLE_GROUP_OUTBOX_TABLE: &str = "group_outbox"; 90 pub const TANGLE_GROUP_CHECKPOINT_TABLE: &str = "group_checkpoint"; 91 pub const TANGLE_POCKET_EXTRA_TABLES: [&str; 3] = [ 92 TANGLE_GROUP_PROJECTION_TABLE, 93 TANGLE_GROUP_OUTBOX_TABLE, 94 TANGLE_GROUP_CHECKPOINT_TABLE, 95 ]; 96 97 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 98 pub struct PocketQueryConfig { 99 allow_scraping: bool, 100 allow_scrape_if_limited_to: u32, 101 allow_scrape_if_max_seconds: u64, 102 } 103 104 impl PocketQueryConfig { 105 pub const fn new( 106 allow_scraping: bool, 107 allow_scrape_if_limited_to: u32, 108 allow_scrape_if_max_seconds: u64, 109 ) -> Self { 110 Self { 111 allow_scraping, 112 allow_scrape_if_limited_to, 113 allow_scrape_if_max_seconds, 114 } 115 } 116 117 pub fn allow_scraping(self) -> bool { 118 self.allow_scraping 119 } 120 121 pub fn allow_scrape_if_limited_to(self) -> u32 { 122 self.allow_scrape_if_limited_to 123 } 124 125 pub fn allow_scrape_if_max_seconds(self) -> u64 { 126 self.allow_scrape_if_max_seconds 127 } 128 129 pub fn exact_count(self) -> Self { 130 Self::new( 131 true, 132 self.allow_scrape_if_limited_to, 133 self.allow_scrape_if_max_seconds, 134 ) 135 } 136 } 137 138 impl Default for PocketQueryConfig { 139 fn default() -> Self { 140 Self::new(false, 100, 3_600) 141 } 142 } 143 144 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 145 pub struct PocketDependencyBoundary { 146 source_repository: &'static str, 147 source_revision: &'static str, 148 } 149 150 impl PocketDependencyBoundary { 151 pub fn current() -> Self { 152 Self { 153 source_repository: POCKET_SOURCE_REPOSITORY, 154 source_revision: POCKET_SOURCE_REVISION, 155 } 156 } 157 158 pub fn source_repository(&self) -> &'static str { 159 self.source_repository 160 } 161 162 pub fn source_revision(&self) -> &'static str { 163 self.source_revision 164 } 165 } 166 167 #[derive(Clone)] 168 pub struct PocketStoreHandle { 169 store: Arc<PocketStore>, 170 sync_policy: PocketSyncPolicy, 171 } 172 173 impl PocketStoreHandle { 174 pub fn open(config: &PocketStoreConfig) -> Result<Self, PocketStoreError> { 175 std::fs::create_dir_all(config.data_directory()) 176 .map_err(|error| PocketStoreError::from_create_dir(config.data_directory(), error))?; 177 let store = PocketStore::new(config.data_directory(), TANGLE_POCKET_EXTRA_TABLES.to_vec()) 178 .map_err(PocketStoreError::from_pocket)?; 179 Ok(Self { 180 store: Arc::new(store), 181 sync_policy: config.sync_policy(), 182 }) 183 } 184 185 pub fn dir(&self) -> &Path { 186 self.store.dir() 187 } 188 189 pub fn sync(&self) -> Result<(), PocketStoreError> { 190 self.store.sync().map_err(PocketStoreError::from_pocket) 191 } 192 193 pub fn sync_policy(&self) -> PocketSyncPolicy { 194 self.sync_policy 195 } 196 197 pub fn store_event(&self, event: &PocketEvent) -> Result<u64, PocketStoreError> { 198 let offset = self 199 .store 200 .store_event(event) 201 .map_err(PocketStoreError::from_pocket)?; 202 self.sync_after_write()?; 203 Ok(offset) 204 } 205 206 pub fn event_by_id( 207 &self, 208 event_id: PocketEventId, 209 ) -> Result<Option<PocketOwnedEvent>, PocketStoreError> { 210 self.store 211 .get_event_by_id(event_id) 212 .map(|event| event.map(PocketEvent::to_owned)) 213 .map_err(PocketStoreError::from_pocket) 214 } 215 216 pub fn event_by_offset(&self, offset: u64) -> Result<PocketOwnedEvent, PocketStoreError> { 217 self.store 218 .get_event_by_offset(offset) 219 .map(PocketEvent::to_owned) 220 .map_err(PocketStoreError::from_pocket) 221 } 222 223 pub fn find_events( 224 &self, 225 filter: &PocketFilter, 226 query: PocketQueryConfig, 227 ) -> Result<Vec<PocketOwnedEvent>, PocketStoreError> { 228 self.find_events_with_screen(filter, query, |_| PocketScreenResult::Match) 229 .map(PocketScreenedEvents::into_events) 230 } 231 232 pub fn find_events_with_screen<F>( 233 &self, 234 filter: &PocketFilter, 235 query: PocketQueryConfig, 236 screen: F, 237 ) -> Result<PocketScreenedEvents, PocketStoreError> 238 where 239 F: Fn(&PocketEvent) -> PocketScreenResult, 240 { 241 let (events, redacted) = self 242 .store 243 .find_events( 244 filter, 245 query.allow_scraping(), 246 query.allow_scrape_if_limited_to(), 247 query.allow_scrape_if_max_seconds(), 248 screen, 249 ) 250 .map_err(PocketStoreError::from_pocket)?; 251 Ok(PocketScreenedEvents::new( 252 events.into_iter().map(PocketEvent::to_owned).collect(), 253 redacted, 254 )) 255 } 256 257 pub fn count_events( 258 &self, 259 filter: &PocketFilter, 260 query: PocketQueryConfig, 261 ) -> Result<u64, PocketStoreError> { 262 self.find_events(filter, query) 263 .map(|events| u64::try_from(events.len()).expect("usize count fits in u64")) 264 } 265 266 pub fn scan_events(&self) -> Result<Vec<PocketStoredEvent>, PocketStoreError> { 267 self.scan_events_after(None) 268 } 269 270 pub fn scan_events_after( 271 &self, 272 last_offset: Option<u64>, 273 ) -> Result<Vec<PocketStoredEvent>, PocketStoreError> { 274 let stats = self.store.stats().map_err(PocketStoreError::from_pocket)?; 275 let end = u64::try_from(stats.event_bytes) 276 .map_err(|_| PocketStoreError::invalid("Pocket event map size exceeds u64"))?; 277 let mut offset = match last_offset { 278 Some(offset) => { 279 let event = self 280 .store 281 .get_event_by_offset(offset) 282 .map_err(PocketStoreError::from_pocket)?; 283 next_event_offset(offset, event)? 284 } 285 None => event_map_start_offset(), 286 }; 287 let mut events = Vec::new(); 288 while offset < end { 289 let event = self 290 .store 291 .get_event_by_offset(offset) 292 .map_err(PocketStoreError::from_pocket)?; 293 events.push(PocketStoredEvent::new(offset, event.to_owned())); 294 offset = next_event_offset(offset, event)?; 295 } 296 Ok(events) 297 } 298 299 pub fn put_extra_record( 300 &self, 301 table: &'static str, 302 key: &[u8], 303 value: &[u8], 304 ) -> Result<(), PocketStoreError> { 305 let table_handle = self.extra_table(table)?; 306 let mut txn = self.store.write_txn().map_err(|error| { 307 PocketStoreError::from_extra_table(table, "write transaction", error) 308 })?; 309 table_handle 310 .put(&mut txn, key, value) 311 .map_err(|error| PocketStoreError::from_extra_table(table, "put", error))?; 312 txn.commit() 313 .map_err(|error| PocketStoreError::from_extra_table(table, "commit", error))?; 314 self.sync_after_write() 315 } 316 317 pub fn get_extra_record( 318 &self, 319 table: &'static str, 320 key: &[u8], 321 ) -> Result<Option<Vec<u8>>, PocketStoreError> { 322 let table_handle = self.extra_table(table)?; 323 let txn = self.store.read_txn().map_err(|error| { 324 PocketStoreError::from_extra_table(table, "read transaction", error) 325 })?; 326 table_handle 327 .get(&txn, key) 328 .map(|value| value.map(<[u8]>::to_vec)) 329 .map_err(|error| PocketStoreError::from_extra_table(table, "get", error)) 330 } 331 332 pub fn delete_extra_record( 333 &self, 334 table: &'static str, 335 key: &[u8], 336 ) -> Result<(), PocketStoreError> { 337 let table_handle = self.extra_table(table)?; 338 let mut txn = self.store.write_txn().map_err(|error| { 339 PocketStoreError::from_extra_table(table, "write transaction", error) 340 })?; 341 table_handle 342 .delete(&mut txn, key) 343 .map_err(|error| PocketStoreError::from_extra_table(table, "delete", error))?; 344 txn.commit() 345 .map_err(|error| PocketStoreError::from_extra_table(table, "commit", error))?; 346 self.sync_after_write() 347 } 348 349 pub fn scan_extra_records( 350 &self, 351 table: &'static str, 352 ) -> Result<PocketExtraRecords, PocketStoreError> { 353 let table_handle = self.extra_table(table)?; 354 let txn = self.store.read_txn().map_err(|error| { 355 PocketStoreError::from_extra_table(table, "read transaction", error) 356 })?; 357 let mut records = Vec::new(); 358 let iter = table_handle 359 .iter(&txn) 360 .map_err(|error| PocketStoreError::from_extra_table(table, "scan", error))?; 361 for item in iter { 362 let (key, value) = 363 item.map_err(|error| PocketStoreError::from_extra_table(table, "scan", error))?; 364 records.push((key.to_vec(), value.to_vec())); 365 } 366 Ok(records) 367 } 368 369 fn extra_table(&self, table: &'static str) -> Result<Database<Bytes, Bytes>, PocketStoreError> { 370 self.store 371 .extra_table(table) 372 .ok_or_else(|| PocketStoreError::missing_table(table)) 373 } 374 375 fn sync_after_write(&self) -> Result<(), PocketStoreError> { 376 match self.sync_policy { 377 PocketSyncPolicy::FlushOnWrite => self.sync(), 378 PocketSyncPolicy::FlushOnShutdown => Ok(()), 379 } 380 } 381 } 382 383 pub fn parse_pocket_event_json(raw: &[u8]) -> Result<PocketOwnedEvent, PocketStoreError> { 384 if raw.is_empty() { 385 return Err(PocketStoreError::invalid( 386 "pocket event JSON must not be empty", 387 )); 388 } 389 let mut buffer = vec![0; pocket_json_buffer_len(raw.len())]; 390 let (_, event) = 391 PocketEvent::from_json(raw, &mut buffer).map_err(PocketStoreError::from_pocket_types)?; 392 Ok(event.to_owned()) 393 } 394 395 pub fn parse_pocket_filter_json(raw: &[u8]) -> Result<PocketOwnedFilter, PocketStoreError> { 396 if raw.is_empty() { 397 return Err(PocketStoreError::invalid( 398 "pocket filter JSON must not be empty", 399 )); 400 } 401 let mut buffer = vec![0; pocket_json_buffer_len(raw.len())]; 402 let (_, _, filter) = 403 PocketFilter::from_json(raw, &mut buffer).map_err(PocketStoreError::from_pocket_types)?; 404 Ok(filter.to_owned()) 405 } 406 407 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 408 pub enum PocketSyncPolicy { 409 FlushOnWrite, 410 FlushOnShutdown, 411 } 412 413 #[derive(Debug, Clone, PartialEq, Eq)] 414 pub struct PocketStoreConfig { 415 data_directory: PathBuf, 416 sync_policy: PocketSyncPolicy, 417 } 418 419 impl PocketStoreConfig { 420 pub fn new( 421 data_directory: impl Into<PathBuf>, 422 sync_policy: PocketSyncPolicy, 423 ) -> Result<Self, PocketConfigError> { 424 let config = Self { 425 data_directory: data_directory.into(), 426 sync_policy, 427 }; 428 config.validate()?; 429 Ok(config) 430 } 431 432 pub fn validate(&self) -> Result<(), PocketConfigError> { 433 if self.data_directory.as_os_str().is_empty() { 434 return Err(PocketConfigError::invalid( 435 "pocket.data_directory must not be empty", 436 )); 437 } 438 Ok(()) 439 } 440 441 pub fn data_directory(&self) -> &Path { 442 &self.data_directory 443 } 444 445 pub fn sync_policy(&self) -> PocketSyncPolicy { 446 self.sync_policy 447 } 448 } 449 450 #[derive(Debug, Clone, PartialEq, Eq)] 451 pub struct PocketConfigError { 452 message: String, 453 } 454 455 impl PocketConfigError { 456 pub fn invalid(message: impl Into<String>) -> Self { 457 Self { 458 message: message.into(), 459 } 460 } 461 462 pub fn message(&self) -> &str { 463 &self.message 464 } 465 } 466 467 impl fmt::Display for PocketConfigError { 468 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { 469 formatter.write_str(&self.message) 470 } 471 } 472 473 impl std::error::Error for PocketConfigError {} 474 475 #[derive(Debug, Clone, PartialEq, Eq)] 476 pub struct PocketStoreError { 477 message: String, 478 } 479 480 impl PocketStoreError { 481 pub fn invalid(message: impl Into<String>) -> Self { 482 Self { 483 message: message.into(), 484 } 485 } 486 487 pub fn from_create_dir(path: &Path, error: io::Error) -> Self { 488 Self { 489 message: format!( 490 "failed to create Pocket store directory {}: {error}", 491 path.display() 492 ), 493 } 494 } 495 496 pub fn from_pocket(error: pocket_db::Error) -> Self { 497 Self { 498 message: error.to_string(), 499 } 500 } 501 502 pub fn from_pocket_types(error: pocket_types::Error) -> Self { 503 Self { 504 message: error.to_string(), 505 } 506 } 507 508 pub fn missing_table(table: &'static str) -> Self { 509 Self { 510 message: format!("missing Pocket extra table {table}"), 511 } 512 } 513 514 pub fn from_extra_table( 515 table: &'static str, 516 operation: &'static str, 517 error: impl fmt::Display, 518 ) -> Self { 519 Self { 520 message: format!("Pocket extra table {table} {operation} failed: {error}"), 521 } 522 } 523 524 pub fn message(&self) -> &str { 525 &self.message 526 } 527 } 528 529 impl fmt::Display for PocketStoreError { 530 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { 531 formatter.write_str(&self.message) 532 } 533 } 534 535 impl std::error::Error for PocketStoreError {} 536 537 fn pocket_json_buffer_len(raw_len: usize) -> usize { 538 raw_len.saturating_mul(2).max(4096) 539 } 540 541 fn event_map_start_offset() -> u64 { 542 u64::try_from(std::mem::size_of::<usize>()).expect("usize header size fits u64") 543 } 544 545 fn align_event_offset(offset: u64) -> u64 { 546 if offset.is_multiple_of(8) { 547 offset 548 } else { 549 offset + (8 - offset % 8) 550 } 551 } 552 553 fn next_event_offset(offset: u64, event: &PocketEvent) -> Result<u64, PocketStoreError> { 554 let next = offset 555 .checked_add(event_len_u64(event)?) 556 .ok_or_else(|| PocketStoreError::invalid("Pocket event offset exceeds u64"))?; 557 Ok(align_event_offset(next)) 558 } 559 560 fn event_len_u64(event: &PocketEvent) -> Result<u64, PocketStoreError> { 561 u64::try_from(event.len()) 562 .map_err(|_| PocketStoreError::invalid("Pocket event size exceeds u64")) 563 } 564 565 #[cfg(test)] 566 mod tests { 567 use super::{ 568 POCKET_SOURCE_REPOSITORY, POCKET_SOURCE_REVISION, PocketDependencyBoundary, 569 PocketQueryConfig, PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy, 570 TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE, 571 TANGLE_POCKET_EXTRA_TABLES, parse_pocket_event_json, parse_pocket_filter_json, 572 }; 573 use pocket_db::ScreenResult; 574 use std::time::{SystemTime, UNIX_EPOCH}; 575 576 #[test] 577 fn pocket_dependency_boundary_pins_triesap_revision() { 578 let boundary = PocketDependencyBoundary::current(); 579 580 assert_eq!( 581 boundary.source_repository(), 582 "https://github.com/triesap/pocket" 583 ); 584 assert_eq!(boundary.source_repository(), POCKET_SOURCE_REPOSITORY); 585 assert_eq!( 586 boundary.source_revision(), 587 "329334f20948c796c6016b673b92551ac4855ad7" 588 ); 589 assert_eq!(boundary.source_revision(), POCKET_SOURCE_REVISION); 590 } 591 592 #[test] 593 fn pocket_dependency_boundary_matches_manifest_and_lock_state() { 594 let store_manifest = include_str!("../Cargo.toml"); 595 let groups_manifest = include_str!("../../tangle_groups/Cargo.toml"); 596 let lockfile = include_str!("../../../Cargo.lock"); 597 let approved_source = format!("git = \"{}\"", POCKET_SOURCE_REPOSITORY); 598 let approved_revision = format!("rev = \"{}\"", POCKET_SOURCE_REVISION); 599 let approved_lock_source = format!( 600 "git+{}?rev={}#{}", 601 POCKET_SOURCE_REPOSITORY, POCKET_SOURCE_REVISION, POCKET_SOURCE_REVISION 602 ); 603 604 for manifest in [store_manifest, groups_manifest] { 605 assert!(!manifest.contains("mikedilger/pocket")); 606 assert!(manifest.contains(&approved_source)); 607 assert!(manifest.contains(&approved_revision)); 608 } 609 assert!(!lockfile.contains("mikedilger/pocket")); 610 assert!(lockfile.contains(&approved_lock_source)); 611 } 612 613 #[test] 614 fn pocket_query_config_exact_count_enables_scrape_scan() { 615 let config = PocketQueryConfig::new(false, 7, 11).exact_count(); 616 617 assert!(config.allow_scraping()); 618 assert_eq!(config.allow_scrape_if_limited_to(), 7); 619 assert_eq!(config.allow_scrape_if_max_seconds(), 11); 620 } 621 622 #[test] 623 fn pocket_store_handle_opens_syncs_and_exposes_tangle_tables() { 624 let root = std::env::temp_dir().join(format!("tangle-pocket-store-{}", std::process::id())); 625 let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown) 626 .expect("config"); 627 628 let handle = PocketStoreHandle::open(&config).expect("open"); 629 630 assert_eq!(handle.dir(), config.data_directory()); 631 assert_eq!(handle.sync_policy(), PocketSyncPolicy::FlushOnShutdown); 632 assert_eq!( 633 TANGLE_POCKET_EXTRA_TABLES, 634 ["group_projection", "group_outbox", "group_checkpoint"] 635 ); 636 handle.sync().expect("sync"); 637 638 let _ = std::fs::remove_dir_all(root); 639 } 640 641 #[test] 642 fn pocket_store_handle_clones_share_one_store_boundary() { 643 let root = temp_root("tangle-pocket-shared"); 644 let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown) 645 .expect("config"); 646 let writer = PocketStoreHandle::open(&config).expect("open"); 647 let reader = writer.clone(); 648 let event = parse_pocket_event_json(event_json().as_bytes()).expect("event"); 649 let filter = parse_pocket_filter_json(filter_json().as_bytes()).expect("filter"); 650 651 let offset = writer.store_event(&event).expect("store"); 652 let stored = reader.event_by_offset(offset).expect("offset"); 653 let found = reader 654 .find_events(&filter, PocketQueryConfig::default()) 655 .expect("find"); 656 657 assert_eq!(stored.id(), event.id()); 658 assert_eq!(found.len(), 1); 659 assert_eq!(found[0].id(), event.id()); 660 661 let _ = std::fs::remove_dir_all(root); 662 } 663 664 #[test] 665 fn pocket_store_handle_stores_queries_and_counts_events() { 666 let root = std::env::temp_dir().join(format!("tangle-pocket-query-{}", std::process::id())); 667 let _ = std::fs::remove_dir_all(&root); 668 let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown) 669 .expect("config"); 670 let handle = PocketStoreHandle::open(&config).expect("open"); 671 let event = parse_pocket_event_json(event_json().as_bytes()).expect("event"); 672 let filter = parse_pocket_filter_json(filter_json().as_bytes()).expect("filter"); 673 674 let offset = handle.store_event(&event).expect("store"); 675 let stored = handle 676 .event_by_id(event.id()) 677 .expect("lookup") 678 .expect("event"); 679 let offset_event = handle.event_by_offset(offset).expect("offset lookup"); 680 let found = handle 681 .find_events(&filter, PocketQueryConfig::default()) 682 .expect("find"); 683 684 assert_eq!(stored.id(), event.id()); 685 assert_eq!(offset_event.id(), event.id()); 686 assert_eq!(found.len(), 1); 687 assert_eq!(found[0].id(), event.id()); 688 assert_eq!( 689 handle 690 .count_events(&filter, PocketQueryConfig::default()) 691 .expect("count"), 692 1 693 ); 694 695 let _ = std::fs::remove_dir_all(root); 696 } 697 698 #[test] 699 fn pocket_store_handle_scans_canonical_events_with_offsets() { 700 let root = temp_root("tangle-pocket-scan"); 701 let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown) 702 .expect("config"); 703 let handle = PocketStoreHandle::open(&config).expect("open"); 704 let first = 705 parse_pocket_event_json(event_json_with("a", "1", "first").as_bytes()).expect("first"); 706 let second = parse_pocket_event_json(event_json_with("c", "2", "second").as_bytes()) 707 .expect("second"); 708 709 let first_offset = handle.store_event(&first).expect("store first"); 710 let second_offset = handle.store_event(&second).expect("store second"); 711 let all = handle.scan_events().expect("scan"); 712 let after_first = handle 713 .scan_events_after(Some(first_offset)) 714 .expect("scan after first"); 715 716 assert_eq!(all.len(), 2); 717 assert_eq!(all[0].store_offset(), first_offset); 718 assert_eq!(all[0].event().id(), first.id()); 719 assert_eq!(all[1].store_offset(), second_offset); 720 assert_eq!(all[1].event().id(), second.id()); 721 assert_eq!(after_first.len(), 1); 722 assert_eq!(after_first[0].store_offset(), second_offset); 723 assert_eq!(after_first[0].event().id(), second.id()); 724 725 let _ = std::fs::remove_dir_all(root); 726 } 727 728 #[test] 729 fn pocket_store_handle_screens_events_before_materialization() { 730 let root = temp_root("tangle-pocket-screen"); 731 let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown) 732 .expect("config"); 733 let handle = PocketStoreHandle::open(&config).expect("open"); 734 let visible = parse_pocket_event_json(event_json_with("a", "1", "visible").as_bytes()) 735 .expect("visible"); 736 let redacted = parse_pocket_event_json(event_json_with("c", "2", "redacted").as_bytes()) 737 .expect("redacted"); 738 let filter = parse_pocket_filter_json(kind_filter_json().as_bytes()).expect("filter"); 739 740 handle.store_event(&visible).expect("store visible"); 741 handle.store_event(&redacted).expect("store redacted"); 742 743 let screened = handle 744 .find_events_with_screen(&filter, PocketQueryConfig::default(), |event| { 745 if event.id() == visible.id() { 746 ScreenResult::Match 747 } else { 748 ScreenResult::Redacted 749 } 750 }) 751 .expect("screened"); 752 753 assert!(screened.redacted()); 754 assert_eq!(screened.events().len(), 1); 755 assert_eq!(screened.events()[0].id(), visible.id()); 756 757 let mismatched = handle 758 .find_events_with_screen(&filter, PocketQueryConfig::default(), |event| { 759 if event.id() == visible.id() { 760 ScreenResult::Match 761 } else { 762 ScreenResult::Mismatch 763 } 764 }) 765 .expect("mismatched"); 766 767 assert!(!mismatched.redacted()); 768 assert_eq!(mismatched.events().len(), 1); 769 assert_eq!(mismatched.events()[0].id(), visible.id()); 770 771 let hidden = handle 772 .find_events_with_screen(&filter, PocketQueryConfig::default(), |_| { 773 ScreenResult::Mismatch 774 }) 775 .expect("hidden"); 776 777 assert!(!hidden.redacted()); 778 assert!(hidden.events().is_empty()); 779 780 let _ = std::fs::remove_dir_all(root); 781 } 782 783 #[test] 784 fn pocket_store_handle_rejects_duplicate_event_writes_without_duplicate_materialization() { 785 let root = temp_root("tangle-pocket-duplicate"); 786 let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown) 787 .expect("config"); 788 let handle = PocketStoreHandle::open(&config).expect("open"); 789 let event = parse_pocket_event_json(event_json().as_bytes()).expect("event"); 790 let filter = parse_pocket_filter_json(filter_json().as_bytes()).expect("filter"); 791 792 let first_offset = handle.store_event(&event).expect("store first"); 793 let duplicate_error = handle.store_event(&event).expect_err("duplicate"); 794 let by_id = handle 795 .event_by_id(event.id()) 796 .expect("lookup") 797 .expect("event"); 798 let by_offset = handle.event_by_offset(first_offset).expect("offset"); 799 let found = handle 800 .find_events(&filter, PocketQueryConfig::default()) 801 .expect("find"); 802 let scanned = handle.scan_events().expect("scan"); 803 804 assert!( 805 duplicate_error 806 .message() 807 .to_lowercase() 808 .contains("duplicate") 809 ); 810 assert_eq!(by_id.id(), event.id()); 811 assert_eq!(by_offset.id(), event.id()); 812 assert_eq!(found.len(), 1); 813 assert_eq!(found[0].id(), event.id()); 814 assert_eq!(scanned.len(), 1); 815 assert_eq!(scanned[0].store_offset(), first_offset); 816 assert_eq!(scanned[0].event().id(), event.id()); 817 818 let _ = std::fs::remove_dir_all(root); 819 } 820 821 #[test] 822 fn pocket_store_query_config_controls_scraping() { 823 let root = temp_root("tangle-pocket-query-config"); 824 let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown) 825 .expect("config"); 826 let handle = PocketStoreHandle::open(&config).expect("open"); 827 let event = 828 parse_pocket_event_json(event_json_with("f", "6", "scrape").as_bytes()).expect("event"); 829 let broad = parse_pocket_filter_json(r#"{"limit":1}"#.as_bytes()).expect("filter"); 830 831 handle.store_event(&event).expect("store"); 832 833 assert!( 834 handle 835 .find_events(&broad, PocketQueryConfig::new(false, 0, 0)) 836 .expect_err("scrape rejected") 837 .message() 838 .contains("scraper") 839 ); 840 let found = handle 841 .find_events(&broad, PocketQueryConfig::new(false, 1, 0)) 842 .expect("limited scrape"); 843 844 assert_eq!(found.len(), 1); 845 assert_eq!(found[0].id(), event.id()); 846 847 let _ = std::fs::remove_dir_all(root); 848 } 849 850 #[test] 851 fn pocket_store_handle_persists_extra_table_records() { 852 let root = temp_root("tangle-pocket-extra"); 853 let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown) 854 .expect("config"); 855 let handle = PocketStoreHandle::open(&config).expect("open"); 856 857 handle 858 .put_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm", b"state-v1") 859 .expect("put projection"); 860 handle 861 .put_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm", b"state-v2") 862 .expect("update projection"); 863 handle 864 .put_extra_record(TANGLE_GROUP_OUTBOX_TABLE, b"outbox\0b", b"record-1") 865 .expect("put outbox one"); 866 handle 867 .put_extra_record(TANGLE_GROUP_OUTBOX_TABLE, b"outbox\0a", b"record-0") 868 .expect("put outbox zero"); 869 handle 870 .put_extra_record( 871 TANGLE_GROUP_CHECKPOINT_TABLE, 872 b"checkpoint\0groups", 873 b"checkpoint", 874 ) 875 .expect("put checkpoint"); 876 877 assert_eq!( 878 handle 879 .get_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm") 880 .expect("get projection"), 881 Some(b"state-v2".to_vec()) 882 ); 883 assert_eq!( 884 handle 885 .scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE) 886 .expect("scan outbox"), 887 vec![ 888 (b"outbox\0a".to_vec(), b"record-0".to_vec()), 889 (b"outbox\0b".to_vec(), b"record-1".to_vec()), 890 ] 891 ); 892 handle 893 .delete_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm") 894 .expect("delete projection"); 895 assert_eq!( 896 handle 897 .get_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm") 898 .expect("deleted projection"), 899 None 900 ); 901 drop(handle); 902 903 let reopened = PocketStoreHandle::open(&config).expect("reopen"); 904 assert_eq!( 905 reopened 906 .get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, b"checkpoint\0groups") 907 .expect("checkpoint"), 908 Some(b"checkpoint".to_vec()) 909 ); 910 911 drop(reopened); 912 let _ = std::fs::remove_dir_all(root); 913 } 914 915 #[test] 916 fn pocket_store_handle_flush_on_write_syncs_written_events_and_extra_records() { 917 let root = temp_root("tangle-pocket-flush-write"); 918 let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnWrite) 919 .expect("config"); 920 let handle = PocketStoreHandle::open(&config).expect("open"); 921 let event = 922 parse_pocket_event_json(event_json_with("e", "5", "flush").as_bytes()).expect("event"); 923 924 let offset = handle.store_event(&event).expect("store"); 925 handle 926 .put_extra_record( 927 TANGLE_GROUP_CHECKPOINT_TABLE, 928 b"checkpoint\0flush", 929 b"flushed", 930 ) 931 .expect("checkpoint"); 932 drop(handle); 933 934 let reopened = PocketStoreHandle::open(&config).expect("reopen"); 935 let by_id = reopened 936 .event_by_id(event.id()) 937 .expect("lookup") 938 .expect("event"); 939 let by_offset = reopened.event_by_offset(offset).expect("offset"); 940 941 assert_eq!(by_id.id(), event.id()); 942 assert_eq!(by_offset.id(), event.id()); 943 assert_eq!( 944 reopened 945 .get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, b"checkpoint\0flush") 946 .expect("checkpoint"), 947 Some(b"flushed".to_vec()) 948 ); 949 950 drop(reopened); 951 let _ = std::fs::remove_dir_all(root); 952 } 953 954 #[test] 955 fn pocket_store_handle_syncs_written_events_and_extra_records() { 956 let root = temp_root("tangle-pocket-sync"); 957 let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown) 958 .expect("config"); 959 let handle = PocketStoreHandle::open(&config).expect("open"); 960 let event = 961 parse_pocket_event_json(event_json_with("d", "4", "synced").as_bytes()).expect("event"); 962 963 let offset = handle.store_event(&event).expect("store"); 964 handle 965 .put_extra_record( 966 TANGLE_GROUP_CHECKPOINT_TABLE, 967 b"checkpoint\0sync", 968 b"synced", 969 ) 970 .expect("checkpoint"); 971 handle.sync().expect("sync"); 972 drop(handle); 973 974 let reopened = PocketStoreHandle::open(&config).expect("reopen"); 975 let by_id = reopened 976 .event_by_id(event.id()) 977 .expect("lookup") 978 .expect("event"); 979 let by_offset = reopened.event_by_offset(offset).expect("offset"); 980 981 assert_eq!(by_id.id(), event.id()); 982 assert_eq!(by_offset.id(), event.id()); 983 assert_eq!( 984 reopened 985 .get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, b"checkpoint\0sync") 986 .expect("checkpoint"), 987 Some(b"synced".to_vec()) 988 ); 989 990 drop(reopened); 991 let _ = std::fs::remove_dir_all(root); 992 } 993 994 #[test] 995 fn pocket_store_config_preserves_explicit_storage_boundary() { 996 let config = PocketStoreConfig::new( 997 "runtime/radroots/tangle/pocket", 998 PocketSyncPolicy::FlushOnShutdown, 999 ) 1000 .expect("config"); 1001 1002 assert_eq!( 1003 config.data_directory().to_string_lossy(), 1004 "runtime/radroots/tangle/pocket" 1005 ); 1006 assert_eq!(config.sync_policy(), PocketSyncPolicy::FlushOnShutdown); 1007 } 1008 1009 #[test] 1010 fn pocket_store_config_rejects_implicit_storage_values() { 1011 assert_eq!( 1012 PocketStoreConfig::new("", PocketSyncPolicy::FlushOnWrite) 1013 .expect_err("error") 1014 .message(), 1015 "pocket.data_directory must not be empty" 1016 ); 1017 } 1018 1019 fn event_json() -> String { 1020 event_json_with("a", "1", "hello") 1021 } 1022 1023 fn event_json_with(id_hex: &str, pubkey_hex: &str, content: &str) -> String { 1024 format!( 1025 r#"{{ 1026 "id":"{}", 1027 "pubkey":"{}", 1028 "created_at":1714124433, 1029 "kind":1, 1030 "tags":[["t","radroots"]], 1031 "content":"{}", 1032 "sig":"{}" 1033 }}"#, 1034 id_hex.repeat(64), 1035 pubkey_hex.repeat(64), 1036 content, 1037 "b".repeat(128) 1038 ) 1039 } 1040 1041 fn filter_json() -> String { 1042 r#"{"ids":["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"],"limit":10}"# 1043 .to_owned() 1044 } 1045 1046 fn kind_filter_json() -> String { 1047 r#"{"kinds":[1],"limit":10}"#.to_owned() 1048 } 1049 1050 fn temp_root(prefix: &str) -> std::path::PathBuf { 1051 std::env::temp_dir().join(format!( 1052 "{}-{}-{}", 1053 prefix, 1054 std::process::id(), 1055 SystemTime::now() 1056 .duration_since(UNIX_EPOCH) 1057 .expect("system time") 1058 .as_nanos() 1059 )) 1060 } 1061 }