rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

commit 8907b64eb82ad5ac0f85eb625914c72d6f4bc5f0
parent 9f673e9e59a27336cb98037730802e92f18a7bac
Author: triesap <137732411+triesap@users.noreply.github.com>
Date:   Sat, 12 Apr 2025 19:57:34 +0000

Adds event subscription handlers module.

Diffstat:
Asrc/events/job_request.rs | 37+++++++++++++++++++++++++++++++++++++
Asrc/events/mod.rs | 1+
Msrc/lib.rs | 1+
Msrc/main.rs | 46+++++++---------------------------------------
4 files changed, 46 insertions(+), 39 deletions(-)

diff --git a/src/events/job_request.rs b/src/events/job_request.rs @@ -0,0 +1,37 @@ +use anyhow::Result; +use nostr::{event::Kind, filter::Filter, key::Keys, types::Timestamp}; +use nostr_sdk::Client; +use nostr_sdk::RelayPoolNotification; +use tracing::info; + +use crate::KIND_JOB_REQUEST; + +pub async fn subscriber(keys: Keys, relays: Vec<String>) -> Result<()> { + info!("Starting subscriber for kind {}", KIND_JOB_REQUEST); + let client = Client::new(keys.clone()); + + for relay in &relays { + client.add_relay(relay).await?; + } + + let filter = Filter::new() + .kind(Kind::Custom(KIND_JOB_REQUEST)) + .since(Timestamp::now()); + + client.connect().await; + client.subscribe(filter, None).await?; + + let mut notifications = client.notifications(); + + while let Ok(n) = notifications.recv().await { + if let RelayPoolNotification::Event { event, .. } = n { + if event.kind == Kind::Custom(KIND_JOB_REQUEST) { + info!("Receieved job request event: {:?}", { event.clone() }); + } + } + } + + client.disconnect().await; + + Ok(()) +} diff --git a/src/events/mod.rs b/src/events/mod.rs @@ -0,0 +1 @@ +pub mod job_request; diff --git a/src/lib.rs b/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; +pub mod events; pub mod keys; pub const KIND_JOB_REQUEST: u16 = 5300; diff --git a/src/main.rs b/src/main.rs @@ -1,8 +1,8 @@ use anyhow::Result; use clap::Parser; -use nostr::{Filter, Keys, Kind, Timestamp, event::Event}; -use nostr_sdk::{Client, RelayPoolNotification}; -use rhi::{KIND_JOB_REQUEST, config::Settings, keys::KeyProfile}; +use nostr::event::Event; +use nostr_sdk::Client; +use rhi::{config::Settings, events, keys::KeyProfile}; use tokio::signal::unix::{SignalKind, signal}; use tracing::{error, info}; @@ -10,40 +10,6 @@ fn init_tracing() { tracing_subscriber::fmt::init(); } -async fn subscribe(keys: Keys, relays: Vec<String>) -> Result<()> { - let client = Client::new(keys); - for relay in relays.iter() { - client.add_relay(relay).await?; - } - client.connect().await; - - let filter = Filter::new() - .kind(Kind::Custom(KIND_JOB_REQUEST)) - .since(Timestamp::now()); - - client.subscribe(filter, None).await?; - - info!("Subscription started for kind {}", { - KIND_JOB_REQUEST.to_string() - }); - - let mut notifications = client.notifications(); - - while let Ok(notification) = notifications.recv().await { - match notification { - RelayPoolNotification::Event { event, .. } => { - info!("Event received {:?}", { event.clone() }); - } - RelayPoolNotification::Message { .. } => {} - RelayPoolNotification::Shutdown => {} - } - } - - client.disconnect().await; - - Ok(()) -} - #[derive(Parser)] #[command( about = env!("CARGO_PKG_DESCRIPTION"), @@ -124,8 +90,10 @@ async fn main() -> Result<()> { tokio::spawn(async move { loop { - if let Err(e) = subscribe(keys_sub.clone(), relays_sub.clone()).await { - error!("Error on subscription: {e}"); + if let Err(e) = + events::job_request::subscriber(keys_sub.clone(), relays_sub.clone()).await + { + error!("Error on job request subscription: {e}"); } } });