commit 837df617cb50696a759e6320495e91f616f10a2d
parent c409089ca9a07c0a97ac2af8e1c4487e3ff3f917
Author: triesap <tyson@radroots.org>
Date: Fri, 5 Jun 2026 23:09:01 -0700
store-surreal: query current events
Diffstat:
1 file changed, 147 insertions(+), 0 deletions(-)
diff --git a/crates/tangle_store_surreal/src/lib.rs b/crates/tangle_store_surreal/src/lib.rs
@@ -1107,6 +1107,80 @@ UPSERT type::record('event_current', $address_key) CONTENT {
response.take(0).map_err(SurrealStoreError::from)
}
+ pub async fn query_current_events(
+ &self,
+ filter: &Filter,
+ ) -> Result<Vec<serde_json::Value>, SurrealStoreError> {
+ let mut statement =
+ "SELECT * FROM event_current WHERE deleted = false AND hidden = false".to_owned();
+ if !filter.ids().is_empty() {
+ statement.push_str(" AND event_id IN $ids");
+ }
+ if !filter.authors().is_empty() {
+ statement.push_str(" AND pubkey IN $authors");
+ }
+ if !filter.kinds().is_empty() {
+ statement.push_str(" AND kind IN $kinds");
+ }
+ if filter.since().is_some() {
+ statement.push_str(" AND created_at >= $since");
+ }
+ if filter.until().is_some() {
+ statement.push_str(" AND created_at <= $until");
+ }
+ statement.push_str(" ORDER BY created_at DESC, event_id ASC");
+ if filter.limit().is_some() {
+ statement.push_str(" LIMIT $limit");
+ }
+ statement.push(';');
+ let mut query = self.db.query(statement);
+ if !filter.ids().is_empty() {
+ query = query.bind((
+ "ids",
+ filter
+ .ids()
+ .iter()
+ .map(|id| id.as_str().to_owned())
+ .collect::<Vec<_>>(),
+ ));
+ }
+ if !filter.authors().is_empty() {
+ query = query.bind((
+ "authors",
+ filter
+ .authors()
+ .iter()
+ .map(|pubkey| pubkey.as_str().to_owned())
+ .collect::<Vec<_>>(),
+ ));
+ }
+ if !filter.kinds().is_empty() {
+ query = query.bind((
+ "kinds",
+ filter
+ .kinds()
+ .iter()
+ .map(|kind| kind.as_u32())
+ .collect::<Vec<_>>(),
+ ));
+ }
+ if let Some(since) = filter.since() {
+ query = query.bind(("since", since.as_u64()));
+ }
+ if let Some(until) = filter.until() {
+ query = query.bind(("until", until.as_u64()));
+ }
+ if let Some(limit) = filter.limit() {
+ query = query.bind(("limit", limit));
+ }
+ let mut response = query
+ .await
+ .map_err(SurrealStoreError::from)?
+ .check()
+ .map_err(SurrealStoreError::from)?;
+ response.take(0).map_err(SurrealStoreError::from)
+ }
+
pub async fn apply_deletion_markers(
&self,
event: &Event,
@@ -3176,6 +3250,79 @@ mod tests {
}
#[tokio::test]
+ async fn query_current_events_returns_replaceable_winners() {
+ let store = memory_store().await;
+ store
+ .apply_plan(&base_migration_plan())
+ .await
+ .expect("apply plan");
+ let pubkey_a = "1".repeat(PublicKeyHex::HEX_LENGTH);
+ let pubkey_b = "2".repeat(PublicKeyHex::HEX_LENGTH);
+ let older = synthetic_event("1", "b", &pubkey_a, 100, 3, Vec::new(), "older");
+ let newer = synthetic_event("2", "c", &pubkey_a, 101, 3, Vec::new(), "newer");
+ let other = synthetic_event("3", "d", &pubkey_b, 102, 3, Vec::new(), "other");
+ let addressable = synthetic_event(
+ "4",
+ "e",
+ &pubkey_a,
+ 103,
+ 30_402,
+ vec![Tag::from_parts("d", &["listing-current"]).expect("d tag")],
+ "listing",
+ );
+ for event in [&older, &newer, &other, &addressable] {
+ store.maintain_current_event(event).await.expect("current");
+ }
+
+ let replaceable_filter = filter_from_value(&serde_json::json!({
+ "authors": [pubkey_a],
+ "kinds": [3],
+ "since": 100,
+ "until": 101,
+ "limit": 1
+ }))
+ .expect("replaceable filter");
+ let rows = store
+ .query_current_events(&replaceable_filter)
+ .await
+ .expect("current rows");
+ assert_eq!(rows.len(), 1);
+ assert_eq!(rows[0]["event_id"], newer.id().as_str());
+
+ let kind_filter = filter_from_value(&serde_json::json!({
+ "kinds": [3]
+ }))
+ .expect("kind filter");
+ let rows = store
+ .query_current_events(&kind_filter)
+ .await
+ .expect("kind rows");
+ assert_eq!(rows.len(), 2);
+ assert_eq!(rows[0]["event_id"], other.id().as_str());
+ assert_eq!(rows[1]["event_id"], newer.id().as_str());
+
+ store
+ .database()
+ .query("UPDATE event_current SET deleted = true WHERE event_id = $event_id;")
+ .bind(("event_id", newer.id().as_str()))
+ .await
+ .expect("delete current")
+ .check()
+ .expect("delete check");
+ let id_filter = filter_from_value(&serde_json::json!({
+ "ids": [newer.id().as_str()]
+ }))
+ .expect("id filter");
+ assert!(
+ store
+ .query_current_events(&id_filter)
+ .await
+ .expect("deleted current")
+ .is_empty()
+ );
+ }
+
+ #[tokio::test]
async fn apply_deletion_markers_persists_markers_and_author_scoped_deletes() {
let store = memory_store().await;
store