radrootsd

JSON-RPC bridge for Radroots event publishing
git clone https://radroots.dev/git/radrootsd.git
Log | Files | Refs | README | LICENSE

mod.rs (107515B)


      1 use std::collections::BTreeMap;
      2 use std::fmt;
      3 use std::future::Future;
      4 use std::net::IpAddr;
      5 use std::path::{Path, PathBuf};
      6 use std::pin::Pin;
      7 use std::str::FromStr;
      8 use std::sync::{Arc, Mutex};
      9 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
     10 
     11 use radroots_events::RadrootsNostrEvent;
     12 use radroots_events::draft::{
     13     RadrootsDraftError, RadrootsSignedNostrEvent, RadrootsSignedNostrEventParts,
     14 };
     15 use radroots_nostr::prelude::{
     16     RadrootsNostrClient, RadrootsNostrEventVerification, RadrootsNostrFilter, RadrootsNostrKind,
     17     RadrootsNostrPublicKey, radroots_nostr_verify_event,
     18 };
     19 use radroots_publish_proxy_protocol::{
     20     PublishDeliveryPolicy, PublishEventRequest, PublishEventResponse, PublishJobStatus,
     21     PublishJobView, PublishRelayOutcome, PublishRelayOutcomeKind, PublishRelayPolicy,
     22     PublishRelaySource, SignedNostrEventWire,
     23 };
     24 use radroots_relay_transport::{
     25     RadrootsNostrClientPublishAdapter, RadrootsRelayOutcome, RadrootsRelayOutcomeKind,
     26     RadrootsRelayPublishAdapter, RadrootsRelayPublishRelayReceipt, RadrootsRelayPublishRequest,
     27     RadrootsRelayTargetSet, RadrootsRelayTransportError, RadrootsRelayUrl, RadrootsRelayUrlPolicy,
     28 };
     29 use rusqlite::types::Type;
     30 use rusqlite::{Connection, OptionalExtension, Row, params};
     31 use serde::{Deserialize, Serialize};
     32 use sha2::{Digest, Sha256};
     33 use thiserror::Error;
     34 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
     35 use uuid::Uuid;
     36 
     37 use crate::app::config::PublishProxyConfig;
     38 
/// Literal prefix `rrd_pp_`; presumably prepended to issued publish-proxy tokens
/// (its use sites are outside this view — confirm).
const TOKEN_PREFIX: &str = "rrd_pp_";
/// Literal prefix `sha256:`; presumably tags stored token hashes with their digest
/// algorithm (its use sites are outside this view — confirm).
const TOKEN_HASH_PREFIX: &str = "sha256:";
/// Version number for the store schema; presumably compared against the version
/// recorded in the SQLite database — confirm against the migration code.
const SCHEMA_VERSION: i64 = 2;
     42 
/// Errors produced by the publish proxy and its SQLite-backed store.
///
/// Variants carrying `#[from]` are created implicitly by the `?` operator from the
/// corresponding underlying error type.
#[derive(Debug, Error)]
pub enum PublishProxyError {
    /// A `rusqlite` operation failed.
    #[error("publish proxy storage error: {0}")]
    Sqlite(#[from] rusqlite::Error),
    /// JSON serialization or deserialization failed.
    #[error("publish proxy json error: {0}")]
    Json(#[from] serde_json::Error),
    /// A filesystem or other I/O operation failed.
    #[error("publish proxy io error: {0}")]
    Io(#[from] std::io::Error),
    /// The principal's scope does not permit the request, or a scope value (such as a
    /// job-visibility string) could not be parsed.
    #[error("invalid publish proxy scope: {0}")]
    InvalidScope(String),
    /// The publish request or its signed event was rejected before publishing
    /// (for example request validation failure or an event larger than `max_event_bytes`).
    #[error("invalid signed Nostr event: {0}")]
    InvalidSignedEvent(String),
    /// Event verification produced a non-success result; the payload is that result.
    #[error("signed Nostr event verification failed: {0:?}")]
    SignedEventVerification(RadrootsNostrEventVerification),
    /// Converting to or from the signed-event draft representation failed.
    #[error("signed Nostr event conversion error: {0}")]
    Draft(#[from] RadrootsDraftError),
    /// The relay transport layer reported an error.
    #[error("publish proxy relay error: {0}")]
    Relay(#[from] RadrootsRelayTransportError),
    /// A transport failure described only by a message.
    #[error("publish proxy transport error: {0}")]
    Transport(String),
    /// No publish permit was available; see `PublishProxy::acquire_publish_permit`.
    #[error("publish proxy concurrency limit reached")]
    ConcurrencyLimit,
    /// The payload is the idempotency key that conflicted.
    // NOTE(review): presumably raised when a key is reused with a different request
    // fingerprint — confirm in the store implementation.
    #[error("publish proxy idempotency conflict for key `{0}`")]
    IdempotencyConflict(String),
}
     68 
/// Accepts signed Nostr events from authorized principals, resolves target relays,
/// records a publish job in the store and delivers the event.
///
/// Cloning is cheap for the shared parts: the publisher, resolver, discovery client
/// and semaphore are all behind `Arc`.
#[derive(Clone)]
pub struct PublishProxy {
    /// Publish proxy configuration (limits, relay lists, database path, timeouts).
    pub config: PublishProxyConfig,
    /// Persistent store for publish jobs and cached author relay lists.
    pub store: PublishProxyStore,
    /// Optional injected publish adapter; when `None`, `publish_with_adapter` builds a
    /// signerless Nostr client adapter for each publish.
    publisher: Option<Arc<dyn RadrootsRelayPublishAdapter>>,
    /// Resolves relay hosts to IP addresses so they can be validated before use.
    resolver: Arc<dyn PublishRelayResolver>,
    /// Fetches an author's write relays from the configured discovery relays.
    author_relay_discovery: Arc<dyn PublishAuthorRelayDiscovery>,
    /// Bounds concurrently running publish jobs; sized from
    /// `config.max_concurrent_publish_jobs`.
    publish_jobs: Arc<Semaphore>,
}
     78 
     79 impl PublishProxy {
     80     pub fn open(config: PublishProxyConfig) -> Result<Self, PublishProxyError> {
     81         let store = PublishProxyStore::open(config.database_path.clone())?;
     82         let publish_jobs = Arc::new(Semaphore::new(config.max_concurrent_publish_jobs));
     83         Ok(Self {
     84             config,
     85             store,
     86             publisher: None,
     87             resolver: Arc::new(SystemPublishRelayResolver),
     88             author_relay_discovery: Arc::new(NostrPublishAuthorRelayDiscovery),
     89             publish_jobs,
     90         })
     91     }
     92 
     93     pub fn memory(config: PublishProxyConfig) -> Result<Self, PublishProxyError> {
     94         let store = PublishProxyStore::memory()?;
     95         let publish_jobs = Arc::new(Semaphore::new(config.max_concurrent_publish_jobs));
     96         Ok(Self {
     97             config,
     98             store,
     99             publisher: None,
    100             resolver: Arc::new(SystemPublishRelayResolver),
    101             author_relay_discovery: Arc::new(NostrPublishAuthorRelayDiscovery),
    102             publish_jobs,
    103         })
    104     }
    105 
    106     pub fn with_publisher(mut self, publisher: Arc<dyn RadrootsRelayPublishAdapter>) -> Self {
    107         self.publisher = Some(publisher);
    108         self
    109     }
    110 
    /// Test-only builder that replaces the relay host resolver.
    #[cfg(test)]
    pub(crate) fn with_relay_resolver(self, resolver: Arc<dyn PublishRelayResolver>) -> Self {
        Self { resolver, ..self }
    }
    116 
    /// Test-only builder that replaces the author relay discovery implementation.
    #[cfg(test)]
    fn with_author_relay_discovery(
        self,
        author_relay_discovery: Arc<dyn PublishAuthorRelayDiscovery>,
    ) -> Self {
        Self {
            author_relay_discovery,
            ..self
        }
    }
    125 
    126     fn acquire_publish_permit(&self) -> Result<OwnedSemaphorePermit, PublishProxyError> {
    127         self.publish_jobs
    128             .clone()
    129             .try_acquire_owned()
    130             .map_err(|_| PublishProxyError::ConcurrencyLimit)
    131     }
    132 
    133     pub async fn publish_event(
    134         &self,
    135         principal: &PublishPrincipal,
    136         request: PublishEventRequest,
    137     ) -> Result<PublishEventResponse, PublishProxyError> {
    138         request
    139             .validate(self.config.max_relays_per_request)
    140             .map_err(|error| {
    141                 PublishProxyError::InvalidSignedEvent(format!(
    142                     "publish request validation failed: {error}"
    143                 ))
    144             })?;
    145         principal.allows_event(&request)?;
    146         let signed_event = signed_event_from_wire(&request.event)?;
    147         if signed_event.raw_json.len() > self.config.max_event_bytes {
    148             return Err(PublishProxyError::InvalidSignedEvent(
    149                 "signed event exceeds publish_proxy max_event_bytes".to_owned(),
    150             ));
    151         }
    152         let effective_timeout_ms = effective_publish_timeout_ms(&self.config, request.timeout_ms)?;
    153         let _permit = self.acquire_publish_permit()?;
    154         let request_fingerprint = request_intent_fingerprint(
    155             principal.principal_id.as_str(),
    156             signed_event.raw_json.as_str(),
    157             &request,
    158             effective_timeout_ms,
    159         )?;
    160         let resolution = self
    161             .resolve_relays_for_request(signed_event.pubkey.as_str(), &request)
    162             .await?;
    163         let response = self.store.record_publish_job(PublishJobInsert {
    164             principal_id: principal.principal_id.clone(),
    165             idempotency_key: request.idempotency_key.clone(),
    166             request: request.clone(),
    167             request_fingerprint,
    168             effective_relay_count: resolution.targets.len(),
    169         })?;
    170         if response.deduplicated {
    171             return Ok(response);
    172         }
    173         let completed = self
    174             .complete_job_execution(
    175                 response.job.job_id.as_str(),
    176                 signed_event,
    177                 request.delivery_policy.clone(),
    178                 effective_timeout_ms,
    179                 resolution,
    180             )
    181             .await?;
    182         Ok(PublishEventResponse {
    183             deduplicated: false,
    184             job: completed,
    185         })
    186     }
    187 
    188     pub async fn resolve_relays_for_request(
    189         &self,
    190         pubkey: &str,
    191         request: &PublishEventRequest,
    192     ) -> Result<PublishRelayResolution, PublishProxyError> {
    193         match request.relay_policy {
    194             PublishRelayPolicy::ExplicitOnly => self.resolve_request_relays(&request.relays).await,
    195             PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault => {
    196                 if !request.relays.is_empty() {
    197                     self.resolve_request_relays(&request.relays).await
    198                 } else {
    199                     self.resolve_author_or_default_relays(pubkey).await
    200                 }
    201             }
    202             PublishRelayPolicy::AuthorWriteThenDaemonDefault => {
    203                 self.resolve_author_or_default_relays(pubkey).await
    204             }
    205             PublishRelayPolicy::DaemonDefaultOnly => self.resolve_daemon_default_relays().await,
    206         }
    207     }
    208 
    209     async fn resolve_author_or_default_relays(
    210         &self,
    211         pubkey: &str,
    212     ) -> Result<PublishRelayResolution, PublishProxyError> {
    213         let mut author_relays = self.resolve_author_write_relays(pubkey).await?;
    214         if author_relays.targets.is_empty() {
    215             let mut daemon_defaults = self.resolve_daemon_default_relays().await?;
    216             daemon_defaults.outcomes.append(&mut author_relays.outcomes);
    217             Ok(daemon_defaults)
    218         } else {
    219             Ok(author_relays)
    220         }
    221     }
    222 
    223     async fn resolve_request_relays(
    224         &self,
    225         relays: &[String],
    226     ) -> Result<PublishRelayResolution, PublishProxyError> {
    227         let mut targets = Vec::new();
    228         let mut outcomes = Vec::new();
    229         for relay in relays {
    230             match RadrootsRelayUrl::parse(relay, relay_url_policy(&self.config)) {
    231                 Ok(url) => {
    232                     self.push_checked_relay_target(
    233                         &mut targets,
    234                         &mut outcomes,
    235                         url,
    236                         PublishRelaySource::Request,
    237                     )
    238                     .await;
    239                 }
    240                 Err(error) => outcomes.push(PublishRelayOutcome {
    241                     relay_url: relay.trim().to_owned(),
    242                     source: PublishRelaySource::Request,
    243                     attempted: false,
    244                     outcome_kind: PublishRelayOutcomeKind::RelayUrlRejected,
    245                     message: Some(error.to_string()),
    246                     latency_ms: None,
    247                 }),
    248             }
    249         }
    250         Ok(PublishRelayResolution { targets, outcomes })
    251     }
    252 
    253     async fn resolve_author_write_relays(
    254         &self,
    255         pubkey: &str,
    256     ) -> Result<PublishRelayResolution, PublishProxyError> {
    257         let cached = self.store.cached_author_write_relays(pubkey)?;
    258         let mut cached_resolution = self.resolve_author_relay_inputs(&cached).await?;
    259         if !cached_resolution.targets.is_empty() {
    260             return Ok(cached_resolution);
    261         }
    262         if self.config.author_relay_discovery_relays.is_empty() {
    263             return Ok(cached_resolution);
    264         }
    265         let mut discovery_targets = self
    266             .resolve_config_relays(
    267                 &self.config.author_relay_discovery_relays,
    268                 PublishRelaySource::DaemonDefault,
    269             )
    270             .await?;
    271         if discovery_targets.targets.is_empty() {
    272             discovery_targets
    273                 .outcomes
    274                 .append(&mut cached_resolution.outcomes);
    275             return Ok(discovery_targets);
    276         }
    277         let discovered = self
    278             .author_relay_discovery
    279             .fetch_author_write_relays(
    280                 pubkey,
    281                 std::mem::take(&mut discovery_targets.targets),
    282                 self.config.connect_timeout_secs,
    283             )
    284             .await?;
    285         self.store.cache_author_write_relays(pubkey, &discovered)?;
    286         let mut discovered_resolution = self.resolve_author_relay_inputs(&discovered).await?;
    287         discovered_resolution
    288             .outcomes
    289             .append(&mut cached_resolution.outcomes);
    290         discovered_resolution
    291             .outcomes
    292             .append(&mut discovery_targets.outcomes);
    293         Ok(discovered_resolution)
    294     }
    295 
    296     async fn resolve_author_relay_inputs(
    297         &self,
    298         relays: &[String],
    299     ) -> Result<PublishRelayResolution, PublishProxyError> {
    300         let mut targets = Vec::new();
    301         let mut outcomes = Vec::new();
    302         for relay in relays {
    303             match RadrootsRelayUrl::parse(relay, relay_url_policy(&self.config)) {
    304                 Ok(url) => {
    305                     self.push_checked_relay_target(
    306                         &mut targets,
    307                         &mut outcomes,
    308                         url,
    309                         PublishRelaySource::AuthorWrite,
    310                     )
    311                     .await;
    312                 }
    313                 Err(error) => outcomes.push(PublishRelayOutcome {
    314                     relay_url: relay.trim().to_owned(),
    315                     source: PublishRelaySource::AuthorWrite,
    316                     attempted: false,
    317                     outcome_kind: PublishRelayOutcomeKind::RelayUrlRejected,
    318                     message: Some(error.to_string()),
    319                     latency_ms: None,
    320                 }),
    321             }
    322         }
    323         Ok(PublishRelayResolution { targets, outcomes })
    324     }
    325 
    326     async fn resolve_daemon_default_relays(
    327         &self,
    328     ) -> Result<PublishRelayResolution, PublishProxyError> {
    329         self.resolve_config_relays(
    330             &self.config.daemon_default_publish_relays,
    331             PublishRelaySource::DaemonDefault,
    332         )
    333         .await
    334     }
    335 
    336     async fn resolve_config_relays(
    337         &self,
    338         relays: &[String],
    339         source: PublishRelaySource,
    340     ) -> Result<PublishRelayResolution, PublishProxyError> {
    341         let mut targets = Vec::new();
    342         let mut outcomes = Vec::new();
    343         for relay in relays {
    344             match RadrootsRelayUrl::parse(relay, relay_url_policy(&self.config)) {
    345                 Ok(url) => {
    346                     self.push_checked_relay_target(&mut targets, &mut outcomes, url, source)
    347                         .await;
    348                 }
    349                 Err(error) => outcomes.push(PublishRelayOutcome {
    350                     relay_url: relay.trim().to_owned(),
    351                     source,
    352                     attempted: false,
    353                     outcome_kind: PublishRelayOutcomeKind::RelayUrlRejected,
    354                     message: Some(error.to_string()),
    355                     latency_ms: None,
    356                 }),
    357             }
    358         }
    359         Ok(PublishRelayResolution { targets, outcomes })
    360     }
    361 
    362     async fn push_checked_relay_target(
    363         &self,
    364         targets: &mut Vec<ResolvedPublishRelay>,
    365         outcomes: &mut Vec<PublishRelayOutcome>,
    366         url: RadrootsRelayUrl,
    367         source: PublishRelaySource,
    368     ) {
    369         if relay_url_policy(&self.config) == RadrootsRelayUrlPolicy::Localhost {
    370             push_resolved_relay(targets, url, source);
    371             return;
    372         }
    373         match self.resolver.resolve(&url).await {
    374             Ok(addresses) if addresses.is_empty() => {
    375                 outcomes.push(relay_resolution_connection_failure(
    376                     url.as_str(),
    377                     source,
    378                     "dns lookup returned no addresses",
    379                 ));
    380             }
    381             Ok(addresses) => match url.validate_public_resolved_ip_addrs(addresses) {
    382                 Ok(()) => push_resolved_relay(targets, url, source),
    383                 Err(error) => outcomes.push(PublishRelayOutcome {
    384                     relay_url: url.as_str().to_owned(),
    385                     source,
    386                     attempted: false,
    387                     outcome_kind: PublishRelayOutcomeKind::RelayUrlRejected,
    388                     message: Some(error.to_string()),
    389                     latency_ms: None,
    390                 }),
    391             },
    392             Err(error) => outcomes.push(relay_resolution_connection_failure(
    393                 url.as_str(),
    394                 source,
    395                 format!("dns lookup failed: {error}"),
    396             )),
    397         }
    398     }
    399 
    400     async fn complete_job_execution(
    401         &self,
    402         job_id: &str,
    403         signed_event: RadrootsSignedNostrEvent,
    404         delivery_policy: PublishDeliveryPolicy,
    405         timeout_ms: u64,
    406         resolution: PublishRelayResolution,
    407     ) -> Result<PublishJobView, PublishProxyError> {
    408         if resolution.targets.is_empty() {
    409             let status = if resolution
    410                 .outcomes
    411                 .iter()
    412                 .any(|outcome| outcome.outcome_kind.is_retryable())
    413             {
    414                 PublishJobStatus::DeliveryUnsatisfiedRetryable
    415             } else {
    416                 PublishJobStatus::Rejected
    417             };
    418             let last_error = if status == PublishJobStatus::DeliveryUnsatisfiedRetryable {
    419                 "delivery_unsatisfied"
    420             } else {
    421                 "no_publish_relays"
    422             };
    423             self.store.complete_publish_job(
    424                 job_id,
    425                 status,
    426                 resolution.outcomes,
    427                 Some(last_error.to_owned()),
    428             )?;
    429             return self.store.job_by_id(job_id);
    430         }
    431         let required_ack_count = delivery_policy.required_ack_count(resolution.targets.len());
    432         if required_ack_count > resolution.targets.len() {
    433             self.store.complete_publish_job(
    434                 job_id,
    435                 PublishJobStatus::Rejected,
    436                 resolution.outcomes,
    437                 Some("delivery_quorum_exceeds_relay_count".to_owned()),
    438             )?;
    439             return self.store.job_by_id(job_id);
    440         }
    441         let source_by_relay = resolution.source_by_relay();
    442         let target_set = RadrootsRelayTargetSet::from_urls(
    443             resolution
    444                 .targets
    445                 .iter()
    446                 .map(|target| target.url.clone())
    447                 .collect(),
    448         )?;
    449         let publish_request =
    450             RadrootsRelayPublishRequest::new(signed_event, target_set, current_unix_millis())
    451                 .with_accepted_quorum(required_ack_count);
    452         let started = Instant::now();
    453         let publish_timeout = Duration::from_millis(timeout_ms);
    454         let receipts =
    455             match tokio::time::timeout(publish_timeout, self.publish_with_adapter(publish_request))
    456                 .await
    457             {
    458                 Ok(Ok(receipts)) => receipts,
    459                 Ok(Err(error)) => transport_error_receipts(&resolution.targets, error),
    460                 Err(_) => timeout_receipts(&resolution.targets),
    461             };
    462         let latency_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
    463         let mut outcomes = resolution.outcomes;
    464         outcomes.extend(receipts.into_iter().map(|receipt| {
    465             publish_outcome_from_receipt(receipt, &source_by_relay, Some(latency_ms))
    466         }));
    467         let status = delivery_status(&delivery_policy, resolution.targets.len(), &outcomes);
    468         let last_error = if status == PublishJobStatus::DeliverySatisfied {
    469             None
    470         } else {
    471             Some("delivery_unsatisfied".to_owned())
    472         };
    473         self.store
    474             .complete_publish_job(job_id, status, outcomes, last_error)?;
    475         self.store.job_by_id(job_id)
    476     }
    477 
    478     async fn publish_with_adapter(
    479         &self,
    480         request: RadrootsRelayPublishRequest,
    481     ) -> Result<Vec<RadrootsRelayPublishRelayReceipt>, PublishProxyError> {
    482         if let Some(publisher) = &self.publisher {
    483             return publisher
    484                 .publish(request)
    485                 .await
    486                 .map_err(PublishProxyError::Relay);
    487         }
    488         let adapter = RadrootsNostrClientPublishAdapter::new(RadrootsNostrClient::new_signerless());
    489         adapter
    490             .publish(request)
    491             .await
    492             .map_err(PublishProxyError::Relay)
    493     }
    494 }
    495 
/// SQLite-backed store used by the publish proxy.
///
/// Clones share the same underlying connection, which sits behind an `Arc<Mutex<_>>`.
#[derive(Clone)]
pub struct PublishProxyStore {
    /// The shared `rusqlite` connection.
    inner: Arc<Mutex<Connection>>,
}
    500 
/// Which publish jobs a principal may read.
///
/// Serialized by serde in snake case; the `FromStr` and `Display` impls use the same
/// `own` / `admin` spellings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PublishJobVisibility {
    /// Only jobs whose principal id matches the reader's own id.
    Own,
    /// Jobs of any principal.
    Admin,
}
    507 
    508 impl FromStr for PublishJobVisibility {
    509     type Err = PublishProxyError;
    510 
    511     fn from_str(value: &str) -> Result<Self, Self::Err> {
    512         match value {
    513             "own" => Ok(Self::Own),
    514             "admin" => Ok(Self::Admin),
    515             other => Err(PublishProxyError::InvalidScope(format!(
    516                 "unknown job visibility `{other}`"
    517             ))),
    518         }
    519     }
    520 }
    521 
    522 impl fmt::Display for PublishJobVisibility {
    523     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    524         match self {
    525             Self::Own => f.write_str("own"),
    526             Self::Admin => f.write_str("admin"),
    527         }
    528     }
    529 }
    530 
/// Input describing a principal to be created.
///
/// Has the same scope fields as [`PublishPrincipal`], plus `token_hash` and without
/// `principal_id`.
// NOTE(review): presumably the store assigns `principal_id` on insert — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishPrincipalInit {
    /// Human-readable label for the principal.
    pub label: String,
    /// Hash of the principal's token (presumably `sha256:`-prefixed — confirm).
    pub token_hash: String,
    /// Event pubkeys the principal may publish for.
    pub allowed_pubkeys: Vec<String>,
    /// Event kinds the principal may publish.
    pub allowed_kinds: Vec<u32>,
    /// Relay policies the principal may request.
    pub allowed_relay_policies: Vec<PublishRelayPolicy>,
    /// Whether the principal may list relays in a publish request.
    pub allow_request_relays: bool,
    /// Which jobs the principal may read.
    pub job_visibility: PublishJobVisibility,
    /// Optional expiry as a Unix timestamp (presumably seconds — confirm).
    pub expires_at_unix: Option<i64>,
}
    542 
/// An authenticated caller together with the scope it is allowed to publish within.
///
/// The scope fields are enforced by [`PublishPrincipal::allows_event`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishPrincipal {
    /// Identifier of the principal; recorded on each publish job it creates.
    pub principal_id: String,
    /// Human-readable label for the principal.
    pub label: String,
    /// Event pubkeys the principal may publish for.
    pub allowed_pubkeys: Vec<String>,
    /// Event kinds the principal may publish.
    pub allowed_kinds: Vec<u32>,
    /// Relay policies the principal may request.
    pub allowed_relay_policies: Vec<PublishRelayPolicy>,
    /// Whether the principal may list relays in a publish request.
    pub allow_request_relays: bool,
    /// Which jobs the principal may read.
    pub job_visibility: PublishJobVisibility,
    /// Optional expiry as a Unix timestamp (presumably seconds — confirm).
    pub expires_at_unix: Option<i64>,
}
    554 
    555 impl PublishPrincipal {
    556     pub fn allows_event(&self, request: &PublishEventRequest) -> Result<(), PublishProxyError> {
    557         ensure_lower_hex("pubkey", request.event.pubkey.as_str(), 64)?;
    558         if !self
    559             .allowed_pubkeys
    560             .iter()
    561             .any(|pubkey| pubkey == &request.event.pubkey)
    562         {
    563             return Err(PublishProxyError::InvalidScope(
    564                 "principal is not allowed to publish for event pubkey".to_owned(),
    565             ));
    566         }
    567         if !self.allowed_kinds.contains(&request.event.kind) {
    568             return Err(PublishProxyError::InvalidScope(
    569                 "principal is not allowed to publish event kind".to_owned(),
    570             ));
    571         }
    572         if !self.allowed_relay_policies.contains(&request.relay_policy) {
    573             return Err(PublishProxyError::InvalidScope(
    574                 "principal is not allowed to use requested relay policy".to_owned(),
    575             ));
    576         }
    577         if !self.allow_request_relays && !request.relays.is_empty() {
    578             return Err(PublishProxyError::InvalidScope(
    579                 "principal is not allowed to provide request relays".to_owned(),
    580             ));
    581         }
    582         Ok(())
    583     }
    584 
    585     fn can_read_job(&self, principal_id: &str) -> bool {
    586         self.job_visibility == PublishJobVisibility::Admin || self.principal_id == principal_id
    587     }
    588 }
    589 
/// Data handed to the store's `record_publish_job` when a publish request is recorded.
#[derive(Debug, Clone)]
pub struct PublishJobInsert {
    /// Id of the principal that issued the request.
    pub principal_id: String,
    /// Idempotency key supplied with the request, if any.
    pub idempotency_key: Option<String>,
    /// The full publish request.
    pub request: PublishEventRequest,
    /// Fingerprint computed by `request_intent_fingerprint` for this request.
    pub request_fingerprint: String,
    /// Number of relay targets that resolution produced for this request.
    pub effective_relay_count: usize,
}
    598 
/// A relay accepted as a publish target, together with where it came from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedPublishRelay {
    /// The parsed relay URL.
    pub url: RadrootsRelayUrl,
    /// Which input supplied this relay (request, author relay list, daemon default).
    pub source: PublishRelaySource,
}
    604 
/// Result of resolving candidate relays for a publish request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishRelayResolution {
    /// Relays accepted as publish targets.
    pub targets: Vec<ResolvedPublishRelay>,
    /// Outcomes recorded during resolution for relays that were not accepted
    /// (rejected URLs, failed lookups); publish outcomes are appended later.
    pub outcomes: Vec<PublishRelayOutcome>,
}
    610 
    611 impl PublishRelayResolution {
    612     fn source_by_relay(&self) -> BTreeMap<String, PublishRelaySource> {
    613         self.targets
    614             .iter()
    615             .map(|target| (target.url.as_str().to_owned(), target.source))
    616             .collect()
    617     }
    618 }
    619 
/// Boxed, `Send` future returned by [`PublishRelayResolver::resolve`]; it yields the
/// resolved IP addresses or an I/O error.
pub(crate) type PublishRelayResolveFuture<'a> =
    Pin<Box<dyn Future<Output = Result<Vec<IpAddr>, std::io::Error>> + Send + 'a>>;
    622 
/// Resolves a relay URL's host to IP addresses.
///
/// The proxy holds this behind `Arc<dyn _>`; `PublishProxy::with_relay_resolver`
/// swaps the implementation in tests.
pub(crate) trait PublishRelayResolver: Send + Sync {
    /// Returns the IP addresses for `url`; an empty list is possible and is treated by
    /// the proxy as a failed lookup.
    fn resolve<'a>(&'a self, url: &'a RadrootsRelayUrl) -> PublishRelayResolveFuture<'a>;
}
    626 
/// Boxed, `Send` future returned by
/// [`PublishAuthorRelayDiscovery::fetch_author_write_relays`]; it yields relay URL
/// strings or a proxy error.
type PublishAuthorRelayDiscoveryFuture<'a> =
    Pin<Box<dyn Future<Output = Result<Vec<String>, PublishProxyError>> + Send + 'a>>;
    629 
/// Looks up the write relays an author has advertised.
///
/// The proxy holds this behind `Arc<dyn _>`; `PublishProxy::with_author_relay_discovery`
/// swaps the implementation in tests.
trait PublishAuthorRelayDiscovery: Send + Sync {
    /// Fetches the write relays of `pubkey` by querying `discovery_targets`.
    ///
    /// `connect_timeout_secs` is given in seconds. The result is a list of relay URL
    /// strings that the caller still has to parse and validate.
    fn fetch_author_write_relays<'a>(
        &'a self,
        pubkey: &'a str,
        discovery_targets: Vec<ResolvedPublishRelay>,
        connect_timeout_secs: u64,
    ) -> PublishAuthorRelayDiscoveryFuture<'a>;
}
    638 
/// Default [`PublishRelayResolver`]; resolves hosts with `tokio::net::lookup_host`.
#[derive(Debug)]
struct SystemPublishRelayResolver;
    641 
    642 impl PublishRelayResolver for SystemPublishRelayResolver {
    643     fn resolve<'a>(&'a self, url: &'a RadrootsRelayUrl) -> PublishRelayResolveFuture<'a> {
    644         Box::pin(async move {
    645             let (host, port) = relay_socket_target(url)?;
    646             let addrs = tokio::net::lookup_host((host.as_str(), port)).await?;
    647             Ok(addrs.map(|addr| addr.ip()).collect())
    648         })
    649     }
    650 }
    651 
/// Default [`PublishAuthorRelayDiscovery`]; queries the discovery relays for the
/// author's kind `10002` events using a signerless Nostr client.
#[derive(Debug)]
struct NostrPublishAuthorRelayDiscovery;
    654 
    655 impl PublishAuthorRelayDiscovery for NostrPublishAuthorRelayDiscovery {
    656     fn fetch_author_write_relays<'a>(
    657         &'a self,
    658         pubkey: &'a str,
    659         discovery_targets: Vec<ResolvedPublishRelay>,
    660         connect_timeout_secs: u64,
    661     ) -> PublishAuthorRelayDiscoveryFuture<'a> {
    662         Box::pin(async move {
    663             let Ok(public_key) = RadrootsNostrPublicKey::from_hex(pubkey) else {
    664                 return Ok(Vec::new());
    665             };
    666             let client = RadrootsNostrClient::new_signerless();
    667             for target in discovery_targets {
    668                 if client.add_read_relay(target.url.as_str()).await.is_err() {
    669                     return Ok(Vec::new());
    670                 }
    671             }
    672             let filter = RadrootsNostrFilter::new()
    673                 .author(public_key)
    674                 .kind(RadrootsNostrKind::Custom(10_002))
    675                 .limit(10);
    676             let timeout = Duration::from_secs(connect_timeout_secs);
    677             let Ok(events) = client.fetch_events(filter, timeout).await else {
    678                 return Ok(Vec::new());
    679             };
    680             let Some(event) = events.into_iter().max_by(|left, right| {
    681                 left.created_at
    682                     .as_secs()
    683                     .cmp(&right.created_at.as_secs())
    684                     .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
    685             }) else {
    686                 return Ok(Vec::new());
    687             };
    688             Ok(author_write_relays_from_nip65_event(&event))
    689         })
    690     }
    691 }
    692 
    693 impl PublishProxyStore {
    694     pub fn open(path: PathBuf) -> Result<Self, PublishProxyError> {
    695         if let Some(parent) = path
    696             .parent()
    697             .filter(|parent| !parent.as_os_str().is_empty())
    698         {
    699             std::fs::create_dir_all(parent)?;
    700         }
    701         let connection = Connection::open(path)?;
    702         Self::from_connection(connection)
    703     }
    704 
    705     pub fn memory() -> Result<Self, PublishProxyError> {
    706         Self::from_connection(Connection::open_in_memory()?)
    707     }
    708 
    709     fn from_connection(connection: Connection) -> Result<Self, PublishProxyError> {
    710         connection.execute_batch(
    711             r#"
    712             PRAGMA foreign_keys = ON;
    713             CREATE TABLE IF NOT EXISTS publish_proxy_principals (
    714                 principal_id TEXT PRIMARY KEY NOT NULL,
    715                 label TEXT NOT NULL,
    716                 token_hash TEXT NOT NULL UNIQUE,
    717                 allowed_pubkeys_json TEXT NOT NULL,
    718                 allowed_kinds_json TEXT NOT NULL,
    719                 allowed_relay_policies_json TEXT NOT NULL,
    720                 allow_request_relays INTEGER NOT NULL,
    721                 job_visibility TEXT NOT NULL,
    722                 expires_at_unix INTEGER,
    723                 revoked_at_unix INTEGER,
    724                 created_at_unix INTEGER NOT NULL
    725             );
    726             CREATE TABLE IF NOT EXISTS publish_proxy_jobs (
    727                 job_id TEXT PRIMARY KEY NOT NULL,
    728                 principal_id TEXT NOT NULL,
    729                 idempotency_key TEXT,
    730                 request_fingerprint TEXT NOT NULL,
    731                 status TEXT NOT NULL,
    732                 event_id TEXT NOT NULL,
    733                 event_pubkey TEXT NOT NULL,
    734                 event_kind INTEGER NOT NULL,
    735                 relay_policy_json TEXT NOT NULL,
    736                 delivery_policy_json TEXT NOT NULL,
    737                 requested_relay_count INTEGER NOT NULL,
    738                 effective_relay_count INTEGER NOT NULL,
    739                 request_json TEXT NOT NULL,
    740                 requested_at_ms INTEGER NOT NULL,
    741                 updated_at_ms INTEGER NOT NULL,
    742                 completed_at_ms INTEGER,
    743                 last_error TEXT,
    744                 FOREIGN KEY(principal_id) REFERENCES publish_proxy_principals(principal_id)
    745             );
    746             CREATE UNIQUE INDEX IF NOT EXISTS publish_proxy_jobs_principal_idempotency_idx
    747                 ON publish_proxy_jobs(principal_id, idempotency_key)
    748                 WHERE idempotency_key IS NOT NULL;
    749             CREATE TABLE IF NOT EXISTS publish_proxy_relay_results (
    750                 job_id TEXT NOT NULL,
    751                 relay_url TEXT NOT NULL,
    752                 source TEXT NOT NULL,
    753                 attempted INTEGER NOT NULL,
    754                 outcome_kind TEXT NOT NULL,
    755                 message TEXT,
    756                 latency_ms INTEGER,
    757                 updated_at_ms INTEGER NOT NULL,
    758                 PRIMARY KEY(job_id, relay_url),
    759                 FOREIGN KEY(job_id) REFERENCES publish_proxy_jobs(job_id)
    760             );
    761             CREATE TABLE IF NOT EXISTS publish_proxy_relay_list_cache (
    762                 pubkey TEXT PRIMARY KEY NOT NULL,
    763                 relays_json TEXT NOT NULL,
    764                 updated_at_ms INTEGER NOT NULL
    765             );
    766             "#,
    767         )?;
    768         migrate_schema(&connection)?;
    769         recover_interrupted_publish_jobs(&connection)?;
    770         connection.pragma_update(None, "user_version", SCHEMA_VERSION)?;
    771         Ok(Self {
    772             inner: Arc::new(Mutex::new(connection)),
    773         })
    774     }
    775 
    776     pub fn create_principal(
    777         &self,
    778         input: PublishPrincipalInit,
    779     ) -> Result<PublishPrincipal, PublishProxyError> {
    780         validate_principal_init(&input)?;
    781         let principal_id = Uuid::new_v4().to_string();
    782         let now = current_unix_secs();
    783         let connection = self
    784             .inner
    785             .lock()
    786             .unwrap_or_else(std::sync::PoisonError::into_inner);
    787         connection.execute(
    788             r#"
    789             INSERT INTO publish_proxy_principals (
    790                 principal_id,
    791                 label,
    792                 token_hash,
    793                 allowed_pubkeys_json,
    794                 allowed_kinds_json,
    795                 allowed_relay_policies_json,
    796                 allow_request_relays,
    797                 job_visibility,
    798                 expires_at_unix,
    799                 revoked_at_unix,
    800                 created_at_unix
    801             )
    802             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, NULL, ?10)
    803             "#,
    804             params![
    805                 principal_id,
    806                 input.label.trim(),
    807                 input.token_hash,
    808                 serde_json::to_string(&input.allowed_pubkeys)?,
    809                 serde_json::to_string(&input.allowed_kinds)?,
    810                 serde_json::to_string(&input.allowed_relay_policies)?,
    811                 input.allow_request_relays,
    812                 input.job_visibility.to_string(),
    813                 input.expires_at_unix,
    814                 now,
    815             ],
    816         )?;
    817         drop(connection);
    818         self.principal_by_id(principal_id.as_str())?
    819             .ok_or_else(|| PublishProxyError::InvalidScope("created principal missing".to_owned()))
    820     }
    821 
    822     pub fn principal_for_token_hash(
    823         &self,
    824         token_hash: &str,
    825     ) -> Result<Option<PublishPrincipal>, PublishProxyError> {
    826         let now = current_unix_secs();
    827         let connection = self
    828             .inner
    829             .lock()
    830             .unwrap_or_else(std::sync::PoisonError::into_inner);
    831         let principal = connection
    832             .query_row(
    833                 r#"
    834                 SELECT
    835                     principal_id,
    836                     label,
    837                     allowed_pubkeys_json,
    838                     allowed_kinds_json,
    839                     allowed_relay_policies_json,
    840                     allow_request_relays,
    841                     job_visibility,
    842                     expires_at_unix
    843                 FROM publish_proxy_principals
    844                 WHERE token_hash = ?1
    845                   AND revoked_at_unix IS NULL
    846                   AND (expires_at_unix IS NULL OR expires_at_unix > ?2)
    847                 "#,
    848                 params![token_hash, now],
    849                 principal_from_row,
    850             )
    851             .optional()?;
    852         Ok(principal)
    853     }
    854 
    855     pub fn principal_by_id(
    856         &self,
    857         principal_id: &str,
    858     ) -> Result<Option<PublishPrincipal>, PublishProxyError> {
    859         let connection = self
    860             .inner
    861             .lock()
    862             .unwrap_or_else(std::sync::PoisonError::into_inner);
    863         let principal = connection
    864             .query_row(
    865                 r#"
    866                 SELECT
    867                     principal_id,
    868                     label,
    869                     allowed_pubkeys_json,
    870                     allowed_kinds_json,
    871                     allowed_relay_policies_json,
    872                     allow_request_relays,
    873                     job_visibility,
    874                     expires_at_unix
    875                 FROM publish_proxy_principals
    876                 WHERE principal_id = ?1
    877                 "#,
    878                 params![principal_id],
    879                 principal_from_row,
    880             )
    881             .optional()?;
    882         Ok(principal)
    883     }
    884 
    885     pub fn record_publish_job(
    886         &self,
    887         insert: PublishJobInsert,
    888     ) -> Result<PublishEventResponse, PublishProxyError> {
    889         if let Some(idempotency_key) = insert.idempotency_key.as_deref() {
    890             if let Some(existing) =
    891                 self.job_for_principal_id_and_key(insert.principal_id.as_str(), idempotency_key)?
    892             {
    893                 if existing.request_fingerprint != insert.request_fingerprint {
    894                     return Err(PublishProxyError::IdempotencyConflict(
    895                         idempotency_key.to_owned(),
    896                     ));
    897                 }
    898                 return Ok(PublishEventResponse {
    899                     deduplicated: true,
    900                     job: existing.view,
    901                 });
    902             }
    903         }
    904 
    905         let job_id = Uuid::new_v4().to_string();
    906         let now = current_unix_millis();
    907         let request_json = serde_json::to_string(&insert.request)?;
    908         let connection = self
    909             .inner
    910             .lock()
    911             .unwrap_or_else(std::sync::PoisonError::into_inner);
    912         let insert_result = connection.execute(
    913             r#"
    914             INSERT INTO publish_proxy_jobs (
    915                 job_id,
    916                 principal_id,
    917                 idempotency_key,
    918                 request_fingerprint,
    919                 status,
    920                 event_id,
    921                 event_pubkey,
    922                 event_kind,
    923                 relay_policy_json,
    924                 delivery_policy_json,
    925                 requested_relay_count,
    926                 effective_relay_count,
    927                 request_json,
    928                 requested_at_ms,
    929                 updated_at_ms
    930             )
    931             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
    932             "#,
    933             params![
    934                 job_id,
    935                 insert.principal_id,
    936                 insert.idempotency_key,
    937                 insert.request_fingerprint,
    938                 serde_json::to_string(&PublishJobStatus::Publishing)?,
    939                 insert.request.event.id,
    940                 insert.request.event.pubkey,
    941                 insert.request.event.kind,
    942                 serde_json::to_string(&insert.request.relay_policy)?,
    943                 serde_json::to_string(&insert.request.delivery_policy)?,
    944                 insert.request.relays.len(),
    945                 insert.effective_relay_count,
    946                 request_json,
    947                 now,
    948                 now,
    949             ],
    950         );
    951         match insert_result {
    952             Ok(_) => {}
    953             Err(rusqlite::Error::SqliteFailure(error, _))
    954                 if error.code == rusqlite::ErrorCode::ConstraintViolation =>
    955             {
    956                 return Err(PublishProxyError::IdempotencyConflict(
    957                     "idempotency key conflicts with an existing publish job".to_owned(),
    958                 ));
    959             }
    960             Err(error) => return Err(error.into()),
    961         }
    962         drop(connection);
    963         let job = self.job_by_id(job_id.as_str())?;
    964         Ok(PublishEventResponse {
    965             deduplicated: false,
    966             job,
    967         })
    968     }
    969 
    970     pub fn job_by_id_for_principal(
    971         &self,
    972         job_id: &str,
    973         principal: &PublishPrincipal,
    974     ) -> Result<Option<PublishJobView>, PublishProxyError> {
    975         let connection = self
    976             .inner
    977             .lock()
    978             .unwrap_or_else(std::sync::PoisonError::into_inner);
    979         let sql = job_select_sql("WHERE job_id = ?1");
    980         let row = connection
    981             .query_row(sql.as_str(), params![job_id], job_from_row)
    982             .optional()?;
    983         drop(connection);
    984         let Some(mut job) = row else {
    985             return Ok(None);
    986         };
    987         if !principal.can_read_job(job.principal_id.as_str()) {
    988             return Ok(None);
    989         }
    990         job.view.relays = self.relay_outcomes(job.view.job_id.as_str())?;
    991         finalize_job_view(&mut job.view);
    992         Ok(Some(job.view))
    993     }
    994 
    995     pub fn list_jobs_for_principal(
    996         &self,
    997         principal: &PublishPrincipal,
    998         limit: usize,
    999     ) -> Result<Vec<PublishJobView>, PublishProxyError> {
   1000         let limit = i64::try_from(limit.clamp(1, 200)).unwrap_or(200);
   1001         let connection = self
   1002             .inner
   1003             .lock()
   1004             .unwrap_or_else(std::sync::PoisonError::into_inner);
   1005         let sql = if principal.job_visibility == PublishJobVisibility::Admin {
   1006             job_select_sql("ORDER BY requested_at_ms DESC, job_id DESC LIMIT ?1")
   1007         } else {
   1008             job_select_sql(
   1009                 "WHERE principal_id = ?1 ORDER BY requested_at_ms DESC, job_id DESC LIMIT ?2",
   1010             )
   1011         };
   1012         let mut stmt = connection.prepare(sql.as_str())?;
   1013         let rows = if principal.job_visibility == PublishJobVisibility::Admin {
   1014             stmt.query_map(params![limit], job_from_row)?
   1015                 .collect::<Result<Vec<_>, _>>()?
   1016         } else {
   1017             stmt.query_map(params![principal.principal_id, limit], job_from_row)?
   1018                 .collect::<Result<Vec<_>, _>>()?
   1019         };
   1020         drop(stmt);
   1021         drop(connection);
   1022 
   1023         rows.into_iter()
   1024             .map(|mut row| {
   1025                 row.view.relays = self.relay_outcomes(row.view.job_id.as_str())?;
   1026                 finalize_job_view(&mut row.view);
   1027                 Ok(row.view)
   1028             })
   1029             .collect()
   1030     }
   1031 
   1032     fn job_for_principal_id_and_key(
   1033         &self,
   1034         principal_id: &str,
   1035         idempotency_key: &str,
   1036     ) -> Result<Option<PublishJobRow>, PublishProxyError> {
   1037         let connection = self
   1038             .inner
   1039             .lock()
   1040             .unwrap_or_else(std::sync::PoisonError::into_inner);
   1041         let sql = job_select_sql("WHERE principal_id = ?1 AND idempotency_key = ?2");
   1042         let row = connection
   1043             .query_row(
   1044                 sql.as_str(),
   1045                 params![principal_id, idempotency_key],
   1046                 job_from_row,
   1047             )
   1048             .optional()?;
   1049         drop(connection);
   1050         let Some(mut job) = row else {
   1051             return Ok(None);
   1052         };
   1053         job.view.relays = self.relay_outcomes(job.view.job_id.as_str())?;
   1054         finalize_job_view(&mut job.view);
   1055         Ok(Some(job))
   1056     }
   1057 
   1058     pub fn job_by_id(&self, job_id: &str) -> Result<PublishJobView, PublishProxyError> {
   1059         let connection = self
   1060             .inner
   1061             .lock()
   1062             .unwrap_or_else(std::sync::PoisonError::into_inner);
   1063         let sql = job_select_sql("WHERE job_id = ?1");
   1064         let row = connection
   1065             .query_row(sql.as_str(), params![job_id], job_from_row)
   1066             .optional()?;
   1067         drop(connection);
   1068         let Some(mut job) = row else {
   1069             return Err(PublishProxyError::InvalidScope(
   1070                 "unknown publish job".to_owned(),
   1071             ));
   1072         };
   1073         job.view.relays = self.relay_outcomes(job.view.job_id.as_str())?;
   1074         finalize_job_view(&mut job.view);
   1075         Ok(job.view)
   1076     }
   1077 
   1078     pub fn complete_publish_job(
   1079         &self,
   1080         job_id: &str,
   1081         status: PublishJobStatus,
   1082         outcomes: Vec<PublishRelayOutcome>,
   1083         last_error: Option<String>,
   1084     ) -> Result<(), PublishProxyError> {
   1085         let now = current_unix_millis();
   1086         let connection = self
   1087             .inner
   1088             .lock()
   1089             .unwrap_or_else(std::sync::PoisonError::into_inner);
   1090         connection.execute(
   1091             r#"
   1092             UPDATE publish_proxy_jobs
   1093             SET status = ?2,
   1094                 updated_at_ms = ?3,
   1095                 completed_at_ms = ?4,
   1096                 last_error = ?5
   1097             WHERE job_id = ?1
   1098             "#,
   1099             params![
   1100                 job_id,
   1101                 serde_json::to_string(&status)?,
   1102                 now,
   1103                 now,
   1104                 last_error,
   1105             ],
   1106         )?;
   1107         connection.execute(
   1108             "DELETE FROM publish_proxy_relay_results WHERE job_id = ?1",
   1109             params![job_id],
   1110         )?;
   1111         for outcome in outcomes {
   1112             connection.execute(
   1113                 r#"
   1114                 INSERT OR REPLACE INTO publish_proxy_relay_results (
   1115                     job_id,
   1116                     relay_url,
   1117                     source,
   1118                     attempted,
   1119                     outcome_kind,
   1120                     message,
   1121                     latency_ms,
   1122                     updated_at_ms
   1123                 )
   1124                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
   1125                 "#,
   1126                 params![
   1127                     job_id,
   1128                     outcome.relay_url,
   1129                     serde_json::to_string(&outcome.source)?,
   1130                     outcome.attempted,
   1131                     serde_json::to_string(&outcome.outcome_kind)?,
   1132                     outcome.message,
   1133                     outcome
   1134                         .latency_ms
   1135                         .and_then(|value| i64::try_from(value).ok()),
   1136                     now,
   1137                 ],
   1138             )?;
   1139         }
   1140         Ok(())
   1141     }
   1142 
   1143     pub fn cached_author_write_relays(
   1144         &self,
   1145         pubkey: &str,
   1146     ) -> Result<Vec<String>, PublishProxyError> {
   1147         let connection = self
   1148             .inner
   1149             .lock()
   1150             .unwrap_or_else(std::sync::PoisonError::into_inner);
   1151         let relays_json = connection
   1152             .query_row(
   1153                 "SELECT relays_json FROM publish_proxy_relay_list_cache WHERE pubkey = ?1",
   1154                 params![pubkey],
   1155                 |row| row.get::<_, String>(0),
   1156             )
   1157             .optional()?;
   1158         relays_json
   1159             .map(|value| serde_json::from_str(value.as_str()).map_err(PublishProxyError::from))
   1160             .unwrap_or_else(|| Ok(Vec::new()))
   1161     }
   1162 
   1163     pub fn cache_author_write_relays(
   1164         &self,
   1165         pubkey: &str,
   1166         relays: &[String],
   1167     ) -> Result<(), PublishProxyError> {
   1168         let now = current_unix_millis();
   1169         let connection = self
   1170             .inner
   1171             .lock()
   1172             .unwrap_or_else(std::sync::PoisonError::into_inner);
   1173         connection.execute(
   1174             r#"
   1175             INSERT INTO publish_proxy_relay_list_cache (pubkey, relays_json, updated_at_ms)
   1176             VALUES (?1, ?2, ?3)
   1177             ON CONFLICT(pubkey) DO UPDATE SET
   1178                 relays_json = excluded.relays_json,
   1179                 updated_at_ms = excluded.updated_at_ms
   1180             "#,
   1181             params![pubkey, serde_json::to_string(relays)?, now],
   1182         )?;
   1183         Ok(())
   1184     }
   1185 
   1186     fn relay_outcomes(&self, job_id: &str) -> Result<Vec<PublishRelayOutcome>, PublishProxyError> {
   1187         let connection = self
   1188             .inner
   1189             .lock()
   1190             .unwrap_or_else(std::sync::PoisonError::into_inner);
   1191         let mut stmt = connection.prepare(
   1192             r#"
   1193             SELECT relay_url, source, attempted, outcome_kind, message, latency_ms
   1194             FROM publish_proxy_relay_results
   1195             WHERE job_id = ?1
   1196             ORDER BY relay_url
   1197             "#,
   1198         )?;
   1199         let outcomes = stmt
   1200             .query_map(params![job_id], relay_outcome_from_row)?
   1201             .collect::<Result<Vec<_>, _>>()?;
   1202         Ok(outcomes)
   1203     }
   1204 }
   1205 
/// A job row as decoded by `job_from_row`: the caller-facing view plus the
/// columns the store consults internally but does not expose in the view.
struct PublishJobRow {
    /// Value of the `principal_id` column; used for the read-permission check.
    principal_id: String,
    /// Value of the `request_fingerprint` column; compared against incoming
    /// requests that reuse an idempotency key.
    request_fingerprint: String,
    /// View built from the remaining selected columns. Relay outcomes and
    /// derived counters are filled in after the row is loaded.
    view: PublishJobView,
}
   1211 
   1212 fn migrate_schema(connection: &Connection) -> Result<(), PublishProxyError> {
   1213     let version: i64 = connection.pragma_query_value(None, "user_version", |row| row.get(0))?;
   1214     if version < 2 {
   1215         if !table_has_column(connection, "publish_proxy_jobs", "request_fingerprint")? {
   1216             connection.execute(
   1217                 "ALTER TABLE publish_proxy_jobs ADD COLUMN request_fingerprint TEXT NOT NULL DEFAULT ''",
   1218                 [],
   1219             )?;
   1220         }
   1221         if !table_has_column(connection, "publish_proxy_jobs", "effective_relay_count")? {
   1222             connection.execute(
   1223                 "ALTER TABLE publish_proxy_jobs ADD COLUMN effective_relay_count INTEGER NOT NULL DEFAULT 0",
   1224                 [],
   1225             )?;
   1226             connection.execute(
   1227                 "UPDATE publish_proxy_jobs SET effective_relay_count = requested_relay_count WHERE effective_relay_count = 0",
   1228                 [],
   1229             )?;
   1230         }
   1231     }
   1232     Ok(())
   1233 }
   1234 
   1235 fn recover_interrupted_publish_jobs(connection: &Connection) -> Result<(), PublishProxyError> {
   1236     let now = current_unix_millis();
   1237     connection.execute(
   1238         r#"
   1239         UPDATE publish_proxy_jobs
   1240         SET status = ?1,
   1241             updated_at_ms = ?2,
   1242             completed_at_ms = ?3,
   1243             last_error = ?4
   1244         WHERE status = ?5
   1245         "#,
   1246         params![
   1247             serde_json::to_string(&PublishJobStatus::DeliveryUnsatisfiedRetryable)?,
   1248             now,
   1249             now,
   1250             "publish_attempt_interrupted",
   1251             serde_json::to_string(&PublishJobStatus::Publishing)?,
   1252         ],
   1253     )?;
   1254     Ok(())
   1255 }
   1256 
   1257 fn table_has_column(
   1258     connection: &Connection,
   1259     table: &str,
   1260     column: &str,
   1261 ) -> Result<bool, PublishProxyError> {
   1262     let mut stmt = connection.prepare(format!("PRAGMA table_info({table})").as_str())?;
   1263     let columns = stmt
   1264         .query_map([], |row| row.get::<_, String>(1))?
   1265         .collect::<Result<Vec<_>, _>>()?;
   1266     Ok(columns.iter().any(|existing| existing == column))
   1267 }
   1268 
   1269 fn job_select_sql(tail: &str) -> String {
   1270     format!(
   1271         r#"
   1272         SELECT
   1273             job_id,
   1274             principal_id,
   1275             request_fingerprint,
   1276             status,
   1277             event_id,
   1278             event_pubkey,
   1279             event_kind,
   1280             relay_policy_json,
   1281             delivery_policy_json,
   1282             effective_relay_count,
   1283             requested_at_ms,
   1284             completed_at_ms,
   1285             last_error
   1286         FROM publish_proxy_jobs
   1287         {tail}
   1288         "#
   1289     )
   1290 }
   1291 
   1292 fn principal_from_row(row: &Row<'_>) -> Result<PublishPrincipal, rusqlite::Error> {
   1293     let visibility: String = row.get(6)?;
   1294     Ok(PublishPrincipal {
   1295         principal_id: row.get(0)?,
   1296         label: row.get(1)?,
   1297         allowed_pubkeys: json_column(row, 2)?,
   1298         allowed_kinds: json_column(row, 3)?,
   1299         allowed_relay_policies: json_column(row, 4)?,
   1300         allow_request_relays: row.get(5)?,
   1301         job_visibility: PublishJobVisibility::from_str(visibility.as_str())
   1302             .map_err(|error| conversion_error(6, error))?,
   1303         expires_at_unix: row.get(7)?,
   1304     })
   1305 }
   1306 
   1307 fn job_from_row(row: &Row<'_>) -> Result<PublishJobRow, rusqlite::Error> {
   1308     let status: PublishJobStatus = json_text(row, 3)?;
   1309     let relay_policy: PublishRelayPolicy = json_text(row, 7)?;
   1310     let delivery_policy: PublishDeliveryPolicy = json_text(row, 8)?;
   1311     let relay_count: i64 = row.get(9)?;
   1312     Ok(PublishJobRow {
   1313         principal_id: row.get(1)?,
   1314         request_fingerprint: row.get(2)?,
   1315         view: PublishJobView {
   1316             job_id: row.get(0)?,
   1317             status,
   1318             terminal: false,
   1319             delivery_satisfied: false,
   1320             event_id: row.get(4)?,
   1321             pubkey: row.get(5)?,
   1322             event_kind: row.get::<_, i64>(6)? as u32,
   1323             relay_policy,
   1324             delivery_policy,
   1325             relay_count: usize::try_from(relay_count).unwrap_or(0),
   1326             acknowledged_count: 0,
   1327             retryable_count: 0,
   1328             terminal_count: 0,
   1329             requested_at_ms: row.get(10)?,
   1330             completed_at_ms: row.get(11)?,
   1331             last_error: row.get(12)?,
   1332             relays: Vec::new(),
   1333         },
   1334     })
   1335 }
   1336 
   1337 fn relay_outcome_from_row(row: &Row<'_>) -> Result<PublishRelayOutcome, rusqlite::Error> {
   1338     let source: PublishRelaySource = json_text(row, 1)?;
   1339     let outcome_kind: PublishRelayOutcomeKind = json_text(row, 3)?;
   1340     Ok(PublishRelayOutcome {
   1341         relay_url: row.get(0)?,
   1342         source,
   1343         attempted: row.get(2)?,
   1344         outcome_kind,
   1345         message: row.get(4)?,
   1346         latency_ms: row
   1347             .get::<_, Option<i64>>(5)?
   1348             .map(|latency| u64::try_from(latency).unwrap_or(0)),
   1349     })
   1350 }
   1351 
   1352 fn finalize_job_view(view: &mut PublishJobView) {
   1353     view.acknowledged_count = view
   1354         .relays
   1355         .iter()
   1356         .filter(|relay| relay.outcome_kind.counts_toward_quorum())
   1357         .count();
   1358     view.retryable_count = view
   1359         .relays
   1360         .iter()
   1361         .filter(|relay| relay.outcome_kind.is_retryable())
   1362         .count();
   1363     view.terminal_count = view
   1364         .relays
   1365         .iter()
   1366         .filter(|relay| relay.outcome_kind.is_terminal_failure())
   1367         .count();
   1368     view.terminal = matches!(
   1369         view.status,
   1370         PublishJobStatus::DeliverySatisfied
   1371             | PublishJobStatus::DeliveryUnsatisfiedTerminal
   1372             | PublishJobStatus::Rejected
   1373     );
   1374     view.delivery_satisfied = view.status == PublishJobStatus::DeliverySatisfied;
   1375 }
   1376 
   1377 fn validate_principal_init(input: &PublishPrincipalInit) -> Result<(), PublishProxyError> {
   1378     if input.label.trim().is_empty() {
   1379         return Err(PublishProxyError::InvalidScope(
   1380             "principal label must not be empty".to_owned(),
   1381         ));
   1382     }
   1383     if !input.token_hash.starts_with(TOKEN_HASH_PREFIX) {
   1384         return Err(PublishProxyError::InvalidScope(
   1385             "principal token hash must use sha256 prefix".to_owned(),
   1386         ));
   1387     }
   1388     if input.allowed_pubkeys.is_empty() {
   1389         return Err(PublishProxyError::InvalidScope(
   1390             "principal must include at least one allowed pubkey".to_owned(),
   1391         ));
   1392     }
   1393     for pubkey in &input.allowed_pubkeys {
   1394         ensure_lower_hex("allowed_pubkey", pubkey, 64)?;
   1395     }
   1396     if input.allowed_kinds.is_empty() {
   1397         return Err(PublishProxyError::InvalidScope(
   1398             "principal must include at least one allowed kind".to_owned(),
   1399         ));
   1400     }
   1401     if input
   1402         .allowed_kinds
   1403         .iter()
   1404         .any(|kind| *kind > u16::MAX as u32)
   1405     {
   1406         return Err(PublishProxyError::InvalidScope(
   1407             "allowed kind exceeds publish proxy range".to_owned(),
   1408         ));
   1409     }
   1410     if input.allowed_relay_policies.is_empty() {
   1411         return Err(PublishProxyError::InvalidScope(
   1412             "principal must include at least one allowed relay policy".to_owned(),
   1413         ));
   1414     }
   1415     Ok(())
   1416 }
   1417 
   1418 pub fn generate_bearer_token() -> String {
   1419     let bytes: [u8; 32] = rand::random();
   1420     format!("{TOKEN_PREFIX}{}", hex_lower(&bytes))
   1421 }
   1422 
   1423 pub fn hash_bearer_token(token: &str) -> String {
   1424     let mut hasher = Sha256::new();
   1425     hasher.update(token.as_bytes());
   1426     format!("{TOKEN_HASH_PREFIX}{}", hex_lower(&hasher.finalize()))
   1427 }
   1428 
   1429 fn hex_lower(bytes: &[u8]) -> String {
   1430     let mut output = String::with_capacity(bytes.len() * 2);
   1431     for byte in bytes {
   1432         use std::fmt::Write;
   1433         let _ = write!(&mut output, "{byte:02x}");
   1434     }
   1435     output
   1436 }
   1437 
   1438 pub fn parse_relay_policy(value: &str) -> Result<PublishRelayPolicy, PublishProxyError> {
   1439     match value {
   1440         "explicit_only" => Ok(PublishRelayPolicy::ExplicitOnly),
   1441         "request_then_author_write_then_daemon_default" => {
   1442             Ok(PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault)
   1443         }
   1444         "author_write_then_daemon_default" => Ok(PublishRelayPolicy::AuthorWriteThenDaemonDefault),
   1445         "daemon_default_only" => Ok(PublishRelayPolicy::DaemonDefaultOnly),
   1446         other => Err(PublishProxyError::InvalidScope(format!(
   1447             "unknown relay policy `{other}`"
   1448         ))),
   1449     }
   1450 }
   1451 
   1452 fn signed_event_from_wire(
   1453     event: &SignedNostrEventWire,
   1454 ) -> Result<RadrootsSignedNostrEvent, PublishProxyError> {
   1455     event
   1456         .validate()
   1457         .map_err(|error| PublishProxyError::InvalidSignedEvent(error.to_string()))?;
   1458     let created_at = u32::try_from(event.created_at).map_err(|_| {
   1459         PublishProxyError::InvalidSignedEvent(
   1460             "signed event created_at exceeds daemon-supported range".to_owned(),
   1461         )
   1462     })?;
   1463     let raw_json = serde_json::to_string(event)?;
   1464     let radroots_event = RadrootsNostrEvent {
   1465         id: event.id.clone(),
   1466         author: event.pubkey.clone(),
   1467         created_at,
   1468         kind: event.kind,
   1469         tags: event.tags.clone(),
   1470         content: event.content.clone(),
   1471         sig: event.sig.clone(),
   1472     };
   1473     match radroots_nostr_verify_event(&radroots_event) {
   1474         RadrootsNostrEventVerification::Verified => {}
   1475         verification => return Err(PublishProxyError::SignedEventVerification(verification)),
   1476     }
   1477     RadrootsSignedNostrEvent::new(RadrootsSignedNostrEventParts {
   1478         id: event.id.clone(),
   1479         pubkey: event.pubkey.clone(),
   1480         created_at,
   1481         kind: event.kind,
   1482         tags: event.tags.clone(),
   1483         content: event.content.clone(),
   1484         sig: event.sig.clone(),
   1485         raw_json,
   1486     })
   1487     .map_err(PublishProxyError::from)
   1488 }
   1489 
   1490 fn request_intent_fingerprint(
   1491     principal_id: &str,
   1492     canonical_event_json: &str,
   1493     request: &PublishEventRequest,
   1494     effective_timeout_ms: u64,
   1495 ) -> Result<String, PublishProxyError> {
   1496     #[derive(Serialize)]
   1497     struct FingerprintInput<'a> {
   1498         principal_id: &'a str,
   1499         canonical_event_json: &'a str,
   1500         relays: Vec<String>,
   1501         relay_policy: &'a PublishRelayPolicy,
   1502         delivery_policy: &'a PublishDeliveryPolicy,
   1503         effective_timeout_ms: u64,
   1504     }
   1505 
   1506     let input = FingerprintInput {
   1507         principal_id,
   1508         canonical_event_json,
   1509         relays: request
   1510             .relays
   1511             .iter()
   1512             .map(|relay| relay.trim().to_owned())
   1513             .collect(),
   1514         relay_policy: &request.relay_policy,
   1515         delivery_policy: &request.delivery_policy,
   1516         effective_timeout_ms,
   1517     };
   1518     let bytes = serde_json::to_vec(&input)?;
   1519     let mut hasher = Sha256::new();
   1520     hasher.update(bytes);
   1521     Ok(hex_lower(&hasher.finalize()))
   1522 }
   1523 
   1524 fn effective_publish_timeout_ms(
   1525     config: &PublishProxyConfig,
   1526     timeout_ms: Option<u64>,
   1527 ) -> Result<u64, PublishProxyError> {
   1528     let max_timeout_ms = config.connect_timeout_secs.saturating_mul(1_000);
   1529     match timeout_ms {
   1530         Some(0) => Err(PublishProxyError::InvalidSignedEvent(
   1531             "timeout_ms must be greater than zero".to_owned(),
   1532         )),
   1533         Some(timeout_ms) if timeout_ms > max_timeout_ms => {
   1534             Err(PublishProxyError::InvalidSignedEvent(format!(
   1535                 "timeout_ms must be at most {max_timeout_ms}"
   1536             )))
   1537         }
   1538         Some(timeout_ms) => Ok(timeout_ms),
   1539         None => Ok(max_timeout_ms),
   1540     }
   1541 }
   1542 
   1543 fn push_resolved_relay(
   1544     targets: &mut Vec<ResolvedPublishRelay>,
   1545     url: RadrootsRelayUrl,
   1546     source: PublishRelaySource,
   1547 ) {
   1548     if !targets.iter().any(|target| target.url == url) {
   1549         targets.push(ResolvedPublishRelay { url, source });
   1550     }
   1551 }
   1552 
   1553 fn relay_resolution_connection_failure(
   1554     relay_url: impl Into<String>,
   1555     source: PublishRelaySource,
   1556     message: impl Into<String>,
   1557 ) -> PublishRelayOutcome {
   1558     PublishRelayOutcome {
   1559         relay_url: relay_url.into(),
   1560         source,
   1561         attempted: false,
   1562         outcome_kind: PublishRelayOutcomeKind::ConnectionFailed,
   1563         message: Some(message.into()),
   1564         latency_ms: None,
   1565     }
   1566 }
   1567 
   1568 fn relay_socket_target(url: &RadrootsRelayUrl) -> Result<(String, u16), std::io::Error> {
   1569     let parsed = url::Url::parse(url.as_str())
   1570         .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
   1571     let host = parsed
   1572         .host_str()
   1573         .filter(|host| !host.is_empty())
   1574         .ok_or_else(|| {
   1575             std::io::Error::new(
   1576                 std::io::ErrorKind::InvalidInput,
   1577                 "relay URL must include a DNS host",
   1578             )
   1579         })?
   1580         .to_owned();
   1581     let port = parsed.port_or_known_default().ok_or_else(|| {
   1582         std::io::Error::new(
   1583             std::io::ErrorKind::InvalidInput,
   1584             "relay URL scheme must have a default port",
   1585         )
   1586     })?;
   1587     Ok((host, port))
   1588 }
   1589 
   1590 fn relay_url_policy(config: &PublishProxyConfig) -> RadrootsRelayUrlPolicy {
   1591     match config.relay_url_policy {
   1592         crate::app::config::PublishProxyRelayUrlPolicy::Public => RadrootsRelayUrlPolicy::Public,
   1593         crate::app::config::PublishProxyRelayUrlPolicy::Localhost => {
   1594             RadrootsRelayUrlPolicy::Localhost
   1595         }
   1596     }
   1597 }
   1598 
   1599 fn author_write_relays_from_nip65_event(
   1600     event: &radroots_nostr::prelude::RadrootsNostrEvent,
   1601 ) -> Vec<String> {
   1602     event
   1603         .tags
   1604         .iter()
   1605         .filter_map(|tag| {
   1606             let values = tag.as_slice();
   1607             if values.first().map(String::as_str) != Some("r") {
   1608                 return None;
   1609             }
   1610             let relay = values.get(1)?.trim();
   1611             if relay.is_empty() {
   1612                 return None;
   1613             }
   1614             if values.get(2).map(String::as_str) == Some("read") {
   1615                 return None;
   1616             }
   1617             Some(relay.to_owned())
   1618         })
   1619         .collect()
   1620 }
   1621 
   1622 fn publish_outcome_from_receipt(
   1623     receipt: RadrootsRelayPublishRelayReceipt,
   1624     source_by_relay: &BTreeMap<String, PublishRelaySource>,
   1625     latency_ms: Option<u64>,
   1626 ) -> PublishRelayOutcome {
   1627     let source = source_by_relay
   1628         .get(receipt.relay_url.as_str())
   1629         .copied()
   1630         .unwrap_or(PublishRelaySource::DaemonDefault);
   1631     PublishRelayOutcome {
   1632         relay_url: receipt.relay_url,
   1633         source,
   1634         attempted: receipt.attempted,
   1635         outcome_kind: publish_outcome_kind(receipt.outcome.kind),
   1636         message: receipt.outcome.message,
   1637         latency_ms,
   1638     }
   1639 }
   1640 
   1641 fn publish_outcome_kind(kind: RadrootsRelayOutcomeKind) -> PublishRelayOutcomeKind {
   1642     match kind {
   1643         RadrootsRelayOutcomeKind::Accepted => PublishRelayOutcomeKind::Accepted,
   1644         RadrootsRelayOutcomeKind::DuplicateAccepted => PublishRelayOutcomeKind::DuplicateAccepted,
   1645         RadrootsRelayOutcomeKind::Blocked => PublishRelayOutcomeKind::Blocked,
   1646         RadrootsRelayOutcomeKind::RateLimited => PublishRelayOutcomeKind::RateLimited,
   1647         RadrootsRelayOutcomeKind::Invalid => PublishRelayOutcomeKind::Invalid,
   1648         RadrootsRelayOutcomeKind::PowRequired => PublishRelayOutcomeKind::PowRequired,
   1649         RadrootsRelayOutcomeKind::Restricted => PublishRelayOutcomeKind::Restricted,
   1650         RadrootsRelayOutcomeKind::AuthRequired => PublishRelayOutcomeKind::AuthRequired,
   1651         RadrootsRelayOutcomeKind::Muted => PublishRelayOutcomeKind::Muted,
   1652         RadrootsRelayOutcomeKind::Unsupported => PublishRelayOutcomeKind::Unsupported,
   1653         RadrootsRelayOutcomeKind::PaymentRequired => PublishRelayOutcomeKind::PaymentRequired,
   1654         RadrootsRelayOutcomeKind::Error => PublishRelayOutcomeKind::Error,
   1655         RadrootsRelayOutcomeKind::Timeout => PublishRelayOutcomeKind::Timeout,
   1656         RadrootsRelayOutcomeKind::ConnectionFailed => PublishRelayOutcomeKind::ConnectionFailed,
   1657         RadrootsRelayOutcomeKind::RelayUrlRejected => PublishRelayOutcomeKind::RelayUrlRejected,
   1658         RadrootsRelayOutcomeKind::SkippedAlreadyAccepted => {
   1659             PublishRelayOutcomeKind::SkippedAlreadyAccepted
   1660         }
   1661         RadrootsRelayOutcomeKind::Unknown => PublishRelayOutcomeKind::Unknown,
   1662     }
   1663 }
   1664 
   1665 fn delivery_status(
   1666     delivery_policy: &PublishDeliveryPolicy,
   1667     relay_count: usize,
   1668     outcomes: &[PublishRelayOutcome],
   1669 ) -> PublishJobStatus {
   1670     let required = delivery_policy.required_ack_count(relay_count);
   1671     let acknowledged = outcomes
   1672         .iter()
   1673         .filter(|outcome| outcome.outcome_kind.counts_toward_quorum())
   1674         .count();
   1675     if acknowledged >= required {
   1676         return PublishJobStatus::DeliverySatisfied;
   1677     }
   1678     if outcomes
   1679         .iter()
   1680         .any(|outcome| outcome.outcome_kind.is_retryable())
   1681     {
   1682         PublishJobStatus::DeliveryUnsatisfiedRetryable
   1683     } else {
   1684         PublishJobStatus::DeliveryUnsatisfiedTerminal
   1685     }
   1686 }
   1687 
   1688 fn timeout_receipts(targets: &[ResolvedPublishRelay]) -> Vec<RadrootsRelayPublishRelayReceipt> {
   1689     targets
   1690         .iter()
   1691         .map(|target| {
   1692             RadrootsRelayPublishRelayReceipt::attempted(
   1693                 target.url.as_str(),
   1694                 RadrootsRelayOutcome::timeout("timeout: publish attempt exceeded daemon bound"),
   1695             )
   1696         })
   1697         .collect()
   1698 }
   1699 
   1700 fn transport_error_receipts(
   1701     targets: &[ResolvedPublishRelay],
   1702     error: PublishProxyError,
   1703 ) -> Vec<RadrootsRelayPublishRelayReceipt> {
   1704     let message = format!("error: {error}");
   1705     targets
   1706         .iter()
   1707         .map(|target| {
   1708             RadrootsRelayPublishRelayReceipt::attempted(
   1709                 target.url.as_str(),
   1710                 RadrootsRelayOutcome::connection_failed(message.clone()),
   1711             )
   1712         })
   1713         .collect()
   1714 }
   1715 
   1716 pub fn write_token_file(path: &Path, token: &str) -> Result<(), PublishProxyError> {
   1717     if let Some(parent) = path
   1718         .parent()
   1719         .filter(|parent| !parent.as_os_str().is_empty())
   1720     {
   1721         std::fs::create_dir_all(parent)?;
   1722     }
   1723     let mut options = std::fs::OpenOptions::new();
   1724     options.write(true).create_new(true);
   1725     #[cfg(unix)]
   1726     {
   1727         use std::os::unix::fs::OpenOptionsExt;
   1728         options.mode(0o600);
   1729     }
   1730     use std::io::Write;
   1731     let mut file = options.open(path)?;
   1732     file.write_all(token.as_bytes())?;
   1733     file.write_all(b"\n")?;
   1734     Ok(())
   1735 }
   1736 
   1737 fn ensure_lower_hex(
   1738     field: &str,
   1739     value: &str,
   1740     expected_len: usize,
   1741 ) -> Result<(), PublishProxyError> {
   1742     if value.len() == expected_len
   1743         && value
   1744             .bytes()
   1745             .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f'))
   1746     {
   1747         Ok(())
   1748     } else {
   1749         Err(PublishProxyError::InvalidScope(format!(
   1750             "{field} must be {expected_len} lowercase hex characters"
   1751         )))
   1752     }
   1753 }
   1754 
   1755 fn json_column<T: for<'de> Deserialize<'de>>(
   1756     row: &Row<'_>,
   1757     index: usize,
   1758 ) -> Result<T, rusqlite::Error> {
   1759     let value: String = row.get(index)?;
   1760     serde_json::from_str(value.as_str()).map_err(|error| conversion_error(index, error))
   1761 }
   1762 
   1763 fn json_text<T: for<'de> Deserialize<'de>>(
   1764     row: &Row<'_>,
   1765     index: usize,
   1766 ) -> Result<T, rusqlite::Error> {
   1767     let value: String = row.get(index)?;
   1768     serde_json::from_str(value.as_str()).map_err(|error| conversion_error(index, error))
   1769 }
   1770 
   1771 fn conversion_error<E>(index: usize, error: E) -> rusqlite::Error
   1772 where
   1773     E: std::error::Error + Send + Sync + 'static,
   1774 {
   1775     rusqlite::Error::FromSqlConversionFailure(index, Type::Text, Box::new(error))
   1776 }
   1777 
   1778 fn current_unix_secs() -> i64 {
   1779     SystemTime::now()
   1780         .duration_since(UNIX_EPOCH)
   1781         .map(|duration| duration.as_secs() as i64)
   1782         .unwrap_or_default()
   1783 }
   1784 
   1785 fn current_unix_millis() -> i64 {
   1786     SystemTime::now()
   1787         .duration_since(UNIX_EPOCH)
   1788         .map(|duration| duration.as_millis() as i64)
   1789         .unwrap_or_default()
   1790 }
   1791 
   1792 #[cfg(test)]
   1793 mod tests {
   1794     use super::{
   1795         PublishJobInsert, PublishJobVisibility, PublishPrincipal, PublishPrincipalInit,
   1796         PublishProxy, PublishProxyError, PublishProxyStore, generate_bearer_token,
   1797         hash_bearer_token, parse_relay_policy,
   1798     };
   1799     use crate::app::config::{PublishProxyConfig, PublishProxyRelayUrlPolicy};
   1800     use nostr::JsonUtil;
   1801     use radroots_identity::RadrootsIdentity;
   1802     use radroots_nostr::prelude::{
   1803         RadrootsNostrEventVerification, RadrootsNostrTimestamp, radroots_nostr_build_event,
   1804     };
   1805     use radroots_publish_proxy_protocol::{
   1806         PublishDeliveryPolicy, PublishEventRequest, PublishJobStatus, PublishRelayOutcomeKind,
   1807         PublishRelayPolicy, PublishRelaySource, SignedNostrEventWire,
   1808     };
   1809     use radroots_relay_transport::{RadrootsMockRelayPublishAdapter, RadrootsRelayOutcome};
   1810     use std::collections::BTreeMap;
   1811     use std::net::{IpAddr, Ipv4Addr};
   1812     use std::sync::Arc;
   1813 
   1814     const RELAY_PRIMARY: &str = "wss://relay.example.com";
   1815     const RELAY_SECONDARY: &str = "wss://relay-2.example.com";
   1816     const RELAY_FORBIDDEN: &str = "wss://forbidden-relay.example.com";
   1817 
   1818     fn event(pubkey: &str, kind: u32) -> SignedNostrEventWire {
   1819         SignedNostrEventWire {
   1820             id: "0".repeat(64),
   1821             pubkey: pubkey.to_owned(),
   1822             created_at: 1_700_000_000,
   1823             kind,
   1824             tags: vec![vec!["d".to_owned(), "listing-1".to_owned()]],
   1825             content: "{}".to_owned(),
   1826             sig: "1".repeat(128),
   1827         }
   1828     }
   1829 
   1830     fn request(pubkey: &str, kind: u32) -> PublishEventRequest {
   1831         PublishEventRequest {
   1832             event: event(pubkey, kind),
   1833             relays: Vec::new(),
   1834             relay_policy: PublishRelayPolicy::DaemonDefaultOnly,
   1835             delivery_policy: PublishDeliveryPolicy::Any,
   1836             idempotency_key: Some("idem-1".to_owned()),
   1837             timeout_ms: None,
   1838         }
   1839     }
   1840 
   1841     fn signed_event(identity: &RadrootsIdentity, content: &str) -> SignedNostrEventWire {
   1842         let event = radroots_nostr_build_event(
   1843             30_402,
   1844             content,
   1845             vec![vec!["d".to_owned(), "listing-1".to_owned()]],
   1846         )
   1847         .expect("event builder")
   1848         .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_000))
   1849         .sign_with_keys(identity.keys())
   1850         .expect("signed event");
   1851         serde_json::from_str(event.as_json().as_str()).expect("event wire")
   1852     }
   1853 
   1854     fn publish_request(
   1855         event: SignedNostrEventWire,
   1856         relays: Vec<String>,
   1857         relay_policy: PublishRelayPolicy,
   1858         delivery_policy: PublishDeliveryPolicy,
   1859         idempotency_key: Option<&str>,
   1860     ) -> PublishEventRequest {
   1861         PublishEventRequest {
   1862             event,
   1863             relays,
   1864             relay_policy,
   1865             delivery_policy,
   1866             idempotency_key: idempotency_key.map(str::to_owned),
   1867             timeout_ms: Some(5_000),
   1868         }
   1869     }
   1870 
   1871     fn publish_proxy(
   1872         config: PublishProxyConfig,
   1873     ) -> (PublishProxy, RadrootsMockRelayPublishAdapter) {
   1874         publish_proxy_with_resolver(config, Arc::new(StaticPublishRelayResolver::new()))
   1875     }
   1876 
   1877     fn publish_proxy_with_resolver(
   1878         config: PublishProxyConfig,
   1879         resolver: Arc<dyn super::PublishRelayResolver>,
   1880     ) -> (PublishProxy, RadrootsMockRelayPublishAdapter) {
   1881         let adapter = RadrootsMockRelayPublishAdapter::new();
   1882         let proxy = PublishProxy::memory(config)
   1883             .expect("proxy")
   1884             .with_relay_resolver(resolver)
   1885             .with_publisher(Arc::new(adapter.clone()));
   1886         (proxy, adapter)
   1887     }
   1888 
   1889     fn principal(
   1890         proxy: &PublishProxy,
   1891         pubkey: String,
   1892         policies: Vec<PublishRelayPolicy>,
   1893         allow_request_relays: bool,
   1894         visibility: PublishJobVisibility,
   1895     ) -> PublishPrincipal {
   1896         proxy
   1897             .store
   1898             .create_principal(PublishPrincipalInit {
   1899                 label: "tester".to_owned(),
   1900                 token_hash: hash_bearer_token(generate_bearer_token().as_str()),
   1901                 allowed_pubkeys: vec![pubkey],
   1902                 allowed_kinds: vec![30_402],
   1903                 allowed_relay_policies: policies,
   1904                 allow_request_relays,
   1905                 job_visibility: visibility,
   1906                 expires_at_unix: None,
   1907             })
   1908             .expect("principal")
   1909     }
   1910 
   1911     fn config_with_defaults(relays: Vec<&str>) -> PublishProxyConfig {
   1912         PublishProxyConfig {
   1913             daemon_default_publish_relays: relays.into_iter().map(str::to_owned).collect(),
   1914             ..PublishProxyConfig::default()
   1915         }
   1916     }
   1917 
   1918     #[derive(Default)]
   1919     struct StaticPublishRelayResolver {
   1920         results: BTreeMap<String, Result<Vec<IpAddr>, String>>,
   1921     }
   1922 
   1923     impl StaticPublishRelayResolver {
   1924         fn new() -> Self {
   1925             Self::default()
   1926         }
   1927 
   1928         fn with_addresses(mut self, url: &str, addresses: Vec<IpAddr>) -> Self {
   1929             self.results.insert(url.to_owned(), Ok(addresses));
   1930             self
   1931         }
   1932 
   1933         fn with_failure(mut self, url: &str, error: &str) -> Self {
   1934             self.results.insert(url.to_owned(), Err(error.to_owned()));
   1935             self
   1936         }
   1937     }
   1938 
   1939     impl super::PublishRelayResolver for StaticPublishRelayResolver {
   1940         fn resolve<'a>(
   1941             &'a self,
   1942             url: &'a radroots_relay_transport::RadrootsRelayUrl,
   1943         ) -> super::PublishRelayResolveFuture<'a> {
   1944             Box::pin(async move {
   1945                 match self.results.get(url.as_str()) {
   1946                     Some(Ok(addresses)) => Ok(addresses.clone()),
   1947                     Some(Err(error)) => Err(std::io::Error::other(error.clone())),
   1948                     None => Ok(vec![IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34))]),
   1949                 }
   1950             })
   1951         }
   1952     }
   1953 
   1954     struct StaticPublishAuthorRelayDiscovery {
   1955         relays: Vec<String>,
   1956     }
   1957 
   1958     impl StaticPublishAuthorRelayDiscovery {
   1959         fn new(relays: Vec<&str>) -> Self {
   1960             Self {
   1961                 relays: relays.into_iter().map(str::to_owned).collect(),
   1962             }
   1963         }
   1964     }
   1965 
   1966     impl super::PublishAuthorRelayDiscovery for StaticPublishAuthorRelayDiscovery {
   1967         fn fetch_author_write_relays<'a>(
   1968             &'a self,
   1969             _pubkey: &'a str,
   1970             _discovery_targets: Vec<super::ResolvedPublishRelay>,
   1971             _connect_timeout_secs: u64,
   1972         ) -> super::PublishAuthorRelayDiscoveryFuture<'a> {
   1973             let relays = self.relays.clone();
   1974             Box::pin(async move { Ok(relays) })
   1975         }
   1976     }
   1977 
   1978     #[test]
   1979     fn token_generation_and_hashing_do_not_store_plaintext() {
   1980         let token = generate_bearer_token();
   1981         assert!(token.starts_with("rrd_pp_"));
   1982         let hash = hash_bearer_token(token.as_str());
   1983         assert!(hash.starts_with("sha256:"));
   1984         assert!(!hash.contains(token.as_str()));
   1985     }
   1986 
   1987     #[test]
   1988     fn relay_policy_parser_accepts_contract_values() {
   1989         assert_eq!(
   1990             parse_relay_policy("explicit_only").expect("policy"),
   1991             PublishRelayPolicy::ExplicitOnly
   1992         );
   1993         assert!(parse_relay_policy("unknown").is_err());
   1994     }
   1995 
   1996     #[test]
   1997     fn storage_authenticates_hashed_tokens_and_scopes_jobs() {
   1998         let store = PublishProxyStore::memory().expect("store");
   1999         let token = generate_bearer_token();
   2000         let token_hash = hash_bearer_token(token.as_str());
   2001         let principal = store
   2002             .create_principal(PublishPrincipalInit {
   2003                 label: "tester".to_owned(),
   2004                 token_hash: token_hash.clone(),
   2005                 allowed_pubkeys: vec!["a".repeat(64)],
   2006                 allowed_kinds: vec![30_402],
   2007                 allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly],
   2008                 allow_request_relays: false,
   2009                 job_visibility: PublishJobVisibility::Own,
   2010                 expires_at_unix: None,
   2011             })
   2012             .expect("principal");
   2013         assert_eq!(
   2014             store
   2015                 .principal_for_token_hash(token_hash.as_str())
   2016                 .expect("lookup")
   2017                 .expect("principal")
   2018                 .principal_id,
   2019             principal.principal_id
   2020         );
   2021         let denied = request("b".repeat(64).as_str(), 30_402);
   2022         assert!(principal.allows_event(&denied).is_err());
   2023 
   2024         let accepted = request("a".repeat(64).as_str(), 30_402);
   2025         principal.allows_event(&accepted).expect("scope");
   2026         let response = store
   2027             .record_publish_job(PublishJobInsert {
   2028                 principal_id: principal.principal_id.clone(),
   2029                 idempotency_key: Some("idem-1".to_owned()),
   2030                 request: accepted.clone(),
   2031                 request_fingerprint: "fingerprint-1".to_owned(),
   2032                 effective_relay_count: 1,
   2033             })
   2034             .expect("record job");
   2035         assert!(!response.deduplicated);
   2036         let duplicate = store
   2037             .record_publish_job(PublishJobInsert {
   2038                 principal_id: principal.principal_id.clone(),
   2039                 idempotency_key: Some("idem-1".to_owned()),
   2040                 request: accepted,
   2041                 request_fingerprint: "fingerprint-1".to_owned(),
   2042                 effective_relay_count: 1,
   2043             })
   2044             .expect("dedupe");
   2045         assert!(duplicate.deduplicated);
   2046         assert_eq!(duplicate.job.job_id, response.job.job_id);
   2047         assert_eq!(
   2048             store
   2049                 .list_jobs_for_principal(&principal, 50)
   2050                 .expect("jobs")
   2051                 .len(),
   2052             1
   2053         );
   2054     }
   2055 
   2056     #[test]
   2057     fn store_open_recovers_interrupted_publishing_jobs() {
   2058         let directory = tempfile::tempdir().expect("tempdir");
   2059         let database_path = directory.path().join("publish-proxy.sqlite");
   2060         let token_hash = hash_bearer_token(generate_bearer_token().as_str());
   2061         let pubkey = "a".repeat(64);
   2062         let request = request(pubkey.as_str(), 30_402);
   2063         let job_id = {
   2064             let store = PublishProxyStore::open(database_path.clone()).expect("store");
   2065             let principal = store
   2066                 .create_principal(PublishPrincipalInit {
   2067                     label: "tester".to_owned(),
   2068                     token_hash,
   2069                     allowed_pubkeys: vec![pubkey],
   2070                     allowed_kinds: vec![30_402],
   2071                     allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly],
   2072                     allow_request_relays: false,
   2073                     job_visibility: PublishJobVisibility::Own,
   2074                     expires_at_unix: None,
   2075                 })
   2076                 .expect("principal");
   2077             let response = store
   2078                 .record_publish_job(PublishJobInsert {
   2079                     principal_id: principal.principal_id,
   2080                     idempotency_key: Some("idem-interrupted".to_owned()),
   2081                     request,
   2082                     request_fingerprint: "fingerprint-interrupted".to_owned(),
   2083                     effective_relay_count: 1,
   2084                 })
   2085                 .expect("record job");
   2086             assert_eq!(response.job.status, PublishJobStatus::Publishing);
   2087             response.job.job_id
   2088         };
   2089 
   2090         let reopened = PublishProxyStore::open(database_path).expect("reopen store");
   2091         let recovered = reopened.job_by_id(job_id.as_str()).expect("recovered job");
   2092         assert_eq!(
   2093             recovered.status,
   2094             PublishJobStatus::DeliveryUnsatisfiedRetryable
   2095         );
   2096         assert_eq!(
   2097             recovered.last_error.as_deref(),
   2098             Some("publish_attempt_interrupted")
   2099         );
   2100         assert!(recovered.completed_at_ms.is_some());
   2101         assert!(recovered.relays.is_empty());
   2102     }
   2103 
   2104     #[tokio::test]
   2105     async fn publish_event_verifies_and_records_daemon_default_outcome() {
   2106         let identity = RadrootsIdentity::generate();
   2107         let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
   2108         let principal = principal(
   2109             &proxy,
   2110             identity.public_key_hex(),
   2111             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2112             false,
   2113             PublishJobVisibility::Own,
   2114         );
   2115         let event = signed_event(&identity, "{}");
   2116         let raw_event = serde_json::to_string(&event).expect("raw event");
   2117         let response = proxy
   2118             .publish_event(
   2119                 &principal,
   2120                 publish_request(
   2121                     event,
   2122                     Vec::new(),
   2123                     PublishRelayPolicy::DaemonDefaultOnly,
   2124                     PublishDeliveryPolicy::Any,
   2125                     Some("idem-valid"),
   2126                 ),
   2127             )
   2128             .await
   2129             .expect("publish");
   2130 
   2131         assert!(!response.deduplicated);
   2132         assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied);
   2133         assert_eq!(response.job.relay_count, 1);
   2134         assert_eq!(response.job.acknowledged_count, 1);
   2135         assert_eq!(response.job.relays[0].relay_url, RELAY_PRIMARY);
   2136         assert_eq!(
   2137             response.job.relays[0].source,
   2138             PublishRelaySource::DaemonDefault
   2139         );
   2140         assert_eq!(adapter.captured_raw_events(), vec![raw_event]);
   2141     }
   2142 
   2143     #[tokio::test]
   2144     async fn publish_event_rejects_tampered_content_before_publish() {
   2145         let identity = RadrootsIdentity::generate();
   2146         let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
   2147         let principal = principal(
   2148             &proxy,
   2149             identity.public_key_hex(),
   2150             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2151             false,
   2152             PublishJobVisibility::Own,
   2153         );
   2154         let mut event = signed_event(&identity, "trusted");
   2155         event.content = "tampered".to_owned();
   2156         let error = proxy
   2157             .publish_event(
   2158                 &principal,
   2159                 publish_request(
   2160                     event,
   2161                     Vec::new(),
   2162                     PublishRelayPolicy::DaemonDefaultOnly,
   2163                     PublishDeliveryPolicy::Any,
   2164                     None,
   2165                 ),
   2166             )
   2167             .await
   2168             .expect_err("tampered event should fail");
   2169 
   2170         assert!(matches!(
   2171             error,
   2172             PublishProxyError::SignedEventVerification(RadrootsNostrEventVerification::IdMismatch)
   2173         ));
   2174         assert!(adapter.captured_raw_events().is_empty());
   2175     }
   2176 
   2177     #[tokio::test]
            // Corrupts only the `sig` of a freshly signed event and expects `publish_event` to fail
            // with `SignatureInvalid` while the adapter captures no raw events.
   2178     async fn publish_event_rejects_wrong_signature_before_publish() {
   2179         let identity = RadrootsIdentity::generate();
   2180         let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
   2181         let principal = principal(
   2182             &proxy,
   2183             identity.public_key_hex(),
   2184             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2185             false,
   2186             PublishJobVisibility::Own,
   2187         );
   2188         let mut event = signed_event(&identity, "{}");
                // Pick a replacement that differs from the current first character so the
                // signature string is guaranteed to change; `id` is left untouched.
   2189         let replacement = if event.sig.starts_with('0') { "1" } else { "0" };
   2190         event.sig.replace_range(0..1, replacement);
   2191         let error = proxy
   2192             .publish_event(
   2193                 &principal,
   2194                 publish_request(
   2195                     event,
   2196                     Vec::new(),
   2197                     PublishRelayPolicy::DaemonDefaultOnly,
   2198                     PublishDeliveryPolicy::Any,
   2199                     None,
   2200                 ),
   2201             )
   2202             .await
   2203             .expect_err("wrong signature should fail");
   2204 
   2205         assert!(matches!(
   2206             error,
   2207             PublishProxyError::SignedEventVerification(
   2208                 RadrootsNostrEventVerification::SignatureInvalid
   2209             )
   2210         ));
                // Nothing may have been handed to the publish adapter for a rejected event.
   2211         assert!(adapter.captured_raw_events().is_empty());
   2212     }
   2213 
   2214     #[tokio::test]
            // Upper-cases the `id` of a signed event and expects `publish_event` to fail with
            // `InvalidSignedEvent` while the adapter captures no raw events.
   2215     async fn publish_event_rejects_malformed_wire_fields() {
   2216         let identity = RadrootsIdentity::generate();
   2217         let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
   2218         let principal = principal(
   2219             &proxy,
   2220             identity.public_key_hex(),
   2221             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2222             false,
   2223             PublishJobVisibility::Own,
   2224         );
   2225         let mut event = signed_event(&identity, "{}");
                // The upper-cased id is the malformed wire field under test.
                // NOTE(review): presumably the wire format requires a lowercase id — confirm.
   2226         event.id = event.id.to_uppercase();
   2227         let error = proxy
   2228             .publish_event(
   2229                 &principal,
   2230                 publish_request(
   2231                     event,
   2232                     Vec::new(),
   2233                     PublishRelayPolicy::DaemonDefaultOnly,
   2234                     PublishDeliveryPolicy::Any,
   2235                     None,
   2236                 ),
   2237             )
   2238             .await
   2239             .expect_err("malformed field should fail");
   2240 
   2241         assert!(matches!(error, PublishProxyError::InvalidSignedEvent(_)));
   2242         assert!(adapter.captured_raw_events().is_empty());
   2243     }
   2244 
   2245     #[tokio::test]
            // Publishes with `RELAY_PRIMARY` supplied in the request while the config default is
            // `RELAY_SECONDARY`, and expects the first recorded relay to be the request relay.
   2246     async fn publish_event_uses_explicit_request_relays_when_allowed() {
   2247         let identity = RadrootsIdentity::generate();
   2248         let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_SECONDARY]));
                // NOTE(review): the `true` argument presumably permits request-supplied relays for
                // this principal — confirm against the `principal` helper.
   2249         let principal = principal(
   2250             &proxy,
   2251             identity.public_key_hex(),
   2252             vec![PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault],
   2253             true,
   2254             PublishJobVisibility::Own,
   2255         );
   2256         let response = proxy
   2257             .publish_event(
   2258                 &principal,
   2259                 publish_request(
   2260                     signed_event(&identity, "{}"),
   2261                     vec![RELAY_PRIMARY.to_owned()],
   2262                     PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault,
   2263                     PublishDeliveryPolicy::Any,
   2264                     None,
   2265                 ),
   2266             )
   2267             .await
   2268             .expect("publish");
   2269 
   2270         assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied);
                // The first relay entry must be the request relay, tagged with the `Request` source.
   2271         assert_eq!(response.job.relays[0].relay_url, RELAY_PRIMARY);
   2272         assert_eq!(response.job.relays[0].source, PublishRelaySource::Request);
   2273     }
   2274 
   2276     #[tokio::test]
            // Caches `RELAY_PRIMARY` as an author write relay for the signer, keeps
            // `RELAY_SECONDARY` as the config default, and expects the first recorded relay to be
            // the cached one with source `AuthorWrite`.
   2277     async fn publish_event_uses_cached_nip65_author_write_before_defaults() {
   2278         let identity = RadrootsIdentity::generate();
   2279         let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_SECONDARY]));
                // Seed the store's author write relay cache directly for the signer's public key.
   2280         proxy
   2281             .store
   2282             .cache_author_write_relays(
   2283                 identity.public_key_hex().as_str(),
   2284                 &[RELAY_PRIMARY.to_owned()],
   2285             )
   2286             .expect("cache author relays");
   2287         let principal = principal(
   2288             &proxy,
   2289             identity.public_key_hex(),
   2290             vec![PublishRelayPolicy::AuthorWriteThenDaemonDefault],
   2291             false,
   2292             PublishJobVisibility::Own,
   2293         );
   2294         let response = proxy
   2295             .publish_event(
   2296                 &principal,
   2297                 publish_request(
   2298                     signed_event(&identity, "{}"),
   2299                     Vec::new(),
   2300                     PublishRelayPolicy::AuthorWriteThenDaemonDefault,
   2301                     PublishDeliveryPolicy::Any,
   2302                     None,
   2303                 ),
   2304             )
   2305             .await
   2306             .expect("publish");
   2307 
   2308         assert_eq!(response.job.relays[0].relay_url, RELAY_PRIMARY);
   2309         assert_eq!(
   2310             response.job.relays[0].source,
   2311             PublishRelaySource::AuthorWrite
   2312         );
   2313     }
   2313 
   2314     #[tokio::test]
            // Caches one valid relay and one unparseable string as author write relays, then
            // expects delivery to succeed via the valid relay while the invalid entry is reported
            // as an unattempted `RelayUrlRejected` outcome.
   2315     async fn publish_event_records_invalid_cached_author_write_relay() {
   2316         let identity = RadrootsIdentity::generate();
   2317         let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_SECONDARY]));
   2318         proxy
   2319             .store
   2320             .cache_author_write_relays(
   2321                 identity.public_key_hex().as_str(),
   2322                 &[RELAY_PRIMARY.to_owned(), "not a cached relay".to_owned()],
   2323             )
   2324             .expect("cache author relays");
   2325         let principal = principal(
   2326             &proxy,
   2327             identity.public_key_hex(),
   2328             vec![PublishRelayPolicy::AuthorWriteThenDaemonDefault],
   2329             false,
   2330             PublishJobVisibility::Own,
   2331         );
   2332         let response = proxy
   2333             .publish_event(
   2334                 &principal,
   2335                 publish_request(
   2336                     signed_event(&identity, "{}"),
   2337                     Vec::new(),
   2338                     PublishRelayPolicy::AuthorWriteThenDaemonDefault,
   2339                     PublishDeliveryPolicy::Any,
   2340                     None,
   2341                 ),
   2342             )
   2343             .await
   2344             .expect("publish");
   2345 
   2346         assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied);
                // The valid cached relay is looked up by URL rather than by position.
   2347         let accepted = response
   2348             .job
   2349             .relays
   2350             .iter()
   2351             .find(|relay| relay.relay_url == RELAY_PRIMARY)
   2352             .expect("accepted author relay");
   2353         assert_eq!(accepted.source, PublishRelaySource::AuthorWrite);
   2354         assert!(accepted.attempted);
                // The invalid cached entry must still appear in the job, keyed by its raw string.
   2355         let rejected = response
   2356             .job
   2357             .relays
   2358             .iter()
   2359             .find(|relay| relay.relay_url == "not a cached relay")
   2360             .expect("rejected cached author relay");
   2361         assert_eq!(rejected.source, PublishRelaySource::AuthorWrite);
   2362         assert_eq!(
   2363             rejected.outcome_kind,
   2364             PublishRelayOutcomeKind::RelayUrlRejected
   2365         );
   2366         assert!(!rejected.attempted);
                // Exactly one raw event capture is expected from the adapter.
   2367         assert_eq!(adapter.captured_raw_events().len(), 1);
   2368     }
   2369 
   2370     #[tokio::test]
            // Configures an unparseable discovery relay and an unparseable cached author relay, then
            // expects delivery via the daemon default `RELAY_SECONDARY` while both rejected
            // entries remain visible in the job's relay list.
   2371     async fn publish_event_preserves_author_and_discovery_rejections_through_fallback() {
   2372         let identity = RadrootsIdentity::generate();
   2373         let mut config = config_with_defaults(vec![RELAY_SECONDARY]);
   2374         config.author_relay_discovery_relays = vec!["not a discovery relay".to_owned()];
   2375         let (proxy, adapter) = publish_proxy(config);
   2376         proxy
   2377             .store
   2378             .cache_author_write_relays(
   2379                 identity.public_key_hex().as_str(),
   2380                 &["not a cached relay".to_owned()],
   2381             )
   2382             .expect("cache author relays");
   2383         let principal = principal(
   2384             &proxy,
   2385             identity.public_key_hex(),
   2386             vec![PublishRelayPolicy::AuthorWriteThenDaemonDefault],
   2387             false,
   2388             PublishJobVisibility::Own,
   2389         );
   2390         let response = proxy
   2391             .publish_event(
   2392                 &principal,
   2393                 publish_request(
   2394                     signed_event(&identity, "{}"),
   2395                     Vec::new(),
   2396                     PublishRelayPolicy::AuthorWriteThenDaemonDefault,
   2397                     PublishDeliveryPolicy::Any,
   2398                     None,
   2399                 ),
   2400             )
   2401             .await
   2402             .expect("publish");
   2403 
   2404         assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied);
                // The daemon default relay is the one that was actually attempted.
   2405         let daemon_default = response
   2406             .job
   2407             .relays
   2408             .iter()
   2409             .find(|relay| relay.relay_url == RELAY_SECONDARY)
   2410             .expect("daemon default relay");
   2411         assert_eq!(daemon_default.source, PublishRelaySource::DaemonDefault);
   2412         assert!(daemon_default.attempted);
                // The invalid cached author relay is kept as an unattempted rejection.
   2413         let cached = response
   2414             .job
   2415             .relays
   2416             .iter()
   2417             .find(|relay| relay.relay_url == "not a cached relay")
   2418             .expect("cached author rejection");
   2419         assert_eq!(cached.source, PublishRelaySource::AuthorWrite);
   2420         assert_eq!(
   2421             cached.outcome_kind,
   2422             PublishRelayOutcomeKind::RelayUrlRejected
   2423         );
   2424         assert!(!cached.attempted);
                // The invalid discovery relay is reported with the `DaemonDefault` source.
   2425         let discovery = response
   2426             .job
   2427             .relays
   2428             .iter()
   2429             .find(|relay| relay.relay_url == "not a discovery relay")
   2430             .expect("discovery relay rejection");
   2431         assert_eq!(discovery.source, PublishRelaySource::DaemonDefault);
   2432         assert_eq!(
   2433             discovery.outcome_kind,
   2434             PublishRelayOutcomeKind::RelayUrlRejected
   2435         );
   2436         assert!(!discovery.attempted);
   2437         assert_eq!(adapter.captured_raw_events().len(), 1);
   2438     }
   2439 
   2440     #[tokio::test]
            // Uses a static discovery stub that returns one unparseable relay and `RELAY_SECONDARY`,
            // plus a discovery relay list containing `RELAY_FORBIDDEN` (resolved to 127.0.0.1), and
            // expects delivery via `RELAY_SECONDARY` with both rejections recorded in the job.
   2441     async fn publish_event_preserves_discovery_and_discovered_author_rejections() {
   2442         let identity = RadrootsIdentity::generate();
   2443         let mut config = config_with_defaults(vec![RELAY_PRIMARY]);
   2444         config.author_relay_discovery_relays =
   2445             vec![RELAY_PRIMARY.to_owned(), RELAY_FORBIDDEN.to_owned()];
                // Make `RELAY_FORBIDDEN` resolve to the loopback address.
   2446         let resolver = StaticPublishRelayResolver::new().with_addresses(
   2447             RELAY_FORBIDDEN,
   2448             vec![IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))],
   2449         );
   2450         let adapter = RadrootsMockRelayPublishAdapter::new();
                // Build the proxy by hand so the resolver, discovery stub and mock publisher can all
                // be injected; a clone of the adapter is kept for the final capture assertion.
   2451         let proxy = PublishProxy::memory(config)
   2452             .expect("proxy")
   2453             .with_relay_resolver(Arc::new(resolver))
   2454             .with_author_relay_discovery(Arc::new(StaticPublishAuthorRelayDiscovery::new(vec![
   2455                 "not a discovered author relay",
   2456                 RELAY_SECONDARY,
   2457             ])))
   2458             .with_publisher(Arc::new(adapter.clone()));
   2459         let principal = principal(
   2460             &proxy,
   2461             identity.public_key_hex(),
   2462             vec![PublishRelayPolicy::AuthorWriteThenDaemonDefault],
   2463             false,
   2464             PublishJobVisibility::Own,
   2465         );
   2466         let response = proxy
   2467             .publish_event(
   2468                 &principal,
   2469                 publish_request(
   2470                     signed_event(&identity, "{}"),
   2471                     Vec::new(),
   2472                     PublishRelayPolicy::AuthorWriteThenDaemonDefault,
   2473                     PublishDeliveryPolicy::Any,
   2474                     None,
   2475                 ),
   2476             )
   2477             .await
   2478             .expect("publish");
   2479 
   2480         assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied);
                // The valid discovered relay is attempted as an author write relay.
   2481         let accepted = response
   2482             .job
   2483             .relays
   2484             .iter()
   2485             .find(|relay| relay.relay_url == RELAY_SECONDARY)
   2486             .expect("discovered author relay");
   2487         assert_eq!(accepted.source, PublishRelaySource::AuthorWrite);
   2488         assert!(accepted.attempted);
                // The unparseable discovered relay is kept as an unattempted author rejection.
   2489         let discovered = response
   2490             .job
   2491             .relays
   2492             .iter()
   2493             .find(|relay| relay.relay_url == "not a discovered author relay")
   2494             .expect("discovered author rejection");
   2495         assert_eq!(discovered.source, PublishRelaySource::AuthorWrite);
   2496         assert_eq!(
   2497             discovered.outcome_kind,
   2498             PublishRelayOutcomeKind::RelayUrlRejected
   2499         );
   2500         assert!(!discovered.attempted);
                // The forbidden discovery relay is reported with the `DaemonDefault` source.
   2501         let discovery = response
   2502             .job
   2503             .relays
   2504             .iter()
   2505             .find(|relay| relay.relay_url == RELAY_FORBIDDEN)
   2506             .expect("discovery relay rejection");
   2507         assert_eq!(discovery.source, PublishRelaySource::DaemonDefault);
   2508         assert_eq!(
   2509             discovery.outcome_kind,
   2510             PublishRelayOutcomeKind::RelayUrlRejected
   2511         );
   2512         assert!(!discovery.attempted);
   2513         assert_eq!(adapter.captured_raw_events().len(), 1);
   2514     }
   2515 
   2516     #[tokio::test]
            // Publishes against `PublishProxyConfig::default()` with no request relays and expects a
            // `Rejected` job whose `last_error` is "no_publish_relays" and whose relay list is empty.
   2517     async fn publish_event_records_no_publish_relays_failure() {
   2518         let identity = RadrootsIdentity::generate();
                // NOTE(review): the default config presumably has no default relays — confirm.
   2519         let (proxy, adapter) = publish_proxy(PublishProxyConfig::default());
   2520         let principal = principal(
   2521             &proxy,
   2522             identity.public_key_hex(),
   2523             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2524             false,
   2525             PublishJobVisibility::Own,
   2526         );
                // The call itself succeeds; the failure is recorded on the returned job.
   2527         let response = proxy
   2528             .publish_event(
   2529                 &principal,
   2530                 publish_request(
   2531                     signed_event(&identity, "{}"),
   2532                     Vec::new(),
   2533                     PublishRelayPolicy::DaemonDefaultOnly,
   2534                     PublishDeliveryPolicy::Any,
   2535                     None,
   2536                 ),
   2537             )
   2538             .await
   2539             .expect("publish");
   2540 
   2541         assert_eq!(response.job.status, PublishJobStatus::Rejected);
   2542         assert_eq!(
   2543             response.job.last_error.as_deref(),
   2544             Some("no_publish_relays")
   2545         );
   2546         assert!(response.job.relays.is_empty());
   2547         assert!(adapter.captured_raw_events().is_empty());
   2548     }
   2549 
   2550     #[tokio::test]
            // Requests publication to the loopback URL `wss://127.0.0.1:7777` under `ExplicitOnly`
            // and expects a `Rejected` job with a single unattempted `RelayUrlRejected` relay.
   2551     async fn publish_event_records_unsafe_request_relay_rejection() {
   2552         let identity = RadrootsIdentity::generate();
   2553         let (proxy, adapter) = publish_proxy(PublishProxyConfig::default());
   2554         let principal = principal(
   2555             &proxy,
   2556             identity.public_key_hex(),
   2557             vec![PublishRelayPolicy::ExplicitOnly],
   2558             true,
   2559             PublishJobVisibility::Own,
   2560         );
   2561         let response = proxy
   2562             .publish_event(
   2563                 &principal,
   2564                 publish_request(
   2565                     signed_event(&identity, "{}"),
   2566                     vec!["wss://127.0.0.1:7777".to_owned()],
   2567                     PublishRelayPolicy::ExplicitOnly,
   2568                     PublishDeliveryPolicy::Any,
   2569                     None,
   2570                 ),
   2571             )
   2572             .await
   2573             .expect("publish");
   2574 
   2575         assert_eq!(response.job.status, PublishJobStatus::Rejected);
                // The rejected URL is still recorded, but marked as never attempted.
   2576         assert_eq!(response.job.relays.len(), 1);
   2577         assert_eq!(
   2578             response.job.relays[0].outcome_kind,
   2579             PublishRelayOutcomeKind::RelayUrlRejected
   2580         );
   2581         assert!(!response.job.relays[0].attempted);
   2582         assert!(adapter.captured_raw_events().is_empty());
   2583     }
   2584 
   2585     #[tokio::test]
            // Makes the resolver return 127.0.0.1 for the default relay `RELAY_PRIMARY` and expects
            // a `Rejected` job with a single unattempted `RelayUrlRejected` relay and no captures.
   2586     async fn publish_event_rejects_forbidden_public_dns_destination_before_publish() {
   2587         let identity = RadrootsIdentity::generate();
                // The relay URL itself is unchanged; only its resolved address is loopback.
   2588         let resolver = StaticPublishRelayResolver::new()
   2589             .with_addresses(RELAY_PRIMARY, vec![IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))]);
   2590         let (proxy, adapter) = publish_proxy_with_resolver(
   2591             config_with_defaults(vec![RELAY_PRIMARY]),
   2592             Arc::new(resolver),
   2593         );
   2594         let principal = principal(
   2595             &proxy,
   2596             identity.public_key_hex(),
   2597             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2598             false,
   2599             PublishJobVisibility::Own,
   2600         );
   2601         let response = proxy
   2602             .publish_event(
   2603                 &principal,
   2604                 publish_request(
   2605                     signed_event(&identity, "{}"),
   2606                     Vec::new(),
   2607                     PublishRelayPolicy::DaemonDefaultOnly,
   2608                     PublishDeliveryPolicy::Any,
   2609                     None,
   2610                 ),
   2611             )
   2612             .await
   2613             .expect("publish");
   2614 
   2615         assert_eq!(response.job.status, PublishJobStatus::Rejected);
   2616         assert_eq!(response.job.relays.len(), 1);
   2617         assert_eq!(
   2618             response.job.relays[0].outcome_kind,
   2619             PublishRelayOutcomeKind::RelayUrlRejected
   2620         );
   2621         assert!(!response.job.relays[0].attempted);
   2622         assert!(adapter.captured_raw_events().is_empty());
   2623     }
   2624 
   2625     #[tokio::test]
            // Makes the resolver fail for the default relay `RELAY_PRIMARY` and expects a
            // `DeliveryUnsatisfiedRetryable` job with a single unattempted `ConnectionFailed` relay.
   2626     async fn publish_event_records_dns_failure_as_unattempted_retryable_outcome() {
   2627         let identity = RadrootsIdentity::generate();
   2628         let resolver = StaticPublishRelayResolver::new().with_failure(RELAY_PRIMARY, "no records");
   2629         let (proxy, adapter) = publish_proxy_with_resolver(
   2630             config_with_defaults(vec![RELAY_PRIMARY]),
   2631             Arc::new(resolver),
   2632         );
   2633         let principal = principal(
   2634             &proxy,
   2635             identity.public_key_hex(),
   2636             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2637             false,
   2638             PublishJobVisibility::Own,
   2639         );
   2640         let response = proxy
   2641             .publish_event(
   2642                 &principal,
   2643                 publish_request(
   2644                     signed_event(&identity, "{}"),
   2645                     Vec::new(),
   2646                     PublishRelayPolicy::DaemonDefaultOnly,
   2647                     PublishDeliveryPolicy::Any,
   2648                     None,
   2649                 ),
   2650             )
   2651             .await
   2652             .expect("publish");
   2653 
                // Unlike a rejected URL, a resolution failure yields the retryable status.
   2654         assert_eq!(
   2655             response.job.status,
   2656             PublishJobStatus::DeliveryUnsatisfiedRetryable
   2657         );
   2658         assert_eq!(
   2659             response.job.last_error.as_deref(),
   2660             Some("delivery_unsatisfied")
   2661         );
   2662         assert_eq!(response.job.relays.len(), 1);
   2663         assert_eq!(
   2664             response.job.relays[0].outcome_kind,
   2665             PublishRelayOutcomeKind::ConnectionFailed
   2666         );
   2667         assert!(!response.job.relays[0].attempted);
   2668         assert!(adapter.captured_raw_events().is_empty());
   2669     }
   2670 
   2671     #[tokio::test]
            // Switches the relay URL policy to `Localhost`, installs a resolver that would fail for
            // `ws://localhost:7777`, and expects delivery to that relay to be satisfied anyway.
   2672     async fn publish_event_localhost_policy_skips_public_dns_guard() {
   2673         let identity = RadrootsIdentity::generate();
   2674         let mut config = config_with_defaults(vec!["ws://localhost:7777"]);
   2675         config.relay_url_policy = PublishProxyRelayUrlPolicy::Localhost;
                // The resolver is rigged to fail, so a satisfied delivery shows it was not consulted
                // for this relay.
   2676         let resolver = StaticPublishRelayResolver::new()
   2677             .with_failure("ws://localhost:7777", "localhost resolution should not run");
   2678         let (proxy, adapter) = publish_proxy_with_resolver(config, Arc::new(resolver));
   2679         let principal = principal(
   2680             &proxy,
   2681             identity.public_key_hex(),
   2682             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2683             false,
   2684             PublishJobVisibility::Own,
   2685         );
   2686         let response = proxy
   2687             .publish_event(
   2688                 &principal,
   2689                 publish_request(
   2690                     signed_event(&identity, "{}"),
   2691                     Vec::new(),
   2692                     PublishRelayPolicy::DaemonDefaultOnly,
   2693                     PublishDeliveryPolicy::Any,
   2694                     None,
   2695                 ),
   2696             )
   2697             .await
   2698             .expect("publish");
   2699 
   2700         assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied);
   2701         assert_eq!(response.job.relays[0].relay_url, "ws://localhost:7777");
   2702         assert!(!adapter.captured_raw_events().is_empty());
   2703     }
   2704 
   2705     #[tokio::test]
            // Submits the same request twice and expects the second response to be deduplicated
            // onto the first job, then submits a different event with the same trailing
            // `Some("idem-conflict")` argument and expects `IdempotencyConflict`.
   2706     async fn publish_event_deduplicates_same_intent_and_conflicts_different_intent() {
   2707         let identity = RadrootsIdentity::generate();
   2708         let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
   2709         let principal = principal(
   2710             &proxy,
   2711             identity.public_key_hex(),
   2712             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2713             false,
   2714             PublishJobVisibility::Own,
   2715         );
                // NOTE(review): the last argument is presumably the idempotency key — confirm against
                // the `publish_request` helper.
   2716         let request = publish_request(
   2717             signed_event(&identity, "{}"),
   2718             Vec::new(),
   2719             PublishRelayPolicy::DaemonDefaultOnly,
   2720             PublishDeliveryPolicy::Any,
   2721             Some("idem-conflict"),
   2722         );
                // The request is cloned so the identical value can be submitted a second time.
   2723         let first = proxy
   2724             .publish_event(&principal, request.clone())
   2725             .await
   2726             .expect("first");
   2727         let duplicate = proxy
   2728             .publish_event(&principal, request)
   2729             .await
   2730             .expect("duplicate");
   2731 
   2732         assert!(!first.deduplicated);
   2733         assert!(duplicate.deduplicated);
   2734         assert_eq!(duplicate.job.job_id, first.job.job_id);
   2735 
                // Same trailing key argument, different event content ("changed" instead of "{}").
   2736         let conflict = proxy
   2737             .publish_event(
   2738                 &principal,
   2739                 publish_request(
   2740                     signed_event(&identity, "changed"),
   2741                     Vec::new(),
   2742                     PublishRelayPolicy::DaemonDefaultOnly,
   2743                     PublishDeliveryPolicy::Any,
   2744                     Some("idem-conflict"),
   2745                 ),
   2746             )
   2747             .await
   2748             .expect_err("conflict");
   2749         assert!(matches!(
   2750             conflict,
   2751             PublishProxyError::IdempotencyConflict(_)
   2752         ));
   2753     }
   2754 
   2755     #[tokio::test]
            // Submits one request with `timeout_ms = Some(0)` and one with `Some(10_001)`, expects
            // both to fail with `InvalidSignedEvent`, and checks that no job is listed for the
            // principal and the adapter captured nothing.
   2756     async fn publish_event_rejects_zero_and_excessive_timeout_before_job_creation() {
   2757         let identity = RadrootsIdentity::generate();
   2758         let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
   2759         let principal = principal(
   2760             &proxy,
   2761             identity.public_key_hex(),
   2762             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2763             false,
   2764             PublishJobVisibility::Own,
   2765         );
   2766         let mut zero = publish_request(
   2767             signed_event(&identity, "{}"),
   2768             Vec::new(),
   2769             PublishRelayPolicy::DaemonDefaultOnly,
   2770             PublishDeliveryPolicy::Any,
   2771             Some("idem-zero-timeout"),
   2772         );
   2773         zero.timeout_ms = Some(0);
   2774         let zero_error = proxy
   2775             .publish_event(&principal, zero)
   2776             .await
   2777             .expect_err("zero timeout should fail");
   2778         assert!(matches!(
   2779             zero_error,
   2780             PublishProxyError::InvalidSignedEvent(_)
   2781         ));
   2782 
   2783         let mut excessive = publish_request(
   2784             signed_event(&identity, "changed"),
   2785             Vec::new(),
   2786             PublishRelayPolicy::DaemonDefaultOnly,
   2787             PublishDeliveryPolicy::Any,
   2788             Some("idem-excessive-timeout"),
   2789         );
                // NOTE(review): 10_001 is presumably one millisecond above the configured maximum
                // timeout — confirm against `config_with_defaults`.
   2790         excessive.timeout_ms = Some(10_001);
   2791         let excessive_error = proxy
   2792             .publish_event(&principal, excessive)
   2793             .await
   2794             .expect_err("excessive timeout should fail");
   2795         assert!(matches!(
   2796             excessive_error,
   2797             PublishProxyError::InvalidSignedEvent(_)
   2798         ));
                // Neither rejected request may have left a job behind for this principal.
   2799         assert!(
   2800             proxy
   2801                 .store
   2802                 .list_jobs_for_principal(&principal, 50)
   2803                 .expect("jobs")
   2804                 .is_empty()
   2805         );
   2806         assert!(adapter.captured_raw_events().is_empty());
   2807     }
   2808 
   2809     #[tokio::test]
            // Submits the same event twice with the same trailing key argument, once with
            // `timeout_ms = None` and once with `Some(10_000)`, and expects the second response to
            // be deduplicated onto the first job.
   2810     async fn publish_event_default_timeout_fingerprints_as_effective_timeout() {
   2811         let identity = RadrootsIdentity::generate();
   2812         let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
   2813         let principal = principal(
   2814             &proxy,
   2815             identity.public_key_hex(),
   2816             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2817             false,
   2818             PublishJobVisibility::Own,
   2819         );
                // One signed event is shared (via clone) so both requests carry identical content.
   2820         let event = signed_event(&identity, "{}");
   2821         let mut default_timeout = publish_request(
   2822             event.clone(),
   2823             Vec::new(),
   2824             PublishRelayPolicy::DaemonDefaultOnly,
   2825             PublishDeliveryPolicy::Any,
   2826             Some("idem-default-timeout"),
   2827         );
   2828         default_timeout.timeout_ms = None;
   2829         let mut explicit_default = publish_request(
   2830             event,
   2831             Vec::new(),
   2832             PublishRelayPolicy::DaemonDefaultOnly,
   2833             PublishDeliveryPolicy::Any,
   2834             Some("idem-default-timeout"),
   2835         );
                // NOTE(review): 10_000 is presumably the default timeout applied when `timeout_ms`
                // is `None` — confirm against `config_with_defaults`.
   2836         explicit_default.timeout_ms = Some(10_000);
   2837 
   2838         let first = proxy
   2839             .publish_event(&principal, default_timeout)
   2840             .await
   2841             .expect("first");
   2842         let duplicate = proxy
   2843             .publish_event(&principal, explicit_default)
   2844             .await
   2845             .expect("duplicate");
   2846         assert!(!first.deduplicated);
   2847         assert!(duplicate.deduplicated);
   2848         assert_eq!(duplicate.job.job_id, first.job.job_id);
   2849     }
   2850 
   2851     #[tokio::test]
            // Submits the same event twice with the same trailing key argument, the second time with
            // `timeout_ms = Some(6_000)`, and expects the second call to fail with
            // `IdempotencyConflict`.
   2852     async fn publish_event_fingerprint_conflicts_on_different_effective_timeout() {
   2853         let identity = RadrootsIdentity::generate();
   2854         let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
   2855         let principal = principal(
   2856             &proxy,
   2857             identity.public_key_hex(),
   2858             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2859             false,
   2860             PublishJobVisibility::Own,
   2861         );
   2862         let event = signed_event(&identity, "{}");
                // `first` keeps whatever `timeout_ms` the `publish_request` helper assigns.
                // NOTE(review): this test relies on that value not being effectively 6_000 — confirm.
   2863         let first = publish_request(
   2864             event.clone(),
   2865             Vec::new(),
   2866             PublishRelayPolicy::DaemonDefaultOnly,
   2867             PublishDeliveryPolicy::Any,
   2868             Some("idem-timeout-conflict"),
   2869         );
   2870         let mut conflict = publish_request(
   2871             event,
   2872             Vec::new(),
   2873             PublishRelayPolicy::DaemonDefaultOnly,
   2874             PublishDeliveryPolicy::Any,
   2875             Some("idem-timeout-conflict"),
   2876         );
   2877         conflict.timeout_ms = Some(6_000);
   2878 
   2879         proxy.publish_event(&principal, first).await.expect("first");
   2880         let error = proxy
   2881             .publish_event(&principal, conflict)
   2882             .await
   2883             .expect_err("timeout conflict");
   2884         assert!(matches!(error, PublishProxyError::IdempotencyConflict(_)));
   2885     }
   2886 
   2887     #[tokio::test]
            // Sets `max_concurrent_publish_jobs` to 1, holds one publish permit, and expects
            // `publish_event` to fail with `ConcurrencyLimit` without listing a job for the
            // principal or capturing any raw event.
   2888     async fn publish_event_concurrency_limit_rejects_without_job_creation() {
   2889         let identity = RadrootsIdentity::generate();
   2890         let mut config = config_with_defaults(vec![RELAY_PRIMARY]);
   2891         config.max_concurrent_publish_jobs = 1;
   2892         let (proxy, adapter) = publish_proxy(config);
   2893         let principal = principal(
   2894             &proxy,
   2895             identity.public_key_hex(),
   2896             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2897             false,
   2898             PublishJobVisibility::Own,
   2899         );
                // Binding to `_permit` (not `_`) keeps the permit alive until the end of the test, so
                // the single slot stays taken while `publish_event` runs.
   2900         let _permit = proxy.acquire_publish_permit().expect("permit");
   2901         let error = proxy
   2902             .publish_event(
   2903                 &principal,
   2904                 publish_request(
   2905                     signed_event(&identity, "{}"),
   2906                     Vec::new(),
   2907                     PublishRelayPolicy::DaemonDefaultOnly,
   2908                     PublishDeliveryPolicy::Any,
   2909                     Some("idem-concurrency"),
   2910                 ),
   2911             )
   2912             .await
   2913             .expect_err("concurrency limit");
   2914         assert!(matches!(error, PublishProxyError::ConcurrencyLimit));
   2915         assert!(
   2916             proxy
   2917                 .store
   2918                 .list_jobs_for_principal(&principal, 50)
   2919                 .expect("jobs")
   2920                 .is_empty()
   2921         );
   2922         assert!(adapter.captured_raw_events().is_empty());
   2923     }
   2924 
   2925     #[tokio::test]
   2926     async fn publish_jobs_respect_own_and_admin_visibility() {
   2927         let identity = RadrootsIdentity::generate();
   2928         let other_identity = RadrootsIdentity::generate();
   2929         let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY]));
   2930         let owner = principal(
   2931             &proxy,
   2932             identity.public_key_hex(),
   2933             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2934             false,
   2935             PublishJobVisibility::Own,
   2936         );
   2937         let other = principal(
   2938             &proxy,
   2939             other_identity.public_key_hex(),
   2940             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2941             false,
   2942             PublishJobVisibility::Own,
   2943         );
   2944         let admin = principal(
   2945             &proxy,
   2946             other_identity.public_key_hex(),
   2947             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2948             false,
   2949             PublishJobVisibility::Admin,
   2950         );
   2951         let response = proxy
   2952             .publish_event(
   2953                 &owner,
   2954                 publish_request(
   2955                     signed_event(&identity, "{}"),
   2956                     Vec::new(),
   2957                     PublishRelayPolicy::DaemonDefaultOnly,
   2958                     PublishDeliveryPolicy::Any,
   2959                     None,
   2960                 ),
   2961             )
   2962             .await
   2963             .expect("publish");
   2964 
   2965         assert!(
   2966             proxy
   2967                 .store
   2968                 .job_by_id_for_principal(response.job.job_id.as_str(), &other)
   2969                 .expect("other read")
   2970                 .is_none()
   2971         );
   2972         assert!(
   2973             proxy
   2974                 .store
   2975                 .job_by_id_for_principal(response.job.job_id.as_str(), &admin)
   2976                 .expect("admin read")
   2977                 .is_some()
   2978         );
   2979     }
   2980 
   2981     #[tokio::test]
   2982     async fn publish_event_records_retryable_relay_failures() {
   2983         let identity = RadrootsIdentity::generate();
   2984         let adapter = RadrootsMockRelayPublishAdapter::new().with_outcome(
   2985             RELAY_PRIMARY,
   2986             RadrootsRelayOutcome::connection_failed("error: unavailable"),
   2987         );
   2988         let proxy = PublishProxy::memory(config_with_defaults(vec![RELAY_PRIMARY]))
   2989             .expect("proxy")
   2990             .with_publisher(Arc::new(adapter));
   2991         let principal = principal(
   2992             &proxy,
   2993             identity.public_key_hex(),
   2994             vec![PublishRelayPolicy::DaemonDefaultOnly],
   2995             false,
   2996             PublishJobVisibility::Own,
   2997         );
   2998         let response = proxy
   2999             .publish_event(
   3000                 &principal,
   3001                 publish_request(
   3002                     signed_event(&identity, "{}"),
   3003                     Vec::new(),
   3004                     PublishRelayPolicy::DaemonDefaultOnly,
   3005                     PublishDeliveryPolicy::Any,
   3006                     None,
   3007                 ),
   3008             )
   3009             .await
   3010             .expect("publish");
   3011 
   3012         assert_eq!(
   3013             response.job.status,
   3014             PublishJobStatus::DeliveryUnsatisfiedRetryable
   3015         );
   3016         assert_eq!(response.job.retryable_count, 1);
   3017     }
   3018 }