Files
scrape-giant/build_purchases.py
2026-03-16 18:01:09 -04:00

275 lines
9.6 KiB
Python

from decimal import Decimal
from pathlib import Path
import click
import build_canonical_layer
import build_observed_products
import validate_cross_retailer_flow
from enrich_giant import format_decimal, to_decimal
from layer_helpers import read_csv_rows, write_csv_rows
PURCHASE_FIELDS = [
"purchase_date",
"retailer",
"order_id",
"line_no",
"observed_item_key",
"observed_product_id",
"canonical_product_id",
"raw_item_name",
"normalized_item_name",
"retailer_item_id",
"upc",
"qty",
"unit",
"pack_qty",
"size_value",
"size_unit",
"measure_type",
"line_total",
"unit_price",
"store_name",
"store_number",
"store_city",
"store_state",
"price_per_each",
"price_per_each_basis",
"price_per_count",
"price_per_count_basis",
"price_per_lb",
"price_per_lb_basis",
"price_per_oz",
"price_per_oz_basis",
"is_discount_line",
"is_coupon_line",
"is_fee",
"raw_order_path",
]
EXAMPLE_FIELDS = [
"example_name",
"canonical_product_id",
"giant_purchase_date",
"giant_raw_item_name",
"giant_price_per_lb",
"costco_purchase_date",
"costco_raw_item_name",
"costco_price_per_lb",
"notes",
]
def decimal_or_zero(value):
return to_decimal(value) or Decimal("0")
def derive_metrics(row):
line_total = to_decimal(row.get("line_total"))
qty = to_decimal(row.get("qty"))
pack_qty = to_decimal(row.get("pack_qty"))
size_value = to_decimal(row.get("size_value"))
picked_weight = to_decimal(row.get("picked_weight"))
size_unit = row.get("size_unit", "")
price_per_each = row.get("price_per_each", "")
price_per_lb = row.get("price_per_lb", "")
price_per_oz = row.get("price_per_oz", "")
price_per_count = ""
basis_each = ""
basis_count = ""
basis_lb = ""
basis_oz = ""
if price_per_each:
basis_each = "line_total_over_qty"
elif line_total is not None and qty not in (None, 0):
price_per_each = format_decimal(line_total / qty)
basis_each = "line_total_over_qty"
if line_total is not None and pack_qty not in (None, 0):
total_count = pack_qty * (qty or Decimal("1"))
if total_count not in (None, 0):
price_per_count = format_decimal(line_total / total_count)
basis_count = "line_total_over_pack_qty"
if picked_weight not in (None, 0):
price_per_lb = format_decimal(line_total / picked_weight) if line_total is not None else ""
price_per_oz = (
format_decimal((line_total / picked_weight) / Decimal("16"))
if line_total is not None
else ""
)
basis_lb = "picked_weight_lb"
basis_oz = "picked_weight_lb_to_oz"
elif line_total is not None and size_value not in (None, 0):
total_units = size_value * (pack_qty or Decimal("1")) * (qty or Decimal("1"))
if size_unit == "lb" and total_units not in (None, 0):
per_lb = line_total / total_units
price_per_lb = format_decimal(per_lb)
price_per_oz = format_decimal(per_lb / Decimal("16"))
basis_lb = "parsed_size_lb"
basis_oz = "parsed_size_lb_to_oz"
elif size_unit == "oz" and total_units not in (None, 0):
per_oz = line_total / total_units
price_per_oz = format_decimal(per_oz)
price_per_lb = format_decimal(per_oz * Decimal("16"))
basis_lb = "parsed_size_oz_to_lb"
basis_oz = "parsed_size_oz"
return {
"price_per_each": price_per_each,
"price_per_each_basis": basis_each,
"price_per_count": price_per_count,
"price_per_count_basis": basis_count,
"price_per_lb": price_per_lb,
"price_per_lb_basis": basis_lb,
"price_per_oz": price_per_oz,
"price_per_oz_basis": basis_oz,
}
def order_lookup(rows, retailer):
return {
(retailer, row["order_id"]): row
for row in rows
}
def build_link_lookup(enriched_rows):
observed_rows = build_observed_products.build_observed_products(enriched_rows)
canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(observed_rows)
giant_row, costco_row = validate_cross_retailer_flow.find_proof_pair(observed_rows)
canonical_rows, link_rows, _proof_rows = validate_cross_retailer_flow.merge_proof_pair(
canonical_rows,
link_rows,
giant_row,
costco_row,
)
observed_id_by_key = {
row["observed_key"]: row["observed_product_id"] for row in observed_rows
}
canonical_id_by_observed = {
row["observed_product_id"]: row["canonical_product_id"] for row in link_rows
}
return observed_id_by_key, canonical_id_by_observed
def build_purchase_rows(giant_enriched_rows, costco_enriched_rows, giant_orders, costco_orders):
all_enriched_rows = giant_enriched_rows + costco_enriched_rows
observed_id_by_key, canonical_id_by_observed = build_link_lookup(all_enriched_rows)
orders_by_id = {}
orders_by_id.update(order_lookup(giant_orders, "giant"))
orders_by_id.update(order_lookup(costco_orders, "costco"))
purchase_rows = []
for row in sorted(
all_enriched_rows,
key=lambda item: (item["order_date"], item["retailer"], item["order_id"], int(item["line_no"])),
):
observed_key = build_observed_products.build_observed_key(row)
observed_product_id = observed_id_by_key.get(observed_key, "")
order_row = orders_by_id.get((row["retailer"], row["order_id"]), {})
metrics = derive_metrics(row)
purchase_rows.append(
{
"purchase_date": row["order_date"],
"retailer": row["retailer"],
"order_id": row["order_id"],
"line_no": row["line_no"],
"observed_item_key": row["observed_item_key"],
"observed_product_id": observed_product_id,
"canonical_product_id": canonical_id_by_observed.get(observed_product_id, ""),
"raw_item_name": row["item_name"],
"normalized_item_name": row["item_name_norm"],
"retailer_item_id": row["retailer_item_id"],
"upc": row["upc"],
"qty": row["qty"],
"unit": row["unit"],
"pack_qty": row["pack_qty"],
"size_value": row["size_value"],
"size_unit": row["size_unit"],
"measure_type": row["measure_type"],
"line_total": row["line_total"],
"unit_price": row["unit_price"],
"store_name": order_row.get("store_name", ""),
"store_number": order_row.get("store_number", ""),
"store_city": order_row.get("store_city", ""),
"store_state": order_row.get("store_state", ""),
"is_discount_line": row["is_discount_line"],
"is_coupon_line": row["is_coupon_line"],
"is_fee": row["is_fee"],
"raw_order_path": row["raw_order_path"],
**metrics,
}
)
return purchase_rows
def build_comparison_examples(purchase_rows):
giant_banana = None
costco_banana = None
for row in purchase_rows:
if row.get("normalized_item_name") != "BANANA":
continue
if not row.get("canonical_product_id"):
continue
if row["retailer"] == "giant" and row.get("price_per_lb"):
giant_banana = row
if row["retailer"] == "costco" and row.get("price_per_lb"):
costco_banana = row
if not giant_banana or not costco_banana:
return []
return [
{
"example_name": "banana_price_per_lb",
"canonical_product_id": giant_banana["canonical_product_id"],
"giant_purchase_date": giant_banana["purchase_date"],
"giant_raw_item_name": giant_banana["raw_item_name"],
"giant_price_per_lb": giant_banana["price_per_lb"],
"costco_purchase_date": costco_banana["purchase_date"],
"costco_raw_item_name": costco_banana["raw_item_name"],
"costco_price_per_lb": costco_banana["price_per_lb"],
"notes": "Example comparison using normalized price_per_lb across Giant and Costco",
}
]
@click.command()
@click.option("--giant-items-enriched-csv", default="giant_output/items_enriched.csv", show_default=True)
@click.option("--costco-items-enriched-csv", default="costco_output/items_enriched.csv", show_default=True)
@click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True)
@click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True)
@click.option("--output-csv", default="combined_output/purchases.csv", show_default=True)
@click.option("--examples-csv", default="combined_output/comparison_examples.csv", show_default=True)
def main(
giant_items_enriched_csv,
costco_items_enriched_csv,
giant_orders_csv,
costco_orders_csv,
output_csv,
examples_csv,
):
purchase_rows = build_purchase_rows(
read_csv_rows(giant_items_enriched_csv),
read_csv_rows(costco_items_enriched_csv),
read_csv_rows(giant_orders_csv),
read_csv_rows(costco_orders_csv),
)
example_rows = build_comparison_examples(purchase_rows)
write_csv_rows(output_csv, purchase_rows, PURCHASE_FIELDS)
write_csv_rows(examples_csv, example_rows, EXAMPLE_FIELDS)
click.echo(
f"wrote {len(purchase_rows)} purchase rows to {output_csv} "
f"and {len(example_rows)} comparison examples to {examples_csv}"
)
if __name__ == "__main__":
main()