runtime.rs (18091B)
1 use crate::error::RadrootsNostrRuntimeError; 2 use crate::sink::RadrootsNostrEventSink; 3 use crate::types::{ 4 RadrootsNostrConnectionSnapshot, RadrootsNostrRuntimeEvent, RadrootsNostrSubscriptionHandle, 5 RadrootsNostrSubscriptionPolicy, RadrootsNostrSubscriptionSpec, RadrootsNostrTrafficLight, 6 }; 7 use alloc::string::{String, ToString}; 8 use alloc::sync::Arc; 9 use alloc::vec::Vec; 10 use core::sync::atomic::{AtomicBool, AtomicU64, Ordering}; 11 use core::time::Duration; 12 use futures::StreamExt; 13 use radroots_nostr::prelude::{ 14 RadrootsNostrClient, RadrootsNostrKeys, RadrootsNostrMonitor, RadrootsNostrMonitorNotification, 15 RadrootsNostrRelayStatus, RadrootsNostrRelayUrl, RadrootsNostrTimestamp, 16 }; 17 use std::collections::HashMap; 18 use std::sync::Mutex; 19 use tokio::sync::mpsc; 20 use tokio::task::JoinHandle; 21 22 #[derive(Clone)] 23 pub struct RadrootsNostrRuntimeBuilder { 24 keys: Option<RadrootsNostrKeys>, 25 relays: Vec<String>, 26 queue_capacity: usize, 27 monitor_capacity: usize, 28 event_sink: Option<Arc<dyn RadrootsNostrEventSink>>, 29 } 30 31 impl RadrootsNostrRuntimeBuilder { 32 pub const DEFAULT_QUEUE_CAPACITY: usize = 2_048; 33 pub const DEFAULT_MONITOR_CAPACITY: usize = 2_048; 34 35 pub fn new() -> Self { 36 Self { 37 keys: None, 38 relays: Vec::new(), 39 queue_capacity: Self::DEFAULT_QUEUE_CAPACITY, 40 monitor_capacity: Self::DEFAULT_MONITOR_CAPACITY, 41 event_sink: None, 42 } 43 } 44 45 pub fn keys(mut self, keys: RadrootsNostrKeys) -> Self { 46 self.keys = Some(keys); 47 self 48 } 49 50 pub fn relays(mut self, relays: Vec<String>) -> Self { 51 self.relays = relays; 52 self 53 } 54 55 pub fn add_relay(mut self, relay: impl Into<String>) -> Self { 56 self.relays.push(relay.into()); 57 self 58 } 59 60 pub fn queue_capacity(mut self, capacity: usize) -> Self { 61 self.queue_capacity = capacity; 62 self 63 } 64 65 pub fn monitor_capacity(mut self, capacity: usize) -> Self { 66 self.monitor_capacity = capacity; 67 self 68 } 69 70 pub fn event_sink(mut self, sink: Arc<dyn RadrootsNostrEventSink>) -> Self { 71 self.event_sink = Some(sink); 72 self 73 } 74 75 pub fn build(self) -> Result<RadrootsNostrRuntime, RadrootsNostrRuntimeError> { 76 let keys = self 77 .keys 78 .ok_or(RadrootsNostrRuntimeError::MissingConfig("keys"))?; 79 if self.relays.is_empty() { 80 return Err(RadrootsNostrRuntimeError::MissingConfig("relays")); 81 } 82 if self.queue_capacity == 0 { 83 return Err(RadrootsNostrRuntimeError::InvalidConfig("queue_capacity")); 84 } 85 if self.monitor_capacity == 0 { 86 return Err(RadrootsNostrRuntimeError::InvalidConfig("monitor_capacity")); 87 } 88 89 let monitor = RadrootsNostrMonitor::new(self.monitor_capacity); 90 let client = RadrootsNostrClient::new_with_monitor(keys, monitor); 91 let (queue_tx, queue_rx) = mpsc::channel(self.queue_capacity); 92 93 let inner = Arc::new(RadrootsNostrRuntimeInner { 94 client, 95 relays: Mutex::new(self.relays), 96 queue_tx, 97 queue_rx: Mutex::new(queue_rx), 98 statuses: Mutex::new(HashMap::new()), 99 last_error: Mutex::new(None), 100 monitor_task: Mutex::new(None), 101 subscription_tasks: Mutex::new(HashMap::new()), 102 started: AtomicBool::new(false), 103 shutting_down: AtomicBool::new(false), 104 next_subscription_id: AtomicU64::new(1), 105 event_sink: self.event_sink, 106 }); 107 108 Ok(RadrootsNostrRuntime { inner }) 109 } 110 } 111 112 impl Default for RadrootsNostrRuntimeBuilder { 113 fn default() -> Self { 114 Self::new() 115 } 116 } 117 118 #[derive(Clone)] 119 pub struct RadrootsNostrRuntime { 120 inner: Arc<RadrootsNostrRuntimeInner>, 121 } 122 123 struct RadrootsNostrRuntimeInner { 124 client: RadrootsNostrClient, 125 relays: Mutex<Vec<String>>, 126 queue_tx: mpsc::Sender<RadrootsNostrRuntimeEvent>, 127 queue_rx: Mutex<mpsc::Receiver<RadrootsNostrRuntimeEvent>>, 128 statuses: Mutex<HashMap<RadrootsNostrRelayUrl, RadrootsNostrRelayStatus>>, 129 last_error: Mutex<Option<String>>, 130 monitor_task: Mutex<Option<JoinHandle<()>>>, 131 subscription_tasks: Mutex<HashMap<String, JoinHandle<()>>>, 132 started: AtomicBool, 133 shutting_down: AtomicBool, 134 next_subscription_id: AtomicU64, 135 event_sink: Option<Arc<dyn RadrootsNostrEventSink>>, 136 } 137 138 impl RadrootsNostrRuntime { 139 pub async fn start(&self) -> Result<(), RadrootsNostrRuntimeError> { 140 if self.inner.started.swap(true, Ordering::SeqCst) { 141 return Err(RadrootsNostrRuntimeError::RuntimeAlreadyStarted); 142 } 143 self.inner.shutting_down.store(false, Ordering::SeqCst); 144 145 let relays = self.relays(); 146 for relay in relays { 147 if let Err(source) = self.inner.client.add_relay(relay.as_str()).await { 148 let message = source.to_string(); 149 self.record_error(message.clone()); 150 let _ = self 151 .inner 152 .queue_tx 153 .send(RadrootsNostrRuntimeEvent::Error { message }) 154 .await; 155 } 156 } 157 158 self.spawn_monitor_watcher(); 159 self.inner.client.connect().await; 160 let _ = self 161 .inner 162 .queue_tx 163 .send(RadrootsNostrRuntimeEvent::RuntimeStarted) 164 .await; 165 166 Ok(()) 167 } 168 169 pub async fn shutdown(&self) -> Result<(), RadrootsNostrRuntimeError> { 170 if !self.inner.started.swap(false, Ordering::SeqCst) { 171 return Err(RadrootsNostrRuntimeError::RuntimeNotStarted); 172 } 173 self.inner.shutting_down.store(true, Ordering::SeqCst); 174 175 if let Ok(mut guard) = self.inner.subscription_tasks.lock() { 176 for (_, handle) in guard.drain() { 177 handle.abort(); 178 } 179 } 180 181 if let Ok(mut guard) = self.inner.monitor_task.lock() 182 && let Some(handle) = guard.take() 183 { 184 handle.abort(); 185 } 186 187 let _ = self 188 .inner 189 .queue_tx 190 .send(RadrootsNostrRuntimeEvent::RuntimeStopped) 191 .await; 192 193 Ok(()) 194 } 195 196 pub async fn subscribe( 197 &self, 198 spec: RadrootsNostrSubscriptionSpec, 199 ) -> Result<RadrootsNostrSubscriptionHandle, RadrootsNostrRuntimeError> { 200 if !self.inner.started.load(Ordering::SeqCst) { 201 return Err(RadrootsNostrRuntimeError::RuntimeNotStarted); 202 } 203 204 let sequence = self 205 .inner 206 .next_subscription_id 207 .fetch_add(1, Ordering::SeqCst); 208 let id = alloc::format!("sub-{sequence}"); 209 let handle = RadrootsNostrSubscriptionHandle { 210 id: id.clone(), 211 name: spec.name.clone(), 212 }; 213 214 let worker = spawn_subscription_worker(self.inner.clone(), id.clone(), spec); 215 self.inner 216 .subscription_tasks 217 .lock() 218 .map_err(|_| RadrootsNostrRuntimeError::Runtime("subscription lock poisoned".into()))? 219 .insert(id, worker); 220 221 Ok(handle) 222 } 223 224 pub async fn unsubscribe( 225 &self, 226 handle: &RadrootsNostrSubscriptionHandle, 227 ) -> Result<(), RadrootsNostrRuntimeError> { 228 let removed = self 229 .inner 230 .subscription_tasks 231 .lock() 232 .map_err(|_| RadrootsNostrRuntimeError::Runtime("subscription lock poisoned".into()))? 233 .remove(handle.id.as_str()); 234 235 let task = removed.ok_or_else(|| { 236 RadrootsNostrRuntimeError::SubscriptionNotFound(handle.id.to_string()) 237 })?; 238 task.abort(); 239 let _ = self 240 .inner 241 .queue_tx 242 .send(RadrootsNostrRuntimeEvent::SubscriptionClosed { 243 id: handle.id.clone(), 244 }) 245 .await; 246 247 Ok(()) 248 } 249 250 pub fn set_relays(&self, relays: Vec<String>) -> Result<(), RadrootsNostrRuntimeError> { 251 if relays.is_empty() { 252 return Err(RadrootsNostrRuntimeError::InvalidConfig("relays")); 253 } 254 self.inner 255 .relays 256 .lock() 257 .map_err(|_| RadrootsNostrRuntimeError::Runtime("relays lock poisoned".into())) 258 .map(|mut guard| { 259 *guard = relays; 260 }) 261 } 262 263 pub fn relays(&self) -> Vec<String> { 264 self.inner 265 .relays 266 .lock() 267 .map(|guard| guard.clone()) 268 .unwrap_or_default() 269 } 270 271 pub fn drain_events(&self, max: usize) -> Vec<RadrootsNostrRuntimeEvent> { 272 if max == 0 { 273 return Vec::new(); 274 } 275 276 let mut out = Vec::with_capacity(max); 277 let mut guard = match self.inner.queue_rx.lock() { 278 Ok(guard) => guard, 279 Err(_) => return out, 280 }; 281 282 for _ in 0..max { 283 match guard.try_recv() { 284 Ok(event) => out.push(event), 285 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break, 286 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break, 287 } 288 } 289 290 out 291 } 292 293 pub fn snapshot(&self) -> RadrootsNostrConnectionSnapshot { 294 let statuses = self 295 .inner 296 .statuses 297 .lock() 298 .map(|guard| guard.clone()) 299 .unwrap_or_default(); 300 let last_error = self 301 .inner 302 .last_error 303 .lock() 304 .ok() 305 .and_then(|guard| guard.clone()); 306 307 let mut connected = 0usize; 308 let mut connecting = 0usize; 309 for (_, status) in statuses.iter() { 310 match status { 311 RadrootsNostrRelayStatus::Connected => connected += 1, 312 RadrootsNostrRelayStatus::Connecting => connecting += 1, 313 _ => {} 314 } 315 } 316 317 let light = if connected > 0 { 318 RadrootsNostrTrafficLight::Green 319 } else if connecting > 0 { 320 RadrootsNostrTrafficLight::Yellow 321 } else { 322 RadrootsNostrTrafficLight::Red 323 }; 324 325 RadrootsNostrConnectionSnapshot { 326 light, 327 connected, 328 connecting, 329 last_error, 330 } 331 } 332 333 fn spawn_monitor_watcher(&self) { 334 let inner = self.inner.clone(); 335 let handle = tokio::spawn(async move { 336 if let Some(monitor) = inner.client.monitor() { 337 let mut rx = monitor.subscribe(); 338 while let Ok(notification) = rx.recv().await { 339 match notification { 340 RadrootsNostrMonitorNotification::StatusChanged { relay_url, status } => { 341 if let Ok(mut map) = inner.statuses.lock() { 342 map.insert(relay_url, status); 343 } 344 } 345 } 346 } 347 } 348 }); 349 350 if let Ok(mut guard) = self.inner.monitor_task.lock() 351 && let Some(existing) = guard.replace(handle) 352 { 353 existing.abort(); 354 } 355 } 356 357 fn record_error(&self, message: String) { 358 if let Ok(mut guard) = self.inner.last_error.lock() { 359 *guard = Some(message); 360 } 361 } 362 } 363 364 fn spawn_subscription_worker( 365 inner: Arc<RadrootsNostrRuntimeInner>, 366 id: String, 367 spec: RadrootsNostrSubscriptionSpec, 368 ) -> JoinHandle<()> { 369 tokio::spawn(async move { 370 let _ = inner 371 .queue_tx 372 .send(RadrootsNostrRuntimeEvent::SubscriptionOpened { id: id.clone() }) 373 .await; 374 375 let timeout = Duration::from_secs(spec.stream_timeout_secs.max(1)); 376 let reconnect_delay = Duration::from_millis(spec.reconnect_delay_millis.max(1)); 377 let mut since_unix: Option<u64> = None; 378 379 loop { 380 if inner.shutting_down.load(Ordering::SeqCst) { 381 break; 382 } 383 384 let mut filter = spec.filter.clone(); 385 if let Some(since) = since_unix { 386 filter = filter.since(RadrootsNostrTimestamp::from(since)); 387 } 388 389 let mut stream = match inner.client.stream_events(filter, timeout).await { 390 Ok(stream) => stream, 391 Err(source) => { 392 let message = source.to_string(); 393 if let Ok(mut guard) = inner.last_error.lock() { 394 *guard = Some(message.clone()); 395 } 396 let _ = inner 397 .queue_tx 398 .send(RadrootsNostrRuntimeEvent::Error { message }) 399 .await; 400 401 if matches!(spec.policy, RadrootsNostrSubscriptionPolicy::OneShotOnEose) { 402 break; 403 } 404 405 tokio::time::sleep(reconnect_delay).await; 406 continue; 407 } 408 }; 409 410 while let Some(event) = stream.next().await { 411 let event_id = event.id.to_hex(); 412 let author = event.pubkey.to_hex(); 413 let kind = event.kind.as_u16(); 414 since_unix = Some(event.created_at.as_secs().saturating_add(1)); 415 416 if let Some(sink) = inner.event_sink.as_ref() 417 && let Err(message) = sink.ingest_event(&event) 418 { 419 if let Ok(mut guard) = inner.last_error.lock() { 420 *guard = Some(message.clone()); 421 } 422 let _ = inner 423 .queue_tx 424 .send(RadrootsNostrRuntimeEvent::Error { message }) 425 .await; 426 } 427 428 let _ = inner 429 .queue_tx 430 .send(RadrootsNostrRuntimeEvent::Note { 431 subscription_id: id.clone(), 432 id: event_id, 433 author, 434 kind, 435 relay: None, 436 }) 437 .await; 438 439 if inner.shutting_down.load(Ordering::SeqCst) { 440 break; 441 } 442 } 443 444 if matches!(spec.policy, RadrootsNostrSubscriptionPolicy::OneShotOnEose) { 445 break; 446 } 447 448 tokio::time::sleep(reconnect_delay).await; 449 } 450 451 let _ = inner 452 .queue_tx 453 .send(RadrootsNostrRuntimeEvent::SubscriptionClosed { id }) 454 .await; 455 }) 456 } 457 458 #[cfg(test)] 459 mod tests { 460 use super::*; 461 use crate::sink::RadrootsNostrInMemoryEventSink; 462 use alloc::sync::Arc; 463 use radroots_nostr::prelude::RadrootsNostrFilter; 464 465 fn sample_runtime() -> RadrootsNostrRuntime { 466 RadrootsNostrRuntimeBuilder::new() 467 .keys(RadrootsNostrKeys::generate()) 468 .add_relay("wss://relay.example.com") 469 .build() 470 .expect("runtime should build") 471 } 472 473 #[test] 474 fn build_requires_keys() { 475 let result = RadrootsNostrRuntimeBuilder::new() 476 .add_relay("wss://relay.example.com") 477 .build(); 478 assert!(matches!( 479 result, 480 Err(RadrootsNostrRuntimeError::MissingConfig("keys")) 481 )); 482 } 483 484 #[test] 485 fn build_requires_relays() { 486 let result = RadrootsNostrRuntimeBuilder::new() 487 .keys(RadrootsNostrKeys::generate()) 488 .build(); 489 assert!(matches!( 490 result, 491 Err(RadrootsNostrRuntimeError::MissingConfig("relays")) 492 )); 493 } 494 495 #[test] 496 fn queue_capacity_must_be_positive() { 497 let result = RadrootsNostrRuntimeBuilder::new() 498 .keys(RadrootsNostrKeys::generate()) 499 .add_relay("wss://relay.example.com") 500 .queue_capacity(0) 501 .build(); 502 assert!(matches!( 503 result, 504 Err(RadrootsNostrRuntimeError::InvalidConfig("queue_capacity")) 505 )); 506 } 507 508 #[test] 509 fn monitor_capacity_must_be_positive() { 510 let result = RadrootsNostrRuntimeBuilder::new() 511 .keys(RadrootsNostrKeys::generate()) 512 .add_relay("wss://relay.example.com") 513 .monitor_capacity(0) 514 .build(); 515 assert!(matches!( 516 result, 517 Err(RadrootsNostrRuntimeError::InvalidConfig("monitor_capacity")) 518 )); 519 } 520 521 #[test] 522 fn build_accepts_event_sink() { 523 let sink = Arc::new(RadrootsNostrInMemoryEventSink::new()); 524 let result = RadrootsNostrRuntimeBuilder::new() 525 .keys(RadrootsNostrKeys::generate()) 526 .add_relay("wss://relay.example.com") 527 .event_sink(sink) 528 .build(); 529 assert!(result.is_ok()); 530 } 531 532 #[test] 533 fn set_relays_rejects_empty_input() { 534 let runtime = sample_runtime(); 535 let result = runtime.set_relays(Vec::new()); 536 assert!(matches!( 537 result, 538 Err(RadrootsNostrRuntimeError::InvalidConfig("relays")) 539 )); 540 } 541 542 #[test] 543 fn drain_events_zero_returns_empty() { 544 let runtime = sample_runtime(); 545 assert!(runtime.drain_events(0).is_empty()); 546 } 547 548 #[tokio::test] 549 async fn subscribe_requires_started_runtime() { 550 let runtime = sample_runtime(); 551 let spec = RadrootsNostrSubscriptionSpec::streaming(RadrootsNostrFilter::new()); 552 let result = runtime.subscribe(spec).await; 553 assert!(matches!( 554 result, 555 Err(RadrootsNostrRuntimeError::RuntimeNotStarted) 556 )); 557 } 558 559 #[tokio::test] 560 async fn unsubscribe_requires_existing_subscription() { 561 let runtime = sample_runtime(); 562 let handle = RadrootsNostrSubscriptionHandle { 563 id: "sub-999".into(), 564 name: None, 565 }; 566 let result = runtime.unsubscribe(&handle).await; 567 assert!(matches!( 568 result, 569 Err(RadrootsNostrRuntimeError::SubscriptionNotFound(_)) 570 )); 571 } 572 }