commit 2df1abc31e36816cfbbe7a1a3e8fa6d4950c94a8
parent 3258c607fd16ce83a49dc03115294326b1d09851
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 02:08:42 -0700
cli: add projection rebuild command
Diffstat:
4 files changed, 259 insertions(+), 57 deletions(-)
diff --git a/crates/tangle/src/lib.rs b/crates/tangle/src/lib.rs
@@ -46,6 +46,7 @@ impl TangleCommand {
| Self::Run
| Self::EventImport
| Self::EventExport
+ | Self::ProjectionRebuild
)
}
}
@@ -283,6 +284,16 @@ pub fn event_export_output(report: tangle_runtime::RuntimeEventExportReport) ->
format!("events exported: {}", report.exported())
}
+pub fn projection_rebuild_output(report: tangle_runtime::RuntimeProjectionRebuildReport) -> String {
+ format!(
+ "events scanned: {}\nevents rebuilt: {}\nlistings projected: {}\nevents skipped: {}",
+ report.scanned(),
+ report.rebuilt(),
+ report.projected(),
+ report.skipped()
+ )
+}
+
pub async fn migrate_with_config(path: &str) -> Result<String, String> {
let config = tangle_runtime::load_runtime_config(path).map_err(|error| error.to_string())?;
let report = tangle_runtime::migrate_runtime_database(&config)
@@ -315,6 +326,15 @@ pub async fn event_export_with_config(
Ok(event_export_output(report))
}
+pub async fn projection_rebuild_with_config(config_path: &str) -> Result<String, String> {
+ let config =
+ tangle_runtime::load_runtime_config(config_path).map_err(|error| error.to_string())?;
+ let report = tangle_runtime::rebuild_projections(&config)
+ .await
+ .map_err(|error| error.to_string())?;
+ Ok(projection_rebuild_output(report))
+}
+
pub async fn run_with_config(path: &str) -> Result<(), String> {
let config = tangle_runtime::load_runtime_config(path).map_err(|error| error.to_string())?;
let (shutdown, _) = tangle_runtime::GracefulShutdownSignal::new();
@@ -335,11 +355,12 @@ mod tests {
use super::{
PACKAGE_NAME, PACKAGE_VERSION, TangleCliError, TangleCommand, TangleInvocation,
event_export_output, event_import_output, migrate_output, parse_tangle_command,
- parse_tangle_invocation, require_config_path, require_input_path, require_output_path,
- usage_output, version_output,
+ parse_tangle_invocation, projection_rebuild_output, require_config_path,
+ require_input_path, require_output_path, usage_output, version_output,
};
use tangle_runtime::{
RuntimeEventExportReport, RuntimeEventImportReport, RuntimeMigrationReport,
+ RuntimeProjectionRebuildReport,
};
#[test]
@@ -395,6 +416,7 @@ mod tests {
| TangleCommand::Run
| TangleCommand::EventImport
| TangleCommand::EventExport
+ | TangleCommand::ProjectionRebuild
)
);
}
@@ -562,4 +584,12 @@ mod tests {
"events exported: 3"
);
}
+
+ #[test]
+ fn projection_rebuild_output_reports_outcome_counts() {
+ assert_eq!(
+ projection_rebuild_output(RuntimeProjectionRebuildReport::new(4, 3, 2, 1)),
+ "events scanned: 4\nevents rebuilt: 3\nlistings projected: 2\nevents skipped: 1"
+ );
+ }
}
diff --git a/crates/tangle/src/main.rs b/crates/tangle/src/main.rs
@@ -58,10 +58,16 @@ fn main() -> ExitCode {
ExitCode::from(2)
}
},
- command => {
- eprintln!("command not implemented: {}", command.as_str());
- ExitCode::from(2)
- }
+ tangle::TangleCommand::ProjectionRebuild => match run_projection_rebuild(&invocation) {
+ Ok(output) => {
+ println!("{output}");
+ ExitCode::SUCCESS
+ }
+ Err(error) => {
+ eprintln!("{error}");
+ ExitCode::from(2)
+ }
+ },
}
}
@@ -102,3 +108,12 @@ fn run_event_export(invocation: &tangle::TangleInvocation) -> Result<String, Str
.map_err(|error| format!("failed to start runtime: {error}"))?;
runtime.block_on(tangle::event_export_with_config(config_path, output_path))
}
+
+fn run_projection_rebuild(invocation: &tangle::TangleInvocation) -> Result<String, String> {
+ let config_path = tangle::require_config_path(invocation).map_err(|error| error.to_string())?;
+ let runtime = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .map_err(|error| format!("failed to start runtime: {error}"))?;
+ runtime.block_on(tangle::projection_rebuild_with_config(config_path))
+}
diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs
@@ -157,6 +157,32 @@ async fn tangle_event_import_command_imports_canonical_jsonl() {
format!("{}\n", event_to_value(&listing))
);
+ let rebuild = Command::new(env!("CARGO_BIN_EXE_tangle"))
+ .args(["projection", "rebuild", "--config"])
+ .arg(&config_path)
+ .output()
+ .expect("run tangle projection rebuild");
+
+ assert!(rebuild.status.success());
+ assert_eq!(
+ String::from_utf8_lossy(&rebuild.stdout),
+ "events scanned: 1\nevents rebuilt: 1\nlistings projected: 1\nevents skipped: 0\n"
+ );
+ assert!(rebuild.stderr.is_empty());
+
+ let second_rebuild = Command::new(env!("CARGO_BIN_EXE_tangle"))
+ .args(["projection", "rebuild", "--config"])
+ .arg(&config_path)
+ .output()
+ .expect("rerun tangle projection rebuild");
+
+ assert!(second_rebuild.status.success());
+ assert_eq!(
+ String::from_utf8_lossy(&second_rebuild.stdout),
+ "events scanned: 1\nevents rebuilt: 1\nlistings projected: 1\nevents skipped: 0\n"
+ );
+ assert!(second_rebuild.stderr.is_empty());
+
let seller = FixtureKey::Seller.public_key();
let listing_key = format!("30402:{}:listing-a", seller.as_str());
let store_config = SurrealConnectionConfig::rocksdb(
@@ -208,17 +234,17 @@ fn tangle_migrate_requires_config_path() {
}
#[test]
-fn tangle_known_future_commands_report_not_implemented() {
+fn tangle_projection_rebuild_requires_config_path() {
let output = Command::new(env!("CARGO_BIN_EXE_tangle"))
.args(["projection", "rebuild"])
.output()
- .expect("run tangle projection rebuild");
+ .expect("run tangle projection rebuild without config");
assert_eq!(output.status.code(), Some(2));
assert!(output.stdout.is_empty());
assert_eq!(
String::from_utf8_lossy(&output.stderr),
- "command not implemented: projection rebuild\n"
+ "--config requires a value\n"
);
}
diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs
@@ -563,26 +563,34 @@ async fn import_single_event(
if raw_outcome == StoreEventOutcome::Duplicate {
return Ok(RuntimeEventImportOutcome::Duplicate);
}
- if store.index_event_tags(&event).await.is_err()
- || store.maintain_current_event(&event).await.is_err()
- || store.apply_deletion_markers(&event).await.is_err()
- || store.store_listing_revision(&event, now).await.is_err()
+ let projected =
+ project_stored_event(store, &event, validated.admission().effect(), now).await?;
+ Ok(RuntimeEventImportOutcome::Inserted { projected })
+}
+
+async fn project_stored_event(
+ store: &SurrealStore,
+ event: &Event,
+ effect: AdmissionEffect,
+ now: UnixTimestamp,
+) -> Result<bool, RuntimeCommandError> {
+ if store.index_event_tags(event).await.is_err()
+ || store.maintain_current_event(event).await.is_err()
+ || store.apply_deletion_markers(event).await.is_err()
+ || store.store_listing_revision(event, now).await.is_err()
{
return Err(RuntimeCommandError::store("event projection failed"));
}
- let projected =
- if validated.admission().effect() == AdmissionEffect::StoreRawAndProjectPublicListing {
- if store.project_current_listing(&event, now).await.is_err()
- || store.project_listing_helpers(&event).await.is_err()
- || store.index_listing_search_document(&event).await.is_err()
- {
- return Err(RuntimeCommandError::store("event projection failed"));
- }
- true
- } else {
- false
- };
- Ok(RuntimeEventImportOutcome::Inserted { projected })
+ if effect == AdmissionEffect::StoreRawAndProjectPublicListing {
+ if store.project_current_listing(event, now).await.is_err()
+ || store.project_listing_helpers(event).await.is_err()
+ || store.index_listing_search_document(event).await.is_err()
+ {
+ return Err(RuntimeCommandError::store("event projection failed"));
+ }
+ return Ok(true);
+ }
+ Ok(false)
}
fn parse_event_import_document(raw: &str) -> Result<Vec<Event>, RuntimeCommandError> {
@@ -692,6 +700,121 @@ fn runtime_row_string(
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct RuntimeProjectionRebuildReport {
+ scanned: u64,
+ rebuilt: u64,
+ projected: u64,
+ skipped: u64,
+}
+
+impl RuntimeProjectionRebuildReport {
+ pub fn new(scanned: u64, rebuilt: u64, projected: u64, skipped: u64) -> Self {
+ Self {
+ scanned,
+ rebuilt,
+ projected,
+ skipped,
+ }
+ }
+
+ pub fn scanned(self) -> u64 {
+ self.scanned
+ }
+
+ pub fn rebuilt(self) -> u64 {
+ self.rebuilt
+ }
+
+ pub fn projected(self) -> u64 {
+ self.projected
+ }
+
+ pub fn skipped(self) -> u64 {
+ self.skipped
+ }
+
+ fn record(&mut self, outcome: RuntimeProjectionRebuildOutcome) {
+ self.scanned += 1;
+ match outcome {
+ RuntimeProjectionRebuildOutcome::Rebuilt { projected } => {
+ self.rebuilt += 1;
+ if projected {
+ self.projected += 1;
+ }
+ }
+ RuntimeProjectionRebuildOutcome::Skipped => {
+ self.skipped += 1;
+ }
+ }
+ }
+}
+
+impl Default for RuntimeProjectionRebuildReport {
+ fn default() -> Self {
+ Self::new(0, 0, 0, 0)
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum RuntimeProjectionRebuildOutcome {
+ Rebuilt { projected: bool },
+ Skipped,
+}
+
+pub async fn rebuild_projections(
+ config: &TangleRuntimeConfig,
+) -> Result<RuntimeProjectionRebuildReport, RuntimeCommandError> {
+ let store = connect_runtime_store(config).await?;
+ store
+ .apply_plan(&base_migration_plan())
+ .await
+ .map_err(|error| RuntimeCommandError::store(error.to_string()))?;
+ let rows = store
+ .query_raw_events(&Filter::empty())
+ .await
+ .map_err(|error| RuntimeCommandError::store(error.to_string()))?;
+ let validator = EventValidator::new(
+ config.limits(),
+ config
+ .admission_policy()
+ .clone()
+ .with_write_auth_required(false),
+ );
+ let now = now_timestamp();
+ let mut report = RuntimeProjectionRebuildReport::default();
+ for row in rows {
+ let raw = RawEventJson::new(&runtime_row_string(&row, "raw_json")?)
+ .map_err(|error| RuntimeCommandError::store(error.to_string()))?;
+ let event = parse_event_json(&raw)
+ .map_err(|error| RuntimeCommandError::store(error.to_string()))?;
+ let outcome = rebuild_single_event_projection(&store, &validator, event, now).await?;
+ report.record(outcome);
+ }
+ Ok(report)
+}
+
+async fn rebuild_single_event_projection(
+ store: &SurrealStore,
+ validator: &EventValidator,
+ event: Event,
+ now: UnixTimestamp,
+) -> Result<RuntimeProjectionRebuildOutcome, RuntimeCommandError> {
+ let validated = match validator.validate(&event, &AdmissionContext::unauthenticated(), now) {
+ Ok(validated) => validated,
+ Err(_) => return Ok(RuntimeProjectionRebuildOutcome::Skipped),
+ };
+ if validated.admission().effect() == AdmissionEffect::AuthenticateOnly {
+ return Ok(RuntimeProjectionRebuildOutcome::Skipped);
+ }
+ if event.unsigned().kind().is_ephemeral() {
+ return Ok(RuntimeProjectionRebuildOutcome::Skipped);
+ }
+ let projected =
+ project_stored_event(store, &event, validated.admission().effect(), now).await?;
+ Ok(RuntimeProjectionRebuildOutcome::Rebuilt { projected })
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RuntimeServerReport {
listen_addr: SocketAddr,
}
@@ -956,11 +1079,13 @@ async fn handle_websocket(mut socket: WebSocket, state: RuntimeRelayState) {
if handle_client_message(
&mut socket,
&mut loop_state,
- &event_handler,
- &auth_handler,
- &req_handler,
- &close_handler,
- &state.event_tx,
+ ClientMessageHandlers {
+ event: &event_handler,
+ auth: &auth_handler,
+ req: &req_handler,
+ close: &close_handler,
+ event_tx: &state.event_tx,
+ },
message,
)
.await
@@ -982,20 +1107,26 @@ async fn handle_websocket(mut socket: WebSocket, state: RuntimeRelayState) {
}
}
+#[derive(Clone, Copy)]
+struct ClientMessageHandlers<'a> {
+ event: &'a EventMessageHandler,
+ auth: &'a AuthMessageHandler,
+ req: &'a ReqMessageHandler,
+ close: &'a CloseMessageHandler,
+ event_tx: &'a broadcast::Sender<Event>,
+}
+
async fn handle_client_message(
socket: &mut WebSocket,
loop_state: &mut ClientMessageLoop,
- event_handler: &EventMessageHandler,
- auth_handler: &AuthMessageHandler,
- req_handler: &ReqMessageHandler,
- close_handler: &CloseMessageHandler,
- event_tx: &broadcast::Sender<Event>,
+ handlers: ClientMessageHandlers<'_>,
message: ClientMessage,
) -> Result<(), axum::Error> {
match message {
ClientMessage::Event(event) => {
let accepted_event = event.clone();
- let response = event_handler
+ let response = handlers
+ .event
.handle_event(
loop_state.connection(),
event,
@@ -1006,11 +1137,11 @@ async fn handle_client_message(
let accepted = matches!(response, RelayMessage::Ok { accepted: true, .. });
send_relay_message(socket, &response).await?;
if accepted {
- let _ = event_tx.send(accepted_event);
+ let _ = handlers.event_tx.send(accepted_event);
}
}
ClientMessage::Auth(event) => {
- let response = auth_handler.handle_auth(
+ let response = handlers.auth.handle_auth(
loop_state.connection_mut(),
event.clone(),
event.unsigned().created_at(),
@@ -1021,7 +1152,8 @@ async fn handle_client_message(
subscription_id,
filters,
} => {
- for response in req_handler
+ for response in handlers
+ .req
.handle_req(loop_state.connection_mut(), subscription_id, filters)
.await
{
@@ -1029,7 +1161,9 @@ async fn handle_client_message(
}
}
ClientMessage::Close(subscription_id) => {
- close_handler.handle_close(loop_state.connection_mut(), &subscription_id);
+ handlers
+ .close
+ .handle_close(loop_state.connection_mut(), &subscription_id);
}
}
Ok(())
@@ -3140,7 +3274,7 @@ mod tests {
.expires_at,
UnixTimestamp::new(130)
);
- assert_eq!(decision.allowed(), true);
+ assert!(decision.allowed());
assert_eq!(decision.remaining(), 1);
assert_eq!(connection.rate_limiter().tracked_key_count(), 1);
assert_eq!(connection.subscriptions_mut().active_count(), 0);
@@ -3163,11 +3297,8 @@ mod tests {
default_state.connection_config().relay_url(),
"wss://relay.radroots.test"
);
- assert_eq!(state.shutdown_signal().is_shutdown_requested(), false);
- assert_eq!(
- default_state.shutdown_signal().is_shutdown_requested(),
- false
- );
+ assert!(!state.shutdown_signal().is_shutdown_requested());
+ assert!(!default_state.shutdown_signal().is_shutdown_requested());
}
#[tokio::test]
@@ -3175,17 +3306,17 @@ mod tests {
let (shutdown, mut first) = GracefulShutdownSignal::new();
let mut second = shutdown.subscribe();
- assert_eq!(shutdown.is_shutdown_requested(), false);
- assert_eq!(first.is_shutdown_requested(), false);
- assert_eq!(second.is_shutdown_requested(), false);
+ assert!(!shutdown.is_shutdown_requested());
+ assert!(!first.is_shutdown_requested());
+ assert!(!second.is_shutdown_requested());
- assert_eq!(shutdown.request_shutdown(), true);
+ assert!(shutdown.request_shutdown());
first.wait_for_shutdown().await;
second.wait_for_shutdown().await;
- assert_eq!(shutdown.is_shutdown_requested(), true);
- assert_eq!(first.is_shutdown_requested(), true);
- assert_eq!(second.is_shutdown_requested(), true);
+ assert!(shutdown.is_shutdown_requested());
+ assert!(first.is_shutdown_requested());
+ assert!(second.is_shutdown_requested());
}
#[test]
@@ -4077,8 +4208,8 @@ mod tests {
assert_eq!(relay_info.supported_nips, TANGLE_SUPPORTED_NIPS);
assert_eq!(relay_info.software, TANGLE_RELAY_SOFTWARE);
assert_eq!(relay_info.version, "0.1.0");
- assert_eq!(relay_info.limitation.payment_required, false);
- assert_eq!(relay_info.limitation.restricted_writes, true);
+ assert!(!relay_info.limitation.payment_required);
+ assert!(relay_info.limitation.restricted_writes);
assert_eq!(
serde_json::to_value(relay_info).expect("json"),
serde_json::json!({
@@ -4443,7 +4574,7 @@ mod tests {
assert_eq!(browse.limit(), 50);
let query = search_document_query(&text);
- assert_eq!(format!("{query:?}").contains("SearchDocumentQuery"), true);
+ assert!(format!("{query:?}").contains("SearchDocumentQuery"));
}
#[test]