runner.rs (22582B)
1 #![forbid(unsafe_code)] 2 3 use anyhow::{Context, Result}; 4 use rusqlite::params; 5 use std::{ 6 collections::HashMap, 7 time::{Duration, Instant}, 8 }; 9 use tracing::{info, warn}; 10 11 use crate::{ 12 audit, 13 domain::{ 14 indexer::{ 15 kind::IndexerEventKind, 16 models::{ 17 EventCommentIndexes, EventFollowIndexes, EventIndexes, EventJobFeedbackIndexes, 18 EventJobRequestIndexes, EventJobResultIndexes, EventListingIndexes, 19 EventPostIndexes, EventProfileIndexes, EventReactionIndexes, WriteEventIndexes, 20 }, 21 }, 22 resolvers::profile::ProfileResolver, 23 }, 24 relay::{event::RelayIndexerEvent, record::RelayEventRecord}, 25 utils::{ 26 db::IndexerDb, 27 sqlite::{sqlite_conn, sqlite_stmt}, 28 }, 29 Settings, 30 }; 31 use radroots_events::kinds::{KIND_JOB_REQUEST_MIN, KIND_JOB_RESULT_MIN}; 32 33 const TREE_RAW: &str = "hashes"; 34 const TREE_EVENTS_PROFILE: &str = "profile_events"; 35 const TREE_EVENTS_POST: &str = "post_events"; 36 const TREE_EVENTS_FOLLOW: &str = "follow_events"; 37 const TREE_EVENTS_LISTING: &str = "listing_events"; 38 const TREE_EVENTS_REACTION: &str = "reaction_events"; 39 const TREE_EVENTS_COMMENT: &str = "comment_events"; 40 const TREE_EVENTS_JOB_REQUEST: &str = "job_request_events"; 41 const TREE_EVENTS_JOB_RESULT: &str = "job_result_events"; 42 const TREE_EVENTS_JOB_FEEDBACK: &str = "job_feedback_events"; 43 const TREE_STATS: &str = "stats"; 44 45 #[derive(Clone, Copy, Debug, PartialEq, Eq)] 46 enum CursorMode { 47 RowId, 48 CreatedAt, 49 } 50 51 #[derive(Debug, Default)] 52 struct CursorState { 53 last_created_at: u32, 54 last_event_hash: String, 55 last_rowid: u64, 56 } 57 58 impl CursorState { 59 fn load(db_idx: &IndexerDb) -> Result<Self> { 60 let last_created_at = db_idx 61 .get_raw(TREE_STATS, "last_created_at")? 62 .and_then(|ivec| parse_u32(ivec.as_ref(), "last_created_at")) 63 .unwrap_or(0); 64 let last_event_hash = db_idx 65 .get_raw(TREE_STATS, "last_event_hash")? 66 .and_then(|ivec| parse_string(ivec.as_ref(), "last_event_hash")) 67 .unwrap_or_default(); 68 let last_rowid = db_idx 69 .get_raw(TREE_STATS, "last_rowid")? 70 .and_then(|ivec| parse_u64(ivec.as_ref(), "last_rowid")) 71 .unwrap_or(0); 72 73 Ok(Self { 74 last_created_at, 75 last_event_hash, 76 last_rowid, 77 }) 78 } 79 } 80 81 fn parse_u32(raw: &[u8], label: &str) -> Option<u32> { 82 if raw.len() != 4 { 83 warn!(len = raw.len(), label, "Ignoring invalid cursor value"); 84 return None; 85 } 86 let arr: [u8; 4] = raw.try_into().ok()?; 87 Some(u32::from_be_bytes(arr)) 88 } 89 90 fn parse_u64(raw: &[u8], label: &str) -> Option<u64> { 91 if raw.len() != 8 { 92 warn!(len = raw.len(), label, "Ignoring invalid cursor value"); 93 return None; 94 } 95 let arr: [u8; 8] = raw.try_into().ok()?; 96 Some(u64::from_be_bytes(arr)) 97 } 98 99 fn parse_string(raw: &[u8], label: &str) -> Option<String> { 100 match std::str::from_utf8(raw) { 101 Ok(value) => Some(value.to_string()), 102 Err(err) => { 103 warn!(error = %err, label, "Ignoring invalid cursor value"); 104 None 105 } 106 } 107 } 108 109 struct EventBatch { 110 events_by_kind: HashMap<IndexerEventKind, Vec<RelayIndexerEvent>>, 111 next_created: Option<(u32, String)>, 112 next_rowid: Option<u64>, 113 record_count: usize, 114 } 115 116 impl EventBatch { 117 fn from_records(records: Vec<RelayEventRecord>) -> Result<Self> { 118 let record_count = records.len(); 119 let next_created = records 120 .last() 121 .map(|rec| (rec.created_at, rec.event_hash.clone())); 122 let next_rowid = records.last().and_then(|rec| rec.rowid); 123 let mut events_by_kind: HashMap<IndexerEventKind, Vec<RelayIndexerEvent>> = 124 HashMap::with_capacity(IndexerEventKind::GROUPS.len()); 125 126 for rec in records { 127 let iev = RelayIndexerEvent::try_from(rec)?; 128 audit::log_indexer_event(&iev); 129 events_by_kind.entry(iev.kind.group()).or_default().push(iev); 130 } 131 132 Ok(Self { 133 events_by_kind, 134 next_created, 135 next_rowid, 136 record_count, 137 }) 138 } 139 } 140 141 struct RelayQueries { 142 created: String, 143 rowid: String, 144 } 145 146 fn build_relay_queries() -> RelayQueries { 147 let relay_kind_filter = IndexerEventKind::relay_kind_filter_sql(); 148 let created = format!( 149 "SELECT hex(event_hash), hex(author), created_at, kind, content FROM event WHERE ({}) AND (created_at > ? OR (created_at = ? AND hex(event_hash) > ?)) ORDER BY created_at ASC, hex(event_hash) ASC", 150 relay_kind_filter 151 ); 152 let rowid = format!( 153 "SELECT rowid, hex(event_hash), hex(author), created_at, kind, content FROM event WHERE ({}) AND rowid > ? ORDER BY rowid ASC", 154 relay_kind_filter 155 ); 156 RelayQueries { created, rowid } 157 } 158 159 fn resolve_cursor_mode(relay_db: &rusqlite::Connection, rowid_query: &str) -> CursorMode { 160 match sqlite_stmt(relay_db, rowid_query) { 161 Ok(_) => CursorMode::RowId, 162 Err(err) => { 163 warn!( 164 error = %err, 165 "Rowid cursor unavailable, falling back to created_at cursor" 166 ); 167 CursorMode::CreatedAt 168 } 169 } 170 } 171 172 fn load_records( 173 relay_db: &rusqlite::Connection, 174 mode: CursorMode, 175 queries: &RelayQueries, 176 cursor: &CursorState, 177 ) -> Result<Vec<RelayEventRecord>> { 178 match mode { 179 CursorMode::RowId => { 180 let mut stmt = sqlite_stmt(relay_db, &queries.rowid) 181 .context("Could not prepare rowid event query")?; 182 let rows = stmt.query_map( 183 params![cursor.last_rowid], 184 RelayEventRecord::from_row_with_rowid, 185 )?; 186 rows.collect::<Result<Vec<_>, _>>() 187 .context("collecting RelayEventRecord rows") 188 } 189 CursorMode::CreatedAt => { 190 let mut stmt = sqlite_stmt(relay_db, &queries.created) 191 .context("Could not prepare created_at event query")?; 192 let rows = stmt.query_map( 193 params![ 194 cursor.last_created_at, 195 cursor.last_created_at, 196 &cursor.last_event_hash 197 ], 198 RelayEventRecord::from_row, 199 )?; 200 rows.collect::<Result<Vec<_>, _>>() 201 .context("collecting RelayEventRecord rows") 202 } 203 } 204 } 205 206 fn update_cursor( 207 db_idx: &IndexerDb, 208 cursor: &mut CursorState, 209 mode: CursorMode, 210 batch: &mut EventBatch, 211 ) -> Result<bool> { 212 let mut cursor_updated = false; 213 match mode { 214 CursorMode::CreatedAt => { 215 if let Some((created_at, event_hash)) = batch.next_created.take() { 216 cursor.last_created_at = created_at; 217 cursor.last_event_hash = event_hash; 218 db_idx.insert_raw( 219 TREE_STATS, 220 "last_created_at", 221 &cursor.last_created_at.to_be_bytes(), 222 )?; 223 db_idx.insert_raw( 224 TREE_STATS, 225 "last_event_hash", 226 cursor.last_event_hash.as_bytes(), 227 )?; 228 cursor_updated = true; 229 } 230 } 231 CursorMode::RowId => { 232 if let Some(rowid) = batch.next_rowid.take() { 233 cursor.last_rowid = rowid; 234 db_idx.insert_raw(TREE_STATS, "last_rowid", &cursor.last_rowid.to_be_bytes())?; 235 cursor_updated = true; 236 } 237 } 238 } 239 Ok(cursor_updated) 240 } 241 242 #[derive(Default)] 243 struct ChangeFlags { 244 profiles: bool, 245 posts: bool, 246 follows: bool, 247 listings: bool, 248 reactions: bool, 249 comments: bool, 250 job_requests: bool, 251 job_results: bool, 252 job_feedback: bool, 253 } 254 255 impl ChangeFlags { 256 fn needs_profiles(&self) -> bool { 257 self.profiles 258 || self.listings 259 || self.reactions 260 || self.comments 261 || self.posts 262 || self.follows 263 || self.job_requests 264 || self.job_results 265 || self.job_feedback 266 } 267 } 268 269 fn insert_event( 270 db_idx: &IndexerDb, 271 tree: &str, 272 raw_tree: &str, 273 ev: &RelayIndexerEvent, 274 ) -> Result<bool> { 275 let id = &ev.id; 276 let hash = &ev.hash; 277 let skip = if let Some(old) = db_idx.get_raw(raw_tree, id)? { 278 old.as_ref() == hash.as_bytes() 279 } else { 280 false 281 }; 282 if skip { 283 return Ok(false); 284 } 285 db_idx.insert(tree, id, ev)?; 286 db_idx.insert_raw(raw_tree, id, hash.as_bytes())?; 287 Ok(true) 288 } 289 290 fn insert_events( 291 db_idx: &IndexerDb, 292 tree: &str, 293 raw_tree: &str, 294 events: &[RelayIndexerEvent], 295 ) -> Result<bool> { 296 let mut any_new = false; 297 for ev in events { 298 if insert_event(db_idx, tree, raw_tree, ev)? { 299 any_new = true; 300 } 301 } 302 Ok(any_new) 303 } 304 305 fn write_indexes<T: WriteEventIndexes>( 306 settings: &Settings, 307 label: Option<&str>, 308 indexes: T, 309 ) -> Result<()> { 310 let mut updated = Vec::new(); 311 indexes.write(settings, &mut updated)?; 312 match label { 313 Some(label) => info!( 314 written = updated.len(), 315 "Written {} {} index files", 316 updated.len(), 317 label 318 ), 319 None => info!(written = updated.len(), "Written {} index files", updated.len()), 320 } 321 Ok(()) 322 } 323 324 pub async fn run(settings: Settings) -> Result<()> { 325 let db_idx = IndexerDb::open(&format!("{}/indexer_db", settings.indexer.data_dir))?; 326 let mut cursor = CursorState::load(&db_idx)?; 327 let relay_queries = build_relay_queries(); 328 329 let mut profiles = ProfileResolver::default(); 330 let mut profiles_loaded = false; 331 let mut cursor_mode: Option<CursorMode> = None; 332 333 loop { 334 let iteration_start = Instant::now(); 335 let relay_db = sqlite_conn(&settings.relay.database_path).with_context(|| { 336 format!( 337 "Could not open relay DB at {}", 338 settings.relay.database_path 339 ) 340 })?; 341 if cursor_mode.is_none() { 342 cursor_mode = Some(resolve_cursor_mode(&relay_db, &relay_queries.rowid)); 343 } 344 345 let mode = cursor_mode.unwrap_or(CursorMode::CreatedAt); 346 let records = load_records(&relay_db, mode, &relay_queries, &cursor)?; 347 348 let mut batch = EventBatch::from_records(records)?; 349 info!(record_count = batch.record_count, "Loaded relay records"); 350 351 let mut changes = ChangeFlags::default(); 352 let mut raw_listing_events: Option<Vec<RelayIndexerEvent>> = None; 353 354 if let Some(profile_events) = batch.events_by_kind.remove(&IndexerEventKind::Profile) { 355 if insert_events(&db_idx, TREE_EVENTS_PROFILE, TREE_RAW, &profile_events)? { 356 let raw_profile_events: Vec<RelayIndexerEvent> = 357 db_idx.get_all(TREE_EVENTS_PROFILE)?; 358 let indexed_profile_events = EventProfileIndexes::build(&raw_profile_events)?; 359 write_indexes(&settings, None, indexed_profile_events)?; 360 361 profiles = ProfileResolver::from_metadata(&raw_profile_events); 362 profiles_loaded = true; 363 audit::set_profile_resolver(profiles.clone()); 364 changes.profiles = true; 365 } 366 } 367 368 if let Some(post_events) = batch.events_by_kind.remove(&IndexerEventKind::Post) { 369 changes.posts = insert_events(&db_idx, TREE_EVENTS_POST, TREE_RAW, &post_events)?; 370 } 371 372 if let Some(follow_events) = batch.events_by_kind.remove(&IndexerEventKind::Follow) { 373 changes.follows = insert_events(&db_idx, TREE_EVENTS_FOLLOW, TREE_RAW, &follow_events)?; 374 } 375 376 if let Some(listing_events) = batch.events_by_kind.remove(&IndexerEventKind::Listing) { 377 if insert_events(&db_idx, TREE_EVENTS_LISTING, TREE_RAW, &listing_events)? { 378 raw_listing_events = Some(db_idx.get_all(TREE_EVENTS_LISTING)?); 379 changes.listings = true; 380 } 381 } 382 383 if let Some(reaction_events) = batch.events_by_kind.remove(&IndexerEventKind::Reaction) { 384 changes.reactions = 385 insert_events(&db_idx, TREE_EVENTS_REACTION, TREE_RAW, &reaction_events)?; 386 } 387 388 if let Some(comment_events) = batch.events_by_kind.remove(&IndexerEventKind::Comment) { 389 changes.comments = 390 insert_events(&db_idx, TREE_EVENTS_COMMENT, TREE_RAW, &comment_events)?; 391 } 392 393 if let Some(job_request_events) = 394 batch.events_by_kind.remove(&IndexerEventKind::JobRequest(KIND_JOB_REQUEST_MIN)) 395 { 396 changes.job_requests = insert_events( 397 &db_idx, 398 TREE_EVENTS_JOB_REQUEST, 399 TREE_RAW, 400 &job_request_events, 401 )?; 402 } 403 404 if let Some(job_result_events) = 405 batch.events_by_kind.remove(&IndexerEventKind::JobResult(KIND_JOB_RESULT_MIN)) 406 { 407 changes.job_results = insert_events( 408 &db_idx, 409 TREE_EVENTS_JOB_RESULT, 410 TREE_RAW, 411 &job_result_events, 412 )?; 413 } 414 415 if let Some(job_feedback_events) = 416 batch.events_by_kind.remove(&IndexerEventKind::JobFeedback) 417 { 418 changes.job_feedback = insert_events( 419 &db_idx, 420 TREE_EVENTS_JOB_FEEDBACK, 421 TREE_RAW, 422 &job_feedback_events, 423 )?; 424 } 425 426 if !batch.events_by_kind.is_empty() { 427 let kinds: Vec<IndexerEventKind> = 428 batch.events_by_kind.keys().copied().collect(); 429 warn!(kinds = ?kinds, "Unhandled indexer event kinds"); 430 } 431 432 if changes.needs_profiles() && !profiles_loaded { 433 let raw_profile_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_PROFILE)?; 434 profiles = ProfileResolver::from_metadata(&raw_profile_events); 435 profiles_loaded = true; 436 audit::set_profile_resolver(profiles.clone()); 437 } 438 439 if changes.reactions { 440 let raw_reaction_events: Vec<RelayIndexerEvent> = 441 db_idx.get_all(TREE_EVENTS_REACTION)?; 442 let reaction_indexes = 443 EventReactionIndexes::build_with_profiles(&raw_reaction_events, &profiles)?; 444 write_indexes(&settings, Some("reaction"), reaction_indexes)?; 445 } 446 447 if changes.comments { 448 let raw_comment_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_COMMENT)?; 449 let comment_indexes = 450 EventCommentIndexes::build_with_profiles(&raw_comment_events, &profiles)?; 451 write_indexes(&settings, Some("comment"), comment_indexes)?; 452 } 453 454 if changes.posts { 455 let raw_post_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_POST)?; 456 let post_indexes = EventPostIndexes::build_with_profiles(&raw_post_events, &profiles)?; 457 write_indexes(&settings, Some("post"), post_indexes)?; 458 } 459 460 if changes.follows { 461 let raw_follow_events: Vec<RelayIndexerEvent> = db_idx.get_all(TREE_EVENTS_FOLLOW)?; 462 let follow_indexes = 463 EventFollowIndexes::build_with_profiles(&raw_follow_events, &profiles)?; 464 write_indexes(&settings, Some("follow"), follow_indexes)?; 465 } 466 467 if changes.job_requests { 468 let raw_job_request_events: Vec<RelayIndexerEvent> = 469 db_idx.get_all(TREE_EVENTS_JOB_REQUEST)?; 470 let job_request_indexes = 471 EventJobRequestIndexes::build_with_profiles(&raw_job_request_events, &profiles)?; 472 write_indexes(&settings, Some("job request"), job_request_indexes)?; 473 } 474 475 if changes.job_results { 476 let raw_job_result_events: Vec<RelayIndexerEvent> = 477 db_idx.get_all(TREE_EVENTS_JOB_RESULT)?; 478 let job_result_indexes = 479 EventJobResultIndexes::build_with_profiles(&raw_job_result_events, &profiles)?; 480 write_indexes(&settings, Some("job result"), job_result_indexes)?; 481 } 482 483 if changes.job_feedback { 484 let raw_job_feedback_events: Vec<RelayIndexerEvent> = 485 db_idx.get_all(TREE_EVENTS_JOB_FEEDBACK)?; 486 let job_feedback_indexes = 487 EventJobFeedbackIndexes::build_with_profiles(&raw_job_feedback_events, &profiles)?; 488 write_indexes(&settings, Some("job feedback"), job_feedback_indexes)?; 489 } 490 491 if changes.listings || changes.profiles { 492 let raw_listing_events = match raw_listing_events.take() { 493 Some(events) => events, 494 None => db_idx.get_all(TREE_EVENTS_LISTING)?, 495 }; 496 let listing_indexes = 497 EventListingIndexes::build_with_profiles(&raw_listing_events, &profiles)?; 498 write_indexes(&settings, Some("listing"), listing_indexes)?; 499 } 500 501 let cursor_updated = update_cursor(&db_idx, &mut cursor, mode, &mut batch)?; 502 if cursor_updated { 503 db_idx.flush()?; 504 } 505 506 let elapsed = iteration_start.elapsed(); 507 let interval = Duration::from_secs(settings.indexer.flush_interval); 508 let delay = interval.saturating_sub(elapsed); 509 info!( 510 elapsed_ms = elapsed.as_millis(), 511 sleeping_ms = delay.as_millis(), 512 "Iteration complete" 513 ); 514 tokio::time::sleep(delay).await; 515 } 516 } 517 518 #[cfg(test)] 519 mod tests { 520 use super::{ 521 build_relay_queries, parse_string, parse_u32, parse_u64, resolve_cursor_mode, update_cursor, 522 CursorMode, CursorState, EventBatch, 523 }; 524 use crate::domain::indexer::kind::IndexerEventKind; 525 use crate::relay::event::RelayRawEvent; 526 use crate::relay::record::RelayEventRecord; 527 use crate::utils::db::IndexerDb; 528 use radroots_events::kinds::KIND_JOB_REQUEST_MIN; 529 use std::collections::HashMap; 530 use tempfile::tempdir; 531 532 fn make_record(rowid: u64, event_hash: &str, author: &str, created_at: u32, kind: u32) -> RelayEventRecord { 533 let raw = RelayRawEvent { 534 id: event_hash.to_string(), 535 pubkey: author.to_string(), 536 created_at, 537 kind, 538 tags: Vec::new(), 539 content: "hello".to_string(), 540 sig: "sig".to_string(), 541 }; 542 let content = serde_json::to_string(&raw).expect("json"); 543 RelayEventRecord { 544 rowid: Some(rowid), 545 event_hash: event_hash.to_string(), 546 author: author.to_string(), 547 created_at, 548 kind: IndexerEventKind::try_from(kind as u64).expect("kind"), 549 content, 550 } 551 } 552 553 #[test] 554 fn parse_helpers_reject_invalid_lengths() { 555 assert!(parse_u32(&[0u8; 3], "u32").is_none()); 556 assert!(parse_u64(&[0u8; 7], "u64").is_none()); 557 } 558 559 #[test] 560 fn parse_string_rejects_invalid_utf8() { 561 assert!(parse_string(&[0xff, 0xfe], "str").is_none()); 562 } 563 564 #[test] 565 fn event_batch_groups_job_request_kinds() { 566 let author = "a".repeat(64); 567 let rec = make_record( 568 1, 569 "1".repeat(64).as_str(), 570 &author, 571 10, 572 KIND_JOB_REQUEST_MIN + 1, 573 ); 574 let batch = EventBatch::from_records(vec![rec]).expect("batch"); 575 assert!(batch 576 .events_by_kind 577 .contains_key(&IndexerEventKind::JobRequest(KIND_JOB_REQUEST_MIN))); 578 } 579 580 #[test] 581 fn resolve_cursor_mode_falls_back_without_rowid() { 582 let conn = rusqlite::Connection::open_in_memory().expect("conn"); 583 conn.execute( 584 "CREATE TABLE event (event_hash BLOB PRIMARY KEY, author BLOB, created_at INTEGER, kind INTEGER, content TEXT) WITHOUT ROWID", 585 [], 586 ) 587 .expect("create table"); 588 let queries = build_relay_queries(); 589 let mode = resolve_cursor_mode(&conn, &queries.rowid); 590 assert_eq!(mode, CursorMode::CreatedAt); 591 } 592 593 #[test] 594 fn resolve_cursor_mode_uses_rowid_when_available() { 595 let conn = rusqlite::Connection::open_in_memory().expect("conn"); 596 conn.execute( 597 "CREATE TABLE event (event_hash BLOB, author BLOB, created_at INTEGER, kind INTEGER, content TEXT)", 598 [], 599 ) 600 .expect("create table"); 601 let queries = build_relay_queries(); 602 let mode = resolve_cursor_mode(&conn, &queries.rowid); 603 assert_eq!(mode, CursorMode::RowId); 604 } 605 606 #[test] 607 fn update_cursor_writes_created_at_state() { 608 let dir = tempdir().expect("tempdir"); 609 let db_idx = IndexerDb::open(dir.path().join("db").to_str().expect("path")) 610 .expect("open db"); 611 let mut cursor = CursorState::default(); 612 let mut batch = EventBatch { 613 events_by_kind: HashMap::new(), 614 next_created: Some((42, "hash".to_string())), 615 next_rowid: None, 616 record_count: 0, 617 }; 618 619 let updated = update_cursor(&db_idx, &mut cursor, CursorMode::CreatedAt, &mut batch) 620 .expect("update cursor"); 621 assert!(updated); 622 assert_eq!(cursor.last_created_at, 42); 623 assert_eq!(cursor.last_event_hash, "hash"); 624 db_idx.flush().expect("flush"); 625 let stored = db_idx 626 .get_raw("stats", "last_created_at") 627 .expect("get raw") 628 .expect("value"); 629 assert_eq!(stored.as_ref(), &42u32.to_be_bytes()); 630 } 631 632 #[test] 633 fn update_cursor_writes_rowid_state() { 634 let dir = tempdir().expect("tempdir"); 635 let db_idx = IndexerDb::open(dir.path().join("db").to_str().expect("path")) 636 .expect("open db"); 637 let mut cursor = CursorState::default(); 638 let mut batch = EventBatch { 639 events_by_kind: HashMap::new(), 640 next_created: None, 641 next_rowid: Some(7), 642 record_count: 0, 643 }; 644 645 let updated = update_cursor(&db_idx, &mut cursor, CursorMode::RowId, &mut batch) 646 .expect("update cursor"); 647 assert!(updated); 648 assert_eq!(cursor.last_rowid, 7); 649 db_idx.flush().expect("flush"); 650 let stored = db_idx 651 .get_raw("stats", "last_rowid") 652 .expect("get raw") 653 .expect("value"); 654 assert_eq!(stored.as_ref(), &7u64.to_be_bytes()); 655 } 656 }