tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

event_bus.rs (3813B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::errors::BaseRelayError;
      4 use tangle_groups::StoreOffset;
      5 use tokio::sync::broadcast;
      6 
      7 #[derive(Debug, Clone)]
      8 pub struct TangleEventBus {
      9     sender: broadcast::Sender<StoreOffset>,
     10     capacity: usize,
     11 }
     12 
     13 impl TangleEventBus {
     14     pub fn new(capacity: usize) -> Result<Self, BaseRelayError> {
     15         if capacity == 0 {
     16             return Err(BaseRelayError::invalid(
     17                 "runtime event bus capacity must be greater than zero",
     18             ));
     19         }
     20         let (sender, _) = broadcast::channel(capacity);
     21         Ok(Self { sender, capacity })
     22     }
     23 
     24     pub fn capacity(&self) -> usize {
     25         self.capacity
     26     }
     27 
     28     pub fn subscribe(&self) -> TangleEventReceiver {
     29         TangleEventReceiver {
     30             receiver: self.sender.subscribe(),
     31         }
     32     }
     33 
     34     pub fn publish(&self, offset: StoreOffset) -> usize {
     35         self.sender.send(offset).unwrap_or(0)
     36     }
     37 
     38     pub fn receiver_count(&self) -> usize {
     39         self.sender.receiver_count()
     40     }
     41 }
     42 
     43 #[derive(Debug)]
     44 pub struct TangleEventReceiver {
     45     receiver: broadcast::Receiver<StoreOffset>,
     46 }
     47 
     48 impl TangleEventReceiver {
     49     pub async fn recv(&mut self) -> Result<StoreOffset, TangleEventReceiveError> {
     50         self.receiver.recv().await.map_err(Into::into)
     51     }
     52 
     53     pub fn try_recv(&mut self) -> Result<StoreOffset, TangleEventReceiveError> {
     54         self.receiver.try_recv().map_err(Into::into)
     55     }
     56 }
     57 
     58 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     59 pub enum TangleEventReceiveError {
     60     Empty,
     61     Closed,
     62     Lagged(u64),
     63 }
     64 
     65 impl From<broadcast::error::TryRecvError> for TangleEventReceiveError {
     66     fn from(error: broadcast::error::TryRecvError) -> Self {
     67         match error {
     68             broadcast::error::TryRecvError::Empty => Self::Empty,
     69             broadcast::error::TryRecvError::Closed => Self::Closed,
     70             broadcast::error::TryRecvError::Lagged(skipped) => Self::Lagged(skipped),
     71         }
     72     }
     73 }
     74 
     75 impl From<broadcast::error::RecvError> for TangleEventReceiveError {
     76     fn from(error: broadcast::error::RecvError) -> Self {
     77         match error {
     78             broadcast::error::RecvError::Closed => Self::Closed,
     79             broadcast::error::RecvError::Lagged(skipped) => Self::Lagged(skipped),
     80         }
     81     }
     82 }
     83 
     84 #[cfg(test)]
     85 mod tests {
     86     use super::{TangleEventBus, TangleEventReceiveError};
     87     use tangle_groups::StoreOffset;
     88 
     89     #[test]
     90     fn event_bus_broadcasts_offsets_to_subscribers() {
     91         let bus = TangleEventBus::new(2).expect("bus");
     92         let mut first = bus.subscribe();
     93         let mut second = bus.subscribe();
     94 
     95         assert_eq!(bus.capacity(), 2);
     96         assert_eq!(bus.receiver_count(), 2);
     97         assert_eq!(bus.publish(StoreOffset::new(42)), 2);
     98         assert_eq!(first.try_recv().expect("first"), StoreOffset::new(42));
     99         assert_eq!(second.try_recv().expect("second"), StoreOffset::new(42));
    100     }
    101 
    102     #[test]
    103     fn event_bus_reports_lagged_receivers() {
    104         let bus = TangleEventBus::new(2).expect("bus");
    105         let mut receiver = bus.subscribe();
    106 
    107         assert_eq!(bus.publish(StoreOffset::new(1)), 1);
    108         assert_eq!(bus.publish(StoreOffset::new(2)), 1);
    109         assert_eq!(bus.publish(StoreOffset::new(3)), 1);
    110         assert_eq!(bus.publish(StoreOffset::new(4)), 1);
    111         assert_eq!(
    112             receiver.try_recv().expect_err("lagged"),
    113             TangleEventReceiveError::Lagged(2)
    114         );
    115         assert_eq!(receiver.try_recv().expect("next"), StoreOffset::new(3));
    116         assert_eq!(receiver.try_recv().expect("latest"), StoreOffset::new(4));
    117     }
    118 
    119     #[test]
    120     fn event_bus_accepts_publish_without_receivers() {
    121         let bus = TangleEventBus::new(2).expect("bus");
    122 
    123         assert_eq!(bus.receiver_count(), 0);
    124         assert_eq!(bus.publish(StoreOffset::new(7)), 0);
    125     }
    126 }