commit 48b3a888c0b22cd0c19e579ada25925c8fdee189
parent e976ba37a9b0e80c9017fad951444ff6865e1330
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 03:13:16 -0700
runtime: expose screened pocket queries
- add screened Pocket query results and event offset lookup to the store boundary
- route relay REQ and COUNT paths through the group read gate inside Pocket screening
- expose auth-screened event-by-offset lookup for runtime session fanout
- cover store screening, offset lookup, and private group offset visibility
Diffstat:
3 files changed, 235 insertions(+), 13 deletions(-)
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -8,14 +8,14 @@ use crate::relay::{
auth::BaseAuthState,
live::{CloseResult, LiveSubscriptionSet},
};
-use std::collections::BTreeSet;
+use std::{cell::RefCell, collections::BTreeSet};
use tangle_crypto::verify_event_signature;
use tangle_groups::{
GroupAuthContext, GroupEventClass, GroupProjection, GroupRuntimeConfig, StoreOffset,
validate_client_group_event_structure,
};
use tangle_protocol::{ClientMessage, Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp};
-use tangle_store_pocket::{PocketStoreConfig, PocketStoreHandle};
+use tangle_store_pocket::{PocketScreenResult, PocketStoreConfig, PocketStoreHandle};
pub struct BaseRelay {
store: PocketStoreHandle,
@@ -119,6 +119,38 @@ impl BaseRelay {
)
}
+ fn event_by_offset(&self, offset: StoreOffset) -> Result<Event, BaseRelayError> {
+ let event = self.store.event_by_offset(offset.as_u64())?;
+ pocket_event_to_tangle(&event)
+ }
+
+ pub fn event_by_offset_with_auth(
+ &self,
+ offset: StoreOffset,
+ auth: &BaseAuthState,
+ ) -> Result<Option<Event>, BaseRelayError> {
+ let event = self.event_by_offset(offset)?;
+ if self.event_visible_to_auth(
+ &event,
+ &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
+ )? {
+ Ok(Some(event))
+ } else {
+ Ok(None)
+ }
+ }
+
+ pub fn query_events_with_auth(
+ &self,
+ filters: &[Filter],
+ auth: &BaseAuthState,
+ ) -> Result<Vec<Event>, BaseRelayError> {
+ self.query_events(
+ filters,
+ &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
+ )
+ }
+
fn handle_auth_message(
&self,
event: Event,
@@ -349,9 +381,34 @@ impl BaseRelay {
let mut output = Vec::new();
for filter in filters {
let pocket_filter = tangle_filter_to_pocket(filter)?;
- for pocket_event in self.store.find_events(&pocket_filter)? {
+ let screen_error = RefCell::new(None);
+ let screened = self.store.find_events_with_screen(
+ &pocket_filter,
+ true,
+ 0,
+ u64::MAX,
+ |pocket_event| {
+ if screen_error.borrow().is_some() {
+ return PocketScreenResult::Mismatch;
+ }
+ match pocket_event_to_tangle(pocket_event)
+ .and_then(|event| self.event_visible_to_auth(&event, auth))
+ {
+ Ok(true) => PocketScreenResult::Match,
+ Ok(false) => PocketScreenResult::Redacted,
+ Err(error) => {
+ *screen_error.borrow_mut() = Some(error);
+ PocketScreenResult::Mismatch
+ }
+ }
+ },
+ )?;
+ if let Some(error) = screen_error.into_inner() {
+ return Err(error);
+ }
+ for pocket_event in screened.into_events() {
let event = pocket_event_to_tangle(&pocket_event)?;
- if seen.insert(event.id().clone()) && self.event_visible_to_auth(&event, auth)? {
+ if seen.insert(event.id().clone()) {
output.push(event);
}
}
@@ -375,6 +432,7 @@ impl BaseRelay {
#[cfg(test)]
mod tests {
use super::BaseRelay;
+ use crate::pocket_conversion::tangle_event_to_pocket;
use crate::relay::auth::BaseAuthState;
use crate::relay::live::CloseResult;
use tangle_crypto::RelaySigner;
@@ -382,7 +440,8 @@ mod tests {
GroupId, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, KIND_GROUP_CREATE_INVITE,
KIND_GROUP_DELETE_EVENT, KIND_GROUP_DELETE_GROUP, KIND_GROUP_EDIT_METADATA,
KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA,
- KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, MemberStatus, parse_group_runtime_config_json,
+ KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, MemberStatus, StoreOffset,
+ parse_group_runtime_config_json,
};
use tangle_protocol::{
ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId,
@@ -440,6 +499,16 @@ mod tests {
}
#[test]
+ fn base_relay_fetches_events_by_store_offset() {
+ let relay = test_relay("base-relay-offset-lookup", 4);
+ let event = signed_public_event(7, 1, Vec::new(), "offset");
+ let pocket = tangle_event_to_pocket(&event).expect("pocket");
+ let offset = StoreOffset::new(relay.store.store_event(&pocket).expect("store"));
+
+ assert_eq!(relay.event_by_offset(offset).expect("offset"), event);
+ }
+
+ #[test]
fn base_relay_rejects_group_marked_events_before_group_service() {
let mut relay = test_relay("base-relay-group-reject", 4);
let event = signed_public_event(
@@ -1077,6 +1146,42 @@ mod tests {
}
#[test]
+ fn private_group_offset_lookup_uses_reader_auth() {
+ let owner = signer(7).public_key().clone();
+ let owner_auth = authenticated_state(7);
+ let unauth = BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state");
+ let mut relay = test_relay_with_groups(
+ "base-relay-private-offset-read",
+ 4,
+ &enabled_groups_for_owner(&owner),
+ );
+ relay
+ .handle_event_with_auth(signed_private_group_create_event(7, "Farm"), &owner_auth)
+ .expect("create");
+ let private_event = signed_event_at(
+ 7,
+ 1,
+ vec![Tag::from_parts("h", &["Farm"]).expect("h")],
+ "private harvest",
+ 1_714_124_435,
+ );
+ let pocket = tangle_event_to_pocket(&private_event).expect("pocket");
+ let offset = StoreOffset::new(relay.store.store_event(&pocket).expect("store"));
+
+ assert_eq!(
+ relay
+ .event_by_offset_with_auth(offset, &unauth)
+ .expect("unauth offset"),
+ None
+ );
+ let visible = relay
+ .event_by_offset_with_auth(offset, &owner_auth)
+ .expect("owner offset")
+ .expect("visible");
+ assert_eq!(visible.id(), private_event.id());
+ }
+
+ #[test]
fn private_group_live_fanout_uses_subscription_auth() {
let owner = signer(7).public_key().clone();
let auth = authenticated_state(7);
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -18,7 +18,8 @@ use std::{
},
time::Instant,
};
-use tangle_protocol::{ClientMessage, Filter, RelayMessage, SubscriptionId, UnixTimestamp};
+use tangle_groups::StoreOffset;
+use tangle_protocol::{ClientMessage, Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp};
use tokio::sync::{Mutex, watch};
pub struct TangleRuntime {
@@ -131,6 +132,18 @@ impl TangleRuntimeHandle {
.query_req_with_auth(subscription_id, filters, auth)
}
+ pub async fn event_by_offset_with_auth(
+ &self,
+ offset: StoreOffset,
+ auth: &BaseAuthState,
+ ) -> Result<Option<Event>, BaseRelayError> {
+ self.inner
+ .lock()
+ .await
+ .relay()
+ .event_by_offset_with_auth(offset, auth)
+ }
+
pub async fn shutdown(&self) -> Result<BaseRelayShutdownReport, BaseRelayError> {
self.inner.lock().await.shutdown()
}
diff --git a/crates/tangle_store_pocket/src/lib.rs b/crates/tangle_store_pocket/src/lib.rs
@@ -25,6 +25,30 @@ pub type PocketStore = Store;
pub type PocketExtraRecord = (Vec<u8>, Vec<u8>);
pub type PocketExtraRecords = Vec<PocketExtraRecord>;
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PocketScreenedEvents {
+ events: Vec<PocketOwnedEvent>,
+ redacted: bool,
+}
+
+impl PocketScreenedEvents {
+ pub fn new(events: Vec<PocketOwnedEvent>, redacted: bool) -> Self {
+ Self { events, redacted }
+ }
+
+ pub fn events(&self) -> &[PocketOwnedEvent] {
+ &self.events
+ }
+
+ pub fn redacted(&self) -> bool {
+ self.redacted
+ }
+
+ pub fn into_events(self) -> Vec<PocketOwnedEvent> {
+ self.events
+ }
+}
+
pub const TANGLE_GROUP_PROJECTION_TABLE: &str = "group_projection";
pub const TANGLE_GROUP_OUTBOX_TABLE: &str = "group_outbox";
pub const TANGLE_GROUP_CHECKPOINT_TABLE: &str = "group_checkpoint";
@@ -94,15 +118,46 @@ impl PocketStoreHandle {
.map_err(PocketStoreError::from_pocket)
}
+ pub fn event_by_offset(&self, offset: u64) -> Result<PocketOwnedEvent, PocketStoreError> {
+ self.store
+ .get_event_by_offset(offset)
+ .map(PocketEvent::to_owned)
+ .map_err(PocketStoreError::from_pocket)
+ }
+
pub fn find_events(
&self,
filter: &PocketFilter,
) -> Result<Vec<PocketOwnedEvent>, PocketStoreError> {
- let (events, _) = self
+ self.find_events_with_screen(filter, true, 0, u64::MAX, |_| PocketScreenResult::Match)
+ .map(PocketScreenedEvents::into_events)
+ }
+
+ pub fn find_events_with_screen<F>(
+ &self,
+ filter: &PocketFilter,
+ allow_scraping: bool,
+ allow_scrape_if_limited_to: u32,
+ allow_scrape_if_max_seconds: u64,
+ screen: F,
+ ) -> Result<PocketScreenedEvents, PocketStoreError>
+ where
+ F: Fn(&PocketEvent) -> PocketScreenResult,
+ {
+ let (events, redacted) = self
.store
- .find_events(filter, true, 0, u64::MAX, |_| PocketScreenResult::Match)
+ .find_events(
+ filter,
+ allow_scraping,
+ allow_scrape_if_limited_to,
+ allow_scrape_if_max_seconds,
+ screen,
+ )
.map_err(PocketStoreError::from_pocket)?;
- Ok(events.into_iter().map(PocketEvent::to_owned).collect())
+ Ok(PocketScreenedEvents::new(
+ events.into_iter().map(PocketEvent::to_owned).collect(),
+ redacted,
+ ))
}
pub fn count_events(&self, filter: &PocketFilter) -> Result<u64, PocketStoreError> {
@@ -379,6 +434,7 @@ mod tests {
TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE, TANGLE_POCKET_EXTRA_TABLES,
parse_pocket_event_json, parse_pocket_filter_json,
};
+ use pocket_db::ScreenResult;
use std::time::{SystemTime, UNIX_EPOCH};
#[test]
@@ -435,14 +491,16 @@ mod tests {
let event = parse_pocket_event_json(event_json().as_bytes()).expect("event");
let filter = parse_pocket_filter_json(filter_json().as_bytes()).expect("filter");
- let _offset = handle.store_event(&event).expect("store");
+ let offset = handle.store_event(&event).expect("store");
let stored = handle
.event_by_id(event.id())
.expect("lookup")
.expect("event");
+ let offset_event = handle.event_by_offset(offset).expect("offset lookup");
let found = handle.find_events(&filter).expect("find");
assert_eq!(stored.id(), event.id());
+ assert_eq!(offset_event.id(), event.id());
assert_eq!(found.len(), 1);
assert_eq!(found[0].id(), event.id());
assert_eq!(handle.count_events(&filter).expect("count"), 1);
@@ -451,6 +509,43 @@ mod tests {
}
#[test]
+ fn pocket_store_handle_screens_events_before_materialization() {
+ let root = temp_root("tangle-pocket-screen");
+ let config = PocketStoreConfig::new(
+ root.join("pocket"),
+ 1024 * 1024 * 1024,
+ 128,
+ PocketSyncPolicy::FlushOnShutdown,
+ )
+ .expect("config");
+ let handle = PocketStoreHandle::open(&config).expect("open");
+ let visible = parse_pocket_event_json(event_json_with("a", "1", "visible").as_bytes())
+ .expect("visible");
+ let redacted = parse_pocket_event_json(event_json_with("c", "2", "redacted").as_bytes())
+ .expect("redacted");
+ let filter = parse_pocket_filter_json(kind_filter_json().as_bytes()).expect("filter");
+
+ handle.store_event(&visible).expect("store visible");
+ handle.store_event(&redacted).expect("store redacted");
+
+ let screened = handle
+ .find_events_with_screen(&filter, true, 0, u64::MAX, |event| {
+ if event.id() == visible.id() {
+ ScreenResult::Match
+ } else {
+ ScreenResult::Redacted
+ }
+ })
+ .expect("screened");
+
+ assert!(screened.redacted());
+ assert_eq!(screened.events().len(), 1);
+ assert_eq!(screened.events()[0].id(), visible.id());
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[test]
fn pocket_store_handle_persists_extra_table_records() {
let root = temp_root("tangle-pocket-extra");
let config = PocketStoreConfig::new(
@@ -572,6 +667,10 @@ mod tests {
}
fn event_json() -> String {
+ event_json_with("a", "1", "hello")
+ }
+
+ fn event_json_with(id_hex: &str, pubkey_hex: &str, content: &str) -> String {
format!(
r#"{{
"id":"{}",
@@ -579,11 +678,12 @@ mod tests {
"created_at":1714124433,
"kind":1,
"tags":[["t","radroots"]],
- "content":"hello",
+ "content":"{}",
"sig":"{}"
}}"#,
- "a".repeat(64),
- "1".repeat(64),
+ id_hex.repeat(64),
+ pubkey_hex.repeat(64),
+ content,
"b".repeat(128)
)
}
@@ -593,6 +693,10 @@ mod tests {
.to_owned()
}
+ fn kind_filter_json() -> String {
+ r#"{"kinds":[1],"limit":10}"#.to_owned()
+ }
+
fn temp_root(prefix: &str) -> std::path::PathBuf {
std::env::temp_dir().join(format!(
"{}-{}-{}",