commit 661c8fe4f887997e59fd413312535532c39fc35f
parent 4251f6ac71c3b9ee613b1fbbc8591978b9097df2
Author: triesap <tyson@radroots.org>
Date: Wed, 17 Jun 2026 14:03:38 -0700
sync: add store status summaries
- expose event-store and outbox status summaries for SDK sync status
- preserve relay target caller order while deduping transport targets
- remove localhost-policy reparsing from publish receipt matching
- validate with rr-rs fmt and targeted event_store/outbox/relay_transport tests
Diffstat:
9 files changed, 236 insertions(+), 27 deletions(-)
diff --git a/crates/event_store/src/lib.rs b/crates/event_store/src/lib.rs
@@ -16,9 +16,9 @@ pub use migrations::{EVENT_STORE_MIGRATION_DOWN, EVENT_STORE_MIGRATION_UP};
#[cfg(feature = "sqlite")]
pub use model::{
RadrootsEventContractStatus, RadrootsEventHeadStoreDecision, RadrootsEventIngest,
- RadrootsEventIngestReceipt, RadrootsEventVerificationStatus, RadrootsProjectionCursor,
- RadrootsRelayObservation, RadrootsRelayObservationType, RadrootsStoredEvent,
- RadrootsStoredEventHead, RadrootsStoredEventTag, StoredEventClass,
+ RadrootsEventIngestReceipt, RadrootsEventStoreStatusSummary, RadrootsEventVerificationStatus,
+ RadrootsProjectionCursor, RadrootsRelayObservation, RadrootsRelayObservationType,
+ RadrootsStoredEvent, RadrootsStoredEventHead, RadrootsStoredEventTag, StoredEventClass,
};
#[cfg(feature = "sqlite")]
pub use store::{
diff --git a/crates/event_store/src/model.rs b/crates/event_store/src/model.rs
@@ -240,6 +240,15 @@ pub struct RadrootsEventIngestReceipt {
}
#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct RadrootsEventStoreStatusSummary {
+ pub total_events: i64,
+ pub projection_eligible_events: i64,
+ pub relay_observations: i64,
+ pub last_event_seq: Option<i64>,
+ pub last_event_updated_at_ms: Option<i64>,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RadrootsStoredEvent {
pub seq: i64,
pub event_id: String,
diff --git a/crates/event_store/src/store.rs b/crates/event_store/src/store.rs
@@ -2,9 +2,10 @@ use crate::RadrootsEventStoreError;
use crate::migrations::{EVENT_STORE_MIGRATION_DOWN, EVENT_STORE_MIGRATION_UP};
use crate::model::{
RadrootsEventContractStatus, RadrootsEventHeadStoreDecision, RadrootsEventIngest,
- RadrootsEventIngestReceipt, RadrootsEventVerificationStatus, RadrootsProjectionCursor,
- RadrootsRelayObservation, RadrootsStoredEvent, RadrootsStoredEventHead, RadrootsStoredEventTag,
- StoredEventClass, tag_semantic_name, tag_value_type_name,
+ RadrootsEventIngestReceipt, RadrootsEventStoreStatusSummary, RadrootsEventVerificationStatus,
+ RadrootsProjectionCursor, RadrootsRelayObservation, RadrootsStoredEvent,
+ RadrootsStoredEventHead, RadrootsStoredEventTag, StoredEventClass, tag_semantic_name,
+ tag_value_type_name,
};
use radroots_events::RadrootsNostrEvent;
use radroots_events::contract::{
@@ -75,6 +76,25 @@ impl RadrootsEventStore {
query_string(&self.pool, "PRAGMA journal_mode").await
}
+ pub async fn status_summary(
+ &self,
+ ) -> Result<RadrootsEventStoreStatusSummary, RadrootsEventStoreError> {
+ let row = sqlx::query(
+ "SELECT COUNT(*) AS total_events, COALESCE(SUM(CASE WHEN projection_eligible = 1 THEN 1 ELSE 0 END), 0) AS projection_eligible_events, MAX(seq) AS last_event_seq, MAX(updated_at_ms) AS last_event_updated_at_ms FROM nostr_event",
+ )
+ .fetch_one(&self.pool)
+ .await?;
+ let relay_observations =
+ query_i64(&self.pool, "SELECT COUNT(*) FROM relay_event_seen").await?;
+ Ok(RadrootsEventStoreStatusSummary {
+ total_events: row.try_get("total_events")?,
+ projection_eligible_events: row.try_get("projection_eligible_events")?,
+ relay_observations,
+ last_event_seq: row.try_get("last_event_seq")?,
+ last_event_updated_at_ms: row.try_get("last_event_updated_at_ms")?,
+ })
+ }
+
pub async fn ingest_event(
&self,
ingest: RadrootsEventIngest,
@@ -888,6 +908,45 @@ mod tests {
}
#[tokio::test]
+ async fn status_summary_counts_events_projections_and_relay_observations() {
+ let store = RadrootsEventStore::open_memory().await.expect("open");
+
+ let empty = store.status_summary().await.expect("empty status");
+ assert_eq!(empty.total_events, 0);
+ assert_eq!(empty.projection_eligible_events, 0);
+ assert_eq!(empty.relay_observations, 0);
+ assert_eq!(empty.last_event_seq, None);
+ assert_eq!(empty.last_event_updated_at_ms, None);
+
+ let event = signed_event(
+ KIND_POST,
+ 10,
+ vec![vec!["t".to_owned(), "soil".to_owned()]],
+ "hello",
+ );
+ let observation = RadrootsRelayObservation::new(
+ "wss://relay.example.com",
+ crate::RadrootsRelayObservationType::PublishAck,
+ 1_100,
+ );
+ store
+ .ingest_event(RadrootsEventIngest::new(event.clone(), 1_000))
+ .await
+ .expect("event ingest");
+ store
+ .ingest_event(RadrootsEventIngest::new(event, 1_100).with_observation(observation))
+ .await
+ .expect("observation ingest");
+
+ let status = store.status_summary().await.expect("status");
+ assert_eq!(status.total_events, 1);
+ assert_eq!(status.projection_eligible_events, 1);
+ assert_eq!(status.relay_observations, 1);
+ assert_eq!(status.last_event_seq, Some(1));
+ assert_eq!(status.last_event_updated_at_ms, Some(1_000));
+ }
+
+ #[tokio::test]
async fn file_store_reopens_existing_schema() {
let tempdir = tempfile::tempdir().expect("tempdir");
let path = tempdir.path().join("event_store.sqlite");
diff --git a/crates/outbox/src/lib.rs b/crates/outbox/src/lib.rs
@@ -12,5 +12,6 @@ pub use model::{
RadrootsOutboxEventRecord, RadrootsOutboxEventState, RadrootsOutboxEventStoreIngestReceipt,
RadrootsOutboxOperationInput, RadrootsOutboxOperationRecord, RadrootsOutboxOperationStatus,
RadrootsOutboxRelayStatus, RadrootsOutboxRelayStatusRecord, RadrootsOutboxSignedOperationInput,
+ RadrootsOutboxStatusSummary,
};
pub use store::RadrootsOutbox;
diff --git a/crates/outbox/src/model.rs b/crates/outbox/src/model.rs
@@ -273,3 +273,15 @@ pub struct RadrootsOutboxEventStoreIngestReceipt {
pub already_ingested: bool,
pub event_store_inserted: bool,
}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct RadrootsOutboxStatusSummary {
+ pub total_events: i64,
+ pub pending_events: i64,
+ pub retryable_events: i64,
+ pub terminal_events: i64,
+ pub ready_signed_events: i64,
+ pub publishing_events: i64,
+ pub last_attempt_at_ms: Option<i64>,
+ pub last_error: Option<String>,
+}
diff --git a/crates/outbox/src/store.rs b/crates/outbox/src/store.rs
@@ -7,6 +7,7 @@ use crate::model::{
RadrootsOutboxEventRecord, RadrootsOutboxEventState, RadrootsOutboxEventStoreIngestReceipt,
RadrootsOutboxOperationInput, RadrootsOutboxOperationRecord, RadrootsOutboxOperationStatus,
RadrootsOutboxRelayStatus, RadrootsOutboxRelayStatusRecord, RadrootsOutboxSignedOperationInput,
+ RadrootsOutboxStatusSummary,
};
use radroots_event_store::{RadrootsEventIngest, RadrootsEventStore};
use radroots_events::RadrootsNostrEvent;
@@ -70,6 +71,47 @@ impl RadrootsOutbox {
query_string(&self.pool, "PRAGMA journal_mode").await
}
+ pub async fn status_summary(
+ &self,
+ now_ms: i64,
+ ) -> Result<RadrootsOutboxStatusSummary, RadrootsOutboxError> {
+ let row = sqlx::query(
+ "SELECT COUNT(*) AS total_events, COALESCE(SUM(CASE WHEN state IN ('draft_queued', 'signing', 'signed', 'publishing') THEN 1 ELSE 0 END), 0) AS pending_events, COALESCE(SUM(CASE WHEN state IN ('sign_retryable', 'publish_retryable') THEN 1 ELSE 0 END), 0) AS retryable_events, COALESCE(SUM(CASE WHEN state IN ('published', 'failed_terminal', 'cancelled') THEN 1 ELSE 0 END), 0) AS terminal_events, COALESCE(SUM(CASE WHEN state = 'publishing' THEN 1 ELSE 0 END), 0) AS publishing_events FROM outbox_event",
+ )
+ .fetch_one(&self.pool)
+ .await?;
+ let ready_signed_events = sqlx::query(
+ "SELECT COUNT(*) FROM outbox_event WHERE state IN ('signed', 'publish_retryable') AND signed_event_json IS NOT NULL AND next_attempt_after_ms <= ? AND (claim_token IS NULL OR claim_expires_at_ms <= ?)",
+ )
+ .bind(now_ms)
+ .bind(now_ms)
+ .fetch_one(&self.pool)
+ .await?
+ .try_get(0)?;
+ let last_attempt_at_ms =
+ sqlx::query("SELECT MAX(last_attempt_at_ms) FROM outbox_event_relay_status")
+ .fetch_one(&self.pool)
+ .await?
+ .try_get(0)?;
+ let last_error = sqlx::query(
+ "SELECT last_error FROM outbox_event WHERE last_error IS NOT NULL ORDER BY updated_at_ms DESC, outbox_event_id DESC LIMIT 1",
+ )
+ .fetch_optional(&self.pool)
+ .await?
+ .map(|row| row.try_get("last_error"))
+ .transpose()?;
+ Ok(RadrootsOutboxStatusSummary {
+ total_events: row.try_get("total_events")?,
+ pending_events: row.try_get("pending_events")?,
+ retryable_events: row.try_get("retryable_events")?,
+ terminal_events: row.try_get("terminal_events")?,
+ ready_signed_events,
+ publishing_events: row.try_get("publishing_events")?,
+ last_attempt_at_ms,
+ last_error,
+ })
+ }
+
pub async fn enqueue_operation(
&self,
input: RadrootsOutboxOperationInput,
@@ -1291,6 +1333,89 @@ mod tests {
assert_eq!(second.pragma_foreign_keys().await.expect("foreign keys"), 1);
}
+ #[tokio::test]
+ async fn status_summary_counts_ready_publishing_retryable_and_terminal_work() {
+ let outbox = RadrootsOutbox::open_memory().await.expect("open");
+
+ let empty = outbox.status_summary(1_000).await.expect("empty status");
+ assert_eq!(empty.total_events, 0);
+ assert_eq!(empty.pending_events, 0);
+ assert_eq!(empty.retryable_events, 0);
+ assert_eq!(empty.terminal_events, 0);
+ assert_eq!(empty.ready_signed_events, 0);
+ assert_eq!(empty.publishing_events, 0);
+ assert_eq!(empty.last_attempt_at_ms, None);
+ assert_eq!(empty.last_error, None);
+
+ let keys = fixture_keys();
+ let draft = post_draft(FIXTURE_ALICE_PUBLIC_KEY_HEX, "ready");
+ let signed_event = radroots_nostr_sign_frozen_draft(&keys, &draft).expect("signed event");
+ let receipt = outbox
+ .enqueue_signed_operation(signed_operation_input(draft, signed_event, 1_000))
+ .await
+ .expect("signed enqueue");
+ let ready = outbox.status_summary(1_000).await.expect("ready status");
+ assert_eq!(ready.total_events, 1);
+ assert_eq!(ready.pending_events, 1);
+ assert_eq!(ready.retryable_events, 0);
+ assert_eq!(ready.terminal_events, 0);
+ assert_eq!(ready.ready_signed_events, 1);
+
+ let claimed = outbox
+ .claim_next_ready_signed_event("publisher", "claim-a", 2_500, 2_000)
+ .await
+ .expect("claim")
+ .expect("claimed");
+ let publishing = outbox
+ .status_summary(2_000)
+ .await
+ .expect("publishing status");
+ assert_eq!(publishing.pending_events, 1);
+ assert_eq!(publishing.publishing_events, 1);
+ assert_eq!(publishing.ready_signed_events, 0);
+
+ outbox
+ .mark_relay_failed_retryable(
+ receipt.outbox_event_id,
+ claimed.claim_token.as_str(),
+ RELAY_PRIMARY_WSS,
+ "timeout: relay unavailable",
+ 2_100,
+ )
+ .await
+ .expect("relay failed");
+ outbox
+ .mark_publish_retryable(
+ receipt.outbox_event_id,
+ claimed.claim_token.as_str(),
+ "relay publish incomplete",
+ 3_000,
+ 2_200,
+ )
+ .await
+ .expect("retryable");
+
+ let retry_wait = outbox
+ .status_summary(2_900)
+ .await
+ .expect("retry wait status");
+ assert_eq!(retry_wait.pending_events, 0);
+ assert_eq!(retry_wait.retryable_events, 1);
+ assert_eq!(retry_wait.terminal_events, 0);
+ assert_eq!(retry_wait.ready_signed_events, 0);
+ assert_eq!(retry_wait.last_attempt_at_ms, Some(2_100));
+ assert_eq!(
+ retry_wait.last_error.as_deref(),
+ Some("relay publish incomplete")
+ );
+
+ let retry_ready = outbox
+ .status_summary(3_000)
+ .await
+ .expect("retry ready status");
+ assert_eq!(retry_ready.ready_signed_events, 1);
+ }
+
#[test]
fn terminal_and_cancelled_event_states_round_trip() {
assert_eq!(
diff --git a/crates/relay_transport/src/publish.rs b/crates/relay_transport/src/publish.rs
@@ -2,7 +2,7 @@
use crate::{
RadrootsRelayOutcome, RadrootsRelayOutcomeKind, RadrootsRelayTargetSet,
- RadrootsRelayTransportError, RadrootsRelayUrlPolicy,
+ RadrootsRelayTransportError,
};
use futures::future::BoxFuture;
use radroots_events::draft::RadrootsSignedNostrEvent;
@@ -239,11 +239,11 @@ impl RadrootsRelayPublishAdapter for RadrootsNostrClientPublishAdapter {
};
let mut receipts = Vec::new();
for relay_url in &target_strings {
- let relay =
- crate::RadrootsRelayUrl::parse(relay_url, RadrootsRelayUrlPolicy::Localhost)?;
- let success = output.success.iter().any(|success_url| {
- success_url.to_string().trim_end_matches('/') == relay.as_str()
- });
+ let target_url = relay_url.trim_end_matches('/');
+ let success = output
+ .success
+ .iter()
+ .any(|success_url| success_url.to_string().trim_end_matches('/') == target_url);
if success {
receipts.push(RadrootsRelayPublishRelayReceipt::attempted(
relay_url,
@@ -257,7 +257,7 @@ impl RadrootsRelayPublishAdapter for RadrootsNostrClientPublishAdapter {
continue;
}
let failed = output.failed.iter().find_map(|(failed_url, message)| {
- if failed_url.to_string().trim_end_matches('/') == relay.as_str() {
+ if failed_url.to_string().trim_end_matches('/') == target_url {
Some(message.clone())
} else {
None
diff --git a/crates/relay_transport/src/relay.rs b/crates/relay_transport/src/relay.rs
@@ -2,7 +2,6 @@
use crate::RadrootsRelayTransportError;
use serde::{Deserialize, Serialize};
-use std::collections::BTreeSet;
use std::fmt;
use url::Url;
@@ -100,12 +99,14 @@ impl RadrootsRelayTargetSet {
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
- let relays = relays
- .into_iter()
- .map(|relay| RadrootsRelayUrl::parse(relay, policy))
- .collect::<Result<BTreeSet<_>, _>>()?
- .into_iter()
- .collect::<Vec<_>>();
+ let mut ordered_relays = Vec::new();
+ for relay in relays {
+ let relay = RadrootsRelayUrl::parse(relay, policy)?;
+ if !ordered_relays.iter().any(|existing| existing == &relay) {
+ ordered_relays.push(relay);
+ }
+ }
+ let relays = ordered_relays;
if relays.is_empty() {
return Err(RadrootsRelayTransportError::EmptyTargetSet);
}
@@ -113,11 +114,13 @@ impl RadrootsRelayTargetSet {
}
pub fn from_urls(relays: Vec<RadrootsRelayUrl>) -> Result<Self, RadrootsRelayTransportError> {
- let relays = relays
- .into_iter()
- .collect::<BTreeSet<_>>()
- .into_iter()
- .collect::<Vec<_>>();
+ let mut ordered_relays = Vec::new();
+ for relay in relays {
+ if !ordered_relays.iter().any(|existing| existing == &relay) {
+ ordered_relays.push(relay);
+ }
+ }
+ let relays = ordered_relays;
if relays.is_empty() {
return Err(RadrootsRelayTransportError::EmptyTargetSet);
}
diff --git a/crates/relay_transport/tests/transport.rs b/crates/relay_transport/tests/transport.rs
@@ -181,9 +181,9 @@ fn relay_url_validation_and_target_normalization() {
assert_eq!(
targets.relay_strings(),
vec![
- RELAY_SECONDARY_WSS.to_owned(),
RELAY_TERTIARY_WSS.to_owned(),
- RELAY_PRIMARY_WSS.to_owned()
+ RELAY_PRIMARY_WSS.to_owned(),
+ RELAY_SECONDARY_WSS.to_owned()
]
);
}