rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

commit 97efeccbd63deb0ff085524ddb69cad9b9261240
parent e98b8e9182bbd976a702be02179068df884c691d
Author: triesap <tyson@radroots.org>
Date:   Sat, 28 Mar 2026 04:01:30 +0000

workflow: harden listing validation recovery semantics

- track validated listing snapshots by event id instead of address-only state
- clear stale validation when listing revalidation fails or latest listing snapshots diverge
- advance replay anchors after handled domain errors and cover the new semantics with tests
- validate with cargo metadata, cargo fmt, git diff --check, cargo check, and CARGO_INCREMENTAL=0 cargo test

Diffstat:
Msrc/features/trade_listing/handlers/dvm.rs | 164+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
Msrc/features/trade_listing/state.rs | 99+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
Msrc/features/trade_listing/subscriber.rs | 40+++++++++++++++++++++++++++++++++++++---
Msrc/rhi.rs | 5++++-
4 files changed, 283 insertions(+), 25 deletions(-)

diff --git a/src/features/trade_listing/handlers/dvm.rs b/src/features/trade_listing/handlers/dvm.rs @@ -466,6 +466,7 @@ async fn handle_listing_validate_request( listing_addr: listing_addr.to_string(), }, }; + state.lock().await.clear_listing_validation(listing_addr); send_validate_result(event, client, listing_addr, vec![error]).await?; return Ok(()); } @@ -477,36 +478,53 @@ async fn handle_listing_validate_request( let error = TradeListingValidationError::ListingEventFetchFailed { listing_addr: listing_addr.to_string(), }; + state.lock().await.clear_listing_validation(listing_addr); send_validate_result(event, client, listing_addr, vec![error]).await?; return Ok(()); } } }; - let errors = if let Some(event) = listing_event { - match validate_listing_event_io(&event) { + let (validated_event_id, errors) = if let Some(listing_event) = listing_event { + match validate_listing_event_io(&listing_event) { Ok((validated_listing_addr, farm)) => { if validated_listing_addr != listing_addr { - vec![TradeListingValidationError::ListingEventNotFound { - listing_addr: listing_addr.to_string(), - }] + ( + None, + vec![TradeListingValidationError::ListingEventNotFound { + listing_addr: listing_addr.to_string(), + }], + ) } else { let errors = validate_farm_dependencies(client, &farm).await?; if errors.is_empty() { - let mut state = state.lock().await; - state.mark_listing_validated(listing_addr); + (Some(listing_event.id.to_string()), errors) + } else { + (None, errors) } - errors } } - Err(err) => vec![err], + Err(err) => (None, vec![err]), } } else { - vec![TradeListingValidationError::ListingEventNotFound { - listing_addr: listing_addr.to_string(), - }] + ( + None, + vec![TradeListingValidationError::ListingEventNotFound { + listing_addr: listing_addr.to_string(), + }], + ) }; + { + let mut state = state.lock().await; + match validated_event_id { + Some(validated_event_id) => { + state.mark_listing_validated(listing_addr, &validated_event_id); + } + None => state.clear_listing_validation(listing_addr), + } + } + send_validate_result(event, client, listing_addr, errors).await } @@ -545,10 +563,39 @@ async fn handle_order_request( return Err(TradeListingDvmError::InvalidOrder); } - let mut state = state.lock().await; - if !state.is_listing_validated(&payload.listing_addr) { + { + let state = state.lock().await; + if state.order_exists(order_id) { + return Ok(()); + } + } + + let validated_event_id = { + let state = state.lock().await; + state + .validated_listing_event_id(&payload.listing_addr) + .map(str::to_owned) + } + .ok_or(TradeListingDvmError::ListingNotValidated)?; + + let latest_listing_event = fetch_listing_by_addr(client, &payload.listing_addr).await?; + let Some(latest_listing_event) = latest_listing_event else { + state + .lock() + .await + .clear_listing_validation(&payload.listing_addr); + return Err(TradeListingDvmError::ListingNotValidated); + }; + + if latest_listing_event.id.to_string() != validated_event_id { + state + .lock() + .await + .clear_listing_validation(&payload.listing_addr); return Err(TradeListingDvmError::ListingNotValidated); } + + let mut state = state.lock().await; if state.order_exists(order_id) { return Ok(()); } @@ -1444,7 +1491,7 @@ mod tests { ) -> Arc<AsyncMutex<TradeListingState>> { let state = Arc::new(AsyncMutex::new(TradeListingState::default())); let mut locked = state.lock().await; - locked.mark_listing_validated(listing_addr); + locked.mark_listing_validated(listing_addr, "validated-listing-event"); locked.insert_order(make_order_state( order_id, listing_addr, @@ -1909,6 +1956,10 @@ mod tests { .is_ok() ); assert!(state.lock().await.is_listing_validated(&listing_addr)); + assert_eq!( + state.lock().await.validated_listing_event_id(&listing_addr), + Some(event.id.to_string().as_str()) + ); let other_listing_addr = listing_addr_for_seller(&rhi_keys); dvm_test_hooks() @@ -1948,6 +1999,22 @@ mod tests { .await .is_listing_validated(&mismatch_listing_addr) ); + + state + .lock() + .await + .mark_listing_validated(&listing_addr, "stale-listing-event"); + push_fetch_events_ok(Vec::new()); + push_send_ok(); + let payload = TradeListingValidateRequest { + listing_event: None, + }; + assert!( + handle_listing_validate_request(&event, payload, &listing_addr, &client, &state) + .await + .is_ok() + ); + assert!(!state.lock().await.is_listing_validated(&listing_addr)); } #[tokio::test] @@ -1961,7 +2028,15 @@ mod tests { let seller_pub = seller_keys.public_key().to_hex(); let buyer_pub = buyer_keys.public_key().to_hex(); let state = Arc::new(AsyncMutex::new(TradeListingState::default())); - state.lock().await.mark_listing_validated(&listing_addr); + let validated_listing_event = + RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(30402), "listing") + .custom_created_at(RadrootsNostrTimestamp::from(10_u64)) + .sign_with_keys(&seller_keys) + .expect("listing event"); + state + .lock() + .await + .mark_listing_validated(&listing_addr, &validated_listing_event.id.to_string()); let order_event = make_event( &buyer_keys, @@ -1969,6 +2044,7 @@ mod tests { "order".to_string(), Vec::new(), ); + push_fetch_events_ok(vec![validated_listing_event.clone()]); push_send_ok(); let order_payload = make_order( order_id, @@ -3059,6 +3135,42 @@ mod tests { Err(TradeListingDvmError::ListingNotValidated) )); + let stale_listing_event = + RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(30402), "listing-stale") + .custom_created_at(RadrootsNostrTimestamp::from(9_u64)) + .sign_with_keys(&seller_keys) + .expect("stale listing event"); + state + .lock() + .await + .mark_listing_validated(&listing_addr, &stale_listing_event.id.to_string()); + let latest_listing_event = + RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(30402), "listing-latest") + .custom_created_at(RadrootsNostrTimestamp::from(10_u64)) + .sign_with_keys(&seller_keys) + .expect("latest listing event"); + push_fetch_events_ok(vec![latest_listing_event]); + let stale_order = make_order( + "order-3", + &listing_addr, + &buyer_pub, + &seller_pub, + TradeOrderStatus::Requested, + ); + assert!(matches!( + handle_order_request( + &event, + stale_order, + &parsed, + Some("order-3"), + &client, + &state + ) + .await, + Err(TradeListingDvmError::ListingNotValidated) + )); + assert!(!state.lock().await.is_listing_validated(&listing_addr)); + set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; let seller_event = make_event( &seller_keys, @@ -3204,6 +3316,15 @@ mod tests { .is_ok() ); + let validated_listing_event = + RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(30402), "listing") + .custom_created_at(RadrootsNostrTimestamp::from(10_u64)) + .sign_with_keys(&seller_keys) + .expect("listing event"); + state + .lock() + .await + .mark_listing_validated(&listing_addr, &validated_listing_event.id.to_string()); let unauthorized_order = make_order( "order-3", &listing_addr, @@ -3211,6 +3332,7 @@ mod tests { &seller_pub, TradeOrderStatus::Requested, ); + push_fetch_events_ok(vec![validated_listing_event]); assert!(matches!( handle_order_request( &event, @@ -4121,6 +4243,15 @@ mod tests { "order".to_string(), Vec::new(), ); + let validated_listing_event = + RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(30402), "listing") + .custom_created_at(RadrootsNostrTimestamp::from(10_u64)) + .sign_with_keys(&seller_keys) + .expect("listing event"); + state + .lock() + .await + .mark_listing_validated(&listing_addr, &validated_listing_event.id.to_string()); let mismatch_payload = make_order( "order-2", @@ -4149,6 +4280,7 @@ mod tests { "not-seller", TradeOrderStatus::Requested, ); + push_fetch_events_ok(vec![validated_listing_event]); assert!(matches!( handle_order_request( &order_event, diff --git a/src/features/trade_listing/state.rs b/src/features/trade_listing/state.rs @@ -24,9 +24,17 @@ pub struct TradeOrderState { pub seen_event_ids: HashSet<String>, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ValidatedListingState { + pub event_id: String, +} + #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct TradeListingState { + #[serde(default)] validated_listings: HashSet<String>, + #[serde(default)] + validated_listing_events: HashMap<String, ValidatedListingState>, orders: HashMap<String, TradeOrderState>, last_event_created_at: Option<u32>, } @@ -130,12 +138,29 @@ impl TradeListingRuntime { } impl TradeListingState { - pub fn mark_listing_validated(&mut self, listing_addr: &str) { + pub fn mark_listing_validated(&mut self, listing_addr: &str, event_id: &str) { self.validated_listings.insert(listing_addr.to_string()); + self.validated_listing_events.insert( + listing_addr.to_string(), + ValidatedListingState { + event_id: event_id.to_string(), + }, + ); + } + + pub fn clear_listing_validation(&mut self, listing_addr: &str) { + self.validated_listings.remove(listing_addr); + self.validated_listing_events.remove(listing_addr); + } + + pub fn validated_listing_event_id(&self, listing_addr: &str) -> Option<&str> { + self.validated_listing_events + .get(listing_addr) + .map(|validated| validated.event_id.as_str()) } pub fn is_listing_validated(&self, listing_addr: &str) -> bool { - self.validated_listings.contains(listing_addr) + self.validated_listing_event_id(listing_addr).is_some() } pub fn order_exists(&self, order_id: &str) -> bool { @@ -275,8 +300,10 @@ mod tests { use super::{ PersistedTradeListingState, TradeListingRuntime, TradeListingRuntimeConfig, TradeListingRuntimeError, TradeListingState, TradeListingStateError, TradeOrderState, + ValidatedListingState, }; use radroots_trade::listing::order::TradeOrderStatus; + use std::collections::HashMap; fn unique_state_path(suffix: &str) -> std::path::PathBuf { let nanos = std::time::SystemTime::now() @@ -290,8 +317,12 @@ mod tests { fn state_tracks_listings_events_and_replay_anchor() { let mut state = TradeListingState::default(); assert!(!state.is_listing_validated("addr")); - state.mark_listing_validated("addr"); + state.mark_listing_validated("addr", "evt-listing-1"); assert!(state.is_listing_validated("addr")); + assert_eq!( + state.validated_listing_event_id("addr"), + Some("evt-listing-1") + ); let order = TradeOrderState { order_id: "order-1".into(), @@ -339,7 +370,10 @@ mod tests { async fn runtime_reuses_shared_trade_listing_state() { let runtime = TradeListingRuntime::new(); let state = runtime.state(); - state.lock().await.mark_listing_validated("addr"); + state + .lock() + .await + .mark_listing_validated("addr", "evt-listing-1"); assert!(runtime.state().lock().await.is_listing_validated("addr")); } @@ -359,7 +393,7 @@ mod tests { { let state_handle = runtime.state(); let mut state = state_handle.lock().await; - state.mark_listing_validated("addr"); + state.mark_listing_validated("addr", "evt-listing-1"); state.observe_event_created_at(456); } runtime.persist().await.expect("persist"); @@ -368,6 +402,10 @@ mod tests { let loaded_state_handle = loaded.state(); let loaded_state = loaded_state_handle.lock().await; assert!(loaded_state.is_listing_validated("addr")); + assert_eq!( + loaded_state.validated_listing_event_id("addr"), + Some("evt-listing-1") + ); assert_eq!(loaded_state.last_event_created_at(), Some(456)); let _ = tokio::fs::remove_file(path).await; @@ -398,4 +436,55 @@ mod tests { let _ = tokio::fs::remove_file(path).await; } + + #[tokio::test] + async fn runtime_loads_legacy_validation_state_without_trusting_it() { + let path = unique_state_path("legacy-validation"); + let payload = PersistedTradeListingState { + version: 1, + state: TradeListingState { + validated_listings: ["addr".to_string()].into_iter().collect(), + validated_listing_events: HashMap::new(), + orders: HashMap::new(), + last_event_created_at: Some(321), + }, + }; + tokio::fs::write(&path, serde_json::to_vec(&payload).expect("payload")) + .await + .expect("write"); + + let loaded = TradeListingRuntime::load(TradeListingRuntimeConfig { + state_path: path.clone(), + replay_window_secs: 600, + replay_overlap_secs: 30, + }) + .await + .expect("load"); + let loaded_state_handle = loaded.state(); + let loaded_state = loaded_state_handle.lock().await; + assert!(!loaded_state.is_listing_validated("addr")); + assert_eq!(loaded_state.validated_listing_event_id("addr"), None); + assert_eq!(loaded_state.last_event_created_at(), Some(321)); + + let _ = tokio::fs::remove_file(path).await; + } + + #[test] + fn state_can_clear_listing_validation() { + let mut state = TradeListingState { + validated_listings: ["addr".to_string()].into_iter().collect(), + validated_listing_events: HashMap::from([( + "addr".to_string(), + ValidatedListingState { + event_id: "evt-listing-1".to_string(), + }, + )]), + orders: HashMap::new(), + last_event_created_at: None, + }; + assert!(state.is_listing_validated("addr")); + state.clear_listing_validation("addr"); + assert!(!state.is_listing_validated("addr")); + assert_eq!(state.validated_listing_event_id("addr"), None); + } } diff --git a/src/features/trade_listing/subscriber.rs b/src/features/trade_listing/subscriber.rs @@ -192,6 +192,7 @@ async fn process_event_notification( client: RadrootsNostrClient, runtime: TradeListingRuntime, ) -> Result<()> { + let created_at = u32::try_from(event.created_at.as_secs()).unwrap_or(u32::MAX); if should_delay_before_event_handle() { sleep(Duration::from_millis(200)).await; } @@ -220,13 +221,12 @@ async fn process_event_notification( if let Err(err) = handle_error_io(other, &event, &client).await { warn!("trade_listing: failed to send error feedback: {err}"); } - runtime.persist().await?; + runtime.mark_processed_event(created_at).await?; } } return Ok(()); } - let created_at = u32::try_from(event.created_at.as_secs()).unwrap_or(u32::MAX); runtime.mark_processed_event(created_at).await?; Ok(()) } @@ -439,7 +439,10 @@ mod tests { let client = RadrootsNostrClient::new(keys.clone()); let runtime = shared_runtime(); let state = runtime.state(); - state.lock().await.mark_listing_validated("addr"); + state + .lock() + .await + .mark_listing_validated("addr", "evt-listing-1"); let (_tx_first, rx_first) = watch::channel(true); assert!( @@ -618,6 +621,37 @@ mod tests { } #[tokio::test] + async fn handled_domain_errors_advance_replay_anchor() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let runtime = shared_runtime(); + let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "test") + .custom_created_at(1_234_u64.into()) + .sign_with_keys(&keys) + .expect("event"); + + let mut hooks = subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + hooks.resolve_tags_results.push_back(Ok(Vec::new())); + hooks + .handle_event_results + .push_back(Err(TradeListingDvmError::InvalidOrder)); + hooks.handle_error_results.push_back(Ok(())); + drop(hooks); + + process_event_notification(event, keys, client, runtime.clone()) + .await + .expect("notification"); + + assert_eq!( + runtime.state().lock().await.last_event_created_at(), + Some(1_234) + ); + } + + #[tokio::test] async fn subscriber_process_event_feedback_error_branches_are_covered() { let _guard = test_guard(); let keys = RadrootsNostrKeys::generate(); diff --git a/src/rhi.rs b/src/rhi.rs @@ -189,7 +189,10 @@ mod tests { let rhi = Rhi::new(keys); let _ = rhi.client.clone(); let state = rhi.trade_listing_runtime.state(); - state.lock().await.mark_listing_validated("addr"); + state + .lock() + .await + .mark_listing_validated("addr", "evt-listing-1"); assert!( rhi.trade_listing_runtime .state()