tangle_indexer


git clone https://radroots.dev/git/tangle_indexer.git
Log | Files | Refs | Submodules | LICENSE

commit be2ce93621b80ca32a936a21bd83009b153a7c66
parent fec726b4ff944fe9c2dbfd29d58618d39fa379a1
Author: triesap <triesap@radroots.dev>
Date:   Mon,  3 Nov 2025 20:03:12 +0000

Add `kind 7` reaction events with metadata extraction and indexes by root, author, npub, and NIP-05, integrated into the indexing pipeline. Refine config and structure by extending event kinds and keys, updating paths and ports, and removing repository data artifacts.

Diffstat:
Mapp/src/app.css | 4++--
Mindexer-config.toml | 2+-
Dindexer/data/radroots-market-indexer/indexer_db/conf | 5-----
Dindexer/data/radroots-market-indexer/indexer_db/db | 0
Mindexer/src/domain/events/mod.rs | 2++
Aindexer/src/domain/events/reaction.rs | 123+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mindexer/src/domain/indexer/key.rs | 10++++++++++
Mindexer/src/domain/indexer/kind.rs | 15++++++++++++---
Mindexer/src/domain/indexer/mod.rs | 2--
Mindexer/src/domain/indexer/models/listing.rs | 10+++++-----
Mindexer/src/domain/indexer/models/mod.rs | 4+++-
Mindexer/src/domain/indexer/models/profile.rs | 2+-
Aindexer/src/domain/indexer/models/reaction.rs | 274+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mindexer/src/lib.rs | 57+++++++++++++++++++++++++++++++++++++++++++++++++++++++--
14 files changed, 488 insertions(+), 22 deletions(-)

diff --git a/app/src/app.css b/app/src/app.css @@ -6,5 +6,5 @@ themes: os_light, os_dark; } -@source "../../../../../global/packages/apps-lib/src/**/*.{svelte,ts}"; -@source "../../../../../global/packages/apps-lib-market/src/**/*.{svelte,ts}"; +@source "../../../packages/apps-lib/src/**/*.{svelte,ts}"; +@source "../../../packages/apps-lib-market/src/**/*.{svelte,ts}"; diff --git a/indexer-config.toml b/indexer-config.toml @@ -4,7 +4,7 @@ logs_dir = "./dev/logs/indexer" flush_interval = 20 [relay] -url = "ws://127.0.0.1:21648" +url = "ws://127.0.0.1:8080" database_path = "./dev/data/relay/nostr.db" [listings] diff --git a/indexer/data/radroots-market-indexer/indexer_db/conf b/indexer/data/radroots-market-indexer/indexer_db/conf @@ -1,4 +0,0 @@ -segment_size: 524288 -use_compression: false -version: 0.34 -vQÁ -\ No newline at end of file diff --git a/indexer/data/radroots-market-indexer/indexer_db/db b/indexer/data/radroots-market-indexer/indexer_db/db Binary files differ. diff --git a/indexer/src/domain/events/mod.rs b/indexer/src/domain/events/mod.rs @@ -1,8 +1,10 @@ pub mod listing; pub mod profile; +pub mod reaction; pub use listing::ToRadrootsListingEventIndex; pub use profile::ToRadrootsProfileEventIndex; +pub use reaction::ToRadrootsReactionEventIndex; #[macro_export] macro_rules! opt_required { diff --git a/indexer/src/domain/events/reaction.rs b/indexer/src/domain/events/reaction.rs @@ -0,0 +1,123 @@ +use thiserror::Error; + +use radroots_events::{ + reaction::models::{ + RadrootsReaction, RadrootsReactionEventIndex, RadrootsReactionEventMetadata, + }, + RadrootsNostrEvent, RadrootsNostrEventRef, +}; + +use crate::relay::event::RelayIndexerEvent; + +#[derive(Debug, Error)] +pub enum RadrootsReactionEventIndexError { + #[error("Failed to parse reaction from tags")] + ParseError, +} + +fn parse_reaction_from_tags( + tags: &[Vec<String>], + content: &str, +) -> Result<RadrootsReaction, RadrootsReactionEventIndexError> { + let mut root_id: Option<String> = None; + let mut root_relays: Option<Vec<String>> = None; + let mut root_kind: Option<u32> = None; + let mut root_author: Option<String> = None; + let mut root_d: Option<String> = None; + + for t in tags { + if t.first().map(|k| k == "e").unwrap_or(false) { + if let Some(id) = t.get(1).cloned() { + root_id = Some(id); + } + if let Some(r) = t.get(2).cloned() { + root_relays = Some(vec![r]); + } + } else if t.first().map(|k| k == "a").unwrap_or(false) { + if let Some(arg) = t.get(1) { + let parts: Vec<&str> = arg.split(':').collect(); + if parts.len() >= 2 { + root_kind = parts[0].parse::<u32>().ok(); + root_author = Some(parts[1].to_lowercase()); + if parts.len() >= 3 && !parts[2].is_empty() { + root_d = Some(parts[2].to_string()); + } + } + } + if let Some(r) = t.get(2).cloned() { + root_relays = Some(vec![r]); + } + } + } + + let id = root_id.ok_or(RadrootsReactionEventIndexError::ParseError)?; + let kind = root_kind.unwrap_or(1); + let author = root_author.unwrap_or_default(); + + let root = RadrootsNostrEventRef { + id, + author, + kind, + d_tag: root_d, + relays: root_relays, + }; + + Ok(RadrootsReaction { + root, + content: content.to_string(), + }) +} + +fn create_radroots_reaction_event_metadata( + id: String, + author: String, + published_at: u32, + kind: u32, + content: String, + tags: Vec<Vec<String>>, +) -> Result<RadrootsReactionEventMetadata, RadrootsReactionEventIndexError> { + let reaction = parse_reaction_from_tags(&tags, &content)?; + Ok(RadrootsReactionEventMetadata { + id, + author, + published_at, + kind, + reaction, + }) +} + +pub trait ToRadrootsReactionEventIndex { + fn to_radroots_reaction_event( + self, + ) -> Result<RadrootsReactionEventIndex, RadrootsReactionEventIndexError>; +} + +impl ToRadrootsReactionEventIndex for RelayIndexerEvent { + fn to_radroots_reaction_event( + self, + ) -> Result<RadrootsReactionEventIndex, RadrootsReactionEventIndexError> { + let kind_u32 = self.kind.as_u64() as u32; + + let metadata = create_radroots_reaction_event_metadata( + self.id.clone(), + self.author.clone(), + self.created_at, + kind_u32, + self.content.clone(), + self.tags.clone(), + )?; + + Ok(RadrootsReactionEventIndex { + event: RadrootsNostrEvent { + id: self.id, + author: self.author, + created_at: self.created_at, + kind: kind_u32, + tags: self.tags, + content: self.content, + sig: self.sig, + }, + metadata, + }) + } +} diff --git a/indexer/src/domain/indexer/key.rs b/indexer/src/domain/indexer/key.rs @@ -5,6 +5,7 @@ pub enum IndexerKey { Nip05, Npub, Country, + RootId, } impl IndexerKey { @@ -15,6 +16,7 @@ impl IndexerKey { IndexerKey::Nip05 => "nip05", IndexerKey::Npub => "npub", IndexerKey::Country => "country", + IndexerKey::RootId => "root", } } } @@ -33,3 +35,11 @@ pub const LISTING_INDEX_DIRECTORY: [IndexerKey; 5] = [ IndexerKey::Npub, IndexerKey::Nip05, ]; + +pub const REACTION_INDEX_DIRECTORY: [IndexerKey; 5] = [ + IndexerKey::Id, + IndexerKey::RootId, + IndexerKey::Author, + IndexerKey::Npub, + IndexerKey::Nip05, +]; diff --git a/indexer/src/domain/indexer/kind.rs b/indexer/src/domain/indexer/kind.rs @@ -3,22 +3,29 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt; use std::path::PathBuf; -use crate::domain::indexer::key::LISTING_INDEX_DIRECTORY; -use crate::domain::indexer::{IndexerKey, PROFILE_INDEX_DIRECTORY}; +use crate::domain::indexer::key::{ + IndexerKey, LISTING_INDEX_DIRECTORY, PROFILE_INDEX_DIRECTORY, REACTION_INDEX_DIRECTORY, +}; use crate::utils::io::{paths_join, PathsError}; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub enum IndexerEventKind { Profile, + Reaction, Listing, } impl IndexerEventKind { - pub const ALL: [IndexerEventKind; 2] = [IndexerEventKind::Profile, IndexerEventKind::Listing]; + pub const ALL: [IndexerEventKind; 3] = [ + IndexerEventKind::Profile, + IndexerEventKind::Reaction, + IndexerEventKind::Listing, + ]; pub const fn as_u64(self) -> u64 { match self { IndexerEventKind::Profile => 0, + IndexerEventKind::Reaction => 7, IndexerEventKind::Listing => 30402, } } @@ -26,6 +33,7 @@ impl IndexerEventKind { pub const fn paths(self) -> &'static [IndexerKey] { match self { IndexerEventKind::Profile => &PROFILE_INDEX_DIRECTORY, + IndexerEventKind::Reaction => &REACTION_INDEX_DIRECTORY, IndexerEventKind::Listing => &LISTING_INDEX_DIRECTORY, } } @@ -76,6 +84,7 @@ impl TryFrom<u64> for IndexerEventKind { fn try_from(val: u64) -> Result<Self, Self::Error> { match val { 0 => Ok(IndexerEventKind::Profile), + 7 => Ok(IndexerEventKind::Reaction), 30402 => Ok(IndexerEventKind::Listing), other => Err(IndexerEventKindParseError(other)), } diff --git a/indexer/src/domain/indexer/mod.rs b/indexer/src/domain/indexer/mod.rs @@ -1,5 +1,3 @@ pub mod key; pub mod kind; pub mod models; - -pub use key::{IndexerKey, PROFILE_INDEX_DIRECTORY}; diff --git a/indexer/src/domain/indexer/models/listing.rs b/indexer/src/domain/indexer/models/listing.rs @@ -171,7 +171,7 @@ impl EventListingIndexes { impl EventIndexes for EventListingIndexes { type Event = RelayIndexerEvent; - fn subdirs() -> &'static [crate::domain::indexer::IndexerKey] { + fn subdirs() -> &'static [crate::domain::indexer::key::IndexerKey] { &LISTING_INDEX_DIRECTORY } @@ -228,7 +228,7 @@ impl WriteEventIndexes for EventListingIndexes { } { - let sub_country = base.join(crate::domain::indexer::IndexerKey::Country.as_str()); + let sub_country = base.join(crate::domain::indexer::key::IndexerKey::Country.as_str()); fs_mkdir(&[&sub_country])?; let country_codes: Vec<String> = self.country_ids.keys().cloned().collect(); write_if_stale!(sub_country.join("indexes.json"), country_codes, updated); @@ -309,7 +309,7 @@ impl WriteEventIndexes for EventListingIndexes { } { - let sub_author = base.join(crate::domain::indexer::IndexerKey::Author.as_str()); + let sub_author = base.join(crate::domain::indexer::key::IndexerKey::Author.as_str()); fs_mkdir(&[&sub_author])?; let authors: Vec<String> = self.author_ids.keys().cloned().collect(); write_if_stale!(sub_author.join("indexes.json"), authors, updated); @@ -397,7 +397,7 @@ impl WriteEventIndexes for EventListingIndexes { } { - let sub_npub = base.join(crate::domain::indexer::IndexerKey::Npub.as_str()); + let sub_npub = base.join(crate::domain::indexer::key::IndexerKey::Npub.as_str()); fs_mkdir(&[&sub_npub])?; let npubs: Vec<String> = self.npub_ids.keys().cloned().collect(); write_if_stale!(sub_npub.join("indexes.json"), npubs, updated); @@ -484,7 +484,7 @@ impl WriteEventIndexes for EventListingIndexes { } { - let sub_nip05 = base.join(crate::domain::indexer::IndexerKey::Nip05.as_str()); + let sub_nip05 = base.join(crate::domain::indexer::key::IndexerKey::Nip05.as_str()); fs_mkdir(&[&sub_nip05])?; let names: Vec<String> = self.nip05_ids.keys().cloned().collect(); write_if_stale!(sub_nip05.join("indexes.json"), names, updated); diff --git a/indexer/src/domain/indexer/models/mod.rs b/indexer/src/domain/indexer/models/mod.rs @@ -1,10 +1,12 @@ pub mod listing; pub mod profile; +pub mod reaction; pub use listing::EventListingIndexes; pub use profile::EventProfileIndexes; +pub use reaction::EventReactionIndexes; -use crate::{config::Settings, domain::indexer::IndexerKey}; +use crate::{config::Settings, domain::indexer::key::IndexerKey}; use anyhow::Result; use std::path::PathBuf; use thiserror::Error; diff --git a/indexer/src/domain/indexer/models/profile.rs b/indexer/src/domain/indexer/models/profile.rs @@ -1,3 +1,4 @@ +use crate::domain::indexer::key::{IndexerKey, PROFILE_INDEX_DIRECTORY}; use crate::utils::crypto::compute_hash; use crate::utils::io::fs_mkdir; use crate::utils::io::{write_hash, write_json}; @@ -10,7 +11,6 @@ use crate::{ indexer::{ kind::IndexerEventKind, models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes}, - IndexerKey, PROFILE_INDEX_DIRECTORY, }, }, relay::event::RelayIndexerEvent, diff --git a/indexer/src/domain/indexer/models/reaction.rs b/indexer/src/domain/indexer/models/reaction.rs @@ -0,0 +1,274 @@ +use crate::domain::indexer::key::{IndexerKey, REACTION_INDEX_DIRECTORY}; +use crate::utils::crypto::compute_hash; +use crate::utils::io::fs_mkdir; +use crate::utils::io::{write_hash, write_json}; +use crate::utils::nostr::public_key_to_npub; +use crate::utils::strings::truncate_log; +use crate::{ + audit, + domain::{ + events::reaction::ToRadrootsReactionEventIndex, + indexer::{ + kind::IndexerEventKind, + models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes}, + }, + resolvers::profile::ProfileResolver, + }, + relay::event::RelayIndexerEvent, + Settings, +}; +use radroots_events::reaction::models::{ + RadrootsReactionEventIndex, RadrootsReactionEventMetadata, +}; +use std::{collections::BTreeMap, fs, path::PathBuf}; +use tracing::{instrument, warn}; + +macro_rules! write_if_stale { + ($path:expr, $data:expr, $updated:expr) => {{ + let hash = compute_hash(&$data)?; + let hash_path = $path.with_extension("sha256.txt"); + + let needs_write = if $path.exists() && hash_path.exists() { + let stored = fs::read_to_string(&hash_path)?; + stored.trim() != hash + } else { + true + }; + + if needs_write { + write_json(&$path, &$data)?; + write_hash(&$path, &hash)?; + $updated.push($path.clone()); + } + }}; +} + +#[derive(Debug)] +pub struct EventReactionIndexes { + events: Vec<RadrootsReactionEventIndex>, + events_id: BTreeMap<String, RadrootsReactionEventIndex>, + root_ids: BTreeMap<String, Vec<String>>, + author_ids: BTreeMap<String, Vec<String>>, + npub_ids: BTreeMap<String, Vec<String>>, + nip05_ids: BTreeMap<String, Vec<String>>, +} + +impl EventReactionIndexes { + pub fn build_with_profiles( + raw_events: &[RelayIndexerEvent], + profiles: &ProfileResolver, + ) -> Result<Self, NostrEventsStaticError> { + let mut events = Vec::with_capacity(raw_events.len()); + let mut events_id = BTreeMap::new(); + let mut root_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); + let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); + let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); + let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); + + for raw in raw_events { + match raw.clone().to_radroots_reaction_event() { + Ok(evt) => { + audit::log_indexer_event(&raw); + + let id = evt.metadata.id.clone(); + let author_hex = evt.metadata.author.to_lowercase(); + + let npub = public_key_to_npub(&author_hex) + .map(|s| s.to_lowercase()) + .ok(); + let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned); + + let root = evt.metadata.reaction.root.id.to_lowercase(); + + events_id.insert(id.clone(), evt.clone()); + events.push(evt.clone()); + + root_ids.entry(root).or_default().push(id.clone()); + author_ids.entry(author_hex).or_default().push(id.clone()); + if let Some(n) = npub { + npub_ids.entry(n).or_default().push(id.clone()); + } + if let Some(n05) = author_nip05 { + nip05_ids + .entry(n05.to_lowercase()) + .or_default() + .push(id.clone()); + } + } + Err(err) => { + warn!( + kind = raw.kind.as_u64(), + id = %raw.id, + author = %raw.author, + content = %truncate_log(&raw.content, 1000), + tags = ?raw.tags, + error = %err, + "Skipping malformed reaction event" + ); + } + } + } + + let sort_ids = |ids: &mut Vec<String>, + map: &BTreeMap<String, RadrootsReactionEventIndex>| { + ids.sort_unstable_by(|a, b| { + let pa = map + .get(a) + .map(|e| e.metadata.published_at) + .unwrap_or_default(); + let pb = map + .get(b) + .map(|e| e.metadata.published_at) + .unwrap_or_default(); + pb.cmp(&pa).then(a.cmp(b)) + }); + }; + + for ids in root_ids.values_mut() { + sort_ids(ids, &events_id); + } + for ids in author_ids.values_mut() { + sort_ids(ids, &events_id); + } + for ids in npub_ids.values_mut() { + sort_ids(ids, &events_id); + } + for ids in nip05_ids.values_mut() { + sort_ids(ids, &events_id); + } + + Ok(Self { + events, + events_id, + root_ids, + author_ids, + npub_ids, + nip05_ids, + }) + } +} + +impl EventIndexes for EventReactionIndexes { + type Event = RelayIndexerEvent; + + fn subdirs() -> &'static [IndexerKey] { + &REACTION_INDEX_DIRECTORY + } + + #[instrument(skip(raw_events), fields(event_count = raw_events.len()))] + fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> { + let empty = ProfileResolver::default(); + Self::build_with_profiles(raw_events, &empty) + } +} + +impl WriteEventIndexes for EventReactionIndexes { + fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> { + let base: PathBuf = IndexerEventKind::Reaction.base_path(&settings.indexer.data_dir)?; + fs_mkdir(&[&base])?; + + // Root-level event list + { + let idxs_root = base.join("events.json"); + let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect(); + write_if_stale!(idxs_root, ids, updated); + } + + // Index by event ID + { + let sub = base.join("id"); + fs_mkdir(&[&sub])?; + let keys: Vec<String> = self.events_id.keys().cloned().collect(); + write_if_stale!(sub.join("indexes.json"), keys, updated); + + for (id, evt) in &self.events_id { + let dir = sub.join(id.to_lowercase()); + fs_mkdir(&[&dir])?; + write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); + write_if_stale!(dir.join("metadata.json"), evt.metadata.clone(), updated); + } + } + + // Index by Root ID + { + let sub = base.join(IndexerKey::RootId.as_str()); + fs_mkdir(&[&sub])?; + let roots: Vec<String> = self.root_ids.keys().cloned().collect(); + write_if_stale!(sub.join("indexes.json"), roots, updated); + + for (root, ids) in &self.root_ids { + let dir = sub.join(root); + fs_mkdir(&[&dir])?; + let metas: Vec<RadrootsReactionEventMetadata> = ids + .iter() + .filter_map(|id| self.events_id.get(id)) + .map(|e| e.metadata.clone()) + .collect(); + write_if_stale!(dir.join("events.json"), ids.clone(), updated); + write_if_stale!(dir.join("metadata.json"), metas, updated); + } + } + + // Index by Author (hex public key) + { + let sub = base.join(IndexerKey::Author.as_str()); + fs_mkdir(&[&sub])?; + let authors: Vec<String> = self.author_ids.keys().cloned().collect(); + write_if_stale!(sub.join("indexes.json"), authors, updated); + + for (author, ids) in &self.author_ids { + let dir = sub.join(author); + fs_mkdir(&[&dir])?; + let metas: Vec<RadrootsReactionEventMetadata> = ids + .iter() + .filter_map(|id| self.events_id.get(id)) + .map(|e| e.metadata.clone()) + .collect(); + write_if_stale!(dir.join("events.json"), ids.clone(), updated); + write_if_stale!(dir.join("metadata.json"), metas, updated); + } + } + + // Index by Npub (bech32 public key) + { + let sub = base.join(IndexerKey::Npub.as_str()); + fs_mkdir(&[&sub])?; + let npubs: Vec<String> = self.npub_ids.keys().cloned().collect(); + write_if_stale!(sub.join("indexes.json"), npubs, updated); + + for (npub, ids) in &self.npub_ids { + let dir = sub.join(npub); + fs_mkdir(&[&dir])?; + let metas: Vec<RadrootsReactionEventMetadata> = ids + .iter() + .filter_map(|id| self.events_id.get(id)) + .map(|e| e.metadata.clone()) + .collect(); + write_if_stale!(dir.join("events.json"), ids.clone(), updated); + write_if_stale!(dir.join("metadata.json"), metas, updated); + } + } + + // Index by NIP-05 name + { + let sub = base.join(IndexerKey::Nip05.as_str()); + fs_mkdir(&[&sub])?; + let names: Vec<String> = self.nip05_ids.keys().cloned().collect(); + write_if_stale!(sub.join("indexes.json"), names, updated); + + for (name, ids) in &self.nip05_ids { + let dir = sub.join(name); + fs_mkdir(&[&dir])?; + let metas: Vec<RadrootsReactionEventMetadata> = ids + .iter() + .filter_map(|id| self.events_id.get(id)) + .map(|e| e.metadata.clone()) + .collect(); + write_if_stale!(dir.join("events.json"), ids.clone(), updated); + write_if_stale!(dir.join("metadata.json"), metas, updated); + } + } + + Ok(()) + } +} diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs @@ -37,11 +37,18 @@ use crate::{ domain::{ indexer::{ kind::IndexerEventKind, - models::{EventIndexes, EventListingIndexes, EventProfileIndexes, WriteEventIndexes}, + models::{ + EventIndexes, EventListingIndexes, EventProfileIndexes, EventReactionIndexes, + WriteEventIndexes, + }, }, resolvers::profile::ProfileResolver, }, - relay::event::RelayIndexerEvent, utils::{db::IndexerDb, sqlite::{sqlite_conn, sqlite_stmt}}, + relay::event::RelayIndexerEvent, + utils::{ + db::IndexerDb, + sqlite::{sqlite_conn, sqlite_stmt}, + }, }; pub use config::Settings; pub use relay::record::RelayEventRecord; @@ -51,6 +58,7 @@ pub async fn run(settings: Settings) -> Result<()> { let tree_raw = "hashes"; let tree_events_profile = "profile_events"; let tree_events_listing = "listing_events"; + let tree_events_reaction = "reaction_events"; let tree_stats = "stats"; let last_created_at_db: u32 = db_idx @@ -100,6 +108,7 @@ pub async fn run(settings: Settings) -> Result<()> { let mut need_rebuild_listing = false; + // Handle Profile Events if let Some(profile_events) = records_kind.remove(&IndexerEventKind::Profile) { if !profile_events.is_empty() { for ev in &profile_events { @@ -141,9 +150,11 @@ pub async fn run(settings: Settings) -> Result<()> { } } + // Load current profile data (used for listings and reactions) let raw_profile_events: Vec<RelayIndexerEvent> = db_idx.get_all(tree_events_profile)?; let profiles = ProfileResolver::from_metadata(&raw_profile_events); + // Handle Listing Events if let Some(listing_events) = records_kind.remove(&IndexerEventKind::Listing) { if !listing_events.is_empty() { for ev in &listing_events { @@ -184,6 +195,47 @@ pub async fn run(settings: Settings) -> Result<()> { } } + // Handle Reaction Events + if let Some(reaction_events) = records_kind.remove(&IndexerEventKind::Reaction) { + if !reaction_events.is_empty() { + for ev in &reaction_events { + last_created_at = last_created_at.max(ev.created_at); + let id = &ev.id; + let hash = &ev.hash; + let skip = if let Some(old) = db_idx.get_raw(tree_raw, id)? { + old.as_ref() == hash.as_bytes() + } else { + false + }; + if skip { + continue; + } + db_idx.insert(tree_events_reaction, id, ev)?; + db_idx.insert_raw(tree_raw, id, hash.as_bytes())?; + } + + db_idx.insert_raw( + tree_stats, + "last_created_at", + &last_created_at.to_be_bytes(), + )?; + db_idx.flush()?; + + let raw_reaction_events: Vec<RelayIndexerEvent> = + db_idx.get_all(tree_events_reaction)?; + let reaction_indexes = + EventReactionIndexes::build_with_profiles(&raw_reaction_events, &profiles)?; + let mut updated_reaction = Vec::new(); + reaction_indexes.write(&settings, &mut updated_reaction)?; + info!( + written = updated_reaction.len(), + "Written {} reaction index files", + updated_reaction.len() + ); + } + } + + // If new profiles or listings were written, rebuild the listing indexes if need_rebuild_listing { let raw_listing_events: Vec<RelayIndexerEvent> = db_idx.get_all(tree_events_listing)?; let listing_indexes = @@ -197,6 +249,7 @@ pub async fn run(settings: Settings) -> Result<()> { ); } + // Sleep between runs based on configuration let elapsed = iteration_start.elapsed(); let interval = Duration::from_secs(settings.indexer.flush_interval); let delay = interval.saturating_sub(elapsed);