tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 0fd3dc76a18b51e49c3b258bfd63f74b162d59b2
parent b3733c51fc4686d929a1e6a370b818387a048a0a
Author: triesap <tyson@radroots.org>
Date:   Fri,  5 Jun 2026 22:08:54 -0700

core: add subscription matcher

Diffstat:
Mcrates/tangle_core/src/lib.rs | 322++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 319 insertions(+), 3 deletions(-)

diff --git a/crates/tangle_core/src/lib.rs b/crates/tangle_core/src/lib.rs @@ -2266,6 +2266,84 @@ pub enum Nip50QueryCompileErrorKind { MissingSearchTerms, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct SubscriptionMatcher { + live_search_policy: LiveSearchPolicy, +} + +impl SubscriptionMatcher { + pub fn new(live_search_policy: LiveSearchPolicy) -> Self { + Self { live_search_policy } + } + + pub fn live_search_policy(self) -> LiveSearchPolicy { + self.live_search_policy + } + + pub fn match_event(&self, plan: &QueryPlan, event: &Event) -> SubscriptionMatch { + if !plan.subscribes_to_live_events() { + return SubscriptionMatch::empty(); + } + let branch_indexes = plan + .branches() + .iter() + .enumerate() + .filter_map(|(index, branch)| { + branch_matches_event(branch, event, self.live_search_policy).then_some(index) + }) + .collect(); + SubscriptionMatch { branch_indexes } + } +} + +impl Default for SubscriptionMatcher { + fn default() -> Self { + Self::new(LiveSearchPolicy::BestEffortTokenMatch) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LiveSearchPolicy { + BestEffortTokenMatch, + DisabledLiveSearch, +} + +impl LiveSearchPolicy { + pub fn as_str(self) -> &'static str { + match self { + Self::BestEffortTokenMatch => "best_effort_token_match", + Self::DisabledLiveSearch => "disabled_live_search", + } + } +} + +impl fmt::Display for LiveSearchPolicy { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str(self.as_str()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SubscriptionMatch { + branch_indexes: Vec<usize>, +} + +impl SubscriptionMatch { + pub fn empty() -> Self { + Self { + branch_indexes: Vec::new(), + } + } + + pub fn matched(&self) -> bool { + !self.branch_indexes.is_empty() + } + + pub fn branch_indexes(&self) -> &[usize] { + &self.branch_indexes + } +} + fn compile_filter_branch(filter: &Filter) -> Result<QueryPlanBranch, NostrFilterCompileError> { let tag_filters = compile_filter_tag_constraints(filter).map_err(NostrFilterCompileError::QueryPlan)?; @@ -2327,6 +2405,93 @@ fn compile_nip50_filter_branch( .map_err(Nip50QueryCompileError::QueryPlan) } +fn branch_matches_event( + branch: &QueryPlanBranch, + event: &Event, + live_search_policy: LiveSearchPolicy, +) -> bool { + if !branch.ids().is_empty() && !branch.ids().iter().any(|id| id == event.id()) { + return false; + } + if !branch.authors().is_empty() + && !branch + .authors() + .iter() + .any(|author| author == event.unsigned().pubkey()) + { + return false; + } + if !branch.kinds().is_empty() + && !branch + .kinds() + .iter() + .any(|kind| *kind == event.unsigned().kind()) + { + return false; + } + if let Some(since) = branch.since() + && event.unsigned().created_at() < since + { + return false; + } + if let Some(until) = branch.until() + && event.unsigned().created_at() > until + { + return false; + } + for (name, values) in branch.tag_filters() { + let matched = event.unsigned().tags().iter().any(|tag| { + tag.indexed_pair().is_some_and(|(tag_name, tag_value)| { + tag_name == name.to_string() && values.iter().any(|value| value == tag_value) + }) + }); + if !matched { + return false; + } + } + match branch.search() { + Some(search) => live_search_matches(search, event, live_search_policy), + None => true, + } +} + +fn live_search_matches( + search: &QuerySearch, + event: &Event, + live_search_policy: LiveSearchPolicy, +) -> bool { + match live_search_policy { + LiveSearchPolicy::DisabledLiveSearch => false, + LiveSearchPolicy::BestEffortTokenMatch => { + let tokens = event_search_tokens(event); + search + .terms() + .iter() + .all(|term| tokens.contains(&term.to_ascii_lowercase())) + } + } +} + +fn event_search_tokens(event: &Event) -> BTreeSet<String> { + let mut tokens = BTreeSet::new(); + collect_search_tokens(event.unsigned().content(), &mut tokens); + for tag in event.unsigned().tags() { + for value in tag.values() { + collect_search_tokens(value, &mut tokens); + } + } + tokens +} + +fn collect_search_tokens(value: &str, tokens: &mut BTreeSet<String>) { + tokens.extend( + value + .split(|character: char| !character.is_ascii_alphanumeric()) + .filter(|term| !term.is_empty()) + .map(|term| term.to_ascii_lowercase()), + ); +} + fn compile_filter_tag_constraints(filter: &Filter) -> Result<Vec<QueryTagFilter>, QueryPlanError> { filter .tag_filters() @@ -2395,14 +2560,14 @@ mod tests { AdmissionContext, AdmissionEffect, AdmissionEvent, AdmissionEventKind, AdmissionPolicy, AdmissionRejectionKind, EventIngestionEffect, EventIngestionRejectionKind, EventIngestor, EventParser, EventValidationRejection, EventValidationRejectionKind, EventValidator, - MarketplaceCursor, MarketplaceCursorSpec, MarketplaceDecimal, MarketplaceGeoPoint, - MarketplaceListingStatus, MarketplaceLocationFilter, MarketplaceQuery, + LiveSearchPolicy, MarketplaceCursor, MarketplaceCursorSpec, MarketplaceDecimal, + MarketplaceGeoPoint, MarketplaceListingStatus, MarketplaceLocationFilter, MarketplaceQuery, MarketplaceQueryErrorKind, MarketplaceQuerySpec, MarketplaceSort, Nip50QueryCompileErrorKind, Nip50QueryCompiler, NostrFilterCompileErrorKind, NostrFilterCompiler, ProjectionExclusionReason, QueryExecutionMode, QueryPlan, QueryPlanBranch, QueryPlanBranchSpec, QueryPlanError, QuerySearch, QuerySort, QuerySource, QueryTagFilter, RuntimeLimitConfigError, RuntimeLimitKind, RuntimeLimitValues, - RuntimeLimits, UnapprovedSellerAction, + RuntimeLimits, SubscriptionMatch, SubscriptionMatcher, UnapprovedSellerAction, }; use tangle_nips::{ FulfillmentMethod, ListingProjection, ListingUnit, evaluate_listing_projection, @@ -4583,6 +4748,157 @@ mod tests { ); } + #[test] + fn subscription_matcher_matches_live_query_plan_branches() { + let event = event_with( + vec![ + Tag::new(vec!["t".to_owned(), "carrots".to_owned()]).expect("tag"), + Tag::new(vec!["title".to_owned(), "Sweet carrots".to_owned()]).expect("tag"), + ], + "Sweet storage carrots.", + UnixTimestamp::new(100), + ); + let matching_branch = QueryPlanBranch::from_spec(QueryPlanBranchSpec { + ids: vec![event.id().clone()], + authors: vec![event.unsigned().pubkey().clone()], + kinds: vec![event.unsigned().kind()], + tag_filters: vec![QueryTagFilter::new('t', vec!["carrots".to_owned()]).expect("tag")], + since: Some(UnixTimestamp::new(99)), + until: Some(UnixTimestamp::new(101)), + search: Some( + QuerySearch::new( + "sweet carrots", + vec!["sweet".to_owned(), "carrots".to_owned()], + ) + .expect("search"), + ), + ..QueryPlanBranchSpec::default() + }) + .expect("matching"); + let id_miss = QueryPlanBranch::from_spec(QueryPlanBranchSpec { + ids: vec![EventId::new(&"c".repeat(EventId::HEX_LENGTH)).expect("id")], + ..QueryPlanBranchSpec::default() + }) + .expect("id"); + let author_miss = QueryPlanBranch::from_spec(QueryPlanBranchSpec { + authors: vec![pubkey("2")], + ..QueryPlanBranchSpec::default() + }) + .expect("author"); + let kind_miss = QueryPlanBranch::from_spec(QueryPlanBranchSpec { + kinds: vec![Kind::new(1).expect("kind")], + ..QueryPlanBranchSpec::default() + }) + .expect("kind"); + let since_miss = QueryPlanBranch::from_spec(QueryPlanBranchSpec { + since: Some(UnixTimestamp::new(101)), + ..QueryPlanBranchSpec::default() + }) + .expect("since"); + let until_miss = QueryPlanBranch::from_spec(QueryPlanBranchSpec { + until: Some(UnixTimestamp::new(99)), + ..QueryPlanBranchSpec::default() + }) + .expect("until"); + let tag_miss = QueryPlanBranch::from_spec(QueryPlanBranchSpec { + tag_filters: vec![QueryTagFilter::new('t', vec!["greens".to_owned()]).expect("tag")], + ..QueryPlanBranchSpec::default() + }) + .expect("tag"); + let search_miss = QueryPlanBranch::from_spec(QueryPlanBranchSpec { + search: Some(QuerySearch::new("missing", vec!["missing".to_owned()]).expect("search")), + ..QueryPlanBranchSpec::default() + }) + .expect("search"); + let no_search_match = QueryPlanBranch::from_spec(QueryPlanBranchSpec { + limit: Some(0), + ..QueryPlanBranchSpec::default() + }) + .expect("no search"); + let plan = QueryPlan::new( + QuerySource::SearchDocuments, + QueryExecutionMode::HistoricalThenLive, + QuerySort::ScoreDescCreatedAtDescEventIdAsc, + vec![ + id_miss, + author_miss, + kind_miss, + since_miss, + until_miss, + tag_miss, + search_miss, + matching_branch, + no_search_match, + ], + ) + .expect("plan"); + let matcher = SubscriptionMatcher::default(); + let matched = matcher.match_event(&plan, &event); + + assert_eq!( + matcher.live_search_policy(), + LiveSearchPolicy::BestEffortTokenMatch + ); + assert!(matched.matched()); + assert_eq!(matched.branch_indexes(), &[7, 8]); + } + + #[test] + fn subscription_matcher_respects_historical_mode_and_live_search_policy() { + let event = event_with( + vec![Tag::new(vec!["t".to_owned(), "carrots".to_owned()]).expect("tag")], + "Sweet storage carrots.", + UnixTimestamp::new(100), + ); + let search_branch = QueryPlanBranch::from_spec(QueryPlanBranchSpec { + search: Some( + QuerySearch::new( + "storage carrots", + vec!["storage".to_owned(), "carrots".to_owned()], + ) + .expect("search"), + ), + ..QueryPlanBranchSpec::default() + }) + .expect("search branch"); + let historical = QueryPlan::new( + QuerySource::RawEvents, + QueryExecutionMode::Historical, + QuerySort::CreatedAtDescEventIdAsc, + vec![search_branch.clone()], + ) + .expect("historical"); + let live_search = QueryPlan::new( + QuerySource::SearchDocuments, + QueryExecutionMode::Live, + QuerySort::ScoreDescCreatedAtDescEventIdAsc, + vec![search_branch], + ) + .expect("live search"); + let disabled = SubscriptionMatcher::new(LiveSearchPolicy::DisabledLiveSearch); + let historical_match = SubscriptionMatcher::default().match_event(&historical, &event); + let disabled_match = disabled.match_event(&live_search, &event); + let empty = SubscriptionMatch::empty(); + + assert_eq!( + disabled.live_search_policy(), + LiveSearchPolicy::DisabledLiveSearch + ); + assert!(!historical_match.matched()); + assert_eq!(historical_match.branch_indexes(), &[] as &[usize]); + assert!(!disabled_match.matched()); + assert!(!empty.matched()); + assert_eq!(empty.branch_indexes(), &[] as &[usize]); + assert_eq!( + LiveSearchPolicy::BestEffortTokenMatch.as_str(), + "best_effort_token_match" + ); + assert_eq!( + LiveSearchPolicy::DisabledLiveSearch.to_string(), + "disabled_live_search" + ); + } + fn limits_with(update: impl FnOnce(&mut RuntimeLimitValues)) -> RuntimeLimits { let mut values = RuntimeLimitValues::default(); update(&mut values);