tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

outbound.rs (4582B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::errors::BaseRelayError;
      4 use std::str;
      5 use tangle_protocol::{RelayMessage, SubscriptionId};
      6 use tangle_store_pocket::PocketOwnedEvent;
      7 
      8 #[derive(Debug, Clone, PartialEq)]
      9 pub enum RuntimeRelayMessage {
     10     Event {
     11         subscription_id: SubscriptionId,
     12         event: PocketOwnedEvent,
     13     },
     14     Protocol(RelayMessage),
     15 }
     16 
     17 impl RuntimeRelayMessage {
     18     pub(crate) fn event(subscription_id: SubscriptionId, event: PocketOwnedEvent) -> Self {
     19         Self::Event {
     20             subscription_id,
     21             event,
     22         }
     23     }
     24 
     25     pub fn encode(&self) -> Result<String, BaseRelayError> {
     26         match self {
     27             Self::Event {
     28                 subscription_id,
     29                 event,
     30             } => encode_pocket_event_message(subscription_id, event),
     31             Self::Protocol(message) => Ok(message.encode()),
     32         }
     33     }
     34 
     35     pub(crate) fn into_protocol_control_message(self) -> Result<RelayMessage, BaseRelayError> {
     36         match self {
     37             Self::Event { .. } => Err(BaseRelayError::error(
     38                 "event-bearing runtime messages must be encoded from Pocket events",
     39             )),
     40             Self::Protocol(message) => Ok(message),
     41         }
     42     }
     43 }
     44 
     45 impl From<RelayMessage> for RuntimeRelayMessage {
     46     fn from(message: RelayMessage) -> Self {
     47         Self::Protocol(message)
     48     }
     49 }
     50 
     51 pub(crate) fn protocol_control_messages(
     52     messages: Vec<RuntimeRelayMessage>,
     53 ) -> Result<Vec<RelayMessage>, BaseRelayError> {
     54     messages
     55         .into_iter()
     56         .map(RuntimeRelayMessage::into_protocol_control_message)
     57         .collect()
     58 }
     59 
     60 #[cfg(test)]
     61 pub(crate) fn protocol_messages_for_test(
     62     messages: Vec<RuntimeRelayMessage>,
     63 ) -> Result<Vec<RelayMessage>, BaseRelayError> {
     64     messages
     65         .into_iter()
     66         .map(|message| match message {
     67             RuntimeRelayMessage::Event {
     68                 subscription_id,
     69                 event,
     70             } => Ok(RelayMessage::Event {
     71                 subscription_id,
     72                 event: crate::pocket_conversion::pocket_event_to_tangle(&event)?,
     73             }),
     74             RuntimeRelayMessage::Protocol(message) => Ok(message),
     75         })
     76         .collect()
     77 }
     78 
     79 fn encode_pocket_event_message(
     80     subscription_id: &SubscriptionId,
     81     event: &PocketOwnedEvent,
     82 ) -> Result<String, BaseRelayError> {
     83     let subscription = serde_json::to_string(subscription_id.as_str()).map_err(|error| {
     84         BaseRelayError::error(format!("outbound subscription encode failed: {error}"))
     85     })?;
     86     let event_json = event.as_json().map_err(|error| {
     87         BaseRelayError::error(format!("outbound Pocket event encode failed: {error}"))
     88     })?;
     89     let event_json = str::from_utf8(&event_json).map_err(|error| {
     90         BaseRelayError::error(format!("outbound Pocket event JSON is not UTF-8: {error}"))
     91     })?;
     92     Ok(format!(r#"["EVENT",{subscription},{event_json}]"#))
     93 }
     94 
     95 #[cfg(test)]
     96 mod tests {
     97     use super::RuntimeRelayMessage;
     98     use crate::pocket_conversion::tangle_event_to_pocket;
     99     use serde_json::json;
    100     use tangle_protocol::{RelayMessage, SubscriptionId, event_to_value, relay_message_to_value};
    101     use tangle_test_support::{FixtureKey, tangle_v2_event};
    102 
    103     #[test]
    104     fn outbound_pocket_event_encoding_preserves_event_fields() {
    105         let event = tangle_v2_event(
    106             FixtureKey::Member,
    107             1_714_124_433,
    108             1,
    109             vec![tangle_protocol::Tag::from_parts("t", &["market"]).expect("tag")],
    110             "fresh carrots",
    111         )
    112         .expect("event");
    113         let pocket = tangle_event_to_pocket(&event).expect("pocket");
    114         let subscription_id = SubscriptionId::new("outbound-event").expect("subscription");
    115         let encoded = RuntimeRelayMessage::event(subscription_id.clone(), pocket)
    116             .encode()
    117             .expect("encoded");
    118 
    119         assert_eq!(
    120             serde_json::from_str::<serde_json::Value>(&encoded).expect("json"),
    121             json!(["EVENT", subscription_id.as_str(), event_to_value(&event)])
    122         );
    123     }
    124 
    125     #[test]
    126     fn outbound_protocol_messages_still_use_protocol_encoder() {
    127         let subscription_id = SubscriptionId::new("outbound-eose").expect("subscription");
    128         let message = RelayMessage::Eose(subscription_id);
    129 
    130         assert_eq!(
    131             serde_json::from_str::<serde_json::Value>(
    132                 &RuntimeRelayMessage::from(message.clone())
    133                     .encode()
    134                     .expect("encoded")
    135             )
    136             .expect("json"),
    137             relay_message_to_value(&message)
    138         );
    139     }
    140 }