tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

lib.rs (35091B)


      1 #![forbid(unsafe_code)]
      2 
      3 use core::fmt;
      4 use pocket_db::{
      5     ScreenResult, Store,
      6     heed::{Database, types::Bytes},
      7 };
      8 use pocket_types::{
      9     Event, Filter, Hll8, Id, Kind, OwnedEvent, OwnedFilter, OwnedTags, Pubkey, Sig, Tags, Time,
     10 };
     11 use std::{
     12     io,
     13     path::{Path, PathBuf},
     14     sync::Arc,
     15 };
     16 
     17 pub const POCKET_SOURCE_REPOSITORY: &str = "https://github.com/triesap/pocket";
     18 pub const POCKET_SOURCE_REVISION: &str = "329334f20948c796c6016b673b92551ac4855ad7";
     19 
     20 pub type PocketEvent = Event;
     21 pub type PocketEventId = Id;
     22 pub type PocketFilter = Filter;
     23 pub type PocketHll8 = Hll8;
     24 pub type PocketKind = Kind;
     25 pub type PocketOwnedEvent = OwnedEvent;
     26 pub type PocketOwnedFilter = OwnedFilter;
     27 pub type PocketOwnedTags = OwnedTags;
     28 pub type PocketPubkey = Pubkey;
     29 pub type PocketSig = Sig;
     30 pub type PocketTags = Tags;
     31 pub type PocketTime = Time;
     32 pub type PocketScreenResult = ScreenResult;
     33 pub type PocketStore = Store;
     34 pub type PocketExtraRecord = (Vec<u8>, Vec<u8>);
     35 pub type PocketExtraRecords = Vec<PocketExtraRecord>;
     36 
     37 #[derive(Debug, Clone, PartialEq, Eq)]
     38 pub struct PocketStoredEvent {
     39     store_offset: u64,
     40     event: PocketOwnedEvent,
     41 }
     42 
     43 impl PocketStoredEvent {
     44     pub fn new(store_offset: u64, event: PocketOwnedEvent) -> Self {
     45         Self {
     46             store_offset,
     47             event,
     48         }
     49     }
     50 
     51     pub fn store_offset(&self) -> u64 {
     52         self.store_offset
     53     }
     54 
     55     pub fn event(&self) -> &PocketEvent {
     56         &self.event
     57     }
     58 
     59     pub fn into_event(self) -> PocketOwnedEvent {
     60         self.event
     61     }
     62 }
     63 
     64 #[derive(Debug, Clone, PartialEq, Eq)]
     65 pub struct PocketScreenedEvents {
     66     events: Vec<PocketOwnedEvent>,
     67     redacted: bool,
     68 }
     69 
     70 impl PocketScreenedEvents {
     71     pub fn new(events: Vec<PocketOwnedEvent>, redacted: bool) -> Self {
     72         Self { events, redacted }
     73     }
     74 
     75     pub fn events(&self) -> &[PocketOwnedEvent] {
     76         &self.events
     77     }
     78 
     79     pub fn redacted(&self) -> bool {
     80         self.redacted
     81     }
     82 
     83     pub fn into_events(self) -> Vec<PocketOwnedEvent> {
     84         self.events
     85     }
     86 }
     87 
     88 pub const TANGLE_GROUP_PROJECTION_TABLE: &str = "group_projection";
     89 pub const TANGLE_GROUP_OUTBOX_TABLE: &str = "group_outbox";
     90 pub const TANGLE_GROUP_CHECKPOINT_TABLE: &str = "group_checkpoint";
     91 pub const TANGLE_POCKET_EXTRA_TABLES: [&str; 3] = [
     92     TANGLE_GROUP_PROJECTION_TABLE,
     93     TANGLE_GROUP_OUTBOX_TABLE,
     94     TANGLE_GROUP_CHECKPOINT_TABLE,
     95 ];
     96 
     97 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     98 pub struct PocketQueryConfig {
     99     allow_scraping: bool,
    100     allow_scrape_if_limited_to: u32,
    101     allow_scrape_if_max_seconds: u64,
    102 }
    103 
    104 impl PocketQueryConfig {
    105     pub const fn new(
    106         allow_scraping: bool,
    107         allow_scrape_if_limited_to: u32,
    108         allow_scrape_if_max_seconds: u64,
    109     ) -> Self {
    110         Self {
    111             allow_scraping,
    112             allow_scrape_if_limited_to,
    113             allow_scrape_if_max_seconds,
    114         }
    115     }
    116 
    117     pub fn allow_scraping(self) -> bool {
    118         self.allow_scraping
    119     }
    120 
    121     pub fn allow_scrape_if_limited_to(self) -> u32 {
    122         self.allow_scrape_if_limited_to
    123     }
    124 
    125     pub fn allow_scrape_if_max_seconds(self) -> u64 {
    126         self.allow_scrape_if_max_seconds
    127     }
    128 
    129     pub fn exact_count(self) -> Self {
    130         Self::new(
    131             true,
    132             self.allow_scrape_if_limited_to,
    133             self.allow_scrape_if_max_seconds,
    134         )
    135     }
    136 }
    137 
    138 impl Default for PocketQueryConfig {
    139     fn default() -> Self {
    140         Self::new(false, 100, 3_600)
    141     }
    142 }
    143 
    144 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    145 pub struct PocketDependencyBoundary {
    146     source_repository: &'static str,
    147     source_revision: &'static str,
    148 }
    149 
    150 impl PocketDependencyBoundary {
    151     pub fn current() -> Self {
    152         Self {
    153             source_repository: POCKET_SOURCE_REPOSITORY,
    154             source_revision: POCKET_SOURCE_REVISION,
    155         }
    156     }
    157 
    158     pub fn source_repository(&self) -> &'static str {
    159         self.source_repository
    160     }
    161 
    162     pub fn source_revision(&self) -> &'static str {
    163         self.source_revision
    164     }
    165 }
    166 
    167 #[derive(Clone)]
    168 pub struct PocketStoreHandle {
    169     store: Arc<PocketStore>,
    170     sync_policy: PocketSyncPolicy,
    171 }
    172 
    173 impl PocketStoreHandle {
    174     pub fn open(config: &PocketStoreConfig) -> Result<Self, PocketStoreError> {
    175         std::fs::create_dir_all(config.data_directory())
    176             .map_err(|error| PocketStoreError::from_create_dir(config.data_directory(), error))?;
    177         let store = PocketStore::new(config.data_directory(), TANGLE_POCKET_EXTRA_TABLES.to_vec())
    178             .map_err(PocketStoreError::from_pocket)?;
    179         Ok(Self {
    180             store: Arc::new(store),
    181             sync_policy: config.sync_policy(),
    182         })
    183     }
    184 
    185     pub fn dir(&self) -> &Path {
    186         self.store.dir()
    187     }
    188 
    189     pub fn sync(&self) -> Result<(), PocketStoreError> {
    190         self.store.sync().map_err(PocketStoreError::from_pocket)
    191     }
    192 
    193     pub fn sync_policy(&self) -> PocketSyncPolicy {
    194         self.sync_policy
    195     }
    196 
    197     pub fn store_event(&self, event: &PocketEvent) -> Result<u64, PocketStoreError> {
    198         let offset = self
    199             .store
    200             .store_event(event)
    201             .map_err(PocketStoreError::from_pocket)?;
    202         self.sync_after_write()?;
    203         Ok(offset)
    204     }
    205 
    206     pub fn event_by_id(
    207         &self,
    208         event_id: PocketEventId,
    209     ) -> Result<Option<PocketOwnedEvent>, PocketStoreError> {
    210         self.store
    211             .get_event_by_id(event_id)
    212             .map(|event| event.map(PocketEvent::to_owned))
    213             .map_err(PocketStoreError::from_pocket)
    214     }
    215 
    216     pub fn event_by_offset(&self, offset: u64) -> Result<PocketOwnedEvent, PocketStoreError> {
    217         self.store
    218             .get_event_by_offset(offset)
    219             .map(PocketEvent::to_owned)
    220             .map_err(PocketStoreError::from_pocket)
    221     }
    222 
    223     pub fn find_events(
    224         &self,
    225         filter: &PocketFilter,
    226         query: PocketQueryConfig,
    227     ) -> Result<Vec<PocketOwnedEvent>, PocketStoreError> {
    228         self.find_events_with_screen(filter, query, |_| PocketScreenResult::Match)
    229             .map(PocketScreenedEvents::into_events)
    230     }
    231 
    232     pub fn find_events_with_screen<F>(
    233         &self,
    234         filter: &PocketFilter,
    235         query: PocketQueryConfig,
    236         screen: F,
    237     ) -> Result<PocketScreenedEvents, PocketStoreError>
    238     where
    239         F: Fn(&PocketEvent) -> PocketScreenResult,
    240     {
    241         let (events, redacted) = self
    242             .store
    243             .find_events(
    244                 filter,
    245                 query.allow_scraping(),
    246                 query.allow_scrape_if_limited_to(),
    247                 query.allow_scrape_if_max_seconds(),
    248                 screen,
    249             )
    250             .map_err(PocketStoreError::from_pocket)?;
    251         Ok(PocketScreenedEvents::new(
    252             events.into_iter().map(PocketEvent::to_owned).collect(),
    253             redacted,
    254         ))
    255     }
    256 
    257     pub fn count_events(
    258         &self,
    259         filter: &PocketFilter,
    260         query: PocketQueryConfig,
    261     ) -> Result<u64, PocketStoreError> {
    262         self.find_events(filter, query)
    263             .map(|events| u64::try_from(events.len()).expect("usize count fits in u64"))
    264     }
    265 
    266     pub fn scan_events(&self) -> Result<Vec<PocketStoredEvent>, PocketStoreError> {
    267         self.scan_events_after(None)
    268     }
    269 
    270     pub fn scan_events_after(
    271         &self,
    272         last_offset: Option<u64>,
    273     ) -> Result<Vec<PocketStoredEvent>, PocketStoreError> {
    274         let stats = self.store.stats().map_err(PocketStoreError::from_pocket)?;
    275         let end = u64::try_from(stats.event_bytes)
    276             .map_err(|_| PocketStoreError::invalid("Pocket event map size exceeds u64"))?;
    277         let mut offset = match last_offset {
    278             Some(offset) => {
    279                 let event = self
    280                     .store
    281                     .get_event_by_offset(offset)
    282                     .map_err(PocketStoreError::from_pocket)?;
    283                 next_event_offset(offset, event)?
    284             }
    285             None => event_map_start_offset(),
    286         };
    287         let mut events = Vec::new();
    288         while offset < end {
    289             let event = self
    290                 .store
    291                 .get_event_by_offset(offset)
    292                 .map_err(PocketStoreError::from_pocket)?;
    293             events.push(PocketStoredEvent::new(offset, event.to_owned()));
    294             offset = next_event_offset(offset, event)?;
    295         }
    296         Ok(events)
    297     }
    298 
    299     pub fn put_extra_record(
    300         &self,
    301         table: &'static str,
    302         key: &[u8],
    303         value: &[u8],
    304     ) -> Result<(), PocketStoreError> {
    305         let table_handle = self.extra_table(table)?;
    306         let mut txn = self.store.write_txn().map_err(|error| {
    307             PocketStoreError::from_extra_table(table, "write transaction", error)
    308         })?;
    309         table_handle
    310             .put(&mut txn, key, value)
    311             .map_err(|error| PocketStoreError::from_extra_table(table, "put", error))?;
    312         txn.commit()
    313             .map_err(|error| PocketStoreError::from_extra_table(table, "commit", error))?;
    314         self.sync_after_write()
    315     }
    316 
    317     pub fn get_extra_record(
    318         &self,
    319         table: &'static str,
    320         key: &[u8],
    321     ) -> Result<Option<Vec<u8>>, PocketStoreError> {
    322         let table_handle = self.extra_table(table)?;
    323         let txn = self.store.read_txn().map_err(|error| {
    324             PocketStoreError::from_extra_table(table, "read transaction", error)
    325         })?;
    326         table_handle
    327             .get(&txn, key)
    328             .map(|value| value.map(<[u8]>::to_vec))
    329             .map_err(|error| PocketStoreError::from_extra_table(table, "get", error))
    330     }
    331 
    332     pub fn delete_extra_record(
    333         &self,
    334         table: &'static str,
    335         key: &[u8],
    336     ) -> Result<(), PocketStoreError> {
    337         let table_handle = self.extra_table(table)?;
    338         let mut txn = self.store.write_txn().map_err(|error| {
    339             PocketStoreError::from_extra_table(table, "write transaction", error)
    340         })?;
    341         table_handle
    342             .delete(&mut txn, key)
    343             .map_err(|error| PocketStoreError::from_extra_table(table, "delete", error))?;
    344         txn.commit()
    345             .map_err(|error| PocketStoreError::from_extra_table(table, "commit", error))?;
    346         self.sync_after_write()
    347     }
    348 
    349     pub fn scan_extra_records(
    350         &self,
    351         table: &'static str,
    352     ) -> Result<PocketExtraRecords, PocketStoreError> {
    353         let table_handle = self.extra_table(table)?;
    354         let txn = self.store.read_txn().map_err(|error| {
    355             PocketStoreError::from_extra_table(table, "read transaction", error)
    356         })?;
    357         let mut records = Vec::new();
    358         let iter = table_handle
    359             .iter(&txn)
    360             .map_err(|error| PocketStoreError::from_extra_table(table, "scan", error))?;
    361         for item in iter {
    362             let (key, value) =
    363                 item.map_err(|error| PocketStoreError::from_extra_table(table, "scan", error))?;
    364             records.push((key.to_vec(), value.to_vec()));
    365         }
    366         Ok(records)
    367     }
    368 
    369     fn extra_table(&self, table: &'static str) -> Result<Database<Bytes, Bytes>, PocketStoreError> {
    370         self.store
    371             .extra_table(table)
    372             .ok_or_else(|| PocketStoreError::missing_table(table))
    373     }
    374 
    375     fn sync_after_write(&self) -> Result<(), PocketStoreError> {
    376         match self.sync_policy {
    377             PocketSyncPolicy::FlushOnWrite => self.sync(),
    378             PocketSyncPolicy::FlushOnShutdown => Ok(()),
    379         }
    380     }
    381 }
    382 
    383 pub fn parse_pocket_event_json(raw: &[u8]) -> Result<PocketOwnedEvent, PocketStoreError> {
    384     if raw.is_empty() {
    385         return Err(PocketStoreError::invalid(
    386             "pocket event JSON must not be empty",
    387         ));
    388     }
    389     let mut buffer = vec![0; pocket_json_buffer_len(raw.len())];
    390     let (_, event) =
    391         PocketEvent::from_json(raw, &mut buffer).map_err(PocketStoreError::from_pocket_types)?;
    392     Ok(event.to_owned())
    393 }
    394 
    395 pub fn parse_pocket_filter_json(raw: &[u8]) -> Result<PocketOwnedFilter, PocketStoreError> {
    396     if raw.is_empty() {
    397         return Err(PocketStoreError::invalid(
    398             "pocket filter JSON must not be empty",
    399         ));
    400     }
    401     let mut buffer = vec![0; pocket_json_buffer_len(raw.len())];
    402     let (_, _, filter) =
    403         PocketFilter::from_json(raw, &mut buffer).map_err(PocketStoreError::from_pocket_types)?;
    404     Ok(filter.to_owned())
    405 }
    406 
    407 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    408 pub enum PocketSyncPolicy {
    409     FlushOnWrite,
    410     FlushOnShutdown,
    411 }
    412 
    413 #[derive(Debug, Clone, PartialEq, Eq)]
    414 pub struct PocketStoreConfig {
    415     data_directory: PathBuf,
    416     sync_policy: PocketSyncPolicy,
    417 }
    418 
    419 impl PocketStoreConfig {
    420     pub fn new(
    421         data_directory: impl Into<PathBuf>,
    422         sync_policy: PocketSyncPolicy,
    423     ) -> Result<Self, PocketConfigError> {
    424         let config = Self {
    425             data_directory: data_directory.into(),
    426             sync_policy,
    427         };
    428         config.validate()?;
    429         Ok(config)
    430     }
    431 
    432     pub fn validate(&self) -> Result<(), PocketConfigError> {
    433         if self.data_directory.as_os_str().is_empty() {
    434             return Err(PocketConfigError::invalid(
    435                 "pocket.data_directory must not be empty",
    436             ));
    437         }
    438         Ok(())
    439     }
    440 
    441     pub fn data_directory(&self) -> &Path {
    442         &self.data_directory
    443     }
    444 
    445     pub fn sync_policy(&self) -> PocketSyncPolicy {
    446         self.sync_policy
    447     }
    448 }
    449 
    450 #[derive(Debug, Clone, PartialEq, Eq)]
    451 pub struct PocketConfigError {
    452     message: String,
    453 }
    454 
    455 impl PocketConfigError {
    456     pub fn invalid(message: impl Into<String>) -> Self {
    457         Self {
    458             message: message.into(),
    459         }
    460     }
    461 
    462     pub fn message(&self) -> &str {
    463         &self.message
    464     }
    465 }
    466 
    467 impl fmt::Display for PocketConfigError {
    468     fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
    469         formatter.write_str(&self.message)
    470     }
    471 }
    472 
    473 impl std::error::Error for PocketConfigError {}
    474 
    475 #[derive(Debug, Clone, PartialEq, Eq)]
    476 pub struct PocketStoreError {
    477     message: String,
    478 }
    479 
    480 impl PocketStoreError {
    481     pub fn invalid(message: impl Into<String>) -> Self {
    482         Self {
    483             message: message.into(),
    484         }
    485     }
    486 
    487     pub fn from_create_dir(path: &Path, error: io::Error) -> Self {
    488         Self {
    489             message: format!(
    490                 "failed to create Pocket store directory {}: {error}",
    491                 path.display()
    492             ),
    493         }
    494     }
    495 
    496     pub fn from_pocket(error: pocket_db::Error) -> Self {
    497         Self {
    498             message: error.to_string(),
    499         }
    500     }
    501 
    502     pub fn from_pocket_types(error: pocket_types::Error) -> Self {
    503         Self {
    504             message: error.to_string(),
    505         }
    506     }
    507 
    508     pub fn missing_table(table: &'static str) -> Self {
    509         Self {
    510             message: format!("missing Pocket extra table {table}"),
    511         }
    512     }
    513 
    514     pub fn from_extra_table(
    515         table: &'static str,
    516         operation: &'static str,
    517         error: impl fmt::Display,
    518     ) -> Self {
    519         Self {
    520             message: format!("Pocket extra table {table} {operation} failed: {error}"),
    521         }
    522     }
    523 
    524     pub fn message(&self) -> &str {
    525         &self.message
    526     }
    527 }
    528 
    529 impl fmt::Display for PocketStoreError {
    530     fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
    531         formatter.write_str(&self.message)
    532     }
    533 }
    534 
    535 impl std::error::Error for PocketStoreError {}
    536 
    537 fn pocket_json_buffer_len(raw_len: usize) -> usize {
    538     raw_len.saturating_mul(2).max(4096)
    539 }
    540 
    541 fn event_map_start_offset() -> u64 {
    542     u64::try_from(std::mem::size_of::<usize>()).expect("usize header size fits u64")
    543 }
    544 
    545 fn align_event_offset(offset: u64) -> u64 {
    546     if offset.is_multiple_of(8) {
    547         offset
    548     } else {
    549         offset + (8 - offset % 8)
    550     }
    551 }
    552 
    553 fn next_event_offset(offset: u64, event: &PocketEvent) -> Result<u64, PocketStoreError> {
    554     let next = offset
    555         .checked_add(event_len_u64(event)?)
    556         .ok_or_else(|| PocketStoreError::invalid("Pocket event offset exceeds u64"))?;
    557     Ok(align_event_offset(next))
    558 }
    559 
    560 fn event_len_u64(event: &PocketEvent) -> Result<u64, PocketStoreError> {
    561     u64::try_from(event.len())
    562         .map_err(|_| PocketStoreError::invalid("Pocket event size exceeds u64"))
    563 }
    564 
    565 #[cfg(test)]
    566 mod tests {
    567     use super::{
    568         POCKET_SOURCE_REPOSITORY, POCKET_SOURCE_REVISION, PocketDependencyBoundary,
    569         PocketQueryConfig, PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy,
    570         TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE,
    571         TANGLE_POCKET_EXTRA_TABLES, parse_pocket_event_json, parse_pocket_filter_json,
    572     };
    573     use pocket_db::ScreenResult;
    574     use std::time::{SystemTime, UNIX_EPOCH};
    575 
    576     #[test]
    577     fn pocket_dependency_boundary_pins_triesap_revision() {
    578         let boundary = PocketDependencyBoundary::current();
    579 
    580         assert_eq!(
    581             boundary.source_repository(),
    582             "https://github.com/triesap/pocket"
    583         );
    584         assert_eq!(boundary.source_repository(), POCKET_SOURCE_REPOSITORY);
    585         assert_eq!(
    586             boundary.source_revision(),
    587             "329334f20948c796c6016b673b92551ac4855ad7"
    588         );
    589         assert_eq!(boundary.source_revision(), POCKET_SOURCE_REVISION);
    590     }
    591 
    592     #[test]
    593     fn pocket_dependency_boundary_matches_manifest_and_lock_state() {
    594         let store_manifest = include_str!("../Cargo.toml");
    595         let groups_manifest = include_str!("../../tangle_groups/Cargo.toml");
    596         let lockfile = include_str!("../../../Cargo.lock");
    597         let approved_source = format!("git = \"{}\"", POCKET_SOURCE_REPOSITORY);
    598         let approved_revision = format!("rev = \"{}\"", POCKET_SOURCE_REVISION);
    599         let approved_lock_source = format!(
    600             "git+{}?rev={}#{}",
    601             POCKET_SOURCE_REPOSITORY, POCKET_SOURCE_REVISION, POCKET_SOURCE_REVISION
    602         );
    603 
    604         for manifest in [store_manifest, groups_manifest] {
    605             assert!(!manifest.contains("mikedilger/pocket"));
    606             assert!(manifest.contains(&approved_source));
    607             assert!(manifest.contains(&approved_revision));
    608         }
    609         assert!(!lockfile.contains("mikedilger/pocket"));
    610         assert!(lockfile.contains(&approved_lock_source));
    611     }
    612 
    613     #[test]
    614     fn pocket_query_config_exact_count_enables_scrape_scan() {
    615         let config = PocketQueryConfig::new(false, 7, 11).exact_count();
    616 
    617         assert!(config.allow_scraping());
    618         assert_eq!(config.allow_scrape_if_limited_to(), 7);
    619         assert_eq!(config.allow_scrape_if_max_seconds(), 11);
    620     }
    621 
    622     #[test]
    623     fn pocket_store_handle_opens_syncs_and_exposes_tangle_tables() {
    624         let root = std::env::temp_dir().join(format!("tangle-pocket-store-{}", std::process::id()));
    625         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
    626             .expect("config");
    627 
    628         let handle = PocketStoreHandle::open(&config).expect("open");
    629 
    630         assert_eq!(handle.dir(), config.data_directory());
    631         assert_eq!(handle.sync_policy(), PocketSyncPolicy::FlushOnShutdown);
    632         assert_eq!(
    633             TANGLE_POCKET_EXTRA_TABLES,
    634             ["group_projection", "group_outbox", "group_checkpoint"]
    635         );
    636         handle.sync().expect("sync");
    637 
    638         let _ = std::fs::remove_dir_all(root);
    639     }
    640 
    641     #[test]
    642     fn pocket_store_handle_clones_share_one_store_boundary() {
    643         let root = temp_root("tangle-pocket-shared");
    644         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
    645             .expect("config");
    646         let writer = PocketStoreHandle::open(&config).expect("open");
    647         let reader = writer.clone();
    648         let event = parse_pocket_event_json(event_json().as_bytes()).expect("event");
    649         let filter = parse_pocket_filter_json(filter_json().as_bytes()).expect("filter");
    650 
    651         let offset = writer.store_event(&event).expect("store");
    652         let stored = reader.event_by_offset(offset).expect("offset");
    653         let found = reader
    654             .find_events(&filter, PocketQueryConfig::default())
    655             .expect("find");
    656 
    657         assert_eq!(stored.id(), event.id());
    658         assert_eq!(found.len(), 1);
    659         assert_eq!(found[0].id(), event.id());
    660 
    661         let _ = std::fs::remove_dir_all(root);
    662     }
    663 
    664     #[test]
    665     fn pocket_store_handle_stores_queries_and_counts_events() {
    666         let root = std::env::temp_dir().join(format!("tangle-pocket-query-{}", std::process::id()));
    667         let _ = std::fs::remove_dir_all(&root);
    668         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
    669             .expect("config");
    670         let handle = PocketStoreHandle::open(&config).expect("open");
    671         let event = parse_pocket_event_json(event_json().as_bytes()).expect("event");
    672         let filter = parse_pocket_filter_json(filter_json().as_bytes()).expect("filter");
    673 
    674         let offset = handle.store_event(&event).expect("store");
    675         let stored = handle
    676             .event_by_id(event.id())
    677             .expect("lookup")
    678             .expect("event");
    679         let offset_event = handle.event_by_offset(offset).expect("offset lookup");
    680         let found = handle
    681             .find_events(&filter, PocketQueryConfig::default())
    682             .expect("find");
    683 
    684         assert_eq!(stored.id(), event.id());
    685         assert_eq!(offset_event.id(), event.id());
    686         assert_eq!(found.len(), 1);
    687         assert_eq!(found[0].id(), event.id());
    688         assert_eq!(
    689             handle
    690                 .count_events(&filter, PocketQueryConfig::default())
    691                 .expect("count"),
    692             1
    693         );
    694 
    695         let _ = std::fs::remove_dir_all(root);
    696     }
    697 
    698     #[test]
    699     fn pocket_store_handle_scans_canonical_events_with_offsets() {
    700         let root = temp_root("tangle-pocket-scan");
    701         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
    702             .expect("config");
    703         let handle = PocketStoreHandle::open(&config).expect("open");
    704         let first =
    705             parse_pocket_event_json(event_json_with("a", "1", "first").as_bytes()).expect("first");
    706         let second = parse_pocket_event_json(event_json_with("c", "2", "second").as_bytes())
    707             .expect("second");
    708 
    709         let first_offset = handle.store_event(&first).expect("store first");
    710         let second_offset = handle.store_event(&second).expect("store second");
    711         let all = handle.scan_events().expect("scan");
    712         let after_first = handle
    713             .scan_events_after(Some(first_offset))
    714             .expect("scan after first");
    715 
    716         assert_eq!(all.len(), 2);
    717         assert_eq!(all[0].store_offset(), first_offset);
    718         assert_eq!(all[0].event().id(), first.id());
    719         assert_eq!(all[1].store_offset(), second_offset);
    720         assert_eq!(all[1].event().id(), second.id());
    721         assert_eq!(after_first.len(), 1);
    722         assert_eq!(after_first[0].store_offset(), second_offset);
    723         assert_eq!(after_first[0].event().id(), second.id());
    724 
    725         let _ = std::fs::remove_dir_all(root);
    726     }
    727 
    728     #[test]
    729     fn pocket_store_handle_screens_events_before_materialization() {
    730         let root = temp_root("tangle-pocket-screen");
    731         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
    732             .expect("config");
    733         let handle = PocketStoreHandle::open(&config).expect("open");
    734         let visible = parse_pocket_event_json(event_json_with("a", "1", "visible").as_bytes())
    735             .expect("visible");
    736         let redacted = parse_pocket_event_json(event_json_with("c", "2", "redacted").as_bytes())
    737             .expect("redacted");
    738         let filter = parse_pocket_filter_json(kind_filter_json().as_bytes()).expect("filter");
    739 
    740         handle.store_event(&visible).expect("store visible");
    741         handle.store_event(&redacted).expect("store redacted");
    742 
    743         let screened = handle
    744             .find_events_with_screen(&filter, PocketQueryConfig::default(), |event| {
    745                 if event.id() == visible.id() {
    746                     ScreenResult::Match
    747                 } else {
    748                     ScreenResult::Redacted
    749                 }
    750             })
    751             .expect("screened");
    752 
    753         assert!(screened.redacted());
    754         assert_eq!(screened.events().len(), 1);
    755         assert_eq!(screened.events()[0].id(), visible.id());
    756 
    757         let mismatched = handle
    758             .find_events_with_screen(&filter, PocketQueryConfig::default(), |event| {
    759                 if event.id() == visible.id() {
    760                     ScreenResult::Match
    761                 } else {
    762                     ScreenResult::Mismatch
    763                 }
    764             })
    765             .expect("mismatched");
    766 
    767         assert!(!mismatched.redacted());
    768         assert_eq!(mismatched.events().len(), 1);
    769         assert_eq!(mismatched.events()[0].id(), visible.id());
    770 
    771         let hidden = handle
    772             .find_events_with_screen(&filter, PocketQueryConfig::default(), |_| {
    773                 ScreenResult::Mismatch
    774             })
    775             .expect("hidden");
    776 
    777         assert!(!hidden.redacted());
    778         assert!(hidden.events().is_empty());
    779 
    780         let _ = std::fs::remove_dir_all(root);
    781     }
    782 
    783     #[test]
    784     fn pocket_store_handle_rejects_duplicate_event_writes_without_duplicate_materialization() {
    785         let root = temp_root("tangle-pocket-duplicate");
    786         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
    787             .expect("config");
    788         let handle = PocketStoreHandle::open(&config).expect("open");
    789         let event = parse_pocket_event_json(event_json().as_bytes()).expect("event");
    790         let filter = parse_pocket_filter_json(filter_json().as_bytes()).expect("filter");
    791 
    792         let first_offset = handle.store_event(&event).expect("store first");
    793         let duplicate_error = handle.store_event(&event).expect_err("duplicate");
    794         let by_id = handle
    795             .event_by_id(event.id())
    796             .expect("lookup")
    797             .expect("event");
    798         let by_offset = handle.event_by_offset(first_offset).expect("offset");
    799         let found = handle
    800             .find_events(&filter, PocketQueryConfig::default())
    801             .expect("find");
    802         let scanned = handle.scan_events().expect("scan");
    803 
    804         assert!(
    805             duplicate_error
    806                 .message()
    807                 .to_lowercase()
    808                 .contains("duplicate")
    809         );
    810         assert_eq!(by_id.id(), event.id());
    811         assert_eq!(by_offset.id(), event.id());
    812         assert_eq!(found.len(), 1);
    813         assert_eq!(found[0].id(), event.id());
    814         assert_eq!(scanned.len(), 1);
    815         assert_eq!(scanned[0].store_offset(), first_offset);
    816         assert_eq!(scanned[0].event().id(), event.id());
    817 
    818         let _ = std::fs::remove_dir_all(root);
    819     }
    820 
    821     #[test]
    822     fn pocket_store_query_config_controls_scraping() {
    823         let root = temp_root("tangle-pocket-query-config");
    824         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
    825             .expect("config");
    826         let handle = PocketStoreHandle::open(&config).expect("open");
    827         let event =
    828             parse_pocket_event_json(event_json_with("f", "6", "scrape").as_bytes()).expect("event");
    829         let broad = parse_pocket_filter_json(r#"{"limit":1}"#.as_bytes()).expect("filter");
    830 
    831         handle.store_event(&event).expect("store");
    832 
    833         assert!(
    834             handle
    835                 .find_events(&broad, PocketQueryConfig::new(false, 0, 0))
    836                 .expect_err("scrape rejected")
    837                 .message()
    838                 .contains("scraper")
    839         );
    840         let found = handle
    841             .find_events(&broad, PocketQueryConfig::new(false, 1, 0))
    842             .expect("limited scrape");
    843 
    844         assert_eq!(found.len(), 1);
    845         assert_eq!(found[0].id(), event.id());
    846 
    847         let _ = std::fs::remove_dir_all(root);
    848     }
    849 
    850     #[test]
    851     fn pocket_store_handle_persists_extra_table_records() {
    852         let root = temp_root("tangle-pocket-extra");
    853         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
    854             .expect("config");
    855         let handle = PocketStoreHandle::open(&config).expect("open");
    856 
    857         handle
    858             .put_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm", b"state-v1")
    859             .expect("put projection");
    860         handle
    861             .put_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm", b"state-v2")
    862             .expect("update projection");
    863         handle
    864             .put_extra_record(TANGLE_GROUP_OUTBOX_TABLE, b"outbox\0b", b"record-1")
    865             .expect("put outbox one");
    866         handle
    867             .put_extra_record(TANGLE_GROUP_OUTBOX_TABLE, b"outbox\0a", b"record-0")
    868             .expect("put outbox zero");
    869         handle
    870             .put_extra_record(
    871                 TANGLE_GROUP_CHECKPOINT_TABLE,
    872                 b"checkpoint\0groups",
    873                 b"checkpoint",
    874             )
    875             .expect("put checkpoint");
    876 
    877         assert_eq!(
    878             handle
    879                 .get_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm")
    880                 .expect("get projection"),
    881             Some(b"state-v2".to_vec())
    882         );
    883         assert_eq!(
    884             handle
    885                 .scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)
    886                 .expect("scan outbox"),
    887             vec![
    888                 (b"outbox\0a".to_vec(), b"record-0".to_vec()),
    889                 (b"outbox\0b".to_vec(), b"record-1".to_vec()),
    890             ]
    891         );
    892         handle
    893             .delete_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm")
    894             .expect("delete projection");
    895         assert_eq!(
    896             handle
    897                 .get_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm")
    898                 .expect("deleted projection"),
    899             None
    900         );
    901         drop(handle);
    902 
    903         let reopened = PocketStoreHandle::open(&config).expect("reopen");
    904         assert_eq!(
    905             reopened
    906                 .get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, b"checkpoint\0groups")
    907                 .expect("checkpoint"),
    908             Some(b"checkpoint".to_vec())
    909         );
    910 
    911         drop(reopened);
    912         let _ = std::fs::remove_dir_all(root);
    913     }
    914 
    915     #[test]
    916     fn pocket_store_handle_flush_on_write_syncs_written_events_and_extra_records() {
    917         let root = temp_root("tangle-pocket-flush-write");
    918         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnWrite)
    919             .expect("config");
    920         let handle = PocketStoreHandle::open(&config).expect("open");
    921         let event =
    922             parse_pocket_event_json(event_json_with("e", "5", "flush").as_bytes()).expect("event");
    923 
    924         let offset = handle.store_event(&event).expect("store");
    925         handle
    926             .put_extra_record(
    927                 TANGLE_GROUP_CHECKPOINT_TABLE,
    928                 b"checkpoint\0flush",
    929                 b"flushed",
    930             )
    931             .expect("checkpoint");
    932         drop(handle);
    933 
    934         let reopened = PocketStoreHandle::open(&config).expect("reopen");
    935         let by_id = reopened
    936             .event_by_id(event.id())
    937             .expect("lookup")
    938             .expect("event");
    939         let by_offset = reopened.event_by_offset(offset).expect("offset");
    940 
    941         assert_eq!(by_id.id(), event.id());
    942         assert_eq!(by_offset.id(), event.id());
    943         assert_eq!(
    944             reopened
    945                 .get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, b"checkpoint\0flush")
    946                 .expect("checkpoint"),
    947             Some(b"flushed".to_vec())
    948         );
    949 
    950         drop(reopened);
    951         let _ = std::fs::remove_dir_all(root);
    952     }
    953 
    954     #[test]
    955     fn pocket_store_handle_syncs_written_events_and_extra_records() {
    956         let root = temp_root("tangle-pocket-sync");
    957         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
    958             .expect("config");
    959         let handle = PocketStoreHandle::open(&config).expect("open");
    960         let event =
    961             parse_pocket_event_json(event_json_with("d", "4", "synced").as_bytes()).expect("event");
    962 
    963         let offset = handle.store_event(&event).expect("store");
    964         handle
    965             .put_extra_record(
    966                 TANGLE_GROUP_CHECKPOINT_TABLE,
    967                 b"checkpoint\0sync",
    968                 b"synced",
    969             )
    970             .expect("checkpoint");
    971         handle.sync().expect("sync");
    972         drop(handle);
    973 
    974         let reopened = PocketStoreHandle::open(&config).expect("reopen");
    975         let by_id = reopened
    976             .event_by_id(event.id())
    977             .expect("lookup")
    978             .expect("event");
    979         let by_offset = reopened.event_by_offset(offset).expect("offset");
    980 
    981         assert_eq!(by_id.id(), event.id());
    982         assert_eq!(by_offset.id(), event.id());
    983         assert_eq!(
    984             reopened
    985                 .get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, b"checkpoint\0sync")
    986                 .expect("checkpoint"),
    987             Some(b"synced".to_vec())
    988         );
    989 
    990         drop(reopened);
    991         let _ = std::fs::remove_dir_all(root);
    992     }
    993 
    994     #[test]
    995     fn pocket_store_config_preserves_explicit_storage_boundary() {
    996         let config = PocketStoreConfig::new(
    997             "runtime/radroots/tangle/pocket",
    998             PocketSyncPolicy::FlushOnShutdown,
    999         )
   1000         .expect("config");
   1001 
   1002         assert_eq!(
   1003             config.data_directory().to_string_lossy(),
   1004             "runtime/radroots/tangle/pocket"
   1005         );
   1006         assert_eq!(config.sync_policy(), PocketSyncPolicy::FlushOnShutdown);
   1007     }
   1008 
   1009     #[test]
   1010     fn pocket_store_config_rejects_implicit_storage_values() {
   1011         assert_eq!(
   1012             PocketStoreConfig::new("", PocketSyncPolicy::FlushOnWrite)
   1013                 .expect_err("error")
   1014                 .message(),
   1015             "pocket.data_directory must not be empty"
   1016         );
   1017     }
   1018 
   1019     fn event_json() -> String {
   1020         event_json_with("a", "1", "hello")
   1021     }
   1022 
   1023     fn event_json_with(id_hex: &str, pubkey_hex: &str, content: &str) -> String {
   1024         format!(
   1025             r#"{{
   1026                 "id":"{}",
   1027                 "pubkey":"{}",
   1028                 "created_at":1714124433,
   1029                 "kind":1,
   1030                 "tags":[["t","radroots"]],
   1031                 "content":"{}",
   1032                 "sig":"{}"
   1033             }}"#,
   1034             id_hex.repeat(64),
   1035             pubkey_hex.repeat(64),
   1036             content,
   1037             "b".repeat(128)
   1038         )
   1039     }
   1040 
   1041     fn filter_json() -> String {
   1042         r#"{"ids":["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"],"limit":10}"#
   1043             .to_owned()
   1044     }
   1045 
   1046     fn kind_filter_json() -> String {
   1047         r#"{"kinds":[1],"limit":10}"#.to_owned()
   1048     }
   1049 
   1050     fn temp_root(prefix: &str) -> std::path::PathBuf {
   1051         std::env::temp_dir().join(format!(
   1052             "{}-{}-{}",
   1053             prefix,
   1054             std::process::id(),
   1055             SystemTime::now()
   1056                 .duration_since(UNIX_EPOCH)
   1057                 .expect("system time")
   1058                 .as_nanos()
   1059         ))
   1060     }
   1061 }