commit 3abf26aff686e1c5990ee5773afa04c3b8589fb8
parent 1fe976ae1ad2e4d361a4f58b2b282a5fae0b6ca1
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 05:41:33 -0700
groups: require outbox replay before readiness
- Store relay readiness after startup recovery instead of manufacturing a ready response on demand.
- Keep group outbox replay on the readiness path after projection recovery and outbox materialization complete.
- Add restart coverage for pending and retryable outbox records materializing before ready state is reported.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
2 files changed, 119 insertions(+), 6 deletions(-)
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -21,6 +21,7 @@ pub struct BaseRelay {
store: PocketStoreHandle,
subscriptions: LiveSubscriptionSet,
groups: Option<GroupService>,
+ readiness: BaseRelayReadinessState,
}
#[derive(Debug, Clone, PartialEq)]
@@ -101,10 +102,13 @@ impl BaseRelay {
groups: &GroupRuntimeConfig,
) -> Result<Self, BaseRelayError> {
let groups = GroupService::from_config(&store, groups)?;
+ let subscriptions = LiveSubscriptionSet::new(max_pending_events)?;
+ let readiness = BaseRelayReadinessState::ready();
Ok(Self {
store,
- subscriptions: LiveSubscriptionSet::new(max_pending_events)?,
+ subscriptions,
groups,
+ readiness,
})
}
@@ -239,7 +243,7 @@ impl BaseRelay {
}
pub fn readiness_state(&self) -> BaseRelayReadinessState {
- BaseRelayReadinessState::ready()
+ self.readiness.clone()
}
pub fn shutdown(&mut self) -> Result<BaseRelayShutdownReport, BaseRelayError> {
@@ -841,6 +845,14 @@ mod tests {
let disabled = test_relay_with_groups("base-relay-groups-disabled", 4, &disabled_groups());
assert!(relay.groups_enabled());
+ assert_eq!(
+ relay
+ .readiness_state()
+ .response()
+ .checks
+ .group_outbox_replay,
+ "ready"
+ );
assert!(
relay
.group_projection()
@@ -849,6 +861,14 @@ mod tests {
.is_empty()
);
assert!(!disabled.groups_enabled());
+ assert_eq!(
+ disabled
+ .readiness_state()
+ .response()
+ .checks
+ .group_outbox_replay,
+ "ready"
+ );
assert!(disabled.group_projection().is_none());
}
diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs
@@ -3,10 +3,11 @@
use std::{fs, panic, path::PathBuf};
use tangle_crypto::{event_id_matches, verify_event_signature};
use tangle_groups::{
- GroupId, GroupRuntimeConfig, KIND_GROUP_ADMINS, KIND_GROUP_DELETE_GROUP,
- KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA,
- KIND_GROUP_PUT_USER, MemberStatus, NIP29_RELAY_GENERATED_KIND_VALUES, ProjectionCheckpoint,
- StoreOffset, member_current_key, parse_group_runtime_config_json, projection_checkpoint_key,
+ GroupId, GroupOutboxRecord, GroupOutboxStatus, GroupRuntimeConfig, KIND_GROUP_ADMINS,
+ KIND_GROUP_DELETE_GROUP, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS,
+ KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, MemberStatus, NIP29_RELAY_GENERATED_KIND_VALUES,
+ ProjectionCheckpoint, StoreOffset, member_current_key, parse_group_runtime_config_json,
+ projection_checkpoint_key,
};
use tangle_protocol::{
Event, Filter, RawEventJson, RelayMessage, SubscriptionId, Tag, UnixTimestamp,
@@ -955,6 +956,14 @@ fn projection_rebuild_after_restart_matches_live_state_and_outbox_is_idempotent(
delete_group_extra_records(&config);
let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("reopen");
+ assert_eq!(
+ relay
+ .readiness_state()
+ .response()
+ .checks
+ .group_outbox_replay,
+ "ready"
+ );
assert!(
relay
.group_projection()
@@ -1044,6 +1053,36 @@ fn projection_applies_canonical_events_after_checkpoint_on_restart() {
}
#[test]
+fn pending_and_retryable_group_outbox_records_materialize_on_restart() {
+    // Restart coverage: outbox records left in pending / retryable states must be replayed
+    // during startup, before the reopened relay reports its group outbox replay check as ready.
+    let config = test_store_config("outbox-retryable-restart");
+    let owner_auth = authenticated(FixtureKey::Owner);
+    {
+        // Scope the first relay so it is shut down and dropped before the store is edited.
+        let mut relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("relay");
+        accept_group_create(&mut relay, "OutboxFarm", &[], 1, &owner_auth);
+        relay.shutdown().expect("shutdown");
+    }
+    regress_outbox_records_to_retryable(&config);
+    // Take one snapshot of the outbox so both precondition checks look at the same scan
+    // instead of opening and scanning the store once per assertion.
+    let before = outbox_status_counts(&config);
+    assert_eq!(before.pending, 1, "one outbox record should be pending before restart");
+    assert_eq!(before.retryable, 1, "one outbox record should be retryable before restart");
+
+    let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("reopen");
+    assert_eq!(
+        relay
+            .readiness_state()
+            .response()
+            .checks
+            .group_outbox_replay,
+        "ready"
+    );
+    assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1);
+    assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1);
+    let after = outbox_status_counts(&config);
+    assert_eq!(after.pending, 0, "no outbox record should stay pending after restart");
+    assert_eq!(after.retryable, 0, "no outbox record should stay retryable after restart");
+    assert!(after.stored >= 2, "replayed outbox records should be marked stored");
+}
+
+#[test]
fn same_timestamp_conflicts_are_deterministic_across_ingest_order() {
let first = tangle_v2_group_metadata_event(FixtureKey::Owner, "ClockFarm", "Alpha", 100, &[])
.expect("first");
@@ -1227,6 +1266,60 @@ fn regress_member_projection_to_checkpoint(
store.sync().expect("sync");
}
+/// Test fixture: rewrites two persisted group outbox records so a restart has replay work to do.
+///
+/// Opens the store at `config`, scans `TANGLE_GROUP_OUTBOX_TABLE`, and requires at least two
+/// records. The first scanned record is updated with `mark_failed(true, "retry on restart")`;
+/// the second is replaced by a fresh `GroupOutboxRecord::pending` built from its key and
+/// payload. Both are written back under their own storage keys, then the store is synced.
+///
+/// Panics (via `expect` / `assert!`) if the store cannot be opened, scanned, or written, if a
+/// record fails to decode or encode, or if fewer than two outbox records exist.
+fn regress_outbox_records_to_retryable(config: &PocketStoreConfig) {
+    let store = PocketStoreHandle::open(config).expect("store");
+    let records = store
+        .scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)
+        .expect("outbox records");
+    assert!(records.len() >= 2);
+
+    // Decode both records up front, then derive the regressed versions from them.
+    let mut retryable =
+        GroupOutboxRecord::from_json_bytes(&records[0].1).expect("first outbox record");
+    let replaced =
+        GroupOutboxRecord::from_json_bytes(&records[1].1).expect("second outbox record");
+    retryable.mark_failed(true, "retry on restart");
+    let pending = GroupOutboxRecord::pending(replaced.key().clone(), replaced.payload().clone());
+
+    // Overwrite the first record in place with its failed form.
+    let retryable_key = retryable.key().storage_key();
+    let retryable_bytes = retryable.to_json_bytes().expect("failed bytes");
+    store
+        .put_extra_record(TANGLE_GROUP_OUTBOX_TABLE, &retryable_key, &retryable_bytes)
+        .expect("put failed");
+
+    // Overwrite the second record with the freshly built pending record.
+    let pending_key = pending.key().storage_key();
+    let pending_bytes = pending.to_json_bytes().expect("pending bytes");
+    store
+        .put_extra_record(TANGLE_GROUP_OUTBOX_TABLE, &pending_key, &pending_bytes)
+        .expect("put pending");
+
+    store.sync().expect("sync");
+}
+
+/// Per-status tally of group outbox records, as produced by `outbox_status_counts`.
+///
+/// Only the three statuses below are counted; all counters start at zero via `Default`.
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+struct OutboxStatusCounts {
+/// Records whose status is `GroupOutboxStatus::Pending`.
+pending: usize,
+/// Records whose status is `GroupOutboxStatus::Failed { retryable: true }`.
+retryable: usize,
+/// Records whose status is `GroupOutboxStatus::Stored { .. }`.
+stored: usize,
+}
+
+/// Opens the store at `config` and tallies the group outbox records by status.
+///
+/// Every record in `TANGLE_GROUP_OUTBOX_TABLE` is decoded with
+/// `GroupOutboxRecord::from_json_bytes`. Pending, retryable-failed, and stored records are
+/// counted; skipped and non-retryable failed records are deliberately left out of the tally.
+///
+/// Panics (via `expect`) if the store cannot be opened or scanned, or if a record fails to
+/// decode.
+fn outbox_status_counts(config: &PocketStoreConfig) -> OutboxStatusCounts {
+    let store = PocketStoreHandle::open(config).expect("store");
+    let records = store
+        .scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)
+        .expect("outbox records");
+
+    let mut tally = OutboxStatusCounts::default();
+    for (_, bytes) in records {
+        let record = GroupOutboxRecord::from_json_bytes(&bytes).expect("outbox record");
+        // No catch-all arm: a new status variant should fail to compile here.
+        match record.status() {
+            GroupOutboxStatus::Pending => tally.pending += 1,
+            GroupOutboxStatus::Failed { retryable: true } => tally.retryable += 1,
+            GroupOutboxStatus::Stored { .. } => tally.stored += 1,
+            GroupOutboxStatus::Skipped { .. } | GroupOutboxStatus::Failed { retryable: false } => {}
+        }
+    }
+    tally
+}
+
fn temp_root(name: &str) -> PathBuf {
std::env::temp_dir().join(format!("tangle-rcld12-{name}-{}", std::process::id()))
}