tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

tenant_isolation.rs (32449B)


      1 #![forbid(unsafe_code)]
      2 
      3 use futures_util::{SinkExt, StreamExt};
      4 use http::header;
      5 use serde_json::{Value, json};
      6 use std::{
      7     net::SocketAddr,
      8     path::{Path, PathBuf},
      9     time::{Duration, SystemTime, UNIX_EPOCH},
     10 };
     11 use tangle_crypto::RelaySigner;
     12 use tangle_groups::{
     13     KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, KIND_GROUP_DELETE_GROUP, KIND_GROUP_EDIT_METADATA,
     14     KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER,
     15 };
     16 use tangle_protocol::{
     17     Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent,
     18     event_from_value, event_to_value,
     19 };
     20 use tangle_runtime::{
     21     config::{
     22         TangleHostRuntimeConfigSet, parse_tangle_host_runtime_config_json,
     23         parse_tenant_runtime_config_json,
     24     },
     25     errors::BaseRelayError,
     26     host::TangleHostRuntime,
     27     runtime::TangleShutdownSignal,
     28     server::{TangleServeReport, serve_listener_until_shutdown},
     29 };
     30 use tangle_store_pocket::{
     31     PocketEvent, PocketKind, PocketOwnedEvent, PocketOwnedTags, PocketTime, parse_pocket_event_json,
     32 };
     33 use tangle_test_support::FixtureKey;
     34 use tokio::{net::TcpListener, task::JoinHandle, time::timeout};
     35 use tokio_tungstenite::{
     36     MaybeTlsStream, WebSocketStream, connect_async,
     37     tungstenite::{Message as TungsteniteMessage, client::IntoClientRequest},
     38 };
     39 
/// Client-side WebSocket stream type returned by the connection helpers in
/// this file and passed to every send/read helper.
type TestWebSocket = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
     41 
/// Publishes plain events through the `alpha.relay.test` and
/// `beta.relay.test` hosts of a single listener and checks that stored
/// events, live subscription fan-out, and `COUNT` results (including the
/// `hll` payload) observed through one host are unaffected by what was
/// published through the other.
#[tokio::test]
async fn tenant_isolation_public_events_counts_hll_and_live_fanout() {
    // 600 is forwarded as `event_per_ip_max_hits`.
    let host = RunningHost::start("public-events", 600).await;
    let mut alpha = connect_socket(host.address, "alpha.relay.test").await;
    let mut beta = connect_socket(host.address, "beta.relay.test").await;
    let mut beta_subscriber = connect_socket(host.address, "beta.relay.test").await;
    // Synthetic `e`-tag target: the character `a` repeated `EventId::HEX_LENGTH` times.
    let target = "a".repeat(EventId::HEX_LENGTH);
    let target_tag = Tag::from_parts("e", &[&target]).expect("target");
    // Kind-1 event that is published through BOTH hosts.
    let shared = tangle_v2_event(
        FixtureKey::Member,
        1_714_200_001,
        1,
        Vec::new(),
        "shared tenant note",
    )
    .expect("shared");
    // Kind-1 event that is published through beta only.
    let beta_extra = tangle_v2_event(
        FixtureKey::Admin,
        1_714_200_002,
        1,
        Vec::new(),
        "beta only note",
    )
    .expect("beta extra");
    // Kind-7 events carrying the `e` tag; used for the COUNT/HLL checks below.
    let hll_shared = tangle_v2_event(
        FixtureKey::Member,
        1_714_200_003,
        7,
        vec![target_tag.clone()],
        "+",
    )
    .expect("hll shared");
    let hll_beta_extra =
        tangle_v2_event(FixtureKey::Admin, 1_714_200_004, 7, vec![target_tag], "+")
            .expect("hll beta extra");

    // Open a long-lived kind-1 subscription on beta; nothing is stored yet,
    // so the first reply must be EOSE.
    send_client_value(
        &mut beta_subscriber,
        json!(["REQ", "beta-live", {"kinds":[1]}]),
    )
    .await;
    assert_eq!(
        read_relay_value(&mut beta_subscriber).await,
        json!(["EOSE", "beta-live"])
    );

    // Publishing through alpha must be visible on alpha only: beta's stored
    // query stays empty and the beta live subscription stays silent.
    assert_ok_accepted(&mut alpha, &shared).await;
    assert_eq!(
        collect_req_events(&mut alpha, "alpha-after-alpha", json!({"kinds":[1]}))
            .await
            .iter()
            .map(|event| event.id().as_str().to_owned())
            .collect::<Vec<_>>(),
        vec![shared.id().as_str().to_owned()]
    );
    assert!(
        collect_req_events(&mut beta, "beta-before-beta", json!({"kinds":[1]}))
            .await
            .is_empty()
    );
    assert_no_relay_message(&mut beta_subscriber).await;

    // The same event is accepted again through beta and is now delivered to
    // the beta live subscription.
    assert_ok_accepted(&mut beta, &shared).await;
    let live_shared = read_relay_value(&mut beta_subscriber).await;
    assert_eq!(live_shared[0], "EVENT");
    assert_eq!(live_shared[1], "beta-live");
    assert_eq!(live_shared[2]["id"], shared.id().as_str());

    assert_ok_accepted(&mut beta, &beta_extra).await;
    let live_extra = read_relay_value(&mut beta_subscriber).await;
    assert_eq!(live_extra[0], "EVENT");
    assert_eq!(live_extra[1], "beta-live");
    assert_eq!(live_extra[2]["id"], beta_extra.id().as_str());
    // Kind-7 events: one through alpha, two through beta.
    assert_ok_accepted(&mut alpha, &hll_shared).await;
    assert_ok_accepted(&mut beta, &hll_shared).await;
    assert_ok_accepted(&mut beta, &hll_beta_extra).await;

    // Final stored kind-1 view: alpha holds one event, beta holds two.
    let alpha_events = collect_req_events(&mut alpha, "alpha-final", json!({"kinds":[1]})).await;
    let beta_events = collect_req_events(&mut beta, "beta-final", json!({"kinds":[1]})).await;
    assert_eq!(alpha_events.len(), 1);
    assert_eq!(beta_events.len(), 2);
    assert_eq!(alpha_events[0].id(), shared.id());
    assert!(beta_events.iter().any(|event| event.id() == shared.id()));
    assert!(
        beta_events
            .iter()
            .any(|event| event.id() == beta_extra.id())
    );

    // COUNT over the kind-7 events tagged with `target`, per host.
    let alpha_count = count_payload(
        &mut alpha,
        "alpha-count",
        json!({"kinds":[7], "#e":[target]}),
    )
    .await;
    let beta_count =
        count_payload(&mut beta, "beta-count", json!({"kinds":[7], "#e":[target]})).await;
    assert_eq!(alpha_count["count"], 1);
    assert_eq!(beta_count["count"], 2);
    // Both payloads carry a 512-character `hll` string, and the two strings
    // must differ between the hosts.
    let alpha_hll = alpha_count["hll"].as_str().expect("alpha hll");
    let beta_hll = beta_count["hll"].as_str().expect("beta hll");
    assert_eq!(alpha_hll.len(), 512);
    assert_eq!(beta_hll.len(), 512);
    assert_ne!(alpha_hll, beta_hll);

    host.shutdown().await;
}
    149 
/// Creates a group with the same id through both hosts and checks that
/// relay-generated group events, membership, group deletion, and the
/// metrics endpoint output do not cross between `alpha.relay.test` and
/// `beta.relay.test`.
#[tokio::test]
async fn tenant_isolation_group_state_generated_signatures_and_delete_are_local() {
    let host = RunningHost::start("group-state", 600).await;
    // Both sockets authenticate with the same fixture key (`Owner`), each
    // against its own relay URL.
    let mut alpha = connect_authenticated_socket(
        host.address,
        "alpha.relay.test",
        "wss://alpha.relay.test",
        FixtureKey::Owner,
    )
    .await;
    let mut beta = connect_authenticated_socket(
        host.address,
        "beta.relay.test",
        "wss://beta.relay.test",
        FixtureKey::Owner,
    )
    .await;
    // The same group id is deliberately used on both hosts.
    let group_id = "shared-isolation";
    let create =
        tangle_v2_group_create_event(FixtureKey::Owner, group_id, 1_714_200_100, &["public"])
            .expect("create");
    let alpha_metadata = tangle_v2_group_metadata_event(
        FixtureKey::Owner,
        group_id,
        "Alpha Tenant Market",
        1_714_200_101,
        &[],
    )
    .expect("alpha metadata");
    let beta_metadata = tangle_v2_group_metadata_event(
        FixtureKey::Owner,
        group_id,
        "Beta Tenant Market",
        1_714_200_102,
        &[],
    )
    .expect("beta metadata");
    // A member is added through beta only.
    let beta_member = tangle_v2_put_user_event(
        FixtureKey::Owner,
        group_id,
        FixtureKey::Member,
        1_714_200_103,
    )
    .expect("beta member");
    let alpha_normal =
        tangle_v2_group_event(FixtureKey::Owner, group_id, 1_714_200_104, 1, "alpha crop")
            .expect("alpha normal");
    let beta_normal =
        tangle_v2_group_event(FixtureKey::Owner, group_id, 1_714_200_105, 1, "beta crop")
            .expect("beta normal");

    assert_ok_accepted(&mut alpha, &create).await;
    assert_ok_accepted(&mut beta, &create).await;
    assert_ok_accepted(&mut alpha, &alpha_metadata).await;
    assert_ok_accepted(&mut beta, &beta_metadata).await;
    assert_ok_accepted(&mut beta, &beta_member).await;
    assert_ok_accepted(&mut alpha, &alpha_normal).await;
    assert_ok_accepted(&mut beta, &beta_normal).await;

    // 0x77 / 0x88 match the `relay_secret_byte` values of the alpha and beta
    // fixtures in `host_runtime`.
    // NOTE(review): `relay_pubkey_hex` is defined outside this view; presumably
    // it derives the relay public key from that secret byte — confirm.
    let alpha_relay_pubkey = relay_pubkey_hex(0x77);
    let beta_relay_pubkey = relay_pubkey_hex(0x88);
    // Group metadata events: one per host, carrying that host's name and
    // authored by that host's relay key.
    let alpha_generated = collect_req_events(
        &mut alpha,
        "alpha-generated-metadata",
        json!({"kinds":[KIND_GROUP_METADATA], "#d":[group_id]}),
    )
    .await;
    let beta_generated = collect_req_events(
        &mut beta,
        "beta-generated-metadata",
        json!({"kinds":[KIND_GROUP_METADATA], "#d":[group_id]}),
    )
    .await;
    assert_eq!(alpha_generated.len(), 1);
    assert_eq!(beta_generated.len(), 1);
    assert_eq!(
        tag_value(&alpha_generated[0], "name"),
        Some("Alpha Tenant Market")
    );
    assert_eq!(
        tag_value(&beta_generated[0], "name"),
        Some("Beta Tenant Market")
    );
    assert_eq!(
        alpha_generated[0].unsigned().pubkey().as_str(),
        alpha_relay_pubkey
    );
    assert_eq!(
        beta_generated[0].unsigned().pubkey().as_str(),
        beta_relay_pubkey
    );
    assert_pocket_signature(&alpha_generated[0]);
    assert_pocket_signature(&beta_generated[0]);

    // Group admins events: one per host, authored by that host's relay key.
    let alpha_admins = collect_req_events(
        &mut alpha,
        "alpha-generated-admins",
        json!({"kinds":[KIND_GROUP_ADMINS], "#d":[group_id]}),
    )
    .await;
    let beta_admins = collect_req_events(
        &mut beta,
        "beta-generated-admins",
        json!({"kinds":[KIND_GROUP_ADMINS], "#d":[group_id]}),
    )
    .await;
    assert_eq!(alpha_admins.len(), 1);
    assert_eq!(beta_admins.len(), 1);
    assert_eq!(
        alpha_admins[0].unsigned().pubkey().as_str(),
        alpha_relay_pubkey
    );
    assert_eq!(
        beta_admins[0].unsigned().pubkey().as_str(),
        beta_relay_pubkey
    );

    // The put-user event went through beta only, so alpha has no members
    // event while beta has exactly one.
    assert!(
        collect_req_events(
            &mut alpha,
            "alpha-generated-members",
            json!({"kinds":[KIND_GROUP_MEMBERS], "#d":[group_id]}),
        )
        .await
        .is_empty()
    );
    let beta_members = collect_req_events(
        &mut beta,
        "beta-generated-members",
        json!({"kinds":[KIND_GROUP_MEMBERS], "#d":[group_id]}),
    )
    .await;
    assert_eq!(beta_members.len(), 1);
    assert_eq!(
        beta_members[0].unsigned().pubkey().as_str(),
        beta_relay_pubkey
    );

    // Delete the group through alpha only; later group events must be
    // rejected on alpha and still accepted on beta.
    let alpha_delete =
        tangle_v2_delete_group_event(FixtureKey::Owner, group_id, 1_714_200_106).expect("delete");
    let alpha_future = tangle_v2_group_event(
        FixtureKey::Owner,
        group_id,
        1_714_200_107,
        1,
        "alpha blocked",
    )
    .expect("alpha future");
    let beta_future = tangle_v2_group_event(
        FixtureKey::Owner,
        group_id,
        1_714_200_108,
        1,
        "beta still open",
    )
    .expect("beta future");
    assert_ok_accepted(&mut alpha, &alpha_delete).await;
    let rejected = publish_event(&mut alpha, &alpha_future).await;
    assert_eq!(rejected[0], "OK");
    assert_eq!(rejected[2], false);
    assert_eq!(rejected[3], "blocked: group is deleted");
    assert_ok_accepted(&mut beta, &beta_future).await;

    // Reading the deleted group on alpha is refused with CLOSED, while beta
    // returns both of its kind-1 group events (`beta_normal`, `beta_future`).
    assert_eq!(
        req_closed_message(
            &mut alpha,
            "alpha-deleted-normal",
            json!({"kinds":[1], "#h":[group_id]}),
        )
        .await,
        "restricted: group is unavailable"
    );
    assert_eq!(
        collect_req_events(
            &mut beta,
            "beta-open-normal",
            json!({"kinds":[1], "#h":[group_id]}),
        )
        .await
        .len(),
        2
    );
    // The delete marker is counted on alpha only.
    assert_eq!(
        count_payload(
            &mut alpha,
            "alpha-delete-marker",
            json!({"kinds":[KIND_GROUP_DELETE_GROUP], "#h":[group_id]}),
        )
        .await["count"],
        1
    );
    assert_eq!(
        count_payload(
            &mut beta,
            "beta-delete-marker",
            json!({"kinds":[KIND_GROUP_DELETE_GROUP], "#h":[group_id]}),
        )
        .await["count"],
        0
    );

    // The metrics response body must not contain any of these values:
    // the group id, event ids, the owner public key, event contents, or the
    // hex strings "77" x 32 and "88" x 32.
    let metrics = http_get(host.address, "/.well-known/tangle/metrics").await;
    for private_value in [
        group_id,
        alpha_metadata.id().as_str(),
        beta_metadata.id().as_str(),
        alpha_normal.id().as_str(),
        beta_normal.id().as_str(),
        FixtureKey::Owner.public_key().as_str(),
        "alpha crop",
        "beta crop",
        &"77".repeat(32),
        &"88".repeat(32),
    ] {
        assert!(!metrics.contains(private_value));
    }

    host.shutdown().await;
}
    369 
    370 #[tokio::test]
    371 async fn tenant_isolation_rate_limits_are_tenant_local() {
    372     let host = RunningHost::start("rate-limits", 1).await;
    373     let mut alpha = connect_socket(host.address, "alpha.relay.test").await;
    374     let mut beta = connect_socket(host.address, "beta.relay.test").await;
    375     let first = tangle_v2_event(
    376         FixtureKey::Member,
    377         1_714_200_200,
    378         1,
    379         Vec::new(),
    380         "alpha first",
    381     )
    382     .expect("first");
    383     let second = tangle_v2_event(
    384         FixtureKey::Admin,
    385         1_714_200_201,
    386         1,
    387         Vec::new(),
    388         "alpha second",
    389     )
    390     .expect("second");
    391     let beta_first = tangle_v2_event(
    392         FixtureKey::Outsider,
    393         1_714_200_202,
    394         1,
    395         Vec::new(),
    396         "beta first",
    397     )
    398     .expect("beta first");
    399 
    400     assert_ok_accepted(&mut alpha, &first).await;
    401     let rejected = publish_event(&mut alpha, &second).await;
    402     assert_eq!(rejected[0], "OK");
    403     assert_eq!(rejected[2], false);
    404     let message = rejected[3].as_str().expect("message");
    405     assert!(message.starts_with("rate-limited: "));
    406     assert!(message.contains("rate limit exceeded until "));
    407     assert_ok_accepted(&mut beta, &beta_first).await;
    408 
    409     host.shutdown().await;
    410 }
    411 
    412 #[test]
    413 fn tenant_isolation_rejects_shared_pocket_store_config() {
    414     let root = temp_root("shared-store");
    415     let _ = std::fs::remove_dir_all(&root);
    416     let host = parse_tangle_host_runtime_config_json(&host_config_value().to_string())
    417         .expect("host config");
    418     let alpha = parse_tenant_runtime_config_json(
    419         &tenant_config_value(
    420             &root,
    421             TenantFixture {
    422                 tenant_id: "alpha",
    423                 tenant_schema: "alpha_schema",
    424                 host: "alpha.relay.test",
    425                 relay_url: "wss://alpha.relay.test",
    426                 name: "Alpha Relay",
    427                 relay_secret_byte: 0x77,
    428                 pocket_suffix: "shared",
    429             },
    430             600,
    431         )
    432         .to_string(),
    433     )
    434     .expect("alpha");
    435     let beta = parse_tenant_runtime_config_json(
    436         &tenant_config_value(
    437             &root,
    438             TenantFixture {
    439                 tenant_id: "beta",
    440                 tenant_schema: "beta_schema",
    441                 host: "beta.relay.test",
    442                 relay_url: "wss://beta.relay.test",
    443                 name: "Beta Relay",
    444                 relay_secret_byte: 0x88,
    445                 pocket_suffix: "shared",
    446             },
    447             600,
    448         )
    449         .to_string(),
    450     )
    451     .expect("beta");
    452     let error = TangleHostRuntimeConfigSet::new(host, vec![alpha, beta]).expect_err("shared store");
    453 
    454     assert!(
    455         error
    456             .message()
    457             .contains("duplicate tenant pocket data directory")
    458     );
    459     let _ = std::fs::remove_dir_all(root);
    460 }
    461 
/// Handle to a host runtime served on a local listener by a spawned task.
struct RunningHost {
    // Directory returned by `temp_root`; removed (best effort) in `shutdown`.
    root: PathBuf,
    // Local address of the bound listener; tests connect to it.
    address: SocketAddr,
    // Clone of the runtime's shutdown signal, used to stop the server.
    shutdown_signal: TangleShutdownSignal,
    // Join handle of the spawned `serve_listener_until_shutdown` task.
    task: JoinHandle<Result<TangleServeReport, BaseRelayError>>,
}
    468 
    469 impl RunningHost {
    470     async fn start(name: &str, event_per_ip_max_hits: u64) -> Self {
    471         let root = temp_root(name);
    472         let _ = std::fs::remove_dir_all(&root);
    473         let runtime = host_runtime(&root, event_per_ip_max_hits);
    474         let shutdown_signal = runtime.shutdown_signal().clone();
    475         let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener");
    476         let address = listener.local_addr().expect("address");
    477         let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener));
    478         Self {
    479             root,
    480             address,
    481             shutdown_signal,
    482             task,
    483         }
    484     }
    485 
    486     async fn shutdown(self) {
    487         self.shutdown_signal.request_shutdown();
    488         let report = timeout(Duration::from_secs(1), self.task)
    489             .await
    490             .expect("server shutdown")
    491             .expect("task")
    492             .expect("serve");
    493         assert_eq!(report.listen_addr(), self.address);
    494         let _ = std::fs::remove_dir_all(self.root);
    495     }
    496 }
    497 
    498 async fn connect_socket(address: SocketAddr, host: &str) -> TestWebSocket {
    499     let mut request = format!("ws://{address}/")
    500         .into_client_request()
    501         .expect("request");
    502     request.headers_mut().insert(
    503         header::SEC_WEBSOCKET_PROTOCOL,
    504         http::HeaderValue::from_static("nostr"),
    505     );
    506     request.headers_mut().insert(
    507         header::HOST,
    508         http::HeaderValue::from_str(host).expect("host"),
    509     );
    510     let (mut socket, response) = connect_async(request).await.expect("websocket");
    511     assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS);
    512     let challenge = read_relay_value(&mut socket).await;
    513     assert_eq!(challenge[0], "AUTH");
    514     socket
    515 }
    516 
    517 async fn connect_authenticated_socket(
    518     address: SocketAddr,
    519     host: &str,
    520     relay_url: &str,
    521     key: FixtureKey,
    522 ) -> TestWebSocket {
    523     let mut request = format!("ws://{address}/")
    524         .into_client_request()
    525         .expect("request");
    526     request.headers_mut().insert(
    527         header::SEC_WEBSOCKET_PROTOCOL,
    528         http::HeaderValue::from_static("nostr"),
    529     );
    530     request.headers_mut().insert(
    531         header::HOST,
    532         http::HeaderValue::from_str(host).expect("host"),
    533     );
    534     let (mut socket, response) = connect_async(request).await.expect("websocket");
    535     assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS);
    536     let challenge = read_relay_value(&mut socket).await;
    537     assert_eq!(challenge[0], "AUTH");
    538     let auth = auth_event_for_relay(
    539         key,
    540         challenge[1].as_str().expect("challenge"),
    541         current_unix_timestamp(),
    542         relay_url,
    543     )
    544     .expect("auth");
    545     send_client_value(&mut socket, json!(["AUTH", event_to_value(&auth)])).await;
    546     assert_eq!(
    547         read_relay_value(&mut socket).await,
    548         json!(["OK", auth.id().as_str(), true, ""])
    549     );
    550     socket
    551 }
    552 
    553 async fn assert_ok_accepted(socket: &mut TestWebSocket, event: &Event) {
    554     assert_eq!(
    555         publish_event(socket, event).await,
    556         json!(["OK", event.id().as_str(), true, ""])
    557     );
    558 }
    559 
    560 async fn publish_event(socket: &mut TestWebSocket, event: &Event) -> Value {
    561     send_client_value(socket, json!(["EVENT", event_to_value(event)])).await;
    562     read_relay_value(socket).await
    563 }
    564 
    565 async fn collect_req_events(
    566     socket: &mut TestWebSocket,
    567     subscription_id: &str,
    568     filter: Value,
    569 ) -> Vec<Event> {
    570     send_client_value(socket, json!(["REQ", subscription_id, filter])).await;
    571     let mut events = Vec::new();
    572     loop {
    573         let message = read_relay_value(socket).await;
    574         match message[0].as_str().expect("message kind") {
    575             "EVENT" => {
    576                 assert_eq!(message[1], subscription_id);
    577                 events.push(event_from_value(&message[2]).expect("event"));
    578             }
    579             "EOSE" => {
    580                 assert_eq!(message[1], subscription_id);
    581                 send_client_value(socket, json!(["CLOSE", subscription_id])).await;
    582                 break;
    583             }
    584             "CLOSED" => panic!("{message}"),
    585             other => panic!("{other}: {message}"),
    586         }
    587     }
    588     events
    589 }
    590 
    591 async fn req_closed_message(
    592     socket: &mut TestWebSocket,
    593     subscription_id: &str,
    594     filter: Value,
    595 ) -> String {
    596     send_client_value(socket, json!(["REQ", subscription_id, filter])).await;
    597     let message = read_relay_value(socket).await;
    598     assert_eq!(message[0], "CLOSED");
    599     assert_eq!(message[1], subscription_id);
    600     message[2].as_str().expect("closed message").to_owned()
    601 }
    602 
    603 async fn count_payload(socket: &mut TestWebSocket, subscription_id: &str, filter: Value) -> Value {
    604     send_client_value(socket, json!(["COUNT", subscription_id, filter])).await;
    605     let message = read_relay_value(socket).await;
    606     assert_eq!(message[0], "COUNT", "{message}");
    607     assert_eq!(message[1], subscription_id);
    608     message[2].clone()
    609 }
    610 
    611 async fn send_client_value(socket: &mut TestWebSocket, value: Value) {
    612     socket
    613         .send(TungsteniteMessage::Text(value.to_string().into()))
    614         .await
    615         .expect("send client message");
    616 }
    617 
    618 async fn read_relay_value(socket: &mut TestWebSocket) -> Value {
    619     let message = timeout(Duration::from_secs(1), socket.next())
    620         .await
    621         .expect("relay message timeout")
    622         .expect("relay message")
    623         .expect("relay message result");
    624     let TungsteniteMessage::Text(text) = message else {
    625         panic!("expected relay text message, got {message:?}");
    626     };
    627     serde_json::from_str(text.as_str()).expect("relay json")
    628 }
    629 
    630 async fn assert_no_relay_message(socket: &mut TestWebSocket) {
    631     assert!(
    632         timeout(Duration::from_millis(100), socket.next())
    633             .await
    634             .is_err()
    635     );
    636 }
    637 
    638 async fn http_get(address: SocketAddr, path: &str) -> String {
    639     let path = path.to_owned();
    640     tokio::task::spawn_blocking(move || -> std::io::Result<String> {
    641         use std::io::{Read, Write};
    642         use std::net::TcpStream;
    643 
    644         let mut stream = TcpStream::connect_timeout(&address, Duration::from_millis(500))?;
    645         stream.set_read_timeout(Some(Duration::from_millis(500)))?;
    646         stream.set_write_timeout(Some(Duration::from_millis(500)))?;
    647         let request =
    648             format!("GET {path} HTTP/1.1\r\nHost: alpha.relay.test\r\nConnection: close\r\n\r\n");
    649         stream.write_all(request.as_bytes())?;
    650         let mut response = String::new();
    651         stream.read_to_string(&mut response)?;
    652         Ok(response
    653             .split("\r\n\r\n")
    654             .nth(1)
    655             .unwrap_or_default()
    656             .to_owned())
    657     })
    658     .await
    659     .expect("http task")
    660     .expect("http get")
    661 }
    662 
    663 fn auth_event_for_relay(
    664     key: FixtureKey,
    665     challenge: &str,
    666     created_at: u64,
    667     relay_url: &str,
    668 ) -> Result<Event, String> {
    669     tangle_v2_event(
    670         key,
    671         created_at,
    672         22_242,
    673         vec![
    674             Tag::from_parts("relay", &[relay_url])?,
    675             Tag::from_parts("challenge", &[challenge])?,
    676         ],
    677         "",
    678     )
    679 }
    680 
    681 fn tangle_v2_event(
    682     key: FixtureKey,
    683     created_at: u64,
    684     kind: u64,
    685     tags: Vec<Tag>,
    686     content: &str,
    687 ) -> Result<Event, String> {
    688     let event = isolation_pocket_event(key, created_at, kind, tags, content);
    689     isolation_pocket_event_to_protocol(&event)
    690 }
    691 
    692 fn tangle_v2_group_create_event(
    693     key: FixtureKey,
    694     group_id: &str,
    695     created_at: u64,
    696     flags: &[&str],
    697 ) -> Result<Event, String> {
    698     let mut tags = vec![
    699         Tag::from_parts("h", &[group_id])?,
    700         Tag::from_parts("name", &[group_id])?,
    701     ];
    702     for flag in flags {
    703         tags.push(Tag::from_parts(flag, &[])?);
    704     }
    705     tangle_v2_event(key, created_at, KIND_GROUP_CREATE_GROUP.into(), tags, "")
    706 }
    707 
    708 fn tangle_v2_group_metadata_event(
    709     key: FixtureKey,
    710     group_id: &str,
    711     name: &str,
    712     created_at: u64,
    713     flags: &[&str],
    714 ) -> Result<Event, String> {
    715     let mut tags = vec![
    716         Tag::from_parts("h", &[group_id])?,
    717         Tag::from_parts("name", &[name])?,
    718     ];
    719     for flag in flags {
    720         tags.push(Tag::from_parts(flag, &[])?);
    721     }
    722     tangle_v2_event(key, created_at, KIND_GROUP_EDIT_METADATA.into(), tags, "")
    723 }
    724 
    725 fn tangle_v2_put_user_event(
    726     key: FixtureKey,
    727     group_id: &str,
    728     target: FixtureKey,
    729     created_at: u64,
    730 ) -> Result<Event, String> {
    731     let target = target.public_key();
    732     tangle_v2_event(
    733         key,
    734         created_at,
    735         KIND_GROUP_PUT_USER.into(),
    736         vec![
    737             Tag::from_parts("h", &[group_id])?,
    738             Tag::from_parts("p", &[target.as_str()])?,
    739         ],
    740         "",
    741     )
    742 }
    743 
    744 fn tangle_v2_delete_group_event(
    745     key: FixtureKey,
    746     group_id: &str,
    747     created_at: u64,
    748 ) -> Result<Event, String> {
    749     tangle_v2_group_event(
    750         key,
    751         group_id,
    752         created_at,
    753         KIND_GROUP_DELETE_GROUP.into(),
    754         "",
    755     )
    756 }
    757 
    758 fn tangle_v2_group_event(
    759     key: FixtureKey,
    760     group_id: &str,
    761     created_at: u64,
    762     kind: u64,
    763     content: &str,
    764 ) -> Result<Event, String> {
    765     tangle_v2_event(
    766         key,
    767         created_at,
    768         kind,
    769         vec![Tag::from_parts("h", &[group_id])?],
    770         content,
    771     )
    772 }
    773 
    774 fn isolation_pocket_event(
    775     key: FixtureKey,
    776     created_at: u64,
    777     kind: u64,
    778     tags: Vec<Tag>,
    779     content: &str,
    780 ) -> PocketOwnedEvent {
    781     let tags = isolation_pocket_tags_from_protocol(&tags);
    782     let secret = format!("{:02x}", fixture_secret_byte(key)).repeat(32);
    783     RelaySigner::from_secret_hex(&secret)
    784         .expect("signer")
    785         .sign_pocket_event(
    786             PocketKind::from_u16(u16::try_from(kind).expect("pocket kind")),
    787             &tags,
    788             PocketTime::from_u64(created_at),
    789             content.as_bytes(),
    790         )
    791         .expect("pocket event")
    792 }
    793 
    794 fn isolation_pocket_tags_from_protocol(tags: &[Tag]) -> PocketOwnedTags {
    795     let parts = tags
    796         .iter()
    797         .map(|tag| tag.values().iter().map(String::as_str).collect::<Vec<_>>())
    798         .collect::<Vec<_>>();
    799     PocketOwnedTags::new(&parts).expect("pocket tags")
    800 }
    801 
    802 fn isolation_pocket_event_to_protocol(event: &PocketEvent) -> Result<Event, String> {
    803     let tags = event
    804         .tags()
    805         .map_err(|error| error.to_string())?
    806         .iter()
    807         .map(|tag| {
    808             Tag::new(
    809                 tag.map(|value| {
    810                     std::str::from_utf8(value)
    811                         .map(str::to_owned)
    812                         .map_err(|error| error.to_string())
    813                 })
    814                 .collect::<Result<Vec<_>, _>>()?,
    815             )
    816             .map_err(|error| error.to_string())
    817         })
    818         .collect::<Result<Vec<_>, _>>()?;
    819     Ok(Event::new(
    820         EventId::new(&event.id().as_hex_string()).map_err(|error| error.to_string())?,
    821         UnsignedEvent::new(
    822             PublicKeyHex::new(&event.pubkey().as_hex_string())
    823                 .map_err(|error| error.to_string())?,
    824             UnixTimestamp::new(event.created_at().as_u64()),
    825             Kind::new(u64::from(event.kind().as_u16())).map_err(|error| error.to_string())?,
    826             tags,
    827             std::str::from_utf8(event.content()).map_err(|error| error.to_string())?,
    828         ),
    829         SignatureHex::new(&event.sig().to_string()).map_err(|error| error.to_string())?,
    830     ))
    831 }
    832 
    833 fn fixture_secret_byte(key: FixtureKey) -> u8 {
    834     match key {
    835         FixtureKey::Relay => 9,
    836         FixtureKey::Owner => 10,
    837         FixtureKey::Admin => 11,
    838         FixtureKey::Member => 12,
    839         FixtureKey::Outsider => 13,
    840     }
    841 }
    842 
    843 fn host_runtime(root: &Path, event_per_ip_max_hits: u64) -> TangleHostRuntime {
    844     let host = parse_tangle_host_runtime_config_json(&host_config_value().to_string())
    845         .expect("host config");
    846     let tenants = [
    847         TenantFixture {
    848             tenant_id: "alpha",
    849             tenant_schema: "alpha_schema",
    850             host: "alpha.relay.test",
    851             relay_url: "wss://alpha.relay.test",
    852             name: "Alpha Relay",
    853             relay_secret_byte: 0x77,
    854             pocket_suffix: "alpha",
    855         },
    856         TenantFixture {
    857             tenant_id: "beta",
    858             tenant_schema: "beta_schema",
    859             host: "beta.relay.test",
    860             relay_url: "wss://beta.relay.test",
    861             name: "Beta Relay",
    862             relay_secret_byte: 0x88,
    863             pocket_suffix: "beta",
    864         },
    865     ]
    866     .into_iter()
    867     .map(|fixture| {
    868         parse_tenant_runtime_config_json(
    869             &tenant_config_value(root, fixture, event_per_ip_max_hits).to_string(),
    870         )
    871         .expect("tenant config")
    872     })
    873     .collect::<Vec<_>>();
    874     let config = TangleHostRuntimeConfigSet::new(host, tenants).expect("config set");
    875     TangleHostRuntime::open(config).expect("host runtime")
    876 }
    877 
    878 fn host_config_value() -> Value {
    879     json!({
    880         "listen_addr": "127.0.0.1:0",
    881         "tenant_config_dir": "tenants",
    882         "limits": {
    883             "max_total_connections": 64,
    884             "max_total_subscriptions": 512,
    885             "tenant_startup_concurrency": 4
    886         }
    887     })
    888 }
    889 
    890 struct TenantFixture<'a> {
    891     tenant_id: &'a str,
    892     tenant_schema: &'a str,
    893     host: &'a str,
    894     relay_url: &'a str,
    895     name: &'a str,
    896     relay_secret_byte: u8,
    897     pocket_suffix: &'a str,
    898 }
    899 
    900 fn tenant_config_value(
    901     root: &Path,
    902     fixture: TenantFixture<'_>,
    903     event_per_ip_max_hits: u64,
    904 ) -> Value {
    905     let relay_secret = format!("{:02x}", fixture.relay_secret_byte).repeat(32);
    906     json!({
    907         "tenant_id": fixture.tenant_id,
    908         "tenant_schema": fixture.tenant_schema,
    909         "host": fixture.host,
    910         "relay_url": fixture.relay_url,
    911         "info": {
    912             "name": fixture.name
    913         },
    914         "pocket": {
    915             "data_directory": root.join(fixture.pocket_suffix),
    916             "sync_policy": "flush_on_shutdown",
    917         },
    918         "pocket_query": {
    919             "allow_scraping": false,
    920             "allow_scrape_if_limited_to": 100,
    921             "allow_scrape_if_max_seconds": 3600
    922         },
    923         "groups": {
    924             "enabled": true,
    925             "canonical_relay_url": fixture.relay_url,
    926             "relay_secret": relay_secret,
    927             "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()],
    928             "admin_pubkeys": [FixtureKey::Admin.public_key().as_str()]
    929         },
    930         "auth": {
    931             "challenge_ttl_seconds": 300,
    932             "created_at_skew_seconds": 600
    933         },
    934         "limits": {
    935             "max_message_length": 1048576,
    936             "max_subid_length": 64,
    937             "max_subscriptions_per_connection": 64,
    938             "max_filters_per_request": 10,
    939             "max_tag_values_per_filter": 100,
    940             "max_query_complexity": 2048,
    941             "max_limit": 500,
    942             "default_limit": 100,
    943             "max_event_tags": 200,
    944             "max_content_length": 65536,
    945             "broadcast_channel_capacity": 16,
    946             "per_connection_outbound_queue": 16
    947         },
    948         "rate_limits": {
    949             "auth": {
    950                 "per_ip": {"window_seconds": 60, "max_hits": 120},
    951                 "per_pubkey": {"window_seconds": 60, "max_hits": 30},
    952                 "failures": {"window_seconds": 300, "max_hits": 5},
    953                 "failures_per_ip": {"window_seconds": 300, "max_hits": 20}
    954             },
    955             "event": {
    956                 "per_ip": {"window_seconds": 60, "max_hits": event_per_ip_max_hits},
    957                 "per_pubkey": {"window_seconds": 60, "max_hits": 120},
    958                 "per_kind": {"window_seconds": 60, "max_hits": 1000}
    959             },
    960             "group": {
    961                 "write_per_ip": {"window_seconds": 60, "max_hits": 300},
    962                 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60},
    963                 "write_per_group": {"window_seconds": 60, "max_hits": 90},
    964                 "write_per_kind": {"window_seconds": 60, "max_hits": 300},
    965                 "join_flow": {"window_seconds": 300, "max_hits": 10},
    966                 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30}
    967             },
    968             "req": {
    969                 "per_ip": {"window_seconds": 60, "max_hits": 600},
    970                 "per_connection": {"window_seconds": 60, "max_hits": 120},
    971                 "per_pubkey": {"window_seconds": 60, "max_hits": 240},
    972                 "per_group": {"window_seconds": 60, "max_hits": 240},
    973                 "per_kind": {"window_seconds": 60, "max_hits": 500},
    974                 "broad": {"window_seconds": 60, "max_hits": 30}
    975             },
    976             "count": {
    977                 "per_ip": {"window_seconds": 60, "max_hits": 300},
    978                 "per_connection": {"window_seconds": 60, "max_hits": 60},
    979                 "per_pubkey": {"window_seconds": 60, "max_hits": 120},
    980                 "per_group": {"window_seconds": 60, "max_hits": 120},
    981                 "per_kind": {"window_seconds": 60, "max_hits": 240},
    982                 "broad": {"window_seconds": 60, "max_hits": 20}
    983             }
    984         }
    985     })
    986 }
    987 
    988 fn relay_pubkey_hex(secret_byte: u8) -> String {
    989     RelaySigner::from_secret_hex(&format!("{secret_byte:02x}").repeat(32))
    990         .expect("relay signer")
    991         .public_key()
    992         .as_str()
    993         .to_owned()
    994 }
    995 
    996 fn tag_value<'a>(event: &'a Event, tag_name: &str) -> Option<&'a str> {
    997     event
    998         .unsigned()
    999         .tags()
   1000         .iter()
   1001         .find(|tag| tag.name().as_str() == tag_name)
   1002         .and_then(|tag| tag.values().get(1))
   1003         .map(String::as_str)
   1004 }
   1005 
   1006 fn assert_pocket_signature(event: &Event) {
   1007     let raw = serde_json::to_vec(&event_to_value(event)).expect("event json");
   1008     parse_pocket_event_json(&raw)
   1009         .expect("pocket event")
   1010         .verify()
   1011         .expect("pocket signature");
   1012 }
   1013 
   1014 fn current_unix_timestamp() -> u64 {
   1015     SystemTime::now()
   1016         .duration_since(UNIX_EPOCH)
   1017         .expect("system time")
   1018         .as_secs()
   1019 }
   1020 
   1021 fn temp_root(name: &str) -> PathBuf {
   1022     std::env::temp_dir().join(format!(
   1023         "tangle-tenant-isolation-{name}-{}",
   1024         std::process::id()
   1025     ))
   1026 }