lib.rs (16290B)
1 #![forbid(unsafe_code)] 2 3 use radroots_sql_core::{ExecOutcome, SqlError, SqlExecutor, utils}; 4 use serde::de::DeserializeOwned; 5 use serde_json::Value; 6 use std::cell::Cell; 7 use std::sync::atomic::{AtomicBool, Ordering}; 8 use wasm_bindgen::JsValue; 9 10 #[cfg(target_arch = "wasm32")] 11 use wasm_bindgen::prelude::*; 12 13 #[cfg(target_arch = "wasm32")] 14 #[wasm_bindgen] 15 extern "C" { 16 #[wasm_bindgen(js_name = __radroots_sql_wasm_exec)] 17 fn js_exec(sql: &str, params_json: &str) -> JsValue; 18 19 #[wasm_bindgen(js_name = __radroots_sql_wasm_query)] 20 fn js_query(sql: &str, params_json: &str) -> JsValue; 21 22 #[wasm_bindgen(js_name = __radroots_sql_wasm_export_bytes)] 23 fn js_export_bytes() -> JsValue; 24 } 25 26 #[cfg(not(target_arch = "wasm32"))] 27 use std::sync::{Mutex, OnceLock}; 28 29 #[cfg(all(not(target_arch = "wasm32"), test))] 30 use std::collections::VecDeque; 31 32 #[cfg(not(target_arch = "wasm32"))] 33 type RecordedCall = (String, String); 34 35 #[cfg(all(not(target_arch = "wasm32"), test))] 36 type NativeHostResult = Result<Value, SqlError>; 37 38 #[cfg(not(target_arch = "wasm32"))] 39 fn exec_calls() -> &'static Mutex<Vec<RecordedCall>> { 40 static EXEC_CALLS: OnceLock<Mutex<Vec<RecordedCall>>> = OnceLock::new(); 41 EXEC_CALLS.get_or_init(|| Mutex::new(Vec::new())) 42 } 43 44 #[cfg(not(target_arch = "wasm32"))] 45 fn query_calls() -> &'static Mutex<Vec<RecordedCall>> { 46 static QUERY_CALLS: OnceLock<Mutex<Vec<RecordedCall>>> = OnceLock::new(); 47 QUERY_CALLS.get_or_init(|| Mutex::new(Vec::new())) 48 } 49 50 #[cfg(not(target_arch = "wasm32"))] 51 fn export_calls() -> &'static Mutex<u64> { 52 static EXPORT_CALLS: OnceLock<Mutex<u64>> = OnceLock::new(); 53 EXPORT_CALLS.get_or_init(|| Mutex::new(0)) 54 } 55 56 #[cfg(all(not(target_arch = "wasm32"), test))] 57 fn exec_results() -> &'static Mutex<VecDeque<NativeHostResult>> { 58 static EXEC_RESULTS: OnceLock<Mutex<VecDeque<NativeHostResult>>> = OnceLock::new(); 59 EXEC_RESULTS.get_or_init(|| Mutex::new(VecDeque::new())) 60 } 61 62 #[cfg(all(not(target_arch = "wasm32"), test))] 63 fn query_results() -> &'static Mutex<VecDeque<NativeHostResult>> { 64 static QUERY_RESULTS: OnceLock<Mutex<VecDeque<NativeHostResult>>> = OnceLock::new(); 65 QUERY_RESULTS.get_or_init(|| Mutex::new(VecDeque::new())) 66 } 67 68 #[cfg(all(not(target_arch = "wasm32"), test))] 69 fn push_exec_result(result: NativeHostResult) { 70 let mut results = exec_results().lock().expect("exec results lock"); 71 results.push_back(result); 72 } 73 74 #[cfg(all(not(target_arch = "wasm32"), test))] 75 fn push_query_result(result: NativeHostResult) { 76 let mut results = query_results().lock().expect("query results lock"); 77 results.push_back(result); 78 } 79 80 #[cfg(all(not(target_arch = "wasm32"), test))] 81 fn take_exec_result() -> Option<NativeHostResult> { 82 exec_results() 83 .lock() 84 .expect("exec results lock") 85 .pop_front() 86 } 87 88 #[cfg(all(not(target_arch = "wasm32"), test))] 89 fn take_query_result() -> Option<NativeHostResult> { 90 query_results() 91 .lock() 92 .expect("query results lock") 93 .pop_front() 94 } 95 96 #[cfg(not(target_arch = "wasm32"))] 97 fn js_exec(sql: &str, params_json: &str) -> JsValue { 98 let mut calls = exec_calls().lock().expect("exec calls lock"); 99 calls.push((sql.to_string(), params_json.to_string())); 100 JsValue::NULL 101 } 102 103 #[cfg(not(target_arch = "wasm32"))] 104 fn js_query(sql: &str, params_json: &str) -> JsValue { 105 let mut calls = query_calls().lock().expect("query calls lock"); 106 calls.push((sql.to_string(), params_json.to_string())); 107 JsValue::NULL 108 } 109 110 #[cfg(not(target_arch = "wasm32"))] 111 fn js_export_bytes() -> JsValue { 112 let mut calls = export_calls().lock().expect("export calls lock"); 113 *calls += 1; 114 JsValue::NULL 115 } 116 117 const SAVEPOINT: &str = "radroots_schema_tx"; 118 const EXPORT_LOCK_ERR: &str = "replica db export in progress"; 119 120 static EXPORT_LOCK_ACTIVE: AtomicBool = AtomicBool::new(false); 121 122 thread_local! { 123 static EXPORT_LOCK_BYPASS: Cell<bool> = const { Cell::new(false) }; 124 } 125 126 pub struct WasmSqlExecutor; 127 128 impl WasmSqlExecutor { 129 pub fn new() -> Self { 130 Self 131 } 132 } 133 134 impl Default for WasmSqlExecutor { 135 fn default() -> Self { 136 Self::new() 137 } 138 } 139 140 impl SqlExecutor for WasmSqlExecutor { 141 fn exec(&self, sql: &str, params_json: &str) -> Result<ExecOutcome, SqlError> { 142 if export_lock_blocked() { 143 return Err(SqlError::InvalidArgument(EXPORT_LOCK_ERR.to_string())); 144 } 145 let v = host_exec_json(sql, params_json)?; 146 Ok(exec_outcome_from_json(&v)) 147 } 148 149 fn query_raw(&self, sql: &str, params_json: &str) -> Result<String, SqlError> { 150 let v = host_query_json(sql, params_json)?; 151 Ok(query_raw_from_json(&v)) 152 } 153 154 fn begin(&self) -> Result<(), SqlError> { 155 if export_lock_blocked() { 156 return Err(SqlError::InvalidArgument(EXPORT_LOCK_ERR.to_string())); 157 } 158 begin_tx(); 159 Ok(()) 160 } 161 162 fn commit(&self) -> Result<(), SqlError> { 163 if export_lock_blocked() { 164 return Err(SqlError::InvalidArgument(EXPORT_LOCK_ERR.to_string())); 165 } 166 commit_tx(); 167 Ok(()) 168 } 169 170 fn rollback(&self) -> Result<(), SqlError> { 171 if export_lock_blocked() { 172 return Err(SqlError::InvalidArgument(EXPORT_LOCK_ERR.to_string())); 173 } 174 rollback_tx(); 175 Ok(()) 176 } 177 } 178 179 pub fn parse_json<T: DeserializeOwned>(s: &str) -> Result<T, SqlError> { 180 utils::parse_json(s) 181 } 182 183 fn host_exec_json(sql: &str, params_json: &str) -> Result<Value, SqlError> { 184 let js = exec(sql, params_json); 185 #[cfg(all(not(target_arch = "wasm32"), test))] 186 if let Some(result) = take_exec_result() { 187 return result; 188 } 189 sql_value_from_js(js) 190 } 191 192 fn host_query_json(sql: &str, params_json: &str) -> Result<Value, SqlError> { 193 let js = query(sql, params_json); 194 #[cfg(all(not(target_arch = "wasm32"), test))] 195 if let Some(result) = take_query_result() { 196 return result; 197 } 198 sql_value_from_js(js) 199 } 200 201 #[cfg(target_arch = "wasm32")] 202 fn sql_value_from_js(js: JsValue) -> Result<Value, SqlError> { 203 serde_wasm_bindgen::from_value(js).map_err(|e| SqlError::SerializationError(e.to_string())) 204 } 205 206 #[cfg(not(target_arch = "wasm32"))] 207 fn sql_value_from_js(_js: JsValue) -> Result<Value, SqlError> { 208 Err(SqlError::UnsupportedPlatform) 209 } 210 211 fn exec_outcome_from_json(v: &Value) -> ExecOutcome { 212 let changes = v.get("changes").and_then(|x| x.as_i64()).unwrap_or(0); 213 let last_insert_id = v 214 .get("last_insert_id") 215 .or_else(|| v.get("lastInsertRowid")) 216 .and_then(|x| x.as_i64()) 217 .unwrap_or(0); 218 ExecOutcome { 219 changes, 220 last_insert_id, 221 } 222 } 223 224 fn query_raw_from_json(v: &Value) -> String { 225 v.to_string() 226 } 227 228 pub fn err_js(err: SqlError) -> JsValue { 229 err_js_value(err) 230 } 231 232 #[cfg(target_arch = "wasm32")] 233 fn err_js_value(err: SqlError) -> JsValue { 234 match err_js_with_encoder(err, |err| { 235 let value = err.to_json(); 236 serde_wasm_bindgen::to_value(&value).map_err(|_| ()) 237 }) { 238 Ok(value) => value, 239 Err(err) => JsValue::from_str(&err.to_string()), 240 } 241 } 242 243 #[cfg(not(target_arch = "wasm32"))] 244 fn err_js_value(err: SqlError) -> JsValue { 245 let _ = err.to_json(); 246 JsValue::NULL 247 } 248 249 #[cfg(target_arch = "wasm32")] 250 fn err_js_with_encoder( 251 err: SqlError, 252 encode: impl FnOnce(&SqlError) -> Result<JsValue, ()>, 253 ) -> Result<JsValue, SqlError> { 254 encode(&err).map_err(|()| err) 255 } 256 257 pub fn exec(sql: &str, params_json: &str) -> JsValue { 258 js_exec(sql, params_json) 259 } 260 261 pub fn query(sql: &str, params_json: &str) -> JsValue { 262 js_query(sql, params_json) 263 } 264 265 pub fn export_bytes() -> JsValue { 266 js_export_bytes() 267 } 268 269 pub fn begin_tx() { 270 let _ = js_exec(&format!("savepoint {}", SAVEPOINT), "[]"); 271 } 272 273 pub fn commit_tx() { 274 let _ = js_exec(&format!("release savepoint {}", SAVEPOINT), "[]"); 275 } 276 277 pub fn rollback_tx() { 278 let _ = js_exec(&format!("rollback to savepoint {}", SAVEPOINT), "[]"); 279 let _ = js_exec(&format!("release savepoint {}", SAVEPOINT), "[]"); 280 } 281 282 pub fn export_lock_begin() -> Result<(), SqlError> { 283 let was_active = EXPORT_LOCK_ACTIVE.swap(true, Ordering::SeqCst); 284 if was_active { 285 return Err(SqlError::InvalidArgument(EXPORT_LOCK_ERR.to_string())); 286 } 287 Ok(()) 288 } 289 290 pub fn export_lock_end() { 291 EXPORT_LOCK_ACTIVE.store(false, Ordering::SeqCst); 292 } 293 294 pub fn export_lock_active() -> bool { 295 EXPORT_LOCK_ACTIVE.load(Ordering::SeqCst) 296 } 297 298 pub fn with_export_lock_bypass<T>(f: impl FnOnce() -> T) -> T { 299 EXPORT_LOCK_BYPASS.with(|flag| { 300 let prev = flag.replace(true); 301 let out = f(); 302 flag.set(prev); 303 out 304 }) 305 } 306 307 fn export_lock_blocked() -> bool { 308 if !EXPORT_LOCK_ACTIVE.load(Ordering::SeqCst) { 309 return false; 310 } 311 EXPORT_LOCK_BYPASS.with(|flag| !flag.get()) 312 } 313 314 #[cfg(test)] 315 mod tests { 316 use std::{ 317 collections::BTreeMap, 318 sync::{Mutex, OnceLock}, 319 }; 320 321 use radroots_sql_core::{SqlError, SqlExecutor}; 322 use serde_json::json; 323 324 use super::{ 325 WasmSqlExecutor, begin_tx, commit_tx, err_js, exec, exec_calls, exec_outcome_from_json, 326 exec_results, export_bytes, export_calls, export_lock_active, export_lock_begin, 327 export_lock_end, parse_json, push_exec_result, push_query_result, query, query_calls, 328 query_raw_from_json, query_results, rollback_tx, with_export_lock_bypass, 329 }; 330 331 fn native_test_lock() -> &'static Mutex<()> { 332 static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new(); 333 TEST_LOCK.get_or_init(|| Mutex::new(())) 334 } 335 336 fn reset_native_state() { 337 exec_calls().lock().expect("exec calls lock").clear(); 338 query_calls().lock().expect("query calls lock").clear(); 339 *export_calls().lock().expect("export calls lock") = 0; 340 exec_results().lock().expect("exec results lock").clear(); 341 query_results().lock().expect("query results lock").clear(); 342 export_lock_end(); 343 } 344 345 #[test] 346 fn parse_json_reports_valid_and_invalid_payloads() { 347 let parsed: BTreeMap<String, u64> = parse_json(r#"{"count":2}"#).expect("parse json"); 348 assert_eq!(parsed.get("count"), Some(&2)); 349 assert_eq!( 350 parse_json::<BTreeMap<String, u64>>("{").unwrap_err().code(), 351 "ERR_SERIALIZATION" 352 ); 353 } 354 355 #[test] 356 fn err_js_accepts_sql_errors() { 357 let _ = err_js(SqlError::Internal); 358 let _ = err_js(SqlError::UnsupportedPlatform); 359 } 360 361 #[test] 362 fn exec_query_export_delegate_to_js_hooks() { 363 let _guard = native_test_lock().lock().expect("native test lock"); 364 reset_native_state(); 365 366 let _ = exec("select 1", "[]"); 367 let _ = query("select 2", "[1]"); 368 let _ = export_bytes(); 369 370 let exec_len = exec_calls().lock().map(|calls| calls.len()).unwrap_or(0); 371 let query_len = query_calls().lock().map(|calls| calls.len()).unwrap_or(0); 372 let export_len = export_calls().lock().map(|calls| *calls).unwrap_or(0); 373 assert!(exec_len >= 1); 374 assert!(query_len >= 1); 375 assert!(export_len >= 1); 376 } 377 378 #[test] 379 fn tx_helpers_emit_expected_savepoint_statements() { 380 let _guard = native_test_lock().lock().expect("native test lock"); 381 reset_native_state(); 382 383 begin_tx(); 384 commit_tx(); 385 rollback_tx(); 386 387 let calls = exec_calls() 388 .lock() 389 .map(|calls| calls.clone()) 390 .unwrap_or_default(); 391 assert!( 392 calls 393 .iter() 394 .any(|(sql, _)| sql == "savepoint radroots_schema_tx") 395 ); 396 assert!( 397 calls 398 .iter() 399 .any(|(sql, _)| sql == "release savepoint radroots_schema_tx") 400 ); 401 assert!( 402 calls 403 .iter() 404 .any(|(sql, _)| sql == "rollback to savepoint radroots_schema_tx") 405 ); 406 } 407 408 #[test] 409 fn export_lock_tracks_state() { 410 let _guard = native_test_lock().lock().expect("native test lock"); 411 reset_native_state(); 412 413 assert!(!export_lock_active()); 414 export_lock_begin().expect("begin export lock"); 415 assert!(export_lock_active()); 416 assert!(with_export_lock_bypass(|| true)); 417 export_lock_end(); 418 assert!(!export_lock_active()); 419 } 420 421 #[test] 422 fn executor_decodes_exec_outcomes() { 423 let _guard = native_test_lock().lock().expect("native test lock"); 424 reset_native_state(); 425 426 let executor = WasmSqlExecutor::default(); 427 push_exec_result(Ok(json!({"changes": 2, "lastInsertRowid": 99}))); 428 let outcome = executor 429 .exec("insert into listing values (?)", r#"["bin-1"]"#) 430 .expect("exec outcome"); 431 assert_eq!(outcome.changes, 2); 432 assert_eq!(outcome.last_insert_id, 99); 433 434 push_exec_result(Ok(json!({"changes": 3, "last_insert_id": 101}))); 435 let outcome = executor 436 .exec("update listing set qty = ?", "[1]") 437 .expect("exec outcome"); 438 assert_eq!(outcome.changes, 3); 439 assert_eq!(outcome.last_insert_id, 101); 440 441 let default_outcome = exec_outcome_from_json(&json!({})); 442 assert_eq!(default_outcome.changes, 0); 443 assert_eq!(default_outcome.last_insert_id, 0); 444 445 let calls = exec_calls().lock().expect("exec calls lock").clone(); 446 assert!(calls.iter().any(|(sql, params)| { 447 sql == "insert into listing values (?)" && params == r#"["bin-1"]"# 448 })); 449 } 450 451 #[test] 452 fn executor_decodes_query_results_and_host_errors() { 453 let _guard = native_test_lock().lock().expect("native test lock"); 454 reset_native_state(); 455 456 let executor = WasmSqlExecutor::new(); 457 let rows = json!([{"id": "listing-1"}]); 458 push_query_result(Ok(rows.clone())); 459 assert_eq!( 460 executor 461 .query_raw("select id from listing", "[]") 462 .expect("query rows"), 463 query_raw_from_json(&rows) 464 ); 465 466 assert_eq!( 467 executor 468 .query_raw("select id from listing", "[]") 469 .unwrap_err() 470 .code(), 471 "ERR_UNSUPPORTED_PLATFORM" 472 ); 473 474 push_exec_result(Err(SqlError::SerializationError( 475 "host response was not an object".to_string(), 476 ))); 477 assert_eq!( 478 executor 479 .exec("insert into listing values (?)", "[]") 480 .unwrap_err() 481 .code(), 482 "ERR_SERIALIZATION" 483 ); 484 assert_eq!( 485 executor 486 .exec("insert into listing values (?)", "[]") 487 .unwrap_err() 488 .code(), 489 "ERR_UNSUPPORTED_PLATFORM" 490 ); 491 } 492 493 #[test] 494 fn export_lock_rejects_nested_lock_and_write_trait_calls() { 495 let _guard = native_test_lock().lock().expect("native test lock"); 496 reset_native_state(); 497 498 let executor = WasmSqlExecutor::new(); 499 export_lock_begin().expect("begin export lock"); 500 assert_eq!( 501 export_lock_begin().unwrap_err().to_string(), 502 "invalid argument: replica db export in progress" 503 ); 504 assert_eq!( 505 executor 506 .exec("insert into listing values (?)", "[]") 507 .unwrap_err() 508 .code(), 509 "ERR_INVALID_ARGUMENT" 510 ); 511 assert_eq!(executor.begin().unwrap_err().code(), "ERR_INVALID_ARGUMENT"); 512 assert_eq!( 513 executor.commit().unwrap_err().code(), 514 "ERR_INVALID_ARGUMENT" 515 ); 516 assert_eq!( 517 executor.rollback().unwrap_err().code(), 518 "ERR_INVALID_ARGUMENT" 519 ); 520 521 with_export_lock_bypass(|| { 522 push_exec_result(Ok(json!({"changes": 1, "last_insert_id": 7}))); 523 let outcome = executor 524 .exec("insert into listing values (?)", "[]") 525 .expect("bypassed exec"); 526 assert_eq!(outcome.changes, 1); 527 executor.begin().expect("bypassed begin"); 528 executor.commit().expect("bypassed commit"); 529 executor.rollback().expect("bypassed rollback"); 530 }); 531 assert!(export_lock_active()); 532 export_lock_end(); 533 } 534 }