commit 6da9f18bbfb92c876371e3b9bf6ee74f15243a33
parent 8302eb065eacdf3183079ff81e3756a1a8d14b99
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 01:57:17 -0700
runtime: extract live subscription module
- move live subscription state into relay::live
- keep CloseResult with the subscription lifecycle boundary
- preserve group visibility checks through an explicit fanout callback
- verify formatting, runtime checks, fanout tests, and benchmark compile checks
Diffstat:
4 files changed, 181 insertions(+), 129 deletions(-)
diff --git a/crates/tangle_runtime/src/base_relay.rs b/crates/tangle_runtime/src/base_relay.rs
@@ -1,7 +1,10 @@
use crate::errors::{BaseRelayError, ok_accepted, ok_rejected};
use crate::ops::BaseRelayReadinessState;
-use crate::relay::auth::BaseAuthState;
-use std::{collections::BTreeMap, collections::BTreeSet, str};
+use crate::relay::{
+ auth::BaseAuthState,
+ live::{CloseResult, LiveSubscriptionSet},
+};
+use std::{collections::BTreeSet, str};
use tangle_crypto::{RelaySigner, verify_event_signature};
use tangle_groups::{
GroupAuthContext, GroupAuthority, GroupError, GroupErrorKind, GroupEventClass,
@@ -302,7 +305,12 @@ impl BaseRelay {
}
pub fn fanout(&mut self, event: &Event) -> Vec<RelayMessage> {
- self.subscriptions.fanout(event, self.groups.as_ref())
+ let groups = self.groups.as_ref();
+ self.subscriptions.fanout(event, |event, auth| {
+ groups
+ .map(|groups| groups.event_visible_to_auth(event, auth).unwrap_or(false))
+ .unwrap_or(true)
+ })
}
pub fn mark_delivered(&mut self, subscription_id: &SubscriptionId) {
@@ -816,129 +824,6 @@ fn delete_target_event_id(event: &Event) -> Result<EventId, GroupError> {
))
}
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct LiveSubscriptionSet {
- subscriptions: BTreeMap<SubscriptionId, LiveSubscription>,
- pending: BTreeMap<SubscriptionId, usize>,
- max_pending_events: usize,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-struct LiveSubscription {
- filters: Vec<Filter>,
- auth: GroupAuthContext,
-}
-
-impl LiveSubscriptionSet {
- pub fn new(max_pending_events: usize) -> Result<Self, BaseRelayError> {
- if max_pending_events == 0 {
- return Err(BaseRelayError::invalid(
- "live subscription pending event limit must be greater than zero",
- ));
- }
- Ok(Self {
- subscriptions: BTreeMap::new(),
- pending: BTreeMap::new(),
- max_pending_events,
- })
- }
-
- pub fn subscribe(
- &mut self,
- subscription_id: SubscriptionId,
- filters: Vec<Filter>,
- auth: GroupAuthContext,
- ) -> Result<(), BaseRelayError> {
- if filters.is_empty() {
- return Err(BaseRelayError::invalid(
- "subscription must include at least one filter",
- ));
- }
- self.subscriptions
- .insert(subscription_id.clone(), LiveSubscription { filters, auth });
- self.pending.insert(subscription_id, 0);
- Ok(())
- }
-
- pub fn close(&mut self, subscription_id: &SubscriptionId) -> CloseResult {
- self.pending.remove(subscription_id);
- if self.subscriptions.remove(subscription_id).is_some() {
- CloseResult::Closed
- } else {
- CloseResult::NotFound
- }
- }
-
- pub fn close_all(&mut self) -> usize {
- let closed = self.subscriptions.len();
- self.subscriptions.clear();
- self.pending.clear();
- closed
- }
-
- fn fanout(&mut self, event: &Event, groups: Option<&GroupService>) -> Vec<RelayMessage> {
- let matched = self
- .subscriptions
- .iter()
- .filter_map(|(subscription_id, subscription)| {
- if !subscription
- .filters
- .iter()
- .any(|filter| filter.matches(event))
- {
- return None;
- }
- if groups
- .map(|groups| {
- groups
- .event_visible_to_auth(event, &subscription.auth)
- .unwrap_or(false)
- })
- .unwrap_or(true)
- {
- Some(subscription_id.clone())
- } else {
- None
- }
- })
- .collect::<Vec<_>>();
- let mut messages = Vec::new();
- for subscription_id in matched {
- let pending = self.pending.entry(subscription_id.clone()).or_insert(0);
- *pending += 1;
- if *pending > self.max_pending_events {
- self.close(&subscription_id);
- messages.push(RelayMessage::Closed {
- subscription_id,
- message: "error: subscription lagged; resync required".to_owned(),
- });
- } else {
- messages.push(RelayMessage::Event {
- subscription_id,
- event: event.clone(),
- });
- }
- }
- messages
- }
-
- pub fn mark_delivered(&mut self, subscription_id: &SubscriptionId) {
- if let Some(pending) = self.pending.get_mut(subscription_id) {
- *pending = 0;
- }
- }
-
- pub fn active_count(&self) -> usize {
- self.subscriptions.len()
- }
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum CloseResult {
- Closed,
- NotFound,
-}
-
fn tangle_event_to_pocket(event: &Event) -> Result<PocketOwnedEvent, BaseRelayError> {
let raw = event_to_value(event).to_string();
parse_pocket_event_json(raw.as_bytes()).map_err(BaseRelayError::from)
@@ -966,8 +851,9 @@ fn pocket_event_id(event_id: &EventId) -> Result<PocketEventId, BaseRelayError>
#[cfg(test)]
mod tests {
- use super::{BaseRelay, CloseResult};
+ use super::BaseRelay;
use crate::relay::auth::BaseAuthState;
+ use crate::relay::live::CloseResult;
use tangle_crypto::RelaySigner;
use tangle_groups::{
GroupId, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, KIND_GROUP_CREATE_INVITE,
diff --git a/crates/tangle_runtime/src/relay/live.rs b/crates/tangle_runtime/src/relay/live.rs
@@ -0,0 +1,165 @@
+#![forbid(unsafe_code)]
+
+use crate::errors::BaseRelayError;
+use std::collections::BTreeMap;
+use tangle_groups::GroupAuthContext;
+use tangle_protocol::{Event, Filter, RelayMessage, SubscriptionId};
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(crate) struct LiveSubscriptionSet {
+ subscriptions: BTreeMap<SubscriptionId, LiveSubscription>,
+ pending: BTreeMap<SubscriptionId, usize>,
+ max_pending_events: usize,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct LiveSubscription {
+ filters: Vec<Filter>,
+ auth: GroupAuthContext,
+}
+
+impl LiveSubscriptionSet {
+ pub(crate) fn new(max_pending_events: usize) -> Result<Self, BaseRelayError> {
+ if max_pending_events == 0 {
+ return Err(BaseRelayError::invalid(
+ "live subscription pending event limit must be greater than zero",
+ ));
+ }
+ Ok(Self {
+ subscriptions: BTreeMap::new(),
+ pending: BTreeMap::new(),
+ max_pending_events,
+ })
+ }
+
+ pub(crate) fn subscribe(
+ &mut self,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: GroupAuthContext,
+ ) -> Result<(), BaseRelayError> {
+ if filters.is_empty() {
+ return Err(BaseRelayError::invalid(
+ "subscription must include at least one filter",
+ ));
+ }
+ self.subscriptions
+ .insert(subscription_id.clone(), LiveSubscription { filters, auth });
+ self.pending.insert(subscription_id, 0);
+ Ok(())
+ }
+
+ pub(crate) fn close(&mut self, subscription_id: &SubscriptionId) -> CloseResult {
+ self.pending.remove(subscription_id);
+ if self.subscriptions.remove(subscription_id).is_some() {
+ CloseResult::Closed
+ } else {
+ CloseResult::NotFound
+ }
+ }
+
+ pub(crate) fn close_all(&mut self) -> usize {
+ let closed = self.subscriptions.len();
+ self.subscriptions.clear();
+ self.pending.clear();
+ closed
+ }
+
+ pub(crate) fn fanout(
+ &mut self,
+ event: &Event,
+ visible_to_auth: impl Fn(&Event, &GroupAuthContext) -> bool,
+ ) -> Vec<RelayMessage> {
+ let matched = self
+ .subscriptions
+ .iter()
+ .filter_map(|(subscription_id, subscription)| {
+ if !subscription
+ .filters
+ .iter()
+ .any(|filter| filter.matches(event))
+ {
+ return None;
+ }
+ if visible_to_auth(event, &subscription.auth) {
+ Some(subscription_id.clone())
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+ let mut messages = Vec::new();
+ for subscription_id in matched {
+ let pending = self.pending.entry(subscription_id.clone()).or_insert(0);
+ *pending += 1;
+ if *pending > self.max_pending_events {
+ self.close(&subscription_id);
+ messages.push(RelayMessage::Closed {
+ subscription_id,
+ message: "error: subscription lagged; resync required".to_owned(),
+ });
+ } else {
+ messages.push(RelayMessage::Event {
+ subscription_id,
+ event: event.clone(),
+ });
+ }
+ }
+ messages
+ }
+
+ pub(crate) fn mark_delivered(&mut self, subscription_id: &SubscriptionId) {
+ if let Some(pending) = self.pending.get_mut(subscription_id) {
+ *pending = 0;
+ }
+ }
+
+ pub(crate) fn active_count(&self) -> usize {
+ self.subscriptions.len()
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CloseResult {
+ Closed,
+ NotFound,
+}
+
+#[cfg(test)]
+mod tests {
+ use super::{CloseResult, LiveSubscriptionSet};
+ use tangle_groups::GroupAuthContext;
+ use tangle_protocol::{RelayMessage, SubscriptionId, filter_from_value};
+ use tangle_test_support::{FixtureKey, tangle_v2_event};
+
+ #[test]
+ fn live_subscription_fanout_closes_lagged_subscriptions() {
+ let mut subscriptions = LiveSubscriptionSet::new(1).expect("subscriptions");
+ let subscription_id = SubscriptionId::new("live").expect("subscription");
+ subscriptions
+ .subscribe(
+ subscription_id.clone(),
+ vec![filter_from_value(&serde_json::json!({"kinds":[1]})).expect("filter")],
+ GroupAuthContext::unauthenticated(),
+ )
+ .expect("subscribe");
+ let first = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "first")
+ .expect("first");
+ let second = tangle_v2_event(FixtureKey::Member, 1_714_124_434, 1, Vec::new(), "second")
+ .expect("second");
+
+ assert!(matches!(
+ subscriptions.fanout(&first, |_, _| true).as_slice(),
+ [RelayMessage::Event { subscription_id: delivered, event }]
+ if delivered == &subscription_id && event.id() == first.id()
+ ));
+ assert_eq!(
+ subscriptions.fanout(&second, |_, _| true),
+ vec![RelayMessage::Closed {
+ subscription_id: subscription_id.clone(),
+ message: "error: subscription lagged; resync required".to_owned()
+ }]
+ );
+ assert_eq!(subscriptions.close(&subscription_id), CloseResult::NotFound);
+ }
+}
diff --git a/crates/tangle_runtime/src/relay/mod.rs b/crates/tangle_runtime/src/relay/mod.rs
@@ -1,3 +1,4 @@
#![forbid(unsafe_code)]
pub mod auth;
+pub mod live;
diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs
@@ -11,9 +11,9 @@ use tangle_protocol::{
filter_from_value, parse_client_message, parse_event_json,
};
use tangle_runtime::{
- base_relay::{BaseRelay, CloseResult},
+ base_relay::BaseRelay,
nip11::{BASE_RELAY_SUPPORTED_NIPS, BaseRelayInfoConfig},
- relay::auth::BaseAuthState,
+ relay::{auth::BaseAuthState, live::CloseResult},
};
use tangle_store_pocket::{PocketStoreConfig, PocketSyncPolicy};
use tangle_test_support::{