commit 4d6ceba3f3eceb036e5faaa015a260041e3de473
parent ff98dc8a5457ce71d8af9d63d99e3143dfd8139c
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 02:27:01 -0700
policy: persist durable rate limits
Diffstat:
3 files changed, 587 insertions(+), 16 deletions(-)
diff --git a/crates/tangle/tests/run_integration.rs b/crates/tangle/tests/run_integration.rs
@@ -288,6 +288,105 @@ async fn tangle_run_enforces_seller_projection_policy() {
fs::remove_dir_all(&reject_write.root).expect("remove reject root");
}
+#[tokio::test]
+async fn tangle_run_persists_durable_write_rate_limits() {
+ let port = free_port();
+ let root = std::env::temp_dir().join(format!(
+ "tangle-rate-limit-integration-{}-{port}",
+ std::process::id()
+ ));
+ let db_path = root.join("surrealdb");
+ let config_path = root.join("runtime.json");
+ fs::create_dir_all(&root).expect("runtime root");
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let auth = build_fixture_event(&auth_event_spec()).expect("auth");
+ let seller = FixtureKey::Seller.public_key();
+ write_runtime_config(
+ &config_path,
+ &db_path,
+ port,
+ "tangle_rate_limit",
+ serde_json::json!({
+ "approved_sellers": [seller.as_str()],
+ "write_rate_limit": {
+ "limit": 1,
+ "window_seconds": 60
+ }
+ }),
+ );
+ let mut relay = spawn_relay(&config_path);
+ wait_for_http(port, &mut relay);
+ let (mut client, _) = connect_async(format!("ws://127.0.0.1:{port}/ws"))
+ .await
+ .expect("client connect");
+ assert_eq!(next_label(&mut client).await, "AUTH");
+ client
+ .send(Message::Text(
+ serde_json::json!(["AUTH", event_to_value(&auth)])
+ .to_string()
+ .into(),
+ ))
+ .await
+ .expect("auth send");
+ assert_ok(&next_json(&mut client).await, true);
+
+ client
+ .send(Message::Text(
+ serde_json::json!(["EVENT", event_to_value(&listing)])
+ .to_string()
+ .into(),
+ ))
+ .await
+ .expect("first event send");
+ assert_ok(&next_json(&mut client).await, true);
+ client
+ .send(Message::Text(
+ serde_json::json!(["EVENT", event_to_value(&listing)])
+ .to_string()
+ .into(),
+ ))
+ .await
+ .expect("second event send");
+ let rejected = next_json(&mut client).await;
+ assert_ok(&rejected, false);
+ assert!(
+ rejected[3]
+ .as_str()
+ .expect("rate rejection")
+ .contains("rate-limited: retry after")
+ );
+ stop_relay(relay);
+
+ let store_config = SurrealConnectionConfig::rocksdb(
+ db_path.to_str().expect("db path"),
+ "tangle_rate_limit",
+ "relay",
+ )
+ .expect("store config");
+ let store = reopen_store(&store_config).await;
+ let key = format!("event_write:{}", seller.as_str());
+ let row = store
+ .rate_limit_state_row(&key)
+ .await
+ .expect("rate row")
+ .expect("rate row exists");
+ assert_eq!(row["key"], key);
+ assert_eq!(
+ serde_json::from_str::<serde_json::Value>(row["state"].as_str().expect("state"))
+ .expect("state json")["used"],
+ 1_u64
+ );
+ assert!(
+ store
+ .raw_event_row(listing.id())
+ .await
+ .expect("raw row")
+ .is_some()
+ );
+ drop(store);
+ fs::remove_dir_all(&root).expect("remove runtime root");
+}
+
struct PolicyWriteScenario {
root: std::path::PathBuf,
store_config: SurrealConnectionConfig,
diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs
@@ -35,8 +35,8 @@ use tangle_protocol::{
};
use tangle_store::{StoreEventOutcome, StoredEvent};
use tangle_store_surreal::{
- ListingProjectionQuery, MigrationApplyOutcome, SearchDocumentQuery, SurrealConnectionConfig,
- SurrealConnectionMode, SurrealStore, base_migration_plan,
+ DurableRateLimitDecision, ListingProjectionQuery, MigrationApplyOutcome, SearchDocumentQuery,
+ SurrealConnectionConfig, SurrealConnectionMode, SurrealStore, base_migration_plan,
};
use tokio::net::TcpListener;
use tokio::sync::broadcast;
@@ -236,6 +236,7 @@ pub struct TangleRuntimeConfig {
relay_connection: RelayConnectionConfig,
database: SurrealConnectionConfig,
admission_policy: AdmissionPolicy,
+ durable_write_rate_limit: Option<RateLimitConfig>,
limits: RuntimeLimits,
}
@@ -256,6 +257,10 @@ impl TangleRuntimeConfig {
&self.admission_policy
}
+ pub fn durable_write_rate_limit(&self) -> Option<RateLimitConfig> {
+ self.durable_write_rate_limit
+ }
+
pub fn limits(&self) -> RuntimeLimits {
self.limits
}
@@ -1035,7 +1040,8 @@ async fn handle_websocket(mut socket: WebSocket, state: RuntimeRelayState) {
let mut shutdown = state.shutdown_signal.subscribe();
let mut event_rx = state.event_tx.subscribe();
let mut loop_state = ClientMessageLoop::new(state.next_connection());
- let event_handler = EventMessageHandler::new(state.store.clone(), state.validator());
+ let event_handler = EventMessageHandler::new(state.store.clone(), state.validator())
+ .with_durable_write_rate_limit(state.config.durable_write_rate_limit());
let auth_handler = AuthMessageHandler;
let req_handler = ReqMessageHandler::new(state.store.clone(), NostrFilterCompiler::default());
let close_handler = CloseMessageHandler;
@@ -1269,6 +1275,7 @@ struct RuntimeLimitValuesDocument {
struct RuntimePolicyConfigDocument {
require_write_auth: Option<bool>,
unapproved_seller_action: Option<RuntimeUnapprovedSellerActionDocument>,
+ write_rate_limit: Option<RuntimeRateLimitConfigDocument>,
#[serde(default)]
approved_sellers: Vec<String>,
#[serde(default)]
@@ -1301,12 +1308,14 @@ fn runtime_config_from_document(
)
.map_err(RuntimeConfigError::invalid)?;
let database = database_config_from_document(document.database)?;
- let admission_policy = admission_policy_from_document(document.policy)?;
+ let durable_write_rate_limit = durable_write_rate_limit_from_document(&document.policy)?;
+ let admission_policy = admission_policy_from_document(&document.policy)?;
Ok(TangleRuntimeConfig {
listen_addr,
relay_connection,
database,
admission_policy,
+ durable_write_rate_limit,
limits: limits.runtime,
})
}
@@ -1388,8 +1397,21 @@ fn required_path(value: Option<String>, mode: &str) -> Result<String, RuntimeCon
})
}
+fn durable_write_rate_limit_from_document(
+ document: &RuntimePolicyConfigDocument,
+) -> Result<Option<RateLimitConfig>, RuntimeConfigError> {
+ document
+ .write_rate_limit
+ .as_ref()
+ .map(|value| {
+ RateLimitConfig::new(value.limit, value.window_seconds)
+ .map_err(|error| RuntimeConfigError::invalid(error.to_string()))
+ })
+ .transpose()
+}
+
fn admission_policy_from_document(
- document: RuntimePolicyConfigDocument,
+ document: &RuntimePolicyConfigDocument,
) -> Result<AdmissionPolicy, RuntimeConfigError> {
let action = match document.unapproved_seller_action {
Some(RuntimeUnapprovedSellerActionDocument::StoreRawOnly) | None => {
@@ -1402,15 +1424,15 @@ fn admission_policy_from_document(
let mut policy = AdmissionPolicy::new()
.with_write_auth_required(document.require_write_auth.unwrap_or(true))
.with_unapproved_seller_action(action);
- for pubkey in document.approved_sellers {
- policy = policy.approve_seller(PublicKeyHex::new(&pubkey).map_err(|error| {
+ for pubkey in &document.approved_sellers {
+ policy = policy.approve_seller(PublicKeyHex::new(pubkey.as_str()).map_err(|error| {
RuntimeConfigError::invalid(format!(
"policy.approved_sellers contains invalid pubkey: {error}"
))
})?);
}
- for pubkey in document.blocked_pubkeys {
- policy = policy.block_pubkey(PublicKeyHex::new(&pubkey).map_err(|error| {
+ for pubkey in &document.blocked_pubkeys {
+ policy = policy.block_pubkey(PublicKeyHex::new(pubkey.as_str()).map_err(|error| {
RuntimeConfigError::invalid(format!(
"policy.blocked_pubkeys contains invalid pubkey: {error}"
))
@@ -1596,11 +1618,16 @@ impl ClientMessageLoop {
pub struct EventMessageHandler {
store: SurrealStore,
validator: EventValidator,
+ durable_write_rate_limit: Option<RateLimitConfig>,
}
impl EventMessageHandler {
pub fn new(store: SurrealStore, validator: EventValidator) -> Self {
- Self { store, validator }
+ Self {
+ store,
+ validator,
+ durable_write_rate_limit: None,
+ }
}
pub fn store(&self) -> &SurrealStore {
@@ -1611,6 +1638,15 @@ impl EventMessageHandler {
&self.validator
}
+ pub fn durable_write_rate_limit(&self) -> Option<RateLimitConfig> {
+ self.durable_write_rate_limit
+ }
+
+ pub fn with_durable_write_rate_limit(mut self, config: Option<RateLimitConfig>) -> Self {
+ self.durable_write_rate_limit = config;
+ self
+ }
+
pub async fn handle_event(
&self,
connection: &RelayConnection,
@@ -1627,6 +1663,33 @@ impl EventMessageHandler {
if validated.admission().effect() == AdmissionEffect::AuthenticateOnly {
return ok_rejected(event_id, "invalid: auth events must use AUTH".to_owned());
}
+ if let Some(config) = self.durable_write_rate_limit {
+ match self
+ .store
+ .check_durable_rate_limit(
+ &durable_write_rate_limit_key(validated.author_pubkey()),
+ config.limit,
+ config.window_seconds,
+ 1,
+ now,
+ )
+ .await
+ {
+ Ok(DurableRateLimitDecision::Accepted { .. }) => {}
+ Ok(DurableRateLimitDecision::Rejected {
+ retry_after_seconds,
+ ..
+ }) => {
+ return ok_rejected(
+ event_id,
+ format!("rate-limited: retry after {retry_after_seconds} seconds"),
+ );
+ }
+ Err(_) => {
+ return ok_rejected(event_id, "error: rate limit unavailable".to_owned());
+ }
+ }
+ }
if event.unsigned().kind().is_ephemeral() {
return ok_accepted(event_id);
}
@@ -1901,6 +1964,10 @@ fn admission_context(connection: &RelayConnection) -> AdmissionContext {
.unwrap_or_else(AdmissionContext::unauthenticated)
}
+fn durable_write_rate_limit_key(pubkey: &PublicKeyHex) -> String {
+ format!("event_write:{}", pubkey.as_str())
+}
+
fn ok_accepted(event_id: EventId) -> RelayMessage {
RelayMessage::Ok {
event_id,
@@ -3345,6 +3412,12 @@ mod tests {
"max_content_bytes": 1024,
"max_subscriptions_per_connection": 8
}
+ },
+ "policy": {
+ "write_rate_limit": {
+ "limit": 2,
+ "window_seconds": 60
+ }
}
}"#,
)
@@ -3365,6 +3438,10 @@ mod tests {
assert_eq!(config.limits().max_event_bytes(), 2048);
assert_eq!(config.limits().max_content_bytes(), 1024);
assert_eq!(config.limits().max_subscriptions_per_connection(), 8);
+ assert_eq!(
+ config.durable_write_rate_limit(),
+ Some(RateLimitConfig::new(2, 60).expect("write limit"))
+ );
assert_eq!(config.database_config().namespace(), "tangle_test");
assert_eq!(config.database_config().database(), "relay");
assert_eq!(
@@ -3842,6 +3919,75 @@ mod tests {
}
}
+ #[tokio::test]
+ async fn event_message_handler_persists_durable_write_rate_limits() {
+ let store = runtime_memory_store().await;
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let connection = authenticated_connection();
+ let handler = EventMessageHandler::new(
+ store.clone(),
+ EventValidator::new(
+ RuntimeLimits::default(),
+ AdmissionPolicy::new().approve_seller(listing.unsigned().pubkey().clone()),
+ ),
+ )
+ .with_durable_write_rate_limit(Some(RateLimitConfig::new(1, 60).expect("write rate")));
+
+ let accepted = handler
+ .handle_event(
+ &connection,
+ listing.clone(),
+ UnixTimestamp::new(1_714_125_500),
+ UnixTimestamp::new(1_714_125_500),
+ )
+ .await;
+ let rejected = handler
+ .handle_event(
+ &connection,
+ listing.clone(),
+ UnixTimestamp::new(1_714_125_501),
+ UnixTimestamp::new(1_714_125_501),
+ )
+ .await;
+
+ assert_eq!(
+ handler.durable_write_rate_limit(),
+ Some(RateLimitConfig::new(1, 60).expect("write rate"))
+ );
+ assert_eq!(
+ accepted,
+ RelayMessage::Ok {
+ event_id: listing.id().clone(),
+ accepted: true,
+ message: String::new()
+ }
+ );
+ match rejected {
+ RelayMessage::Ok {
+ accepted: false,
+ message,
+ ..
+ } => assert_eq!(message, "rate-limited: retry after 59 seconds"),
+ outcome => panic!("unexpected outcome: {outcome:?}"),
+ }
+ let key = format!("event_write:{}", listing.unsigned().pubkey().as_str());
+ let row = store
+ .rate_limit_state_row(&key)
+ .await
+ .expect("rate row")
+ .expect("rate row exists");
+ assert_eq!(row["key"], key);
+ assert_eq!(row["expires_at"], 1_714_125_560_u64);
+ assert_eq!(
+ serde_json::from_str::<serde_json::Value>(row["state"].as_str().expect("state"))
+ .expect("state json"),
+ serde_json::json!({
+ "started_at": 1714125500_u64,
+ "used": 1
+ })
+ );
+ }
+
#[test]
fn auth_message_handler_issues_and_accepts_auth_events() {
let handler = AuthMessageHandler;
diff --git a/crates/tangle_store_surreal/src/lib.rs b/crates/tangle_store_surreal/src/lib.rs
@@ -712,6 +712,47 @@ pub enum HiddenEventOutcome {
Unhidden,
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum DurableRateLimitDecision {
+ Accepted {
+ remaining: u64,
+ reset_at: UnixTimestamp,
+ },
+ Rejected {
+ retry_after_seconds: u64,
+ reset_at: UnixTimestamp,
+ },
+}
+
+impl DurableRateLimitDecision {
+ pub fn allowed(self) -> bool {
+ matches!(self, Self::Accepted { .. })
+ }
+
+ pub fn remaining(self) -> u64 {
+ match self {
+ Self::Accepted { remaining, .. } => remaining,
+ Self::Rejected { .. } => 0,
+ }
+ }
+
+ pub fn reset_at(self) -> UnixTimestamp {
+ match self {
+ Self::Accepted { reset_at, .. } | Self::Rejected { reset_at, .. } => reset_at,
+ }
+ }
+
+ pub fn retry_after_seconds(self) -> Option<u64> {
+ match self {
+ Self::Accepted { .. } => None,
+ Self::Rejected {
+ retry_after_seconds,
+ ..
+ } => Some(retry_after_seconds),
+ }
+ }
+}
+
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ListingProjectionQuery {
effective_status: Option<String>,
@@ -2044,6 +2085,99 @@ UPDATE search_doc SET visible = false WHERE (event_id = $event_id OR current_eve
response.take(0).map_err(SurrealStoreError::from)
}
+ pub async fn check_durable_rate_limit(
+ &self,
+ key: &str,
+ limit: u64,
+ window_seconds: u64,
+ cost: u64,
+ now: UnixTimestamp,
+ ) -> Result<DurableRateLimitDecision, SurrealStoreError> {
+ let key = required_policy_text(key, "rate limit key")?;
+ if limit == 0 {
+ return Err(SurrealStoreError::new(
+ "rate limit must be greater than zero",
+ ));
+ }
+ if window_seconds == 0 {
+ return Err(SurrealStoreError::new(
+ "rate limit window must be greater than zero seconds",
+ ));
+ }
+ if cost == 0 {
+ return Err(SurrealStoreError::new(
+ "rate limit cost must be greater than zero",
+ ));
+ }
+ if cost > limit {
+ return Err(SurrealStoreError::new(&format!(
+ "rate limit cost {cost} exceeds limit {limit}"
+ )));
+ }
+ let row = self.rate_limit_state_row(&key).await?;
+ let created_at = row
+ .as_ref()
+ .and_then(|row| row.get("created_at"))
+ .and_then(serde_json::Value::as_u64)
+ .unwrap_or_else(|| now.as_u64());
+ let mut state = row
+ .as_ref()
+ .map(rate_limit_window_state_from_row)
+ .transpose()?
+ .unwrap_or_else(|| DurableRateLimitWindowState::new(now));
+ state.reset_if_elapsed(now, window_seconds);
+ let reset_at = state.reset_at(window_seconds);
+ if state.used.saturating_add(cost) > limit {
+ return Ok(DurableRateLimitDecision::Rejected {
+ retry_after_seconds: reset_at.as_u64().saturating_sub(now.as_u64()),
+ reset_at,
+ });
+ }
+ state.used += cost;
+ self.upsert_rate_limit_state(&key, state, reset_at, created_at, now)
+ .await?;
+ Ok(DurableRateLimitDecision::Accepted {
+ remaining: limit - state.used,
+ reset_at,
+ })
+ }
+
+ pub async fn rate_limit_state_row(
+ &self,
+ key: &str,
+ ) -> Result<Option<serde_json::Value>, SurrealStoreError> {
+ let key = required_policy_text(key, "rate limit key")?;
+ let mut response = self
+ .db
+ .query("SELECT * FROM ONLY type::record('rate_limit_state', $key);")
+ .bind(("key", key.as_str()))
+ .await
+ .map_err(SurrealStoreError::from)?
+ .check()
+ .map_err(SurrealStoreError::from)?;
+ response.take(0).map_err(SurrealStoreError::from)
+ }
+
+ pub async fn prune_expired_rate_limit_state(
+ &self,
+ now: UnixTimestamp,
+ ) -> Result<u64, SurrealStoreError> {
+ let mut response = self
+ .db
+ .query(
+ "DELETE rate_limit_state WHERE expires_at != NONE AND expires_at <= $now RETURN BEFORE;",
+ )
+ .bind(("now", now.as_u64()))
+ .await
+ .map_err(SurrealStoreError::from)?
+ .check()
+ .map_err(SurrealStoreError::from)?;
+ let rows = response
+ .take::<Vec<serde_json::Value>>(0)
+ .map_err(SurrealStoreError::from)?;
+ Ok(rows.len() as u64)
+ }
+
async fn replace_listing_helper_rows(
&self,
table: &str,
@@ -2097,6 +2231,38 @@ UPDATE search_doc SET visible = false WHERE (event_id = $event_id OR current_eve
response.take(0).map_err(SurrealStoreError::from)
}
+ async fn upsert_rate_limit_state(
+ &self,
+ key: &str,
+ state: DurableRateLimitWindowState,
+ expires_at: UnixTimestamp,
+ created_at: u64,
+ updated_at: UnixTimestamp,
+ ) -> Result<(), SurrealStoreError> {
+ self.db
+ .query(
+ r#"
+UPSERT type::record('rate_limit_state', $key) CONTENT {
+ key: $key,
+ state: $state,
+ expires_at: $expires_at,
+ created_at: $created_at,
+ updated_at: $updated_at
+};
+"#,
+ )
+ .bind(("key", key))
+ .bind(("state", state.to_json_string()))
+ .bind(("expires_at", expires_at.as_u64()))
+ .bind(("created_at", created_at))
+ .bind(("updated_at", updated_at.as_u64()))
+ .await
+ .map_err(SurrealStoreError::from)?
+ .check()
+ .map_err(SurrealStoreError::from)?;
+ Ok(())
+ }
+
async fn query_single_indexed_tag_event_ids(
&self,
tag: &str,
@@ -2279,6 +2445,63 @@ fn moderation_action_id(
))
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+struct DurableRateLimitWindowState {
+ started_at: UnixTimestamp,
+ used: u64,
+}
+
+impl DurableRateLimitWindowState {
+ fn new(started_at: UnixTimestamp) -> Self {
+ Self {
+ started_at,
+ used: 0,
+ }
+ }
+
+ fn reset_at(self, window_seconds: u64) -> UnixTimestamp {
+ UnixTimestamp::new(self.started_at.as_u64().saturating_add(window_seconds))
+ }
+
+ fn reset_if_elapsed(&mut self, now: UnixTimestamp, window_seconds: u64) {
+ if now >= self.reset_at(window_seconds) || now < self.started_at {
+ self.started_at = now;
+ self.used = 0;
+ }
+ }
+
+ fn to_json_string(self) -> String {
+ serde_json::json!({
+ "started_at": self.started_at.as_u64(),
+ "used": self.used
+ })
+ .to_string()
+ }
+}
+
+fn rate_limit_window_state_from_row(
+ row: &serde_json::Value,
+) -> Result<DurableRateLimitWindowState, SurrealStoreError> {
+ let raw = row
+ .get("state")
+ .and_then(serde_json::Value::as_str)
+ .ok_or_else(|| SurrealStoreError::new("rate limit state is invalid"))?;
+ let value = serde_json::from_str::<serde_json::Value>(raw)
+ .map_err(|_| SurrealStoreError::new("rate limit state is invalid"))?;
+ let started_at = value
+ .get("started_at")
+ .and_then(serde_json::Value::as_u64)
+ .ok_or_else(|| SurrealStoreError::new("rate limit state is invalid"))?;
+ let used = value
+ .get("used")
+ .and_then(serde_json::Value::as_u64)
+ .ok_or_else(|| SurrealStoreError::new("rate limit state is invalid"))?;
+ Ok(DurableRateLimitWindowState {
+ started_at: UnixTimestamp::new(started_at),
+ used,
+ })
+}
+
fn d_tag_value(event: &Event) -> Option<String> {
event
.unsigned()
@@ -2624,12 +2847,12 @@ impl From<surrealdb::Error> for SurrealStoreError {
#[cfg(test)]
mod tests {
use super::{
- CurrentEventOutcome, DeletionMarkerOutcome, HiddenEventOutcome, ListingCurrentOutcome,
- ListingHelperOutcome, ListingProjectionQuery, ListingRevisionOutcome,
- MigrationApplyOutcome, SearchDocumentOutcome, SearchDocumentQuery, SurrealConfigError,
- SurrealConnectionConfig, SurrealConnectionMode, SurrealMigration, SurrealMigrationError,
- SurrealMigrationPlan, SurrealStore, SurrealStoreError, base_migration_plan,
- migration_tracking_schema,
+ CurrentEventOutcome, DeletionMarkerOutcome, DurableRateLimitDecision, HiddenEventOutcome,
+ ListingCurrentOutcome, ListingHelperOutcome, ListingProjectionQuery,
+ ListingRevisionOutcome, MigrationApplyOutcome, SearchDocumentOutcome, SearchDocumentQuery,
+ SurrealConfigError, SurrealConnectionConfig, SurrealConnectionMode, SurrealMigration,
+ SurrealMigrationError, SurrealMigrationPlan, SurrealStore, SurrealStoreError,
+ base_migration_plan, migration_tracking_schema,
};
use tangle_nips::ListingProjectionEvaluation;
use tangle_protocol::{
@@ -4755,6 +4978,109 @@ mod tests {
}
#[tokio::test]
+ async fn durable_rate_limit_state_persists_fixed_windows() {
+ let store = memory_store().await;
+ store
+ .apply_plan(&base_migration_plan())
+ .await
+ .expect("apply plan");
+ let key = "event_write:".to_owned() + &"1".repeat(PublicKeyHex::HEX_LENGTH);
+
+ let first = store
+ .check_durable_rate_limit(&key, 3, 60, 1, UnixTimestamp::new(100))
+ .await
+ .expect("first");
+ let second = store
+ .check_durable_rate_limit(&key, 3, 60, 2, UnixTimestamp::new(110))
+ .await
+ .expect("second");
+ let rejected = store
+ .check_durable_rate_limit(&key, 3, 60, 1, UnixTimestamp::new(120))
+ .await
+ .expect("rejected");
+
+ assert_eq!(
+ first,
+ DurableRateLimitDecision::Accepted {
+ remaining: 2,
+ reset_at: UnixTimestamp::new(160)
+ }
+ );
+ assert!(first.allowed());
+ assert_eq!(first.remaining(), 2);
+ assert_eq!(first.reset_at(), UnixTimestamp::new(160));
+ assert_eq!(first.retry_after_seconds(), None);
+ assert_eq!(
+ second,
+ DurableRateLimitDecision::Accepted {
+ remaining: 0,
+ reset_at: UnixTimestamp::new(160)
+ }
+ );
+ assert_eq!(
+ rejected,
+ DurableRateLimitDecision::Rejected {
+ retry_after_seconds: 40,
+ reset_at: UnixTimestamp::new(160)
+ }
+ );
+ assert!(!rejected.allowed());
+ assert_eq!(rejected.remaining(), 0);
+ assert_eq!(rejected.retry_after_seconds(), Some(40));
+ let row = store
+ .rate_limit_state_row(&key)
+ .await
+ .expect("rate row")
+ .expect("rate row exists");
+ assert_eq!(row["key"], key);
+ assert_eq!(row["expires_at"], 160_u64);
+ assert_eq!(row["created_at"], 100_u64);
+ assert_eq!(row["updated_at"], 110_u64);
+ assert_eq!(
+ serde_json::from_str::<serde_json::Value>(row["state"].as_str().expect("state"))
+ .expect("state json"),
+ serde_json::json!({
+ "started_at": 100,
+ "used": 3
+ })
+ );
+
+ let reset = store
+ .check_durable_rate_limit(&key, 3, 60, 1, UnixTimestamp::new(160))
+ .await
+ .expect("reset");
+ assert_eq!(
+ reset,
+ DurableRateLimitDecision::Accepted {
+ remaining: 2,
+ reset_at: UnixTimestamp::new(220)
+ }
+ );
+ let row = store
+ .rate_limit_state_row(&key)
+ .await
+ .expect("rate row")
+ .expect("rate row exists");
+ assert_eq!(row["expires_at"], 220_u64);
+ assert_eq!(row["created_at"], 100_u64);
+ assert_eq!(row["updated_at"], 160_u64);
+ assert_eq!(
+ store
+ .prune_expired_rate_limit_state(UnixTimestamp::new(221))
+ .await
+ .expect("prune"),
+ 1
+ );
+ assert!(
+ store
+ .rate_limit_state_row(&key)
+ .await
+ .expect("pruned row")
+ .is_none()
+ );
+ }
+
+ #[tokio::test]
async fn private_helpers_cover_debug_errors_and_decimal_edges() {
let store = memory_store().await;
assert!(format!("{store:?}").contains("SurrealStore"));