local_events.rs (7214B)
1 use std::fs; 2 use std::path::PathBuf; 3 use std::sync::atomic::{AtomicU64, Ordering}; 4 use std::time::{SystemTime, UNIX_EPOCH}; 5 6 use radroots_local_events::{ 7 LocalEventRecord, LocalEventRecordInput, LocalEventsStore, LocalRecordFamily, 8 LocalRecordStatus, PublishOutboxStatus, SourceRuntime, 9 }; 10 use radroots_runtime_paths::{ 11 default_shared_local_events_database_path_from_shared_accounts_data_root, 12 default_shared_local_events_root_from_shared_accounts_data_root, 13 }; 14 use radroots_sql_core::SqliteExecutor; 15 use serde_json::Value; 16 17 use crate::runtime::RuntimeError; 18 use crate::runtime::config::{PathsConfig, RuntimeConfig}; 19 20 static RECORD_COUNTER: AtomicU64 = AtomicU64::new(0); 21 22 pub fn append_local_work( 23 config: &RuntimeConfig, 24 subject: &str, 25 owner_account_id: Option<String>, 26 owner_pubkey: Option<String>, 27 farm_id: Option<String>, 28 listing_addr: Option<String>, 29 payload: Value, 30 ) -> Result<LocalEventRecord, RuntimeError> { 31 let timestamp = current_time_ms()?; 32 let sequence = RECORD_COUNTER.fetch_add(1, Ordering::Relaxed); 33 let input = LocalEventRecordInput { 34 record_id: format!("cli:local_work:{subject}:{timestamp}:{sequence}"), 35 family: LocalRecordFamily::LocalWork, 36 status: LocalRecordStatus::LocalSaved, 37 source_runtime: SourceRuntime::Cli, 38 created_at_ms: timestamp, 39 inserted_at_ms: timestamp, 40 owner_account_id, 41 owner_pubkey, 42 farm_id, 43 listing_addr, 44 local_work_json: Some(payload), 45 event_id: None, 46 event_kind: None, 47 event_pubkey: None, 48 event_created_at: None, 49 event_tags_json: None, 50 event_content: None, 51 event_sig: None, 52 raw_event_json: None, 53 outbox_status: PublishOutboxStatus::None, 54 relay_set_fingerprint: None, 55 relay_delivery_json: None, 56 }; 57 let store = open_store(config)?; 58 Ok(store.append_record(&input)?) 59 } 60 61 pub fn shared_local_events_db_path(config: &RuntimeConfig) -> Result<PathBuf, RuntimeError> { 62 shared_local_events_db_path_from_paths(&config.paths) 63 } 64 65 fn shared_local_events_db_path_from_paths(paths: &PathsConfig) -> Result<PathBuf, RuntimeError> { 66 default_shared_local_events_database_path_from_shared_accounts_data_root( 67 &paths.shared_accounts_data_root, 68 ) 69 .map_err(|err| { 70 RuntimeError::Config(format!("resolve shared local-events database path: {err}")) 71 }) 72 } 73 74 pub fn list_shared_records_latest( 75 config: &RuntimeConfig, 76 limit: u32, 77 ) -> Result<Vec<LocalEventRecord>, RuntimeError> { 78 let database_path = shared_local_events_db_path(config)?; 79 if !database_path.exists() { 80 return Ok(Vec::new()); 81 } 82 let executor = SqliteExecutor::open(database_path)?; 83 let store = LocalEventsStore::new(executor); 84 Ok(store.list_records_changed_latest(limit)?) 85 } 86 87 pub fn list_shared_records_before( 88 config: &RuntimeConfig, 89 before_change_seq: i64, 90 before_seq: i64, 91 limit: u32, 92 ) -> Result<Vec<LocalEventRecord>, RuntimeError> { 93 let database_path = shared_local_events_db_path(config)?; 94 if !database_path.exists() { 95 return Ok(Vec::new()); 96 } 97 let executor = SqliteExecutor::open(database_path)?; 98 let store = LocalEventsStore::new(executor); 99 Ok(store.list_records_changed_before(before_change_seq, before_seq, limit)?) 100 } 101 102 pub fn get_shared_record( 103 config: &RuntimeConfig, 104 record_id: &str, 105 ) -> Result<Option<LocalEventRecord>, RuntimeError> { 106 let database_path = shared_local_events_db_path(config)?; 107 if !database_path.exists() { 108 return Ok(None); 109 } 110 let executor = SqliteExecutor::open(database_path)?; 111 let store = LocalEventsStore::new(executor); 112 Ok(store.get_record(record_id)?) 113 } 114 115 fn open_store(config: &RuntimeConfig) -> Result<LocalEventsStore<SqliteExecutor>, RuntimeError> { 116 let root = shared_local_events_root_from_paths(&config.paths)?; 117 fs::create_dir_all(&root)?; 118 let executor = SqliteExecutor::open(shared_local_events_db_path_from_paths(&config.paths)?)?; 119 let store = LocalEventsStore::new(executor); 120 store.migrate_up()?; 121 Ok(store) 122 } 123 124 fn shared_local_events_root_from_paths(paths: &PathsConfig) -> Result<PathBuf, RuntimeError> { 125 default_shared_local_events_root_from_shared_accounts_data_root( 126 &paths.shared_accounts_data_root, 127 ) 128 .map_err(|err| RuntimeError::Config(format!("resolve shared local-events root: {err}"))) 129 } 130 131 #[cfg(test)] 132 mod tests { 133 use std::path::PathBuf; 134 135 use super::{shared_local_events_db_path_from_paths, shared_local_events_root_from_paths}; 136 use crate::runtime::config::PathsConfig; 137 138 #[test] 139 fn shared_local_events_paths_use_shared_runtime_contract() { 140 let paths = paths_config("/repo/infra/local/runtime/radroots/data/shared/accounts"); 141 142 assert_eq!( 143 shared_local_events_root_from_paths(&paths).expect("shared local-events root"), 144 PathBuf::from("/repo/infra/local/runtime/radroots/data/shared/local_events") 145 ); 146 assert_eq!( 147 shared_local_events_db_path_from_paths(&paths).expect("shared local-events database"), 148 PathBuf::from( 149 "/repo/infra/local/runtime/radroots/data/shared/local_events/local_events.sqlite" 150 ) 151 ); 152 } 153 154 fn paths_config(shared_accounts_data_root: &str) -> PathsConfig { 155 PathsConfig { 156 profile: "repo_local".to_owned(), 157 profile_source: "test".to_owned(), 158 allowed_profiles: vec!["repo_local".to_owned()], 159 root_source: "repo_local_root".to_owned(), 160 repo_local_root: Some(PathBuf::from("/repo/infra/local/runtime/radroots")), 161 repo_local_root_source: Some("test".to_owned()), 162 subordinate_path_override_source: "runtime_config".to_owned(), 163 app_namespace: "apps/cli".to_owned(), 164 shared_accounts_namespace: "shared/accounts".to_owned(), 165 shared_identities_namespace: "shared/identities".to_owned(), 166 app_config_path: PathBuf::from( 167 "/repo/infra/local/runtime/radroots/config/apps/cli/config.toml", 168 ), 169 workspace_config_path: None, 170 app_data_root: PathBuf::from("/repo/infra/local/runtime/radroots/data/apps/cli"), 171 app_logs_root: PathBuf::from("/repo/infra/local/runtime/radroots/logs/apps/cli"), 172 shared_accounts_data_root: PathBuf::from(shared_accounts_data_root), 173 shared_accounts_secrets_root: PathBuf::from( 174 "/repo/infra/local/runtime/radroots/secrets/shared/accounts", 175 ), 176 default_identity_path: PathBuf::from( 177 "/repo/infra/local/runtime/radroots/secrets/shared/identities/default.json", 178 ), 179 } 180 } 181 } 182 183 fn current_time_ms() -> Result<i64, RuntimeError> { 184 let duration = SystemTime::now() 185 .duration_since(UNIX_EPOCH) 186 .map_err(|error| { 187 RuntimeError::Config(format!("system clock is before unix epoch: {error}")) 188 })?; 189 i64::try_from(duration.as_millis()) 190 .map_err(|_| RuntimeError::Config("current timestamp exceeds i64 milliseconds".to_owned())) 191 }