commit 4818ad787b21a02ba582c54b2e456c1afbbfdb4e parent 159a4ec06ea632796138af28029f46ee4fefc253 Author: triesap <137732411+triesap@users.noreply.github.com> Date: Fri, 8 Aug 2025 16:22:54 +0000 Edit `indexer` adding content-addressed shard files and manifests for listing country indexes. Edit `app` adding static build loaders for listing country pages, initialisation of global ndk instance. Diffstat:
19 files changed, 642 insertions(+), 100 deletions(-)
diff --git a/app/.env.example b/app/.env.example @@ -1,3 +1,6 @@ -PUBLIC_RADROOTS_MARKET_RELAY_URL= +VITE_PUBLIC_RADROOTS_MARKET_RELAY_URL= PUBLIC_RADROOTS_MARKET_RELAY_INDEXES_URL= +VITE_PUBLIC_IDB_NAME= +VITE_PUBLIC_NDK_CACHE_NAME= +VITE_PUBLIC_NDK_CLIENT_NAME= PORT= \ No newline at end of file diff --git a/app/src/lib/utils/app/storage.ts b/app/src/lib/utils/app/storage.ts @@ -4,6 +4,7 @@ export type GlobalConfig = { theme_mode: ThemeMode; theme_key: string; locale: string; + global_relays: string[]; npub: string; }; @@ -17,5 +18,4 @@ export type PageSession = { export type PageSessionKeys = keyof PageSession; - export const idb = new IdbLib<GlobalConfigKeys, GlobalConfig, PageSessionKeys, PageSession>(); \ No newline at end of file diff --git a/app/src/params/country.ts b/app/src/params/country.ts @@ -0,0 +1,4 @@ +import type { ParamMatcher } from '@sveltejs/kit'; +export const match: ParamMatcher = (value: string): boolean => { + return value.length == 2; +}; +\ No newline at end of file diff --git a/app/src/routes/(market)/(listing)/[0=country]/+page.svelte b/app/src/routes/(market)/(listing)/[0=country]/+page.svelte @@ -0,0 +1,18 @@ +<script lang="ts"> + import type { PageProps } from "./$types"; + + let { data }: PageProps = $props(); +</script> + +<div + class={`flex flex-col w-full px-4 gap-12 justify-start items-start break-all`} +> + <p class={`font-sans font-[400] text-base text-ly0-gl`}> + {JSON.stringify(data.manifest)} + </p> + {#each data.events as event} + <p class={`font-sans font-[400] text-base text-ly0-gl`}> + {JSON.stringify(event)} + </p> + {/each} +</div> diff --git a/app/src/routes/(market)/(listing)/[0=country]/+page.ts b/app/src/routes/(market)/(listing)/[0=country]/+page.ts @@ -0,0 +1,52 @@ +import { PUBLIC_RADROOTS_MARKET_RELAY_INDEXES_URL } from "$env/static/public"; +import type { RadrootsListingEventData, RadrootsListingIndexCountryManifest } from "@radroots/radroots-common-bindings"; +import { error } from "@sveltejs/kit"; +import type { EntryGenerator, PageLoad } from "./$types"; + +export const entries: EntryGenerator = async () => { + const [ + events_0_country_indexes, + ]: [ + string[] + ] = await Promise.all([ + fetch(`${PUBLIC_RADROOTS_MARKET_RELAY_INDEXES_URL}/events/30402/country/indexes.json`).then(r => r.json()) + ]); + return events_0_country_indexes.map(i => ({ 0: i })) +}; + +type PageLoadData = { + country: string; + manifest: RadrootsListingIndexCountryManifest; + events: RadrootsListingEventData[]; +}; + +export const load: PageLoad<PageLoadData> = async ({ fetch, params }) => { + const { 0: country } = params; + + const [ + res_country_manifest, + ] = await Promise.all([ + fetch(`${PUBLIC_RADROOTS_MARKET_RELAY_INDEXES_URL}/events/30402/country/${country}/manifest.json`) + ]); + + if (!res_country_manifest.ok) error(404, { message: `country:${country}` }); + + const manifest: RadrootsListingIndexCountryManifest = await res_country_manifest.json(); + + let events: RadrootsListingEventData[] = []; + if (manifest.shards.length > 0) { + const shard = manifest.shards[0]; + const res_country_shard = await fetch(`${PUBLIC_RADROOTS_MARKET_RELAY_INDEXES_URL}/events/30402/country/${country}/${shard.file}?v=${shard.sha256}`); + if (!res_country_shard.ok) error(500, { message: `load:${country}:${shard.file}` }); + events = await res_country_shard.json(); + } + + const data: PageLoadData = { + country, + manifest, + events, + } + return data; +} + +export const prerender = true; diff --git a/app/src/routes/+layout.svelte b/app/src/routes/+layout.svelte @@ -1,7 +1,7 @@ <script lang="ts"> - import { PUBLIC_RADROOTS_MARKET_RELAY_URL } from "$env/static/public"; + import { idb } from "$lib/utils/app/storage"; import { init_theme } from "$lib/utils/app/theme"; - import { ndk } from "@radroots/apps-lib"; + import { ndk, ndk_global } from "@radroots/apps-lib"; import { onMount, type Snippet } from "svelte"; import "../app.css"; @@ -10,12 +10,18 @@ onMount(async () => { await init_theme(); - $ndk.addExplicitRelay(PUBLIC_RADROOTS_MARKET_RELAY_URL); + await $ndk.connect(); + console.log(`[ndk] connected`); - $ndk.autoConnectUserRelays = true; - $ndk.autoFetchUserMutelist = true; + const global_relays = await idb.read_global("global_relays"); + if (!global_relays) { + console.log(`[ndk_global] no global relays added`); + } else { + $ndk_global.explicitRelayUrls = global_relays; + } - await $ndk.connect(); + await $ndk_global.connect(); + console.log(`[ndk_global] connected`); }); </script> diff --git a/crates/indexer/config.toml b/crates/indexer/config.toml @@ -1,7 +1,11 @@ +[indexer] +data_dir = "data/radroots-market-relay-indexer" +logs_dir = "logs/radroots-market-relay-indexer" +flush_interval = 20 + [relay] url = "ws://127.0.0.1:8080" db_path = "data/nostr-rs-relay/nostr.db" -[service] -output_dir = "data/radroots-market-relay-indexer" -flush_interval = 20 -\ No newline at end of file +[listings] +country_shard_size = 1000 diff --git a/crates/indexer/src/config.rs b/crates/indexer/src/config.rs @@ -12,22 +12,28 @@ pub enum SettingsError { } #[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Indexer { + pub data_dir: String, + pub logs_dir: String, + pub flush_interval: u64, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct Relay { pub url: String, pub database_path: String, } #[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Service { - pub output_dir: String, - pub logs_dir: String, - pub flush_interval: u64, +pub struct Listings { + pub country_shard_size: usize, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Settings { + pub indexer: Indexer, pub relay: Relay, - pub service: Service, + pub listings: Listings, } impl Settings { diff --git a/crates/indexer/src/domain/events/listing.rs b/crates/indexer/src/domain/events/listing.rs @@ -0,0 +1,191 @@ +use crate::domain::events::RequiredField; +use crate::relay::event::RelayIndexerEvent; +use crate::{opt_default, opt_required}; +use anyhow::Result; +use radroots_common::models::events::{ + RadrootsListingEvent, RadrootsListingEventData, RadrootsNostrEvent, +}; +use std::collections::HashMap; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum RadrootsListingEventError { + #[error("Missing or invalid tag structure")] + TagParseError, + + #[error("Missing required field: {0}")] + MissingField(String), +} + +pub fn create_radroots_listing_event_data( + id: String, + author: String, + tags: Vec<Vec<String>>, +) -> Result<RadrootsListingEventData, RadrootsListingEventError> { + let mut tags_map: HashMap<String, Vec<String>> = HashMap::new(); + let mut images = Vec::new(); + let mut location_lat: Option<String> = None; + let mut location_lng: Option<String> = None; + + for tag in &tags { + if let Some(key) = tag.get(0).map(String::as_str) { + match key { + "image" => { + if let Some(img) = tag.get(1) { + images.push(img.clone()); + } + } + "l" => { + if let Some(value) = tag.get(1) { + if let Some(hint) = tag.get(2) { + match hint.as_str() { + "dd.lat" if location_lat.is_none() => { + location_lat = Some(value.clone()) + } + "dd.lon" if location_lng.is_none() => { + location_lng = Some(value.clone()) + } + _ => {} + } + } else if value.contains(',') { + let parts: Vec<&str> = value.split(',').map(str::trim).collect(); + if parts.len() == 2 { + location_lat = Some(parts[0].to_string()); + location_lng = Some(parts[1].to_string()); + } + } + } + } + _ => { + tags_map + .entry(key.to_string()) + .or_default() + .extend_from_slice(&tag[1..]); + } + } + } + } + + let get = |key: &str, idx: usize| -> Option<String> { + tags_map.get(key).and_then(|v| v.get(idx)).cloned() + }; + + let published_at_str = opt_required!(get("published_at", 0)) + .map_err(|e| RadrootsListingEventError::MissingField(e))?; + let published_at = published_at_str + .parse::<u32>() + .map_err(|_| RadrootsListingEventError::MissingField("published_at".into()))?; + + let d_tag = + opt_required!(get("d", 0)).map_err(|e| RadrootsListingEventError::MissingField(e))?; + let title = + opt_required!(get("title", 0)).map_err(|e| RadrootsListingEventError::MissingField(e))?; + let summary = + opt_required!(get("summary", 0)).map_err(|e| RadrootsListingEventError::MissingField(e))?; + + let location_address = opt_required!(get("location", 0)) + .map_err(|e| RadrootsListingEventError::MissingField(e))?; + let location_city = opt_required!(get("location", 1)) + .map_err(|e| RadrootsListingEventError::MissingField(e))?; + let location_region = opt_required!(get("location", 2)) + .map_err(|e| RadrootsListingEventError::MissingField(e))?; + let location_country = opt_required!(get("location", 3)) + .map_err(|e| RadrootsListingEventError::MissingField(e))?; + + let location_lat = + opt_required!(location_lat).map_err(|e| RadrootsListingEventError::MissingField(e))?; + let location_lng = + opt_required!(location_lng).map_err(|e| RadrootsListingEventError::MissingField(e))?; + + let location_geohash = tags + .iter() + .filter_map(|tag| { + if tag.get(0).map(String::as_str) == Some("g") { + tag.get(1).cloned() + } else { + None + } + }) + .max_by_key(|g| g.len()) + .ok_or_else(|| RadrootsListingEventError::MissingField("location_geohash".into()))?; + + let product_kind = + opt_required!(get("key", 0)).map_err(|e| RadrootsListingEventError::MissingField(e))?; + let product_category = opt_required!(get("category", 0)) + .map_err(|e| RadrootsListingEventError::MissingField(e))?; + let product_process = + opt_required!(get("process", 0)).map_err(|e| RadrootsListingEventError::MissingField(e))?; + let product_lot = + opt_required!(get("lot", 0)).map_err(|e| RadrootsListingEventError::MissingField(e))?; + let product_profile = opt_default!(get("profile", 0)); + let product_year = + opt_required!(get("year", 0)).map_err(|e| RadrootsListingEventError::MissingField(e))?; + let product_quantity_amt = opt_required!(get("quantity", 0)) + .map_err(|e| RadrootsListingEventError::MissingField(e))?; + let product_quantity_unit = opt_required!(get("quantity", 1)) + .map_err(|e| RadrootsListingEventError::MissingField(e))?; + let product_price_amt = + opt_required!(get("price", 0)).map_err(|e| RadrootsListingEventError::MissingField(e))?; + let product_price_cur = + opt_required!(get("price", 1)).map_err(|e| RadrootsListingEventError::MissingField(e))?; + let product_price_qty_amt = + opt_required!(get("price", 2)).map_err(|e| RadrootsListingEventError::MissingField(e))?; + let product_price_qty_unit = + opt_required!(get("price", 3)).map_err(|e| RadrootsListingEventError::MissingField(e))?; + + Ok(RadrootsListingEventData { + id, + author, + published_at, + d_tag, + title, + summary, + images, + location_address, + location_city, + location_region, + location_country, + location_lat, + location_lng, + location_geohash, + product_kind, + product_category, + product_process, + product_lot, + product_profile, + product_year, + product_quantity_amt, + product_quantity_unit, + product_price_amt, + product_price_cur, + product_price_qty_amt, + product_price_qty_unit, + }) +} + +pub trait ToRadrootsListingEvent { + fn to_radroots_listing_event(self) -> Result<RadrootsListingEvent, RadrootsListingEventError>; +} + +impl ToRadrootsListingEvent for RelayIndexerEvent { + fn to_radroots_listing_event(self) -> Result<RadrootsListingEvent, RadrootsListingEventError> { + let data = create_radroots_listing_event_data( + self.id.clone(), + self.author.clone(), + self.tags.clone(), + )?; + + Ok(RadrootsListingEvent { + event: RadrootsNostrEvent { + id: self.id, + author: self.author, + created_at: self.created_at, + kind: self.kind.as_u64().try_into().unwrap(), + content: self.content, + tags: self.tags, + sig: self.sig, + }, + data, + }) + } +} diff --git a/crates/indexer/src/domain/events/metadata.rs b/crates/indexer/src/domain/events/metadata.rs @@ -67,19 +67,17 @@ impl ToRadrootsMetadataEvent for RelayIndexerEvent { ) -> Result<RadrootsMetadataEvent, RadrootsMetadataEventError> { let data = create_radroots_metadata_event_data( self.id.clone(), - self.pubkey.clone(), + self.author.clone(), self.content.clone(), self.tags.clone(), )?; - let kind = self.kind.as_u64(); - Ok(RadrootsMetadataEvent { event: RadrootsNostrEvent { id: self.id, author: self.author, created_at: self.created_at, - kind: kind.try_into().unwrap(), + kind: self.kind.as_u64().try_into().unwrap(), content: self.content, tags: self.tags, sig: self.sig, diff --git a/crates/indexer/src/domain/events/mod.rs b/crates/indexer/src/domain/events/mod.rs @@ -1,5 +1,9 @@ +pub mod listing; pub mod metadata; +pub use listing::ToRadrootsListingEvent; +pub use metadata::ToRadrootsMetadataEvent; + #[macro_export] macro_rules! opt_required { ($opt:expr) => { @@ -7,6 +11,22 @@ macro_rules! opt_required { }; } +#[macro_export] +macro_rules! opt_default { + ($opt:expr) => { + match $opt { + Some(val) => val, + None => "".to_string(), + } + }; + ($opt:expr, $default:expr) => { + match $opt { + Some(val) => val, + None => $default.to_string(), + } + }; +} + pub trait RequiredField { type Output; fn required(self, field_name: &str) -> Result<Self::Output, String>; diff --git a/crates/indexer/src/domain/indexer/key.rs b/crates/indexer/src/domain/indexer/key.rs @@ -4,7 +4,7 @@ pub enum IndexerKey { Author, Nip05, Npub, - Geohash, + Country, } impl IndexerKey { @@ -14,7 +14,7 @@ impl IndexerKey { IndexerKey::Author => "author", IndexerKey::Nip05 => "nip05", IndexerKey::Npub => "npub", - IndexerKey::Geohash => "geohash", + IndexerKey::Country => "country", } } } @@ -25,3 +25,5 @@ pub const METADATA_INDEX_DIRECTORY: [IndexerKey; 4] = [ IndexerKey::Nip05, IndexerKey::Npub, ]; + +pub const LISTING_INDEX_DIRECTORY: [IndexerKey; 2] = [IndexerKey::Id, IndexerKey::Country]; diff --git a/crates/indexer/src/domain/indexer/kind.rs b/crates/indexer/src/domain/indexer/kind.rs @@ -1,28 +1,42 @@ +use indexer_utils::paths::{paths_join, PathsError}; use serde::de::Error as DeError; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt; +use std::path::PathBuf; use crate::domain::indexer::{IndexerKey, METADATA_INDEX_DIRECTORY}; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub enum IndexerEventKind { Metadata, + Listing, } impl IndexerEventKind { - pub const ALL: [IndexerEventKind; 1] = [IndexerEventKind::Metadata]; + pub const ALL: [IndexerEventKind; 2] = [IndexerEventKind::Metadata, IndexerEventKind::Listing]; pub const fn as_u64(self) -> u64 { match self { IndexerEventKind::Metadata => 0, + IndexerEventKind::Listing => 30402, } } pub const fn paths(self) -> &'static [IndexerKey] { match self { IndexerEventKind::Metadata => &METADATA_INDEX_DIRECTORY, + IndexerEventKind::Listing => &METADATA_INDEX_DIRECTORY, } } + + pub fn base_path<P: AsRef<std::path::Path>>(self, data_dir: P) -> Result<PathBuf, PathsError> { + paths_join(&[ + data_dir.as_ref().to_str().unwrap(), + "static", + "events", + &self.as_u64().to_string(), + ]) + } } impl fmt::Display for IndexerEventKind { @@ -61,6 +75,7 @@ impl TryFrom<u64> for IndexerEventKind { fn try_from(val: u64) -> Result<Self, Self::Error> { match val { 0 => Ok(IndexerEventKind::Metadata), + 30402 => Ok(IndexerEventKind::Listing), other => Err(IndexerEventKindParseError(other)), } } diff --git a/crates/indexer/src/domain/indexer/mod.rs b/crates/indexer/src/domain/indexer/mod.rs @@ -3,46 +3,3 @@ pub mod kind; pub mod models; pub use key::{IndexerKey, METADATA_INDEX_DIRECTORY}; - -/*pub fn create_index_dirs(settings: &Settings) -> Result<()> { - for kind in IndexerEventKind::ALL { - let kind_str = kind.as_u64().to_string(); - - for subdir in kind.paths() { - fs_mkdir(&[ - settings.service.output_dir.as_str(), - "events", - &kind_str, - subdir.as_str(), - ]) - .with_context(|| { - format!( - "Failed to create directory for kind {} / {}", - kind_str, - subdir.as_str() - ) - })?; - } - } - Ok(()) -} - -pub fn write_index_events( - settings: &Settings, - events_by_kind: &HashMap<IndexerEventKind, Vec<RelayIndexerEvent>>, -) -> Result<Vec<PathBuf>> { - let mut updated = Vec::new(); - - for &kind in &IndexerEventKind::ALL { - let events = events_by_kind.get(&kind).cloned().unwrap_or_default(); - match kind { - IndexerEventKind::Metadata => { - let idx = - Event0StaticIndexes::build(&events).context("building indexes for Metadata")?; - idx.write(settings, &mut updated)?; - } - } - } - - Ok(updated) -}*/ diff --git a/crates/indexer/src/domain/indexer/models/listing.rs b/crates/indexer/src/domain/indexer/models/listing.rs @@ -0,0 +1,244 @@ +use indexer_utils::{ + file::fs_mkdir, + logs::truncate_log, + write::{compute_hash, write_hash, write_json}, +}; +use radroots_common::models::{ + events::{RadrootsListingEvent, RadrootsListingEventData}, + indexer::{RadrootsListingIndexCountryManifest, RadrootsListingIndexShardMetadata}, +}; +use std::{collections::BTreeMap, fs, path::PathBuf}; +use tracing::{instrument, warn}; + +use crate::{ + domain::{ + events::ToRadrootsListingEvent, + indexer::{ + key::LISTING_INDEX_DIRECTORY, + kind::IndexerEventKind, + models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes}, + IndexerKey, + }, + }, + relay::event::RelayIndexerEvent, + Settings, +}; + +macro_rules! write_if_stale { + ($path:expr, $data:expr, $updated:expr) => {{ + let hash = compute_hash(&$data)?; + let hash_path = $path.with_extension("sha256.txt"); + + let needs_write = if $path.exists() && hash_path.exists() { + let stored = fs::read_to_string(&hash_path)?; + stored.trim() != hash + } else { + true + }; + + if needs_write { + write_json(&$path, &$data)?; + write_hash(&$path, &hash)?; + $updated.push($path.clone()); + } + + hash + }}; +} + +#[derive(Debug)] +pub struct EventListingIndexes { + events: Vec<RadrootsListingEvent>, + events_id: BTreeMap<String, RadrootsListingEvent>, + country_ids: BTreeMap<String, Vec<String>>, +} + +impl EventIndexes for EventListingIndexes { + type Event = RelayIndexerEvent; + + fn subdirs() -> &'static [IndexerKey] { + &LISTING_INDEX_DIRECTORY + } + + #[instrument(skip(raw_events), fields(event_count = raw_events.len()))] + fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> { + let mut events: Vec<RadrootsListingEvent> = Vec::with_capacity(raw_events.len()); + let mut events_id: BTreeMap<String, RadrootsListingEvent> = BTreeMap::new(); + let mut country_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); + + for raw in raw_events { + match raw.clone().to_radroots_listing_event() { + Ok(evt) => { + let id = evt.event.id.clone(); + let country_code = evt.data.location_country.to_lowercase(); + + events_id.insert(id.clone(), evt.clone()); + events.push(evt.clone()); + + country_ids.entry(country_code).or_default().push(id); + } + Err(err) => { + warn!( + kind = raw.kind.as_u64(), + id = %raw.id, + author = %raw.author, + content = %truncate_log(&raw.content, 1000), + tags = ?raw.tags, + error = %err, + "Skipping malformed listing event" + ); + } + } + } + + for (_cc, ids) in country_ids.iter_mut() { + ids.sort_unstable_by(|a, b| { + let pa = events_id + .get(a) + .map(|e| e.data.published_at) + .unwrap_or_default(); + let pb = events_id + .get(b) + .map(|e| e.data.published_at) + .unwrap_or_default(); + + pb.cmp(&pa).then(a.cmp(b)) + }); + } + + Ok(EventListingIndexes { + events, + events_id, + country_ids, + }) + } +} + +impl EventListingIndexes { + fn format_shard_filename(ix: usize) -> String { + format!("shards/{:06}.json", ix) + } + + fn shard_vec<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> { + if size == 0 { + return vec![items.to_vec()]; + } + let mut out = Vec::with_capacity((items.len() + size - 1) / size); + let mut i = 0; + while i < items.len() { + let end = (i + size).min(items.len()); + out.push(items[i..end].to_vec()); + i = end; + } + out + } +} + +impl WriteEventIndexes for EventListingIndexes { + fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> { + let base: PathBuf = IndexerEventKind::Listing.base_path(&settings.indexer.data_dir)?; + fs_mkdir(&[&base])?; + + { + let idxs_root = base.join("events.json"); + let ids: Vec<&String> = self.events.iter().map(|e| &e.event.id).collect(); + write_if_stale!(idxs_root, ids, updated); + } + + { + let sub = base.join("id"); + fs_mkdir(&[&sub])?; + let keys: Vec<String> = self.events_id.keys().cloned().collect(); + write_if_stale!(sub.join("indexes.json"), keys, updated); + + for (id, evt) in &self.events_id { + let dir = sub.join(id.to_lowercase()); + fs_mkdir(&[&dir])?; + write_if_stale!(dir.join("event.json"), evt.event.clone(), updated); + write_if_stale!(dir.join("data.json"), evt.data.clone(), updated); + } + } + + { + let sub_country = base.join(IndexerKey::Country.as_str()); + fs_mkdir(&[&sub_country])?; + let country_codes: Vec<String> = self.country_ids.keys().cloned().collect(); + write_if_stale!(sub_country.join("indexes.json"), country_codes, updated); + + for (cc, ids) in &self.country_ids { + let cc_dir = sub_country.join(cc); + let shards_dir = cc_dir.join("shards"); + fs_mkdir(&[&cc_dir])?; + fs_mkdir(&[&shards_dir])?; + + let mut data_items: Vec<RadrootsListingEventData> = Vec::with_capacity(ids.len()); + for id in ids { + if let Some(evt) = self.events_id.get(id) { + data_items.push(evt.data.clone()); + } + } + + let shard_size = settings.listings.country_shard_size; + + let shards = Self::shard_vec(&data_items, shard_size); + + let (country_first_pub, country_last_pub) = + if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { + (f.published_at, l.published_at) + } else { + (0, 0) + }; + + let mut manifest = RadrootsListingIndexCountryManifest { + country: cc.clone(), + total: u32::try_from(data_items.len()).expect("too many data items for u32"), + shard_size: u32::try_from(shard_size).expect("shard_size too large for u32"), + first_published_at: country_first_pub, + last_published_at: country_last_pub, + shards: Vec::with_capacity(shards.len()), + }; + + for (ix, chunk) in shards.into_iter().enumerate() { + let file_rel = Self::format_shard_filename(ix); + let file_abs = cc_dir.join(&file_rel); + if let Some(parent) = file_abs.parent() { + fs_mkdir(&[&parent])?; + } + + let sha = write_if_stale!(file_abs, chunk, updated); + + let (first_id, first_pub, last_id, last_pub) = if let (Some(f), Some(l)) = ( + data_items.get(ix * shard_size), + data_items.get(((ix + 1) * shard_size).saturating_sub(1)), + ) { + (f.id.clone(), f.published_at, l.id.clone(), l.published_at) + } else { + let fp = chunk + .first() + .map(|x| (x.id.clone(), x.published_at)) + .unwrap_or_default(); + let lp = chunk + .last() + .map(|x| (x.id.clone(), x.published_at)) + .unwrap_or_default(); + (fp.0, fp.1, lp.0, lp.1) + }; + + manifest.shards.push(RadrootsListingIndexShardMetadata { + file: file_rel, + count: u32::try_from(chunk.len()).expect("chunk length too large for u32"), + first_id, + last_id, + first_published_at: first_pub, + last_published_at: last_pub, + sha256: sha, + }); + } + + write_if_stale!(cc_dir.join("manifest.json"), manifest, updated); + } + } + + Ok(()) + } +} diff --git a/crates/indexer/src/domain/indexer/models/metadata.rs b/crates/indexer/src/domain/indexer/models/metadata.rs @@ -2,17 +2,15 @@ use indexer_utils::{ file::fs_mkdir, logs::truncate_log, nostr::public_key_to_npub, - paths::paths_join, write::{compute_hash, write_hash, write_json}, }; use radroots_common::models::events::RadrootsMetadataEvent; -use serde_json::Value; use std::{collections::BTreeMap, fs, path::PathBuf}; use tracing::{instrument, warn}; use crate::{ domain::{ - events::metadata::ToRadrootsMetadataEvent, + events::ToRadrootsMetadataEvent, indexer::{ kind::IndexerEventKind, models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes}, @@ -44,7 +42,7 @@ macro_rules! write_if_stale { } #[derive(Debug)] -pub struct Event0StaticIndexes { +pub struct EventMetadataIndexes { events: Vec<RadrootsMetadataEvent>, events_id: BTreeMap<String, RadrootsMetadataEvent>, events_author: BTreeMap<String, RadrootsMetadataEvent>, @@ -52,7 +50,7 @@ pub struct Event0StaticIndexes { events_npub: BTreeMap<String, RadrootsMetadataEvent>, } -impl EventIndexes for Event0StaticIndexes { +impl EventIndexes for EventMetadataIndexes { type Event = RelayIndexerEvent; fn subdirs() -> &'static [IndexerKey] { @@ -98,7 +96,7 @@ impl EventIndexes for Event0StaticIndexes { } } - Ok(Event0StaticIndexes { + Ok(EventMetadataIndexes { events, events_id, events_author, @@ -106,25 +104,11 @@ impl EventIndexes for Event0StaticIndexes { events_npub, }) } - - fn index_json(&self, subdir: IndexerKey) -> Option<Value> { - match subdir { - IndexerKey::Id => serde_json::to_value(&self.events_id).ok(), - IndexerKey::Author => serde_json::to_value(&self.events_author).ok(), - IndexerKey::Nip05 => serde_json::to_value(&self.events_nip05).ok(), - IndexerKey::Npub => serde_json::to_value(&self.events_npub).ok(), - _ => None, - } - } } -impl WriteEventIndexes for Event0StaticIndexes { +impl WriteEventIndexes for EventMetadataIndexes { fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> { - let base = paths_join(&[ - &settings.service.output_dir, - "events", - &IndexerEventKind::Metadata.as_u64().to_string(), - ])?; + let base: PathBuf = IndexerEventKind::Metadata.base_path(&settings.indexer.data_dir)?; fs_mkdir(&[&base])?; let idxs_root = base.join("events.json"); diff --git a/crates/indexer/src/domain/indexer/models/mod.rs b/crates/indexer/src/domain/indexer/models/mod.rs @@ -1,13 +1,14 @@ -pub use metadata::Event0StaticIndexes; +pub mod listing; +pub mod metadata; + +pub use listing::EventListingIndexes; +pub use metadata::EventMetadataIndexes; use crate::{config::Settings, domain::indexer::IndexerKey}; use anyhow::Result; -use serde_json::Value; use std::path::PathBuf; use thiserror::Error; -pub mod metadata; - #[derive(Debug, Error)] pub enum NostrEventsStaticError { #[error("Failed to build static indexes: {0}")] @@ -22,8 +23,6 @@ pub trait EventIndexes { fn build(events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> where Self: Sized; - - fn index_json(&self, subdir: IndexerKey) -> Option<Value>; } pub trait WriteEventIndexes { diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs @@ -23,7 +23,7 @@ pub mod relay { use crate::{ domain::indexer::{ kind::IndexerEventKind, - models::{Event0StaticIndexes, EventIndexes, WriteEventIndexes}, + models::{EventIndexes, EventListingIndexes, EventMetadataIndexes, WriteEventIndexes}, }, relay::event::RelayIndexerEvent, }; @@ -31,9 +31,10 @@ pub use config::Settings; pub use relay::record::RelayEventRecord; pub async fn run(settings: Settings) -> Result<()> { - let db_idx = IndexerDb::open(&format!("{}/indexer_db", settings.service.output_dir))?; + let db_idx = IndexerDb::open(&format!("{}/indexer_db", settings.indexer.data_dir))?; let tree_raw = "hashes"; let tree_events_metadata = "metadata_events"; + let tree_events_listing = "listing_events"; let tree_stats = "stats"; let last_created_at_db: u32 = db_idx @@ -109,7 +110,7 @@ pub async fn run(settings: Settings) -> Result<()> { let raw_metadata_events: Vec<RelayIndexerEvent> = db_idx.get_all(tree_events_metadata)?; - let indexed_metadata_events = Event0StaticIndexes::build(&raw_metadata_events)?; + let indexed_metadata_events = EventMetadataIndexes::build(&raw_metadata_events)?; let mut updated_indexes = Vec::new(); indexed_metadata_events.write(&settings, &mut updated_indexes)?; info!( @@ -120,8 +121,46 @@ pub async fn run(settings: Settings) -> Result<()> { } } + if let Some(listing_events) = records_kind.remove(&IndexerEventKind::Listing) { + if !listing_events.is_empty() { + for ev in &listing_events { + last_created_at = last_created_at.max(ev.created_at); + let id = &ev.id; + let hash = &ev.hash; + let skip = if let Some(old) = db_idx.get_raw(tree_raw, id)? { + old.as_ref() == hash.as_bytes() + } else { + false + }; + if skip { + continue; + } + db_idx.insert(tree_events_listing, id, ev)?; + db_idx.insert_raw(tree_raw, id, hash.as_bytes())?; + } + + db_idx.insert_raw( + tree_stats, + "last_created_at", + &last_created_at.to_be_bytes(), + )?; + db_idx.flush()?; + + let raw_listing_events: Vec<RelayIndexerEvent> = + db_idx.get_all(tree_events_listing)?; + let listing_indexes = EventListingIndexes::build(&raw_listing_events)?; + let mut updated_listing = Vec::new(); + listing_indexes.write(&settings, &mut updated_listing)?; + info!( + written = updated_listing.len(), + "Written {} listing index files", + updated_listing.len() + ); + } + } + let elapsed = iteration_start.elapsed(); - let interval = Duration::from_secs(settings.service.flush_interval); + let interval = Duration::from_secs(settings.indexer.flush_interval); let delay = interval.saturating_sub(elapsed); info!( elapsed_ms = elapsed.as_millis(), diff --git a/crates/indexer/src/main.rs b/crates/indexer/src/main.rs @@ -17,7 +17,7 @@ async fn setup() -> Result<()> { let settings = Settings::load(&args.config)?; - telemetry::init(&settings.service.logs_dir); + telemetry::init(&settings.indexer.logs_dir); info!("Service starting"); run(settings).await