commit 7efaeeb81a06a3a7dd2b61c23fc283b030da7504
parent 4d6ceba3f3eceb036e5faaa015a260041e3de473
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 02:33:28 -0700
http: add minimal admin policy api
Diffstat:
3 files changed, 624 insertions(+), 22 deletions(-)
diff --git a/crates/tangle/tests/run_integration.rs b/crates/tangle/tests/run_integration.rs
@@ -387,6 +387,159 @@ async fn tangle_run_persists_durable_write_rate_limits() {
fs::remove_dir_all(&root).expect("remove runtime root");
}
+#[tokio::test]
+async fn tangle_run_serves_admin_policy_api() {
+ let port = free_port();
+ let root = std::env::temp_dir().join(format!(
+ "tangle-admin-policy-integration-{}-{port}",
+ std::process::id()
+ ));
+ let db_path = root.join("surrealdb");
+ let config_path = root.join("runtime.json");
+ fs::create_dir_all(&root).expect("runtime root");
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let auth = build_fixture_event(&auth_event_spec()).expect("auth");
+ let seller = FixtureKey::Seller.public_key();
+ let admin = FixtureKey::Relay.public_key();
+ let listing_key = format!("30402:{}:listing-a", seller.as_str());
+ write_runtime_config(
+ &config_path,
+ &db_path,
+ port,
+ "tangle_admin_policy",
+ serde_json::json!({
+ "admin_pubkeys": [admin.as_str()]
+ }),
+ );
+ let mut relay = spawn_relay(&config_path);
+ wait_for_http(port, &mut relay);
+
+ let unauthorized = http_post_json(
+ port,
+ &format!("/api/admin/sellers/{}/approve", seller.as_str()),
+ None,
+ serde_json::json!({}),
+ );
+ assert!(unauthorized.contains("401 Unauthorized"));
+ let approve = http_post_json(
+ port,
+ &format!("/api/admin/sellers/{}/approve", seller.as_str()),
+ Some(admin.as_str()),
+ serde_json::json!({}),
+ );
+ assert!(approve.contains("200 OK"));
+ assert!(approve.contains("\"status\":\"approved\""));
+ let seller_detail = http_get(port, &format!("/api/sellers/{}", seller.as_str()));
+ assert!(seller_detail.contains("\"approved\":true"));
+
+ let (mut client, _) = connect_async(format!("ws://127.0.0.1:{port}/ws"))
+ .await
+ .expect("client connect");
+ assert_eq!(next_label(&mut client).await, "AUTH");
+ client
+ .send(Message::Text(
+ serde_json::json!(["AUTH", event_to_value(&auth)])
+ .to_string()
+ .into(),
+ ))
+ .await
+ .expect("auth send");
+ assert_ok(&next_json(&mut client).await, true);
+ client
+ .send(Message::Text(
+ serde_json::json!(["EVENT", event_to_value(&listing)])
+ .to_string()
+ .into(),
+ ))
+ .await
+ .expect("event send");
+ assert_ok(&next_json(&mut client).await, true);
+ assert!(http_get(port, "/api/listings?limit=5").contains(listing.id().as_str()));
+
+ let hide = http_post_json(
+ port,
+ &format!("/api/admin/events/{}/hide", listing.id().as_str()),
+ Some(admin.as_str()),
+ serde_json::json!({
+ "reason": "admin policy integration"
+ }),
+ );
+ assert!(hide.contains("200 OK"));
+ assert!(hide.contains("\"status\":\"hidden\""));
+ assert!(!http_get(port, "/api/listings?limit=5").contains(listing.id().as_str()));
+ let unhide = http_post_json(
+ port,
+ &format!("/api/admin/events/{}/unhide", listing.id().as_str()),
+ Some(admin.as_str()),
+ serde_json::json!({
+ "reason": "admin policy integration complete"
+ }),
+ );
+ assert!(unhide.contains("200 OK"));
+ assert!(unhide.contains("\"status\":\"unhidden\""));
+ assert!(http_get(port, "/api/listings?limit=5").contains(listing.id().as_str()));
+ let block = http_post_json(
+ port,
+ &format!("/api/admin/pubkeys/{}/block", seller.as_str()),
+ Some(admin.as_str()),
+ serde_json::json!({}),
+ );
+ assert!(block.contains("200 OK"));
+ assert!(block.contains("\"status\":\"blocked\""));
+ stop_relay(relay);
+
+ let store_config = SurrealConnectionConfig::rocksdb(
+ db_path.to_str().expect("db path"),
+ "tangle_admin_policy",
+ "relay",
+ )
+ .expect("store config");
+ let store = reopen_store(&store_config).await;
+ let user = store
+ .relay_user_row(seller.as_str())
+ .await
+ .expect("relay user")
+ .expect("relay user exists");
+ assert_eq!(user["seller_approved"], true);
+ assert_eq!(user["blocked"], true);
+ assert!(
+ store
+ .hidden_event_row(listing.id())
+ .await
+ .expect("hidden row")
+ .is_none()
+ );
+ assert_eq!(
+ store
+ .raw_event_row(listing.id())
+ .await
+ .expect("raw row")
+ .expect("raw row exists")["hidden"],
+ false
+ );
+ assert_eq!(
+ store
+ .listing_current_row(&listing_key)
+ .await
+ .expect("listing row")
+ .expect("listing row exists")["hidden"],
+ false
+ );
+ let actions = store
+ .moderation_action_rows("event", listing.id().as_str())
+ .await
+ .expect("moderation actions");
+ assert_eq!(actions.len(), 2);
+ let action_labels = actions
+ .iter()
+ .map(|action| action["action"].as_str().expect("action label"))
+ .collect::<std::collections::BTreeSet<_>>();
+ assert!(action_labels.contains("hide"));
+ assert!(action_labels.contains("unhide"));
+ drop(store);
+ fs::remove_dir_all(&root).expect("remove runtime root");
+}
+
struct PolicyWriteScenario {
root: std::path::PathBuf,
store_config: SurrealConnectionConfig,
@@ -518,6 +671,29 @@ fn http_get(port: u16, path: &str) -> String {
try_http_get(port, path).expect("http get")
}
+fn http_post_json(port: u16, path: &str, admin_pubkey: Option<&str>, body: Value) -> String {
+ let body = body.to_string();
+ let mut stream = TcpStream::connect(("127.0.0.1", port)).expect("http connect");
+ stream
+ .set_read_timeout(Some(Duration::from_secs(2)))
+ .expect("read timeout");
+ stream
+ .set_write_timeout(Some(Duration::from_secs(2)))
+ .expect("write timeout");
+ let admin_header = admin_pubkey
+ .map(|pubkey| format!("x-tangle-admin-pubkey: {pubkey}\r\n"))
+ .unwrap_or_default();
+ write!(
+ stream,
+ "POST {path} HTTP/1.1\r\nHost: 127.0.0.1:{port}\r\nAccept: application/json\r\nContent-Type: application/json\r\nContent-Length: {}\r\n{admin_header}Connection: close\r\n\r\n{body}",
+ body.len()
+ )
+ .expect("http post");
+ let mut response = String::new();
+ stream.read_to_string(&mut response).expect("http read");
+ response
+}
+
fn try_http_get(port: u16, path: &str) -> Result<String, std::io::Error> {
let mut stream = TcpStream::connect(("127.0.0.1", port))?;
stream.set_read_timeout(Some(Duration::from_secs(2)))?;
diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs
@@ -5,7 +5,7 @@ use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
extract::{Path, RawQuery, State},
response::{IntoResponse, Response},
- routing::get,
+ routing::{get, post},
};
use core::fmt;
use http::{HeaderMap, HeaderValue, StatusCode, header};
@@ -237,6 +237,7 @@ pub struct TangleRuntimeConfig {
database: SurrealConnectionConfig,
admission_policy: AdmissionPolicy,
durable_write_rate_limit: Option<RateLimitConfig>,
+ admin_pubkeys: BTreeSet<PublicKeyHex>,
limits: RuntimeLimits,
}
@@ -261,6 +262,10 @@ impl TangleRuntimeConfig {
self.durable_write_rate_limit
}
+ pub fn admin_pubkeys(&self) -> &BTreeSet<PublicKeyHex> {
+ &self.admin_pubkeys
+ }
+
pub fn limits(&self) -> RuntimeLimits {
self.limits
}
@@ -950,6 +955,22 @@ fn runtime_router(
.route("/api/listings/{pubkey}/{d}", get(runtime_listing_detail))
.route("/api/search", get(runtime_marketplace_search))
.route("/api/sellers/{pubkey}", get(runtime_seller_detail))
+ .route(
+ "/api/admin/sellers/{pubkey}/approve",
+ post(runtime_admin_approve_seller),
+ )
+ .route(
+ "/api/admin/pubkeys/{pubkey}/block",
+ post(runtime_admin_block_pubkey),
+ )
+ .route(
+ "/api/admin/events/{event_id}/hide",
+ post(runtime_admin_hide_event),
+ )
+ .route(
+ "/api/admin/events/{event_id}/unhide",
+ post(runtime_admin_unhide_event),
+ )
.with_state(state)
}
@@ -1036,6 +1057,106 @@ async fn runtime_seller_detail(
.await
}
+async fn runtime_admin_approve_seller(
+ State(state): State<RuntimeRelayState>,
+ headers: HeaderMap,
+ Path(pubkey): Path<String>,
+) -> Result<Json<AdminPolicyDocument>, ApiError> {
+ let _admin = require_admin_pubkey(&state.config, &headers)?;
+ let pubkey = parse_pubkey("pubkey", &pubkey)?;
+ state
+ .store
+ .set_seller_approved(pubkey.as_str(), true, now_timestamp())
+ .await
+ .map_err(|_| ApiError::internal())?;
+ Ok(Json(AdminPolicyDocument::new(
+ "approved",
+ "seller",
+ pubkey.as_str(),
+ )))
+}
+
+async fn runtime_admin_block_pubkey(
+ State(state): State<RuntimeRelayState>,
+ headers: HeaderMap,
+ Path(pubkey): Path<String>,
+) -> Result<Json<AdminPolicyDocument>, ApiError> {
+ let _admin = require_admin_pubkey(&state.config, &headers)?;
+ let pubkey = parse_pubkey("pubkey", &pubkey)?;
+ state
+ .store
+ .set_pubkey_blocked(pubkey.as_str(), true, now_timestamp())
+ .await
+ .map_err(|_| ApiError::internal())?;
+ Ok(Json(AdminPolicyDocument::new(
+ "blocked",
+ "pubkey",
+ pubkey.as_str(),
+ )))
+}
+
+async fn runtime_admin_hide_event(
+ State(state): State<RuntimeRelayState>,
+ headers: HeaderMap,
+ Path(event_id): Path<String>,
+ Json(request): Json<AdminEventPolicyRequest>,
+) -> Result<Json<AdminPolicyDocument>, ApiError> {
+ let admin = require_admin_pubkey(&state.config, &headers)?;
+ let event_id = EventId::new(&event_id)
+ .map_err(|_| invalid_parameter("event_id", "must be a 64-character hex event id"))?;
+ let reason = request.reason.unwrap_or_else(|| "admin policy".to_owned());
+ match state
+ .store
+ .hide_event(
+ &event_id,
+ &reason,
+ "admin_api",
+ admin.as_str(),
+ now_timestamp(),
+ )
+ .await
+ .map_err(|_| ApiError::internal())?
+ {
+ tangle_store_surreal::HiddenEventOutcome::Hidden => Ok(Json(AdminPolicyDocument::new(
+ "hidden",
+ "event",
+ event_id.as_str(),
+ ))),
+ tangle_store_surreal::HiddenEventOutcome::NotFound => {
+ Err(ApiError::not_found("event not found"))
+ }
+ tangle_store_surreal::HiddenEventOutcome::Unhidden => Err(ApiError::internal()),
+ }
+}
+
+async fn runtime_admin_unhide_event(
+ State(state): State<RuntimeRelayState>,
+ headers: HeaderMap,
+ Path(event_id): Path<String>,
+ Json(request): Json<AdminEventPolicyRequest>,
+) -> Result<Json<AdminPolicyDocument>, ApiError> {
+ let admin = require_admin_pubkey(&state.config, &headers)?;
+ let event_id = EventId::new(&event_id)
+ .map_err(|_| invalid_parameter("event_id", "must be a 64-character hex event id"))?;
+ let reason = request.reason.unwrap_or_else(|| "admin policy".to_owned());
+ match state
+ .store
+ .unhide_event(&event_id, &reason, admin.as_str(), now_timestamp())
+ .await
+ .map_err(|_| ApiError::internal())?
+ {
+ tangle_store_surreal::HiddenEventOutcome::Unhidden => Ok(Json(AdminPolicyDocument::new(
+ "unhidden",
+ "event",
+ event_id.as_str(),
+ ))),
+ tangle_store_surreal::HiddenEventOutcome::NotFound => {
+ Err(ApiError::not_found("event not found"))
+ }
+ tangle_store_surreal::HiddenEventOutcome::Hidden => Err(ApiError::internal()),
+ }
+}
+
async fn handle_websocket(mut socket: WebSocket, state: RuntimeRelayState) {
let mut shutdown = state.shutdown_signal.subscribe();
let mut event_rx = state.event_tx.subscribe();
@@ -1277,6 +1398,8 @@ struct RuntimePolicyConfigDocument {
unapproved_seller_action: Option<RuntimeUnapprovedSellerActionDocument>,
write_rate_limit: Option<RuntimeRateLimitConfigDocument>,
#[serde(default)]
+ admin_pubkeys: Vec<String>,
+ #[serde(default)]
approved_sellers: Vec<String>,
#[serde(default)]
blocked_pubkeys: Vec<String>,
@@ -1309,6 +1432,7 @@ fn runtime_config_from_document(
.map_err(RuntimeConfigError::invalid)?;
let database = database_config_from_document(document.database)?;
let durable_write_rate_limit = durable_write_rate_limit_from_document(&document.policy)?;
+ let admin_pubkeys = admin_pubkeys_from_document(&document.policy)?;
let admission_policy = admission_policy_from_document(&document.policy)?;
Ok(TangleRuntimeConfig {
listen_addr,
@@ -1316,6 +1440,7 @@ fn runtime_config_from_document(
database,
admission_policy,
durable_write_rate_limit,
+ admin_pubkeys,
limits: limits.runtime,
})
}
@@ -1410,6 +1535,22 @@ fn durable_write_rate_limit_from_document(
.transpose()
}
+fn admin_pubkeys_from_document(
+ document: &RuntimePolicyConfigDocument,
+) -> Result<BTreeSet<PublicKeyHex>, RuntimeConfigError> {
+ document
+ .admin_pubkeys
+ .iter()
+ .map(|pubkey| {
+ PublicKeyHex::new(pubkey.as_str()).map_err(|error| {
+ RuntimeConfigError::invalid(format!(
+ "policy.admin_pubkeys contains invalid pubkey: {error}"
+ ))
+ })
+ })
+ .collect()
+}
+
fn admission_policy_from_document(
document: &RuntimePolicyConfigDocument,
) -> Result<AdmissionPolicy, RuntimeConfigError> {
@@ -1663,6 +1804,13 @@ impl EventMessageHandler {
if validated.admission().effect() == AdmissionEffect::AuthenticateOnly {
return ok_rejected(event_id, "invalid: auth events must use AUTH".to_owned());
}
+ let effect = match self
+ .effective_admission_effect(&event, validated.admission().effect())
+ .await
+ {
+ Ok(effect) => effect,
+ Err(_) => return ok_rejected(event_id, "error: policy unavailable".to_owned()),
+ };
if let Some(config) = self.durable_write_rate_limit {
match self
.store
@@ -1715,7 +1863,7 @@ impl EventMessageHandler {
{
return ok_rejected(event_id, "error: projection failed".to_owned());
}
- if validated.admission().effect() == AdmissionEffect::StoreRawAndProjectPublicListing
+ if effect == AdmissionEffect::StoreRawAndProjectPublicListing
&& (self
.store
.project_current_listing(&event, now)
@@ -1732,6 +1880,38 @@ impl EventMessageHandler {
}
ok_accepted(event_id)
}
+
+ async fn effective_admission_effect(
+ &self,
+ event: &Event,
+ fallback: AdmissionEffect,
+ ) -> Result<AdmissionEffect, tangle_store_surreal::SurrealStoreError> {
+ if event.unsigned().kind().as_u32() != 30_402 {
+ return Ok(fallback);
+ }
+ let Some(row) = self
+ .store
+ .relay_user_row(event.unsigned().pubkey().as_str())
+ .await?
+ else {
+ return Ok(fallback);
+ };
+ if row
+ .get("blocked")
+ .and_then(serde_json::Value::as_bool)
+ .unwrap_or(false)
+ {
+ return Ok(AdmissionEffect::StoreRawWithoutPublicListingProjection);
+ }
+ if row
+ .get("seller_approved")
+ .and_then(serde_json::Value::as_bool)
+ .unwrap_or(false)
+ {
+ return Ok(AdmissionEffect::StoreRawAndProjectPublicListing);
+ }
+ Ok(fallback)
+ }
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
@@ -2344,6 +2524,28 @@ pub struct SellerDocument {
pub active_listing_count: u64,
}
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct AdminPolicyDocument {
+ pub status: String,
+ pub target_type: String,
+ pub target_ref: String,
+}
+
+impl AdminPolicyDocument {
+ pub fn new(status: &str, target_type: &str, target_ref: &str) -> Self {
+ Self {
+ status: status.to_owned(),
+ target_type: target_type.to_owned(),
+ target_ref: target_ref.to_owned(),
+ }
+ }
+}
+
+#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)]
+pub struct AdminEventPolicyRequest {
+ pub reason: Option<String>,
+}
+
pub fn parse_listing_query(
query: &str,
limits: RuntimeLimits,
@@ -2662,7 +2864,11 @@ async fn seller_detail(
Path(pubkey): Path<String>,
) -> Result<Json<SellerDocument>, ApiError> {
let pubkey = parse_pubkey("pubkey", &pubkey)?;
- let seller = seller_policy_row(&state.store, pubkey.as_str()).await?;
+ let seller = state
+ .store
+ .relay_user_row(pubkey.as_str())
+ .await
+ .map_err(|_| ApiError::internal())?;
let listings = state
.store
.query_current_listings(
@@ -2702,6 +2908,26 @@ fn accepts_nostr_json(value: Option<&HeaderValue>) -> bool {
})
}
+fn require_admin_pubkey(
+ config: &TangleRuntimeConfig,
+ headers: &HeaderMap,
+) -> Result<PublicKeyHex, ApiError> {
+ if config.admin_pubkeys().is_empty() {
+ return Err(ApiError::forbidden("admin policy api is disabled"));
+ }
+ let value = headers
+ .get("x-tangle-admin-pubkey")
+ .ok_or_else(|| ApiError::unauthorized("admin pubkey header is required"))?
+ .to_str()
+ .map_err(|_| ApiError::unauthorized("admin pubkey header is invalid"))?;
+ let pubkey = PublicKeyHex::new(value)
+ .map_err(|_| ApiError::unauthorized("admin pubkey header is invalid"))?;
+ if !config.admin_pubkeys().contains(&pubkey) {
+ return Err(ApiError::forbidden("admin pubkey is not authorized"));
+ }
+ Ok(pubkey)
+}
+
fn is_websocket_upgrade(headers: &HeaderMap) -> bool {
headers
.get(header::UPGRADE)
@@ -2813,24 +3039,6 @@ fn search_document_query(parsed: &MarketplaceSearchHttpQuery) -> SearchDocumentQ
query
}
-async fn seller_policy_row(
- store: &SurrealStore,
- pubkey: &str,
-) -> Result<Option<serde_json::Value>, ApiError> {
- let mut response = store
- .database()
- .query("SELECT * FROM relay_user WHERE pubkey = $pubkey LIMIT 1;")
- .bind(("pubkey", pubkey))
- .await
- .map_err(|_| ApiError::internal())?
- .check()
- .map_err(|_| ApiError::internal())?;
- let rows = response
- .take::<Vec<serde_json::Value>>(0)
- .map_err(|_| ApiError::internal())?;
- Ok(rows.into_iter().next())
-}
-
fn listing_item_document(row: &serde_json::Value) -> Result<ListingItemDocument, ApiError> {
Ok(ListingItemDocument {
listing_key: string_field(row, "listing_key")?,
@@ -3177,7 +3385,7 @@ mod tests {
};
use tangle_nips::{FulfillmentMethod, ListingUnit, parse_relay_auth_event};
use tangle_protocol::{
- ClientMessage, RelayMessage, SubscriptionId, UnixTimestamp, event_to_value,
+ ClientMessage, PublicKeyHex, RelayMessage, SubscriptionId, UnixTimestamp, event_to_value,
filter_from_value,
};
use tangle_store::StoredEvent;
@@ -3414,6 +3622,9 @@ mod tests {
}
},
"policy": {
+ "admin_pubkeys": [
+ "1111111111111111111111111111111111111111111111111111111111111111"
+ ],
"write_rate_limit": {
"limit": 2,
"window_seconds": 60
@@ -3442,6 +3653,14 @@ mod tests {
config.durable_write_rate_limit(),
Some(RateLimitConfig::new(2, 60).expect("write limit"))
);
+ assert!(
+ config.admin_pubkeys().contains(
+ &PublicKeyHex::new(
+ "1111111111111111111111111111111111111111111111111111111111111111"
+ )
+ .expect("admin pubkey")
+ )
+ );
assert_eq!(config.database_config().namespace(), "tangle_test");
assert_eq!(config.database_config().database(), "relay");
assert_eq!(
@@ -3988,6 +4207,58 @@ mod tests {
);
}
+ #[tokio::test]
+ async fn event_message_handler_applies_dynamic_seller_policy_rows() {
+ let store = runtime_memory_store().await;
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let listing_key = format!("30402:{}:listing-a", listing.unsigned().pubkey().as_str());
+ let connection = authenticated_connection();
+ let handler = EventMessageHandler::new(
+ store.clone(),
+ EventValidator::new(RuntimeLimits::default(), AdmissionPolicy::new()),
+ );
+
+ store
+ .set_seller_approved(
+ listing.unsigned().pubkey().as_str(),
+ true,
+ UnixTimestamp::new(1_714_126_200),
+ )
+ .await
+ .expect("approve seller");
+ let accepted = handler
+ .handle_event(
+ &connection,
+ listing.clone(),
+ UnixTimestamp::new(1_714_126_201),
+ UnixTimestamp::new(1_714_126_201),
+ )
+ .await;
+
+ assert_eq!(
+ accepted,
+ RelayMessage::Ok {
+ event_id: listing.id().clone(),
+ accepted: true,
+ message: String::new()
+ }
+ );
+ assert!(
+ store
+ .listing_current_row(&listing_key)
+ .await
+ .expect("listing row")
+ .is_some()
+ );
+ assert!(
+ store
+ .search_document_row(&listing_key)
+ .await
+ .expect("search row")
+ .is_some()
+ );
+ }
+
#[test]
fn auth_message_handler_issues_and_accepts_auth_events() {
let handler = AuthMessageHandler;
diff --git a/crates/tangle_store_surreal/src/lib.rs b/crates/tangle_store_surreal/src/lib.rs
@@ -2085,6 +2085,72 @@ UPDATE search_doc SET visible = false WHERE (event_id = $event_id OR current_eve
response.take(0).map_err(SurrealStoreError::from)
}
+ pub async fn relay_user_row(
+ &self,
+ pubkey: &str,
+ ) -> Result<Option<serde_json::Value>, SurrealStoreError> {
+ let pubkey = required_policy_text(pubkey, "relay user pubkey")?;
+ let mut response = self
+ .db
+ .query("SELECT * FROM ONLY type::record('relay_user', $pubkey);")
+ .bind(("pubkey", pubkey.as_str()))
+ .await
+ .map_err(SurrealStoreError::from)?
+ .check()
+ .map_err(SurrealStoreError::from)?;
+ response.take(0).map_err(SurrealStoreError::from)
+ }
+
+ pub async fn set_seller_approved(
+ &self,
+ pubkey: &str,
+ approved: bool,
+ updated_at: UnixTimestamp,
+ ) -> Result<(), SurrealStoreError> {
+ let pubkey = required_policy_text(pubkey, "relay user pubkey")?;
+ let existing = self.relay_user_row(&pubkey).await?;
+ let blocked = existing
+ .as_ref()
+ .and_then(|row| row.get("blocked"))
+ .and_then(serde_json::Value::as_bool)
+ .unwrap_or(false);
+ let created_at = existing
+ .as_ref()
+ .and_then(|row| row.get("created_at"))
+ .and_then(serde_json::Value::as_u64)
+ .unwrap_or_else(|| updated_at.as_u64());
+ self.upsert_relay_user(&pubkey, "seller", approved, blocked, created_at, updated_at)
+ .await
+ }
+
+ pub async fn set_pubkey_blocked(
+ &self,
+ pubkey: &str,
+ blocked: bool,
+ updated_at: UnixTimestamp,
+ ) -> Result<(), SurrealStoreError> {
+ let pubkey = required_policy_text(pubkey, "relay user pubkey")?;
+ let existing = self.relay_user_row(&pubkey).await?;
+ let approved = existing
+ .as_ref()
+ .and_then(|row| row.get("seller_approved"))
+ .and_then(serde_json::Value::as_bool)
+ .unwrap_or(false);
+ let role = existing
+ .as_ref()
+ .and_then(|row| row.get("role"))
+ .and_then(serde_json::Value::as_str)
+ .unwrap_or("seller")
+ .to_owned();
+ let created_at = existing
+ .as_ref()
+ .and_then(|row| row.get("created_at"))
+ .and_then(serde_json::Value::as_u64)
+ .unwrap_or_else(|| updated_at.as_u64());
+ self.upsert_relay_user(&pubkey, &role, approved, blocked, created_at, updated_at)
+ .await
+ }
+
pub async fn check_durable_rate_limit(
&self,
key: &str,
@@ -2263,6 +2329,41 @@ UPSERT type::record('rate_limit_state', $key) CONTENT {
Ok(())
}
+ async fn upsert_relay_user(
+ &self,
+ pubkey: &str,
+ role: &str,
+ seller_approved: bool,
+ blocked: bool,
+ created_at: u64,
+ updated_at: UnixTimestamp,
+ ) -> Result<(), SurrealStoreError> {
+ self.db
+ .query(
+ r#"
+UPSERT type::record('relay_user', $pubkey) CONTENT {
+ pubkey: $pubkey,
+ role: $role,
+ seller_approved: $seller_approved,
+ blocked: $blocked,
+ created_at: $created_at,
+ updated_at: $updated_at
+};
+"#,
+ )
+ .bind(("pubkey", pubkey))
+ .bind(("role", role))
+ .bind(("seller_approved", seller_approved))
+ .bind(("blocked", blocked))
+ .bind(("created_at", created_at))
+ .bind(("updated_at", updated_at.as_u64()))
+ .await
+ .map_err(SurrealStoreError::from)?
+ .check()
+ .map_err(SurrealStoreError::from)?;
+ Ok(())
+ }
+
async fn query_single_indexed_tag_event_ids(
&self,
tag: &str,
@@ -5081,6 +5182,60 @@ mod tests {
}
#[tokio::test]
+ async fn relay_user_policy_rows_persist_approval_and_block_state() {
+ let store = memory_store().await;
+ store
+ .apply_plan(&base_migration_plan())
+ .await
+ .expect("apply plan");
+ let seller = "2".repeat(PublicKeyHex::HEX_LENGTH);
+
+ store
+ .set_seller_approved(&seller, true, UnixTimestamp::new(1_714_125_900))
+ .await
+ .expect("approve seller");
+ let approved = store
+ .relay_user_row(&seller)
+ .await
+ .expect("relay user")
+ .expect("relay user exists");
+ assert_eq!(approved["pubkey"], seller);
+ assert_eq!(approved["role"], "seller");
+ assert_eq!(approved["seller_approved"], true);
+ assert_eq!(approved["blocked"], false);
+ assert_eq!(approved["created_at"], 1_714_125_900_u64);
+ assert_eq!(approved["updated_at"], 1_714_125_900_u64);
+
+ store
+ .set_pubkey_blocked(&seller, true, UnixTimestamp::new(1_714_126_000))
+ .await
+ .expect("block seller");
+ let blocked = store
+ .relay_user_row(&seller)
+ .await
+ .expect("relay user")
+ .expect("relay user exists");
+ assert_eq!(blocked["seller_approved"], true);
+ assert_eq!(blocked["blocked"], true);
+ assert_eq!(blocked["created_at"], 1_714_125_900_u64);
+ assert_eq!(blocked["updated_at"], 1_714_126_000_u64);
+
+ store
+ .set_seller_approved(&seller, false, UnixTimestamp::new(1_714_126_100))
+ .await
+ .expect("unapprove seller");
+ let unapproved = store
+ .relay_user_row(&seller)
+ .await
+ .expect("relay user")
+ .expect("relay user exists");
+ assert_eq!(unapproved["seller_approved"], false);
+ assert_eq!(unapproved["blocked"], true);
+ assert_eq!(unapproved["created_at"], 1_714_125_900_u64);
+ assert_eq!(unapproved["updated_at"], 1_714_126_100_u64);
+ }
+
+ #[tokio::test]
async fn private_helpers_cover_debug_errors_and_decimal_edges() {
let store = memory_store().await;
assert!(format!("{store:?}").contains("SurrealStore"));