lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

ingest.rs (151924B)


      1 #[cfg(not(feature = "std"))]
      2 use alloc::format;
      3 #[cfg(not(feature = "std"))]
      4 use alloc::{
      5     string::{String, ToString},
      6     vec::Vec,
      7 };
      8 
      9 #[cfg(feature = "std")]
     10 use base64::Engine;
     11 #[cfg(feature = "std")]
     12 use base64::engine::general_purpose::URL_SAFE_NO_PAD;
     13 
     14 use radroots_core::RadrootsCoreDecimal;
     15 use radroots_events::RadrootsNostrEvent;
     16 use radroots_events::event_head::{
     17     RadrootsCurrentEventHead, RadrootsEventHeadCandidateResult, RadrootsEventHeadCoordinate,
     18     RadrootsEventHeadDecision as ProtocolEventHeadDecision, event_head_candidate_for_event,
     19     select_event_head,
     20 };
     21 use radroots_events::ids::RadrootsEventId;
     22 use radroots_events::kinds::{
     23     KIND_FARM, KIND_LISTING, KIND_PLOT, KIND_PROFILE, is_nip51_list_set_kind,
     24 };
     25 use radroots_events::listing::{
     26     RadrootsListing, RadrootsListingAvailability, RadrootsListingBin, RadrootsListingStatus,
     27 };
     28 use radroots_events_codec::farm::decode as farm_decode;
     29 use radroots_events_codec::list_set::decode as list_set_decode;
     30 use radroots_events_codec::listing::decode as listing_decode;
     31 use radroots_events_codec::plot::decode as plot_decode;
     32 use radroots_events_codec::profile::decode as profile_decode;
     33 use radroots_replica_db::{
     34     farm, farm_gcs_location, farm_member, farm_member_claim, farm_tag, gcs_location,
     35     nostr_event_head, nostr_profile, plot, plot_gcs_location, plot_tag, trade_product,
     36 };
     37 use radroots_replica_db_schema::farm::{
     38     FarmQueryBindValues, IFarmFields, IFarmFieldsFilter, IFarmFieldsPartial, IFarmFindMany,
     39     IFarmUpdate,
     40 };
     41 use radroots_replica_db_schema::farm_gcs_location::{
     42     FarmGcsLocationQueryBindValues, IFarmGcsLocationDelete, IFarmGcsLocationFields,
     43     IFarmGcsLocationFieldsFilter, IFarmGcsLocationFindMany, IFarmGcsLocationFindOneArgs,
     44 };
     45 use radroots_replica_db_schema::farm_member::{
     46     FarmMemberQueryBindValues, IFarmMemberDelete, IFarmMemberFields, IFarmMemberFieldsFilter,
     47     IFarmMemberFindMany, IFarmMemberFindOneArgs,
     48 };
     49 use radroots_replica_db_schema::farm_member_claim::{
     50     FarmMemberClaimQueryBindValues, IFarmMemberClaimDelete, IFarmMemberClaimFields,
     51     IFarmMemberClaimFieldsFilter, IFarmMemberClaimFindMany, IFarmMemberClaimFindOneArgs,
     52 };
     53 use radroots_replica_db_schema::farm_tag::{
     54     FarmTagQueryBindValues, IFarmTagDelete, IFarmTagFields, IFarmTagFieldsFilter, IFarmTagFindMany,
     55     IFarmTagFindOneArgs,
     56 };
     57 use radroots_replica_db_schema::gcs_location::IGcsLocationFields;
     58 use radroots_replica_db_schema::nostr_event_head::{
     59     INostrEventHeadFields, INostrEventHeadFieldsPartial, INostrEventHeadFindOne,
     60     INostrEventHeadFindOneArgs, INostrEventHeadUpdate, NostrEventHead,
     61     NostrEventHeadQueryBindValues,
     62 };
     63 use radroots_replica_db_schema::nostr_profile::{
     64     INostrProfileFields, INostrProfileFieldsPartial, INostrProfileFindOne,
     65     INostrProfileFindOneArgs, INostrProfileUpdate, NostrProfileQueryBindValues,
     66 };
     67 use radroots_replica_db_schema::plot::{
     68     IPlotFields, IPlotFieldsFilter, IPlotFieldsPartial, IPlotFindMany, IPlotUpdate,
     69     PlotQueryBindValues,
     70 };
     71 use radroots_replica_db_schema::plot_gcs_location::{
     72     IPlotGcsLocationDelete, IPlotGcsLocationFields, IPlotGcsLocationFieldsFilter,
     73     IPlotGcsLocationFindMany, IPlotGcsLocationFindOneArgs, PlotGcsLocationQueryBindValues,
     74 };
     75 use radroots_replica_db_schema::plot_tag::{
     76     IPlotTagDelete, IPlotTagFields, IPlotTagFieldsFilter, IPlotTagFindMany, IPlotTagFindOneArgs,
     77     PlotTagQueryBindValues,
     78 };
     79 use radroots_replica_db_schema::trade_product::{
     80     ITradeProductFields, ITradeProductFieldsFilter, ITradeProductFieldsPartial,
     81     ITradeProductFindMany, ITradeProductFindOne, ITradeProductFindOneArgs, ITradeProductUpdate,
     82     TradeProductQueryBindValues,
     83 };
     84 use radroots_sql_core::SqlExecutor;
     85 use radroots_sql_core::error::SqlError;
     86 use serde_json::{Value, json};
     87 
     88 use crate::error::RadrootsReplicaEventsError;
     89 use crate::event_head::{event_content_hash, event_head_key};
// Role label strings. Their consumers are outside this excerpt.
// NOTE(review): presumably `ROLE_PRIMARY` labels a primary location link and the
// remaining three label farm membership roles — confirm against the call sites.
const ROLE_PRIMARY: &str = "primary";
const ROLE_MEMBER: &str = "member";
const ROLE_OWNER: &str = "owner";
const ROLE_WORKER: &str = "worker";
     94 
     95 #[cfg(test)]
     96 pub(crate) mod failpoints {
     97     use std::cell::Cell;
     98 
     99     thread_local! {
    100         static FORCE_GCS_POINT_SERIALIZE_ERROR: Cell<bool> = const { Cell::new(false) };
    101         static FORCE_GCS_POLYGON_SERIALIZE_ERROR: Cell<bool> = const { Cell::new(false) };
    102     }
    103 
    104     pub(crate) fn set_gcs_point_serialize_error() {
    105         FORCE_GCS_POINT_SERIALIZE_ERROR.with(|flag| flag.set(true));
    106     }
    107 
    108     pub(crate) fn take_gcs_point_serialize_error() -> bool {
    109         FORCE_GCS_POINT_SERIALIZE_ERROR.with(|flag| {
    110             let value = flag.get();
    111             flag.set(false);
    112             value
    113         })
    114     }
    115 
    116     pub(crate) fn set_gcs_polygon_serialize_error() {
    117         FORCE_GCS_POLYGON_SERIALIZE_ERROR.with(|flag| flag.set(true));
    118     }
    119 
    120     pub(crate) fn take_gcs_polygon_serialize_error() -> bool {
    121         FORCE_GCS_POLYGON_SERIALIZE_ERROR.with(|flag| {
    122             let value = flag.get();
    123             flag.set(false);
    124             value
    125         })
    126     }
    127 }
    128 
/// Result of attempting to ingest a single event into the replica.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RadrootsReplicaIngestOutcome {
    /// The event's data was written and its event head was upserted.
    Applied,
    /// Nothing was written: the event-head decision declined the event, or the
    /// list-set kind is not one this module handles.
    Skipped,
}
    134 
/// Source of newly generated `d_tag` identifiers; the ingest functions pass it
/// through to the farm and plot location upserts.
pub trait RadrootsReplicaIdFactory {
    /// Returns a new `d_tag` string.
    fn new_d_tag(&self) -> String;
}
    138 
    139 #[cfg(feature = "std")]
    140 pub struct RadrootsReplicaDefaultIdFactory;
    141 
    142 #[cfg(feature = "std")]
    143 impl RadrootsReplicaIdFactory for RadrootsReplicaDefaultIdFactory {
    144     fn new_d_tag(&self) -> String {
    145         let uuid = uuid::Uuid::now_v7();
    146         let bytes = uuid.as_bytes();
    147         URL_SAFE_NO_PAD.encode(bytes)
    148     }
    149 }
    150 
    151 #[cfg(feature = "std")]
    152 pub fn radroots_replica_ingest_event(
    153     exec: &dyn SqlExecutor,
    154     event: &RadrootsNostrEvent,
    155 ) -> Result<RadrootsReplicaIngestOutcome, RadrootsReplicaEventsError> {
    156     radroots_replica_ingest_event_with_factory(exec, event, &RadrootsReplicaDefaultIdFactory)
    157 }
    158 
    159 pub fn radroots_replica_ingest_event_with_factory(
    160     exec: &dyn SqlExecutor,
    161     event: &RadrootsNostrEvent,
    162     factory: &dyn RadrootsReplicaIdFactory,
    163 ) -> Result<RadrootsReplicaIngestOutcome, RadrootsReplicaEventsError> {
    164     if let Err(err) = exec.begin() {
    165         return Err(RadrootsReplicaEventsError::from(
    166             radroots_types::types::IError::from(err),
    167         ));
    168     }
    169 
    170     match ingest_event_inner(exec, event, factory) {
    171         Ok(outcome) => {
    172             if let Err(err) = exec.commit() {
    173                 return Err(RadrootsReplicaEventsError::from(
    174                     radroots_types::types::IError::from(err),
    175                 ));
    176             }
    177             Ok(outcome)
    178         }
    179         Err(err) => {
    180             let _ = exec.rollback();
    181             Err(err)
    182         }
    183     }
    184 }
    185 
    186 fn ingest_event_inner(
    187     exec: &dyn SqlExecutor,
    188     event: &RadrootsNostrEvent,
    189     factory: &dyn RadrootsReplicaIdFactory,
    190 ) -> Result<RadrootsReplicaIngestOutcome, RadrootsReplicaEventsError> {
    191     match event.kind {
    192         KIND_PROFILE => ingest_profile_event(exec, event),
    193         KIND_FARM => ingest_farm_event(exec, event, factory),
    194         KIND_PLOT => ingest_plot_event(exec, event, factory),
    195         KIND_LISTING => ingest_listing_event(exec, event),
    196         kind if is_nip51_list_set_kind(kind) => ingest_list_set_event(exec, event),
    197         _ => Err(RadrootsReplicaEventsError::InvalidData(format!(
    198             "unsupported kind {}",
    199             event.kind
    200         ))),
    201     }
    202 }
    203 
    204 fn ingest_profile_event(
    205     exec: &dyn SqlExecutor,
    206     event: &RadrootsNostrEvent,
    207 ) -> Result<RadrootsReplicaIngestOutcome, RadrootsReplicaEventsError> {
    208     let data_result = profile_decode::data_from_event(
    209         event.id.clone(),
    210         event.author.clone(),
    211         event.created_at,
    212         event.kind,
    213         event.content.clone(),
    214         event.tags.clone(),
    215     );
    216     let data = data_result?;
    217     let profile_type = match data.data.profile_type {
    218         Some(profile_type) => profile_type,
    219         None => {
    220             return Err(RadrootsReplicaEventsError::InvalidData(
    221                 "profile_type required".to_string(),
    222             ));
    223         }
    224     };
    225 
    226     let decision = event_head_decision(exec, event)?;
    227     if !decision.apply {
    228         return Ok(RadrootsReplicaIngestOutcome::Skipped);
    229     }
    230 
    231     let profile_type = match profile_type {
    232         radroots_events::profile::RadrootsProfileType::Individual => "individual",
    233         radroots_events::profile::RadrootsProfileType::Farm => "farm",
    234         radroots_events::profile::RadrootsProfileType::Coop => "coop",
    235         radroots_events::profile::RadrootsProfileType::Any => "any",
    236         radroots_events::profile::RadrootsProfileType::Radrootsd => "radrootsd",
    237     };
    238 
    239     let existing_result = nostr_profile::find_one(
    240         exec,
    241         &INostrProfileFindOne::On(INostrProfileFindOneArgs {
    242             on: NostrProfileQueryBindValues::PublicKey {
    243                 public_key: data.author.clone(),
    244             },
    245         }),
    246     );
    247     let existing = existing_result?.result;
    248 
    249     match existing {
    250         Some(profile) => {
    251             let fields = INostrProfileFieldsPartial {
    252                 public_key: None,
    253                 profile_type: Some(Value::from(profile_type)),
    254                 name: Some(Value::from(data.data.profile.name)),
    255                 display_name: to_value_opt(data.data.profile.display_name),
    256                 about: to_value_opt(data.data.profile.about),
    257                 website: to_value_opt(data.data.profile.website),
    258                 picture: to_value_opt(data.data.profile.picture),
    259                 banner: to_value_opt(data.data.profile.banner),
    260                 nip05: to_value_opt(data.data.profile.nip05),
    261                 lud06: to_value_opt(data.data.profile.lud06),
    262                 lud16: to_value_opt(data.data.profile.lud16),
    263             };
    264             let update_result = nostr_profile::update(
    265                 exec,
    266                 &INostrProfileUpdate {
    267                     on: NostrProfileQueryBindValues::Id { id: profile.id },
    268                     fields,
    269                 },
    270             );
    271             let _updated = update_result?;
    272         }
    273         None => {
    274             let fields = INostrProfileFields {
    275                 public_key: data.author.clone(),
    276                 profile_type: profile_type.to_string(),
    277                 name: data.data.profile.name,
    278                 display_name: data.data.profile.display_name,
    279                 about: data.data.profile.about,
    280                 website: data.data.profile.website,
    281                 picture: data.data.profile.picture,
    282                 banner: data.data.profile.banner,
    283                 nip05: data.data.profile.nip05,
    284                 lud06: data.data.profile.lud06,
    285                 lud16: data.data.profile.lud16,
    286             };
    287             let _ = nostr_profile::create(exec, &fields)?;
    288         }
    289     }
    290 
    291     upsert_event_head(exec, &decision)?;
    292     Ok(RadrootsReplicaIngestOutcome::Applied)
    293 }
    294 
    295 fn ingest_farm_event(
    296     exec: &dyn SqlExecutor,
    297     event: &RadrootsNostrEvent,
    298     factory: &dyn RadrootsReplicaIdFactory,
    299 ) -> Result<RadrootsReplicaIngestOutcome, RadrootsReplicaEventsError> {
    300     let farm = farm_decode::farm_from_event(event.kind, &event.tags, &event.content)?;
    301     let decision = event_head_decision(exec, event)?;
    302     if !decision.apply {
    303         return Ok(RadrootsReplicaIngestOutcome::Skipped);
    304     }
    305 
    306     let filter = IFarmFieldsFilter {
    307         id: None,
    308         created_at: None,
    309         updated_at: None,
    310         d_tag: Some(farm.d_tag.clone()),
    311         pubkey: Some(event.author.clone()),
    312         name: None,
    313         about: None,
    314         website: None,
    315         picture: None,
    316         banner: None,
    317         location_primary: None,
    318         location_city: None,
    319         location_region: None,
    320         location_country: None,
    321     };
    322     let existing_result = farm::find_many(
    323         exec,
    324         &IFarmFindMany {
    325             filter: Some(filter),
    326         },
    327     );
    328     let existing = existing_result?;
    329     let location = farm.location.clone();
    330     let (location_primary, location_city, location_region, location_country) =
    331         unpack_farm_location_strings(location.as_ref());
    332     let farm_id = if let Some(row) = existing.results.first() {
    333         let fields = IFarmFieldsPartial {
    334             d_tag: Some(Value::from(farm.d_tag.clone())),
    335             pubkey: Some(Value::from(event.author.clone())),
    336             name: Some(Value::from(farm.name.clone())),
    337             about: to_value_opt(farm.about.clone()),
    338             website: to_value_opt(farm.website.clone()),
    339             picture: to_value_opt(farm.picture.clone()),
    340             banner: to_value_opt(farm.banner.clone()),
    341             location_primary: to_value_opt(location_primary),
    342             location_city: to_value_opt(location_city),
    343             location_region: to_value_opt(location_region),
    344             location_country: to_value_opt(location_country),
    345         };
    346         let update_result = farm::update(
    347             exec,
    348             &IFarmUpdate {
    349                 on: FarmQueryBindValues::Id { id: row.id.clone() },
    350                 fields,
    351             },
    352         );
    353         let _updated = update_result?;
    354         row.id.clone()
    355     } else {
    356         let fields = IFarmFields {
    357             d_tag: farm.d_tag.clone(),
    358             pubkey: event.author.clone(),
    359             name: farm.name.clone(),
    360             about: farm.about.clone(),
    361             website: farm.website.clone(),
    362             picture: farm.picture.clone(),
    363             banner: farm.banner.clone(),
    364             location_primary,
    365             location_city,
    366             location_region,
    367             location_country,
    368         };
    369         farm::create(exec, &fields)?.result.id
    370     };
    371 
    372     upsert_farm_tags(exec, &farm_id, farm.tags)?;
    373     upsert_farm_location(exec, &farm_id, location, factory)?;
    374 
    375     upsert_event_head(exec, &decision)?;
    376     Ok(RadrootsReplicaIngestOutcome::Applied)
    377 }
    378 
    379 fn ingest_plot_event(
    380     exec: &dyn SqlExecutor,
    381     event: &RadrootsNostrEvent,
    382     factory: &dyn RadrootsReplicaIdFactory,
    383 ) -> Result<RadrootsReplicaIngestOutcome, RadrootsReplicaEventsError> {
    384     let plot = plot_decode::plot_from_event(event.kind, &event.tags, &event.content)?;
    385     let decision = event_head_decision(exec, event)?;
    386     if !decision.apply {
    387         return Ok(RadrootsReplicaIngestOutcome::Skipped);
    388     }
    389 
    390     let farm = find_farm_by_ref(exec, &plot.farm.pubkey, &plot.farm.d_tag)?;
    391     let filter = IPlotFieldsFilter {
    392         id: None,
    393         created_at: None,
    394         updated_at: None,
    395         d_tag: Some(plot.d_tag.clone()),
    396         farm_id: Some(farm.id.clone()),
    397         name: None,
    398         about: None,
    399         location_primary: None,
    400         location_city: None,
    401         location_region: None,
    402         location_country: None,
    403     };
    404     let existing_result = plot::find_many(
    405         exec,
    406         &IPlotFindMany {
    407             filter: Some(filter),
    408         },
    409     );
    410     let existing = existing_result?;
    411     let location = plot.location.clone();
    412     let (location_primary, location_city, location_region, location_country) =
    413         unpack_plot_location_strings(location.as_ref());
    414     let plot_id = if let Some(row) = existing.results.first() {
    415         let fields = IPlotFieldsPartial {
    416             d_tag: Some(Value::from(plot.d_tag.clone())),
    417             farm_id: Some(Value::from(farm.id.clone())),
    418             name: Some(Value::from(plot.name.clone())),
    419             about: to_value_opt(plot.about.clone()),
    420             location_primary: to_value_opt(location_primary),
    421             location_city: to_value_opt(location_city),
    422             location_region: to_value_opt(location_region),
    423             location_country: to_value_opt(location_country),
    424         };
    425         let update_result = plot::update(
    426             exec,
    427             &IPlotUpdate {
    428                 on: PlotQueryBindValues::Id { id: row.id.clone() },
    429                 fields,
    430             },
    431         );
    432         let _updated = update_result?;
    433         row.id.clone()
    434     } else {
    435         let fields = IPlotFields {
    436             d_tag: plot.d_tag.clone(),
    437             farm_id: farm.id.clone(),
    438             name: plot.name.clone(),
    439             about: plot.about.clone(),
    440             location_primary,
    441             location_city,
    442             location_region,
    443             location_country,
    444         };
    445         plot::create(exec, &fields)?.result.id
    446     };
    447 
    448     upsert_plot_tags(exec, &plot_id, plot.tags)?;
    449     upsert_plot_location(exec, &plot_id, location, factory)?;
    450 
    451     upsert_event_head(exec, &decision)?;
    452     Ok(RadrootsReplicaIngestOutcome::Applied)
    453 }
    454 
    455 fn ingest_listing_event(
    456     exec: &dyn SqlExecutor,
    457     event: &RadrootsNostrEvent,
    458 ) -> Result<RadrootsReplicaIngestOutcome, RadrootsReplicaEventsError> {
    459     let listing = listing_decode::listing_from_event(event.kind, &event.tags, &event.content)?;
    460     let decision = event_head_decision(exec, event)?;
    461     if !decision.apply {
    462         return Ok(RadrootsReplicaIngestOutcome::Skipped);
    463     }
    464 
    465     let listing_addr = listing_event_addr(event, &listing);
    466     if listing_is_orderable(&listing) {
    467         let fields = trade_product_fields_from_listing(&listing, &listing_addr)?;
    468         upsert_trade_product_for_listing_addr(exec, &listing_addr, fields)?;
    469     } else {
    470         delete_trade_products_for_listing_addr(exec, &listing_addr)?;
    471     }
    472 
    473     upsert_event_head(exec, &decision)?;
    474     Ok(RadrootsReplicaIngestOutcome::Applied)
    475 }
    476 
    477 fn ingest_list_set_event(
    478     exec: &dyn SqlExecutor,
    479     event: &RadrootsNostrEvent,
    480 ) -> Result<RadrootsReplicaIngestOutcome, RadrootsReplicaEventsError> {
    481     if event.kind != radroots_events::kinds::KIND_LIST_SET_GENERIC {
    482         return Ok(RadrootsReplicaIngestOutcome::Skipped);
    483     }
    484     let list_set =
    485         list_set_decode::list_set_from_tags(event.kind, event.content.clone(), &event.tags)?;
    486 
    487     let metadata_count = usize::from(list_set.title.is_some())
    488         + usize::from(list_set.description.is_some())
    489         + usize::from(list_set.image.is_some());
    490     if metadata_count != 0 {
    491         return Err(RadrootsReplicaEventsError::InvalidData(
    492             "domain:farm list sets must omit metadata".to_string(),
    493         ));
    494     }
    495     if !list_set.content.is_empty() {
    496         return Err(RadrootsReplicaEventsError::InvalidData(
    497             "domain:farm list sets must not include content".to_string(),
    498         ));
    499     }
    500 
    501     let d_tag = list_set.d_tag.clone();
    502 
    503     if d_tag == "member_of.farms" {
    504         ensure_list_set_entries_tag(&list_set, "p", "member_of.farms")?;
    505         let decision = event_head_decision(exec, event)?;
    506         if !decision.apply {
    507             return Ok(RadrootsReplicaIngestOutcome::Skipped);
    508         }
    509         upsert_member_claims(exec, &event.author, &list_set)?;
    510         upsert_event_head(exec, &decision)?;
    511         return Ok(RadrootsReplicaIngestOutcome::Applied);
    512     }
    513 
    514     if let Some((farm_d_tag, role)) = parse_farm_list_set_d_tag(&d_tag) {
    515         if role == ListSetRole::Plots {
    516             ensure_list_set_entries_tag(&list_set, "a", "farm plots")?;
    517             let decision = event_head_decision(exec, event)?;
    518             if !decision.apply {
    519                 return Ok(RadrootsReplicaIngestOutcome::Skipped);
    520             }
    521             upsert_event_head(exec, &decision)?;
    522             return Ok(RadrootsReplicaIngestOutcome::Applied);
    523         }
    524         ensure_list_set_entries_tag(&list_set, "p", "farm members")?;
    525         let decision = event_head_decision(exec, event)?;
    526         if !decision.apply {
    527             return Ok(RadrootsReplicaIngestOutcome::Skipped);
    528         }
    529         let farm = find_farm_by_ref(exec, &event.author, &farm_d_tag)?;
    530         upsert_farm_members(exec, &farm.id, role, &list_set)?;
    531         upsert_event_head(exec, &decision)?;
    532         return Ok(RadrootsReplicaIngestOutcome::Applied);
    533     }
    534 
    535     Err(RadrootsReplicaEventsError::InvalidData(
    536         "unsupported list set d_tag".to_string(),
    537     ))
    538 }
    539 
    540 fn listing_event_addr(event: &RadrootsNostrEvent, listing: &RadrootsListing) -> String {
    541     format!("{}:{}:{}", event.kind, event.author, listing.d_tag)
    542 }
    543 
    544 fn listing_is_orderable(listing: &RadrootsListing) -> bool {
    545     match listing.availability.as_ref() {
    546         Some(RadrootsListingAvailability::Status { status }) => {
    547             matches!(status, RadrootsListingStatus::Active)
    548         }
    549         Some(RadrootsListingAvailability::Window { .. }) | None => true,
    550     }
    551 }
    552 
    553 fn trade_product_fields_from_listing(
    554     listing: &RadrootsListing,
    555     listing_addr: &str,
    556 ) -> Result<ITradeProductFields, RadrootsReplicaEventsError> {
    557     let bin = primary_listing_bin(listing)?;
    558     let qty_amt = decimal_to_f64(&bin.quantity.amount, "listing primary bin quantity")?;
    559     let qty_amt_exact = bin.quantity.amount.to_string();
    560     let qty_avail = listing
    561         .inventory_available
    562         .as_ref()
    563         .map(|amount| decimal_to_i64(amount, "listing inventory"))
    564         .transpose()?;
    565     let price_source = bin
    566         .display_price
    567         .as_ref()
    568         .unwrap_or(&bin.price_per_canonical_unit.amount);
    569     let Some(price_amt) = price_source.amount.to_f64_lossy() else {
    570         return Err(RadrootsReplicaEventsError::InvalidData(
    571             "listing price amount out of range".to_string(),
    572         ));
    573     };
    574     let price_amt_exact = price_source.amount.to_string();
    575     let price_currency = price_source.currency.as_str().to_string();
    576     let price_qty_amt = if bin.display_price.is_some() {
    577         1.0
    578     } else {
    579         decimal_to_f64(
    580             &bin.price_per_canonical_unit.quantity.amount,
    581             "listing price quantity",
    582         )?
    583     };
    584     let price_qty_amt_exact = if bin.display_price.is_some() {
    585         "1".to_string()
    586     } else {
    587         bin.price_per_canonical_unit.quantity.amount.to_string()
    588     };
    589     let price_qty_unit = bin
    590         .display_price_unit
    591         .unwrap_or(bin.price_per_canonical_unit.quantity.unit)
    592         .to_string();
    593 
    594     Ok(ITradeProductFields {
    595         key: listing.product.key.clone(),
    596         category: listing.product.category.clone(),
    597         title: listing.product.title.clone(),
    598         summary: listing.product.summary.clone().unwrap_or_default(),
    599         process: listing.product.process.clone().unwrap_or_default(),
    600         lot: listing.product.lot.clone().unwrap_or_default(),
    601         profile: listing.product.profile.clone().unwrap_or_default(),
    602         year: listing
    603             .product
    604             .year
    605             .as_deref()
    606             .and_then(|value| value.parse::<i64>().ok())
    607             .unwrap_or_default(),
    608         qty_amt,
    609         qty_amt_exact,
    610         qty_unit: bin.quantity.unit.to_string(),
    611         qty_label: bin
    612             .display_label
    613             .clone()
    614             .or_else(|| bin.quantity.label.clone()),
    615         qty_avail,
    616         price_amt,
    617         price_amt_exact,
    618         price_currency,
    619         price_qty_amt,
    620         price_qty_amt_exact,
    621         price_qty_unit,
    622         listing_addr: Some(listing_addr.to_string()),
    623         primary_bin_id: Some(listing.primary_bin_id.to_string()),
    624         verified_primary_bin_id: Some(listing.primary_bin_id.to_string()),
    625         notes: trade_product_notes_from_listing(listing)?,
    626     })
    627 }
    628 
    629 fn trade_product_notes_from_listing(
    630     listing: &RadrootsListing,
    631 ) -> Result<Option<String>, RadrootsReplicaEventsError> {
    632     let Some(discounts) = listing
    633         .discounts
    634         .as_ref()
    635         .filter(|discounts| !discounts.is_empty())
    636     else {
    637         return Ok(None);
    638     };
    639     match serde_json::to_string(&json!({ "listing_discounts": discounts })) {
    640         Ok(notes) => Ok(Some(notes)),
    641         Err(error) => Err(RadrootsReplicaEventsError::InvalidData(format!(
    642             "listing discounts could not be serialized: {error}"
    643         ))),
    644     }
    645 }
    646 
    647 fn primary_listing_bin(
    648     listing: &RadrootsListing,
    649 ) -> Result<&RadrootsListingBin, RadrootsReplicaEventsError> {
    650     listing
    651         .bins
    652         .iter()
    653         .find(|bin| bin.bin_id == listing.primary_bin_id)
    654         .ok_or_else(|| {
    655             RadrootsReplicaEventsError::InvalidData(
    656                 "listing primary bin missing from bins".to_string(),
    657             )
    658         })
    659 }
    660 
    661 fn decimal_to_i64(
    662     value: &RadrootsCoreDecimal,
    663     field: &str,
    664 ) -> Result<i64, RadrootsReplicaEventsError> {
    665     let value = decimal_to_u64(value, field)?;
    666     match i64::try_from(value) {
    667         Ok(value) => Ok(value),
    668         Err(_) => Err(RadrootsReplicaEventsError::InvalidData(format!(
    669             "{field} exceeds i64 range"
    670         ))),
    671     }
    672 }
    673 
    674 fn decimal_to_f64(
    675     value: &RadrootsCoreDecimal,
    676     field: &str,
    677 ) -> Result<f64, RadrootsReplicaEventsError> {
    678     match value.to_f64_lossy() {
    679         Some(value) => Ok(value),
    680         None => Err(RadrootsReplicaEventsError::InvalidData(format!(
    681             "{field} exceeds f64 range"
    682         ))),
    683     }
    684 }
    685 
    686 fn decimal_to_u64(
    687     value: &RadrootsCoreDecimal,
    688     field: &str,
    689 ) -> Result<u64, RadrootsReplicaEventsError> {
    690     match value.to_u64_exact() {
    691         Some(value) => Ok(value),
    692         None => Err(RadrootsReplicaEventsError::InvalidData(format!(
    693             "{field} must be a whole number"
    694         ))),
    695     }
    696 }
    697 
/// Builds a trade product filter that constrains only `listing_addr`.
///
/// Every other filter field is left as `None`.
fn trade_product_listing_addr_filter(listing_addr: &str) -> ITradeProductFieldsFilter {
    ITradeProductFieldsFilter {
        id: None,
        created_at: None,
        updated_at: None,
        key: None,
        category: None,
        title: None,
        summary: None,
        process: None,
        lot: None,
        profile: None,
        year: None,
        qty_amt: None,
        qty_amt_exact: None,
        qty_unit: None,
        qty_label: None,
        qty_avail: None,
        price_amt: None,
        price_amt_exact: None,
        price_currency: None,
        price_qty_amt: None,
        price_qty_amt_exact: None,
        price_qty_unit: None,
        // The only active constraint.
        listing_addr: Some(listing_addr.to_string()),
        primary_bin_id: None,
        verified_primary_bin_id: None,
        notes: None,
    }
}
    728 
    729 fn upsert_trade_product_for_listing_addr(
    730     exec: &dyn SqlExecutor,
    731     listing_addr: &str,
    732     fields: ITradeProductFields,
    733 ) -> Result<(), RadrootsReplicaEventsError> {
    734     let existing = trade_product::find_many(
    735         exec,
    736         &ITradeProductFindMany {
    737             filter: Some(trade_product_listing_addr_filter(listing_addr)),
    738         },
    739     )?
    740     .results;
    741 
    742     if let Some(row) = existing.first() {
    743         let update = ITradeProductUpdate {
    744             on: TradeProductQueryBindValues::Id { id: row.id.clone() },
    745             fields: trade_product_partial_from_fields(&fields),
    746         };
    747         let _ = trade_product::update(exec, &update)?;
    748         for duplicate in existing.iter().skip(1) {
    749             delete_trade_product_by_id(exec, &duplicate.id)?;
    750         }
    751     } else {
    752         let _ = trade_product::create(exec, &fields)?;
    753     }
    754 
    755     Ok(())
    756 }
    757 
    758 fn delete_trade_products_for_listing_addr(
    759     exec: &dyn SqlExecutor,
    760     listing_addr: &str,
    761 ) -> Result<(), RadrootsReplicaEventsError> {
    762     let existing = trade_product::find_many(
    763         exec,
    764         &ITradeProductFindMany {
    765             filter: Some(trade_product_listing_addr_filter(listing_addr)),
    766         },
    767     )?
    768     .results;
    769 
    770     for row in existing {
    771         delete_trade_product_by_id(exec, &row.id)?;
    772     }
    773 
    774     Ok(())
    775 }
    776 
    777 fn delete_trade_product_by_id(
    778     exec: &dyn SqlExecutor,
    779     id: &str,
    780 ) -> Result<(), RadrootsReplicaEventsError> {
    781     let _ = trade_product::delete(
    782         exec,
    783         &ITradeProductFindOne::On(ITradeProductFindOneArgs {
    784             on: TradeProductQueryBindValues::Id { id: id.to_string() },
    785         }),
    786     )?;
    787     Ok(())
    788 }
    789 
    790 fn trade_product_partial_from_fields(fields: &ITradeProductFields) -> ITradeProductFieldsPartial {
    791     ITradeProductFieldsPartial {
    792         key: Some(Value::from(fields.key.clone())),
    793         category: Some(Value::from(fields.category.clone())),
    794         title: Some(Value::from(fields.title.clone())),
    795         summary: Some(Value::from(fields.summary.clone())),
    796         process: Some(Value::from(fields.process.clone())),
    797         lot: Some(Value::from(fields.lot.clone())),
    798         profile: Some(Value::from(fields.profile.clone())),
    799         year: Some(Value::from(fields.year)),
    800         qty_amt: Some(Value::from(fields.qty_amt)),
    801         qty_amt_exact: Some(Value::from(fields.qty_amt_exact.clone())),
    802         qty_unit: Some(Value::from(fields.qty_unit.clone())),
    803         qty_label: to_value_opt(fields.qty_label.clone()),
    804         qty_avail: fields.qty_avail.map(Value::from).or(Some(Value::Null)),
    805         price_amt: Some(Value::from(fields.price_amt)),
    806         price_amt_exact: Some(Value::from(fields.price_amt_exact.clone())),
    807         price_currency: Some(Value::from(fields.price_currency.clone())),
    808         price_qty_amt: Some(Value::from(fields.price_qty_amt)),
    809         price_qty_amt_exact: Some(Value::from(fields.price_qty_amt_exact.clone())),
    810         price_qty_unit: Some(Value::from(fields.price_qty_unit.clone())),
    811         listing_addr: to_value_opt(fields.listing_addr.clone()),
    812         primary_bin_id: to_value_opt(fields.primary_bin_id.clone()),
    813         verified_primary_bin_id: to_value_opt(fields.verified_primary_bin_id.clone()),
    814         notes: to_value_opt(fields.notes.clone()),
    815     }
    816 }
    817 
    818 pub fn radroots_replica_ingest_event_head(
    819     exec: &dyn SqlExecutor,
    820     event: &RadrootsNostrEvent,
    821 ) -> Result<RadrootsReplicaIngestOutcome, RadrootsReplicaEventsError> {
    822     let decision = event_head_decision(exec, event)?;
    823     if !decision.apply {
    824         return Ok(RadrootsReplicaIngestOutcome::Skipped);
    825     }
    826     upsert_event_head(exec, &decision)?;
    827     Ok(RadrootsReplicaIngestOutcome::Applied)
    828 }
    829 
    830 fn upsert_event_head(
    831     exec: &dyn SqlExecutor,
    832     decision: &EventHeadDecision,
    833 ) -> Result<(), RadrootsReplicaEventsError> {
    834     let existing_result = nostr_event_head::find_one(
    835         exec,
    836         &INostrEventHeadFindOne::On(INostrEventHeadFindOneArgs {
    837             on: NostrEventHeadQueryBindValues::Key {
    838                 key: decision.key.clone(),
    839             },
    840         }),
    841     );
    842     let existing = existing_result?.result;
    843 
    844     match existing {
    845         Some(state) => {
    846             let fields = INostrEventHeadFieldsPartial {
    847                 key: None,
    848                 kind: None,
    849                 pubkey: None,
    850                 d_tag: None,
    851                 last_event_id: Some(Value::from(decision.last_event_id.clone())),
    852                 last_created_at: Some(Value::from(decision.last_created_at)),
    853                 content_hash: Some(Value::from(decision.content_hash.clone())),
    854             };
    855             let update_result = nostr_event_head::update(
    856                 exec,
    857                 &INostrEventHeadUpdate {
    858                     on: NostrEventHeadQueryBindValues::Id { id: state.id },
    859                     fields,
    860                 },
    861             );
    862             let _updated = update_result?;
    863         }
    864         None => {
    865             let fields = INostrEventHeadFields {
    866                 key: decision.key.clone(),
    867                 kind: decision.kind,
    868                 pubkey: decision.pubkey.clone(),
    869                 d_tag: decision.d_tag.clone(),
    870                 last_event_id: decision.last_event_id.clone(),
    871                 last_created_at: decision.last_created_at,
    872                 content_hash: decision.content_hash.clone(),
    873             };
    874             let _ = nostr_event_head::create(exec, &fields)?;
    875         }
    876     }
    877 
    878     Ok(())
    879 }
    880 
    881 fn event_head_decision(
    882     exec: &dyn SqlExecutor,
    883     event: &RadrootsNostrEvent,
    884 ) -> Result<EventHeadDecision, RadrootsReplicaEventsError> {
    885     let candidate_result = match event_head_candidate_for_event(event) {
    886         Ok(candidate) => candidate,
    887         Err(err) => {
    888             return Err(RadrootsReplicaEventsError::InvalidData(format!(
    889                 "event head contract mismatch: {err:?}"
    890             )));
    891         }
    892     };
    893     let candidate = match candidate_result {
    894         RadrootsEventHeadCandidateResult::Candidate(candidate) => candidate,
    895         RadrootsEventHeadCandidateResult::NotHeadSelected => {
    896             return Err(RadrootsReplicaEventsError::InvalidData(
    897                 "event is not head-selected".to_string(),
    898             ));
    899         }
    900         RadrootsEventHeadCandidateResult::NotPersisted => {
    901             return Ok(EventHeadDecision {
    902                 apply: false,
    903                 key: String::new(),
    904                 kind: event.kind,
    905                 pubkey: event.author.clone(),
    906                 d_tag: String::new(),
    907                 last_event_id: event.id.clone(),
    908                 last_created_at: event.created_at,
    909                 content_hash: String::new(),
    910             });
    911         }
    912         RadrootsEventHeadCandidateResult::Malformed(err) => {
    913             return Err(RadrootsReplicaEventsError::InvalidData(format!(
    914                 "malformed event head: {err:?}"
    915             )));
    916         }
    917     };
    918     let (key, kind, pubkey, d_tag) = event_head_coordinate_fields(&candidate.coordinate);
    919     #[cfg(test)]
    920     let content_hash = event_content_hash(&event.content, &event.tags)?;
    921     #[cfg(not(test))]
    922     let content_hash = event_content_hash(&event.content, &event.tags);
    923     let existing_result = nostr_event_head::find_one(
    924         exec,
    925         &INostrEventHeadFindOne::On(INostrEventHeadFindOneArgs {
    926             on: NostrEventHeadQueryBindValues::Key { key: key.clone() },
    927         }),
    928     );
    929     let existing = existing_result?.result;
    930     let current = existing
    931         .as_ref()
    932         .map(|state| current_event_head_from_row(state, &candidate.coordinate))
    933         .transpose()?;
    934 
    935     let decision = select_event_head(candidate, current.as_ref());
    936     let apply = match decision {
    937         ProtocolEventHeadDecision::Applied(_) => true,
    938         ProtocolEventHeadDecision::SkippedDuplicate
    939         | ProtocolEventHeadDecision::SkippedOlder
    940         | ProtocolEventHeadDecision::SkippedSameTimestampHigherEventId => false,
    941         ProtocolEventHeadDecision::CoordinateMismatch => {
    942             return Err(RadrootsReplicaEventsError::InvalidData(
    943                 "event head coordinate mismatch".to_string(),
    944             ));
    945         }
    946     };
    947 
    948     Ok(EventHeadDecision {
    949         apply,
    950         key,
    951         kind,
    952         pubkey,
    953         d_tag,
    954         last_event_id: event.id.clone(),
    955         last_created_at: event.created_at,
    956         content_hash,
    957     })
    958 }
    959 
    960 fn current_event_head_from_row(
    961     row: &NostrEventHead,
    962     coordinate: &RadrootsEventHeadCoordinate,
    963 ) -> Result<RadrootsCurrentEventHead, RadrootsReplicaEventsError> {
    964     let event_id = RadrootsEventId::parse(&row.last_event_id).map_err(|err| {
    965         RadrootsReplicaEventsError::InvalidData(format!(
    966             "nostr event head last_event_id invalid: {err}"
    967         ))
    968     })?;
    969     Ok(RadrootsCurrentEventHead {
    970         coordinate: coordinate.clone(),
    971         event_id,
    972         created_at: row.last_created_at,
    973     })
    974 }
    975 
    976 fn event_head_coordinate_fields(
    977     coordinate: &RadrootsEventHeadCoordinate,
    978 ) -> (String, u32, String, String) {
    979     match coordinate {
    980         RadrootsEventHeadCoordinate::Replaceable { kind, pubkey } => {
    981             let pubkey = pubkey.to_string();
    982             (
    983                 event_head_key(*kind, &pubkey, ""),
    984                 *kind,
    985                 pubkey,
    986                 String::new(),
    987             )
    988         }
    989         RadrootsEventHeadCoordinate::Addressable {
    990             kind,
    991             pubkey,
    992             d_tag,
    993         } => {
    994             let pubkey = pubkey.to_string();
    995             let d_tag = d_tag.to_string();
    996             (event_head_key(*kind, &pubkey, &d_tag), *kind, pubkey, d_tag)
    997         }
    998     }
    999 }
   1000 
   1001 fn find_farm_by_ref(
   1002     exec: &dyn SqlExecutor,
   1003     pubkey: &str,
   1004     d_tag: &str,
   1005 ) -> Result<radroots_replica_db_schema::farm::Farm, RadrootsReplicaEventsError> {
   1006     let filter = IFarmFieldsFilter {
   1007         id: None,
   1008         created_at: None,
   1009         updated_at: None,
   1010         d_tag: Some(d_tag.to_string()),
   1011         pubkey: Some(pubkey.to_string()),
   1012         name: None,
   1013         about: None,
   1014         website: None,
   1015         picture: None,
   1016         banner: None,
   1017         location_primary: None,
   1018         location_city: None,
   1019         location_region: None,
   1020         location_country: None,
   1021     };
   1022     let result_query = farm::find_many(
   1023         exec,
   1024         &IFarmFindMany {
   1025             filter: Some(filter),
   1026         },
   1027     );
   1028     let result = result_query?;
   1029     match result.results.into_iter().next() {
   1030         Some(farm) => Ok(farm),
   1031         None => Err(RadrootsReplicaEventsError::InvalidData(
   1032             "farm not found".to_string(),
   1033         )),
   1034     }
   1035 }
   1036 
   1037 fn upsert_farm_tags(
   1038     exec: &dyn SqlExecutor,
   1039     farm_id: &str,
   1040     tags: Option<Vec<String>>,
   1041 ) -> Result<(), RadrootsReplicaEventsError> {
   1042     let existing_query = farm_tag::find_many(
   1043         exec,
   1044         &IFarmTagFindMany {
   1045             filter: Some(IFarmTagFieldsFilter {
   1046                 id: None,
   1047                 created_at: None,
   1048                 updated_at: None,
   1049                 farm_id: Some(farm_id.to_string()),
   1050                 tag: None,
   1051             }),
   1052         },
   1053     );
   1054     let existing = existing_query?;
   1055     for row in existing.results {
   1056         handle_delete_result(farm_tag::delete(
   1057             exec,
   1058             &IFarmTagDelete::On(IFarmTagFindOneArgs {
   1059                 on: FarmTagQueryBindValues::Id { id: row.id },
   1060             }),
   1061         ))?;
   1062     }
   1063 
   1064     let mut tags = tags.unwrap_or_default();
   1065     tags.sort();
   1066     tags.dedup();
   1067     for tag in tags {
   1068         if tag.trim().is_empty() {
   1069             continue;
   1070         }
   1071         let fields = IFarmTagFields {
   1072             farm_id: farm_id.to_string(),
   1073             tag,
   1074         };
   1075         let _ = farm_tag::create(exec, &fields)?;
   1076     }
   1077     Ok(())
   1078 }
   1079 
   1080 fn upsert_plot_tags(
   1081     exec: &dyn SqlExecutor,
   1082     plot_id: &str,
   1083     tags: Option<Vec<String>>,
   1084 ) -> Result<(), RadrootsReplicaEventsError> {
   1085     let existing_query = plot_tag::find_many(
   1086         exec,
   1087         &IPlotTagFindMany {
   1088             filter: Some(IPlotTagFieldsFilter {
   1089                 id: None,
   1090                 created_at: None,
   1091                 updated_at: None,
   1092                 plot_id: Some(plot_id.to_string()),
   1093                 tag: None,
   1094             }),
   1095         },
   1096     );
   1097     let existing = existing_query?;
   1098     for row in existing.results {
   1099         handle_delete_result(plot_tag::delete(
   1100             exec,
   1101             &IPlotTagDelete::On(IPlotTagFindOneArgs {
   1102                 on: PlotTagQueryBindValues::Id { id: row.id },
   1103             }),
   1104         ))?;
   1105     }
   1106 
   1107     let mut tags = tags.unwrap_or_default();
   1108     tags.sort();
   1109     tags.dedup();
   1110     for tag in tags {
   1111         if tag.trim().is_empty() {
   1112             continue;
   1113         }
   1114         let fields = IPlotTagFields {
   1115             plot_id: plot_id.to_string(),
   1116             tag,
   1117         };
   1118         let _ = plot_tag::create(exec, &fields)?;
   1119     }
   1120     Ok(())
   1121 }
   1122 
   1123 fn upsert_farm_location(
   1124     exec: &dyn SqlExecutor,
   1125     farm_id: &str,
   1126     location: Option<radroots_events::farm::RadrootsFarmLocation>,
   1127     factory: &dyn RadrootsReplicaIdFactory,
   1128 ) -> Result<(), RadrootsReplicaEventsError> {
   1129     clear_farm_locations(exec, farm_id)?;
   1130     if let Some(location) = location
   1131         && let Some(gcs) = location.gcs
   1132     {
   1133         let gcs_id = create_gcs_location(exec, gcs, factory)?;
   1134         let fields = IFarmGcsLocationFields {
   1135             farm_id: farm_id.to_string(),
   1136             gcs_location_id: gcs_id,
   1137             role: ROLE_PRIMARY.to_string(),
   1138         };
   1139         let _ = farm_gcs_location::create(exec, &fields)?;
   1140     }
   1141     Ok(())
   1142 }
   1143 
   1144 fn upsert_plot_location(
   1145     exec: &dyn SqlExecutor,
   1146     plot_id: &str,
   1147     location: Option<radroots_events::plot::RadrootsPlotLocation>,
   1148     factory: &dyn RadrootsReplicaIdFactory,
   1149 ) -> Result<(), RadrootsReplicaEventsError> {
   1150     clear_plot_locations(exec, plot_id)?;
   1151     if let Some(location) = location {
   1152         let gcs_id = create_gcs_location(exec, location.gcs, factory)?;
   1153         let fields = IPlotGcsLocationFields {
   1154             plot_id: plot_id.to_string(),
   1155             gcs_location_id: gcs_id,
   1156             role: ROLE_PRIMARY.to_string(),
   1157         };
   1158         let _ = plot_gcs_location::create(exec, &fields)?;
   1159     }
   1160     Ok(())
   1161 }
   1162 
   1163 fn clear_farm_locations(
   1164     exec: &dyn SqlExecutor,
   1165     farm_id: &str,
   1166 ) -> Result<(), RadrootsReplicaEventsError> {
   1167     let existing_query = farm_gcs_location::find_many(
   1168         exec,
   1169         &IFarmGcsLocationFindMany {
   1170             filter: Some(IFarmGcsLocationFieldsFilter {
   1171                 id: None,
   1172                 created_at: None,
   1173                 updated_at: None,
   1174                 farm_id: Some(farm_id.to_string()),
   1175                 gcs_location_id: None,
   1176                 role: None,
   1177             }),
   1178         },
   1179     );
   1180     let existing = existing_query?;
   1181     for row in existing.results {
   1182         handle_delete_result(farm_gcs_location::delete(
   1183             exec,
   1184             &IFarmGcsLocationDelete::On(IFarmGcsLocationFindOneArgs {
   1185                 on: FarmGcsLocationQueryBindValues::Id { id: row.id },
   1186             }),
   1187         ))?;
   1188     }
   1189     Ok(())
   1190 }
   1191 
   1192 fn clear_plot_locations(
   1193     exec: &dyn SqlExecutor,
   1194     plot_id: &str,
   1195 ) -> Result<(), RadrootsReplicaEventsError> {
   1196     let existing_query = plot_gcs_location::find_many(
   1197         exec,
   1198         &IPlotGcsLocationFindMany {
   1199             filter: Some(IPlotGcsLocationFieldsFilter {
   1200                 id: None,
   1201                 created_at: None,
   1202                 updated_at: None,
   1203                 plot_id: Some(plot_id.to_string()),
   1204                 gcs_location_id: None,
   1205                 role: None,
   1206             }),
   1207         },
   1208     );
   1209     let existing = existing_query?;
   1210     for row in existing.results {
   1211         handle_delete_result(plot_gcs_location::delete(
   1212             exec,
   1213             &IPlotGcsLocationDelete::On(IPlotGcsLocationFindOneArgs {
   1214                 on: PlotGcsLocationQueryBindValues::Id { id: row.id },
   1215             }),
   1216         ))?;
   1217     }
   1218     Ok(())
   1219 }
   1220 
/// Inserts a new GCS location row built from `gcs` and returns the id of the created row.
///
/// The row's `d_tag` is obtained from `factory`; the point and polygon are
/// stored as serialized JSON strings, and the remaining fields are moved over
/// from `gcs` unchanged.
///
/// # Errors
///
/// Propagates the error from `gcs_location::create`. In test builds a
/// serialization failure is additionally reported as `InvalidData`.
fn create_gcs_location(
    exec: &dyn SqlExecutor,
    gcs: radroots_events::farm::RadrootsGcsLocation,
    factory: &dyn RadrootsReplicaIdFactory,
) -> Result<String, RadrootsReplicaEventsError> {
    let d_tag = factory.new_d_tag();
    // The serializers are fallible only in test builds (where a failpoint can
    // force an error); the non-test variants return the string directly.
    #[cfg(test)]
    let point = serialize_gcs_point(&gcs.point).map_err(map_gcs_point_serialize_error)?;
    #[cfg(not(test))]
    let point = serialize_gcs_point(&gcs.point);

    #[cfg(test)]
    let polygon = serialize_gcs_polygon(&gcs.polygon).map_err(map_gcs_polygon_serialize_error)?;
    #[cfg(not(test))]
    let polygon = serialize_gcs_polygon(&gcs.polygon);

    let fields = IGcsLocationFields {
        d_tag,
        lat: gcs.lat,
        lng: gcs.lng,
        geohash: gcs.geohash,
        point,
        polygon,
        accuracy: gcs.accuracy,
        altitude: gcs.altitude,
        tag_0: gcs.tag_0,
        label: gcs.label,
        area: gcs.area,
        elevation: gcs.elevation,
        soil: gcs.soil,
        climate: gcs.climate,
        gc_id: gcs.gc_id,
        gc_name: gcs.gc_name,
        gc_admin1_id: gcs.gc_admin1_id,
        gc_admin1_name: gcs.gc_admin1_name,
        gc_country_id: gcs.gc_country_id,
        gc_country_name: gcs.gc_country_name,
    };
    let result = gcs_location::create(exec, &fields)?;
    Ok(result.result.id)
}
   1262 
   1263 #[cfg(test)]
   1264 fn map_gcs_point_serialize_error(_err: serde_json::Error) -> RadrootsReplicaEventsError {
   1265     RadrootsReplicaEventsError::InvalidData("gcs.point".to_string())
   1266 }
   1267 
   1268 #[cfg(test)]
   1269 fn map_gcs_polygon_serialize_error(_err: serde_json::Error) -> RadrootsReplicaEventsError {
   1270     RadrootsReplicaEventsError::InvalidData("gcs.polygon".to_string())
   1271 }
   1272 
   1273 #[cfg(test)]
   1274 fn serialize_gcs_point(
   1275     point: &radroots_events::farm::RadrootsGeoJsonPoint,
   1276 ) -> Result<String, serde_json::Error> {
   1277     #[cfg(test)]
   1278     if failpoints::take_gcs_point_serialize_error() {
   1279         return Err(json_parse_error());
   1280     }
   1281     serde_json::to_string(point)
   1282 }
   1283 
   1284 #[cfg(not(test))]
   1285 fn serialize_gcs_point(point: &radroots_events::farm::RadrootsGeoJsonPoint) -> String {
   1286     serde_json::to_string(point).expect("gcs.point serializes")
   1287 }
   1288 
   1289 #[cfg(test)]
   1290 fn serialize_gcs_polygon(
   1291     polygon: &radroots_events::farm::RadrootsGeoJsonPolygon,
   1292 ) -> Result<String, serde_json::Error> {
   1293     #[cfg(test)]
   1294     if failpoints::take_gcs_polygon_serialize_error() {
   1295         return Err(json_parse_error());
   1296     }
   1297     serde_json::to_string(polygon)
   1298 }
   1299 
   1300 #[cfg(not(test))]
   1301 fn serialize_gcs_polygon(polygon: &radroots_events::farm::RadrootsGeoJsonPolygon) -> String {
   1302     serde_json::to_string(polygon).expect("gcs.polygon serializes")
   1303 }
   1304 
   1305 #[cfg(test)]
   1306 fn json_parse_error() -> serde_json::Error {
   1307     serde_json::from_str::<Value>("{").expect_err("json parse error")
   1308 }
   1309 
   1310 fn upsert_farm_members(
   1311     exec: &dyn SqlExecutor,
   1312     farm_id: &str,
   1313     role: ListSetRole,
   1314     list_set: &radroots_events::list_set::RadrootsListSet,
   1315 ) -> Result<(), RadrootsReplicaEventsError> {
   1316     let role_value = match role {
   1317         ListSetRole::Members => ROLE_MEMBER,
   1318         ListSetRole::Owners => ROLE_OWNER,
   1319         ListSetRole::Workers => ROLE_WORKER,
   1320         ListSetRole::Plots => return Ok(()),
   1321     };
   1322     let existing_query = farm_member::find_many(
   1323         exec,
   1324         &IFarmMemberFindMany {
   1325             filter: Some(IFarmMemberFieldsFilter {
   1326                 id: None,
   1327                 created_at: None,
   1328                 updated_at: None,
   1329                 farm_id: Some(farm_id.to_string()),
   1330                 member_pubkey: None,
   1331                 role: Some(role_value.to_string()),
   1332             }),
   1333         },
   1334     );
   1335     let existing = existing_query?;
   1336     for row in existing.results {
   1337         handle_delete_result(farm_member::delete(
   1338             exec,
   1339             &IFarmMemberDelete::On(IFarmMemberFindOneArgs {
   1340                 on: FarmMemberQueryBindValues::Id { id: row.id },
   1341             }),
   1342         ))?;
   1343     }
   1344 
   1345     let mut entries = Vec::new();
   1346     for entry in &list_set.entries {
   1347         for value in entry.values.iter().take(1) {
   1348             entries.push(value.to_string());
   1349         }
   1350     }
   1351     entries.sort();
   1352     entries.dedup();
   1353 
   1354     for pubkey in entries {
   1355         let fields = IFarmMemberFields {
   1356             farm_id: farm_id.to_string(),
   1357             member_pubkey: pubkey,
   1358             role: role_value.to_string(),
   1359         };
   1360         let _ = farm_member::create(exec, &fields)?;
   1361     }
   1362     Ok(())
   1363 }
   1364 
   1365 fn upsert_member_claims(
   1366     exec: &dyn SqlExecutor,
   1367     member_pubkey: &str,
   1368     list_set: &radroots_events::list_set::RadrootsListSet,
   1369 ) -> Result<(), RadrootsReplicaEventsError> {
   1370     let existing_query = farm_member_claim::find_many(
   1371         exec,
   1372         &IFarmMemberClaimFindMany {
   1373             filter: Some(IFarmMemberClaimFieldsFilter {
   1374                 id: None,
   1375                 created_at: None,
   1376                 updated_at: None,
   1377                 member_pubkey: Some(member_pubkey.to_string()),
   1378                 farm_pubkey: None,
   1379             }),
   1380         },
   1381     );
   1382     let existing = existing_query?;
   1383     for row in existing.results {
   1384         handle_delete_result(farm_member_claim::delete(
   1385             exec,
   1386             &IFarmMemberClaimDelete::On(IFarmMemberClaimFindOneArgs {
   1387                 on: FarmMemberClaimQueryBindValues::Id { id: row.id },
   1388             }),
   1389         ))?;
   1390     }
   1391 
   1392     let mut entries = Vec::new();
   1393     for entry in &list_set.entries {
   1394         for value in entry.values.iter().take(1) {
   1395             entries.push(value.to_string());
   1396         }
   1397     }
   1398     entries.sort();
   1399     entries.dedup();
   1400 
   1401     for farm_pubkey in entries {
   1402         let fields = IFarmMemberClaimFields {
   1403             member_pubkey: member_pubkey.to_string(),
   1404             farm_pubkey,
   1405         };
   1406         let _ = farm_member_claim::create(exec, &fields)?;
   1407     }
   1408     Ok(())
   1409 }
   1410 
   1411 fn handle_delete_result<T>(
   1412     result: Result<T, radroots_types::types::IError<SqlError>>,
   1413 ) -> Result<(), RadrootsReplicaEventsError> {
   1414     match result {
   1415         Ok(_) => Ok(()),
   1416         Err(err) => {
   1417             if matches!(err.err, SqlError::NotFound(_)) {
   1418                 return Ok(());
   1419             }
   1420             Err(err.into())
   1421         }
   1422     }
   1423 }
   1424 
/// Role encoded in the suffix of a farm list-set d-tag (`farm:<farm d-tag>:<suffix>`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ListSetRole {
    /// Suffix `members`.
    Members,
    /// Suffix `members.owners`.
    Owners,
    /// Suffix `members.workers`.
    Workers,
    /// Suffix `plots`; not a member role, so member upserts ignore it.
    Plots,
}
   1432 
   1433 fn unpack_farm_location_strings(
   1434     location: Option<&radroots_events::farm::RadrootsFarmLocation>,
   1435 ) -> (
   1436     Option<String>,
   1437     Option<String>,
   1438     Option<String>,
   1439     Option<String>,
   1440 ) {
   1441     match location {
   1442         Some(location) => (
   1443             location.primary.clone(),
   1444             location.city.clone(),
   1445             location.region.clone(),
   1446             location.country.clone(),
   1447         ),
   1448         None => (None, None, None, None),
   1449     }
   1450 }
   1451 
   1452 fn unpack_plot_location_strings(
   1453     location: Option<&radroots_events::plot::RadrootsPlotLocation>,
   1454 ) -> (
   1455     Option<String>,
   1456     Option<String>,
   1457     Option<String>,
   1458     Option<String>,
   1459 ) {
   1460     match location {
   1461         Some(location) => (
   1462             location.primary.clone(),
   1463             location.city.clone(),
   1464             location.region.clone(),
   1465             location.country.clone(),
   1466         ),
   1467         None => (None, None, None, None),
   1468     }
   1469 }
   1470 
   1471 fn ensure_list_set_entries_tag(
   1472     list_set: &radroots_events::list_set::RadrootsListSet,
   1473     expected: &str,
   1474     label: &str,
   1475 ) -> Result<(), RadrootsReplicaEventsError> {
   1476     for entry in list_set.entries.iter() {
   1477         if entry.tag != expected {
   1478             return Err(RadrootsReplicaEventsError::InvalidData(format!(
   1479                 "domain:farm list set {label} must only include {expected} tags"
   1480             )));
   1481         }
   1482     }
   1483     Ok(())
   1484 }
   1485 
   1486 fn parse_farm_list_set_d_tag(d_tag: &str) -> Option<(String, ListSetRole)> {
   1487     let mut parts = d_tag.splitn(3, ':');
   1488     if parts.next() != Some("farm") {
   1489         return None;
   1490     }
   1491     let farm_d_tag = parts.next()?.to_string();
   1492     let suffix = parts.next()?;
   1493     let role = match suffix {
   1494         "members" => ListSetRole::Members,
   1495         "members.owners" => ListSetRole::Owners,
   1496         "members.workers" => ListSetRole::Workers,
   1497         "plots" => ListSetRole::Plots,
   1498         _ => return None,
   1499     };
   1500     Some((farm_d_tag, role))
   1501 }
   1502 
   1503 fn to_value_opt(value: Option<String>) -> Option<Value> {
   1504     Some(match value {
   1505         Some(value) => Value::from(value),
   1506         None => Value::Null,
   1507     })
   1508 }
   1509 
/// Outcome of evaluating an event against the stored head for its coordinate,
/// carrying the values needed to write the head row.
struct EventHeadDecision {
    /// Whether the event should be written as the new head.
    apply: bool,
    /// Lookup key of the head row; empty for events that are not persisted.
    key: String,
    /// Event kind of the coordinate.
    kind: u32,
    /// Author public key of the coordinate.
    pubkey: String,
    /// D-tag of the coordinate; empty for replaceable coordinates.
    d_tag: String,
    /// Id of the evaluated event.
    last_event_id: String,
    /// `created_at` of the evaluated event.
    last_created_at: u32,
    /// Hash computed from the event's content and tags; empty for events that are not persisted.
    content_hash: String,
}
   1520 
   1521 #[cfg(test)]
   1522 mod tests {
   1523     use super::*;
   1524     use std::sync::Arc;
   1525     use std::sync::atomic::{AtomicUsize, Ordering};
   1526 
   1527     use radroots_core::{
   1528         RadrootsCoreCurrency, RadrootsCoreMoney, RadrootsCoreQuantity, RadrootsCoreQuantityPrice,
   1529         RadrootsCoreUnit,
   1530     };
   1531     use radroots_events::farm::{
   1532         RadrootsFarm, RadrootsFarmLocation, RadrootsFarmRef, RadrootsGcsLocation,
   1533         RadrootsGeoJsonPoint, RadrootsGeoJsonPolygon,
   1534     };
   1535     use radroots_events::kinds::{KIND_LIST_SET_FOLLOW, KIND_LIST_SET_GENERIC};
   1536     use radroots_events::list::RadrootsListEntry;
   1537     use radroots_events::list_set::RadrootsListSet;
   1538     use radroots_events::listing::RadrootsListingProduct;
   1539     use radroots_events::plot::{RadrootsPlot, RadrootsPlotLocation};
   1540     use radroots_events::profile::{
   1541         RADROOTS_PROFILE_TYPE_TAG_KEY, RadrootsProfile, RadrootsProfileType,
   1542         radroots_profile_type_tag_value,
   1543     };
   1544     use radroots_events_codec::farm::encode as farm_encode;
   1545     use radroots_events_codec::farm::list_sets as farm_list_sets;
   1546     use radroots_events_codec::list_set::encode as list_set_encode;
   1547     use radroots_events_codec::plot::encode as plot_encode;
   1548     use radroots_replica_db::{
   1549         ReplicaSql, farm, farm_gcs_location, farm_member, farm_member_claim, farm_tag,
   1550         gcs_location, migrations, nostr_event_head, plot, plot_gcs_location, plot_tag,
   1551         trade_product,
   1552     };
   1553     use radroots_replica_db_schema::farm::IFarmFields;
   1554     use radroots_replica_db_schema::farm_gcs_location::IFarmGcsLocationFields;
   1555     use radroots_replica_db_schema::farm_member::IFarmMemberFields;
   1556     use radroots_replica_db_schema::farm_member_claim::IFarmMemberClaimFields;
   1557     use radroots_replica_db_schema::farm_tag::IFarmTagFields;
   1558     use radroots_replica_db_schema::gcs_location::IGcsLocationFields;
   1559     use radroots_replica_db_schema::plot::IPlotFields;
   1560     use radroots_replica_db_schema::plot_gcs_location::IPlotGcsLocationFields;
   1561     use radroots_replica_db_schema::plot_tag::IPlotTagFields;
   1562     use radroots_sql_core::{ExecOutcome, SqlExecutor, SqliteExecutor};
   1563 
   1564     struct FixedFactory;
   1565 
   1566     impl RadrootsReplicaIdFactory for FixedFactory {
   1567         fn new_d_tag(&self) -> String {
   1568             "AAAAAAAAAAAAAAAAAAAAAZ".to_string()
   1569         }
   1570     }
   1571 
   1572     struct TxnExecutor<'a> {
   1573         inner: Option<&'a SqliteExecutor>,
   1574         begin_err: Option<SqlError>,
   1575         commit_err: Option<SqlError>,
   1576         rollback_count: Arc<AtomicUsize>,
   1577     }
   1578 
   1579     impl SqlExecutor for TxnExecutor<'_> {
   1580         fn exec(&self, sql: &str, params_json: &str) -> Result<ExecOutcome, SqlError> {
   1581             match self.inner {
   1582                 Some(inner) => inner.exec(sql, params_json),
   1583                 None => Err(SqlError::UnsupportedPlatform),
   1584             }
   1585         }
   1586 
   1587         fn query_raw(&self, sql: &str, params_json: &str) -> Result<String, SqlError> {
   1588             match self.inner {
   1589                 Some(inner) => inner.query_raw(sql, params_json),
   1590                 None => Err(SqlError::UnsupportedPlatform),
   1591             }
   1592         }
   1593 
   1594         fn begin(&self) -> Result<(), SqlError> {
   1595             match self.begin_err.clone() {
   1596                 Some(err) => Err(err),
   1597                 None => Ok(()),
   1598             }
   1599         }
   1600 
   1601         fn commit(&self) -> Result<(), SqlError> {
   1602             match self.commit_err.clone() {
   1603                 Some(err) => Err(err),
   1604                 None => Ok(()),
   1605             }
   1606         }
   1607 
   1608         fn rollback(&self) -> Result<(), SqlError> {
   1609             self.rollback_count.fetch_add(1, Ordering::SeqCst);
   1610             Ok(())
   1611         }
   1612     }
   1613 
   1614     struct DeleteErrorExecutor<'a> {
   1615         inner: &'a SqliteExecutor,
   1616         table_name: &'static str,
   1617         err: SqlError,
   1618     }
   1619 
   1620     impl SqlExecutor for DeleteErrorExecutor<'_> {
   1621         fn exec(&self, sql: &str, params_json: &str) -> Result<ExecOutcome, SqlError> {
   1622             let normalized = sql.to_ascii_lowercase();
   1623             if normalized.contains("delete from") && normalized.contains(self.table_name) {
   1624                 return Err(self.err.clone());
   1625             }
   1626             self.inner.exec(sql, params_json)
   1627         }
   1628 
   1629         fn query_raw(&self, sql: &str, params_json: &str) -> Result<String, SqlError> {
   1630             self.inner.query_raw(sql, params_json)
   1631         }
   1632 
   1633         fn begin(&self) -> Result<(), SqlError> {
   1634             self.inner.begin()
   1635         }
   1636 
   1637         fn commit(&self) -> Result<(), SqlError> {
   1638             self.inner.commit()
   1639         }
   1640 
   1641         fn rollback(&self) -> Result<(), SqlError> {
   1642             self.inner.rollback()
   1643         }
   1644     }
   1645 
   1646     struct PassExecutor<'a> {
   1647         inner: &'a SqliteExecutor,
   1648     }
   1649 
   1650     impl SqlExecutor for PassExecutor<'_> {
   1651         fn exec(&self, sql: &str, params_json: &str) -> Result<ExecOutcome, SqlError> {
   1652             self.inner.exec(sql, params_json)
   1653         }
   1654 
   1655         fn query_raw(&self, sql: &str, params_json: &str) -> Result<String, SqlError> {
   1656             self.inner.query_raw(sql, params_json)
   1657         }
   1658 
   1659         fn begin(&self) -> Result<(), SqlError> {
   1660             self.inner.begin()
   1661         }
   1662 
   1663         fn commit(&self) -> Result<(), SqlError> {
   1664             self.inner.commit()
   1665         }
   1666 
   1667         fn rollback(&self) -> Result<(), SqlError> {
   1668             self.inner.rollback()
   1669         }
   1670     }
   1671 
   1672     struct QueryFailExecutor<'a> {
   1673         inner: &'a SqliteExecutor,
   1674         needle: &'static str,
   1675         err: SqlError,
   1676     }
   1677 
   1678     impl SqlExecutor for QueryFailExecutor<'_> {
   1679         fn exec(&self, sql: &str, params_json: &str) -> Result<ExecOutcome, SqlError> {
   1680             let normalized = sql.to_ascii_lowercase();
   1681             if normalized.contains(self.needle) {
   1682                 return Err(self.err.clone());
   1683             }
   1684             self.inner.exec(sql, params_json)
   1685         }
   1686 
   1687         fn query_raw(&self, sql: &str, params_json: &str) -> Result<String, SqlError> {
   1688             let normalized = sql.to_ascii_lowercase();
   1689             if normalized.contains(self.needle) {
   1690                 return Err(self.err.clone());
   1691             }
   1692             self.inner.query_raw(sql, params_json)
   1693         }
   1694 
   1695         fn begin(&self) -> Result<(), SqlError> {
   1696             self.inner.begin()
   1697         }
   1698 
   1699         fn commit(&self) -> Result<(), SqlError> {
   1700             self.inner.commit()
   1701         }
   1702 
   1703         fn rollback(&self) -> Result<(), SqlError> {
   1704             self.inner.rollback()
   1705         }
   1706     }
   1707 
   1708     fn sample_gcs(lat: f64, lng: f64, geohash: &str) -> RadrootsGcsLocation {
   1709         RadrootsGcsLocation {
   1710             lat,
   1711             lng,
   1712             geohash: geohash.to_string(),
   1713             point: RadrootsGeoJsonPoint {
   1714                 r#type: "Point".to_string(),
   1715                 coordinates: [lng, lat],
   1716             },
   1717             polygon: RadrootsGeoJsonPolygon {
   1718                 r#type: "Polygon".to_string(),
   1719                 coordinates: vec![vec![
   1720                     [lng, lat],
   1721                     [lng, lat + 0.001],
   1722                     [lng - 0.001, lat + 0.001],
   1723                     [lng, lat],
   1724                 ]],
   1725             },
   1726             accuracy: Some(1.0),
   1727             altitude: Some(2.0),
   1728             tag_0: Some("tag".to_string()),
   1729             label: Some("label".to_string()),
   1730             area: Some(3.0),
   1731             elevation: Some(4),
   1732             soil: Some("soil".to_string()),
   1733             climate: Some("climate".to_string()),
   1734             gc_id: Some("gc_id".to_string()),
   1735             gc_name: Some("gc_name".to_string()),
   1736             gc_admin1_id: Some("gc_admin1_id".to_string()),
   1737             gc_admin1_name: Some("gc_admin1_name".to_string()),
   1738             gc_country_id: Some("gc_country_id".to_string()),
   1739             gc_country_name: Some("gc_country_name".to_string()),
   1740         }
   1741     }
   1742 
   1743     fn profile_event(
   1744         id: u64,
   1745         author: &str,
   1746         created_at: u32,
   1747         profile_type: Option<RadrootsProfileType>,
   1748         name: &str,
   1749     ) -> RadrootsNostrEvent {
   1750         let profile = RadrootsProfile {
   1751             name: name.to_string(),
   1752             display_name: Some(format!("{name}-display")),
   1753             nip05: Some(format!("{name}@example.com")),
   1754             about: Some(format!("{name}-about")),
   1755             website: Some("https://example.com".to_string()),
   1756             picture: Some("https://example.com/p.png".to_string()),
   1757             banner: Some("https://example.com/b.png".to_string()),
   1758             lud06: Some("lud06".to_string()),
   1759             lud16: Some("lud16".to_string()),
   1760             bot: None,
   1761         };
   1762         let mut tags = Vec::new();
   1763         if let Some(profile_type) = profile_type {
   1764             tags.push(vec![
   1765                 RADROOTS_PROFILE_TYPE_TAG_KEY.to_string(),
   1766                 radroots_profile_type_tag_value(profile_type).to_string(),
   1767             ]);
   1768         }
   1769         RadrootsNostrEvent {
   1770             id: format!("{id:064x}"),
   1771             author: author.to_string(),
   1772             created_at,
   1773             kind: KIND_PROFILE,
   1774             tags,
   1775             content: serde_json::to_string(&profile).expect("profile json"),
   1776             sig: "f".repeat(128),
   1777         }
   1778     }
   1779 
   1780     fn farm_event(
   1781         id: u64,
   1782         author: &str,
   1783         created_at: u32,
   1784         d_tag: &str,
   1785         name: &str,
   1786         location: Option<RadrootsFarmLocation>,
   1787         tags: Option<Vec<String>>,
   1788     ) -> RadrootsNostrEvent {
   1789         let farm = RadrootsFarm {
   1790             d_tag: d_tag.to_string(),
   1791             name: name.to_string(),
   1792             about: Some("about".to_string()),
   1793             website: Some("https://farm.example.com".to_string()),
   1794             picture: Some("https://farm.example.com/p.png".to_string()),
   1795             banner: Some("https://farm.example.com/b.png".to_string()),
   1796             location,
   1797             tags,
   1798         };
   1799         let tags = farm_encode::farm_build_tags(&farm).expect("farm tags");
   1800         RadrootsNostrEvent {
   1801             id: format!("{id:064x}"),
   1802             author: author.to_string(),
   1803             created_at,
   1804             kind: KIND_FARM,
   1805             tags,
   1806             content: serde_json::to_string(&farm).expect("farm json"),
   1807             sig: "f".repeat(128),
   1808         }
   1809     }
   1810 
   1811     fn plot_event(
   1812         id: u64,
   1813         author: &str,
   1814         created_at: u32,
   1815         d_tag: &str,
   1816         farm_ref: RadrootsFarmRef,
   1817         name: &str,
   1818         location: Option<RadrootsPlotLocation>,
   1819         tags: Option<Vec<String>>,
   1820     ) -> RadrootsNostrEvent {
   1821         let plot = RadrootsPlot {
   1822             d_tag: d_tag.to_string(),
   1823             farm: farm_ref,
   1824             name: name.to_string(),
   1825             about: Some("plot-about".to_string()),
   1826             location,
   1827             tags,
   1828         };
   1829         let tags = plot_encode::plot_build_tags(&plot).expect("plot tags");
   1830         RadrootsNostrEvent {
   1831             id: format!("{id:064x}"),
   1832             author: author.to_string(),
   1833             created_at,
   1834             kind: KIND_PLOT,
   1835             tags,
   1836             content: serde_json::to_string(&plot).expect("plot json"),
   1837             sig: "f".repeat(128),
   1838         }
   1839     }
   1840 
   1841     fn list_set_event(
   1842         id: u64,
   1843         author: &str,
   1844         created_at: u32,
   1845         kind: u32,
   1846         list_set: &RadrootsListSet,
   1847     ) -> RadrootsNostrEvent {
   1848         let parts = list_set_encode::to_wire_parts_with_kind(list_set, kind).expect("list set");
   1849         RadrootsNostrEvent {
   1850             id: format!("{id:064x}"),
   1851             author: author.to_string(),
   1852             created_at,
   1853             kind,
   1854             tags: parts.tags,
   1855             content: parts.content,
   1856             sig: "f".repeat(128),
   1857         }
   1858     }
   1859 
   1860     fn listing_event(
   1861         id: u64,
   1862         author: &str,
   1863         created_at: u32,
   1864         d_tag: &str,
   1865         status: &str,
   1866         title: &str,
   1867     ) -> RadrootsNostrEvent {
   1868         let farm_d_tag = "AAAAAAAAAAAAAAAAAAAAAA";
   1869         RadrootsNostrEvent {
   1870             id: format!("{id:064x}"),
   1871             author: author.to_string(),
   1872             created_at,
   1873             kind: KIND_LISTING,
   1874             tags: vec![
   1875                 vec!["d".to_string(), d_tag.to_string()],
   1876                 vec![
   1877                     "a".to_string(),
   1878                     format!("{}:{}:{}", KIND_FARM, author, farm_d_tag),
   1879                 ],
   1880                 vec!["p".to_string(), author.to_string()],
   1881                 vec!["key".to_string(), "pasture-eggs".to_string()],
   1882                 vec!["title".to_string(), title.to_string()],
   1883                 vec!["category".to_string(), "eggs".to_string()],
   1884                 vec!["summary".to_string(), "Pasture-raised eggs".to_string()],
   1885                 vec!["process".to_string(), "washed".to_string()],
   1886                 vec!["lot".to_string(), "lot-a".to_string()],
   1887                 vec!["profile".to_string(), "dozen".to_string()],
   1888                 vec!["year".to_string(), "2026".to_string()],
   1889                 vec!["radroots:primary_bin".to_string(), "bin-a".to_string()],
   1890                 vec![
   1891                     "radroots:bin".to_string(),
   1892                     "bin-a".to_string(),
   1893                     "12".to_string(),
   1894                     "each".to_string(),
   1895                     "12".to_string(),
   1896                     "each".to_string(),
   1897                     "dozen".to_string(),
   1898                 ],
   1899                 vec![
   1900                     "radroots:price".to_string(),
   1901                     "bin-a".to_string(),
   1902                     "6".to_string(),
   1903                     "USD".to_string(),
   1904                     "1".to_string(),
   1905                     "each".to_string(),
   1906                     "6".to_string(),
   1907                     "each".to_string(),
   1908                 ],
   1909                 vec!["inventory".to_string(), "5".to_string()],
   1910                 vec!["status".to_string(), status.to_string()],
   1911             ],
   1912             content: format!("# {title}"),
   1913             sig: "f".repeat(128),
   1914         }
   1915     }
   1916 
   1917     fn listing_decimal(raw: &str) -> RadrootsCoreDecimal {
   1918         raw.parse().expect("decimal")
   1919     }
   1920 
   1921     fn listing_currency() -> RadrootsCoreCurrency {
   1922         "USD".parse().expect("currency")
   1923     }
   1924 
   1925     fn listing_model() -> RadrootsListing {
   1926         RadrootsListing {
   1927             d_tag: "AAAAAAAAAAAAAAAAAAAAAA".parse().expect("d tag"),
   1928             published_at: Some(1),
   1929             farm: RadrootsFarmRef {
   1930                 pubkey: "c".repeat(64),
   1931                 d_tag: "AAAAAAAAAAAAAAAAAAAAAZ".to_string(),
   1932             },
   1933             product: RadrootsListingProduct {
   1934                 key: "pasture-eggs".to_string(),
   1935                 title: "Pasture Eggs".to_string(),
   1936                 category: "eggs".to_string(),
   1937                 summary: Some("Pasture-raised eggs".to_string()),
   1938                 process: None,
   1939                 lot: None,
   1940                 location: None,
   1941                 profile: None,
   1942                 year: Some("2026".to_string()),
   1943             },
   1944             primary_bin_id: "bin-a".parse().expect("primary bin id"),
   1945             bins: vec![RadrootsListingBin {
   1946                 bin_id: "bin-a".parse().expect("bin id"),
   1947                 quantity: RadrootsCoreQuantity::new(listing_decimal("12"), RadrootsCoreUnit::Each)
   1948                     .with_label("unit label"),
   1949                 price_per_canonical_unit: RadrootsCoreQuantityPrice::new(
   1950                     RadrootsCoreMoney::new(listing_decimal("6"), listing_currency()),
   1951                     RadrootsCoreQuantity::new(listing_decimal("1"), RadrootsCoreUnit::Each),
   1952                 ),
   1953                 display_amount: None,
   1954                 display_unit: None,
   1955                 display_label: None,
   1956                 display_price: None,
   1957                 display_price_unit: None,
   1958             }],
   1959             resource_area: None,
   1960             plot: None,
   1961             discounts: Some(Vec::new()),
   1962             inventory_available: Some(listing_decimal("5")),
   1963             availability: Some(RadrootsListingAvailability::Status {
   1964                 status: RadrootsListingStatus::Active,
   1965             }),
   1966             delivery_method: None,
   1967             location: None,
   1968             images: None,
   1969         }
   1970     }
   1971 
   1972     #[test]
   1973     fn listing_field_helpers_cover_optional_label_and_error_paths() {
   1974         let listing = listing_model();
   1975         let fields =
   1976             trade_product_fields_from_listing(&listing, "30402:pubkey:listing").expect("fields");
   1977         assert_eq!(fields.qty_label.as_deref(), Some("unit label"));
   1978         assert_eq!(fields.notes, None);
   1979 
   1980         let mut missing_bin = listing.clone();
   1981         missing_bin.primary_bin_id = "bin-missing".parse().expect("missing bin id");
   1982         let err = trade_product_fields_from_listing(&missing_bin, "30402:pubkey:listing")
   1983             .err()
   1984             .expect("missing primary bin should fail");
   1985         assert!(err.to_string().contains("primary bin missing"));
   1986 
   1987         let mut fractional_inventory = listing;
   1988         fractional_inventory.inventory_available = Some(listing_decimal("1.5"));
   1989         let err = trade_product_fields_from_listing(&fractional_inventory, "30402:pubkey:listing")
   1990             .err()
   1991             .expect("fractional inventory should fail");
   1992         assert!(err.to_string().contains("whole number"));
   1993 
   1994         let err = decimal_to_i64(&listing_decimal("9223372036854775808"), "listing inventory")
   1995             .err()
   1996             .expect("i64 overflow should fail");
   1997         assert!(err.to_string().contains("exceeds i64 range"));
   1998     }
   1999 
   2000     #[test]
   2001     fn current_event_head_reports_invalid_stored_event_id() {
   2002         let row = NostrEventHead {
   2003             id: "head-1".to_string(),
   2004             created_at: "2026-01-01T00:00:00Z".to_string(),
   2005             updated_at: "2026-01-01T00:00:00Z".to_string(),
   2006             key: "profile".to_string(),
   2007             kind: KIND_PROFILE,
   2008             pubkey: "a".repeat(64),
   2009             d_tag: String::new(),
   2010             last_event_id: "not-an-event-id".to_string(),
   2011             last_created_at: 1,
   2012             content_hash: "hash".to_string(),
   2013         };
   2014         let coordinate = RadrootsEventHeadCoordinate::Replaceable {
   2015             kind: KIND_PROFILE,
   2016             pubkey: "a".repeat(64).parse().expect("pubkey"),
   2017         };
   2018 
   2019         let err = current_event_head_from_row(&row, &coordinate)
   2020             .expect_err("invalid stored event id should fail");
   2021         assert!(err.to_string().contains("last_event_id invalid"));
   2022     }
   2023 
   2024     fn seed_rows(exec: &SqliteExecutor) -> (String, String, String, String) {
   2025         migrations::run_all_up(exec).expect("migrations");
   2026         let farm_row = farm::create(
   2027             exec,
   2028             &IFarmFields {
   2029                 d_tag: "AAAAAAAAAAAAAAAAAAAAAA".to_string(),
   2030                 pubkey: "f".repeat(64),
   2031                 name: "farm".to_string(),
   2032                 about: None,
   2033                 website: None,
   2034                 picture: None,
   2035                 banner: None,
   2036                 location_primary: None,
   2037                 location_city: None,
   2038                 location_region: None,
   2039                 location_country: None,
   2040             },
   2041         )
   2042         .expect("farm")
   2043         .result;
   2044         let plot_row = plot::create(
   2045             exec,
   2046             &IPlotFields {
   2047                 d_tag: "AAAAAAAAAAAAAAAAAAAAAQ".to_string(),
   2048                 farm_id: farm_row.id.clone(),
   2049                 name: "plot".to_string(),
   2050                 about: None,
   2051                 location_primary: None,
   2052                 location_city: None,
   2053                 location_region: None,
   2054                 location_country: None,
   2055             },
   2056         )
   2057         .expect("plot")
   2058         .result;
   2059         let gcs_row = gcs_location::create(
   2060             exec,
   2061             &IGcsLocationFields {
   2062                 d_tag: "AAAAAAAAAAAAAAAAAAAAAw".to_string(),
   2063                 lat: 1.0,
   2064                 lng: 2.0,
   2065                 geohash: "s0".to_string(),
   2066                 point: "{\"type\":\"Point\",\"coordinates\":[2.0,1.0]}".to_string(),
   2067                 polygon:
   2068                     "{\"type\":\"Polygon\",\"coordinates\":[[[2.0,1.0],[2.1,1.1],[1.9,1.1],[2.0,1.0]]]}".to_string(),
   2069                 accuracy: None,
   2070                 altitude: None,
   2071                 tag_0: None,
   2072                 label: None,
   2073                 area: None,
   2074                 elevation: None,
   2075                 soil: None,
   2076                 climate: None,
   2077                 gc_id: None,
   2078                 gc_name: None,
   2079                 gc_admin1_id: None,
   2080                 gc_admin1_name: None,
   2081                 gc_country_id: None,
   2082                 gc_country_name: None,
   2083             },
   2084         )
   2085         .expect("gcs")
   2086         .result;
   2087 
   2088         let _ = farm_tag::create(
   2089             exec,
   2090             &IFarmTagFields {
   2091                 farm_id: farm_row.id.clone(),
   2092                 tag: "alpha".to_string(),
   2093             },
   2094         )
   2095         .expect("farm tag");
   2096         let _ = plot_tag::create(
   2097             exec,
   2098             &IPlotTagFields {
   2099                 plot_id: plot_row.id.clone(),
   2100                 tag: "beta".to_string(),
   2101             },
   2102         )
   2103         .expect("plot tag");
   2104         let _ = farm_gcs_location::create(
   2105             exec,
   2106             &IFarmGcsLocationFields {
   2107                 farm_id: farm_row.id.clone(),
   2108                 gcs_location_id: gcs_row.id.clone(),
   2109                 role: "primary".to_string(),
   2110             },
   2111         )
   2112         .expect("farm gcs");
   2113         let _ = plot_gcs_location::create(
   2114             exec,
   2115             &IPlotGcsLocationFields {
   2116                 plot_id: plot_row.id.clone(),
   2117                 gcs_location_id: gcs_row.id.clone(),
   2118                 role: "primary".to_string(),
   2119             },
   2120         )
   2121         .expect("plot gcs");
   2122         let _ = farm_member::create(
   2123             exec,
   2124             &IFarmMemberFields {
   2125                 farm_id: farm_row.id.clone(),
   2126                 member_pubkey: "6".repeat(64),
   2127                 role: "member".to_string(),
   2128             },
   2129         )
   2130         .expect("member");
   2131         let _ = farm_member_claim::create(
   2132             exec,
   2133             &IFarmMemberClaimFields {
   2134                 member_pubkey: "6".repeat(64),
   2135                 farm_pubkey: farm_row.pubkey.clone(),
   2136             },
   2137         )
   2138         .expect("claim");
   2139         (
   2140             farm_row.id,
   2141             farm_row.pubkey,
   2142             farm_row.d_tag,
   2143             plot_row.d_tag.clone(),
   2144         )
   2145     }
   2146 
   2147     #[test]
   2148     fn ingest_transaction_paths_are_covered() {
   2149         let begin_executor = TxnExecutor {
   2150             inner: None,
   2151             begin_err: Some(SqlError::Internal),
   2152             commit_err: None,
   2153             rollback_count: Arc::new(AtomicUsize::new(0)),
   2154         };
   2155         let event = RadrootsNostrEvent {
   2156             id: format!("{:064x}", 1u64),
   2157             author: "a".repeat(64),
   2158             created_at: 1,
   2159             kind: KIND_LIST_SET_FOLLOW,
   2160             tags: Vec::new(),
   2161             content: String::new(),
   2162             sig: "f".repeat(128),
   2163         };
   2164         let begin_err =
   2165             radroots_replica_ingest_event_with_factory(&begin_executor, &event, &FixedFactory)
   2166                 .expect_err("begin");
   2167         assert!(begin_err.to_string().contains("replica_sync.sql"));
   2168         assert!(begin_executor.commit().is_ok());
   2169         assert_eq!(
   2170             begin_executor
   2171                 .exec("select 1", "[]")
   2172                 .expect_err("exec")
   2173                 .code(),
   2174             "ERR_UNSUPPORTED_PLATFORM"
   2175         );
   2176         assert_eq!(
   2177             begin_executor
   2178                 .query_raw("select 1", "[]")
   2179                 .expect_err("query")
   2180                 .code(),
   2181             "ERR_UNSUPPORTED_PLATFORM"
   2182         );
   2183 
   2184         let rollback_count = Arc::new(AtomicUsize::new(0));
   2185         let commit_executor = TxnExecutor {
   2186             inner: None,
   2187             begin_err: None,
   2188             commit_err: Some(SqlError::Internal),
   2189             rollback_count: rollback_count.clone(),
   2190         };
   2191         let commit_err =
   2192             radroots_replica_ingest_event_with_factory(&commit_executor, &event, &FixedFactory)
   2193                 .expect_err("commit");
   2194         assert!(commit_err.to_string().contains("replica_sync.sql"));
   2195         assert_eq!(rollback_count.load(Ordering::SeqCst), 0);
   2196 
   2197         let rollback_executor = TxnExecutor {
   2198             inner: None,
   2199             begin_err: None,
   2200             commit_err: None,
   2201             rollback_count: Arc::new(AtomicUsize::new(0)),
   2202         };
   2203         let unsupported = RadrootsNostrEvent {
   2204             id: format!("{:064x}", 2u64),
   2205             author: "a".repeat(64),
   2206             created_at: 2,
   2207             kind: 42,
   2208             tags: Vec::new(),
   2209             content: String::new(),
   2210             sig: "f".repeat(128),
   2211         };
   2212         let err = radroots_replica_ingest_event_with_factory(
   2213             &rollback_executor,
   2214             &unsupported,
   2215             &FixedFactory,
   2216         )
   2217         .expect_err("rollback");
   2218         assert!(err.to_string().contains("unsupported kind"));
   2219         assert_eq!(rollback_executor.rollback_count.load(Ordering::SeqCst), 1);
   2220     }
   2221 
   2222     #[test]
   2223     fn ingest_core_paths_cover_helpers_and_decisions() {
   2224         let exec = SqliteExecutor::open_memory().expect("db");
   2225         migrations::run_all_up(&exec).expect("migrations");
   2226 
   2227         let factory = RadrootsReplicaDefaultIdFactory;
   2228         assert_eq!(factory.new_d_tag().len(), 22);
   2229 
   2230         let profile_pubkey = "9".repeat(64);
   2231         let profile = profile_event(
   2232             10,
   2233             &profile_pubkey,
   2234             1,
   2235             Some(RadrootsProfileType::Individual),
   2236             "alice",
   2237         );
   2238         let profile_no_type = profile_event(9, &profile_pubkey, 0, None, "alice-none");
   2239         assert!(ingest_profile_event(&exec, &profile_no_type).is_err());
   2240         assert_eq!(
   2241             radroots_replica_ingest_event(&exec, &profile).expect("ingest wrapper"),
   2242             RadrootsReplicaIngestOutcome::Applied
   2243         );
   2244         let profile_update = profile_event(
   2245             11,
   2246             &profile_pubkey,
   2247             2,
   2248             Some(RadrootsProfileType::Individual),
   2249             "alice-2",
   2250         );
   2251         assert_eq!(
   2252             ingest_profile_event(&exec, &profile_update).expect("profile update"),
   2253             RadrootsReplicaIngestOutcome::Applied
   2254         );
   2255         assert_eq!(
   2256             ingest_profile_event(&exec, &profile_update).expect("profile skip"),
   2257             RadrootsReplicaIngestOutcome::Skipped
   2258         );
   2259         let profile_older = profile_event(
   2260             8,
   2261             &profile_pubkey,
   2262             1,
   2263             Some(RadrootsProfileType::Individual),
   2264             "alice-old",
   2265         );
   2266         let decision_old = event_head_decision(&exec, &profile_older).expect("decision old");
   2267         assert!(!decision_old.apply);
   2268         let decision_same = event_head_decision(&exec, &profile_update).expect("decision same");
   2269         assert!(!decision_same.apply);
   2270         let profile_same_time_higher_id = profile_event(
   2271             12,
   2272             &profile_pubkey,
   2273             2,
   2274             Some(RadrootsProfileType::Individual),
   2275             "alice-3",
   2276         );
   2277         let decision = event_head_decision(&exec, &profile_same_time_higher_id).expect("decision");
   2278         assert!(!decision.apply);
   2279         let profile_same_time_lower_id = profile_event(
   2280             10,
   2281             &profile_pubkey,
   2282             2,
   2283             Some(RadrootsProfileType::Individual),
   2284             "alice-0",
   2285         );
   2286         let decision = event_head_decision(&exec, &profile_same_time_lower_id).expect("decision");
   2287         assert!(decision.apply);
   2288 
   2289         let farm_pubkey = "f".repeat(64);
   2290         let farm_d_tag = "AAAAAAAAAAAAAAAAAAAAAA";
   2291         let farm = farm_event(
   2292             20,
   2293             &farm_pubkey,
   2294             10,
   2295             farm_d_tag,
   2296             "farm-a",
   2297             Some(RadrootsFarmLocation {
   2298                 primary: Some("primary".to_string()),
   2299                 city: Some("city".to_string()),
   2300                 region: Some("region".to_string()),
   2301                 country: Some("country".to_string()),
   2302                 gcs: Some(sample_gcs(10.0, 20.0, "s0")),
   2303             }),
   2304             Some(vec![
   2305                 "coffee".to_string(),
   2306                 "coffee".to_string(),
   2307                 " ".to_string(),
   2308             ]),
   2309         );
   2310         assert_eq!(
   2311             ingest_farm_event(&exec, &farm, &FixedFactory).expect("farm"),
   2312             RadrootsReplicaIngestOutcome::Applied
   2313         );
   2314         let farm_update = farm_event(
   2315             21,
   2316             &farm_pubkey,
   2317             11,
   2318             farm_d_tag,
   2319             "farm-b",
   2320             None,
   2321             Some(vec!["market".to_string()]),
   2322         );
   2323         assert_eq!(
   2324             ingest_farm_event(&exec, &farm_update, &FixedFactory).expect("farm update"),
   2325             RadrootsReplicaIngestOutcome::Applied
   2326         );
   2327         assert_eq!(
   2328             ingest_farm_event(&exec, &farm_update, &FixedFactory).expect("farm skip"),
   2329             RadrootsReplicaIngestOutcome::Skipped
   2330         );
   2331 
   2332         let plot_d_tag = "AAAAAAAAAAAAAAAAAAAAAQ";
   2333         let plot = plot_event(
   2334             30,
   2335             &farm_pubkey,
   2336             20,
   2337             plot_d_tag,
   2338             RadrootsFarmRef {
   2339                 pubkey: farm_pubkey.clone(),
   2340                 d_tag: farm_d_tag.to_string(),
   2341             },
   2342             "plot-a",
   2343             Some(RadrootsPlotLocation {
   2344                 primary: Some("p".to_string()),
   2345                 city: Some("c".to_string()),
   2346                 region: Some("r".to_string()),
   2347                 country: Some("k".to_string()),
   2348                 gcs: sample_gcs(11.0, 21.0, "s1"),
   2349             }),
   2350             Some(vec!["tag".to_string()]),
   2351         );
   2352         assert_eq!(
   2353             ingest_plot_event(&exec, &plot, &FixedFactory).expect("plot"),
   2354             RadrootsReplicaIngestOutcome::Applied
   2355         );
   2356         let plot_update = plot_event(
   2357             31,
   2358             &farm_pubkey,
   2359             21,
   2360             plot_d_tag,
   2361             RadrootsFarmRef {
   2362                 pubkey: farm_pubkey.clone(),
   2363                 d_tag: farm_d_tag.to_string(),
   2364             },
   2365             "plot-b",
   2366             None,
   2367             Some(vec!["tag2".to_string()]),
   2368         );
   2369         assert_eq!(
   2370             ingest_plot_event(&exec, &plot_update, &FixedFactory).expect("plot update"),
   2371             RadrootsReplicaIngestOutcome::Applied
   2372         );
   2373         assert_eq!(
   2374             ingest_plot_event(&exec, &plot_update, &FixedFactory).expect("plot skip"),
   2375             RadrootsReplicaIngestOutcome::Skipped
   2376         );
   2377 
   2378         let members = farm_list_sets::farm_members_list_set(farm_d_tag, vec!["6".repeat(64)])
   2379             .expect("members");
   2380         let owners =
   2381             farm_list_sets::farm_owners_list_set(farm_d_tag, vec!["8".repeat(64)]).expect("owners");
   2382         let workers = farm_list_sets::farm_workers_list_set(farm_d_tag, vec!["0".repeat(64)])
   2383             .expect("workers");
   2384         let plots = farm_list_sets::farm_plots_list_set(
   2385             farm_d_tag,
   2386             &farm_pubkey,
   2387             vec![plot_d_tag.to_string()],
   2388         )
   2389         .expect("plots");
   2390         let member_of =
   2391             farm_list_sets::member_of_farms_list_set(vec![farm_pubkey.clone()]).expect("member_of");
   2392 
   2393         for (idx, list_set) in [members, owners, workers, plots, member_of]
   2394             .iter()
   2395             .enumerate()
   2396         {
   2397             let event = list_set_event(
   2398                 40 + idx as u64,
   2399                 if list_set.d_tag == "member_of.farms" {
   2400                     &profile_pubkey
   2401                 } else {
   2402                     &farm_pubkey
   2403                 },
   2404                 30 + idx as u32,
   2405                 KIND_LIST_SET_GENERIC,
   2406                 list_set,
   2407             );
   2408             assert_eq!(
   2409                 ingest_list_set_event(&exec, &event).expect("list set"),
   2410                 RadrootsReplicaIngestOutcome::Applied
   2411             );
   2412             assert_eq!(
   2413                 ingest_list_set_event(&exec, &event).expect("list set skip"),
   2414                 RadrootsReplicaIngestOutcome::Skipped
   2415             );
   2416         }
   2417 
   2418         let bad_description = RadrootsListSet {
   2419             d_tag: "member_of.farms".to_string(),
   2420             content: String::new(),
   2421             entries: vec![RadrootsListEntry {
   2422                 tag: "p".to_string(),
   2423                 values: vec![farm_pubkey.clone()],
   2424             }],
   2425             title: None,
   2426             description: Some("bad".to_string()),
   2427             image: None,
   2428         };
   2429         let bad_description_event = list_set_event(
   2430             90,
   2431             &profile_pubkey,
   2432             100,
   2433             KIND_LIST_SET_GENERIC,
   2434             &bad_description,
   2435         );
   2436         assert!(ingest_list_set_event(&exec, &bad_description_event).is_err());
   2437 
   2438         let bad_image = RadrootsListSet {
   2439             d_tag: "member_of.farms".to_string(),
   2440             content: String::new(),
   2441             entries: vec![RadrootsListEntry {
   2442                 tag: "p".to_string(),
   2443                 values: vec![farm_pubkey.clone()],
   2444             }],
   2445             title: None,
   2446             description: None,
   2447             image: Some("bad".to_string()),
   2448         };
   2449         let bad_image_event =
   2450             list_set_event(91, &profile_pubkey, 101, KIND_LIST_SET_GENERIC, &bad_image);
   2451         assert!(ingest_list_set_event(&exec, &bad_image_event).is_err());
   2452 
   2453         let bad_title = RadrootsListSet {
   2454             d_tag: "member_of.farms".to_string(),
   2455             content: String::new(),
   2456             entries: vec![RadrootsListEntry {
   2457                 tag: "p".to_string(),
   2458                 values: vec![farm_pubkey.clone()],
   2459             }],
   2460             title: Some("bad".to_string()),
   2461             description: None,
   2462             image: None,
   2463         };
   2464         let bad_title_event =
   2465             list_set_event(92, &profile_pubkey, 102, KIND_LIST_SET_GENERIC, &bad_title);
   2466         assert!(ingest_list_set_event(&exec, &bad_title_event).is_err());
   2467 
   2468         let bad_content = RadrootsListSet {
   2469             d_tag: "member_of.farms".to_string(),
   2470             content: "bad".to_string(),
   2471             entries: vec![RadrootsListEntry {
   2472                 tag: "p".to_string(),
   2473                 values: vec![farm_pubkey.clone()],
   2474             }],
   2475             title: None,
   2476             description: None,
   2477             image: None,
   2478         };
   2479         let bad_content_event = list_set_event(
   2480             93,
   2481             &profile_pubkey,
   2482             103,
   2483             KIND_LIST_SET_GENERIC,
   2484             &bad_content,
   2485         );
   2486         assert!(ingest_list_set_event(&exec, &bad_content_event).is_err());
   2487 
   2488         let unknown_farm_list_set = RadrootsListSet {
   2489             d_tag: format!("farm:{farm_d_tag}:unknown"),
   2490             content: String::new(),
   2491             entries: vec![RadrootsListEntry {
   2492                 tag: "p".to_string(),
   2493                 values: vec![profile_pubkey.clone()],
   2494             }],
   2495             title: None,
   2496             description: None,
   2497             image: None,
   2498         };
   2499         let unknown_farm_list_event = list_set_event(
   2500             94,
   2501             &farm_pubkey,
   2502             104,
   2503             KIND_LIST_SET_GENERIC,
   2504             &unknown_farm_list_set,
   2505         );
   2506         assert!(ingest_list_set_event(&exec, &unknown_farm_list_event).is_err());
   2507 
   2508         assert!(parse_farm_list_set_d_tag("farm:AAAAAAAAAAAAAAAAAAAAAA:unknown").is_none());
   2509         assert!(parse_farm_list_set_d_tag("farm:AAAAAAAAAAAAAAAAAAAAAA:plots").is_some());
   2510         assert_eq!(to_value_opt(Some("x".to_string())), Some(Value::from("x")));
   2511         assert_eq!(to_value_opt(None), Some(Value::Null));
   2512         let location = RadrootsFarmLocation {
   2513             primary: Some("p".to_string()),
   2514             city: Some("c".to_string()),
   2515             region: Some("r".to_string()),
   2516             country: Some("k".to_string()),
   2517             gcs: Some(sample_gcs(12.0, 22.0, "s2")),
   2518         };
   2519         assert_eq!(
   2520             unpack_farm_location_strings(Some(&location)).0,
   2521             Some("p".to_string())
   2522         );
   2523         assert_eq!(
   2524             unpack_plot_location_strings(Some(&RadrootsPlotLocation {
   2525                 primary: Some("p".to_string()),
   2526                 city: None,
   2527                 region: None,
   2528                 country: None,
   2529                 gcs: sample_gcs(13.0, 23.0, "s3"),
   2530             }))
   2531             .0,
   2532             Some("p".to_string())
   2533         );
   2534         assert!(ensure_list_set_entries_tag(&bad_image, "p", "x").is_ok());
   2535         assert!(
   2536             ensure_list_set_entries_tag(
   2537                 &RadrootsListSet {
   2538                     d_tag: "x".to_string(),
   2539                     content: String::new(),
   2540                     entries: vec![RadrootsListEntry {
   2541                         tag: "a".to_string(),
   2542                         values: vec!["x".to_string()],
   2543                     }],
   2544                     title: None,
   2545                     description: None,
   2546                     image: None,
   2547                 },
   2548                 "p",
   2549                 "x",
   2550             )
   2551             .is_err()
   2552         );
   2553     }
   2554 
   2555     #[test]
            // Drives one listing address through four events (active, updated,
            // archived, stale active) against a fresh in-memory database and checks
            // the trade_product rows and the stored event head after each step.
   2556     fn ingest_listing_projects_trade_product_and_removes_archived_replacements() {
   2557         let exec = SqliteExecutor::open_memory().expect("db");
   2558         migrations::run_all_up(&exec).expect("migrations");
   2559 
   2560         let seller_pubkey = "c".repeat(64);
   2561         let listing_d_tag = "AAAAAAAAAAAAAAAAAAAAAQ";
                // Address string the projected rows are looked up by: "<kind>:<pubkey>:<d_tag>".
   2562         let listing_addr = format!("{}:{}:{}", KIND_LISTING, seller_pubkey, listing_d_tag);
   2563 
   2564         let mut active = listing_event(
   2565             500,
   2566             &seller_pubkey,
   2567             10,
   2568             listing_d_tag,
   2569             "active",
   2570             "Pasture Eggs",
   2571         );
                // Append a `radroots:discount` tag whose value is a JSON document; the
                // notes assertion further down checks that the projected row's notes
                // mention "listing_discounts".
   2572         active.tags.push(vec![
   2573             "radroots:discount".to_string(),
   2574             serde_json::json!({
   2575                 "scope": "bin",
   2576                 "threshold": {
   2577                     "kind": "bin_count",
   2578                     "amount": { "bin_id": "bin-a", "min": 1 }
   2579                 },
   2580                 "value": {
   2581                     "kind": "percent",
   2582                     "amount": { "value": "10" }
   2583                 }
   2584             })
   2585             .to_string(),
   2586         ]);
   2587         assert_eq!(
   2588             radroots_replica_ingest_event(&exec, &active).expect("active ingest"),
   2589             RadrootsReplicaIngestOutcome::Applied
   2590         );
   2591 
                // Step 1: the active listing is found by the search term "eggs" as exactly
                // one row. The expected quantity/price values come from whatever
                // `listing_event` emits by default (defined outside this view).
   2592         let replica = ReplicaSql::new(&exec);
   2593         let search_rows = replica
   2594             .trade_product_search(&["eggs".to_string()])
   2595             .expect("search");
   2596         assert_eq!(search_rows.len(), 1);
   2597         assert_eq!(
   2598             search_rows[0].listing_addr.as_deref(),
   2599             Some(listing_addr.as_str())
   2600         );
   2601         assert_eq!(search_rows[0].title, "Pasture Eggs");
   2602         assert_eq!(search_rows[0].primary_bin_id.as_deref(), Some("bin-a"));
   2603         assert_eq!(
   2604             search_rows[0].verified_primary_bin_id.as_deref(),
   2605             Some("bin-a")
   2606         );
   2607         assert_eq!(search_rows[0].qty_amt, 12.0);
   2608         assert_eq!(search_rows[0].qty_amt_exact.as_deref(), Some("12"));
   2609         assert_eq!(search_rows[0].qty_avail, Some(5));
   2610         assert_eq!(search_rows[0].price_amt, 6.0);
   2611         assert_eq!(search_rows[0].price_amt_exact.as_deref(), Some("6"));
   2612         assert_eq!(search_rows[0].price_currency, "USD");
   2613         assert_eq!(search_rows[0].price_qty_amt, 1.0);
   2614         assert_eq!(search_rows[0].price_qty_amt_exact.as_deref(), Some("1"));
   2615         assert!(
   2616             search_rows[0]
   2617                 .notes
   2618                 .as_deref()
   2619                 .is_some_and(|notes| notes.contains("listing_discounts"))
   2620         );
   2621 
                // Step 2: a second event for the same d_tag is applied and leaves a single
                // row for the address, now carrying the new title.
   2622         let updated = listing_event(
   2623             501,
   2624             &seller_pubkey,
   2625             11,
   2626             listing_d_tag,
   2627             "active",
   2628             "Market Eggs",
   2629         );
   2630         assert_eq!(
   2631             radroots_replica_ingest_event(&exec, &updated).expect("listing update"),
   2632             RadrootsReplicaIngestOutcome::Applied
   2633         );
   2634         let product_rows = trade_product::find_many(
   2635             &exec,
   2636             &ITradeProductFindMany {
   2637                 filter: Some(trade_product_listing_addr_filter(&listing_addr)),
   2638             },
   2639         )
   2640         .expect("product rows")
   2641         .results;
   2642         assert_eq!(product_rows.len(), 1);
   2643         assert_eq!(product_rows[0].title, "Market Eggs");
   2644         assert_eq!(product_rows[0].primary_bin_id.as_deref(), Some("bin-a"));
   2645         assert_eq!(
   2646             product_rows[0].verified_primary_bin_id.as_deref(),
   2647             Some("bin-a")
   2648         );
   2649 
                // Step 3: the "archived" event is applied; afterwards the search returns
                // nothing and no row remains for the listing address.
   2650         let archived = listing_event(
   2651             502,
   2652             &seller_pubkey,
   2653             12,
   2654             listing_d_tag,
   2655             "archived",
   2656             "Market Eggs",
   2657         );
   2658         assert_eq!(
   2659             radroots_replica_ingest_event(&exec, &archived).expect("archived ingest"),
   2660             RadrootsReplicaIngestOutcome::Applied
   2661         );
   2662         let search_rows = replica
   2663             .trade_product_search(&["eggs".to_string()])
   2664             .expect("search archived");
   2665         assert!(search_rows.is_empty());
   2666 
   2667         let product_rows = trade_product::find_many(
   2668             &exec,
   2669             &ITradeProductFindMany {
   2670                 filter: Some(trade_product_listing_addr_filter(&listing_addr)),
   2671             },
   2672         )
   2673         .expect("archived product rows")
   2674         .results;
   2675         assert!(product_rows.is_empty());
   2676 
                // The event head stored under the (KIND_LISTING, seller, d_tag) key records
                // the archived event's id.
   2677         let state = nostr_event_head::find_one(
   2678             &exec,
   2679             &INostrEventHeadFindOne::On(INostrEventHeadFindOneArgs {
   2680                 on: NostrEventHeadQueryBindValues::Key {
   2681                     key: event_head_key(KIND_LISTING, &seller_pubkey, listing_d_tag),
   2682                 },
   2683             }),
   2684         )
   2685         .expect("event state")
   2686         .result
   2687         .expect("state row");
   2688         assert_eq!(state.last_event_id, archived.id);
   2689 
                // Step 4: an "active" event whose third argument (11) is lower than the
                // archived event's (12) is skipped and must not recreate the row.
                // NOTE(review): the third argument is presumably created_at; confirm
                // against `listing_event`.
   2690         let stale_active = listing_event(
   2691             499,
   2692             &seller_pubkey,
   2693             11,
   2694             listing_d_tag,
   2695             "active",
   2696             "Stale Eggs",
   2697         );
   2698         assert_eq!(
   2699             radroots_replica_ingest_event(&exec, &stale_active).expect("stale ingest"),
   2700             RadrootsReplicaIngestOutcome::Skipped
   2701         );
   2702         let product_rows = trade_product::find_many(
   2703             &exec,
   2704             &ITradeProductFindMany {
   2705                 filter: Some(trade_product_listing_addr_filter(&listing_addr)),
   2706             },
   2707         )
   2708         .expect("stale product rows")
   2709         .results;
   2710         assert!(product_rows.is_empty());
   2711     }
   2712 
   2713     #[test]
            // Ingests a listing whose bin and price tags were rewritten with fractional
            // amounts and checks that both the float columns and the `*_exact` string
            // columns of the projected row carry those amounts.
   2714     fn ingest_listing_preserves_fractional_exact_economics() {
   2715         let exec = SqliteExecutor::open_memory().expect("db");
   2716         migrations::run_all_up(&exec).expect("migrations");
   2717 
   2718         let seller_pubkey = "c".repeat(64);
   2719         let listing_d_tag = "AAAAAAAAAAAAAAAAAAAAAg";
   2720         let listing_addr = format!("{}:{}:{}", KIND_LISTING, seller_pubkey, listing_d_tag);
   2721 
   2722         let mut active = listing_event(
   2723             600,
   2724             &seller_pubkey,
   2725             10,
   2726             listing_d_tag,
   2727             "active",
   2728             "Half Gram Greens",
   2729         );
                // Overwrite fields of the helper-generated `radroots:bin` and
                // `radroots:price` tags in place. The indexes are positional and rely on
                // the tag layout produced by `listing_event` (outside this view); direct
                // indexing panics if a matching tag is shorter than expected.
   2730         for tag in &mut active.tags {
   2731             if tag.first().is_some_and(|name| name == "radroots:bin") {
   2732                 tag[2] = "0.5".to_string();
   2733                 tag[3] = "g".to_string();
   2734                 tag[4] = "0.5".to_string();
   2735                 tag[5] = "g".to_string();
   2736                 tag[6] = "half gram".to_string();
   2737             }
   2738             if tag.first().is_some_and(|name| name == "radroots:price") {
   2739                 tag[2] = "3.25".to_string();
   2740                 tag[3] = "USD".to_string();
   2741                 tag[4] = "1".to_string();
   2742                 tag[5] = "g".to_string();
   2743                 tag[6] = "3.25".to_string();
   2744                 tag[7] = "g".to_string();
   2745             }
   2746         }
   2747 
   2748         assert_eq!(
   2749             radroots_replica_ingest_event(&exec, &active).expect("fractional active ingest"),
   2750             RadrootsReplicaIngestOutcome::Applied
   2751         );
   2752 
                // The row is found via the title word "greens"; each amount is checked
                // both as a float and as its exact decimal string.
   2753         let replica = ReplicaSql::new(&exec);
   2754         let search_rows = replica
   2755             .trade_product_search(&["greens".to_string()])
   2756             .expect("search");
   2757         assert_eq!(search_rows.len(), 1);
   2758         assert_eq!(
   2759             search_rows[0].listing_addr.as_deref(),
   2760             Some(listing_addr.as_str())
   2761         );
   2762         assert_eq!(search_rows[0].qty_amt, 0.5);
   2763         assert_eq!(search_rows[0].qty_amt_exact.as_deref(), Some("0.5"));
   2764         assert_eq!(search_rows[0].price_amt, 3.25);
   2765         assert_eq!(search_rows[0].price_amt_exact.as_deref(), Some("3.25"));
   2766         assert_eq!(search_rows[0].price_qty_amt, 1.0);
   2767         assert_eq!(search_rows[0].price_qty_amt_exact.as_deref(), Some("1"));
   2768     }
   2769 
   2770     #[test]
            // Checks that `upsert_farm_location` and `upsert_plot_location` return Ok
            // when called with `None` as the location.
   2771     fn upsert_location_none_paths_are_ok() {
   2772         let exec = SqliteExecutor::open_memory().expect("db");
   2773         migrations::run_all_up(&exec).expect("migrations");
   2774 
                // Create the farm row directly (not via event ingest), with every optional
                // column left unset.
   2775         let farm_row = farm::create(
   2776             &exec,
   2777             &IFarmFields {
   2778                 d_tag: "AAAAAAAAAAAAAAAAAAAAAA".to_string(),
   2779                 pubkey: "f".repeat(64),
   2780                 name: "farm-none".to_string(),
   2781                 about: None,
   2782                 website: None,
   2783                 picture: None,
   2784                 banner: None,
   2785                 location_primary: None,
   2786                 location_city: None,
   2787                 location_region: None,
   2788                 location_country: None,
   2789             },
   2790         )
   2791         .expect("farm")
   2792         .result;
                // The plot row references the farm row created above through `farm_id`.
   2793         let plot_row = plot::create(
   2794             &exec,
   2795             &IPlotFields {
   2796                 d_tag: "AAAAAAAAAAAAAAAAAAAAAQ".to_string(),
   2797                 farm_id: farm_row.id.clone(),
   2798                 name: "plot-none".to_string(),
   2799                 about: None,
   2800                 location_primary: None,
   2801                 location_city: None,
   2802                 location_region: None,
   2803                 location_country: None,
   2804             },
   2805         )
   2806         .expect("plot")
   2807         .result;
   2808 
                // Only success is asserted (via `expect`); the returned values are discarded.
   2809         let _ = upsert_farm_location(&exec, &farm_row.id, None, &FixedFactory).expect("farm none");
   2810         let _ = upsert_plot_location(&exec, &plot_row.id, None, &FixedFactory).expect("plot none");
   2811     }
   2812 
   2813     #[test]
            // Exercises the upsert helpers through `DeleteErrorExecutor` wrappers around
            // the real executor. The first half configures each wrapper with
            // `SqlError::NotFound` and expects the helpers to return Ok; the second half
            // configures `SqlError::Internal` and expects them to return Err.
            // NOTE(review): `DeleteErrorExecutor` and `seed_rows` are defined outside
            // this view; the direct `exec` assertion below shows a DELETE against the
            // configured `table_name` yields the configured error.
   2814     fn ingest_delete_error_paths_are_covered() {
   2815         let exec = SqliteExecutor::open_memory().expect("db");
   2816         let (farm_id, _farm_pubkey, farm_d_tag, _plot_d_tag) = seed_rows(&exec);
   2817 
                // --- NotFound on delete: each helper still returns Ok ---
   2818         let not_found_farm_tags = DeleteErrorExecutor {
   2819             inner: &exec,
   2820             table_name: "farm_tag",
   2821             err: SqlError::NotFound("farm_tag".to_string()),
   2822         };
   2823         assert!(
   2824             upsert_farm_tags(
   2825                 &not_found_farm_tags,
   2826                 &farm_id,
   2827                 Some(vec!["next".to_string()])
   2828             )
   2829             .is_ok()
   2830         );
   2831 
   2832         let not_found_plot_tags = DeleteErrorExecutor {
   2833             inner: &exec,
   2834             table_name: "plot_tag",
   2835             err: SqlError::NotFound("plot_tag".to_string()),
   2836         };
                // Take the id of the first plot row returned by an unfiltered query.
   2837         let plot_id = plot::find_many(&exec, &IPlotFindMany { filter: None })
   2838             .expect("plots")
   2839             .results[0]
   2840             .id
   2841             .clone();
   2842         assert!(
   2843             upsert_plot_tags(
   2844                 &not_found_plot_tags,
   2845                 &plot_id,
   2846                 Some(vec!["next".to_string()])
   2847             )
   2848             .is_ok()
   2849         );
                // Same helper against the plain executor, with a whitespace-only tag in
                // the input.
   2850         assert!(
   2851             upsert_plot_tags(
   2852                 &exec,
   2853                 &plot_id,
   2854                 Some(vec!["next".to_string(), " ".to_string()])
   2855             )
   2856             .is_ok()
   2857         );
   2858 
   2859         let not_found_farm_locations = DeleteErrorExecutor {
   2860             inner: &exec,
   2861             table_name: "farm_gcs_location",
   2862             err: SqlError::NotFound("farm_gcs_location".to_string()),
   2863         };
   2864         assert!(
   2865             upsert_farm_location(
   2866                 &not_found_farm_locations,
   2867                 &farm_id,
   2868                 Some(RadrootsFarmLocation {
   2869                     primary: None,
   2870                     city: None,
   2871                     region: None,
   2872                     country: None,
   2873                     gcs: Some(sample_gcs(1.0, 2.0, "s4")),
   2874                 }),
   2875                 &FixedFactory,
   2876             )
   2877             .is_ok()
   2878         );
                // A location with strings but no `gcs` is accepted, and afterwards the
                // farm_gcs_location table holds no rows at all.
   2879         assert!(
   2880             upsert_farm_location(
   2881                 &exec,
   2882                 &farm_id,
   2883                 Some(RadrootsFarmLocation {
   2884                     primary: Some("manual".to_string()),
   2885                     city: Some("San Francisco".to_string()),
   2886                     region: Some("CA".to_string()),
   2887                     country: Some("US".to_string()),
   2888                     gcs: None,
   2889                 }),
   2890                 &FixedFactory,
   2891             )
   2892             .is_ok()
   2893         );
   2894         assert!(
   2895             farm_gcs_location::find_many(&exec, &IFarmGcsLocationFindMany { filter: None })
   2896                 .expect("farm locations after string-only upsert")
   2897                 .results
   2898                 .is_empty()
   2899         );
                // Upserting a gcs-only location through the plain executor succeeds again.
   2900         assert!(
   2901             upsert_farm_location(
   2902                 &exec,
   2903                 &farm_id,
   2904                 Some(RadrootsFarmLocation {
   2905                     primary: None,
   2906                     city: None,
   2907                     region: None,
   2908                     country: None,
   2909                     gcs: Some(sample_gcs(2.0, 3.0, "s6")),
   2910                 }),
   2911                 &FixedFactory,
   2912             )
   2913             .is_ok()
   2914         );
   2915 
   2916         let not_found_plot_locations = DeleteErrorExecutor {
   2917             inner: &exec,
   2918             table_name: "plot_gcs_location",
   2919             err: SqlError::NotFound("plot_gcs_location".to_string()),
   2920         };
   2921         assert!(
   2922             upsert_plot_location(
   2923                 &not_found_plot_locations,
   2924                 &plot_id,
   2925                 Some(RadrootsPlotLocation {
   2926                     primary: None,
   2927                     city: None,
   2928                     region: None,
   2929                     country: None,
   2930                     gcs: sample_gcs(1.1, 2.1, "s5"),
   2931                 }),
   2932                 &FixedFactory,
   2933             )
   2934             .is_ok()
   2935         );
   2936 
                // Store one members list set through the plain executor first; the same
                // list set is reused for the Internal-error case near the end.
   2937         let members_list_set =
   2938             farm_list_sets::farm_members_list_set(&farm_d_tag, vec!["7".repeat(64)])
   2939                 .expect("members");
   2940         assert!(
   2941             upsert_farm_members(&exec, &farm_id, ListSetRole::Members, &members_list_set).is_ok()
   2942         );
   2943         let not_found_members = DeleteErrorExecutor {
   2944             inner: &exec,
   2945             table_name: "farm_member",
   2946             err: SqlError::NotFound("farm_member".to_string()),
   2947         };
   2948         let not_found_members_list_set =
   2949             farm_list_sets::farm_members_list_set(&farm_d_tag, vec!["a".repeat(64)])
   2950                 .expect("not found members");
   2951         assert!(
   2952             upsert_farm_members(
   2953                 &not_found_members,
   2954                 &farm_id,
   2955                 ListSetRole::Members,
   2956                 &not_found_members_list_set,
   2957             )
   2958             .is_ok()
   2959         );
                // The same call with the `Plots` role is also expected to return Ok.
   2960         assert!(
   2961             upsert_farm_members(
   2962                 &not_found_members,
   2963                 &farm_id,
   2964                 ListSetRole::Plots,
   2965                 &not_found_members_list_set,
   2966             )
   2967             .is_ok()
   2968         );
   2969 
   2970         let member_claims =
   2971             farm_list_sets::member_of_farms_list_set(vec!["3".repeat(64)]).expect("claims");
   2972         assert!(upsert_member_claims(&exec, &"6".repeat(64), &member_claims).is_ok());
   2973         let not_found_claims = DeleteErrorExecutor {
   2974             inner: &exec,
   2975             table_name: "farm_member_claim",
   2976             err: SqlError::NotFound("farm_member_claim".to_string()),
   2977         };
   2978         let not_found_member_claims =
   2979             farm_list_sets::member_of_farms_list_set(vec!["2".repeat(64)]).expect("claims nf");
   2980         assert!(
   2981             upsert_member_claims(&not_found_claims, &"6".repeat(64), &not_found_member_claims)
   2982                 .is_ok()
   2983         );
                // Call the wrapper's remaining executor methods directly. The rollback
                // result and the DELETE against an unrelated table are deliberately
                // ignored; only the DELETE against the configured table is asserted to
                // fail with code "ERR_NOT_FOUND".
   2984         assert!(not_found_claims.begin().is_ok());
   2985         assert!(not_found_claims.commit().is_ok());
   2986         let _ = not_found_claims.rollback();
   2987         assert!(not_found_claims.query_raw("SELECT 1", "[]").is_ok());
   2988         assert_eq!(
   2989             not_found_claims
   2990                 .exec("DELETE FROM farm_member_claim WHERE id = 1", "[]")
   2991                 .expect_err("exec not found")
   2992                 .code(),
   2993             "ERR_NOT_FOUND"
   2994         );
   2995         let _ = not_found_claims.exec("DELETE FROM other_table WHERE id = 1", "[]");
   2996 
                // --- Internal error on delete: each helper must return Err ---
   2997         let internal_farm_tags = DeleteErrorExecutor {
   2998             inner: &exec,
   2999             table_name: "farm_tag",
   3000             err: SqlError::Internal,
   3001         };
   3002         assert!(
   3003             upsert_farm_tags(&internal_farm_tags, &farm_id, Some(vec!["x".to_string()])).is_err()
   3004         );
   3005 
   3006         let internal_plot_tags = DeleteErrorExecutor {
   3007             inner: &exec,
   3008             table_name: "plot_tag",
   3009             err: SqlError::Internal,
   3010         };
   3011         assert!(
   3012             upsert_plot_tags(&internal_plot_tags, &plot_id, Some(vec!["x".to_string()])).is_err()
   3013         );
   3014 
   3015         let internal_farm_locations = DeleteErrorExecutor {
   3016             inner: &exec,
   3017             table_name: "farm_gcs_location",
   3018             err: SqlError::Internal,
   3019         };
   3020         assert!(
   3021             upsert_farm_location(
   3022                 &internal_farm_locations,
   3023                 &farm_id,
   3024                 Some(RadrootsFarmLocation {
   3025                     primary: None,
   3026                     city: None,
   3027                     region: None,
   3028                     country: None,
   3029                     gcs: Some(sample_gcs(2.0, 3.0, "s6")),
   3030                 }),
   3031                 &FixedFactory,
   3032             )
   3033             .is_err()
   3034         );
   3035 
   3036         let internal_plot_locations = DeleteErrorExecutor {
   3037             inner: &exec,
   3038             table_name: "plot_gcs_location",
   3039             err: SqlError::Internal,
   3040         };
   3041         assert!(
   3042             upsert_plot_location(
   3043                 &internal_plot_locations,
   3044                 &plot_id,
   3045                 Some(RadrootsPlotLocation {
   3046                     primary: None,
   3047                     city: None,
   3048                     region: None,
   3049                     country: None,
   3050                     gcs: sample_gcs(2.1, 3.1, "s7"),
   3051                 }),
   3052                 &FixedFactory,
   3053             )
   3054             .is_err()
   3055         );
   3056 
   3057         let internal_members = DeleteErrorExecutor {
   3058             inner: &exec,
   3059             table_name: "farm_member",
   3060             err: SqlError::Internal,
   3061         };
   3062         assert!(
   3063             upsert_farm_members(
   3064                 &internal_members,
   3065                 &farm_id,
   3066                 ListSetRole::Members,
   3067                 &members_list_set,
   3068             )
   3069             .is_err()
   3070         );
   3071 
   3072         let internal_claims = DeleteErrorExecutor {
   3073             inner: &exec,
   3074             table_name: "farm_member_claim",
   3075             err: SqlError::Internal,
   3076         };
   3077         assert!(upsert_member_claims(&internal_claims, &"6".repeat(64), &member_claims).is_err());
   3078     }
   3079 
   3080     #[test]
   3081     fn create_gcs_location_error_mapping_helpers_are_covered() {
   3082         let point_json_err = serde_json::from_str::<Value>("{").expect_err("invalid json");
   3083         let point_err = map_gcs_point_serialize_error(point_json_err);
   3084         assert_eq!(point_err.to_string(), "replica_sync.data: gcs.point");
   3085 
   3086         let polygon_json_err = serde_json::from_str::<Value>("{").expect_err("invalid json");
   3087         let polygon_err = map_gcs_polygon_serialize_error(polygon_json_err);
   3088         assert_eq!(polygon_err.to_string(), "replica_sync.data: gcs.polygon");
   3089     }
   3090 
   3091     #[test]
   3092     fn ingest_pass_executor_and_parse_edge_paths_are_covered() {
   3093         let exec = SqliteExecutor::open_memory().expect("db");
   3094         migrations::run_all_up(&exec).expect("migrations");
   3095         let pass = PassExecutor { inner: &exec };
   3096 
   3097         let profile_pubkey = "9".repeat(64);
   3098         let farm_pubkey = "f".repeat(64);
   3099         let farm_d_tag = "AAAAAAAAAAAAAAAAAAAAAA";
   3100         let plot_d_tag = "AAAAAAAAAAAAAAAAAAAAAQ";
   3101 
   3102         let profile = profile_event(
   3103             500,
   3104             &profile_pubkey,
   3105             50,
   3106             Some(RadrootsProfileType::Individual),
   3107             "pass-profile",
   3108         );
   3109         assert_eq!(
   3110             radroots_replica_ingest_event_with_factory(&pass, &profile, &FixedFactory)
   3111                 .expect("profile ingest"),
   3112             RadrootsReplicaIngestOutcome::Applied
   3113         );
   3114         assert_eq!(
   3115             ingest_profile_event(&pass, &profile).expect("profile skip"),
   3116             RadrootsReplicaIngestOutcome::Skipped
   3117         );
   3118 
   3119         let farm = farm_event(
   3120             501,
   3121             &farm_pubkey,
   3122             51,
   3123             farm_d_tag,
   3124             "pass-farm",
   3125             Some(RadrootsFarmLocation {
   3126                 primary: Some("primary".to_string()),
   3127                 city: Some("city".to_string()),
   3128                 region: Some("region".to_string()),
   3129                 country: Some("country".to_string()),
   3130                 gcs: Some(sample_gcs(10.0, 20.0, "s0")),
   3131             }),
   3132             Some(vec!["coffee".to_string(), "coffee".to_string()]),
   3133         );
   3134         assert_eq!(
   3135             ingest_farm_event(&pass, &farm, &FixedFactory).expect("farm ingest"),
   3136             RadrootsReplicaIngestOutcome::Applied
   3137         );
   3138 
   3139         let plot = plot_event(
   3140             502,
   3141             &farm_pubkey,
   3142             52,
   3143             plot_d_tag,
   3144             RadrootsFarmRef {
   3145                 pubkey: farm_pubkey.clone(),
   3146                 d_tag: farm_d_tag.to_string(),
   3147             },
   3148             "pass-plot",
   3149             Some(RadrootsPlotLocation {
   3150                 primary: Some("plot".to_string()),
   3151                 city: None,
   3152                 region: None,
   3153                 country: None,
   3154                 gcs: sample_gcs(11.0, 21.0, "s1"),
   3155             }),
   3156             Some(vec!["orchard".to_string()]),
   3157         );
   3158         assert_eq!(
   3159             ingest_plot_event(&pass, &plot, &FixedFactory).expect("plot ingest"),
   3160             RadrootsReplicaIngestOutcome::Applied
   3161         );
   3162 
   3163         let members =
   3164             farm_list_sets::farm_members_list_set(farm_d_tag, vec!["6".repeat(64)]).expect("list");
   3165         let members_event = list_set_event(503, &farm_pubkey, 53, KIND_LIST_SET_GENERIC, &members);
   3166         assert_eq!(
   3167             ingest_list_set_event(&pass, &members_event).expect("members list set"),
   3168             RadrootsReplicaIngestOutcome::Applied
   3169         );
   3170 
   3171         let claims = farm_list_sets::member_of_farms_list_set(vec![farm_pubkey.clone()])
   3172             .expect("claims list set");
   3173         let claims_event = list_set_event(504, &profile_pubkey, 54, KIND_LIST_SET_GENERIC, &claims);
   3174         assert_eq!(
   3175             ingest_list_set_event(&pass, &claims_event).expect("claims list set"),
   3176             RadrootsReplicaIngestOutcome::Applied
   3177         );
   3178 
   3179         let farm_row = find_farm_by_ref(&pass, &farm_pubkey, farm_d_tag).expect("farm row");
   3180         let mixed_member_entries = RadrootsListSet {
   3181             d_tag: format!("farm:{farm_d_tag}:members"),
   3182             content: String::new(),
   3183             entries: vec![
   3184                 RadrootsListEntry {
   3185                     tag: "p".to_string(),
   3186                     values: Vec::new(),
   3187                 },
   3188                 RadrootsListEntry {
   3189                     tag: "p".to_string(),
   3190                     values: vec!["6".repeat(64)],
   3191                 },
   3192             ],
   3193             title: None,
   3194             description: None,
   3195             image: None,
   3196         };
   3197         assert!(
   3198             upsert_farm_members(
   3199                 &pass,
   3200                 &farm_row.id,
   3201                 ListSetRole::Members,
   3202                 &mixed_member_entries
   3203             )
   3204             .is_ok()
   3205         );
   3206         let mixed_claim_entries = RadrootsListSet {
   3207             d_tag: "member_of.farms".to_string(),
   3208             content: String::new(),
   3209             entries: vec![
   3210                 RadrootsListEntry {
   3211                     tag: "p".to_string(),
   3212                     values: Vec::new(),
   3213                 },
   3214                 RadrootsListEntry {
   3215                     tag: "p".to_string(),
   3216                     values: vec![farm_pubkey.clone()],
   3217                 },
   3218             ],
   3219             title: None,
   3220             description: None,
   3221             image: None,
   3222         };
   3223         assert!(upsert_member_claims(&pass, &profile_pubkey, &mixed_claim_entries).is_ok());
   3224         assert!(pass.begin().is_ok());
   3225         assert!(pass.rollback().is_ok());
   3226 
   3227         assert!(parse_farm_list_set_d_tag("coop:AAAAAAAAAAAAAAAAAAAAAA:members").is_none());
   3228         assert!(parse_farm_list_set_d_tag("farm:AAAAAAAAAAAAAAAAAAAAAA").is_none());
   3229         assert!(parse_farm_list_set_d_tag("farm:AAAAAAAAAAAAAAAAAAAAAA:members").is_some());
   3230     }
   3231 
   3232     #[test]
   3233     fn create_gcs_location_success_path_is_covered() {
   3234         let exec = SqliteExecutor::open_memory().expect("db");
   3235         migrations::run_all_up(&exec).expect("migrations");
   3236 
   3237         let id = create_gcs_location(&exec, sample_gcs(1.0, 2.0, "s0"), &FixedFactory)
   3238             .expect("create gcs");
   3239         assert!(!id.trim().is_empty());
   3240     }
   3241 
   3242     #[test]
   3243     fn ingest_default_factory_wrapper_paths_are_covered() {
   3244         let exec = SqliteExecutor::open_memory().expect("db");
   3245         migrations::run_all_up(&exec).expect("migrations");
   3246 
   3247         let farm_pubkey = "f".repeat(64);
   3248         let farm_d_tag = "AAAAAAAAAAAAAAAAAAAAAA";
   3249         let farm_create = farm_event(
   3250             600,
   3251             &farm_pubkey,
   3252             60,
   3253             farm_d_tag,
   3254             "wrapper-farm",
   3255             Some(RadrootsFarmLocation {
   3256                 primary: Some("primary".to_string()),
   3257                 city: None,
   3258                 region: None,
   3259                 country: None,
   3260                 gcs: Some(sample_gcs(10.0, 20.0, "s0")),
   3261             }),
   3262             Some(vec!["coffee".to_string()]),
   3263         );
   3264         assert_eq!(
   3265             radroots_replica_ingest_event(&exec, &farm_create).expect("farm create"),
   3266             RadrootsReplicaIngestOutcome::Applied
   3267         );
   3268 
   3269         let farm_update = farm_event(
   3270             601,
   3271             &farm_pubkey,
   3272             61,
   3273             farm_d_tag,
   3274             "wrapper-farm-updated",
   3275             None,
   3276             Some(vec!["market".to_string()]),
   3277         );
   3278         assert_eq!(
   3279             radroots_replica_ingest_event(&exec, &farm_update).expect("farm update"),
   3280             RadrootsReplicaIngestOutcome::Applied
   3281         );
   3282 
   3283         let plot_d_tag = "AAAAAAAAAAAAAAAAAAAAAQ";
   3284         let plot_create = plot_event(
   3285             602,
   3286             &farm_pubkey,
   3287             62,
   3288             plot_d_tag,
   3289             RadrootsFarmRef {
   3290                 pubkey: farm_pubkey.clone(),
   3291                 d_tag: farm_d_tag.to_string(),
   3292             },
   3293             "wrapper-plot",
   3294             Some(RadrootsPlotLocation {
   3295                 primary: Some("plot-primary".to_string()),
   3296                 city: None,
   3297                 region: None,
   3298                 country: None,
   3299                 gcs: sample_gcs(11.0, 21.0, "s1"),
   3300             }),
   3301             Some(vec!["orchard".to_string()]),
   3302         );
   3303         assert_eq!(
   3304             radroots_replica_ingest_event(&exec, &plot_create).expect("plot create"),
   3305             RadrootsReplicaIngestOutcome::Applied
   3306         );
   3307 
   3308         let plot_update = plot_event(
   3309             603,
   3310             &farm_pubkey,
   3311             63,
   3312             plot_d_tag,
   3313             RadrootsFarmRef {
   3314                 pubkey: farm_pubkey.clone(),
   3315                 d_tag: farm_d_tag.to_string(),
   3316             },
   3317             "wrapper-plot-updated",
   3318             None,
   3319             Some(vec!["updated".to_string()]),
   3320         );
   3321         assert_eq!(
   3322             radroots_replica_ingest_event(&exec, &plot_update).expect("plot update"),
   3323             RadrootsReplicaIngestOutcome::Applied
   3324         );
   3325     }
   3326 
   3327     #[test]
   3328     fn ingest_txn_executor_instantiation_error_paths_are_covered() {
   3329         let pass_db = SqliteExecutor::open_memory().expect("db");
   3330         migrations::run_all_up(&pass_db).expect("migrations");
   3331         let pass_txn = TxnExecutor {
   3332             inner: Some(&pass_db),
   3333             begin_err: None,
   3334             commit_err: None,
   3335             rollback_count: Arc::new(AtomicUsize::new(0)),
   3336         };
   3337 
   3338         let profile_pubkey = "9".repeat(64);
   3339         let profile_event_row = profile_event(
   3340             700,
   3341             &profile_pubkey,
   3342             70,
   3343             Some(RadrootsProfileType::Individual),
   3344             "txn-profile",
   3345         );
   3346         assert_eq!(
   3347             ingest_profile_event(&pass_txn, &profile_event_row).expect("txn profile"),
   3348             RadrootsReplicaIngestOutcome::Applied
   3349         );
   3350         let profile_decision =
   3351             event_head_decision(&pass_txn, &profile_event_row).expect("profile decision");
   3352         assert!(!profile_decision.apply);
   3353         assert!(radroots_replica_ingest_event_head(&pass_txn, &profile_event_row).is_ok());
   3354         assert_eq!(
   3355             radroots_replica_ingest_event_with_factory(
   3356                 &pass_txn,
   3357                 &profile_event_row,
   3358                 &FixedFactory
   3359             )
   3360             .expect("txn wrapper"),
   3361             RadrootsReplicaIngestOutcome::Skipped
   3362         );
   3363 
   3364         let farm_pubkey = "f".repeat(64);
   3365         let farm_d_tag = "AAAAAAAAAAAAAAAAAAAAAA";
   3366         let farm_event_row = farm_event(
   3367             701,
   3368             &farm_pubkey,
   3369             71,
   3370             farm_d_tag,
   3371             "txn-farm",
   3372             Some(RadrootsFarmLocation {
   3373                 primary: Some("primary".to_string()),
   3374                 city: None,
   3375                 region: None,
   3376                 country: None,
   3377                 gcs: Some(sample_gcs(12.0, 22.0, "s2")),
   3378             }),
   3379             Some(vec!["coffee".to_string()]),
   3380         );
   3381         assert_eq!(
   3382             ingest_farm_event(&pass_txn, &farm_event_row, &FixedFactory).expect("txn farm"),
   3383             RadrootsReplicaIngestOutcome::Applied
   3384         );
   3385 
   3386         let plot_event_row = plot_event(
   3387             702,
   3388             &farm_pubkey,
   3389             72,
   3390             "AAAAAAAAAAAAAAAAAAAAAQ",
   3391             RadrootsFarmRef {
   3392                 pubkey: farm_pubkey.clone(),
   3393                 d_tag: farm_d_tag.to_string(),
   3394             },
   3395             "txn-plot",
   3396             Some(RadrootsPlotLocation {
   3397                 primary: Some("primary".to_string()),
   3398                 city: None,
   3399                 region: None,
   3400                 country: None,
   3401                 gcs: sample_gcs(13.0, 23.0, "s3"),
   3402             }),
   3403             Some(vec!["orchard".to_string()]),
   3404         );
   3405         assert_eq!(
   3406             ingest_plot_event(&pass_txn, &plot_event_row, &FixedFactory).expect("txn plot"),
   3407             RadrootsReplicaIngestOutcome::Applied
   3408         );
   3409 
   3410         let farm_row = find_farm_by_ref(&pass_txn, &farm_pubkey, farm_d_tag).expect("find farm");
   3411         assert!(upsert_farm_tags(&pass_txn, &farm_row.id, Some(vec!["x".to_string()])).is_ok());
   3412         let plot_id = plot::find_many(&pass_db, &IPlotFindMany { filter: None })
   3413             .expect("plots")
   3414             .results[0]
   3415             .id
   3416             .clone();
   3417         assert!(upsert_plot_tags(&pass_txn, &plot_id, Some(vec!["y".to_string()])).is_ok());
   3418         assert!(clear_farm_locations(&pass_txn, &farm_row.id).is_ok());
   3419         assert!(clear_plot_locations(&pass_txn, &plot_id).is_ok());
   3420         assert!(
   3421             create_gcs_location(&pass_txn, sample_gcs(14.0, 24.0, "s4"), &FixedFactory).is_ok()
   3422         );
   3423         assert!(
   3424             upsert_farm_location(
   3425                 &pass_txn,
   3426                 &farm_row.id,
   3427                 Some(RadrootsFarmLocation {
   3428                     primary: Some("primary".to_string()),
   3429                     city: None,
   3430                     region: None,
   3431                     country: None,
   3432                     gcs: Some(sample_gcs(15.0, 25.0, "s5")),
   3433                 }),
   3434                 &FixedFactory,
   3435             )
   3436             .is_ok()
   3437         );
   3438         assert!(
   3439             upsert_plot_location(
   3440                 &pass_txn,
   3441                 &plot_id,
   3442                 Some(RadrootsPlotLocation {
   3443                     primary: Some("primary".to_string()),
   3444                     city: None,
   3445                     region: None,
   3446                     country: None,
   3447                     gcs: sample_gcs(16.0, 26.0, "s6"),
   3448                 }),
   3449                 &FixedFactory,
   3450             )
   3451             .is_ok()
   3452         );
   3453         let members_list =
   3454             farm_list_sets::farm_members_list_set(farm_d_tag, vec!["6".repeat(64)]).expect("list");
   3455         assert!(
   3456             upsert_farm_members(&pass_txn, &farm_row.id, ListSetRole::Members, &members_list)
   3457                 .is_ok()
   3458         );
   3459         let member_of_list =
   3460             farm_list_sets::member_of_farms_list_set(vec![farm_pubkey.clone()]).expect("member_of");
   3461         assert!(upsert_member_claims(&pass_txn, &"6".repeat(64), &member_of_list).is_ok());
   3462 
   3463         let rollback_count = Arc::new(AtomicUsize::new(0));
   3464         let txn = TxnExecutor {
   3465             inner: None,
   3466             begin_err: None,
   3467             commit_err: None,
   3468             rollback_count,
   3469         };
   3470 
   3471         assert!(ingest_profile_event(&txn, &profile_event_row).is_err());
   3472         assert!(event_head_decision(&txn, &profile_event_row).is_err());
   3473         assert!(radroots_replica_ingest_event_head(&txn, &profile_event_row).is_err());
   3474         assert!(
   3475             radroots_replica_ingest_event_with_factory(&txn, &profile_event_row, &FixedFactory)
   3476                 .is_err()
   3477         );
   3478 
   3479         assert!(ingest_farm_event(&txn, &farm_event_row, &FixedFactory).is_err());
   3480 
   3481         assert!(ingest_plot_event(&txn, &plot_event_row, &FixedFactory).is_err());
   3482 
   3483         assert!(find_farm_by_ref(&txn, &farm_pubkey, farm_d_tag).is_err());
   3484         assert!(upsert_farm_tags(&txn, "farm-id", Some(vec!["x".to_string()])).is_err());
   3485         assert!(upsert_plot_tags(&txn, "plot-id", Some(vec!["y".to_string()])).is_err());
   3486         assert!(clear_farm_locations(&txn, "farm-id").is_err());
   3487         assert!(clear_plot_locations(&txn, "plot-id").is_err());
   3488         assert!(create_gcs_location(&txn, sample_gcs(14.0, 24.0, "s4"), &FixedFactory).is_err());
   3489         assert!(
   3490             upsert_farm_location(
   3491                 &txn,
   3492                 "farm-id",
   3493                 Some(RadrootsFarmLocation {
   3494                     primary: Some("primary".to_string()),
   3495                     city: None,
   3496                     region: None,
   3497                     country: None,
   3498                     gcs: Some(sample_gcs(15.0, 25.0, "s5")),
   3499                 }),
   3500                 &FixedFactory,
   3501             )
   3502             .is_err()
   3503         );
   3504         assert!(
   3505             upsert_plot_location(
   3506                 &txn,
   3507                 "plot-id",
   3508                 Some(RadrootsPlotLocation {
   3509                     primary: Some("primary".to_string()),
   3510                     city: None,
   3511                     region: None,
   3512                     country: None,
   3513                     gcs: sample_gcs(16.0, 26.0, "s6"),
   3514                 }),
   3515                 &FixedFactory,
   3516             )
   3517             .is_err()
   3518         );
   3519         assert!(upsert_farm_members(&txn, "farm-id", ListSetRole::Members, &members_list).is_err());
   3520         assert!(upsert_member_claims(&txn, &"6".repeat(64), &member_of_list).is_err());
   3521     }
   3522 
   3523     #[test]
   3524     fn ingest_sqlite_queryfail_and_parser_edges_are_covered() {
   3525         let exec = SqliteExecutor::open_memory().expect("db");
   3526         migrations::run_all_up(&exec).expect("migrations");
   3527         let pass_through = QueryFailExecutor {
   3528             inner: &exec,
   3529             needle: "__missing__",
   3530             err: SqlError::Internal,
   3531         };
   3532         assert!(pass_through.query_raw("select 1", "[]").is_ok());
   3533         assert!(
   3534             pass_through
   3535                 .exec(
   3536                     "create table if not exists coverage_probe (id integer)",
   3537                     "[]"
   3538                 )
   3539                 .is_ok()
   3540         );
   3541         let _ = pass_through.begin();
   3542         let _ = pass_through.rollback();
   3543         let _ = pass_through.commit();
   3544 
   3545         let farm_pubkey = "f".repeat(64);
   3546         let farm_d_tag = "AAAAAAAAAAAAAAAAAAAAAA";
   3547         let plot_d_tag = "AAAAAAAAAAAAAAAAAAAAAQ";
   3548         let profile_pubkey = "9".repeat(64);
   3549 
   3550         let profile = profile_event(
   3551             800,
   3552             &profile_pubkey,
   3553             80,
   3554             Some(RadrootsProfileType::Individual),
   3555             "profile-base",
   3556         );
   3557         let mut profile_bad_content = profile.clone();
   3558         profile_bad_content.content = "{".to_string();
   3559         assert!(ingest_profile_event(&exec, &profile_bad_content).is_err());
   3560 
   3561         let profile_query_fail = QueryFailExecutor {
   3562             inner: &exec,
   3563             needle: "nostr_profile",
   3564             err: SqlError::Internal,
   3565         };
   3566         assert!(ingest_profile_event(&profile_query_fail, &profile).is_err());
   3567 
   3568         assert_eq!(
   3569             ingest_profile_event(&exec, &profile).expect("profile seed"),
   3570             RadrootsReplicaIngestOutcome::Applied
   3571         );
   3572         let profile_update = profile_event(
   3573             801,
   3574             &profile_pubkey,
   3575             81,
   3576             Some(RadrootsProfileType::Individual),
   3577             "profile-update",
   3578         );
   3579         let profile_update_fail = QueryFailExecutor {
   3580             inner: &exec,
   3581             needle: "update nostr_profile",
   3582             err: SqlError::Internal,
   3583         };
   3584         assert!(ingest_profile_event(&profile_update_fail, &profile_update).is_err());
   3585 
   3586         let profile_create_fail = QueryFailExecutor {
   3587             inner: &exec,
   3588             needle: "insert into nostr_profile",
   3589             err: SqlError::Internal,
   3590         };
   3591         let profile_new = profile_event(
   3592             802,
   3593             &"7".repeat(64),
   3594             82,
   3595             Some(RadrootsProfileType::Individual),
   3596             "profile-new",
   3597         );
   3598         assert!(ingest_profile_event(&profile_create_fail, &profile_new).is_err());
   3599 
   3600         let profile_state_fail = QueryFailExecutor {
   3601             inner: &exec,
   3602             needle: "nostr_event_head",
   3603             err: SqlError::Internal,
   3604         };
   3605         let profile_state_event = profile_event(
   3606             803,
   3607             &"c".repeat(64),
   3608             83,
   3609             Some(RadrootsProfileType::Individual),
   3610             "profile-state",
   3611         );
   3612         assert!(ingest_profile_event(&profile_state_fail, &profile_state_event).is_err());
   3613 
   3614         let farm_seed = farm_event(
   3615             810,
   3616             &farm_pubkey,
   3617             90,
   3618             farm_d_tag,
   3619             "farm-seed",
   3620             Some(RadrootsFarmLocation {
   3621                 primary: Some("primary".to_string()),
   3622                 city: Some("city".to_string()),
   3623                 region: Some("region".to_string()),
   3624                 country: Some("country".to_string()),
   3625                 gcs: Some(sample_gcs(10.0, 20.0, "s0")),
   3626             }),
   3627             Some(vec!["seed".to_string()]),
   3628         );
   3629         assert_eq!(
   3630             ingest_farm_event(&exec, &farm_seed, &FixedFactory).expect("farm seed"),
   3631             RadrootsReplicaIngestOutcome::Applied
   3632         );
   3633 
   3634         let mut farm_bad_content = farm_seed.clone();
   3635         farm_bad_content.content = "{".to_string();
   3636         assert!(ingest_farm_event(&exec, &farm_bad_content, &FixedFactory).is_err());
   3637 
   3638         let farm_query_fail = QueryFailExecutor {
   3639             inner: &exec,
   3640             needle: "from farm",
   3641             err: SqlError::Internal,
   3642         };
   3643         let farm_query_event = farm_event(
   3644             811,
   3645             &"a".repeat(64),
   3646             91,
   3647             farm_d_tag,
   3648             "farm-query",
   3649             None,
   3650             None,
   3651         );
   3652         assert!(ingest_farm_event(&farm_query_fail, &farm_query_event, &FixedFactory).is_err());
   3653 
   3654         let farm_update_fail = QueryFailExecutor {
   3655             inner: &exec,
   3656             needle: "update farm",
   3657             err: SqlError::Internal,
   3658         };
   3659         let farm_update = farm_event(
   3660             812,
   3661             &farm_pubkey,
   3662             92,
   3663             farm_d_tag,
   3664             "farm-update",
   3665             None,
   3666             Some(vec!["u".to_string()]),
   3667         );
   3668         assert!(ingest_farm_event(&farm_update_fail, &farm_update, &FixedFactory).is_err());
   3669 
   3670         let farm_create_fail = QueryFailExecutor {
   3671             inner: &exec,
   3672             needle: "insert into farm",
   3673             err: SqlError::Internal,
   3674         };
   3675         let farm_create = farm_event(
   3676             813,
   3677             &"c".repeat(64),
   3678             93,
   3679             farm_d_tag,
   3680             "farm-create",
   3681             None,
   3682             None,
   3683         );
   3684         assert!(ingest_farm_event(&farm_create_fail, &farm_create, &FixedFactory).is_err());
   3685 
   3686         let farm_tag_fail = QueryFailExecutor {
   3687             inner: &exec,
   3688             needle: "farm_tag",
   3689             err: SqlError::Internal,
   3690         };
   3691         let farm_tag_event = farm_event(
   3692             814,
   3693             &"d".repeat(64),
   3694             94,
   3695             farm_d_tag,
   3696             "farm-tag",
   3697             None,
   3698             Some(vec!["coffee".to_string()]),
   3699         );
   3700         assert!(ingest_farm_event(&farm_tag_fail, &farm_tag_event, &FixedFactory).is_err());
   3701 
   3702         let farm_gcs_fail = QueryFailExecutor {
   3703             inner: &exec,
   3704             needle: "gcs_location",
   3705             err: SqlError::Internal,
   3706         };
   3707         let farm_gcs_event = farm_event(
   3708             815,
   3709             &"0".repeat(64),
   3710             95,
   3711             farm_d_tag,
   3712             "farm-gcs",
   3713             Some(RadrootsFarmLocation {
   3714                 primary: Some("primary".to_string()),
   3715                 city: None,
   3716                 region: None,
   3717                 country: None,
   3718                 gcs: Some(sample_gcs(11.0, 21.0, "s1")),
   3719             }),
   3720             None,
   3721         );
   3722         assert!(ingest_farm_event(&farm_gcs_fail, &farm_gcs_event, &FixedFactory).is_err());
   3723 
   3724         let farm_rel_fail = QueryFailExecutor {
   3725             inner: &exec,
   3726             needle: "farm_gcs_location",
   3727             err: SqlError::Internal,
   3728         };
   3729         let farm_rel_event = farm_event(
   3730             816,
   3731             &"b".repeat(64),
   3732             96,
   3733             farm_d_tag,
   3734             "farm-rel",
   3735             Some(RadrootsFarmLocation {
   3736                 primary: Some("primary".to_string()),
   3737                 city: None,
   3738                 region: None,
   3739                 country: None,
   3740                 gcs: Some(sample_gcs(12.0, 22.0, "s2")),
   3741             }),
   3742             None,
   3743         );
   3744         assert!(ingest_farm_event(&farm_rel_fail, &farm_rel_event, &FixedFactory).is_err());
   3745 
   3746         let farm_state_fail = QueryFailExecutor {
   3747             inner: &exec,
   3748             needle: "nostr_event_head",
   3749             err: SqlError::Internal,
   3750         };
   3751         let farm_state_event = farm_event(
   3752             817,
   3753             &"0".repeat(64),
   3754             97,
   3755             farm_d_tag,
   3756             "farm-state",
   3757             None,
   3758             None,
   3759         );
   3760         assert!(ingest_farm_event(&farm_state_fail, &farm_state_event, &FixedFactory).is_err());
   3761 
   3762         let mut bad_point = sample_gcs(13.0, 23.0, "s3");
   3763         bad_point.point.coordinates = [f64::NAN, 13.0];
   3764         let farm_bad_point = farm_event(
   3765             818,
   3766             &"1".repeat(64),
   3767             98,
   3768             farm_d_tag,
   3769             "farm-bad-point",
   3770             Some(RadrootsFarmLocation {
   3771                 primary: Some("primary".to_string()),
   3772                 city: None,
   3773                 region: None,
   3774                 country: None,
   3775                 gcs: Some(bad_point),
   3776             }),
   3777             None,
   3778         );
   3779         assert!(ingest_farm_event(&exec, &farm_bad_point, &FixedFactory).is_err());
   3780 
   3781         let mut bad_polygon = sample_gcs(14.0, 24.0, "s4");
   3782         bad_polygon.polygon.coordinates[0][1][0] = f64::NAN;
   3783         let farm_bad_polygon = farm_event(
   3784             819,
   3785             &"2".repeat(64),
   3786             99,
   3787             farm_d_tag,
   3788             "farm-bad-polygon",
   3789             Some(RadrootsFarmLocation {
   3790                 primary: Some("primary".to_string()),
   3791                 city: None,
   3792                 region: None,
   3793                 country: None,
   3794                 gcs: Some(bad_polygon),
   3795             }),
   3796             None,
   3797         );
   3798         assert!(ingest_farm_event(&exec, &farm_bad_polygon, &FixedFactory).is_err());
   3799 
   3800         let plot_seed = plot_event(
   3801             820,
   3802             &farm_pubkey,
   3803             100,
   3804             plot_d_tag,
   3805             RadrootsFarmRef {
   3806                 pubkey: farm_pubkey.clone(),
   3807                 d_tag: farm_d_tag.to_string(),
   3808             },
   3809             "plot-seed",
   3810             Some(RadrootsPlotLocation {
   3811                 primary: Some("primary".to_string()),
   3812                 city: None,
   3813                 region: None,
   3814                 country: None,
   3815                 gcs: sample_gcs(15.0, 25.0, "s5"),
   3816             }),
   3817             Some(vec!["orchard".to_string()]),
   3818         );
   3819         assert_eq!(
   3820             ingest_plot_event(&exec, &plot_seed, &FixedFactory).expect("plot seed"),
   3821             RadrootsReplicaIngestOutcome::Applied
   3822         );
   3823 
   3824         let mut plot_bad_content = plot_seed.clone();
   3825         plot_bad_content.content = "{".to_string();
   3826         assert!(ingest_plot_event(&exec, &plot_bad_content, &FixedFactory).is_err());
   3827 
   3828         let plot_query_fail = QueryFailExecutor {
   3829             inner: &exec,
   3830             needle: "from plot",
   3831             err: SqlError::Internal,
   3832         };
   3833         let plot_query = plot_event(
   3834             821,
   3835             &farm_pubkey,
   3836             101,
   3837             plot_d_tag,
   3838             RadrootsFarmRef {
   3839                 pubkey: farm_pubkey.clone(),
   3840                 d_tag: farm_d_tag.to_string(),
   3841             },
   3842             "plot-query",
   3843             None,
   3844             None,
   3845         );
   3846         assert!(ingest_plot_event(&plot_query_fail, &plot_query, &FixedFactory).is_err());
   3847 
   3848         let plot_update_fail = QueryFailExecutor {
   3849             inner: &exec,
   3850             needle: "update plot",
   3851             err: SqlError::Internal,
   3852         };
   3853         let plot_update = plot_event(
   3854             822,
   3855             &farm_pubkey,
   3856             102,
   3857             plot_d_tag,
   3858             RadrootsFarmRef {
   3859                 pubkey: farm_pubkey.clone(),
   3860                 d_tag: farm_d_tag.to_string(),
   3861             },
   3862             "plot-update",
   3863             None,
   3864             Some(vec!["u".to_string()]),
   3865         );
   3866         assert!(ingest_plot_event(&plot_update_fail, &plot_update, &FixedFactory).is_err());
   3867 
   3868         let plot_create_fail = QueryFailExecutor {
   3869             inner: &exec,
   3870             needle: "insert into plot",
   3871             err: SqlError::Internal,
   3872         };
   3873         let plot_create = plot_event(
   3874             823,
   3875             &farm_pubkey,
   3876             103,
   3877             "AAAAAAAAAAAAAAAAAAAAAg",
   3878             RadrootsFarmRef {
   3879                 pubkey: farm_pubkey.clone(),
   3880                 d_tag: farm_d_tag.to_string(),
   3881             },
   3882             "plot-create",
   3883             None,
   3884             None,
   3885         );
   3886         assert!(ingest_plot_event(&plot_create_fail, &plot_create, &FixedFactory).is_err());
   3887 
   3888         let plot_tag_fail = QueryFailExecutor {
   3889             inner: &exec,
   3890             needle: "plot_tag",
   3891             err: SqlError::Internal,
   3892         };
   3893         let plot_tag_event = plot_event(
   3894             824,
   3895             &farm_pubkey,
   3896             104,
   3897             "AAAAAAAAAAAAAAAAAAAAAw",
   3898             RadrootsFarmRef {
   3899                 pubkey: farm_pubkey.clone(),
   3900                 d_tag: farm_d_tag.to_string(),
   3901             },
   3902             "plot-tag",
   3903             None,
   3904             Some(vec!["tag".to_string()]),
   3905         );
   3906         assert!(ingest_plot_event(&plot_tag_fail, &plot_tag_event, &FixedFactory).is_err());
   3907 
   3908         let plot_gcs_fail = QueryFailExecutor {
   3909             inner: &exec,
   3910             needle: "gcs_location",
   3911             err: SqlError::Internal,
   3912         };
   3913         let plot_gcs_event = plot_event(
   3914             825,
   3915             &farm_pubkey,
   3916             105,
   3917             "AAAAAAAAAAAAAAAAAAAAAw",
   3918             RadrootsFarmRef {
   3919                 pubkey: farm_pubkey.clone(),
   3920                 d_tag: farm_d_tag.to_string(),
   3921             },
   3922             "plot-gcs",
   3923             Some(RadrootsPlotLocation {
   3924                 primary: Some("primary".to_string()),
   3925                 city: None,
   3926                 region: None,
   3927                 country: None,
   3928                 gcs: sample_gcs(16.0, 26.0, "s6"),
   3929             }),
   3930             None,
   3931         );
   3932         assert!(ingest_plot_event(&plot_gcs_fail, &plot_gcs_event, &FixedFactory).is_err());
   3933 
   3934         let plot_rel_fail = QueryFailExecutor {
   3935             inner: &exec,
   3936             needle: "plot_gcs_location",
   3937             err: SqlError::Internal,
   3938         };
   3939         let plot_rel_event = plot_event(
   3940             826,
   3941             &farm_pubkey,
   3942             106,
   3943             "AAAAAAAAAAAAAAAAAAAAAw",
   3944             RadrootsFarmRef {
   3945                 pubkey: farm_pubkey.clone(),
   3946                 d_tag: farm_d_tag.to_string(),
   3947             },
   3948             "plot-rel",
   3949             Some(RadrootsPlotLocation {
   3950                 primary: Some("primary".to_string()),
   3951                 city: None,
   3952                 region: None,
   3953                 country: None,
   3954                 gcs: sample_gcs(17.0, 27.0, "s7"),
   3955             }),
   3956             None,
   3957         );
   3958         assert!(ingest_plot_event(&plot_rel_fail, &plot_rel_event, &FixedFactory).is_err());
   3959 
   3960         let plot_state_fail = QueryFailExecutor {
   3961             inner: &exec,
   3962             needle: "nostr_event_head",
   3963             err: SqlError::Internal,
   3964         };
   3965         let plot_state_event = plot_event(
   3966             827,
   3967             &farm_pubkey,
   3968             107,
   3969             "AAAAAAAAAAAAAAAAAAAAAw",
   3970             RadrootsFarmRef {
   3971                 pubkey: farm_pubkey.clone(),
   3972                 d_tag: farm_d_tag.to_string(),
   3973             },
   3974             "plot-state",
   3975             None,
   3976             None,
   3977         );
   3978         assert!(ingest_plot_event(&plot_state_fail, &plot_state_event, &FixedFactory).is_err());
   3979 
   3980         let mut list_decode_fail = profile_event(
   3981             830,
   3982             &farm_pubkey,
   3983             108,
   3984             Some(RadrootsProfileType::Farm),
   3985             "unused",
   3986         );
   3987         list_decode_fail.kind = KIND_LIST_SET_GENERIC;
   3988         list_decode_fail.content = "{".to_string();
   3989         list_decode_fail.tags = Vec::new();
   3990         assert!(ingest_list_set_event(&exec, &list_decode_fail).is_err());
   3991 
   3992         let members_list = farm_list_sets::farm_members_list_set(farm_d_tag, vec!["6".repeat(64)])
   3993             .expect("members list");
   3994         let member_event =
   3995             list_set_event(831, &farm_pubkey, 109, KIND_LIST_SET_GENERIC, &members_list);
   3996         let list_decision_fail = QueryFailExecutor {
   3997             inner: &exec,
   3998             needle: "nostr_event_head",
   3999             err: SqlError::Internal,
   4000         };
   4001         assert!(ingest_list_set_event(&list_decision_fail, &member_event).is_err());
   4002 
   4003         let member_of =
   4004             farm_list_sets::member_of_farms_list_set(vec![farm_pubkey.clone()]).expect("member-of");
   4005         let member_of_event =
   4006             list_set_event(832, &"6".repeat(64), 110, KIND_LIST_SET_GENERIC, &member_of);
   4007         let claims_fail = QueryFailExecutor {
   4008             inner: &exec,
   4009             needle: "farm_member_claim",
   4010             err: SqlError::Internal,
   4011         };
   4012         assert!(ingest_list_set_event(&claims_fail, &member_of_event).is_err());
   4013 
   4014         let claims_state_fail = QueryFailExecutor {
   4015             inner: &exec,
   4016             needle: "nostr_event_head",
   4017             err: SqlError::Internal,
   4018         };
   4019         assert!(ingest_list_set_event(&claims_state_fail, &member_of_event).is_err());
   4020 
   4021         let plots_list = farm_list_sets::farm_plots_list_set(
   4022             farm_d_tag,
   4023             &farm_pubkey,
   4024             vec![plot_d_tag.to_string()],
   4025         )
   4026         .expect("plots list");
   4027         let plots_event =
   4028             list_set_event(833, &farm_pubkey, 111, KIND_LIST_SET_GENERIC, &plots_list);
   4029         let plots_state_fail = QueryFailExecutor {
   4030             inner: &exec,
   4031             needle: "nostr_event_head",
   4032             err: SqlError::Internal,
   4033         };
   4034         assert!(ingest_list_set_event(&plots_state_fail, &plots_event).is_err());
   4035 
   4036         let missing_farm_members =
   4037             farm_list_sets::farm_members_list_set(farm_d_tag, vec!["7".repeat(64)]).expect("list");
   4038         let missing_farm_event = list_set_event(
   4039             834,
   4040             &"3".repeat(64),
   4041             112,
   4042             KIND_LIST_SET_GENERIC,
   4043             &missing_farm_members,
   4044         );
   4045         assert!(ingest_list_set_event(&exec, &missing_farm_event).is_err());
   4046 
   4047         let members_create_fail = QueryFailExecutor {
   4048             inner: &exec,
   4049             needle: "farm_member",
   4050             err: SqlError::Internal,
   4051         };
   4052         assert!(ingest_list_set_event(&members_create_fail, &member_event).is_err());
   4053 
   4054         let members_state_fail = QueryFailExecutor {
   4055             inner: &exec,
   4056             needle: "nostr_event_head",
   4057             err: SqlError::Internal,
   4058         };
   4059         assert!(ingest_list_set_event(&members_state_fail, &member_event).is_err());
   4060 
   4061         assert!(parse_farm_list_set_d_tag("").is_none());
   4062         assert!(parse_farm_list_set_d_tag("farm").is_none());
   4063 
   4064         let state_create_fail = QueryFailExecutor {
   4065             inner: &exec,
   4066             needle: "nostr_event_head",
   4067             err: SqlError::Internal,
   4068         };
   4069         assert!(radroots_replica_ingest_event_head(&state_create_fail, &profile).is_err());
   4070 
   4071         radroots_replica_ingest_event_head(&exec, &profile).expect("seed state");
   4072         let state_update_fail = QueryFailExecutor {
   4073             inner: &exec,
   4074             needle: "update nostr_event_head",
   4075             err: SqlError::Internal,
   4076         };
   4077         let profile_update = profile_event(
   4078             808,
   4079             &"9".repeat(64),
   4080             101,
   4081             Some(RadrootsProfileType::Individual),
   4082             "profile-update-error",
   4083         );
   4084         assert!(radroots_replica_ingest_event_head(&state_update_fail, &profile_update).is_err());
   4085     }
   4086 
   4087     #[test]
            /// Drives the ingest and upsert helpers through their insert-failure paths.
            ///
            /// One in-memory `SqliteExecutor` is seeded via `seed_rows`, then wrapped in
            /// `QueryFailExecutor` values whose `needle` is always an `insert into …`
            /// fragment and whose `err` is `SqlError::Internal`. Every `assert!` in this
            /// test expects `Err` (or `None` for the final d_tag parse).
            ///
            /// NOTE(review): `QueryFailExecutor` is defined outside this chunk; presumably
            /// it returns `err` for SQL text containing `needle` — confirm.
   4088     fn ingest_insert_and_state_error_branches_are_covered() {
   4089         let exec = SqliteExecutor::open_memory().expect("db");
   4090         let (farm_id, farm_pubkey, farm_d_tag, plot_d_tag) = seed_rows(&exec);
   4091 
   4092         let profile = profile_event(
   4093             900,
   4094             &"e".repeat(64),
   4095             120,
   4096             Some(RadrootsProfileType::Individual),
   4097             "profile-state-insert",
   4098         );
                // This single wrapper is reused for the profile, farm, plot and all three
                // list-set ingest calls that follow.
   4099         let state_insert_fail = QueryFailExecutor {
   4100             inner: &exec,
   4101             needle: "insert into nostr_event_head",
   4102             err: SqlError::Internal,
   4103         };
   4104         assert!(ingest_profile_event(&state_insert_fail, &profile).is_err());
   4105 
   4106         let farm_state = farm_event(
   4107             901,
   4108             &"a".repeat(64),
   4109             121,
   4110             "AAAAAAAAAAAAAAAAAAAAAQ",
   4111             "farm-state-insert",
   4112             None,
   4113             None,
   4114         );
   4115         assert!(ingest_farm_event(&state_insert_fail, &farm_state, &FixedFactory).is_err());
   4116 
   4117         let plot_state = plot_event(
   4118             902,
   4119             &farm_pubkey,
   4120             122,
   4121             "AAAAAAAAAAAAAAAAAAAAAg",
   4122             RadrootsFarmRef {
   4123                 pubkey: farm_pubkey.clone(),
   4124                 d_tag: farm_d_tag.clone(),
   4125             },
   4126             "plot-state-insert",
   4127             None,
   4128             None,
   4129         );
   4130         assert!(ingest_plot_event(&state_insert_fail, &plot_state, &FixedFactory).is_err());
   4131 
                // `members_set` is kept around: it is passed again to `upsert_farm_members`
                // near the end of this test.
   4132         let members_set = farm_list_sets::farm_members_list_set(&farm_d_tag, vec!["7".repeat(64)])
   4133             .expect("members");
   4134         let members_event =
   4135             list_set_event(903, &farm_pubkey, 123, KIND_LIST_SET_GENERIC, &members_set);
   4136         assert!(ingest_list_set_event(&state_insert_fail, &members_event).is_err());
   4137 
   4138         let plots_set = farm_list_sets::farm_plots_list_set(
   4139             &farm_d_tag,
   4140             &farm_pubkey,
   4141             vec![plot_d_tag.clone()],
   4142         )
   4143         .expect("plots");
   4144         let plots_event = list_set_event(904, &farm_pubkey, 124, KIND_LIST_SET_GENERIC, &plots_set);
   4145         assert!(ingest_list_set_event(&state_insert_fail, &plots_event).is_err());
   4146 
                // `member_of_set` is likewise reused by the `upsert_member_claims` check below.
   4147         let member_of_set =
   4148             farm_list_sets::member_of_farms_list_set(vec![farm_pubkey.clone()]).expect("member_of");
   4149         let member_of_event = list_set_event(
   4150             905,
   4151             &"7".repeat(64),
   4152             125,
   4153             KIND_LIST_SET_GENERIC,
   4154             &member_of_set,
   4155         );
   4156         assert!(ingest_list_set_event(&state_insert_fail, &member_of_event).is_err());
   4157 
                // Same needle as `state_insert_fail`, but applied to the event-head ingest
                // entry point directly rather than through a kind-specific ingest function.
   4158         let state_insert_only_fail = QueryFailExecutor {
   4159             inner: &exec,
   4160             needle: "insert into nostr_event_head",
   4161             err: SqlError::Internal,
   4162         };
   4163         assert!(radroots_replica_ingest_event_head(&state_insert_only_fail, &profile).is_err());
   4164 
                // NOTE(review): presumably arms a one-shot failure for the next content-hash
                // computation, which the following call on the plain executor then hits —
                // confirm in `crate::event_head`.
   4165         crate::event_head::event_content_hash_fail_next();
   4166         assert!(event_head_decision(&exec, &profile).is_err());
   4167 
   4168         let farm_tag_insert_fail = QueryFailExecutor {
   4169             inner: &exec,
   4170             needle: "insert into farm_tag",
   4171             err: SqlError::Internal,
   4172         };
   4173         assert!(
   4174             upsert_farm_tags(
   4175                 &farm_tag_insert_fail,
   4176                 &farm_id,
   4177                 Some(vec!["delta".to_string()])
   4178             )
   4179             .is_err()
   4180         );
   4181 
                // Uses the id of the first plot row returned by an unfiltered query. The
                // `[0]` index panics on an empty result, so this assumes `seed_rows` created
                // at least one plot (seed_rows is outside this chunk — verify).
   4182         let plot_id = plot::find_many(&exec, &IPlotFindMany { filter: None })
   4183             .expect("plots")
   4184             .results[0]
   4185             .id
   4186             .clone();
   4187         let plot_tag_insert_fail = QueryFailExecutor {
   4188             inner: &exec,
   4189             needle: "insert into plot_tag",
   4190             err: SqlError::Internal,
   4191         };
   4192         assert!(
   4193             upsert_plot_tags(
   4194                 &plot_tag_insert_fail,
   4195                 &plot_id,
   4196                 Some(vec!["epsilon".to_string()])
   4197             )
   4198             .is_err()
   4199         );
   4200 
                // Farm location: first fail the `gcs_location` insert, then the
                // `farm_gcs_location` insert, each with its own `sample_gcs` input.
   4201         let farm_gcs_insert_fail = QueryFailExecutor {
   4202             inner: &exec,
   4203             needle: "insert into gcs_location",
   4204             err: SqlError::Internal,
   4205         };
   4206         assert!(
   4207             upsert_farm_location(
   4208                 &farm_gcs_insert_fail,
   4209                 &farm_id,
   4210                 Some(RadrootsFarmLocation {
   4211                     primary: Some("primary".to_string()),
   4212                     city: None,
   4213                     region: None,
   4214                     country: None,
   4215                     gcs: Some(sample_gcs(31.0, 41.0, "s8")),
   4216                 }),
   4217                 &FixedFactory,
   4218             )
   4219             .is_err()
   4220         );
   4221 
   4222         let farm_rel_insert_fail = QueryFailExecutor {
   4223             inner: &exec,
   4224             needle: "insert into farm_gcs_location",
   4225             err: SqlError::Internal,
   4226         };
   4227         assert!(
   4228             upsert_farm_location(
   4229                 &farm_rel_insert_fail,
   4230                 &farm_id,
   4231                 Some(RadrootsFarmLocation {
   4232                     primary: Some("primary".to_string()),
   4233                     city: None,
   4234                     region: None,
   4235                     country: None,
   4236                     gcs: Some(sample_gcs(32.0, 42.0, "s9")),
   4237                 }),
   4238                 &FixedFactory,
   4239             )
   4240             .is_err()
   4241         );
   4242 
                // Plot location: the same two-step pattern with `gcs_location` and
                // `plot_gcs_location`. Here `gcs` is passed directly, not wrapped in `Some`.
   4243         let plot_gcs_insert_fail = QueryFailExecutor {
   4244             inner: &exec,
   4245             needle: "insert into gcs_location",
   4246             err: SqlError::Internal,
   4247         };
   4248         assert!(
   4249             upsert_plot_location(
   4250                 &plot_gcs_insert_fail,
   4251                 &plot_id,
   4252                 Some(RadrootsPlotLocation {
   4253                     primary: Some("primary".to_string()),
   4254                     city: None,
   4255                     region: None,
   4256                     country: None,
   4257                     gcs: sample_gcs(33.0, 43.0, "sa"),
   4258                 }),
   4259                 &FixedFactory,
   4260             )
   4261             .is_err()
   4262         );
   4263 
   4264         let plot_rel_insert_fail = QueryFailExecutor {
   4265             inner: &exec,
   4266             needle: "insert into plot_gcs_location",
   4267             err: SqlError::Internal,
   4268         };
   4269         assert!(
   4270             upsert_plot_location(
   4271                 &plot_rel_insert_fail,
   4272                 &plot_id,
   4273                 Some(RadrootsPlotLocation {
   4274                     primary: Some("primary".to_string()),
   4275                     city: None,
   4276                     region: None,
   4277                     country: None,
   4278                     gcs: sample_gcs(34.0, 44.0, "sb"),
   4279                 }),
   4280                 &FixedFactory,
   4281             )
   4282             .is_err()
   4283         );
   4284 
   4285         let member_insert_fail = QueryFailExecutor {
   4286             inner: &exec,
   4287             needle: "insert into farm_member",
   4288             err: SqlError::Internal,
   4289         };
   4290         assert!(
   4291             upsert_farm_members(
   4292                 &member_insert_fail,
   4293                 &farm_id,
   4294                 ListSetRole::Members,
   4295                 &members_set
   4296             )
   4297             .is_err()
   4298         );
   4299 
   4300         let claims_insert_fail = QueryFailExecutor {
   4301             inner: &exec,
   4302             needle: "insert into farm_member_claim",
   4303             err: SqlError::Internal,
   4304         };
   4305         assert!(
   4306             upsert_member_claims(&claims_insert_fail, &"7".repeat(64), &member_of_set).is_err()
   4307         );
   4308 
                // NOTE(review): the failpoint setters presumably force the next point /
                // polygon serialization inside `create_gcs_location` to fail; each is set
                // immediately before the call it is meant to affect — confirm in `failpoints`.
   4309         super::failpoints::set_gcs_point_serialize_error();
   4310         assert!(create_gcs_location(&exec, sample_gcs(35.0, 45.0, "sc"), &FixedFactory).is_err());
   4311 
   4312         super::failpoints::set_gcs_polygon_serialize_error();
   4313         assert!(create_gcs_location(&exec, sample_gcs(36.0, 46.0, "sd"), &FixedFactory).is_err());
   4314 
                // A d_tag beginning with `coop:` must not parse as a farm list-set d_tag.
   4315         assert!(parse_farm_list_set_d_tag("coop:AAAAAAAAAAAAAAAAAAAAAA:members").is_none());
   4316     }
   4317 
   4318     #[test]
            /// Checks that `upsert_farm_members` and `upsert_member_claims` each leave
            /// exactly one matching row when handed three `p` entries: one with no
            /// values, one with the pubkey followed by an extra value, and one with the
            /// pubkey alone.
   4319     fn upsert_member_helpers_ignore_empty_entry_values() {
   4320         let exec = SqliteExecutor::open_memory().expect("db");
   4321         let (farm_id, farm_pubkey, _, _) = seed_rows(&exec);
   4322 
   4323         let member_pubkey = "6".repeat(64);
                // The list set is built by hand so that an entry with an empty `values`
                // vector can be included alongside two entries naming the same pubkey.
   4324         let member_list_set = RadrootsListSet {
   4325             d_tag: "farm:AAAAAAAAAAAAAAAAAAAAAQ:members".to_string(),
   4326             content: String::new(),
   4327             entries: vec![
   4328                 RadrootsListEntry {
   4329                     tag: "p".to_string(),
   4330                     values: Vec::new(),
   4331                 },
   4332                 RadrootsListEntry {
   4333                     tag: "p".to_string(),
   4334                     values: vec![member_pubkey.clone(), "ignored".to_string()],
   4335                 },
   4336                 RadrootsListEntry {
   4337                     tag: "p".to_string(),
   4338                     values: vec![member_pubkey.clone()],
   4339                 },
   4340             ],
   4341             title: None,
   4342             description: None,
   4343             image: None,
   4344         };
   4345         upsert_farm_members(&exec, &farm_id, ListSetRole::Members, &member_list_set)
   4346             .expect("members");
                // Query member rows for this farm restricted to `ROLE_MEMBER`.
   4347         let members = farm_member::find_many(
   4348             &exec,
   4349             &IFarmMemberFindMany {
   4350                 filter: Some(IFarmMemberFieldsFilter {
   4351                     id: None,
   4352                     created_at: None,
   4353                     updated_at: None,
   4354                     farm_id: Some(farm_id.clone()),
   4355                     member_pubkey: None,
   4356                     role: Some(ROLE_MEMBER.to_string()),
   4357                 }),
   4358             },
   4359         )
   4360         .expect("member rows")
   4361         .results;
                // Three entries in, one row out, carrying the repeated pubkey.
   4362         assert_eq!(members.len(), 1);
   4363         assert_eq!(members[0].member_pubkey, member_pubkey);
   4364 
                // With the `Plots` role the call is only required to succeed; no rows are
                // inspected afterwards.
   4365         upsert_farm_members(&exec, &farm_id, ListSetRole::Plots, &member_list_set)
   4366             .expect("plots is no-op");
   4367 
   4368         let claimant_pubkey = "7".repeat(64);
                // Same three-entry shape as above, this time naming the farm pubkey.
   4369         let claims_list_set = RadrootsListSet {
   4370             d_tag: "member_of.farms".to_string(),
   4371             content: String::new(),
   4372             entries: vec![
   4373                 RadrootsListEntry {
   4374                     tag: "p".to_string(),
   4375                     values: Vec::new(),
   4376                 },
   4377                 RadrootsListEntry {
   4378                     tag: "p".to_string(),
   4379                     values: vec![farm_pubkey.clone(), "ignored".to_string()],
   4380                 },
   4381                 RadrootsListEntry {
   4382                     tag: "p".to_string(),
   4383                     values: vec![farm_pubkey.clone()],
   4384                 },
   4385             ],
   4386             title: None,
   4387             description: None,
   4388             image: None,
   4389         };
   4390         upsert_member_claims(&exec, &claimant_pubkey, &claims_list_set).expect("claims");
                // `claimant_pubkey` is moved into the filter here; it is not used again.
   4391         let claims = farm_member_claim::find_many(
   4392             &exec,
   4393             &IFarmMemberClaimFindMany {
   4394                 filter: Some(IFarmMemberClaimFieldsFilter {
   4395                     id: None,
   4396                     created_at: None,
   4397                     updated_at: None,
   4398                     member_pubkey: Some(claimant_pubkey),
   4399                     farm_pubkey: None,
   4400                 }),
   4401             },
   4402         )
   4403         .expect("claim rows")
   4404         .results;
   4405         assert_eq!(claims.len(), 1);
   4406         assert_eq!(claims[0].farm_pubkey, farm_pubkey);
   4407     }
   4408 
   4409     #[test]
            /// Asserts that ingest returns `Err` in four cases, all run against the plain
            /// (non-failing) executor: a plot whose farm reference carries a different
            /// pubkey from the seeded farm pubkey, and three list sets whose first
            /// entry's `tag` has been overwritten after construction.
   4410     fn ingest_error_paths_cover_missing_farm_and_bad_list_set_tags() {
   4411         let exec = SqliteExecutor::open_memory().expect("db");
   4412         let (_, farm_pubkey, farm_d_tag, plot_d_tag) = seed_rows(&exec);
   4413 
                // The event is authored by `farm_pubkey`, but the farm reference uses
                // `"1".repeat(64)` with the seeded d_tag.
                // NOTE(review): presumably no seeded farm matches that pubkey — confirm
                // against `seed_rows`.
   4414         let missing_farm_plot = plot_event(
   4415             950,
   4416             &farm_pubkey,
   4417             220,
   4418             &plot_d_tag,
   4419             RadrootsFarmRef {
   4420                 pubkey: "1".repeat(64),
   4421                 d_tag: farm_d_tag.clone(),
   4422             },
   4423             "plot-missing-farm",
   4424             None,
   4425             None,
   4426         );
   4427         assert!(ingest_plot_event(&exec, &missing_farm_plot, &FixedFactory).is_err());
   4428 
                // Member-of list set with its first entry tag replaced by "x".
   4429         let mut bad_member_of =
   4430             farm_list_sets::member_of_farms_list_set(vec![farm_pubkey.clone()]).expect("member_of");
   4431         bad_member_of.entries[0].tag = "x".to_string();
   4432         let bad_member_of_event = list_set_event(
   4433             951,
   4434             &"7".repeat(64),
   4435             221,
   4436             KIND_LIST_SET_GENERIC,
   4437             &bad_member_of,
   4438         );
   4439         assert!(ingest_list_set_event(&exec, &bad_member_of_event).is_err());
   4440 
                // Plots list set with its first entry tag replaced by "p".
   4441         let mut bad_plots = farm_list_sets::farm_plots_list_set(
   4442             &farm_d_tag,
   4443             &farm_pubkey,
   4444             vec![plot_d_tag.clone()],
   4445         )
   4446         .expect("plots");
   4447         bad_plots.entries[0].tag = "p".to_string();
   4448         let bad_plots_event =
   4449             list_set_event(952, &farm_pubkey, 222, KIND_LIST_SET_GENERIC, &bad_plots);
   4450         assert!(ingest_list_set_event(&exec, &bad_plots_event).is_err());
   4451 
                // Members list set with its first entry tag replaced by "a".
   4452         let mut bad_members =
   4453             farm_list_sets::farm_members_list_set(&farm_d_tag, vec!["6".repeat(64)])
   4454                 .expect("members");
   4455         bad_members.entries[0].tag = "a".to_string();
   4456         let bad_members_event =
   4457             list_set_event(953, &farm_pubkey, 223, KIND_LIST_SET_GENERIC, &bad_members);
   4458         assert!(ingest_list_set_event(&exec, &bad_members_event).is_err());
   4459     }
   4460 }