lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

sync_state.rs (11410B)


      1 #[cfg(not(feature = "std"))]
      2 use alloc::{collections::BTreeMap, string::String, string::ToString, vec::Vec};
      3 #[cfg(feature = "std")]
      4 use std::collections::BTreeMap;
      5 
      6 use radroots_replica_db_schema::farm::IFarmFindMany;
      7 use radroots_replica_db_schema::nostr_event_head::INostrEventHeadFindMany;
      8 use radroots_sql_core::SqlExecutor;
      9 
     10 use crate::error::RadrootsReplicaEventsError;
     11 use crate::event_head::{event_content_hash, event_head_key, tag_value};
     12 use crate::types::{RadrootsReplicaEventDraft, RadrootsReplicaFarmSelector};
     13 
     14 #[derive(Clone, Debug)]
     15 pub struct RadrootsReplicaSyncStatus {
     16     pub expected_count: usize,
     17     pub pending_count: usize,
     18 }
     19 
     20 #[derive(Clone, Debug)]
     21 pub struct RadrootsReplicaPendingPublishEvent {
     22     pub key: String,
     23     pub kind: u32,
     24     pub author: String,
     25     pub d_tag: String,
     26     pub content_hash: String,
     27     pub draft: RadrootsReplicaEventDraft,
     28 }
     29 
     30 #[derive(Clone, Debug)]
     31 pub struct RadrootsReplicaPendingPublishBatch {
     32     pub expected_count: usize,
     33     pub pending_count: usize,
     34     pub pending_events: Vec<RadrootsReplicaPendingPublishEvent>,
     35 }
     36 
     37 pub fn radroots_replica_sync_status<E: SqlExecutor>(
     38     exec: &E,
     39 ) -> Result<RadrootsReplicaSyncStatus, RadrootsReplicaEventsError> {
     40     let batch = radroots_replica_pending_publish_batch(exec)?;
     41     Ok(RadrootsReplicaSyncStatus {
     42         expected_count: batch.expected_count,
     43         pending_count: batch.pending_count,
     44     })
     45 }
     46 
     47 pub fn radroots_replica_pending_publish_batch<E: SqlExecutor>(
     48     exec: &E,
     49 ) -> Result<RadrootsReplicaPendingPublishBatch, RadrootsReplicaEventsError> {
     50     let farms =
     51         radroots_replica_db::farm::find_many(exec, &IFarmFindMany { filter: None })?.results;
     52     let mut expected: BTreeMap<String, RadrootsReplicaPendingPublishEvent> = BTreeMap::new();
     53 
     54     for farm in farms {
     55         let selector = RadrootsReplicaFarmSelector {
     56             id: Some(farm.id),
     57             d_tag: None,
     58             pubkey: None,
     59         };
     60         let bundle = crate::emit::radroots_replica_sync_all_with_options(exec, &selector, None)?;
     61         for event in bundle.events {
     62             let d_tag = tag_value(&event.tags, "d").unwrap_or("");
     63             let key = event_head_key(event.kind, &event.author, d_tag);
     64             let content_hash = draft_content_hash(&event)?;
     65             expected
     66                 .entry(key.clone())
     67                 .or_insert(RadrootsReplicaPendingPublishEvent {
     68                     key,
     69                     kind: event.kind,
     70                     author: event.author.clone(),
     71                     d_tag: d_tag.to_string(),
     72                     content_hash,
     73                     draft: event,
     74                 });
     75         }
     76     }
     77 
     78     let states_query = radroots_replica_db::nostr_event_head::find_many(
     79         exec,
     80         &INostrEventHeadFindMany { filter: None },
     81     );
     82     let states_result = states_query?;
     83     let states = states_result.results;
     84 
     85     let mut state_map: BTreeMap<String, String> = BTreeMap::new();
     86     for state in states {
     87         state_map.insert(state.key, state.content_hash);
     88     }
     89 
     90     let mut pending_events = Vec::new();
     91     for (key, event) in expected.iter() {
     92         match state_map.get(key) {
     93             Some(existing) if existing == &event.content_hash => {}
     94             _ => pending_events.push(event.clone()),
     95         }
     96     }
     97 
     98     Ok(RadrootsReplicaPendingPublishBatch {
     99         expected_count: expected.len(),
    100         pending_count: pending_events.len(),
    101         pending_events,
    102     })
    103 }
    104 
    105 fn draft_content_hash(
    106     event: &RadrootsReplicaEventDraft,
    107 ) -> Result<String, RadrootsReplicaEventsError> {
    108     #[cfg(test)]
    109     {
    110         event_content_hash(&event.content, &event.tags)
    111     }
    112     #[cfg(not(test))]
    113     {
    114         Ok(event_content_hash(&event.content, &event.tags))
    115     }
    116 }
    117 
    118 #[cfg(test)]
    119 mod tests {
    120     use super::{radroots_replica_pending_publish_batch, radroots_replica_sync_status};
    121     use crate::emit::radroots_replica_sync_all_with_options;
    122     use crate::event_head::{
    123         event_content_hash, event_content_hash_fail_next, event_head_key, tag_value,
    124     };
    125     use crate::types::RadrootsReplicaFarmSelector;
    126     use radroots_replica_db::{farm, migrations, nostr_event_head};
    127     use radroots_replica_db_schema::farm::IFarmFields;
    128     use radroots_replica_db_schema::nostr_event_head::INostrEventHeadFields;
    129     use radroots_sql_core::{SqlExecutor, SqliteExecutor};
    130 
    131     #[test]
    132     fn sync_status_empty_db_is_zero() {
    133         let exec = SqliteExecutor::open_memory().expect("db");
    134         migrations::run_all_up(&exec).expect("migrations");
    135         let status = radroots_replica_sync_status(&exec).expect("status");
    136         assert_eq!(status.expected_count, 0);
    137         assert_eq!(status.pending_count, 0);
    138     }
    139 
    140     #[test]
    141     fn sync_status_tracks_expected_and_pending() {
    142         let exec = SqliteExecutor::open_memory().expect("db");
    143         migrations::run_all_up(&exec).expect("migrations");
    144 
    145         let farm_row = farm::create(
    146             &exec,
    147             &IFarmFields {
    148                 d_tag: "AAAAAAAAAAAAAAAAAAAAAA".to_string(),
    149                 pubkey: "f".repeat(64),
    150                 name: "farm".to_string(),
    151                 about: None,
    152                 website: None,
    153                 picture: None,
    154                 banner: None,
    155                 location_primary: None,
    156                 location_city: None,
    157                 location_region: None,
    158                 location_country: None,
    159             },
    160         )
    161         .expect("farm")
    162         .result;
    163 
    164         let selector = RadrootsReplicaFarmSelector {
    165             id: Some(farm_row.id.clone()),
    166             d_tag: None,
    167             pubkey: None,
    168         };
    169         let bundle =
    170             radroots_replica_sync_all_with_options(&exec, &selector, None).expect("bundle");
    171         let expected_count = bundle.events.len();
    172         let first = bundle.events.first().expect("event");
    173         let d_tag = tag_value(&first.tags, "d").unwrap_or("");
    174         let key = event_head_key(first.kind, &first.author, d_tag);
    175         let content_hash = event_content_hash(&first.content, &first.tags).expect("hash");
    176         let fields = INostrEventHeadFields {
    177             key,
    178             kind: first.kind,
    179             pubkey: first.author.clone(),
    180             d_tag: d_tag.to_string(),
    181             last_event_id: format!("{:064x}", 1u64),
    182             last_created_at: 1,
    183             content_hash,
    184         };
    185         let _ = nostr_event_head::create(&exec, &fields).expect("state");
    186 
    187         let status = radroots_replica_sync_status(&exec).expect("status");
    188         assert_eq!(status.expected_count, expected_count);
    189         assert_eq!(status.pending_count, expected_count.saturating_sub(1));
    190     }
    191 
    192     #[test]
    193     fn pending_publish_batch_lists_only_missing_or_changed_expected_events() {
    194         let exec = SqliteExecutor::open_memory().expect("db");
    195         migrations::run_all_up(&exec).expect("migrations");
    196 
    197         let farm_row = farm::create(
    198             &exec,
    199             &IFarmFields {
    200                 d_tag: "AAAAAAAAAAAAAAAAAAAAAA".to_string(),
    201                 pubkey: "a".repeat(64),
    202                 name: "farm".to_string(),
    203                 about: None,
    204                 website: None,
    205                 picture: None,
    206                 banner: None,
    207                 location_primary: None,
    208                 location_city: None,
    209                 location_region: None,
    210                 location_country: None,
    211             },
    212         )
    213         .expect("farm")
    214         .result;
    215 
    216         let selector = RadrootsReplicaFarmSelector {
    217             id: Some(farm_row.id.clone()),
    218             d_tag: None,
    219             pubkey: None,
    220         };
    221         let bundle =
    222             radroots_replica_sync_all_with_options(&exec, &selector, None).expect("bundle");
    223         let first = bundle.events.first().expect("event");
    224         let d_tag = tag_value(&first.tags, "d").unwrap_or("");
    225         let key = event_head_key(first.kind, &first.author, d_tag);
    226         let content_hash = event_content_hash(&first.content, &first.tags).expect("hash");
    227         let fields = INostrEventHeadFields {
    228             key: key.clone(),
    229             kind: first.kind,
    230             pubkey: first.author.clone(),
    231             d_tag: d_tag.to_string(),
    232             last_event_id: format!("{:064x}", 1u64),
    233             last_created_at: 1,
    234             content_hash,
    235         };
    236         let _ = nostr_event_head::create(&exec, &fields).expect("state");
    237 
    238         let batch = radroots_replica_pending_publish_batch(&exec).expect("batch");
    239 
    240         assert_eq!(batch.expected_count, bundle.events.len());
    241         assert_eq!(batch.pending_count, bundle.events.len().saturating_sub(1));
    242         for event in &batch.pending_events {
    243             assert_ne!(event.key, key);
    244             assert_eq!(event.content_hash.len(), 64);
    245         }
    246     }
    247 
    248     #[test]
    249     fn sync_status_reports_farm_query_errors() {
    250         let exec = SqliteExecutor::open_memory().expect("db");
    251         let err = radroots_replica_sync_status(&exec).expect_err("farm query error");
    252         assert!(err.to_string().contains("invalid query"));
    253     }
    254 
    255     #[test]
    256     fn sync_status_reports_emit_errors() {
    257         let exec = SqliteExecutor::open_memory().expect("db");
    258         migrations::run_all_up(&exec).expect("migrations");
    259         let _ = farm::create(
    260             &exec,
    261             &IFarmFields {
    262                 d_tag: "AAAAAAAAAAAAAAAAAAAAAA".to_string(),
    263                 pubkey: "b".repeat(64),
    264                 name: "farm".to_string(),
    265                 about: None,
    266                 website: None,
    267                 picture: None,
    268                 banner: None,
    269                 location_primary: None,
    270                 location_city: None,
    271                 location_region: None,
    272                 location_country: None,
    273             },
    274         )
    275         .expect("farm");
    276         let _ = exec
    277             .exec("DROP TABLE farm_tag;", "[]")
    278             .expect("drop farm_tag");
    279         let err = radroots_replica_sync_status(&exec).expect_err("emit error");
    280         assert!(err.to_string().contains("invalid query"));
    281     }
    282 
    283     #[test]
    284     fn sync_status_reports_content_hash_errors() {
    285         let exec = SqliteExecutor::open_memory().expect("db");
    286         migrations::run_all_up(&exec).expect("migrations");
    287         let _ = farm::create(
    288             &exec,
    289             &IFarmFields {
    290                 d_tag: "AAAAAAAAAAAAAAAAAAAAAA".to_string(),
    291                 pubkey: "c".repeat(64),
    292                 name: "farm".to_string(),
    293                 about: None,
    294                 website: None,
    295                 picture: None,
    296                 banner: None,
    297                 location_primary: None,
    298                 location_city: None,
    299                 location_region: None,
    300                 location_country: None,
    301             },
    302         )
    303         .expect("farm");
    304         event_content_hash_fail_next();
    305         let err = radroots_replica_sync_status(&exec).expect_err("content hash error");
    306         assert!(err.to_string().contains("content_hash"));
    307     }
    308 
    309     #[test]
    310     fn sync_status_reports_state_query_errors() {
    311         let exec = SqliteExecutor::open_memory().expect("db");
    312         migrations::run_all_up(&exec).expect("migrations");
    313         let _ = exec
    314             .exec("DROP TABLE nostr_event_head;", "[]")
    315             .expect("drop nostr_event_head");
    316         let err = radroots_replica_sync_status(&exec).expect_err("state query error");
    317         assert!(err.to_string().contains("invalid query"));
    318     }
    319 }