mod.rs (107515B)
1 use std::collections::BTreeMap; 2 use std::fmt; 3 use std::future::Future; 4 use std::net::IpAddr; 5 use std::path::{Path, PathBuf}; 6 use std::pin::Pin; 7 use std::str::FromStr; 8 use std::sync::{Arc, Mutex}; 9 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; 10 11 use radroots_events::RadrootsNostrEvent; 12 use radroots_events::draft::{ 13 RadrootsDraftError, RadrootsSignedNostrEvent, RadrootsSignedNostrEventParts, 14 }; 15 use radroots_nostr::prelude::{ 16 RadrootsNostrClient, RadrootsNostrEventVerification, RadrootsNostrFilter, RadrootsNostrKind, 17 RadrootsNostrPublicKey, radroots_nostr_verify_event, 18 }; 19 use radroots_publish_proxy_protocol::{ 20 PublishDeliveryPolicy, PublishEventRequest, PublishEventResponse, PublishJobStatus, 21 PublishJobView, PublishRelayOutcome, PublishRelayOutcomeKind, PublishRelayPolicy, 22 PublishRelaySource, SignedNostrEventWire, 23 }; 24 use radroots_relay_transport::{ 25 RadrootsNostrClientPublishAdapter, RadrootsRelayOutcome, RadrootsRelayOutcomeKind, 26 RadrootsRelayPublishAdapter, RadrootsRelayPublishRelayReceipt, RadrootsRelayPublishRequest, 27 RadrootsRelayTargetSet, RadrootsRelayTransportError, RadrootsRelayUrl, RadrootsRelayUrlPolicy, 28 }; 29 use rusqlite::types::Type; 30 use rusqlite::{Connection, OptionalExtension, Row, params}; 31 use serde::{Deserialize, Serialize}; 32 use sha2::{Digest, Sha256}; 33 use thiserror::Error; 34 use tokio::sync::{OwnedSemaphorePermit, Semaphore}; 35 use uuid::Uuid; 36 37 use crate::app::config::PublishProxyConfig; 38 39 const TOKEN_PREFIX: &str = "rrd_pp_"; 40 const TOKEN_HASH_PREFIX: &str = "sha256:"; 41 const SCHEMA_VERSION: i64 = 2; 42 43 #[derive(Debug, Error)] 44 pub enum PublishProxyError { 45 #[error("publish proxy storage error: {0}")] 46 Sqlite(#[from] rusqlite::Error), 47 #[error("publish proxy json error: {0}")] 48 Json(#[from] serde_json::Error), 49 #[error("publish proxy io error: {0}")] 50 Io(#[from] std::io::Error), 51 #[error("invalid publish proxy scope: {0}")] 52 InvalidScope(String), 53 #[error("invalid signed Nostr event: {0}")] 54 InvalidSignedEvent(String), 55 #[error("signed Nostr event verification failed: {0:?}")] 56 SignedEventVerification(RadrootsNostrEventVerification), 57 #[error("signed Nostr event conversion error: {0}")] 58 Draft(#[from] RadrootsDraftError), 59 #[error("publish proxy relay error: {0}")] 60 Relay(#[from] RadrootsRelayTransportError), 61 #[error("publish proxy transport error: {0}")] 62 Transport(String), 63 #[error("publish proxy concurrency limit reached")] 64 ConcurrencyLimit, 65 #[error("publish proxy idempotency conflict for key `{0}`")] 66 IdempotencyConflict(String), 67 } 68 69 #[derive(Clone)] 70 pub struct PublishProxy { 71 pub config: PublishProxyConfig, 72 pub store: PublishProxyStore, 73 publisher: Option<Arc<dyn RadrootsRelayPublishAdapter>>, 74 resolver: Arc<dyn PublishRelayResolver>, 75 author_relay_discovery: Arc<dyn PublishAuthorRelayDiscovery>, 76 publish_jobs: Arc<Semaphore>, 77 } 78 79 impl PublishProxy { 80 pub fn open(config: PublishProxyConfig) -> Result<Self, PublishProxyError> { 81 let store = PublishProxyStore::open(config.database_path.clone())?; 82 let publish_jobs = Arc::new(Semaphore::new(config.max_concurrent_publish_jobs)); 83 Ok(Self { 84 config, 85 store, 86 publisher: None, 87 resolver: Arc::new(SystemPublishRelayResolver), 88 author_relay_discovery: Arc::new(NostrPublishAuthorRelayDiscovery), 89 publish_jobs, 90 }) 91 } 92 93 pub fn memory(config: PublishProxyConfig) -> Result<Self, PublishProxyError> { 94 let store = PublishProxyStore::memory()?; 95 let publish_jobs = Arc::new(Semaphore::new(config.max_concurrent_publish_jobs)); 96 Ok(Self { 97 config, 98 store, 99 publisher: None, 100 resolver: Arc::new(SystemPublishRelayResolver), 101 author_relay_discovery: Arc::new(NostrPublishAuthorRelayDiscovery), 102 publish_jobs, 103 }) 104 } 105 106 pub fn with_publisher(mut self, publisher: Arc<dyn RadrootsRelayPublishAdapter>) -> Self { 107 self.publisher = Some(publisher); 108 self 109 } 110 111 #[cfg(test)] 112 pub(crate) fn with_relay_resolver(mut self, resolver: Arc<dyn PublishRelayResolver>) -> Self { 113 self.resolver = resolver; 114 self 115 } 116 117 #[cfg(test)] 118 fn with_author_relay_discovery( 119 mut self, 120 author_relay_discovery: Arc<dyn PublishAuthorRelayDiscovery>, 121 ) -> Self { 122 self.author_relay_discovery = author_relay_discovery; 123 self 124 } 125 126 fn acquire_publish_permit(&self) -> Result<OwnedSemaphorePermit, PublishProxyError> { 127 self.publish_jobs 128 .clone() 129 .try_acquire_owned() 130 .map_err(|_| PublishProxyError::ConcurrencyLimit) 131 } 132 133 pub async fn publish_event( 134 &self, 135 principal: &PublishPrincipal, 136 request: PublishEventRequest, 137 ) -> Result<PublishEventResponse, PublishProxyError> { 138 request 139 .validate(self.config.max_relays_per_request) 140 .map_err(|error| { 141 PublishProxyError::InvalidSignedEvent(format!( 142 "publish request validation failed: {error}" 143 )) 144 })?; 145 principal.allows_event(&request)?; 146 let signed_event = signed_event_from_wire(&request.event)?; 147 if signed_event.raw_json.len() > self.config.max_event_bytes { 148 return Err(PublishProxyError::InvalidSignedEvent( 149 "signed event exceeds publish_proxy max_event_bytes".to_owned(), 150 )); 151 } 152 let effective_timeout_ms = effective_publish_timeout_ms(&self.config, request.timeout_ms)?; 153 let _permit = self.acquire_publish_permit()?; 154 let request_fingerprint = request_intent_fingerprint( 155 principal.principal_id.as_str(), 156 signed_event.raw_json.as_str(), 157 &request, 158 effective_timeout_ms, 159 )?; 160 let resolution = self 161 .resolve_relays_for_request(signed_event.pubkey.as_str(), &request) 162 .await?; 163 let response = self.store.record_publish_job(PublishJobInsert { 164 principal_id: principal.principal_id.clone(), 165 idempotency_key: request.idempotency_key.clone(), 166 request: request.clone(), 167 request_fingerprint, 168 effective_relay_count: resolution.targets.len(), 169 })?; 170 if response.deduplicated { 171 return Ok(response); 172 } 173 let completed = self 174 .complete_job_execution( 175 response.job.job_id.as_str(), 176 signed_event, 177 request.delivery_policy.clone(), 178 effective_timeout_ms, 179 resolution, 180 ) 181 .await?; 182 Ok(PublishEventResponse { 183 deduplicated: false, 184 job: completed, 185 }) 186 } 187 188 pub async fn resolve_relays_for_request( 189 &self, 190 pubkey: &str, 191 request: &PublishEventRequest, 192 ) -> Result<PublishRelayResolution, PublishProxyError> { 193 match request.relay_policy { 194 PublishRelayPolicy::ExplicitOnly => self.resolve_request_relays(&request.relays).await, 195 PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault => { 196 if !request.relays.is_empty() { 197 self.resolve_request_relays(&request.relays).await 198 } else { 199 self.resolve_author_or_default_relays(pubkey).await 200 } 201 } 202 PublishRelayPolicy::AuthorWriteThenDaemonDefault => { 203 self.resolve_author_or_default_relays(pubkey).await 204 } 205 PublishRelayPolicy::DaemonDefaultOnly => self.resolve_daemon_default_relays().await, 206 } 207 } 208 209 async fn resolve_author_or_default_relays( 210 &self, 211 pubkey: &str, 212 ) -> Result<PublishRelayResolution, PublishProxyError> { 213 let mut author_relays = self.resolve_author_write_relays(pubkey).await?; 214 if author_relays.targets.is_empty() { 215 let mut daemon_defaults = self.resolve_daemon_default_relays().await?; 216 daemon_defaults.outcomes.append(&mut author_relays.outcomes); 217 Ok(daemon_defaults) 218 } else { 219 Ok(author_relays) 220 } 221 } 222 223 async fn resolve_request_relays( 224 &self, 225 relays: &[String], 226 ) -> Result<PublishRelayResolution, PublishProxyError> { 227 let mut targets = Vec::new(); 228 let mut outcomes = Vec::new(); 229 for relay in relays { 230 match RadrootsRelayUrl::parse(relay, relay_url_policy(&self.config)) { 231 Ok(url) => { 232 self.push_checked_relay_target( 233 &mut targets, 234 &mut outcomes, 235 url, 236 PublishRelaySource::Request, 237 ) 238 .await; 239 } 240 Err(error) => outcomes.push(PublishRelayOutcome { 241 relay_url: relay.trim().to_owned(), 242 source: PublishRelaySource::Request, 243 attempted: false, 244 outcome_kind: PublishRelayOutcomeKind::RelayUrlRejected, 245 message: Some(error.to_string()), 246 latency_ms: None, 247 }), 248 } 249 } 250 Ok(PublishRelayResolution { targets, outcomes }) 251 } 252 253 async fn resolve_author_write_relays( 254 &self, 255 pubkey: &str, 256 ) -> Result<PublishRelayResolution, PublishProxyError> { 257 let cached = self.store.cached_author_write_relays(pubkey)?; 258 let mut cached_resolution = self.resolve_author_relay_inputs(&cached).await?; 259 if !cached_resolution.targets.is_empty() { 260 return Ok(cached_resolution); 261 } 262 if self.config.author_relay_discovery_relays.is_empty() { 263 return Ok(cached_resolution); 264 } 265 let mut discovery_targets = self 266 .resolve_config_relays( 267 &self.config.author_relay_discovery_relays, 268 PublishRelaySource::DaemonDefault, 269 ) 270 .await?; 271 if discovery_targets.targets.is_empty() { 272 discovery_targets 273 .outcomes 274 .append(&mut cached_resolution.outcomes); 275 return Ok(discovery_targets); 276 } 277 let discovered = self 278 .author_relay_discovery 279 .fetch_author_write_relays( 280 pubkey, 281 std::mem::take(&mut discovery_targets.targets), 282 self.config.connect_timeout_secs, 283 ) 284 .await?; 285 self.store.cache_author_write_relays(pubkey, &discovered)?; 286 let mut discovered_resolution = self.resolve_author_relay_inputs(&discovered).await?; 287 discovered_resolution 288 .outcomes 289 .append(&mut cached_resolution.outcomes); 290 discovered_resolution 291 .outcomes 292 .append(&mut discovery_targets.outcomes); 293 Ok(discovered_resolution) 294 } 295 296 async fn resolve_author_relay_inputs( 297 &self, 298 relays: &[String], 299 ) -> Result<PublishRelayResolution, PublishProxyError> { 300 let mut targets = Vec::new(); 301 let mut outcomes = Vec::new(); 302 for relay in relays { 303 match RadrootsRelayUrl::parse(relay, relay_url_policy(&self.config)) { 304 Ok(url) => { 305 self.push_checked_relay_target( 306 &mut targets, 307 &mut outcomes, 308 url, 309 PublishRelaySource::AuthorWrite, 310 ) 311 .await; 312 } 313 Err(error) => outcomes.push(PublishRelayOutcome { 314 relay_url: relay.trim().to_owned(), 315 source: PublishRelaySource::AuthorWrite, 316 attempted: false, 317 outcome_kind: PublishRelayOutcomeKind::RelayUrlRejected, 318 message: Some(error.to_string()), 319 latency_ms: None, 320 }), 321 } 322 } 323 Ok(PublishRelayResolution { targets, outcomes }) 324 } 325 326 async fn resolve_daemon_default_relays( 327 &self, 328 ) -> Result<PublishRelayResolution, PublishProxyError> { 329 self.resolve_config_relays( 330 &self.config.daemon_default_publish_relays, 331 PublishRelaySource::DaemonDefault, 332 ) 333 .await 334 } 335 336 async fn resolve_config_relays( 337 &self, 338 relays: &[String], 339 source: PublishRelaySource, 340 ) -> Result<PublishRelayResolution, PublishProxyError> { 341 let mut targets = Vec::new(); 342 let mut outcomes = Vec::new(); 343 for relay in relays { 344 match RadrootsRelayUrl::parse(relay, relay_url_policy(&self.config)) { 345 Ok(url) => { 346 self.push_checked_relay_target(&mut targets, &mut outcomes, url, source) 347 .await; 348 } 349 Err(error) => outcomes.push(PublishRelayOutcome { 350 relay_url: relay.trim().to_owned(), 351 source, 352 attempted: false, 353 outcome_kind: PublishRelayOutcomeKind::RelayUrlRejected, 354 message: Some(error.to_string()), 355 latency_ms: None, 356 }), 357 } 358 } 359 Ok(PublishRelayResolution { targets, outcomes }) 360 } 361 362 async fn push_checked_relay_target( 363 &self, 364 targets: &mut Vec<ResolvedPublishRelay>, 365 outcomes: &mut Vec<PublishRelayOutcome>, 366 url: RadrootsRelayUrl, 367 source: PublishRelaySource, 368 ) { 369 if relay_url_policy(&self.config) == RadrootsRelayUrlPolicy::Localhost { 370 push_resolved_relay(targets, url, source); 371 return; 372 } 373 match self.resolver.resolve(&url).await { 374 Ok(addresses) if addresses.is_empty() => { 375 outcomes.push(relay_resolution_connection_failure( 376 url.as_str(), 377 source, 378 "dns lookup returned no addresses", 379 )); 380 } 381 Ok(addresses) => match url.validate_public_resolved_ip_addrs(addresses) { 382 Ok(()) => push_resolved_relay(targets, url, source), 383 Err(error) => outcomes.push(PublishRelayOutcome { 384 relay_url: url.as_str().to_owned(), 385 source, 386 attempted: false, 387 outcome_kind: PublishRelayOutcomeKind::RelayUrlRejected, 388 message: Some(error.to_string()), 389 latency_ms: None, 390 }), 391 }, 392 Err(error) => outcomes.push(relay_resolution_connection_failure( 393 url.as_str(), 394 source, 395 format!("dns lookup failed: {error}"), 396 )), 397 } 398 } 399 400 async fn complete_job_execution( 401 &self, 402 job_id: &str, 403 signed_event: RadrootsSignedNostrEvent, 404 delivery_policy: PublishDeliveryPolicy, 405 timeout_ms: u64, 406 resolution: PublishRelayResolution, 407 ) -> Result<PublishJobView, PublishProxyError> { 408 if resolution.targets.is_empty() { 409 let status = if resolution 410 .outcomes 411 .iter() 412 .any(|outcome| outcome.outcome_kind.is_retryable()) 413 { 414 PublishJobStatus::DeliveryUnsatisfiedRetryable 415 } else { 416 PublishJobStatus::Rejected 417 }; 418 let last_error = if status == PublishJobStatus::DeliveryUnsatisfiedRetryable { 419 "delivery_unsatisfied" 420 } else { 421 "no_publish_relays" 422 }; 423 self.store.complete_publish_job( 424 job_id, 425 status, 426 resolution.outcomes, 427 Some(last_error.to_owned()), 428 )?; 429 return self.store.job_by_id(job_id); 430 } 431 let required_ack_count = delivery_policy.required_ack_count(resolution.targets.len()); 432 if required_ack_count > resolution.targets.len() { 433 self.store.complete_publish_job( 434 job_id, 435 PublishJobStatus::Rejected, 436 resolution.outcomes, 437 Some("delivery_quorum_exceeds_relay_count".to_owned()), 438 )?; 439 return self.store.job_by_id(job_id); 440 } 441 let source_by_relay = resolution.source_by_relay(); 442 let target_set = RadrootsRelayTargetSet::from_urls( 443 resolution 444 .targets 445 .iter() 446 .map(|target| target.url.clone()) 447 .collect(), 448 )?; 449 let publish_request = 450 RadrootsRelayPublishRequest::new(signed_event, target_set, current_unix_millis()) 451 .with_accepted_quorum(required_ack_count); 452 let started = Instant::now(); 453 let publish_timeout = Duration::from_millis(timeout_ms); 454 let receipts = 455 match tokio::time::timeout(publish_timeout, self.publish_with_adapter(publish_request)) 456 .await 457 { 458 Ok(Ok(receipts)) => receipts, 459 Ok(Err(error)) => transport_error_receipts(&resolution.targets, error), 460 Err(_) => timeout_receipts(&resolution.targets), 461 }; 462 let latency_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX); 463 let mut outcomes = resolution.outcomes; 464 outcomes.extend(receipts.into_iter().map(|receipt| { 465 publish_outcome_from_receipt(receipt, &source_by_relay, Some(latency_ms)) 466 })); 467 let status = delivery_status(&delivery_policy, resolution.targets.len(), &outcomes); 468 let last_error = if status == PublishJobStatus::DeliverySatisfied { 469 None 470 } else { 471 Some("delivery_unsatisfied".to_owned()) 472 }; 473 self.store 474 .complete_publish_job(job_id, status, outcomes, last_error)?; 475 self.store.job_by_id(job_id) 476 } 477 478 async fn publish_with_adapter( 479 &self, 480 request: RadrootsRelayPublishRequest, 481 ) -> Result<Vec<RadrootsRelayPublishRelayReceipt>, PublishProxyError> { 482 if let Some(publisher) = &self.publisher { 483 return publisher 484 .publish(request) 485 .await 486 .map_err(PublishProxyError::Relay); 487 } 488 let adapter = RadrootsNostrClientPublishAdapter::new(RadrootsNostrClient::new_signerless()); 489 adapter 490 .publish(request) 491 .await 492 .map_err(PublishProxyError::Relay) 493 } 494 } 495 496 #[derive(Clone)] 497 pub struct PublishProxyStore { 498 inner: Arc<Mutex<Connection>>, 499 } 500 501 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 502 #[serde(rename_all = "snake_case")] 503 pub enum PublishJobVisibility { 504 Own, 505 Admin, 506 } 507 508 impl FromStr for PublishJobVisibility { 509 type Err = PublishProxyError; 510 511 fn from_str(value: &str) -> Result<Self, Self::Err> { 512 match value { 513 "own" => Ok(Self::Own), 514 "admin" => Ok(Self::Admin), 515 other => Err(PublishProxyError::InvalidScope(format!( 516 "unknown job visibility `{other}`" 517 ))), 518 } 519 } 520 } 521 522 impl fmt::Display for PublishJobVisibility { 523 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 524 match self { 525 Self::Own => f.write_str("own"), 526 Self::Admin => f.write_str("admin"), 527 } 528 } 529 } 530 531 #[derive(Debug, Clone, PartialEq, Eq)] 532 pub struct PublishPrincipalInit { 533 pub label: String, 534 pub token_hash: String, 535 pub allowed_pubkeys: Vec<String>, 536 pub allowed_kinds: Vec<u32>, 537 pub allowed_relay_policies: Vec<PublishRelayPolicy>, 538 pub allow_request_relays: bool, 539 pub job_visibility: PublishJobVisibility, 540 pub expires_at_unix: Option<i64>, 541 } 542 543 #[derive(Debug, Clone, PartialEq, Eq)] 544 pub struct PublishPrincipal { 545 pub principal_id: String, 546 pub label: String, 547 pub allowed_pubkeys: Vec<String>, 548 pub allowed_kinds: Vec<u32>, 549 pub allowed_relay_policies: Vec<PublishRelayPolicy>, 550 pub allow_request_relays: bool, 551 pub job_visibility: PublishJobVisibility, 552 pub expires_at_unix: Option<i64>, 553 } 554 555 impl PublishPrincipal { 556 pub fn allows_event(&self, request: &PublishEventRequest) -> Result<(), PublishProxyError> { 557 ensure_lower_hex("pubkey", request.event.pubkey.as_str(), 64)?; 558 if !self 559 .allowed_pubkeys 560 .iter() 561 .any(|pubkey| pubkey == &request.event.pubkey) 562 { 563 return Err(PublishProxyError::InvalidScope( 564 "principal is not allowed to publish for event pubkey".to_owned(), 565 )); 566 } 567 if !self.allowed_kinds.contains(&request.event.kind) { 568 return Err(PublishProxyError::InvalidScope( 569 "principal is not allowed to publish event kind".to_owned(), 570 )); 571 } 572 if !self.allowed_relay_policies.contains(&request.relay_policy) { 573 return Err(PublishProxyError::InvalidScope( 574 "principal is not allowed to use requested relay policy".to_owned(), 575 )); 576 } 577 if !self.allow_request_relays && !request.relays.is_empty() { 578 return Err(PublishProxyError::InvalidScope( 579 "principal is not allowed to provide request relays".to_owned(), 580 )); 581 } 582 Ok(()) 583 } 584 585 fn can_read_job(&self, principal_id: &str) -> bool { 586 self.job_visibility == PublishJobVisibility::Admin || self.principal_id == principal_id 587 } 588 } 589 590 #[derive(Debug, Clone)] 591 pub struct PublishJobInsert { 592 pub principal_id: String, 593 pub idempotency_key: Option<String>, 594 pub request: PublishEventRequest, 595 pub request_fingerprint: String, 596 pub effective_relay_count: usize, 597 } 598 599 #[derive(Debug, Clone, PartialEq, Eq)] 600 pub struct ResolvedPublishRelay { 601 pub url: RadrootsRelayUrl, 602 pub source: PublishRelaySource, 603 } 604 605 #[derive(Debug, Clone, PartialEq, Eq)] 606 pub struct PublishRelayResolution { 607 pub targets: Vec<ResolvedPublishRelay>, 608 pub outcomes: Vec<PublishRelayOutcome>, 609 } 610 611 impl PublishRelayResolution { 612 fn source_by_relay(&self) -> BTreeMap<String, PublishRelaySource> { 613 self.targets 614 .iter() 615 .map(|target| (target.url.as_str().to_owned(), target.source)) 616 .collect() 617 } 618 } 619 620 pub(crate) type PublishRelayResolveFuture<'a> = 621 Pin<Box<dyn Future<Output = Result<Vec<IpAddr>, std::io::Error>> + Send + 'a>>; 622 623 pub(crate) trait PublishRelayResolver: Send + Sync { 624 fn resolve<'a>(&'a self, url: &'a RadrootsRelayUrl) -> PublishRelayResolveFuture<'a>; 625 } 626 627 type PublishAuthorRelayDiscoveryFuture<'a> = 628 Pin<Box<dyn Future<Output = Result<Vec<String>, PublishProxyError>> + Send + 'a>>; 629 630 trait PublishAuthorRelayDiscovery: Send + Sync { 631 fn fetch_author_write_relays<'a>( 632 &'a self, 633 pubkey: &'a str, 634 discovery_targets: Vec<ResolvedPublishRelay>, 635 connect_timeout_secs: u64, 636 ) -> PublishAuthorRelayDiscoveryFuture<'a>; 637 } 638 639 #[derive(Debug)] 640 struct SystemPublishRelayResolver; 641 642 impl PublishRelayResolver for SystemPublishRelayResolver { 643 fn resolve<'a>(&'a self, url: &'a RadrootsRelayUrl) -> PublishRelayResolveFuture<'a> { 644 Box::pin(async move { 645 let (host, port) = relay_socket_target(url)?; 646 let addrs = tokio::net::lookup_host((host.as_str(), port)).await?; 647 Ok(addrs.map(|addr| addr.ip()).collect()) 648 }) 649 } 650 } 651 652 #[derive(Debug)] 653 struct NostrPublishAuthorRelayDiscovery; 654 655 impl PublishAuthorRelayDiscovery for NostrPublishAuthorRelayDiscovery { 656 fn fetch_author_write_relays<'a>( 657 &'a self, 658 pubkey: &'a str, 659 discovery_targets: Vec<ResolvedPublishRelay>, 660 connect_timeout_secs: u64, 661 ) -> PublishAuthorRelayDiscoveryFuture<'a> { 662 Box::pin(async move { 663 let Ok(public_key) = RadrootsNostrPublicKey::from_hex(pubkey) else { 664 return Ok(Vec::new()); 665 }; 666 let client = RadrootsNostrClient::new_signerless(); 667 for target in discovery_targets { 668 if client.add_read_relay(target.url.as_str()).await.is_err() { 669 return Ok(Vec::new()); 670 } 671 } 672 let filter = RadrootsNostrFilter::new() 673 .author(public_key) 674 .kind(RadrootsNostrKind::Custom(10_002)) 675 .limit(10); 676 let timeout = Duration::from_secs(connect_timeout_secs); 677 let Ok(events) = client.fetch_events(filter, timeout).await else { 678 return Ok(Vec::new()); 679 }; 680 let Some(event) = events.into_iter().max_by(|left, right| { 681 left.created_at 682 .as_secs() 683 .cmp(&right.created_at.as_secs()) 684 .then_with(|| left.id.to_hex().cmp(&right.id.to_hex())) 685 }) else { 686 return Ok(Vec::new()); 687 }; 688 Ok(author_write_relays_from_nip65_event(&event)) 689 }) 690 } 691 } 692 693 impl PublishProxyStore { 694 pub fn open(path: PathBuf) -> Result<Self, PublishProxyError> { 695 if let Some(parent) = path 696 .parent() 697 .filter(|parent| !parent.as_os_str().is_empty()) 698 { 699 std::fs::create_dir_all(parent)?; 700 } 701 let connection = Connection::open(path)?; 702 Self::from_connection(connection) 703 } 704 705 pub fn memory() -> Result<Self, PublishProxyError> { 706 Self::from_connection(Connection::open_in_memory()?) 707 } 708 709 fn from_connection(connection: Connection) -> Result<Self, PublishProxyError> { 710 connection.execute_batch( 711 r#" 712 PRAGMA foreign_keys = ON; 713 CREATE TABLE IF NOT EXISTS publish_proxy_principals ( 714 principal_id TEXT PRIMARY KEY NOT NULL, 715 label TEXT NOT NULL, 716 token_hash TEXT NOT NULL UNIQUE, 717 allowed_pubkeys_json TEXT NOT NULL, 718 allowed_kinds_json TEXT NOT NULL, 719 allowed_relay_policies_json TEXT NOT NULL, 720 allow_request_relays INTEGER NOT NULL, 721 job_visibility TEXT NOT NULL, 722 expires_at_unix INTEGER, 723 revoked_at_unix INTEGER, 724 created_at_unix INTEGER NOT NULL 725 ); 726 CREATE TABLE IF NOT EXISTS publish_proxy_jobs ( 727 job_id TEXT PRIMARY KEY NOT NULL, 728 principal_id TEXT NOT NULL, 729 idempotency_key TEXT, 730 request_fingerprint TEXT NOT NULL, 731 status TEXT NOT NULL, 732 event_id TEXT NOT NULL, 733 event_pubkey TEXT NOT NULL, 734 event_kind INTEGER NOT NULL, 735 relay_policy_json TEXT NOT NULL, 736 delivery_policy_json TEXT NOT NULL, 737 requested_relay_count INTEGER NOT NULL, 738 effective_relay_count INTEGER NOT NULL, 739 request_json TEXT NOT NULL, 740 requested_at_ms INTEGER NOT NULL, 741 updated_at_ms INTEGER NOT NULL, 742 completed_at_ms INTEGER, 743 last_error TEXT, 744 FOREIGN KEY(principal_id) REFERENCES publish_proxy_principals(principal_id) 745 ); 746 CREATE UNIQUE INDEX IF NOT EXISTS publish_proxy_jobs_principal_idempotency_idx 747 ON publish_proxy_jobs(principal_id, idempotency_key) 748 WHERE idempotency_key IS NOT NULL; 749 CREATE TABLE IF NOT EXISTS publish_proxy_relay_results ( 750 job_id TEXT NOT NULL, 751 relay_url TEXT NOT NULL, 752 source TEXT NOT NULL, 753 attempted INTEGER NOT NULL, 754 outcome_kind TEXT NOT NULL, 755 message TEXT, 756 latency_ms INTEGER, 757 updated_at_ms INTEGER NOT NULL, 758 PRIMARY KEY(job_id, relay_url), 759 FOREIGN KEY(job_id) REFERENCES publish_proxy_jobs(job_id) 760 ); 761 CREATE TABLE IF NOT EXISTS publish_proxy_relay_list_cache ( 762 pubkey TEXT PRIMARY KEY NOT NULL, 763 relays_json TEXT NOT NULL, 764 updated_at_ms INTEGER NOT NULL 765 ); 766 "#, 767 )?; 768 migrate_schema(&connection)?; 769 recover_interrupted_publish_jobs(&connection)?; 770 connection.pragma_update(None, "user_version", SCHEMA_VERSION)?; 771 Ok(Self { 772 inner: Arc::new(Mutex::new(connection)), 773 }) 774 } 775 776 pub fn create_principal( 777 &self, 778 input: PublishPrincipalInit, 779 ) -> Result<PublishPrincipal, PublishProxyError> { 780 validate_principal_init(&input)?; 781 let principal_id = Uuid::new_v4().to_string(); 782 let now = current_unix_secs(); 783 let connection = self 784 .inner 785 .lock() 786 .unwrap_or_else(std::sync::PoisonError::into_inner); 787 connection.execute( 788 r#" 789 INSERT INTO publish_proxy_principals ( 790 principal_id, 791 label, 792 token_hash, 793 allowed_pubkeys_json, 794 allowed_kinds_json, 795 allowed_relay_policies_json, 796 allow_request_relays, 797 job_visibility, 798 expires_at_unix, 799 revoked_at_unix, 800 created_at_unix 801 ) 802 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, NULL, ?10) 803 "#, 804 params![ 805 principal_id, 806 input.label.trim(), 807 input.token_hash, 808 serde_json::to_string(&input.allowed_pubkeys)?, 809 serde_json::to_string(&input.allowed_kinds)?, 810 serde_json::to_string(&input.allowed_relay_policies)?, 811 input.allow_request_relays, 812 input.job_visibility.to_string(), 813 input.expires_at_unix, 814 now, 815 ], 816 )?; 817 drop(connection); 818 self.principal_by_id(principal_id.as_str())? 819 .ok_or_else(|| PublishProxyError::InvalidScope("created principal missing".to_owned())) 820 } 821 822 pub fn principal_for_token_hash( 823 &self, 824 token_hash: &str, 825 ) -> Result<Option<PublishPrincipal>, PublishProxyError> { 826 let now = current_unix_secs(); 827 let connection = self 828 .inner 829 .lock() 830 .unwrap_or_else(std::sync::PoisonError::into_inner); 831 let principal = connection 832 .query_row( 833 r#" 834 SELECT 835 principal_id, 836 label, 837 allowed_pubkeys_json, 838 allowed_kinds_json, 839 allowed_relay_policies_json, 840 allow_request_relays, 841 job_visibility, 842 expires_at_unix 843 FROM publish_proxy_principals 844 WHERE token_hash = ?1 845 AND revoked_at_unix IS NULL 846 AND (expires_at_unix IS NULL OR expires_at_unix > ?2) 847 "#, 848 params![token_hash, now], 849 principal_from_row, 850 ) 851 .optional()?; 852 Ok(principal) 853 } 854 855 pub fn principal_by_id( 856 &self, 857 principal_id: &str, 858 ) -> Result<Option<PublishPrincipal>, PublishProxyError> { 859 let connection = self 860 .inner 861 .lock() 862 .unwrap_or_else(std::sync::PoisonError::into_inner); 863 let principal = connection 864 .query_row( 865 r#" 866 SELECT 867 principal_id, 868 label, 869 allowed_pubkeys_json, 870 allowed_kinds_json, 871 allowed_relay_policies_json, 872 allow_request_relays, 873 job_visibility, 874 expires_at_unix 875 FROM publish_proxy_principals 876 WHERE principal_id = ?1 877 "#, 878 params![principal_id], 879 principal_from_row, 880 ) 881 .optional()?; 882 Ok(principal) 883 } 884 885 pub fn record_publish_job( 886 &self, 887 insert: PublishJobInsert, 888 ) -> Result<PublishEventResponse, PublishProxyError> { 889 if let Some(idempotency_key) = insert.idempotency_key.as_deref() { 890 if let Some(existing) = 891 self.job_for_principal_id_and_key(insert.principal_id.as_str(), idempotency_key)? 892 { 893 if existing.request_fingerprint != insert.request_fingerprint { 894 return Err(PublishProxyError::IdempotencyConflict( 895 idempotency_key.to_owned(), 896 )); 897 } 898 return Ok(PublishEventResponse { 899 deduplicated: true, 900 job: existing.view, 901 }); 902 } 903 } 904 905 let job_id = Uuid::new_v4().to_string(); 906 let now = current_unix_millis(); 907 let request_json = serde_json::to_string(&insert.request)?; 908 let connection = self 909 .inner 910 .lock() 911 .unwrap_or_else(std::sync::PoisonError::into_inner); 912 let insert_result = connection.execute( 913 r#" 914 INSERT INTO publish_proxy_jobs ( 915 job_id, 916 principal_id, 917 idempotency_key, 918 request_fingerprint, 919 status, 920 event_id, 921 event_pubkey, 922 event_kind, 923 relay_policy_json, 924 delivery_policy_json, 925 requested_relay_count, 926 effective_relay_count, 927 request_json, 928 requested_at_ms, 929 updated_at_ms 930 ) 931 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15) 932 "#, 933 params![ 934 job_id, 935 insert.principal_id, 936 insert.idempotency_key, 937 insert.request_fingerprint, 938 serde_json::to_string(&PublishJobStatus::Publishing)?, 939 insert.request.event.id, 940 insert.request.event.pubkey, 941 insert.request.event.kind, 942 serde_json::to_string(&insert.request.relay_policy)?, 943 serde_json::to_string(&insert.request.delivery_policy)?, 944 insert.request.relays.len(), 945 insert.effective_relay_count, 946 request_json, 947 now, 948 now, 949 ], 950 ); 951 match insert_result { 952 Ok(_) => {} 953 Err(rusqlite::Error::SqliteFailure(error, _)) 954 if error.code == rusqlite::ErrorCode::ConstraintViolation => 955 { 956 return Err(PublishProxyError::IdempotencyConflict( 957 "idempotency key conflicts with an existing publish job".to_owned(), 958 )); 959 } 960 Err(error) => return Err(error.into()), 961 } 962 drop(connection); 963 let job = self.job_by_id(job_id.as_str())?; 964 Ok(PublishEventResponse { 965 deduplicated: false, 966 job, 967 }) 968 } 969 970 pub fn job_by_id_for_principal( 971 &self, 972 job_id: &str, 973 principal: &PublishPrincipal, 974 ) -> Result<Option<PublishJobView>, PublishProxyError> { 975 let connection = self 976 .inner 977 .lock() 978 .unwrap_or_else(std::sync::PoisonError::into_inner); 979 let sql = job_select_sql("WHERE job_id = ?1"); 980 let row = connection 981 .query_row(sql.as_str(), params![job_id], job_from_row) 982 .optional()?; 983 drop(connection); 984 let Some(mut job) = row else { 985 return Ok(None); 986 }; 987 if !principal.can_read_job(job.principal_id.as_str()) { 988 return Ok(None); 989 } 990 job.view.relays = self.relay_outcomes(job.view.job_id.as_str())?; 991 finalize_job_view(&mut job.view); 992 Ok(Some(job.view)) 993 } 994 995 pub fn list_jobs_for_principal( 996 &self, 997 principal: &PublishPrincipal, 998 limit: usize, 999 ) -> Result<Vec<PublishJobView>, PublishProxyError> { 1000 let limit = i64::try_from(limit.clamp(1, 200)).unwrap_or(200); 1001 let connection = self 1002 .inner 1003 .lock() 1004 .unwrap_or_else(std::sync::PoisonError::into_inner); 1005 let sql = if principal.job_visibility == PublishJobVisibility::Admin { 1006 job_select_sql("ORDER BY requested_at_ms DESC, job_id DESC LIMIT ?1") 1007 } else { 1008 job_select_sql( 1009 "WHERE principal_id = ?1 ORDER BY requested_at_ms DESC, job_id DESC LIMIT ?2", 1010 ) 1011 }; 1012 let mut stmt = connection.prepare(sql.as_str())?; 1013 let rows = if principal.job_visibility == PublishJobVisibility::Admin { 1014 stmt.query_map(params![limit], job_from_row)? 1015 .collect::<Result<Vec<_>, _>>()? 1016 } else { 1017 stmt.query_map(params![principal.principal_id, limit], job_from_row)? 1018 .collect::<Result<Vec<_>, _>>()? 1019 }; 1020 drop(stmt); 1021 drop(connection); 1022 1023 rows.into_iter() 1024 .map(|mut row| { 1025 row.view.relays = self.relay_outcomes(row.view.job_id.as_str())?; 1026 finalize_job_view(&mut row.view); 1027 Ok(row.view) 1028 }) 1029 .collect() 1030 } 1031 1032 fn job_for_principal_id_and_key( 1033 &self, 1034 principal_id: &str, 1035 idempotency_key: &str, 1036 ) -> Result<Option<PublishJobRow>, PublishProxyError> { 1037 let connection = self 1038 .inner 1039 .lock() 1040 .unwrap_or_else(std::sync::PoisonError::into_inner); 1041 let sql = job_select_sql("WHERE principal_id = ?1 AND idempotency_key = ?2"); 1042 let row = connection 1043 .query_row( 1044 sql.as_str(), 1045 params![principal_id, idempotency_key], 1046 job_from_row, 1047 ) 1048 .optional()?; 1049 drop(connection); 1050 let Some(mut job) = row else { 1051 return Ok(None); 1052 }; 1053 job.view.relays = self.relay_outcomes(job.view.job_id.as_str())?; 1054 finalize_job_view(&mut job.view); 1055 Ok(Some(job)) 1056 } 1057 1058 pub fn job_by_id(&self, job_id: &str) -> Result<PublishJobView, PublishProxyError> { 1059 let connection = self 1060 .inner 1061 .lock() 1062 .unwrap_or_else(std::sync::PoisonError::into_inner); 1063 let sql = job_select_sql("WHERE job_id = ?1"); 1064 let row = connection 1065 .query_row(sql.as_str(), params![job_id], job_from_row) 1066 .optional()?; 1067 drop(connection); 1068 let Some(mut job) = row else { 1069 return Err(PublishProxyError::InvalidScope( 1070 "unknown publish job".to_owned(), 1071 )); 1072 }; 1073 job.view.relays = self.relay_outcomes(job.view.job_id.as_str())?; 1074 finalize_job_view(&mut job.view); 1075 Ok(job.view) 1076 } 1077 1078 pub fn complete_publish_job( 1079 &self, 1080 job_id: &str, 1081 status: PublishJobStatus, 1082 outcomes: Vec<PublishRelayOutcome>, 1083 last_error: Option<String>, 1084 ) -> Result<(), PublishProxyError> { 1085 let now = current_unix_millis(); 1086 let connection = self 1087 .inner 1088 .lock() 1089 .unwrap_or_else(std::sync::PoisonError::into_inner); 1090 connection.execute( 1091 r#" 1092 UPDATE publish_proxy_jobs 1093 SET status = ?2, 1094 updated_at_ms = ?3, 1095 completed_at_ms = ?4, 1096 last_error = ?5 1097 WHERE job_id = ?1 1098 "#, 1099 params![ 1100 job_id, 1101 serde_json::to_string(&status)?, 1102 now, 1103 now, 1104 last_error, 1105 ], 1106 )?; 1107 connection.execute( 1108 "DELETE FROM publish_proxy_relay_results WHERE job_id = ?1", 1109 params![job_id], 1110 )?; 1111 for outcome in outcomes { 1112 connection.execute( 1113 r#" 1114 INSERT OR REPLACE INTO publish_proxy_relay_results ( 1115 job_id, 1116 relay_url, 1117 source, 1118 attempted, 1119 outcome_kind, 1120 message, 1121 latency_ms, 1122 updated_at_ms 1123 ) 1124 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) 1125 "#, 1126 params![ 1127 job_id, 1128 outcome.relay_url, 1129 serde_json::to_string(&outcome.source)?, 1130 outcome.attempted, 1131 serde_json::to_string(&outcome.outcome_kind)?, 1132 outcome.message, 1133 outcome 1134 .latency_ms 1135 .and_then(|value| i64::try_from(value).ok()), 1136 now, 1137 ], 1138 )?; 1139 } 1140 Ok(()) 1141 } 1142 1143 pub fn cached_author_write_relays( 1144 &self, 1145 pubkey: &str, 1146 ) -> Result<Vec<String>, PublishProxyError> { 1147 let connection = self 1148 .inner 1149 .lock() 1150 .unwrap_or_else(std::sync::PoisonError::into_inner); 1151 let relays_json = connection 1152 .query_row( 1153 "SELECT relays_json FROM publish_proxy_relay_list_cache WHERE pubkey = ?1", 1154 params![pubkey], 1155 |row| row.get::<_, String>(0), 1156 ) 1157 .optional()?; 1158 relays_json 1159 .map(|value| serde_json::from_str(value.as_str()).map_err(PublishProxyError::from)) 1160 .unwrap_or_else(|| Ok(Vec::new())) 1161 } 1162 1163 pub fn cache_author_write_relays( 1164 &self, 1165 pubkey: &str, 1166 relays: &[String], 1167 ) -> Result<(), PublishProxyError> { 1168 let now = current_unix_millis(); 1169 let connection = self 1170 .inner 1171 .lock() 1172 .unwrap_or_else(std::sync::PoisonError::into_inner); 1173 connection.execute( 1174 r#" 1175 INSERT INTO publish_proxy_relay_list_cache (pubkey, relays_json, updated_at_ms) 1176 VALUES (?1, ?2, ?3) 1177 ON CONFLICT(pubkey) DO UPDATE SET 1178 relays_json = excluded.relays_json, 1179 updated_at_ms = excluded.updated_at_ms 1180 "#, 1181 params![pubkey, serde_json::to_string(relays)?, now], 1182 )?; 1183 Ok(()) 1184 } 1185 1186 fn relay_outcomes(&self, job_id: &str) -> Result<Vec<PublishRelayOutcome>, PublishProxyError> { 1187 let connection = self 1188 .inner 1189 .lock() 1190 .unwrap_or_else(std::sync::PoisonError::into_inner); 1191 let mut stmt = connection.prepare( 1192 r#" 1193 SELECT relay_url, source, attempted, outcome_kind, message, latency_ms 1194 FROM publish_proxy_relay_results 1195 WHERE job_id = ?1 1196 ORDER BY relay_url 1197 "#, 1198 )?; 1199 let outcomes = stmt 1200 .query_map(params![job_id], relay_outcome_from_row)? 1201 .collect::<Result<Vec<_>, _>>()?; 1202 Ok(outcomes) 1203 } 1204 } 1205 1206 struct PublishJobRow { 1207 principal_id: String, 1208 request_fingerprint: String, 1209 view: PublishJobView, 1210 } 1211 1212 fn migrate_schema(connection: &Connection) -> Result<(), PublishProxyError> { 1213 let version: i64 = connection.pragma_query_value(None, "user_version", |row| row.get(0))?; 1214 if version < 2 { 1215 if !table_has_column(connection, "publish_proxy_jobs", "request_fingerprint")? { 1216 connection.execute( 1217 "ALTER TABLE publish_proxy_jobs ADD COLUMN request_fingerprint TEXT NOT NULL DEFAULT ''", 1218 [], 1219 )?; 1220 } 1221 if !table_has_column(connection, "publish_proxy_jobs", "effective_relay_count")? { 1222 connection.execute( 1223 "ALTER TABLE publish_proxy_jobs ADD COLUMN effective_relay_count INTEGER NOT NULL DEFAULT 0", 1224 [], 1225 )?; 1226 connection.execute( 1227 "UPDATE publish_proxy_jobs SET effective_relay_count = requested_relay_count WHERE effective_relay_count = 0", 1228 [], 1229 )?; 1230 } 1231 } 1232 Ok(()) 1233 } 1234 1235 fn recover_interrupted_publish_jobs(connection: &Connection) -> Result<(), PublishProxyError> { 1236 let now = current_unix_millis(); 1237 connection.execute( 1238 r#" 1239 UPDATE publish_proxy_jobs 1240 SET status = ?1, 1241 updated_at_ms = ?2, 1242 completed_at_ms = ?3, 1243 last_error = ?4 1244 WHERE status = ?5 1245 "#, 1246 params![ 1247 serde_json::to_string(&PublishJobStatus::DeliveryUnsatisfiedRetryable)?, 1248 now, 1249 now, 1250 "publish_attempt_interrupted", 1251 serde_json::to_string(&PublishJobStatus::Publishing)?, 1252 ], 1253 )?; 1254 Ok(()) 1255 } 1256 1257 fn table_has_column( 1258 connection: &Connection, 1259 table: &str, 1260 column: &str, 1261 ) -> Result<bool, PublishProxyError> { 1262 let mut stmt = connection.prepare(format!("PRAGMA table_info({table})").as_str())?; 1263 let columns = stmt 1264 .query_map([], |row| row.get::<_, String>(1))? 1265 .collect::<Result<Vec<_>, _>>()?; 1266 Ok(columns.iter().any(|existing| existing == column)) 1267 } 1268 1269 fn job_select_sql(tail: &str) -> String { 1270 format!( 1271 r#" 1272 SELECT 1273 job_id, 1274 principal_id, 1275 request_fingerprint, 1276 status, 1277 event_id, 1278 event_pubkey, 1279 event_kind, 1280 relay_policy_json, 1281 delivery_policy_json, 1282 effective_relay_count, 1283 requested_at_ms, 1284 completed_at_ms, 1285 last_error 1286 FROM publish_proxy_jobs 1287 {tail} 1288 "# 1289 ) 1290 } 1291 1292 fn principal_from_row(row: &Row<'_>) -> Result<PublishPrincipal, rusqlite::Error> { 1293 let visibility: String = row.get(6)?; 1294 Ok(PublishPrincipal { 1295 principal_id: row.get(0)?, 1296 label: row.get(1)?, 1297 allowed_pubkeys: json_column(row, 2)?, 1298 allowed_kinds: json_column(row, 3)?, 1299 allowed_relay_policies: json_column(row, 4)?, 1300 allow_request_relays: row.get(5)?, 1301 job_visibility: PublishJobVisibility::from_str(visibility.as_str()) 1302 .map_err(|error| conversion_error(6, error))?, 1303 expires_at_unix: row.get(7)?, 1304 }) 1305 } 1306 1307 fn job_from_row(row: &Row<'_>) -> Result<PublishJobRow, rusqlite::Error> { 1308 let status: PublishJobStatus = json_text(row, 3)?; 1309 let relay_policy: PublishRelayPolicy = json_text(row, 7)?; 1310 let delivery_policy: PublishDeliveryPolicy = json_text(row, 8)?; 1311 let relay_count: i64 = row.get(9)?; 1312 Ok(PublishJobRow { 1313 principal_id: row.get(1)?, 1314 request_fingerprint: row.get(2)?, 1315 view: PublishJobView { 1316 job_id: row.get(0)?, 1317 status, 1318 terminal: false, 1319 delivery_satisfied: false, 1320 event_id: row.get(4)?, 1321 pubkey: row.get(5)?, 1322 event_kind: row.get::<_, i64>(6)? as u32, 1323 relay_policy, 1324 delivery_policy, 1325 relay_count: usize::try_from(relay_count).unwrap_or(0), 1326 acknowledged_count: 0, 1327 retryable_count: 0, 1328 terminal_count: 0, 1329 requested_at_ms: row.get(10)?, 1330 completed_at_ms: row.get(11)?, 1331 last_error: row.get(12)?, 1332 relays: Vec::new(), 1333 }, 1334 }) 1335 } 1336 1337 fn relay_outcome_from_row(row: &Row<'_>) -> Result<PublishRelayOutcome, rusqlite::Error> { 1338 let source: PublishRelaySource = json_text(row, 1)?; 1339 let outcome_kind: PublishRelayOutcomeKind = json_text(row, 3)?; 1340 Ok(PublishRelayOutcome { 1341 relay_url: row.get(0)?, 1342 source, 1343 attempted: row.get(2)?, 1344 outcome_kind, 1345 message: row.get(4)?, 1346 latency_ms: row 1347 .get::<_, Option<i64>>(5)? 1348 .map(|latency| u64::try_from(latency).unwrap_or(0)), 1349 }) 1350 } 1351 1352 fn finalize_job_view(view: &mut PublishJobView) { 1353 view.acknowledged_count = view 1354 .relays 1355 .iter() 1356 .filter(|relay| relay.outcome_kind.counts_toward_quorum()) 1357 .count(); 1358 view.retryable_count = view 1359 .relays 1360 .iter() 1361 .filter(|relay| relay.outcome_kind.is_retryable()) 1362 .count(); 1363 view.terminal_count = view 1364 .relays 1365 .iter() 1366 .filter(|relay| relay.outcome_kind.is_terminal_failure()) 1367 .count(); 1368 view.terminal = matches!( 1369 view.status, 1370 PublishJobStatus::DeliverySatisfied 1371 | PublishJobStatus::DeliveryUnsatisfiedTerminal 1372 | PublishJobStatus::Rejected 1373 ); 1374 view.delivery_satisfied = view.status == PublishJobStatus::DeliverySatisfied; 1375 } 1376 1377 fn validate_principal_init(input: &PublishPrincipalInit) -> Result<(), PublishProxyError> { 1378 if input.label.trim().is_empty() { 1379 return Err(PublishProxyError::InvalidScope( 1380 "principal label must not be empty".to_owned(), 1381 )); 1382 } 1383 if !input.token_hash.starts_with(TOKEN_HASH_PREFIX) { 1384 return Err(PublishProxyError::InvalidScope( 1385 "principal token hash must use sha256 prefix".to_owned(), 1386 )); 1387 } 1388 if input.allowed_pubkeys.is_empty() { 1389 return Err(PublishProxyError::InvalidScope( 1390 "principal must include at least one allowed pubkey".to_owned(), 1391 )); 1392 } 1393 for pubkey in &input.allowed_pubkeys { 1394 ensure_lower_hex("allowed_pubkey", pubkey, 64)?; 1395 } 1396 if input.allowed_kinds.is_empty() { 1397 return Err(PublishProxyError::InvalidScope( 1398 "principal must include at least one allowed kind".to_owned(), 1399 )); 1400 } 1401 if input 1402 .allowed_kinds 1403 .iter() 1404 .any(|kind| *kind > u16::MAX as u32) 1405 { 1406 return Err(PublishProxyError::InvalidScope( 1407 "allowed kind exceeds publish proxy range".to_owned(), 1408 )); 1409 } 1410 if input.allowed_relay_policies.is_empty() { 1411 return Err(PublishProxyError::InvalidScope( 1412 "principal must include at least one allowed relay policy".to_owned(), 1413 )); 1414 } 1415 Ok(()) 1416 } 1417 1418 pub fn generate_bearer_token() -> String { 1419 let bytes: [u8; 32] = rand::random(); 1420 format!("{TOKEN_PREFIX}{}", hex_lower(&bytes)) 1421 } 1422 1423 pub fn hash_bearer_token(token: &str) -> String { 1424 let mut hasher = Sha256::new(); 1425 hasher.update(token.as_bytes()); 1426 format!("{TOKEN_HASH_PREFIX}{}", hex_lower(&hasher.finalize())) 1427 } 1428 1429 fn hex_lower(bytes: &[u8]) -> String { 1430 let mut output = String::with_capacity(bytes.len() * 2); 1431 for byte in bytes { 1432 use std::fmt::Write; 1433 let _ = write!(&mut output, "{byte:02x}"); 1434 } 1435 output 1436 } 1437 1438 pub fn parse_relay_policy(value: &str) -> Result<PublishRelayPolicy, PublishProxyError> { 1439 match value { 1440 "explicit_only" => Ok(PublishRelayPolicy::ExplicitOnly), 1441 "request_then_author_write_then_daemon_default" => { 1442 Ok(PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault) 1443 } 1444 "author_write_then_daemon_default" => Ok(PublishRelayPolicy::AuthorWriteThenDaemonDefault), 1445 "daemon_default_only" => Ok(PublishRelayPolicy::DaemonDefaultOnly), 1446 other => Err(PublishProxyError::InvalidScope(format!( 1447 "unknown relay policy `{other}`" 1448 ))), 1449 } 1450 } 1451 1452 fn signed_event_from_wire( 1453 event: &SignedNostrEventWire, 1454 ) -> Result<RadrootsSignedNostrEvent, PublishProxyError> { 1455 event 1456 .validate() 1457 .map_err(|error| PublishProxyError::InvalidSignedEvent(error.to_string()))?; 1458 let created_at = u32::try_from(event.created_at).map_err(|_| { 1459 PublishProxyError::InvalidSignedEvent( 1460 "signed event created_at exceeds daemon-supported range".to_owned(), 1461 ) 1462 })?; 1463 let raw_json = serde_json::to_string(event)?; 1464 let radroots_event = RadrootsNostrEvent { 1465 id: event.id.clone(), 1466 author: event.pubkey.clone(), 1467 created_at, 1468 kind: event.kind, 1469 tags: event.tags.clone(), 1470 content: event.content.clone(), 1471 sig: event.sig.clone(), 1472 }; 1473 match radroots_nostr_verify_event(&radroots_event) { 1474 RadrootsNostrEventVerification::Verified => {} 1475 verification => return Err(PublishProxyError::SignedEventVerification(verification)), 1476 } 1477 RadrootsSignedNostrEvent::new(RadrootsSignedNostrEventParts { 1478 id: event.id.clone(), 1479 pubkey: event.pubkey.clone(), 1480 created_at, 1481 kind: event.kind, 1482 tags: event.tags.clone(), 1483 content: event.content.clone(), 1484 sig: event.sig.clone(), 1485 raw_json, 1486 }) 1487 .map_err(PublishProxyError::from) 1488 } 1489 1490 fn request_intent_fingerprint( 1491 principal_id: &str, 1492 canonical_event_json: &str, 1493 request: &PublishEventRequest, 1494 effective_timeout_ms: u64, 1495 ) -> Result<String, PublishProxyError> { 1496 #[derive(Serialize)] 1497 struct FingerprintInput<'a> { 1498 principal_id: &'a str, 1499 canonical_event_json: &'a str, 1500 relays: Vec<String>, 1501 relay_policy: &'a PublishRelayPolicy, 1502 delivery_policy: &'a PublishDeliveryPolicy, 1503 effective_timeout_ms: u64, 1504 } 1505 1506 let input = FingerprintInput { 1507 principal_id, 1508 canonical_event_json, 1509 relays: request 1510 .relays 1511 .iter() 1512 .map(|relay| relay.trim().to_owned()) 1513 .collect(), 1514 relay_policy: &request.relay_policy, 1515 delivery_policy: &request.delivery_policy, 1516 effective_timeout_ms, 1517 }; 1518 let bytes = serde_json::to_vec(&input)?; 1519 let mut hasher = Sha256::new(); 1520 hasher.update(bytes); 1521 Ok(hex_lower(&hasher.finalize())) 1522 } 1523 1524 fn effective_publish_timeout_ms( 1525 config: &PublishProxyConfig, 1526 timeout_ms: Option<u64>, 1527 ) -> Result<u64, PublishProxyError> { 1528 let max_timeout_ms = config.connect_timeout_secs.saturating_mul(1_000); 1529 match timeout_ms { 1530 Some(0) => Err(PublishProxyError::InvalidSignedEvent( 1531 "timeout_ms must be greater than zero".to_owned(), 1532 )), 1533 Some(timeout_ms) if timeout_ms > max_timeout_ms => { 1534 Err(PublishProxyError::InvalidSignedEvent(format!( 1535 "timeout_ms must be at most {max_timeout_ms}" 1536 ))) 1537 } 1538 Some(timeout_ms) => Ok(timeout_ms), 1539 None => Ok(max_timeout_ms), 1540 } 1541 } 1542 1543 fn push_resolved_relay( 1544 targets: &mut Vec<ResolvedPublishRelay>, 1545 url: RadrootsRelayUrl, 1546 source: PublishRelaySource, 1547 ) { 1548 if !targets.iter().any(|target| target.url == url) { 1549 targets.push(ResolvedPublishRelay { url, source }); 1550 } 1551 } 1552 1553 fn relay_resolution_connection_failure( 1554 relay_url: impl Into<String>, 1555 source: PublishRelaySource, 1556 message: impl Into<String>, 1557 ) -> PublishRelayOutcome { 1558 PublishRelayOutcome { 1559 relay_url: relay_url.into(), 1560 source, 1561 attempted: false, 1562 outcome_kind: PublishRelayOutcomeKind::ConnectionFailed, 1563 message: Some(message.into()), 1564 latency_ms: None, 1565 } 1566 } 1567 1568 fn relay_socket_target(url: &RadrootsRelayUrl) -> Result<(String, u16), std::io::Error> { 1569 let parsed = url::Url::parse(url.as_str()) 1570 .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?; 1571 let host = parsed 1572 .host_str() 1573 .filter(|host| !host.is_empty()) 1574 .ok_or_else(|| { 1575 std::io::Error::new( 1576 std::io::ErrorKind::InvalidInput, 1577 "relay URL must include a DNS host", 1578 ) 1579 })? 1580 .to_owned(); 1581 let port = parsed.port_or_known_default().ok_or_else(|| { 1582 std::io::Error::new( 1583 std::io::ErrorKind::InvalidInput, 1584 "relay URL scheme must have a default port", 1585 ) 1586 })?; 1587 Ok((host, port)) 1588 } 1589 1590 fn relay_url_policy(config: &PublishProxyConfig) -> RadrootsRelayUrlPolicy { 1591 match config.relay_url_policy { 1592 crate::app::config::PublishProxyRelayUrlPolicy::Public => RadrootsRelayUrlPolicy::Public, 1593 crate::app::config::PublishProxyRelayUrlPolicy::Localhost => { 1594 RadrootsRelayUrlPolicy::Localhost 1595 } 1596 } 1597 } 1598 1599 fn author_write_relays_from_nip65_event( 1600 event: &radroots_nostr::prelude::RadrootsNostrEvent, 1601 ) -> Vec<String> { 1602 event 1603 .tags 1604 .iter() 1605 .filter_map(|tag| { 1606 let values = tag.as_slice(); 1607 if values.first().map(String::as_str) != Some("r") { 1608 return None; 1609 } 1610 let relay = values.get(1)?.trim(); 1611 if relay.is_empty() { 1612 return None; 1613 } 1614 if values.get(2).map(String::as_str) == Some("read") { 1615 return None; 1616 } 1617 Some(relay.to_owned()) 1618 }) 1619 .collect() 1620 } 1621 1622 fn publish_outcome_from_receipt( 1623 receipt: RadrootsRelayPublishRelayReceipt, 1624 source_by_relay: &BTreeMap<String, PublishRelaySource>, 1625 latency_ms: Option<u64>, 1626 ) -> PublishRelayOutcome { 1627 let source = source_by_relay 1628 .get(receipt.relay_url.as_str()) 1629 .copied() 1630 .unwrap_or(PublishRelaySource::DaemonDefault); 1631 PublishRelayOutcome { 1632 relay_url: receipt.relay_url, 1633 source, 1634 attempted: receipt.attempted, 1635 outcome_kind: publish_outcome_kind(receipt.outcome.kind), 1636 message: receipt.outcome.message, 1637 latency_ms, 1638 } 1639 } 1640 1641 fn publish_outcome_kind(kind: RadrootsRelayOutcomeKind) -> PublishRelayOutcomeKind { 1642 match kind { 1643 RadrootsRelayOutcomeKind::Accepted => PublishRelayOutcomeKind::Accepted, 1644 RadrootsRelayOutcomeKind::DuplicateAccepted => PublishRelayOutcomeKind::DuplicateAccepted, 1645 RadrootsRelayOutcomeKind::Blocked => PublishRelayOutcomeKind::Blocked, 1646 RadrootsRelayOutcomeKind::RateLimited => PublishRelayOutcomeKind::RateLimited, 1647 RadrootsRelayOutcomeKind::Invalid => PublishRelayOutcomeKind::Invalid, 1648 RadrootsRelayOutcomeKind::PowRequired => PublishRelayOutcomeKind::PowRequired, 1649 RadrootsRelayOutcomeKind::Restricted => PublishRelayOutcomeKind::Restricted, 1650 RadrootsRelayOutcomeKind::AuthRequired => PublishRelayOutcomeKind::AuthRequired, 1651 RadrootsRelayOutcomeKind::Muted => PublishRelayOutcomeKind::Muted, 1652 RadrootsRelayOutcomeKind::Unsupported => PublishRelayOutcomeKind::Unsupported, 1653 RadrootsRelayOutcomeKind::PaymentRequired => PublishRelayOutcomeKind::PaymentRequired, 1654 RadrootsRelayOutcomeKind::Error => PublishRelayOutcomeKind::Error, 1655 RadrootsRelayOutcomeKind::Timeout => PublishRelayOutcomeKind::Timeout, 1656 RadrootsRelayOutcomeKind::ConnectionFailed => PublishRelayOutcomeKind::ConnectionFailed, 1657 RadrootsRelayOutcomeKind::RelayUrlRejected => PublishRelayOutcomeKind::RelayUrlRejected, 1658 RadrootsRelayOutcomeKind::SkippedAlreadyAccepted => { 1659 PublishRelayOutcomeKind::SkippedAlreadyAccepted 1660 } 1661 RadrootsRelayOutcomeKind::Unknown => PublishRelayOutcomeKind::Unknown, 1662 } 1663 } 1664 1665 fn delivery_status( 1666 delivery_policy: &PublishDeliveryPolicy, 1667 relay_count: usize, 1668 outcomes: &[PublishRelayOutcome], 1669 ) -> PublishJobStatus { 1670 let required = delivery_policy.required_ack_count(relay_count); 1671 let acknowledged = outcomes 1672 .iter() 1673 .filter(|outcome| outcome.outcome_kind.counts_toward_quorum()) 1674 .count(); 1675 if acknowledged >= required { 1676 return PublishJobStatus::DeliverySatisfied; 1677 } 1678 if outcomes 1679 .iter() 1680 .any(|outcome| outcome.outcome_kind.is_retryable()) 1681 { 1682 PublishJobStatus::DeliveryUnsatisfiedRetryable 1683 } else { 1684 PublishJobStatus::DeliveryUnsatisfiedTerminal 1685 } 1686 } 1687 1688 fn timeout_receipts(targets: &[ResolvedPublishRelay]) -> Vec<RadrootsRelayPublishRelayReceipt> { 1689 targets 1690 .iter() 1691 .map(|target| { 1692 RadrootsRelayPublishRelayReceipt::attempted( 1693 target.url.as_str(), 1694 RadrootsRelayOutcome::timeout("timeout: publish attempt exceeded daemon bound"), 1695 ) 1696 }) 1697 .collect() 1698 } 1699 1700 fn transport_error_receipts( 1701 targets: &[ResolvedPublishRelay], 1702 error: PublishProxyError, 1703 ) -> Vec<RadrootsRelayPublishRelayReceipt> { 1704 let message = format!("error: {error}"); 1705 targets 1706 .iter() 1707 .map(|target| { 1708 RadrootsRelayPublishRelayReceipt::attempted( 1709 target.url.as_str(), 1710 RadrootsRelayOutcome::connection_failed(message.clone()), 1711 ) 1712 }) 1713 .collect() 1714 } 1715 1716 pub fn write_token_file(path: &Path, token: &str) -> Result<(), PublishProxyError> { 1717 if let Some(parent) = path 1718 .parent() 1719 .filter(|parent| !parent.as_os_str().is_empty()) 1720 { 1721 std::fs::create_dir_all(parent)?; 1722 } 1723 let mut options = std::fs::OpenOptions::new(); 1724 options.write(true).create_new(true); 1725 #[cfg(unix)] 1726 { 1727 use std::os::unix::fs::OpenOptionsExt; 1728 options.mode(0o600); 1729 } 1730 use std::io::Write; 1731 let mut file = options.open(path)?; 1732 file.write_all(token.as_bytes())?; 1733 file.write_all(b"\n")?; 1734 Ok(()) 1735 } 1736 1737 fn ensure_lower_hex( 1738 field: &str, 1739 value: &str, 1740 expected_len: usize, 1741 ) -> Result<(), PublishProxyError> { 1742 if value.len() == expected_len 1743 && value 1744 .bytes() 1745 .all(|byte| byte.is_ascii_digit() || matches!(byte, b'a'..=b'f')) 1746 { 1747 Ok(()) 1748 } else { 1749 Err(PublishProxyError::InvalidScope(format!( 1750 "{field} must be {expected_len} lowercase hex characters" 1751 ))) 1752 } 1753 } 1754 1755 fn json_column<T: for<'de> Deserialize<'de>>( 1756 row: &Row<'_>, 1757 index: usize, 1758 ) -> Result<T, rusqlite::Error> { 1759 let value: String = row.get(index)?; 1760 serde_json::from_str(value.as_str()).map_err(|error| conversion_error(index, error)) 1761 } 1762 1763 fn json_text<T: for<'de> Deserialize<'de>>( 1764 row: &Row<'_>, 1765 index: usize, 1766 ) -> Result<T, rusqlite::Error> { 1767 let value: String = row.get(index)?; 1768 serde_json::from_str(value.as_str()).map_err(|error| conversion_error(index, error)) 1769 } 1770 1771 fn conversion_error<E>(index: usize, error: E) -> rusqlite::Error 1772 where 1773 E: std::error::Error + Send + Sync + 'static, 1774 { 1775 rusqlite::Error::FromSqlConversionFailure(index, Type::Text, Box::new(error)) 1776 } 1777 1778 fn current_unix_secs() -> i64 { 1779 SystemTime::now() 1780 .duration_since(UNIX_EPOCH) 1781 .map(|duration| duration.as_secs() as i64) 1782 .unwrap_or_default() 1783 } 1784 1785 fn current_unix_millis() -> i64 { 1786 SystemTime::now() 1787 .duration_since(UNIX_EPOCH) 1788 .map(|duration| duration.as_millis() as i64) 1789 .unwrap_or_default() 1790 } 1791 1792 #[cfg(test)] 1793 mod tests { 1794 use super::{ 1795 PublishJobInsert, PublishJobVisibility, PublishPrincipal, PublishPrincipalInit, 1796 PublishProxy, PublishProxyError, PublishProxyStore, generate_bearer_token, 1797 hash_bearer_token, parse_relay_policy, 1798 }; 1799 use crate::app::config::{PublishProxyConfig, PublishProxyRelayUrlPolicy}; 1800 use nostr::JsonUtil; 1801 use radroots_identity::RadrootsIdentity; 1802 use radroots_nostr::prelude::{ 1803 RadrootsNostrEventVerification, RadrootsNostrTimestamp, radroots_nostr_build_event, 1804 }; 1805 use radroots_publish_proxy_protocol::{ 1806 PublishDeliveryPolicy, PublishEventRequest, PublishJobStatus, PublishRelayOutcomeKind, 1807 PublishRelayPolicy, PublishRelaySource, SignedNostrEventWire, 1808 }; 1809 use radroots_relay_transport::{RadrootsMockRelayPublishAdapter, RadrootsRelayOutcome}; 1810 use std::collections::BTreeMap; 1811 use std::net::{IpAddr, Ipv4Addr}; 1812 use std::sync::Arc; 1813 1814 const RELAY_PRIMARY: &str = "wss://relay.example.com"; 1815 const RELAY_SECONDARY: &str = "wss://relay-2.example.com"; 1816 const RELAY_FORBIDDEN: &str = "wss://forbidden-relay.example.com"; 1817 1818 fn event(pubkey: &str, kind: u32) -> SignedNostrEventWire { 1819 SignedNostrEventWire { 1820 id: "0".repeat(64), 1821 pubkey: pubkey.to_owned(), 1822 created_at: 1_700_000_000, 1823 kind, 1824 tags: vec![vec!["d".to_owned(), "listing-1".to_owned()]], 1825 content: "{}".to_owned(), 1826 sig: "1".repeat(128), 1827 } 1828 } 1829 1830 fn request(pubkey: &str, kind: u32) -> PublishEventRequest { 1831 PublishEventRequest { 1832 event: event(pubkey, kind), 1833 relays: Vec::new(), 1834 relay_policy: PublishRelayPolicy::DaemonDefaultOnly, 1835 delivery_policy: PublishDeliveryPolicy::Any, 1836 idempotency_key: Some("idem-1".to_owned()), 1837 timeout_ms: None, 1838 } 1839 } 1840 1841 fn signed_event(identity: &RadrootsIdentity, content: &str) -> SignedNostrEventWire { 1842 let event = radroots_nostr_build_event( 1843 30_402, 1844 content, 1845 vec![vec!["d".to_owned(), "listing-1".to_owned()]], 1846 ) 1847 .expect("event builder") 1848 .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_000)) 1849 .sign_with_keys(identity.keys()) 1850 .expect("signed event"); 1851 serde_json::from_str(event.as_json().as_str()).expect("event wire") 1852 } 1853 1854 fn publish_request( 1855 event: SignedNostrEventWire, 1856 relays: Vec<String>, 1857 relay_policy: PublishRelayPolicy, 1858 delivery_policy: PublishDeliveryPolicy, 1859 idempotency_key: Option<&str>, 1860 ) -> PublishEventRequest { 1861 PublishEventRequest { 1862 event, 1863 relays, 1864 relay_policy, 1865 delivery_policy, 1866 idempotency_key: idempotency_key.map(str::to_owned), 1867 timeout_ms: Some(5_000), 1868 } 1869 } 1870 1871 fn publish_proxy( 1872 config: PublishProxyConfig, 1873 ) -> (PublishProxy, RadrootsMockRelayPublishAdapter) { 1874 publish_proxy_with_resolver(config, Arc::new(StaticPublishRelayResolver::new())) 1875 } 1876 1877 fn publish_proxy_with_resolver( 1878 config: PublishProxyConfig, 1879 resolver: Arc<dyn super::PublishRelayResolver>, 1880 ) -> (PublishProxy, RadrootsMockRelayPublishAdapter) { 1881 let adapter = RadrootsMockRelayPublishAdapter::new(); 1882 let proxy = PublishProxy::memory(config) 1883 .expect("proxy") 1884 .with_relay_resolver(resolver) 1885 .with_publisher(Arc::new(adapter.clone())); 1886 (proxy, adapter) 1887 } 1888 1889 fn principal( 1890 proxy: &PublishProxy, 1891 pubkey: String, 1892 policies: Vec<PublishRelayPolicy>, 1893 allow_request_relays: bool, 1894 visibility: PublishJobVisibility, 1895 ) -> PublishPrincipal { 1896 proxy 1897 .store 1898 .create_principal(PublishPrincipalInit { 1899 label: "tester".to_owned(), 1900 token_hash: hash_bearer_token(generate_bearer_token().as_str()), 1901 allowed_pubkeys: vec![pubkey], 1902 allowed_kinds: vec![30_402], 1903 allowed_relay_policies: policies, 1904 allow_request_relays, 1905 job_visibility: visibility, 1906 expires_at_unix: None, 1907 }) 1908 .expect("principal") 1909 } 1910 1911 fn config_with_defaults(relays: Vec<&str>) -> PublishProxyConfig { 1912 PublishProxyConfig { 1913 daemon_default_publish_relays: relays.into_iter().map(str::to_owned).collect(), 1914 ..PublishProxyConfig::default() 1915 } 1916 } 1917 1918 #[derive(Default)] 1919 struct StaticPublishRelayResolver { 1920 results: BTreeMap<String, Result<Vec<IpAddr>, String>>, 1921 } 1922 1923 impl StaticPublishRelayResolver { 1924 fn new() -> Self { 1925 Self::default() 1926 } 1927 1928 fn with_addresses(mut self, url: &str, addresses: Vec<IpAddr>) -> Self { 1929 self.results.insert(url.to_owned(), Ok(addresses)); 1930 self 1931 } 1932 1933 fn with_failure(mut self, url: &str, error: &str) -> Self { 1934 self.results.insert(url.to_owned(), Err(error.to_owned())); 1935 self 1936 } 1937 } 1938 1939 impl super::PublishRelayResolver for StaticPublishRelayResolver { 1940 fn resolve<'a>( 1941 &'a self, 1942 url: &'a radroots_relay_transport::RadrootsRelayUrl, 1943 ) -> super::PublishRelayResolveFuture<'a> { 1944 Box::pin(async move { 1945 match self.results.get(url.as_str()) { 1946 Some(Ok(addresses)) => Ok(addresses.clone()), 1947 Some(Err(error)) => Err(std::io::Error::other(error.clone())), 1948 None => Ok(vec![IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34))]), 1949 } 1950 }) 1951 } 1952 } 1953 1954 struct StaticPublishAuthorRelayDiscovery { 1955 relays: Vec<String>, 1956 } 1957 1958 impl StaticPublishAuthorRelayDiscovery { 1959 fn new(relays: Vec<&str>) -> Self { 1960 Self { 1961 relays: relays.into_iter().map(str::to_owned).collect(), 1962 } 1963 } 1964 } 1965 1966 impl super::PublishAuthorRelayDiscovery for StaticPublishAuthorRelayDiscovery { 1967 fn fetch_author_write_relays<'a>( 1968 &'a self, 1969 _pubkey: &'a str, 1970 _discovery_targets: Vec<super::ResolvedPublishRelay>, 1971 _connect_timeout_secs: u64, 1972 ) -> super::PublishAuthorRelayDiscoveryFuture<'a> { 1973 let relays = self.relays.clone(); 1974 Box::pin(async move { Ok(relays) }) 1975 } 1976 } 1977 1978 #[test] 1979 fn token_generation_and_hashing_do_not_store_plaintext() { 1980 let token = generate_bearer_token(); 1981 assert!(token.starts_with("rrd_pp_")); 1982 let hash = hash_bearer_token(token.as_str()); 1983 assert!(hash.starts_with("sha256:")); 1984 assert!(!hash.contains(token.as_str())); 1985 } 1986 1987 #[test] 1988 fn relay_policy_parser_accepts_contract_values() { 1989 assert_eq!( 1990 parse_relay_policy("explicit_only").expect("policy"), 1991 PublishRelayPolicy::ExplicitOnly 1992 ); 1993 assert!(parse_relay_policy("unknown").is_err()); 1994 } 1995 1996 #[test] 1997 fn storage_authenticates_hashed_tokens_and_scopes_jobs() { 1998 let store = PublishProxyStore::memory().expect("store"); 1999 let token = generate_bearer_token(); 2000 let token_hash = hash_bearer_token(token.as_str()); 2001 let principal = store 2002 .create_principal(PublishPrincipalInit { 2003 label: "tester".to_owned(), 2004 token_hash: token_hash.clone(), 2005 allowed_pubkeys: vec!["a".repeat(64)], 2006 allowed_kinds: vec![30_402], 2007 allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly], 2008 allow_request_relays: false, 2009 job_visibility: PublishJobVisibility::Own, 2010 expires_at_unix: None, 2011 }) 2012 .expect("principal"); 2013 assert_eq!( 2014 store 2015 .principal_for_token_hash(token_hash.as_str()) 2016 .expect("lookup") 2017 .expect("principal") 2018 .principal_id, 2019 principal.principal_id 2020 ); 2021 let denied = request("b".repeat(64).as_str(), 30_402); 2022 assert!(principal.allows_event(&denied).is_err()); 2023 2024 let accepted = request("a".repeat(64).as_str(), 30_402); 2025 principal.allows_event(&accepted).expect("scope"); 2026 let response = store 2027 .record_publish_job(PublishJobInsert { 2028 principal_id: principal.principal_id.clone(), 2029 idempotency_key: Some("idem-1".to_owned()), 2030 request: accepted.clone(), 2031 request_fingerprint: "fingerprint-1".to_owned(), 2032 effective_relay_count: 1, 2033 }) 2034 .expect("record job"); 2035 assert!(!response.deduplicated); 2036 let duplicate = store 2037 .record_publish_job(PublishJobInsert { 2038 principal_id: principal.principal_id.clone(), 2039 idempotency_key: Some("idem-1".to_owned()), 2040 request: accepted, 2041 request_fingerprint: "fingerprint-1".to_owned(), 2042 effective_relay_count: 1, 2043 }) 2044 .expect("dedupe"); 2045 assert!(duplicate.deduplicated); 2046 assert_eq!(duplicate.job.job_id, response.job.job_id); 2047 assert_eq!( 2048 store 2049 .list_jobs_for_principal(&principal, 50) 2050 .expect("jobs") 2051 .len(), 2052 1 2053 ); 2054 } 2055 2056 #[test] 2057 fn store_open_recovers_interrupted_publishing_jobs() { 2058 let directory = tempfile::tempdir().expect("tempdir"); 2059 let database_path = directory.path().join("publish-proxy.sqlite"); 2060 let token_hash = hash_bearer_token(generate_bearer_token().as_str()); 2061 let pubkey = "a".repeat(64); 2062 let request = request(pubkey.as_str(), 30_402); 2063 let job_id = { 2064 let store = PublishProxyStore::open(database_path.clone()).expect("store"); 2065 let principal = store 2066 .create_principal(PublishPrincipalInit { 2067 label: "tester".to_owned(), 2068 token_hash, 2069 allowed_pubkeys: vec![pubkey], 2070 allowed_kinds: vec![30_402], 2071 allowed_relay_policies: vec![PublishRelayPolicy::DaemonDefaultOnly], 2072 allow_request_relays: false, 2073 job_visibility: PublishJobVisibility::Own, 2074 expires_at_unix: None, 2075 }) 2076 .expect("principal"); 2077 let response = store 2078 .record_publish_job(PublishJobInsert { 2079 principal_id: principal.principal_id, 2080 idempotency_key: Some("idem-interrupted".to_owned()), 2081 request, 2082 request_fingerprint: "fingerprint-interrupted".to_owned(), 2083 effective_relay_count: 1, 2084 }) 2085 .expect("record job"); 2086 assert_eq!(response.job.status, PublishJobStatus::Publishing); 2087 response.job.job_id 2088 }; 2089 2090 let reopened = PublishProxyStore::open(database_path).expect("reopen store"); 2091 let recovered = reopened.job_by_id(job_id.as_str()).expect("recovered job"); 2092 assert_eq!( 2093 recovered.status, 2094 PublishJobStatus::DeliveryUnsatisfiedRetryable 2095 ); 2096 assert_eq!( 2097 recovered.last_error.as_deref(), 2098 Some("publish_attempt_interrupted") 2099 ); 2100 assert!(recovered.completed_at_ms.is_some()); 2101 assert!(recovered.relays.is_empty()); 2102 } 2103 2104 #[tokio::test] 2105 async fn publish_event_verifies_and_records_daemon_default_outcome() { 2106 let identity = RadrootsIdentity::generate(); 2107 let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); 2108 let principal = principal( 2109 &proxy, 2110 identity.public_key_hex(), 2111 vec![PublishRelayPolicy::DaemonDefaultOnly], 2112 false, 2113 PublishJobVisibility::Own, 2114 ); 2115 let event = signed_event(&identity, "{}"); 2116 let raw_event = serde_json::to_string(&event).expect("raw event"); 2117 let response = proxy 2118 .publish_event( 2119 &principal, 2120 publish_request( 2121 event, 2122 Vec::new(), 2123 PublishRelayPolicy::DaemonDefaultOnly, 2124 PublishDeliveryPolicy::Any, 2125 Some("idem-valid"), 2126 ), 2127 ) 2128 .await 2129 .expect("publish"); 2130 2131 assert!(!response.deduplicated); 2132 assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied); 2133 assert_eq!(response.job.relay_count, 1); 2134 assert_eq!(response.job.acknowledged_count, 1); 2135 assert_eq!(response.job.relays[0].relay_url, RELAY_PRIMARY); 2136 assert_eq!( 2137 response.job.relays[0].source, 2138 PublishRelaySource::DaemonDefault 2139 ); 2140 assert_eq!(adapter.captured_raw_events(), vec![raw_event]); 2141 } 2142 2143 #[tokio::test] 2144 async fn publish_event_rejects_tampered_content_before_publish() { 2145 let identity = RadrootsIdentity::generate(); 2146 let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); 2147 let principal = principal( 2148 &proxy, 2149 identity.public_key_hex(), 2150 vec![PublishRelayPolicy::DaemonDefaultOnly], 2151 false, 2152 PublishJobVisibility::Own, 2153 ); 2154 let mut event = signed_event(&identity, "trusted"); 2155 event.content = "tampered".to_owned(); 2156 let error = proxy 2157 .publish_event( 2158 &principal, 2159 publish_request( 2160 event, 2161 Vec::new(), 2162 PublishRelayPolicy::DaemonDefaultOnly, 2163 PublishDeliveryPolicy::Any, 2164 None, 2165 ), 2166 ) 2167 .await 2168 .expect_err("tampered event should fail"); 2169 2170 assert!(matches!( 2171 error, 2172 PublishProxyError::SignedEventVerification(RadrootsNostrEventVerification::IdMismatch) 2173 )); 2174 assert!(adapter.captured_raw_events().is_empty()); 2175 } 2176 2177 #[tokio::test] 2178 async fn publish_event_rejects_wrong_signature_before_publish() { 2179 let identity = RadrootsIdentity::generate(); 2180 let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); 2181 let principal = principal( 2182 &proxy, 2183 identity.public_key_hex(), 2184 vec![PublishRelayPolicy::DaemonDefaultOnly], 2185 false, 2186 PublishJobVisibility::Own, 2187 ); 2188 let mut event = signed_event(&identity, "{}"); 2189 let replacement = if event.sig.starts_with('0') { "1" } else { "0" }; 2190 event.sig.replace_range(0..1, replacement); 2191 let error = proxy 2192 .publish_event( 2193 &principal, 2194 publish_request( 2195 event, 2196 Vec::new(), 2197 PublishRelayPolicy::DaemonDefaultOnly, 2198 PublishDeliveryPolicy::Any, 2199 None, 2200 ), 2201 ) 2202 .await 2203 .expect_err("wrong signature should fail"); 2204 2205 assert!(matches!( 2206 error, 2207 PublishProxyError::SignedEventVerification( 2208 RadrootsNostrEventVerification::SignatureInvalid 2209 ) 2210 )); 2211 assert!(adapter.captured_raw_events().is_empty()); 2212 } 2213 2214 #[tokio::test] 2215 async fn publish_event_rejects_malformed_wire_fields() { 2216 let identity = RadrootsIdentity::generate(); 2217 let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); 2218 let principal = principal( 2219 &proxy, 2220 identity.public_key_hex(), 2221 vec![PublishRelayPolicy::DaemonDefaultOnly], 2222 false, 2223 PublishJobVisibility::Own, 2224 ); 2225 let mut event = signed_event(&identity, "{}"); 2226 event.id = event.id.to_uppercase(); 2227 let error = proxy 2228 .publish_event( 2229 &principal, 2230 publish_request( 2231 event, 2232 Vec::new(), 2233 PublishRelayPolicy::DaemonDefaultOnly, 2234 PublishDeliveryPolicy::Any, 2235 None, 2236 ), 2237 ) 2238 .await 2239 .expect_err("malformed field should fail"); 2240 2241 assert!(matches!(error, PublishProxyError::InvalidSignedEvent(_))); 2242 assert!(adapter.captured_raw_events().is_empty()); 2243 } 2244 2245 #[tokio::test] 2246 async fn publish_event_uses_explicit_request_relays_when_allowed() { 2247 let identity = RadrootsIdentity::generate(); 2248 let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_SECONDARY])); 2249 let principal = principal( 2250 &proxy, 2251 identity.public_key_hex(), 2252 vec![PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault], 2253 true, 2254 PublishJobVisibility::Own, 2255 ); 2256 let response = proxy 2257 .publish_event( 2258 &principal, 2259 publish_request( 2260 signed_event(&identity, "{}"), 2261 vec![RELAY_PRIMARY.to_owned()], 2262 PublishRelayPolicy::RequestThenAuthorWriteThenDaemonDefault, 2263 PublishDeliveryPolicy::Any, 2264 None, 2265 ), 2266 ) 2267 .await 2268 .expect("publish"); 2269 2270 assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied); 2271 assert_eq!(response.job.relays[0].relay_url, RELAY_PRIMARY); 2272 assert_eq!(response.job.relays[0].source, PublishRelaySource::Request); 2273 } 2274 2275 #[tokio::test] 2276 async fn publish_event_uses_cached_nip65_author_write_before_defaults() { 2277 let identity = RadrootsIdentity::generate(); 2278 let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_SECONDARY])); 2279 proxy 2280 .store 2281 .cache_author_write_relays( 2282 identity.public_key_hex().as_str(), 2283 &[RELAY_PRIMARY.to_owned()], 2284 ) 2285 .expect("cache author relays"); 2286 let principal = principal( 2287 &proxy, 2288 identity.public_key_hex(), 2289 vec![PublishRelayPolicy::AuthorWriteThenDaemonDefault], 2290 false, 2291 PublishJobVisibility::Own, 2292 ); 2293 let response = proxy 2294 .publish_event( 2295 &principal, 2296 publish_request( 2297 signed_event(&identity, "{}"), 2298 Vec::new(), 2299 PublishRelayPolicy::AuthorWriteThenDaemonDefault, 2300 PublishDeliveryPolicy::Any, 2301 None, 2302 ), 2303 ) 2304 .await 2305 .expect("publish"); 2306 2307 assert_eq!(response.job.relays[0].relay_url, RELAY_PRIMARY); 2308 assert_eq!( 2309 response.job.relays[0].source, 2310 PublishRelaySource::AuthorWrite 2311 ); 2312 } 2313 2314 #[tokio::test] 2315 async fn publish_event_records_invalid_cached_author_write_relay() { 2316 let identity = RadrootsIdentity::generate(); 2317 let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_SECONDARY])); 2318 proxy 2319 .store 2320 .cache_author_write_relays( 2321 identity.public_key_hex().as_str(), 2322 &[RELAY_PRIMARY.to_owned(), "not a cached relay".to_owned()], 2323 ) 2324 .expect("cache author relays"); 2325 let principal = principal( 2326 &proxy, 2327 identity.public_key_hex(), 2328 vec![PublishRelayPolicy::AuthorWriteThenDaemonDefault], 2329 false, 2330 PublishJobVisibility::Own, 2331 ); 2332 let response = proxy 2333 .publish_event( 2334 &principal, 2335 publish_request( 2336 signed_event(&identity, "{}"), 2337 Vec::new(), 2338 PublishRelayPolicy::AuthorWriteThenDaemonDefault, 2339 PublishDeliveryPolicy::Any, 2340 None, 2341 ), 2342 ) 2343 .await 2344 .expect("publish"); 2345 2346 assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied); 2347 let accepted = response 2348 .job 2349 .relays 2350 .iter() 2351 .find(|relay| relay.relay_url == RELAY_PRIMARY) 2352 .expect("accepted author relay"); 2353 assert_eq!(accepted.source, PublishRelaySource::AuthorWrite); 2354 assert!(accepted.attempted); 2355 let rejected = response 2356 .job 2357 .relays 2358 .iter() 2359 .find(|relay| relay.relay_url == "not a cached relay") 2360 .expect("rejected cached author relay"); 2361 assert_eq!(rejected.source, PublishRelaySource::AuthorWrite); 2362 assert_eq!( 2363 rejected.outcome_kind, 2364 PublishRelayOutcomeKind::RelayUrlRejected 2365 ); 2366 assert!(!rejected.attempted); 2367 assert_eq!(adapter.captured_raw_events().len(), 1); 2368 } 2369 2370 #[tokio::test] 2371 async fn publish_event_preserves_author_and_discovery_rejections_through_fallback() { 2372 let identity = RadrootsIdentity::generate(); 2373 let mut config = config_with_defaults(vec![RELAY_SECONDARY]); 2374 config.author_relay_discovery_relays = vec!["not a discovery relay".to_owned()]; 2375 let (proxy, adapter) = publish_proxy(config); 2376 proxy 2377 .store 2378 .cache_author_write_relays( 2379 identity.public_key_hex().as_str(), 2380 &["not a cached relay".to_owned()], 2381 ) 2382 .expect("cache author relays"); 2383 let principal = principal( 2384 &proxy, 2385 identity.public_key_hex(), 2386 vec![PublishRelayPolicy::AuthorWriteThenDaemonDefault], 2387 false, 2388 PublishJobVisibility::Own, 2389 ); 2390 let response = proxy 2391 .publish_event( 2392 &principal, 2393 publish_request( 2394 signed_event(&identity, "{}"), 2395 Vec::new(), 2396 PublishRelayPolicy::AuthorWriteThenDaemonDefault, 2397 PublishDeliveryPolicy::Any, 2398 None, 2399 ), 2400 ) 2401 .await 2402 .expect("publish"); 2403 2404 assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied); 2405 let daemon_default = response 2406 .job 2407 .relays 2408 .iter() 2409 .find(|relay| relay.relay_url == RELAY_SECONDARY) 2410 .expect("daemon default relay"); 2411 assert_eq!(daemon_default.source, PublishRelaySource::DaemonDefault); 2412 assert!(daemon_default.attempted); 2413 let cached = response 2414 .job 2415 .relays 2416 .iter() 2417 .find(|relay| relay.relay_url == "not a cached relay") 2418 .expect("cached author rejection"); 2419 assert_eq!(cached.source, PublishRelaySource::AuthorWrite); 2420 assert_eq!( 2421 cached.outcome_kind, 2422 PublishRelayOutcomeKind::RelayUrlRejected 2423 ); 2424 assert!(!cached.attempted); 2425 let discovery = response 2426 .job 2427 .relays 2428 .iter() 2429 .find(|relay| relay.relay_url == "not a discovery relay") 2430 .expect("discovery relay rejection"); 2431 assert_eq!(discovery.source, PublishRelaySource::DaemonDefault); 2432 assert_eq!( 2433 discovery.outcome_kind, 2434 PublishRelayOutcomeKind::RelayUrlRejected 2435 ); 2436 assert!(!discovery.attempted); 2437 assert_eq!(adapter.captured_raw_events().len(), 1); 2438 } 2439 2440 #[tokio::test] 2441 async fn publish_event_preserves_discovery_and_discovered_author_rejections() { 2442 let identity = RadrootsIdentity::generate(); 2443 let mut config = config_with_defaults(vec![RELAY_PRIMARY]); 2444 config.author_relay_discovery_relays = 2445 vec![RELAY_PRIMARY.to_owned(), RELAY_FORBIDDEN.to_owned()]; 2446 let resolver = StaticPublishRelayResolver::new().with_addresses( 2447 RELAY_FORBIDDEN, 2448 vec![IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))], 2449 ); 2450 let adapter = RadrootsMockRelayPublishAdapter::new(); 2451 let proxy = PublishProxy::memory(config) 2452 .expect("proxy") 2453 .with_relay_resolver(Arc::new(resolver)) 2454 .with_author_relay_discovery(Arc::new(StaticPublishAuthorRelayDiscovery::new(vec![ 2455 "not a discovered author relay", 2456 RELAY_SECONDARY, 2457 ]))) 2458 .with_publisher(Arc::new(adapter.clone())); 2459 let principal = principal( 2460 &proxy, 2461 identity.public_key_hex(), 2462 vec![PublishRelayPolicy::AuthorWriteThenDaemonDefault], 2463 false, 2464 PublishJobVisibility::Own, 2465 ); 2466 let response = proxy 2467 .publish_event( 2468 &principal, 2469 publish_request( 2470 signed_event(&identity, "{}"), 2471 Vec::new(), 2472 PublishRelayPolicy::AuthorWriteThenDaemonDefault, 2473 PublishDeliveryPolicy::Any, 2474 None, 2475 ), 2476 ) 2477 .await 2478 .expect("publish"); 2479 2480 assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied); 2481 let accepted = response 2482 .job 2483 .relays 2484 .iter() 2485 .find(|relay| relay.relay_url == RELAY_SECONDARY) 2486 .expect("discovered author relay"); 2487 assert_eq!(accepted.source, PublishRelaySource::AuthorWrite); 2488 assert!(accepted.attempted); 2489 let discovered = response 2490 .job 2491 .relays 2492 .iter() 2493 .find(|relay| relay.relay_url == "not a discovered author relay") 2494 .expect("discovered author rejection"); 2495 assert_eq!(discovered.source, PublishRelaySource::AuthorWrite); 2496 assert_eq!( 2497 discovered.outcome_kind, 2498 PublishRelayOutcomeKind::RelayUrlRejected 2499 ); 2500 assert!(!discovered.attempted); 2501 let discovery = response 2502 .job 2503 .relays 2504 .iter() 2505 .find(|relay| relay.relay_url == RELAY_FORBIDDEN) 2506 .expect("discovery relay rejection"); 2507 assert_eq!(discovery.source, PublishRelaySource::DaemonDefault); 2508 assert_eq!( 2509 discovery.outcome_kind, 2510 PublishRelayOutcomeKind::RelayUrlRejected 2511 ); 2512 assert!(!discovery.attempted); 2513 assert_eq!(adapter.captured_raw_events().len(), 1); 2514 } 2515 2516 #[tokio::test] 2517 async fn publish_event_records_no_publish_relays_failure() { 2518 let identity = RadrootsIdentity::generate(); 2519 let (proxy, adapter) = publish_proxy(PublishProxyConfig::default()); 2520 let principal = principal( 2521 &proxy, 2522 identity.public_key_hex(), 2523 vec![PublishRelayPolicy::DaemonDefaultOnly], 2524 false, 2525 PublishJobVisibility::Own, 2526 ); 2527 let response = proxy 2528 .publish_event( 2529 &principal, 2530 publish_request( 2531 signed_event(&identity, "{}"), 2532 Vec::new(), 2533 PublishRelayPolicy::DaemonDefaultOnly, 2534 PublishDeliveryPolicy::Any, 2535 None, 2536 ), 2537 ) 2538 .await 2539 .expect("publish"); 2540 2541 assert_eq!(response.job.status, PublishJobStatus::Rejected); 2542 assert_eq!( 2543 response.job.last_error.as_deref(), 2544 Some("no_publish_relays") 2545 ); 2546 assert!(response.job.relays.is_empty()); 2547 assert!(adapter.captured_raw_events().is_empty()); 2548 } 2549 2550 #[tokio::test] 2551 async fn publish_event_records_unsafe_request_relay_rejection() { 2552 let identity = RadrootsIdentity::generate(); 2553 let (proxy, adapter) = publish_proxy(PublishProxyConfig::default()); 2554 let principal = principal( 2555 &proxy, 2556 identity.public_key_hex(), 2557 vec![PublishRelayPolicy::ExplicitOnly], 2558 true, 2559 PublishJobVisibility::Own, 2560 ); 2561 let response = proxy 2562 .publish_event( 2563 &principal, 2564 publish_request( 2565 signed_event(&identity, "{}"), 2566 vec!["wss://127.0.0.1:7777".to_owned()], 2567 PublishRelayPolicy::ExplicitOnly, 2568 PublishDeliveryPolicy::Any, 2569 None, 2570 ), 2571 ) 2572 .await 2573 .expect("publish"); 2574 2575 assert_eq!(response.job.status, PublishJobStatus::Rejected); 2576 assert_eq!(response.job.relays.len(), 1); 2577 assert_eq!( 2578 response.job.relays[0].outcome_kind, 2579 PublishRelayOutcomeKind::RelayUrlRejected 2580 ); 2581 assert!(!response.job.relays[0].attempted); 2582 assert!(adapter.captured_raw_events().is_empty()); 2583 } 2584 2585 #[tokio::test] 2586 async fn publish_event_rejects_forbidden_public_dns_destination_before_publish() { 2587 let identity = RadrootsIdentity::generate(); 2588 let resolver = StaticPublishRelayResolver::new() 2589 .with_addresses(RELAY_PRIMARY, vec![IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))]); 2590 let (proxy, adapter) = publish_proxy_with_resolver( 2591 config_with_defaults(vec![RELAY_PRIMARY]), 2592 Arc::new(resolver), 2593 ); 2594 let principal = principal( 2595 &proxy, 2596 identity.public_key_hex(), 2597 vec![PublishRelayPolicy::DaemonDefaultOnly], 2598 false, 2599 PublishJobVisibility::Own, 2600 ); 2601 let response = proxy 2602 .publish_event( 2603 &principal, 2604 publish_request( 2605 signed_event(&identity, "{}"), 2606 Vec::new(), 2607 PublishRelayPolicy::DaemonDefaultOnly, 2608 PublishDeliveryPolicy::Any, 2609 None, 2610 ), 2611 ) 2612 .await 2613 .expect("publish"); 2614 2615 assert_eq!(response.job.status, PublishJobStatus::Rejected); 2616 assert_eq!(response.job.relays.len(), 1); 2617 assert_eq!( 2618 response.job.relays[0].outcome_kind, 2619 PublishRelayOutcomeKind::RelayUrlRejected 2620 ); 2621 assert!(!response.job.relays[0].attempted); 2622 assert!(adapter.captured_raw_events().is_empty()); 2623 } 2624 2625 #[tokio::test] 2626 async fn publish_event_records_dns_failure_as_unattempted_retryable_outcome() { 2627 let identity = RadrootsIdentity::generate(); 2628 let resolver = StaticPublishRelayResolver::new().with_failure(RELAY_PRIMARY, "no records"); 2629 let (proxy, adapter) = publish_proxy_with_resolver( 2630 config_with_defaults(vec![RELAY_PRIMARY]), 2631 Arc::new(resolver), 2632 ); 2633 let principal = principal( 2634 &proxy, 2635 identity.public_key_hex(), 2636 vec![PublishRelayPolicy::DaemonDefaultOnly], 2637 false, 2638 PublishJobVisibility::Own, 2639 ); 2640 let response = proxy 2641 .publish_event( 2642 &principal, 2643 publish_request( 2644 signed_event(&identity, "{}"), 2645 Vec::new(), 2646 PublishRelayPolicy::DaemonDefaultOnly, 2647 PublishDeliveryPolicy::Any, 2648 None, 2649 ), 2650 ) 2651 .await 2652 .expect("publish"); 2653 2654 assert_eq!( 2655 response.job.status, 2656 PublishJobStatus::DeliveryUnsatisfiedRetryable 2657 ); 2658 assert_eq!( 2659 response.job.last_error.as_deref(), 2660 Some("delivery_unsatisfied") 2661 ); 2662 assert_eq!(response.job.relays.len(), 1); 2663 assert_eq!( 2664 response.job.relays[0].outcome_kind, 2665 PublishRelayOutcomeKind::ConnectionFailed 2666 ); 2667 assert!(!response.job.relays[0].attempted); 2668 assert!(adapter.captured_raw_events().is_empty()); 2669 } 2670 2671 #[tokio::test] 2672 async fn publish_event_localhost_policy_skips_public_dns_guard() { 2673 let identity = RadrootsIdentity::generate(); 2674 let mut config = config_with_defaults(vec!["ws://localhost:7777"]); 2675 config.relay_url_policy = PublishProxyRelayUrlPolicy::Localhost; 2676 let resolver = StaticPublishRelayResolver::new() 2677 .with_failure("ws://localhost:7777", "localhost resolution should not run"); 2678 let (proxy, adapter) = publish_proxy_with_resolver(config, Arc::new(resolver)); 2679 let principal = principal( 2680 &proxy, 2681 identity.public_key_hex(), 2682 vec![PublishRelayPolicy::DaemonDefaultOnly], 2683 false, 2684 PublishJobVisibility::Own, 2685 ); 2686 let response = proxy 2687 .publish_event( 2688 &principal, 2689 publish_request( 2690 signed_event(&identity, "{}"), 2691 Vec::new(), 2692 PublishRelayPolicy::DaemonDefaultOnly, 2693 PublishDeliveryPolicy::Any, 2694 None, 2695 ), 2696 ) 2697 .await 2698 .expect("publish"); 2699 2700 assert_eq!(response.job.status, PublishJobStatus::DeliverySatisfied); 2701 assert_eq!(response.job.relays[0].relay_url, "ws://localhost:7777"); 2702 assert!(!adapter.captured_raw_events().is_empty()); 2703 } 2704 2705 #[tokio::test] 2706 async fn publish_event_deduplicates_same_intent_and_conflicts_different_intent() { 2707 let identity = RadrootsIdentity::generate(); 2708 let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); 2709 let principal = principal( 2710 &proxy, 2711 identity.public_key_hex(), 2712 vec![PublishRelayPolicy::DaemonDefaultOnly], 2713 false, 2714 PublishJobVisibility::Own, 2715 ); 2716 let request = publish_request( 2717 signed_event(&identity, "{}"), 2718 Vec::new(), 2719 PublishRelayPolicy::DaemonDefaultOnly, 2720 PublishDeliveryPolicy::Any, 2721 Some("idem-conflict"), 2722 ); 2723 let first = proxy 2724 .publish_event(&principal, request.clone()) 2725 .await 2726 .expect("first"); 2727 let duplicate = proxy 2728 .publish_event(&principal, request) 2729 .await 2730 .expect("duplicate"); 2731 2732 assert!(!first.deduplicated); 2733 assert!(duplicate.deduplicated); 2734 assert_eq!(duplicate.job.job_id, first.job.job_id); 2735 2736 let conflict = proxy 2737 .publish_event( 2738 &principal, 2739 publish_request( 2740 signed_event(&identity, "changed"), 2741 Vec::new(), 2742 PublishRelayPolicy::DaemonDefaultOnly, 2743 PublishDeliveryPolicy::Any, 2744 Some("idem-conflict"), 2745 ), 2746 ) 2747 .await 2748 .expect_err("conflict"); 2749 assert!(matches!( 2750 conflict, 2751 PublishProxyError::IdempotencyConflict(_) 2752 )); 2753 } 2754 2755 #[tokio::test] 2756 async fn publish_event_rejects_zero_and_excessive_timeout_before_job_creation() { 2757 let identity = RadrootsIdentity::generate(); 2758 let (proxy, adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); 2759 let principal = principal( 2760 &proxy, 2761 identity.public_key_hex(), 2762 vec![PublishRelayPolicy::DaemonDefaultOnly], 2763 false, 2764 PublishJobVisibility::Own, 2765 ); 2766 let mut zero = publish_request( 2767 signed_event(&identity, "{}"), 2768 Vec::new(), 2769 PublishRelayPolicy::DaemonDefaultOnly, 2770 PublishDeliveryPolicy::Any, 2771 Some("idem-zero-timeout"), 2772 ); 2773 zero.timeout_ms = Some(0); 2774 let zero_error = proxy 2775 .publish_event(&principal, zero) 2776 .await 2777 .expect_err("zero timeout should fail"); 2778 assert!(matches!( 2779 zero_error, 2780 PublishProxyError::InvalidSignedEvent(_) 2781 )); 2782 2783 let mut excessive = publish_request( 2784 signed_event(&identity, "changed"), 2785 Vec::new(), 2786 PublishRelayPolicy::DaemonDefaultOnly, 2787 PublishDeliveryPolicy::Any, 2788 Some("idem-excessive-timeout"), 2789 ); 2790 excessive.timeout_ms = Some(10_001); 2791 let excessive_error = proxy 2792 .publish_event(&principal, excessive) 2793 .await 2794 .expect_err("excessive timeout should fail"); 2795 assert!(matches!( 2796 excessive_error, 2797 PublishProxyError::InvalidSignedEvent(_) 2798 )); 2799 assert!( 2800 proxy 2801 .store 2802 .list_jobs_for_principal(&principal, 50) 2803 .expect("jobs") 2804 .is_empty() 2805 ); 2806 assert!(adapter.captured_raw_events().is_empty()); 2807 } 2808 2809 #[tokio::test] 2810 async fn publish_event_default_timeout_fingerprints_as_effective_timeout() { 2811 let identity = RadrootsIdentity::generate(); 2812 let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); 2813 let principal = principal( 2814 &proxy, 2815 identity.public_key_hex(), 2816 vec![PublishRelayPolicy::DaemonDefaultOnly], 2817 false, 2818 PublishJobVisibility::Own, 2819 ); 2820 let event = signed_event(&identity, "{}"); 2821 let mut default_timeout = publish_request( 2822 event.clone(), 2823 Vec::new(), 2824 PublishRelayPolicy::DaemonDefaultOnly, 2825 PublishDeliveryPolicy::Any, 2826 Some("idem-default-timeout"), 2827 ); 2828 default_timeout.timeout_ms = None; 2829 let mut explicit_default = publish_request( 2830 event, 2831 Vec::new(), 2832 PublishRelayPolicy::DaemonDefaultOnly, 2833 PublishDeliveryPolicy::Any, 2834 Some("idem-default-timeout"), 2835 ); 2836 explicit_default.timeout_ms = Some(10_000); 2837 2838 let first = proxy 2839 .publish_event(&principal, default_timeout) 2840 .await 2841 .expect("first"); 2842 let duplicate = proxy 2843 .publish_event(&principal, explicit_default) 2844 .await 2845 .expect("duplicate"); 2846 assert!(!first.deduplicated); 2847 assert!(duplicate.deduplicated); 2848 assert_eq!(duplicate.job.job_id, first.job.job_id); 2849 } 2850 2851 #[tokio::test] 2852 async fn publish_event_fingerprint_conflicts_on_different_effective_timeout() { 2853 let identity = RadrootsIdentity::generate(); 2854 let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); 2855 let principal = principal( 2856 &proxy, 2857 identity.public_key_hex(), 2858 vec![PublishRelayPolicy::DaemonDefaultOnly], 2859 false, 2860 PublishJobVisibility::Own, 2861 ); 2862 let event = signed_event(&identity, "{}"); 2863 let first = publish_request( 2864 event.clone(), 2865 Vec::new(), 2866 PublishRelayPolicy::DaemonDefaultOnly, 2867 PublishDeliveryPolicy::Any, 2868 Some("idem-timeout-conflict"), 2869 ); 2870 let mut conflict = publish_request( 2871 event, 2872 Vec::new(), 2873 PublishRelayPolicy::DaemonDefaultOnly, 2874 PublishDeliveryPolicy::Any, 2875 Some("idem-timeout-conflict"), 2876 ); 2877 conflict.timeout_ms = Some(6_000); 2878 2879 proxy.publish_event(&principal, first).await.expect("first"); 2880 let error = proxy 2881 .publish_event(&principal, conflict) 2882 .await 2883 .expect_err("timeout conflict"); 2884 assert!(matches!(error, PublishProxyError::IdempotencyConflict(_))); 2885 } 2886 2887 #[tokio::test] 2888 async fn publish_event_concurrency_limit_rejects_without_job_creation() { 2889 let identity = RadrootsIdentity::generate(); 2890 let mut config = config_with_defaults(vec![RELAY_PRIMARY]); 2891 config.max_concurrent_publish_jobs = 1; 2892 let (proxy, adapter) = publish_proxy(config); 2893 let principal = principal( 2894 &proxy, 2895 identity.public_key_hex(), 2896 vec![PublishRelayPolicy::DaemonDefaultOnly], 2897 false, 2898 PublishJobVisibility::Own, 2899 ); 2900 let _permit = proxy.acquire_publish_permit().expect("permit"); 2901 let error = proxy 2902 .publish_event( 2903 &principal, 2904 publish_request( 2905 signed_event(&identity, "{}"), 2906 Vec::new(), 2907 PublishRelayPolicy::DaemonDefaultOnly, 2908 PublishDeliveryPolicy::Any, 2909 Some("idem-concurrency"), 2910 ), 2911 ) 2912 .await 2913 .expect_err("concurrency limit"); 2914 assert!(matches!(error, PublishProxyError::ConcurrencyLimit)); 2915 assert!( 2916 proxy 2917 .store 2918 .list_jobs_for_principal(&principal, 50) 2919 .expect("jobs") 2920 .is_empty() 2921 ); 2922 assert!(adapter.captured_raw_events().is_empty()); 2923 } 2924 2925 #[tokio::test] 2926 async fn publish_jobs_respect_own_and_admin_visibility() { 2927 let identity = RadrootsIdentity::generate(); 2928 let other_identity = RadrootsIdentity::generate(); 2929 let (proxy, _adapter) = publish_proxy(config_with_defaults(vec![RELAY_PRIMARY])); 2930 let owner = principal( 2931 &proxy, 2932 identity.public_key_hex(), 2933 vec![PublishRelayPolicy::DaemonDefaultOnly], 2934 false, 2935 PublishJobVisibility::Own, 2936 ); 2937 let other = principal( 2938 &proxy, 2939 other_identity.public_key_hex(), 2940 vec![PublishRelayPolicy::DaemonDefaultOnly], 2941 false, 2942 PublishJobVisibility::Own, 2943 ); 2944 let admin = principal( 2945 &proxy, 2946 other_identity.public_key_hex(), 2947 vec![PublishRelayPolicy::DaemonDefaultOnly], 2948 false, 2949 PublishJobVisibility::Admin, 2950 ); 2951 let response = proxy 2952 .publish_event( 2953 &owner, 2954 publish_request( 2955 signed_event(&identity, "{}"), 2956 Vec::new(), 2957 PublishRelayPolicy::DaemonDefaultOnly, 2958 PublishDeliveryPolicy::Any, 2959 None, 2960 ), 2961 ) 2962 .await 2963 .expect("publish"); 2964 2965 assert!( 2966 proxy 2967 .store 2968 .job_by_id_for_principal(response.job.job_id.as_str(), &other) 2969 .expect("other read") 2970 .is_none() 2971 ); 2972 assert!( 2973 proxy 2974 .store 2975 .job_by_id_for_principal(response.job.job_id.as_str(), &admin) 2976 .expect("admin read") 2977 .is_some() 2978 ); 2979 } 2980 2981 #[tokio::test] 2982 async fn publish_event_records_retryable_relay_failures() { 2983 let identity = RadrootsIdentity::generate(); 2984 let adapter = RadrootsMockRelayPublishAdapter::new().with_outcome( 2985 RELAY_PRIMARY, 2986 RadrootsRelayOutcome::connection_failed("error: unavailable"), 2987 ); 2988 let proxy = PublishProxy::memory(config_with_defaults(vec![RELAY_PRIMARY])) 2989 .expect("proxy") 2990 .with_publisher(Arc::new(adapter)); 2991 let principal = principal( 2992 &proxy, 2993 identity.public_key_hex(), 2994 vec![PublishRelayPolicy::DaemonDefaultOnly], 2995 false, 2996 PublishJobVisibility::Own, 2997 ); 2998 let response = proxy 2999 .publish_event( 3000 &principal, 3001 publish_request( 3002 signed_event(&identity, "{}"), 3003 Vec::new(), 3004 PublishRelayPolicy::DaemonDefaultOnly, 3005 PublishDeliveryPolicy::Any, 3006 None, 3007 ), 3008 ) 3009 .await 3010 .expect("publish"); 3011 3012 assert_eq!( 3013 response.job.status, 3014 PublishJobStatus::DeliveryUnsatisfiedRetryable 3015 ); 3016 assert_eq!(response.job.retryable_count, 1); 3017 } 3018 }