rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

commit 5e246db9e4cc1de1aaffa8847bcbfdaa670de045
parent e8fa6be33f52d8e5167d0c6241399045d5ee03c3
Author: triesap <137732411+triesap@users.noreply.github.com>
Date:   Wed, 20 Aug 2025 15:38:26 -0700

Edit `rhi` migrating to `core`, `events`, `events-codec` and `trade` crates, add `nostr` adapters to implement event traits required by job codec, add/edit nostr utils, update directory tree, update subscriber with feature-scoped `trade_listing` job event handlers.

Diffstat:
M.gitignore | 40+++++++++++++++++++++++++++++++---------
MCargo.lock | 206++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
MCargo.toml | 2+-
Mconfig.toml | 19+++++++++++++++++++
Mcrates/rhi/Cargo.toml | 13+++++++++----
Acrates/rhi/src/adapters/mod.rs | 1+
Acrates/rhi/src/adapters/nostr/event.rs | 78++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/rhi/src/adapters/nostr/mod.rs | 1+
Dcrates/rhi/src/config.rs | 54------------------------------------------------------
Acrates/rhi/src/config/mod.rs | 54++++++++++++++++++++++++++++++++++++++++++++++++++++++
Dcrates/rhi/src/events/job_request.rs | 339-------------------------------------------------------------------------------
Dcrates/rhi/src/events/mod.rs | 1-
Acrates/rhi/src/features/trade_listing/domain/mod.rs | 1+
Acrates/rhi/src/features/trade_listing/domain/pricing.rs | 97+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/rhi/src/features/trade_listing/handlers/accept.rs | 142+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/rhi/src/features/trade_listing/handlers/conveyance.rs | 120+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/rhi/src/features/trade_listing/handlers/fulfillment.rs | 125+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/rhi/src/features/trade_listing/handlers/invoice.rs | 165+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/rhi/src/features/trade_listing/handlers/mod.rs | 7+++++++
Acrates/rhi/src/features/trade_listing/handlers/order.rs | 106+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/rhi/src/features/trade_listing/handlers/payment.rs | 116+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/rhi/src/features/trade_listing/handlers/receipt.rs | 118+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/rhi/src/features/trade_listing/mod.rs | 3+++
Acrates/rhi/src/features/trade_listing/subscriber.rs | 401+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Dcrates/rhi/src/handlers/job_request_order.rs | 72------------------------------------------------------------------------
Dcrates/rhi/src/handlers/job_request_preview.rs | 18------------------
Dcrates/rhi/src/handlers/job_request_quote.rs | 18------------------
Dcrates/rhi/src/handlers/mod.rs | 3---
Acrates/rhi/src/identity/keys.rs | 204+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/rhi/src/infra/mod.rs | 2++
Acrates/rhi/src/infra/nostr.rs | 224+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/rhi/src/infra/telemetry.rs | 23+++++++++++++++++++++++
Dcrates/rhi/src/keys.rs | 200-------------------------------------------------------------------------------
Mcrates/rhi/src/lib.rs | 104+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
Mcrates/rhi/src/main.rs | 112++++++++-----------------------------------------------------------------------
Dcrates/rhi/src/models/event_classified.rs | 461-------------------------------------------------------------------------------
Dcrates/rhi/src/models/mod.rs | 1-
Dcrates/rhi/src/utils/mod.rs | 3---
Dcrates/rhi/src/utils/nostr.rs | 206-------------------------------------------------------------------------------
Dcrates/rhi/src/utils/price.rs | 15---------------
Dcrates/rhi/src/utils/unit.rs | 78------------------------------------------------------------------------------
41 files changed, 2337 insertions(+), 1616 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -1,17 +1,39 @@ /target -justfile -notes*.txt -.tmp* -.vscode -git-diff.txt +# Debug +logs/ + +# Local env files .env .env.* !.env.example !.env.test +# OS .DS_Store +Thumbs.db + +# secrets +*.pem +*.crt +*.key *.pem -*keys*.json -*rhi*.toml -dev*.py -\ No newline at end of file + +# local +.tmp* +.archive* +.dev* +.local* +.vscode +notes*.txt +notes*.md +notes*.json +tree*.txt +git-diff*.txt +prompt*.txt +tree*.txt +justfile + +# dev +*dev*.toml +*.dev*.json +\ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock @@ -489,6 +489,21 @@ dependencies = [ ] [[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] name = "crunchy" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -512,6 +527,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" [[package]] +name = "deranged" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +dependencies = [ + "powerfmt", +] + +[[package]] name = "digest" version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1083,9 +1107,18 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" [[package]] name = "lru" -version = "0.13.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "227748d55f2f0ab4735d87fd623798cb6b664512fe979705f829c9f81c934465" +checksum = "86ea4e65087ff52f3862caff188d489f1fab49a0cb09e01b2e3f1a617b10aaed" + +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] [[package]] name = "memchr" @@ -1115,21 +1148,15 @@ dependencies = [ [[package]] name = "negentropy" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e664971378a3987224f7a0e10059782035e89899ae403718ee07de85bec42afe" - -[[package]] -name = "negentropy" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0efe882e02d206d8d279c20eb40e03baf7cb5136a1476dc084a324fbc3ec42d" [[package]] name = "nostr" -version = "0.40.0" +version = "0.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f900ddcdc28395759fcd44b18a03255e7deee8858551bfe5d5d5a07311d82ea" +checksum = "f30e6dcb36d88017587b0b5578d1ed3398afe8e4f45fdb910e48b8675aaf6f68" dependencies = [ "aes", "base64 0.22.1", @@ -1141,7 +1168,6 @@ dependencies = [ "chacha20poly1305", "getrandom 0.2.15", "instant", - "regex", "scrypt", "secp256k1", "serde", @@ -1152,9 +1178,9 @@ dependencies = [ [[package]] name = "nostr-database" -version = "0.40.0" +version = "0.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "714512e4653f4e7c4f4abb50a0ac82257541b22087dee780b9e3d787276e88d4" +checksum = "b1c75a8c2175d2785ba73cfddef21d1e30da5fbbdf158569b6808ba44973a15b" dependencies = [ "lru", "nostr", @@ -1163,16 +1189,15 @@ dependencies = [ [[package]] name = "nostr-relay-pool" -version = "0.40.1" +version = "0.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bde07a729e0a1b306c9a07da81a0d1d55d0487316017090906f3b6660741b8d" +checksum = "265d9b44771ed15db93b183a0c93dbb703b2b0d0b74dffb5c2a081be52373a5a" dependencies = [ "async-utility", "async-wsocket", "atomic-destructor", "lru", - "negentropy 0.3.1", - "negentropy 0.5.0", + "negentropy", "nostr", "nostr-database", "tokio", @@ -1181,16 +1206,15 @@ dependencies = [ [[package]] name = "nostr-sdk" -version = "0.40.0" +version = "0.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26238eee805d7dc3abcc8d570068c81cb4285b08e9db4d7999e54e20748c472e" +checksum = "599f8963d6a1522a13b1a2b0ea6e168acfc367706606f1d33fa595e91fa22db0" dependencies = [ "async-utility", "nostr", "nostr-database", "nostr-relay-pool", "tokio", - "tracing", ] [[package]] @@ -1204,6 +1228,12 @@ dependencies = [ ] [[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + +[[package]] name = "num-traits" version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1374,6 +1404,12 @@ dependencies = [ ] [[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + +[[package]] name = "ppv-lite86" version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1407,12 +1443,41 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" [[package]] -name = "radroots-common" +name = "radroots-core" version = "0.1.0" dependencies = [ + "rust_decimal", + "rust_decimal_macros", + "serde", + "typeshare", +] + +[[package]] +name = "radroots-events" +version = "0.1.0" +dependencies = [ + "radroots-core", + "serde", + "typeshare", +] + +[[package]] +name = "radroots-events-codec" +version = "0.1.0" +dependencies = [ + "radroots-core", + "radroots-events", + "serde", +] + +[[package]] +name = "radroots-trade" +version = "0.1.0" +dependencies = [ + "radroots-core", + "radroots-events", + "radroots-events-codec", "serde", - "serde_json", - "thiserror 1.0.69", "typeshare", ] @@ -1493,8 +1558,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -1505,11 +1579,17 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + +[[package]] +name = "regex-syntax" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" @@ -1524,13 +1604,17 @@ dependencies = [ "futures", "nostr", "nostr-sdk", - "radroots-common", + "radroots-core", + "radroots-events", + "radroots-events-codec", + "radroots-trade", "serde", "serde_json", "tempfile", "thiserror 1.0.69", "tokio", "tracing", + "tracing-appender", "tracing-subscriber", "uuid", ] @@ -1573,6 +1657,27 @@ dependencies = [ ] [[package]] +name = "rust_decimal" +version = "1.37.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b203a6425500a03e0919c42d3c47caca51e79f1132046626d2c8871c5092035d" +dependencies = [ + "arrayvec", + "num-traits", + "serde", +] + +[[package]] +name = "rust_decimal_macros" +version = "1.37.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6268b74858287e1a062271b988a0c534bf85bbeb567fe09331bf40ed78113d5" +dependencies = [ + "quote", + "syn", +] + +[[package]] name = "rustc-demangle" version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1897,6 +2002,37 @@ dependencies = [ ] [[package]] +name = "time" +version = "0.3.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" + +[[package]] +name = "time-macros" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +dependencies = [ + "num-conv", + "time-core", +] + +[[package]] name = "tiny-keccak" version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2043,6 +2179,18 @@ dependencies = [ ] [[package]] +name = "tracing-appender" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" +dependencies = [ + "crossbeam-channel", + "thiserror 1.0.69", + "time", + "tracing-subscriber", +] + +[[package]] name = "tracing-attributes" version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2080,10 +2228,14 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/Cargo.toml b/Cargo.toml @@ -7,6 +7,6 @@ resolver = "2" [workspace.package] version = "0.1.0" edition = "2024" -authors = ["Radroots <info@radroots.dev>"] +authors = ["rad roots <admin@radroots.dev>"] license = "AGPLv3" description = "rhizome is a Nostr data vending machine (NIP-90)" diff --git a/config.toml b/config.toml @@ -27,3 +27,21 @@ name = "rhi" # Lightning address internet identifiers format # lud16 = "" + +[config] +# Where to write logs/rotations. +logs_dir = "logs" + +# Path to the keys profile JSON. +keys_path = "keys.json" + +# Generate a new key profile if none is found. +generate_keys = true + +# Optional NIP-89 application handler identifier. +identifier = "" + +# Relays to connect to for publishing/subscribing. +relays = [ + "ws://127.0.0.1:8080" +] +\ No newline at end of file diff --git a/crates/rhi/Cargo.toml b/crates/rhi/Cargo.toml @@ -7,18 +7,23 @@ license.workspace = true description.workspace = true [dependencies] +radroots-core = { path = "../../../../../crates/radroots-common/crates/core" } +radroots-events = { path = "../../../../../crates/radroots-common/crates/events" } +radroots-events-codec = { path = "../../../../../crates/radroots-common/crates/events-codec" } +radroots-trade = { path = "../../../../../crates/radroots-common/crates/trade" } + anyhow = "1.0" clap = { version = "4", features = ["derive"] } config = "0.15" futures = "0.3" -nostr = { version = "0.40.0", features = ["nip04"] } -nostr-sdk = "0.40.0" +nostr = { version = "0.43.0", features = ["nip04"] } +nostr-sdk = "0.43.0" serde = "1.0" serde_json = "1.0" tempfile = "3.19.1" thiserror = "1.0" tokio = { version = "1", features = ["full"] } tracing = "0.1" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } +tracing-appender = "0.2" uuid = { version = "1.16.0", features = ["v4"] } -radroots-common = { path = "../../../../../crates/radroots-common/" } diff --git a/crates/rhi/src/adapters/mod.rs b/crates/rhi/src/adapters/mod.rs @@ -0,0 +1 @@ +pub mod nostr; diff --git a/crates/rhi/src/adapters/nostr/event.rs b/crates/rhi/src/adapters/nostr/event.rs @@ -0,0 +1,78 @@ +use nostr::event::Event; +use radroots_events_codec::job::traits::{JobEventBorrow, JobEventLike}; + +#[derive(Clone, Debug)] +pub struct NostrEventAdapter<'a> { + evt: &'a Event, + id_hex: String, + author_hex: String, +} + +impl<'a> NostrEventAdapter<'a> { + #[inline] + pub fn new(evt: &'a Event) -> Self { + Self { + evt, + id_hex: evt.id.to_hex(), + author_hex: evt.pubkey.to_string(), + } + } + + #[inline] + fn tags_as_slices(&self) -> Vec<Vec<String>> { + self.evt + .tags + .iter() + .map(|t| t.as_slice().to_vec()) + .collect() + } +} + +impl<'a> JobEventBorrow<'a> for NostrEventAdapter<'a> { + #[inline] + fn raw_id(&'a self) -> &'a str { + &self.id_hex + } + #[inline] + fn raw_author(&'a self) -> &'a str { + &self.author_hex + } + #[inline] + fn raw_content(&'a self) -> &'a str { + &self.evt.content + } + #[inline] + fn raw_kind(&'a self) -> u32 { + match self.evt.kind { + nostr::event::Kind::Custom(v) => v as u32, + _ => 0, + } + } +} + +impl JobEventLike for NostrEventAdapter<'_> { + fn raw_id(&self) -> String { + self.id_hex.clone() + } + fn raw_author(&self) -> String { + self.author_hex.clone() + } + fn raw_published_at(&self) -> u32 { + self.evt.created_at.as_u64() as u32 + } + fn raw_kind(&self) -> u32 { + match self.evt.kind { + nostr::event::Kind::Custom(v) => v as u32, + _ => 0, + } + } + fn raw_content(&self) -> String { + self.evt.content.clone() + } + fn raw_tags(&self) -> Vec<Vec<String>> { + self.tags_as_slices() + } + fn raw_sig(&self) -> String { + self.evt.sig.to_string() + } +} diff --git a/crates/rhi/src/adapters/nostr/mod.rs b/crates/rhi/src/adapters/nostr/mod.rs @@ -0,0 +1 @@ +pub mod event; diff --git a/crates/rhi/src/config.rs b/crates/rhi/src/config.rs @@ -1,54 +0,0 @@ -use anyhow::Result; -use config::{Config, ConfigError, File}; -use nostr::Metadata; -use serde::{Deserialize, Serialize}; -use thiserror::Error; -use tracing::{error, warn}; - -#[derive(Debug, Error)] -pub enum SettingsError { - #[error("Configuration loading failed: {0}")] - Load(#[from] ConfigError), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Settings { - pub metadata: Metadata, -} - -impl Settings { - pub fn load(config_path: &Option<String>) -> Result<Self, SettingsError> { - let default = Self::default(); - - match Self::load_from_file(config_path) { - Ok(settings) => Ok(settings), - Err(err) if config_path.is_none() => { - warn!("Could not read config file: {err}. Using default configuration.",); - Ok(default) - } - Err(err) => Err(err), - } - } - - fn load_from_file(config_path: &Option<String>) -> Result<Self, SettingsError> { - let path = config_path.as_deref().unwrap_or("config.toml"); - - let config = Config::builder() - .add_source(File::with_name(path).required(false)) - .build()? - .try_deserialize::<Settings>()?; - - Ok(config) - } -} - -impl Default for Settings { - fn default() -> Self { - Self { - metadata: Metadata { - name: Some("rhi".to_string()), - ..Default::default() - }, - } - } -} diff --git a/crates/rhi/src/config/mod.rs b/crates/rhi/src/config/mod.rs @@ -0,0 +1,54 @@ +use config::{Config, ConfigError, File}; +use nostr::Metadata; +use serde::{Deserialize, Serialize}; +use std::path::Path; +use thiserror::Error; +use tracing::error; + +#[derive(Debug, Error)] +pub enum SettingsError { + #[error("Configuration loading failed: {0}")] + Load(#[from] ConfigError), +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Configuration { + pub logs_dir: String, + pub keys_path: String, + pub generate_keys: bool, + pub identifier: Option<String>, + pub relays: Vec<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Settings { + pub metadata: Metadata, + pub config: Configuration, +} + +impl Settings { + pub fn load(config_path: &Option<std::path::PathBuf>) -> Result<Self, SettingsError> { + let path: &Path = config_path + .as_deref() + .unwrap_or_else(|| Path::new("config.toml")); + + let builder = Config::builder().add_source(File::from(path).required(true)); + + match builder.build() { + Ok(cfg) => match cfg.try_deserialize::<Settings>() { + Ok(settings) => Ok(settings), + Err(err) => { + error!("❌ Failed to deserialize configuration: {err}"); + Err(SettingsError::Load(err)) + } + }, + Err(err) => { + error!( + "❌ Failed to load configuration from '{}': {err}", + path.display() + ); + Err(SettingsError::Load(err)) + } + } + } +} diff --git a/crates/rhi/src/events/job_request.rs b/crates/rhi/src/events/job_request.rs @@ -1,339 +0,0 @@ -use std::time::Duration; - -use anyhow::Result; -use nostr::event::{Event, EventId, Tag, TagKind}; -use nostr::filter::{Alphabet, SingleLetterTag}; -use nostr::{event::Kind, key::Keys}; -use nostr_sdk::Client; -use nostr_sdk::RelayPoolNotification; -use radroots_common::KIND_JOB_REQUEST; -use tokio::time::sleep; -use tracing::{info, warn}; - -use crate::handlers::job_request_order::{JobRequestOrderError, handle_job_request_order}; -use crate::handlers::job_request_preview::handle_job_request_preview; -use crate::handlers::job_request_quote::handle_job_request_quote; -use crate::utils::nostr::{ - NostrTagsResolveError, NostrUtilsError, nostr_event_job_feedback, nostr_filter_kind, - nostr_filter_new_events, nostr_tag_at_value, nostr_tag_first_value, nostr_tag_relays_parse, - nostr_tag_slice, nostr_tags_resolve, -}; -use crate::utils::unit::MassUnitError; - -#[derive(thiserror::Error, Debug)] -pub enum JobRequestError { - #[error("{0}")] - NostrUtilsError(#[from] NostrUtilsError), - - #[error("{0}")] - MassUnit(#[from] MassUnitError), - - #[error("{0}")] - NostrTagsResolve(#[from] NostrTagsResolveError), - - #[error("Order: {0}")] - JobRequestOrder(#[from] JobRequestOrderError), - - #[error("Invalid job request input type: {0}")] - InvalidInputType(String), - - #[error("Invalid job request input marker: {0}")] - InvalidInputMarker(String), - - #[error("Deserialization error: {0}")] - Serde(#[from] serde_json::Error), - - #[error("Failure to process request")] - Failure, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum JobRequestInputType { - Url, - Event, - Job, - Text, -} - -impl TryFrom<&str> for JobRequestInputType { - type Error = JobRequestError; - - fn try_from(s: &str) -> Result<Self, Self::Error> { - match s { - "url" => Ok(Self::Url), - "event" => Ok(Self::Event), - "job" => Ok(Self::Job), - "text" => Ok(Self::Text), - other => Err(JobRequestError::InvalidInputType(other.to_string())), - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum JobRequestInputMarker { - Order, - Quote, - Preview, -} - -impl TryFrom<&str> for JobRequestInputMarker { - type Error = JobRequestError; - - fn try_from(s: &str) -> Result<Self, Self::Error> { - match s { - "order" => Ok(Self::Order), - "quote" => Ok(Self::Quote), - "preview" => Ok(Self::Preview), - other => Err(JobRequestError::InvalidInputMarker(other.to_string())), - } - } -} - -#[derive(Debug, Clone)] -pub struct JobRequestInput { - pub data: String, - pub input_type: JobRequestInputType, - pub relay: Option<String>, - pub marker: Option<JobRequestInputMarker>, -} - -#[derive(Debug, Clone)] -pub struct JobRequest { - pub id: EventId, - pub inputs: Vec<JobRequestInput>, - pub output: Option<String>, - pub bid_msat: Option<u64>, - pub relays: Vec<String>, - pub service_providers: Vec<String>, - pub params: Vec<(String, String)>, - pub hashtags: Vec<String>, - pub tags: Vec<Tag>, -} - -pub async fn subscriber(keys: Keys, relays: Vec<String>) -> Result<()> { - info!("Starting subscriber for kind {}", KIND_JOB_REQUEST); - let client = Client::new(keys.clone()); - - for relay in &relays { - client.add_relay(relay).await?; - } - - let filter = nostr_filter_new_events(nostr_filter_kind(KIND_JOB_REQUEST)); - - client.connect().await; - client.subscribe(filter, None).await?; - - let mut notifications = client.notifications(); - - while let Ok(n) = notifications.recv().await { - if let RelayPoolNotification::Event { event, .. } = n { - if event.kind == Kind::Custom(KIND_JOB_REQUEST) { - let event = (*event).clone(); - let keys = keys.clone(); - let client = client.clone(); - - tokio::spawn(async move { - if let Err(err) = - handle_event(event.clone(), keys.clone(), client.clone()).await - { - let _ = handle_error(err, event, keys, client, None).await; - } - }); - } - } - } - - client.disconnect().await; - - Ok(()) -} - -async fn handle_error( - error: JobRequestError, - event: Event, - _keys: Keys, - client: Client, - _job_req: Option<JobRequest>, -) -> Result<()> { - warn!("job_request handle_error error {}", error); - warn!("job_request handle_error event {:?}", { event.clone() }); - - let builder = nostr_event_job_feedback(&event, error, "error", None)?; - let event_id = client.send_event_builder(builder).await?; - - warn!("job_request handle_error sent feedback {:?}", { - event_id.clone() - }); - Ok(()) -} - -async fn handle_event(event: Event, keys: Keys, client: Client) -> Result<(), JobRequestError> { - let job_req = parse_event(&event, &keys)?; - for job_req_input in &job_req.inputs { - let marker = job_req_input - .marker - .as_ref() - .ok_or_else(|| JobRequestError::InvalidInputMarker(job_req.id.to_string()))?; - - match marker { - JobRequestInputMarker::Order => { - process_job_request( - handle_job_request_order, - event.clone(), - keys.clone(), - client.clone(), - job_req.clone(), - job_req_input.clone(), - ) - .await; - } - JobRequestInputMarker::Quote => { - process_job_request( - handle_job_request_quote, - event.clone(), - keys.clone(), - client.clone(), - job_req.clone(), - job_req_input.clone(), - ) - .await; - } - JobRequestInputMarker::Preview => { - process_job_request( - handle_job_request_preview, - event.clone(), - keys.clone(), - client.clone(), - job_req.clone(), - job_req_input.clone(), - ) - .await; - } - } - } - - Ok(()) -} - -fn parse_event(event: &Event, keys: &Keys) -> Result<JobRequest, JobRequestError> { - let tags = nostr_tags_resolve(event, keys)?; - let mut inputs = vec![]; - let mut output = None; - let mut bid_msat = None; - let mut relays = vec![]; - let mut providers = vec![]; - let mut params = vec![]; - let mut hashtags = vec![]; - - for tag in &tags { - match tag.kind() { - TagKind::SingleLetter(l) if l == SingleLetterTag::lowercase(Alphabet::I) => { - if let Some(vals) = nostr_tag_slice(tag, 1) { - match &vals[..] { - [data, input_type, relay, marker, ..] => { - let data = data.clone(); - let input_type = JobRequestInputType::try_from(input_type.as_str())?; - let relay = relay.clone(); - let marker = JobRequestInputMarker::try_from(marker.as_str())?; - inputs.push(JobRequestInput { - data, - input_type, - relay: Some(relay), - marker: Some(marker), - }); - } - _ => continue, - } - } - } - - TagKind::SingleLetter(l) if l == SingleLetterTag::lowercase(Alphabet::T) => { - if let Some(val) = nostr_tag_first_value(tag, "t") { - hashtags.push(val); - } - } - - TagKind::Custom(ref k) if k == "output" => { - output = nostr_tag_first_value(tag, k); - } - - TagKind::Custom(ref k) if k == "bid" => { - bid_msat = nostr_tag_first_value(tag, k).and_then(|s| s.parse().ok()); - } - - TagKind::Custom(k) if k == "param" => { - if let Some(vals) = nostr_tag_slice(tag, 1) { - if vals.len() >= 2 { - params.push((vals[0].clone(), vals[1].clone())); - } - } - } - - TagKind::Relays => { - if let Some(urls) = nostr_tag_relays_parse(tag) { - relays = urls.into_iter().map(|u| u.to_string()).collect(); - } - } - - TagKind::SingleLetter(l) if l == SingleLetterTag::lowercase(Alphabet::P) => { - if let Some(pk) = nostr_tag_at_value(tag, 1) { - providers.push(pk); - } - } - - _ => {} - } - } - - Ok(JobRequest { - id: event.id, - inputs, - output, - bid_msat, - relays, - service_providers: providers, - tags, - params, - hashtags, - }) -} - -async fn process_job_request<F, Fut>( - handler: F, - event: Event, - keys: Keys, - client: Client, - job_req: JobRequest, - job_req_input: JobRequestInput, -) where - F: FnOnce(Event, Keys, Client, JobRequest, JobRequestInput) -> Fut, - Fut: std::future::Future<Output = Result<(), JobRequestError>>, -{ - if cfg!(debug_assertions) { - sleep(Duration::from_millis(500)).await; - } - - let error_event = event.clone(); - let error_job_req = job_req.clone(); - let error_keys = keys.clone(); - let error_client = client.clone(); - - if let Err(err) = handler( - event, - keys.clone(), - client.clone(), - job_req.clone(), - job_req_input.clone(), - ) - .await - { - let _ = handle_error( - err, - error_event, - error_keys, - error_client, - Some(error_job_req), - ) - .await; - } -} diff --git a/crates/rhi/src/events/mod.rs b/crates/rhi/src/events/mod.rs @@ -1 +0,0 @@ -pub mod job_request; diff --git a/crates/rhi/src/features/trade_listing/domain/mod.rs b/crates/rhi/src/features/trade_listing/domain/mod.rs @@ -0,0 +1 @@ +pub mod pricing; diff --git a/crates/rhi/src/features/trade_listing/domain/pricing.rs b/crates/rhi/src/features/trade_listing/domain/pricing.rs @@ -0,0 +1,97 @@ +use radroots_core::{RadrootsCoreQuantity, RadrootsCoreQuantityPrice}; +use radroots_events::listing::models::{ + RadrootsListing, RadrootsListingDiscount, RadrootsListingQuantity, +}; +use radroots_trade::prelude::price_ext::ListingPricingExt; +use radroots_trade::prelude::stage::order::{ + TradeListingOrderRequestPayload, TradeListingOrderResult, +}; + +use crate::features::trade_listing::handlers::order::JobRequestOrderError; + +pub trait ListingOrderCalculator { + fn calculate_order( + &self, + order: &TradeListingOrderRequestPayload, + ) -> Result<TradeListingOrderResult, JobRequestOrderError>; +} + +impl ListingOrderCalculator for RadrootsListing { + fn calculate_order( + &self, + order: &TradeListingOrderRequestPayload, + ) -> Result<TradeListingOrderResult, JobRequestOrderError> { + let req_qty: &RadrootsListingQuantity = &order.quantity; + let req_qty_amount = req_qty.value.amount; + let req_qty_unit = req_qty.value.unit; + let req_qty_label_opt = req_qty.label.as_deref(); + + let matched_packaging = self.quantities.iter().any(|q| { + let same_amount = q.value.amount.normalize() == req_qty_amount.normalize(); + let same_unit = q.value.unit == req_qty_unit; + let label_ok = match (q.label.as_deref(), req_qty_label_opt) { + (Some(l), Some(r)) => l == r, + (None, None) => true, + _ => false, + }; + same_amount && same_unit && label_ok + }); + + if !matched_packaging { + return Err(JobRequestOrderError::Unsatisfiable(format!( + "requested packaging {} {} {} not available", + req_qty_amount, + req_qty_unit, + req_qty_label_opt.unwrap_or("") + ))); + } + + let req_money = order.price.amount.clone().quantize_to_currency(); + + let matched_tier: &RadrootsCoreQuantityPrice = self + .prices + .iter() + .find(|p| { + let money_ok = p.amount.currency == req_money.currency + && p.amount.amount.normalize() == req_money.amount.normalize(); + let per_amt_ok = + p.quantity.amount.normalize() == order.price.quantity.amount.normalize(); + let per_unit_ok = p.quantity.unit == order.price.quantity.unit; + money_ok && per_amt_ok && per_unit_ok + }) + .ok_or_else(|| { + JobRequestOrderError::Unsatisfiable(format!( + "no matching price tier {} {} found", + order.price.quantity.amount, order.price.quantity.unit + )) + })?; + + let price_amount = matched_tier.amount.clone(); + let price_quantity = matched_tier.quantity.clone(); + + let discounts_out: Vec<RadrootsListingDiscount> = + self.discounts.clone().unwrap_or_default(); + + let out_quantity = RadrootsListingQuantity { + value: RadrootsCoreQuantity::new(req_qty_amount, req_qty_unit), + label: req_qty.label.clone(), + count: req_qty.count, + }; + + let out_price = RadrootsCoreQuantityPrice { + amount: price_amount.clone(), + quantity: price_quantity.clone(), + }; + + let out_subtotal = out_price.subtotal_for(&out_quantity); + let out_total = out_price.total_for(&out_quantity); + + Ok(TradeListingOrderResult { + quantity: out_quantity, + price: out_price, + discounts: discounts_out, + subtotal: out_subtotal, + total: out_total, + }) + } +} diff --git a/crates/rhi/src/features/trade_listing/handlers/accept.rs b/crates/rhi/src/features/trade_listing/handlers/accept.rs @@ -0,0 +1,142 @@ +use nostr::{event::Event, key::Keys}; +use nostr_sdk::{Client, client::Error as NostrClientError}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job::{ + JobPaymentRequest, request::models::RadrootsJobInput, result::models::RadrootsJobResult, + }, + kinds::result_kind_for_request_kind, + tag::{TAG_D, TAG_E_ROOT}, +}; +use radroots_trade::{ + listing::{ + kinds::{KIND_TRADE_LISTING_ACCEPT_RES, KIND_TRADE_LISTING_ORDER_RES}, + tags::push_trade_listing_chain_tags, + }, + prelude::stage::accept::{TradeListingAcceptRequest, TradeListingAcceptResult}, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, + infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestAcceptError { + #[error("Failed to parse accept request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Unauthorized: accepting profile must own the listing")] + Unauthorized, + #[error("Order result not kind 6301 or listing mismatch")] + InvalidOrderResult, + #[error("Failed to send job response")] + ResponseSend(#[from] NostrClientError), +} + +pub async fn handle_job_request_trade_accept( + event_job_request: Event, + keys: Keys, + client: Client, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingAcceptRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestAcceptError::ParseRequest(e.to_string()))?; + + let order_res_evt = nostr_fetch_event_by_id(client.clone(), &req.order_result_event_id) + .await + .map_err(|_| JobRequestAcceptError::FetchReference(req.order_result_event_id.clone()))?; + + let listing_evt = nostr_fetch_event_by_id(client.clone(), &req.listing_event_id) + .await + .map_err(|_| JobRequestAcceptError::FetchReference(req.listing_event_id.clone()))?; + + if listing_evt.pubkey != keys.public_key() { + return Err(JobRequestAcceptError::Unauthorized.into()); + } + + if order_res_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_ORDER_RES) { + return Err(JobRequestAcceptError::InvalidOrderResult.into()); + } + let order_refs_listing = order_res_evt.tags.iter().any(|t| { + let s = t.as_slice(); + s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT) + && s.get(1).map(String::as_str) == Some(req.listing_event_id.as_str()) + }); + if !order_refs_listing { + return Err(JobRequestAcceptError::InvalidOrderResult.into()); + } + + let accept_result = TradeListingAcceptResult { + listing_event_id: req.listing_event_id.clone(), + order_result_event_id: req.order_result_event_id.clone(), + accepted_by: keys.public_key().to_string(), + }; + let payload_json = serde_json::to_string(&accept_result)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + debug_assert_eq!(result_kind as u16, KIND_TRADE_LISTING_ACCEPT_RES); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None::<JobPaymentRequest>, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + + let e_root = order_res_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten() + .unwrap_or_else(|| req.listing_event_id.clone()); + + let trade_id = order_res_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + Some(req.order_result_event_id.clone()), + trade_id, + ); + + let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = nostr_send_event(client, builder).await?; + + info!( + "job request trade/accept ({}={}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/crates/rhi/src/features/trade_listing/handlers/conveyance.rs b/crates/rhi/src/features/trade_listing/handlers/conveyance.rs @@ -0,0 +1,120 @@ +use nostr::{event::Event, key::Keys}; +use nostr_sdk::{Client, client::Error as NostrClientError}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job::{ + JobPaymentRequest, request::models::RadrootsJobInput, result::models::RadrootsJobResult, + }, + kinds::result_kind_for_request_kind, + tag::{TAG_D, TAG_E_ROOT}, +}; +use radroots_trade::{ + listing::{kinds::KIND_TRADE_LISTING_ACCEPT_RES, tags::push_trade_listing_chain_tags}, + prelude::stage::conveyance::{TradeListingConveyanceRequest, TradeListingConveyanceResult}, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, + infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestConveyanceError { + #[error("Failed to parse conveyance request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Invalid accept result kind")] + InvalidAcceptKind, + #[error("Failed to send job response")] + ResponseSend(#[from] NostrClientError), +} + +pub async fn handle_job_request_trade_conveyance( + event_job_request: Event, + _keys: Keys, + client: Client, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingConveyanceRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestConveyanceError::ParseRequest(e.to_string()))?; + + let accept_evt = nostr_fetch_event_by_id(client.clone(), &req.accept_result_event_id) + .await + .map_err(|_| { + JobRequestConveyanceError::FetchReference(req.accept_result_event_id.clone()) + })?; + if accept_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_ACCEPT_RES) { + return Err(JobRequestConveyanceError::InvalidAcceptKind.into()); + } + + let conv_res = TradeListingConveyanceResult { + verified: true, + method: req.method, + message: Some("conveyance method verified".into()), + }; + let payload_json = serde_json::to_string(&conv_res)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None::<JobPaymentRequest>, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + + let e_root = accept_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten(); + + let d_tag = accept_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone().unwrap_or_default(), + Some(req.accept_result_event_id.clone()), + d_tag, + ); + + let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = nostr_send_event(client, builder).await?; + + info!( + "job request trade/conveyance ({}={:?}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/crates/rhi/src/features/trade_listing/handlers/fulfillment.rs b/crates/rhi/src/features/trade_listing/handlers/fulfillment.rs @@ -0,0 +1,125 @@ +use nostr::{event::Event, key::Keys}; +use nostr_sdk::{Client, client::Error as NostrClientError}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job::{request::models::RadrootsJobInput, result::models::RadrootsJobResult}, + kinds::result_kind_for_request_kind, + tag::{TAG_D, TAG_E_ROOT}, +}; +use radroots_trade::{ + listing::tags::push_trade_listing_chain_tags, + prelude::{ + kinds::KIND_TRADE_LISTING_PAYMENT_RES, + stage::fulfillment::{ + TradeListingFulfillmentRequest, TradeListingFulfillmentResult, + TradeListingFulfillmentState, + }, + }, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, + infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestFulfillmentError { + #[error("Failed to parse fulfillment request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Payment result not kind 6305 or missing chain")] + InvalidPayment, + #[error("Failed to send job response")] + ResponseSend(#[from] NostrClientError), +} + +pub async fn handle_job_request_trade_fulfillment( + event_job_request: Event, + _keys: Keys, + client: Client, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingFulfillmentRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestFulfillmentError::ParseRequest(e.to_string()))?; + + let payment_evt = nostr_fetch_event_by_id(client.clone(), &req.payment_result_event_id) + .await + .map_err(|_| { + JobRequestFulfillmentError::FetchReference(req.payment_result_event_id.clone()) + })?; + if payment_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_PAYMENT_RES) { + return Err(JobRequestFulfillmentError::InvalidPayment.into()); + } + + let e_root = payment_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten() + .ok_or(JobRequestFulfillmentError::InvalidPayment)?; + + let d_tag = payment_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + let status = TradeListingFulfillmentResult { + state: TradeListingFulfillmentState::Preparing, + tracking: None, + eta: None, + notes: Some("order accepted and paid; preparing shipment".into()), + }; + let payload_json = serde_json::to_string(&status)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + Some(req.payment_result_event_id.clone()), + d_tag, + ); + + let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = nostr_send_event(client, builder).await?; + + info!( + "job request trade/fulfillment ({}={}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/crates/rhi/src/features/trade_listing/handlers/invoice.rs b/crates/rhi/src/features/trade_listing/handlers/invoice.rs @@ -0,0 +1,165 @@ +use nostr::{event::Event, key::Keys}; +use nostr_sdk::{Client, client::Error as NostrClientError}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job::{ + JobPaymentRequest, + request::models::{RadrootsJobInput, RadrootsJobParam}, + result::models::RadrootsJobResult, + }, + kinds::result_kind_for_request_kind, + tag::{TAG_D, TAG_E_PREV, TAG_E_ROOT}, +}; +use radroots_trade::{ + listing::tags::push_trade_listing_chain_tags, + prelude::{ + kinds::{ + KIND_TRADE_LISTING_ACCEPT_RES, KIND_TRADE_LISTING_INVOICE_RES, + KIND_TRADE_LISTING_ORDER_RES, + }, + stage::invoice::{TradeListingInvoiceRequest, TradeListingInvoiceResult}, + }, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, + infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestInvoiceError { + #[error("Failed to parse invoice request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Accept result not kind 6302 or missing chain")] + InvalidAccept, + #[error("Failed to send job response")] + ResponseSend(#[from] NostrClientError), +} + +fn param_lookup<'a>(params: &'a [RadrootsJobParam], key: &str) -> Option<&'a str> { + params + .iter() + .find(|p| p.key == key) + .map(|p| p.value.as_str()) +} + +pub async fn handle_job_request_trade_invoice( + event_job_request: Event, + _keys: Keys, + client: Client, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingInvoiceRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestInvoiceError::ParseRequest(e.to_string()))?; + + let accept_evt = nostr_fetch_event_by_id(client.clone(), &req.accept_result_event_id) + .await + .map_err(|_| JobRequestInvoiceError::FetchReference(req.accept_result_event_id.clone()))?; + if accept_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_ACCEPT_RES) { + return Err(JobRequestInvoiceError::InvalidAccept.into()); + } + + let e_root = accept_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten() + .ok_or(JobRequestInvoiceError::InvalidAccept)?; + + let d_tag = accept_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + let order_res_id = accept_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_PREV)).then(|| s.get(1).cloned()) + }) + .flatten(); + + if let Some(prev_id) = &order_res_id { + if let Ok(prev_evt) = nostr_fetch_event_by_id(client.clone(), prev_id).await { + if prev_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_ORDER_RES) {} + } + } + + let amount_sat = param_lookup(&job_req.model.params, "amount_sat") + .and_then(|v| v.parse::<u32>().ok()) + .or_else(|| { + param_lookup(&job_req.model.params, "amount_msat") + .and_then(|v| v.parse::<u64>().ok()) + .map(|msat| (msat / 1000) as u32) + }) + .unwrap_or(0); + + let bolt11 = param_lookup(&job_req.model.params, "bolt11").map(|s| s.to_string()); + let note = param_lookup(&job_req.model.params, "note").map(|s| s.to_string()); + let expires_at = + param_lookup(&job_req.model.params, "expires_at").and_then(|v| v.parse::<u32>().ok()); + + let invoice = TradeListingInvoiceResult { + total_sat: amount_sat, + bolt11: bolt11.clone(), + note, + expires_at, + }; + let payload_json = serde_json::to_string(&invoice)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + debug_assert_eq!(result_kind as u16, KIND_TRADE_LISTING_INVOICE_RES); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: Some(JobPaymentRequest { amount_sat, bolt11 }), + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + Some(req.accept_result_event_id.clone()), + d_tag, + ); + + let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = nostr_send_event(client, builder).await?; + + info!( + "job request trade/invoice ({}={}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/crates/rhi/src/features/trade_listing/handlers/mod.rs b/crates/rhi/src/features/trade_listing/handlers/mod.rs @@ -0,0 +1,7 @@ +pub mod accept; +pub mod conveyance; +pub mod fulfillment; +pub mod invoice; +pub mod order; +pub mod payment; +pub mod receipt; diff --git a/crates/rhi/src/features/trade_listing/handlers/order.rs b/crates/rhi/src/features/trade_listing/handlers/order.rs @@ -0,0 +1,106 @@ +use nostr::{event::Event, key::Keys}; +use nostr_sdk::{Client, client::Error as NostrClientError}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job::{ + JobPaymentRequest, request::models::RadrootsJobInput, result::models::RadrootsJobResult, + }, + kinds::result_kind_for_request_kind, + listing::models::RadrootsListing, +}; +use radroots_trade::prelude::{ + kinds::KIND_TRADE_LISTING_ORDER_RES, stage::order::TradeListingOrderRequest, tags, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::{ + domain::pricing::ListingOrderCalculator, + subscriber::{JobRequestCtx, JobRequestError}, + }, + infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestOrderError { + #[error("Failed to parse reference event: {0}")] + ParseReference(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Reference event does not meet request requirements: {0}")] + MissingRequested(String), + #[error("Failed to send job response")] + ResponseSend(#[from] NostrClientError), + #[error("Request cannot be satisfied: {0}")] + Unsatisfiable(String), +} + +pub async fn handle_job_request_trade_order( + event_job_request: Event, + _keys: Keys, + client: Client, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let order_data: TradeListingOrderRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestOrderError::ParseReference(e.to_string()))?; + + let ref_id = &order_data.event.id; + let ref_event = nostr_fetch_event_by_id(client.clone(), ref_id) + .await + .map_err(|_| JobRequestOrderError::FetchReference(ref_id.clone()))?; + + let listing: RadrootsListing = serde_json::from_str(&ref_event.content).map_err(|_| { + JobRequestOrderError::ParseReference(format!("invalid listing content for {}", ref_id)) + })?; + + let order_result = listing.calculate_order(&order_data.payload)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + debug_assert_eq!(result_kind as u16, KIND_TRADE_LISTING_ORDER_RES as u16); + + let payload_json = serde_json::to_string(&order_result)?; + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None::<JobPaymentRequest>, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + + let e_root = ref_event.id.to_hex(); + let trade_id = format!("trade:{}:{}", e_root, event_job_request.id.to_hex()); + tags::push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + None::<String>, + Some(trade_id.clone()), + ); + + let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = nostr_send_event(client, builder).await?; + + info!( + "job request trade/order (e_root={}) result sent: {:?}", + e_root, job_result_event_id + ); + Ok(()) +} diff --git a/crates/rhi/src/features/trade_listing/handlers/payment.rs b/crates/rhi/src/features/trade_listing/handlers/payment.rs @@ -0,0 +1,116 @@ +use nostr::{event::Event, key::Keys}; +use nostr_sdk::{Client, client::Error as NostrClientError}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job::{request::models::RadrootsJobInput, result::models::RadrootsJobResult}, + kinds::result_kind_for_request_kind, + tag::{TAG_D, TAG_E_ROOT}, +}; +use radroots_trade::prelude::{ + kinds::KIND_TRADE_LISTING_INVOICE_RES, + stage::payment::{TradeListingPaymentProofRequest, TradeListingPaymentResult}, + tags::push_trade_listing_chain_tags, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, + infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestPaymentError { + #[error("Failed to parse payment request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Invoice result not kind 6304 or missing chain")] + InvalidInvoice, + #[error("Failed to send job response")] + ResponseSend(#[from] NostrClientError), +} + +pub async fn handle_job_request_trade_payment( + event_job_request: Event, + _keys: Keys, + client: Client, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingPaymentProofRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestPaymentError::ParseRequest(e.to_string()))?; + + let invoice_evt = nostr_fetch_event_by_id(client.clone(), &req.invoice_result_event_id) + .await + .map_err(|_| JobRequestPaymentError::FetchReference(req.invoice_result_event_id.clone()))?; + if invoice_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_INVOICE_RES) { + return Err(JobRequestPaymentError::InvalidInvoice.into()); + } + + let e_root = invoice_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten() + .ok_or(JobRequestPaymentError::InvalidInvoice)?; + + let d_tag = invoice_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + let ack = TradeListingPaymentResult { + verified: true, + message: Some("payment proof accepted".into()), + }; + let payload_json = serde_json::to_string(&ack)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + Some(req.invoice_result_event_id.clone()), + d_tag, + ); + + let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = nostr_send_event(client, builder).await?; + + info!( + "job request trade/payment ({}={}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/crates/rhi/src/features/trade_listing/handlers/receipt.rs b/crates/rhi/src/features/trade_listing/handlers/receipt.rs @@ -0,0 +1,118 @@ +use nostr::{event::Event, key::Keys}; +use nostr_sdk::{Client, client::Error as NostrClientError}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job::{request::models::RadrootsJobInput, result::models::RadrootsJobResult}, + kinds::result_kind_for_request_kind, + tag::{TAG_D, TAG_E_ROOT}, +}; +use radroots_trade::prelude::{ + kinds::KIND_TRADE_LISTING_FULFILL_RES, + stage::receipt::{TradeListingReceiptRequest, TradeListingReceiptResult}, + tags::push_trade_listing_chain_tags, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, + infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestReceiptError { + #[error("Failed to parse receipt request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Fulfillment result not kind 6306 or missing chain")] + InvalidFulfillment, + #[error("Failed to send job response")] + ResponseSend(#[from] NostrClientError), +} + +pub async fn handle_job_request_trade_receipt( + event_job_request: Event, + _keys: Keys, + client: Client, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingReceiptRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestReceiptError::ParseRequest(e.to_string()))?; + + let fulfill_evt = nostr_fetch_event_by_id(client.clone(), &req.fulfillment_result_event_id) + .await + .map_err(|_| { + JobRequestReceiptError::FetchReference(req.fulfillment_result_event_id.clone()) + })?; + if fulfill_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_FULFILL_RES) { + return Err(JobRequestReceiptError::InvalidFulfillment.into()); + } + + let e_root = fulfill_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten() + .ok_or(JobRequestReceiptError::InvalidFulfillment)?; + + let d_tag = fulfill_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + let ack = TradeListingReceiptResult { + acknowledged: true, + at: event_job_request.created_at.as_u64() as u32, + }; + let payload_json = serde_json::to_string(&ack)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + Some(req.fulfillment_result_event_id.clone()), + d_tag, + ); + + let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = nostr_send_event(client, builder).await?; + + info!( + "job request trade/receipt ({}={}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/crates/rhi/src/features/trade_listing/mod.rs b/crates/rhi/src/features/trade_listing/mod.rs @@ -0,0 +1,3 @@ +pub mod domain; +pub mod handlers; +pub mod subscriber; diff --git a/crates/rhi/src/features/trade_listing/subscriber.rs b/crates/rhi/src/features/trade_listing/subscriber.rs @@ -0,0 +1,401 @@ +use std::{str::FromStr, time::Duration}; + +use anyhow::Result; +use nostr::event::{Event, EventId}; +use nostr::filter::Filter; +use nostr::{event::Kind, key::Keys}; +use nostr_sdk::{Client, RelayPoolNotification}; +use radroots_events::job::request::models::RadrootsJobInput; +use radroots_events_codec::job::error::JobParseError; +use radroots_events_codec::job::request::decode::job_request_from_tags; +use radroots_events_codec::job::traits::BorrowedEventAdapter; +use radroots_trade::listing::kinds::{ + KIND_TRADE_LISTING_ACCEPT_REQ, KIND_TRADE_LISTING_CONVEYANCE_REQ, + KIND_TRADE_LISTING_FULFILL_REQ, KIND_TRADE_LISTING_INVOICE_REQ, KIND_TRADE_LISTING_ORDER_REQ, + KIND_TRADE_LISTING_PAYMENT_REQ, KIND_TRADE_LISTING_RECEIPT_REQ, is_trade_listing_request_kind, +}; +use radroots_trade::listing::meta::MARKER_PAYLOAD; + +use tokio::time::sleep; +use tracing::{info, warn}; + +use crate::adapters::nostr::event::NostrEventAdapter; +use crate::features::trade_listing::handlers::accept::{ + JobRequestAcceptError, handle_job_request_trade_accept, +}; +use crate::features::trade_listing::handlers::conveyance::{ + JobRequestConveyanceError, handle_job_request_trade_conveyance, +}; +use crate::features::trade_listing::handlers::fulfillment::{ + JobRequestFulfillmentError, handle_job_request_trade_fulfillment, +}; +use crate::features::trade_listing::handlers::invoice::{ + JobRequestInvoiceError, handle_job_request_trade_invoice, +}; +use crate::features::trade_listing::handlers::order::{ + JobRequestOrderError, handle_job_request_trade_order, +}; +use crate::features::trade_listing::handlers::payment::{ + JobRequestPaymentError, handle_job_request_trade_payment, +}; +use crate::features::trade_listing::handlers::receipt::{ + JobRequestReceiptError, handle_job_request_trade_receipt, +}; +use crate::infra::nostr::{ + NostrTagsResolveError, NostrUtilsError, nostr_filter_new_events, nostr_tags_resolve, +}; + +#[derive(thiserror::Error, Debug)] +pub enum JobRequestError { + #[error("{0}")] + NostrUtilsError(#[from] NostrUtilsError), + + #[error("{0}")] + NostrTagsResolve(#[from] NostrTagsResolveError), + + #[error("{0}")] + JobParse(#[from] JobParseError), + + #[error("Order: {0}")] + JobRequestOrder(#[from] JobRequestOrderError), + + #[error("Accept: {0}")] + JobRequestAccept(#[from] JobRequestAcceptError), + + #[error("Conveyance: {0}")] + JobRequestConveyance(#[from] JobRequestConveyanceError), + + #[error("Invoice: {0}")] + JobRequestInvoice(#[from] JobRequestInvoiceError), + + #[error("Payment: {0}")] + JobRequestPayment(#[from] JobRequestPaymentError), + + #[error("Fulfillment: {0}")] + JobRequestFulfillment(#[from] JobRequestFulfillmentError), + + #[error("Receipt: {0}")] + JobRequestReceipt(#[from] JobRequestReceiptError), + + #[error("Invalid job request input marker: {0}")] + InvalidInputMarker(String), + + #[error("Deserialization error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("Failure to process request")] + Failure, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum JobRequestInputMarker { + TradeOrder, + TradeAccept, + TradeConveyance, + TradeInvoice, + TradePayment, + TradeFulfillment, + TradeReceipt, +} + +impl std::fmt::Display for JobRequestInputMarker { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + JobRequestInputMarker::TradeOrder => "order", + JobRequestInputMarker::TradeAccept => "accept", + JobRequestInputMarker::TradeConveyance => "conveyance", + JobRequestInputMarker::TradeInvoice => "invoice", + JobRequestInputMarker::TradePayment => "payment", + JobRequestInputMarker::TradeFulfillment => "fulfillment", + JobRequestInputMarker::TradeReceipt => "receipt", + }) + } +} + +impl TryFrom<&str> for JobRequestInputMarker { + type Error = JobRequestError; + fn try_from(s: &str) -> Result<Self, Self::Error> { + match s { + "order" => Ok(Self::TradeOrder), + "accept" => Ok(Self::TradeAccept), + "conveyance" => Ok(Self::TradeConveyance), + "invoice" => Ok(Self::TradeInvoice), + "payment" => Ok(Self::TradePayment), + "fulfillment" => Ok(Self::TradeFulfillment), + "receipt" => Ok(Self::TradeReceipt), + other => Err(JobRequestError::InvalidInputMarker(other.to_string())), + } + } +} + +impl FromStr for JobRequestInputMarker { + type Err = JobRequestError; + fn from_str(s: &str) -> Result<Self, Self::Err> { + Self::try_from(s) + } +} + +#[derive(Debug, Clone)] +pub struct JobRequestCtx { + pub id: EventId, + pub model: radroots_events::job::request::models::RadrootsJobRequest, + pub tags: Vec<nostr::event::Tag>, +} + +pub async fn subscriber(keys: Keys, relays: Vec<String>) -> Result<()> { + info!( + "Starting subscriber for trade listing request kinds: {}, {}, {}, {}, {}, {}, {}", + KIND_TRADE_LISTING_ORDER_REQ, + KIND_TRADE_LISTING_ACCEPT_REQ, + KIND_TRADE_LISTING_CONVEYANCE_REQ, + KIND_TRADE_LISTING_INVOICE_REQ, + KIND_TRADE_LISTING_PAYMENT_REQ, + KIND_TRADE_LISTING_FULFILL_REQ, + KIND_TRADE_LISTING_RECEIPT_REQ + ); + + let client = Client::new(keys.clone()); + for relay in &relays { + client.add_relay(relay).await?; + } + + let kinds: Vec<Kind> = vec![ + Kind::Custom(KIND_TRADE_LISTING_ORDER_REQ), + Kind::Custom(KIND_TRADE_LISTING_ACCEPT_REQ), + Kind::Custom(KIND_TRADE_LISTING_CONVEYANCE_REQ), + Kind::Custom(KIND_TRADE_LISTING_INVOICE_REQ), + Kind::Custom(KIND_TRADE_LISTING_PAYMENT_REQ), + Kind::Custom(KIND_TRADE_LISTING_FULFILL_REQ), + Kind::Custom(KIND_TRADE_LISTING_RECEIPT_REQ), + ]; + let filter = nostr_filter_new_events(Filter::new().kinds(kinds)); + + client.connect().await; + client.subscribe(filter, None).await?; + + let mut notifications = client.notifications(); + + while let Ok(n) = notifications.recv().await { + if let RelayPoolNotification::Event { event, .. } = n { + let event = (*event).clone(); + + let kind: u16 = match event.kind { + Kind::Custom(v) => v, + _ => 0, + }; + if !is_trade_listing_request_kind(kind) { + continue; + } + + let keys = keys.clone(); + let client = client.clone(); + + tokio::spawn(async move { + if let Err(err) = handle_event(event.clone(), keys.clone(), client.clone()).await { + let _ = handle_error(err, event, keys, client, None).await; + } + }); + } + } + + client.disconnect().await; + Ok(()) +} + +async fn handle_error( + error: JobRequestError, + event: Event, + _keys: Keys, + client: Client, + _job_req: Option<JobRequestCtx>, +) -> Result<()> { + use crate::infra::nostr::nostr_event_job_feedback; + + warn!("job_request handle_error: {}", error); + warn!("job_request handle_error event: {:?}", event); + + let builder = nostr_event_job_feedback(&event, error, "error", None)?; + let event_id = client.send_event_builder(builder).await?; + warn!("job_request handle_error sent feedback {:?}", event_id); + Ok(()) +} + +async fn handle_event(event: Event, keys: Keys, client: Client) -> Result<(), JobRequestError> { + let job_req = parse_event(&event, &keys)?; + + let kind: u16 = match event.kind { + Kind::Custom(v) => v, + _ => 0, + }; + + #[inline] + fn select_payload_input<'a>(inputs: &'a [RadrootsJobInput]) -> Option<&'a RadrootsJobInput> { + inputs + .iter() + .find(|i| i.marker.as_deref() == Some(MARKER_PAYLOAD)) + .or_else(|| inputs.get(0)) + } + + match kind { + KIND_TRADE_LISTING_ORDER_REQ => { + let input = select_payload_input(&job_req.model.inputs) + .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; + process_job_request( + handle_job_request_trade_order, + event.clone(), + keys.clone(), + client.clone(), + job_req.clone(), + input.clone(), + ) + .await; + } + KIND_TRADE_LISTING_ACCEPT_REQ => { + let input = select_payload_input(&job_req.model.inputs) + .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; + process_job_request( + handle_job_request_trade_accept, + event.clone(), + keys.clone(), + client.clone(), + job_req.clone(), + input.clone(), + ) + .await; + } + KIND_TRADE_LISTING_CONVEYANCE_REQ => { + let input = select_payload_input(&job_req.model.inputs) + .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; + process_job_request( + handle_job_request_trade_conveyance, + event.clone(), + keys.clone(), + client.clone(), + job_req.clone(), + input.clone(), + ) + .await; + } + KIND_TRADE_LISTING_INVOICE_REQ => { + let input = select_payload_input(&job_req.model.inputs) + .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; + process_job_request( + handle_job_request_trade_invoice, + event.clone(), + keys.clone(), + client.clone(), + job_req.clone(), + input.clone(), + ) + .await; + } + KIND_TRADE_LISTING_PAYMENT_REQ => { + let input = select_payload_input(&job_req.model.inputs) + .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; + process_job_request( + handle_job_request_trade_payment, + event.clone(), + keys.clone(), + client.clone(), + job_req.clone(), + input.clone(), + ) + .await; + } + KIND_TRADE_LISTING_FULFILL_REQ => { + let input = select_payload_input(&job_req.model.inputs) + .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; + process_job_request( + handle_job_request_trade_fulfillment, + event.clone(), + keys.clone(), + client.clone(), + job_req.clone(), + input.clone(), + ) + .await; + } + KIND_TRADE_LISTING_RECEIPT_REQ => { + let input = select_payload_input(&job_req.model.inputs) + .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; + process_job_request( + handle_job_request_trade_receipt, + event.clone(), + keys.clone(), + client.clone(), + job_req.clone(), + input.clone(), + ) + .await; + } + _ => {} + } + + Ok(()) +} + +fn parse_event(event: &Event, keys: &Keys) -> Result<JobRequestCtx, JobRequestError> { + let originally_encrypted = event + .tags + .iter() + .any(|t| t.kind() == nostr::event::TagKind::Encrypted); + + let resolved_tags = nostr_tags_resolve(event, keys)?; + let tag_slices: Vec<Vec<String>> = resolved_tags + .iter() + .map(|t| t.as_slice().to_vec()) + .collect(); + + let kind: u16 = match event.kind { + Kind::Custom(v) => v, + _ => 0, + }; + + let mut model = job_request_from_tags(kind as u32, &tag_slices)?; + if originally_encrypted { + model.encrypted = true; + } + + let ev = NostrEventAdapter::new(event); + let sig_hex = event.sig.to_string(); + let _evt_view = + BorrowedEventAdapter::new(&ev, event.created_at.as_u64() as u32, &tag_slices, &sig_hex); + + Ok(JobRequestCtx { + id: event.id, + model, + tags: resolved_tags, + }) +} + +async fn process_job_request<F, Fut>( + handler: F, + event: Event, + keys: Keys, + client: Client, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) where + F: FnOnce(Event, Keys, Client, JobRequestCtx, RadrootsJobInput) -> Fut + Send + 'static, + Fut: std::future::Future<Output = Result<(), JobRequestError>> + Send + 'static, +{ + if cfg!(debug_assertions) { + sleep(Duration::from_millis(500)).await; + } + + let error_event = event.clone(); + let error_job_req = job_req.clone(); + let error_keys = keys.clone(); + let error_client = client.clone(); + + if let Err(err) = handler(event, keys, client, job_req, job_req_input).await { + let _ = handle_error( + err, + error_event, + error_keys, + error_client, + Some(error_job_req), + ) + .await; + } +} diff --git a/crates/rhi/src/handlers/job_request_order.rs b/crates/rhi/src/handlers/job_request_order.rs @@ -1,72 +0,0 @@ -use anyhow::Result; -use nostr::{ - event::{Event, Tag, TagKind}, - key::Keys, -}; -use nostr_sdk::{Client, client::Error as NostrClientError}; -use radroots_common::models::listing_order_request::ListingOrderRequest; -use thiserror::Error; -use tracing::info; - -use crate::{ - events::job_request::{JobRequest, JobRequestError, JobRequestInput}, - models::event_classified::EventClassified, - utils::nostr::{nostr_event_job_result, nostr_fetch_event_by_id, nostr_send_event}, -}; - -#[derive(Debug, Error)] -pub enum JobRequestOrderError { - #[error("Failed to parse reference event: {0}")] - ParseReference(String), - - #[error("Failed to fetch reference event: {0}")] - FetchReference(String), - - #[error("Reference event not found: {0}")] - MissingReference(String), - - #[error("Reference event does not meet request requirements: {0}")] - MissingRequested(String), - - #[error("Failed to send job response")] - ResponseSend(#[from] NostrClientError), - - #[error("Request cannot be satisfied: {0}")] - Unsatisfiable(String), -} - -pub async fn handle_job_request_order( - event_job_request: Event, - _keys: Keys, - client: Client, - _job_req: JobRequest, - job_req_input: JobRequestInput, -) -> Result<(), JobRequestError> { - let order_data: ListingOrderRequest = serde_json::from_str(&job_req_input.data) - .map_err(|e| JobRequestOrderError::ParseReference(e.to_string()))?; - - let ref_id = &order_data.event.id; - let ref_event = nostr_fetch_event_by_id(client.clone(), ref_id) - .await - .map_err(|_| JobRequestOrderError::FetchReference(ref_id.clone()))?; - - let ref_classified = EventClassified::from_event(&ref_event) - .map_err(|_| JobRequestOrderError::ParseReference(ref_id.clone()))?; - - let order_result = ref_classified.calculate_order(&order_data.payload)?; - - let payload = serde_json::to_string(&order_result)?; - let tags = vec![Tag::custom( - TagKind::custom("e_ref"), - [ref_event.id.to_hex()], - )]; - - let job_result_event = - nostr_event_job_result(&event_job_request, payload, 0, None, Some(tags))?; - - let job_result_event_id = nostr_send_event(client, job_result_event).await?; - - info!("job request order result sent: {:?}", job_result_event_id); - - Ok(()) -} diff --git a/crates/rhi/src/handlers/job_request_preview.rs b/crates/rhi/src/handlers/job_request_preview.rs @@ -1,18 +0,0 @@ -use anyhow::Result; -use nostr::{event::Event, key::Keys}; -use nostr_sdk::Client; -use tracing::info; - -use crate::events::job_request::{JobRequest, JobRequestError, JobRequestInput}; - -pub async fn handle_job_request_preview( - _event: Event, - _keys: Keys, - _client: Client, - job_req: JobRequest, - _job_req_input: JobRequestInput, -) -> Result<(), JobRequestError> { - info!("handle_job_request_preview job_req: {:?}", job_req); - - Ok(()) -} diff --git a/crates/rhi/src/handlers/job_request_quote.rs b/crates/rhi/src/handlers/job_request_quote.rs @@ -1,18 +0,0 @@ -use anyhow::Result; -use nostr::{event::Event, key::Keys}; -use nostr_sdk::Client; -use tracing::info; - -use crate::events::job_request::{JobRequest, JobRequestError, JobRequestInput}; - -pub async fn handle_job_request_quote( - _event: Event, - _keys: Keys, - _client: Client, - job_req: JobRequest, - _job_req_input: JobRequestInput, -) -> Result<(), JobRequestError> { - info!("handle_job_request_quote job_req: {:?}", job_req); - - Ok(()) -} diff --git a/crates/rhi/src/handlers/mod.rs b/crates/rhi/src/handlers/mod.rs @@ -1,3 +0,0 @@ -pub mod job_request_order; -pub mod job_request_preview; -pub mod job_request_quote; diff --git a/crates/rhi/src/identity/keys.rs b/crates/rhi/src/identity/keys.rs @@ -0,0 +1,204 @@ +use anyhow::Result; +use nostr::{ + Event, Keys, + event::{EventBuilder, Kind, Tag, TagKind}, + nips::nip01::Metadata, +}; +use radroots_events::kinds::{KIND_APPLICATION_HANDLER, KIND_JOB_REQUEST_MIN}; +use serde::{Deserialize, Serialize}; +use std::{ + fs::{self, File}, + io::{BufReader, Write}, + path::{Path, PathBuf}, + str::FromStr, +}; +use tempfile::NamedTempFile; +use thiserror::Error; +use tracing::{error, warn}; +use uuid::Uuid; + +#[cfg(unix)] +use std::os::unix::fs::PermissionsExt; + +#[derive(Error, Debug)] +pub enum KeyProfileError { + #[error("Keys file does not exist at {0}")] + NotFound(PathBuf), + + #[error("Failed to open keys file at {0}: {1}")] + FileOpen(PathBuf, #[source] std::io::Error), + + #[error("Keys file already exists at {0}")] + AlreadyExists(PathBuf), + + #[error("Failed to parse keys file at {0}: {1}")] + FileParse(PathBuf, #[source] serde_json::Error), + + #[error("Failed to serialize keys: {0}")] + Serialization(#[from] serde_json::Error), + + #[error("IO error during key write: {0}")] + Io(#[from] std::io::Error), + + #[error("Failed to persist keys to disk: {0}")] + Persist(#[from] tempfile::PersistError), + + #[error("Failed to build or sign nostr event: {0}")] + NostrBuilder(#[from] nostr::event::builder::Error), + + #[error("Invalid secret key for identifier: {0}")] + InvalidSecretKey(String), + + #[error("Kind 0 metadata must be initialized before building kind {0} application handler")] + MissingMetadata(u32), +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct KeyProfile { + key: String, + identifier: String, + pub metadata: Option<Event>, + pub application_handler: Option<Event>, + + #[serde(skip)] + path: Option<PathBuf>, +} + +impl KeyProfile { + pub fn init<P: AsRef<str>>( + path_str: P, + generate: bool, + identifier_tag: Option<String>, + ) -> Result<Self, KeyProfileError> { + let path = PathBuf::from(path_str.as_ref()); + + if path.exists() { + let file = File::open(&path).map_err(|e| KeyProfileError::FileOpen(path.clone(), e))?; + let reader = BufReader::new(file); + let mut profile: KeyProfile = serde_json::from_reader(reader) + .map_err(|e| KeyProfileError::FileParse(path.clone(), e))?; + profile.path = Some(path.clone()); + + if !profile.identifier.trim().is_empty() { + if let Some(new_id) = identifier_tag { + warn!( + "Provided identifier '{}' is being ignored because the keys file already contains identifier '{}'.", + new_id, profile.identifier + ); + } + } else { + profile.identifier = identifier_tag.unwrap_or_else(|| { + warn!( + "Missing NIP-89 application handler identifier in key file, generating UUID." + ); + Uuid::new_v4().to_string() + }); + profile.persist()?; + } + + Ok(profile) + } else if generate { + let keys = Keys::generate(); + let secret = keys.secret_key(); + let identifier = match identifier_tag { + Some(identifier) => identifier, + None => { + warn!( + "Missing NIP-89 application handler identifier in key file, generating UUID." + ); + Uuid::new_v4().to_string() + } + }; + + let profile = KeyProfile { + key: secret.to_secret_hex(), + identifier, + metadata: None, + application_handler: None, + path: Some(path.clone()), + }; + + profile.atomic_write(&path)?; + Ok(profile) + } else { + Err(KeyProfileError::NotFound(path)) + } + } + + pub fn keys(&self) -> Result<Keys, KeyProfileError> { + Keys::from_str(&self.key) + .map_err(|_| KeyProfileError::InvalidSecretKey(self.identifier.clone())) + } + + fn atomic_write<P: AsRef<Path>>(&self, path: P) -> Result<(), KeyProfileError> { + let json = serde_json::to_string(self)?; + + let dir = path.as_ref().parent().unwrap_or_else(|| Path::new(".")); + let mut temp_file = NamedTempFile::new_in(dir)?; + + temp_file.write_all(json.as_bytes())?; + temp_file.as_file_mut().sync_all()?; + + #[cfg(unix)] + { + fs::set_permissions(temp_file.path(), fs::Permissions::from_mode(0o600))?; + } + + temp_file.persist(path)?; + Ok(()) + } + + fn persist(&self) -> Result<(), KeyProfileError> { + match &self.path { + Some(p) => self.atomic_write(p), + None => Err(KeyProfileError::NotFound(PathBuf::from("[unknown path]"))), + } + } + + pub async fn build_metadata( + &mut self, + metadata: &Metadata, + ) -> Result<Option<Event>, KeyProfileError> { + if self.metadata.is_none() { + let keys = self.keys()?; + let event = EventBuilder::metadata(metadata).sign(&keys).await?; + self.metadata = Some(event.clone()); + self.persist()?; + Ok(Some(event)) + } else { + Ok(None) + } + } + + pub async fn build_application_handler(&mut self) -> Result<Option<Event>, KeyProfileError> { + if self.application_handler.is_none() { + let keys = self.keys()?; + let kind = KIND_APPLICATION_HANDLER; + + let kind_0_content = if let Some(m) = &self.metadata { + m.content.clone() + } else { + return Err(KeyProfileError::MissingMetadata(kind)); + }; + + let tags: Vec<Tag> = vec![ + Tag::custom( + TagKind::Custom("k".into()), + [KIND_JOB_REQUEST_MIN.to_string()], + ), + Tag::identifier(self.identifier.to_string()), + ]; + + let event = EventBuilder::new(Kind::Custom(kind as u16), kind_0_content) + .tags(tags) + .sign(&keys) + .await?; + + self.application_handler = Some(event.clone()); + self.persist()?; + Ok(Some(event)) + } else { + Ok(None) + } + } +} diff --git a/crates/rhi/src/infra/mod.rs b/crates/rhi/src/infra/mod.rs @@ -0,0 +1,2 @@ +pub mod nostr; +pub mod telemetry; diff --git a/crates/rhi/src/infra/nostr.rs b/crates/rhi/src/infra/nostr.rs @@ -0,0 +1,224 @@ +use std::{borrow::Cow, time::Duration}; + +use nostr::{ + event::{Event, EventBuilder, EventId, Kind, Tag, TagKind, TagStandard}, + filter::Filter, + key::{Keys, PublicKey}, + nips::{ + nip04, + nip90::{DataVendingMachineStatus, JobFeedbackData}, + }, + types::{RelayUrl, Timestamp}, +}; +use nostr_sdk::Client; +use nostr_sdk::prelude::*; +use thiserror::Error; + +use crate::features::trade_listing::subscriber::JobRequestError; + +#[derive(Debug, Error)] +pub enum NostrUtilsError { + #[error("Client error: {0}")] + ClientError(#[from] nostr_sdk::client::Error), + + #[error("Event error: {0}")] + EventError(#[from] nostr::event::Error), + + #[error("Event not found: {0}")] + EventNotFound(String), + + #[error("Event builder failure: {0}")] + EventBuildError(#[from] nostr::event::builder::Error), +} + +#[derive(Debug, Error)] +pub enum NostrTagsResolveError { + #[error("Missing public key tag in encrypted event: {0:?}")] + MissingPTag(nostr::Event), + + #[error("Encrypted event recipient mismatch")] + NotRecipient, + + #[error("Decryption error: {0}")] + DecryptionError(String), + + #[error("Failed to parse decrypted tag JSON: {0}")] + ParseError(#[from] serde_json::Error), +} + +pub fn nostr_kind(kind: u16) -> Kind { + Kind::Custom(kind) +} + +pub fn nostr_filter_kind(kind: u16) -> Filter { + Filter::new().kind(Kind::Custom(kind)) +} + +pub fn nostr_filter_new_events(filter: Filter) -> Filter { + filter.since(Timestamp::now()) +} + +pub fn nostr_tag_first_value(tag: &Tag, key: &str) -> Option<String> { + if tag.kind() == TagKind::custom(key) { + tag.content().map(|v| v.to_string()) + } else { + None + } +} + +pub fn nostr_tag_at_value(tag: &Tag, index: usize) -> Option<String> { + tag.as_slice().get(index).cloned() +} + +pub fn nostr_tag_slice(tag: &Tag, start: usize) -> Option<Vec<String>> { + tag.as_slice().get(start..).map(|s| s.to_vec()) +} + +pub fn nostr_tag_relays_parse(tag: &Tag) -> Option<&Vec<RelayUrl>> { + match tag.as_standardized()? { + TagStandard::Relays(urls) => Some(urls), + _ => None, + } +} + +pub fn nostr_tags_match<'a>(tag: &'a Tag) -> Option<(&'a str, &'a [String])> { + if let TagKind::Custom(Cow::Borrowed(key)) = tag.kind() { + Some((key, &tag.as_slice()[1..])) + } else { + None + } +} + +pub fn nostr_tag_match_l(tag: &Tag) -> Option<(&str, f64)> { + let values = tag.as_slice(); + + if values.len() >= 3 && values[0].eq_ignore_ascii_case("l") { + if let Ok(value) = values[1].parse::<f64>() { + return Some((values[2].as_str(), value)); + } + } + + None +} + +pub fn nostr_tag_match_location(tag: &Tag) -> Option<(&str, &str, &str)> { + let values = tag.as_slice(); + + if values.len() >= 4 && values[0] == "location" { + Some((values[1].as_str(), values[2].as_str(), values[3].as_str())) + } else { + None + } +} + +pub fn nostr_tag_match_geohash(tag: &Tag) -> Option<String> { + match tag.as_standardized()? { + TagStandard::Geohash(geohash) => Some(geohash.clone()), + _ => None, + } +} + +pub fn nostr_tag_match_title(tag: &Tag) -> Option<String> { + match tag.as_standardized()? { + TagStandard::Title(title) => Some(title.clone()), + _ => None, + } +} + +pub fn nostr_tag_match_summary(tag: &Tag) -> Option<String> { + match tag.as_standardized()? { + TagStandard::Summary(summary) => Some(summary.clone()), + _ => None, + } +} + +pub fn nostr_event_job_result( + job_request: &Event, + payload: impl Into<String>, + millisats: u64, + bolt11: Option<String>, + tags: Option<Vec<Tag>>, +) -> Result<EventBuilder, NostrUtilsError> { + let builder = EventBuilder::job_result(job_request.clone(), payload, millisats, bolt11)? + .tags(tags.unwrap_or_default()); + Ok(builder) +} + +pub fn nostr_event_job_feedback( + job_request: &Event, + error: JobRequestError, + status: &str, + tags: Option<Vec<Tag>>, +) -> Result<EventBuilder, NostrUtilsError> { + let status = status + .parse::<DataVendingMachineStatus>() + .unwrap_or(DataVendingMachineStatus::Error); + let feedback_data = + JobFeedbackData::new(&job_request.clone(), status).extra_info(error.to_string()); + let builder = EventBuilder::job_feedback(feedback_data).tags(tags.unwrap_or_default()); + Ok(builder) +} + +pub async fn nostr_send_event( + client: Client, + event: EventBuilder, +) -> Result<Output<EventId>, NostrUtilsError> { + Ok(client.send_event_builder(event).await?) +} + +pub async fn nostr_fetch_event_by_id(client: Client, id: &str) -> Result<Event, NostrUtilsError> { + let event_id = EventId::parse(id)?; + let filter = Filter::new().id(event_id); + let events = client.fetch_events(filter, Duration::from_secs(10)).await?; + let event = events + .first() + .ok_or_else(|| NostrUtilsError::EventNotFound(event_id.to_hex()))?; + Ok(event.clone()) +} + +pub fn nostr_tags_resolve(event: &Event, keys: &Keys) -> Result<Vec<Tag>, NostrTagsResolveError> { + if event.tags.iter().any(|t| t.kind() == TagKind::Encrypted) { + let recipient = event + .tags + .iter() + .find_map(|tag| { + if tag.kind() == TagKind::p() { + tag.content()?.parse::<PublicKey>().ok() + } else { + None + } + }) + .ok_or_else(|| NostrTagsResolveError::MissingPTag(event.clone()))?; + + if recipient != keys.public_key() { + return Err(NostrTagsResolveError::NotRecipient.into()); + } + + let cleartext = nip04::decrypt(keys.secret_key(), &event.pubkey, &event.content) + .map_err(|e| NostrTagsResolveError::DecryptionError(e.to_string()))?; + + let decrypted_tags: nostr::event::tag::list::Tags = serde_json::from_str(&cleartext)?; + + Ok(decrypted_tags.to_vec()) + } else { + Ok(event.clone().tags.to_vec()) + } +} + +pub fn build_event_with_tags( + kind_u32: u32, + content: impl Into<String>, + tag_slices: Vec<Vec<String>>, +) -> Result<EventBuilder, NostrUtilsError> { + let mut tags: Vec<Tag> = Vec::new(); + for s in tag_slices { + if s.is_empty() { + continue; + } + let key = s[0].clone(); + let values = s.into_iter().skip(1).collect::<Vec<String>>(); + tags.push(Tag::custom(TagKind::Custom(key.into()), values)); + } + let builder = EventBuilder::new(Kind::Custom(kind_u32 as u16), content.into()).tags(tags); + Ok(builder) +} diff --git a/crates/rhi/src/infra/telemetry.rs b/crates/rhi/src/infra/telemetry.rs @@ -0,0 +1,23 @@ +use std::path::Path; +use tracing_appender::rolling; +use tracing_subscriber::{EnvFilter, Registry, fmt, prelude::*}; + +pub fn init(logs_dir: impl AsRef<Path>) { + let file_appender = rolling::daily(&logs_dir, concat!(env!("CARGO_PKG_NAME"), ".log")); + let (file_writer, guard) = tracing_appender::non_blocking(file_appender); + std::mem::forget(guard); + + let stdout_layer = fmt::layer().with_writer(std::io::stdout).with_target(false); + + let file_layer = fmt::layer() + .with_writer(file_writer) + .with_ansi(false) + .with_target(false); + + let subscriber = Registry::default() + .with(EnvFilter::from_default_env()) + .with(stdout_layer) + .with(file_layer); + + subscriber.init(); +} diff --git a/crates/rhi/src/keys.rs b/crates/rhi/src/keys.rs @@ -1,200 +0,0 @@ -use anyhow::Result; -use nostr::{ - Event, Keys, - event::{EventBuilder, Kind, Tag, TagKind}, - nips::nip01::Metadata, -}; -use radroots_common::{KIND_APPLICATION_HANDLER, KIND_JOB_REQUEST}; -use serde::{Deserialize, Serialize}; -use std::{ - fs::{self, File}, - io::{BufReader, Write}, - path::{Path, PathBuf}, - str::FromStr, -}; -use tempfile::NamedTempFile; -use thiserror::Error; -use tracing::{error, warn}; -use uuid::Uuid; - -#[cfg(unix)] -use std::os::unix::fs::PermissionsExt; - -#[derive(Error, Debug)] -pub enum KeyProfileError { - #[error("Keys file does not exist at {0}")] - NotFound(PathBuf), - - #[error("Failed to open keys file at {0}: {1}")] - FileOpen(PathBuf, #[source] std::io::Error), - - #[error("Keys file already exists at {0}")] - AlreadyExists(PathBuf), - - #[error("Failed to parse keys file at {0}: {1}")] - FileParse(PathBuf, #[source] serde_json::Error), - - #[error("Failed to serialize keys: {0}")] - Serialization(#[from] serde_json::Error), - - #[error("IO error during key write: {0}")] - Io(#[from] std::io::Error), - - #[error("Failed to persist keys to disk: {0}")] - Persist(#[from] tempfile::PersistError), - - #[error("Failed to build or sign nostr event: {0}")] - NostrBuilder(#[from] nostr::event::builder::Error), - - #[error("Invalid secret key for identifier: {0}")] - InvalidSecretKey(String), -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct KeyProfile { - key: String, - identifier: String, - pub metadata: Option<Event>, - pub application_handler: Option<Event>, - - #[serde(skip)] - path: Option<PathBuf>, -} - -impl KeyProfile { - pub fn init<P: AsRef<str>>( - path_str: P, - generate: bool, - identifier_tag: Option<String>, - ) -> Result<Self, KeyProfileError> { - let path = PathBuf::from(path_str.as_ref()); - - if path.exists() { - let file = File::open(&path).map_err(|e| KeyProfileError::FileOpen(path.clone(), e))?; - let reader = BufReader::new(file); - let mut profile: KeyProfile = serde_json::from_reader(reader) - .map_err(|e| KeyProfileError::FileParse(path.clone(), e))?; - profile.path = Some(path.clone()); - - if !profile.identifier.trim().is_empty() { - if let Some(new_id) = identifier_tag { - warn!( - "Provided identifier '{}' is being ignored because the keys file already contains identifier '{}'.", - new_id, profile.identifier - ); - } - } else { - profile.identifier = identifier_tag.unwrap_or_else(|| { - warn!( - "Missing NIP-89 application handler identifier in key file, generating UUID." - ); - Uuid::new_v4().to_string() - }); - profile.persist()?; - } - - Ok(profile) - } else if generate { - let keys = Keys::generate(); - let secret = keys.secret_key(); - let identifier = match identifier_tag { - Some(identifier) => identifier, - None => { - warn!( - "Missing NIP-89 application handler identifier in key file, generating UUID." - ); - Uuid::new_v4().to_string() - } - }; - - let profile = KeyProfile { - key: secret.to_secret_hex(), - identifier, - metadata: None, - application_handler: None, - path: Some(path.clone()), - }; - - profile.atomic_write(&path)?; - Ok(profile) - } else { - Err(KeyProfileError::NotFound(path)) - } - } - - pub fn keys(&self) -> Result<Keys, KeyProfileError> { - Keys::from_str(&self.key) - .map_err(|_| KeyProfileError::InvalidSecretKey(self.identifier.clone())) - } - - fn atomic_write<P: AsRef<Path>>(&self, path: P) -> Result<(), KeyProfileError> { - let json = serde_json::to_string(self)?; - - let dir = path.as_ref().parent().unwrap_or_else(|| Path::new(".")); - let mut temp_file = NamedTempFile::new_in(dir)?; - - temp_file.write_all(json.as_bytes())?; - temp_file.as_file_mut().sync_all()?; - - #[cfg(unix)] - { - fs::set_permissions(temp_file.path(), fs::Permissions::from_mode(0o600))?; - } - - temp_file.persist(path)?; - Ok(()) - } - - fn persist(&self) -> Result<(), KeyProfileError> { - match &self.path { - Some(p) => self.atomic_write(p), - None => Err(KeyProfileError::NotFound(PathBuf::from("[unknown path]"))), - } - } - - pub async fn build_metadata( - &mut self, - metadata: &Metadata, - ) -> Result<Option<Event>, KeyProfileError> { - if self.metadata.is_none() { - let keys = self.keys()?; - let event = EventBuilder::metadata(metadata).sign(&keys).await?; - self.metadata = Some(event.clone()); - self.persist()?; - Ok(Some(event)) - } else { - Ok(None) - } - } - - pub async fn build_application_handler(&mut self) -> Result<Option<Event>, KeyProfileError> { - if self.application_handler.is_none() { - let keys = self.keys()?; - let kind_0_content = self - .metadata - .as_ref() - .expect(&format!( - "The kind 0 metadata must be initialized before kind {} descriptor", - KIND_APPLICATION_HANDLER.to_string() - )) - .content - .clone(); - - let tags: Vec<Tag> = vec![ - Tag::custom(TagKind::Custom("k".into()), [KIND_JOB_REQUEST.to_string()]), - Tag::identifier(self.identifier.to_string()), - ]; - - let event = EventBuilder::new(Kind::Custom(KIND_APPLICATION_HANDLER), kind_0_content) - .tags(tags) - .sign(&keys) - .await?; - - self.application_handler = Some(event.clone()); - self.persist()?; - Ok(Some(event)) - } else { - Ok(None) - } - } -} diff --git a/crates/rhi/src/lib.rs b/crates/rhi/src/lib.rs @@ -1,6 +1,100 @@ +pub mod adapters; pub mod config; -pub mod events; -pub mod handlers; -pub mod keys; -pub mod models; -pub mod utils; +pub mod infra; + +pub mod features { + pub mod trade_listing; +} + +pub mod identity { + pub mod keys; +} + +use anyhow::Result; +use nostr::event::Event; +use nostr_sdk::Client; +use tokio::signal::unix::{SignalKind, signal}; +use tracing::{error, info}; + +use crate::{config::Settings, features::trade_listing, identity::keys::KeyProfile}; + +use std::path::PathBuf; + +use clap::Parser; + +#[derive(Parser, Debug, Clone)] +#[command( + about = env!("CARGO_PKG_DESCRIPTION"), + author = env!("CARGO_PKG_AUTHORS"), + version = env!("CARGO_PKG_VERSION") +)] +pub struct Args { + #[arg( + long, + value_name = "PATH", + value_parser = clap::value_parser!(PathBuf), + help = "(Optional) Path to config file; default is 'config.toml'" + )] + pub config: Option<PathBuf>, +} + +pub async fn run(settings: Settings) -> Result<()> { + let mut key_profile = KeyProfile::init( + &settings.config.keys_path, + settings.config.generate_keys, + settings.config.identifier.clone(), + )?; + + let keys = key_profile.keys()?; + let metadata = settings.metadata.clone(); + + let mut events_to_send: Vec<Event> = vec![]; + + if let Some(event) = key_profile.build_metadata(&metadata).await? { + events_to_send.push(event); + } + + if let Some(event) = key_profile.build_application_handler().await? { + events_to_send.push(event); + } + + if !events_to_send.is_empty() { + let client = Client::new(keys.clone()); + for relay in &settings.config.relays { + client.add_relay(relay).await?; + } + client.connect().await; + for event in events_to_send { + client.send_event(&event).await?; + info!("Sent kind {} event for key profile", event.kind); + } + client.disconnect().await; + } + + let keys_sub = keys.clone(); + let relays_sub = settings.config.relays.clone(); + + tokio::spawn(async move { + loop { + if let Err(e) = + trade_listing::subscriber::subscriber(keys_sub.clone(), relays_sub.clone()).await + { + error!("Error on job request subscription: {e}"); + } + } + }); + + let mut sigterm = signal(SignalKind::terminate())?; + let mut sigint = signal(SignalKind::interrupt())?; + + tokio::select! { + _ = sigterm.recv() => { + info!("Received SIGTERM. Shutting down..."); + }, + _ = sigint.recv() => { + info!("Received SIGINT. Shutting down..."); + } + } + + Ok(()) +} diff --git a/crates/rhi/src/main.rs b/crates/rhi/src/main.rs @@ -1,114 +1,24 @@ use anyhow::Result; use clap::Parser; -use nostr::event::Event; -use nostr_sdk::Client; -use rhi::{config::Settings, events, keys::KeyProfile}; -use tokio::signal::unix::{SignalKind, signal}; use tracing::{error, info}; -fn init_tracing() { - tracing_subscriber::fmt::init(); -} - -#[derive(Parser)] -#[command( - about = env!("CARGO_PKG_DESCRIPTION"), - author = env!("CARGO_PKG_AUTHORS"), - version = env!("CARGO_PKG_VERSION") -)] -pub struct Args { - #[arg(long, help = "Adds the keys profiles file path", required = true)] - pub keys: String, - - #[arg(long, help = "Adds nostr relays to the subscription", required = true)] - pub relays: Vec<String>, - - #[arg( - long, - help = "(Optional) Sets flag to generate keys if none are found", - required = false - )] - pub generate_keys: bool, - - #[arg( - long, - help = "(Optional) Adds the application handler identifier tag (NIP-89)", - required = false - )] - pub identifier: Option<String>, - - #[arg( - long, - help = "(Optional) Adds the config file path. Defaults to 'config.toml'", - required = false - )] - pub config: Option<String>, -} +use rhi::{Args, config::Settings, infra::telemetry, run}; #[tokio::main] -async fn main() -> Result<()> { - init_tracing(); +async fn main() { + if let Err(err) = setup().await { + error!("Fatal error: {err:#?}"); + std::process::exit(1); + } +} +async fn setup() -> Result<()> { let args = Args::parse(); - let config = Settings::load(&args.config)?; - let relays = args.relays.clone(); + let settings = Settings::load(&args.config)?; + telemetry::init(&settings.config.logs_dir); info!("Starting"); - let mut key_profile = KeyProfile::init(args.keys, args.generate_keys, args.identifier)?; - - let keys = key_profile.keys()?; - - let metadata = config.metadata.clone(); - - let mut events: Vec<Event> = vec![]; - - if let Some(event) = key_profile.build_metadata(&metadata).await? { - events.push(event); - } - - if let Some(event) = key_profile.build_application_handler().await? { - events.push(event); - } - - if !events.is_empty() { - let client = Client::new(keys.clone()); - for relay in relays.iter() { - client.add_relay(relay).await?; - } - client.connect().await; - for event in events { - client.send_event(&event).await?; - info!("Sent kind {} event for key profile", { event.clone().kind }) - } - client.disconnect().await; - } - - let keys_sub = keys.clone(); - let relays_sub = relays.clone(); - - tokio::spawn(async move { - loop { - if let Err(e) = - events::job_request::subscriber(keys_sub.clone(), relays_sub.clone()).await - { - error!("Error on job request subscription: {e}"); - } - } - }); - - let mut sigterm = signal(SignalKind::terminate())?; - let mut sigint = signal(SignalKind::interrupt())?; - - tokio::select! { - _ = sigterm.recv() => { - info!("Received SIGTERM. Shutting down..."); - }, - _ = sigint.recv() => { - info!("Received SIGINT. Shutting down..."); - } - } - - Ok(()) + run(settings).await } diff --git a/crates/rhi/src/models/event_classified.rs b/crates/rhi/src/models/event_classified.rs @@ -1,461 +0,0 @@ -use anyhow::Result; -use nostr::{EventId, event::Event}; -use serde::{Deserialize, Serialize}; - -use crate::{ - handlers::job_request_order::JobRequestOrderError, - utils::{ - nostr::{ - nostr_tag_match_geohash, nostr_tag_match_l, nostr_tag_match_location, - nostr_tag_match_summary, nostr_tag_match_title, nostr_tags_match, - }, - unit::{MassUnit, convert_mass}, - }, -}; - -use radroots_common::models::{ - listing_order::{ - ListingOrder, ListingOrderDiscount, ListingOrderPrice, ListingOrderQuantity, - ListingOrderSubtotal, ListingOrderTotal, - }, - listing_order_request::ListingOrderRequestPayload, -}; - -#[derive(Debug, Serialize, Deserialize, Clone, Default)] -pub struct EventClassifiedGeolocation { - pub geohash: Option<String>, - pub lat: f64, - pub lng: f64, -} - -#[derive(Debug, Serialize, Deserialize, Clone, Default)] -pub struct EventClassifiedLocation { - pub address: String, - pub region: String, - pub country: String, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(tag = "type")] -pub enum EventClassifiedDiscount { - #[serde(rename = "subtotal")] - Subtotal { - threshold: f64, - currency: String, - value: f64, - is_percent: bool, - }, - #[serde(rename = "mass")] - Mass { - discount_unit: String, - threshold: f64, - threshold_unit: String, - discount_per_unit: f64, - currency: String, - }, - #[serde(rename = "quantity")] - Quantity { - product_key: String, - min_count: u32, - discount_per_unit: f64, - currency: String, - }, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct EventClassifiedQuantity { - pub amount: f64, - pub unit: MassUnit, - pub label: String, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct EventClassifiedPrice { - pub amount: f64, - pub currency: String, - pub quantity_amount: f64, - pub quantity_unit: MassUnit, -} - -#[derive(Debug, Serialize, Deserialize, Clone, Default)] -pub struct EventClassifiedListing { - pub key: String, - pub category: String, - pub process: Option<String>, - pub lot: Option<String>, - pub profile: Option<String>, - pub year: Option<String>, -} - -#[derive(Debug, Serialize, Deserialize, Clone, Default)] -pub struct EventClassifiedBasis { - pub title: String, - pub summary: String, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct EventClassified { - pub id: EventId, - pub basis: EventClassifiedBasis, - pub listing: EventClassifiedListing, - pub prices: Vec<EventClassifiedPrice>, - pub quantities: Vec<EventClassifiedQuantity>, - pub discounts: Vec<EventClassifiedDiscount>, - pub location: Option<EventClassifiedLocation>, - pub geolocation: Option<EventClassifiedGeolocation>, -} - -impl EventClassified { - pub fn from_event(event: &Event) -> Result<Self> { - let mut prices = Vec::new(); - let mut quantities = Vec::new(); - let mut basis = EventClassifiedBasis::default(); - let mut listing = EventClassifiedListing::default(); - - let mut address: Option<String> = None; - let mut region: Option<String> = None; - let mut country: Option<String> = None; - let mut lat: Option<f64> = None; - let mut lng: Option<f64> = None; - let mut geohash: Option<String> = None; - let mut discounts: Vec<EventClassifiedDiscount> = Vec::new(); - - for tag in event.tags.iter() { - if let Some((key, values)) = nostr_tags_match(tag) { - match key { - "quantity" if values.len() >= 3 => { - let amount_str = &values[0]; - let unit_str = &values[1]; - let label = &values[2]; - - if let (Ok(amount), Ok(unit)) = - (amount_str.parse::<f64>(), unit_str.parse::<MassUnit>()) - { - quantities.push(EventClassifiedQuantity { - amount, - unit, - label: label.clone(), - }); - } - } - "price" if values.len() >= 4 => { - let amount_str = &values[0]; - let currency = &values[1]; - let quantity_amount_str = &values[2]; - let quantity_unit_str = &values[3]; - - if let (Ok(amount), Ok(quantity_amount), Ok(quantity_unit)) = ( - amount_str.parse::<f64>(), - quantity_amount_str.parse::<f64>(), - quantity_unit_str.to_lowercase().parse::<MassUnit>(), - ) { - prices.push(EventClassifiedPrice { - amount, - currency: currency.clone(), - quantity_amount, - quantity_unit, - }); - } - } - "key" if !values.is_empty() => listing.key = values[0].clone(), - "category" if !values.is_empty() => listing.category = values[0].clone(), - "process" if !values.is_empty() => listing.process = Some(values[0].clone()), - "lot" if !values.is_empty() => listing.lot = Some(values[0].clone()), - "profile" if !values.is_empty() => listing.profile = Some(values[0].clone()), - "year" if !values.is_empty() => listing.year = Some(values[0].clone()), - "price-discount-subtotal" if values.len() >= 4 => { - let threshold = values[0].parse().unwrap_or(0.0); - let currency = values[1].clone(); - let value = values[2].parse().unwrap_or(0.0); - let is_percent = values[3] == "%"; - discounts.push(EventClassifiedDiscount::Subtotal { - threshold, - currency, - value, - is_percent, - }); - } - "price-discount-mass" if values.len() >= 5 => { - let discount_unit = values[0].clone(); - let threshold = values[1].parse().unwrap_or(0.0); - let threshold_unit = values[2].clone(); - let discount_per_unit = values[3].parse().unwrap_or(0.0); - let currency = values[4].clone(); - discounts.push(EventClassifiedDiscount::Mass { - discount_unit, - threshold, - threshold_unit, - discount_per_unit, - currency, - }); - } - "price-discount-quantity" if values.len() >= 4 => { - let product_key = values[0].clone(); - let min_count = values[1].parse().unwrap_or(0); - let discount_per_unit = values[2].parse().unwrap_or(0.0); - let currency = values[3].clone(); - discounts.push(EventClassifiedDiscount::Quantity { - product_key, - min_count, - discount_per_unit, - currency, - }); - } - _ => {} - } - } - - if let Some((kind, value)) = nostr_tag_match_l(tag) { - let precision = value.to_string().split('.').nth(1).map_or(0, |s| s.len()); - - match kind { - "dd.lat" => { - let current_precision = lat - .map(|v| v.to_string().split('.').nth(1).map_or(0, |s| s.len())) - .unwrap_or(0); - if precision > current_precision { - lat = Some(value); - } - } - "dd.lon" => { - let current_precision = lng - .map(|v| v.to_string().split('.').nth(1).map_or(0, |s| s.len())) - .unwrap_or(0); - if precision > current_precision { - lng = Some(value); - } - } - _ => {} - } - } - - if let Some((addr, reg, coun)) = nostr_tag_match_location(tag) { - address = Some(addr.to_string()); - region = Some(reg.to_string()); - country = Some(coun.to_string()); - } - - if let Some(g) = nostr_tag_match_geohash(tag) { - if geohash - .as_ref() - .map_or(true, |current| g.len() > current.len()) - { - geohash = Some(g); - } - } - - if let Some(title) = nostr_tag_match_title(tag) { - basis.title = title; - } - - if let Some(summary) = nostr_tag_match_summary(tag) { - basis.summary = summary; - } - } - - let location = if address.is_some() || region.is_some() || country.is_some() { - Some(EventClassifiedLocation { - address: address.unwrap_or_default(), - region: region.unwrap_or_default(), - country: country.unwrap_or_default(), - }) - } else { - None - }; - - let geolocation = if let (Some(lat), Some(lng)) = (lat, lng) { - Some(EventClassifiedGeolocation { geohash, lat, lng }) - } else { - None - }; - - Ok(Self { - id: event.id, - basis, - listing, - prices, - quantities, - discounts, - location, - geolocation, - }) - } - - pub fn calculate_order( - &self, - order: &ListingOrderRequestPayload, - ) -> Result<ListingOrder, JobRequestOrderError> { - let quantity = &order.quantity; - let price = &order.price; - - let qty_unit = quantity - .unit - .parse::<MassUnit>() - .map_err(|_| JobRequestOrderError::Unsatisfiable("invalid quantity unit".into()))?; - let price_unit = price.quantity_unit.parse::<MassUnit>().map_err(|_| { - JobRequestOrderError::Unsatisfiable("invalid price quantity unit".into()) - })?; - - let total_qty = quantity.amount * quantity.count as f64; - - let matched_packaging = self - .quantities - .iter() - .any(|q| q.unit == qty_unit && (q.amount - quantity.amount).abs() < f64::EPSILON); - - if !matched_packaging { - return Err(JobRequestOrderError::Unsatisfiable(format!( - "requested packaging {} {} not available", - quantity.amount, quantity.unit - ))); - } - - let matched_tier = self.prices.iter().find(|p| { - p.quantity_unit == price_unit - && (p.quantity_amount - price.quantity_amount).abs() < f64::EPSILON - && p.currency.to_lowercase() == price.currency.to_lowercase() - }); - - let tier = matched_tier.ok_or_else(|| { - JobRequestOrderError::Unsatisfiable(format!( - "no matching price tier {} {} found", - price.quantity_amount, price.quantity_unit - )) - })?; - - if (tier.amount - price.amount).abs() > f64::EPSILON { - return Err(JobRequestOrderError::Unsatisfiable(format!( - "price mismatch: expected {}, got {}", - tier.amount, price.amount - ))); - } - - let converted_qty = convert_mass(total_qty, &qty_unit, &price_unit); - let unit_price = tier.amount / tier.quantity_amount; - let subtotal = (unit_price * converted_qty * 100.0).round() / 100.0; - - let mut discounts: Vec<ListingOrderDiscount> = Vec::new(); - let package_key = format!( - "{}-{}-{}", - quantity.amount, - quantity.unit.to_lowercase(), - quantity.label - ); - - for d in &self.discounts { - match d { - EventClassifiedDiscount::Subtotal { - threshold, - currency, - value, - is_percent, - } => { - if subtotal < *threshold { - continue; - } - let amt = if *is_percent { - (subtotal * value / 100.0 * 100.0).round() / 100.0 - } else { - (*value * 100.0).round() / 100.0 - }; - discounts.push(ListingOrderDiscount { - discount_type: "subtotal".into(), - threshold: Some(*threshold), - threshold_unit: None, - discount_per_unit: None, - discount_unit: None, - discount_percent: if *is_percent { Some(*value) } else { None }, - discount_amount: amt, - currency: currency.clone(), - }); - } - EventClassifiedDiscount::Mass { - discount_unit, - threshold, - threshold_unit, - discount_per_unit, - currency, - } => { - let th_unit = threshold_unit.parse::<MassUnit>().map_err(|_| { - JobRequestOrderError::Unsatisfiable("invalid threshold unit".into()) - })?; - let dis_unit = discount_unit.parse::<MassUnit>().map_err(|_| { - JobRequestOrderError::Unsatisfiable("invalid discount unit".into()) - })?; - - let qty_in_th = convert_mass(total_qty, &qty_unit, &th_unit); - if qty_in_th < *threshold { - continue; - } - - let qty_in_dis = convert_mass(total_qty, &qty_unit, &dis_unit); - let amt = (qty_in_dis * discount_per_unit * 100.0).round() / 100.0; - - discounts.push(ListingOrderDiscount { - discount_type: "mass".into(), - threshold: Some(*threshold), - threshold_unit: Some(threshold_unit.clone()), - discount_per_unit: Some(*discount_per_unit), - discount_unit: Some(discount_unit.clone()), - discount_percent: None, - discount_amount: amt, - currency: currency.clone(), - }); - } - EventClassifiedDiscount::Quantity { - product_key, - min_count, - discount_per_unit, - currency, - } => { - if product_key != &package_key || quantity.count < *min_count { - continue; - } - - let amt = (*discount_per_unit * quantity.count as f64 * 100.0).round() / 100.0; - - discounts.push(ListingOrderDiscount { - discount_type: "quantity".into(), - threshold: Some(*min_count as f64), - threshold_unit: None, - discount_per_unit: Some(*discount_per_unit), - discount_unit: None, - discount_percent: None, - discount_amount: amt, - currency: currency.clone(), - }); - } - } - } - - let total_discount: f64 = discounts.iter().map(|d| d.discount_amount).sum(); - let total = ((subtotal - total_discount) * 100.0).round() / 100.0; - - Ok(ListingOrder { - quantity: ListingOrderQuantity { - amount: quantity.amount, - unit: quantity.unit.clone(), - label: quantity.label.clone(), - }, - price: ListingOrderPrice { - amount: tier.amount, - currency: tier.currency.clone(), - quantity_amount: tier.quantity_amount, - quantity_unit: price.quantity_unit.clone(), - }, - discounts, - subtotal: ListingOrderSubtotal { - price_amount: subtotal, - price_currency: tier.currency.clone(), - quantity_amount: total_qty, - quantity_unit: quantity.unit.clone(), - }, - total: ListingOrderTotal { - price_amount: total, - price_currency: tier.currency.clone(), - quantity_amount: total_qty, - quantity_unit: quantity.unit.clone(), - }, - }) - } -} diff --git a/crates/rhi/src/models/mod.rs b/crates/rhi/src/models/mod.rs @@ -1 +0,0 @@ -pub mod event_classified; diff --git a/crates/rhi/src/utils/mod.rs b/crates/rhi/src/utils/mod.rs @@ -1,3 +0,0 @@ -pub mod nostr; -pub mod price; -pub mod unit; diff --git a/crates/rhi/src/utils/nostr.rs b/crates/rhi/src/utils/nostr.rs @@ -1,206 +0,0 @@ -use std::{borrow::Cow, time::Duration}; - -use crate::events::job_request::JobRequestError; -use anyhow::Result; -use nostr::{ - event::{Event, EventBuilder, EventId, Kind, Tag, TagKind, TagStandard}, - filter::Filter, - key::{Keys, PublicKey}, - nips::{ - nip04, - nip90::{DataVendingMachineStatus, JobFeedbackData}, - }, - types::{RelayUrl, Timestamp}, -}; -use nostr_sdk::Client; -use nostr_sdk::prelude::*; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum NostrUtilsError { - #[error("Client error: {0}")] - ClientError(#[from] nostr_sdk::client::Error), - - #[error("Event error: {0}")] - EventError(#[from] nostr::event::Error), - - #[error("Event not found: {0}")] - EventNotFound(String), - - #[error("Event builder failure: {0}")] - EventBuildError(#[from] nostr::event::builder::Error), -} - -#[derive(Debug, Error)] -pub enum NostrTagsResolveError { - #[error("Missing public key tag in encrypted event: {0:?}")] - MissingPTag(nostr::Event), - - #[error("Encrypted event recipient mismatch")] - NotRecipient, - - #[error("Decryption error: {0}")] - DecryptionError(String), - - #[error("Failed to parse decrypted tag JSON: {0}")] - ParseError(#[from] serde_json::Error), -} - -pub fn nostr_kind(kind: u16) -> Kind { - Kind::Custom(kind) -} - -pub fn nostr_filter_kind(kind: u16) -> Filter { - Filter::new().kind(Kind::Custom(kind)) -} - -pub fn nostr_filter_new_events(filter: Filter) -> Filter { - filter.since(Timestamp::now()) -} - -pub fn nostr_tag_first_value(tag: &Tag, key: &str) -> Option<String> { - if tag.kind() == TagKind::custom(key) { - tag.content().map(|v| v.to_string()) - } else { - None - } -} - -pub fn nostr_tag_at_value(tag: &Tag, index: usize) -> Option<String> { - tag.as_slice().get(index).cloned() -} - -pub fn nostr_tag_slice(tag: &Tag, start: usize) -> Option<Vec<String>> { - tag.as_slice().get(start..).map(|s| s.to_vec()) -} - -pub fn nostr_tag_relays_parse(tag: &Tag) -> Option<&Vec<RelayUrl>> { - match tag.as_standardized()? { - TagStandard::Relays(urls) => Some(urls), - _ => None, - } -} - -pub fn nostr_tags_match<'a>(tag: &'a Tag) -> Option<(&'a str, &'a [String])> { - if let TagKind::Custom(Cow::Borrowed(key)) = tag.kind() { - Some((key, &tag.as_slice()[1..])) - } else { - None - } -} - -pub fn nostr_tag_match_l(tag: &Tag) -> Option<(&str, f64)> { - let values = tag.as_slice(); - - if values.len() >= 3 && values[0].eq_ignore_ascii_case("l") { - if let Ok(value) = values[1].parse::<f64>() { - return Some((values[2].as_str(), value)); - } - } - - None -} - -pub fn nostr_tag_match_location(tag: &Tag) -> Option<(&str, &str, &str)> { - let values = tag.as_slice(); - - if values.len() >= 4 && values[0] == "location" { - Some((values[1].as_str(), values[2].as_str(), values[3].as_str())) - } else { - None - } -} - -pub fn nostr_tag_match_geohash(tag: &Tag) -> Option<String> { - match tag.as_standardized()? { - TagStandard::Geohash(geohash) => Some(geohash.clone()), - _ => None, - } -} - -pub fn nostr_tag_match_title(tag: &Tag) -> Option<String> { - match tag.as_standardized()? { - TagStandard::Title(title) => Some(title.clone()), - _ => None, - } -} - -pub fn nostr_tag_match_summary(tag: &Tag) -> Option<String> { - match tag.as_standardized()? { - TagStandard::Summary(summary) => Some(summary.clone()), - _ => None, - } -} - -pub fn nostr_event_job_result( - job_request: &Event, - payload: impl Into<String>, - millisats: u64, - bolt11: Option<String>, - tags: Option<Vec<Tag>>, -) -> Result<EventBuilder, NostrUtilsError> { - let builder = EventBuilder::job_result(job_request.clone(), payload, millisats, bolt11)? - .tags(tags.unwrap_or_default()); - Ok(builder) -} - -pub fn nostr_event_job_feedback( - job_request: &Event, - error: JobRequestError, - status: &str, - tags: Option<Vec<Tag>>, -) -> Result<EventBuilder, NostrUtilsError> { - let status = status - .parse::<DataVendingMachineStatus>() - .unwrap_or(DataVendingMachineStatus::Error); - let feedback_data = - JobFeedbackData::new(&job_request.clone(), status).extra_info(error.to_string()); - let builder = EventBuilder::job_feedback(feedback_data).tags(tags.unwrap_or_default()); - Ok(builder) -} - -pub async fn nostr_send_event( - client: Client, - event: EventBuilder, -) -> Result<Output<EventId>, NostrUtilsError> { - Ok(client.send_event_builder(event).await?) -} - -pub async fn nostr_fetch_event_by_id(client: Client, id: &str) -> Result<Event, NostrUtilsError> { - let event_id = EventId::parse(id)?; - let filter = Filter::new().id(event_id); - let events = client.fetch_events(filter, Duration::from_secs(10)).await?; - let event = events - .first() - .ok_or_else(|| NostrUtilsError::EventNotFound(event_id.to_hex()))?; - Ok(event.clone()) -} - -pub fn nostr_tags_resolve(event: &Event, keys: &Keys) -> Result<Vec<Tag>, NostrTagsResolveError> { - if event.tags.iter().any(|t| t.kind() == TagKind::Encrypted) { - let recipient = event - .tags - .iter() - .find_map(|tag| { - if tag.kind() == TagKind::p() { - tag.content()?.parse::<PublicKey>().ok() - } else { - None - } - }) - .ok_or_else(|| NostrTagsResolveError::MissingPTag(event.clone()))?; - - if recipient != keys.public_key() { - return Err(NostrTagsResolveError::NotRecipient.into()); - } - - let cleartext = nip04::decrypt(keys.secret_key(), &event.pubkey, &event.content) - .map_err(|e| NostrTagsResolveError::DecryptionError(e.to_string()))?; - - let decrypted_tags: nostr::event::tag::list::Tags = serde_json::from_str(&cleartext)?; - - Ok(decrypted_tags.to_vec()) - } else { - Ok(event.clone().tags.to_vec()) - } -} diff --git a/crates/rhi/src/utils/price.rs b/crates/rhi/src/utils/price.rs @@ -1,15 +0,0 @@ -use super::unit::{MassUnit, convert_mass}; - -pub fn calculate_total_price( - quantity_amount: f64, - quantity_unit: &MassUnit, - quantity_count: u32, - price_amount: f64, - price_quantity_amount: f64, - price_quantity_unit: &MassUnit, -) -> f64 { - let total_mass = quantity_amount * quantity_count as f64; - let total_mass_in_price_unit = convert_mass(total_mass, quantity_unit, price_quantity_unit); - let price_per_quantity_unit = price_amount / price_quantity_amount; - price_per_quantity_unit * total_mass_in_price_unit -} diff --git a/crates/rhi/src/utils/unit.rs b/crates/rhi/src/utils/unit.rs @@ -1,78 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::{fmt, str::FromStr}; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum MassUnitError { - #[error("Invalid mass unit: {0}")] - InvalidUnit(String), - - #[error("Invalid mass amount: {0}")] - InvalidAmount(f64), -} - -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] -#[serde(rename_all = "lowercase")] -pub enum MassUnit { - G, - Kg, - Oz, - Lb, -} - -impl MassUnit { - pub fn to_grams(&self) -> f64 { - match self { - MassUnit::G => 1.0, - MassUnit::Kg => 1000.0, - MassUnit::Oz => 28.3495, - MassUnit::Lb => 453.592, - } - } - - pub fn amount_in_grams(&self, amount: f64) -> Result<f64, MassUnitError> { - if !amount.is_finite() { - return Err(MassUnitError::InvalidAmount(amount)); - } - - let factor = match self { - MassUnit::G => 1.0, - MassUnit::Kg => 1000.0, - MassUnit::Oz => 28.3495, - MassUnit::Lb => 453.592, - }; - - Ok(amount * factor) - } -} - -impl fmt::Display for MassUnit { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let unit_str = match self { - MassUnit::G => "g", - MassUnit::Kg => "kg", - MassUnit::Oz => "oz", - MassUnit::Lb => "lb", - }; - write!(f, "{unit_str}") - } -} - -impl FromStr for MassUnit { - type Err = MassUnitError; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s { - "g" => Ok(MassUnit::G), - "kg" => Ok(MassUnit::Kg), - "oz" => Ok(MassUnit::Oz), - "lb" => Ok(MassUnit::Lb), - other => Err(MassUnitError::InvalidUnit(other.to_string())), - } - } -} - -pub fn convert_mass(amount: f64, from_unit: &MassUnit, to_unit: &MassUnit) -> f64 { - let amount_g = amount * from_unit.to_grams(); - amount_g / to_unit.to_grams() -}