tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit e7135aec4465f94152729da628ab01149769f28c
parent bfaa2cc638ef740b28ab42fb246a2a792e4805ba
Author: triesap <tyson@radroots.org>
Date:   Mon, 15 Jun 2026 19:11:58 -0700

runtime: admit events through Pocket storage

- add Pocket-native relay event admission APIs
- route runtime EVENT dispatch and rate limits through Pocket events
- store public non-group events without protocol event rebuilds
- quarantine protocol event admission behind test-only adapters

Diffstat:
Mcrates/tangle_bench/src/lib.rs | 12++++++++----
Mcrates/tangle_runtime/src/pocket_event_validation.rs | 7+++++--
Mcrates/tangle_runtime/src/relay/core.rs | 278+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mcrates/tangle_runtime/src/runtime.rs | 157+++++++++++++++++++++++++++++++++----------------------------------------------
Mcrates/tangle_runtime/tests/base_relay_v2.rs | 29+++++++++++++++++++++++++++++
Mcrates/tangle_runtime/tests/ops_truthfulness.rs | 34++++++++++++++++++++++++++++++++--
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 38++++++++++++++++++++++++++++++++++----
7 files changed, 445 insertions(+), 110 deletions(-)

diff --git a/crates/tangle_bench/src/lib.rs b/crates/tangle_bench/src/lib.rs @@ -20,7 +20,8 @@ use tangle_runtime::{ runtime::{TangleRuntime, TangleRuntimeHandle}, }; use tangle_store_pocket::{ - PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy, parse_pocket_filter_json, + PocketQueryConfig, PocketStoreConfig, PocketSyncPolicy, parse_pocket_event_json, + parse_pocket_filter_json, }; use tangle_test_support::{ FixtureKey, TANGLE_V2_RELAY_URL, tangle_v2_auth_event, tangle_v2_event, tangle_v2_group_config, @@ -1465,15 +1466,18 @@ fn materialize_dataset( let mut rejected = 0; for source in dataset.source_events() { let sample = Instant::now(); + let pocket_event = + parse_pocket_event_json(event_to_value(source.event()).to_string().as_bytes()) + .map_err(|error| error.to_string())?; let message = match source.auth() { BenchEventAuth::None => relay - .handle_event(source.event().clone()) + .handle_pocket_event(&pocket_event) .map_err(|error| error.to_string())?, BenchEventAuth::Owner => relay - .handle_event_with_auth(source.event().clone(), &owner_auth) + .handle_pocket_event_with_auth(&pocket_event, &owner_auth) .map_err(|error| error.to_string())?, BenchEventAuth::Admin => relay - .handle_event_with_auth(source.event().clone(), &admin_auth) + .handle_pocket_event_with_auth(&pocket_event, &admin_auth) .map_err(|error| error.to_string())?, }; samples.push(elapsed_micros(sample)); diff --git a/crates/tangle_runtime/src/pocket_event_validation.rs b/crates/tangle_runtime/src/pocket_event_validation.rs @@ -105,7 +105,7 @@ mod tests { pocket_event_pubkey, validate_pocket_event_shape, verify_pocket_event_signature, }; use crate::pocket_conversion::tangle_event_to_pocket; - use tangle_protocol::{Event, EventId, SignatureHex, Tag, event_to_value}; + use tangle_protocol::{Event, EventId, Tag, event_to_value}; use tangle_store_pocket::parse_pocket_event_json; use tangle_test_support::{FixtureKey, tangle_v2_event}; @@ -117,10 +117,13 @@ mod tests { assert_eq!(verify_pocket_event_signature(&pocket), Ok(())); + let signature_source = + tangle_v2_event(FixtureKey::Admin, 1_714_124_433, 1, Vec::new(), "hello") + .expect("signature source"); let wrong_signature = Event::new( event.id().clone(), event.unsigned().clone(), - SignatureHex::new(&"0".repeat(128)).expect("sig"), + signature_source.sig().clone(), ); let wrong_pocket = tangle_event_to_pocket(&wrong_signature).expect("wrong pocket"); assert_eq!( diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs @@ -4,12 +4,12 @@ use crate::groups::{ }; use crate::logging::{self, TangleModerationAuditResult}; use crate::ops::BaseRelayReadinessState; -use crate::pocket_conversion::{ - pocket_event_id, pocket_event_to_tangle, pocket_pubkey, tangle_event_to_pocket, - tangle_filter_to_pocket, -}; +#[cfg(test)] +use crate::pocket_conversion::{pocket_event_id, tangle_event_to_pocket}; +use crate::pocket_conversion::{pocket_event_to_tangle, pocket_pubkey, tangle_filter_to_pocket}; use crate::pocket_event_validation::{ - pocket_event_id as pocket_runtime_event_id, validate_pocket_event_shape, + is_pocket_nip70_protected_event, pocket_event_id as pocket_runtime_event_id, pocket_event_kind, + pocket_event_pubkey, validate_pocket_event_shape, verify_pocket_event_signature, }; use crate::relay::{ auth::BaseAuthState, @@ -19,12 +19,15 @@ use std::{ cell::{Cell, RefCell}, collections::BTreeSet, }; +#[cfg(test)] use tangle_crypto::verify_event_signature; use tangle_groups::{ GroupAuthContext, GroupEventClass, GroupEventView, GroupRuntimeConfig, StoreOffset, classify_group_event, validate_client_group_event_structure, }; -use tangle_protocol::{ClientMessage, Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp}; +#[cfg(test)] +use tangle_protocol::ClientMessage; +use tangle_protocol::{Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp}; use tangle_store_pocket::{ PocketEvent, PocketHll8, PocketQueryConfig, PocketScreenResult, PocketStoreConfig, PocketStoreHandle, @@ -234,6 +237,7 @@ enum BaseRelayFilterLimitMode { PreserveCountLimitless, } +#[cfg(test)] fn is_nip70_protected_event(event: &Event) -> bool { event .unsigned() @@ -591,6 +595,7 @@ impl BaseRelay { }) } + #[cfg(test)] pub fn handle_client_message( &mut self, message: ClientMessage, @@ -687,6 +692,7 @@ impl BaseRelay { ) } + #[cfg(test)] fn handle_auth_message( &self, event: Event, @@ -696,6 +702,7 @@ impl BaseRelay { Self::handle_auth_with_limits(self.limits, event, auth, now) } + #[cfg(test)] pub(crate) fn handle_auth_with_limits( limits: BaseRelayLimits, event: Event, @@ -758,11 +765,13 @@ impl BaseRelay { }) } + #[cfg(test)] pub fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError> { self.handle_event_with_group_auth(event, &GroupAuthContext::unauthenticated()) .map(BaseRelayEventWrite::into_message) } + #[cfg(test)] pub fn handle_event_with_auth( &self, event: Event, @@ -772,6 +781,21 @@ impl BaseRelay { .map(BaseRelayEventWrite::into_message) } + pub fn handle_pocket_event(&self, event: &PocketEvent) -> Result<RelayMessage, BaseRelayError> { + self.handle_pocket_event_with_group_auth(event, &GroupAuthContext::unauthenticated()) + .map(BaseRelayEventWrite::into_message) + } + + pub fn handle_pocket_event_with_auth( + &self, + event: &PocketEvent, + auth: &BaseAuthState, + ) -> Result<RelayMessage, BaseRelayError> { + self.handle_pocket_event_with_auth_report(event, auth) + .map(BaseRelayEventWrite::into_message) + } + + #[cfg(test)] pub(crate) fn handle_event_with_auth_report( &self, event: Event, @@ -786,6 +810,7 @@ impl BaseRelay { ) } + #[cfg(test)] pub(crate) fn handle_event_with_shared_services( store: &PocketStoreHandle, groups: Option<&GroupServiceHandle>, @@ -802,6 +827,36 @@ impl BaseRelay { ) } + pub(crate) fn handle_pocket_event_with_auth_report( + &self, + event: &PocketEvent, + auth: &BaseAuthState, + ) -> Result<BaseRelayEventWrite, BaseRelayError> { + Self::handle_pocket_event_with_shared_services( + &self.store, + self.groups.as_ref(), + self.limits, + event, + auth, + ) + } + + pub(crate) fn handle_pocket_event_with_shared_services( + store: &PocketStoreHandle, + groups: Option<&GroupServiceHandle>, + limits: BaseRelayLimits, + event: &PocketEvent, + auth: &BaseAuthState, + ) -> Result<BaseRelayEventWrite, BaseRelayError> { + Self::handle_pocket_event_with_group_auth_and_services( + store, + groups, + limits, + event, + &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), + ) + } + pub fn groups_enabled(&self) -> bool { self.groups.is_some() } @@ -835,6 +890,7 @@ impl BaseRelay { Ok(BaseRelayShutdownReport::new(closed)) } + #[cfg(test)] fn handle_event_with_group_auth( &self, event: Event, @@ -849,6 +905,7 @@ impl BaseRelay { ) } + #[cfg(test)] fn handle_event_with_group_auth_and_services( store: &PocketStoreHandle, groups: Option<&GroupServiceHandle>, @@ -965,6 +1022,137 @@ impl BaseRelay { )) } + fn handle_pocket_event_with_group_auth( + &self, + event: &PocketEvent, + auth: &GroupAuthContext, + ) -> Result<BaseRelayEventWrite, BaseRelayError> { + Self::handle_pocket_event_with_group_auth_and_services( + &self.store, + self.groups.as_ref(), + self.limits, + event, + auth, + ) + } + + fn handle_pocket_event_with_group_auth_and_services( + store: &PocketStoreHandle, + groups: Option<&GroupServiceHandle>, + limits: BaseRelayLimits, + event: &PocketEvent, + auth: &GroupAuthContext, + ) -> Result<BaseRelayEventWrite, BaseRelayError> { + let event_id = pocket_runtime_event_id(event)?; + if let Err(error) = limits.validate_pocket_event(event) { + return Ok(BaseRelayEventWrite::unstored(ok_rejected( + event_id, + error.prefixed_message(), + ))); + } + if let Err(error) = verify_pocket_event_signature(event) { + return Ok(BaseRelayEventWrite::unstored(ok_rejected( + event_id, + error.prefixed_message(), + ))); + } + let pubkey = pocket_event_pubkey(event)?; + if is_pocket_nip70_protected_event(event)? && !auth.contains(&pubkey) { + return Ok(BaseRelayEventWrite::unstored(ok_rejected( + event_id, + BaseRelayError::auth_required( + "protected event requires authenticated event author", + ) + .prefixed_message(), + ))); + } + let group_limits = groups.map(GroupServiceHandle::limits).unwrap_or_default(); + let audit_class = classify_group_event(event, group_limits).ok(); + let class = match validate_client_group_event_structure(event, group_limits) { + Ok(class) => class, + Err(error) => { + if let Some(class) = audit_class.as_ref() { + log_pocket_group_moderation_audit( + event, + class, + TangleModerationAuditResult::Rejected, + )?; + } + return Ok(BaseRelayEventWrite::unstored(ok_rejected( + event_id, + error.prefixed_message(), + ))); + } + }; + if !matches!(class, GroupEventClass::NonGroup) { + let tangle_event = pocket_event_to_tangle(event)?; + let Some(groups) = groups else { + logging::log_group_moderation_audit( + &tangle_event, + &class, + TangleModerationAuditResult::Rejected, + ); + return Ok(BaseRelayEventWrite::unstored(ok_rejected( + event_id, + "blocked: NIP-29 group events are not accepted before group service".to_owned(), + ))); + }; + match groups.store_group_event(store, &tangle_event, &class, auth) { + Ok(GroupEventWrite::Stored(stored_offsets)) => { + logging::log_group_moderation_audit( + &tangle_event, + &class, + TangleModerationAuditResult::Accepted, + ); + return Ok(BaseRelayEventWrite::stored( + ok_accepted(event_id, String::new()), + stored_offsets, + )); + } + Ok(GroupEventWrite::Duplicate) => { + logging::log_group_moderation_audit( + &tangle_event, + &class, + TangleModerationAuditResult::Accepted, + ); + return Ok(BaseRelayEventWrite::unstored(ok_accepted( + event_id, + "duplicate: already have this event".to_owned(), + ))); + } + Err(GroupEventWriteError::Rejected(error)) => { + logging::log_group_moderation_audit( + &tangle_event, + &class, + TangleModerationAuditResult::Rejected, + ); + return Ok(BaseRelayEventWrite::unstored(ok_rejected( + event_id, + error.prefixed_message(), + ))); + } + Err(GroupEventWriteError::Storage(error)) => return Err(error), + } + } + if pocket_event_kind(event)?.is_ephemeral() { + return Ok(BaseRelayEventWrite::unstored(ok_accepted( + event_id, + String::new(), + ))); + } + if store.event_by_id(event.id())?.is_some() { + return Ok(BaseRelayEventWrite::unstored(ok_accepted( + event_id, + "duplicate: already have this event".to_owned(), + ))); + } + let store_offset = StoreOffset::new(store.store_event(event)?); + Ok(BaseRelayEventWrite::stored( + ok_accepted(event_id, String::new()), + vec![store_offset], + )) + } + pub fn handle_req( &mut self, subscription_id: SubscriptionId, @@ -1446,6 +1634,16 @@ fn filters_are_complete(filters: &[Filter]) -> bool { !filters.is_empty() && filters.iter().all(Filter::is_complete) } +fn log_pocket_group_moderation_audit( + event: &PocketEvent, + class: &GroupEventClass, + result: TangleModerationAuditResult, +) -> Result<(), BaseRelayError> { + let event = pocket_event_to_tangle(event)?; + logging::log_group_moderation_audit(&event, class, result); + Ok(()) +} + #[cfg(test)] mod tests { use super::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits, NEGENTROPY_DISABLED_MESSAGE}; @@ -2410,6 +2608,74 @@ mod tests { } #[test] + fn base_relay_pocket_event_path_preserves_event_admission_behavior() { + let relay = test_relay("base-relay-pocket-event-store-path", 8); + let valid = signed_public_event(7, 1, Vec::new(), "valid"); + let signature_source = signed_public_event(8, 1, Vec::new(), "signature source"); + let invalid = Event::new( + valid.id().clone(), + valid.unsigned().clone(), + signature_source.sig().clone(), + ); + let ephemeral = signed_public_event(7, 20_001, Vec::new(), "ephemeral"); + let protected = signed_public_event( + 7, + 1, + vec![Tag::from_parts("-", &[]).expect("protected")], + "protected", + ); + let valid_pocket = tangle_event_to_pocket(&valid).expect("valid pocket"); + let invalid_pocket = tangle_event_to_pocket(&invalid).expect("invalid pocket"); + let ephemeral_pocket = tangle_event_to_pocket(&ephemeral).expect("ephemeral pocket"); + let protected_pocket = tangle_event_to_pocket(&protected).expect("protected pocket"); + + assert_eq!( + rejected_message(relay.handle_pocket_event(&invalid_pocket).expect("invalid")), + "invalid: event signature verification failed" + ); + assert_eq!(count_kind(&relay, 1), 0); + + assert_accepted( + relay + .handle_pocket_event(&valid_pocket) + .expect("valid pocket"), + &valid, + ); + assert_eq!( + relay.handle_pocket_event(&valid_pocket).expect("duplicate"), + RelayMessage::Ok { + event_id: valid.id().clone(), + accepted: true, + message: "duplicate: already have this event".to_owned() + } + ); + assert_eq!(count_kind(&relay, 1), 1); + + assert_accepted( + relay + .handle_pocket_event(&ephemeral_pocket) + .expect("ephemeral"), + &ephemeral, + ); + assert_eq!(count_kind(&relay, 20_001), 0); + + assert_eq!( + rejected_message( + relay + .handle_pocket_event(&protected_pocket) + .expect("protected") + ), + "auth-required: protected event requires authenticated event author" + ); + assert_accepted( + relay + .handle_pocket_event_with_auth(&protected_pocket, &authenticated_state(7)) + .expect("protected auth"), + &protected, + ); + } + + #[test] fn group_write_source_uses_atomic_service_boundary() { let core_source = include_str!("core.rs"); let group_source = include_str!("../groups.rs"); diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -9,9 +9,7 @@ use crate::{ logging, ops::{BaseRelayReadinessHandle, BaseRelayReadinessState}, pocket_conversion::{pocket_event_to_tangle, pocket_filter_to_tangle}, - pocket_event_validation::{ - is_pocket_nip70_protected_event, pocket_event_id, pocket_event_pubkey, - }, + pocket_event_validation::{pocket_event_id, pocket_event_kind, pocket_event_pubkey}, rate_limits::{ TangleQueryRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey, TangleRateLimitQueryClass, TangleRateLimitRule, TangleRateLimitScope, TangleRateLimiter, @@ -371,38 +369,38 @@ impl TangleRuntimeShared { } } - fn rate_limit_event( + fn rate_limit_event_pocket( &self, - event: &Event, + event: &PocketEvent, context: TangleClientRateLimitContext, now: UnixTimestamp, - ) -> Option<RelayMessage> { + ) -> Result<Option<RelayMessage>, BaseRelayError> { let rules = self.config.rate_limits().event(); if let Some(peer_ip) = context.peer_ip - && let Some(message) = self.rate_limit_ok( + && let Some(message) = self.rate_limit_ok_pocket( event, TangleRateLimitKey::ip(TangleRateLimitScope::Event, peer_ip), rules.per_ip(), "event ip", now, - ) + )? { - return Some(message); + return Ok(Some(message)); } - self.rate_limit_ok( + self.rate_limit_ok_pocket( event, - TangleRateLimitKey::pubkey( - TangleRateLimitScope::Event, - event.unsigned().pubkey().clone(), - ), + TangleRateLimitKey::pubkey(TangleRateLimitScope::Event, pocket_event_pubkey(event)?), rules.per_pubkey(), "event pubkey", now, ) - .or_else(|| { - self.rate_limit_ok( + .and_then(|message| { + if message.is_some() { + return Ok(message); + } + self.rate_limit_ok_pocket( event, - TangleRateLimitKey::kind(TangleRateLimitScope::Event, event.unsigned().kind()), + TangleRateLimitKey::kind(TangleRateLimitScope::Event, pocket_event_kind(event)?), rules.per_kind(), "event kind", now, @@ -464,94 +462,98 @@ impl TangleRuntimeShared { ) } - fn rate_limit_group_write( + fn rate_limit_group_write_pocket( &self, - event: &Event, + event: &PocketEvent, context: TangleClientRateLimitContext, now: UnixTimestamp, - ) -> Option<RelayMessage> { + ) -> Result<Option<RelayMessage>, BaseRelayError> { if !self.config.groups().enabled() { - return None; + return Ok(None); } let class = - validate_client_group_event_structure(event, self.config.groups().limits()).ok()?; - let group_id = class.group_id()?.clone(); + validate_client_group_event_structure(event, self.config.groups().limits()).ok(); + let Some(class) = class else { + return Ok(None); + }; + let Some(group_id) = class.group_id().cloned() else { + return Ok(None); + }; let rules = self.config.rate_limits().group(); - if event.unsigned().kind().as_u32() == KIND_GROUP_JOIN_REQUEST { + let kind = pocket_event_kind(event)?; + let pubkey = pocket_event_pubkey(event)?; + if kind.as_u32() == KIND_GROUP_JOIN_REQUEST { if let Some(peer_ip) = context.peer_ip - && let Some(message) = self.rate_limit_ok( + && let Some(message) = self.rate_limit_ok_pocket( event, TangleRateLimitKey::join_flow_ip(group_id.clone(), peer_ip), rules.join_flow_per_ip(), "group join ip", now, - ) + )? { - return Some(message); + return Ok(Some(message)); } - if let Some(message) = self.rate_limit_ok( + if let Some(message) = self.rate_limit_ok_pocket( event, - TangleRateLimitKey::join_flow(group_id.clone(), event.unsigned().pubkey().clone()), + TangleRateLimitKey::join_flow(group_id.clone(), pubkey.clone()), rules.join_flow(), "group join", now, - ) { - return Some(message); + )? { + return Ok(Some(message)); } } if let Some(peer_ip) = context.peer_ip - && let Some(message) = self.rate_limit_ok( + && let Some(message) = self.rate_limit_ok_pocket( event, TangleRateLimitKey::ip(TangleRateLimitScope::GroupWrite, peer_ip), rules.write_per_ip(), "group ip", now, - ) + )? { - return Some(message); + return Ok(Some(message)); } - if let Some(message) = self.rate_limit_ok( + if let Some(message) = self.rate_limit_ok_pocket( event, - TangleRateLimitKey::pubkey( - TangleRateLimitScope::GroupWrite, - event.unsigned().pubkey().clone(), - ), + TangleRateLimitKey::pubkey(TangleRateLimitScope::GroupWrite, pubkey), rules.write_per_pubkey(), "group pubkey", now, - ) { - return Some(message); + )? { + return Ok(Some(message)); } - if let Some(message) = self.rate_limit_ok( + if let Some(message) = self.rate_limit_ok_pocket( event, TangleRateLimitKey::group(TangleRateLimitScope::GroupWrite, group_id), rules.write_per_group(), "group write", now, - ) { - return Some(message); + )? { + return Ok(Some(message)); } - self.rate_limit_ok( + self.rate_limit_ok_pocket( event, - TangleRateLimitKey::kind(TangleRateLimitScope::GroupWrite, event.unsigned().kind()), + TangleRateLimitKey::kind(TangleRateLimitScope::GroupWrite, kind), rules.write_per_kind(), "group kind", now, ) } - fn is_group_event(&self, event: &Event) -> bool { + fn is_group_event_pocket(&self, event: &PocketEvent) -> bool { self.config.groups().enabled() && validate_client_group_event_structure(event, self.config.groups().limits()) .is_ok_and(|class| !matches!(class, GroupEventClass::NonGroup)) } - fn handle_event_with_auth_report( + fn handle_pocket_event_with_auth_report( &self, - event: Event, + event: &PocketEvent, auth: &BaseAuthState, ) -> Result<BaseRelayEventWrite, BaseRelayError> { - BaseRelay::handle_event_with_shared_services( + BaseRelay::handle_pocket_event_with_shared_services( &self.store, self.groups.as_ref(), self.limits.base_relay_limits(), @@ -765,31 +767,6 @@ impl TangleRuntimeShared { } } - fn rate_limit_ok( - &self, - event: &Event, - key: TangleRateLimitKey, - rule: TangleRateLimitRule, - label: &'static str, - now: UnixTimestamp, - ) -> Option<RelayMessage> { - match self.rate_limiter.record(key, rule, now) { - TangleRateLimitDecision::Allowed { .. } => None, - TangleRateLimitDecision::Rejected { reset_at } => { - self.metrics.record_rate_limit_rejection(); - logging::log_rate_limit_rejected(label, "event", reset_at); - Some(RelayMessage::Ok { - event_id: event.id().clone(), - accepted: false, - message: BaseRelayError::rate_limited(format!( - "{label} rate limit exceeded until {reset_at}" - )) - .prefixed_message(), - }) - } - } - } - fn rate_limit_ok_pocket( &self, event: &PocketEvent, @@ -920,31 +897,27 @@ impl TangleRuntimeHandle { .record_client_message(runtime_client_message_metric_kind(&message)); match message { RuntimeClientMessage::Event(pocket_event) => { - let event = pocket_event_to_tangle(&pocket_event)?; - debug_assert_eq!( - is_pocket_nip70_protected_event(&pocket_event)?, - event - .unsigned() - .tags() - .iter() - .any(|tag| tag.name().as_str() == "-") - ); let started_at = Instant::now(); - let event_id = event.id().clone(); - let is_group_event = self.inner.is_group_event(&event); - if let Some(message) = self.inner.rate_limit_event(&event, rate_limit_context, now) + let event_id = pocket_event_id(&pocket_event)?; + let is_group_event = self.inner.is_group_event_pocket(&pocket_event); + if let Some(message) = + self.inner + .rate_limit_event_pocket(&pocket_event, rate_limit_context, now)? { record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); return Ok(vec![message]); } - if let Some(message) = - self.inner - .rate_limit_group_write(&event, rate_limit_context, now) - { + if let Some(message) = self.inner.rate_limit_group_write_pocket( + &pocket_event, + rate_limit_context, + now, + )? { record_event_metrics(&self.inner.metrics, &message, is_group_event, started_at); return Ok(vec![message]); } - let result = self.inner.handle_event_with_auth_report(event, auth)?; + let result = self + .inner + .handle_pocket_event_with_auth_report(&pocket_event, auth)?; let group_outbox_pending_events = is_group_event.then(|| self.inner.group_outbox_pending_events()); if is_group_event { diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs @@ -17,6 +17,7 @@ use tangle_protocol::{ }; use tangle_runtime::{ config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, + errors::BaseRelayError, groups::{GroupCheckpointStatus, validate_group_extra_tables}, nip11::BaseRelayInfoConfig, relay::{ @@ -39,6 +40,34 @@ use tangle_test_support::{ tangle_v2_remove_user_event, tangle_v2_tag, }; +trait BaseRelayEventTestExt { + fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError>; + + fn handle_event_with_auth( + &self, + event: Event, + auth: &BaseAuthState, + ) -> Result<RelayMessage, BaseRelayError>; +} + +impl BaseRelayEventTestExt for BaseRelay { + fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError> { + let raw = serde_json::to_vec(&event_to_value(&event)).expect("event JSON"); + let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + self.handle_pocket_event(&pocket) + } + + fn handle_event_with_auth( + &self, + event: Event, + auth: &BaseAuthState, + ) -> Result<RelayMessage, BaseRelayError> { + let raw = serde_json::to_vec(&event_to_value(&event)).expect("event JSON"); + let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + self.handle_pocket_event_with_auth(&pocket, auth) + } +} + #[test] fn public_relay_smoke_stores_queries_counts_and_fans_out() { let config = test_store_config("public-smoke"); diff --git a/crates/tangle_runtime/tests/ops_truthfulness.rs b/crates/tangle_runtime/tests/ops_truthfulness.rs @@ -2,21 +2,51 @@ use serde_json::json; use std::path::{Path, PathBuf}; -use tangle_protocol::{RelayMessage, Tag, UnixTimestamp}; +use tangle_protocol::{Event, RelayMessage, Tag, UnixTimestamp, event_to_value}; use tangle_runtime::{ config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, + errors::BaseRelayError, logging::{TANGLE_LOG_REDACTED, TangleLogRedactor}, nip11::BaseRelayInfoConfig, ops::BaseRelayReadinessCheckStatus, rate_limits::{TangleRateLimitKey, TangleRateLimitScope, TangleRateLimiter}, - relay::auth::BaseAuthState, + relay::{auth::BaseAuthState, core::BaseRelay}, runtime::TangleRuntime, }; +use tangle_store_pocket::parse_pocket_event_json; use tangle_test_support::{ FixtureKey, TANGLE_V2_RELAY_SECRET_HEX, TANGLE_V2_RELAY_URL, tangle_v2_auth_event, tangle_v2_event, }; +trait BaseRelayEventTestExt { + fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError>; + + fn handle_event_with_auth( + &self, + event: Event, + auth: &BaseAuthState, + ) -> Result<RelayMessage, BaseRelayError>; +} + +impl BaseRelayEventTestExt for BaseRelay { + fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError> { + let raw = serde_json::to_vec(&event_to_value(&event)).expect("event JSON"); + let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + self.handle_pocket_event(&pocket) + } + + fn handle_event_with_auth( + &self, + event: Event, + auth: &BaseAuthState, + ) -> Result<RelayMessage, BaseRelayError> { + let raw = serde_json::to_vec(&event_to_value(&event)).expect("event JSON"); + let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + self.handle_pocket_event_with_auth(&pocket, auth) + } +} + #[test] fn operations_surfaces_match_enforced_runtime_contracts() { let root = temp_root("ops-truthfulness"); diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -23,14 +23,15 @@ use tangle_protocol::{ }; use tangle_runtime::{ config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, + errors::BaseRelayError, nip11::BaseRelayInfoConfig, - relay::auth::BaseAuthState, + relay::{auth::BaseAuthState, core::BaseRelay}, runtime::TangleRuntime, server::serve_listener_until_shutdown, }; use tangle_store_pocket::{ PocketStoreConfig, PocketStoreHandle, TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, - TANGLE_GROUP_PROJECTION_TABLE, + TANGLE_GROUP_PROJECTION_TABLE, parse_pocket_event_json, }; use tangle_test_support::{ FixtureKey, TANGLE_V2_RELAY_SECRET_HEX, TANGLE_V2_RELAY_URL, tangle_v2_auth_event, @@ -40,6 +41,34 @@ use tangle_test_support::{ use tokio::{net::TcpListener, time::timeout}; use tokio_tungstenite::tungstenite::{Message as TungsteniteMessage, client::IntoClientRequest}; +trait BaseRelayEventTestExt { + fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError>; + + fn handle_event_with_auth( + &self, + event: Event, + auth: &BaseAuthState, + ) -> Result<RelayMessage, BaseRelayError>; +} + +impl BaseRelayEventTestExt for BaseRelay { + fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError> { + let raw = serde_json::to_vec(&event_to_value(&event)).expect("event JSON"); + let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + self.handle_pocket_event(&pocket) + } + + fn handle_event_with_auth( + &self, + event: Event, + auth: &BaseAuthState, + ) -> Result<RelayMessage, BaseRelayError> { + let raw = serde_json::to_vec(&event_to_value(&event)).expect("event JSON"); + let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + self.handle_pocket_event_with_auth(&pocket, auth) + } +} + #[tokio::test] async fn tangle_run_serves_until_shutdown() { let root = temp_root("acceptance-server"); @@ -1580,7 +1609,8 @@ fn runtime_event_handling_does_not_lock_relay_state() { .expect("req branch"); assert!(!event_branch.contains("relay.lock().await")); - assert!(event_branch.contains("self.inner.handle_event_with_auth_report(event, auth)?")); + assert!(!event_branch.contains("pocket_event_to_tangle(&pocket_event)?")); + assert!(event_branch.contains("handle_pocket_event_with_auth_report(&pocket_event, auth)?")); } #[test] @@ -1661,7 +1691,7 @@ fn runtime_shared_shell_does_not_keep_transitional_base_relay_mutex() { assert!(!runtime.contains("Mutex<BaseRelay>")); assert!(!runtime.contains("relay.lock().await")); assert!(!shared_shell.contains("relay:")); - assert!(handle_impl.contains("BaseRelay::handle_auth_with_limits")); + assert!(handle_impl.contains("BaseRelay::handle_pocket_auth_with_limits")); assert!(handle_impl.contains("self.inner.store.sync()?")); }