commit 4b0a20d9cd416b5c27cdb1565d7ac3d2144e4a6b
parent d33f78987ee4e56fc777cb89efa13cade9858e40
Author: triesap <tyson@radroots.org>
Date: Mon, 15 Jun 2026 13:57:06 -0700
runtime: use current auth for live reqs
Diffstat:
6 files changed, 509 insertions(+), 86 deletions(-)
diff --git a/crates/tangle_protocol/src/lib.rs b/crates/tangle_protocol/src/lib.rs
@@ -703,6 +703,10 @@ impl Filter {
self.search.as_deref()
}
+ pub fn is_complete(&self) -> bool {
+ !self.ids.is_empty()
+ }
+
pub fn matches(&self, event: &Event) -> bool {
if !self.ids.is_empty() && !self.ids.iter().any(|id| id == event.id()) {
return false;
@@ -2309,6 +2313,23 @@ mod tests {
}
#[test]
+ fn filter_complete_semantics_are_exact_id_only() {
+ assert!(
+ filter_from_value(&serde_json::json!({
+ "ids": ["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"]
+ }))
+ .expect("id filter")
+ .is_complete()
+ );
+ assert!(
+ !filter_from_value(&serde_json::json!({"kinds": [1]}))
+ .expect("kind filter")
+ .is_complete()
+ );
+ assert!(!Filter::empty().is_complete());
+ }
+
+ #[test]
fn filter_model_rejects_non_matching_events() {
let event_tag = "e".repeat(EventId::HEX_LENGTH);
let event = event_for_filter(&event_tag, 50, 1);
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -935,9 +935,17 @@ impl BaseRelay {
BaseRelayQueryMetrics::default(),
));
}
- self.subscriptions
- .subscribe(subscription_id.clone(), filters.clone(), auth.clone())?;
- self.query_req_with_group_auth_report(subscription_id, filters, auth)
+ let should_subscribe = !filters_are_complete(&filters);
+ if should_subscribe {
+ self.subscriptions
+ .ensure_can_subscribe(&subscription_id, &filters)?;
+ }
+ let report =
+ self.query_req_with_group_auth_report(subscription_id.clone(), filters.clone(), auth)?;
+ if should_subscribe {
+ self.subscriptions.subscribe(subscription_id, filters)?;
+ }
+ Ok(report)
}
fn query_req_with_group_auth_report(
@@ -1116,8 +1124,16 @@ impl BaseRelay {
}
pub fn fanout(&mut self, event: &Event) -> Vec<RelayMessage> {
+ self.fanout_with_group_auth(event, &GroupAuthContext::unauthenticated())
+ }
+
+ pub fn fanout_with_group_auth(
+ &mut self,
+ event: &Event,
+ auth: &GroupAuthContext,
+ ) -> Vec<RelayMessage> {
let groups = self.groups.as_ref();
- self.subscriptions.fanout(event, |event, auth| {
+ self.subscriptions.fanout(event, auth, |event, auth| {
Self::group_read_gate_visible_to_auth(groups, event, auth).unwrap_or(false)
})
}
@@ -1320,6 +1336,10 @@ impl BaseRelay {
}
}
+fn filters_are_complete(filters: &[Filter]) -> bool {
+ !filters.is_empty() && filters.iter().all(Filter::is_complete)
+}
+
#[cfg(test)]
mod tests {
use super::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits};
@@ -1328,11 +1348,12 @@ mod tests {
use crate::relay::live::CloseResult;
use tangle_crypto::RelaySigner;
use tangle_groups::{
- GroupId, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, KIND_GROUP_CREATE_INVITE,
- KIND_GROUP_DELETE_EVENT, KIND_GROUP_DELETE_GROUP, KIND_GROUP_EDIT_METADATA,
- KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA,
- KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, MemberStatus,
- NIP29_RELAY_GENERATED_KIND_VALUES, StoreOffset, parse_group_runtime_config_json,
+ GroupAuthContext, GroupId, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP,
+ KIND_GROUP_CREATE_INVITE, KIND_GROUP_DELETE_EVENT, KIND_GROUP_DELETE_GROUP,
+ KIND_GROUP_EDIT_METADATA, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST,
+ KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER,
+ MemberStatus, NIP29_RELAY_GENERATED_KIND_VALUES, StoreOffset,
+ parse_group_runtime_config_json,
};
use tangle_protocol::{
ClientMessage, Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId,
@@ -2949,7 +2970,7 @@ mod tests {
}
#[test]
- fn private_group_live_fanout_uses_subscription_auth() {
+ fn private_group_live_fanout_uses_current_auth() {
let owner = signer(7).public_key().clone();
let auth = authenticated_state(7);
let mut relay = test_relay_with_groups(
@@ -2960,14 +2981,10 @@ mod tests {
relay
.handle_event_with_auth(signed_private_group_create_event(7, "Farm"), &auth)
.expect("create");
- let unauth_sub = SubscriptionId::new("fanout-unauth").expect("sub");
- let auth_sub = SubscriptionId::new("fanout-auth").expect("sub");
- relay
- .handle_req(unauth_sub, vec![filter_kind(1)])
- .expect("unauth sub");
+ let subscription_id = SubscriptionId::new("fanout-current-auth").expect("sub");
relay
- .handle_req_with_auth(auth_sub.clone(), vec![filter_kind(1)], &auth)
- .expect("auth sub");
+ .handle_req(subscription_id.clone(), vec![filter_kind(1)])
+ .expect("sub");
let private_event = signed_event_at(
7,
1,
@@ -2979,10 +2996,18 @@ mod tests {
.handle_event_with_auth(private_event.clone(), &auth)
.expect("private event");
+ assert!(relay.fanout(&private_event).is_empty());
assert!(matches!(
- relay.fanout(&private_event).as_slice(),
- [RelayMessage::Event { subscription_id, event }]
- if subscription_id == &auth_sub && event.id() == private_event.id()
+ relay
+ .fanout_with_group_auth(
+ &private_event,
+ &GroupAuthContext::new([owner])
+ )
+ .as_slice(),
+ [RelayMessage::Event {
+ subscription_id: delivered,
+ event
+ }] if delivered == &subscription_id && event.id() == private_event.id()
));
}
diff --git a/crates/tangle_runtime/src/relay/live.rs b/crates/tangle_runtime/src/relay/live.rs
@@ -14,7 +14,6 @@ pub(crate) struct LiveSubscriptionSet {
#[derive(Debug, Clone, PartialEq, Eq)]
struct LiveSubscription {
filters: Vec<Filter>,
- auth: GroupAuthContext,
}
impl LiveSubscriptionSet {
@@ -42,22 +41,30 @@ impl LiveSubscriptionSet {
&mut self,
subscription_id: SubscriptionId,
filters: Vec<Filter>,
- auth: GroupAuthContext,
+ ) -> Result<(), BaseRelayError> {
+ self.ensure_can_subscribe(&subscription_id, &filters)?;
+ self.subscriptions
+ .insert(subscription_id, LiveSubscription { filters });
+ Ok(())
+ }
+
+ pub(crate) fn ensure_can_subscribe(
+ &self,
+ subscription_id: &SubscriptionId,
+ filters: &[Filter],
) -> Result<(), BaseRelayError> {
if filters.is_empty() {
return Err(BaseRelayError::invalid(
"subscription must include at least one filter",
));
}
- if !self.subscriptions.contains_key(&subscription_id)
+ if !self.subscriptions.contains_key(subscription_id)
&& self.subscriptions.len() >= self.max_subscriptions
{
return Err(BaseRelayError::invalid(
"connection subscription limit exceeded",
));
}
- self.subscriptions
- .insert(subscription_id, LiveSubscription { filters, auth });
Ok(())
}
@@ -78,6 +85,7 @@ impl LiveSubscriptionSet {
pub(crate) fn fanout(
&mut self,
event: &Event,
+ auth: &GroupAuthContext,
visible_to_auth: impl Fn(&Event, &GroupAuthContext) -> bool,
) -> Vec<RelayMessage> {
let matched = self
@@ -91,7 +99,7 @@ impl LiveSubscriptionSet {
{
return None;
}
- if visible_to_auth(event, &subscription.auth) {
+ if visible_to_auth(event, auth) {
Some(subscription_id.clone())
} else {
None
@@ -134,7 +142,6 @@ mod tests {
.subscribe(
subscription_id.clone(),
vec![filter_from_value(&serde_json::json!({"kinds":[1]})).expect("filter")],
- GroupAuthContext::unauthenticated(),
)
.expect("subscribe");
let first = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "first")
@@ -145,17 +152,23 @@ mod tests {
.expect("third");
assert!(matches!(
- subscriptions.fanout(&first, |_, _| true).as_slice(),
+ subscriptions
+ .fanout(&first, &GroupAuthContext::unauthenticated(), |_, _| true)
+ .as_slice(),
[RelayMessage::Event { subscription_id: delivered, event }]
if delivered == &subscription_id && event.id() == first.id()
));
assert!(matches!(
- subscriptions.fanout(&second, |_, _| true).as_slice(),
+ subscriptions
+ .fanout(&second, &GroupAuthContext::unauthenticated(), |_, _| true)
+ .as_slice(),
[RelayMessage::Event { subscription_id: delivered, event }]
if delivered == &subscription_id && event.id() == second.id()
));
assert!(matches!(
- subscriptions.fanout(&third, |_, _| true).as_slice(),
+ subscriptions
+ .fanout(&third, &GroupAuthContext::unauthenticated(), |_, _| true)
+ .as_slice(),
[RelayMessage::Event { subscription_id: delivered, event }]
if delivered == &subscription_id && event.id() == third.id()
));
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -1078,10 +1078,12 @@ impl TangleRuntimeHandle {
&self,
offset: StoreOffset,
subscriptions: &mut LiveSubscriptionSet,
+ auth: &BaseAuthState,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
let pocket_event = self.inner.store.event_by_offset(offset.as_u64())?;
let event = pocket_event_to_tangle(&pocket_event)?;
- Ok(subscriptions.fanout(&event, |event, auth| {
+ let group_auth = GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned());
+ Ok(subscriptions.fanout(&event, &group_auth, |event, auth| {
BaseRelay::group_read_gate_visible_to_auth(self.inner.groups.as_ref(), event, auth)
.unwrap_or(false)
}))
@@ -1899,9 +1901,9 @@ mod tests {
time::Duration,
};
use tangle_groups::{
- CanonicalGroupEvent, GroupAuthContext, GroupEventClass, GroupId, GroupProjection,
- KIND_GROUP_ADMINS, KIND_GROUP_DELETE_GROUP, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_MEMBERS,
- KIND_GROUP_METADATA, MemberStatus, StoreOffset, rebuild_group_projection,
+ CanonicalGroupEvent, GroupEventClass, GroupId, GroupProjection, KIND_GROUP_ADMINS,
+ KIND_GROUP_DELETE_GROUP, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA,
+ MemberStatus, StoreOffset, rebuild_group_projection,
};
use tangle_protocol::{
ClientMessage, Event, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, Tag,
@@ -2111,7 +2113,6 @@ mod tests {
.subscribe(
subscription_id.clone(),
vec![filter_from_value(&json!({"kinds":[1]})).expect("filter")],
- GroupAuthContext::unauthenticated(),
)
.expect("subscribe");
let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "live")
@@ -2135,7 +2136,7 @@ mod tests {
let offset = offsets.try_recv().expect("offset");
assert!(matches!(
handle
- .fanout_event_offset(offset, &mut subscriptions)
+ .fanout_event_offset(offset, &mut subscriptions, &auth)
.await
.expect("fanout")
.as_slice(),
@@ -3325,7 +3326,6 @@ mod tests {
}))
.expect("filter"),
],
- GroupAuthContext::unauthenticated(),
)
.expect("subscribe");
@@ -3396,7 +3396,7 @@ mod tests {
let mut generated_kinds = BTreeSet::new();
for offset in generated_offsets {
let messages = handle
- .fanout_event_offset(offset, &mut subscriptions)
+ .fanout_event_offset(offset, &mut subscriptions, &auth)
.await
.expect("fanout");
assert!(matches!(
@@ -4076,28 +4076,20 @@ mod tests {
let stress_filter =
filter_from_value(&json!({"kinds":[1], "#h":["StressPrivate"]})).expect("filter");
member_subscriptions
- .subscribe(
- member_subscription.clone(),
- vec![stress_filter.clone()],
- GroupAuthContext::new([FixtureKey::Member.public_key()]),
- )
+ .subscribe(member_subscription.clone(), vec![stress_filter.clone()])
.expect("member subscribe");
public_subscriptions
- .subscribe(
- public_subscription,
- vec![stress_filter],
- GroupAuthContext::unauthenticated(),
- )
+ .subscribe(public_subscription, vec![stress_filter])
.expect("public subscribe");
let mut member_fanout_count = 0;
for offset in &published_offsets {
let public_replies = handle
- .fanout_event_offset(*offset, &mut public_subscriptions)
+ .fanout_event_offset(*offset, &mut public_subscriptions, &public_auth)
.await
.expect("public fanout");
assert!(public_replies.is_empty());
let member_replies = handle
- .fanout_event_offset(*offset, &mut member_subscriptions)
+ .fanout_event_offset(*offset, &mut member_subscriptions, &member_auth)
.await
.expect("member fanout");
for reply in member_replies {
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -20,7 +20,6 @@ use std::{
sync::atomic::{AtomicU64, Ordering},
time::{Instant, SystemTime, UNIX_EPOCH},
};
-use tangle_groups::GroupAuthContext;
use tangle_protocol::{
ClientMessage, Filter, RelayMessage, SubscriptionId, UnixTimestamp, parse_client_message,
};
@@ -199,8 +198,9 @@ impl TangleWebSocketSession {
offset: tangle_groups::StoreOffset,
) -> TangleSessionControl {
let runtime = self.runtime.clone();
+ let auth = self.auth.clone();
let replies = match runtime
- .fanout_event_offset(offset, &mut self.subscriptions)
+ .fanout_event_offset(offset, &mut self.subscriptions, &auth)
.await
{
Ok(replies) => replies,
@@ -341,24 +341,22 @@ impl TangleWebSocketSession {
{
return Ok(vec![message]);
}
- self.subscriptions.subscribe(
- subscription_id.clone(),
- filters.clone(),
- GroupAuthContext::new(self.auth.authenticated_pubkeys().iter().cloned()),
- )?;
- metrics.record_subscription_opened();
- logging::log_subscription_opened(self.connection_id, &subscription_id);
- match self
+ let should_subscribe = !filters_are_complete(&filters);
+ if should_subscribe {
+ self.subscriptions
+ .ensure_can_subscribe(&subscription_id, &filters)?;
+ }
+ let replies = self
.runtime
- .query_req_with_auth(subscription_id.clone(), filters, &self.auth)
- .await
- {
- Ok(replies) => Ok(replies),
- Err(error) => {
- self.subscriptions.close(&subscription_id);
- Err(error)
- }
+ .query_req_with_auth(subscription_id.clone(), filters.clone(), &self.auth)
+ .await?;
+ if should_subscribe {
+ self.subscriptions
+ .subscribe(subscription_id.clone(), filters)?;
+ metrics.record_subscription_opened();
+ logging::log_subscription_opened(self.connection_id, &subscription_id);
}
+ Ok(replies)
}
fn client_rate_limit_context(&self) -> TangleClientRateLimitContext {
@@ -450,6 +448,10 @@ fn current_unix_timestamp() -> UnixTimestamp {
)
}
+fn filters_are_complete(filters: &[Filter]) -> bool {
+ !filters.is_empty() && filters.iter().all(Filter::is_complete)
+}
+
#[cfg(test)]
mod tests {
use super::{
@@ -469,9 +471,13 @@ mod tests {
use std::path::{Path, PathBuf};
use tangle_groups::StoreOffset;
use tangle_protocol::{
- ClientMessage, Filter, RelayMessage, SubscriptionId, event_to_value, filter_from_value,
+ ClientMessage, Filter, RelayMessage, SubscriptionId, UnixTimestamp, event_to_value,
+ filter_from_value,
+ };
+ use tangle_test_support::{
+ FixtureKey, tangle_v2_auth_event, tangle_v2_event, tangle_v2_group_create_event,
+ tangle_v2_group_event,
};
- use tangle_test_support::{FixtureKey, tangle_v2_event};
#[test]
fn websocket_session_records_connection_time() {
@@ -698,6 +704,264 @@ mod tests {
}
#[tokio::test]
+ async fn websocket_session_live_fanout_uses_current_auth() {
+ let shutdown = TangleShutdownSignal::new();
+ let root = temp_root("current-auth-live");
+ let _ = std::fs::remove_dir_all(&root);
+ let runtime = TangleRuntimeHandle::new(
+ TangleRuntime::open(runtime_config_with_groups(&root)).expect("runtime"),
+ );
+ let mut owner_auth = runtime.auth_state().await.expect("owner auth");
+ owner_auth
+ .issue_challenge("owner-live", UnixTimestamp::new(100))
+ .expect("owner challenge");
+ let owner_auth_event =
+ tangle_v2_auth_event(FixtureKey::Owner, "owner-live", 120).expect("owner auth event");
+ assert_eq!(
+ runtime
+ .handle_client_message(
+ ClientMessage::Auth(owner_auth_event.clone()),
+ &mut owner_auth,
+ UnixTimestamp::new(120)
+ )
+ .await
+ .expect("owner auth"),
+ vec![RelayMessage::Ok {
+ event_id: owner_auth_event.id().clone(),
+ accepted: true,
+ message: String::new()
+ }]
+ );
+ let create = tangle_v2_group_create_event(FixtureKey::Owner, "LiveFarm", 121, &["private"])
+ .expect("create");
+ assert_eq!(
+ runtime
+ .handle_client_message(
+ ClientMessage::Event(create.clone()),
+ &mut owner_auth,
+ UnixTimestamp::new(121)
+ )
+ .await
+ .expect("create"),
+ vec![RelayMessage::Ok {
+ event_id: create.id().clone(),
+ accepted: true,
+ message: String::new()
+ }]
+ );
+ let session_auth = runtime.auth_state().await.expect("session auth");
+ let events = runtime.subscribe_events().await;
+ let mut session = TangleWebSocketSession::new(
+ session_limits(8),
+ shutdown.subscribe(),
+ runtime.clone(),
+ session_auth,
+ events,
+ )
+ .expect("session");
+ let subscription_id = SubscriptionId::new("current-auth-live").expect("subscription");
+
+ assert_eq!(
+ session
+ .handle_client_message(ClientMessage::Req {
+ subscription_id: subscription_id.clone(),
+ filters: vec![
+ filter_from_value(&json!({"kinds":[1], "#h":["LiveFarm"]}))
+ .expect("filter")
+ ],
+ })
+ .await
+ .expect("req"),
+ vec![RelayMessage::Eose(subscription_id.clone())]
+ );
+ assert_eq!(session.active_subscription_count(), 1);
+ let before_auth =
+ tangle_v2_group_event(FixtureKey::Owner, "LiveFarm", 122, 1, "before auth")
+ .expect("before auth");
+ let before_auth_id = before_auth.id().clone();
+ assert_eq!(
+ runtime
+ .handle_client_message(
+ ClientMessage::Event(before_auth),
+ &mut owner_auth,
+ UnixTimestamp::new(122)
+ )
+ .await
+ .expect("before event"),
+ vec![RelayMessage::Ok {
+ event_id: before_auth_id,
+ accepted: true,
+ message: String::new()
+ }]
+ );
+ let offset = session.events.recv().await;
+ assert_eq!(
+ session.handle_event_receive_result(offset).await,
+ TangleSessionControl::Continue
+ );
+ assert!(session.outbound_receiver.try_recv().is_err());
+
+ let session_now = current_unix_timestamp();
+ session
+ .auth
+ .issue_challenge("session-live", session_now)
+ .expect("session challenge");
+ let session_auth_event =
+ tangle_v2_auth_event(FixtureKey::Owner, "session-live", session_now.as_u64())
+ .expect("auth event");
+ assert_eq!(
+ session
+ .handle_client_message(ClientMessage::Auth(session_auth_event.clone()))
+ .await
+ .expect("session auth"),
+ vec![RelayMessage::Ok {
+ event_id: session_auth_event.id().clone(),
+ accepted: true,
+ message: String::new()
+ }]
+ );
+ let after_auth = tangle_v2_group_event(FixtureKey::Owner, "LiveFarm", 132, 1, "after auth")
+ .expect("after auth");
+ assert_eq!(
+ runtime
+ .handle_client_message(
+ ClientMessage::Event(after_auth.clone()),
+ &mut owner_auth,
+ UnixTimestamp::new(132)
+ )
+ .await
+ .expect("after event"),
+ vec![RelayMessage::Ok {
+ event_id: after_auth.id().clone(),
+ accepted: true,
+ message: String::new()
+ }]
+ );
+ let offset = session.events.recv().await;
+ assert_eq!(
+ session.handle_event_receive_result(offset).await,
+ TangleSessionControl::Continue
+ );
+ assert_eq!(
+ take_outbound_text(&mut session),
+ RelayMessage::Event {
+ subscription_id,
+ event: after_auth
+ }
+ .encode()
+ );
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
+ async fn websocket_session_complete_and_failed_reqs_do_not_subscribe() {
+ let shutdown = TangleShutdownSignal::new();
+ let root = temp_root("complete-req-lifecycle");
+ let _ = std::fs::remove_dir_all(&root);
+ let runtime =
+ TangleRuntimeHandle::new(TangleRuntime::open(runtime_config(&root)).expect("runtime"));
+ let mut auth = runtime.auth_state().await.expect("auth");
+ let events = runtime.subscribe_events().await;
+ let mut session = TangleWebSocketSession::new(
+ session_limits(8),
+ shutdown.subscribe(),
+ runtime.clone(),
+ runtime.auth_state().await.expect("session auth"),
+ events,
+ )
+ .expect("session");
+ let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "complete")
+ .expect("event");
+
+ assert_eq!(
+ runtime
+ .handle_client_message(
+ ClientMessage::Event(event.clone()),
+ &mut auth,
+ UnixTimestamp::new(1_714_124_433)
+ )
+ .await
+ .expect("event"),
+ vec![RelayMessage::Ok {
+ event_id: event.id().clone(),
+ accepted: true,
+ message: String::new()
+ }]
+ );
+ let exact_id = SubscriptionId::new("exact-id").expect("subscription");
+ assert_eq!(
+ session
+ .handle_client_message(ClientMessage::Req {
+ subscription_id: exact_id.clone(),
+ filters: vec![
+ filter_from_value(&json!({"ids":[event.id().as_str()]}))
+ .expect("exact filter")
+ ],
+ })
+ .await
+ .expect("exact req"),
+ vec![
+ RelayMessage::Event {
+ subscription_id: exact_id.clone(),
+ event: event.clone()
+ },
+ RelayMessage::Eose(exact_id)
+ ]
+ );
+ assert_eq!(session.active_subscription_count(), 0);
+
+ let open = SubscriptionId::new("open").expect("subscription");
+ assert_eq!(
+ session
+ .handle_client_message(ClientMessage::Req {
+ subscription_id: open.clone(),
+ filters: vec![filter_from_value(&json!({"kinds":[1]})).expect("open filter")],
+ })
+ .await
+ .expect("open req"),
+ vec![
+ RelayMessage::Event {
+ subscription_id: open.clone(),
+ event
+ },
+ RelayMessage::Eose(open.clone())
+ ]
+ );
+ assert_eq!(session.active_subscription_count(), 1);
+
+ let search = SubscriptionId::new("search").expect("subscription");
+ assert_eq!(
+ session
+ .handle_client_message(ClientMessage::Req {
+ subscription_id: search.clone(),
+ filters: vec![
+ filter_from_value(&json!({"search":"carrots"})).expect("search filter")
+ ],
+ })
+ .await
+ .expect("search req"),
+ vec![RelayMessage::Closed {
+ subscription_id: search,
+ message: "unsupported: search filters are not supported".to_owned()
+ }]
+ );
+ assert_eq!(session.active_subscription_count(), 1);
+
+ let invalid = SubscriptionId::new("invalid").expect("subscription");
+ let invalid_result = session
+ .handle_client_message(ClientMessage::Req {
+ subscription_id: invalid,
+ filters: vec![Filter::empty(); 11],
+ })
+ .await;
+ assert!(invalid_result.is_err());
+ assert_eq!(session.active_subscription_count(), 1);
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
async fn websocket_session_preserves_chorus_close_scope_parity() {
let shutdown = TangleShutdownSignal::new();
let root = temp_root("chorus-close-scope-parity");
@@ -1111,6 +1375,91 @@ mod tests {
runtime_config_with_outbound_queue(root, 8)
}
+ fn runtime_config_with_groups(root: &Path) -> BaseRelayRuntimeConfig {
+ let raw = json!({
+ "server": {
+ "listen_addr": "127.0.0.1:0",
+ "relay_url": "wss://relay.radroots.test"
+ },
+ "pocket": {
+ "data_directory": root.join("pocket"),
+ "sync_policy": "flush_on_shutdown",
+ "query": {
+ "allow_scraping": false,
+ "allow_scrape_if_limited_to": 100,
+ "allow_scrape_if_max_seconds": 3600
+ }
+ },
+ "groups": {
+ "enabled": true,
+ "canonical_relay_url": "wss://relay.radroots.test",
+ "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777",
+ "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()],
+ "policy": {
+ "public_join": false,
+ "invites_enabled": false
+ }
+ },
+ "auth": {
+ "challenge_ttl_seconds": 300,
+ "created_at_skew_seconds": 600
+ },
+ "limits": {
+ "max_message_length": 1048576,
+ "max_subid_length": 64,
+ "max_subscriptions_per_connection": 64,
+ "max_filters_per_request": 10,
+ "max_tag_values_per_filter": 100,
+ "max_query_complexity": 2048,
+ "max_limit": 500,
+ "default_limit": 100,
+ "max_event_tags": 200,
+ "max_content_length": 65536,
+ "broadcast_channel_capacity": 8,
+ "per_connection_outbound_queue": 8
+ },
+ "rate_limits": {
+ "auth": {
+ "per_ip": {"window_seconds": 60, "max_hits": 120},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 30},
+ "failures": {"window_seconds": 300, "max_hits": 5},
+ "failures_per_ip": {"window_seconds": 300, "max_hits": 20}
+ },
+ "event": {
+ "per_ip": {"window_seconds": 60, "max_hits": 600},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 120},
+ "per_kind": {"window_seconds": 60, "max_hits": 1000}
+ },
+ "group": {
+ "write_per_ip": {"window_seconds": 60, "max_hits": 300},
+ "write_per_pubkey": {"window_seconds": 60, "max_hits": 60},
+ "write_per_group": {"window_seconds": 60, "max_hits": 90},
+ "write_per_kind": {"window_seconds": 60, "max_hits": 300},
+ "join_flow": {"window_seconds": 300, "max_hits": 10},
+ "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30}
+ },
+ "req": {
+ "per_ip": {"window_seconds": 60, "max_hits": 600},
+ "per_connection": {"window_seconds": 60, "max_hits": 120},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 240},
+ "per_group": {"window_seconds": 60, "max_hits": 240},
+ "per_kind": {"window_seconds": 60, "max_hits": 500},
+ "broad": {"window_seconds": 60, "max_hits": 30}
+ },
+ "count": {
+ "per_ip": {"window_seconds": 60, "max_hits": 300},
+ "per_connection": {"window_seconds": 60, "max_hits": 60},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 120},
+ "per_group": {"window_seconds": 60, "max_hits": 120},
+ "per_kind": {"window_seconds": 60, "max_hits": 240},
+ "broad": {"window_seconds": 60, "max_hits": 20}
+ }
+ }
+ })
+ .to_string();
+ parse_base_relay_runtime_config_json(&raw).expect("config")
+ }
+
fn runtime_config_with_outbound_queue(
root: &Path,
per_connection_outbound_queue: usize,
diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs
@@ -3,12 +3,13 @@
use std::{fs, panic, path::PathBuf};
use tangle_crypto::{RelaySigner, event_id_matches, verify_event_signature};
use tangle_groups::{
- GroupAuthority, GroupGeneratedEventBuilder, GroupId, GroupLimitsConfig, GroupOutboxEffect,
- GroupOutboxKey, GroupOutboxRecord, GroupOutboxStatus, GroupProjection, GroupRuntimeConfig,
- KIND_GROUP_ADMINS, KIND_GROUP_DELETE_GROUP, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST,
- KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, MemberStatus,
- NIP29_RELAY_GENERATED_KIND_VALUES, PERMANENT_RELAY_OVERRIDE_ROLE, ProjectionCheckpoint,
- StoreOffset, member_current_key, parse_group_runtime_config_json, projection_checkpoint_key,
+ GroupAuthContext, GroupAuthority, GroupGeneratedEventBuilder, GroupId, GroupLimitsConfig,
+ GroupOutboxEffect, GroupOutboxKey, GroupOutboxRecord, GroupOutboxStatus, GroupProjection,
+ GroupRuntimeConfig, KIND_GROUP_ADMINS, KIND_GROUP_DELETE_GROUP, KIND_GROUP_JOIN_REQUEST,
+ KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER,
+ MemberStatus, NIP29_RELAY_GENERATED_KIND_VALUES, PERMANENT_RELAY_OVERRIDE_ROLE,
+ ProjectionCheckpoint, StoreOffset, member_current_key, parse_group_runtime_config_json,
+ projection_checkpoint_key,
};
use tangle_protocol::{
Event, Filter, RawEventJson, RelayMessage, SubscriptionId, Tag, UnixTimestamp, event_to_value,
@@ -510,7 +511,10 @@ fn metadata_flags_and_read_privacy_cover_req_count_and_fanout() {
let live_unauth = subscription("live-private-unauth");
let live_owner = subscription("live-private-owner");
relay
- .handle_req(live_unauth, vec![filter_group_tag(1, "h", "PrivateFarm")])
+ .handle_req(
+ live_unauth.clone(),
+ vec![filter_group_tag(1, "h", "PrivateFarm")],
+ )
.expect("live unauth");
relay
.handle_req_with_auth(
@@ -527,11 +531,25 @@ fn metadata_flags_and_read_privacy_cover_req_count_and_fanout() {
.expect("second"),
&second_private,
);
- assert!(matches!(
- relay.fanout(&second_private).as_slice(),
- [RelayMessage::Event { subscription_id, event }]
- if subscription_id == &live_owner && event.id() == second_private.id()
- ));
+ assert!(relay.fanout(&second_private).is_empty());
+ let owner_live = relay.fanout_with_group_auth(
+ &second_private,
+ &GroupAuthContext::new([FixtureKey::Owner.public_key()]),
+ );
+ assert!(owner_live.iter().any(|message| {
+ matches!(
+ message,
+ RelayMessage::Event { subscription_id, event }
+ if subscription_id == &live_unauth && event.id() == second_private.id()
+ )
+ }));
+ assert!(owner_live.iter().any(|message| {
+ matches!(
+ message,
+ RelayMessage::Event { subscription_id, event }
+ if subscription_id == &live_owner && event.id() == second_private.id()
+ )
+ }));
accept_group_create(&mut relay, "HiddenFarm", &["hidden"], 10, &owner_auth);
assert_count(
@@ -710,11 +728,16 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() {
.expect("live private"),
&live_private,
);
- assert!(matches!(
- relay.fanout(&live_private).as_slice(),
- [RelayMessage::Event { subscription_id, event }]
+ assert!(relay.fanout(&live_private).is_empty());
+ let member_live = relay.fanout_with_group_auth(
+ &live_private,
+ &GroupAuthContext::new([FixtureKey::Member.public_key()]),
+ );
+ assert!(member_live.iter().any(|message| matches!(
+ message,
+ RelayMessage::Event { subscription_id, event }
if subscription_id == &live_member && event.id() == live_private.id()
- ));
+ )));
assert_count(
relay.handle_count(