commit bbe69bb83ea89727fd182662011959cde658c9be
parent 79486928acfe79455721c6ac02068b6b6ccaaad7
Author: triesap <tyson@radroots.org>
Date: Fri, 5 Jun 2026 22:51:40 -0700
store-surreal: maintain current events
Diffstat:
1 file changed, 278 insertions(+), 3 deletions(-)
diff --git a/crates/tangle_store_surreal/src/lib.rs b/crates/tangle_store_surreal/src/lib.rs
@@ -649,6 +649,14 @@ pub enum MigrationApplyOutcome {
AlreadyApplied,
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CurrentEventOutcome {
+ NotCurrent,
+ Inserted,
+ Replaced,
+ Unchanged,
+}
+
#[derive(Clone)]
pub struct SurrealStore {
db: Surreal<Db>,
@@ -882,6 +890,68 @@ CREATE event_tag_index CONTENT {
response.take(0).map_err(SurrealStoreError::from)
}
+ pub async fn maintain_current_event(
+ &self,
+ event: &Event,
+ ) -> Result<CurrentEventOutcome, SurrealStoreError> {
+ let Some(current_key) = current_event_key(event)? else {
+ return Ok(CurrentEventOutcome::NotCurrent);
+ };
+ let existing = self.current_event_row(¤t_key.address_key).await?;
+ let outcome = existing
+ .as_ref()
+ .map(|row| current_event_replacement_outcome(event, row))
+ .unwrap_or(CurrentEventOutcome::Inserted);
+ if outcome == CurrentEventOutcome::Unchanged {
+ return Ok(outcome);
+ }
+ self.db
+ .query(
+ r#"
+UPSERT type::record('event_current', $address_key) CONTENT {
+ address_key: $address_key,
+ kind: $kind,
+ pubkey: $pubkey,
+ d: $d,
+ event_id: $event_id,
+ created_at: $created_at,
+ tie_break_id: $tie_break_id,
+ deleted: false,
+ hidden: false,
+ updated_at: $updated_at
+};
+"#,
+ )
+ .bind(("address_key", current_key.address_key))
+ .bind(("kind", event.unsigned().kind().as_u32()))
+ .bind(("pubkey", event.unsigned().pubkey().as_str()))
+ .bind(("d", current_key.d))
+ .bind(("event_id", event.id().as_str()))
+ .bind(("created_at", event.unsigned().created_at().as_u64()))
+ .bind(("tie_break_id", event.id().as_str()))
+ .bind(("updated_at", event.unsigned().created_at().as_u64()))
+ .await
+ .map_err(SurrealStoreError::from)?
+ .check()
+ .map_err(SurrealStoreError::from)?;
+ Ok(outcome)
+ }
+
+ pub async fn current_event_row(
+ &self,
+ address_key: &str,
+ ) -> Result<Option<serde_json::Value>, SurrealStoreError> {
+ let mut response = self
+ .db
+ .query("SELECT * FROM ONLY type::record('event_current', $address_key);")
+ .bind(("address_key", address_key))
+ .await
+ .map_err(SurrealStoreError::from)?
+ .check()
+ .map_err(SurrealStoreError::from)?;
+ response.take(0).map_err(SurrealStoreError::from)
+ }
+
async fn applied_migration(
&self,
name: &str,
@@ -957,6 +1027,46 @@ fn address_key_value(event: &Event) -> Result<Option<String>, SurrealStoreError>
.map_err(|message| SurrealStoreError::new(&message))
}
+struct CurrentEventKey {
+ address_key: String,
+ d: Option<String>,
+}
+
+fn current_event_key(event: &Event) -> Result<Option<CurrentEventKey>, SurrealStoreError> {
+ let kind = event.unsigned().kind();
+ if kind.is_addressable() {
+ let coordinate = AddressCoordinate::from_event(event)
+ .map_err(|message| SurrealStoreError::new(&message))?;
+ return Ok(coordinate.map(|coordinate| CurrentEventKey {
+ address_key: coordinate.key().to_string(),
+ d: Some(coordinate.d().as_str().to_owned()),
+ }));
+ }
+ if kind.is_replaceable() {
+ return Ok(Some(CurrentEventKey {
+ address_key: format!("{}:{}", kind.as_u32(), event.unsigned().pubkey().as_str()),
+ d: None,
+ }));
+ }
+ Ok(None)
+}
+
+fn current_event_replacement_outcome(
+ event: &Event,
+ row: &serde_json::Value,
+) -> CurrentEventOutcome {
+ let incoming_created_at = event.unsigned().created_at().as_u64();
+ let existing_created_at = row["created_at"].as_u64().unwrap_or_default();
+ let existing_event_id = row["event_id"].as_str().unwrap_or_default();
+ if incoming_created_at > existing_created_at
+ || (incoming_created_at == existing_created_at && event.id().as_str() > existing_event_id)
+ {
+ CurrentEventOutcome::Replaced
+ } else {
+ CurrentEventOutcome::Unchanged
+ }
+}
+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SurrealStoreError {
message: String,
@@ -991,9 +1101,9 @@ impl From<surrealdb::Error> for SurrealStoreError {
#[cfg(test)]
mod tests {
use super::{
- MigrationApplyOutcome, SurrealConfigError, SurrealConnectionConfig, SurrealConnectionMode,
- SurrealMigration, SurrealMigrationError, SurrealMigrationPlan, SurrealStore,
- base_migration_plan, migration_tracking_schema,
+ CurrentEventOutcome, MigrationApplyOutcome, SurrealConfigError, SurrealConnectionConfig,
+ SurrealConnectionMode, SurrealMigration, SurrealMigrationError, SurrealMigrationPlan,
+ SurrealStore, base_migration_plan, migration_tracking_schema,
};
use tangle_protocol::{
Event, EventId, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent,
@@ -1734,4 +1844,169 @@ mod tests {
event.unsigned().created_at().as_u64()
);
}
+
+ #[tokio::test]
+ async fn maintain_current_events_tracks_replaceable_and_addressable_winners() {
+ let store = memory_store().await;
+ store
+ .apply_plan(&base_migration_plan())
+ .await
+ .expect("apply plan");
+ let pubkey = "a".repeat(PublicKeyHex::HEX_LENGTH);
+ let replaceable_key = format!("3:{pubkey}");
+ let first = synthetic_event(
+ "1",
+ "b",
+ &pubkey,
+ 1_714_124_700,
+ 3,
+ Vec::new(),
+ "profile one",
+ );
+ let older = synthetic_event(
+ "2",
+ "c",
+ &pubkey,
+ 1_714_124_699,
+ 3,
+ Vec::new(),
+ "profile older",
+ );
+ let newer = synthetic_event(
+ "3",
+ "d",
+ &pubkey,
+ 1_714_124_701,
+ 3,
+ Vec::new(),
+ "profile newer",
+ );
+ let tied_lower = synthetic_event(
+ "2",
+ "e",
+ &pubkey,
+ 1_714_124_701,
+ 3,
+ Vec::new(),
+ "profile tied lower",
+ );
+ let tied_higher = synthetic_event(
+ "4",
+ "f",
+ &pubkey,
+ 1_714_124_701,
+ 3,
+ Vec::new(),
+ "profile tied higher",
+ );
+ let regular = synthetic_event(
+ "5",
+ "1",
+ &pubkey,
+ 1_714_124_702,
+ 1,
+ Vec::new(),
+ "regular note",
+ );
+
+ assert_eq!(
+ store.maintain_current_event(&first).await.expect("first"),
+ CurrentEventOutcome::Inserted
+ );
+ assert_eq!(
+ store.maintain_current_event(&older).await.expect("older"),
+ CurrentEventOutcome::Unchanged
+ );
+ assert_eq!(
+ store.maintain_current_event(&newer).await.expect("newer"),
+ CurrentEventOutcome::Replaced
+ );
+ assert_eq!(
+ store
+ .maintain_current_event(&tied_lower)
+ .await
+ .expect("tied lower"),
+ CurrentEventOutcome::Unchanged
+ );
+ assert_eq!(
+ store
+ .maintain_current_event(&tied_higher)
+ .await
+ .expect("tied higher"),
+ CurrentEventOutcome::Replaced
+ );
+ assert_eq!(
+ store
+ .maintain_current_event(®ular)
+ .await
+ .expect("regular"),
+ CurrentEventOutcome::NotCurrent
+ );
+
+ let row = store
+ .current_event_row(&replaceable_key)
+ .await
+ .expect("replaceable row")
+ .expect("replaceable row exists");
+ assert_eq!(row["address_key"], replaceable_key);
+ assert_eq!(row["kind"], 3_u64);
+ assert_eq!(row["pubkey"], pubkey);
+ assert_eq!(row["event_id"], tied_higher.id().as_str());
+ assert_eq!(row["tie_break_id"], tied_higher.id().as_str());
+ assert_eq!(row["created_at"], 1_714_124_701_u64);
+ assert!(row["d"].is_null());
+ assert_eq!(row["deleted"], false);
+ assert_eq!(row["hidden"], false);
+
+ let addressable = synthetic_event(
+ "6",
+ "2",
+ &pubkey,
+ 1_714_124_703,
+ 30_402,
+ vec![Tag::from_parts("d", &["listing-a"]).expect("d tag")],
+ "listing projection",
+ );
+ let addressable_key = format!("30402:{pubkey}:listing-a");
+
+ assert_eq!(
+ store
+ .maintain_current_event(&addressable)
+ .await
+ .expect("addressable"),
+ CurrentEventOutcome::Inserted
+ );
+
+ let addressable_row = store
+ .current_event_row(&addressable_key)
+ .await
+ .expect("addressable row")
+ .expect("addressable row exists");
+ assert_eq!(addressable_row["address_key"], addressable_key);
+ assert_eq!(addressable_row["kind"], 30_402_u64);
+ assert_eq!(addressable_row["d"], "listing-a");
+ assert_eq!(addressable_row["event_id"], addressable.id().as_str());
+ }
+
+ fn synthetic_event(
+ id_digit: &str,
+ sig_digit: &str,
+ pubkey: &str,
+ created_at: u64,
+ kind: u64,
+ tags: Vec<Tag>,
+ content: &str,
+ ) -> Event {
+ Event::new(
+ EventId::new(&id_digit.repeat(EventId::HEX_LENGTH)).expect("id"),
+ UnsignedEvent::new(
+ PublicKeyHex::new(pubkey).expect("pubkey"),
+ UnixTimestamp::new(created_at),
+ Kind::new(kind).expect("kind"),
+ tags,
+ content,
+ ),
+ SignatureHex::new(&sig_digit.repeat(SignatureHex::HEX_LENGTH)).expect("sig"),
+ )
+ }
}