"""Seed a canonical product layer from observed product rows.

Reads observed product rows from a CSV, emits one canonical product row and
one observed-to-canonical link row per observed product, and writes both
tables back out as CSV files.
"""

import csv

import click

from layer_helpers import read_csv_rows, stable_id, write_csv_rows

# Column order passed to write_csv_rows for the canonical product output.
CANONICAL_FIELDS = [
    "canonical_product_id",
    "canonical_name",
    "product_type",
    "brand",
    "variant",
    "size_value",
    "size_unit",
    "pack_qty",
    "measure_type",
    "normalized_quantity",
    "normalized_quantity_unit",
    "notes",
    "created_at",
    "updated_at",
]

# Column order passed to write_csv_rows for the link output.
LINK_FIELDS = [
    "observed_product_id",
    "canonical_product_id",
    "link_method",
    "link_confidence",
    "review_status",
    "reviewed_by",
    "reviewed_at",
    "link_notes",
]


def to_float(value):
    """Convert *value* to a float, or return None when it cannot be parsed.

    ``None``, empty strings and any other value ``float()`` rejects with a
    ``TypeError`` or ``ValueError`` all yield ``None``.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _format_quantity(value):
    """Render a float as a compact decimal string without dropping digits.

    A bare ``"g"`` format keeps only 6 significant digits and switches to
    scientific notation from 1e6 upwards (405.7692 -> "405.769",
    1234567 -> "1.23457e+06"). Using 15 significant digits keeps everything
    a float reliably holds while still trimming trailing zeros and the
    binary noise that multiplication introduces (101.39999999999999 ->
    "101.4").
    """
    return format(value, ".15g")


def normalized_quantity(row):
    """Return ``(quantity, unit)`` strings describing the total quantity.

    The rules are tried in order:

    1. A parseable size value with a non-empty size unit gives
       ``size_value * pack_qty`` in that unit. A missing, unparseable or
       zero pack quantity is treated as 1.
    2. A non-empty pack quantity with measure type ``"count"`` gives the
       raw pack quantity string with unit ``"count"``.
    3. Measure type ``"each"`` gives ``("1", "each")``.
    4. Anything else gives ``("", "")``.
    """
    size_value = to_float(row.get("representative_size_value"))
    # ``or 1.0`` covers both an unparseable pack quantity (None) and 0.
    pack_qty = to_float(row.get("representative_pack_qty")) or 1.0
    size_unit = row.get("representative_size_unit", "")
    measure_type = row.get("representative_measure_type", "")

    if size_value is not None and size_unit:
        return _format_quantity(size_value * pack_qty), size_unit
    if row.get("representative_pack_qty") and measure_type == "count":
        # The original text is passed through untouched, not re-formatted.
        return row["representative_pack_qty"], "count"
    if measure_type == "each":
        return "1", "each"
    return "", ""


def build_canonical_layer(observed_rows):
    """Build canonical product rows and link rows from observed rows.

    Each observed row yields exactly one canonical row and one link row.
    Rows are processed in ascending ``observed_product_id`` order, so the
    output order does not depend on the input order.

    Args:
        observed_rows: Iterable of mappings holding ``observed_product_id``
            and the ``representative_*`` columns read below.

    Returns:
        A ``(canonical_rows, link_rows)`` tuple of lists of dicts keyed by
        ``CANONICAL_FIELDS`` and ``LINK_FIELDS`` respectively.

    Raises:
        KeyError: If an observed row lacks one of the required columns.
    """
    canonical_rows = []
    link_rows = []

    ordered_rows = sorted(observed_rows, key=lambda row: row["observed_product_id"])
    for observed_row in ordered_rows:
        observed_product_id = observed_row["observed_product_id"]
        # NOTE(review): stable_id is presumably deterministic for a given
        # prefix and seed string -- confirm in layer_helpers.
        canonical_product_id = stable_id("gcan", f"seed|{observed_product_id}")
        quantity_value, quantity_unit = normalized_quantity(observed_row)

        canonical_rows.append(
            {
                "canonical_product_id": canonical_product_id,
                "canonical_name": observed_row["representative_name_norm"],
                "product_type": "",
                "brand": observed_row["representative_brand"],
                "variant": observed_row["representative_variant"],
                "size_value": observed_row["representative_size_value"],
                "size_unit": observed_row["representative_size_unit"],
                "pack_qty": observed_row["representative_pack_qty"],
                "measure_type": observed_row["representative_measure_type"],
                "normalized_quantity": quantity_value,
                "normalized_quantity_unit": quantity_unit,
                "notes": f"seeded from {observed_product_id}",
                # Timestamps are left blank at seed time.
                "created_at": "",
                "updated_at": "",
            }
        )
        link_rows.append(
            {
                "observed_product_id": observed_product_id,
                "canonical_product_id": canonical_product_id,
                "link_method": "seed_observed_product",
                # Confidence and review columns are left blank at seed time.
                "link_confidence": "",
                "review_status": "",
                "reviewed_by": "",
                "reviewed_at": "",
                "link_notes": "",
            }
        )

    return canonical_rows, link_rows


@click.command()
@click.option(
    "--observed-csv",
    default="giant_output/products_observed.csv",
    show_default=True,
    help="Path to observed product rows.",
)
@click.option(
    "--canonical-csv",
    default="giant_output/products_canonical.csv",
    show_default=True,
    help="Path to canonical product output.",
)
@click.option(
    "--links-csv",
    default="giant_output/product_links.csv",
    show_default=True,
    help="Path to observed-to-canonical link output.",
)
def main(observed_csv, canonical_csv, links_csv):
    """Read observed rows, build both tables, write them and report counts."""
    observed_rows = read_csv_rows(observed_csv)
    canonical_rows, link_rows = build_canonical_layer(observed_rows)
    write_csv_rows(canonical_csv, canonical_rows, CANONICAL_FIELDS)
    write_csv_rows(links_csv, link_rows, LINK_FIELDS)
    click.echo(
        f"wrote {len(canonical_rows)} canonical rows to {canonical_csv} and "
        f"{len(link_rows)} links to {links_csv}"
    )


if __name__ == "__main__":
    main()