tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

live.rs (8072B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::errors::BaseRelayError;
      4 use std::collections::BTreeMap;
      5 use tangle_groups::GroupAuthContext;
      6 use tangle_protocol::SubscriptionId;
      7 use tangle_store_pocket::{PocketEvent, PocketOwnedFilter};
      8 
      9 #[derive(Debug, Clone, PartialEq, Eq)]
     10 pub(crate) struct LiveSubscriptionSet {
     11     subscriptions: BTreeMap<SubscriptionId, LiveSubscription>,
     12     max_subscriptions: usize,
     13 }
     14 
     15 #[derive(Debug, Clone, PartialEq, Eq)]
     16 struct LiveSubscription {
     17     filters: Vec<PocketOwnedFilter>,
     18 }
     19 
     20 impl LiveSubscriptionSet {
     21     pub(crate) fn new(
     22         max_pending_events: usize,
     23         max_subscriptions: usize,
     24     ) -> Result<Self, BaseRelayError> {
     25         if max_pending_events == 0 {
     26             return Err(BaseRelayError::invalid(
     27                 "live subscription pending event limit must be greater than zero",
     28             ));
     29         }
     30         if max_subscriptions == 0 {
     31             return Err(BaseRelayError::invalid(
     32                 "live subscription count limit must be greater than zero",
     33             ));
     34         }
     35         Ok(Self {
     36             subscriptions: BTreeMap::new(),
     37             max_subscriptions,
     38         })
     39     }
     40 
     41     pub(crate) fn subscribe(
     42         &mut self,
     43         subscription_id: SubscriptionId,
     44         filters: Vec<PocketOwnedFilter>,
     45     ) -> Result<(), BaseRelayError> {
     46         self.ensure_can_subscribe(&subscription_id, &filters)?;
     47         self.subscriptions
     48             .insert(subscription_id, LiveSubscription { filters });
     49         Ok(())
     50     }
     51 
     52     pub(crate) fn ensure_can_subscribe(
     53         &self,
     54         subscription_id: &SubscriptionId,
     55         filters: &[PocketOwnedFilter],
     56     ) -> Result<(), BaseRelayError> {
     57         if filters.is_empty() {
     58             return Err(BaseRelayError::invalid(
     59                 "subscription must include at least one filter",
     60             ));
     61         }
     62         if !self.subscriptions.contains_key(subscription_id)
     63             && self.subscriptions.len() >= self.max_subscriptions
     64         {
     65             return Err(BaseRelayError::invalid(
     66                 "connection subscription limit exceeded",
     67             ));
     68         }
     69         Ok(())
     70     }
     71 
     72     pub(crate) fn close(&mut self, subscription_id: &SubscriptionId) -> CloseResult {
     73         if self.subscriptions.remove(subscription_id).is_some() {
     74             CloseResult::Closed
     75         } else {
     76             CloseResult::NotFound
     77         }
     78     }
     79 
     80     pub(crate) fn contains(&self, subscription_id: &SubscriptionId) -> bool {
     81         self.subscriptions.contains_key(subscription_id)
     82     }
     83 
     84     pub(crate) fn close_all(&mut self) -> usize {
     85         let closed = self.subscriptions.len();
     86         self.subscriptions.clear();
     87         closed
     88     }
     89 
     90     pub(crate) fn fanout(
     91         &self,
     92         event: &PocketEvent,
     93         auth: &GroupAuthContext,
     94         visible_to_auth: impl Fn(&PocketEvent, &GroupAuthContext) -> bool,
     95     ) -> Result<Vec<SubscriptionId>, BaseRelayError> {
     96         self.subscriptions.iter().try_fold(
     97             Vec::new(),
     98             |mut matched, (subscription_id, subscription)| {
     99                 if !subscription
    100                     .filters
    101                     .iter()
    102                     .map(|filter| filter.event_matches(event))
    103                     .collect::<Result<Vec<_>, _>>()
    104                     .map_err(|error| BaseRelayError::error(error.to_string()))?
    105                     .into_iter()
    106                     .any(|matches| matches)
    107                 {
    108                     return Ok(matched);
    109                 }
    110                 if visible_to_auth(event, auth) {
    111                     matched.push(subscription_id.clone());
    112                 }
    113                 Ok(matched)
    114             },
    115         )
    116     }
    117 
    118     pub(crate) fn active_count(&self) -> usize {
    119         self.subscriptions.len()
    120     }
    121 }
    122 
    123 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    124 pub enum CloseResult {
    125     Closed,
    126     NotFound,
    127 }
    128 
    129 #[cfg(test)]
    130 mod tests {
    131     use super::{CloseResult, LiveSubscriptionSet};
    132     use tangle_groups::GroupAuthContext;
    133     use tangle_protocol::{SubscriptionId, filter_from_value};
    134     use tangle_test_support::{FixtureKey, tangle_v2_event};
    135 
    136     #[test]
    137     fn live_subscription_fanout_keeps_healthy_subscriptions_open() {
    138         let mut subscriptions = LiveSubscriptionSet::new(1, 1).expect("subscriptions");
    139         let subscription_id = SubscriptionId::new("live").expect("subscription");
    140         subscriptions
    141             .subscribe(
    142                 subscription_id.clone(),
    143                 vec![pocket_filter(serde_json::json!({"kinds":[1]}))],
    144             )
    145             .expect("subscribe");
    146         let first = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "first")
    147             .expect("first");
    148         let second = tangle_v2_event(FixtureKey::Member, 1_714_124_434, 1, Vec::new(), "second")
    149             .expect("second");
    150         let third = tangle_v2_event(FixtureKey::Member, 1_714_124_435, 1, Vec::new(), "third")
    151             .expect("third");
    152 
    153         assert!(matches!(
    154             subscriptions
    155                 .fanout(&pocket_event(&first), &GroupAuthContext::unauthenticated(), |_, _| true)
    156                 .expect("fanout")
    157                 .as_slice(),
    158             [delivered] if delivered == &subscription_id
    159         ));
    160         assert!(matches!(
    161             subscriptions
    162                 .fanout(&pocket_event(&second), &GroupAuthContext::unauthenticated(), |_, _| true)
    163                 .expect("fanout")
    164                 .as_slice(),
    165             [delivered] if delivered == &subscription_id
    166         ));
    167         assert!(matches!(
    168             subscriptions
    169                 .fanout(&pocket_event(&third), &GroupAuthContext::unauthenticated(), |_, _| true)
    170                 .expect("fanout")
    171                 .as_slice(),
    172             [delivered] if delivered == &subscription_id
    173         ));
    174         assert_eq!(subscriptions.close(&subscription_id), CloseResult::Closed);
    175     }
    176 
    177     #[test]
    178     fn live_subscription_fanout_uses_pocket_filter_matching_and_auth_gate() {
    179         let mut subscriptions = LiveSubscriptionSet::new(4, 4).expect("subscriptions");
    180         let event = tangle_v2_event(
    181             FixtureKey::Member,
    182             1_714_124_433,
    183             1,
    184             vec![tangle_protocol::Tag::from_parts("t", &["market"]).expect("tag")],
    185             "first",
    186         )
    187         .expect("event");
    188         let matched = SubscriptionId::new("matched").expect("subscription");
    189         let mismatched = SubscriptionId::new("mismatched").expect("subscription");
    190         subscriptions
    191             .subscribe(
    192                 matched.clone(),
    193                 vec![pocket_filter(serde_json::json!({
    194                     "ids": [event.id().as_str()],
    195                     "authors": [event.unsigned().pubkey().as_str()],
    196                     "kinds": [1],
    197                     "#t": ["market"],
    198                     "since": 1_714_124_433,
    199                     "until": 1_714_124_434
    200                 }))],
    201             )
    202             .expect("matched subscribe");
    203         subscriptions
    204             .subscribe(
    205                 mismatched,
    206                 vec![pocket_filter(serde_json::json!({"kinds":[2]}))],
    207             )
    208             .expect("mismatched subscribe");
    209         let event = pocket_event(&event);
    210 
    211         assert_eq!(
    212             subscriptions
    213                 .fanout(&event, &GroupAuthContext::unauthenticated(), |_, _| true)
    214                 .expect("fanout"),
    215             vec![matched.clone()]
    216         );
    217         assert!(
    218             subscriptions
    219                 .fanout(&event, &GroupAuthContext::unauthenticated(), |_, _| false)
    220                 .expect("auth gated fanout")
    221                 .is_empty()
    222         );
    223     }
    224 
    225     fn pocket_filter(value: serde_json::Value) -> tangle_store_pocket::PocketOwnedFilter {
    226         let filter = filter_from_value(&value).expect("filter");
    227         crate::pocket_conversion::tangle_filter_to_pocket(&filter).expect("pocket filter")
    228     }
    229 
    230     fn pocket_event(event: &tangle_protocol::Event) -> tangle_store_pocket::PocketOwnedEvent {
    231         crate::pocket_conversion::tangle_event_to_pocket(event).expect("pocket event")
    232     }
    233 }