tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit abc9147a12c11a457b8eff1b7ec2de076a63bab9
parent 5f368bbcf8240dc001c169d6b55bf080dd191fac
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 02:19:48 -0700

runtime: add offset event bus

- add a bounded offset broadcast bus for stored Pocket offsets
- expose typed receivers with empty, closed, and lagged states
- allow publishing with no active receivers without failing storage flow
- verify formatting, runtime tests, workspace checks, and clippy

Diffstat:
Acrates/tangle_runtime/src/event_bus.rs | 113+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle_runtime/src/lib.rs | 38+++++++++++++++++++++++++-------------
Mcrates/tangle_runtime/src/runtime.rs | 281++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------
3 files changed, 317 insertions(+), 115 deletions(-)

diff --git a/crates/tangle_runtime/src/event_bus.rs b/crates/tangle_runtime/src/event_bus.rs @@ -0,0 +1,113 @@ +#![forbid(unsafe_code)] + +use crate::errors::BaseRelayError; +use tangle_groups::StoreOffset; +use tokio::sync::broadcast; + +#[derive(Debug, Clone)] +pub struct TangleEventBus { + sender: broadcast::Sender<StoreOffset>, + capacity: usize, +} + +impl TangleEventBus { + pub fn new(capacity: usize) -> Result<Self, BaseRelayError> { + if capacity == 0 { + return Err(BaseRelayError::invalid( + "runtime event bus capacity must be greater than zero", + )); + } + let (sender, _) = broadcast::channel(capacity); + Ok(Self { sender, capacity }) + } + + pub fn capacity(&self) -> usize { + self.capacity + } + + pub fn subscribe(&self) -> TangleEventReceiver { + TangleEventReceiver { + receiver: self.sender.subscribe(), + } + } + + pub fn publish(&self, offset: StoreOffset) -> usize { + self.sender.send(offset).unwrap_or(0) + } + + pub fn receiver_count(&self) -> usize { + self.sender.receiver_count() + } +} + +#[derive(Debug)] +pub struct TangleEventReceiver { + receiver: broadcast::Receiver<StoreOffset>, +} + +impl TangleEventReceiver { + pub fn try_recv(&mut self) -> Result<StoreOffset, TangleEventReceiveError> { + self.receiver.try_recv().map_err(Into::into) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TangleEventReceiveError { + Empty, + Closed, + Lagged(u64), +} + +impl From<broadcast::error::TryRecvError> for TangleEventReceiveError { + fn from(error: broadcast::error::TryRecvError) -> Self { + match error { + broadcast::error::TryRecvError::Empty => Self::Empty, + broadcast::error::TryRecvError::Closed => Self::Closed, + broadcast::error::TryRecvError::Lagged(skipped) => Self::Lagged(skipped), + } + } +} + +#[cfg(test)] +mod tests { + use super::{TangleEventBus, TangleEventReceiveError}; + use tangle_groups::StoreOffset; + + #[test] + fn event_bus_broadcasts_offsets_to_subscribers() { + let bus = TangleEventBus::new(2).expect("bus"); + let mut first = bus.subscribe(); + let mut second = bus.subscribe(); + + assert_eq!(bus.capacity(), 2); + assert_eq!(bus.receiver_count(), 2); + assert_eq!(bus.publish(StoreOffset::new(42)), 2); + assert_eq!(first.try_recv().expect("first"), StoreOffset::new(42)); + assert_eq!(second.try_recv().expect("second"), StoreOffset::new(42)); + } + + #[test] + fn event_bus_reports_lagged_receivers() { + let bus = TangleEventBus::new(2).expect("bus"); + let mut receiver = bus.subscribe(); + + assert_eq!(bus.publish(StoreOffset::new(1)), 1); + assert_eq!(bus.publish(StoreOffset::new(2)), 1); + assert_eq!(bus.publish(StoreOffset::new(3)), 1); + assert_eq!(bus.publish(StoreOffset::new(4)), 1); + assert_eq!( + receiver.try_recv().expect_err("lagged"), + TangleEventReceiveError::Lagged(2) + ); + assert_eq!(receiver.try_recv().expect("next"), StoreOffset::new(3)); + assert_eq!(receiver.try_recv().expect("latest"), StoreOffset::new(4)); + } + + #[test] + fn event_bus_accepts_publish_without_receivers() { + let bus = TangleEventBus::new(2).expect("bus"); + + assert_eq!(bus.receiver_count(), 0); + assert_eq!(bus.publish(StoreOffset::new(7)), 0); + } +} diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -3,6 +3,7 @@ pub mod chorus_pocket; pub mod config; pub mod errors; +pub mod event_bus; pub mod groups; pub mod nip11; pub mod ops; @@ -30,6 +31,20 @@ pub struct TangleRuntimeStartupReport { } impl TangleRuntimeStartupReport { + pub(crate) fn new( + relay_url: impl Into<String>, + data_directory: PathBuf, + groups_enabled: bool, + readiness: BaseRelayReadinessState, + ) -> Self { + Self { + relay_url: relay_url.into(), + data_directory, + groups_enabled, + readiness, + } + } + pub fn relay_url(&self) -> &str { &self.relay_url } @@ -91,20 +106,17 @@ pub fn load_base_relay_runtime_config( pub fn open_base_relay_from_config_path( path: impl AsRef<Path>, ) -> Result<TangleRuntimeStartupReport, TangleRuntimeLoadError> { - let config = load_base_relay_runtime_config(path)?; - let mut runtime = TangleRuntime::open(config).map_err(TangleRuntimeLoadError::OpenRelay)?; - let readiness = runtime.readiness().clone(); + let mut runtime = open_tangle_runtime_from_config_path(path)?; + let report = runtime.startup_report(); runtime .shutdown() .map_err(TangleRuntimeLoadError::ShutdownRelay)?; - Ok(TangleRuntimeStartupReport { - relay_url: runtime.config().relay_url().to_owned(), - data_directory: runtime - .config() - .pocket_config() - .data_directory() - .to_path_buf(), - groups_enabled: runtime.config().groups().enabled(), - readiness, - }) + Ok(report) +} + +pub fn open_tangle_runtime_from_config_path( + path: impl AsRef<Path>, +) -> Result<TangleRuntime, TangleRuntimeLoadError> { + let config = load_base_relay_runtime_config(path)?; + TangleRuntime::open(config).map_err(TangleRuntimeLoadError::OpenRelay) } diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -1,39 +1,46 @@ #![forbid(unsafe_code)] use crate::{ + TangleRuntimeStartupReport, config::BaseRelayRuntimeConfig, errors::BaseRelayError, + event_bus::TangleEventBus, ops::BaseRelayReadinessState, - relay::core::{BaseRelay, BaseRelayShutdownReport}, + relay::{ + auth::BaseAuthState, + core::{BaseRelay, BaseRelayShutdownReport}, + }, }; -use std::sync::{ - Arc, - atomic::{AtomicU64, AtomicUsize, Ordering}, +use std::{ + sync::{ + Arc, + atomic::{AtomicU64, AtomicUsize, Ordering}, + }, + time::Instant, }; -use tangle_groups::StoreOffset; -use tokio::sync::{broadcast, watch}; +use tokio::sync::watch; pub struct TangleRuntime { config: BaseRelayRuntimeConfig, relay: BaseRelay, readiness: BaseRelayReadinessState, limits: TangleRuntimeLimits, - event_bus: TangleRuntimeEventBus, - metrics: Arc<TangleRuntimeMetrics>, + event_bus: TangleEventBus, + metrics: TangleRuntimeMetrics, shutdown: TangleShutdownSignal, } impl TangleRuntime { pub fn open(config: BaseRelayRuntimeConfig) -> Result<Self, BaseRelayError> { - let limits = TangleRuntimeLimits::from_config(&config); + let limits = TangleRuntimeLimits::from_config(&config)?; let relay = config.open_relay()?; let readiness = relay.readiness_state(); Ok(Self { config, relay, readiness, - event_bus: TangleRuntimeEventBus::new(limits.event_bus_capacity())?, - metrics: Arc::new(TangleRuntimeMetrics::default()), + event_bus: TangleEventBus::new(limits.event_bus_capacity())?, + metrics: TangleRuntimeMetrics::new(), limits, shutdown: TangleShutdownSignal::new(), }) @@ -51,7 +58,11 @@ impl TangleRuntime { &mut self.relay } - pub fn readiness(&self) -> &BaseRelayReadinessState { + pub fn auth_state(&self) -> Result<BaseAuthState, BaseRelayError> { + self.config.auth_state() + } + + pub fn readiness_state(&self) -> &BaseRelayReadinessState { &self.readiness } @@ -59,11 +70,11 @@ impl TangleRuntime { self.limits } - pub fn event_bus(&self) -> &TangleRuntimeEventBus { + pub fn event_bus(&self) -> &TangleEventBus { &self.event_bus } - pub fn metrics(&self) -> &Arc<TangleRuntimeMetrics> { + pub fn metrics(&self) -> &TangleRuntimeMetrics { &self.metrics } @@ -71,8 +82,17 @@ impl TangleRuntime { &self.shutdown } + pub fn startup_report(&self) -> TangleRuntimeStartupReport { + TangleRuntimeStartupReport::new( + self.config.relay_url(), + self.config.pocket_config().data_directory().to_path_buf(), + self.config.groups().enabled(), + self.readiness.clone(), + ) + } + pub fn shutdown(&mut self) -> Result<BaseRelayShutdownReport, BaseRelayError> { - self.shutdown.request(); + self.shutdown.request_shutdown(); self.relay.shutdown() } } @@ -81,24 +101,43 @@ impl TangleRuntime { pub struct TangleRuntimeLimits { max_pending_events: usize, event_bus_capacity: usize, + outbound_queue_capacity: usize, } impl TangleRuntimeLimits { - pub fn new(max_pending_events: usize) -> Result<Self, BaseRelayError> { + pub fn new( + max_pending_events: usize, + event_bus_capacity: usize, + outbound_queue_capacity: usize, + ) -> Result<Self, BaseRelayError> { if max_pending_events == 0 { return Err(BaseRelayError::invalid( - "limits.max_pending_events must be greater than zero", + "runtime max pending events must be greater than zero", + )); + } + if event_bus_capacity == 0 { + return Err(BaseRelayError::invalid( + "runtime event bus capacity must be greater than zero", + )); + } + if outbound_queue_capacity == 0 { + return Err(BaseRelayError::invalid( + "runtime outbound queue capacity must be greater than zero", )); } Ok(Self { max_pending_events, - event_bus_capacity: max_pending_events, + event_bus_capacity, + outbound_queue_capacity, }) } - pub fn from_config(config: &BaseRelayRuntimeConfig) -> Self { - Self::new(config.max_pending_events()) - .expect("runtime config validates pending event limit") + pub fn from_config(config: &BaseRelayRuntimeConfig) -> Result<Self, BaseRelayError> { + Self::new( + config.max_pending_events(), + config.max_pending_events(), + config.max_pending_events(), + ) } pub fn max_pending_events(self) -> usize { @@ -108,52 +147,66 @@ impl TangleRuntimeLimits { pub fn event_bus_capacity(self) -> usize { self.event_bus_capacity } + + pub fn outbound_queue_capacity(self) -> usize { + self.outbound_queue_capacity + } } #[derive(Debug, Clone)] -pub struct TangleRuntimeEventBus { - sender: broadcast::Sender<StoreOffset>, +pub struct TangleRuntimeMetrics { + inner: Arc<TangleRuntimeMetricsInner>, } -impl TangleRuntimeEventBus { - pub fn new(capacity: usize) -> Result<Self, BaseRelayError> { - if capacity == 0 { - return Err(BaseRelayError::invalid( - "runtime event bus capacity must be greater than zero", - )); +#[derive(Debug)] +struct TangleRuntimeMetricsInner { + started_at: Instant, + active_sessions: AtomicUsize, + stored_event_offsets: AtomicU64, +} + +impl TangleRuntimeMetrics { + pub fn new() -> Self { + Self { + inner: Arc::new(TangleRuntimeMetricsInner { + started_at: Instant::now(), + active_sessions: AtomicUsize::new(0), + stored_event_offsets: AtomicU64::new(0), + }), } - let (sender, _) = broadcast::channel(capacity); - Ok(Self { sender }) } - pub fn subscribe(&self) -> broadcast::Receiver<StoreOffset> { - self.sender.subscribe() + pub fn started_at(&self) -> Instant { + self.inner.started_at } - pub fn publish(&self, offset: StoreOffset) -> Result<usize, BaseRelayError> { - self.sender - .send(offset) - .map_err(|error| BaseRelayError::error(error.to_string())) + pub fn active_sessions(&self) -> usize { + self.inner.active_sessions.load(Ordering::Relaxed) } - pub fn receiver_count(&self) -> usize { - self.sender.receiver_count() + pub fn increment_active_sessions(&self) -> usize { + self.inner.active_sessions.fetch_add(1, Ordering::Relaxed) + 1 } -} -#[derive(Debug, Default)] -pub struct TangleRuntimeMetrics { - active_sessions: AtomicUsize, - stored_event_offsets: AtomicU64, -} - -impl TangleRuntimeMetrics { - pub fn active_sessions(&self) -> usize { - self.active_sessions.load(Ordering::Relaxed) + pub fn decrement_active_sessions(&self) -> usize { + self.inner.active_sessions.fetch_sub(1, Ordering::Relaxed) - 1 } pub fn stored_event_offsets(&self) -> u64 { - self.stored_event_offsets.load(Ordering::Relaxed) + self.inner.stored_event_offsets.load(Ordering::Relaxed) + } + + pub fn record_stored_event_offset(&self) -> u64 { + self.inner + .stored_event_offsets + .fetch_add(1, Ordering::Relaxed) + + 1 + } +} + +impl Default for TangleRuntimeMetrics { + fn default() -> Self { + Self::new() } } @@ -172,12 +225,12 @@ impl TangleShutdownSignal { self.sender.subscribe() } - pub fn requested(&self) -> bool { - *self.sender.borrow() + pub fn request_shutdown(&self) { + let _ = self.sender.send(true); } - pub fn request(&self) { - let _ = self.sender.send(true); + pub fn requested(&self) -> bool { + *self.sender.borrow() } } @@ -189,81 +242,105 @@ impl Default for TangleShutdownSignal { #[cfg(test)] mod tests { - use super::{TangleRuntime, TangleRuntimeEventBus, TangleRuntimeLimits}; - use crate::config::parse_base_relay_runtime_config_json; - use std::{env, fs, process}; + use super::{TangleRuntime, TangleRuntimeLimits}; + use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}; + use crate::event_bus::TangleEventBus; + use serde_json::json; + use std::path::{Path, PathBuf}; use tangle_groups::StoreOffset; #[test] - fn tangle_runtime_opens_owner_boundary_and_signals_shutdown() { - let config = runtime_config("owner"); + fn tangle_runtime_opens_owned_process_shell_from_config() { + let root = temp_root("owned-runtime"); + let _ = std::fs::remove_dir_all(&root); + let config = runtime_config(&root, 8); + let mut runtime = TangleRuntime::open(config).expect("runtime"); let mut offsets = runtime.event_bus().subscribe(); let shutdown = runtime.shutdown_signal().subscribe(); assert_eq!(runtime.config().relay_url(), "wss://relay.radroots.test"); + assert_eq!(runtime.config().listen_addr().to_string(), "127.0.0.1:0"); assert_eq!(runtime.limits().max_pending_events(), 8); assert_eq!(runtime.limits().event_bus_capacity(), 8); + assert_eq!(runtime.limits().outbound_queue_capacity(), 8); + assert_eq!(runtime.event_bus().capacity(), 8); + assert_eq!(runtime.event_bus().receiver_count(), 1); assert_eq!(runtime.metrics().active_sessions(), 0); assert_eq!(runtime.metrics().stored_event_offsets(), 0); assert!(runtime.relay().groups_enabled()); - assert!(runtime.readiness().is_ready()); + assert!(runtime.readiness_state().is_ready()); assert!(!*shutdown.borrow()); + assert_eq!(runtime.event_bus().publish(StoreOffset::new(42)), 1); + assert_eq!(offsets.try_recv().expect("offset"), StoreOffset::new(42)); assert_eq!( runtime - .event_bus() - .publish(StoreOffset::new(7)) - .expect("publish"), - 1 + .auth_state() + .expect("auth") + .authenticated_pubkeys() + .len(), + 0 ); - assert_eq!(offsets.try_recv().expect("offset"), StoreOffset::new(7)); + assert_eq!( + runtime.startup_report().data_directory(), + Path::new(&root).join("pocket") + ); + + assert_eq!(runtime.metrics().increment_active_sessions(), 1); + assert_eq!(runtime.metrics().active_sessions(), 1); + assert_eq!(runtime.metrics().decrement_active_sessions(), 0); + assert_eq!(runtime.metrics().active_sessions(), 0); + assert_eq!(runtime.metrics().record_stored_event_offset(), 1); + assert_eq!(runtime.metrics().stored_event_offsets(), 1); let report = runtime.shutdown().expect("shutdown"); assert_eq!(report.closed_subscriptions(), 0); assert!(runtime.shutdown_signal().requested()); assert!(*shutdown.borrow()); + + let _ = std::fs::remove_dir_all(root); } #[test] fn runtime_limits_and_event_bus_reject_zero_capacity() { - assert!(TangleRuntimeLimits::new(0).is_err()); - assert!(TangleRuntimeEventBus::new(0).is_err()); - } - - fn runtime_config(name: &str) -> crate::config::BaseRelayRuntimeConfig { - let root = env::temp_dir().join(format!("tangle-runtime-{name}-{}", process::id())); - let _ = fs::remove_dir_all(&root); - parse_base_relay_runtime_config_json(&format!( - r#"{{ - "server": {{ - "listen_addr": "127.0.0.1:0", - "relay_url": "wss://relay.radroots.test" - }}, - "pocket": {{ - "data_directory": "{}", - "map_size_bytes": 10485760, - "reader_slots": 32, - "sync_policy": "flush_on_shutdown" - }}, - "groups": {{ - "enabled": true, - "canonical_relay_url": "wss://relay.radroots.test", - "relay_secret": "{}", - "owner_pubkeys": ["{}"] - }}, - "auth": {{ - "challenge_ttl_seconds": 300 - }}, - "limits": {{ - "max_pending_events": 8 - }} - }}"#, - root.join("pocket").display(), - "7".repeat(64), - "02".repeat(32) - )) - .expect("config") + assert!(TangleRuntimeLimits::new(0, 1, 1).is_err()); + assert!(TangleRuntimeLimits::new(1, 0, 1).is_err()); + assert!(TangleRuntimeLimits::new(1, 1, 0).is_err()); + assert!(TangleEventBus::new(0).is_err()); + } + + fn runtime_config(root: &Path, max_pending_events: usize) -> BaseRelayRuntimeConfig { + let raw = json!({ + "server": { + "listen_addr": "127.0.0.1:0", + "relay_url": "wss://relay.radroots.test" + }, + "pocket": { + "data_directory": root.join("pocket"), + "map_size_bytes": 1073741824_u64, + "reader_slots": 128, + "sync_policy": "flush_on_shutdown" + }, + "groups": { + "enabled": true, + "canonical_relay_url": "wss://relay.radroots.test", + "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777", + "owner_pubkeys": ["0202020202020202020202020202020202020202020202020202020202020202"] + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "max_pending_events": max_pending_events + } + }) + .to_string(); + parse_base_relay_runtime_config_json(&raw).expect("config") + } + + fn temp_root(name: &str) -> PathBuf { + std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id())) } }