server.rs (17012B)
1 #![forbid(unsafe_code)] 2 3 use std::net::SocketAddr; 4 5 use anyhow::Result; 6 use jsonrpsee::server::middleware::rpc::{ 7 Batch, MethodResponse, Notification, Request, RpcServiceBuilder, RpcServiceT, 8 }; 9 use jsonrpsee::server::{ 10 BatchRequestConfig, HttpBody, HttpRequest, RpcModule, ServerBuilder, ServerConfigBuilder, 11 ServerHandle, 12 }; 13 use jsonrpsee::types::{ErrorObject, Id}; 14 15 use crate::app::config::RpcConfig; 16 use crate::core::publish_proxy::PublishProxyStore; 17 use crate::transport::jsonrpc::RpcContext; 18 use crate::transport::jsonrpc::auth; 19 20 #[derive(Clone)] 21 struct RejectPublishNotifications<S> { 22 service: S, 23 } 24 25 impl<S> RpcServiceT for RejectPublishNotifications<S> 26 where 27 S: RpcServiceT< 28 MethodResponse = MethodResponse, 29 NotificationResponse = MethodResponse, 30 BatchResponse = MethodResponse, 31 > + Clone 32 + Send 33 + Sync 34 + 'static, 35 { 36 type MethodResponse = MethodResponse; 37 type NotificationResponse = MethodResponse; 38 type BatchResponse = MethodResponse; 39 40 fn call<'a>( 41 &self, 42 request: Request<'a>, 43 ) -> impl Future<Output = Self::MethodResponse> + Send + 'a { 44 self.service.call(request) 45 } 46 47 fn batch<'a>( 48 &self, 49 requests: Batch<'a>, 50 ) -> impl Future<Output = Self::BatchResponse> + Send + 'a { 51 self.service.batch(requests) 52 } 53 54 fn notification<'a>( 55 &self, 56 notification: Notification<'a>, 57 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a { 58 let service = self.service.clone(); 59 async move { 60 if notification.method_name().starts_with("publish.") { 61 MethodResponse::error( 62 Id::Null, 63 ErrorObject::owned( 64 -32600, 65 "publish notifications are not accepted", 66 None::<()>, 67 ), 68 ) 69 } else { 70 service.notification(notification).await 71 } 72 } 73 } 74 } 75 76 #[cfg(test)] 77 mod tests { 78 use super::start_server; 79 use crate::app::config::{ 80 Nip46Config, PublishProxyConfig, PublishProxyRelayUrlPolicy, RpcConfig, 81 }; 82 use crate::core::Radrootsd; 83 use crate::core::publish_proxy::{ 84 PublishJobVisibility, PublishPrincipalInit, PublishRelayResolveFuture, 85 PublishRelayResolver, generate_bearer_token, hash_bearer_token, 86 }; 87 use crate::transport::jsonrpc::methods; 88 use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; 89 use jsonrpsee::server::RpcModule; 90 use nostr::JsonUtil; 91 use radroots_identity::RadrootsIdentity; 92 use radroots_nostr::prelude::{ 93 RadrootsNostrMetadata, RadrootsNostrTimestamp, radroots_nostr_build_event, 94 }; 95 use radroots_publish_proxy_protocol::PublishRelayPolicy; 96 use radroots_relay_transport::RadrootsMockRelayPublishAdapter; 97 use serde_json::Value; 98 use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener}; 99 use std::sync::Arc; 100 use tokio::io::{AsyncReadExt, AsyncWriteExt}; 101 102 const RELAY_PRIMARY: &str = "ws://localhost:7777"; 103 const RELAY_PUBLIC: &str = "wss://relay.example.com"; 104 105 fn unused_addr() -> SocketAddr { 106 let listener = TcpListener::bind("127.0.0.1:0").expect("bind local addr"); 107 listener.local_addr().expect("local addr") 108 } 109 110 fn signed_event_json(identity: &RadrootsIdentity) -> String { 111 radroots_nostr_build_event( 112 30_402, 113 "{}", 114 vec![vec!["d".to_owned(), "listing-1".to_owned()]], 115 ) 116 .expect("event builder") 117 .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_000)) 118 .sign_with_keys(identity.keys()) 119 .expect("signed event") 120 .as_json() 121 } 122 123 async fn post_json(addr: SocketAddr, body: &str, token: Option<&str>) -> String { 124 let mut stream = tokio::net::TcpStream::connect(addr).await.expect("connect"); 125 let auth_header = token 126 .map(|token| format!("Authorization: Bearer {token}\r\n")) 127 .unwrap_or_default(); 128 let request = format!( 129 "POST / HTTP/1.1\r\nHost: {addr}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n{auth_header}\r\n{body}", 130 body.len() 131 ); 132 stream 133 .write_all(request.as_bytes()) 134 .await 135 .expect("write request"); 136 let mut bytes = Vec::new(); 137 stream.read_to_end(&mut bytes).await.expect("read response"); 138 String::from_utf8(bytes).expect("response utf8") 139 } 140 141 fn publish_server_state_with_config( 142 publish_proxy_config: PublishProxyConfig, 143 resolver: Option<Arc<dyn PublishRelayResolver>>, 144 ) -> ( 145 Radrootsd, 146 String, 147 RadrootsIdentity, 148 RadrootsMockRelayPublishAdapter, 149 ) { 150 let identity = RadrootsIdentity::generate(); 151 let metadata: RadrootsNostrMetadata = 152 serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); 153 let mut state = Radrootsd::new( 154 identity.clone(), 155 metadata, 156 publish_proxy_config, 157 Nip46Config::default(), 158 ) 159 .expect("state"); 160 let adapter = RadrootsMockRelayPublishAdapter::new(); 161 let mut publish_proxy = state.publish_proxy.clone(); 162 if let Some(resolver) = resolver { 163 publish_proxy = publish_proxy.with_relay_resolver(resolver); 164 } 165 state.publish_proxy = publish_proxy.with_publisher(Arc::new(adapter.clone())); 166 let token = generate_bearer_token(); 167 state 168 .publish_proxy 169 .store 170 .create_principal(PublishPrincipalInit { 171 label: "tester".to_owned(), 172 token_hash: hash_bearer_token(token.as_str()), 173 allowed_pubkeys: vec![identity.public_key_hex()], 174 allowed_kinds: vec![30_402], 175 allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly], 176 allow_request_relays: false, 177 job_visibility: PublishJobVisibility::Own, 178 expires_at_unix: None, 179 }) 180 .expect("principal"); 181 (state, token, identity, adapter) 182 } 183 184 fn publish_server_state() -> ( 185 Radrootsd, 186 String, 187 RadrootsIdentity, 188 RadrootsMockRelayPublishAdapter, 189 ) { 190 publish_server_state_with_config( 191 PublishProxyConfig { 192 daemon_default_publish_relays: vec![RELAY_PRIMARY.to_owned()], 193 relay_url_policy: PublishProxyRelayUrlPolicy::Localhost, 194 ..PublishProxyConfig::default() 195 }, 196 None, 197 ) 198 } 199 200 struct StaticPublishRelayResolver { 201 addresses: Vec<IpAddr>, 202 } 203 204 impl StaticPublishRelayResolver { 205 fn forbidden_localhost() -> Self { 206 Self { 207 addresses: vec![IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))], 208 } 209 } 210 } 211 212 impl PublishRelayResolver for StaticPublishRelayResolver { 213 fn resolve<'a>( 214 &'a self, 215 _url: &'a radroots_relay_transport::RadrootsRelayUrl, 216 ) -> PublishRelayResolveFuture<'a> { 217 Box::pin(async move { Ok(self.addresses.clone()) }) 218 } 219 } 220 221 async fn start_publish_server( 222 state: Radrootsd, 223 rpc_cfg: RpcConfig, 224 ) -> (SocketAddr, jsonrpsee::server::ServerHandle) { 225 let addr = unused_addr(); 226 let store = state.publish_proxy.store.clone(); 227 let registry = MethodRegistry::default(); 228 let ctx = RpcContext::new(state, registry.clone()); 229 let mut root = RpcModule::new(ctx.clone()); 230 methods::register_all(&mut root, ctx, registry).expect("register methods"); 231 let handle = start_server(addr, &rpc_cfg, store, root) 232 .await 233 .expect("start server"); 234 (addr, handle) 235 } 236 237 fn json_response_body(response: &str) -> Value { 238 let (_headers, body) = response.split_once("\r\n\r\n").expect("http body"); 239 serde_json::from_str(body).expect("json response body") 240 } 241 242 #[tokio::test] 243 async fn raw_http_publish_event_get_and_list_preserve_signed_event() { 244 let (state, token, identity, adapter) = publish_server_state(); 245 let event_json = signed_event_json(&identity); 246 let (addr, handle) = start_publish_server(state, RpcConfig::default()).await; 247 let publish = format!( 248 r#"{{ 249 "jsonrpc":"2.0", 250 "method":"publish.event", 251 "params":{{ 252 "event":{}, 253 "relays":[], 254 "relay_policy":"daemon_default_only", 255 "delivery_policy":{{"mode":"any"}}, 256 "idempotency_key":"raw-http-idem" 257 }}, 258 "id":1 259 }}"#, 260 event_json 261 ); 262 let publish_response = post_json(addr, publish.as_str(), Some(token.as_str())).await; 263 let publish_value = json_response_body(publish_response.as_str()); 264 let job_id = publish_value["result"]["job"]["job_id"] 265 .as_str() 266 .expect("job id") 267 .to_owned(); 268 assert_eq!(publish_value["result"]["deduplicated"], false); 269 assert_eq!( 270 publish_value["result"]["job"]["status"], 271 "delivery_satisfied" 272 ); 273 274 let get = format!( 275 r#"{{ 276 "jsonrpc":"2.0", 277 "method":"publish.job.get", 278 "params":{{"job_id":"{job_id}"}}, 279 "id":2 280 }}"# 281 ); 282 let get_response = post_json(addr, get.as_str(), Some(token.as_str())).await; 283 let get_value = json_response_body(get_response.as_str()); 284 assert_eq!(get_value["result"]["job_id"], job_id); 285 assert_eq!(get_value["result"]["status"], "delivery_satisfied"); 286 287 let list = r#"{ 288 "jsonrpc":"2.0", 289 "method":"publish.job.list", 290 "params":{"limit":10}, 291 "id":3 292 }"#; 293 let list_response = post_json(addr, list, Some(token.as_str())).await; 294 let list_value = json_response_body(list_response.as_str()); 295 let jobs = list_value["result"].as_array().expect("jobs"); 296 assert_eq!(jobs.len(), 1); 297 assert_eq!(jobs[0]["job_id"], job_id); 298 handle.stop().expect("stop server"); 299 300 assert_eq!(adapter.captured_raw_events(), vec![event_json]); 301 } 302 303 #[tokio::test] 304 async fn raw_http_publish_event_rejects_public_relay_forbidden_dns_destination() { 305 let (state, token, identity, adapter) = publish_server_state_with_config( 306 PublishProxyConfig { 307 daemon_default_publish_relays: vec![RELAY_PUBLIC.to_owned()], 308 relay_url_policy: PublishProxyRelayUrlPolicy::Public, 309 ..PublishProxyConfig::default() 310 }, 311 Some(Arc::new(StaticPublishRelayResolver::forbidden_localhost())), 312 ); 313 let event_json = signed_event_json(&identity); 314 let (addr, handle) = start_publish_server(state, RpcConfig::default()).await; 315 let publish = format!( 316 r#"{{ 317 "jsonrpc":"2.0", 318 "method":"publish.event", 319 "params":{{ 320 "event":{}, 321 "relays":[], 322 "relay_policy":"daemon_default_only", 323 "delivery_policy":{{"mode":"any"}}, 324 "idempotency_key":"raw-http-public-dns-reject" 325 }}, 326 "id":1 327 }}"#, 328 event_json 329 ); 330 let publish_response = post_json(addr, publish.as_str(), Some(token.as_str())).await; 331 handle.stop().expect("stop server"); 332 333 let publish_value = json_response_body(publish_response.as_str()); 334 let job = &publish_value["result"]["job"]; 335 assert_eq!(publish_value["result"]["deduplicated"], false); 336 assert_eq!(job["status"], "rejected"); 337 assert_eq!(job["last_error"], "no_publish_relays"); 338 let relays = job["relays"].as_array().expect("relay outcomes"); 339 assert_eq!(relays.len(), 1); 340 assert_eq!(relays[0]["relay_url"], RELAY_PUBLIC); 341 assert_eq!(relays[0]["source"], "daemon_default"); 342 assert_eq!(relays[0]["outcome_kind"], "relay_url_rejected"); 343 assert_eq!(relays[0]["attempted"], false); 344 assert!(adapter.captured_raw_events().is_empty()); 345 } 346 347 #[tokio::test] 348 async fn publish_notifications_do_not_create_jobs() { 349 let (state, token, identity, _adapter) = publish_server_state(); 350 let store = state.publish_proxy.store.clone(); 351 let (addr, handle) = start_publish_server(state, RpcConfig::default()).await; 352 let notification = format!( 353 r#"{{ 354 "jsonrpc":"2.0", 355 "method":"publish.event", 356 "params":{{ 357 "event":{}, 358 "relays":[], 359 "relay_policy":"daemon_default_only", 360 "delivery_policy":{{"mode":"any"}} 361 }} 362 }}"#, 363 signed_event_json(&identity) 364 ); 365 let response = post_json(addr, notification.as_str(), Some(token.as_str())).await; 366 handle.stop().expect("stop server"); 367 368 assert!( 369 response.contains("publish notifications are not accepted") 370 || response.ends_with("\r\n\r\n") 371 ); 372 let principal = store 373 .principal_for_token_hash(hash_bearer_token(token.as_str()).as_str()) 374 .expect("principal lookup") 375 .expect("principal"); 376 assert!( 377 store 378 .list_jobs_for_principal(&principal, 10) 379 .expect("jobs") 380 .is_empty() 381 ); 382 } 383 384 #[tokio::test] 385 async fn batch_requests_are_disabled_by_default() { 386 let (state, token, identity, _adapter) = publish_server_state(); 387 let store = state.publish_proxy.store.clone(); 388 let (addr, handle) = start_publish_server(state, RpcConfig::default()).await; 389 let batch = format!( 390 r#"[{{ 391 "jsonrpc":"2.0", 392 "method":"publish.event", 393 "params":{{ 394 "event":{}, 395 "relays":[], 396 "relay_policy":"daemon_default_only", 397 "delivery_policy":{{"mode":"any"}} 398 }}, 399 "id":1 400 }}]"#, 401 signed_event_json(&identity) 402 ); 403 let response = post_json(addr, batch.as_str(), Some(token.as_str())).await; 404 handle.stop().expect("stop server"); 405 406 assert!( 407 response.contains("Batched requests are not supported by this server"), 408 "{response}" 409 ); 410 let principal = store 411 .principal_for_token_hash(hash_bearer_token(token.as_str()).as_str()) 412 .expect("principal lookup") 413 .expect("principal"); 414 assert!( 415 store 416 .list_jobs_for_principal(&principal, 10) 417 .expect("jobs") 418 .is_empty() 419 ); 420 } 421 } 422 423 pub async fn start_server( 424 addr: SocketAddr, 425 rpc_cfg: &RpcConfig, 426 publish_proxy_store: PublishProxyStore, 427 root: RpcModule<RpcContext>, 428 ) -> Result<ServerHandle> { 429 let mut builder = ServerConfigBuilder::new() 430 .max_request_body_size(rpc_cfg.max_request_body_size) 431 .max_response_body_size(rpc_cfg.max_response_body_size) 432 .max_connections(rpc_cfg.max_connections) 433 .max_subscriptions_per_connection(rpc_cfg.max_subscriptions_per_connection) 434 .set_message_buffer_capacity(rpc_cfg.message_buffer_capacity); 435 436 if let Some(limit) = rpc_cfg.batch_request_limit { 437 let cfg = if limit == 0 { 438 BatchRequestConfig::Disabled 439 } else { 440 BatchRequestConfig::Limit(limit) 441 }; 442 builder = builder.set_batch_request_config(cfg); 443 } 444 445 let server_cfg = builder.build(); 446 let rpc_middleware = 447 RpcServiceBuilder::new().layer_fn(|service| RejectPublishNotifications { service }); 448 let server = ServerBuilder::with_config(server_cfg) 449 .set_rpc_middleware(rpc_middleware) 450 .set_http_middleware(tower::ServiceBuilder::new().map_request( 451 move |mut request: HttpRequest<HttpBody>| { 452 let publish_proxy_auth = auth::authorize_publish_proxy_request( 453 request 454 .headers() 455 .get("authorization") 456 .and_then(|value| value.to_str().ok()), 457 &publish_proxy_store, 458 ); 459 request.extensions_mut().insert(publish_proxy_auth); 460 request 461 }, 462 )) 463 .build(addr) 464 .await?; 465 Ok(server.start(root)) 466 }