radrootsd

JSON-RPC bridge for Radroots event publishing
git clone https://radroots.dev/git/radrootsd.git
Log | Files | Refs | README | LICENSE

commit 40b44da6728dd587fe9a6825468ec812fd0035c5
parent ae44b3c0408139ae819bd9779b953f61cb9eef6f
Author: triesap <tyson@radroots.org>
Date:   Tue, 23 Jun 2026 08:55:23 +0000

publish-proxy: execute daemon relay publishing

- verify signed publish events before relay transport
- resolve request, author, and daemon relay targets
- persist idempotent jobs and relay outcomes in SQLite
- cover scope, delivery, notification, and batch behavior

Diffstat:
MCargo.lock | 27+++++++++++++++++++++++++++
MCargo.toml | 2++
Mconfig.toml | 3+++
Msrc/app/config.rs | 43++++++++++++++++++++++++++++++++++++++++++-
Msrc/core/publish_proxy/mod.rs | 1307++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Msrc/transport/jsonrpc/methods/publish_proxy.rs | 189++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------
Msrc/transport/jsonrpc/server.rs | 252+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
7 files changed, 1727 insertions(+), 96 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -628,6 +628,7 @@ checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -651,6 +652,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" [[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] name = "futures-io" version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1853,6 +1865,20 @@ dependencies = [ ] [[package]] +name = "radroots_relay_transport" +version = "0.1.0-alpha.2" +dependencies = [ + "futures", + "nostr", + "radroots_events", + "radroots_nostr", + "serde", + "serde_json", + "thiserror 1.0.69", + "url", +] + +[[package]] name = "radroots_runtime" version = "0.1.0-alpha.2" dependencies = [ @@ -1899,6 +1925,7 @@ dependencies = [ "radroots_identity", "radroots_nostr", "radroots_publish_proxy_protocol", + "radroots_relay_transport", "radroots_runtime", "radroots_runtime_paths", "rand 0.9.2", diff --git a/Cargo.toml b/Cargo.toml @@ -15,6 +15,7 @@ radroots_events = { path = "../lib/crates/events" } radroots_identity = { path = "../lib/crates/identity" } radroots_nostr = { path = "../lib/crates/nostr" } radroots_publish_proxy_protocol = { path = "../lib/crates/publish_proxy_protocol" } +radroots_relay_transport = { path = "../lib/crates/relay_transport", default-features = false } radroots_runtime = { path = "../lib/crates/runtime" } [lints.rust] @@ -25,6 +26,7 @@ radroots_events = { workspace = true, features = ["serde"] } radroots_identity = { workspace = true } radroots_nostr = { workspace = true, features = ["client", "codec", "events", "http"] } radroots_publish_proxy_protocol = { workspace = true, features = ["std", "serde"] } +radroots_relay_transport = { workspace = true, features = ["std", "client"] } radroots_runtime = { workspace = true, features = ["cli"] } radroots_runtime_paths = { path = "../lib/crates/runtime_paths" } nostr = { version = "0.44.2", features = ["nip46"] } diff --git a/config.toml b/config.toml @@ -41,6 +41,9 @@ enabled = true max_event_bytes = 131072 max_relays_per_request = 20 job_list_limit = 100 +relay_url_policy = "localhost" +author_relay_discovery_relays = [] +daemon_default_publish_relays = ["ws://127.0.0.1:8080"] [config.nip46] public_jsonrpc_enabled = false diff --git a/src/app/config.rs b/src/app/config.rs @@ -69,6 +69,10 @@ fn default_publish_proxy_job_list_limit() -> usize { 100 } +fn default_publish_proxy_relay_url_policy() -> PublishProxyRelayUrlPolicy { + PublishProxyRelayUrlPolicy::Public +} + #[derive(Debug, Deserialize, Clone, Default)] struct RawServiceConfig { #[serde(default)] @@ -108,6 +112,12 @@ struct RawPublishProxyConfig { pub job_list_limit: usize, #[serde(default)] pub database_path: Option<PathBuf>, + #[serde(default = "default_publish_proxy_relay_url_policy")] + pub relay_url_policy: PublishProxyRelayUrlPolicy, + #[serde(default)] + pub author_relay_discovery_relays: Vec<String>, + #[serde(default)] + pub daemon_default_publish_relays: Vec<String>, } impl Default for RawPublishProxyConfig { @@ -119,6 +129,9 @@ impl Default for RawPublishProxyConfig { max_relays_per_request: default_publish_proxy_max_relays_per_request(), job_list_limit: default_publish_proxy_job_list_limit(), database_path: None, + relay_url_policy: default_publish_proxy_relay_url_policy(), + author_relay_discovery_relays: Vec::new(), + daemon_default_publish_relays: Vec::new(), } } } @@ -134,6 +147,9 @@ impl RawPublishProxyConfig { database_path: self .database_path .unwrap_or_else(|| paths.publish_proxy_database_path.clone()), + relay_url_policy: self.relay_url_policy, + author_relay_discovery_relays: self.author_relay_discovery_relays, + daemon_default_publish_relays: self.daemon_default_publish_relays, } } } @@ -225,6 +241,13 @@ impl Default for Nip46Config { } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum PublishProxyRelayUrlPolicy { + Public, + Localhost, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct PublishProxyConfig { #[serde(default = "default_publish_proxy_enabled")] pub enabled: bool, @@ -238,6 +261,12 @@ pub struct PublishProxyConfig { pub job_list_limit: usize, #[serde(default = "default_publish_proxy_database_path")] pub database_path: PathBuf, + #[serde(default = "default_publish_proxy_relay_url_policy")] + pub relay_url_policy: PublishProxyRelayUrlPolicy, + #[serde(default)] + pub author_relay_discovery_relays: Vec<String>, + #[serde(default)] + pub daemon_default_publish_relays: Vec<String>, } impl Default for PublishProxyConfig { @@ -249,6 +278,9 @@ impl Default for PublishProxyConfig { max_relays_per_request: default_publish_proxy_max_relays_per_request(), job_list_limit: default_publish_proxy_job_list_limit(), database_path: default_publish_proxy_database_path(), + relay_url_policy: default_publish_proxy_relay_url_policy(), + author_relay_discovery_relays: Vec::new(), + daemon_default_publish_relays: Vec::new(), } } } @@ -264,6 +296,9 @@ impl PublishProxyConfig { if self.job_list_limit == 0 { bail!("publish_proxy job_list_limit must be greater than zero"); } + if self.connect_timeout_secs == 0 { + bail!("publish_proxy connect_timeout_secs must be greater than zero"); + } Ok(()) } } @@ -347,7 +382,7 @@ mod tests { use std::path::PathBuf; use super::{ - Configuration, Nip46Config, PublishProxyConfig, RpcConfig, + Configuration, Nip46Config, PublishProxyConfig, PublishProxyRelayUrlPolicy, RpcConfig, load_settings_from_path_with_resolver, }; use crate::app::paths::{ @@ -410,6 +445,9 @@ mod tests { assert_eq!(cfg.max_relays_per_request, 20); assert_eq!(cfg.job_list_limit, 100); assert_eq!(cfg.database_path, paths.publish_proxy_database_path); + assert_eq!(cfg.relay_url_policy, PublishProxyRelayUrlPolicy::Public); + assert!(cfg.author_relay_discovery_relays.is_empty()); + assert!(cfg.daemon_default_publish_relays.is_empty()); } #[test] @@ -441,6 +479,9 @@ mod tests { let mut cfg = PublishProxyConfig::default(); cfg.job_list_limit = 0; assert!(cfg.validate().is_err()); + let mut cfg = PublishProxyConfig::default(); + cfg.connect_timeout_secs = 0; + assert!(cfg.validate().is_err()); } #[test] diff --git a/src/core/publish_proxy/mod.rs b/src/core/publish_proxy/mod.rs @@ -1,13 +1,27 @@ +use std::collections::BTreeMap; use std::fmt; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::{Arc, Mutex}; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use radroots_events::RadrootsNostrEvent; +use radroots_events::draft::{ + RadrootsDraftError, RadrootsSignedNostrEvent, RadrootsSignedNostrEventParts, +}; +use radroots_nostr::prelude::{ + RadrootsNostrClient, RadrootsNostrEventVerification, RadrootsNostrFilter, RadrootsNostrKind, + RadrootsNostrPublicKey, radroots_nostr_verify_event, +}; use radroots_publish_proxy_protocol::{ PublishDeliveryPolicy, PublishEventRequest, PublishEventResponse, PublishJobStatus, PublishJobView, PublishRelayOutcome, PublishRelayOutcomeKind, PublishRelayPolicy, - PublishRelaySource, + PublishRelaySource, SignedNostrEventWire, +}; +use radroots_relay_transport::{ + RadrootsNostrClientPublishAdapter, RadrootsRelayOutcome, RadrootsRelayOutcomeKind, + RadrootsRelayPublishAdapter, RadrootsRelayPublishRelayReceipt, RadrootsRelayPublishRequest, + RadrootsRelayTargetSet, RadrootsRelayTransportError, RadrootsRelayUrl, RadrootsRelayUrlPolicy, }; use rusqlite::types::Type; use rusqlite::{Connection, OptionalExtension, Row, params}; @@ -20,7 +34,7 @@ use crate::app::config::PublishProxyConfig; const TOKEN_PREFIX: &str = "rrd_pp_"; const TOKEN_HASH_PREFIX: &str = "sha256:"; -const SCHEMA_VERSION: i64 = 1; +const SCHEMA_VERSION: i64 = 2; #[derive(Debug, Error)] pub enum PublishProxyError { @@ -32,6 +46,16 @@ pub enum PublishProxyError { Io(#[from] std::io::Error), #[error("invalid publish proxy scope: {0}")] InvalidScope(String), + #[error("invalid signed Nostr event: {0}")] + InvalidSignedEvent(String), + #[error("signed Nostr event verification failed: {0:?}")] + SignedEventVerification(RadrootsNostrEventVerification), + #[error("signed Nostr event conversion error: {0}")] + Draft(#[from] RadrootsDraftError), + #[error("publish proxy relay error: {0}")] + Relay(#[from] RadrootsRelayTransportError), + #[error("publish proxy transport error: {0}")] + Transport(String), #[error("publish proxy idempotency conflict for key `{0}`")] IdempotencyConflict(String), } @@ -40,17 +64,330 @@ pub enum PublishProxyError { pub struct PublishProxy { pub config: PublishProxyConfig, pub store: PublishProxyStore, + publisher: Option<Arc<dyn RadrootsRelayPublishAdapter>>, } impl PublishProxy { pub fn open(config: PublishProxyConfig) -> Result<Self, PublishProxyError> { let store = PublishProxyStore::open(config.database_path.clone())?; - Ok(Self { config, store }) + Ok(Self { + config, + store, + publisher: None, + }) } pub fn memory(config: PublishProxyConfig) -> Result<Self, PublishProxyError> { let store = PublishProxyStore::memory()?; - Ok(Self { config, store }) + Ok(Self { + config, + store, + publisher: None, + }) + } + + pub fn with_publisher(mut self, publisher: Arc<dyn RadrootsRelayPublishAdapter>) -> Self { + self.publisher = Some(publisher); + self + } + + pub async fn publish_event( + &self, + principal: &PublishPrincipal, + request: PublishEventRequest, + ) -> Result<PublishEventResponse, PublishProxyError> { + request + .validate(self.config.max_relays_per_request) + .map_err(|error| { + PublishProxyError::InvalidSignedEvent(format!( + "publish request validation failed: {error}" + )) + })?; + principal.allows_event(&request)?; + let signed_event = signed_event_from_wire(&request.event)?; + if signed_event.raw_json.len() > self.config.max_event_bytes { + return Err(PublishProxyError::InvalidSignedEvent( + "signed event exceeds publish_proxy max_event_bytes".to_owned(), + )); + } + let request_fingerprint = request_intent_fingerprint( + principal.principal_id.as_str(), + signed_event.raw_json.as_str(), + &request, + )?; + let resolution = self + .resolve_relays_for_request(signed_event.pubkey.as_str(), &request) + .await?; + let response = self.store.record_publish_job(PublishJobInsert { + principal_id: principal.principal_id.clone(), + idempotency_key: request.idempotency_key.clone(), + request: request.clone(), + request_fingerprint, + effective_relay_count: resolution.targets.len(), + })?; + if response.deduplicated { + return Ok(response); + } + let completed = self + .complete_job_execution( + response.job.job_id.as_str(), + signed_event, + request.delivery_policy.clone(), + request.timeout_ms, + resolution, + ) + .await?; + Ok(PublishEventResponse { + deduplicated: false, + job: completed, + }) + } + + pub async fn resolve_relays_for_request( + &self, + pubkey: &str, + request: &PublishEventRequest, + ) -> Result<PublishRelayResolution, PublishProxyError> { + match request.relay_policy { + PublishRelayPolicy::ExplicitOnly => self.resolve_request_relays(&request.relays), + PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault => { + if !request.relays.is_empty() { + self.resolve_request_relays(&request.relays) + } else { + self.resolve_author_or_default_relays(pubkey).await + } + } + PublishRelayPolicy::AuthorWriteThenDaemonDefault => { + self.resolve_author_or_default_relays(pubkey).await + } + PublishRelayPolicy::DaemonDefaultOnly => self.resolve_daemon_default_relays(), + } + } + + async fn resolve_author_or_default_relays( + &self, + pubkey: &str, + ) -> Result<PublishRelayResolution, PublishProxyError> { + let author_relays = self.resolve_author_write_relays(pubkey).await?; + if !author_relays.targets.is_empty() { + Ok(author_relays) + } else { + self.resolve_daemon_default_relays() + } + } + + fn resolve_request_relays( + &self, + relays: &[String], + ) -> Result<PublishRelayResolution, PublishProxyError> { + let mut targets = Vec::new(); + let mut outcomes = Vec::new(); + for relay in relays { + match RadrootsRelayUrl::parse(relay, relay_url_policy(&self.config)) { + Ok(url) => push_resolved_relay(&mut targets, url, PublishRelaySource::Request), + Err(error) => outcomes.push(PublishRelayOutcome { + relay_url: relay.trim().to_owned(), + source: PublishRelaySource::Request, + attempted: false, + outcome_kind: PublishRelayOutcomeKind::RelayUrlRejected, + message: Some(error.to_string()), + latency_ms: None, + }), + } + } + Ok(PublishRelayResolution { targets, outcomes }) + } + + async fn resolve_author_write_relays( + &self, + pubkey: &str, + ) -> Result<PublishRelayResolution, PublishProxyError> { + let cached = self.store.cached_author_write_relays(pubkey)?; + let cached_targets = self.resolve_author_relay_inputs(&cached)?; + if !cached_targets.is_empty() { + return Ok(PublishRelayResolution { + targets: cached_targets, + outcomes: Vec::new(), + }); + } + if self.config.author_relay_discovery_relays.is_empty() { + return Ok(PublishRelayResolution::empty()); + } + let discovery_targets = self.resolve_config_relays( + &self.config.author_relay_discovery_relays, + PublishRelaySource::DaemonDefault, + "publish_proxy author_relay_discovery_relays", + )?; + let discovered = self + .fetch_author_write_relays(pubkey, discovery_targets) + .await?; + self.store.cache_author_write_relays(pubkey, &discovered)?; + let targets = self.resolve_author_relay_inputs(&discovered)?; + Ok(PublishRelayResolution { + targets, + outcomes: Vec::new(), + }) + } + + fn resolve_author_relay_inputs( + &self, + relays: &[String], + ) -> Result<Vec<ResolvedPublishRelay>, PublishProxyError> { + let mut targets = Vec::new(); + for relay in relays { + if let Ok(url) = RadrootsRelayUrl::parse(relay, relay_url_policy(&self.config)) { + push_resolved_relay(&mut targets, url, PublishRelaySource::AuthorWrite); + } + } + Ok(targets) + } + + fn resolve_daemon_default_relays(&self) -> Result<PublishRelayResolution, PublishProxyError> { + let targets = self.resolve_config_relays( + &self.config.daemon_default_publish_relays, + PublishRelaySource::DaemonDefault, + "publish_proxy daemon_default_publish_relays", + )?; + Ok(PublishRelayResolution { + targets, + outcomes: Vec::new(), + }) + } + + fn resolve_config_relays( + &self, + relays: &[String], + source: PublishRelaySource, + label: &str, + ) -> Result<Vec<ResolvedPublishRelay>, PublishProxyError> { + let mut targets = Vec::new(); + for relay in relays { + let url = RadrootsRelayUrl::parse(relay, relay_url_policy(&self.config)).map_err( + |error| { + PublishProxyError::InvalidScope(format!( + "{label} contains invalid relay URL: {error}" + )) + }, + )?; + push_resolved_relay(&mut targets, url, source); + } + Ok(targets) + } + + async fn fetch_author_write_relays( + &self, + pubkey: &str, + discovery_targets: Vec<ResolvedPublishRelay>, + ) -> Result<Vec<String>, PublishProxyError> { + let Ok(public_key) = RadrootsNostrPublicKey::from_hex(pubkey) else { + return Ok(Vec::new()); + }; + let client = RadrootsNostrClient::new_signerless(); + for target in discovery_targets { + if client.add_read_relay(target.url.as_str()).await.is_err() { + return Ok(Vec::new()); + } + } + let filter = RadrootsNostrFilter::new() + .author(public_key) + .kind(RadrootsNostrKind::Custom(10_002)) + .limit(10); + let timeout = Duration::from_secs(self.config.connect_timeout_secs); + let Ok(events) = client.fetch_events(filter, timeout).await else { + return Ok(Vec::new()); + }; + let Some(event) = events.into_iter().max_by(|left, right| { + left.created_at + .as_secs() + .cmp(&right.created_at.as_secs()) + .then_with(|| left.id.to_hex().cmp(&right.id.to_hex())) + }) else { + return Ok(Vec::new()); + }; + Ok(author_write_relays_from_nip65_event(&event)) + } + + async fn complete_job_execution( + &self, + job_id: &str, + signed_event: RadrootsSignedNostrEvent, + delivery_policy: PublishDeliveryPolicy, + timeout_ms: Option<u64>, + resolution: PublishRelayResolution, + ) -> Result<PublishJobView, PublishProxyError> { + if resolution.targets.is_empty() { + self.store.complete_publish_job( + job_id, + PublishJobStatus::Rejected, + resolution.outcomes, + Some("no_publish_relays".to_owned()), + )?; + return self.store.job_by_id(job_id); + } + let required_ack_count = delivery_policy.required_ack_count(resolution.targets.len()); + if required_ack_count > resolution.targets.len() { + self.store.complete_publish_job( + job_id, + PublishJobStatus::Rejected, + resolution.outcomes, + Some("delivery_quorum_exceeds_relay_count".to_owned()), + )?; + return self.store.job_by_id(job_id); + } + let source_by_relay = resolution.source_by_relay(); + let target_set = RadrootsRelayTargetSet::from_urls( + resolution + .targets + .iter() + .map(|target| target.url.clone()) + .collect(), + )?; + let publish_request = + RadrootsRelayPublishRequest::new(signed_event, target_set, current_unix_millis()) + .with_accepted_quorum(required_ack_count); + let started = Instant::now(); + let publish_timeout = Duration::from_millis( + timeout_ms.unwrap_or_else(|| self.config.connect_timeout_secs.saturating_mul(1_000)), + ); + let receipts = + match tokio::time::timeout(publish_timeout, self.publish_with_adapter(publish_request)) + .await + { + Ok(Ok(receipts)) => receipts, + Ok(Err(error)) => transport_error_receipts(&resolution.targets, error), + Err(_) => timeout_receipts(&resolution.targets), + }; + let latency_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX); + let mut outcomes = resolution.outcomes; + outcomes.extend(receipts.into_iter().map(|receipt| { + publish_outcome_from_receipt(receipt, &source_by_relay, Some(latency_ms)) + })); + let status = delivery_status(&delivery_policy, resolution.targets.len(), &outcomes); + let last_error = if status == PublishJobStatus::DeliverySatisfied { + None + } else { + Some("delivery_unsatisfied".to_owned()) + }; + self.store + .complete_publish_job(job_id, status, outcomes, last_error)?; + self.store.job_by_id(job_id) + } + + async fn publish_with_adapter( + &self, + request: RadrootsRelayPublishRequest, + ) -> Result<Vec<RadrootsRelayPublishRelayReceipt>, PublishProxyError> { + if let Some(publisher) = &self.publisher { + return publisher + .publish(request) + .await + .map_err(PublishProxyError::Relay); + } + let adapter = RadrootsNostrClientPublishAdapter::new(RadrootsNostrClient::new_signerless()); + adapter + .publish(request) + .await + .map_err(PublishProxyError::Relay) } } @@ -153,6 +490,36 @@ pub struct PublishJobInsert { pub principal_id: String, pub idempotency_key: Option<String>, pub request: PublishEventRequest, + pub request_fingerprint: String, + pub effective_relay_count: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ResolvedPublishRelay { + pub url: RadrootsRelayUrl, + pub source: PublishRelaySource, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PublishRelayResolution { + pub targets: Vec<ResolvedPublishRelay>, + pub outcomes: Vec<PublishRelayOutcome>, +} + +impl PublishRelayResolution { + fn empty() -> Self { + Self { + targets: Vec::new(), + outcomes: Vec::new(), + } + } + + fn source_by_relay(&self) -> BTreeMap<String, PublishRelaySource> { + self.targets + .iter() + .map(|target| (target.url.as_str().to_owned(), target.source)) + .collect() + } } impl PublishProxyStore { @@ -192,6 +559,7 @@ impl PublishProxyStore { job_id TEXT PRIMARY KEY NOT NULL, principal_id TEXT NOT NULL, idempotency_key TEXT, + request_fingerprint TEXT NOT NULL, status TEXT NOT NULL, event_id TEXT NOT NULL, event_pubkey TEXT NOT NULL, @@ -199,6 +567,7 @@ impl PublishProxyStore { relay_policy_json TEXT NOT NULL, delivery_policy_json TEXT NOT NULL, requested_relay_count INTEGER NOT NULL, + effective_relay_count INTEGER NOT NULL, request_json TEXT NOT NULL, requested_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL, @@ -228,6 +597,7 @@ impl PublishProxyStore { ); "#, )?; + migrate_schema(&connection)?; connection.pragma_update(None, "user_version", SCHEMA_VERSION)?; Ok(Self { inner: Arc::new(Mutex::new(connection)), @@ -351,9 +721,14 @@ impl PublishProxyStore { if let Some(existing) = self.job_for_principal_id_and_key(insert.principal_id.as_str(), idempotency_key)? { + if existing.request_fingerprint != insert.request_fingerprint { + return Err(PublishProxyError::IdempotencyConflict( + idempotency_key.to_owned(), + )); + } return Ok(PublishEventResponse { deduplicated: true, - job: existing, + job: existing.view, }); } } @@ -371,6 +746,7 @@ impl PublishProxyStore { job_id, principal_id, idempotency_key, + request_fingerprint, status, event_id, event_pubkey, @@ -378,23 +754,26 @@ impl PublishProxyStore { relay_policy_json, delivery_policy_json, requested_relay_count, + effective_relay_count, request_json, requested_at_ms, updated_at_ms ) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15) "#, params![ job_id, insert.principal_id, insert.idempotency_key, - serde_json::to_string(&PublishJobStatus::Accepted)?, + insert.request_fingerprint, + serde_json::to_string(&PublishJobStatus::Publishing)?, insert.request.event.id, insert.request.event.pubkey, insert.request.event.kind, serde_json::to_string(&insert.request.relay_policy)?, serde_json::to_string(&insert.request.delivery_policy)?, insert.request.relays.len(), + insert.effective_relay_count, request_json, now, now, @@ -412,9 +791,7 @@ impl PublishProxyStore { Err(error) => return Err(error.into()), } drop(connection); - let job = self - .job_by_id_for_principal_id(job_id.as_str(), insert.principal_id.as_str())? - .ok_or_else(|| PublishProxyError::InvalidScope("created job missing".to_owned()))?; + let job = self.job_by_id(job_id.as_str())?; Ok(PublishEventResponse { deduplicated: false, job, @@ -487,7 +864,7 @@ impl PublishProxyStore { &self, principal_id: &str, idempotency_key: &str, - ) -> Result<Option<PublishJobView>, PublishProxyError> { + ) -> Result<Option<PublishJobRow>, PublishProxyError> { let connection = self .inner .lock() @@ -506,29 +883,135 @@ impl PublishProxyStore { }; job.view.relays = self.relay_outcomes(job.view.job_id.as_str())?; finalize_job_view(&mut job.view); - Ok(Some(job.view)) + Ok(Some(job)) } - fn job_by_id_for_principal_id( - &self, - job_id: &str, - principal_id: &str, - ) -> Result<Option<PublishJobView>, PublishProxyError> { + pub fn job_by_id(&self, job_id: &str) -> Result<PublishJobView, PublishProxyError> { let connection = self .inner .lock() .unwrap_or_else(std::sync::PoisonError::into_inner); - let sql = job_select_sql("WHERE job_id = ?1 AND principal_id = ?2"); + let sql = job_select_sql("WHERE job_id = ?1"); let row = connection - .query_row(sql.as_str(), params![job_id, principal_id], job_from_row) + .query_row(sql.as_str(), params![job_id], job_from_row) .optional()?; drop(connection); let Some(mut job) = row else { - return Ok(None); + return Err(PublishProxyError::InvalidScope( + "unknown publish job".to_owned(), + )); }; job.view.relays = self.relay_outcomes(job.view.job_id.as_str())?; finalize_job_view(&mut job.view); - Ok(Some(job.view)) + Ok(job.view) + } + + pub fn complete_publish_job( + &self, + job_id: &str, + status: PublishJobStatus, + outcomes: Vec<PublishRelayOutcome>, + last_error: Option<String>, + ) -> Result<(), PublishProxyError> { + let now = current_unix_millis(); + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + connection.execute( + r#" + UPDATE publish_proxy_jobs + SET status = ?2, + updated_at_ms = ?3, + completed_at_ms = ?4, + last_error = ?5 + WHERE job_id = ?1 + "#, + params![ + job_id, + serde_json::to_string(&status)?, + now, + now, + last_error, + ], + )?; + connection.execute( + "DELETE FROM publish_proxy_relay_results WHERE job_id = ?1", + params![job_id], + )?; + for outcome in outcomes { + connection.execute( + r#" + INSERT OR REPLACE INTO publish_proxy_relay_results ( + job_id, + relay_url, + source, + attempted, + outcome_kind, + message, + latency_ms, + updated_at_ms + ) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) + "#, + params![ + job_id, + outcome.relay_url, + serde_json::to_string(&outcome.source)?, + outcome.attempted, + serde_json::to_string(&outcome.outcome_kind)?, + outcome.message, + outcome + .latency_ms + .and_then(|value| i64::try_from(value).ok()), + now, + ], + )?; + } + Ok(()) + } + + pub fn cached_author_write_relays( + &self, + pubkey: &str, + ) -> Result<Vec<String>, PublishProxyError> { + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let relays_json = connection + .query_row( + "SELECT relays_json FROM publish_proxy_relay_list_cache WHERE pubkey = ?1", + params![pubkey], + |row| row.get::<_, String>(0), + ) + .optional()?; + relays_json + .map(|value| serde_json::from_str(value.as_str()).map_err(PublishProxyError::from)) + .unwrap_or_else(|| Ok(Vec::new())) + } + + pub fn cache_author_write_relays( + &self, + pubkey: &str, + relays: &[String], + ) -> Result<(), PublishProxyError> { + let now = current_unix_millis(); + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + connection.execute( + r#" + INSERT INTO publish_proxy_relay_list_cache (pubkey, relays_json, updated_at_ms) + VALUES (?1, ?2, ?3) + ON CONFLICT(pubkey) DO UPDATE SET + relays_json = excluded.relays_json, + updated_at_ms = excluded.updated_at_ms + "#, + params![pubkey, serde_json::to_string(relays)?, now], + )?; + Ok(()) } fn relay_outcomes(&self, job_id: &str) -> Result<Vec<PublishRelayOutcome>, PublishProxyError> { @@ -553,22 +1036,59 @@ impl PublishProxyStore { struct PublishJobRow { principal_id: String, + request_fingerprint: String, view: PublishJobView, } +fn migrate_schema(connection: &Connection) -> Result<(), PublishProxyError> { + let version: i64 = connection.pragma_query_value(None, "user_version", |row| row.get(0))?; + if version < 2 { + if !table_has_column(connection, "publish_proxy_jobs", "request_fingerprint")? { + connection.execute( + "ALTER TABLE publish_proxy_jobs ADD COLUMN request_fingerprint TEXT NOT NULL DEFAULT ''", + [], + )?; + } + if !table_has_column(connection, "publish_proxy_jobs", "effective_relay_count")? { + connection.execute( + "ALTER TABLE publish_proxy_jobs ADD COLUMN effective_relay_count INTEGER NOT NULL DEFAULT 0", + [], + )?; + connection.execute( + "UPDATE publish_proxy_jobs SET effective_relay_count = requested_relay_count WHERE effective_relay_count = 0", + [], + )?; + } + } + Ok(()) +} + +fn table_has_column( + connection: &Connection, + table: &str, + column: &str, +) -> Result<bool, PublishProxyError> { + let mut stmt = connection.prepare(format!("PRAGMA table_info({table})").as_str())?; + let columns = stmt + .query_map([], |row| row.get::<_, String>(1))? + .collect::<Result<Vec<_>, _>>()?; + Ok(columns.iter().any(|existing| existing == column)) +} + fn job_select_sql(tail: &str) -> String { format!( r#" SELECT job_id, principal_id, + request_fingerprint, status, event_id, event_pubkey, event_kind, relay_policy_json, delivery_policy_json, - requested_relay_count, + effective_relay_count, requested_at_ms, completed_at_ms, last_error @@ -594,29 +1114,30 @@ fn principal_from_row(row: &Row<'_>) -> Result<PublishPrincipal, rusqlite::Error } fn job_from_row(row: &Row<'_>) -> Result<PublishJobRow, rusqlite::Error> { - let status: PublishJobStatus = json_text(row, 2)?; - let relay_policy: PublishRelayPolicy = json_text(row, 6)?; - let delivery_policy: PublishDeliveryPolicy = json_text(row, 7)?; - let relay_count: i64 = row.get(8)?; + let status: PublishJobStatus = json_text(row, 3)?; + let relay_policy: PublishRelayPolicy = json_text(row, 7)?; + let delivery_policy: PublishDeliveryPolicy = json_text(row, 8)?; + let relay_count: i64 = row.get(9)?; Ok(PublishJobRow { principal_id: row.get(1)?, + request_fingerprint: row.get(2)?, view: PublishJobView { job_id: row.get(0)?, status, terminal: false, delivery_satisfied: false, - event_id: row.get(3)?, - pubkey: row.get(4)?, - event_kind: row.get::<_, i64>(5)? as u32, + event_id: row.get(4)?, + pubkey: row.get(5)?, + event_kind: row.get::<_, i64>(6)? as u32, relay_policy, delivery_policy, relay_count: usize::try_from(relay_count).unwrap_or(0), acknowledged_count: 0, retryable_count: 0, terminal_count: 0, - requested_at_ms: row.get(9)?, - completed_at_ms: row.get(10)?, - last_error: row.get(11)?, + requested_at_ms: row.get(10)?, + completed_at_ms: row.get(11)?, + last_error: row.get(12)?, relays: Vec::new(), }, }) @@ -737,6 +1258,211 @@ pub fn parse_relay_policy(value: &str) -> Result<PublishRelayPolicy, PublishProx } } +fn signed_event_from_wire( + event: &SignedNostrEventWire, +) -> Result<RadrootsSignedNostrEvent, PublishProxyError> { + event + .validate() + .map_err(|error| PublishProxyError::InvalidSignedEvent(error.to_string()))?; + let created_at = u32::try_from(event.created_at).map_err(|_| { + PublishProxyError::InvalidSignedEvent( + "signed event created_at exceeds daemon-supported range".to_owned(), + ) + })?; + let raw_json = serde_json::to_string(event)?; + let radroots_event = RadrootsNostrEvent { + id: event.id.clone(), + author: event.pubkey.clone(), + created_at, + kind: event.kind, + tags: event.tags.clone(), + content: event.content.clone(), + sig: event.sig.clone(), + }; + match radroots_nostr_verify_event(&radroots_event) { + RadrootsNostrEventVerification::Verified => {} + verification => return Err(PublishProxyError::SignedEventVerification(verification)), + } + RadrootsSignedNostrEvent::new(RadrootsSignedNostrEventParts { + id: event.id.clone(), + pubkey: event.pubkey.clone(), + created_at, + kind: event.kind, + tags: event.tags.clone(), + content: event.content.clone(), + sig: event.sig.clone(), + raw_json, + }) + .map_err(PublishProxyError::from) +} + +fn request_intent_fingerprint( + principal_id: &str, + canonical_event_json: &str, + request: &PublishEventRequest, +) -> Result<String, PublishProxyError> { + #[derive(Serialize)] + struct FingerprintInput<'a> { + principal_id: &'a str, + canonical_event_json: &'a str, + relays: Vec<String>, + relay_policy: &'a PublishRelayPolicy, + delivery_policy: &'a PublishDeliveryPolicy, + } + + let input = FingerprintInput { + principal_id, + canonical_event_json, + relays: request + .relays + .iter() + .map(|relay| relay.trim().to_owned()) + .collect(), + relay_policy: &request.relay_policy, + delivery_policy: &request.delivery_policy, + }; + let bytes = serde_json::to_vec(&input)?; + let mut hasher = Sha256::new(); + hasher.update(bytes); + Ok(hex_lower(&hasher.finalize())) +} + +fn push_resolved_relay( + targets: &mut Vec<ResolvedPublishRelay>, + url: RadrootsRelayUrl, + source: PublishRelaySource, +) { + if !targets.iter().any(|target| target.url == url) { + targets.push(ResolvedPublishRelay { url, source }); + } +} + +fn relay_url_policy(config: &PublishProxyConfig) -> RadrootsRelayUrlPolicy { + match config.relay_url_policy { + crate::app::config::PublishProxyRelayUrlPolicy::Public => RadrootsRelayUrlPolicy::Public, + crate::app::config::PublishProxyRelayUrlPolicy::Localhost => { + RadrootsRelayUrlPolicy::Localhost + } + } +} + +fn author_write_relays_from_nip65_event( + event: &radroots_nostr::prelude::RadrootsNostrEvent, +) -> Vec<String> { + event + .tags + .iter() + .filter_map(|tag| { + let values = tag.as_slice(); + if values.first().map(String::as_str) != Some("r") { + return None; + } + let relay = values.get(1)?.trim(); + if relay.is_empty() { + return None; + } + if values.get(2).map(String::as_str) == Some("read") { + return None; + } + Some(relay.to_owned()) + }) + .collect() +} + +fn publish_outcome_from_receipt( + receipt: RadrootsRelayPublishRelayReceipt, + source_by_relay: &BTreeMap<String, PublishRelaySource>, + latency_ms: Option<u64>, +) -> PublishRelayOutcome { + let source = source_by_relay + .get(receipt.relay_url.as_str()) + .copied() + .unwrap_or(PublishRelaySource::DaemonDefault); + PublishRelayOutcome { + relay_url: receipt.relay_url, + source, + attempted: receipt.attempted, + outcome_kind: publish_outcome_kind(receipt.outcome.kind), + message: receipt.outcome.message, + latency_ms, + } +} + +fn publish_outcome_kind(kind: RadrootsRelayOutcomeKind) -> PublishRelayOutcomeKind { + match kind { + RadrootsRelayOutcomeKind::Accepted => PublishRelayOutcomeKind::Accepted, + RadrootsRelayOutcomeKind::DuplicateAccepted => PublishRelayOutcomeKind::DuplicateAccepted, + RadrootsRelayOutcomeKind::Blocked => PublishRelayOutcomeKind::Blocked, + RadrootsRelayOutcomeKind::RateLimited => PublishRelayOutcomeKind::RateLimited, + RadrootsRelayOutcomeKind::Invalid => PublishRelayOutcomeKind::Invalid, + RadrootsRelayOutcomeKind::PowRequired => PublishRelayOutcomeKind::PowRequired, + RadrootsRelayOutcomeKind::Restricted => PublishRelayOutcomeKind::Restricted, + RadrootsRelayOutcomeKind::AuthRequired => PublishRelayOutcomeKind::AuthRequired, + RadrootsRelayOutcomeKind::Muted => PublishRelayOutcomeKind::Muted, + RadrootsRelayOutcomeKind::Unsupported => PublishRelayOutcomeKind::Unsupported, + RadrootsRelayOutcomeKind::PaymentRequired => PublishRelayOutcomeKind::PaymentRequired, + RadrootsRelayOutcomeKind::Error => PublishRelayOutcomeKind::Error, + RadrootsRelayOutcomeKind::Timeout => PublishRelayOutcomeKind::Timeout, + RadrootsRelayOutcomeKind::ConnectionFailed => PublishRelayOutcomeKind::ConnectionFailed, + RadrootsRelayOutcomeKind::RelayUrlRejected => PublishRelayOutcomeKind::RelayUrlRejected, + RadrootsRelayOutcomeKind::SkippedAlreadyAccepted => { + PublishRelayOutcomeKind::SkippedAlreadyAccepted + } + RadrootsRelayOutcomeKind::Unknown => PublishRelayOutcomeKind::Unknown, + } +} + +fn delivery_status( + delivery_policy: &PublishDeliveryPolicy, + relay_count: usize, + outcomes: &[PublishRelayOutcome], +) -> PublishJobStatus { + let required = delivery_policy.required_ack_count(relay_count); + let acknowledged = outcomes + .iter() + .filter(|outcome| outcome.outcome_kind.counts_toward_quorum()) + .count(); + if acknowledged >= required { + return PublishJobStatus::DeliverySatisfied; + } + if outcomes + .iter() + .any(|outcome| outcome.outcome_kind.is_retryable()) + { + PublishJobStatus::DeliveryUnsatisfiedRetryable + } else { + PublishJobStatus::DeliveryUnsatisfiedTerminal + } +} + +fn timeout_receipts(targets: &[ResolvedPublishRelay]) -> Vec<RadrootsRelayPublishRelayReceipt> { + targets + .iter() + .map(|target| { + RadrootsRelayPublishRelayReceipt::attempted( + target.url.as_str(), + RadrootsRelayOutcome::timeout("timeout: publish attempt exceeded daemon bound"), + ) + }) + .collect() +} + +fn transport_error_receipts( + targets: &[ResolvedPublishRelay], + error: PublishProxyError, +) -> Vec<RadrootsRelayPublishRelayReceipt> { + let message = format!("error: {error}"); + targets + .iter() + .map(|target| { + RadrootsRelayPublishRelayReceipt::attempted( + target.url.as_str(), + RadrootsRelayOutcome::connection_failed(message.clone()), + ) + }) + .collect() +} + pub fn write_token_file(path: &Path, token: &str) -> Result<(), PublishProxyError> { if let Some(parent) = path .parent() @@ -816,12 +1542,25 @@ fn current_unix_millis() -> i64 { #[cfg(test)] mod tests { use super::{ - PublishJobInsert, PublishJobVisibility, PublishPrincipalInit, PublishProxyStore, - generate_bearer_token, hash_bearer_token, parse_relay_policy, + PublishJobInsert, PublishJobVisibility, PublishPrincipal, PublishPrincipalInit, + PublishProxy, PublishProxyError, PublishProxyStore, generate_bearer_token, + hash_bearer_token, parse_relay_policy, + }; + use crate::app::config::PublishProxyConfig; + use nostr::JsonUtil; + use radroots_identity::RadrootsIdentity; + use radroots_nostr::prelude::{ + RadrootsNostrEventVerification, RadrootsNostrTimestamp, radroots_nostr_build_event, }; use radroots_publish_proxy_protocol::{ - PublishDeliveryPolicy, PublishEventRequest, PublishRelayPolicy, SignedNostrEventWire, + PublishDeliveryPolicy, PublishEventRequest, PublishJobStatus, PublishRelayOutcomeKind, + PublishRelayPolicy, PublishRelaySource, SignedNostrEventWire, }; + use radroots_relay_transport::{RadrootsMockRelayPublishAdapter, RadrootsRelayOutcome}; + use std::sync::Arc; + + const RELAY_PRIMARY: &str = "wss://relay.example.com"; + const RELAY_SECONDARY: &str = "wss://relay-2.example.com"; fn event(pubkey: &str, kind: u32) -> SignedNostrEventWire { SignedNostrEventWire { @@ -846,6 +1585,75 @@ mod tests { } } + fn signed_event(identity: &RadrootsIdentity, content: &str) -> SignedNostrEventWire { + let event = radroots_nostr_build_event( + 30_402, + content, + vec![vec!["d".to_owned(), "listing-1".to_owned()]], + ) + .expect("event builder") + .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_000)) + .sign_with_keys(identity.keys()) + .expect("signed event"); + serde_json::from_str(event.as_json().as_str()).expect("event wire") + } + + fn publish_request( + event: SignedNostrEventWire, + relays: Vec<String>, + relay_policy: PublishRelayPolicy, + delivery_policy: PublishDeliveryPolicy, + idempotency_key: Option<&str>, + ) -> PublishEventRequest { + PublishEventRequest { + event, + relays, + relay_policy, + delivery_policy, + idempotency_key: idempotency_key.map(str::to_owned), + timeout_ms: Some(5_000), + } + } + + fn publish_proxy( + config: PublishProxyConfig, + ) -> (PublishProxy, RadrootsMockRelayPublishAdapter) { + let adapter = RadrootsMockRelayPublishAdapter::new(); + let proxy = PublishProxy::memory(config) + .expect("proxy") + .with_publisher(Arc::new(adapter.clone())); + (proxy, adapter) + } + + fn principal( + proxy: &PublishProxy, + pubkey: String, + policies: Vec<PublishRelayPolicy>, + allow_request_relays: bool, + visibility: PublishJobVisibility, + ) -> PublishPrincipal { + proxy + .store + .create_principal(PublishPrincipalInit { + label: "tester".to_owned(), + token_hash: hash_bearer_token(generate_bearer_token().as_str()), + allowed_pubkeys: vec![pubkey], + allowed_kinds: vec![30_402], + allowed_relay_policies: policies, + allow_request_relays, + job_visibility: visibility, + expires_at_unix: None, + }) + .expect("principal") + } + + fn config_with_defaults(relays: Vec<&str>) -> PublishProxyConfig { + PublishProxyConfig { + daemon_default_publish_relays: relays.into_iter().map(str::to_owned).collect(), + ..PublishProxyConfig::default() + } + } + #[test] fn token_generation_and_hashing_do_not_store_plaintext() { let token = generate_bearer_token(); @@ -899,6 +1707,8 @@ mod tests { principal_id: principal.principal_id.clone(), idempotency_key: Some("idem-1".to_owned()), request: accepted.clone(), + request_fingerprint: "fingerprint-1".to_owned(), + effective_relay_count: 1, }) .expect("record job"); assert!(!response.deduplicated); @@ -907,6 +1717,8 @@ mod tests { principal_id: principal.principal_id.clone(), idempotency_key: Some("idem-1".to_owned()), request: accepted, + request_fingerprint: "fingerprint-1".to_owned(), + effective_relay_count: 1, }) .expect("dedupe"); assert!(duplicate.deduplicated); @@ -919,4 +1731,427 @@ mod tests { 1 ); } + + #[tokio::test] + async fn publish_event_verifies_and_records_daemon_default_outcome() { + let identity = RadrootsIdentity::generate(); + let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); + let principal = principal( + &proxy, + identity.public_key_hex(), + vec![PublishRelayPolicy::DaemonDefaultOnly], + false, + PublishJobVisibility::Own, + ); + let event = signed_event(&identity, "{}"); + let raw_event = serde_json::to_string(&event).expect("raw event"); + let response = proxy + .publish_event( + &principal, + publish_request( + event, + Vec::new(), + PublishRelayPolicy::DaemonDefaultOnly, + PublishDeliveryPolicy::Any, + Some("idem-valid"), + ), + ) + .await + .expect("publish"); + + assert!(!response.deduplicated); + assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied); + assert_eq!(response.job.relay_count, 1); + assert_eq!(response.job.acknowledged_count, 1); + assert_eq!(response.job.relays[0].relay_url, RELAY_PRIMARY); + assert_eq!( + response.job.relays[0].source, + PublishRelaySource::DaemonDefault + ); + assert_eq!(adapter.captured_raw_events(), vec![raw_event]); + } + + #[tokio::test] + async fn publish_event_rejects_tampered_content_before_publish() { + let identity = RadrootsIdentity::generate(); + let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); + let principal = principal( + &proxy, + identity.public_key_hex(), + vec![PublishRelayPolicy::DaemonDefaultOnly], + false, + PublishJobVisibility::Own, + ); + let mut event = signed_event(&identity, "trusted"); + event.content = "tampered".to_owned(); + let error = proxy + .publish_event( + &principal, + publish_request( + event, + Vec::new(), + PublishRelayPolicy::DaemonDefaultOnly, + PublishDeliveryPolicy::Any, + None, + ), + ) + .await + .expect_err("tampered event should fail"); + + assert!(matches!( + error, + PublishProxyError::SignedEventVerification(RadrootsNostrEventVerification::IdMismatch) + )); + assert!(adapter.captured_raw_events().is_empty()); + } + + #[tokio::test] + async fn publish_event_rejects_wrong_signature_before_publish() { + let identity = RadrootsIdentity::generate(); + let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); + let principal = principal( + &proxy, + identity.public_key_hex(), + vec![PublishRelayPolicy::DaemonDefaultOnly], + false, + PublishJobVisibility::Own, + ); + let mut event = signed_event(&identity, "{}"); + let replacement = if event.sig.starts_with('0') { "1" } else { "0" }; + event.sig.replace_range(0..1, replacement); + let error = proxy + .publish_event( + &principal, + publish_request( + event, + Vec::new(), + PublishRelayPolicy::DaemonDefaultOnly, + PublishDeliveryPolicy::Any, + None, + ), + ) + .await + .expect_err("wrong signature should fail"); + + assert!(matches!( + error, + PublishProxyError::SignedEventVerification( + RadrootsNostrEventVerification::SignatureInvalid + ) + )); + assert!(adapter.captured_raw_events().is_empty()); + } + + #[tokio::test] + async fn publish_event_rejects_malformed_wire_fields() { + let identity = RadrootsIdentity::generate(); + let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); + let principal = principal( + &proxy, + identity.public_key_hex(), + vec![PublishRelayPolicy::DaemonDefaultOnly], + false, + PublishJobVisibility::Own, + ); + let mut event = signed_event(&identity, "{}"); + event.id = event.id.to_uppercase(); + let error = proxy + .publish_event( + &principal, + publish_request( + event, + Vec::new(), + PublishRelayPolicy::DaemonDefaultOnly, + PublishDeliveryPolicy::Any, + None, + ), + ) + .await + .expect_err("malformed field should fail"); + + assert!(matches!(error, PublishProxyError::InvalidSignedEvent(_))); + assert!(adapter.captured_raw_events().is_empty()); + } + + #[tokio::test] + async fn publish_event_uses_explicit_request_relays_when_allowed() { + let identity = RadrootsIdentity::generate(); + let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_SECONDARY])); + let principal = principal( + &proxy, + identity.public_key_hex(), + vec![PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault], + true, + PublishJobVisibility::Own, + ); + let response = proxy + .publish_event( + &principal, + publish_request( + signed_event(&identity, "{}"), + vec![RELAY_PRIMARY.to_owned()], + PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault, + PublishDeliveryPolicy::Any, + None, + ), + ) + .await + .expect("publish"); + + assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied); + assert_eq!(response.job.relays[0].relay_url, RELAY_PRIMARY); + assert_eq!(response.job.relays[0].source, PublishRelaySource::Request); + } + + #[tokio::test] + async fn publish_event_uses_cached_nip65_author_write_before_defaults() { + let identity = RadrootsIdentity::generate(); + let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_SECONDARY])); + proxy + .store + .cache_author_write_relays( + identity.public_key_hex().as_str(), + &[RELAY_PRIMARY.to_owned()], + ) + .expect("cache author relays"); + let principal = principal( + &proxy, + identity.public_key_hex(), + vec![PublishRelayPolicy::AuthorWriteThenDaemonDefault], + false, + PublishJobVisibility::Own, + ); + let response = proxy + .publish_event( + &principal, + publish_request( + signed_event(&identity, "{}"), + Vec::new(), + PublishRelayPolicy::AuthorWriteThenDaemonDefault, + PublishDeliveryPolicy::Any, + None, + ), + ) + .await + .expect("publish"); + + assert_eq!(response.job.relays[0].relay_url, RELAY_PRIMARY); + assert_eq!( + response.job.relays[0].source, + PublishRelaySource::AuthorWrite + ); + } + + #[tokio::test] + async fn publish_event_records_no_publish_relays_failure() { + let identity = RadrootsIdentity::generate(); + let (proxy, adapter) = publish_proxy(PublishProxyConfig::default()); + let principal = principal( + &proxy, + identity.public_key_hex(), + vec![PublishRelayPolicy::DaemonDefaultOnly], + false, + PublishJobVisibility::Own, + ); + let response = proxy + .publish_event( + &principal, + publish_request( + signed_event(&identity, "{}"), + Vec::new(), + PublishRelayPolicy::DaemonDefaultOnly, + PublishDeliveryPolicy::Any, + None, + ), + ) + .await + .expect("publish"); + + assert_eq!(response.job.status, PublishJobStatus::Rejected); + assert_eq!( + response.job.last_error.as_deref(), + Some("no_publish_relays") + ); + assert!(response.job.relays.is_empty()); + assert!(adapter.captured_raw_events().is_empty()); + } + + #[tokio::test] + async fn publish_event_records_unsafe_request_relay_rejection() { + let identity = RadrootsIdentity::generate(); + let (proxy, adapter) = publish_proxy(PublishProxyConfig::default()); + let principal = principal( + &proxy, + identity.public_key_hex(), + vec![PublishRelayPolicy::ExplicitOnly], + true, + PublishJobVisibility::Own, + ); + let response = proxy + .publish_event( + &principal, + publish_request( + signed_event(&identity, "{}"), + vec!["wss://127.0.0.1:7777".to_owned()], + PublishRelayPolicy::ExplicitOnly, + PublishDeliveryPolicy::Any, + None, + ), + ) + .await + .expect("publish"); + + assert_eq!(response.job.status, PublishJobStatus::Rejected); + assert_eq!(response.job.relays.len(), 1); + assert_eq!( + response.job.relays[0].outcome_kind, + PublishRelayOutcomeKind::RelayUrlRejected + ); + assert!(!response.job.relays[0].attempted); + assert!(adapter.captured_raw_events().is_empty()); + } + + #[tokio::test] + async fn publish_event_deduplicates_same_intent_and_conflicts_different_intent() { + let identity = RadrootsIdentity::generate(); + let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); + let principal = principal( + &proxy, + identity.public_key_hex(), + vec![PublishRelayPolicy::DaemonDefaultOnly], + false, + PublishJobVisibility::Own, + ); + let request = publish_request( + signed_event(&identity, "{}"), + Vec::new(), + PublishRelayPolicy::DaemonDefaultOnly, + PublishDeliveryPolicy::Any, + Some("idem-conflict"), + ); + let first = proxy + .publish_event(&principal, request.clone()) + .await + .expect("first"); + let duplicate = proxy + .publish_event(&principal, request) + .await + .expect("duplicate"); + + assert!(!first.deduplicated); + assert!(duplicate.deduplicated); + assert_eq!(duplicate.job.job_id, first.job.job_id); + + let conflict = proxy + .publish_event( + &principal, + publish_request( + signed_event(&identity, "changed"), + Vec::new(), + PublishRelayPolicy::DaemonDefaultOnly, + PublishDeliveryPolicy::Any, + Some("idem-conflict"), + ), + ) + .await + .expect_err("conflict"); + assert!(matches!( + conflict, + PublishProxyError::IdempotencyConflict(_) + )); + } + + #[tokio::test] + async fn publish_jobs_respect_own_and_admin_visibility() { + let identity = RadrootsIdentity::generate(); + let other_identity = RadrootsIdentity::generate(); + let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); + let owner = principal( + &proxy, + identity.public_key_hex(), + vec![PublishRelayPolicy::DaemonDefaultOnly], + false, + PublishJobVisibility::Own, + ); + let other = principal( + &proxy, + other_identity.public_key_hex(), + vec![PublishRelayPolicy::DaemonDefaultOnly], + false, + PublishJobVisibility::Own, + ); + let admin = principal( + &proxy, + other_identity.public_key_hex(), + vec![PublishRelayPolicy::DaemonDefaultOnly], + false, + PublishJobVisibility::Admin, + ); + let response = proxy + .publish_event( + &owner, + publish_request( + signed_event(&identity, "{}"), + Vec::new(), + PublishRelayPolicy::DaemonDefaultOnly, + PublishDeliveryPolicy::Any, + None, + ), + ) + .await + .expect("publish"); + + assert!( + proxy + .store + .job_by_id_for_principal(response.job.job_id.as_str(), &other) + .expect("other read") + .is_none() + ); + assert!( + proxy + .store + .job_by_id_for_principal(response.job.job_id.as_str(), &admin) + .expect("admin read") + .is_some() + ); + } + + #[tokio::test] + async fn publish_event_records_retryable_relay_failures() { + let identity = RadrootsIdentity::generate(); + let adapter = RadrootsMockRelayPublishAdapter::new().with_outcome( + RELAY_PRIMARY, + RadrootsRelayOutcome::connection_failed("error: unavailable"), + ); + let proxy = PublishProxy::memory(config_with_defaults(vec![RELAY_PRIMARY])) + .expect("proxy") + .with_publisher(Arc::new(adapter)); + let principal = principal( + &proxy, + identity.public_key_hex(), + vec![PublishRelayPolicy::DaemonDefaultOnly], + false, + PublishJobVisibility::Own, + ); + let response = proxy + .publish_event( + &principal, + publish_request( + signed_event(&identity, "{}"), + Vec::new(), + PublishRelayPolicy::DaemonDefaultOnly, + PublishDeliveryPolicy::Any, + None, + ), + ) + .await + .expect("publish"); + + assert_eq!( + response.job.status, + PublishJobStatus::DeliveryUnsatisfiedRetryable + ); + assert_eq!(response.job.retryable_count, 1); + } } diff --git a/src/transport/jsonrpc/methods/publish_proxy.rs b/src/transport/jsonrpc/methods/publish_proxy.rs @@ -2,11 +2,12 @@ use anyhow::Result; use jsonrpsee::server::RpcModule; use radroots_publish_proxy_protocol::{ METHOD_CAPABILITIES, METHOD_EVENT, METHOD_JOB_GET, METHOD_JOB_LIST, METHOD_RELAYS_RESOLVE, - PublishCapabilities, PublishEventRequest, PublishRelayOutcome, + PublishCapabilities, PublishDeliveryPolicy, PublishEventRequest, PublishRelayOutcome, + PublishRelaySource, }; use serde::{Deserialize, Serialize}; -use crate::core::publish_proxy::PublishJobInsert; +use crate::core::publish_proxy::PublishProxyError; use crate::transport::jsonrpc::auth::require_publish_principal; use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; @@ -30,7 +31,14 @@ struct RelaysResolveParams { #[derive(Clone, Debug, Serialize)] struct RelaysResolveResponse { - relays: Vec<PublishRelayOutcome>, + relays: Vec<ResolvedRelayResponseItem>, + rejected_relays: Vec<PublishRelayOutcome>, +} + +#[derive(Clone, Debug, Serialize)] +struct ResolvedRelayResponseItem { + relay_url: String, + source: PublishRelaySource, } pub fn module(ctx: RpcContext, registry: MethodRegistry) -> Result<RpcModule<RpcContext>> { @@ -65,38 +73,11 @@ fn register_event(module: &mut RpcModule<RpcContext>, registry: &MethodRegistry) let request: PublishEventRequest = params .parse() .map_err(|error| RpcError::InvalidParams(error.to_string()))?; - request - .validate(ctx.state.publish_proxy.config.max_relays_per_request) - .map_err(|error| RpcError::InvalidParams(error.to_string()))?; - let event_size = request.event.content.len() - + request.event.id.len() - + request.event.pubkey.len() - + request.event.sig.len() - + request - .event - .tags - .iter() - .flatten() - .map(String::len) - .sum::<usize>(); - if event_size > ctx.state.publish_proxy.config.max_event_bytes { - return Err(RpcError::InvalidParams( - "signed event exceeds publish_proxy max_event_bytes".to_owned(), - )); - } - principal - .allows_event(&request) - .map_err(|error| RpcError::Unauthorized(error.to_string()))?; - let idempotency_key = request.idempotency_key.clone(); ctx.state .publish_proxy - .store - .record_publish_job(PublishJobInsert { - principal_id: principal.principal_id, - idempotency_key, - request, - }) - .map_err(|error| RpcError::Other(error.to_string())) + .publish_event(&principal, request) + .await + .map_err(rpc_error_from_publish_proxy) })?; Ok(()) } @@ -150,8 +131,8 @@ fn register_relays_resolve( registry.track(METHOD_RELAYS_RESOLVE); module.register_async_method( METHOD_RELAYS_RESOLVE, - |params, _ctx, extensions| async move { - require_publish_principal(&extensions)?; + |params, ctx, extensions| async move { + let principal = require_publish_principal(&extensions)?; let params: RelaysResolveParams = params .parse() .map_err(|error| RpcError::InvalidParams(error.to_string()))?; @@ -159,17 +140,56 @@ fn register_relays_resolve( .event .validate() .map_err(|error| RpcError::InvalidParams(error.to_string()))?; - let _ = params.relay_policy; - let _ = params.relays; - Ok::<RelaysResolveResponse, RpcError>(RelaysResolveResponse { relays: Vec::new() }) + let request = PublishEventRequest { + event: params.event, + relays: params.relays, + relay_policy: params.relay_policy, + delivery_policy: PublishDeliveryPolicy::Any, + idempotency_key: None, + timeout_ms: None, + }; + principal + .allows_event(&request) + .map_err(|error| RpcError::Unauthorized(error.to_string()))?; + let resolution = ctx + .state + .publish_proxy + .resolve_relays_for_request(request.event.pubkey.as_str(), &request) + .await + .map_err(rpc_error_from_publish_proxy)?; + Ok::<RelaysResolveResponse, RpcError>(RelaysResolveResponse { + relays: resolution + .targets + .into_iter() + .map(|target| ResolvedRelayResponseItem { + relay_url: target.url.into_string(), + source: target.source, + }) + .collect(), + rejected_relays: resolution.outcomes, + }) }, )?; Ok(()) } +fn rpc_error_from_publish_proxy(error: PublishProxyError) -> RpcError { + match error { + PublishProxyError::InvalidScope(message) => RpcError::Unauthorized(message), + PublishProxyError::InvalidSignedEvent(message) => RpcError::InvalidParams(message), + PublishProxyError::SignedEventVerification(_) + | PublishProxyError::Draft(_) + | PublishProxyError::Relay(_) => RpcError::InvalidParams(error.to_string()), + PublishProxyError::IdempotencyConflict(_) => RpcError::Other(error.to_string()), + other => RpcError::Other(other.to_string()), + } +} + #[cfg(test)] mod tests { use super::module; + use std::sync::Arc; + use crate::app::config::{Nip46Config, PublishProxyConfig}; use crate::core::Radrootsd; use crate::core::publish_proxy::{ @@ -180,34 +200,55 @@ mod tests { }; use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; use jsonrpsee::server::RpcModule; + use nostr::JsonUtil; use radroots_identity::RadrootsIdentity; - use radroots_nostr::prelude::RadrootsNostrMetadata; - use radroots_publish_proxy_protocol::PublishRelayPolicy; + use radroots_nostr::prelude::{ + RadrootsNostrMetadata, RadrootsNostrTimestamp, radroots_nostr_build_event, + }; + use radroots_publish_proxy_protocol::{PublishRelayPolicy, SignedNostrEventWire}; + use radroots_relay_transport::RadrootsMockRelayPublishAdapter; - fn event_json(pubkey: &str) -> String { - serde_json::json!({ - "id": "0".repeat(64), - "pubkey": pubkey, - "created_at": 1_700_000_000u64, - "kind": 30402u32, - "tags": [["d", "listing-1"]], - "content": "{}", - "sig": "1".repeat(128) - }) - .to_string() + fn signed_event(identity: &RadrootsIdentity) -> SignedNostrEventWire { + let event = radroots_nostr_build_event( + 30_402, + "{}", + vec![vec!["d".to_owned(), "listing-1".to_owned()]], + ) + .expect("event builder") + .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_000)) + .sign_with_keys(identity.keys()) + .expect("signed event"); + serde_json::from_str(event.as_json().as_str()).expect("event wire") } - fn module_with_principal(admin: bool) -> (RpcModule<RpcContext>, RpcContext, String, String) { + fn module_with_principal( + admin: bool, + ) -> ( + RpcModule<RpcContext>, + RpcContext, + String, + SignedNostrEventWire, + ) { let identity = RadrootsIdentity::generate(); + let signed_event = signed_event(&identity); let metadata: RadrootsNostrMetadata = serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); + let publish_proxy_config = PublishProxyConfig { + daemon_default_publish_relays: vec!["wss://relay.example.com".to_owned()], + ..PublishProxyConfig::default() + }; let state = Radrootsd::new( - identity, + identity.clone(), metadata, - PublishProxyConfig::default(), + publish_proxy_config, Nip46Config::default(), ) .expect("state"); + let mut state = state; + state.publish_proxy = state + .publish_proxy + .clone() + .with_publisher(Arc::new(RadrootsMockRelayPublishAdapter::new())); let token = generate_bearer_token(); let principal = state .publish_proxy @@ -215,7 +256,7 @@ mod tests { .create_principal(PublishPrincipalInit { label: "tester".to_owned(), token_hash: hash_bearer_token(token.as_str()), - allowed_pubkeys: vec!["a".repeat(64)], + allowed_pubkeys: vec![identity.public_key_hex()], allowed_kinds: vec![30_402], allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly], allow_request_relays: false, @@ -233,12 +274,12 @@ mod tests { module .extensions_mut() .insert(PublishProxyAuthorization::Authorized(principal)); - (module, ctx, token, "a".repeat(64)) + (module, ctx, token, signed_event) } #[tokio::test] async fn publish_event_records_job_and_deduplicates_idempotency() { - let (module, _ctx, _token, pubkey) = module_with_principal(false); + let (module, _ctx, _token, event) = module_with_principal(false); let request = format!( r#"{{ "jsonrpc":"2.0", @@ -252,7 +293,7 @@ mod tests { }}, "id":1 }}"#, - event_json(pubkey.as_str()) + serde_json::to_string(&event).expect("event json") ); let (response, _stream) = module .raw_json_request(request.as_str(), 1) @@ -269,6 +310,8 @@ mod tests { #[tokio::test] async fn publish_event_rejects_principal_scope_gap() { let (module, _ctx, _token, _pubkey) = module_with_principal(false); + let other_identity = RadrootsIdentity::generate(); + let event = signed_event(&other_identity); let request = format!( r#"{{ "jsonrpc":"2.0", @@ -281,7 +324,7 @@ mod tests { }}, "id":1 }}"#, - event_json("b".repeat(64).as_str()) + serde_json::to_string(&event).expect("event json") ); let (response, _stream) = module .raw_json_request(request.as_str(), 1) @@ -290,6 +333,34 @@ mod tests { assert!(response.get().contains("unauthorized")); } + #[tokio::test] + async fn publish_relays_resolve_returns_daemon_default_targets() { + let (module, _ctx, _token, event) = module_with_principal(false); + let request = format!( + r#"{{ + "jsonrpc":"2.0", + "method":"publish.relays.resolve", + "params":{{ + "event":{}, + "relay_policy":"daemon_default_only", + "relays":[] + }}, + "id":1 + }}"#, + serde_json::to_string(&event).expect("event json") + ); + let (response, _stream) = module + .raw_json_request(request.as_str(), 1) + .await + .expect("request"); + assert!( + response + .get() + .contains("\"relay_url\":\"wss://relay.example.com\"") + ); + assert!(response.get().contains("\"source\":\"daemon_default\"")); + } + #[test] fn http_auth_finds_principal_from_hashed_token() { let (_module, ctx, token, _pubkey) = module_with_principal(false); diff --git a/src/transport/jsonrpc/server.rs b/src/transport/jsonrpc/server.rs @@ -3,16 +3,265 @@ use std::net::SocketAddr; use anyhow::Result; +use jsonrpsee::server::middleware::rpc::{ + Batch, MethodResponse, Notification, Request, RpcServiceBuilder, RpcServiceT, +}; use jsonrpsee::server::{ BatchRequestConfig, HttpBody, HttpRequest, RpcModule, ServerBuilder, ServerConfigBuilder, ServerHandle, }; +use jsonrpsee::types::{ErrorObject, Id}; use crate::app::config::RpcConfig; use crate::core::publish_proxy::PublishProxyStore; use crate::transport::jsonrpc::RpcContext; use crate::transport::jsonrpc::auth; +#[derive(Clone)] +struct RejectPublishNotifications<S> { + service: S, +} + +impl<S> RpcServiceT for RejectPublishNotifications<S> +where + S: RpcServiceT< + MethodResponse = MethodResponse, + NotificationResponse = MethodResponse, + BatchResponse = MethodResponse, + > + Clone + + Send + + Sync + + 'static, +{ + type MethodResponse = MethodResponse; + type NotificationResponse = MethodResponse; + type BatchResponse = MethodResponse; + + fn call<'a>( + &self, + request: Request<'a>, + ) -> impl Future<Output = Self::MethodResponse> + Send + 'a { + self.service.call(request) + } + + fn batch<'a>( + &self, + requests: Batch<'a>, + ) -> impl Future<Output = Self::BatchResponse> + Send + 'a { + self.service.batch(requests) + } + + fn notification<'a>( + &self, + notification: Notification<'a>, + ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a { + let service = self.service.clone(); + async move { + if notification.method_name().starts_with("publish.") { + MethodResponse::error( + Id::Null, + ErrorObject::owned( + -32600, + "publish notifications are not accepted", + None::<()>, + ), + ) + } else { + service.notification(notification).await + } + } + } +} + +#[cfg(test)] +mod tests { + use super::start_server; + use crate::app::config::{Nip46Config, PublishProxyConfig, RpcConfig}; + use crate::core::Radrootsd; + use crate::core::publish_proxy::{ + PublishJobVisibility, PublishPrincipalInit, generate_bearer_token, hash_bearer_token, + }; + use crate::transport::jsonrpc::methods; + use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; + use jsonrpsee::server::RpcModule; + use nostr::JsonUtil; + use radroots_identity::RadrootsIdentity; + use radroots_nostr::prelude::{ + RadrootsNostrMetadata, RadrootsNostrTimestamp, radroots_nostr_build_event, + }; + use radroots_publish_proxy_protocol::PublishRelayPolicy; + use radroots_relay_transport::RadrootsMockRelayPublishAdapter; + use std::net::{SocketAddr, TcpListener}; + use std::sync::Arc; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + const RELAY_PRIMARY: &str = "wss://relay.example.com"; + + fn unused_addr() -> SocketAddr { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind local addr"); + listener.local_addr().expect("local addr") + } + + fn signed_event_json(identity: &RadrootsIdentity) -> String { + radroots_nostr_build_event( + 30_402, + "{}", + vec![vec!["d".to_owned(), "listing-1".to_owned()]], + ) + .expect("event builder") + .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_000)) + .sign_with_keys(identity.keys()) + .expect("signed event") + .as_json() + } + + async fn post_json(addr: SocketAddr, body: &str, token: Option<&str>) -> String { + let mut stream = tokio::net::TcpStream::connect(addr).await.expect("connect"); + let auth_header = token + .map(|token| format!("Authorization: Bearer {token}\r\n")) + .unwrap_or_default(); + let request = format!( + "POST / HTTP/1.1\r\nHost: {addr}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n{auth_header}\r\n{body}", + body.len() + ); + stream + .write_all(request.as_bytes()) + .await + .expect("write request"); + let mut bytes = Vec::new(); + stream.read_to_end(&mut bytes).await.expect("read response"); + String::from_utf8(bytes).expect("response utf8") + } + + fn publish_server_state() -> (Radrootsd, String, RadrootsIdentity) { + let identity = RadrootsIdentity::generate(); + let metadata: RadrootsNostrMetadata = + serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); + let publish_proxy_config = PublishProxyConfig { + daemon_default_publish_relays: vec![RELAY_PRIMARY.to_owned()], + ..PublishProxyConfig::default() + }; + let mut state = Radrootsd::new( + identity.clone(), + metadata, + publish_proxy_config, + Nip46Config::default(), + ) + .expect("state"); + state.publish_proxy = state + .publish_proxy + .clone() + .with_publisher(Arc::new(RadrootsMockRelayPublishAdapter::new())); + let token = generate_bearer_token(); + state + .publish_proxy + .store + .create_principal(PublishPrincipalInit { + label: "tester".to_owned(), + token_hash: hash_bearer_token(token.as_str()), + allowed_pubkeys: vec![identity.public_key_hex()], + allowed_kinds: vec![30_402], + allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly], + allow_request_relays: false, + job_visibility: PublishJobVisibility::Own, + expires_at_unix: None, + }) + .expect("principal"); + (state, token, identity) + } + + async fn start_publish_server( + state: Radrootsd, + rpc_cfg: RpcConfig, + ) -> (SocketAddr, jsonrpsee::server::ServerHandle) { + let addr = unused_addr(); + let store = state.publish_proxy.store.clone(); + let registry = MethodRegistry::default(); + let ctx = RpcContext::new(state, registry.clone()); + let mut root = RpcModule::new(ctx.clone()); + methods::register_all(&mut root, ctx, registry).expect("register methods"); + let handle = start_server(addr, &rpc_cfg, store, root) + .await + .expect("start server"); + (addr, handle) + } + + #[tokio::test] + async fn publish_notifications_do_not_create_jobs() { + let (state, token, identity) = publish_server_state(); + let store = state.publish_proxy.store.clone(); + let (addr, handle) = start_publish_server(state, RpcConfig::default()).await; + let notification = format!( + r#"{{ + "jsonrpc":"2.0", + "method":"publish.event", + "params":{{ + "event":{}, + "relays":[], + "relay_policy":"daemon_default_only", + "delivery_policy":{{"mode":"any"}} + }} + }}"#, + signed_event_json(&identity) + ); + let response = post_json(addr, notification.as_str(), Some(token.as_str())).await; + handle.stop().expect("stop server"); + + assert!( + response.contains("publish notifications are not accepted") + || response.ends_with("\r\n\r\n") + ); + let principal = store + .principal_for_token_hash(hash_bearer_token(token.as_str()).as_str()) + .expect("principal lookup") + .expect("principal"); + assert!( + store + .list_jobs_for_principal(&principal, 10) + .expect("jobs") + .is_empty() + ); + } + + #[tokio::test] + async fn batch_requests_are_disabled_by_default() { + let (state, token, identity) = publish_server_state(); + let store = state.publish_proxy.store.clone(); + let (addr, handle) = start_publish_server(state, RpcConfig::default()).await; + let batch = format!( + r#"[{{ + "jsonrpc":"2.0", + "method":"publish.event", + "params":{{ + "event":{}, + "relays":[], + "relay_policy":"daemon_default_only", + "delivery_policy":{{"mode":"any"}} + }}, + "id":1 + }}]"#, + signed_event_json(&identity) + ); + let response = post_json(addr, batch.as_str(), Some(token.as_str())).await; + handle.stop().expect("stop server"); + + assert!( + response.contains("Batched requests are not supported by this server"), + "{response}" + ); + let principal = store + .principal_for_token_hash(hash_bearer_token(token.as_str()).as_str()) + .expect("principal lookup") + .expect("principal"); + assert!( + store + .list_jobs_for_principal(&principal, 10) + .expect("jobs") + .is_empty() + ); + } +} + pub async fn start_server( addr: SocketAddr, rpc_cfg: &RpcConfig, @@ -36,7 +285,10 @@ pub async fn start_server( } let server_cfg = builder.build(); + let rpc_middleware = + RpcServiceBuilder::new().layer_fn(|service| RejectPublishNotifications { service }); let server = ServerBuilder::with_config(server_cfg) + .set_rpc_middleware(rpc_middleware) .set_http_middleware(tower::ServiceBuilder::new().map_request( move |mut request: HttpRequest<HttpBody>| { let publish_proxy_auth = auth::authorize_publish_proxy_request(