commit e98b8e9182bbd976a702be02179068df884c691d
parent af77cb3154cb018839b0ddf6ea6f580af1f98f81
Author: triesap <tyson@radroots.org>
Date: Fri, 27 Mar 2026 23:22:38 +0000
trade: persist workflow state and replay history
- add durable trade listing runtime state config and snapshot storage
- load persisted workflow state before subscriber startup in run_rhi
- recover subscriptions from replay anchors instead of new-only filters
- document the state path config and cover persistence with tests
Diffstat:
7 files changed, 365 insertions(+), 56 deletions(-)
diff --git a/config.toml b/config.toml
@@ -22,3 +22,8 @@ base_ms = 500
max_ms = 30000
factor = 2
jitter_ms = 0
+
+[config.subscriber.state]
+path = "state/trade-listing-state.json"
+replay_window_secs = 86400
+replay_overlap_secs = 300
diff --git a/src/config.rs b/src/config.rs
@@ -1,6 +1,7 @@
use radroots_nostr::prelude::RadrootsNostrMetadata;
use radroots_runtime::{BackoffConfig, RadrootsNostrServiceConfig};
use serde::{Deserialize, Serialize};
+use std::path::PathBuf;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Configuration {
@@ -14,6 +15,25 @@ pub struct Configuration {
pub struct SubscriberConfig {
#[serde(default)]
pub backoff: BackoffConfig,
+ #[serde(default)]
+ pub state: SubscriberStateConfig,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SubscriberStateConfig {
+ pub path: PathBuf,
+ pub replay_window_secs: u64,
+ pub replay_overlap_secs: u64,
+}
+
+impl Default for SubscriberStateConfig {
+ fn default() -> Self {
+ Self {
+ path: PathBuf::from("state/trade-listing-state.json"),
+ replay_window_secs: 24 * 60 * 60,
+ replay_overlap_secs: 5 * 60,
+ }
+ }
}
#[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/src/features/trade_listing/state.rs b/src/features/trade_listing/state.rs
@@ -1,14 +1,20 @@
#![forbid(unsafe_code)]
use std::collections::{HashMap, HashSet};
+use std::path::{Path, PathBuf};
use std::sync::Arc;
+use radroots_nostr::prelude::{RadrootsNostrFilter, RadrootsNostrKind, RadrootsNostrTimestamp};
use radroots_trade::listing::order::TradeOrderStatus;
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
use tokio::sync::Mutex;
pub type SharedTradeListingState = Arc<Mutex<TradeListingState>>;
-#[derive(Clone, Debug)]
+const TRADE_LISTING_STATE_VERSION: u32 = 1;
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TradeOrderState {
pub order_id: String,
pub listing_addr: String,
@@ -18,21 +24,54 @@ pub struct TradeOrderState {
pub seen_event_ids: HashSet<String>,
}
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct TradeListingState {
validated_listings: HashSet<String>,
orders: HashMap<String, TradeOrderState>,
+ last_event_created_at: Option<u32>,
}
#[derive(Clone, Debug)]
pub struct TradeListingRuntime {
state: SharedTradeListingState,
+ config: TradeListingRuntimeConfig,
+ persistence: Option<Arc<TradeListingStatePersistence>>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct TradeListingRuntimeConfig {
+ pub state_path: PathBuf,
+ pub replay_window_secs: u64,
+ pub replay_overlap_secs: u64,
+}
+
+#[derive(Clone, Debug)]
+struct TradeListingStatePersistence {
+ path: PathBuf,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct PersistedTradeListingState {
+ version: u32,
+ state: TradeListingState,
+}
+
+impl Default for TradeListingRuntimeConfig {
+ fn default() -> Self {
+ Self {
+ state_path: PathBuf::from("state/trade-listing-state.json"),
+ replay_window_secs: 24 * 60 * 60,
+ replay_overlap_secs: 5 * 60,
+ }
+ }
}
impl Default for TradeListingRuntime {
fn default() -> Self {
Self {
state: Arc::new(Mutex::new(TradeListingState::default())),
+ config: TradeListingRuntimeConfig::default(),
+ persistence: None,
}
}
}
@@ -42,9 +81,52 @@ impl TradeListingRuntime {
Self::default()
}
+ pub async fn load(config: TradeListingRuntimeConfig) -> Result<Self, TradeListingRuntimeError> {
+ let persistence = Arc::new(TradeListingStatePersistence::new(config.state_path.clone()));
+ let state = persistence.load().await?;
+ Ok(Self {
+ state: Arc::new(Mutex::new(state)),
+ config,
+ persistence: Some(persistence),
+ })
+ }
+
pub fn state(&self) -> SharedTradeListingState {
Arc::clone(&self.state)
}
+
+ pub async fn persist(&self) -> Result<(), TradeListingRuntimeError> {
+ let Some(persistence) = &self.persistence else {
+ return Ok(());
+ };
+ let snapshot = self.state.lock().await.clone();
+ persistence.persist(&snapshot).await
+ }
+
+ pub async fn mark_processed_event(
+ &self,
+ created_at: u32,
+ ) -> Result<(), TradeListingRuntimeError> {
+ {
+ let mut state = self.state.lock().await;
+ state.observe_event_created_at(created_at);
+ }
+ self.persist().await
+ }
+
+ pub async fn recovery_filter(&self, kinds: Vec<RadrootsNostrKind>) -> RadrootsNostrFilter {
+ let since = {
+ let state = self.state.lock().await;
+ state.replay_since(
+ RadrootsNostrTimestamp::now().as_secs(),
+ self.config.replay_window_secs,
+ self.config.replay_overlap_secs,
+ )
+ };
+ RadrootsNostrFilter::new()
+ .kinds(kinds)
+ .since(RadrootsNostrTimestamp::from(since))
+ }
}
impl TradeListingState {
@@ -82,6 +164,75 @@ impl TradeListingState {
.map(|state| state.seen_event_ids.contains(event_id))
.unwrap_or(false)
}
+
+ pub fn observe_event_created_at(&mut self, created_at: u32) {
+ self.last_event_created_at = Some(
+ self.last_event_created_at
+ .map_or(created_at, |current| current.max(created_at)),
+ );
+ }
+
+ pub fn last_event_created_at(&self) -> Option<u32> {
+ self.last_event_created_at
+ }
+
+ pub fn replay_since(
+ &self,
+ now_secs: u64,
+ replay_window_secs: u64,
+ replay_overlap_secs: u64,
+ ) -> u64 {
+ match self.last_event_created_at {
+ Some(last) => u64::from(last).saturating_sub(replay_overlap_secs),
+ None => now_secs.saturating_sub(replay_window_secs),
+ }
+ }
+}
+
+impl TradeListingStatePersistence {
+ fn new(path: PathBuf) -> Self {
+ Self { path }
+ }
+
+ async fn load(&self) -> Result<TradeListingState, TradeListingRuntimeError> {
+ if !tokio::fs::try_exists(&self.path).await? {
+ return Ok(TradeListingState::default());
+ }
+
+ let payload = tokio::fs::read_to_string(&self.path).await?;
+ let snapshot: PersistedTradeListingState = serde_json::from_str(&payload)?;
+ if snapshot.version != TRADE_LISTING_STATE_VERSION {
+ return Err(TradeListingRuntimeError::UnsupportedStateVersion(
+ snapshot.version,
+ ));
+ }
+ Ok(snapshot.state)
+ }
+
+ async fn persist(&self, state: &TradeListingState) -> Result<(), TradeListingRuntimeError> {
+ if let Some(parent) = self.path.parent() {
+ if !parent.as_os_str().is_empty() {
+ tokio::fs::create_dir_all(parent).await?;
+ }
+ }
+
+ let snapshot = PersistedTradeListingState {
+ version: TRADE_LISTING_STATE_VERSION,
+ state: state.clone(),
+ };
+ let payload = serde_json::to_vec_pretty(&snapshot)?;
+ let temp_path = temp_state_path(&self.path)?;
+ tokio::fs::write(&temp_path, payload).await?;
+ tokio::fs::rename(&temp_path, &self.path).await?;
+ Ok(())
+ }
+}
+
+fn temp_state_path(path: &Path) -> Result<PathBuf, TradeListingRuntimeError> {
+ let file_name = path
+ .file_name()
+ .ok_or_else(|| TradeListingRuntimeError::InvalidStatePath(path.to_path_buf()))?;
+ Ok(path.with_file_name(format!("{}.tmp", file_name.to_string_lossy())))
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -106,14 +257,37 @@ impl core::fmt::Display for TradeListingStateError {
impl std::error::Error for TradeListingStateError {}
+#[derive(Debug, Error)]
+pub enum TradeListingRuntimeError {
+ #[error("invalid trade listing state path: {0}")]
+ InvalidStatePath(PathBuf),
+ #[error("unsupported trade listing state version: {0}")]
+ UnsupportedStateVersion(u32),
+ #[error("trade listing state io error: {0}")]
+ Io(#[from] std::io::Error),
+ #[error("trade listing state json error: {0}")]
+ Json(#[from] serde_json::Error),
+}
+
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
- use super::{TradeListingRuntime, TradeListingState, TradeListingStateError, TradeOrderState};
+ use super::{
+ PersistedTradeListingState, TradeListingRuntime, TradeListingRuntimeConfig,
+ TradeListingRuntimeError, TradeListingState, TradeListingStateError, TradeOrderState,
+ };
use radroots_trade::listing::order::TradeOrderStatus;
+ fn unique_state_path(suffix: &str) -> std::path::PathBuf {
+ let nanos = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .expect("time")
+ .as_nanos();
+ std::env::temp_dir().join(format!("rhi-trade-state-{suffix}-{nanos}.json"))
+ }
+
#[test]
- fn state_tracks_listings_and_events() {
+ fn state_tracks_listings_events_and_replay_anchor() {
let mut state = TradeListingState::default();
assert!(!state.is_listing_validated("addr"));
state.mark_listing_validated("addr");
@@ -131,6 +305,11 @@ mod tests {
assert!(!state.is_event_seen("order-1", "evt"));
assert!(state.mark_event_seen("order-1", "evt"));
assert!(state.is_event_seen("order-1", "evt"));
+ assert_eq!(state.replay_since(1_000, 300, 60), 700);
+
+ state.observe_event_created_at(900);
+ assert_eq!(state.last_event_created_at(), Some(900));
+ assert_eq!(state.replay_since(1_000, 300, 60), 840);
}
#[test]
@@ -164,4 +343,59 @@ mod tests {
assert!(runtime.state().lock().await.is_listing_validated("addr"));
}
+
+ #[tokio::test]
+ async fn runtime_persists_and_loads_trade_listing_state() {
+ let path = unique_state_path("roundtrip");
+ let config = TradeListingRuntimeConfig {
+ state_path: path.clone(),
+ replay_window_secs: 600,
+ replay_overlap_secs: 30,
+ };
+ let runtime = TradeListingRuntime::load(config.clone())
+ .await
+ .expect("runtime");
+
+ {
+ let state_handle = runtime.state();
+ let mut state = state_handle.lock().await;
+ state.mark_listing_validated("addr");
+ state.observe_event_created_at(456);
+ }
+ runtime.persist().await.expect("persist");
+
+ let loaded = TradeListingRuntime::load(config).await.expect("load");
+ let loaded_state_handle = loaded.state();
+ let loaded_state = loaded_state_handle.lock().await;
+ assert!(loaded_state.is_listing_validated("addr"));
+ assert_eq!(loaded_state.last_event_created_at(), Some(456));
+
+ let _ = tokio::fs::remove_file(path).await;
+ }
+
+ #[tokio::test]
+ async fn runtime_load_rejects_unsupported_snapshot_version() {
+ let path = unique_state_path("version");
+ let payload = PersistedTradeListingState {
+ version: 99,
+ state: TradeListingState::default(),
+ };
+ tokio::fs::write(&path, serde_json::to_vec(&payload).expect("payload"))
+ .await
+ .expect("write");
+
+ let err = TradeListingRuntime::load(TradeListingRuntimeConfig {
+ state_path: path.clone(),
+ replay_window_secs: 600,
+ replay_overlap_secs: 30,
+ })
+ .await
+ .expect_err("unsupported snapshot should fail");
+ assert!(matches!(
+ err,
+ TradeListingRuntimeError::UnsupportedStateVersion(99)
+ ));
+
+ let _ = tokio::fs::remove_file(path).await;
+ }
}
diff --git a/src/features/trade_listing/subscriber.rs b/src/features/trade_listing/subscriber.rs
@@ -7,7 +7,7 @@ use anyhow::{Result, anyhow};
use radroots_nostr::prelude::{
RadrootsNostrClient, RadrootsNostrEvent, RadrootsNostrFilter, RadrootsNostrKeys,
RadrootsNostrKind, RadrootsNostrRelayPoolNotification, RadrootsNostrTag,
- radroots_nostr_filter_new_events, radroots_nostr_tags_resolve,
+ radroots_nostr_tags_resolve,
};
use tokio::sync::watch;
use tokio::time::sleep;
@@ -17,7 +17,7 @@ use radroots_trade::listing::kinds::TRADE_LISTING_KINDS;
use crate::features::trade_listing::{
handlers::dvm::{TradeListingDvmError, handle_error, handle_event},
- state::SharedTradeListingState,
+ state::{SharedTradeListingState, TradeListingRuntime},
};
#[cfg(test)]
@@ -190,8 +190,8 @@ async fn process_event_notification(
event: RadrootsNostrEvent,
keys: RadrootsNostrKeys,
client: RadrootsNostrClient,
- state: SharedTradeListingState,
-) {
+ runtime: TradeListingRuntime,
+) -> Result<()> {
if should_delay_before_event_handle() {
sleep(Duration::from_millis(200)).await;
}
@@ -200,12 +200,19 @@ async fn process_event_notification(
Ok(tags) => tags,
Err(err) => {
warn!("trade_listing: failed to resolve tags: {err}");
- return;
+ return Ok(());
}
};
- if let Err(err) =
- handle_event_io(event.clone(), resolved_tags, keys, client.clone(), state).await
+ let state = runtime.state();
+ if let Err(err) = handle_event_io(
+ event.clone(),
+ resolved_tags,
+ keys,
+ client.clone(),
+ state.clone(),
+ )
+ .await
{
match err {
TradeListingDvmError::MissingRecipient | TradeListingDvmError::UnsupportedKind => {}
@@ -213,24 +220,30 @@ async fn process_event_notification(
if let Err(err) = handle_error_io(other, &event, &client).await {
warn!("trade_listing: failed to send error feedback: {err}");
}
+ runtime.persist().await?;
}
}
+ return Ok(());
}
+
+ let created_at = u32::try_from(event.created_at.as_secs()).unwrap_or(u32::MAX);
+ runtime.mark_processed_event(created_at).await?;
+ Ok(())
}
async fn dispatch_event_processing(
event: RadrootsNostrEvent,
keys: RadrootsNostrKeys,
client: RadrootsNostrClient,
- state: SharedTradeListingState,
-) {
- process_event_notification(event, keys, client, state).await;
+ runtime: TradeListingRuntime,
+) -> Result<()> {
+ process_event_notification(event, keys, client, runtime).await
}
pub async fn subscriber(
client: RadrootsNostrClient,
keys: RadrootsNostrKeys,
- state: SharedTradeListingState,
+ runtime: TradeListingRuntime,
mut stop_rx: watch::Receiver<bool>,
) -> Result<()> {
info!(
@@ -242,7 +255,7 @@ pub async fn subscriber(
.iter()
.map(|kind| RadrootsNostrKind::Custom(*kind))
.collect();
- let filter = radroots_nostr_filter_new_events(RadrootsNostrFilter::new().kinds(kinds));
+ let filter: RadrootsNostrFilter = runtime.recovery_filter(kinds).await;
if *stop_rx.borrow() {
return Ok(());
@@ -277,8 +290,8 @@ pub async fn subscriber(
let event = (*event).clone();
let keys = keys.clone();
let client = client.clone();
- let state = state.clone();
- dispatch_event_processing(event, keys, client, state).await;
+ let runtime = runtime.clone();
+ dispatch_event_processing(event, keys, client, runtime).await?;
}
}
}
@@ -299,14 +312,14 @@ mod tests {
process_event_notification, resolve_tags_io, subscriber, subscriber_test_hooks,
};
use crate::features::trade_listing::handlers::dvm::TradeListingDvmError;
- use crate::features::trade_listing::state::{SharedTradeListingState, TradeListingState};
+ use crate::features::trade_listing::state::TradeListingRuntime;
use radroots_nostr::error::RadrootsNostrTagsResolveError;
use radroots_nostr::prelude::{
RadrootsNostrClient, RadrootsNostrEventBuilder, RadrootsNostrKeys, RadrootsNostrKind,
RadrootsNostrRelayPoolNotification, RadrootsNostrRelayUrl, RadrootsNostrSubscriptionId,
RadrootsNostrTag,
};
- use std::sync::{Arc, Mutex, MutexGuard};
+ use std::sync::{Mutex, MutexGuard};
use tokio::sync::watch;
static TEST_LOCK: Mutex<()> = Mutex::new(());
@@ -336,8 +349,8 @@ mod tests {
RadrootsNostrRelayPoolNotification::Shutdown
}
- fn shared_state() -> SharedTradeListingState {
- Arc::new(tokio::sync::Mutex::new(TradeListingState::default()))
+ fn shared_runtime() -> TradeListingRuntime {
+ TradeListingRuntime::new()
}
#[test]
@@ -367,7 +380,8 @@ mod tests {
.push_back(Ok(Vec::<RadrootsNostrTag>::new()));
assert!(resolve_tags_io(&event, &keys).is_ok());
- let state = shared_state();
+ let runtime = shared_runtime();
+ let state = runtime.state();
assert!(matches!(
handle_event_io(
event.clone(),
@@ -415,7 +429,7 @@ mod tests {
let keys = RadrootsNostrKeys::generate();
let client = RadrootsNostrClient::new(keys.clone());
let (_tx, rx) = watch::channel(true);
- assert!(subscriber(client, keys, shared_state(), rx).await.is_ok());
+ assert!(subscriber(client, keys, shared_runtime(), rx).await.is_ok());
}
#[tokio::test]
@@ -423,12 +437,13 @@ mod tests {
let _guard = test_guard();
let keys = RadrootsNostrKeys::generate();
let client = RadrootsNostrClient::new(keys.clone());
- let state = shared_state();
+ let runtime = shared_runtime();
+ let state = runtime.state();
state.lock().await.mark_listing_validated("addr");
let (_tx_first, rx_first) = watch::channel(true);
assert!(
- subscriber(client.clone(), keys.clone(), state.clone(), rx_first)
+ subscriber(client.clone(), keys.clone(), runtime.clone(), rx_first)
.await
.is_ok()
);
@@ -436,7 +451,7 @@ mod tests {
let (_tx_second, rx_second) = watch::channel(true);
assert!(
- subscriber(client, keys, state.clone(), rx_second)
+ subscriber(client, keys, runtime.clone(), rx_second)
.await
.is_ok()
);
@@ -449,7 +464,7 @@ mod tests {
let keys = RadrootsNostrKeys::generate();
let client = RadrootsNostrClient::new(keys.clone());
let (_tx, rx) = watch::channel(false);
- let err = subscriber(client, keys, shared_state(), rx)
+ let err = subscriber(client, keys, shared_runtime(), rx)
.await
.expect_err("expected relay error");
let msg = format!("{err:#}");
@@ -463,7 +478,7 @@ mod tests {
let client = RadrootsNostrClient::new(keys.clone());
let _ = client.add_relay("wss://relay.example.com").await;
let (tx, rx) = watch::channel(false);
- let join = tokio::spawn(subscriber(client, keys, shared_state(), rx));
+ let join = tokio::spawn(subscriber(client, keys, shared_runtime(), rx));
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
let _ = tx.send(true);
let _ = join.await;
@@ -481,7 +496,7 @@ mod tests {
.notifications
.push_back(Err(()));
let (_tx, rx) = watch::channel(false);
- let err = subscriber(client, keys, shared_state(), rx)
+ let err = subscriber(client, keys, shared_runtime(), rx)
.await
.expect_err("closed notifications");
let msg = format!("{err:#}");
@@ -502,7 +517,7 @@ mod tests {
.push_back(Ok(scripted_shutdown_notification()));
let (tx, rx) = watch::channel(false);
- let join = tokio::spawn(subscriber(client, keys, shared_state(), rx));
+ let join = tokio::spawn(subscriber(client, keys, shared_runtime(), rx));
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
let _ = tx.send(true);
let result = join.await.expect("subscriber join");
@@ -528,7 +543,7 @@ mod tests {
"resolve-failed".to_string(),
)));
let (tx, rx) = watch::channel(false);
- let join = tokio::spawn(subscriber(client, keys, shared_state(), rx));
+ let join = tokio::spawn(subscriber(client, keys, shared_runtime(), rx));
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
let _ = tx.send(true);
let _ = join.await;
@@ -564,7 +579,7 @@ mod tests {
drop(hooks);
let (tx, rx) = watch::channel(false);
- let join = tokio::spawn(subscriber(client, keys, shared_state(), rx));
+ let join = tokio::spawn(subscriber(client, keys, shared_runtime(), rx));
tokio::time::sleep(std::time::Duration::from_millis(40)).await;
let _ = tx.send(true);
let _ = join.await;
@@ -595,7 +610,7 @@ mod tests {
drop(hooks);
let (_tx, rx) = watch::channel(false);
- let err = subscriber(client, keys, shared_state(), rx)
+ let err = subscriber(client, keys, shared_runtime(), rx)
.await
.expect_err("notifications closed");
let msg = format!("{err:#}");
@@ -610,7 +625,7 @@ mod tests {
let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "event")
.sign_with_keys(&keys)
.expect("event");
- let state = Arc::new(tokio::sync::Mutex::new(TradeListingState::default()));
+ let runtime = shared_runtime();
let mut hooks = subscriber_test_hooks()
.lock()
@@ -624,7 +639,9 @@ mod tests {
.push_back(Err(TradeListingDvmError::InvalidOrder));
drop(hooks);
- process_event_notification(event, keys, client, state).await;
+ process_event_notification(event, keys, client, runtime)
+ .await
+ .expect("processing");
}
#[tokio::test]
@@ -638,7 +655,7 @@ mod tests {
let event_err = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "err")
.sign_with_keys(&keys)
.expect("event err");
- let state = shared_state();
+ let runtime = shared_runtime();
let mut hooks = subscriber_test_hooks()
.lock()
@@ -652,7 +669,11 @@ mod tests {
hooks.handle_error_results.push_back(Ok(()));
drop(hooks);
- process_event_notification(event_ok, keys.clone(), client.clone(), state.clone()).await;
- process_event_notification(event_err, keys, client, state).await;
+ process_event_notification(event_ok, keys.clone(), client.clone(), runtime.clone())
+ .await
+ .expect("ok path");
+ process_event_notification(event_err, keys, client, runtime)
+ .await
+ .expect("error path");
}
}
diff --git a/src/lib.rs b/src/lib.rs
@@ -11,6 +11,7 @@ pub use cli::Args as cli_args;
use anyhow::Result;
use std::time::Duration;
+use crate::features::trade_listing::state::{TradeListingRuntime, TradeListingRuntimeConfig};
use crate::rhi::{Rhi, start_subscriber};
use radroots_identity::RadrootsIdentity;
use radroots_nostr::prelude::{
@@ -110,8 +111,14 @@ pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()>
args.service.allow_generate_identity,
)?;
let keys = identity.keys().clone();
-
- let rhi = Rhi::new(keys.clone());
+ let trade_listing_runtime = TradeListingRuntime::load(TradeListingRuntimeConfig {
+ state_path: settings.config.subscriber.state.path.clone(),
+ replay_window_secs: settings.config.subscriber.state.replay_window_secs,
+ replay_overlap_secs: settings.config.subscriber.state.replay_overlap_secs,
+ })
+ .await?;
+
+ let rhi = Rhi::with_trade_listing_runtime(keys.clone(), trade_listing_runtime);
let client = rhi.client.clone();
let service_cfg = settings.config.service.clone();
let relays = service_cfg.relays.clone();
@@ -150,7 +157,7 @@ pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()>
let handle = start_subscriber(
client.clone(),
keys.clone(),
- rhi.trade_listing_runtime.state(),
+ rhi.trade_listing_runtime.clone(),
settings.config.subscriber.backoff.clone(),
)
.await;
@@ -222,6 +229,10 @@ mod tests {
factor: 1,
jitter_ms: 0,
},
+ state: config::SubscriberStateConfig {
+ path: unique_state_path("settings"),
+ ..Default::default()
+ },
},
},
}
@@ -245,6 +256,14 @@ mod tests {
std::env::temp_dir().join(format!("rhi-{suffix}-{nanos}.json"))
}
+ fn unique_state_path(suffix: &str) -> PathBuf {
+ let nanos = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .expect("time")
+ .as_nanos();
+ std::env::temp_dir().join(format!("rhi-state-{suffix}-{nanos}.json"))
+ }
+
#[tokio::test]
async fn run_rhi_completes_with_auto_stop_and_empty_relays() {
let _guard = test_guard();
diff --git a/src/main.rs b/src/main.rs
@@ -162,7 +162,7 @@ mod tests {
let handle = rhi::rhi::start_subscriber(
client,
keys,
- TradeListingRuntime::new().state(),
+ TradeListingRuntime::new(),
radroots_runtime::BackoffConfig {
base_ms: 1,
max_ms: 2,
diff --git a/src/rhi.rs b/src/rhi.rs
@@ -5,7 +5,7 @@ use std::time::{Duration, Instant};
use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys};
use radroots_runtime::{Backoff, BackoffConfig};
-use crate::features::trade_listing::state::{SharedTradeListingState, TradeListingRuntime};
+use crate::features::trade_listing::state::TradeListingRuntime;
#[cfg(not(test))]
fn connection_wait_timeout() -> Duration {
@@ -31,7 +31,7 @@ fn subscriber_result_hook()
async fn run_subscriber_once(
client: RadrootsNostrClient,
keys: RadrootsNostrKeys,
- state: SharedTradeListingState,
+ runtime: TradeListingRuntime,
stop_rx: tokio::sync::watch::Receiver<bool>,
) -> Result<(), anyhow::Error> {
#[cfg(test)]
@@ -43,7 +43,7 @@ async fn run_subscriber_once(
return result;
}
- crate::features::trade_listing::subscriber::subscriber(client, keys, state, stop_rx).await
+ crate::features::trade_listing::subscriber::subscriber(client, keys, runtime, stop_rx).await
}
async fn wait_for_connection_or_stop(
@@ -67,11 +67,18 @@ pub struct Rhi {
impl Rhi {
pub fn new(keys: RadrootsNostrKeys) -> Self {
+ Self::with_trade_listing_runtime(keys, TradeListingRuntime::new())
+ }
+
+ pub fn with_trade_listing_runtime(
+ keys: RadrootsNostrKeys,
+ trade_listing_runtime: TradeListingRuntime,
+ ) -> Self {
let client = RadrootsNostrClient::new(keys);
Self {
_started: Instant::now(),
client,
- trade_listing_runtime: TradeListingRuntime::new(),
+ trade_listing_runtime,
}
}
}
@@ -110,7 +117,7 @@ impl RhiHandle {
pub async fn start_subscriber(
client: RadrootsNostrClient,
keys: RadrootsNostrKeys,
- state: SharedTradeListingState,
+ runtime: TradeListingRuntime,
backoff_cfg: BackoffConfig,
) -> RhiHandle {
let (stop_tx, mut stop_rx) = tokio::sync::watch::channel(false);
@@ -127,9 +134,13 @@ pub async fn start_subscriber(
break;
}
- let res =
- run_subscriber_once(client.clone(), keys.clone(), state.clone(), stop_rx.clone())
- .await;
+ let res = run_subscriber_once(
+ client.clone(),
+ keys.clone(),
+ runtime.clone(),
+ stop_rx.clone(),
+ )
+ .await;
let failed = res.is_err();
@@ -216,7 +227,7 @@ mod tests {
let handle_err = start_subscriber(
client_err,
keys.clone(),
- TradeListingRuntime::new().state(),
+ TradeListingRuntime::new(),
cfg.clone(),
)
.await;
@@ -230,8 +241,7 @@ mod tests {
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push_back(Ok(()));
- let handle_ok =
- start_subscriber(client_ok, keys, TradeListingRuntime::new().state(), cfg).await;
+ let handle_ok = start_subscriber(client_ok, keys, TradeListingRuntime::new(), cfg).await;
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
handle_ok.stop();
handle_ok.stopped().await;
@@ -244,7 +254,7 @@ mod tests {
let handle = start_subscriber(
client,
keys,
- TradeListingRuntime::new().state(),
+ TradeListingRuntime::new(),
BackoffConfig {
base_ms: 25,
max_ms: 50,
@@ -270,7 +280,7 @@ mod tests {
let handle = start_subscriber(
client,
keys,
- TradeListingRuntime::new().state(),
+ TradeListingRuntime::new(),
BackoffConfig {
base_ms: 200,
max_ms: 200,