commit 480c68d70e04e7814218a656030f720afd5f191d
parent 02a4e343bbde3e7a8bd5c91e6bf87303173a2c25
Author: triesap <tyson@radroots.org>
Date: Fri, 5 Jun 2026 23:00:06 -0700
store-surreal: project current listings
Diffstat:
1 file changed, 347 insertions(+), 5 deletions(-)
diff --git a/crates/tangle_store_surreal/src/lib.rs b/crates/tangle_store_surreal/src/lib.rs
@@ -5,7 +5,7 @@ use sha2::{Digest, Sha256};
use surrealdb::Surreal;
use surrealdb::engine::local::{Db, Mem};
use tangle_nips::{
- DeletionTarget, ListingProjectionEvaluation, NIP99_DRAFT_LISTING_KIND,
+ DeletionTarget, ListingProjection, ListingProjectionEvaluation, NIP99_DRAFT_LISTING_KIND,
NIP99_PUBLIC_LISTING_KIND, evaluate_listing_projection, parse_deletion_request,
};
use tangle_protocol::{AddressCoordinate, Event, EventId, UnixTimestamp, event_to_value};
@@ -673,6 +673,13 @@ pub enum ListingRevisionOutcome {
Stored { parsed_ok: bool },
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ListingCurrentOutcome {
+ NotListing,
+ Ineligible,
+ Projected,
+}
+
#[derive(Clone)]
pub struct SurrealStore {
db: Surreal<Db>,
@@ -1114,6 +1121,129 @@ UPSERT type::record('listing_revision', $event_id) CONTENT {
response.take(0).map_err(SurrealStoreError::from)
}
+ pub async fn project_current_listing(
+ &self,
+ event: &Event,
+ projected_at: UnixTimestamp,
+ ) -> Result<ListingCurrentOutcome, SurrealStoreError> {
+ let evaluation = evaluate_listing_projection(event);
+ let ListingProjectionEvaluation::Eligible(projection) = evaluation else {
+ return Ok(
+ if matches!(evaluation, ListingProjectionEvaluation::NotListing) {
+ ListingCurrentOutcome::NotListing
+ } else {
+ ListingCurrentOutcome::Ineligible
+ },
+ );
+ };
+ let fields = listing_current_fields(&projection, event, projected_at)?;
+ self.db
+ .query(
+ r#"
+UPSERT type::record('listing_current', $listing_key) CONTENT {
+ listing_key: $listing_key,
+ listing_key_hash: $listing_key_hash,
+ event_id: $event_id,
+ seller_pubkey: $seller_pubkey,
+ d: $d,
+ created_at: $created_at,
+ updated_at: $updated_at,
+ published_at: $published_at,
+ title: $title,
+ summary: $summary,
+ content: $content,
+ price_decimal: $price_decimal,
+ price_minor: $price_minor,
+ currency_raw: $currency_raw,
+ currency_norm: $currency_norm,
+ price_frequency: $price_frequency,
+ unit: $unit,
+ unit_family: $unit_family,
+ location_text: $location_text,
+ geohash: $geohash,
+ geohash4: $geohash4,
+ geohash5: $geohash5,
+ geohash6: $geohash6,
+ geohash7: $geohash7,
+ point: $point,
+ status_tag: $status_tag,
+ effective_status: $effective_status,
+ categories: $categories,
+ tags: $tags,
+ practices: $practices,
+ certifications: $certifications,
+ image_urls: $image_urls,
+ pickup_available: $pickup_available,
+ delivery_available: $delivery_available,
+ shipping_available: $shipping_available,
+ delivery_only: $delivery_only,
+ seller_trust_score: $seller_trust_score,
+ hidden: false,
+ deleted: false,
+ projected_at: $projected_at
+};
+"#,
+ )
+ .bind(("listing_key", fields.listing_key))
+ .bind(("listing_key_hash", fields.listing_key_hash))
+ .bind(("event_id", event.id().as_str()))
+ .bind(("seller_pubkey", fields.seller_pubkey))
+ .bind(("d", fields.d))
+ .bind(("created_at", event.unsigned().created_at().as_u64()))
+ .bind(("updated_at", event.unsigned().created_at().as_u64()))
+ .bind(("published_at", fields.published_at))
+ .bind(("title", fields.title))
+ .bind(("summary", fields.summary))
+ .bind(("content", fields.content))
+ .bind(("price_decimal", fields.price_decimal))
+ .bind(("price_minor", fields.price_minor))
+ .bind(("currency_raw", fields.currency_raw))
+ .bind(("currency_norm", fields.currency_norm))
+ .bind(("price_frequency", fields.price_frequency))
+ .bind(("unit", fields.unit.clone()))
+ .bind(("unit_family", fields.unit))
+ .bind(("location_text", fields.location_text))
+ .bind(("geohash", fields.geohash))
+ .bind(("geohash4", fields.geohash4))
+ .bind(("geohash5", fields.geohash5))
+ .bind(("geohash6", fields.geohash6))
+ .bind(("geohash7", fields.geohash7))
+ .bind(("point", Option::<Vec<serde_json::Value>>::None))
+ .bind(("status_tag", fields.status_tag))
+ .bind(("effective_status", fields.effective_status))
+ .bind(("categories", fields.categories))
+ .bind(("tags", fields.tags))
+ .bind(("practices", fields.practices))
+ .bind(("certifications", fields.certifications))
+ .bind(("image_urls", fields.image_urls))
+ .bind(("pickup_available", fields.pickup_available))
+ .bind(("delivery_available", fields.delivery_available))
+ .bind(("shipping_available", fields.shipping_available))
+ .bind(("delivery_only", fields.delivery_only))
+ .bind(("seller_trust_score", Option::<i64>::None))
+ .bind(("projected_at", projected_at.as_u64()))
+ .await
+ .map_err(SurrealStoreError::from)?
+ .check()
+ .map_err(SurrealStoreError::from)?;
+ Ok(ListingCurrentOutcome::Projected)
+ }
+
+ pub async fn listing_current_row(
+ &self,
+ listing_key: &str,
+ ) -> Result<Option<serde_json::Value>, SurrealStoreError> {
+ let mut response = self
+ .db
+ .query("SELECT * FROM ONLY type::record('listing_current', $listing_key);")
+ .bind(("listing_key", listing_key))
+ .await
+ .map_err(SurrealStoreError::from)?
+ .check()
+ .map_err(SurrealStoreError::from)?;
+ response.take(0).map_err(SurrealStoreError::from)
+ }
+
async fn applied_migration(
&self,
name: &str,
@@ -1292,6 +1422,89 @@ struct ListingRevisionFields {
status_tag: Option<String>,
}
+struct ListingCurrentFields {
+ listing_key: String,
+ listing_key_hash: String,
+ seller_pubkey: String,
+ d: String,
+ published_at: Option<u64>,
+ title: String,
+ summary: Option<String>,
+ content: String,
+ price_decimal: String,
+ price_minor: i64,
+ currency_raw: String,
+ currency_norm: String,
+ price_frequency: Option<String>,
+ unit: String,
+ location_text: Option<String>,
+ geohash: Option<String>,
+ geohash4: Option<String>,
+ geohash5: Option<String>,
+ geohash6: Option<String>,
+ geohash7: Option<String>,
+ status_tag: Option<String>,
+ effective_status: String,
+ categories: Vec<String>,
+ tags: Vec<String>,
+ practices: Vec<String>,
+ certifications: Vec<String>,
+ image_urls: Vec<String>,
+ pickup_available: bool,
+ delivery_available: bool,
+ shipping_available: bool,
+ delivery_only: bool,
+}
+
+fn listing_current_fields(
+ projection: &ListingProjection,
+ event: &Event,
+ _projected_at: UnixTimestamp,
+) -> Result<ListingCurrentFields, SurrealStoreError> {
+ let listing_key = projection.identity().address().key().to_string();
+ let price_decimal = projection.price().amount().raw().to_owned();
+ let price_minor = price_minor(&price_decimal).ok_or_else(|| {
+ SurrealStoreError::new("listing price amount must fit two decimal minor units")
+ })?;
+ Ok(ListingCurrentFields {
+ listing_key_hash: checksum(&listing_key),
+ listing_key,
+ seller_pubkey: projection.identity().seller_pubkey().as_str().to_owned(),
+ d: projection.identity().d().as_str().to_owned(),
+ published_at: first_tag_value(event, "published_at").and_then(|value| value.parse().ok()),
+ title: projection.text().title().to_owned(),
+ summary: projection.text().summary().map(str::to_owned),
+ content: projection.text().body().to_owned(),
+ price_decimal,
+ price_minor,
+ currency_raw: projection.price().currency().to_owned(),
+ currency_norm: projection.price().display_currency().to_owned(),
+ price_frequency: projection.price().frequency().map(str::to_owned),
+ unit: projection.unit().canonical().to_owned(),
+ location_text: projection.location().location_text().map(str::to_owned),
+ geohash: projection.location().geohash().map(str::to_owned),
+ geohash4: projection.location().geohash4().map(str::to_owned),
+ geohash5: projection.location().geohash5().map(str::to_owned),
+ geohash6: projection.location().geohash6().map(str::to_owned),
+ geohash7: projection.location().geohash7().map(str::to_owned),
+ status_tag: projection.status().raw_status().map(str::to_owned),
+ effective_status: projection
+ .status()
+ .effective_status()
+ .canonical()
+ .to_owned(),
+ categories: projection.taxonomy().categories().to_vec(),
+ tags: projection.taxonomy().topics().to_vec(),
+ practices: projection.taxonomy().practices().to_vec(),
+ certifications: projection.taxonomy().certifications().to_vec(),
+ image_urls: tag_values(event, "image"),
+ pickup_available: projection.fulfillment().pickup_available(),
+ delivery_available: projection.fulfillment().delivery_available(),
+ shipping_available: projection.fulfillment().shipping_available(),
+ delivery_only: projection.fulfillment().delivery_only(),
+ })
+}
+
fn listing_revision_fields(
event: &Event,
evaluation: &ListingProjectionEvaluation,
@@ -1358,6 +1571,17 @@ fn first_tag_value(event: &Event, name: &str) -> Option<String> {
.map(|value| value.as_str().to_owned())
}
+fn tag_values(event: &Event, name: &str) -> Vec<String> {
+ event
+ .unsigned()
+ .tags()
+ .iter()
+ .filter(|tag| tag.name().as_str() == name)
+ .filter_map(|tag| tag.value())
+ .map(|value| value.as_str().to_owned())
+ .collect()
+}
+
fn price_minor(raw: &str) -> Option<i64> {
let mut parts = raw.split('.');
let whole = parts.next()?.parse::<i64>().ok()?;
@@ -1409,10 +1633,10 @@ impl From<surrealdb::Error> for SurrealStoreError {
#[cfg(test)]
mod tests {
use super::{
- CurrentEventOutcome, DeletionMarkerOutcome, ListingRevisionOutcome, MigrationApplyOutcome,
- SurrealConfigError, SurrealConnectionConfig, SurrealConnectionMode, SurrealMigration,
- SurrealMigrationError, SurrealMigrationPlan, SurrealStore, base_migration_plan,
- migration_tracking_schema,
+ CurrentEventOutcome, DeletionMarkerOutcome, ListingCurrentOutcome, ListingRevisionOutcome,
+ MigrationApplyOutcome, SurrealConfigError, SurrealConnectionConfig, SurrealConnectionMode,
+ SurrealMigration, SurrealMigrationError, SurrealMigrationPlan, SurrealStore,
+ base_migration_plan, migration_tracking_schema,
};
use tangle_protocol::{
Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent,
@@ -2551,6 +2775,124 @@ mod tests {
);
}
+ #[tokio::test]
+ async fn project_current_listings_persists_normalized_marketplace_rows() {
+ let store = memory_store().await;
+ store
+ .apply_plan(&base_migration_plan())
+ .await
+ .expect("apply plan");
+ let projected_at = UnixTimestamp::new(1_714_125_100);
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let listing_key = format!("30402:{}:listing-a", listing.unsigned().pubkey().as_str());
+
+ assert_eq!(
+ store
+ .project_current_listing(&listing, projected_at)
+ .await
+ .expect("current listing"),
+ ListingCurrentOutcome::Projected
+ );
+
+ let row = store
+ .listing_current_row(&listing_key)
+ .await
+ .expect("listing row")
+ .expect("listing row exists");
+ assert_eq!(row["listing_key"], listing_key);
+ assert_eq!(row["listing_key_hash"].as_str().expect("hash").len(), 64);
+ assert_eq!(row["event_id"], listing.id().as_str());
+ assert_eq!(row["seller_pubkey"], listing.unsigned().pubkey().as_str());
+ assert_eq!(row["d"], "listing-a");
+ assert_eq!(row["created_at"], 1_714_124_433_u64);
+ assert_eq!(row["updated_at"], 1_714_124_433_u64);
+ assert!(row["published_at"].is_null());
+ assert_eq!(row["title"], "Carrot bunches");
+ assert!(row["summary"].is_null());
+ assert_eq!(row["content"], "Sweet storage carrots.");
+ assert_eq!(row["price_decimal"], "12.50");
+ assert_eq!(row["price_minor"], 1_250_u64);
+ assert_eq!(row["currency_raw"], "USD");
+ assert_eq!(row["currency_norm"], "USD");
+ assert!(row["price_frequency"].is_null());
+ assert_eq!(row["unit"], "lb");
+ assert_eq!(row["unit_family"], "lb");
+ assert!(row["location_text"].is_null());
+ assert_eq!(row["geohash"], "c22yzug");
+ assert_eq!(row["geohash4"], "c22y");
+ assert_eq!(row["geohash5"], "c22yz");
+ assert_eq!(row["geohash6"], "c22yzu");
+ assert_eq!(row["geohash7"], "c22yzug");
+ assert!(row["point"].is_null());
+ assert!(row["status_tag"].is_null());
+ assert_eq!(row["effective_status"], "active");
+ assert_eq!(row["categories"].as_array().expect("categories").len(), 1);
+ assert_eq!(row["categories"][0], "vegetables");
+ assert_eq!(row["tags"].as_array().expect("tags").len(), 1);
+ assert_eq!(row["tags"][0], "carrots");
+ assert_eq!(row["practices"].as_array().expect("practices").len(), 1);
+ assert_eq!(row["practices"][0], "no spray");
+ assert_eq!(
+ row["certifications"]
+ .as_array()
+ .expect("certifications")
+ .len(),
+ 1
+ );
+ assert_eq!(row["certifications"][0], "organic");
+ assert_eq!(row["image_urls"].as_array().expect("images").len(), 0);
+ assert_eq!(row["pickup_available"], true);
+ assert_eq!(row["delivery_available"], false);
+ assert_eq!(row["shipping_available"], false);
+ assert_eq!(row["delivery_only"], false);
+ assert!(row["seller_trust_score"].is_null());
+ assert_eq!(row["hidden"], false);
+ assert_eq!(row["deleted"], false);
+ assert_eq!(row["projected_at"], projected_at.as_u64());
+
+ let pubkey = "d".repeat(PublicKeyHex::HEX_LENGTH);
+ let invalid = synthetic_event(
+ "e",
+ "9",
+ &pubkey,
+ 1_714_125_110,
+ 30_402,
+ vec![Tag::from_parts("d", &["listing-invalid"]).expect("d tag")],
+ "",
+ );
+ let note = synthetic_event(
+ "f",
+ "a",
+ &pubkey,
+ 1_714_125_111,
+ 1,
+ Vec::new(),
+ "not a listing",
+ );
+
+ assert_eq!(
+ store
+ .project_current_listing(&invalid, projected_at)
+ .await
+ .expect("invalid current"),
+ ListingCurrentOutcome::Ineligible
+ );
+ assert_eq!(
+ store
+ .project_current_listing(¬e, projected_at)
+ .await
+ .expect("note current"),
+ ListingCurrentOutcome::NotListing
+ );
+ assert!(
+ store
+ .listing_current_row(&format!("30402:{pubkey}:listing-invalid"))
+ .await
+ .expect("invalid row")
+ .is_none()
+ );
+ }
+
fn synthetic_event(
id_digit: &str,
sig_digit: &str,