tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 00bf0c2c5d717095c832d28538c6269f152d15a9
parent 42bb1ef88508d320d723d0925f86f14e3593bfee
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 18:57:00 -0700

runtime: prove generated state live fanout

Diffstat:
Mcrates/tangle_runtime/src/runtime.rs | 60+++++++++++++++++++++++++++++++++++++++++++++++-------------
Mcrates/tangle_runtime/src/server.rs | 6+++++-
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 111+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 163 insertions(+), 14 deletions(-)

diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -1897,8 +1897,8 @@ mod tests { time::Duration, }; use tangle_groups::{ - GroupAuthContext, GroupId, KIND_GROUP_ADMINS, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_METADATA, - StoreOffset, + GroupAuthContext, GroupId, KIND_GROUP_ADMINS, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_MEMBERS, + KIND_GROUP_METADATA, StoreOffset, }; use tangle_protocol::{ ClientMessage, Kind, RelayMessage, SubscriptionId, Tag, UnixTimestamp, filter_from_value, @@ -3314,8 +3314,11 @@ mod tests { .subscribe( subscription_id.clone(), vec![ - filter_from_value(&json!({"kinds":[KIND_GROUP_METADATA, KIND_GROUP_ADMINS]})) - .expect("filter"), + filter_from_value(&json!({ + "kinds":[KIND_GROUP_METADATA, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS], + "#d":["RuntimeFarm"] + })) + .expect("filter"), ], GroupAuthContext::unauthenticated(), ) @@ -3358,24 +3361,55 @@ mod tests { ]; assert!(source_offset < generated_offsets[0]); assert!(generated_offsets[0] < generated_offsets[1]); + let put_member = + tangle_v2_put_user_event(FixtureKey::Owner, "RuntimeFarm", FixtureKey::Member, 122) + .expect("put member"); + assert_eq!( + handle + .handle_client_message( + ClientMessage::Event(put_member.clone()), + &mut auth, + UnixTimestamp::new(122) + ) + .await + .expect("put member"), + vec![RelayMessage::Ok { + event_id: put_member.id().clone(), + accepted: true, + message: String::new() + }] + ); + let put_source_offset = offsets.try_recv().expect("put source offset"); + let member_generated_offset = offsets.try_recv().expect("member generated offset"); + assert!(generated_offsets[1] < put_source_offset); + assert!(put_source_offset < member_generated_offset); + let generated_offsets = [ + generated_offsets[0], + generated_offsets[1], + member_generated_offset, + ]; + let mut generated_kinds = BTreeSet::new(); for offset in generated_offsets { + let messages = handle + .fanout_event_offset(offset, &mut subscriptions) + .await + .expect("fanout"); assert!(matches!( - handle - .fanout_event_offset(offset, &mut subscriptions) - .await - .expect("fanout") - .as_slice(), + messages.as_slice(), [RelayMessage::Event { subscription_id: delivered, event }] if delivered == &subscription_id - && [KIND_GROUP_METADATA, KIND_GROUP_ADMINS] - .contains(&event.unsigned().kind().as_u32()) + && generated_kinds.insert(event.unsigned().kind().as_u32()) )); } - assert_eq!(handle.metrics().outbox_replayed_events(), 2); + assert_eq!( + generated_kinds, + BTreeSet::from([KIND_GROUP_METADATA, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS]) + ); + assert_eq!(handle.metrics().outbox_replayed_events(), 3); assert_eq!(handle.metrics().outbox_pending_events(), 0); - assert_eq!(handle.metrics().event_bus_published_offsets(), 3); + assert_eq!(handle.metrics().event_bus_published_offsets(), 5); assert_eq!( offsets.try_recv().expect_err("only source plus generated"), TangleEventReceiveError::Empty diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs @@ -343,7 +343,11 @@ mod tests { json!(["OK", event.id().as_str(), true, ""]) ); - send_client_value(&mut socket, json!(["COUNT", "count-a", {}])).await; + send_client_value( + &mut socket, + json!(["COUNT", "count-a", {"kinds":[1], "since": 1_714_124_433, "until": 1_714_124_433}]), + ) + .await; assert_eq!( read_relay_value(&mut socket).await, json!(["COUNT", "count-a", {"count": 1}]) diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -1829,6 +1829,117 @@ async fn relay_generated_events_are_stored_projected_and_broadcast_to_websocket_ let _ = std::fs::remove_dir_all(root); } +#[tokio::test] +async fn private_relay_generated_events_reach_authorized_websocket_subscribers() { + let root = temp_root("acceptance-private-generated-websocket"); + let _ = std::fs::remove_dir_all(&root); + let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); + let address = listener.local_addr().expect("address"); + let runtime = TangleRuntime::open(runtime_config(&root, address)).expect("runtime"); + let shutdown = runtime.shutdown_signal().clone(); + let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); + let mut owner_writer = connect_nostr_socket(address).await; + let mut owner_reader = connect_nostr_socket(address).await; + let writer_challenge = read_auth_challenge(&mut owner_writer).await; + let reader_challenge = read_auth_challenge(&mut owner_reader).await; + let auth_created_at = current_unix_timestamp(); + + authenticate_client( + &mut owner_writer, + FixtureKey::Owner, + &writer_challenge, + auth_created_at, + ) + .await; + authenticate_client( + &mut owner_reader, + FixtureKey::Owner, + &reader_challenge, + auth_created_at.saturating_add(1), + ) + .await; + + send_client_value( + &mut owner_reader, + json!([ + "REQ", + "private-generated-live", + {"kinds":[KIND_GROUP_METADATA, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS], "#d":["PrivateGeneratedSocket"]} + ]), + ) + .await; + assert_eq!( + read_relay_value(&mut owner_reader).await, + json!(["EOSE", "private-generated-live"]) + ); + + let create = tangle_v2_group_create_event( + FixtureKey::Owner, + "PrivateGeneratedSocket", + 1_714_124_470, + &["private"], + ) + .expect("create"); + send_client_value(&mut owner_writer, json!(["EVENT", event_to_value(&create)])).await; + assert_ok(read_relay_value(&mut owner_writer).await, &create, true, ""); + let create_generated_kinds = [ + relay_event_kind_tag( + read_relay_value(&mut owner_reader).await, + "private-generated-live", + "d", + "PrivateGeneratedSocket", + ), + relay_event_kind_tag( + read_relay_value(&mut owner_reader).await, + "private-generated-live", + "d", + "PrivateGeneratedSocket", + ), + ]; + assert!(create_generated_kinds.contains(&KIND_GROUP_METADATA)); + assert!(create_generated_kinds.contains(&KIND_GROUP_ADMINS)); + + let put_member = tangle_v2_put_user_event( + FixtureKey::Owner, + "PrivateGeneratedSocket", + FixtureKey::Member, + 1_714_124_471, + ) + .expect("put member"); + send_client_value( + &mut owner_writer, + json!(["EVENT", event_to_value(&put_member)]), + ) + .await; + assert_ok( + read_relay_value(&mut owner_writer).await, + &put_member, + true, + "", + ); + assert_eq!( + relay_event_kind_tag( + read_relay_value(&mut owner_reader).await, + "private-generated-live", + "d", + "PrivateGeneratedSocket", + ), + KIND_GROUP_MEMBERS + ); + + shutdown.request_shutdown(); + read_websocket_close(&mut owner_writer).await; + read_websocket_close(&mut owner_reader).await; + let report = timeout(Duration::from_secs(2), task) + .await + .expect("shutdown timeout") + .expect("task") + .expect("serve"); + assert_eq!(report.listen_addr(), address); + + let _ = std::fs::remove_dir_all(root); +} + fn runtime_config(root: &Path, listen_addr: SocketAddr) -> BaseRelayRuntimeConfig { parse_base_relay_runtime_config_json(&runtime_config_value(root, listen_addr).to_string()) .expect("config")