commit 384791318a65845bf744ea5bd48fc595b3e77c4f
parent b32ee30abcb80c7971f6b58c63bfcd13c7c5aa6c
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 01:08:16 -0700
ws: add connection model
Diffstat:
1 file changed, 250 insertions(+), 6 deletions(-)
diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs
@@ -10,8 +10,9 @@ use core::fmt;
use http::{HeaderMap, HeaderValue, StatusCode, header};
use serde::{Deserialize, Serialize};
use tangle_core::{
- MarketplaceListingStatus, MarketplaceQuery, MarketplaceQueryError, MarketplaceQuerySpec,
- MarketplaceSort, RuntimeLimits,
+ AuthChallengeState, FixedWindowRateLimiter, MarketplaceListingStatus, MarketplaceQuery,
+ MarketplaceQueryError, MarketplaceQuerySpec, MarketplaceSort, RateLimitConfig, RuntimeLimits,
+ SubscriptionManager, SubscriptionMatcher,
};
use tangle_nips::{FulfillmentMethod, ListingUnit};
use tangle_protocol::{EventId, PublicKeyHex};
@@ -21,6 +22,154 @@ use url::form_urlencoded;
pub const TANGLE_SUPPORTED_NIPS: [u16; 8] = [1, 9, 11, 16, 33, 42, 50, 99];
pub const TANGLE_RELAY_SOFTWARE: &str = "https://github.com/radrootslabs/tangle";
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct RelayConnectionId(String);
+
+impl RelayConnectionId {
+ pub const MAX_LENGTH: usize = 128;
+
+ pub fn new(value: &str) -> Result<Self, String> {
+ let value = value.trim();
+ if value.is_empty() {
+ return Err("connection id must not be empty".to_owned());
+ }
+ if value.len() > Self::MAX_LENGTH {
+ return Err(format!(
+ "connection id must be at most {} bytes, got {}",
+ Self::MAX_LENGTH,
+ value.len()
+ ));
+ }
+ Ok(Self(value.to_owned()))
+ }
+
+ pub fn as_str(&self) -> &str {
+ &self.0
+ }
+}
+
+impl fmt::Display for RelayConnectionId {
+ fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+ formatter.write_str(self.as_str())
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RelayConnectionConfig {
+ relay_url: String,
+ auth_ttl_seconds: u64,
+ message_rate_limit: RateLimitConfig,
+ runtime_limits: RuntimeLimits,
+}
+
+impl RelayConnectionConfig {
+ pub fn new(
+ relay_url: impl Into<String>,
+ auth_ttl_seconds: u64,
+ message_rate_limit: RateLimitConfig,
+ runtime_limits: RuntimeLimits,
+ ) -> Result<Self, String> {
+ let relay_url = relay_url.into();
+ let auth = AuthChallengeState::new(&relay_url, auth_ttl_seconds)
+ .map_err(|error| error.to_string())?;
+ Ok(Self {
+ relay_url: auth.relay_url().to_owned(),
+ auth_ttl_seconds,
+ message_rate_limit,
+ runtime_limits,
+ })
+ }
+
+ pub fn relay_url(&self) -> &str {
+ &self.relay_url
+ }
+
+ pub fn auth_ttl_seconds(&self) -> u64 {
+ self.auth_ttl_seconds
+ }
+
+ pub fn message_rate_limit(&self) -> RateLimitConfig {
+ self.message_rate_limit
+ }
+
+ pub fn runtime_limits(&self) -> RuntimeLimits {
+ self.runtime_limits
+ }
+}
+
+impl Default for RelayConnectionConfig {
+ fn default() -> Self {
+ Self::new(
+ "wss://relay.radroots.test",
+ 300,
+ RateLimitConfig::new(120, 60).expect("default message rate limit is valid"),
+ RuntimeLimits::default(),
+ )
+ .expect("default relay connection config is valid")
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RelayConnection {
+ id: RelayConnectionId,
+ remote_addr: Option<String>,
+ subscriptions: SubscriptionManager,
+ auth: AuthChallengeState,
+ rate_limiter: FixedWindowRateLimiter,
+}
+
+impl RelayConnection {
+ pub fn new(id: RelayConnectionId, config: RelayConnectionConfig) -> Self {
+ Self {
+ id,
+ remote_addr: None,
+ subscriptions: SubscriptionManager::new(
+ config.runtime_limits(),
+ SubscriptionMatcher::default(),
+ ),
+ auth: AuthChallengeState::new(config.relay_url(), config.auth_ttl_seconds())
+ .expect("connection config validates auth state"),
+ rate_limiter: FixedWindowRateLimiter::new(config.message_rate_limit()),
+ }
+ }
+
+ pub fn id(&self) -> &RelayConnectionId {
+ &self.id
+ }
+
+ pub fn remote_addr(&self) -> Option<&str> {
+ self.remote_addr.as_deref()
+ }
+
+ pub fn set_remote_addr(&mut self, remote_addr: impl Into<String>) {
+ self.remote_addr = Some(remote_addr.into());
+ }
+
+ pub fn subscriptions(&self) -> &SubscriptionManager {
+ &self.subscriptions
+ }
+
+ pub fn subscriptions_mut(&mut self) -> &mut SubscriptionManager {
+ &mut self.subscriptions
+ }
+
+ pub fn auth(&self) -> &AuthChallengeState {
+ &self.auth
+ }
+
+ pub fn auth_mut(&mut self) -> &mut AuthChallengeState {
+ &mut self.auth
+ }
+
+ pub fn rate_limiter(&self) -> &FixedWindowRateLimiter {
+ &self.rate_limiter
+ }
+
+ pub fn rate_limiter_mut(&mut self) -> &mut FixedWindowRateLimiter {
+ &mut self.rate_limiter
+ }
+}
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApiErrorCode {
InvalidRequest,
@@ -1165,14 +1314,15 @@ fn invalid_parameter(field: &'static str, requirement: &str) -> ApiError {
mod tests {
use super::{
ApiError, ApiErrorBody, ApiErrorCode, ApiErrorEnvelope, ListingsHttpState,
- ReadinessCheckStatus, ReadinessState, RelayInfoDocument, TANGLE_RELAY_SOFTWARE,
- TANGLE_SUPPORTED_NIPS, health_router, listing_item_document, listing_projection_query,
- listings_router, parse_listing_query, parse_marketplace_search_query, relay_info_router,
+ ReadinessCheckStatus, ReadinessState, RelayConnection, RelayConnectionConfig,
+ RelayConnectionId, RelayInfoDocument, TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS,
+ health_router, listing_item_document, listing_projection_query, listings_router,
+ parse_listing_query, parse_marketplace_search_query, relay_info_router,
search_document_query,
};
use axum::{body::Body, response::IntoResponse};
use http::{HeaderValue, Request, StatusCode, header};
- use tangle_core::{MarketplaceListingStatus, MarketplaceSort, RuntimeLimits};
+ use tangle_core::{MarketplaceListingStatus, MarketplaceSort, RateLimitConfig, RuntimeLimits};
use tangle_nips::{FulfillmentMethod, ListingUnit};
use tangle_protocol::{UnixTimestamp, event_to_value};
use tangle_store::StoredEvent;
@@ -1246,6 +1396,100 @@ mod tests {
);
}
+ #[test]
+ fn relay_connection_id_validates_and_displays_stable_value() {
+ let id = RelayConnectionId::new(" conn-001 ").expect("id");
+
+ assert_eq!(id.as_str(), "conn-001");
+ assert_eq!(id.to_string(), "conn-001");
+ assert_eq!(
+ RelayConnectionId::new("").expect_err("empty"),
+ "connection id must not be empty"
+ );
+ assert_eq!(
+ RelayConnectionId::new(&"x".repeat(RelayConnectionId::MAX_LENGTH + 1))
+ .expect_err("too long"),
+ "connection id must be at most 128 bytes, got 129"
+ );
+ }
+
+ #[test]
+ fn relay_connection_config_normalizes_core_runtime_state() {
+ let rate_limit = RateLimitConfig::new(10, 60).expect("rate limit");
+ let config = RelayConnectionConfig::new(
+ " wss://relay.radroots.test ",
+ 42,
+ rate_limit,
+ RuntimeLimits::default(),
+ )
+ .expect("config");
+
+ assert_eq!(config.relay_url(), "wss://relay.radroots.test");
+ assert_eq!(config.auth_ttl_seconds(), 42);
+ assert_eq!(config.message_rate_limit(), rate_limit);
+ assert_eq!(config.runtime_limits(), RuntimeLimits::default());
+ assert_eq!(
+ RelayConnectionConfig::new("", 42, rate_limit, RuntimeLimits::default())
+ .expect_err("relay"),
+ "relay url must not be empty"
+ );
+ assert_eq!(
+ RelayConnectionConfig::new(
+ "wss://relay.radroots.test",
+ 0,
+ rate_limit,
+ RuntimeLimits::default()
+ )
+ .expect_err("ttl"),
+ "auth challenge ttl must be greater than zero"
+ );
+ }
+
+ #[test]
+ fn relay_connection_composes_subscription_auth_and_rate_state() {
+ let config = RelayConnectionConfig::new(
+ "wss://relay.radroots.test",
+ 30,
+ RateLimitConfig::new(2, 60).expect("rate limit"),
+ RuntimeLimits::default(),
+ )
+ .expect("config");
+ let mut connection =
+ RelayConnection::new(RelayConnectionId::new("conn-a").expect("id"), config);
+
+ assert_eq!(connection.id().as_str(), "conn-a");
+ assert_eq!(connection.remote_addr(), None);
+ connection.set_remote_addr("127.0.0.1:7777");
+ assert_eq!(connection.remote_addr(), Some("127.0.0.1:7777"));
+ assert_eq!(connection.subscriptions().active_count(), 0);
+ assert_eq!(connection.auth().relay_url(), "wss://relay.radroots.test");
+ assert_eq!(connection.auth().ttl_seconds(), 30);
+ assert_eq!(connection.rate_limiter().tracked_key_count(), 0);
+
+ let challenge = connection
+ .auth_mut()
+ .issue_challenge("challenge-a", UnixTimestamp::new(100))
+ .expect("challenge");
+ let decision = connection
+ .rate_limiter_mut()
+ .check("conn-a", UnixTimestamp::new(100), 1)
+ .expect("rate limit");
+
+ assert_eq!(challenge.value, "challenge-a");
+ assert_eq!(
+ connection
+ .auth()
+ .active_challenge()
+ .expect("active")
+ .expires_at,
+ UnixTimestamp::new(130)
+ );
+ assert_eq!(decision.allowed(), true);
+ assert_eq!(decision.remaining(), 1);
+ assert_eq!(connection.rate_limiter().tracked_key_count(), 1);
+ assert_eq!(connection.subscriptions_mut().active_count(), 0);
+ }
+
#[tokio::test]
async fn api_error_into_response_keeps_public_envelope_shape() {
let response = ApiError::not_found("listing not found").into_response();