commit ec20604b94025d5a539df2f93c2277c335a44afb
parent 9d1a4d1b1fd7744554f3af5628d884b14a413eac
Author: triesap <tyson@radroots.org>
Date: Fri, 5 Jun 2026 21:44:55 -0700
core: add event ingestion pipeline
Diffstat:
3 files changed, 762 insertions(+), 9 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -380,6 +380,7 @@ dependencies = [
"tangle_crypto",
"tangle_nips",
"tangle_protocol",
+ "tangle_store",
"tangle_test_support",
]
diff --git a/crates/tangle_core/Cargo.toml b/crates/tangle_core/Cargo.toml
@@ -11,6 +11,7 @@ description = "Transport-independent relay core policy for tangle"
tangle_crypto = { path = "../tangle_crypto" }
tangle_nips = { path = "../tangle_nips" }
tangle_protocol = { path = "../tangle_protocol" }
+tangle_store = { path = "../tangle_store" }
[dev-dependencies]
tangle_test_support = { path = "../tangle_test_support" }
diff --git a/crates/tangle_core/src/lib.rs b/crates/tangle_core/src/lib.rs
@@ -7,7 +7,11 @@ use tangle_nips::{
DeletionRequest, ListingProjectionEvaluation, RelayAuthEvent, evaluate_listing_projection,
parse_deletion_request, parse_relay_auth_event,
};
-use tangle_protocol::{Event, PublicKeyHex, UnixTimestamp, event_to_value};
+use tangle_protocol::{Event, EventId, PublicKeyHex, UnixTimestamp, event_to_value};
+use tangle_store::{
+ DeletionMarker, DeletionMarkerRepository, ListingProjectionRepository, RawEventRepository,
+ RepositoryError, StoreEventOutcome, StoreProjectionOutcome, StoredEvent,
+};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RuntimeLimitValues {
@@ -801,7 +805,7 @@ impl Default for EventValidator {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidatedEvent {
- event_id: tangle_protocol::EventId,
+ event_id: EventId,
author_pubkey: PublicKeyHex,
admission_kind: AdmissionEventKind,
admission: AdmissionAcceptance,
@@ -809,7 +813,7 @@ pub struct ValidatedEvent {
}
impl ValidatedEvent {
- pub fn event_id(&self) -> &tangle_protocol::EventId {
+ pub fn event_id(&self) -> &EventId {
&self.event_id
}
@@ -997,6 +1001,232 @@ fn validation_payload(event: &Event) -> Result<ValidatedEventPayload, EventValid
}
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct EventIngestor {
+ validator: EventValidator,
+}
+
+impl EventIngestor {
+ pub fn new(validator: EventValidator) -> Self {
+ Self { validator }
+ }
+
+ pub fn validator(&self) -> &EventValidator {
+ &self.validator
+ }
+
+ pub fn ingest<R>(
+ &self,
+ repository: &mut R,
+ event: Event,
+ context: &AdmissionContext,
+ received_at: UnixTimestamp,
+ now: UnixTimestamp,
+ ) -> Result<EventIngestion, EventIngestionRejection>
+ where
+ R: RawEventRepository + ListingProjectionRepository + DeletionMarkerRepository,
+ {
+ let validated = self
+ .validator
+ .validate(&event, context, now)
+ .map_err(EventIngestionRejection::Validation)?;
+ if validated.admission().effect() == AdmissionEffect::AuthenticateOnly {
+ return Ok(EventIngestion::new(
+ validated.event_id().clone(),
+ EventIngestionEffect::Authenticated,
+ None,
+ None,
+ 0,
+ ));
+ }
+ if event.unsigned().kind().is_ephemeral() {
+ return Ok(EventIngestion::new(
+ validated.event_id().clone(),
+ EventIngestionEffect::EphemeralAccepted,
+ None,
+ None,
+ 0,
+ ));
+ }
+ let raw_outcome = repository
+ .put_event(StoredEvent::new(event.clone(), received_at))
+ .map_err(EventIngestionRejection::Repository)?;
+ if raw_outcome == StoreEventOutcome::Duplicate {
+ return Ok(EventIngestion::new(
+ validated.event_id().clone(),
+ EventIngestionEffect::Duplicate,
+ Some(raw_outcome),
+ None,
+ 0,
+ ));
+ }
+ let projection_outcome = ingest_projection(repository, &validated)?;
+ let deletion_marker_count = ingest_deletion_markers(repository, &validated, &event)?;
+ Ok(EventIngestion::new(
+ validated.event_id().clone(),
+ EventIngestionEffect::Stored,
+ Some(raw_outcome),
+ projection_outcome,
+ deletion_marker_count,
+ ))
+ }
+}
+
+impl Default for EventIngestor {
+ fn default() -> Self {
+ Self::new(EventValidator::default())
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct EventIngestion {
+ event_id: EventId,
+ effect: EventIngestionEffect,
+ raw_event_outcome: Option<StoreEventOutcome>,
+ projection_outcome: Option<StoreProjectionOutcome>,
+ deletion_marker_count: usize,
+}
+
+impl EventIngestion {
+ pub fn new(
+ event_id: EventId,
+ effect: EventIngestionEffect,
+ raw_event_outcome: Option<StoreEventOutcome>,
+ projection_outcome: Option<StoreProjectionOutcome>,
+ deletion_marker_count: usize,
+ ) -> Self {
+ Self {
+ event_id,
+ effect,
+ raw_event_outcome,
+ projection_outcome,
+ deletion_marker_count,
+ }
+ }
+
+ pub fn event_id(&self) -> &EventId {
+ &self.event_id
+ }
+
+ pub fn effect(&self) -> EventIngestionEffect {
+ self.effect
+ }
+
+ pub fn raw_event_outcome(&self) -> Option<StoreEventOutcome> {
+ self.raw_event_outcome
+ }
+
+ pub fn projection_outcome(&self) -> Option<StoreProjectionOutcome> {
+ self.projection_outcome
+ }
+
+ pub fn deletion_marker_count(&self) -> usize {
+ self.deletion_marker_count
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum EventIngestionEffect {
+ Authenticated,
+ EphemeralAccepted,
+ Stored,
+ Duplicate,
+}
+
+impl EventIngestionEffect {
+ pub fn as_str(self) -> &'static str {
+ match self {
+ Self::Authenticated => "authenticated",
+ Self::EphemeralAccepted => "ephemeral accepted",
+ Self::Stored => "stored",
+ Self::Duplicate => "duplicate",
+ }
+ }
+}
+
+impl fmt::Display for EventIngestionEffect {
+ fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+ formatter.write_str(self.as_str())
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum EventIngestionRejection {
+ Validation(EventValidationRejection),
+ Repository(RepositoryError),
+}
+
+impl EventIngestionRejection {
+ pub fn kind(&self) -> EventIngestionRejectionKind {
+ match self {
+ Self::Validation(_) => EventIngestionRejectionKind::Validation,
+ Self::Repository(_) => EventIngestionRejectionKind::Repository,
+ }
+ }
+}
+
+impl fmt::Display for EventIngestionRejection {
+ fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ Self::Validation(rejection) => write!(formatter, "validation: {rejection}"),
+ Self::Repository(rejection) => write!(formatter, "repository: {rejection}"),
+ }
+ }
+}
+
+impl std::error::Error for EventIngestionRejection {}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum EventIngestionRejectionKind {
+ Validation,
+ Repository,
+}
+
+fn ingest_projection<R>(
+ repository: &mut R,
+ validated: &ValidatedEvent,
+) -> Result<Option<StoreProjectionOutcome>, EventIngestionRejection>
+where
+ R: ListingProjectionRepository,
+{
+ if validated.admission().effect() != AdmissionEffect::StoreRawAndProjectPublicListing {
+ return Ok(None);
+ }
+ let Some(ListingProjectionEvaluation::Eligible(projection)) =
+ validated.payload().listing_evaluation()
+ else {
+ return Ok(None);
+ };
+ repository
+ .put_listing_projection(projection.as_ref().clone())
+ .map(Some)
+ .map_err(EventIngestionRejection::Repository)
+}
+
+fn ingest_deletion_markers<R>(
+ repository: &mut R,
+ validated: &ValidatedEvent,
+ event: &Event,
+) -> Result<usize, EventIngestionRejection>
+where
+ R: DeletionMarkerRepository,
+{
+ let Some(request) = validated.payload().deletion_request() else {
+ return Ok(0);
+ };
+ for target in request.targets() {
+ repository
+ .put_deletion_marker(DeletionMarker::new(
+ request.event_id().clone(),
+ event.unsigned().pubkey().clone(),
+ target.clone(),
+ event.unsigned().created_at(),
+ ))
+ .map_err(EventIngestionRejection::Repository)?;
+ }
+ Ok(request.targets().len())
+}
+
fn require_positive(field: &'static str, value: u64) -> Result<(), RuntimeLimitConfigError> {
if value == 0 {
Err(RuntimeLimitConfigError::Zero { field })
@@ -1021,16 +1251,22 @@ fn require_within(
mod tests {
use super::{
AdmissionContext, AdmissionEffect, AdmissionEvent, AdmissionEventKind, AdmissionPolicy,
- AdmissionRejectionKind, EventParser, EventValidationRejection,
- EventValidationRejectionKind, EventValidator, ProjectionExclusionReason,
- RuntimeLimitConfigError, RuntimeLimitKind, RuntimeLimitValues, RuntimeLimits,
- UnapprovedSellerAction,
+ AdmissionRejectionKind, EventIngestionEffect, EventIngestionRejectionKind, EventIngestor,
+ EventParser, EventValidationRejection, EventValidationRejectionKind, EventValidator,
+ ProjectionExclusionReason, RuntimeLimitConfigError, RuntimeLimitKind, RuntimeLimitValues,
+ RuntimeLimits, UnapprovedSellerAction,
};
+ use tangle_nips::{ListingProjection, evaluate_listing_projection, parse_deletion_request};
use tangle_protocol::{
- Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent,
+ AddressCoordinate, Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp,
+ UnsignedEvent,
+ };
+ use tangle_store::{
+ DeletionMarker, DeletionMarkerRepository, ListingProjectionRepository, RawEventRepository,
+ RepositoryError, StoreEventOutcome, StoreProjectionOutcome, StoredEvent,
};
use tangle_test_support::{
- FixtureKey, auth_event_spec, build_fixture_event, deletion_event_spec,
+ FixtureKey, InMemoryRepository, auth_event_spec, build_fixture_event, deletion_event_spec,
fixture_spec_from_json, projection_ineligible_listing_spec, valid_public_listing_spec,
};
@@ -1899,6 +2135,372 @@ mod tests {
assert_eq!(EventParser::Deletion.to_string(), "deletion");
}
+ #[test]
+ fn event_ingestor_keeps_auth_and_ephemeral_events_out_of_raw_storage() {
+ let seller = FixtureKey::Seller.public_key();
+ let auth = build_fixture_event(&auth_event_spec()).expect("auth");
+ let ephemeral = build_fixture_event(
+ &fixture_spec_from_json(
+ r#"{"name":"ephemeral","key":"seller","created_at":1714124440,"kind":20000,"tags":[],"content":"typing"}"#,
+ )
+ .expect("ephemeral"),
+ )
+ .expect("ephemeral event");
+ let mut repository = InMemoryRepository::new();
+ let ingestor = EventIngestor::default();
+ let auth = ingestor
+ .ingest(
+ &mut repository,
+ auth,
+ &AdmissionContext::unauthenticated(),
+ UnixTimestamp::new(1_714_124_501),
+ UnixTimestamp::new(1_714_124_501),
+ )
+ .expect("auth");
+ let ephemeral = ingestor
+ .ingest(
+ &mut repository,
+ ephemeral,
+ &AdmissionContext::authenticated(seller),
+ UnixTimestamp::new(1_714_124_502),
+ UnixTimestamp::new(1_714_124_502),
+ )
+ .expect("ephemeral");
+
+ assert_eq!(ingestor.validator().limits(), RuntimeLimits::default());
+ assert_eq!(auth.effect(), EventIngestionEffect::Authenticated);
+ assert_eq!(ephemeral.effect(), EventIngestionEffect::EphemeralAccepted);
+ assert_eq!(auth.raw_event_outcome(), None);
+ assert_eq!(ephemeral.raw_event_outcome(), None);
+ assert_eq!(repository.events().expect("events"), Vec::new());
+ }
+
+ #[test]
+ fn event_ingestor_stores_raw_events_and_projects_approved_listings() {
+ let event = build_fixture_event(&valid_public_listing_spec()).expect("event");
+ let seller = FixtureKey::Seller.public_key();
+ let projection_address = AddressCoordinate::from_event(&event)
+ .expect("address")
+ .expect("address");
+ let mut repository = InMemoryRepository::new();
+ let ingestor = EventIngestor::new(EventValidator::new(
+ RuntimeLimits::default(),
+ AdmissionPolicy::new().approve_seller(seller.clone()),
+ ));
+ let ingestion = ingestor
+ .ingest(
+ &mut repository,
+ event.clone(),
+ &AdmissionContext::authenticated(seller),
+ UnixTimestamp::new(1_714_124_501),
+ UnixTimestamp::new(1_714_124_501),
+ )
+ .expect("ingestion");
+
+ assert_eq!(ingestion.event_id(), event.id());
+ assert_eq!(ingestion.effect(), EventIngestionEffect::Stored);
+ assert_eq!(
+ ingestion.raw_event_outcome(),
+ Some(StoreEventOutcome::Inserted)
+ );
+ assert_eq!(
+ ingestion.projection_outcome(),
+ Some(StoreProjectionOutcome::Inserted)
+ );
+ assert_eq!(ingestion.deletion_marker_count(), 0);
+ assert_eq!(
+ repository
+ .event_by_id(event.id())
+ .expect("event")
+ .expect("stored")
+ .event(),
+ &event
+ );
+ assert!(
+ repository
+ .listing_projection(&projection_address)
+ .expect("projection")
+ .is_some()
+ );
+ }
+
+ #[test]
+ fn event_ingestor_stores_projection_ineligible_listing_without_projection() {
+ let event = build_fixture_event(&projection_ineligible_listing_spec()).expect("event");
+ let seller = FixtureKey::Seller.public_key();
+ let mut repository = InMemoryRepository::new();
+ let ingestor = EventIngestor::new(EventValidator::new(
+ RuntimeLimits::default(),
+ AdmissionPolicy::new().approve_seller(seller.clone()),
+ ));
+ let ingestion = ingestor
+ .ingest(
+ &mut repository,
+ event,
+ &AdmissionContext::authenticated(seller),
+ UnixTimestamp::new(1_714_124_501),
+ UnixTimestamp::new(1_714_124_501),
+ )
+ .expect("ingestion");
+
+ assert_eq!(ingestion.effect(), EventIngestionEffect::Stored);
+ assert_eq!(
+ ingestion.raw_event_outcome(),
+ Some(StoreEventOutcome::Inserted)
+ );
+ assert_eq!(ingestion.projection_outcome(), None);
+ }
+
+ #[test]
+ fn event_ingestor_creates_deletion_markers_after_raw_insert() {
+ let event = build_fixture_event(&deletion_event_spec()).expect("deletion");
+ let seller = FixtureKey::Seller.public_key();
+ let mut repository = InMemoryRepository::new();
+ let ingestion = EventIngestor::default()
+ .ingest(
+ &mut repository,
+ event.clone(),
+ &AdmissionContext::authenticated(seller.clone()),
+ UnixTimestamp::new(1_714_124_501),
+ UnixTimestamp::new(1_714_124_501),
+ )
+ .expect("ingestion");
+ let markers = repository.deletion_markers().expect("markers");
+
+ assert_eq!(ingestion.effect(), EventIngestionEffect::Stored);
+ assert_eq!(ingestion.deletion_marker_count(), 1);
+ assert_eq!(markers.len(), 1);
+ assert_eq!(markers[0].deletion_event_id(), event.id());
+ assert_eq!(markers[0].author_pubkey(), &seller);
+ assert_eq!(markers[0].deleted_at(), event.unsigned().created_at());
+ }
+
+ #[test]
+ fn event_ingestor_skips_duplicate_side_effects() {
+ let event = build_fixture_event(&valid_public_listing_spec()).expect("event");
+ let seller = FixtureKey::Seller.public_key();
+ let mut repository = InMemoryRepository::new();
+ let ingestor = EventIngestor::new(EventValidator::new(
+ RuntimeLimits::default(),
+ AdmissionPolicy::new().approve_seller(seller.clone()),
+ ));
+ let first = ingestor
+ .ingest(
+ &mut repository,
+ event.clone(),
+ &AdmissionContext::authenticated(seller.clone()),
+ UnixTimestamp::new(1_714_124_501),
+ UnixTimestamp::new(1_714_124_501),
+ )
+ .expect("first");
+ let duplicate = ingestor
+ .ingest(
+ &mut repository,
+ event,
+ &AdmissionContext::authenticated(seller),
+ UnixTimestamp::new(1_714_124_502),
+ UnixTimestamp::new(1_714_124_502),
+ )
+ .expect("duplicate");
+
+ assert_eq!(
+ first.projection_outcome(),
+ Some(StoreProjectionOutcome::Inserted)
+ );
+ assert_eq!(duplicate.effect(), EventIngestionEffect::Duplicate);
+ assert_eq!(
+ duplicate.raw_event_outcome(),
+ Some(StoreEventOutcome::Duplicate)
+ );
+ assert_eq!(duplicate.projection_outcome(), None);
+ }
+
+ #[test]
+ fn event_ingestor_reports_validation_and_repository_rejections() {
+ let seller = FixtureKey::Seller.public_key();
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let deletion = build_fixture_event(&deletion_event_spec()).expect("deletion");
+ let note = build_fixture_event(
+ &fixture_spec_from_json(
+ r#"{"name":"note","key":"seller","created_at":1714124437,"kind":1,"tags":[],"content":"hello"}"#,
+ )
+ .expect("note"),
+ )
+ .expect("note event");
+ let validation_rejection = EventIngestor::default()
+ .ingest(
+ &mut InMemoryRepository::new(),
+ note.clone(),
+ &AdmissionContext::unauthenticated(),
+ UnixTimestamp::new(1_714_124_501),
+ UnixTimestamp::new(1_714_124_501),
+ )
+ .expect_err("validation");
+ let repository_rejection = EventIngestor::default()
+ .ingest(
+ &mut RawFailingRepository,
+ note,
+ &AdmissionContext::authenticated(seller),
+ UnixTimestamp::new(1_714_124_501),
+ UnixTimestamp::new(1_714_124_501),
+ )
+ .expect_err("repository");
+ let projection_rejection = EventIngestor::new(EventValidator::new(
+ RuntimeLimits::default(),
+ AdmissionPolicy::new().approve_seller(FixtureKey::Seller.public_key()),
+ ))
+ .ingest(
+ &mut ProjectionFailingRepository::new(),
+ listing,
+ &AdmissionContext::authenticated(FixtureKey::Seller.public_key()),
+ UnixTimestamp::new(1_714_124_501),
+ UnixTimestamp::new(1_714_124_501),
+ )
+ .expect_err("projection repository");
+ let deletion_rejection = EventIngestor::default()
+ .ingest(
+ &mut DeletionFailingRepository::new(),
+ deletion,
+ &AdmissionContext::authenticated(FixtureKey::Seller.public_key()),
+ UnixTimestamp::new(1_714_124_501),
+ UnixTimestamp::new(1_714_124_501),
+ )
+ .expect_err("deletion repository");
+
+ assert_eq!(
+ validation_rejection.kind(),
+ EventIngestionRejectionKind::Validation
+ );
+ assert_eq!(
+ repository_rejection.kind(),
+ EventIngestionRejectionKind::Repository
+ );
+ assert!(validation_rejection.to_string().starts_with("validation:"));
+ assert_eq!(
+ repository_rejection.to_string(),
+ "repository: repository unavailable"
+ );
+ assert_eq!(
+ projection_rejection.kind(),
+ EventIngestionRejectionKind::Repository
+ );
+ assert_eq!(
+ projection_rejection.to_string(),
+ "repository: projection unavailable"
+ );
+ assert_eq!(
+ deletion_rejection.kind(),
+ EventIngestionRejectionKind::Repository
+ );
+ assert_eq!(
+ deletion_rejection.to_string(),
+ "repository: deletion unavailable"
+ );
+ assert_eq!(EventIngestionEffect::Stored.to_string(), "stored");
+ assert_eq!(
+ [
+ EventIngestionEffect::Authenticated.as_str(),
+ EventIngestionEffect::EphemeralAccepted.as_str(),
+ EventIngestionEffect::Stored.as_str(),
+ EventIngestionEffect::Duplicate.as_str(),
+ ],
+ ["authenticated", "ephemeral accepted", "stored", "duplicate",]
+ );
+ }
+
+ #[test]
+ fn failing_repository_helpers_cover_trait_surfaces() {
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let deletion = build_fixture_event(&deletion_event_spec()).expect("deletion");
+ let projection = evaluate_listing_projection(&listing)
+ .projection()
+ .expect("projection")
+ .clone();
+ let address = projection.identity().address().clone();
+ let deletion_request = parse_deletion_request(&deletion)
+ .expect("deletion parse")
+ .expect("deletion request");
+ let marker = DeletionMarker::new(
+ deletion.id().clone(),
+ deletion.unsigned().pubkey().clone(),
+ deletion_request.targets()[0].clone(),
+ deletion.unsigned().created_at(),
+ );
+ let stored = StoredEvent::new(listing.clone(), UnixTimestamp::new(1_714_124_501));
+ let mut raw = RawFailingRepository;
+
+ assert!(raw.event_by_id(listing.id()).is_err());
+ assert!(raw.events().is_err());
+ assert!(raw.put_listing_projection(projection.clone()).is_err());
+ assert!(raw.listing_projection(&address).is_err());
+ assert!(raw.put_deletion_marker(marker.clone()).is_err());
+ assert!(raw.deletion_markers().is_err());
+
+ let mut projection_failing = ProjectionFailingRepository::new();
+ assert_eq!(
+ projection_failing.put_event(stored.clone()).expect("raw"),
+ StoreEventOutcome::Inserted
+ );
+ assert_eq!(
+ projection_failing
+ .event_by_id(listing.id())
+ .expect("event")
+ .expect("stored")
+ .event(),
+ &listing
+ );
+ assert_eq!(projection_failing.events().expect("events").len(), 1);
+ assert!(
+ projection_failing
+ .put_listing_projection(projection.clone())
+ .is_err()
+ );
+ assert_eq!(
+ projection_failing
+ .listing_projection(&address)
+ .expect("projection"),
+ None
+ );
+ assert_eq!(
+ projection_failing.put_deletion_marker(marker.clone()),
+ Ok(())
+ );
+ assert_eq!(
+ projection_failing.deletion_markers().expect("markers"),
+ vec![marker.clone()]
+ );
+
+ let mut deletion_failing = DeletionFailingRepository::new();
+ assert_eq!(
+ deletion_failing.put_event(stored).expect("raw"),
+ StoreEventOutcome::Inserted
+ );
+ assert_eq!(
+ deletion_failing
+ .put_listing_projection(projection.clone())
+ .expect("projection"),
+ StoreProjectionOutcome::Inserted
+ );
+ assert_eq!(
+ deletion_failing
+ .listing_projection(&address)
+ .expect("projection"),
+ Some(projection)
+ );
+ assert!(deletion_failing.put_deletion_marker(marker).is_err());
+ assert_eq!(
+ deletion_failing.deletion_markers().expect("markers"),
+ Vec::new()
+ );
+ assert!(
+ deletion_failing
+ .event_by_id(listing.id())
+ .expect("event")
+ .is_some()
+ );
+ assert_eq!(deletion_failing.events().expect("events").len(), 1);
+ }
+
fn limits_with(update: impl FnOnce(&mut RuntimeLimitValues)) -> RuntimeLimits {
let mut values = RuntimeLimitValues::default();
update(&mut values);
@@ -1919,6 +2521,155 @@ mod tests {
)
}
+ struct RawFailingRepository;
+
+ impl RawEventRepository for RawFailingRepository {
+ fn put_event(
+ &mut self,
+ _record: StoredEvent,
+ ) -> Result<StoreEventOutcome, RepositoryError> {
+ Err(RepositoryError::new("repository unavailable"))
+ }
+
+ fn event_by_id(&self, _event_id: &EventId) -> Result<Option<StoredEvent>, RepositoryError> {
+ Err(RepositoryError::new("repository unavailable"))
+ }
+
+ fn events(&self) -> Result<Vec<StoredEvent>, RepositoryError> {
+ Err(RepositoryError::new("repository unavailable"))
+ }
+ }
+
+ impl ListingProjectionRepository for RawFailingRepository {
+ fn put_listing_projection(
+ &mut self,
+ _projection: ListingProjection,
+ ) -> Result<StoreProjectionOutcome, RepositoryError> {
+ Err(RepositoryError::new("repository unavailable"))
+ }
+
+ fn listing_projection(
+ &self,
+ _address: &AddressCoordinate,
+ ) -> Result<Option<ListingProjection>, RepositoryError> {
+ Err(RepositoryError::new("repository unavailable"))
+ }
+ }
+
+ impl DeletionMarkerRepository for RawFailingRepository {
+ fn put_deletion_marker(&mut self, _marker: DeletionMarker) -> Result<(), RepositoryError> {
+ Err(RepositoryError::new("repository unavailable"))
+ }
+
+ fn deletion_markers(&self) -> Result<Vec<DeletionMarker>, RepositoryError> {
+ Err(RepositoryError::new("repository unavailable"))
+ }
+ }
+
+ struct ProjectionFailingRepository {
+ inner: InMemoryRepository,
+ }
+
+ impl ProjectionFailingRepository {
+ fn new() -> Self {
+ Self {
+ inner: InMemoryRepository::new(),
+ }
+ }
+ }
+
+ impl RawEventRepository for ProjectionFailingRepository {
+ fn put_event(&mut self, record: StoredEvent) -> Result<StoreEventOutcome, RepositoryError> {
+ self.inner.put_event(record)
+ }
+
+ fn event_by_id(&self, event_id: &EventId) -> Result<Option<StoredEvent>, RepositoryError> {
+ self.inner.event_by_id(event_id)
+ }
+
+ fn events(&self) -> Result<Vec<StoredEvent>, RepositoryError> {
+ self.inner.events()
+ }
+ }
+
+ impl ListingProjectionRepository for ProjectionFailingRepository {
+ fn put_listing_projection(
+ &mut self,
+ _projection: ListingProjection,
+ ) -> Result<StoreProjectionOutcome, RepositoryError> {
+ Err(RepositoryError::new("projection unavailable"))
+ }
+
+ fn listing_projection(
+ &self,
+ address: &AddressCoordinate,
+ ) -> Result<Option<ListingProjection>, RepositoryError> {
+ self.inner.listing_projection(address)
+ }
+ }
+
+ impl DeletionMarkerRepository for ProjectionFailingRepository {
+ fn put_deletion_marker(&mut self, marker: DeletionMarker) -> Result<(), RepositoryError> {
+ self.inner.put_deletion_marker(marker)
+ }
+
+ fn deletion_markers(&self) -> Result<Vec<DeletionMarker>, RepositoryError> {
+ self.inner.deletion_markers()
+ }
+ }
+
+ struct DeletionFailingRepository {
+ inner: InMemoryRepository,
+ }
+
+ impl DeletionFailingRepository {
+ fn new() -> Self {
+ Self {
+ inner: InMemoryRepository::new(),
+ }
+ }
+ }
+
+ impl RawEventRepository for DeletionFailingRepository {
+ fn put_event(&mut self, record: StoredEvent) -> Result<StoreEventOutcome, RepositoryError> {
+ self.inner.put_event(record)
+ }
+
+ fn event_by_id(&self, event_id: &EventId) -> Result<Option<StoredEvent>, RepositoryError> {
+ self.inner.event_by_id(event_id)
+ }
+
+ fn events(&self) -> Result<Vec<StoredEvent>, RepositoryError> {
+ self.inner.events()
+ }
+ }
+
+ impl ListingProjectionRepository for DeletionFailingRepository {
+ fn put_listing_projection(
+ &mut self,
+ projection: ListingProjection,
+ ) -> Result<StoreProjectionOutcome, RepositoryError> {
+ self.inner.put_listing_projection(projection)
+ }
+
+ fn listing_projection(
+ &self,
+ address: &AddressCoordinate,
+ ) -> Result<Option<ListingProjection>, RepositoryError> {
+ self.inner.listing_projection(address)
+ }
+ }
+
+ impl DeletionMarkerRepository for DeletionFailingRepository {
+ fn put_deletion_marker(&mut self, _marker: DeletionMarker) -> Result<(), RepositoryError> {
+ Err(RepositoryError::new("deletion unavailable"))
+ }
+
+ fn deletion_markers(&self) -> Result<Vec<DeletionMarker>, RepositoryError> {
+ self.inner.deletion_markers()
+ }
+ }
+
fn pubkey(hex: &str) -> PublicKeyHex {
PublicKeyHex::new(&hex.repeat(PublicKeyHex::HEX_LENGTH)).expect("pubkey")
}