commit 2081f41c953d8c8cdc63e245aa95cbd72e69e5bd
parent 61c879e09f4388dc6fb40dbb9200038fb07042d6
Author: triesap <tyson@radroots.org>
Date: Tue, 23 Jun 2026 10:22:14 +0000
simplex: restore receive subscriptions on restart
- list persisted subscribed receive queues from the agent store
- avoid duplicate pending SUB commands during runtime resume
- expose runtime subscription resume for reopened stores
- cover persisted restart resubscription through command execution
Diffstat:
2 files changed, 110 insertions(+), 0 deletions(-)
diff --git a/crates/simplex_agent_runtime/src/runtime.rs b/crates/simplex_agent_runtime/src/runtime.rs
@@ -618,6 +618,39 @@ impl RadrootsSimplexAgentRuntime {
Ok(())
}
+ pub fn resume_subscriptions(
+ &mut self,
+ now: u64,
+ ) -> Result<usize, RadrootsSimplexAgentRuntimeError> {
+ let mut queued = 0_usize;
+ let mut event_connections = Vec::new();
+ for (connection_id, queue) in self.store.subscribed_receive_queues() {
+ if self
+ .store
+ .has_pending_subscribe_queue(&connection_id, &queue)
+ {
+ continue;
+ }
+ self.store.enqueue_command(
+ &connection_id,
+ RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue },
+ now,
+ )?;
+ queued = queued.saturating_add(1);
+ if !event_connections.contains(&connection_id) {
+ event_connections.push(connection_id);
+ }
+ }
+ for connection_id in event_connections {
+ self.events
+ .push_back(RadrootsSimplexAgentRuntimeEvent::SubscriptionQueued { connection_id });
+ }
+ if queued > 0 {
+ self.flush_store()?;
+ }
+ Ok(queued)
+ }
+
pub fn get_connection_message(
&mut self,
connection_id: &str,
@@ -3807,6 +3840,56 @@ mod tests {
assert!(path.exists());
}
+ #[cfg(feature = "std")]
+ #[test]
+ fn resume_subscriptions_requeues_persisted_receive_queues_after_restart() {
+ let tempdir = tempfile::tempdir().unwrap();
+ let path = tempdir.path().join("runtime-store.json");
+ {
+ let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new()
+ .persistent_store_path(&path)
+ .build()
+ .unwrap();
+ let created = runtime
+ .create_connection(invitation_queue(), b"e2e".to_vec(), false, 10)
+ .unwrap();
+ let mut setup_transport = ScriptedTransport::with_responses(vec![
+ ids_response(b"recipient", b"sender", b"server-dh"),
+ RadrootsSimplexSmpBrokerMessage::Ok,
+ ]);
+ runtime
+ .execute_ready_commands(&mut setup_transport, 30, 16)
+ .unwrap();
+ assert!(runtime.store.receive_queues(&created).unwrap()[0].subscribed);
+ }
+
+ let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new()
+ .persistent_store_path(&path)
+ .build()
+ .unwrap();
+ assert_eq!(runtime.resume_subscriptions(40).unwrap(), 1);
+ assert_eq!(runtime.resume_subscriptions(41).unwrap(), 0);
+ let mut subscription_transport =
+ ScriptedTransport::with_responses(vec![RadrootsSimplexSmpBrokerMessage::Ok]);
+ runtime
+ .execute_ready_commands(&mut subscription_transport, 50, 16)
+ .unwrap();
+
+ assert_eq!(subscription_transport.requests.len(), 1);
+ assert!(matches!(
+ subscription_transport.requests[0].command,
+ RadrootsSimplexSmpCommand::Sub
+ ));
+ assert_eq!(
+ subscription_transport.requests[0].entity_id,
+ b"recipient".to_vec()
+ );
+ assert!(runtime.drain_events(16).into_iter().any(|event| matches!(
+ event,
+ RadrootsSimplexAgentRuntimeEvent::SubscriptionQueued { .. }
+ )));
+ }
+
#[test]
fn manual_record_command_failure_clears_staged_delivery_state() {
let mut runtime = RadrootsSimplexAgentRuntimeBuilder::new().build().unwrap();
diff --git a/crates/simplex_agent_store/src/store.rs b/crates/simplex_agent_store/src/store.rs
@@ -1016,6 +1016,18 @@ impl RadrootsSimplexAgentStore {
servers
}
+ pub fn subscribed_receive_queues(&self) -> Vec<(String, RadrootsSimplexAgentQueueAddress)> {
+ let mut queues = Vec::new();
+ for connection in self.connections.values() {
+ for queue in &connection.queues {
+ if queue.role == RadrootsSimplexAgentQueueRole::Receive && queue.subscribed {
+ queues.push((connection.id.clone(), queue.descriptor.queue_address()));
+ }
+ }
+ }
+ queues
+ }
+
pub fn receive_queue_by_entity_id(
&self,
server: &RadrootsSimplexSmpServerAddress,
@@ -1193,6 +1205,21 @@ impl RadrootsSimplexAgentStore {
})
}
+ pub fn has_pending_subscribe_queue(
+ &self,
+ connection_id: &str,
+ queue_address: &RadrootsSimplexAgentQueueAddress,
+ ) -> bool {
+ self.pending_commands.values().any(|command| {
+ command.connection_id == connection_id
+ && matches!(
+ &command.kind,
+ RadrootsSimplexAgentPendingCommandKind::SubscribeQueue { queue }
+ if queue == queue_address
+ )
+ })
+ }
+
pub fn inbound_ack_target(
&self,
connection_id: &str,