commit a1cae260eb13e7344e78743e704af6bd18e5a996
parent b596a53b6de7441ee1f817dc485568ce7f044723
Author: triesap <tyson@radroots.org>
Date: Sun, 22 Mar 2026 13:55:14 +0000
discovery: add relay provenance to live handler sync
- query discovery relays individually instead of pooling live handler fetches
- carry source relay provenance through grouped live events and grouped handler state
- add per-relay comparison outputs and compact relay summaries for diff and refresh
- validate with cargo metadata --format-version 1 --no-deps cargo check --locked cargo test --locked and cargo fmt --check
Diffstat:
2 files changed, 187 insertions(+), 25 deletions(-)
diff --git a/src/discovery.rs b/src/discovery.rs
@@ -104,21 +104,47 @@ pub struct MycNormalizedNip89Handler {
pub struct MycLiveNip89Event {
pub event_id_hex: String,
pub created_at_unix: u64,
+ pub source_relays: Vec<String>,
pub handler: MycNormalizedNip89Handler,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MycLiveNip89Group {
pub handler: MycNormalizedNip89Handler,
+ pub source_relays: Vec<String>,
pub events: Vec<MycLiveNip89Event>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
+pub struct MycLiveNip89RelayState {
+ pub relay_url: String,
+ pub live_groups: Vec<MycLiveNip89Group>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
+pub struct MycDiscoveryRelayState {
+ pub relay_url: String,
+ pub status: MycDiscoveryLiveStatus,
+ pub differing_fields: Vec<String>,
+ pub live_groups: Vec<MycLiveNip89Group>,
+}
+
+#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)]
+pub struct MycDiscoveryRelaySummary {
+ pub total_relays: usize,
+ pub missing_relays: Vec<String>,
+ pub matched_relays: Vec<String>,
+ pub drifted_relays: Vec<String>,
+ pub conflicted_relays: Vec<String>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct MycFetchedLiveNip89Output {
pub author_public_key_hex: String,
pub publish_relays: Vec<String>,
pub handler_identifier: String,
pub live_groups: Vec<MycLiveNip89Group>,
+ pub relay_states: Vec<MycLiveNip89RelayState>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
@@ -126,6 +152,8 @@ pub struct MycDiscoveryDiffOutput {
pub status: MycDiscoveryLiveStatus,
pub local_handler: MycNormalizedNip89Handler,
pub live_groups: Vec<MycLiveNip89Group>,
+ pub relay_states: Vec<MycDiscoveryRelayState>,
+ pub relay_summary: MycDiscoveryRelaySummary,
pub differing_fields: Vec<String>,
}
@@ -135,9 +163,23 @@ pub struct MycRefreshedNip89Output {
pub force: bool,
pub differing_fields: Vec<String>,
pub live_groups: Vec<MycLiveNip89Group>,
+ pub relay_states: Vec<MycDiscoveryRelayState>,
+ pub relay_summary: MycDiscoveryRelaySummary,
pub published: Option<MycPublishedNip89Output>,
}
+#[derive(Debug, Clone)]
+struct MycSourcedLiveNip89Event {
+ source_relay: String,
+ event: RadrootsNostrEvent,
+}
+
+#[derive(Debug, Clone)]
+struct MycFetchedLiveNip89State {
+ live_groups: Vec<MycLiveNip89Group>,
+ relay_states: Vec<MycLiveNip89RelayState>,
+}
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MycNip89HandlerDocument {
pub kinds: Vec<u32>,
@@ -462,7 +504,7 @@ pub async fn publish_nip89_event(
pub async fn fetch_live_nip89(runtime: &MycRuntime) -> Result<MycFetchedLiveNip89Output, MycError> {
let context = MycDiscoveryContext::from_runtime(runtime)?;
- let live_groups = fetch_live_nip89_groups(&context).await?;
+ let fetched = fetch_live_nip89_state(&context).await?;
Ok(MycFetchedLiveNip89Output {
author_public_key_hex: context.app_identity().public_key_hex(),
publish_relays: context
@@ -471,19 +513,25 @@ pub async fn fetch_live_nip89(runtime: &MycRuntime) -> Result<MycFetchedLiveNip8
.map(ToString::to_string)
.collect(),
handler_identifier: context.handler_identifier().to_owned(),
- live_groups,
+ live_groups: fetched.live_groups,
+ relay_states: fetched.relay_states,
})
}
pub async fn diff_live_nip89(runtime: &MycRuntime) -> Result<MycDiscoveryDiffOutput, MycError> {
let context = MycDiscoveryContext::from_runtime(runtime)?;
let local_handler = context.render_normalized_nip89_handler();
- let live_groups = fetch_live_nip89_groups(&context).await?;
+ let fetched = fetch_live_nip89_state(&context).await?;
+ let relay_states = build_relay_diffs(&local_handler, &fetched.relay_states);
+ let relay_summary = summarize_relay_diffs(&relay_states);
+ let live_groups = fetched.live_groups;
let (status, differing_fields) = compare_live_handler(&local_handler, &live_groups);
Ok(MycDiscoveryDiffOutput {
status,
local_handler,
live_groups,
+ relay_states,
+ relay_summary,
differing_fields,
})
}
@@ -494,11 +542,15 @@ pub async fn refresh_nip89(
) -> Result<MycRefreshedNip89Output, MycError> {
let context = MycDiscoveryContext::from_runtime(runtime)?;
let local_handler = context.render_normalized_nip89_handler();
- let live_groups = fetch_live_nip89_groups(&context).await?;
+ let fetched = fetch_live_nip89_state(&context).await?;
+ let relay_states = build_relay_diffs(&local_handler, &fetched.relay_states);
+ let relay_summary = summarize_relay_diffs(&relay_states);
+ let live_groups = fetched.live_groups;
let (status, differing_fields) = compare_live_handler(&local_handler, &live_groups);
let relay_count = context.publish_relays().len();
let compare_request_id = latest_live_event_id(&live_groups);
- let compare_summary = describe_compare_status(status, &differing_fields, &live_groups);
+ let compare_summary =
+ describe_compare_status(status, &differing_fields, &live_groups, &relay_summary);
runtime.record_operation_audit(&MycOperationAuditRecord::new(
MycOperationAuditKind::DiscoveryHandlerCompare,
@@ -542,6 +594,8 @@ pub async fn refresh_nip89(
force,
differing_fields,
live_groups,
+ relay_states,
+ relay_summary,
published: None,
});
}
@@ -552,6 +606,8 @@ pub async fn refresh_nip89(
force,
differing_fields,
live_groups,
+ relay_states,
+ relay_summary,
published: Some(published),
})
}
@@ -578,13 +634,34 @@ pub fn verify_bundle(output_dir: impl AsRef<Path>) -> Result<MycDiscoveryBundleO
Ok(bundle)
}
-async fn fetch_live_nip89_groups(
+async fn fetch_live_nip89_state(
context: &MycDiscoveryContext,
-) -> Result<Vec<MycLiveNip89Group>, MycError> {
- let client = RadrootsNostrClient::from_identity(context.app_identity());
+) -> Result<MycFetchedLiveNip89State, MycError> {
+ let mut relay_states = Vec::new();
+ let mut all_events = Vec::new();
+
for relay in context.publish_relays() {
- let _ = client.add_relay(relay.as_str()).await?;
+ let relay_url = relay.to_string();
+ let relay_events = fetch_live_nip89_events_for_relay(context, relay).await?;
+ all_events.extend(relay_events.iter().cloned());
+ relay_states.push(MycLiveNip89RelayState {
+ relay_url,
+ live_groups: group_live_nip89_events(relay_events)?,
+ });
}
+
+ Ok(MycFetchedLiveNip89State {
+ live_groups: group_live_nip89_events(all_events)?,
+ relay_states,
+ })
+}
+
+async fn fetch_live_nip89_events_for_relay(
+ context: &MycDiscoveryContext,
+ relay: &RadrootsNostrRelayUrl,
+) -> Result<Vec<MycSourcedLiveNip89Event>, MycError> {
+ let client = RadrootsNostrClient::from_identity(context.app_identity());
+ let _ = client.add_relay(relay.as_str()).await?;
client.connect().await;
client
.wait_for_connection(Duration::from_secs(context.connect_timeout_secs()))
@@ -605,7 +682,13 @@ async fn fetch_live_nip89_groups(
.cmp(&right.created_at.as_secs())
.then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
});
- group_live_nip89_events(events)
+ Ok(events
+ .into_iter()
+ .map(|event| MycSourcedLiveNip89Event {
+ source_relay: relay.to_string(),
+ event,
+ })
+ .collect())
}
fn compare_live_handler(
@@ -667,6 +750,7 @@ fn describe_compare_status(
status: MycDiscoveryLiveStatus,
differing_fields: &[String],
live_groups: &[MycLiveNip89Group],
+ relay_summary: &MycDiscoveryRelaySummary,
) -> String {
match status {
MycDiscoveryLiveStatus::Missing => {
@@ -680,12 +764,16 @@ fn describe_compare_status(
differing_fields.join(", ")
),
MycDiscoveryLiveStatus::Conflicted => format!(
- "found {} conflicting live NIP-89 handler states across {} events",
+ "found {} conflicting live NIP-89 handler states across {} events (matched relays: {}, drifted relays: {}, missing relays: {}, conflicted relays: {})",
live_groups.len(),
live_groups
.iter()
.map(|group| group.events.len())
- .sum::<usize>()
+ .sum::<usize>(),
+ relay_summary.matched_relays.len(),
+ relay_summary.drifted_relays.len(),
+ relay_summary.missing_relays.len(),
+ relay_summary.conflicted_relays.len(),
),
}
}
@@ -768,34 +856,62 @@ fn normalize_live_nip89_handler(
}
fn group_live_nip89_events(
- events: Vec<RadrootsNostrEvent>,
+ events: Vec<MycSourcedLiveNip89Event>,
) -> Result<Vec<MycLiveNip89Group>, MycError> {
let mut groups = Vec::<MycLiveNip89Group>::new();
- for event in events {
+ for sourced_event in events {
+ let handler = normalize_live_nip89_handler(&sourced_event.event)?;
+ let source_relay = sourced_event.source_relay;
let live_event = MycLiveNip89Event {
- event_id_hex: event.id.to_hex(),
- created_at_unix: event.created_at.as_secs(),
- handler: normalize_live_nip89_handler(&event)?,
+ event_id_hex: sourced_event.event.id.to_hex(),
+ created_at_unix: sourced_event.event.created_at.as_secs(),
+ source_relays: vec![source_relay.clone()],
+ handler: handler.clone(),
};
- if let Some(existing) = groups
- .iter_mut()
- .find(|group| group.handler == live_event.handler)
- {
- existing.events.push(live_event);
+ if let Some(existing_group) = groups.iter_mut().find(|group| group.handler == handler) {
+ if let Some(existing_event) = existing_group
+ .events
+ .iter_mut()
+ .find(|event| event.event_id_hex == live_event.event_id_hex)
+ {
+ existing_event.source_relays = normalize_string_list(
+ existing_event
+ .source_relays
+ .iter()
+ .cloned()
+ .chain(std::iter::once(source_relay.clone()))
+ .collect(),
+ );
+ } else {
+ existing_group.events.push(live_event);
+ }
+ existing_group.source_relays = normalize_string_list(
+ existing_group
+ .source_relays
+ .iter()
+ .cloned()
+ .chain(std::iter::once(source_relay))
+ .collect(),
+ );
} else {
groups.push(MycLiveNip89Group {
- handler: live_event.handler.clone(),
+ handler: handler.clone(),
+ source_relays: vec![source_relay],
events: vec![live_event],
});
}
}
for group in &mut groups {
+ group.source_relays = normalize_string_list(group.source_relays.clone());
group.events.sort_by(|left, right| {
left.created_at_unix
.cmp(&right.created_at_unix)
.then_with(|| left.event_id_hex.cmp(&right.event_id_hex))
});
+ for event in &mut group.events {
+ event.source_relays = normalize_string_list(event.source_relays.clone());
+ }
}
groups.sort_by(|left, right| {
@@ -812,6 +928,51 @@ fn group_live_nip89_events(
Ok(groups)
}
+fn build_relay_diffs(
+ local_handler: &MycNormalizedNip89Handler,
+ relay_states: &[MycLiveNip89RelayState],
+) -> Vec<MycDiscoveryRelayState> {
+ relay_states
+ .iter()
+ .map(|relay_state| {
+ let (status, differing_fields) =
+ compare_live_handler(local_handler, &relay_state.live_groups);
+ MycDiscoveryRelayState {
+ relay_url: relay_state.relay_url.clone(),
+ status,
+ differing_fields,
+ live_groups: relay_state.live_groups.clone(),
+ }
+ })
+ .collect()
+}
+
+fn summarize_relay_diffs(relay_states: &[MycDiscoveryRelayState]) -> MycDiscoveryRelaySummary {
+ let mut summary = MycDiscoveryRelaySummary {
+ total_relays: relay_states.len(),
+ ..MycDiscoveryRelaySummary::default()
+ };
+
+ for relay_state in relay_states {
+ match relay_state.status {
+ MycDiscoveryLiveStatus::Missing => {
+ summary.missing_relays.push(relay_state.relay_url.clone())
+ }
+ MycDiscoveryLiveStatus::Matched => {
+ summary.matched_relays.push(relay_state.relay_url.clone())
+ }
+ MycDiscoveryLiveStatus::Drifted => {
+ summary.drifted_relays.push(relay_state.relay_url.clone())
+ }
+ MycDiscoveryLiveStatus::Conflicted => summary
+ .conflicted_relays
+ .push(relay_state.relay_url.clone()),
+ }
+ }
+
+ summary
+}
+
fn latest_group_sort_key(group: &MycLiveNip89Group) -> (u64, &str) {
group
.events
diff --git a/src/lib.rs b/src/lib.rs
@@ -23,8 +23,9 @@ pub use config::{
pub use control::{MycAcceptedConnectionOutput, MycAuthorizedReplayOutput};
pub use discovery::{
MycDiscoveryBundleManifest, MycDiscoveryBundleOutput, MycDiscoveryContext,
- MycDiscoveryDiffOutput, MycDiscoveryLiveStatus, MycFetchedLiveNip89Output, MycLiveNip89Event,
- MycLiveNip89Group, MycNip05Document, MycNip05DocumentSection, MycNip89HandlerDocument,
+ MycDiscoveryDiffOutput, MycDiscoveryLiveStatus, MycDiscoveryRelayState,
+ MycDiscoveryRelaySummary, MycFetchedLiveNip89Output, MycLiveNip89Event, MycLiveNip89Group,
+ MycLiveNip89RelayState, MycNip05Document, MycNip05DocumentSection, MycNip89HandlerDocument,
MycNormalizedNip89Handler, MycPublishedNip89Output, MycRefreshedNip89Output,
MycRenderedNip05Output, MycRenderedNip89Output, diff_live_nip89, fetch_live_nip89,
publish_nip89_event, refresh_nip89, render_nip05_output, verify_bundle,