commit 97682a264ca1929fde4dbc9e7ca664960e723d0e
parent ec20604b94025d5a539df2f93c2277c335a44afb
Author: triesap <tyson@radroots.org>
Date: Fri, 5 Jun 2026 21:47:41 -0700
core: add query plan model
Diffstat:
1 file changed, 552 insertions(+), 3 deletions(-)
diff --git a/crates/tangle_core/src/lib.rs b/crates/tangle_core/src/lib.rs
@@ -1,7 +1,7 @@
#![forbid(unsafe_code)]
use core::fmt;
-use std::collections::BTreeSet;
+use std::collections::{BTreeMap, BTreeSet};
use tangle_crypto::verify_event_signature;
use tangle_nips::{
DeletionRequest, ListingProjectionEvaluation, RelayAuthEvent, evaluate_listing_projection,
@@ -1227,6 +1227,335 @@ where
Ok(request.targets().len())
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct QueryPlan {
+ source: QuerySource,
+ mode: QueryExecutionMode,
+ sort: QuerySort,
+ branches: Vec<QueryPlanBranch>,
+}
+
+impl QueryPlan {
+ pub fn new(
+ source: QuerySource,
+ mode: QueryExecutionMode,
+ sort: QuerySort,
+ branches: Vec<QueryPlanBranch>,
+ ) -> Result<Self, QueryPlanError> {
+ if branches.is_empty() {
+ return Err(QueryPlanError::EmptyBranches);
+ }
+ Ok(Self {
+ source,
+ mode,
+ sort,
+ branches,
+ })
+ }
+
+ pub fn source(&self) -> QuerySource {
+ self.source
+ }
+
+ pub fn mode(&self) -> QueryExecutionMode {
+ self.mode
+ }
+
+ pub fn sort(&self) -> QuerySort {
+ self.sort
+ }
+
+ pub fn branches(&self) -> &[QueryPlanBranch] {
+ &self.branches
+ }
+
+ pub fn requires_historical_query(&self) -> bool {
+ self.mode != QueryExecutionMode::Live
+ && self.branches.iter().any(|branch| branch.limit() != Some(0))
+ }
+
+ pub fn subscribes_to_live_events(&self) -> bool {
+ self.mode != QueryExecutionMode::Historical
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct QueryPlanBranch {
+ ids: Vec<EventId>,
+ authors: Vec<PublicKeyHex>,
+ kinds: Vec<tangle_protocol::Kind>,
+ tag_filters: BTreeMap<char, Vec<String>>,
+ since: Option<UnixTimestamp>,
+ until: Option<UnixTimestamp>,
+ limit: Option<u64>,
+ search: Option<QuerySearch>,
+}
+
+impl QueryPlanBranch {
+ pub fn from_spec(spec: QueryPlanBranchSpec) -> Result<Self, QueryPlanError> {
+ if let (Some(since), Some(until)) = (spec.since, spec.until)
+ && since > until
+ {
+ return Err(QueryPlanError::InvalidTimeRange { since, until });
+ }
+ let mut tag_filters = BTreeMap::new();
+ for filter in spec.tag_filters {
+ tag_filters
+ .entry(filter.name())
+ .or_insert_with(Vec::new)
+ .extend(filter.values().iter().cloned());
+ }
+ for values in tag_filters.values_mut() {
+ let unique = values.drain(..).collect::<BTreeSet<_>>();
+ values.extend(unique);
+ }
+ Ok(Self {
+ ids: unique_sorted(spec.ids),
+ authors: unique_sorted(spec.authors),
+ kinds: unique_sorted(spec.kinds),
+ tag_filters,
+ since: spec.since,
+ until: spec.until,
+ limit: spec.limit,
+ search: spec.search,
+ })
+ }
+
+ pub fn ids(&self) -> &[EventId] {
+ &self.ids
+ }
+
+ pub fn authors(&self) -> &[PublicKeyHex] {
+ &self.authors
+ }
+
+ pub fn kinds(&self) -> &[tangle_protocol::Kind] {
+ &self.kinds
+ }
+
+ pub fn tag_filters(&self) -> &BTreeMap<char, Vec<String>> {
+ &self.tag_filters
+ }
+
+ pub fn since(&self) -> Option<UnixTimestamp> {
+ self.since
+ }
+
+ pub fn until(&self) -> Option<UnixTimestamp> {
+ self.until
+ }
+
+ pub fn limit(&self) -> Option<u64> {
+ self.limit
+ }
+
+ pub fn search(&self) -> Option<&QuerySearch> {
+ self.search.as_ref()
+ }
+}
+
+#[derive(Debug, Clone, Default, PartialEq, Eq)]
+pub struct QueryPlanBranchSpec {
+ pub ids: Vec<EventId>,
+ pub authors: Vec<PublicKeyHex>,
+ pub kinds: Vec<tangle_protocol::Kind>,
+ pub tag_filters: Vec<QueryTagFilter>,
+ pub since: Option<UnixTimestamp>,
+ pub until: Option<UnixTimestamp>,
+ pub limit: Option<u64>,
+ pub search: Option<QuerySearch>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct QueryTagFilter {
+ name: char,
+ values: Vec<String>,
+}
+
+impl QueryTagFilter {
+ pub fn new(name: char, values: Vec<String>) -> Result<Self, QueryPlanError> {
+ if !name.is_ascii_alphabetic() {
+ return Err(QueryPlanError::InvalidTagName { name });
+ }
+ if values.is_empty() {
+ return Err(QueryPlanError::EmptyTagValues { name });
+ }
+ if values.iter().any(String::is_empty) {
+ return Err(QueryPlanError::EmptyTagValue { name });
+ }
+ Ok(Self {
+ name,
+ values: unique_sorted(values),
+ })
+ }
+
+ pub fn name(&self) -> char {
+ self.name
+ }
+
+ pub fn values(&self) -> &[String] {
+ &self.values
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct QuerySearch {
+ raw: String,
+ terms: Vec<String>,
+}
+
+impl QuerySearch {
+ pub fn new(raw: &str, terms: Vec<String>) -> Result<Self, QueryPlanError> {
+ let raw = raw.trim();
+ if raw.is_empty() || terms.is_empty() || terms.iter().any(String::is_empty) {
+ return Err(QueryPlanError::EmptySearch);
+ }
+ Ok(Self {
+ raw: raw.to_owned(),
+ terms: unique_sorted(terms),
+ })
+ }
+
+ pub fn raw(&self) -> &str {
+ &self.raw
+ }
+
+ pub fn terms(&self) -> &[String] {
+ &self.terms
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum QuerySource {
+ RawEvents,
+ ListingProjections,
+ SearchDocuments,
+}
+
+impl QuerySource {
+ pub fn as_str(self) -> &'static str {
+ match self {
+ Self::RawEvents => "raw events",
+ Self::ListingProjections => "listing projections",
+ Self::SearchDocuments => "search documents",
+ }
+ }
+}
+
+impl fmt::Display for QuerySource {
+ fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+ formatter.write_str(self.as_str())
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum QueryExecutionMode {
+ Historical,
+ Live,
+ HistoricalThenLive,
+}
+
+impl QueryExecutionMode {
+ pub fn as_str(self) -> &'static str {
+ match self {
+ Self::Historical => "historical",
+ Self::Live => "live",
+ Self::HistoricalThenLive => "historical then live",
+ }
+ }
+}
+
+impl fmt::Display for QueryExecutionMode {
+ fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+ formatter.write_str(self.as_str())
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum QuerySort {
+ CreatedAtDescEventIdAsc,
+ ScoreDescCreatedAtDescEventIdAsc,
+}
+
+impl QuerySort {
+ pub fn as_str(self) -> &'static str {
+ match self {
+ Self::CreatedAtDescEventIdAsc => "created_at desc event_id asc",
+ Self::ScoreDescCreatedAtDescEventIdAsc => "score desc created_at desc event_id asc",
+ }
+ }
+}
+
+impl fmt::Display for QuerySort {
+ fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+ formatter.write_str(self.as_str())
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum QueryPlanError {
+ EmptyBranches,
+ InvalidTimeRange {
+ since: UnixTimestamp,
+ until: UnixTimestamp,
+ },
+ InvalidTagName {
+ name: char,
+ },
+ EmptyTagValues {
+ name: char,
+ },
+ EmptyTagValue {
+ name: char,
+ },
+ EmptySearch,
+}
+
+impl fmt::Display for QueryPlanError {
+ fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ Self::EmptyBranches => {
+ formatter.write_str("query plan must include at least one branch")
+ }
+ Self::InvalidTimeRange { since, until } => {
+ write!(
+ formatter,
+ "query time range is invalid: since {since} > until {until}"
+ )
+ }
+ Self::InvalidTagName { name } => {
+ write!(
+ formatter,
+ "tag filter name must be ASCII alphabetic, got `{name}`"
+ )
+ }
+ Self::EmptyTagValues { name } => {
+ write!(
+ formatter,
+ "tag filter `{name}` must include at least one value"
+ )
+ }
+ Self::EmptyTagValue { name } => {
+ write!(formatter, "tag filter `{name}` values must not be empty")
+ }
+ Self::EmptySearch => formatter.write_str("search query must include terms"),
+ }
+ }
+}
+
+impl std::error::Error for QueryPlanError {}
+
+fn unique_sorted<T>(values: Vec<T>) -> Vec<T>
+where
+ T: Ord,
+{
+ values
+ .into_iter()
+ .collect::<BTreeSet<_>>()
+ .into_iter()
+ .collect()
+}
+
fn require_positive(field: &'static str, value: u64) -> Result<(), RuntimeLimitConfigError> {
if value == 0 {
Err(RuntimeLimitConfigError::Zero { field })
@@ -1253,8 +1582,10 @@ mod tests {
AdmissionContext, AdmissionEffect, AdmissionEvent, AdmissionEventKind, AdmissionPolicy,
AdmissionRejectionKind, EventIngestionEffect, EventIngestionRejectionKind, EventIngestor,
EventParser, EventValidationRejection, EventValidationRejectionKind, EventValidator,
- ProjectionExclusionReason, RuntimeLimitConfigError, RuntimeLimitKind, RuntimeLimitValues,
- RuntimeLimits, UnapprovedSellerAction,
+ ProjectionExclusionReason, QueryExecutionMode, QueryPlan, QueryPlanBranch,
+ QueryPlanBranchSpec, QueryPlanError, QuerySearch, QuerySort, QuerySource, QueryTagFilter,
+ RuntimeLimitConfigError, RuntimeLimitKind, RuntimeLimitValues, RuntimeLimits,
+ UnapprovedSellerAction,
};
use tangle_nips::{ListingProjection, evaluate_listing_projection, parse_deletion_request};
use tangle_protocol::{
@@ -2501,6 +2832,224 @@ mod tests {
assert_eq!(deletion_failing.events().expect("events").len(), 1);
}
+ #[test]
+ fn query_plan_model_accepts_multi_branch_historical_live_plans() {
+ let search = QuerySearch::new(
+ " carrots local ",
+ vec![
+ "local".to_owned(),
+ "carrots".to_owned(),
+ "carrots".to_owned(),
+ ],
+ )
+ .expect("search");
+ let tag_filter = QueryTagFilter::new(
+ 't',
+ vec![
+ "vegetables".to_owned(),
+ "carrots".to_owned(),
+ "vegetables".to_owned(),
+ ],
+ )
+ .expect("tag");
+ let branch = QueryPlanBranch::from_spec(QueryPlanBranchSpec {
+ ids: vec![
+ EventId::new(&"b".repeat(EventId::HEX_LENGTH)).expect("id"),
+ EventId::new(&"a".repeat(EventId::HEX_LENGTH)).expect("id"),
+ EventId::new(&"a".repeat(EventId::HEX_LENGTH)).expect("id"),
+ ],
+ authors: vec![pubkey("2"), pubkey("1"), pubkey("1")],
+ kinds: vec![
+ Kind::new(30_402).expect("kind"),
+ Kind::new(1).expect("kind"),
+ Kind::new(1).expect("kind"),
+ ],
+ tag_filters: vec![
+ tag_filter.clone(),
+ QueryTagFilter::new('t', vec!["local".to_owned()]).expect("tag"),
+ ],
+ since: Some(UnixTimestamp::new(10)),
+ until: Some(UnixTimestamp::new(20)),
+ limit: Some(50),
+ search: Some(search.clone()),
+ })
+ .expect("branch");
+ let second_branch =
+ QueryPlanBranch::from_spec(QueryPlanBranchSpec::default()).expect("second branch");
+ let plan = QueryPlan::new(
+ QuerySource::RawEvents,
+ QueryExecutionMode::HistoricalThenLive,
+ QuerySort::CreatedAtDescEventIdAsc,
+ vec![branch.clone(), second_branch],
+ )
+ .expect("plan");
+
+ assert_eq!(search.raw(), "carrots local");
+ assert_eq!(search.terms(), &["carrots".to_owned(), "local".to_owned()]);
+ assert_eq!(tag_filter.name(), 't');
+ assert_eq!(
+ tag_filter.values(),
+ &["carrots".to_owned(), "vegetables".to_owned()]
+ );
+ assert_eq!(plan.source(), QuerySource::RawEvents);
+ assert_eq!(plan.mode(), QueryExecutionMode::HistoricalThenLive);
+ assert_eq!(plan.sort(), QuerySort::CreatedAtDescEventIdAsc);
+ assert_eq!(plan.branches().len(), 2);
+ assert!(plan.requires_historical_query());
+ assert!(plan.subscribes_to_live_events());
+ assert_eq!(branch.ids()[0].as_str(), &"a".repeat(EventId::HEX_LENGTH));
+ assert_eq!(branch.authors()[0], pubkey("1"));
+ assert_eq!(branch.kinds()[0], Kind::new(1).expect("kind"));
+ assert_eq!(
+ branch.tag_filters().get(&'t').expect("tag values"),
+ &[
+ "carrots".to_owned(),
+ "local".to_owned(),
+ "vegetables".to_owned(),
+ ]
+ );
+ assert_eq!(branch.since(), Some(UnixTimestamp::new(10)));
+ assert_eq!(branch.until(), Some(UnixTimestamp::new(20)));
+ assert_eq!(branch.limit(), Some(50));
+ assert_eq!(branch.search(), Some(&search));
+ }
+
+ #[test]
+ fn query_plan_model_distinguishes_historical_and_live_execution() {
+ let zero_limit_branch = QueryPlanBranch::from_spec(QueryPlanBranchSpec {
+ limit: Some(0),
+ ..QueryPlanBranchSpec::default()
+ })
+ .expect("branch");
+ let historical_then_live = QueryPlan::new(
+ QuerySource::RawEvents,
+ QueryExecutionMode::HistoricalThenLive,
+ QuerySort::CreatedAtDescEventIdAsc,
+ vec![zero_limit_branch.clone()],
+ )
+ .expect("historical then live");
+ let live = QueryPlan::new(
+ QuerySource::RawEvents,
+ QueryExecutionMode::Live,
+ QuerySort::CreatedAtDescEventIdAsc,
+ vec![zero_limit_branch.clone()],
+ )
+ .expect("live");
+ let historical = QueryPlan::new(
+ QuerySource::ListingProjections,
+ QueryExecutionMode::Historical,
+ QuerySort::ScoreDescCreatedAtDescEventIdAsc,
+ vec![zero_limit_branch],
+ )
+ .expect("historical");
+
+ assert!(!historical_then_live.requires_historical_query());
+ assert!(historical_then_live.subscribes_to_live_events());
+ assert!(!live.requires_historical_query());
+ assert!(live.subscribes_to_live_events());
+ assert!(!historical.requires_historical_query());
+ assert!(!historical.subscribes_to_live_events());
+ assert_eq!(historical.source(), QuerySource::ListingProjections);
+ assert_eq!(
+ historical.sort(),
+ QuerySort::ScoreDescCreatedAtDescEventIdAsc
+ );
+ }
+
+ #[test]
+ fn query_plan_model_rejects_invalid_shapes_and_has_stable_labels() {
+ let empty_branches = QueryPlan::new(
+ QuerySource::RawEvents,
+ QueryExecutionMode::Historical,
+ QuerySort::CreatedAtDescEventIdAsc,
+ Vec::new(),
+ )
+ .expect_err("empty");
+ let invalid_time = QueryPlanBranch::from_spec(QueryPlanBranchSpec {
+ since: Some(UnixTimestamp::new(20)),
+ until: Some(UnixTimestamp::new(10)),
+ ..QueryPlanBranchSpec::default()
+ })
+ .expect_err("time");
+ let invalid_tag = QueryTagFilter::new('1', vec!["value".to_owned()]).expect_err("tag");
+ let empty_tag_values = QueryTagFilter::new('t', Vec::new()).expect_err("tag values");
+ let empty_tag_value = QueryTagFilter::new('t', vec![String::new()]).expect_err("tag value");
+ let empty_search = QuerySearch::new(" ", vec!["carrots".to_owned()]).expect_err("search");
+ let empty_search_terms = QuerySearch::new("carrots", Vec::new()).expect_err("terms");
+ let empty_search_term = QuerySearch::new("carrots", vec![String::new()]).expect_err("term");
+
+ assert_eq!(empty_branches, QueryPlanError::EmptyBranches);
+ assert_eq!(
+ invalid_time,
+ QueryPlanError::InvalidTimeRange {
+ since: UnixTimestamp::new(20),
+ until: UnixTimestamp::new(10),
+ }
+ );
+ assert_eq!(invalid_tag, QueryPlanError::InvalidTagName { name: '1' });
+ assert_eq!(
+ empty_tag_values,
+ QueryPlanError::EmptyTagValues { name: 't' }
+ );
+ assert_eq!(empty_tag_value, QueryPlanError::EmptyTagValue { name: 't' });
+ assert_eq!(empty_search, QueryPlanError::EmptySearch);
+ assert_eq!(empty_search_terms, QueryPlanError::EmptySearch);
+ assert_eq!(empty_search_term, QueryPlanError::EmptySearch);
+ assert_eq!(
+ empty_branches.to_string(),
+ "query plan must include at least one branch"
+ );
+ assert_eq!(
+ invalid_time.to_string(),
+ "query time range is invalid: since 20 > until 10"
+ );
+ assert_eq!(
+ invalid_tag.to_string(),
+ "tag filter name must be ASCII alphabetic, got `1`"
+ );
+ assert_eq!(
+ empty_tag_values.to_string(),
+ "tag filter `t` must include at least one value"
+ );
+ assert_eq!(
+ empty_tag_value.to_string(),
+ "tag filter `t` values must not be empty"
+ );
+ assert_eq!(empty_search.to_string(), "search query must include terms");
+ assert_eq!(
+ [
+ QuerySource::RawEvents.as_str(),
+ QuerySource::ListingProjections.as_str(),
+ QuerySource::SearchDocuments.as_str(),
+ ],
+ ["raw events", "listing projections", "search documents"]
+ );
+ assert_eq!(
+ [
+ QueryExecutionMode::Historical.as_str(),
+ QueryExecutionMode::Live.as_str(),
+ QueryExecutionMode::HistoricalThenLive.as_str(),
+ ],
+ ["historical", "live", "historical then live"]
+ );
+ assert_eq!(
+ [
+ QuerySort::CreatedAtDescEventIdAsc.as_str(),
+ QuerySort::ScoreDescCreatedAtDescEventIdAsc.as_str(),
+ ],
+ [
+ "created_at desc event_id asc",
+ "score desc created_at desc event_id asc",
+ ]
+ );
+ assert_eq!(QuerySource::SearchDocuments.to_string(), "search documents");
+ assert_eq!(QueryExecutionMode::Live.to_string(), "live");
+ assert_eq!(
+ QuerySort::ScoreDescCreatedAtDescEventIdAsc.to_string(),
+ "score desc created_at desc event_id asc"
+ );
+ }
+
fn limits_with(update: impl FnOnce(&mut RuntimeLimitValues)) -> RuntimeLimits {
let mut values = RuntimeLimitValues::default();
update(&mut values);