commit 5889b8d30cea8a4756f9eef882a9b8c4c82d3306
parent 8f50bc2ff1cb2ea293edb5c37506f93046cf7b5a
Author: triesap <tyson@radroots.org>
Date: Tue, 23 Jun 2026 20:57:23 +0000
publish-proxy: bound publish requests
- add max_concurrent_publish_jobs config with validation
- reject zero and excessive publish timeout requests
- include normalized timeout in publish intent fingerprints
- enforce concurrency permits before creating publish jobs
Diffstat:
3 files changed, 232 insertions(+), 5 deletions(-)
diff --git a/config.toml b/config.toml
@@ -41,6 +41,7 @@ enabled = true
max_event_bytes = 131072
max_relays_per_request = 20
job_list_limit = 100
+max_concurrent_publish_jobs = 8
relay_url_policy = "localhost"
author_relay_discovery_relays = []
daemon_default_publish_relays = ["ws://127.0.0.1:8080"]
diff --git a/src/app/config.rs b/src/app/config.rs
@@ -69,6 +69,10 @@ fn default_publish_proxy_job_list_limit() -> usize {
100
}
+fn default_publish_proxy_max_concurrent_publish_jobs() -> usize {
+ 8
+}
+
fn default_publish_proxy_relay_url_policy() -> PublishProxyRelayUrlPolicy {
PublishProxyRelayUrlPolicy::Public
}
@@ -110,6 +114,8 @@ struct RawPublishProxyConfig {
pub max_relays_per_request: usize,
#[serde(default = "default_publish_proxy_job_list_limit")]
pub job_list_limit: usize,
+ #[serde(default = "default_publish_proxy_max_concurrent_publish_jobs")]
+ pub max_concurrent_publish_jobs: usize,
#[serde(default)]
pub database_path: Option<PathBuf>,
#[serde(default = "default_publish_proxy_relay_url_policy")]
@@ -128,6 +134,7 @@ impl Default for RawPublishProxyConfig {
max_event_bytes: default_publish_proxy_max_event_bytes(),
max_relays_per_request: default_publish_proxy_max_relays_per_request(),
job_list_limit: default_publish_proxy_job_list_limit(),
+ max_concurrent_publish_jobs: default_publish_proxy_max_concurrent_publish_jobs(),
database_path: None,
relay_url_policy: default_publish_proxy_relay_url_policy(),
author_relay_discovery_relays: Vec::new(),
@@ -144,6 +151,7 @@ impl RawPublishProxyConfig {
max_event_bytes: self.max_event_bytes,
max_relays_per_request: self.max_relays_per_request,
job_list_limit: self.job_list_limit,
+ max_concurrent_publish_jobs: self.max_concurrent_publish_jobs,
database_path: self
.database_path
.unwrap_or_else(|| paths.publish_proxy_database_path.clone()),
@@ -262,6 +270,8 @@ pub struct PublishProxyConfig {
pub max_relays_per_request: usize,
#[serde(default = "default_publish_proxy_job_list_limit")]
pub job_list_limit: usize,
+ #[serde(default = "default_publish_proxy_max_concurrent_publish_jobs")]
+ pub max_concurrent_publish_jobs: usize,
#[serde(default = "default_publish_proxy_database_path")]
pub database_path: PathBuf,
#[serde(default = "default_publish_proxy_relay_url_policy")]
@@ -280,6 +290,7 @@ impl Default for PublishProxyConfig {
max_event_bytes: default_publish_proxy_max_event_bytes(),
max_relays_per_request: default_publish_proxy_max_relays_per_request(),
job_list_limit: default_publish_proxy_job_list_limit(),
+ max_concurrent_publish_jobs: default_publish_proxy_max_concurrent_publish_jobs(),
database_path: default_publish_proxy_database_path(),
relay_url_policy: default_publish_proxy_relay_url_policy(),
author_relay_discovery_relays: Vec::new(),
@@ -299,6 +310,9 @@ impl PublishProxyConfig {
if self.job_list_limit == 0 {
bail!("publish_proxy job_list_limit must be greater than zero");
}
+ if self.max_concurrent_publish_jobs == 0 {
+ bail!("publish_proxy max_concurrent_publish_jobs must be greater than zero");
+ }
if self.connect_timeout_secs == 0 {
bail!("publish_proxy connect_timeout_secs must be greater than zero");
}
@@ -447,6 +461,7 @@ mod tests {
assert_eq!(cfg.max_event_bytes, 128 * 1024);
assert_eq!(cfg.max_relays_per_request, 20);
assert_eq!(cfg.job_list_limit, 100);
+ assert_eq!(cfg.max_concurrent_publish_jobs, 8);
assert_eq!(cfg.database_path, paths.publish_proxy_database_path);
assert_eq!(cfg.relay_url_policy, PublishProxyRelayUrlPolicy::Public);
assert!(cfg.author_relay_discovery_relays.is_empty());
@@ -483,6 +498,9 @@ mod tests {
cfg.job_list_limit = 0;
assert!(cfg.validate().is_err());
let mut cfg = PublishProxyConfig::default();
+ cfg.max_concurrent_publish_jobs = 0;
+ assert!(cfg.validate().is_err());
+ let mut cfg = PublishProxyConfig::default();
cfg.connect_timeout_secs = 0;
assert!(cfg.validate().is_err());
}
diff --git a/src/core/publish_proxy/mod.rs b/src/core/publish_proxy/mod.rs
@@ -31,6 +31,7 @@ use rusqlite::{Connection, OptionalExtension, Row, params};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use thiserror::Error;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use uuid::Uuid;
use crate::app::config::PublishProxyConfig;
@@ -59,6 +60,8 @@ pub enum PublishProxyError {
Relay(#[from] RadrootsRelayTransportError),
#[error("publish proxy transport error: {0}")]
Transport(String),
+ #[error("publish proxy concurrency limit reached")]
+ ConcurrencyLimit,
#[error("publish proxy idempotency conflict for key `{0}`")]
IdempotencyConflict(String),
}
@@ -69,26 +72,31 @@ pub struct PublishProxy {
pub store: PublishProxyStore,
publisher: Option<Arc<dyn RadrootsRelayPublishAdapter>>,
resolver: Arc<dyn PublishRelayResolver>,
+ publish_jobs: Arc<Semaphore>,
}
impl PublishProxy {
pub fn open(config: PublishProxyConfig) -> Result<Self, PublishProxyError> {
let store = PublishProxyStore::open(config.database_path.clone())?;
+ let publish_jobs = Arc::new(Semaphore::new(config.max_concurrent_publish_jobs));
Ok(Self {
config,
store,
publisher: None,
resolver: Arc::new(SystemPublishRelayResolver),
+ publish_jobs,
})
}
pub fn memory(config: PublishProxyConfig) -> Result<Self, PublishProxyError> {
let store = PublishProxyStore::memory()?;
+ let publish_jobs = Arc::new(Semaphore::new(config.max_concurrent_publish_jobs));
Ok(Self {
config,
store,
publisher: None,
resolver: Arc::new(SystemPublishRelayResolver),
+ publish_jobs,
})
}
@@ -103,6 +111,13 @@ impl PublishProxy {
self
}
+ fn acquire_publish_permit(&self) -> Result<OwnedSemaphorePermit, PublishProxyError> {
+ self.publish_jobs
+ .clone()
+ .try_acquire_owned()
+ .map_err(|_| PublishProxyError::ConcurrencyLimit)
+ }
+
pub async fn publish_event(
&self,
principal: &PublishPrincipal,
@@ -122,10 +137,13 @@ impl PublishProxy {
"signed event exceeds publish_proxy max_event_bytes".to_owned(),
));
}
+ let effective_timeout_ms = effective_publish_timeout_ms(&self.config, request.timeout_ms)?;
+ let _permit = self.acquire_publish_permit()?;
let request_fingerprint = request_intent_fingerprint(
principal.principal_id.as_str(),
signed_event.raw_json.as_str(),
&request,
+ effective_timeout_ms,
)?;
let resolution = self
.resolve_relays_for_request(signed_event.pubkey.as_str(), &request)
@@ -145,7 +163,7 @@ impl PublishProxy {
response.job.job_id.as_str(),
signed_event,
request.delivery_policy.clone(),
- request.timeout_ms,
+ effective_timeout_ms,
resolution,
)
.await?;
@@ -380,7 +398,7 @@ impl PublishProxy {
job_id: &str,
signed_event: RadrootsSignedNostrEvent,
delivery_policy: PublishDeliveryPolicy,
- timeout_ms: Option<u64>,
+ timeout_ms: u64,
resolution: PublishRelayResolution,
) -> Result<PublishJobView, PublishProxyError> {
if resolution.targets.is_empty() {
@@ -428,9 +446,7 @@ impl PublishProxy {
RadrootsRelayPublishRequest::new(signed_event, target_set, current_unix_millis())
.with_accepted_quorum(required_ack_count);
let started = Instant::now();
- let publish_timeout = Duration::from_millis(
- timeout_ms.unwrap_or_else(|| self.config.connect_timeout_secs.saturating_mul(1_000)),
- );
+ let publish_timeout = Duration::from_millis(timeout_ms);
let receipts =
match tokio::time::timeout(publish_timeout, self.publish_with_adapter(publish_request))
.await
@@ -1395,6 +1411,7 @@ fn request_intent_fingerprint(
principal_id: &str,
canonical_event_json: &str,
request: &PublishEventRequest,
+ effective_timeout_ms: u64,
) -> Result<String, PublishProxyError> {
#[derive(Serialize)]
struct FingerprintInput<'a> {
@@ -1403,6 +1420,7 @@ fn request_intent_fingerprint(
relays: Vec<String>,
relay_policy: &'a PublishRelayPolicy,
delivery_policy: &'a PublishDeliveryPolicy,
+ effective_timeout_ms: u64,
}
let input = FingerprintInput {
@@ -1415,6 +1433,7 @@ fn request_intent_fingerprint(
.collect(),
relay_policy: &request.relay_policy,
delivery_policy: &request.delivery_policy,
+ effective_timeout_ms,
};
let bytes = serde_json::to_vec(&input)?;
let mut hasher = Sha256::new();
@@ -1422,6 +1441,25 @@ fn request_intent_fingerprint(
Ok(hex_lower(&hasher.finalize()))
}
+fn effective_publish_timeout_ms(
+ config: &PublishProxyConfig,
+ timeout_ms: Option<u64>,
+) -> Result<u64, PublishProxyError> {
+ let max_timeout_ms = config.connect_timeout_secs.saturating_mul(1_000);
+ match timeout_ms {
+ Some(0) => Err(PublishProxyError::InvalidSignedEvent(
+ "timeout_ms must be greater than zero".to_owned(),
+ )),
+ Some(timeout_ms) if timeout_ms > max_timeout_ms => {
+ Err(PublishProxyError::InvalidSignedEvent(format!(
+ "timeout_ms must be at most {max_timeout_ms}"
+ )))
+ }
+ Some(timeout_ms) => Ok(timeout_ms),
+ None => Ok(max_timeout_ms),
+ }
+}
+
fn push_resolved_relay(
targets: &mut Vec<ResolvedPublishRelay>,
url: RadrootsRelayUrl,
@@ -2360,6 +2398,176 @@ mod tests {
}
#[tokio::test]
+ async fn publish_event_rejects_zero_and_excessive_timeout_before_job_creation() {
+ let identity = RadrootsIdentity::generate();
+ let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
+ let principal = principal(
+ &proxy,
+ identity.public_key_hex(),
+ vec![PublishRelayPolicy::DaemonDefaultOnly],
+ false,
+ PublishJobVisibility::Own,
+ );
+ let mut zero = publish_request(
+ signed_event(&identity, "{}"),
+ Vec::new(),
+ PublishRelayPolicy::DaemonDefaultOnly,
+ PublishDeliveryPolicy::Any,
+ Some("idem-zero-timeout"),
+ );
+ zero.timeout_ms = Some(0);
+ let zero_error = proxy
+ .publish_event(&principal, zero)
+ .await
+ .expect_err("zero timeout should fail");
+ assert!(matches!(
+ zero_error,
+ PublishProxyError::InvalidSignedEvent(_)
+ ));
+
+ let mut excessive = publish_request(
+ signed_event(&identity, "changed"),
+ Vec::new(),
+ PublishRelayPolicy::DaemonDefaultOnly,
+ PublishDeliveryPolicy::Any,
+ Some("idem-excessive-timeout"),
+ );
+ excessive.timeout_ms = Some(10_001);
+ let excessive_error = proxy
+ .publish_event(&principal, excessive)
+ .await
+ .expect_err("excessive timeout should fail");
+ assert!(matches!(
+ excessive_error,
+ PublishProxyError::InvalidSignedEvent(_)
+ ));
+ assert!(
+ proxy
+ .store
+ .list_jobs_for_principal(&principal, 50)
+ .expect("jobs")
+ .is_empty()
+ );
+ assert!(adapter.captured_raw_events().is_empty());
+ }
+
+ #[tokio::test]
+ async fn publish_event_default_timeout_fingerprints_as_effective_timeout() {
+ let identity = RadrootsIdentity::generate();
+ let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
+ let principal = principal(
+ &proxy,
+ identity.public_key_hex(),
+ vec![PublishRelayPolicy::DaemonDefaultOnly],
+ false,
+ PublishJobVisibility::Own,
+ );
+ let event = signed_event(&identity, "{}");
+ let mut default_timeout = publish_request(
+ event.clone(),
+ Vec::new(),
+ PublishRelayPolicy::DaemonDefaultOnly,
+ PublishDeliveryPolicy::Any,
+ Some("idem-default-timeout"),
+ );
+ default_timeout.timeout_ms = None;
+ let mut explicit_default = publish_request(
+ event,
+ Vec::new(),
+ PublishRelayPolicy::DaemonDefaultOnly,
+ PublishDeliveryPolicy::Any,
+ Some("idem-default-timeout"),
+ );
+ explicit_default.timeout_ms = Some(10_000);
+
+ let first = proxy
+ .publish_event(&principal, default_timeout)
+ .await
+ .expect("first");
+ let duplicate = proxy
+ .publish_event(&principal, explicit_default)
+ .await
+ .expect("duplicate");
+ assert!(!first.deduplicated);
+ assert!(duplicate.deduplicated);
+ assert_eq!(duplicate.job.job_id, first.job.job_id);
+ }
+
+ #[tokio::test]
+ async fn publish_event_fingerprint_conflicts_on_different_effective_timeout() {
+ let identity = RadrootsIdentity::generate();
+ let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
+ let principal = principal(
+ &proxy,
+ identity.public_key_hex(),
+ vec![PublishRelayPolicy::DaemonDefaultOnly],
+ false,
+ PublishJobVisibility::Own,
+ );
+ let event = signed_event(&identity, "{}");
+ let first = publish_request(
+ event.clone(),
+ Vec::new(),
+ PublishRelayPolicy::DaemonDefaultOnly,
+ PublishDeliveryPolicy::Any,
+ Some("idem-timeout-conflict"),
+ );
+ let mut conflict = publish_request(
+ event,
+ Vec::new(),
+ PublishRelayPolicy::DaemonDefaultOnly,
+ PublishDeliveryPolicy::Any,
+ Some("idem-timeout-conflict"),
+ );
+ conflict.timeout_ms = Some(6_000);
+
+ proxy.publish_event(&principal, first).await.expect("first");
+ let error = proxy
+ .publish_event(&principal, conflict)
+ .await
+ .expect_err("timeout conflict");
+ assert!(matches!(error, PublishProxyError::IdempotencyConflict(_)));
+ }
+
+ #[tokio::test]
+ async fn publish_event_concurrency_limit_rejects_without_job_creation() {
+ let identity = RadrootsIdentity::generate();
+ let mut config = config_with_defaults(vec![RELAY_PRIMARY]);
+ config.max_concurrent_publish_jobs = 1;
+ let (proxy, adapter) = publish_proxy(config);
+ let principal = principal(
+ &proxy,
+ identity.public_key_hex(),
+ vec![PublishRelayPolicy::DaemonDefaultOnly],
+ false,
+ PublishJobVisibility::Own,
+ );
+ let _permit = proxy.acquire_publish_permit().expect("permit");
+ let error = proxy
+ .publish_event(
+ &principal,
+ publish_request(
+ signed_event(&identity, "{}"),
+ Vec::new(),
+ PublishRelayPolicy::DaemonDefaultOnly,
+ PublishDeliveryPolicy::Any,
+ Some("idem-concurrency"),
+ ),
+ )
+ .await
+ .expect_err("concurrency limit");
+ assert!(matches!(error, PublishProxyError::ConcurrencyLimit));
+ assert!(
+ proxy
+ .store
+ .list_jobs_for_principal(&principal, 50)
+ .expect("jobs")
+ .is_empty()
+ );
+ assert!(adapter.captured_raw_events().is_empty());
+ }
+
+ #[tokio::test]
async fn publish_jobs_respect_own_and_admin_visibility() {
let identity = RadrootsIdentity::generate();
let other_identity = RadrootsIdentity::generate();