myc

Self-custodial remote signer for Radroots apps
git clone https://radroots.dev/git/myc.git
Log | Files | Refs | README | LICENSE

operability_server.rs (8840B)


      1 use std::net::{SocketAddr, TcpListener as StdTcpListener};
      2 use std::path::{Path, PathBuf};
      3 use std::time::Duration;
      4 
      5 use myc::{MycConfig, MycRuntime, MycTransportDeliveryPolicy};
      6 use radroots_identity::RadrootsIdentity;
      7 use serde_json::Value;
      8 use tokio::io::{AsyncReadExt, AsyncWriteExt};
      9 use tokio::net::{TcpListener, TcpStream};
     10 use tokio::sync::oneshot;
     11 use tokio::time::{sleep, timeout};
     12 
     13 type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
     14 
     15 const HTTP_READY_TIMEOUT: Duration = Duration::from_secs(15);
     16 
     17 struct TestRelay {
     18     url: String,
     19     shutdown_tx: Option<oneshot::Sender<()>>,
     20 }
     21 
     22 impl TestRelay {
     23     async fn spawn() -> TestResult<Self> {
     24         let listener = TcpListener::bind("127.0.0.1:0").await?;
     25         let addr = listener.local_addr()?;
     26         let url = format!("ws://{addr}");
     27         let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
     28 
     29         tokio::spawn(async move {
     30             loop {
     31                 tokio::select! {
     32                     _ = &mut shutdown_rx => break,
     33                     accept = listener.accept() => {
     34                         let Ok((stream, _)) = accept else {
     35                             break;
     36                         };
     37                         tokio::spawn(async move {
     38                             let _ = tokio_tungstenite::accept_async(stream).await;
     39                         });
     40                     }
     41                 }
     42             }
     43         });
     44 
     45         Ok(Self {
     46             url,
     47             shutdown_tx: Some(shutdown_tx),
     48         })
     49     }
     50 
     51     fn url(&self) -> &str {
     52         self.url.as_str()
     53     }
     54 }
     55 
     56 impl Drop for TestRelay {
     57     fn drop(&mut self) {
     58         if let Some(shutdown_tx) = self.shutdown_tx.take() {
     59             let _ = shutdown_tx.send(());
     60         }
     61     }
     62 }
     63 
     64 struct HangingRelay {
     65     url: String,
     66     shutdown_tx: Option<oneshot::Sender<()>>,
     67 }
     68 
     69 impl HangingRelay {
     70     async fn spawn(hold_open_for: Duration) -> TestResult<Self> {
     71         let listener = TcpListener::bind("127.0.0.1:0").await?;
     72         let addr = listener.local_addr()?;
     73         let url = format!("ws://{addr}");
     74         let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
     75 
     76         tokio::spawn(async move {
     77             loop {
     78                 tokio::select! {
     79                     _ = &mut shutdown_rx => break,
     80                     accept = listener.accept() => {
     81                         let Ok((stream, _)) = accept else {
     82                             break;
     83                         };
     84                         tokio::spawn(async move {
     85                             sleep(hold_open_for).await;
     86                             drop(stream);
     87                         });
     88                     }
     89                 }
     90             }
     91         });
     92 
     93         Ok(Self {
     94             url,
     95             shutdown_tx: Some(shutdown_tx),
     96         })
     97     }
     98 
     99     fn url(&self) -> &str {
    100         self.url.as_str()
    101     }
    102 }
    103 
    104 impl Drop for HangingRelay {
    105     fn drop(&mut self) {
    106         if let Some(shutdown_tx) = self.shutdown_tx.take() {
    107             let _ = shutdown_tx.send(());
    108         }
    109     }
    110 }
    111 
    112 fn write_test_identity(path: &Path, secret_key: &str) {
    113     let identity = RadrootsIdentity::from_secret_key_str(secret_key).expect("identity from secret");
    114     myc::identity_files::store_encrypted_identity(path, &identity).expect("write identity");
    115 }
    116 
    117 fn free_loopback_addr() -> SocketAddr {
    118     let listener = StdTcpListener::bind("127.0.0.1:0").expect("bind free loopback addr");
    119     let addr = listener.local_addr().expect("local addr");
    120     drop(listener);
    121     addr
    122 }
    123 
    124 fn build_runtime<F>(configure: F) -> (MycRuntime, SocketAddr)
    125 where
    126     F: FnOnce(&mut MycConfig),
    127 {
    128     let temp = tempfile::tempdir().expect("tempdir").keep();
    129     let bind_addr = free_loopback_addr();
    130     let mut config = MycConfig::default();
    131     config.paths.state_dir = PathBuf::from(&temp).join("state");
    132     config.paths.signer_identity_path = PathBuf::from(&temp).join("signer.json");
    133     config.paths.user_identity_path = PathBuf::from(&temp).join("user.json");
    134     config.transport.connect_timeout_secs = 1;
    135     config.observability.enabled = true;
    136     config.observability.bind_addr = bind_addr;
    137     write_test_identity(
    138         &config.paths.signer_identity_path,
    139         "1111111111111111111111111111111111111111111111111111111111111111",
    140     );
    141     write_test_identity(
    142         &config.paths.user_identity_path,
    143         "2222222222222222222222222222222222222222222222222222222222222222",
    144     );
    145     configure(&mut config);
    146     (MycRuntime::bootstrap(config).expect("runtime"), bind_addr)
    147 }
    148 
    149 async fn spawn_runtime(runtime: MycRuntime) -> oneshot::Sender<()> {
    150     let (shutdown_tx, shutdown_rx) = oneshot::channel();
    151     tokio::spawn(async move {
    152         let _ = runtime
    153             .run_until(async move {
    154                 let _ = shutdown_rx.await;
    155             })
    156             .await;
    157     });
    158     shutdown_tx
    159 }
    160 
    161 async fn wait_for_http(addr: SocketAddr) -> TestResult<()> {
    162     timeout(HTTP_READY_TIMEOUT, async {
    163         loop {
    164             match TcpStream::connect(addr).await {
    165                 Ok(mut stream) => {
    166                     let _ = stream.shutdown().await;
    167                     return;
    168                 }
    169                 Err(_) => sleep(Duration::from_millis(50)).await,
    170             }
    171         }
    172     })
    173     .await?;
    174     Ok(())
    175 }
    176 
    177 struct SimpleHttpResponse {
    178     status: u16,
    179     content_type: Option<String>,
    180     body: String,
    181 }
    182 
    183 async fn http_get(addr: SocketAddr, path: &str) -> TestResult<SimpleHttpResponse> {
    184     let mut stream = TcpStream::connect(addr).await?;
    185     let request = format!("GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n");
    186     stream.write_all(request.as_bytes()).await?;
    187     let mut response = Vec::new();
    188     stream.read_to_end(&mut response).await?;
    189     let response = String::from_utf8(response)?;
    190     let (head, body) = response
    191         .split_once("\r\n\r\n")
    192         .ok_or("missing http body separator")?;
    193     let mut lines = head.lines();
    194     let status_line = lines.next().ok_or("missing status line")?;
    195     let status = status_line
    196         .split_whitespace()
    197         .nth(1)
    198         .ok_or("missing status code")?
    199         .parse::<u16>()?;
    200     let content_type = lines.find_map(|line| {
    201         let (key, value) = line.split_once(':')?;
    202         if key.eq_ignore_ascii_case("content-type") {
    203             Some(value.trim().to_owned())
    204         } else {
    205             None
    206         }
    207     });
    208     Ok(SimpleHttpResponse {
    209         status,
    210         content_type,
    211         body: body.to_owned(),
    212     })
    213 }
    214 
    215 #[tokio::test]
    216 async fn observability_server_reports_unready_when_transport_is_disabled() -> TestResult<()> {
    217     let (runtime, bind_addr) = build_runtime(|_| {});
    218     let shutdown_tx = spawn_runtime(runtime).await;
    219     wait_for_http(bind_addr).await?;
    220 
    221     let health = http_get(bind_addr, "/healthz").await?;
    222     assert_eq!(health.status, 503);
    223     assert_eq!(health.body, "unready");
    224 
    225     let ready = http_get(bind_addr, "/readyz").await?;
    226     assert_eq!(ready.status, 503);
    227     assert_eq!(ready.body, "unready");
    228 
    229     let status = http_get(bind_addr, "/status").await?;
    230     assert_eq!(status.status, 200);
    231     let body: Value = serde_json::from_str(status.body.as_str())?;
    232     assert_eq!(body["status"], "unready");
    233     assert_eq!(body["ready"], false);
    234 
    235     let metrics = http_get(bind_addr, "/metrics").await?;
    236     assert_eq!(metrics.status, 200);
    237     assert!(
    238         metrics
    239             .content_type
    240             .as_deref()
    241             .unwrap_or_default()
    242             .starts_with("text/plain")
    243     );
    244     assert!(metrics.body.contains("myc_runtime_operation_total"));
    245 
    246     let _ = shutdown_tx.send(());
    247     Ok(())
    248 }
    249 
    250 #[tokio::test]
    251 async fn observability_server_reports_degraded_but_ready_partial_outage() -> TestResult<()> {
    252     let relay = TestRelay::spawn().await?;
    253     let hanging = HangingRelay::spawn(Duration::from_secs(5)).await?;
    254     let (runtime, bind_addr) = build_runtime(|config| {
    255         config.transport.enabled = true;
    256         config.transport.relays = vec![relay.url().to_owned(), hanging.url().to_owned()];
    257         config.transport.delivery_policy = MycTransportDeliveryPolicy::Any;
    258     });
    259     let shutdown_tx = spawn_runtime(runtime).await;
    260     wait_for_http(bind_addr).await?;
    261 
    262     let health = http_get(bind_addr, "/healthz").await?;
    263     assert_eq!(health.status, 200);
    264     assert_eq!(health.body, "degraded");
    265 
    266     let ready = http_get(bind_addr, "/readyz").await?;
    267     assert_eq!(ready.status, 200);
    268     assert_eq!(ready.body, "ready");
    269 
    270     let status = http_get(bind_addr, "/status").await?;
    271     let body: Value = serde_json::from_str(status.body.as_str())?;
    272     assert_eq!(body["status"], "degraded");
    273     assert_eq!(body["ready"], true);
    274     assert_eq!(body["transport"]["available_relay_count"], 1);
    275     assert_eq!(body["transport"]["unavailable_relay_count"], 1);
    276 
    277     let _ = shutdown_tx.send(());
    278     Ok(())
    279 }