tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit f27291c96996502fed932fc578631d38b12c7da9
parent 9af337383db8459657d2ac771520abbcabeb59f8
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 03:44:35 -0700

tests: cover websocket relay protocol flows

- Promote the Phase 2 websocket protocol target into a real integration test.

- Exercise live websocket AUTH, EVENT, REQ, COUNT, CLOSE, malformed input, and duplicate replies.

- Prove same subscription ids remain isolated across concurrent websocket connections.

- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.

Diffstat:
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 264+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 258 insertions(+), 6 deletions(-)

diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -1,12 +1,15 @@ #![forbid(unsafe_code)] +use futures_util::{SinkExt, StreamExt}; +use http::header; +use serde_json::{Value, json}; use std::{ io::{Read, Write}, net::{SocketAddr, TcpStream}, path::{Path, PathBuf}, - time::{Duration, Instant}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; -use tangle_protocol::{RelayMessage, UnixTimestamp}; +use tangle_protocol::{Event, RelayMessage, UnixTimestamp, event_to_value}; use tangle_runtime::{ config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, relay::auth::BaseAuthState, @@ -15,8 +18,10 @@ use tangle_runtime::{ }; use tangle_test_support::{ FixtureKey, TANGLE_V2_RELAY_SECRET_HEX, TANGLE_V2_RELAY_URL, tangle_v2_auth_event, + tangle_v2_event, }; use tokio::{net::TcpListener, time::timeout}; +use tokio_tungstenite::tungstenite::{Message as TungsteniteMessage, client::IntoClientRequest}; #[tokio::test] async fn tangle_run_serves_until_shutdown() { @@ -55,10 +60,156 @@ async fn tangle_run_serves_until_shutdown() { let _ = std::fs::remove_dir_all(root); } -#[test] -#[ignore = "phase2 target: websocket protocol runtime"] -fn websocket_clients_use_nip01_nip42_and_nip45_flows() { - pending("real websocket sessions must handle EVENT REQ COUNT CLOSE and AUTH"); +#[tokio::test] +async fn websocket_clients_use_nip01_nip42_and_nip45_flows() { + let root = temp_root("acceptance-websocket"); + let _ = std::fs::remove_dir_all(&root); + let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); + let address = listener.local_addr().expect("address"); + let runtime = TangleRuntime::open(runtime_config(&root, address)).expect("runtime"); + let shutdown = runtime.shutdown_signal().clone(); + let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); + let mut first = connect_nostr_socket(address).await; + let mut second = connect_nostr_socket(address).await; + let first_challenge = read_auth_challenge(&mut first).await; + let second_challenge = read_auth_challenge(&mut second).await; + let first_event = tangle_v2_event( + FixtureKey::Member, + 1_714_124_433, + 1, + Vec::new(), + "websocket-one", + ) + .expect("first event"); + let second_event = tangle_v2_event( + FixtureKey::Member, + 1_714_124_434, + 1, + Vec::new(), + "websocket-two", + ) + .expect("second event"); + let auth_created_at = current_unix_timestamp(); + let owner_auth = tangle_v2_auth_event(FixtureKey::Owner, &first_challenge, auth_created_at) + .expect("owner auth"); + let admin_auth = tangle_v2_auth_event( + FixtureKey::Admin, + &first_challenge, + auth_created_at.saturating_add(1), + ) + .expect("admin auth"); + let wrong_challenge_auth = tangle_v2_auth_event( + FixtureKey::Member, + &second_challenge, + auth_created_at.saturating_add(2), + ) + .expect("wrong challenge auth"); + + send_client_text(&mut first, "{").await; + assert_notice_prefix( + read_relay_value(&mut first).await, + "invalid: client message JSON is invalid:", + ); + + send_client_binary(&mut first, &[1, 2, 3]).await; + assert_eq!( + read_relay_value(&mut first).await, + json!(["NOTICE", "invalid: client message must be a text frame"]) + ); + + send_client_value(&mut first, json!(["AUTH", event_to_value(&owner_auth)])).await; + assert_ok(read_relay_value(&mut first).await, &owner_auth, true, ""); + send_client_value(&mut first, json!(["AUTH", event_to_value(&admin_auth)])).await; + assert_ok(read_relay_value(&mut first).await, &admin_auth, true, ""); + send_client_value( + &mut first, + json!(["AUTH", event_to_value(&wrong_challenge_auth)]), + ) + .await; + assert_ok( + read_relay_value(&mut first).await, + &wrong_challenge_auth, + false, + "auth-required: auth challenge does not match", + ); + + send_client_value(&mut first, json!(["EVENT", event_to_value(&first_event)])).await; + assert_ok(read_relay_value(&mut first).await, &first_event, true, ""); + send_client_value(&mut first, json!(["EVENT", event_to_value(&first_event)])).await; + assert_ok( + read_relay_value(&mut first).await, + &first_event, + true, + "duplicate: already have this event", + ); + + send_client_value( + &mut first, + json!(["COUNT", "count-websocket", {"kinds":[1]}]), + ) + .await; + assert_eq!( + read_relay_value(&mut first).await, + json!(["COUNT", "count-websocket", {"count": 1}]) + ); + + send_client_value(&mut first, json!(["REQ", "shared-sub", {"kinds":[1]}])).await; + assert_live_event( + read_relay_value(&mut first).await, + "shared-sub", + &first_event, + ); + assert_eq!( + read_relay_value(&mut first).await, + json!(["EOSE", "shared-sub"]) + ); + + send_client_value(&mut second, json!(["REQ", "shared-sub", {"kinds":[1]}])).await; + assert_live_event( + read_relay_value(&mut second).await, + "shared-sub", + &first_event, + ); + assert_eq!( + read_relay_value(&mut second).await, + json!(["EOSE", "shared-sub"]) + ); + + send_client_value(&mut first, json!(["CLOSE", "shared-sub"])).await; + expect_no_relay_message(&mut first).await; + + send_client_value(&mut first, json!(["EVENT", event_to_value(&second_event)])).await; + assert_ok(read_relay_value(&mut first).await, &second_event, true, ""); + assert_live_event( + read_relay_value(&mut second).await, + "shared-sub", + &second_event, + ); + expect_no_relay_message(&mut first).await; + + send_client_value(&mut first, json!(["EVENT", event_to_value(&second_event)])).await; + assert_ok( + read_relay_value(&mut first).await, + &second_event, + true, + "duplicate: already have this event", + ); + expect_no_relay_message(&mut second).await; + + send_client_value(&mut second, json!(["CLOSE", "shared-sub"])).await; + expect_no_relay_message(&mut second).await; + + shutdown.request_shutdown(); + read_websocket_close(&mut first).await; + read_websocket_close(&mut second).await; + let report = timeout(Duration::from_secs(2), task) + .await + .expect("shutdown timeout") + .expect("task") + .expect("serve"); + assert_eq!(report.listen_addr(), address); + + let _ = std::fs::remove_dir_all(root); } #[test] @@ -232,3 +383,104 @@ fn http_get(address: SocketAddr, path: &str, accept: Option<&str>) -> std::io::R fn temp_root(name: &str) -> PathBuf { std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id())) } + +type TestWebSocket = + tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>; + +async fn connect_nostr_socket(address: SocketAddr) -> TestWebSocket { + let mut request = format!("ws://{address}/") + .into_client_request() + .expect("request"); + request.headers_mut().insert( + header::SEC_WEBSOCKET_PROTOCOL, + http::HeaderValue::from_static("nostr"), + ); + let (socket, response) = tokio_tungstenite::connect_async(request) + .await + .expect("websocket"); + assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS); + assert_eq!( + response + .headers() + .get(header::SEC_WEBSOCKET_PROTOCOL) + .expect("protocol"), + "nostr" + ); + socket +} + +async fn send_client_value(socket: &mut TestWebSocket, value: Value) { + send_client_text(socket, &value.to_string()).await; +} + +async fn send_client_text(socket: &mut TestWebSocket, value: &str) { + socket + .send(TungsteniteMessage::Text(value.to_owned().into())) + .await + .expect("send client message"); +} + +async fn send_client_binary(socket: &mut TestWebSocket, value: &[u8]) { + socket + .send(TungsteniteMessage::Binary(value.to_vec().into())) + .await + .expect("send client binary"); +} + +async fn read_relay_value(socket: &mut TestWebSocket) -> Value { + let message = timeout(Duration::from_secs(1), socket.next()) + .await + .expect("relay message timeout") + .expect("relay message") + .expect("relay message result"); + let TungsteniteMessage::Text(text) = message else { + panic!("expected relay text message, got {message:?}"); + }; + serde_json::from_str(text.as_str()).expect("relay json") +} + +async fn read_auth_challenge(socket: &mut TestWebSocket) -> String { + let auth = read_relay_value(socket).await; + assert_eq!(auth[0], "AUTH"); + auth[1].as_str().expect("auth challenge").to_owned() +} + +async fn read_websocket_close(socket: &mut TestWebSocket) { + let next = timeout(Duration::from_secs(1), socket.next()) + .await + .expect("websocket close"); + match next { + Some(Ok(TungsteniteMessage::Close(_))) | None => {} + other => panic!("expected websocket close, got {other:?}"), + } +} + +async fn expect_no_relay_message(socket: &mut TestWebSocket) { + assert!( + timeout(Duration::from_millis(75), socket.next()) + .await + .is_err() + ); +} + +fn assert_notice_prefix(value: Value, prefix: &str) { + assert_eq!(value[0], "NOTICE"); + assert!(value[1].as_str().expect("notice").starts_with(prefix)); +} + +fn assert_ok(value: Value, event: &Event, accepted: bool, message: &str) { + assert_eq!(value, json!(["OK", event.id().as_str(), accepted, message])); +} + +fn assert_live_event(value: Value, subscription_id: &str, event: &Event) { + assert_eq!(value[0], "EVENT"); + assert_eq!(value[1], subscription_id); + assert_eq!(value[2]["id"], event.id().as_str()); +} + +fn current_unix_timestamp() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time") + .as_secs() +}