tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 304238cd7f9f1a458bd8fce25843bc3344a5f0f5
parent 817b3031a3e44860144e331fb3e67d41b2f4af3a
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 17:07:29 -0700

runtime: stress shared service concurrency

Diffstat:
Mcrates/tangle_runtime/src/runtime.rs | 434+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 434 insertions(+), 0 deletions(-)

diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -1631,8 +1631,10 @@ mod tests { use crate::relay::live::LiveSubscriptionSet; use serde_json::json; use std::{ + collections::BTreeSet, net::{IpAddr, Ipv4Addr}, path::{Path, PathBuf}, + time::Duration, }; use tangle_groups::{ GroupAuthContext, GroupId, KIND_GROUP_ADMINS, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_METADATA, @@ -1643,6 +1645,7 @@ mod tests { }; use tangle_test_support::{ FixtureKey, tangle_v2_auth_event, tangle_v2_event, tangle_v2_group_create_event, + tangle_v2_group_event, tangle_v2_put_user_event, }; #[test] @@ -2884,6 +2887,437 @@ mod tests { let _ = std::fs::remove_dir_all(root); } + #[tokio::test] + async fn runtime_shared_services_progress_under_concurrent_event_query_count_and_fanout() { + let root = temp_root("runtime-shared-concurrency"); + let _ = std::fs::remove_dir_all(&root); + let handle = TangleRuntimeHandle::new( + TangleRuntime::open(runtime_config(&root, 32)).expect("runtime"), + ); + let base_time = 1_714_126_000; + let mut owner_auth = handle.auth_state().await.expect("owner auth"); + owner_auth + .issue_challenge("owner-stress", UnixTimestamp::new(base_time)) + .expect("owner challenge"); + let owner_auth_event = tangle_v2_auth_event(FixtureKey::Owner, "owner-stress", base_time) + .expect("owner auth event"); + assert_eq!( + handle + .handle_client_message( + ClientMessage::Auth(owner_auth_event.clone()), + &mut owner_auth, + UnixTimestamp::new(base_time) + ) + .await + .expect("owner auth"), + vec![RelayMessage::Ok { + event_id: owner_auth_event.id().clone(), + accepted: true, + message: String::new() + }] + ); + let create = tangle_v2_group_create_event( + FixtureKey::Owner, + "StressPrivate", + base_time + 1, + &["private"], + ) + .expect("create"); + assert_eq!( + handle + .handle_client_message( + ClientMessage::Event(create.clone()), + &mut owner_auth, + UnixTimestamp::new(base_time + 1) + ) + .await + .expect("create"), + vec![RelayMessage::Ok { + event_id: create.id().clone(), + accepted: true, + message: String::new() + }] + ); + let put_member = tangle_v2_put_user_event( + FixtureKey::Owner, + "StressPrivate", + FixtureKey::Member, + base_time + 2, + ) + .expect("put member"); + assert_eq!( + handle + .handle_client_message( + ClientMessage::Event(put_member.clone()), + &mut owner_auth, + UnixTimestamp::new(base_time + 2) + ) + .await + .expect("put member"), + vec![RelayMessage::Ok { + event_id: put_member.id().clone(), + accepted: true, + message: String::new() + }] + ); + let mut member_auth = handle.auth_state().await.expect("member auth"); + member_auth + .issue_challenge("member-stress", UnixTimestamp::new(base_time + 3)) + .expect("member challenge"); + let member_auth_event = + tangle_v2_auth_event(FixtureKey::Member, "member-stress", base_time + 3) + .expect("member auth event"); + assert_eq!( + handle + .handle_client_message( + ClientMessage::Auth(member_auth_event.clone()), + &mut member_auth, + UnixTimestamp::new(base_time + 3) + ) + .await + .expect("member auth"), + vec![RelayMessage::Ok { + event_id: member_auth_event.id().clone(), + accepted: true, + message: String::new() + }] + ); + let public_auth = handle.auth_state().await.expect("public auth"); + let mut offsets = handle.subscribe_events().await; + let group_write_count = 6_usize; + let public_write_count = 4_usize; + let mut write_tasks = Vec::new(); + for index in 0..group_write_count { + let handle = handle.clone(); + let mut auth = member_auth.clone(); + write_tasks.push(tokio::spawn(async move { + let event = tangle_v2_group_event( + FixtureKey::Member, + "StressPrivate", + base_time + 10 + u64::try_from(index).expect("index"), + 1, + &format!("private stress {index}"), + ) + .expect("group event"); + assert_eq!( + handle + .handle_client_message( + ClientMessage::Event(event.clone()), + &mut auth, + UnixTimestamp::new( + base_time + 10 + u64::try_from(index).expect("index") + ) + ) + .await + .expect("group write"), + vec![RelayMessage::Ok { + event_id: event.id().clone(), + accepted: true, + message: String::new() + }] + ); + (true, event) + })); + } + for index in 0..public_write_count { + let handle = handle.clone(); + let mut auth = public_auth.clone(); + write_tasks.push(tokio::spawn(async move { + let event = tangle_v2_event( + FixtureKey::Admin, + base_time + 40 + u64::try_from(index).expect("index"), + 1, + Vec::new(), + &format!("public stress {index}"), + ) + .expect("public event"); + assert_eq!( + handle + .handle_client_message( + ClientMessage::Event(event.clone()), + &mut auth, + UnixTimestamp::new( + base_time + 40 + u64::try_from(index).expect("index") + ) + ) + .await + .expect("public write"), + vec![RelayMessage::Ok { + event_id: event.id().clone(), + accepted: true, + message: String::new() + }] + ); + (false, event) + })); + } + let stored_events = tokio::time::timeout(Duration::from_secs(3), async { + let mut stored_events = Vec::new(); + for task in write_tasks { + stored_events.push(task.await.expect("write task")); + } + stored_events + }) + .await + .expect("write concurrency timeout"); + assert_eq!( + stored_events + .iter() + .filter(|(is_group, _)| *is_group) + .count(), + group_write_count + ); + assert_eq!( + stored_events + .iter() + .filter(|(is_group, _)| !*is_group) + .count(), + public_write_count + ); + let group_event_ids = stored_events + .iter() + .filter(|(is_group, _)| *is_group) + .map(|(_, event)| event.id().clone()) + .collect::<BTreeSet<_>>(); + let mut published_offsets = Vec::new(); + for _ in 0..stored_events.len() { + published_offsets.push( + tokio::time::timeout(Duration::from_secs(1), offsets.recv()) + .await + .expect("offset timeout") + .expect("offset"), + ); + } + assert_eq!( + offsets.try_recv().expect_err("no extra offsets"), + TangleEventReceiveError::Empty + ); + let mut visibility_tasks = Vec::new(); + for offset in published_offsets.iter().copied() { + let handle = handle.clone(); + let member_auth = member_auth.clone(); + let public_auth = public_auth.clone(); + let group_event_ids = group_event_ids.clone(); + visibility_tasks.push(tokio::spawn(async move { + let member_event = handle + .event_by_offset_with_auth(offset, &member_auth) + .await + .expect("member offset") + .expect("member visible"); + let public_event = handle + .event_by_offset_with_auth(offset, &public_auth) + .await + .expect("public offset"); + let is_group_event = group_event_ids.contains(member_event.id()); + if is_group_event { + assert!(public_event.is_none()); + } else { + assert!(public_event.is_some()); + } + is_group_event + })); + } + let visible_group_offsets = tokio::time::timeout(Duration::from_secs(3), async { + let mut visible_group_offsets = 0; + for task in visibility_tasks { + if task.await.expect("visibility task") { + visible_group_offsets += 1; + } + } + visible_group_offsets + }) + .await + .expect("visibility timeout"); + assert_eq!(visible_group_offsets, group_write_count); + let member_subscription = SubscriptionId::new("member-stress-live").expect("subscription"); + let public_subscription = SubscriptionId::new("public-stress-live").expect("subscription"); + let mut member_subscriptions = LiveSubscriptionSet::new(32, 64).expect("member live set"); + let mut public_subscriptions = LiveSubscriptionSet::new(32, 64).expect("public live set"); + let stress_filter = + filter_from_value(&json!({"kinds":[1], "#h":["StressPrivate"]})).expect("filter"); + member_subscriptions + .subscribe( + member_subscription.clone(), + vec![stress_filter.clone()], + GroupAuthContext::new([FixtureKey::Member.public_key()]), + ) + .expect("member subscribe"); + public_subscriptions + .subscribe( + public_subscription, + vec![stress_filter], + GroupAuthContext::unauthenticated(), + ) + .expect("public subscribe"); + let mut member_fanout_count = 0; + for offset in &published_offsets { + let public_replies = handle + .fanout_event_offset(*offset, &mut public_subscriptions) + .await + .expect("public fanout"); + assert!(public_replies.is_empty()); + let member_replies = handle + .fanout_event_offset(*offset, &mut member_subscriptions) + .await + .expect("member fanout"); + for reply in member_replies { + match reply { + RelayMessage::Event { + subscription_id, + event, + } => { + assert_eq!(subscription_id, member_subscription); + assert!(group_event_ids.contains(event.id())); + member_fanout_count += 1; + } + other => panic!("unexpected fanout reply {other:?}"), + } + } + } + assert_eq!(member_fanout_count, group_write_count); + let mut query_tasks = Vec::new(); + for index in 0..3_u64 { + let member_req_handle = handle.clone(); + let mut auth = member_auth.clone(); + let group_event_ids = group_event_ids.clone(); + query_tasks.push(tokio::spawn(async move { + let subscription_id = + SubscriptionId::new(&format!("member-req-{index}")).expect("subscription"); + let replies = member_req_handle + .handle_client_message( + ClientMessage::Req { + subscription_id: subscription_id.clone(), + filters: vec![ + filter_from_value(&json!({ + "kinds":[1], + "#h":["StressPrivate"], + "limit": 20 + })) + .expect("filter"), + ], + }, + &mut auth, + UnixTimestamp::new(base_time + 100 + index), + ) + .await + .expect("member req"); + assert_eq!( + replies + .iter() + .filter(|reply| matches!( + reply, + RelayMessage::Event { + subscription_id: delivered, + event + } if delivered == &subscription_id && group_event_ids.contains(event.id()) + )) + .count(), + group_event_ids.len() + ); + assert!(matches!( + replies.last(), + Some(RelayMessage::Eose(delivered)) if delivered == &subscription_id + )); + })); + let public_req_handle = handle.clone(); + let mut auth = public_auth.clone(); + query_tasks.push(tokio::spawn(async move { + let subscription_id = + SubscriptionId::new(&format!("public-req-{index}")).expect("subscription"); + let replies = public_req_handle + .handle_client_message( + ClientMessage::Req { + subscription_id: subscription_id.clone(), + filters: vec![ + filter_from_value(&json!({ + "kinds":[1], + "#h":["StressPrivate"], + "limit": 20 + })) + .expect("filter"), + ], + }, + &mut auth, + UnixTimestamp::new(base_time + 110 + index), + ) + .await + .expect("public req"); + assert_eq!(replies.len(), 1); + assert_eq!(replies[0], RelayMessage::Eose(subscription_id)); + })); + let member_count_handle = handle.clone(); + let mut auth = member_auth.clone(); + query_tasks.push(tokio::spawn(async move { + let subscription_id = + SubscriptionId::new(&format!("member-count-{index}")).expect("subscription"); + let replies = member_count_handle + .handle_client_message( + ClientMessage::Count { + subscription_id: subscription_id.clone(), + filters: vec![ + filter_from_value(&json!({ + "kinds":[1], + "#h":["StressPrivate"] + })) + .expect("filter"), + ], + }, + &mut auth, + UnixTimestamp::new(base_time + 120 + index), + ) + .await + .expect("member count"); + assert_eq!( + replies, + vec![RelayMessage::Count { + subscription_id, + count: u64::try_from(group_write_count).expect("group count") + }] + ); + })); + let public_count_handle = handle.clone(); + let mut auth = public_auth.clone(); + query_tasks.push(tokio::spawn(async move { + let subscription_id = + SubscriptionId::new(&format!("public-count-{index}")).expect("subscription"); + let replies = public_count_handle + .handle_client_message( + ClientMessage::Count { + subscription_id: subscription_id.clone(), + filters: vec![ + filter_from_value(&json!({ + "kinds":[1], + "#h":["StressPrivate"] + })) + .expect("filter"), + ], + }, + &mut auth, + UnixTimestamp::new(base_time + 130 + index), + ) + .await + .expect("public count"); + assert_eq!( + replies, + vec![RelayMessage::Count { + subscription_id, + count: 0 + }] + ); + })); + } + tokio::time::timeout(Duration::from_secs(3), async { + for task in query_tasks { + task.await.expect("query task"); + } + }) + .await + .expect("query concurrency timeout"); + handle.shutdown().await.expect("shutdown"); + + let _ = std::fs::remove_dir_all(root); + } + fn runtime_config(root: &Path, per_connection_outbound_queue: usize) -> BaseRelayRuntimeConfig { let raw = json!({ "server": {