commit 670b5b59575478d57f76cc16497395ed60444e07
parent e4879ff3be7a5fbd798d4143afd23198c053122f
Author: triesap <tyson@radroots.org>
Date: Fri, 19 Jun 2026 16:47:51 -0700
runtime: add relay event hooks
- expose neutral event admission and stored-event hook contexts
- keep default relay runtime behavior unchanged with noop hooks
- cover hook rejection and stored-offset notification in runtime tests
Diffstat:
1 file changed, 337 insertions(+), 3 deletions(-)
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -32,6 +32,7 @@ use std::{
fmt, fs,
net::IpAddr,
path::Path,
+ str,
sync::{
Arc,
atomic::{AtomicU64, AtomicUsize, Ordering},
@@ -57,6 +58,7 @@ pub struct RelayRuntime {
rate_limiter: TangleRateLimiter,
metrics: TangleRuntimeMetrics,
shutdown: TangleShutdownSignal,
+ hooks: Arc<dyn RelayRuntimeHooks>,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
@@ -72,6 +74,191 @@ impl TangleClientRateLimitContext {
connection_id,
}
}
+
+ pub fn peer_ip(self) -> Option<IpAddr> {
+ self.peer_ip
+ }
+
+ pub fn connection_id(self) -> Option<u64> {
+ self.connection_id
+ }
+}
+
+pub trait RelayRuntimeHooks: Send + Sync {
+ fn admit_event(&self, _context: &RelayEventAdmissionContext) -> EventAdmissionDecision {
+ EventAdmissionDecision::Accept
+ }
+
+ fn event_stored(&self, _context: &RelayEventStoredContext) {}
+}
+
+#[derive(Debug, Default)]
+pub struct NoopRelayRuntimeHooks;
+
+impl RelayRuntimeHooks for NoopRelayRuntimeHooks {}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum EventAdmissionDecision {
+ Accept,
+ Reject { message: String },
+}
+
+impl EventAdmissionDecision {
+ pub fn reject(message: impl Into<String>) -> Self {
+ Self::Reject {
+ message: message.into(),
+ }
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RelayEventContext {
+ event_id: String,
+ pubkey: String,
+ kind: u32,
+ tags: Vec<Vec<String>>,
+ content: String,
+}
+
+impl RelayEventContext {
+ pub fn new(
+ event_id: String,
+ pubkey: String,
+ kind: u32,
+ tags: Vec<Vec<String>>,
+ content: String,
+ ) -> Self {
+ Self {
+ event_id,
+ pubkey,
+ kind,
+ tags,
+ content,
+ }
+ }
+
+ fn from_pocket_event(event: &PocketEvent) -> Result<Self, BaseRelayError> {
+ let tags = event
+ .tags()
+ .map_err(|error| BaseRelayError::invalid(error.to_string()))?
+ .iter()
+ .map(|tag| {
+ tag.map(|value| {
+ str::from_utf8(value)
+ .map(str::to_owned)
+ .map_err(|error| BaseRelayError::invalid(error.to_string()))
+ })
+ .collect::<Result<Vec<_>, _>>()
+ })
+ .collect::<Result<Vec<_>, _>>()?;
+ let content = str::from_utf8(event.content())
+ .map(str::to_owned)
+ .map_err(|error| BaseRelayError::invalid(error.to_string()))?;
+ Ok(Self {
+ event_id: event.id().to_string(),
+ pubkey: event.pubkey().to_string(),
+ kind: u32::from(event.kind().as_u16()),
+ tags,
+ content,
+ })
+ }
+
+ pub fn event_id(&self) -> &str {
+ &self.event_id
+ }
+
+ pub fn pubkey(&self) -> &str {
+ &self.pubkey
+ }
+
+ pub fn kind(&self) -> u32 {
+ self.kind
+ }
+
+ pub fn tags(&self) -> &[Vec<String>] {
+ &self.tags
+ }
+
+ pub fn content(&self) -> &str {
+ &self.content
+ }
+
+ pub fn has_tag(&self, name: &str, value: &str) -> bool {
+ self.tags.iter().any(|tag| {
+ tag.first().is_some_and(|tag_name| tag_name == name)
+ && tag.iter().skip(1).any(|tag_value| tag_value == value)
+ })
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RelayEventAdmissionContext {
+ event: RelayEventContext,
+ authenticated_pubkeys: Vec<String>,
+ peer_ip: Option<IpAddr>,
+ connection_id: Option<u64>,
+ now: u64,
+}
+
+impl RelayEventAdmissionContext {
+ pub fn new(
+ event: RelayEventContext,
+ authenticated_pubkeys: Vec<String>,
+ peer_ip: Option<IpAddr>,
+ connection_id: Option<u64>,
+ now: u64,
+ ) -> Self {
+ Self {
+ event,
+ authenticated_pubkeys,
+ peer_ip,
+ connection_id,
+ now,
+ }
+ }
+
+ pub fn event(&self) -> &RelayEventContext {
+ &self.event
+ }
+
+ pub fn authenticated_pubkeys(&self) -> &[String] {
+ &self.authenticated_pubkeys
+ }
+
+ pub fn peer_ip(&self) -> Option<IpAddr> {
+ self.peer_ip
+ }
+
+ pub fn connection_id(&self) -> Option<u64> {
+ self.connection_id
+ }
+
+ pub fn now(&self) -> u64 {
+ self.now
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RelayEventStoredContext {
+ event: RelayEventContext,
+ store_offsets: Vec<u64>,
+}
+
+impl RelayEventStoredContext {
+ pub fn new(event: RelayEventContext, store_offsets: Vec<u64>) -> Self {
+ Self {
+ event,
+ store_offsets,
+ }
+ }
+
+ pub fn event(&self) -> &RelayEventContext {
+ &self.event
+ }
+
+ pub fn store_offsets(&self) -> &[u64] {
+ &self.store_offsets
+ }
}
struct TanglePocketQueryRateLimitRequest<'a> {
@@ -245,6 +432,13 @@ impl TangleQueryClassifier {
impl RelayRuntime {
pub fn open(config: BaseRelayRuntimeConfig) -> Result<Self, BaseRelayError> {
+ Self::open_with_hooks(config, Arc::new(NoopRelayRuntimeHooks))
+ }
+
+ pub fn open_with_hooks(
+ config: BaseRelayRuntimeConfig,
+ hooks: Arc<dyn RelayRuntimeHooks>,
+ ) -> Result<Self, BaseRelayError> {
let limits = TangleRuntimeLimits::from_config(&config)?;
let relay = config.open_relay()?;
let readiness = BaseRelayReadinessHandle::new(relay.readiness_state());
@@ -266,6 +460,7 @@ impl RelayRuntime {
metrics,
limits,
shutdown: TangleShutdownSignal::new(),
+ hooks,
})
}
@@ -329,6 +524,7 @@ struct RelayRuntimeShared {
rate_limiter: TangleRateLimiter,
metrics: TangleRuntimeMetrics,
shutdown: TangleShutdownSignal,
+ hooks: Arc<dyn RelayRuntimeHooks>,
}
impl RelayRuntimeShared {
@@ -342,6 +538,7 @@ impl RelayRuntimeShared {
rate_limiter,
metrics,
shutdown,
+ hooks,
} = runtime;
let store = relay.store_handle();
let groups = relay.group_service_handle();
@@ -355,6 +552,7 @@ impl RelayRuntimeShared {
rate_limiter,
metrics,
shutdown,
+ hooks,
}
}
@@ -905,6 +1103,7 @@ impl RelayRuntimeHandle {
RuntimeClientMessage::Event(pocket_event) => {
let started_at = Instant::now();
let event_id = pocket_event_id(&pocket_event)?;
+ let event_context = RelayEventContext::from_pocket_event(&pocket_event)?;
let is_group_event = self.inner.is_group_event_pocket(&pocket_event);
if let Some(message) =
self.inner
@@ -921,6 +1120,29 @@ impl RelayRuntimeHandle {
record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at);
return Ok(vec![message.into()]);
}
+ let authenticated_pubkeys = auth
+ .authenticated_pubkeys()
+ .iter()
+ .map(ToString::to_string)
+ .collect();
+ let admission = RelayEventAdmissionContext::new(
+ event_context.clone(),
+ authenticated_pubkeys,
+ rate_limit_context.peer_ip(),
+ rate_limit_context.connection_id(),
+ now.as_u64(),
+ );
+ if let EventAdmissionDecision::Reject { message } =
+ self.inner.hooks.admit_event(&admission)
+ {
+ let message = RelayMessage::Ok {
+ event_id,
+ accepted: false,
+ message: BaseRelayError::restricted(message).prefixed_message(),
+ };
+ record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at);
+ return Ok(vec![message.into()]);
+ }
let result = self
.inner
.handle_pocket_event_with_auth_report(&pocket_event, auth)?;
@@ -945,6 +1167,14 @@ impl RelayRuntimeHandle {
result.stored_offsets().len(),
self.inner.metrics.stored_event_offsets(),
);
+ self.inner.hooks.event_stored(&RelayEventStoredContext::new(
+ event_context,
+ result
+ .stored_offsets()
+ .iter()
+ .map(|offset| offset.as_u64())
+ .collect(),
+ ));
}
let message = result.into_message();
record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at);
@@ -2088,9 +2318,10 @@ impl Default for TangleShutdownSignal {
#[cfg(test)]
mod tests {
use super::{
- BROAD_QUERY_TIME_WINDOW_SECONDS, RelayRuntime, RelayRuntimeHandle, RuntimeClientMessage,
- TangleBroadQueryReason, TangleClientRateLimitContext, TangleQueryClassification,
- TangleQueryClassifier, TangleRuntimeLimits,
+ BROAD_QUERY_TIME_WINDOW_SECONDS, EventAdmissionDecision, RelayEventAdmissionContext,
+ RelayEventStoredContext, RelayRuntime, RelayRuntimeHandle, RelayRuntimeHooks,
+ RuntimeClientMessage, TangleBroadQueryReason, TangleClientRateLimitContext,
+ TangleQueryClassification, TangleQueryClassifier, TangleRuntimeLimits,
};
use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json};
use crate::event_bus::{TangleEventBus, TangleEventReceiveError, TangleEventReceiver};
@@ -2104,6 +2335,7 @@ mod tests {
collections::{BTreeMap, BTreeSet},
net::{IpAddr, Ipv4Addr},
path::{Path, PathBuf},
+ sync::{Arc, Mutex},
time::Duration,
};
use tangle_crypto::RelaySigner;
@@ -2384,6 +2616,84 @@ mod tests {
}
#[tokio::test]
+ async fn runtime_hooks_reject_events_and_observe_stored_offsets() {
+ let root = temp_root("runtime-hooks");
+ let _ = std::fs::remove_dir_all(&root);
+ let hooks = Arc::new(RecordingHooks::default());
+ let handle = RelayRuntimeHandle::new(
+ RelayRuntime::open_with_hooks(runtime_config(&root, 8), hooks.clone())
+ .expect("runtime"),
+ );
+ let mut offsets = handle.subscribe_events().await;
+ let mut auth = handle.auth_state().await.expect("auth");
+ let rejected = tangle_v2_event(
+ FixtureKey::Member,
+ 1_714_124_433,
+ 1,
+ vec![Tag::from_parts("policy", &["reject"]).expect("policy")],
+ "rejected",
+ )
+ .expect("rejected event");
+
+ assert_eq!(
+ handle
+ .handle_protocol_client_message_for_test(
+ ClientMessage::Event(rejected.clone()),
+ &mut auth,
+ UnixTimestamp::new(1_714_124_433)
+ )
+ .await
+ .expect("rejected"),
+ vec![RelayMessage::Ok {
+ event_id: rejected.id().clone(),
+ accepted: false,
+ message: "restricted: hook rejected event".to_owned()
+ }]
+ );
+ assert_eq!(
+ offsets.try_recv().expect_err("no rejected offset"),
+ TangleEventReceiveError::Empty
+ );
+
+ let accepted = tangle_v2_event(
+ FixtureKey::Member,
+ 1_714_124_434,
+ 1,
+ vec![Tag::from_parts("policy", &["accept"]).expect("policy")],
+ "accepted",
+ )
+ .expect("accepted event");
+
+ assert_eq!(
+ handle
+ .handle_protocol_client_message_for_test(
+ ClientMessage::Event(accepted.clone()),
+ &mut auth,
+ UnixTimestamp::new(1_714_124_434)
+ )
+ .await
+ .expect("accepted"),
+ vec![RelayMessage::Ok {
+ event_id: accepted.id().clone(),
+ accepted: true,
+ message: String::new()
+ }]
+ );
+ assert!(offsets.try_recv().is_ok());
+ let admissions = hooks.admissions.lock().expect("admissions");
+ assert_eq!(admissions.len(), 2);
+ assert_eq!(admissions[0].event().event_id(), rejected.id().as_str());
+ assert_eq!(admissions[1].event().event_id(), accepted.id().as_str());
+ drop(admissions);
+ let stored = hooks.stored.lock().expect("stored");
+ assert_eq!(stored.len(), 1);
+ assert_eq!(stored[0].event().event_id(), accepted.id().as_str());
+ assert_eq!(stored[0].store_offsets().len(), 1);
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
async fn runtime_rate_limits_event_pubkeys_before_storage() {
let root = temp_root("runtime-event-rate-limit");
let _ = std::fs::remove_dir_all(&root);
@@ -4525,6 +4835,30 @@ mod tests {
runtime_config_with_group_policy(root, per_connection_outbound_queue, true)
}
+ #[derive(Default)]
+ struct RecordingHooks {
+ admissions: Mutex<Vec<RelayEventAdmissionContext>>,
+ stored: Mutex<Vec<RelayEventStoredContext>>,
+ }
+
+ impl RelayRuntimeHooks for RecordingHooks {
+ fn admit_event(&self, context: &RelayEventAdmissionContext) -> EventAdmissionDecision {
+ self.admissions
+ .lock()
+ .expect("admissions")
+ .push(context.clone());
+ if context.event().has_tag("policy", "reject") {
+ EventAdmissionDecision::reject("hook rejected event")
+ } else {
+ EventAdmissionDecision::Accept
+ }
+ }
+
+ fn event_stored(&self, context: &RelayEventStoredContext) {
+ self.stored.lock().expect("stored").push(context.clone());
+ }
+ }
+
fn runtime_config_with_group_policy(
root: &Path,
per_connection_outbound_queue: usize,