tangle_indexer


git clone https://radroots.dev/git/tangle_indexer.git
Log | Files | Refs | Submodules | LICENSE

runner.rs (22582B)


      1 #![forbid(unsafe_code)]
      2 
      3 use anyhow::{Context, Result};
      4 use rusqlite::params;
      5 use std::{
      6     collections::HashMap,
      7     time::{Duration, Instant},
      8 };
      9 use tracing::{info, warn};
     10 
     11 use crate::{
     12     audit,
     13     domain::{
     14         indexer::{
     15             kind::IndexerEventKind,
     16             models::{
     17                 EventCommentIndexes, EventFollowIndexes, EventIndexes, EventJobFeedbackIndexes,
     18                 EventJobRequestIndexes, EventJobResultIndexes, EventListingIndexes,
     19                 EventPostIndexes, EventProfileIndexes, EventReactionIndexes, WriteEventIndexes,
     20             },
     21         },
     22         resolvers::profile::ProfileResolver,
     23     },
     24     relay::{event::RelayIndexerEvent, record::RelayEventRecord},
     25     utils::{
     26         db::IndexerDb,
     27         sqlite::{sqlite_conn, sqlite_stmt},
     28     },
     29     Settings,
     30 };
     31 use radroots_events::kinds::{KIND_JOB_REQUEST_MIN, KIND_JOB_RESULT_MIN};
     32 
/// `IndexerDb` tree mapping an event id to the hash last stored for that event; `insert_event`
/// consults it to skip events whose hash has not changed.
const TREE_RAW: &str = "hashes";
/// `IndexerDb` tree holding stored profile events.
const TREE_EVENTS_PROFILE: &str = "profile_events";
/// `IndexerDb` tree holding stored post events.
const TREE_EVENTS_POST: &str = "post_events";
/// `IndexerDb` tree holding stored follow events.
const TREE_EVENTS_FOLLOW: &str = "follow_events";
/// `IndexerDb` tree holding stored listing events.
const TREE_EVENTS_LISTING: &str = "listing_events";
/// `IndexerDb` tree holding stored reaction events.
const TREE_EVENTS_REACTION: &str = "reaction_events";
/// `IndexerDb` tree holding stored comment events.
const TREE_EVENTS_COMMENT: &str = "comment_events";
/// `IndexerDb` tree holding stored job request events.
const TREE_EVENTS_JOB_REQUEST: &str = "job_request_events";
/// `IndexerDb` tree holding stored job result events.
const TREE_EVENTS_JOB_RESULT: &str = "job_result_events";
/// `IndexerDb` tree holding stored job feedback events.
const TREE_EVENTS_JOB_FEEDBACK: &str = "job_feedback_events";
/// `IndexerDb` tree holding the persisted cursor keys (`last_created_at`, `last_event_hash`,
/// `last_rowid`).
const TREE_STATS: &str = "stats";
     44 
     45 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
     46 enum CursorMode {
     47     RowId,
     48     CreatedAt,
     49 }
     50 
     51 #[derive(Debug, Default)]
     52 struct CursorState {
     53     last_created_at: u32,
     54     last_event_hash: String,
     55     last_rowid: u64,
     56 }
     57 
     58 impl CursorState {
     59     fn load(db_idx: &IndexerDb) -> Result<Self> {
     60         let last_created_at = db_idx
     61             .get_raw(TREE_STATS, "last_created_at")?
     62             .and_then(|ivec| parse_u32(ivec.as_ref(), "last_created_at"))
     63             .unwrap_or(0);
     64         let last_event_hash = db_idx
     65             .get_raw(TREE_STATS, "last_event_hash")?
     66             .and_then(|ivec| parse_string(ivec.as_ref(), "last_event_hash"))
     67             .unwrap_or_default();
     68         let last_rowid = db_idx
     69             .get_raw(TREE_STATS, "last_rowid")?
     70             .and_then(|ivec| parse_u64(ivec.as_ref(), "last_rowid"))
     71             .unwrap_or(0);
     72 
     73         Ok(Self {
     74             last_created_at,
     75             last_event_hash,
     76             last_rowid,
     77         })
     78     }
     79 }
     80 
     81 fn parse_u32(raw: &[u8], label: &str) -> Option<u32> {
     82     if raw.len() != 4 {
     83         warn!(len = raw.len(), label, "Ignoring invalid cursor value");
     84         return None;
     85     }
     86     let arr: [u8; 4] = raw.try_into().ok()?;
     87     Some(u32::from_be_bytes(arr))
     88 }
     89 
     90 fn parse_u64(raw: &[u8], label: &str) -> Option<u64> {
     91     if raw.len() != 8 {
     92         warn!(len = raw.len(), label, "Ignoring invalid cursor value");
     93         return None;
     94     }
     95     let arr: [u8; 8] = raw.try_into().ok()?;
     96     Some(u64::from_be_bytes(arr))
     97 }
     98 
     99 fn parse_string(raw: &[u8], label: &str) -> Option<String> {
    100     match std::str::from_utf8(raw) {
    101         Ok(value) => Some(value.to_string()),
    102         Err(err) => {
    103             warn!(error = %err, label, "Ignoring invalid cursor value");
    104             None
    105         }
    106     }
    107 }
    108 
    109 struct EventBatch {
    110     events_by_kind: HashMap<IndexerEventKind, Vec<RelayIndexerEvent>>,
    111     next_created: Option<(u32, String)>,
    112     next_rowid: Option<u64>,
    113     record_count: usize,
    114 }
    115 
    116 impl EventBatch {
    117     fn from_records(records: Vec<RelayEventRecord>) -> Result<Self> {
    118         let record_count = records.len();
    119         let next_created = records
    120             .last()
    121             .map(|rec| (rec.created_at, rec.event_hash.clone()));
    122         let next_rowid = records.last().and_then(|rec| rec.rowid);
    123         let mut events_by_kind: HashMap<IndexerEventKind, Vec<RelayIndexerEvent>> =
    124             HashMap::with_capacity(IndexerEventKind::GROUPS.len());
    125 
    126         for rec in records {
    127             let iev = RelayIndexerEvent::try_from(rec)?;
    128             audit::log_indexer_event(&iev);
    129             events_by_kind.entry(iev.kind.group()).or_default().push(iev);
    130         }
    131 
    132         Ok(Self {
    133             events_by_kind,
    134             next_created,
    135             next_rowid,
    136             record_count,
    137         })
    138     }
    139 }
    140 
    141 struct RelayQueries {
    142     created: String,
    143     rowid: String,
    144 }
    145 
    146 fn build_relay_queries() -> RelayQueries {
    147     let relay_kind_filter = IndexerEventKind::relay_kind_filter_sql();
    148     let created = format!(
    149         "SELECT hex(event_hash), hex(author), created_at, kind, content FROM event WHERE ({}) AND (created_at > ? OR (created_at = ? AND hex(event_hash) > ?)) ORDER BY created_at ASC, hex(event_hash) ASC",
    150         relay_kind_filter
    151     );
    152     let rowid = format!(
    153         "SELECT rowid, hex(event_hash), hex(author), created_at, kind, content FROM event WHERE ({}) AND rowid > ? ORDER BY rowid ASC",
    154         relay_kind_filter
    155     );
    156     RelayQueries { created, rowid }
    157 }
    158 
    159 fn resolve_cursor_mode(relay_db: &rusqlite::Connection, rowid_query: &str) -> CursorMode {
    160     match sqlite_stmt(relay_db, rowid_query) {
    161         Ok(_) => CursorMode::RowId,
    162         Err(err) => {
    163             warn!(
    164                 error = %err,
    165                 "Rowid cursor unavailable, falling back to created_at cursor"
    166             );
    167             CursorMode::CreatedAt
    168         }
    169     }
    170 }
    171 
    172 fn load_records(
    173     relay_db: &rusqlite::Connection,
    174     mode: CursorMode,
    175     queries: &RelayQueries,
    176     cursor: &CursorState,
    177 ) -> Result<Vec<RelayEventRecord>> {
    178     match mode {
    179         CursorMode::RowId => {
    180             let mut stmt = sqlite_stmt(relay_db, &queries.rowid)
    181                 .context("Could not prepare rowid event query")?;
    182             let rows = stmt.query_map(
    183                 params![cursor.last_rowid],
    184                 RelayEventRecord::from_row_with_rowid,
    185             )?;
    186             rows.collect::<Result<Vec<_>, _>>()
    187                 .context("collecting RelayEventRecord rows")
    188         }
    189         CursorMode::CreatedAt => {
    190             let mut stmt = sqlite_stmt(relay_db, &queries.created)
    191                 .context("Could not prepare created_at event query")?;
    192             let rows = stmt.query_map(
    193                 params![
    194                     cursor.last_created_at,
    195                     cursor.last_created_at,
    196                     &cursor.last_event_hash
    197                 ],
    198                 RelayEventRecord::from_row,
    199             )?;
    200             rows.collect::<Result<Vec<_>, _>>()
    201                 .context("collecting RelayEventRecord rows")
    202         }
    203     }
    204 }
    205 
    206 fn update_cursor(
    207     db_idx: &IndexerDb,
    208     cursor: &mut CursorState,
    209     mode: CursorMode,
    210     batch: &mut EventBatch,
    211 ) -> Result<bool> {
    212     let mut cursor_updated = false;
    213     match mode {
    214         CursorMode::CreatedAt => {
    215             if let Some((created_at, event_hash)) = batch.next_created.take() {
    216                 cursor.last_created_at = created_at;
    217                 cursor.last_event_hash = event_hash;
    218                 db_idx.insert_raw(
    219                     TREE_STATS,
    220                     "last_created_at",
    221                     &cursor.last_created_at.to_be_bytes(),
    222                 )?;
    223                 db_idx.insert_raw(
    224                     TREE_STATS,
    225                     "last_event_hash",
    226                     cursor.last_event_hash.as_bytes(),
    227                 )?;
    228                 cursor_updated = true;
    229             }
    230         }
    231         CursorMode::RowId => {
    232             if let Some(rowid) = batch.next_rowid.take() {
    233                 cursor.last_rowid = rowid;
    234                 db_idx.insert_raw(TREE_STATS, "last_rowid", &cursor.last_rowid.to_be_bytes())?;
    235                 cursor_updated = true;
    236             }
    237         }
    238     }
    239     Ok(cursor_updated)
    240 }
    241 
    242 #[derive(Default)]
    243 struct ChangeFlags {
    244     profiles: bool,
    245     posts: bool,
    246     follows: bool,
    247     listings: bool,
    248     reactions: bool,
    249     comments: bool,
    250     job_requests: bool,
    251     job_results: bool,
    252     job_feedback: bool,
    253 }
    254 
    255 impl ChangeFlags {
    256     fn needs_profiles(&self) -> bool {
    257         self.profiles
    258             || self.listings
    259             || self.reactions
    260             || self.comments
    261             || self.posts
    262             || self.follows
    263             || self.job_requests
    264             || self.job_results
    265             || self.job_feedback
    266     }
    267 }
    268 
    269 fn insert_event(
    270     db_idx: &IndexerDb,
    271     tree: &str,
    272     raw_tree: &str,
    273     ev: &RelayIndexerEvent,
    274 ) -> Result<bool> {
    275     let id = &ev.id;
    276     let hash = &ev.hash;
    277     let skip = if let Some(old) = db_idx.get_raw(raw_tree, id)? {
    278         old.as_ref() == hash.as_bytes()
    279     } else {
    280         false
    281     };
    282     if skip {
    283         return Ok(false);
    284     }
    285     db_idx.insert(tree, id, ev)?;
    286     db_idx.insert_raw(raw_tree, id, hash.as_bytes())?;
    287     Ok(true)
    288 }
    289 
    290 fn insert_events(
    291     db_idx: &IndexerDb,
    292     tree: &str,
    293     raw_tree: &str,
    294     events: &[RelayIndexerEvent],
    295 ) -> Result<bool> {
    296     let mut any_new = false;
    297     for ev in events {
    298         if insert_event(db_idx, tree, raw_tree, ev)? {
    299             any_new = true;
    300         }
    301     }
    302     Ok(any_new)
    303 }
    304 
    305 fn write_indexes<T: WriteEventIndexes>(
    306     settings: &Settings,
    307     label: Option<&str>,
    308     indexes: T,
    309 ) -> Result<()> {
    310     let mut updated = Vec::new();
    311     indexes.write(settings, &mut updated)?;
    312     match label {
    313         Some(label) => info!(
    314             written = updated.len(),
    315             "Written {} {} index files",
    316             updated.len(),
    317             label
    318         ),
    319         None => info!(written = updated.len(), "Written {} index files", updated.len()),
    320     }
    321     Ok(())
    322 }
    323 
    324 pub async fn run(settings: Settings) -> Result<()> {
    325     let db_idx = IndexerDb::open(&format!("{}/indexer_db", settings.indexer.data_dir))?;
    326     let mut cursor = CursorState::load(&db_idx)?;
    327     let relay_queries = build_relay_queries();
    328 
    329     let mut profiles = ProfileResolver::default();
    330     let mut profiles_loaded = false;
    331     let mut cursor_mode: Option<CursorMode> = None;
    332 
    333     loop {
    334         let iteration_start = Instant::now();
    335         let relay_db = sqlite_conn(&settings.relay.database_path).with_context(|| {
    336             format!(
    337                 "Could not open relay DB at {}",
    338                 settings.relay.database_path
    339             )
    340         })?;
    341         if cursor_mode.is_none() {
    342             cursor_mode = Some(resolve_cursor_mode(&relay_db, &relay_queries.rowid));
    343         }
    344 
    345         let mode = cursor_mode.unwrap_or(CursorMode::CreatedAt);
    346         let records = load_records(&relay_db, mode, &relay_queries, &cursor)?;
    347 
    348         let mut batch = EventBatch::from_records(records)?;
    349         info!(record_count = batch.record_count, "Loaded relay records");
    350 
    351         let mut changes = ChangeFlags::default();
    352         let mut raw_listing_events: Option<Vec<RelayIndexerEvent>> = None;
    353 
    354         if let Some(profile_events) = batch.events_by_kind.remove(&IndexerEventKind::Profile) {
    355             if insert_events(&db_idx, TREE_EVENTS_PROFILE, TREE_RAW, &profile_events)? {
    356                 let raw_profile_events: Vec<RelayIndexerEvent> =
    357                     db_idx.get_all(TREE_EVENTS_PROFILE)?;
    358                 let indexed_profile_events = EventProfileIndexes::build(&raw_profile_events)?;
    359                 write_indexes(&settings, None, indexed_profile_events)?;
    360 
    361                 profiles = ProfileResolver::from_metadata(&raw_profile_events);
    362                 profiles_loaded = true;
    363                 audit::set_profile_resolver(profiles.clone());
    364                 changes.profiles = true;
    365             }
    366         }
    367 
    368         if let Some(post_events) = batch.events_by_kind.remove(&IndexerEventKind::Post) {
    369             changes.posts = insert_events(&db_idx, TREE_EVENTS_POST, TREE_RAW, &post_events)?;
    370         }
    371 
    372         if let Some(follow_events) = batch.events_by_kind.remove(&IndexerEventKind::Follow) {
    373             changes.follows = insert_events(&db_idx, TREE_EVENTS_FOLLOW, TREE_RAW, &follow_events)?;
    374         }
    375 
    376         if let Some(listing_events) = batch.events_by_kind.remove(&IndexerEventKind::Listing) {
    377             if insert_events(&db_idx, TREE_EVENTS_LISTING, TREE_RAW, &listing_events)? {
    378                 raw_listing_events = Some(db_idx.get_all(TREE_EVENTS_LISTING)?);
    379                 changes.listings = true;
    380             }
    381         }
    382 
    383         if let Some(reaction_events) = batch.events_by_kind.remove(&IndexerEventKind::Reaction) {
    384             changes.reactions =
    385                 insert_events(&db_idx, TREE_EVENTS_REACTION, TREE_RAW, &reaction_events)?;
    386         }
    387 
    388         if let Some(comment_events) = batch.events_by_kind.remove(&IndexerEventKind::Comment) {
    389             changes.comments =
    390                 insert_events(&db_idx, TREE_EVENTS_COMMENT, TREE_RAW, &comment_events)?;
    391         }
    392 
    393         if let Some(job_request_events) =
    394             batch.events_by_kind.remove(&IndexerEventKind::JobRequest(KIND_JOB_REQUEST_MIN))
    395         {
    396             changes.job_requests = insert_events(
    397                 &db_idx,
    398                 TREE_EVENTS_JOB_REQUEST,
    399                 TREE_RAW,
    400                 &job_request_events,
    401             )?;
    402         }
    403 
    404         if let Some(job_result_events) =
    405             batch.events_by_kind.remove(&IndexerEventKind::JobResult(KIND_JOB_RESULT_MIN))
    406         {
    407             changes.job_results = insert_events(
    408                 &db_idx,
    409                 TREE_EVENTS_JOB_RESULT,
    410                 TREE_RAW,
    411                 &job_result_events,
    412             )?;
    413         }
    414 
    415         if let Some(job_feedback_events) =
    416             batch.events_by_kind.remove(&IndexerEventKind::JobFeedback)
    417         {
    418             changes.job_feedback = insert_events(
    419                 &db_idx,
    420                 TREE_EVENTS_JOB_FEEDBACK,
    421                 TREE_RAW,
    422                 &job_feedback_events,
    423             )?;
    424         }
    425 
    426         if !batch.events_by_kind.is_empty() {
    427             let kinds: Vec<IndexerEventKind> =
    428                 batch.events_by_kind.keys().copied().collect();
    429             warn!(kinds = ?kinds, "Unhandled indexer event kinds");
    430         }
    431 
    432         if changes.needs_profiles() && !profiles_loaded {
    433             let raw_profile_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_PROFILE)?;
    434             profiles = ProfileResolver::from_metadata(&raw_profile_events);
    435             profiles_loaded = true;
    436             audit::set_profile_resolver(profiles.clone());
    437         }
    438 
    439         if changes.reactions {
    440             let raw_reaction_events: Vec<RelayIndexerEvent> =
    441                 db_idx.get_all(TREE_EVENTS_REACTION)?;
    442             let reaction_indexes =
    443                 EventReactionIndexes::build_with_profiles(&raw_reaction_events, &profiles)?;
    444             write_indexes(&settings, Some("reaction"), reaction_indexes)?;
    445         }
    446 
    447         if changes.comments {
    448             let raw_comment_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_COMMENT)?;
    449             let comment_indexes =
    450                 EventCommentIndexes::build_with_profiles(&raw_comment_events, &profiles)?;
    451             write_indexes(&settings, Some("comment"), comment_indexes)?;
    452         }
    453 
    454         if changes.posts {
    455             let raw_post_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_POST)?;
    456             let post_indexes = EventPostIndexes::build_with_profiles(&raw_post_events, &profiles)?;
    457             write_indexes(&settings, Some("post"), post_indexes)?;
    458         }
    459 
    460         if changes.follows {
    461             let raw_follow_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_FOLLOW)?;
    462             let follow_indexes =
    463                 EventFollowIndexes::build_with_profiles(&raw_follow_events, &profiles)?;
    464             write_indexes(&settings, Some("follow"), follow_indexes)?;
    465         }
    466 
    467         if changes.job_requests {
    468             let raw_job_request_events: Vec<RelayIndexerEvent> =
    469                 db_idx.get_all(TREE_EVENTS_JOB_REQUEST)?;
    470             let job_request_indexes =
    471                 EventJobRequestIndexes::build_with_profiles(&raw_job_request_events, &profiles)?;
    472             write_indexes(&settings, Some("job request"), job_request_indexes)?;
    473         }
    474 
    475         if changes.job_results {
    476             let raw_job_result_events: Vec<RelayIndexerEvent> =
    477                 db_idx.get_all(TREE_EVENTS_JOB_RESULT)?;
    478             let job_result_indexes =
    479                 EventJobResultIndexes::build_with_profiles(&raw_job_result_events, &profiles)?;
    480             write_indexes(&settings, Some("job result"), job_result_indexes)?;
    481         }
    482 
    483         if changes.job_feedback {
    484             let raw_job_feedback_events: Vec<RelayIndexerEvent> =
    485                 db_idx.get_all(TREE_EVENTS_JOB_FEEDBACK)?;
    486             let job_feedback_indexes =
    487                 EventJobFeedbackIndexes::build_with_profiles(&raw_job_feedback_events, &profiles)?;
    488             write_indexes(&settings, Some("job feedback"), job_feedback_indexes)?;
    489         }
    490 
    491         if changes.listings || changes.profiles {
    492             let raw_listing_events = match raw_listing_events.take() {
    493                 Some(events) => events,
    494                 None => db_idx.get_all(TREE_EVENTS_LISTING)?,
    495             };
    496             let listing_indexes =
    497                 EventListingIndexes::build_with_profiles(&raw_listing_events, &profiles)?;
    498             write_indexes(&settings, Some("listing"), listing_indexes)?;
    499         }
    500 
    501         let cursor_updated = update_cursor(&db_idx, &mut cursor, mode, &mut batch)?;
    502         if cursor_updated {
    503             db_idx.flush()?;
    504         }
    505 
    506         let elapsed = iteration_start.elapsed();
    507         let interval = Duration::from_secs(settings.indexer.flush_interval);
    508         let delay = interval.saturating_sub(elapsed);
    509         info!(
    510             elapsed_ms = elapsed.as_millis(),
    511             sleeping_ms = delay.as_millis(),
    512             "Iteration complete"
    513         );
    514         tokio::time::sleep(delay).await;
    515     }
    516 }
    517 
    518 #[cfg(test)]
    519 mod tests {
    520     use super::{
    521         build_relay_queries, parse_string, parse_u32, parse_u64, resolve_cursor_mode, update_cursor,
    522         CursorMode, CursorState, EventBatch,
    523     };
    524     use crate::domain::indexer::kind::IndexerEventKind;
    525     use crate::relay::event::RelayRawEvent;
    526     use crate::relay::record::RelayEventRecord;
    527     use crate::utils::db::IndexerDb;
    528     use radroots_events::kinds::KIND_JOB_REQUEST_MIN;
    529     use std::collections::HashMap;
    530     use tempfile::tempdir;
    531 
    532     fn make_record(rowid: u64, event_hash: &str, author: &str, created_at: u32, kind: u32) -> RelayEventRecord {
    533         let raw = RelayRawEvent {
    534             id: event_hash.to_string(),
    535             pubkey: author.to_string(),
    536             created_at,
    537             kind,
    538             tags: Vec::new(),
    539             content: "hello".to_string(),
    540             sig: "sig".to_string(),
    541         };
    542         let content = serde_json::to_string(&raw).expect("json");
    543         RelayEventRecord {
    544             rowid: Some(rowid),
    545             event_hash: event_hash.to_string(),
    546             author: author.to_string(),
    547             created_at,
    548             kind: IndexerEventKind::try_from(kind as u64).expect("kind"),
    549             content,
    550         }
    551     }
    552 
    553     #[test]
    554     fn parse_helpers_reject_invalid_lengths() {
    555         assert!(parse_u32(&[0u8; 3], "u32").is_none());
    556         assert!(parse_u64(&[0u8; 7], "u64").is_none());
    557     }
    558 
    559     #[test]
    560     fn parse_string_rejects_invalid_utf8() {
    561         assert!(parse_string(&[0xff, 0xfe], "str").is_none());
    562     }
    563 
    564     #[test]
    565     fn event_batch_groups_job_request_kinds() {
    566         let author = "a".repeat(64);
    567         let rec = make_record(
    568             1,
    569             "1".repeat(64).as_str(),
    570             &author,
    571             10,
    572             KIND_JOB_REQUEST_MIN + 1,
    573         );
    574         let batch = EventBatch::from_records(vec![rec]).expect("batch");
    575         assert!(batch
    576             .events_by_kind
    577             .contains_key(&IndexerEventKind::JobRequest(KIND_JOB_REQUEST_MIN)));
    578     }
    579 
    580     #[test]
    581     fn resolve_cursor_mode_falls_back_without_rowid() {
    582         let conn = rusqlite::Connection::open_in_memory().expect("conn");
    583         conn.execute(
    584             "CREATE TABLE event (event_hash BLOB PRIMARY KEY, author BLOB, created_at INTEGER, kind INTEGER, content TEXT) WITHOUT ROWID",
    585             [],
    586         )
    587         .expect("create table");
    588         let queries = build_relay_queries();
    589         let mode = resolve_cursor_mode(&conn, &queries.rowid);
    590         assert_eq!(mode, CursorMode::CreatedAt);
    591     }
    592 
    593     #[test]
    594     fn resolve_cursor_mode_uses_rowid_when_available() {
    595         let conn = rusqlite::Connection::open_in_memory().expect("conn");
    596         conn.execute(
    597             "CREATE TABLE event (event_hash BLOB, author BLOB, created_at INTEGER, kind INTEGER, content TEXT)",
    598             [],
    599         )
    600         .expect("create table");
    601         let queries = build_relay_queries();
    602         let mode = resolve_cursor_mode(&conn, &queries.rowid);
    603         assert_eq!(mode, CursorMode::RowId);
    604     }
    605 
    606     #[test]
    607     fn update_cursor_writes_created_at_state() {
    608         let dir = tempdir().expect("tempdir");
    609         let db_idx = IndexerDb::open(dir.path().join("db").to_str().expect("path"))
    610             .expect("open db");
    611         let mut cursor = CursorState::default();
    612         let mut batch = EventBatch {
    613             events_by_kind: HashMap::new(),
    614             next_created: Some((42, "hash".to_string())),
    615             next_rowid: None,
    616             record_count: 0,
    617         };
    618 
    619         let updated = update_cursor(&db_idx, &mut cursor, CursorMode::CreatedAt, &mut batch)
    620             .expect("update cursor");
    621         assert!(updated);
    622         assert_eq!(cursor.last_created_at, 42);
    623         assert_eq!(cursor.last_event_hash, "hash");
    624         db_idx.flush().expect("flush");
    625         let stored = db_idx
    626             .get_raw("stats", "last_created_at")
    627             .expect("get raw")
    628             .expect("value");
    629         assert_eq!(stored.as_ref(), &42u32.to_be_bytes());
    630     }
    631 
    632     #[test]
    633     fn update_cursor_writes_rowid_state() {
    634         let dir = tempdir().expect("tempdir");
    635         let db_idx = IndexerDb::open(dir.path().join("db").to_str().expect("path"))
    636             .expect("open db");
    637         let mut cursor = CursorState::default();
    638         let mut batch = EventBatch {
    639             events_by_kind: HashMap::new(),
    640             next_created: None,
    641             next_rowid: Some(7),
    642             record_count: 0,
    643         };
    644 
    645         let updated = update_cursor(&db_idx, &mut cursor, CursorMode::RowId, &mut batch)
    646             .expect("update cursor");
    647         assert!(updated);
    648         assert_eq!(cursor.last_rowid, 7);
    649         db_idx.flush().expect("flush");
    650         let stored = db_idx
    651             .get_raw("stats", "last_rowid")
    652             .expect("get raw")
    653             .expect("value");
    654         assert_eq!(stored.as_ref(), &7u64.to_be_bytes());
    655     }
    656 }