commit 3e3d537bf4416a5007abb9e9072395182966a53c
parent e04d25d43113a7b3bb6cfc4d8362a7dba18f93bf
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 05:46:23 -0700
groups: broadcast generated outbox offsets
- Return newly stored generated event offsets from group outbox materialization.
- Carry source and generated offsets through relay write results for runtime publication.
- Publish every stored offset on the runtime event bus so generated snapshots reach live fanout.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
3 files changed, 126 insertions(+), 24 deletions(-)
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -158,7 +158,7 @@ impl GroupService {
event: &Event,
class: &GroupEventClass,
store_offset: StoreOffset,
- ) -> Result<(), BaseRelayError> {
+ ) -> Result<Vec<StoreOffset>, BaseRelayError> {
self.projection
.apply_canonical_event(event, store_offset, self.limits)?;
if let Some(group_id) = class_group_id(class) {
@@ -218,19 +218,25 @@ impl GroupService {
Ok(())
}
- fn materialize_outbox(&mut self, store: &PocketStoreHandle) -> Result<(), BaseRelayError> {
+ fn materialize_outbox(
+ &mut self,
+ store: &PocketStoreHandle,
+ ) -> Result<Vec<StoreOffset>, BaseRelayError> {
+ let mut stored_offsets = Vec::new();
let records = self.outbox.replay_plan().records().to_vec();
for record in records {
- self.materialize_record(store, record)?;
+ if let Some(offset) = self.materialize_record(store, record)? {
+ stored_offsets.push(offset);
+ }
}
- Ok(())
+ Ok(stored_offsets)
}
fn materialize_record(
&mut self,
store: &PocketStoreHandle,
mut record: GroupOutboxRecord,
- ) -> Result<(), BaseRelayError> {
+ ) -> Result<Option<StoreOffset>, BaseRelayError> {
if matches!(
record.key().effect(),
GroupOutboxEffect::RoleListSnapshot | GroupOutboxEffect::State39004Snapshot
@@ -238,14 +244,14 @@ impl GroupService {
record.mark_skipped("generated group effect is not supported");
self.outbox.update(record.clone());
persist_outbox_record(store, &record)?;
- return Ok(());
+ return Ok(None);
}
match self.store_generated_event(store, &record) {
- Ok(generated_event_id) => {
+ Ok((generated_event_id, stored_offset)) => {
record.mark_stored(generated_event_id);
self.outbox.update(record.clone());
persist_outbox_record(store, &record)?;
- Ok(())
+ Ok(stored_offset)
}
Err(error) => {
record.mark_failed(true, error.prefixed_message());
@@ -260,17 +266,17 @@ impl GroupService {
&mut self,
store: &PocketStoreHandle,
record: &GroupOutboxRecord,
- ) -> Result<EventId, BaseRelayError> {
+ ) -> Result<(EventId, Option<StoreOffset>), BaseRelayError> {
let event = self.builder.sign_payload(record.payload())?;
if store.event_by_id(pocket_event_id(event.id())?)?.is_some() {
- return Ok(event.id().clone());
+ return Ok((event.id().clone(), None));
}
let pocket_event = tangle_event_to_pocket(&event)?;
let offset = StoreOffset::new(store.store_event(&pocket_event)?);
self.projection
.apply_canonical_event(&event, offset, self.limits)?;
self.persist_group_projection(store, record.key().group_id())?;
- Ok(event.id().clone())
+ Ok((event.id().clone(), Some(offset)))
}
fn persist_group_projection(
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -27,26 +27,26 @@ pub struct BaseRelay {
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct BaseRelayEventWrite {
message: RelayMessage,
- stored_offset: Option<StoreOffset>,
+ stored_offsets: Vec<StoreOffset>,
}
impl BaseRelayEventWrite {
- fn stored(message: RelayMessage, stored_offset: StoreOffset) -> Self {
+ fn stored(message: RelayMessage, stored_offsets: Vec<StoreOffset>) -> Self {
Self {
message,
- stored_offset: Some(stored_offset),
+ stored_offsets,
}
}
fn unstored(message: RelayMessage) -> Self {
Self {
message,
- stored_offset: None,
+ stored_offsets: Vec::new(),
}
}
- pub(crate) fn stored_offset(&self) -> Option<StoreOffset> {
- self.stored_offset
+ pub(crate) fn stored_offsets(&self) -> &[StoreOffset] {
+ &self.stored_offsets
}
pub(crate) fn into_message(self) -> RelayMessage {
@@ -310,15 +310,21 @@ impl BaseRelay {
}
let pocket_event = tangle_event_to_pocket(&event)?;
let store_offset = StoreOffset::new(self.store.store_event(&pocket_event)?);
+ let mut stored_offsets = vec![store_offset];
if !matches!(class, GroupEventClass::NonGroup)
&& let Some(groups) = self.groups.as_mut()
{
- groups.after_source_event_stored(&self.store, &event, &class, store_offset)?;
+ stored_offsets.extend(groups.after_source_event_stored(
+ &self.store,
+ &event,
+ &class,
+ store_offset,
+ )?);
}
self.store.sync()?;
Ok(BaseRelayEventWrite::stored(
ok_accepted(event_id, String::new()),
- store_offset,
+ stored_offsets,
))
}
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -119,9 +119,9 @@ impl TangleRuntimeHandle {
let result = runtime
.relay_mut()
.handle_event_with_auth_report(event, auth)?;
- if let Some(offset) = result.stored_offset() {
+ for offset in result.stored_offsets() {
runtime.metrics().record_stored_event_offset();
- runtime.event_bus().publish(offset);
+ runtime.event_bus().publish(*offset);
}
Ok(vec![result.into_message()])
}
@@ -334,11 +334,13 @@ mod tests {
use crate::relay::live::LiveSubscriptionSet;
use serde_json::json;
use std::path::{Path, PathBuf};
- use tangle_groups::{GroupAuthContext, StoreOffset};
+ use tangle_groups::{GroupAuthContext, KIND_GROUP_ADMINS, KIND_GROUP_METADATA, StoreOffset};
use tangle_protocol::{
ClientMessage, RelayMessage, SubscriptionId, UnixTimestamp, filter_from_value,
};
- use tangle_test_support::{FixtureKey, tangle_v2_event};
+ use tangle_test_support::{
+ FixtureKey, tangle_v2_auth_event, tangle_v2_event, tangle_v2_group_create_event,
+ };
#[test]
fn tangle_runtime_opens_owned_process_shell_from_config() {
@@ -474,6 +476,94 @@ mod tests {
let _ = std::fs::remove_dir_all(root);
}
+ #[tokio::test]
+ async fn runtime_publishes_generated_group_event_offsets_for_live_fanout() {
+ let root = temp_root("runtime-generated-offset-fanout");
+ let _ = std::fs::remove_dir_all(&root);
+ let handle = TangleRuntimeHandle::new(
+ TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"),
+ );
+ let mut offsets = handle.subscribe_events().await;
+ let mut auth = handle.auth_state().await.expect("auth");
+ auth.issue_challenge("challenge-a", UnixTimestamp::new(100))
+ .expect("challenge");
+ let auth_event =
+ tangle_v2_auth_event(FixtureKey::Owner, "challenge-a", 120).expect("auth event");
+ let create = tangle_v2_group_create_event(FixtureKey::Owner, "RuntimeFarm", 121, &[])
+ .expect("create");
+ let mut subscriptions = LiveSubscriptionSet::new(8).expect("subscriptions");
+ let subscription_id = SubscriptionId::new("generated-offsets").expect("subscription");
+ subscriptions
+ .subscribe(
+ subscription_id.clone(),
+ vec![
+ filter_from_value(&json!({"kinds":[KIND_GROUP_METADATA, KIND_GROUP_ADMINS]}))
+ .expect("filter"),
+ ],
+ GroupAuthContext::unauthenticated(),
+ )
+ .expect("subscribe");
+
+ assert_eq!(
+ handle
+ .handle_client_message(
+ ClientMessage::Auth(auth_event.clone()),
+ &mut auth,
+ UnixTimestamp::new(120)
+ )
+ .await
+ .expect("auth"),
+ vec![RelayMessage::Ok {
+ event_id: auth_event.id().clone(),
+ accepted: true,
+ message: String::new()
+ }]
+ );
+ assert_eq!(
+ handle
+ .handle_client_message(
+ ClientMessage::Event(create.clone()),
+ &mut auth,
+ UnixTimestamp::new(121)
+ )
+ .await
+ .expect("create"),
+ vec![RelayMessage::Ok {
+ event_id: create.id().clone(),
+ accepted: true,
+ message: String::new()
+ }]
+ );
+ let source_offset = offsets.try_recv().expect("source offset");
+ let generated_offsets = [
+ offsets.try_recv().expect("first generated offset"),
+ offsets.try_recv().expect("second generated offset"),
+ ];
+ assert!(source_offset < generated_offsets[0]);
+ assert!(generated_offsets[0] < generated_offsets[1]);
+ for offset in generated_offsets {
+ assert!(matches!(
+ handle
+ .fanout_event_offset(offset, &mut subscriptions)
+ .await
+ .expect("fanout")
+ .as_slice(),
+ [RelayMessage::Event {
+ subscription_id: delivered,
+ event
+ }] if delivered == &subscription_id
+ && [KIND_GROUP_METADATA, KIND_GROUP_ADMINS]
+ .contains(&event.unsigned().kind().as_u32())
+ ));
+ }
+ assert_eq!(
+ offsets.try_recv().expect_err("only source plus generated"),
+ TangleEventReceiveError::Empty
+ );
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
fn runtime_config(root: &Path, max_pending_events: usize) -> BaseRelayRuntimeConfig {
let raw = json!({
"server": {
@@ -490,7 +580,7 @@ mod tests {
"enabled": true,
"canonical_relay_url": "wss://relay.radroots.test",
"relay_secret": "7777777777777777777777777777777777777777777777777777777777777777",
- "owner_pubkeys": ["0202020202020202020202020202020202020202020202020202020202020202"]
+ "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()]
},
"auth": {
"challenge_ttl_seconds": 300,