outbound.rs (4582B)
1 #![forbid(unsafe_code)] 2 3 use crate::errors::BaseRelayError; 4 use std::str; 5 use tangle_protocol::{RelayMessage, SubscriptionId}; 6 use tangle_store_pocket::PocketOwnedEvent; 7 8 #[derive(Debug, Clone, PartialEq)] 9 pub enum RuntimeRelayMessage { 10 Event { 11 subscription_id: SubscriptionId, 12 event: PocketOwnedEvent, 13 }, 14 Protocol(RelayMessage), 15 } 16 17 impl RuntimeRelayMessage { 18 pub(crate) fn event(subscription_id: SubscriptionId, event: PocketOwnedEvent) -> Self { 19 Self::Event { 20 subscription_id, 21 event, 22 } 23 } 24 25 pub fn encode(&self) -> Result<String, BaseRelayError> { 26 match self { 27 Self::Event { 28 subscription_id, 29 event, 30 } => encode_pocket_event_message(subscription_id, event), 31 Self::Protocol(message) => Ok(message.encode()), 32 } 33 } 34 35 pub(crate) fn into_protocol_control_message(self) -> Result<RelayMessage, BaseRelayError> { 36 match self { 37 Self::Event { .. } => Err(BaseRelayError::error( 38 "event-bearing runtime messages must be encoded from Pocket events", 39 )), 40 Self::Protocol(message) => Ok(message), 41 } 42 } 43 } 44 45 impl From<RelayMessage> for RuntimeRelayMessage { 46 fn from(message: RelayMessage) -> Self { 47 Self::Protocol(message) 48 } 49 } 50 51 pub(crate) fn protocol_control_messages( 52 messages: Vec<RuntimeRelayMessage>, 53 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 54 messages 55 .into_iter() 56 .map(RuntimeRelayMessage::into_protocol_control_message) 57 .collect() 58 } 59 60 #[cfg(test)] 61 pub(crate) fn protocol_messages_for_test( 62 messages: Vec<RuntimeRelayMessage>, 63 ) -> Result<Vec<RelayMessage>, BaseRelayError> { 64 messages 65 .into_iter() 66 .map(|message| match message { 67 RuntimeRelayMessage::Event { 68 subscription_id, 69 event, 70 } => Ok(RelayMessage::Event { 71 subscription_id, 72 event: crate::pocket_conversion::pocket_event_to_tangle(&event)?, 73 }), 74 RuntimeRelayMessage::Protocol(message) => Ok(message), 75 }) 76 .collect() 77 } 78 79 fn encode_pocket_event_message( 80 subscription_id: &SubscriptionId, 81 event: &PocketOwnedEvent, 82 ) -> Result<String, BaseRelayError> { 83 let subscription = serde_json::to_string(subscription_id.as_str()).map_err(|error| { 84 BaseRelayError::error(format!("outbound subscription encode failed: {error}")) 85 })?; 86 let event_json = event.as_json().map_err(|error| { 87 BaseRelayError::error(format!("outbound Pocket event encode failed: {error}")) 88 })?; 89 let event_json = str::from_utf8(&event_json).map_err(|error| { 90 BaseRelayError::error(format!("outbound Pocket event JSON is not UTF-8: {error}")) 91 })?; 92 Ok(format!(r#"["EVENT",{subscription},{event_json}]"#)) 93 } 94 95 #[cfg(test)] 96 mod tests { 97 use super::RuntimeRelayMessage; 98 use crate::pocket_conversion::tangle_event_to_pocket; 99 use serde_json::json; 100 use tangle_protocol::{RelayMessage, SubscriptionId, event_to_value, relay_message_to_value}; 101 use tangle_test_support::{FixtureKey, tangle_v2_event}; 102 103 #[test] 104 fn outbound_pocket_event_encoding_preserves_event_fields() { 105 let event = tangle_v2_event( 106 FixtureKey::Member, 107 1_714_124_433, 108 1, 109 vec![tangle_protocol::Tag::from_parts("t", &["market"]).expect("tag")], 110 "fresh carrots", 111 ) 112 .expect("event"); 113 let pocket = tangle_event_to_pocket(&event).expect("pocket"); 114 let subscription_id = SubscriptionId::new("outbound-event").expect("subscription"); 115 let encoded = RuntimeRelayMessage::event(subscription_id.clone(), pocket) 116 .encode() 117 .expect("encoded"); 118 119 assert_eq!( 120 serde_json::from_str::<serde_json::Value>(&encoded).expect("json"), 121 json!(["EVENT", subscription_id.as_str(), event_to_value(&event)]) 122 ); 123 } 124 125 #[test] 126 fn outbound_protocol_messages_still_use_protocol_encoder() { 127 let subscription_id = SubscriptionId::new("outbound-eose").expect("subscription"); 128 let message = RelayMessage::Eose(subscription_id); 129 130 assert_eq!( 131 serde_json::from_str::<serde_json::Value>( 132 &RuntimeRelayMessage::from(message.clone()) 133 .encode() 134 .expect("encoded") 135 ) 136 .expect("json"), 137 relay_message_to_value(&message) 138 ); 139 } 140 }