radrootsd

JSON-RPC bridge for Radroots event publishing
git clone https://radroots.dev/git/radrootsd.git
Log | Files | Refs | README | LICENSE

publish_proxy.rs (19162B)


      1 use anyhow::Result;
      2 use jsonrpsee::server::RpcModule;
      3 use radroots_publish_proxy_protocol::{
      4     METHOD_CAPABILITIES, METHOD_EVENT, METHOD_JOB_GET, METHOD_JOB_LIST, METHOD_RELAYS_RESOLVE,
      5     PublishCapabilities, PublishDeliveryPolicy, PublishEventRequest, PublishRelayOutcome,
      6     PublishRelaySource,
      7 };
      8 use serde::{Deserialize, Serialize};
      9 
     10 use crate::core::publish_proxy::PublishProxyError;
     11 use crate::transport::jsonrpc::auth::require_publish_principal;
     12 use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError};
     13 
     14 #[derive(Debug, Deserialize)]
     15 struct JobGetParams {
     16     job_id: String,
     17 }
     18 
     19 #[derive(Debug, Deserialize)]
     20 #[serde(deny_unknown_fields)]
     21 struct JobListParams {
     22     limit: Option<usize>,
     23 }
     24 
     25 #[derive(Debug, Deserialize)]
     26 struct RelaysResolveParams {
     27     event: radroots_publish_proxy_protocol::SignedNostrEventWire,
     28     relay_policy: radroots_publish_proxy_protocol::PublishRelayPolicy,
     29     #[serde(default)]
     30     relays: Vec<String>,
     31 }
     32 
     33 #[derive(Clone, Debug, Serialize)]
     34 struct RelaysResolveResponse {
     35     relays: Vec<ResolvedRelayResponseItem>,
     36     rejected_relays: Vec<PublishRelayOutcome>,
     37 }
     38 
     39 #[derive(Clone, Debug, Serialize)]
     40 struct ResolvedRelayResponseItem {
     41     relay_url: String,
     42     source: PublishRelaySource,
     43 }
     44 
     45 pub fn module(ctx: RpcContext, registry: MethodRegistry) -> Result<RpcModule<RpcContext>> {
     46     let mut module = RpcModule::new(ctx);
     47     register_capabilities(&mut module, &registry)?;
     48     register_event(&mut module, &registry)?;
     49     register_job_get(&mut module, &registry)?;
     50     register_job_list(&mut module, &registry)?;
     51     register_relays_resolve(&mut module, &registry)?;
     52     Ok(module)
     53 }
     54 
     55 fn register_capabilities(
     56     module: &mut RpcModule<RpcContext>,
     57     registry: &MethodRegistry,
     58 ) -> Result<()> {
     59     registry.track(METHOD_CAPABILITIES);
     60     module.register_async_method(METHOD_CAPABILITIES, |_params, ctx, extensions| async move {
     61         require_publish_principal(&extensions)?;
     62         Ok::<PublishCapabilities, RpcError>(PublishCapabilities::v1(
     63             ctx.state.publish_proxy.config.max_event_bytes,
     64             ctx.state.publish_proxy.config.max_relays_per_request,
     65         ))
     66     })?;
     67     Ok(())
     68 }
     69 
     70 fn register_event(module: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> {
     71     registry.track(METHOD_EVENT);
     72     module.register_async_method(METHOD_EVENT, |params, ctx, extensions| async move {
     73         let principal = require_publish_principal(&extensions)?;
     74         let request: PublishEventRequest = params
     75             .parse()
     76             .map_err(|error| RpcError::InvalidParams(error.to_string()))?;
     77         ctx.state
     78             .publish_proxy
     79             .publish_event(&principal, request)
     80             .await
     81             .map_err(rpc_error_from_publish_proxy)
     82     })?;
     83     Ok(())
     84 }
     85 
     86 fn register_job_get(module: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> {
     87     registry.track(METHOD_JOB_GET);
     88     module.register_async_method(METHOD_JOB_GET, |params, ctx, extensions| async move {
     89         let principal = require_publish_principal(&extensions)?;
     90         let params: JobGetParams = params
     91             .parse()
     92             .map_err(|error| RpcError::InvalidParams(error.to_string()))?;
     93         let job_id = params.job_id.trim();
     94         if job_id.is_empty() {
     95             return Err(RpcError::InvalidParams("missing job_id".to_owned()));
     96         }
     97         ctx.state
     98             .publish_proxy
     99             .store
    100             .job_by_id_for_principal(job_id, &principal)
    101             .map_err(|error| RpcError::Other(error.to_string()))?
    102             .ok_or_else(|| RpcError::Other(format!("unknown publish job: {job_id}")))
    103     })?;
    104     Ok(())
    105 }
    106 
    107 fn register_job_list(module: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> {
    108     registry.track(METHOD_JOB_LIST);
    109     module.register_async_method(METHOD_JOB_LIST, |params, ctx, extensions| async move {
    110         let principal = require_publish_principal(&extensions)?;
    111         let params = if params.len_bytes() == 0 || params.as_str() == Some("[]") {
    112             JobListParams { limit: None }
    113         } else {
    114             params
    115                 .parse::<JobListParams>()
    116                 .map_err(|error| RpcError::InvalidParams(error.to_string()))?
    117         };
    118         if params.limit == Some(0) {
    119             return Err(RpcError::InvalidParams(
    120                 "limit must be greater than zero".to_owned(),
    121             ));
    122         }
    123         let configured_limit = ctx.state.publish_proxy.config.job_list_limit;
    124         let limit = params
    125             .limit
    126             .unwrap_or(configured_limit)
    127             .min(configured_limit);
    128         ctx.state
    129             .publish_proxy
    130             .store
    131             .list_jobs_for_principal(&principal, limit)
    132             .map_err(|error| RpcError::Other(error.to_string()))
    133     })?;
    134     Ok(())
    135 }
    136 
    137 fn register_relays_resolve(
    138     module: &mut RpcModule<RpcContext>,
    139     registry: &MethodRegistry,
    140 ) -> Result<()> {
    141     registry.track(METHOD_RELAYS_RESOLVE);
    142     module.register_async_method(
    143         METHOD_RELAYS_RESOLVE,
    144         |params, ctx, extensions| async move {
    145             let principal = require_publish_principal(&extensions)?;
    146             let params: RelaysResolveParams = params
    147                 .parse()
    148                 .map_err(|error| RpcError::InvalidParams(error.to_string()))?;
    149             params
    150                 .event
    151                 .validate()
    152                 .map_err(|error| RpcError::InvalidParams(error.to_string()))?;
    153             let request = PublishEventRequest {
    154                 event: params.event,
    155                 relays: params.relays,
    156                 relay_policy: params.relay_policy,
    157                 delivery_policy: PublishDeliveryPolicy::Any,
    158                 idempotency_key: None,
    159                 timeout_ms: None,
    160             };
    161             principal
    162                 .allows_event(&request)
    163                 .map_err(|error| RpcError::Unauthorized(error.to_string()))?;
    164             let resolution = ctx
    165                 .state
    166                 .publish_proxy
    167                 .resolve_relays_for_request(request.event.pubkey.as_str(), &request)
    168                 .await
    169                 .map_err(rpc_error_from_publish_proxy)?;
    170             Ok::<RelaysResolveResponse, RpcError>(RelaysResolveResponse {
    171                 relays: resolution
    172                     .targets
    173                     .into_iter()
    174                     .map(|target| ResolvedRelayResponseItem {
    175                         relay_url: target.url.into_string(),
    176                         source: target.source,
    177                     })
    178                     .collect(),
    179                 rejected_relays: resolution.outcomes,
    180             })
    181         },
    182     )?;
    183     Ok(())
    184 }
    185 
    186 fn rpc_error_from_publish_proxy(error: PublishProxyError) -> RpcError {
    187     match error {
    188         PublishProxyError::InvalidScope(message) => RpcError::Unauthorized(message),
    189         PublishProxyError::InvalidSignedEvent(message) => RpcError::InvalidParams(message),
    190         PublishProxyError::SignedEventVerification(_)
    191         | PublishProxyError::Draft(_)
    192         | PublishProxyError::Relay(_) => RpcError::InvalidParams(error.to_string()),
    193         PublishProxyError::IdempotencyConflict(_) => RpcError::Other(error.to_string()),
    194         other => RpcError::Other(other.to_string()),
    195     }
    196 }
    197 
    198 #[cfg(test)]
    199 mod tests {
    200     use super::module;
    201     use std::sync::Arc;
    202 
    203     use crate::app::config::{Nip46Config, PublishProxyConfig};
    204     use crate::core::Radrootsd;
    205     use crate::core::publish_proxy::{
    206         PublishJobVisibility, PublishPrincipalInit, generate_bearer_token, hash_bearer_token,
    207     };
    208     use crate::transport::jsonrpc::auth::{
    209         PublishProxyAuthorization, authorize_publish_proxy_request,
    210     };
    211     use crate::transport::jsonrpc::{MethodRegistry, RpcContext};
    212     use jsonrpsee::server::RpcModule;
    213     use nostr::JsonUtil;
    214     use radroots_identity::RadrootsIdentity;
    215     use radroots_nostr::prelude::{
    216         RadrootsNostrMetadata, RadrootsNostrTimestamp, radroots_nostr_build_event,
    217     };
    218     use radroots_publish_proxy_protocol::{PublishRelayPolicy, SignedNostrEventWire};
    219     use radroots_relay_transport::RadrootsMockRelayPublishAdapter;
    220 
    221     fn signed_event(identity: &RadrootsIdentity) -> SignedNostrEventWire {
    222         let event = radroots_nostr_build_event(
    223             30_402,
    224             "{}",
    225             vec![vec!["d".to_owned(), "listing-1".to_owned()]],
    226         )
    227         .expect("event builder")
    228         .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_000))
    229         .sign_with_keys(identity.keys())
    230         .expect("signed event");
    231         serde_json::from_str(event.as_json().as_str()).expect("event wire")
    232     }
    233 
    234     fn module_with_principal_and_config(
    235         admin: bool,
    236         publish_proxy_config: PublishProxyConfig,
    237     ) -> (
    238         RpcModule<RpcContext>,
    239         RpcContext,
    240         String,
    241         SignedNostrEventWire,
    242     ) {
    243         let identity = RadrootsIdentity::generate();
    244         let signed_event = signed_event(&identity);
    245         let metadata: RadrootsNostrMetadata =
    246             serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata");
    247         let state = Radrootsd::new(
    248             identity.clone(),
    249             metadata,
    250             publish_proxy_config,
    251             Nip46Config::default(),
    252         )
    253         .expect("state");
    254         let mut state = state;
    255         state.publish_proxy = state
    256             .publish_proxy
    257             .clone()
    258             .with_publisher(Arc::new(RadrootsMockRelayPublishAdapter::new()));
    259         let token = generate_bearer_token();
    260         let principal = state
    261             .publish_proxy
    262             .store
    263             .create_principal(PublishPrincipalInit {
    264                 label: "tester".to_owned(),
    265                 token_hash: hash_bearer_token(token.as_str()),
    266                 allowed_pubkeys: vec![identity.public_key_hex()],
    267                 allowed_kinds: vec![30_402],
    268                 allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly],
    269                 allow_request_relays: false,
    270                 job_visibility: if admin {
    271                     PublishJobVisibility::Admin
    272                 } else {
    273                     PublishJobVisibility::Own
    274                 },
    275                 expires_at_unix: None,
    276             })
    277             .expect("principal");
    278         let registry = MethodRegistry::default();
    279         let ctx = RpcContext::new(state, registry.clone());
    280         let mut module = module(ctx.clone(), registry).expect("module");
    281         module
    282             .extensions_mut()
    283             .insert(PublishProxyAuthorization::Authorized(principal));
    284         (module, ctx, token, signed_event)
    285     }
    286 
    287     fn module_with_principal(
    288         admin: bool,
    289     ) -> (
    290         RpcModule<RpcContext>,
    291         RpcContext,
    292         String,
    293         SignedNostrEventWire,
    294     ) {
    295         module_with_principal_and_config(
    296             admin,
    297             PublishProxyConfig {
    298                 daemon_default_publish_relays: vec!["wss://relay.example.com".to_owned()],
    299                 ..PublishProxyConfig::default()
    300             },
    301         )
    302     }
    303 
    304     #[tokio::test]
    305     async fn publish_event_records_job_and_deduplicates_idempotency() {
    306         let (module, _ctx, _token, event) = module_with_principal(false);
    307         let request = format!(
    308             r#"{{
    309                 "jsonrpc":"2.0",
    310                 "method":"publish.event",
    311                 "params":{{
    312                     "event":{},
    313                     "relays":[],
    314                     "relay_policy":"daemon_default_only",
    315                     "delivery_policy":{{"mode":"any"}},
    316                     "idempotency_key":"idem-1"
    317                 }},
    318                 "id":1
    319             }}"#,
    320             serde_json::to_string(&event).expect("event json")
    321         );
    322         let (response, _stream) = module
    323             .raw_json_request(request.as_str(), 1)
    324             .await
    325             .expect("request");
    326         assert!(response.get().contains("\"deduplicated\":false"));
    327         let (response, _stream) = module
    328             .raw_json_request(request.as_str(), 1)
    329             .await
    330             .expect("request");
    331         assert!(response.get().contains("\"deduplicated\":true"));
    332     }
    333 
    334     #[tokio::test]
    335     async fn publish_event_rejects_principal_scope_gap() {
    336         let (module, _ctx, _token, _pubkey) = module_with_principal(false);
    337         let other_identity = RadrootsIdentity::generate();
    338         let event = signed_event(&other_identity);
    339         let request = format!(
    340             r#"{{
    341                 "jsonrpc":"2.0",
    342                 "method":"publish.event",
    343                 "params":{{
    344                     "event":{},
    345                     "relays":[],
    346                     "relay_policy":"daemon_default_only",
    347                     "delivery_policy":{{"mode":"any"}}
    348                 }},
    349                 "id":1
    350             }}"#,
    351             serde_json::to_string(&event).expect("event json")
    352         );
    353         let (response, _stream) = module
    354             .raw_json_request(request.as_str(), 1)
    355             .await
    356             .expect("request");
    357         assert!(response.get().contains("unauthorized"));
    358     }
    359 
    360     #[tokio::test]
    361     async fn publish_job_list_rejects_malformed_and_zero_limits() {
    362         let (module, _ctx, _token, _event) = module_with_principal(false);
    363         let malformed = r#"{
    364             "jsonrpc":"2.0",
    365             "method":"publish.job.list",
    366             "params":"bad",
    367             "id":1
    368         }"#;
    369         let (response, _stream) = module
    370             .raw_json_request(malformed, 1)
    371             .await
    372             .expect("malformed request");
    373         assert!(response.get().contains("\"code\":-32602"));
    374 
    375         let zero = r#"{
    376             "jsonrpc":"2.0",
    377             "method":"publish.job.list",
    378             "params":{"limit":0},
    379             "id":1
    380         }"#;
    381         let (response, _stream) = module
    382             .raw_json_request(zero, 1)
    383             .await
    384             .expect("zero request");
    385         assert!(response.get().contains("\"code\":-32602"));
    386         assert!(response.get().contains("limit must be greater than zero"));
    387     }
    388 
    389     #[tokio::test]
    390     async fn publish_job_list_rejects_unknown_fields() {
    391         let (module, _ctx, _token, _event) = module_with_principal(false);
    392         for params in [
    393             r#"{"cursor":"next"}"#,
    394             r#"{"status":"publishing"}"#,
    395             r#"{"limit":1,"extra":true}"#,
    396         ] {
    397             let request = format!(
    398                 r#"{{
    399                     "jsonrpc":"2.0",
    400                     "method":"publish.job.list",
    401                     "params":{params},
    402                     "id":1
    403                 }}"#
    404             );
    405             let (response, _stream) = module
    406                 .raw_json_request(request.as_str(), 1)
    407                 .await
    408                 .expect("unknown field request");
    409             assert!(
    410                 response.get().contains("\"code\":-32602"),
    411                 "{}",
    412                 response.get()
    413             );
    414         }
    415     }
    416 
    417     #[tokio::test]
    418     async fn publish_job_list_uses_configured_limit_when_omitted_and_caps_positive_limits() {
    419         let mut config = PublishProxyConfig {
    420             daemon_default_publish_relays: vec!["wss://relay.example.com".to_owned()],
    421             ..PublishProxyConfig::default()
    422         };
    423         config.job_list_limit = 1;
    424         let (module, _ctx, _token, event) = module_with_principal_and_config(false, config);
    425         for idempotency_key in ["idem-list-1", "idem-list-2"] {
    426             let request = format!(
    427                 r#"{{
    428                     "jsonrpc":"2.0",
    429                     "method":"publish.event",
    430                     "params":{{
    431                         "event":{},
    432                         "relays":[],
    433                         "relay_policy":"daemon_default_only",
    434                         "delivery_policy":{{"mode":"any"}},
    435                         "idempotency_key":"{idempotency_key}"
    436                     }},
    437                     "id":1
    438                 }}"#,
    439                 serde_json::to_string(&event).expect("event json")
    440             );
    441             let (response, _stream) = module
    442                 .raw_json_request(request.as_str(), 1)
    443                 .await
    444                 .expect("publish request");
    445             assert!(response.get().contains("\"deduplicated\":false"));
    446         }
    447 
    448         let omitted = r#"{
    449             "jsonrpc":"2.0",
    450             "method":"publish.job.list",
    451             "id":1
    452         }"#;
    453         let (response, _stream) = module
    454             .raw_json_request(omitted, 1)
    455             .await
    456             .expect("omitted request");
    457         let value: serde_json::Value =
    458             serde_json::from_str(response.get()).expect("omitted response json");
    459         assert_eq!(value["result"].as_array().expect("jobs").len(), 1);
    460 
    461         let empty_array = r#"{
    462             "jsonrpc":"2.0",
    463             "method":"publish.job.list",
    464             "params":[],
    465             "id":1
    466         }"#;
    467         let (response, _stream) = module
    468             .raw_json_request(empty_array, 1)
    469             .await
    470             .expect("empty array request");
    471         let value: serde_json::Value =
    472             serde_json::from_str(response.get()).expect("empty array response json");
    473         assert_eq!(value["result"].as_array().expect("jobs").len(), 1);
    474 
    475         let over_limit = r#"{
    476             "jsonrpc":"2.0",
    477             "method":"publish.job.list",
    478             "params":{"limit":50},
    479             "id":1
    480         }"#;
    481         let (response, _stream) = module
    482             .raw_json_request(over_limit, 1)
    483             .await
    484             .expect("over limit request");
    485         let value: serde_json::Value =
    486             serde_json::from_str(response.get()).expect("over limit response json");
    487         assert_eq!(value["result"].as_array().expect("jobs").len(), 1);
    488     }
    489 
    490     #[tokio::test]
    491     async fn publish_relays_resolve_returns_daemon_default_targets() {
    492         let (module, _ctx, _token, event) = module_with_principal(false);
    493         let request = format!(
    494             r#"{{
    495                 "jsonrpc":"2.0",
    496                 "method":"publish.relays.resolve",
    497                 "params":{{
    498                     "event":{},
    499                     "relay_policy":"daemon_default_only",
    500                     "relays":[]
    501                 }},
    502                 "id":1
    503             }}"#,
    504             serde_json::to_string(&event).expect("event json")
    505         );
    506         let (response, _stream) = module
    507             .raw_json_request(request.as_str(), 1)
    508             .await
    509             .expect("request");
    510         assert!(
    511             response
    512                 .get()
    513                 .contains("\"relay_url\":\"wss://relay.example.com\"")
    514         );
    515         assert!(response.get().contains("\"source\":\"daemon_default\""));
    516     }
    517 
    518     #[test]
    519     fn http_auth_finds_principal_from_hashed_token() {
    520         let (_module, ctx, token, _pubkey) = module_with_principal(false);
    521         let header = format!("Bearer {token}");
    522         let auth =
    523             authorize_publish_proxy_request(Some(header.as_str()), &ctx.state.publish_proxy.store);
    524         assert!(matches!(auth, PublishProxyAuthorization::Authorized(_)));
    525     }
    526 }