lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit 85f6775af36d6ef2d0aa5704a81a1888002caee7
parent 91448e424f8a78d569c671261e2dcc6101b5f86d
Author: triesap <tyson@radroots.org>
Date:   Sat, 28 Mar 2026 01:17:44 +0000

simplex: add agent proto store and runtime crates

- add simplex-agent-proto with typed envelopes, receipts, and queue rotation codecs
- add simplex-agent-store for connection, queue, outbox, and delivery cursor state
- add simplex-agent-runtime with create join allow subscribe ack reconnect and retry flows
- register the new crates in the rr-rs workspace and cover them with cargo tests

Diffstat:
MCargo.lock | 28++++++++++++++++++++++++++++
MCargo.toml | 6++++++
Acrates/simplex-agent-proto/Cargo.toml | 22++++++++++++++++++++++
Acrates/simplex-agent-proto/src/codec.rs | 722+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/simplex-agent-proto/src/error.rs | 45+++++++++++++++++++++++++++++++++++++++++++++
Acrates/simplex-agent-proto/src/lib.rs | 30++++++++++++++++++++++++++++++
Acrates/simplex-agent-proto/src/model.rs | 137+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/simplex-agent-runtime/Cargo.toml | 33+++++++++++++++++++++++++++++++++
Acrates/simplex-agent-runtime/src/error.rs | 44++++++++++++++++++++++++++++++++++++++++++++
Acrates/simplex-agent-runtime/src/lib.rs | 14++++++++++++++
Acrates/simplex-agent-runtime/src/runtime.rs | 656+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/simplex-agent-runtime/src/types.rs | 54++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/simplex-agent-store/Cargo.toml | 22++++++++++++++++++++++
Acrates/simplex-agent-store/src/error.rs | 29+++++++++++++++++++++++++++++
Acrates/simplex-agent-store/src/lib.rs | 17+++++++++++++++++
Acrates/simplex-agent-store/src/store.rs | 402+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
16 files changed, 2261 insertions(+), 0 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -2416,6 +2416,34 @@ dependencies = [ ] [[package]] +name = "radroots-simplex-agent-proto" +version = "0.1.0-alpha.1" +dependencies = [ + "radroots-simplex-smp-crypto", + "radroots-simplex-smp-proto", +] + +[[package]] +name = "radroots-simplex-agent-runtime" +version = "0.1.0-alpha.1" +dependencies = [ + "radroots-simplex-agent-proto", + "radroots-simplex-agent-store", + "radroots-simplex-smp-crypto", + "radroots-simplex-smp-proto", + "radroots-simplex-smp-transport", + "sha2", +] + +[[package]] +name = "radroots-simplex-agent-store" +version = "0.1.0-alpha.1" +dependencies = [ + "radroots-simplex-agent-proto", + "radroots-simplex-smp-proto", +] + +[[package]] name = "radroots-simplex-chat-proto" version = "0.1.0-alpha.1" dependencies = [ diff --git a/Cargo.toml b/Cargo.toml @@ -17,6 +17,9 @@ members = [ "crates/nostr-ndb", "crates/nostr-runtime", "crates/runtime", + "crates/simplex-agent-proto", + "crates/simplex-agent-runtime", + "crates/simplex-agent-store", "crates/simplex-chat-proto", "crates/simplex-smp-crypto", "crates/simplex-smp-proto", @@ -62,6 +65,9 @@ radroots-log = { path = "crates/log", version = "0.1.0-alpha.1", default-feature radroots-net = { path = "crates/net", version = "0.1.0-alpha.1", default-features = false } radroots-net-core = { path = "crates/net-core", version = "0.1.0-alpha.1", default-features = false } radroots-nostr-runtime = { path = "crates/nostr-runtime", version = "0.1.0-alpha.1", default-features = false } +radroots-simplex-agent-proto = { path = "crates/simplex-agent-proto", version = "0.1.0-alpha.1", default-features = false } +radroots-simplex-agent-runtime = { path = "crates/simplex-agent-runtime", version = "0.1.0-alpha.1", default-features = false } +radroots-simplex-agent-store = { path = "crates/simplex-agent-store", version = "0.1.0-alpha.1", default-features = false } radroots-simplex-chat-proto = { path = "crates/simplex-chat-proto", version = "0.1.0-alpha.1", default-features = false } radroots-simplex-smp-crypto = { path = "crates/simplex-smp-crypto", version = "0.1.0-alpha.1", default-features = false } radroots-simplex-smp-proto = { path = "crates/simplex-smp-proto", version = "0.1.0-alpha.1", default-features = false } diff --git a/crates/simplex-agent-proto/Cargo.toml b/crates/simplex-agent-proto/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "radroots-simplex-agent-proto" +version = "0.1.0-alpha.1" +edition.workspace = true +authors = [ + "Radroots Authors", +] +rust-version.workspace = true +license.workspace = true +description = "simplex agent protocol envelopes and queue lifecycle types for the radroots sdk" +repository.workspace = true +homepage.workspace = true +documentation = "https://docs.rs/radroots-simplex-agent-proto" +readme.workspace = true + +[features] +default = ["std"] +std = ["radroots-simplex-smp-crypto/std", "radroots-simplex-smp-proto/std"] + +[dependencies] +radroots-simplex-smp-crypto = { workspace = true, default-features = false } +radroots-simplex-smp-proto = { workspace = true, default-features = false } diff --git a/crates/simplex-agent-proto/src/codec.rs b/crates/simplex-agent-proto/src/codec.rs @@ -0,0 +1,722 @@ +use crate::error::RadrootsSimplexAgentProtoError; +use crate::model::{ + RADROOTS_SIMPLEX_AGENT_CURRENT_VERSION, RadrootsSimplexAgentConnectionLink, + RadrootsSimplexAgentDecryptedMessage, RadrootsSimplexAgentEncryptedPayload, + RadrootsSimplexAgentEnvelope, RadrootsSimplexAgentMessage, RadrootsSimplexAgentMessageFrame, + RadrootsSimplexAgentMessageHeader, RadrootsSimplexAgentMessageReceipt, + RadrootsSimplexAgentQueueAddress, RadrootsSimplexAgentQueueDescriptor, + RadrootsSimplexAgentQueueUseDecision, +}; +use alloc::string::{String, ToString}; +use alloc::vec::Vec; +use radroots_simplex_smp_crypto::prelude::RadrootsSimplexSmpRatchetHeader; +use radroots_simplex_smp_proto::prelude::{ + RadrootsSimplexSmpQueueUri, RadrootsSimplexSmpServerAddress, +}; + +pub fn encode_connection_link( + link: &RadrootsSimplexAgentConnectionLink, +) -> Result<Vec<u8>, RadrootsSimplexAgentProtoError> { + let mut buffer = Vec::new(); + push_short_bytes(&mut buffer, link.invitation_queue.to_string().as_bytes())?; + push_short_bytes(&mut buffer, &link.connection_id)?; + push_short_bytes(&mut buffer, &link.e2e_public_key)?; + buffer.push(encode_bool(link.contact_address)); + Ok(buffer) +} + +pub fn decode_connection_link( + bytes: &[u8], +) -> Result<RadrootsSimplexAgentConnectionLink, RadrootsSimplexAgentProtoError> { + let mut cursor = Cursor::new(bytes); + let invitation_queue = String::from_utf8(cursor.read_short_bytes()?) + .map_err(|error| RadrootsSimplexAgentProtoError::InvalidUtf8(error.to_string()))?; + let link = RadrootsSimplexAgentConnectionLink { + invitation_queue: RadrootsSimplexSmpQueueUri::parse(&invitation_queue)?, + connection_id: cursor.read_short_bytes()?, + e2e_public_key: cursor.read_short_bytes()?, + contact_address: decode_bool(cursor.read_byte()?)?, + }; + cursor.finish()?; + Ok(link) +} + +pub fn encode_agent_message_frame( + frame: &RadrootsSimplexAgentMessageFrame, +) -> Result<Vec<u8>, RadrootsSimplexAgentProtoError> { + let mut buffer = Vec::new(); + buffer.push(b'M'); + buffer.extend_from_slice(&frame.header.message_id.to_be_bytes()); + push_short_bytes(&mut buffer, &frame.header.previous_message_hash)?; + buffer.extend_from_slice(&encode_agent_message(&frame.message)?); + buffer.extend_from_slice(&frame.padding); + Ok(buffer) +} + +pub fn decode_agent_message_frame( + bytes: &[u8], +) -> Result<RadrootsSimplexAgentMessageFrame, RadrootsSimplexAgentProtoError> { + let mut cursor = Cursor::new(bytes); + cursor.expect_tag(b"M")?; + let header = RadrootsSimplexAgentMessageHeader { + message_id: cursor.read_u64()?, + previous_message_hash: cursor.read_short_bytes()?, + }; + let message = decode_agent_message(&mut cursor)?; + let padding = cursor.read_remaining().to_vec(); + Ok(RadrootsSimplexAgentMessageFrame { + header, + message, + padding, + }) +} + +pub fn encode_decrypted_message( + message: &RadrootsSimplexAgentDecryptedMessage, +) -> Result<Vec<u8>, RadrootsSimplexAgentProtoError> { + let mut buffer = Vec::new(); + match message { + RadrootsSimplexAgentDecryptedMessage::ConnectionInfo(info) => { + buffer.push(b'I'); + buffer.extend_from_slice(info); + } + RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply { reply_queues, info } => { + buffer.push(b'D'); + push_list(&mut buffer, reply_queues, encode_queue_descriptor)?; + push_large_bytes(&mut buffer, info)?; + } + RadrootsSimplexAgentDecryptedMessage::RatchetInfo(info) => { + buffer.push(b'R'); + push_large_bytes(&mut buffer, info)?; + } + RadrootsSimplexAgentDecryptedMessage::Message(frame) => { + buffer.extend_from_slice(&encode_agent_message_frame(frame)?); + } + } + Ok(buffer) +} + +pub fn decode_decrypted_message( + bytes: &[u8], +) -> Result<RadrootsSimplexAgentDecryptedMessage, RadrootsSimplexAgentProtoError> { + let mut cursor = Cursor::new(bytes); + match cursor.read_byte()? { + b'I' => Ok(RadrootsSimplexAgentDecryptedMessage::ConnectionInfo( + cursor.read_remaining().to_vec(), + )), + b'D' => { + let reply_queues = cursor.read_list(decode_queue_descriptor)?; + let info = cursor.read_large_bytes()?; + cursor.finish()?; + Ok(RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply { reply_queues, info }) + } + b'R' => { + let info = cursor.read_large_bytes()?; + cursor.finish()?; + Ok(RadrootsSimplexAgentDecryptedMessage::RatchetInfo(info)) + } + b'M' => { + decode_agent_message_frame(bytes).map(RadrootsSimplexAgentDecryptedMessage::Message) + } + tag => Err(RadrootsSimplexAgentProtoError::InvalidTag( + String::from_utf8_lossy(&[tag]).into_owned(), + )), + } +} + +pub fn encode_envelope( + envelope: &RadrootsSimplexAgentEnvelope, +) -> Result<Vec<u8>, RadrootsSimplexAgentProtoError> { + let mut buffer = Vec::new(); + buffer.extend_from_slice(&RADROOTS_SIMPLEX_AGENT_CURRENT_VERSION.to_be_bytes()); + match envelope { + RadrootsSimplexAgentEnvelope::Confirmation { + reply_queue, + encrypted, + } => { + buffer.push(b'C'); + buffer.push(encode_bool(*reply_queue)); + encode_encrypted_payload(&mut buffer, encrypted)?; + } + RadrootsSimplexAgentEnvelope::Message(encrypted) => { + buffer.push(b'M'); + encode_encrypted_payload(&mut buffer, encrypted)?; + } + RadrootsSimplexAgentEnvelope::Invitation { + request, + connection_info, + } => { + buffer.push(b'I'); + push_large_bytes(&mut buffer, request)?; + push_large_bytes(&mut buffer, connection_info)?; + } + RadrootsSimplexAgentEnvelope::RatchetKey { info, encrypted } => { + buffer.push(b'R'); + push_large_bytes(&mut buffer, info)?; + encode_encrypted_payload(&mut buffer, encrypted)?; + } + } + Ok(buffer) +} + +pub fn decode_envelope( + bytes: &[u8], +) -> Result<RadrootsSimplexAgentEnvelope, RadrootsSimplexAgentProtoError> { + let mut cursor = Cursor::new(bytes); + let _version = cursor.read_u16()?; + match cursor.read_byte()? { + b'C' => { + let reply_queue = decode_bool(cursor.read_byte()?)?; + let encrypted = decode_encrypted_payload(&mut cursor)?; + cursor.finish()?; + Ok(RadrootsSimplexAgentEnvelope::Confirmation { + reply_queue, + encrypted, + }) + } + b'M' => { + let encrypted = decode_encrypted_payload(&mut cursor)?; + cursor.finish()?; + Ok(RadrootsSimplexAgentEnvelope::Message(encrypted)) + } + b'I' => { + let request = cursor.read_large_bytes()?; + let connection_info = cursor.read_large_bytes()?; + cursor.finish()?; + Ok(RadrootsSimplexAgentEnvelope::Invitation { + request, + connection_info, + }) + } + b'R' => { + let info = cursor.read_large_bytes()?; + let encrypted = decode_encrypted_payload(&mut cursor)?; + cursor.finish()?; + Ok(RadrootsSimplexAgentEnvelope::RatchetKey { info, encrypted }) + } + tag => Err(RadrootsSimplexAgentProtoError::InvalidTag( + String::from_utf8_lossy(&[tag]).into_owned(), + )), + } +} + +fn encode_agent_message( + message: &RadrootsSimplexAgentMessage, +) -> Result<Vec<u8>, RadrootsSimplexAgentProtoError> { + let mut buffer = Vec::new(); + match message { + RadrootsSimplexAgentMessage::Hello => buffer.push(b'H'), + RadrootsSimplexAgentMessage::UserMessage(body) => { + buffer.push(b'M'); + buffer.extend_from_slice(body); + } + RadrootsSimplexAgentMessage::Receipt(receipt) => { + buffer.push(b'V'); + buffer.extend_from_slice(&receipt.message_id.to_be_bytes()); + push_short_bytes(&mut buffer, &receipt.message_hash)?; + push_large_bytes(&mut buffer, &receipt.receipt_info)?; + } + RadrootsSimplexAgentMessage::EncryptionReady { up_to_message_id } => { + buffer.push(b'E'); + buffer.extend_from_slice(&up_to_message_id.to_be_bytes()); + } + RadrootsSimplexAgentMessage::QueueContinue(queue) => { + buffer.extend_from_slice(b"QC"); + encode_queue_address(&mut buffer, queue)?; + } + RadrootsSimplexAgentMessage::QueueAdd(queues) => { + buffer.extend_from_slice(b"QA"); + push_list(&mut buffer, queues, encode_queue_descriptor)?; + } + RadrootsSimplexAgentMessage::QueueKey(queues) => { + buffer.extend_from_slice(b"QK"); + push_list(&mut buffer, queues, encode_queue_descriptor)?; + } + RadrootsSimplexAgentMessage::QueueUse(queues) => { + buffer.extend_from_slice(b"QU"); + push_list(&mut buffer, queues, encode_queue_use_decision)?; + } + RadrootsSimplexAgentMessage::QueueTest(queues) => { + buffer.extend_from_slice(b"QT"); + push_list(&mut buffer, queues, encode_queue_address)?; + } + } + Ok(buffer) +} + +fn decode_agent_message( + cursor: &mut Cursor<'_>, +) -> Result<RadrootsSimplexAgentMessage, RadrootsSimplexAgentProtoError> { + let first = cursor.read_byte()?; + match first { + b'H' => Ok(RadrootsSimplexAgentMessage::Hello), + b'M' => Ok(RadrootsSimplexAgentMessage::UserMessage( + cursor.read_remaining().to_vec(), + )), + b'V' => Ok(RadrootsSimplexAgentMessage::Receipt( + RadrootsSimplexAgentMessageReceipt { + message_id: cursor.read_u64()?, + message_hash: cursor.read_short_bytes()?, + receipt_info: cursor.read_large_bytes()?, + }, + )), + b'E' => Ok(RadrootsSimplexAgentMessage::EncryptionReady { + up_to_message_id: cursor.read_u64()?, + }), + b'Q' => match cursor.read_byte()? { + b'C' => Ok(RadrootsSimplexAgentMessage::QueueContinue( + decode_queue_address(cursor)?, + )), + b'A' => Ok(RadrootsSimplexAgentMessage::QueueAdd( + cursor.read_list(decode_queue_descriptor)?, + )), + b'K' => Ok(RadrootsSimplexAgentMessage::QueueKey( + cursor.read_list(decode_queue_descriptor)?, + )), + b'U' => Ok(RadrootsSimplexAgentMessage::QueueUse( + cursor.read_list(decode_queue_use_decision)?, + )), + b'T' => Ok(RadrootsSimplexAgentMessage::QueueTest( + cursor.read_list(decode_queue_address)?, + )), + tag => Err(RadrootsSimplexAgentProtoError::InvalidTag(alloc::format!( + "Q{}", + tag as char + ))), + }, + tag => Err(RadrootsSimplexAgentProtoError::InvalidTag( + String::from_utf8_lossy(&[tag]).into_owned(), + )), + } +} + +fn encode_encrypted_payload( + buffer: &mut Vec<u8>, + encrypted: &RadrootsSimplexAgentEncryptedPayload, +) -> Result<(), RadrootsSimplexAgentProtoError> { + match &encrypted.ratchet_header { + Some(header) => { + buffer.push(1); + let ratchet = encode_ratchet_header(header)?; + push_large_bytes(buffer, &ratchet)?; + } + None => buffer.push(0), + } + push_large_bytes(buffer, &encrypted.ciphertext) +} + +fn decode_encrypted_payload( + cursor: &mut Cursor<'_>, +) -> Result<RadrootsSimplexAgentEncryptedPayload, RadrootsSimplexAgentProtoError> { + let has_header = decode_bool(cursor.read_byte()?)?; + let ratchet_header = if has_header { + Some(decode_ratchet_header(&cursor.read_large_bytes()?)?) + } else { + None + }; + Ok(RadrootsSimplexAgentEncryptedPayload { + ratchet_header, + ciphertext: cursor.read_large_bytes()?, + }) +} + +fn encode_queue_descriptor( + buffer: &mut Vec<u8>, + descriptor: &RadrootsSimplexAgentQueueDescriptor, +) -> Result<(), RadrootsSimplexAgentProtoError> { + push_short_bytes(buffer, descriptor.queue_uri.to_string().as_bytes())?; + push_maybe( + buffer, + descriptor.replaced_queue.as_ref(), + encode_queue_address, + )?; + buffer.push(encode_bool(descriptor.primary)); + push_maybe_short_bytes(buffer, descriptor.sender_key.as_deref())?; + Ok(()) +} + +fn decode_queue_descriptor( + cursor: &mut Cursor<'_>, +) -> Result<RadrootsSimplexAgentQueueDescriptor, RadrootsSimplexAgentProtoError> { + let queue_uri = String::from_utf8(cursor.read_short_bytes()?) + .map_err(|error| RadrootsSimplexAgentProtoError::InvalidUtf8(error.to_string()))?; + Ok(RadrootsSimplexAgentQueueDescriptor { + queue_uri: RadrootsSimplexSmpQueueUri::parse(&queue_uri)?, + replaced_queue: cursor.read_maybe(decode_queue_address)?, + primary: decode_bool(cursor.read_byte()?)?, + sender_key: cursor.read_maybe(decode_short_bytes)?, + }) +} + +fn encode_queue_use_decision( + buffer: &mut Vec<u8>, + decision: &RadrootsSimplexAgentQueueUseDecision, +) -> Result<(), RadrootsSimplexAgentProtoError> { + encode_queue_address(buffer, &decision.queue_address)?; + buffer.push(encode_bool(decision.primary)); + Ok(()) +} + +fn decode_queue_use_decision( + cursor: &mut Cursor<'_>, +) -> Result<RadrootsSimplexAgentQueueUseDecision, RadrootsSimplexAgentProtoError> { + Ok(RadrootsSimplexAgentQueueUseDecision { + queue_address: decode_queue_address(cursor)?, + primary: decode_bool(cursor.read_byte()?)?, + }) +} + +fn encode_queue_address( + buffer: &mut Vec<u8>, + queue: &RadrootsSimplexAgentQueueAddress, +) -> Result<(), RadrootsSimplexAgentProtoError> { + push_short_bytes(buffer, queue.server.server_identity.as_bytes())?; + push_list(buffer, &queue.server.hosts, |buffer, host| { + push_short_bytes(buffer, host.as_bytes()) + })?; + let port = queue + .server + .port + .map(|value| value.to_string()) + .unwrap_or_default(); + push_short_bytes(buffer, port.as_bytes())?; + push_short_bytes(buffer, &queue.sender_id)?; + Ok(()) +} + +fn decode_queue_address( + cursor: &mut Cursor<'_>, +) -> Result<RadrootsSimplexAgentQueueAddress, RadrootsSimplexAgentProtoError> { + let server_identity = String::from_utf8(cursor.read_short_bytes()?) + .map_err(|error| RadrootsSimplexAgentProtoError::InvalidUtf8(error.to_string()))?; + let hosts = cursor.read_list(|cursor| { + let host = String::from_utf8(cursor.read_short_bytes()?) + .map_err(|error| RadrootsSimplexAgentProtoError::InvalidUtf8(error.to_string()))?; + Ok(host) + })?; + let port_raw = String::from_utf8(cursor.read_short_bytes()?) + .map_err(|error| RadrootsSimplexAgentProtoError::InvalidUtf8(error.to_string()))?; + let port = if port_raw.is_empty() { + None + } else { + Some( + port_raw + .parse::<u16>() + .map_err(|_| RadrootsSimplexAgentProtoError::InvalidUtf8(port_raw.clone()))?, + ) + }; + Ok(RadrootsSimplexAgentQueueAddress { + server: RadrootsSimplexSmpServerAddress { + server_identity, + hosts, + port, + }, + sender_id: cursor.read_short_bytes()?, + }) +} + +fn encode_ratchet_header( + header: &RadrootsSimplexSmpRatchetHeader, +) -> Result<Vec<u8>, RadrootsSimplexAgentProtoError> { + let mut buffer = Vec::new(); + buffer.extend_from_slice(&header.previous_sending_chain_length.to_be_bytes()); + buffer.extend_from_slice(&header.message_number.to_be_bytes()); + push_short_bytes(&mut buffer, &header.dh_public_key)?; + push_maybe_short_bytes(&mut buffer, header.pq_public_key.as_deref())?; + push_maybe_short_bytes(&mut buffer, header.pq_ciphertext.as_deref())?; + Ok(buffer) +} + +fn decode_ratchet_header( + bytes: &[u8], +) -> Result<RadrootsSimplexSmpRatchetHeader, RadrootsSimplexAgentProtoError> { + let mut cursor = Cursor::new(bytes); + let header = RadrootsSimplexSmpRatchetHeader { + previous_sending_chain_length: cursor.read_u32()?, + message_number: cursor.read_u32()?, + dh_public_key: cursor.read_short_bytes()?, + pq_public_key: cursor.read_maybe(decode_short_bytes)?, + pq_ciphertext: cursor.read_maybe(decode_short_bytes)?, + }; + cursor.finish()?; + Ok(header) +} + +fn decode_short_bytes(cursor: &mut Cursor<'_>) -> Result<Vec<u8>, RadrootsSimplexAgentProtoError> { + cursor.read_short_bytes() +} + +fn push_short_bytes( + buffer: &mut Vec<u8>, + value: &[u8], +) -> Result<(), RadrootsSimplexAgentProtoError> { + if value.len() > u8::MAX as usize { + return Err(RadrootsSimplexAgentProtoError::InvalidShortFieldLength( + value.len(), + )); + } + buffer.push(value.len() as u8); + buffer.extend_from_slice(value); + Ok(()) +} + +fn push_large_bytes( + buffer: &mut Vec<u8>, + value: &[u8], +) -> Result<(), RadrootsSimplexAgentProtoError> { + if value.len() > u16::MAX as usize { + return Err(RadrootsSimplexAgentProtoError::InvalidLargeFieldLength( + value.len(), + )); + } + buffer.extend_from_slice(&(value.len() as u16).to_be_bytes()); + buffer.extend_from_slice(value); + Ok(()) +} + +fn push_maybe_short_bytes( + buffer: &mut Vec<u8>, + value: Option<&[u8]>, +) -> Result<(), RadrootsSimplexAgentProtoError> { + match value { + Some(value) => { + buffer.push(1); + push_short_bytes(buffer, value) + } + None => { + buffer.push(0); + Ok(()) + } + } +} + +fn push_maybe<T>( + buffer: &mut Vec<u8>, + value: Option<&T>, + encode: fn(&mut Vec<u8>, &T) -> Result<(), RadrootsSimplexAgentProtoError>, +) -> Result<(), RadrootsSimplexAgentProtoError> { + match value { + Some(value) => { + buffer.push(1); + encode(buffer, value) + } + None => { + buffer.push(0); + Ok(()) + } + } +} + +fn push_list<T>( + buffer: &mut Vec<u8>, + values: &[T], + encode: fn(&mut Vec<u8>, &T) -> Result<(), RadrootsSimplexAgentProtoError>, +) -> Result<(), RadrootsSimplexAgentProtoError> { + if values.len() > u8::MAX as usize { + return Err(RadrootsSimplexAgentProtoError::InvalidShortFieldLength( + values.len(), + )); + } + buffer.push(values.len() as u8); + for value in values { + encode(buffer, value)?; + } + Ok(()) +} + +const fn encode_bool(value: bool) -> u8 { + if value { 1 } else { 0 } +} + +fn decode_bool(value: u8) -> Result<bool, RadrootsSimplexAgentProtoError> { + match value { + 0 => Ok(false), + 1 => Ok(true), + other => Err(RadrootsSimplexAgentProtoError::InvalidBoolEncoding(other)), + } +} + +struct Cursor<'a> { + bytes: &'a [u8], + position: usize, +} + +impl<'a> Cursor<'a> { + const fn new(bytes: &'a [u8]) -> Self { + Self { bytes, position: 0 } + } + + fn finish(&self) -> Result<(), RadrootsSimplexAgentProtoError> { + if self.position == self.bytes.len() { + Ok(()) + } else { + Err(RadrootsSimplexAgentProtoError::TrailingBytes) + } + } + + fn read_byte(&mut self) -> Result<u8, RadrootsSimplexAgentProtoError> { + let Some(value) = self.bytes.get(self.position) else { + return Err(RadrootsSimplexAgentProtoError::UnexpectedEof); + }; + self.position += 1; + Ok(*value) + } + + fn expect_tag(&mut self, tag: &[u8]) -> Result<(), RadrootsSimplexAgentProtoError> { + let Some(value) = self.bytes.get(self.position..self.position + tag.len()) else { + return Err(RadrootsSimplexAgentProtoError::UnexpectedEof); + }; + if value != tag { + return Err(RadrootsSimplexAgentProtoError::InvalidTag( + String::from_utf8_lossy(value).into_owned(), + )); + } + self.position += tag.len(); + Ok(()) + } + + fn read_u16(&mut self) -> Result<u16, RadrootsSimplexAgentProtoError> { + let Some(value) = self.bytes.get(self.position..self.position + 2) else { + return Err(RadrootsSimplexAgentProtoError::UnexpectedEof); + }; + self.position += 2; + Ok(u16::from_be_bytes([value[0], value[1]])) + } + + fn read_u32(&mut self) -> Result<u32, RadrootsSimplexAgentProtoError> { + let Some(value) = self.bytes.get(self.position..self.position + 4) else { + return Err(RadrootsSimplexAgentProtoError::UnexpectedEof); + }; + self.position += 4; + Ok(u32::from_be_bytes([value[0], value[1], value[2], value[3]])) + } + + fn read_u64(&mut self) -> Result<u64, RadrootsSimplexAgentProtoError> { + let Some(value) = self.bytes.get(self.position..self.position + 8) else { + return Err(RadrootsSimplexAgentProtoError::UnexpectedEof); + }; + self.position += 8; + Ok(u64::from_be_bytes([ + value[0], value[1], value[2], value[3], value[4], value[5], value[6], value[7], + ])) + } + + fn read_short_bytes(&mut self) -> Result<Vec<u8>, RadrootsSimplexAgentProtoError> { + let length = self.read_byte()? as usize; + let Some(value) = self.bytes.get(self.position..self.position + length) else { + return Err(RadrootsSimplexAgentProtoError::UnexpectedEof); + }; + self.position += length; + Ok(value.to_vec()) + } + + fn read_large_bytes(&mut self) -> Result<Vec<u8>, RadrootsSimplexAgentProtoError> { + let length = self.read_u16()? as usize; + let Some(value) = self.bytes.get(self.position..self.position + length) else { + return Err(RadrootsSimplexAgentProtoError::UnexpectedEof); + }; + self.position += length; + Ok(value.to_vec()) + } + + fn read_maybe<T>( + &mut self, + decode: fn(&mut Cursor<'_>) -> Result<T, RadrootsSimplexAgentProtoError>, + ) -> Result<Option<T>, RadrootsSimplexAgentProtoError> { + match self.read_byte()? { + 0 => Ok(None), + 1 => decode(self).map(Some), + other => Err(RadrootsSimplexAgentProtoError::InvalidBoolEncoding(other)), + } + } + + fn read_list<T>( + &mut self, + decode: fn(&mut Cursor<'_>) -> Result<T, RadrootsSimplexAgentProtoError>, + ) -> Result<Vec<T>, RadrootsSimplexAgentProtoError> { + let len = self.read_byte()? as usize; + let mut values = Vec::with_capacity(len); + for _ in 0..len { + values.push(decode(self)?); + } + Ok(values) + } + + fn read_remaining(&self) -> &'a [u8] { + &self.bytes[self.position..] + } +} + +#[cfg(test)] +mod tests { + use super::*; + use radroots_simplex_smp_proto::prelude::{ + RadrootsSimplexSmpQueueMode, RadrootsSimplexSmpQueueUri, RadrootsSimplexSmpVersionRange, + }; + + fn sample_queue_uri() -> RadrootsSimplexSmpQueueUri { + RadrootsSimplexSmpQueueUri::parse( + "smp://aGVsbG8@relay.example/cXVldWU#/?v=4&dh=Zm9vYmFy&q=m", + ) + .unwrap() + } + + #[test] + fn roundtrips_connection_link() { + let link = RadrootsSimplexAgentConnectionLink { + invitation_queue: sample_queue_uri(), + connection_id: b"conn-1".to_vec(), + e2e_public_key: b"e2e".to_vec(), + contact_address: true, + }; + let encoded = encode_connection_link(&link).unwrap(); + let decoded = decode_connection_link(&encoded).unwrap(); + assert_eq!(decoded, link); + } + + #[test] + fn roundtrips_message_frame_and_envelope() { + let descriptor = RadrootsSimplexAgentQueueDescriptor { + queue_uri: sample_queue_uri(), + replaced_queue: None, + primary: true, + sender_key: Some(b"sender-key".to_vec()), + }; + let frame = RadrootsSimplexAgentMessageFrame { + header: RadrootsSimplexAgentMessageHeader { + message_id: 7, + previous_message_hash: b"hash".to_vec(), + }, + message: RadrootsSimplexAgentMessage::QueueAdd(vec![descriptor.clone()]), + padding: b"pad".to_vec(), + }; + let decrypted = RadrootsSimplexAgentDecryptedMessage::Message(frame); + let encoded_decrypted = encode_decrypted_message(&decrypted).unwrap(); + let decoded_decrypted = decode_decrypted_message(&encoded_decrypted).unwrap(); + assert_eq!(decoded_decrypted, decrypted); + + let envelope = + RadrootsSimplexAgentEnvelope::Message(RadrootsSimplexAgentEncryptedPayload { + ratchet_header: Some(RadrootsSimplexSmpRatchetHeader { + previous_sending_chain_length: 1, + message_number: 2, + dh_public_key: b"dh".to_vec(), + pq_public_key: Some(b"pq".to_vec()), + pq_ciphertext: Some(b"ct".to_vec()), + }), + ciphertext: encoded_decrypted, + }); + let encoded_envelope = encode_envelope(&envelope).unwrap(); + let decoded_envelope = decode_envelope(&encoded_envelope).unwrap(); + assert_eq!(decoded_envelope, envelope); + + assert_eq!( + descriptor.client_version_range(), + RadrootsSimplexSmpVersionRange::single(4) + ); + assert_eq!( + descriptor.queue_uri.queue_mode, + Some(RadrootsSimplexSmpQueueMode::Messaging) + ); + } +} diff --git a/crates/simplex-agent-proto/src/error.rs b/crates/simplex-agent-proto/src/error.rs @@ -0,0 +1,45 @@ +use alloc::string::String; +use core::fmt; +use radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpProtoError; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RadrootsSimplexAgentProtoError { + Proto(RadrootsSimplexSmpProtoError), + UnexpectedEof, + InvalidTag(String), + InvalidUtf8(String), + InvalidShortFieldLength(usize), + InvalidLargeFieldLength(usize), + InvalidBoolEncoding(u8), + TrailingBytes, +} + +impl From<RadrootsSimplexSmpProtoError> for RadrootsSimplexAgentProtoError { + fn from(value: RadrootsSimplexSmpProtoError) -> Self { + Self::Proto(value) + } +} + +impl fmt::Display for RadrootsSimplexAgentProtoError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Proto(error) => write!(f, "{error}"), + Self::UnexpectedEof => write!(f, "unexpected end of SimpleX agent input"), + Self::InvalidTag(tag) => write!(f, "invalid SimpleX agent tag `{tag}`"), + Self::InvalidUtf8(error) => write!(f, "invalid UTF-8 in SimpleX agent field: {error}"), + Self::InvalidShortFieldLength(length) => { + write!(f, "invalid SimpleX agent short field length {length}") + } + Self::InvalidLargeFieldLength(length) => { + write!(f, "invalid SimpleX agent large field length {length}") + } + Self::InvalidBoolEncoding(value) => { + write!(f, "invalid SimpleX agent bool encoding `{value}`") + } + Self::TrailingBytes => write!(f, "trailing bytes after SimpleX agent decode"), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for RadrootsSimplexAgentProtoError {} diff --git a/crates/simplex-agent-proto/src/lib.rs b/crates/simplex-agent-proto/src/lib.rs @@ -0,0 +1,30 @@ +#![cfg_attr(not(feature = "std"), no_std)] +#![forbid(unsafe_code)] + +extern crate alloc; + +pub mod codec; +pub mod error; +pub mod model; + +pub mod prelude { + pub use crate::codec::{ + decode_agent_message_frame, decode_connection_link, decode_decrypted_message, + decode_envelope, encode_agent_message_frame, encode_connection_link, + encode_decrypted_message, encode_envelope, + }; + pub use crate::error::RadrootsSimplexAgentProtoError; + pub use crate::model::{ + RADROOTS_SIMPLEX_AGENT_CURRENT_VERSION, RadrootsSimplexAgentConnectionLink, + RadrootsSimplexAgentConnectionMode, RadrootsSimplexAgentConnectionStatus, + RadrootsSimplexAgentDecryptedMessage, RadrootsSimplexAgentEncryptedPayload, + RadrootsSimplexAgentEnvelope, RadrootsSimplexAgentMessage, + RadrootsSimplexAgentMessageFrame, RadrootsSimplexAgentMessageHeader, + RadrootsSimplexAgentMessageId, RadrootsSimplexAgentMessageReceipt, + RadrootsSimplexAgentQueueAddress, RadrootsSimplexAgentQueueDescriptor, + RadrootsSimplexAgentQueueUseDecision, + }; + pub use radroots_simplex_smp_crypto::prelude::{ + RadrootsSimplexSmpRatchetHeader, RadrootsSimplexSmpRatchetState, + }; +} diff --git a/crates/simplex-agent-proto/src/model.rs b/crates/simplex-agent-proto/src/model.rs @@ -0,0 +1,137 @@ +use alloc::vec::Vec; +use radroots_simplex_smp_crypto::prelude::RadrootsSimplexSmpRatchetHeader; +use radroots_simplex_smp_proto::prelude::{ + RadrootsSimplexSmpQueueUri, RadrootsSimplexSmpServerAddress, RadrootsSimplexSmpVersionRange, +}; + +pub const RADROOTS_SIMPLEX_AGENT_CURRENT_VERSION: u16 = 5; +pub type RadrootsSimplexAgentMessageId = u64; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RadrootsSimplexAgentConnectionMode { + Direct, + ContactAddress, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RadrootsSimplexAgentConnectionStatus { + CreatePending, + InvitationReady, + JoinPending, + AwaitingApproval, + Allowed, + Connected, + Suspended, + Rotating, + Deleted, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentConnectionLink { + pub invitation_queue: RadrootsSimplexSmpQueueUri, + pub connection_id: Vec<u8>, + pub e2e_public_key: Vec<u8>, + pub contact_address: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentQueueAddress { + pub server: RadrootsSimplexSmpServerAddress, + pub sender_id: Vec<u8>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentQueueDescriptor { + pub queue_uri: RadrootsSimplexSmpQueueUri, + pub replaced_queue: Option<RadrootsSimplexAgentQueueAddress>, + pub primary: bool, + pub sender_key: Option<Vec<u8>>, +} + +impl RadrootsSimplexAgentQueueDescriptor { + pub const fn client_version_range(&self) -> RadrootsSimplexSmpVersionRange { + self.queue_uri.version_range + } + + pub fn queue_address(&self) -> RadrootsSimplexAgentQueueAddress { + RadrootsSimplexAgentQueueAddress { + server: self.queue_uri.server.clone(), + sender_id: self.queue_uri.sender_id.as_bytes().to_vec(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentQueueUseDecision { + pub queue_address: RadrootsSimplexAgentQueueAddress, + pub primary: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentMessageHeader { + pub message_id: RadrootsSimplexAgentMessageId, + pub previous_message_hash: Vec<u8>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentMessageReceipt { + pub message_id: RadrootsSimplexAgentMessageId, + pub message_hash: Vec<u8>, + pub receipt_info: Vec<u8>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RadrootsSimplexAgentMessage { + Hello, + UserMessage(Vec<u8>), + Receipt(RadrootsSimplexAgentMessageReceipt), + EncryptionReady { + up_to_message_id: RadrootsSimplexAgentMessageId, + }, + QueueContinue(RadrootsSimplexAgentQueueAddress), + QueueAdd(Vec<RadrootsSimplexAgentQueueDescriptor>), + QueueKey(Vec<RadrootsSimplexAgentQueueDescriptor>), + QueueUse(Vec<RadrootsSimplexAgentQueueUseDecision>), + QueueTest(Vec<RadrootsSimplexAgentQueueAddress>), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentMessageFrame { + pub header: RadrootsSimplexAgentMessageHeader, + pub message: RadrootsSimplexAgentMessage, + pub padding: Vec<u8>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RadrootsSimplexAgentDecryptedMessage { + ConnectionInfo(Vec<u8>), + ConnectionInfoReply { + reply_queues: Vec<RadrootsSimplexAgentQueueDescriptor>, + info: Vec<u8>, + }, + RatchetInfo(Vec<u8>), + Message(RadrootsSimplexAgentMessageFrame), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentEncryptedPayload { + pub ratchet_header: Option<RadrootsSimplexSmpRatchetHeader>, + pub ciphertext: Vec<u8>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RadrootsSimplexAgentEnvelope { + Confirmation { + reply_queue: bool, + encrypted: RadrootsSimplexAgentEncryptedPayload, + }, + Message(RadrootsSimplexAgentEncryptedPayload), + Invitation { + request: Vec<u8>, + connection_info: Vec<u8>, + }, + RatchetKey { + info: Vec<u8>, + encrypted: RadrootsSimplexAgentEncryptedPayload, + }, +} diff --git a/crates/simplex-agent-runtime/Cargo.toml b/crates/simplex-agent-runtime/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "radroots-simplex-agent-runtime" +version = "0.1.0-alpha.1" +edition.workspace = true +authors = [ + "Radroots Authors", +] +rust-version.workspace = true +license.workspace = true +description = "simplex agent runtime coordination for the radroots sdk" +repository.workspace = true +homepage.workspace = true +documentation = "https://docs.rs/radroots-simplex-agent-runtime" +readme.workspace = true + +[features] +default = ["std"] +std = [ + "radroots-simplex-agent-proto/std", + "radroots-simplex-agent-store/std", + "radroots-simplex-smp-crypto/std", + "radroots-simplex-smp-proto/std", + "radroots-simplex-smp-transport/std", + "sha2/std", +] + +[dependencies] +radroots-simplex-agent-proto = { workspace = true, default-features = false } +radroots-simplex-agent-store = { workspace = true, default-features = false } +radroots-simplex-smp-crypto = { workspace = true, default-features = false } +radroots-simplex-smp-proto = { workspace = true, default-features = false } +radroots-simplex-smp-transport = { workspace = true, default-features = false } +sha2 = { workspace = true, default-features = false } diff --git a/crates/simplex-agent-runtime/src/error.rs b/crates/simplex-agent-runtime/src/error.rs @@ -0,0 +1,44 @@ +use alloc::string::String; +use core::fmt; +use radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentProtoError; +use radroots_simplex_agent_store::prelude::RadrootsSimplexAgentStoreError; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RadrootsSimplexAgentRuntimeError { + Proto(RadrootsSimplexAgentProtoError), + Store(RadrootsSimplexAgentStoreError), + MissingConfig(&'static str), + InvalidConfig(&'static str), + Runtime(String), +} + +impl From<RadrootsSimplexAgentProtoError> for RadrootsSimplexAgentRuntimeError { + fn from(value: RadrootsSimplexAgentProtoError) -> Self { + Self::Proto(value) + } +} + +impl From<RadrootsSimplexAgentStoreError> for RadrootsSimplexAgentRuntimeError { + fn from(value: RadrootsSimplexAgentStoreError) -> Self { + Self::Store(value) + } +} + +impl fmt::Display for RadrootsSimplexAgentRuntimeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Proto(error) => write!(f, "{error}"), + Self::Store(error) => write!(f, "{error}"), + Self::MissingConfig(field) => { + write!(f, "missing SimpleX agent runtime config `{field}`") + } + Self::InvalidConfig(field) => { + write!(f, "invalid SimpleX agent runtime config `{field}`") + } + Self::Runtime(message) => write!(f, "{message}"), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for RadrootsSimplexAgentRuntimeError {} diff --git a/crates/simplex-agent-runtime/src/lib.rs b/crates/simplex-agent-runtime/src/lib.rs @@ -0,0 +1,14 @@ +#![cfg_attr(not(feature = "std"), no_std)] +#![forbid(unsafe_code)] + +extern crate alloc; + +pub mod error; +pub mod runtime; +pub mod types; + +pub mod prelude { + pub use crate::error::RadrootsSimplexAgentRuntimeError; + pub use crate::runtime::{RadrootsSimplexAgentRuntime, RadrootsSimplexAgentRuntimeBuilder}; + pub use crate::types::{RadrootsSimplexAgentCommandOutcome, RadrootsSimplexAgentRuntimeEvent}; +} diff --git a/crates/simplex-agent-runtime/src/runtime.rs b/crates/simplex-agent-runtime/src/runtime.rs @@ -0,0 +1,656 @@ +use crate::error::RadrootsSimplexAgentRuntimeError; +use crate::types::{RadrootsSimplexAgentCommandOutcome, RadrootsSimplexAgentRuntimeEvent}; +use alloc::collections::VecDeque; +use alloc::string::String; +use alloc::vec::Vec; +use radroots_simplex_agent_proto::prelude::{ + RadrootsSimplexAgentConnectionLink, RadrootsSimplexAgentConnectionMode, + RadrootsSimplexAgentConnectionStatus, RadrootsSimplexAgentDecryptedMessage, + RadrootsSimplexAgentEncryptedPayload, RadrootsSimplexAgentEnvelope, + RadrootsSimplexAgentMessage, RadrootsSimplexAgentMessageFrame, + RadrootsSimplexAgentMessageHeader, RadrootsSimplexAgentMessageReceipt, + RadrootsSimplexAgentQueueDescriptor, RadrootsSimplexSmpRatchetState, encode_decrypted_message, +}; +use radroots_simplex_agent_store::prelude::{ + RadrootsSimplexAgentPendingCommand, RadrootsSimplexAgentPendingCommandKind, + RadrootsSimplexAgentQueueRole, RadrootsSimplexAgentStore, +}; +use radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpQueueUri; +use sha2::{Digest, Sha256}; + +pub struct RadrootsSimplexAgentRuntimeBuilder { + store: Option<RadrootsSimplexAgentStore>, + queue_capacity: usize, + retry_delay_ms: u64, +} + +impl RadrootsSimplexAgentRuntimeBuilder { + pub const DEFAULT_QUEUE_CAPACITY: usize = 2_048; + pub const DEFAULT_RETRY_DELAY_MS: u64 = 5_000; + + pub fn new() -> Self { + Self { + store: None, + queue_capacity: Self::DEFAULT_QUEUE_CAPACITY, + retry_delay_ms: Self::DEFAULT_RETRY_DELAY_MS, + } + } + + pub fn store(mut self, store: RadrootsSimplexAgentStore) -> Self { + self.store = Some(store); + self + } + + pub fn queue_capacity(mut self, queue_capacity: usize) -> Self { + self.queue_capacity = queue_capacity; + self + } + + pub fn retry_delay_ms(mut self, retry_delay_ms: u64) -> Self { + self.retry_delay_ms = retry_delay_ms; + self + } + + pub fn build(self) -> Result<RadrootsSimplexAgentRuntime, RadrootsSimplexAgentRuntimeError> { + if self.queue_capacity == 0 { + return Err(RadrootsSimplexAgentRuntimeError::InvalidConfig( + "queue_capacity", + )); + } + Ok(RadrootsSimplexAgentRuntime { + store: self.store.unwrap_or_default(), + events: VecDeque::with_capacity(self.queue_capacity), + retry_delay_ms: self.retry_delay_ms, + }) + } +} + +impl Default for RadrootsSimplexAgentRuntimeBuilder { + fn default() -> Self { + Self::new() + } +} + +pub struct RadrootsSimplexAgentRuntime { + store: RadrootsSimplexAgentStore, + events: VecDeque<RadrootsSimplexAgentRuntimeEvent>, + retry_delay_ms: u64, +} + +impl RadrootsSimplexAgentRuntime { + pub fn create_connection( + &mut self, + invitation_queue: RadrootsSimplexSmpQueueUri, + e2e_public_key: Vec<u8>, + contact_address: bool, + now: u64, + ) -> Result<String, RadrootsSimplexAgentRuntimeError> { + let ratchet_state = RadrootsSimplexSmpRatchetState::initiator( + b"local-dh".to_vec(), + invitation_queue.recipient_dh_public_key.as_bytes().to_vec(), + None, + ) + .ok(); + let connection = self.store.create_connection( + if contact_address { + RadrootsSimplexAgentConnectionMode::ContactAddress + } else { + RadrootsSimplexAgentConnectionMode::Direct + }, + RadrootsSimplexAgentConnectionStatus::InvitationReady, + None, + ratchet_state, + ); + let invitation = RadrootsSimplexAgentConnectionLink { + invitation_queue: invitation_queue.clone(), + connection_id: connection.id.as_bytes().to_vec(), + e2e_public_key, + contact_address, + }; + self.store.connection_mut(&connection.id)?.invitation = Some(invitation.clone()); + let descriptor = RadrootsSimplexAgentQueueDescriptor { + queue_uri: invitation_queue, + replaced_queue: None, + primary: true, + sender_key: None, + }; + self.store.add_queue( + &connection.id, + descriptor.clone(), + RadrootsSimplexAgentQueueRole::Receive, + true, + )?; + self.store.enqueue_command( + &connection.id, + RadrootsSimplexAgentPendingCommandKind::CreateQueue { + descriptor: descriptor.clone(), + }, + now, + )?; + self.store.enqueue_command( + &connection.id, + RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { + queue: descriptor.queue_address(), + }, + now, + )?; + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::InvitationReady { + connection_id: connection.id.clone(), + invitation, + }); + Ok(connection.id) + } + + pub fn join_connection( + &mut self, + invitation: RadrootsSimplexAgentConnectionLink, + reply_queue: RadrootsSimplexSmpQueueUri, + now: u64, + ) -> Result<String, RadrootsSimplexAgentRuntimeError> { + let ratchet_state = RadrootsSimplexSmpRatchetState::responder( + b"reply-dh".to_vec(), + invitation + .invitation_queue + .recipient_dh_public_key + .as_bytes() + .to_vec(), + None, + ) + .ok(); + let connection = self.store.create_connection( + RadrootsSimplexAgentConnectionMode::Direct, + RadrootsSimplexAgentConnectionStatus::JoinPending, + Some(invitation.clone()), + ratchet_state, + ); + let send_descriptor = RadrootsSimplexAgentQueueDescriptor { + queue_uri: invitation.invitation_queue.clone(), + replaced_queue: None, + primary: true, + sender_key: Some(b"sender-auth".to_vec()), + }; + let receive_descriptor = RadrootsSimplexAgentQueueDescriptor { + queue_uri: reply_queue, + replaced_queue: None, + primary: true, + sender_key: None, + }; + self.store.add_queue( + &connection.id, + send_descriptor.clone(), + RadrootsSimplexAgentQueueRole::Send, + true, + )?; + self.store.add_queue( + &connection.id, + receive_descriptor.clone(), + RadrootsSimplexAgentQueueRole::Receive, + true, + )?; + self.store.enqueue_command( + &connection.id, + RadrootsSimplexAgentPendingCommandKind::SecureQueue { + queue: send_descriptor.queue_address(), + sender_key: send_descriptor.sender_key.clone(), + }, + now, + )?; + self.store.enqueue_command( + &connection.id, + RadrootsSimplexAgentPendingCommandKind::CreateQueue { + descriptor: receive_descriptor.clone(), + }, + now, + )?; + self.store.enqueue_command( + &connection.id, + RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { + queue: receive_descriptor.queue_address(), + }, + now, + )?; + self.store.enqueue_command( + &connection.id, + RadrootsSimplexAgentPendingCommandKind::SendEnvelope { + queue: send_descriptor.queue_address(), + envelope: RadrootsSimplexAgentEnvelope::Confirmation { + reply_queue: true, + encrypted: RadrootsSimplexAgentEncryptedPayload { + ratchet_header: None, + ciphertext: invitation.connection_id, + }, + }, + }, + now, + )?; + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::ConfirmationRequired { + connection_id: connection.id.clone(), + }); + Ok(connection.id) + } + + pub fn allow_connection( + &mut self, + connection_id: &str, + local_info: Vec<u8>, + now: u64, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + self.store + .set_status(connection_id, RadrootsSimplexAgentConnectionStatus::Allowed)?; + let send_queue = self.store.primary_send_queue(connection_id)?; + let encrypted = RadrootsSimplexAgentEncryptedPayload { + ratchet_header: None, + ciphertext: local_info, + }; + self.store.enqueue_command( + connection_id, + RadrootsSimplexAgentPendingCommandKind::SendEnvelope { + queue: send_queue.descriptor.queue_address(), + envelope: RadrootsSimplexAgentEnvelope::Confirmation { + reply_queue: false, + encrypted, + }, + }, + now, + )?; + Ok(()) + } + + pub fn subscribe_connection( + &mut self, + connection_id: &str, + now: u64, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + for queue in self.store.receive_queues(connection_id)? { + self.store.enqueue_command( + connection_id, + RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { + queue: queue.descriptor.queue_address(), + }, + now, + )?; + } + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::SubscriptionQueued { + connection_id: connection_id.into(), + }); + Ok(()) + } + + pub fn send_message( + &mut self, + connection_id: &str, + body: Vec<u8>, + now: u64, + ) -> Result<u64, RadrootsSimplexAgentRuntimeError> { + let send_queue = self.store.primary_send_queue(connection_id)?; + let previous_hash = self + .store + .connection(connection_id)? + .delivery_cursor + .last_sent_message_hash + .clone() + .unwrap_or_default(); + let message_id = self + .store + .connection(connection_id)? + .delivery_cursor + .last_sent_message_id + .unwrap_or(0) + .saturating_add(1); + let frame = RadrootsSimplexAgentMessageFrame { + header: RadrootsSimplexAgentMessageHeader { + message_id, + previous_message_hash: previous_hash, + }, + message: RadrootsSimplexAgentMessage::UserMessage(body.clone()), + padding: Vec::new(), + }; + let ciphertext = + encode_decrypted_message(&RadrootsSimplexAgentDecryptedMessage::Message(frame))?; + let message_hash = Sha256::digest(&ciphertext).to_vec(); + self.store + .record_outbound_message(connection_id, message_id, message_hash)?; + self.store.enqueue_command( + connection_id, + RadrootsSimplexAgentPendingCommandKind::SendEnvelope { + queue: send_queue.descriptor.queue_address(), + envelope: RadrootsSimplexAgentEnvelope::Message( + RadrootsSimplexAgentEncryptedPayload { + ratchet_header: None, + ciphertext, + }, + ), + }, + now, + )?; + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::MessageQueued { + connection_id: connection_id.into(), + message_id, + }); + Ok(message_id) + } + + pub fn ack_message( + &mut self, + connection_id: &str, + message_id: u64, + message_hash: Vec<u8>, + receipt_info: Vec<u8>, + now: u64, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + let send_queue = self.store.primary_send_queue(connection_id)?; + self.store.enqueue_command( + connection_id, + RadrootsSimplexAgentPendingCommandKind::AckInboxMessage { + queue: send_queue.descriptor.queue_address(), + receipt: RadrootsSimplexAgentMessageReceipt { + message_id, + message_hash, + receipt_info, + }, + }, + now, + )?; + Ok(()) + } + + pub fn reconnect_connection( + &mut self, + connection_id: &str, + now: u64, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + self.subscribe_connection(connection_id, now)?; + let ready = self.store.take_ready_commands(now, usize::MAX); + for command in ready { + self.store + .mark_command_retry(command.id, now + self.retry_delay_ms)?; + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::RetryQueued { + connection_id: connection_id.into(), + command_id: command.id, + }); + } + Ok(()) + } + + pub fn queue_rotation( + &mut self, + connection_id: &str, + descriptors: Vec<RadrootsSimplexAgentQueueDescriptor>, + now: u64, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + self.store.set_status( + connection_id, + RadrootsSimplexAgentConnectionStatus::Rotating, + )?; + self.store.enqueue_command( + connection_id, + RadrootsSimplexAgentPendingCommandKind::RotateQueues { descriptors }, + now, + )?; + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::QueueRotationQueued { + connection_id: connection_id.into(), + }); + Ok(()) + } + + pub fn handle_inbound_decrypted_message( + &mut self, + connection_id: &str, + message: RadrootsSimplexAgentDecryptedMessage, + transport_hash: Vec<u8>, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + match message { + RadrootsSimplexAgentDecryptedMessage::ConnectionInfo(info) => { + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::ConnectionInfo { + connection_id: connection_id.into(), + info, + }); + } + RadrootsSimplexAgentDecryptedMessage::ConnectionInfoReply { reply_queues, info } => { + for descriptor in reply_queues { + self.store.add_queue( + connection_id, + descriptor, + RadrootsSimplexAgentQueueRole::Send, + true, + )?; + } + self.store.set_status( + connection_id, + RadrootsSimplexAgentConnectionStatus::AwaitingApproval, + )?; + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::ConfirmationRequired { + connection_id: connection_id.into(), + }); + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::ConnectionInfo { + connection_id: connection_id.into(), + info, + }); + } + RadrootsSimplexAgentDecryptedMessage::RatchetInfo(info) => { + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::ConnectionInfo { + connection_id: connection_id.into(), + info, + }); + } + RadrootsSimplexAgentDecryptedMessage::Message(frame) => { + self.store.record_inbound_message( + connection_id, + frame.header.message_id, + transport_hash, + )?; + match frame.message { + RadrootsSimplexAgentMessage::Hello => { + self.store.set_status( + connection_id, + RadrootsSimplexAgentConnectionStatus::Connected, + )?; + self.events.push_back( + RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished { + connection_id: connection_id.into(), + }, + ); + } + RadrootsSimplexAgentMessage::Receipt(receipt) => { + self.events.push_back( + RadrootsSimplexAgentRuntimeEvent::MessageAcknowledged { + connection_id: connection_id.into(), + message_id: receipt.message_id, + }, + ); + } + RadrootsSimplexAgentMessage::QueueAdd(_) + | RadrootsSimplexAgentMessage::QueueKey(_) + | RadrootsSimplexAgentMessage::QueueUse(_) + | RadrootsSimplexAgentMessage::QueueTest(_) + | RadrootsSimplexAgentMessage::QueueContinue(_) => { + self.events.push_back( + RadrootsSimplexAgentRuntimeEvent::QueueRotationQueued { + connection_id: connection_id.into(), + }, + ); + } + _ => { + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::MessageReceived { + connection_id: connection_id.into(), + message_id: frame.header.message_id, + }); + } + } + } + } + Ok(()) + } + + pub fn record_command_outcome( + &mut self, + command_id: u64, + outcome: RadrootsSimplexAgentCommandOutcome, + ) -> Result<(), RadrootsSimplexAgentRuntimeError> { + match outcome { + RadrootsSimplexAgentCommandOutcome::Delivered => { + let _ = self.store.mark_command_delivered(command_id)?; + } + RadrootsSimplexAgentCommandOutcome::RetryAt { ready_at } => { + let command = self.store.mark_command_retry(command_id, ready_at)?; + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::RetryQueued { + connection_id: command.connection_id, + command_id, + }); + } + RadrootsSimplexAgentCommandOutcome::Failed { message } => { + let command = self.store.mark_command_failed(command_id)?; + self.events + .push_back(RadrootsSimplexAgentRuntimeEvent::Error { + connection_id: Some(command.connection_id), + message, + }); + } + } + Ok(()) + } + + pub fn retry_pending( + &mut self, + now: u64, + limit: usize, + ) -> Vec<RadrootsSimplexAgentPendingCommand> { + self.store.take_ready_commands(now, limit) + } + + pub fn drain_events(&mut self, max: usize) -> Vec<RadrootsSimplexAgentRuntimeEvent> { + let take = self.events.len().min(max); + (0..take) + .filter_map(|_| self.events.pop_front()) + .collect::<Vec<_>>() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpQueueUri; + + fn invitation_queue() -> RadrootsSimplexSmpQueueUri { + RadrootsSimplexSmpQueueUri::parse( + "smp://aGVsbG8@relay.example/cXVldWU#/?v=4&dh=Zm9vYmFy&q=m", + ) + .unwrap() + } + + fn reply_queue() -> RadrootsSimplexSmpQueueUri { + RadrootsSimplexSmpQueueUri::parse( + "smp://aGVsbG8@relay.example/cmVwbHk#/?v=4&dh=YmF6cXV4&q=m", + ) + .unwrap() + } + + #[test] + fn create_join_allow_send_and_retry_flow() { + let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); + let created = runtime + .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) + .unwrap(); + assert!(matches!( + runtime.drain_events(10).remove(0), + RadrootsSimplexAgentRuntimeEvent::InvitationReady { .. } + )); + + let invitation = runtime + .store + .connection(&created) + .unwrap() + .invitation + .clone() + .unwrap(); + let joined = runtime + .join_connection(invitation, reply_queue(), 20) + .unwrap(); + runtime + .allow_connection(&joined, b"local-info".to_vec(), 30) + .unwrap(); + runtime.subscribe_connection(&joined, 40).unwrap(); + let message_id = runtime + .send_message(&joined, b"hello simplex".to_vec(), 50) + .unwrap(); + assert_eq!(message_id, 1); + runtime + .ack_message( + &joined, + message_id, + b"hash".to_vec(), + b"receipt".to_vec(), + 60, + ) + .unwrap(); + runtime.reconnect_connection(&joined, 70).unwrap(); + let ready = runtime.retry_pending(70 + 5_000, 64); + assert!(!ready.is_empty()); + } + + #[test] + fn handles_inbound_hello_and_receipt_events() { + let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap(); + let connection_id = runtime + .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10) + .unwrap(); + runtime.drain_events(8); + + runtime + .handle_inbound_decrypted_message( + &connection_id, + RadrootsSimplexAgentDecryptedMessage::Message(RadrootsSimplexAgentMessageFrame { + header: RadrootsSimplexAgentMessageHeader { + message_id: 1, + previous_message_hash: Vec::new(), + }, + message: RadrootsSimplexAgentMessage::Hello, + padding: Vec::new(), + }), + b"transport-hash".to_vec(), + ) + .unwrap(); + runtime + .handle_inbound_decrypted_message( + &connection_id, + RadrootsSimplexAgentDecryptedMessage::Message(RadrootsSimplexAgentMessageFrame { + header: RadrootsSimplexAgentMessageHeader { + message_id: 2, + previous_message_hash: b"transport-hash".to_vec(), + }, + message: RadrootsSimplexAgentMessage::Receipt( + RadrootsSimplexAgentMessageReceipt { + message_id: 1, + message_hash: b"transport-hash".to_vec(), + receipt_info: Vec::new(), + }, + ), + padding: Vec::new(), + }), + b"transport-hash-2".to_vec(), + ) + .unwrap(); + + let events = runtime.drain_events(16); + assert!(events.iter().any(|event| matches!( + event, + RadrootsSimplexAgentRuntimeEvent::ConnectionEstablished { .. } + ))); + assert!(events.iter().any(|event| matches!( + event, + RadrootsSimplexAgentRuntimeEvent::MessageAcknowledged { message_id: 1, .. } + ))); + } +} diff --git a/crates/simplex-agent-runtime/src/types.rs b/crates/simplex-agent-runtime/src/types.rs @@ -0,0 +1,54 @@ +use alloc::string::String; +use alloc::vec::Vec; +use radroots_simplex_agent_proto::prelude::RadrootsSimplexAgentConnectionLink; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RadrootsSimplexAgentRuntimeEvent { + InvitationReady { + connection_id: String, + invitation: RadrootsSimplexAgentConnectionLink, + }, + ConfirmationRequired { + connection_id: String, + }, + ConnectionInfo { + connection_id: String, + info: Vec<u8>, + }, + ConnectionEstablished { + connection_id: String, + }, + MessageQueued { + connection_id: String, + message_id: u64, + }, + MessageReceived { + connection_id: String, + message_id: u64, + }, + MessageAcknowledged { + connection_id: String, + message_id: u64, + }, + SubscriptionQueued { + connection_id: String, + }, + RetryQueued { + connection_id: String, + command_id: u64, + }, + QueueRotationQueued { + connection_id: String, + }, + Error { + connection_id: Option<String>, + message: String, + }, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RadrootsSimplexAgentCommandOutcome { + Delivered, + RetryAt { ready_at: u64 }, + Failed { message: String }, +} diff --git a/crates/simplex-agent-store/Cargo.toml b/crates/simplex-agent-store/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "radroots-simplex-agent-store" +version = "0.1.0-alpha.1" +edition.workspace = true +authors = [ + "Radroots Authors", +] +rust-version.workspace = true +license.workspace = true +description = "simplex agent persistence and queue state for the radroots sdk" +repository.workspace = true +homepage.workspace = true +documentation = "https://docs.rs/radroots-simplex-agent-store" +readme.workspace = true + +[features] +default = ["std"] +std = ["radroots-simplex-agent-proto/std", "radroots-simplex-smp-proto/std"] + +[dependencies] +radroots-simplex-agent-proto = { workspace = true, default-features = false } +radroots-simplex-smp-proto = { workspace = true, default-features = false } diff --git a/crates/simplex-agent-store/src/error.rs b/crates/simplex-agent-store/src/error.rs @@ -0,0 +1,29 @@ +use alloc::string::String; +use core::fmt; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RadrootsSimplexAgentStoreError { + ConnectionNotFound(String), + QueueNotFound(String), + CommandNotFound(u64), + MissingPrimarySendQueue(String), +} + +impl fmt::Display for RadrootsSimplexAgentStoreError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::ConnectionNotFound(id) => write!(f, "SimpleX agent connection `{id}` not found"), + Self::QueueNotFound(id) => write!(f, "SimpleX agent queue `{id}` not found"), + Self::CommandNotFound(id) => write!(f, "SimpleX agent command `{id}` not found"), + Self::MissingPrimarySendQueue(id) => { + write!( + f, + "SimpleX agent connection `{id}` has no primary send queue" + ) + } + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for RadrootsSimplexAgentStoreError {} diff --git a/crates/simplex-agent-store/src/lib.rs b/crates/simplex-agent-store/src/lib.rs @@ -0,0 +1,17 @@ +#![cfg_attr(not(feature = "std"), no_std)] +#![forbid(unsafe_code)] + +extern crate alloc; + +pub mod error; +pub mod store; + +pub mod prelude { + pub use crate::error::RadrootsSimplexAgentStoreError; + pub use crate::store::{ + RadrootsSimplexAgentConnectionRecord, RadrootsSimplexAgentDeliveryCursor, + RadrootsSimplexAgentPendingCommand, RadrootsSimplexAgentPendingCommandKind, + RadrootsSimplexAgentQueueRecord, RadrootsSimplexAgentQueueRole, + RadrootsSimplexAgentRecentMessageRecord, RadrootsSimplexAgentStore, + }; +} diff --git a/crates/simplex-agent-store/src/store.rs b/crates/simplex-agent-store/src/store.rs @@ -0,0 +1,402 @@ +use crate::error::RadrootsSimplexAgentStoreError; +use alloc::collections::BTreeMap; +use alloc::string::String; +use alloc::vec::Vec; +use radroots_simplex_agent_proto::prelude::{ + RadrootsSimplexAgentConnectionLink, RadrootsSimplexAgentConnectionMode, + RadrootsSimplexAgentConnectionStatus, RadrootsSimplexAgentEnvelope, + RadrootsSimplexAgentMessageId, RadrootsSimplexAgentMessageReceipt, + RadrootsSimplexAgentQueueAddress, RadrootsSimplexAgentQueueDescriptor, + RadrootsSimplexSmpRatchetState, +}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RadrootsSimplexAgentQueueRole { + Receive, + Send, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentQueueRecord { + pub descriptor: RadrootsSimplexAgentQueueDescriptor, + pub role: RadrootsSimplexAgentQueueRole, + pub subscribed: bool, + pub primary: bool, + pub tested: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentDeliveryCursor { + pub last_sent_message_id: Option<RadrootsSimplexAgentMessageId>, + pub last_received_message_id: Option<RadrootsSimplexAgentMessageId>, + pub last_sent_message_hash: Option<Vec<u8>>, + pub last_received_message_hash: Option<Vec<u8>>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentRecentMessageRecord { + pub message_id: RadrootsSimplexAgentMessageId, + pub message_hash: Vec<u8>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RadrootsSimplexAgentPendingCommandKind { + CreateQueue { + descriptor: RadrootsSimplexAgentQueueDescriptor, + }, + SecureQueue { + queue: RadrootsSimplexAgentQueueAddress, + sender_key: Option<Vec<u8>>, + }, + SendEnvelope { + queue: RadrootsSimplexAgentQueueAddress, + envelope: RadrootsSimplexAgentEnvelope, + }, + SubscribeQueue { + queue: RadrootsSimplexAgentQueueAddress, + }, + AckInboxMessage { + queue: RadrootsSimplexAgentQueueAddress, + receipt: RadrootsSimplexAgentMessageReceipt, + }, + RotateQueues { + descriptors: Vec<RadrootsSimplexAgentQueueDescriptor>, + }, + TestQueues { + queues: Vec<RadrootsSimplexAgentQueueAddress>, + }, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentPendingCommand { + pub id: u64, + pub connection_id: String, + pub kind: RadrootsSimplexAgentPendingCommandKind, + pub attempts: u32, + pub ready_at: u64, + pub inflight: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RadrootsSimplexAgentConnectionRecord { + pub id: String, + pub mode: RadrootsSimplexAgentConnectionMode, + pub status: RadrootsSimplexAgentConnectionStatus, + pub invitation: Option<RadrootsSimplexAgentConnectionLink>, + pub queues: Vec<RadrootsSimplexAgentQueueRecord>, + pub ratchet_state: Option<RadrootsSimplexSmpRatchetState>, + pub delivery_cursor: RadrootsSimplexAgentDeliveryCursor, + pub recent_messages: Vec<RadrootsSimplexAgentRecentMessageRecord>, +} + +#[derive(Debug, Clone, Default)] +pub struct RadrootsSimplexAgentStore { + next_connection_sequence: u64, + next_command_sequence: u64, + connections: BTreeMap<String, RadrootsSimplexAgentConnectionRecord>, + pending_commands: BTreeMap<u64, RadrootsSimplexAgentPendingCommand>, +} + +impl RadrootsSimplexAgentStore { + pub fn new() -> Self { + Self::default() + } + + pub fn create_connection( + &mut self, + mode: RadrootsSimplexAgentConnectionMode, + status: RadrootsSimplexAgentConnectionStatus, + invitation: Option<RadrootsSimplexAgentConnectionLink>, + ratchet_state: Option<RadrootsSimplexSmpRatchetState>, + ) -> RadrootsSimplexAgentConnectionRecord { + self.next_connection_sequence = self.next_connection_sequence.saturating_add(1); + let id = alloc::format!("conn-{}", self.next_connection_sequence); + let record = RadrootsSimplexAgentConnectionRecord { + id: id.clone(), + mode, + status, + invitation, + queues: Vec::new(), + ratchet_state, + delivery_cursor: RadrootsSimplexAgentDeliveryCursor { + last_sent_message_id: None, + last_received_message_id: None, + last_sent_message_hash: None, + last_received_message_hash: None, + }, + recent_messages: Vec::new(), + }; + self.connections.insert(id, record.clone()); + record + } + + pub fn connection( + &self, + connection_id: &str, + ) -> Result<&RadrootsSimplexAgentConnectionRecord, RadrootsSimplexAgentStoreError> { + self.connections + .get(connection_id) + .ok_or_else(|| RadrootsSimplexAgentStoreError::ConnectionNotFound(connection_id.into())) + } + + pub fn connection_mut( + &mut self, + connection_id: &str, + ) -> Result<&mut RadrootsSimplexAgentConnectionRecord, RadrootsSimplexAgentStoreError> { + self.connections + .get_mut(connection_id) + .ok_or_else(|| RadrootsSimplexAgentStoreError::ConnectionNotFound(connection_id.into())) + } + + pub fn set_status( + &mut self, + connection_id: &str, + status: RadrootsSimplexAgentConnectionStatus, + ) -> Result<(), RadrootsSimplexAgentStoreError> { + self.connection_mut(connection_id)?.status = status; + Ok(()) + } + + pub fn add_queue( + &mut self, + connection_id: &str, + descriptor: RadrootsSimplexAgentQueueDescriptor, + role: RadrootsSimplexAgentQueueRole, + primary: bool, + ) -> Result<(), RadrootsSimplexAgentStoreError> { + let connection = self.connection_mut(connection_id)?; + let address = descriptor.queue_address(); + if let Some(queue) = connection + .queues + .iter_mut() + .find(|queue| queue.descriptor.queue_address() == address) + { + queue.descriptor = descriptor; + queue.role = role; + queue.primary = primary; + return Ok(()); + } + connection.queues.push(RadrootsSimplexAgentQueueRecord { + descriptor, + role, + subscribed: false, + primary, + tested: false, + }); + Ok(()) + } + + pub fn mark_queue_subscribed( + &mut self, + connection_id: &str, + queue_address: &RadrootsSimplexAgentQueueAddress, + ) -> Result<(), RadrootsSimplexAgentStoreError> { + let connection = self.connection_mut(connection_id)?; + let Some(queue) = connection + .queues + .iter_mut() + .find(|queue| &queue.descriptor.queue_address() == queue_address) + else { + return Err(RadrootsSimplexAgentStoreError::QueueNotFound( + connection_id.into(), + )); + }; + queue.subscribed = true; + Ok(()) + } + + pub fn primary_send_queue( + &self, + connection_id: &str, + ) -> Result<RadrootsSimplexAgentQueueRecord, RadrootsSimplexAgentStoreError> { + let connection = self.connection(connection_id)?; + connection + .queues + .iter() + .find(|queue| queue.role == RadrootsSimplexAgentQueueRole::Send && queue.primary) + .cloned() + .ok_or_else(|| { + RadrootsSimplexAgentStoreError::MissingPrimarySendQueue(connection_id.into()) + }) + } + + pub fn receive_queues( + &self, + connection_id: &str, + ) -> Result<Vec<RadrootsSimplexAgentQueueRecord>, RadrootsSimplexAgentStoreError> { + let connection = self.connection(connection_id)?; + Ok(connection + .queues + .iter() + .filter(|queue| queue.role == RadrootsSimplexAgentQueueRole::Receive) + .cloned() + .collect()) + } + + pub fn record_outbound_message( + &mut self, + connection_id: &str, + message_id: RadrootsSimplexAgentMessageId, + message_hash: Vec<u8>, + ) -> Result<(), RadrootsSimplexAgentStoreError> { + let connection = self.connection_mut(connection_id)?; + connection.delivery_cursor.last_sent_message_id = Some(message_id); + connection.delivery_cursor.last_sent_message_hash = Some(message_hash.clone()); + connection + .recent_messages + .push(RadrootsSimplexAgentRecentMessageRecord { + message_id, + message_hash, + }); + Ok(()) + } + + pub fn record_inbound_message( + &mut self, + connection_id: &str, + message_id: RadrootsSimplexAgentMessageId, + message_hash: Vec<u8>, + ) -> Result<(), RadrootsSimplexAgentStoreError> { + let connection = self.connection_mut(connection_id)?; + connection.delivery_cursor.last_received_message_id = Some(message_id); + connection.delivery_cursor.last_received_message_hash = Some(message_hash.clone()); + connection + .recent_messages + .push(RadrootsSimplexAgentRecentMessageRecord { + message_id, + message_hash, + }); + Ok(()) + } + + pub fn enqueue_command( + &mut self, + connection_id: &str, + kind: RadrootsSimplexAgentPendingCommandKind, + ready_at: u64, + ) -> Result<RadrootsSimplexAgentPendingCommand, RadrootsSimplexAgentStoreError> { + let _ = self.connection(connection_id)?; + self.next_command_sequence = self.next_command_sequence.saturating_add(1); + let command = RadrootsSimplexAgentPendingCommand { + id: self.next_command_sequence, + connection_id: connection_id.into(), + kind, + attempts: 0, + ready_at, + inflight: false, + }; + self.pending_commands.insert(command.id, command.clone()); + Ok(command) + } + + pub fn take_ready_commands( + &mut self, + now: u64, + limit: usize, + ) -> Vec<RadrootsSimplexAgentPendingCommand> { + let ready_ids = self + .pending_commands + .iter() + .filter(|(_, command)| !command.inflight && command.ready_at <= now) + .map(|(id, _)| *id) + .take(limit) + .collect::<Vec<_>>(); + + ready_ids + .into_iter() + .filter_map(|id| { + let command = self.pending_commands.get_mut(&id)?; + command.inflight = true; + command.attempts = command.attempts.saturating_add(1); + Some(command.clone()) + }) + .collect() + } + + pub fn mark_command_delivered( + &mut self, + command_id: u64, + ) -> Result<RadrootsSimplexAgentPendingCommand, RadrootsSimplexAgentStoreError> { + self.pending_commands + .remove(&command_id) + .ok_or(RadrootsSimplexAgentStoreError::CommandNotFound(command_id)) + } + + pub fn mark_command_retry( + &mut self, + command_id: u64, + ready_at: u64, + ) -> Result<RadrootsSimplexAgentPendingCommand, RadrootsSimplexAgentStoreError> { + let command = self + .pending_commands + .get_mut(&command_id) + .ok_or(RadrootsSimplexAgentStoreError::CommandNotFound(command_id))?; + command.inflight = false; + command.ready_at = ready_at; + Ok(command.clone()) + } + + pub fn mark_command_failed( + &mut self, + command_id: u64, + ) -> Result<RadrootsSimplexAgentPendingCommand, RadrootsSimplexAgentStoreError> { + self.pending_commands + .remove(&command_id) + .ok_or(RadrootsSimplexAgentStoreError::CommandNotFound(command_id)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use radroots_simplex_smp_proto::prelude::RadrootsSimplexSmpQueueUri; + + fn sample_descriptor(primary: bool) -> RadrootsSimplexAgentQueueDescriptor { + RadrootsSimplexAgentQueueDescriptor { + queue_uri: RadrootsSimplexSmpQueueUri::parse( + "smp://aGVsbG8@relay.example/cXVldWU#/?v=4&dh=Zm9vYmFy&q=m", + ) + .unwrap(), + replaced_queue: None, + primary, + sender_key: None, + } + } + + #[test] + fn stores_connections_queues_and_retryable_commands() { + let mut store = RadrootsSimplexAgentStore::new(); + let connection = store.create_connection( + RadrootsSimplexAgentConnectionMode::Direct, + RadrootsSimplexAgentConnectionStatus::CreatePending, + None, + None, + ); + store + .add_queue( + &connection.id, + sample_descriptor(true), + RadrootsSimplexAgentQueueRole::Send, + true, + ) + .unwrap(); + let command = store + .enqueue_command( + &connection.id, + RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { + queue: sample_descriptor(true).queue_address(), + }, + 10, + ) + .unwrap(); + let ready = store.take_ready_commands(10, 10); + assert_eq!(ready.len(), 1); + assert_eq!(ready[0].id, command.id); + let retried = store.mark_command_retry(command.id, 20).unwrap(); + assert_eq!(retried.ready_at, 20); + assert_eq!( + store.primary_send_queue(&connection.id).unwrap().descriptor, + sample_descriptor(true) + ); + } +}