rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

commit dcfaef659ca41ba48cab68f683721f608b3d56f0
parent 60143307860335d7a824ee5e0d9d52194d73b11a
Author: triesap <triesap@radroots.dev>
Date:   Wed, 24 Dec 2025 14:54:05 +0000

core: migrate to standalone crate layout

- Convert workspace manifest to single package and bump toolchain to 1.88
- Swap nostr-sdk usage for radroots-nostr client/types and add jsonrpsee/reqwest deps
- Implement trade listing DVM handler + stateful subscriber with configurable backoff
- Remove legacy trade_listing/infra modules and relocate sources under src/

Diffstat:
MCargo.lock | 601+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
MCargo.toml | 42++++++++++++++++++------------------------
Mconfig.toml | 9+++++++--
Dcrates/rhi/Cargo.toml | 29-----------------------------
Dcrates/rhi/src/adapters/nostr/event.rs | 78------------------------------------------------------------------------------
Dcrates/rhi/src/cli.rs | 35-----------------------------------
Dcrates/rhi/src/config.rs | 13-------------
Dcrates/rhi/src/features/trade_listing/domain/pricing.rs | 97-------------------------------------------------------------------------------
Dcrates/rhi/src/features/trade_listing/handlers/accept.rs | 142-------------------------------------------------------------------------------
Dcrates/rhi/src/features/trade_listing/handlers/conveyance.rs | 120-------------------------------------------------------------------------------
Dcrates/rhi/src/features/trade_listing/handlers/fulfillment.rs | 125-------------------------------------------------------------------------------
Dcrates/rhi/src/features/trade_listing/handlers/invoice.rs | 165-------------------------------------------------------------------------------
Dcrates/rhi/src/features/trade_listing/handlers/mod.rs | 7-------
Dcrates/rhi/src/features/trade_listing/handlers/order.rs | 106-------------------------------------------------------------------------------
Dcrates/rhi/src/features/trade_listing/handlers/payment.rs | 116-------------------------------------------------------------------------------
Dcrates/rhi/src/features/trade_listing/handlers/receipt.rs | 118-------------------------------------------------------------------------------
Dcrates/rhi/src/features/trade_listing/mod.rs | 3---
Dcrates/rhi/src/features/trade_listing/subscriber.rs | 401-------------------------------------------------------------------------------
Dcrates/rhi/src/infra/mod.rs | 1-
Dcrates/rhi/src/infra/nostr.rs | 224-------------------------------------------------------------------------------
Dcrates/rhi/src/key_profile.rs | 29-----------------------------
Dcrates/rhi/src/lib.rs | 142-------------------------------------------------------------------------------
Dcrates/rhi/src/main.rs | 22----------------------
Dcrates/rhi/src/rhi.rs | 70----------------------------------------------------------------------
Aidentity.json | 4++++
Mrust-toolchain.toml | 4++--
Rcrates/rhi/src/adapters/mod.rs -> src/adapters/mod.rs | 0
Asrc/adapters/nostr/event.rs | 78++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Rcrates/rhi/src/adapters/nostr/mod.rs -> src/adapters/nostr/mod.rs | 0
Asrc/cli.rs | 35+++++++++++++++++++++++++++++++++++
Asrc/config.rs | 23+++++++++++++++++++++++
Rcrates/rhi/src/features/trade_listing/domain/mod.rs -> src/features/trade_listing/domain/mod.rs | 0
Asrc/features/trade_listing/domain/pricing.rs | 97+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/features/trade_listing/handlers/accept.rs | 148+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/features/trade_listing/handlers/conveyance.rs | 126+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/features/trade_listing/handlers/dvm.rs | 1041+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/features/trade_listing/handlers/fulfillment.rs | 132+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/features/trade_listing/handlers/invoice.rs | 169+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/features/trade_listing/handlers/mod.rs | 1+
Asrc/features/trade_listing/handlers/order.rs | 111+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/features/trade_listing/handlers/payment.rs | 123+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/features/trade_listing/handlers/receipt.rs | 126+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/features/trade_listing/mod.rs | 3+++
Asrc/features/trade_listing/state.rs | 104+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/features/trade_listing/subscriber.rs | 115+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/infra/mod.rs | 1+
Asrc/lib.rs | 87+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/main.rs | 30++++++++++++++++++++++++++++++
Asrc/rhi.rs | 105+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
49 files changed, 3261 insertions(+), 2097 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -197,6 +197,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef49f5882e4b6afaac09ad239a4f8c70a24b8f2b0897edb1f706008efd109cf4" [[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + +[[package]] name = "autocfg" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -349,6 +355,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + +[[package]] name = "chacha20" version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -644,7 +656,6 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", - "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -668,17 +679,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] -name = "futures-executor" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] name = "futures-io" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -755,9 +755,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasi 0.14.2+wasi-0.2.4", + "wasm-bindgen", ] [[package]] @@ -779,6 +781,25 @@ dependencies = [ ] [[package]] +name = "h2" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -845,12 +866,105 @@ dependencies = [ ] [[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + +[[package]] name = "httparse" version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" [[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "pin-utils", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" +dependencies = [ + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", + "webpki-roots 1.0.4", +] + +[[package]] +name = "hyper-util" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "ipnet", + "libc", + "percent-encoding", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", +] + +[[package]] name = "iana-time-zone" version = "0.1.63" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1046,6 +1160,22 @@ dependencies = [ ] [[package]] +name = "ipnet" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" + +[[package]] +name = "iri-string" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397" +dependencies = [ + "memchr", + "serde", +] + +[[package]] name = "is_terminal_polyfill" version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1079,6 +1209,82 @@ dependencies = [ ] [[package]] +name = "jsonrpsee" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f3f48dc3e6b8bd21e15436c1ddd0bc22a6a54e8ec46fedd6adf3425f396ec6a" +dependencies = [ + "jsonrpsee-core", + "jsonrpsee-server", + "jsonrpsee-types", + "tokio", +] + +[[package]] +name = "jsonrpsee-core" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "316c96719901f05d1137f19ba598b5fe9c9bc39f4335f67f6be8613921946480" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "jsonrpsee-types", + "parking_lot", + "pin-project", + "rand 0.9.0", + "rustc-hash", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tower", + "tracing", +] + +[[package]] +name = "jsonrpsee-server" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c51b7c290bb68ce3af2d029648148403863b982f138484a73f02a9dd52dbd7f" +dependencies = [ + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "jsonrpsee-core", + "jsonrpsee-types", + "pin-project", + "route-recognizer", + "serde", + "serde_json", + "soketto", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tracing", +] + +[[package]] +name = "jsonrpsee-types" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc88ff4688e43cc3fa9883a8a95c6fa27aa2e76c96e610b737b6554d650d7fd5" +dependencies = [ + "http", + "serde", + "serde_json", + "thiserror 2.0.12", +] + +[[package]] name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1125,6 +1331,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86ea4e65087ff52f3862caff188d489f1fab49a0cb09e01b2e3f1a617b10aaed" [[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + +[[package]] name = "matchers" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1410,6 +1622,26 @@ dependencies = [ ] [[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "pin-project-lite" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1457,6 +1689,61 @@ dependencies = [ ] [[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2", + "thiserror 2.0.12", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +dependencies = [ + "bytes", + "getrandom 0.3.2", + "lru-slab", + "rand 0.9.0", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "slab", + "thiserror 2.0.12", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.59.0", +] + +[[package]] name = "quote" version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1487,6 +1774,7 @@ version = "0.1.0" dependencies = [ "radroots-core", "serde", + "ts-rs", "typeshare", ] @@ -1494,6 +1782,7 @@ dependencies = [ name = "radroots-events-codec" version = "0.1.0" dependencies = [ + "nostr", "radroots-core", "radroots-events", "serde", @@ -1506,9 +1795,24 @@ dependencies = [ "nostr", "radroots-runtime", "serde", + "serde_json", "thiserror 1.0.69", "tracing", - "uuid", +] + +[[package]] +name = "radroots-nostr" +version = "0.1.0" +dependencies = [ + "nostr", + "nostr-sdk", + "radroots-events", + "radroots-events-codec", + "radroots-identity", + "reqwest", + "serde", + "serde_json", + "thiserror 1.0.69", ] [[package]] @@ -1537,7 +1841,8 @@ dependencies = [ "radroots-events", "radroots-events-codec", "serde", - "typeshare", + "serde_json", + "ts-rs", ] [[package]] @@ -1654,23 +1959,60 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots 1.0.4", +] + +[[package]] name = "rhi" version = "0.1.0" dependencies = [ "anyhow", "clap", - "futures", - "nostr", - "nostr-sdk", + "jsonrpsee", "radroots-core", "radroots-events", "radroots-events-codec", "radroots-identity", + "radroots-nostr", "radroots-runtime", "radroots-trade", + "reqwest", "serde", "serde_json", - "tempfile", "thiserror 1.0.69", "tokio", "tracing", @@ -1704,6 +2046,12 @@ dependencies = [ ] [[package]] +name = "route-recognizer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746" + +[[package]] name = "rust-ini" version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1741,6 +2089,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + +[[package]] name = "rustix" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1772,6 +2126,9 @@ name = "rustls-pki-types" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" +dependencies = [ + "web-time", +] [[package]] name = "rustls-webpki" @@ -1845,18 +2202,28 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -1865,14 +2232,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.140" +version = "1.0.146" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +checksum = "217ca874ae0207aac254aa02c957ded05585a90892cc8d87f9e5fa49669dadd8" dependencies = [ "itoa", "memchr", "ryu", "serde", + "serde_core", ] [[package]] @@ -1885,6 +2253,18 @@ dependencies = [ ] [[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + +[[package]] name = "sha1" version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1956,6 +2336,22 @@ dependencies = [ ] [[package]] +name = "soketto" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e859df029d160cb88608f5d7df7fb4753fd20fdfb4de5644f3d8b8440841721" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures", + "http", + "httparse", + "log", + "rand 0.8.5", + "sha1", +] + +[[package]] name = "stable_deref_trait" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1985,6 +2381,15 @@ dependencies = [ ] [[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + +[[package]] name = "synstructure" version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2009,6 +2414,15 @@ dependencies = [ ] [[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + +[[package]] name = "thiserror" version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2175,6 +2589,18 @@ dependencies = [ ] [[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] name = "tokio-tungstenite" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2187,7 +2613,21 @@ dependencies = [ "tokio", "tokio-rustls", "tungstenite", - "webpki-roots", + "webpki-roots 0.26.8", +] + +[[package]] +name = "tokio-util" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +dependencies = [ + "bytes", + "futures-core", + "futures-io", + "futures-sink", + "pin-project-lite", + "tokio", ] [[package]] @@ -2225,6 +2665,51 @@ dependencies = [ ] [[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-http" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +dependencies = [ + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "iri-string", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + +[[package]] name = "tracing" version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2298,6 +2783,34 @@ dependencies = [ ] [[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + +[[package]] +name = "ts-rs" +version = "11.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4994acea2522cd2b3b85c1d9529a55991e3ad5e25cdcd3de9d505972c4379424" +dependencies = [ + "thiserror 2.0.12", + "ts-rs-macros", +] + +[[package]] +name = "ts-rs-macros" +version = "11.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee6ff59666c9cbaec3533964505d39154dc4e0a56151fdea30a09ed0301f62e2" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "termcolor", +] + +[[package]] name = "tungstenite" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2430,7 +2943,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" dependencies = [ "getrandom 0.3.2", - "serde", ] [[package]] @@ -2446,6 +2958,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + +[[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2542,6 +3063,16 @@ dependencies = [ ] [[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] name = "webpki-roots" version = "0.26.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2551,6 +3082,15 @@ dependencies = [ ] [[package]] +name = "webpki-roots" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" +dependencies = [ + "rustls-pki-types", +] + +[[package]] name = "winapi" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2567,6 +3107,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.59.0", +] + +[[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/Cargo.toml b/Cargo.toml @@ -1,34 +1,28 @@ -[workspace] -members = [ - "crates/*", -] -resolver = "2" - -[workspace.package] +[package] +name = "rhi" version = "0.1.0" edition = "2024" -rust-version = "1.86.0" +authors = ["Radroots Authors"] +rust-version = "1.88.0" license = "AGPL-3.0" +description = "Rhizome Nostr data vending machine (NIP-90)" -[workspace.dependencies] -radroots-core = { path = "../../crates/crates/core" } -radroots-events = { path = "../../crates/crates/events" } -radroots-events-codec = { path = "../../crates/crates/events-codec" } -radroots-identity = { path = "../../crates/crates/identity" } -radroots-nostr = { path = "../../crates/crates/nostr" } -radroots-runtime = { path = "../../crates/crates/runtime" } -radroots-trade = { path = "../../crates/crates/trade" } +[dependencies] +radroots-core = { path = "../crates/core", features = ["std", "serde", "typeshare"] } +radroots-events = { path = "../crates/events", features = ["serde"] } +radroots-events-codec = { path = "../crates/events-codec", features = ["nostr"] } +radroots-identity = { path = "../crates/identity" } +radroots-nostr = { path = "../crates/nostr", features = ["client", "codec", "events", "http"] } +radroots-runtime = { path = "../crates/runtime", features = ["cli"] } +radroots-trade = { path = "../crates/trade" } anyhow = { version = "1" } -clap = { version = "4" } -config = { version = "0.15" } -futures = { version = "0.3" } -nostr = { version = "0.43.0", features = ["nip04"] } -nostr-sdk = { version = "0.43.0" } +clap = { version = "4", features = ["derive"] } +jsonrpsee = { version = "0.26", features = ["server"] } +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1", default-features = false } serde_json = { version = "1", default-features = false } -tempfile = { version = "3.19.1" } +tokio = { version = "1", features = ["full"] } thiserror = { version = "1" } -tokio = { version = "1" } tracing = { version = "0.1" } -uuid = { version = "1.16.0" } +uuid = { version = "1.16.0", features = ["v4"] } diff --git a/config.toml b/config.toml @@ -13,4 +13,10 @@ name = "rhi" logs_dir = "logs" relays = [ "ws://127.0.0.1:8080" -] -\ No newline at end of file +] + +[config.subscriber.backoff] +base_ms = 500 +max_ms = 30000 +factor = 2 +jitter_ms = 0 diff --git a/crates/rhi/Cargo.toml b/crates/rhi/Cargo.toml @@ -1,29 +0,0 @@ -[package] -name = "rhi" -version.workspace = true -edition.workspace = true -authors = ["Radroots Authors"] -rust-version.workspace = true -license.workspace = true -description = "Rhizome Nostr data vending machine (NIP-90)" - -[dependencies] -radroots-core = { workspace = true, features = ["std", "serde", "typeshare"] } -radroots-events = { workspace = true, features = ["serde"] } -radroots-events-codec = { workspace = true, features = ["serde"] } -radroots-identity = { workspace = true } -radroots-runtime = { workspace = true, features = ["cli"] } -radroots-trade = { workspace = true } - -anyhow = { workspace = true } -clap = { workspace = true, features = ["derive"]} -futures = { workspace = true } -nostr = { workspace = true } -nostr-sdk = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -tempfile = { workspace = true } -thiserror = { workspace = true } -tokio = { workspace = true, features = ["full"] } -tracing = { workspace = true } -uuid = { workspace = true, features = ["v4"] } diff --git a/crates/rhi/src/adapters/nostr/event.rs b/crates/rhi/src/adapters/nostr/event.rs @@ -1,78 +0,0 @@ -use nostr::event::Event; -use radroots_events_codec::job::traits::{JobEventBorrow, JobEventLike}; - -#[derive(Clone, Debug)] -pub struct NostrEventAdapter<'a> { - evt: &'a Event, - id_hex: String, - author_hex: String, -} - -impl<'a> NostrEventAdapter<'a> { - #[inline] - pub fn new(evt: &'a Event) -> Self { - Self { - evt, - id_hex: evt.id.to_hex(), - author_hex: evt.pubkey.to_string(), - } - } - - #[inline] - fn tags_as_slices(&self) -> Vec<Vec<String>> { - self.evt - .tags - .iter() - .map(|t| t.as_slice().to_vec()) - .collect() - } -} - -impl<'a> JobEventBorrow<'a> for NostrEventAdapter<'a> { - #[inline] - fn raw_id(&'a self) -> &'a str { - &self.id_hex - } - #[inline] - fn raw_author(&'a self) -> &'a str { - &self.author_hex - } - #[inline] - fn raw_content(&'a self) -> &'a str { - &self.evt.content - } - #[inline] - fn raw_kind(&'a self) -> u32 { - match self.evt.kind { - nostr::event::Kind::Custom(v) => v as u32, - _ => 0, - } - } -} - -impl JobEventLike for NostrEventAdapter<'_> { - fn raw_id(&self) -> String { - self.id_hex.clone() - } - fn raw_author(&self) -> String { - self.author_hex.clone() - } - fn raw_published_at(&self) -> u32 { - self.evt.created_at.as_u64() as u32 - } - fn raw_kind(&self) -> u32 { - match self.evt.kind { - nostr::event::Kind::Custom(v) => v as u32, - _ => 0, - } - } - fn raw_content(&self) -> String { - self.evt.content.clone() - } - fn raw_tags(&self) -> Vec<Vec<String>> { - self.tags_as_slices() - } - fn raw_sig(&self) -> String { - self.evt.sig.to_string() - } -} diff --git a/crates/rhi/src/cli.rs b/crates/rhi/src/cli.rs @@ -1,35 +0,0 @@ -use std::path::PathBuf; - -use clap::{Parser, ValueHint, command}; - -#[derive(Parser, Debug, Clone)] -#[command( - about = env!("CARGO_PKG_DESCRIPTION"), - author = env!("CARGO_PKG_AUTHORS"), - version = env!("CARGO_PKG_VERSION") -)] -pub struct Args { - #[arg( - long, - value_name = "PATH", - value_hint = ValueHint::FilePath, - default_value = "config.toml", - help = "Path to the daemon configuration file (defaults to config.toml)" - )] - pub config: PathBuf, - - #[arg( - long, - value_name = "PATH", - value_hint = ValueHint::FilePath, - help = "Path to the daemon identity JSON file (defaults to identity.json)", - )] - pub identity: Option<PathBuf>, - - #[arg( - long, - action = clap::ArgAction::SetTrue, - help = "Allow generating a new identity file if missing; if not set and identity file is absent, the daemon will fail" - )] - pub allow_generate_identity: bool, -} diff --git a/crates/rhi/src/config.rs b/crates/rhi/src/config.rs @@ -1,13 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Configuration { - pub logs_dir: String, - pub relays: Vec<String>, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Settings { - pub metadata: nostr::Metadata, - pub config: Configuration, -} diff --git a/crates/rhi/src/features/trade_listing/domain/pricing.rs b/crates/rhi/src/features/trade_listing/domain/pricing.rs @@ -1,97 +0,0 @@ -use radroots_core::{RadrootsCoreQuantity, RadrootsCoreQuantityPrice}; -use radroots_events::listing::models::{ - RadrootsListing, RadrootsListingDiscount, RadrootsListingQuantity, -}; -use radroots_trade::prelude::price_ext::ListingPricingExt; -use radroots_trade::prelude::stage::order::{ - TradeListingOrderRequestPayload, TradeListingOrderResult, -}; - -use crate::features::trade_listing::handlers::order::JobRequestOrderError; - -pub trait ListingOrderCalculator { - fn calculate_order( - &self, - order: &TradeListingOrderRequestPayload, - ) -> Result<TradeListingOrderResult, JobRequestOrderError>; -} - -impl ListingOrderCalculator for RadrootsListing { - fn calculate_order( - &self, - order: &TradeListingOrderRequestPayload, - ) -> Result<TradeListingOrderResult, JobRequestOrderError> { - let req_qty: &RadrootsListingQuantity = &order.quantity; - let req_qty_amount = req_qty.value.amount; - let req_qty_unit = req_qty.value.unit; - let req_qty_label_opt = req_qty.label.as_deref(); - - let matched_packaging = self.quantities.iter().any(|q| { - let same_amount = q.value.amount.normalize() == req_qty_amount.normalize(); - let same_unit = q.value.unit == req_qty_unit; - let label_ok = match (q.label.as_deref(), req_qty_label_opt) { - (Some(l), Some(r)) => l == r, - (None, None) => true, - _ => false, - }; - same_amount && same_unit && label_ok - }); - - if !matched_packaging { - return Err(JobRequestOrderError::Unsatisfiable(format!( - "requested packaging {} {} {} not available", - req_qty_amount, - req_qty_unit, - req_qty_label_opt.unwrap_or("") - ))); - } - - let req_money = order.price.amount.clone().quantize_to_currency(); - - let matched_tier: &RadrootsCoreQuantityPrice = self - .prices - .iter() - .find(|p| { - let money_ok = p.amount.currency == req_money.currency - && p.amount.amount.normalize() == req_money.amount.normalize(); - let per_amt_ok = - p.quantity.amount.normalize() == order.price.quantity.amount.normalize(); - let per_unit_ok = p.quantity.unit == order.price.quantity.unit; - money_ok && per_amt_ok && per_unit_ok - }) - .ok_or_else(|| { - JobRequestOrderError::Unsatisfiable(format!( - "no matching price tier {} {} found", - order.price.quantity.amount, order.price.quantity.unit - )) - })?; - - let price_amount = matched_tier.amount.clone(); - let price_quantity = matched_tier.quantity.clone(); - - let discounts_out: Vec<RadrootsListingDiscount> = - self.discounts.clone().unwrap_or_default(); - - let out_quantity = RadrootsListingQuantity { - value: RadrootsCoreQuantity::new(req_qty_amount, req_qty_unit), - label: req_qty.label.clone(), - count: req_qty.count, - }; - - let out_price = RadrootsCoreQuantityPrice { - amount: price_amount.clone(), - quantity: price_quantity.clone(), - }; - - let out_subtotal = out_price.subtotal_for(&out_quantity); - let out_total = out_price.total_for(&out_quantity); - - Ok(TradeListingOrderResult { - quantity: out_quantity, - price: out_price, - discounts: discounts_out, - subtotal: out_subtotal, - total: out_total, - }) - } -} diff --git a/crates/rhi/src/features/trade_listing/handlers/accept.rs b/crates/rhi/src/features/trade_listing/handlers/accept.rs @@ -1,142 +0,0 @@ -use nostr::{event::Event, key::Keys}; -use nostr_sdk::{Client, client::Error as NostrClientError}; -use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; -use thiserror::Error; -use tracing::info; - -use radroots_events::{ - RadrootsNostrEventPtr, - job::{ - JobPaymentRequest, request::models::RadrootsJobInput, result::models::RadrootsJobResult, - }, - kinds::result_kind_for_request_kind, - tag::{TAG_D, TAG_E_ROOT}, -}; -use radroots_trade::{ - listing::{ - kinds::{KIND_TRADE_LISTING_ACCEPT_RES, KIND_TRADE_LISTING_ORDER_RES}, - tags::push_trade_listing_chain_tags, - }, - prelude::stage::accept::{TradeListingAcceptRequest, TradeListingAcceptResult}, -}; - -use crate::{ - adapters::nostr::event::NostrEventAdapter, - features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, - infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, -}; - -#[derive(Debug, Error)] -pub enum JobRequestAcceptError { - #[error("Failed to parse accept request: {0}")] - ParseRequest(String), - #[error("Failed to fetch reference event: {0}")] - FetchReference(String), - #[error("Reference event not found: {0}")] - MissingReference(String), - #[error("Unauthorized: accepting profile must own the listing")] - Unauthorized, - #[error("Order result not kind 6301 or listing mismatch")] - InvalidOrderResult, - #[error("Failed to send job response")] - ResponseSend(#[from] NostrClientError), -} - -pub async fn handle_job_request_trade_accept( - event_job_request: Event, - keys: Keys, - client: Client, - job_req: JobRequestCtx, - job_req_input: RadrootsJobInput, -) -> Result<(), JobRequestError> { - let ev = NostrEventAdapter::new(&event_job_request); - - let req: TradeListingAcceptRequest = serde_json::from_str(&job_req_input.data) - .map_err(|e| JobRequestAcceptError::ParseRequest(e.to_string()))?; - - let order_res_evt = nostr_fetch_event_by_id(client.clone(), &req.order_result_event_id) - .await - .map_err(|_| JobRequestAcceptError::FetchReference(req.order_result_event_id.clone()))?; - - let listing_evt = nostr_fetch_event_by_id(client.clone(), &req.listing_event_id) - .await - .map_err(|_| JobRequestAcceptError::FetchReference(req.listing_event_id.clone()))?; - - if listing_evt.pubkey != keys.public_key() { - return Err(JobRequestAcceptError::Unauthorized.into()); - } - - if order_res_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_ORDER_RES) { - return Err(JobRequestAcceptError::InvalidOrderResult.into()); - } - let order_refs_listing = order_res_evt.tags.iter().any(|t| { - let s = t.as_slice(); - s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT) - && s.get(1).map(String::as_str) == Some(req.listing_event_id.as_str()) - }); - if !order_refs_listing { - return Err(JobRequestAcceptError::InvalidOrderResult.into()); - } - - let accept_result = TradeListingAcceptResult { - listing_event_id: req.listing_event_id.clone(), - order_result_event_id: req.order_result_event_id.clone(), - accepted_by: keys.public_key().to_string(), - }; - let payload_json = serde_json::to_string(&accept_result)?; - - let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) - .unwrap_or(job_req.model.kind as u32 + 1000); - debug_assert_eq!(result_kind as u16, KIND_TRADE_LISTING_ACCEPT_RES); - - let result_model = RadrootsJobResult { - kind: result_kind as u16, - request_event: RadrootsNostrEventPtr { - id: ev.raw_id().to_string(), - relays: None, - }, - request_json: Some(serde_json::to_string(&job_req.model)?), - inputs: job_req.model.inputs.clone(), - customer_pubkey: Some(ev.raw_author().to_string()), - payment: None::<JobPaymentRequest>, - content: Some(payload_json.clone()), - encrypted: false, - }; - - let mut tag_slices = job_result_build_tags(&result_model); - - let e_root = order_res_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) - }) - .flatten() - .unwrap_or_else(|| req.listing_event_id.clone()); - - let trade_id = order_res_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) - }) - .flatten(); - - push_trade_listing_chain_tags( - &mut tag_slices, - e_root.clone(), - Some(req.order_result_event_id.clone()), - trade_id, - ); - - let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; - let job_result_event_id = nostr_send_event(client, builder).await?; - - info!( - "job request trade/accept ({}={}) result sent: {:?}", - TAG_E_ROOT, e_root, job_result_event_id - ); - Ok(()) -} diff --git a/crates/rhi/src/features/trade_listing/handlers/conveyance.rs b/crates/rhi/src/features/trade_listing/handlers/conveyance.rs @@ -1,120 +0,0 @@ -use nostr::{event::Event, key::Keys}; -use nostr_sdk::{Client, client::Error as NostrClientError}; -use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; -use thiserror::Error; -use tracing::info; - -use radroots_events::{ - RadrootsNostrEventPtr, - job::{ - JobPaymentRequest, request::models::RadrootsJobInput, result::models::RadrootsJobResult, - }, - kinds::result_kind_for_request_kind, - tag::{TAG_D, TAG_E_ROOT}, -}; -use radroots_trade::{ - listing::{kinds::KIND_TRADE_LISTING_ACCEPT_RES, tags::push_trade_listing_chain_tags}, - prelude::stage::conveyance::{TradeListingConveyanceRequest, TradeListingConveyanceResult}, -}; - -use crate::{ - adapters::nostr::event::NostrEventAdapter, - features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, - infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, -}; - -#[derive(Debug, Error)] -pub enum JobRequestConveyanceError { - #[error("Failed to parse conveyance request: {0}")] - ParseRequest(String), - #[error("Failed to fetch reference event: {0}")] - FetchReference(String), - #[error("Reference event not found: {0}")] - MissingReference(String), - #[error("Invalid accept result kind")] - InvalidAcceptKind, - #[error("Failed to send job response")] - ResponseSend(#[from] NostrClientError), -} - -pub async fn handle_job_request_trade_conveyance( - event_job_request: Event, - _keys: Keys, - client: Client, - job_req: JobRequestCtx, - job_req_input: RadrootsJobInput, -) -> Result<(), JobRequestError> { - let ev = NostrEventAdapter::new(&event_job_request); - - let req: TradeListingConveyanceRequest = serde_json::from_str(&job_req_input.data) - .map_err(|e| JobRequestConveyanceError::ParseRequest(e.to_string()))?; - - let accept_evt = nostr_fetch_event_by_id(client.clone(), &req.accept_result_event_id) - .await - .map_err(|_| { - JobRequestConveyanceError::FetchReference(req.accept_result_event_id.clone()) - })?; - if accept_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_ACCEPT_RES) { - return Err(JobRequestConveyanceError::InvalidAcceptKind.into()); - } - - let conv_res = TradeListingConveyanceResult { - verified: true, - method: req.method, - message: Some("conveyance method verified".into()), - }; - let payload_json = serde_json::to_string(&conv_res)?; - - let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) - .unwrap_or(job_req.model.kind as u32 + 1000); - - let result_model = RadrootsJobResult { - kind: result_kind as u16, - request_event: RadrootsNostrEventPtr { - id: ev.raw_id().to_string(), - relays: None, - }, - request_json: Some(serde_json::to_string(&job_req.model)?), - inputs: job_req.model.inputs.clone(), - customer_pubkey: Some(ev.raw_author().to_string()), - payment: None::<JobPaymentRequest>, - content: Some(payload_json.clone()), - encrypted: false, - }; - - let mut tag_slices = job_result_build_tags(&result_model); - - let e_root = accept_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) - }) - .flatten(); - - let d_tag = accept_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) - }) - .flatten(); - - push_trade_listing_chain_tags( - &mut tag_slices, - e_root.clone().unwrap_or_default(), - Some(req.accept_result_event_id.clone()), - d_tag, - ); - - let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; - let job_result_event_id = nostr_send_event(client, builder).await?; - - info!( - "job request trade/conveyance ({}={:?}) result sent: {:?}", - TAG_E_ROOT, e_root, job_result_event_id - ); - Ok(()) -} diff --git a/crates/rhi/src/features/trade_listing/handlers/fulfillment.rs b/crates/rhi/src/features/trade_listing/handlers/fulfillment.rs @@ -1,125 +0,0 @@ -use nostr::{event::Event, key::Keys}; -use nostr_sdk::{Client, client::Error as NostrClientError}; -use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; -use thiserror::Error; -use tracing::info; - -use radroots_events::{ - RadrootsNostrEventPtr, - job::{request::models::RadrootsJobInput, result::models::RadrootsJobResult}, - kinds::result_kind_for_request_kind, - tag::{TAG_D, TAG_E_ROOT}, -}; -use radroots_trade::{ - listing::tags::push_trade_listing_chain_tags, - prelude::{ - kinds::KIND_TRADE_LISTING_PAYMENT_RES, - stage::fulfillment::{ - TradeListingFulfillmentRequest, TradeListingFulfillmentResult, - TradeListingFulfillmentState, - }, - }, -}; - -use crate::{ - adapters::nostr::event::NostrEventAdapter, - features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, - infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, -}; - -#[derive(Debug, Error)] -pub enum JobRequestFulfillmentError { - #[error("Failed to parse fulfillment request: {0}")] - ParseRequest(String), - #[error("Failed to fetch reference event: {0}")] - FetchReference(String), - #[error("Reference event not found: {0}")] - MissingReference(String), - #[error("Payment result not kind 6305 or missing chain")] - InvalidPayment, - #[error("Failed to send job response")] - ResponseSend(#[from] NostrClientError), -} - -pub async fn handle_job_request_trade_fulfillment( - event_job_request: Event, - _keys: Keys, - client: Client, - job_req: JobRequestCtx, - job_req_input: RadrootsJobInput, -) -> Result<(), JobRequestError> { - let ev = NostrEventAdapter::new(&event_job_request); - - let req: TradeListingFulfillmentRequest = serde_json::from_str(&job_req_input.data) - .map_err(|e| JobRequestFulfillmentError::ParseRequest(e.to_string()))?; - - let payment_evt = nostr_fetch_event_by_id(client.clone(), &req.payment_result_event_id) - .await - .map_err(|_| { - JobRequestFulfillmentError::FetchReference(req.payment_result_event_id.clone()) - })?; - if payment_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_PAYMENT_RES) { - return Err(JobRequestFulfillmentError::InvalidPayment.into()); - } - - let e_root = payment_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) - }) - .flatten() - .ok_or(JobRequestFulfillmentError::InvalidPayment)?; - - let d_tag = payment_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) - }) - .flatten(); - - let status = TradeListingFulfillmentResult { - state: TradeListingFulfillmentState::Preparing, - tracking: None, - eta: None, - notes: Some("order accepted and paid; preparing shipment".into()), - }; - let payload_json = serde_json::to_string(&status)?; - - let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) - .unwrap_or(job_req.model.kind as u32 + 1000); - - let result_model = RadrootsJobResult { - kind: result_kind as u16, - request_event: RadrootsNostrEventPtr { - id: ev.raw_id().to_string(), - relays: None, - }, - request_json: Some(serde_json::to_string(&job_req.model)?), - inputs: job_req.model.inputs.clone(), - customer_pubkey: Some(ev.raw_author().to_string()), - payment: None, - content: Some(payload_json.clone()), - encrypted: false, - }; - - let mut tag_slices = job_result_build_tags(&result_model); - push_trade_listing_chain_tags( - &mut tag_slices, - e_root.clone(), - Some(req.payment_result_event_id.clone()), - d_tag, - ); - - let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; - let job_result_event_id = nostr_send_event(client, builder).await?; - - info!( - "job request trade/fulfillment ({}={}) result sent: {:?}", - TAG_E_ROOT, e_root, job_result_event_id - ); - Ok(()) -} diff --git a/crates/rhi/src/features/trade_listing/handlers/invoice.rs b/crates/rhi/src/features/trade_listing/handlers/invoice.rs @@ -1,165 +0,0 @@ -use nostr::{event::Event, key::Keys}; -use nostr_sdk::{Client, client::Error as NostrClientError}; -use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; -use thiserror::Error; -use tracing::info; - -use radroots_events::{ - RadrootsNostrEventPtr, - job::{ - JobPaymentRequest, - request::models::{RadrootsJobInput, RadrootsJobParam}, - result::models::RadrootsJobResult, - }, - kinds::result_kind_for_request_kind, - tag::{TAG_D, TAG_E_PREV, TAG_E_ROOT}, -}; -use radroots_trade::{ - listing::tags::push_trade_listing_chain_tags, - prelude::{ - kinds::{ - KIND_TRADE_LISTING_ACCEPT_RES, KIND_TRADE_LISTING_INVOICE_RES, - KIND_TRADE_LISTING_ORDER_RES, - }, - stage::invoice::{TradeListingInvoiceRequest, TradeListingInvoiceResult}, - }, -}; - -use crate::{ - adapters::nostr::event::NostrEventAdapter, - features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, - infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, -}; - -#[derive(Debug, Error)] -pub enum JobRequestInvoiceError { - #[error("Failed to parse invoice request: {0}")] - ParseRequest(String), - #[error("Failed to fetch reference event: {0}")] - FetchReference(String), - #[error("Reference event not found: {0}")] - MissingReference(String), - #[error("Accept result not kind 6302 or missing chain")] - InvalidAccept, - #[error("Failed to send job response")] - ResponseSend(#[from] NostrClientError), -} - -fn param_lookup<'a>(params: &'a [RadrootsJobParam], key: &str) -> Option<&'a str> { - params - .iter() - .find(|p| p.key == key) - .map(|p| p.value.as_str()) -} - -pub async fn handle_job_request_trade_invoice( - event_job_request: Event, - _keys: Keys, - client: Client, - job_req: JobRequestCtx, - job_req_input: RadrootsJobInput, -) -> Result<(), JobRequestError> { - let ev = NostrEventAdapter::new(&event_job_request); - - let req: TradeListingInvoiceRequest = serde_json::from_str(&job_req_input.data) - .map_err(|e| JobRequestInvoiceError::ParseRequest(e.to_string()))?; - - let accept_evt = nostr_fetch_event_by_id(client.clone(), &req.accept_result_event_id) - .await - .map_err(|_| JobRequestInvoiceError::FetchReference(req.accept_result_event_id.clone()))?; - if accept_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_ACCEPT_RES) { - return Err(JobRequestInvoiceError::InvalidAccept.into()); - } - - let e_root = accept_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) - }) - .flatten() - .ok_or(JobRequestInvoiceError::InvalidAccept)?; - - let d_tag = accept_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) - }) - .flatten(); - - let order_res_id = accept_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_E_PREV)).then(|| s.get(1).cloned()) - }) - .flatten(); - - if let Some(prev_id) = &order_res_id { - if let Ok(prev_evt) = nostr_fetch_event_by_id(client.clone(), prev_id).await { - if prev_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_ORDER_RES) {} - } - } - - let amount_sat = param_lookup(&job_req.model.params, "amount_sat") - .and_then(|v| v.parse::<u32>().ok()) - .or_else(|| { - param_lookup(&job_req.model.params, "amount_msat") - .and_then(|v| v.parse::<u64>().ok()) - .map(|msat| (msat / 1000) as u32) - }) - .unwrap_or(0); - - let bolt11 = param_lookup(&job_req.model.params, "bolt11").map(|s| s.to_string()); - let note = param_lookup(&job_req.model.params, "note").map(|s| s.to_string()); - let expires_at = - param_lookup(&job_req.model.params, "expires_at").and_then(|v| v.parse::<u32>().ok()); - - let invoice = TradeListingInvoiceResult { - total_sat: amount_sat, - bolt11: bolt11.clone(), - note, - expires_at, - }; - let payload_json = serde_json::to_string(&invoice)?; - - let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) - .unwrap_or(job_req.model.kind as u32 + 1000); - debug_assert_eq!(result_kind as u16, KIND_TRADE_LISTING_INVOICE_RES); - - let result_model = RadrootsJobResult { - kind: result_kind as u16, - request_event: RadrootsNostrEventPtr { - id: ev.raw_id().to_string(), - relays: None, - }, - request_json: Some(serde_json::to_string(&job_req.model)?), - inputs: job_req.model.inputs.clone(), - customer_pubkey: Some(ev.raw_author().to_string()), - payment: Some(JobPaymentRequest { amount_sat, bolt11 }), - content: Some(payload_json.clone()), - encrypted: false, - }; - - let mut tag_slices = job_result_build_tags(&result_model); - - push_trade_listing_chain_tags( - &mut tag_slices, - e_root.clone(), - Some(req.accept_result_event_id.clone()), - d_tag, - ); - - let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; - let job_result_event_id = nostr_send_event(client, builder).await?; - - info!( - "job request trade/invoice ({}={}) result sent: {:?}", - TAG_E_ROOT, e_root, job_result_event_id - ); - Ok(()) -} diff --git a/crates/rhi/src/features/trade_listing/handlers/mod.rs b/crates/rhi/src/features/trade_listing/handlers/mod.rs @@ -1,7 +0,0 @@ -pub mod accept; -pub mod conveyance; -pub mod fulfillment; -pub mod invoice; -pub mod order; -pub mod payment; -pub mod receipt; diff --git a/crates/rhi/src/features/trade_listing/handlers/order.rs b/crates/rhi/src/features/trade_listing/handlers/order.rs @@ -1,106 +0,0 @@ -use nostr::{event::Event, key::Keys}; -use nostr_sdk::{Client, client::Error as NostrClientError}; -use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; -use thiserror::Error; -use tracing::info; - -use radroots_events::{ - RadrootsNostrEventPtr, - job::{ - JobPaymentRequest, request::models::RadrootsJobInput, result::models::RadrootsJobResult, - }, - kinds::result_kind_for_request_kind, - listing::models::RadrootsListing, -}; -use radroots_trade::prelude::{ - kinds::KIND_TRADE_LISTING_ORDER_RES, stage::order::TradeListingOrderRequest, tags, -}; - -use crate::{ - adapters::nostr::event::NostrEventAdapter, - features::trade_listing::{ - domain::pricing::ListingOrderCalculator, - subscriber::{JobRequestCtx, JobRequestError}, - }, - infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, -}; - -#[derive(Debug, Error)] -pub enum JobRequestOrderError { - #[error("Failed to parse reference event: {0}")] - ParseReference(String), - #[error("Failed to fetch reference event: {0}")] - FetchReference(String), - #[error("Reference event not found: {0}")] - MissingReference(String), - #[error("Reference event does not meet request requirements: {0}")] - MissingRequested(String), - #[error("Failed to send job response")] - ResponseSend(#[from] NostrClientError), - #[error("Request cannot be satisfied: {0}")] - Unsatisfiable(String), -} - -pub async fn handle_job_request_trade_order( - event_job_request: Event, - _keys: Keys, - client: Client, - job_req: JobRequestCtx, - job_req_input: RadrootsJobInput, -) -> Result<(), JobRequestError> { - let ev = NostrEventAdapter::new(&event_job_request); - - let order_data: TradeListingOrderRequest = serde_json::from_str(&job_req_input.data) - .map_err(|e| JobRequestOrderError::ParseReference(e.to_string()))?; - - let ref_id = &order_data.event.id; - let ref_event = nostr_fetch_event_by_id(client.clone(), ref_id) - .await - .map_err(|_| JobRequestOrderError::FetchReference(ref_id.clone()))?; - - let listing: RadrootsListing = serde_json::from_str(&ref_event.content).map_err(|_| { - JobRequestOrderError::ParseReference(format!("invalid listing content for {}", ref_id)) - })?; - - let order_result = listing.calculate_order(&order_data.payload)?; - - let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) - .unwrap_or(job_req.model.kind as u32 + 1000); - debug_assert_eq!(result_kind as u16, KIND_TRADE_LISTING_ORDER_RES as u16); - - let payload_json = serde_json::to_string(&order_result)?; - - let result_model = RadrootsJobResult { - kind: result_kind as u16, - request_event: RadrootsNostrEventPtr { - id: ev.raw_id().to_string(), - relays: None, - }, - request_json: Some(serde_json::to_string(&job_req.model)?), - inputs: job_req.model.inputs.clone(), - customer_pubkey: Some(ev.raw_author().to_string()), - payment: None::<JobPaymentRequest>, - content: Some(payload_json.clone()), - encrypted: false, - }; - - let mut tag_slices = job_result_build_tags(&result_model); - - let e_root = ref_event.id.to_hex(); - let trade_id = format!("trade:{}:{}", e_root, event_job_request.id.to_hex()); - tags::push_trade_listing_chain_tags( - &mut tag_slices, - e_root.clone(), - None::<String>, - Some(trade_id.clone()), - ); - - let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; - let job_result_event_id = nostr_send_event(client, builder).await?; - - info!( - "job request trade/order (e_root={}) result sent: {:?}", - e_root, job_result_event_id - ); - Ok(()) -} diff --git a/crates/rhi/src/features/trade_listing/handlers/payment.rs b/crates/rhi/src/features/trade_listing/handlers/payment.rs @@ -1,116 +0,0 @@ -use nostr::{event::Event, key::Keys}; -use nostr_sdk::{Client, client::Error as NostrClientError}; -use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; -use thiserror::Error; -use tracing::info; - -use radroots_events::{ - RadrootsNostrEventPtr, - job::{request::models::RadrootsJobInput, result::models::RadrootsJobResult}, - kinds::result_kind_for_request_kind, - tag::{TAG_D, TAG_E_ROOT}, -}; -use radroots_trade::prelude::{ - kinds::KIND_TRADE_LISTING_INVOICE_RES, - stage::payment::{TradeListingPaymentProofRequest, TradeListingPaymentResult}, - tags::push_trade_listing_chain_tags, -}; - -use crate::{ - adapters::nostr::event::NostrEventAdapter, - features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, - infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, -}; - -#[derive(Debug, Error)] -pub enum JobRequestPaymentError { - #[error("Failed to parse payment request: {0}")] - ParseRequest(String), - #[error("Failed to fetch reference event: {0}")] - FetchReference(String), - #[error("Reference event not found: {0}")] - MissingReference(String), - #[error("Invoice result not kind 6304 or missing chain")] - InvalidInvoice, - #[error("Failed to send job response")] - ResponseSend(#[from] NostrClientError), -} - -pub async fn handle_job_request_trade_payment( - event_job_request: Event, - _keys: Keys, - client: Client, - job_req: JobRequestCtx, - job_req_input: RadrootsJobInput, -) -> Result<(), JobRequestError> { - let ev = NostrEventAdapter::new(&event_job_request); - - let req: TradeListingPaymentProofRequest = serde_json::from_str(&job_req_input.data) - .map_err(|e| JobRequestPaymentError::ParseRequest(e.to_string()))?; - - let invoice_evt = nostr_fetch_event_by_id(client.clone(), &req.invoice_result_event_id) - .await - .map_err(|_| JobRequestPaymentError::FetchReference(req.invoice_result_event_id.clone()))?; - if invoice_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_INVOICE_RES) { - return Err(JobRequestPaymentError::InvalidInvoice.into()); - } - - let e_root = invoice_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) - }) - .flatten() - .ok_or(JobRequestPaymentError::InvalidInvoice)?; - - let d_tag = invoice_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) - }) - .flatten(); - - let ack = TradeListingPaymentResult { - verified: true, - message: Some("payment proof accepted".into()), - }; - let payload_json = serde_json::to_string(&ack)?; - - let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) - .unwrap_or(job_req.model.kind as u32 + 1000); - - let result_model = RadrootsJobResult { - kind: result_kind as u16, - request_event: RadrootsNostrEventPtr { - id: ev.raw_id().to_string(), - relays: None, - }, - request_json: Some(serde_json::to_string(&job_req.model)?), - inputs: job_req.model.inputs.clone(), - customer_pubkey: Some(ev.raw_author().to_string()), - payment: None, - content: Some(payload_json.clone()), - encrypted: false, - }; - - let mut tag_slices = job_result_build_tags(&result_model); - push_trade_listing_chain_tags( - &mut tag_slices, - e_root.clone(), - Some(req.invoice_result_event_id.clone()), - d_tag, - ); - - let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; - let job_result_event_id = nostr_send_event(client, builder).await?; - - info!( - "job request trade/payment ({}={}) result sent: {:?}", - TAG_E_ROOT, e_root, job_result_event_id - ); - Ok(()) -} diff --git a/crates/rhi/src/features/trade_listing/handlers/receipt.rs b/crates/rhi/src/features/trade_listing/handlers/receipt.rs @@ -1,118 +0,0 @@ -use nostr::{event::Event, key::Keys}; -use nostr_sdk::{Client, client::Error as NostrClientError}; -use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; -use thiserror::Error; -use tracing::info; - -use radroots_events::{ - RadrootsNostrEventPtr, - job::{request::models::RadrootsJobInput, result::models::RadrootsJobResult}, - kinds::result_kind_for_request_kind, - tag::{TAG_D, TAG_E_ROOT}, -}; -use radroots_trade::prelude::{ - kinds::KIND_TRADE_LISTING_FULFILL_RES, - stage::receipt::{TradeListingReceiptRequest, TradeListingReceiptResult}, - tags::push_trade_listing_chain_tags, -}; - -use crate::{ - adapters::nostr::event::NostrEventAdapter, - features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, - infra::nostr::{build_event_with_tags, nostr_fetch_event_by_id, nostr_send_event}, -}; - -#[derive(Debug, Error)] -pub enum JobRequestReceiptError { - #[error("Failed to parse receipt request: {0}")] - ParseRequest(String), - #[error("Failed to fetch reference event: {0}")] - FetchReference(String), - #[error("Reference event not found: {0}")] - MissingReference(String), - #[error("Fulfillment result not kind 6306 or missing chain")] - InvalidFulfillment, - #[error("Failed to send job response")] - ResponseSend(#[from] NostrClientError), -} - -pub async fn handle_job_request_trade_receipt( - event_job_request: Event, - _keys: Keys, - client: Client, - job_req: JobRequestCtx, - job_req_input: RadrootsJobInput, -) -> Result<(), JobRequestError> { - let ev = NostrEventAdapter::new(&event_job_request); - - let req: TradeListingReceiptRequest = serde_json::from_str(&job_req_input.data) - .map_err(|e| JobRequestReceiptError::ParseRequest(e.to_string()))?; - - let fulfill_evt = nostr_fetch_event_by_id(client.clone(), &req.fulfillment_result_event_id) - .await - .map_err(|_| { - JobRequestReceiptError::FetchReference(req.fulfillment_result_event_id.clone()) - })?; - if fulfill_evt.kind != nostr::event::Kind::Custom(KIND_TRADE_LISTING_FULFILL_RES) { - return Err(JobRequestReceiptError::InvalidFulfillment.into()); - } - - let e_root = fulfill_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) - }) - .flatten() - .ok_or(JobRequestReceiptError::InvalidFulfillment)?; - - let d_tag = fulfill_evt - .tags - .iter() - .find_map(|t| { - let s = t.as_slice(); - (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) - }) - .flatten(); - - let ack = TradeListingReceiptResult { - acknowledged: true, - at: event_job_request.created_at.as_u64() as u32, - }; - let payload_json = serde_json::to_string(&ack)?; - - let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) - .unwrap_or(job_req.model.kind as u32 + 1000); - - let result_model = RadrootsJobResult { - kind: result_kind as u16, - request_event: RadrootsNostrEventPtr { - id: ev.raw_id().to_string(), - relays: None, - }, - request_json: Some(serde_json::to_string(&job_req.model)?), - inputs: job_req.model.inputs.clone(), - customer_pubkey: Some(ev.raw_author().to_string()), - payment: None, - content: Some(payload_json.clone()), - encrypted: false, - }; - - let mut tag_slices = job_result_build_tags(&result_model); - push_trade_listing_chain_tags( - &mut tag_slices, - e_root.clone(), - Some(req.fulfillment_result_event_id.clone()), - d_tag, - ); - - let builder = build_event_with_tags(result_kind as u32, payload_json, tag_slices)?; - let job_result_event_id = nostr_send_event(client, builder).await?; - - info!( - "job request trade/receipt ({}={}) result sent: {:?}", - TAG_E_ROOT, e_root, job_result_event_id - ); - Ok(()) -} diff --git a/crates/rhi/src/features/trade_listing/mod.rs b/crates/rhi/src/features/trade_listing/mod.rs @@ -1,3 +0,0 @@ -pub mod domain; -pub mod handlers; -pub mod subscriber; diff --git a/crates/rhi/src/features/trade_listing/subscriber.rs b/crates/rhi/src/features/trade_listing/subscriber.rs @@ -1,401 +0,0 @@ -use std::{str::FromStr, time::Duration}; - -use anyhow::Result; -use nostr::event::{Event, EventId}; -use nostr::filter::Filter; -use nostr::{event::Kind, key::Keys}; -use nostr_sdk::{Client, RelayPoolNotification}; -use radroots_events::job::request::models::RadrootsJobInput; -use radroots_events_codec::job::error::JobParseError; -use radroots_events_codec::job::request::decode::job_request_from_tags; -use radroots_events_codec::job::traits::BorrowedEventAdapter; -use radroots_trade::listing::kinds::{ - KIND_TRADE_LISTING_ACCEPT_REQ, KIND_TRADE_LISTING_CONVEYANCE_REQ, - KIND_TRADE_LISTING_FULFILL_REQ, KIND_TRADE_LISTING_INVOICE_REQ, KIND_TRADE_LISTING_ORDER_REQ, - KIND_TRADE_LISTING_PAYMENT_REQ, KIND_TRADE_LISTING_RECEIPT_REQ, is_trade_listing_request_kind, -}; -use radroots_trade::listing::meta::MARKER_PAYLOAD; - -use tokio::time::sleep; -use tracing::{info, warn}; - -use crate::adapters::nostr::event::NostrEventAdapter; -use crate::features::trade_listing::handlers::accept::{ - JobRequestAcceptError, handle_job_request_trade_accept, -}; -use crate::features::trade_listing::handlers::conveyance::{ - JobRequestConveyanceError, handle_job_request_trade_conveyance, -}; -use crate::features::trade_listing::handlers::fulfillment::{ - JobRequestFulfillmentError, handle_job_request_trade_fulfillment, -}; -use crate::features::trade_listing::handlers::invoice::{ - JobRequestInvoiceError, handle_job_request_trade_invoice, -}; -use crate::features::trade_listing::handlers::order::{ - JobRequestOrderError, handle_job_request_trade_order, -}; -use crate::features::trade_listing::handlers::payment::{ - JobRequestPaymentError, handle_job_request_trade_payment, -}; -use crate::features::trade_listing::handlers::receipt::{ - JobRequestReceiptError, handle_job_request_trade_receipt, -}; -use crate::infra::nostr::{ - NostrTagsResolveError, NostrUtilsError, nostr_filter_new_events, nostr_tags_resolve, -}; - -#[derive(thiserror::Error, Debug)] -pub enum JobRequestError { - #[error("{0}")] - NostrUtilsError(#[from] NostrUtilsError), - - #[error("{0}")] - NostrTagsResolve(#[from] NostrTagsResolveError), - - #[error("{0}")] - JobParse(#[from] JobParseError), - - #[error("Order: {0}")] - JobRequestOrder(#[from] JobRequestOrderError), - - #[error("Accept: {0}")] - JobRequestAccept(#[from] JobRequestAcceptError), - - #[error("Conveyance: {0}")] - JobRequestConveyance(#[from] JobRequestConveyanceError), - - #[error("Invoice: {0}")] - JobRequestInvoice(#[from] JobRequestInvoiceError), - - #[error("Payment: {0}")] - JobRequestPayment(#[from] JobRequestPaymentError), - - #[error("Fulfillment: {0}")] - JobRequestFulfillment(#[from] JobRequestFulfillmentError), - - #[error("Receipt: {0}")] - JobRequestReceipt(#[from] JobRequestReceiptError), - - #[error("Invalid job request input marker: {0}")] - InvalidInputMarker(String), - - #[error("Deserialization error: {0}")] - Serde(#[from] serde_json::Error), - - #[error("Failure to process request")] - Failure, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum JobRequestInputMarker { - TradeOrder, - TradeAccept, - TradeConveyance, - TradeInvoice, - TradePayment, - TradeFulfillment, - TradeReceipt, -} - -impl std::fmt::Display for JobRequestInputMarker { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(match self { - JobRequestInputMarker::TradeOrder => "order", - JobRequestInputMarker::TradeAccept => "accept", - JobRequestInputMarker::TradeConveyance => "conveyance", - JobRequestInputMarker::TradeInvoice => "invoice", - JobRequestInputMarker::TradePayment => "payment", - JobRequestInputMarker::TradeFulfillment => "fulfillment", - JobRequestInputMarker::TradeReceipt => "receipt", - }) - } -} - -impl TryFrom<&str> for JobRequestInputMarker { - type Error = JobRequestError; - fn try_from(s: &str) -> Result<Self, Self::Error> { - match s { - "order" => Ok(Self::TradeOrder), - "accept" => Ok(Self::TradeAccept), - "conveyance" => Ok(Self::TradeConveyance), - "invoice" => Ok(Self::TradeInvoice), - "payment" => Ok(Self::TradePayment), - "fulfillment" => Ok(Self::TradeFulfillment), - "receipt" => Ok(Self::TradeReceipt), - other => Err(JobRequestError::InvalidInputMarker(other.to_string())), - } - } -} - -impl FromStr for JobRequestInputMarker { - type Err = JobRequestError; - fn from_str(s: &str) -> Result<Self, Self::Err> { - Self::try_from(s) - } -} - -#[derive(Debug, Clone)] -pub struct JobRequestCtx { - pub id: EventId, - pub model: radroots_events::job::request::models::RadrootsJobRequest, - pub tags: Vec<nostr::event::Tag>, -} - -pub async fn subscriber(keys: Keys, relays: Vec<String>) -> Result<()> { - info!( - "Starting subscriber for trade listing request kinds: {}, {}, {}, {}, {}, {}, {}", - KIND_TRADE_LISTING_ORDER_REQ, - KIND_TRADE_LISTING_ACCEPT_REQ, - KIND_TRADE_LISTING_CONVEYANCE_REQ, - KIND_TRADE_LISTING_INVOICE_REQ, - KIND_TRADE_LISTING_PAYMENT_REQ, - KIND_TRADE_LISTING_FULFILL_REQ, - KIND_TRADE_LISTING_RECEIPT_REQ - ); - - let client = Client::new(keys.clone()); - for relay in &relays { - client.add_relay(relay).await?; - } - - let kinds: Vec<Kind> = vec![ - Kind::Custom(KIND_TRADE_LISTING_ORDER_REQ), - Kind::Custom(KIND_TRADE_LISTING_ACCEPT_REQ), - Kind::Custom(KIND_TRADE_LISTING_CONVEYANCE_REQ), - Kind::Custom(KIND_TRADE_LISTING_INVOICE_REQ), - Kind::Custom(KIND_TRADE_LISTING_PAYMENT_REQ), - Kind::Custom(KIND_TRADE_LISTING_FULFILL_REQ), - Kind::Custom(KIND_TRADE_LISTING_RECEIPT_REQ), - ]; - let filter = nostr_filter_new_events(Filter::new().kinds(kinds)); - - client.connect().await; - client.subscribe(filter, None).await?; - - let mut notifications = client.notifications(); - - while let Ok(n) = notifications.recv().await { - if let RelayPoolNotification::Event { event, .. } = n { - let event = (*event).clone(); - - let kind: u16 = match event.kind { - Kind::Custom(v) => v, - _ => 0, - }; - if !is_trade_listing_request_kind(kind) { - continue; - } - - let keys = keys.clone(); - let client = client.clone(); - - tokio::spawn(async move { - if let Err(err) = handle_event(event.clone(), keys.clone(), client.clone()).await { - let _ = handle_error(err, event, keys, client, None).await; - } - }); - } - } - - client.disconnect().await; - Ok(()) -} - -async fn handle_error( - error: JobRequestError, - event: Event, - _keys: Keys, - client: Client, - _job_req: Option<JobRequestCtx>, -) -> Result<()> { - use crate::infra::nostr::nostr_event_job_feedback; - - warn!("job_request handle_error: {}", error); - warn!("job_request handle_error event: {:?}", event); - - let builder = nostr_event_job_feedback(&event, error, "error", None)?; - let event_id = client.send_event_builder(builder).await?; - warn!("job_request handle_error sent feedback {:?}", event_id); - Ok(()) -} - -async fn handle_event(event: Event, keys: Keys, client: Client) -> Result<(), JobRequestError> { - let job_req = parse_event(&event, &keys)?; - - let kind: u16 = match event.kind { - Kind::Custom(v) => v, - _ => 0, - }; - - #[inline] - fn select_payload_input<'a>(inputs: &'a [RadrootsJobInput]) -> Option<&'a RadrootsJobInput> { - inputs - .iter() - .find(|i| i.marker.as_deref() == Some(MARKER_PAYLOAD)) - .or_else(|| inputs.get(0)) - } - - match kind { - KIND_TRADE_LISTING_ORDER_REQ => { - let input = select_payload_input(&job_req.model.inputs) - .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; - process_job_request( - handle_job_request_trade_order, - event.clone(), - keys.clone(), - client.clone(), - job_req.clone(), - input.clone(), - ) - .await; - } - KIND_TRADE_LISTING_ACCEPT_REQ => { - let input = select_payload_input(&job_req.model.inputs) - .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; - process_job_request( - handle_job_request_trade_accept, - event.clone(), - keys.clone(), - client.clone(), - job_req.clone(), - input.clone(), - ) - .await; - } - KIND_TRADE_LISTING_CONVEYANCE_REQ => { - let input = select_payload_input(&job_req.model.inputs) - .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; - process_job_request( - handle_job_request_trade_conveyance, - event.clone(), - keys.clone(), - client.clone(), - job_req.clone(), - input.clone(), - ) - .await; - } - KIND_TRADE_LISTING_INVOICE_REQ => { - let input = select_payload_input(&job_req.model.inputs) - .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; - process_job_request( - handle_job_request_trade_invoice, - event.clone(), - keys.clone(), - client.clone(), - job_req.clone(), - input.clone(), - ) - .await; - } - KIND_TRADE_LISTING_PAYMENT_REQ => { - let input = select_payload_input(&job_req.model.inputs) - .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; - process_job_request( - handle_job_request_trade_payment, - event.clone(), - keys.clone(), - client.clone(), - job_req.clone(), - input.clone(), - ) - .await; - } - KIND_TRADE_LISTING_FULFILL_REQ => { - let input = select_payload_input(&job_req.model.inputs) - .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; - process_job_request( - handle_job_request_trade_fulfillment, - event.clone(), - keys.clone(), - client.clone(), - job_req.clone(), - input.clone(), - ) - .await; - } - KIND_TRADE_LISTING_RECEIPT_REQ => { - let input = select_payload_input(&job_req.model.inputs) - .ok_or_else(|| JobRequestError::InvalidInputMarker(MARKER_PAYLOAD.into()))?; - process_job_request( - handle_job_request_trade_receipt, - event.clone(), - keys.clone(), - client.clone(), - job_req.clone(), - input.clone(), - ) - .await; - } - _ => {} - } - - Ok(()) -} - -fn parse_event(event: &Event, keys: &Keys) -> Result<JobRequestCtx, JobRequestError> { - let originally_encrypted = event - .tags - .iter() - .any(|t| t.kind() == nostr::event::TagKind::Encrypted); - - let resolved_tags = nostr_tags_resolve(event, keys)?; - let tag_slices: Vec<Vec<String>> = resolved_tags - .iter() - .map(|t| t.as_slice().to_vec()) - .collect(); - - let kind: u16 = match event.kind { - Kind::Custom(v) => v, - _ => 0, - }; - - let mut model = job_request_from_tags(kind as u32, &tag_slices)?; - if originally_encrypted { - model.encrypted = true; - } - - let ev = NostrEventAdapter::new(event); - let sig_hex = event.sig.to_string(); - let _evt_view = - BorrowedEventAdapter::new(&ev, event.created_at.as_u64() as u32, &tag_slices, &sig_hex); - - Ok(JobRequestCtx { - id: event.id, - model, - tags: resolved_tags, - }) -} - -async fn process_job_request<F, Fut>( - handler: F, - event: Event, - keys: Keys, - client: Client, - job_req: JobRequestCtx, - job_req_input: RadrootsJobInput, -) where - F: FnOnce(Event, Keys, Client, JobRequestCtx, RadrootsJobInput) -> Fut + Send + 'static, - Fut: std::future::Future<Output = Result<(), JobRequestError>> + Send + 'static, -{ - if cfg!(debug_assertions) { - sleep(Duration::from_millis(500)).await; - } - - let error_event = event.clone(); - let error_job_req = job_req.clone(); - let error_keys = keys.clone(); - let error_client = client.clone(); - - if let Err(err) = handler(event, keys, client, job_req, job_req_input).await { - let _ = handle_error( - err, - error_event, - error_keys, - error_client, - Some(error_job_req), - ) - .await; - } -} diff --git a/crates/rhi/src/infra/mod.rs b/crates/rhi/src/infra/mod.rs @@ -1 +0,0 @@ -pub mod nostr; diff --git a/crates/rhi/src/infra/nostr.rs b/crates/rhi/src/infra/nostr.rs @@ -1,224 +0,0 @@ -use std::{borrow::Cow, time::Duration}; - -use nostr::{ - event::{Event, EventBuilder, EventId, Kind, Tag, TagKind, TagStandard}, - filter::Filter, - key::{Keys, PublicKey}, - nips::{ - nip04, - nip90::{DataVendingMachineStatus, JobFeedbackData}, - }, - types::{RelayUrl, Timestamp}, -}; -use nostr_sdk::Client; -use nostr_sdk::prelude::*; -use thiserror::Error; - -use crate::features::trade_listing::subscriber::JobRequestError; - -#[derive(Debug, Error)] -pub enum NostrUtilsError { - #[error("Client error: {0}")] - ClientError(#[from] nostr_sdk::client::Error), - - #[error("Event error: {0}")] - EventError(#[from] nostr::event::Error), - - #[error("Event not found: {0}")] - EventNotFound(String), - - #[error("Event builder failure: {0}")] - EventBuildError(#[from] nostr::event::builder::Error), -} - -#[derive(Debug, Error)] -pub enum NostrTagsResolveError { - #[error("Missing public key tag in encrypted event: {0:?}")] - MissingPTag(nostr::Event), - - #[error("Encrypted event recipient mismatch")] - NotRecipient, - - #[error("Decryption error: {0}")] - DecryptionError(String), - - #[error("Failed to parse decrypted tag JSON: {0}")] - ParseError(#[from] serde_json::Error), -} - -pub fn nostr_kind(kind: u16) -> Kind { - Kind::Custom(kind) -} - -pub fn nostr_filter_kind(kind: u16) -> Filter { - Filter::new().kind(Kind::Custom(kind)) -} - -pub fn nostr_filter_new_events(filter: Filter) -> Filter { - filter.since(Timestamp::now()) -} - -pub fn nostr_tag_first_value(tag: &Tag, key: &str) -> Option<String> { - if tag.kind() == TagKind::custom(key) { - tag.content().map(|v| v.to_string()) - } else { - None - } -} - -pub fn nostr_tag_at_value(tag: &Tag, index: usize) -> Option<String> { - tag.as_slice().get(index).cloned() -} - -pub fn nostr_tag_slice(tag: &Tag, start: usize) -> Option<Vec<String>> { - tag.as_slice().get(start..).map(|s| s.to_vec()) -} - -pub fn nostr_tag_relays_parse(tag: &Tag) -> Option<&Vec<RelayUrl>> { - match tag.as_standardized()? { - TagStandard::Relays(urls) => Some(urls), - _ => None, - } -} - -pub fn nostr_tags_match<'a>(tag: &'a Tag) -> Option<(&'a str, &'a [String])> { - if let TagKind::Custom(Cow::Borrowed(key)) = tag.kind() { - Some((key, &tag.as_slice()[1..])) - } else { - None - } -} - -pub fn nostr_tag_match_l(tag: &Tag) -> Option<(&str, f64)> { - let values = tag.as_slice(); - - if values.len() >= 3 && values[0].eq_ignore_ascii_case("l") { - if let Ok(value) = values[1].parse::<f64>() { - return Some((values[2].as_str(), value)); - } - } - - None -} - -pub fn nostr_tag_match_location(tag: &Tag) -> Option<(&str, &str, &str)> { - let values = tag.as_slice(); - - if values.len() >= 4 && values[0] == "location" { - Some((values[1].as_str(), values[2].as_str(), values[3].as_str())) - } else { - None - } -} - -pub fn nostr_tag_match_geohash(tag: &Tag) -> Option<String> { - match tag.as_standardized()? { - TagStandard::Geohash(geohash) => Some(geohash.clone()), - _ => None, - } -} - -pub fn nostr_tag_match_title(tag: &Tag) -> Option<String> { - match tag.as_standardized()? { - TagStandard::Title(title) => Some(title.clone()), - _ => None, - } -} - -pub fn nostr_tag_match_summary(tag: &Tag) -> Option<String> { - match tag.as_standardized()? { - TagStandard::Summary(summary) => Some(summary.clone()), - _ => None, - } -} - -pub fn nostr_event_job_result( - job_request: &Event, - payload: impl Into<String>, - millisats: u64, - bolt11: Option<String>, - tags: Option<Vec<Tag>>, -) -> Result<EventBuilder, NostrUtilsError> { - let builder = EventBuilder::job_result(job_request.clone(), payload, millisats, bolt11)? - .tags(tags.unwrap_or_default()); - Ok(builder) -} - -pub fn nostr_event_job_feedback( - job_request: &Event, - error: JobRequestError, - status: &str, - tags: Option<Vec<Tag>>, -) -> Result<EventBuilder, NostrUtilsError> { - let status = status - .parse::<DataVendingMachineStatus>() - .unwrap_or(DataVendingMachineStatus::Error); - let feedback_data = - JobFeedbackData::new(&job_request.clone(), status).extra_info(error.to_string()); - let builder = EventBuilder::job_feedback(feedback_data).tags(tags.unwrap_or_default()); - Ok(builder) -} - -pub async fn nostr_send_event( - client: Client, - event: EventBuilder, -) -> Result<Output<EventId>, NostrUtilsError> { - Ok(client.send_event_builder(event).await?) -} - -pub async fn nostr_fetch_event_by_id(client: Client, id: &str) -> Result<Event, NostrUtilsError> { - let event_id = EventId::parse(id)?; - let filter = Filter::new().id(event_id); - let events = client.fetch_events(filter, Duration::from_secs(10)).await?; - let event = events - .first() - .ok_or_else(|| NostrUtilsError::EventNotFound(event_id.to_hex()))?; - Ok(event.clone()) -} - -pub fn nostr_tags_resolve(event: &Event, keys: &Keys) -> Result<Vec<Tag>, NostrTagsResolveError> { - if event.tags.iter().any(|t| t.kind() == TagKind::Encrypted) { - let recipient = event - .tags - .iter() - .find_map(|tag| { - if tag.kind() == TagKind::p() { - tag.content()?.parse::<PublicKey>().ok() - } else { - None - } - }) - .ok_or_else(|| NostrTagsResolveError::MissingPTag(event.clone()))?; - - if recipient != keys.public_key() { - return Err(NostrTagsResolveError::NotRecipient.into()); - } - - let cleartext = nip04::decrypt(keys.secret_key(), &event.pubkey, &event.content) - .map_err(|e| NostrTagsResolveError::DecryptionError(e.to_string()))?; - - let decrypted_tags: nostr::event::tag::list::Tags = serde_json::from_str(&cleartext)?; - - Ok(decrypted_tags.to_vec()) - } else { - Ok(event.clone().tags.to_vec()) - } -} - -pub fn build_event_with_tags( - kind_u32: u32, - content: impl Into<String>, - tag_slices: Vec<Vec<String>>, -) -> Result<EventBuilder, NostrUtilsError> { - let mut tags: Vec<Tag> = Vec::new(); - for s in tag_slices { - if s.is_empty() { - continue; - } - let key = s[0].clone(); - let values = s.into_iter().skip(1).collect::<Vec<String>>(); - tags.push(Tag::custom(TagKind::Custom(key.into()), values)); - } - let builder = EventBuilder::new(Kind::Custom(kind_u32 as u16), content.into()).tags(tags); - Ok(builder) -} diff --git a/crates/rhi/src/key_profile.rs b/crates/rhi/src/key_profile.rs @@ -1,29 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct KeyProfile { - pub key: String, - pub identifier: String, - pub metadata: Option<nostr::Event>, - pub application_handler: Option<nostr::Event>, -} - -impl radroots_identity::IdentitySpec for KeyProfile { - type Keys = nostr::Keys; - type ParseError = nostr::key::Error; - - fn generate_new() -> Self { - let keys = nostr::Keys::generate(); - Self { - key: keys.secret_key().to_secret_hex(), - identifier: uuid::Uuid::new_v4().to_string(), - metadata: None, - application_handler: None, - } - } - - fn to_keys(&self) -> Result<Self::Keys, Self::ParseError> { - use std::str::FromStr; - nostr::Keys::from_str(&self.key) - } -} diff --git a/crates/rhi/src/lib.rs b/crates/rhi/src/lib.rs @@ -1,142 +0,0 @@ -pub mod adapters; -pub mod cli; -pub mod config; -pub mod infra; -pub mod rhi; - -pub mod features { - pub mod trade_listing; -} - -pub mod key_profile; - -pub use cli::Args as cli_args; - -use anyhow::Result; - -use crate::{ - key_profile::KeyProfile, - rhi::{Rhi, start_subscriber}, -}; - -pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()> { - let identity = radroots_identity::load_or_generate::<KeyProfile, _>( - args.identity.as_ref(), - args.allow_generate_identity, - )?; - let keys = radroots_identity::to_keys(&identity.value)?; - - let rhi = Rhi::new(keys.clone()); - - for relay in settings.config.relays.iter() { - rhi.client.add_relay(relay).await?; - } - - if !settings.config.relays.is_empty() { - let client = rhi.client.clone(); - let md = settings.metadata.clone(); - let has_metadata = serde_json::to_value(&md) - .ok() - .and_then(|v| v.as_object().cloned()) - .map(|o| !o.is_empty()) - .unwrap_or(false); - - tokio::spawn(async move { - client.connect().await; - if has_metadata { - if let Err(e) = client.set_metadata(&md).await { - tracing::warn!("Failed to publish metadata on startup: {e}"); - } else { - tracing::info!("Published metadata on startup"); - } - } - }); - } - - let keys_sub = keys.clone(); - let relays_sub = settings.config.relays.clone(); - tokio::spawn(async move { - loop { - if let Err(e) = crate::features::trade_listing::subscriber::subscriber( - keys_sub.clone(), - relays_sub.clone(), - ) - .await - { - tracing::error!("Error on job request subscription: {e}"); - } - } - }); - - let handle = start_subscriber(keys.clone(), settings.config.relays.clone()).await; - - let stop_handle = handle.clone(); - - tokio::select! { - _ = radroots_runtime::shutdown_signal() => { - tracing::info!("Shutting down…"); - stop_handle.stop(); - } - _ = handle.stopped() => {} - } - - /* - let identity = radroots_identity::load_or_generate::<KeyProfile, _>( - args.identity.as_ref(), - args.allow_generate_identity, - )?; - let keys = radroots_identity::to_keys(&identity.value)?; - - let metadata = settings.metadata.clone(); - - let mut events_to_send: Vec<Event> = vec![]; - - if let Some(event) = identity.value.metadata.clone() { - events_to_send.push(event); - } - - if let Some(event) = identity.value.application_handler.clone() { - events_to_send.push(event); - } - - if !events_to_send.is_empty() { - let client = Client::new(keys.clone()); - for relay in &settings.config.relays { - client.add_relay(relay).await?; - } - client.connect().await; - for event in events_to_send { - client.send_event(&event).await?; - info!("Sent kind {} event for key profile", event.kind); - } - client.disconnect().await; - } - - let keys_sub = keys.clone(); - let relays_sub = settings.config.relays.clone(); - - tokio::spawn(async move { - loop { - if let Err(e) = - trade_listing::subscriber::subscriber(keys_sub.clone(), relays_sub.clone()).await - { - error!("Error on job request subscription: {e}"); - } - } - }); - - let mut sigterm = signal(SignalKind::terminate())?; - let mut sigint = signal(SignalKind::interrupt())?; - - tokio::select! { - _ = sigterm.recv() => { - info!("Received SIGTERM. Shutting down..."); - }, - _ = sigint.recv() => { - info!("Received SIGINT. Shutting down..."); - } - } - */ - - Ok(()) -} diff --git a/crates/rhi/src/main.rs b/crates/rhi/src/main.rs @@ -1,22 +0,0 @@ -use anyhow::Result; -use rhi::{cli_args, config, run_rhi}; -use tracing::info; - -#[tokio::main] -async fn main() { - if let Err(err) = setup().await { - eprintln!("Fatal error: {err:#?}"); - std::process::exit(1); - } -} - -async fn setup() -> Result<()> { - let (args, settings): (cli_args, config::Settings) = - radroots_runtime::parse_and_load_path(|a: &cli_args| Some(a.config.as_path()))?; - - radroots_runtime::init_with(&settings.config.logs_dir, None)?; - - info!("Starting"); - - run_rhi(&settings, &args).await -} diff --git a/crates/rhi/src/rhi.rs b/crates/rhi/src/rhi.rs @@ -1,70 +0,0 @@ -use nostr_sdk::Client; -use std::time::Instant; - -pub struct Rhi { - pub(crate) _started: Instant, - pub client: Client, -} - -impl Rhi { - pub fn new(keys: nostr::Keys) -> Self { - let client = Client::new(keys); - Self { - _started: Instant::now(), - client, - } - } -} - -use std::sync::Arc; -use tokio::sync::Mutex; - -pub struct RhiHandle { - stop_tx: Arc<Mutex<Option<tokio::sync::oneshot::Sender<()>>>>, - join: Option<tokio::task::JoinHandle<()>>, -} - -impl Clone for RhiHandle { - fn clone(&self) -> Self { - Self { - stop_tx: Arc::clone(&self.stop_tx), - join: None, // don’t clone the JoinHandle! - } - } -} - -impl RhiHandle { - pub fn stop(&self) { - if let Some(tx) = self.stop_tx.try_lock().ok().and_then(|mut opt| opt.take()) { - let _ = tx.send(()); - } - } - - pub async fn stopped(mut self) { - if let Some(join) = self.join.take() { - let _ = join.await; - } - } -} - -pub async fn start_subscriber(keys: nostr::Keys, relays: Vec<String>) -> RhiHandle { - let (stop_tx, mut stop_rx) = tokio::sync::oneshot::channel(); - - let join = tokio::spawn(async move { - loop { - tokio::select! { - _ = &mut stop_rx => break, - res = crate::features::trade_listing::subscriber::subscriber(keys.clone(), relays.clone()) => { - if let Err(e) = res { - tracing::error!("Error on job request subscription: {e}"); - } - } - } - } - }); - - RhiHandle { - stop_tx: Arc::new(Mutex::new(Some(stop_tx))), - join: Some(join), - } -} diff --git a/identity.json b/identity.json @@ -0,0 +1,3 @@ +{ + "secret_key": "b13b688e1aa9f113739745549484901d90a7b0e84d4c78d8baf969670d5cfce4" +} +\ No newline at end of file diff --git a/rust-toolchain.toml b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.86.0" -\ No newline at end of file +channel = "1.88.0" +\ No newline at end of file diff --git a/crates/rhi/src/adapters/mod.rs b/src/adapters/mod.rs diff --git a/src/adapters/nostr/event.rs b/src/adapters/nostr/event.rs @@ -0,0 +1,78 @@ +use radroots_events_codec::job::traits::{JobEventBorrow, JobEventLike}; +use radroots_nostr::prelude::{RadrootsNostrEvent, RadrootsNostrKind}; + +#[derive(Clone, Debug)] +pub struct NostrEventAdapter<'a> { + evt: &'a RadrootsNostrEvent, + id_hex: String, + author_hex: String, +} + +impl<'a> NostrEventAdapter<'a> { + #[inline] + pub fn new(evt: &'a RadrootsNostrEvent) -> Self { + Self { + evt, + id_hex: evt.id.to_hex(), + author_hex: evt.pubkey.to_string(), + } + } + + #[inline] + fn tags_as_slices(&self) -> Vec<Vec<String>> { + self.evt + .tags + .iter() + .map(|t| t.as_slice().to_vec()) + .collect() + } +} + +impl<'a> JobEventBorrow<'a> for NostrEventAdapter<'a> { + #[inline] + fn raw_id(&'a self) -> &'a str { + &self.id_hex + } + #[inline] + fn raw_author(&'a self) -> &'a str { + &self.author_hex + } + #[inline] + fn raw_content(&'a self) -> &'a str { + &self.evt.content + } + #[inline] + fn raw_kind(&'a self) -> u32 { + match self.evt.kind { + RadrootsNostrKind::Custom(v) => v as u32, + _ => 0, + } + } +} + +impl JobEventLike for NostrEventAdapter<'_> { + fn raw_id(&self) -> String { + self.id_hex.clone() + } + fn raw_author(&self) -> String { + self.author_hex.clone() + } + fn raw_published_at(&self) -> u32 { + self.evt.created_at.as_u64() as u32 + } + fn raw_kind(&self) -> u32 { + match self.evt.kind { + RadrootsNostrKind::Custom(v) => v as u32, + _ => 0, + } + } + fn raw_content(&self) -> String { + self.evt.content.clone() + } + fn raw_tags(&self) -> Vec<Vec<String>> { + self.tags_as_slices() + } + fn raw_sig(&self) -> String { + self.evt.sig.to_string() + } +} diff --git a/crates/rhi/src/adapters/nostr/mod.rs b/src/adapters/nostr/mod.rs diff --git a/src/cli.rs b/src/cli.rs @@ -0,0 +1,35 @@ +use std::path::PathBuf; + +use clap::{Parser, ValueHint, command}; + +#[derive(Parser, Debug, Clone)] +#[command( + about = env!("CARGO_PKG_DESCRIPTION"), + author = env!("CARGO_PKG_AUTHORS"), + version = env!("CARGO_PKG_VERSION") +)] +pub struct Args { + #[arg( + long, + value_name = "PATH", + value_hint = ValueHint::FilePath, + default_value = "config.toml", + help = "Path to the daemon configuration file (defaults to config.toml)" + )] + pub config: PathBuf, + + #[arg( + long, + value_name = "PATH", + value_hint = ValueHint::FilePath, + help = "Path to the daemon identity file (json, txt, or raw 32-byte key; defaults to identity.json)", + )] + pub identity: Option<PathBuf>, + + #[arg( + long, + action = clap::ArgAction::SetTrue, + help = "Allow generating a new identity file if missing; if not set and identity file is absent, the daemon will fail" + )] + pub allow_generate_identity: bool, +} diff --git a/src/config.rs b/src/config.rs @@ -0,0 +1,23 @@ +use radroots_nostr::prelude::RadrootsNostrMetadata; +use radroots_runtime::BackoffConfig; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Configuration { + pub logs_dir: String, + pub relays: Vec<String>, + #[serde(default)] + pub subscriber: SubscriberConfig, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct SubscriberConfig { + #[serde(default)] + pub backoff: BackoffConfig, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Settings { + pub metadata: RadrootsNostrMetadata, + pub config: Configuration, +} diff --git a/crates/rhi/src/features/trade_listing/domain/mod.rs b/src/features/trade_listing/domain/mod.rs diff --git a/src/features/trade_listing/domain/pricing.rs b/src/features/trade_listing/domain/pricing.rs @@ -0,0 +1,97 @@ +use radroots_core::{RadrootsCoreQuantity, RadrootsCoreQuantityPrice}; +use radroots_events::listing::{ + RadrootsListing, RadrootsListingDiscount, RadrootsListingQuantity, +}; +use radroots_trade::prelude::price_ext::ListingPricingExt; +use radroots_trade::prelude::stage::order::{ + TradeListingOrderRequestPayload, TradeListingOrderResult, +}; + +use crate::features::trade_listing::handlers::order::JobRequestOrderError; + +pub trait ListingOrderCalculator { + fn calculate_order( + &self, + order: &TradeListingOrderRequestPayload, + ) -> Result<TradeListingOrderResult, JobRequestOrderError>; +} + +impl ListingOrderCalculator for RadrootsListing { + fn calculate_order( + &self, + order: &TradeListingOrderRequestPayload, + ) -> Result<TradeListingOrderResult, JobRequestOrderError> { + let req_qty: &RadrootsListingQuantity = &order.quantity; + let req_qty_amount = req_qty.value.amount; + let req_qty_unit = req_qty.value.unit; + let req_qty_label_opt = req_qty.label.as_deref(); + + let matched_packaging = self.quantities.iter().any(|q| { + let same_amount = q.value.amount.normalize() == req_qty_amount.normalize(); + let same_unit = q.value.unit == req_qty_unit; + let label_ok = match (q.label.as_deref(), req_qty_label_opt) { + (Some(l), Some(r)) => l == r, + (None, None) => true, + _ => false, + }; + same_amount && same_unit && label_ok + }); + + if !matched_packaging { + return Err(JobRequestOrderError::Unsatisfiable(format!( + "requested packaging {} {} {} not available", + req_qty_amount, + req_qty_unit, + req_qty_label_opt.unwrap_or("") + ))); + } + + let req_money = order.price.amount.clone().quantize_to_currency(); + + let matched_tier: &RadrootsCoreQuantityPrice = self + .prices + .iter() + .find(|p| { + let money_ok = p.amount.currency == req_money.currency + && p.amount.amount.normalize() == req_money.amount.normalize(); + let per_amt_ok = + p.quantity.amount.normalize() == order.price.quantity.amount.normalize(); + let per_unit_ok = p.quantity.unit == order.price.quantity.unit; + money_ok && per_amt_ok && per_unit_ok + }) + .ok_or_else(|| { + JobRequestOrderError::Unsatisfiable(format!( + "no matching price tier {} {} found", + order.price.quantity.amount, order.price.quantity.unit + )) + })?; + + let price_amount = matched_tier.amount.clone(); + let price_quantity = matched_tier.quantity.clone(); + + let discounts_out: Vec<RadrootsListingDiscount> = + self.discounts.clone().unwrap_or_default(); + + let out_quantity = RadrootsListingQuantity { + value: RadrootsCoreQuantity::new(req_qty_amount, req_qty_unit), + label: req_qty.label.clone(), + count: req_qty.count, + }; + + let out_price = RadrootsCoreQuantityPrice { + amount: price_amount.clone(), + quantity: price_quantity.clone(), + }; + + let out_subtotal = out_price.subtotal_for(&out_quantity); + let out_total = out_price.total_for(&out_quantity); + + Ok(TradeListingOrderResult { + quantity: out_quantity, + price: out_price, + discounts: discounts_out, + subtotal: out_subtotal, + total: out_total, + }) + } +} diff --git a/src/features/trade_listing/handlers/accept.rs b/src/features/trade_listing/handlers/accept.rs @@ -0,0 +1,148 @@ +use radroots_nostr::prelude::{ + radroots_nostr_build_event, + radroots_nostr_fetch_event_by_id, + radroots_nostr_send_event, + RadrootsNostrClient, + RadrootsNostrEvent, + RadrootsNostrKind, + RadrootsNostrKeys, +}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job::JobPaymentRequest, + job_request::RadrootsJobInput, + job_result::RadrootsJobResult, + kinds::result_kind_for_request_kind, + tags::{TAG_D, TAG_E_ROOT}, +}; +use radroots_trade::{ + listing::{ + kinds::{KIND_TRADE_LISTING_ACCEPT_RES, KIND_TRADE_LISTING_ORDER_RES}, + tags::push_trade_listing_chain_tags, + }, + prelude::stage::accept::{TradeListingAcceptRequest, TradeListingAcceptResult}, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestAcceptError { + #[error("Failed to parse accept request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Unauthorized: accepting profile must own the listing")] + Unauthorized, + #[error("Order result not kind 6301 or listing mismatch")] + InvalidOrderResult, + #[error("Failed to send job response")] + ResponseSend(#[from] radroots_nostr::error::RadrootsNostrError), +} + +pub async fn handle_job_request_trade_accept( + event_job_request: RadrootsNostrEvent, + keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingAcceptRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestAcceptError::ParseRequest(e.to_string()))?; + + let order_res_evt = radroots_nostr_fetch_event_by_id(client.clone(), &req.order_result_event_id) + .await + .map_err(|_| JobRequestAcceptError::FetchReference(req.order_result_event_id.clone()))?; + + let listing_evt = radroots_nostr_fetch_event_by_id(client.clone(), &req.listing_event_id) + .await + .map_err(|_| JobRequestAcceptError::FetchReference(req.listing_event_id.clone()))?; + + if listing_evt.pubkey != keys.public_key() { + return Err(JobRequestAcceptError::Unauthorized.into()); + } + + if order_res_evt.kind != RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ORDER_RES) { + return Err(JobRequestAcceptError::InvalidOrderResult.into()); + } + let order_refs_listing = order_res_evt.tags.iter().any(|t| { + let s = t.as_slice(); + s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT) + && s.get(1).map(String::as_str) == Some(req.listing_event_id.as_str()) + }); + if !order_refs_listing { + return Err(JobRequestAcceptError::InvalidOrderResult.into()); + } + + let accept_result = TradeListingAcceptResult { + listing_event_id: req.listing_event_id.clone(), + order_result_event_id: req.order_result_event_id.clone(), + accepted_by: keys.public_key().to_string(), + }; + let payload_json = serde_json::to_string(&accept_result)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + debug_assert_eq!(result_kind as u16, KIND_TRADE_LISTING_ACCEPT_RES); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None::<JobPaymentRequest>, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + + let e_root = order_res_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten() + .unwrap_or_else(|| req.listing_event_id.clone()); + + let trade_id = order_res_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + Some(req.order_result_event_id.clone()), + trade_id, + ); + + let builder = radroots_nostr_build_event(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = radroots_nostr_send_event(client, builder).await?; + + info!( + "job request trade/accept ({}={}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/src/features/trade_listing/handlers/conveyance.rs b/src/features/trade_listing/handlers/conveyance.rs @@ -0,0 +1,126 @@ +use radroots_nostr::prelude::{ + radroots_nostr_build_event, + radroots_nostr_fetch_event_by_id, + radroots_nostr_send_event, + RadrootsNostrClient, + RadrootsNostrEvent, + RadrootsNostrKind, + RadrootsNostrKeys, +}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job::JobPaymentRequest, + job_request::RadrootsJobInput, + job_result::RadrootsJobResult, + kinds::result_kind_for_request_kind, + tags::{TAG_D, TAG_E_ROOT}, +}; +use radroots_trade::{ + listing::{kinds::KIND_TRADE_LISTING_ACCEPT_RES, tags::push_trade_listing_chain_tags}, + prelude::stage::conveyance::{TradeListingConveyanceRequest, TradeListingConveyanceResult}, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestConveyanceError { + #[error("Failed to parse conveyance request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Invalid accept result kind")] + InvalidAcceptKind, + #[error("Failed to send job response")] + ResponseSend(#[from] radroots_nostr::error::RadrootsNostrError), +} + +pub async fn handle_job_request_trade_conveyance( + event_job_request: RadrootsNostrEvent, + _keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingConveyanceRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestConveyanceError::ParseRequest(e.to_string()))?; + + let accept_evt = radroots_nostr_fetch_event_by_id(client.clone(), &req.accept_result_event_id) + .await + .map_err(|_| { + JobRequestConveyanceError::FetchReference(req.accept_result_event_id.clone()) + })?; + if accept_evt.kind != RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ACCEPT_RES) { + return Err(JobRequestConveyanceError::InvalidAcceptKind.into()); + } + + let conv_res = TradeListingConveyanceResult { + verified: true, + method: req.method, + message: Some("conveyance method verified".into()), + }; + let payload_json = serde_json::to_string(&conv_res)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None::<JobPaymentRequest>, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + + let e_root = accept_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten(); + + let d_tag = accept_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone().unwrap_or_default(), + Some(req.accept_result_event_id.clone()), + d_tag, + ); + + let builder = radroots_nostr_build_event(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = radroots_nostr_send_event(client, builder).await?; + + info!( + "job request trade/conveyance ({}={:?}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/src/features/trade_listing/handlers/dvm.rs b/src/features/trade_listing/handlers/dvm.rs @@ -0,0 +1,1041 @@ +#![forbid(unsafe_code)] + +use std::{sync::Arc, time::Duration}; + +use radroots_nostr::prelude::{ + radroots_event_from_nostr, + radroots_nostr_build_event, + radroots_nostr_build_event_job_feedback, + radroots_nostr_fetch_event_by_id, + radroots_nostr_parse_pubkey, + radroots_nostr_send_event, + RadrootsNostrClient, + RadrootsNostrEvent, + RadrootsNostrFilter, + RadrootsNostrKind, + RadrootsNostrKeys, + RadrootsNostrTag, +}; +use radroots_trade::listing::{ + dvm::{ + TradeListingEnvelope, TradeListingEnvelopeError, TradeListingMessageType, + TradeListingValidateRequest, TradeListingValidateResult, TradeOrderResponse, + TradeOrderRevisionResponse, TradeListingCancel, TradeListingAddress, + }, + dvm_kinds::is_trade_listing_dvm_kind, + order::{ + TradeAnswer, TradeDiscountDecision, TradeDiscountOffer, TradeDiscountRequest, + TradeFulfillmentUpdate, TradeOrder, TradeOrderRevision, TradeOrderStatus, TradeQuestion, + TradeReceipt, + }, + validation::{validate_listing_event, TradeListingValidationError}, +}; +use serde::de::DeserializeOwned; +use thiserror::Error; + +use crate::features::trade_listing::state::{TradeListingState, TradeListingStateError, TradeOrderState}; + +#[derive(Debug, Error)] +pub enum TradeListingDvmError { + #[error("event kind not supported")] + UnsupportedKind, + #[error("missing recipient tag")] + MissingRecipient, + #[error("missing required tag: {0}")] + MissingTag(&'static str), + #[error("tag mismatch: {0}")] + TagMismatch(&'static str), + #[error("invalid envelope: {0}")] + InvalidEnvelope(#[from] TradeListingEnvelopeError), + #[error("invalid envelope payload: {0}")] + InvalidPayload(String), + #[error("invalid listing address")] + InvalidListingAddr, + #[error("invalid order request payload")] + InvalidOrder, + #[error("state error: {0}")] + State(#[from] TradeListingStateError), + #[error("nostr error: {0}")] + Nostr(#[from] radroots_nostr::error::RadrootsNostrError), + #[error("serde error: {0}")] + Serde(#[from] serde_json::Error), + #[error("unauthorized sender")] + Unauthorized, + #[error("listing not validated")] + ListingNotValidated, +} + +pub async fn handle_event( + event: RadrootsNostrEvent, + tags: Vec<RadrootsNostrTag>, + keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + state: Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let kind = match event.kind { + RadrootsNostrKind::Custom(v) => v, + _ => return Err(TradeListingDvmError::UnsupportedKind), + }; + if !is_trade_listing_dvm_kind(kind) { + return Err(TradeListingDvmError::UnsupportedKind); + } + + if event.pubkey == keys.public_key() { + return Ok(()); + } + + let tag_slices: Vec<Vec<String>> = tags.iter().map(|t| t.as_slice().to_vec()).collect(); + let rhi_pubkey = keys.public_key().to_string(); + if !tag_has_value(&tag_slices, "p", &rhi_pubkey) { + return Err(TradeListingDvmError::MissingRecipient); + } + + let envelope: TradeListingEnvelope<serde_json::Value> = + serde_json::from_str(&event.content)?; + envelope.validate()?; + if envelope.message_type.kind() != kind { + return Err(TradeListingDvmError::TagMismatch("kind")); + } + + let listing_addr = tag_value(&tag_slices, "a").ok_or(TradeListingDvmError::MissingTag("a"))?; + if listing_addr != envelope.listing_addr { + return Err(TradeListingDvmError::TagMismatch("a")); + } + + let order_id = envelope.order_id.as_deref(); + if envelope.message_type.requires_order_id() { + let tag_order_id = + tag_value(&tag_slices, "d").ok_or(TradeListingDvmError::MissingTag("d"))?; + if Some(tag_order_id.as_str()) != order_id { + return Err(TradeListingDvmError::TagMismatch("d")); + } + } + + let listing_addr_parsed = + TradeListingAddress::parse(&listing_addr).map_err(|_| TradeListingDvmError::InvalidListingAddr)?; + if listing_addr_parsed.kind != 30402 { + return Err(TradeListingDvmError::InvalidListingAddr); + } + + match envelope.message_type { + TradeListingMessageType::ListingValidateRequest => { + let payload: TradeListingValidateRequest = parse_payload(envelope.payload)?; + handle_listing_validate_request( + &event, + payload, + &listing_addr, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::OrderRequest => { + let payload: TradeOrder = parse_payload(envelope.payload)?; + handle_order_request( + &event, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::OrderResponse => { + let payload: TradeOrderResponse = parse_payload(envelope.payload)?; + handle_order_response( + &event, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::OrderRevision => { + let payload: TradeOrderRevision = parse_payload(envelope.payload)?; + handle_order_revision( + &event, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::OrderRevisionAccept | TradeListingMessageType::OrderRevisionDecline => { + let payload: TradeOrderRevisionResponse = parse_payload(envelope.payload)?; + handle_order_revision_response( + &event, + envelope.message_type, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::Question => { + let payload: TradeQuestion = parse_payload(envelope.payload)?; + handle_question( + &event, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::Answer => { + let payload: TradeAnswer = parse_payload(envelope.payload)?; + handle_answer( + &event, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::DiscountRequest => { + let payload: TradeDiscountRequest = parse_payload(envelope.payload)?; + handle_discount_request( + &event, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::DiscountOffer => { + let payload: TradeDiscountOffer = parse_payload(envelope.payload)?; + handle_discount_offer( + &event, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::DiscountAccept | TradeListingMessageType::DiscountDecline => { + let payload: TradeDiscountDecision = parse_payload(envelope.payload)?; + handle_discount_decision( + &event, + envelope.message_type, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::Cancel => { + let payload: TradeListingCancel = parse_payload(envelope.payload)?; + handle_cancel( + &event, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::FulfillmentUpdate => { + let payload: TradeFulfillmentUpdate = parse_payload(envelope.payload)?; + handle_fulfillment_update( + &event, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::Receipt => { + let payload: TradeReceipt = parse_payload(envelope.payload)?; + handle_receipt( + &event, + payload, + &listing_addr_parsed, + order_id, + &client, + &state, + ) + .await?; + } + TradeListingMessageType::ListingValidateResult => {} + } + + Ok(()) +} + +async fn handle_listing_validate_request( + event: &RadrootsNostrEvent, + payload: TradeListingValidateRequest, + listing_addr: &str, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let listing_event = if let Some(ptr) = payload.listing_event { + match radroots_nostr_fetch_event_by_id(client, &ptr.id).await { + Ok(evt) => Some(evt), + Err(err) => { + let error = match err { + radroots_nostr::error::RadrootsNostrError::EventNotFound(_) => { + TradeListingValidationError::ListingEventNotFound { + listing_addr: listing_addr.to_string(), + } + } + _ => TradeListingValidationError::ListingEventFetchFailed { + listing_addr: listing_addr.to_string(), + }, + }; + send_validate_result(event, client, listing_addr, vec![error]).await?; + return Ok(()); + } + } + } else { + match fetch_listing_by_addr(client, listing_addr).await { + Ok(event) => event, + Err(_) => { + let error = TradeListingValidationError::ListingEventFetchFailed { + listing_addr: listing_addr.to_string(), + }; + send_validate_result(event, client, listing_addr, vec![error]).await?; + return Ok(()); + } + } + }; + + let errors = if let Some(event) = listing_event { + let rr_event = radroots_event_from_nostr(&event); + match validate_listing_event(&rr_event) { + Ok(_) => { + let mut state = state.lock().await; + state.mark_listing_validated(listing_addr); + Vec::new() + } + Err(err) => vec![err], + } + } else { + vec![TradeListingValidationError::ListingEventNotFound { + listing_addr: listing_addr.to_string(), + }] + }; + + send_validate_result(event, client, listing_addr, errors).await +} + +async fn send_validate_result( + event: &RadrootsNostrEvent, + client: &RadrootsNostrClient, + listing_addr: &str, + errors: Vec<TradeListingValidationError>, +) -> Result<(), TradeListingDvmError> { + let payload = TradeListingValidateResult { + valid: errors.is_empty(), + errors, + }; + send_envelope( + client, + event.pubkey.to_string(), + TradeListingMessageType::ListingValidateResult, + listing_addr, + None, + &payload, + ) + .await +} + +async fn handle_order_request( + event: &RadrootsNostrEvent, + payload: TradeOrder, + listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + if payload.order_id != order_id || payload.listing_addr != listing_addr.as_str() { + return Err(TradeListingDvmError::InvalidOrder); + } + + let mut state = state.lock().await; + if !state.is_listing_validated(&payload.listing_addr) { + return Err(TradeListingDvmError::ListingNotValidated); + } + if state.order_exists(order_id) { + return Ok(()); + } + + if payload.buyer_pubkey != event.pubkey.to_string() + || payload.seller_pubkey != listing_addr.seller_pubkey + { + return Err(TradeListingDvmError::Unauthorized); + } + + let mut seen = std::collections::HashSet::new(); + seen.insert(event.id.to_string()); + + state.insert_order(TradeOrderState { + order_id: order_id.to_string(), + listing_addr: payload.listing_addr.clone(), + buyer_pubkey: payload.buyer_pubkey.clone(), + seller_pubkey: payload.seller_pubkey.clone(), + status: TradeOrderStatus::Requested, + seen_event_ids: seen, + }); + + drop(state); + + send_envelope( + client, + payload.seller_pubkey.clone(), + TradeListingMessageType::OrderRequest, + &payload.listing_addr, + Some(order_id), + &payload, + ) + .await +} + +async fn handle_order_response( + event: &RadrootsNostrEvent, + payload: TradeOrderResponse, + _listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + let mut state = state.lock().await; + let event_id = event.id.to_string(); + if state.is_event_seen(order_id, &event_id) { + return Ok(()); + } + let order = state + .get_order_mut(order_id) + .ok_or(TradeListingStateError::MissingOrder)?; + if order.seller_pubkey != event.pubkey.to_string() { + return Err(TradeListingDvmError::Unauthorized); + } + + let next_status = if payload.accepted { + TradeOrderStatus::Accepted + } else { + TradeOrderStatus::Declined + }; + ensure_transition(order.status.clone(), next_status.clone())?; + order.status = next_status; + order.seen_event_ids.insert(event_id); + + let buyer = order.buyer_pubkey.clone(); + let listing_addr_str = order.listing_addr.clone(); + drop(state); + + send_envelope( + client, + buyer, + TradeListingMessageType::OrderResponse, + &listing_addr_str, + Some(order_id), + &payload, + ) + .await +} + +async fn handle_order_revision( + event: &RadrootsNostrEvent, + payload: TradeOrderRevision, + listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + if payload.order_id != order_id { + return Err(TradeListingDvmError::InvalidOrder); + } + let mut state = state.lock().await; + let event_id = event.id.to_string(); + if state.is_event_seen(order_id, &event_id) { + return Ok(()); + } + let order = state + .get_order_mut(order_id) + .ok_or(TradeListingStateError::MissingOrder)?; + if order.seller_pubkey != event.pubkey.to_string() + || listing_addr.seller_pubkey != order.seller_pubkey + { + return Err(TradeListingDvmError::Unauthorized); + } + ensure_transition(order.status.clone(), TradeOrderStatus::Revised)?; + order.status = TradeOrderStatus::Revised; + order.seen_event_ids.insert(event_id); + let buyer = order.buyer_pubkey.clone(); + let listing_addr_str = order.listing_addr.clone(); + drop(state); + + send_envelope( + client, + buyer, + TradeListingMessageType::OrderRevision, + &listing_addr_str, + Some(order_id), + &payload, + ) + .await +} + +async fn handle_order_revision_response( + event: &RadrootsNostrEvent, + message_type: TradeListingMessageType, + payload: TradeOrderRevisionResponse, + _listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + let mut state = state.lock().await; + let event_id = event.id.to_string(); + if state.is_event_seen(order_id, &event_id) { + return Ok(()); + } + let order = state + .get_order_mut(order_id) + .ok_or(TradeListingStateError::MissingOrder)?; + if order.buyer_pubkey != event.pubkey.to_string() { + return Err(TradeListingDvmError::Unauthorized); + } + if message_type == TradeListingMessageType::OrderRevisionAccept && !payload.accepted { + return Err(TradeListingDvmError::InvalidOrder); + } + if message_type == TradeListingMessageType::OrderRevisionDecline && payload.accepted { + return Err(TradeListingDvmError::InvalidOrder); + } + + let next_status = if matches!(message_type, TradeListingMessageType::OrderRevisionAccept) { + TradeOrderStatus::Accepted + } else { + TradeOrderStatus::Declined + }; + ensure_transition(order.status.clone(), next_status.clone())?; + order.status = next_status; + order.seen_event_ids.insert(event_id); + let seller = order.seller_pubkey.clone(); + let listing_addr_str = order.listing_addr.clone(); + drop(state); + + send_envelope( + client, + seller, + message_type, + &listing_addr_str, + Some(order_id), + &payload, + ) + .await +} + +async fn handle_question( + event: &RadrootsNostrEvent, + payload: TradeQuestion, + _listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + if let Some(ref payload_order_id) = payload.order_id { + if payload_order_id != order_id { + return Err(TradeListingDvmError::InvalidOrder); + } + } + let mut state = state.lock().await; + let event_id = event.id.to_string(); + if state.is_event_seen(order_id, &event_id) { + return Ok(()); + } + let order = state + .get_order_mut(order_id) + .ok_or(TradeListingStateError::MissingOrder)?; + if order.buyer_pubkey != event.pubkey.to_string() { + return Err(TradeListingDvmError::Unauthorized); + } + ensure_transition(order.status.clone(), TradeOrderStatus::Questioned)?; + order.status = TradeOrderStatus::Questioned; + order.seen_event_ids.insert(event_id); + let seller = order.seller_pubkey.clone(); + let listing_addr_str = order.listing_addr.clone(); + drop(state); + + send_envelope( + client, + seller, + TradeListingMessageType::Question, + &listing_addr_str, + Some(order_id), + &payload, + ) + .await +} + +async fn handle_answer( + event: &RadrootsNostrEvent, + payload: TradeAnswer, + listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + if let Some(ref payload_order_id) = payload.order_id { + if payload_order_id != order_id { + return Err(TradeListingDvmError::InvalidOrder); + } + } + let mut state = state.lock().await; + let event_id = event.id.to_string(); + if state.is_event_seen(order_id, &event_id) { + return Ok(()); + } + let order = state + .get_order_mut(order_id) + .ok_or(TradeListingStateError::MissingOrder)?; + if order.seller_pubkey != event.pubkey.to_string() + || listing_addr.seller_pubkey != order.seller_pubkey + { + return Err(TradeListingDvmError::Unauthorized); + } + ensure_transition(order.status.clone(), TradeOrderStatus::Requested)?; + order.status = TradeOrderStatus::Requested; + order.seen_event_ids.insert(event_id); + let buyer = order.buyer_pubkey.clone(); + let listing_addr_str = order.listing_addr.clone(); + drop(state); + + send_envelope( + client, + buyer, + TradeListingMessageType::Answer, + &listing_addr_str, + Some(order_id), + &payload, + ) + .await +} + +async fn handle_discount_request( + event: &RadrootsNostrEvent, + payload: TradeDiscountRequest, + _listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + if payload.order_id != order_id { + return Err(TradeListingDvmError::InvalidOrder); + } + let mut state = state.lock().await; + let event_id = event.id.to_string(); + if state.is_event_seen(order_id, &event_id) { + return Ok(()); + } + let order = state + .get_order_mut(order_id) + .ok_or(TradeListingStateError::MissingOrder)?; + if order.buyer_pubkey != event.pubkey.to_string() { + return Err(TradeListingDvmError::Unauthorized); + } + order.seen_event_ids.insert(event_id); + let seller = order.seller_pubkey.clone(); + let listing_addr_str = order.listing_addr.clone(); + drop(state); + + send_envelope( + client, + seller, + TradeListingMessageType::DiscountRequest, + &listing_addr_str, + Some(order_id), + &payload, + ) + .await +} + +async fn handle_discount_offer( + event: &RadrootsNostrEvent, + payload: TradeDiscountOffer, + _listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + if payload.order_id != order_id { + return Err(TradeListingDvmError::InvalidOrder); + } + let mut state = state.lock().await; + let event_id = event.id.to_string(); + if state.is_event_seen(order_id, &event_id) { + return Ok(()); + } + let order = state + .get_order_mut(order_id) + .ok_or(TradeListingStateError::MissingOrder)?; + if order.seller_pubkey != event.pubkey.to_string() { + return Err(TradeListingDvmError::Unauthorized); + } + ensure_transition(order.status.clone(), TradeOrderStatus::Revised)?; + order.status = TradeOrderStatus::Revised; + order.seen_event_ids.insert(event_id); + let buyer = order.buyer_pubkey.clone(); + let listing_addr_str = order.listing_addr.clone(); + drop(state); + + send_envelope( + client, + buyer, + TradeListingMessageType::DiscountOffer, + &listing_addr_str, + Some(order_id), + &payload, + ) + .await +} + +async fn handle_discount_decision( + event: &RadrootsNostrEvent, + message_type: TradeListingMessageType, + payload: TradeDiscountDecision, + _listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + let mut state = state.lock().await; + let event_id = event.id.to_string(); + if state.is_event_seen(order_id, &event_id) { + return Ok(()); + } + let order = state + .get_order_mut(order_id) + .ok_or(TradeListingStateError::MissingOrder)?; + if order.buyer_pubkey != event.pubkey.to_string() { + return Err(TradeListingDvmError::Unauthorized); + } + let payload_is_accept = matches!(payload, TradeDiscountDecision::Accept { .. }); + let payload_is_decline = matches!(payload, TradeDiscountDecision::Decline { .. }); + if message_type == TradeListingMessageType::DiscountAccept && !payload_is_accept { + return Err(TradeListingDvmError::InvalidOrder); + } + if message_type == TradeListingMessageType::DiscountDecline && !payload_is_decline { + return Err(TradeListingDvmError::InvalidOrder); + } + let next_status = match message_type { + TradeListingMessageType::DiscountAccept => TradeOrderStatus::Accepted, + TradeListingMessageType::DiscountDecline => TradeOrderStatus::Requested, + _ => order.status.clone(), + }; + ensure_transition(order.status.clone(), next_status.clone())?; + order.status = next_status; + order.seen_event_ids.insert(event_id); + let seller = order.seller_pubkey.clone(); + let listing_addr_str = order.listing_addr.clone(); + drop(state); + + send_envelope( + client, + seller, + message_type, + &listing_addr_str, + Some(order_id), + &payload, + ) + .await +} + +async fn handle_cancel( + event: &RadrootsNostrEvent, + payload: TradeListingCancel, + _listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + let mut state = state.lock().await; + let event_id = event.id.to_string(); + if state.is_event_seen(order_id, &event_id) { + return Ok(()); + } + let order = state + .get_order_mut(order_id) + .ok_or(TradeListingStateError::MissingOrder)?; + let sender = event.pubkey.to_string(); + if sender != order.buyer_pubkey && sender != order.seller_pubkey { + return Err(TradeListingDvmError::Unauthorized); + } + ensure_transition(order.status.clone(), TradeOrderStatus::Cancelled)?; + order.status = TradeOrderStatus::Cancelled; + order.seen_event_ids.insert(event_id); + let recipient = if sender == order.buyer_pubkey { + order.seller_pubkey.clone() + } else { + order.buyer_pubkey.clone() + }; + let listing_addr_str = order.listing_addr.clone(); + drop(state); + + send_envelope( + client, + recipient, + TradeListingMessageType::Cancel, + &listing_addr_str, + Some(order_id), + &payload, + ) + .await +} + +async fn handle_fulfillment_update( + event: &RadrootsNostrEvent, + payload: TradeFulfillmentUpdate, + _listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + let mut state = state.lock().await; + let event_id = event.id.to_string(); + if state.is_event_seen(order_id, &event_id) { + return Ok(()); + } + let order = state + .get_order_mut(order_id) + .ok_or(TradeListingStateError::MissingOrder)?; + if order.seller_pubkey != event.pubkey.to_string() { + return Err(TradeListingDvmError::Unauthorized); + } + ensure_transition(order.status.clone(), TradeOrderStatus::Fulfilled)?; + order.status = TradeOrderStatus::Fulfilled; + order.seen_event_ids.insert(event_id); + let buyer = order.buyer_pubkey.clone(); + let listing_addr_str = order.listing_addr.clone(); + drop(state); + + send_envelope( + client, + buyer, + TradeListingMessageType::FulfillmentUpdate, + &listing_addr_str, + Some(order_id), + &payload, + ) + .await +} + +async fn handle_receipt( + event: &RadrootsNostrEvent, + payload: TradeReceipt, + _listing_addr: &TradeListingAddress, + order_id: Option<&str>, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; + let mut state = state.lock().await; + let event_id = event.id.to_string(); + if state.is_event_seen(order_id, &event_id) { + return Ok(()); + } + let order = state + .get_order_mut(order_id) + .ok_or(TradeListingStateError::MissingOrder)?; + if order.buyer_pubkey != event.pubkey.to_string() { + return Err(TradeListingDvmError::Unauthorized); + } + ensure_transition(order.status.clone(), TradeOrderStatus::Completed)?; + order.status = TradeOrderStatus::Completed; + order.seen_event_ids.insert(event_id); + let seller = order.seller_pubkey.clone(); + let listing_addr_str = order.listing_addr.clone(); + drop(state); + + send_envelope( + client, + seller, + TradeListingMessageType::Receipt, + &listing_addr_str, + Some(order_id), + &payload, + ) + .await +} + +async fn send_envelope<T: serde::Serialize + Clone>( + client: &RadrootsNostrClient, + recipient_pubkey: String, + message_type: TradeListingMessageType, + listing_addr: &str, + order_id: Option<&str>, + payload: &T, +) -> Result<(), TradeListingDvmError> { + let envelope = TradeListingEnvelope::new( + message_type, + listing_addr.to_string(), + order_id.map(|v| v.to_string()), + payload.clone(), + ); + let content = serde_json::to_string(&envelope)?; + let mut tags = Vec::with_capacity(3); + tags.push(vec!["p".into(), recipient_pubkey]); + tags.push(vec!["a".into(), listing_addr.to_string()]); + if let Some(order_id) = order_id { + tags.push(vec!["d".into(), order_id.to_string()]); + } + let builder = radroots_nostr_build_event(message_type.kind() as u32, content, tags)?; + radroots_nostr_send_event(client, builder).await?; + Ok(()) +} + +async fn fetch_listing_by_addr( + client: &RadrootsNostrClient, + listing_addr: &str, +) -> Result<Option<RadrootsNostrEvent>, TradeListingDvmError> { + let addr = TradeListingAddress::parse(listing_addr) + .map_err(|_| TradeListingDvmError::InvalidListingAddr)?; + let author = radroots_nostr_parse_pubkey(&addr.seller_pubkey) + .map_err(|_| TradeListingDvmError::InvalidListingAddr)?; + let filter = RadrootsNostrFilter::new() + .kind(RadrootsNostrKind::Custom(addr.kind)) + .author(author) + .identifier(addr.listing_id); + let events = client.fetch_events(filter, Duration::from_secs(10)).await?; + let mut latest: Option<RadrootsNostrEvent> = None; + for ev in events { + if ev.kind != RadrootsNostrKind::Custom(addr.kind) { + continue; + } + match &latest { + Some(cur) if ev.created_at <= cur.created_at => {} + _ => latest = Some(ev), + } + } + Ok(latest) +} + +fn parse_payload<T: DeserializeOwned>(value: serde_json::Value) -> Result<T, TradeListingDvmError> { + serde_json::from_value(value).map_err(|e| TradeListingDvmError::InvalidPayload(e.to_string())) +} + +fn tag_value(tags: &[Vec<String>], key: &str) -> Option<String> { + tags.iter().find_map(|t| { + if t.get(0).map(|k| k.as_str()) == Some(key) { + t.get(1).cloned() + } else { + None + } + }) +} + +fn tag_has_value(tags: &[Vec<String>], key: &str, value: &str) -> bool { + tags.iter().any(|t| { + t.get(0).map(|k| k.as_str()) == Some(key) && t.get(1).map(|v| v.as_str()) == Some(value) + }) +} + +fn ensure_transition( + from: TradeOrderStatus, + to: TradeOrderStatus, +) -> Result<(), TradeListingStateError> { + if from == to { + return Ok(()); + } + let allowed = match from { + TradeOrderStatus::Draft => matches!(to, TradeOrderStatus::Requested), + TradeOrderStatus::Validated => matches!(to, TradeOrderStatus::Requested), + TradeOrderStatus::Requested => matches!( + to, + TradeOrderStatus::Accepted + | TradeOrderStatus::Declined + | TradeOrderStatus::Questioned + | TradeOrderStatus::Revised + | TradeOrderStatus::Cancelled + | TradeOrderStatus::Requested + ), + TradeOrderStatus::Questioned => matches!( + to, + TradeOrderStatus::Requested | TradeOrderStatus::Revised | TradeOrderStatus::Cancelled + ), + TradeOrderStatus::Revised => matches!( + to, + TradeOrderStatus::Accepted + | TradeOrderStatus::Declined + | TradeOrderStatus::Cancelled + | TradeOrderStatus::Requested + ), + TradeOrderStatus::Accepted => { + matches!(to, TradeOrderStatus::Fulfilled | TradeOrderStatus::Cancelled) + } + TradeOrderStatus::Declined => false, + TradeOrderStatus::Cancelled => false, + TradeOrderStatus::Fulfilled => { + matches!( + to, + TradeOrderStatus::Completed | TradeOrderStatus::Fulfilled | TradeOrderStatus::Cancelled + ) + } + TradeOrderStatus::Completed => false, + }; + if allowed { + Ok(()) + } else { + Err(TradeListingStateError::InvalidTransition { from, to }) + } +} + +pub async fn handle_error( + error: TradeListingDvmError, + event: &RadrootsNostrEvent, + client: &RadrootsNostrClient, +) -> Result<(), TradeListingDvmError> { + let builder = + radroots_nostr_build_event_job_feedback(event, "error", Some(error.to_string()), None)?; + let _ = radroots_nostr_send_event(client, builder).await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::ensure_transition; + use radroots_trade::listing::order::TradeOrderStatus; + + #[test] + fn transition_rejects_accept_after_decline() { + let err = ensure_transition(TradeOrderStatus::Declined, TradeOrderStatus::Accepted); + assert!(err.is_err()); + } + + #[test] + fn transition_allows_revision_after_request() { + let ok = ensure_transition(TradeOrderStatus::Requested, TradeOrderStatus::Revised); + assert!(ok.is_ok()); + } +} diff --git a/src/features/trade_listing/handlers/fulfillment.rs b/src/features/trade_listing/handlers/fulfillment.rs @@ -0,0 +1,132 @@ +use radroots_nostr::prelude::{ + radroots_nostr_build_event, + radroots_nostr_fetch_event_by_id, + radroots_nostr_send_event, + RadrootsNostrClient, + RadrootsNostrEvent, + RadrootsNostrKind, + RadrootsNostrKeys, +}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job_request::RadrootsJobInput, + job_result::RadrootsJobResult, + kinds::result_kind_for_request_kind, + tags::{TAG_D, TAG_E_ROOT}, +}; +use radroots_trade::{ + listing::tags::push_trade_listing_chain_tags, + prelude::{ + kinds::KIND_TRADE_LISTING_PAYMENT_RES, + stage::fulfillment::{ + TradeListingFulfillmentRequest, TradeListingFulfillmentResult, + TradeListingFulfillmentState, + }, + }, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestFulfillmentError { + #[error("Failed to parse fulfillment request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Payment result not kind 6305 or missing chain")] + InvalidPayment, + #[error("Failed to send job response")] + ResponseSend(#[from] radroots_nostr::error::RadrootsNostrError), +} + +pub async fn handle_job_request_trade_fulfillment( + event_job_request: RadrootsNostrEvent, + _keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingFulfillmentRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestFulfillmentError::ParseRequest(e.to_string()))?; + + let payment_evt = radroots_nostr_fetch_event_by_id(client.clone(), &req.payment_result_event_id) + .await + .map_err(|_| { + JobRequestFulfillmentError::FetchReference(req.payment_result_event_id.clone()) + })?; + if payment_evt.kind != RadrootsNostrKind::Custom(KIND_TRADE_LISTING_PAYMENT_RES) { + return Err(JobRequestFulfillmentError::InvalidPayment.into()); + } + + let e_root = payment_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten() + .ok_or(JobRequestFulfillmentError::InvalidPayment)?; + + let d_tag = payment_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + let status = TradeListingFulfillmentResult { + state: TradeListingFulfillmentState::Preparing, + tracking: None, + eta: None, + notes: Some("order accepted and paid; preparing shipment".into()), + }; + let payload_json = serde_json::to_string(&status)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + Some(req.payment_result_event_id.clone()), + d_tag, + ); + + let builder = radroots_nostr_build_event(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = radroots_nostr_send_event(client, builder).await?; + + info!( + "job request trade/fulfillment ({}={}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/src/features/trade_listing/handlers/invoice.rs b/src/features/trade_listing/handlers/invoice.rs @@ -0,0 +1,169 @@ +use radroots_nostr::prelude::{ + radroots_nostr_build_event, + radroots_nostr_fetch_event_by_id, + radroots_nostr_send_event, + RadrootsNostrClient, + RadrootsNostrEvent, + RadrootsNostrKind, + RadrootsNostrKeys, +}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job::JobPaymentRequest, + job_request::{RadrootsJobInput, RadrootsJobParam}, + job_result::RadrootsJobResult, + kinds::result_kind_for_request_kind, + tags::{TAG_D, TAG_E_PREV, TAG_E_ROOT}, +}; +use radroots_trade::{ + listing::tags::push_trade_listing_chain_tags, + prelude::{ + kinds::{ + KIND_TRADE_LISTING_ACCEPT_RES, KIND_TRADE_LISTING_INVOICE_RES, + KIND_TRADE_LISTING_ORDER_RES, + }, + stage::invoice::{TradeListingInvoiceRequest, TradeListingInvoiceResult}, + }, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestInvoiceError { + #[error("Failed to parse invoice request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Accept result not kind 6302 or missing chain")] + InvalidAccept, + #[error("Failed to send job response")] + ResponseSend(#[from] radroots_nostr::error::RadrootsNostrError), +} + +fn param_lookup<'a>(params: &'a [RadrootsJobParam], key: &str) -> Option<&'a str> { + params + .iter() + .find(|p| p.key == key) + .map(|p| p.value.as_str()) +} + +pub async fn handle_job_request_trade_invoice( + event_job_request: RadrootsNostrEvent, + _keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingInvoiceRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestInvoiceError::ParseRequest(e.to_string()))?; + + let accept_evt = radroots_nostr_fetch_event_by_id(client.clone(), &req.accept_result_event_id) + .await + .map_err(|_| JobRequestInvoiceError::FetchReference(req.accept_result_event_id.clone()))?; + if accept_evt.kind != RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ACCEPT_RES) { + return Err(JobRequestInvoiceError::InvalidAccept.into()); + } + + let e_root = accept_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten() + .ok_or(JobRequestInvoiceError::InvalidAccept)?; + + let d_tag = accept_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + let order_res_id = accept_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_PREV)).then(|| s.get(1).cloned()) + }) + .flatten(); + + if let Some(prev_id) = &order_res_id { + if let Ok(prev_evt) = radroots_nostr_fetch_event_by_id(client.clone(), prev_id).await { + if prev_evt.kind != RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ORDER_RES) {} + } + } + + let amount_sat = param_lookup(&job_req.model.params, "amount_sat") + .and_then(|v| v.parse::<u32>().ok()) + .or_else(|| { + param_lookup(&job_req.model.params, "amount_msat") + .and_then(|v| v.parse::<u64>().ok()) + .map(|msat| (msat / 1000) as u32) + }) + .unwrap_or(0); + + let bolt11 = param_lookup(&job_req.model.params, "bolt11").map(|s| s.to_string()); + let note = param_lookup(&job_req.model.params, "note").map(|s| s.to_string()); + let expires_at = + param_lookup(&job_req.model.params, "expires_at").and_then(|v| v.parse::<u32>().ok()); + + let invoice = TradeListingInvoiceResult { + total_sat: amount_sat, + bolt11: bolt11.clone(), + note, + expires_at, + }; + let payload_json = serde_json::to_string(&invoice)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + debug_assert_eq!(result_kind as u16, KIND_TRADE_LISTING_INVOICE_RES); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: Some(JobPaymentRequest { amount_sat, bolt11 }), + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + Some(req.accept_result_event_id.clone()), + d_tag, + ); + + let builder = radroots_nostr_build_event(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = radroots_nostr_send_event(client, builder).await?; + + info!( + "job request trade/invoice ({}={}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/src/features/trade_listing/handlers/mod.rs b/src/features/trade_listing/handlers/mod.rs @@ -0,0 +1 @@ +pub mod dvm; diff --git a/src/features/trade_listing/handlers/order.rs b/src/features/trade_listing/handlers/order.rs @@ -0,0 +1,111 @@ +use radroots_nostr::prelude::{ + radroots_nostr_build_event, + radroots_nostr_fetch_event_by_id, + radroots_nostr_send_event, + RadrootsNostrClient, + RadrootsNostrEvent, + RadrootsNostrKeys, +}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job::JobPaymentRequest, + job_request::RadrootsJobInput, + job_result::RadrootsJobResult, + kinds::result_kind_for_request_kind, + listing::RadrootsListing, +}; +use radroots_trade::prelude::{ + kinds::KIND_TRADE_LISTING_ORDER_RES, stage::order::TradeListingOrderRequest, tags, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::{ + domain::pricing::ListingOrderCalculator, + subscriber::{JobRequestCtx, JobRequestError}, + }, +}; + +#[derive(Debug, Error)] +pub enum JobRequestOrderError { + #[error("Failed to parse reference event: {0}")] + ParseReference(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Reference event does not meet request requirements: {0}")] + MissingRequested(String), + #[error("Failed to send job response")] + ResponseSend(#[from] radroots_nostr::error::RadrootsNostrError), + #[error("Request cannot be satisfied: {0}")] + Unsatisfiable(String), +} + +pub async fn handle_job_request_trade_order( + event_job_request: RadrootsNostrEvent, + _keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let order_data: TradeListingOrderRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestOrderError::ParseReference(e.to_string()))?; + + let ref_id = &order_data.event.id; + let ref_event = radroots_nostr_fetch_event_by_id(client.clone(), ref_id) + .await + .map_err(|_| JobRequestOrderError::FetchReference(ref_id.clone()))?; + + let listing: RadrootsListing = serde_json::from_str(&ref_event.content).map_err(|_| { + JobRequestOrderError::ParseReference(format!("invalid listing content for {}", ref_id)) + })?; + + let order_result = listing.calculate_order(&order_data.payload)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + debug_assert_eq!(result_kind as u16, KIND_TRADE_LISTING_ORDER_RES as u16); + + let payload_json = serde_json::to_string(&order_result)?; + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None::<JobPaymentRequest>, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + + let e_root = ref_event.id.to_hex(); + let trade_id = format!("trade:{}:{}", e_root, event_job_request.id.to_hex()); + tags::push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + None::<String>, + Some(trade_id.clone()), + ); + + let builder = radroots_nostr_build_event(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = radroots_nostr_send_event(client, builder).await?; + + info!( + "job request trade/order (e_root={}) result sent: {:?}", + e_root, job_result_event_id + ); + Ok(()) +} diff --git a/src/features/trade_listing/handlers/payment.rs b/src/features/trade_listing/handlers/payment.rs @@ -0,0 +1,123 @@ +use radroots_nostr::prelude::{ + radroots_nostr_build_event, + radroots_nostr_fetch_event_by_id, + radroots_nostr_send_event, + RadrootsNostrClient, + RadrootsNostrEvent, + RadrootsNostrKind, + RadrootsNostrKeys, +}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job_request::RadrootsJobInput, + job_result::RadrootsJobResult, + kinds::result_kind_for_request_kind, + tags::{TAG_D, TAG_E_ROOT}, +}; +use radroots_trade::prelude::{ + kinds::KIND_TRADE_LISTING_INVOICE_RES, + stage::payment::{TradeListingPaymentProofRequest, TradeListingPaymentResult}, + tags::push_trade_listing_chain_tags, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestPaymentError { + #[error("Failed to parse payment request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Invoice result not kind 6304 or missing chain")] + InvalidInvoice, + #[error("Failed to send job response")] + ResponseSend(#[from] radroots_nostr::error::RadrootsNostrError), +} + +pub async fn handle_job_request_trade_payment( + event_job_request: RadrootsNostrEvent, + _keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingPaymentProofRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestPaymentError::ParseRequest(e.to_string()))?; + + let invoice_evt = radroots_nostr_fetch_event_by_id(client.clone(), &req.invoice_result_event_id) + .await + .map_err(|_| JobRequestPaymentError::FetchReference(req.invoice_result_event_id.clone()))?; + if invoice_evt.kind != RadrootsNostrKind::Custom(KIND_TRADE_LISTING_INVOICE_RES) { + return Err(JobRequestPaymentError::InvalidInvoice.into()); + } + + let e_root = invoice_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten() + .ok_or(JobRequestPaymentError::InvalidInvoice)?; + + let d_tag = invoice_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + let ack = TradeListingPaymentResult { + verified: true, + message: Some("payment proof accepted".into()), + }; + let payload_json = serde_json::to_string(&ack)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + Some(req.invoice_result_event_id.clone()), + d_tag, + ); + + let builder = radroots_nostr_build_event(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = radroots_nostr_send_event(client, builder).await?; + + info!( + "job request trade/payment ({}={}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/src/features/trade_listing/handlers/receipt.rs b/src/features/trade_listing/handlers/receipt.rs @@ -0,0 +1,126 @@ +use radroots_nostr::prelude::{ + radroots_nostr_build_event, + radroots_nostr_fetch_event_by_id, + radroots_nostr_send_event, + RadrootsNostrClient, + RadrootsNostrEvent, + RadrootsNostrKind, + RadrootsNostrKeys, +}; +use radroots_events_codec::job::{result::encode::job_result_build_tags, traits::JobEventBorrow}; +use thiserror::Error; +use tracing::info; + +use radroots_events::{ + RadrootsNostrEventPtr, + job_request::RadrootsJobInput, + job_result::RadrootsJobResult, + kinds::result_kind_for_request_kind, + tags::{TAG_D, TAG_E_ROOT}, +}; +use radroots_trade::prelude::{ + kinds::KIND_TRADE_LISTING_FULFILL_RES, + stage::receipt::{TradeListingReceiptRequest, TradeListingReceiptResult}, + tags::push_trade_listing_chain_tags, +}; + +use crate::{ + adapters::nostr::event::NostrEventAdapter, + features::trade_listing::subscriber::{JobRequestCtx, JobRequestError}, +}; + +#[derive(Debug, Error)] +pub enum JobRequestReceiptError { + #[error("Failed to parse receipt request: {0}")] + ParseRequest(String), + #[error("Failed to fetch reference event: {0}")] + FetchReference(String), + #[error("Reference event not found: {0}")] + MissingReference(String), + #[error("Fulfillment result not kind 6306 or missing chain")] + InvalidFulfillment, + #[error("Failed to send job response")] + ResponseSend(#[from] radroots_nostr::error::RadrootsNostrError), +} + +pub async fn handle_job_request_trade_receipt( + event_job_request: RadrootsNostrEvent, + _keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + job_req: JobRequestCtx, + job_req_input: RadrootsJobInput, +) -> Result<(), JobRequestError> { + let ev = NostrEventAdapter::new(&event_job_request); + + let req: TradeListingReceiptRequest = serde_json::from_str(&job_req_input.data) + .map_err(|e| JobRequestReceiptError::ParseRequest(e.to_string()))?; + + let fulfill_evt = + radroots_nostr_fetch_event_by_id(client.clone(), &req.fulfillment_result_event_id) + .await + .map_err(|_| { + JobRequestReceiptError::FetchReference(req.fulfillment_result_event_id.clone()) + })?; + if fulfill_evt.kind != RadrootsNostrKind::Custom(KIND_TRADE_LISTING_FULFILL_RES) { + return Err(JobRequestReceiptError::InvalidFulfillment.into()); + } + + let e_root = fulfill_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_E_ROOT)).then(|| s.get(1).cloned()) + }) + .flatten() + .ok_or(JobRequestReceiptError::InvalidFulfillment)?; + + let d_tag = fulfill_evt + .tags + .iter() + .find_map(|t| { + let s = t.as_slice(); + (s.get(0).map(|k| k.as_str()) == Some(TAG_D)).then(|| s.get(1).cloned()) + }) + .flatten(); + + let ack = TradeListingReceiptResult { + acknowledged: true, + at: event_job_request.created_at.as_u64() as u32, + }; + let payload_json = serde_json::to_string(&ack)?; + + let result_kind = result_kind_for_request_kind(job_req.model.kind as u32) + .unwrap_or(job_req.model.kind as u32 + 1000); + + let result_model = RadrootsJobResult { + kind: result_kind as u16, + request_event: RadrootsNostrEventPtr { + id: ev.raw_id().to_string(), + relays: None, + }, + request_json: Some(serde_json::to_string(&job_req.model)?), + inputs: job_req.model.inputs.clone(), + customer_pubkey: Some(ev.raw_author().to_string()), + payment: None, + content: Some(payload_json.clone()), + encrypted: false, + }; + + let mut tag_slices = job_result_build_tags(&result_model); + push_trade_listing_chain_tags( + &mut tag_slices, + e_root.clone(), + Some(req.fulfillment_result_event_id.clone()), + d_tag, + ); + + let builder = radroots_nostr_build_event(result_kind as u32, payload_json, tag_slices)?; + let job_result_event_id = radroots_nostr_send_event(client, builder).await?; + + info!( + "job request trade/receipt ({}={}) result sent: {:?}", + TAG_E_ROOT, e_root, job_result_event_id + ); + Ok(()) +} diff --git a/src/features/trade_listing/mod.rs b/src/features/trade_listing/mod.rs @@ -0,0 +1,3 @@ +pub mod handlers; +pub mod state; +pub mod subscriber; diff --git a/src/features/trade_listing/state.rs b/src/features/trade_listing/state.rs @@ -0,0 +1,104 @@ +#![forbid(unsafe_code)] + +use std::collections::{HashMap, HashSet}; + +use radroots_trade::listing::order::TradeOrderStatus; + +#[derive(Clone, Debug)] +pub struct TradeOrderState { + pub order_id: String, + pub listing_addr: String, + pub buyer_pubkey: String, + pub seller_pubkey: String, + pub status: TradeOrderStatus, + pub seen_event_ids: HashSet<String>, +} + +#[derive(Debug, Default)] +pub struct TradeListingState { + validated_listings: HashSet<String>, + orders: HashMap<String, TradeOrderState>, +} + +impl TradeListingState { + pub fn mark_listing_validated(&mut self, listing_addr: &str) { + self.validated_listings.insert(listing_addr.to_string()); + } + + pub fn is_listing_validated(&self, listing_addr: &str) -> bool { + self.validated_listings.contains(listing_addr) + } + + pub fn order_exists(&self, order_id: &str) -> bool { + self.orders.contains_key(order_id) + } + + pub fn get_order_mut(&mut self, order_id: &str) -> Option<&mut TradeOrderState> { + self.orders.get_mut(order_id) + } + + pub fn insert_order(&mut self, order: TradeOrderState) { + self.orders.insert(order.order_id.clone(), order); + } + + pub fn mark_event_seen(&mut self, order_id: &str, event_id: &str) -> bool { + if let Some(state) = self.orders.get_mut(order_id) { + state.seen_event_ids.insert(event_id.to_string()) + } else { + false + } + } + + pub fn is_event_seen(&self, order_id: &str, event_id: &str) -> bool { + self.orders + .get(order_id) + .map(|state| state.seen_event_ids.contains(event_id)) + .unwrap_or(false) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum TradeListingStateError { + MissingOrder, + InvalidTransition { from: TradeOrderStatus, to: TradeOrderStatus }, +} + +impl core::fmt::Display for TradeListingStateError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + TradeListingStateError::MissingOrder => write!(f, "missing order state"), + TradeListingStateError::InvalidTransition { from, to } => { + write!(f, "invalid order transition: {from:?} -> {to:?}") + } + } + } +} + +impl std::error::Error for TradeListingStateError {} + +#[cfg(test)] +mod tests { + use super::{TradeListingState, TradeOrderState}; + use radroots_trade::listing::order::TradeOrderStatus; + + #[test] + fn state_tracks_listings_and_events() { + let mut state = TradeListingState::default(); + assert!(!state.is_listing_validated("addr")); + state.mark_listing_validated("addr"); + assert!(state.is_listing_validated("addr")); + + let order = TradeOrderState { + order_id: "order-1".into(), + listing_addr: "addr".into(), + buyer_pubkey: "buyer".into(), + seller_pubkey: "seller".into(), + status: TradeOrderStatus::Requested, + seen_event_ids: Default::default(), + }; + state.insert_order(order); + assert!(!state.is_event_seen("order-1", "evt")); + assert!(state.mark_event_seen("order-1", "evt")); + assert!(state.is_event_seen("order-1", "evt")); + } +} diff --git a/src/features/trade_listing/subscriber.rs b/src/features/trade_listing/subscriber.rs @@ -0,0 +1,115 @@ +#![forbid(unsafe_code)] + +use std::{sync::Arc, time::Duration}; + +use anyhow::{anyhow, Result}; +use radroots_nostr::prelude::{ + radroots_nostr_filter_new_events, + radroots_nostr_tags_resolve, + RadrootsNostrClient, + RadrootsNostrFilter, + RadrootsNostrKind, + RadrootsNostrKeys, + RadrootsNostrRelayPoolNotification, +}; +use tokio::sync::watch; +use tokio::time::sleep; +use tracing::{info, warn}; + +use radroots_trade::listing::dvm_kinds::TRADE_LISTING_DVM_KINDS; + +use crate::features::trade_listing::{ + handlers::dvm::{handle_error, handle_event, TradeListingDvmError}, + state::TradeListingState, +}; + +pub async fn subscriber( + client: RadrootsNostrClient, + keys: RadrootsNostrKeys, + mut stop_rx: watch::Receiver<bool>, +) -> Result<()> { + info!( + "Starting subscriber for trade listing DVM kinds: {:?}", + TRADE_LISTING_DVM_KINDS + ); + + let kinds: Vec<RadrootsNostrKind> = TRADE_LISTING_DVM_KINDS + .iter() + .map(|kind| RadrootsNostrKind::Custom(*kind)) + .collect(); + let filter = radroots_nostr_filter_new_events(RadrootsNostrFilter::new().kinds(kinds)); + + if *stop_rx.borrow() { + return Ok(()); + } + + let subscription = client.subscribe(filter, None).await?; + + let state = Arc::new(tokio::sync::Mutex::new(TradeListingState::default())); + let mut notifications = client.notifications(); + + let mut stop_requested = false; + let mut notifications_closed = false; + + loop { + tokio::select! { + _ = stop_rx.changed() => { + stop_requested = true; + break; + } + msg = notifications.recv() => { + let n = match msg { + Ok(n) => n, + Err(_) => { + notifications_closed = true; + break; + } + }; + + if let RadrootsNostrRelayPoolNotification::Event { event, .. } = n { + let event = (*event).clone(); + let keys = keys.clone(); + let client = client.clone(); + let state = Arc::clone(&state); + + tokio::spawn(async move { + if cfg!(debug_assertions) { + sleep(Duration::from_millis(200)).await; + } + + let resolved_tags = match radroots_nostr_tags_resolve(&event, &keys) { + Ok(tags) => tags, + Err(err) => { + warn!("trade_listing: failed to resolve tags: {err}"); + return; + } + }; + + if let Err(err) = + handle_event(event.clone(), resolved_tags, keys, client.clone(), state).await + { + match err { + TradeListingDvmError::MissingRecipient + | TradeListingDvmError::UnsupportedKind => {} + other => { + if let Err(err) = handle_error(other, &event, &client).await { + warn!("trade_listing: failed to send error feedback: {err}"); + } + } + } + } + }); + } + } + } + } + + client.unsubscribe(&subscription.val).await; + if stop_requested { + return Ok(()); + } + if notifications_closed { + return Err(anyhow!("trade_listing subscriber notifications closed")); + } + Ok(()) +} diff --git a/src/infra/mod.rs b/src/infra/mod.rs @@ -0,0 +1 @@ +#![forbid(unsafe_code)] diff --git a/src/lib.rs b/src/lib.rs @@ -0,0 +1,87 @@ +pub mod adapters; +pub mod cli; +pub mod config; +pub mod infra; +pub mod rhi; + +pub mod features { + pub mod trade_listing; +} + +pub use cli::Args as cli_args; + +use anyhow::Result; +use std::time::Duration; + +use crate::{ + rhi::{Rhi, start_subscriber}, +}; +use radroots_identity::RadrootsIdentity; +use radroots_nostr::prelude::RadrootsNostrMetadata; +use tracing::{info, warn}; + +fn metadata_has_fields(md: &RadrootsNostrMetadata) -> bool { + md.name.is_some() + || md.display_name.is_some() + || md.about.is_some() + || md.website.is_some() + || md.picture.is_some() + || md.banner.is_some() + || md.nip05.is_some() + || md.lud06.is_some() + || md.lud16.is_some() + || !md.custom.is_empty() +} + +pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()> { + let identity = RadrootsIdentity::load_or_generate( + args.identity.as_ref(), + args.allow_generate_identity, + )?; + let keys = identity.keys().clone(); + + let rhi = Rhi::new(keys.clone()); + let client = rhi.client.clone(); + let relays = settings.config.relays.clone(); + + for relay in &relays { + client.add_relay(relay).await?; + } + + let md = settings.metadata.clone(); + let has_metadata = metadata_has_fields(&md); + + if !relays.is_empty() { + client.connect().await; + client.wait_for_connection(Duration::from_secs(5)).await; + if has_metadata { + if let Err(e) = client.set_metadata(&md).await { + warn!("Failed to publish metadata on startup: {e}"); + } else { + info!("Published metadata on startup"); + } + } + } + + let handle = start_subscriber( + client.clone(), + keys.clone(), + settings.config.subscriber.backoff.clone(), + ) + .await; + + let stop_handle = handle.clone(); + + tokio::select! { + _ = radroots_runtime::shutdown_signal() => { + info!("Shutting down…"); + stop_handle.stop(); + } + _ = handle.stopped() => {} + } + + client.unsubscribe_all().await; + client.disconnect().await; + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs @@ -0,0 +1,30 @@ +use anyhow::{Context, Result}; +use rhi::{cli_args, config, run_rhi}; +use std::process::ExitCode; +use tracing::info; + +#[tokio::main] +async fn main() -> ExitCode { + match run().await { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + tracing::error!(error = ?err, "Fatal error"); + eprintln!("Fatal error: {err:#}"); + ExitCode::FAILURE + } + } +} + +async fn run() -> Result<()> { + let (args, settings): (cli_args, config::Settings) = + radroots_runtime::parse_and_load_path_with_init( + |a: &cli_args| Some(a.config.as_path()), + |cfg: &config::Settings| cfg.config.logs_dir.as_str(), + None, + ) + .context("load configuration")?; + + info!("Starting"); + + run_rhi(&settings, &args).await +} diff --git a/src/rhi.rs b/src/rhi.rs @@ -0,0 +1,105 @@ +use std::time::{Duration, Instant}; + +use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys}; +use radroots_runtime::{Backoff, BackoffConfig}; + +pub struct Rhi { + pub(crate) _started: Instant, + pub client: RadrootsNostrClient, +} + +impl Rhi { + pub fn new(keys: RadrootsNostrKeys) -> Self { + let client = RadrootsNostrClient::new(keys); + Self { + _started: Instant::now(), + client, + } + } +} + +use std::sync::Arc; +use tokio::sync::Mutex; + +pub struct RhiHandle { + stop_tx: Arc<Mutex<Option<tokio::sync::watch::Sender<bool>>>>, + join: Option<tokio::task::JoinHandle<()>>, +} + +impl Clone for RhiHandle { + fn clone(&self) -> Self { + Self { + stop_tx: Arc::clone(&self.stop_tx), + join: None, // don’t clone the JoinHandle! + } + } +} + +impl RhiHandle { + pub fn stop(&self) { + if let Some(tx) = self.stop_tx.try_lock().ok().and_then(|mut opt| opt.take()) { + let _ = tx.send(true); + } + } + + pub async fn stopped(mut self) { + if let Some(join) = self.join.take() { + let _ = join.await; + } + } +} + +pub async fn start_subscriber( + client: RadrootsNostrClient, + keys: RadrootsNostrKeys, + backoff_cfg: BackoffConfig, +) -> RhiHandle { + let (stop_tx, mut stop_rx) = tokio::sync::watch::channel(false); + + let join = tokio::spawn(async move { + let mut backoff = Backoff::new(backoff_cfg); + loop { + if *stop_rx.borrow() { + break; + } + + client.connect().await; + tokio::select! { + _ = client.wait_for_connection(Duration::from_secs(5)) => {} + _ = stop_rx.changed() => break, + } + + let res = crate::features::trade_listing::subscriber::subscriber( + client.clone(), + keys.clone(), + stop_rx.clone(), + ) + .await; + + let failed = res.is_err(); + + if let Err(e) = res { + tracing::error!("Error on job request subscription: {e}"); + } else { + backoff.reset(); + } + + if *stop_rx.borrow() { + break; + } + + if failed { + let delay = backoff.next_delay(); + tokio::select! { + _ = tokio::time::sleep(delay) => {} + _ = stop_rx.changed() => break, + } + } + } + }); + + RhiHandle { + stop_tx: Arc::new(Mutex::new(Some(stop_tx))), + join: Some(join), + } +}