commit dfc34f3f6f9280046ed85200e783ff74cdb8232e
parent 79b00952e272ddb1dbf7805ae77e37bab868f853
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 01:40:04 -0700
cli: add migrate command
Diffstat:
6 files changed, 410 insertions(+), 23 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -3883,6 +3883,10 @@ dependencies = [
[[package]]
name = "tangle"
version = "0.1.0"
+dependencies = [
+ "tangle_runtime",
+ "tokio",
+]
[[package]]
name = "tangle_core"
diff --git a/crates/tangle/Cargo.toml b/crates/tangle/Cargo.toml
@@ -8,5 +8,9 @@ license.workspace = true
description = "The tangle Nostr relay runtime"
readme = "../../README"
+[dependencies]
+tangle_runtime = { path = "../tangle_runtime" }
+tokio = { version = "1", features = ["rt"] }
+
[lints]
workspace = true
diff --git a/crates/tangle/src/lib.rs b/crates/tangle/src/lib.rs
@@ -5,7 +5,7 @@ use std::fmt;
pub const PACKAGE_NAME: &str = env!("CARGO_PKG_NAME");
pub const PACKAGE_VERSION: &str = env!("CARGO_PKG_VERSION");
pub const USAGE: &str = "\
-usage: tangle [--version] <command>
+usage: tangle [--version] <command> [--config PATH]
commands:
migrate
@@ -39,7 +39,30 @@ impl TangleCommand {
}
pub fn implemented(self) -> bool {
- matches!(self, Self::Version | Self::Help)
+ matches!(self, Self::Version | Self::Help | Self::Migrate)
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct TangleInvocation {
+ command: TangleCommand,
+ config_path: Option<String>,
+}
+
+impl TangleInvocation {
+ pub fn new(command: TangleCommand, config_path: Option<String>) -> Self {
+ Self {
+ command,
+ config_path,
+ }
+ }
+
+ pub fn command(&self) -> TangleCommand {
+ self.command
+ }
+
+ pub fn config_path(&self) -> Option<&str> {
+ self.config_path.as_deref()
}
}
@@ -47,6 +70,8 @@ impl TangleCommand {
pub enum TangleCliError {
UnknownCommand(String),
MissingNestedCommand(&'static str),
+ MissingOptionValue(&'static str),
+ RepeatedOption(&'static str),
UnexpectedArgument { command: String, argument: String },
}
@@ -57,6 +82,12 @@ impl fmt::Display for TangleCliError {
Self::MissingNestedCommand(command) => {
write!(formatter, "{command} command requires a nested command")
}
+ Self::MissingOptionValue(option) => {
+ write!(formatter, "{option} requires a value")
+ }
+ Self::RepeatedOption(option) => {
+ write!(formatter, "{option} must not be repeated")
+ }
Self::UnexpectedArgument { command, argument } => {
write!(
formatter,
@@ -82,9 +113,17 @@ where
I: IntoIterator<Item = S>,
S: Into<String>,
{
+ parse_tangle_invocation(args).map(|invocation| invocation.command)
+}
+
+pub fn parse_tangle_invocation<I, S>(args: I) -> Result<TangleInvocation, TangleCliError>
+where
+ I: IntoIterator<Item = S>,
+ S: Into<String>,
+{
let mut args = args.into_iter().map(Into::into);
let Some(first) = args.next() else {
- return Ok(TangleCommand::Help);
+ return Ok(TangleInvocation::new(TangleCommand::Help, None));
};
let command = match first.as_str() {
"--version" | "-V" => TangleCommand::Version,
@@ -116,21 +155,60 @@ where
}
_ => return Err(TangleCliError::UnknownCommand(first)),
};
- if let Some(argument) = args.next() {
- return Err(TangleCliError::UnexpectedArgument {
- command: command.as_str().to_owned(),
- argument,
- });
+ let mut config_path = None;
+ while let Some(argument) = args.next() {
+ match argument.as_str() {
+ "--config" => {
+ if config_path.is_some() {
+ return Err(TangleCliError::RepeatedOption("--config"));
+ }
+ let Some(path) = args.next() else {
+ return Err(TangleCliError::MissingOptionValue("--config"));
+ };
+ config_path = Some(path);
+ }
+ _ => {
+ return Err(TangleCliError::UnexpectedArgument {
+ command: command.as_str().to_owned(),
+ argument,
+ });
+ }
+ }
}
- Ok(command)
+ Ok(TangleInvocation::new(command, config_path))
+}
+
+pub fn require_config_path(invocation: &TangleInvocation) -> Result<&str, TangleCliError> {
+ invocation
+ .config_path()
+ .ok_or(TangleCliError::MissingOptionValue("--config"))
+}
+
+pub fn migrate_output(report: tangle_runtime::RuntimeMigrationReport) -> String {
+ format!(
+ "migrations applied: {}\nmigrations already applied: {}\nmigrations total: {}",
+ report.applied(),
+ report.already_applied(),
+ report.total()
+ )
+}
+
+pub async fn migrate_with_config(path: &str) -> Result<String, String> {
+ let config = tangle_runtime::load_runtime_config(path).map_err(|error| error.to_string())?;
+ let report = tangle_runtime::migrate_runtime_database(&config)
+ .await
+ .map_err(|error| error.to_string())?;
+ Ok(migrate_output(report))
}
#[cfg(test)]
mod tests {
use super::{
- PACKAGE_NAME, PACKAGE_VERSION, TangleCliError, TangleCommand, parse_tangle_command,
+ PACKAGE_NAME, PACKAGE_VERSION, TangleCliError, TangleCommand, TangleInvocation,
+ migrate_output, parse_tangle_command, parse_tangle_invocation, require_config_path,
usage_output, version_output,
};
+ use tangle_runtime::RuntimeMigrationReport;
#[test]
fn package_name_is_tangle() {
@@ -151,7 +229,7 @@ mod tests {
fn usage_output_lists_supported_command_model() {
assert_eq!(
usage_output(),
- "usage: tangle [--version] <command>\n\ncommands:\n migrate\n run\n event import\n event export\n projection rebuild"
+ "usage: tangle [--version] <command> [--config PATH]\n\ncommands:\n migrate\n run\n event import\n event export\n projection rebuild"
);
}
@@ -177,12 +255,36 @@ mod tests {
assert_eq!(parse_tangle_command(args).expect("command"), expected);
assert_eq!(
expected.implemented(),
- matches!(expected, TangleCommand::Version | TangleCommand::Help)
+ matches!(
+ expected,
+ TangleCommand::Version | TangleCommand::Help | TangleCommand::Migrate
+ )
);
}
}
#[test]
+ fn command_model_parses_common_config_option() {
+ assert_eq!(
+ parse_tangle_invocation(["migrate", "--config", "runtime.json"]).expect("invocation"),
+ TangleInvocation::new(TangleCommand::Migrate, Some("runtime.json".to_owned()))
+ );
+ assert_eq!(
+ require_config_path(&TangleInvocation::new(
+ TangleCommand::Migrate,
+ Some("runtime.json".to_owned())
+ ))
+ .expect("config"),
+ "runtime.json"
+ );
+ assert_eq!(
+ require_config_path(&TangleInvocation::new(TangleCommand::Migrate, None))
+ .expect_err("config"),
+ TangleCliError::MissingOptionValue("--config")
+ );
+ }
+
+ #[test]
fn command_model_rejects_unknown_or_extra_arguments() {
assert_eq!(
parse_tangle_command(["unknown"]).expect_err("unknown"),
@@ -203,5 +305,22 @@ mod tests {
argument: "--extra".to_owned()
}
);
+ assert_eq!(
+ parse_tangle_invocation(["migrate", "--config"]).expect_err("missing config"),
+ TangleCliError::MissingOptionValue("--config")
+ );
+ assert_eq!(
+ parse_tangle_invocation(["migrate", "--config", "a", "--config", "b"])
+ .expect_err("repeated config"),
+ TangleCliError::RepeatedOption("--config")
+ );
+ }
+
+ #[test]
+ fn migrate_output_reports_outcome_counts() {
+ assert_eq!(
+ migrate_output(RuntimeMigrationReport::new(8, 2, 10)),
+ "migrations applied: 8\nmigrations already applied: 2\nmigrations total: 10"
+ );
}
}
diff --git a/crates/tangle/src/main.rs b/crates/tangle/src/main.rs
@@ -4,15 +4,15 @@ use std::env;
use std::process::ExitCode;
fn main() -> ExitCode {
- let command = match tangle::parse_tangle_command(env::args().skip(1)) {
- Ok(command) => command,
+ let invocation = match tangle::parse_tangle_invocation(env::args().skip(1)) {
+ Ok(invocation) => invocation,
Err(error) => {
eprintln!("{error}");
eprintln!("{}", tangle::usage_output());
return ExitCode::from(2);
}
};
- match command {
+ match invocation.command() {
tangle::TangleCommand::Version => {
println!("{}", tangle::version_output());
ExitCode::SUCCESS
@@ -21,9 +21,28 @@ fn main() -> ExitCode {
println!("{}", tangle::usage_output());
ExitCode::SUCCESS
}
+ tangle::TangleCommand::Migrate => match run_migrate(&invocation) {
+ Ok(output) => {
+ println!("{output}");
+ ExitCode::SUCCESS
+ }
+ Err(error) => {
+ eprintln!("{error}");
+ ExitCode::from(2)
+ }
+ },
command => {
eprintln!("command not implemented: {}", command.as_str());
ExitCode::from(2)
}
}
}
+
+fn run_migrate(invocation: &tangle::TangleInvocation) -> Result<String, String> {
+ let config_path = tangle::require_config_path(invocation).map_err(|error| error.to_string())?;
+ let runtime = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .map_err(|error| format!("failed to start runtime: {error}"))?;
+ runtime.block_on(tangle::migrate_with_config(config_path))
+}
diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs
@@ -23,7 +23,7 @@ fn tangle_without_args_reports_usage() {
assert!(output.status.success());
assert_eq!(
String::from_utf8_lossy(&output.stdout),
- "usage: tangle [--version] <command>\n\ncommands:\n migrate\n run\n event import\n event export\n projection rebuild\n"
+ "usage: tangle [--version] <command> [--config PATH]\n\ncommands:\n migrate\n run\n event import\n event export\n projection rebuild\n"
);
assert!(output.stderr.is_empty());
}
@@ -39,7 +39,65 @@ fn tangle_unknown_arg_reports_usage_error() {
assert!(output.stdout.is_empty());
assert_eq!(
String::from_utf8_lossy(&output.stderr),
- "unknown command: --unknown\nusage: tangle [--version] <command>\n\ncommands:\n migrate\n run\n event import\n event export\n projection rebuild\n"
+ "unknown command: --unknown\nusage: tangle [--version] <command> [--config PATH]\n\ncommands:\n migrate\n run\n event import\n event export\n projection rebuild\n"
+ );
+}
+
+#[test]
+fn tangle_migrate_command_applies_configured_migrations() {
+ let path = std::env::temp_dir().join(format!("tangle-cli-migrate-{}.json", std::process::id()));
+ std::fs::write(
+ &path,
+ r#"{
+ "server": {
+ "listen_addr": "127.0.0.1:7400",
+ "relay_url": "ws://127.0.0.1:7400"
+ },
+ "database": {
+ "mode": "memory",
+ "namespace": "tangle_cli_migrate",
+ "database": "relay"
+ },
+ "auth": {
+ "challenge_ttl_seconds": 300
+ },
+ "limits": {
+ "message_rate_limit": {
+ "limit": 120,
+ "window_seconds": 60
+ }
+ }
+ }"#,
+ )
+ .expect("write config");
+
+ let output = Command::new(env!("CARGO_BIN_EXE_tangle"))
+ .args(["migrate", "--config"])
+ .arg(&path)
+ .output()
+ .expect("run tangle migrate");
+ std::fs::remove_file(&path).expect("remove config");
+
+ assert!(output.status.success());
+ assert_eq!(
+ String::from_utf8_lossy(&output.stdout),
+ "migrations applied: 10\nmigrations already applied: 0\nmigrations total: 10\n"
+ );
+ assert!(output.stderr.is_empty());
+}
+
+#[test]
+fn tangle_migrate_requires_config_path() {
+ let output = Command::new(env!("CARGO_BIN_EXE_tangle"))
+ .arg("migrate")
+ .output()
+ .expect("run tangle migrate without config");
+
+ assert_eq!(output.status.code(), Some(2));
+ assert!(output.stdout.is_empty());
+ assert_eq!(
+ String::from_utf8_lossy(&output.stderr),
+ "--config requires a value\n"
);
}
diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs
@@ -25,7 +25,8 @@ use tangle_protocol::{
};
use tangle_store::{StoreEventOutcome, StoredEvent};
use tangle_store_surreal::{
- ListingProjectionQuery, SearchDocumentQuery, SurrealConnectionConfig, SurrealStore,
+ ListingProjectionQuery, MigrationApplyOutcome, SearchDocumentQuery, SurrealConnectionConfig,
+ SurrealConnectionMode, SurrealStore, base_migration_plan,
};
use url::form_urlencoded;
@@ -321,6 +322,110 @@ pub fn parse_runtime_config_json(raw: &str) -> Result<TangleRuntimeConfig, Runti
runtime_config_from_document(document)
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct RuntimeMigrationReport {
+ applied: u64,
+ already_applied: u64,
+ total: u64,
+}
+
+impl RuntimeMigrationReport {
+ pub fn new(applied: u64, already_applied: u64, total: u64) -> Self {
+ Self {
+ applied,
+ already_applied,
+ total,
+ }
+ }
+
+ pub fn applied(self) -> u64 {
+ self.applied
+ }
+
+ pub fn already_applied(self) -> u64 {
+ self.already_applied
+ }
+
+ pub fn total(self) -> u64 {
+ self.total
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum RuntimeCommandErrorKind {
+ Unsupported,
+ Store,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RuntimeCommandError {
+ kind: RuntimeCommandErrorKind,
+ message: String,
+}
+
+impl RuntimeCommandError {
+ pub fn new(kind: RuntimeCommandErrorKind, message: impl Into<String>) -> Self {
+ Self {
+ kind,
+ message: message.into(),
+ }
+ }
+
+ pub fn unsupported(message: impl Into<String>) -> Self {
+ Self::new(RuntimeCommandErrorKind::Unsupported, message)
+ }
+
+ pub fn store(message: impl Into<String>) -> Self {
+ Self::new(RuntimeCommandErrorKind::Store, message)
+ }
+
+ pub fn kind(&self) -> RuntimeCommandErrorKind {
+ self.kind
+ }
+
+ pub fn message(&self) -> &str {
+ &self.message
+ }
+}
+
+impl fmt::Display for RuntimeCommandError {
+ fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(formatter, "{:?}: {}", self.kind, self.message)
+ }
+}
+
+impl std::error::Error for RuntimeCommandError {}
+
+pub async fn migrate_runtime_database(
+ config: &TangleRuntimeConfig,
+) -> Result<RuntimeMigrationReport, RuntimeCommandError> {
+ if config.database_config().mode() != &SurrealConnectionMode::Memory {
+ return Err(RuntimeCommandError::unsupported(
+ "migrate currently supports memory SurrealDB configs only",
+ ));
+ }
+ let store = SurrealStore::connect_memory(config.database_config())
+ .await
+ .map_err(|error| RuntimeCommandError::store(error.to_string()))?;
+ let outcomes = store
+ .apply_plan(&base_migration_plan())
+ .await
+ .map_err(|error| RuntimeCommandError::store(error.to_string()))?;
+ let applied = outcomes
+ .iter()
+ .filter(|outcome| **outcome == MigrationApplyOutcome::Applied)
+ .count() as u64;
+ let already_applied = outcomes
+ .iter()
+ .filter(|outcome| **outcome == MigrationApplyOutcome::AlreadyApplied)
+ .count() as u64;
+ Ok(RuntimeMigrationReport::new(
+ applied,
+ already_applied,
+ outcomes.len() as u64,
+ ))
+}
+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
struct RuntimeConfigDocument {
server: RuntimeServerConfigDocument,
@@ -2137,11 +2242,12 @@ mod tests {
ClientFrameOutcome, ClientMessageLoop, CloseMessageHandler, CloseMessageOutcome,
EventMessageHandler, GracefulShutdownSignal, ListingsHttpState, LiveEventFanout,
ReadinessCheckStatus, ReadinessState, RelayConnection, RelayConnectionConfig,
- RelayConnectionId, RelayInfoDocument, ReqMessageHandler, RuntimeConfigErrorKind,
- TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS, WebSocketHttpState, health_router,
- listing_item_document, listing_projection_query, listings_router, load_runtime_config,
- parse_listing_query, parse_marketplace_search_query, parse_runtime_config_json,
- relay_info_router, search_document_query, websocket_router,
+ RelayConnectionId, RelayInfoDocument, ReqMessageHandler, RuntimeCommandErrorKind,
+ RuntimeConfigErrorKind, TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS, WebSocketHttpState,
+ health_router, listing_item_document, listing_projection_query, listings_router,
+ load_runtime_config, migrate_runtime_database, parse_listing_query,
+ parse_marketplace_search_query, parse_runtime_config_json, relay_info_router,
+ search_document_query, websocket_router,
};
use axum::{body::Body, response::IntoResponse};
use http::{HeaderValue, Request, StatusCode, header};
@@ -2569,6 +2675,83 @@ mod tests {
}
#[tokio::test]
+ async fn runtime_migration_command_applies_memory_database_plan() {
+ let config = parse_runtime_config_json(
+ r#"{
+ "server": {
+ "listen_addr": "127.0.0.1:7300",
+ "relay_url": "ws://127.0.0.1:7300"
+ },
+ "database": {
+ "mode": "memory",
+ "namespace": "tangle_migrate",
+ "database": "relay"
+ },
+ "auth": {
+ "challenge_ttl_seconds": 300
+ },
+ "limits": {
+ "message_rate_limit": {
+ "limit": 120,
+ "window_seconds": 60
+ }
+ }
+ }"#,
+ )
+ .expect("runtime config");
+
+ let report = migrate_runtime_database(&config).await.expect("migrate");
+
+ assert_eq!(
+ report.applied(),
+ base_migration_plan().migrations().len() as u64
+ );
+ assert_eq!(report.already_applied(), 0);
+ assert_eq!(
+ report.total(),
+ base_migration_plan().migrations().len() as u64
+ );
+ }
+
+ #[tokio::test]
+ async fn runtime_migration_command_rejects_remote_database_modes() {
+ let config = parse_runtime_config_json(
+ r#"{
+ "server": {
+ "listen_addr": "127.0.0.1:7301",
+ "relay_url": "ws://127.0.0.1:7301"
+ },
+ "database": {
+ "mode": "http",
+ "endpoint": "http://127.0.0.1:8000",
+ "namespace": "tangle",
+ "database": "relay"
+ },
+ "auth": {
+ "challenge_ttl_seconds": 300
+ },
+ "limits": {
+ "message_rate_limit": {
+ "limit": 120,
+ "window_seconds": 60
+ }
+ }
+ }"#,
+ )
+ .expect("runtime config");
+
+ let error = migrate_runtime_database(&config)
+ .await
+ .expect_err("remote unsupported");
+
+ assert_eq!(error.kind(), RuntimeCommandErrorKind::Unsupported);
+ assert_eq!(
+ error.message(),
+ "migrate currently supports memory SurrealDB configs only"
+ );
+ }
+
+ #[tokio::test]
async fn websocket_route_requires_upgrade_headers() {
let response = websocket_router(WebSocketHttpState::default())
.oneshot(