commit e0774fc9ca04320cec6840b35a5f0044d66a998a
parent 2db321edc71ab9c5963e40d7ba63787dafbd8eb1
Author: triesap <137732411+triesap@users.noreply.github.com>
Date: Sat, 12 Apr 2025 22:56:31 +0000
Adds job request events subscriber tags parser and NIP-90 JobRequest interface.
Diffstat:
4 files changed, 272 insertions(+), 16 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -28,6 +28,17 @@ dependencies = [
]
[[package]]
+name = "aes"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
+dependencies = [
+ "cfg-if",
+ "cipher",
+ "cpufeatures",
+]
+
+[[package]]
name = "aho-corasick"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1037,6 +1048,7 @@ version = "0.40.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f900ddcdc28395759fcd44b18a03255e7deee8858551bfe5d5d5a07311d82ea"
dependencies = [
+ "aes",
"base64 0.22.1",
"bech32",
"bip39",
diff --git a/Cargo.toml b/Cargo.toml
@@ -10,7 +10,7 @@ description = "rhizome is a Nostr data vending machine (NIP-90)"
anyhow = "1.0"
clap = { version = "4", features = ["derive"] }
config = "0.15"
-nostr = "0.40.0"
+nostr = { version = "0.40.0", features = ["nip04"] }
nostr-sdk = "0.40.0"
serde = "1.0"
serde_json = "1.0"
diff --git a/src/events/job_request.rs b/src/events/job_request.rs
@@ -1,19 +1,96 @@
use anyhow::Result;
-use nostr::event::Event;
-use nostr::{event::Kind, filter::Filter, key::Keys, types::Timestamp};
+use nostr::event::{Event, EventId, Tag, TagKind};
+use nostr::filter::{Alphabet, SingleLetterTag};
+use nostr::{event::Kind, key::Keys};
use nostr_sdk::Client;
use nostr_sdk::RelayPoolNotification;
use tracing::{info, warn};
use crate::KIND_JOB_REQUEST;
-use crate::utils::nostr::nostr_event_job_request_feedback;
+use crate::utils::nostr::{
+ nostr_event_job_request_feedback, nostr_filter_kind, nostr_filter_new_events,
+ nostr_tag_at_value, nostr_tag_first_value, nostr_tag_relays_parse, nostr_tag_slice,
+ nostr_tags_resolve,
+};
#[derive(thiserror::Error, Debug)]
pub enum JobRequestError {
- #[error("Failure to process request.")]
+ #[error("Invalid job request input type: {0}")]
+ InvalidInputType(String),
+
+ #[error("Invalid job request input marker: {0}")]
+ InvalidInputMarker(String),
+
+ #[error("Failure to resolve event tags: {0}")]
+ TagResolution(#[from] anyhow::Error),
+
+ #[error("Failure to process request")]
Failure,
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum JobRequestInputType {
+ Url,
+ Event,
+ Job,
+ Text,
+}
+
+impl TryFrom<&str> for JobRequestInputType {
+ type Error = JobRequestError;
+
+ fn try_from(s: &str) -> Result<Self, Self::Error> {
+ match s {
+ "url" => Ok(Self::Url),
+ "event" => Ok(Self::Event),
+ "job" => Ok(Self::Job),
+ "text" => Ok(Self::Text),
+ other => Err(JobRequestError::InvalidInputType(other.to_string())),
+ }
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum JobRequestInputMarker {
+ Order,
+ Quote,
+ Preview,
+}
+
+impl TryFrom<&str> for JobRequestInputMarker {
+ type Error = JobRequestError;
+
+ fn try_from(s: &str) -> Result<Self, Self::Error> {
+ match s {
+ "order" => Ok(Self::Order),
+ "quote" => Ok(Self::Quote),
+ "preview" => Ok(Self::Preview),
+ other => Err(JobRequestError::InvalidInputMarker(other.to_string())),
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct JobRequestInput {
+ pub data: String,
+ pub input_type: JobRequestInputType,
+ pub relay: Option<String>,
+ pub marker: Option<JobRequestInputMarker>,
+}
+
+#[derive(Debug, Clone)]
+pub struct JobRequest {
+ pub id: EventId,
+ pub inputs: Vec<JobRequestInput>,
+ pub output: Option<String>,
+ pub bid_msat: Option<u64>,
+ pub relays: Vec<String>,
+ pub service_providers: Vec<String>,
+ pub params: Vec<(String, String)>,
+ pub hashtags: Vec<String>,
+ pub tags: Vec<Tag>,
+}
+
pub async fn subscriber(keys: Keys, relays: Vec<String>) -> Result<()> {
info!("Starting subscriber for kind {}", KIND_JOB_REQUEST);
let client = Client::new(keys.clone());
@@ -22,9 +99,7 @@ pub async fn subscriber(keys: Keys, relays: Vec<String>) -> Result<()> {
client.add_relay(relay).await?;
}
- let filter = Filter::new()
- .kind(Kind::Custom(KIND_JOB_REQUEST))
- .since(Timestamp::now());
+ let filter = nostr_filter_new_events(nostr_filter_kind(KIND_JOB_REQUEST));
client.connect().await;
client.subscribe(filter, None).await?;
@@ -72,12 +147,94 @@ async fn handle_error(
}
async fn handle_event(event: Event, keys: Keys, client: Client) -> Result<(), JobRequestError> {
- let t = std::time::Instant::now();
- if t.elapsed().as_nanos() % 2 == 0 {
- info!("job_request handle_event {:?}", { event.clone() });
- } else {
- return Err(JobRequestError::Failure);
- }
+ let job_request = parse_event(&event, &keys)?;
+
+ info!("job_request handle_event job_request {:?}", {
+ job_request.clone()
+ });
Ok(())
}
+
+fn parse_event(event: &Event, keys: &Keys) -> Result<JobRequest, JobRequestError> {
+ let tags = nostr_tags_resolve(event, keys).map_err(JobRequestError::TagResolution)?;
+ let mut inputs = vec![];
+ let mut output = None;
+ let mut bid_msat = None;
+ let mut relays = vec![];
+ let mut providers = vec![];
+ let mut params = vec![];
+ let mut hashtags = vec![];
+
+ for tag in &tags {
+ match tag.kind() {
+ TagKind::SingleLetter(l) if l == SingleLetterTag::lowercase(Alphabet::I) => {
+ if let Some(vals) = nostr_tag_slice(tag, 1) {
+ match &vals[..] {
+ [data, input_type, relay, marker, ..] => {
+ let data = data.clone();
+ let input_type = JobRequestInputType::try_from(input_type.as_str())?;
+ let relay = relay.clone();
+ let marker = JobRequestInputMarker::try_from(marker.as_str())?;
+ inputs.push(JobRequestInput {
+ data,
+ input_type,
+ relay: Some(relay),
+ marker: Some(marker),
+ });
+ }
+ _ => continue,
+ }
+ }
+ }
+
+ TagKind::SingleLetter(l) if l == SingleLetterTag::lowercase(Alphabet::T) => {
+ if let Some(val) = nostr_tag_first_value(tag, "t") {
+ hashtags.push(val);
+ }
+ }
+
+ TagKind::Custom(ref k) if k == "output" => {
+ output = nostr_tag_first_value(tag, k);
+ }
+
+ TagKind::Custom(ref k) if k == "bid" => {
+ bid_msat = nostr_tag_first_value(tag, k).and_then(|s| s.parse().ok());
+ }
+
+ TagKind::Custom(k) if k == "param" => {
+ if let Some(vals) = nostr_tag_slice(tag, 1) {
+ if vals.len() >= 2 {
+ params.push((vals[0].clone(), vals[1].clone()));
+ }
+ }
+ }
+
+ TagKind::Relays => {
+ if let Some(urls) = nostr_tag_relays_parse(tag) {
+ relays = urls.into_iter().map(|u| u.to_string()).collect();
+ }
+ }
+
+ TagKind::SingleLetter(l) if l == SingleLetterTag::lowercase(Alphabet::P) => {
+ if let Some(pk) = nostr_tag_at_value(tag, 1) {
+ providers.push(pk);
+ }
+ }
+
+ _ => {}
+ }
+ }
+
+ Ok(JobRequest {
+ id: event.id,
+ inputs,
+ output,
+ bid_msat,
+ relays,
+ service_providers: providers,
+ tags,
+ params,
+ hashtags,
+ })
+}
diff --git a/src/utils/nostr.rs b/src/utils/nostr.rs
@@ -1,10 +1,53 @@
+use anyhow::Result;
use nostr::{
- event::{Event, EventBuilder, Tag},
- nips::nip90::{DataVendingMachineStatus, JobFeedbackData},
+ event::{Event, EventBuilder, Kind, Tag, TagKind, TagStandard},
+ filter::Filter,
+ key::{Keys, PublicKey},
+ nips::{
+ nip04,
+ nip90::{DataVendingMachineStatus, JobFeedbackData},
+ },
+ types::{RelayUrl, Timestamp},
};
+use thiserror::Error;
use crate::events::job_request::JobRequestError;
+pub fn nostr_kind(kind: u16) -> Kind {
+ Kind::Custom(kind)
+}
+
+pub fn nostr_filter_kind(kind: u16) -> Filter {
+ Filter::new().kind(Kind::Custom(kind))
+}
+
+pub fn nostr_filter_new_events(filter: Filter) -> Filter {
+ filter.since(Timestamp::now())
+}
+
+pub fn nostr_tag_first_value(tag: &Tag, key: &str) -> Option<String> {
+ if tag.kind() == TagKind::custom(key) {
+ tag.content().map(|v| v.to_string())
+ } else {
+ None
+ }
+}
+
+pub fn nostr_tag_at_value(tag: &Tag, index: usize) -> Option<String> {
+ tag.as_slice().get(index).cloned()
+}
+
+pub fn nostr_tag_slice(tag: &Tag, start: usize) -> Option<Vec<String>> {
+ tag.as_slice().get(start..).map(|s| s.to_vec())
+}
+
+pub fn nostr_tag_relays_parse(tag: &Tag) -> Option<&Vec<RelayUrl>> {
+ match tag.as_standardized()? {
+ TagStandard::Relays(urls) => Some(urls),
+ _ => None,
+ }
+}
+
pub fn nostr_event_job_request_feedback(
event: &Event,
error: JobRequestError,
@@ -18,3 +61,47 @@ pub fn nostr_event_job_request_feedback(
let builder = EventBuilder::job_feedback(feedback_data).tags(tags.unwrap_or_default());
Ok(builder)
}
+
+#[derive(Debug, Error)]
+pub enum NostrTagsResolveError {
+ #[error("Missing public key tag in encrypted event: {0:?}")]
+ MissingPTag(nostr::Event),
+
+ #[error("Encrypted event recipient mismatch")]
+ NotRecipient,
+
+ #[error("Decryption error: {0}")]
+ DecryptionError(String),
+
+ #[error("Failed to parse decrypted tag JSON: {0}")]
+ ParseError(#[from] serde_json::Error),
+}
+
+pub fn nostr_tags_resolve(event: &Event, keys: &Keys) -> Result<Vec<Tag>> {
+ if event.tags.iter().any(|t| t.kind() == TagKind::Encrypted) {
+ let recipient = event
+ .tags
+ .iter()
+ .find_map(|tag| {
+ if tag.kind() == TagKind::p() {
+ tag.content()?.parse::<PublicKey>().ok()
+ } else {
+ None
+ }
+ })
+ .ok_or_else(|| NostrTagsResolveError::MissingPTag(event.clone()))?;
+
+ if recipient != keys.public_key() {
+ return Err(NostrTagsResolveError::NotRecipient.into());
+ }
+
+ let cleartext = nip04::decrypt(keys.secret_key(), &event.pubkey, &event.content)
+ .map_err(|e| NostrTagsResolveError::DecryptionError(e.to_string()))?;
+
+ let decrypted_tags: nostr::event::tag::list::Tags = serde_json::from_str(&cleartext)?;
+
+ Ok(decrypted_tags.to_vec())
+ } else {
+ Ok(event.clone().tags.to_vec())
+ }
+}