commit cbf5c7eb3e9241e32e067dbbbbd41ba344f6e2e7
parent 6d7b25daf0460bec600ba4394324033799c241e5
Author: triesap <tyson@radroots.org>
Date: Sun, 22 Mar 2026 15:20:55 +0000
discovery: parallelize live relay fetch
- fetch discovery relays concurrently with a bounded internal worker limit
- preserve deterministic normalized relay ordering and existing availability semantics
- add relay-backed proof that mixed relay latency stays bounded without losing order
- validate with cargo metadata --format-version 1 --no-deps, cargo fmt --all --check, cargo check --locked, and cargo test --locked
Diffstat:
| M | src/discovery.rs | | | 116 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------- |
| M | tests/nip46_e2e.rs | | | 148 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- |
2 files changed, 240 insertions(+), 24 deletions(-)
diff --git a/src/discovery.rs b/src/discovery.rs
@@ -12,6 +12,7 @@ use radroots_nostr::prelude::{
};
use radroots_nostr_connect::prelude::{RadrootsNostrConnectBunkerUri, RadrootsNostrConnectUri};
use serde::{Deserialize, Serialize};
+use tokio::task::JoinSet;
use crate::app::MycRuntime;
use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord};
@@ -24,6 +25,7 @@ const DISCOVERY_BUNDLE_VERSION: u32 = 1;
const DISCOVERY_BUNDLE_MANIFEST_FILE_NAME: &str = "bundle.json";
const DISCOVERY_BUNDLE_NIP89_FILE_NAME: &str = "nip89-handler.json";
const DISCOVERY_BUNDLE_NIP05_RELATIVE_PATH: &str = ".well-known/nostr.json";
+const DISCOVERY_RELAY_FETCH_CONCURRENCY_LIMIT: usize = 8;
#[derive(Clone)]
pub struct MycDiscoveryContext {
@@ -195,6 +197,13 @@ struct MycFetchedLiveNip89State {
relay_states: Vec<MycLiveNip89RelayState>,
}
+#[derive(Debug)]
+struct MycRelayFetchTaskOutput {
+ relay_index: usize,
+ relay_events: Vec<MycSourcedLiveNip89Event>,
+ relay_state: MycLiveNip89RelayState,
+}
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MycNip89HandlerDocument {
pub kinds: Vec<u32>,
@@ -719,32 +728,53 @@ pub fn verify_bundle(output_dir: impl AsRef<Path>) -> Result<MycDiscoveryBundleO
async fn fetch_live_nip89_state(
context: &MycDiscoveryContext,
) -> Result<MycFetchedLiveNip89State, MycError> {
- let mut relay_states = Vec::new();
- let mut all_events = Vec::new();
+ let relay_count = context.publish_relays().len();
+ let mut pending = context
+ .publish_relays()
+ .iter()
+ .cloned()
+ .enumerate()
+ .collect::<Vec<_>>()
+ .into_iter();
+ let mut join_set = JoinSet::new();
+ let max_concurrency = relay_count.min(DISCOVERY_RELAY_FETCH_CONCURRENCY_LIMIT);
+
+ while join_set.len() < max_concurrency {
+ let Some((relay_index, relay)) = pending.next() else {
+ break;
+ };
+ spawn_live_nip89_relay_fetch(&mut join_set, context.clone(), relay_index, relay);
+ }
- for relay in context.publish_relays() {
- let relay_url = relay.to_string();
- match fetch_live_nip89_events_for_relay(context, relay).await {
- Ok(relay_events) => {
- all_events.extend(relay_events.iter().cloned());
- relay_states.push(MycLiveNip89RelayState {
- relay_url,
- fetch_status: MycDiscoveryRelayFetchStatus::Available,
- fetch_error: None,
- live_groups: group_live_nip89_events(relay_events)?,
- });
- }
- Err(error) => {
- relay_states.push(MycLiveNip89RelayState {
- relay_url,
- fetch_status: MycDiscoveryRelayFetchStatus::Unavailable,
- fetch_error: Some(error.to_string()),
- live_groups: Vec::new(),
- });
- }
+ let mut fetched = std::iter::repeat_with(|| None)
+ .take(relay_count)
+ .collect::<Vec<Option<MycRelayFetchTaskOutput>>>();
+
+ while let Some(joined) = join_set.join_next().await {
+ let output = joined.map_err(|error| {
+ MycError::InvalidOperation(format!("discovery relay fetch task failed: {error}"))
+ })??;
+ let relay_index = output.relay_index;
+ fetched[relay_index] = Some(output);
+
+ while join_set.len() < max_concurrency {
+ let Some((relay_index, relay)) = pending.next() else {
+ break;
+ };
+ spawn_live_nip89_relay_fetch(&mut join_set, context.clone(), relay_index, relay);
}
}
+ let mut relay_states = Vec::with_capacity(relay_count);
+ let mut all_events = Vec::new();
+ for fetched_relay in fetched {
+ let fetched_relay = fetched_relay.ok_or_else(|| {
+ MycError::InvalidOperation("missing discovery relay fetch result".to_owned())
+ })?;
+ all_events.extend(fetched_relay.relay_events.into_iter());
+ relay_states.push(fetched_relay.relay_state);
+ }
+
let available_relay_count = relay_states
.iter()
.filter(|relay_state| relay_state.fetch_status == MycDiscoveryRelayFetchStatus::Available)
@@ -762,6 +792,48 @@ async fn fetch_live_nip89_state(
})
}
+fn spawn_live_nip89_relay_fetch(
+ join_set: &mut JoinSet<Result<MycRelayFetchTaskOutput, MycError>>,
+ context: MycDiscoveryContext,
+ relay_index: usize,
+ relay: RadrootsNostrRelayUrl,
+) {
+ join_set.spawn(async move { fetch_live_nip89_relay_state(&context, relay_index, relay).await });
+}
+
+async fn fetch_live_nip89_relay_state(
+ context: &MycDiscoveryContext,
+ relay_index: usize,
+ relay: RadrootsNostrRelayUrl,
+) -> Result<MycRelayFetchTaskOutput, MycError> {
+ let relay_url = relay.to_string();
+ match fetch_live_nip89_events_for_relay(context, &relay).await {
+ Ok(relay_events) => {
+ let live_groups = group_live_nip89_events(relay_events.clone())?;
+ Ok(MycRelayFetchTaskOutput {
+ relay_index,
+ relay_events,
+ relay_state: MycLiveNip89RelayState {
+ relay_url,
+ fetch_status: MycDiscoveryRelayFetchStatus::Available,
+ fetch_error: None,
+ live_groups,
+ },
+ })
+ }
+ Err(error) => Ok(MycRelayFetchTaskOutput {
+ relay_index,
+ relay_events: Vec::new(),
+ relay_state: MycLiveNip89RelayState {
+ relay_url,
+ fetch_status: MycDiscoveryRelayFetchStatus::Unavailable,
+ fetch_error: Some(error.to_string()),
+ live_groups: Vec::new(),
+ },
+ }),
+ }
+}
+
async fn fetch_live_nip89_events_for_relay(
context: &MycDiscoveryContext,
relay: &RadrootsNostrRelayUrl,
diff --git a/tests/nip46_e2e.rs b/tests/nip46_e2e.rs
@@ -35,7 +35,7 @@ use radroots_nostr_signer::prelude::{
use tempfile::TempDir;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{Mutex, Notify, mpsc, oneshot};
-use tokio::time::{sleep, timeout};
+use tokio::time::{Instant, sleep, timeout};
use tokio_tungstenite::tungstenite::Message;
type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
@@ -162,6 +162,54 @@ impl Drop for TestRelay {
}
}
+struct HangingRelay {
+ url: String,
+ shutdown_tx: Option<oneshot::Sender<()>>,
+}
+
+impl HangingRelay {
+ async fn spawn(hold_open_for: Duration) -> TestResult<Self> {
+ let listener = TcpListener::bind("127.0.0.1:0").await?;
+ let addr = listener.local_addr()?;
+ let url = format!("ws://{addr}");
+ let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+
+ tokio::spawn(async move {
+ loop {
+ tokio::select! {
+ _ = &mut shutdown_rx => break,
+ accept = listener.accept() => {
+ let Ok((stream, _)) = accept else {
+ break;
+ };
+ tokio::spawn(async move {
+ sleep(hold_open_for).await;
+ drop(stream);
+ });
+ }
+ }
+ }
+ });
+
+ Ok(Self {
+ url,
+ shutdown_tx: Some(shutdown_tx),
+ })
+ }
+
+ fn url(&self) -> &str {
+ self.url.as_str()
+ }
+}
+
+impl Drop for HangingRelay {
+ fn drop(&mut self) {
+ if let Some(shutdown_tx) = self.shutdown_tx.take() {
+ let _ = shutdown_tx.send(());
+ }
+ }
+}
+
async fn handle_relay_connection(
stream: TcpStream,
state: Arc<Mutex<RelayState>>,
@@ -367,13 +415,21 @@ impl MycTestRuntime {
}
fn new_with_discovery_relays(relay_urls: &[&str], approval: MycConnectionApproval) -> Self {
+ Self::new_with_discovery_relays_and_timeout(relay_urls, approval, 1)
+ }
+
+ fn new_with_discovery_relays_and_timeout(
+ relay_urls: &[&str],
+ approval: MycConnectionApproval,
+ connect_timeout_secs: u64,
+ ) -> Self {
let temp = tempfile::tempdir().expect("tempdir");
let mut config = MycConfig::default();
config.paths.state_dir = temp.path().join("state");
config.paths.signer_identity_path = temp.path().join("signer.json");
config.paths.user_identity_path = temp.path().join("user.json");
config.policy.connection_approval = approval;
- config.transport.connect_timeout_secs = 1;
+ config.transport.connect_timeout_secs = connect_timeout_secs;
config.discovery.enabled = true;
config.discovery.domain = Some("signer.example.com".to_owned());
config.discovery.public_relays =
@@ -1180,6 +1236,94 @@ async fn fetch_live_nip89_fails_when_all_discovery_relays_are_unavailable() -> T
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn fetch_live_nip89_parallelizes_relay_fetch_and_preserves_configured_order() -> TestResult<()>
+{
+ let live_relay = TestRelay::spawn().await?;
+ let slow_a = HangingRelay::spawn(Duration::from_secs(3)).await?;
+ let slow_b = HangingRelay::spawn(Duration::from_secs(3)).await?;
+ let slow_c = HangingRelay::spawn(Duration::from_secs(3)).await?;
+ let slow_d = HangingRelay::spawn(Duration::from_secs(3)).await?;
+ let relay_urls = [
+ slow_a.url(),
+ live_relay.url(),
+ slow_b.url(),
+ slow_c.url(),
+ slow_d.url(),
+ ];
+ let mut expected_relay_states = vec![
+ (
+ slow_a.url().to_owned(),
+ MycDiscoveryRelayFetchStatus::Unavailable,
+ ),
+ (
+ live_relay.url().to_owned(),
+ MycDiscoveryRelayFetchStatus::Available,
+ ),
+ (
+ slow_b.url().to_owned(),
+ MycDiscoveryRelayFetchStatus::Unavailable,
+ ),
+ (
+ slow_c.url().to_owned(),
+ MycDiscoveryRelayFetchStatus::Unavailable,
+ ),
+ (
+ slow_d.url().to_owned(),
+ MycDiscoveryRelayFetchStatus::Unavailable,
+ ),
+ ];
+ expected_relay_states.sort_by(|left, right| left.0.cmp(&right.0));
+ let expected_relay_urls = expected_relay_states
+ .iter()
+ .map(|(relay_url, _)| relay_url.clone())
+ .collect::<Vec<_>>();
+ let test_runtime = MycTestRuntime::new_with_discovery_relays_and_timeout(
+ &relay_urls,
+ MycConnectionApproval::ExplicitUser,
+ 1,
+ );
+
+ let started_at = Instant::now();
+ let output = fetch_live_nip89(&test_runtime.runtime).await?;
+ let elapsed = started_at.elapsed();
+
+ assert!(
+ elapsed < Duration::from_millis(2500),
+ "expected concurrent relay fetch to finish under 2.5s, got {:?}",
+ elapsed
+ );
+ assert_eq!(
+ output
+ .relay_states
+ .iter()
+ .map(|relay_state| relay_state.relay_url.clone())
+ .collect::<Vec<_>>(),
+ expected_relay_urls
+ );
+ assert_eq!(
+ output
+ .relay_states
+ .iter()
+ .map(|relay_state| relay_state.fetch_status)
+ .collect::<Vec<_>>(),
+ expected_relay_states
+ .iter()
+ .map(|(_, fetch_status)| *fetch_status)
+ .collect::<Vec<_>>()
+ );
+ for relay_state in &output.relay_states {
+ if relay_state.fetch_status == MycDiscoveryRelayFetchStatus::Available {
+ assert!(relay_state.fetch_error.is_none());
+ assert!(relay_state.live_groups.is_empty());
+ } else {
+ assert!(relay_state.fetch_error.is_some());
+ }
+ }
+
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn diff_live_nip89_reports_matched_after_publish() -> TestResult<()> {
let relay = TestRelay::spawn().await?;
let test_runtime =