listing.rs (26830B)
1 use anyhow::anyhow; 2 use crate::utils::io::{fs_mkdir, safe_path_segment, write_json_if_changed}; 3 use crate::utils::nostr::public_key_to_npub; 4 use crate::utils::strings::truncate_log; 5 use radroots_events::listing::{RadrootsListingEventIndex, RadrootsListingEventMetadata}; 6 use radroots_events_indexed::{RadrootsEventsIndexedManifest, RadrootsEventsIndexedShardMetadata}; 7 use std::{collections::BTreeMap, path::PathBuf}; 8 use tracing::{instrument, warn}; 9 10 use crate::{ 11 audit, 12 domain::{ 13 events::ToRadrootsListingEventIndex, 14 indexer::{ 15 key::LISTING_INDEX_DIRECTORY, 16 kind::IndexerEventKind, 17 models::{EventIndexes, NostrEventsStaticError, WriteEventIndexes}, 18 }, 19 resolvers::profile::ProfileResolver, 20 }, 21 relay::event::RelayIndexerEvent, 22 Settings, 23 }; 24 25 #[derive(Debug)] 26 pub struct EventListingIndexes { 27 events: Vec<RadrootsListingEventIndex>, 28 events_id: BTreeMap<String, usize>, 29 country_ids: BTreeMap<String, Vec<String>>, 30 author_ids: BTreeMap<String, Vec<String>>, 31 npub_ids: BTreeMap<String, Vec<String>>, 32 nip05_ids: BTreeMap<String, Vec<String>>, 33 } 34 35 impl EventListingIndexes { 36 pub fn build_with_profiles( 37 raw_events: &[RelayIndexerEvent], 38 profiles: &ProfileResolver, 39 ) -> Result<Self, NostrEventsStaticError> { 40 let mut events: Vec<RadrootsListingEventIndex> = Vec::with_capacity(raw_events.len()); 41 let mut events_id: BTreeMap<String, usize> = BTreeMap::new(); 42 let mut country_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); 43 let mut author_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); 44 let mut npub_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); 45 let mut nip05_ids: BTreeMap<String, Vec<String>> = BTreeMap::new(); 46 47 for raw in raw_events { 48 match raw.to_radroots_listing_event() { 49 Ok(evt) => { 50 audit::log_listing_event(&evt); 51 52 let id = evt.metadata.id.clone(); 53 let author_hex = evt.metadata.author.to_lowercase(); 54 55 let npub = public_key_to_npub(&author_hex) 56 .map(|mut s| { 57 s.make_ascii_lowercase(); 58 s 59 }) 60 .ok(); 61 let author_nip05 = profiles.nip05_for_author(&author_hex).map(str::to_owned); 62 63 let country_opt = evt 64 .metadata 65 .listing 66 .location 67 .as_ref() 68 .and_then(|loc| loc.country.as_ref()) 69 .map(|c| c.to_lowercase()); 70 71 events.push(evt); 72 let idx = events.len() - 1; 73 events_id.insert(id.clone(), idx); 74 75 if let Some(country) = country_opt { 76 country_ids.entry(country).or_default().push(id.clone()); 77 } 78 79 author_ids.entry(author_hex).or_default().push(id.clone()); 80 81 if let Some(n) = npub { 82 npub_ids.entry(n).or_default().push(id.clone()); 83 } 84 if let Some(n05) = author_nip05 { 85 nip05_ids.entry(n05).or_default().push(id.clone()); 86 } 87 } 88 Err(err) => { 89 warn!( 90 kind = raw.kind.as_u64(), 91 id = %raw.id, 92 author = %raw.author, 93 content = %truncate_log(&raw.content, 1000), 94 tags = ?raw.tags, 95 error = %err, 96 "Skipping malformed listing event" 97 ); 98 } 99 } 100 } 101 102 let sort_ids = |ids: &mut Vec<String>, 103 map: &BTreeMap<String, usize>, 104 events: &[RadrootsListingEventIndex]| { 105 ids.sort_unstable_by(|a, b| { 106 let pa = map 107 .get(a) 108 .map(|idx| events[*idx].metadata.published_at) 109 .unwrap_or_default(); 110 let pb = map 111 .get(b) 112 .map(|idx| events[*idx].metadata.published_at) 113 .unwrap_or_default(); 114 pb.cmp(&pa).then(a.cmp(b)) 115 }); 116 }; 117 118 for ids in country_ids.values_mut() { 119 sort_ids(ids, &events_id, &events); 120 } 121 for ids in author_ids.values_mut() { 122 sort_ids(ids, &events_id, &events); 123 } 124 for ids in npub_ids.values_mut() { 125 sort_ids(ids, &events_id, &events); 126 } 127 for ids in nip05_ids.values_mut() { 128 ids.sort_unstable_by(|a, b| { 129 let pa = events_id 130 .get(a) 131 .map(|idx| events[*idx].metadata.published_at) 132 .unwrap_or_default(); 133 let pb = events_id 134 .get(b) 135 .map(|idx| events[*idx].metadata.published_at) 136 .unwrap_or_default(); 137 pb.cmp(&pa).then(a.cmp(b)) 138 }); 139 } 140 141 Ok(EventListingIndexes { 142 events, 143 events_id, 144 country_ids, 145 author_ids, 146 npub_ids, 147 nip05_ids, 148 }) 149 } 150 } 151 152 impl EventIndexes for EventListingIndexes { 153 type Event = RelayIndexerEvent; 154 155 fn subdirs() -> &'static [crate::domain::indexer::key::IndexerKey] { 156 &LISTING_INDEX_DIRECTORY 157 } 158 159 #[instrument(skip(raw_events), fields(event_count = raw_events.len()))] 160 fn build(raw_events: &[Self::Event]) -> Result<Self, NostrEventsStaticError> { 161 let empty = ProfileResolver::default(); 162 Self::build_with_profiles(raw_events, &empty) 163 } 164 } 165 166 impl EventListingIndexes { 167 fn format_shard_filename(ix: usize) -> String { 168 format!("shards/{:06}.json", ix) 169 } 170 171 fn shard_vec<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> { 172 if items.is_empty() { 173 return Vec::new(); 174 } 175 if size == 0 { 176 return vec![items.to_vec()]; 177 } 178 let mut out = Vec::with_capacity((items.len() + size - 1) / size); 179 let mut i = 0; 180 while i < items.len() { 181 let end = (i + size).min(items.len()); 182 out.push(items[i..end].to_vec()); 183 i = end; 184 } 185 out 186 } 187 188 fn manifest_shard_size(configured: usize, len: usize) -> usize { 189 if configured == 0 { 190 len 191 } else { 192 configured 193 } 194 } 195 196 fn effective_shard_size(configured: usize, len: usize) -> usize { 197 let size = Self::manifest_shard_size(configured, len); 198 size.max(1) 199 } 200 201 fn usize_to_u32(value: usize, label: &str) -> anyhow::Result<u32> { 202 u32::try_from(value).map_err(|_| anyhow!("{label} too large for u32")) 203 } 204 } 205 206 impl WriteEventIndexes for EventListingIndexes { 207 fn write(&self, settings: &Settings, updated: &mut Vec<PathBuf>) -> anyhow::Result<()> { 208 let base: PathBuf = IndexerEventKind::Listing.base_path(&settings.indexer.data_dir)?; 209 fs_mkdir(&[&base])?; 210 211 { 212 let idxs_root = base.join("events.json"); 213 let ids = super::sorted_event_ids( 214 &self.events, 215 |event| event.metadata.published_at, 216 |event| &event.event.id, 217 ); 218 write_json_if_changed(&idxs_root, &ids, updated)?; 219 } 220 221 { 222 let sub = base.join("id"); 223 fs_mkdir(&[&sub])?; 224 let keys: Vec<String> = self 225 .events_id 226 .keys() 227 .filter_map(|key| safe_path_segment(&key.to_lowercase())) 228 .collect(); 229 write_json_if_changed(&sub.join("indexes.json"), &keys, updated)?; 230 231 for (id, idx) in &self.events_id { 232 let id_lower = id.to_lowercase(); 233 let Some(dir_key) = safe_path_segment(&id_lower) else { 234 warn!(id = %id, "Skipping unsafe listing id path segment"); 235 continue; 236 }; 237 let dir = sub.join(dir_key); 238 let evt = &self.events[*idx]; 239 fs_mkdir(&[&dir])?; 240 write_json_if_changed(&dir.join("event.json"), &evt.event, updated)?; 241 write_json_if_changed(&dir.join("data.json"), &evt.metadata, updated)?; 242 } 243 } 244 245 { 246 let sub_country = base.join(crate::domain::indexer::key::IndexerKey::Country.as_str()); 247 fs_mkdir(&[&sub_country])?; 248 let country_codes: Vec<String> = self 249 .country_ids 250 .keys() 251 .filter_map(|cc| safe_path_segment(cc)) 252 .collect(); 253 write_json_if_changed(&sub_country.join("indexes.json"), &country_codes, updated)?; 254 255 for (cc, ids) in &self.country_ids { 256 let Some(dir_key) = safe_path_segment(cc) else { 257 warn!(country = %cc, "Skipping unsafe country path segment"); 258 continue; 259 }; 260 let cc_dir = sub_country.join(dir_key); 261 let shards_dir = cc_dir.join("shards"); 262 fs_mkdir(&[&cc_dir])?; 263 fs_mkdir(&[&shards_dir])?; 264 265 let mut data_items: Vec<&RadrootsListingEventMetadata> = 266 Vec::with_capacity(ids.len()); 267 for id in ids { 268 if let Some(idx) = self.events_id.get(id) { 269 data_items.push(&self.events[*idx].metadata); 270 } 271 } 272 273 let shard_size = settings.listings.country_shard_size; 274 let manifest_shard_size = 275 Self::manifest_shard_size(shard_size, data_items.len()); 276 let effective_shard_size = 277 Self::effective_shard_size(shard_size, data_items.len()); 278 279 let shards = Self::shard_vec(&data_items, shard_size); 280 281 let (country_first_pub, country_last_pub) = 282 if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { 283 (f.published_at, l.published_at) 284 } else { 285 (0, 0) 286 }; 287 288 let mut manifest = RadrootsEventsIndexedManifest { 289 country: cc.clone(), 290 total: Self::usize_to_u32(data_items.len(), "data items")?, 291 shard_size: Self::usize_to_u32(manifest_shard_size, "shard_size")?, 292 first_published_at: country_first_pub, 293 last_published_at: country_last_pub, 294 shards: Vec::with_capacity(shards.len()), 295 }; 296 297 for (ix, chunk) in shards.into_iter().enumerate() { 298 let file_rel = Self::format_shard_filename(ix); 299 let file_abs = cc_dir.join(&file_rel); 300 if let Some(parent) = file_abs.parent() { 301 fs_mkdir(&[&parent])?; 302 } 303 304 let sha = write_json_if_changed(&file_abs, &chunk, updated)?; 305 306 let (first_id, first_pub, last_id, last_pub) = if let (Some(f), Some(l)) = ( 307 data_items.get(ix * effective_shard_size), 308 data_items 309 .get(((ix + 1) * effective_shard_size).saturating_sub(1)), 310 ) { 311 (f.id.clone(), f.published_at, l.id.clone(), l.published_at) 312 } else { 313 let fp = chunk 314 .first() 315 .map(|x| (x.id.clone(), x.published_at)) 316 .unwrap_or_default(); 317 let lp = chunk 318 .last() 319 .map(|x| (x.id.clone(), x.published_at)) 320 .unwrap_or_default(); 321 (fp.0, fp.1, lp.0, lp.1) 322 }; 323 324 manifest.shards.push(RadrootsEventsIndexedShardMetadata { 325 file: file_rel, 326 count: Self::usize_to_u32(chunk.len(), "chunk length")?, 327 first_id, 328 last_id, 329 first_published_at: first_pub, 330 last_published_at: last_pub, 331 sha256: sha, 332 }); 333 } 334 335 write_json_if_changed(&cc_dir.join("manifest.json"), &manifest, updated)?; 336 } 337 } 338 339 { 340 let sub_author = base.join(crate::domain::indexer::key::IndexerKey::Author.as_str()); 341 fs_mkdir(&[&sub_author])?; 342 let authors: Vec<String> = self 343 .author_ids 344 .keys() 345 .filter_map(|author| safe_path_segment(author)) 346 .collect(); 347 write_json_if_changed(&sub_author.join("indexes.json"), &authors, updated)?; 348 349 for (author, ids) in &self.author_ids { 350 let Some(dir_key) = safe_path_segment(author) else { 351 warn!(author = %author, "Skipping unsafe author path segment"); 352 continue; 353 }; 354 let dir = sub_author.join(dir_key); 355 let shards_dir = dir.join("shards"); 356 fs_mkdir(&[&dir, &shards_dir])?; 357 358 let mut data_items: Vec<&RadrootsListingEventMetadata> = 359 Vec::with_capacity(ids.len()); 360 for id in ids { 361 if let Some(idx) = self.events_id.get(id) { 362 data_items.push(&self.events[*idx].metadata); 363 } 364 } 365 366 let shard_size = settings.listings.profile_shard_size; 367 let manifest_shard_size = 368 Self::manifest_shard_size(shard_size, data_items.len()); 369 let effective_shard_size = 370 Self::effective_shard_size(shard_size, data_items.len()); 371 let shards = Self::shard_vec(&data_items, shard_size); 372 373 let (first_pub, last_pub) = 374 if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { 375 (f.published_at, l.published_at) 376 } else { 377 (0, 0) 378 }; 379 380 let mut manifest = RadrootsEventsIndexedManifest { 381 country: author.clone(), 382 total: Self::usize_to_u32(data_items.len(), "data items")?, 383 shard_size: Self::usize_to_u32(manifest_shard_size, "shard_size")?, 384 first_published_at: first_pub, 385 last_published_at: last_pub, 386 shards: Vec::with_capacity(shards.len()), 387 }; 388 389 for (ix, chunk) in shards.into_iter().enumerate() { 390 let file_rel = Self::format_shard_filename(ix); 391 let file_abs = dir.join(&file_rel); 392 if let Some(parent) = file_abs.parent() { 393 fs_mkdir(&[&parent])?; 394 } 395 396 let sha = write_json_if_changed(&file_abs, &chunk, updated)?; 397 398 let (first_id, first_published_at, last_id, last_published_at) = 399 if let (Some(f), Some(l)) = ( 400 data_items.get(ix * effective_shard_size), 401 data_items 402 .get(((ix + 1) * effective_shard_size).saturating_sub(1)), 403 ) { 404 (f.id.clone(), f.published_at, l.id.clone(), l.published_at) 405 } else { 406 let fp = data_items 407 .get(ix * effective_shard_size) 408 .map(|x| (x.id.clone(), x.published_at)) 409 .or_else(|| chunk.first().map(|x| (x.id.clone(), x.published_at))) 410 .unwrap_or_default(); 411 412 let lp = data_items 413 .get(((ix + 1) * effective_shard_size).saturating_sub(1)) 414 .map(|x| (x.id.clone(), x.published_at)) 415 .or_else(|| chunk.last().map(|x| (x.id.clone(), x.published_at))) 416 .unwrap_or_default(); 417 418 (fp.0, fp.1, lp.0, lp.1) 419 }; 420 421 manifest.shards.push(RadrootsEventsIndexedShardMetadata { 422 file: file_rel, 423 count: Self::usize_to_u32(chunk.len(), "chunk length")?, 424 first_id, 425 last_id, 426 first_published_at, 427 last_published_at, 428 sha256: sha, 429 }); 430 } 431 432 write_json_if_changed(&dir.join("manifest.json"), &manifest, updated)?; 433 } 434 } 435 436 { 437 let sub_npub = base.join(crate::domain::indexer::key::IndexerKey::Npub.as_str()); 438 fs_mkdir(&[&sub_npub])?; 439 let npubs: Vec<String> = self 440 .npub_ids 441 .keys() 442 .filter_map(|npub| safe_path_segment(npub)) 443 .collect(); 444 write_json_if_changed(&sub_npub.join("indexes.json"), &npubs, updated)?; 445 446 for (npub, ids) in &self.npub_ids { 447 let Some(dir_key) = safe_path_segment(npub) else { 448 warn!(npub = %npub, "Skipping unsafe npub path segment"); 449 continue; 450 }; 451 let dir = sub_npub.join(dir_key); 452 let shards_dir = dir.join("shards"); 453 fs_mkdir(&[&dir, &shards_dir])?; 454 455 let mut data_items: Vec<&RadrootsListingEventMetadata> = 456 Vec::with_capacity(ids.len()); 457 for id in ids { 458 if let Some(idx) = self.events_id.get(id) { 459 data_items.push(&self.events[*idx].metadata); 460 } 461 } 462 463 let shard_size = settings.listings.profile_shard_size; 464 let manifest_shard_size = 465 Self::manifest_shard_size(shard_size, data_items.len()); 466 let effective_shard_size = 467 Self::effective_shard_size(shard_size, data_items.len()); 468 let shards = Self::shard_vec(&data_items, shard_size); 469 470 let (first_pub, last_pub) = 471 if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { 472 (f.published_at, l.published_at) 473 } else { 474 (0, 0) 475 }; 476 477 let mut manifest = RadrootsEventsIndexedManifest { 478 country: npub.clone(), 479 total: Self::usize_to_u32(data_items.len(), "data items")?, 480 shard_size: Self::usize_to_u32(manifest_shard_size, "shard_size")?, 481 first_published_at: first_pub, 482 last_published_at: last_pub, 483 shards: Vec::with_capacity(shards.len()), 484 }; 485 486 for (ix, chunk) in shards.into_iter().enumerate() { 487 let file_rel = Self::format_shard_filename(ix); 488 let file_abs = dir.join(&file_rel); 489 if let Some(parent) = file_abs.parent() { 490 fs_mkdir(&[&parent])?; 491 } 492 493 let sha = write_json_if_changed(&file_abs, &chunk, updated)?; 494 495 let (first_id, first_published_at, last_id, last_published_at) = 496 if let (Some(f), Some(l)) = ( 497 data_items.get(ix * effective_shard_size), 498 data_items 499 .get(((ix + 1) * effective_shard_size).saturating_sub(1)), 500 ) { 501 (f.id.clone(), f.published_at, l.id.clone(), l.published_at) 502 } else { 503 let fp = data_items 504 .get(ix * effective_shard_size) 505 .map(|x| (x.id.clone(), x.published_at)) 506 .or_else(|| chunk.first().map(|x| (x.id.clone(), x.published_at))) 507 .unwrap_or_default(); 508 509 let lp = data_items 510 .get(((ix + 1) * effective_shard_size).saturating_sub(1)) 511 .map(|x| (x.id.clone(), x.published_at)) 512 .or_else(|| chunk.last().map(|x| (x.id.clone(), x.published_at))) 513 .unwrap_or_default(); 514 515 (fp.0, fp.1, lp.0, lp.1) 516 }; 517 518 manifest.shards.push(RadrootsEventsIndexedShardMetadata { 519 file: file_rel, 520 count: Self::usize_to_u32(chunk.len(), "chunk length")?, 521 first_id, 522 last_id, 523 first_published_at, 524 last_published_at, 525 sha256: sha, 526 }); 527 } 528 529 write_json_if_changed(&dir.join("manifest.json"), &manifest, updated)?; 530 } 531 532 { 533 let sub_nip05 = base.join(crate::domain::indexer::key::IndexerKey::Nip05.as_str()); 534 fs_mkdir(&[&sub_nip05])?; 535 let names: Vec<String> = self 536 .nip05_ids 537 .keys() 538 .filter_map(|name| safe_path_segment(name)) 539 .collect(); 540 write_json_if_changed(&sub_nip05.join("indexes.json"), &names, updated)?; 541 542 for (name, ids) in &self.nip05_ids { 543 let Some(dir_key) = safe_path_segment(name) else { 544 warn!(nip05 = %name, "Skipping unsafe nip05 path segment"); 545 continue; 546 }; 547 let dir = sub_nip05.join(dir_key); 548 let shards_dir = dir.join("shards"); 549 fs_mkdir(&[&dir, &shards_dir])?; 550 551 let mut data_items: Vec<&RadrootsListingEventMetadata> = 552 Vec::with_capacity(ids.len()); 553 for id in ids { 554 if let Some(idx) = self.events_id.get(id) { 555 data_items.push(&self.events[*idx].metadata); 556 } 557 } 558 559 let shard_size = settings.listings.profile_shard_size; 560 let manifest_shard_size = 561 Self::manifest_shard_size(shard_size, data_items.len()); 562 let effective_shard_size = 563 Self::effective_shard_size(shard_size, data_items.len()); 564 let shards = Self::shard_vec(&data_items, shard_size); 565 566 let (first_pub, last_pub) = 567 if let (Some(f), Some(l)) = (data_items.first(), data_items.last()) { 568 (f.published_at, l.published_at) 569 } else { 570 (0, 0) 571 }; 572 573 let mut manifest = RadrootsEventsIndexedManifest { 574 country: name.clone(), 575 total: Self::usize_to_u32(data_items.len(), "data items")?, 576 shard_size: Self::usize_to_u32(manifest_shard_size, "shard_size")?, 577 first_published_at: first_pub, 578 last_published_at: last_pub, 579 shards: Vec::with_capacity(shards.len()), 580 }; 581 582 for (ix, chunk) in shards.into_iter().enumerate() { 583 let file_rel = Self::format_shard_filename(ix); 584 let file_abs = dir.join(&file_rel); 585 if let Some(parent) = file_abs.parent() { 586 fs_mkdir(&[&parent])?; 587 } 588 589 let sha = write_json_if_changed(&file_abs, &chunk, updated)?; 590 591 let (first_id, first_pub, last_id, last_pub) = if let (Some(f), Some(l)) = ( 592 data_items.get(ix * effective_shard_size), 593 data_items 594 .get(((ix + 1) * effective_shard_size).saturating_sub(1)), 595 ) { 596 (f.id.clone(), f.published_at, l.id.clone(), l.published_at) 597 } else { 598 let fp = chunk 599 .first() 600 .map(|x| (x.id.clone(), x.published_at)) 601 .unwrap_or_default(); 602 let lp = chunk 603 .last() 604 .map(|x| (x.id.clone(), x.published_at)) 605 .unwrap_or_default(); 606 (fp.0, fp.1, lp.0, lp.1) 607 }; 608 609 manifest.shards.push(RadrootsEventsIndexedShardMetadata { 610 file: file_rel, 611 count: Self::usize_to_u32(chunk.len(), "chunk length")?, 612 first_id, 613 last_id, 614 first_published_at: first_pub, 615 last_published_at: last_pub, 616 sha256: sha, 617 }); 618 } 619 620 write_json_if_changed(&dir.join("manifest.json"), &manifest, updated)?; 621 } 622 } 623 } 624 625 Ok(()) 626 } 627 } 628 629 #[cfg(test)] 630 mod tests { 631 use super::EventListingIndexes; 632 633 #[test] 634 fn shard_vec_empty_returns_empty() { 635 let items: Vec<u32> = Vec::new(); 636 let shards = EventListingIndexes::shard_vec(&items, 0); 637 assert!(shards.is_empty()); 638 } 639 640 #[test] 641 fn shard_vec_zero_size_groups_all() { 642 let items = vec![1u32, 2, 3]; 643 let shards = EventListingIndexes::shard_vec(&items, 0); 644 assert_eq!(shards.len(), 1); 645 assert_eq!(shards[0], items); 646 } 647 }