commit 0a84d0a813e87869fa4d5a9e55f4ad86d1bf99d3
parent 3b55068d79195f859bc789729aae3e8307ec8bff
Author: triesap <tyson@radroots.org>
Date: Mon, 15 Jun 2026 14:45:07 -0700
runtime: disable negentropy surface
Diffstat:
5 files changed, 336 insertions(+), 2 deletions(-)
diff --git a/crates/tangle_protocol/src/lib.rs b/crates/tangle_protocol/src/lib.rs
@@ -631,6 +631,16 @@ pub enum ClientMessage {
},
Close(SubscriptionId),
Auth(Event),
+ NegOpen {
+ subscription_id: SubscriptionId,
+ filter: Filter,
+ message: String,
+ },
+ NegMsg {
+ subscription_id: SubscriptionId,
+ message: String,
+ },
+ NegClose(SubscriptionId),
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -808,6 +818,14 @@ pub enum RelayMessage {
},
Notice(String),
Auth(String),
+ NegErr {
+ subscription_id: SubscriptionId,
+ message: String,
+ },
+ NegMsg {
+ subscription_id: SubscriptionId,
+ message: String,
+ },
}
impl RelayMessage {
@@ -844,6 +862,14 @@ pub fn relay_message_to_value(message: &RelayMessage) -> serde_json::Value {
} => serde_json::json!(["COUNT", subscription_id.as_str(), {"count": count}]),
RelayMessage::Notice(message) => serde_json::json!(["NOTICE", message]),
RelayMessage::Auth(challenge) => serde_json::json!(["AUTH", challenge]),
+ RelayMessage::NegErr {
+ subscription_id,
+ message,
+ } => serde_json::json!(["NEG-ERR", subscription_id.as_str(), message]),
+ RelayMessage::NegMsg {
+ subscription_id,
+ message,
+ } => serde_json::json!(["NEG-MSG", subscription_id.as_str(), message]),
}
}
@@ -2635,6 +2661,26 @@ mod tests {
}
#[test]
+ fn relay_message_encoder_emits_negentropy_messages() {
+ let subscription_id = SubscriptionId::new("neg-sub").expect("sub");
+
+ assert_eq!(
+ relay_message_to_value(&RelayMessage::NegErr {
+ subscription_id: subscription_id.clone(),
+ message: "blocked: Negentropy sync is disabled".to_owned()
+ }),
+ serde_json::json!(["NEG-ERR", "neg-sub", "blocked: Negentropy sync is disabled"])
+ );
+ assert_eq!(
+ relay_message_to_value(&RelayMessage::NegMsg {
+ subscription_id,
+ message: "00ff".to_owned()
+ }),
+ serde_json::json!(["NEG-MSG", "neg-sub", "00ff"])
+ );
+ }
+
+ #[test]
fn event_to_value_round_trips_with_event_parser() {
let event = parse_event_json(
&RawEventJson::new(&event_json("e", "f", 30402, tags_json())).expect("raw"),
diff --git a/crates/tangle_runtime/src/client_message.rs b/crates/tangle_runtime/src/client_message.rs
@@ -35,6 +35,9 @@ pub(crate) fn parse_runtime_client_message(raw: &str) -> Result<ClientMessage, S
}
}),
"CLOSE" => parse_close(&values),
+ "NEG-OPEN" => parse_neg_open(&values),
+ "NEG-MSG" => parse_neg_msg(&values),
+ "NEG-CLOSE" => parse_neg_close(&values),
unsupported => Err(format!(
"client message command `{unsupported}` is unsupported"
)),
@@ -79,6 +82,37 @@ fn parse_close(values: &[Box<RawValue>]) -> Result<ClientMessage, String> {
parse_subscription_id(&values[1], "CLOSE").map(ClientMessage::Close)
}
+fn parse_neg_open(values: &[Box<RawValue>]) -> Result<ClientMessage, String> {
+ if values.len() != 4 {
+ return Err(
+ "NEG-OPEN client message must contain a subscription id, filter, and message"
+ .to_owned(),
+ );
+ }
+ Ok(ClientMessage::NegOpen {
+ subscription_id: parse_subscription_id(&values[1], "NEG-OPEN")?,
+ filter: parse_filter(&values[2])?,
+ message: parse_negentropy_message(&values[3], "NEG-OPEN")?,
+ })
+}
+
+fn parse_neg_msg(values: &[Box<RawValue>]) -> Result<ClientMessage, String> {
+ if values.len() != 3 {
+ return Err("NEG-MSG client message must contain a subscription id and message".to_owned());
+ }
+ Ok(ClientMessage::NegMsg {
+ subscription_id: parse_subscription_id(&values[1], "NEG-MSG")?,
+ message: parse_negentropy_message(&values[2], "NEG-MSG")?,
+ })
+}
+
+fn parse_neg_close(values: &[Box<RawValue>]) -> Result<ClientMessage, String> {
+ if values.len() != 2 {
+ return Err("NEG-CLOSE client message must contain exactly 2 elements".to_owned());
+ }
+ parse_subscription_id(&values[1], "NEG-CLOSE").map(ClientMessage::NegClose)
+}
+
fn parse_subscription_id(
value: &RawValue,
command: &'static str,
@@ -88,6 +122,22 @@ fn parse_subscription_id(
.and_then(|subscription_id| SubscriptionId::new(&subscription_id))
}
+fn parse_negentropy_message(value: &RawValue, command: &'static str) -> Result<String, String> {
+ let message = serde_json::from_str::<String>(value.get())
+ .map_err(|_| format!("{command} message must be a string"))?;
+ if message.len() % 2 == 0
+ && message
+ .bytes()
+ .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f'))
+ {
+ Ok(message)
+ } else {
+ Err(format!(
+ "{command} message must be a lowercase even-length hex string"
+ ))
+ }
+}
+
fn parse_filter(value: &RawValue) -> Result<Filter, String> {
let shape = inspect_filter_shape(value.get())?;
let search = shape.search;
@@ -352,6 +402,33 @@ mod tests {
}
#[test]
+ fn runtime_parser_maps_negentropy_commands_without_protocol_parser_delegation() {
+ let filter = filter_from_value(&json!({"kinds": [1]})).expect("filter");
+ assert_eq!(
+ parse_runtime_client_message(
+ &json!(["NEG-OPEN", "neg-sub", json!({"kinds": [1]}), "00ff"]).to_string()
+ )
+ .expect("neg open"),
+ ClientMessage::NegOpen {
+ subscription_id: "neg-sub".parse().expect("subscription"),
+ filter,
+ message: "00ff".to_owned()
+ }
+ );
+ assert_eq!(
+ parse_runtime_client_message("[\"NEG-MSG\",\"neg-sub\",\"\"]").expect("neg msg"),
+ ClientMessage::NegMsg {
+ subscription_id: "neg-sub".parse().expect("subscription"),
+ message: String::new()
+ }
+ );
+ assert_eq!(
+ parse_runtime_client_message("[\"NEG-CLOSE\",\"neg-sub\"]").expect("neg close"),
+ ClientMessage::NegClose("neg-sub".parse().expect("subscription"))
+ );
+ }
+
+ #[test]
fn runtime_parser_preserves_search_rejection_marker_before_dispatch() {
let ClientMessage::Req { filters, .. } =
parse_runtime_client_message("[\"REQ\",\"sub\",{\"search\":\"carrots\",\"limit\":1}]")
@@ -394,4 +471,41 @@ mod tests {
assert!(actual.contains(expected), "{actual}");
}
}
+
+ #[test]
+ fn runtime_parser_rejects_malformed_negentropy_commands() {
+ for (raw, expected) in [
+ (
+ "[\"NEG-OPEN\",\"sub\",{}]",
+ "NEG-OPEN client message must contain a subscription id, filter, and message",
+ ),
+ (
+ "[\"NEG-OPEN\",1,{},\"00\"]",
+ "NEG-OPEN subscription id must be a string",
+ ),
+ (
+ "[\"NEG-OPEN\",\"sub\",1,\"00\"]",
+ "expected a filter JSON object",
+ ),
+ (
+ "[\"NEG-OPEN\",\"sub\",{},1]",
+ "NEG-OPEN message must be a string",
+ ),
+ (
+ "[\"NEG-MSG\",\"sub\",\"0\"]",
+ "NEG-MSG message must be a lowercase even-length hex string",
+ ),
+ (
+ "[\"NEG-MSG\",\"sub\",\"0G\"]",
+ "NEG-MSG message must be a lowercase even-length hex string",
+ ),
+ (
+ "[\"NEG-CLOSE\",\"sub\",{}]",
+ "NEG-CLOSE client message must contain exactly 2 elements",
+ ),
+ ] {
+ let actual = parse_runtime_client_message(raw).expect_err(raw);
+ assert!(actual.contains(expected), "{actual}");
+ }
+ }
}
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -25,6 +25,8 @@ use tangle_store_pocket::{
PocketQueryConfig, PocketScreenResult, PocketStoreConfig, PocketStoreHandle,
};
+pub(crate) const NEGENTROPY_DISABLED_MESSAGE: &str = "blocked: Negentropy sync is disabled";
+
pub struct BaseRelay {
store: PocketStoreHandle,
subscriptions: LiveSubscriptionSet,
@@ -598,6 +600,20 @@ impl BaseRelay {
Ok(Vec::new())
}
ClientMessage::Auth(event) => Ok(self.handle_auth_message(event, auth, now)),
+ ClientMessage::NegOpen {
+ subscription_id, ..
+ }
+ | ClientMessage::NegMsg {
+ subscription_id, ..
+ } => Ok(vec![Self::disabled_negentropy_message(subscription_id)]),
+ ClientMessage::NegClose(_) => Ok(Vec::new()),
+ }
+ }
+
+ pub(crate) fn disabled_negentropy_message(subscription_id: SubscriptionId) -> RelayMessage {
+ RelayMessage::NegErr {
+ subscription_id,
+ message: NEGENTROPY_DISABLED_MESSAGE.to_owned(),
}
}
@@ -1362,7 +1378,7 @@ fn filters_are_complete(filters: &[Filter]) -> bool {
#[cfg(test)]
mod tests {
- use super::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits};
+ use super::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits, NEGENTROPY_DISABLED_MESSAGE};
use crate::pocket_conversion::tangle_event_to_pocket;
use crate::relay::auth::BaseAuthState;
use crate::relay::live::CloseResult;
@@ -1517,6 +1533,58 @@ mod tests {
}
#[test]
+ fn base_relay_dispatch_returns_disabled_negentropy_surface() {
+ let mut relay = test_relay("base-relay-negentropy-disabled", 4);
+ let mut auth =
+ BaseAuthState::new("wss://relay.radroots.test", 60, 600).expect("auth state");
+ let subscription_id = SubscriptionId::new("neg-sub").expect("sub");
+
+ assert_eq!(
+ relay
+ .handle_client_message(
+ ClientMessage::NegOpen {
+ subscription_id: subscription_id.clone(),
+ filter: Filter::empty(),
+ message: "00".to_owned()
+ },
+ &mut auth,
+ UnixTimestamp::new(100)
+ )
+ .expect("neg open"),
+ vec![RelayMessage::NegErr {
+ subscription_id: subscription_id.clone(),
+ message: NEGENTROPY_DISABLED_MESSAGE.to_owned()
+ }]
+ );
+ assert_eq!(
+ relay
+ .handle_client_message(
+ ClientMessage::NegMsg {
+ subscription_id: subscription_id.clone(),
+ message: String::new()
+ },
+ &mut auth,
+ UnixTimestamp::new(101)
+ )
+ .expect("neg msg"),
+ vec![RelayMessage::NegErr {
+ subscription_id: subscription_id.clone(),
+ message: NEGENTROPY_DISABLED_MESSAGE.to_owned()
+ }]
+ );
+ assert_eq!(
+ relay
+ .handle_client_message(
+ ClientMessage::NegClose(subscription_id),
+ &mut auth,
+ UnixTimestamp::new(102)
+ )
+ .expect("neg close"),
+ Vec::<RelayMessage>::new()
+ );
+ }
+
+ #[test]
fn base_relay_fetches_events_by_store_offset() {
let relay = test_relay("base-relay-offset-lookup", 4);
let event = signed_public_event(7, 1, Vec::new(), "offset");
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -1008,6 +1008,27 @@ impl TangleRuntimeHandle {
.validate_subscription_id(&subscription_id)?;
Ok(Vec::new())
}
+ ClientMessage::NegOpen {
+ subscription_id, ..
+ }
+ | ClientMessage::NegMsg {
+ subscription_id, ..
+ } => {
+ self.inner
+ .limits
+ .base_relay_limits()
+ .validate_subscription_id(&subscription_id)?;
+ Ok(vec![BaseRelay::disabled_negentropy_message(
+ subscription_id,
+ )])
+ }
+ ClientMessage::NegClose(subscription_id) => {
+ self.inner
+ .limits
+ .base_relay_limits()
+ .validate_subscription_id(&subscription_id)?;
+ Ok(Vec::new())
+ }
}
}
@@ -1157,6 +1178,9 @@ fn client_message_metric_kind(message: &ClientMessage) -> TangleClientMessageMet
ClientMessage::Count { .. } => TangleClientMessageMetricKind::Count,
ClientMessage::Auth(_) => TangleClientMessageMetricKind::Auth,
ClientMessage::Close(_) => TangleClientMessageMetricKind::Close,
+ ClientMessage::NegOpen { .. }
+ | ClientMessage::NegMsg { .. }
+ | ClientMessage::NegClose(_) => TangleClientMessageMetricKind::Negentropy,
}
}
@@ -1309,6 +1333,7 @@ pub enum TangleClientMessageMetricKind {
Count,
Auth,
Close,
+ Negentropy,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -1692,6 +1717,7 @@ impl TangleRuntimeMetrics {
TangleClientMessageMetricKind::Close => {
self.inner.close_messages.fetch_add(1, Ordering::Relaxed);
}
+ TangleClientMessageMetricKind::Negentropy => {}
};
total
}
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -571,7 +571,9 @@ mod tests {
),
(
"[\"NEG-OPEN\",\"sub\",{}]",
- Some("[\"NOTICE\",\"invalid: client message command `NEG-OPEN` is unsupported\"]"),
+ Some(
+ "[\"NOTICE\",\"invalid: NEG-OPEN client message must contain a subscription id, filter, and message\"]",
+ ),
),
(
"[\"REQ\"]",
@@ -620,6 +622,84 @@ mod tests {
}
#[tokio::test]
+ async fn websocket_session_returns_disabled_negentropy_errors() {
+ let shutdown = TangleShutdownSignal::new();
+ let (runtime, auth, events) = session_runtime("disabled-negentropy");
+ let mut session = TangleWebSocketSession::new(
+ session_limits(16),
+ shutdown.subscribe(),
+ runtime,
+ auth,
+ events,
+ )
+ .expect("session");
+
+ assert_eq!(
+ session
+ .dispatch_text("[\"NEG-OPEN\",\"neg-sub\",{\"kinds\":[1]},\"00\"]")
+ .await,
+ TangleSessionControl::Continue
+ );
+ assert_eq!(
+ take_outbound_text(&mut session),
+ "[\"NEG-ERR\",\"neg-sub\",\"blocked: Negentropy sync is disabled\"]"
+ );
+ assert_eq!(
+ session
+ .dispatch_text("[\"NEG-MSG\",\"neg-sub\",\"\"]")
+ .await,
+ TangleSessionControl::Continue
+ );
+ assert_eq!(
+ take_outbound_text(&mut session),
+ "[\"NEG-ERR\",\"neg-sub\",\"blocked: Negentropy sync is disabled\"]"
+ );
+ assert_eq!(
+ session.dispatch_text("[\"NEG-CLOSE\",\"neg-sub\"]").await,
+ TangleSessionControl::Continue
+ );
+ assert!(session.outbound_receiver.try_recv().is_err());
+ }
+
+ #[tokio::test]
+ async fn websocket_session_disabled_negentropy_privacy_response_omits_filter_material() {
+ let shutdown = TangleShutdownSignal::new();
+ let (runtime, auth, events) = session_runtime("disabled-negentropy-privacy");
+ let mut session = TangleWebSocketSession::new(
+ session_limits(16),
+ shutdown.subscribe(),
+ runtime,
+ auth,
+ events,
+ )
+ .expect("session");
+ let hidden_event_id = "a".repeat(64);
+ let private_group_id = "private-group-alpha";
+ let raw = json!([
+ "NEG-OPEN",
+ "neg-private",
+ {"ids": [hidden_event_id], "#h": [private_group_id]},
+ "00"
+ ])
+ .to_string();
+
+ assert_eq!(
+ session.dispatch_text(&raw).await,
+ TangleSessionControl::Continue
+ );
+ let text = take_outbound_text(&mut session);
+
+ assert_eq!(
+ text,
+ "[\"NEG-ERR\",\"neg-private\",\"blocked: Negentropy sync is disabled\"]"
+ );
+ assert!(!text.contains(private_group_id));
+ assert!(!text.contains(&hidden_event_id));
+ assert!(!text.contains("inventory"));
+ assert!(!text.contains("#h"));
+ }
+
+ #[tokio::test]
async fn websocket_session_scopes_subscriptions_per_connection() {
let shutdown = TangleShutdownSignal::new();
let root = temp_root("connection-scope");