rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

commit 1a68784649446c6cebdc3b4d3cadee55eccf0417
parent e73a0bdab6c462d7774b2a83586ccedd59021bbb
Author: triesap <tyson@radroots.org>
Date:   Thu, 21 May 2026 08:16:52 +0000

rhi: harden remote http prover polling

- require same-origin status polling before bearer reuse
- reject bearer-token remote prover configs over plain HTTP
- bound remote prover response reads by configured byte limits
- cover remote http credential and polling failure modes

Diffstat:
Msrc/features/trade_validation_receipt.rs | 292++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 274 insertions(+), 18 deletions(-)

diff --git a/src/features/trade_validation_receipt.rs b/src/features/trade_validation_receipt.rs @@ -253,16 +253,19 @@ pub struct TradeValidationReceiptRemoteHttpProverConfig { impl TradeValidationReceiptRemoteHttpProverConfig { pub fn validate(&self) -> Result<(), TradeValidationReceiptJobError> { - if self.endpoint_url.trim().is_empty() { + let url = remote_http_endpoint_url(self)?; + if url.scheme() != "http" && url.scheme() != "https" { return Err(TradeValidationReceiptJobError::RemoteHttpInvalidConfig( "endpoint_url", )); } - let url = reqwest::Url::parse(self.endpoint_url.as_str()) - .map_err(|_| TradeValidationReceiptJobError::RemoteHttpInvalidConfig("endpoint_url"))?; - if url.scheme() != "http" && url.scheme() != "https" { + if matches!( + self.auth, + TradeValidationReceiptRemoteHttpAuth::BearerTokenEnv { .. } + ) && url.scheme() != "https" + { return Err(TradeValidationReceiptJobError::RemoteHttpInvalidConfig( - "endpoint_url", + "auth.endpoint_url_scheme", )); } if self.request_timeout_ms == 0 { @@ -284,6 +287,18 @@ impl TradeValidationReceiptRemoteHttpProverConfig { } } +fn remote_http_endpoint_url( + config: &TradeValidationReceiptRemoteHttpProverConfig, +) -> Result<reqwest::Url, TradeValidationReceiptJobError> { + if config.endpoint_url.trim().is_empty() { + return Err(TradeValidationReceiptJobError::RemoteHttpInvalidConfig( + "endpoint_url", + )); + } + reqwest::Url::parse(config.endpoint_url.as_str()) + .map_err(|_| TradeValidationReceiptJobError::RemoteHttpInvalidConfig("endpoint_url")) +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(tag = "mode", rename_all = "snake_case")] pub enum TradeValidationReceiptRemoteHttpAuth { @@ -898,6 +913,7 @@ async fn remote_http_completed_response( ) -> Result<RadrootsSp1TradeRemoteProverResponse, TradeValidationReceiptJobError> { let mut response = remote_http_post_json_io(config, config.endpoint_url.as_str(), request).await?; + remote_http_validate_response_identity(&response, request)?; match response.status { RadrootsSp1TradeRemoteProverStatus::Completed => return Ok(response), RadrootsSp1TradeRemoteProverStatus::Failed => { @@ -913,6 +929,7 @@ async fn remote_http_completed_response( let status_url = remote_http_status_url(config, &response)?; tokio::time::sleep(Duration::from_millis(config.poll_interval_ms)).await; response = remote_http_get_json_io(config, status_url.as_str(), request).await?; + remote_http_validate_response_identity(&response, request)?; match response.status { RadrootsSp1TradeRemoteProverStatus::Completed => return Ok(response), RadrootsSp1TradeRemoteProverStatus::Failed => { @@ -929,6 +946,24 @@ async fn remote_http_completed_response( } #[cfg(feature = "sp1_proving")] +fn remote_http_validate_response_identity( + response: &RadrootsSp1TradeRemoteProverResponse, + request: &RadrootsSp1TradeRemoteProverRequest, +) -> Result<(), TradeValidationReceiptJobError> { + if response.schema_version != RADROOTS_SP1_TRADE_REMOTE_PROVER_SCHEMA_VERSION { + return Err(TradeValidationReceiptJobError::RemoteHttpInvalidResponse( + "schema_version", + )); + } + if response.request_id != request.request_id { + return Err(TradeValidationReceiptJobError::RemoteHttpIdentityMismatch( + "request_id", + )); + } + Ok(()) +} + +#[cfg(feature = "sp1_proving")] fn remote_http_terminal_error( status: &'static str, response: RadrootsSp1TradeRemoteProverResponse, @@ -1070,20 +1105,30 @@ async fn remote_http_response_json( config: &TradeValidationReceiptRemoteHttpProverConfig, response: Result<reqwest::Response, reqwest::Error>, ) -> Result<RadrootsSp1TradeRemoteProverResponse, TradeValidationReceiptJobError> { - let response = response + let mut response = response .map_err(|error| TradeValidationReceiptJobError::RemoteHttpTransport(error.to_string()))?; if !response.status().is_success() { return Err(TradeValidationReceiptJobError::RemoteHttpTransport( format!("http status {}", response.status().as_u16()), )); } - let bytes = response - .bytes() - .await - .map_err(|error| TradeValidationReceiptJobError::RemoteHttpTransport(error.to_string()))?; - if bytes.len() > config.max_response_bytes { + if response + .content_length() + .is_some_and(|length| length > config.max_response_bytes as u64) + { return Err(TradeValidationReceiptJobError::RemoteHttpResponseTooLarge); } + let mut bytes = Vec::with_capacity(config.max_response_bytes.min(8192)); + while let Some(chunk) = response + .chunk() + .await + .map_err(|error| TradeValidationReceiptJobError::RemoteHttpTransport(error.to_string()))? + { + if chunk.len() > config.max_response_bytes.saturating_sub(bytes.len()) { + return Err(TradeValidationReceiptJobError::RemoteHttpResponseTooLarge); + } + bytes.extend_from_slice(&chunk); + } serde_json::from_slice::<RadrootsSp1TradeRemoteProverResponse>(&bytes) .map_err(TradeValidationReceiptJobError::Serde) } @@ -1093,10 +1138,13 @@ fn remote_http_status_url( config: &TradeValidationReceiptRemoteHttpProverConfig, response: &RadrootsSp1TradeRemoteProverResponse, ) -> Result<String, TradeValidationReceiptJobError> { + let base = remote_http_endpoint_url(config)?; if let Some(url) = response.status_url.as_deref() { let parsed = reqwest::Url::parse(url) .map_err(|_| TradeValidationReceiptJobError::RemoteHttpInvalidResponse("status_url"))?; - if parsed.scheme() != "http" && parsed.scheme() != "https" { + if (parsed.scheme() != "http" && parsed.scheme() != "https") + || !remote_http_same_origin(&base, &parsed) + { return Err(TradeValidationReceiptJobError::RemoteHttpInvalidResponse( "status_url", )); @@ -1104,18 +1152,37 @@ fn remote_http_status_url( return Ok(parsed.to_string()); } if let Some(path) = response.status_path.as_deref() { - let base = reqwest::Url::parse(config.endpoint_url.as_str()) - .map_err(|_| TradeValidationReceiptJobError::RemoteHttpInvalidConfig("endpoint_url"))?; - return base - .join(path) - .map(|url| url.to_string()) - .map_err(|_| TradeValidationReceiptJobError::RemoteHttpInvalidResponse("status_path")); + if path.trim() != path + || !path.starts_with('/') + || path.starts_with("//") + || reqwest::Url::parse(path).is_ok() + { + return Err(TradeValidationReceiptJobError::RemoteHttpInvalidResponse( + "status_path", + )); + } + let parsed = base.join(path).map_err(|_| { + TradeValidationReceiptJobError::RemoteHttpInvalidResponse("status_path") + })?; + if !remote_http_same_origin(&base, &parsed) { + return Err(TradeValidationReceiptJobError::RemoteHttpInvalidResponse( + "status_path", + )); + } + return Ok(parsed.to_string()); } Err(TradeValidationReceiptJobError::RemoteHttpInvalidResponse( "status_url", )) } +#[cfg(feature = "sp1_proving")] +fn remote_http_same_origin(base: &reqwest::Url, candidate: &reqwest::Url) -> bool { + base.scheme() == candidate.scheme() + && base.host_str() == candidate.host_str() + && base.port_or_known_default() == candidate.port_or_known_default() +} + #[cfg(not(feature = "sp1_proving"))] async fn run_local_cpu_prove_backend( _witness: &RadrootsSp1TradeOrderAcceptanceWitness, @@ -1705,6 +1772,19 @@ mod tests { } #[cfg(feature = "sp1_proving")] + fn remote_http_local_response_url(response: &'static str) -> String { + let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("test listener"); + let addr = listener.local_addr().expect("test listener address"); + std::thread::spawn(move || { + let (mut stream, _) = listener.accept().expect("test connection"); + let mut buffer = [0; 4096]; + let _ = std::io::Read::read(&mut stream, &mut buffer); + std::io::Write::write_all(&mut stream, response.as_bytes()).expect("test response"); + }); + format!("http://{addr}/prove") + } + + #[cfg(feature = "sp1_proving")] async fn run_remote_http_job( remote_http_results: Vec< Result<RadrootsSp1TradeRemoteProverResponse, TradeValidationReceiptJobError>, @@ -1868,6 +1948,21 @@ mod tests { "endpoint_url" )) )); + + let mut bearer_over_http = remote_http_policy(); + bearer_over_http + .remote_http + .as_mut() + .expect("remote config") + .auth = TradeValidationReceiptRemoteHttpAuth::BearerTokenEnv { + env_var: "RADROOTS_TEST_REMOTE_HTTP_TOKEN".to_string(), + }; + assert!(matches!( + bearer_over_http.validate(), + Err(TradeValidationReceiptJobError::RemoteHttpInvalidConfig( + "auth.endpoint_url_scheme" + )) + )); } #[cfg(feature = "sp1_proving")] @@ -1943,6 +2038,133 @@ mod tests { #[cfg(feature = "sp1_proving")] #[tokio::test] + async fn remote_http_prove_accepts_same_origin_status_url() { + let mut policy = remote_http_policy(); + policy + .remote_http + .as_mut() + .expect("remote config") + .max_poll_attempts = 1; + let mut accepted = remote_response(RadrootsSp1TradeRemoteProverStatus::Accepted); + accepted.status_url = Some("http://127.0.0.1:65535/prove/status/request-1".to_string()); + let published = run_remote_http_job_with_policy( + policy, + vec![ + Ok(accepted), + Ok(remote_response( + RadrootsSp1TradeRemoteProverStatus::Completed, + )), + ], + vec![Ok(())], + vec![Ok(publish_result_id(1)), Ok(publish_result_id(2))], + ) + .await + .expect("same-origin status url"); + + assert_eq!(published.len(), 2); + } + + #[cfg(feature = "sp1_proving")] + #[tokio::test] + async fn remote_http_prove_rejects_cross_origin_status_url() { + let mut accepted = remote_response(RadrootsSp1TradeRemoteProverStatus::Accepted); + accepted.status_url = Some("http://127.0.0.2:65535/prove/status/request-1".to_string()); + let error = run_remote_http_job( + vec![Ok(accepted)], + Vec::new(), + vec![Err(TradeValidationReceiptJobError::InvalidJobRequest)], + ) + .await + .expect_err("cross-origin status url"); + + assert!(matches!( + error, + TradeValidationReceiptJobError::RemoteHttpInvalidResponse("status_url") + )); + assert!( + trade_validation_receipt_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .published_events + .is_empty() + ); + } + + #[cfg(feature = "sp1_proving")] + #[tokio::test] + async fn remote_http_prove_rejects_absolute_or_scheme_relative_status_path() { + let mut absolute = remote_response(RadrootsSp1TradeRemoteProverStatus::Accepted); + absolute.status_path = Some("https://example.invalid/status".to_string()); + let error = run_remote_http_job( + vec![Ok(absolute)], + Vec::new(), + vec![Err(TradeValidationReceiptJobError::InvalidJobRequest)], + ) + .await + .expect_err("absolute status path"); + + assert!(matches!( + error, + TradeValidationReceiptJobError::RemoteHttpInvalidResponse("status_path") + )); + + let mut scheme_relative = remote_response(RadrootsSp1TradeRemoteProverStatus::Accepted); + scheme_relative.status_path = Some("//example.invalid/status".to_string()); + let error = run_remote_http_job( + vec![Ok(scheme_relative)], + Vec::new(), + vec![Err(TradeValidationReceiptJobError::InvalidJobRequest)], + ) + .await + .expect_err("scheme-relative status path"); + + assert!(matches!( + error, + TradeValidationReceiptJobError::RemoteHttpInvalidResponse("status_path") + )); + assert!( + trade_validation_receipt_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .published_events + .is_empty() + ); + } + + #[cfg(feature = "sp1_proving")] + #[tokio::test] + async fn remote_http_prove_rejects_polling_request_id_mismatch_before_next_poll() { + let mut accepted = remote_response(RadrootsSp1TradeRemoteProverStatus::Accepted); + accepted.request_id = "wrong-request".to_string(); + accepted.status_path = Some("/prove/status/request-1".to_string()); + let error = run_remote_http_job( + vec![ + Ok(accepted), + Ok(remote_response( + RadrootsSp1TradeRemoteProverStatus::Completed, + )), + ], + Vec::new(), + vec![Err(TradeValidationReceiptJobError::InvalidJobRequest)], + ) + .await + .expect_err("polling identity mismatch"); + + assert!(matches!( + error, + TradeValidationReceiptJobError::RemoteHttpIdentityMismatch("request_id") + )); + assert!( + trade_validation_receipt_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .published_events + .is_empty() + ); + } + + #[cfg(feature = "sp1_proving")] + #[tokio::test] async fn remote_http_prove_does_not_publish_when_verification_fails() { let error = run_remote_http_job( vec![Ok(remote_response( @@ -2153,6 +2375,40 @@ mod tests { ); } + #[cfg(feature = "sp1_proving")] + #[tokio::test] + async fn remote_http_prove_rejects_oversized_http_response_before_publish() { + let endpoint = remote_http_local_response_url( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\nConnection: close\r\n\r\n2\r\n{}\r\n0\r\n\r\n", + ); + let mut policy = remote_http_policy(); + { + let remote_http = policy.remote_http.as_mut().expect("remote config"); + remote_http.endpoint_url = endpoint; + remote_http.max_response_bytes = 1; + } + let error = run_remote_http_job_with_policy( + policy, + Vec::new(), + Vec::new(), + vec![Err(TradeValidationReceiptJobError::InvalidJobRequest)], + ) + .await + .expect_err("oversized streamed response"); + + assert!(matches!( + error, + TradeValidationReceiptJobError::RemoteHttpResponseTooLarge + )); + assert!( + trade_validation_receipt_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .published_events + .is_empty() + ); + } + #[tokio::test] async fn proof_job_publishes_verified_receipt_and_result_after_proof_verification() { let _guard = test_guard();