commit bc9481e065a652aa0d3486e296454b35d5073ae3
parent b70d855cf5f63dc8993c9bdcf433600d11aba27b
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 16:24:29 -0700
groups: introduce service handle state
Diffstat:
3 files changed, 123 insertions(+), 38 deletions(-)
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -5,7 +5,9 @@ use crate::{
pocket_conversion::{pocket_event_id, pocket_event_to_tangle, tangle_event_to_pocket},
};
use std::{
+ ops::Deref,
str,
+ sync::{Arc, RwLock, RwLockReadGuard},
time::{SystemTime, UNIX_EPOCH},
};
use tangle_crypto::RelaySigner;
@@ -27,7 +29,24 @@ use tangle_store_pocket::{
TANGLE_GROUP_PROJECTION_TABLE,
};
-pub(crate) struct GroupService {
+#[derive(Clone)]
+pub(crate) struct GroupServiceHandle {
+ state: Arc<RwLock<GroupServiceState>>,
+}
+
+pub struct GroupProjectionReadGuard<'a> {
+ state: RwLockReadGuard<'a, GroupServiceState>,
+}
+
+impl Deref for GroupProjectionReadGuard<'_> {
+ type Target = GroupProjection;
+
+ fn deref(&self) -> &Self::Target {
+ &self.state.projection
+ }
+}
+
+pub(crate) struct GroupServiceState {
builder: GroupGeneratedEventBuilder,
authority: GroupAuthority,
projection: GroupProjection,
@@ -38,11 +57,84 @@ pub(crate) struct GroupService {
outbox_replay_batch_cap: u32,
}
-impl GroupService {
+impl GroupServiceHandle {
pub(crate) fn from_config(
store: &PocketStoreHandle,
config: &GroupRuntimeConfig,
) -> Result<Option<Self>, BaseRelayError> {
+ GroupServiceState::from_config(store, config).map(|state| {
+ state.map(|state| Self {
+ state: Arc::new(RwLock::new(state)),
+ })
+ })
+ }
+
+ pub(crate) fn projection(&self) -> GroupProjectionReadGuard<'_> {
+ GroupProjectionReadGuard {
+ state: self
+ .state
+ .read()
+ .expect("group service state lock is not poisoned"),
+ }
+ }
+
+ pub(crate) fn limits(&self) -> GroupLimitsConfig {
+ self.state
+ .read()
+ .expect("group service state lock is not poisoned")
+ .limits()
+ }
+
+ pub(crate) fn outbox_pending_events(&self) -> usize {
+ self.state
+ .read()
+ .expect("group service state lock is not poisoned")
+ .outbox_pending_events()
+ }
+
+ pub(crate) fn check_event(
+ &self,
+ store: &PocketStoreHandle,
+ event: &Event,
+ class: &GroupEventClass,
+ auth: &GroupAuthContext,
+ ) -> Result<(), GroupError> {
+ self.state
+ .read()
+ .map_err(|_| GroupError::internal("group service state lock is poisoned"))?
+ .check_event(store, event, class, auth)
+ }
+
+ pub(crate) fn event_visible_to_auth(
+ &self,
+ event: &(impl GroupEventView + ?Sized),
+ auth: &GroupAuthContext,
+ ) -> Result<bool, GroupError> {
+ self.state
+ .read()
+ .map_err(|_| GroupError::internal("group service state lock is poisoned"))?
+ .event_visible_to_auth(event, auth)
+ }
+
+ pub(crate) fn after_source_event_stored(
+ &self,
+ store: &PocketStoreHandle,
+ event: &Event,
+ class: &GroupEventClass,
+ store_offset: StoreOffset,
+ ) -> Result<Vec<StoreOffset>, BaseRelayError> {
+ self.state
+ .write()
+ .map_err(|_| BaseRelayError::error("group service state lock is poisoned"))?
+ .after_source_event_stored(store, event, class, store_offset)
+ }
+}
+
+impl GroupServiceState {
+ fn from_config(
+ store: &PocketStoreHandle,
+ config: &GroupRuntimeConfig,
+ ) -> Result<Option<Self>, BaseRelayError> {
if !config.enabled() {
return Ok(None);
}
@@ -52,7 +144,7 @@ impl GroupService {
let signer = RelaySigner::from_secret_hex(relay_secret.expose_for_signing())
.map_err(BaseRelayError::invalid)?;
let storage = load_group_storage(store, config.limits())?;
- let mut service = Self {
+ let mut state = Self {
builder: GroupGeneratedEventBuilder::new(signer),
authority: GroupAuthority::new(
config.owner_pubkeys().iter().cloned(),
@@ -65,24 +157,20 @@ impl GroupService {
member_snapshot_cap: config.limits().max_member_list_pubkeys(),
outbox_replay_batch_cap: config.limits().max_outbox_replay_batch(),
};
- service.derive_missing_outbox_records(store)?;
- service.materialize_outbox(store)?;
- Ok(Some(service))
+ state.derive_missing_outbox_records(store)?;
+ state.materialize_outbox(store)?;
+ Ok(Some(state))
}
- pub(crate) fn projection(&self) -> &GroupProjection {
- &self.projection
- }
-
- pub(crate) fn limits(&self) -> GroupLimitsConfig {
+ fn limits(&self) -> GroupLimitsConfig {
self.limits
}
- pub(crate) fn outbox_pending_events(&self) -> usize {
+ fn outbox_pending_events(&self) -> usize {
self.outbox.replay_plan().records().len()
}
- pub(crate) fn check_event(
+ fn check_event(
&self,
store: &PocketStoreHandle,
event: &Event,
@@ -138,7 +226,7 @@ impl GroupService {
Ok(())
}
- pub(crate) fn event_visible_to_auth(
+ fn event_visible_to_auth(
&self,
event: &(impl GroupEventView + ?Sized),
auth: &GroupAuthContext,
@@ -157,7 +245,7 @@ impl GroupService {
Ok(false)
}
- pub(crate) fn after_source_event_stored(
+ fn after_source_event_stored(
&mut self,
store: &PocketStoreHandle,
event: &Event,
@@ -1046,7 +1134,7 @@ fn delete_target_event_id(event: &Event) -> Result<EventId, GroupError> {
#[cfg(test)]
mod tests {
use super::{
- GroupCheckpointStatus, GroupService, scan_canonical_group_events,
+ GroupCheckpointStatus, GroupServiceHandle, scan_canonical_group_events,
scan_canonical_group_events_after, validate_group_extra_tables,
};
use crate::pocket_conversion::tangle_event_to_pocket;
@@ -1075,7 +1163,7 @@ mod tests {
let store = PocketStoreHandle::open(&config).expect("store");
assert!(
- GroupService::from_config(&store, &GroupRuntimeConfig::disabled())
+ GroupServiceHandle::from_config(&store, &GroupRuntimeConfig::disabled())
.expect("service")
.is_none()
);
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -1,5 +1,5 @@
use crate::errors::{BaseRelayError, ok_accepted, ok_rejected};
-use crate::groups::GroupService;
+use crate::groups::{GroupProjectionReadGuard, GroupServiceHandle};
use crate::logging::{self, TangleModerationAuditResult};
use crate::ops::BaseRelayReadinessState;
use crate::pocket_conversion::{
@@ -12,8 +12,8 @@ use crate::relay::{
use std::{cell::RefCell, collections::BTreeSet};
use tangle_crypto::verify_event_signature;
use tangle_groups::{
- GroupAuthContext, GroupEventClass, GroupEventView, GroupProjection, GroupRuntimeConfig,
- StoreOffset, classify_group_event, validate_client_group_event_structure,
+ GroupAuthContext, GroupEventClass, GroupEventView, GroupRuntimeConfig, StoreOffset,
+ classify_group_event, validate_client_group_event_structure,
};
use tangle_protocol::{ClientMessage, Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp};
use tangle_store_pocket::{
@@ -23,7 +23,7 @@ use tangle_store_pocket::{
pub struct BaseRelay {
store: PocketStoreHandle,
subscriptions: LiveSubscriptionSet,
- groups: Option<GroupService>,
+ groups: Option<GroupServiceHandle>,
readiness: BaseRelayReadinessState,
limits: BaseRelayLimits,
query: PocketQueryConfig,
@@ -458,7 +458,7 @@ impl BaseRelay {
groups: &GroupRuntimeConfig,
query: PocketQueryConfig,
) -> Result<Self, BaseRelayError> {
- let groups = GroupService::from_config(&store, groups)?;
+ let groups = GroupServiceHandle::from_config(&store, groups)?;
let subscriptions =
LiveSubscriptionSet::new(limits.max_pending_events(), limits.max_subscriptions())?;
let readiness = BaseRelayReadinessState::runtime_ready_before_bind();
@@ -610,18 +610,18 @@ impl BaseRelay {
self.store.clone()
}
- pub fn group_projection(&self) -> Option<&GroupProjection> {
- self.groups.as_ref().map(|groups| groups.projection())
+ pub fn group_projection(&self) -> Option<GroupProjectionReadGuard<'_>> {
+ self.groups.as_ref().map(GroupServiceHandle::projection)
}
- pub(crate) fn group_service(&self) -> Option<&GroupService> {
+ pub(crate) fn group_service(&self) -> Option<&GroupServiceHandle> {
self.groups.as_ref()
}
pub(crate) fn group_outbox_pending_events(&self) -> usize {
self.groups
.as_ref()
- .map(GroupService::outbox_pending_events)
+ .map(GroupServiceHandle::outbox_pending_events)
.unwrap_or(0)
}
@@ -665,7 +665,7 @@ impl BaseRelay {
let group_limits = self
.groups
.as_ref()
- .map(GroupService::limits)
+ .map(GroupServiceHandle::limits)
.unwrap_or_default();
let audit_class = classify_group_event(&event, group_limits).ok();
let class = match validate_client_group_event_structure(&event, group_limits) {
@@ -1028,7 +1028,7 @@ impl BaseRelay {
}
pub(crate) fn group_read_gate_visible_to_auth(
- groups: Option<&GroupService>,
+ groups: Option<&GroupServiceHandle>,
event: &(impl GroupEventView + ?Sized),
auth: &GroupAuthContext,
) -> Result<bool, BaseRelayError> {
@@ -1828,12 +1828,11 @@ mod tests {
);
let group_id = GroupId::new("Farm").expect("group");
- let group = relay
- .group_projection()
- .expect("projection")
- .group(&group_id)
- .expect("group");
- assert_eq!(group.metadata().name(), Some("Market"));
+ {
+ let projection = relay.group_projection().expect("projection");
+ let group = projection.group(&group_id).expect("group");
+ assert_eq!(group.metadata().name(), Some("Market"));
+ }
let metadata = query_filter(
&mut relay,
"metadata-edit",
diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs
@@ -1379,10 +1379,8 @@ fn rebuilt_projection_matches_live_projection_for_moderation_stream() {
PocketQueryConfig::default(),
)
.expect("reopen");
- assert_projection_without_checkpoint_eq(
- &live_projection,
- relay.group_projection().expect("projection"),
- );
+ let recovered_projection = relay.group_projection().expect("projection");
+ assert_projection_without_checkpoint_eq(&live_projection, &recovered_projection);
assert_eq!(
stored_event_ids_for_kind(&config, KIND_GROUP_METADATA),
metadata_before