commit 4f1cb74955e563b3b8263bf3e3e155d436bb3d63
parent f2197ab754d4cd226d5b7e46216e59ac7b888dff
Author: triesap <tyson@radroots.org>
Date: Sat, 9 May 2026 02:17:21 +0000
sync: expose pending publish batches
Diffstat:
2 files changed, 124 insertions(+), 15 deletions(-)
diff --git a/crates/replica_sync/src/lib.rs b/crates/replica_sync/src/lib.rs
@@ -24,7 +24,11 @@ pub use ingest::{
RadrootsReplicaIdFactory, RadrootsReplicaIngestOutcome, radroots_replica_ingest_event_state,
radroots_replica_ingest_event_with_factory,
};
-pub use sync_state::{RadrootsReplicaSyncStatus, radroots_replica_sync_status};
+pub use sync_state::{
+ RadrootsReplicaPendingPublishBatch, RadrootsReplicaPendingPublishEvent,
+ RadrootsReplicaSyncStatus, radroots_replica_pending_publish_batch,
+ radroots_replica_sync_status,
+};
pub use types::{
RADROOTS_REPLICA_TRANSFER_VERSION, RadrootsReplicaEventDraft, RadrootsReplicaFarmSelector,
RadrootsReplicaSyncBundle, RadrootsReplicaSyncOptions, RadrootsReplicaSyncRequest,
diff --git a/crates/replica_sync/src/sync_state.rs b/crates/replica_sync/src/sync_state.rs
@@ -9,7 +9,7 @@ use radroots_sql_core::SqlExecutor;
use crate::error::RadrootsReplicaEventsError;
use crate::event_state::{event_content_hash, event_state_key, tag_value};
-use crate::types::RadrootsReplicaFarmSelector;
+use crate::types::{RadrootsReplicaEventDraft, RadrootsReplicaFarmSelector};
#[derive(Clone, Debug)]
pub struct RadrootsReplicaSyncStatus {
@@ -17,12 +17,39 @@ pub struct RadrootsReplicaSyncStatus {
pub pending_count: usize,
}
+#[derive(Clone, Debug)]
+pub struct RadrootsReplicaPendingPublishEvent {
+ pub key: String,
+ pub kind: u32,
+ pub author: String,
+ pub d_tag: String,
+ pub content_hash: String,
+ pub draft: RadrootsReplicaEventDraft,
+}
+
+#[derive(Clone, Debug)]
+pub struct RadrootsReplicaPendingPublishBatch {
+ pub expected_count: usize,
+ pub pending_count: usize,
+ pub pending_events: Vec<RadrootsReplicaPendingPublishEvent>,
+}
+
pub fn radroots_replica_sync_status<E: SqlExecutor>(
exec: &E,
) -> Result<RadrootsReplicaSyncStatus, RadrootsReplicaEventsError> {
+ let batch = radroots_replica_pending_publish_batch(exec)?;
+ Ok(RadrootsReplicaSyncStatus {
+ expected_count: batch.expected_count,
+ pending_count: batch.pending_count,
+ })
+}
+
+pub fn radroots_replica_pending_publish_batch<E: SqlExecutor>(
+ exec: &E,
+) -> Result<RadrootsReplicaPendingPublishBatch, RadrootsReplicaEventsError> {
let farms =
radroots_replica_db::farm::find_many(exec, &IFarmFindMany { filter: None })?.results;
- let mut expected: BTreeMap<String, String> = BTreeMap::new();
+ let mut expected: BTreeMap<String, RadrootsReplicaPendingPublishEvent> = BTreeMap::new();
for farm in farms {
let selector = RadrootsReplicaFarmSelector {
@@ -34,11 +61,17 @@ pub fn radroots_replica_sync_status<E: SqlExecutor>(
for event in bundle.events {
let d_tag = tag_value(&event.tags, "d").unwrap_or("");
let key = event_state_key(event.kind, &event.author, d_tag);
- #[cfg(test)]
- let content_hash = event_content_hash(&event.content, &event.tags)?;
- #[cfg(not(test))]
- let content_hash = event_content_hash(&event.content, &event.tags);
- expected.entry(key).or_insert(content_hash);
+ let content_hash = draft_content_hash(&event)?;
+ expected
+ .entry(key.clone())
+ .or_insert(RadrootsReplicaPendingPublishEvent {
+ key,
+ kind: event.kind,
+ author: event.author.clone(),
+ d_tag: d_tag.to_string(),
+ content_hash,
+ draft: event,
+ });
}
}
@@ -54,23 +87,37 @@ pub fn radroots_replica_sync_status<E: SqlExecutor>(
state_map.insert(state.key, state.content_hash);
}
- let mut pending = 0;
- for (key, content_hash) in expected.iter() {
+ let mut pending_events = Vec::new();
+ for (key, event) in expected.iter() {
match state_map.get(key) {
- Some(existing) if existing == content_hash => {}
- _ => pending += 1,
+ Some(existing) if existing == &event.content_hash => {}
+ _ => pending_events.push(event.clone()),
}
}
- Ok(RadrootsReplicaSyncStatus {
+ Ok(RadrootsReplicaPendingPublishBatch {
expected_count: expected.len(),
- pending_count: pending,
+ pending_count: pending_events.len(),
+ pending_events,
})
}
+fn draft_content_hash(
+ event: &RadrootsReplicaEventDraft,
+) -> Result<String, RadrootsReplicaEventsError> {
+ #[cfg(test)]
+ {
+ event_content_hash(&event.content, &event.tags)
+ }
+ #[cfg(not(test))]
+ {
+ Ok(event_content_hash(&event.content, &event.tags))
+ }
+}
+
#[cfg(test)]
mod tests {
- use super::radroots_replica_sync_status;
+ use super::{radroots_replica_pending_publish_batch, radroots_replica_sync_status};
use crate::emit::radroots_replica_sync_all_with_options;
use crate::event_state::{
event_content_hash, event_content_hash_fail_next, event_state_key, tag_value,
@@ -143,6 +190,64 @@ mod tests {
}
#[test]
+ fn pending_publish_batch_lists_only_missing_or_changed_expected_events() {
+ let exec = SqliteExecutor::open_memory().expect("db");
+ migrations::run_all_up(&exec).expect("migrations");
+
+ let farm_row = farm::create(
+ &exec,
+ &IFarmFields {
+ d_tag: "AAAAAAAAAAAAAAAAAAAAAA".to_string(),
+ pubkey: "a".repeat(64),
+ name: "farm".to_string(),
+ about: None,
+ website: None,
+ picture: None,
+ banner: None,
+ location_primary: None,
+ location_city: None,
+ location_region: None,
+ location_country: None,
+ },
+ )
+ .expect("farm")
+ .result;
+
+ let selector = RadrootsReplicaFarmSelector {
+ id: Some(farm_row.id.clone()),
+ d_tag: None,
+ pubkey: None,
+ };
+ let bundle =
+ radroots_replica_sync_all_with_options(&exec, &selector, None).expect("bundle");
+ let first = bundle.events.first().expect("event");
+ let d_tag = tag_value(&first.tags, "d").unwrap_or("");
+ let key = event_state_key(first.kind, &first.author, d_tag);
+ let content_hash = event_content_hash(&first.content, &first.tags).expect("hash");
+ let fields = INostrEventStateFields {
+ key: key.clone(),
+ kind: first.kind,
+ pubkey: first.author.clone(),
+ d_tag: d_tag.to_string(),
+ last_event_id: format!("{:064x}", 1u64),
+ last_created_at: 1,
+ content_hash,
+ };
+ let _ = nostr_event_state::create(&exec, &fields).expect("state");
+
+ let batch = radroots_replica_pending_publish_batch(&exec).expect("batch");
+
+ assert_eq!(batch.expected_count, bundle.events.len());
+ assert_eq!(batch.pending_count, bundle.events.len().saturating_sub(1));
+ assert!(
+ batch
+ .pending_events
+ .iter()
+ .all(|event| event.key != key && event.content_hash.len() == 64)
+ );
+ }
+
+ #[test]
fn sync_status_reports_farm_query_errors() {
let exec = SqliteExecutor::open_memory().expect("db");
let err = radroots_replica_sync_status(&exec).expect_err("farm query error");