commit 4f8ba91c3bdf29e5fcc3f6b8621ef5085623bef6
parent 48b3a888c0b22cd0c19e579ada25925c8fdee189
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 03:18:03 -0700
runtime: order public req results
- merge multi-filter REQ results with OR semantics after per-filter matching
- enforce Tangle filter matches inside the Pocket screen callback before visibility checks
- sort initial REQ results newest-first with lowest event id as the timestamp tie-break
- cover AND filtering, per-filter limits, dedupe, and deterministic ordering
Diffstat:
1 file changed, 124 insertions(+), 26 deletions(-)
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -377,43 +377,74 @@ impl BaseRelay {
filters: &[Filter],
auth: &GroupAuthContext,
) -> Result<Vec<Event>, BaseRelayError> {
- let mut seen = BTreeSet::new();
let mut output = Vec::new();
for filter in filters {
- let pocket_filter = tangle_filter_to_pocket(filter)?;
- let screen_error = RefCell::new(None);
- let screened = self.store.find_events_with_screen(
- &pocket_filter,
- true,
- 0,
- u64::MAX,
- |pocket_event| {
- if screen_error.borrow().is_some() {
- return PocketScreenResult::Mismatch;
- }
- match pocket_event_to_tangle(pocket_event)
- .and_then(|event| self.event_visible_to_auth(&event, auth))
- {
+ let mut events =
+ Self::sort_and_dedupe_query_events(self.query_filter_events(filter, auth)?);
+ if let Some(limit) = filter.limit() {
+ events.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
+ }
+ output.extend(events);
+ }
+ Ok(Self::sort_and_dedupe_query_events(output))
+ }
+
+ fn query_filter_events(
+ &self,
+ filter: &Filter,
+ auth: &GroupAuthContext,
+ ) -> Result<Vec<Event>, BaseRelayError> {
+ let pocket_filter = tangle_filter_to_pocket(filter)?;
+ let screen_error = RefCell::new(None);
+ let screened = self.store.find_events_with_screen(
+ &pocket_filter,
+ true,
+ 0,
+ u64::MAX,
+ |pocket_event| {
+ if screen_error.borrow().is_some() {
+ return PocketScreenResult::Mismatch;
+ }
+ match pocket_event_to_tangle(pocket_event) {
+ Ok(event) if !filter.matches(&event) => PocketScreenResult::Mismatch,
+ Ok(event) => match self.event_visible_to_auth(&event, auth) {
Ok(true) => PocketScreenResult::Match,
Ok(false) => PocketScreenResult::Redacted,
Err(error) => {
*screen_error.borrow_mut() = Some(error);
PocketScreenResult::Mismatch
}
+ },
+ Err(error) => {
+ *screen_error.borrow_mut() = Some(error);
+ PocketScreenResult::Mismatch
}
- },
- )?;
- if let Some(error) = screen_error.into_inner() {
- return Err(error);
- }
- for pocket_event in screened.into_events() {
- let event = pocket_event_to_tangle(&pocket_event)?;
- if seen.insert(event.id().clone()) {
- output.push(event);
}
- }
+ },
+ )?;
+ if let Some(error) = screen_error.into_inner() {
+ return Err(error);
}
- Ok(output)
+ screened
+ .into_events()
+ .into_iter()
+ .map(|pocket_event| pocket_event_to_tangle(&pocket_event))
+ .collect()
+ }
+
+ fn sort_and_dedupe_query_events(mut events: Vec<Event>) -> Vec<Event> {
+ events.sort_by(|left, right| {
+ right
+ .unsigned()
+ .created_at()
+ .cmp(&left.unsigned().created_at())
+ .then_with(|| left.id().cmp(right.id()))
+ });
+ let mut seen = BTreeSet::new();
+ events
+ .into_iter()
+ .filter(|event| seen.insert(event.id().clone()))
+ .collect()
}
fn event_visible_to_auth(
@@ -509,6 +540,73 @@ mod tests {
}
#[test]
+ fn base_relay_req_merges_filters_with_order_dedupe_and_limits() {
+ let mut relay = test_relay("base-relay-req-order", 8);
+ let market_tag = Tag::from_parts("t", &["market"]).expect("tag");
+ let old_market =
+ signed_event_at(7, 1, vec![market_tag.clone()], "old market", 1_714_124_433);
+ let tied_author =
+ signed_event_at(7, 1, vec![market_tag.clone()], "tied author", 1_714_124_434);
+ let tied_other =
+ signed_event_at(8, 1, vec![market_tag.clone()], "tied other", 1_714_124_434);
+ let kind_two = signed_event_at(7, 2, Vec::new(), "kind two", 1_714_124_435);
+ let wrong_tag = signed_event_at(
+ 9,
+ 1,
+ vec![Tag::from_parts("t", &["other"]).expect("tag")],
+ "wrong tag",
+ 1_714_124_436,
+ );
+
+ for event in [
+ &old_market,
+ &tied_other,
+ &kind_two,
+ &wrong_tag,
+ &tied_author,
+ ] {
+ assert_accepted(relay.handle_event(event.clone()).expect("event"), event);
+ }
+
+ let subscription_id = SubscriptionId::new("req-order").expect("sub");
+ let market_limit =
+ filter_from_value(&serde_json::json!({"kinds":[1],"#t":["market"],"limit":2}))
+ .expect("market filter");
+ let author_limit = filter_from_value(&serde_json::json!({
+ "authors":[tied_author.unsigned().pubkey().as_str()],
+ "kinds":[1,2],
+ "limit":2
+ }))
+ .expect("author filter");
+ let messages = relay
+ .handle_req(subscription_id.clone(), vec![market_limit, author_limit])
+ .expect("req");
+ let mut tied = [tied_author.clone(), tied_other.clone()];
+ tied.sort_by(|left, right| left.id().cmp(right.id()));
+ let expected = [kind_two.clone(), tied[0].clone(), tied[1].clone()];
+
+ assert_eq!(messages.len(), expected.len() + 1);
+ for (message, event) in messages.iter().zip(expected.iter()) {
+ assert!(matches!(
+ message,
+ RelayMessage::Event {
+ subscription_id: actual,
+ event: found
+ } if actual == &subscription_id && found.id() == event.id()
+ ));
+ }
+ assert_eq!(
+ messages.last(),
+ Some(&RelayMessage::Eose(subscription_id.clone()))
+ );
+ assert!(!messages.iter().any(|message| matches!(
+ message,
+ RelayMessage::Event { event, .. }
+ if event.id() == old_market.id() || event.id() == wrong_tag.id()
+ )));
+ }
+
+ #[test]
fn base_relay_rejects_group_marked_events_before_group_service() {
let mut relay = test_relay("base-relay-group-reject", 4);
let event = signed_public_event(