outbox.rs (9812B)
1 #![forbid(unsafe_code)] 2 3 use crate::{ 4 RadrootsRelayOutcome, RadrootsRelayOutcomeKind, RadrootsRelayPublishAdapter, 5 RadrootsRelayPublishReceipt, RadrootsRelayPublishRelayReceipt, RadrootsRelayPublishRequest, 6 RadrootsRelayTargetSet, RadrootsRelayTransportError, RadrootsRelayUrlPolicy, 7 publish_signed_event, 8 }; 9 use radroots_event_store::{ 10 RadrootsEventIngest, RadrootsEventStore, RadrootsRelayObservation, RadrootsRelayObservationType, 11 }; 12 use radroots_events::RadrootsNostrEvent; 13 use radroots_events::draft::RadrootsSignedNostrEvent; 14 use radroots_outbox::{ 15 RadrootsOutbox, RadrootsOutboxClaimedEvent, RadrootsOutboxEventStoreIngestReceipt, 16 RadrootsOutboxRelayStatus, 17 }; 18 19 #[derive(Clone, Debug, PartialEq, Eq)] 20 pub struct RadrootsOutboxPublishPolicy { 21 pub accepted_quorum: Option<usize>, 22 pub next_attempt_after_ms: i64, 23 pub republish_accepted_relays: bool, 24 pub relay_url_policy: RadrootsRelayUrlPolicy, 25 } 26 27 impl RadrootsOutboxPublishPolicy { 28 pub fn new(next_attempt_after_ms: i64) -> Self { 29 Self { 30 accepted_quorum: None, 31 next_attempt_after_ms, 32 republish_accepted_relays: false, 33 relay_url_policy: RadrootsRelayUrlPolicy::Public, 34 } 35 } 36 37 pub fn with_accepted_quorum(mut self, accepted_quorum: usize) -> Self { 38 self.accepted_quorum = Some(accepted_quorum); 39 self 40 } 41 42 pub fn republish_accepted_relays(mut self, enabled: bool) -> Self { 43 self.republish_accepted_relays = enabled; 44 self 45 } 46 47 pub fn relay_url_policy(mut self, policy: RadrootsRelayUrlPolicy) -> Self { 48 self.relay_url_policy = policy; 49 self 50 } 51 } 52 53 #[derive(Clone, Debug, PartialEq, Eq)] 54 pub struct RadrootsOutboxPublishReceipt { 55 pub local_ingest: RadrootsOutboxEventStoreIngestReceipt, 56 pub publish: RadrootsRelayPublishReceipt, 57 } 58 59 pub async fn publish_claimed_outbox_event<A>( 60 outbox: &RadrootsOutbox, 61 event_store: &RadrootsEventStore, 62 adapter: &A, 63 claimed: &RadrootsOutboxClaimedEvent, 64 policy: RadrootsOutboxPublishPolicy, 65 now_ms: i64, 66 ) -> Result<RadrootsOutboxPublishReceipt, RadrootsRelayTransportError> 67 where 68 A: RadrootsRelayPublishAdapter, 69 { 70 let signed_event = claimed.signed_event.clone().ok_or( 71 RadrootsRelayTransportError::MissingSignedOutboxEvent(claimed.outbox_event_id), 72 )?; 73 let local_ingest = outbox 74 .ingest_signed_event_local( 75 event_store, 76 claimed.outbox_event_id, 77 claimed.claim_token.as_str(), 78 now_ms, 79 ) 80 .await?; 81 let publishable = publishable_relays(outbox, claimed, policy.republish_accepted_relays).await?; 82 let overall_quorum = policy 83 .accepted_quorum 84 .unwrap_or(publishable.total_target_count); 85 outbox 86 .set_publish_quorum( 87 claimed.outbox_event_id, 88 claimed.claim_token.as_str(), 89 overall_quorum as i64, 90 now_ms, 91 ) 92 .await?; 93 if publishable.accepted_count >= overall_quorum { 94 outbox 95 .complete_publish_attempt( 96 claimed.outbox_event_id, 97 claimed.claim_token.as_str(), 98 "relay publish incomplete", 99 "relay publish terminal", 100 policy.next_attempt_after_ms, 101 now_ms, 102 ) 103 .await?; 104 let publish = RadrootsRelayPublishReceipt { 105 event_id: signed_event.id, 106 attempted_count: 0, 107 accepted_count: publishable.accepted_count, 108 retryable_count: 0, 109 terminal_count: 0, 110 quorum: overall_quorum, 111 quorum_met: true, 112 relays: Vec::new(), 113 }; 114 return Ok(RadrootsOutboxPublishReceipt { 115 local_ingest, 116 publish, 117 }); 118 } 119 let targets = RadrootsRelayTargetSet::new(publishable.relays, policy.relay_url_policy)?; 120 let target_strings = targets.relay_strings(); 121 let quorum = overall_quorum.saturating_sub(publishable.accepted_count); 122 let request = RadrootsRelayPublishRequest::new(signed_event.clone(), targets, now_ms) 123 .with_accepted_quorum(quorum); 124 let publish = match publish_signed_event(adapter, request).await { 125 Ok(receipt) => receipt, 126 Err(RadrootsRelayTransportError::Transport(message)) => adapter_transport_failure_receipt( 127 signed_event.id.clone(), 128 target_strings, 129 quorum, 130 message, 131 ), 132 Err(error) => return Err(error), 133 }; 134 135 for relay in &publish.relays { 136 match relay.outcome.kind { 137 RadrootsRelayOutcomeKind::Accepted | RadrootsRelayOutcomeKind::DuplicateAccepted => { 138 outbox 139 .mark_relay_accepted( 140 claimed.outbox_event_id, 141 claimed.claim_token.as_str(), 142 relay.relay_url.as_str(), 143 now_ms, 144 ) 145 .await?; 146 ingest_publish_observation( 147 event_store, 148 &signed_event, 149 relay.relay_url.as_str(), 150 relay.outcome.message.as_deref(), 151 now_ms, 152 ) 153 .await?; 154 } 155 _ if relay.outcome.is_retryable() => { 156 outbox 157 .mark_relay_failed_retryable( 158 claimed.outbox_event_id, 159 claimed.claim_token.as_str(), 160 relay.relay_url.as_str(), 161 relay 162 .outcome 163 .message 164 .as_deref() 165 .unwrap_or("relay publish retryable"), 166 now_ms, 167 ) 168 .await?; 169 } 170 _ => { 171 outbox 172 .mark_relay_failed_terminal( 173 claimed.outbox_event_id, 174 claimed.claim_token.as_str(), 175 relay.relay_url.as_str(), 176 relay 177 .outcome 178 .message 179 .as_deref() 180 .unwrap_or("relay publish terminal"), 181 now_ms, 182 ) 183 .await?; 184 } 185 } 186 } 187 188 outbox 189 .complete_publish_attempt( 190 claimed.outbox_event_id, 191 claimed.claim_token.as_str(), 192 "relay publish incomplete", 193 "relay publish terminal", 194 policy.next_attempt_after_ms, 195 now_ms, 196 ) 197 .await?; 198 199 Ok(RadrootsOutboxPublishReceipt { 200 local_ingest, 201 publish, 202 }) 203 } 204 205 fn adapter_transport_failure_receipt( 206 event_id: String, 207 relay_urls: Vec<String>, 208 quorum: usize, 209 message: String, 210 ) -> RadrootsRelayPublishReceipt { 211 let relays = relay_urls 212 .into_iter() 213 .map(|relay_url| { 214 RadrootsRelayPublishRelayReceipt::attempted( 215 relay_url, 216 RadrootsRelayOutcome::connection_failed(message.clone()), 217 ) 218 }) 219 .collect::<Vec<_>>(); 220 RadrootsRelayPublishReceipt { 221 event_id, 222 attempted_count: relays.len(), 223 accepted_count: 0, 224 retryable_count: relays.len(), 225 terminal_count: 0, 226 quorum, 227 quorum_met: false, 228 relays, 229 } 230 } 231 232 struct PublishableRelays { 233 relays: Vec<String>, 234 total_target_count: usize, 235 accepted_count: usize, 236 } 237 238 async fn publishable_relays( 239 outbox: &RadrootsOutbox, 240 claimed: &RadrootsOutboxClaimedEvent, 241 republish_accepted_relays: bool, 242 ) -> Result<PublishableRelays, RadrootsRelayTransportError> { 243 let statuses = outbox.relay_statuses(claimed.outbox_event_id).await?; 244 let mut relays = Vec::new(); 245 let mut total_target_count = 0usize; 246 let mut accepted_count = 0usize; 247 for status in statuses { 248 if !claimed 249 .target_relays 250 .iter() 251 .any(|relay_url| relay_url == &status.relay_url) 252 { 253 continue; 254 } 255 total_target_count += 1; 256 if status.status == RadrootsOutboxRelayStatus::Accepted { 257 accepted_count += 1; 258 } 259 if republish_accepted_relays || status.status != RadrootsOutboxRelayStatus::Accepted { 260 relays.push(status.relay_url); 261 } 262 } 263 Ok(PublishableRelays { 264 relays, 265 total_target_count, 266 accepted_count, 267 }) 268 } 269 270 async fn ingest_publish_observation( 271 event_store: &RadrootsEventStore, 272 signed_event: &RadrootsSignedNostrEvent, 273 relay_url: &str, 274 message: Option<&str>, 275 observed_at_ms: i64, 276 ) -> Result<(), RadrootsRelayTransportError> { 277 let mut observation = RadrootsRelayObservation::new( 278 relay_url, 279 RadrootsRelayObservationType::PublishAck, 280 observed_at_ms, 281 ); 282 if let Some(message) = message { 283 observation = observation.with_message(message); 284 } 285 let ingest = RadrootsEventIngest::new(event_from_signed(signed_event), observed_at_ms) 286 .with_raw_json(signed_event.raw_json.clone()) 287 .with_observation(observation); 288 event_store.ingest_event(ingest).await?; 289 Ok(()) 290 } 291 292 fn event_from_signed(signed_event: &RadrootsSignedNostrEvent) -> RadrootsNostrEvent { 293 RadrootsNostrEvent { 294 id: signed_event.id.clone(), 295 author: signed_event.pubkey.clone(), 296 created_at: signed_event.created_at, 297 kind: signed_event.kind, 298 tags: signed_event.tags.clone(), 299 content: signed_event.content.clone(), 300 sig: signed_event.sig.clone(), 301 } 302 }