radrootsd

JSON-RPC bridge for Radroots event publishing
git clone https://radroots.dev/git/radrootsd.git
Log | Files | Refs | README | LICENSE

server.rs (17012B)


      1 #![forbid(unsafe_code)]
      2 
      3 use std::net::SocketAddr;
      4 
      5 use anyhow::Result;
      6 use jsonrpsee::server::middleware::rpc::{
      7     Batch, MethodResponse, Notification, Request, RpcServiceBuilder, RpcServiceT,
      8 };
      9 use jsonrpsee::server::{
     10     BatchRequestConfig, HttpBody, HttpRequest, RpcModule, ServerBuilder, ServerConfigBuilder,
     11     ServerHandle,
     12 };
     13 use jsonrpsee::types::{ErrorObject, Id};
     14 
     15 use crate::app::config::RpcConfig;
     16 use crate::core::publish_proxy::PublishProxyStore;
     17 use crate::transport::jsonrpc::RpcContext;
     18 use crate::transport::jsonrpc::auth;
     19 
     20 #[derive(Clone)]
     21 struct RejectPublishNotifications<S> {
     22     service: S,
     23 }
     24 
     25 impl<S> RpcServiceT for RejectPublishNotifications<S>
     26 where
     27     S: RpcServiceT<
     28             MethodResponse = MethodResponse,
     29             NotificationResponse = MethodResponse,
     30             BatchResponse = MethodResponse,
     31         > + Clone
     32         + Send
     33         + Sync
     34         + 'static,
     35 {
     36     type MethodResponse = MethodResponse;
     37     type NotificationResponse = MethodResponse;
     38     type BatchResponse = MethodResponse;
     39 
     40     fn call<'a>(
     41         &self,
     42         request: Request<'a>,
     43     ) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
     44         self.service.call(request)
     45     }
     46 
     47     fn batch<'a>(
     48         &self,
     49         requests: Batch<'a>,
     50     ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
     51         self.service.batch(requests)
     52     }
     53 
     54     fn notification<'a>(
     55         &self,
     56         notification: Notification<'a>,
     57     ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
     58         let service = self.service.clone();
     59         async move {
     60             if notification.method_name().starts_with("publish.") {
     61                 MethodResponse::error(
     62                     Id::Null,
     63                     ErrorObject::owned(
     64                         -32600,
     65                         "publish notifications are not accepted",
     66                         None::<()>,
     67                     ),
     68                 )
     69             } else {
     70                 service.notification(notification).await
     71             }
     72         }
     73     }
     74 }
     75 
     76 #[cfg(test)]
     77 mod tests {
     78     use super::start_server;
     79     use crate::app::config::{
     80         Nip46Config, PublishProxyConfig, PublishProxyRelayUrlPolicy, RpcConfig,
     81     };
     82     use crate::core::Radrootsd;
     83     use crate::core::publish_proxy::{
     84         PublishJobVisibility, PublishPrincipalInit, PublishRelayResolveFuture,
     85         PublishRelayResolver, generate_bearer_token, hash_bearer_token,
     86     };
     87     use crate::transport::jsonrpc::methods;
     88     use crate::transport::jsonrpc::{MethodRegistry, RpcContext};
     89     use jsonrpsee::server::RpcModule;
     90     use nostr::JsonUtil;
     91     use radroots_identity::RadrootsIdentity;
     92     use radroots_nostr::prelude::{
     93         RadrootsNostrMetadata, RadrootsNostrTimestamp, radroots_nostr_build_event,
     94     };
     95     use radroots_publish_proxy_protocol::PublishRelayPolicy;
     96     use radroots_relay_transport::RadrootsMockRelayPublishAdapter;
     97     use serde_json::Value;
     98     use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};
     99     use std::sync::Arc;
    100     use tokio::io::{AsyncReadExt, AsyncWriteExt};
    101 
    102     const RELAY_PRIMARY: &str = "ws://localhost:7777";
    103     const RELAY_PUBLIC: &str = "wss://relay.example.com";
    104 
    105     fn unused_addr() -> SocketAddr {
    106         let listener = TcpListener::bind("127.0.0.1:0").expect("bind local addr");
    107         listener.local_addr().expect("local addr")
    108     }
    109 
    110     fn signed_event_json(identity: &RadrootsIdentity) -> String {
    111         radroots_nostr_build_event(
    112             30_402,
    113             "{}",
    114             vec![vec!["d".to_owned(), "listing-1".to_owned()]],
    115         )
    116         .expect("event builder")
    117         .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_000))
    118         .sign_with_keys(identity.keys())
    119         .expect("signed event")
    120         .as_json()
    121     }
    122 
    123     async fn post_json(addr: SocketAddr, body: &str, token: Option<&str>) -> String {
    124         let mut stream = tokio::net::TcpStream::connect(addr).await.expect("connect");
    125         let auth_header = token
    126             .map(|token| format!("Authorization: Bearer {token}\r\n"))
    127             .unwrap_or_default();
    128         let request = format!(
    129             "POST / HTTP/1.1\r\nHost: {addr}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n{auth_header}\r\n{body}",
    130             body.len()
    131         );
    132         stream
    133             .write_all(request.as_bytes())
    134             .await
    135             .expect("write request");
    136         let mut bytes = Vec::new();
    137         stream.read_to_end(&mut bytes).await.expect("read response");
    138         String::from_utf8(bytes).expect("response utf8")
    139     }
    140 
    141     fn publish_server_state_with_config(
    142         publish_proxy_config: PublishProxyConfig,
    143         resolver: Option<Arc<dyn PublishRelayResolver>>,
    144     ) -> (
    145         Radrootsd,
    146         String,
    147         RadrootsIdentity,
    148         RadrootsMockRelayPublishAdapter,
    149     ) {
    150         let identity = RadrootsIdentity::generate();
    151         let metadata: RadrootsNostrMetadata =
    152             serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata");
    153         let mut state = Radrootsd::new(
    154             identity.clone(),
    155             metadata,
    156             publish_proxy_config,
    157             Nip46Config::default(),
    158         )
    159         .expect("state");
    160         let adapter = RadrootsMockRelayPublishAdapter::new();
    161         let mut publish_proxy = state.publish_proxy.clone();
    162         if let Some(resolver) = resolver {
    163             publish_proxy = publish_proxy.with_relay_resolver(resolver);
    164         }
    165         state.publish_proxy = publish_proxy.with_publisher(Arc::new(adapter.clone()));
    166         let token = generate_bearer_token();
    167         state
    168             .publish_proxy
    169             .store
    170             .create_principal(PublishPrincipalInit {
    171                 label: "tester".to_owned(),
    172                 token_hash: hash_bearer_token(token.as_str()),
    173                 allowed_pubkeys: vec![identity.public_key_hex()],
    174                 allowed_kinds: vec![30_402],
    175                 allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly],
    176                 allow_request_relays: false,
    177                 job_visibility: PublishJobVisibility::Own,
    178                 expires_at_unix: None,
    179             })
    180             .expect("principal");
    181         (state, token, identity, adapter)
    182     }
    183 
    184     fn publish_server_state() -> (
    185         Radrootsd,
    186         String,
    187         RadrootsIdentity,
    188         RadrootsMockRelayPublishAdapter,
    189     ) {
    190         publish_server_state_with_config(
    191             PublishProxyConfig {
    192                 daemon_default_publish_relays: vec![RELAY_PRIMARY.to_owned()],
    193                 relay_url_policy: PublishProxyRelayUrlPolicy::Localhost,
    194                 ..PublishProxyConfig::default()
    195             },
    196             None,
    197         )
    198     }
    199 
    200     struct StaticPublishRelayResolver {
    201         addresses: Vec<IpAddr>,
    202     }
    203 
    204     impl StaticPublishRelayResolver {
    205         fn forbidden_localhost() -> Self {
    206             Self {
    207                 addresses: vec![IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))],
    208             }
    209         }
    210     }
    211 
    212     impl PublishRelayResolver for StaticPublishRelayResolver {
    213         fn resolve<'a>(
    214             &'a self,
    215             _url: &'a radroots_relay_transport::RadrootsRelayUrl,
    216         ) -> PublishRelayResolveFuture<'a> {
    217             Box::pin(async move { Ok(self.addresses.clone()) })
    218         }
    219     }
    220 
    221     async fn start_publish_server(
    222         state: Radrootsd,
    223         rpc_cfg: RpcConfig,
    224     ) -> (SocketAddr, jsonrpsee::server::ServerHandle) {
    225         let addr = unused_addr();
    226         let store = state.publish_proxy.store.clone();
    227         let registry = MethodRegistry::default();
    228         let ctx = RpcContext::new(state, registry.clone());
    229         let mut root = RpcModule::new(ctx.clone());
    230         methods::register_all(&mut root, ctx, registry).expect("register methods");
    231         let handle = start_server(addr, &rpc_cfg, store, root)
    232             .await
    233             .expect("start server");
    234         (addr, handle)
    235     }
    236 
    237     fn json_response_body(response: &str) -> Value {
    238         let (_headers, body) = response.split_once("\r\n\r\n").expect("http body");
    239         serde_json::from_str(body).expect("json response body")
    240     }
    241 
    242     #[tokio::test]
    243     async fn raw_http_publish_event_get_and_list_preserve_signed_event() {
    244         let (state, token, identity, adapter) = publish_server_state();
    245         let event_json = signed_event_json(&identity);
    246         let (addr, handle) = start_publish_server(state, RpcConfig::default()).await;
    247         let publish = format!(
    248             r#"{{
    249                 "jsonrpc":"2.0",
    250                 "method":"publish.event",
    251                 "params":{{
    252                     "event":{},
    253                     "relays":[],
    254                     "relay_policy":"daemon_default_only",
    255                     "delivery_policy":{{"mode":"any"}},
    256                     "idempotency_key":"raw-http-idem"
    257                 }},
    258                 "id":1
    259             }}"#,
    260             event_json
    261         );
    262         let publish_response = post_json(addr, publish.as_str(), Some(token.as_str())).await;
    263         let publish_value = json_response_body(publish_response.as_str());
    264         let job_id = publish_value["result"]["job"]["job_id"]
    265             .as_str()
    266             .expect("job id")
    267             .to_owned();
    268         assert_eq!(publish_value["result"]["deduplicated"], false);
    269         assert_eq!(
    270             publish_value["result"]["job"]["status"],
    271             "delivery_satisfied"
    272         );
    273 
    274         let get = format!(
    275             r#"{{
    276                 "jsonrpc":"2.0",
    277                 "method":"publish.job.get",
    278                 "params":{{"job_id":"{job_id}"}},
    279                 "id":2
    280             }}"#
    281         );
    282         let get_response = post_json(addr, get.as_str(), Some(token.as_str())).await;
    283         let get_value = json_response_body(get_response.as_str());
    284         assert_eq!(get_value["result"]["job_id"], job_id);
    285         assert_eq!(get_value["result"]["status"], "delivery_satisfied");
    286 
    287         let list = r#"{
    288             "jsonrpc":"2.0",
    289             "method":"publish.job.list",
    290             "params":{"limit":10},
    291             "id":3
    292         }"#;
    293         let list_response = post_json(addr, list, Some(token.as_str())).await;
    294         let list_value = json_response_body(list_response.as_str());
    295         let jobs = list_value["result"].as_array().expect("jobs");
    296         assert_eq!(jobs.len(), 1);
    297         assert_eq!(jobs[0]["job_id"], job_id);
    298         handle.stop().expect("stop server");
    299 
    300         assert_eq!(adapter.captured_raw_events(), vec![event_json]);
    301     }
    302 
    303     #[tokio::test]
    304     async fn raw_http_publish_event_rejects_public_relay_forbidden_dns_destination() {
    305         let (state, token, identity, adapter) = publish_server_state_with_config(
    306             PublishProxyConfig {
    307                 daemon_default_publish_relays: vec![RELAY_PUBLIC.to_owned()],
    308                 relay_url_policy: PublishProxyRelayUrlPolicy::Public,
    309                 ..PublishProxyConfig::default()
    310             },
    311             Some(Arc::new(StaticPublishRelayResolver::forbidden_localhost())),
    312         );
    313         let event_json = signed_event_json(&identity);
    314         let (addr, handle) = start_publish_server(state, RpcConfig::default()).await;
    315         let publish = format!(
    316             r#"{{
    317                 "jsonrpc":"2.0",
    318                 "method":"publish.event",
    319                 "params":{{
    320                     "event":{},
    321                     "relays":[],
    322                     "relay_policy":"daemon_default_only",
    323                     "delivery_policy":{{"mode":"any"}},
    324                     "idempotency_key":"raw-http-public-dns-reject"
    325                 }},
    326                 "id":1
    327             }}"#,
    328             event_json
    329         );
    330         let publish_response = post_json(addr, publish.as_str(), Some(token.as_str())).await;
    331         handle.stop().expect("stop server");
    332 
    333         let publish_value = json_response_body(publish_response.as_str());
    334         let job = &publish_value["result"]["job"];
    335         assert_eq!(publish_value["result"]["deduplicated"], false);
    336         assert_eq!(job["status"], "rejected");
    337         assert_eq!(job["last_error"], "no_publish_relays");
    338         let relays = job["relays"].as_array().expect("relay outcomes");
    339         assert_eq!(relays.len(), 1);
    340         assert_eq!(relays[0]["relay_url"], RELAY_PUBLIC);
    341         assert_eq!(relays[0]["source"], "daemon_default");
    342         assert_eq!(relays[0]["outcome_kind"], "relay_url_rejected");
    343         assert_eq!(relays[0]["attempted"], false);
    344         assert!(adapter.captured_raw_events().is_empty());
    345     }
    346 
    347     #[tokio::test]
    348     async fn publish_notifications_do_not_create_jobs() {
    349         let (state, token, identity, _adapter) = publish_server_state();
    350         let store = state.publish_proxy.store.clone();
    351         let (addr, handle) = start_publish_server(state, RpcConfig::default()).await;
    352         let notification = format!(
    353             r#"{{
    354                 "jsonrpc":"2.0",
    355                 "method":"publish.event",
    356                 "params":{{
    357                     "event":{},
    358                     "relays":[],
    359                     "relay_policy":"daemon_default_only",
    360                     "delivery_policy":{{"mode":"any"}}
    361                 }}
    362             }}"#,
    363             signed_event_json(&identity)
    364         );
    365         let response = post_json(addr, notification.as_str(), Some(token.as_str())).await;
    366         handle.stop().expect("stop server");
    367 
    368         assert!(
    369             response.contains("publish notifications are not accepted")
    370                 || response.ends_with("\r\n\r\n")
    371         );
    372         let principal = store
    373             .principal_for_token_hash(hash_bearer_token(token.as_str()).as_str())
    374             .expect("principal lookup")
    375             .expect("principal");
    376         assert!(
    377             store
    378                 .list_jobs_for_principal(&principal, 10)
    379                 .expect("jobs")
    380                 .is_empty()
    381         );
    382     }
    383 
    384     #[tokio::test]
    385     async fn batch_requests_are_disabled_by_default() {
    386         let (state, token, identity, _adapter) = publish_server_state();
    387         let store = state.publish_proxy.store.clone();
    388         let (addr, handle) = start_publish_server(state, RpcConfig::default()).await;
    389         let batch = format!(
    390             r#"[{{
    391                 "jsonrpc":"2.0",
    392                 "method":"publish.event",
    393                 "params":{{
    394                     "event":{},
    395                     "relays":[],
    396                     "relay_policy":"daemon_default_only",
    397                     "delivery_policy":{{"mode":"any"}}
    398                 }},
    399                 "id":1
    400             }}]"#,
    401             signed_event_json(&identity)
    402         );
    403         let response = post_json(addr, batch.as_str(), Some(token.as_str())).await;
    404         handle.stop().expect("stop server");
    405 
    406         assert!(
    407             response.contains("Batched requests are not supported by this server"),
    408             "{response}"
    409         );
    410         let principal = store
    411             .principal_for_token_hash(hash_bearer_token(token.as_str()).as_str())
    412             .expect("principal lookup")
    413             .expect("principal");
    414         assert!(
    415             store
    416                 .list_jobs_for_principal(&principal, 10)
    417                 .expect("jobs")
    418                 .is_empty()
    419         );
    420     }
    421 }
    422 
    423 pub async fn start_server(
    424     addr: SocketAddr,
    425     rpc_cfg: &RpcConfig,
    426     publish_proxy_store: PublishProxyStore,
    427     root: RpcModule<RpcContext>,
    428 ) -> Result<ServerHandle> {
    429     let mut builder = ServerConfigBuilder::new()
    430         .max_request_body_size(rpc_cfg.max_request_body_size)
    431         .max_response_body_size(rpc_cfg.max_response_body_size)
    432         .max_connections(rpc_cfg.max_connections)
    433         .max_subscriptions_per_connection(rpc_cfg.max_subscriptions_per_connection)
    434         .set_message_buffer_capacity(rpc_cfg.message_buffer_capacity);
    435 
    436     if let Some(limit) = rpc_cfg.batch_request_limit {
    437         let cfg = if limit == 0 {
    438             BatchRequestConfig::Disabled
    439         } else {
    440             BatchRequestConfig::Limit(limit)
    441         };
    442         builder = builder.set_batch_request_config(cfg);
    443     }
    444 
    445     let server_cfg = builder.build();
    446     let rpc_middleware =
    447         RpcServiceBuilder::new().layer_fn(|service| RejectPublishNotifications { service });
    448     let server = ServerBuilder::with_config(server_cfg)
    449         .set_rpc_middleware(rpc_middleware)
    450         .set_http_middleware(tower::ServiceBuilder::new().map_request(
    451             move |mut request: HttpRequest<HttpBody>| {
    452                 let publish_proxy_auth = auth::authorize_publish_proxy_request(
    453                     request
    454                         .headers()
    455                         .get("authorization")
    456                         .and_then(|value| value.to_str().ok()),
    457                     &publish_proxy_store,
    458                 );
    459                 request.extensions_mut().insert(publish_proxy_auth);
    460                 request
    461             },
    462         ))
    463         .build(addr)
    464         .await?;
    465     Ok(server.start(root))
    466 }