"""Build the combined, pivot-ready purchase log for Giant and Costco.

Reads the enriched item and order CSVs for both retailers, links every line
item to an observed product and (where available) a canonical product,
applies approved manual review resolutions, derives normalized unit prices,
and writes the purchase log, canonical catalog, product links, and a small
set of cross-retailer comparison examples.
"""

from decimal import Decimal
from pathlib import Path

import click

import build_canonical_layer
import build_observed_products
import validate_cross_retailer_flow
from enrich_giant import format_decimal, to_decimal
from layer_helpers import read_csv_rows, stable_id, write_csv_rows


# Column order of the purchase log CSV.
PURCHASE_FIELDS = [
    "purchase_date",
    "retailer",
    "order_id",
    "line_no",
    "observed_item_key",
    "observed_product_id",
    "canonical_product_id",
    "review_status",
    "resolution_action",
    "raw_item_name",
    "normalized_item_name",
    "retailer_item_id",
    "upc",
    "qty",
    "unit",
    "pack_qty",
    "size_value",
    "size_unit",
    "measure_type",
    "line_total",
    "unit_price",
    "store_name",
    "store_number",
    "store_city",
    "store_state",
    "price_per_each",
    "price_per_each_basis",
    "price_per_count",
    "price_per_count_basis",
    "price_per_lb",
    "price_per_lb_basis",
    "price_per_oz",
    "price_per_oz_basis",
    "is_discount_line",
    "is_coupon_line",
    "is_fee",
    "raw_order_path",
]

# Column order of the comparison examples CSV.
EXAMPLE_FIELDS = [
    "example_name",
    "canonical_product_id",
    "giant_purchase_date",
    "giant_raw_item_name",
    "giant_price_per_lb",
    "costco_purchase_date",
    "costco_raw_item_name",
    "costco_price_per_lb",
    "notes",
]

# Column order of the canonical catalog CSV.
CATALOG_FIELDS = [
    "canonical_product_id",
    "canonical_name",
    "category",
    "product_type",
    "brand",
    "variant",
    "size_value",
    "size_unit",
    "pack_qty",
    "measure_type",
    "notes",
    "created_at",
    "updated_at",
]

# Columns expected in the manual review resolutions CSV.
RESOLUTION_FIELDS = [
    "observed_product_id",
    "canonical_product_id",
    "resolution_action",
    "status",
    "resolution_notes",
    "reviewed_at",
]


def decimal_or_zero(value):
    """Return ``to_decimal(value)``, or ``Decimal("0")`` when that is falsy."""
    return to_decimal(value) or Decimal("0")


def derive_metrics(row):
    """Derive normalized unit prices for one enriched item row.

    Args:
        row: Mapping for a single enriched line item. The keys read are
            ``line_total``, ``qty``, ``pack_qty``, ``size_value``,
            ``picked_weight``, ``size_unit``, ``price_per_each``,
            ``price_per_lb`` and ``price_per_oz``; all are optional.

    Returns:
        A dict holding ``price_per_each``, ``price_per_count``,
        ``price_per_lb`` and ``price_per_oz`` together with a matching
        ``*_basis`` label naming how each value was obtained. Prices that
        cannot be derived keep the value already on the row (or ``""``).
    """
    # The comparisons against ``None`` below assume to_decimal returns None
    # for blank or unparseable input -- TODO confirm against enrich_giant.
    line_total = to_decimal(row.get("line_total"))
    qty = to_decimal(row.get("qty"))
    pack_qty = to_decimal(row.get("pack_qty"))
    size_value = to_decimal(row.get("size_value"))
    picked_weight = to_decimal(row.get("picked_weight"))
    size_unit = row.get("size_unit", "")

    # Start from any prices already present on the row.
    price_per_each = row.get("price_per_each", "")
    price_per_lb = row.get("price_per_lb", "")
    price_per_oz = row.get("price_per_oz", "")
    price_per_count = ""
    basis_each = ""
    basis_count = ""
    basis_lb = ""
    basis_oz = ""

    # Per-each: a value supplied on the row wins; otherwise total / qty.
    if price_per_each:
        basis_each = "line_total_over_qty"
    elif line_total is not None and qty not in (None, 0):
        price_per_each = format_decimal(line_total / qty)
        basis_each = "line_total_over_qty"

    # Per-count: total spread over every unit in every pack purchased.
    if line_total is not None and pack_qty not in (None, 0):
        total_count = pack_qty * (qty or Decimal("1"))
        if total_count not in (None, 0):
            price_per_count = format_decimal(line_total / total_count)
            basis_count = "line_total_over_pack_qty"

    if picked_weight not in (None, 0):
        # A picked weight takes priority over the parsed package size.
        # NOTE(review): when line_total is missing the prices are blanked
        # but the basis labels are still set -- confirm this is intended.
        price_per_lb = (
            format_decimal(line_total / picked_weight)
            if line_total is not None
            else ""
        )
        price_per_oz = (
            format_decimal((line_total / picked_weight) / Decimal("16"))
            if line_total is not None
            else ""
        )
        basis_lb = "picked_weight_lb"
        basis_oz = "picked_weight_lb_to_oz"
    elif line_total is not None and size_value not in (None, 0):
        # Fall back to the parsed size: size * packs * quantity.
        total_units = (
            size_value * (pack_qty or Decimal("1")) * (qty or Decimal("1"))
        )
        if size_unit == "lb" and total_units not in (None, 0):
            per_lb = line_total / total_units
            price_per_lb = format_decimal(per_lb)
            price_per_oz = format_decimal(per_lb / Decimal("16"))
            basis_lb = "parsed_size_lb"
            basis_oz = "parsed_size_lb_to_oz"
        elif size_unit == "oz" and total_units not in (None, 0):
            per_oz = line_total / total_units
            price_per_oz = format_decimal(per_oz)
            price_per_lb = format_decimal(per_oz * Decimal("16"))
            basis_lb = "parsed_size_oz_to_lb"
            basis_oz = "parsed_size_oz"

    return {
        "price_per_each": price_per_each,
        "price_per_each_basis": basis_each,
        "price_per_count": price_per_count,
        "price_per_count_basis": basis_count,
        "price_per_lb": price_per_lb,
        "price_per_lb_basis": basis_lb,
        "price_per_oz": price_per_oz,
        "price_per_oz_basis": basis_oz,
    }


def order_lookup(rows, retailer):
    """Index order rows by ``(retailer, order_id)``.

    Args:
        rows: Order rows, each containing an ``order_id`` key.
        retailer: Retailer name used as the first element of every key.

    Returns:
        Dict mapping ``(retailer, order_id)`` to the order row. When an
        order id repeats, the last row wins.
    """
    return {(retailer, row["order_id"]): row for row in rows}


def read_optional_csv_rows(path):
    """Read CSV rows from *path*, returning ``[]`` if the file is absent."""
    path = Path(path)
    if not path.exists():
        return []
    return read_csv_rows(path)


def load_resolution_lookup(resolution_rows):
    """Index resolution rows by ``observed_product_id``.

    Rows without an ``observed_product_id`` are skipped. When an id
    repeats, the last row wins.
    """
    lookup = {}
    for row in resolution_rows:
        if not row.get("observed_product_id"):
            continue
        lookup[row["observed_product_id"]] = row
    return lookup


def merge_catalog_rows(existing_rows, auto_rows):
    """Merge auto-generated catalog rows with previously saved ones.

    Existing rows are applied after the auto-generated rows, so an existing
    row replaces an auto-generated row with the same
    ``canonical_product_id``. Rows lacking an id are dropped.

    Returns:
        The merged rows sorted by ``canonical_product_id``.
    """
    merged = {}
    for row in auto_rows + existing_rows:
        canonical_product_id = row.get("canonical_product_id", "")
        if canonical_product_id:
            merged[canonical_product_id] = row
    return sorted(merged.values(), key=lambda row: row["canonical_product_id"])


def catalog_row_from_canonical(row):
    """Project a canonical product row onto the catalog columns.

    Missing columns default to ``""``.
    """
    return {
        "canonical_product_id": row.get("canonical_product_id", ""),
        "canonical_name": row.get("canonical_name", ""),
        "category": row.get("category", ""),
        "product_type": row.get("product_type", ""),
        "brand": row.get("brand", ""),
        "variant": row.get("variant", ""),
        "size_value": row.get("size_value", ""),
        "size_unit": row.get("size_unit", ""),
        "pack_qty": row.get("pack_qty", ""),
        "measure_type": row.get("measure_type", ""),
        "notes": row.get("notes", ""),
        "created_at": row.get("created_at", ""),
        "updated_at": row.get("updated_at", ""),
    }


def build_link_state(enriched_rows):
    """Build the observed/canonical layers and their lookup tables.

    Args:
        enriched_rows: Enriched item rows from all retailers combined.

    Returns:
        A 5-tuple ``(observed_rows, canonical_rows, link_rows,
        observed_id_by_key, canonical_id_by_observed)``. The last two map
        ``observed_key -> observed_product_id`` and
        ``observed_product_id -> canonical_product_id``.
    """
    observed_rows = build_observed_products.build_observed_products(enriched_rows)
    canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(
        observed_rows
    )
    # Fold the Giant/Costco proof pair into the canonical layer and links.
    giant_row, costco_row = validate_cross_retailer_flow.find_proof_pair(
        observed_rows
    )
    canonical_rows, link_rows, _proof_rows = (
        validate_cross_retailer_flow.merge_proof_pair(
            canonical_rows,
            link_rows,
            giant_row,
            costco_row,
        )
    )

    observed_id_by_key = {
        row["observed_key"]: row["observed_product_id"] for row in observed_rows
    }
    canonical_id_by_observed = {
        row["observed_product_id"]: row["canonical_product_id"]
        for row in link_rows
    }
    return (
        observed_rows,
        canonical_rows,
        link_rows,
        observed_id_by_key,
        canonical_id_by_observed,
    )


def build_purchase_rows(
    giant_enriched_rows,
    costco_enriched_rows,
    giant_orders,
    costco_orders,
    resolution_rows,
):
    """Assemble the purchase log rows for both retailers.

    Args:
        giant_enriched_rows: Enriched Giant item rows.
        costco_enriched_rows: Enriched Costco item rows.
        giant_orders: Giant order rows, used for store details.
        costco_orders: Costco order rows, used for store details.
        resolution_rows: Manual review resolution rows; only rows whose
            ``status`` is ``"approved"`` change the canonical linkage.

    Returns:
        A 4-tuple ``(purchase_rows, observed_rows, canonical_rows,
        link_rows)``. ``purchase_rows`` is sorted by order date, retailer,
        order id and numeric line number. ``link_rows`` is returned as
        built, without the manual resolutions applied.
    """
    all_enriched_rows = giant_enriched_rows + costco_enriched_rows
    (
        observed_rows,
        canonical_rows,
        link_rows,
        observed_id_by_key,
        canonical_id_by_observed,
    ) = build_link_state(all_enriched_rows)

    # Approved manual resolutions override the automatic canonical links.
    resolution_lookup = load_resolution_lookup(resolution_rows)
    for observed_product_id, resolution in resolution_lookup.items():
        action = resolution.get("resolution_action", "")
        status = resolution.get("status", "")
        if status != "approved":
            continue
        if action in {"link", "create"} and resolution.get("canonical_product_id"):
            canonical_id_by_observed[observed_product_id] = resolution[
                "canonical_product_id"
            ]
        elif action == "exclude":
            # Excluded products keep their purchase rows but lose the link.
            canonical_id_by_observed[observed_product_id] = ""

    orders_by_id = {}
    orders_by_id.update(order_lookup(giant_orders, "giant"))
    orders_by_id.update(order_lookup(costco_orders, "costco"))

    purchase_rows = []
    for row in sorted(
        all_enriched_rows,
        key=lambda item: (
            item["order_date"],
            item["retailer"],
            item["order_id"],
            int(item["line_no"]),
        ),
    ):
        observed_key = build_observed_products.build_observed_key(row)
        observed_product_id = observed_id_by_key.get(observed_key, "")
        order_row = orders_by_id.get((row["retailer"], row["order_id"]), {})
        metrics = derive_metrics(row)
        resolution = resolution_lookup.get(observed_product_id, {})
        purchase_rows.append(
            {
                "purchase_date": row["order_date"],
                "retailer": row["retailer"],
                "order_id": row["order_id"],
                "line_no": row["line_no"],
                "observed_item_key": row["observed_item_key"],
                "observed_product_id": observed_product_id,
                "canonical_product_id": canonical_id_by_observed.get(
                    observed_product_id, ""
                ),
                "review_status": resolution.get("status", ""),
                "resolution_action": resolution.get("resolution_action", ""),
                "raw_item_name": row["item_name"],
                "normalized_item_name": row["item_name_norm"],
                "retailer_item_id": row["retailer_item_id"],
                "upc": row["upc"],
                "qty": row["qty"],
                "unit": row["unit"],
                "pack_qty": row["pack_qty"],
                "size_value": row["size_value"],
                "size_unit": row["size_unit"],
                "measure_type": row["measure_type"],
                "line_total": row["line_total"],
                "unit_price": row["unit_price"],
                "store_name": order_row.get("store_name", ""),
                "store_number": order_row.get("store_number", ""),
                "store_city": order_row.get("store_city", ""),
                "store_state": order_row.get("store_state", ""),
                "is_discount_line": row["is_discount_line"],
                "is_coupon_line": row["is_coupon_line"],
                "is_fee": row["is_fee"],
                "raw_order_path": row["raw_order_path"],
                **metrics,
            }
        )
    return purchase_rows, observed_rows, canonical_rows, link_rows


def apply_manual_resolutions_to_links(link_rows, resolution_rows):
    """Apply approved manual resolutions to the product link rows.

    Args:
        link_rows: Automatically generated link rows.
        resolution_rows: Manual review resolution rows.

    Returns:
        New link rows sorted by ``observed_product_id``. An approved
        ``exclude`` removes the link; an approved ``link`` or ``create``
        that names a ``canonical_product_id`` replaces (or adds) the link
        with a manual one. The input rows are copied, not mutated.
    """
    link_by_observed = {row["observed_product_id"]: dict(row) for row in link_rows}
    for resolution in resolution_rows:
        if resolution.get("status") != "approved":
            continue
        observed_product_id = resolution.get("observed_product_id", "")
        action = resolution.get("resolution_action", "")
        if not observed_product_id:
            continue
        if action == "exclude":
            link_by_observed.pop(observed_product_id, None)
            continue
        if action in {"link", "create"} and resolution.get("canonical_product_id"):
            link_by_observed[observed_product_id] = {
                "observed_product_id": observed_product_id,
                "canonical_product_id": resolution["canonical_product_id"],
                "link_method": f"manual_{action}",
                "link_confidence": "high",
                "review_status": resolution.get("status", ""),
                "reviewed_by": "",
                "reviewed_at": resolution.get("reviewed_at", ""),
                "link_notes": resolution.get("resolution_notes", ""),
            }
    return sorted(link_by_observed.values(), key=lambda row: row["observed_product_id"])


def build_comparison_examples(purchase_rows):
    """Build the banana price-per-lb comparison across Giant and Costco.

    Only rows whose ``normalized_item_name`` is ``"BANANA"`` and that carry
    a ``canonical_product_id`` and a ``price_per_lb`` are considered; the
    last qualifying row for each retailer is used.

    Returns:
        A one-element list with the comparison row, or ``[]`` when either
        retailer has no qualifying row.
    """
    giant_banana = None
    costco_banana = None
    for row in purchase_rows:
        if row.get("normalized_item_name") != "BANANA":
            continue
        if not row.get("canonical_product_id"):
            continue
        if row["retailer"] == "giant" and row.get("price_per_lb"):
            giant_banana = row
        if row["retailer"] == "costco" and row.get("price_per_lb"):
            costco_banana = row

    if not giant_banana or not costco_banana:
        return []

    return [
        {
            "example_name": "banana_price_per_lb",
            "canonical_product_id": giant_banana["canonical_product_id"],
            "giant_purchase_date": giant_banana["purchase_date"],
            "giant_raw_item_name": giant_banana["raw_item_name"],
            "giant_price_per_lb": giant_banana["price_per_lb"],
            "costco_purchase_date": costco_banana["purchase_date"],
            "costco_raw_item_name": costco_banana["raw_item_name"],
            "costco_price_per_lb": costco_banana["price_per_lb"],
            "notes": "Example comparison using normalized price_per_lb across Giant and Costco",
        }
    ]


@click.command()
@click.option("--giant-items-enriched-csv", default="giant_output/items_enriched.csv", show_default=True)
@click.option("--costco-items-enriched-csv", default="costco_output/items_enriched.csv", show_default=True)
@click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True)
@click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True)
@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True)
@click.option("--catalog-csv", default="combined_output/canonical_catalog.csv", show_default=True)
@click.option("--links-csv", default="combined_output/product_links.csv", show_default=True)
@click.option("--output-csv", default="combined_output/purchases.csv", show_default=True)
@click.option("--examples-csv", default="combined_output/comparison_examples.csv", show_default=True)
def main(
    giant_items_enriched_csv,
    costco_items_enriched_csv,
    giant_orders_csv,
    costco_orders_csv,
    resolutions_csv,
    catalog_csv,
    links_csv,
    output_csv,
    examples_csv,
):
    """Build the purchase log, catalog, links and comparison examples."""
    # The resolutions and catalog files are optional inputs.
    resolution_rows = read_optional_csv_rows(resolutions_csv)
    purchase_rows, _observed_rows, canonical_rows, link_rows = build_purchase_rows(
        read_csv_rows(giant_items_enriched_csv),
        read_csv_rows(costco_items_enriched_csv),
        read_csv_rows(giant_orders_csv),
        read_csv_rows(costco_orders_csv),
        resolution_rows,
    )
    existing_catalog_rows = read_optional_csv_rows(catalog_csv)
    merged_catalog_rows = merge_catalog_rows(
        existing_catalog_rows,
        [catalog_row_from_canonical(row) for row in canonical_rows],
    )
    link_rows = apply_manual_resolutions_to_links(link_rows, resolution_rows)
    example_rows = build_comparison_examples(purchase_rows)

    write_csv_rows(catalog_csv, merged_catalog_rows, CATALOG_FIELDS)
    write_csv_rows(links_csv, link_rows, build_canonical_layer.LINK_FIELDS)
    write_csv_rows(output_csv, purchase_rows, PURCHASE_FIELDS)
    write_csv_rows(examples_csv, example_rows, EXAMPLE_FIELDS)
    click.echo(
        f"wrote {len(purchase_rows)} purchase rows to {output_csv}, "
        f"{len(merged_catalog_rows)} catalog rows to {catalog_csv}, "
        f"and {len(example_rows)} comparison examples to {examples_csv}"
    )


if __name__ == "__main__":
    main()