commit f957017778faa41e47274b03b51d4b6b07232b37
parent d525ffc7ad6d54e2aa3cf7853fd9acdf44f059e6
Author: triesap <tyson@radroots.org>
Date: Sun, 21 Jun 2026 19:23:26 +0000
local-events: expand local event coverage
- reuse parsed order support issues and remove redundant exportability revalidation
- cover local order-work, relay URL, and relay delivery validation edges
- add store executor and row-conversion error coverage for deterministic failures
- verify radroots_local_events tests, check, diff check, and coverage policy gate
Diffstat:
7 files changed, 988 insertions(+), 43 deletions(-)
diff --git a/crates/local_events/src/order_work.rs b/crates/local_events/src/order_work.rs
@@ -67,7 +67,7 @@ pub fn validate_buyer_order_request_local_work_payload(
validate_positive_i64(payload, &["currentness", "created_at_ms"])?;
validate_required_string(payload, &["currentness", "order_updated_at"])?;
- let support_state = validate_support_status(payload)?;
+ let (support_state, support_issues) = validate_support_status(payload)?;
validate_exportability(payload, support_state)?;
validate_order_identity(payload, support_state)?;
validate_order_items(payload)?;
@@ -76,7 +76,7 @@ pub fn validate_buyer_order_request_local_work_payload(
Ok(BuyerOrderRequestLocalWorkValidation {
order_id: order_id.to_owned(),
support_state,
- support_issues: support_issues(payload)?,
+ support_issues,
})
}
@@ -108,7 +108,7 @@ pub fn validate_unsupported_buyer_order_request_local_work_payload(
fn validate_support_status(
payload: &Value,
-) -> Result<BuyerOrderRequestSupportState, LocalEventsError> {
+) -> Result<(BuyerOrderRequestSupportState, Vec<String>), LocalEventsError> {
let state = validate_required_string(payload, &["support_status", "state"])?;
let issues = support_issues(payload)?;
match state {
@@ -119,7 +119,7 @@ fn validate_support_status(
"must be empty when support_status.state is supported",
));
}
- Ok(BuyerOrderRequestSupportState::Supported)
+ Ok((BuyerOrderRequestSupportState::Supported, issues))
}
"unsupported" => {
if issues.is_empty() {
@@ -128,7 +128,7 @@ fn validate_support_status(
"must contain at least one issue when support_status.state is unsupported",
));
}
- Ok(BuyerOrderRequestSupportState::Unsupported)
+ Ok((BuyerOrderRequestSupportState::Unsupported, issues))
}
_ => Err(invalid_field(
"support_status.state",
@@ -142,8 +142,6 @@ fn validate_exportability(
support_state: BuyerOrderRequestSupportState,
) -> Result<(), LocalEventsError> {
let state = validate_required_string(payload, &["exportability", "state"])?;
- let buyer_actor_source =
- validate_required_string(payload, &["document", "buyer_actor", "source"])?;
match state {
"exportable" => {
validate_string_field(
@@ -174,9 +172,6 @@ fn validate_exportability(
));
}
}
- if buyer_actor_source == BUYER_ORDER_REQUEST_ACTOR_SOURCE_RESOLVED_ACCOUNT {
- validate_buyer_pubkey(payload)?;
- }
Ok(())
}
@@ -483,8 +478,13 @@ mod tests {
assert_eq!(
validate_support_status(&payload).expect("support status"),
- BuyerOrderRequestSupportState::Supported
+ (
+ BuyerOrderRequestSupportState::Supported,
+ Vec::<String>::new()
+ )
);
+ validate_supported_buyer_order_request_local_work_payload(&payload)
+ .expect("supported payload");
validate_exportability(&payload, BuyerOrderRequestSupportState::Supported)
.expect("exportability");
validate_order_identity(&payload, BuyerOrderRequestSupportState::Supported)
@@ -595,6 +595,17 @@ mod tests {
bad_currency["document"]["order"]["economics"]["currency"] = json!("usd");
assert_invalid(bad_currency, "currency");
+ let mut bad_currency_length = supported_payload();
+ bad_currency_length["document"]["order"]["economics"]["currency"] = json!("US");
+ assert_invalid(bad_currency_length, "currency");
+
+ let mut missing_economics = supported_payload();
+ missing_economics["document"]["order"]
+ .as_object_mut()
+ .expect("order object")
+ .remove("economics");
+ assert_invalid(missing_economics, "economics");
+
let mut economics_items_missing = supported_payload();
economics_items_missing["document"]["order"]["economics"]["items"] = Value::Null;
assert_invalid(economics_items_missing, "items");
@@ -603,6 +614,27 @@ mod tests {
economics_items_short["document"]["order"]["economics"]["items"] = json!([]);
assert_invalid(economics_items_short, "economics.items");
+ let mut economics_items_long = supported_payload();
+ economics_items_long["document"]["order"]["economics"]["items"] = json!([
+ {
+ "bin_id": "dozen-eggs",
+ "bin_count": 2,
+ "quantity_amount": "1",
+ "quantity_unit": "dozen",
+ "unit_price_amount": "8.00",
+ "unit_price_currency": "USD",
+ "line_subtotal": {
+ "amount": "16.00",
+ "currency": "USD"
+ }
+ },
+ {
+ "bin_id": "half-dozen-eggs",
+ "bin_count": 1
+ }
+ ]);
+ assert_invalid(economics_items_long, "economics.items");
+
let mut economics_bin_missing = supported_payload();
economics_bin_missing["document"]["order"]["economics"]["items"][0]["bin_id"] = Value::Null;
assert_invalid(economics_bin_missing, "economics.items[0].bin_id");
@@ -635,6 +667,13 @@ mod tests {
Value::Null;
assert_invalid(line_subtotal_missing, "amount");
+ let mut missing_line_subtotal = supported_payload();
+ missing_line_subtotal["document"]["order"]["economics"]["items"][0]
+ .as_object_mut()
+ .expect("economics item")
+ .remove("line_subtotal");
+ assert_invalid(missing_line_subtotal, "line_subtotal");
+
let mut line_subtotal_currency = supported_payload();
line_subtotal_currency["document"]["order"]["economics"]["items"][0]["line_subtotal"]["currency"] =
json!("CAD");
@@ -647,6 +686,10 @@ mod tests {
let mut order_item_missing = supported_payload();
order_item_missing["document"]["order"]["items"] = Value::Null;
assert_invalid(order_item_missing, "document.order.items");
+
+ let mut missing_order_bin = supported_payload();
+ missing_order_bin["document"]["order"]["items"][0]["bin_id"] = Value::Null;
+ assert_error_contains(validate_order_items(&missing_order_bin), "items[0].bin_id");
}
fn supported_payload() -> Value {
diff --git a/crates/local_events/src/relay_delivery.rs b/crates/local_events/src/relay_delivery.rs
@@ -411,6 +411,22 @@ mod tests {
assert_error_contains(
RelayDeliveryEvidence {
+ state: RelayDeliveryState::Pending,
+ target_relays: vec!["ws://relay.test".to_owned()],
+ connected_relays: Vec::new(),
+ acknowledged_relays: Vec::new(),
+ observed_relays: Vec::new(),
+ failed_relays: vec![RelayDeliveryFailure {
+ relay_url: "ws://relay.test".to_owned(),
+ error: "timeout".to_owned(),
+ }],
+ }
+ .validate(),
+ "pending delivery evidence",
+ );
+
+ assert_error_contains(
+ RelayDeliveryEvidence {
state: RelayDeliveryState::Acknowledged,
target_relays: vec!["ws://relay.test".to_owned()],
connected_relays: Vec::new(),
@@ -450,6 +466,22 @@ mod tests {
assert_error_contains(
RelayDeliveryEvidence {
+ state: RelayDeliveryState::Failed,
+ target_relays: vec!["ws://relay.test".to_owned()],
+ connected_relays: Vec::new(),
+ acknowledged_relays: Vec::new(),
+ observed_relays: vec!["ws://relay.test".to_owned()],
+ failed_relays: vec![RelayDeliveryFailure {
+ relay_url: "ws://relay.test".to_owned(),
+ error: "timeout".to_owned(),
+ }],
+ }
+ .validate(),
+ "failed delivery evidence",
+ );
+
+ assert_error_contains(
+ RelayDeliveryEvidence {
state: RelayDeliveryState::Observed,
target_relays: vec!["ws://relay.test".to_owned()],
connected_relays: Vec::new(),
@@ -532,6 +564,25 @@ mod tests {
})),
"target_relays",
);
+
+ let relay_vec = vec!["ws://relay-a.test".to_owned()];
+ let relay_slice = relay_vec.as_slice();
+ RelayDeliveryEvidence::acknowledged(
+ relay_vec.clone(),
+ relay_slice,
+ relay_vec.clone(),
+ Vec::new(),
+ )
+ .expect("acknowledged from vecs and slices");
+ assert_error_contains(
+ RelayDeliveryEvidence::observed(
+ ["http://relay.test"],
+ Vec::<String>::new(),
+ Vec::<String>::new(),
+ Vec::new(),
+ ),
+ "target_relays",
+ );
}
fn assert_error_contains<T: std::fmt::Debug>(
diff --git a/crates/local_events/src/relay_url.rs b/crates/local_events/src/relay_url.rs
@@ -181,26 +181,10 @@ mod tests {
normalize_relay_url("ws://[::1]:8080").expect("ipv6 relay"),
"ws://[::1]:8080"
);
- assert!(matches!(
- normalize_relay_url("ws://[::1]extra"),
- Err(RelayUrlValidationError::InvalidAuthority(_))
- ));
- assert!(matches!(
- normalize_relay_url("ws://relay.test:"),
- Err(RelayUrlValidationError::InvalidPort(_))
- ));
- assert!(matches!(
- normalize_relay_url("ws://relay.test:8a"),
- Err(RelayUrlValidationError::InvalidPort(_))
- ));
- assert!(matches!(
- normalize_relay_url("ws://relay one.test"),
- Err(RelayUrlValidationError::InvalidAuthority(_))
- ));
- assert!(matches!(
- normalize_relay_url("ws://relay:8080:9090"),
- Err(RelayUrlValidationError::InvalidAuthority(_))
- ));
+ assert_eq!(
+ normalize_relay_url("ws://[::1]").expect("ipv6 relay without port"),
+ "ws://[::1]"
+ );
}
#[test]
@@ -213,5 +197,14 @@ mod tests {
.expect("relay set");
assert_eq!(relays, vec!["ws://relay-a.test", "ws://relay-b.test"]);
+
+ assert_eq!(
+ normalize_relay_urls(["ws://relay-c.test"]).expect("one relay"),
+ vec!["ws://relay-c.test"]
+ );
+ assert_eq!(
+ normalize_relay_urls(vec!["ws://relay-d.test".to_owned()]).expect("vec relay"),
+ vec!["ws://relay-d.test"]
+ );
}
}
diff --git a/crates/local_events/src/store.rs b/crates/local_events/src/store.rs
@@ -54,18 +54,18 @@ impl<E: SqlExecutor> LocalEventsStore<E> {
input.owner_pubkey,
input.farm_id,
input.listing_addr,
- encode_json(input.local_work_json.as_ref())?,
+ encode_json(input.local_work_json.as_ref()),
input.event_id,
input.event_kind,
input.event_pubkey,
input.event_created_at,
- encode_json(input.event_tags_json.as_ref())?,
+ encode_json(input.event_tags_json.as_ref()),
input.event_content,
input.event_sig,
- encode_json(input.raw_event_json.as_ref())?,
+ encode_json(input.raw_event_json.as_ref()),
input.outbox_status.as_str(),
input.relay_set_fingerprint,
- encode_json(input.relay_delivery_json.as_ref())?
+ encode_json(input.relay_delivery_json.as_ref())
])
.to_string();
let sql = "insert or ignore into local_event_record(
@@ -191,7 +191,7 @@ impl<E: SqlExecutor> LocalEventsStore<E> {
update.status.as_str(),
update.outbox_status.as_str(),
update.relay_set_fingerprint,
- encode_json(update.relay_delivery_json.as_ref())?,
+ encode_json(update.relay_delivery_json.as_ref()),
update.updated_at_ms,
update.record_id
])
@@ -372,11 +372,8 @@ struct ChangeSeqRow {
change_seq: i64,
}
-fn encode_json(value: Option<&Value>) -> Result<Option<String>, LocalEventsError> {
- value
- .map(serde_json::to_string)
- .transpose()
- .map_err(Into::into)
+fn encode_json(value: Option<&Value>) -> Option<String> {
+ value.map(Value::to_string)
}
fn decode_json(value: Option<String>) -> Result<Option<Value>, LocalEventsError> {
@@ -385,3 +382,573 @@ fn decode_json(value: Option<String>) -> Result<Option<Value>, LocalEventsError>
.transpose()
.map_err(Into::into)
}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::VecDeque;
+ use std::sync::Mutex;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ use radroots_sql_core::{ExecOutcome, SqlExecutor, SqliteExecutor};
+ use serde_json::json;
+
+ use super::*;
+
+ fn store() -> LocalEventsStore<SqliteExecutor> {
+ let executor = SqliteExecutor::open_memory().expect("open memory sqlite");
+ let store = LocalEventsStore::new(executor);
+ store.migrate_up().expect("migrate up");
+ store
+ }
+
+ fn local_work(record_id: &str) -> LocalEventRecordInput {
+ LocalEventRecordInput {
+ record_id: record_id.to_owned(),
+ family: LocalRecordFamily::LocalWork,
+ status: LocalRecordStatus::LocalSaved,
+ source_runtime: SourceRuntime::Cli,
+ created_at_ms: 1000,
+ inserted_at_ms: 1001,
+ owner_account_id: Some("seller-account".to_owned()),
+ owner_pubkey: Some("seller-pubkey".to_owned()),
+ farm_id: Some("farm-a".to_owned()),
+ listing_addr: Some("listing-a".to_owned()),
+ local_work_json: Some(json!({"kind":"listing","title":"Eggs"})),
+ event_id: None,
+ event_kind: None,
+ event_pubkey: None,
+ event_created_at: None,
+ event_tags_json: None,
+ event_content: None,
+ event_sig: None,
+ raw_event_json: None,
+ outbox_status: PublishOutboxStatus::None,
+ relay_set_fingerprint: None,
+ relay_delivery_json: None,
+ }
+ }
+
+ fn signed_event(record_id: &str) -> LocalEventRecordInput {
+ LocalEventRecordInput {
+ record_id: record_id.to_owned(),
+ family: LocalRecordFamily::SignedEvent,
+ status: LocalRecordStatus::PendingPublish,
+ source_runtime: SourceRuntime::Cli,
+ created_at_ms: 2000,
+ inserted_at_ms: 2001,
+ owner_account_id: Some("seller-account".to_owned()),
+ owner_pubkey: Some("seller-pubkey".to_owned()),
+ farm_id: Some("farm-a".to_owned()),
+ listing_addr: Some("listing-a".to_owned()),
+ local_work_json: None,
+ event_id: Some(record_id.to_owned()),
+ event_kind: Some(3421),
+ event_pubkey: Some("seller-pubkey".to_owned()),
+ event_created_at: Some(2000),
+ event_tags_json: Some(json!([["d", "listing-a"]])),
+ event_content: Some("{\"title\":\"Eggs\"}".to_owned()),
+ event_sig: Some("sig-a".to_owned()),
+ raw_event_json: Some(json!({"id":record_id,"kind":3421})),
+ outbox_status: PublishOutboxStatus::Pending,
+ relay_set_fingerprint: Some("relay-set-a".to_owned()),
+ relay_delivery_json: Some(json!({
+ "state": "pending",
+ "target_relays": ["ws://127.0.0.1:8080"],
+ "connected_relays": [],
+ "acknowledged_relays": [],
+ "failed_relays": []
+ })),
+ }
+ }
+
+ #[derive(Debug)]
+ struct ScriptedExecutor {
+ begin_result: Mutex<Result<(), SqlError>>,
+ commit_result: Mutex<Result<(), SqlError>>,
+ exec_results: Mutex<VecDeque<Result<ExecOutcome, SqlError>>>,
+ query_results: Mutex<VecDeque<Result<String, SqlError>>>,
+ rollbacks: AtomicUsize,
+ }
+
+ impl ScriptedExecutor {
+ fn new(
+ exec_results: Vec<Result<ExecOutcome, SqlError>>,
+ query_results: Vec<Result<String, SqlError>>,
+ ) -> Self {
+ Self {
+ begin_result: Mutex::new(Ok(())),
+ commit_result: Mutex::new(Ok(())),
+ exec_results: Mutex::new(exec_results.into()),
+ query_results: Mutex::new(query_results.into()),
+ rollbacks: AtomicUsize::new(0),
+ }
+ }
+
+ fn with_begin_error(error: SqlError) -> Self {
+ let executor = Self::new(Vec::new(), Vec::new());
+ *executor.begin_result.lock().expect("begin result") = Err(error);
+ executor
+ }
+
+ fn with_commit_error(error: SqlError) -> Self {
+ let executor = Self::new(
+ vec![Ok(ExecOutcome {
+ changes: 1,
+ last_insert_id: 0,
+ })],
+ vec![Ok(r#"[{"change_seq":1}]"#.to_owned())],
+ );
+ *executor.commit_result.lock().expect("commit result") = Err(error);
+ executor
+ }
+ }
+
+ impl SqlExecutor for ScriptedExecutor {
+ fn exec(&self, _sql: &str, _params_json: &str) -> Result<ExecOutcome, SqlError> {
+ self.exec_results
+ .lock()
+ .expect("exec results")
+ .pop_front()
+ .unwrap_or(Ok(ExecOutcome {
+ changes: 1,
+ last_insert_id: 0,
+ }))
+ }
+
+ fn query_raw(&self, _sql: &str, _params_json: &str) -> Result<String, SqlError> {
+ self.query_results
+ .lock()
+ .expect("query results")
+ .pop_front()
+ .unwrap_or_else(|| Ok("[]".to_owned()))
+ }
+
+ fn begin(&self) -> Result<(), SqlError> {
+ self.begin_result.lock().expect("begin result").clone()
+ }
+
+ fn commit(&self) -> Result<(), SqlError> {
+ self.commit_result.lock().expect("commit result").clone()
+ }
+
+ fn rollback(&self) -> Result<(), SqlError> {
+ self.rollbacks.fetch_add(1, Ordering::SeqCst);
+ Ok(())
+ }
+ }
+
+ fn record_row_with(field: &str, value: serde_json::Value) -> String {
+ let mut row = json!({
+ "seq": 1,
+ "change_seq": 1,
+ "record_id": "record-a",
+ "family": "signed_event",
+ "status": "pending_publish",
+ "source_runtime": "cli",
+ "created_at_ms": 1000,
+ "inserted_at_ms": 1001,
+ "updated_at_ms": 1001,
+ "owner_account_id": "seller-account",
+ "owner_pubkey": "seller-pubkey",
+ "farm_id": "farm-a",
+ "listing_addr": "listing-a",
+ "local_work_json": null,
+ "event_id": "event-a",
+ "event_kind": 3421,
+ "event_pubkey": "seller-pubkey",
+ "event_created_at": 1000,
+ "event_tags_json": "[[\"d\",\"listing-a\"]]",
+ "event_content": "{}",
+ "event_sig": "sig-a",
+ "raw_event_json": "{\"id\":\"event-a\",\"kind\":3421}",
+ "outbox_status": "pending",
+ "relay_set_fingerprint": "relay-set-a",
+ "relay_delivery_json": "{\"state\":\"pending\",\"target_relays\":[\"ws://127.0.0.1:8080\"],\"connected_relays\":[],\"acknowledged_relays\":[],\"failed_relays\":[]}"
+ });
+ row[field] = value;
+ json!([row]).to_string()
+ }
+
+ #[test]
+ fn store_methods_round_trip_records_and_cursors() {
+ let store = store();
+
+ assert!(
+ store
+ .executor()
+ .query_raw("select 1 as value", "[]")
+ .is_ok()
+ );
+ assert!(store.get_record("missing").expect("get missing").is_none());
+ assert!(store.get_cursor("app").expect("cursor missing").is_none());
+
+ let local = store
+ .append_record(&local_work("local-a"))
+ .expect("append local work");
+ let event = store
+ .append_record(&signed_event("event-a"))
+ .expect("append signed event");
+
+ assert_eq!(
+ store
+ .get_record("local-a")
+ .expect("get local")
+ .expect("local record")
+ .record_id,
+ local.record_id
+ );
+ assert_eq!(
+ store
+ .list_records_after_seq(0, 10)
+ .expect("list after seq")
+ .len(),
+ 2
+ );
+ assert_eq!(
+ store
+ .list_records_changed_after(local.change_seq, 10)
+ .expect("list changed after")[0]
+ .record_id,
+ event.record_id
+ );
+ assert_eq!(
+ store.list_records_changed_latest(1).expect("list latest")[0].record_id,
+ event.record_id
+ );
+ assert_eq!(
+ store
+ .list_records_changed_before(event.change_seq, event.seq, 10)
+ .expect("list before")[0]
+ .record_id,
+ local.record_id
+ );
+
+ let cursor = store
+ .advance_cursor("app", event.change_seq, 3000)
+ .expect("advance cursor");
+ assert_eq!(cursor.consumer_id, "app");
+ assert_eq!(
+ store
+ .get_cursor("app")
+ .expect("get cursor")
+ .expect("cursor")
+ .last_change_seq,
+ event.change_seq
+ );
+
+ let updated = store
+ .update_outbox(&LocalEventRecordUpdate {
+ record_id: "event-a".to_owned(),
+ status: LocalRecordStatus::Published,
+ outbox_status: PublishOutboxStatus::Acknowledged,
+ relay_set_fingerprint: Some("relay-set-a".to_owned()),
+ relay_delivery_json: Some(json!({
+ "state": "acknowledged",
+ "target_relays": ["ws://127.0.0.1:8080"],
+ "connected_relays": ["ws://127.0.0.1:8080"],
+ "acknowledged_relays": ["ws://127.0.0.1:8080"],
+ "failed_relays": []
+ })),
+ updated_at_ms: 4000,
+ })
+ .expect("update outbox");
+
+ assert_eq!(updated.status, LocalRecordStatus::Published);
+ assert_eq!(updated.outbox_status, PublishOutboxStatus::Acknowledged);
+ store.migrate_down().expect("migrate down");
+ }
+
+ #[test]
+ fn store_reports_missing_updates_and_decode_errors() {
+ let store = store();
+ assert!(
+ store
+ .get_record(" ")
+ .expect_err("empty record id")
+ .to_string()
+ .contains("record_id")
+ );
+ assert!(
+ store
+ .get_cursor(" ")
+ .expect_err("empty consumer id")
+ .to_string()
+ .contains("consumer_id")
+ );
+ assert!(
+ store
+ .advance_cursor(" ", 1, 1000)
+ .expect_err("empty cursor consumer")
+ .to_string()
+ .contains("consumer_id")
+ );
+ assert!(
+ store
+ .update_outbox(&LocalEventRecordUpdate {
+ record_id: " ".to_owned(),
+ status: LocalRecordStatus::Published,
+ outbox_status: PublishOutboxStatus::Acknowledged,
+ relay_set_fingerprint: None,
+ relay_delivery_json: None,
+ updated_at_ms: 4000,
+ })
+ .expect_err("empty update record id")
+ .to_string()
+ .contains("record_id")
+ );
+
+ let missing_update = store
+ .update_outbox(&LocalEventRecordUpdate {
+ record_id: "missing-event".to_owned(),
+ status: LocalRecordStatus::Published,
+ outbox_status: PublishOutboxStatus::Acknowledged,
+ relay_set_fingerprint: None,
+ relay_delivery_json: None,
+ updated_at_ms: 4000,
+ })
+ .expect_err("missing record update");
+
+ assert!(missing_update.to_string().contains("missing-event"));
+
+ store
+ .append_record(&local_work("local-a"))
+ .expect("append local");
+ let params = json!(["{", "local-a"]).to_string();
+ store
+ .executor()
+ .exec(
+ "update local_event_record set local_work_json = ? where record_id = ?",
+ ¶ms,
+ )
+ .expect("corrupt local work json");
+ let decode_error = store.get_record("local-a").expect_err("decode error");
+
+ assert!(decode_error.to_string().contains("EOF"));
+ }
+
+ #[test]
+ fn store_rolls_back_when_change_sequence_is_unavailable() {
+ let append_store =
+ LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("[]".to_owned())]));
+ let append_error = append_store
+ .append_record(&local_work("local-a"))
+ .expect_err("append error");
+
+ assert!(append_error.to_string().contains("change sequence"));
+ assert_eq!(append_store.executor().rollbacks.load(Ordering::SeqCst), 1);
+
+ let update_store =
+ LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("[]".to_owned())]));
+ let update_error = update_store
+ .update_outbox(&LocalEventRecordUpdate {
+ record_id: "event-a".to_owned(),
+ status: LocalRecordStatus::Published,
+ outbox_status: PublishOutboxStatus::Acknowledged,
+ relay_set_fingerprint: None,
+ relay_delivery_json: None,
+ updated_at_ms: 4000,
+ })
+ .expect_err("update error");
+
+ assert!(update_error.to_string().contains("change sequence"));
+ assert_eq!(update_store.executor().rollbacks.load(Ordering::SeqCst), 1);
+ }
+
+ #[test]
+ fn store_reports_cursor_advance_without_returned_cursor() {
+ let store = LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), Vec::new()));
+
+ assert!(store.get_cursor("app").expect("missing cursor").is_none());
+ let cursor_error = store
+ .advance_cursor("app", 1, 1000)
+ .expect_err("cursor advance error");
+
+ assert!(cursor_error.to_string().contains("cursor advance failed"));
+ }
+
+ #[test]
+ fn store_reports_executor_and_decode_failures() {
+ let begin_store = LocalEventsStore::new(ScriptedExecutor::with_begin_error(
+ SqlError::InvalidQuery("begin failed".to_owned()),
+ ));
+ assert!(
+ begin_store
+ .append_record(&local_work("local-a"))
+ .expect_err("begin failure")
+ .to_string()
+ .contains("begin failed")
+ );
+
+ let exec_store = LocalEventsStore::new(ScriptedExecutor::new(
+ vec![Err(SqlError::InvalidQuery("insert failed".to_owned()))],
+ vec![Ok(r#"[{"change_seq":1}]"#.to_owned())],
+ ));
+ assert!(
+ exec_store
+ .append_record(&local_work("local-a"))
+ .expect_err("exec failure")
+ .to_string()
+ .contains("insert failed")
+ );
+ assert_eq!(exec_store.executor().rollbacks.load(Ordering::SeqCst), 1);
+
+ let commit_store = LocalEventsStore::new(ScriptedExecutor::with_commit_error(
+ SqlError::InvalidQuery("commit failed".to_owned()),
+ ));
+ assert!(
+ commit_store
+ .append_record(&local_work("local-a"))
+ .expect_err("commit failure")
+ .to_string()
+ .contains("commit failed")
+ );
+
+ let query_error_store = LocalEventsStore::new(ScriptedExecutor::new(
+ Vec::new(),
+ vec![Err(SqlError::InvalidQuery("query failed".to_owned()))],
+ ));
+ assert!(
+ query_error_store
+ .get_record("record-a")
+ .expect_err("query failure")
+ .to_string()
+ .contains("query failed")
+ );
+
+ let invalid_rows_store =
+ LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("{".to_owned())]));
+ let _ = invalid_rows_store
+ .get_record("record-a")
+ .expect_err("invalid rows");
+
+ let cursor_rows_store =
+ LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("{".to_owned())]));
+ let _ = cursor_rows_store
+ .get_cursor("app")
+ .expect_err("invalid cursor rows");
+
+ let change_rows_store =
+ LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("{".to_owned())]));
+ let _ = change_rows_store
+ .append_record(&local_work("local-a"))
+ .expect_err("invalid change rows");
+
+ let cursor_exec_store = LocalEventsStore::new(ScriptedExecutor::new(
+ vec![Err(SqlError::InvalidQuery("cursor failed".to_owned()))],
+ Vec::new(),
+ ));
+ assert!(
+ cursor_exec_store
+ .advance_cursor("app", 1, 1000)
+ .expect_err("cursor exec failure")
+ .to_string()
+ .contains("cursor failed")
+ );
+
+ let append_lookup_store = LocalEventsStore::new(ScriptedExecutor::new(
+ vec![Ok(ExecOutcome {
+ changes: 1,
+ last_insert_id: 0,
+ })],
+ vec![Ok(r#"[{"change_seq":1}]"#.to_owned()), Ok("[]".to_owned())],
+ ));
+ assert!(
+ append_lookup_store
+ .append_record(&local_work("local-a"))
+ .expect_err("append lookup failure")
+ .to_string()
+ .contains("record append failed")
+ );
+
+ let update_lookup_store = LocalEventsStore::new(ScriptedExecutor::new(
+ vec![Ok(ExecOutcome {
+ changes: 1,
+ last_insert_id: 0,
+ })],
+ vec![Ok(r#"[{"change_seq":1}]"#.to_owned()), Ok("[]".to_owned())],
+ ));
+ assert!(
+ update_lookup_store
+ .update_outbox(&LocalEventRecordUpdate {
+ record_id: "event-a".to_owned(),
+ status: LocalRecordStatus::Published,
+ outbox_status: PublishOutboxStatus::Acknowledged,
+ relay_set_fingerprint: None,
+ relay_delivery_json: None,
+ updated_at_ms: 4000,
+ })
+ .expect_err("update lookup failure")
+ .to_string()
+ .contains("event-a")
+ );
+
+ let cursor_query_store = LocalEventsStore::new(ScriptedExecutor::new(
+ Vec::new(),
+ vec![Err(SqlError::InvalidQuery(
+ "cursor query failed".to_owned(),
+ ))],
+ ));
+ assert!(
+ cursor_query_store
+ .get_cursor("app")
+ .expect_err("cursor query failure")
+ .to_string()
+ .contains("cursor query failed")
+ );
+
+ let advance_cursor_query_store = LocalEventsStore::new(ScriptedExecutor::new(
+ vec![Ok(ExecOutcome {
+ changes: 1,
+ last_insert_id: 0,
+ })],
+ vec![Err(SqlError::InvalidQuery(
+ "advanced cursor query failed".to_owned(),
+ ))],
+ ));
+ assert!(
+ advance_cursor_query_store
+ .advance_cursor("app", 1, 1000)
+ .expect_err("advance cursor query failure")
+ .to_string()
+ .contains("advanced cursor query failed")
+ );
+
+ let change_query_store = LocalEventsStore::new(ScriptedExecutor::new(
+ Vec::new(),
+ vec![Err(SqlError::InvalidQuery(
+ "change query failed".to_owned(),
+ ))],
+ ));
+ assert!(
+ change_query_store
+ .append_record(&local_work("local-a"))
+ .expect_err("change query failure")
+ .to_string()
+ .contains("change query failed")
+ );
+ }
+
+ #[test]
+ fn store_reports_record_row_conversion_failures() {
+ for (field, value, expected) in [
+ ("family", json!("bad_family"), "family"),
+ ("status", json!("bad_status"), "status"),
+ ("source_runtime", json!("bad_runtime"), "runtime"),
+ ("event_tags_json", json!("{"), "EOF"),
+ ("raw_event_json", json!("{"), "EOF"),
+ ("outbox_status", json!("bad_outbox"), "outbox"),
+ ("relay_delivery_json", json!("{"), "EOF"),
+ ] {
+ let store = LocalEventsStore::new(ScriptedExecutor::new(
+ Vec::new(),
+ vec![Ok(record_row_with(field, value))],
+ ));
+ let error = store.get_record("record-a").expect_err("conversion error");
+
+ assert!(
+ error.to_string().contains(expected),
+ "expected error to contain {expected}, got {error}"
+ );
+ }
+ }
+}
diff --git a/crates/local_events/tests/order_work.rs b/crates/local_events/tests/order_work.rs
@@ -30,6 +30,7 @@ fn buyer_order_request_payload_accepts_supported_exportable_work() {
validation.support_state,
BuyerOrderRequestSupportState::Supported
);
+ assert_eq!(validation.support_state.as_str(), "supported");
assert!(validation.support_issues.is_empty());
assert_eq!(supported, validation);
}
@@ -61,6 +62,7 @@ fn buyer_order_request_payload_accepts_explicit_unsupported_work() {
validation.support_state,
BuyerOrderRequestSupportState::Unsupported
);
+ assert_eq!(validation.support_state.as_str(), "unsupported");
assert_eq!(validation.support_issues, vec!["buyer_pubkey_required"]);
assert_eq!(unsupported, validation);
assert!(supported_error.to_string().contains("support_status.state"));
@@ -106,21 +108,60 @@ fn buyer_order_request_payload_rejects_invalid_item_identity() {
#[test]
fn buyer_order_request_payload_rejects_invalid_economics() {
let mut missing_economics = supported_payload();
- missing_economics["document"]["order"]["economics"] = Value::Null;
+ missing_economics["document"]["order"]
+ .as_object_mut()
+ .expect("order object")
+ .remove("economics");
assert_invalid(missing_economics, "economics");
+ let mut non_object_economics = supported_payload();
+ non_object_economics["document"]["order"]["economics"] = Value::Null;
+ assert_invalid(non_object_economics, "economics");
+
let mut mismatched_currency = supported_payload();
mismatched_currency["document"]["order"]["economics"]["items"][0]["unit_price_currency"] =
json!("CAD");
assert_invalid(mismatched_currency, "unit_price_currency");
let mut mismatched_items = supported_payload();
- mismatched_items["document"]["order"]["economics"]["items"] = json!([]);
+ mismatched_items["document"]["order"]["economics"]["items"] = json!([
+ {
+ "bin_id": "dozen-eggs",
+ "bin_count": 2,
+ "quantity_amount": "1",
+ "quantity_unit": "dozen",
+ "unit_price_amount": "8.00",
+ "unit_price_currency": "USD",
+ "line_subtotal": {
+ "amount": "16.00",
+ "currency": "USD"
+ }
+ },
+ {
+ "bin_id": "half-dozen-eggs",
+ "bin_count": 1
+ }
+ ]);
assert_invalid(mismatched_items, "economics.items");
+ let mut empty_economics_items = supported_payload();
+ empty_economics_items["document"]["order"]["economics"]["items"] = json!([]);
+ assert_invalid(empty_economics_items, "economics.items");
+
let mut mismatched_bin = supported_payload();
mismatched_bin["document"]["order"]["economics"]["items"][0]["bin_id"] = json!("other-bin");
assert_invalid(mismatched_bin, "economics.items[0].bin_id");
+
+ let mut bad_currency_length = supported_payload();
+ bad_currency_length["document"]["order"]["economics"]["currency"] = json!("US");
+ assert_invalid(bad_currency_length, "currency");
+
+ let mut missing_line_subtotal = supported_payload();
+ missing_line_subtotal["document"]["order"]["economics"]["items"][0]
+ .as_object_mut()
+ .expect("economics item")
+ .remove("line_subtotal");
+ assert_invalid(missing_line_subtotal, "line_subtotal");
}
#[test]
@@ -129,6 +170,10 @@ fn buyer_order_request_payload_rejects_stale_or_conflicting_currentness() {
stale["currentness"]["current"] = json!(false);
assert_invalid(stale, "currentness.current");
+ let mut missing_current = supported_payload();
+ missing_current["currentness"]["current"] = Value::Null;
+ assert_invalid(missing_current, "currentness.current");
+
let mut wrong_order = supported_payload();
wrong_order["currentness"]["order_id"] = json!("ord_other");
assert_invalid(wrong_order, "currentness.order_id");
diff --git a/crates/local_events/tests/relay_delivery.rs b/crates/local_events/tests/relay_delivery.rs
@@ -14,6 +14,7 @@ fn pending_delivery_evidence_uses_canonical_json_shape() {
.expect("pending evidence");
assert_eq!(evidence.state, RelayDeliveryState::Pending);
+ assert_eq!(evidence.state.as_str(), "pending");
assert_eq!(
evidence.target_relays,
vec![
@@ -43,6 +44,7 @@ fn acknowledged_delivery_evidence_uses_canonical_failure_fields() {
)
.expect("acknowledged evidence");
+ assert_eq!(evidence.state.as_str(), "acknowledged");
assert_eq!(
evidence.to_json_value().expect("json"),
json!({
@@ -68,6 +70,7 @@ fn observed_delivery_evidence_tracks_observed_relays_without_acknowledgement() {
.expect("observed evidence");
assert_eq!(evidence.state, RelayDeliveryState::Observed);
+ assert_eq!(evidence.state.as_str(), "observed");
assert!(evidence.acknowledged_relays.is_empty());
assert_eq!(
evidence.to_json_value().expect("json"),
@@ -116,6 +119,7 @@ fn failed_delivery_evidence_requires_failures_without_acknowledgements() {
.expect("failed evidence");
assert_eq!(evidence.state, RelayDeliveryState::Failed);
+ assert_eq!(evidence.state.as_str(), "failed");
assert!(evidence.acknowledged_relays.is_empty());
assert_eq!(evidence.failed_relays.len(), 1);
}
@@ -164,3 +168,194 @@ fn delivery_evidence_rejects_invalid_json_shape() {
assert!(err.to_string().contains("acknowledged_relays"));
}
+
+#[test]
+fn delivery_evidence_rejects_pending_and_failed_cross_state_fields() {
+ let pending_err = RelayDeliveryEvidence::from_json_value(&json!({
+ "state": "pending",
+ "target_relays": ["wss://relay-a.example"],
+ "connected_relays": [],
+ "acknowledged_relays": [],
+ "observed_relays": ["wss://relay-a.example"],
+ "failed_relays": [
+ {"relay_url": "wss://relay-a.example", "error": "timeout"}
+ ]
+ }))
+ .expect_err("pending evidence with terminal fields");
+
+ assert!(
+ pending_err
+ .to_string()
+ .contains("pending delivery evidence")
+ );
+
+ let failed_err = RelayDeliveryEvidence::from_json_value(&json!({
+ "state": "failed",
+ "target_relays": ["wss://relay-a.example"],
+ "connected_relays": [],
+ "acknowledged_relays": ["wss://relay-a.example"],
+ "observed_relays": ["wss://relay-a.example"],
+ "failed_relays": [
+ {"relay_url": "wss://relay-a.example", "error": "timeout"}
+ ]
+ }))
+ .expect_err("failed evidence with success fields");
+
+ assert!(failed_err.to_string().contains("failed delivery evidence"));
+}
+
+#[test]
+fn delivery_evidence_rejects_invalid_failure_and_relay_values() {
+ let normalized_failure_err = RelayDeliveryEvidence::from_json_value(&json!({
+ "state": "failed",
+ "target_relays": ["wss://relay-a.example"],
+ "connected_relays": [],
+ "acknowledged_relays": [],
+ "failed_relays": [
+ {"relay_url": " wss://relay-a.example ", "error": "timeout"}
+ ]
+ }))
+ .expect_err("non-normalized failure relay");
+
+ assert!(
+ normalized_failure_err
+ .to_string()
+ .contains("failed_relays.relay_url")
+ );
+
+ let trimmed_error_err = RelayDeliveryEvidence::from_json_value(&json!({
+ "state": "failed",
+ "target_relays": ["wss://relay-a.example"],
+ "connected_relays": [],
+ "acknowledged_relays": [],
+ "failed_relays": [
+ {"relay_url": "wss://relay-a.example", "error": " timeout "}
+ ]
+ }))
+ .expect_err("non-normalized failure text");
+
+ assert!(trimmed_error_err.to_string().contains("must be trimmed"));
+
+ let constructor_err =
+ RelayDeliveryEvidence::pending(["http://relay-a.example"]).expect_err("invalid relay");
+
+ assert!(constructor_err.to_string().contains("target_relays"));
+}
+
+#[test]
+fn delivery_evidence_rejects_invalid_relay_sets_in_each_field() {
+ for (field, evidence) in [
+ (
+ "connected_relays",
+ RelayDeliveryEvidence {
+ state: RelayDeliveryState::Pending,
+ target_relays: vec!["wss://relay-a.example".to_owned()],
+ connected_relays: vec!["http://relay-a.example".to_owned()],
+ acknowledged_relays: Vec::new(),
+ observed_relays: Vec::new(),
+ failed_relays: Vec::new(),
+ },
+ ),
+ (
+ "acknowledged_relays",
+ RelayDeliveryEvidence {
+ state: RelayDeliveryState::Acknowledged,
+ target_relays: vec!["wss://relay-a.example".to_owned()],
+ connected_relays: Vec::new(),
+ acknowledged_relays: vec!["http://relay-a.example".to_owned()],
+ observed_relays: Vec::new(),
+ failed_relays: Vec::new(),
+ },
+ ),
+ (
+ "observed_relays",
+ RelayDeliveryEvidence {
+ state: RelayDeliveryState::Observed,
+ target_relays: vec!["wss://relay-a.example".to_owned()],
+ connected_relays: Vec::new(),
+ acknowledged_relays: Vec::new(),
+ observed_relays: vec!["http://relay-a.example".to_owned()],
+ failed_relays: Vec::new(),
+ },
+ ),
+ ] {
+ let error = evidence.validate().expect_err("invalid relay set");
+
+ assert!(
+ error.to_string().contains(field),
+ "expected error to contain {field}, got {error}"
+ );
+ }
+}
+
+#[test]
+fn delivery_evidence_rejects_constructor_and_json_error_paths() {
+ let empty_failure_error = RelayDeliveryEvidence::failed(
+ ["wss://relay-a.example"],
+ Vec::<String>::new(),
+ vec![RelayDeliveryFailure {
+ relay_url: "wss://relay-a.example".to_owned(),
+ error: " ".to_owned(),
+ }],
+ )
+ .expect_err("empty failure error");
+
+ assert!(
+ empty_failure_error
+ .to_string()
+ .contains("failed_relays.error")
+ );
+
+ let invalid_json_error = RelayDeliveryEvidence::from_json_value(&json!({
+ "state": 1,
+ "target_relays": [],
+ "connected_relays": [],
+ "acknowledged_relays": [],
+ "failed_relays": []
+ }))
+ .expect_err("invalid json");
+
+ assert!(!invalid_json_error.to_string().is_empty());
+
+ let to_json_error = RelayDeliveryEvidence {
+ state: RelayDeliveryState::Pending,
+ target_relays: Vec::new(),
+ connected_relays: Vec::new(),
+ acknowledged_relays: Vec::new(),
+ observed_relays: Vec::new(),
+ failed_relays: Vec::new(),
+ }
+ .to_json_value()
+ .expect_err("invalid to json evidence");
+
+ assert!(to_json_error.to_string().contains("target_relays"));
+
+ for result in [
+ RelayDeliveryEvidence::acknowledged(
+ ["wss://relay-a.example"],
+ ["http://relay-a.example"],
+ ["wss://relay-a.example"],
+ Vec::new(),
+ ),
+ RelayDeliveryEvidence::acknowledged(
+ ["wss://relay-a.example"],
+ Vec::<String>::new(),
+ ["http://relay-a.example"],
+ Vec::new(),
+ ),
+ RelayDeliveryEvidence::observed(
+ ["wss://relay-a.example"],
+ Vec::<String>::new(),
+ ["http://relay-a.example"],
+ Vec::new(),
+ ),
+ RelayDeliveryEvidence::acknowledged(
+ ["wss://relay-a.example"],
+ Vec::<String>::new(),
+ Vec::<String>::new(),
+ Vec::new(),
+ ),
+ ] {
+ assert!(result.is_err());
+ }
+}
diff --git a/crates/local_events/tests/relay_url.rs b/crates/local_events/tests/relay_url.rs
@@ -1,6 +1,31 @@
use radroots_local_events::{RelayUrlValidationError, normalize_relay_url, normalize_relay_urls};
#[test]
+fn relay_url_error_display_is_stable() {
+ assert_eq!(
+ RelayUrlValidationError::Empty.to_string(),
+ "relay url must not be empty"
+ );
+ assert_eq!(
+ RelayUrlValidationError::UnsupportedScheme("https://relay.example".to_owned()).to_string(),
+ "relay url must use ws or wss, got `https://relay.example`"
+ );
+ assert_eq!(
+ RelayUrlValidationError::MissingHost("wss://".to_owned()).to_string(),
+ "relay url must include a host, got `wss://`"
+ );
+ assert_eq!(
+ RelayUrlValidationError::InvalidAuthority("wss://user@relay.example".to_owned())
+ .to_string(),
+ "relay url authority is invalid, got `wss://user@relay.example`"
+ );
+ assert_eq!(
+ RelayUrlValidationError::InvalidPort("wss://relay.example:abc".to_owned()).to_string(),
+ "relay url port is invalid, got `wss://relay.example:abc`"
+ );
+}
+
+#[test]
fn relay_url_normalization_trims_and_dedupes() {
let relays = normalize_relay_urls([
" wss://relay-a.example ",
@@ -76,11 +101,33 @@ fn relay_url_validation_rejects_malformed_authority() {
"wss://2001:db8::1".to_owned()
))
);
+ assert_eq!(
+ normalize_relay_url("wss://[2001:db8::1"),
+ Err(RelayUrlValidationError::InvalidAuthority(
+ "wss://[2001:db8::1".to_owned()
+ ))
+ );
+ assert_eq!(
+ normalize_relay_url("wss://[]"),
+ Err(RelayUrlValidationError::MissingHost("wss://[]".to_owned()))
+ );
+ assert_eq!(
+ normalize_relay_url("wss://[2001:db8::1]extra"),
+ Err(RelayUrlValidationError::InvalidAuthority(
+ "wss://[2001:db8::1]extra".to_owned()
+ ))
+ );
}
#[test]
fn relay_url_validation_rejects_invalid_ports() {
assert_eq!(
+ normalize_relay_url("wss://relay.example:"),
+ Err(RelayUrlValidationError::InvalidPort(
+ "wss://relay.example:".to_owned()
+ ))
+ );
+ assert_eq!(
normalize_relay_url("wss://relay.example:abc"),
Err(RelayUrlValidationError::InvalidPort(
"wss://relay.example:abc".to_owned()
@@ -100,4 +147,8 @@ fn relay_url_validation_accepts_bracketed_ipv6() {
normalize_relay_url("wss://[2001:db8::1]:8080/nostr").expect("ipv6 relay"),
"wss://[2001:db8::1]:8080/nostr"
);
+ assert_eq!(
+ normalize_relay_url("wss://[2001:db8::1]").expect("ipv6 relay without port"),
+ "wss://[2001:db8::1]"
+ );
}