commit c409089ca9a07c0a97ac2af8e1c4487e3ff3f917
parent b54fdc9d481f4b37c0f783c45973aca01734ca52
Author: triesap <tyson@radroots.org>
Date: Fri, 5 Jun 2026 23:07:58 -0700
store-surreal: query indexed tags
Diffstat:
1 file changed, 206 insertions(+), 0 deletions(-)
diff --git a/crates/tangle_store_surreal/src/lib.rs b/crates/tangle_store_surreal/src/lib.rs
@@ -2,6 +2,7 @@
use core::fmt;
use sha2::{Digest, Sha256};
+use std::collections::BTreeSet;
use surrealdb::Surreal;
use surrealdb::engine::local::{Db, Mem};
use tangle_nips::{
@@ -1001,6 +1002,49 @@ CREATE event_tag_index CONTENT {
response.take(0).map_err(SurrealStoreError::from)
}
+ pub async fn query_indexed_tag_event_ids(
+ &self,
+ filter: &Filter,
+ ) -> Result<Vec<String>, SurrealStoreError> {
+ if filter.tag_filters().is_empty() {
+ return Ok(Vec::new());
+ }
+ let mut first_order = Vec::new();
+ let mut intersection = None::<BTreeSet<String>>;
+ for (name, values) in filter.tag_filters() {
+ let ids = self
+ .query_single_indexed_tag_event_ids(
+ name.as_str(),
+ &values
+ .iter()
+ .map(|value| value.as_str().to_owned())
+ .collect::<Vec<_>>(),
+ filter,
+ )
+ .await?;
+ let ids = unique_in_order(ids);
+ if first_order.is_empty() {
+ first_order = ids.clone();
+ }
+ let current = ids.into_iter().collect::<BTreeSet<_>>();
+ intersection = Some(match intersection {
+ Some(previous) => previous.intersection(¤t).cloned().collect(),
+ None => current,
+ });
+ }
+ let Some(intersection) = intersection else {
+ return Ok(Vec::new());
+ };
+ let mut result = first_order
+ .into_iter()
+ .filter(|event_id| intersection.contains(event_id))
+ .collect::<Vec<_>>();
+ if let Some(limit) = filter.limit() {
+ result.truncate(limit as usize);
+ }
+ Ok(result)
+ }
+
pub async fn maintain_current_event(
&self,
event: &Event,
@@ -1590,6 +1634,67 @@ UPSERT type::record('search_doc', $doc_key) CONTENT {
response.take(0).map_err(SurrealStoreError::from)
}
+ async fn query_single_indexed_tag_event_ids(
+ &self,
+ tag: &str,
+ values: &[String],
+ filter: &Filter,
+ ) -> Result<Vec<String>, SurrealStoreError> {
+ let mut statement =
+ "SELECT VALUE event_id FROM event_tag_index WHERE tag = $tag AND value IN $values"
+ .to_owned();
+ if !filter.authors().is_empty() {
+ statement.push_str(" AND pubkey IN $authors");
+ }
+ if !filter.kinds().is_empty() {
+ statement.push_str(" AND kind IN $kinds");
+ }
+ if filter.since().is_some() {
+ statement.push_str(" AND created_at >= $since");
+ }
+ if filter.until().is_some() {
+ statement.push_str(" AND created_at <= $until");
+ }
+ statement.push_str(" ORDER BY created_at DESC, event_id ASC;");
+ let mut query = self
+ .db
+ .query(statement)
+ .bind(("tag", tag))
+ .bind(("values", values.to_vec()));
+ if !filter.authors().is_empty() {
+ query = query.bind((
+ "authors",
+ filter
+ .authors()
+ .iter()
+ .map(|pubkey| pubkey.as_str().to_owned())
+ .collect::<Vec<_>>(),
+ ));
+ }
+ if !filter.kinds().is_empty() {
+ query = query.bind((
+ "kinds",
+ filter
+ .kinds()
+ .iter()
+ .map(|kind| kind.as_u32())
+ .collect::<Vec<_>>(),
+ ));
+ }
+ if let Some(since) = filter.since() {
+ query = query.bind(("since", since.as_u64()));
+ }
+ if let Some(until) = filter.until() {
+ query = query.bind(("until", until.as_u64()));
+ }
+ let mut response = query
+ .await
+ .map_err(SurrealStoreError::from)?
+ .check()
+ .map_err(SurrealStoreError::from)?;
+ response.take(0).map_err(SurrealStoreError::from)
+ }
+
async fn applied_migration(
&self,
name: &str,
@@ -1965,6 +2070,17 @@ fn tag_values(event: &Event, name: &str) -> Vec<String> {
.collect()
}
+fn unique_in_order(values: Vec<String>) -> Vec<String> {
+ let mut seen = BTreeSet::new();
+ let mut unique = Vec::new();
+ for value in values {
+ if seen.insert(value.clone()) {
+ unique.push(value);
+ }
+ }
+ unique
+}
+
fn price_minor(raw: &str) -> Option<i64> {
let mut parts = raw.split('.');
let whole = parts.next()?.parse::<i64>().ok()?;
@@ -2827,6 +2943,96 @@ mod tests {
}
#[tokio::test]
+ async fn query_indexed_tags_intersects_filter_tag_constraints() {
+ let store = memory_store().await;
+ store
+ .apply_plan(&base_migration_plan())
+ .await
+ .expect("apply plan");
+ let target_event = "a".repeat(EventId::HEX_LENGTH);
+ let other_event = "b".repeat(EventId::HEX_LENGTH);
+ let pubkey_a = "1".repeat(PublicKeyHex::HEX_LENGTH);
+ let pubkey_b = "2".repeat(PublicKeyHex::HEX_LENGTH);
+ let first = synthetic_event(
+ "1",
+ "b",
+ &pubkey_a,
+ 100,
+ 1,
+ vec![
+ Tag::from_parts("e", &[&target_event]).expect("e tag"),
+ Tag::from_parts("p", &[&pubkey_a]).expect("p tag"),
+ ],
+ "first",
+ );
+ let second = synthetic_event(
+ "2",
+ "c",
+ &pubkey_b,
+ 101,
+ 1,
+ vec![
+ Tag::from_parts("e", &[&target_event]).expect("e tag"),
+ Tag::from_parts("p", &[&pubkey_b]).expect("p tag"),
+ ],
+ "second",
+ );
+ let third = synthetic_event(
+ "3",
+ "d",
+ &pubkey_a,
+ 102,
+ 1,
+ vec![
+ Tag::from_parts("e", &[&other_event]).expect("e tag"),
+ Tag::from_parts("p", &[&pubkey_a]).expect("p tag"),
+ ],
+ "third",
+ );
+ for event in [&first, &second, &third] {
+ store.index_event_tags(event).await.expect("index");
+ }
+
+ let intersection = filter_from_value(&serde_json::json!({
+ "#e": [target_event],
+ "#p": [pubkey_a],
+ "kinds": [1],
+ "since": 100,
+ "until": 102
+ }))
+ .expect("intersection filter");
+ assert_eq!(
+ store
+ .query_indexed_tag_event_ids(&intersection)
+ .await
+ .expect("intersection"),
+ vec![first.id().as_str().to_owned()]
+ );
+
+ let ordered = filter_from_value(&serde_json::json!({
+ "#e": ["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"],
+ "limit": 1
+ }))
+ .expect("ordered filter");
+ assert_eq!(
+ store
+ .query_indexed_tag_event_ids(&ordered)
+ .await
+ .expect("ordered"),
+ vec![second.id().as_str().to_owned()]
+ );
+ assert!(
+ store
+ .query_indexed_tag_event_ids(
+ &filter_from_value(&serde_json::json!({"kinds": [1]})).expect("no tag filter")
+ )
+ .await
+ .expect("no tags")
+ .is_empty()
+ );
+ }
+
+ #[tokio::test]
async fn maintain_current_events_tracks_replaceable_and_addressable_winners() {
let store = memory_store().await;
store