commit 99e9842b4bc4156c8017faa6b110840a33423d03
parent a8924637a85671bd35eb040bc217e2ea794dedf4
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 06:33:18 -0700
limits: enforce runtime resource caps
- Replace the single production max_pending_events knob with explicit message, subscription, filter, event, broadcast, and outbound queue limits.
- Thread BaseRelayLimits through relay, runtime, session, server, benchmark, and integration construction paths.
- Enforce event tag/content caps, request filter caps, subscription id length, subscription count, default limit, and raw websocket message length.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
10 files changed, 1109 insertions(+), 139 deletions(-)
diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs
@@ -125,7 +125,17 @@ fn tangle_run_starts_server_and_stays_alive_until_shutdown() {
"created_at_skew_seconds": 600
},
"limits": {
- "max_pending_events": 1024
+ "max_message_length": 1048576,
+ "max_subid_length": 64,
+ "max_subscriptions_per_connection": 64,
+ "max_filters_per_request": 10,
+ "max_tag_values_per_filter": 100,
+ "max_limit": 500,
+ "default_limit": 100,
+ "max_event_tags": 200,
+ "max_content_length": 65536,
+ "broadcast_channel_capacity": 4096,
+ "per_connection_outbound_queue": 256
}
})
.to_string(),
diff --git a/crates/tangle_bench/src/lib.rs b/crates/tangle_bench/src/lib.rs
@@ -11,7 +11,10 @@ use tangle_groups::{KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA,
use tangle_protocol::{
Event, Filter, RelayMessage, SubscriptionId, event_to_value, filter_from_value,
};
-use tangle_runtime::relay::{auth::BaseAuthState, core::BaseRelay};
+use tangle_runtime::relay::{
+ auth::BaseAuthState,
+ core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits},
+};
use tangle_store_pocket::{PocketStoreConfig, PocketSyncPolicy};
use tangle_test_support::{
FixtureKey, TANGLE_V2_RELAY_URL, tangle_v2_auth_event, tangle_v2_event, tangle_v2_group_config,
@@ -708,8 +711,12 @@ fn run_projection_rebuild_benchmark(dataset: &BenchDataset) -> Result<ScenarioRe
.shutdown()
.map_err(|error| error.to_string())?;
let started = Instant::now();
- let reopened = BaseRelay::open_with_groups(&materialized.store_config, 128, &group_config()?)
- .map_err(|error| error.to_string())?;
+ let reopened = BaseRelay::open_with_groups(
+ &materialized.store_config,
+ relay_limits(128),
+ &group_config()?,
+ )
+ .map_err(|error| error.to_string())?;
let elapsed = elapsed_micros(started);
let projection = reopened
.group_projection()
@@ -742,13 +749,20 @@ fn run_outbox_replay_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport,
.shutdown()
.map_err(|error| error.to_string())?;
let started = Instant::now();
- let mut reopened =
- BaseRelay::open_with_groups(&materialized.store_config, 128, &group_config()?)
- .map_err(|error| error.to_string())?;
+ let mut reopened = BaseRelay::open_with_groups(
+ &materialized.store_config,
+ relay_limits(128),
+ &group_config()?,
+ )
+ .map_err(|error| error.to_string())?;
let after_first = generated_state_counts(&reopened)?;
reopened.shutdown().map_err(|error| error.to_string())?;
- let reopened = BaseRelay::open_with_groups(&materialized.store_config, 128, &group_config()?)
- .map_err(|error| error.to_string())?;
+ let reopened = BaseRelay::open_with_groups(
+ &materialized.store_config,
+ relay_limits(128),
+ &group_config()?,
+ )
+ .map_err(|error| error.to_string())?;
let after_second = generated_state_counts(&reopened)?;
let elapsed = elapsed_micros(started);
let accepted = u64::from(before == after_first && before == after_second);
@@ -847,9 +861,12 @@ fn materialize_dataset(
max_pending_events: usize,
) -> Result<MaterializedBenchRelay, String> {
let store_config = bench_store_config(run_name)?;
- let mut relay =
- BaseRelay::open_with_groups(&store_config, max_pending_events, &group_config()?)
- .map_err(|error| error.to_string())?;
+ let mut relay = BaseRelay::open_with_groups(
+ &store_config,
+ relay_limits(max_pending_events),
+ &group_config()?,
+ )
+ .map_err(|error| error.to_string())?;
let owner_auth = authenticated(FixtureKey::Owner)?;
let admin_auth = authenticated(FixtureKey::Admin)?;
let started = Instant::now();
@@ -893,6 +910,21 @@ fn materialize_dataset(
})
}
+fn relay_limits(max_pending_events: usize) -> BaseRelayLimits {
+ BaseRelayLimits::new(BaseRelayLimitSettings {
+ max_pending_events,
+ max_subscription_id_length: 64,
+ max_subscriptions: 512,
+ max_filters_per_request: 10,
+ max_tag_values_per_filter: 100,
+ max_event_tags: 200,
+ max_content_length: 65_536,
+ max_limit: 500,
+ default_limit: 100,
+ })
+ .expect("benchmark relay limits")
+}
+
#[derive(Clone)]
struct QueryOperation {
name: &'static str,
diff --git a/crates/tangle_runtime/src/config.rs b/crates/tangle_runtime/src/config.rs
@@ -2,11 +2,15 @@
use crate::{
errors::BaseRelayError,
- relay::{auth::BaseAuthState, core::BaseRelay},
+ relay::{
+ auth::BaseAuthState,
+ core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits},
+ },
};
use serde::Deserialize;
use std::{net::SocketAddr, path::PathBuf};
use tangle_groups::GroupRuntimeConfig;
+use tangle_protocol::SubscriptionId;
use tangle_store_pocket::{PocketStoreConfig, PocketSyncPolicy};
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -17,7 +21,7 @@ pub struct BaseRelayRuntimeConfig {
groups: GroupRuntimeConfig,
auth_ttl_seconds: u64,
auth_created_at_skew_seconds: u64,
- max_pending_events: usize,
+ limits: BaseRelayRuntimeLimitsConfig,
tracing: BaseRelayTracingConfig,
}
@@ -46,8 +50,8 @@ impl BaseRelayRuntimeConfig {
self.auth_created_at_skew_seconds
}
- pub fn max_pending_events(&self) -> usize {
- self.max_pending_events
+ pub fn limits(&self) -> BaseRelayRuntimeLimitsConfig {
+ self.limits
}
pub fn tracing(&self) -> &BaseRelayTracingConfig {
@@ -55,7 +59,7 @@ impl BaseRelayRuntimeConfig {
}
pub fn open_relay(&self) -> Result<BaseRelay, BaseRelayError> {
- BaseRelay::open_with_groups(&self.pocket, self.max_pending_events, &self.groups)
+ BaseRelay::open_with_groups(&self.pocket, self.limits.base_relay_limits()?, &self.groups)
}
pub fn auth_state(&self) -> Result<BaseAuthState, BaseRelayError> {
@@ -132,6 +136,134 @@ impl Default for BaseRelayTracingConfig {
}
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct BaseRelayRuntimeLimitsConfig {
+ max_message_length: usize,
+ max_subid_length: usize,
+ max_subscriptions_per_connection: usize,
+ max_filters_per_request: usize,
+ max_tag_values_per_filter: usize,
+ max_limit: u64,
+ default_limit: u64,
+ max_event_tags: usize,
+ max_content_length: usize,
+ broadcast_channel_capacity: usize,
+ per_connection_outbound_queue: usize,
+}
+
+impl BaseRelayRuntimeLimitsConfig {
+ fn from_document(document: BaseRelayRuntimeLimitsDocument) -> Result<Self, BaseRelayError> {
+ require_positive("limits.max_message_length", document.max_message_length)?;
+ require_positive("limits.max_subid_length", document.max_subid_length)?;
+ require_positive(
+ "limits.max_subscriptions_per_connection",
+ document.max_subscriptions_per_connection,
+ )?;
+ require_positive(
+ "limits.max_filters_per_request",
+ document.max_filters_per_request,
+ )?;
+ require_positive(
+ "limits.max_tag_values_per_filter",
+ document.max_tag_values_per_filter,
+ )?;
+ require_positive_u64("limits.max_limit", document.max_limit)?;
+ require_positive_u64("limits.default_limit", document.default_limit)?;
+ require_positive("limits.max_event_tags", document.max_event_tags)?;
+ require_positive("limits.max_content_length", document.max_content_length)?;
+ require_positive(
+ "limits.broadcast_channel_capacity",
+ document.broadcast_channel_capacity,
+ )?;
+ require_positive(
+ "limits.per_connection_outbound_queue",
+ document.per_connection_outbound_queue,
+ )?;
+ if document.max_subid_length > SubscriptionId::MAX_LENGTH {
+ return Err(BaseRelayError::invalid(format!(
+ "limits.max_subid_length must be less than or equal to {}",
+ SubscriptionId::MAX_LENGTH
+ )));
+ }
+ if document.default_limit > document.max_limit {
+ return Err(BaseRelayError::invalid(
+ "limits.default_limit must be less than or equal to limits.max_limit",
+ ));
+ }
+ Ok(Self {
+ max_message_length: document.max_message_length,
+ max_subid_length: document.max_subid_length,
+ max_subscriptions_per_connection: document.max_subscriptions_per_connection,
+ max_filters_per_request: document.max_filters_per_request,
+ max_tag_values_per_filter: document.max_tag_values_per_filter,
+ max_limit: document.max_limit,
+ default_limit: document.default_limit,
+ max_event_tags: document.max_event_tags,
+ max_content_length: document.max_content_length,
+ broadcast_channel_capacity: document.broadcast_channel_capacity,
+ per_connection_outbound_queue: document.per_connection_outbound_queue,
+ })
+ }
+
+ pub fn max_message_length(self) -> usize {
+ self.max_message_length
+ }
+
+ pub fn max_subid_length(self) -> usize {
+ self.max_subid_length
+ }
+
+ pub fn max_subscriptions_per_connection(self) -> usize {
+ self.max_subscriptions_per_connection
+ }
+
+ pub fn max_filters_per_request(self) -> usize {
+ self.max_filters_per_request
+ }
+
+ pub fn max_tag_values_per_filter(self) -> usize {
+ self.max_tag_values_per_filter
+ }
+
+ pub fn max_limit(self) -> u64 {
+ self.max_limit
+ }
+
+ pub fn default_limit(self) -> u64 {
+ self.default_limit
+ }
+
+ pub fn max_event_tags(self) -> usize {
+ self.max_event_tags
+ }
+
+ pub fn max_content_length(self) -> usize {
+ self.max_content_length
+ }
+
+ pub fn broadcast_channel_capacity(self) -> usize {
+ self.broadcast_channel_capacity
+ }
+
+ pub fn per_connection_outbound_queue(self) -> usize {
+ self.per_connection_outbound_queue
+ }
+
+ pub fn base_relay_limits(self) -> Result<BaseRelayLimits, BaseRelayError> {
+ BaseRelayLimits::new(BaseRelayLimitSettings {
+ max_pending_events: self.per_connection_outbound_queue,
+ max_subscription_id_length: self.max_subid_length,
+ max_subscriptions: self.max_subscriptions_per_connection,
+ max_filters_per_request: self.max_filters_per_request,
+ max_tag_values_per_filter: self.max_tag_values_per_filter,
+ max_event_tags: self.max_event_tags,
+ max_content_length: self.max_content_length,
+ max_limit: self.max_limit,
+ default_limit: self.default_limit,
+ })
+ }
+}
+
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct BaseRelayRuntimeConfigDocument {
@@ -177,7 +309,17 @@ struct BaseRelayAuthConfigDocument {
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct BaseRelayRuntimeLimitsDocument {
- max_pending_events: usize,
+ max_message_length: usize,
+ max_subid_length: usize,
+ max_subscriptions_per_connection: usize,
+ max_filters_per_request: usize,
+ max_tag_values_per_filter: usize,
+ max_limit: u64,
+ default_limit: u64,
+ max_event_tags: usize,
+ max_content_length: usize,
+ broadcast_channel_capacity: usize,
+ per_connection_outbound_queue: usize,
}
#[derive(Debug, Default, Deserialize)]
@@ -233,11 +375,7 @@ pub fn parse_base_relay_runtime_config_json(
})?;
let groups = tangle_groups::parse_group_runtime_config_json(&groups_raw)
.map_err(|error| BaseRelayError::invalid(error.to_string()))?;
- if document.limits.max_pending_events == 0 {
- return Err(BaseRelayError::invalid(
- "limits.max_pending_events must be greater than zero",
- ));
- }
+ let limits = BaseRelayRuntimeLimitsConfig::from_document(document.limits)?;
if document.auth.created_at_skew_seconds == 0 {
return Err(BaseRelayError::invalid(
"auth.created_at_skew_seconds must be greater than zero",
@@ -251,11 +389,29 @@ pub fn parse_base_relay_runtime_config_json(
groups,
auth_ttl_seconds: document.auth.challenge_ttl_seconds,
auth_created_at_skew_seconds: document.auth.created_at_skew_seconds,
- max_pending_events: document.limits.max_pending_events,
+ limits,
tracing,
})
}
+fn require_positive(field: &str, value: usize) -> Result<(), BaseRelayError> {
+ if value == 0 {
+ return Err(BaseRelayError::invalid(format!(
+ "{field} must be greater than zero"
+ )));
+ }
+ Ok(())
+}
+
+fn require_positive_u64(field: &str, value: u64) -> Result<(), BaseRelayError> {
+ if value == 0 {
+ return Err(BaseRelayError::invalid(format!(
+ "{field} must be greater than zero"
+ )));
+ }
+ Ok(())
+}
+
fn base_relay_tracing_config_from_document(
document: BaseRelayTracingConfigDocument,
) -> Result<BaseRelayTracingConfig, BaseRelayError> {
@@ -303,7 +459,17 @@ mod tests {
assert!(config.groups().enabled());
assert_eq!(config.auth_ttl_seconds(), 300);
assert_eq!(config.auth_created_at_skew_seconds(), 600);
- assert_eq!(config.max_pending_events(), 1024);
+ assert_eq!(config.limits().max_message_length(), 1_048_576);
+ assert_eq!(config.limits().max_subid_length(), 64);
+ assert_eq!(config.limits().max_subscriptions_per_connection(), 64);
+ assert_eq!(config.limits().max_filters_per_request(), 10);
+ assert_eq!(config.limits().max_tag_values_per_filter(), 100);
+ assert_eq!(config.limits().max_limit(), 500);
+ assert_eq!(config.limits().default_limit(), 100);
+ assert_eq!(config.limits().max_event_tags(), 200);
+ assert_eq!(config.limits().max_content_length(), 65_536);
+ assert_eq!(config.limits().broadcast_channel_capacity(), 4_096);
+ assert_eq!(config.limits().per_connection_outbound_queue(), 256);
assert!(config.tracing().enabled());
assert_eq!(config.tracing().format(), BaseRelayTracingFormat::Json);
config.auth_state().expect("auth");
@@ -330,7 +496,17 @@ mod tests {
"created_at_skew_seconds": 0
},
"limits": {
- "max_pending_events": 8
+ "max_message_length": 1048576,
+ "max_subid_length": 64,
+ "max_subscriptions_per_connection": 64,
+ "max_filters_per_request": 10,
+ "max_tag_values_per_filter": 100,
+ "max_limit": 500,
+ "default_limit": 100,
+ "max_event_tags": 200,
+ "max_content_length": 65536,
+ "broadcast_channel_capacity": 4096,
+ "per_connection_outbound_queue": 256
}
}"#;
@@ -363,7 +539,17 @@ mod tests {
"created_at_skew_seconds": 600
},
"limits": {
- "max_pending_events": 8
+ "max_message_length": 1048576,
+ "max_subid_length": 64,
+ "max_subscriptions_per_connection": 64,
+ "max_filters_per_request": 10,
+ "max_tag_values_per_filter": 100,
+ "max_limit": 500,
+ "default_limit": 100,
+ "max_event_tags": 200,
+ "max_content_length": 65536,
+ "broadcast_channel_capacity": 4096,
+ "per_connection_outbound_queue": 256
},
"ignored": true
}"#;
@@ -393,7 +579,17 @@ mod tests {
"created_at_skew_seconds": 600
},
"limits": {
- "max_pending_events": 8,
+ "max_message_length": 1048576,
+ "max_subid_length": 64,
+ "max_subscriptions_per_connection": 64,
+ "max_filters_per_request": 10,
+ "max_tag_values_per_filter": 100,
+ "max_limit": 500,
+ "default_limit": 100,
+ "max_event_tags": 200,
+ "max_content_length": 65536,
+ "broadcast_channel_capacity": 4096,
+ "per_connection_outbound_queue": 256,
"max_unimplemented_limit": 99
}
}"#;
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -22,6 +22,7 @@ pub struct BaseRelay {
subscriptions: LiveSubscriptionSet,
groups: Option<GroupService>,
readiness: BaseRelayReadinessState,
+ limits: BaseRelayLimits,
}
#[derive(Debug, Clone, PartialEq)]
@@ -71,44 +72,244 @@ impl BaseRelayShutdownReport {
}
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct BaseRelayLimits {
+ max_pending_events: usize,
+ max_subscription_id_length: usize,
+ max_subscriptions: usize,
+ max_filters_per_request: usize,
+ max_tag_values_per_filter: usize,
+ max_event_tags: usize,
+ max_content_length: usize,
+ max_limit: u64,
+ default_limit: u64,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct BaseRelayLimitSettings {
+ pub max_pending_events: usize,
+ pub max_subscription_id_length: usize,
+ pub max_subscriptions: usize,
+ pub max_filters_per_request: usize,
+ pub max_tag_values_per_filter: usize,
+ pub max_event_tags: usize,
+ pub max_content_length: usize,
+ pub max_limit: u64,
+ pub default_limit: u64,
+}
+
+impl BaseRelayLimits {
+ pub fn new(settings: BaseRelayLimitSettings) -> Result<Self, BaseRelayError> {
+ let max_pending_events = settings.max_pending_events;
+ let max_subscription_id_length = settings.max_subscription_id_length;
+ let max_subscriptions = settings.max_subscriptions;
+ let max_filters_per_request = settings.max_filters_per_request;
+ let max_tag_values_per_filter = settings.max_tag_values_per_filter;
+ let max_event_tags = settings.max_event_tags;
+ let max_content_length = settings.max_content_length;
+ let max_limit = settings.max_limit;
+ let default_limit = settings.default_limit;
+ if max_pending_events == 0 {
+ return Err(BaseRelayError::invalid(
+ "runtime max pending events must be greater than zero",
+ ));
+ }
+ if max_subscription_id_length == 0 {
+ return Err(BaseRelayError::invalid(
+ "runtime max subscription id length must be greater than zero",
+ ));
+ }
+ if max_subscriptions == 0 {
+ return Err(BaseRelayError::invalid(
+ "runtime max subscriptions per connection must be greater than zero",
+ ));
+ }
+ if max_filters_per_request == 0 {
+ return Err(BaseRelayError::invalid(
+ "runtime max filters per request must be greater than zero",
+ ));
+ }
+ if max_tag_values_per_filter == 0 {
+ return Err(BaseRelayError::invalid(
+ "runtime max tag values per filter must be greater than zero",
+ ));
+ }
+ if max_event_tags == 0 {
+ return Err(BaseRelayError::invalid(
+ "runtime max event tags must be greater than zero",
+ ));
+ }
+ if max_content_length == 0 {
+ return Err(BaseRelayError::invalid(
+ "runtime max content length must be greater than zero",
+ ));
+ }
+ if max_limit == 0 {
+ return Err(BaseRelayError::invalid(
+ "runtime max filter limit must be greater than zero",
+ ));
+ }
+ if default_limit == 0 {
+ return Err(BaseRelayError::invalid(
+ "runtime default filter limit must be greater than zero",
+ ));
+ }
+ if default_limit > max_limit {
+ return Err(BaseRelayError::invalid(
+ "runtime default filter limit must not exceed max filter limit",
+ ));
+ }
+ Ok(Self {
+ max_pending_events,
+ max_subscription_id_length,
+ max_subscriptions,
+ max_filters_per_request,
+ max_tag_values_per_filter,
+ max_event_tags,
+ max_content_length,
+ max_limit,
+ default_limit,
+ })
+ }
+
+ pub fn max_pending_events(self) -> usize {
+ self.max_pending_events
+ }
+
+ pub fn max_subscription_id_length(self) -> usize {
+ self.max_subscription_id_length
+ }
+
+ pub fn max_subscriptions(self) -> usize {
+ self.max_subscriptions
+ }
+
+ pub fn max_filters_per_request(self) -> usize {
+ self.max_filters_per_request
+ }
+
+ pub fn max_tag_values_per_filter(self) -> usize {
+ self.max_tag_values_per_filter
+ }
+
+ pub fn max_event_tags(self) -> usize {
+ self.max_event_tags
+ }
+
+ pub fn max_content_length(self) -> usize {
+ self.max_content_length
+ }
+
+ pub fn max_limit(self) -> u64 {
+ self.max_limit
+ }
+
+ pub fn default_limit(self) -> u64 {
+ self.default_limit
+ }
+
+ pub fn validate_event(&self, event: &Event) -> Result<(), BaseRelayError> {
+ if event.unsigned().tags().len() > self.max_event_tags {
+ return Err(BaseRelayError::invalid(format!(
+ "event tag count exceeds runtime max_event_tags {}",
+ self.max_event_tags
+ )));
+ }
+ if event.unsigned().content().len() > self.max_content_length {
+ return Err(BaseRelayError::invalid(format!(
+ "event content length exceeds runtime max_content_length {}",
+ self.max_content_length
+ )));
+ }
+ Ok(())
+ }
+
+ pub fn validate_subscription_id(
+ &self,
+ subscription_id: &SubscriptionId,
+ ) -> Result<(), BaseRelayError> {
+ let actual = subscription_id.as_str().chars().count();
+ if actual > self.max_subscription_id_length {
+ return Err(BaseRelayError::invalid(format!(
+ "subscription id length exceeds runtime max_subid_length {}",
+ self.max_subscription_id_length
+ )));
+ }
+ Ok(())
+ }
+
+ pub fn validate_filters(&self, filters: &[Filter]) -> Result<(), BaseRelayError> {
+ if filters.is_empty() {
+ return Err(BaseRelayError::invalid(
+ "request must include at least one filter",
+ ));
+ }
+ if filters.len() > self.max_filters_per_request {
+ return Err(BaseRelayError::invalid(format!(
+ "filter count exceeds runtime max_filters_per_request {}",
+ self.max_filters_per_request
+ )));
+ }
+ for filter in filters {
+ let tag_values = filter.tag_filters().values().map(Vec::len).sum::<usize>();
+ if tag_values > self.max_tag_values_per_filter {
+ return Err(BaseRelayError::invalid(format!(
+ "filter tag value count exceeds runtime max_tag_values_per_filter {}",
+ self.max_tag_values_per_filter
+ )));
+ }
+ if filter.limit().is_some_and(|limit| limit > self.max_limit) {
+ return Err(BaseRelayError::invalid(format!(
+ "filter limit exceeds runtime max_limit {}",
+ self.max_limit
+ )));
+ }
+ }
+ Ok(())
+ }
+
+ fn effective_filter_limit(self, filter: &Filter) -> usize {
+ usize::try_from(filter.limit().unwrap_or(self.default_limit)).unwrap_or(usize::MAX)
+ }
+}
+
impl BaseRelay {
pub fn open(
config: &PocketStoreConfig,
- max_pending_events: usize,
+ limits: BaseRelayLimits,
) -> Result<Self, BaseRelayError> {
let store = PocketStoreHandle::open(config).map_err(BaseRelayError::from)?;
- Self::new(store, max_pending_events)
+ Self::new(store, limits)
}
pub fn open_with_groups(
config: &PocketStoreConfig,
- max_pending_events: usize,
+ limits: BaseRelayLimits,
groups: &GroupRuntimeConfig,
) -> Result<Self, BaseRelayError> {
let store = PocketStoreHandle::open(config).map_err(BaseRelayError::from)?;
- Self::new_with_groups(store, max_pending_events, groups)
+ Self::new_with_groups(store, limits, groups)
}
- pub fn new(
- store: PocketStoreHandle,
- max_pending_events: usize,
- ) -> Result<Self, BaseRelayError> {
- Self::new_with_groups(store, max_pending_events, &GroupRuntimeConfig::disabled())
+ pub fn new(store: PocketStoreHandle, limits: BaseRelayLimits) -> Result<Self, BaseRelayError> {
+ Self::new_with_groups(store, limits, &GroupRuntimeConfig::disabled())
}
pub fn new_with_groups(
store: PocketStoreHandle,
- max_pending_events: usize,
+ limits: BaseRelayLimits,
groups: &GroupRuntimeConfig,
) -> Result<Self, BaseRelayError> {
let groups = GroupService::from_config(&store, groups)?;
- let subscriptions = LiveSubscriptionSet::new(max_pending_events)?;
+ let subscriptions =
+ LiveSubscriptionSet::new(limits.max_pending_events(), limits.max_subscriptions())?;
let readiness = BaseRelayReadinessState::ready();
Ok(Self {
store,
subscriptions,
groups,
readiness,
+ limits,
})
}
@@ -180,6 +381,7 @@ impl BaseRelay {
filters: &[Filter],
auth: &BaseAuthState,
) -> Result<Vec<Event>, BaseRelayError> {
+ self.limits.validate_filters(filters)?;
self.query_events(
filters,
&GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
@@ -192,6 +394,13 @@ impl BaseRelay {
auth: &mut BaseAuthState,
now: UnixTimestamp,
) -> Vec<RelayMessage> {
+ if let Err(error) = self.limits.validate_event(&event) {
+ return vec![RelayMessage::Ok {
+ event_id: event.id().clone(),
+ accepted: false,
+ message: error.prefixed_message(),
+ }];
+ }
auth.authenticate(&event, now)
.map(|_| {
vec![RelayMessage::Ok {
@@ -258,6 +467,12 @@ impl BaseRelay {
auth: &GroupAuthContext,
) -> Result<BaseRelayEventWrite, BaseRelayError> {
let event_id = event.id().clone();
+ if let Err(error) = self.limits.validate_event(&event) {
+ return Ok(BaseRelayEventWrite::unstored(ok_rejected(
+ event_id,
+ error.prefixed_message(),
+ )));
+ }
if let Err(error) = verify_event_signature(&event) {
return Ok(BaseRelayEventWrite::unstored(ok_rejected(
event_id,
@@ -359,6 +574,8 @@ impl BaseRelay {
filters: Vec<Filter>,
auth: &GroupAuthContext,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.limits.validate_subscription_id(&subscription_id)?;
+ self.limits.validate_filters(&filters)?;
self.subscriptions
.subscribe(subscription_id.clone(), filters.clone(), auth.clone())?;
self.query_req_with_group_auth(subscription_id, filters, auth)
@@ -370,6 +587,8 @@ impl BaseRelay {
filters: Vec<Filter>,
auth: &GroupAuthContext,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.limits.validate_subscription_id(&subscription_id)?;
+ self.limits.validate_filters(&filters)?;
let mut messages = self
.query_events(&filters, auth)?
.into_iter()
@@ -413,6 +632,8 @@ impl BaseRelay {
filters: Vec<Filter>,
auth: &GroupAuthContext,
) -> Result<RelayMessage, BaseRelayError> {
+ self.limits.validate_subscription_id(&subscription_id)?;
+ self.limits.validate_filters(&filters)?;
Ok(RelayMessage::Count {
subscription_id,
count: self.count_events(&filters, auth)?,
@@ -447,9 +668,7 @@ impl BaseRelay {
for filter in filters {
let mut events =
Self::sort_and_dedupe_query_events(self.query_filter_events(filter, auth)?);
- if let Some(limit) = filter.limit() {
- events.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
- }
+ events.truncate(self.limits.effective_filter_limit(filter));
output.extend(events);
}
Ok(Self::sort_and_dedupe_query_events(output))
@@ -558,7 +777,7 @@ impl BaseRelay {
#[cfg(test)]
mod tests {
- use super::BaseRelay;
+ use super::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits};
use crate::pocket_conversion::tangle_event_to_pocket;
use crate::relay::auth::BaseAuthState;
use crate::relay::live::CloseResult;
@@ -703,6 +922,117 @@ mod tests {
}
#[test]
+ fn base_relay_enforces_runtime_limits() {
+ let config = test_store_config("base-relay-runtime-limits");
+ let mut relay = BaseRelay::open(
+ &config,
+ BaseRelayLimits::new(BaseRelayLimitSettings {
+ max_pending_events: 2,
+ max_subscription_id_length: 3,
+ max_subscriptions: 1,
+ max_filters_per_request: 1,
+ max_tag_values_per_filter: 1,
+ max_event_tags: 1,
+ max_content_length: 4,
+ max_limit: 2,
+ default_limit: 1,
+ })
+ .expect("limits"),
+ )
+ .expect("relay");
+ let first = signed_event_at(7, 1, Vec::new(), "one", 1_714_124_430);
+ let second = signed_event_at(8, 1, Vec::new(), "two", 1_714_124_431);
+
+ assert_accepted(relay.handle_event(first.clone()).expect("first"), &first);
+ assert_accepted(relay.handle_event(second.clone()).expect("second"), &second);
+
+ let limited = relay
+ .handle_req(
+ SubscriptionId::new("lim").expect("sub"),
+ vec![Filter::empty()],
+ )
+ .expect("limited");
+ assert_eq!(
+ limited
+ .iter()
+ .filter(|message| matches!(message, RelayMessage::Event { .. }))
+ .count(),
+ 1
+ );
+ assert_eq!(
+ relay.handle_close(&SubscriptionId::new("lim").expect("sub")),
+ CloseResult::Closed
+ );
+
+ assert!(
+ relay
+ .handle_req(
+ SubscriptionId::new("long").expect("sub"),
+ vec![Filter::empty()]
+ )
+ .expect_err("subscription id length")
+ .prefixed_message()
+ .contains("max_subid_length 3")
+ );
+ assert!(
+ relay
+ .handle_count(
+ SubscriptionId::new("cnt").expect("sub"),
+ vec![Filter::empty(), Filter::empty()]
+ )
+ .expect_err("filter count")
+ .prefixed_message()
+ .contains("max_filters_per_request 1")
+ );
+ assert!(
+ relay
+ .handle_count(
+ SubscriptionId::new("tag").expect("sub"),
+ vec![
+ filter_from_value(&serde_json::json!({"#t":["one", "two"]}))
+ .expect("filter")
+ ]
+ )
+ .expect_err("tag values")
+ .prefixed_message()
+ .contains("max_tag_values_per_filter 1")
+ );
+ assert!(
+ relay
+ .handle_count(
+ SubscriptionId::new("max").expect("sub"),
+ vec![filter_from_value(&serde_json::json!({"limit":3})).expect("filter")]
+ )
+ .expect_err("max limit")
+ .prefixed_message()
+ .contains("max_limit 2")
+ );
+
+ let too_many_tags = signed_event_at(
+ 9,
+ 1,
+ vec![
+ Tag::from_parts("t", &["one"]).expect("tag"),
+ Tag::from_parts("p", &["two"]).expect("tag"),
+ ],
+ "ok",
+ 1_714_124_432,
+ );
+ assert!(matches!(
+ relay.handle_event(too_many_tags).expect("tags"),
+ RelayMessage::Ok { accepted: false, message, .. }
+ if message.contains("max_event_tags 1")
+ ));
+
+ let too_much_content = signed_event_at(10, 1, Vec::new(), "12345", 1_714_124_433);
+ assert!(matches!(
+ relay.handle_event(too_much_content).expect("content"),
+ RelayMessage::Ok { accepted: false, message, .. }
+ if message.contains("max_content_length 4")
+ ));
+ }
+
+ #[test]
fn base_relay_count_dedupes_overlapping_visible_filters() {
let mut relay = test_relay("base-relay-count-dedupe", 8);
let market_tag = Tag::from_parts("t", &["market"]).expect("tag");
@@ -1589,7 +1919,7 @@ mod tests {
#[test]
fn base_relay_shutdown_closes_live_subscriptions_and_syncs_store() {
let config = test_store_config("base-relay-shutdown");
- let mut relay = BaseRelay::open(&config, 4).expect("relay");
+ let mut relay = BaseRelay::open(&config, relay_limits(4)).expect("relay");
let event = signed_public_event(7, 1, Vec::new(), "shutdown");
let subscription_id = SubscriptionId::new("sub-shutdown").expect("sub");
@@ -1606,7 +1936,7 @@ mod tests {
assert_eq!(relay.active_subscription_count(), 0);
assert!(relay.fanout(&event).is_empty());
- let reopened = BaseRelay::open(&config, 4).expect("reopened");
+ let reopened = BaseRelay::open(&config, relay_limits(4)).expect("reopened");
assert_eq!(count_kind(&reopened, 1), 1);
}
@@ -1652,9 +1982,126 @@ mod tests {
);
}
+ #[test]
+ fn base_relay_enforces_event_and_filter_runtime_limits() {
+ let config = test_store_config("base-relay-runtime-limits");
+ let mut relay = BaseRelay::open(&config, strict_relay_limits()).expect("relay");
+ let first = signed_public_event(7, 1, Vec::new(), "a");
+ let second = signed_event_at(8, 1, Vec::new(), "b", 1_714_124_434);
+
+ assert_accepted(relay.handle_event(first.clone()).expect("first"), &first);
+ assert_accepted(relay.handle_event(second.clone()).expect("second"), &second);
+ assert_eq!(
+ rejected_message(
+ relay
+ .handle_event(signed_public_event(7, 1, Vec::new(), "abcde"))
+ .expect("content")
+ ),
+ "invalid: event content length exceeds runtime max_content_length 4"
+ );
+ assert_eq!(
+ rejected_message(
+ relay
+ .handle_event(signed_public_event(
+ 7,
+ 1,
+ vec![
+ Tag::from_parts("t", &["one"]).expect("tag"),
+ Tag::from_parts("r", &["two"]).expect("tag"),
+ ],
+ "",
+ ))
+ .expect("tags")
+ ),
+ "invalid: event tag count exceeds runtime max_event_tags 1"
+ );
+ assert_eq!(
+ relay
+ .handle_req(
+ SubscriptionId::new("a").expect("sub"),
+ vec![Filter::empty()]
+ )
+ .expect("default limit")
+ .len(),
+ 2
+ );
+ assert!(
+ relay
+ .handle_req(
+ SubscriptionId::new("a").expect("sub"),
+ vec![Filter::empty(), Filter::empty()],
+ )
+ .expect_err("filter count")
+ .prefixed_message()
+ .contains("max_filters_per_request 1")
+ );
+ assert!(
+ relay
+ .handle_count(
+ SubscriptionId::new("a").expect("sub"),
+ vec![
+ filter_from_value(&serde_json::json!({"#t":["one", "two"]}))
+ .expect("filter"),
+ ],
+ )
+ .expect_err("tag values")
+ .prefixed_message()
+ .contains("max_tag_values_per_filter 1")
+ );
+ assert!(
+ relay
+ .handle_req(
+ SubscriptionId::new("a").expect("sub"),
+ vec![filter_from_value(&serde_json::json!({"limit": 3})).expect("filter")],
+ )
+ .expect_err("max limit")
+ .prefixed_message()
+ .contains("max_limit 2")
+ );
+ }
+
+ #[test]
+ fn base_relay_enforces_subscription_id_and_count_limits() {
+ let config = test_store_config("base-relay-subscription-limits");
+ let mut relay = BaseRelay::open(&config, strict_relay_limits()).expect("relay");
+
+ assert!(
+ relay
+ .handle_req(
+ SubscriptionId::new("abcde").expect("sub"),
+ vec![Filter::empty()],
+ )
+ .expect_err("sub id length")
+ .prefixed_message()
+ .contains("max_subid_length 4")
+ );
+ relay
+ .handle_req(
+ SubscriptionId::new("a").expect("sub"),
+ vec![Filter::empty()],
+ )
+ .expect("first subscription");
+ assert!(
+ relay
+ .handle_req(
+ SubscriptionId::new("b").expect("sub"),
+ vec![Filter::empty()]
+ )
+ .expect_err("subscription count")
+ .prefixed_message()
+ .contains("connection subscription limit exceeded")
+ );
+ relay
+ .handle_req(
+ SubscriptionId::new("a").expect("sub"),
+ vec![Filter::empty()],
+ )
+ .expect("replace subscription");
+ }
+
fn test_relay(name: &str, max_pending_events: usize) -> BaseRelay {
let config = test_store_config(name);
- BaseRelay::open(&config, max_pending_events).expect("relay")
+ BaseRelay::open(&config, relay_limits(max_pending_events)).expect("relay")
}
fn test_relay_with_groups(
@@ -1663,7 +2110,38 @@ mod tests {
groups: &tangle_groups::GroupRuntimeConfig,
) -> BaseRelay {
let config = test_store_config(name);
- BaseRelay::open_with_groups(&config, max_pending_events, groups).expect("relay")
+ BaseRelay::open_with_groups(&config, relay_limits(max_pending_events), groups)
+ .expect("relay")
+ }
+
+ fn relay_limits(max_pending_events: usize) -> BaseRelayLimits {
+ BaseRelayLimits::new(BaseRelayLimitSettings {
+ max_pending_events,
+ max_subscription_id_length: 64,
+ max_subscriptions: 64,
+ max_filters_per_request: 10,
+ max_tag_values_per_filter: 100,
+ max_event_tags: 200,
+ max_content_length: 65_536,
+ max_limit: 500,
+ default_limit: 100,
+ })
+ .expect("limits")
+ }
+
+ fn strict_relay_limits() -> BaseRelayLimits {
+ BaseRelayLimits::new(BaseRelayLimitSettings {
+ max_pending_events: 4,
+ max_subscription_id_length: 4,
+ max_subscriptions: 1,
+ max_filters_per_request: 1,
+ max_tag_values_per_filter: 1,
+ max_event_tags: 1,
+ max_content_length: 4,
+ max_limit: 2,
+ default_limit: 1,
+ })
+ .expect("limits")
}
fn test_store_config(name: &str) -> PocketStoreConfig {
diff --git a/crates/tangle_runtime/src/relay/live.rs b/crates/tangle_runtime/src/relay/live.rs
@@ -10,6 +10,7 @@ pub(crate) struct LiveSubscriptionSet {
subscriptions: BTreeMap<SubscriptionId, LiveSubscription>,
pending: BTreeMap<SubscriptionId, usize>,
max_pending_events: usize,
+ max_subscriptions: usize,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -19,16 +20,25 @@ struct LiveSubscription {
}
impl LiveSubscriptionSet {
- pub(crate) fn new(max_pending_events: usize) -> Result<Self, BaseRelayError> {
+ pub(crate) fn new(
+ max_pending_events: usize,
+ max_subscriptions: usize,
+ ) -> Result<Self, BaseRelayError> {
if max_pending_events == 0 {
return Err(BaseRelayError::invalid(
"live subscription pending event limit must be greater than zero",
));
}
+ if max_subscriptions == 0 {
+ return Err(BaseRelayError::invalid(
+ "live subscription count limit must be greater than zero",
+ ));
+ }
Ok(Self {
subscriptions: BTreeMap::new(),
pending: BTreeMap::new(),
max_pending_events,
+ max_subscriptions,
})
}
@@ -43,6 +53,13 @@ impl LiveSubscriptionSet {
"subscription must include at least one filter",
));
}
+ if !self.subscriptions.contains_key(&subscription_id)
+ && self.subscriptions.len() >= self.max_subscriptions
+ {
+ return Err(BaseRelayError::invalid(
+ "connection subscription limit exceeded",
+ ));
+ }
self.subscriptions
.insert(subscription_id.clone(), LiveSubscription { filters, auth });
self.pending.insert(subscription_id, 0);
@@ -134,7 +151,7 @@ mod tests {
#[test]
fn live_subscription_fanout_closes_lagged_subscriptions() {
- let mut subscriptions = LiveSubscriptionSet::new(1).expect("subscriptions");
+ let mut subscriptions = LiveSubscriptionSet::new(1, 1).expect("subscriptions");
let subscription_id = SubscriptionId::new("live").expect("subscription");
subscriptions
.subscribe(
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -7,7 +7,7 @@ use crate::{
ops::BaseRelayReadinessState,
relay::{
auth::BaseAuthState,
- core::{BaseRelay, BaseRelayShutdownReport},
+ core::{BaseRelay, BaseRelayLimits, BaseRelayShutdownReport},
live::LiveSubscriptionSet,
},
};
@@ -125,6 +125,18 @@ impl TangleRuntimeHandle {
}
Ok(vec![result.into_message()])
}
+ ClientMessage::Auth(event) => {
+ if let Err(error) = runtime.limits().base_relay_limits().validate_event(&event) {
+ return Ok(vec![RelayMessage::Ok {
+ event_id: event.id().clone(),
+ accepted: false,
+ message: error.prefixed_message(),
+ }]);
+ }
+ runtime
+ .relay_mut()
+ .handle_client_message(ClientMessage::Auth(event), auth, now)
+ }
message => runtime
.relay_mut()
.handle_client_message(message, auth, now),
@@ -185,20 +197,22 @@ impl fmt::Debug for TangleRuntimeHandle {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TangleRuntimeLimits {
- max_pending_events: usize,
+ max_message_length: usize,
+ base_relay_limits: BaseRelayLimits,
event_bus_capacity: usize,
outbound_queue_capacity: usize,
}
impl TangleRuntimeLimits {
pub fn new(
- max_pending_events: usize,
+ max_message_length: usize,
+ base_relay_limits: BaseRelayLimits,
event_bus_capacity: usize,
outbound_queue_capacity: usize,
) -> Result<Self, BaseRelayError> {
- if max_pending_events == 0 {
+ if max_message_length == 0 {
return Err(BaseRelayError::invalid(
- "runtime max pending events must be greater than zero",
+ "runtime max message length must be greater than zero",
));
}
if event_bus_capacity == 0 {
@@ -212,22 +226,33 @@ impl TangleRuntimeLimits {
));
}
Ok(Self {
- max_pending_events,
+ max_message_length,
+ base_relay_limits,
event_bus_capacity,
outbound_queue_capacity,
})
}
pub fn from_config(config: &BaseRelayRuntimeConfig) -> Result<Self, BaseRelayError> {
+ let limits = config.limits();
Self::new(
- config.max_pending_events(),
- config.max_pending_events(),
- config.max_pending_events(),
+ limits.max_message_length(),
+ limits.base_relay_limits()?,
+ limits.broadcast_channel_capacity(),
+ limits.per_connection_outbound_queue(),
)
}
+ pub fn max_message_length(self) -> usize {
+ self.max_message_length
+ }
+
+ pub fn base_relay_limits(self) -> BaseRelayLimits {
+ self.base_relay_limits
+ }
+
pub fn max_pending_events(self) -> usize {
- self.max_pending_events
+ self.base_relay_limits.max_pending_events()
}
pub fn event_bus_capacity(self) -> usize {
@@ -331,6 +356,7 @@ mod tests {
use super::{TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits};
use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json};
use crate::event_bus::{TangleEventBus, TangleEventReceiveError};
+ use crate::relay::core::{BaseRelayLimitSettings, BaseRelayLimits};
use crate::relay::live::LiveSubscriptionSet;
use serde_json::json;
use std::path::{Path, PathBuf};
@@ -355,9 +381,10 @@ mod tests {
assert_eq!(runtime.config().relay_url(), "wss://relay.radroots.test");
assert_eq!(runtime.config().listen_addr().to_string(), "127.0.0.1:0");
assert_eq!(runtime.limits().max_pending_events(), 8);
- assert_eq!(runtime.limits().event_bus_capacity(), 8);
+ assert_eq!(runtime.limits().max_message_length(), 1_048_576);
+ assert_eq!(runtime.limits().event_bus_capacity(), 16);
assert_eq!(runtime.limits().outbound_queue_capacity(), 8);
- assert_eq!(runtime.event_bus().capacity(), 8);
+ assert_eq!(runtime.event_bus().capacity(), 16);
assert_eq!(runtime.event_bus().receiver_count(), 1);
assert_eq!(runtime.metrics().active_sessions(), 0);
assert_eq!(runtime.metrics().stored_event_offsets(), 0);
@@ -398,9 +425,9 @@ mod tests {
#[test]
fn runtime_limits_and_event_bus_reject_zero_capacity() {
- assert!(TangleRuntimeLimits::new(0, 1, 1).is_err());
- assert!(TangleRuntimeLimits::new(1, 0, 1).is_err());
- assert!(TangleRuntimeLimits::new(1, 1, 0).is_err());
+ assert!(TangleRuntimeLimits::new(0, runtime_relay_limits(1), 1, 1).is_err());
+ assert!(TangleRuntimeLimits::new(1, runtime_relay_limits(1), 0, 1).is_err());
+ assert!(TangleRuntimeLimits::new(1, runtime_relay_limits(1), 1, 0).is_err());
assert!(TangleEventBus::new(0).is_err());
}
@@ -413,7 +440,7 @@ mod tests {
);
let mut offsets = handle.subscribe_events().await;
let mut auth = handle.auth_state().await.expect("auth");
- let mut subscriptions = LiveSubscriptionSet::new(8).expect("subscriptions");
+ let mut subscriptions = LiveSubscriptionSet::new(8, 64).expect("subscriptions");
let subscription_id = SubscriptionId::new("live-offset").expect("subscription");
subscriptions
.subscribe(
@@ -491,7 +518,7 @@ mod tests {
tangle_v2_auth_event(FixtureKey::Owner, "challenge-a", 120).expect("auth event");
let create = tangle_v2_group_create_event(FixtureKey::Owner, "RuntimeFarm", 121, &[])
.expect("create");
- let mut subscriptions = LiveSubscriptionSet::new(8).expect("subscriptions");
+ let mut subscriptions = LiveSubscriptionSet::new(8, 64).expect("subscriptions");
let subscription_id = SubscriptionId::new("generated-offsets").expect("subscription");
subscriptions
.subscribe(
@@ -564,7 +591,7 @@ mod tests {
let _ = std::fs::remove_dir_all(root);
}
- fn runtime_config(root: &Path, max_pending_events: usize) -> BaseRelayRuntimeConfig {
+ fn runtime_config(root: &Path, per_connection_outbound_queue: usize) -> BaseRelayRuntimeConfig {
let raw = json!({
"server": {
"listen_addr": "127.0.0.1:0",
@@ -587,13 +614,38 @@ mod tests {
"created_at_skew_seconds": 600
},
"limits": {
- "max_pending_events": max_pending_events
+ "max_message_length": 1048576,
+ "max_subid_length": 64,
+ "max_subscriptions_per_connection": 64,
+ "max_filters_per_request": 10,
+ "max_tag_values_per_filter": 100,
+ "max_limit": 500,
+ "default_limit": 100,
+ "max_event_tags": 200,
+ "max_content_length": 65536,
+ "broadcast_channel_capacity": 16,
+ "per_connection_outbound_queue": per_connection_outbound_queue
}
})
.to_string();
parse_base_relay_runtime_config_json(&raw).expect("config")
}
+ fn runtime_relay_limits(max_pending_events: usize) -> BaseRelayLimits {
+ BaseRelayLimits::new(BaseRelayLimitSettings {
+ max_pending_events,
+ max_subscription_id_length: 64,
+ max_subscriptions: 64,
+ max_filters_per_request: 10,
+ max_tag_values_per_filter: 100,
+ max_event_tags: 200,
+ max_content_length: 65_536,
+ max_limit: 500,
+ default_limit: 100,
+ })
+ .expect("limits")
+ }
+
fn temp_root(name: &str) -> PathBuf {
std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id()))
}
diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs
@@ -4,7 +4,7 @@ use crate::{
errors::BaseRelayError,
nip11::{BaseRelayInfoConfig, BaseRelayInfoDocument, base_relay_info_response},
ops::{BaseRelayReadinessState, base_relay_ops_router},
- runtime::{TangleRuntime, TangleRuntimeHandle, TangleShutdownSignal},
+ runtime::{TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits, TangleShutdownSignal},
session::TangleWebSocketSession,
};
use axum::{
@@ -62,13 +62,13 @@ pub async fn serve_listener_until_shutdown(
let info =
BaseRelayInfoConfig::new("tangle", runtime.config().groups().clone())?.build_document()?;
let readiness = runtime.readiness_state().clone();
- let outbound_queue_capacity = runtime.limits().outbound_queue_capacity();
+ let limits = runtime.limits();
let shutdown_signal = runtime.shutdown_signal().clone();
let runtime = TangleRuntimeHandle::new(runtime);
let router = tangle_http_router(
readiness,
info,
- outbound_queue_capacity,
+ limits,
shutdown_signal.clone(),
runtime.clone(),
);
@@ -96,7 +96,7 @@ pub async fn serve_listener_until_shutdown(
pub fn tangle_http_router(
readiness: BaseRelayReadinessState,
info: BaseRelayInfoDocument,
- outbound_queue_capacity: usize,
+ limits: TangleRuntimeLimits,
shutdown: TangleShutdownSignal,
runtime: TangleRuntimeHandle,
) -> Router {
@@ -104,7 +104,7 @@ pub fn tangle_http_router(
.route("/", get(tangle_root))
.with_state(TangleHttpState {
info,
- outbound_queue_capacity,
+ limits,
shutdown,
runtime,
})
@@ -114,7 +114,7 @@ pub fn tangle_http_router(
#[derive(Debug, Clone)]
struct TangleHttpState {
info: BaseRelayInfoDocument,
- outbound_queue_capacity: usize,
+ limits: TangleRuntimeLimits,
shutdown: TangleShutdownSignal,
runtime: TangleRuntimeHandle,
}
@@ -128,7 +128,7 @@ async fn tangle_root(
Ok(websocket) => {
let session = match state.runtime.auth_state().await {
Ok(auth) => TangleWebSocketSession::new(
- state.outbound_queue_capacity,
+ state.limits,
state.shutdown.subscribe(),
state.runtime.clone(),
auth,
@@ -380,12 +380,14 @@ mod tests {
.expect("info config")
.build_document()
.expect("info");
+ let runtime = TangleRuntime::open(config).expect("runtime");
+ let limits = runtime.limits();
let router = tangle_http_router(
BaseRelayReadinessState::ready(),
info,
- 8,
+ limits,
TangleShutdownSignal::new(),
- TangleRuntimeHandle::new(TangleRuntime::open(config).expect("runtime")),
+ TangleRuntimeHandle::new(runtime),
);
let nip11 = router
.clone()
@@ -478,7 +480,17 @@ mod tests {
"created_at_skew_seconds": 600
},
"limits": {
- "max_pending_events": 8
+ "max_message_length": 1048576,
+ "max_subid_length": 64,
+ "max_subscriptions_per_connection": 64,
+ "max_filters_per_request": 10,
+ "max_tag_values_per_filter": 100,
+ "max_limit": 500,
+ "default_limit": 100,
+ "max_event_tags": 200,
+ "max_content_length": 65536,
+ "broadcast_channel_capacity": 8,
+ "per_connection_outbound_queue": 8
}
})
.to_string();
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -7,7 +7,7 @@ use crate::{
auth::{BaseAuthState, generate_auth_challenge},
live::LiveSubscriptionSet,
},
- runtime::TangleRuntimeHandle,
+ runtime::{TangleRuntimeHandle, TangleRuntimeLimits},
};
use axum::extract::ws::{CloseFrame, Message, Utf8Bytes, WebSocket};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
@@ -24,6 +24,7 @@ pub struct TangleWebSocketSession {
outbound_receiver: mpsc::Receiver<Message>,
shutdown: watch::Receiver<bool>,
runtime: TangleRuntimeHandle,
+ limits: TangleRuntimeLimits,
auth: BaseAuthState,
subscriptions: LiveSubscriptionSet,
events: TangleEventReceiver,
@@ -31,19 +32,18 @@ pub struct TangleWebSocketSession {
impl TangleWebSocketSession {
pub fn new(
- outbound_queue_capacity: usize,
+ limits: TangleRuntimeLimits,
shutdown: watch::Receiver<bool>,
runtime: TangleRuntimeHandle,
auth: BaseAuthState,
events: TangleEventReceiver,
) -> Result<Self, BaseRelayError> {
- if outbound_queue_capacity == 0 {
- return Err(BaseRelayError::invalid(
- "runtime outbound queue capacity must be greater than zero",
- ));
- }
+ let outbound_queue_capacity = limits.outbound_queue_capacity();
let (sender, receiver) = mpsc::channel(outbound_queue_capacity);
- let subscriptions = LiveSubscriptionSet::new(outbound_queue_capacity)?;
+ let subscriptions = LiveSubscriptionSet::new(
+ limits.base_relay_limits().max_pending_events(),
+ limits.base_relay_limits().max_subscriptions(),
+ )?;
Ok(Self {
connected_at: Instant::now(),
outbound: TangleOutboundSender {
@@ -53,6 +53,7 @@ impl TangleWebSocketSession {
outbound_receiver: receiver,
shutdown,
runtime,
+ limits,
auth,
subscriptions,
events,
@@ -186,6 +187,14 @@ impl TangleWebSocketSession {
}
async fn dispatch_text(&mut self, raw: &str) -> bool {
+ if raw.len() > self.limits.max_message_length() {
+ return self
+ .send_relay_message(RelayMessage::Notice(format!(
+ "invalid: client message length exceeds runtime max_message_length {}",
+ self.limits.max_message_length()
+ )))
+ .is_ok();
+ }
let replies = match parse_client_message(raw) {
Ok(message) => match self.handle_client_message(message).await {
Ok(replies) => replies,
@@ -211,6 +220,9 @@ impl TangleWebSocketSession {
filters,
} => self.handle_req(subscription_id, filters).await,
ClientMessage::Close(subscription_id) => {
+ self.limits
+ .base_relay_limits()
+ .validate_subscription_id(&subscription_id)?;
self.subscriptions.close(&subscription_id);
Ok(Vec::new())
}
@@ -227,6 +239,10 @@ impl TangleWebSocketSession {
subscription_id: SubscriptionId,
filters: Vec<Filter>,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.limits
+ .base_relay_limits()
+ .validate_subscription_id(&subscription_id)?;
+ self.limits.base_relay_limits().validate_filters(&filters)?;
self.subscriptions.subscribe(
subscription_id.clone(),
filters.clone(),
@@ -313,8 +329,10 @@ mod tests {
};
use crate::{
config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json},
+ errors::BaseRelayError,
event_bus::TangleEventReceiver,
- runtime::{TangleRuntime, TangleRuntimeHandle, TangleShutdownSignal},
+ relay::core::{BaseRelayLimitSettings, BaseRelayLimits},
+ runtime::{TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits, TangleShutdownSignal},
};
use axum::extract::ws::Message;
use serde_json::json;
@@ -327,28 +345,35 @@ mod tests {
let before = std::time::Instant::now();
let shutdown = TangleShutdownSignal::new();
let (runtime, auth, events) = session_runtime("records-connection-time");
- let session = TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth, events)
- .expect("session");
+ let session = TangleWebSocketSession::new(
+ session_limits(8),
+ shutdown.subscribe(),
+ runtime,
+ auth,
+ events,
+ )
+ .expect("session");
assert!(session.connected_at() >= before);
}
#[test]
- fn websocket_session_rejects_zero_outbound_capacity() {
- let shutdown = TangleShutdownSignal::new();
- let (runtime, auth, events) = session_runtime("zero-outbound-capacity");
-
- assert!(
- TangleWebSocketSession::new(0, shutdown.subscribe(), runtime, auth, events).is_err()
- );
+ fn websocket_session_limit_config_rejects_zero_outbound_capacity() {
+ assert!(session_limits_result(0).is_err());
}
#[test]
fn websocket_session_observes_shutdown_request() {
let shutdown = TangleShutdownSignal::new();
let (runtime, auth, events) = session_runtime("observes-shutdown");
- let session = TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth, events)
- .expect("session");
+ let session = TangleWebSocketSession::new(
+ session_limits(8),
+ shutdown.subscribe(),
+ runtime,
+ auth,
+ events,
+ )
+ .expect("session");
assert!(!session.shutdown_requested());
@@ -358,6 +383,30 @@ mod tests {
}
#[tokio::test]
+ async fn websocket_session_rejects_overlong_text_before_parsing() {
+ let shutdown = TangleShutdownSignal::new();
+ let (runtime, auth, events) = session_runtime("overlong-text");
+ let mut session = TangleWebSocketSession::new(
+ session_limits_with_message_length(8, 8),
+ shutdown.subscribe(),
+ runtime,
+ auth,
+ events,
+ )
+ .expect("session");
+
+ assert!(session.dispatch_text("123456789").await);
+ let message = session.outbound_receiver.try_recv().expect("notice");
+ let Message::Text(text) = message else {
+ panic!("expected text notice")
+ };
+ assert_eq!(
+ text.as_str(),
+ "[\"NOTICE\",\"invalid: client message length exceeds runtime max_message_length 8\"]"
+ );
+ }
+
+ #[tokio::test]
async fn websocket_session_scopes_subscriptions_per_connection() {
let shutdown = TangleShutdownSignal::new();
let root = temp_root("connection-scope");
@@ -368,12 +417,22 @@ mod tests {
let auth_b = runtime.auth_state().await.expect("auth b");
let events_a = runtime.subscribe_events().await;
let events_b = runtime.subscribe_events().await;
- let mut first =
- TangleWebSocketSession::new(8, shutdown.subscribe(), runtime.clone(), auth_a, events_a)
- .expect("first");
- let mut second =
- TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth_b, events_b)
- .expect("second");
+ let mut first = TangleWebSocketSession::new(
+ session_limits(8),
+ shutdown.subscribe(),
+ runtime.clone(),
+ auth_a,
+ events_a,
+ )
+ .expect("first");
+ let mut second = TangleWebSocketSession::new(
+ session_limits(8),
+ shutdown.subscribe(),
+ runtime,
+ auth_b,
+ events_b,
+ )
+ .expect("second");
let subscription_id = SubscriptionId::new("shared").expect("subscription");
assert_eq!(
@@ -431,13 +490,13 @@ mod tests {
let root = temp_root("event-receiver-lag");
let _ = std::fs::remove_dir_all(&root);
let runtime =
- TangleRuntime::open(runtime_config_with_max_pending_events(&root, 1)).expect("runtime");
+ TangleRuntime::open(runtime_config_with_outbound_queue(&root, 1)).expect("runtime");
let auth = runtime.auth_state().expect("auth");
let events = runtime.event_bus().subscribe();
assert_eq!(runtime.event_bus().publish(StoreOffset::new(1)), 1);
assert_eq!(runtime.event_bus().publish(StoreOffset::new(2)), 1);
let mut session = TangleWebSocketSession::new(
- 1,
+ session_limits(1),
shutdown.subscribe(),
TangleRuntimeHandle::new(runtime),
auth,
@@ -458,8 +517,14 @@ mod tests {
fn outbound_queue_is_bounded() {
let shutdown = TangleShutdownSignal::new();
let (runtime, auth, events) = session_runtime("outbound-queue");
- let session = TangleWebSocketSession::new(1, shutdown.subscribe(), runtime, auth, events)
- .expect("session");
+ let session = TangleWebSocketSession::new(
+ session_limits(1),
+ shutdown.subscribe(),
+ runtime,
+ auth,
+ events,
+ )
+ .expect("session");
let outbound = session.outbound();
assert_eq!(outbound.capacity(), 1);
@@ -497,12 +562,12 @@ mod tests {
}
fn runtime_config(root: &Path) -> BaseRelayRuntimeConfig {
- runtime_config_with_max_pending_events(root, 8)
+ runtime_config_with_outbound_queue(root, 8)
}
- fn runtime_config_with_max_pending_events(
+ fn runtime_config_with_outbound_queue(
root: &Path,
- max_pending_events: usize,
+ per_connection_outbound_queue: usize,
) -> BaseRelayRuntimeConfig {
let raw = json!({
"server": {
@@ -523,13 +588,72 @@ mod tests {
"created_at_skew_seconds": 600
},
"limits": {
- "max_pending_events": max_pending_events
+ "max_message_length": 1048576,
+ "max_subid_length": 64,
+ "max_subscriptions_per_connection": 64,
+ "max_filters_per_request": 10,
+ "max_tag_values_per_filter": 100,
+ "max_limit": 500,
+ "default_limit": 100,
+ "max_event_tags": 200,
+ "max_content_length": 65536,
+ "broadcast_channel_capacity": per_connection_outbound_queue,
+ "per_connection_outbound_queue": per_connection_outbound_queue
}
})
.to_string();
parse_base_relay_runtime_config_json(&raw).expect("config")
}
+ fn session_limits(per_connection_outbound_queue: usize) -> TangleRuntimeLimits {
+ session_limits_result(per_connection_outbound_queue).expect("limits")
+ }
+
+ fn session_limits_with_message_length(
+ max_message_length: usize,
+ per_connection_outbound_queue: usize,
+ ) -> TangleRuntimeLimits {
+ TangleRuntimeLimits::new(
+ max_message_length,
+ BaseRelayLimits::new(BaseRelayLimitSettings {
+ max_pending_events: per_connection_outbound_queue,
+ max_subscription_id_length: 64,
+ max_subscriptions: 64,
+ max_filters_per_request: 10,
+ max_tag_values_per_filter: 100,
+ max_event_tags: 200,
+ max_content_length: 65_536,
+ max_limit: 500,
+ default_limit: 100,
+ })
+ .expect("relay limits"),
+ 16,
+ per_connection_outbound_queue,
+ )
+ .expect("limits")
+ }
+
+ fn session_limits_result(
+ per_connection_outbound_queue: usize,
+ ) -> Result<TangleRuntimeLimits, BaseRelayError> {
+ TangleRuntimeLimits::new(
+ 1_048_576,
+ BaseRelayLimits::new(BaseRelayLimitSettings {
+ max_pending_events: per_connection_outbound_queue,
+ max_subscription_id_length: 64,
+ max_subscriptions: 64,
+ max_filters_per_request: 10,
+ max_tag_values_per_filter: 100,
+ max_event_tags: 200,
+ max_content_length: 65_536,
+ max_limit: 500,
+ default_limit: 100,
+ })?,
+ 16,
+ per_connection_outbound_queue,
+ )
+ }
+
fn temp_root(name: &str) -> PathBuf {
std::env::temp_dir().join(format!("tangle-session-{name}-{}", std::process::id()))
}
diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs
@@ -17,7 +17,11 @@ use tangle_protocol::{
use tangle_runtime::{
groups::{GroupCheckpointStatus, validate_group_extra_tables},
nip11::{BASE_RELAY_SUPPORTED_NIPS, BaseRelayInfoConfig},
- relay::{auth::BaseAuthState, core::BaseRelay, live::CloseResult},
+ relay::{
+ auth::BaseAuthState,
+ core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits},
+ live::CloseResult,
+ },
};
use tangle_store_pocket::{
PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy, TANGLE_GROUP_CHECKPOINT_TABLE,
@@ -35,7 +39,7 @@ use tangle_test_support::{
#[test]
fn public_relay_smoke_stores_queries_counts_and_fans_out() {
let config = test_store_config("public-smoke");
- let mut relay = BaseRelay::open(&config, 4).expect("relay");
+ let mut relay = BaseRelay::open(&config, relay_limits(4)).expect("relay");
let first =
tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "hello").expect("first");
let query_id = subscription("public-query");
@@ -157,7 +161,7 @@ fn auth_integration_covers_challenge_edges() {
fn group_auth_lifecycle_membership_and_flag_flows_pass_in_process() {
let config = test_store_config("group-flows");
let groups = group_config_with_public_join();
- let mut relay = BaseRelay::open_with_groups(&config, 8, &groups).expect("relay");
+ let mut relay = BaseRelay::open_with_groups(&config, relay_limits(8), &groups).expect("relay");
let owner_auth = authenticated(FixtureKey::Owner);
let admin_auth = authenticated(FixtureKey::Admin);
let member_auth = authenticated(FixtureKey::Member);
@@ -281,7 +285,8 @@ fn group_auth_lifecycle_membership_and_flag_flows_pass_in_process() {
#[test]
fn relay_override_role_changes_generate_admin_snapshots() {
let config = test_store_config("role-admin-snapshots");
- let mut relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("relay");
+ let mut relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("relay");
let owner_auth = authenticated(FixtureKey::Owner);
let admin_auth = authenticated(FixtureKey::Admin);
let member = FixtureKey::Member.public_key().as_str().to_owned();
@@ -353,7 +358,8 @@ fn relay_override_role_changes_generate_admin_snapshots() {
#[test]
fn group_join_requests_are_denied_by_default() {
let config = test_store_config("group-public-join-default");
- let mut relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("relay");
+ let mut relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("relay");
let owner_auth = authenticated(FixtureKey::Owner);
let outsider_auth = authenticated(FixtureKey::Outsider);
let create = tangle_v2_group_create_event(FixtureKey::Owner, "Farm", 1, &[]).expect("create");
@@ -386,7 +392,8 @@ fn group_join_requests_are_denied_by_default() {
#[test]
fn metadata_flags_and_read_privacy_cover_req_count_and_fanout() {
let config = test_store_config("privacy-flags");
- let mut relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("relay");
+ let mut relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("relay");
let owner_auth = authenticated(FixtureKey::Owner);
let outsider_auth = authenticated(FixtureKey::Outsider);
@@ -550,7 +557,8 @@ fn metadata_flags_and_read_privacy_cover_req_count_and_fanout() {
#[test]
fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() {
let config = test_store_config("nip29-leak-suite");
- let mut relay = BaseRelay::open_with_groups(&config, 16, &group_config()).expect("relay");
+ let mut relay =
+ BaseRelay::open_with_groups(&config, relay_limits(16), &group_config()).expect("relay");
let owner_auth = authenticated(FixtureKey::Owner);
let admin_auth = authenticated(FixtureKey::Admin);
let member_auth = authenticated(FixtureKey::Member);
@@ -929,7 +937,8 @@ fn nip29_privacy_leak_suite_covers_relay_exposure_and_rejection_paths() {
#[test]
fn delete_and_secondary_privacy_surfaces_are_read_gated_or_absent() {
let config = test_store_config("delete-privacy");
- let mut relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("relay");
+ let mut relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("relay");
let owner_auth = authenticated(FixtureKey::Owner);
accept_group_create(&mut relay, "DeleteFarm", &[], 1, &owner_auth);
@@ -1011,7 +1020,8 @@ fn projection_rebuild_after_restart_matches_live_state_and_outbox_is_idempotent(
let config = test_store_config("projection-restart");
let owner_auth = authenticated(FixtureKey::Owner);
{
- let mut relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("relay");
+ let mut relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("relay");
accept_group_create(&mut relay, "RestartFarm", &[], 1, &owner_auth);
let put = tangle_v2_put_user_event(FixtureKey::Admin, "RestartFarm", FixtureKey::Member, 2)
.expect("put");
@@ -1029,7 +1039,8 @@ fn projection_rebuild_after_restart_matches_live_state_and_outbox_is_idempotent(
}
delete_group_extra_records(&config);
- let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("reopen");
+ let relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("reopen");
assert_eq!(
relay
.readiness_state()
@@ -1072,7 +1083,8 @@ fn projection_rebuild_after_restart_matches_live_state_and_outbox_is_idempotent(
&GroupCheckpointStatus::Current { .. }
));
- let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("second reopen");
+ let relay = BaseRelay::open_with_groups(&config, relay_limits(8), &group_config())
+ .expect("second reopen");
assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1);
assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1);
assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1);
@@ -1088,7 +1100,8 @@ fn projection_applies_canonical_events_after_checkpoint_on_restart() {
let put = tangle_v2_put_user_event(FixtureKey::Admin, "IncrementalFarm", FixtureKey::Member, 2)
.expect("put");
{
- let mut relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("relay");
+ let mut relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("relay");
assert_accepted(
relay
.handle_event_with_auth(create.clone(), &owner_auth)
@@ -1106,7 +1119,8 @@ fn projection_applies_canonical_events_after_checkpoint_on_restart() {
let create_offset = stored_event_offset(&config, &create);
regress_member_projection_to_checkpoint(&config, create_offset, "IncrementalFarm");
- let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("reopen");
+ let relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("reopen");
assert_eq!(
relay
.group_projection()
@@ -1137,7 +1151,8 @@ fn source_store_crash_recovery_rebuilds_projection_outbox_and_generated_events()
store_source_events(&config, &[create, put]);
- let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("reopen");
+ let relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("reopen");
assert_eq!(
relay
.readiness_state()
@@ -1192,7 +1207,8 @@ fn rebuilt_projection_matches_live_projection_for_moderation_stream() {
let members_before;
{
- let mut relay = BaseRelay::open_with_groups(&config, 16, &group_config()).expect("relay");
+ let mut relay =
+ BaseRelay::open_with_groups(&config, relay_limits(16), &group_config()).expect("relay");
accept_group_create(&mut relay, "EquivFarm", &[], 1, &owner_auth);
let metadata =
tangle_v2_group_metadata_event(FixtureKey::Admin, "EquivFarm", "Market", 2, &[])
@@ -1271,7 +1287,8 @@ fn rebuilt_projection_matches_live_projection_for_moderation_stream() {
delete_group_extra_records(&config);
- let relay = BaseRelay::open_with_groups(&config, 16, &group_config()).expect("reopen");
+ let relay =
+ BaseRelay::open_with_groups(&config, relay_limits(16), &group_config()).expect("reopen");
assert_projection_without_checkpoint_eq(
&live_projection,
relay.group_projection().expect("projection"),
@@ -1295,7 +1312,8 @@ fn pending_and_retryable_group_outbox_records_materialize_on_restart() {
let config = test_store_config("outbox-retryable-restart");
let owner_auth = authenticated(FixtureKey::Owner);
{
- let mut relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("relay");
+ let mut relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("relay");
accept_group_create(&mut relay, "OutboxFarm", &[], 1, &owner_auth);
relay.shutdown().expect("shutdown");
}
@@ -1303,7 +1321,8 @@ fn pending_and_retryable_group_outbox_records_materialize_on_restart() {
assert_eq!(outbox_status_counts(&config).pending, 1);
assert_eq!(outbox_status_counts(&config).retryable, 1);
- let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("reopen");
+ let relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("reopen");
assert_eq!(
relay
.readiness_state()
@@ -1325,7 +1344,8 @@ fn max_outbox_replay_batch_one_drains_all_pending_generated_records() {
let config = test_store_config("outbox-batch-one");
let owner_auth = authenticated(FixtureKey::Owner);
let mut relay =
- BaseRelay::open_with_groups(&config, 8, &group_config_with_outbox_batch(1)).expect("relay");
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config_with_outbox_batch(1))
+ .expect("relay");
accept_group_create(&mut relay, "BatchFarm", &[], 1, &owner_auth);
@@ -1342,7 +1362,8 @@ fn already_stored_generated_events_mark_outbox_stored_without_duplication_on_res
let config = test_store_config("outbox-generated-already-stored");
let owner_auth = authenticated(FixtureKey::Owner);
{
- let mut relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("relay");
+ let mut relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("relay");
accept_group_create(&mut relay, "StoredGeneratedFarm", &[], 1, &owner_auth);
relay.shutdown().expect("shutdown");
}
@@ -1354,7 +1375,8 @@ fn already_stored_generated_events_mark_outbox_stored_without_duplication_on_res
regress_outbox_records_to_pending(&config);
assert_eq!(outbox_status_counts(&config).pending, 2);
- let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("reopen");
+ let relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("reopen");
assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1);
assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1);
assert_eq!(
@@ -1377,8 +1399,8 @@ fn crash_point_recovery_states_match_live_projection_and_generated_events() {
let pending_outbox_config = test_store_config("crash-equivalence-pending-outbox");
let events = recovery_equivalence_events();
let expected = {
- let mut relay =
- BaseRelay::open_with_groups(&live_config, 8, &group_config()).expect("live");
+ let mut relay = BaseRelay::open_with_groups(&live_config, relay_limits(8), &group_config())
+ .expect("live");
let owner_auth = authenticated(FixtureKey::Owner);
let admin_auth = authenticated(FixtureKey::Admin);
assert_accepted(
@@ -1400,14 +1422,15 @@ fn crash_point_recovery_states_match_live_projection_and_generated_events() {
store_source_events(&source_only_config, &events);
let mut source_only =
- BaseRelay::open_with_groups(&source_only_config, 8, &group_config()).expect("source only");
+ BaseRelay::open_with_groups(&source_only_config, relay_limits(8), &group_config())
+ .expect("source only");
assert_eq!(recovery_summary(&mut source_only, "CrashFarm"), expected);
assert_eq!(outbox_status_counts(&source_only_config).stored, 5);
let offsets = store_source_events(&pending_outbox_config, &events);
seed_pending_create_outbox_records(&pending_outbox_config, &events[0], offsets[0]);
let mut pending_outbox =
- BaseRelay::open_with_groups(&pending_outbox_config, 8, &group_config())
+ BaseRelay::open_with_groups(&pending_outbox_config, relay_limits(8), &group_config())
.expect("pending outbox");
assert_eq!(recovery_summary(&mut pending_outbox, "CrashFarm"), expected);
let counts = outbox_status_counts(&pending_outbox_config);
@@ -1723,7 +1746,8 @@ fn sorted_strings(values: impl IntoIterator<Item = String>) -> Vec<String> {
fn final_group_name_for_order(name: &str, edits: [&Event; 2]) -> String {
let config = test_store_config(name);
- let mut relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("relay");
+ let mut relay =
+ BaseRelay::open_with_groups(&config, relay_limits(8), &group_config()).expect("relay");
let auth = authenticated(FixtureKey::Owner);
accept_group_create(&mut relay, "ClockFarm", &[], 1, &auth);
for edit in edits {
@@ -1757,6 +1781,21 @@ fn test_store_config(name: &str) -> PocketStoreConfig {
.expect("config")
}
+fn relay_limits(max_pending_events: usize) -> BaseRelayLimits {
+ BaseRelayLimits::new(BaseRelayLimitSettings {
+ max_pending_events,
+ max_subscription_id_length: 64,
+ max_subscriptions: 64,
+ max_filters_per_request: 10,
+ max_tag_values_per_filter: 100,
+ max_event_tags: 200,
+ max_content_length: 65_536,
+ max_limit: 500,
+ default_limit: 100,
+ })
+ .expect("limits")
+}
+
fn delete_group_extra_records(config: &PocketStoreConfig) {
let store = PocketStoreHandle::open(config).expect("store");
for table in [
diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs
@@ -558,7 +558,17 @@ fn runtime_config(root: &Path, listen_addr: SocketAddr) -> BaseRelayRuntimeConfi
"created_at_skew_seconds": 600
},
"limits": {
- "max_pending_events": 8
+ "max_message_length": 1048576,
+ "max_subid_length": 64,
+ "max_subscriptions_per_connection": 64,
+ "max_filters_per_request": 10,
+ "max_tag_values_per_filter": 100,
+ "max_limit": 500,
+ "default_limit": 100,
+ "max_event_tags": 200,
+ "max_content_length": 65536,
+ "broadcast_channel_capacity": 8,
+ "per_connection_outbound_queue": 8
}
})
.to_string();