commit b20b6ed9b7f745db648a8b91d861e2b1083ec7f3
parent 0fd3dc76a18b51e49c3b258bfd63f74b162d59b2
Author: triesap <tyson@radroots.org>
Date: Fri, 5 Jun 2026 22:10:44 -0700
core: add subscription manager
Diffstat:
1 file changed, 248 insertions(+), 4 deletions(-)
diff --git a/crates/tangle_core/src/lib.rs b/crates/tangle_core/src/lib.rs
@@ -8,7 +8,9 @@ use tangle_nips::{
evaluate_listing_projection, parse_deletion_request, parse_nip50_filter_search,
parse_relay_auth_event,
};
-use tangle_protocol::{Event, EventId, Filter, PublicKeyHex, UnixTimestamp, event_to_value};
+use tangle_protocol::{
+ Event, EventId, Filter, PublicKeyHex, SubscriptionId, UnixTimestamp, event_to_value,
+};
use tangle_store::{
DeletionMarker, DeletionMarkerRepository, ListingProjectionRepository, RawEventRepository,
RepositoryError, StoreEventOutcome, StoreProjectionOutcome, StoredEvent,
@@ -2344,6 +2346,132 @@ impl SubscriptionMatch {
}
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SubscriptionManager {
+ limits: RuntimeLimits,
+ matcher: SubscriptionMatcher,
+ subscriptions: BTreeMap<SubscriptionId, QueryPlan>,
+}
+
+impl SubscriptionManager {
+ pub fn new(limits: RuntimeLimits, matcher: SubscriptionMatcher) -> Self {
+ Self {
+ limits,
+ matcher,
+ subscriptions: BTreeMap::new(),
+ }
+ }
+
+ pub fn limits(&self) -> RuntimeLimits {
+ self.limits
+ }
+
+ pub fn matcher(&self) -> SubscriptionMatcher {
+ self.matcher
+ }
+
+ pub fn active_count(&self) -> usize {
+ self.subscriptions.len()
+ }
+
+ pub fn plan(&self, subscription_id: &SubscriptionId) -> Option<&QueryPlan> {
+ self.subscriptions.get(subscription_id)
+ }
+
+ pub fn subscribe(
+ &mut self,
+ subscription_id: SubscriptionId,
+ plan: QueryPlan,
+ ) -> Result<SubscriptionAddOutcome, SubscriptionManagerError> {
+ let replacing = self.subscriptions.contains_key(&subscription_id);
+ let active_count = self.subscriptions.len() + usize::from(!replacing);
+ self.limits
+ .validate_subscription_count(active_count as u64)
+ .map_err(SubscriptionManagerError::RuntimeLimit)?;
+ let outcome = if replacing {
+ SubscriptionAddOutcome::Replaced
+ } else {
+ SubscriptionAddOutcome::Inserted
+ };
+ self.subscriptions.insert(subscription_id, plan);
+ Ok(outcome)
+ }
+
+ pub fn close(&mut self, subscription_id: &SubscriptionId) -> SubscriptionCloseOutcome {
+ match self.subscriptions.remove(subscription_id) {
+ Some(_) => SubscriptionCloseOutcome::Closed,
+ None => SubscriptionCloseOutcome::NotFound,
+ }
+ }
+
+ pub fn match_event(&self, event: &Event) -> Vec<SubscriptionEventMatch> {
+ self.subscriptions
+ .iter()
+ .filter_map(|(subscription_id, plan)| {
+ let subscription_match = self.matcher.match_event(plan, event);
+ subscription_match
+ .matched()
+ .then(|| SubscriptionEventMatch {
+ subscription_id: subscription_id.clone(),
+ branch_indexes: subscription_match.branch_indexes().to_vec(),
+ })
+ })
+ .collect()
+ }
+}
+
+impl Default for SubscriptionManager {
+ fn default() -> Self {
+ Self::new(RuntimeLimits::default(), SubscriptionMatcher::default())
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SubscriptionAddOutcome {
+ Inserted,
+ Replaced,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SubscriptionCloseOutcome {
+ Closed,
+ NotFound,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SubscriptionEventMatch {
+ pub subscription_id: SubscriptionId,
+ pub branch_indexes: Vec<usize>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum SubscriptionManagerError {
+ RuntimeLimit(RuntimeLimitViolation),
+}
+
+impl SubscriptionManagerError {
+ pub fn kind(&self) -> SubscriptionManagerErrorKind {
+ match self {
+ Self::RuntimeLimit(_) => SubscriptionManagerErrorKind::RuntimeLimit,
+ }
+ }
+}
+
+impl fmt::Display for SubscriptionManagerError {
+ fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ Self::RuntimeLimit(violation) => write!(formatter, "runtime limit: {violation}"),
+ }
+ }
+}
+
+impl std::error::Error for SubscriptionManagerError {}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SubscriptionManagerErrorKind {
+ RuntimeLimit,
+}
+
fn compile_filter_branch(filter: &Filter) -> Result<QueryPlanBranch, NostrFilterCompileError> {
let tag_filters =
compile_filter_tag_constraints(filter).map_err(NostrFilterCompileError::QueryPlan)?;
@@ -2567,15 +2695,17 @@ mod tests {
NostrFilterCompiler, ProjectionExclusionReason, QueryExecutionMode, QueryPlan,
QueryPlanBranch, QueryPlanBranchSpec, QueryPlanError, QuerySearch, QuerySort, QuerySource,
QueryTagFilter, RuntimeLimitConfigError, RuntimeLimitKind, RuntimeLimitValues,
- RuntimeLimits, SubscriptionMatch, SubscriptionMatcher, UnapprovedSellerAction,
+ RuntimeLimits, SubscriptionAddOutcome, SubscriptionCloseOutcome, SubscriptionManager,
+ SubscriptionManagerErrorKind, SubscriptionMatch, SubscriptionMatcher,
+ UnapprovedSellerAction,
};
use tangle_nips::{
FulfillmentMethod, ListingProjection, ListingUnit, evaluate_listing_projection,
parse_deletion_request,
};
use tangle_protocol::{
- AddressCoordinate, Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp,
- UnsignedEvent, filter_from_value,
+ AddressCoordinate, Event, EventId, Kind, PublicKeyHex, SignatureHex, SubscriptionId, Tag,
+ UnixTimestamp, UnsignedEvent, filter_from_value,
};
use tangle_store::{
DeletionMarker, DeletionMarkerRepository, ListingProjectionRepository, RawEventRepository,
@@ -4899,6 +5029,120 @@ mod tests {
);
}
+ #[test]
+ fn subscription_manager_inserts_replaces_closes_and_fans_out() {
+ let event = event_with(
+ vec![Tag::new(vec!["t".to_owned(), "carrots".to_owned()]).expect("tag")],
+ "Sweet storage carrots.",
+ UnixTimestamp::new(100),
+ );
+ let matching_branch = QueryPlanBranch::from_spec(QueryPlanBranchSpec {
+ tag_filters: vec![QueryTagFilter::new('t', vec!["carrots".to_owned()]).expect("tag")],
+ search: Some(QuerySearch::new("carrots", vec!["carrots".to_owned()]).expect("search")),
+ ..QueryPlanBranchSpec::default()
+ })
+ .expect("matching");
+ let no_match_branch = QueryPlanBranch::from_spec(QueryPlanBranchSpec {
+ ids: vec![EventId::new(&"c".repeat(EventId::HEX_LENGTH)).expect("id")],
+ ..QueryPlanBranchSpec::default()
+ })
+ .expect("no match");
+ let matching_plan = QueryPlan::new(
+ QuerySource::SearchDocuments,
+ QueryExecutionMode::Live,
+ QuerySort::ScoreDescCreatedAtDescEventIdAsc,
+ vec![matching_branch],
+ )
+ .expect("matching plan");
+ let no_match_plan = QueryPlan::new(
+ QuerySource::RawEvents,
+ QueryExecutionMode::Live,
+ QuerySort::CreatedAtDescEventIdAsc,
+ vec![no_match_branch],
+ )
+ .expect("no match plan");
+ let mut manager = SubscriptionManager::default();
+ let id_a = SubscriptionId::new("a").expect("id");
+ let id_b = SubscriptionId::new("b").expect("id");
+
+ assert_eq!(manager.limits(), RuntimeLimits::default());
+ assert_eq!(manager.matcher(), SubscriptionMatcher::default());
+ assert_eq!(
+ manager.subscribe(id_b.clone(), matching_plan.clone()),
+ Ok(SubscriptionAddOutcome::Inserted)
+ );
+ assert_eq!(
+ manager.subscribe(id_a.clone(), no_match_plan),
+ Ok(SubscriptionAddOutcome::Inserted)
+ );
+ assert_eq!(manager.active_count(), 2);
+ assert!(manager.plan(&id_a).is_some());
+ assert_eq!(manager.match_event(&event).len(), 1);
+ assert_eq!(manager.match_event(&event)[0].subscription_id, id_b);
+ assert_eq!(manager.match_event(&event)[0].branch_indexes, [0]);
+ assert_eq!(
+ manager.subscribe(id_a.clone(), matching_plan),
+ Ok(SubscriptionAddOutcome::Replaced)
+ );
+ let matches = manager.match_event(&event);
+
+ assert_eq!(matches.len(), 2);
+ assert_eq!(matches[0].subscription_id, id_a);
+ assert_eq!(matches[0].branch_indexes, [0]);
+ assert_eq!(
+ matches[1].subscription_id,
+ SubscriptionId::new("b").expect("id")
+ );
+ assert_eq!(
+ manager.close(&SubscriptionId::new("b").expect("id")),
+ SubscriptionCloseOutcome::Closed
+ );
+ assert_eq!(
+ manager.close(&SubscriptionId::new("b").expect("id")),
+ SubscriptionCloseOutcome::NotFound
+ );
+ assert_eq!(manager.active_count(), 1);
+ assert!(
+ manager
+ .plan(&SubscriptionId::new("b").expect("id"))
+ .is_none()
+ );
+ }
+
+ #[test]
+ fn subscription_manager_enforces_subscription_count_limits() {
+ let branch = QueryPlanBranch::from_spec(QueryPlanBranchSpec::default()).expect("branch");
+ let plan = QueryPlan::new(
+ QuerySource::RawEvents,
+ QueryExecutionMode::Live,
+ QuerySort::CreatedAtDescEventIdAsc,
+ vec![branch],
+ )
+ .expect("plan");
+ let mut manager = SubscriptionManager::new(
+ limits_with(|values| values.max_subscriptions_per_connection = 1),
+ SubscriptionMatcher::new(LiveSearchPolicy::DisabledLiveSearch),
+ );
+ let id_a = SubscriptionId::new("a").expect("id");
+ let id_b = SubscriptionId::new("b").expect("id");
+
+ assert_eq!(
+ manager.subscribe(id_a.clone(), plan.clone()),
+ Ok(SubscriptionAddOutcome::Inserted)
+ );
+ assert_eq!(
+ manager.subscribe(id_a, plan.clone()),
+ Ok(SubscriptionAddOutcome::Replaced)
+ );
+ let too_many = manager.subscribe(id_b, plan).expect_err("limit");
+
+ assert_eq!(too_many.kind(), SubscriptionManagerErrorKind::RuntimeLimit);
+ assert_eq!(
+ too_many.to_string(),
+ "runtime limit: subscriptions per connection exceeded: 2 > 1"
+ );
+ }
+
fn limits_with(update: impl FnOnce(&mut RuntimeLimitValues)) -> RuntimeLimits {
let mut values = RuntimeLimitValues::default();
update(&mut values);