commit 3b2f8393a6dbbc34e02d61cca7aee129a02a1cb8
parent 7c4c3baa07a1cd81f64122a5dc85de0c3ccae1b3
Author: triesap <tyson@radroots.org>
Date: Sat, 13 Jun 2026 05:32:51 -0700
relay_transport: expose fetch outcomes
- add per-relay fetch outcome receipts for EOSE, CLOSED, and NOTICE
- add phase1-1 invariant gate for relay, cursor, and trade contract regressions
- add deterministic event-store, outbox, and relay fetch smoke probes
- validate final Phase 1.1 nested gates and workspace diagnostics
Diffstat:
8 files changed, 508 insertions(+), 11 deletions(-)
diff --git a/crates/event_store/src/store.rs b/crates/event_store/src/store.rs
@@ -1129,4 +1129,51 @@ mod tests {
assert_eq!(replay.len(), 1);
assert_eq!(replay[0].event_id, second.id);
}
+
+ #[tokio::test]
+ async fn smoke_event_store_ingests_and_replays_ten_thousand_events() {
+ let store = RadrootsEventStore::open_memory().await.expect("open");
+ for index in 0..10_000u32 {
+ let event = signed_event(
+ KIND_POST,
+ 10_000 + index,
+ vec![vec!["t".to_owned(), "smoke".to_owned()]],
+ format!("smoke-{index}").as_str(),
+ );
+ let receipt = store
+ .ingest_event(RadrootsEventIngest::new(event, 10_000 + i64::from(index)))
+ .await
+ .expect("ingest");
+ assert!(receipt.inserted);
+ assert_eq!(
+ receipt.verification_status,
+ RadrootsEventVerificationStatus::Verified
+ );
+ }
+
+ let replay = store
+ .events_since_cursor("smoke", 10_000)
+ .await
+ .expect("replay");
+ assert_eq!(replay.len(), 10_000);
+ assert_eq!(replay[0].seq, 1);
+ assert_eq!(replay[9_999].seq, 10_000);
+
+ store
+ .update_projection_cursor(&RadrootsProjectionCursor {
+ projection_id: "smoke".to_owned(),
+ projection_version: 1,
+ last_event_seq: replay[4_999].seq,
+ updated_at_ms: 25_000,
+ })
+ .await
+ .expect("cursor");
+ let replay = store
+ .events_since_cursor("smoke", 10_000)
+ .await
+ .expect("replay after cursor");
+ assert_eq!(replay.len(), 5_000);
+ assert_eq!(replay[0].seq, 5_001);
+ assert_eq!(replay[4_999].seq, 10_000);
+ }
}
diff --git a/crates/outbox/src/store.rs b/crates/outbox/src/store.rs
@@ -1631,4 +1631,63 @@ mod tests {
.is_none()
);
}
+
+ #[tokio::test]
+ async fn smoke_outbox_claim_cancel_cycles_complete_one_thousand_events() {
+ let outbox = RadrootsOutbox::open_memory().await.expect("open");
+ let mut receipts = Vec::new();
+ for index in 0..1_000 {
+ let draft = post_draft(
+ FIXTURE_ALICE_PUBLIC_KEY_HEX,
+ format!("claim-cycle-{index}").as_str(),
+ );
+ let receipt = outbox
+ .enqueue_operation(operation_input(draft, 1_000 + index))
+ .await
+ .expect("enqueue");
+ receipts.push(receipt);
+ }
+
+ for index in 0..1_000 {
+ let claim_token = format!("claim-{index}");
+ let claimed = outbox
+ .claim_next_ready_event(
+ "smoke-worker",
+ claim_token.as_str(),
+ 10_000 + index,
+ 2_000 + index,
+ )
+ .await
+ .expect("claim")
+ .expect("claimed");
+ outbox
+ .cancel_claimed_event(claimed.outbox_event_id, claim_token.as_str(), 3_000 + index)
+ .await
+ .expect("cancel");
+ }
+
+ for receipt in receipts {
+ let event = outbox
+ .get_event(receipt.outbox_event_id)
+ .await
+ .expect("event")
+ .expect("event");
+ assert_eq!(event.state, RadrootsOutboxEventState::Cancelled);
+ assert!(event.claim_token.is_none());
+ let operation = outbox
+ .get_operation(receipt.operation_id)
+ .await
+ .expect("operation")
+ .expect("operation");
+ assert_eq!(operation.status, RadrootsOutboxOperationStatus::Cancelled);
+ }
+
+ assert!(
+ outbox
+ .claim_next_ready_event("smoke-worker", "claim-final", 20_000, 20_000)
+ .await
+ .expect("claim")
+ .is_none()
+ );
+ }
}
diff --git a/crates/relay_transport/src/fetch.rs b/crates/relay_transport/src/fetch.rs
@@ -1,6 +1,6 @@
#![forbid(unsafe_code)]
-use crate::RadrootsRelayTransportError;
+use crate::{RadrootsRelayOutcome, RadrootsRelayTransportError};
use futures::future::BoxFuture;
use nostr::JsonUtil;
use radroots_event_store::{
@@ -63,6 +63,21 @@ pub enum RadrootsRelayFetchItem {
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub enum RadrootsRelayFetchOutcomeKind {
+ Eose,
+ Closed,
+ Notice,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub struct RadrootsRelayFetchRelayOutcome {
+ pub relay_url: String,
+ pub kind: RadrootsRelayFetchOutcomeKind,
+ pub relay_outcome: Option<RadrootsRelayOutcome>,
+ pub message: Option<String>,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RadrootsRelayFetchEventReceipt {
pub relay_url: String,
pub event_id: Option<String>,
@@ -85,6 +100,7 @@ pub struct RadrootsRelayFetchReceipt {
pub closed_count: usize,
pub notice_count: usize,
pub events: Vec<RadrootsRelayFetchEventReceipt>,
+ pub relay_outcomes: Vec<RadrootsRelayFetchRelayOutcome>,
}
pub trait RadrootsRelayFetchAdapter: Send + Sync {
@@ -114,6 +130,7 @@ where
closed_count: 0,
notice_count: 0,
events: Vec::new(),
+ relay_outcomes: Vec::new(),
};
let mut processed_events = 0usize;
for item in items {
@@ -199,14 +216,32 @@ where
}
}
}
- RadrootsRelayFetchItem::Eose { .. } => {
+ RadrootsRelayFetchItem::Eose { relay_url } => {
receipt.eose_count += 1;
+ receipt.relay_outcomes.push(RadrootsRelayFetchRelayOutcome {
+ relay_url,
+ kind: RadrootsRelayFetchOutcomeKind::Eose,
+ relay_outcome: None,
+ message: None,
+ });
}
- RadrootsRelayFetchItem::Closed { .. } => {
+ RadrootsRelayFetchItem::Closed { relay_url, message } => {
receipt.closed_count += 1;
+ receipt.relay_outcomes.push(RadrootsRelayFetchRelayOutcome {
+ relay_url,
+ kind: RadrootsRelayFetchOutcomeKind::Closed,
+ relay_outcome: Some(RadrootsRelayOutcome::classify(message.as_str())),
+ message: Some(message),
+ });
}
- RadrootsRelayFetchItem::Notice { .. } => {
+ RadrootsRelayFetchItem::Notice { relay_url, message } => {
receipt.notice_count += 1;
+ receipt.relay_outcomes.push(RadrootsRelayFetchRelayOutcome {
+ relay_url,
+ kind: RadrootsRelayFetchOutcomeKind::Notice,
+ relay_outcome: None,
+ message: Some(message),
+ });
}
}
}
diff --git a/crates/relay_transport/src/lib.rs b/crates/relay_transport/src/lib.rs
@@ -13,8 +13,9 @@ pub use error::RadrootsRelayTransportError;
#[cfg(feature = "storage")]
pub use fetch::{
RadrootsMockRelayFetchAdapter, RadrootsRelayFetchAdapter, RadrootsRelayFetchEventReceipt,
- RadrootsRelayFetchItem, RadrootsRelayFetchMode, RadrootsRelayFetchReceipt,
- RadrootsRelayFetchRequest, fetch_and_ingest_relay_events,
+ RadrootsRelayFetchItem, RadrootsRelayFetchMode, RadrootsRelayFetchOutcomeKind,
+ RadrootsRelayFetchReceipt, RadrootsRelayFetchRelayOutcome, RadrootsRelayFetchRequest,
+ fetch_and_ingest_relay_events,
};
#[cfg(feature = "storage")]
pub use outbox::{
diff --git a/crates/relay_transport/tests/transport.rs b/crates/relay_transport/tests/transport.rs
@@ -12,9 +12,10 @@ use radroots_outbox::{
};
use radroots_relay_transport::{
RadrootsMockRelayFetchAdapter, RadrootsMockRelayPublishAdapter, RadrootsOutboxPublishPolicy,
- RadrootsRelayFetchItem, RadrootsRelayFetchRequest, RadrootsRelayOutcome,
- RadrootsRelayOutcomeKind, RadrootsRelayTargetSet, RadrootsRelayUrl, RadrootsRelayUrlPolicy,
- fetch_and_ingest_relay_events, publish_claimed_outbox_event, publish_signed_event,
+ RadrootsRelayFetchItem, RadrootsRelayFetchOutcomeKind, RadrootsRelayFetchRequest,
+ RadrootsRelayOutcome, RadrootsRelayOutcomeKind, RadrootsRelayTargetSet, RadrootsRelayUrl,
+ RadrootsRelayUrlPolicy, fetch_and_ingest_relay_events, publish_claimed_outbox_event,
+ publish_signed_event,
};
const FIXTURE_ALICE_SECRET_KEY_HEX: &str =
@@ -220,7 +221,11 @@ async fn fetch_ingests_events_and_records_relay_observations() {
},
RadrootsRelayFetchItem::Closed {
relay_url: RELAY_SECONDARY_WSS.to_owned(),
- message: "closed: done".to_owned(),
+ message: "auth-required: challenge".to_owned(),
+ },
+ RadrootsRelayFetchItem::Closed {
+ relay_url: RELAY_TERTIARY_WSS.to_owned(),
+ message: "restricted: group write denied".to_owned(),
},
RadrootsRelayFetchItem::Notice {
relay_url: RELAY_TERTIARY_WSS.to_owned(),
@@ -241,8 +246,38 @@ async fn fetch_ingests_events_and_records_relay_observations() {
assert_eq!(receipt.unsupported_count, 1);
assert_eq!(receipt.malformed_count, 1);
assert_eq!(receipt.eose_count, 1);
- assert_eq!(receipt.closed_count, 1);
+ assert_eq!(receipt.closed_count, 2);
assert_eq!(receipt.notice_count, 1);
+ assert_eq!(receipt.relay_outcomes.len(), 4);
+ assert_eq!(receipt.relay_outcomes[0].relay_url, RELAY_PRIMARY_WSS);
+ assert_eq!(
+ receipt.relay_outcomes[0].kind,
+ RadrootsRelayFetchOutcomeKind::Eose
+ );
+ assert!(receipt.relay_outcomes[0].relay_outcome.is_none());
+ assert_eq!(receipt.relay_outcomes[1].relay_url, RELAY_SECONDARY_WSS);
+ assert_eq!(
+ receipt.relay_outcomes[1]
+ .relay_outcome
+ .as_ref()
+ .expect("auth outcome")
+ .kind,
+ RadrootsRelayOutcomeKind::AuthRequired
+ );
+ assert_eq!(receipt.relay_outcomes[2].relay_url, RELAY_TERTIARY_WSS);
+ assert_eq!(
+ receipt.relay_outcomes[2]
+ .relay_outcome
+ .as_ref()
+ .expect("restricted outcome")
+ .kind,
+ RadrootsRelayOutcomeKind::Restricted
+ );
+ assert_eq!(
+ receipt.relay_outcomes[3].kind,
+ RadrootsRelayFetchOutcomeKind::Notice
+ );
+ assert!(receipt.relay_outcomes[3].relay_outcome.is_none());
assert_eq!(
receipt.events[0].verification_status.as_deref(),
Some(RadrootsEventVerificationStatus::Verified.as_str())
@@ -533,3 +568,42 @@ async fn outbox_publish_marks_published_when_policy_quorum_is_met_with_failure_d
.expect("observations");
assert_eq!(observations.len(), 2);
}
+
+#[tokio::test]
+async fn smoke_relay_fetch_processes_one_thousand_event_receipts() {
+ let store = RadrootsEventStore::open_memory().await.expect("store");
+ let mut items = Vec::new();
+ for index in 0..1_000 {
+ let signed = signed_post(format!("fetch-smoke-{index}").as_str());
+ let relay_url = match index % 3 {
+ 0 => RELAY_PRIMARY_WSS,
+ 1 => RELAY_SECONDARY_WSS,
+ _ => RELAY_TERTIARY_WSS,
+ };
+ items.push(RadrootsRelayFetchItem::Event {
+ relay_url: relay_url.to_owned(),
+ raw_json: signed.raw_json,
+ observed_at_ms: 10_000 + index,
+ });
+ }
+ let adapter = RadrootsMockRelayFetchAdapter::new(items);
+ let receipt = fetch_and_ingest_relay_events(
+ &adapter,
+ &store,
+ RadrootsRelayFetchRequest::fetch(10_000, 1_000),
+ )
+ .await
+ .expect("fetch");
+
+ assert_eq!(receipt.inserted_count, 1_000);
+ assert_eq!(receipt.duplicate_count, 0);
+ assert_eq!(receipt.malformed_count, 0);
+ assert_eq!(receipt.unsupported_count, 0);
+ assert_eq!(receipt.events.len(), 1_000);
+ assert!(receipt.events.iter().all(|event| event.projection_eligible));
+ let replay = store
+ .events_since_cursor("fetch-smoke", 1_000)
+ .await
+ .expect("replay");
+ assert_eq!(replay.len(), 1_000);
+}
diff --git a/crates/xtask/src/main.rs b/crates/xtask/src/main.rs
@@ -2,6 +2,7 @@
mod contract;
mod coverage;
+mod phase1_1;
use std::env;
use std::path::{Path, PathBuf};
@@ -23,6 +24,7 @@ fn usage() {
eprintln!(
" cargo xtask sdk coverage refresh-summary [--reports-root <dir>] [--out <file>] [--status-out <file>]"
);
+ eprintln!(" cargo xtask phase1-1 invariants");
}
fn workspace_root_with_override(override_root: Option<&str>) -> PathBuf {
@@ -73,6 +75,7 @@ fn run_sdk(args: &[String]) -> Result<(), String> {
fn run(args: &[String]) -> Result<(), String> {
match args.first().map(String::as_str) {
Some("sdk") => run_sdk(&args[1..]),
+ Some("phase1-1") => phase1_1::run(&args[1..], &workspace_root()),
_ => Err("unknown command".to_string()),
}
}
@@ -276,6 +279,7 @@ mod tests {
"help".to_string(),
])
.expect("root run sdk coverage");
+ run(&["phase1-1".to_string(), "invariants".to_string()]).expect("phase1-1 invariants");
let _ = fs::remove_dir_all(out_dir);
}
diff --git a/crates/xtask/src/phase1_1.rs b/crates/xtask/src/phase1_1.rs
@@ -0,0 +1,276 @@
+use std::fs;
+use std::path::{Path, PathBuf};
+
+pub fn run(args: &[String], root: &Path) -> Result<(), String> {
+ match args.first().map(String::as_str) {
+ Some("invariants") => validate_invariants(root),
+ _ => Err("unknown phase1-1 subcommand".to_string()),
+ }
+}
+
+pub fn validate_invariants(root: &Path) -> Result<(), String> {
+ let mut failures = Vec::new();
+ reject_substrings(
+ root,
+ &[PathBuf::from("crates/relay_transport/src")],
+ &["RadrootsEventIngest::verified"],
+ "relay fetch must not bypass event-store verification",
+ &mut failures,
+ );
+ reject_substrings(
+ root,
+ &[PathBuf::from("crates/event_store/src")],
+ &["last_created_at", "last_event_id"],
+ "event-store projection cursors must use last_event_seq",
+ &mut failures,
+ );
+ reject_raw_protocol_strings(root, &mut failures);
+ reject_substrings(
+ root,
+ &[
+ PathBuf::from("crates/events/src"),
+ PathBuf::from("crates/events_codec/src"),
+ PathBuf::from("crates/trade/src"),
+ PathBuf::from("crates/sdk/src"),
+ ],
+ &[
+ "RadrootsTradeMessageType",
+ "RadrootsTradeEnvelope",
+ "RadrootsTradeMessagePayload",
+ "RadrootsTradeQuestion",
+ "RadrootsTradeAnswer",
+ "RadrootsTradeDiscount",
+ "RadrootsTradeOrder",
+ "RadrootsActiveOrder",
+ "RadrootsActiveTrade",
+ "RadrootsTradeListingParseError",
+ "RadrootsTradeDomain",
+ "TradeListingParseError",
+ "TradeListingEnvelope",
+ "TradeListingMessage",
+ "KIND_TRADE_ORDER",
+ "TRADE_LISTING_KINDS",
+ "build_envelope_draft",
+ "parse_envelope",
+ "public_trade",
+ "events::trade::",
+ "events_codec::trade::",
+ "radroots_sdk::trade::",
+ "trade_order_economics_digest",
+ "trade_revision",
+ "trade_lifecycle",
+ "reduce_active_order",
+ "canonicalize_active_order",
+ "active_trade_",
+ "ActiveOrder",
+ "active_order",
+ "active order",
+ "active trade",
+ "RADROOTS_TRADE_LISTING_DOMAIN",
+ "RADROOTS_TRADE_ENVELOPE_VERSION",
+ ],
+ "legacy trade identifiers must not reappear",
+ &mut failures,
+ );
+
+ if failures.is_empty() {
+ println!("phase1-1 invariants passed");
+ Ok(())
+ } else {
+ Err(format!(
+ "phase1-1 invariant violations:\n{}",
+ failures.join("\n")
+ ))
+ }
+}
+
+fn reject_substrings(
+ root: &Path,
+ rel_roots: &[PathBuf],
+ patterns: &[&str],
+ label: &str,
+ failures: &mut Vec<String>,
+) {
+ for file in files_under(root, rel_roots) {
+ let Ok(content) = fs::read_to_string(&file) else {
+ continue;
+ };
+ for (line_index, line) in content.lines().enumerate() {
+ for pattern in patterns {
+ if line.contains(pattern) {
+ failures.push(format!(
+ "{label}: {}:{}: {}",
+ display_path(root, &file),
+ line_index + 1,
+ line.trim()
+ ));
+ }
+ }
+ }
+ }
+}
+
+fn reject_raw_protocol_strings(root: &Path, failures: &mut Vec<String>) {
+ let rel_roots = [
+ PathBuf::from("crates/events/src"),
+ PathBuf::from("crates/events_codec/src"),
+ PathBuf::from("crates/trade/src"),
+ ];
+ for file in files_under(root, &rel_roots) {
+ let Ok(content) = fs::read_to_string(&file) else {
+ continue;
+ };
+ let mut struct_name = String::new();
+ for (line_index, line) in content.lines().enumerate() {
+ let trimmed = line.trim();
+ if let Some(rest) = trimmed.strip_prefix("pub struct ") {
+ struct_name = rest
+ .split(['<', '{', ' ', '('])
+ .next()
+ .unwrap_or_default()
+ .to_owned();
+ }
+ if trimmed == "}" {
+ struct_name.clear();
+ }
+ if is_raw_protocol_field(trimmed) && !is_allowed_raw_boundary(&struct_name) {
+ failures.push(format!(
+ "raw commercial protocol identifier String fields are forbidden: {}:{}: {}",
+ display_path(root, &file),
+ line_index + 1,
+ trimmed
+ ));
+ }
+ }
+ }
+}
+
+fn is_raw_protocol_field(line: &str) -> bool {
+ [
+ "pub order_id: String,",
+ "pub listing_addr: String,",
+ "pub revision_id: String,",
+ "pub quote_id: String,",
+ "pub primary_bin_id: String,",
+ "pub bin_id: String,",
+ "pub economics_digest: String,",
+ ]
+ .iter()
+ .any(|field| line == *field)
+}
+
+fn is_allowed_raw_boundary(struct_name: &str) -> bool {
+ struct_name == "RadrootsOrderEnvelope"
+ || struct_name == "RadrootsValidationReceiptTags"
+ || struct_name == "RadrootsTradeListing"
+ || struct_name.ends_with("Projection")
+ || struct_name.ends_with("Accounting")
+ || struct_name.ends_with("Availability")
+ || struct_name.ends_with("Reservation")
+ || struct_name.ends_with("Issue")
+ || struct_name.ends_with("NormalizedInventoryCount")
+}
+
+fn files_under(root: &Path, rel_roots: &[PathBuf]) -> Vec<PathBuf> {
+ let mut files = Vec::new();
+ for rel_root in rel_roots {
+ collect_files(root.join(rel_root), &mut files);
+ }
+ files.sort();
+ files
+}
+
+fn collect_files(path: PathBuf, files: &mut Vec<PathBuf>) {
+ let Ok(metadata) = fs::metadata(&path) else {
+ return;
+ };
+ if metadata.is_file() {
+ if matches!(
+ path.extension().and_then(|ext| ext.to_str()),
+ Some("rs" | "sql" | "sh")
+ ) {
+ files.push(path);
+ }
+ return;
+ }
+ let Ok(entries) = fs::read_dir(path) else {
+ return;
+ };
+ for entry in entries.flatten() {
+ collect_files(entry.path(), files);
+ }
+}
+
+fn display_path(root: &Path, file: &Path) -> String {
+ file.strip_prefix(root)
+ .unwrap_or(file)
+ .to_string_lossy()
+ .to_string()
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::time::{SystemTime, UNIX_EPOCH};
+
+ fn unique_temp_dir(prefix: &str) -> PathBuf {
+ let ns = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("system time")
+ .as_nanos();
+ std::env::temp_dir().join(format!("radroots_xtask_phase1_1_{prefix}_{ns}"))
+ }
+
+ fn write_file(root: &Path, rel: &str, content: &str) {
+ let path = root.join(rel);
+ fs::create_dir_all(path.parent().expect("parent")).expect("create parent");
+ fs::write(path, content).expect("write");
+ }
+
+ #[test]
+ fn invariants_accept_clean_synthetic_tree() {
+ let root = unique_temp_dir("clean");
+ write_file(
+ &root,
+ "crates/relay_transport/src/fetch.rs",
+ "fn fetch() { let _ = RadrootsEventIngest::new; }\n",
+ );
+ write_file(
+ &root,
+ "crates/event_store/src/store.rs",
+ "pub struct RadrootsProjectionCursor { pub last_event_seq: i64 }\n",
+ );
+ write_file(
+ &root,
+ "crates/trade/src/order.rs",
+ "pub struct RadrootsOrderProjection { pub order_id: RadrootsOrderId, }\n",
+ );
+ validate_invariants(&root).expect("clean tree");
+ let _ = fs::remove_dir_all(root);
+ }
+
+ #[test]
+ fn invariants_reject_phase1_regressions() {
+ let root = unique_temp_dir("dirty");
+ write_file(
+ &root,
+ "crates/relay_transport/src/fetch.rs",
+ "fn fetch() { let _ = RadrootsEventIngest::verified; }\n",
+ );
+ write_file(
+ &root,
+ "crates/event_store/src/store.rs",
+ "pub struct Cursor { pub last_event_id: String }\n",
+ );
+ write_file(
+ &root,
+ "crates/trade/src/order.rs",
+ "pub struct BadOrder {\n pub order_id: String,\n}\n",
+ );
+ let err = validate_invariants(&root).expect_err("dirty tree");
+ assert!(err.contains("relay fetch must not bypass event-store verification"));
+ assert!(err.contains("event-store projection cursors must use last_event_seq"));
+ assert!(err.contains("raw commercial protocol identifier String fields are forbidden"));
+ let _ = fs::remove_dir_all(root);
+ }
+}
diff --git a/scripts/ci/guard_no_legacy_identifiers.sh b/scripts/ci/guard_no_legacy_identifiers.sh
@@ -11,6 +11,7 @@ scan_forbidden() {
rg -nI \
--glob '!AGENTS.md' \
--glob '!scripts/ci/guard_no_legacy_identifiers.sh' \
+ --glob '!crates/xtask/src/phase1_1.rs' \
-- "$pattern" "$@" ||
true
)"