commit 955ac53189a6abc005436f46165dbb83475f68ab
parent 9e38332b3a5c4e91bd66f9f2f6d7439ef89d6acd
Author: triesap <tyson@radroots.org>
Date: Thu, 18 Jun 2026 20:16:23 -0700
sdk: extract private workflow enqueue helper
Diffstat:
3 files changed, 209 insertions(+), 154 deletions(-)
diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs
@@ -36,6 +36,8 @@ mod relay_targets;
mod runtime;
#[cfg(feature = "runtime")]
mod sync_runtime;
+#[cfg(feature = "runtime")]
+mod workflow_runtime;
pub use crate::client::{
FarmClient, ListingClient, ProfileClient, RadrootsSdkClient, SdkPublishError,
diff --git a/crates/sdk/src/listings_runtime.rs b/crates/sdk/src/listings_runtime.rs
@@ -1,24 +1,20 @@
#[cfg(feature = "runtime")]
use crate::{
ListingsClient, RadrootsSdkError, RadrootsSdkTimestamp, SdkIdempotencyKey,
- SdkRelayTargetPolicy, SdkRelayTargetSet, SdkRelayUrlPolicy, runtime::sdk_now_ms,
+ SdkRelayTargetPolicy, SdkRelayUrlPolicy,
+ workflow_runtime::{SdkWorkflowEnqueueRequest, enqueue_signed_workflow},
};
#[cfg(feature = "runtime")]
-use radroots_authority::{
- RadrootsActorContext, RadrootsActorSource, RadrootsEventSigner, sign_authorized_draft,
-};
-#[cfg(feature = "runtime")]
-use radroots_event_store::RadrootsEventIngest;
+use radroots_authority::{RadrootsActorContext, RadrootsActorSource, RadrootsEventSigner};
#[cfg(feature = "runtime")]
use radroots_events::{
- RadrootsNostrEvent,
contract::RadrootsActorRole,
- draft::{RadrootsFrozenEventDraft, RadrootsSignedNostrEvent},
+ draft::RadrootsFrozenEventDraft,
ids::{RadrootsEventId, RadrootsListingAddress},
listing::RadrootsListing,
};
#[cfg(feature = "runtime")]
-use radroots_outbox::{RadrootsOutboxEnqueueStatus, RadrootsOutboxSignedOperationInput};
+use radroots_outbox::RadrootsOutboxEnqueueStatus;
#[cfg(feature = "runtime")]
use radroots_trade::listing::{
RadrootsCanonicalListingDraft, RadrootsListingDraftDocumentV1, RadrootsListingMutation,
@@ -26,8 +22,6 @@ use radroots_trade::listing::{
};
#[cfg(feature = "runtime")]
use serde::ser::SerializeStruct;
-#[cfg(feature = "runtime")]
-use sha2::{Digest, Sha256};
#[cfg(feature = "runtime")]
const LISTING_PUBLISH_OPERATION_KIND: &str = "listing.publish.v1";
@@ -258,84 +252,31 @@ impl<'sdk> ListingsClient<'sdk> {
where
S: RadrootsEventSigner + ?Sized,
{
- let target_relays = self.resolved_target_relays(&target_relays)?;
- let signed_event = sign_authorized_draft(actor, signer, &plan.frozen_draft)?;
- let idempotency_key = match idempotency_key {
- Some(idempotency_key) => idempotency_key,
- None => SdkIdempotencyKey::derive(
- LISTING_PUBLISH_OPERATION_KIND,
- plan.frozen_draft.expected_event_id.as_str(),
- plan.frozen_draft.expected_pubkey.as_str(),
- target_relays.canonical_relays(),
- )?,
- };
- let observed_at_ms = sdk_now_ms(self.sdk)?;
- let signed_event_id = parse_event_id(signed_event.id.as_str(), "signed event id")?;
- let event = event_from_signed(&signed_event);
- let ingest = RadrootsEventIngest::new(event, observed_at_ms)
- .with_raw_json(signed_event.raw_json.clone());
- let ingest_receipt = self.sdk._event_store.ingest_event(ingest).await?;
- let canonical_target_relays = target_relays.canonical_relays().to_vec();
- let target_relay_values = target_relays.into_vec();
- let partial_failure_digest_prefix =
- outbox_idempotency_digest_prefix(&plan, canonical_target_relays.as_slice())?;
- let outbox_input = signed_outbox_input(
- &plan,
- signed_event.clone(),
- target_relay_values,
- idempotency_key,
- ingest_receipt.inserted,
- observed_at_ms,
- );
- let outbox_receipt = self
- .sdk
- ._outbox
- .enqueue_signed_operation(outbox_input)
- .await
- .map_err(|error| {
- if matches!(
- error,
- radroots_outbox::RadrootsOutboxError::IdempotencyConflict { .. }
- ) {
- RadrootsSdkError::partial_outbox_idempotency_conflict_mutation(
- signed_event_id.as_str(),
- LISTING_PUBLISH_OPERATION_KIND,
- partial_failure_digest_prefix.as_str(),
- )
- } else {
- RadrootsSdkError::partial_outbox_enqueue_mutation(
- signed_event_id.as_str(),
- LISTING_PUBLISH_OPERATION_KIND,
- partial_failure_digest_prefix.as_str(),
- )
- }
- })?;
- let idempotency_digest_prefix = digest_prefix(outbox_receipt.idempotency_digest.as_str());
+ let enqueue = enqueue_signed_workflow(
+ self.sdk,
+ SdkWorkflowEnqueueRequest {
+ operation_kind: LISTING_PUBLISH_OPERATION_KIND,
+ actor,
+ frozen_draft: &plan.frozen_draft,
+ target_relays,
+ idempotency_key,
+ },
+ signer,
+ )
+ .await?;
Ok(ListingEnqueueReceipt {
public_listing_addr: plan.public_listing_addr,
draft_listing_addr: plan.draft_listing_addr,
expected_event_id: plan.expected_event_id,
- signed_event_id,
- local_event_seq: ingest_receipt.seq,
- outbox_operation_id: outbox_receipt.operation_id,
- outbox_event_id: outbox_receipt.outbox_event_id,
- state: outbox_receipt.status.into(),
- idempotency_digest_prefix: Some(idempotency_digest_prefix),
+ signed_event_id: enqueue.signed_event_id,
+ local_event_seq: enqueue.local_event_seq,
+ outbox_operation_id: enqueue.outbox_operation_id,
+ outbox_event_id: enqueue.outbox_event_id,
+ state: enqueue.state.into(),
+ idempotency_digest_prefix: Some(enqueue.idempotency_digest_prefix),
})
}
- fn resolved_target_relays(
- &self,
- target_relays: &SdkRelayTargetPolicy,
- ) -> Result<SdkRelayTargetSet, RadrootsSdkError> {
- match target_relays {
- SdkRelayTargetPolicy::Explicit(target_relays) => Ok(target_relays.clone()),
- SdkRelayTargetPolicy::UseConfiguredRelays => {
- SdkRelayTargetSet::from_normalized_relays(self.sdk.relay_urls().to_vec())
- }
- }
- }
-
fn resolved_created_at(
&self,
created_at: Option<RadrootsSdkTimestamp>,
@@ -381,78 +322,6 @@ fn listing_publish_plan(
}
#[cfg(feature = "runtime")]
-#[derive(serde::Serialize)]
-struct ListingOutboxDigestInput<'a> {
- operation_kind: &'static str,
- expected_pubkey: &'a str,
- draft: &'a RadrootsFrozenEventDraft,
- target_relays: &'a [String],
-}
-
-#[cfg(feature = "runtime")]
-fn outbox_idempotency_digest_prefix(
- plan: &ListingPublishPlan,
- target_relays: &[String],
-) -> Result<String, RadrootsSdkError> {
- let input = ListingOutboxDigestInput {
- operation_kind: LISTING_PUBLISH_OPERATION_KIND,
- expected_pubkey: plan.frozen_draft.expected_pubkey.as_str(),
- draft: &plan.frozen_draft,
- target_relays,
- };
- let bytes = serde_json::to_vec(&input).map_err(|error| RadrootsSdkError::InvalidRequest {
- message: format!("listing outbox idempotency digest failed: {error}"),
- })?;
- Ok(digest_prefix(hex::encode(Sha256::digest(bytes)).as_str()))
-}
-
-#[cfg(feature = "runtime")]
-fn digest_prefix(digest: &str) -> String {
- digest.chars().take(12).collect()
-}
-
-#[cfg(feature = "runtime")]
-fn parse_event_id(value: &str, field: &str) -> Result<RadrootsEventId, RadrootsSdkError> {
- RadrootsEventId::parse(value).map_err(|error| RadrootsSdkError::InvalidRequest {
- message: format!("{field} is invalid: {error}"),
- })
-}
-
-#[cfg(feature = "runtime")]
-fn signed_outbox_input(
- plan: &ListingPublishPlan,
- signed_event: RadrootsSignedNostrEvent,
- target_relays: Vec<String>,
- idempotency_key: SdkIdempotencyKey,
- event_store_inserted: bool,
- observed_at_ms: i64,
-) -> RadrootsOutboxSignedOperationInput {
- RadrootsOutboxSignedOperationInput::new(
- LISTING_PUBLISH_OPERATION_KIND,
- plan.frozen_draft.clone(),
- signed_event,
- target_relays,
- event_store_inserted,
- observed_at_ms,
- observed_at_ms,
- )
- .with_idempotency_key(idempotency_key.into_string())
-}
-
-#[cfg(feature = "runtime")]
-fn event_from_signed(signed_event: &RadrootsSignedNostrEvent) -> RadrootsNostrEvent {
- RadrootsNostrEvent {
- id: signed_event.id.clone(),
- author: signed_event.pubkey.clone(),
- created_at: signed_event.created_at,
- kind: signed_event.kind,
- tags: signed_event.tags.clone(),
- content: signed_event.content.clone(),
- sig: signed_event.sig.clone(),
- }
-}
-
-#[cfg(feature = "runtime")]
struct SdkActorContextJson<'a>(&'a RadrootsActorContext);
#[cfg(feature = "runtime")]
diff --git a/crates/sdk/src/workflow_runtime.rs b/crates/sdk/src/workflow_runtime.rs
@@ -0,0 +1,184 @@
+use crate::{
+ RadrootsSdk, RadrootsSdkError, SdkIdempotencyKey, SdkRelayTargetPolicy, SdkRelayTargetSet,
+ runtime::sdk_now_ms,
+};
+use radroots_authority::{RadrootsActorContext, RadrootsEventSigner, sign_authorized_draft};
+use radroots_event_store::RadrootsEventIngest;
+use radroots_events::{
+ RadrootsNostrEvent,
+ draft::{RadrootsFrozenEventDraft, RadrootsSignedNostrEvent},
+ ids::RadrootsEventId,
+};
+use radroots_outbox::{RadrootsOutboxEnqueueStatus, RadrootsOutboxSignedOperationInput};
+use sha2::{Digest, Sha256};
+
+pub(crate) struct SdkWorkflowEnqueueRequest<'a> {
+ pub(crate) operation_kind: &'static str,
+ pub(crate) actor: &'a RadrootsActorContext,
+ pub(crate) frozen_draft: &'a RadrootsFrozenEventDraft,
+ pub(crate) target_relays: SdkRelayTargetPolicy,
+ pub(crate) idempotency_key: Option<SdkIdempotencyKey>,
+}
+
+pub(crate) struct SdkWorkflowEnqueueReceipt {
+ pub(crate) signed_event_id: RadrootsEventId,
+ pub(crate) local_event_seq: i64,
+ pub(crate) outbox_operation_id: i64,
+ pub(crate) outbox_event_id: i64,
+ pub(crate) state: RadrootsOutboxEnqueueStatus,
+ pub(crate) idempotency_digest_prefix: String,
+}
+
+pub(crate) async fn enqueue_signed_workflow<S>(
+ sdk: &RadrootsSdk,
+ request: SdkWorkflowEnqueueRequest<'_>,
+ signer: &S,
+) -> Result<SdkWorkflowEnqueueReceipt, RadrootsSdkError>
+where
+ S: RadrootsEventSigner + ?Sized,
+{
+ let target_relays = resolved_target_relays(sdk, &request.target_relays)?;
+ let signed_event = sign_authorized_draft(request.actor, signer, request.frozen_draft)?;
+ let idempotency_key = match request.idempotency_key {
+ Some(idempotency_key) => idempotency_key,
+ None => SdkIdempotencyKey::derive(
+ request.operation_kind,
+ request.frozen_draft.expected_event_id.as_str(),
+ request.frozen_draft.expected_pubkey.as_str(),
+ target_relays.canonical_relays(),
+ )?,
+ };
+ let observed_at_ms = sdk_now_ms(sdk)?;
+ let signed_event_id = parse_event_id(signed_event.id.as_str(), "signed event id")?;
+ let event = event_from_signed(&signed_event);
+ let ingest = RadrootsEventIngest::new(event, observed_at_ms)
+ .with_raw_json(signed_event.raw_json.clone());
+ let ingest_receipt = sdk._event_store.ingest_event(ingest).await?;
+ let canonical_target_relays = target_relays.canonical_relays().to_vec();
+ let target_relay_values = target_relays.into_vec();
+ let partial_failure_digest_prefix = outbox_idempotency_digest_prefix(
+ request.operation_kind,
+ request.frozen_draft,
+ canonical_target_relays.as_slice(),
+ )?;
+ let outbox_input = signed_outbox_input(
+ request.operation_kind,
+ request.frozen_draft,
+ signed_event,
+ target_relay_values,
+ idempotency_key,
+ ingest_receipt.inserted,
+ observed_at_ms,
+ );
+ let outbox_receipt = sdk
+ ._outbox
+ .enqueue_signed_operation(outbox_input)
+ .await
+ .map_err(|error| {
+ if matches!(
+ error,
+ radroots_outbox::RadrootsOutboxError::IdempotencyConflict { .. }
+ ) {
+ RadrootsSdkError::partial_outbox_idempotency_conflict_mutation(
+ signed_event_id.as_str(),
+ request.operation_kind,
+ partial_failure_digest_prefix.as_str(),
+ )
+ } else {
+ RadrootsSdkError::partial_outbox_enqueue_mutation(
+ signed_event_id.as_str(),
+ request.operation_kind,
+ partial_failure_digest_prefix.as_str(),
+ )
+ }
+ })?;
+ let idempotency_digest_prefix = digest_prefix(outbox_receipt.idempotency_digest.as_str());
+ Ok(SdkWorkflowEnqueueReceipt {
+ signed_event_id,
+ local_event_seq: ingest_receipt.seq,
+ outbox_operation_id: outbox_receipt.operation_id,
+ outbox_event_id: outbox_receipt.outbox_event_id,
+ state: outbox_receipt.status,
+ idempotency_digest_prefix,
+ })
+}
+
+fn resolved_target_relays(
+ sdk: &RadrootsSdk,
+ target_relays: &SdkRelayTargetPolicy,
+) -> Result<SdkRelayTargetSet, RadrootsSdkError> {
+ match target_relays {
+ SdkRelayTargetPolicy::Explicit(target_relays) => Ok(target_relays.clone()),
+ SdkRelayTargetPolicy::UseConfiguredRelays => {
+ SdkRelayTargetSet::from_normalized_relays(sdk.relay_urls().to_vec())
+ }
+ }
+}
+
+#[derive(serde::Serialize)]
+struct SdkWorkflowOutboxDigestInput<'a> {
+ operation_kind: &'static str,
+ expected_pubkey: &'a str,
+ draft: &'a RadrootsFrozenEventDraft,
+ target_relays: &'a [String],
+}
+
+fn outbox_idempotency_digest_prefix(
+ operation_kind: &'static str,
+ frozen_draft: &RadrootsFrozenEventDraft,
+ target_relays: &[String],
+) -> Result<String, RadrootsSdkError> {
+ let input = SdkWorkflowOutboxDigestInput {
+ operation_kind,
+ expected_pubkey: frozen_draft.expected_pubkey.as_str(),
+ draft: frozen_draft,
+ target_relays,
+ };
+ let bytes = serde_json::to_vec(&input).map_err(|error| RadrootsSdkError::InvalidRequest {
+ message: format!("workflow outbox idempotency digest failed: {error}"),
+ })?;
+ Ok(digest_prefix(hex::encode(Sha256::digest(bytes)).as_str()))
+}
+
+fn digest_prefix(digest: &str) -> String {
+ digest.chars().take(12).collect()
+}
+
+fn parse_event_id(value: &str, field: &str) -> Result<RadrootsEventId, RadrootsSdkError> {
+ RadrootsEventId::parse(value).map_err(|error| RadrootsSdkError::InvalidRequest {
+ message: format!("{field} is invalid: {error}"),
+ })
+}
+
+fn signed_outbox_input(
+ operation_kind: &'static str,
+ frozen_draft: &RadrootsFrozenEventDraft,
+ signed_event: RadrootsSignedNostrEvent,
+ target_relays: Vec<String>,
+ idempotency_key: SdkIdempotencyKey,
+ event_store_inserted: bool,
+ observed_at_ms: i64,
+) -> RadrootsOutboxSignedOperationInput {
+ RadrootsOutboxSignedOperationInput::new(
+ operation_kind,
+ frozen_draft.clone(),
+ signed_event,
+ target_relays,
+ event_store_inserted,
+ observed_at_ms,
+ observed_at_ms,
+ )
+ .with_idempotency_key(idempotency_key.into_string())
+}
+
+fn event_from_signed(signed_event: &RadrootsSignedNostrEvent) -> RadrootsNostrEvent {
+ RadrootsNostrEvent {
+ id: signed_event.id.clone(),
+ author: signed_event.pubkey.clone(),
+ created_at: signed_event.created_at,
+ kind: signed_event.kind,
+ tags: signed_event.tags.clone(),
+ content: signed_event.content.clone(),
+ sig: signed_event.sig.clone(),
+ }
+}