tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit d33f78987ee4e56fc777cb89efa13cade9858e40
parent 0a6352ec30b906e2edd615da3fd1ddc28cf4920c
Author: triesap <tyson@radroots.org>
Date:   Mon, 15 Jun 2026 05:02:04 -0700

tests: prove group write races

- Add production-path concurrency tests for duplicate create and duplicate join races.
- Cover join versus leave, delete versus write, and put versus remove races.
- Compare live group semantics with canonical Pocket replay after race handling.
- Validate group, recovery, privacy, check, clippy, and forbidden-surface lanes.

Diffstat:
Mcrates/tangle_runtime/src/runtime.rs | 665++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 657 insertions(+), 8 deletions(-)

diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -1885,27 +1885,32 @@ mod tests { TangleRuntimeLimits, }; use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}; - use crate::event_bus::{TangleEventBus, TangleEventReceiveError}; + use crate::event_bus::{TangleEventBus, TangleEventReceiveError, TangleEventReceiver}; + use crate::pocket_conversion::pocket_event_to_tangle; use crate::rate_limits::{TangleRateLimitKey, TangleRateLimitQueryClass, TangleRateLimitScope}; + use crate::relay::auth::BaseAuthState; use crate::relay::core::{BaseRelayLimitSettings, BaseRelayLimits, BaseRelayQueryMetrics}; use crate::relay::live::LiveSubscriptionSet; use serde_json::json; use std::{ - collections::BTreeSet, + collections::{BTreeMap, BTreeSet}, net::{IpAddr, Ipv4Addr}, path::{Path, PathBuf}, time::Duration, }; use tangle_groups::{ - GroupAuthContext, GroupId, KIND_GROUP_ADMINS, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_MEMBERS, - KIND_GROUP_METADATA, StoreOffset, + CanonicalGroupEvent, GroupAuthContext, GroupEventClass, GroupId, GroupProjection, + KIND_GROUP_ADMINS, KIND_GROUP_DELETE_GROUP, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_MEMBERS, + KIND_GROUP_METADATA, MemberStatus, StoreOffset, rebuild_group_projection, }; use tangle_protocol::{ - ClientMessage, Kind, RelayMessage, SubscriptionId, Tag, UnixTimestamp, filter_from_value, + ClientMessage, Event, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, Tag, + UnixTimestamp, filter_from_value, }; use tangle_test_support::{ - FixtureKey, tangle_v2_auth_event, tangle_v2_event, tangle_v2_group_create_event, - tangle_v2_group_event, tangle_v2_put_user_event, + FixtureKey, tangle_v2_auth_event, tangle_v2_delete_group_event, tangle_v2_event, + tangle_v2_group_create_event, tangle_v2_group_event, tangle_v2_join_event, + tangle_v2_leave_event, tangle_v2_put_user_event, tangle_v2_remove_user_event, }; #[test] @@ -3419,6 +3424,410 @@ mod tests { } #[tokio::test] + async fn runtime_group_concurrency_duplicate_create_accepts_one_projection() { + let root = temp_root("runtime-group-concurrency-duplicate-create"); + let _ = std::fs::remove_dir_all(&root); + let handle = TangleRuntimeHandle::new( + TangleRuntime::open(runtime_config(&root, 32)).expect("runtime"), + ); + let mut offsets = handle.subscribe_events().await; + let owner_auth = + authenticated_runtime_state(&handle, FixtureKey::Owner, "owner-create", 1_714_126_100) + .await; + let first = + tangle_v2_group_create_event(FixtureKey::Owner, "RaceCreate", 1_714_126_101, &[]) + .expect("first create"); + let second = + tangle_v2_group_create_event(FixtureKey::Owner, "RaceCreate", 1_714_126_102, &[]) + .expect("second create"); + let first_task = { + let handle = handle.clone(); + let mut auth = owner_auth.clone(); + let event = first.clone(); + tokio::spawn(async move { + runtime_event_reply(&handle, event, &mut auth, 1_714_126_101).await + }) + }; + let second_task = { + let handle = handle.clone(); + let mut auth = owner_auth.clone(); + let event = second.clone(); + tokio::spawn(async move { + runtime_event_reply(&handle, event, &mut auth, 1_714_126_102).await + }) + }; + let replies = tokio::time::timeout(Duration::from_secs(3), async { + vec![ + first_task.await.expect("first task"), + second_task.await.expect("second task"), + ] + }) + .await + .expect("duplicate create race"); + + assert_eq!(accepted_count(&replies), 1); + assert_eq!( + rejected_messages(&replies), + vec!["invalid: group already exists".to_owned()] + ); + assert_eq!(drain_offsets(&mut offsets, 3).await.len(), 3); + assert_eq!( + offsets + .try_recv() + .expect_err("one create source plus generated"), + TangleEventReceiveError::Empty + ); + let mut auth = owner_auth.clone(); + assert_eq!( + runtime_group_count( + &handle, + "duplicate-create-count", + "RaceCreate", + KIND_GROUP_METADATA, + "d", + &mut auth, + 1_714_126_103, + ) + .await, + 1 + ); + assert_live_projection_matches_rebuild(&handle, "RaceCreate"); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] + async fn runtime_group_concurrency_duplicate_join_accepts_one_membership() { + let root = temp_root("runtime-group-concurrency-duplicate-join"); + let _ = std::fs::remove_dir_all(&root); + let handle = TangleRuntimeHandle::new( + TangleRuntime::open(runtime_config_with_public_join(&root, 32)).expect("runtime"), + ); + let mut offsets = handle.subscribe_events().await; + let mut owner_auth = + authenticated_runtime_state(&handle, FixtureKey::Owner, "owner-join", 1_714_126_200) + .await; + let member_auth = + authenticated_runtime_state(&handle, FixtureKey::Member, "member-join", 1_714_126_201) + .await; + let create = + tangle_v2_group_create_event(FixtureKey::Owner, "RaceJoin", 1_714_126_202, &[]) + .expect("create"); + assert_accepted_reply( + runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_202).await, + &create, + ); + assert_eq!(drain_offsets(&mut offsets, 3).await.len(), 3); + let join_a = + tangle_v2_join_event(FixtureKey::Member, "RaceJoin", 1_714_126_203).expect("join a"); + let join_b = + tangle_v2_join_event(FixtureKey::Member, "RaceJoin", 1_714_126_204).expect("join b"); + let first_task = { + let handle = handle.clone(); + let mut auth = member_auth.clone(); + let event = join_a.clone(); + tokio::spawn(async move { + runtime_event_reply(&handle, event, &mut auth, 1_714_126_203).await + }) + }; + let second_task = { + let handle = handle.clone(); + let mut auth = member_auth.clone(); + let event = join_b.clone(); + tokio::spawn(async move { + runtime_event_reply(&handle, event, &mut auth, 1_714_126_204).await + }) + }; + let replies = tokio::time::timeout(Duration::from_secs(3), async { + vec![ + first_task.await.expect("first task"), + second_task.await.expect("second task"), + ] + }) + .await + .expect("duplicate join race"); + + assert_eq!(accepted_count(&replies), 1); + assert_eq!( + rejected_messages(&replies), + vec!["duplicate: group member already exists".to_owned()] + ); + assert_eq!(drain_offsets(&mut offsets, 2).await.len(), 2); + assert_runtime_member_status( + &handle, + "RaceJoin", + &FixtureKey::Member.public_key(), + MemberStatus::Member, + ); + assert_live_projection_matches_rebuild(&handle, "RaceJoin"); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] + async fn runtime_group_concurrency_join_and_leave_match_rebuild() { + let root = temp_root("runtime-group-concurrency-join-leave"); + let _ = std::fs::remove_dir_all(&root); + let handle = TangleRuntimeHandle::new( + TangleRuntime::open(runtime_config_with_public_join(&root, 32)).expect("runtime"), + ); + let mut owner_auth = authenticated_runtime_state( + &handle, + FixtureKey::Owner, + "owner-join-leave", + 1_714_126_300, + ) + .await; + let member_auth = authenticated_runtime_state( + &handle, + FixtureKey::Member, + "member-join-leave", + 1_714_126_301, + ) + .await; + let create = + tangle_v2_group_create_event(FixtureKey::Owner, "RaceJoinLeave", 1_714_126_302, &[]) + .expect("create"); + let put_member = tangle_v2_put_user_event( + FixtureKey::Owner, + "RaceJoinLeave", + FixtureKey::Member, + 1_714_126_303, + ) + .expect("put member"); + assert_accepted_reply( + runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_302).await, + &create, + ); + assert_accepted_reply( + runtime_event_reply(&handle, put_member.clone(), &mut owner_auth, 1_714_126_303).await, + &put_member, + ); + let leave = tangle_v2_leave_event(FixtureKey::Member, "RaceJoinLeave", 1_714_126_304) + .expect("leave"); + let join = + tangle_v2_join_event(FixtureKey::Member, "RaceJoinLeave", 1_714_126_305).expect("join"); + let leave_task = { + let handle = handle.clone(); + let mut auth = member_auth.clone(); + let event = leave.clone(); + tokio::spawn(async move { + runtime_event_reply(&handle, event, &mut auth, 1_714_126_304).await + }) + }; + let join_task = { + let handle = handle.clone(); + let mut auth = member_auth.clone(); + let event = join.clone(); + tokio::spawn(async move { + runtime_event_reply(&handle, event, &mut auth, 1_714_126_305).await + }) + }; + let replies = tokio::time::timeout(Duration::from_secs(3), async { + vec![ + leave_task.await.expect("leave task"), + join_task.await.expect("join task"), + ] + }) + .await + .expect("join leave race"); + let join_accepted = reply_is_accepted(&replies[1]); + + assert_eq!(accepted_count(&replies), if join_accepted { 2 } else { 1 }); + if join_accepted { + assert!(rejected_messages(&replies).is_empty()); + } else { + assert_eq!( + rejected_messages(&replies), + vec!["duplicate: group member already exists".to_owned()] + ); + } + assert_runtime_member_status( + &handle, + "RaceJoinLeave", + &FixtureKey::Member.public_key(), + if join_accepted { + MemberStatus::Member + } else { + MemberStatus::Removed + }, + ); + assert_live_projection_matches_rebuild(&handle, "RaceJoinLeave"); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] + async fn runtime_group_concurrency_delete_tombstone_blocks_normal_write() { + let root = temp_root("runtime-group-concurrency-delete-write"); + let _ = std::fs::remove_dir_all(&root); + let handle = TangleRuntimeHandle::new( + TangleRuntime::open(runtime_config(&root, 32)).expect("runtime"), + ); + let mut owner_auth = + authenticated_runtime_state(&handle, FixtureKey::Owner, "owner-delete", 1_714_126_400) + .await; + let create = + tangle_v2_group_create_event(FixtureKey::Owner, "RaceDelete", 1_714_126_401, &[]) + .expect("create"); + assert_accepted_reply( + runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_401).await, + &create, + ); + let normal = + tangle_v2_group_event(FixtureKey::Owner, "RaceDelete", 1_714_126_402, 1, "normal") + .expect("normal"); + let delete = tangle_v2_delete_group_event(FixtureKey::Owner, "RaceDelete", 1_714_126_403) + .expect("delete"); + let normal_task = { + let handle = handle.clone(); + let mut auth = owner_auth.clone(); + let event = normal.clone(); + tokio::spawn(async move { + runtime_event_reply(&handle, event, &mut auth, 1_714_126_402).await + }) + }; + let delete_task = { + let handle = handle.clone(); + let mut auth = owner_auth.clone(); + let event = delete.clone(); + tokio::spawn(async move { + runtime_event_reply(&handle, event, &mut auth, 1_714_126_403).await + }) + }; + let replies = tokio::time::timeout(Duration::from_secs(3), async { + vec![ + normal_task.await.expect("normal task"), + delete_task.await.expect("delete task"), + ] + }) + .await + .expect("delete write race"); + let delete_reply = &replies[1]; + + assert!(reply_is_accepted(delete_reply)); + assert!( + reply_is_accepted(&replies[0]) + || rejected_messages(&replies) == vec!["blocked: group is deleted".to_owned()] + ); + let mut auth = owner_auth.clone(); + assert_eq!( + runtime_group_count( + &handle, + "deleted-normal-count", + "RaceDelete", + 1, + "h", + &mut auth, + 1_714_126_404, + ) + .await, + 0 + ); + assert_eq!( + runtime_group_count( + &handle, + "deleted-marker-count", + "RaceDelete", + KIND_GROUP_DELETE_GROUP, + "h", + &mut auth, + 1_714_126_405, + ) + .await, + 1 + ); + assert_live_projection_matches_rebuild(&handle, "RaceDelete"); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] + async fn runtime_group_concurrency_membership_mutation_matches_rebuild() { + let root = temp_root("runtime-group-concurrency-membership-mutation"); + let _ = std::fs::remove_dir_all(&root); + let handle = TangleRuntimeHandle::new( + TangleRuntime::open(runtime_config(&root, 32)).expect("runtime"), + ); + let mut owner_auth = authenticated_runtime_state( + &handle, + FixtureKey::Owner, + "owner-membership", + 1_714_126_500, + ) + .await; + let create = + tangle_v2_group_create_event(FixtureKey::Owner, "RaceMember", 1_714_126_501, &[]) + .expect("create"); + assert_accepted_reply( + runtime_event_reply(&handle, create.clone(), &mut owner_auth, 1_714_126_501).await, + &create, + ); + let put_member = tangle_v2_put_user_event( + FixtureKey::Owner, + "RaceMember", + FixtureKey::Member, + 1_714_126_502, + ) + .expect("put member"); + let remove_member = tangle_v2_remove_user_event( + FixtureKey::Owner, + "RaceMember", + FixtureKey::Member, + 1_714_126_503, + ) + .expect("remove member"); + let put_task = { + let handle = handle.clone(); + let mut auth = owner_auth.clone(); + let event = put_member.clone(); + tokio::spawn(async move { + runtime_event_reply(&handle, event, &mut auth, 1_714_126_502).await + }) + }; + let remove_task = { + let handle = handle.clone(); + let mut auth = owner_auth.clone(); + let event = remove_member.clone(); + tokio::spawn(async move { + runtime_event_reply(&handle, event, &mut auth, 1_714_126_503).await + }) + }; + let replies = tokio::time::timeout(Duration::from_secs(3), async { + vec![ + put_task.await.expect("put task"), + remove_task.await.expect("remove task"), + ] + }) + .await + .expect("membership mutation race"); + let remove_accepted = reply_is_accepted(&replies[1]); + + assert!(reply_is_accepted(&replies[0])); + if remove_accepted { + assert!(rejected_messages(&replies).is_empty()); + } else { + assert_eq!( + rejected_messages(&replies), + vec!["duplicate: group member does not exist".to_owned()] + ); + } + assert_runtime_member_status( + &handle, + "RaceMember", + &FixtureKey::Member.public_key(), + if remove_accepted { + MemberStatus::Removed + } else { + MemberStatus::Member + }, + ); + assert_live_projection_matches_rebuild(&handle, "RaceMember"); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] async fn runtime_shared_services_progress_under_concurrent_event_query_count_and_fanout() { let root = temp_root("runtime-shared-concurrency"); let _ = std::fs::remove_dir_all(&root); @@ -3856,6 +4265,21 @@ mod tests { } fn runtime_config(root: &Path, per_connection_outbound_queue: usize) -> BaseRelayRuntimeConfig { + runtime_config_with_group_policy(root, per_connection_outbound_queue, false) + } + + fn runtime_config_with_public_join( + root: &Path, + per_connection_outbound_queue: usize, + ) -> BaseRelayRuntimeConfig { + runtime_config_with_group_policy(root, per_connection_outbound_queue, true) + } + + fn runtime_config_with_group_policy( + root: &Path, + per_connection_outbound_queue: usize, + public_join: bool, + ) -> BaseRelayRuntimeConfig { let raw = json!({ "server": { "listen_addr": "127.0.0.1:0", @@ -3874,7 +4298,11 @@ mod tests { "enabled": true, "canonical_relay_url": "wss://relay.radroots.test", "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777", - "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()] + "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()], + "policy": { + "public_join": public_join, + "invites_enabled": false + } }, "auth": { "challenge_ttl_seconds": 300, @@ -3936,6 +4364,227 @@ mod tests { parse_base_relay_runtime_config_json(&raw).expect("config") } + async fn authenticated_runtime_state( + handle: &TangleRuntimeHandle, + key: FixtureKey, + challenge: &str, + now: u64, + ) -> BaseAuthState { + let mut auth = handle.auth_state().await.expect("auth"); + auth.issue_challenge(challenge, UnixTimestamp::new(now)) + .expect("challenge"); + let event = tangle_v2_auth_event(key, challenge, now).expect("auth event"); + let replies = handle + .handle_client_message( + ClientMessage::Auth(event.clone()), + &mut auth, + UnixTimestamp::new(now), + ) + .await + .expect("auth message"); + + assert_eq!( + replies, + vec![RelayMessage::Ok { + event_id: event.id().clone(), + accepted: true, + message: String::new() + }] + ); + auth + } + + async fn runtime_event_reply( + handle: &TangleRuntimeHandle, + event: Event, + auth: &mut BaseAuthState, + now: u64, + ) -> RelayMessage { + let replies = handle + .handle_client_message(ClientMessage::Event(event), auth, UnixTimestamp::new(now)) + .await + .expect("event message"); + + assert_eq!(replies.len(), 1); + replies.into_iter().next().expect("reply") + } + + async fn runtime_group_count( + handle: &TangleRuntimeHandle, + subscription_id: &str, + group_id: &str, + kind: u32, + tag_name: &str, + auth: &mut BaseAuthState, + now: u64, + ) -> u64 { + let replies = handle + .handle_client_message( + ClientMessage::Count { + subscription_id: SubscriptionId::new(subscription_id).expect("subscription"), + filters: vec![runtime_group_filter(group_id, kind, tag_name)], + }, + auth, + UnixTimestamp::new(now), + ) + .await + .expect("count"); + + match replies.as_slice() { + [RelayMessage::Count { count, .. }] => *count, + other => panic!("count reply expected, got {other:?}"), + } + } + + fn runtime_group_filter(group_id: &str, kind: u32, tag_name: &str) -> Filter { + let mut value = json!({"kinds": [kind]}); + value + .as_object_mut() + .expect("filter") + .insert(format!("#{tag_name}"), json!([group_id])); + filter_from_value(&value).expect("filter") + } + + async fn drain_offsets(receiver: &mut TangleEventReceiver, count: usize) -> Vec<StoreOffset> { + let mut offsets = Vec::with_capacity(count); + for _ in 0..count { + offsets.push( + tokio::time::timeout(Duration::from_secs(1), receiver.recv()) + .await + .expect("offset timeout") + .expect("offset"), + ); + } + offsets + } + + fn accepted_count(replies: &[RelayMessage]) -> usize { + replies + .iter() + .filter(|reply| reply_is_accepted(reply)) + .count() + } + + fn reply_is_accepted(reply: &RelayMessage) -> bool { + matches!( + reply, + RelayMessage::Ok { + accepted: true, + message, + .. + } if message.is_empty() + ) + } + + fn rejected_messages(replies: &[RelayMessage]) -> Vec<String> { + replies + .iter() + .filter_map(|reply| match reply { + RelayMessage::Ok { + accepted: false, + message, + .. + } => Some(message.clone()), + _ => None, + }) + .collect() + } + + fn assert_accepted_reply(reply: RelayMessage, event: &Event) { + assert_eq!( + reply, + RelayMessage::Ok { + event_id: event.id().clone(), + accepted: true, + message: String::new() + } + ); + } + + fn assert_runtime_member_status( + handle: &TangleRuntimeHandle, + group_id: &str, + pubkey: &PublicKeyHex, + status: MemberStatus, + ) { + let group_id = GroupId::new(group_id).expect("group"); + let groups = handle.inner.groups.as_ref().expect("groups"); + let projection = groups.projection(); + + assert_eq!( + projection + .member(&group_id, pubkey) + .expect("member") + .status(), + status + ); + } + + fn assert_live_projection_matches_rebuild(handle: &TangleRuntimeHandle, group_id: &str) { + let group_id = GroupId::new(group_id).expect("group"); + let groups = handle.inner.groups.as_ref().expect("groups"); + let live = groups.projection(); + let rebuilt = rebuilt_projection(handle); + let live_group = live.group(&group_id); + let rebuilt_group = rebuilt.group(&group_id); + + assert_eq!( + live_group.map(|group| group.lifecycle()), + rebuilt_group.map(|group| group.lifecycle()) + ); + assert_eq!( + live_group.map(|group| group.metadata()), + rebuilt_group.map(|group| group.metadata()) + ); + assert_eq!( + live_group.and_then(|group| group.delete_event_id()), + rebuilt_group.and_then(|group| group.delete_event_id()) + ); + assert_eq!(live.tombstone(&group_id), rebuilt.tombstone(&group_id)); + assert_eq!( + projection_member_statuses(&live, &group_id), + projection_member_statuses(&rebuilt, &group_id) + ); + } + + fn rebuilt_projection(handle: &TangleRuntimeHandle) -> GroupProjection { + let groups = handle.inner.groups.as_ref().expect("groups"); + let limits = groups.limits(); + let events = handle + .inner + .store + .scan_events() + .expect("scan") + .into_iter() + .filter_map(|stored| { + let event = pocket_event_to_tangle(stored.event()).expect("event"); + match tangle_groups::classify_group_event(&event, limits).expect("classify") { + GroupEventClass::NonGroup => None, + _ => Some(CanonicalGroupEvent::new( + event, + StoreOffset::new(stored.store_offset()), + )), + } + }) + .collect::<Vec<_>>(); + + rebuild_group_projection(events, limits, UnixTimestamp::new(1_714_199_999)) + .expect("rebuild") + .into_projection() + } + + fn projection_member_statuses( + projection: &GroupProjection, + group_id: &GroupId, + ) -> BTreeMap<String, MemberStatus> { + projection + .members() + .iter() + .filter(|((candidate, _), _)| candidate == group_id) + .map(|((_, pubkey), member)| (pubkey.as_str().to_owned(), member.status())) + .collect() + } + fn runtime_relay_limits(max_pending_events: usize) -> BaseRelayLimits { BaseRelayLimits::new(BaseRelayLimitSettings { max_pending_events,