myc

Self-custodial remote signer for Radroots apps
git clone https://radroots.dev/git/myc.git
Log | Files | Refs | README | LICENSE

operability_e2e.rs (14090B)


      1 use std::path::{Path, PathBuf};
      2 use std::time::Duration;
      3 use std::time::{SystemTime, UNIX_EPOCH};
      4 
      5 use myc::{
      6     MycActiveIdentity, MycConfig, MycDeliveryOutboxKind, MycDeliveryOutboxRecord,
      7     MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord, MycRuntime,
      8     MycRuntimeAuditBackend, MycRuntimeStatus, MycSignerStateBackend, MycTransportDeliveryPolicy,
      9     collect_status_full,
     10 };
     11 use radroots_identity::RadrootsIdentity;
     12 use radroots_nostr::prelude::{
     13     RadrootsNostrEventBuilder, RadrootsNostrKind, RadrootsNostrRelayUrl,
     14 };
     15 use radroots_nostr_signer::prelude::{
     16     RadrootsNostrSignerApprovalRequirement, RadrootsNostrSignerConnectionDraft,
     17 };
     18 use tokio::net::TcpListener;
     19 use tokio::sync::oneshot;
     20 use tokio::time::sleep;
     21 
     22 type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
     23 
     24 struct TestRelay {
     25     url: String,
     26     shutdown_tx: Option<oneshot::Sender<()>>,
     27 }
     28 
     29 impl TestRelay {
     30     async fn spawn() -> TestResult<Self> {
     31         let listener = TcpListener::bind("127.0.0.1:0").await?;
     32         let addr = listener.local_addr()?;
     33         let url = format!("ws://{addr}");
     34         let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
     35 
     36         tokio::spawn(async move {
     37             loop {
     38                 tokio::select! {
     39                     _ = &mut shutdown_rx => break,
     40                     accept = listener.accept() => {
     41                         let Ok((stream, _)) = accept else {
     42                             break;
     43                         };
     44                         tokio::spawn(async move {
     45                             let _ = tokio_tungstenite::accept_async(stream).await;
     46                         });
     47                     }
     48                 }
     49             }
     50         });
     51 
     52         Ok(Self {
     53             url,
     54             shutdown_tx: Some(shutdown_tx),
     55         })
     56     }
     57 
     58     fn url(&self) -> &str {
     59         self.url.as_str()
     60     }
     61 }
     62 
     63 impl Drop for TestRelay {
     64     fn drop(&mut self) {
     65         if let Some(shutdown_tx) = self.shutdown_tx.take() {
     66             let _ = shutdown_tx.send(());
     67         }
     68     }
     69 }
     70 
     71 struct HangingRelay {
     72     url: String,
     73     shutdown_tx: Option<oneshot::Sender<()>>,
     74 }
     75 
     76 impl HangingRelay {
     77     async fn spawn(hold_open_for: Duration) -> TestResult<Self> {
     78         let listener = TcpListener::bind("127.0.0.1:0").await?;
     79         let addr = listener.local_addr()?;
     80         let url = format!("ws://{addr}");
     81         let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
     82 
     83         tokio::spawn(async move {
     84             loop {
     85                 tokio::select! {
     86                     _ = &mut shutdown_rx => break,
     87                     accept = listener.accept() => {
     88                         let Ok((stream, _)) = accept else {
     89                             break;
     90                         };
     91                         tokio::spawn(async move {
     92                             sleep(hold_open_for).await;
     93                             drop(stream);
     94                         });
     95                     }
     96                 }
     97             }
     98         });
     99 
    100         Ok(Self {
    101             url,
    102             shutdown_tx: Some(shutdown_tx),
    103         })
    104     }
    105 
    106     fn url(&self) -> &str {
    107         self.url.as_str()
    108     }
    109 }
    110 
    111 impl Drop for HangingRelay {
    112     fn drop(&mut self) {
    113         if let Some(shutdown_tx) = self.shutdown_tx.take() {
    114             let _ = shutdown_tx.send(());
    115         }
    116     }
    117 }
    118 
    119 fn write_test_identity(path: &Path, secret_key: &str) {
    120     let identity = RadrootsIdentity::from_secret_key_str(secret_key).expect("identity from secret");
    121     myc::identity_files::store_encrypted_identity(path, &identity).expect("write identity");
    122 }
    123 
    124 fn signed_delivery_event(identity: &MycActiveIdentity, content: &str) -> nostr::Event {
    125     identity
    126         .sign_event_builder(
    127             RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), content),
    128             "operability delivery test event",
    129         )
    130         .expect("sign event")
    131 }
    132 
    133 fn now_unix_secs() -> u64 {
    134     SystemTime::now()
    135         .duration_since(UNIX_EPOCH)
    136         .expect("system time")
    137         .as_secs()
    138 }
    139 
    140 fn build_runtime<F>(configure: F) -> MycRuntime
    141 where
    142     F: FnOnce(&mut MycConfig),
    143 {
    144     let temp = tempfile::tempdir().expect("tempdir").keep();
    145     let mut config = MycConfig::default();
    146     config.paths.state_dir = PathBuf::from(&temp).join("state");
    147     config.paths.signer_identity_path = PathBuf::from(&temp).join("signer.json");
    148     config.paths.user_identity_path = PathBuf::from(&temp).join("user.json");
    149     config.transport.connect_timeout_secs = 1;
    150     write_test_identity(
    151         &config.paths.signer_identity_path,
    152         "1111111111111111111111111111111111111111111111111111111111111111",
    153     );
    154     write_test_identity(
    155         &config.paths.user_identity_path,
    156         "2222222222222222222222222222222222222222222222222222222222222222",
    157     );
    158     configure(&mut config);
    159     MycRuntime::bootstrap(config).expect("runtime")
    160 }
    161 
    162 #[tokio::test]
    163 async fn status_is_unready_when_transport_is_disabled() -> TestResult<()> {
    164     let runtime = build_runtime(|_| {});
    165 
    166     let status = collect_status_full(&runtime).await?;
    167 
    168     assert_eq!(status.status, MycRuntimeStatus::Unready);
    169     assert!(!status.ready);
    170     assert_eq!(status.transport.status, MycRuntimeStatus::Unready);
    171     assert!(
    172         status
    173             .reasons
    174             .iter()
    175             .any(|reason| reason == "transport is disabled")
    176     );
    177     Ok(())
    178 }
    179 
    180 #[tokio::test]
    181 async fn status_is_degraded_but_ready_when_any_policy_has_one_live_relay() -> TestResult<()> {
    182     let relay = TestRelay::spawn().await?;
    183     let hanging = HangingRelay::spawn(Duration::from_secs(5)).await?;
    184     let runtime = build_runtime(|config| {
    185         config.transport.enabled = true;
    186         config.transport.relays = vec![relay.url().to_owned(), hanging.url().to_owned()];
    187         config.transport.delivery_policy = MycTransportDeliveryPolicy::Any;
    188     });
    189 
    190     let status = collect_status_full(&runtime).await?;
    191 
    192     assert_eq!(status.status, MycRuntimeStatus::Degraded);
    193     assert!(status.ready);
    194     assert_eq!(status.transport.status, MycRuntimeStatus::Degraded);
    195     assert_eq!(status.transport.available_relay_count, 1);
    196     assert_eq!(status.transport.unavailable_relay_count, 1);
    197     Ok(())
    198 }
    199 
    200 #[tokio::test]
    201 async fn status_is_unready_when_all_policy_cannot_be_satisfied() -> TestResult<()> {
    202     let relay = TestRelay::spawn().await?;
    203     let hanging = HangingRelay::spawn(Duration::from_secs(5)).await?;
    204     let runtime = build_runtime(|config| {
    205         config.transport.enabled = true;
    206         config.transport.relays = vec![relay.url().to_owned(), hanging.url().to_owned()];
    207         config.transport.delivery_policy = MycTransportDeliveryPolicy::All;
    208     });
    209 
    210     let status = collect_status_full(&runtime).await?;
    211 
    212     assert_eq!(status.status, MycRuntimeStatus::Unready);
    213     assert!(!status.ready);
    214     assert_eq!(status.transport.status, MycRuntimeStatus::Unready);
    215     assert_eq!(status.transport.available_relay_count, 1);
    216     assert_eq!(status.transport.required_available_relays, 2);
    217     Ok(())
    218 }
    219 
    220 #[tokio::test]
    221 async fn status_is_unready_when_critical_delivery_job_is_blocked() -> TestResult<()> {
    222     let relay = TestRelay::spawn().await?;
    223     let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?;
    224     let runtime = build_runtime(|config| {
    225         config.transport.enabled = true;
    226         config.transport.relays = vec![relay.url().to_owned()];
    227     });
    228     let client_identity = RadrootsIdentity::from_secret_key_str(
    229         "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    230     )?;
    231     let manager = runtime.signer_manager()?;
    232     let connection = manager.register_connection(
    233         RadrootsNostrSignerConnectionDraft::new(
    234             client_identity.public_key(),
    235             runtime.user_public_identity(),
    236         )
    237         .with_connect_secret("blocked-secret")
    238         .with_relays(vec![relay_url.clone()])
    239         .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
    240     )?;
    241     let workflow = manager.begin_connect_secret_publish_finalization(&connection.connection_id)?;
    242     let outbox_record = MycDeliveryOutboxRecord::new(
    243         MycDeliveryOutboxKind::ListenerResponsePublish,
    244         signed_delivery_event(runtime.signer_identity(), "blocked-listener"),
    245         vec![relay_url],
    246     )?
    247     .with_connection_id(&connection.connection_id)
    248     .with_request_id("blocked-request")
    249     .with_signer_publish_workflow_id(&workflow.workflow_id);
    250     runtime.delivery_outbox_store().enqueue(&outbox_record)?;
    251     manager.cancel_publish_workflow(&workflow.workflow_id)?;
    252 
    253     let status = collect_status_full(&runtime).await?;
    254 
    255     assert_eq!(status.transport.status, MycRuntimeStatus::Healthy);
    256     assert_eq!(status.status, MycRuntimeStatus::Unready);
    257     assert!(!status.ready);
    258     assert_eq!(status.delivery_outbox.status, MycRuntimeStatus::Unready);
    259     assert!(!status.delivery_outbox.ready);
    260     assert_eq!(status.delivery_outbox.unfinished_job_count, 1);
    261     assert_eq!(status.delivery_outbox.critical_unfinished_job_count, 1);
    262     assert_eq!(status.delivery_outbox.blocked_job_count, 1);
    263     assert_eq!(status.delivery_outbox.critical_blocked_job_count, 1);
    264     assert!(
    265         status
    266             .reasons
    267             .iter()
    268             .any(|reason| reason == "1 critical delivery outbox job(s) are blocked")
    269     );
    270     Ok(())
    271 }
    272 
    273 #[tokio::test]
    274 async fn status_is_degraded_but_ready_when_only_discovery_job_is_stuck() -> TestResult<()> {
    275     let relay = TestRelay::spawn().await?;
    276     let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?;
    277     let runtime = build_runtime(|config| {
    278         config.transport.enabled = true;
    279         config.transport.relays = vec![relay.url().to_owned()];
    280         config.transport.connect_timeout_secs = 1;
    281     });
    282     let mut outbox_record = MycDeliveryOutboxRecord::new(
    283         MycDeliveryOutboxKind::DiscoveryHandlerPublish,
    284         signed_delivery_event(runtime.signer_identity(), "stuck-discovery"),
    285         vec![relay_url],
    286     )?
    287     .with_attempt_id("discovery-attempt-1");
    288     let old_timestamp = now_unix_secs().saturating_sub(30);
    289     outbox_record.created_at_unix = old_timestamp;
    290     outbox_record.updated_at_unix = old_timestamp;
    291     runtime.delivery_outbox_store().enqueue(&outbox_record)?;
    292 
    293     let status = collect_status_full(&runtime).await?;
    294 
    295     assert_eq!(status.transport.status, MycRuntimeStatus::Healthy);
    296     assert_eq!(status.status, MycRuntimeStatus::Degraded);
    297     assert!(status.ready);
    298     assert_eq!(status.delivery_outbox.status, MycRuntimeStatus::Degraded);
    299     assert!(status.delivery_outbox.ready);
    300     assert_eq!(status.delivery_outbox.unfinished_job_count, 1);
    301     assert_eq!(status.delivery_outbox.critical_unfinished_job_count, 0);
    302     assert_eq!(status.delivery_outbox.blocked_job_count, 1);
    303     assert_eq!(status.delivery_outbox.critical_blocked_job_count, 0);
    304     assert_eq!(status.delivery_outbox.oldest_blocked_age_secs, Some(30));
    305     assert!(
    306         status
    307             .reasons
    308             .iter()
    309             .any(|reason| reason == "1 non-critical delivery outbox job(s) are blocked")
    310     );
    311     Ok(())
    312 }
    313 
    314 #[tokio::test]
    315 async fn status_surfaces_last_delivery_recovery_result() -> TestResult<()> {
    316     let runtime = build_runtime(|_| {});
    317     runtime.record_operation_audit(&MycOperationAuditRecord::new(
    318         MycOperationAuditKind::DeliveryRecovery,
    319         MycOperationAuditOutcome::Succeeded,
    320         None,
    321         None,
    322         2,
    323         2,
    324         "recovered 2/2 delivery outbox job(s); republished 1",
    325     ));
    326 
    327     let status = collect_status_full(&runtime).await?;
    328     let last_recovery = status
    329         .delivery_outbox
    330         .last_recovery
    331         .expect("last delivery recovery");
    332 
    333     assert_eq!(last_recovery.outcome, MycOperationAuditOutcome::Succeeded);
    334     assert_eq!(
    335         last_recovery.summary,
    336         "recovered 2/2 delivery outbox job(s); republished 1"
    337     );
    338     Ok(())
    339 }
    340 
    341 #[tokio::test]
    342 async fn status_reports_sqlite_persistence_schema_state() -> TestResult<()> {
    343     let runtime = build_runtime(|config| {
    344         config.persistence.signer_state_backend = MycSignerStateBackend::Sqlite;
    345         config.persistence.runtime_audit_backend = MycRuntimeAuditBackend::Sqlite;
    346     });
    347 
    348     let status = collect_status_full(&runtime).await?;
    349 
    350     assert_eq!(
    351         status.persistence.signer_state.backend,
    352         MycSignerStateBackend::Sqlite
    353     );
    354     assert!(status.persistence.signer_state.exists);
    355     assert_eq!(
    356         status
    357             .persistence
    358             .signer_state
    359             .sqlite_schema
    360             .as_ref()
    361             .expect("signer sqlite schema")
    362             .applied_migration_count,
    363         Some(2)
    364     );
    365     assert_eq!(
    366         status
    367             .persistence
    368             .signer_state
    369             .sqlite_schema
    370             .as_ref()
    371             .expect("signer sqlite schema")
    372             .journal_mode
    373             .as_deref(),
    374         Some("wal")
    375     );
    376     assert_eq!(
    377         status
    378             .persistence
    379             .signer_state
    380             .sqlite_schema
    381             .as_ref()
    382             .expect("signer sqlite schema")
    383             .store_version,
    384         Some(1)
    385     );
    386     assert_eq!(
    387         status.persistence.runtime_audit.backend,
    388         MycRuntimeAuditBackend::Sqlite
    389     );
    390     assert!(status.persistence.runtime_audit.exists);
    391     assert_eq!(
    392         status
    393             .persistence
    394             .runtime_audit
    395             .sqlite_schema
    396             .as_ref()
    397             .expect("audit sqlite schema")
    398             .applied_migration_count,
    399         Some(1)
    400     );
    401     assert_eq!(
    402         status
    403             .persistence
    404             .runtime_audit
    405             .sqlite_schema
    406             .as_ref()
    407             .expect("audit sqlite schema")
    408             .latest_migration
    409             .as_deref(),
    410         Some("0000_runtime_audit_init")
    411     );
    412     assert_eq!(
    413         status
    414             .persistence
    415             .runtime_audit
    416             .sqlite_schema
    417             .as_ref()
    418             .expect("audit sqlite schema")
    419             .journal_mode
    420             .as_deref(),
    421         Some("wal")
    422     );
    423     Ok(())
    424 }