commit 6ac248f91364861379dc18f458afaf63aefc2db3
parent 7421ae582d1d6c82ef8ec8ee654fa5e286adfd76
Author: triesap <tyson@radroots.org>
Date: Mon, 15 Jun 2026 19:52:51 -0700
runtime: match live subscriptions with Pocket
- Store live subscription filters as Pocket-owned filters.
- Match live fanout with Pocket event_matches and return subscription ids.
- Keep runtime offset fanout Pocket-backed through outbound EVENT construction.
- Validate live, current-auth live, workspace check, source scan, and clippy lanes.
Diffstat:
4 files changed, 159 insertions(+), 87 deletions(-)
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -5,8 +5,10 @@ use crate::groups::{
use crate::logging::{self, TangleModerationAuditResult};
use crate::ops::BaseRelayReadinessState;
#[cfg(test)]
-use crate::pocket_conversion::{pocket_event_id, tangle_event_to_pocket};
-use crate::pocket_conversion::{pocket_event_to_tangle, tangle_filter_to_pocket};
+use crate::pocket_conversion::pocket_event_id;
+use crate::pocket_conversion::{
+ pocket_event_to_tangle, tangle_event_to_pocket, tangle_filter_to_pocket,
+};
use crate::pocket_event_validation::{
is_pocket_nip70_protected_event, pocket_event_id as pocket_runtime_event_id, pocket_event_kind,
pocket_event_pubkey, validate_pocket_event_shape, verify_pocket_event_signature,
@@ -1210,15 +1212,21 @@ impl BaseRelay {
}
let should_subscribe = !filters_are_complete(&filters);
if should_subscribe {
+ let pocket_filters = filters
+ .iter()
+ .map(tangle_filter_to_pocket)
+ .collect::<Result<Vec<_>, _>>()?;
self.subscriptions
- .ensure_can_subscribe(&subscription_id, &filters)?;
- }
- let report =
- self.query_req_with_group_auth_report(subscription_id.clone(), filters.clone(), auth)?;
- if should_subscribe && !report.group_read_denied() {
- self.subscriptions.subscribe(subscription_id, filters)?;
+ .ensure_can_subscribe(&subscription_id, &pocket_filters)?;
+ let report =
+ self.query_req_with_group_auth_report(subscription_id.clone(), filters, auth)?;
+ if !report.group_read_denied() {
+ self.subscriptions
+ .subscribe(subscription_id, pocket_filters)?;
+ }
+ return Ok(report);
}
- Ok(report)
+ self.query_req_with_group_auth_report(subscription_id, filters, auth)
}
fn query_req_with_group_auth_report(
@@ -1408,9 +1416,18 @@ impl BaseRelay {
auth: &GroupAuthContext,
) -> Vec<RelayMessage> {
let groups = self.groups.as_ref();
- self.subscriptions.fanout(event, auth, |event, auth| {
- Self::group_read_gate_visible_to_auth(groups, event, auth).unwrap_or(false)
- })
+ let pocket_event = tangle_event_to_pocket(event).expect("event must convert to Pocket");
+ self.subscriptions
+ .fanout(&pocket_event, auth, |event, auth| {
+ Self::group_read_gate_visible_to_auth(groups, event, auth).unwrap_or(false)
+ })
+ .expect("Pocket live fanout must match")
+ .into_iter()
+ .map(|subscription_id| RelayMessage::Event {
+ subscription_id,
+ event: event.clone(),
+ })
+ .collect()
}
pub fn active_subscription_count(&self) -> usize {
diff --git a/crates/tangle_runtime/src/relay/live.rs b/crates/tangle_runtime/src/relay/live.rs
@@ -3,7 +3,8 @@
use crate::errors::BaseRelayError;
use std::collections::BTreeMap;
use tangle_groups::GroupAuthContext;
-use tangle_protocol::{Event, Filter, RelayMessage, SubscriptionId};
+use tangle_protocol::SubscriptionId;
+use tangle_store_pocket::{PocketEvent, PocketOwnedFilter};
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LiveSubscriptionSet {
@@ -13,7 +14,7 @@ pub(crate) struct LiveSubscriptionSet {
#[derive(Debug, Clone, PartialEq, Eq)]
struct LiveSubscription {
- filters: Vec<Filter>,
+ filters: Vec<PocketOwnedFilter>,
}
impl LiveSubscriptionSet {
@@ -40,7 +41,7 @@ impl LiveSubscriptionSet {
pub(crate) fn subscribe(
&mut self,
subscription_id: SubscriptionId,
- filters: Vec<Filter>,
+ filters: Vec<PocketOwnedFilter>,
) -> Result<(), BaseRelayError> {
self.ensure_can_subscribe(&subscription_id, &filters)?;
self.subscriptions
@@ -51,7 +52,7 @@ impl LiveSubscriptionSet {
pub(crate) fn ensure_can_subscribe(
&self,
subscription_id: &SubscriptionId,
- filters: &[Filter],
+ filters: &[PocketOwnedFilter],
) -> Result<(), BaseRelayError> {
if filters.is_empty() {
return Err(BaseRelayError::invalid(
@@ -83,37 +84,31 @@ impl LiveSubscriptionSet {
}
pub(crate) fn fanout(
- &mut self,
- event: &Event,
+ &self,
+ event: &PocketEvent,
auth: &GroupAuthContext,
- visible_to_auth: impl Fn(&Event, &GroupAuthContext) -> bool,
- ) -> Vec<RelayMessage> {
- let matched = self
- .subscriptions
- .iter()
- .filter_map(|(subscription_id, subscription)| {
+ visible_to_auth: impl Fn(&PocketEvent, &GroupAuthContext) -> bool,
+ ) -> Result<Vec<SubscriptionId>, BaseRelayError> {
+ self.subscriptions.iter().try_fold(
+ Vec::new(),
+ |mut matched, (subscription_id, subscription)| {
if !subscription
.filters
.iter()
- .any(|filter| filter.matches(event))
+ .map(|filter| filter.event_matches(event))
+ .collect::<Result<Vec<_>, _>>()
+ .map_err(|error| BaseRelayError::error(error.to_string()))?
+ .into_iter()
+ .any(|matches| matches)
{
- return None;
+ return Ok(matched);
}
if visible_to_auth(event, auth) {
- Some(subscription_id.clone())
- } else {
- None
+ matched.push(subscription_id.clone());
}
- })
- .collect::<Vec<_>>();
- let mut messages = Vec::new();
- for subscription_id in matched {
- messages.push(RelayMessage::Event {
- subscription_id,
- event: event.clone(),
- });
- }
- messages
+ Ok(matched)
+ },
+ )
}
pub(crate) fn active_count(&self) -> usize {
@@ -131,7 +126,7 @@ pub enum CloseResult {
mod tests {
use super::{CloseResult, LiveSubscriptionSet};
use tangle_groups::GroupAuthContext;
- use tangle_protocol::{RelayMessage, SubscriptionId, filter_from_value};
+ use tangle_protocol::{SubscriptionId, filter_from_value};
use tangle_test_support::{FixtureKey, tangle_v2_event};
#[test]
@@ -141,7 +136,7 @@ mod tests {
subscriptions
.subscribe(
subscription_id.clone(),
- vec![filter_from_value(&serde_json::json!({"kinds":[1]})).expect("filter")],
+ vec![pocket_filter(serde_json::json!({"kinds":[1]}))],
)
.expect("subscribe");
let first = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "first")
@@ -153,25 +148,82 @@ mod tests {
assert!(matches!(
subscriptions
- .fanout(&first, &GroupAuthContext::unauthenticated(), |_, _| true)
+ .fanout(&pocket_event(&first), &GroupAuthContext::unauthenticated(), |_, _| true)
+ .expect("fanout")
.as_slice(),
- [RelayMessage::Event { subscription_id: delivered, event }]
- if delivered == &subscription_id && event.id() == first.id()
+ [delivered] if delivered == &subscription_id
));
assert!(matches!(
subscriptions
- .fanout(&second, &GroupAuthContext::unauthenticated(), |_, _| true)
+ .fanout(&pocket_event(&second), &GroupAuthContext::unauthenticated(), |_, _| true)
+ .expect("fanout")
.as_slice(),
- [RelayMessage::Event { subscription_id: delivered, event }]
- if delivered == &subscription_id && event.id() == second.id()
+ [delivered] if delivered == &subscription_id
));
assert!(matches!(
subscriptions
- .fanout(&third, &GroupAuthContext::unauthenticated(), |_, _| true)
+ .fanout(&pocket_event(&third), &GroupAuthContext::unauthenticated(), |_, _| true)
+ .expect("fanout")
.as_slice(),
- [RelayMessage::Event { subscription_id: delivered, event }]
- if delivered == &subscription_id && event.id() == third.id()
+ [delivered] if delivered == &subscription_id
));
assert_eq!(subscriptions.close(&subscription_id), CloseResult::Closed);
}
+
+ #[test]
+ fn live_subscription_fanout_uses_pocket_filter_matching_and_auth_gate() {
+ let mut subscriptions = LiveSubscriptionSet::new(4, 4).expect("subscriptions");
+ let event = tangle_v2_event(
+ FixtureKey::Member,
+ 1_714_124_433,
+ 1,
+ vec![tangle_protocol::Tag::from_parts("t", &["market"]).expect("tag")],
+ "first",
+ )
+ .expect("event");
+ let matched = SubscriptionId::new("matched").expect("subscription");
+ let mismatched = SubscriptionId::new("mismatched").expect("subscription");
+ subscriptions
+ .subscribe(
+ matched.clone(),
+ vec![pocket_filter(serde_json::json!({
+ "ids": [event.id().as_str()],
+ "authors": [event.unsigned().pubkey().as_str()],
+ "kinds": [1],
+ "#t": ["market"],
+ "since": 1_714_124_433,
+ "until": 1_714_124_434
+ }))],
+ )
+ .expect("matched subscribe");
+ subscriptions
+ .subscribe(
+ mismatched,
+ vec![pocket_filter(serde_json::json!({"kinds":[2]}))],
+ )
+ .expect("mismatched subscribe");
+ let event = pocket_event(&event);
+
+ assert_eq!(
+ subscriptions
+ .fanout(&event, &GroupAuthContext::unauthenticated(), |_, _| true)
+ .expect("fanout"),
+ vec![matched.clone()]
+ );
+ assert!(
+ subscriptions
+ .fanout(&event, &GroupAuthContext::unauthenticated(), |_, _| false)
+ .expect("auth gated fanout")
+ .is_empty()
+ );
+ }
+
+ fn pocket_filter(value: serde_json::Value) -> tangle_store_pocket::PocketOwnedFilter {
+ let filter = filter_from_value(&value).expect("filter");
+ crate::pocket_conversion::tangle_filter_to_pocket(&filter).expect("pocket filter")
+ }
+
+ fn pocket_event(event: &tangle_protocol::Event) -> tangle_store_pocket::PocketOwnedEvent {
+ crate::pocket_conversion::tangle_event_to_pocket(event).expect("pocket event")
+ }
}
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -8,7 +8,7 @@ use crate::{
groups::GroupServiceHandle,
logging,
ops::{BaseRelayReadinessHandle, BaseRelayReadinessState},
- pocket_conversion::{pocket_event_to_tangle, pocket_filter_to_tangle},
+ pocket_conversion::pocket_filter_to_tangle,
pocket_event_validation::{pocket_event_id, pocket_event_kind, pocket_event_pubkey},
rate_limits::{
TangleQueryRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey,
@@ -1199,19 +1199,15 @@ impl TangleRuntimeHandle {
auth: &BaseAuthState,
) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> {
let pocket_event = self.inner.store.event_by_offset(offset.as_u64())?;
- let event = pocket_event_to_tangle(&pocket_event)?;
let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned());
- let messages = subscriptions.fanout(&event, &group_auth, |event, auth| {
+ let subscriptions = subscriptions.fanout(&pocket_event, &group_auth, |event, auth| {
BaseRelay::group_read_gate_visible_to_auth(self.inner.groups.as_ref(), event, auth)
.unwrap_or(false)
- });
- Ok(messages
+ })?;
+ Ok(subscriptions
.into_iter()
- .map(|message| match message {
- RelayMessage::Event {
- subscription_id, ..
- } => RuntimeRelayMessage::event(subscription_id, pocket_event.clone()),
- message => message.into(),
+ .map(|subscription_id| {
+ RuntimeRelayMessage::event(subscription_id, pocket_event.clone())
})
.collect())
}
@@ -2319,7 +2315,7 @@ mod tests {
subscriptions
.subscribe(
subscription_id.clone(),
- vec![filter_from_value(&json!({"kinds":[1]})).expect("filter")],
+ vec![pocket_filter(json!({"kinds":[1]}))],
)
.expect("subscribe");
let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "live")
@@ -3600,13 +3596,10 @@ mod tests {
subscriptions
.subscribe(
subscription_id.clone(),
- vec![
- filter_from_value(&json!({
- "kinds":[KIND_GROUP_METADATA, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS],
- "#d":["RuntimeFarm"]
- }))
- .expect("filter"),
- ],
+ vec![pocket_filter(json!({
+ "kinds":[KIND_GROUP_METADATA, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS],
+ "#d":["RuntimeFarm"]
+ }))],
)
.expect("subscribe");
@@ -4356,8 +4349,7 @@ mod tests {
let public_subscription = SubscriptionId::new("public-stress-live").expect("subscription");
let mut member_subscriptions = LiveSubscriptionSet::new(32, 64).expect("member live set");
let mut public_subscriptions = LiveSubscriptionSet::new(32, 64).expect("public live set");
- let stress_filter =
- filter_from_value(&json!({"kinds":[1], "#h":["StressPrivate"]})).expect("filter");
+ let stress_filter = pocket_filter(json!({"kinds":[1], "#h":["StressPrivate"]}));
member_subscriptions
.subscribe(member_subscription.clone(), vec![stress_filter.clone()])
.expect("member subscribe");
@@ -4890,6 +4882,11 @@ mod tests {
.expect("limits")
}
+ fn pocket_filter(value: serde_json::Value) -> tangle_store_pocket::PocketOwnedFilter {
+ let filter = filter_from_value(&value).expect("filter");
+ crate::pocket_conversion::tangle_filter_to_pocket(&filter).expect("pocket filter")
+ }
+
fn temp_root(name: &str) -> PathBuf {
std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id()))
}
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -279,11 +279,10 @@ impl TangleWebSocketSession {
filters,
search_present,
} => {
- self.handle_req(
- subscription_id,
- runtime_filters_to_protocol(filters, search_present)?,
- )
- .await
+ let protocol_filters =
+ runtime_filters_to_protocol(filters.clone(), search_present)?;
+ self.handle_req(subscription_id, protocol_filters, filters)
+ .await
}
RuntimeClientMessage::Count {
subscription_id,
@@ -333,6 +332,7 @@ impl TangleWebSocketSession {
&mut self,
subscription_id: SubscriptionId,
filters: Vec<Filter>,
+ pocket_filters: Vec<PocketOwnedFilter>,
) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> {
let metrics = self.runtime.metrics();
metrics.record_client_message(TangleClientMessageMetricKind::Req);
@@ -359,7 +359,7 @@ impl TangleWebSocketSession {
let should_subscribe = !filters_are_complete(&filters);
if should_subscribe {
self.subscriptions
- .ensure_can_subscribe(&subscription_id, &filters)?;
+ .ensure_can_subscribe(&subscription_id, &pocket_filters)?;
}
let report = self
.runtime
@@ -369,7 +369,7 @@ impl TangleWebSocketSession {
let replies = report.into_messages();
if should_subscribe && !closes_subscription {
self.subscriptions
- .subscribe(subscription_id.clone(), filters)?;
+ .subscribe(subscription_id.clone(), pocket_filters)?;
metrics.record_subscription_opened();
logging::log_subscription_opened(self.connection_id, &subscription_id);
}
@@ -1043,13 +1043,12 @@ mod tests {
session.handle_event_receive_result(offset).await,
TangleSessionControl::Continue
);
- assert_eq!(
- take_outbound_text(&mut session),
+ assert_relay_message_text(
+ &take_outbound_text(&mut session),
RelayMessage::Event {
subscription_id,
- event: after_auth
- }
- .encode()
+ event: after_auth,
+ },
);
let _ = std::fs::remove_dir_all(root);
@@ -1547,13 +1546,12 @@ mod tests {
session.handle_event_receive_result(offset).await,
TangleSessionControl::Continue
);
- assert_eq!(
- take_outbound_text(&mut session),
+ assert_relay_message_text(
+ &take_outbound_text(&mut session),
RelayMessage::Event {
subscription_id: subscription_id.clone(),
- event
- }
- .encode()
+ event,
+ },
);
assert_eq!(session.active_subscription_count(), 1);
}
@@ -1691,6 +1689,14 @@ mod tests {
text.to_string()
}
+ fn assert_relay_message_text(actual: &str, expected: RelayMessage) {
+ assert_eq!(
+ serde_json::from_str::<serde_json::Value>(actual).expect("actual relay JSON"),
+ serde_json::from_str::<serde_json::Value>(&expected.encode())
+ .expect("expected relay JSON")
+ );
+ }
+
fn runtime_config(root: &Path) -> BaseRelayRuntimeConfig {
runtime_config_with_outbound_queue(root, 8)
}