publish_proxy.rs (19162B)
1 use anyhow::Result; 2 use jsonrpsee::server::RpcModule; 3 use radroots_publish_proxy_protocol::{ 4 METHOD_CAPABILITIES, METHOD_EVENT, METHOD_JOB_GET, METHOD_JOB_LIST, METHOD_RELAYS_RESOLVE, 5 PublishCapabilities, PublishDeliveryPolicy, PublishEventRequest, PublishRelayOutcome, 6 PublishRelaySource, 7 }; 8 use serde::{Deserialize, Serialize}; 9 10 use crate::core::publish_proxy::PublishProxyError; 11 use crate::transport::jsonrpc::auth::require_publish_principal; 12 use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; 13 14 #[derive(Debug, Deserialize)] 15 struct JobGetParams { 16 job_id: String, 17 } 18 19 #[derive(Debug, Deserialize)] 20 #[serde(deny_unknown_fields)] 21 struct JobListParams { 22 limit: Option<usize>, 23 } 24 25 #[derive(Debug, Deserialize)] 26 struct RelaysResolveParams { 27 event: radroots_publish_proxy_protocol::SignedNostrEventWire, 28 relay_policy: radroots_publish_proxy_protocol::PublishRelayPolicy, 29 #[serde(default)] 30 relays: Vec<String>, 31 } 32 33 #[derive(Clone, Debug, Serialize)] 34 struct RelaysResolveResponse { 35 relays: Vec<ResolvedRelayResponseItem>, 36 rejected_relays: Vec<PublishRelayOutcome>, 37 } 38 39 #[derive(Clone, Debug, Serialize)] 40 struct ResolvedRelayResponseItem { 41 relay_url: String, 42 source: PublishRelaySource, 43 } 44 45 pub fn module(ctx: RpcContext, registry: MethodRegistry) -> Result<RpcModule<RpcContext>> { 46 let mut module = RpcModule::new(ctx); 47 register_capabilities(&mut module, ®istry)?; 48 register_event(&mut module, ®istry)?; 49 register_job_get(&mut module, ®istry)?; 50 register_job_list(&mut module, ®istry)?; 51 register_relays_resolve(&mut module, ®istry)?; 52 Ok(module) 53 } 54 55 fn register_capabilities( 56 module: &mut RpcModule<RpcContext>, 57 registry: &MethodRegistry, 58 ) -> Result<()> { 59 registry.track(METHOD_CAPABILITIES); 60 module.register_async_method(METHOD_CAPABILITIES, |_params, ctx, extensions| async move { 61 require_publish_principal(&extensions)?; 62 Ok::<PublishCapabilities, RpcError>(PublishCapabilities::v1( 63 ctx.state.publish_proxy.config.max_event_bytes, 64 ctx.state.publish_proxy.config.max_relays_per_request, 65 )) 66 })?; 67 Ok(()) 68 } 69 70 fn register_event(module: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { 71 registry.track(METHOD_EVENT); 72 module.register_async_method(METHOD_EVENT, |params, ctx, extensions| async move { 73 let principal = require_publish_principal(&extensions)?; 74 let request: PublishEventRequest = params 75 .parse() 76 .map_err(|error| RpcError::InvalidParams(error.to_string()))?; 77 ctx.state 78 .publish_proxy 79 .publish_event(&principal, request) 80 .await 81 .map_err(rpc_error_from_publish_proxy) 82 })?; 83 Ok(()) 84 } 85 86 fn register_job_get(module: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { 87 registry.track(METHOD_JOB_GET); 88 module.register_async_method(METHOD_JOB_GET, |params, ctx, extensions| async move { 89 let principal = require_publish_principal(&extensions)?; 90 let params: JobGetParams = params 91 .parse() 92 .map_err(|error| RpcError::InvalidParams(error.to_string()))?; 93 let job_id = params.job_id.trim(); 94 if job_id.is_empty() { 95 return Err(RpcError::InvalidParams("missing job_id".to_owned())); 96 } 97 ctx.state 98 .publish_proxy 99 .store 100 .job_by_id_for_principal(job_id, &principal) 101 .map_err(|error| RpcError::Other(error.to_string()))? 102 .ok_or_else(|| RpcError::Other(format!("unknown publish job: {job_id}"))) 103 })?; 104 Ok(()) 105 } 106 107 fn register_job_list(module: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { 108 registry.track(METHOD_JOB_LIST); 109 module.register_async_method(METHOD_JOB_LIST, |params, ctx, extensions| async move { 110 let principal = require_publish_principal(&extensions)?; 111 let params = if params.len_bytes() == 0 || params.as_str() == Some("[]") { 112 JobListParams { limit: None } 113 } else { 114 params 115 .parse::<JobListParams>() 116 .map_err(|error| RpcError::InvalidParams(error.to_string()))? 117 }; 118 if params.limit == Some(0) { 119 return Err(RpcError::InvalidParams( 120 "limit must be greater than zero".to_owned(), 121 )); 122 } 123 let configured_limit = ctx.state.publish_proxy.config.job_list_limit; 124 let limit = params 125 .limit 126 .unwrap_or(configured_limit) 127 .min(configured_limit); 128 ctx.state 129 .publish_proxy 130 .store 131 .list_jobs_for_principal(&principal, limit) 132 .map_err(|error| RpcError::Other(error.to_string())) 133 })?; 134 Ok(()) 135 } 136 137 fn register_relays_resolve( 138 module: &mut RpcModule<RpcContext>, 139 registry: &MethodRegistry, 140 ) -> Result<()> { 141 registry.track(METHOD_RELAYS_RESOLVE); 142 module.register_async_method( 143 METHOD_RELAYS_RESOLVE, 144 |params, ctx, extensions| async move { 145 let principal = require_publish_principal(&extensions)?; 146 let params: RelaysResolveParams = params 147 .parse() 148 .map_err(|error| RpcError::InvalidParams(error.to_string()))?; 149 params 150 .event 151 .validate() 152 .map_err(|error| RpcError::InvalidParams(error.to_string()))?; 153 let request = PublishEventRequest { 154 event: params.event, 155 relays: params.relays, 156 relay_policy: params.relay_policy, 157 delivery_policy: PublishDeliveryPolicy::Any, 158 idempotency_key: None, 159 timeout_ms: None, 160 }; 161 principal 162 .allows_event(&request) 163 .map_err(|error| RpcError::Unauthorized(error.to_string()))?; 164 let resolution = ctx 165 .state 166 .publish_proxy 167 .resolve_relays_for_request(request.event.pubkey.as_str(), &request) 168 .await 169 .map_err(rpc_error_from_publish_proxy)?; 170 Ok::<RelaysResolveResponse, RpcError>(RelaysResolveResponse { 171 relays: resolution 172 .targets 173 .into_iter() 174 .map(|target| ResolvedRelayResponseItem { 175 relay_url: target.url.into_string(), 176 source: target.source, 177 }) 178 .collect(), 179 rejected_relays: resolution.outcomes, 180 }) 181 }, 182 )?; 183 Ok(()) 184 } 185 186 fn rpc_error_from_publish_proxy(error: PublishProxyError) -> RpcError { 187 match error { 188 PublishProxyError::InvalidScope(message) => RpcError::Unauthorized(message), 189 PublishProxyError::InvalidSignedEvent(message) => RpcError::InvalidParams(message), 190 PublishProxyError::SignedEventVerification(_) 191 | PublishProxyError::Draft(_) 192 | PublishProxyError::Relay(_) => RpcError::InvalidParams(error.to_string()), 193 PublishProxyError::IdempotencyConflict(_) => RpcError::Other(error.to_string()), 194 other => RpcError::Other(other.to_string()), 195 } 196 } 197 198 #[cfg(test)] 199 mod tests { 200 use super::module; 201 use std::sync::Arc; 202 203 use crate::app::config::{Nip46Config, PublishProxyConfig}; 204 use crate::core::Radrootsd; 205 use crate::core::publish_proxy::{ 206 PublishJobVisibility, PublishPrincipalInit, generate_bearer_token, hash_bearer_token, 207 }; 208 use crate::transport::jsonrpc::auth::{ 209 PublishProxyAuthorization, authorize_publish_proxy_request, 210 }; 211 use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; 212 use jsonrpsee::server::RpcModule; 213 use nostr::JsonUtil; 214 use radroots_identity::RadrootsIdentity; 215 use radroots_nostr::prelude::{ 216 RadrootsNostrMetadata, RadrootsNostrTimestamp, radroots_nostr_build_event, 217 }; 218 use radroots_publish_proxy_protocol::{PublishRelayPolicy, SignedNostrEventWire}; 219 use radroots_relay_transport::RadrootsMockRelayPublishAdapter; 220 221 fn signed_event(identity: &RadrootsIdentity) -> SignedNostrEventWire { 222 let event = radroots_nostr_build_event( 223 30_402, 224 "{}", 225 vec![vec!["d".to_owned(), "listing-1".to_owned()]], 226 ) 227 .expect("event builder") 228 .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_000)) 229 .sign_with_keys(identity.keys()) 230 .expect("signed event"); 231 serde_json::from_str(event.as_json().as_str()).expect("event wire") 232 } 233 234 fn module_with_principal_and_config( 235 admin: bool, 236 publish_proxy_config: PublishProxyConfig, 237 ) -> ( 238 RpcModule<RpcContext>, 239 RpcContext, 240 String, 241 SignedNostrEventWire, 242 ) { 243 let identity = RadrootsIdentity::generate(); 244 let signed_event = signed_event(&identity); 245 let metadata: RadrootsNostrMetadata = 246 serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); 247 let state = Radrootsd::new( 248 identity.clone(), 249 metadata, 250 publish_proxy_config, 251 Nip46Config::default(), 252 ) 253 .expect("state"); 254 let mut state = state; 255 state.publish_proxy = state 256 .publish_proxy 257 .clone() 258 .with_publisher(Arc::new(RadrootsMockRelayPublishAdapter::new())); 259 let token = generate_bearer_token(); 260 let principal = state 261 .publish_proxy 262 .store 263 .create_principal(PublishPrincipalInit { 264 label: "tester".to_owned(), 265 token_hash: hash_bearer_token(token.as_str()), 266 allowed_pubkeys: vec![identity.public_key_hex()], 267 allowed_kinds: vec![30_402], 268 allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly], 269 allow_request_relays: false, 270 job_visibility: if admin { 271 PublishJobVisibility::Admin 272 } else { 273 PublishJobVisibility::Own 274 }, 275 expires_at_unix: None, 276 }) 277 .expect("principal"); 278 let registry = MethodRegistry::default(); 279 let ctx = RpcContext::new(state, registry.clone()); 280 let mut module = module(ctx.clone(), registry).expect("module"); 281 module 282 .extensions_mut() 283 .insert(PublishProxyAuthorization::Authorized(principal)); 284 (module, ctx, token, signed_event) 285 } 286 287 fn module_with_principal( 288 admin: bool, 289 ) -> ( 290 RpcModule<RpcContext>, 291 RpcContext, 292 String, 293 SignedNostrEventWire, 294 ) { 295 module_with_principal_and_config( 296 admin, 297 PublishProxyConfig { 298 daemon_default_publish_relays: vec!["wss://relay.example.com".to_owned()], 299 ..PublishProxyConfig::default() 300 }, 301 ) 302 } 303 304 #[tokio::test] 305 async fn publish_event_records_job_and_deduplicates_idempotency() { 306 let (module, _ctx, _token, event) = module_with_principal(false); 307 let request = format!( 308 r#"{{ 309 "jsonrpc":"2.0", 310 "method":"publish.event", 311 "params":{{ 312 "event":{}, 313 "relays":[], 314 "relay_policy":"daemon_default_only", 315 "delivery_policy":{{"mode":"any"}}, 316 "idempotency_key":"idem-1" 317 }}, 318 "id":1 319 }}"#, 320 serde_json::to_string(&event).expect("event json") 321 ); 322 let (response, _stream) = module 323 .raw_json_request(request.as_str(), 1) 324 .await 325 .expect("request"); 326 assert!(response.get().contains("\"deduplicated\":false")); 327 let (response, _stream) = module 328 .raw_json_request(request.as_str(), 1) 329 .await 330 .expect("request"); 331 assert!(response.get().contains("\"deduplicated\":true")); 332 } 333 334 #[tokio::test] 335 async fn publish_event_rejects_principal_scope_gap() { 336 let (module, _ctx, _token, _pubkey) = module_with_principal(false); 337 let other_identity = RadrootsIdentity::generate(); 338 let event = signed_event(&other_identity); 339 let request = format!( 340 r#"{{ 341 "jsonrpc":"2.0", 342 "method":"publish.event", 343 "params":{{ 344 "event":{}, 345 "relays":[], 346 "relay_policy":"daemon_default_only", 347 "delivery_policy":{{"mode":"any"}} 348 }}, 349 "id":1 350 }}"#, 351 serde_json::to_string(&event).expect("event json") 352 ); 353 let (response, _stream) = module 354 .raw_json_request(request.as_str(), 1) 355 .await 356 .expect("request"); 357 assert!(response.get().contains("unauthorized")); 358 } 359 360 #[tokio::test] 361 async fn publish_job_list_rejects_malformed_and_zero_limits() { 362 let (module, _ctx, _token, _event) = module_with_principal(false); 363 let malformed = r#"{ 364 "jsonrpc":"2.0", 365 "method":"publish.job.list", 366 "params":"bad", 367 "id":1 368 }"#; 369 let (response, _stream) = module 370 .raw_json_request(malformed, 1) 371 .await 372 .expect("malformed request"); 373 assert!(response.get().contains("\"code\":-32602")); 374 375 let zero = r#"{ 376 "jsonrpc":"2.0", 377 "method":"publish.job.list", 378 "params":{"limit":0}, 379 "id":1 380 }"#; 381 let (response, _stream) = module 382 .raw_json_request(zero, 1) 383 .await 384 .expect("zero request"); 385 assert!(response.get().contains("\"code\":-32602")); 386 assert!(response.get().contains("limit must be greater than zero")); 387 } 388 389 #[tokio::test] 390 async fn publish_job_list_rejects_unknown_fields() { 391 let (module, _ctx, _token, _event) = module_with_principal(false); 392 for params in [ 393 r#"{"cursor":"next"}"#, 394 r#"{"status":"publishing"}"#, 395 r#"{"limit":1,"extra":true}"#, 396 ] { 397 let request = format!( 398 r#"{{ 399 "jsonrpc":"2.0", 400 "method":"publish.job.list", 401 "params":{params}, 402 "id":1 403 }}"# 404 ); 405 let (response, _stream) = module 406 .raw_json_request(request.as_str(), 1) 407 .await 408 .expect("unknown field request"); 409 assert!( 410 response.get().contains("\"code\":-32602"), 411 "{}", 412 response.get() 413 ); 414 } 415 } 416 417 #[tokio::test] 418 async fn publish_job_list_uses_configured_limit_when_omitted_and_caps_positive_limits() { 419 let mut config = PublishProxyConfig { 420 daemon_default_publish_relays: vec!["wss://relay.example.com".to_owned()], 421 ..PublishProxyConfig::default() 422 }; 423 config.job_list_limit = 1; 424 let (module, _ctx, _token, event) = module_with_principal_and_config(false, config); 425 for idempotency_key in ["idem-list-1", "idem-list-2"] { 426 let request = format!( 427 r#"{{ 428 "jsonrpc":"2.0", 429 "method":"publish.event", 430 "params":{{ 431 "event":{}, 432 "relays":[], 433 "relay_policy":"daemon_default_only", 434 "delivery_policy":{{"mode":"any"}}, 435 "idempotency_key":"{idempotency_key}" 436 }}, 437 "id":1 438 }}"#, 439 serde_json::to_string(&event).expect("event json") 440 ); 441 let (response, _stream) = module 442 .raw_json_request(request.as_str(), 1) 443 .await 444 .expect("publish request"); 445 assert!(response.get().contains("\"deduplicated\":false")); 446 } 447 448 let omitted = r#"{ 449 "jsonrpc":"2.0", 450 "method":"publish.job.list", 451 "id":1 452 }"#; 453 let (response, _stream) = module 454 .raw_json_request(omitted, 1) 455 .await 456 .expect("omitted request"); 457 let value: serde_json::Value = 458 serde_json::from_str(response.get()).expect("omitted response json"); 459 assert_eq!(value["result"].as_array().expect("jobs").len(), 1); 460 461 let empty_array = r#"{ 462 "jsonrpc":"2.0", 463 "method":"publish.job.list", 464 "params":[], 465 "id":1 466 }"#; 467 let (response, _stream) = module 468 .raw_json_request(empty_array, 1) 469 .await 470 .expect("empty array request"); 471 let value: serde_json::Value = 472 serde_json::from_str(response.get()).expect("empty array response json"); 473 assert_eq!(value["result"].as_array().expect("jobs").len(), 1); 474 475 let over_limit = r#"{ 476 "jsonrpc":"2.0", 477 "method":"publish.job.list", 478 "params":{"limit":50}, 479 "id":1 480 }"#; 481 let (response, _stream) = module 482 .raw_json_request(over_limit, 1) 483 .await 484 .expect("over limit request"); 485 let value: serde_json::Value = 486 serde_json::from_str(response.get()).expect("over limit response json"); 487 assert_eq!(value["result"].as_array().expect("jobs").len(), 1); 488 } 489 490 #[tokio::test] 491 async fn publish_relays_resolve_returns_daemon_default_targets() { 492 let (module, _ctx, _token, event) = module_with_principal(false); 493 let request = format!( 494 r#"{{ 495 "jsonrpc":"2.0", 496 "method":"publish.relays.resolve", 497 "params":{{ 498 "event":{}, 499 "relay_policy":"daemon_default_only", 500 "relays":[] 501 }}, 502 "id":1 503 }}"#, 504 serde_json::to_string(&event).expect("event json") 505 ); 506 let (response, _stream) = module 507 .raw_json_request(request.as_str(), 1) 508 .await 509 .expect("request"); 510 assert!( 511 response 512 .get() 513 .contains("\"relay_url\":\"wss://relay.example.com\"") 514 ); 515 assert!(response.get().contains("\"source\":\"daemon_default\"")); 516 } 517 518 #[test] 519 fn http_auth_finds_principal_from_hashed_token() { 520 let (_module, ctx, token, _pubkey) = module_with_principal(false); 521 let header = format!("Bearer {token}"); 522 let auth = 523 authorize_publish_proxy_request(Some(header.as_str()), &ctx.state.publish_proxy.store); 524 assert!(matches!(auth, PublishProxyAuthorization::Authorized(_))); 525 } 526 }