live.rs (8072B)
1 #![forbid(unsafe_code)] 2 3 use crate::errors::BaseRelayError; 4 use std::collections::BTreeMap; 5 use tangle_groups::GroupAuthContext; 6 use tangle_protocol::SubscriptionId; 7 use tangle_store_pocket::{PocketEvent, PocketOwnedFilter}; 8 9 #[derive(Debug, Clone, PartialEq, Eq)] 10 pub(crate) struct LiveSubscriptionSet { 11 subscriptions: BTreeMap<SubscriptionId, LiveSubscription>, 12 max_subscriptions: usize, 13 } 14 15 #[derive(Debug, Clone, PartialEq, Eq)] 16 struct LiveSubscription { 17 filters: Vec<PocketOwnedFilter>, 18 } 19 20 impl LiveSubscriptionSet { 21 pub(crate) fn new( 22 max_pending_events: usize, 23 max_subscriptions: usize, 24 ) -> Result<Self, BaseRelayError> { 25 if max_pending_events == 0 { 26 return Err(BaseRelayError::invalid( 27 "live subscription pending event limit must be greater than zero", 28 )); 29 } 30 if max_subscriptions == 0 { 31 return Err(BaseRelayError::invalid( 32 "live subscription count limit must be greater than zero", 33 )); 34 } 35 Ok(Self { 36 subscriptions: BTreeMap::new(), 37 max_subscriptions, 38 }) 39 } 40 41 pub(crate) fn subscribe( 42 &mut self, 43 subscription_id: SubscriptionId, 44 filters: Vec<PocketOwnedFilter>, 45 ) -> Result<(), BaseRelayError> { 46 self.ensure_can_subscribe(&subscription_id, &filters)?; 47 self.subscriptions 48 .insert(subscription_id, LiveSubscription { filters }); 49 Ok(()) 50 } 51 52 pub(crate) fn ensure_can_subscribe( 53 &self, 54 subscription_id: &SubscriptionId, 55 filters: &[PocketOwnedFilter], 56 ) -> Result<(), BaseRelayError> { 57 if filters.is_empty() { 58 return Err(BaseRelayError::invalid( 59 "subscription must include at least one filter", 60 )); 61 } 62 if !self.subscriptions.contains_key(subscription_id) 63 && self.subscriptions.len() >= self.max_subscriptions 64 { 65 return Err(BaseRelayError::invalid( 66 "connection subscription limit exceeded", 67 )); 68 } 69 Ok(()) 70 } 71 72 pub(crate) fn close(&mut self, subscription_id: &SubscriptionId) -> CloseResult { 73 if self.subscriptions.remove(subscription_id).is_some() { 74 CloseResult::Closed 75 } else { 76 CloseResult::NotFound 77 } 78 } 79 80 pub(crate) fn contains(&self, subscription_id: &SubscriptionId) -> bool { 81 self.subscriptions.contains_key(subscription_id) 82 } 83 84 pub(crate) fn close_all(&mut self) -> usize { 85 let closed = self.subscriptions.len(); 86 self.subscriptions.clear(); 87 closed 88 } 89 90 pub(crate) fn fanout( 91 &self, 92 event: &PocketEvent, 93 auth: &GroupAuthContext, 94 visible_to_auth: impl Fn(&PocketEvent, &GroupAuthContext) -> bool, 95 ) -> Result<Vec<SubscriptionId>, BaseRelayError> { 96 self.subscriptions.iter().try_fold( 97 Vec::new(), 98 |mut matched, (subscription_id, subscription)| { 99 if !subscription 100 .filters 101 .iter() 102 .map(|filter| filter.event_matches(event)) 103 .collect::<Result<Vec<_>, _>>() 104 .map_err(|error| BaseRelayError::error(error.to_string()))? 105 .into_iter() 106 .any(|matches| matches) 107 { 108 return Ok(matched); 109 } 110 if visible_to_auth(event, auth) { 111 matched.push(subscription_id.clone()); 112 } 113 Ok(matched) 114 }, 115 ) 116 } 117 118 pub(crate) fn active_count(&self) -> usize { 119 self.subscriptions.len() 120 } 121 } 122 123 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 124 pub enum CloseResult { 125 Closed, 126 NotFound, 127 } 128 129 #[cfg(test)] 130 mod tests { 131 use super::{CloseResult, LiveSubscriptionSet}; 132 use tangle_groups::GroupAuthContext; 133 use tangle_protocol::{SubscriptionId, filter_from_value}; 134 use tangle_test_support::{FixtureKey, tangle_v2_event}; 135 136 #[test] 137 fn live_subscription_fanout_keeps_healthy_subscriptions_open() { 138 let mut subscriptions = LiveSubscriptionSet::new(1, 1).expect("subscriptions"); 139 let subscription_id = SubscriptionId::new("live").expect("subscription"); 140 subscriptions 141 .subscribe( 142 subscription_id.clone(), 143 vec![pocket_filter(serde_json::json!({"kinds":[1]}))], 144 ) 145 .expect("subscribe"); 146 let first = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "first") 147 .expect("first"); 148 let second = tangle_v2_event(FixtureKey::Member, 1_714_124_434, 1, Vec::new(), "second") 149 .expect("second"); 150 let third = tangle_v2_event(FixtureKey::Member, 1_714_124_435, 1, Vec::new(), "third") 151 .expect("third"); 152 153 assert!(matches!( 154 subscriptions 155 .fanout(&pocket_event(&first), &GroupAuthContext::unauthenticated(), |_, _| true) 156 .expect("fanout") 157 .as_slice(), 158 [delivered] if delivered == &subscription_id 159 )); 160 assert!(matches!( 161 subscriptions 162 .fanout(&pocket_event(&second), &GroupAuthContext::unauthenticated(), |_, _| true) 163 .expect("fanout") 164 .as_slice(), 165 [delivered] if delivered == &subscription_id 166 )); 167 assert!(matches!( 168 subscriptions 169 .fanout(&pocket_event(&third), &GroupAuthContext::unauthenticated(), |_, _| true) 170 .expect("fanout") 171 .as_slice(), 172 [delivered] if delivered == &subscription_id 173 )); 174 assert_eq!(subscriptions.close(&subscription_id), CloseResult::Closed); 175 } 176 177 #[test] 178 fn live_subscription_fanout_uses_pocket_filter_matching_and_auth_gate() { 179 let mut subscriptions = LiveSubscriptionSet::new(4, 4).expect("subscriptions"); 180 let event = tangle_v2_event( 181 FixtureKey::Member, 182 1_714_124_433, 183 1, 184 vec![tangle_protocol::Tag::from_parts("t", &["market"]).expect("tag")], 185 "first", 186 ) 187 .expect("event"); 188 let matched = SubscriptionId::new("matched").expect("subscription"); 189 let mismatched = SubscriptionId::new("mismatched").expect("subscription"); 190 subscriptions 191 .subscribe( 192 matched.clone(), 193 vec![pocket_filter(serde_json::json!({ 194 "ids": [event.id().as_str()], 195 "authors": [event.unsigned().pubkey().as_str()], 196 "kinds": [1], 197 "#t": ["market"], 198 "since": 1_714_124_433, 199 "until": 1_714_124_434 200 }))], 201 ) 202 .expect("matched subscribe"); 203 subscriptions 204 .subscribe( 205 mismatched, 206 vec![pocket_filter(serde_json::json!({"kinds":[2]}))], 207 ) 208 .expect("mismatched subscribe"); 209 let event = pocket_event(&event); 210 211 assert_eq!( 212 subscriptions 213 .fanout(&event, &GroupAuthContext::unauthenticated(), |_, _| true) 214 .expect("fanout"), 215 vec![matched.clone()] 216 ); 217 assert!( 218 subscriptions 219 .fanout(&event, &GroupAuthContext::unauthenticated(), |_, _| false) 220 .expect("auth gated fanout") 221 .is_empty() 222 ); 223 } 224 225 fn pocket_filter(value: serde_json::Value) -> tangle_store_pocket::PocketOwnedFilter { 226 let filter = filter_from_value(&value).expect("filter"); 227 crate::pocket_conversion::tangle_filter_to_pocket(&filter).expect("pocket filter") 228 } 229 230 fn pocket_event(event: &tangle_protocol::Event) -> tangle_store_pocket::PocketOwnedEvent { 231 crate::pocket_conversion::tangle_event_to_pocket(event).expect("pocket event") 232 } 233 }