tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit b8992e9381892358079e3042ba91c07cf8a3fc21
parent 4b0a20d9cd416b5c27cdb1565d7ac3d2144e4a6b
Author: triesap <tyson@radroots.org>
Date:   Mon, 15 Jun 2026 14:11:41 -0700

runtime: close redacted reqs

Diffstat:
Mcrates/tangle_runtime/src/relay/core.rs | 80++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
Mcrates/tangle_runtime/src/runtime.rs | 16+++++++++++-----
Mcrates/tangle_runtime/src/session.rs | 127+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mcrates/tangle_runtime/tests/base_relay_v2.rs | 101++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 20++++++++++++++------
5 files changed, 288 insertions(+), 56 deletions(-)

diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs @@ -510,6 +510,22 @@ impl BaseRelay { }) } + fn redacted_req_closed( + subscription_id: SubscriptionId, + auth: &GroupAuthContext, + ) -> RelayMessage { + let message = if auth.authenticated_pubkeys().is_empty() { + BaseRelayError::auth_required("authentication required to read group events") + .prefixed_message() + } else { + BaseRelayError::restricted("group is unavailable").prefixed_message() + }; + RelayMessage::Closed { + subscription_id, + message, + } + } + pub fn open( config: &PocketStoreConfig, limits: BaseRelayLimits, @@ -942,7 +958,7 @@ impl BaseRelay { } let report = self.query_req_with_group_auth_report(subscription_id.clone(), filters.clone(), auth)?; - if should_subscribe { + if should_subscribe && !report.group_read_denied() { self.subscriptions.subscribe(subscription_id, filters)?; } Ok(report) @@ -995,7 +1011,11 @@ impl BaseRelay { event, }) .collect::<Vec<_>>(); - messages.push(RelayMessage::Eose(subscription_id)); + if group_read_denied { + messages.push(Self::redacted_req_closed(subscription_id, auth)); + } else { + messages.push(RelayMessage::Eose(subscription_id)); + } Ok(BaseRelayQueryReport::new( messages, group_read_denied, @@ -1577,6 +1597,7 @@ mod tests { fn base_relay_req_count_paths_preserve_chorus_parity() { let owner = signer(7).public_key().clone(); let auth = authenticated_state(7); + let outsider_auth = authenticated_state(8); let mut relay = test_relay_with_groups( "base-relay-req-count-chorus-parity", 8, @@ -1658,10 +1679,51 @@ mod tests { .collect::<Vec<_>>(); assert_eq!(event_ids, expected_ids); - assert_eq!(messages.last(), Some(&RelayMessage::Eose(subscription_id))); + assert_eq!( + messages.last(), + Some(&RelayMessage::Closed { + subscription_id: subscription_id.clone(), + message: "auth-required: authentication required to read group events".to_owned() + }) + ); + assert!(!messages.iter().any( + |message| matches!(message, RelayMessage::Eose(actual) if actual == &subscription_id) + )); assert!(!event_ids.contains(private_market.id())); assert!(!event_ids.contains(old_market.id())); assert!(!event_ids.contains(wrong_tag.id())); + assert_eq!(relay.active_subscription_count(), 0); + + let restricted_sub = SubscriptionId::new("restricted-screened").expect("sub"); + let restricted_messages = relay + .handle_req_with_auth( + restricted_sub.clone(), + vec![market_limit.clone(), author_limit.clone()], + &outsider_auth, + ) + .expect("restricted req"); + let restricted_event_ids = restricted_messages + .iter() + .filter_map(|message| match message { + RelayMessage::Event { + subscription_id: actual, + event, + } if actual == &restricted_sub => Some(event.id().clone()), + _ => None, + }) + .collect::<Vec<_>>(); + assert_eq!(restricted_event_ids, expected_ids); + assert_eq!( + restricted_messages.last(), + Some(&RelayMessage::Closed { + subscription_id: restricted_sub.clone(), + message: "restricted: group is unavailable".to_owned() + }) + ); + assert!(!restricted_messages.iter().any( + |message| matches!(message, RelayMessage::Eose(actual) if actual == &restricted_sub) + )); + assert_eq!(relay.active_subscription_count(), 0); let private_sub = SubscriptionId::new("private-screened").expect("sub"); assert_eq!( @@ -1671,8 +1733,12 @@ mod tests { vec![filter_group_tag(1, "h", "Private")] ) .expect("private unauth req"), - vec![RelayMessage::Eose(private_sub)] + vec![RelayMessage::Closed { + subscription_id: private_sub, + message: "auth-required: authentication required to read group events".to_owned() + }] ); + assert_eq!(relay.active_subscription_count(), 0); let private_auth_sub = SubscriptionId::new("private-auth").expect("sub"); assert!(matches!( relay @@ -2886,8 +2952,12 @@ mod tests { relay .handle_req(unauth_sub.clone(), vec![filter_kind(1)]) .expect("unauth req"), - vec![RelayMessage::Eose(unauth_sub)] + vec![RelayMessage::Closed { + subscription_id: unauth_sub, + message: "auth-required: authentication required to read group events".to_owned() + }] ); + assert_eq!(relay.active_subscription_count(), 0); assert!(matches!( relay .handle_req_with_auth(auth_sub.clone(), vec![filter_kind(1)], &auth) diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -1035,12 +1035,12 @@ impl TangleRuntimeHandle { .rate_limit_req(subscription_id, filters, auth, rate_limit_context, now) } - pub(crate) async fn query_req_with_auth( + pub(crate) async fn query_req_with_auth_report( &self, subscription_id: SubscriptionId, filters: Vec<Filter>, auth: &BaseAuthState, - ) -> Result<Vec<RelayMessage>, BaseRelayError> { + ) -> Result<BaseRelayQueryReport, BaseRelayError> { let started_at = Instant::now(); let report = self .inner @@ -1051,7 +1051,7 @@ impl TangleRuntimeHandle { self.inner .metrics .record_query_latency(elapsed_micros(started_at)); - Ok(report.into_messages()) + Ok(report) } pub async fn event_by_offset_with_auth( @@ -4174,8 +4174,14 @@ mod tests { ) .await .expect("public req"); - assert_eq!(replies.len(), 1); - assert_eq!(replies[0], RelayMessage::Eose(subscription_id)); + assert_eq!( + replies, + vec![RelayMessage::Closed { + subscription_id, + message: "auth-required: authentication required to read group events" + .to_owned() + }] + ); })); let member_count_handle = handle.clone(); let mut auth = member_auth.clone(); diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -346,11 +346,13 @@ impl TangleWebSocketSession { self.subscriptions .ensure_can_subscribe(&subscription_id, &filters)?; } - let replies = self + let report = self .runtime - .query_req_with_auth(subscription_id.clone(), filters.clone(), &self.auth) + .query_req_with_auth_report(subscription_id.clone(), filters.clone(), &self.auth) .await?; - if should_subscribe { + let closes_subscription = report.group_read_denied(); + let replies = report.into_messages(); + if should_subscribe && !closes_subscription { self.subscriptions .subscribe(subscription_id.clone(), filters)?; metrics.record_subscription_opened(); @@ -962,6 +964,125 @@ mod tests { } #[tokio::test] + async fn websocket_session_redacted_initial_req_closes_without_live_subscription() { + let shutdown = TangleShutdownSignal::new(); + let root = temp_root("redacted-req-close"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntimeHandle::new( + TangleRuntime::open(runtime_config_with_groups(&root)).expect("runtime"), + ); + let mut owner_auth = runtime.auth_state().await.expect("owner auth"); + owner_auth + .issue_challenge("owner-redacted", UnixTimestamp::new(100)) + .expect("owner challenge"); + let owner_auth_event = tangle_v2_auth_event(FixtureKey::Owner, "owner-redacted", 120) + .expect("owner auth event"); + assert_eq!( + runtime + .handle_client_message( + ClientMessage::Auth(owner_auth_event.clone()), + &mut owner_auth, + UnixTimestamp::new(120) + ) + .await + .expect("owner auth"), + vec![RelayMessage::Ok { + event_id: owner_auth_event.id().clone(), + accepted: true, + message: String::new() + }] + ); + let create = + tangle_v2_group_create_event(FixtureKey::Owner, "RedactedFarm", 121, &["private"]) + .expect("create"); + assert_eq!( + runtime + .handle_client_message( + ClientMessage::Event(create.clone()), + &mut owner_auth, + UnixTimestamp::new(121) + ) + .await + .expect("create"), + vec![RelayMessage::Ok { + event_id: create.id().clone(), + accepted: true, + message: String::new() + }] + ); + let public_event = + tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "public") + .expect("public"); + assert_eq!( + runtime + .handle_client_message( + ClientMessage::Event(public_event.clone()), + &mut owner_auth, + UnixTimestamp::new(122) + ) + .await + .expect("public event"), + vec![RelayMessage::Ok { + event_id: public_event.id().clone(), + accepted: true, + message: String::new() + }] + ); + let private_event = + tangle_v2_group_event(FixtureKey::Owner, "RedactedFarm", 123, 1, "private") + .expect("private"); + assert_eq!( + runtime + .handle_client_message( + ClientMessage::Event(private_event.clone()), + &mut owner_auth, + UnixTimestamp::new(123) + ) + .await + .expect("private event"), + vec![RelayMessage::Ok { + event_id: private_event.id().clone(), + accepted: true, + message: String::new() + }] + ); + + let events = runtime.subscribe_events().await; + let mut session = TangleWebSocketSession::new( + session_limits(8), + shutdown.subscribe(), + runtime.clone(), + runtime.auth_state().await.expect("session auth"), + events, + ) + .expect("session"); + let subscription_id = SubscriptionId::new("redacted-req").expect("subscription"); + assert_eq!( + session + .handle_client_message(ClientMessage::Req { + subscription_id: subscription_id.clone(), + filters: vec![filter_from_value(&json!({"kinds":[1]})).expect("filter")], + }) + .await + .expect("redacted req"), + vec![ + RelayMessage::Event { + subscription_id: subscription_id.clone(), + event: public_event + }, + RelayMessage::Closed { + subscription_id, + message: "auth-required: authentication required to read group events" + .to_owned() + } + ] + ); + assert_eq!(session.active_subscription_count(), 0); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] async fn websocket_session_preserves_chorus_close_scope_parity() { let shutdown = TangleShutdownSignal::new(); let root = temp_root("chorus-close-scope-parity"); diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs @@ -456,8 +456,12 @@ fn metadata_flags_and_read_privacy_cover_req_count_and_fanout() { vec![filter_group_tag(1, "h", "PrivateFarm")] ) .expect("unauth"), - vec![RelayMessage::Eose(unauth_id)] + vec![RelayMessage::Closed { + subscription_id: unauth_id, + message: "auth-required: authentication required to read group events".to_owned() + }] ); + assert_eq!(relay.active_subscription_count(), 0); assert_count( relay.handle_count( subscription("private-count-unauth"), @@ -510,12 +514,18 @@ fn metadata_flags_and_read_privacy_cover_req_count_and_fanout() { let live_unauth = subscription("live-private-unauth"); let live_owner = subscription("live-private-owner"); - relay - .handle_req( - live_unauth.clone(), - vec![filter_group_tag(1, "h", "PrivateFarm")], - ) - .expect("live unauth"); + assert_eq!( + relay + .handle_req( + live_unauth.clone(), + vec![filter_group_tag(1, "h", "PrivateFarm")], + ) + .expect("live unauth"), + vec![RelayMessage::Closed { + subscription_id: live_unauth, + message: "auth-required: authentication required to read group events".to_owned() + }] + ); relay .handle_req_with_auth( live_owner.clone(), @@ -540,13 +550,6 @@ fn metadata_flags_and_read_privacy_cover_req_count_and_fanout() { matches!( message, RelayMessage::Event { subscription_id, event } - if subscription_id == &live_unauth && event.id() == second_private.id() - ) - })); - assert!(owner_live.iter().any(|message| { - matches!( - message, - RelayMessage::Event { subscription_id, event } if subscription_id == &live_owner && event.id() == second_private.id() ) })); @@ -677,15 +680,17 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { ); let private_unauth = subscription("private-leak-unauth"); - assert_event_query( + assert_eq!( relay .handle_req( private_unauth.clone(), vec![filter_group_tag(1, "h", "LeakPrivate")], ) .expect("private unauth"), - &private_unauth, - &[], + vec![RelayMessage::Closed { + subscription_id: private_unauth, + message: "auth-required: authentication required to read group events".to_owned() + }] ); assert_count( relay.handle_count( @@ -710,9 +715,18 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { let live_unauth = subscription("private-live-unauth"); let live_member = subscription("private-live-member"); - relay - .handle_req(live_unauth, vec![filter_group_tag(1, "h", "LeakPrivate")]) - .expect("private live unauth"); + assert_eq!( + relay + .handle_req( + live_unauth.clone(), + vec![filter_group_tag(1, "h", "LeakPrivate")], + ) + .expect("private live unauth"), + vec![RelayMessage::Closed { + subscription_id: live_unauth, + message: "auth-required: authentication required to read group events".to_owned() + }] + ); relay .handle_req_with_auth( live_member.clone(), @@ -935,15 +949,17 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() { 0, ); let deleted_query = subscription("deleted-target-query"); - assert_event_query( + assert_eq!( relay .handle_req( deleted_query.clone(), vec![filter_group_tag(1, "h", "LeakDeleted")], ) .expect("deleted query"), - &deleted_query, - &[], + vec![RelayMessage::Closed { + subscription_id: deleted_query, + message: "auth-required: authentication required to read group events".to_owned() + }] ); let delete_group = tangle_v2_delete_group_event(FixtureKey::Owner, "LeakDeleted", 73).expect("delete group"); @@ -1157,13 +1173,10 @@ fn group_tombstone_hides_prior_events_and_generated_snapshots() { ), 0, ); - assert!( - query_events( - &mut relay, - "tombstone-note-query", - vec![filter_group_tag(1, "h", "TombstoneFarm")] - ) - .is_empty() + assert_auth_required_redacted_query( + &mut relay, + "tombstone-note-query", + vec![filter_group_tag(1, "h", "TombstoneFarm")], ); for (subscription_id, query_id, kind) in [ ( @@ -1189,13 +1202,10 @@ fn group_tombstone_hides_prior_events_and_generated_snapshots() { ), 0, ); - assert!( - query_events( - &mut relay, - query_id, - vec![filter_group_tag(kind, "d", "TombstoneFarm")] - ) - .is_empty() + assert_auth_required_redacted_query( + &mut relay, + query_id, + vec![filter_group_tag(kind, "d", "TombstoneFarm")], ); } assert_count( @@ -2359,6 +2369,23 @@ fn query_events(relay: &mut BaseRelay, subscription_id: &str, filters: Vec<Filte events } +fn assert_auth_required_redacted_query( + relay: &mut BaseRelay, + subscription_id: &str, + filters: Vec<Filter>, +) { + let subscription_id = subscription(subscription_id); + assert_eq!( + relay + .handle_req(subscription_id.clone(), filters) + .expect("query"), + vec![RelayMessage::Closed { + subscription_id, + message: "auth-required: authentication required to read group events".to_owned() + }] + ); +} + fn sorted_strings(values: impl IntoIterator<Item = String>) -> Vec<String> { let mut values = values.into_iter().collect::<Vec<_>>(); values.sort(); diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -836,7 +836,7 @@ async fn websocket_private_and_hidden_groups_do_not_leak_through_query_count_or_ "PrivateSocket", ) .await; - assert_empty_req( + assert_redacted_req_closed( &mut observer, "private-members-public-query", json!({"kinds":[KIND_GROUP_MEMBERS], "#d":["PrivateSocket"]}), @@ -929,7 +929,7 @@ async fn websocket_private_and_hidden_groups_do_not_leak_through_query_count_or_ "invalid: filter field `approximate` is unsupported" ]) ); - assert_empty_req( + assert_redacted_req_closed( &mut observer, "private-public-query", json!({"kinds":[1], "#h":["PrivateSocket"]}), @@ -1012,7 +1012,7 @@ async fn websocket_private_and_hidden_groups_do_not_leak_through_query_count_or_ ("hidden-admins-public-query", KIND_GROUP_ADMINS), ("hidden-members-public-query", KIND_GROUP_MEMBERS), ] { - assert_empty_req( + assert_redacted_req_closed( &mut observer, subscription_id, json!({"kinds":[kind], "#d":["HiddenSocket"]}), @@ -1126,7 +1126,7 @@ async fn websocket_private_and_hidden_groups_do_not_leak_through_query_count_or_ "invalid: filter field `approximate` is unsupported" ]) ); - assert_empty_req( + assert_redacted_req_closed( &mut observer, "hidden-public-query", json!({"kinds":[1], "#h":["HiddenSocket"]}), @@ -2301,11 +2301,19 @@ async fn assert_count_closed( ); } -async fn assert_empty_req(socket: &mut TestWebSocket, subscription_id: &str, filter: Value) { +async fn assert_redacted_req_closed( + socket: &mut TestWebSocket, + subscription_id: &str, + filter: Value, +) { send_client_value(socket, json!(["REQ", subscription_id, filter])).await; assert_eq!( read_relay_value(socket).await, - json!(["EOSE", subscription_id]) + json!([ + "CLOSED", + subscription_id, + "auth-required: authentication required to read group events" + ]) ); }