tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 0f35b05c789b61458532d27504e5e88ba44cbaed
parent abc9147a12c11a457b8eff1b7ec2de076a63bab9
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 02:24:02 -0700

runtime: add async serve skeleton

- add a Tokio serve-until-shutdown loop around the runtime owner
- expose serve reports with bound address and shutdown subscription counts
- wire the CLI binary onto Tokio without changing the current probe command behavior
- verify formatting, runtime tests, CLI smoke tests, workspace checks, and clippy

Diffstat:
MCargo.lock | 1+
Mcrates/tangle/Cargo.toml | 1+
Mcrates/tangle/src/main.rs | 15+++++++--------
Mcrates/tangle_runtime/src/lib.rs | 1+
Mcrates/tangle_runtime/src/runtime.rs | 2+-
Acrates/tangle_runtime/src/server.rs | 124+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 135 insertions(+), 9 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -1210,6 +1210,7 @@ dependencies = [ "tangle_protocol", "tangle_runtime", "tangle_test_support", + "tokio", ] [[package]] diff --git a/crates/tangle/Cargo.toml b/crates/tangle/Cargo.toml @@ -10,6 +10,7 @@ readme = "../../README" [dependencies] tangle_runtime = { path = "../tangle_runtime" } +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } [dev-dependencies] serde_json = "1" diff --git a/crates/tangle/src/main.rs b/crates/tangle/src/main.rs @@ -3,7 +3,8 @@ use std::env; use std::process::ExitCode; -fn main() -> ExitCode { +#[tokio::main] +async fn main() -> ExitCode { let invocation = match tangle::parse_tangle_invocation(env::args().skip(1)) { Ok(invocation) => invocation, Err(error) => { @@ -21,7 +22,7 @@ fn main() -> ExitCode { println!("{}", tangle::usage_output()); ExitCode::SUCCESS } - tangle::TangleCommand::Run => match run_server(&invocation) { + tangle::TangleCommand::Run => match run_server(&invocation).await { Ok(output) => { println!("{output}"); ExitCode::SUCCESS @@ -34,20 +35,18 @@ fn main() -> ExitCode { } } -fn run_server(invocation: &tangle::TangleInvocation) -> Result<String, String> { +async fn run_server(invocation: &tangle::TangleInvocation) -> Result<String, String> { let config_path = tangle::require_config_path(invocation).map_err(|error| error.to_string())?; tangle::run_with_config(config_path) } #[cfg(test)] mod tests { - use super::run_server; - - #[test] - fn command_runner_reports_missing_config_in_process() { + #[tokio::test] + async fn command_runner_reports_missing_config_in_process() { let run = tangle::TangleInvocation::new(tangle::TangleCommand::Run, None); assert_eq!( - run_server(&run).expect_err("run config"), + super::run_server(&run).await.expect_err("run config"), "--config requires a value" ); } diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -10,6 +10,7 @@ pub mod ops; pub(crate) mod pocket_conversion; pub mod relay; pub mod runtime; +pub mod server; use std::{fmt, fs, path::Path, path::PathBuf}; diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -226,7 +226,7 @@ impl TangleShutdownSignal { } pub fn request_shutdown(&self) { - let _ = self.sender.send(true); + self.sender.send_replace(true); } pub fn requested(&self) -> bool { diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs @@ -0,0 +1,124 @@ +#![forbid(unsafe_code)] + +use crate::{errors::BaseRelayError, runtime::TangleRuntime}; +use std::net::SocketAddr; +use tokio::net::TcpListener; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TangleServeReport { + listen_addr: SocketAddr, + closed_subscriptions: usize, +} + +impl TangleServeReport { + pub fn new(listen_addr: SocketAddr, closed_subscriptions: usize) -> Self { + Self { + listen_addr, + closed_subscriptions, + } + } + + pub fn listen_addr(self) -> SocketAddr { + self.listen_addr + } + + pub fn closed_subscriptions(self) -> usize { + self.closed_subscriptions + } +} + +pub async fn serve_until_shutdown( + mut runtime: TangleRuntime, +) -> Result<TangleServeReport, BaseRelayError> { + let listener = TcpListener::bind(runtime.config().listen_addr()) + .await + .map_err(|error| BaseRelayError::error(error.to_string()))?; + let listen_addr = listener + .local_addr() + .map_err(|error| BaseRelayError::error(error.to_string()))?; + let mut shutdown = runtime.shutdown_signal().subscribe(); + loop { + if *shutdown.borrow() { + break; + } + tokio::select! { + accept = listener.accept() => { + let (_stream, _peer_addr) = accept.map_err(|error| BaseRelayError::error(error.to_string()))?; + } + changed = shutdown.changed() => { + changed.map_err(|error| BaseRelayError::error(error.to_string()))?; + if *shutdown.borrow() { + break; + } + } + } + } + let shutdown = runtime.shutdown()?; + Ok(TangleServeReport::new( + listen_addr, + shutdown.closed_subscriptions(), + )) +} + +#[cfg(test)] +mod tests { + use super::serve_until_shutdown; + use crate::{ + config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, + runtime::TangleRuntime, + }; + use serde_json::json; + use std::path::{Path, PathBuf}; + + #[tokio::test] + async fn serve_until_shutdown_binds_listener_and_exits_on_signal() { + let root = temp_root("serve-until-shutdown"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root)).expect("runtime"); + let shutdown = runtime.shutdown_signal().clone(); + let task = tokio::spawn(serve_until_shutdown(runtime)); + + tokio::task::yield_now().await; + shutdown.request_shutdown(); + + let report = task.await.expect("task").expect("serve"); + assert_eq!(report.listen_addr().ip().to_string(), "127.0.0.1"); + assert_ne!(report.listen_addr().port(), 0); + assert_eq!(report.closed_subscriptions(), 0); + + let _ = std::fs::remove_dir_all(root); + } + + fn runtime_config(root: &Path) -> BaseRelayRuntimeConfig { + let raw = json!({ + "server": { + "listen_addr": "127.0.0.1:0", + "relay_url": "wss://relay.radroots.test" + }, + "pocket": { + "data_directory": root.join("pocket"), + "map_size_bytes": 1073741824_u64, + "reader_slots": 128, + "sync_policy": "flush_on_shutdown" + }, + "groups": { + "enabled": true, + "canonical_relay_url": "wss://relay.radroots.test", + "relay_secret": "7777777777777777777777777777777777777777777777777777777777777777", + "owner_pubkeys": ["0202020202020202020202020202020202020202020202020202020202020202"] + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "max_pending_events": 8 + } + }) + .to_string(); + parse_base_relay_runtime_config_json(&raw).expect("config") + } + + fn temp_root(name: &str) -> PathBuf { + std::env::temp_dir().join(format!("tangle-server-{name}-{}", std::process::id())) + } +}