radrootsd

JSON-RPC bridge for Radroots event publishing
git clone https://radroots.dev/git/radrootsd.git
Log | Files | Refs | README | LICENSE

commit ae44b3c0408139ae819bd9779b953f61cb9eef6f
parent 1b468871eb1e63c41e88ca395d4d2c2b6cad4fd8
Author: triesap <tyson@radroots.org>
Date:   Tue, 23 Jun 2026 08:34:05 +0000

publish-proxy: replace bridge daemon ingress

- add publish proxy config, runtime paths, and principal bootstrap command
- store scoped principals and publish jobs in SQLite with hashed bearer tokens
- expose authenticated publish.* JSON-RPC method shells
- remove public bridge publish modules and state storage

Diffstat:
MCargo.lock | 128++++++++++++++++++++++++++++++++++++++++++-------------------------------------
MCargo.toml | 13++++---------
Mconfig.toml | 14+++++++-------
Msrc/app/cli.rs | 51++++++++++++++++++++++++++++++++++++++++++++++++++-
Msrc/app/config.rs | 407++++++++++++++++++++++++++++---------------------------------------------------
Msrc/app/paths.rs | 40++++++++++++----------------------------
Msrc/app/runtime.rs | 167+++++++++++++++++++++++++++++++++++++++++++------------------------------------
Dsrc/core/bridge/mod.rs | 2--
Dsrc/core/bridge/publish.rs | 590-------------------------------------------------------------------------------
Dsrc/core/bridge/store.rs | 805-------------------------------------------------------------------------------
Msrc/core/mod.rs | 2+-
Asrc/core/publish_proxy/mod.rs | 922+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/core/state.rs | 50++++++++++++--------------------------------------
Msrc/transport/jsonrpc/auth.rs | 121++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------
Dsrc/transport/jsonrpc/methods/bridge/farm_publish.rs | 143-------------------------------------------------------------------------------
Dsrc/transport/jsonrpc/methods/bridge/job_list.rs | 22----------------------
Dsrc/transport/jsonrpc/methods/bridge/job_status.rs | 32--------------------------------
Dsrc/transport/jsonrpc/methods/bridge/listing_publish.rs | 489-------------------------------------------------------------------------------
Dsrc/transport/jsonrpc/methods/bridge/mod.rs | 25-------------------------
Dsrc/transport/jsonrpc/methods/bridge/order_request.rs | 437-------------------------------------------------------------------------------
Dsrc/transport/jsonrpc/methods/bridge/profile_publish.rs | 137-------------------------------------------------------------------------------
Dsrc/transport/jsonrpc/methods/bridge/shared.rs | 558-------------------------------------------------------------------------------
Dsrc/transport/jsonrpc/methods/bridge/status.rs | 82-------------------------------------------------------------------------------
Msrc/transport/jsonrpc/methods/mod.rs | 113++++++++++++++++++++++++++++++++++---------------------------------------------
Asrc/transport/jsonrpc/methods/publish_proxy.rs | 301+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/transport/jsonrpc/mod.rs | 6+++---
Msrc/transport/jsonrpc/server.rs | 12++++++------
27 files changed, 1746 insertions(+), 3923 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -576,6 +576,18 @@ dependencies = [ ] [[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + +[[package]] name = "fastrand" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -800,6 +812,15 @@ dependencies = [ ] [[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + +[[package]] name = "heck" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1247,6 +1268,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" [[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + +[[package]] name = "linux-raw-sys" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1595,6 +1627,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + +[[package]] name = "poly1305" version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1725,14 +1763,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" [[package]] -name = "radroots_authority" -version = "0.1.0-alpha.2" -dependencies = [ - "radroots_events", - "thiserror 1.0.69", -] - -[[package]] name = "radroots_core" version = "0.1.0-alpha.2" dependencies = [ @@ -1759,8 +1789,6 @@ dependencies = [ "nostr", "radroots_core", "radroots_events", - "serde", - "serde_json", ] [[package]] @@ -1806,44 +1834,22 @@ dependencies = [ ] [[package]] -name = "radroots_nostr_connect" -version = "0.1.0-alpha.2" -dependencies = [ - "nostr", - "serde", - "serde_json", - "thiserror 1.0.69", - "url", -] - -[[package]] -name = "radroots_nostr_signer" +name = "radroots_protected_store" version = "0.1.0-alpha.2" dependencies = [ - "hex", - "nostr", - "radroots_identity", - "radroots_nostr", - "radroots_nostr_connect", - "radroots_runtime", + "chacha20poly1305", + "getrandom 0.2.17", + "radroots_secret_vault", "serde", "serde_json", - "sha2", - "thiserror 1.0.69", - "url", - "uuid", + "zeroize", ] [[package]] -name = "radroots_protected_store" +name = "radroots_publish_proxy_protocol" version = "0.1.0-alpha.2" dependencies = [ - "chacha20poly1305", - "getrandom 0.2.17", - "radroots_secret_vault", "serde", - "serde_json", - "zeroize", ] [[package]] @@ -1882,22 +1888,6 @@ name = "radroots_secret_vault" version = "0.1.0-alpha.2" [[package]] -name = "radroots_trade" -version = "0.1.0-alpha.2" -dependencies = [ - "base64 0.22.1", - "hex", - "radroots_authority", - "radroots_core", - "radroots_events", - "radroots_events_codec", - "serde", - "serde_json", - "sha2", - "thiserror 1.0.69", -] - -[[package]] name = "radrootsd" version = "0.1.0" dependencies = [ @@ -1905,16 +1895,14 @@ dependencies = [ "clap", "jsonrpsee", "nostr", - "radroots_core", "radroots_events", - "radroots_events_codec", "radroots_identity", "radroots_nostr", - "radroots_nostr_signer", + "radroots_publish_proxy_protocol", "radroots_runtime", "radroots_runtime_paths", - "radroots_trade", - "reqwest", + "rand 0.9.2", + "rusqlite", "serde", "serde_json", "serde_qs", @@ -2084,6 +2072,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746" [[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink 0.9.1", + "libsqlite3-sys", + "smallvec", +] + +[[package]] name = "rust-ini" version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2950,6 +2952,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" [[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] name = "version_check" version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3445,7 +3453,7 @@ checksum = "8902160c4e6f2fb145dbe9d6760a75e3c9522d8bf796ed7047c85919ac7115f8" dependencies = [ "arraydeque", "encoding_rs", - "hashlink", + "hashlink 0.8.4", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml @@ -11,34 +11,29 @@ description = "Radroots daemon binary" resolver = "2" [workspace.dependencies] -radroots_core = { path = "../lib/crates/core" } radroots_events = { path = "../lib/crates/events" } -radroots_events_codec = { path = "../lib/crates/events_codec" } radroots_identity = { path = "../lib/crates/identity" } radroots_nostr = { path = "../lib/crates/nostr" } -radroots_nostr_signer = { path = "../lib/crates/nostr_signer" } +radroots_publish_proxy_protocol = { path = "../lib/crates/publish_proxy_protocol" } radroots_runtime = { path = "../lib/crates/runtime" } -radroots_trade = { path = "../lib/crates/trade" } [lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage_nightly)'] } [dependencies] -radroots_core = { workspace = true, features = ["std", "serde"] } radroots_events = { workspace = true, features = ["serde"] } -radroots_events_codec = { workspace = true, features = ["nostr", "serde_json"] } radroots_identity = { workspace = true } radroots_nostr = { workspace = true, features = ["client", "codec", "events", "http"] } -radroots_nostr_signer = { workspace = true } +radroots_publish_proxy_protocol = { workspace = true, features = ["std", "serde"] } radroots_runtime = { workspace = true, features = ["cli"] } radroots_runtime_paths = { path = "../lib/crates/runtime_paths" } -radroots_trade = { workspace = true } nostr = { version = "0.44.2", features = ["nip46"] } anyhow = { version = "1" } clap = { version = "4", features = ["derive"] } jsonrpsee = { version = "0.26", features = ["server"] } -reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +rand = { version = "0.9" } +rusqlite = { version = "0.32", features = ["bundled"] } serde = { version = "1", default-features = false, features = ["derive"] } serde_json = { version = "1", default-features = false } serde_qs = { version = "1.0" } diff --git a/config.toml b/config.toml @@ -9,13 +9,13 @@ # interactive_user: # logs_dir = ~/.radroots/logs/services/radrootsd # service identity = ~/.radroots/secrets/services/radrootsd/identity.secret.json -# bridge state = ~/.radroots/data/services/radrootsd/bridge/bridge-jobs.json +# publish proxy database = ~/.radroots/data/services/radrootsd/publish_proxy.sqlite # service_host: # logs_dir = /var/log/radroots/services/radrootsd # service identity = /etc/radroots/secrets/services/radrootsd/identity.secret.json -# bridge state = /var/lib/radroots/services/radrootsd/bridge/bridge-jobs.json +# publish proxy database = /var/lib/radroots/services/radrootsd/publish_proxy.sqlite # the canonical live service identity is always an encrypted local envelope -# only override logs_dir or config.bridge.state_path intentionally +# only override logs_dir or config.publish_proxy.database_path intentionally [metadata] name = "radrootsd" @@ -36,11 +36,11 @@ relays = [ [config.rpc] addr = "127.0.0.1:7070" -[config.bridge] +[config.publish_proxy] enabled = true -bearer_token = "change-me" -delivery_policy = "any" -publish_max_attempts = 2 +max_event_bytes = 131072 +max_relays_per_request = 20 +job_list_limit = 100 [config.nip46] public_jsonrpc_enabled = false diff --git a/src/app/cli.rs b/src/app/cli.rs @@ -1,4 +1,6 @@ -use clap::Parser; +use std::path::PathBuf; + +use clap::{Args as ClapArgs, Parser, Subcommand}; use radroots_runtime::RadrootsServiceCliArgs; #[derive(Parser, Debug, Clone)] @@ -10,4 +12,51 @@ use radroots_runtime::RadrootsServiceCliArgs; pub struct Args { #[command(flatten)] pub service: RadrootsServiceCliArgs, + #[command(subcommand)] + pub command: Option<Command>, +} + +#[derive(Subcommand, Debug, Clone)] +pub enum Command { + PublishProxy(PublishProxyCommand), +} + +#[derive(ClapArgs, Debug, Clone)] +pub struct PublishProxyCommand { + #[command(subcommand)] + pub command: PublishProxySubcommand, +} + +#[derive(Subcommand, Debug, Clone)] +pub enum PublishProxySubcommand { + Principal(PrincipalCommand), +} + +#[derive(ClapArgs, Debug, Clone)] +pub struct PrincipalCommand { + #[command(subcommand)] + pub command: PrincipalSubcommand, +} + +#[derive(Subcommand, Debug, Clone)] +pub enum PrincipalSubcommand { + Init(PrincipalInitArgs), +} + +#[derive(ClapArgs, Debug, Clone)] +pub struct PrincipalInitArgs { + #[arg(long)] + pub label: String, + #[arg(long)] + pub token_file: PathBuf, + #[arg(long)] + pub allowed_pubkey: Vec<String>, + #[arg(long)] + pub allowed_kind: Vec<u32>, + #[arg(long)] + pub allowed_relay_policy: Vec<String>, + #[arg(long)] + pub job_visibility: String, + #[arg(long)] + pub allow_request_relays: bool, } diff --git a/src/app/config.rs b/src/app/config.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use std::path::{Path, PathBuf}; use super::paths::{ - RadrootsdRuntimePaths, default_bridge_state_path, process_path_selection, + RadrootsdRuntimePaths, default_publish_proxy_database_path, process_path_selection, resolve_runtime_paths_with_resolver, }; @@ -33,6 +33,10 @@ fn default_message_buffer_capacity() -> u32 { 1024 } +fn default_rpc_batch_request_limit() -> Option<u32> { + Some(0) +} + fn default_nip46_session_ttl_secs() -> u64 { 900 } @@ -45,32 +49,24 @@ fn default_nip46_public_jsonrpc_enabled() -> bool { false } -fn default_bridge_enabled() -> bool { - false +fn default_publish_proxy_enabled() -> bool { + true } -fn default_bridge_connect_timeout_secs() -> u64 { +fn default_publish_proxy_connect_timeout_secs() -> u64 { 10 } -fn default_bridge_delivery_policy() -> BridgeDeliveryPolicy { - BridgeDeliveryPolicy::Any -} - -fn default_bridge_publish_max_attempts() -> usize { - 1 -} - -fn default_bridge_publish_initial_backoff_millis() -> u64 { - 250 +fn default_publish_proxy_max_event_bytes() -> usize { + 128 * 1024 } -fn default_bridge_publish_max_backoff_millis() -> u64 { - 2_000 +fn default_publish_proxy_max_relays_per_request() -> usize { + 20 } -fn default_bridge_job_status_retention() -> usize { - 256 +fn default_publish_proxy_job_list_limit() -> usize { + 100 } #[derive(Debug, Deserialize, Clone, Default)] @@ -99,61 +95,45 @@ impl RawServiceConfig { } #[derive(Debug, Deserialize, Clone)] -struct RawBridgeConfig { - #[serde(default = "default_bridge_enabled")] +struct RawPublishProxyConfig { + #[serde(default = "default_publish_proxy_enabled")] pub enabled: bool, - #[serde(default)] - pub bearer_token: Option<String>, - #[serde(default = "default_bridge_connect_timeout_secs")] + #[serde(default = "default_publish_proxy_connect_timeout_secs")] pub connect_timeout_secs: u64, - #[serde(default = "default_bridge_delivery_policy")] - pub delivery_policy: BridgeDeliveryPolicy, + #[serde(default = "default_publish_proxy_max_event_bytes")] + pub max_event_bytes: usize, + #[serde(default = "default_publish_proxy_max_relays_per_request")] + pub max_relays_per_request: usize, + #[serde(default = "default_publish_proxy_job_list_limit")] + pub job_list_limit: usize, #[serde(default)] - pub delivery_quorum: Option<usize>, - #[serde(default = "default_bridge_publish_max_attempts")] - pub publish_max_attempts: usize, - #[serde(default = "default_bridge_publish_initial_backoff_millis")] - pub publish_initial_backoff_millis: u64, - #[serde(default = "default_bridge_publish_max_backoff_millis")] - pub publish_max_backoff_millis: u64, - #[serde(default = "default_bridge_job_status_retention")] - pub job_status_retention: usize, - #[serde(default)] - pub state_path: Option<PathBuf>, + pub database_path: Option<PathBuf>, } -impl Default for RawBridgeConfig { +impl Default for RawPublishProxyConfig { fn default() -> Self { Self { - enabled: default_bridge_enabled(), - bearer_token: None, - connect_timeout_secs: default_bridge_connect_timeout_secs(), - delivery_policy: default_bridge_delivery_policy(), - delivery_quorum: None, - publish_max_attempts: default_bridge_publish_max_attempts(), - publish_initial_backoff_millis: default_bridge_publish_initial_backoff_millis(), - publish_max_backoff_millis: default_bridge_publish_max_backoff_millis(), - job_status_retention: default_bridge_job_status_retention(), - state_path: None, + enabled: default_publish_proxy_enabled(), + connect_timeout_secs: default_publish_proxy_connect_timeout_secs(), + max_event_bytes: default_publish_proxy_max_event_bytes(), + max_relays_per_request: default_publish_proxy_max_relays_per_request(), + job_list_limit: default_publish_proxy_job_list_limit(), + database_path: None, } } } -impl RawBridgeConfig { - fn into_bridge_config(self, paths: &RadrootsdRuntimePaths) -> BridgeConfig { - BridgeConfig { +impl RawPublishProxyConfig { + fn into_publish_proxy_config(self, paths: &RadrootsdRuntimePaths) -> PublishProxyConfig { + PublishProxyConfig { enabled: self.enabled, - bearer_token: self.bearer_token, connect_timeout_secs: self.connect_timeout_secs, - delivery_policy: self.delivery_policy, - delivery_quorum: self.delivery_quorum, - publish_max_attempts: self.publish_max_attempts, - publish_initial_backoff_millis: self.publish_initial_backoff_millis, - publish_max_backoff_millis: self.publish_max_backoff_millis, - job_status_retention: self.job_status_retention, - state_path: self - .state_path - .unwrap_or_else(|| paths.bridge_state_path.clone()), + max_event_bytes: self.max_event_bytes, + max_relays_per_request: self.max_relays_per_request, + job_list_limit: self.job_list_limit, + database_path: self + .database_path + .unwrap_or_else(|| paths.publish_proxy_database_path.clone()), } } } @@ -169,7 +149,9 @@ struct RawConfiguration { #[serde(default)] pub nip46: Nip46Config, #[serde(default)] - pub bridge: RawBridgeConfig, + pub publish_proxy: RawPublishProxyConfig, + #[serde(default)] + pub bridge: Option<serde::de::IgnoredAny>, } #[derive(Debug, Deserialize, Clone)] @@ -187,7 +169,8 @@ impl RawSettings { rpc: self.config.rpc, rpc_addr: self.config.rpc_addr, nip46: self.config.nip46, - bridge: self.config.bridge.into_bridge_config(paths), + publish_proxy: self.config.publish_proxy.into_publish_proxy_config(paths), + obsolete_bridge_config_present: self.config.bridge.is_some(), }, } } @@ -241,76 +224,45 @@ impl Default for Nip46Config { } } -#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] -#[serde(rename_all = "snake_case")] -pub enum BridgeDeliveryPolicy { - Any, - Quorum, - All, -} - -impl BridgeDeliveryPolicy { - pub fn as_str(self) -> &'static str { - match self { - Self::Any => "any", - Self::Quorum => "quorum", - Self::All => "all", - } - } -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct BridgeConfig { - #[serde(default = "default_bridge_enabled")] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct PublishProxyConfig { + #[serde(default = "default_publish_proxy_enabled")] pub enabled: bool, - #[serde(default)] - pub bearer_token: Option<String>, - #[serde(default = "default_bridge_connect_timeout_secs")] + #[serde(default = "default_publish_proxy_connect_timeout_secs")] pub connect_timeout_secs: u64, - #[serde(default = "default_bridge_delivery_policy")] - pub delivery_policy: BridgeDeliveryPolicy, - #[serde(default)] - pub delivery_quorum: Option<usize>, - #[serde(default = "default_bridge_publish_max_attempts")] - pub publish_max_attempts: usize, - #[serde(default = "default_bridge_publish_initial_backoff_millis")] - pub publish_initial_backoff_millis: u64, - #[serde(default = "default_bridge_publish_max_backoff_millis")] - pub publish_max_backoff_millis: u64, - #[serde(default = "default_bridge_job_status_retention")] - pub job_status_retention: usize, - #[serde(default = "default_bridge_state_path")] - pub state_path: PathBuf, -} - -impl Default for BridgeConfig { + #[serde(default = "default_publish_proxy_max_event_bytes")] + pub max_event_bytes: usize, + #[serde(default = "default_publish_proxy_max_relays_per_request")] + pub max_relays_per_request: usize, + #[serde(default = "default_publish_proxy_job_list_limit")] + pub job_list_limit: usize, + #[serde(default = "default_publish_proxy_database_path")] + pub database_path: PathBuf, +} + +impl Default for PublishProxyConfig { fn default() -> Self { Self { - enabled: default_bridge_enabled(), - bearer_token: None, - connect_timeout_secs: default_bridge_connect_timeout_secs(), - delivery_policy: default_bridge_delivery_policy(), - delivery_quorum: None, - publish_max_attempts: default_bridge_publish_max_attempts(), - publish_initial_backoff_millis: default_bridge_publish_initial_backoff_millis(), - publish_max_backoff_millis: default_bridge_publish_max_backoff_millis(), - job_status_retention: default_bridge_job_status_retention(), - state_path: default_bridge_state_path(), + enabled: default_publish_proxy_enabled(), + connect_timeout_secs: default_publish_proxy_connect_timeout_secs(), + max_event_bytes: default_publish_proxy_max_event_bytes(), + max_relays_per_request: default_publish_proxy_max_relays_per_request(), + job_list_limit: default_publish_proxy_job_list_limit(), + database_path: default_publish_proxy_database_path(), } } } -impl BridgeConfig { - pub fn bearer_token(&self) -> Option<&str> { - self.bearer_token - .as_deref() - .map(str::trim) - .filter(|token| !token.is_empty()) - } - +impl PublishProxyConfig { pub fn validate(&self) -> Result<()> { - if self.enabled && self.bearer_token().is_none() { - bail!("bridge bearer_token is required when bridge ingress is enabled"); + if self.max_event_bytes == 0 { + bail!("publish_proxy max_event_bytes must be greater than zero"); + } + if self.max_relays_per_request == 0 { + bail!("publish_proxy max_relays_per_request must be greater than zero"); + } + if self.job_list_limit == 0 { + bail!("publish_proxy job_list_limit must be greater than zero"); } Ok(()) } @@ -330,7 +282,7 @@ pub struct RpcConfig { pub max_subscriptions_per_connection: u32, #[serde(default = "default_message_buffer_capacity")] pub message_buffer_capacity: u32, - #[serde(default)] + #[serde(default = "default_rpc_batch_request_limit")] pub batch_request_limit: Option<u32>, } @@ -343,7 +295,7 @@ impl Default for RpcConfig { max_connections: default_max_connections(), max_subscriptions_per_connection: default_max_subscriptions_per_connection(), message_buffer_capacity: default_message_buffer_capacity(), - batch_request_limit: None, + batch_request_limit: default_rpc_batch_request_limit(), } } } @@ -359,7 +311,9 @@ pub struct Configuration { #[serde(default)] pub nip46: Nip46Config, #[serde(default)] - pub bridge: BridgeConfig, + pub publish_proxy: PublishProxyConfig, + #[serde(default, skip_serializing)] + pub(crate) obsolete_bridge_config_present: bool, } impl Configuration { @@ -368,7 +322,10 @@ impl Configuration { } pub fn validate(&self) -> Result<()> { - self.bridge.validate()?; + if self.obsolete_bridge_config_present { + bail!("config.bridge is obsolete; use config.publish_proxy"); + } + self.publish_proxy.validate()?; Ok(()) } } @@ -390,7 +347,7 @@ mod tests { use std::path::PathBuf; use super::{ - BridgeConfig, BridgeDeliveryPolicy, Configuration, Nip46Config, RpcConfig, + Configuration, Nip46Config, PublishProxyConfig, RpcConfig, load_settings_from_path_with_resolver, }; use crate::app::paths::{ @@ -437,31 +394,22 @@ mod tests { } #[test] - fn rpc_defaults_are_expected() { + fn rpc_defaults_disable_batches() { let cfg = RpcConfig::default(); assert_eq!(cfg.addr, "127.0.0.1:7070"); - assert_eq!(cfg.max_request_body_size, 10 * 1024 * 1024); - assert_eq!(cfg.max_response_body_size, 10 * 1024 * 1024); - assert_eq!(cfg.max_connections, 100); - assert_eq!(cfg.max_subscriptions_per_connection, 1024); - assert_eq!(cfg.message_buffer_capacity, 1024); - assert!(cfg.batch_request_limit.is_none()); + assert_eq!(cfg.batch_request_limit, Some(0)); } #[test] - fn bridge_defaults_are_expected() { + fn publish_proxy_defaults_are_expected() { let paths = default_runtime_paths_for_process().expect("resolve process runtime paths"); - let cfg = BridgeConfig::default(); - assert!(!cfg.enabled); - assert!(cfg.bearer_token.is_none()); + let cfg = PublishProxyConfig::default(); + assert!(cfg.enabled); assert_eq!(cfg.connect_timeout_secs, 10); - assert_eq!(cfg.delivery_policy, BridgeDeliveryPolicy::Any); - assert_eq!(cfg.delivery_quorum, None); - assert_eq!(cfg.publish_max_attempts, 1); - assert_eq!(cfg.publish_initial_backoff_millis, 250); - assert_eq!(cfg.publish_max_backoff_millis, 2_000); - assert_eq!(cfg.job_status_retention, 256); - assert_eq!(cfg.state_path, paths.bridge_state_path); + assert_eq!(cfg.max_event_bytes, 128 * 1024); + assert_eq!(cfg.max_relays_per_request, 20); + assert_eq!(cfg.job_list_limit, 100); + assert_eq!(cfg.database_path, paths.publish_proxy_database_path); } #[test] @@ -474,7 +422,8 @@ mod tests { }, rpc_addr: None, nip46: Nip46Config::default(), - bridge: BridgeConfig::default(), + publish_proxy: PublishProxyConfig::default(), + obsolete_bridge_config_present: false, }; assert_eq!(cfg.rpc_addr(), "127.0.0.1:1111"); cfg.rpc_addr = Some("127.0.0.1:2222".to_string()); @@ -482,25 +431,16 @@ mod tests { } #[test] - fn bridge_validation_requires_bearer_token_when_enabled() { - let err = BridgeConfig { - enabled: true, - ..BridgeConfig::default() - } - .validate() - .expect_err("missing token should fail"); - assert!(err.to_string().contains("bearer_token")); - } - - #[test] - fn bridge_validation_accepts_enabled_bridge_with_bearer_token() { - BridgeConfig { - enabled: true, - bearer_token: Some("secret".to_string()), - ..BridgeConfig::default() - } - .validate() - .expect("valid bridge config"); + fn publish_proxy_validation_rejects_zero_limits() { + let mut cfg = PublishProxyConfig::default(); + cfg.max_event_bytes = 0; + assert!(cfg.validate().is_err()); + let mut cfg = PublishProxyConfig::default(); + cfg.max_relays_per_request = 0; + assert!(cfg.validate().is_err()); + let mut cfg = PublishProxyConfig::default(); + cfg.job_list_limit = 0; + assert!(cfg.validate().is_err()); } #[test] @@ -527,10 +467,8 @@ mod tests { ) ); assert_eq!( - paths.bridge_state_path, - PathBuf::from( - "/home/treesap/.radroots/data/services/radrootsd/bridge/bridge-jobs.json" - ) + paths.publish_proxy_database_path, + PathBuf::from("/home/treesap/.radroots/data/services/radrootsd/publish_proxy.sqlite") ); } @@ -556,8 +494,8 @@ mod tests { PathBuf::from("/etc/radroots/secrets/services/radrootsd/identity.secret.json") ); assert_eq!( - paths.bridge_state_path, - PathBuf::from("/var/lib/radroots/services/radrootsd/bridge/bridge-jobs.json") + paths.publish_proxy_database_path, + PathBuf::from("/var/lib/radroots/services/radrootsd/publish_proxy.sqlite") ); } @@ -584,8 +522,8 @@ mod tests { repo_local_root.join("secrets/services/radrootsd/identity.secret.json") ); assert_eq!( - paths.bridge_state_path, - repo_local_root.join("data/services/radrootsd/bridge/bridge-jobs.json") + paths.publish_proxy_database_path, + repo_local_root.join("data/services/radrootsd/publish_proxy.sqlite") ); } @@ -604,10 +542,6 @@ relays = ["ws://127.0.0.1:8080"] [config.rpc] addr = "127.0.0.1:7070" - -[config.bridge] -enabled = true -bearer_token = "change-me" "#, ) .expect("write config"); @@ -625,14 +559,41 @@ bearer_token = "change-me" "/home/treesap/.radroots/logs/services/radrootsd" ); assert_eq!( - settings.config.bridge.state_path, - PathBuf::from( - "/home/treesap/.radroots/data/services/radrootsd/bridge/bridge-jobs.json" - ) + settings.config.publish_proxy.database_path, + PathBuf::from("/home/treesap/.radroots/data/services/radrootsd/publish_proxy.sqlite") ); } #[test] + fn obsolete_config_is_rejected() { + let temp = tempfile::tempdir().expect("tempdir"); + let config_path = temp.path().join("radrootsd.toml"); + std::fs::write( + &config_path, + r#" +[metadata] +name = "radrootsd-test" + +[config] +relays = [] + +[config.bridge] +enabled = true +"#, + ) + .expect("write config"); + + let err = load_settings_from_path_with_resolver( + &config_path, + &linux_resolver("/home/treesap"), + RadrootsPathProfile::InteractiveUser, + None, + ) + .expect_err("obsolete config should fail"); + assert!(err.to_string().contains("config.bridge")); + } + + #[test] fn runtime_contract_output_matches_interactive_user_contract() { let contract = runtime_contract_with_resolver( &linux_resolver("/home/treesap"), @@ -642,92 +603,16 @@ bearer_token = "change-me" .expect("interactive-user contract"); assert_eq!(contract.active_profile, "interactive_user"); - assert_eq!(contract.path_overrides.profile_source, "caller"); - assert_eq!(contract.path_overrides.root_source, "host_defaults"); - assert_eq!(contract.path_overrides.repo_local_root, None); - assert_eq!(contract.path_overrides.repo_local_root_source, None); - assert_eq!( - contract.path_overrides.subordinate_path_override_source, - "config_artifact" - ); assert_eq!( contract.path_overrides.subordinate_path_override_keys, vec![ "config.service.logs_dir".to_owned(), - "config.bridge.state_path".to_owned(), + "config.publish_proxy.database_path".to_owned(), ] ); assert_eq!( - contract.allowed_profiles, - vec![ - "interactive_user".to_owned(), - "service_host".to_owned(), - "repo_local".to_owned(), - ] - ); - assert_eq!(contract.default_shared_secret_backend, "encrypted_file"); - assert_eq!( - contract.allowed_shared_secret_backends, - vec!["encrypted_file".to_owned()] - ); - assert_eq!( - contract.migration.posture, - "explicit_operator_import_required" - ); - assert_eq!(contract.migration.state, "ready"); - assert_eq!(contract.migration.silent_startup_relocation, false); - assert_eq!( - contract.migration.compatibility_window, - "detect_and_report_only" - ); - assert!(contract.migration.detected_legacy_paths.is_empty()); - assert_eq!( - contract.canonical_config_path, - PathBuf::from("/home/treesap/.radroots/config/services/radrootsd/config.toml") - ); - assert_eq!( - contract.canonical_logs_dir, - PathBuf::from("/home/treesap/.radroots/logs/services/radrootsd") - ); - assert_eq!( - contract.canonical_identity_path, - PathBuf::from( - "/home/treesap/.radroots/secrets/services/radrootsd/identity.secret.json" - ) - ); - assert_eq!( - contract.canonical_bridge_state_path, - PathBuf::from( - "/home/treesap/.radroots/data/services/radrootsd/bridge/bridge-jobs.json" - ) - ); - } - - #[test] - fn runtime_contract_output_matches_service_host_contract() { - let contract = runtime_contract_with_resolver( - &linux_resolver("/home/treesap"), - RadrootsPathProfile::ServiceHost, - None, - ) - .expect("service-host contract"); - - assert_eq!(contract.active_profile, "service_host"); - assert_eq!( - contract.canonical_config_path, - PathBuf::from("/etc/radroots/services/radrootsd/config.toml") - ); - assert_eq!( - contract.canonical_logs_dir, - PathBuf::from("/var/log/radroots/services/radrootsd") - ); - assert_eq!( - contract.canonical_identity_path, - PathBuf::from("/etc/radroots/secrets/services/radrootsd/identity.secret.json") - ); - assert_eq!( - contract.canonical_bridge_state_path, - PathBuf::from("/var/lib/radroots/services/radrootsd/bridge/bridge-jobs.json") + contract.canonical_publish_proxy_database_path, + PathBuf::from("/home/treesap/.radroots/data/services/radrootsd/publish_proxy.sqlite") ); } } diff --git a/src/app/paths.rs b/src/app/paths.rs @@ -11,16 +11,17 @@ use radroots_runtime_paths::{ use serde::Serialize; const RADROOTSD_RUNTIME_ID: &str = "radrootsd"; -const BRIDGE_STATE_DIR_NAME: &str = "bridge"; -const BRIDGE_STATE_FILE_NAME: &str = "bridge-jobs.json"; +const PUBLISH_PROXY_DATABASE_FILE_NAME: &str = "publish_proxy.sqlite"; const RADROOTSD_PATHS_PROFILE_ENV: &str = "RADROOTSD_PATHS_PROFILE"; const RADROOTSD_PATHS_REPO_LOCAL_ROOT_ENV: &str = "RADROOTSD_PATHS_REPO_LOCAL_ROOT"; const RADROOTSD_DEFAULT_SHARED_SECRET_BACKEND: &str = "encrypted_file"; const RADROOTSD_ALLOWED_PROFILES: [&str; 3] = ["interactive_user", "service_host", "repo_local"]; const RADROOTSD_ALLOWED_SHARED_SECRET_BACKENDS: [&str; 1] = ["encrypted_file"]; const SUBORDINATE_PATH_OVERRIDE_SOURCE: &str = "config_artifact"; -const SUBORDINATE_PATH_OVERRIDE_KEYS: [&str; 2] = - ["config.service.logs_dir", "config.bridge.state_path"]; +const SUBORDINATE_PATH_OVERRIDE_KEYS: [&str; 2] = [ + "config.service.logs_dir", + "config.publish_proxy.database_path", +]; const MIGRATION_IMPORT_HINT: &str = "stop the runtime, inspect this legacy path, then perform an explicit import or manual copy into the canonical destination; radrootsd will not move it on startup"; #[derive(Debug, Clone, PartialEq, Eq)] @@ -28,7 +29,7 @@ pub(crate) struct RadrootsdRuntimePaths { pub(crate) config_path: PathBuf, pub(crate) logs_dir: PathBuf, pub(crate) identity_path: PathBuf, - pub(crate) bridge_state_path: PathBuf, + pub(crate) publish_proxy_database_path: PathBuf, } #[derive(Debug, Clone, PartialEq, Eq, Serialize)] @@ -42,7 +43,7 @@ pub struct RadrootsdRuntimeContractOutput { pub canonical_config_path: PathBuf, pub canonical_logs_dir: PathBuf, pub canonical_identity_path: PathBuf, - pub canonical_bridge_state_path: PathBuf, + pub canonical_publish_proxy_database_path: PathBuf, } pub type RadrootsdRuntimeMigrationContractOutput = RadrootsRuntimeMigrationContract; @@ -81,10 +82,7 @@ pub(crate) fn resolve_runtime_paths_with_resolver( config_path: namespaced.config.join(DEFAULT_CONFIG_FILE_NAME), logs_dir: namespaced.logs, identity_path: namespaced.secrets.join(DEFAULT_SERVICE_IDENTITY_FILE_NAME), - bridge_state_path: namespaced - .data - .join(BRIDGE_STATE_DIR_NAME) - .join(BRIDGE_STATE_FILE_NAME), + publish_proxy_database_path: namespaced.data.join(PUBLISH_PROXY_DATABASE_FILE_NAME), }) } @@ -97,10 +95,10 @@ pub(crate) fn default_runtime_paths_for_process() -> Result<RadrootsdRuntimePath ) } -pub(crate) fn default_bridge_state_path() -> PathBuf { +pub(crate) fn default_publish_proxy_database_path() -> PathBuf { default_runtime_paths_for_process() .expect("resolve canonical radrootsd runtime paths") - .bridge_state_path + .publish_proxy_database_path } #[cfg_attr(test, allow(dead_code))] @@ -155,7 +153,7 @@ fn runtime_contract_with_selection( canonical_config_path: paths.config_path, canonical_logs_dir: paths.logs_dir, canonical_identity_path: paths.identity_path, - canonical_bridge_state_path: paths.bridge_state_path, + canonical_publish_proxy_database_path: paths.publish_proxy_database_path, }) } @@ -197,13 +195,6 @@ fn legacy_path_candidates( Some(contract.canonical_logs_dir.clone()), MIGRATION_IMPORT_HINT, ), - RadrootsLegacyPathCandidate::new( - "radrootsd_repo_bridge_state_v0", - "legacy radrootsd repo-relative bridge state", - current_dir.join("state/bridge-jobs.json"), - Some(contract.canonical_bridge_state_path.clone()), - MIGRATION_IMPORT_HINT, - ), ] } @@ -241,9 +232,6 @@ mod tests { "[metadata]\nname = \"old\"\n", ) .expect("write old config"); - std::fs::create_dir_all(temp.path().join("state")).expect("state dir"); - std::fs::write(temp.path().join("state/bridge-jobs.json"), "[]") - .expect("write old bridge state"); let contract = runtime_contract_with_resolver( &linux_resolver(), RadrootsPathProfile::InteractiveUser, @@ -256,7 +244,7 @@ mod tests { assert_eq!(report.posture, "explicit_operator_import_required"); assert_eq!(report.state, "legacy_state_detected"); assert!(!report.silent_startup_relocation); - assert_eq!(report.detected_legacy_paths.len(), 2); + assert_eq!(report.detected_legacy_paths.len(), 1); assert_eq!( report.detected_legacy_paths[0].id, "radrootsd_repo_config_v0" @@ -265,9 +253,5 @@ mod tests { report.detected_legacy_paths[0].destination, Some(contract.canonical_config_path) ); - assert_eq!( - report.detected_legacy_paths[1].id, - "radrootsd_repo_bridge_state_v0" - ); } } diff --git a/src/app/runtime.rs b/src/app/runtime.rs @@ -14,7 +14,6 @@ use crate::transport::nostr::listener::spawn_nip46_listener; use anyhow::Context; #[cfg(not(test))] use clap::Parser; -use radroots_events::kinds::KIND_LISTING; use radroots_events::profile::RadrootsProfileType; use radroots_nostr::prelude::{ RadrootsNostrApplicationHandlerSpec, RadrootsNostrKind, @@ -58,9 +57,9 @@ struct RadrootsdRuntimeStartupReport { identity_path: PathBuf, identity_path_source: String, canonical_identity_path: PathBuf, - bridge_state_path: PathBuf, - bridge_state_path_source: String, - canonical_bridge_state_path: PathBuf, + publish_proxy_database_path: PathBuf, + publish_proxy_database_path_source: String, + canonical_publish_proxy_database_path: PathBuf, path_overrides: paths::RadrootsdRuntimePathOverrideContractOutput, migration: paths::RadrootsdRuntimeMigrationContractOutput, default_shared_secret_backend: String, @@ -197,12 +196,14 @@ fn runtime_startup_report( &contract.canonical_identity_path, ), canonical_identity_path: contract.canonical_identity_path.clone(), - bridge_state_path: settings.config.bridge.state_path.clone(), - bridge_state_path_source: config_or_profile_path_source( - &settings.config.bridge.state_path, - &contract.canonical_bridge_state_path, + publish_proxy_database_path: settings.config.publish_proxy.database_path.clone(), + publish_proxy_database_path_source: config_or_profile_path_source( + &settings.config.publish_proxy.database_path, + &contract.canonical_publish_proxy_database_path, ), - canonical_bridge_state_path: contract.canonical_bridge_state_path.clone(), + canonical_publish_proxy_database_path: contract + .canonical_publish_proxy_database_path + .clone(), path_overrides: contract.path_overrides.clone(), migration, default_shared_secret_backend: contract.default_shared_secret_backend.clone(), @@ -252,9 +253,9 @@ fn log_runtime_startup_report(report: &RadrootsdRuntimeStartupReport) { identity_path = %report.identity_path.display(), identity_path_source = report.identity_path_source.as_str(), canonical_identity_path = %report.canonical_identity_path.display(), - bridge_state_path = %report.bridge_state_path.display(), - bridge_state_path_source = report.bridge_state_path_source.as_str(), - canonical_bridge_state_path = %report.canonical_bridge_state_path.display(), + publish_proxy_database_path = %report.publish_proxy_database_path.display(), + publish_proxy_database_path_source = report.publish_proxy_database_path_source.as_str(), + canonical_publish_proxy_database_path = %report.canonical_publish_proxy_database_path.display(), default_shared_secret_backend = report.default_shared_secret_backend.as_str(), allowed_shared_secret_backends = ?report.allowed_shared_secret_backends, "radrootsd runtime contract" @@ -292,10 +293,9 @@ async fn publish_service_presence( identity: RadrootsIdentity, metadata: radroots_nostr::prelude::RadrootsNostrMetadata, service_cfg: radroots_runtime::RadrootsNostrServiceConfig, - bridge_config: config::BridgeConfig, nip46_config: config::Nip46Config, ) -> Result<()> { - let kinds = service_presence_kinds(&bridge_config); + let kinds = service_presence_kinds(); let handler_spec = RadrootsNostrApplicationHandlerSpec { kinds, identifier: service_cfg.nip89_identifier.clone(), @@ -313,20 +313,12 @@ async fn maybe_publish_service_presence( identity: RadrootsIdentity, metadata: radroots_nostr::prelude::RadrootsNostrMetadata, service_cfg: radroots_runtime::RadrootsNostrServiceConfig, - bridge_config: config::BridgeConfig, nip46_config: config::Nip46Config, ) { #[cfg(test)] { - let result = publish_service_presence( - client, - identity, - metadata, - service_cfg, - bridge_config, - nip46_config, - ) - .await; + let result = + publish_service_presence(client, identity, metadata, service_cfg, nip46_config).await; if let Err(err) = result { warn!("Failed to publish service presence on startup: {err}"); } else { @@ -337,15 +329,8 @@ async fn maybe_publish_service_presence( #[cfg(not(test))] tokio::spawn(async move { - let result = publish_service_presence( - client, - identity, - metadata, - service_cfg, - bridge_config, - nip46_config, - ) - .await; + let result = + publish_service_presence(client, identity, metadata, service_cfg, nip46_config).await; if let Err(err) = result { warn!("Failed to publish service presence on startup: {err}"); } else { @@ -403,6 +388,51 @@ async fn wait_for_shutdown_or_stopped(handle: ServerHandle) -> RunWaitOutcome { } } +async fn handle_command(command: cli::Command, settings: &config::Settings) -> Result<()> { + match command { + cli::Command::PublishProxy(command) => match command.command { + cli::PublishProxySubcommand::Principal(command) => match command.command { + cli::PrincipalSubcommand::Init(args) => { + let token = crate::core::publish_proxy::generate_bearer_token(); + let token_hash = crate::core::publish_proxy::hash_bearer_token(token.as_str()); + let store = crate::core::publish_proxy::PublishProxyStore::open( + settings.config.publish_proxy.database_path.clone(), + )?; + let principal = store.create_principal( + crate::core::publish_proxy::PublishPrincipalInit { + label: args.label, + token_hash, + allowed_pubkeys: args.allowed_pubkey, + allowed_kinds: args.allowed_kind, + allowed_relay_policies: args + .allowed_relay_policy + .iter() + .map(|policy| { + crate::core::publish_proxy::parse_relay_policy(policy.as_str()) + }) + .collect::<Result<Vec<_>, _>>()?, + allow_request_relays: args.allow_request_relays, + job_visibility: args.job_visibility.parse()?, + expires_at_unix: None, + }, + )?; + crate::core::publish_proxy::write_token_file(&args.token_file, token.as_str())?; + println!( + "{}", + serde_json::json!({ + "principal_id": principal.principal_id, + "label": principal.label, + "token_file": args.token_file, + "database_path": settings.config.publish_proxy.database_path, + }) + ); + Ok(()) + } + }, + }, + } +} + pub async fn run() -> Result<()> { let (args, settings): (cli::Args, config::Settings) = load_args_and_settings()?; settings.config.validate()?; @@ -416,6 +446,10 @@ pub async fn run() -> Result<()> { log_runtime_startup_report(&report); } + if let Some(command) = args.command.clone() { + return handle_command(command, &settings).await; + } + info!("Starting radrootsd"); let identity = load_service_identity( @@ -425,7 +459,7 @@ pub async fn run() -> Result<()> { let radrootsd = Radrootsd::new( identity.clone(), settings.metadata.clone(), - settings.config.bridge.clone(), + settings.config.publish_proxy.clone(), settings.config.nip46.clone(), ); let radrootsd = radrootsd?; @@ -440,7 +474,6 @@ pub async fn run() -> Result<()> { identity.clone(), settings.metadata.clone(), settings.config.service.clone(), - settings.config.bridge.clone(), settings.config.nip46.clone(), ) .await; @@ -465,11 +498,8 @@ pub async fn run() -> Result<()> { Ok(()) } -fn service_presence_kinds(bridge_config: &config::BridgeConfig) -> Vec<u32> { +fn service_presence_kinds() -> Vec<u32> { let mut kinds = vec![RadrootsNostrKind::NostrConnect.as_u16() as u32]; - if bridge_config.enabled { - kinds.push(KIND_LISTING); - } kinds.sort_unstable(); kinds.dedup(); kinds @@ -485,7 +515,6 @@ mod tests { use crate::app::{cli, config, paths}; use crate::core::Radrootsd; use crate::transport::jsonrpc; - use radroots_events::kinds::KIND_LISTING; use radroots_identity::RadrootsIdentity; use radroots_nostr::prelude::RadrootsNostrMetadata; use std::path::Path; @@ -535,6 +564,7 @@ mod tests { identity: Some(path), allow_generate_identity: allow_generate, }, + command: None, } } @@ -555,8 +585,9 @@ mod tests { ..config::RpcConfig::default() }, rpc_addr: Some("127.0.0.1:0".to_string()), - bridge: config::BridgeConfig::default(), nip46: config::Nip46Config::default(), + publish_proxy: config::PublishProxyConfig::default(), + obsolete_bridge_config_present: false, }, } } @@ -577,7 +608,7 @@ mod tests { subordinate_path_override_source: "config_artifact".to_string(), subordinate_path_override_keys: vec![ "config.service.logs_dir".to_string(), - "config.bridge.state_path".to_string(), + "config.publish_proxy.database_path".to_string(), ], }, default_shared_secret_backend: "encrypted_file".to_string(), @@ -596,8 +627,8 @@ mod tests { canonical_identity_path: PathBuf::from( "/home/treesap/.radroots/secrets/services/radrootsd/identity.secret.json", ), - canonical_bridge_state_path: PathBuf::from( - "/home/treesap/.radroots/data/services/radrootsd/bridge/bridge-jobs.json", + canonical_publish_proxy_database_path: PathBuf::from( + "/home/treesap/.radroots/data/services/radrootsd/publish_proxy.sqlite", ), } } @@ -607,7 +638,7 @@ mod tests { let state = Radrootsd::new( identity, settings.metadata.clone(), - settings.config.bridge.clone(), + settings.config.publish_proxy.clone(), settings.config.nip46.clone(), ) .expect("state"); @@ -642,21 +673,6 @@ mod tests { } #[tokio::test] - async fn run_returns_error_when_bridge_is_enabled_without_bearer_token() { - let _guard = test_guard(); - let path = unique_identity_path("bridge-auth"); - let args = args_for_identity(path, true); - let mut settings = settings_with_relays(Vec::new()); - settings.config.bridge.enabled = true; - settings.config.bridge.bearer_token = None; - *run_load_hook() - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok((args, settings))); - let err = run().await.expect_err("invalid bridge config should error"); - assert!(err.to_string().contains("bearer_token")); - } - - #[tokio::test] async fn run_covers_shutdown_path_and_presence_success() { let _guard = test_guard(); let path = unique_identity_path("shutdown"); @@ -811,18 +827,15 @@ mod tests { } #[test] - fn service_presence_kinds_include_listing_when_bridge_is_enabled() { - let mut bridge = config::BridgeConfig::default(); - bridge.enabled = true; - - let kinds = super::service_presence_kinds(&bridge); + fn service_presence_kinds_include_nostr_connect_only() { + let kinds = super::service_presence_kinds(); assert!( kinds.contains( &(radroots_nostr::prelude::RadrootsNostrKind::NostrConnect.as_u16() as u32) ) ); - assert!(kinds.contains(&KIND_LISTING)); + assert_eq!(kinds.len(), 1); } #[test] @@ -833,10 +846,12 @@ mod tests { identity: Some(PathBuf::from("/tmp/radrootsd/identity.secret.json")), allow_generate_identity: false, }, + command: None, }; let mut settings = settings_with_relays(Vec::new()); settings.config.service.logs_dir = "/tmp/radrootsd/logs".to_string(); - settings.config.bridge.state_path = PathBuf::from("/tmp/radrootsd/bridge-jobs.json"); + settings.config.publish_proxy.database_path = + PathBuf::from("/tmp/radrootsd/publish_proxy.sqlite"); let contract = sample_runtime_contract(); let report = @@ -861,10 +876,10 @@ mod tests { canonical_identity_path: PathBuf::from( "/home/treesap/.radroots/secrets/services/radrootsd/identity.secret.json" ), - bridge_state_path: PathBuf::from("/tmp/radrootsd/bridge-jobs.json"), - bridge_state_path_source: "config_artifact".to_string(), - canonical_bridge_state_path: PathBuf::from( - "/home/treesap/.radroots/data/services/radrootsd/bridge/bridge-jobs.json" + publish_proxy_database_path: PathBuf::from("/tmp/radrootsd/publish_proxy.sqlite"), + publish_proxy_database_path_source: "config_artifact".to_string(), + canonical_publish_proxy_database_path: PathBuf::from( + "/home/treesap/.radroots/data/services/radrootsd/publish_proxy.sqlite" ), path_overrides: sample_runtime_contract().path_overrides, migration: sample_runtime_contract().migration, @@ -882,11 +897,13 @@ mod tests { identity: None, allow_generate_identity: false, }, + command: None, }; let contract = sample_runtime_contract(); let mut settings = settings_with_relays(Vec::new()); settings.config.service.logs_dir = contract.canonical_logs_dir.display().to_string(); - settings.config.bridge.state_path = contract.canonical_bridge_state_path.clone(); + settings.config.publish_proxy.database_path = + contract.canonical_publish_proxy_database_path.clone(); let report = runtime_startup_report(&args, &settings, &contract, contract.migration.clone()); @@ -898,10 +915,10 @@ mod tests { assert_eq!(report.identity_path, contract.canonical_identity_path); assert_eq!(report.identity_path_source, "profile_default"); assert_eq!( - report.bridge_state_path, - contract.canonical_bridge_state_path + report.publish_proxy_database_path, + contract.canonical_publish_proxy_database_path ); - assert_eq!(report.bridge_state_path_source, "profile_default"); + assert_eq!(report.publish_proxy_database_path_source, "profile_default"); assert_eq!(report.path_overrides, contract.path_overrides); assert_eq!(report.migration, contract.migration); assert_eq!(report.default_shared_secret_backend, "encrypted_file"); diff --git a/src/core/bridge/mod.rs b/src/core/bridge/mod.rs @@ -1,2 +0,0 @@ -pub mod publish; -pub mod store; diff --git a/src/core/bridge/publish.rs b/src/core/bridge/publish.rs @@ -1,590 +0,0 @@ -use std::collections::{BTreeMap, BTreeSet}; -use std::time::Duration; - -use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrOutput, RadrootsNostrRelayUrl}; -use serde::{Deserialize, Serialize}; -use tokio::time::sleep; - -use crate::app::config::{BridgeConfig, BridgeDeliveryPolicy}; - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct BridgeRelayPublishResult { - pub relay_url: String, - pub acknowledged: bool, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub detail: Option<String>, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] -pub struct BridgePublishExecution { - pub published: bool, - pub relay_count: usize, - pub acknowledged_relay_count: usize, - pub required_acknowledged_relay_count: usize, - pub delivery_policy: BridgeDeliveryPolicy, - pub attempt_count: usize, - pub relay_outcome_summary: String, - pub relay_results: Vec<BridgeRelayPublishResult>, - pub attempt_summaries: Vec<String>, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct BridgePublishSettings { - pub connect_timeout_secs: u64, - pub delivery_policy: BridgeDeliveryPolicy, - pub delivery_quorum: Option<usize>, - pub publish_max_attempts: usize, - pub publish_initial_backoff_millis: u64, - pub publish_max_backoff_millis: u64, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -struct BridgePublishAttemptResult { - attempt_number: usize, - acknowledged_relay_count: usize, - relay_outcome_summary: String, - relay_results: Vec<BridgeRelayPublishResult>, -} - -impl BridgePublishSettings { - pub fn from_config(config: &BridgeConfig) -> Self { - Self { - connect_timeout_secs: config.connect_timeout_secs, - delivery_policy: config.delivery_policy, - delivery_quorum: config.delivery_quorum, - publish_max_attempts: config.publish_max_attempts, - publish_initial_backoff_millis: config.publish_initial_backoff_millis, - publish_max_backoff_millis: config.publish_max_backoff_millis, - } - } - - fn required_acknowledged_relay_count(&self, relay_count: usize) -> Result<usize, String> { - if relay_count == 0 { - return Err("cannot publish without at least one relay".to_string()); - } - if self.connect_timeout_secs == 0 { - return Err("bridge.connect_timeout_secs must be greater than zero".to_string()); - } - if self.publish_max_attempts == 0 { - return Err("bridge.publish_max_attempts must be greater than zero".to_string()); - } - if self.publish_initial_backoff_millis == 0 { - return Err( - "bridge.publish_initial_backoff_millis must be greater than zero".to_string(), - ); - } - if self.publish_max_backoff_millis == 0 { - return Err("bridge.publish_max_backoff_millis must be greater than zero".to_string()); - } - if self.publish_initial_backoff_millis > self.publish_max_backoff_millis { - return Err( - "bridge.publish_max_backoff_millis must be greater than or equal to bridge.publish_initial_backoff_millis" - .to_string(), - ); - } - - match self.delivery_policy { - BridgeDeliveryPolicy::Any => Ok(1), - BridgeDeliveryPolicy::All => Ok(relay_count), - BridgeDeliveryPolicy::Quorum => { - let delivery_quorum = self.delivery_quorum.ok_or_else(|| { - "bridge.delivery_quorum must be set when bridge.delivery_policy is `quorum`" - .to_string() - })?; - if delivery_quorum == 0 { - return Err("bridge.delivery_quorum must be greater than zero".to_string()); - } - if delivery_quorum > relay_count { - return Err(format!( - "bridge.delivery_quorum `{delivery_quorum}` cannot be satisfied by `{relay_count}` target relays" - )); - } - Ok(delivery_quorum) - } - } - } - - fn backoff_for_attempt(&self, completed_attempt_number: usize) -> u64 { - let exponent = completed_attempt_number.saturating_sub(1) as u32; - let scaled = self - .publish_initial_backoff_millis - .saturating_mul(2_u64.saturating_pow(exponent)); - scaled.min(self.publish_max_backoff_millis) - } -} - -pub async fn connect_and_publish_event( - client: &RadrootsNostrClient, - settings: &BridgePublishSettings, - event: &radroots_nostr::prelude::RadrootsNostrEvent, -) -> BridgePublishExecution { - let relays = client - .relays() - .await - .keys() - .cloned() - .collect::<Vec<RadrootsNostrRelayUrl>>(); - publish_with_policy(&relays, settings, || async { - client.connect().await; - client - .wait_for_connection(Duration::from_secs(settings.connect_timeout_secs)) - .await; - client - .send_event(event) - .await - .map_err(|error| error.to_string()) - }) - .await -} - -pub fn failed_prepublish_execution( - settings: &BridgePublishSettings, - summary: impl Into<String>, -) -> BridgePublishExecution { - let summary = summary.into(); - BridgePublishExecution { - published: false, - relay_count: 0, - acknowledged_relay_count: 0, - required_acknowledged_relay_count: 0, - delivery_policy: settings.delivery_policy, - attempt_count: 0, - relay_outcome_summary: summary.clone(), - relay_results: Vec::new(), - attempt_summaries: vec![summary], - } -} - -pub async fn publish_with_policy<T, F, Fut>( - relays: &[RadrootsNostrRelayUrl], - settings: &BridgePublishSettings, - mut send_attempt: F, -) -> BridgePublishExecution -where - T: std::fmt::Debug, - F: FnMut() -> Fut, - Fut: std::future::Future<Output = Result<RadrootsNostrOutput<T>, String>>, -{ - let relay_count = relays.len(); - let required_acknowledged_relay_count = - match settings.required_acknowledged_relay_count(relay_count) { - Ok(required) => required, - Err(error) => { - let relay_results = relays - .iter() - .map(|relay| BridgeRelayPublishResult { - relay_url: relay.to_string(), - acknowledged: false, - detail: Some(error.clone()), - }) - .collect::<Vec<_>>(); - return BridgePublishExecution { - published: false, - relay_count, - acknowledged_relay_count: 0, - required_acknowledged_relay_count: 0, - delivery_policy: settings.delivery_policy, - attempt_count: 0, - relay_outcome_summary: error.clone(), - relay_results, - attempt_summaries: vec![error], - }; - } - }; - let mut attempt_results = Vec::new(); - - for attempt_number in 1..=settings.publish_max_attempts { - let attempt = match send_attempt().await { - Ok(output) => build_publish_attempt_result(relays, attempt_number, &output), - Err(error) => build_failed_publish_attempt_result(relays, attempt_number, error), - }; - let threshold_reached = - attempt.acknowledged_relay_count >= required_acknowledged_relay_count; - attempt_results.push(attempt); - - if threshold_reached { - let final_attempt = attempt_results - .last() - .expect("publish attempt results contain the successful attempt"); - return BridgePublishExecution { - published: true, - relay_count, - acknowledged_relay_count: final_attempt.acknowledged_relay_count, - required_acknowledged_relay_count, - delivery_policy: settings.delivery_policy, - attempt_count: attempt_results.len(), - relay_outcome_summary: summarize_delivery_policy_result( - settings.delivery_policy, - required_acknowledged_relay_count, - &attempt_results, - ), - relay_results: final_attempt.relay_results.clone(), - attempt_summaries: attempt_results - .iter() - .map(|attempt| attempt.relay_outcome_summary.clone()) - .collect(), - }; - } - - if attempt_number < settings.publish_max_attempts { - sleep(Duration::from_millis( - settings.backoff_for_attempt(attempt_number), - )) - .await; - } - } - - let final_attempt = attempt_results - .last() - .expect("publish attempt results contain at least one attempt"); - BridgePublishExecution { - published: false, - relay_count, - acknowledged_relay_count: final_attempt.acknowledged_relay_count, - required_acknowledged_relay_count, - delivery_policy: settings.delivery_policy, - attempt_count: attempt_results.len(), - relay_outcome_summary: summarize_delivery_policy_result( - settings.delivery_policy, - required_acknowledged_relay_count, - &attempt_results, - ), - relay_results: final_attempt.relay_results.clone(), - attempt_summaries: attempt_results - .iter() - .map(|attempt| attempt.relay_outcome_summary.clone()) - .collect(), - } -} - -fn build_publish_relay_results<T>( - relays: &[RadrootsNostrRelayUrl], - output: &RadrootsNostrOutput<T>, -) -> Vec<BridgeRelayPublishResult> -where - T: std::fmt::Debug, -{ - let acknowledged_relays = output - .success - .iter() - .map(ToString::to_string) - .collect::<BTreeSet<_>>(); - let failed_relays = output - .failed - .iter() - .map(|(relay, error)| (relay.to_string(), error.to_string())) - .collect::<BTreeMap<_, _>>(); - - relays - .iter() - .map(|relay| { - let relay_url = relay.to_string(); - if acknowledged_relays.contains(&relay_url) { - BridgeRelayPublishResult { - relay_url, - acknowledged: true, - detail: None, - } - } else { - BridgeRelayPublishResult { - relay_url: relay_url.clone(), - acknowledged: false, - detail: Some( - failed_relays - .get(&relay_url) - .cloned() - .unwrap_or_else(|| "no relay acknowledgement reported".to_owned()), - ), - } - } - }) - .collect() -} - -fn build_publish_attempt_result<T>( - relays: &[RadrootsNostrRelayUrl], - attempt_number: usize, - output: &RadrootsNostrOutput<T>, -) -> BridgePublishAttemptResult -where - T: std::fmt::Debug, -{ - let relay_results = build_publish_relay_results(relays, output); - let acknowledged_relay_count = relay_results - .iter() - .filter(|result| result.acknowledged) - .count(); - BridgePublishAttemptResult { - attempt_number, - acknowledged_relay_count, - relay_outcome_summary: summarize_publish_results(&relay_results), - relay_results, - } -} - -fn build_failed_publish_attempt_result( - relays: &[RadrootsNostrRelayUrl], - attempt_number: usize, - error: String, -) -> BridgePublishAttemptResult { - let relay_results = relays - .iter() - .map(|relay| BridgeRelayPublishResult { - relay_url: relay.to_string(), - acknowledged: false, - detail: Some(error.clone()), - }) - .collect::<Vec<_>>(); - BridgePublishAttemptResult { - attempt_number, - acknowledged_relay_count: 0, - relay_outcome_summary: summarize_publish_results(&relay_results), - relay_results, - } -} - -fn summarize_publish_results(relay_results: &[BridgeRelayPublishResult]) -> String { - let relay_count = relay_results.len(); - let acknowledged_relay_count = relay_results - .iter() - .filter(|result| result.acknowledged) - .count(); - if relay_count == 0 { - return "no relay acknowledged the publish".to_owned(); - } - - let mut summary = - format!("{acknowledged_relay_count}/{relay_count} relays acknowledged publish"); - let acknowledged = relay_results - .iter() - .filter(|result| result.acknowledged) - .map(|result| result.relay_url.clone()) - .collect::<Vec<_>>(); - if !acknowledged.is_empty() { - summary.push_str("; acknowledged: "); - summary.push_str(&acknowledged.join(", ")); - } - let failures = relay_results - .iter() - .filter(|result| !result.acknowledged) - .map(|result| match result.detail.as_deref() { - Some(detail) => format!("{}: {detail}", result.relay_url), - None => result.relay_url.clone(), - }) - .collect::<Vec<_>>(); - if !failures.is_empty() { - summary.push_str("; failures: "); - summary.push_str(&failures.join("; ")); - } - summary -} - -fn summarize_delivery_policy_result( - delivery_policy: BridgeDeliveryPolicy, - required_acknowledged_relay_count: usize, - attempt_results: &[BridgePublishAttemptResult], -) -> String { - let attempt_count = attempt_results.len(); - let final_attempt = attempt_results - .last() - .expect("delivery policy summary requires at least one attempt"); - let mut summary = format!( - "delivery policy {} required {required_acknowledged_relay_count} acknowledgements across {attempt_count} attempt(s); final attempt {}: {}", - delivery_policy.as_str(), - final_attempt.attempt_number, - final_attempt.relay_outcome_summary, - ); - if attempt_results.len() > 1 { - let attempt_summaries = attempt_results - .iter() - .map(|attempt| { - format!( - "attempt {}: {}", - attempt.attempt_number, attempt.relay_outcome_summary - ) - }) - .collect::<Vec<_>>(); - summary.push_str("; "); - summary.push_str(&attempt_summaries.join(" | ")); - } - summary -} - -#[cfg(test)] -mod tests { - use std::collections::{HashMap, HashSet}; - use std::sync::{Arc, Mutex}; - - use radroots_nostr::prelude::{ - RadrootsNostrEventId, RadrootsNostrOutput, RadrootsNostrRelayUrl, - }; - use tokio::time::Instant; - - use crate::app::config::{BridgeConfig, BridgeDeliveryPolicy}; - - use super::{BridgePublishSettings, publish_with_policy}; - - #[test] - fn publish_settings_from_config_copies_values() { - let config = BridgeConfig { - enabled: true, - bearer_token: Some("secret".to_string()), - connect_timeout_secs: 15, - delivery_policy: BridgeDeliveryPolicy::Quorum, - delivery_quorum: Some(2), - publish_max_attempts: 3, - publish_initial_backoff_millis: 125, - publish_max_backoff_millis: 500, - job_status_retention: 64, - ..BridgeConfig::default() - }; - - assert_eq!( - BridgePublishSettings::from_config(&config), - BridgePublishSettings { - connect_timeout_secs: 15, - delivery_policy: BridgeDeliveryPolicy::Quorum, - delivery_quorum: Some(2), - publish_max_attempts: 3, - publish_initial_backoff_millis: 125, - publish_max_backoff_millis: 500, - } - ); - } - - #[tokio::test] - async fn publish_with_policy_retries_until_threshold_is_met() { - let relays = vec![ - RadrootsNostrRelayUrl::parse("wss://relay-a.example.com").expect("relay-a"), - RadrootsNostrRelayUrl::parse("wss://relay-b.example.com").expect("relay-b"), - ]; - let settings = BridgePublishSettings { - connect_timeout_secs: 10, - delivery_policy: BridgeDeliveryPolicy::All, - delivery_quorum: None, - publish_max_attempts: 2, - publish_initial_backoff_millis: 10, - publish_max_backoff_millis: 10, - }; - let attempts = Arc::new(Mutex::new(vec![ - publish_output( - "1111111111111111111111111111111111111111111111111111111111111111", - &["wss://relay-a.example.com"], - &[("wss://relay-b.example.com", "blocked")], - ), - publish_output( - "2222222222222222222222222222222222222222222222222222222222222222", - &["wss://relay-a.example.com", "wss://relay-b.example.com"], - &[], - ), - ])); - - let start = Instant::now(); - let outcome = publish_with_policy(&relays, &settings, || { - let attempts = Arc::clone(&attempts); - async move { - let output = attempts.lock().expect("attempts lock").remove(0); - Ok(output) - } - }) - .await; - - assert!(outcome.published); - assert_eq!(outcome.delivery_policy, BridgeDeliveryPolicy::All); - assert_eq!(outcome.required_acknowledged_relay_count, 2); - assert_eq!(outcome.attempt_count, 2); - assert_eq!(outcome.acknowledged_relay_count, 2); - assert_eq!(outcome.relay_results.len(), 2); - assert_eq!(outcome.attempt_summaries.len(), 2); - assert!( - outcome - .relay_outcome_summary - .contains("delivery policy all") - ); - assert!(outcome.relay_outcome_summary.contains("attempt 1")); - assert!(start.elapsed() >= std::time::Duration::from_millis(10)); - } - - #[tokio::test] - async fn publish_with_policy_reports_threshold_failure() { - let relays = vec![ - RadrootsNostrRelayUrl::parse("wss://relay-a.example.com").expect("relay-a"), - RadrootsNostrRelayUrl::parse("wss://relay-b.example.com").expect("relay-b"), - ]; - let settings = BridgePublishSettings { - connect_timeout_secs: 10, - delivery_policy: BridgeDeliveryPolicy::Quorum, - delivery_quorum: Some(2), - publish_max_attempts: 2, - publish_initial_backoff_millis: 1, - publish_max_backoff_millis: 1, - }; - - let outcome = - publish_with_policy::<RadrootsNostrEventId, _, _>(&relays, &settings, || async { - Ok(publish_output( - "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - &["wss://relay-a.example.com"], - &[("wss://relay-b.example.com", "blocked")], - )) - }) - .await; - - assert!(!outcome.published); - assert_eq!(outcome.delivery_policy, BridgeDeliveryPolicy::Quorum); - assert_eq!(outcome.required_acknowledged_relay_count, 2); - assert_eq!(outcome.attempt_count, 2); - assert!( - outcome - .relay_outcome_summary - .contains("delivery policy quorum") - ); - } - - #[tokio::test] - async fn publish_with_policy_reports_configuration_failure_without_attempts() { - let settings = BridgePublishSettings { - connect_timeout_secs: 0, - delivery_policy: BridgeDeliveryPolicy::Any, - delivery_quorum: None, - publish_max_attempts: 1, - publish_initial_backoff_millis: 10, - publish_max_backoff_millis: 10, - }; - - let outcome = publish_with_policy::<RadrootsNostrEventId, _, _>(&[], &settings, || async { - unreachable!("configuration failure should short-circuit") - }) - .await; - - assert!(!outcome.published); - assert_eq!(outcome.attempt_count, 0); - assert!(outcome.relay_outcome_summary.contains("cannot publish")); - } - - fn publish_output( - event_id_hex: &str, - succeeded_relays: &[&str], - failed_relays: &[(&str, &str)], - ) -> RadrootsNostrOutput<RadrootsNostrEventId> { - let success = succeeded_relays - .iter() - .map(|relay| RadrootsNostrRelayUrl::parse(*relay).expect("success relay")) - .collect::<HashSet<_>>(); - let failed = failed_relays - .iter() - .map(|(relay, error)| { - ( - RadrootsNostrRelayUrl::parse(*relay).expect("failed relay"), - (*error).to_owned(), - ) - }) - .collect::<HashMap<_, _>>(); - - RadrootsNostrOutput { - val: RadrootsNostrEventId::parse(event_id_hex).expect("event id"), - success, - failed, - } - } -} diff --git a/src/core/bridge/store.rs b/src/core/bridge/store.rs @@ -1,805 +0,0 @@ -use std::collections::{HashMap, VecDeque}; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, RwLock}; - -use serde::{Deserialize, Serialize}; -use thiserror::Error; - -use crate::app::config::BridgeDeliveryPolicy; -use crate::core::bridge::publish::{BridgePublishExecution, BridgeRelayPublishResult}; - -const BRIDGE_JOB_STORE_VERSION: u32 = 2; -pub(crate) const BRIDGE_PENDING_RECOVERY_SUMMARY: &str = - "bridge publish did not complete before process restart"; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum BridgeJobStatus { - Accepted, - Published, - Failed, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct BridgeJobRecord { - pub job_id: String, - pub command: String, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub idempotency_key: Option<String>, - pub status: BridgeJobStatus, - pub requested_at_unix: u64, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub completed_at_unix: Option<u64>, - pub signer_mode: String, - pub event_kind: u32, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub event_id: Option<String>, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub event_addr: Option<String>, - pub delivery_policy: BridgeDeliveryPolicy, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub delivery_quorum: Option<usize>, - pub relay_count: usize, - pub acknowledged_relay_count: usize, - pub required_acknowledged_relay_count: usize, - pub attempt_count: usize, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub attempt_summaries: Vec<String>, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub relay_results: Vec<BridgeRelayPublishResult>, - pub relay_outcome_summary: String, -} - -impl BridgeJobRecord { - pub fn is_terminal(&self) -> bool { - self.status != BridgeJobStatus::Accepted - } - - pub fn recovered_after_restart(&self) -> bool { - self.status == BridgeJobStatus::Failed - && self.relay_outcome_summary == BRIDGE_PENDING_RECOVERY_SUMMARY - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] -pub struct BridgeJobStoreSnapshot { - pub retained_jobs: usize, - pub retained_idempotency_keys: usize, - pub accepted_jobs: usize, - pub published_jobs: usize, - pub failed_jobs: usize, - pub recovered_failed_jobs: usize, - pub capacity: usize, -} - -#[derive(Debug, Clone)] -pub struct BridgeJobStoreLoadOutcome { - pub store: BridgeJobStore, - pub recovered_jobs: Vec<BridgeJobRecord>, -} - -#[derive(Debug, Clone)] -pub struct BridgeJobStore { - inner: Arc<RwLock<BridgeJobStoreInner>>, - persistence: Option<Arc<BridgeJobStorePersistence>>, -} - -#[derive(Debug)] -struct BridgeJobStoreInner { - jobs: HashMap<String, BridgeJobRecord>, - idempotency: HashMap<String, BridgeIdempotencyRecord>, - order: VecDeque<String>, - capacity: usize, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -struct BridgeIdempotencyRecord { - job_id: String, - request_fingerprint: String, -} - -#[derive(Debug, Clone)] -struct BridgeJobStorePersistence { - path: PathBuf, -} - -#[derive(Debug, Serialize, Deserialize)] -struct PersistedBridgeJobStore { - version: u32, - jobs: HashMap<String, BridgeJobRecord>, - idempotency: HashMap<String, BridgeIdempotencyRecord>, - order: VecDeque<String>, -} - -#[derive(Debug, Error)] -pub enum BridgeJobStoreError { - #[error("invalid bridge job store path: {0}")] - InvalidStatePath(PathBuf), - #[error("unsupported bridge job store version: {0}")] - UnsupportedStateVersion(u32), - #[error("bridge job store io error: {0}")] - Io(#[from] std::io::Error), - #[error("bridge job store json error: {0}")] - Json(#[from] serde_json::Error), - #[error("idempotency_key `{key}` conflicts with existing bridge job `{existing_job_id}`")] - IdempotencyConflict { - key: String, - existing_job_id: String, - }, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum BridgeJobReservation { - Accepted(BridgeJobRecord), - Duplicate(BridgeJobRecord), -} - -impl BridgeJobStore { - pub fn new(capacity: usize) -> Self { - Self { - inner: Arc::new(RwLock::new(BridgeJobStoreInner { - jobs: HashMap::new(), - idempotency: HashMap::new(), - order: VecDeque::new(), - capacity, - })), - persistence: None, - } - } - - pub fn load( - path: PathBuf, - capacity: usize, - ) -> Result<BridgeJobStoreLoadOutcome, BridgeJobStoreError> { - let persistence = Arc::new(BridgeJobStorePersistence::new(path)); - let inner = persistence.load(capacity)?; - let store = Self { - inner: Arc::new(RwLock::new(inner)), - persistence: Some(persistence), - }; - let recovered_jobs = store.recover_pending_jobs()?; - Ok(BridgeJobStoreLoadOutcome { - store, - recovered_jobs, - }) - } - - pub fn reserve( - &self, - mut record: BridgeJobRecord, - request_fingerprint: String, - ) -> Result<BridgeJobReservation, BridgeJobStoreError> { - let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner()); - if let Some(idempotency_key) = record.idempotency_key.as_ref() { - if let Some(existing_idempotency) = inner.idempotency.get(idempotency_key) { - if existing_idempotency.request_fingerprint != request_fingerprint { - return Err(BridgeJobStoreError::IdempotencyConflict { - key: idempotency_key.clone(), - existing_job_id: existing_idempotency.job_id.clone(), - }); - } - if let Some(existing) = inner.jobs.get(&existing_idempotency.job_id) { - return Ok(BridgeJobReservation::Duplicate(existing.clone())); - } - } - } - - record.status = BridgeJobStatus::Accepted; - inner.order.push_back(record.job_id.clone()); - if let Some(idempotency_key) = record.idempotency_key.as_ref() { - inner.idempotency.insert( - idempotency_key.clone(), - BridgeIdempotencyRecord { - job_id: record.job_id.clone(), - request_fingerprint, - }, - ); - } - inner.jobs.insert(record.job_id.clone(), record.clone()); - inner.prune(); - let persisted = persisted_store_from_inner(&inner); - drop(inner); - self.persist_snapshot(&persisted)?; - Ok(BridgeJobReservation::Accepted(record)) - } - - pub fn complete( - &self, - job_id: &str, - event_id: Option<String>, - execution: BridgePublishExecution, - ) -> Result<Option<BridgeJobRecord>, BridgeJobStoreError> { - let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner()); - let Some(record) = inner.jobs.get_mut(job_id) else { - return Ok(None); - }; - if let Some(event_id) = event_id { - record.event_id = Some(event_id); - } - record.status = if execution.published { - BridgeJobStatus::Published - } else { - BridgeJobStatus::Failed - }; - record.completed_at_unix = Some(unix_timestamp_now()); - record.relay_count = execution.relay_count; - record.acknowledged_relay_count = execution.acknowledged_relay_count; - record.required_acknowledged_relay_count = execution.required_acknowledged_relay_count; - record.attempt_count = execution.attempt_count; - record.attempt_summaries = execution.attempt_summaries; - record.relay_results = execution.relay_results; - record.relay_outcome_summary = execution.relay_outcome_summary; - let completed = record.clone(); - let persisted = persisted_store_from_inner(&inner); - drop(inner); - self.persist_snapshot(&persisted)?; - Ok(Some(completed)) - } - - pub fn get(&self, job_id: &str) -> Option<BridgeJobRecord> { - self.inner - .read() - .unwrap_or_else(|e| e.into_inner()) - .jobs - .get(job_id) - .cloned() - } - - pub fn list(&self) -> Vec<BridgeJobRecord> { - let inner = self.inner.read().unwrap_or_else(|e| e.into_inner()); - inner - .order - .iter() - .rev() - .filter_map(|job_id| inner.jobs.get(job_id).cloned()) - .collect() - } - - pub fn snapshot(&self) -> BridgeJobStoreSnapshot { - let inner = self.inner.read().unwrap_or_else(|e| e.into_inner()); - let mut accepted_jobs = 0usize; - let mut published_jobs = 0usize; - let mut failed_jobs = 0usize; - let mut recovered_failed_jobs = 0usize; - for record in inner.jobs.values() { - match record.status { - BridgeJobStatus::Accepted => accepted_jobs += 1, - BridgeJobStatus::Published => published_jobs += 1, - BridgeJobStatus::Failed => { - failed_jobs += 1; - if record.recovered_after_restart() { - recovered_failed_jobs += 1; - } - } - } - } - BridgeJobStoreSnapshot { - retained_jobs: inner.jobs.len(), - retained_idempotency_keys: inner.idempotency.len(), - accepted_jobs, - published_jobs, - failed_jobs, - recovered_failed_jobs, - capacity: inner.capacity, - } - } - - fn recover_pending_jobs(&self) -> Result<Vec<BridgeJobRecord>, BridgeJobStoreError> { - let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner()); - let mut recovered_jobs = Vec::new(); - let completed_at_unix = unix_timestamp_now(); - - for record in inner.jobs.values_mut() { - if record.status != BridgeJobStatus::Accepted { - continue; - } - record.status = BridgeJobStatus::Failed; - record.completed_at_unix = Some(completed_at_unix); - record.relay_count = 0; - record.acknowledged_relay_count = 0; - record.required_acknowledged_relay_count = 0; - record.attempt_count = 0; - record.relay_results.clear(); - record.attempt_summaries = vec![BRIDGE_PENDING_RECOVERY_SUMMARY.to_string()]; - record.relay_outcome_summary = BRIDGE_PENDING_RECOVERY_SUMMARY.to_string(); - recovered_jobs.push(record.clone()); - } - - if recovered_jobs.is_empty() { - return Ok(recovered_jobs); - } - - let persisted = persisted_store_from_inner(&inner); - drop(inner); - self.persist_snapshot(&persisted)?; - Ok(recovered_jobs) - } - - fn persist_snapshot( - &self, - snapshot: &PersistedBridgeJobStore, - ) -> Result<(), BridgeJobStoreError> { - let Some(persistence) = &self.persistence else { - return Ok(()); - }; - persistence.persist(snapshot) - } -} - -impl BridgeJobStoreInner { - fn prune(&mut self) { - while self.jobs.len() > self.capacity { - let Some(job_id) = self.order.pop_front() else { - break; - }; - let Some(removed) = self.jobs.remove(&job_id) else { - continue; - }; - if let Some(idempotency_key) = removed.idempotency_key { - if self - .idempotency - .get(&idempotency_key) - .map(|record| record.job_id.as_str()) - == Some(job_id.as_str()) - { - self.idempotency.remove(&idempotency_key); - } - } - } - } -} - -impl BridgeJobStorePersistence { - fn new(path: PathBuf) -> Self { - Self { path } - } - - fn load(&self, capacity: usize) -> Result<BridgeJobStoreInner, BridgeJobStoreError> { - if !self.path.exists() { - return Ok(BridgeJobStoreInner { - jobs: HashMap::new(), - idempotency: HashMap::new(), - order: VecDeque::new(), - capacity, - }); - } - - let payload = std::fs::read_to_string(&self.path)?; - let snapshot: PersistedBridgeJobStore = serde_json::from_str(&payload)?; - if snapshot.version != BRIDGE_JOB_STORE_VERSION { - return Err(BridgeJobStoreError::UnsupportedStateVersion( - snapshot.version, - )); - } - let mut inner = BridgeJobStoreInner { - jobs: snapshot.jobs, - idempotency: snapshot.idempotency, - order: snapshot.order, - capacity, - }; - inner.prune(); - Ok(inner) - } - - fn persist(&self, snapshot: &PersistedBridgeJobStore) -> Result<(), BridgeJobStoreError> { - if let Some(parent) = self.path.parent() { - if !parent.as_os_str().is_empty() { - std::fs::create_dir_all(parent)?; - } - } - - let payload = serde_json::to_vec_pretty(snapshot)?; - let temp_path = temp_store_path(&self.path)?; - std::fs::write(&temp_path, payload)?; - std::fs::rename(&temp_path, &self.path)?; - Ok(()) - } -} - -fn persisted_store_from_inner(inner: &BridgeJobStoreInner) -> PersistedBridgeJobStore { - PersistedBridgeJobStore { - version: BRIDGE_JOB_STORE_VERSION, - jobs: inner.jobs.clone(), - idempotency: inner.idempotency.clone(), - order: inner.order.clone(), - } -} - -fn temp_store_path(path: &Path) -> Result<PathBuf, BridgeJobStoreError> { - let file_name = path - .file_name() - .ok_or_else(|| BridgeJobStoreError::InvalidStatePath(path.to_path_buf()))?; - Ok(path.with_file_name(format!("{}.tmp", file_name.to_string_lossy()))) -} - -pub fn new_publish_job( - command: &str, - job_id: String, - idempotency_key: Option<String>, - signer_mode: String, - event_kind: u32, - event_id: Option<String>, - event_addr: Option<String>, - delivery_policy: BridgeDeliveryPolicy, - delivery_quorum: Option<usize>, -) -> BridgeJobRecord { - BridgeJobRecord { - job_id, - command: command.to_string(), - idempotency_key, - status: BridgeJobStatus::Accepted, - requested_at_unix: unix_timestamp_now(), - completed_at_unix: None, - signer_mode, - event_kind, - event_id, - event_addr, - delivery_policy, - delivery_quorum, - relay_count: 0, - acknowledged_relay_count: 0, - required_acknowledged_relay_count: 0, - attempt_count: 0, - attempt_summaries: Vec::new(), - relay_results: Vec::new(), - relay_outcome_summary: "accepted".to_string(), - } -} - -pub fn new_listing_publish_job( - job_id: String, - idempotency_key: Option<String>, - signer_mode: String, - event_kind: u32, - event_id: Option<String>, - event_addr: String, - delivery_policy: BridgeDeliveryPolicy, - delivery_quorum: Option<usize>, -) -> BridgeJobRecord { - new_publish_job( - "bridge.listing.publish", - job_id, - idempotency_key, - signer_mode, - event_kind, - event_id, - Some(event_addr), - delivery_policy, - delivery_quorum, - ) -} - -pub fn new_order_request_job( - job_id: String, - idempotency_key: Option<String>, - signer_mode: String, - event_kind: u32, - event_id: Option<String>, - listing_addr: String, - delivery_policy: BridgeDeliveryPolicy, - delivery_quorum: Option<usize>, -) -> BridgeJobRecord { - new_publish_job( - "bridge.order.request", - job_id, - idempotency_key, - signer_mode, - event_kind, - event_id, - Some(listing_addr), - delivery_policy, - delivery_quorum, - ) -} - -fn unix_timestamp_now() -> u64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_else(|_| std::time::Duration::from_secs(0)) - .as_secs() -} - -#[cfg(test)] -mod tests { - use crate::app::config::BridgeDeliveryPolicy; - use crate::core::bridge::publish::BridgePublishExecution; - - use super::{ - BRIDGE_PENDING_RECOVERY_SUMMARY, BridgeJobReservation, BridgeJobStatus, BridgeJobStore, - PersistedBridgeJobStore, new_listing_publish_job, new_order_request_job, - }; - - #[test] - fn reserve_returns_existing_job_for_same_idempotency_key() { - let store = BridgeJobStore::new(8); - let first = new_listing_publish_job( - "job-1".to_string(), - Some("same".to_string()), - "embedded_service_identity".to_string(), - 30402, - Some("event-1".to_string()), - "30402:author:listing".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - let second = new_listing_publish_job( - "job-2".to_string(), - Some("same".to_string()), - "embedded_service_identity".to_string(), - 30402, - Some("event-2".to_string()), - "30402:author:listing".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - - assert!(matches!( - store - .reserve(first.clone(), "fingerprint-1".to_string()) - .expect("reserve"), - BridgeJobReservation::Accepted(_) - )); - let existing = match store - .reserve(second, "fingerprint-1".to_string()) - .expect("same idempotency key") - { - BridgeJobReservation::Duplicate(existing) => existing, - BridgeJobReservation::Accepted(_) => panic!("expected duplicate reservation"), - }; - assert_eq!(existing.job_id, first.job_id); - assert_eq!(existing.status, BridgeJobStatus::Accepted); - } - - #[test] - fn reserve_rejects_conflicting_idempotency_key_reuse() { - let store = BridgeJobStore::new(8); - let first = new_listing_publish_job( - "job-1".to_string(), - Some("same".to_string()), - "embedded_service_identity".to_string(), - 30402, - Some("event-1".to_string()), - "30402:author:listing".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - let second = new_listing_publish_job( - "job-2".to_string(), - Some("same".to_string()), - "embedded_service_identity".to_string(), - 30402, - Some("event-2".to_string()), - "30402:author:listing".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - - store - .reserve(first, "fingerprint-1".to_string()) - .expect("reserve first"); - let err = store - .reserve(second, "fingerprint-2".to_string()) - .expect_err("conflicting idempotency"); - assert!(err.to_string().contains("conflicts")); - } - - #[test] - fn complete_updates_job_record() { - let store = BridgeJobStore::new(8); - let job = new_listing_publish_job( - "job-1".to_string(), - None, - "embedded_service_identity".to_string(), - 30402, - Some("event-1".to_string()), - "30402:author:listing".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - assert!(matches!( - store - .reserve(job, "fingerprint-1".to_string()) - .expect("reserve job"), - BridgeJobReservation::Accepted(_) - )); - - let completed = store - .complete( - "job-1", - Some("event-1".to_string()), - BridgePublishExecution { - published: true, - relay_count: 2, - acknowledged_relay_count: 1, - required_acknowledged_relay_count: 1, - delivery_policy: BridgeDeliveryPolicy::Any, - attempt_count: 1, - relay_outcome_summary: "1/2 relays acknowledged publish".to_string(), - relay_results: Vec::new(), - attempt_summaries: vec!["attempt 1".to_string()], - }, - ) - .expect("complete job") - .expect("record"); - - assert_eq!(completed.status, BridgeJobStatus::Published); - assert_eq!(completed.attempt_count, 1); - assert_eq!(completed.acknowledged_relay_count, 1); - assert!(completed.completed_at_unix.is_some()); - } - - #[test] - fn list_returns_jobs_in_reverse_insertion_order() { - let store = BridgeJobStore::new(8); - for (job_id, fingerprint) in [("job-1", "fingerprint-1"), ("job-2", "fingerprint-2")] { - let job = new_listing_publish_job( - job_id.to_string(), - None, - "embedded_service_identity".to_string(), - 30402, - Some(format!("event-{job_id}")), - "30402:author:listing".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - store - .reserve(job, fingerprint.to_string()) - .expect("reserve job"); - } - - let jobs = store.list(); - - assert_eq!(jobs.len(), 2); - assert_eq!(jobs[0].job_id, "job-2"); - assert_eq!(jobs[1].job_id, "job-1"); - } - - #[test] - fn reserve_prunes_oldest_jobs_when_capacity_is_exceeded() { - let store = BridgeJobStore::new(1); - let first = new_listing_publish_job( - "job-1".to_string(), - Some("first".to_string()), - "embedded_service_identity".to_string(), - 30402, - Some("event-1".to_string()), - "30402:author:listing-1".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - let second = new_listing_publish_job( - "job-2".to_string(), - Some("second".to_string()), - "embedded_service_identity".to_string(), - 30402, - Some("event-2".to_string()), - "30402:author:listing-2".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - - assert!(matches!( - store - .reserve(first, "fingerprint-1".to_string()) - .expect("first"), - BridgeJobReservation::Accepted(_) - )); - assert!(matches!( - store - .reserve(second, "fingerprint-2".to_string()) - .expect("second"), - BridgeJobReservation::Accepted(_) - )); - - assert!(store.get("job-1").is_none()); - assert!(store.get("job-2").is_some()); - assert_eq!(store.snapshot().retained_jobs, 1); - assert_eq!(store.snapshot().accepted_jobs, 1); - assert_eq!(store.snapshot().published_jobs, 0); - assert_eq!(store.snapshot().failed_jobs, 0); - } - - #[test] - fn order_request_job_uses_order_command_name() { - let job = new_order_request_job( - "job-1".to_string(), - Some("same".to_string()), - "nip46_session:session-1".to_string(), - 5322, - Some("event-1".to_string()), - "30402:author:listing".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - - assert_eq!(job.command, "bridge.order.request"); - assert_eq!(job.event_addr.as_deref(), Some("30402:author:listing")); - assert_eq!(job.signer_mode, "nip46_session:session-1"); - } - - #[test] - fn load_terminalizes_persisted_accepted_jobs_and_preserves_idempotency() { - let nanos = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("time") - .as_nanos(); - let path = std::env::temp_dir().join(format!("radrootsd-bridge-jobs-{nanos}.json")); - let store = BridgeJobStore::load(path.clone(), 8) - .expect("load empty store") - .store; - let first = new_listing_publish_job( - "job-1".to_string(), - Some("same".to_string()), - "embedded_service_identity".to_string(), - 30402, - Some("event-1".to_string()), - "30402:author:listing".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - assert!(matches!( - store - .reserve(first, "fingerprint-1".to_string()) - .expect("reserve first"), - BridgeJobReservation::Accepted(_) - )); - - let loaded = BridgeJobStore::load(path.clone(), 8).expect("reload store"); - assert_eq!(loaded.recovered_jobs.len(), 1); - assert_eq!(loaded.recovered_jobs[0].job_id, "job-1"); - assert_eq!(loaded.recovered_jobs[0].status, BridgeJobStatus::Failed); - assert_eq!( - loaded.recovered_jobs[0].relay_outcome_summary, - BRIDGE_PENDING_RECOVERY_SUMMARY - ); - - let duplicate = new_listing_publish_job( - "job-2".to_string(), - Some("same".to_string()), - "embedded_service_identity".to_string(), - 30402, - Some("event-2".to_string()), - "30402:author:listing".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - let existing = match loaded - .store - .reserve(duplicate, "fingerprint-1".to_string()) - .expect("dedupe after reload") - { - BridgeJobReservation::Duplicate(existing) => existing, - BridgeJobReservation::Accepted(_) => panic!("expected duplicate reservation"), - }; - assert_eq!(existing.job_id, "job-1"); - assert_eq!(existing.status, BridgeJobStatus::Failed); - assert!(existing.completed_at_unix.is_some()); - assert_eq!( - existing.relay_outcome_summary, - BRIDGE_PENDING_RECOVERY_SUMMARY - ); - assert!(existing.is_terminal()); - assert!(existing.recovered_after_restart()); - - let snapshot = loaded.store.snapshot(); - assert_eq!(snapshot.accepted_jobs, 0); - assert_eq!(snapshot.published_jobs, 0); - assert_eq!(snapshot.failed_jobs, 1); - assert_eq!(snapshot.recovered_failed_jobs, 1); - - let payload = std::fs::read_to_string(&path).expect("persisted payload"); - let persisted: PersistedBridgeJobStore = - serde_json::from_str(&payload).expect("persisted store"); - assert_eq!(persisted.version, 2); - assert_eq!( - persisted - .jobs - .get("job-1") - .expect("persisted recovered job") - .status, - BridgeJobStatus::Failed - ); - - let _ = std::fs::remove_file(path); - } -} diff --git a/src/core/mod.rs b/src/core/mod.rs @@ -1,5 +1,5 @@ -pub mod bridge; pub mod nip46; +pub mod publish_proxy; pub mod state; pub use state::Radrootsd; diff --git a/src/core/publish_proxy/mod.rs b/src/core/publish_proxy/mod.rs @@ -0,0 +1,922 @@ +use std::fmt; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use radroots_publish_proxy_protocol::{ + PublishDeliveryPolicy, PublishEventRequest, PublishEventResponse, PublishJobStatus, + PublishJobView, PublishRelayOutcome, PublishRelayOutcomeKind, PublishRelayPolicy, + PublishRelaySource, +}; +use rusqlite::types::Type; +use rusqlite::{Connection, OptionalExtension, Row, params}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use thiserror::Error; +use uuid::Uuid; + +use crate::app::config::PublishProxyConfig; + +const TOKEN_PREFIX: &str = "rrd_pp_"; +const TOKEN_HASH_PREFIX: &str = "sha256:"; +const SCHEMA_VERSION: i64 = 1; + +#[derive(Debug, Error)] +pub enum PublishProxyError { + #[error("publish proxy storage error: {0}")] + Sqlite(#[from] rusqlite::Error), + #[error("publish proxy json error: {0}")] + Json(#[from] serde_json::Error), + #[error("publish proxy io error: {0}")] + Io(#[from] std::io::Error), + #[error("invalid publish proxy scope: {0}")] + InvalidScope(String), + #[error("publish proxy idempotency conflict for key `{0}`")] + IdempotencyConflict(String), +} + +#[derive(Clone)] +pub struct PublishProxy { + pub config: PublishProxyConfig, + pub store: PublishProxyStore, +} + +impl PublishProxy { + pub fn open(config: PublishProxyConfig) -> Result<Self, PublishProxyError> { + let store = PublishProxyStore::open(config.database_path.clone())?; + Ok(Self { config, store }) + } + + pub fn memory(config: PublishProxyConfig) -> Result<Self, PublishProxyError> { + let store = PublishProxyStore::memory()?; + Ok(Self { config, store }) + } +} + +#[derive(Clone)] +pub struct PublishProxyStore { + inner: Arc<Mutex<Connection>>, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum PublishJobVisibility { + Own, + Admin, +} + +impl FromStr for PublishJobVisibility { + type Err = PublishProxyError; + + fn from_str(value: &str) -> Result<Self, Self::Err> { + match value { + "own" => Ok(Self::Own), + "admin" => Ok(Self::Admin), + other => Err(PublishProxyError::InvalidScope(format!( + "unknown job visibility `{other}`" + ))), + } + } +} + +impl fmt::Display for PublishJobVisibility { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Own => f.write_str("own"), + Self::Admin => f.write_str("admin"), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PublishPrincipalInit { + pub label: String, + pub token_hash: String, + pub allowed_pubkeys: Vec<String>, + pub allowed_kinds: Vec<u32>, + pub allowed_relay_policies: Vec<PublishRelayPolicy>, + pub allow_request_relays: bool, + pub job_visibility: PublishJobVisibility, + pub expires_at_unix: Option<i64>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PublishPrincipal { + pub principal_id: String, + pub label: String, + pub allowed_pubkeys: Vec<String>, + pub allowed_kinds: Vec<u32>, + pub allowed_relay_policies: Vec<PublishRelayPolicy>, + pub allow_request_relays: bool, + pub job_visibility: PublishJobVisibility, + pub expires_at_unix: Option<i64>, +} + +impl PublishPrincipal { + pub fn allows_event(&self, request: &PublishEventRequest) -> Result<(), PublishProxyError> { + ensure_lower_hex("pubkey", request.event.pubkey.as_str(), 64)?; + if !self + .allowed_pubkeys + .iter() + .any(|pubkey| pubkey == &request.event.pubkey) + { + return Err(PublishProxyError::InvalidScope( + "principal is not allowed to publish for event pubkey".to_owned(), + )); + } + if !self.allowed_kinds.contains(&request.event.kind) { + return Err(PublishProxyError::InvalidScope( + "principal is not allowed to publish event kind".to_owned(), + )); + } + if !self.allowed_relay_policies.contains(&request.relay_policy) { + return Err(PublishProxyError::InvalidScope( + "principal is not allowed to use requested relay policy".to_owned(), + )); + } + if !self.allow_request_relays && !request.relays.is_empty() { + return Err(PublishProxyError::InvalidScope( + "principal is not allowed to provide request relays".to_owned(), + )); + } + Ok(()) + } + + fn can_read_job(&self, principal_id: &str) -> bool { + self.job_visibility == PublishJobVisibility::Admin || self.principal_id == principal_id + } +} + +#[derive(Debug, Clone)] +pub struct PublishJobInsert { + pub principal_id: String, + pub idempotency_key: Option<String>, + pub request: PublishEventRequest, +} + +impl PublishProxyStore { + pub fn open(path: PathBuf) -> Result<Self, PublishProxyError> { + if let Some(parent) = path + .parent() + .filter(|parent| !parent.as_os_str().is_empty()) + { + std::fs::create_dir_all(parent)?; + } + let connection = Connection::open(path)?; + Self::from_connection(connection) + } + + pub fn memory() -> Result<Self, PublishProxyError> { + Self::from_connection(Connection::open_in_memory()?) + } + + fn from_connection(connection: Connection) -> Result<Self, PublishProxyError> { + connection.execute_batch( + r#" + PRAGMA foreign_keys = ON; + CREATE TABLE IF NOT EXISTS publish_proxy_principals ( + principal_id TEXT PRIMARY KEY NOT NULL, + label TEXT NOT NULL, + token_hash TEXT NOT NULL UNIQUE, + allowed_pubkeys_json TEXT NOT NULL, + allowed_kinds_json TEXT NOT NULL, + allowed_relay_policies_json TEXT NOT NULL, + allow_request_relays INTEGER NOT NULL, + job_visibility TEXT NOT NULL, + expires_at_unix INTEGER, + revoked_at_unix INTEGER, + created_at_unix INTEGER NOT NULL + ); + CREATE TABLE IF NOT EXISTS publish_proxy_jobs ( + job_id TEXT PRIMARY KEY NOT NULL, + principal_id TEXT NOT NULL, + idempotency_key TEXT, + status TEXT NOT NULL, + event_id TEXT NOT NULL, + event_pubkey TEXT NOT NULL, + event_kind INTEGER NOT NULL, + relay_policy_json TEXT NOT NULL, + delivery_policy_json TEXT NOT NULL, + requested_relay_count INTEGER NOT NULL, + request_json TEXT NOT NULL, + requested_at_ms INTEGER NOT NULL, + updated_at_ms INTEGER NOT NULL, + completed_at_ms INTEGER, + last_error TEXT, + FOREIGN KEY(principal_id) REFERENCES publish_proxy_principals(principal_id) + ); + CREATE UNIQUE INDEX IF NOT EXISTS publish_proxy_jobs_principal_idempotency_idx + ON publish_proxy_jobs(principal_id, idempotency_key) + WHERE idempotency_key IS NOT NULL; + CREATE TABLE IF NOT EXISTS publish_proxy_relay_results ( + job_id TEXT NOT NULL, + relay_url TEXT NOT NULL, + source TEXT NOT NULL, + attempted INTEGER NOT NULL, + outcome_kind TEXT NOT NULL, + message TEXT, + latency_ms INTEGER, + updated_at_ms INTEGER NOT NULL, + PRIMARY KEY(job_id, relay_url), + FOREIGN KEY(job_id) REFERENCES publish_proxy_jobs(job_id) + ); + CREATE TABLE IF NOT EXISTS publish_proxy_relay_list_cache ( + pubkey TEXT PRIMARY KEY NOT NULL, + relays_json TEXT NOT NULL, + updated_at_ms INTEGER NOT NULL + ); + "#, + )?; + connection.pragma_update(None, "user_version", SCHEMA_VERSION)?; + Ok(Self { + inner: Arc::new(Mutex::new(connection)), + }) + } + + pub fn create_principal( + &self, + input: PublishPrincipalInit, + ) -> Result<PublishPrincipal, PublishProxyError> { + validate_principal_init(&input)?; + let principal_id = Uuid::new_v4().to_string(); + let now = current_unix_secs(); + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + connection.execute( + r#" + INSERT INTO publish_proxy_principals ( + principal_id, + label, + token_hash, + allowed_pubkeys_json, + allowed_kinds_json, + allowed_relay_policies_json, + allow_request_relays, + job_visibility, + expires_at_unix, + revoked_at_unix, + created_at_unix + ) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, NULL, ?10) + "#, + params![ + principal_id, + input.label.trim(), + input.token_hash, + serde_json::to_string(&input.allowed_pubkeys)?, + serde_json::to_string(&input.allowed_kinds)?, + serde_json::to_string(&input.allowed_relay_policies)?, + input.allow_request_relays, + input.job_visibility.to_string(), + input.expires_at_unix, + now, + ], + )?; + drop(connection); + self.principal_by_id(principal_id.as_str())? + .ok_or_else(|| PublishProxyError::InvalidScope("created principal missing".to_owned())) + } + + pub fn principal_for_token_hash( + &self, + token_hash: &str, + ) -> Result<Option<PublishPrincipal>, PublishProxyError> { + let now = current_unix_secs(); + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let principal = connection + .query_row( + r#" + SELECT + principal_id, + label, + allowed_pubkeys_json, + allowed_kinds_json, + allowed_relay_policies_json, + allow_request_relays, + job_visibility, + expires_at_unix + FROM publish_proxy_principals + WHERE token_hash = ?1 + AND revoked_at_unix IS NULL + AND (expires_at_unix IS NULL OR expires_at_unix > ?2) + "#, + params![token_hash, now], + principal_from_row, + ) + .optional()?; + Ok(principal) + } + + pub fn principal_by_id( + &self, + principal_id: &str, + ) -> Result<Option<PublishPrincipal>, PublishProxyError> { + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let principal = connection + .query_row( + r#" + SELECT + principal_id, + label, + allowed_pubkeys_json, + allowed_kinds_json, + allowed_relay_policies_json, + allow_request_relays, + job_visibility, + expires_at_unix + FROM publish_proxy_principals + WHERE principal_id = ?1 + "#, + params![principal_id], + principal_from_row, + ) + .optional()?; + Ok(principal) + } + + pub fn record_publish_job( + &self, + insert: PublishJobInsert, + ) -> Result<PublishEventResponse, PublishProxyError> { + if let Some(idempotency_key) = insert.idempotency_key.as_deref() { + if let Some(existing) = + self.job_for_principal_id_and_key(insert.principal_id.as_str(), idempotency_key)? + { + return Ok(PublishEventResponse { + deduplicated: true, + job: existing, + }); + } + } + + let job_id = Uuid::new_v4().to_string(); + let now = current_unix_millis(); + let request_json = serde_json::to_string(&insert.request)?; + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let insert_result = connection.execute( + r#" + INSERT INTO publish_proxy_jobs ( + job_id, + principal_id, + idempotency_key, + status, + event_id, + event_pubkey, + event_kind, + relay_policy_json, + delivery_policy_json, + requested_relay_count, + request_json, + requested_at_ms, + updated_at_ms + ) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13) + "#, + params![ + job_id, + insert.principal_id, + insert.idempotency_key, + serde_json::to_string(&PublishJobStatus::Accepted)?, + insert.request.event.id, + insert.request.event.pubkey, + insert.request.event.kind, + serde_json::to_string(&insert.request.relay_policy)?, + serde_json::to_string(&insert.request.delivery_policy)?, + insert.request.relays.len(), + request_json, + now, + now, + ], + ); + match insert_result { + Ok(_) => {} + Err(rusqlite::Error::SqliteFailure(error, _)) + if error.code == rusqlite::ErrorCode::ConstraintViolation => + { + return Err(PublishProxyError::IdempotencyConflict( + "idempotency key conflicts with an existing publish job".to_owned(), + )); + } + Err(error) => return Err(error.into()), + } + drop(connection); + let job = self + .job_by_id_for_principal_id(job_id.as_str(), insert.principal_id.as_str())? + .ok_or_else(|| PublishProxyError::InvalidScope("created job missing".to_owned()))?; + Ok(PublishEventResponse { + deduplicated: false, + job, + }) + } + + pub fn job_by_id_for_principal( + &self, + job_id: &str, + principal: &PublishPrincipal, + ) -> Result<Option<PublishJobView>, PublishProxyError> { + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let sql = job_select_sql("WHERE job_id = ?1"); + let row = connection + .query_row(sql.as_str(), params![job_id], job_from_row) + .optional()?; + drop(connection); + let Some(mut job) = row else { + return Ok(None); + }; + if !principal.can_read_job(job.principal_id.as_str()) { + return Ok(None); + } + job.view.relays = self.relay_outcomes(job.view.job_id.as_str())?; + finalize_job_view(&mut job.view); + Ok(Some(job.view)) + } + + pub fn list_jobs_for_principal( + &self, + principal: &PublishPrincipal, + limit: usize, + ) -> Result<Vec<PublishJobView>, PublishProxyError> { + let limit = i64::try_from(limit.clamp(1, 200)).unwrap_or(200); + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let sql = if principal.job_visibility == PublishJobVisibility::Admin { + job_select_sql("ORDER BY requested_at_ms DESC, job_id DESC LIMIT ?1") + } else { + job_select_sql( + "WHERE principal_id = ?1 ORDER BY requested_at_ms DESC, job_id DESC LIMIT ?2", + ) + }; + let mut stmt = connection.prepare(sql.as_str())?; + let rows = if principal.job_visibility == PublishJobVisibility::Admin { + stmt.query_map(params![limit], job_from_row)? + .collect::<Result<Vec<_>, _>>()? + } else { + stmt.query_map(params![principal.principal_id, limit], job_from_row)? + .collect::<Result<Vec<_>, _>>()? + }; + drop(stmt); + drop(connection); + + rows.into_iter() + .map(|mut row| { + row.view.relays = self.relay_outcomes(row.view.job_id.as_str())?; + finalize_job_view(&mut row.view); + Ok(row.view) + }) + .collect() + } + + fn job_for_principal_id_and_key( + &self, + principal_id: &str, + idempotency_key: &str, + ) -> Result<Option<PublishJobView>, PublishProxyError> { + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let sql = job_select_sql("WHERE principal_id = ?1 AND idempotency_key = ?2"); + let row = connection + .query_row( + sql.as_str(), + params![principal_id, idempotency_key], + job_from_row, + ) + .optional()?; + drop(connection); + let Some(mut job) = row else { + return Ok(None); + }; + job.view.relays = self.relay_outcomes(job.view.job_id.as_str())?; + finalize_job_view(&mut job.view); + Ok(Some(job.view)) + } + + fn job_by_id_for_principal_id( + &self, + job_id: &str, + principal_id: &str, + ) -> Result<Option<PublishJobView>, PublishProxyError> { + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let sql = job_select_sql("WHERE job_id = ?1 AND principal_id = ?2"); + let row = connection + .query_row(sql.as_str(), params![job_id, principal_id], job_from_row) + .optional()?; + drop(connection); + let Some(mut job) = row else { + return Ok(None); + }; + job.view.relays = self.relay_outcomes(job.view.job_id.as_str())?; + finalize_job_view(&mut job.view); + Ok(Some(job.view)) + } + + fn relay_outcomes(&self, job_id: &str) -> Result<Vec<PublishRelayOutcome>, PublishProxyError> { + let connection = self + .inner + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let mut stmt = connection.prepare( + r#" + SELECT relay_url, source, attempted, outcome_kind, message, latency_ms + FROM publish_proxy_relay_results + WHERE job_id = ?1 + ORDER BY relay_url + "#, + )?; + let outcomes = stmt + .query_map(params![job_id], relay_outcome_from_row)? + .collect::<Result<Vec<_>, _>>()?; + Ok(outcomes) + } +} + +struct PublishJobRow { + principal_id: String, + view: PublishJobView, +} + +fn job_select_sql(tail: &str) -> String { + format!( + r#" + SELECT + job_id, + principal_id, + status, + event_id, + event_pubkey, + event_kind, + relay_policy_json, + delivery_policy_json, + requested_relay_count, + requested_at_ms, + completed_at_ms, + last_error + FROM publish_proxy_jobs + {tail} + "# + ) +} + +fn principal_from_row(row: &Row<'_>) -> Result<PublishPrincipal, rusqlite::Error> { + let visibility: String = row.get(6)?; + Ok(PublishPrincipal { + principal_id: row.get(0)?, + label: row.get(1)?, + allowed_pubkeys: json_column(row, 2)?, + allowed_kinds: json_column(row, 3)?, + allowed_relay_policies: json_column(row, 4)?, + allow_request_relays: row.get(5)?, + job_visibility: PublishJobVisibility::from_str(visibility.as_str()) + .map_err(|error| conversion_error(6, error))?, + expires_at_unix: row.get(7)?, + }) +} + +fn job_from_row(row: &Row<'_>) -> Result<PublishJobRow, rusqlite::Error> { + let status: PublishJobStatus = json_text(row, 2)?; + let relay_policy: PublishRelayPolicy = json_text(row, 6)?; + let delivery_policy: PublishDeliveryPolicy = json_text(row, 7)?; + let relay_count: i64 = row.get(8)?; + Ok(PublishJobRow { + principal_id: row.get(1)?, + view: PublishJobView { + job_id: row.get(0)?, + status, + terminal: false, + delivery_satisfied: false, + event_id: row.get(3)?, + pubkey: row.get(4)?, + event_kind: row.get::<_, i64>(5)? as u32, + relay_policy, + delivery_policy, + relay_count: usize::try_from(relay_count).unwrap_or(0), + acknowledged_count: 0, + retryable_count: 0, + terminal_count: 0, + requested_at_ms: row.get(9)?, + completed_at_ms: row.get(10)?, + last_error: row.get(11)?, + relays: Vec::new(), + }, + }) +} + +fn relay_outcome_from_row(row: &Row<'_>) -> Result<PublishRelayOutcome, rusqlite::Error> { + let source: PublishRelaySource = json_text(row, 1)?; + let outcome_kind: PublishRelayOutcomeKind = json_text(row, 3)?; + Ok(PublishRelayOutcome { + relay_url: row.get(0)?, + source, + attempted: row.get(2)?, + outcome_kind, + message: row.get(4)?, + latency_ms: row + .get::<_, Option<i64>>(5)? + .map(|latency| u64::try_from(latency).unwrap_or(0)), + }) +} + +fn finalize_job_view(view: &mut PublishJobView) { + view.acknowledged_count = view + .relays + .iter() + .filter(|relay| relay.outcome_kind.counts_toward_quorum()) + .count(); + view.retryable_count = view + .relays + .iter() + .filter(|relay| relay.outcome_kind.is_retryable()) + .count(); + view.terminal_count = view + .relays + .iter() + .filter(|relay| relay.outcome_kind.is_terminal_failure()) + .count(); + view.terminal = matches!( + view.status, + PublishJobStatus::DeliverySatisfied + | PublishJobStatus::DeliveryUnsatisfiedTerminal + | PublishJobStatus::Rejected + ); + view.delivery_satisfied = view.status == PublishJobStatus::DeliverySatisfied; +} + +fn validate_principal_init(input: &PublishPrincipalInit) -> Result<(), PublishProxyError> { + if input.label.trim().is_empty() { + return Err(PublishProxyError::InvalidScope( + "principal label must not be empty".to_owned(), + )); + } + if !input.token_hash.starts_with(TOKEN_HASH_PREFIX) { + return Err(PublishProxyError::InvalidScope( + "principal token hash must use sha256 prefix".to_owned(), + )); + } + if input.allowed_pubkeys.is_empty() { + return Err(PublishProxyError::InvalidScope( + "principal must include at least one allowed pubkey".to_owned(), + )); + } + for pubkey in &input.allowed_pubkeys { + ensure_lower_hex("allowed_pubkey", pubkey, 64)?; + } + if input.allowed_kinds.is_empty() { + return Err(PublishProxyError::InvalidScope( + "principal must include at least one allowed kind".to_owned(), + )); + } + if input + .allowed_kinds + .iter() + .any(|kind| *kind > u16::MAX as u32) + { + return Err(PublishProxyError::InvalidScope( + "allowed kind exceeds publish proxy range".to_owned(), + )); + } + if input.allowed_relay_policies.is_empty() { + return Err(PublishProxyError::InvalidScope( + "principal must include at least one allowed relay policy".to_owned(), + )); + } + Ok(()) +} + +pub fn generate_bearer_token() -> String { + let bytes: [u8; 32] = rand::random(); + format!("{TOKEN_PREFIX}{}", hex_lower(&bytes)) +} + +pub fn hash_bearer_token(token: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(token.as_bytes()); + format!("{TOKEN_HASH_PREFIX}{}", hex_lower(&hasher.finalize())) +} + +fn hex_lower(bytes: &[u8]) -> String { + let mut output = String::with_capacity(bytes.len() * 2); + for byte in bytes { + use std::fmt::Write; + let _ = write!(&mut output, "{byte:02x}"); + } + output +} + +pub fn parse_relay_policy(value: &str) -> Result<PublishRelayPolicy, PublishProxyError> { + match value { + "explicit_only" => Ok(PublishRelayPolicy::ExplicitOnly), + "request_then_author_write_then_daemon_default" => { + Ok(PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault) + } + "author_write_then_daemon_default" => Ok(PublishRelayPolicy::AuthorWriteThenDaemonDefault), + "daemon_default_only" => Ok(PublishRelayPolicy::DaemonDefaultOnly), + other => Err(PublishProxyError::InvalidScope(format!( + "unknown relay policy `{other}`" + ))), + } +} + +pub fn write_token_file(path: &Path, token: &str) -> Result<(), PublishProxyError> { + if let Some(parent) = path + .parent() + .filter(|parent| !parent.as_os_str().is_empty()) + { + std::fs::create_dir_all(parent)?; + } + let mut options = std::fs::OpenOptions::new(); + options.write(true).create_new(true); + #[cfg(unix)] + { + use std::os::unix::fs::OpenOptionsExt; + options.mode(0o600); + } + use std::io::Write; + let mut file = options.open(path)?; + file.write_all(token.as_bytes())?; + file.write_all(b"\n")?; + Ok(()) +} + +fn ensure_lower_hex( + field: &str, + value: &str, + expected_len: usize, +) -> Result<(), PublishProxyError> { + if value.len() == expected_len + && value + .bytes() + .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f')) + { + Ok(()) + } else { + Err(PublishProxyError::InvalidScope(format!( + "{field} must be {expected_len} lowercase hex characters" + ))) + } +} + +fn json_column<T: for<'de> Deserialize<'de>>( + row: &Row<'_>, + index: usize, +) -> Result<T, rusqlite::Error> { + let value: String = row.get(index)?; + serde_json::from_str(value.as_str()).map_err(|error| conversion_error(index, error)) +} + +fn json_text<T: for<'de> Deserialize<'de>>( + row: &Row<'_>, + index: usize, +) -> Result<T, rusqlite::Error> { + let value: String = row.get(index)?; + serde_json::from_str(value.as_str()).map_err(|error| conversion_error(index, error)) +} + +fn conversion_error<E>(index: usize, error: E) -> rusqlite::Error +where + E: std::error::Error + Send + Sync + 'static, +{ + rusqlite::Error::FromSqlConversionFailure(index, Type::Text, Box::new(error)) +} + +fn current_unix_secs() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_secs() as i64) + .unwrap_or_default() +} + +fn current_unix_millis() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_millis() as i64) + .unwrap_or_default() +} + +#[cfg(test)] +mod tests { + use super::{ + PublishJobInsert, PublishJobVisibility, PublishPrincipalInit, PublishProxyStore, + generate_bearer_token, hash_bearer_token, parse_relay_policy, + }; + use radroots_publish_proxy_protocol::{ + PublishDeliveryPolicy, PublishEventRequest, PublishRelayPolicy, SignedNostrEventWire, + }; + + fn event(pubkey: &str, kind: u32) -> SignedNostrEventWire { + SignedNostrEventWire { + id: "0".repeat(64), + pubkey: pubkey.to_owned(), + created_at: 1_700_000_000, + kind, + tags: vec![vec!["d".to_owned(), "listing-1".to_owned()]], + content: "{}".to_owned(), + sig: "1".repeat(128), + } + } + + fn request(pubkey: &str, kind: u32) -> PublishEventRequest { + PublishEventRequest { + event: event(pubkey, kind), + relays: Vec::new(), + relay_policy: PublishRelayPolicy::DaemonDefaultOnly, + delivery_policy: PublishDeliveryPolicy::Any, + idempotency_key: Some("idem-1".to_owned()), + timeout_ms: None, + } + } + + #[test] + fn token_generation_and_hashing_do_not_store_plaintext() { + let token = generate_bearer_token(); + assert!(token.starts_with("rrd_pp_")); + let hash = hash_bearer_token(token.as_str()); + assert!(hash.starts_with("sha256:")); + assert!(!hash.contains(token.as_str())); + } + + #[test] + fn relay_policy_parser_accepts_contract_values() { + assert_eq!( + parse_relay_policy("explicit_only").expect("policy"), + PublishRelayPolicy::ExplicitOnly + ); + assert!(parse_relay_policy("unknown").is_err()); + } + + #[test] + fn storage_authenticates_hashed_tokens_and_scopes_jobs() { + let store = PublishProxyStore::memory().expect("store"); + let token = generate_bearer_token(); + let token_hash = hash_bearer_token(token.as_str()); + let principal = store + .create_principal(PublishPrincipalInit { + label: "tester".to_owned(), + token_hash: token_hash.clone(), + allowed_pubkeys: vec!["a".repeat(64)], + allowed_kinds: vec![30_402], + allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly], + allow_request_relays: false, + job_visibility: PublishJobVisibility::Own, + expires_at_unix: None, + }) + .expect("principal"); + assert_eq!( + store + .principal_for_token_hash(token_hash.as_str()) + .expect("lookup") + .expect("principal") + .principal_id, + principal.principal_id + ); + let denied = request("b".repeat(64).as_str(), 30_402); + assert!(principal.allows_event(&denied).is_err()); + + let accepted = request("a".repeat(64).as_str(), 30_402); + principal.allows_event(&accepted).expect("scope"); + let response = store + .record_publish_job(PublishJobInsert { + principal_id: principal.principal_id.clone(), + idempotency_key: Some("idem-1".to_owned()), + request: accepted.clone(), + }) + .expect("record job"); + assert!(!response.deduplicated); + let duplicate = store + .record_publish_job(PublishJobInsert { + principal_id: principal.principal_id.clone(), + idempotency_key: Some("idem-1".to_owned()), + request: accepted, + }) + .expect("dedupe"); + assert!(duplicate.deduplicated); + assert_eq!(duplicate.job.job_id, response.job.job_id); + assert_eq!( + store + .list_jobs_for_principal(&principal, 50) + .expect("jobs") + .len(), + 1 + ); + } +} diff --git a/src/core/state.rs b/src/core/state.rs @@ -3,9 +3,9 @@ use radroots_identity::RadrootsIdentity; use radroots_nostr::prelude::{ RadrootsNostrClient, RadrootsNostrKeys, RadrootsNostrMetadata, RadrootsNostrPublicKey, }; -use radroots_nostr_signer::prelude::RadrootsNostrEmbeddedSignerBackend; -use crate::app::config::{BridgeConfig, Nip46Config}; +use crate::app::config::{Nip46Config, PublishProxyConfig}; +use crate::core::publish_proxy::PublishProxy; #[derive(Clone)] pub struct Radrootsd { @@ -14,9 +14,7 @@ pub struct Radrootsd { pub pubkey: RadrootsNostrPublicKey, pub metadata: RadrootsNostrMetadata, pub info: serde_json::Value, - pub bridge_signer: RadrootsNostrEmbeddedSignerBackend, - pub(crate) bridge_jobs: crate::core::bridge::store::BridgeJobStore, - pub bridge_config: BridgeConfig, + pub publish_proxy: PublishProxy, pub(crate) nip46_sessions: crate::core::nip46::session::Nip46SessionStore, pub nip46_config: Nip46Config, } @@ -25,7 +23,7 @@ impl Radrootsd { pub fn new( identity: RadrootsIdentity, metadata: RadrootsNostrMetadata, - bridge_config: BridgeConfig, + publish_proxy_config: PublishProxyConfig, nip46_config: Nip46Config, ) -> Result<Self> { let keys: RadrootsNostrKeys = identity.keys().clone(); @@ -35,24 +33,10 @@ impl Radrootsd { "version": env!("CARGO_PKG_VERSION"), "build": option_env!("GIT_HASH").unwrap_or("unknown"), }); - let bridge_signer = RadrootsNostrEmbeddedSignerBackend::new_in_memory(identity)?; - #[cfg(not(test))] - let bridge_jobs = crate::core::bridge::store::BridgeJobStore::load( - bridge_config.state_path.clone(), - bridge_config.job_status_retention, - )?; - #[cfg(not(test))] - if !bridge_jobs.recovered_jobs.is_empty() { - tracing::warn!( - recovered_bridge_jobs = bridge_jobs.recovered_jobs.len(), - "terminalized bridge jobs left accepted across restart" - ); - } #[cfg(test)] - let bridge_jobs = - crate::core::bridge::store::BridgeJobStore::new(bridge_config.job_status_retention); + let publish_proxy = PublishProxy::memory(publish_proxy_config)?; #[cfg(not(test))] - let bridge_jobs = bridge_jobs.store; + let publish_proxy = PublishProxy::open(publish_proxy_config)?; let nip46_sessions = crate::core::nip46::session::Nip46SessionStore::new(); Ok(Self { @@ -61,9 +45,7 @@ impl Radrootsd { pubkey, metadata, info, - bridge_signer, - bridge_jobs, - bridge_config, + publish_proxy, nip46_sessions, nip46_config, }) @@ -73,42 +55,34 @@ impl Radrootsd { #[cfg(test)] mod tests { use super::Radrootsd; - use crate::app::config::{BridgeConfig, Nip46Config}; + use crate::app::config::{Nip46Config, PublishProxyConfig}; use radroots_identity::RadrootsIdentity; use radroots_nostr::prelude::RadrootsNostrMetadata; - use radroots_nostr_signer::prelude::RadrootsNostrSignerBackend; #[test] fn new_sets_core_fields() { let identity = RadrootsIdentity::generate(); let metadata: RadrootsNostrMetadata = serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let bridge_cfg = BridgeConfig::default(); + let publish_proxy_cfg = PublishProxyConfig::default(); let cfg = Nip46Config::default(); let state = Radrootsd::new( identity.clone(), metadata.clone(), - bridge_cfg.clone(), + publish_proxy_cfg.clone(), cfg.clone(), ) .expect("state"); assert_eq!(state.pubkey, identity.public_key()); assert_eq!(state.metadata, metadata); - assert_eq!(state.bridge_config.enabled, bridge_cfg.enabled); assert_eq!( - state.bridge_jobs.snapshot().capacity, - bridge_cfg.job_status_retention + state.publish_proxy.config.enabled, + publish_proxy_cfg.enabled ); assert_eq!(state.nip46_config.session_ttl_secs, cfg.session_ttl_secs); assert_eq!(state.nip46_config.perms, cfg.perms); assert_eq!(state.info["version"], env!("CARGO_PKG_VERSION")); assert_eq!(state.info["build"], "unknown"); - let signer_identity = state - .bridge_signer - .signer_identity() - .expect("bridge signer identity") - .expect("present"); - assert_eq!(signer_identity.public_key_hex, state.pubkey.to_hex()); } } diff --git a/src/transport/jsonrpc/auth.rs b/src/transport/jsonrpc/auth.rs @@ -2,27 +2,26 @@ use jsonrpsee::core::server::Extensions; +use crate::core::publish_proxy::{PublishPrincipal, PublishProxyStore, hash_bearer_token}; + use super::RpcError; -pub(crate) const BRIDGE_AUTH_MODE: &str = "bearer_token"; +#[cfg(test)] +pub(crate) const PUBLISH_PROXY_AUTH_MODE: &str = "scoped_bearer_token"; -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(crate) enum BridgeAuthorization { - Disabled, - Authorized, +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum PublishProxyAuthorization { + Authorized(PublishPrincipal), Missing, Invalid, } -pub(crate) fn authorize_bridge_request( +pub(crate) fn authorize_publish_proxy_request( authorization_header: Option<&str>, - expected_token: Option<&str>, -) -> BridgeAuthorization { - let Some(expected_token) = expected_token else { - return BridgeAuthorization::Disabled; - }; + store: &PublishProxyStore, +) -> PublishProxyAuthorization { let Some(authorization_header) = authorization_header else { - return BridgeAuthorization::Missing; + return PublishProxyAuthorization::Missing; }; let mut parts = authorization_header.split_whitespace(); @@ -30,28 +29,29 @@ pub(crate) fn authorize_bridge_request( let token = parts.next().unwrap_or_default(); if !scheme.eq_ignore_ascii_case("bearer") || token.is_empty() || parts.next().is_some() { - return BridgeAuthorization::Invalid; + return PublishProxyAuthorization::Invalid; } - if token == expected_token { - BridgeAuthorization::Authorized - } else { - BridgeAuthorization::Invalid + match store.principal_for_token_hash(hash_bearer_token(token).as_str()) { + Ok(Some(principal)) => PublishProxyAuthorization::Authorized(principal), + Ok(None) | Err(_) => PublishProxyAuthorization::Invalid, } } -pub(crate) fn require_bridge_auth(extensions: &Extensions) -> Result<(), RpcError> { +pub(crate) fn require_publish_principal( + extensions: &Extensions, +) -> Result<PublishPrincipal, RpcError> { match extensions - .get::<BridgeAuthorization>() - .copied() - .unwrap_or(BridgeAuthorization::Missing) + .get::<PublishProxyAuthorization>() + .cloned() + .unwrap_or(PublishProxyAuthorization::Missing) { - BridgeAuthorization::Authorized => Ok(()), - BridgeAuthorization::Disabled | BridgeAuthorization::Missing => Err( - RpcError::Unauthorized("bridge bearer token required".to_string()), - ), - BridgeAuthorization::Invalid => Err(RpcError::Unauthorized( - "invalid bridge bearer token".to_string(), + PublishProxyAuthorization::Authorized(principal) => Ok(principal), + PublishProxyAuthorization::Missing => Err(RpcError::Unauthorized( + "publish proxy bearer token required".to_string(), + )), + PublishProxyAuthorization::Invalid => Err(RpcError::Unauthorized( + "invalid publish proxy bearer token".to_string(), )), } } @@ -59,50 +59,75 @@ pub(crate) fn require_bridge_auth(extensions: &Extensions) -> Result<(), RpcErro #[cfg(test)] mod tests { use jsonrpsee::core::server::Extensions; + use radroots_publish_proxy_protocol::PublishRelayPolicy; use super::{ - BRIDGE_AUTH_MODE, BridgeAuthorization, authorize_bridge_request, require_bridge_auth, + PUBLISH_PROXY_AUTH_MODE, PublishProxyAuthorization, authorize_publish_proxy_request, + require_publish_principal, + }; + use crate::core::publish_proxy::{ + PublishJobVisibility, PublishPrincipalInit, PublishProxyStore, generate_bearer_token, + hash_bearer_token, }; - #[test] - fn authorize_bridge_request_returns_disabled_without_configured_token() { - let auth = authorize_bridge_request(None, None); - assert_eq!(auth, BridgeAuthorization::Disabled); + fn store_with_token() -> (PublishProxyStore, String) { + let store = PublishProxyStore::memory().expect("store"); + let token = generate_bearer_token(); + store + .create_principal(PublishPrincipalInit { + label: "tester".to_owned(), + token_hash: hash_bearer_token(token.as_str()), + allowed_pubkeys: vec!["a".repeat(64)], + allowed_kinds: vec![30_402], + allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly], + allow_request_relays: false, + job_visibility: PublishJobVisibility::Own, + expires_at_unix: None, + }) + .expect("principal"); + (store, token) } #[test] - fn authorize_bridge_request_accepts_matching_bearer_token() { - let auth = authorize_bridge_request(Some("Bearer secret"), Some("secret")); - assert_eq!(auth, BridgeAuthorization::Authorized); - assert_eq!(BRIDGE_AUTH_MODE, "bearer_token"); + fn publish_proxy_auth_accepts_matching_bearer_token() { + let (store, token) = store_with_token(); + let header = format!("Bearer {token}"); + let auth = authorize_publish_proxy_request(Some(header.as_str()), &store); + assert!(matches!(auth, PublishProxyAuthorization::Authorized(_))); + assert_eq!(PUBLISH_PROXY_AUTH_MODE, "scoped_bearer_token"); } #[test] - fn authorize_bridge_request_rejects_invalid_headers() { + fn publish_proxy_auth_rejects_missing_and_invalid_headers() { + let (store, _token) = store_with_token(); assert_eq!( - authorize_bridge_request(Some("Basic secret"), Some("secret")), - BridgeAuthorization::Invalid + authorize_publish_proxy_request(None, &store), + PublishProxyAuthorization::Missing ); assert_eq!( - authorize_bridge_request(Some("Bearer wrong"), Some("secret")), - BridgeAuthorization::Invalid + authorize_publish_proxy_request(Some("Basic secret"), &store), + PublishProxyAuthorization::Invalid ); assert_eq!( - authorize_bridge_request(None, Some("secret")), - BridgeAuthorization::Missing + authorize_publish_proxy_request(Some("Bearer wrong"), &store), + PublishProxyAuthorization::Invalid ); } #[test] - fn require_bridge_auth_accepts_authorized_extensions() { + fn require_publish_principal_reads_authorized_extensions() { + let (store, token) = store_with_token(); + let header = format!("Bearer {token}"); + let auth = authorize_publish_proxy_request(Some(header.as_str()), &store); let mut extensions = Extensions::new(); - extensions.insert(BridgeAuthorization::Authorized); - require_bridge_auth(&extensions).expect("authorized"); + extensions.insert(auth); + require_publish_principal(&extensions).expect("authorized"); } #[test] - fn require_bridge_auth_rejects_missing_extensions() { - let err = require_bridge_auth(&Extensions::new()).expect_err("missing auth should fail"); + fn require_publish_principal_rejects_missing_extensions() { + let err = + require_publish_principal(&Extensions::new()).expect_err("missing auth should fail"); assert!(err.to_string().contains("required")); } } diff --git a/src/transport/jsonrpc/methods/bridge/farm_publish.rs b/src/transport/jsonrpc/methods/bridge/farm_publish.rs @@ -1,143 +0,0 @@ -use anyhow::Result; -use jsonrpsee::server::RpcModule; -use radroots_events::{farm::RadrootsFarm, kinds::KIND_FARM}; -use radroots_events_codec::farm::encode::to_wire_parts_with_kind; -use radroots_nostr::prelude::radroots_nostr_build_event; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use crate::core::bridge::publish::{ - BridgePublishSettings, connect_and_publish_event, failed_prepublish_execution, -}; -use crate::core::bridge::store::new_publish_job; -use crate::core::nip46::session::Nip46SessionAuthority; -use crate::transport::jsonrpc::auth::require_bridge_auth; -use crate::transport::jsonrpc::methods::bridge::shared::{ - BridgePublishResponse, ensure_bridge_enabled, fingerprint_bridge_request, - normalize_idempotency_key, reserve_bridge_job, resolve_actor_bridge_signer, - sign_bridge_event_builder, -}; -use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; - -#[derive(Debug, Deserialize)] -struct BridgeFarmPublishParams { - farm: RadrootsFarm, - #[serde(default)] - kind: Option<u32>, - #[serde(default)] - signer_session_id: Option<String>, - #[serde(default)] - signer_authority: Option<Nip46SessionAuthority>, - #[serde(default)] - idempotency_key: Option<String>, -} - -#[derive(Debug, Clone, Serialize)] -struct CanonicalBridgeFarmPublishRequest { - kind: u32, - farm: RadrootsFarm, -} - -pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { - registry.track("bridge.farm.publish"); - m.register_async_method( - "bridge.farm.publish", - |params, ctx, extensions| async move { - require_bridge_auth(&extensions)?; - let params: BridgeFarmPublishParams = params - .parse() - .map_err(|e| RpcError::InvalidParams(e.to_string()))?; - let response = publish_farm(ctx.as_ref().clone(), params).await?; - Ok::<BridgePublishResponse, RpcError>(response) - }, - )?; - Ok(()) -} - -async fn publish_farm( - ctx: RpcContext, - params: BridgeFarmPublishParams, -) -> Result<BridgePublishResponse, RpcError> { - ensure_bridge_enabled(&ctx)?; - let idempotency_key = normalize_idempotency_key(params.idempotency_key)?; - let kind = params.kind.unwrap_or(KIND_FARM); - if kind != KIND_FARM { - return Err(RpcError::InvalidParams(format!( - "farm publish only supports kind {KIND_FARM}, got {kind}" - ))); - } - let signer = resolve_actor_bridge_signer( - &ctx, - params.signer_session_id.as_deref(), - params.signer_authority.as_ref(), - kind, - "bridge.farm.publish", - ) - .await?; - let signer_pubkey = signer.signer_pubkey_hex(); - let canonical = CanonicalBridgeFarmPublishRequest { - kind, - farm: params.farm, - }; - let request_fingerprint = - fingerprint_bridge_request("bridge.farm.publish", &signer, &canonical)?; - let parts = to_wire_parts_with_kind(&canonical.farm, canonical.kind) - .map_err(|error| RpcError::InvalidParams(format!("invalid farm contract: {error}")))?; - let event_addr = format!("{}:{}:{}", parts.kind, signer_pubkey, canonical.farm.d_tag); - let builder = radroots_nostr_build_event(parts.kind, parts.content, parts.tags) - .map_err(|error| RpcError::Other(format!("failed to build farm event: {error}")))?; - - let reserved = reserve_bridge_job( - &ctx, - new_publish_job( - "bridge.farm.publish", - Uuid::new_v4().to_string(), - idempotency_key, - signer.signer_mode(), - parts.kind, - None, - Some(event_addr.clone()), - ctx.state.bridge_config.delivery_policy, - ctx.state.bridge_config.delivery_quorum, - ), - request_fingerprint, - "bridge farm", - )?; - let job = match reserved { - crate::core::bridge::store::BridgeJobReservation::Accepted(job) => job, - crate::core::bridge::store::BridgeJobReservation::Duplicate(existing) => { - return Ok(BridgePublishResponse { - deduplicated: true, - job: existing.into(), - }); - } - }; - - let publish_settings = BridgePublishSettings::from_config(&ctx.state.bridge_config); - let event = match sign_bridge_event_builder(&ctx, &signer, builder, "bridge.farm.publish").await - { - Ok(event) => event, - Err(error) => { - let _ = ctx.state.bridge_jobs.complete( - &job.job_id, - None, - failed_prepublish_execution(&publish_settings, error.to_string()), - ); - return Err(error); - } - }; - - let execution = connect_and_publish_event(&ctx.state.client, &publish_settings, &event).await; - let job = ctx - .state - .bridge_jobs - .complete(&job.job_id, Some(event.id.to_hex()), execution) - .map_err(|error| RpcError::Other(format!("failed to persist bridge farm job: {error}")))? - .ok_or_else(|| RpcError::Other("bridge job disappeared during completion".to_string()))?; - debug_assert_eq!(job.event_addr.as_deref(), Some(event_addr.as_str())); - - Ok(BridgePublishResponse { - deduplicated: false, - job: job.into(), - }) -} diff --git a/src/transport/jsonrpc/methods/bridge/job_list.rs b/src/transport/jsonrpc/methods/bridge/job_list.rs @@ -1,22 +0,0 @@ -use anyhow::Result; -use jsonrpsee::server::RpcModule; - -use crate::transport::jsonrpc::auth::require_bridge_auth; -use crate::transport::jsonrpc::methods::bridge::shared::BridgeJobView; -use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; - -pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { - registry.track("bridge.job.list"); - m.register_async_method("bridge.job.list", |_params, ctx, extensions| async move { - require_bridge_auth(&extensions)?; - let jobs = ctx - .state - .bridge_jobs - .list() - .into_iter() - .map(BridgeJobView::from) - .collect::<Vec<_>>(); - Ok::<Vec<BridgeJobView>, RpcError>(jobs) - })?; - Ok(()) -} diff --git a/src/transport/jsonrpc/methods/bridge/job_status.rs b/src/transport/jsonrpc/methods/bridge/job_status.rs @@ -1,32 +0,0 @@ -use anyhow::Result; -use jsonrpsee::server::RpcModule; -use serde::Deserialize; - -use crate::transport::jsonrpc::auth::require_bridge_auth; -use crate::transport::jsonrpc::methods::bridge::shared::BridgeJobView; -use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; - -#[derive(Debug, Deserialize)] -struct BridgeJobStatusParams { - job_id: String, -} - -pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { - registry.track("bridge.job.status"); - m.register_async_method("bridge.job.status", |params, ctx, extensions| async move { - require_bridge_auth(&extensions)?; - let params: BridgeJobStatusParams = params - .parse() - .map_err(|e| RpcError::InvalidParams(e.to_string()))?; - let job_id = params.job_id.trim(); - if job_id.is_empty() { - return Err(RpcError::InvalidParams("missing job_id".to_string())); - } - ctx.state - .bridge_jobs - .get(job_id) - .ok_or_else(|| RpcError::Other(format!("unknown bridge job: {job_id}"))) - .map(BridgeJobView::from) - })?; - Ok(()) -} diff --git a/src/transport/jsonrpc/methods/bridge/listing_publish.rs b/src/transport/jsonrpc/methods/bridge/listing_publish.rs @@ -1,489 +0,0 @@ -use anyhow::Result; -use jsonrpsee::server::RpcModule; -use radroots_events::RadrootsNostrEvent; -use radroots_events::kinds::{KIND_LISTING, KIND_LISTING_DRAFT, is_listing_kind}; -use radroots_events::listing::RadrootsListing; -use radroots_events_codec::listing::encode::to_wire_parts_with_kind; -use radroots_events_codec::wire::WireEventParts; -use radroots_nostr::prelude::radroots_nostr_build_event; -use radroots_trade::listing::validation::{RadrootsTradeListing, validate_listing_event}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use crate::core::bridge::publish::{ - BridgePublishSettings, connect_and_publish_event, failed_prepublish_execution, -}; -use crate::core::bridge::store::new_listing_publish_job; -use crate::core::nip46::session::Nip46SessionAuthority; -use crate::transport::jsonrpc::auth::require_bridge_auth; -use crate::transport::jsonrpc::methods::bridge::shared::{ - BridgePublishResponse, ensure_bridge_enabled, fingerprint_bridge_request, - normalize_idempotency_key, reserve_bridge_job, resolve_actor_bridge_signer, - sign_bridge_event_builder, -}; -use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; - -#[derive(Debug, Deserialize)] -struct BridgeListingPublishParams { - listing: RadrootsListing, - #[serde(default)] - kind: Option<u32>, - #[serde(default)] - signer_session_id: Option<String>, - #[serde(default)] - signer_authority: Option<Nip46SessionAuthority>, - #[serde(default)] - idempotency_key: Option<String>, -} - -#[derive(Debug, Clone, Serialize)] -struct CanonicalBridgeListingPublishRequest { - kind: u32, - listing: RadrootsListing, -} - -pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { - registry.track("bridge.listing.publish"); - m.register_async_method( - "bridge.listing.publish", - |params, ctx, extensions| async move { - require_bridge_auth(&extensions)?; - let params: BridgeListingPublishParams = params - .parse() - .map_err(|e| RpcError::InvalidParams(e.to_string()))?; - let response = publish_listing(ctx.as_ref().clone(), params).await?; - Ok::<BridgePublishResponse, RpcError>(response) - }, - )?; - Ok(()) -} - -async fn publish_listing( - ctx: RpcContext, - params: BridgeListingPublishParams, -) -> Result<BridgePublishResponse, RpcError> { - ensure_bridge_enabled(&ctx)?; - let idempotency_key = normalize_idempotency_key(params.idempotency_key)?; - let kind = resolve_bridge_listing_kind(params.kind)?; - let signer = resolve_actor_bridge_signer( - &ctx, - params.signer_session_id.as_deref(), - params.signer_authority.as_ref(), - kind, - "bridge.listing.publish", - ) - .await?; - let signer_pubkey = signer.signer_pubkey_hex(); - let listing = canonicalize_bridge_listing_for_signer(params.listing, signer_pubkey.as_str()); - let canonical = CanonicalBridgeListingPublishRequest { kind, listing }; - let request_fingerprint = - fingerprint_bridge_request("bridge.listing.publish", &signer, &canonical)?; - let parts = to_wire_parts_with_kind(&canonical.listing, canonical.kind) - .map_err(|error| RpcError::InvalidParams(format!("invalid listing contract: {error}")))?; - let validated = validate_canonical_listing_contract_for_signer( - &canonical.listing, - signer_pubkey.as_str(), - &parts, - )?; - let builder = radroots_nostr_build_event(parts.kind, parts.content, parts.tags) - .map_err(|error| RpcError::Other(format!("failed to build listing event: {error}")))?; - - let reserved = reserve_bridge_job( - &ctx, - new_listing_publish_job( - Uuid::new_v4().to_string(), - idempotency_key, - signer.signer_mode(), - parts.kind, - None, - validated.listing_addr.clone(), - ctx.state.bridge_config.delivery_policy, - ctx.state.bridge_config.delivery_quorum, - ), - request_fingerprint, - "bridge listing", - )?; - let job = match reserved { - crate::core::bridge::store::BridgeJobReservation::Accepted(job) => job, - crate::core::bridge::store::BridgeJobReservation::Duplicate(existing) => { - return Ok(BridgePublishResponse { - deduplicated: true, - job: existing.into(), - }); - } - }; - - let publish_settings = BridgePublishSettings::from_config(&ctx.state.bridge_config); - let event = - match sign_bridge_event_builder(&ctx, &signer, builder, "bridge.listing.publish").await { - Ok(event) => event, - Err(error) => { - let _ = ctx.state.bridge_jobs.complete( - &job.job_id, - None, - failed_prepublish_execution(&publish_settings, error.to_string()), - ); - return Err(error); - } - }; - - let execution = connect_and_publish_event(&ctx.state.client, &publish_settings, &event).await; - let job = ctx - .state - .bridge_jobs - .complete(&job.job_id, Some(event.id.to_hex()), execution) - .map_err(|error| RpcError::Other(format!("failed to persist bridge listing job: {error}")))? - .ok_or_else(|| RpcError::Other("bridge job disappeared during completion".to_string()))?; - debug_assert_eq!( - job.event_addr.as_deref(), - Some(validated.listing_addr.as_str()) - ); - - Ok(BridgePublishResponse { - deduplicated: false, - job: job.into(), - }) -} - -fn validate_canonical_listing_contract_for_signer( - listing: &RadrootsListing, - signer_pubkey: &str, - parts: &WireEventParts, -) -> Result<RadrootsTradeListing, RpcError> { - let event = RadrootsNostrEvent { - id: String::new(), - author: signer_pubkey.to_string(), - created_at: 0, - kind: parts.kind, - tags: parts.tags.clone(), - content: parts.content.clone(), - sig: String::new(), - }; - let validated = validate_listing_event(&event) - .map_err(|error| RpcError::InvalidParams(format!("invalid listing contract: {error}")))?; - debug_assert_eq!(validated.listing.d_tag, listing.d_tag); - Ok(validated) -} - -fn resolve_bridge_listing_kind(kind: Option<u32>) -> Result<u32, RpcError> { - let kind = kind.unwrap_or(KIND_LISTING); - if !is_listing_kind(kind) { - return Err(RpcError::InvalidParams(format!( - "listing kind must be {KIND_LISTING} or {KIND_LISTING_DRAFT}" - ))); - } - Ok(kind) -} - -fn canonicalize_bridge_listing_for_signer( - mut listing: RadrootsListing, - signer_pubkey: &str, -) -> RadrootsListing { - if listing.farm.pubkey.trim().is_empty() { - listing.farm.pubkey = signer_pubkey.to_string(); - } - listing -} - -#[cfg(test)] -mod tests { - use radroots_core::{ - RadrootsCoreCurrency, RadrootsCoreDecimal, RadrootsCoreMoney, RadrootsCoreQuantity, - RadrootsCoreQuantityPrice, RadrootsCoreUnit, - }; - use radroots_events::farm::RadrootsFarmRef; - use radroots_events::ids::{RadrootsDTag, RadrootsInventoryBinId}; - use radroots_events::kinds::{KIND_LISTING, KIND_LISTING_DRAFT}; - use radroots_events::listing::{ - RadrootsListing, RadrootsListingAvailability, RadrootsListingBin, - RadrootsListingDeliveryMethod, RadrootsListingLocation, RadrootsListingProduct, - }; - use radroots_events_codec::listing::encode::to_wire_parts_with_kind; - use radroots_identity::RadrootsIdentity; - use radroots_nostr::prelude::{ - RadrootsNostrClient, RadrootsNostrKeys, RadrootsNostrMetadata, radroots_nostr_parse_pubkey, - }; - use std::time::Instant; - - use crate::app::config::{BridgeConfig, Nip46Config}; - use crate::core::Radrootsd; - use crate::core::nip46::session::Nip46Session; - use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; - - use super::{ - BridgeListingPublishParams, canonicalize_bridge_listing_for_signer, publish_listing, - validate_canonical_listing_contract_for_signer, - }; - - #[test] - fn canonicalize_listing_sets_missing_farm_pubkey() { - let listing = canonicalize_bridge_listing_for_signer(base_listing(), "abc123"); - assert_eq!(listing.farm.pubkey, "abc123"); - } - - #[test] - fn validate_canonical_listing_contract_rejects_mismatched_seller_before_sign() { - let listing = canonicalize_bridge_listing_for_signer(base_listing(), "abc123"); - let mut invalid = listing.clone(); - invalid.farm.pubkey = "other".to_string(); - let parts = to_wire_parts_with_kind(&invalid, KIND_LISTING).expect("wire parts"); - let err = - validate_canonical_listing_contract_for_signer(&invalid, "abc123", &parts).unwrap_err(); - assert!(err.to_string().contains("invalid listing contract")); - } - - #[tokio::test] - async fn publish_listing_is_job_backed_and_idempotent() { - let identity = RadrootsIdentity::generate(); - let metadata: RadrootsNostrMetadata = - serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let state = Radrootsd::new( - identity, - metadata, - BridgeConfig { - enabled: true, - bearer_token: Some("secret".to_string()), - ..BridgeConfig::default() - }, - Nip46Config::default(), - ) - .expect("state"); - let ctx = RpcContext::new(state, MethodRegistry::default()); - let session_id = insert_signer_session(&ctx, "session-1").await; - let params = BridgeListingPublishParams { - listing: base_listing(), - kind: None, - signer_session_id: Some(session_id.clone()), - signer_authority: None, - idempotency_key: Some("same-key".to_string()), - }; - - let first = publish_listing(ctx.clone(), params).await.expect("first"); - assert!(!first.deduplicated); - assert_eq!(first.job.command, "bridge.listing.publish"); - assert!(first.job.event_addr.is_some()); - - let second = publish_listing( - ctx, - BridgeListingPublishParams { - listing: base_listing(), - kind: None, - signer_session_id: Some(session_id), - signer_authority: None, - idempotency_key: Some("same-key".to_string()), - }, - ) - .await - .expect("second"); - assert!(second.deduplicated); - assert_eq!(second.job.job_id, first.job.job_id); - } - - #[tokio::test] - async fn publish_listing_rejects_invalid_seller_before_job_reserve() { - let identity = RadrootsIdentity::generate(); - let metadata: RadrootsNostrMetadata = - serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let state = Radrootsd::new( - identity, - metadata, - BridgeConfig { - enabled: true, - bearer_token: Some("secret".to_string()), - ..BridgeConfig::default() - }, - Nip46Config::default(), - ) - .expect("state"); - let ctx = RpcContext::new(state, MethodRegistry::default()); - let session_id = insert_signer_session(&ctx, "session-1").await; - let mut listing = base_listing(); - listing.farm.pubkey = - "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".to_string(); - let err = publish_listing( - ctx.clone(), - BridgeListingPublishParams { - listing, - kind: None, - signer_session_id: Some(session_id), - signer_authority: None, - idempotency_key: Some("bad-listing".to_string()), - }, - ) - .await - .expect_err("invalid seller rejected"); - assert!(err.to_string().contains("invalid listing contract")); - assert_eq!(ctx.state.bridge_jobs.snapshot().retained_jobs, 0); - } - - #[tokio::test] - async fn publish_listing_allows_draft_kind() { - let identity = RadrootsIdentity::generate(); - let metadata: RadrootsNostrMetadata = - serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let state = Radrootsd::new( - identity, - metadata, - BridgeConfig { - enabled: true, - bearer_token: Some("secret".to_string()), - ..BridgeConfig::default() - }, - Nip46Config::default(), - ) - .expect("state"); - let ctx = RpcContext::new(state, MethodRegistry::default()); - let session_id = insert_signer_session(&ctx, "session-1").await; - - let response = publish_listing( - ctx, - BridgeListingPublishParams { - listing: base_listing(), - kind: Some(KIND_LISTING_DRAFT), - signer_session_id: Some(session_id), - signer_authority: None, - idempotency_key: Some("draft-kind".to_string()), - }, - ) - .await - .expect("draft listing"); - - assert_eq!(response.job.event_kind, KIND_LISTING_DRAFT); - assert!( - response - .job - .event_addr - .as_deref() - .is_some_and(|addr| addr.starts_with("30403:")) - ); - } - - #[tokio::test] - async fn publish_listing_rejects_missing_signer_session() { - let identity = RadrootsIdentity::generate(); - let metadata: RadrootsNostrMetadata = - serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let state = Radrootsd::new( - identity, - metadata, - BridgeConfig { - enabled: true, - bearer_token: Some("secret".to_string()), - ..BridgeConfig::default() - }, - Nip46Config::default(), - ) - .expect("state"); - let ctx = RpcContext::new(state, MethodRegistry::default()); - - let err = publish_listing( - ctx, - BridgeListingPublishParams { - listing: base_listing(), - kind: None, - signer_session_id: None, - signer_authority: None, - idempotency_key: Some("missing-session".to_string()), - }, - ) - .await - .expect_err("missing session rejected"); - assert!(err.to_string().contains("requires signer_session_id")); - } - - async fn insert_signer_session(ctx: &RpcContext, session_id: &str) -> String { - let signer_keys = RadrootsNostrKeys::generate(); - let signer_pubkey = signer_keys.public_key().to_hex(); - let remote_signer_pubkey = - radroots_nostr_parse_pubkey(signer_pubkey.as_str()).expect("signer pubkey"); - let client = RadrootsNostrClient::new(signer_keys.clone()); - let client_keys = signer_keys.clone(); - let client_pubkey = client_keys.public_key(); - ctx.state - .nip46_sessions - .insert(Nip46Session { - id: session_id.to_string(), - client, - client_keys, - client_pubkey, - remote_signer_pubkey, - user_pubkey: None, - relays: Vec::new(), - perms: vec!["sign_event".to_string()], - name: None, - url: None, - image: None, - expires_at: Some(Instant::now() + std::time::Duration::from_secs(60)), - auth_required: false, - authorized: true, - auth_url: None, - pending_request: None, - signer_authority: None, - }) - .await; - session_id.to_string() - } - - fn base_listing() -> RadrootsListing { - RadrootsListing { - d_tag: RadrootsDTag::parse("AAAAAAAAAAAAAAAAAAAAAg").expect("listing d tag"), - farm: RadrootsFarmRef { - pubkey: String::new(), - d_tag: "AAAAAAAAAAAAAAAAAAAAAw".to_string(), - }, - product: RadrootsListingProduct { - key: "coffee".to_string(), - title: "Coffee".to_string(), - category: "coffee".to_string(), - summary: Some("Single origin coffee".to_string()), - process: None, - lot: None, - location: None, - profile: None, - year: None, - }, - primary_bin_id: RadrootsInventoryBinId::parse("bin-1").expect("primary bin id"), - bins: vec![RadrootsListingBin { - bin_id: RadrootsInventoryBinId::parse("bin-1").expect("bin id"), - quantity: RadrootsCoreQuantity::new( - RadrootsCoreDecimal::from(1000u32), - RadrootsCoreUnit::MassG, - ), - price_per_canonical_unit: RadrootsCoreQuantityPrice::new( - RadrootsCoreMoney::new( - RadrootsCoreDecimal::from(20u32), - RadrootsCoreCurrency::USD, - ), - RadrootsCoreQuantity::new( - RadrootsCoreDecimal::from(1u32), - RadrootsCoreUnit::MassG, - ), - ), - display_amount: None, - display_unit: None, - display_label: None, - display_price: None, - display_price_unit: None, - }], - resource_area: None, - plot: None, - discounts: None, - inventory_available: Some(RadrootsCoreDecimal::from(5u32)), - availability: Some(RadrootsListingAvailability::Status { - status: radroots_events::listing::RadrootsListingStatus::Active, - }), - delivery_method: Some(RadrootsListingDeliveryMethod::Pickup), - location: Some(RadrootsListingLocation { - primary: "Farm".to_string(), - city: None, - region: None, - country: None, - lat: None, - lng: None, - geohash: None, - }), - images: None, - published_at: None, - } - } -} diff --git a/src/transport/jsonrpc/methods/bridge/mod.rs b/src/transport/jsonrpc/methods/bridge/mod.rs @@ -1,25 +0,0 @@ -use anyhow::Result; -use jsonrpsee::server::RpcModule; - -use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; - -mod farm_publish; -mod job_list; -mod job_status; -mod listing_publish; -mod order_request; -mod profile_publish; -mod shared; -mod status; - -pub fn module(ctx: RpcContext, registry: MethodRegistry) -> Result<RpcModule<RpcContext>> { - let mut m = RpcModule::new(ctx); - status::register(&mut m, &registry)?; - job_list::register(&mut m, &registry)?; - job_status::register(&mut m, &registry)?; - profile_publish::register(&mut m, &registry)?; - farm_publish::register(&mut m, &registry)?; - listing_publish::register(&mut m, &registry)?; - order_request::register(&mut m, &registry)?; - Ok(m) -} diff --git a/src/transport/jsonrpc/methods/bridge/order_request.rs b/src/transport/jsonrpc/methods/bridge/order_request.rs @@ -1,437 +0,0 @@ -use anyhow::Result; -use jsonrpsee::server::RpcModule; -use radroots_events::RadrootsNostrEventPtr; -use radroots_events::kinds::KIND_ORDER_REQUEST; -use radroots_events::order::RadrootsOrderRequest as TradeOrder; -use radroots_events_codec::order::order_request_event_build; -use radroots_nostr::prelude::{radroots_nostr_build_event, radroots_nostr_parse_pubkey}; -use radroots_trade::order::canonicalize_order_request_for_signer; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use crate::core::bridge::publish::{ - BridgePublishSettings, connect_and_publish_event, failed_prepublish_execution, -}; -use crate::core::bridge::store::new_order_request_job; -use crate::core::nip46::session::Nip46SessionAuthority; -use crate::transport::jsonrpc::auth::require_bridge_auth; -use crate::transport::jsonrpc::methods::bridge::shared::{ - BridgePublishResponse, ensure_bridge_enabled, fingerprint_bridge_request, - normalize_idempotency_key, reserve_bridge_job, resolve_actor_bridge_signer, - sign_bridge_event_builder, -}; -use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; - -#[derive(Debug, Deserialize)] -struct BridgeOrderRequestParams { - order: TradeOrder, - listing_event: RadrootsNostrEventPtr, - #[serde(default)] - signer_session_id: Option<String>, - #[serde(default)] - signer_authority: Option<Nip46SessionAuthority>, - #[serde(default)] - idempotency_key: Option<String>, -} - -#[derive(Serialize)] -struct CanonicalBridgeOrderRequest<'a> { - order: &'a TradeOrder, - listing_event: &'a RadrootsNostrEventPtr, -} - -pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { - registry.track("bridge.order.request"); - m.register_async_method( - "bridge.order.request", - |params, ctx, extensions| async move { - require_bridge_auth(&extensions)?; - let params: BridgeOrderRequestParams = params - .parse() - .map_err(|e| RpcError::InvalidParams(e.to_string()))?; - let response = publish_order_request(ctx.as_ref().clone(), params).await?; - Ok::<BridgePublishResponse, RpcError>(response) - }, - )?; - Ok(()) -} - -async fn publish_order_request( - ctx: RpcContext, - params: BridgeOrderRequestParams, -) -> Result<BridgePublishResponse, RpcError> { - ensure_bridge_enabled(&ctx)?; - - let idempotency_key = normalize_idempotency_key(params.idempotency_key)?; - let signer = resolve_actor_bridge_signer( - &ctx, - params.signer_session_id.as_deref(), - params.signer_authority.as_ref(), - KIND_ORDER_REQUEST, - "bridge.order.request", - ) - .await?; - let signer_pubkey = signer.signer_pubkey_hex(); - let listing_event = params.listing_event; - let order = canonicalize_order_request_for_signer(params.order, signer_pubkey.as_str()) - .map_err(|error| RpcError::InvalidParams(error.to_string()))?; - radroots_nostr_parse_pubkey(&order.buyer_pubkey) - .map_err(|error| RpcError::InvalidParams(format!("invalid order.buyer_pubkey: {error}")))?; - radroots_nostr_parse_pubkey(&order.seller_pubkey).map_err(|error| { - RpcError::InvalidParams(format!("invalid order.seller_pubkey: {error}")) - })?; - let request_fingerprint = fingerprint_bridge_request( - "bridge.order.request", - &signer, - &CanonicalBridgeOrderRequest { - order: &order, - listing_event: &listing_event, - }, - )?; - let built = order_request_event_build(&listing_event, &order).map_err(|error| { - RpcError::Other(format!("failed to build order request event: {error}")) - })?; - let builder = - radroots_nostr_build_event(built.kind, built.content, built.tags).map_err(|error| { - RpcError::Other(format!("failed to build order request event: {error}")) - })?; - - let reserved = reserve_bridge_job( - &ctx, - new_order_request_job( - Uuid::new_v4().to_string(), - idempotency_key, - signer.signer_mode(), - KIND_ORDER_REQUEST, - None, - order.listing_addr.to_string(), - ctx.state.bridge_config.delivery_policy, - ctx.state.bridge_config.delivery_quorum, - ), - request_fingerprint, - "bridge order", - )?; - let job = match reserved { - crate::core::bridge::store::BridgeJobReservation::Accepted(job) => job, - crate::core::bridge::store::BridgeJobReservation::Duplicate(existing) => { - return Ok(BridgePublishResponse { - deduplicated: true, - job: existing.into(), - }); - } - }; - - let publish_settings = BridgePublishSettings::from_config(&ctx.state.bridge_config); - let event = - match sign_bridge_event_builder(&ctx, &signer, builder, "bridge.order.request").await { - Ok(event) => event, - Err(error) => { - let _ = ctx.state.bridge_jobs.complete( - &job.job_id, - None, - failed_prepublish_execution(&publish_settings, error.to_string()), - ); - return Err(error); - } - }; - - let execution = connect_and_publish_event(&ctx.state.client, &publish_settings, &event).await; - let job = ctx - .state - .bridge_jobs - .complete(&job.job_id, Some(event.id.to_hex()), execution) - .map_err(|error| RpcError::Other(format!("failed to persist bridge order job: {error}")))? - .ok_or_else(|| RpcError::Other("bridge job disappeared during completion".to_string()))?; - - Ok(BridgePublishResponse { - deduplicated: false, - job: job.into(), - }) -} - -#[cfg(test)] -mod tests { - use radroots_core::{ - RadrootsCoreCurrency, RadrootsCoreDecimal, RadrootsCoreMoney, RadrootsCoreUnit, - }; - use radroots_events::RadrootsNostrEventPtr; - use radroots_events::ids::{ - RadrootsInventoryBinId, RadrootsListingAddress, RadrootsOrderId, RadrootsOrderQuoteId, - RadrootsPublicKey, - }; - use radroots_events::order::{ - RadrootsOrderEconomicItem as TradeOrderEconomicItem, - RadrootsOrderEconomicLine as TradeOrderEconomicLine, - RadrootsOrderEconomics as TradeOrderEconomics, RadrootsOrderItem as TradeOrderItem, - RadrootsOrderPricingBasis as TradePricingBasis, RadrootsOrderRequest as TradeOrder, - }; - use radroots_identity::RadrootsIdentity; - use radroots_nostr::prelude::{ - RadrootsNostrClient, RadrootsNostrKeys, RadrootsNostrMetadata, radroots_nostr_parse_pubkey, - }; - use std::time::Instant; - - use crate::app::config::{BridgeConfig, Nip46Config}; - use crate::core::Radrootsd; - use crate::core::nip46::session::Nip46Session; - use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; - use radroots_trade::order::canonicalize_order_request_for_signer; - - use super::{BridgeOrderRequestParams, publish_order_request}; - - const BUYER: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - const SELLER: &str = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; - - #[test] - fn canonicalize_order_request_accepts_matching_buyer_and_seller_pubkeys() { - let order = canonicalize_order_request_for_signer(base_order("", ""), BUYER) - .expect("canonicalize"); - - assert_eq!(order.buyer_pubkey, BUYER); - assert_eq!(order.seller_pubkey, SELLER); - } - - #[test] - fn canonicalize_order_request_rejects_items_with_zero_bin_count() { - let mut order = base_order( - BUYER, "", - ); - order.items[0].bin_count = 0; - let err = - canonicalize_order_request_for_signer(order, BUYER).expect_err("zero bin count"); - assert!(err.to_string().contains("bin_count")); - } - - #[tokio::test] - async fn publish_order_request_is_job_backed_and_idempotent() { - let identity = RadrootsIdentity::generate(); - let metadata: RadrootsNostrMetadata = - serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let state = Radrootsd::new( - identity, - metadata, - BridgeConfig { - enabled: true, - bearer_token: Some("secret".to_string()), - ..BridgeConfig::default() - }, - Nip46Config::default(), - ) - .expect("state"); - let ctx = RpcContext::new(state, MethodRegistry::default()); - let (session_id, signer_pubkey) = insert_signer_session(&ctx, "session-1").await; - let params = BridgeOrderRequestParams { - order: base_order(signer_pubkey.as_str(), ""), - listing_event: base_listing_event(), - signer_session_id: Some(session_id.clone()), - signer_authority: None, - idempotency_key: Some("same-key".to_string()), - }; - - let first = publish_order_request(ctx.clone(), params) - .await - .expect("first"); - assert!(!first.deduplicated); - assert_eq!(first.job.command, "bridge.order.request"); - assert_eq!(first.job.event_addr.as_deref(), Some(base_listing_addr())); - - let second = publish_order_request( - ctx, - BridgeOrderRequestParams { - order: base_order(signer_pubkey.as_str(), ""), - listing_event: base_listing_event(), - signer_session_id: Some(session_id), - signer_authority: None, - idempotency_key: Some("same-key".to_string()), - }, - ) - .await - .expect("second"); - assert!(second.deduplicated); - assert_eq!(second.job.job_id, first.job.job_id); - } - - #[tokio::test] - async fn publish_order_request_rejects_conflicting_idempotency_key_reuse() { - let identity = RadrootsIdentity::generate(); - let metadata: RadrootsNostrMetadata = - serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let state = Radrootsd::new( - identity, - metadata, - BridgeConfig { - enabled: true, - bearer_token: Some("secret".to_string()), - ..BridgeConfig::default() - }, - Nip46Config::default(), - ) - .expect("state"); - let ctx = RpcContext::new(state, MethodRegistry::default()); - let (session_id, signer_pubkey) = insert_signer_session(&ctx, "session-1").await; - publish_order_request( - ctx.clone(), - BridgeOrderRequestParams { - order: base_order(signer_pubkey.as_str(), ""), - listing_event: base_listing_event(), - signer_session_id: Some(session_id.clone()), - signer_authority: None, - idempotency_key: Some("same-key".to_string()), - }, - ) - .await - .expect("first"); - - let mut conflicting = base_order(signer_pubkey.as_str(), ""); - conflicting.order_id = RadrootsOrderId::parse("order-2").expect("order id"); - let err = publish_order_request( - ctx, - BridgeOrderRequestParams { - order: conflicting, - listing_event: base_listing_event(), - signer_session_id: Some(session_id), - signer_authority: None, - idempotency_key: Some("same-key".to_string()), - }, - ) - .await - .expect_err("conflicting idempotency"); - assert!(err.to_string().contains("conflicts")); - } - - #[tokio::test] - async fn publish_order_request_rejects_missing_signer_session() { - let identity = RadrootsIdentity::generate(); - let metadata: RadrootsNostrMetadata = - serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let state = Radrootsd::new( - identity, - metadata, - BridgeConfig { - enabled: true, - bearer_token: Some("secret".to_string()), - ..BridgeConfig::default() - }, - Nip46Config::default(), - ) - .expect("state"); - let ctx = RpcContext::new(state, MethodRegistry::default()); - - let err = publish_order_request( - ctx, - BridgeOrderRequestParams { - order: base_order("", ""), - listing_event: base_listing_event(), - signer_session_id: None, - signer_authority: None, - idempotency_key: Some("missing-session".to_string()), - }, - ) - .await - .expect_err("missing session rejected"); - assert!(err.to_string().contains("requires signer_session_id")); - } - - async fn insert_signer_session(ctx: &RpcContext, session_id: &str) -> (String, String) { - let signer_keys = RadrootsNostrKeys::generate(); - let signer_pubkey = signer_keys.public_key().to_hex(); - let remote_signer_pubkey = - radroots_nostr_parse_pubkey(signer_pubkey.as_str()).expect("signer pubkey"); - let client = RadrootsNostrClient::new(signer_keys.clone()); - let client_keys = signer_keys.clone(); - let client_pubkey = client_keys.public_key(); - ctx.state - .nip46_sessions - .insert(Nip46Session { - id: session_id.to_string(), - client, - client_keys, - client_pubkey, - remote_signer_pubkey, - user_pubkey: None, - relays: Vec::new(), - perms: vec!["sign_event".to_string()], - name: None, - url: None, - image: None, - expires_at: Some(Instant::now() + std::time::Duration::from_secs(60)), - auth_required: false, - authorized: true, - auth_url: None, - pending_request: None, - signer_authority: None, - }) - .await; - (session_id.to_string(), signer_pubkey) - } - - fn base_listing_addr() -> &'static str { - "30402:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb:AAAAAAAAAAAAAAAAAAAAAg" - } - - fn base_listing_event() -> RadrootsNostrEventPtr { - RadrootsNostrEventPtr { - id: "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc".to_string(), - relays: None, - } - } - - fn base_order_economics() -> TradeOrderEconomics { - TradeOrderEconomics { - quote_id: RadrootsOrderQuoteId::parse("quote-1").expect("quote id"), - quote_version: 1, - pricing_basis: TradePricingBasis::ListingEvent, - currency: RadrootsCoreCurrency::USD, - items: vec![TradeOrderEconomicItem { - bin_id: RadrootsInventoryBinId::parse("bin-1").expect("bin id"), - bin_count: 2, - quantity_amount: RadrootsCoreDecimal::from(1u32), - quantity_unit: RadrootsCoreUnit::Each, - unit_price_amount: RadrootsCoreDecimal::from(5u32), - unit_price_currency: RadrootsCoreCurrency::USD, - line_subtotal: RadrootsCoreMoney::new( - RadrootsCoreDecimal::from(10u32), - RadrootsCoreCurrency::USD, - ), - }], - discounts: Vec::<TradeOrderEconomicLine>::new(), - adjustments: Vec::<TradeOrderEconomicLine>::new(), - subtotal: RadrootsCoreMoney::new( - RadrootsCoreDecimal::from(10u32), - RadrootsCoreCurrency::USD, - ), - discount_total: RadrootsCoreMoney::new( - RadrootsCoreDecimal::from(0u32), - RadrootsCoreCurrency::USD, - ), - adjustment_total: RadrootsCoreMoney::new( - RadrootsCoreDecimal::from(0u32), - RadrootsCoreCurrency::USD, - ), - total: RadrootsCoreMoney::new( - RadrootsCoreDecimal::from(10u32), - RadrootsCoreCurrency::USD, - ), - } - } - - fn base_order(buyer_pubkey: &str, seller_pubkey: &str) -> TradeOrder { - TradeOrder { - order_id: RadrootsOrderId::parse("order-1").expect("order id"), - listing_addr: RadrootsListingAddress::parse(base_listing_addr()) - .expect("listing address"), - buyer_pubkey: pubkey_or(BUYER, buyer_pubkey), - seller_pubkey: pubkey_or(SELLER, seller_pubkey), - items: vec![TradeOrderItem { - bin_id: RadrootsInventoryBinId::parse("bin-1").expect("bin id"), - bin_count: 2, - }], - economics: base_order_economics(), - } - } - - fn pubkey_or(default: &str, value: &str) -> RadrootsPublicKey { - let value = if value.is_empty() { default } else { value }; - RadrootsPublicKey::parse(value).expect("pubkey") - } -} diff --git a/src/transport/jsonrpc/methods/bridge/profile_publish.rs b/src/transport/jsonrpc/methods/bridge/profile_publish.rs @@ -1,137 +0,0 @@ -use anyhow::Result; -use jsonrpsee::server::RpcModule; -use radroots_events::{ - kinds::KIND_PROFILE, - profile::{RadrootsProfile, RadrootsProfileType}, -}; -use radroots_events_codec::profile::encode::to_wire_parts_with_profile_type; -use radroots_nostr::prelude::radroots_nostr_build_event; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use crate::core::bridge::publish::{ - BridgePublishSettings, connect_and_publish_event, failed_prepublish_execution, -}; -use crate::core::bridge::store::new_publish_job; -use crate::core::nip46::session::Nip46SessionAuthority; -use crate::transport::jsonrpc::auth::require_bridge_auth; -use crate::transport::jsonrpc::methods::bridge::shared::{ - BridgePublishResponse, ensure_bridge_enabled, fingerprint_bridge_request, - normalize_idempotency_key, reserve_bridge_job, resolve_actor_bridge_signer, - sign_bridge_event_builder, -}; -use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; - -#[derive(Debug, Deserialize)] -struct BridgeProfilePublishParams { - profile: RadrootsProfile, - #[serde(default)] - profile_type: Option<RadrootsProfileType>, - #[serde(default)] - signer_session_id: Option<String>, - #[serde(default)] - signer_authority: Option<Nip46SessionAuthority>, - #[serde(default)] - idempotency_key: Option<String>, -} - -#[derive(Debug, Clone, Serialize)] -struct CanonicalBridgeProfilePublishRequest { - profile: RadrootsProfile, - profile_type: Option<RadrootsProfileType>, -} - -pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { - registry.track("bridge.profile.publish"); - m.register_async_method( - "bridge.profile.publish", - |params, ctx, extensions| async move { - require_bridge_auth(&extensions)?; - let params: BridgeProfilePublishParams = params - .parse() - .map_err(|e| RpcError::InvalidParams(e.to_string()))?; - let response = publish_profile(ctx.as_ref().clone(), params).await?; - Ok::<BridgePublishResponse, RpcError>(response) - }, - )?; - Ok(()) -} - -async fn publish_profile( - ctx: RpcContext, - params: BridgeProfilePublishParams, -) -> Result<BridgePublishResponse, RpcError> { - ensure_bridge_enabled(&ctx)?; - let idempotency_key = normalize_idempotency_key(params.idempotency_key)?; - let signer = resolve_actor_bridge_signer( - &ctx, - params.signer_session_id.as_deref(), - params.signer_authority.as_ref(), - KIND_PROFILE, - "bridge.profile.publish", - ) - .await?; - let canonical = CanonicalBridgeProfilePublishRequest { - profile: params.profile, - profile_type: params.profile_type, - }; - let request_fingerprint = - fingerprint_bridge_request("bridge.profile.publish", &signer, &canonical)?; - let parts = to_wire_parts_with_profile_type(&canonical.profile, canonical.profile_type) - .map_err(|error| RpcError::InvalidParams(format!("invalid profile contract: {error}")))?; - let builder = radroots_nostr_build_event(parts.kind, parts.content, parts.tags) - .map_err(|error| RpcError::Other(format!("failed to build profile event: {error}")))?; - - let reserved = reserve_bridge_job( - &ctx, - new_publish_job( - "bridge.profile.publish", - Uuid::new_v4().to_string(), - idempotency_key, - signer.signer_mode(), - parts.kind, - None, - None, - ctx.state.bridge_config.delivery_policy, - ctx.state.bridge_config.delivery_quorum, - ), - request_fingerprint, - "bridge profile", - )?; - let job = match reserved { - crate::core::bridge::store::BridgeJobReservation::Accepted(job) => job, - crate::core::bridge::store::BridgeJobReservation::Duplicate(existing) => { - return Ok(BridgePublishResponse { - deduplicated: true, - job: existing.into(), - }); - } - }; - - let publish_settings = BridgePublishSettings::from_config(&ctx.state.bridge_config); - let event = - match sign_bridge_event_builder(&ctx, &signer, builder, "bridge.profile.publish").await { - Ok(event) => event, - Err(error) => { - let _ = ctx.state.bridge_jobs.complete( - &job.job_id, - None, - failed_prepublish_execution(&publish_settings, error.to_string()), - ); - return Err(error); - } - }; - - let execution = connect_and_publish_event(&ctx.state.client, &publish_settings, &event).await; - let job = ctx - .state - .bridge_jobs - .complete(&job.job_id, Some(event.id.to_hex()), execution) - .map_err(|error| RpcError::Other(format!("failed to persist bridge profile job: {error}")))? - .ok_or_else(|| RpcError::Other("bridge job disappeared during completion".to_string()))?; - - Ok(BridgePublishResponse { - deduplicated: false, - job: job.into(), - }) -} diff --git a/src/transport/jsonrpc/methods/bridge/shared.rs b/src/transport/jsonrpc/methods/bridge/shared.rs @@ -1,558 +0,0 @@ -use anyhow::Result; -use nostr::Event; -use radroots_nostr::prelude::RadrootsNostrEventBuilder; -use serde::Serialize; -use sha2::{Digest, Sha256}; - -use crate::core::bridge::publish::BridgeRelayPublishResult; -use crate::core::bridge::store::{ - BridgeJobRecord, BridgeJobReservation, BridgeJobStatus, BridgeJobStoreError, -}; -use crate::core::nip46::session::{Nip46SessionAuthority, Nip46SessionRole}; -use crate::transport::jsonrpc::nip46::{client as nip46_client, session as nip46_session}; -use crate::transport::jsonrpc::{RpcContext, RpcError}; - -#[derive(Clone, Debug, Serialize)] -pub(super) struct BridgePublishResponse { - pub deduplicated: bool, - pub job: BridgeJobView, -} - -#[derive(Clone, Debug, Serialize)] -pub(super) struct BridgeJobView { - pub job_id: String, - pub command: String, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub idempotency_key: Option<String>, - pub status: BridgeJobStatus, - pub terminal: bool, - pub recovered_after_restart: bool, - pub requested_at_unix: u64, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub completed_at_unix: Option<u64>, - pub signer_mode: String, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub signer_session_id: Option<String>, - pub event_kind: u32, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub event_id: Option<String>, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub event_addr: Option<String>, - pub delivery_policy: crate::app::config::BridgeDeliveryPolicy, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub delivery_quorum: Option<usize>, - pub relay_count: usize, - pub acknowledged_relay_count: usize, - pub required_acknowledged_relay_count: usize, - pub attempt_count: usize, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub attempt_summaries: Vec<String>, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub relay_results: Vec<BridgeRelayPublishResult>, - pub relay_outcome_summary: String, -} - -impl From<BridgeJobRecord> for BridgeJobView { - fn from(record: BridgeJobRecord) -> Self { - let (signer_mode, signer_session_id) = split_signer_usage(record.signer_mode.as_str()); - Self { - terminal: record.is_terminal(), - recovered_after_restart: record.recovered_after_restart(), - job_id: record.job_id, - command: record.command, - idempotency_key: record.idempotency_key, - status: record.status, - requested_at_unix: record.requested_at_unix, - completed_at_unix: record.completed_at_unix, - signer_mode, - signer_session_id, - event_kind: record.event_kind, - event_id: record.event_id, - event_addr: record.event_addr, - delivery_policy: record.delivery_policy, - delivery_quorum: record.delivery_quorum, - relay_count: record.relay_count, - acknowledged_relay_count: record.acknowledged_relay_count, - required_acknowledged_relay_count: record.required_acknowledged_relay_count, - attempt_count: record.attempt_count, - attempt_summaries: record.attempt_summaries, - relay_results: record.relay_results, - relay_outcome_summary: record.relay_outcome_summary, - } - } -} - -fn split_signer_usage(value: &str) -> (String, Option<String>) { - match value.split_once(':') { - Some(("nip46_session", session_id)) if !session_id.trim().is_empty() => { - ("nip46_session".to_owned(), Some(session_id.to_owned())) - } - _ => (value.to_owned(), None), - } -} - -pub(super) fn ensure_bridge_enabled(ctx: &RpcContext) -> Result<(), RpcError> { - if !ctx.state.bridge_config.enabled { - return Err(RpcError::Other("bridge ingress is disabled".to_string())); - } - Ok(()) -} - -#[derive(Clone)] -pub(super) enum BridgeSignerSelection { - Nip46Session { - session_id: String, - session: crate::core::nip46::session::Nip46Session, - }, -} - -impl BridgeSignerSelection { - pub(super) fn signer_pubkey_hex(&self) -> String { - match self { - Self::Nip46Session { session, .. } => session.remote_signer_pubkey.to_hex(), - } - } - - pub(super) fn signer_mode(&self) -> String { - match self { - Self::Nip46Session { session_id, .. } => format!("nip46_session:{session_id}"), - } - } -} - -pub(super) async fn resolve_actor_bridge_signer( - ctx: &RpcContext, - signer_session_id: Option<&str>, - signer_authority: Option<&Nip46SessionAuthority>, - event_kind: u32, - command: &str, -) -> Result<BridgeSignerSelection, RpcError> { - let session_id = signer_session_id - .map(str::trim) - .filter(|value| !value.is_empty()) - .ok_or_else(|| { - RpcError::Unauthorized(format!( - "{command} requires signer_session_id for actor-authored bridge writes" - )) - })?; - let session = ctx - .state - .nip46_sessions - .get(session_id) - .await - .ok_or_else(|| { - RpcError::Unauthorized(format!( - "{command} signer_session_id `{session_id}` was not found" - )) - })?; - nip46_session::require_sign_event_permission(&session, event_kind).map_err(|error| { - RpcError::Unauthorized(format!( - "{command} signer_session_id `{session_id}` {}", - error - )) - })?; - require_signer_authority(&session, signer_authority).map_err(|reason| { - RpcError::Unauthorized(format!( - "{command} signer_session_id `{session_id}` {reason}" - )) - })?; - Ok(BridgeSignerSelection::Nip46Session { - session_id: session_id.to_string(), - session, - }) -} - -fn require_signer_authority( - session: &crate::core::nip46::session::Nip46Session, - signer_authority: Option<&Nip46SessionAuthority>, -) -> Result<(), String> { - let Some(expected) = signer_authority else { - return Ok(()); - }; - let Some(actual) = session.signer_authority.as_ref() else { - return Err("is missing signer authority continuity metadata".to_owned()); - }; - if actual.provider_runtime_id != expected.provider_runtime_id { - return Err(format!( - "provider `{}` does not match required provider `{}`", - actual.provider_runtime_id, expected.provider_runtime_id - )); - } - if actual.account_identity_id != expected.account_identity_id { - return Err(format!( - "account identity `{}` does not match required account `{}`", - actual.account_identity_id, expected.account_identity_id - )); - } - if actual.provider_signer_session_id != expected.provider_signer_session_id { - return Err(format!( - "provider signer session `{}` does not match required provider session `{}`", - actual - .provider_signer_session_id - .as_deref() - .unwrap_or("<none>"), - expected - .provider_signer_session_id - .as_deref() - .unwrap_or("<none>") - )); - } - Ok(()) -} - -pub(super) async fn sign_bridge_event_builder( - _ctx: &RpcContext, - signer: &BridgeSignerSelection, - builder: RadrootsNostrEventBuilder, - label: &str, -) -> Result<Event, RpcError> { - match signer { - BridgeSignerSelection::Nip46Session { session, .. } => match session.role() { - Nip46SessionRole::InboundLocalSigner => builder - .sign_with_keys(&session.client_keys) - .map_err(|error| RpcError::Other(format!("failed to sign {label} event: {error}"))), - Nip46SessionRole::OutboundRemoteSigner => { - let unsigned = builder.build(session.remote_signer_pubkey); - nip46_client::sign_event(session, unsigned, label).await - } - }, - } -} - -pub(super) fn normalize_idempotency_key(value: Option<String>) -> Result<Option<String>, RpcError> { - let value = value.map(|value| value.trim().to_string()); - match value { - Some(value) if value.is_empty() => Err(RpcError::InvalidParams( - "idempotency_key cannot be empty".to_string(), - )), - Some(value) => Ok(Some(value)), - None => Ok(None), - } -} - -#[derive(Serialize)] -struct BridgeRequestFingerprint<'a, T> { - command: &'a str, - signer_pubkey_hex: &'a str, - payload: &'a T, -} - -pub(super) fn fingerprint_bridge_request<T: Serialize>( - command: &str, - signer: &BridgeSignerSelection, - payload: &T, -) -> Result<String, RpcError> { - let payload = serde_json::to_vec(&BridgeRequestFingerprint { - command, - signer_pubkey_hex: &signer.signer_pubkey_hex(), - payload, - }) - .map_err(|error| RpcError::Other(format!("failed to fingerprint bridge request: {error}")))?; - let digest = Sha256::digest(payload); - Ok(format!("{digest:x}")) -} - -pub(super) fn reserve_bridge_job( - ctx: &RpcContext, - record: BridgeJobRecord, - request_fingerprint: String, - label: &str, -) -> Result<BridgeJobReservation, RpcError> { - ctx.state - .bridge_jobs - .reserve(record, request_fingerprint) - .map_err(|error| match error { - BridgeJobStoreError::IdempotencyConflict { .. } => { - RpcError::InvalidParams(error.to_string()) - } - _ => RpcError::Other(format!("failed to persist {label} job: {error}")), - }) -} - -#[cfg(test)] -mod tests { - use radroots_identity::RadrootsIdentity; - use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys, RadrootsNostrMetadata}; - - use crate::app::config::{BridgeConfig, BridgeDeliveryPolicy, Nip46Config}; - use crate::core::Radrootsd; - use crate::core::bridge::store::{ - BRIDGE_PENDING_RECOVERY_SUMMARY, BridgeJobStatus, new_listing_publish_job, - }; - use crate::core::nip46::session::{Nip46Session, Nip46SessionAuthority}; - use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; - - use super::{ - BridgeJobView, fingerprint_bridge_request, normalize_idempotency_key, - resolve_actor_bridge_signer, - }; - use std::time::Instant; - - #[test] - fn normalize_idempotency_key_rejects_empty_values() { - let err = normalize_idempotency_key(Some(" ".to_string())).expect_err("empty key"); - assert!(err.to_string().contains("idempotency_key")); - } - - #[tokio::test] - async fn resolve_actor_bridge_signer_rejects_missing_session_id() { - let identity = RadrootsIdentity::generate(); - let metadata: RadrootsNostrMetadata = - serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let state = Radrootsd::new( - identity, - metadata, - BridgeConfig::default(), - Nip46Config::default(), - ) - .expect("state"); - let ctx = RpcContext::new(state, MethodRegistry::default()); - - let err = - match resolve_actor_bridge_signer(&ctx, None, None, 30402, "bridge.listing.publish") - .await - { - Ok(_) => panic!("expected missing session to fail"), - Err(err) => err, - }; - assert!(err.to_string().contains("requires signer_session_id")); - } - - #[tokio::test] - async fn resolve_actor_bridge_signer_rejects_sign_event_permission_gap() { - let identity = RadrootsIdentity::generate(); - let metadata: RadrootsNostrMetadata = - serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let state = Radrootsd::new( - identity.clone(), - metadata, - BridgeConfig::default(), - Nip46Config::default(), - ) - .expect("state"); - let session_keys = RadrootsNostrKeys::generate(); - state - .nip46_sessions - .insert(Nip46Session { - id: "session-1".to_string(), - client: RadrootsNostrClient::new(session_keys.clone()), - client_keys: session_keys.clone(), - client_pubkey: session_keys.public_key(), - remote_signer_pubkey: session_keys.public_key(), - user_pubkey: None, - relays: vec!["wss://relay.example.com".to_string()], - perms: vec!["nip04_encrypt".to_string()], - name: None, - url: None, - image: None, - expires_at: Some(Instant::now() + std::time::Duration::from_secs(60)), - auth_required: false, - authorized: true, - auth_url: None, - pending_request: None, - signer_authority: None, - }) - .await; - let ctx = RpcContext::new(state, MethodRegistry::default()); - - let err = match resolve_actor_bridge_signer( - &ctx, - Some("session-1"), - None, - 30402, - "bridge.listing.publish", - ) - .await - { - Ok(_) => panic!("expected permission gap to fail"), - Err(err) => err, - }; - assert!(err.to_string().contains("unauthorized")); - assert!(err.to_string().contains("sign_event:30402")); - } - - #[tokio::test] - async fn resolve_actor_bridge_signer_rejects_mismatched_authority() { - let identity = RadrootsIdentity::generate(); - let metadata: RadrootsNostrMetadata = - serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let state = Radrootsd::new( - identity.clone(), - metadata, - BridgeConfig::default(), - Nip46Config::default(), - ) - .expect("state"); - let session_keys = RadrootsNostrKeys::generate(); - state - .nip46_sessions - .insert(Nip46Session { - id: "session-1".to_string(), - client: RadrootsNostrClient::new(session_keys.clone()), - client_keys: session_keys.clone(), - client_pubkey: session_keys.public_key(), - remote_signer_pubkey: session_keys.public_key(), - user_pubkey: None, - relays: vec!["wss://relay.example.com".to_string()], - perms: vec!["sign_event".to_string()], - name: None, - url: None, - image: None, - expires_at: Some(Instant::now() + std::time::Duration::from_secs(60)), - auth_required: false, - authorized: true, - auth_url: None, - pending_request: None, - signer_authority: Some(Nip46SessionAuthority { - provider_runtime_id: "myc".to_owned(), - account_identity_id: "acct-authorized".to_owned(), - provider_signer_session_id: Some("conn-authorized".to_owned()), - }), - }) - .await; - let ctx = RpcContext::new(state, MethodRegistry::default()); - - let err = match resolve_actor_bridge_signer( - &ctx, - Some("session-1"), - Some(&Nip46SessionAuthority { - provider_runtime_id: "myc".to_owned(), - account_identity_id: "acct-other".to_owned(), - provider_signer_session_id: Some("conn-authorized".to_owned()), - }), - 30402, - "bridge.listing.publish", - ) - .await - { - Ok(_) => panic!("expected authority mismatch to fail"), - Err(err) => err, - }; - assert!(err.to_string().contains("account identity")); - assert!(err.to_string().contains("acct-other")); - } - - #[test] - fn fingerprint_bridge_request_changes_when_request_changes() { - let session_keys = RadrootsNostrKeys::generate(); - let signer = super::BridgeSignerSelection::Nip46Session { - session_id: "session-1".to_string(), - session: Nip46Session { - id: "session-1".to_string(), - client: RadrootsNostrClient::new(session_keys.clone()), - client_keys: session_keys.clone(), - client_pubkey: session_keys.public_key(), - remote_signer_pubkey: session_keys.public_key(), - user_pubkey: None, - relays: vec!["wss://relay.example.com".to_string()], - perms: vec!["sign_event".to_string()], - name: None, - url: None, - image: None, - expires_at: None, - auth_required: false, - authorized: true, - auth_url: None, - pending_request: None, - signer_authority: None, - }, - }; - let first = fingerprint_bridge_request( - "bridge.order.request", - &signer, - &serde_json::json!({"order_id":"one"}), - ) - .expect("first"); - let second = fingerprint_bridge_request( - "bridge.order.request", - &signer, - &serde_json::json!({"order_id":"two"}), - ) - .expect("second"); - assert_ne!(first, second); - } - - #[test] - fn fingerprint_bridge_request_is_stable_across_nip46_session_renewal() { - let session_keys = RadrootsNostrKeys::generate(); - let session = Nip46Session { - id: "session-1".to_string(), - client: RadrootsNostrClient::new(session_keys.clone()), - client_keys: RadrootsNostrKeys::generate(), - client_pubkey: RadrootsNostrKeys::generate().public_key(), - remote_signer_pubkey: session_keys.public_key(), - user_pubkey: None, - relays: vec!["wss://relay.example.com".to_string()], - perms: vec!["sign_event".to_string()], - name: None, - url: None, - image: None, - expires_at: None, - auth_required: false, - authorized: true, - auth_url: None, - pending_request: None, - signer_authority: None, - }; - let renewed_session = Nip46Session { - id: "session-2".to_string(), - ..session.clone() - }; - let first = fingerprint_bridge_request( - "bridge.order.request", - &super::BridgeSignerSelection::Nip46Session { - session_id: "session-1".to_string(), - session, - }, - &serde_json::json!({"order_id":"same"}), - ) - .expect("first"); - let second = fingerprint_bridge_request( - "bridge.order.request", - &super::BridgeSignerSelection::Nip46Session { - session_id: "session-2".to_string(), - session: renewed_session, - }, - &serde_json::json!({"order_id":"same"}), - ) - .expect("second"); - assert_eq!(first, second); - } - - #[test] - fn bridge_job_view_exposes_terminal_and_recovery_flags() { - let mut job = new_listing_publish_job( - "job-1".to_string(), - Some("same".to_string()), - "embedded_service_identity".to_string(), - 30402, - Some("event-1".to_string()), - "30402:author:listing".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - job.status = BridgeJobStatus::Failed; - job.completed_at_unix = Some(1); - job.relay_outcome_summary = BRIDGE_PENDING_RECOVERY_SUMMARY.to_string(); - let view = BridgeJobView::from(job); - assert!(view.terminal); - assert!(view.recovered_after_restart); - } - - #[test] - fn bridge_job_view_exposes_signer_session_separately() { - let job = new_listing_publish_job( - "job-1".to_string(), - Some("same".to_string()), - "nip46_session:session-1".to_string(), - 30402, - Some("event-1".to_string()), - "30402:author:listing".to_string(), - BridgeDeliveryPolicy::Any, - None, - ); - let view = BridgeJobView::from(job); - assert_eq!(view.signer_mode, "nip46_session"); - assert_eq!(view.signer_session_id.as_deref(), Some("session-1")); - } -} diff --git a/src/transport/jsonrpc/methods/bridge/status.rs b/src/transport/jsonrpc/methods/bridge/status.rs @@ -1,82 +0,0 @@ -use anyhow::Result; -use jsonrpsee::server::RpcModule; -use serde::Serialize; - -use crate::app::config::BridgeDeliveryPolicy; -use crate::core::nip46::session::Nip46SessionRole; -use crate::transport::jsonrpc::auth::{BRIDGE_AUTH_MODE, require_bridge_auth}; -use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; - -const BRIDGE_SIGNER_SELECTION_MODE: &str = "selectable_per_request"; -const BRIDGE_DEFAULT_SIGNER_MODE: &str = "embedded_service_identity"; -const BRIDGE_NIP46_SIGNER_MODE: &str = "nip46_session"; - -#[derive(Clone, Debug, Serialize)] -struct BridgeStatusResponse { - enabled: bool, - ready: bool, - auth_mode: String, - signer_mode: String, - default_signer_mode: String, - supported_signer_modes: Vec<String>, - available_nip46_signer_sessions: usize, - relay_count: usize, - delivery_policy: BridgeDeliveryPolicy, - #[serde(default, skip_serializing_if = "Option::is_none")] - delivery_quorum: Option<usize>, - publish_max_attempts: usize, - publish_initial_backoff_millis: u64, - publish_max_backoff_millis: u64, - job_status_retention: usize, - retained_jobs: usize, - retained_idempotency_keys: usize, - accepted_jobs: usize, - published_jobs: usize, - failed_jobs: usize, - recovered_failed_jobs: usize, - methods: Vec<String>, -} - -pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { - registry.track("bridge.status"); - m.register_async_method("bridge.status", |_params, ctx, extensions| async move { - require_bridge_auth(&extensions)?; - let relay_count = ctx.state.client.relays().await.len(); - let snapshot = ctx.state.bridge_jobs.snapshot(); - let available_nip46_signer_sessions = ctx - .state - .nip46_sessions - .list() - .await - .into_iter() - .filter(|session| session.role() == Nip46SessionRole::OutboundRemoteSigner) - .count(); - Ok::<BridgeStatusResponse, RpcError>(BridgeStatusResponse { - enabled: ctx.state.bridge_config.enabled, - ready: ctx.state.bridge_config.enabled && relay_count > 0, - auth_mode: BRIDGE_AUTH_MODE.to_string(), - signer_mode: BRIDGE_SIGNER_SELECTION_MODE.to_string(), - default_signer_mode: BRIDGE_DEFAULT_SIGNER_MODE.to_string(), - supported_signer_modes: vec![ - BRIDGE_DEFAULT_SIGNER_MODE.to_string(), - BRIDGE_NIP46_SIGNER_MODE.to_string(), - ], - available_nip46_signer_sessions, - relay_count, - delivery_policy: ctx.state.bridge_config.delivery_policy, - delivery_quorum: ctx.state.bridge_config.delivery_quorum, - publish_max_attempts: ctx.state.bridge_config.publish_max_attempts, - publish_initial_backoff_millis: ctx.state.bridge_config.publish_initial_backoff_millis, - publish_max_backoff_millis: ctx.state.bridge_config.publish_max_backoff_millis, - job_status_retention: ctx.state.bridge_config.job_status_retention, - retained_jobs: snapshot.retained_jobs, - retained_idempotency_keys: snapshot.retained_idempotency_keys, - accepted_jobs: snapshot.accepted_jobs, - published_jobs: snapshot.published_jobs, - failed_jobs: snapshot.failed_jobs, - recovered_failed_jobs: snapshot.recovered_failed_jobs, - methods: ctx.methods.list(), - }) - })?; - Ok(()) -} diff --git a/src/transport/jsonrpc/methods/mod.rs b/src/transport/jsonrpc/methods/mod.rs @@ -5,16 +5,16 @@ use jsonrpsee::server::RpcModule; use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; -pub mod bridge; pub mod nip46; +pub mod publish_proxy; pub fn register_all( root: &mut RpcModule<RpcContext>, ctx: RpcContext, registry: MethodRegistry, ) -> Result<()> { - if ctx.state.bridge_config.enabled { - root.merge(bridge::module(ctx.clone(), registry.clone())?)?; + if ctx.state.publish_proxy.config.enabled { + root.merge(publish_proxy::module(ctx.clone(), registry.clone())?)?; } if ctx.state.nip46_config.public_jsonrpc_enabled { root.merge(nip46::module(ctx, registry)?)?; @@ -29,41 +29,40 @@ mod tests { use radroots_nostr::prelude::RadrootsNostrMetadata; use super::register_all; - use crate::app::config::{BridgeConfig, Nip46Config}; + use crate::app::config::{Nip46Config, PublishProxyConfig}; use crate::core::Radrootsd; - use crate::transport::jsonrpc::auth::BridgeAuthorization; + use crate::transport::jsonrpc::auth::PublishProxyAuthorization; use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; - fn state(bridge_enabled: bool, nip46_public_jsonrpc_enabled: bool) -> Radrootsd { + fn state(publish_proxy_enabled: bool, nip46_public_jsonrpc_enabled: bool) -> Radrootsd { let identity = RadrootsIdentity::generate(); let metadata: RadrootsNostrMetadata = serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); - let bridge = BridgeConfig { - enabled: bridge_enabled, - bearer_token: Some("secret".to_string()), - ..BridgeConfig::default() + let publish_proxy = PublishProxyConfig { + enabled: publish_proxy_enabled, + ..PublishProxyConfig::default() }; let nip46 = Nip46Config { public_jsonrpc_enabled: nip46_public_jsonrpc_enabled, ..Nip46Config::default() }; - Radrootsd::new(identity, metadata, bridge, nip46).expect("state") + Radrootsd::new(identity, metadata, publish_proxy, nip46).expect("state") } #[test] - fn register_all_exposes_bridge_methods_by_default() { + fn register_all_exposes_publish_proxy_methods_by_default() { let registry = MethodRegistry::default(); let ctx = RpcContext::new(state(true, false), registry.clone()); let mut root = RpcModule::new(ctx.clone()); register_all(&mut root, ctx, registry).expect("register"); - assert!(root.method("bridge.status").is_some()); - assert!(root.method("bridge.job.list").is_some()); - assert!(root.method("bridge.job.status").is_some()); - assert!(root.method("bridge.profile.publish").is_some()); - assert!(root.method("bridge.farm.publish").is_some()); - assert!(root.method("bridge.listing.publish").is_some()); - assert!(root.method("bridge.order.request").is_some()); + assert!(root.method("publish.capabilities").is_some()); + assert!(root.method("publish.event").is_some()); + assert!(root.method("publish.job.get").is_some()); + assert!(root.method("publish.job.list").is_some()); + assert!(root.method("publish.relays.resolve").is_some()); + let legacy_method = ["br", "idge.status"].concat(); + assert!(root.method(legacy_method.as_str()).is_none()); assert!(root.method("nip46.connect").is_none()); } @@ -74,75 +73,61 @@ mod tests { let mut root = RpcModule::new(ctx.clone()); register_all(&mut root, ctx, registry).expect("register"); - assert!(root.method("bridge.status").is_some()); + assert!(root.method("publish.capabilities").is_some()); assert!(root.method("nip46.connect").is_some()); } #[tokio::test] - async fn bridge_status_rejects_unauthenticated_requests() { + async fn publish_capabilities_rejects_unauthenticated_requests() { let registry = MethodRegistry::default(); let ctx = RpcContext::new(state(true, false), registry.clone()); let mut root = RpcModule::new(ctx.clone()); register_all(&mut root, ctx, registry).expect("register"); let (response, _stream) = root - .raw_json_request(r#"{"jsonrpc":"2.0","method":"bridge.status","id":1}"#, 1) + .raw_json_request( + r#"{"jsonrpc":"2.0","method":"publish.capabilities","id":1}"#, + 1, + ) .await .expect("request"); assert!(response.get().contains("unauthorized")); } #[tokio::test] - async fn bridge_status_accepts_authenticated_requests() { + async fn publish_capabilities_accepts_authenticated_requests() { let registry = MethodRegistry::default(); let ctx = RpcContext::new(state(true, false), registry.clone()); + let principal = ctx + .state + .publish_proxy + .store + .create_principal(crate::core::publish_proxy::PublishPrincipalInit { + label: "tester".to_owned(), + token_hash: crate::core::publish_proxy::hash_bearer_token("secret"), + allowed_pubkeys: vec!["a".repeat(64)], + allowed_kinds: vec![30_402], + allowed_relay_policies: vec![ + radroots_publish_proxy_protocol::PublishRelayPolicy::DaemonDefaultOnly, + ], + allow_request_relays: false, + job_visibility: crate::core::publish_proxy::PublishJobVisibility::Own, + expires_at_unix: None, + }) + .expect("principal"); let mut root = RpcModule::new(ctx.clone()); root.extensions_mut() - .insert(BridgeAuthorization::Authorized); + .insert(PublishProxyAuthorization::Authorized(principal)); register_all(&mut root, ctx, registry).expect("register"); let (response, _stream) = root - .raw_json_request(r#"{"jsonrpc":"2.0","method":"bridge.status","id":1}"#, 1) + .raw_json_request( + r#"{"jsonrpc":"2.0","method":"publish.capabilities","id":1}"#, + 1, + ) .await .expect("request"); - assert!(response.get().contains("\"auth_mode\":\"bearer_token\"")); - assert!( - response - .get() - .contains("\"signer_mode\":\"selectable_per_request\"") - ); - assert!( - response - .get() - .contains("\"default_signer_mode\":\"embedded_service_identity\"") - ); - assert!(response.get().contains( - "\"supported_signer_modes\":[\"embedded_service_identity\",\"nip46_session\"]" - )); - assert!( - response - .get() - .contains("\"available_nip46_signer_sessions\":0") - ); - assert!(response.get().contains("\"accepted_jobs\":0")); - assert!(response.get().contains("\"published_jobs\":0")); - assert!(response.get().contains("\"failed_jobs\":0")); - assert!(response.get().contains("\"recovered_failed_jobs\":0")); - } - - #[tokio::test] - async fn bridge_job_list_accepts_authenticated_requests() { - let registry = MethodRegistry::default(); - let ctx = RpcContext::new(state(true, false), registry.clone()); - let mut root = RpcModule::new(ctx.clone()); - root.extensions_mut() - .insert(BridgeAuthorization::Authorized); - register_all(&mut root, ctx, registry).expect("register"); - - let (response, _stream) = root - .raw_json_request(r#"{"jsonrpc":"2.0","method":"bridge.job.list","id":1}"#, 1) - .await - .expect("request"); - assert!(response.get().contains("\"result\":[]")); + assert!(response.get().contains("\"scoped_bearer_token\"")); + assert!(response.get().contains("\"signed_event_ingress\":true")); } } diff --git a/src/transport/jsonrpc/methods/publish_proxy.rs b/src/transport/jsonrpc/methods/publish_proxy.rs @@ -0,0 +1,301 @@ +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use radroots_publish_proxy_protocol::{ + METHOD_CAPABILITIES, METHOD_EVENT, METHOD_JOB_GET, METHOD_JOB_LIST, METHOD_RELAYS_RESOLVE, + PublishCapabilities, PublishEventRequest, PublishRelayOutcome, +}; +use serde::{Deserialize, Serialize}; + +use crate::core::publish_proxy::PublishJobInsert; +use crate::transport::jsonrpc::auth::require_publish_principal; +use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; + +#[derive(Debug, Deserialize)] +struct JobGetParams { + job_id: String, +} + +#[derive(Debug, Deserialize)] +struct JobListParams { + limit: Option<usize>, +} + +#[derive(Debug, Deserialize)] +struct RelaysResolveParams { + event: radroots_publish_proxy_protocol::SignedNostrEventWire, + relay_policy: radroots_publish_proxy_protocol::PublishRelayPolicy, + #[serde(default)] + relays: Vec<String>, +} + +#[derive(Clone, Debug, Serialize)] +struct RelaysResolveResponse { + relays: Vec<PublishRelayOutcome>, +} + +pub fn module(ctx: RpcContext, registry: MethodRegistry) -> Result<RpcModule<RpcContext>> { + let mut module = RpcModule::new(ctx); + register_capabilities(&mut module, &registry)?; + register_event(&mut module, &registry)?; + register_job_get(&mut module, &registry)?; + register_job_list(&mut module, &registry)?; + register_relays_resolve(&mut module, &registry)?; + Ok(module) +} + +fn register_capabilities( + module: &mut RpcModule<RpcContext>, + registry: &MethodRegistry, +) -> Result<()> { + registry.track(METHOD_CAPABILITIES); + module.register_async_method(METHOD_CAPABILITIES, |_params, ctx, extensions| async move { + require_publish_principal(&extensions)?; + Ok::<PublishCapabilities, RpcError>(PublishCapabilities::v1( + ctx.state.publish_proxy.config.max_event_bytes, + ctx.state.publish_proxy.config.max_relays_per_request, + )) + })?; + Ok(()) +} + +fn register_event(module: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track(METHOD_EVENT); + module.register_async_method(METHOD_EVENT, |params, ctx, extensions| async move { + let principal = require_publish_principal(&extensions)?; + let request: PublishEventRequest = params + .parse() + .map_err(|error| RpcError::InvalidParams(error.to_string()))?; + request + .validate(ctx.state.publish_proxy.config.max_relays_per_request) + .map_err(|error| RpcError::InvalidParams(error.to_string()))?; + let event_size = request.event.content.len() + + request.event.id.len() + + request.event.pubkey.len() + + request.event.sig.len() + + request + .event + .tags + .iter() + .flatten() + .map(String::len) + .sum::<usize>(); + if event_size > ctx.state.publish_proxy.config.max_event_bytes { + return Err(RpcError::InvalidParams( + "signed event exceeds publish_proxy max_event_bytes".to_owned(), + )); + } + principal + .allows_event(&request) + .map_err(|error| RpcError::Unauthorized(error.to_string()))?; + let idempotency_key = request.idempotency_key.clone(); + ctx.state + .publish_proxy + .store + .record_publish_job(PublishJobInsert { + principal_id: principal.principal_id, + idempotency_key, + request, + }) + .map_err(|error| RpcError::Other(error.to_string())) + })?; + Ok(()) +} + +fn register_job_get(module: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track(METHOD_JOB_GET); + module.register_async_method(METHOD_JOB_GET, |params, ctx, extensions| async move { + let principal = require_publish_principal(&extensions)?; + let params: JobGetParams = params + .parse() + .map_err(|error| RpcError::InvalidParams(error.to_string()))?; + let job_id = params.job_id.trim(); + if job_id.is_empty() { + return Err(RpcError::InvalidParams("missing job_id".to_owned())); + } + ctx.state + .publish_proxy + .store + .job_by_id_for_principal(job_id, &principal) + .map_err(|error| RpcError::Other(error.to_string()))? + .ok_or_else(|| RpcError::Other(format!("unknown publish job: {job_id}"))) + })?; + Ok(()) +} + +fn register_job_list(module: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track(METHOD_JOB_LIST); + module.register_async_method(METHOD_JOB_LIST, |params, ctx, extensions| async move { + let principal = require_publish_principal(&extensions)?; + let params = params + .parse::<JobListParams>() + .unwrap_or(JobListParams { limit: None }); + let configured_limit = ctx.state.publish_proxy.config.job_list_limit; + let limit = params + .limit + .unwrap_or(configured_limit) + .min(configured_limit); + ctx.state + .publish_proxy + .store + .list_jobs_for_principal(&principal, limit) + .map_err(|error| RpcError::Other(error.to_string())) + })?; + Ok(()) +} + +fn register_relays_resolve( + module: &mut RpcModule<RpcContext>, + registry: &MethodRegistry, +) -> Result<()> { + registry.track(METHOD_RELAYS_RESOLVE); + module.register_async_method( + METHOD_RELAYS_RESOLVE, + |params, _ctx, extensions| async move { + require_publish_principal(&extensions)?; + let params: RelaysResolveParams = params + .parse() + .map_err(|error| RpcError::InvalidParams(error.to_string()))?; + params + .event + .validate() + .map_err(|error| RpcError::InvalidParams(error.to_string()))?; + let _ = params.relay_policy; + let _ = params.relays; + Ok::<RelaysResolveResponse, RpcError>(RelaysResolveResponse { relays: Vec::new() }) + }, + )?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::module; + use crate::app::config::{Nip46Config, PublishProxyConfig}; + use crate::core::Radrootsd; + use crate::core::publish_proxy::{ + PublishJobVisibility, PublishPrincipalInit, generate_bearer_token, hash_bearer_token, + }; + use crate::transport::jsonrpc::auth::{ + PublishProxyAuthorization, authorize_publish_proxy_request, + }; + use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; + use jsonrpsee::server::RpcModule; + use radroots_identity::RadrootsIdentity; + use radroots_nostr::prelude::RadrootsNostrMetadata; + use radroots_publish_proxy_protocol::PublishRelayPolicy; + + fn event_json(pubkey: &str) -> String { + serde_json::json!({ + "id": "0".repeat(64), + "pubkey": pubkey, + "created_at": 1_700_000_000u64, + "kind": 30402u32, + "tags": [["d", "listing-1"]], + "content": "{}", + "sig": "1".repeat(128) + }) + .to_string() + } + + fn module_with_principal(admin: bool) -> (RpcModule<RpcContext>, RpcContext, String, String) { + let identity = RadrootsIdentity::generate(); + let metadata: RadrootsNostrMetadata = + serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); + let state = Radrootsd::new( + identity, + metadata, + PublishProxyConfig::default(), + Nip46Config::default(), + ) + .expect("state"); + let token = generate_bearer_token(); + let principal = state + .publish_proxy + .store + .create_principal(PublishPrincipalInit { + label: "tester".to_owned(), + token_hash: hash_bearer_token(token.as_str()), + allowed_pubkeys: vec!["a".repeat(64)], + allowed_kinds: vec![30_402], + allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly], + allow_request_relays: false, + job_visibility: if admin { + PublishJobVisibility::Admin + } else { + PublishJobVisibility::Own + }, + expires_at_unix: None, + }) + .expect("principal"); + let registry = MethodRegistry::default(); + let ctx = RpcContext::new(state, registry.clone()); + let mut module = module(ctx.clone(), registry).expect("module"); + module + .extensions_mut() + .insert(PublishProxyAuthorization::Authorized(principal)); + (module, ctx, token, "a".repeat(64)) + } + + #[tokio::test] + async fn publish_event_records_job_and_deduplicates_idempotency() { + let (module, _ctx, _token, pubkey) = module_with_principal(false); + let request = format!( + r#"{{ + "jsonrpc":"2.0", + "method":"publish.event", + "params":{{ + "event":{}, + "relays":[], + "relay_policy":"daemon_default_only", + "delivery_policy":{{"mode":"any"}}, + "idempotency_key":"idem-1" + }}, + "id":1 + }}"#, + event_json(pubkey.as_str()) + ); + let (response, _stream) = module + .raw_json_request(request.as_str(), 1) + .await + .expect("request"); + assert!(response.get().contains("\"deduplicated\":false")); + let (response, _stream) = module + .raw_json_request(request.as_str(), 1) + .await + .expect("request"); + assert!(response.get().contains("\"deduplicated\":true")); + } + + #[tokio::test] + async fn publish_event_rejects_principal_scope_gap() { + let (module, _ctx, _token, _pubkey) = module_with_principal(false); + let request = format!( + r#"{{ + "jsonrpc":"2.0", + "method":"publish.event", + "params":{{ + "event":{}, + "relays":[], + "relay_policy":"daemon_default_only", + "delivery_policy":{{"mode":"any"}} + }}, + "id":1 + }}"#, + event_json("b".repeat(64).as_str()) + ); + let (response, _stream) = module + .raw_json_request(request.as_str(), 1) + .await + .expect("request"); + assert!(response.get().contains("unauthorized")); + } + + #[test] + fn http_auth_finds_principal_from_hashed_token() { + let (_module, ctx, token, _pubkey) = module_with_principal(false); + let header = format!("Bearer {token}"); + let auth = + authorize_publish_proxy_request(Some(header.as_str()), &ctx.state.publish_proxy.store); + assert!(matches!(auth, PublishProxyAuthorization::Authorized(_))); + } +} diff --git a/src/transport/jsonrpc/mod.rs b/src/transport/jsonrpc/mod.rs @@ -27,14 +27,14 @@ pub async fn start_rpc( addr: SocketAddr, rpc_cfg: &RpcConfig, ) -> Result<ServerHandle> { - state.bridge_config.validate()?; + state.publish_proxy.config.validate()?; let registry = MethodRegistry::default(); let ctx = RpcContext::new(state, registry.clone()); - let bridge_config = ctx.state.bridge_config.clone(); + let publish_proxy_store = ctx.state.publish_proxy.store.clone(); let mut root = RpcModule::new(ctx.clone()); methods::register_all(&mut root, ctx, registry)?; - let handle = server::start_server(addr, rpc_cfg, &bridge_config, root).await?; + let handle = server::start_server(addr, rpc_cfg, publish_proxy_store, root).await?; Ok(handle) } diff --git a/src/transport/jsonrpc/server.rs b/src/transport/jsonrpc/server.rs @@ -8,14 +8,15 @@ use jsonrpsee::server::{ ServerHandle, }; -use crate::app::config::{BridgeConfig, RpcConfig}; +use crate::app::config::RpcConfig; +use crate::core::publish_proxy::PublishProxyStore; use crate::transport::jsonrpc::RpcContext; use crate::transport::jsonrpc::auth; pub async fn start_server( addr: SocketAddr, rpc_cfg: &RpcConfig, - bridge_cfg: &BridgeConfig, + publish_proxy_store: PublishProxyStore, root: RpcModule<RpcContext>, ) -> Result<ServerHandle> { let mut builder = ServerConfigBuilder::new() @@ -35,18 +36,17 @@ pub async fn start_server( } let server_cfg = builder.build(); - let bridge_bearer_token = bridge_cfg.bearer_token().map(str::to_owned); let server = ServerBuilder::with_config(server_cfg) .set_http_middleware(tower::ServiceBuilder::new().map_request( move |mut request: HttpRequest<HttpBody>| { - let bridge_auth = auth::authorize_bridge_request( + let publish_proxy_auth = auth::authorize_publish_proxy_request( request .headers() .get("authorization") .and_then(|value| value.to_str().ok()), - bridge_bearer_token.as_deref(), + &publish_proxy_store, ); - request.extensions_mut().insert(bridge_auth); + request.extensions_mut().insert(publish_proxy_auth); request }, ))