radrootsd

JSON-RPC bridge for Radroots event publishing
git clone https://radroots.dev/git/radrootsd.git
Log | Files | Refs | README | LICENSE

commit ccd52d1f2c358671e5a3b10bda92f0e001ad28b6
parent b783a54d5b2064a638660f78bc34f397a837edee
Author: triesap <triesap@radroots.dev>
Date:   Sun,  4 Jan 2026 15:33:55 +0000

jsonrpc: add JSON-RPC methods for dvm request/result/feedback events

- Register events.dvm_request.* methods and module
- Register events.dvm_result.* methods and module
- Register events.dvm_feedback.* methods and module
- Add row builders, kind validation, and decode/sort tests

Diffstat:
Asrc/api/jsonrpc/methods/events/dvm_feedback/get.rs | 55+++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/api/jsonrpc/methods/events/dvm_feedback/list.rs | 237+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/api/jsonrpc/methods/events/dvm_feedback/mod.rs | 18++++++++++++++++++
Asrc/api/jsonrpc/methods/events/dvm_feedback/publish.rs | 51+++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/api/jsonrpc/methods/events/dvm_request/get.rs | 55+++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/api/jsonrpc/methods/events/dvm_request/list.rs | 252+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/api/jsonrpc/methods/events/dvm_request/mod.rs | 18++++++++++++++++++
Asrc/api/jsonrpc/methods/events/dvm_request/publish.rs | 57+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/api/jsonrpc/methods/events/dvm_result/get.rs | 55+++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/api/jsonrpc/methods/events/dvm_result/list.rs | 272+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/api/jsonrpc/methods/events/dvm_result/mod.rs | 18++++++++++++++++++
Asrc/api/jsonrpc/methods/events/dvm_result/publish.rs | 51+++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/api/jsonrpc/methods/events/mod.rs | 3+++
Msrc/api/jsonrpc/methods/mod.rs | 3+++
14 files changed, 1145 insertions(+), 0 deletions(-)

diff --git a/src/api/jsonrpc/methods/events/dvm_feedback/get.rs b/src/api/jsonrpc/methods/events/dvm_feedback/get.rs @@ -0,0 +1,55 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use serde::{Deserialize, Serialize}; + +use crate::api::jsonrpc::{MethodRegistry, RpcContext, RpcError}; +use radroots_events::kinds::KIND_JOB_FEEDBACK; +use radroots_nostr::prelude::{RadrootsNostrEventId, RadrootsNostrFilter}; + +use super::list::{build_dvm_feedback_rows, DvmFeedbackRow}; +use crate::api::jsonrpc::methods::events::helpers::{fetch_latest_event, require_non_empty}; + +#[derive(Debug, Deserialize)] +struct DvmFeedbackGetParams { + id: String, + #[serde(default)] + timeout_secs: Option<u64>, +} + +#[derive(Clone, Debug, Serialize)] +struct DvmFeedbackGetResponse { + feedback: Option<DvmFeedbackRow>, +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("events.dvm_feedback.get"); + m.register_async_method("events.dvm_feedback.get", |params, ctx, _| async move { + if ctx.state.client.relays().await.is_empty() { + return Err(RpcError::NoRelays); + } + + let DvmFeedbackGetParams { id, timeout_secs } = params + .parse() + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + + let id = require_non_empty("id", id)?; + let event_id = RadrootsNostrEventId::parse(&id) + .map_err(|e| RpcError::InvalidParams(format!("invalid id: {e}")))?; + + let filter = RadrootsNostrFilter::new().id(event_id); + + let event = fetch_latest_event(&ctx.state.client, filter, timeout_secs).await?; + let feedback = event.and_then(|event| { + let kind = event.kind.as_u16() as u32; + if kind != KIND_JOB_FEEDBACK { + return None; + } + build_dvm_feedback_rows(vec![event]).into_iter().next() + }); + + Ok::<DvmFeedbackGetResponse, RpcError>(DvmFeedbackGetResponse { feedback }) + })?; + Ok(()) +} diff --git a/src/api/jsonrpc/methods/events/dvm_feedback/list.rs b/src/api/jsonrpc/methods/events/dvm_feedback/list.rs @@ -0,0 +1,237 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +use crate::api::jsonrpc::nostr::{event_tags, event_view_with_tags}; +use crate::api::jsonrpc::params::{ + apply_time_bounds, + limit_or, + parse_pubkeys_opt, + timeout_or, +}; +use crate::api::jsonrpc::{MethodRegistry, RpcContext, RpcError}; +use radroots_events::job_feedback::RadrootsJobFeedback; +use radroots_events::kinds::KIND_JOB_FEEDBACK; +use radroots_events_codec::job::feedback::decode::job_feedback_from_tags; +use radroots_nostr::prelude::{ + RadrootsNostrEvent, + RadrootsNostrEventId, + RadrootsNostrFilter, + RadrootsNostrKind, +}; + +#[derive(Clone, Debug, Serialize)] +pub(crate) struct DvmFeedbackRow { + id: String, + author: String, + created_at: u64, + kind: u32, + tags: Vec<Vec<String>>, + content: String, + sig: String, + feedback: Option<RadrootsJobFeedback>, +} + +#[derive(Clone, Debug, Serialize)] +struct DvmFeedbackListResponse { + feedbacks: Vec<DvmFeedbackRow>, +} + +#[derive(Debug, Default, Deserialize)] +struct DvmFeedbackListParams { + #[serde(default)] + authors: Option<Vec<String>>, + #[serde(default)] + limit: Option<u64>, + #[serde(default)] + since: Option<u64>, + #[serde(default)] + until: Option<u64>, + #[serde(default)] + timeout_secs: Option<u64>, + #[serde(default)] + request_id: Option<String>, +} + +pub(crate) fn build_dvm_feedback_rows<I>(events: I) -> Vec<DvmFeedbackRow> +where + I: IntoIterator<Item = RadrootsNostrEvent>, +{ + let mut items = events + .into_iter() + .map(|ev| { + let tags = event_tags(&ev); + let feedback = parse_dvm_feedback_event(&ev, &tags); + let event = event_view_with_tags(&ev, tags); + DvmFeedbackRow { + id: event.id, + author: event.author, + created_at: event.created_at, + kind: event.kind, + tags: event.tags, + content: event.content, + sig: event.sig, + feedback, + } + }) + .collect::<Vec<_>>(); + items.sort_by(|a, b| b.created_at.cmp(&a.created_at)); + items +} + +fn parse_dvm_feedback_event( + event: &RadrootsNostrEvent, + tags: &[Vec<String>], +) -> Option<RadrootsJobFeedback> { + let kind = event.kind.as_u16() as u32; + if kind != KIND_JOB_FEEDBACK { + return None; + } + job_feedback_from_tags(kind, tags, &event.content).ok() +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("events.dvm_feedback.list"); + m.register_async_method("events.dvm_feedback.list", |params, ctx, _| async move { + if ctx.state.client.relays().await.is_empty() { + return Err(RpcError::NoRelays); + } + + let DvmFeedbackListParams { + authors, + limit, + since, + until, + timeout_secs, + request_id, + } = params + .parse::<Option<DvmFeedbackListParams>>() + .map_err(|e| RpcError::InvalidParams(e.to_string()))? + .unwrap_or_default(); + + let limit = limit_or(limit); + + let mut filter = RadrootsNostrFilter::new() + .limit(limit) + .kind(RadrootsNostrKind::Custom(KIND_JOB_FEEDBACK as u16)); + + if let Some(authors) = parse_pubkeys_opt("author", authors)? { + filter = filter.authors(authors); + } else { + filter = filter.author(ctx.state.pubkey); + } + filter = apply_time_bounds(filter, since, until); + + if let Some(request_id) = request_id { + let request_id = request_id.trim(); + if request_id.is_empty() { + return Err(RpcError::InvalidParams( + "request_id cannot be empty".to_string(), + )); + } + let request_id = RadrootsNostrEventId::parse(request_id) + .map_err(|e| RpcError::InvalidParams(format!("invalid request_id: {e}")))?; + filter = filter.event(request_id); + } + + let events = ctx + .state + .client + .fetch_events(filter, Duration::from_secs(timeout_or(timeout_secs))) + .await + .map_err(|e| RpcError::Other(format!("fetch failed: {e}")))?; + + let items = build_dvm_feedback_rows(events); + + Ok::<DvmFeedbackListResponse, RpcError>(DvmFeedbackListResponse { feedbacks: items }) + })?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::build_dvm_feedback_rows; + use radroots_events::job::JobFeedbackStatus; + use radroots_events::job_feedback::RadrootsJobFeedback; + use radroots_events::kinds::KIND_JOB_FEEDBACK; + use radroots_events::RadrootsNostrEventPtr; + use radroots_events_codec::job::feedback::encode::job_feedback_build_tags; + use radroots_nostr::prelude::RadrootsNostrEvent; + use serde_json::json; + + fn dvm_feedback_event( + id: &str, + pubkey: &str, + created_at: u64, + tags: Vec<Vec<String>>, + content: &str, + ) -> RadrootsNostrEvent { + let sig = format!("{:0128x}", 11); + let event_json = json!({ + "id": id, + "pubkey": pubkey, + "created_at": created_at, + "kind": KIND_JOB_FEEDBACK, + "tags": tags, + "content": content, + "sig": sig, + }); + serde_json::from_value(event_json).expect("event") + } + + fn sample_feedback() -> RadrootsJobFeedback { + RadrootsJobFeedback { + kind: KIND_JOB_FEEDBACK as u16, + status: JobFeedbackStatus::Success, + extra_info: Some("ok".to_string()), + request_event: RadrootsNostrEventPtr { + id: "req".to_string(), + relays: None, + }, + customer_pubkey: None, + payment: None, + content: Some("payload".to_string()), + encrypted: false, + } + } + + #[test] + fn dvm_feedback_list_sorts_by_created_at_desc() { + let pubkey = "1bdebe7b23fccb167fc8843280b789839dfa296ae9fd86cc9769b4813d76d8a4"; + let old_id = format!("{:064x}", 1); + let new_id = format!("{:064x}", 2); + let feedback = sample_feedback(); + let tags = job_feedback_build_tags(&feedback); + let older = dvm_feedback_event(&old_id, pubkey, 100, tags.clone(), "payload"); + let newer = dvm_feedback_event(&new_id, pubkey, 200, tags.clone(), "payload"); + + let feedbacks = build_dvm_feedback_rows(vec![older, newer]); + + assert_eq!(feedbacks.len(), 2); + assert_eq!(feedbacks[0].id, new_id); + assert_eq!(feedbacks[0].created_at, 200); + assert_eq!(feedbacks[1].id, old_id); + assert_eq!(feedbacks[1].created_at, 100); + } + + #[test] + fn dvm_feedback_list_decodes_feedback() { + let pubkey = "2bdebe7b23fccb167fc8843280b789839dfa296ae9fd86cc9769b4813d76d8a4"; + let feedback = sample_feedback(); + let content = feedback.content.clone().unwrap(); + let tags = job_feedback_build_tags(&feedback); + let id = format!("{:064x}", 3); + let event = dvm_feedback_event(&id, pubkey, 300, tags.clone(), &content); + + let feedbacks = build_dvm_feedback_rows(vec![event]); + + assert_eq!(feedbacks.len(), 1); + assert_eq!(feedbacks[0].tags, tags); + let decoded = feedbacks[0].feedback.as_ref().expect("feedback"); + assert_eq!(decoded, &feedback); + } +} diff --git a/src/api/jsonrpc/methods/events/dvm_feedback/mod.rs b/src/api/jsonrpc/methods/events/dvm_feedback/mod.rs @@ -0,0 +1,18 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; + +use crate::api::jsonrpc::{MethodRegistry, RpcContext}; + +pub mod get; +pub mod list; +pub mod publish; + +pub fn module(ctx: RpcContext, registry: MethodRegistry) -> Result<RpcModule<RpcContext>> { + let mut m = RpcModule::new(ctx); + list::register(&mut m, &registry)?; + publish::register(&mut m, &registry)?; + get::register(&mut m, &registry)?; + Ok(m) +} diff --git a/src/api/jsonrpc/methods/events/dvm_feedback/publish.rs b/src/api/jsonrpc/methods/events/dvm_feedback/publish.rs @@ -0,0 +1,51 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use serde::Deserialize; + +use crate::api::jsonrpc::nostr::{publish_response, PublishResponse}; +use crate::api::jsonrpc::{MethodRegistry, RpcContext, RpcError}; +use radroots_events::job_feedback::RadrootsJobFeedback; +use radroots_events_codec::job::encode::canonicalize_tags; +use radroots_events_codec::job::feedback::encode::to_wire_parts; +use radroots_nostr::prelude::{radroots_nostr_build_event, radroots_nostr_send_event}; + +#[derive(Debug, Deserialize)] +struct PublishDvmFeedbackParams { + feedback: RadrootsJobFeedback, + #[serde(default)] + tags: Option<Vec<Vec<String>>>, +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("events.dvm_feedback.publish"); + m.register_async_method("events.dvm_feedback.publish", |params, ctx, _| async move { + if ctx.state.client.relays().await.is_empty() { + return Err(RpcError::NoRelays); + } + + let PublishDvmFeedbackParams { feedback, tags } = params + .parse() + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + + let content = feedback.content.clone().unwrap_or_default(); + let mut parts = to_wire_parts(&feedback, &content) + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + if let Some(extra_tags) = tags { + parts.tags.extend(extra_tags); + canonicalize_tags(&mut parts.tags); + } + + let builder = radroots_nostr_build_event(parts.kind, parts.content, parts.tags) + .map_err(|e| RpcError::Other(format!("failed to build dvm feedback event: {e}")))?; + + let output = radroots_nostr_send_event(&ctx.state.client, builder) + .await + .map_err(|e| RpcError::Other(format!("failed to publish dvm feedback: {e}")))?; + + Ok::<PublishResponse, RpcError>(publish_response(output)) + })?; + + Ok(()) +} diff --git a/src/api/jsonrpc/methods/events/dvm_request/get.rs b/src/api/jsonrpc/methods/events/dvm_request/get.rs @@ -0,0 +1,55 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use serde::{Deserialize, Serialize}; + +use crate::api::jsonrpc::{MethodRegistry, RpcContext, RpcError}; +use radroots_events::kinds::is_request_kind; +use radroots_nostr::prelude::{RadrootsNostrEventId, RadrootsNostrFilter}; + +use super::list::{build_dvm_request_rows, DvmRequestRow}; +use crate::api::jsonrpc::methods::events::helpers::{fetch_latest_event, require_non_empty}; + +#[derive(Debug, Deserialize)] +struct DvmRequestGetParams { + id: String, + #[serde(default)] + timeout_secs: Option<u64>, +} + +#[derive(Clone, Debug, Serialize)] +struct DvmRequestGetResponse { + request: Option<DvmRequestRow>, +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("events.dvm_request.get"); + m.register_async_method("events.dvm_request.get", |params, ctx, _| async move { + if ctx.state.client.relays().await.is_empty() { + return Err(RpcError::NoRelays); + } + + let DvmRequestGetParams { id, timeout_secs } = params + .parse() + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + + let id = require_non_empty("id", id)?; + let event_id = RadrootsNostrEventId::parse(&id) + .map_err(|e| RpcError::InvalidParams(format!("invalid id: {e}")))?; + + let filter = RadrootsNostrFilter::new().id(event_id); + + let event = fetch_latest_event(&ctx.state.client, filter, timeout_secs).await?; + let request = event.and_then(|event| { + let kind = event.kind.as_u16() as u32; + if !is_request_kind(kind) { + return None; + } + build_dvm_request_rows(vec![event]).into_iter().next() + }); + + Ok::<DvmRequestGetResponse, RpcError>(DvmRequestGetResponse { request }) + })?; + Ok(()) +} diff --git a/src/api/jsonrpc/methods/events/dvm_request/list.rs b/src/api/jsonrpc/methods/events/dvm_request/list.rs @@ -0,0 +1,252 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +use crate::api::jsonrpc::nostr::{event_tags, event_view_with_tags}; +use crate::api::jsonrpc::params::{ + apply_time_bounds, + limit_or, + parse_pubkeys_opt, + timeout_or, +}; +use crate::api::jsonrpc::{MethodRegistry, RpcContext, RpcError}; +use radroots_events::job_request::RadrootsJobRequest; +use radroots_events::kinds::{is_request_kind, KIND_JOB_REQUEST_MAX, KIND_JOB_REQUEST_MIN}; +use radroots_events_codec::job::request::decode::job_request_from_tags; +use radroots_nostr::prelude::{ + RadrootsNostrEvent, + RadrootsNostrFilter, + RadrootsNostrKind, +}; + +#[derive(Clone, Debug, Serialize)] +pub(crate) struct DvmRequestRow { + id: String, + author: String, + created_at: u64, + kind: u32, + tags: Vec<Vec<String>>, + content: String, + sig: String, + request: Option<RadrootsJobRequest>, +} + +#[derive(Clone, Debug, Serialize)] +struct DvmRequestListResponse { + requests: Vec<DvmRequestRow>, +} + +#[derive(Debug, Default, Deserialize)] +struct DvmRequestListParams { + #[serde(default)] + authors: Option<Vec<String>>, + #[serde(default)] + limit: Option<u64>, + #[serde(default)] + since: Option<u64>, + #[serde(default)] + until: Option<u64>, + #[serde(default)] + timeout_secs: Option<u64>, + #[serde(default)] + kinds: Option<Vec<u32>>, +} + +fn dvm_request_kinds_or(kinds: Option<Vec<u32>>) -> Result<Vec<RadrootsNostrKind>, RpcError> { + let kinds = match kinds { + Some(kinds) => { + if kinds.is_empty() { + return Err(RpcError::InvalidParams( + "dvm request kinds cannot be empty".to_string(), + )); + } + kinds + } + None => (KIND_JOB_REQUEST_MIN..=KIND_JOB_REQUEST_MAX).collect(), + }; + + let mut out = Vec::with_capacity(kinds.len()); + for kind in kinds { + if !is_request_kind(kind) { + return Err(RpcError::InvalidParams(format!( + "invalid dvm request kind: {kind}", + ))); + } + let kind = u16::try_from(kind) + .map_err(|_| RpcError::InvalidParams(format!("dvm request kind out of range: {kind}")))?; + out.push(RadrootsNostrKind::Custom(kind)); + } + Ok(out) +} + +pub(crate) fn build_dvm_request_rows<I>(events: I) -> Vec<DvmRequestRow> +where + I: IntoIterator<Item = RadrootsNostrEvent>, +{ + let mut items = events + .into_iter() + .map(|ev| { + let tags = event_tags(&ev); + let request = parse_dvm_request_event(&ev, &tags); + let event = event_view_with_tags(&ev, tags); + DvmRequestRow { + id: event.id, + author: event.author, + created_at: event.created_at, + kind: event.kind, + tags: event.tags, + content: event.content, + sig: event.sig, + request, + } + }) + .collect::<Vec<_>>(); + items.sort_by(|a, b| b.created_at.cmp(&a.created_at)); + items +} + +fn parse_dvm_request_event( + event: &RadrootsNostrEvent, + tags: &[Vec<String>], +) -> Option<RadrootsJobRequest> { + let kind = event.kind.as_u16() as u32; + if !is_request_kind(kind) { + return None; + } + job_request_from_tags(kind, tags).ok() +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("events.dvm_request.list"); + m.register_async_method("events.dvm_request.list", |params, ctx, _| async move { + if ctx.state.client.relays().await.is_empty() { + return Err(RpcError::NoRelays); + } + + let DvmRequestListParams { + authors, + limit, + since, + until, + timeout_secs, + kinds, + } = params + .parse::<Option<DvmRequestListParams>>() + .map_err(|e| RpcError::InvalidParams(e.to_string()))? + .unwrap_or_default(); + + let limit = limit_or(limit); + let kinds = dvm_request_kinds_or(kinds)?; + + let mut filter = RadrootsNostrFilter::new().limit(limit).kinds(kinds); + + if let Some(authors) = parse_pubkeys_opt("author", authors)? { + filter = filter.authors(authors); + } else { + filter = filter.author(ctx.state.pubkey); + } + filter = apply_time_bounds(filter, since, until); + + let events = ctx + .state + .client + .fetch_events(filter, Duration::from_secs(timeout_or(timeout_secs))) + .await + .map_err(|e| RpcError::Other(format!("fetch failed: {e}")))?; + + let items = build_dvm_request_rows(events); + + Ok::<DvmRequestListResponse, RpcError>(DvmRequestListResponse { requests: items }) + })?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::build_dvm_request_rows; + use radroots_events::job::JobInputType; + use radroots_events::job_request::{RadrootsJobInput, RadrootsJobRequest}; + use radroots_events::kinds::KIND_JOB_REQUEST_MIN; + use radroots_events_codec::job::request::encode::job_request_build_tags; + use radroots_nostr::prelude::RadrootsNostrEvent; + use serde_json::json; + + fn dvm_request_event( + id: &str, + pubkey: &str, + created_at: u64, + kind: u32, + tags: Vec<Vec<String>>, + content: &str, + ) -> RadrootsNostrEvent { + let sig = format!("{:0128x}", 9); + let event_json = json!({ + "id": id, + "pubkey": pubkey, + "created_at": created_at, + "kind": kind, + "tags": tags, + "content": content, + "sig": sig, + }); + serde_json::from_value(event_json).expect("event") + } + + fn sample_request() -> RadrootsJobRequest { + RadrootsJobRequest { + kind: (KIND_JOB_REQUEST_MIN + 1) as u16, + inputs: vec![RadrootsJobInput { + data: "https://example.com".to_string(), + input_type: JobInputType::Url, + relay: None, + marker: None, + }], + output: None, + params: Vec::new(), + bid_sat: None, + relays: Vec::new(), + providers: Vec::new(), + topics: Vec::new(), + encrypted: false, + } + } + + #[test] + fn dvm_request_list_sorts_by_created_at_desc() { + let pubkey = "1bdebe7b23fccb167fc8843280b789839dfa296ae9fd86cc9769b4813d76d8a4"; + let old_id = format!("{:064x}", 1); + let new_id = format!("{:064x}", 2); + let req = sample_request(); + let tags = job_request_build_tags(&req); + let older = dvm_request_event(&old_id, pubkey, 100, KIND_JOB_REQUEST_MIN + 1, tags.clone(), ""); + let newer = dvm_request_event(&new_id, pubkey, 200, KIND_JOB_REQUEST_MIN + 1, tags.clone(), ""); + + let requests = build_dvm_request_rows(vec![older, newer]); + + assert_eq!(requests.len(), 2); + assert_eq!(requests[0].id, new_id); + assert_eq!(requests[0].created_at, 200); + assert_eq!(requests[1].id, old_id); + assert_eq!(requests[1].created_at, 100); + } + + #[test] + fn dvm_request_list_decodes_request() { + let pubkey = "2bdebe7b23fccb167fc8843280b789839dfa296ae9fd86cc9769b4813d76d8a4"; + let req = sample_request(); + let tags = job_request_build_tags(&req); + let id = format!("{:064x}", 3); + let event = dvm_request_event(&id, pubkey, 300, KIND_JOB_REQUEST_MIN + 1, tags.clone(), "payload"); + + let requests = build_dvm_request_rows(vec![event]); + + assert_eq!(requests.len(), 1); + assert_eq!(requests[0].tags, tags); + let decoded = requests[0].request.as_ref().expect("request"); + assert_eq!(decoded, &req); + } +} diff --git a/src/api/jsonrpc/methods/events/dvm_request/mod.rs b/src/api/jsonrpc/methods/events/dvm_request/mod.rs @@ -0,0 +1,18 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; + +use crate::api::jsonrpc::{MethodRegistry, RpcContext}; + +pub mod get; +pub mod list; +pub mod publish; + +pub fn module(ctx: RpcContext, registry: MethodRegistry) -> Result<RpcModule<RpcContext>> { + let mut m = RpcModule::new(ctx); + list::register(&mut m, &registry)?; + publish::register(&mut m, &registry)?; + get::register(&mut m, &registry)?; + Ok(m) +} diff --git a/src/api/jsonrpc/methods/events/dvm_request/publish.rs b/src/api/jsonrpc/methods/events/dvm_request/publish.rs @@ -0,0 +1,57 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use serde::Deserialize; + +use crate::api::jsonrpc::nostr::{publish_response, PublishResponse}; +use crate::api::jsonrpc::{MethodRegistry, RpcContext, RpcError}; +use radroots_events::job_request::RadrootsJobRequest; +use radroots_events_codec::job::encode::canonicalize_tags; +use radroots_events_codec::job::request::encode::to_wire_parts; +use radroots_nostr::prelude::{radroots_nostr_build_event, radroots_nostr_send_event}; + +#[derive(Debug, Deserialize)] +struct PublishDvmRequestParams { + request: RadrootsJobRequest, + #[serde(default)] + content: Option<String>, + #[serde(default)] + tags: Option<Vec<Vec<String>>>, +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("events.dvm_request.publish"); + m.register_async_method("events.dvm_request.publish", |params, ctx, _| async move { + if ctx.state.client.relays().await.is_empty() { + return Err(RpcError::NoRelays); + } + + let PublishDvmRequestParams { + request, + content, + tags, + } = params + .parse() + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + + let content = content.unwrap_or_default(); + let mut parts = to_wire_parts(&request, &content) + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + if let Some(extra_tags) = tags { + parts.tags.extend(extra_tags); + canonicalize_tags(&mut parts.tags); + } + + let builder = radroots_nostr_build_event(parts.kind, parts.content, parts.tags) + .map_err(|e| RpcError::Other(format!("failed to build dvm request event: {e}")))?; + + let output = radroots_nostr_send_event(&ctx.state.client, builder) + .await + .map_err(|e| RpcError::Other(format!("failed to publish dvm request: {e}")))?; + + Ok::<PublishResponse, RpcError>(publish_response(output)) + })?; + + Ok(()) +} diff --git a/src/api/jsonrpc/methods/events/dvm_result/get.rs b/src/api/jsonrpc/methods/events/dvm_result/get.rs @@ -0,0 +1,55 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use serde::{Deserialize, Serialize}; + +use crate::api::jsonrpc::{MethodRegistry, RpcContext, RpcError}; +use radroots_events::kinds::is_result_kind; +use radroots_nostr::prelude::{RadrootsNostrEventId, RadrootsNostrFilter}; + +use super::list::{build_dvm_result_rows, DvmResultRow}; +use crate::api::jsonrpc::methods::events::helpers::{fetch_latest_event, require_non_empty}; + +#[derive(Debug, Deserialize)] +struct DvmResultGetParams { + id: String, + #[serde(default)] + timeout_secs: Option<u64>, +} + +#[derive(Clone, Debug, Serialize)] +struct DvmResultGetResponse { + result: Option<DvmResultRow>, +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("events.dvm_result.get"); + m.register_async_method("events.dvm_result.get", |params, ctx, _| async move { + if ctx.state.client.relays().await.is_empty() { + return Err(RpcError::NoRelays); + } + + let DvmResultGetParams { id, timeout_secs } = params + .parse() + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + + let id = require_non_empty("id", id)?; + let event_id = RadrootsNostrEventId::parse(&id) + .map_err(|e| RpcError::InvalidParams(format!("invalid id: {e}")))?; + + let filter = RadrootsNostrFilter::new().id(event_id); + + let event = fetch_latest_event(&ctx.state.client, filter, timeout_secs).await?; + let result = event.and_then(|event| { + let kind = event.kind.as_u16() as u32; + if !is_result_kind(kind) { + return None; + } + build_dvm_result_rows(vec![event]).into_iter().next() + }); + + Ok::<DvmResultGetResponse, RpcError>(DvmResultGetResponse { result }) + })?; + Ok(()) +} diff --git a/src/api/jsonrpc/methods/events/dvm_result/list.rs b/src/api/jsonrpc/methods/events/dvm_result/list.rs @@ -0,0 +1,272 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +use crate::api::jsonrpc::nostr::{event_tags, event_view_with_tags}; +use crate::api::jsonrpc::params::{ + apply_time_bounds, + limit_or, + parse_pubkeys_opt, + timeout_or, +}; +use crate::api::jsonrpc::{MethodRegistry, RpcContext, RpcError}; +use radroots_events::job_result::RadrootsJobResult; +use radroots_events::kinds::{is_result_kind, KIND_JOB_RESULT_MAX, KIND_JOB_RESULT_MIN}; +use radroots_events_codec::job::result::decode::job_result_from_tags; +use radroots_nostr::prelude::{ + RadrootsNostrEvent, + RadrootsNostrEventId, + RadrootsNostrFilter, + RadrootsNostrKind, +}; + +#[derive(Clone, Debug, Serialize)] +pub(crate) struct DvmResultRow { + id: String, + author: String, + created_at: u64, + kind: u32, + tags: Vec<Vec<String>>, + content: String, + sig: String, + result: Option<RadrootsJobResult>, +} + +#[derive(Clone, Debug, Serialize)] +struct DvmResultListResponse { + results: Vec<DvmResultRow>, +} + +#[derive(Debug, Default, Deserialize)] +struct DvmResultListParams { + #[serde(default)] + authors: Option<Vec<String>>, + #[serde(default)] + limit: Option<u64>, + #[serde(default)] + since: Option<u64>, + #[serde(default)] + until: Option<u64>, + #[serde(default)] + timeout_secs: Option<u64>, + #[serde(default)] + kinds: Option<Vec<u32>>, + #[serde(default)] + request_id: Option<String>, +} + +fn dvm_result_kinds_or(kinds: Option<Vec<u32>>) -> Result<Vec<RadrootsNostrKind>, RpcError> { + let kinds = match kinds { + Some(kinds) => { + if kinds.is_empty() { + return Err(RpcError::InvalidParams( + "dvm result kinds cannot be empty".to_string(), + )); + } + kinds + } + None => (KIND_JOB_RESULT_MIN..=KIND_JOB_RESULT_MAX).collect(), + }; + + let mut out = Vec::with_capacity(kinds.len()); + for kind in kinds { + if !is_result_kind(kind) { + return Err(RpcError::InvalidParams(format!( + "invalid dvm result kind: {kind}", + ))); + } + let kind = u16::try_from(kind) + .map_err(|_| RpcError::InvalidParams(format!("dvm result kind out of range: {kind}")))?; + out.push(RadrootsNostrKind::Custom(kind)); + } + Ok(out) +} + +pub(crate) fn build_dvm_result_rows<I>(events: I) -> Vec<DvmResultRow> +where + I: IntoIterator<Item = RadrootsNostrEvent>, +{ + let mut items = events + .into_iter() + .map(|ev| { + let tags = event_tags(&ev); + let result = parse_dvm_result_event(&ev, &tags); + let event = event_view_with_tags(&ev, tags); + DvmResultRow { + id: event.id, + author: event.author, + created_at: event.created_at, + kind: event.kind, + tags: event.tags, + content: event.content, + sig: event.sig, + result, + } + }) + .collect::<Vec<_>>(); + items.sort_by(|a, b| b.created_at.cmp(&a.created_at)); + items +} + +fn parse_dvm_result_event( + event: &RadrootsNostrEvent, + tags: &[Vec<String>], +) -> Option<RadrootsJobResult> { + let kind = event.kind.as_u16() as u32; + if !is_result_kind(kind) { + return None; + } + job_result_from_tags(kind, tags, &event.content).ok() +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("events.dvm_result.list"); + m.register_async_method("events.dvm_result.list", |params, ctx, _| async move { + if ctx.state.client.relays().await.is_empty() { + return Err(RpcError::NoRelays); + } + + let DvmResultListParams { + authors, + limit, + since, + until, + timeout_secs, + kinds, + request_id, + } = params + .parse::<Option<DvmResultListParams>>() + .map_err(|e| RpcError::InvalidParams(e.to_string()))? + .unwrap_or_default(); + + let limit = limit_or(limit); + let kinds = dvm_result_kinds_or(kinds)?; + + let mut filter = RadrootsNostrFilter::new().limit(limit).kinds(kinds); + + if let Some(authors) = parse_pubkeys_opt("author", authors)? { + filter = filter.authors(authors); + } else { + filter = filter.author(ctx.state.pubkey); + } + filter = apply_time_bounds(filter, since, until); + + if let Some(request_id) = request_id { + let request_id = request_id.trim(); + if request_id.is_empty() { + return Err(RpcError::InvalidParams( + "request_id cannot be empty".to_string(), + )); + } + let request_id = RadrootsNostrEventId::parse(request_id) + .map_err(|e| RpcError::InvalidParams(format!("invalid request_id: {e}")))?; + filter = filter.event(request_id); + } + + let events = ctx + .state + .client + .fetch_events(filter, Duration::from_secs(timeout_or(timeout_secs))) + .await + .map_err(|e| RpcError::Other(format!("fetch failed: {e}")))?; + + let items = build_dvm_result_rows(events); + + Ok::<DvmResultListResponse, RpcError>(DvmResultListResponse { results: items }) + })?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::build_dvm_result_rows; + use radroots_events::job_request::RadrootsJobInput; + use radroots_events::job_result::RadrootsJobResult; + use radroots_events::kinds::KIND_JOB_RESULT_MIN; + use radroots_events::RadrootsNostrEventPtr; + use radroots_events_codec::job::result::encode::job_result_build_tags; + use radroots_nostr::prelude::RadrootsNostrEvent; + use serde_json::json; + + fn dvm_result_event( + id: &str, + pubkey: &str, + created_at: u64, + kind: u32, + tags: Vec<Vec<String>>, + content: &str, + ) -> RadrootsNostrEvent { + let sig = format!("{:0128x}", 10); + let event_json = json!({ + "id": id, + "pubkey": pubkey, + "created_at": created_at, + "kind": kind, + "tags": tags, + "content": content, + "sig": sig, + }); + serde_json::from_value(event_json).expect("event") + } + + fn sample_result() -> RadrootsJobResult { + RadrootsJobResult { + kind: (KIND_JOB_RESULT_MIN + 1) as u16, + request_event: RadrootsNostrEventPtr { + id: "req".to_string(), + relays: None, + }, + request_json: None, + inputs: vec![RadrootsJobInput { + data: "https://example.com".to_string(), + input_type: radroots_events::job::JobInputType::Url, + relay: None, + marker: None, + }], + customer_pubkey: None, + payment: None, + content: Some("payload".to_string()), + encrypted: false, + } + } + + #[test] + fn dvm_result_list_sorts_by_created_at_desc() { + let pubkey = "1bdebe7b23fccb167fc8843280b789839dfa296ae9fd86cc9769b4813d76d8a4"; + let old_id = format!("{:064x}", 1); + let new_id = format!("{:064x}", 2); + let result = sample_result(); + let tags = job_result_build_tags(&result); + let older = dvm_result_event(&old_id, pubkey, 100, KIND_JOB_RESULT_MIN + 1, tags.clone(), "payload"); + let newer = dvm_result_event(&new_id, pubkey, 200, KIND_JOB_RESULT_MIN + 1, tags.clone(), "payload"); + + let results = build_dvm_result_rows(vec![older, newer]); + + assert_eq!(results.len(), 2); + assert_eq!(results[0].id, new_id); + assert_eq!(results[0].created_at, 200); + assert_eq!(results[1].id, old_id); + assert_eq!(results[1].created_at, 100); + } + + #[test] + fn dvm_result_list_decodes_result() { + let pubkey = "2bdebe7b23fccb167fc8843280b789839dfa296ae9fd86cc9769b4813d76d8a4"; + let result = sample_result(); + let content = result.content.clone().unwrap(); + let tags = job_result_build_tags(&result); + let id = format!("{:064x}", 3); + let event = dvm_result_event(&id, pubkey, 300, KIND_JOB_RESULT_MIN + 1, tags.clone(), &content); + + let results = build_dvm_result_rows(vec![event]); + + assert_eq!(results.len(), 1); + assert_eq!(results[0].tags, tags); + let decoded = results[0].result.as_ref().expect("result"); + assert_eq!(decoded, &result); + } +} diff --git a/src/api/jsonrpc/methods/events/dvm_result/mod.rs b/src/api/jsonrpc/methods/events/dvm_result/mod.rs @@ -0,0 +1,18 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; + +use crate::api::jsonrpc::{MethodRegistry, RpcContext}; + +pub mod get; +pub mod list; +pub mod publish; + +pub fn module(ctx: RpcContext, registry: MethodRegistry) -> Result<RpcModule<RpcContext>> { + let mut m = RpcModule::new(ctx); + list::register(&mut m, &registry)?; + publish::register(&mut m, &registry)?; + get::register(&mut m, &registry)?; + Ok(m) +} diff --git a/src/api/jsonrpc/methods/events/dvm_result/publish.rs b/src/api/jsonrpc/methods/events/dvm_result/publish.rs @@ -0,0 +1,51 @@ +#![forbid(unsafe_code)] + +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use serde::Deserialize; + +use crate::api::jsonrpc::nostr::{publish_response, PublishResponse}; +use crate::api::jsonrpc::{MethodRegistry, RpcContext, RpcError}; +use radroots_events::job_result::RadrootsJobResult; +use radroots_events_codec::job::encode::canonicalize_tags; +use radroots_events_codec::job::result::encode::to_wire_parts; +use radroots_nostr::prelude::{radroots_nostr_build_event, radroots_nostr_send_event}; + +#[derive(Debug, Deserialize)] +struct PublishDvmResultParams { + result: RadrootsJobResult, + #[serde(default)] + tags: Option<Vec<Vec<String>>>, +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("events.dvm_result.publish"); + m.register_async_method("events.dvm_result.publish", |params, ctx, _| async move { + if ctx.state.client.relays().await.is_empty() { + return Err(RpcError::NoRelays); + } + + let PublishDvmResultParams { result, tags } = params + .parse() + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + + let content = result.content.clone().unwrap_or_default(); + let mut parts = to_wire_parts(&result, &content) + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + if let Some(extra_tags) = tags { + parts.tags.extend(extra_tags); + canonicalize_tags(&mut parts.tags); + } + + let builder = radroots_nostr_build_event(parts.kind, parts.content, parts.tags) + .map_err(|e| RpcError::Other(format!("failed to build dvm result event: {e}")))?; + + let output = radroots_nostr_send_event(&ctx.state.client, builder) + .await + .map_err(|e| RpcError::Other(format!("failed to publish dvm result: {e}")))?; + + Ok::<PublishResponse, RpcError>(publish_response(output)) + })?; + + Ok(()) +} diff --git a/src/api/jsonrpc/methods/events/mod.rs b/src/api/jsonrpc/methods/events/mod.rs @@ -1,4 +1,7 @@ pub mod farm; +pub mod dvm_feedback; +pub mod dvm_request; +pub mod dvm_result; pub mod plot; pub mod resource_area; pub mod resource_cap; diff --git a/src/api/jsonrpc/methods/mod.rs b/src/api/jsonrpc/methods/mod.rs @@ -21,6 +21,9 @@ pub fn register_all( root.merge(events::post::module(ctx.clone(), registry.clone())?)?; root.merge(events::listing::module(ctx.clone(), registry.clone())?)?; root.merge(events::list_set::module(ctx.clone(), registry.clone())?)?; + root.merge(events::dvm_request::module(ctx.clone(), registry.clone())?)?; + root.merge(events::dvm_result::module(ctx.clone(), registry.clone())?)?; + root.merge(events::dvm_feedback::module(ctx.clone(), registry.clone())?)?; root.merge(events::farm::module(ctx.clone(), registry.clone())?)?; root.merge(events::plot::module(ctx.clone(), registry.clone())?)?; root.merge(events::resource_area::module(ctx.clone(), registry.clone())?)?;