tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit ae6ea890ea3dba4b974be661bfccc101ee74ecb5
parent 0a84d0a813e87869fa4d5a9e55f4ad86d1bf99d3
Author: triesap <tyson@radroots.org>
Date:   Mon, 15 Jun 2026 14:59:05 -0700

runtime: add safe count hll parity

Diffstat:
Mcrates/tangle_protocol/src/lib.rs | 33+++++++++++++++++++++++++++++++--
Mcrates/tangle_runtime/src/pocket_conversion.rs | 2+-
Mcrates/tangle_runtime/src/relay/core.rs | 170+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
Mcrates/tangle_runtime/src/runtime.rs | 102++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 3++-
Mcrates/tangle_store_pocket/src/lib.rs | 3++-
6 files changed, 293 insertions(+), 20 deletions(-)

diff --git a/crates/tangle_protocol/src/lib.rs b/crates/tangle_protocol/src/lib.rs @@ -815,6 +815,7 @@ pub enum RelayMessage { Count { subscription_id: SubscriptionId, count: u64, + hll: Option<String>, }, Notice(String), Auth(String), @@ -859,7 +860,18 @@ pub fn relay_message_to_value(message: &RelayMessage) -> serde_json::Value { RelayMessage::Count { subscription_id, count, - } => serde_json::json!(["COUNT", subscription_id.as_str(), {"count": count}]), + hll, + } => { + let mut payload = serde_json::Map::new(); + payload.insert( + "count".to_owned(), + serde_json::Value::Number((*count).into()), + ); + if let Some(hll) = hll { + payload.insert("hll".to_owned(), serde_json::Value::String(hll.clone())); + } + serde_json::json!(["COUNT", subscription_id.as_str(), payload]) + } RelayMessage::Notice(message) => serde_json::json!(["NOTICE", message]), RelayMessage::Auth(challenge) => serde_json::json!(["AUTH", challenge]), RelayMessage::NegErr { @@ -1955,6 +1967,7 @@ mod tests { RelayMessage::Count { subscription_id: subscription_id.clone(), count: 3, + hll: None, }, serde_json::json!(["COUNT", "sub-vector", {"count": 3}]), ), @@ -2646,7 +2659,8 @@ mod tests { assert_eq!( relay_message_to_value(&RelayMessage::Count { subscription_id, - count: 7 + count: 7, + hll: None }), serde_json::json!(["COUNT", "sub-a", {"count": 7}]) ); @@ -2681,6 +2695,21 @@ mod tests { } #[test] + fn relay_message_encoder_emits_count_hll_when_present() { + let subscription_id = SubscriptionId::new("count-hll").expect("sub"); + let hll = "0a".repeat(256); + + assert_eq!( + relay_message_to_value(&RelayMessage::Count { + subscription_id, + count: 42, + hll: Some(hll.clone()) + }), + serde_json::json!(["COUNT", "count-hll", {"count": 42, "hll": hll}]) + ); + } + + #[test] fn event_to_value_round_trips_with_event_parser() { let event = parse_event_json( &RawEventJson::new(&event_json("e", "f", 30402, tags_json())).expect("raw"), diff --git a/crates/tangle_runtime/src/pocket_conversion.rs b/crates/tangle_runtime/src/pocket_conversion.rs @@ -155,7 +155,7 @@ pub(crate) fn pocket_event_id(event_id: &EventId) -> Result<PocketEventId, BaseR .map_err(|error| BaseRelayError::error(error.to_string())) } -fn pocket_pubkey(pubkey: &PublicKeyHex) -> Result<PocketPubkey, BaseRelayError> { +pub(crate) fn pocket_pubkey(pubkey: &PublicKeyHex) -> Result<PocketPubkey, BaseRelayError> { PocketPubkey::read_hex(pubkey.as_str().as_bytes()) .map_err(|error| BaseRelayError::error(error.to_string())) } diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs @@ -5,7 +5,8 @@ use crate::groups::{ use crate::logging::{self, TangleModerationAuditResult}; use crate::ops::BaseRelayReadinessState; use crate::pocket_conversion::{ - pocket_event_id, pocket_event_to_tangle, tangle_event_to_pocket, tangle_filter_to_pocket, + pocket_event_id, pocket_event_to_tangle, pocket_pubkey, tangle_event_to_pocket, + tangle_filter_to_pocket, }; use crate::relay::{ auth::BaseAuthState, @@ -22,7 +23,7 @@ use tangle_groups::{ }; use tangle_protocol::{ClientMessage, Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp}; use tangle_store_pocket::{ - PocketQueryConfig, PocketScreenResult, PocketStoreConfig, PocketStoreHandle, + PocketHll8, PocketQueryConfig, PocketScreenResult, PocketStoreConfig, PocketStoreHandle, }; pub(crate) const NEGENTROPY_DISABLED_MESSAGE: &str = "blocked: Negentropy sync is disabled"; @@ -199,17 +200,24 @@ impl BaseRelayQueryMetrics { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] struct BaseRelayCountEventsReport { count: u64, + hll: Option<String>, group_read_denied: bool, query_metrics: BaseRelayQueryMetrics, } impl BaseRelayCountEventsReport { - fn new(count: u64, group_read_denied: bool, query_metrics: BaseRelayQueryMetrics) -> Self { + fn new( + count: u64, + hll: Option<String>, + group_read_denied: bool, + query_metrics: BaseRelayQueryMetrics, + ) -> Self { Self { count, + hll, group_read_denied, query_metrics, } @@ -1149,6 +1157,7 @@ impl BaseRelay { RelayMessage::Count { subscription_id, count: report.count, + hll: report.hll, }, report.group_read_denied, report.query_metrics, @@ -1250,6 +1259,8 @@ impl BaseRelay { let mut group_read_denied = false; let mut query_metrics = BaseRelayQueryMetrics::default(); let count_query = query.exact_count(); + let hll_offset = Self::count_hll_offset(filters)?; + let mut hll = hll_offset.map(|_| PocketHll8::new()); for filter in filters { let filter = filter.without_limit(); let report = Self::query_filter_events_report_with_services( @@ -1264,18 +1275,37 @@ impl BaseRelay { group_read_denied |= report.group_read_denied; query_metrics = query_metrics.add(report.query_metrics); for event in report.events { + if let (Some(hll), Some(offset)) = (&mut hll, hll_offset) { + let pubkey = pocket_pubkey(event.unsigned().pubkey())?; + hll.add_element(pubkey.as_bytes(), offset) + .map_err(|error| BaseRelayError::error(error.to_string()))?; + } seen.insert(event.id().clone()); } } let count = u64::try_from(seen.len()) .map_err(|_| BaseRelayError::error("visible event count overflow"))?; + let hll = (!group_read_denied) + .then(|| hll.map(|value| value.to_hex_string())) + .flatten(); Ok(BaseRelayCountEventsReport::new( count, + hll, group_read_denied, query_metrics, )) } + fn count_hll_offset(filters: &[Filter]) -> Result<Option<usize>, BaseRelayError> { + let [filter] = filters else { + return Ok(None); + }; + let pocket_filter = tangle_filter_to_pocket(filter)?; + pocket_filter + .hyperloglog_offset() + .map_err(|error| BaseRelayError::error(error.to_string())) + } + fn query_filter_events_report_with_services( store: &PocketStoreHandle, groups: Option<&GroupServiceHandle>, @@ -1433,7 +1463,8 @@ mod tests { .expect("count"), RelayMessage::Count { subscription_id: subscription_id.clone(), - count: 1 + count: 1, + hll: None } ); assert!(matches!( @@ -1839,7 +1870,8 @@ mod tests { .expect("visible count"), RelayMessage::Count { subscription_id: SubscriptionId::new("count-visible").expect("sub"), - count: 4 + count: 4, + hll: None } ); assert_eq!( @@ -1852,7 +1884,8 @@ mod tests { .expect("auth count"), RelayMessage::Count { subscription_id: SubscriptionId::new("count-auth").expect("sub"), - count: 5 + count: 5, + hll: None } ); @@ -2098,7 +2131,8 @@ mod tests { .expect("count"), RelayMessage::Count { subscription_id: SubscriptionId::new("count-limit").expect("sub"), - count: 2 + count: 2, + hll: None } ); @@ -2111,12 +2145,121 @@ mod tests { .expect("count"), RelayMessage::Count { subscription_id: SubscriptionId::new("count-dedupe").expect("sub"), - count: 3 + count: 3, + hll: None } ); } #[test] + fn base_relay_count_hll_emits_for_public_single_filter() { + let relay = test_relay("base-relay-count-hll-public", 8); + let target = "a".repeat(EventId::HEX_LENGTH); + let target_tag = Tag::from_parts("e", &[&target]).expect("tag"); + let first = signed_public_event(7, 7, vec![target_tag.clone()], "first reaction"); + let second = signed_public_event(8, 7, vec![target_tag], "second reaction"); + + for event in [&first, &second] { + assert_accepted(relay.handle_event(event.clone()).expect("event"), event); + } + + let RelayMessage::Count { count, hll, .. } = relay + .handle_count( + SubscriptionId::new("count-hll-public").expect("sub"), + vec![ + filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target]})) + .expect("filter"), + ], + ) + .expect("count") + else { + panic!("count expected") + }; + let hll = hll.expect("hll"); + + assert_eq!(count, 2); + assert_eq!(hll.len(), 512); + assert_ne!(hll, "00".repeat(256)); + } + + #[test] + fn base_relay_count_hll_omits_for_noneligible_and_redacted_counts() { + let owner = signer(7).public_key().clone(); + let owner_auth = authenticated_state(7); + let unauth = BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state"); + let relay = test_relay_with_groups( + "base-relay-count-hll-omits", + 8, + &enabled_groups_for_owner(&owner), + ); + let target = "b".repeat(EventId::HEX_LENGTH); + let target_tag = Tag::from_parts("e", &[&target]).expect("tag"); + let public = signed_public_event(8, 7, vec![target_tag.clone()], "public reaction"); + + assert_accepted(relay.handle_event(public.clone()).expect("public"), &public); + let private_create = signed_private_group_create_event(7, "PrivateHll"); + assert_accepted( + relay + .handle_event_with_auth(private_create.clone(), &owner_auth) + .expect("private create"), + &private_create, + ); + let private = signed_event_at( + 7, + 7, + vec![h("PrivateHll"), target_tag], + "private reaction", + 1_714_124_434, + ); + assert_accepted( + relay + .handle_event_with_auth(private.clone(), &owner_auth) + .expect("private reaction"), + &private, + ); + + let limited = relay + .handle_count_with_auth( + SubscriptionId::new("count-hll-limited").expect("sub"), + vec![ + filter_from_value( + &serde_json::json!({"kinds":[7],"#e":[target.clone()],"limit":1}), + ) + .expect("filter"), + ], + &owner_auth, + ) + .expect("limited count"); + assert!(matches!( + limited, + RelayMessage::Count { + count: 2, + hll: None, + .. + } + )); + + let redacted = relay + .handle_count_with_auth( + SubscriptionId::new("count-hll-redacted").expect("sub"), + vec![ + filter_from_value(&serde_json::json!({"kinds":[7],"#e":[target]})) + .expect("filter"), + ], + &unauth, + ) + .expect("redacted count"); + assert!(matches!( + redacted, + RelayMessage::Count { + count: 1, + hll: None, + .. + } + )); + } + + #[test] fn base_relay_count_does_not_apply_default_or_client_limits() { let config = test_store_config("base-relay-count-no-default-limit"); let relay = BaseRelay::open( @@ -2166,7 +2309,8 @@ mod tests { .expect("count"), RelayMessage::Count { subscription_id: SubscriptionId::new("count-unbounded").expect("sub"), - count: 3 + count: 3, + hll: None } ); assert_eq!( @@ -2178,7 +2322,8 @@ mod tests { .expect("count"), RelayMessage::Count { subscription_id: SubscriptionId::new("count-client-limited").expect("sub"), - count: 3 + count: 3, + hll: None } ); } @@ -3234,7 +3379,8 @@ mod tests { .expect("count"), vec![RelayMessage::Count { subscription_id: count_id, - count: 0 + count: 0, + hll: None }] ); } diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -38,7 +38,8 @@ use tangle_groups::{ validate_client_group_event_structure, }; use tangle_protocol::{ - ClientMessage, Event, Filter, Kind, RelayMessage, SubscriptionId, UnixTimestamp, + ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, + UnixTimestamp, }; use tangle_store_pocket::PocketStoreHandle; use tokio::sync::watch; @@ -201,6 +202,25 @@ impl TangleQueryClassifier { fn has_count_bounded_selector(self, filter: &Filter) -> bool { self.has_strong_constraint(filter) || (!filter.kinds().is_empty() && self.has_bounded_time_window(filter)) + || self.has_hll_count_selector(filter) + } + + fn has_hll_count_selector(self, filter: &Filter) -> bool { + let [kind] = filter.kinds() else { + return false; + }; + let mut tags = filter.tag_filters().iter(); + let Some((name, values)) = tags.next() else { + return false; + }; + if tags.next().is_some() || values.len() != 1 { + return false; + } + match (kind.as_u32(), name.as_str()) { + (3, "p") => PublicKeyHex::new(values[0].as_str()).is_ok(), + (7, "e") => EventId::new(values[0].as_str()).is_ok(), + _ => false, + } } fn has_group_constraint(self, filter: &Filter) -> bool { @@ -3038,6 +3058,8 @@ mod tests { "limit": 1 })) .expect("filter"); + let hll_reaction_filter = + filter_from_value(&json!({"kinds": [7], "#e": ["a".repeat(64)]})).expect("filter"); assert_eq!( classifier.classify_count(&[]), @@ -3071,6 +3093,78 @@ mod tests { classifier.classify_count(&[bounded_time_filter]), TangleQueryClassification::Bounded ); + assert_eq!( + classifier.classify_count(&[hll_reaction_filter]), + TangleQueryClassification::Bounded + ); + } + + #[tokio::test] + async fn runtime_count_hll_accepts_public_pocket_selector() { + let root = temp_root("runtime-count-hll"); + let _ = std::fs::remove_dir_all(&root); + let handle = TangleRuntimeHandle::new( + TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"), + ); + let mut auth = handle.auth_state().await.expect("auth"); + let target = "c".repeat(64); + let first = tangle_v2_event( + FixtureKey::Member, + 1_714_124_433, + 7, + vec![Tag::from_parts("e", &[&target]).expect("tag")], + "first reaction", + ) + .expect("first"); + let second = tangle_v2_event( + FixtureKey::Admin, + 1_714_124_434, + 7, + vec![Tag::from_parts("e", &[&target]).expect("tag")], + "second reaction", + ) + .expect("second"); + + assert_accepted_reply( + runtime_event_reply(&handle, first.clone(), &mut auth, 1_714_124_435).await, + &first, + ); + assert_accepted_reply( + runtime_event_reply(&handle, second.clone(), &mut auth, 1_714_124_436).await, + &second, + ); + + let subscription_id = SubscriptionId::new("count-hll-runtime").expect("subscription"); + let replies = handle + .handle_client_message( + ClientMessage::Count { + subscription_id: subscription_id.clone(), + filters: vec![ + filter_from_value(&json!({"kinds":[7],"#e":[target]})).expect("filter"), + ], + }, + &mut auth, + UnixTimestamp::new(1_714_124_437), + ) + .await + .expect("count"); + let [ + RelayMessage::Count { + subscription_id: actual, + count, + hll: Some(hll), + }, + ] = replies.as_slice() + else { + panic!("count hll expected: {replies:?}") + }; + + assert_eq!(actual, &subscription_id); + assert_eq!(*count, 2); + assert_eq!(hll.len(), 512); + assert_ne!(hll, &"00".repeat(256)); + + let _ = std::fs::remove_dir_all(root); } #[test] @@ -4235,7 +4329,8 @@ mod tests { replies, vec![RelayMessage::Count { subscription_id, - count: u64::try_from(group_write_count).expect("group count") + count: u64::try_from(group_write_count).expect("group count"), + hll: None }] ); })); @@ -4265,7 +4360,8 @@ mod tests { replies, vec![RelayMessage::Count { subscription_id, - count: 0 + count: 0, + hll: None }] ); })); diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -2369,7 +2369,8 @@ fn assert_relay_count(message: RelayMessage, subscription_id: &str, count: u64) message, RelayMessage::Count { subscription_id: SubscriptionId::new(subscription_id).expect("subscription"), - count + count, + hll: None } ); } diff --git a/crates/tangle_store_pocket/src/lib.rs b/crates/tangle_store_pocket/src/lib.rs @@ -6,7 +6,7 @@ use pocket_db::{ heed::{Database, types::Bytes}, }; use pocket_types::{ - Event, Filter, Id, Kind, OwnedEvent, OwnedFilter, OwnedTags, Pubkey, Sig, Tags, Time, + Event, Filter, Hll8, Id, Kind, OwnedEvent, OwnedFilter, OwnedTags, Pubkey, Sig, Tags, Time, }; use std::{ io, @@ -20,6 +20,7 @@ pub const POCKET_SOURCE_REVISION: &str = "329334f20948c796c6016b673b92551ac4855a pub type PocketEvent = Event; pub type PocketEventId = Id; pub type PocketFilter = Filter; +pub type PocketHll8 = Hll8; pub type PocketKind = Kind; pub type PocketOwnedEvent = OwnedEvent; pub type PocketOwnedFilter = OwnedFilter;