commit 89c88d56c832052d2cb2d8b44025ce80c7160f1c
parent 2294160193addfdfb64099453ead4bfb7a535221
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 15:52:13 -0700
storage: honor pocket sync policy
Diffstat:
3 files changed, 65 insertions(+), 7 deletions(-)
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -67,7 +67,6 @@ impl GroupService {
};
service.derive_missing_outbox_records(store)?;
service.materialize_outbox(store)?;
- store.sync()?;
Ok(Some(service))
}
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -710,7 +710,6 @@ impl BaseRelay {
store_offset,
)?);
}
- self.store.sync()?;
Ok(BaseRelayEventWrite::stored(
ok_accepted(event_id, String::new()),
stored_offsets,
diff --git a/crates/tangle_store_pocket/src/lib.rs b/crates/tangle_store_pocket/src/lib.rs
@@ -117,6 +117,7 @@ impl PocketDependencyBoundary {
pub struct PocketStoreHandle {
store: PocketStore,
+ sync_policy: PocketSyncPolicy,
}
impl PocketStoreHandle {
@@ -125,7 +126,10 @@ impl PocketStoreHandle {
.map_err(|error| PocketStoreError::from_create_dir(config.data_directory(), error))?;
let store = PocketStore::new(config.data_directory(), TANGLE_POCKET_EXTRA_TABLES.to_vec())
.map_err(PocketStoreError::from_pocket)?;
- Ok(Self { store })
+ Ok(Self {
+ store,
+ sync_policy: config.sync_policy(),
+ })
}
pub fn dir(&self) -> &Path {
@@ -136,10 +140,17 @@ impl PocketStoreHandle {
self.store.sync().map_err(PocketStoreError::from_pocket)
}
+ pub fn sync_policy(&self) -> PocketSyncPolicy {
+ self.sync_policy
+ }
+
pub fn store_event(&self, event: &PocketEvent) -> Result<u64, PocketStoreError> {
- self.store
+ let offset = self
+ .store
.store_event(event)
- .map_err(PocketStoreError::from_pocket)
+ .map_err(PocketStoreError::from_pocket)?;
+ self.sync_after_write()?;
+ Ok(offset)
}
pub fn event_by_id(
@@ -246,7 +257,8 @@ impl PocketStoreHandle {
.put(&mut txn, key, value)
.map_err(|error| PocketStoreError::from_extra_table(table, "put", error))?;
txn.commit()
- .map_err(|error| PocketStoreError::from_extra_table(table, "commit", error))
+ .map_err(|error| PocketStoreError::from_extra_table(table, "commit", error))?;
+ self.sync_after_write()
}
pub fn get_extra_record(
@@ -277,7 +289,8 @@ impl PocketStoreHandle {
.delete(&mut txn, key)
.map_err(|error| PocketStoreError::from_extra_table(table, "delete", error))?;
txn.commit()
- .map_err(|error| PocketStoreError::from_extra_table(table, "commit", error))
+ .map_err(|error| PocketStoreError::from_extra_table(table, "commit", error))?;
+ self.sync_after_write()
}
pub fn scan_extra_records(
@@ -309,6 +322,13 @@ impl PocketStoreHandle {
.extra_table(table)
.ok_or_else(|| PocketStoreError::missing_table(table))
}
+
+ fn sync_after_write(&self) -> Result<(), PocketStoreError> {
+ match self.sync_policy {
+ PocketSyncPolicy::FlushOnWrite => self.sync(),
+ PocketSyncPolicy::FlushOnShutdown => Ok(()),
+ }
+ }
}
pub fn parse_pocket_event_json(raw: &[u8]) -> Result<PocketOwnedEvent, PocketStoreError> {
@@ -529,6 +549,7 @@ mod tests {
let handle = PocketStoreHandle::open(&config).expect("open");
assert_eq!(handle.dir(), config.data_directory());
+ assert_eq!(handle.sync_policy(), PocketSyncPolicy::FlushOnShutdown);
assert_eq!(
TANGLE_POCKET_EXTRA_TABLES,
["group_projection", "group_outbox", "group_checkpoint"]
@@ -693,6 +714,45 @@ mod tests {
}
#[test]
+ fn pocket_store_handle_flush_on_write_syncs_written_events_and_extra_records() {
+ let root = temp_root("tangle-pocket-flush-write");
+ let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnWrite)
+ .expect("config");
+ let handle = PocketStoreHandle::open(&config).expect("open");
+ let event =
+ parse_pocket_event_json(event_json_with("e", "5", "flush").as_bytes()).expect("event");
+
+ let offset = handle.store_event(&event).expect("store");
+ handle
+ .put_extra_record(
+ TANGLE_GROUP_CHECKPOINT_TABLE,
+ b"checkpoint\0flush",
+ b"flushed",
+ )
+ .expect("checkpoint");
+ drop(handle);
+
+ let reopened = PocketStoreHandle::open(&config).expect("reopen");
+ let by_id = reopened
+ .event_by_id(event.id())
+ .expect("lookup")
+ .expect("event");
+ let by_offset = reopened.event_by_offset(offset).expect("offset");
+
+ assert_eq!(by_id.id(), event.id());
+ assert_eq!(by_offset.id(), event.id());
+ assert_eq!(
+ reopened
+ .get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, b"checkpoint\0flush")
+ .expect("checkpoint"),
+ Some(b"flushed".to_vec())
+ );
+
+ drop(reopened);
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[test]
fn pocket_store_handle_syncs_written_events_and_extra_records() {
let root = temp_root("tangle-pocket-sync");
let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)