lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

outbox.rs (9812B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::{
      4     RadrootsRelayOutcome, RadrootsRelayOutcomeKind, RadrootsRelayPublishAdapter,
      5     RadrootsRelayPublishReceipt, RadrootsRelayPublishRelayReceipt, RadrootsRelayPublishRequest,
      6     RadrootsRelayTargetSet, RadrootsRelayTransportError, RadrootsRelayUrlPolicy,
      7     publish_signed_event,
      8 };
      9 use radroots_event_store::{
     10     RadrootsEventIngest, RadrootsEventStore, RadrootsRelayObservation, RadrootsRelayObservationType,
     11 };
     12 use radroots_events::RadrootsNostrEvent;
     13 use radroots_events::draft::RadrootsSignedNostrEvent;
     14 use radroots_outbox::{
     15     RadrootsOutbox, RadrootsOutboxClaimedEvent, RadrootsOutboxEventStoreIngestReceipt,
     16     RadrootsOutboxRelayStatus,
     17 };
     18 
     19 #[derive(Clone, Debug, PartialEq, Eq)]
     20 pub struct RadrootsOutboxPublishPolicy {
     21     pub accepted_quorum: Option<usize>,
     22     pub next_attempt_after_ms: i64,
     23     pub republish_accepted_relays: bool,
     24     pub relay_url_policy: RadrootsRelayUrlPolicy,
     25 }
     26 
     27 impl RadrootsOutboxPublishPolicy {
     28     pub fn new(next_attempt_after_ms: i64) -> Self {
     29         Self {
     30             accepted_quorum: None,
     31             next_attempt_after_ms,
     32             republish_accepted_relays: false,
     33             relay_url_policy: RadrootsRelayUrlPolicy::Public,
     34         }
     35     }
     36 
     37     pub fn with_accepted_quorum(mut self, accepted_quorum: usize) -> Self {
     38         self.accepted_quorum = Some(accepted_quorum);
     39         self
     40     }
     41 
     42     pub fn republish_accepted_relays(mut self, enabled: bool) -> Self {
     43         self.republish_accepted_relays = enabled;
     44         self
     45     }
     46 
     47     pub fn relay_url_policy(mut self, policy: RadrootsRelayUrlPolicy) -> Self {
     48         self.relay_url_policy = policy;
     49         self
     50     }
     51 }
     52 
     53 #[derive(Clone, Debug, PartialEq, Eq)]
     54 pub struct RadrootsOutboxPublishReceipt {
     55     pub local_ingest: RadrootsOutboxEventStoreIngestReceipt,
     56     pub publish: RadrootsRelayPublishReceipt,
     57 }
     58 
     59 pub async fn publish_claimed_outbox_event<A>(
     60     outbox: &RadrootsOutbox,
     61     event_store: &RadrootsEventStore,
     62     adapter: &A,
     63     claimed: &RadrootsOutboxClaimedEvent,
     64     policy: RadrootsOutboxPublishPolicy,
     65     now_ms: i64,
     66 ) -> Result<RadrootsOutboxPublishReceipt, RadrootsRelayTransportError>
     67 where
     68     A: RadrootsRelayPublishAdapter,
     69 {
     70     let signed_event = claimed.signed_event.clone().ok_or(
     71         RadrootsRelayTransportError::MissingSignedOutboxEvent(claimed.outbox_event_id),
     72     )?;
     73     let local_ingest = outbox
     74         .ingest_signed_event_local(
     75             event_store,
     76             claimed.outbox_event_id,
     77             claimed.claim_token.as_str(),
     78             now_ms,
     79         )
     80         .await?;
     81     let publishable = publishable_relays(outbox, claimed, policy.republish_accepted_relays).await?;
     82     let overall_quorum = policy
     83         .accepted_quorum
     84         .unwrap_or(publishable.total_target_count);
     85     outbox
     86         .set_publish_quorum(
     87             claimed.outbox_event_id,
     88             claimed.claim_token.as_str(),
     89             overall_quorum as i64,
     90             now_ms,
     91         )
     92         .await?;
     93     if publishable.accepted_count >= overall_quorum {
     94         outbox
     95             .complete_publish_attempt(
     96                 claimed.outbox_event_id,
     97                 claimed.claim_token.as_str(),
     98                 "relay publish incomplete",
     99                 "relay publish terminal",
    100                 policy.next_attempt_after_ms,
    101                 now_ms,
    102             )
    103             .await?;
    104         let publish = RadrootsRelayPublishReceipt {
    105             event_id: signed_event.id,
    106             attempted_count: 0,
    107             accepted_count: publishable.accepted_count,
    108             retryable_count: 0,
    109             terminal_count: 0,
    110             quorum: overall_quorum,
    111             quorum_met: true,
    112             relays: Vec::new(),
    113         };
    114         return Ok(RadrootsOutboxPublishReceipt {
    115             local_ingest,
    116             publish,
    117         });
    118     }
    119     let targets = RadrootsRelayTargetSet::new(publishable.relays, policy.relay_url_policy)?;
    120     let target_strings = targets.relay_strings();
    121     let quorum = overall_quorum.saturating_sub(publishable.accepted_count);
    122     let request = RadrootsRelayPublishRequest::new(signed_event.clone(), targets, now_ms)
    123         .with_accepted_quorum(quorum);
    124     let publish = match publish_signed_event(adapter, request).await {
    125         Ok(receipt) => receipt,
    126         Err(RadrootsRelayTransportError::Transport(message)) => adapter_transport_failure_receipt(
    127             signed_event.id.clone(),
    128             target_strings,
    129             quorum,
    130             message,
    131         ),
    132         Err(error) => return Err(error),
    133     };
    134 
    135     for relay in &publish.relays {
    136         match relay.outcome.kind {
    137             RadrootsRelayOutcomeKind::Accepted | RadrootsRelayOutcomeKind::DuplicateAccepted => {
    138                 outbox
    139                     .mark_relay_accepted(
    140                         claimed.outbox_event_id,
    141                         claimed.claim_token.as_str(),
    142                         relay.relay_url.as_str(),
    143                         now_ms,
    144                     )
    145                     .await?;
    146                 ingest_publish_observation(
    147                     event_store,
    148                     &signed_event,
    149                     relay.relay_url.as_str(),
    150                     relay.outcome.message.as_deref(),
    151                     now_ms,
    152                 )
    153                 .await?;
    154             }
    155             _ if relay.outcome.is_retryable() => {
    156                 outbox
    157                     .mark_relay_failed_retryable(
    158                         claimed.outbox_event_id,
    159                         claimed.claim_token.as_str(),
    160                         relay.relay_url.as_str(),
    161                         relay
    162                             .outcome
    163                             .message
    164                             .as_deref()
    165                             .unwrap_or("relay publish retryable"),
    166                         now_ms,
    167                     )
    168                     .await?;
    169             }
    170             _ => {
    171                 outbox
    172                     .mark_relay_failed_terminal(
    173                         claimed.outbox_event_id,
    174                         claimed.claim_token.as_str(),
    175                         relay.relay_url.as_str(),
    176                         relay
    177                             .outcome
    178                             .message
    179                             .as_deref()
    180                             .unwrap_or("relay publish terminal"),
    181                         now_ms,
    182                     )
    183                     .await?;
    184             }
    185         }
    186     }
    187 
    188     outbox
    189         .complete_publish_attempt(
    190             claimed.outbox_event_id,
    191             claimed.claim_token.as_str(),
    192             "relay publish incomplete",
    193             "relay publish terminal",
    194             policy.next_attempt_after_ms,
    195             now_ms,
    196         )
    197         .await?;
    198 
    199     Ok(RadrootsOutboxPublishReceipt {
    200         local_ingest,
    201         publish,
    202     })
    203 }
    204 
    205 fn adapter_transport_failure_receipt(
    206     event_id: String,
    207     relay_urls: Vec<String>,
    208     quorum: usize,
    209     message: String,
    210 ) -> RadrootsRelayPublishReceipt {
    211     let relays = relay_urls
    212         .into_iter()
    213         .map(|relay_url| {
    214             RadrootsRelayPublishRelayReceipt::attempted(
    215                 relay_url,
    216                 RadrootsRelayOutcome::connection_failed(message.clone()),
    217             )
    218         })
    219         .collect::<Vec<_>>();
    220     RadrootsRelayPublishReceipt {
    221         event_id,
    222         attempted_count: relays.len(),
    223         accepted_count: 0,
    224         retryable_count: relays.len(),
    225         terminal_count: 0,
    226         quorum,
    227         quorum_met: false,
    228         relays,
    229     }
    230 }
    231 
    232 struct PublishableRelays {
    233     relays: Vec<String>,
    234     total_target_count: usize,
    235     accepted_count: usize,
    236 }
    237 
    238 async fn publishable_relays(
    239     outbox: &RadrootsOutbox,
    240     claimed: &RadrootsOutboxClaimedEvent,
    241     republish_accepted_relays: bool,
    242 ) -> Result<PublishableRelays, RadrootsRelayTransportError> {
    243     let statuses = outbox.relay_statuses(claimed.outbox_event_id).await?;
    244     let mut relays = Vec::new();
    245     let mut total_target_count = 0usize;
    246     let mut accepted_count = 0usize;
    247     for status in statuses {
    248         if !claimed
    249             .target_relays
    250             .iter()
    251             .any(|relay_url| relay_url == &status.relay_url)
    252         {
    253             continue;
    254         }
    255         total_target_count += 1;
    256         if status.status == RadrootsOutboxRelayStatus::Accepted {
    257             accepted_count += 1;
    258         }
    259         if republish_accepted_relays || status.status != RadrootsOutboxRelayStatus::Accepted {
    260             relays.push(status.relay_url);
    261         }
    262     }
    263     Ok(PublishableRelays {
    264         relays,
    265         total_target_count,
    266         accepted_count,
    267     })
    268 }
    269 
    270 async fn ingest_publish_observation(
    271     event_store: &RadrootsEventStore,
    272     signed_event: &RadrootsSignedNostrEvent,
    273     relay_url: &str,
    274     message: Option<&str>,
    275     observed_at_ms: i64,
    276 ) -> Result<(), RadrootsRelayTransportError> {
    277     let mut observation = RadrootsRelayObservation::new(
    278         relay_url,
    279         RadrootsRelayObservationType::PublishAck,
    280         observed_at_ms,
    281     );
    282     if let Some(message) = message {
    283         observation = observation.with_message(message);
    284     }
    285     let ingest = RadrootsEventIngest::new(event_from_signed(signed_event), observed_at_ms)
    286         .with_raw_json(signed_event.raw_json.clone())
    287         .with_observation(observation);
    288     event_store.ingest_event(ingest).await?;
    289     Ok(())
    290 }
    291 
    292 fn event_from_signed(signed_event: &RadrootsSignedNostrEvent) -> RadrootsNostrEvent {
    293     RadrootsNostrEvent {
    294         id: signed_event.id.clone(),
    295         author: signed_event.pubkey.clone(),
    296         created_at: signed_event.created_at,
    297         kind: signed_event.kind,
    298         tags: signed_event.tags.clone(),
    299         content: signed_event.content.clone(),
    300         sig: signed_event.sig.clone(),
    301     }
    302 }