commit 377003a4c8bdb46769b279dc906792f4fffb8db3
parent 9a472bf0b4f6b5d7b99ecd7c5a285cc88be13d62
Author: triesap <triesap@radroots.dev>
Date: Mon, 22 Dec 2025 17:41:41 +0000
indexer: add event types, deterministic outputs, and robust cursor handling
- add repository coding directives and workspace guidance
- add codec dependency and wiring for expanded event decoding
- introduce indexers for post, follow, job request, job result, and job feedback events
- extend event kind handling with grouping, ranges, and SQL filtering
- refactor the indexer loop into batch processing with cursor persistence and rowid fallback
- normalize nip05 handling with full and local resolution in profile lookups
- strengthen comment and reaction tag parsing for address and legacy forms
- make index writing deterministic with stable hashes, safe path segments, and change detection
- improve listing sharding with empty handling and safe size conversion
- add tests for parsing, grouping, sharding, and profile tie breaks
- keep log writers alive with nonblocking guards and reduce utf8 truncation risks
- cache database trees for reuse and lower per call overhead
Diffstat:
38 files changed, 3396 insertions(+), 812 deletions(-)
diff --git a/AGENTS.md b/AGENTS.md
@@ -0,0 +1,22 @@
+# Rad Roots - Code Directives
+
+## Rust Code Directives
+- Toolchain: Rust 1.86, edition 2024; use workspace versions from the root Cargo.toml.
+- Portability: preserve no_std patterns; gate std usage with cfg(feature = "std") and use alloc when needed.
+- Safety: avoid unsafe; prefer safe, explicit APIs. Add #![forbid(unsafe_code)] on new crates/modules.
+- Public API: keep Radroots* prefix; avoid hidden panics; return Result/Option for fallible ops; use precise error enums (thiserror where appropriate).
+- Features: keep serde/typeshare/ts-rs derives behind existing feature gates and in the current style; ensure feature combinations compile (no_std, std, wasm).
+- Generated outputs: treat */bindings/ts/src/types.ts as generated; do not hand-edit.
+- Performance: borrow over clone, avoid intermediate allocations, preallocate when sizes are known, and prefer iterators over indexing loops.
+- DRY: consolidate shared logic into core/types/events-codec or dedicated helpers.
+- Parity: maintain feature parity across native/wasm layers when adding SQL or Tangle APIs.
+- Module layout: keep lib.rs as a module manifest and re-export surface; avoid heavy logic in lib.rs.
+- Testing: add or update unit tests for new behavior and edge cases, especially around parsing, invariants, conversions, and rounding.
+
+## TypeScript Code Directives
+- Types: use strict TypeScript; do not use any, unknown casts, or weaken types; prefer explicit interfaces and types; use enums or literal unions when clear; use named exports and avoid default exports.
+- Naming: variables and functions use snake_case; types, interfaces, and enums use PascalCase; prefer layered prefixes to namespace meaning: domain_object_action, e.g. nostr_keys_gen(), NostrKeysGen; do not enforce naming conventions in _env*.ts files; constant data structures must use ALL_CAPS_SNAKE_CASE.
+- Source code: keep code deterministic and reproducible; do not add source code comments; single-line if statements must not use braces; anchor comments are allowed only when they start with @ (e.g. // @todo); /* */ blocks are allowed only to disable features during development and must not include descriptive text; <!-- --> HTML blocks are allowed only to disable features during development or compiler/lint suppression (e.g. <!-- svelte-ignore ... -->).
+- Modularity: do not duplicate logic; put shared or generalizable code in packages/; apps should rely on packages/ for shared utilities; treat @radroots/*-bindings as generated from .rs crates and do not edit or format their .ts outputs, change upstream .rs or report errors instead; every class must implement a same-name interface prefixed with I, all public methods must be declared on the interface, and method return types are required for all class methods.
+- Architecture: prefer pure functions; prefer composition over inheritance; avoid side effects unless required; avoid global mutable state.
+- Change policy: apply breaking changes when needed; do not add legacy or shim fixes.
diff --git a/Cargo.lock b/Cargo.lock
@@ -1298,10 +1298,19 @@ version = "0.1.0"
dependencies = [
"radroots-core",
"serde",
+ "ts-rs",
"typeshare",
]
[[package]]
+name = "radroots-events-codec"
+version = "0.1.0"
+dependencies = [
+ "radroots-core",
+ "radroots-events",
+]
+
+[[package]]
name = "radroots-events-indexed"
version = "0.1.0"
dependencies = [
@@ -1321,6 +1330,7 @@ dependencies = [
"once_cell",
"radroots-core",
"radroots-events",
+ "radroots-events-codec",
"radroots-events-indexed",
"regex",
"rusqlite",
@@ -1704,6 +1714,15 @@ dependencies = [
]
[[package]]
+name = "termcolor"
+version = "1.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
+dependencies = [
+ "winapi-util",
+]
+
+[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1953,6 +1972,28 @@ dependencies = [
]
[[package]]
+name = "ts-rs"
+version = "11.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4994acea2522cd2b3b85c1d9529a55991e3ad5e25cdcd3de9d505972c4379424"
+dependencies = [
+ "thiserror 2.0.12",
+ "ts-rs-macros",
+]
+
+[[package]]
+name = "ts-rs-macros"
+version = "11.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ee6ff59666c9cbaec3533964505d39154dc4e0a56151fdea30a09ed0301f62e2"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "termcolor",
+]
+
+[[package]]
name = "typenum"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2162,6 +2203,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
+name = "winapi-util"
+version = "0.1.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
+dependencies = [
+ "windows-sys",
+]
+
+[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
@@ -13,6 +13,7 @@ license = "AGPL-3.0"
[workspace.dependencies]
radroots-core = { path = "../crates/core" }
radroots-events = { path = "../crates/events" }
+radroots-events-codec = { path = "../crates/events-codec" }
radroots-events-indexed = { path = "../crates/events-indexed" }
anyhow = { version = "1" }
diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml
@@ -13,6 +13,7 @@ audit = []
[dependencies]
radroots-core = { workspace = true }
radroots-events = { workspace = true }
+radroots-events-codec = { workspace = true }
radroots-events-indexed = { workspace = true }
anyhow = { workspace = true }
diff --git a/indexer/src/audit.rs b/indexer/src/audit.rs
@@ -3,16 +3,16 @@
use std::collections::HashSet;
use std::sync::RwLock;
-use crate::utils::nostr::public_key_to_npub;
+use crate::utils::nostr::{normalize_nip05, public_key_to_npub};
use once_cell::sync::Lazy;
use regex::Regex;
use tracing::info;
use crate::domain::resolvers::profile::ProfileResolver;
use crate::relay::event::RelayIndexerEvent;
-use radroots_events::comment::models::RadrootsCommentEventIndex;
-use radroots_events::listing::models::RadrootsListingEventIndex;
-use radroots_events::profile::models::RadrootsProfileEventIndex;
+use radroots_events::comment::RadrootsCommentEventIndex;
+use radroots_events::listing::RadrootsListingEventIndex;
+use radroots_events::profile::RadrootsProfileEventIndex;
#[derive(Clone, Debug)]
pub struct AuditFilter {
@@ -123,12 +123,8 @@ pub fn set_profile_resolver(resolver: ProfileResolver) {
}
fn nip05_parts_from_metadata(nip05: &str) -> (String, String) {
- let lower = nip05.to_lowercase();
- if let Some((name, domain)) = lower.split_once('@') {
- (format!("{name}@{domain}"), name.to_string())
- } else {
- (lower.clone(), lower)
- }
+ let (full, local, _) = normalize_nip05(nip05);
+ (full, local)
}
fn should_log(
@@ -235,8 +231,19 @@ pub fn log_indexer_event(idx: &RelayIndexerEvent) {
let (nip05_full_opt, nip05_local_opt) = if need_full || need_local {
if let Ok(s) = STATE.read() {
if let Some(res) = s.resolver.as_ref() {
- let local = res.nip05_for_author(&idx.author).map(|s| s.to_string());
- (None, local)
+ let full = if need_full {
+ res.nip05_full_for_author(&idx.author)
+ .map(|s| s.to_string())
+ } else {
+ None
+ };
+ let local = if need_local {
+ res.nip05_local_for_author(&idx.author)
+ .map(|s| s.to_string())
+ } else {
+ None
+ };
+ (full, local)
} else {
(None, None)
}
@@ -299,7 +306,7 @@ pub fn log_profile_event(evt: &RadrootsProfileEventIndex) {
if !should_log(
&evt.event.author,
- evt.event.kind.try_into().unwrap(),
+ u64::from(evt.event.kind),
evt.event.created_at,
&evt.event.content,
npub_opt,
@@ -349,10 +356,19 @@ pub fn log_listing_event(evt: &RadrootsListingEventIndex) {
let (nip05_full_opt, nip05_local_opt) = if need_full || need_local {
if let Ok(s) = STATE.read() {
if let Some(res) = s.resolver.as_ref() {
- let local = res
- .nip05_for_author(&evt.event.author)
- .map(|s| s.to_string());
- (None, local)
+ let full = if need_full {
+ res.nip05_full_for_author(&evt.event.author)
+ .map(|s| s.to_string())
+ } else {
+ None
+ };
+ let local = if need_local {
+ res.nip05_local_for_author(&evt.event.author)
+ .map(|s| s.to_string())
+ } else {
+ None
+ };
+ (full, local)
} else {
(None, None)
}
@@ -415,10 +431,19 @@ pub fn log_comment_event(evt: &RadrootsCommentEventIndex) {
let (nip05_full_opt, nip05_local_opt) = if need_full || need_local {
if let Ok(s) = STATE.read() {
if let Some(res) = s.resolver.as_ref() {
- let local = res
- .nip05_for_author(&evt.event.author)
- .map(|s| s.to_string());
- (None, local)
+ let full = if need_full {
+ res.nip05_full_for_author(&evt.event.author)
+ .map(|s| s.to_string())
+ } else {
+ None
+ };
+ let local = if need_local {
+ res.nip05_local_for_author(&evt.event.author)
+ .map(|s| s.to_string())
+ } else {
+ None
+ };
+ (full, local)
} else {
(None, None)
}
diff --git a/indexer/src/domain/events/comment.rs b/indexer/src/domain/events/comment.rs
@@ -1,6 +1,6 @@
use crate::relay::event::RelayIndexerEvent;
use radroots_events::{
- comment::models::{RadrootsComment, RadrootsCommentEventIndex, RadrootsCommentEventMetadata},
+ comment::{RadrootsComment, RadrootsCommentEventIndex, RadrootsCommentEventMetadata},
RadrootsNostrEvent, RadrootsNostrEventRef,
};
use thiserror::Error;
@@ -11,85 +11,217 @@ pub enum RadrootsCommentEventIndexError {
ParseError,
}
+fn parse_address(addr: &str) -> Option<(u32, String, Option<String>)> {
+ let mut parts = addr.splitn(3, ':');
+ let kind = parts.next()?.parse::<u32>().ok()?;
+ let author = parts.next()?.to_lowercase();
+ let d_tag = parts
+ .next()
+ .and_then(|d| if d.is_empty() { None } else { Some(d.to_string()) });
+ Some((kind, author, d_tag))
+}
+
fn parse_comment_from_tags(
tags: &[Vec<String>],
content: &str,
) -> Result<RadrootsComment, RadrootsCommentEventIndexError> {
let mut root_id: Option<String> = None;
- let mut root_relays: Option<Vec<String>> = None;
- let mut root_kind: Option<u32> = None;
- let mut root_author: Option<String> = None;
+ let mut root_addr: Option<String> = None;
+ let mut root_relays_e: Option<Vec<String>> = None;
+ let mut root_relays_a: Option<Vec<String>> = None;
+ let mut root_kind_tag: Option<u32> = None;
+ let mut root_kind_addr: Option<u32> = None;
+ let mut root_author_tag: Option<String> = None;
+ let mut root_author_addr: Option<String> = None;
+ let mut root_author_e: Option<String> = None;
let mut root_d: Option<String> = None;
let mut parent_id: Option<String> = None;
- let mut parent_relays: Option<Vec<String>> = None;
- let mut parent_kind: Option<u32> = None;
- let mut parent_author: Option<String> = None;
+ let mut parent_addr: Option<String> = None;
+ let mut parent_relays_e: Option<Vec<String>> = None;
+ let mut parent_relays_a: Option<Vec<String>> = None;
+ let mut parent_kind_tag: Option<u32> = None;
+ let mut parent_kind_addr: Option<u32> = None;
+ let mut parent_author_tag: Option<String> = None;
+ let mut parent_author_addr: Option<String> = None;
+ let mut parent_author_e: Option<String> = None;
let mut parent_d: Option<String> = None;
+ let mut legacy_root_id: Option<String> = None;
+ let mut legacy_root_relays: Option<Vec<String>> = None;
+ let mut legacy_parent_id: Option<String> = None;
+ let mut legacy_parent_relays: Option<Vec<String>> = None;
+ let mut legacy_root_kind: Option<u32> = None;
+ let mut legacy_root_author: Option<String> = None;
+ let mut legacy_root_d: Option<String> = None;
+
for t in tags {
- if t.first().map(|k| k == "e_root").unwrap_or(false) {
- if let Some(id) = t.get(1).cloned() {
- root_id = Some(id);
+ match t.first().map(|k| k.as_str()) {
+ Some("E") => {
+ if let Some(id) = t.get(1).cloned() {
+ root_id = Some(id);
+ }
+ if let Some(r) = t.get(2).filter(|s| !s.is_empty()).cloned() {
+ root_relays_e = Some(vec![r]);
+ }
+ if let Some(pk) = t.get(3).filter(|s| !s.is_empty()) {
+ root_author_e = Some(pk.to_lowercase());
+ }
}
- if let Some(r) = t.get(2).cloned() {
- root_relays = Some(vec![r]);
+ Some("A") => {
+ if let Some(addr) = t.get(1).cloned() {
+ root_addr = Some(addr.clone());
+ if let Some((kind, author, d_tag)) = parse_address(&addr) {
+ root_kind_addr = Some(kind);
+ root_author_addr = Some(author);
+ root_d = d_tag;
+ }
+ }
+ if let Some(r) = t.get(2).filter(|s| !s.is_empty()).cloned() {
+ root_relays_a = Some(vec![r]);
+ }
}
- } else if t.first().map(|k| k == "e_prev").unwrap_or(false) {
- if let Some(id) = t.get(1).cloned() {
- parent_id = Some(id);
+ Some("K") => {
+ if let Some(kind) = t.get(1).and_then(|v| v.parse::<u32>().ok()) {
+ root_kind_tag = Some(kind);
+ }
}
- if let Some(r) = t.get(2).cloned() {
- parent_relays = Some(vec![r]);
+ Some("P") => {
+ if let Some(pk) = t.get(1).filter(|s| !s.is_empty()) {
+ root_author_tag = Some(pk.to_lowercase());
+ }
}
- } else if t.first().map(|k| k == "e").unwrap_or(false) {
- if root_id.is_none() {
+ Some("e_root") => {
if let Some(id) = t.get(1).cloned() {
- root_id = Some(id);
+ legacy_root_id = Some(id);
+ }
+ if let Some(r) = t.get(2).filter(|s| !s.is_empty()).cloned() {
+ legacy_root_relays = Some(vec![r]);
}
}
- if root_relays.is_none() {
- if let Some(r) = t.get(2).cloned() {
- root_relays = Some(vec![r]);
+ Some("e_prev") => {
+ if let Some(id) = t.get(1).cloned() {
+ legacy_parent_id = Some(id);
+ }
+ if let Some(r) = t.get(2).filter(|s| !s.is_empty()).cloned() {
+ legacy_parent_relays = Some(vec![r]);
}
}
- } else if t.first().map(|k| k == "a").unwrap_or(false) {
- if let Some(arg) = t.get(1) {
- let parts: Vec<&str> = arg.split(':').collect();
- if parts.len() >= 2 {
- root_kind = parts[0].parse::<u32>().ok();
- root_author = Some(parts[1].to_lowercase());
- if parts.len() >= 3 && !parts[2].is_empty() {
- root_d = Some(parts[2].to_string());
+ Some("e") => {
+ if let Some(id) = t.get(1).cloned() {
+ parent_id = Some(id.clone());
+ if legacy_root_id.is_none() {
+ legacy_root_id = Some(id);
+ }
+ }
+ if let Some(r) = t.get(2).filter(|s| !s.is_empty()).cloned() {
+ parent_relays_e = Some(vec![r.clone()]);
+ if legacy_root_relays.is_none() {
+ legacy_root_relays = Some(vec![r]);
}
}
+ if let Some(pk) = t.get(3).filter(|s| !s.is_empty()) {
+ parent_author_e = Some(pk.to_lowercase());
+ }
}
- if let Some(r) = t.get(2).cloned() {
- root_relays = Some(vec![r]);
+ Some("a") => {
+ if let Some(addr) = t.get(1).cloned() {
+ parent_addr = Some(addr.clone());
+ if let Some((kind, author, d_tag)) = parse_address(&addr) {
+ parent_kind_addr = Some(kind);
+ parent_author_addr = Some(author.clone());
+ parent_d = d_tag.clone();
+ if legacy_root_kind.is_none() {
+ legacy_root_kind = Some(kind);
+ }
+ if legacy_root_author.is_none() {
+ legacy_root_author = Some(author);
+ }
+ if legacy_root_d.is_none() {
+ legacy_root_d = d_tag;
+ }
+ }
+ }
+ if let Some(r) = t.get(2).filter(|s| !s.is_empty()).cloned() {
+ parent_relays_a = Some(vec![r]);
+ }
+ }
+ Some("k") => {
+ if let Some(kind) = t.get(1).and_then(|v| v.parse::<u32>().ok()) {
+ parent_kind_tag = Some(kind);
+ }
+ }
+ Some("p") => {
+ if let Some(pk) = t.get(1).filter(|s| !s.is_empty()) {
+ parent_author_tag = Some(pk.to_lowercase());
+ }
+ }
+ _ => {}
+ }
+ }
+
+ let has_nip22_root = root_id.is_some() || root_addr.is_some();
+ if !has_nip22_root {
+ if root_id.is_none() {
+ root_id = legacy_root_id;
+ if root_relays_e.is_none() {
+ root_relays_e = legacy_root_relays;
+ }
+ }
+ if root_kind_tag.is_none() {
+ root_kind_tag = legacy_root_kind;
+ }
+ if root_author_tag.is_none() {
+ root_author_tag = legacy_root_author;
+ }
+ if root_d.is_none() {
+ root_d = legacy_root_d;
+ }
+ if parent_id.is_none() && parent_addr.is_none() {
+ parent_id = legacy_parent_id;
+ if parent_relays_e.is_none() {
+ parent_relays_e = legacy_parent_relays;
}
}
}
+ let root_id = root_id.or(root_addr.clone()).ok_or(RadrootsCommentEventIndexError::ParseError)?;
+ let root_kind = root_kind_tag.or(root_kind_addr).unwrap_or(1);
+ let root_author = root_author_tag
+ .or(root_author_addr)
+ .or(root_author_e)
+ .unwrap_or_default();
+ let root_relays = root_relays_e.or(root_relays_a);
+
+ let mut parent_id = parent_id.or(parent_addr.clone());
+ let parent_kind = parent_kind_tag
+ .or(parent_kind_addr)
+ .unwrap_or(root_kind);
+ let parent_author = parent_author_tag
+ .or(parent_author_addr)
+ .or(parent_author_e)
+ .unwrap_or_else(|| root_author.clone());
+ let parent_relays = parent_relays_e
+ .or(parent_relays_a)
+ .or_else(|| root_relays.clone());
+ let parent_d = parent_d.or(root_d.clone());
+
if parent_id.is_none() {
- parent_id = root_id.clone();
- parent_relays = root_relays.clone();
- parent_kind = root_kind;
- parent_author = root_author.clone();
- parent_d = root_d.clone();
+ parent_id = Some(root_id.clone());
}
let root = RadrootsNostrEventRef {
- id: root_id.ok_or(RadrootsCommentEventIndexError::ParseError)?,
- author: root_author.unwrap_or_default(),
- kind: root_kind.unwrap_or(1),
+ id: root_id,
+ author: root_author,
+ kind: root_kind,
d_tag: root_d,
relays: root_relays,
};
let parent = RadrootsNostrEventRef {
id: parent_id.ok_or(RadrootsCommentEventIndexError::ParseError)?,
- author: parent_author.unwrap_or_default(),
- kind: parent_kind.unwrap_or(1),
+ author: parent_author,
+ kind: parent_kind,
d_tag: parent_d,
relays: parent_relays,
};
@@ -106,10 +238,10 @@ fn create_radroots_comment_event_metadata(
author: String,
published_at: u32,
kind: u32,
- content: String,
- tags: Vec<Vec<String>>,
+ content: &str,
+ tags: &[Vec<String>],
) -> Result<RadrootsCommentEventMetadata, RadrootsCommentEventIndexError> {
- let comment = parse_comment_from_tags(&tags, &content)?;
+ let comment = parse_comment_from_tags(tags, content)?;
Ok(RadrootsCommentEventMetadata {
id,
author,
@@ -121,34 +253,76 @@ fn create_radroots_comment_event_metadata(
pub trait ToRadrootsCommentEventIndex {
fn to_radroots_comment_event(
- self,
+ &self,
) -> Result<RadrootsCommentEventIndex, RadrootsCommentEventIndexError>;
}
impl ToRadrootsCommentEventIndex for RelayIndexerEvent {
fn to_radroots_comment_event(
- self,
+ &self,
) -> Result<RadrootsCommentEventIndex, RadrootsCommentEventIndexError> {
let kind_u32 = self.kind.as_u64() as u32;
+ let id = self.id.clone();
+ let author = self.author.clone();
let metadata = create_radroots_comment_event_metadata(
- self.id.clone(),
- self.author.clone(),
+ id.clone(),
+ author.clone(),
self.created_at,
kind_u32,
- self.content.clone(),
- self.tags.clone(),
+ &self.content,
+ &self.tags,
)?;
Ok(RadrootsCommentEventIndex {
event: RadrootsNostrEvent {
- id: self.id,
- author: self.author,
+ id,
+ author,
created_at: self.created_at,
kind: kind_u32,
- tags: self.tags,
- content: self.content,
- sig: self.sig,
+ tags: self.tags.clone(),
+ content: self.content.clone(),
+ sig: self.sig.clone(),
},
metadata,
})
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::parse_comment_from_tags;
+
+ #[test]
+ fn comment_parses_address_only_root_and_parent() {
+ let pubkey = "f".repeat(64);
+ let addr = format!("30023:{}:dtag", pubkey);
+ let tags = vec![
+ vec!["A".to_string(), addr.clone()],
+ vec!["K".to_string(), "30023".to_string()],
+ vec!["a".to_string(), addr.clone()],
+ vec!["k".to_string(), "30023".to_string()],
+ ];
+
+ let comment = parse_comment_from_tags(&tags, "hello").expect("parse comment");
+ assert_eq!(comment.root.id, addr);
+ assert_eq!(comment.root.kind, 30023);
+ assert_eq!(comment.root.author, pubkey);
+ assert_eq!(comment.root.d_tag.as_deref(), Some("dtag"));
+ assert_eq!(comment.parent.id, comment.root.id);
+ assert_eq!(comment.parent.kind, 30023);
+ }
+
+ #[test]
+ fn comment_defaults_parent_to_root_when_missing() {
+ let pubkey = "e".repeat(64);
+ let addr = format!("30023:{}:root", pubkey);
+ let tags = vec![
+ vec!["A".to_string(), addr.clone()],
+ vec!["K".to_string(), "30023".to_string()],
+ ];
+
+ let comment = parse_comment_from_tags(&tags, "hello").expect("parse comment");
+ assert_eq!(comment.parent.id, comment.root.id);
+ assert_eq!(comment.parent.kind, comment.root.kind);
+ assert_eq!(comment.parent.author, comment.root.author);
+ }
+}
diff --git a/indexer/src/domain/events/follow.rs b/indexer/src/domain/events/follow.rs
@@ -0,0 +1,35 @@
+use radroots_events::follow::RadrootsFollowEventIndex;
+use radroots_events_codec::{error::EventParseError, follow::decode as follow_decode};
+use thiserror::Error;
+
+use crate::relay::event::RelayIndexerEvent;
+
+#[derive(Debug, Error)]
+pub enum RadrootsFollowEventIndexError {
+ #[error("Failed to parse follow event: {0}")]
+ ParseError(#[from] EventParseError),
+}
+
+pub trait ToRadrootsFollowEventIndex {
+ fn to_radroots_follow_event(
+ &self,
+ ) -> Result<RadrootsFollowEventIndex, RadrootsFollowEventIndexError>;
+}
+
+impl ToRadrootsFollowEventIndex for RelayIndexerEvent {
+ fn to_radroots_follow_event(
+ &self,
+ ) -> Result<RadrootsFollowEventIndex, RadrootsFollowEventIndexError> {
+ let kind_u32 = self.kind.as_u64() as u32;
+ let index = follow_decode::index_from_event(
+ self.id.clone(),
+ self.author.clone(),
+ self.created_at,
+ kind_u32,
+ self.content.clone(),
+ self.tags.clone(),
+ self.sig.clone(),
+ )?;
+ Ok(index)
+ }
+}
diff --git a/indexer/src/domain/events/job_feedback.rs b/indexer/src/domain/events/job_feedback.rs
@@ -0,0 +1,35 @@
+use radroots_events::job_feedback::RadrootsJobFeedbackEventIndex;
+use radroots_events_codec::{job::error::JobParseError, job::feedback::decode as job_feedback_decode};
+use thiserror::Error;
+
+use crate::relay::event::RelayIndexerEvent;
+
+#[derive(Debug, Error)]
+pub enum RadrootsJobFeedbackEventIndexError {
+ #[error("Failed to parse job feedback event: {0}")]
+ ParseError(#[from] JobParseError),
+}
+
+pub trait ToRadrootsJobFeedbackEventIndex {
+ fn to_radroots_job_feedback_event(
+ &self,
+ ) -> Result<RadrootsJobFeedbackEventIndex, RadrootsJobFeedbackEventIndexError>;
+}
+
+impl ToRadrootsJobFeedbackEventIndex for RelayIndexerEvent {
+ fn to_radroots_job_feedback_event(
+ &self,
+ ) -> Result<RadrootsJobFeedbackEventIndex, RadrootsJobFeedbackEventIndexError> {
+ let kind_u32 = self.kind.as_u64() as u32;
+ let index = job_feedback_decode::index_from_event(
+ self.id.clone(),
+ self.author.clone(),
+ self.created_at,
+ kind_u32,
+ self.content.clone(),
+ self.tags.clone(),
+ self.sig.clone(),
+ )?;
+ Ok(index)
+ }
+}
diff --git a/indexer/src/domain/events/job_request.rs b/indexer/src/domain/events/job_request.rs
@@ -0,0 +1,35 @@
+use radroots_events::job_request::RadrootsJobRequestEventIndex;
+use radroots_events_codec::{job::error::JobParseError, job::request::decode as job_request_decode};
+use thiserror::Error;
+
+use crate::relay::event::RelayIndexerEvent;
+
+#[derive(Debug, Error)]
+pub enum RadrootsJobRequestEventIndexError {
+ #[error("Failed to parse job request event: {0}")]
+ ParseError(#[from] JobParseError),
+}
+
+pub trait ToRadrootsJobRequestEventIndex {
+ fn to_radroots_job_request_event(
+ &self,
+ ) -> Result<RadrootsJobRequestEventIndex, RadrootsJobRequestEventIndexError>;
+}
+
+impl ToRadrootsJobRequestEventIndex for RelayIndexerEvent {
+ fn to_radroots_job_request_event(
+ &self,
+ ) -> Result<RadrootsJobRequestEventIndex, RadrootsJobRequestEventIndexError> {
+ let kind_u32 = self.kind.as_u64() as u32;
+ let index = job_request_decode::index_from_event(
+ self.id.clone(),
+ self.author.clone(),
+ self.created_at,
+ kind_u32,
+ self.content.clone(),
+ self.tags.clone(),
+ self.sig.clone(),
+ )?;
+ Ok(index)
+ }
+}
diff --git a/indexer/src/domain/events/job_result.rs b/indexer/src/domain/events/job_result.rs
@@ -0,0 +1,35 @@
+use radroots_events::job_result::RadrootsJobResultEventIndex;
+use radroots_events_codec::{job::error::JobParseError, job::result::decode as job_result_decode};
+use thiserror::Error;
+
+use crate::relay::event::RelayIndexerEvent;
+
+#[derive(Debug, Error)]
+pub enum RadrootsJobResultEventIndexError {
+ #[error("Failed to parse job result event: {0}")]
+ ParseError(#[from] JobParseError),
+}
+
+pub trait ToRadrootsJobResultEventIndex {
+ fn to_radroots_job_result_event(
+ &self,
+ ) -> Result<RadrootsJobResultEventIndex, RadrootsJobResultEventIndexError>;
+}
+
+impl ToRadrootsJobResultEventIndex for RelayIndexerEvent {
+ fn to_radroots_job_result_event(
+ &self,
+ ) -> Result<RadrootsJobResultEventIndex, RadrootsJobResultEventIndexError> {
+ let kind_u32 = self.kind.as_u64() as u32;
+ let index = job_result_decode::index_from_event(
+ self.id.clone(),
+ self.author.clone(),
+ self.created_at,
+ kind_u32,
+ self.content.clone(),
+ self.tags.clone(),
+ self.sig.clone(),
+ )?;
+ Ok(index)
+ }
+}
diff --git a/indexer/src/domain/events/listing.rs b/indexer/src/domain/events/listing.rs
@@ -1,10 +1,10 @@
use thiserror::Error;
use radroots_events::{
- listing::models::{
+ listing::{
RadrootsListing, RadrootsListingEventIndex, RadrootsListingEventMetadata,
RadrootsListingImage, RadrootsListingImageSize, RadrootsListingLocation,
- RadrootsListingPrice, RadrootsListingProduct, RadrootsListingQuantity,
+ RadrootsListingProduct, RadrootsListingQuantity,
},
RadrootsNostrEvent,
};
@@ -73,7 +73,7 @@ fn parse_listing_from_tags(
}
}
- let mut prices: Vec<RadrootsListingPrice> = Vec::new();
+ let mut prices: Vec<radroots_core::RadrootsCoreQuantityPrice> = Vec::new();
for t in tags
.iter()
.filter(|t| t.first().map(|k| k == "price").unwrap_or(false))
@@ -178,7 +178,7 @@ fn parse_listing_from_tags(
None
};
- let images: Option<Vec<RadrootsListingImage>> = tags
+ let images = tags
.iter()
.filter(|t| t.first().map(|k| k == "img").unwrap_or(false))
.map(|t| {
@@ -195,8 +195,8 @@ fn parse_listing_from_tags(
};
RadrootsListingImage { url, size }
})
- .collect::<Vec<_>>()
- .into();
+ .collect::<Vec<_>>();
+ let images = if images.is_empty() { None } else { Some(images) };
Ok(RadrootsListing {
d_tag,
@@ -214,10 +214,9 @@ fn create_radroots_listing_event_metadata(
author: String,
published_at: u32,
kind: u32,
- _content: String,
- tags: Vec<Vec<String>>,
+ tags: &[Vec<String>],
) -> Result<RadrootsListingEventMetadata, RadrootsListingEventIndexError> {
- let listing = parse_listing_from_tags(&tags)?;
+ let listing = parse_listing_from_tags(tags)?;
Ok(RadrootsListingEventMetadata {
id,
author,
@@ -229,34 +228,35 @@ fn create_radroots_listing_event_metadata(
pub trait ToRadrootsListingEventIndex {
fn to_radroots_listing_event(
- self,
+ &self,
) -> Result<RadrootsListingEventIndex, RadrootsListingEventIndexError>;
}
impl ToRadrootsListingEventIndex for RelayIndexerEvent {
fn to_radroots_listing_event(
- self,
+ &self,
) -> Result<RadrootsListingEventIndex, RadrootsListingEventIndexError> {
let kind_u32 = self.kind.as_u64() as u32;
+ let id = self.id.clone();
+ let author = self.author.clone();
let metadata = create_radroots_listing_event_metadata(
- self.id.clone(),
- self.author.clone(),
+ id.clone(),
+ author.clone(),
self.created_at,
kind_u32,
- self.content.clone(),
- self.tags.clone(),
+ &self.tags,
)?;
Ok(RadrootsListingEventIndex {
event: RadrootsNostrEvent {
- id: self.id,
- author: self.author,
+ id,
+ author,
created_at: self.created_at,
kind: kind_u32,
- tags: self.tags,
- content: self.content,
- sig: self.sig,
+ tags: self.tags.clone(),
+ content: self.content.clone(),
+ sig: self.sig.clone(),
},
metadata,
})
diff --git a/indexer/src/domain/events/mod.rs b/indexer/src/domain/events/mod.rs
@@ -1,10 +1,20 @@
pub mod comment;
+pub mod follow;
+pub mod job_feedback;
+pub mod job_request;
+pub mod job_result;
pub mod listing;
+pub mod post;
pub mod profile;
pub mod reaction;
pub use comment::ToRadrootsCommentEventIndex;
+pub use follow::ToRadrootsFollowEventIndex;
+pub use job_feedback::ToRadrootsJobFeedbackEventIndex;
+pub use job_request::ToRadrootsJobRequestEventIndex;
+pub use job_result::ToRadrootsJobResultEventIndex;
pub use listing::ToRadrootsListingEventIndex;
+pub use post::ToRadrootsPostEventIndex;
pub use profile::ToRadrootsProfileEventIndex;
pub use reaction::ToRadrootsReactionEventIndex;
diff --git a/indexer/src/domain/events/post.rs b/indexer/src/domain/events/post.rs
@@ -0,0 +1,35 @@
+use radroots_events::post::RadrootsPostEventIndex;
+use radroots_events_codec::{error::EventParseError, post::decode as post_decode};
+use thiserror::Error;
+
+use crate::relay::event::RelayIndexerEvent;
+
+#[derive(Debug, Error)]
+pub enum RadrootsPostEventIndexError {
+ #[error("Failed to parse post event: {0}")]
+ ParseError(#[from] EventParseError),
+}
+
+pub trait ToRadrootsPostEventIndex {
+ fn to_radroots_post_event(
+ &self,
+ ) -> Result<RadrootsPostEventIndex, RadrootsPostEventIndexError>;
+}
+
+impl ToRadrootsPostEventIndex for RelayIndexerEvent {
+ fn to_radroots_post_event(
+ &self,
+ ) -> Result<RadrootsPostEventIndex, RadrootsPostEventIndexError> {
+ let kind_u32 = self.kind.as_u64() as u32;
+ let index = post_decode::index_from_event(
+ self.id.clone(),
+ self.author.clone(),
+ self.created_at,
+ kind_u32,
+ self.content.clone(),
+ self.tags.clone(),
+ self.sig.clone(),
+ )?;
+ Ok(index)
+ }
+}
diff --git a/indexer/src/domain/events/profile.rs b/indexer/src/domain/events/profile.rs
@@ -1,9 +1,8 @@
use anyhow::Result;
use radroots_events::{
- profile::models::{RadrootsProfile, RadrootsProfileEventIndex, RadrootsProfileEventMetadata},
+ profile::{RadrootsProfile, RadrootsProfileEventIndex, RadrootsProfileEventMetadata},
RadrootsNostrEvent,
};
-use std::collections::HashMap;
use thiserror::Error;
use crate::relay::event::RelayIndexerEvent;
@@ -21,18 +20,10 @@ pub enum RadrootsProfileEventIndexError {
pub fn create_radroots_profile_event_metadata(
id: String,
author: String,
- published_at: u64,
+ published_at: u32,
kind: u32,
- content: String,
- tags: Vec<Vec<String>>,
+ content: &str,
) -> Result<RadrootsProfileEventMetadata, RadrootsProfileEventIndexError> {
- let mut tag_map: HashMap<String, Vec<Vec<String>>> = HashMap::new();
- for tag in tags {
- if let Some(key) = tag.get(0).map(String::as_str) {
- tag_map.entry(key.to_string()).or_default().push(tag);
- }
- }
-
let profile: RadrootsProfile = serde_json::from_str(&content)?;
if profile.name.trim().is_empty() {
return Err(RadrootsProfileEventIndexError::MissingNameField);
@@ -49,34 +40,35 @@ pub fn create_radroots_profile_event_metadata(
pub trait ToRadrootsProfileEventIndex {
fn to_radroots_profile_event(
- self,
+ &self,
) -> Result<RadrootsProfileEventIndex, RadrootsProfileEventIndexError>;
}
impl ToRadrootsProfileEventIndex for RelayIndexerEvent {
fn to_radroots_profile_event(
- self,
+ &self,
) -> Result<RadrootsProfileEventIndex, RadrootsProfileEventIndexError> {
let kind_u32 = self.kind.as_u64() as u32;
+ let id = self.id.clone();
+ let author = self.author.clone();
let metadata = create_radroots_profile_event_metadata(
- self.id.clone(),
- self.author.clone(),
- self.created_at as u64,
+ id.clone(),
+ author.clone(),
+ self.created_at,
kind_u32,
- self.content.clone(),
- self.tags.clone(),
+ &self.content,
)?;
Ok(RadrootsProfileEventIndex {
event: RadrootsNostrEvent {
- id: self.id,
- author: self.author,
+ id,
+ author,
created_at: self.created_at,
kind: kind_u32,
- content: self.content,
- tags: self.tags,
- sig: self.sig,
+ content: self.content.clone(),
+ tags: self.tags.clone(),
+ sig: self.sig.clone(),
},
metadata,
})
diff --git a/indexer/src/domain/events/reaction.rs b/indexer/src/domain/events/reaction.rs
@@ -1,7 +1,7 @@
use thiserror::Error;
use radroots_events::{
- reaction::models::{
+ reaction::{
RadrootsReaction, RadrootsReactionEventIndex, RadrootsReactionEventMetadata,
},
RadrootsNostrEvent, RadrootsNostrEventRef,
@@ -19,47 +19,79 @@ fn parse_reaction_from_tags(
tags: &[Vec<String>],
content: &str,
) -> Result<RadrootsReaction, RadrootsReactionEventIndexError> {
+ let parse_address = |addr: &str| -> Option<(u32, String, Option<String>)> {
+ let mut parts = addr.splitn(3, ':');
+ let kind = parts.next()?.parse::<u32>().ok()?;
+ let author = parts.next()?.to_lowercase();
+ let d_tag = parts
+ .next()
+ .and_then(|d| if d.is_empty() { None } else { Some(d.to_string()) });
+ Some((kind, author, d_tag))
+ };
+
let mut root_id: Option<String> = None;
- let mut root_relays: Option<Vec<String>> = None;
- let mut root_kind: Option<u32> = None;
- let mut root_author: Option<String> = None;
+ let mut root_relays_e: Option<Vec<String>> = None;
+ let mut root_relays_a: Option<Vec<String>> = None;
+ let mut root_kind_tag: Option<u32> = None;
+ let mut root_kind_addr: Option<u32> = None;
+ let mut root_author_tag: Option<String> = None;
+ let mut root_author_addr: Option<String> = None;
+ let mut root_author_e: Option<String> = None;
let mut root_d: Option<String> = None;
for t in tags {
- if t.first().map(|k| k == "e").unwrap_or(false) {
- if let Some(id) = t.get(1).cloned() {
- root_id = Some(id);
- }
- if let Some(r) = t.get(2).cloned() {
- root_relays = Some(vec![r]);
+ match t.first().map(|k| k.as_str()) {
+ Some("e") => {
+ if let Some(id) = t.get(1).cloned() {
+ root_id = Some(id);
+ }
+ if let Some(r) = t.get(2).filter(|s| !s.is_empty()).cloned() {
+ root_relays_e = Some(vec![r]);
+ }
+ if let Some(pk) = t.get(3).filter(|s| !s.is_empty()) {
+ root_author_e = Some(pk.to_lowercase());
+ }
}
- } else if t.first().map(|k| k == "a").unwrap_or(false) {
- if let Some(arg) = t.get(1) {
- let parts: Vec<&str> = arg.split(':').collect();
- if parts.len() >= 2 {
- root_kind = parts[0].parse::<u32>().ok();
- root_author = Some(parts[1].to_lowercase());
- if parts.len() >= 3 && !parts[2].is_empty() {
- root_d = Some(parts[2].to_string());
+ Some("a") => {
+ if let Some(addr) = t.get(1).cloned() {
+ if let Some((kind, author, d_tag)) = parse_address(&addr) {
+ root_kind_addr = Some(kind);
+ root_author_addr = Some(author);
+ root_d = d_tag;
}
}
+ if let Some(r) = t.get(2).filter(|s| !s.is_empty()).cloned() {
+ root_relays_a = Some(vec![r]);
+ }
+ }
+ Some("k") => {
+ if let Some(kind) = t.get(1).and_then(|v| v.parse::<u32>().ok()) {
+ root_kind_tag = Some(kind);
+ }
}
- if let Some(r) = t.get(2).cloned() {
- root_relays = Some(vec![r]);
+ Some("p") => {
+ if let Some(pk) = t.get(1).filter(|s| !s.is_empty()) {
+ root_author_tag = Some(pk.to_lowercase());
+ }
}
+ _ => {}
}
}
let id = root_id.ok_or(RadrootsReactionEventIndexError::ParseError)?;
- let kind = root_kind.unwrap_or(1);
- let author = root_author.unwrap_or_default();
+ let kind = root_kind_tag.or(root_kind_addr).unwrap_or(1);
+ let author = root_author_tag
+ .or(root_author_addr)
+ .or(root_author_e)
+ .unwrap_or_default();
+ let relays = root_relays_e.or(root_relays_a);
let root = RadrootsNostrEventRef {
id,
author,
kind,
d_tag: root_d,
- relays: root_relays,
+ relays,
};
Ok(RadrootsReaction {
@@ -73,10 +105,10 @@ fn create_radroots_reaction_event_metadata(
author: String,
published_at: u32,
kind: u32,
- content: String,
- tags: Vec<Vec<String>>,
+ content: &str,
+ tags: &[Vec<String>],
) -> Result<RadrootsReactionEventMetadata, RadrootsReactionEventIndexError> {
- let reaction = parse_reaction_from_tags(&tags, &content)?;
+ let reaction = parse_reaction_from_tags(tags, content)?;
Ok(RadrootsReactionEventMetadata {
id,
author,
@@ -88,34 +120,36 @@ fn create_radroots_reaction_event_metadata(
pub trait ToRadrootsReactionEventIndex {
fn to_radroots_reaction_event(
- self,
+ &self,
) -> Result<RadrootsReactionEventIndex, RadrootsReactionEventIndexError>;
}
impl ToRadrootsReactionEventIndex for RelayIndexerEvent {
fn to_radroots_reaction_event(
- self,
+ &self,
) -> Result<RadrootsReactionEventIndex, RadrootsReactionEventIndexError> {
let kind_u32 = self.kind.as_u64() as u32;
+ let id = self.id.clone();
+ let author = self.author.clone();
let metadata = create_radroots_reaction_event_metadata(
- self.id.clone(),
- self.author.clone(),
+ id.clone(),
+ author.clone(),
self.created_at,
kind_u32,
- self.content.clone(),
- self.tags.clone(),
+ &self.content,
+ &self.tags,
)?;
Ok(RadrootsReactionEventIndex {
event: RadrootsNostrEvent {
- id: self.id,
- author: self.author,
+ id,
+ author,
created_at: self.created_at,
kind: kind_u32,
- tags: self.tags,
- content: self.content,
- sig: self.sig,
+ tags: self.tags.clone(),
+ content: self.content.clone(),
+ sig: self.sig.clone(),
},
metadata,
})
diff --git a/indexer/src/domain/indexer/key.rs b/indexer/src/domain/indexer/key.rs
@@ -28,6 +28,20 @@ pub const PROFILE_INDEX_DIRECTORY: [IndexerKey; 4] = [
IndexerKey::Npub,
];
+pub const POST_INDEX_DIRECTORY: [IndexerKey; 4] = [
+ IndexerKey::Id,
+ IndexerKey::Author,
+ IndexerKey::Npub,
+ IndexerKey::Nip05,
+];
+
+pub const FOLLOW_INDEX_DIRECTORY: [IndexerKey; 4] = [
+ IndexerKey::Id,
+ IndexerKey::Author,
+ IndexerKey::Npub,
+ IndexerKey::Nip05,
+];
+
pub const LISTING_INDEX_DIRECTORY: [IndexerKey; 5] = [
IndexerKey::Id,
IndexerKey::Country,
@@ -51,3 +65,26 @@ pub const COMMENT_INDEX_DIRECTORY: [IndexerKey; 5] = [
IndexerKey::Npub,
IndexerKey::Nip05,
];
+
+pub const JOB_REQUEST_INDEX_DIRECTORY: [IndexerKey; 4] = [
+ IndexerKey::Id,
+ IndexerKey::Author,
+ IndexerKey::Npub,
+ IndexerKey::Nip05,
+];
+
+pub const JOB_RESULT_INDEX_DIRECTORY: [IndexerKey; 5] = [
+ IndexerKey::Id,
+ IndexerKey::RootId,
+ IndexerKey::Author,
+ IndexerKey::Npub,
+ IndexerKey::Nip05,
+];
+
+pub const JOB_FEEDBACK_INDEX_DIRECTORY: [IndexerKey; 5] = [
+ IndexerKey::Id,
+ IndexerKey::RootId,
+ IndexerKey::Author,
+ IndexerKey::Npub,
+ IndexerKey::Nip05,
+];
diff --git a/indexer/src/domain/indexer/kind.rs b/indexer/src/domain/indexer/kind.rs
@@ -4,53 +4,113 @@ use std::fmt;
use std::path::PathBuf;
use crate::domain::indexer::key::{
- IndexerKey, COMMENT_INDEX_DIRECTORY, LISTING_INDEX_DIRECTORY, PROFILE_INDEX_DIRECTORY,
- REACTION_INDEX_DIRECTORY,
+ COMMENT_INDEX_DIRECTORY, FOLLOW_INDEX_DIRECTORY, IndexerKey, JOB_FEEDBACK_INDEX_DIRECTORY,
+ JOB_REQUEST_INDEX_DIRECTORY, JOB_RESULT_INDEX_DIRECTORY, LISTING_INDEX_DIRECTORY,
+ POST_INDEX_DIRECTORY, PROFILE_INDEX_DIRECTORY, REACTION_INDEX_DIRECTORY,
};
use crate::utils::io::{paths_join, PathsError};
+use radroots_events::kinds::{
+ is_request_kind, is_result_kind, KIND_JOB_FEEDBACK, KIND_JOB_REQUEST_MAX, KIND_JOB_REQUEST_MIN,
+ KIND_JOB_RESULT_MAX, KIND_JOB_RESULT_MIN,
+};
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum IndexerEventKind {
Profile,
+ Post,
+ Follow,
Reaction,
Listing,
Comment,
+ JobRequest(u32),
+ JobResult(u32),
+ JobFeedback,
}
impl IndexerEventKind {
- pub const ALL: [IndexerEventKind; 4] = [
+ pub const GROUPS: [IndexerEventKind; 9] = [
IndexerEventKind::Profile,
+ IndexerEventKind::Post,
+ IndexerEventKind::Follow,
IndexerEventKind::Reaction,
IndexerEventKind::Listing,
IndexerEventKind::Comment,
+ IndexerEventKind::JobRequest(KIND_JOB_REQUEST_MIN),
+ IndexerEventKind::JobResult(KIND_JOB_RESULT_MIN),
+ IndexerEventKind::JobFeedback,
];
pub const fn as_u64(self) -> u64 {
match self {
IndexerEventKind::Profile => 0,
+ IndexerEventKind::Post => 1,
+ IndexerEventKind::Follow => 3,
IndexerEventKind::Reaction => 7,
IndexerEventKind::Listing => 30402,
IndexerEventKind::Comment => 1111,
+ IndexerEventKind::JobRequest(kind) => kind as u64,
+ IndexerEventKind::JobResult(kind) => kind as u64,
+ IndexerEventKind::JobFeedback => KIND_JOB_FEEDBACK as u64,
}
}
pub const fn paths(self) -> &'static [IndexerKey] {
match self {
IndexerEventKind::Profile => &PROFILE_INDEX_DIRECTORY,
+ IndexerEventKind::Post => &POST_INDEX_DIRECTORY,
+ IndexerEventKind::Follow => &FOLLOW_INDEX_DIRECTORY,
IndexerEventKind::Reaction => &REACTION_INDEX_DIRECTORY,
IndexerEventKind::Listing => &LISTING_INDEX_DIRECTORY,
IndexerEventKind::Comment => &COMMENT_INDEX_DIRECTORY,
+ IndexerEventKind::JobRequest(_) => &JOB_REQUEST_INDEX_DIRECTORY,
+ IndexerEventKind::JobResult(_) => &JOB_RESULT_INDEX_DIRECTORY,
+ IndexerEventKind::JobFeedback => &JOB_FEEDBACK_INDEX_DIRECTORY,
}
}
pub fn base_path<P: AsRef<std::path::Path>>(self, data_dir: P) -> Result<PathBuf, PathsError> {
- paths_join(&[
- data_dir.as_ref().to_str().unwrap(),
- "static",
- "events",
- &self.as_u64().to_string(),
+ let kind_dir = self.as_u64().to_string();
+ paths_join([
+ data_dir.as_ref(),
+ std::path::Path::new("static"),
+ std::path::Path::new("events"),
+ std::path::Path::new(&kind_dir),
])
}
+
+ pub fn group(self) -> Self {
+ match self {
+ IndexerEventKind::JobRequest(_) => IndexerEventKind::JobRequest(KIND_JOB_REQUEST_MIN),
+ IndexerEventKind::JobResult(_) => IndexerEventKind::JobResult(KIND_JOB_RESULT_MIN),
+ other => other,
+ }
+ }
+
+ pub fn relay_kind_filter_sql() -> String {
+ let exact = [
+ IndexerEventKind::Profile.as_u64(),
+ IndexerEventKind::Post.as_u64(),
+ IndexerEventKind::Follow.as_u64(),
+ IndexerEventKind::Reaction.as_u64(),
+ IndexerEventKind::Listing.as_u64(),
+ IndexerEventKind::Comment.as_u64(),
+ IndexerEventKind::JobFeedback.as_u64(),
+ ];
+ let list = exact
+ .iter()
+ .map(|k| k.to_string())
+ .collect::<Vec<_>>()
+ .join(", ");
+ let req = format!(
+ "(kind BETWEEN {} AND {})",
+ KIND_JOB_REQUEST_MIN, KIND_JOB_REQUEST_MAX
+ );
+ let res = format!(
+ "(kind BETWEEN {} AND {})",
+ KIND_JOB_RESULT_MIN, KIND_JOB_RESULT_MAX
+ );
+ format!("kind IN ({}) OR {} OR {}", list, req, res)
+ }
}
impl fmt::Display for IndexerEventKind {
@@ -87,12 +147,53 @@ impl TryFrom<u64> for IndexerEventKind {
type Error = IndexerEventKindParseError;
fn try_from(val: u64) -> Result<Self, Self::Error> {
- match val {
+ let v = u32::try_from(val).map_err(|_| IndexerEventKindParseError(val))?;
+ match v {
0 => Ok(IndexerEventKind::Profile),
+ 1 => Ok(IndexerEventKind::Post),
+ 3 => Ok(IndexerEventKind::Follow),
7 => Ok(IndexerEventKind::Reaction),
30402 => Ok(IndexerEventKind::Listing),
1111 => Ok(IndexerEventKind::Comment),
- other => Err(IndexerEventKindParseError(other)),
+ KIND_JOB_FEEDBACK => Ok(IndexerEventKind::JobFeedback),
+ _ if is_request_kind(v) => Ok(IndexerEventKind::JobRequest(v)),
+ _ if is_result_kind(v) => Ok(IndexerEventKind::JobResult(v)),
+ other => Err(IndexerEventKindParseError(other as u64)),
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::IndexerEventKind;
+ use radroots_events::kinds::{KIND_JOB_REQUEST_MIN, KIND_JOB_RESULT_MIN};
+
+ #[test]
+ fn kind_grouping_uses_job_min() {
+ let req = IndexerEventKind::JobRequest(KIND_JOB_REQUEST_MIN + 1);
+ let res = IndexerEventKind::JobResult(KIND_JOB_RESULT_MIN + 1);
+ assert_eq!(
+ req.group(),
+ IndexerEventKind::JobRequest(KIND_JOB_REQUEST_MIN)
+ );
+ assert_eq!(
+ res.group(),
+ IndexerEventKind::JobResult(KIND_JOB_RESULT_MIN)
+ );
+ }
+
+ #[test]
+ fn kind_try_from_job_ranges() {
+ let req = IndexerEventKind::try_from(KIND_JOB_REQUEST_MIN as u64 + 5).unwrap();
+ let res = IndexerEventKind::try_from(KIND_JOB_RESULT_MIN as u64 + 5).unwrap();
+ assert_eq!(req.as_u64(), KIND_JOB_REQUEST_MIN as u64 + 5);
+ assert_eq!(res.as_u64(), KIND_JOB_RESULT_MIN as u64 + 5);
+ }
+
+ #[test]
+ fn kind_try_from_rejects_overflow() {
+ let too_large = u64::from(u32::MAX) + 1;
+ let err = IndexerEventKind::try_from(too_large).expect_err("overflow");
+ assert_eq!(err.0, too_large);
+ }
+}
diff --git a/indexer/src/domain/indexer/models/comment.rs b/indexer/src/domain/indexer/models/comment.rs
@@ -1,7 +1,5 @@
use crate::domain::indexer::key::{IndexerKey, COMMENT_INDEX_DIRECTORY};
-use crate::utils::crypto::compute_hash;
-use crate::utils::io::fs_mkdir;
-use crate::utils::io::{write_hash, write_json};
+use crate::utils::io::{fs_mkdir, safe_path_segment, write_json_if_changed};
use crate::utils::nostr::public_key_to_npub;
use crate::utils::strings::truncate_log;
use crate::{
@@ -17,32 +15,14 @@ use crate::{
relay::event::RelayIndexerEvent,
Settings,
};
-use radroots_events::comment::models::{RadrootsCommentEventIndex, RadrootsCommentEventMetadata};
-use std::{collections::BTreeMap, fs, path::PathBuf};
+use radroots_events::comment::{RadrootsCommentEventIndex, RadrootsCommentEventMetadata};
+use std::{collections::BTreeMap, path::PathBuf};
use tracing::{instrument, warn};
-macro_rules! write_if_stale {
- ($path:expr, $data:expr, $updated:expr) => {{
- let hash = compute_hash(&$data)?;
- let hash_path = $path.with_extension("sha256.txt");
- let needs_write = if $path.exists() && hash_path.exists() {
- let stored = fs::read_to_string(&hash_path)?;
- stored.trim() != hash
- } else {
- true
- };
- if needs_write {
- write_json(&$path, &$data)?;
- write_hash(&$path, &hash)?;
- $updated.push($path.clone());
- }
- }};
-}
-
#[derive(Debug)]
pub struct EventCommentIndexes {
events: Vec<RadrootsCommentEventIndex>,
- events_id: BTreeMap<String, RadrootsCommentEventIndex>,
+ events_id: BTreeMap<String, usize>,
root_ids: BTreeMap<String, Vec<String>>,
author_ids: BTreeMap<String, Vec<String>>,
npub_ids: BTreeMap<String, Vec<String>>,
@@ -55,28 +35,32 @@ impl EventCommentIndexes {
profiles: &ProfileResolver,
) -> Result<Self, NostrEventsStaticError> {
let mut events = Vec::with_capacity(raw_events.len());
- let mut events_id = BTreeMap::new();
+ let mut events_id: BTreeMap<String, usize> = BTreeMap::new();
let mut root_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
for raw in raw_events {
- match raw.clone().to_radroots_comment_event() {
+ match raw.to_radroots_comment_event() {
Ok(evt) => {
audit::log_comment_event(&evt);
let id = evt.metadata.id.clone();
let author_hex = evt.metadata.author.to_lowercase();
let npub = public_key_to_npub(&author_hex)
- .map(|s| s.to_lowercase())
+ .map(|mut s| {
+ s.make_ascii_lowercase();
+ s
+ })
.ok();
let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned);
let root = evt.metadata.comment.root.id.to_lowercase();
- events_id.insert(id.clone(), evt.clone());
- events.push(evt.clone());
+ events.push(evt);
+ let idx = events.len() - 1;
+ events_id.insert(id.clone(), idx);
root_ids.entry(root).or_default().push(id.clone());
author_ids.entry(author_hex).or_default().push(id.clone());
@@ -84,10 +68,7 @@ impl EventCommentIndexes {
npub_ids.entry(n).or_default().push(id.clone());
}
if let Some(n05) = author_nip05 {
- nip05_ids
- .entry(n05.to_lowercase())
- .or_default()
- .push(id.clone());
+ nip05_ids.entry(n05).or_default().push(id.clone());
}
}
Err(err) => {
@@ -105,31 +86,32 @@ impl EventCommentIndexes {
}
let sort_ids = |ids: &mut Vec<String>,
- map: &BTreeMap<String, RadrootsCommentEventIndex>| {
+ map: &BTreeMap<String, usize>,
+ events: &[RadrootsCommentEventIndex]| {
ids.sort_unstable_by(|a, b| {
let pa = map
.get(a)
- .map(|e| e.metadata.published_at)
+ .map(|idx| events[*idx].metadata.published_at)
.unwrap_or_default();
let pb = map
.get(b)
- .map(|e| e.metadata.published_at)
+ .map(|idx| events[*idx].metadata.published_at)
.unwrap_or_default();
pb.cmp(&pa).then(a.cmp(b))
});
};
for ids in root_ids.values_mut() {
- sort_ids(ids, &events_id);
+ sort_ids(ids, &events_id, &events);
}
for ids in author_ids.values_mut() {
- sort_ids(ids, &events_id);
+ sort_ids(ids, &events_id, &events);
}
for ids in npub_ids.values_mut() {
- sort_ids(ids, &events_id);
+ sort_ids(ids, &events_id, &events);
}
for ids in nip05_ids.values_mut() {
- sort_ids(ids, &events_id);
+ sort_ids(ids, &events_id, &events);
}
Ok(Self {
@@ -165,96 +147,138 @@ impl WriteEventIndexes for EventCommentIndexes {
{
let idxs_root = base.join("events.json");
let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect();
- write_if_stale!(idxs_root, ids, updated);
+ write_json_if_changed(&idxs_root, &ids, updated)?;
}
{
let sub = base.join("id");
fs_mkdir(&[&sub])?;
- let keys: Vec<String> = self.events_id.keys().cloned().collect();
- write_if_stale!(sub.join("indexes.json"), keys, updated);
+ let keys: Vec<String> = self
+ .events_id
+ .keys()
+ .filter_map(|key| safe_path_segment(&key.to_lowercase()))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &keys, updated)?;
- for (id, evt) in &self.events_id {
- let dir = sub.join(id.to_lowercase());
+ for (id, idx) in &self.events_id {
+ let id_lower = id.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&id_lower) else {
+ warn!(id = %id, "Skipping unsafe comment id path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ let evt = &self.events[*idx];
fs_mkdir(&[&dir])?;
- write_if_stale!(dir.join("event.json"), evt.event.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated);
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &evt.metadata, updated)?;
}
}
{
let sub = base.join(IndexerKey::RootId.as_str());
fs_mkdir(&[&sub])?;
- let roots: Vec<String> = self.root_ids.keys().cloned().collect();
- write_if_stale!(sub.join("indexes.json"), roots, updated);
+ let roots: Vec<String> = self
+ .root_ids
+ .keys()
+ .filter_map(|root| safe_path_segment(root))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &roots, updated)?;
for (root, ids) in &self.root_ids {
- let dir = sub.join(root);
+ let Some(dir_key) = safe_path_segment(root) else {
+ warn!(root = %root, "Skipping unsafe comment root path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
fs_mkdir(&[&dir])?;
- let metas: Vec<RadrootsCommentEventMetadata> = ids
+ let metas: Vec<&RadrootsCommentEventMetadata> = ids
.iter()
.filter_map(|id| self.events_id.get(id))
- .map(|e| e.metadata.clone())
+ .map(|idx| &self.events[*idx].metadata)
.collect();
- write_if_stale!(dir.join("events.json"), ids.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), metas, updated);
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
}
}
{
let sub = base.join(IndexerKey::Author.as_str());
fs_mkdir(&[&sub])?;
- let authors: Vec<String> = self.author_ids.keys().cloned().collect();
- write_if_stale!(sub.join("indexes.json"), authors, updated);
+ let authors: Vec<String> = self
+ .author_ids
+ .keys()
+ .filter_map(|author| safe_path_segment(author))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &authors, updated)?;
for (author, ids) in &self.author_ids {
- let dir = sub.join(author);
+ let Some(dir_key) = safe_path_segment(author) else {
+ warn!(author = %author, "Skipping unsafe comment author path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
fs_mkdir(&[&dir])?;
- let metas: Vec<RadrootsCommentEventMetadata> = ids
+ let metas: Vec<&RadrootsCommentEventMetadata> = ids
.iter()
.filter_map(|id| self.events_id.get(id))
- .map(|e| e.metadata.clone())
+ .map(|idx| &self.events[*idx].metadata)
.collect();
- write_if_stale!(dir.join("events.json"), ids.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), metas, updated);
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
}
}
{
let sub = base.join(IndexerKey::Npub.as_str());
fs_mkdir(&[&sub])?;
- let npubs: Vec<String> = self.npub_ids.keys().cloned().collect();
- write_if_stale!(sub.join("indexes.json"), npubs, updated);
+ let npubs: Vec<String> = self
+ .npub_ids
+ .keys()
+ .filter_map(|npub| safe_path_segment(npub))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &npubs, updated)?;
for (npub, ids) in &self.npub_ids {
- let dir = sub.join(npub);
+ let Some(dir_key) = safe_path_segment(npub) else {
+ warn!(npub = %npub, "Skipping unsafe comment npub path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
fs_mkdir(&[&dir])?;
- let metas: Vec<RadrootsCommentEventMetadata> = ids
+ let metas: Vec<&RadrootsCommentEventMetadata> = ids
.iter()
.filter_map(|id| self.events_id.get(id))
- .map(|e| e.metadata.clone())
+ .map(|idx| &self.events[*idx].metadata)
.collect();
- write_if_stale!(dir.join("events.json"), ids.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), metas, updated);
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
}
}
{
let sub = base.join(IndexerKey::Nip05.as_str());
fs_mkdir(&[&sub])?;
- let names: Vec<String> = self.nip05_ids.keys().cloned().collect();
- write_if_stale!(sub.join("indexes.json"), names, updated);
+ let names: Vec<String> = self
+ .nip05_ids
+ .keys()
+ .filter_map(|name| safe_path_segment(name))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &names, updated)?;
for (name, ids) in &self.nip05_ids {
- let dir = sub.join(name);
+ let Some(dir_key) = safe_path_segment(name) else {
+ warn!(nip05 = %name, "Skipping unsafe comment nip05 path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
fs_mkdir(&[&dir])?;
- let metas: Vec<RadrootsCommentEventMetadata> = ids
+ let metas: Vec<&RadrootsCommentEventMetadata> = ids
.iter()
.filter_map(|id| self.events_id.get(id))
- .map(|e| e.metadata.clone())
+ .map(|idx| &self.events[*idx].metadata)
.collect();
- write_if_stale!(dir.join("events.json"), ids.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), metas, updated);
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
}
}
diff --git a/indexer/src/domain/indexer/models/follow.rs b/indexer/src/domain/indexer/models/follow.rs
@@ -0,0 +1,251 @@
+use crate::domain::indexer::key::{FOLLOW_INDEX_DIRECTORY, IndexerKey};
+use crate::utils::io::{fs_mkdir, safe_path_segment, write_json_if_changed};
+use crate::utils::nostr::public_key_to_npub;
+use crate::utils::strings::truncate_log;
+use crate::{
+ audit,
+ domain::{
+ events::follow::ToRadrootsFollowEventIndex,
+ indexer::{
+ kind::IndexerEventKind,
+ models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes},
+ },
+ resolvers::profile::ProfileResolver,
+ },
+ relay::event::RelayIndexerEvent,
+ Settings,
+};
+use radroots_events::follow::{RadrootsFollowEventIndex, RadrootsFollowEventMetadata};
+use std::{collections::BTreeMap, path::PathBuf};
+use tracing::{instrument, warn};
+
+#[derive(Debug)]
+pub struct EventFollowIndexes {
+ events: Vec<RadrootsFollowEventIndex>,
+ events_id: BTreeMap<String, usize>,
+ author_ids: BTreeMap<String, Vec<String>>,
+ npub_ids: BTreeMap<String, Vec<String>>,
+ nip05_ids: BTreeMap<String, Vec<String>>,
+}
+
+impl EventFollowIndexes {
+ pub fn build_with_profiles(
+ raw_events: &[RelayIndexerEvent],
+ profiles: &ProfileResolver,
+ ) -> Result<Self, NostrEventsStaticError> {
+ let mut events = Vec::with_capacity(raw_events.len());
+ let mut events_id: BTreeMap<String, usize> = BTreeMap::new();
+ let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+
+ for raw in raw_events {
+ match raw.to_radroots_follow_event() {
+ Ok(evt) => {
+ audit::log_indexer_event(&raw);
+ let id = evt.metadata.id.clone();
+ let author_hex = evt.metadata.author.to_lowercase();
+
+ let npub = public_key_to_npub(&author_hex)
+ .map(|mut s| {
+ s.make_ascii_lowercase();
+ s
+ })
+ .ok();
+ let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned);
+
+ events.push(evt);
+ let idx = events.len() - 1;
+ events_id.insert(id.clone(), idx);
+
+ author_ids.entry(author_hex).or_default().push(id.clone());
+ if let Some(n) = npub {
+ npub_ids.entry(n).or_default().push(id.clone());
+ }
+ if let Some(n05) = author_nip05 {
+ nip05_ids.entry(n05).or_default().push(id.clone());
+ }
+ }
+ Err(err) => {
+ warn!(
+ kind = raw.kind.as_u64(),
+ id = %raw.id,
+ author = %raw.author,
+ content = %truncate_log(&raw.content, 1000),
+ tags = ?raw.tags,
+ error = %err,
+ "Skipping malformed follow event"
+ );
+ }
+ }
+ }
+
+ let sort_ids = |ids: &mut Vec<String>,
+ map: &BTreeMap<String, usize>,
+ events: &[RadrootsFollowEventIndex]| {
+ ids.sort_unstable_by(|a, b| {
+ let pa = map
+ .get(a)
+ .map(|idx| events[*idx].metadata.published_at)
+ .unwrap_or_default();
+ let pb = map
+ .get(b)
+ .map(|idx| events[*idx].metadata.published_at)
+ .unwrap_or_default();
+ pb.cmp(&pa).then(a.cmp(b))
+ });
+ };
+
+ for ids in author_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in npub_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in nip05_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+
+ Ok(Self {
+ events,
+ events_id,
+ author_ids,
+ npub_ids,
+ nip05_ids,
+ })
+ }
+}
+
+impl EventIndexes for EventFollowIndexes {
+ type Event = RelayIndexerEvent;
+
+ fn subdirs() -> &'static [IndexerKey] {
+ &FOLLOW_INDEX_DIRECTORY
+ }
+
+ #[instrument(skip(raw_events), fields(event_count = raw_events.len()))]
+ fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> {
+ let empty = ProfileResolver::default();
+ Self::build_with_profiles(raw_events, &empty)
+ }
+}
+
+impl WriteEventIndexes for EventFollowIndexes {
+ fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> {
+ let base: PathBuf = IndexerEventKind::Follow.base_path(&settings.indexer.data_dir)?;
+ fs_mkdir(&[&base])?;
+
+ {
+ let idxs_root = base.join("events.json");
+ let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect();
+ write_json_if_changed(&idxs_root, &ids, updated)?;
+ }
+
+ {
+ let sub = base.join("id");
+ fs_mkdir(&[&sub])?;
+ let keys: Vec<String> = self
+ .events_id
+ .keys()
+ .filter_map(|key| safe_path_segment(&key.to_lowercase()))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &keys, updated)?;
+
+ for (id, idx) in &self.events_id {
+ let id_lower = id.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&id_lower) else {
+ warn!(id = %id, "Skipping unsafe follow id path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ let evt = &self.events[*idx];
+ fs_mkdir(&[&dir])?;
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &evt.metadata, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Author.as_str());
+ fs_mkdir(&[&sub])?;
+ let authors: Vec<String> = self
+ .author_ids
+ .keys()
+ .filter_map(|author| safe_path_segment(author))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &authors, updated)?;
+
+ for (author, ids) in &self.author_ids {
+ let Some(dir_key) = safe_path_segment(author) else {
+ warn!(author = %author, "Skipping unsafe follow author path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsFollowEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Npub.as_str());
+ fs_mkdir(&[&sub])?;
+ let npubs: Vec<String> = self
+ .npub_ids
+ .keys()
+ .filter_map(|npub| safe_path_segment(npub))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &npubs, updated)?;
+
+ for (npub, ids) in &self.npub_ids {
+ let Some(dir_key) = safe_path_segment(npub) else {
+ warn!(npub = %npub, "Skipping unsafe follow npub path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsFollowEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Nip05.as_str());
+ fs_mkdir(&[&sub])?;
+ let names: Vec<String> = self
+ .nip05_ids
+ .keys()
+ .filter_map(|name| safe_path_segment(name))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &names, updated)?;
+
+ for (name, ids) in &self.nip05_ids {
+ let Some(dir_key) = safe_path_segment(name) else {
+ warn!(nip05 = %name, "Skipping unsafe follow nip05 path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsFollowEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/indexer/src/domain/indexer/models/job_feedback.rs b/indexer/src/domain/indexer/models/job_feedback.rs
@@ -0,0 +1,289 @@
+use crate::domain::indexer::key::{IndexerKey, JOB_FEEDBACK_INDEX_DIRECTORY};
+use crate::utils::io::{fs_mkdir, safe_path_segment, write_json_if_changed};
+use crate::utils::nostr::public_key_to_npub;
+use crate::utils::strings::truncate_log;
+use crate::{
+ audit,
+ domain::{
+ events::job_feedback::ToRadrootsJobFeedbackEventIndex,
+ indexer::{
+ kind::IndexerEventKind,
+ models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes},
+ },
+ resolvers::profile::ProfileResolver,
+ },
+ relay::event::RelayIndexerEvent,
+ Settings,
+};
+use radroots_events::job_feedback::{
+ RadrootsJobFeedbackEventIndex, RadrootsJobFeedbackEventMetadata,
+};
+use std::{collections::BTreeMap, path::PathBuf};
+use tracing::{instrument, warn};
+
+#[derive(Debug)]
+pub struct EventJobFeedbackIndexes {
+ events: Vec<RadrootsJobFeedbackEventIndex>,
+ events_id: BTreeMap<String, usize>,
+ root_ids: BTreeMap<String, Vec<String>>,
+ author_ids: BTreeMap<String, Vec<String>>,
+ npub_ids: BTreeMap<String, Vec<String>>,
+ nip05_ids: BTreeMap<String, Vec<String>>,
+}
+
+impl EventJobFeedbackIndexes {
+ pub fn build_with_profiles(
+ raw_events: &[RelayIndexerEvent],
+ profiles: &ProfileResolver,
+ ) -> Result<Self, NostrEventsStaticError> {
+ let mut events = Vec::with_capacity(raw_events.len());
+ let mut events_id: BTreeMap<String, usize> = BTreeMap::new();
+ let mut root_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+
+ for raw in raw_events {
+ match raw.to_radroots_job_feedback_event() {
+ Ok(evt) => {
+ audit::log_indexer_event(&raw);
+ let id = evt.metadata.id.clone();
+ let author_hex = evt.metadata.author.to_lowercase();
+
+ let npub = public_key_to_npub(&author_hex)
+ .map(|mut s| {
+ s.make_ascii_lowercase();
+ s
+ })
+ .ok();
+ let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned);
+
+ let root = evt.metadata.job_feedback.request_event.id.to_lowercase();
+
+ events.push(evt);
+ let idx = events.len() - 1;
+ events_id.insert(id.clone(), idx);
+
+ root_ids.entry(root).or_default().push(id.clone());
+ author_ids.entry(author_hex).or_default().push(id.clone());
+ if let Some(n) = npub {
+ npub_ids.entry(n).or_default().push(id.clone());
+ }
+ if let Some(n05) = author_nip05 {
+ nip05_ids.entry(n05).or_default().push(id.clone());
+ }
+ }
+ Err(err) => {
+ warn!(
+ kind = raw.kind.as_u64(),
+ id = %raw.id,
+ author = %raw.author,
+ content = %truncate_log(&raw.content, 1000),
+ tags = ?raw.tags,
+ error = %err,
+ "Skipping malformed job feedback event"
+ );
+ }
+ }
+ }
+
+ let sort_ids = |ids: &mut Vec<String>,
+ map: &BTreeMap<String, usize>,
+ events: &[RadrootsJobFeedbackEventIndex]| {
+ ids.sort_unstable_by(|a, b| {
+ let pa = map
+ .get(a)
+ .map(|idx| events[*idx].metadata.published_at)
+ .unwrap_or_default();
+ let pb = map
+ .get(b)
+ .map(|idx| events[*idx].metadata.published_at)
+ .unwrap_or_default();
+ pb.cmp(&pa).then(a.cmp(b))
+ });
+ };
+
+ for ids in root_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in author_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in npub_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in nip05_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+
+ Ok(Self {
+ events,
+ events_id,
+ root_ids,
+ author_ids,
+ npub_ids,
+ nip05_ids,
+ })
+ }
+}
+
+impl EventIndexes for EventJobFeedbackIndexes {
+ type Event = RelayIndexerEvent;
+
+ fn subdirs() -> &'static [IndexerKey] {
+ &JOB_FEEDBACK_INDEX_DIRECTORY
+ }
+
+ #[instrument(skip(raw_events), fields(event_count = raw_events.len()))]
+ fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> {
+ let empty = ProfileResolver::default();
+ Self::build_with_profiles(raw_events, &empty)
+ }
+}
+
+impl WriteEventIndexes for EventJobFeedbackIndexes {
+ fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> {
+ let base: PathBuf = IndexerEventKind::JobFeedback.base_path(&settings.indexer.data_dir)?;
+ fs_mkdir(&[&base])?;
+
+ {
+ let idxs_root = base.join("events.json");
+ let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect();
+ write_json_if_changed(&idxs_root, &ids, updated)?;
+ }
+
+ {
+ let sub = base.join("id");
+ fs_mkdir(&[&sub])?;
+ let keys: Vec<String> = self
+ .events_id
+ .keys()
+ .filter_map(|key| safe_path_segment(&key.to_lowercase()))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &keys, updated)?;
+
+ for (id, idx) in &self.events_id {
+ let id_lower = id.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&id_lower) else {
+ warn!(id = %id, "Skipping unsafe job feedback id path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ let evt = &self.events[*idx];
+ fs_mkdir(&[&dir])?;
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &evt.metadata, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::RootId.as_str());
+ fs_mkdir(&[&sub])?;
+ let roots: Vec<String> = self
+ .root_ids
+ .keys()
+ .filter_map(|root| safe_path_segment(root))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &roots, updated)?;
+
+ for (root, ids) in &self.root_ids {
+ let Some(dir_key) = safe_path_segment(root) else {
+ warn!(root = %root, "Skipping unsafe job feedback root path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsJobFeedbackEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Author.as_str());
+ fs_mkdir(&[&sub])?;
+ let authors: Vec<String> = self
+ .author_ids
+ .keys()
+ .filter_map(|author| safe_path_segment(author))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &authors, updated)?;
+
+ for (author, ids) in &self.author_ids {
+ let Some(dir_key) = safe_path_segment(author) else {
+ warn!(author = %author, "Skipping unsafe job feedback author path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsJobFeedbackEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Npub.as_str());
+ fs_mkdir(&[&sub])?;
+ let npubs: Vec<String> = self
+ .npub_ids
+ .keys()
+ .filter_map(|npub| safe_path_segment(npub))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &npubs, updated)?;
+
+ for (npub, ids) in &self.npub_ids {
+ let Some(dir_key) = safe_path_segment(npub) else {
+ warn!(npub = %npub, "Skipping unsafe job feedback npub path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsJobFeedbackEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Nip05.as_str());
+ fs_mkdir(&[&sub])?;
+ let names: Vec<String> = self
+ .nip05_ids
+ .keys()
+ .filter_map(|name| safe_path_segment(name))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &names, updated)?;
+
+ for (name, ids) in &self.nip05_ids {
+ let Some(dir_key) = safe_path_segment(name) else {
+ warn!(nip05 = %name, "Skipping unsafe job feedback nip05 path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsJobFeedbackEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/indexer/src/domain/indexer/models/job_request.rs b/indexer/src/domain/indexer/models/job_request.rs
@@ -0,0 +1,255 @@
+use crate::domain::indexer::key::{IndexerKey, JOB_REQUEST_INDEX_DIRECTORY};
+use crate::utils::io::{fs_mkdir, safe_path_segment, write_json_if_changed};
+use crate::utils::nostr::public_key_to_npub;
+use crate::utils::strings::truncate_log;
+use crate::{
+ audit,
+ domain::{
+ events::job_request::ToRadrootsJobRequestEventIndex,
+ indexer::{
+ kind::IndexerEventKind,
+ models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes},
+ },
+ resolvers::profile::ProfileResolver,
+ },
+ relay::event::RelayIndexerEvent,
+ Settings,
+};
+use radroots_events::job_request::{
+ RadrootsJobRequestEventIndex, RadrootsJobRequestEventMetadata,
+};
+use radroots_events::kinds::KIND_JOB_REQUEST_MIN;
+use std::{collections::BTreeMap, path::PathBuf};
+use tracing::{instrument, warn};
+
+#[derive(Debug)]
+pub struct EventJobRequestIndexes {
+ events: Vec<RadrootsJobRequestEventIndex>,
+ events_id: BTreeMap<String, usize>,
+ author_ids: BTreeMap<String, Vec<String>>,
+ npub_ids: BTreeMap<String, Vec<String>>,
+ nip05_ids: BTreeMap<String, Vec<String>>,
+}
+
+impl EventJobRequestIndexes {
+ pub fn build_with_profiles(
+ raw_events: &[RelayIndexerEvent],
+ profiles: &ProfileResolver,
+ ) -> Result<Self, NostrEventsStaticError> {
+ let mut events = Vec::with_capacity(raw_events.len());
+ let mut events_id: BTreeMap<String, usize> = BTreeMap::new();
+ let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+
+ for raw in raw_events {
+ match raw.to_radroots_job_request_event() {
+ Ok(evt) => {
+ audit::log_indexer_event(&raw);
+ let id = evt.metadata.id.clone();
+ let author_hex = evt.metadata.author.to_lowercase();
+
+ let npub = public_key_to_npub(&author_hex)
+ .map(|mut s| {
+ s.make_ascii_lowercase();
+ s
+ })
+ .ok();
+ let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned);
+
+ events.push(evt);
+ let idx = events.len() - 1;
+ events_id.insert(id.clone(), idx);
+
+ author_ids.entry(author_hex).or_default().push(id.clone());
+ if let Some(n) = npub {
+ npub_ids.entry(n).or_default().push(id.clone());
+ }
+ if let Some(n05) = author_nip05 {
+ nip05_ids.entry(n05).or_default().push(id.clone());
+ }
+ }
+ Err(err) => {
+ warn!(
+ kind = raw.kind.as_u64(),
+ id = %raw.id,
+ author = %raw.author,
+ content = %truncate_log(&raw.content, 1000),
+ tags = ?raw.tags,
+ error = %err,
+ "Skipping malformed job request event"
+ );
+ }
+ }
+ }
+
+ let sort_ids = |ids: &mut Vec<String>,
+ map: &BTreeMap<String, usize>,
+ events: &[RadrootsJobRequestEventIndex]| {
+ ids.sort_unstable_by(|a, b| {
+ let pa = map
+ .get(a)
+ .map(|idx| events[*idx].metadata.published_at)
+ .unwrap_or_default();
+ let pb = map
+ .get(b)
+ .map(|idx| events[*idx].metadata.published_at)
+ .unwrap_or_default();
+ pb.cmp(&pa).then(a.cmp(b))
+ });
+ };
+
+ for ids in author_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in npub_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in nip05_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+
+ Ok(Self {
+ events,
+ events_id,
+ author_ids,
+ npub_ids,
+ nip05_ids,
+ })
+ }
+}
+
+impl EventIndexes for EventJobRequestIndexes {
+ type Event = RelayIndexerEvent;
+
+ fn subdirs() -> &'static [IndexerKey] {
+ &JOB_REQUEST_INDEX_DIRECTORY
+ }
+
+ #[instrument(skip(raw_events), fields(event_count = raw_events.len()))]
+ fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> {
+ let empty = ProfileResolver::default();
+ Self::build_with_profiles(raw_events, &empty)
+ }
+}
+
+impl WriteEventIndexes for EventJobRequestIndexes {
+ fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> {
+ let base: PathBuf =
+ IndexerEventKind::JobRequest(KIND_JOB_REQUEST_MIN).base_path(&settings.indexer.data_dir)?;
+ fs_mkdir(&[&base])?;
+
+ {
+ let idxs_root = base.join("events.json");
+ let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect();
+ write_json_if_changed(&idxs_root, &ids, updated)?;
+ }
+
+ {
+ let sub = base.join("id");
+ fs_mkdir(&[&sub])?;
+ let keys: Vec<String> = self
+ .events_id
+ .keys()
+ .filter_map(|key| safe_path_segment(&key.to_lowercase()))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &keys, updated)?;
+
+ for (id, idx) in &self.events_id {
+ let id_lower = id.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&id_lower) else {
+ warn!(id = %id, "Skipping unsafe job request id path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ let evt = &self.events[*idx];
+ fs_mkdir(&[&dir])?;
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &evt.metadata, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Author.as_str());
+ fs_mkdir(&[&sub])?;
+ let authors: Vec<String> = self
+ .author_ids
+ .keys()
+ .filter_map(|author| safe_path_segment(author))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &authors, updated)?;
+
+ for (author, ids) in &self.author_ids {
+ let Some(dir_key) = safe_path_segment(author) else {
+ warn!(author = %author, "Skipping unsafe job request author path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsJobRequestEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Npub.as_str());
+ fs_mkdir(&[&sub])?;
+ let npubs: Vec<String> = self
+ .npub_ids
+ .keys()
+ .filter_map(|npub| safe_path_segment(npub))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &npubs, updated)?;
+
+ for (npub, ids) in &self.npub_ids {
+ let Some(dir_key) = safe_path_segment(npub) else {
+ warn!(npub = %npub, "Skipping unsafe job request npub path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsJobRequestEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Nip05.as_str());
+ fs_mkdir(&[&sub])?;
+ let names: Vec<String> = self
+ .nip05_ids
+ .keys()
+ .filter_map(|name| safe_path_segment(name))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &names, updated)?;
+
+ for (name, ids) in &self.nip05_ids {
+ let Some(dir_key) = safe_path_segment(name) else {
+ warn!(nip05 = %name, "Skipping unsafe job request nip05 path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsJobRequestEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/indexer/src/domain/indexer/models/job_result.rs b/indexer/src/domain/indexer/models/job_result.rs
@@ -0,0 +1,291 @@
+use crate::domain::indexer::key::{IndexerKey, JOB_RESULT_INDEX_DIRECTORY};
+use crate::utils::io::{fs_mkdir, safe_path_segment, write_json_if_changed};
+use crate::utils::nostr::public_key_to_npub;
+use crate::utils::strings::truncate_log;
+use crate::{
+ audit,
+ domain::{
+ events::job_result::ToRadrootsJobResultEventIndex,
+ indexer::{
+ kind::IndexerEventKind,
+ models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes},
+ },
+ resolvers::profile::ProfileResolver,
+ },
+ relay::event::RelayIndexerEvent,
+ Settings,
+};
+use radroots_events::job_result::{
+ RadrootsJobResultEventIndex, RadrootsJobResultEventMetadata,
+};
+use radroots_events::kinds::KIND_JOB_RESULT_MIN;
+use std::{collections::BTreeMap, path::PathBuf};
+use tracing::{instrument, warn};
+
+#[derive(Debug)]
+pub struct EventJobResultIndexes {
+ events: Vec<RadrootsJobResultEventIndex>,
+ events_id: BTreeMap<String, usize>,
+ root_ids: BTreeMap<String, Vec<String>>,
+ author_ids: BTreeMap<String, Vec<String>>,
+ npub_ids: BTreeMap<String, Vec<String>>,
+ nip05_ids: BTreeMap<String, Vec<String>>,
+}
+
+impl EventJobResultIndexes {
+ pub fn build_with_profiles(
+ raw_events: &[RelayIndexerEvent],
+ profiles: &ProfileResolver,
+ ) -> Result<Self, NostrEventsStaticError> {
+ let mut events = Vec::with_capacity(raw_events.len());
+ let mut events_id: BTreeMap<String, usize> = BTreeMap::new();
+ let mut root_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+
+ for raw in raw_events {
+ match raw.to_radroots_job_result_event() {
+ Ok(evt) => {
+ audit::log_indexer_event(&raw);
+ let id = evt.metadata.id.clone();
+ let author_hex = evt.metadata.author.to_lowercase();
+
+ let npub = public_key_to_npub(&author_hex)
+ .map(|mut s| {
+ s.make_ascii_lowercase();
+ s
+ })
+ .ok();
+ let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned);
+
+ let root = evt.metadata.job_result.request_event.id.to_lowercase();
+
+ events.push(evt);
+ let idx = events.len() - 1;
+ events_id.insert(id.clone(), idx);
+
+ root_ids.entry(root).or_default().push(id.clone());
+ author_ids.entry(author_hex).or_default().push(id.clone());
+ if let Some(n) = npub {
+ npub_ids.entry(n).or_default().push(id.clone());
+ }
+ if let Some(n05) = author_nip05 {
+ nip05_ids.entry(n05).or_default().push(id.clone());
+ }
+ }
+ Err(err) => {
+ warn!(
+ kind = raw.kind.as_u64(),
+ id = %raw.id,
+ author = %raw.author,
+ content = %truncate_log(&raw.content, 1000),
+ tags = ?raw.tags,
+ error = %err,
+ "Skipping malformed job result event"
+ );
+ }
+ }
+ }
+
+ let sort_ids = |ids: &mut Vec<String>,
+ map: &BTreeMap<String, usize>,
+ events: &[RadrootsJobResultEventIndex]| {
+ ids.sort_unstable_by(|a, b| {
+ let pa = map
+ .get(a)
+ .map(|idx| events[*idx].metadata.published_at)
+ .unwrap_or_default();
+ let pb = map
+ .get(b)
+ .map(|idx| events[*idx].metadata.published_at)
+ .unwrap_or_default();
+ pb.cmp(&pa).then(a.cmp(b))
+ });
+ };
+
+ for ids in root_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in author_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in npub_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in nip05_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+
+ Ok(Self {
+ events,
+ events_id,
+ root_ids,
+ author_ids,
+ npub_ids,
+ nip05_ids,
+ })
+ }
+}
+
+impl EventIndexes for EventJobResultIndexes {
+ type Event = RelayIndexerEvent;
+
+ fn subdirs() -> &'static [IndexerKey] {
+ &JOB_RESULT_INDEX_DIRECTORY
+ }
+
+ #[instrument(skip(raw_events), fields(event_count = raw_events.len()))]
+ fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> {
+ let empty = ProfileResolver::default();
+ Self::build_with_profiles(raw_events, &empty)
+ }
+}
+
+impl WriteEventIndexes for EventJobResultIndexes {
+ fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> {
+ let base: PathBuf =
+ IndexerEventKind::JobResult(KIND_JOB_RESULT_MIN).base_path(&settings.indexer.data_dir)?;
+ fs_mkdir(&[&base])?;
+
+ {
+ let idxs_root = base.join("events.json");
+ let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect();
+ write_json_if_changed(&idxs_root, &ids, updated)?;
+ }
+
+ {
+ let sub = base.join("id");
+ fs_mkdir(&[&sub])?;
+ let keys: Vec<String> = self
+ .events_id
+ .keys()
+ .filter_map(|key| safe_path_segment(&key.to_lowercase()))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &keys, updated)?;
+
+ for (id, idx) in &self.events_id {
+ let id_lower = id.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&id_lower) else {
+ warn!(id = %id, "Skipping unsafe job result id path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ let evt = &self.events[*idx];
+ fs_mkdir(&[&dir])?;
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &evt.metadata, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::RootId.as_str());
+ fs_mkdir(&[&sub])?;
+ let roots: Vec<String> = self
+ .root_ids
+ .keys()
+ .filter_map(|root| safe_path_segment(root))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &roots, updated)?;
+
+ for (root, ids) in &self.root_ids {
+ let Some(dir_key) = safe_path_segment(root) else {
+ warn!(root = %root, "Skipping unsafe job result root path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsJobResultEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Author.as_str());
+ fs_mkdir(&[&sub])?;
+ let authors: Vec<String> = self
+ .author_ids
+ .keys()
+ .filter_map(|author| safe_path_segment(author))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &authors, updated)?;
+
+ for (author, ids) in &self.author_ids {
+ let Some(dir_key) = safe_path_segment(author) else {
+ warn!(author = %author, "Skipping unsafe job result author path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsJobResultEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Npub.as_str());
+ fs_mkdir(&[&sub])?;
+ let npubs: Vec<String> = self
+ .npub_ids
+ .keys()
+ .filter_map(|npub| safe_path_segment(npub))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &npubs, updated)?;
+
+ for (npub, ids) in &self.npub_ids {
+ let Some(dir_key) = safe_path_segment(npub) else {
+ warn!(npub = %npub, "Skipping unsafe job result npub path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsJobResultEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Nip05.as_str());
+ fs_mkdir(&[&sub])?;
+ let names: Vec<String> = self
+ .nip05_ids
+ .keys()
+ .filter_map(|name| safe_path_segment(name))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &names, updated)?;
+
+ for (name, ids) in &self.nip05_ids {
+ let Some(dir_key) = safe_path_segment(name) else {
+ warn!(nip05 = %name, "Skipping unsafe job result nip05 path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsJobResultEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/indexer/src/domain/indexer/models/listing.rs b/indexer/src/domain/indexer/models/listing.rs
@@ -1,11 +1,10 @@
-use crate::utils::crypto::compute_hash;
-use crate::utils::io::fs_mkdir;
-use crate::utils::io::{write_hash, write_json};
+use anyhow::anyhow;
+use crate::utils::io::{fs_mkdir, safe_path_segment, write_json_if_changed};
use crate::utils::nostr::public_key_to_npub;
use crate::utils::strings::truncate_log;
-use radroots_events::listing::models::{RadrootsListingEventIndex, RadrootsListingEventMetadata};
+use radroots_events::listing::{RadrootsListingEventIndex, RadrootsListingEventMetadata};
use radroots_events_indexed::{RadrootsEventsIndexedManifest, RadrootsEventsIndexedShardMetadata};
-use std::{collections::BTreeMap, fs, path::PathBuf};
+use std::{collections::BTreeMap, path::PathBuf};
use tracing::{instrument, warn};
use crate::{
@@ -23,32 +22,10 @@ use crate::{
Settings,
};
-macro_rules! write_if_stale {
- ($path:expr, $data:expr, $updated:expr) => {{
- let hash = compute_hash(&$data)?;
- let hash_path = $path.with_extension("sha256.txt");
-
- let needs_write = if $path.exists() && hash_path.exists() {
- let stored = fs::read_to_string(&hash_path)?;
- stored.trim() != hash
- } else {
- true
- };
-
- if needs_write {
- write_json(&$path, &$data)?;
- write_hash(&$path, &hash)?;
- $updated.push($path.clone());
- }
-
- hash
- }};
-}
-
#[derive(Debug)]
pub struct EventListingIndexes {
events: Vec<RadrootsListingEventIndex>,
- events_id: BTreeMap<String, RadrootsListingEventIndex>,
+ events_id: BTreeMap<String, usize>,
country_ids: BTreeMap<String, Vec<String>>,
author_ids: BTreeMap<String, Vec<String>>,
npub_ids: BTreeMap<String, Vec<String>>,
@@ -61,14 +38,14 @@ impl EventListingIndexes {
profiles: &ProfileResolver,
) -> Result<Self, NostrEventsStaticError> {
let mut events: Vec<RadrootsListingEventIndex> = Vec::with_capacity(raw_events.len());
- let mut events_id: BTreeMap<String, RadrootsListingEventIndex> = BTreeMap::new();
+ let mut events_id: BTreeMap<String, usize> = BTreeMap::new();
let mut country_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
for raw in raw_events {
- match raw.clone().to_radroots_listing_event() {
+ match raw.to_radroots_listing_event() {
Ok(evt) => {
audit::log_listing_event(&evt);
@@ -76,7 +53,10 @@ impl EventListingIndexes {
let author_hex = evt.metadata.author.to_lowercase();
let npub = public_key_to_npub(&author_hex)
- .map(|s| s.to_lowercase())
+ .map(|mut s| {
+ s.make_ascii_lowercase();
+ s
+ })
.ok();
let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned);
@@ -88,8 +68,9 @@ impl EventListingIndexes {
.and_then(|loc| loc.country.as_ref())
.map(|c| c.to_lowercase());
- events_id.insert(id.clone(), evt.clone());
- events.push(evt.clone());
+ events.push(evt);
+ let idx = events.len() - 1;
+ events_id.insert(id.clone(), idx);
if let Some(country) = country_opt {
country_ids.entry(country).or_default().push(id.clone());
@@ -101,7 +82,6 @@ impl EventListingIndexes {
npub_ids.entry(n).or_default().push(id.clone());
}
if let Some(n05) = author_nip05 {
- let n05 = n05.to_lowercase();
nip05_ids.entry(n05).or_default().push(id.clone());
}
}
@@ -120,38 +100,39 @@ impl EventListingIndexes {
}
let sort_ids = |ids: &mut Vec<String>,
- map: &BTreeMap<String, RadrootsListingEventIndex>| {
+ map: &BTreeMap<String, usize>,
+ events: &[RadrootsListingEventIndex]| {
ids.sort_unstable_by(|a, b| {
let pa = map
.get(a)
- .map(|e| e.metadata.published_at)
+ .map(|idx| events[*idx].metadata.published_at)
.unwrap_or_default();
let pb = map
.get(b)
- .map(|e| e.metadata.published_at)
+ .map(|idx| events[*idx].metadata.published_at)
.unwrap_or_default();
pb.cmp(&pa).then(a.cmp(b))
});
};
for ids in country_ids.values_mut() {
- sort_ids(ids, &events_id);
+ sort_ids(ids, &events_id, &events);
}
for ids in author_ids.values_mut() {
- sort_ids(ids, &events_id);
+ sort_ids(ids, &events_id, &events);
}
for ids in npub_ids.values_mut() {
- sort_ids(ids, &events_id);
+ sort_ids(ids, &events_id, &events);
}
for ids in nip05_ids.values_mut() {
ids.sort_unstable_by(|a, b| {
let pa = events_id
.get(a)
- .map(|e| e.metadata.published_at)
+ .map(|idx| events[*idx].metadata.published_at)
.unwrap_or_default();
let pb = events_id
.get(b)
- .map(|e| e.metadata.published_at)
+ .map(|idx| events[*idx].metadata.published_at)
.unwrap_or_default();
pb.cmp(&pa).then(a.cmp(b))
});
@@ -188,6 +169,9 @@ impl EventListingIndexes {
}
fn shard_vec<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> {
+ if items.is_empty() {
+ return Vec::new();
+ }
if size == 0 {
return vec![items.to_vec()];
}
@@ -200,6 +184,23 @@ impl EventListingIndexes {
}
out
}
+
+ fn manifest_shard_size(configured: usize, len: usize) -> usize {
+ if configured == 0 {
+ len
+ } else {
+ configured
+ }
+ }
+
+ fn effective_shard_size(configured: usize, len: usize) -> usize {
+ let size = Self::manifest_shard_size(configured, len);
+ size.max(1)
+ }
+
+ fn usize_to_u32(value: usize, label: &str) -> anyhow::Result<u32> {
+ u32::try_from(value).map_err(|_| anyhow!("{label} too large for u32"))
+ }
}
impl WriteEventIndexes for EventListingIndexes {
@@ -210,44 +211,66 @@ impl WriteEventIndexes for EventListingIndexes {
{
let idxs_root = base.join("events.json");
let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect();
- write_if_stale!(idxs_root, ids, updated);
+ write_json_if_changed(&idxs_root, &ids, updated)?;
}
{
let sub = base.join("id");
fs_mkdir(&[&sub])?;
- let keys: Vec<String> = self.events_id.keys().cloned().collect();
- write_if_stale!(sub.join("indexes.json"), keys, updated);
-
- for (id, evt) in &self.events_id {
- let dir = sub.join(id.to_lowercase());
+ let keys: Vec<String> = self
+ .events_id
+ .keys()
+ .filter_map(|key| safe_path_segment(&key.to_lowercase()))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &keys, updated)?;
+
+ for (id, idx) in &self.events_id {
+ let id_lower = id.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&id_lower) else {
+ warn!(id = %id, "Skipping unsafe listing id path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ let evt = &self.events[*idx];
fs_mkdir(&[&dir])?;
- write_if_stale!(dir.join("event.json"), evt.event.clone(), updated);
- write_if_stale!(dir.join("data.json"), evt.metadata.clone(), updated);
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("data.json"), &evt.metadata, updated)?;
}
}
{
let sub_country = base.join(crate::domain::indexer::key::IndexerKey::Country.as_str());
fs_mkdir(&[&sub_country])?;
- let country_codes: Vec<String> = self.country_ids.keys().cloned().collect();
- write_if_stale!(sub_country.join("indexes.json"), country_codes, updated);
+ let country_codes: Vec<String> = self
+ .country_ids
+ .keys()
+ .filter_map(|cc| safe_path_segment(cc))
+ .collect();
+ write_json_if_changed(&sub_country.join("indexes.json"), &country_codes, updated)?;
for (cc, ids) in &self.country_ids {
- let cc_dir = sub_country.join(cc);
+ let Some(dir_key) = safe_path_segment(cc) else {
+ warn!(country = %cc, "Skipping unsafe country path segment");
+ continue;
+ };
+ let cc_dir = sub_country.join(dir_key);
let shards_dir = cc_dir.join("shards");
fs_mkdir(&[&cc_dir])?;
fs_mkdir(&[&shards_dir])?;
- let mut data_items: Vec<RadrootsListingEventMetadata> =
+ let mut data_items: Vec<&RadrootsListingEventMetadata> =
Vec::with_capacity(ids.len());
for id in ids {
- if let Some(evt) = self.events_id.get(id) {
- data_items.push(evt.metadata.clone());
+ if let Some(idx) = self.events_id.get(id) {
+ data_items.push(&self.events[*idx].metadata);
}
}
let shard_size = settings.listings.country_shard_size;
+ let manifest_shard_size =
+ Self::manifest_shard_size(shard_size, data_items.len());
+ let effective_shard_size =
+ Self::effective_shard_size(shard_size, data_items.len());
let shards = Self::shard_vec(&data_items, shard_size);
@@ -260,8 +283,8 @@ impl WriteEventIndexes for EventListingIndexes {
let mut manifest = RadrootsEventsIndexedManifest {
country: cc.clone(),
- total: u32::try_from(data_items.len()).expect("too many data items for u32"),
- shard_size: u32::try_from(shard_size).expect("shard_size too large for u32"),
+ total: Self::usize_to_u32(data_items.len(), "data items")?,
+ shard_size: Self::usize_to_u32(manifest_shard_size, "shard_size")?,
first_published_at: country_first_pub,
last_published_at: country_last_pub,
shards: Vec::with_capacity(shards.len()),
@@ -274,11 +297,12 @@ impl WriteEventIndexes for EventListingIndexes {
fs_mkdir(&[&parent])?;
}
- let sha = write_if_stale!(file_abs, chunk, updated);
+ let sha = write_json_if_changed(&file_abs, &chunk, updated)?;
let (first_id, first_pub, last_id, last_pub) = if let (Some(f), Some(l)) = (
- data_items.get(ix * shard_size),
- data_items.get(((ix + 1) * shard_size).saturating_sub(1)),
+ data_items.get(ix * effective_shard_size),
+ data_items
+ .get(((ix + 1) * effective_shard_size).saturating_sub(1)),
) {
(f.id.clone(), f.published_at, l.id.clone(), l.published_at)
} else {
@@ -295,7 +319,7 @@ impl WriteEventIndexes for EventListingIndexes {
manifest.shards.push(RadrootsEventsIndexedShardMetadata {
file: file_rel,
- count: u32::try_from(chunk.len()).expect("chunk length too large for u32"),
+ count: Self::usize_to_u32(chunk.len(), "chunk length")?,
first_id,
last_id,
first_published_at: first_pub,
@@ -304,30 +328,42 @@ impl WriteEventIndexes for EventListingIndexes {
});
}
- write_if_stale!(cc_dir.join("manifest.json"), manifest, updated);
+ write_json_if_changed(&cc_dir.join("manifest.json"), &manifest, updated)?;
}
}
{
let sub_author = base.join(crate::domain::indexer::key::IndexerKey::Author.as_str());
fs_mkdir(&[&sub_author])?;
- let authors: Vec<String> = self.author_ids.keys().cloned().collect();
- write_if_stale!(sub_author.join("indexes.json"), authors, updated);
+ let authors: Vec<String> = self
+ .author_ids
+ .keys()
+ .filter_map(|author| safe_path_segment(author))
+ .collect();
+ write_json_if_changed(&sub_author.join("indexes.json"), &authors, updated)?;
for (author, ids) in &self.author_ids {
- let dir = sub_author.join(author);
+ let Some(dir_key) = safe_path_segment(author) else {
+ warn!(author = %author, "Skipping unsafe author path segment");
+ continue;
+ };
+ let dir = sub_author.join(dir_key);
let shards_dir = dir.join("shards");
fs_mkdir(&[&dir, &shards_dir])?;
- let mut data_items: Vec<RadrootsListingEventMetadata> =
+ let mut data_items: Vec<&RadrootsListingEventMetadata> =
Vec::with_capacity(ids.len());
for id in ids {
- if let Some(evt) = self.events_id.get(id) {
- data_items.push(evt.metadata.clone());
+ if let Some(idx) = self.events_id.get(id) {
+ data_items.push(&self.events[*idx].metadata);
}
}
let shard_size = settings.listings.profile_shard_size;
+ let manifest_shard_size =
+ Self::manifest_shard_size(shard_size, data_items.len());
+ let effective_shard_size =
+ Self::effective_shard_size(shard_size, data_items.len());
let shards = Self::shard_vec(&data_items, shard_size);
let (first_pub, last_pub) =
@@ -339,8 +375,8 @@ impl WriteEventIndexes for EventListingIndexes {
let mut manifest = RadrootsEventsIndexedManifest {
country: author.clone(),
- total: u32::try_from(data_items.len()).expect("too many data items for u32"),
- shard_size: u32::try_from(shard_size).expect("shard_size too large for u32"),
+ total: Self::usize_to_u32(data_items.len(), "data items")?,
+ shard_size: Self::usize_to_u32(manifest_shard_size, "shard_size")?,
first_published_at: first_pub,
last_published_at: last_pub,
shards: Vec::with_capacity(shards.len()),
@@ -353,23 +389,24 @@ impl WriteEventIndexes for EventListingIndexes {
fs_mkdir(&[&parent])?;
}
- let sha = write_if_stale!(file_abs, chunk, updated);
+ let sha = write_json_if_changed(&file_abs, &chunk, updated)?;
let (first_id, first_published_at, last_id, last_published_at) =
if let (Some(f), Some(l)) = (
- data_items.get(ix * shard_size),
- data_items.get(((ix + 1) * shard_size).saturating_sub(1)),
+ data_items.get(ix * effective_shard_size),
+ data_items
+ .get(((ix + 1) * effective_shard_size).saturating_sub(1)),
) {
(f.id.clone(), f.published_at, l.id.clone(), l.published_at)
} else {
let fp = data_items
- .get(ix * shard_size)
+ .get(ix * effective_shard_size)
.map(|x| (x.id.clone(), x.published_at))
.or_else(|| chunk.first().map(|x| (x.id.clone(), x.published_at)))
.unwrap_or_default();
let lp = data_items
- .get(((ix + 1) * shard_size).saturating_sub(1))
+ .get(((ix + 1) * effective_shard_size).saturating_sub(1))
.map(|x| (x.id.clone(), x.published_at))
.or_else(|| chunk.last().map(|x| (x.id.clone(), x.published_at)))
.unwrap_or_default();
@@ -379,11 +416,7 @@ impl WriteEventIndexes for EventListingIndexes {
manifest.shards.push(RadrootsEventsIndexedShardMetadata {
file: file_rel,
- count: u32::try_from(std::cmp::min(
- shard_size,
- data_items.len().saturating_sub(ix * shard_size),
- ))
- .expect("chunk length too large for u32"),
+ count: Self::usize_to_u32(chunk.len(), "chunk length")?,
first_id,
last_id,
first_published_at,
@@ -392,30 +425,42 @@ impl WriteEventIndexes for EventListingIndexes {
});
}
- write_if_stale!(dir.join("manifest.json"), manifest, updated);
+ write_json_if_changed(&dir.join("manifest.json"), &manifest, updated)?;
}
}
{
let sub_npub = base.join(crate::domain::indexer::key::IndexerKey::Npub.as_str());
fs_mkdir(&[&sub_npub])?;
- let npubs: Vec<String> = self.npub_ids.keys().cloned().collect();
- write_if_stale!(sub_npub.join("indexes.json"), npubs, updated);
+ let npubs: Vec<String> = self
+ .npub_ids
+ .keys()
+ .filter_map(|npub| safe_path_segment(npub))
+ .collect();
+ write_json_if_changed(&sub_npub.join("indexes.json"), &npubs, updated)?;
for (npub, ids) in &self.npub_ids {
- let dir = sub_npub.join(npub);
+ let Some(dir_key) = safe_path_segment(npub) else {
+ warn!(npub = %npub, "Skipping unsafe npub path segment");
+ continue;
+ };
+ let dir = sub_npub.join(dir_key);
let shards_dir = dir.join("shards");
fs_mkdir(&[&dir, &shards_dir])?;
- let mut data_items: Vec<RadrootsListingEventMetadata> =
+ let mut data_items: Vec<&RadrootsListingEventMetadata> =
Vec::with_capacity(ids.len());
for id in ids {
- if let Some(evt) = self.events_id.get(id) {
- data_items.push(evt.metadata.clone());
+ if let Some(idx) = self.events_id.get(id) {
+ data_items.push(&self.events[*idx].metadata);
}
}
let shard_size = settings.listings.profile_shard_size;
+ let manifest_shard_size =
+ Self::manifest_shard_size(shard_size, data_items.len());
+ let effective_shard_size =
+ Self::effective_shard_size(shard_size, data_items.len());
let shards = Self::shard_vec(&data_items, shard_size);
let (first_pub, last_pub) =
@@ -427,8 +472,8 @@ impl WriteEventIndexes for EventListingIndexes {
let mut manifest = RadrootsEventsIndexedManifest {
country: npub.clone(),
- total: u32::try_from(data_items.len()).expect("too many data items for u32"),
- shard_size: u32::try_from(shard_size).expect("shard_size too large for u32"),
+ total: Self::usize_to_u32(data_items.len(), "data items")?,
+ shard_size: Self::usize_to_u32(manifest_shard_size, "shard_size")?,
first_published_at: first_pub,
last_published_at: last_pub,
shards: Vec::with_capacity(shards.len()),
@@ -441,23 +486,24 @@ impl WriteEventIndexes for EventListingIndexes {
fs_mkdir(&[&parent])?;
}
- let sha = write_if_stale!(file_abs, chunk, updated);
+ let sha = write_json_if_changed(&file_abs, &chunk, updated)?;
let (first_id, first_published_at, last_id, last_published_at) =
if let (Some(f), Some(l)) = (
- data_items.get(ix * shard_size),
- data_items.get(((ix + 1) * shard_size).saturating_sub(1)),
+ data_items.get(ix * effective_shard_size),
+ data_items
+ .get(((ix + 1) * effective_shard_size).saturating_sub(1)),
) {
(f.id.clone(), f.published_at, l.id.clone(), l.published_at)
} else {
let fp = data_items
- .get(ix * shard_size)
+ .get(ix * effective_shard_size)
.map(|x| (x.id.clone(), x.published_at))
.or_else(|| chunk.first().map(|x| (x.id.clone(), x.published_at)))
.unwrap_or_default();
let lp = data_items
- .get(((ix + 1) * shard_size).saturating_sub(1))
+ .get(((ix + 1) * effective_shard_size).saturating_sub(1))
.map(|x| (x.id.clone(), x.published_at))
.or_else(|| chunk.last().map(|x| (x.id.clone(), x.published_at)))
.unwrap_or_default();
@@ -467,11 +513,7 @@ impl WriteEventIndexes for EventListingIndexes {
manifest.shards.push(RadrootsEventsIndexedShardMetadata {
file: file_rel,
- count: u32::try_from(std::cmp::min(
- shard_size,
- data_items.len().saturating_sub(ix * shard_size),
- ))
- .expect("chunk length too large for u32"),
+ count: Self::usize_to_u32(chunk.len(), "chunk length")?,
first_id,
last_id,
first_published_at,
@@ -480,28 +522,41 @@ impl WriteEventIndexes for EventListingIndexes {
});
}
- write_if_stale!(dir.join("manifest.json"), manifest, updated);
+ write_json_if_changed(&dir.join("manifest.json"), &manifest, updated)?;
}
{
let sub_nip05 = base.join(crate::domain::indexer::key::IndexerKey::Nip05.as_str());
fs_mkdir(&[&sub_nip05])?;
- let names: Vec<String> = self.nip05_ids.keys().cloned().collect();
- write_if_stale!(sub_nip05.join("indexes.json"), names, updated);
+ let names: Vec<String> = self
+ .nip05_ids
+ .keys()
+ .filter_map(|name| safe_path_segment(name))
+ .collect();
+ write_json_if_changed(&sub_nip05.join("indexes.json"), &names, updated)?;
for (name, ids) in &self.nip05_ids {
- let dir = sub_nip05.join(name);
+ let Some(dir_key) = safe_path_segment(name) else {
+ warn!(nip05 = %name, "Skipping unsafe nip05 path segment");
+ continue;
+ };
+ let dir = sub_nip05.join(dir_key);
let shards_dir = dir.join("shards");
fs_mkdir(&[&dir, &shards_dir])?;
- let mut data_items = Vec::with_capacity(ids.len());
+ let mut data_items: Vec<&RadrootsListingEventMetadata> =
+ Vec::with_capacity(ids.len());
for id in ids {
- if let Some(evt) = self.events_id.get(id) {
- data_items.push(evt.metadata.clone());
+ if let Some(idx) = self.events_id.get(id) {
+ data_items.push(&self.events[*idx].metadata);
}
}
let shard_size = settings.listings.profile_shard_size;
+ let manifest_shard_size =
+ Self::manifest_shard_size(shard_size, data_items.len());
+ let effective_shard_size =
+ Self::effective_shard_size(shard_size, data_items.len());
let shards = Self::shard_vec(&data_items, shard_size);
let (first_pub, last_pub) =
@@ -513,8 +568,8 @@ impl WriteEventIndexes for EventListingIndexes {
let mut manifest = RadrootsEventsIndexedManifest {
country: name.clone(),
- total: u32::try_from(data_items.len()).expect("u32 overflow"),
- shard_size: u32::try_from(shard_size).expect("u32 overflow"),
+ total: Self::usize_to_u32(data_items.len(), "data items")?,
+ shard_size: Self::usize_to_u32(manifest_shard_size, "shard_size")?,
first_published_at: first_pub,
last_published_at: last_pub,
shards: Vec::with_capacity(shards.len()),
@@ -527,11 +582,12 @@ impl WriteEventIndexes for EventListingIndexes {
fs_mkdir(&[&parent])?;
}
- let sha = write_if_stale!(file_abs, chunk, updated);
+ let sha = write_json_if_changed(&file_abs, &chunk, updated)?;
let (first_id, first_pub, last_id, last_pub) = if let (Some(f), Some(l)) = (
- data_items.get(ix * shard_size),
- data_items.get(((ix + 1) * shard_size).saturating_sub(1)),
+ data_items.get(ix * effective_shard_size),
+ data_items
+ .get(((ix + 1) * effective_shard_size).saturating_sub(1)),
) {
(f.id.clone(), f.published_at, l.id.clone(), l.published_at)
} else {
@@ -548,11 +604,7 @@ impl WriteEventIndexes for EventListingIndexes {
manifest.shards.push(RadrootsEventsIndexedShardMetadata {
file: file_rel,
- count: u32::try_from(std::cmp::min(
- shard_size,
- data_items.len().saturating_sub(ix * shard_size),
- ))
- .expect("u32 overflow"),
+ count: Self::usize_to_u32(chunk.len(), "chunk length")?,
first_id,
last_id,
first_published_at: first_pub,
@@ -561,7 +613,7 @@ impl WriteEventIndexes for EventListingIndexes {
});
}
- write_if_stale!(dir.join("manifest.json"), manifest, updated);
+ write_json_if_changed(&dir.join("manifest.json"), &manifest, updated)?;
}
}
}
@@ -569,3 +621,23 @@ impl WriteEventIndexes for EventListingIndexes {
Ok(())
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::EventListingIndexes;
+
+ #[test]
+ fn shard_vec_empty_returns_empty() {
+ let items: Vec<u32> = Vec::new();
+ let shards = EventListingIndexes::shard_vec(&items, 0);
+ assert!(shards.is_empty());
+ }
+
+ #[test]
+ fn shard_vec_zero_size_groups_all() {
+ let items = vec![1u32, 2, 3];
+ let shards = EventListingIndexes::shard_vec(&items, 0);
+ assert_eq!(shards.len(), 1);
+ assert_eq!(shards[0], items);
+ }
+}
diff --git a/indexer/src/domain/indexer/models/mod.rs b/indexer/src/domain/indexer/models/mod.rs
@@ -1,10 +1,20 @@
pub mod comment;
+pub mod follow;
+pub mod job_feedback;
+pub mod job_request;
+pub mod job_result;
pub mod listing;
+pub mod post;
pub mod profile;
pub mod reaction;
pub use comment::EventCommentIndexes;
+pub use follow::EventFollowIndexes;
+pub use job_feedback::EventJobFeedbackIndexes;
+pub use job_request::EventJobRequestIndexes;
+pub use job_result::EventJobResultIndexes;
pub use listing::EventListingIndexes;
+pub use post::EventPostIndexes;
pub use profile::EventProfileIndexes;
pub use reaction::EventReactionIndexes;
diff --git a/indexer/src/domain/indexer/models/post.rs b/indexer/src/domain/indexer/models/post.rs
@@ -0,0 +1,251 @@
+use crate::domain::indexer::key::{IndexerKey, POST_INDEX_DIRECTORY};
+use crate::utils::io::{fs_mkdir, safe_path_segment, write_json_if_changed};
+use crate::utils::nostr::public_key_to_npub;
+use crate::utils::strings::truncate_log;
+use crate::{
+ audit,
+ domain::{
+ events::post::ToRadrootsPostEventIndex,
+ indexer::{
+ kind::IndexerEventKind,
+ models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes},
+ },
+ resolvers::profile::ProfileResolver,
+ },
+ relay::event::RelayIndexerEvent,
+ Settings,
+};
+use radroots_events::post::{RadrootsPostEventIndex, RadrootsPostEventMetadata};
+use std::{collections::BTreeMap, path::PathBuf};
+use tracing::{instrument, warn};
+
+#[derive(Debug)]
+pub struct EventPostIndexes {
+ events: Vec<RadrootsPostEventIndex>,
+ events_id: BTreeMap<String, usize>,
+ author_ids: BTreeMap<String, Vec<String>>,
+ npub_ids: BTreeMap<String, Vec<String>>,
+ nip05_ids: BTreeMap<String, Vec<String>>,
+}
+
+impl EventPostIndexes {
+ pub fn build_with_profiles(
+ raw_events: &[RelayIndexerEvent],
+ profiles: &ProfileResolver,
+ ) -> Result<Self, NostrEventsStaticError> {
+ let mut events = Vec::with_capacity(raw_events.len());
+ let mut events_id: BTreeMap<String, usize> = BTreeMap::new();
+ let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+ let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
+
+ for raw in raw_events {
+ match raw.to_radroots_post_event() {
+ Ok(evt) => {
+ audit::log_indexer_event(&raw);
+ let id = evt.metadata.id.clone();
+ let author_hex = evt.metadata.author.to_lowercase();
+
+ let npub = public_key_to_npub(&author_hex)
+ .map(|mut s| {
+ s.make_ascii_lowercase();
+ s
+ })
+ .ok();
+ let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned);
+
+ events.push(evt);
+ let idx = events.len() - 1;
+ events_id.insert(id.clone(), idx);
+
+ author_ids.entry(author_hex).or_default().push(id.clone());
+ if let Some(n) = npub {
+ npub_ids.entry(n).or_default().push(id.clone());
+ }
+ if let Some(n05) = author_nip05 {
+ nip05_ids.entry(n05).or_default().push(id.clone());
+ }
+ }
+ Err(err) => {
+ warn!(
+ kind = raw.kind.as_u64(),
+ id = %raw.id,
+ author = %raw.author,
+ content = %truncate_log(&raw.content, 1000),
+ tags = ?raw.tags,
+ error = %err,
+ "Skipping malformed post event"
+ );
+ }
+ }
+ }
+
+ let sort_ids = |ids: &mut Vec<String>,
+ map: &BTreeMap<String, usize>,
+ events: &[RadrootsPostEventIndex]| {
+ ids.sort_unstable_by(|a, b| {
+ let pa = map
+ .get(a)
+ .map(|idx| events[*idx].metadata.published_at)
+ .unwrap_or_default();
+ let pb = map
+ .get(b)
+ .map(|idx| events[*idx].metadata.published_at)
+ .unwrap_or_default();
+ pb.cmp(&pa).then(a.cmp(b))
+ });
+ };
+
+ for ids in author_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in npub_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+ for ids in nip05_ids.values_mut() {
+ sort_ids(ids, &events_id, &events);
+ }
+
+ Ok(Self {
+ events,
+ events_id,
+ author_ids,
+ npub_ids,
+ nip05_ids,
+ })
+ }
+}
+
+impl EventIndexes for EventPostIndexes {
+ type Event = RelayIndexerEvent;
+
+ fn subdirs() -> &'static [IndexerKey] {
+ &POST_INDEX_DIRECTORY
+ }
+
+ #[instrument(skip(raw_events), fields(event_count = raw_events.len()))]
+ fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> {
+ let empty = ProfileResolver::default();
+ Self::build_with_profiles(raw_events, &empty)
+ }
+}
+
+impl WriteEventIndexes for EventPostIndexes {
+ fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> {
+ let base: PathBuf = IndexerEventKind::Post.base_path(&settings.indexer.data_dir)?;
+ fs_mkdir(&[&base])?;
+
+ {
+ let idxs_root = base.join("events.json");
+ let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect();
+ write_json_if_changed(&idxs_root, &ids, updated)?;
+ }
+
+ {
+ let sub = base.join("id");
+ fs_mkdir(&[&sub])?;
+ let keys: Vec<String> = self
+ .events_id
+ .keys()
+ .filter_map(|key| safe_path_segment(&key.to_lowercase()))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &keys, updated)?;
+
+ for (id, idx) in &self.events_id {
+ let id_lower = id.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&id_lower) else {
+ warn!(id = %id, "Skipping unsafe post id path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ let evt = &self.events[*idx];
+ fs_mkdir(&[&dir])?;
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &evt.metadata, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Author.as_str());
+ fs_mkdir(&[&sub])?;
+ let authors: Vec<String> = self
+ .author_ids
+ .keys()
+ .filter_map(|author| safe_path_segment(author))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &authors, updated)?;
+
+ for (author, ids) in &self.author_ids {
+ let Some(dir_key) = safe_path_segment(author) else {
+ warn!(author = %author, "Skipping unsafe post author path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsPostEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Npub.as_str());
+ fs_mkdir(&[&sub])?;
+ let npubs: Vec<String> = self
+ .npub_ids
+ .keys()
+ .filter_map(|npub| safe_path_segment(npub))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &npubs, updated)?;
+
+ for (npub, ids) in &self.npub_ids {
+ let Some(dir_key) = safe_path_segment(npub) else {
+ warn!(npub = %npub, "Skipping unsafe post npub path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsPostEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ {
+ let sub = base.join(IndexerKey::Nip05.as_str());
+ fs_mkdir(&[&sub])?;
+ let names: Vec<String> = self
+ .nip05_ids
+ .keys()
+ .filter_map(|name| safe_path_segment(name))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &names, updated)?;
+
+ for (name, ids) in &self.nip05_ids {
+ let Some(dir_key) = safe_path_segment(name) else {
+ warn!(nip05 = %name, "Skipping unsafe post nip05 path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ fs_mkdir(&[&dir])?;
+ let metas: Vec<&RadrootsPostEventMetadata> = ids
+ .iter()
+ .filter_map(|id| self.events_id.get(id))
+ .map(|idx| &self.events[*idx].metadata)
+ .collect();
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/indexer/src/domain/indexer/models/profile.rs b/indexer/src/domain/indexer/models/profile.rs
@@ -1,8 +1,6 @@
use crate::domain::indexer::key::{IndexerKey, PROFILE_INDEX_DIRECTORY};
-use crate::utils::crypto::compute_hash;
-use crate::utils::io::fs_mkdir;
-use crate::utils::io::{write_hash, write_json};
-use crate::utils::nostr::public_key_to_npub;
+use crate::utils::io::{fs_mkdir, safe_path_segment, write_json_if_changed};
+use crate::utils::nostr::{normalize_nip05, public_key_to_npub};
use crate::utils::strings::truncate_log;
use crate::{
audit,
@@ -16,37 +14,17 @@ use crate::{
relay::event::RelayIndexerEvent,
Settings,
};
-use radroots_events::profile::models::RadrootsProfileEventIndex;
-use std::{collections::BTreeMap, fs, path::PathBuf};
+use radroots_events::profile::RadrootsProfileEventIndex;
+use std::{collections::BTreeMap, path::PathBuf};
use tracing::{instrument, warn};
-macro_rules! write_if_stale {
- ($path:expr, $data:expr, $updated:expr) => {{
- let hash = compute_hash(&$data)?;
- let hash_path = $path.with_extension("sha256.txt");
-
- let needs_write = if $path.exists() && hash_path.exists() {
- let stored = fs::read_to_string(&hash_path)?;
- stored.trim() != hash
- } else {
- true
- };
-
- if needs_write {
- write_json(&$path, &$data)?;
- write_hash(&$path, &hash)?;
- $updated.push($path.clone());
- }
- }};
-}
-
#[derive(Debug)]
pub struct EventProfileIndexes {
events: Vec<RadrootsProfileEventIndex>,
- events_id: BTreeMap<String, RadrootsProfileEventIndex>,
- events_author: BTreeMap<String, RadrootsProfileEventIndex>,
- events_nip05: BTreeMap<String, RadrootsProfileEventIndex>,
- events_npub: BTreeMap<String, RadrootsProfileEventIndex>,
+ events_id: BTreeMap<String, usize>,
+ events_author: BTreeMap<String, usize>,
+ events_nip05: BTreeMap<String, usize>,
+ events_npub: BTreeMap<String, usize>,
}
impl EventIndexes for EventProfileIndexes {
@@ -59,27 +37,75 @@ impl EventIndexes for EventProfileIndexes {
#[instrument(skip(raw_events), fields(event_count = raw_events.len()))]
fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> {
let mut events = Vec::with_capacity(raw_events.len());
- let mut events_id = BTreeMap::new();
- let mut events_author = BTreeMap::new();
- let mut events_nip05 = BTreeMap::new();
- let mut events_npub = BTreeMap::new();
+ let mut events_id: BTreeMap<String, usize> = BTreeMap::new();
+ let mut events_author: BTreeMap<String, usize> = BTreeMap::new();
+ let mut events_nip05: BTreeMap<String, usize> = BTreeMap::new();
+ let mut events_npub: BTreeMap<String, usize> = BTreeMap::new();
+
+ let should_replace = |existing_idx: usize,
+ candidate_idx: usize,
+ events: &[RadrootsProfileEventIndex]| {
+ let existing = &events[existing_idx];
+ let candidate = &events[candidate_idx];
+ let new_ts = candidate.metadata.published_at;
+ let old_ts = existing.metadata.published_at;
+ if new_ts > old_ts {
+ true
+ } else if new_ts < old_ts {
+ false
+ } else {
+ candidate.event.id < existing.event.id
+ }
+ };
for raw in raw_events {
- match raw.clone().to_radroots_profile_event() {
+ match raw.to_radroots_profile_event() {
Ok(evt) => {
audit::log_profile_event(&evt);
let id = evt.event.id.clone();
let author = evt.event.author.clone();
- events.push(evt.clone());
- events_id.insert(id.clone(), evt.clone());
- events_author.insert(author.clone(), evt.clone());
+ let npub_key = public_key_to_npub(&author).ok().map(|mut npub| {
+ npub.make_ascii_lowercase();
+ npub
+ });
+ let nip05_index_key = evt
+ .metadata
+ .profile
+ .nip05
+ .as_deref()
+ .map(normalize_nip05)
+ .map(|(_, _, index_key)| index_key);
+ events.push(evt);
+ let idx = events.len() - 1;
+ events_id.insert(id, idx);
+
+ let replace_author = events_author
+ .get(&author)
+ .map(|&existing_idx| should_replace(existing_idx, idx, &events))
+ .unwrap_or(true);
+ if replace_author {
+ events_author.insert(author, idx);
+ }
- if let Ok(npub) = public_key_to_npub(&author) {
- events_npub.insert(npub.to_lowercase(), evt.clone());
+ if let Some(key) = npub_key {
+ let replace_npub = events_npub
+ .get(&key)
+ .map(|&existing_idx| should_replace(existing_idx, idx, &events))
+ .unwrap_or(true);
+ if replace_npub {
+ events_npub.insert(key, idx);
+ }
}
- if let Some(nip05) = &evt.metadata.profile.nip05 {
- let normalized = nip05.replace("@radroots.market", "");
- events_nip05.insert(normalized, evt.clone());
+ if let Some(index_key) = nip05_index_key {
+ if !index_key.is_empty() {
+ let replace_nip05 = events_nip05
+ .get(&index_key)
+ .map(|&existing_idx| should_replace(existing_idx, idx, &events))
+ .unwrap_or(true);
+ if replace_nip05 {
+ events_nip05.insert(index_key, idx);
+ }
+ }
}
}
Err(err) => {
@@ -113,57 +139,93 @@ impl WriteEventIndexes for EventProfileIndexes {
let idxs_root = base.join("events.json");
let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect();
- write_if_stale!(idxs_root, ids, updated);
+ write_json_if_changed(&idxs_root, &ids, updated)?;
for &subdir in Self::subdirs().iter() {
let sub_base = base.join(subdir.as_str());
- fs_mkdir(&[sub_base.to_str().unwrap()])?;
+ fs_mkdir(&[&sub_base])?;
let keys_lower: Vec<String> = match subdir {
- IndexerKey::Id => self.events_id.keys().map(|k| k.to_lowercase()).collect(),
+ IndexerKey::Id => self
+ .events_id
+ .keys()
+ .filter_map(|k| safe_path_segment(&k.to_lowercase()))
+ .collect(),
IndexerKey::Author => self
.events_author
.keys()
- .map(|k| k.to_lowercase())
+ .filter_map(|k| safe_path_segment(&k.to_lowercase()))
+ .collect(),
+ IndexerKey::Nip05 => self
+ .events_nip05
+ .keys()
+ .filter_map(|k| safe_path_segment(&k.to_lowercase()))
+ .collect(),
+ IndexerKey::Npub => self
+ .events_npub
+ .keys()
+ .filter_map(|k| safe_path_segment(&k.to_lowercase()))
.collect(),
- IndexerKey::Nip05 => self.events_nip05.keys().map(|k| k.to_lowercase()).collect(),
- IndexerKey::Npub => self.events_npub.keys().map(|k| k.to_lowercase()).collect(),
_ => Vec::new(),
};
let idxs_subdir = sub_base.join("indexes.json");
- write_if_stale!(idxs_subdir, keys_lower, updated);
+ write_json_if_changed(&idxs_subdir, &keys_lower, updated)?;
match subdir {
IndexerKey::Id => {
- for (key, evt) in &self.events_id {
- let dir = sub_base.join(key.to_lowercase());
- fs_mkdir(&[dir.to_str().unwrap()])?;
- write_if_stale!(dir.join("event.json"), evt.event.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated);
+ for (key, idx) in &self.events_id {
+ let key_lower = key.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&key_lower) else {
+ warn!(key = %key, "Skipping unsafe profile id path segment");
+ continue;
+ };
+ let dir = sub_base.join(dir_key);
+ let evt = &self.events[*idx];
+ fs_mkdir(&[&dir])?;
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &evt.metadata, updated)?;
}
}
IndexerKey::Author => {
- for (key, evt) in &self.events_author {
- let dir = sub_base.join(key.to_lowercase());
- fs_mkdir(&[dir.to_str().unwrap()])?;
- write_if_stale!(dir.join("event.json"), evt.event.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated);
+ for (key, idx) in &self.events_author {
+ let key_lower = key.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&key_lower) else {
+ warn!(key = %key, "Skipping unsafe profile author path segment");
+ continue;
+ };
+ let dir = sub_base.join(dir_key);
+ let evt = &self.events[*idx];
+ fs_mkdir(&[&dir])?;
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &evt.metadata, updated)?;
}
}
IndexerKey::Nip05 => {
- for (key, evt) in &self.events_nip05 {
- let dir = sub_base.join(key.to_lowercase());
- fs_mkdir(&[dir.to_str().unwrap()])?;
- write_if_stale!(dir.join("event.json"), evt.event.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated);
+ for (key, idx) in &self.events_nip05 {
+ let key_lower = key.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&key_lower) else {
+ warn!(key = %key, "Skipping unsafe profile nip05 path segment");
+ continue;
+ };
+ let dir = sub_base.join(dir_key);
+ let evt = &self.events[*idx];
+ fs_mkdir(&[&dir])?;
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &evt.metadata, updated)?;
}
}
IndexerKey::Npub => {
- for (key, evt) in &self.events_npub {
- let dir = sub_base.join(key.to_lowercase());
- fs_mkdir(&[dir.to_str().unwrap()])?;
- write_if_stale!(dir.join("event.json"), evt.event.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated);
+ for (key, idx) in &self.events_npub {
+ let key_lower = key.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&key_lower) else {
+ warn!(key = %key, "Skipping unsafe profile npub path segment");
+ continue;
+ };
+ let dir = sub_base.join(dir_key);
+ let evt = &self.events[*idx];
+ fs_mkdir(&[&dir])?;
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &evt.metadata, updated)?;
}
}
_ => {}
@@ -173,3 +235,48 @@ impl WriteEventIndexes for EventProfileIndexes {
Ok(())
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::EventProfileIndexes;
+ use crate::domain::indexer::kind::IndexerEventKind;
+ use crate::relay::event::RelayIndexerEvent;
+
+ fn make_profile_event(id: &str, author: &str, created_at: u32, name: &str) -> RelayIndexerEvent {
+ RelayIndexerEvent {
+ id: id.to_string(),
+ author: author.to_string(),
+ created_at,
+ pubkey: author.to_string(),
+ kind: IndexerEventKind::Profile,
+ tags: Vec::new(),
+ content: format!(r#"{{"name":"{}"}}"#, name),
+ hash: id.to_string(),
+ sig: "sig".to_string(),
+ }
+ }
+
+ #[test]
+ fn profile_index_uses_latest_event() {
+ let author = "a".repeat(64);
+ let older = make_profile_event(&"b".repeat(64), &author, 10, "old");
+ let newer = make_profile_event(&"c".repeat(64), &author, 20, "new");
+
+ let indexes = EventProfileIndexes::build(&[older, newer]).expect("build");
+ let idx = *indexes.events_author.get(&author).expect("author index");
+ assert_eq!(indexes.events[idx].metadata.profile.name, "new");
+ }
+
+ #[test]
+ fn profile_index_tiebreaks_by_id() {
+ let author = "b".repeat(64);
+ let low_id = "0".repeat(64);
+ let high_id = "f".repeat(64);
+ let first = make_profile_event(&high_id, &author, 10, "high");
+ let second = make_profile_event(&low_id, &author, 10, "low");
+
+ let indexes = EventProfileIndexes::build(&[first, second]).expect("build");
+ let idx = *indexes.events_author.get(&author).expect("author index");
+ assert_eq!(indexes.events[idx].event.id, low_id);
+ }
+}
diff --git a/indexer/src/domain/indexer/models/reaction.rs b/indexer/src/domain/indexer/models/reaction.rs
@@ -1,7 +1,5 @@
use crate::domain::indexer::key::{IndexerKey, REACTION_INDEX_DIRECTORY};
-use crate::utils::crypto::compute_hash;
-use crate::utils::io::fs_mkdir;
-use crate::utils::io::{write_hash, write_json};
+use crate::utils::io::{fs_mkdir, safe_path_segment, write_json_if_changed};
use crate::utils::nostr::public_key_to_npub;
use crate::utils::strings::truncate_log;
use crate::{
@@ -17,36 +15,16 @@ use crate::{
relay::event::RelayIndexerEvent,
Settings,
};
-use radroots_events::reaction::models::{
+use radroots_events::reaction::{
RadrootsReactionEventIndex, RadrootsReactionEventMetadata,
};
-use std::{collections::BTreeMap, fs, path::PathBuf};
+use std::{collections::BTreeMap, path::PathBuf};
use tracing::{instrument, warn};
-macro_rules! write_if_stale {
- ($path:expr, $data:expr, $updated:expr) => {{
- let hash = compute_hash(&$data)?;
- let hash_path = $path.with_extension("sha256.txt");
-
- let needs_write = if $path.exists() && hash_path.exists() {
- let stored = fs::read_to_string(&hash_path)?;
- stored.trim() != hash
- } else {
- true
- };
-
- if needs_write {
- write_json(&$path, &$data)?;
- write_hash(&$path, &hash)?;
- $updated.push($path.clone());
- }
- }};
-}
-
#[derive(Debug)]
pub struct EventReactionIndexes {
events: Vec<RadrootsReactionEventIndex>,
- events_id: BTreeMap<String, RadrootsReactionEventIndex>,
+ events_id: BTreeMap<String, usize>,
root_ids: BTreeMap<String, Vec<String>>,
author_ids: BTreeMap<String, Vec<String>>,
npub_ids: BTreeMap<String, Vec<String>>,
@@ -59,14 +37,14 @@ impl EventReactionIndexes {
profiles: &ProfileResolver,
) -> Result<Self, NostrEventsStaticError> {
let mut events = Vec::with_capacity(raw_events.len());
- let mut events_id = BTreeMap::new();
+ let mut events_id: BTreeMap<String, usize> = BTreeMap::new();
let mut root_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new();
for raw in raw_events {
- match raw.clone().to_radroots_reaction_event() {
+ match raw.to_radroots_reaction_event() {
Ok(evt) => {
audit::log_indexer_event(&raw);
@@ -74,14 +52,18 @@ impl EventReactionIndexes {
let author_hex = evt.metadata.author.to_lowercase();
let npub = public_key_to_npub(&author_hex)
- .map(|s| s.to_lowercase())
+ .map(|mut s| {
+ s.make_ascii_lowercase();
+ s
+ })
.ok();
let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned);
let root = evt.metadata.reaction.root.id.to_lowercase();
- events_id.insert(id.clone(), evt.clone());
- events.push(evt.clone());
+ events.push(evt);
+ let idx = events.len() - 1;
+ events_id.insert(id.clone(), idx);
root_ids.entry(root).or_default().push(id.clone());
author_ids.entry(author_hex).or_default().push(id.clone());
@@ -89,10 +71,7 @@ impl EventReactionIndexes {
npub_ids.entry(n).or_default().push(id.clone());
}
if let Some(n05) = author_nip05 {
- nip05_ids
- .entry(n05.to_lowercase())
- .or_default()
- .push(id.clone());
+ nip05_ids.entry(n05).or_default().push(id.clone());
}
}
Err(err) => {
@@ -110,31 +89,32 @@ impl EventReactionIndexes {
}
let sort_ids = |ids: &mut Vec<String>,
- map: &BTreeMap<String, RadrootsReactionEventIndex>| {
+ map: &BTreeMap<String, usize>,
+ events: &[RadrootsReactionEventIndex]| {
ids.sort_unstable_by(|a, b| {
let pa = map
.get(a)
- .map(|e| e.metadata.published_at)
+ .map(|idx| events[*idx].metadata.published_at)
.unwrap_or_default();
let pb = map
.get(b)
- .map(|e| e.metadata.published_at)
+ .map(|idx| events[*idx].metadata.published_at)
.unwrap_or_default();
pb.cmp(&pa).then(a.cmp(b))
});
};
for ids in root_ids.values_mut() {
- sort_ids(ids, &events_id);
+ sort_ids(ids, &events_id, &events);
}
for ids in author_ids.values_mut() {
- sort_ids(ids, &events_id);
+ sort_ids(ids, &events_id, &events);
}
for ids in npub_ids.values_mut() {
- sort_ids(ids, &events_id);
+ sort_ids(ids, &events_id, &events);
}
for ids in nip05_ids.values_mut() {
- sort_ids(ids, &events_id);
+ sort_ids(ids, &events_id, &events);
}
Ok(Self {
@@ -167,105 +147,141 @@ impl WriteEventIndexes for EventReactionIndexes {
let base: PathBuf = IndexerEventKind::Reaction.base_path(&settings.indexer.data_dir)?;
fs_mkdir(&[&base])?;
- // Root-level event list
{
let idxs_root = base.join("events.json");
let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect();
- write_if_stale!(idxs_root, ids, updated);
+ write_json_if_changed(&idxs_root, &ids, updated)?;
}
- // Index by event ID
{
let sub = base.join("id");
fs_mkdir(&[&sub])?;
- let keys: Vec<String> = self.events_id.keys().cloned().collect();
- write_if_stale!(sub.join("indexes.json"), keys, updated);
+ let keys: Vec<String> = self
+ .events_id
+ .keys()
+ .filter_map(|key| safe_path_segment(&key.to_lowercase()))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &keys, updated)?;
- for (id, evt) in &self.events_id {
- let dir = sub.join(id.to_lowercase());
+ for (id, idx) in &self.events_id {
+ let id_lower = id.to_lowercase();
+ let Some(dir_key) = safe_path_segment(&id_lower) else {
+ warn!(id = %id, "Skipping unsafe reaction id path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
+ let evt = &self.events[*idx];
fs_mkdir(&[&dir])?;
- write_if_stale!(dir.join("event.json"), evt.event.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated);
+ write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &evt.metadata, updated)?;
}
}
- // Index by Root ID
{
let sub = base.join(IndexerKey::RootId.as_str());
fs_mkdir(&[&sub])?;
- let roots: Vec<String> = self.root_ids.keys().cloned().collect();
- write_if_stale!(sub.join("indexes.json"), roots, updated);
+ let roots: Vec<String> = self
+ .root_ids
+ .keys()
+ .filter_map(|root| safe_path_segment(root))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &roots, updated)?;
for (root, ids) in &self.root_ids {
- let dir = sub.join(root);
+ let Some(dir_key) = safe_path_segment(root) else {
+ warn!(root = %root, "Skipping unsafe reaction root path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
fs_mkdir(&[&dir])?;
- let metas: Vec<RadrootsReactionEventMetadata> = ids
+ let metas: Vec<&RadrootsReactionEventMetadata> = ids
.iter()
.filter_map(|id| self.events_id.get(id))
- .map(|e| e.metadata.clone())
+ .map(|idx| &self.events[*idx].metadata)
.collect();
- write_if_stale!(dir.join("events.json"), ids.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), metas, updated);
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
}
}
- // Index by Author (hex public key)
{
let sub = base.join(IndexerKey::Author.as_str());
fs_mkdir(&[&sub])?;
- let authors: Vec<String> = self.author_ids.keys().cloned().collect();
- write_if_stale!(sub.join("indexes.json"), authors, updated);
+ let authors: Vec<String> = self
+ .author_ids
+ .keys()
+ .filter_map(|author| safe_path_segment(author))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &authors, updated)?;
for (author, ids) in &self.author_ids {
- let dir = sub.join(author);
+ let Some(dir_key) = safe_path_segment(author) else {
+ warn!(author = %author, "Skipping unsafe reaction author path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
fs_mkdir(&[&dir])?;
- let metas: Vec<RadrootsReactionEventMetadata> = ids
+ let metas: Vec<&RadrootsReactionEventMetadata> = ids
.iter()
.filter_map(|id| self.events_id.get(id))
- .map(|e| e.metadata.clone())
+ .map(|idx| &self.events[*idx].metadata)
.collect();
- write_if_stale!(dir.join("events.json"), ids.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), metas, updated);
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
}
}
- // Index by Npub (bech32 public key)
{
let sub = base.join(IndexerKey::Npub.as_str());
fs_mkdir(&[&sub])?;
- let npubs: Vec<String> = self.npub_ids.keys().cloned().collect();
- write_if_stale!(sub.join("indexes.json"), npubs, updated);
+ let npubs: Vec<String> = self
+ .npub_ids
+ .keys()
+ .filter_map(|npub| safe_path_segment(npub))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &npubs, updated)?;
for (npub, ids) in &self.npub_ids {
- let dir = sub.join(npub);
+ let Some(dir_key) = safe_path_segment(npub) else {
+ warn!(npub = %npub, "Skipping unsafe reaction npub path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
fs_mkdir(&[&dir])?;
- let metas: Vec<RadrootsReactionEventMetadata> = ids
+ let metas: Vec<&RadrootsReactionEventMetadata> = ids
.iter()
.filter_map(|id| self.events_id.get(id))
- .map(|e| e.metadata.clone())
+ .map(|idx| &self.events[*idx].metadata)
.collect();
- write_if_stale!(dir.join("events.json"), ids.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), metas, updated);
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
}
}
- // Index by NIP-05 name
{
let sub = base.join(IndexerKey::Nip05.as_str());
fs_mkdir(&[&sub])?;
- let names: Vec<String> = self.nip05_ids.keys().cloned().collect();
- write_if_stale!(sub.join("indexes.json"), names, updated);
+ let names: Vec<String> = self
+ .nip05_ids
+ .keys()
+ .filter_map(|name| safe_path_segment(name))
+ .collect();
+ write_json_if_changed(&sub.join("indexes.json"), &names, updated)?;
for (name, ids) in &self.nip05_ids {
- let dir = sub.join(name);
+ let Some(dir_key) = safe_path_segment(name) else {
+ warn!(nip05 = %name, "Skipping unsafe reaction nip05 path segment");
+ continue;
+ };
+ let dir = sub.join(dir_key);
fs_mkdir(&[&dir])?;
- let metas: Vec<RadrootsReactionEventMetadata> = ids
+ let metas: Vec<&RadrootsReactionEventMetadata> = ids
.iter()
.filter_map(|id| self.events_id.get(id))
- .map(|e| e.metadata.clone())
+ .map(|idx| &self.events[*idx].metadata)
.collect();
- write_if_stale!(dir.join("events.json"), ids.clone(), updated);
- write_if_stale!(dir.join("metadata.json"), metas, updated);
+ write_json_if_changed(&dir.join("events.json"), ids, updated)?;
+ write_json_if_changed(&dir.join("metadata.json"), &metas, updated)?;
}
}
diff --git a/indexer/src/domain/resolvers/profile.rs b/indexer/src/domain/resolvers/profile.rs
@@ -1,30 +1,48 @@
use crate::domain::events::ToRadrootsProfileEventIndex;
use crate::relay::event::RelayIndexerEvent;
+use crate::utils::nostr::normalize_nip05;
use std::collections::BTreeMap;
+#[derive(Clone)]
+struct Nip05Info {
+ full: String,
+ local: String,
+ index_key: String,
+}
+
#[derive(Default, Clone)]
pub struct ProfileResolver {
- author_to_nip05: BTreeMap<String, String>,
+ author_to_nip05: BTreeMap<String, Nip05Info>,
}
impl ProfileResolver {
pub fn from_metadata(raw_metadata: &[RelayIndexerEvent]) -> Self {
- let mut latest: BTreeMap<String, (u64, String)> = BTreeMap::new();
+ let mut latest: BTreeMap<String, (u32, Nip05Info)> = BTreeMap::new();
for raw in raw_metadata {
- if let Ok(evt) = raw.clone().to_radroots_profile_event() {
+ if let Ok(evt) = raw.to_radroots_profile_event() {
if let Some(n) = &evt.metadata.profile.nip05 {
- let normalized = n.replace("@radroots.market", "").to_lowercase();
- if normalized.is_empty() {
+ let (full, local, index_key) = normalize_nip05(n);
+ if index_key.is_empty() {
continue;
}
let author = evt.event.author.to_lowercase();
- let ts: u64 = evt.metadata.published_at;
+ let ts: u32 = evt.metadata.published_at;
match latest.get(&author) {
Some(&(old_ts, _)) if old_ts >= ts => {}
_ => {
- latest.insert(author, (ts, normalized));
+ latest.insert(
+ author,
+ (
+ ts,
+ Nip05Info {
+ full,
+ local,
+ index_key,
+ },
+ ),
+ );
}
}
}
@@ -39,7 +57,21 @@ impl ProfileResolver {
#[inline]
pub fn nip05_for_author(&self, author_hex: &str) -> Option<&str> {
self.author_to_nip05
- .get(&author_hex.to_lowercase())
- .map(|s| s.as_str())
+ .get(author_hex)
+ .map(|info| info.index_key.as_str())
+ }
+
+ #[inline]
+ pub fn nip05_full_for_author(&self, author_hex: &str) -> Option<&str> {
+ self.author_to_nip05
+ .get(author_hex)
+ .map(|info| info.full.as_str())
+ }
+
+ #[inline]
+ pub fn nip05_local_for_author(&self, author_hex: &str) -> Option<&str> {
+ self.author_to_nip05
+ .get(author_hex)
+ .map(|info| info.local.as_str())
}
}
diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs
@@ -1,10 +1,3 @@
-use anyhow::{Context, Result};
-use std::{
- collections::HashMap,
- time::{Duration, Instant},
-};
-use tracing::info;
-
pub mod cli;
pub mod config;
pub mod telemetry;
@@ -18,6 +11,7 @@ pub mod relay {
pub mod record;
}
pub mod utils;
+mod runner;
#[cfg(feature = "audit")]
pub mod audit;
@@ -25,274 +19,17 @@ pub mod audit;
#[cfg(not(feature = "audit"))]
pub mod audit {
use radroots_events::{
- comment::models::RadrootsCommentEventIndex, listing::models::RadrootsListingEventIndex,
- profile::models::RadrootsProfileEventIndex,
+ comment::RadrootsCommentEventIndex, listing::RadrootsListingEventIndex,
+ profile::RadrootsProfileEventIndex,
};
+ use crate::domain::resolvers::profile::ProfileResolver;
pub fn log_indexer_event(_: &crate::relay::event::RelayIndexerEvent) {}
pub fn log_profile_event(_: &RadrootsProfileEventIndex) {}
pub fn log_listing_event(_: &RadrootsListingEventIndex) {}
pub fn log_comment_event(_: &RadrootsCommentEventIndex) {}
+ pub fn set_profile_resolver(_: ProfileResolver) {}
}
-
-use crate::{
- domain::{
- indexer::{
- kind::IndexerEventKind,
- models::{
- EventCommentIndexes, EventIndexes, EventListingIndexes, EventProfileIndexes,
- EventReactionIndexes, WriteEventIndexes,
- },
- },
- resolvers::profile::ProfileResolver,
- },
- relay::event::RelayIndexerEvent,
- utils::{
- db::IndexerDb,
- sqlite::{sqlite_conn, sqlite_stmt},
- },
-};
pub use config::Settings;
pub use relay::record::RelayEventRecord;
-
-pub async fn run(settings: Settings) -> Result<()> {
- let db_idx = IndexerDb::open(&format!("{}/indexer_db", settings.indexer.data_dir))?;
- let tree_raw = "hashes";
- let tree_events_profile = "profile_events";
- let tree_events_listing = "listing_events";
- let tree_events_reaction = "reaction_events";
- let tree_events_comment = "comment_events";
- let tree_stats = "stats";
-
- let last_created_at_db: u32 = db_idx
- .get_raw(tree_stats, "last_created_at")?
- .map(|ivec| {
- let arr: [u8; 4] = ivec.as_ref().try_into().unwrap();
- u32::from_be_bytes(arr)
- })
- .unwrap_or(0);
- let mut last_created_at = last_created_at_db;
-
- let event_kinds = IndexerEventKind::ALL
- .iter()
- .map(|k| k.as_u64().to_string())
- .collect::<Vec<_>>()
- .join(", ");
-
- let relay_query = format!(
- "SELECT hex(event_hash), hex(author), created_at, kind, content FROM event WHERE kind IN ({}) AND created_at > ?",
- event_kinds
- );
-
- loop {
- let iteration_start = Instant::now();
- let relay_db = sqlite_conn(&settings.relay.database_path).with_context(|| {
- format!(
- "Could not open relay DB at {}",
- settings.relay.database_path
- )
- })?;
- let mut stmt =
- sqlite_stmt(&relay_db, &relay_query).context("Could not prepare event query")?;
-
- let records: Vec<RelayEventRecord> = stmt
- .query_map([last_created_at], RelayEventRecord::from_row)?
- .collect::<Result<_, _>>()
- .context("collecting RelayEventRecord rows")?;
- info!(record_count = records.len(), "Loaded relay records");
-
- let mut records_kind: HashMap<IndexerEventKind, Vec<RelayIndexerEvent>> = HashMap::new();
- for rec in records.into_iter() {
- let iev = RelayIndexerEvent::try_from(rec)?;
- audit::log_indexer_event(&iev);
- records_kind.entry(iev.kind).or_default().push(iev);
- }
-
- let mut need_rebuild_listing = false;
-
- if let Some(profile_events) = records_kind.remove(&IndexerEventKind::Profile) {
- if !profile_events.is_empty() {
- for ev in &profile_events {
- last_created_at = last_created_at.max(ev.created_at);
- let id = &ev.id;
- let hash = &ev.hash;
- let skip = if let Some(old) = db_idx.get_raw(tree_raw, id)? {
- old.as_ref() == hash.as_bytes()
- } else {
- false
- };
- if skip {
- continue;
- }
-
- db_idx.insert(tree_events_profile, id, ev)?;
- db_idx.insert_raw(tree_raw, id, hash.as_bytes())?;
- }
-
- db_idx.insert_raw(
- tree_stats,
- "last_created_at",
- &last_created_at.to_be_bytes(),
- )?;
- db_idx.flush()?;
-
- let raw_profile_events: Vec<RelayIndexerEvent> =
- db_idx.get_all(tree_events_profile)?;
- let indexed_profile_events = EventProfileIndexes::build(&raw_profile_events)?;
- let mut updated_indexes = Vec::new();
- indexed_profile_events.write(&settings, &mut updated_indexes)?;
- info!(
- written = updated_indexes.len(),
- "Written {} index files",
- updated_indexes.len()
- );
-
- need_rebuild_listing = true;
- }
- }
-
- let raw_profile_events: Vec<RelayIndexerEvent> = db_idx.get_all(tree_events_profile)?;
- let profiles = ProfileResolver::from_metadata(&raw_profile_events);
-
- if let Some(listing_events) = records_kind.remove(&IndexerEventKind::Listing) {
- if !listing_events.is_empty() {
- for ev in &listing_events {
- last_created_at = last_created_at.max(ev.created_at);
- let id = &ev.id;
- let hash = &ev.hash;
- let skip = if let Some(old) = db_idx.get_raw(tree_raw, id)? {
- old.as_ref() == hash.as_bytes()
- } else {
- false
- };
- if skip {
- continue;
- }
- db_idx.insert(tree_events_listing, id, ev)?;
- db_idx.insert_raw(tree_raw, id, hash.as_bytes())?;
- }
-
- db_idx.insert_raw(
- tree_stats,
- "last_created_at",
- &last_created_at.to_be_bytes(),
- )?;
- db_idx.flush()?;
-
- let raw_listing_events: Vec<RelayIndexerEvent> =
- db_idx.get_all(tree_events_listing)?;
- let listing_indexes = EventListingIndexes::build(&raw_listing_events)?;
- let mut updated_listing = Vec::new();
- listing_indexes.write(&settings, &mut updated_listing)?;
- info!(
- written = updated_listing.len(),
- "Written {} listing index files",
- updated_listing.len()
- );
-
- need_rebuild_listing = true;
- }
- }
-
- if let Some(reaction_events) = records_kind.remove(&IndexerEventKind::Reaction) {
- if !reaction_events.is_empty() {
- for ev in &reaction_events {
- last_created_at = last_created_at.max(ev.created_at);
- let id = &ev.id;
- let hash = &ev.hash;
- let skip = if let Some(old) = db_idx.get_raw(tree_raw, id)? {
- old.as_ref() == hash.as_bytes()
- } else {
- false
- };
- if skip {
- continue;
- }
- db_idx.insert(tree_events_reaction, id, ev)?;
- db_idx.insert_raw(tree_raw, id, hash.as_bytes())?;
- }
-
- db_idx.insert_raw(
- tree_stats,
- "last_created_at",
- &last_created_at.to_be_bytes(),
- )?;
- db_idx.flush()?;
-
- let raw_reaction_events: Vec<RelayIndexerEvent> =
- db_idx.get_all(tree_events_reaction)?;
- let reaction_indexes =
- EventReactionIndexes::build_with_profiles(&raw_reaction_events, &profiles)?;
- let mut updated_reaction = Vec::new();
- reaction_indexes.write(&settings, &mut updated_reaction)?;
- info!(
- written = updated_reaction.len(),
- "Written {} reaction index files",
- updated_reaction.len()
- );
- }
- }
-
- if let Some(comment_events) = records_kind.remove(&IndexerEventKind::Comment) {
- if !comment_events.is_empty() {
- for ev in &comment_events {
- last_created_at = last_created_at.max(ev.created_at);
- let id = &ev.id;
- let hash = &ev.hash;
- let skip = if let Some(old) = db_idx.get_raw(tree_raw, id)? {
- old.as_ref() == hash.as_bytes()
- } else {
- false
- };
- if skip {
- continue;
- }
- db_idx.insert(tree_events_comment, id, ev)?;
- db_idx.insert_raw(tree_raw, id, hash.as_bytes())?;
- }
-
- db_idx.insert_raw(
- tree_stats,
- "last_created_at",
- &last_created_at.to_be_bytes(),
- )?;
- db_idx.flush()?;
-
- let raw_comment_events: Vec<RelayIndexerEvent> =
- db_idx.get_all(tree_events_comment)?;
- let comment_indexes =
- EventCommentIndexes::build_with_profiles(&raw_comment_events, &profiles)?;
- let mut updated_comment = Vec::new();
- comment_indexes.write(&settings, &mut updated_comment)?;
- info!(
- written = updated_comment.len(),
- "Written {} comment index files",
- updated_comment.len()
- );
- }
- }
-
- if need_rebuild_listing {
- let raw_listing_events: Vec<RelayIndexerEvent> = db_idx.get_all(tree_events_listing)?;
- let listing_indexes =
- EventListingIndexes::build_with_profiles(&raw_listing_events, &profiles)?;
- let mut updated_listing = Vec::new();
- listing_indexes.write(&settings, &mut updated_listing)?;
- info!(
- written = updated_listing.len(),
- "Written {} listing index files",
- updated_listing.len()
- );
- }
-
- let elapsed = iteration_start.elapsed();
- let interval = Duration::from_secs(settings.indexer.flush_interval);
- let delay = interval.saturating_sub(elapsed);
- info!(
- elapsed_ms = elapsed.as_millis(),
- sleeping_ms = delay.as_millis(),
- "Iteration complete"
- );
- tokio::time::sleep(delay).await;
- }
-}
+pub use runner::run;
diff --git a/indexer/src/main.rs b/indexer/src/main.rs
@@ -16,7 +16,7 @@ async fn setup() -> Result<()> {
let settings = Settings::load(&args.config)?;
- telemetry::init(&settings.indexer.logs_dir);
+ let _telemetry_guards = telemetry::init(&settings.indexer.logs_dir);
info!("Service starting");
run(settings).await
diff --git a/indexer/src/relay/record.rs b/indexer/src/relay/record.rs
@@ -5,6 +5,7 @@ use crate::domain::indexer::kind::{IndexerEventKind, IndexerEventKindParseError}
#[derive(Clone, Debug, Serialize)]
pub struct RelayEventRecord {
+ pub rowid: Option<u64>,
pub event_hash: String,
pub author: String,
pub created_at: u32,
@@ -27,6 +28,31 @@ impl RelayEventRecord {
let content: String = row.get(4)?;
Ok(RelayEventRecord {
+ rowid: None,
+ event_hash,
+ author,
+ created_at,
+ kind,
+ content,
+ })
+ }
+
+ pub fn from_row_with_rowid(row: &SqliteRow) -> SqliteResult<Self> {
+ let rowid: u64 = row.get(0)?;
+ let event_hash: String = row.get(1)?;
+ let author: String = row.get(2)?;
+ let created_at: u32 = row.get(3)?;
+ let kind_num: u32 = row.get(4)?;
+
+ let kind = IndexerEventKind::try_from(kind_num as u64).map_err(
+ |e: IndexerEventKindParseError| {
+ RustqliteError::FromSqlConversionFailure(4, SqliteType::Integer, Box::new(e))
+ },
+ )?;
+
+ let content: String = row.get(5)?;
+ Ok(RelayEventRecord {
+ rowid: Some(rowid),
event_hash,
author,
created_at,
diff --git a/indexer/src/runner.rs b/indexer/src/runner.rs
@@ -0,0 +1,486 @@
+#![forbid(unsafe_code)]
+
+use anyhow::{Context, Result};
+use rusqlite::params;
+use std::{
+ collections::HashMap,
+ time::{Duration, Instant},
+};
+use tracing::{info, warn};
+
+use crate::{
+ audit,
+ domain::{
+ indexer::{
+ kind::IndexerEventKind,
+ models::{
+ EventCommentIndexes, EventFollowIndexes, EventIndexes, EventJobFeedbackIndexes,
+ EventJobRequestIndexes, EventJobResultIndexes, EventListingIndexes,
+ EventPostIndexes, EventProfileIndexes, EventReactionIndexes, WriteEventIndexes,
+ },
+ },
+ resolvers::profile::ProfileResolver,
+ },
+ relay::{event::RelayIndexerEvent, record::RelayEventRecord},
+ utils::{
+ db::IndexerDb,
+ sqlite::{sqlite_conn, sqlite_stmt},
+ },
+ Settings,
+};
+use radroots_events::kinds::{KIND_JOB_REQUEST_MIN, KIND_JOB_RESULT_MIN};
+
+const TREE_RAW: &str = "hashes";
+const TREE_EVENTS_PROFILE: &str = "profile_events";
+const TREE_EVENTS_POST: &str = "post_events";
+const TREE_EVENTS_FOLLOW: &str = "follow_events";
+const TREE_EVENTS_LISTING: &str = "listing_events";
+const TREE_EVENTS_REACTION: &str = "reaction_events";
+const TREE_EVENTS_COMMENT: &str = "comment_events";
+const TREE_EVENTS_JOB_REQUEST: &str = "job_request_events";
+const TREE_EVENTS_JOB_RESULT: &str = "job_result_events";
+const TREE_EVENTS_JOB_FEEDBACK: &str = "job_feedback_events";
+const TREE_STATS: &str = "stats";
+
+#[derive(Clone, Copy, Debug)]
+enum CursorMode {
+ RowId,
+ CreatedAt,
+}
+
+#[derive(Debug, Default)]
+struct CursorState {
+ last_created_at: u32,
+ last_event_hash: String,
+ last_rowid: u64,
+}
+
+impl CursorState {
+ fn load(db_idx: &IndexerDb) -> Result<Self> {
+ let last_created_at = db_idx
+ .get_raw(TREE_STATS, "last_created_at")?
+ .and_then(|ivec| parse_u32(ivec.as_ref(), "last_created_at"))
+ .unwrap_or(0);
+ let last_event_hash = db_idx
+ .get_raw(TREE_STATS, "last_event_hash")?
+ .and_then(|ivec| parse_string(ivec.as_ref(), "last_event_hash"))
+ .unwrap_or_default();
+ let last_rowid = db_idx
+ .get_raw(TREE_STATS, "last_rowid")?
+ .and_then(|ivec| parse_u64(ivec.as_ref(), "last_rowid"))
+ .unwrap_or(0);
+
+ Ok(Self {
+ last_created_at,
+ last_event_hash,
+ last_rowid,
+ })
+ }
+}
+
+fn parse_u32(raw: &[u8], label: &str) -> Option<u32> {
+ if raw.len() != 4 {
+ warn!(len = raw.len(), label, "Ignoring invalid cursor value");
+ return None;
+ }
+ let arr: [u8; 4] = raw.try_into().ok()?;
+ Some(u32::from_be_bytes(arr))
+}
+
+fn parse_u64(raw: &[u8], label: &str) -> Option<u64> {
+ if raw.len() != 8 {
+ warn!(len = raw.len(), label, "Ignoring invalid cursor value");
+ return None;
+ }
+ let arr: [u8; 8] = raw.try_into().ok()?;
+ Some(u64::from_be_bytes(arr))
+}
+
+fn parse_string(raw: &[u8], label: &str) -> Option<String> {
+ match std::str::from_utf8(raw) {
+ Ok(value) => Some(value.to_string()),
+ Err(err) => {
+ warn!(error = %err, label, "Ignoring invalid cursor value");
+ None
+ }
+ }
+}
+
+struct EventBatch {
+ events_by_kind: HashMap<IndexerEventKind, Vec<RelayIndexerEvent>>,
+ next_created: Option<(u32, String)>,
+ next_rowid: Option<u64>,
+ record_count: usize,
+}
+
+impl EventBatch {
+ fn from_records(records: Vec<RelayEventRecord>) -> Result<Self> {
+ let record_count = records.len();
+ let next_created = records
+ .last()
+ .map(|rec| (rec.created_at, rec.event_hash.clone()));
+ let next_rowid = records.last().and_then(|rec| rec.rowid);
+ let mut events_by_kind: HashMap<IndexerEventKind, Vec<RelayIndexerEvent>> =
+ HashMap::with_capacity(IndexerEventKind::GROUPS.len());
+
+ for rec in records {
+ let iev = RelayIndexerEvent::try_from(rec)?;
+ audit::log_indexer_event(&iev);
+ events_by_kind.entry(iev.kind.group()).or_default().push(iev);
+ }
+
+ Ok(Self {
+ events_by_kind,
+ next_created,
+ next_rowid,
+ record_count,
+ })
+ }
+}
+
+#[derive(Default)]
+struct ChangeFlags {
+ profiles: bool,
+ posts: bool,
+ follows: bool,
+ listings: bool,
+ reactions: bool,
+ comments: bool,
+ job_requests: bool,
+ job_results: bool,
+ job_feedback: bool,
+}
+
+impl ChangeFlags {
+ fn needs_profiles(&self) -> bool {
+ self.profiles
+ || self.listings
+ || self.reactions
+ || self.comments
+ || self.posts
+ || self.follows
+ || self.job_requests
+ || self.job_results
+ || self.job_feedback
+ }
+}
+
+fn insert_event(
+ db_idx: &IndexerDb,
+ tree: &str,
+ raw_tree: &str,
+ ev: &RelayIndexerEvent,
+) -> Result<bool> {
+ let id = &ev.id;
+ let hash = &ev.hash;
+ let skip = if let Some(old) = db_idx.get_raw(raw_tree, id)? {
+ old.as_ref() == hash.as_bytes()
+ } else {
+ false
+ };
+ if skip {
+ return Ok(false);
+ }
+ db_idx.insert(tree, id, ev)?;
+ db_idx.insert_raw(raw_tree, id, hash.as_bytes())?;
+ Ok(true)
+}
+
+fn insert_events(
+ db_idx: &IndexerDb,
+ tree: &str,
+ raw_tree: &str,
+ events: &[RelayIndexerEvent],
+) -> Result<bool> {
+ let mut any_new = false;
+ for ev in events {
+ if insert_event(db_idx, tree, raw_tree, ev)? {
+ any_new = true;
+ }
+ }
+ Ok(any_new)
+}
+
+fn write_indexes<T: WriteEventIndexes>(
+ settings: &Settings,
+ label: Option<&str>,
+ indexes: T,
+) -> Result<()> {
+ let mut updated = Vec::new();
+ indexes.write(settings, &mut updated)?;
+ match label {
+ Some(label) => info!(
+ written = updated.len(),
+ "Written {} {} index files",
+ updated.len(),
+ label
+ ),
+ None => info!(written = updated.len(), "Written {} index files", updated.len()),
+ }
+ Ok(())
+}
+
+pub async fn run(settings: Settings) -> Result<()> {
+ let db_idx = IndexerDb::open(&format!("{}/indexer_db", settings.indexer.data_dir))?;
+ let mut cursor = CursorState::load(&db_idx)?;
+
+ let relay_kind_filter = IndexerEventKind::relay_kind_filter_sql();
+ let relay_query_created = format!(
+ "SELECT hex(event_hash), hex(author), created_at, kind, content FROM event WHERE ({}) AND (created_at > ? OR (created_at = ? AND hex(event_hash) > ?)) ORDER BY created_at ASC, hex(event_hash) ASC",
+ relay_kind_filter
+ );
+ let relay_query_rowid = format!(
+ "SELECT rowid, hex(event_hash), hex(author), created_at, kind, content FROM event WHERE ({}) AND rowid > ? ORDER BY rowid ASC",
+ relay_kind_filter
+ );
+
+ let mut profiles = ProfileResolver::default();
+ let mut profiles_loaded = false;
+ let mut cursor_mode: Option<CursorMode> = None;
+
+ loop {
+ let iteration_start = Instant::now();
+ let relay_db = sqlite_conn(&settings.relay.database_path).with_context(|| {
+ format!(
+ "Could not open relay DB at {}",
+ settings.relay.database_path
+ )
+ })?;
+ if cursor_mode.is_none() {
+ cursor_mode = match sqlite_stmt(&relay_db, &relay_query_rowid) {
+ Ok(_) => Some(CursorMode::RowId),
+ Err(err) => {
+ warn!(
+ error = %err,
+ "Rowid cursor unavailable, falling back to created_at cursor"
+ );
+ Some(CursorMode::CreatedAt)
+ }
+ };
+ }
+
+ let mode = cursor_mode.unwrap_or(CursorMode::CreatedAt);
+ let records: Vec<RelayEventRecord> = match mode {
+ CursorMode::RowId => {
+ let mut stmt = sqlite_stmt(&relay_db, &relay_query_rowid)
+ .context("Could not prepare rowid event query")?;
+ let rows =
+ stmt.query_map(params![cursor.last_rowid], RelayEventRecord::from_row_with_rowid)?;
+ rows.collect::<Result<Vec<_>, _>>()
+ .context("collecting RelayEventRecord rows")?
+ }
+ CursorMode::CreatedAt => {
+ let mut stmt = sqlite_stmt(&relay_db, &relay_query_created)
+ .context("Could not prepare created_at event query")?;
+ let rows = stmt.query_map(
+ params![
+ cursor.last_created_at,
+ cursor.last_created_at,
+ &cursor.last_event_hash
+ ],
+ RelayEventRecord::from_row,
+ )?;
+ rows.collect::<Result<Vec<_>, _>>()
+ .context("collecting RelayEventRecord rows")?
+ }
+ };
+
+ let mut batch = EventBatch::from_records(records)?;
+ info!(record_count = batch.record_count, "Loaded relay records");
+
+ let mut changes = ChangeFlags::default();
+ let mut raw_listing_events: Option<Vec<RelayIndexerEvent>> = None;
+
+ if let Some(profile_events) = batch.events_by_kind.remove(&IndexerEventKind::Profile) {
+ if insert_events(&db_idx, TREE_EVENTS_PROFILE, TREE_RAW, &profile_events)? {
+ let raw_profile_events: Vec<RelayIndexerEvent> =
+ db_idx.get_all(TREE_EVENTS_PROFILE)?;
+ let indexed_profile_events = EventProfileIndexes::build(&raw_profile_events)?;
+ write_indexes(&settings, None, indexed_profile_events)?;
+
+ profiles = ProfileResolver::from_metadata(&raw_profile_events);
+ profiles_loaded = true;
+ audit::set_profile_resolver(profiles.clone());
+ changes.profiles = true;
+ }
+ }
+
+ if let Some(post_events) = batch.events_by_kind.remove(&IndexerEventKind::Post) {
+ changes.posts = insert_events(&db_idx, TREE_EVENTS_POST, TREE_RAW, &post_events)?;
+ }
+
+ if let Some(follow_events) = batch.events_by_kind.remove(&IndexerEventKind::Follow) {
+ changes.follows = insert_events(&db_idx, TREE_EVENTS_FOLLOW, TREE_RAW, &follow_events)?;
+ }
+
+ if let Some(listing_events) = batch.events_by_kind.remove(&IndexerEventKind::Listing) {
+ if insert_events(&db_idx, TREE_EVENTS_LISTING, TREE_RAW, &listing_events)? {
+ raw_listing_events = Some(db_idx.get_all(TREE_EVENTS_LISTING)?);
+ changes.listings = true;
+ }
+ }
+
+ if let Some(reaction_events) = batch.events_by_kind.remove(&IndexerEventKind::Reaction) {
+ changes.reactions =
+ insert_events(&db_idx, TREE_EVENTS_REACTION, TREE_RAW, &reaction_events)?;
+ }
+
+ if let Some(comment_events) = batch.events_by_kind.remove(&IndexerEventKind::Comment) {
+ changes.comments =
+ insert_events(&db_idx, TREE_EVENTS_COMMENT, TREE_RAW, &comment_events)?;
+ }
+
+ if let Some(job_request_events) =
+ batch.events_by_kind.remove(&IndexerEventKind::JobRequest(KIND_JOB_REQUEST_MIN))
+ {
+ changes.job_requests = insert_events(
+ &db_idx,
+ TREE_EVENTS_JOB_REQUEST,
+ TREE_RAW,
+ &job_request_events,
+ )?;
+ }
+
+ if let Some(job_result_events) =
+ batch.events_by_kind.remove(&IndexerEventKind::JobResult(KIND_JOB_RESULT_MIN))
+ {
+ changes.job_results = insert_events(
+ &db_idx,
+ TREE_EVENTS_JOB_RESULT,
+ TREE_RAW,
+ &job_result_events,
+ )?;
+ }
+
+ if let Some(job_feedback_events) =
+ batch.events_by_kind.remove(&IndexerEventKind::JobFeedback)
+ {
+ changes.job_feedback = insert_events(
+ &db_idx,
+ TREE_EVENTS_JOB_FEEDBACK,
+ TREE_RAW,
+ &job_feedback_events,
+ )?;
+ }
+
+ if !batch.events_by_kind.is_empty() {
+ let kinds: Vec<IndexerEventKind> =
+ batch.events_by_kind.keys().copied().collect();
+ warn!(kinds = ?kinds, "Unhandled indexer event kinds");
+ }
+
+ if changes.needs_profiles() && !profiles_loaded {
+ let raw_profile_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_PROFILE)?;
+ profiles = ProfileResolver::from_metadata(&raw_profile_events);
+ profiles_loaded = true;
+ audit::set_profile_resolver(profiles.clone());
+ }
+
+ if changes.reactions {
+ let raw_reaction_events: Vec<RelayIndexerEvent> =
+ db_idx.get_all(TREE_EVENTS_REACTION)?;
+ let reaction_indexes =
+ EventReactionIndexes::build_with_profiles(&raw_reaction_events, &profiles)?;
+ write_indexes(&settings, Some("reaction"), reaction_indexes)?;
+ }
+
+ if changes.comments {
+ let raw_comment_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_COMMENT)?;
+ let comment_indexes =
+ EventCommentIndexes::build_with_profiles(&raw_comment_events, &profiles)?;
+ write_indexes(&settings, Some("comment"), comment_indexes)?;
+ }
+
+ if changes.posts {
+ let raw_post_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_POST)?;
+ let post_indexes = EventPostIndexes::build_with_profiles(&raw_post_events, &profiles)?;
+ write_indexes(&settings, Some("post"), post_indexes)?;
+ }
+
+ if changes.follows {
+ let raw_follow_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_FOLLOW)?;
+ let follow_indexes =
+ EventFollowIndexes::build_with_profiles(&raw_follow_events, &profiles)?;
+ write_indexes(&settings, Some("follow"), follow_indexes)?;
+ }
+
+ if changes.job_requests {
+ let raw_job_request_events: Vec<RelayIndexerEvent> =
+ db_idx.get_all(TREE_EVENTS_JOB_REQUEST)?;
+ let job_request_indexes =
+ EventJobRequestIndexes::build_with_profiles(&raw_job_request_events, &profiles)?;
+ write_indexes(&settings, Some("job request"), job_request_indexes)?;
+ }
+
+ if changes.job_results {
+ let raw_job_result_events: Vec<RelayIndexerEvent> =
+ db_idx.get_all(TREE_EVENTS_JOB_RESULT)?;
+ let job_result_indexes =
+ EventJobResultIndexes::build_with_profiles(&raw_job_result_events, &profiles)?;
+ write_indexes(&settings, Some("job result"), job_result_indexes)?;
+ }
+
+ if changes.job_feedback {
+ let raw_job_feedback_events: Vec<RelayIndexerEvent> =
+ db_idx.get_all(TREE_EVENTS_JOB_FEEDBACK)?;
+ let job_feedback_indexes =
+ EventJobFeedbackIndexes::build_with_profiles(&raw_job_feedback_events, &profiles)?;
+ write_indexes(&settings, Some("job feedback"), job_feedback_indexes)?;
+ }
+
+ if changes.listings || changes.profiles {
+ let raw_listing_events = match raw_listing_events.take() {
+ Some(events) => events,
+ None => db_idx.get_all(TREE_EVENTS_LISTING)?,
+ };
+ let listing_indexes =
+ EventListingIndexes::build_with_profiles(&raw_listing_events, &profiles)?;
+ write_indexes(&settings, Some("listing"), listing_indexes)?;
+ }
+
+ let mut cursor_updated = false;
+ match mode {
+ CursorMode::CreatedAt => {
+ if let Some((created_at, event_hash)) = batch.next_created.take() {
+ cursor.last_created_at = created_at;
+ cursor.last_event_hash = event_hash;
+ db_idx.insert_raw(
+ TREE_STATS,
+ "last_created_at",
+ &cursor.last_created_at.to_be_bytes(),
+ )?;
+ db_idx.insert_raw(
+ TREE_STATS,
+ "last_event_hash",
+ cursor.last_event_hash.as_bytes(),
+ )?;
+ cursor_updated = true;
+ }
+ }
+ CursorMode::RowId => {
+ if let Some(rowid) = batch.next_rowid.take() {
+ cursor.last_rowid = rowid;
+ db_idx.insert_raw(
+ TREE_STATS,
+ "last_rowid",
+ &cursor.last_rowid.to_be_bytes(),
+ )?;
+ cursor_updated = true;
+ }
+ }
+ }
+ if cursor_updated {
+ db_idx.flush()?;
+ }
+
+ let elapsed = iteration_start.elapsed();
+ let interval = Duration::from_secs(settings.indexer.flush_interval);
+ let delay = interval.saturating_sub(elapsed);
+ info!(
+ elapsed_ms = elapsed.as_millis(),
+ sleeping_ms = delay.as_millis(),
+ "Iteration complete"
+ );
+ tokio::time::sleep(delay).await;
+ }
+}
diff --git a/indexer/src/telemetry.rs b/indexer/src/telemetry.rs
@@ -1,19 +1,24 @@
use std::path::Path;
+use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling;
use tracing_subscriber::{fmt, prelude::*, EnvFilter, Registry};
#[cfg(feature = "audit")]
use tracing_subscriber::filter::Targets;
-pub fn init(logs_dir: impl AsRef<Path>) {
+pub struct TelemetryGuards {
+ _file: WorkerGuard,
+ _audit: Option<WorkerGuard>,
+}
+
+pub fn init(logs_dir: impl AsRef<Path>) -> TelemetryGuards {
let logs_path = logs_dir.as_ref();
if let Err(e) = std::fs::create_dir_all(logs_path) {
eprintln!("Failed to create logs directory {}: {}", logs_path.display(), e);
}
-
+
let file_appender = rolling::daily(logs_path, concat!(env!("CARGO_PKG_NAME"), ".log"));
let (file_writer, guard) = tracing_appender::non_blocking(file_appender);
- std::mem::forget(guard);
let stdout_layer = fmt::layer().with_writer(std::io::stdout).with_target(false);
@@ -28,10 +33,9 @@ pub fn init(logs_dir: impl AsRef<Path>) {
.with(file_layer);
#[cfg(feature = "audit")]
- let subscriber = {
- let audit_app = rolling::daily(&logs_dir, "audit.log");
- let (audit_writer, audit_guard) = tracing_appender::non_blocking(audit_app);
- std::mem::forget(audit_guard);
+ let (subscriber, audit_guard) = {
+ let audit_app = rolling::daily(logs_path, "audit.log");
+ let (audit_writer, guard) = tracing_appender::non_blocking(audit_app);
let audit_layer = fmt::layer()
.with_writer(audit_writer)
@@ -39,8 +43,15 @@ pub fn init(logs_dir: impl AsRef<Path>) {
.with_target(true)
.with_filter(Targets::new().with_target("audit", tracing::Level::INFO));
- subscriber.with(audit_layer)
+ (subscriber.with(audit_layer), Some(guard))
};
+ #[cfg(not(feature = "audit"))]
+ let (subscriber, audit_guard) = (subscriber, None);
+
subscriber.init();
+ TelemetryGuards {
+ _file: guard,
+ _audit: audit_guard,
+ }
}
diff --git a/indexer/src/utils/crypto.rs b/indexer/src/utils/crypto.rs
@@ -36,7 +36,7 @@ pub fn compute_hash<T: Serialize>(value: &T) -> anyhow::Result<String> {
let mut hasher = Sha256::new();
{
let writer = HasherWriter(&mut hasher);
- serde_json::to_writer_pretty(writer, value)?;
+ serde_json::to_writer(writer, value)?;
}
Ok(format!("{:x}", hasher.finalize()))
}
diff --git a/indexer/src/utils/db.rs b/indexer/src/utils/db.rs
@@ -1,28 +1,44 @@
use anyhow::Result;
use serde::{de::DeserializeOwned, Serialize};
use sled::{Config as SledConfig, Db, IVec};
+use std::collections::HashMap;
+use std::sync::Mutex;
use crate::utils::serde_utils::{deserialize, serialize};
pub struct IndexerDb {
pub db: Db,
+ trees: Mutex<HashMap<String, sled::Tree>>,
}
impl IndexerDb {
pub fn open(path: &str) -> Result<Self> {
let db = SledConfig::new().path(path).open()?;
- Ok(Self { db })
+ Ok(Self {
+ db,
+ trees: Mutex::new(HashMap::new()),
+ })
+ }
+
+ fn tree(&self, name: &str) -> Result<sled::Tree> {
+ let mut trees = self.trees.lock().unwrap_or_else(|err| err.into_inner());
+ if let Some(tree) = trees.get(name) {
+ return Ok(tree.clone());
+ }
+ let tree = self.db.open_tree(name)?;
+ trees.insert(name.to_string(), tree.clone());
+ Ok(tree)
}
pub fn insert<T: Serialize>(&self, tree: &str, key: &str, value: &T) -> Result<()> {
- let t = self.db.open_tree(tree)?;
+ let t = self.tree(tree)?;
let blob: Vec<u8> = serialize(value)?;
t.insert(key, blob)?;
Ok(())
}
pub fn get<T: DeserializeOwned>(&self, tree: &str, key: &str) -> Result<Option<T>> {
- let t = self.db.open_tree(tree)?;
+ let t = self.tree(tree)?;
if let Some(bytes) = t.get(key)? {
let v: T = deserialize(&bytes)?;
Ok(Some(v))
@@ -32,7 +48,7 @@ impl IndexerDb {
}
pub fn get_all<T: DeserializeOwned>(&self, tree: &str) -> Result<Vec<T>> {
- let t = self.db.open_tree(tree)?;
+ let t = self.tree(tree)?;
let mut out = Vec::new();
for res in t.iter().values() {
let bytes = res?;
@@ -43,12 +59,12 @@ impl IndexerDb {
}
pub fn insert_raw(&self, tree: &str, key: &str, bytes: &[u8]) -> Result<()> {
- self.db.open_tree(tree)?.insert(key, bytes)?;
+ self.tree(tree)?.insert(key, bytes)?;
Ok(())
}
pub fn get_raw(&self, tree: &str, key: &str) -> Result<Option<IVec>> {
- Ok(self.db.open_tree(tree)?.get(key)?)
+ Ok(self.tree(tree)?.get(key)?)
}
pub fn flush(&self) -> Result<()> {
diff --git a/indexer/src/utils/io.rs b/indexer/src/utils/io.rs
@@ -1,10 +1,12 @@
-use anyhow::Result;
+use anyhow::{Context, Result};
use std::fs::{self, File};
use std::io::Write;
use std::path::{Path, PathBuf};
use thiserror::Error;
use tracing::debug;
+use crate::utils::crypto::compute_hash;
+
#[derive(Debug, Error)]
pub enum PathsError {
#[error("Invalid path segment at index {index}: `{segment}`")]
@@ -30,6 +32,21 @@ where
Ok(path)
}
+pub fn safe_path_segment(segment: &str) -> Option<String> {
+ let mut components = Path::new(segment).components();
+ match (components.next(), components.next()) {
+ (Some(std::path::Component::Normal(comp)), None) => {
+ let value = comp.to_string_lossy();
+ if value.is_empty() {
+ None
+ } else {
+ Some(value.into_owned())
+ }
+ }
+ _ => None,
+ }
+}
+
#[derive(thiserror::Error, Debug)]
pub enum FileError {
#[error("Failed to create directory `{path}`: {source}")]
@@ -77,8 +94,58 @@ pub fn write_hash(path: &Path, hash: &str) -> Result<()> {
Ok(())
}
+pub fn write_json_if_changed<T: serde::Serialize>(
+ path: &Path,
+ data: &T,
+ updated: &mut Vec<PathBuf>,
+) -> Result<String> {
+ let hash = compute_hash(data)
+ .with_context(|| format!("Failed to hash JSON for {}", path.display()))?;
+ let hash_path = path.with_extension("sha256.txt");
+
+ let needs_write = if path.exists() && hash_path.exists() {
+ let stored = fs::read_to_string(&hash_path)
+ .with_context(|| format!("Failed to read {}", hash_path.display()))?;
+ stored.trim() != hash
+ } else {
+ true
+ };
+
+ if needs_write {
+ write_json(path, data)
+ .with_context(|| format!("Failed to write {}", path.display()))?;
+ write_hash(path, &hash)
+ .with_context(|| format!("Failed to write hash for {}", path.display()))?;
+ updated.push(path.to_path_buf());
+ }
+
+ Ok(hash)
+}
+
pub fn fs_write_rss(path: &Path, content: &str) -> Result<()> {
let mut file = File::create(path)?;
file.write_all(content.as_bytes())?;
Ok(())
}
+
+#[cfg(test)]
+mod tests {
+ use super::safe_path_segment;
+
+ #[test]
+ fn safe_path_segment_rejects_traversal() {
+ assert!(safe_path_segment("..").is_none());
+ assert!(safe_path_segment(".").is_none());
+ assert!(safe_path_segment("a/b").is_none());
+ assert!(safe_path_segment("/abs").is_none());
+ }
+
+ #[test]
+ fn safe_path_segment_accepts_normal() {
+ assert_eq!(safe_path_segment("alpha"), Some("alpha".to_string()));
+ assert_eq!(
+ safe_path_segment("user@example.com"),
+ Some("user@example.com".to_string())
+ );
+ }
+}
diff --git a/indexer/src/utils/nostr.rs b/indexer/src/utils/nostr.rs
@@ -14,7 +14,24 @@ pub enum NostrUtilsError {
pub fn public_key_to_npub(public_key_hex: &str) -> Result<String, NostrUtilsError> {
let pubkey = PublicKey::from_hex(public_key_hex)?;
- Ok(pubkey.to_bech32().expect("to_bech32 is infallible"))
+ let bech32 = match pubkey.to_bech32() {
+ Ok(value) => value,
+ Err(err) => match err {},
+ };
+ Ok(bech32)
+}
+
+pub(crate) fn normalize_nip05(nip05: &str) -> (String, String, String) {
+ let lower = nip05.to_lowercase();
+ let local = lower
+ .split_once('@')
+ .map(|(name, _)| name.to_string())
+ .unwrap_or_else(|| lower.clone());
+ let index_key = lower
+ .strip_suffix("@radroots.market")
+ .map(|s| s.to_string())
+ .unwrap_or_else(|| lower.clone());
+ (lower, local, index_key)
}
pub fn get_tag_value<'a>(
diff --git a/indexer/src/utils/strings.rs b/indexer/src/utils/strings.rs
@@ -1,6 +1,10 @@
pub fn truncate_log(s: &str, max: usize) -> &str {
if s.len() > max {
- &s[..max]
+ let mut idx = max;
+ while idx > 0 && !s.is_char_boundary(idx) {
+ idx -= 1;
+ }
+ &s[..idx]
} else {
s
}