Build pivot-ready purchase log
This commit is contained in:
274
build_purchases.py
Normal file
274
build_purchases.py
Normal file
@@ -0,0 +1,274 @@
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
|
||||
import build_canonical_layer
|
||||
import build_observed_products
|
||||
import validate_cross_retailer_flow
|
||||
from enrich_giant import format_decimal, to_decimal
|
||||
from layer_helpers import read_csv_rows, write_csv_rows
|
||||
|
||||
|
||||
PURCHASE_FIELDS = [
|
||||
"purchase_date",
|
||||
"retailer",
|
||||
"order_id",
|
||||
"line_no",
|
||||
"observed_item_key",
|
||||
"observed_product_id",
|
||||
"canonical_product_id",
|
||||
"raw_item_name",
|
||||
"normalized_item_name",
|
||||
"retailer_item_id",
|
||||
"upc",
|
||||
"qty",
|
||||
"unit",
|
||||
"pack_qty",
|
||||
"size_value",
|
||||
"size_unit",
|
||||
"measure_type",
|
||||
"line_total",
|
||||
"unit_price",
|
||||
"store_name",
|
||||
"store_number",
|
||||
"store_city",
|
||||
"store_state",
|
||||
"price_per_each",
|
||||
"price_per_each_basis",
|
||||
"price_per_count",
|
||||
"price_per_count_basis",
|
||||
"price_per_lb",
|
||||
"price_per_lb_basis",
|
||||
"price_per_oz",
|
||||
"price_per_oz_basis",
|
||||
"is_discount_line",
|
||||
"is_coupon_line",
|
||||
"is_fee",
|
||||
"raw_order_path",
|
||||
]
|
||||
|
||||
EXAMPLE_FIELDS = [
|
||||
"example_name",
|
||||
"canonical_product_id",
|
||||
"giant_purchase_date",
|
||||
"giant_raw_item_name",
|
||||
"giant_price_per_lb",
|
||||
"costco_purchase_date",
|
||||
"costco_raw_item_name",
|
||||
"costco_price_per_lb",
|
||||
"notes",
|
||||
]
|
||||
|
||||
|
||||
def decimal_or_zero(value):
|
||||
return to_decimal(value) or Decimal("0")
|
||||
|
||||
|
||||
def derive_metrics(row):
|
||||
line_total = to_decimal(row.get("line_total"))
|
||||
qty = to_decimal(row.get("qty"))
|
||||
pack_qty = to_decimal(row.get("pack_qty"))
|
||||
size_value = to_decimal(row.get("size_value"))
|
||||
picked_weight = to_decimal(row.get("picked_weight"))
|
||||
size_unit = row.get("size_unit", "")
|
||||
|
||||
price_per_each = row.get("price_per_each", "")
|
||||
price_per_lb = row.get("price_per_lb", "")
|
||||
price_per_oz = row.get("price_per_oz", "")
|
||||
price_per_count = ""
|
||||
|
||||
basis_each = ""
|
||||
basis_count = ""
|
||||
basis_lb = ""
|
||||
basis_oz = ""
|
||||
|
||||
if price_per_each:
|
||||
basis_each = "line_total_over_qty"
|
||||
elif line_total is not None and qty not in (None, 0):
|
||||
price_per_each = format_decimal(line_total / qty)
|
||||
basis_each = "line_total_over_qty"
|
||||
|
||||
if line_total is not None and pack_qty not in (None, 0):
|
||||
total_count = pack_qty * (qty or Decimal("1"))
|
||||
if total_count not in (None, 0):
|
||||
price_per_count = format_decimal(line_total / total_count)
|
||||
basis_count = "line_total_over_pack_qty"
|
||||
|
||||
if picked_weight not in (None, 0):
|
||||
price_per_lb = format_decimal(line_total / picked_weight) if line_total is not None else ""
|
||||
price_per_oz = (
|
||||
format_decimal((line_total / picked_weight) / Decimal("16"))
|
||||
if line_total is not None
|
||||
else ""
|
||||
)
|
||||
basis_lb = "picked_weight_lb"
|
||||
basis_oz = "picked_weight_lb_to_oz"
|
||||
elif line_total is not None and size_value not in (None, 0):
|
||||
total_units = size_value * (pack_qty or Decimal("1")) * (qty or Decimal("1"))
|
||||
if size_unit == "lb" and total_units not in (None, 0):
|
||||
per_lb = line_total / total_units
|
||||
price_per_lb = format_decimal(per_lb)
|
||||
price_per_oz = format_decimal(per_lb / Decimal("16"))
|
||||
basis_lb = "parsed_size_lb"
|
||||
basis_oz = "parsed_size_lb_to_oz"
|
||||
elif size_unit == "oz" and total_units not in (None, 0):
|
||||
per_oz = line_total / total_units
|
||||
price_per_oz = format_decimal(per_oz)
|
||||
price_per_lb = format_decimal(per_oz * Decimal("16"))
|
||||
basis_lb = "parsed_size_oz_to_lb"
|
||||
basis_oz = "parsed_size_oz"
|
||||
|
||||
return {
|
||||
"price_per_each": price_per_each,
|
||||
"price_per_each_basis": basis_each,
|
||||
"price_per_count": price_per_count,
|
||||
"price_per_count_basis": basis_count,
|
||||
"price_per_lb": price_per_lb,
|
||||
"price_per_lb_basis": basis_lb,
|
||||
"price_per_oz": price_per_oz,
|
||||
"price_per_oz_basis": basis_oz,
|
||||
}
|
||||
|
||||
|
||||
def order_lookup(rows, retailer):
|
||||
return {
|
||||
(retailer, row["order_id"]): row
|
||||
for row in rows
|
||||
}
|
||||
|
||||
|
||||
def build_link_lookup(enriched_rows):
|
||||
observed_rows = build_observed_products.build_observed_products(enriched_rows)
|
||||
canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(observed_rows)
|
||||
giant_row, costco_row = validate_cross_retailer_flow.find_proof_pair(observed_rows)
|
||||
canonical_rows, link_rows, _proof_rows = validate_cross_retailer_flow.merge_proof_pair(
|
||||
canonical_rows,
|
||||
link_rows,
|
||||
giant_row,
|
||||
costco_row,
|
||||
)
|
||||
|
||||
observed_id_by_key = {
|
||||
row["observed_key"]: row["observed_product_id"] for row in observed_rows
|
||||
}
|
||||
canonical_id_by_observed = {
|
||||
row["observed_product_id"]: row["canonical_product_id"] for row in link_rows
|
||||
}
|
||||
return observed_id_by_key, canonical_id_by_observed
|
||||
|
||||
|
||||
def build_purchase_rows(giant_enriched_rows, costco_enriched_rows, giant_orders, costco_orders):
|
||||
all_enriched_rows = giant_enriched_rows + costco_enriched_rows
|
||||
observed_id_by_key, canonical_id_by_observed = build_link_lookup(all_enriched_rows)
|
||||
orders_by_id = {}
|
||||
orders_by_id.update(order_lookup(giant_orders, "giant"))
|
||||
orders_by_id.update(order_lookup(costco_orders, "costco"))
|
||||
|
||||
purchase_rows = []
|
||||
for row in sorted(
|
||||
all_enriched_rows,
|
||||
key=lambda item: (item["order_date"], item["retailer"], item["order_id"], int(item["line_no"])),
|
||||
):
|
||||
observed_key = build_observed_products.build_observed_key(row)
|
||||
observed_product_id = observed_id_by_key.get(observed_key, "")
|
||||
order_row = orders_by_id.get((row["retailer"], row["order_id"]), {})
|
||||
metrics = derive_metrics(row)
|
||||
purchase_rows.append(
|
||||
{
|
||||
"purchase_date": row["order_date"],
|
||||
"retailer": row["retailer"],
|
||||
"order_id": row["order_id"],
|
||||
"line_no": row["line_no"],
|
||||
"observed_item_key": row["observed_item_key"],
|
||||
"observed_product_id": observed_product_id,
|
||||
"canonical_product_id": canonical_id_by_observed.get(observed_product_id, ""),
|
||||
"raw_item_name": row["item_name"],
|
||||
"normalized_item_name": row["item_name_norm"],
|
||||
"retailer_item_id": row["retailer_item_id"],
|
||||
"upc": row["upc"],
|
||||
"qty": row["qty"],
|
||||
"unit": row["unit"],
|
||||
"pack_qty": row["pack_qty"],
|
||||
"size_value": row["size_value"],
|
||||
"size_unit": row["size_unit"],
|
||||
"measure_type": row["measure_type"],
|
||||
"line_total": row["line_total"],
|
||||
"unit_price": row["unit_price"],
|
||||
"store_name": order_row.get("store_name", ""),
|
||||
"store_number": order_row.get("store_number", ""),
|
||||
"store_city": order_row.get("store_city", ""),
|
||||
"store_state": order_row.get("store_state", ""),
|
||||
"is_discount_line": row["is_discount_line"],
|
||||
"is_coupon_line": row["is_coupon_line"],
|
||||
"is_fee": row["is_fee"],
|
||||
"raw_order_path": row["raw_order_path"],
|
||||
**metrics,
|
||||
}
|
||||
)
|
||||
return purchase_rows
|
||||
|
||||
|
||||
def build_comparison_examples(purchase_rows):
|
||||
giant_banana = None
|
||||
costco_banana = None
|
||||
for row in purchase_rows:
|
||||
if row.get("normalized_item_name") != "BANANA":
|
||||
continue
|
||||
if not row.get("canonical_product_id"):
|
||||
continue
|
||||
if row["retailer"] == "giant" and row.get("price_per_lb"):
|
||||
giant_banana = row
|
||||
if row["retailer"] == "costco" and row.get("price_per_lb"):
|
||||
costco_banana = row
|
||||
|
||||
if not giant_banana or not costco_banana:
|
||||
return []
|
||||
|
||||
return [
|
||||
{
|
||||
"example_name": "banana_price_per_lb",
|
||||
"canonical_product_id": giant_banana["canonical_product_id"],
|
||||
"giant_purchase_date": giant_banana["purchase_date"],
|
||||
"giant_raw_item_name": giant_banana["raw_item_name"],
|
||||
"giant_price_per_lb": giant_banana["price_per_lb"],
|
||||
"costco_purchase_date": costco_banana["purchase_date"],
|
||||
"costco_raw_item_name": costco_banana["raw_item_name"],
|
||||
"costco_price_per_lb": costco_banana["price_per_lb"],
|
||||
"notes": "Example comparison using normalized price_per_lb across Giant and Costco",
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.option("--giant-items-enriched-csv", default="giant_output/items_enriched.csv", show_default=True)
|
||||
@click.option("--costco-items-enriched-csv", default="costco_output/items_enriched.csv", show_default=True)
|
||||
@click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True)
|
||||
@click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True)
|
||||
@click.option("--output-csv", default="combined_output/purchases.csv", show_default=True)
|
||||
@click.option("--examples-csv", default="combined_output/comparison_examples.csv", show_default=True)
|
||||
def main(
|
||||
giant_items_enriched_csv,
|
||||
costco_items_enriched_csv,
|
||||
giant_orders_csv,
|
||||
costco_orders_csv,
|
||||
output_csv,
|
||||
examples_csv,
|
||||
):
|
||||
purchase_rows = build_purchase_rows(
|
||||
read_csv_rows(giant_items_enriched_csv),
|
||||
read_csv_rows(costco_items_enriched_csv),
|
||||
read_csv_rows(giant_orders_csv),
|
||||
read_csv_rows(costco_orders_csv),
|
||||
)
|
||||
example_rows = build_comparison_examples(purchase_rows)
|
||||
write_csv_rows(output_csv, purchase_rows, PURCHASE_FIELDS)
|
||||
write_csv_rows(examples_csv, example_rows, EXAMPLE_FIELDS)
|
||||
click.echo(
|
||||
f"wrote {len(purchase_rows)} purchase rows to {output_csv} "
|
||||
f"and {len(example_rows)} comparison examples to {examples_csv}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -670,6 +670,13 @@ def main(
|
||||
client_identifier=config["client_identifier"],
|
||||
)
|
||||
session = build_session(profile_dir, auth_headers)
|
||||
click.echo(
|
||||
"session bootstrap: "
|
||||
f"cookies={True} "
|
||||
f"authorization={bool(auth_headers.get('costco-x-authorization'))} "
|
||||
f"client_id={bool(auth_headers.get('costco-x-wcs-clientId'))} "
|
||||
f"client_identifier={bool(auth_headers.get('client-identifier'))}"
|
||||
)
|
||||
|
||||
start_date, end_date = resolve_date_range(months_back)
|
||||
|
||||
|
||||
213
tests/test_purchases.py
Normal file
213
tests/test_purchases.py
Normal file
@@ -0,0 +1,213 @@
|
||||
import csv
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import build_purchases
|
||||
import enrich_costco
|
||||
|
||||
|
||||
class PurchaseLogTests(unittest.TestCase):
|
||||
def test_derive_metrics_prefers_picked_weight_and_pack_count(self):
|
||||
metrics = build_purchases.derive_metrics(
|
||||
{
|
||||
"line_total": "4.00",
|
||||
"qty": "1",
|
||||
"pack_qty": "4",
|
||||
"size_value": "",
|
||||
"size_unit": "",
|
||||
"picked_weight": "2",
|
||||
"price_per_each": "",
|
||||
"price_per_lb": "",
|
||||
"price_per_oz": "",
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual("4", metrics["price_per_each"])
|
||||
self.assertEqual("1", metrics["price_per_count"])
|
||||
self.assertEqual("2", metrics["price_per_lb"])
|
||||
self.assertEqual("0.125", metrics["price_per_oz"])
|
||||
self.assertEqual("picked_weight_lb", metrics["price_per_lb_basis"])
|
||||
|
||||
def test_build_purchase_rows_maps_canonical_ids(self):
|
||||
fieldnames = enrich_costco.OUTPUT_FIELDS
|
||||
giant_row = {field: "" for field in fieldnames}
|
||||
giant_row.update(
|
||||
{
|
||||
"retailer": "giant",
|
||||
"order_id": "g1",
|
||||
"line_no": "1",
|
||||
"observed_item_key": "giant:g1:1",
|
||||
"order_date": "2026-03-01",
|
||||
"item_name": "FRESH BANANA",
|
||||
"item_name_norm": "BANANA",
|
||||
"retailer_item_id": "100",
|
||||
"upc": "4011",
|
||||
"qty": "1",
|
||||
"unit": "LB",
|
||||
"line_total": "1.29",
|
||||
"unit_price": "1.29",
|
||||
"measure_type": "weight",
|
||||
"price_per_lb": "1.29",
|
||||
"raw_order_path": "giant_output/raw/g1.json",
|
||||
"is_discount_line": "false",
|
||||
"is_coupon_line": "false",
|
||||
"is_fee": "false",
|
||||
}
|
||||
)
|
||||
costco_row = {field: "" for field in fieldnames}
|
||||
costco_row.update(
|
||||
{
|
||||
"retailer": "costco",
|
||||
"order_id": "c1",
|
||||
"line_no": "1",
|
||||
"observed_item_key": "costco:c1:1",
|
||||
"order_date": "2026-03-12",
|
||||
"item_name": "BANANAS 3 LB / 1.36 KG",
|
||||
"item_name_norm": "BANANA",
|
||||
"retailer_item_id": "30669",
|
||||
"qty": "1",
|
||||
"unit": "E",
|
||||
"line_total": "2.98",
|
||||
"unit_price": "2.98",
|
||||
"size_value": "3",
|
||||
"size_unit": "lb",
|
||||
"measure_type": "weight",
|
||||
"price_per_lb": "0.9933",
|
||||
"raw_order_path": "costco_output/raw/c1.json",
|
||||
"is_discount_line": "false",
|
||||
"is_coupon_line": "false",
|
||||
"is_fee": "false",
|
||||
}
|
||||
)
|
||||
giant_orders = [
|
||||
{
|
||||
"order_id": "g1",
|
||||
"store_name": "Giant",
|
||||
"store_number": "42",
|
||||
"store_city": "Springfield",
|
||||
"store_state": "VA",
|
||||
}
|
||||
]
|
||||
costco_orders = [
|
||||
{
|
||||
"order_id": "c1",
|
||||
"store_name": "MT VERNON",
|
||||
"store_number": "1115",
|
||||
"store_city": "ALEXANDRIA",
|
||||
"store_state": "VA",
|
||||
}
|
||||
]
|
||||
|
||||
rows = build_purchases.build_purchase_rows(
|
||||
[giant_row],
|
||||
[costco_row],
|
||||
giant_orders,
|
||||
costco_orders,
|
||||
)
|
||||
|
||||
self.assertEqual(2, len(rows))
|
||||
self.assertTrue(all(row["canonical_product_id"] for row in rows))
|
||||
self.assertEqual({"giant", "costco"}, {row["retailer"] for row in rows})
|
||||
|
||||
def test_main_writes_purchase_and_example_csvs(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
giant_items = Path(tmpdir) / "giant_items.csv"
|
||||
costco_items = Path(tmpdir) / "costco_items.csv"
|
||||
giant_orders = Path(tmpdir) / "giant_orders.csv"
|
||||
costco_orders = Path(tmpdir) / "costco_orders.csv"
|
||||
purchases_csv = Path(tmpdir) / "combined" / "purchases.csv"
|
||||
examples_csv = Path(tmpdir) / "combined" / "comparison_examples.csv"
|
||||
|
||||
fieldnames = enrich_costco.OUTPUT_FIELDS
|
||||
rows = []
|
||||
giant_row = {field: "" for field in fieldnames}
|
||||
giant_row.update(
|
||||
{
|
||||
"retailer": "giant",
|
||||
"order_id": "g1",
|
||||
"line_no": "1",
|
||||
"observed_item_key": "giant:g1:1",
|
||||
"order_date": "2026-03-01",
|
||||
"item_name": "FRESH BANANA",
|
||||
"item_name_norm": "BANANA",
|
||||
"retailer_item_id": "100",
|
||||
"upc": "4011",
|
||||
"qty": "1",
|
||||
"unit": "LB",
|
||||
"line_total": "1.29",
|
||||
"unit_price": "1.29",
|
||||
"measure_type": "weight",
|
||||
"price_per_lb": "1.29",
|
||||
"raw_order_path": "giant_output/raw/g1.json",
|
||||
"is_discount_line": "false",
|
||||
"is_coupon_line": "false",
|
||||
"is_fee": "false",
|
||||
}
|
||||
)
|
||||
costco_row = {field: "" for field in fieldnames}
|
||||
costco_row.update(
|
||||
{
|
||||
"retailer": "costco",
|
||||
"order_id": "c1",
|
||||
"line_no": "1",
|
||||
"observed_item_key": "costco:c1:1",
|
||||
"order_date": "2026-03-12",
|
||||
"item_name": "BANANAS 3 LB / 1.36 KG",
|
||||
"item_name_norm": "BANANA",
|
||||
"retailer_item_id": "30669",
|
||||
"qty": "1",
|
||||
"unit": "E",
|
||||
"line_total": "2.98",
|
||||
"unit_price": "2.98",
|
||||
"size_value": "3",
|
||||
"size_unit": "lb",
|
||||
"measure_type": "weight",
|
||||
"price_per_lb": "0.9933",
|
||||
"raw_order_path": "costco_output/raw/c1.json",
|
||||
"is_discount_line": "false",
|
||||
"is_coupon_line": "false",
|
||||
"is_fee": "false",
|
||||
}
|
||||
)
|
||||
rows.extend([giant_row, costco_row])
|
||||
|
||||
for path, source_rows in [
|
||||
(giant_items, [giant_row]),
|
||||
(costco_items, [costco_row]),
|
||||
]:
|
||||
with path.open("w", newline="", encoding="utf-8") as handle:
|
||||
writer = csv.DictWriter(handle, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
writer.writerows(source_rows)
|
||||
|
||||
for path, source_rows in [
|
||||
(giant_orders, [{"order_id": "g1", "store_name": "Giant", "store_number": "42", "store_city": "Springfield", "store_state": "VA"}]),
|
||||
(costco_orders, [{"order_id": "c1", "store_name": "MT VERNON", "store_number": "1115", "store_city": "ALEXANDRIA", "store_state": "VA"}]),
|
||||
]:
|
||||
with path.open("w", newline="", encoding="utf-8") as handle:
|
||||
writer = csv.DictWriter(handle, fieldnames=["order_id", "store_name", "store_number", "store_city", "store_state"])
|
||||
writer.writeheader()
|
||||
writer.writerows(source_rows)
|
||||
|
||||
build_purchases.main.callback(
|
||||
giant_items_enriched_csv=str(giant_items),
|
||||
costco_items_enriched_csv=str(costco_items),
|
||||
giant_orders_csv=str(giant_orders),
|
||||
costco_orders_csv=str(costco_orders),
|
||||
output_csv=str(purchases_csv),
|
||||
examples_csv=str(examples_csv),
|
||||
)
|
||||
|
||||
self.assertTrue(purchases_csv.exists())
|
||||
self.assertTrue(examples_csv.exists())
|
||||
with purchases_csv.open(newline="", encoding="utf-8") as handle:
|
||||
purchase_rows = list(csv.DictReader(handle))
|
||||
with examples_csv.open(newline="", encoding="utf-8") as handle:
|
||||
example_rows = list(csv.DictReader(handle))
|
||||
self.assertEqual(2, len(purchase_rows))
|
||||
self.assertEqual(1, len(example_rows))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user