tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit e168d97755148343affcd3fd75138db3a6708b2e
parent 350bf9d13a9b5f0eb1abec90acefd1151d6b9505
Author: triesap <tyson@radroots.org>
Date:   Tue, 16 Jun 2026 00:47:30 -0700

runtime: enforce Pocket source invariants

- gate protocol event conversions behind test-only adapters
- return runtime messages from Pocket query and fanout paths
- move auth and projection tests onto Pocket event views
- add a production source invariant for runtime and groups

Diffstat:
Acrates/tangle/tests/source_invariant.rs | 397+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle_bench/src/lib.rs | 10++++++----
Mcrates/tangle_groups/src/event_view.rs | 16++++++++++++++--
Mcrates/tangle_runtime/src/groups.rs | 44--------------------------------------------
Mcrates/tangle_runtime/src/logging.rs | 16++++++++++------
Mcrates/tangle_runtime/src/pocket_conversion.rs | 12++++++++----
Mcrates/tangle_runtime/src/relay/auth.rs | 10++++++++--
Mcrates/tangle_runtime/src/relay/core.rs | 157++++++++++++-------------------------------------------------------------------
Mcrates/tangle_runtime/src/relay/outbound.rs | 41++++++++++++++++++++++++++++-------------
Mcrates/tangle_runtime/src/runtime.rs | 10++++++----
Mcrates/tangle_runtime/src/session.rs | 2+-
Mcrates/tangle_runtime/tests/base_relay_v2.rs | 134++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------
Mcrates/tangle_runtime/tests/ops_truthfulness.rs | 13++++++++++++-
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 67++++++++++++++++++++++++++++++++++++++++++++++++++-----------------
14 files changed, 665 insertions(+), 264 deletions(-)

diff --git a/crates/tangle/tests/source_invariant.rs b/crates/tangle/tests/source_invariant.rs @@ -0,0 +1,397 @@ +#![forbid(unsafe_code)] + +use std::fs; +use std::path::{Path, PathBuf}; + +#[test] +fn production_runtime_and_group_source_keeps_pocket_native_boundary() { + let workspace_root = workspace_root(); + let mut files = Vec::new(); + collect_rust_files( + &workspace_root.join("crates/tangle_runtime/src"), + &mut files, + ); + collect_rust_files(&workspace_root.join("crates/tangle_groups/src"), &mut files); + files.sort(); + let mut violations = Vec::new(); + for path in files { + let source = fs::read_to_string(&path).expect("source file"); + let production = production_source(&source); + collect_forbidden_imports(&workspace_root, &path, &production, &mut violations); + collect_forbidden_paths(&workspace_root, &path, &production, &mut violations); + collect_forbidden_identifiers(&workspace_root, &path, &production, &mut violations); + } + assert!( + violations.is_empty(), + "production Pocket-native source invariants failed:\n{}", + violations.join("\n") + ); +} + +#[test] +fn scanner_removes_test_gated_items_without_removing_production_items() { + let source = [ + "#[cfg(test)]\n", + "use tangle_protocol::{Event, Filter};\n", + "fn production() {}\n", + "#[cfg(test)]\n", + "fn test_only() { let value = \"}\"; }\n", + "fn production_two() {}\n", + ] + .concat(); + let production = production_source(&source); + assert!(!production.contains("Event")); + assert!(!production.contains("test_only")); + assert!(production.contains("production()")); + assert!(production.contains("production_two()")); +} + +fn workspace_root() -> PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")) + .parent() + .and_then(Path::parent) + .expect("workspace root") + .to_path_buf() +} + +fn collect_rust_files(path: &Path, files: &mut Vec<PathBuf>) { + for entry in fs::read_dir(path).expect("source directory") { + let path = entry.expect("source entry").path(); + if path.is_dir() { + collect_rust_files(&path, files); + } else if path.extension().is_some_and(|extension| extension == "rs") { + files.push(path); + } + } +} + +fn collect_forbidden_imports( + workspace_root: &Path, + path: &Path, + source: &str, + violations: &mut Vec<String>, +) { + for block in protocol_use_blocks(source) { + for ident in ["Event", "UnsignedEvent", "Tag", "Filter"] { + if contains_identifier(block.text, ident) { + violations.push(violation(workspace_root, path, source, block.offset, ident)); + } + } + } +} + +fn collect_forbidden_paths( + workspace_root: &Path, + path: &Path, + source: &str, + violations: &mut Vec<String>, +) { + for ident in ["Event", "UnsignedEvent", "Tag", "Filter"] { + let needle = ["tangle_protocol::", ident].concat(); + for offset in identifier_occurrences(source, &needle) { + violations.push(violation(workspace_root, path, source, offset, &needle)); + } + } +} + +fn collect_forbidden_identifiers( + workspace_root: &Path, + path: &Path, + source: &str, + violations: &mut Vec<String>, +) { + for ident in [ + "pocket_event_to_tangle", + "tangle_event_to_pocket", + "RuntimeRelayMessage::into_protocol_message", + "protocol_messages", + ] { + for offset in identifier_occurrences(source, ident) { + violations.push(violation(workspace_root, path, source, offset, ident)); + } + } +} + +#[derive(Debug, Clone, Copy)] +struct UseBlock<'a> { + offset: usize, + text: &'a str, +} + +fn protocol_use_blocks(source: &str) -> Vec<UseBlock<'_>> { + let mut output = Vec::new(); + let mut cursor = 0; + while let Some(relative) = source[cursor..].find("use tangle_protocol::") { + let start = cursor + relative; + let Some(end) = statement_end(source, start) else { + break; + }; + output.push(UseBlock { + offset: start, + text: &source[start..end], + }); + cursor = end; + } + output +} + +fn production_source(source: &str) -> String { + let mut output = String::new(); + let mut scan_cursor = 0; + let mut copy_start = 0; + while let Some((line_start, line_end, line)) = next_line(source, scan_cursor) { + if line.trim() == "#[cfg(test)]" { + output.push_str(&source[copy_start..line_start]); + let item_start = next_item_start(source, line_end); + let item_end = cfg_item_end(source, item_start).unwrap_or(source.len()); + for character in source[line_start..item_end].chars() { + if character == '\n' { + output.push('\n'); + } + } + copy_start = item_end; + scan_cursor = item_end; + } else { + scan_cursor = line_end; + } + } + output.push_str(&source[copy_start..]); + output +} + +fn next_item_start(source: &str, mut cursor: usize) -> usize { + while let Some((line_start, line_end, line)) = next_line(source, cursor) { + let trimmed = line.trim(); + if trimmed.is_empty() || trimmed.starts_with("#[") { + cursor = line_end; + } else { + return line_start; + } + } + source.len() +} + +fn cfg_item_end(source: &str, item_start: usize) -> Option<usize> { + let mut cursor = item_start; + while cursor < source.len() { + if let Some(end) = raw_string_end(source.as_bytes(), cursor) { + cursor = end; + continue; + } + if let Some(end) = regular_string_end(source.as_bytes(), cursor) { + cursor = end; + continue; + } + if let Some(end) = char_literal_end(source.as_bytes(), cursor) { + cursor = end; + continue; + } + match source.as_bytes()[cursor] { + b';' => return Some(include_trailing_newline(source, cursor + 1)), + b'{' => { + return Some(include_trailing_newline( + source, + matching_brace_end(source, cursor), + )); + } + _ => cursor += 1, + } + } + None +} + +fn statement_end(source: &str, start: usize) -> Option<usize> { + let mut cursor = start; + while cursor < source.len() { + if let Some(end) = raw_string_end(source.as_bytes(), cursor) { + cursor = end; + continue; + } + if let Some(end) = regular_string_end(source.as_bytes(), cursor) { + cursor = end; + continue; + } + if let Some(end) = char_literal_end(source.as_bytes(), cursor) { + cursor = end; + continue; + } + if source.as_bytes()[cursor] == b';' { + return Some(cursor + 1); + } + cursor += 1; + } + None +} + +fn matching_brace_end(source: &str, start: usize) -> usize { + let bytes = source.as_bytes(); + let mut depth = 0usize; + let mut cursor = start; + while cursor < bytes.len() { + if let Some(end) = raw_string_end(bytes, cursor) { + cursor = end; + continue; + } + if let Some(end) = regular_string_end(bytes, cursor) { + cursor = end; + continue; + } + if let Some(end) = char_literal_end(bytes, cursor) { + cursor = end; + continue; + } + match bytes[cursor] { + b'{' => depth += 1, + b'}' => { + depth -= 1; + if depth == 0 { + return cursor + 1; + } + } + _ => {} + } + cursor += 1; + } + source.len() +} + +fn include_trailing_newline(source: &str, cursor: usize) -> usize { + if source.as_bytes().get(cursor) == Some(&b'\n') { + cursor + 1 + } else { + cursor + } +} + +fn next_line(source: &str, start: usize) -> Option<(usize, usize, &str)> { + if start >= source.len() { + return None; + } + let end = source[start..] + .find('\n') + .map(|relative| start + relative + 1) + .unwrap_or(source.len()); + Some((start, end, &source[start..end])) +} + +fn identifier_occurrences(source: &str, needle: &str) -> Vec<usize> { + let mut offsets = Vec::new(); + let mut cursor = 0; + while let Some(relative) = source[cursor..].find(needle) { + let offset = cursor + relative; + if !is_identifier_byte(source.as_bytes().get(offset.wrapping_sub(1)).copied()) + && !is_identifier_byte(source.as_bytes().get(offset + needle.len()).copied()) + { + offsets.push(offset); + } + cursor = offset + needle.len(); + } + offsets +} + +fn contains_identifier(source: &str, needle: &str) -> bool { + !identifier_occurrences(source, needle).is_empty() +} + +fn violation( + workspace_root: &Path, + path: &Path, + source: &str, + offset: usize, + needle: &str, +) -> String { + let (line, column) = line_column(source, offset); + format!( + "{}:{line}:{column}: {needle}", + path.strip_prefix(workspace_root) + .expect("relative path") + .display() + ) +} + +fn raw_string_end(bytes: &[u8], index: usize) -> Option<usize> { + let mut cursor = index; + if matches!(bytes.get(cursor), Some(b'b' | b'c')) { + cursor += 1; + } + if bytes.get(cursor) != Some(&b'r') { + return None; + } + cursor += 1; + let mut hashes = 0; + while bytes.get(cursor) == Some(&b'#') { + hashes += 1; + cursor += 1; + } + if bytes.get(cursor) != Some(&b'"') { + return None; + } + cursor += 1; + while cursor < bytes.len() { + if bytes[cursor] == b'"' + && bytes + .get(cursor + 1..cursor + 1 + hashes) + .is_some_and(|suffix| suffix.iter().all(|byte| *byte == b'#')) + { + return Some(cursor + 1 + hashes); + } + cursor += 1; + } + Some(bytes.len()) +} + +fn regular_string_end(bytes: &[u8], index: usize) -> Option<usize> { + let quote = if matches!(bytes.get(index), Some(b'b' | b'c')) { + index + 1 + } else { + index + }; + if bytes.get(quote) != Some(&b'"') { + return None; + } + let mut cursor = quote + 1; + while cursor < bytes.len() { + match bytes[cursor] { + b'\\' => cursor += 2, + b'"' => return Some(cursor + 1), + _ => cursor += 1, + } + } + Some(bytes.len()) +} + +fn char_literal_end(bytes: &[u8], index: usize) -> Option<usize> { + if bytes.get(index) != Some(&b'\'') { + return None; + } + let mut cursor = index + 1; + while cursor < bytes.len() && bytes[cursor] != b'\n' { + match bytes[cursor] { + b'\\' => cursor += 2, + b'\'' => return Some(cursor + 1), + _ => cursor += 1, + } + } + None +} + +fn is_identifier_byte(byte: Option<u8>) -> bool { + byte.is_some_and(|byte| byte.is_ascii_alphanumeric() || byte == b'_') +} + +fn line_column(source: &str, offset: usize) -> (usize, usize) { + let mut line = 1; + let mut column = 1; + for (index, character) in source.char_indices() { + if index == offset { + return (line, column); + } + if character == '\n' { + line += 1; + column = 1; + } else { + column += 1; + } + } + (line, column) +} diff --git a/crates/tangle_bench/src/lib.rs b/crates/tangle_bench/src/lib.rs @@ -17,6 +17,7 @@ use tangle_runtime::{ relay::{ auth::BaseAuthState, core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits}, + outbound::RuntimeRelayMessage, }, runtime::{TangleRuntime, TangleRuntimeHandle}, }; @@ -1340,11 +1341,11 @@ fn run_broadcast_lag_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport, let elapsed = elapsed_micros(started); let first_events = first_messages .iter() - .filter(|message| matches!(message, RelayMessage::Event { .. })) + .filter(|message| matches!(message, RuntimeRelayMessage::Event { .. })) .count(); let second_events = second_messages .iter() - .filter(|message| matches!(message, RelayMessage::Event { .. })) + .filter(|message| matches!(message, RuntimeRelayMessage::Event { .. })) .count(); let accepted = if first_events == subscriber_count && second_events == subscriber_count @@ -1593,7 +1594,7 @@ fn query_for_operation( relay.handle_close(&subscription_id); Ok(messages .iter() - .filter(|message| matches!(message, RelayMessage::Event { .. })) + .filter(|message| matches!(message, RuntimeRelayMessage::Event { .. })) .count() .try_into() .expect("message count fits in u64")) @@ -1818,7 +1819,8 @@ fn authenticated(key: FixtureKey) -> Result<BaseAuthState, String> { auth.issue_challenge("challenge-a", tangle_protocol::UnixTimestamp::new(100)) .map_err(|error| error.to_string())?; let event = tangle_v2_auth_event(key, "challenge-a", 120)?; - auth.authenticate(&event, tangle_protocol::UnixTimestamp::new(120)) + let pocket = pocket_event(&event)?; + auth.authenticate_pocket(&pocket, tangle_protocol::UnixTimestamp::new(120)) .map_err(|error| error.to_string())?; Ok(auth) } diff --git a/crates/tangle_groups/src/event_view.rs b/crates/tangle_groups/src/event_view.rs @@ -1,7 +1,9 @@ use crate::errors::{GroupError, GroupErrorKind}; use pocket_types::{Event as PocketEvent, OwnedEvent as PocketOwnedEvent, TagsStringIter}; use std::str; -use tangle_protocol::{Event, EventId, Kind, PublicKeyHex, Tag, TagName, UnixTimestamp}; +#[cfg(test)] +use tangle_protocol::{Event, Tag}; +use tangle_protocol::{EventId, Kind, PublicKeyHex, UnixTimestamp}; pub trait GroupEventView { fn id_hex(&self) -> String; @@ -53,12 +55,13 @@ impl<'a> GroupEventTag<'a> { pub fn indexed_pair(&self) -> Option<(&'a str, &'a str)> { let name = self.first_value()?; - if !TagName::is_indexable_name(name) { + if !is_indexable_tag_name(name) { return None; } self.value(1).map(|value| (name, value)) } + #[cfg(test)] fn from_tangle(tag: &'a Tag) -> Self { Self { values: tag.values().iter().map(String::as_str).collect(), @@ -74,6 +77,7 @@ impl<'a> GroupEventTag<'a> { } } +#[cfg(test)] impl GroupEventView for Event { fn id_hex(&self) -> String { self.id().as_str().to_owned() @@ -161,6 +165,14 @@ impl GroupEventView for PocketOwnedEvent { } } +fn is_indexable_tag_name(value: &str) -> bool { + let mut bytes = value.bytes(); + let Some(byte) = bytes.next() else { + return false; + }; + bytes.next().is_none() && byte.is_ascii_alphabetic() +} + fn tag_value_utf8(value: &[u8]) -> Result<&str, GroupError> { str::from_utf8(value).map_err(|_| { GroupError::invalid( diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs @@ -20,8 +20,6 @@ use tangle_groups::{ event_view::GroupEventView, group_current_key, member_current_key, projection_checkpoint_key, rebuild_group_projection, role_current_key, tombstone_key, }; -#[cfg(test)] -use tangle_protocol::Event; use tangle_protocol::{EventId, PublicKeyHex, UnixTimestamp}; use tangle_store_pocket::{ PocketEvent, PocketEventId, PocketOwnedEvent, PocketStoreHandle, TANGLE_GROUP_CHECKPOINT_TABLE, @@ -152,20 +150,6 @@ impl GroupServiceHandle { .map_err(|_| BaseRelayError::error("group service state lock is poisoned"))? .store_group_pocket_event(store, event, class, auth) } - - #[cfg(test)] - pub(crate) fn store_group_event( - &self, - store: &PocketStoreHandle, - event: &Event, - class: &GroupEventClass, - auth: &GroupAuthContext, - ) -> Result<GroupEventWrite, GroupEventWriteError> { - self.state - .write() - .map_err(|_| BaseRelayError::error("group service state lock is poisoned"))? - .store_group_event(store, event, class, auth) - } } impl GroupServiceState { @@ -287,34 +271,6 @@ impl GroupServiceState { Ok(GroupEventWrite::Stored(stored_offsets)) } - #[cfg(test)] - fn store_group_event( - &mut self, - store: &PocketStoreHandle, - event: &Event, - class: &GroupEventClass, - auth: &GroupAuthContext, - ) -> Result<GroupEventWrite, GroupEventWriteError> { - self.check_event(store, event, class, auth) - .map_err(GroupEventWriteError::Rejected)?; - if store - .event_by_id(pocket_event_id(event.id())?) - .map_err(BaseRelayError::from)? - .is_some() - { - return Ok(GroupEventWrite::Duplicate); - } - let pocket_event = crate::pocket_conversion::tangle_event_to_pocket(event)?; - let store_offset = StoreOffset::new( - store - .store_event(&pocket_event) - .map_err(BaseRelayError::from)?, - ); - let mut stored_offsets = vec![store_offset]; - stored_offsets.extend(self.after_source_event_stored(store, event, class, store_offset)?); - Ok(GroupEventWrite::Stored(stored_offsets)) - } - fn event_visible_to_auth( &self, event: &(impl GroupEventView + ?Sized), diff --git a/crates/tangle_runtime/src/logging.rs b/crates/tangle_runtime/src/logging.rs @@ -400,6 +400,7 @@ mod tests { log_runtime_config_loaded, sanitize_error_message, }; use crate::config::parse_base_relay_runtime_config_json; + use crate::pocket_conversion::tangle_event_to_pocket; use std::{ io, sync::{Arc, Mutex}, @@ -576,14 +577,15 @@ mod tests { ]; for (event, action_family, target_count, generated_state_rejection) in cases { - let class = classify_group_event(&event, GroupLimitsConfig::default()).expect("class"); + let pocket = tangle_event_to_pocket(&event).expect("pocket"); + let class = classify_group_event(&pocket, GroupLimitsConfig::default()).expect("class"); let result = if matches!(class, GroupEventClass::RelayGeneratedSnapshot { .. }) { TangleModerationAuditResult::Rejected } else { TangleModerationAuditResult::Accepted }; let entry = - TangleModerationAuditEntry::new(&event, &class, result).expect("audit entry"); + TangleModerationAuditEntry::new(&pocket, &class, result).expect("audit entry"); assert_eq!(entry.action_family, action_family); assert_eq!(entry.target_count, target_count); @@ -606,7 +608,8 @@ mod tests { "secret-content", ) .expect("join"); - let class = classify_group_event(&event, GroupLimitsConfig::default()).expect("class"); + let pocket = tangle_event_to_pocket(&event).expect("pocket"); + let class = classify_group_event(&pocket, GroupLimitsConfig::default()).expect("class"); let writer = CapturedWriter::default(); let subscriber = tracing_subscriber::fmt() .json() @@ -615,7 +618,7 @@ mod tests { .finish(); tracing::subscriber::with_default(subscriber, || { - log_group_moderation_audit(&event, &class, TangleModerationAuditResult::Rejected); + log_group_moderation_audit(&pocket, &class, TangleModerationAuditResult::Rejected); }); let output = writer.output(); @@ -641,10 +644,11 @@ mod tests { fn group_moderation_audit_ignores_non_requested_group_event_kinds() { let event = tangle_v2_group_event(FixtureKey::Member, "AuditFarm", 41, 1, "normal").expect("event"); - let class = classify_group_event(&event, GroupLimitsConfig::default()).expect("class"); + let pocket = tangle_event_to_pocket(&event).expect("pocket"); + let class = classify_group_event(&pocket, GroupLimitsConfig::default()).expect("class"); assert!( - TangleModerationAuditEntry::new(&event, &class, TangleModerationAuditResult::Accepted) + TangleModerationAuditEntry::new(&pocket, &class, TangleModerationAuditResult::Accepted) .is_none() ); } diff --git a/crates/tangle_runtime/src/pocket_conversion.rs b/crates/tangle_runtime/src/pocket_conversion.rs @@ -1,13 +1,16 @@ #![forbid(unsafe_code)] use crate::errors::BaseRelayError; +#[cfg(test)] use std::str; +use tangle_protocol::EventId; #[cfg(test)] use tangle_protocol::Filter; -use tangle_protocol::{ - Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent, -}; -use tangle_store_pocket::{PocketEvent, PocketEventId}; +#[cfg(test)] +use tangle_protocol::{Event, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent}; +#[cfg(test)] +use tangle_store_pocket::PocketEvent; +use tangle_store_pocket::PocketEventId; #[cfg(test)] use tangle_store_pocket::{ PocketKind, PocketOwnedEvent, PocketOwnedFilter, PocketOwnedTags, PocketPubkey, PocketSig, @@ -89,6 +92,7 @@ pub(crate) fn tangle_filter_to_pocket( .map_err(|error| BaseRelayError::error(error.to_string())) } +#[cfg(test)] pub(crate) fn pocket_event_to_tangle(event: &PocketEvent) -> Result<Event, BaseRelayError> { let tags = event .tags() diff --git a/crates/tangle_runtime/src/relay/auth.rs b/crates/tangle_runtime/src/relay/auth.rs @@ -9,8 +9,11 @@ use crate::{ }; use std::collections::BTreeSet; use std::str; +#[cfg(test)] use tangle_crypto::verify_event_signature; -use tangle_protocol::{Event, PublicKeyHex, RelayMessage, UnixTimestamp}; +#[cfg(test)] +use tangle_protocol::Event; +use tangle_protocol::{PublicKeyHex, RelayMessage, UnixTimestamp}; use tangle_store_pocket::PocketEvent; pub fn generate_auth_challenge() -> Result<String, BaseRelayError> { @@ -75,6 +78,7 @@ impl BaseAuthState { Ok(RelayMessage::Auth(challenge)) } + #[cfg(test)] pub fn authenticate( &mut self, event: &Event, @@ -123,7 +127,7 @@ impl BaseAuthState { Ok(pubkey) } - pub(crate) fn authenticate_pocket( + pub fn authenticate_pocket( &mut self, event: &PocketEvent, now: UnixTimestamp, @@ -208,6 +212,7 @@ impl BaseRelayAuthEvent { } } +#[cfg(test)] fn parse_base_relay_auth_event(event: &Event) -> Result<Option<BaseRelayAuthEvent>, String> { if event.unsigned().kind().as_u32() != 22_242 { return Ok(None); @@ -254,6 +259,7 @@ fn parse_base_relay_pocket_auth_event( })) } +#[cfg(test)] fn required_single_tag_value(event: &Event, name: &str) -> Result<String, String> { let mut matches = event .unsigned() diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs @@ -4,24 +4,23 @@ use crate::groups::{ }; use crate::logging::{self, TangleModerationAuditResult}; use crate::ops::BaseRelayReadinessState; -use crate::pocket_conversion::pocket_event_to_tangle; #[cfg(test)] -use crate::pocket_conversion::{pocket_event_id, tangle_event_to_pocket, tangle_filter_to_pocket}; +use crate::pocket_conversion::{tangle_event_to_pocket, tangle_filter_to_pocket}; use crate::pocket_event_validation::{ is_pocket_nip70_protected_event, pocket_event_id as pocket_runtime_event_id, pocket_event_kind, pocket_event_pubkey, validate_pocket_event_shape, verify_pocket_event_signature, }; +#[cfg(test)] +use crate::relay::outbound::protocol_messages_for_test; use crate::relay::{ auth::BaseAuthState, live::{CloseResult, LiveSubscriptionSet}, - outbound::{RuntimeRelayMessage, protocol_messages}, + outbound::RuntimeRelayMessage, }; use std::{ cell::{Cell, RefCell}, collections::BTreeSet, }; -#[cfg(test)] -use tangle_crypto::verify_event_signature; use tangle_groups::{ GroupAuthContext, GroupEventClass, GroupEventView, GroupRuntimeConfig, StoreOffset, classify_group_event, validate_client_group_event_structure, @@ -166,10 +165,6 @@ impl BaseRelayQueryReport { pub(crate) fn into_messages(self) -> Vec<RuntimeRelayMessage> { self.messages } - - pub(crate) fn into_protocol_messages(self) -> Result<Vec<RelayMessage>, BaseRelayError> { - protocol_messages(self.messages) - } } #[derive(Debug, Clone, PartialEq)] @@ -350,15 +345,6 @@ enum BaseRelayFilterLimitMode { PreserveCountLimitless, } -#[cfg(test)] -fn is_nip70_protected_event(event: &Event) -> bool { - event - .unsigned() - .tags() - .iter() - .any(|tag| tag.name().as_str() == "-") -} - #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct BaseRelayShutdownReport { closed_subscriptions: usize, @@ -1048,113 +1034,14 @@ impl BaseRelay { event: Event, auth: &GroupAuthContext, ) -> Result<BaseRelayEventWrite, BaseRelayError> { - let event_id = event.id().clone(); - if let Err(error) = limits.validate_protocol_event_for_test(&event) { - return Ok(BaseRelayEventWrite::unstored(ok_rejected( - event_id, - error.prefixed_message(), - ))); - } - if let Err(error) = verify_event_signature(&event) { - return Ok(BaseRelayEventWrite::unstored(ok_rejected( - event_id, - format!("invalid: {error}"), - ))); - } - if is_nip70_protected_event(&event) && !auth.contains(event.unsigned().pubkey()) { - return Ok(BaseRelayEventWrite::unstored(ok_rejected( - event_id, - BaseRelayError::auth_required( - "protected event requires authenticated event author", - ) - .prefixed_message(), - ))); - } - let group_limits = groups.map(GroupServiceHandle::limits).unwrap_or_default(); - let audit_class = classify_group_event(&event, group_limits).ok(); - let class = match validate_client_group_event_structure(&event, group_limits) { - Ok(class) => class, - Err(error) => { - if let Some(class) = audit_class.as_ref() { - logging::log_group_moderation_audit( - &event, - class, - TangleModerationAuditResult::Rejected, - ); - } - return Ok(BaseRelayEventWrite::unstored(ok_rejected( - event_id, - error.prefixed_message(), - ))); - } - }; - if !matches!(class, GroupEventClass::NonGroup) { - let Some(groups) = groups else { - logging::log_group_moderation_audit( - &event, - &class, - TangleModerationAuditResult::Rejected, - ); - return Ok(BaseRelayEventWrite::unstored(ok_rejected( - event_id, - "blocked: NIP-29 group events are not accepted before group service".to_owned(), - ))); - }; - match groups.store_group_event(store, &event, &class, auth) { - Ok(GroupEventWrite::Stored(stored_offsets)) => { - logging::log_group_moderation_audit( - &event, - &class, - TangleModerationAuditResult::Accepted, - ); - return Ok(BaseRelayEventWrite::stored( - ok_accepted(event_id, String::new()), - stored_offsets, - )); - } - Ok(GroupEventWrite::Duplicate) => { - logging::log_group_moderation_audit( - &event, - &class, - TangleModerationAuditResult::Accepted, - ); - return Ok(BaseRelayEventWrite::unstored(ok_accepted( - event_id, - "duplicate: already have this event".to_owned(), - ))); - } - Err(GroupEventWriteError::Rejected(error)) => { - logging::log_group_moderation_audit( - &event, - &class, - TangleModerationAuditResult::Rejected, - ); - return Ok(BaseRelayEventWrite::unstored(ok_rejected( - event_id, - error.prefixed_message(), - ))); - } - Err(GroupEventWriteError::Storage(error)) => return Err(error), - } - } - if event.unsigned().kind().is_ephemeral() { - return Ok(BaseRelayEventWrite::unstored(ok_accepted( - event_id, - String::new(), - ))); - } - if store.event_by_id(pocket_event_id(&event_id)?)?.is_some() { - return Ok(BaseRelayEventWrite::unstored(ok_accepted( - event_id, - "duplicate: already have this event".to_owned(), - ))); - } let pocket_event = tangle_event_to_pocket(&event)?; - let store_offset = StoreOffset::new(store.store_event(&pocket_event)?); - Ok(BaseRelayEventWrite::stored( - ok_accepted(event_id, String::new()), - vec![store_offset], - )) + Self::handle_pocket_event_with_group_auth_and_services( + store, + groups, + limits, + &pocket_event, + auth, + ) } fn handle_pocket_event_with_group_auth( @@ -1291,7 +1178,7 @@ impl BaseRelay { &mut self, subscription_id: SubscriptionId, filters: Vec<PocketOwnedFilter>, - ) -> Result<Vec<RelayMessage>, BaseRelayError> { + ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { self.handle_pocket_req_with_group_auth( subscription_id, filters, @@ -1334,7 +1221,8 @@ impl BaseRelay { auth: &GroupAuthContext, ) -> Result<Vec<RelayMessage>, BaseRelayError> { self.handle_protocol_req_with_group_auth_report_for_test(subscription_id, filters, auth) - .and_then(BaseRelayQueryReport::into_protocol_messages) + .map(BaseRelayQueryReport::into_messages) + .and_then(protocol_messages_for_test) } pub fn handle_pocket_req_with_auth( @@ -1342,7 +1230,7 @@ impl BaseRelay { subscription_id: SubscriptionId, filters: Vec<PocketOwnedFilter>, auth: &BaseAuthState, - ) -> Result<Vec<RelayMessage>, BaseRelayError> { + ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { self.handle_pocket_req_with_group_auth( subscription_id, filters, @@ -1355,9 +1243,9 @@ impl BaseRelay { subscription_id: SubscriptionId, filters: Vec<PocketOwnedFilter>, auth: &GroupAuthContext, - ) -> Result<Vec<RelayMessage>, BaseRelayError> { + ) -> Result<Vec<RuntimeRelayMessage>, BaseRelayError> { self.handle_pocket_req_with_group_auth_report(subscription_id, filters, false, auth) - .and_then(BaseRelayQueryReport::into_protocol_messages) + .map(BaseRelayQueryReport::into_messages) } #[cfg(test)] @@ -1614,7 +1502,7 @@ impl BaseRelay { self.subscriptions.close(subscription_id) } - pub fn fanout_pocket(&mut self, event: &PocketEvent) -> Vec<RelayMessage> { + pub fn fanout_pocket(&mut self, event: &PocketEvent) -> Vec<RuntimeRelayMessage> { self.fanout_pocket_with_group_auth(event, &GroupAuthContext::unauthenticated()) } @@ -1622,7 +1510,7 @@ impl BaseRelay { &mut self, event: &PocketEvent, auth: &GroupAuthContext, - ) -> Vec<RelayMessage> { + ) -> Vec<RuntimeRelayMessage> { let groups = self.groups.as_ref(); self.subscriptions .fanout(event, auth, |event, auth| { @@ -1630,9 +1518,9 @@ impl BaseRelay { }) .expect("Pocket live fanout must match") .into_iter() - .map(|subscription_id| RelayMessage::Event { + .map(|subscription_id| RuntimeRelayMessage::Event { subscription_id, - event: pocket_event_to_tangle(event).expect("Pocket fanout event must convert"), + event: event.to_owned(), }) .collect() } @@ -1649,7 +1537,8 @@ impl BaseRelay { auth: &GroupAuthContext, ) -> Vec<RelayMessage> { let pocket_event = tangle_event_to_pocket(event).expect("event must convert to Pocket"); - self.fanout_pocket_with_group_auth(&pocket_event, auth) + protocol_messages_for_test(self.fanout_pocket_with_group_auth(&pocket_event, auth)) + .expect("test protocol fanout must convert") } pub fn active_subscription_count(&self) -> usize { diff --git a/crates/tangle_runtime/src/relay/outbound.rs b/crates/tangle_runtime/src/relay/outbound.rs @@ -1,12 +1,12 @@ #![forbid(unsafe_code)] -use crate::{errors::BaseRelayError, pocket_conversion::pocket_event_to_tangle}; +use crate::errors::BaseRelayError; use std::str; use tangle_protocol::{RelayMessage, SubscriptionId}; use tangle_store_pocket::PocketOwnedEvent; #[derive(Debug, Clone, PartialEq)] -pub(crate) enum RuntimeRelayMessage { +pub enum RuntimeRelayMessage { Event { subscription_id: SubscriptionId, event: PocketOwnedEvent, @@ -22,7 +22,7 @@ impl RuntimeRelayMessage { } } - pub(crate) fn encode(&self) -> Result<String, BaseRelayError> { + pub fn encode(&self) -> Result<String, BaseRelayError> { match self { Self::Event { subscription_id, @@ -32,15 +32,11 @@ impl RuntimeRelayMessage { } } - pub(crate) fn into_protocol_message(self) -> Result<RelayMessage, BaseRelayError> { + pub(crate) fn into_protocol_control_message(self) -> Result<RelayMessage, BaseRelayError> { match self { - Self::Event { - subscription_id, - event, - } => Ok(RelayMessage::Event { - subscription_id, - event: pocket_event_to_tangle(&event)?, - }), + Self::Event { .. } => Err(BaseRelayError::error( + "event-bearing runtime messages must be encoded from Pocket events", + )), Self::Protocol(message) => Ok(message), } } @@ -52,12 +48,31 @@ impl From<RelayMessage> for RuntimeRelayMessage { } } -pub(crate) fn protocol_messages( +pub(crate) fn protocol_control_messages( messages: Vec<RuntimeRelayMessage>, ) -> Result<Vec<RelayMessage>, BaseRelayError> { messages .into_iter() - .map(RuntimeRelayMessage::into_protocol_message) + .map(RuntimeRelayMessage::into_protocol_control_message) + .collect() +} + +#[cfg(test)] +pub(crate) fn protocol_messages_for_test( + messages: Vec<RuntimeRelayMessage>, +) -> Result<Vec<RelayMessage>, BaseRelayError> { + messages + .into_iter() + .map(|message| match message { + RuntimeRelayMessage::Event { + subscription_id, + event, + } => Ok(RelayMessage::Event { + subscription_id, + event: crate::pocket_conversion::pocket_event_to_tangle(&event)?, + }), + RuntimeRelayMessage::Protocol(message) => Ok(message), + }) .collect() } diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -1,5 +1,7 @@ #![forbid(unsafe_code)] +#[cfg(test)] +use crate::relay::outbound::protocol_messages_for_test; use crate::{ client_message::RuntimeClientMessage, config::BaseRelayRuntimeConfig, @@ -21,7 +23,7 @@ use crate::{ BaseRelayShutdownReport, }, live::LiveSubscriptionSet, - outbound::{RuntimeRelayMessage, protocol_messages}, + outbound::{RuntimeRelayMessage, protocol_control_messages}, }, }; use serde::{Deserialize, Serialize}; @@ -830,7 +832,7 @@ impl TangleRuntimeHandle { now, ) .await?; - protocol_messages(messages) + protocol_control_messages(messages) } #[cfg(test)] @@ -848,7 +850,7 @@ impl TangleRuntimeHandle { now, ) .await?; - protocol_messages(messages) + protocol_messages_for_test(messages) } #[cfg(test)] @@ -882,7 +884,7 @@ impl TangleRuntimeHandle { now, ) .await?; - protocol_messages(messages) + protocol_messages_for_test(messages) } pub(crate) async fn handle_client_message_with_rate_limit_context( diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -499,7 +499,7 @@ impl TangleWebSocketSession { message, )?) .await?; - crate::relay::outbound::protocol_messages(messages) + crate::relay::outbound::protocol_messages_for_test(messages) } } diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs @@ -12,8 +12,9 @@ use tangle_groups::{ projection_checkpoint_key, }; use tangle_protocol::{ - Event, Filter, RawEventJson, RelayMessage, SubscriptionId, Tag, UnixTimestamp, event_to_value, - filter_from_value, filter_to_value, parse_client_message, parse_event_json, + Event, Filter, PublicKeyHex, RawEventJson, RelayMessage, SubscriptionId, Tag, UnixTimestamp, + event_from_value, event_to_value, filter_from_value, filter_to_value, parse_client_message, + parse_event_json, }; use tangle_runtime::{ config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, @@ -24,10 +25,11 @@ use tangle_runtime::{ auth::BaseAuthState, core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits}, live::CloseResult, + outbound::RuntimeRelayMessage, }, }; use tangle_store_pocket::{ - PocketQueryConfig, PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy, + PocketOwnedEvent, PocketQueryConfig, PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy, TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE, parse_pocket_event_json, parse_pocket_filter_json, }; @@ -78,7 +80,7 @@ impl BaseRelayEventTestExt for BaseRelay { fn fanout(&mut self, event: &Event) -> Vec<RelayMessage> { let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); let pocket = parse_pocket_event_json(&raw).expect("pocket event"); - self.fanout_pocket(&pocket) + runtime_messages_to_protocol(self.fanout_pocket(&pocket)).expect("protocol fanout") } fn fanout_with_group_auth( @@ -88,7 +90,8 @@ impl BaseRelayEventTestExt for BaseRelay { ) -> Vec<RelayMessage> { let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); let pocket = parse_pocket_event_json(&raw).expect("pocket event"); - self.fanout_pocket_with_group_auth(&pocket, auth) + runtime_messages_to_protocol(self.fanout_pocket_with_group_auth(&pocket, auth)) + .expect("protocol fanout") } } @@ -113,7 +116,9 @@ impl BaseRelayReqTestExt for BaseRelay { subscription_id: SubscriptionId, filters: Vec<Filter>, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - self.handle_pocket_req(subscription_id, pocket_filters(filters)) + runtime_messages_to_protocol( + self.handle_pocket_req(subscription_id, pocket_filters(filters))?, + ) } fn handle_req_with_auth( @@ -122,7 +127,52 @@ impl BaseRelayReqTestExt for BaseRelay { filters: Vec<Filter>, auth: &BaseAuthState, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - self.handle_pocket_req_with_auth(subscription_id, pocket_filters(filters), auth) + runtime_messages_to_protocol(self.handle_pocket_req_with_auth( + subscription_id, + pocket_filters(filters), + auth, + )?) + } +} + +fn runtime_messages_to_protocol( + messages: Vec<RuntimeRelayMessage>, +) -> Result<Vec<RelayMessage>, BaseRelayError> { + messages + .into_iter() + .map(runtime_message_to_protocol) + .collect() +} + +fn runtime_message_to_protocol( + message: RuntimeRelayMessage, +) -> Result<RelayMessage, BaseRelayError> { + match message { + RuntimeRelayMessage::Protocol(message) => Ok(message), + RuntimeRelayMessage::Event { + subscription_id, + event, + } => { + let encoded = RuntimeRelayMessage::Event { + subscription_id: subscription_id.clone(), + event, + } + .encode()?; + let value = serde_json::from_str::<serde_json::Value>(&encoded) + .map_err(|error| BaseRelayError::error(error.to_string()))?; + let event = value + .as_array() + .and_then(|items| items.get(2)) + .ok_or_else(|| BaseRelayError::error("encoded EVENT frame is malformed")) + .and_then(|value| { + event_from_value(value) + .map_err(|error| BaseRelayError::error(error.to_string())) + })?; + Ok(RelayMessage::Event { + subscription_id, + event, + }) + } } } @@ -170,6 +220,21 @@ fn pocket_filters(filters: Vec<Filter>) -> Vec<tangle_store_pocket::PocketOwnedF .collect() } +fn authenticate_pocket_event_for_test( + auth: &mut BaseAuthState, + event: &Event, + now: UnixTimestamp, +) -> Result<PublicKeyHex, BaseRelayError> { + let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); + let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + auth.authenticate_pocket(&pocket, now) +} + +fn pocket_event_for_test(event: &Event) -> PocketOwnedEvent { + let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); + parse_pocket_event_json(&raw).expect("pocket event") +} + #[test] fn public_relay_smoke_stores_queries_counts_and_fans_out() { let config = test_store_config("public-smoke"); @@ -261,18 +326,19 @@ fn auth_integration_covers_challenge_edges() { let owner_event = tangle_v2_auth_event(FixtureKey::Owner, "challenge-a", 105).expect("owner"); let admin_event = tangle_v2_auth_event(FixtureKey::Admin, "challenge-a", 110).expect("admin"); - let owner = auth - .authenticate(&owner_event, UnixTimestamp::new(105)) - .expect("owner"); - let admin = auth - .authenticate(&admin_event, UnixTimestamp::new(110)) - .expect("admin"); + let owner = + authenticate_pocket_event_for_test(&mut auth, &owner_event, UnixTimestamp::new(105)) + .expect("owner"); + let admin = + authenticate_pocket_event_for_test(&mut auth, &admin_event, UnixTimestamp::new(110)) + .expect("admin"); assert_ne!(owner, admin); assert!(auth.authenticated_pubkeys().contains(&owner)); assert!(auth.authenticated_pubkeys().contains(&admin)); assert_eq!( - auth.authenticate( + authenticate_pocket_event_for_test( + &mut auth, &tangle_v2_auth_event(FixtureKey::Member, "wrong", 111).expect("wrong"), UnixTimestamp::new(111), ) @@ -284,13 +350,13 @@ fn auth_integration_covers_challenge_edges() { let expired = BaseAuthState::new(TANGLE_V2_RELAY_URL, 1, 600).expect("expired"); let mut expired = issue_challenge(expired, "challenge-b", 100); assert_eq!( - expired - .authenticate( - &tangle_v2_auth_event(FixtureKey::Owner, "challenge-b", 101).expect("expired"), - UnixTimestamp::new(102), - ) - .expect_err("expired") - .prefixed_message(), + authenticate_pocket_event_for_test( + &mut expired, + &tangle_v2_auth_event(FixtureKey::Owner, "challenge-b", 101).expect("expired"), + UnixTimestamp::new(102), + ) + .expect_err("expired") + .prefixed_message(), "auth-required: auth challenge expired" ); @@ -299,10 +365,13 @@ fn auth_integration_covers_challenge_edges() { .issue_challenge("challenge-a", UnixTimestamp::new(100)) .expect("challenge"); assert_eq!( - wrong_relay - .authenticate(&owner_event, UnixTimestamp::new(105)) - .expect_err("relay") - .prefixed_message(), + authenticate_pocket_event_for_test( + &mut wrong_relay, + &owner_event, + UnixTimestamp::new(105), + ) + .expect_err("relay") + .prefixed_message(), "auth-required: auth relay does not match canonical relay URL" ); } @@ -2232,8 +2301,7 @@ fn store_source_events(config: &PocketStoreConfig, events: &[Event]) -> Vec<Stor let store = PocketStoreHandle::open(config).expect("store"); let mut offsets = Vec::new(); for event in events { - let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); - let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + let pocket = pocket_event_for_test(event); offsets.push(StoreOffset::new(store.store_event(&pocket).expect("store"))); } store.sync().expect("sync"); @@ -2248,8 +2316,9 @@ fn seed_pending_create_outbox_records( let store = PocketStoreHandle::open(config).expect("store"); let group_id = group("CrashFarm"); let mut projection = GroupProjection::new(); + let create_pocket = pocket_event_for_test(create); projection - .apply_canonical_event(create, create_offset, GroupLimitsConfig::default()) + .apply_canonical_event(&create_pocket, create_offset, GroupLimitsConfig::default()) .expect("projection"); let authority = GroupAuthority::new( [FixtureKey::Owner.public_key()], @@ -2331,10 +2400,11 @@ fn pending_recovery_outbox_records( [FixtureKey::Admin.public_key()], ); let mut projection = GroupProjection::new(); + let pocket_events = events.iter().map(pocket_event_for_test).collect::<Vec<_>>(); let mut records = Vec::new(); projection - .apply_canonical_event(&events[0], offsets[0], limits) + .apply_canonical_event(&pocket_events[0], offsets[0], limits) .expect("create projection"); let create_group = projection.group(&group_id).expect("create group"); records.push(GroupOutboxRecord::pending( @@ -2367,7 +2437,7 @@ fn pending_recovery_outbox_records( )); projection - .apply_canonical_event(&events[1], offsets[1], limits) + .apply_canonical_event(&pocket_events[1], offsets[1], limits) .expect("put projection"); records.push(GroupOutboxRecord::pending( GroupOutboxKey::new( @@ -2402,7 +2472,7 @@ fn pending_recovery_outbox_records( )); projection - .apply_canonical_event(&events[2], offsets[2], limits) + .apply_canonical_event(&pocket_events[2], offsets[2], limits) .expect("metadata projection"); let metadata_group = projection.group(&group_id).expect("metadata group"); records.push(GroupOutboxRecord::pending( @@ -2851,7 +2921,7 @@ fn authenticated(key: FixtureKey) -> BaseAuthState { let auth = BaseAuthState::new(TANGLE_V2_RELAY_URL, 60, 600).expect("auth"); let mut auth = issue_challenge(auth, "challenge-a", 100); let event = tangle_v2_auth_event(key, "challenge-a", 120).expect("auth event"); - auth.authenticate(&event, UnixTimestamp::new(120)) + authenticate_pocket_event_for_test(&mut auth, &event, UnixTimestamp::new(120)) .expect("authenticate"); auth } diff --git a/crates/tangle_runtime/tests/ops_truthfulness.rs b/crates/tangle_runtime/tests/ops_truthfulness.rs @@ -47,6 +47,16 @@ impl BaseRelayEventTestExt for BaseRelay { } } +fn authenticate_pocket_event_for_test( + auth: &mut BaseAuthState, + event: &Event, + now: UnixTimestamp, +) -> Result<(), BaseRelayError> { + let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); + let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + auth.authenticate_pocket(&pocket, now).map(|_| ()) +} + #[test] fn operations_surfaces_match_enforced_runtime_contracts() { let root = temp_root("ops-truthfulness"); @@ -138,7 +148,8 @@ fn operations_surfaces_match_enforced_runtime_contracts() { let mut auth = BaseAuthState::new(TANGLE_V2_RELAY_URL, 300, 600).expect("auth"); auth.issue_challenge("challenge-a", UnixTimestamp::new(1_714_124_433)) .expect("challenge"); - auth.authenticate( + authenticate_pocket_event_for_test( + &mut auth, &tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 1_714_124_433).expect("auth"), UnixTimestamp::new(1_714_124_433), ) diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -30,8 +30,9 @@ use tangle_runtime::{ server::serve_listener_until_shutdown, }; use tangle_store_pocket::{ - PocketStoreConfig, PocketStoreHandle, TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, - TANGLE_GROUP_PROJECTION_TABLE, parse_pocket_event_json, parse_pocket_filter_json, + PocketOwnedEvent, PocketStoreConfig, PocketStoreHandle, TANGLE_GROUP_CHECKPOINT_TABLE, + TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE, parse_pocket_event_json, + parse_pocket_filter_json, }; use tangle_test_support::{ FixtureKey, TANGLE_V2_RELAY_SECRET_HEX, TANGLE_V2_RELAY_URL, tangle_v2_auth_event, @@ -97,6 +98,21 @@ fn pocket_filters(filters: Vec<Filter>) -> Vec<tangle_store_pocket::PocketOwnedF .collect() } +fn authenticate_pocket_event_for_test( + auth: &mut BaseAuthState, + event: &Event, + now: UnixTimestamp, +) -> Result<(), BaseRelayError> { + let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); + let pocket = parse_pocket_event_json(&raw).expect("pocket event"); + auth.authenticate_pocket(&pocket, now).map(|_| ()) +} + +fn pocket_event_for_test(event: &Event) -> PocketOwnedEvent { + let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); + parse_pocket_event_json(&raw).expect("pocket event") +} + #[tokio::test] async fn tangle_run_serves_until_shutdown() { let root = temp_root("acceptance-server"); @@ -1304,14 +1320,16 @@ fn auth_rejects_events_outside_created_at_skew() { RelayMessage::Auth("challenge-a".to_owned()) ); - auth.authenticate( + authenticate_pocket_event_for_test( + &mut auth, &tangle_v2_auth_event(FixtureKey::Owner, "challenge-a", 100).expect("fresh"), UnixTimestamp::new(100), ) .expect("fresh"); assert_eq!( - auth.authenticate( + authenticate_pocket_event_for_test( + &mut auth, &tangle_v2_auth_event(FixtureKey::Admin, "challenge-a", 89).expect("auth"), UnixTimestamp::new(100), ) @@ -1320,7 +1338,8 @@ fn auth_rejects_events_outside_created_at_skew() { "auth-required: auth event created_at is outside configured skew" ); assert_eq!( - auth.authenticate( + authenticate_pocket_event_for_test( + &mut auth, &tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 111).expect("auth"), UnixTimestamp::new(100), ) @@ -1351,7 +1370,8 @@ fn protected_events_require_author_auth_before_nip70_is_advertised() { let mut auth = BaseAuthState::new(TANGLE_V2_RELAY_URL, 300, 10).expect("auth"); auth.issue_challenge("challenge-a", UnixTimestamp::new(1_714_124_433)) .expect("challenge"); - auth.authenticate( + authenticate_pocket_event_for_test( + &mut auth, &tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 1_714_124_433).expect("auth"), UnixTimestamp::new(1_714_124_433), ) @@ -1420,7 +1440,7 @@ fn private_but_not_hidden_group_metadata_remains_visible() { assert_eq!( gate.screen_event( - &phase2_snapshot_event(KIND_GROUP_METADATA, "Farm"), + &pocket_event_for_test(&phase2_snapshot_event(KIND_GROUP_METADATA, "Farm")), None, Default::default() ) @@ -1429,7 +1449,7 @@ fn private_but_not_hidden_group_metadata_remains_visible() { ); assert_eq!( gate.screen_event( - &phase2_snapshot_event(KIND_GROUP_ADMINS, "Farm"), + &pocket_event_for_test(&phase2_snapshot_event(KIND_GROUP_ADMINS, "Farm")), None, Default::default() ) @@ -1438,7 +1458,7 @@ fn private_but_not_hidden_group_metadata_remains_visible() { ); assert_eq!( gate.screen_event( - &phase2_snapshot_event(KIND_GROUP_MEMBERS, "Farm"), + &pocket_event_for_test(&phase2_snapshot_event(KIND_GROUP_MEMBERS, "Farm")), None, Default::default() ) @@ -1452,7 +1472,7 @@ fn private_but_not_hidden_group_metadata_remains_visible() { assert_eq!( hidden_gate .screen_event( - &phase2_snapshot_event(KIND_GROUP_METADATA, "Hidden"), + &pocket_event_for_test(&phase2_snapshot_event(KIND_GROUP_METADATA, "Hidden")), None, Default::default() ) @@ -1473,9 +1493,10 @@ fn public_join_defaults_false() { let authority = GroupAuthority::new([owner], Vec::<PublicKeyHex>::new()); let policy = GroupWritePolicy::new(&projection, &authority, GroupPolicyConfig::strict()); let join = phase2_group_event(KIND_GROUP_JOIN_REQUEST, "Farm", joiner.clone()); + let join_pocket = pocket_event_for_test(&join); let error = policy .check_event( - &join, + &join_pocket, &GroupEventClass::Normal { group_id: GroupId::new("Farm").expect("group"), }, @@ -1516,7 +1537,11 @@ fn duplicate_join_and_leave_use_duplicate_prefix() { let duplicate_join = policy .check_event( - &phase2_group_event(KIND_GROUP_JOIN_REQUEST, "Farm", member.clone()), + &pocket_event_for_test(&phase2_group_event( + KIND_GROUP_JOIN_REQUEST, + "Farm", + member.clone(), + )), &GroupEventClass::Normal { group_id: GroupId::new("Farm").expect("group"), }, @@ -1530,7 +1555,11 @@ fn duplicate_join_and_leave_use_duplicate_prefix() { let duplicate_leave = policy .check_event( - &phase2_group_event(KIND_GROUP_LEAVE_REQUEST, "Farm", outsider.clone()), + &pocket_event_for_test(&phase2_group_event( + KIND_GROUP_LEAVE_REQUEST, + "Farm", + outsider.clone(), + )), &GroupEventClass::Normal { group_id: GroupId::new("Farm").expect("group"), }, @@ -1561,7 +1590,11 @@ fn closed_groups_use_strict_nip29_semantics_without_compatibility_flag() { let join_error = policy .check_event( - &phase2_group_event(KIND_GROUP_JOIN_REQUEST, "Closed", outsider.clone()), + &pocket_event_for_test(&phase2_group_event( + KIND_GROUP_JOIN_REQUEST, + "Closed", + outsider.clone(), + )), &GroupEventClass::Normal { group_id: GroupId::new("Closed").expect("group"), }, @@ -1577,7 +1610,7 @@ fn closed_groups_use_strict_nip29_semantics_without_compatibility_flag() { assert_eq!( policy .check_event( - &phase2_group_event(1, "Closed", outsider.clone()), + &pocket_event_for_test(&phase2_group_event(1, "Closed", outsider.clone())), &GroupEventClass::Normal { group_id: GroupId::new("Closed").expect("group"), }, @@ -1779,9 +1812,9 @@ fn projection_and_outbox_recover_from_canonical_pocket_events() { .expect("owner auth"); let member_auth = tangle_v2_auth_event(FixtureKey::Member, "recovery-challenge", 1_714_124_471) .expect("member auth"); - auth.authenticate(&owner_auth, UnixTimestamp::new(1_714_124_470)) + authenticate_pocket_event_for_test(&mut auth, &owner_auth, UnixTimestamp::new(1_714_124_470)) .expect("owner"); - auth.authenticate(&member_auth, UnixTimestamp::new(1_714_124_471)) + authenticate_pocket_event_for_test(&mut auth, &member_auth, UnixTimestamp::new(1_714_124_471)) .expect("member"); let create = tangle_v2_group_create_event(FixtureKey::Owner, "RecoverSocket", 1_714_124_472, &[])