event_bus.rs (3813B)
1 #![forbid(unsafe_code)] 2 3 use crate::errors::BaseRelayError; 4 use tangle_groups::StoreOffset; 5 use tokio::sync::broadcast; 6 7 #[derive(Debug, Clone)] 8 pub struct TangleEventBus { 9 sender: broadcast::Sender<StoreOffset>, 10 capacity: usize, 11 } 12 13 impl TangleEventBus { 14 pub fn new(capacity: usize) -> Result<Self, BaseRelayError> { 15 if capacity == 0 { 16 return Err(BaseRelayError::invalid( 17 "runtime event bus capacity must be greater than zero", 18 )); 19 } 20 let (sender, _) = broadcast::channel(capacity); 21 Ok(Self { sender, capacity }) 22 } 23 24 pub fn capacity(&self) -> usize { 25 self.capacity 26 } 27 28 pub fn subscribe(&self) -> TangleEventReceiver { 29 TangleEventReceiver { 30 receiver: self.sender.subscribe(), 31 } 32 } 33 34 pub fn publish(&self, offset: StoreOffset) -> usize { 35 self.sender.send(offset).unwrap_or(0) 36 } 37 38 pub fn receiver_count(&self) -> usize { 39 self.sender.receiver_count() 40 } 41 } 42 43 #[derive(Debug)] 44 pub struct TangleEventReceiver { 45 receiver: broadcast::Receiver<StoreOffset>, 46 } 47 48 impl TangleEventReceiver { 49 pub async fn recv(&mut self) -> Result<StoreOffset, TangleEventReceiveError> { 50 self.receiver.recv().await.map_err(Into::into) 51 } 52 53 pub fn try_recv(&mut self) -> Result<StoreOffset, TangleEventReceiveError> { 54 self.receiver.try_recv().map_err(Into::into) 55 } 56 } 57 58 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 59 pub enum TangleEventReceiveError { 60 Empty, 61 Closed, 62 Lagged(u64), 63 } 64 65 impl From<broadcast::error::TryRecvError> for TangleEventReceiveError { 66 fn from(error: broadcast::error::TryRecvError) -> Self { 67 match error { 68 broadcast::error::TryRecvError::Empty => Self::Empty, 69 broadcast::error::TryRecvError::Closed => Self::Closed, 70 broadcast::error::TryRecvError::Lagged(skipped) => Self::Lagged(skipped), 71 } 72 } 73 } 74 75 impl From<broadcast::error::RecvError> for TangleEventReceiveError { 76 fn from(error: broadcast::error::RecvError) -> Self { 77 match error { 78 broadcast::error::RecvError::Closed => Self::Closed, 79 broadcast::error::RecvError::Lagged(skipped) => Self::Lagged(skipped), 80 } 81 } 82 } 83 84 #[cfg(test)] 85 mod tests { 86 use super::{TangleEventBus, TangleEventReceiveError}; 87 use tangle_groups::StoreOffset; 88 89 #[test] 90 fn event_bus_broadcasts_offsets_to_subscribers() { 91 let bus = TangleEventBus::new(2).expect("bus"); 92 let mut first = bus.subscribe(); 93 let mut second = bus.subscribe(); 94 95 assert_eq!(bus.capacity(), 2); 96 assert_eq!(bus.receiver_count(), 2); 97 assert_eq!(bus.publish(StoreOffset::new(42)), 2); 98 assert_eq!(first.try_recv().expect("first"), StoreOffset::new(42)); 99 assert_eq!(second.try_recv().expect("second"), StoreOffset::new(42)); 100 } 101 102 #[test] 103 fn event_bus_reports_lagged_receivers() { 104 let bus = TangleEventBus::new(2).expect("bus"); 105 let mut receiver = bus.subscribe(); 106 107 assert_eq!(bus.publish(StoreOffset::new(1)), 1); 108 assert_eq!(bus.publish(StoreOffset::new(2)), 1); 109 assert_eq!(bus.publish(StoreOffset::new(3)), 1); 110 assert_eq!(bus.publish(StoreOffset::new(4)), 1); 111 assert_eq!( 112 receiver.try_recv().expect_err("lagged"), 113 TangleEventReceiveError::Lagged(2) 114 ); 115 assert_eq!(receiver.try_recv().expect("next"), StoreOffset::new(3)); 116 assert_eq!(receiver.try_recv().expect("latest"), StoreOffset::new(4)); 117 } 118 119 #[test] 120 fn event_bus_accepts_publish_without_receivers() { 121 let bus = TangleEventBus::new(2).expect("bus"); 122 123 assert_eq!(bus.receiver_count(), 0); 124 assert_eq!(bus.publish(StoreOffset::new(7)), 0); 125 } 126 }