sync_state.rs (11410B)
1 #[cfg(not(feature = "std"))] 2 use alloc::{collections::BTreeMap, string::String, string::ToString, vec::Vec}; 3 #[cfg(feature = "std")] 4 use std::collections::BTreeMap; 5 6 use radroots_replica_db_schema::farm::IFarmFindMany; 7 use radroots_replica_db_schema::nostr_event_head::INostrEventHeadFindMany; 8 use radroots_sql_core::SqlExecutor; 9 10 use crate::error::RadrootsReplicaEventsError; 11 use crate::event_head::{event_content_hash, event_head_key, tag_value}; 12 use crate::types::{RadrootsReplicaEventDraft, RadrootsReplicaFarmSelector}; 13 14 #[derive(Clone, Debug)] 15 pub struct RadrootsReplicaSyncStatus { 16 pub expected_count: usize, 17 pub pending_count: usize, 18 } 19 20 #[derive(Clone, Debug)] 21 pub struct RadrootsReplicaPendingPublishEvent { 22 pub key: String, 23 pub kind: u32, 24 pub author: String, 25 pub d_tag: String, 26 pub content_hash: String, 27 pub draft: RadrootsReplicaEventDraft, 28 } 29 30 #[derive(Clone, Debug)] 31 pub struct RadrootsReplicaPendingPublishBatch { 32 pub expected_count: usize, 33 pub pending_count: usize, 34 pub pending_events: Vec<RadrootsReplicaPendingPublishEvent>, 35 } 36 37 pub fn radroots_replica_sync_status<E: SqlExecutor>( 38 exec: &E, 39 ) -> Result<RadrootsReplicaSyncStatus, RadrootsReplicaEventsError> { 40 let batch = radroots_replica_pending_publish_batch(exec)?; 41 Ok(RadrootsReplicaSyncStatus { 42 expected_count: batch.expected_count, 43 pending_count: batch.pending_count, 44 }) 45 } 46 47 pub fn radroots_replica_pending_publish_batch<E: SqlExecutor>( 48 exec: &E, 49 ) -> Result<RadrootsReplicaPendingPublishBatch, RadrootsReplicaEventsError> { 50 let farms = 51 radroots_replica_db::farm::find_many(exec, &IFarmFindMany { filter: None })?.results; 52 let mut expected: BTreeMap<String, RadrootsReplicaPendingPublishEvent> = BTreeMap::new(); 53 54 for farm in farms { 55 let selector = RadrootsReplicaFarmSelector { 56 id: Some(farm.id), 57 d_tag: None, 58 pubkey: None, 59 }; 60 let bundle = crate::emit::radroots_replica_sync_all_with_options(exec, &selector, None)?; 61 for event in bundle.events { 62 let d_tag = tag_value(&event.tags, "d").unwrap_or(""); 63 let key = event_head_key(event.kind, &event.author, d_tag); 64 let content_hash = draft_content_hash(&event)?; 65 expected 66 .entry(key.clone()) 67 .or_insert(RadrootsReplicaPendingPublishEvent { 68 key, 69 kind: event.kind, 70 author: event.author.clone(), 71 d_tag: d_tag.to_string(), 72 content_hash, 73 draft: event, 74 }); 75 } 76 } 77 78 let states_query = radroots_replica_db::nostr_event_head::find_many( 79 exec, 80 &INostrEventHeadFindMany { filter: None }, 81 ); 82 let states_result = states_query?; 83 let states = states_result.results; 84 85 let mut state_map: BTreeMap<String, String> = BTreeMap::new(); 86 for state in states { 87 state_map.insert(state.key, state.content_hash); 88 } 89 90 let mut pending_events = Vec::new(); 91 for (key, event) in expected.iter() { 92 match state_map.get(key) { 93 Some(existing) if existing == &event.content_hash => {} 94 _ => pending_events.push(event.clone()), 95 } 96 } 97 98 Ok(RadrootsReplicaPendingPublishBatch { 99 expected_count: expected.len(), 100 pending_count: pending_events.len(), 101 pending_events, 102 }) 103 } 104 105 fn draft_content_hash( 106 event: &RadrootsReplicaEventDraft, 107 ) -> Result<String, RadrootsReplicaEventsError> { 108 #[cfg(test)] 109 { 110 event_content_hash(&event.content, &event.tags) 111 } 112 #[cfg(not(test))] 113 { 114 Ok(event_content_hash(&event.content, &event.tags)) 115 } 116 } 117 118 #[cfg(test)] 119 mod tests { 120 use super::{radroots_replica_pending_publish_batch, radroots_replica_sync_status}; 121 use crate::emit::radroots_replica_sync_all_with_options; 122 use crate::event_head::{ 123 event_content_hash, event_content_hash_fail_next, event_head_key, tag_value, 124 }; 125 use crate::types::RadrootsReplicaFarmSelector; 126 use radroots_replica_db::{farm, migrations, nostr_event_head}; 127 use radroots_replica_db_schema::farm::IFarmFields; 128 use radroots_replica_db_schema::nostr_event_head::INostrEventHeadFields; 129 use radroots_sql_core::{SqlExecutor, SqliteExecutor}; 130 131 #[test] 132 fn sync_status_empty_db_is_zero() { 133 let exec = SqliteExecutor::open_memory().expect("db"); 134 migrations::run_all_up(&exec).expect("migrations"); 135 let status = radroots_replica_sync_status(&exec).expect("status"); 136 assert_eq!(status.expected_count, 0); 137 assert_eq!(status.pending_count, 0); 138 } 139 140 #[test] 141 fn sync_status_tracks_expected_and_pending() { 142 let exec = SqliteExecutor::open_memory().expect("db"); 143 migrations::run_all_up(&exec).expect("migrations"); 144 145 let farm_row = farm::create( 146 &exec, 147 &IFarmFields { 148 d_tag: "AAAAAAAAAAAAAAAAAAAAAA".to_string(), 149 pubkey: "f".repeat(64), 150 name: "farm".to_string(), 151 about: None, 152 website: None, 153 picture: None, 154 banner: None, 155 location_primary: None, 156 location_city: None, 157 location_region: None, 158 location_country: None, 159 }, 160 ) 161 .expect("farm") 162 .result; 163 164 let selector = RadrootsReplicaFarmSelector { 165 id: Some(farm_row.id.clone()), 166 d_tag: None, 167 pubkey: None, 168 }; 169 let bundle = 170 radroots_replica_sync_all_with_options(&exec, &selector, None).expect("bundle"); 171 let expected_count = bundle.events.len(); 172 let first = bundle.events.first().expect("event"); 173 let d_tag = tag_value(&first.tags, "d").unwrap_or(""); 174 let key = event_head_key(first.kind, &first.author, d_tag); 175 let content_hash = event_content_hash(&first.content, &first.tags).expect("hash"); 176 let fields = INostrEventHeadFields { 177 key, 178 kind: first.kind, 179 pubkey: first.author.clone(), 180 d_tag: d_tag.to_string(), 181 last_event_id: format!("{:064x}", 1u64), 182 last_created_at: 1, 183 content_hash, 184 }; 185 let _ = nostr_event_head::create(&exec, &fields).expect("state"); 186 187 let status = radroots_replica_sync_status(&exec).expect("status"); 188 assert_eq!(status.expected_count, expected_count); 189 assert_eq!(status.pending_count, expected_count.saturating_sub(1)); 190 } 191 192 #[test] 193 fn pending_publish_batch_lists_only_missing_or_changed_expected_events() { 194 let exec = SqliteExecutor::open_memory().expect("db"); 195 migrations::run_all_up(&exec).expect("migrations"); 196 197 let farm_row = farm::create( 198 &exec, 199 &IFarmFields { 200 d_tag: "AAAAAAAAAAAAAAAAAAAAAA".to_string(), 201 pubkey: "a".repeat(64), 202 name: "farm".to_string(), 203 about: None, 204 website: None, 205 picture: None, 206 banner: None, 207 location_primary: None, 208 location_city: None, 209 location_region: None, 210 location_country: None, 211 }, 212 ) 213 .expect("farm") 214 .result; 215 216 let selector = RadrootsReplicaFarmSelector { 217 id: Some(farm_row.id.clone()), 218 d_tag: None, 219 pubkey: None, 220 }; 221 let bundle = 222 radroots_replica_sync_all_with_options(&exec, &selector, None).expect("bundle"); 223 let first = bundle.events.first().expect("event"); 224 let d_tag = tag_value(&first.tags, "d").unwrap_or(""); 225 let key = event_head_key(first.kind, &first.author, d_tag); 226 let content_hash = event_content_hash(&first.content, &first.tags).expect("hash"); 227 let fields = INostrEventHeadFields { 228 key: key.clone(), 229 kind: first.kind, 230 pubkey: first.author.clone(), 231 d_tag: d_tag.to_string(), 232 last_event_id: format!("{:064x}", 1u64), 233 last_created_at: 1, 234 content_hash, 235 }; 236 let _ = nostr_event_head::create(&exec, &fields).expect("state"); 237 238 let batch = radroots_replica_pending_publish_batch(&exec).expect("batch"); 239 240 assert_eq!(batch.expected_count, bundle.events.len()); 241 assert_eq!(batch.pending_count, bundle.events.len().saturating_sub(1)); 242 for event in &batch.pending_events { 243 assert_ne!(event.key, key); 244 assert_eq!(event.content_hash.len(), 64); 245 } 246 } 247 248 #[test] 249 fn sync_status_reports_farm_query_errors() { 250 let exec = SqliteExecutor::open_memory().expect("db"); 251 let err = radroots_replica_sync_status(&exec).expect_err("farm query error"); 252 assert!(err.to_string().contains("invalid query")); 253 } 254 255 #[test] 256 fn sync_status_reports_emit_errors() { 257 let exec = SqliteExecutor::open_memory().expect("db"); 258 migrations::run_all_up(&exec).expect("migrations"); 259 let _ = farm::create( 260 &exec, 261 &IFarmFields { 262 d_tag: "AAAAAAAAAAAAAAAAAAAAAA".to_string(), 263 pubkey: "b".repeat(64), 264 name: "farm".to_string(), 265 about: None, 266 website: None, 267 picture: None, 268 banner: None, 269 location_primary: None, 270 location_city: None, 271 location_region: None, 272 location_country: None, 273 }, 274 ) 275 .expect("farm"); 276 let _ = exec 277 .exec("DROP TABLE farm_tag;", "[]") 278 .expect("drop farm_tag"); 279 let err = radroots_replica_sync_status(&exec).expect_err("emit error"); 280 assert!(err.to_string().contains("invalid query")); 281 } 282 283 #[test] 284 fn sync_status_reports_content_hash_errors() { 285 let exec = SqliteExecutor::open_memory().expect("db"); 286 migrations::run_all_up(&exec).expect("migrations"); 287 let _ = farm::create( 288 &exec, 289 &IFarmFields { 290 d_tag: "AAAAAAAAAAAAAAAAAAAAAA".to_string(), 291 pubkey: "c".repeat(64), 292 name: "farm".to_string(), 293 about: None, 294 website: None, 295 picture: None, 296 banner: None, 297 location_primary: None, 298 location_city: None, 299 location_region: None, 300 location_country: None, 301 }, 302 ) 303 .expect("farm"); 304 event_content_hash_fail_next(); 305 let err = radroots_replica_sync_status(&exec).expect_err("content hash error"); 306 assert!(err.to_string().contains("content_hash")); 307 } 308 309 #[test] 310 fn sync_status_reports_state_query_errors() { 311 let exec = SqliteExecutor::open_memory().expect("db"); 312 migrations::run_all_up(&exec).expect("migrations"); 313 let _ = exec 314 .exec("DROP TABLE nostr_event_head;", "[]") 315 .expect("drop nostr_event_head"); 316 let err = radroots_replica_sync_status(&exec).expect_err("state query error"); 317 assert!(err.to_string().contains("invalid query")); 318 } 319 }