tenant_isolation.rs (32449B)
1 #![forbid(unsafe_code)] 2 3 use futures_util::{SinkExt, StreamExt}; 4 use http::header; 5 use serde_json::{Value, json}; 6 use std::{ 7 net::SocketAddr, 8 path::{Path, PathBuf}, 9 time::{Duration, SystemTime, UNIX_EPOCH}, 10 }; 11 use tangle_crypto::RelaySigner; 12 use tangle_groups::{ 13 KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, KIND_GROUP_DELETE_GROUP, KIND_GROUP_EDIT_METADATA, 14 KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, 15 }; 16 use tangle_protocol::{ 17 Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent, 18 event_from_value, event_to_value, 19 }; 20 use tangle_runtime::{ 21 config::{ 22 TangleHostRuntimeConfigSet, parse_tangle_host_runtime_config_json, 23 parse_tenant_runtime_config_json, 24 }, 25 errors::BaseRelayError, 26 host::TangleHostRuntime, 27 runtime::TangleShutdownSignal, 28 server::{TangleServeReport, serve_listener_until_shutdown}, 29 }; 30 use tangle_store_pocket::{ 31 PocketEvent, PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketTime, parse_pocket_event_json, 32 }; 33 use tangle_test_support::FixtureKey; 34 use tokio::{net::TcpListener, task::JoinHandle, time::timeout}; 35 use tokio_tungstenite::{ 36 MaybeTlsStream, WebSocketStream, connect_async, 37 tungstenite::{Message as TungsteniteMessage, client::IntoClientRequest}, 38 }; 39 40 type TestWebSocket = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>; 41 42 #[tokio::test] 43 async fn tenant_isolation_public_events_counts_hll_and_live_fanout() { 44 let host = RunningHost::start("public-events", 600).await; 45 let mut alpha = connect_socket(host.address, "alpha.relay.test").await; 46 let mut beta = connect_socket(host.address, "beta.relay.test").await; 47 let mut beta_subscriber = connect_socket(host.address, "beta.relay.test").await; 48 let target = "a".repeat(EventId::HEX_LENGTH); 49 let target_tag = Tag::from_parts("e", &[&target]).expect("target"); 50 let shared = tangle_v2_event( 51 FixtureKey::Member, 52 1_714_200_001, 53 1, 54 Vec::new(), 55 "shared tenant note", 56 ) 57 .expect("shared"); 58 let beta_extra = tangle_v2_event( 59 FixtureKey::Admin, 60 1_714_200_002, 61 1, 62 Vec::new(), 63 "beta only note", 64 ) 65 .expect("beta extra"); 66 let hll_shared = tangle_v2_event( 67 FixtureKey::Member, 68 1_714_200_003, 69 7, 70 vec![target_tag.clone()], 71 "+", 72 ) 73 .expect("hll shared"); 74 let hll_beta_extra = 75 tangle_v2_event(FixtureKey::Admin, 1_714_200_004, 7, vec![target_tag], "+") 76 .expect("hll beta extra"); 77 78 send_client_value( 79 &mut beta_subscriber, 80 json!(["REQ", "beta-live", {"kinds":[1]}]), 81 ) 82 .await; 83 assert_eq!( 84 read_relay_value(&mut beta_subscriber).await, 85 json!(["EOSE", "beta-live"]) 86 ); 87 88 assert_ok_accepted(&mut alpha, &shared).await; 89 assert_eq!( 90 collect_req_events(&mut alpha, "alpha-after-alpha", json!({"kinds":[1]})) 91 .await 92 .iter() 93 .map(|event| event.id().as_str().to_owned()) 94 .collect::<Vec<_>>(), 95 vec![shared.id().as_str().to_owned()] 96 ); 97 assert!( 98 collect_req_events(&mut beta, "beta-before-beta", json!({"kinds":[1]})) 99 .await 100 .is_empty() 101 ); 102 assert_no_relay_message(&mut beta_subscriber).await; 103 104 assert_ok_accepted(&mut beta, &shared).await; 105 let live_shared = read_relay_value(&mut beta_subscriber).await; 106 assert_eq!(live_shared[0], "EVENT"); 107 assert_eq!(live_shared[1], "beta-live"); 108 assert_eq!(live_shared[2]["id"], shared.id().as_str()); 109 110 assert_ok_accepted(&mut beta, &beta_extra).await; 111 let live_extra = read_relay_value(&mut beta_subscriber).await; 112 assert_eq!(live_extra[0], "EVENT"); 113 assert_eq!(live_extra[1], "beta-live"); 114 assert_eq!(live_extra[2]["id"], beta_extra.id().as_str()); 115 assert_ok_accepted(&mut alpha, &hll_shared).await; 116 assert_ok_accepted(&mut beta, &hll_shared).await; 117 assert_ok_accepted(&mut beta, &hll_beta_extra).await; 118 119 let alpha_events = collect_req_events(&mut alpha, "alpha-final", json!({"kinds":[1]})).await; 120 let beta_events = collect_req_events(&mut beta, "beta-final", json!({"kinds":[1]})).await; 121 assert_eq!(alpha_events.len(), 1); 122 assert_eq!(beta_events.len(), 2); 123 assert_eq!(alpha_events[0].id(), shared.id()); 124 assert!(beta_events.iter().any(|event| event.id() == shared.id())); 125 assert!( 126 beta_events 127 .iter() 128 .any(|event| event.id() == beta_extra.id()) 129 ); 130 131 let alpha_count = count_payload( 132 &mut alpha, 133 "alpha-count", 134 json!({"kinds":[7], "#e":[target]}), 135 ) 136 .await; 137 let beta_count = 138 count_payload(&mut beta, "beta-count", json!({"kinds":[7], "#e":[target]})).await; 139 assert_eq!(alpha_count["count"], 1); 140 assert_eq!(beta_count["count"], 2); 141 let alpha_hll = alpha_count["hll"].as_str().expect("alpha hll"); 142 let beta_hll = beta_count["hll"].as_str().expect("beta hll"); 143 assert_eq!(alpha_hll.len(), 512); 144 assert_eq!(beta_hll.len(), 512); 145 assert_ne!(alpha_hll, beta_hll); 146 147 host.shutdown().await; 148 } 149 150 #[tokio::test] 151 async fn tenant_isolation_group_state_generated_signatures_and_delete_are_local() { 152 let host = RunningHost::start("group-state", 600).await; 153 let mut alpha = connect_authenticated_socket( 154 host.address, 155 "alpha.relay.test", 156 "wss://alpha.relay.test", 157 FixtureKey::Owner, 158 ) 159 .await; 160 let mut beta = connect_authenticated_socket( 161 host.address, 162 "beta.relay.test", 163 "wss://beta.relay.test", 164 FixtureKey::Owner, 165 ) 166 .await; 167 let group_id = "shared-isolation"; 168 let create = 169 tangle_v2_group_create_event(FixtureKey::Owner, group_id, 1_714_200_100, &["public"]) 170 .expect("create"); 171 let alpha_metadata = tangle_v2_group_metadata_event( 172 FixtureKey::Owner, 173 group_id, 174 "Alpha Tenant Market", 175 1_714_200_101, 176 &[], 177 ) 178 .expect("alpha metadata"); 179 let beta_metadata = tangle_v2_group_metadata_event( 180 FixtureKey::Owner, 181 group_id, 182 "Beta Tenant Market", 183 1_714_200_102, 184 &[], 185 ) 186 .expect("beta metadata"); 187 let beta_member = tangle_v2_put_user_event( 188 FixtureKey::Owner, 189 group_id, 190 FixtureKey::Member, 191 1_714_200_103, 192 ) 193 .expect("beta member"); 194 let alpha_normal = 195 tangle_v2_group_event(FixtureKey::Owner, group_id, 1_714_200_104, 1, "alpha crop") 196 .expect("alpha normal"); 197 let beta_normal = 198 tangle_v2_group_event(FixtureKey::Owner, group_id, 1_714_200_105, 1, "beta crop") 199 .expect("beta normal"); 200 201 assert_ok_accepted(&mut alpha, &create).await; 202 assert_ok_accepted(&mut beta, &create).await; 203 assert_ok_accepted(&mut alpha, &alpha_metadata).await; 204 assert_ok_accepted(&mut beta, &beta_metadata).await; 205 assert_ok_accepted(&mut beta, &beta_member).await; 206 assert_ok_accepted(&mut alpha, &alpha_normal).await; 207 assert_ok_accepted(&mut beta, &beta_normal).await; 208 209 let alpha_relay_pubkey = relay_pubkey_hex(0x77); 210 let beta_relay_pubkey = relay_pubkey_hex(0x88); 211 let alpha_generated = collect_req_events( 212 &mut alpha, 213 "alpha-generated-metadata", 214 json!({"kinds":[KIND_GROUP_METADATA], "#d":[group_id]}), 215 ) 216 .await; 217 let beta_generated = collect_req_events( 218 &mut beta, 219 "beta-generated-metadata", 220 json!({"kinds":[KIND_GROUP_METADATA], "#d":[group_id]}), 221 ) 222 .await; 223 assert_eq!(alpha_generated.len(), 1); 224 assert_eq!(beta_generated.len(), 1); 225 assert_eq!( 226 tag_value(&alpha_generated[0], "name"), 227 Some("Alpha Tenant Market") 228 ); 229 assert_eq!( 230 tag_value(&beta_generated[0], "name"), 231 Some("Beta Tenant Market") 232 ); 233 assert_eq!( 234 alpha_generated[0].unsigned().pubkey().as_str(), 235 alpha_relay_pubkey 236 ); 237 assert_eq!( 238 beta_generated[0].unsigned().pubkey().as_str(), 239 beta_relay_pubkey 240 ); 241 assert_pocket_signature(&alpha_generated[0]); 242 assert_pocket_signature(&beta_generated[0]); 243 244 let alpha_admins = collect_req_events( 245 &mut alpha, 246 "alpha-generated-admins", 247 json!({"kinds":[KIND_GROUP_ADMINS], "#d":[group_id]}), 248 ) 249 .await; 250 let beta_admins = collect_req_events( 251 &mut beta, 252 "beta-generated-admins", 253 json!({"kinds":[KIND_GROUP_ADMINS], "#d":[group_id]}), 254 ) 255 .await; 256 assert_eq!(alpha_admins.len(), 1); 257 assert_eq!(beta_admins.len(), 1); 258 assert_eq!( 259 alpha_admins[0].unsigned().pubkey().as_str(), 260 alpha_relay_pubkey 261 ); 262 assert_eq!( 263 beta_admins[0].unsigned().pubkey().as_str(), 264 beta_relay_pubkey 265 ); 266 267 assert!( 268 collect_req_events( 269 &mut alpha, 270 "alpha-generated-members", 271 json!({"kinds":[KIND_GROUP_MEMBERS], "#d":[group_id]}), 272 ) 273 .await 274 .is_empty() 275 ); 276 let beta_members = collect_req_events( 277 &mut beta, 278 "beta-generated-members", 279 json!({"kinds":[KIND_GROUP_MEMBERS], "#d":[group_id]}), 280 ) 281 .await; 282 assert_eq!(beta_members.len(), 1); 283 assert_eq!( 284 beta_members[0].unsigned().pubkey().as_str(), 285 beta_relay_pubkey 286 ); 287 288 let alpha_delete = 289 tangle_v2_delete_group_event(FixtureKey::Owner, group_id, 1_714_200_106).expect("delete"); 290 let alpha_future = tangle_v2_group_event( 291 FixtureKey::Owner, 292 group_id, 293 1_714_200_107, 294 1, 295 "alpha blocked", 296 ) 297 .expect("alpha future"); 298 let beta_future = tangle_v2_group_event( 299 FixtureKey::Owner, 300 group_id, 301 1_714_200_108, 302 1, 303 "beta still open", 304 ) 305 .expect("beta future"); 306 assert_ok_accepted(&mut alpha, &alpha_delete).await; 307 let rejected = publish_event(&mut alpha, &alpha_future).await; 308 assert_eq!(rejected[0], "OK"); 309 assert_eq!(rejected[2], false); 310 assert_eq!(rejected[3], "blocked: group is deleted"); 311 assert_ok_accepted(&mut beta, &beta_future).await; 312 313 assert_eq!( 314 req_closed_message( 315 &mut alpha, 316 "alpha-deleted-normal", 317 json!({"kinds":[1], "#h":[group_id]}), 318 ) 319 .await, 320 "restricted: group is unavailable" 321 ); 322 assert_eq!( 323 collect_req_events( 324 &mut beta, 325 "beta-open-normal", 326 json!({"kinds":[1], "#h":[group_id]}), 327 ) 328 .await 329 .len(), 330 2 331 ); 332 assert_eq!( 333 count_payload( 334 &mut alpha, 335 "alpha-delete-marker", 336 json!({"kinds":[KIND_GROUP_DELETE_GROUP], "#h":[group_id]}), 337 ) 338 .await["count"], 339 1 340 ); 341 assert_eq!( 342 count_payload( 343 &mut beta, 344 "beta-delete-marker", 345 json!({"kinds":[KIND_GROUP_DELETE_GROUP], "#h":[group_id]}), 346 ) 347 .await["count"], 348 0 349 ); 350 351 let metrics = http_get(host.address, "/.well-known/tangle/metrics").await; 352 for private_value in [ 353 group_id, 354 alpha_metadata.id().as_str(), 355 beta_metadata.id().as_str(), 356 alpha_normal.id().as_str(), 357 beta_normal.id().as_str(), 358 FixtureKey::Owner.public_key().as_str(), 359 "alpha crop", 360 "beta crop", 361 &"77".repeat(32), 362 &"88".repeat(32), 363 ] { 364 assert!(!metrics.contains(private_value)); 365 } 366 367 host.shutdown().await; 368 } 369 370 #[tokio::test] 371 async fn tenant_isolation_rate_limits_are_tenant_local() { 372 let host = RunningHost::start("rate-limits", 1).await; 373 let mut alpha = connect_socket(host.address, "alpha.relay.test").await; 374 let mut beta = connect_socket(host.address, "beta.relay.test").await; 375 let first = tangle_v2_event( 376 FixtureKey::Member, 377 1_714_200_200, 378 1, 379 Vec::new(), 380 "alpha first", 381 ) 382 .expect("first"); 383 let second = tangle_v2_event( 384 FixtureKey::Admin, 385 1_714_200_201, 386 1, 387 Vec::new(), 388 "alpha second", 389 ) 390 .expect("second"); 391 let beta_first = tangle_v2_event( 392 FixtureKey::Outsider, 393 1_714_200_202, 394 1, 395 Vec::new(), 396 "beta first", 397 ) 398 .expect("beta first"); 399 400 assert_ok_accepted(&mut alpha, &first).await; 401 let rejected = publish_event(&mut alpha, &second).await; 402 assert_eq!(rejected[0], "OK"); 403 assert_eq!(rejected[2], false); 404 let message = rejected[3].as_str().expect("message"); 405 assert!(message.starts_with("rate-limited: ")); 406 assert!(message.contains("rate limit exceeded until ")); 407 assert_ok_accepted(&mut beta, &beta_first).await; 408 409 host.shutdown().await; 410 } 411 412 #[test] 413 fn tenant_isolation_rejects_shared_pocket_store_config() { 414 let root = temp_root("shared-store"); 415 let _ = std::fs::remove_dir_all(&root); 416 let host = parse_tangle_host_runtime_config_json(&host_config_value().to_string()) 417 .expect("host config"); 418 let alpha = parse_tenant_runtime_config_json( 419 &tenant_config_value( 420 &root, 421 TenantFixture { 422 tenant_id: "alpha", 423 tenant_schema: "alpha_schema", 424 host: "alpha.relay.test", 425 relay_url: "wss://alpha.relay.test", 426 name: "Alpha Relay", 427 relay_secret_byte: 0x77, 428 pocket_suffix: "shared", 429 }, 430 600, 431 ) 432 .to_string(), 433 ) 434 .expect("alpha"); 435 let beta = parse_tenant_runtime_config_json( 436 &tenant_config_value( 437 &root, 438 TenantFixture { 439 tenant_id: "beta", 440 tenant_schema: "beta_schema", 441 host: "beta.relay.test", 442 relay_url: "wss://beta.relay.test", 443 name: "Beta Relay", 444 relay_secret_byte: 0x88, 445 pocket_suffix: "shared", 446 }, 447 600, 448 ) 449 .to_string(), 450 ) 451 .expect("beta"); 452 let error = TangleHostRuntimeConfigSet::new(host, vec![alpha, beta]).expect_err("shared store"); 453 454 assert!( 455 error 456 .message() 457 .contains("duplicate tenant pocket data directory") 458 ); 459 let _ = std::fs::remove_dir_all(root); 460 } 461 462 struct RunningHost { 463 root: PathBuf, 464 address: SocketAddr, 465 shutdown_signal: TangleShutdownSignal, 466 task: JoinHandle<Result<TangleServeReport, BaseRelayError>>, 467 } 468 469 impl RunningHost { 470 async fn start(name: &str, event_per_ip_max_hits: u64) -> Self { 471 let root = temp_root(name); 472 let _ = std::fs::remove_dir_all(&root); 473 let runtime = host_runtime(&root, event_per_ip_max_hits); 474 let shutdown_signal = runtime.shutdown_signal().clone(); 475 let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); 476 let address = listener.local_addr().expect("address"); 477 let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener)); 478 Self { 479 root, 480 address, 481 shutdown_signal, 482 task, 483 } 484 } 485 486 async fn shutdown(self) { 487 self.shutdown_signal.request_shutdown(); 488 let report = timeout(Duration::from_secs(1), self.task) 489 .await 490 .expect("server shutdown") 491 .expect("task") 492 .expect("serve"); 493 assert_eq!(report.listen_addr(), self.address); 494 let _ = std::fs::remove_dir_all(self.root); 495 } 496 } 497 498 async fn connect_socket(address: SocketAddr, host: &str) -> TestWebSocket { 499 let mut request = format!("ws://{address}/") 500 .into_client_request() 501 .expect("request"); 502 request.headers_mut().insert( 503 header::SEC_WEBSOCKET_PROTOCOL, 504 http::HeaderValue::from_static("nostr"), 505 ); 506 request.headers_mut().insert( 507 header::HOST, 508 http::HeaderValue::from_str(host).expect("host"), 509 ); 510 let (mut socket, response) = connect_async(request).await.expect("websocket"); 511 assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS); 512 let challenge = read_relay_value(&mut socket).await; 513 assert_eq!(challenge[0], "AUTH"); 514 socket 515 } 516 517 async fn connect_authenticated_socket( 518 address: SocketAddr, 519 host: &str, 520 relay_url: &str, 521 key: FixtureKey, 522 ) -> TestWebSocket { 523 let mut request = format!("ws://{address}/") 524 .into_client_request() 525 .expect("request"); 526 request.headers_mut().insert( 527 header::SEC_WEBSOCKET_PROTOCOL, 528 http::HeaderValue::from_static("nostr"), 529 ); 530 request.headers_mut().insert( 531 header::HOST, 532 http::HeaderValue::from_str(host).expect("host"), 533 ); 534 let (mut socket, response) = connect_async(request).await.expect("websocket"); 535 assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS); 536 let challenge = read_relay_value(&mut socket).await; 537 assert_eq!(challenge[0], "AUTH"); 538 let auth = auth_event_for_relay( 539 key, 540 challenge[1].as_str().expect("challenge"), 541 current_unix_timestamp(), 542 relay_url, 543 ) 544 .expect("auth"); 545 send_client_value(&mut socket, json!(["AUTH", event_to_value(&auth)])).await; 546 assert_eq!( 547 read_relay_value(&mut socket).await, 548 json!(["OK", auth.id().as_str(), true, ""]) 549 ); 550 socket 551 } 552 553 async fn assert_ok_accepted(socket: &mut TestWebSocket, event: &Event) { 554 assert_eq!( 555 publish_event(socket, event).await, 556 json!(["OK", event.id().as_str(), true, ""]) 557 ); 558 } 559 560 async fn publish_event(socket: &mut TestWebSocket, event: &Event) -> Value { 561 send_client_value(socket, json!(["EVENT", event_to_value(event)])).await; 562 read_relay_value(socket).await 563 } 564 565 async fn collect_req_events( 566 socket: &mut TestWebSocket, 567 subscription_id: &str, 568 filter: Value, 569 ) -> Vec<Event> { 570 send_client_value(socket, json!(["REQ", subscription_id, filter])).await; 571 let mut events = Vec::new(); 572 loop { 573 let message = read_relay_value(socket).await; 574 match message[0].as_str().expect("message kind") { 575 "EVENT" => { 576 assert_eq!(message[1], subscription_id); 577 events.push(event_from_value(&message[2]).expect("event")); 578 } 579 "EOSE" => { 580 assert_eq!(message[1], subscription_id); 581 send_client_value(socket, json!(["CLOSE", subscription_id])).await; 582 break; 583 } 584 "CLOSED" => panic!("{message}"), 585 other => panic!("{other}: {message}"), 586 } 587 } 588 events 589 } 590 591 async fn req_closed_message( 592 socket: &mut TestWebSocket, 593 subscription_id: &str, 594 filter: Value, 595 ) -> String { 596 send_client_value(socket, json!(["REQ", subscription_id, filter])).await; 597 let message = read_relay_value(socket).await; 598 assert_eq!(message[0], "CLOSED"); 599 assert_eq!(message[1], subscription_id); 600 message[2].as_str().expect("closed message").to_owned() 601 } 602 603 async fn count_payload(socket: &mut TestWebSocket, subscription_id: &str, filter: Value) -> Value { 604 send_client_value(socket, json!(["COUNT", subscription_id, filter])).await; 605 let message = read_relay_value(socket).await; 606 assert_eq!(message[0], "COUNT", "{message}"); 607 assert_eq!(message[1], subscription_id); 608 message[2].clone() 609 } 610 611 async fn send_client_value(socket: &mut TestWebSocket, value: Value) { 612 socket 613 .send(TungsteniteMessage::Text(value.to_string().into())) 614 .await 615 .expect("send client message"); 616 } 617 618 async fn read_relay_value(socket: &mut TestWebSocket) -> Value { 619 let message = timeout(Duration::from_secs(1), socket.next()) 620 .await 621 .expect("relay message timeout") 622 .expect("relay message") 623 .expect("relay message result"); 624 let TungsteniteMessage::Text(text) = message else { 625 panic!("expected relay text message, got {message:?}"); 626 }; 627 serde_json::from_str(text.as_str()).expect("relay json") 628 } 629 630 async fn assert_no_relay_message(socket: &mut TestWebSocket) { 631 assert!( 632 timeout(Duration::from_millis(100), socket.next()) 633 .await 634 .is_err() 635 ); 636 } 637 638 async fn http_get(address: SocketAddr, path: &str) -> String { 639 let path = path.to_owned(); 640 tokio::task::spawn_blocking(move || -> std::io::Result<String> { 641 use std::io::{Read, Write}; 642 use std::net::TcpStream; 643 644 let mut stream = TcpStream::connect_timeout(&address, Duration::from_millis(500))?; 645 stream.set_read_timeout(Some(Duration::from_millis(500)))?; 646 stream.set_write_timeout(Some(Duration::from_millis(500)))?; 647 let request = 648 format!("GET {path} HTTP/1.1\r\nHost: alpha.relay.test\r\nConnection: close\r\n\r\n"); 649 stream.write_all(request.as_bytes())?; 650 let mut response = String::new(); 651 stream.read_to_string(&mut response)?; 652 Ok(response 653 .split("\r\n\r\n") 654 .nth(1) 655 .unwrap_or_default() 656 .to_owned()) 657 }) 658 .await 659 .expect("http task") 660 .expect("http get") 661 } 662 663 fn auth_event_for_relay( 664 key: FixtureKey, 665 challenge: &str, 666 created_at: u64, 667 relay_url: &str, 668 ) -> Result<Event, String> { 669 tangle_v2_event( 670 key, 671 created_at, 672 22_242, 673 vec![ 674 Tag::from_parts("relay", &[relay_url])?, 675 Tag::from_parts("challenge", &[challenge])?, 676 ], 677 "", 678 ) 679 } 680 681 fn tangle_v2_event( 682 key: FixtureKey, 683 created_at: u64, 684 kind: u64, 685 tags: Vec<Tag>, 686 content: &str, 687 ) -> Result<Event, String> { 688 let event = isolation_pocket_event(key, created_at, kind, tags, content); 689 isolation_pocket_event_to_protocol(&event) 690 } 691 692 fn tangle_v2_group_create_event( 693 key: FixtureKey, 694 group_id: &str, 695 created_at: u64, 696 flags: &[&str], 697 ) -> Result<Event, String> { 698 let mut tags = vec![ 699 Tag::from_parts("h", &[group_id])?, 700 Tag::from_parts("name", &[group_id])?, 701 ]; 702 for flag in flags { 703 tags.push(Tag::from_parts(flag, &[])?); 704 } 705 tangle_v2_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "") 706 } 707 708 fn tangle_v2_group_metadata_event( 709 key: FixtureKey, 710 group_id: &str, 711 name: &str, 712 created_at: u64, 713 flags: &[&str], 714 ) -> Result<Event, String> { 715 let mut tags = vec![ 716 Tag::from_parts("h", &[group_id])?, 717 Tag::from_parts("name", &[name])?, 718 ]; 719 for flag in flags { 720 tags.push(Tag::from_parts(flag, &[])?); 721 } 722 tangle_v2_event(key, created_at, KIND_GROUP_EDIT_METADATA.into(), tags, "") 723 } 724 725 fn tangle_v2_put_user_event( 726 key: FixtureKey, 727 group_id: &str, 728 target: FixtureKey, 729 created_at: u64, 730 ) -> Result<Event, String> { 731 let target = target.public_key(); 732 tangle_v2_event( 733 key, 734 created_at, 735 KIND_GROUP_PUT_USER.into(), 736 vec![ 737 Tag::from_parts("h", &[group_id])?, 738 Tag::from_parts("p", &[target.as_str()])?, 739 ], 740 "", 741 ) 742 } 743 744 fn tangle_v2_delete_group_event( 745 key: FixtureKey, 746 group_id: &str, 747 created_at: u64, 748 ) -> Result<Event, String> { 749 tangle_v2_group_event( 750 key, 751 group_id, 752 created_at, 753 KIND_GROUP_DELETE_GROUP.into(), 754 "", 755 ) 756 } 757 758 fn tangle_v2_group_event( 759 key: FixtureKey, 760 group_id: &str, 761 created_at: u64, 762 kind: u64, 763 content: &str, 764 ) -> Result<Event, String> { 765 tangle_v2_event( 766 key, 767 created_at, 768 kind, 769 vec![Tag::from_parts("h", &[group_id])?], 770 content, 771 ) 772 } 773 774 fn isolation_pocket_event( 775 key: FixtureKey, 776 created_at: u64, 777 kind: u64, 778 tags: Vec<Tag>, 779 content: &str, 780 ) -> PocketOwnedEvent { 781 let tags = isolation_pocket_tags_from_protocol(&tags); 782 let secret = format!("{:02x}", fixture_secret_byte(key)).repeat(32); 783 RelaySigner::from_secret_hex(&secret) 784 .expect("signer") 785 .sign_pocket_event( 786 PocketKind::from_u16(u16::try_from(kind).expect("pocket kind")), 787 &tags, 788 PocketTime::from_u64(created_at), 789 content.as_bytes(), 790 ) 791 .expect("pocket event") 792 } 793 794 fn isolation_pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags { 795 let parts = tags 796 .iter() 797 .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>()) 798 .collect::<Vec<_>>(); 799 PocketOwnedTags::new(&parts).expect("pocket tags") 800 } 801 802 fn isolation_pocket_event_to_protocol(event: &PocketEvent) -> Result<Event, String> { 803 let tags = event 804 .tags() 805 .map_err(|error| error.to_string())? 806 .iter() 807 .map(|tag| { 808 Tag::new( 809 tag.map(|value| { 810 std::str::from_utf8(value) 811 .map(str::to_owned) 812 .map_err(|error| error.to_string()) 813 }) 814 .collect::<Result<Vec<_>, _>>()?, 815 ) 816 .map_err(|error| error.to_string()) 817 }) 818 .collect::<Result<Vec<_>, _>>()?; 819 Ok(Event::new( 820 EventId::new(&event.id().as_hex_string()).map_err(|error| error.to_string())?, 821 UnsignedEvent::new( 822 PublicKeyHex::new(&event.pubkey().as_hex_string()) 823 .map_err(|error| error.to_string())?, 824 UnixTimestamp::new(event.created_at().as_u64()), 825 Kind::new(u64::from(event.kind().as_u16())).map_err(|error| error.to_string())?, 826 tags, 827 std::str::from_utf8(event.content()).map_err(|error| error.to_string())?, 828 ), 829 SignatureHex::new(&event.sig().to_string()).map_err(|error| error.to_string())?, 830 )) 831 } 832 833 fn fixture_secret_byte(key: FixtureKey) -> u8 { 834 match key { 835 FixtureKey::Relay => 9, 836 FixtureKey::Owner => 10, 837 FixtureKey::Admin => 11, 838 FixtureKey::Member => 12, 839 FixtureKey::Outsider => 13, 840 } 841 } 842 843 fn host_runtime(root: &Path, event_per_ip_max_hits: u64) -> TangleHostRuntime { 844 let host = parse_tangle_host_runtime_config_json(&host_config_value().to_string()) 845 .expect("host config"); 846 let tenants = [ 847 TenantFixture { 848 tenant_id: "alpha", 849 tenant_schema: "alpha_schema", 850 host: "alpha.relay.test", 851 relay_url: "wss://alpha.relay.test", 852 name: "Alpha Relay", 853 relay_secret_byte: 0x77, 854 pocket_suffix: "alpha", 855 }, 856 TenantFixture { 857 tenant_id: "beta", 858 tenant_schema: "beta_schema", 859 host: "beta.relay.test", 860 relay_url: "wss://beta.relay.test", 861 name: "Beta Relay", 862 relay_secret_byte: 0x88, 863 pocket_suffix: "beta", 864 }, 865 ] 866 .into_iter() 867 .map(|fixture| { 868 parse_tenant_runtime_config_json( 869 &tenant_config_value(root, fixture, event_per_ip_max_hits).to_string(), 870 ) 871 .expect("tenant config") 872 }) 873 .collect::<Vec<_>>(); 874 let config = TangleHostRuntimeConfigSet::new(host, tenants).expect("config set"); 875 TangleHostRuntime::open(config).expect("host runtime") 876 } 877 878 fn host_config_value() -> Value { 879 json!({ 880 "listen_addr": "127.0.0.1:0", 881 "tenant_config_dir": "tenants", 882 "limits": { 883 "max_total_connections": 64, 884 "max_total_subscriptions": 512, 885 "tenant_startup_concurrency": 4 886 } 887 }) 888 } 889 890 struct TenantFixture<'a> { 891 tenant_id: &'a str, 892 tenant_schema: &'a str, 893 host: &'a str, 894 relay_url: &'a str, 895 name: &'a str, 896 relay_secret_byte: u8, 897 pocket_suffix: &'a str, 898 } 899 900 fn tenant_config_value( 901 root: &Path, 902 fixture: TenantFixture<'_>, 903 event_per_ip_max_hits: u64, 904 ) -> Value { 905 let relay_secret = format!("{:02x}", fixture.relay_secret_byte).repeat(32); 906 json!({ 907 "tenant_id": fixture.tenant_id, 908 "tenant_schema": fixture.tenant_schema, 909 "host": fixture.host, 910 "relay_url": fixture.relay_url, 911 "info": { 912 "name": fixture.name 913 }, 914 "pocket": { 915 "data_directory": root.join(fixture.pocket_suffix), 916 "sync_policy": "flush_on_shutdown", 917 }, 918 "pocket_query": { 919 "allow_scraping": false, 920 "allow_scrape_if_limited_to": 100, 921 "allow_scrape_if_max_seconds": 3600 922 }, 923 "groups": { 924 "enabled": true, 925 "canonical_relay_url": fixture.relay_url, 926 "relay_secret": relay_secret, 927 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()], 928 "admin_pubkeys": [FixtureKey::Admin.public_key().as_str()] 929 }, 930 "auth": { 931 "challenge_ttl_seconds": 300, 932 "created_at_skew_seconds": 600 933 }, 934 "limits": { 935 "max_message_length": 1048576, 936 "max_subid_length": 64, 937 "max_subscriptions_per_connection": 64, 938 "max_filters_per_request": 10, 939 "max_tag_values_per_filter": 100, 940 "max_query_complexity": 2048, 941 "max_limit": 500, 942 "default_limit": 100, 943 "max_event_tags": 200, 944 "max_content_length": 65536, 945 "broadcast_channel_capacity": 16, 946 "per_connection_outbound_queue": 16 947 }, 948 "rate_limits": { 949 "auth": { 950 "per_ip": {"window_seconds": 60, "max_hits": 120}, 951 "per_pubkey": {"window_seconds": 60, "max_hits": 30}, 952 "failures": {"window_seconds": 300, "max_hits": 5}, 953 "failures_per_ip": {"window_seconds": 300, "max_hits": 20} 954 }, 955 "event": { 956 "per_ip": {"window_seconds": 60, "max_hits": event_per_ip_max_hits}, 957 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 958 "per_kind": {"window_seconds": 60, "max_hits": 1000} 959 }, 960 "group": { 961 "write_per_ip": {"window_seconds": 60, "max_hits": 300}, 962 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, 963 "write_per_group": {"window_seconds": 60, "max_hits": 90}, 964 "write_per_kind": {"window_seconds": 60, "max_hits": 300}, 965 "join_flow": {"window_seconds": 300, "max_hits": 10}, 966 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} 967 }, 968 "req": { 969 "per_ip": {"window_seconds": 60, "max_hits": 600}, 970 "per_connection": {"window_seconds": 60, "max_hits": 120}, 971 "per_pubkey": {"window_seconds": 60, "max_hits": 240}, 972 "per_group": {"window_seconds": 60, "max_hits": 240}, 973 "per_kind": {"window_seconds": 60, "max_hits": 500}, 974 "broad": {"window_seconds": 60, "max_hits": 30} 975 }, 976 "count": { 977 "per_ip": {"window_seconds": 60, "max_hits": 300}, 978 "per_connection": {"window_seconds": 60, "max_hits": 60}, 979 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 980 "per_group": {"window_seconds": 60, "max_hits": 120}, 981 "per_kind": {"window_seconds": 60, "max_hits": 240}, 982 "broad": {"window_seconds": 60, "max_hits": 20} 983 } 984 } 985 }) 986 } 987 988 fn relay_pubkey_hex(secret_byte: u8) -> String { 989 RelaySigner::from_secret_hex(&format!("{secret_byte:02x}").repeat(32)) 990 .expect("relay signer") 991 .public_key() 992 .as_str() 993 .to_owned() 994 } 995 996 fn tag_value<'a>(event: &'a Event, tag_name: &str) -> Option<&'a str> { 997 event 998 .unsigned() 999 .tags() 1000 .iter() 1001 .find(|tag| tag.name().as_str() == tag_name) 1002 .and_then(|tag| tag.values().get(1)) 1003 .map(String::as_str) 1004 } 1005 1006 fn assert_pocket_signature(event: &Event) { 1007 let raw = serde_json::to_vec(&event_to_value(event)).expect("event json"); 1008 parse_pocket_event_json(&raw) 1009 .expect("pocket event") 1010 .verify() 1011 .expect("pocket signature"); 1012 } 1013 1014 fn current_unix_timestamp() -> u64 { 1015 SystemTime::now() 1016 .duration_since(UNIX_EPOCH) 1017 .expect("system time") 1018 .as_secs() 1019 } 1020 1021 fn temp_root(name: &str) -> PathBuf { 1022 std::env::temp_dir().join(format!( 1023 "tangle-tenant-isolation-{name}-{}", 1024 std::process::id() 1025 )) 1026 }