"""Build canonical product rows and observed-to-canonical links.

Observed product rows are read from a CSV, grouped by a simple automatic
linking rule (UPC, name + size, or bare name), and written out as one
canonical row per group plus one link row per linked observed product.
"""

import click

from layer_helpers import (
    read_csv_rows,
    representative_value,
    stable_id,
    write_csv_rows,
)


# Column order of the canonical product CSV.
CANONICAL_FIELDS = [
    "canonical_product_id",
    "canonical_name",
    "product_type",
    "brand",
    "variant",
    "size_value",
    "size_unit",
    "pack_qty",
    "measure_type",
    "normalized_quantity",
    "normalized_quantity_unit",
    "notes",
    "created_at",
    "updated_at",
]

# Column order of the observed-to-canonical link CSV.
LINK_FIELDS = [
    "observed_product_id",
    "canonical_product_id",
    "link_method",
    "link_confidence",
    "review_status",
    "reviewed_by",
    "reviewed_at",
    "link_notes",
]


def to_float(value):
    """Return ``value`` converted to ``float``, or ``None`` if conversion fails.

    ``None``, empty strings and other non-numeric inputs all yield ``None``.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def normalized_quantity(row):
    """Derive a ``(quantity, unit)`` pair of strings from a row's size fields.

    Rules, checked in order:

    1. Numeric size value and a non-empty size unit: size value multiplied by
       pack quantity (pack quantity counts as 1 when missing, unparseable or
       zero), returned with the size unit.
    2. Non-empty pack quantity and measure type ``"count"``: the raw pack
       quantity string with unit ``"count"``.
    3. Measure type ``"each"``: ``("1", "each")``.
    4. Otherwise ``("", "")``.
    """
    raw_pack_qty = row.get("representative_pack_qty")
    size_value = to_float(row.get("representative_size_value"))
    pack_qty = to_float(raw_pack_qty) or 1.0
    size_unit = row.get("representative_size_unit", "")
    measure_type = row.get("representative_measure_type", "")

    if size_value is not None and size_unit:
        # "g" drops trailing zeros ("24" rather than "24.0"); with no explicit
        # precision it keeps at most 6 significant digits.
        return format(size_value * pack_qty, "g"), size_unit
    if raw_pack_qty and measure_type == "count":
        return raw_pack_qty, "count"
    if measure_type == "each":
        return "1", "each"
    return "", ""


def auto_link_rule(observed_row):
    """Choose the automatic linking rule for one observed product row.

    Returns a ``(link_method, group_key, confidence)`` tuple of strings. Rows
    flagged as fees (``is_fee == "true"``) and rows matching no rule yield
    ``("", "", "")``.

    Rules, checked in order:

    * ``exact_upc`` (high): the row has a UPC.
    * ``exact_name_size`` (high): the row has a normalized name, a size value
      and a size unit; pack quantity and measure type are part of the key.
    * ``exact_name`` (medium): the row has a normalized name and no size
      value, size unit or pack quantity; measure type is part of the key.
    """
    if observed_row.get("is_fee") == "true":
        return "", "", ""

    upc = observed_row.get("representative_upc")
    if upc:
        return "exact_upc", f"upc={upc}", "high"

    name = observed_row.get("representative_name_norm")
    size_value = observed_row.get("representative_size_value")
    size_unit = observed_row.get("representative_size_unit")
    # pack_qty and measure_type are optional everywhere else in this module,
    # so read them with .get() here too; a missing column then contributes an
    # empty key segment instead of raising KeyError. Rows that do carry these
    # columns produce exactly the same key as before.
    pack_qty = observed_row.get("representative_pack_qty", "")
    measure_type = observed_row.get("representative_measure_type", "")

    if name and size_value and size_unit:
        key_parts = [
            f"name={name}",
            f"size={size_value}",
            f"unit={size_unit}",
            f"pack={pack_qty}",
            f"measure={measure_type}",
        ]
        return "exact_name_size", "|".join(key_parts), "high"

    if name and not size_value and not size_unit and not pack_qty:
        key_parts = [
            f"name={name}",
            f"measure={measure_type}",
        ]
        return "exact_name", "|".join(key_parts), "medium"

    return "", "", ""


def canonical_row_for_group(canonical_product_id, group_rows, link_method):
    """Build the canonical product row (a dict) for one group of observed rows.

    Each descriptive field is taken from ``representative_value`` over the
    group; the normalized quantity is derived from the representative size,
    pack and measure fields. ``product_type`` and the timestamps are left
    blank, and ``notes`` records which link method formed the group.
    """

    def pick(field):
        return representative_value(group_rows, field)

    # Look up each representative value once and reuse it for both the
    # quantity calculation and the output row.
    # NOTE(review): assumes representative_value is a pure function of its
    # arguments - confirm against layer_helpers.
    size_value = pick("representative_size_value")
    size_unit = pick("representative_size_unit")
    pack_qty = pick("representative_pack_qty")
    measure_type = pick("representative_measure_type")

    quantity_value, quantity_unit = normalized_quantity(
        {
            "representative_size_value": size_value,
            "representative_size_unit": size_unit,
            "representative_pack_qty": pack_qty,
            "representative_measure_type": measure_type,
        }
    )

    return {
        "canonical_product_id": canonical_product_id,
        "canonical_name": pick("representative_name_norm"),
        "product_type": "",
        "brand": pick("representative_brand"),
        "variant": pick("representative_variant"),
        "size_value": size_value,
        "size_unit": size_unit,
        "pack_qty": pack_qty,
        "measure_type": measure_type,
        "normalized_quantity": quantity_value,
        "normalized_quantity_unit": quantity_unit,
        "notes": f"auto-linked via {link_method}",
        "created_at": "",
        "updated_at": "",
    }


def build_canonical_layer(observed_rows):
    """Group observed rows into canonical products and link rows.

    Observed rows are processed in ``observed_product_id`` order. Rows for
    which ``auto_link_rule`` yields no group key are skipped. Every remaining
    row gets one link row, and every distinct canonical id gets one canonical
    row.

    Returns ``(canonical_rows, link_rows)``: canonical rows are ordered by
    canonical product id, link rows by observed product id.
    """
    link_rows = []
    groups = {}

    for observed_row in sorted(
        observed_rows, key=lambda row: row["observed_product_id"]
    ):
        link_method, group_key, confidence = auto_link_rule(observed_row)
        if not group_key:
            continue

        # The canonical id is derived from the method and key, so rows that
        # share both land in the same group.
        canonical_product_id = stable_id("gcan", f"{link_method}|{group_key}")
        group = groups.setdefault(
            canonical_product_id, {"method": link_method, "rows": []}
        )
        group["rows"].append(observed_row)

        link_rows.append(
            {
                "observed_product_id": observed_row["observed_product_id"],
                "canonical_product_id": canonical_product_id,
                "link_method": link_method,
                "link_confidence": confidence,
                "review_status": "",
                "reviewed_by": "",
                "reviewed_at": "",
                "link_notes": "",
            }
        )

    canonical_rows = [
        canonical_row_for_group(
            canonical_product_id,
            groups[canonical_product_id]["rows"],
            groups[canonical_product_id]["method"],
        )
        for canonical_product_id in sorted(groups)
    ]
    return canonical_rows, link_rows


# CLI entry point: read observed rows, build the canonical layer, write both
# CSVs and report the row counts. Documented with comments rather than a
# docstring because click would surface a docstring as the --help text.
@click.command()
@click.option(
    "--observed-csv",
    default="giant_output/products_observed.csv",
    show_default=True,
    help="Path to observed product rows.",
)
@click.option(
    "--canonical-csv",
    default="giant_output/products_canonical.csv",
    show_default=True,
    help="Path to canonical product output.",
)
@click.option(
    "--links-csv",
    default="giant_output/product_links.csv",
    show_default=True,
    help="Path to observed-to-canonical link output.",
)
def main(observed_csv, canonical_csv, links_csv):
    observed_rows = read_csv_rows(observed_csv)
    canonical_rows, link_rows = build_canonical_layer(observed_rows)
    write_csv_rows(canonical_csv, canonical_rows, CANONICAL_FIELDS)
    write_csv_rows(links_csv, link_rows, LINK_FIELDS)
    click.echo(
        f"wrote {len(canonical_rows)} canonical rows to {canonical_csv} and "
        f"{len(link_rows)} links to {links_csv}"
    )


if __name__ == "__main__":
    main()