Add purchase analysis summaries

This commit is contained in:
ben
2026-03-24 16:48:53 -04:00
parent c35688c87f
commit 46a3b2c639
3 changed files with 427 additions and 0 deletions

View File

@@ -13,6 +13,7 @@ Run each script step-by-step from the terminal.
5. `build_purchases.py`: combine retailer outputs into one purchase table
6. `review_products.py`: review unresolved product matches in the terminal
7. `report_pipeline_status.py`: show how many rows survive each stage
8. `analyze_purchases.py`: write chart-ready analysis CSVs from the purchase table
Active refactor entrypoints:
- `collect_giant_web.py`
@@ -87,6 +88,7 @@ python review_products.py
python build_purchases.py
python review_products.py --refresh-only
python report_pipeline_status.py
python analyze_purchases.py
```
Why run `build_purchases.py` twice:
@@ -121,6 +123,11 @@ Costco:
Combined:
- `data/review/purchases.csv`
- `data/review/analysis/item_price_over_time.csv`
- `data/review/analysis/spend_by_visit.csv`
- `data/review/analysis/items_per_visit.csv`
- `data/review/analysis/category_spend_over_time.csv`
- `data/review/analysis/retailer_store_breakdown.csv`
- `data/review/review_queue.csv`
- `data/review/review_resolutions.csv`
- `data/review/product_links.csv`

271
analyze_purchases.py Normal file
View File

@@ -0,0 +1,271 @@
from collections import defaultdict
from pathlib import Path
import click
from enrich_giant import format_decimal, to_decimal
from layer_helpers import read_csv_rows, write_csv_rows
ITEM_PRICE_FIELDS = [
"purchase_date",
"retailer",
"store_name",
"store_number",
"store_city",
"store_state",
"order_id",
"catalog_id",
"catalog_name",
"category",
"product_type",
"effective_price",
"effective_price_unit",
"net_line_total",
"normalized_quantity",
]
SPEND_BY_VISIT_FIELDS = [
"purchase_date",
"retailer",
"order_id",
"store_name",
"store_number",
"store_city",
"store_state",
"visit_spend_total",
]
ITEMS_PER_VISIT_FIELDS = [
"purchase_date",
"retailer",
"order_id",
"store_name",
"store_number",
"store_city",
"store_state",
"item_row_count",
"distinct_catalog_count",
]
CATEGORY_SPEND_FIELDS = [
"purchase_date",
"retailer",
"category",
"category_spend_total",
]
RETAILER_STORE_FIELDS = [
"retailer",
"store_name",
"store_number",
"store_city",
"store_state",
"visit_count",
"item_row_count",
"store_spend_total",
]
def effective_total(row):
total = to_decimal(row.get("net_line_total"))
if total is not None:
return total
return to_decimal(row.get("line_total"))
def is_item_row(row):
return (
row.get("is_fee") != "true"
and row.get("is_discount_line") != "true"
and row.get("is_coupon_line") != "true"
)
def build_item_price_rows(purchase_rows):
rows = []
for row in purchase_rows:
if not row.get("catalog_name") or not row.get("effective_price"):
continue
rows.append(
{
"purchase_date": row.get("purchase_date", ""),
"retailer": row.get("retailer", ""),
"store_name": row.get("store_name", ""),
"store_number": row.get("store_number", ""),
"store_city": row.get("store_city", ""),
"store_state": row.get("store_state", ""),
"order_id": row.get("order_id", ""),
"catalog_id": row.get("catalog_id", ""),
"catalog_name": row.get("catalog_name", ""),
"category": row.get("category", ""),
"product_type": row.get("product_type", ""),
"effective_price": row.get("effective_price", ""),
"effective_price_unit": row.get("effective_price_unit", ""),
"net_line_total": row.get("net_line_total", ""),
"normalized_quantity": row.get("normalized_quantity", ""),
}
)
return rows
def build_spend_by_visit_rows(purchase_rows):
grouped = defaultdict(lambda: {"total": to_decimal("0")})
for row in purchase_rows:
total = effective_total(row)
if total is None:
continue
key = (
row.get("purchase_date", ""),
row.get("retailer", ""),
row.get("order_id", ""),
row.get("store_name", ""),
row.get("store_number", ""),
row.get("store_city", ""),
row.get("store_state", ""),
)
grouped[key]["total"] += total
rows = []
for key, values in sorted(grouped.items()):
rows.append(
{
"purchase_date": key[0],
"retailer": key[1],
"order_id": key[2],
"store_name": key[3],
"store_number": key[4],
"store_city": key[5],
"store_state": key[6],
"visit_spend_total": format_decimal(values["total"]),
}
)
return rows
def build_items_per_visit_rows(purchase_rows):
grouped = defaultdict(lambda: {"item_rows": 0, "catalog_ids": set()})
for row in purchase_rows:
if not is_item_row(row):
continue
key = (
row.get("purchase_date", ""),
row.get("retailer", ""),
row.get("order_id", ""),
row.get("store_name", ""),
row.get("store_number", ""),
row.get("store_city", ""),
row.get("store_state", ""),
)
grouped[key]["item_rows"] += 1
if row.get("catalog_id"):
grouped[key]["catalog_ids"].add(row["catalog_id"])
rows = []
for key, values in sorted(grouped.items()):
rows.append(
{
"purchase_date": key[0],
"retailer": key[1],
"order_id": key[2],
"store_name": key[3],
"store_number": key[4],
"store_city": key[5],
"store_state": key[6],
"item_row_count": str(values["item_rows"]),
"distinct_catalog_count": str(len(values["catalog_ids"])),
}
)
return rows
def build_category_spend_rows(purchase_rows):
grouped = defaultdict(lambda: to_decimal("0"))
for row in purchase_rows:
category = row.get("category", "")
total = effective_total(row)
if not category or total is None:
continue
key = (
row.get("purchase_date", ""),
row.get("retailer", ""),
category,
)
grouped[key] += total
rows = []
for key, total in sorted(grouped.items()):
rows.append(
{
"purchase_date": key[0],
"retailer": key[1],
"category": key[2],
"category_spend_total": format_decimal(total),
}
)
return rows
def build_retailer_store_rows(purchase_rows):
grouped = defaultdict(lambda: {"visit_ids": set(), "item_rows": 0, "total": to_decimal("0")})
for row in purchase_rows:
total = effective_total(row)
key = (
row.get("retailer", ""),
row.get("store_name", ""),
row.get("store_number", ""),
row.get("store_city", ""),
row.get("store_state", ""),
)
grouped[key]["visit_ids"].add((row.get("purchase_date", ""), row.get("order_id", "")))
if is_item_row(row):
grouped[key]["item_rows"] += 1
if total is not None:
grouped[key]["total"] += total
rows = []
for key, values in sorted(grouped.items()):
rows.append(
{
"retailer": key[0],
"store_name": key[1],
"store_number": key[2],
"store_city": key[3],
"store_state": key[4],
"visit_count": str(len(values["visit_ids"])),
"item_row_count": str(values["item_rows"]),
"store_spend_total": format_decimal(values["total"]),
}
)
return rows
@click.command()
@click.option("--purchases-csv", default="data/review/purchases.csv", show_default=True)
@click.option("--output-dir", default="data/review/analysis", show_default=True)
def main(purchases_csv, output_dir):
purchase_rows = read_csv_rows(purchases_csv)
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
item_price_rows = build_item_price_rows(purchase_rows)
spend_by_visit_rows = build_spend_by_visit_rows(purchase_rows)
items_per_visit_rows = build_items_per_visit_rows(purchase_rows)
category_spend_rows = build_category_spend_rows(purchase_rows)
retailer_store_rows = build_retailer_store_rows(purchase_rows)
outputs = [
("item_price_over_time.csv", item_price_rows, ITEM_PRICE_FIELDS),
("spend_by_visit.csv", spend_by_visit_rows, SPEND_BY_VISIT_FIELDS),
("items_per_visit.csv", items_per_visit_rows, ITEMS_PER_VISIT_FIELDS),
("category_spend_over_time.csv", category_spend_rows, CATEGORY_SPEND_FIELDS),
("retailer_store_breakdown.csv", retailer_store_rows, RETAILER_STORE_FIELDS),
]
for filename, rows, fieldnames in outputs:
write_csv_rows(output_path / filename, rows, fieldnames)
click.echo(f"wrote analysis outputs to {output_path}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,149 @@
import csv
import tempfile
import unittest
from pathlib import Path
import analyze_purchases
class AnalyzePurchasesTests(unittest.TestCase):
def test_analysis_outputs_cover_required_views(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
output_dir = Path(tmpdir) / "analysis"
fieldnames = [
"purchase_date",
"retailer",
"order_id",
"catalog_id",
"catalog_name",
"category",
"product_type",
"net_line_total",
"line_total",
"normalized_quantity",
"normalized_quantity_unit",
"effective_price",
"effective_price_unit",
"store_name",
"store_number",
"store_city",
"store_state",
"is_fee",
"is_discount_line",
"is_coupon_line",
]
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(
[
{
"purchase_date": "2026-03-01",
"retailer": "giant",
"order_id": "g1",
"catalog_id": "cat_banana",
"catalog_name": "BANANA",
"category": "produce",
"product_type": "banana",
"net_line_total": "1.29",
"line_total": "1.29",
"normalized_quantity": "2.19",
"normalized_quantity_unit": "lb",
"effective_price": "0.589",
"effective_price_unit": "lb",
"store_name": "Giant",
"store_number": "42",
"store_city": "Springfield",
"store_state": "VA",
"is_fee": "false",
"is_discount_line": "false",
"is_coupon_line": "false",
},
{
"purchase_date": "2026-03-01",
"retailer": "giant",
"order_id": "g1",
"catalog_id": "cat_ice",
"catalog_name": "ICE",
"category": "frozen",
"product_type": "ice",
"net_line_total": "3.50",
"line_total": "3.50",
"normalized_quantity": "20",
"normalized_quantity_unit": "lb",
"effective_price": "0.175",
"effective_price_unit": "lb",
"store_name": "Giant",
"store_number": "42",
"store_city": "Springfield",
"store_state": "VA",
"is_fee": "false",
"is_discount_line": "false",
"is_coupon_line": "false",
},
{
"purchase_date": "2026-03-02",
"retailer": "costco",
"order_id": "c1",
"catalog_id": "cat_banana",
"catalog_name": "BANANA",
"category": "produce",
"product_type": "banana",
"net_line_total": "1.49",
"line_total": "2.98",
"normalized_quantity": "3",
"normalized_quantity_unit": "lb",
"effective_price": "0.4967",
"effective_price_unit": "lb",
"store_name": "MT VERNON",
"store_number": "1115",
"store_city": "ALEXANDRIA",
"store_state": "VA",
"is_fee": "false",
"is_discount_line": "false",
"is_coupon_line": "false",
},
]
)
analyze_purchases.main.callback(
purchases_csv=str(purchases_csv),
output_dir=str(output_dir),
)
expected_files = [
"item_price_over_time.csv",
"spend_by_visit.csv",
"items_per_visit.csv",
"category_spend_over_time.csv",
"retailer_store_breakdown.csv",
]
for name in expected_files:
self.assertTrue((output_dir / name).exists(), name)
with (output_dir / "spend_by_visit.csv").open(newline="", encoding="utf-8") as handle:
spend_rows = list(csv.DictReader(handle))
self.assertEqual("4.79", spend_rows[0]["visit_spend_total"])
with (output_dir / "items_per_visit.csv").open(newline="", encoding="utf-8") as handle:
item_rows = list(csv.DictReader(handle))
self.assertEqual("2", item_rows[0]["item_row_count"])
self.assertEqual("2", item_rows[0]["distinct_catalog_count"])
with (output_dir / "category_spend_over_time.csv").open(newline="", encoding="utf-8") as handle:
category_rows = list(csv.DictReader(handle))
produce_row = next(row for row in category_rows if row["purchase_date"] == "2026-03-01" and row["category"] == "produce")
self.assertEqual("1.29", produce_row["category_spend_total"])
with (output_dir / "retailer_store_breakdown.csv").open(newline="", encoding="utf-8") as handle:
store_rows = list(csv.DictReader(handle))
giant_row = next(row for row in store_rows if row["retailer"] == "giant")
self.assertEqual("1", giant_row["visit_count"])
self.assertEqual("2", giant_row["item_row_count"])
self.assertEqual("4.79", giant_row["store_spend_total"])
if __name__ == "__main__":
unittest.main()