commit b70d855cf5f63dc8993c9bdcf433600d11aba27b
parent 2897f545c1705ef43a29ac5d53094b7e93df7237
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 16:19:38 -0700
storage: share pocket store handle
Diffstat:
3 files changed, 52 insertions(+), 16 deletions(-)
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -606,10 +606,18 @@ impl BaseRelay {
self.groups.is_some()
}
+ pub(crate) fn store_handle(&self) -> PocketStoreHandle {
+ self.store.clone()
+ }
+
pub fn group_projection(&self) -> Option<&GroupProjection> {
self.groups.as_ref().map(|groups| groups.projection())
}
+ pub(crate) fn group_service(&self) -> Option<&GroupService> {
+ self.groups.as_ref()
+ }
+
pub(crate) fn group_outbox_pending_events(&self) -> usize {
self.groups
.as_ref()
@@ -1019,7 +1027,7 @@ impl BaseRelay {
.collect()
}
- fn group_read_gate_visible_to_auth(
+ pub(crate) fn group_read_gate_visible_to_auth(
groups: Option<&GroupService>,
event: &(impl GroupEventView + ?Sized),
auth: &GroupAuthContext,
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -6,6 +6,7 @@ use crate::{
event_bus::{TangleEventBus, TangleEventReceiver},
logging,
ops::{BaseRelayReadinessHandle, BaseRelayReadinessState},
+ pocket_conversion::pocket_event_to_tangle,
rate_limits::{
TangleQueryRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey,
TangleRateLimitQueryClass, TangleRateLimitRule, TangleRateLimitScope, TangleRateLimiter,
@@ -29,12 +30,13 @@ use std::{
time::Instant,
};
use tangle_groups::{
- GroupEventClass, GroupId, KIND_GROUP_JOIN_REQUEST, StoreOffset,
+ GroupAuthContext, GroupEventClass, GroupId, KIND_GROUP_JOIN_REQUEST, StoreOffset,
validate_client_group_event_structure,
};
use tangle_protocol::{
ClientMessage, Event, Filter, Kind, RelayMessage, SubscriptionId, UnixTimestamp,
};
+use tangle_store_pocket::PocketStoreHandle;
use tokio::sync::{Mutex, watch};
pub struct TangleRuntime {
@@ -152,6 +154,7 @@ impl TangleRuntime {
struct TangleRuntimeShared {
config: Arc<BaseRelayRuntimeConfig>,
+ store: PocketStoreHandle,
relay: Mutex<BaseRelay>,
readiness: BaseRelayReadinessHandle,
limits: TangleRuntimeLimits,
@@ -173,8 +176,10 @@ impl TangleRuntimeShared {
metrics,
shutdown,
} = runtime;
+ let store = relay.store_handle();
Self {
config: Arc::new(config),
+ store,
relay: Mutex::new(relay),
readiness,
limits,
@@ -815,16 +820,18 @@ impl TangleRuntimeHandle {
offset: StoreOffset,
auth: &BaseAuthState,
) -> Result<Option<Event>, BaseRelayError> {
- let event = self
- .inner
- .relay
- .lock()
- .await
- .event_by_offset_with_auth(offset, auth)?;
- if event.is_none() {
+ let pocket_event = self.inner.store.event_by_offset(offset.as_u64())?;
+ let event = pocket_event_to_tangle(&pocket_event)?;
+ let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned());
+ let visible = {
+ let relay = self.inner.relay.lock().await;
+ BaseRelay::group_read_gate_visible_to_auth(relay.group_service(), &event, &group_auth)?
+ };
+ if !visible {
self.inner.metrics.record_group_read_denial();
+ return Ok(None);
}
- Ok(event)
+ Ok(Some(event))
}
pub(crate) async fn fanout_event_offset(
diff --git a/crates/tangle_store_pocket/src/lib.rs b/crates/tangle_store_pocket/src/lib.rs
@@ -11,6 +11,7 @@ use pocket_types::{
use std::{
io,
path::{Path, PathBuf},
+ sync::Arc,
};
pub const POCKET_SOURCE_REPOSITORY: &str = "https://github.com/triesap/pocket";
@@ -154,8 +155,9 @@ impl PocketDependencyBoundary {
}
}
+#[derive(Clone)]
pub struct PocketStoreHandle {
- store: PocketStore,
+ store: Arc<PocketStore>,
sync_policy: PocketSyncPolicy,
}
@@ -166,7 +168,7 @@ impl PocketStoreHandle {
let store = PocketStore::new(config.data_directory(), TANGLE_POCKET_EXTRA_TABLES.to_vec())
.map_err(PocketStoreError::from_pocket)?;
Ok(Self {
- store,
+ store: Arc::new(store),
sync_policy: config.sync_policy(),
})
}
@@ -355,10 +357,6 @@ impl PocketStoreHandle {
Ok(records)
}
- pub fn into_inner(self) -> PocketStore {
- self.store
- }
-
fn extra_table(&self, table: &'static str) -> Result<Database<Bytes, Bytes>, PocketStoreError> {
self.store
.extra_table(table)
@@ -602,6 +600,29 @@ mod tests {
}
#[test]
+ fn pocket_store_handle_clones_share_one_store_boundary() {
+ let root = temp_root("tangle-pocket-shared");
+ let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
+ .expect("config");
+ let writer = PocketStoreHandle::open(&config).expect("open");
+ let reader = writer.clone();
+ let event = parse_pocket_event_json(event_json().as_bytes()).expect("event");
+ let filter = parse_pocket_filter_json(filter_json().as_bytes()).expect("filter");
+
+ let offset = writer.store_event(&event).expect("store");
+ let stored = reader.event_by_offset(offset).expect("offset");
+ let found = reader
+ .find_events(&filter, PocketQueryConfig::default())
+ .expect("find");
+
+ assert_eq!(stored.id(), event.id());
+ assert_eq!(found.len(), 1);
+ assert_eq!(found[0].id(), event.id());
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[test]
fn pocket_store_handle_stores_queries_and_counts_events() {
let root = std::env::temp_dir().join(format!("tangle-pocket-query-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&root);