phase2_acceptance_targets.rs (96554B)
1 #![forbid(unsafe_code)] 2 3 use futures_util::{SinkExt, StreamExt}; 4 use http::header; 5 use serde_json::{Value, json}; 6 use std::{ 7 io::{Read, Write}, 8 net::{SocketAddr, TcpStream}, 9 path::{Path, PathBuf}, 10 time::{Duration, Instant, SystemTime, UNIX_EPOCH}, 11 }; 12 use tangle_crypto::RelaySigner; 13 use tangle_groups::{ 14 GroupAuthContext, GroupAuthority, GroupErrorKind, GroupEventClass, GroupId, GroupMetadata, 15 GroupMetadataFlags, GroupMetadataText, GroupPolicyConfig, GroupProjection, GroupReadDecision, 16 GroupReadGate, GroupState, GroupWriteDecision, GroupWritePolicy, KIND_GROUP_ADMINS, 17 KIND_GROUP_CREATE_GROUP, KIND_GROUP_EDIT_METADATA, KIND_GROUP_JOIN_REQUEST, 18 KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, 19 MemberState, MemberStatus, ProjectionOrderTuple, StoreOffset, SupportedKinds, 20 parse_group_runtime_config_json, 21 }; 22 use tangle_protocol::{ 23 Event, EventId, Filter, Kind, PublicKeyHex, RelayMessage, SignatureHex, SubscriptionId, Tag, 24 UnixTimestamp, UnsignedEvent, event_to_value, filter_from_value, filter_to_value, 25 }; 26 use tangle_runtime::{ 27 config::{ 28 BaseRelayRuntimeConfig, TangleHostRuntimeConfigSet, parse_base_relay_runtime_config_json, 29 parse_tangle_host_runtime_config_json, parse_tenant_runtime_config_json, 30 }, 31 errors::BaseRelayError, 32 host::TangleHostRuntime, 33 nip11::BaseRelayInfoConfig, 34 relay::{auth::BaseAuthState, core::BaseRelay}, 35 runtime::RelayRuntime, 36 server::serve_listener_until_shutdown, 37 }; 38 use tangle_store_pocket::{ 39 PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketStoreConfig, PocketStoreHandle, 40 PocketTime, TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, 41 TANGLE_GROUP_PROJECTION_TABLE, parse_pocket_event_json, parse_pocket_filter_json, 42 }; 43 use tangle_test_support::{FixtureKey, TANGLE_V2_RELAY_SECRET_HEX, TANGLE_V2_RELAY_URL}; 44 use tokio::{net::TcpListener, time::timeout}; 45 use tokio_tungstenite::tungstenite::{Message as TungsteniteMessage, client::IntoClientRequest}; 46 47 trait BaseRelayEventTestExt { 48 fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError>; 49 50 fn handle_event_with_auth( 51 &self, 52 event: Event, 53 auth: &BaseAuthState, 54 ) -> Result<RelayMessage, BaseRelayError>; 55 } 56 57 impl BaseRelayEventTestExt for BaseRelay { 58 fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError> { 59 let raw = serde_json::to_vec(&event_to_value(&event)).expect("event JSON"); 60 let pocket = parse_pocket_event_json(&raw).expect("pocket event"); 61 self.handle_pocket_event(&pocket) 62 } 63 64 fn handle_event_with_auth( 65 &self, 66 event: Event, 67 auth: &BaseAuthState, 68 ) -> Result<RelayMessage, BaseRelayError> { 69 let raw = serde_json::to_vec(&event_to_value(&event)).expect("event JSON"); 70 let pocket = parse_pocket_event_json(&raw).expect("pocket event"); 71 self.handle_pocket_event_with_auth(&pocket, auth) 72 } 73 } 74 75 trait BaseRelayCountTestExt { 76 fn handle_count_protocol( 77 &self, 78 subscription_id: SubscriptionId, 79 filters: Vec<Filter>, 80 ) -> Result<RelayMessage, BaseRelayError>; 81 } 82 83 impl BaseRelayCountTestExt for BaseRelay { 84 fn handle_count_protocol( 85 &self, 86 subscription_id: SubscriptionId, 87 filters: Vec<Filter>, 88 ) -> Result<RelayMessage, BaseRelayError> { 89 BaseRelay::handle_count(self, subscription_id, pocket_filters(filters)) 90 } 91 } 92 93 fn pocket_filters(filters: Vec<Filter>) -> Vec<tangle_store_pocket::PocketOwnedFilter> { 94 filters 95 .iter() 96 .map(|filter| { 97 let raw = serde_json::to_vec(&filter_to_value(filter)).expect("filter JSON"); 98 parse_pocket_filter_json(&raw).expect("pocket filter") 99 }) 100 .collect() 101 } 102 103 fn authenticate_pocket_event_for_test( 104 auth: &mut BaseAuthState, 105 event: &Event, 106 now: UnixTimestamp, 107 ) -> Result<(), BaseRelayError> { 108 let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); 109 let pocket = parse_pocket_event_json(&raw).expect("pocket event"); 110 auth.authenticate_pocket(&pocket, now).map(|_| ()) 111 } 112 113 fn pocket_event_for_test(event: &Event) -> PocketOwnedEvent { 114 let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON"); 115 parse_pocket_event_json(&raw).expect("pocket event") 116 } 117 118 fn pocket_protocol_event( 119 key: FixtureKey, 120 created_at: u64, 121 kind: u64, 122 tags: Vec<Tag>, 123 content: &str, 124 ) -> Event { 125 let tags = pocket_tags_from_protocol(&tags); 126 let pocket = signed_pocket_event( 127 fixture_secret_byte(key), 128 created_at, 129 u16::try_from(kind).expect("pocket kind"), 130 &tags, 131 content.as_bytes(), 132 ); 133 pocket_event_to_protocol(&pocket) 134 } 135 136 fn pocket_protocol_auth_event(key: FixtureKey, challenge: &str, created_at: u64) -> Event { 137 pocket_protocol_event( 138 key, 139 created_at, 140 22_242, 141 vec![ 142 Tag::from_parts("relay", &[TANGLE_V2_RELAY_URL]).expect("relay"), 143 Tag::from_parts("challenge", &[challenge]).expect("challenge"), 144 ], 145 "", 146 ) 147 } 148 149 fn pocket_protocol_group_create_event( 150 key: FixtureKey, 151 group_id: &str, 152 created_at: u64, 153 flags: &[&str], 154 ) -> Event { 155 let mut tags = vec![ 156 Tag::from_parts("h", &[group_id]).expect("h"), 157 Tag::from_parts("name", &[group_id]).expect("name"), 158 ]; 159 for flag in flags { 160 tags.push(Tag::from_parts(flag, &[]).expect("flag")); 161 } 162 pocket_protocol_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "") 163 } 164 165 fn pocket_protocol_put_user_event( 166 key: FixtureKey, 167 group_id: &str, 168 target: FixtureKey, 169 created_at: u64, 170 ) -> Event { 171 let target_pubkey = target.public_key(); 172 pocket_protocol_event( 173 key, 174 created_at, 175 KIND_GROUP_PUT_USER.into(), 176 vec![ 177 Tag::from_parts("h", &[group_id]).expect("h"), 178 Tag::from_parts("p", &[target_pubkey.as_str()]).expect("p"), 179 ], 180 "", 181 ) 182 } 183 184 fn pocket_protocol_group_event( 185 key: FixtureKey, 186 group_id: &str, 187 created_at: u64, 188 kind: u64, 189 content: &str, 190 ) -> Event { 191 pocket_protocol_event( 192 key, 193 created_at, 194 kind, 195 vec![Tag::from_parts("h", &[group_id]).expect("h")], 196 content, 197 ) 198 } 199 200 fn tangle_v2_event( 201 key: FixtureKey, 202 created_at: u64, 203 kind: u64, 204 tags: Vec<Tag>, 205 content: &str, 206 ) -> Result<Event, String> { 207 Ok(pocket_protocol_event(key, created_at, kind, tags, content)) 208 } 209 210 fn tangle_v2_auth_event( 211 key: FixtureKey, 212 challenge: &str, 213 created_at: u64, 214 ) -> Result<Event, String> { 215 Ok(pocket_protocol_auth_event(key, challenge, created_at)) 216 } 217 218 fn tangle_v2_group_create_event( 219 key: FixtureKey, 220 group_id: &str, 221 created_at: u64, 222 flags: &[&str], 223 ) -> Result<Event, String> { 224 Ok(pocket_protocol_group_create_event( 225 key, group_id, created_at, flags, 226 )) 227 } 228 229 fn tangle_v2_group_metadata_event( 230 key: FixtureKey, 231 group_id: &str, 232 name: &str, 233 created_at: u64, 234 flags: &[&str], 235 ) -> Result<Event, String> { 236 let mut tags = vec![ 237 Tag::from_parts("h", &[group_id])?, 238 Tag::from_parts("name", &[name])?, 239 ]; 240 for flag in flags { 241 tags.push(Tag::from_parts(flag, &[])?); 242 } 243 tangle_v2_event(key, created_at, KIND_GROUP_EDIT_METADATA.into(), tags, "") 244 } 245 246 fn tangle_v2_join_event(key: FixtureKey, group_id: &str, created_at: u64) -> Result<Event, String> { 247 tangle_v2_group_event( 248 key, 249 group_id, 250 created_at, 251 KIND_GROUP_JOIN_REQUEST.into(), 252 "", 253 ) 254 } 255 256 fn tangle_v2_put_user_event( 257 key: FixtureKey, 258 group_id: &str, 259 target: FixtureKey, 260 created_at: u64, 261 ) -> Result<Event, String> { 262 let target_pubkey = target.public_key(); 263 tangle_v2_event( 264 key, 265 created_at, 266 KIND_GROUP_PUT_USER.into(), 267 vec![ 268 Tag::from_parts("h", &[group_id])?, 269 Tag::from_parts("p", &[target_pubkey.as_str()])?, 270 ], 271 "", 272 ) 273 } 274 275 fn tangle_v2_group_event( 276 key: FixtureKey, 277 group_id: &str, 278 created_at: u64, 279 kind: u64, 280 content: &str, 281 ) -> Result<Event, String> { 282 Ok(pocket_protocol_group_event( 283 key, group_id, created_at, kind, content, 284 )) 285 } 286 287 fn signed_pocket_event( 288 secret_byte: u8, 289 created_at: u64, 290 kind: u16, 291 tags: &PocketOwnedTags, 292 content: &[u8], 293 ) -> PocketOwnedEvent { 294 let secret = format!("{secret_byte:02x}").repeat(32); 295 RelaySigner::from_secret_hex(&secret) 296 .expect("signer") 297 .sign_pocket_event( 298 PocketKind::from_u16(kind), 299 tags, 300 PocketTime::from_u64(created_at), 301 content, 302 ) 303 .expect("pocket event") 304 } 305 306 fn pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags { 307 let parts = tags 308 .iter() 309 .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>()) 310 .collect::<Vec<_>>(); 311 PocketOwnedTags::new(&parts).expect("pocket tags") 312 } 313 314 fn pocket_event_to_protocol(event: &PocketOwnedEvent) -> Event { 315 let tags = event 316 .tags() 317 .expect("tags") 318 .iter() 319 .map(|tag| { 320 Tag::new( 321 tag.map(|value| std::str::from_utf8(value).expect("utf8").to_owned()) 322 .collect::<Vec<_>>(), 323 ) 324 .expect("tag") 325 }) 326 .collect::<Vec<_>>(); 327 Event::new( 328 EventId::new(&event.id().as_hex_string()).expect("event id"), 329 UnsignedEvent::new( 330 PublicKeyHex::new(&event.pubkey().as_hex_string()).expect("pubkey"), 331 UnixTimestamp::new(event.created_at().as_u64()), 332 Kind::new(u64::from(event.kind().as_u16())).expect("kind"), 333 tags, 334 std::str::from_utf8(event.content()).expect("content"), 335 ), 336 SignatureHex::new(&event.sig().to_string()).expect("sig"), 337 ) 338 } 339 340 fn fixture_secret_byte(key: FixtureKey) -> u8 { 341 match key { 342 FixtureKey::Relay => 9, 343 FixtureKey::Owner => 10, 344 FixtureKey::Admin => 11, 345 FixtureKey::Member => 12, 346 FixtureKey::Outsider => 13, 347 } 348 } 349 350 #[tokio::test] 351 async fn tangle_run_serves_until_shutdown() { 352 let root = temp_root("acceptance-server"); 353 let _ = std::fs::remove_dir_all(&root); 354 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 355 let address = listener.local_addr().expect("address"); 356 let runtime = host_runtime(&root, address); 357 let shutdown = runtime.shutdown_signal().clone(); 358 let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); 359 360 let ready = wait_for_http_ok(address, "/.well-known/tangle/ready", None).await; 361 let metrics = wait_for_http_ok(address, "/.well-known/tangle/metrics", None).await; 362 let tenants = wait_for_http_ok(address, "/.well-known/tangle/tenants", None).await; 363 let nip11 = wait_for_http_ok(address, "/", Some("application/nostr+json")).await; 364 365 assert!(ready.contains(r#""status":"ready""#)); 366 assert!(ready.contains(r#""active_tenants":"ready""#)); 367 assert!(metrics.contains(r#""tangle_readiness_ready":true"#)); 368 assert!(metrics.contains(r#""tangle_host_active_tenants":1"#)); 369 assert!(metrics.contains(r#""tangle_ws_connections_current":0"#)); 370 assert!(metrics.contains(r#""tangle_stored_event_offsets_total":0"#)); 371 assert!(tenants.contains(r#""tenant_id":"acceptance-relay""#)); 372 assert!(nip11.contains(r#""name":"tangle""#)); 373 assert!( 374 nip11 375 .to_ascii_lowercase() 376 .contains("content-type: application/nostr+json") 377 ); 378 assert!(!task.is_finished()); 379 380 shutdown.request_shutdown(); 381 382 let report = timeout(Duration::from_secs(2), task) 383 .await 384 .expect("shutdown timeout") 385 .expect("task") 386 .expect("serve"); 387 assert_eq!(report.listen_addr(), address); 388 assert_eq!(report.closed_subscriptions(), 0); 389 390 let _ = std::fs::remove_dir_all(root); 391 } 392 393 #[tokio::test] 394 async fn websocket_clients_use_nip01_nip42_and_nip45_flows() { 395 let root = temp_root("acceptance-websocket"); 396 let _ = std::fs::remove_dir_all(&root); 397 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 398 let address = listener.local_addr().expect("address"); 399 let runtime = host_runtime(&root, address); 400 let shutdown = runtime.shutdown_signal().clone(); 401 let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); 402 let mut first = connect_nostr_socket(address).await; 403 let mut second = connect_nostr_socket(address).await; 404 let first_challenge = read_auth_challenge(&mut first).await; 405 let second_challenge = read_auth_challenge(&mut second).await; 406 let first_event = tangle_v2_event( 407 FixtureKey::Member, 408 1_714_124_433, 409 1, 410 Vec::new(), 411 "websocket-one", 412 ) 413 .expect("first event"); 414 let second_event = tangle_v2_event( 415 FixtureKey::Member, 416 1_714_124_434, 417 1, 418 Vec::new(), 419 "websocket-two", 420 ) 421 .expect("second event"); 422 let auth_created_at = current_unix_timestamp(); 423 let owner_auth = tangle_v2_auth_event(FixtureKey::Owner, &first_challenge, auth_created_at) 424 .expect("owner auth"); 425 let admin_auth = tangle_v2_auth_event( 426 FixtureKey::Admin, 427 &first_challenge, 428 auth_created_at.saturating_add(1), 429 ) 430 .expect("admin auth"); 431 let wrong_challenge_auth = tangle_v2_auth_event( 432 FixtureKey::Member, 433 &second_challenge, 434 auth_created_at.saturating_add(2), 435 ) 436 .expect("wrong challenge auth"); 437 438 send_client_text(&mut first, "{").await; 439 assert_notice_prefix( 440 read_relay_value(&mut first).await, 441 "invalid: client message JSON is invalid:", 442 ); 443 444 send_client_binary(&mut first, &[1, 2, 3]).await; 445 assert_eq!( 446 read_relay_value(&mut first).await, 447 json!(["NOTICE", "invalid: client message must be a text frame"]) 448 ); 449 450 send_client_value(&mut first, json!(["AUTH", event_to_value(&owner_auth)])).await; 451 assert_ok(read_relay_value(&mut first).await, &owner_auth, true, ""); 452 send_client_value(&mut first, json!(["AUTH", event_to_value(&admin_auth)])).await; 453 assert_ok(read_relay_value(&mut first).await, &admin_auth, true, ""); 454 send_client_value( 455 &mut first, 456 json!(["AUTH", event_to_value(&wrong_challenge_auth)]), 457 ) 458 .await; 459 assert_ok( 460 read_relay_value(&mut first).await, 461 &wrong_challenge_auth, 462 false, 463 "auth-required: auth challenge does not match", 464 ); 465 466 let group_create = tangle_v2_group_create_event( 467 FixtureKey::Owner, 468 "WebsocketFarm", 469 auth_created_at.saturating_add(3), 470 &[], 471 ) 472 .expect("group create"); 473 send_client_value(&mut second, json!(["EVENT", event_to_value(&group_create)])).await; 474 assert_ok( 475 read_relay_value(&mut second).await, 476 &group_create, 477 false, 478 "auth-required: group event author must authenticate with AUTH", 479 ); 480 send_client_value(&mut first, json!(["EVENT", event_to_value(&group_create)])).await; 481 assert_ok(read_relay_value(&mut first).await, &group_create, true, ""); 482 483 send_client_value(&mut first, json!(["EVENT", event_to_value(&first_event)])).await; 484 assert_ok(read_relay_value(&mut first).await, &first_event, true, ""); 485 send_client_value(&mut first, json!(["EVENT", event_to_value(&first_event)])).await; 486 assert_ok( 487 read_relay_value(&mut first).await, 488 &first_event, 489 true, 490 "duplicate: already have this event", 491 ); 492 493 send_client_value( 494 &mut first, 495 json!(["COUNT", "count-websocket", {"kinds":[1], "since": 1_714_124_433, "until": 1_714_124_433}]), 496 ) 497 .await; 498 assert_eq!( 499 read_relay_value(&mut first).await, 500 json!(["COUNT", "count-websocket", {"count": 1}]) 501 ); 502 503 send_client_value(&mut first, json!(["REQ", "shared-sub", {"kinds":[1]}])).await; 504 assert_live_event( 505 read_relay_value(&mut first).await, 506 "shared-sub", 507 &first_event, 508 ); 509 assert_eq!( 510 read_relay_value(&mut first).await, 511 json!(["EOSE", "shared-sub"]) 512 ); 513 514 send_client_value(&mut second, json!(["REQ", "shared-sub", {"kinds":[1]}])).await; 515 assert_live_event( 516 read_relay_value(&mut second).await, 517 "shared-sub", 518 &first_event, 519 ); 520 assert_eq!( 521 read_relay_value(&mut second).await, 522 json!(["EOSE", "shared-sub"]) 523 ); 524 525 send_client_value(&mut first, json!(["CLOSE", "shared-sub"])).await; 526 expect_no_relay_message(&mut first).await; 527 528 send_client_value(&mut first, json!(["EVENT", event_to_value(&second_event)])).await; 529 assert_ok(read_relay_value(&mut first).await, &second_event, true, ""); 530 assert_live_event( 531 read_relay_value(&mut second).await, 532 "shared-sub", 533 &second_event, 534 ); 535 expect_no_relay_message(&mut first).await; 536 537 send_client_value(&mut first, json!(["EVENT", event_to_value(&second_event)])).await; 538 assert_ok( 539 read_relay_value(&mut first).await, 540 &second_event, 541 true, 542 "duplicate: already have this event", 543 ); 544 expect_no_relay_message(&mut second).await; 545 546 send_client_value(&mut second, json!(["CLOSE", "shared-sub"])).await; 547 expect_no_relay_message(&mut second).await; 548 549 shutdown.request_shutdown(); 550 read_websocket_close(&mut first).await; 551 read_websocket_close(&mut second).await; 552 let report = timeout(Duration::from_secs(2), task) 553 .await 554 .expect("shutdown timeout") 555 .expect("task") 556 .expect("serve"); 557 assert_eq!(report.listen_addr(), address); 558 559 let _ = std::fs::remove_dir_all(root); 560 } 561 562 #[tokio::test] 563 async fn websocket_public_relay_covers_query_count_ephemeral_and_rejection_flows() { 564 let root = temp_root("acceptance-public-websocket"); 565 let _ = std::fs::remove_dir_all(&root); 566 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 567 let address = listener.local_addr().expect("address"); 568 let runtime = host_runtime(&root, address); 569 let shutdown = runtime.shutdown_signal().clone(); 570 let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); 571 let mut publisher = connect_nostr_socket(address).await; 572 let mut subscriber = connect_nostr_socket(address).await; 573 let _ = read_auth_challenge(&mut publisher).await; 574 let _ = read_auth_challenge(&mut subscriber).await; 575 let first = pocket_protocol_event( 576 FixtureKey::Member, 577 1_714_124_433, 578 1, 579 Vec::new(), 580 "public one", 581 ); 582 let second = pocket_protocol_event( 583 FixtureKey::Admin, 584 1_714_124_435, 585 1, 586 Vec::new(), 587 "public two", 588 ); 589 let other_kind = pocket_protocol_event( 590 FixtureKey::Owner, 591 1_714_124_436, 592 2, 593 Vec::new(), 594 "public other", 595 ); 596 let ephemeral = pocket_protocol_event( 597 FixtureKey::Member, 598 1_714_124_437, 599 20_001, 600 Vec::new(), 601 "public transient", 602 ); 603 let signature_source = pocket_protocol_event( 604 FixtureKey::Owner, 605 1_714_124_438, 606 1, 607 Vec::new(), 608 "signature source", 609 ); 610 let invalid = Event::new( 611 first.id().clone(), 612 first.unsigned().clone(), 613 signature_source.sig().clone(), 614 ); 615 616 send_client_value( 617 &mut subscriber, 618 json!(["REQ", "live-public", {"kinds":[1, 20001]}]), 619 ) 620 .await; 621 assert_eq!( 622 read_relay_value(&mut subscriber).await, 623 json!(["EOSE", "live-public"]) 624 ); 625 626 send_client_value(&mut publisher, json!(["EVENT", event_to_value(&invalid)])).await; 627 assert_ok_message_prefix( 628 read_relay_value(&mut publisher).await, 629 &invalid, 630 false, 631 "invalid:", 632 ); 633 expect_no_relay_message(&mut subscriber).await; 634 635 send_client_value(&mut publisher, json!(["EVENT", event_to_value(&first)])).await; 636 assert_ok(read_relay_value(&mut publisher).await, &first, true, ""); 637 assert_live_event( 638 read_relay_value(&mut subscriber).await, 639 "live-public", 640 &first, 641 ); 642 643 send_client_value(&mut publisher, json!(["EVENT", event_to_value(&second)])).await; 644 assert_ok(read_relay_value(&mut publisher).await, &second, true, ""); 645 assert_live_event( 646 read_relay_value(&mut subscriber).await, 647 "live-public", 648 &second, 649 ); 650 651 send_client_value( 652 &mut publisher, 653 json!(["EVENT", event_to_value(&other_kind)]), 654 ) 655 .await; 656 assert_ok( 657 read_relay_value(&mut publisher).await, 658 &other_kind, 659 true, 660 "", 661 ); 662 expect_no_relay_message(&mut subscriber).await; 663 664 send_client_value(&mut publisher, json!(["EVENT", event_to_value(&ephemeral)])).await; 665 assert_ok(read_relay_value(&mut publisher).await, &ephemeral, true, ""); 666 expect_no_relay_message(&mut subscriber).await; 667 668 send_client_value( 669 &mut publisher, 670 json!(["COUNT", "count-kind-one", {"kinds":[1], "since": 1_714_124_433, "until": 1_714_124_435}]), 671 ) 672 .await; 673 assert_eq!( 674 read_relay_value(&mut publisher).await, 675 json!(["COUNT", "count-kind-one", {"count": 2}]) 676 ); 677 678 send_client_value( 679 &mut publisher, 680 json!(["COUNT", "count-ephemeral", {"kinds":[20001], "since": 1_714_124_437, "until": 1_714_124_437}]), 681 ) 682 .await; 683 assert_eq!( 684 read_relay_value(&mut publisher).await, 685 json!(["COUNT", "count-ephemeral", {"count": 0}]) 686 ); 687 688 send_client_value( 689 &mut publisher, 690 json!([ 691 "REQ", 692 "query-public", 693 {"kinds":[1], "limit":1}, 694 {"ids":[first.id().as_str(), other_kind.id().as_str()]} 695 ]), 696 ) 697 .await; 698 assert_live_event( 699 read_relay_value(&mut publisher).await, 700 "query-public", 701 &other_kind, 702 ); 703 assert_live_event( 704 read_relay_value(&mut publisher).await, 705 "query-public", 706 &second, 707 ); 708 assert_live_event( 709 read_relay_value(&mut publisher).await, 710 "query-public", 711 &first, 712 ); 713 assert_eq!( 714 read_relay_value(&mut publisher).await, 715 json!(["EOSE", "query-public"]) 716 ); 717 718 send_client_value(&mut subscriber, json!(["CLOSE", "live-public"])).await; 719 expect_no_relay_message(&mut subscriber).await; 720 send_client_value(&mut publisher, json!(["CLOSE", "query-public"])).await; 721 expect_no_relay_message(&mut publisher).await; 722 723 let after_close = pocket_protocol_event( 724 FixtureKey::Admin, 725 1_714_124_439, 726 1, 727 Vec::new(), 728 "after close", 729 ); 730 send_client_value( 731 &mut publisher, 732 json!(["EVENT", event_to_value(&after_close)]), 733 ) 734 .await; 735 assert_ok( 736 read_relay_value(&mut publisher).await, 737 &after_close, 738 true, 739 "", 740 ); 741 expect_no_relay_message(&mut subscriber).await; 742 743 shutdown.request_shutdown(); 744 read_websocket_close(&mut publisher).await; 745 read_websocket_close(&mut subscriber).await; 746 let report = timeout(Duration::from_secs(2), task) 747 .await 748 .expect("shutdown timeout") 749 .expect("task") 750 .expect("serve"); 751 assert_eq!(report.listen_addr(), address); 752 753 let _ = std::fs::remove_dir_all(root); 754 } 755 756 #[tokio::test] 757 async fn websocket_healthy_subscriber_receives_more_than_outbound_capacity() { 758 let root = temp_root("acceptance-healthy-live-volume"); 759 let _ = std::fs::remove_dir_all(&root); 760 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 761 let address = listener.local_addr().expect("address"); 762 let runtime = host_runtime(&root, address); 763 let shutdown = runtime.shutdown_signal().clone(); 764 let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); 765 let mut publisher = connect_nostr_socket(address).await; 766 let mut subscriber = connect_nostr_socket(address).await; 767 let _ = read_auth_challenge(&mut publisher).await; 768 let _ = read_auth_challenge(&mut subscriber).await; 769 send_client_value( 770 &mut subscriber, 771 json!(["REQ", "healthy-live", {"kinds":[1]}]), 772 ) 773 .await; 774 assert_eq!( 775 read_relay_value(&mut subscriber).await, 776 json!(["EOSE", "healthy-live"]) 777 ); 778 let delivered_count = 10_u64; 779 for index in 0..delivered_count { 780 let event = tangle_v2_event( 781 FixtureKey::Member, 782 1_714_124_500 + index, 783 1, 784 Vec::new(), 785 &format!("healthy live {index}"), 786 ) 787 .expect("event"); 788 send_client_value(&mut publisher, json!(["EVENT", event_to_value(&event)])).await; 789 assert_ok(read_relay_value(&mut publisher).await, &event, true, ""); 790 assert_live_event( 791 read_relay_value(&mut subscriber).await, 792 "healthy-live", 793 &event, 794 ); 795 } 796 send_client_value( 797 &mut subscriber, 798 json!(["COUNT", "healthy-count", {"kinds":[1], "since": 1_714_124_500, "until": 1_714_124_509}]), 799 ) 800 .await; 801 assert_eq!( 802 read_relay_value(&mut subscriber).await, 803 json!(["COUNT", "healthy-count", {"count": delivered_count}]) 804 ); 805 806 shutdown.request_shutdown(); 807 read_websocket_close(&mut publisher).await; 808 read_websocket_close(&mut subscriber).await; 809 let report = timeout(Duration::from_secs(2), task) 810 .await 811 .expect("shutdown timeout") 812 .expect("task") 813 .expect("serve"); 814 assert_eq!(report.listen_addr(), address); 815 816 let _ = std::fs::remove_dir_all(root); 817 } 818 819 #[tokio::test] 820 async fn websocket_nip29_group_lifecycle_state_and_live_paths_are_integrated() { 821 let root = temp_root("acceptance-nip29-websocket"); 822 let _ = std::fs::remove_dir_all(&root); 823 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 824 let address = listener.local_addr().expect("address"); 825 let runtime = host_runtime(&root, address); 826 let shutdown = runtime.shutdown_signal().clone(); 827 let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); 828 let mut owner = connect_nostr_socket(address).await; 829 let mut member = connect_nostr_socket(address).await; 830 let mut outsider = connect_nostr_socket(address).await; 831 let mut observer = connect_nostr_socket(address).await; 832 let owner_challenge = read_auth_challenge(&mut owner).await; 833 let member_challenge = read_auth_challenge(&mut member).await; 834 let outsider_challenge = read_auth_challenge(&mut outsider).await; 835 let _ = read_auth_challenge(&mut observer).await; 836 let auth_created_at = current_unix_timestamp(); 837 838 authenticate_client( 839 &mut owner, 840 FixtureKey::Owner, 841 &owner_challenge, 842 auth_created_at, 843 ) 844 .await; 845 authenticate_client( 846 &mut member, 847 FixtureKey::Member, 848 &member_challenge, 849 auth_created_at.saturating_add(1), 850 ) 851 .await; 852 authenticate_client( 853 &mut outsider, 854 FixtureKey::Outsider, 855 &outsider_challenge, 856 auth_created_at.saturating_add(2), 857 ) 858 .await; 859 860 let create = tangle_v2_group_create_event(FixtureKey::Owner, "SocketFarm", 1_714_124_440, &[]) 861 .expect("create"); 862 send_client_value(&mut owner, json!(["EVENT", event_to_value(&create)])).await; 863 assert_ok(read_relay_value(&mut owner).await, &create, true, ""); 864 865 let denied_join = 866 tangle_v2_join_event(FixtureKey::Outsider, "SocketFarm", 1_714_124_441).expect("join"); 867 send_client_value( 868 &mut outsider, 869 json!(["EVENT", event_to_value(&denied_join)]), 870 ) 871 .await; 872 assert_ok( 873 read_relay_value(&mut outsider).await, 874 &denied_join, 875 false, 876 "restricted: group is unavailable", 877 ); 878 879 let metadata = tangle_v2_group_metadata_event( 880 FixtureKey::Owner, 881 "SocketFarm", 882 "Socket Market", 883 1_714_124_442, 884 &[], 885 ) 886 .expect("metadata"); 887 send_client_value(&mut owner, json!(["EVENT", event_to_value(&metadata)])).await; 888 assert_ok(read_relay_value(&mut owner).await, &metadata, true, ""); 889 890 let put_member = tangle_v2_put_user_event( 891 FixtureKey::Owner, 892 "SocketFarm", 893 FixtureKey::Member, 894 1_714_124_443, 895 ) 896 .expect("put member"); 897 send_client_value(&mut owner, json!(["EVENT", event_to_value(&put_member)])).await; 898 assert_ok(read_relay_value(&mut owner).await, &put_member, true, ""); 899 900 for (subscription_id, kind) in [ 901 ("metadata-count", KIND_GROUP_METADATA), 902 ("admins-count", KIND_GROUP_ADMINS), 903 ("members-count", KIND_GROUP_MEMBERS), 904 ] { 905 send_client_value( 906 &mut observer, 907 json!(["COUNT", subscription_id, {"kinds":[kind], "#d":["SocketFarm"]}]), 908 ) 909 .await; 910 assert_eq!( 911 read_relay_value(&mut observer).await, 912 json!(["COUNT", subscription_id, {"count": 1}]) 913 ); 914 } 915 916 for (subscription_id, kind) in [ 917 ("metadata-state", KIND_GROUP_METADATA), 918 ("admins-state", KIND_GROUP_ADMINS), 919 ("members-state", KIND_GROUP_MEMBERS), 920 ] { 921 send_client_value( 922 &mut observer, 923 json!(["REQ", subscription_id, {"kinds":[kind], "#d":["SocketFarm"]}]), 924 ) 925 .await; 926 assert_relay_event_kind_tag( 927 read_relay_value(&mut observer).await, 928 subscription_id, 929 kind, 930 "d", 931 "SocketFarm", 932 ); 933 assert_eq!( 934 read_relay_value(&mut observer).await, 935 json!(["EOSE", subscription_id]) 936 ); 937 send_client_value(&mut observer, json!(["CLOSE", subscription_id])).await; 938 expect_no_relay_message(&mut observer).await; 939 } 940 941 send_client_value( 942 &mut observer, 943 json!(["REQ", "group-live", {"kinds":[1], "#h":["SocketFarm"]}]), 944 ) 945 .await; 946 assert_eq!( 947 read_relay_value(&mut observer).await, 948 json!(["EOSE", "group-live"]) 949 ); 950 951 let group_note = tangle_v2_group_event( 952 FixtureKey::Member, 953 "SocketFarm", 954 1_714_124_444, 955 1, 956 "harvest", 957 ) 958 .expect("group note"); 959 send_client_value(&mut member, json!(["EVENT", event_to_value(&group_note)])).await; 960 assert_ok(read_relay_value(&mut member).await, &group_note, true, ""); 961 assert_live_event( 962 read_relay_value(&mut observer).await, 963 "group-live", 964 &group_note, 965 ); 966 967 send_client_value( 968 &mut observer, 969 json!(["COUNT", "group-note-count", {"kinds":[1], "#h":["SocketFarm"]}]), 970 ) 971 .await; 972 assert_eq!( 973 read_relay_value(&mut observer).await, 974 json!(["COUNT", "group-note-count", {"count": 1}]) 975 ); 976 977 send_client_value( 978 &mut observer, 979 json!(["REQ", "group-note-query", {"kinds":[1], "#h":["SocketFarm"]}]), 980 ) 981 .await; 982 assert_live_event( 983 read_relay_value(&mut observer).await, 984 "group-note-query", 985 &group_note, 986 ); 987 assert_eq!( 988 read_relay_value(&mut observer).await, 989 json!(["EOSE", "group-note-query"]) 990 ); 991 992 shutdown.request_shutdown(); 993 read_websocket_close(&mut owner).await; 994 read_websocket_close(&mut member).await; 995 read_websocket_close(&mut outsider).await; 996 read_websocket_close(&mut observer).await; 997 let report = timeout(Duration::from_secs(2), task) 998 .await 999 .expect("shutdown timeout") 1000 .expect("task") 1001 .expect("serve"); 1002 assert_eq!(report.listen_addr(), address); 1003 1004 let _ = std::fs::remove_dir_all(root); 1005 } 1006 1007 #[tokio::test] 1008 async fn websocket_private_and_hidden_groups_do_not_leak_through_query_count_or_live() { 1009 let root = temp_root("acceptance-privacy-websocket"); 1010 let _ = std::fs::remove_dir_all(&root); 1011 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 1012 let address = listener.local_addr().expect("address"); 1013 let runtime = host_runtime(&root, address); 1014 let shutdown = runtime.shutdown_signal().clone(); 1015 let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); 1016 let mut owner_writer = connect_nostr_socket(address).await; 1017 let mut owner_reader = connect_nostr_socket(address).await; 1018 let mut member_writer = connect_nostr_socket(address).await; 1019 let mut member_reader = connect_nostr_socket(address).await; 1020 let mut observer = connect_nostr_socket(address).await; 1021 let owner_writer_challenge = read_auth_challenge(&mut owner_writer).await; 1022 let owner_reader_challenge = read_auth_challenge(&mut owner_reader).await; 1023 let member_writer_challenge = read_auth_challenge(&mut member_writer).await; 1024 let member_reader_challenge = read_auth_challenge(&mut member_reader).await; 1025 let _ = read_auth_challenge(&mut observer).await; 1026 let auth_created_at = current_unix_timestamp(); 1027 1028 authenticate_client( 1029 &mut owner_writer, 1030 FixtureKey::Owner, 1031 &owner_writer_challenge, 1032 auth_created_at, 1033 ) 1034 .await; 1035 authenticate_client( 1036 &mut owner_reader, 1037 FixtureKey::Owner, 1038 &owner_reader_challenge, 1039 auth_created_at.saturating_add(1), 1040 ) 1041 .await; 1042 authenticate_client( 1043 &mut member_writer, 1044 FixtureKey::Member, 1045 &member_writer_challenge, 1046 auth_created_at.saturating_add(2), 1047 ) 1048 .await; 1049 authenticate_client( 1050 &mut member_reader, 1051 FixtureKey::Member, 1052 &member_reader_challenge, 1053 auth_created_at.saturating_add(3), 1054 ) 1055 .await; 1056 1057 let private_create = pocket_protocol_group_create_event( 1058 FixtureKey::Owner, 1059 "PrivateSocket", 1060 1_714_124_450, 1061 &["private"], 1062 ); 1063 send_client_value( 1064 &mut owner_writer, 1065 json!(["EVENT", event_to_value(&private_create)]), 1066 ) 1067 .await; 1068 assert_ok( 1069 read_relay_value(&mut owner_writer).await, 1070 &private_create, 1071 true, 1072 "", 1073 ); 1074 1075 let private_put = pocket_protocol_put_user_event( 1076 FixtureKey::Owner, 1077 "PrivateSocket", 1078 FixtureKey::Member, 1079 1_714_124_451, 1080 ); 1081 send_client_value( 1082 &mut owner_writer, 1083 json!(["EVENT", event_to_value(&private_put)]), 1084 ) 1085 .await; 1086 assert_ok( 1087 read_relay_value(&mut owner_writer).await, 1088 &private_put, 1089 true, 1090 "", 1091 ); 1092 1093 assert_count_message( 1094 &mut observer, 1095 "private-metadata-public-count", 1096 json!({"kinds":[KIND_GROUP_METADATA], "#d":["PrivateSocket"]}), 1097 1, 1098 ) 1099 .await; 1100 assert_count_message( 1101 &mut observer, 1102 "private-admins-public-count", 1103 json!({"kinds":[KIND_GROUP_ADMINS], "#d":["PrivateSocket"]}), 1104 1, 1105 ) 1106 .await; 1107 assert_count_message( 1108 &mut observer, 1109 "private-members-public-count", 1110 json!({"kinds":[KIND_GROUP_MEMBERS], "#d":["PrivateSocket"]}), 1111 0, 1112 ) 1113 .await; 1114 assert_count_message( 1115 &mut member_reader, 1116 "private-members-member-count", 1117 json!({"kinds":[KIND_GROUP_MEMBERS], "#d":["PrivateSocket"]}), 1118 1, 1119 ) 1120 .await; 1121 assert_req_kind_tag_then_eose( 1122 &mut observer, 1123 "private-metadata-public-query", 1124 json!({"kinds":[KIND_GROUP_METADATA], "#d":["PrivateSocket"]}), 1125 KIND_GROUP_METADATA, 1126 "d", 1127 "PrivateSocket", 1128 ) 1129 .await; 1130 assert_req_kind_tag_then_eose( 1131 &mut observer, 1132 "private-admins-public-query", 1133 json!({"kinds":[KIND_GROUP_ADMINS], "#d":["PrivateSocket"]}), 1134 KIND_GROUP_ADMINS, 1135 "d", 1136 "PrivateSocket", 1137 ) 1138 .await; 1139 assert_redacted_req_closed( 1140 &mut observer, 1141 "private-members-public-query", 1142 json!({"kinds":[KIND_GROUP_MEMBERS], "#d":["PrivateSocket"]}), 1143 ) 1144 .await; 1145 assert_req_kind_tag_then_eose( 1146 &mut member_reader, 1147 "private-members-member-query", 1148 json!({"kinds":[KIND_GROUP_MEMBERS], "#d":["PrivateSocket"]}), 1149 KIND_GROUP_MEMBERS, 1150 "d", 1151 "PrivateSocket", 1152 ) 1153 .await; 1154 1155 send_client_value( 1156 &mut observer, 1157 json!(["REQ", "private-public-live", {"kinds":[1], "#h":["PrivateSocket"]}]), 1158 ) 1159 .await; 1160 assert_eq!( 1161 read_relay_value(&mut observer).await, 1162 json!(["EOSE", "private-public-live"]) 1163 ); 1164 send_client_value( 1165 &mut member_reader, 1166 json!(["REQ", "private-member-live", {"kinds":[1], "#h":["PrivateSocket"]}]), 1167 ) 1168 .await; 1169 assert_eq!( 1170 read_relay_value(&mut member_reader).await, 1171 json!(["EOSE", "private-member-live"]) 1172 ); 1173 1174 let private_note = pocket_protocol_group_event( 1175 FixtureKey::Member, 1176 "PrivateSocket", 1177 1_714_124_452, 1178 1, 1179 "private harvest", 1180 ); 1181 send_client_value( 1182 &mut member_writer, 1183 json!(["EVENT", event_to_value(&private_note)]), 1184 ) 1185 .await; 1186 assert_ok( 1187 read_relay_value(&mut member_writer).await, 1188 &private_note, 1189 true, 1190 "", 1191 ); 1192 assert_live_event( 1193 read_relay_value(&mut member_reader).await, 1194 "private-member-live", 1195 &private_note, 1196 ); 1197 expect_no_relay_message(&mut observer).await; 1198 assert_count_message( 1199 &mut observer, 1200 "private-public-count", 1201 json!({"kinds":[1], "#h":["PrivateSocket"]}), 1202 0, 1203 ) 1204 .await; 1205 assert_count_message( 1206 &mut member_reader, 1207 "private-member-count", 1208 json!({"kinds":[1], "#h":["PrivateSocket"]}), 1209 1, 1210 ) 1211 .await; 1212 assert_count_closed( 1213 &mut observer, 1214 "private-public-broad-count", 1215 json!({"kinds":[1], "#h":["PrivateSocket"], "limit":500}), 1216 "restricted: count filters are too broad or expensive", 1217 ) 1218 .await; 1219 send_client_value( 1220 &mut observer, 1221 json!(["COUNT", "private-approx-count", {"kinds":[1], "#h":["PrivateSocket"], "approximate":true}]), 1222 ) 1223 .await; 1224 assert_eq!( 1225 read_relay_value(&mut observer).await, 1226 json!([ 1227 "NOTICE", 1228 "invalid: filter field `approximate` is unsupported" 1229 ]) 1230 ); 1231 assert_redacted_req_closed( 1232 &mut observer, 1233 "private-public-query", 1234 json!({"kinds":[1], "#h":["PrivateSocket"]}), 1235 ) 1236 .await; 1237 assert_req_event_then_eose( 1238 &mut member_reader, 1239 "private-member-query", 1240 json!({"kinds":[1], "#h":["PrivateSocket"]}), 1241 &private_note, 1242 ) 1243 .await; 1244 1245 let hidden_create = pocket_protocol_group_create_event( 1246 FixtureKey::Owner, 1247 "HiddenSocket", 1248 1_714_124_453, 1249 &["hidden"], 1250 ); 1251 send_client_value( 1252 &mut owner_writer, 1253 json!(["EVENT", event_to_value(&hidden_create)]), 1254 ) 1255 .await; 1256 assert_ok( 1257 read_relay_value(&mut owner_writer).await, 1258 &hidden_create, 1259 true, 1260 "", 1261 ); 1262 1263 let hidden_put = pocket_protocol_put_user_event( 1264 FixtureKey::Owner, 1265 "HiddenSocket", 1266 FixtureKey::Member, 1267 1_714_124_454, 1268 ); 1269 send_client_value( 1270 &mut owner_writer, 1271 json!(["EVENT", event_to_value(&hidden_put)]), 1272 ) 1273 .await; 1274 assert_ok( 1275 read_relay_value(&mut owner_writer).await, 1276 &hidden_put, 1277 true, 1278 "", 1279 ); 1280 1281 for (subscription_id, kind) in [ 1282 ("hidden-metadata-public-count", KIND_GROUP_METADATA), 1283 ("hidden-admins-public-count", KIND_GROUP_ADMINS), 1284 ("hidden-members-public-count", KIND_GROUP_MEMBERS), 1285 ] { 1286 assert_count_message( 1287 &mut observer, 1288 subscription_id, 1289 json!({"kinds":[kind], "#d":["HiddenSocket"]}), 1290 0, 1291 ) 1292 .await; 1293 } 1294 for (subscription_id, kind) in [ 1295 ("hidden-metadata-owner-count", KIND_GROUP_METADATA), 1296 ("hidden-admins-owner-count", KIND_GROUP_ADMINS), 1297 ("hidden-members-owner-count", KIND_GROUP_MEMBERS), 1298 ] { 1299 assert_count_message( 1300 &mut owner_reader, 1301 subscription_id, 1302 json!({"kinds":[kind], "#d":["HiddenSocket"]}), 1303 1, 1304 ) 1305 .await; 1306 } 1307 for (subscription_id, kind) in [ 1308 ("hidden-metadata-public-query", KIND_GROUP_METADATA), 1309 ("hidden-admins-public-query", KIND_GROUP_ADMINS), 1310 ("hidden-members-public-query", KIND_GROUP_MEMBERS), 1311 ] { 1312 assert_redacted_req_closed( 1313 &mut observer, 1314 subscription_id, 1315 json!({"kinds":[kind], "#d":["HiddenSocket"]}), 1316 ) 1317 .await; 1318 } 1319 for (subscription_id, kind) in [ 1320 ("hidden-metadata-owner-query", KIND_GROUP_METADATA), 1321 ("hidden-admins-owner-query", KIND_GROUP_ADMINS), 1322 ("hidden-members-owner-query", KIND_GROUP_MEMBERS), 1323 ] { 1324 assert_req_kind_tag_then_eose( 1325 &mut owner_reader, 1326 subscription_id, 1327 json!({"kinds":[kind], "#d":["HiddenSocket"]}), 1328 kind, 1329 "d", 1330 "HiddenSocket", 1331 ) 1332 .await; 1333 } 1334 1335 send_client_value( 1336 &mut observer, 1337 json!(["REQ", "hidden-public-live", {"kinds":[1], "#h":["HiddenSocket"]}]), 1338 ) 1339 .await; 1340 assert_eq!( 1341 read_relay_value(&mut observer).await, 1342 json!(["EOSE", "hidden-public-live"]) 1343 ); 1344 send_client_value( 1345 &mut owner_reader, 1346 json!(["REQ", "hidden-owner-live", {"kinds":[1], "#h":["HiddenSocket"]}]), 1347 ) 1348 .await; 1349 assert_eq!( 1350 read_relay_value(&mut owner_reader).await, 1351 json!(["EOSE", "hidden-owner-live"]) 1352 ); 1353 send_client_value( 1354 &mut member_reader, 1355 json!(["REQ", "hidden-member-live", {"kinds":[1], "#h":["HiddenSocket"]}]), 1356 ) 1357 .await; 1358 assert_eq!( 1359 read_relay_value(&mut member_reader).await, 1360 json!(["EOSE", "hidden-member-live"]) 1361 ); 1362 1363 let hidden_note = pocket_protocol_group_event( 1364 FixtureKey::Owner, 1365 "HiddenSocket", 1366 1_714_124_455, 1367 1, 1368 "hidden harvest", 1369 ); 1370 send_client_value( 1371 &mut owner_writer, 1372 json!(["EVENT", event_to_value(&hidden_note)]), 1373 ) 1374 .await; 1375 assert_ok( 1376 read_relay_value(&mut owner_writer).await, 1377 &hidden_note, 1378 true, 1379 "", 1380 ); 1381 assert_live_event( 1382 read_relay_value(&mut owner_reader).await, 1383 "hidden-owner-live", 1384 &hidden_note, 1385 ); 1386 assert_live_event( 1387 read_relay_value(&mut member_reader).await, 1388 "hidden-member-live", 1389 &hidden_note, 1390 ); 1391 expect_no_relay_message(&mut observer).await; 1392 assert_count_message( 1393 &mut observer, 1394 "hidden-public-count", 1395 json!({"kinds":[1], "#h":["HiddenSocket"]}), 1396 0, 1397 ) 1398 .await; 1399 assert_count_message( 1400 &mut owner_reader, 1401 "hidden-owner-count", 1402 json!({"kinds":[1], "#h":["HiddenSocket"]}), 1403 1, 1404 ) 1405 .await; 1406 assert_count_closed( 1407 &mut observer, 1408 "hidden-public-broad-count", 1409 json!({"kinds":[1], "#h":["HiddenSocket"], "limit":500}), 1410 "restricted: count filters are too broad or expensive", 1411 ) 1412 .await; 1413 send_client_value( 1414 &mut observer, 1415 json!(["COUNT", "hidden-approx-count", {"kinds":[1], "#h":["HiddenSocket"], "approximate":true}]), 1416 ) 1417 .await; 1418 assert_eq!( 1419 read_relay_value(&mut observer).await, 1420 json!([ 1421 "NOTICE", 1422 "invalid: filter field `approximate` is unsupported" 1423 ]) 1424 ); 1425 assert_redacted_req_closed( 1426 &mut observer, 1427 "hidden-public-query", 1428 json!({"kinds":[1], "#h":["HiddenSocket"]}), 1429 ) 1430 .await; 1431 assert_req_event_then_eose( 1432 &mut owner_reader, 1433 "hidden-owner-query", 1434 json!({"kinds":[1], "#h":["HiddenSocket"]}), 1435 &hidden_note, 1436 ) 1437 .await; 1438 1439 let metrics = wait_for_http_ok(address, "/.well-known/tangle/metrics", None).await; 1440 for private_value in [ 1441 "PrivateSocket", 1442 "HiddenSocket", 1443 "private harvest", 1444 "hidden harvest", 1445 "private-public-broad-count", 1446 "hidden-public-broad-count", 1447 "private-approx-count", 1448 "hidden-approx-count", 1449 ] { 1450 assert!(!metrics.contains(private_value)); 1451 } 1452 let metrics_body = metrics.split_once("\r\n\r\n").expect("metrics body").1; 1453 let metrics_value: Value = serde_json::from_str(metrics_body).expect("metrics json"); 1454 assert_eq!(metrics_value["tangle_count_refusals_total"], 2); 1455 assert_eq!(metrics_value["tangle_broad_query_rejections_total"], 2); 1456 assert!( 1457 metrics_value["tangle_query_candidates_scanned_total"] 1458 .as_u64() 1459 .expect("candidates") 1460 >= metrics_value["tangle_query_redacted_events_total"] 1461 .as_u64() 1462 .expect("redacted") 1463 ); 1464 assert!( 1465 metrics_value["tangle_query_redacted_events_total"] 1466 .as_u64() 1467 .expect("redacted") 1468 >= 2 1469 ); 1470 1471 shutdown.request_shutdown(); 1472 read_websocket_close(&mut owner_writer).await; 1473 read_websocket_close(&mut owner_reader).await; 1474 read_websocket_close(&mut member_writer).await; 1475 read_websocket_close(&mut member_reader).await; 1476 read_websocket_close(&mut observer).await; 1477 let report = timeout(Duration::from_secs(2), task) 1478 .await 1479 .expect("shutdown timeout") 1480 .expect("task") 1481 .expect("serve"); 1482 assert_eq!(report.listen_addr(), address); 1483 1484 let _ = std::fs::remove_dir_all(root); 1485 } 1486 1487 #[tokio::test] 1488 async fn nip11_includes_cors_headers_and_truthful_supported_nips() { 1489 let root = temp_root("acceptance-nip11"); 1490 let _ = std::fs::remove_dir_all(&root); 1491 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 1492 let address = listener.local_addr().expect("address"); 1493 let runtime = host_runtime(&root, address); 1494 let shutdown = runtime.shutdown_signal().clone(); 1495 let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); 1496 1497 let response = wait_for_http_ok(address, "/", Some("application/nostr+json")).await; 1498 let lower = response.to_ascii_lowercase(); 1499 assert!(lower.contains("content-type: application/nostr+json")); 1500 assert!(lower.contains("access-control-allow-origin: *")); 1501 assert!(lower.contains("access-control-allow-headers: *")); 1502 assert!(lower.contains("access-control-allow-methods: *")); 1503 1504 let document = serde_json::from_str::<Value>(response_body(&response)).expect("nip11 json"); 1505 assert_eq!(document["supported_nips"], json!([1, 11, 29, 42, 45, 70])); 1506 assert!( 1507 !document["supported_nips"] 1508 .as_array() 1509 .expect("supported nips") 1510 .contains(&json!(50)) 1511 ); 1512 assert!( 1513 !document["supported_nips"] 1514 .as_array() 1515 .expect("supported nips") 1516 .contains(&json!(77)) 1517 ); 1518 assert!( 1519 !document["supported_nips"] 1520 .as_array() 1521 .expect("supported nips") 1522 .contains(&json!(99)) 1523 ); 1524 1525 shutdown.request_shutdown(); 1526 let report = timeout(Duration::from_secs(2), task) 1527 .await 1528 .expect("shutdown timeout") 1529 .expect("task") 1530 .expect("serve"); 1531 assert_eq!(report.listen_addr(), address); 1532 1533 let _ = std::fs::remove_dir_all(root); 1534 } 1535 1536 #[test] 1537 fn auth_rejects_events_outside_created_at_skew() { 1538 let mut auth = BaseAuthState::new(TANGLE_V2_RELAY_URL, 300, 10).expect("auth"); 1539 1540 assert_eq!( 1541 auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) 1542 .expect("challenge"), 1543 RelayMessage::Auth("challenge-a".to_owned()) 1544 ); 1545 1546 authenticate_pocket_event_for_test( 1547 &mut auth, 1548 &tangle_v2_auth_event(FixtureKey::Owner, "challenge-a", 100).expect("fresh"), 1549 UnixTimestamp::new(100), 1550 ) 1551 .expect("fresh"); 1552 1553 assert_eq!( 1554 authenticate_pocket_event_for_test( 1555 &mut auth, 1556 &tangle_v2_auth_event(FixtureKey::Admin, "challenge-a", 89).expect("auth"), 1557 UnixTimestamp::new(100), 1558 ) 1559 .expect_err("stale") 1560 .prefixed_message(), 1561 "auth-required: auth event created_at is outside configured skew" 1562 ); 1563 assert_eq!( 1564 authenticate_pocket_event_for_test( 1565 &mut auth, 1566 &tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 111).expect("auth"), 1567 UnixTimestamp::new(100), 1568 ) 1569 .expect_err("future") 1570 .prefixed_message(), 1571 "auth-required: auth event created_at is outside configured skew" 1572 ); 1573 } 1574 1575 #[test] 1576 fn protected_events_require_author_auth_before_nip70_is_advertised() { 1577 let root = temp_root("acceptance-nip70"); 1578 let _ = std::fs::remove_dir_all(&root); 1579 let config = runtime_config(&root, SocketAddr::from(([127, 0, 0, 1], 0))); 1580 let document = BaseRelayInfoConfig::new("tangle", &config) 1581 .expect("info config") 1582 .build_document() 1583 .expect("document"); 1584 let relay = config.open_relay().expect("relay"); 1585 let protected = tangle_v2_event( 1586 FixtureKey::Member, 1587 1_714_124_433, 1588 1, 1589 vec![Tag::from_parts("-", &[]).expect("protected")], 1590 "protected", 1591 ) 1592 .expect("protected event"); 1593 let mut auth = BaseAuthState::new(TANGLE_V2_RELAY_URL, 300, 10).expect("auth"); 1594 auth.issue_challenge("challenge-a", UnixTimestamp::new(1_714_124_433)) 1595 .expect("challenge"); 1596 authenticate_pocket_event_for_test( 1597 &mut auth, 1598 &tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 1_714_124_433).expect("auth"), 1599 UnixTimestamp::new(1_714_124_433), 1600 ) 1601 .expect("author auth"); 1602 1603 assert!(document.supported_nips.contains(&70)); 1604 assert_eq!( 1605 relay.handle_event(protected.clone()).expect("unauth"), 1606 RelayMessage::Ok { 1607 event_id: protected.id().clone(), 1608 accepted: false, 1609 message: "auth-required: protected event requires authenticated event author" 1610 .to_owned() 1611 } 1612 ); 1613 assert_eq!( 1614 relay 1615 .handle_event_with_auth(protected.clone(), &auth) 1616 .expect("author write"), 1617 RelayMessage::Ok { 1618 event_id: protected.id().clone(), 1619 accepted: true, 1620 message: String::new() 1621 } 1622 ); 1623 1624 let _ = std::fs::remove_dir_all(root); 1625 } 1626 1627 #[test] 1628 fn negentropy_remains_unconfigurable_and_unadvertised_until_read_gated() { 1629 let root = temp_root("acceptance-negentropy"); 1630 let mut raw = runtime_config_value(&root, SocketAddr::from(([127, 0, 0, 1], 0))); 1631 raw.as_object_mut() 1632 .expect("config object") 1633 .insert("negentropy".to_owned(), json!({"enabled": true})); 1634 1635 let error = 1636 parse_base_relay_runtime_config_json(&raw.to_string()).expect_err("negentropy rejected"); 1637 assert!(error.message().contains("unknown field `negentropy`")); 1638 1639 raw.as_object_mut() 1640 .expect("config object") 1641 .remove("negentropy") 1642 .expect("negentropy field"); 1643 let config = parse_base_relay_runtime_config_json(&raw.to_string()).expect("config"); 1644 let document = BaseRelayInfoConfig::new("tangle", &config) 1645 .expect("info config") 1646 .build_document() 1647 .expect("document"); 1648 assert!(!document.supported_nips.contains(&77)); 1649 1650 let _ = std::fs::remove_dir_all(root); 1651 } 1652 1653 #[test] 1654 fn private_but_not_hidden_group_metadata_remains_visible() { 1655 let owner = phase2_pubkey("1"); 1656 let projection = phase2_projection_with_group( 1657 "Farm", 1658 phase2_metadata(true, false, false, false), 1659 owner.clone(), 1660 ); 1661 let authority = GroupAuthority::new([owner.clone()], Vec::<PublicKeyHex>::new()); 1662 let gate = GroupReadGate::new(&projection, &authority); 1663 1664 assert_eq!( 1665 gate.screen_event( 1666 &pocket_event_for_test(&phase2_snapshot_event(KIND_GROUP_METADATA, "Farm")), 1667 None, 1668 Default::default() 1669 ) 1670 .expect("metadata"), 1671 GroupReadDecision::Visible 1672 ); 1673 assert_eq!( 1674 gate.screen_event( 1675 &pocket_event_for_test(&phase2_snapshot_event(KIND_GROUP_ADMINS, "Farm")), 1676 None, 1677 Default::default() 1678 ) 1679 .expect("admins"), 1680 GroupReadDecision::Visible 1681 ); 1682 assert_eq!( 1683 gate.screen_event( 1684 &pocket_event_for_test(&phase2_snapshot_event(KIND_GROUP_MEMBERS, "Farm")), 1685 None, 1686 Default::default() 1687 ) 1688 .expect("members"), 1689 GroupReadDecision::Hidden 1690 ); 1691 1692 let hidden_projection = 1693 phase2_projection_with_group("Hidden", phase2_metadata(false, false, true, false), owner); 1694 let hidden_gate = GroupReadGate::new(&hidden_projection, &authority); 1695 assert_eq!( 1696 hidden_gate 1697 .screen_event( 1698 &pocket_event_for_test(&phase2_snapshot_event(KIND_GROUP_METADATA, "Hidden")), 1699 None, 1700 Default::default() 1701 ) 1702 .expect("hidden metadata"), 1703 GroupReadDecision::Hidden 1704 ); 1705 } 1706 1707 #[test] 1708 fn public_join_defaults_false() { 1709 let owner = phase2_pubkey("1"); 1710 let joiner = phase2_pubkey("2"); 1711 let projection = phase2_projection_with_group( 1712 "Farm", 1713 phase2_metadata(false, false, false, false), 1714 owner.clone(), 1715 ); 1716 let authority = GroupAuthority::new([owner], Vec::<PublicKeyHex>::new()); 1717 let policy = GroupWritePolicy::new(&projection, &authority, GroupPolicyConfig::strict()); 1718 let join = phase2_group_event(KIND_GROUP_JOIN_REQUEST, "Farm", joiner.clone()); 1719 let join_pocket = pocket_event_for_test(&join); 1720 let error = policy 1721 .check_event( 1722 &join_pocket, 1723 &GroupEventClass::Normal { 1724 group_id: GroupId::new("Farm").expect("group"), 1725 }, 1726 &GroupAuthContext::new([joiner]), 1727 ) 1728 .expect_err("join"); 1729 1730 assert_eq!(error.kind(), GroupErrorKind::GroupUnavailable); 1731 assert_eq!(error.prefixed_message(), "restricted: group is unavailable"); 1732 } 1733 1734 #[test] 1735 fn duplicate_join_and_leave_use_duplicate_prefix() { 1736 let owner = phase2_pubkey("1"); 1737 let member = phase2_pubkey("2"); 1738 let outsider = phase2_pubkey("3"); 1739 let mut projection = phase2_projection_with_group( 1740 "Farm", 1741 phase2_metadata(false, false, false, false), 1742 owner.clone(), 1743 ); 1744 projection.put_member( 1745 GroupId::new("Farm").expect("group"), 1746 MemberState::new( 1747 member.clone(), 1748 MemberStatus::Member, 1749 Default::default(), 1750 phase2_event_id("20"), 1751 phase2_order_tuple(20, "20", 2), 1752 ), 1753 ); 1754 let authority = GroupAuthority::new([owner], Vec::<PublicKeyHex>::new()); 1755 let policy = GroupWritePolicy::new( 1756 &projection, 1757 &authority, 1758 GroupPolicyConfig::new(true, false).expect("policy"), 1759 ); 1760 1761 let duplicate_join = policy 1762 .check_event( 1763 &pocket_event_for_test(&phase2_group_event( 1764 KIND_GROUP_JOIN_REQUEST, 1765 "Farm", 1766 member.clone(), 1767 )), 1768 &GroupEventClass::Normal { 1769 group_id: GroupId::new("Farm").expect("group"), 1770 }, 1771 &GroupAuthContext::new([member]), 1772 ) 1773 .expect_err("duplicate join"); 1774 assert_eq!( 1775 duplicate_join.prefixed_message(), 1776 "duplicate: group member already exists" 1777 ); 1778 1779 let duplicate_leave = policy 1780 .check_event( 1781 &pocket_event_for_test(&phase2_group_event( 1782 KIND_GROUP_LEAVE_REQUEST, 1783 "Farm", 1784 outsider.clone(), 1785 )), 1786 &GroupEventClass::Normal { 1787 group_id: GroupId::new("Farm").expect("group"), 1788 }, 1789 &GroupAuthContext::new([outsider]), 1790 ) 1791 .expect_err("duplicate leave"); 1792 assert_eq!( 1793 duplicate_leave.prefixed_message(), 1794 "duplicate: group member does not exist" 1795 ); 1796 } 1797 1798 #[test] 1799 fn closed_groups_use_strict_nip29_semantics_without_compatibility_flag() { 1800 let owner = phase2_pubkey("1"); 1801 let outsider = phase2_pubkey("2"); 1802 let projection = phase2_projection_with_group( 1803 "Closed", 1804 phase2_metadata(false, false, false, true), 1805 owner.clone(), 1806 ); 1807 let authority = GroupAuthority::new([owner], Vec::<PublicKeyHex>::new()); 1808 let policy = GroupWritePolicy::new( 1809 &projection, 1810 &authority, 1811 GroupPolicyConfig::new(true, false).expect("policy"), 1812 ); 1813 1814 let join_error = policy 1815 .check_event( 1816 &pocket_event_for_test(&phase2_group_event( 1817 KIND_GROUP_JOIN_REQUEST, 1818 "Closed", 1819 outsider.clone(), 1820 )), 1821 &GroupEventClass::Normal { 1822 group_id: GroupId::new("Closed").expect("group"), 1823 }, 1824 &GroupAuthContext::new([outsider.clone()]), 1825 ) 1826 .expect_err("closed join"); 1827 assert_eq!(join_error.kind(), GroupErrorKind::GroupUnavailable); 1828 assert_eq!( 1829 join_error.prefixed_message(), 1830 "restricted: group is unavailable" 1831 ); 1832 1833 assert_eq!( 1834 policy 1835 .check_event( 1836 &pocket_event_for_test(&phase2_group_event(1, "Closed", outsider.clone())), 1837 &GroupEventClass::Normal { 1838 group_id: GroupId::new("Closed").expect("group"), 1839 }, 1840 &GroupAuthContext::new([outsider]), 1841 ) 1842 .expect("normal write"), 1843 GroupWriteDecision::Accept 1844 ); 1845 1846 let error = parse_group_runtime_config_json( 1847 r#"{"enabled": false, "policy": {"compat_closed_means_restricted": true}}"#, 1848 ) 1849 .expect_err("compat"); 1850 assert!( 1851 error 1852 .message() 1853 .contains("unknown field `compat_closed_means_restricted`") 1854 ); 1855 } 1856 1857 #[test] 1858 fn req_count_and_live_fanout_share_one_group_read_gate() { 1859 let relay_core = include_str!("../src/relay/core.rs"); 1860 let runtime = include_str!("../src/runtime.rs"); 1861 1862 assert_eq!( 1863 relay_core 1864 .matches("fn group_read_gate_visible_to_auth") 1865 .count(), 1866 1 1867 ); 1868 assert_eq!( 1869 relay_core 1870 .matches("Self::group_read_gate_visible_to_auth") 1871 .count(), 1872 3 1873 ); 1874 assert_eq!( 1875 runtime 1876 .matches("BaseRelay::group_read_gate_visible_to_auth") 1877 .count(), 1878 2 1879 ); 1880 assert!(!relay_core.contains("fn event_visible_to_auth(")); 1881 assert!(!relay_core.contains("fn pocket_event_visible_to_auth(")); 1882 } 1883 1884 #[test] 1885 fn runtime_event_handling_does_not_lock_relay_state() { 1886 let runtime = include_str!("../src/runtime.rs"); 1887 let event_branch = runtime 1888 .split("RuntimeClientMessage::Event(pocket_event) => {") 1889 .nth(1) 1890 .expect("event branch") 1891 .split("RuntimeClientMessage::Req") 1892 .next() 1893 .expect("req branch"); 1894 1895 assert!(!event_branch.contains("relay.lock().await")); 1896 assert!(!event_branch.contains("pocket_event_to_tangle(&pocket_event)?")); 1897 assert!(event_branch.contains("handle_pocket_event_with_auth_report(&pocket_event, auth)?")); 1898 } 1899 1900 #[test] 1901 fn runtime_req_handling_does_not_lock_relay_state() { 1902 let runtime = include_str!("../src/runtime.rs"); 1903 let req_branch = runtime 1904 .split("RuntimeClientMessage::Req {") 1905 .nth(1) 1906 .expect("req branch") 1907 .split("RuntimeClientMessage::Count") 1908 .next() 1909 .expect("count branch"); 1910 let query_helper = runtime 1911 .split("pub(crate) async fn query_req_with_auth") 1912 .nth(1) 1913 .expect("query helper") 1914 .split("pub async fn event_by_offset_with_auth") 1915 .next() 1916 .expect("offset helper"); 1917 1918 assert!(!req_branch.contains("relay.lock().await")); 1919 assert!(!query_helper.contains("relay.lock().await")); 1920 assert!(!req_branch.contains("runtime_filters_to_protocol(filters, search_present)?")); 1921 assert!(req_branch.contains("validate_pocket_filters(&filters)?")); 1922 assert!(req_branch.contains("rate_limit_req_pocket(")); 1923 assert!(req_branch.contains("query_req_with_auth_report(")); 1924 assert!(query_helper.contains("query_req_with_auth_report(")); 1925 } 1926 1927 #[test] 1928 fn runtime_count_handling_does_not_lock_relay_state() { 1929 let runtime = include_str!("../src/runtime.rs"); 1930 let count_branch = runtime 1931 .split( 1932 " RuntimeClientMessage::Count {\n subscription_id,\n filters,\n search_present,\n } => {", 1933 ) 1934 .nth(1) 1935 .expect("count branch") 1936 .split("RuntimeClientMessage::Auth") 1937 .next() 1938 .expect("auth branch"); 1939 1940 assert!(!count_branch.contains("relay.lock().await")); 1941 assert!(!count_branch.contains("runtime_filters_to_protocol(")); 1942 assert!(count_branch.contains("validate_pocket_filters(&filters)?")); 1943 assert!(count_branch.contains("rate_limit_count_pocket(")); 1944 assert!(count_branch.contains("handle_count_with_auth_report(")); 1945 assert!(count_branch.contains("search_present")); 1946 } 1947 1948 #[test] 1949 fn relay_core_exposes_pocket_native_req_and_fanout_boundaries() { 1950 let relay_core = include_str!("../src/relay/core.rs"); 1951 1952 assert!(relay_core.contains("pub fn handle_pocket_req(")); 1953 assert!(relay_core.contains("pub fn handle_pocket_req_with_auth(")); 1954 assert!(relay_core.contains("pub fn fanout_pocket(")); 1955 assert!(relay_core.contains("pub fn fanout_pocket_with_group_auth(")); 1956 assert!(!relay_core.contains("pub fn handle_req(")); 1957 assert!(!relay_core.contains("pub fn handle_req_with_auth(")); 1958 assert!(!relay_core.contains("pub fn fanout(")); 1959 assert!(!relay_core.contains("pub fn fanout_with_group_auth(")); 1960 assert!(!relay_core.contains("pub fn query_events_with_auth(")); 1961 assert!(!relay_core.contains("pub fn validate_event(")); 1962 assert!(!relay_core.contains("pub fn validate_filters(")); 1963 assert!(relay_core.contains("handle_protocol_req_for_test")); 1964 assert!(relay_core.contains("fanout_protocol_for_test")); 1965 } 1966 1967 #[test] 1968 fn runtime_live_fanout_offset_lookup_does_not_lock_relay_state() { 1969 let runtime = include_str!("../src/runtime.rs"); 1970 let fanout_helper = runtime 1971 .split("pub(crate) async fn fanout_event_offset") 1972 .nth(1) 1973 .expect("fanout helper") 1974 .split("pub async fn shutdown") 1975 .next() 1976 .expect("shutdown helper"); 1977 1978 assert!(!fanout_helper.contains("relay.lock().await")); 1979 assert!(fanout_helper.contains("self.inner.store.event_by_offset")); 1980 assert!(fanout_helper.contains("BaseRelay::group_read_gate_visible_to_auth")); 1981 } 1982 1983 #[test] 1984 fn runtime_shared_shell_does_not_keep_transitional_base_relay_mutex() { 1985 let runtime = include_str!("../src/runtime.rs"); 1986 let shared_shell = runtime 1987 .split("struct RelayRuntimeShared {") 1988 .nth(1) 1989 .expect("shared shell") 1990 .split("impl RelayRuntimeShared") 1991 .next() 1992 .expect("shared shell fields"); 1993 let handle_impl = runtime 1994 .split("impl RelayRuntimeHandle") 1995 .nth(1) 1996 .expect("runtime handle") 1997 .split("fn auth_response_failed") 1998 .next() 1999 .expect("runtime handle body"); 2000 2001 assert!(!runtime.contains("Mutex<BaseRelay>")); 2002 assert!(!runtime.contains("relay.lock().await")); 2003 assert!(!shared_shell.contains("relay:")); 2004 assert!(handle_impl.contains("BaseRelay::handle_pocket_auth_with_limits")); 2005 assert!(handle_impl.contains("self.inner.store.sync()?")); 2006 } 2007 2008 #[test] 2009 fn runtime_hot_path_does_not_stringify_and_reparse_events() { 2010 let conversion_boundary = include_str!("../src/pocket_conversion.rs"); 2011 for forbidden in [ 2012 "event_to_value", 2013 "filter_to_value", 2014 "parse_event_json", 2015 "parse_pocket_event_json", 2016 "parse_pocket_filter_json", 2017 ".as_json()", 2018 ] { 2019 assert!( 2020 !conversion_boundary.contains(forbidden), 2021 "runtime Pocket conversion boundary contains forbidden JSON bridge `{forbidden}`" 2022 ); 2023 } 2024 } 2025 2026 #[test] 2027 fn projection_and_outbox_recover_from_canonical_pocket_events() { 2028 let root = temp_root("acceptance-recovery"); 2029 let _ = std::fs::remove_dir_all(&root); 2030 let config = runtime_config(&root, "127.0.0.1:0".parse().expect("listen addr")); 2031 let mut auth = config.auth_state().expect("auth"); 2032 auth.issue_challenge("recovery-challenge", UnixTimestamp::new(1_714_124_470)) 2033 .expect("challenge"); 2034 let owner_auth = tangle_v2_auth_event(FixtureKey::Owner, "recovery-challenge", 1_714_124_470) 2035 .expect("owner auth"); 2036 let member_auth = tangle_v2_auth_event(FixtureKey::Member, "recovery-challenge", 1_714_124_471) 2037 .expect("member auth"); 2038 authenticate_pocket_event_for_test(&mut auth, &owner_auth, UnixTimestamp::new(1_714_124_470)) 2039 .expect("owner"); 2040 authenticate_pocket_event_for_test(&mut auth, &member_auth, UnixTimestamp::new(1_714_124_471)) 2041 .expect("member"); 2042 let create = 2043 tangle_v2_group_create_event(FixtureKey::Owner, "RecoverSocket", 1_714_124_472, &[]) 2044 .expect("create"); 2045 let put_member = tangle_v2_put_user_event( 2046 FixtureKey::Owner, 2047 "RecoverSocket", 2048 FixtureKey::Member, 2049 1_714_124_473, 2050 ) 2051 .expect("put member"); 2052 let note = tangle_v2_group_event( 2053 FixtureKey::Member, 2054 "RecoverSocket", 2055 1_714_124_474, 2056 1, 2057 "recover harvest", 2058 ) 2059 .expect("note"); 2060 2061 { 2062 let mut runtime = RelayRuntime::open(config.clone()).expect("runtime"); 2063 assert_relay_ok( 2064 runtime 2065 .relay_mut() 2066 .handle_event_with_auth(create.clone(), &auth) 2067 .expect("create"), 2068 &create, 2069 true, 2070 "", 2071 ); 2072 assert_relay_ok( 2073 runtime 2074 .relay_mut() 2075 .handle_event_with_auth(put_member.clone(), &auth) 2076 .expect("put member"), 2077 &put_member, 2078 true, 2079 "", 2080 ); 2081 assert_relay_ok( 2082 runtime 2083 .relay_mut() 2084 .handle_event_with_auth(note.clone(), &auth) 2085 .expect("note"), 2086 ¬e, 2087 true, 2088 "", 2089 ); 2090 assert_relay_count( 2091 runtime 2092 .relay() 2093 .handle_count_protocol( 2094 subscription_id("pre-recovery-members"), 2095 vec![relay_filter( 2096 json!({"kinds":[KIND_GROUP_MEMBERS], "#d":["RecoverSocket"]}), 2097 )], 2098 ) 2099 .expect("members count"), 2100 "pre-recovery-members", 2101 1, 2102 ); 2103 runtime.shutdown().expect("shutdown"); 2104 } 2105 2106 delete_group_extra_records(config.pocket_config()); 2107 2108 let recovered = RelayRuntime::open(config.clone()).expect("recovered"); 2109 let readiness = recovered.readiness_state().response(); 2110 assert_eq!(readiness.checks.group_projection, "ready"); 2111 assert_eq!(readiness.checks.group_outbox_replay, "ready"); 2112 assert_eq!(readiness.checks.event_bus, "ready"); 2113 assert!( 2114 recovered 2115 .relay() 2116 .group_projection() 2117 .expect("projection") 2118 .group(&GroupId::new("RecoverSocket").expect("group")) 2119 .is_some() 2120 ); 2121 assert_relay_count( 2122 recovered 2123 .relay() 2124 .handle_count_protocol( 2125 subscription_id("recovered-metadata"), 2126 vec![relay_filter( 2127 json!({"kinds":[KIND_GROUP_METADATA], "#d":["RecoverSocket"]}), 2128 )], 2129 ) 2130 .expect("metadata count"), 2131 "recovered-metadata", 2132 1, 2133 ); 2134 assert_relay_count( 2135 recovered 2136 .relay() 2137 .handle_count_protocol( 2138 subscription_id("recovered-admins"), 2139 vec![relay_filter( 2140 json!({"kinds":[KIND_GROUP_ADMINS], "#d":["RecoverSocket"]}), 2141 )], 2142 ) 2143 .expect("admins count"), 2144 "recovered-admins", 2145 1, 2146 ); 2147 assert_relay_count( 2148 recovered 2149 .relay() 2150 .handle_count_protocol( 2151 subscription_id("recovered-members"), 2152 vec![relay_filter( 2153 json!({"kinds":[KIND_GROUP_MEMBERS], "#d":["RecoverSocket"]}), 2154 )], 2155 ) 2156 .expect("members count"), 2157 "recovered-members", 2158 1, 2159 ); 2160 assert_relay_count( 2161 recovered 2162 .relay() 2163 .handle_count_protocol( 2164 subscription_id("recovered-note"), 2165 vec![relay_filter(json!({"kinds":[1], "#h":["RecoverSocket"]}))], 2166 ) 2167 .expect("note count"), 2168 "recovered-note", 2169 1, 2170 ); 2171 2172 let _ = std::fs::remove_dir_all(root); 2173 } 2174 2175 #[tokio::test] 2176 async fn relay_generated_events_are_stored_projected_and_broadcast_to_websocket_clients() { 2177 let root = temp_root("acceptance-generated-websocket"); 2178 let _ = std::fs::remove_dir_all(&root); 2179 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 2180 let address = listener.local_addr().expect("address"); 2181 let runtime = host_runtime(&root, address); 2182 let shutdown = runtime.shutdown_signal().clone(); 2183 let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); 2184 let mut owner = connect_nostr_socket(address).await; 2185 let mut observer = connect_nostr_socket(address).await; 2186 let owner_challenge = read_auth_challenge(&mut owner).await; 2187 let _ = read_auth_challenge(&mut observer).await; 2188 authenticate_client( 2189 &mut owner, 2190 FixtureKey::Owner, 2191 &owner_challenge, 2192 current_unix_timestamp(), 2193 ) 2194 .await; 2195 2196 send_client_value( 2197 &mut observer, 2198 json!([ 2199 "REQ", 2200 "generated-state-live", 2201 {"kinds":[KIND_GROUP_METADATA, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS], "#d":["GeneratedSocket"]} 2202 ]), 2203 ) 2204 .await; 2205 assert_eq!( 2206 read_relay_value(&mut observer).await, 2207 json!(["EOSE", "generated-state-live"]) 2208 ); 2209 2210 let create = 2211 tangle_v2_group_create_event(FixtureKey::Owner, "GeneratedSocket", 1_714_124_460, &[]) 2212 .expect("create"); 2213 send_client_value(&mut owner, json!(["EVENT", event_to_value(&create)])).await; 2214 assert_ok(read_relay_value(&mut owner).await, &create, true, ""); 2215 let create_generated_kinds = [ 2216 relay_event_kind_tag( 2217 read_relay_value(&mut observer).await, 2218 "generated-state-live", 2219 "d", 2220 "GeneratedSocket", 2221 ), 2222 relay_event_kind_tag( 2223 read_relay_value(&mut observer).await, 2224 "generated-state-live", 2225 "d", 2226 "GeneratedSocket", 2227 ), 2228 ]; 2229 assert!(create_generated_kinds.contains(&KIND_GROUP_METADATA)); 2230 assert!(create_generated_kinds.contains(&KIND_GROUP_ADMINS)); 2231 assert_count_message( 2232 &mut observer, 2233 "generated-metadata-count", 2234 json!({"kinds":[KIND_GROUP_METADATA], "#d":["GeneratedSocket"]}), 2235 1, 2236 ) 2237 .await; 2238 assert_count_message( 2239 &mut observer, 2240 "generated-admins-count", 2241 json!({"kinds":[KIND_GROUP_ADMINS], "#d":["GeneratedSocket"]}), 2242 1, 2243 ) 2244 .await; 2245 2246 let put_member = tangle_v2_put_user_event( 2247 FixtureKey::Owner, 2248 "GeneratedSocket", 2249 FixtureKey::Member, 2250 1_714_124_461, 2251 ) 2252 .expect("put member"); 2253 send_client_value(&mut owner, json!(["EVENT", event_to_value(&put_member)])).await; 2254 assert_ok(read_relay_value(&mut owner).await, &put_member, true, ""); 2255 assert_eq!( 2256 relay_event_kind_tag( 2257 read_relay_value(&mut observer).await, 2258 "generated-state-live", 2259 "d", 2260 "GeneratedSocket", 2261 ), 2262 KIND_GROUP_MEMBERS 2263 ); 2264 assert_count_message( 2265 &mut observer, 2266 "generated-members-count", 2267 json!({"kinds":[KIND_GROUP_MEMBERS], "#d":["GeneratedSocket"]}), 2268 1, 2269 ) 2270 .await; 2271 2272 shutdown.request_shutdown(); 2273 read_websocket_close(&mut owner).await; 2274 read_websocket_close(&mut observer).await; 2275 let report = timeout(Duration::from_secs(2), task) 2276 .await 2277 .expect("shutdown timeout") 2278 .expect("task") 2279 .expect("serve"); 2280 assert_eq!(report.listen_addr(), address); 2281 2282 let _ = std::fs::remove_dir_all(root); 2283 } 2284 2285 #[tokio::test] 2286 async fn private_relay_generated_events_reach_authorized_websocket_subscribers() { 2287 let root = temp_root("acceptance-private-generated-websocket"); 2288 let _ = std::fs::remove_dir_all(&root); 2289 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 2290 let address = listener.local_addr().expect("address"); 2291 let runtime = host_runtime(&root, address); 2292 let shutdown = runtime.shutdown_signal().clone(); 2293 let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); 2294 let mut owner_writer = connect_nostr_socket(address).await; 2295 let mut owner_reader = connect_nostr_socket(address).await; 2296 let writer_challenge = read_auth_challenge(&mut owner_writer).await; 2297 let reader_challenge = read_auth_challenge(&mut owner_reader).await; 2298 let auth_created_at = current_unix_timestamp(); 2299 2300 authenticate_client( 2301 &mut owner_writer, 2302 FixtureKey::Owner, 2303 &writer_challenge, 2304 auth_created_at, 2305 ) 2306 .await; 2307 authenticate_client( 2308 &mut owner_reader, 2309 FixtureKey::Owner, 2310 &reader_challenge, 2311 auth_created_at.saturating_add(1), 2312 ) 2313 .await; 2314 2315 send_client_value( 2316 &mut owner_reader, 2317 json!([ 2318 "REQ", 2319 "private-generated-live", 2320 {"kinds":[KIND_GROUP_METADATA, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS], "#d":["PrivateGeneratedSocket"]} 2321 ]), 2322 ) 2323 .await; 2324 assert_eq!( 2325 read_relay_value(&mut owner_reader).await, 2326 json!(["EOSE", "private-generated-live"]) 2327 ); 2328 2329 let create = tangle_v2_group_create_event( 2330 FixtureKey::Owner, 2331 "PrivateGeneratedSocket", 2332 1_714_124_470, 2333 &["private"], 2334 ) 2335 .expect("create"); 2336 send_client_value(&mut owner_writer, json!(["EVENT", event_to_value(&create)])).await; 2337 assert_ok(read_relay_value(&mut owner_writer).await, &create, true, ""); 2338 let create_generated_kinds = [ 2339 relay_event_kind_tag( 2340 read_relay_value(&mut owner_reader).await, 2341 "private-generated-live", 2342 "d", 2343 "PrivateGeneratedSocket", 2344 ), 2345 relay_event_kind_tag( 2346 read_relay_value(&mut owner_reader).await, 2347 "private-generated-live", 2348 "d", 2349 "PrivateGeneratedSocket", 2350 ), 2351 ]; 2352 assert!(create_generated_kinds.contains(&KIND_GROUP_METADATA)); 2353 assert!(create_generated_kinds.contains(&KIND_GROUP_ADMINS)); 2354 2355 let put_member = tangle_v2_put_user_event( 2356 FixtureKey::Owner, 2357 "PrivateGeneratedSocket", 2358 FixtureKey::Member, 2359 1_714_124_471, 2360 ) 2361 .expect("put member"); 2362 send_client_value( 2363 &mut owner_writer, 2364 json!(["EVENT", event_to_value(&put_member)]), 2365 ) 2366 .await; 2367 assert_ok( 2368 read_relay_value(&mut owner_writer).await, 2369 &put_member, 2370 true, 2371 "", 2372 ); 2373 assert_eq!( 2374 relay_event_kind_tag( 2375 read_relay_value(&mut owner_reader).await, 2376 "private-generated-live", 2377 "d", 2378 "PrivateGeneratedSocket", 2379 ), 2380 KIND_GROUP_MEMBERS 2381 ); 2382 2383 shutdown.request_shutdown(); 2384 read_websocket_close(&mut owner_writer).await; 2385 read_websocket_close(&mut owner_reader).await; 2386 let report = timeout(Duration::from_secs(2), task) 2387 .await 2388 .expect("shutdown timeout") 2389 .expect("task") 2390 .expect("serve"); 2391 assert_eq!(report.listen_addr(), address); 2392 2393 let _ = std::fs::remove_dir_all(root); 2394 } 2395 2396 fn runtime_config(root: &Path, listen_addr: SocketAddr) -> BaseRelayRuntimeConfig { 2397 parse_base_relay_runtime_config_json(&runtime_config_value(root, listen_addr).to_string()) 2398 .expect("config") 2399 } 2400 2401 fn host_runtime(root: &Path, listen_addr: SocketAddr) -> TangleHostRuntime { 2402 let host = parse_tangle_host_runtime_config_json( 2403 &json!({ 2404 "listen_addr": listen_addr.to_string(), 2405 "tenant_config_dir": "tenants", 2406 "limits": { 2407 "max_total_connections": 128, 2408 "max_total_subscriptions": 1024, 2409 "tenant_startup_concurrency": 4 2410 } 2411 }) 2412 .to_string(), 2413 ) 2414 .expect("host config"); 2415 let tenant = parse_tenant_runtime_config_json( 2416 &json!({ 2417 "tenant_id": "acceptance-relay", 2418 "tenant_schema": "acceptance_relay", 2419 "host": "relay.radroots.test", 2420 "relay_url": "wss://relay.radroots.test", 2421 "info": { 2422 "name": "tangle" 2423 }, 2424 "pocket": { 2425 "data_directory": root.join("pocket"), 2426 "sync_policy": "flush_on_shutdown" 2427 }, 2428 "pocket_query": { 2429 "allow_scraping": false, 2430 "allow_scrape_if_limited_to": 100, 2431 "allow_scrape_if_max_seconds": 3600 2432 }, 2433 "groups": { 2434 "enabled": true, 2435 "canonical_relay_url": "wss://relay.radroots.test", 2436 "relay_secret": TANGLE_V2_RELAY_SECRET_HEX, 2437 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()] 2438 }, 2439 "auth": { 2440 "challenge_ttl_seconds": 300, 2441 "created_at_skew_seconds": 600 2442 }, 2443 "limits": { 2444 "max_message_length": 1048576, 2445 "max_subid_length": 64, 2446 "max_subscriptions_per_connection": 64, 2447 "max_filters_per_request": 10, 2448 "max_tag_values_per_filter": 100, 2449 "max_query_complexity": 2048, 2450 "max_limit": 500, 2451 "default_limit": 100, 2452 "max_event_tags": 200, 2453 "max_content_length": 65536, 2454 "broadcast_channel_capacity": 8, 2455 "per_connection_outbound_queue": 8 2456 }, 2457 "rate_limits": { 2458 "auth": { 2459 "per_ip": {"window_seconds": 60, "max_hits": 120}, 2460 "per_pubkey": {"window_seconds": 60, "max_hits": 30}, 2461 "failures": {"window_seconds": 300, "max_hits": 5}, 2462 "failures_per_ip": {"window_seconds": 300, "max_hits": 20} 2463 }, 2464 "event": { 2465 "per_ip": {"window_seconds": 60, "max_hits": 600}, 2466 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 2467 "per_kind": {"window_seconds": 60, "max_hits": 1000} 2468 }, 2469 "group": { 2470 "write_per_ip": {"window_seconds": 60, "max_hits": 300}, 2471 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, 2472 "write_per_group": {"window_seconds": 60, "max_hits": 90}, 2473 "write_per_kind": {"window_seconds": 60, "max_hits": 300}, 2474 "join_flow": {"window_seconds": 300, "max_hits": 10}, 2475 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} 2476 }, 2477 "req": { 2478 "per_ip": {"window_seconds": 60, "max_hits": 600}, 2479 "per_connection": {"window_seconds": 60, "max_hits": 120}, 2480 "per_pubkey": {"window_seconds": 60, "max_hits": 240}, 2481 "per_group": {"window_seconds": 60, "max_hits": 240}, 2482 "per_kind": {"window_seconds": 60, "max_hits": 500}, 2483 "broad": {"window_seconds": 60, "max_hits": 30} 2484 }, 2485 "count": { 2486 "per_ip": {"window_seconds": 60, "max_hits": 300}, 2487 "per_connection": {"window_seconds": 60, "max_hits": 60}, 2488 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 2489 "per_group": {"window_seconds": 60, "max_hits": 120}, 2490 "per_kind": {"window_seconds": 60, "max_hits": 240}, 2491 "broad": {"window_seconds": 60, "max_hits": 20} 2492 } 2493 } 2494 }) 2495 .to_string(), 2496 ) 2497 .expect("tenant config"); 2498 let config = TangleHostRuntimeConfigSet::new(host, vec![tenant]).expect("host config set"); 2499 TangleHostRuntime::open(config).expect("host runtime") 2500 } 2501 2502 fn runtime_config_value(root: &Path, listen_addr: SocketAddr) -> Value { 2503 json!({ 2504 "server": { 2505 "listen_addr": listen_addr.to_string(), 2506 "relay_url": "wss://relay.radroots.test" 2507 }, 2508 "pocket": { 2509 "data_directory": root.join("pocket"), 2510 "sync_policy": "flush_on_shutdown", 2511 "query": { 2512 "allow_scraping": false, 2513 "allow_scrape_if_limited_to": 100, 2514 "allow_scrape_if_max_seconds": 3600 2515 } 2516 }, 2517 "groups": { 2518 "enabled": true, 2519 "canonical_relay_url": "wss://relay.radroots.test", 2520 "relay_secret": TANGLE_V2_RELAY_SECRET_HEX, 2521 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()] 2522 }, 2523 "auth": { 2524 "challenge_ttl_seconds": 300, 2525 "created_at_skew_seconds": 600 2526 }, 2527 "limits": { 2528 "max_message_length": 1048576, 2529 "max_subid_length": 64, 2530 "max_subscriptions_per_connection": 64, 2531 "max_filters_per_request": 10, 2532 "max_tag_values_per_filter": 100, 2533 "max_query_complexity": 2048, 2534 "max_limit": 500, 2535 "default_limit": 100, 2536 "max_event_tags": 200, 2537 "max_content_length": 65536, 2538 "broadcast_channel_capacity": 8, 2539 "per_connection_outbound_queue": 8 2540 }, 2541 "rate_limits": { 2542 "auth": { 2543 "per_ip": {"window_seconds": 60, "max_hits": 120}, 2544 "per_pubkey": {"window_seconds": 60, "max_hits": 30}, 2545 "failures": {"window_seconds": 300, "max_hits": 5}, 2546 "failures_per_ip": {"window_seconds": 300, "max_hits": 20} 2547 }, 2548 "event": { 2549 "per_ip": {"window_seconds": 60, "max_hits": 600}, 2550 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 2551 "per_kind": {"window_seconds": 60, "max_hits": 1000} 2552 }, 2553 "group": { 2554 "write_per_ip": {"window_seconds": 60, "max_hits": 300}, 2555 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, 2556 "write_per_group": {"window_seconds": 60, "max_hits": 90}, 2557 "write_per_kind": {"window_seconds": 60, "max_hits": 300}, 2558 "join_flow": {"window_seconds": 300, "max_hits": 10}, 2559 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} 2560 }, 2561 "req": { 2562 "per_ip": {"window_seconds": 60, "max_hits": 600}, 2563 "per_connection": {"window_seconds": 60, "max_hits": 120}, 2564 "per_pubkey": {"window_seconds": 60, "max_hits": 240}, 2565 "per_group": {"window_seconds": 60, "max_hits": 240}, 2566 "per_kind": {"window_seconds": 60, "max_hits": 500}, 2567 "broad": {"window_seconds": 60, "max_hits": 30} 2568 }, 2569 "count": { 2570 "per_ip": {"window_seconds": 60, "max_hits": 300}, 2571 "per_connection": {"window_seconds": 60, "max_hits": 60}, 2572 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 2573 "per_group": {"window_seconds": 60, "max_hits": 120}, 2574 "per_kind": {"window_seconds": 60, "max_hits": 240}, 2575 "broad": {"window_seconds": 60, "max_hits": 20} 2576 } 2577 } 2578 }) 2579 } 2580 2581 async fn wait_for_http_ok( 2582 address: SocketAddr, 2583 path: &'static str, 2584 accept: Option<&'static str>, 2585 ) -> String { 2586 let deadline = Instant::now() + Duration::from_secs(3); 2587 let mut last_error = String::new(); 2588 while Instant::now() < deadline { 2589 match tokio::task::spawn_blocking(move || http_get(address, path, accept)) 2590 .await 2591 .expect("http task") 2592 { 2593 Ok(response) if response.starts_with("HTTP/1.1 200 OK") => return response, 2594 Ok(response) => { 2595 last_error = response.lines().next().unwrap_or("").to_owned(); 2596 } 2597 Err(error) => { 2598 last_error = error.to_string(); 2599 } 2600 } 2601 tokio::time::sleep(Duration::from_millis(25)).await; 2602 } 2603 panic!("server did not answer {path}: {last_error}"); 2604 } 2605 2606 fn http_get(address: SocketAddr, path: &str, accept: Option<&str>) -> std::io::Result<String> { 2607 let mut stream = TcpStream::connect_timeout(&address, Duration::from_millis(200))?; 2608 stream.set_read_timeout(Some(Duration::from_millis(500)))?; 2609 stream.set_write_timeout(Some(Duration::from_millis(500)))?; 2610 let mut request = 2611 format!("GET {path} HTTP/1.1\r\nHost: relay.radroots.test\r\nConnection: close\r\n"); 2612 if let Some(accept) = accept { 2613 request.push_str(&format!("Accept: {accept}\r\n")); 2614 } 2615 request.push_str("\r\n"); 2616 stream.write_all(request.as_bytes())?; 2617 let mut response = String::new(); 2618 stream.read_to_string(&mut response)?; 2619 Ok(response) 2620 } 2621 2622 fn response_body(response: &str) -> &str { 2623 response.split_once("\r\n\r\n").expect("response body").1 2624 } 2625 2626 fn temp_root(name: &str) -> PathBuf { 2627 std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id())) 2628 } 2629 2630 type TestWebSocket = 2631 tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>; 2632 2633 async fn connect_nostr_socket(address: SocketAddr) -> TestWebSocket { 2634 let mut request = format!("ws://{address}/") 2635 .into_client_request() 2636 .expect("request"); 2637 request.headers_mut().insert( 2638 header::SEC_WEBSOCKET_PROTOCOL, 2639 http::HeaderValue::from_static("nostr"), 2640 ); 2641 request.headers_mut().insert( 2642 header::HOST, 2643 http::HeaderValue::from_static("relay.radroots.test"), 2644 ); 2645 let (socket, response) = tokio_tungstenite::connect_async(request) 2646 .await 2647 .expect("websocket"); 2648 assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS); 2649 assert_eq!( 2650 response 2651 .headers() 2652 .get(header::SEC_WEBSOCKET_PROTOCOL) 2653 .expect("protocol"), 2654 "nostr" 2655 ); 2656 socket 2657 } 2658 2659 async fn send_client_value(socket: &mut TestWebSocket, value: Value) { 2660 send_client_text(socket, &value.to_string()).await; 2661 } 2662 2663 async fn send_client_text(socket: &mut TestWebSocket, value: &str) { 2664 socket 2665 .send(TungsteniteMessage::Text(value.to_owned().into())) 2666 .await 2667 .expect("send client message"); 2668 } 2669 2670 async fn send_client_binary(socket: &mut TestWebSocket, value: &[u8]) { 2671 socket 2672 .send(TungsteniteMessage::Binary(value.to_vec().into())) 2673 .await 2674 .expect("send client binary"); 2675 } 2676 2677 async fn read_relay_value(socket: &mut TestWebSocket) -> Value { 2678 let message = timeout(Duration::from_secs(1), socket.next()) 2679 .await 2680 .expect("relay message timeout") 2681 .expect("relay message") 2682 .expect("relay message result"); 2683 let TungsteniteMessage::Text(text) = message else { 2684 panic!("expected relay text message, got {message:?}"); 2685 }; 2686 serde_json::from_str(text.as_str()).expect("relay json") 2687 } 2688 2689 async fn read_auth_challenge(socket: &mut TestWebSocket) -> String { 2690 let auth = read_relay_value(socket).await; 2691 assert_eq!(auth[0], "AUTH"); 2692 auth[1].as_str().expect("auth challenge").to_owned() 2693 } 2694 2695 async fn read_websocket_close(socket: &mut TestWebSocket) { 2696 let next = timeout(Duration::from_secs(1), socket.next()) 2697 .await 2698 .expect("websocket close"); 2699 match next { 2700 Some(Ok(TungsteniteMessage::Close(_))) | None => {} 2701 other => panic!("expected websocket close, got {other:?}"), 2702 } 2703 } 2704 2705 async fn expect_no_relay_message(socket: &mut TestWebSocket) { 2706 assert!( 2707 timeout(Duration::from_millis(75), socket.next()) 2708 .await 2709 .is_err() 2710 ); 2711 } 2712 2713 async fn authenticate_client( 2714 socket: &mut TestWebSocket, 2715 fixture_key: FixtureKey, 2716 challenge: &str, 2717 created_at: u64, 2718 ) { 2719 let auth = pocket_protocol_auth_event(fixture_key, challenge, created_at); 2720 send_client_value(socket, json!(["AUTH", event_to_value(&auth)])).await; 2721 assert_ok(read_relay_value(socket).await, &auth, true, ""); 2722 } 2723 2724 async fn assert_count_message( 2725 socket: &mut TestWebSocket, 2726 subscription_id: &str, 2727 filter: Value, 2728 count: u64, 2729 ) { 2730 send_client_value(socket, json!(["COUNT", subscription_id, filter])).await; 2731 assert_eq!( 2732 read_relay_value(socket).await, 2733 json!(["COUNT", subscription_id, {"count": count}]) 2734 ); 2735 } 2736 2737 async fn assert_count_closed( 2738 socket: &mut TestWebSocket, 2739 subscription_id: &str, 2740 filter: Value, 2741 message: &str, 2742 ) { 2743 send_client_value(socket, json!(["COUNT", subscription_id, filter])).await; 2744 assert_eq!( 2745 read_relay_value(socket).await, 2746 json!(["CLOSED", subscription_id, message]) 2747 ); 2748 } 2749 2750 async fn assert_redacted_req_closed( 2751 socket: &mut TestWebSocket, 2752 subscription_id: &str, 2753 filter: Value, 2754 ) { 2755 send_client_value(socket, json!(["REQ", subscription_id, filter])).await; 2756 assert_eq!( 2757 read_relay_value(socket).await, 2758 json!([ 2759 "CLOSED", 2760 subscription_id, 2761 "auth-required: authentication required to read group events" 2762 ]) 2763 ); 2764 } 2765 2766 async fn assert_req_event_then_eose( 2767 socket: &mut TestWebSocket, 2768 subscription_id: &str, 2769 filter: Value, 2770 event: &Event, 2771 ) { 2772 send_client_value(socket, json!(["REQ", subscription_id, filter])).await; 2773 assert_live_event(read_relay_value(socket).await, subscription_id, event); 2774 assert_eq!( 2775 read_relay_value(socket).await, 2776 json!(["EOSE", subscription_id]) 2777 ); 2778 } 2779 2780 async fn assert_req_kind_tag_then_eose( 2781 socket: &mut TestWebSocket, 2782 subscription_id: &str, 2783 filter: Value, 2784 kind: u32, 2785 tag_name: &str, 2786 tag_value: &str, 2787 ) { 2788 send_client_value(socket, json!(["REQ", subscription_id, filter])).await; 2789 assert_relay_event_kind_tag( 2790 read_relay_value(socket).await, 2791 subscription_id, 2792 kind, 2793 tag_name, 2794 tag_value, 2795 ); 2796 assert_eq!( 2797 read_relay_value(socket).await, 2798 json!(["EOSE", subscription_id]) 2799 ); 2800 } 2801 2802 fn assert_relay_ok(message: RelayMessage, event: &Event, accepted: bool, reason: &str) { 2803 assert_eq!( 2804 message, 2805 RelayMessage::Ok { 2806 event_id: event.id().clone(), 2807 accepted, 2808 message: reason.to_owned() 2809 } 2810 ); 2811 } 2812 2813 fn assert_relay_count(message: RelayMessage, subscription_id: &str, count: u64) { 2814 assert_eq!( 2815 message, 2816 RelayMessage::Count { 2817 subscription_id: SubscriptionId::new(subscription_id).expect("subscription"), 2818 count, 2819 hll: None 2820 } 2821 ); 2822 } 2823 2824 fn relay_filter(value: Value) -> Filter { 2825 filter_from_value(&value).expect("filter") 2826 } 2827 2828 fn subscription_id(value: &str) -> SubscriptionId { 2829 SubscriptionId::new(value).expect("subscription") 2830 } 2831 2832 fn delete_group_extra_records(config: &PocketStoreConfig) { 2833 let store = PocketStoreHandle::open(config).expect("store"); 2834 for table in [ 2835 TANGLE_GROUP_PROJECTION_TABLE, 2836 TANGLE_GROUP_OUTBOX_TABLE, 2837 TANGLE_GROUP_CHECKPOINT_TABLE, 2838 ] { 2839 for (key, _) in store.scan_extra_records(table).expect("scan") { 2840 store.delete_extra_record(table, &key).expect("delete"); 2841 } 2842 } 2843 store.sync().expect("sync"); 2844 } 2845 2846 fn assert_notice_prefix(value: Value, prefix: &str) { 2847 assert_eq!(value[0], "NOTICE"); 2848 assert!(value[1].as_str().expect("notice").starts_with(prefix)); 2849 } 2850 2851 fn assert_ok(value: Value, event: &Event, accepted: bool, message: &str) { 2852 assert_eq!(value, json!(["OK", event.id().as_str(), accepted, message])); 2853 } 2854 2855 fn assert_ok_message_prefix(value: Value, event: &Event, accepted: bool, prefix: &str) { 2856 assert_eq!(value[0], "OK"); 2857 assert_eq!(value[1], event.id().as_str()); 2858 assert_eq!(value[2], accepted); 2859 assert!( 2860 value[3] 2861 .as_str() 2862 .is_some_and(|message| message.starts_with(prefix)) 2863 ); 2864 } 2865 2866 fn assert_live_event(value: Value, subscription_id: &str, event: &Event) { 2867 assert_eq!(value[0], "EVENT"); 2868 assert_eq!(value[1], subscription_id); 2869 assert_eq!(value[2]["id"], event.id().as_str()); 2870 } 2871 2872 fn assert_relay_event_kind_tag( 2873 value: Value, 2874 subscription_id: &str, 2875 kind: u32, 2876 tag_name: &str, 2877 tag_value: &str, 2878 ) { 2879 assert_eq!( 2880 relay_event_kind_tag(value, subscription_id, tag_name, tag_value), 2881 kind 2882 ); 2883 } 2884 2885 fn relay_event_kind_tag( 2886 value: Value, 2887 subscription_id: &str, 2888 tag_name: &str, 2889 tag_value: &str, 2890 ) -> u32 { 2891 assert_eq!(value[0], "EVENT"); 2892 assert_eq!(value[1], subscription_id); 2893 let tags = value[2]["tags"].as_array().expect("tags"); 2894 assert!(tags.iter().any(|tag| { 2895 let Some(parts) = tag.as_array() else { 2896 return false; 2897 }; 2898 parts.first().and_then(Value::as_str) == Some(tag_name) 2899 && parts.get(1).and_then(Value::as_str) == Some(tag_value) 2900 })); 2901 u32::try_from(value[2]["kind"].as_u64().expect("event kind")).expect("event kind fits u32") 2902 } 2903 2904 fn phase2_projection_with_group( 2905 group_id: &str, 2906 metadata: GroupMetadata, 2907 author: PublicKeyHex, 2908 ) -> GroupProjection { 2909 let mut projection = GroupProjection::new(); 2910 projection.put_group(GroupState::new( 2911 tangle_groups::GroupId::new(group_id).expect("group"), 2912 metadata, 2913 author, 2914 phase2_event_id("10"), 2915 phase2_order_tuple(10, "10", 1), 2916 )); 2917 projection 2918 } 2919 2920 fn phase2_metadata(private: bool, restricted: bool, hidden: bool, closed: bool) -> GroupMetadata { 2921 GroupMetadata::from_parts( 2922 GroupMetadataText::empty(), 2923 GroupMetadataFlags::new(private, restricted, hidden, closed), 2924 SupportedKinds::UnspecifiedAll, 2925 ) 2926 } 2927 2928 fn phase2_snapshot_event(kind: u32, group_id: &str) -> Event { 2929 Event::new( 2930 phase2_event_id("01"), 2931 UnsignedEvent::new( 2932 phase2_pubkey("9"), 2933 UnixTimestamp::new(1), 2934 Kind::new(kind.into()).expect("kind"), 2935 vec![Tag::from_parts("d", &[group_id]).expect("d")], 2936 "", 2937 ), 2938 SignatureHex::new(&"2".repeat(128)).expect("sig"), 2939 ) 2940 } 2941 2942 fn phase2_group_event(kind: u32, group_id: &str, author: PublicKeyHex) -> Event { 2943 Event::new( 2944 phase2_event_id("02"), 2945 UnsignedEvent::new( 2946 author, 2947 UnixTimestamp::new(2), 2948 Kind::new(kind.into()).expect("kind"), 2949 vec![Tag::from_parts("h", &[group_id]).expect("h")], 2950 "", 2951 ), 2952 SignatureHex::new(&"3".repeat(128)).expect("sig"), 2953 ) 2954 } 2955 2956 fn phase2_pubkey(suffix: &str) -> PublicKeyHex { 2957 PublicKeyHex::new(&suffix.repeat(64)).expect("pubkey") 2958 } 2959 2960 fn phase2_event_id(suffix: &str) -> EventId { 2961 let mut value = "0".repeat(64 - suffix.len()); 2962 value.push_str(suffix); 2963 EventId::new(&value).expect("id") 2964 } 2965 2966 fn phase2_order_tuple(created_at: u64, suffix: &str, offset: u64) -> ProjectionOrderTuple { 2967 ProjectionOrderTuple::new( 2968 UnixTimestamp::new(created_at), 2969 phase2_event_id(suffix), 2970 StoreOffset::new(offset), 2971 ) 2972 } 2973 2974 fn current_unix_timestamp() -> u64 { 2975 SystemTime::now() 2976 .duration_since(UNIX_EPOCH) 2977 .expect("system time") 2978 .as_secs() 2979 }