diff --git a/README.md b/README.md
index 7b01541..41cb5a2 100644
--- a/README.md
+++ b/README.md
@@ -13,6 +13,7 @@ Run each script step-by-step from the terminal.
 5. `build_purchases.py`: combine retailer outputs into one purchase table
 6. `review_products.py`: review unresolved product matches in the terminal
 7. `report_pipeline_status.py`: show how many rows survive each stage
+8. `analyze_purchases.py`: write chart-ready analysis CSVs from the purchase table
 
 Active refactor entrypoints:
 - `collect_giant_web.py`
@@ -87,6 +88,7 @@ python review_products.py
 python build_purchases.py
 python review_products.py --refresh-only
 python report_pipeline_status.py
+python analyze_purchases.py
 ```
 
 Why run `build_purchases.py` twice:
@@ -121,6 +123,11 @@ Costco:
 
 Combined:
 - `data/review/purchases.csv`
+- `data/review/analysis/item_price_over_time.csv`
+- `data/review/analysis/spend_by_visit.csv`
+- `data/review/analysis/items_per_visit.csv`
+- `data/review/analysis/category_spend_over_time.csv`
+- `data/review/analysis/retailer_store_breakdown.csv`
 - `data/review/review_queue.csv`
 - `data/review/review_resolutions.csv`
 - `data/review/product_links.csv`
diff --git a/analyze_purchases.py b/analyze_purchases.py
new file mode 100644
index 0000000..88676ea
--- /dev/null
+++ b/analyze_purchases.py
@@ -0,0 +1,245 @@
+"""Write chart-ready analysis CSVs derived from the combined purchase table."""
+
+from collections import defaultdict
+from pathlib import Path
+
+import click
+
+from enrich_giant import format_decimal, to_decimal
+from layer_helpers import read_csv_rows, write_csv_rows
+
+
+ITEM_PRICE_FIELDS = [
+    "purchase_date",
+    "retailer",
+    "store_name",
+    "store_number",
+    "store_city",
+    "store_state",
+    "order_id",
+    "catalog_id",
+    "catalog_name",
+    "category",
+    "product_type",
+    "effective_price",
+    "effective_price_unit",
+    "net_line_total",
+    "normalized_quantity",
+]
+
+# Columns that identify one store visit; shared by both per-visit outputs.
+VISIT_KEY_FIELDS = [
+    "purchase_date",
+    "retailer",
+    "order_id",
+    "store_name",
+    "store_number",
+    "store_city",
+    "store_state",
+]
+
+SPEND_BY_VISIT_FIELDS = [*VISIT_KEY_FIELDS, "visit_spend_total"]
+
+ITEMS_PER_VISIT_FIELDS = [
+    *VISIT_KEY_FIELDS,
+    "item_row_count",
+    "distinct_catalog_count",
+]
+
+# Columns that identify one category total.
+CATEGORY_KEY_FIELDS = [
+    "purchase_date",
+    "retailer",
+    "category",
+]
+
+CATEGORY_SPEND_FIELDS = [*CATEGORY_KEY_FIELDS, "category_spend_total"]
+
+# Columns that identify one retailer store.
+STORE_KEY_FIELDS = [
+    "retailer",
+    "store_name",
+    "store_number",
+    "store_city",
+    "store_state",
+]
+
+RETAILER_STORE_FIELDS = [
+    *STORE_KEY_FIELDS,
+    "visit_count",
+    "item_row_count",
+    "store_spend_total",
+]
+
+
+def _group_key(row, fields):
+    """Return the values of ``fields`` from ``row`` as a tuple.
+
+    Columns missing from ``row`` contribute an empty string.
+    """
+    return tuple(row.get(field, "") for field in fields)
+
+
+def effective_total(row):
+    """Return the row's spend as parsed by ``to_decimal``.
+
+    ``net_line_total`` is preferred; ``line_total`` is used only when the
+    net value parses to ``None``.  The result may itself be ``None``.
+    """
+    total = to_decimal(row.get("net_line_total"))
+    if total is not None:
+        return total
+    return to_decimal(row.get("line_total"))
+
+
+def is_item_row(row):
+    """Return True unless the row is flagged as a fee, discount, or coupon line."""
+    return (
+        row.get("is_fee") != "true"
+        and row.get("is_discount_line") != "true"
+        and row.get("is_coupon_line") != "true"
+    )
+
+
+def build_item_price_rows(purchase_rows):
+    """Project priced catalog items onto ``ITEM_PRICE_FIELDS``.
+
+    Rows without a ``catalog_name`` or an ``effective_price`` are skipped;
+    input order is preserved.
+    """
+    return [
+        {field: row.get(field, "") for field in ITEM_PRICE_FIELDS}
+        for row in purchase_rows
+        if row.get("catalog_name") and row.get("effective_price")
+    ]
+
+
+def build_spend_by_visit_rows(purchase_rows):
+    """Sum ``effective_total`` per visit, returning rows sorted by visit key.
+
+    Rows whose ``effective_total`` is ``None`` are ignored.
+    """
+    totals = defaultdict(lambda: to_decimal("0"))
+    for row in purchase_rows:
+        total = effective_total(row)
+        if total is None:
+            continue
+        totals[_group_key(row, VISIT_KEY_FIELDS)] += total
+
+    return [
+        {
+            **dict(zip(VISIT_KEY_FIELDS, key)),
+            "visit_spend_total": format_decimal(total),
+        }
+        for key, total in sorted(totals.items())
+    ]
+
+
+def build_items_per_visit_rows(purchase_rows):
+    """Count item rows and distinct catalog ids per visit.
+
+    Fee, discount, and coupon lines are excluded; rows without a
+    ``catalog_id`` still count as item rows.  Output is sorted by visit key.
+    """
+    item_counts = defaultdict(int)
+    catalog_ids = defaultdict(set)
+    for row in purchase_rows:
+        if not is_item_row(row):
+            continue
+        key = _group_key(row, VISIT_KEY_FIELDS)
+        item_counts[key] += 1
+        if row.get("catalog_id"):
+            catalog_ids[key].add(row["catalog_id"])
+
+    return [
+        {
+            **dict(zip(VISIT_KEY_FIELDS, key)),
+            "item_row_count": str(count),
+            "distinct_catalog_count": str(len(catalog_ids[key])),
+        }
+        for key, count in sorted(item_counts.items())
+    ]
+
+
+def build_category_spend_rows(purchase_rows):
+    """Sum ``effective_total`` per (purchase_date, retailer, category).
+
+    Rows with no category or a ``None`` total are ignored.  Output is
+    sorted by the grouping key.
+    """
+    totals = defaultdict(lambda: to_decimal("0"))
+    for row in purchase_rows:
+        total = effective_total(row)
+        if not row.get("category") or total is None:
+            continue
+        totals[_group_key(row, CATEGORY_KEY_FIELDS)] += total
+
+    return [
+        {
+            **dict(zip(CATEGORY_KEY_FIELDS, key)),
+            "category_spend_total": format_decimal(total),
+        }
+        for key, total in sorted(totals.items())
+    ]
+
+
+def build_retailer_store_rows(purchase_rows):
+    """Summarize visits, item rows, and spend for each retailer store.
+
+    Every row registers its (purchase_date, order_id) pair as a visit; only
+    item rows add to ``item_row_count``; every row with a non-``None`` total
+    adds to ``store_spend_total``.  Output is sorted by store key.
+    """
+    stores = defaultdict(
+        lambda: {"visit_ids": set(), "item_rows": 0, "total": to_decimal("0")}
+    )
+    for row in purchase_rows:
+        store = stores[_group_key(row, STORE_KEY_FIELDS)]
+        store["visit_ids"].add((row.get("purchase_date", ""), row.get("order_id", "")))
+        if is_item_row(row):
+            store["item_rows"] += 1
+        total = effective_total(row)
+        if total is not None:
+            store["total"] += total
+
+    return [
+        {
+            **dict(zip(STORE_KEY_FIELDS, key)),
+            "visit_count": str(len(store["visit_ids"])),
+            "item_row_count": str(store["item_rows"]),
+            "store_spend_total": format_decimal(store["total"]),
+        }
+        for key, store in sorted(stores.items())
+    ]
+
+
+@click.command()
+@click.option("--purchases-csv", default="data/review/purchases.csv", show_default=True)
+@click.option("--output-dir", default="data/review/analysis", show_default=True)
+def main(purchases_csv, output_dir):
+    """Write chart-ready analysis CSVs derived from the purchase table."""
+    purchase_rows = read_csv_rows(purchases_csv)
+    output_path = Path(output_dir)
+    output_path.mkdir(parents=True, exist_ok=True)
+
+    item_price_rows = build_item_price_rows(purchase_rows)
+    spend_by_visit_rows = build_spend_by_visit_rows(purchase_rows)
+    items_per_visit_rows = build_items_per_visit_rows(purchase_rows)
+    category_spend_rows = build_category_spend_rows(purchase_rows)
+    retailer_store_rows = build_retailer_store_rows(purchase_rows)
+
+    outputs = [
+        ("item_price_over_time.csv", item_price_rows, ITEM_PRICE_FIELDS),
+        ("spend_by_visit.csv", spend_by_visit_rows, SPEND_BY_VISIT_FIELDS),
+        ("items_per_visit.csv", items_per_visit_rows, ITEMS_PER_VISIT_FIELDS),
+        ("category_spend_over_time.csv", category_spend_rows, CATEGORY_SPEND_FIELDS),
+        ("retailer_store_breakdown.csv", retailer_store_rows, RETAILER_STORE_FIELDS),
+    ]
+    for filename, rows, fieldnames in outputs:
+        write_csv_rows(output_path / filename, rows, fieldnames)
+
+    click.echo(f"wrote analysis outputs to {output_path}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_analyze_purchases.py b/tests/test_analyze_purchases.py
new file mode 100644
index 0000000..a93bece
--- /dev/null
+++ b/tests/test_analyze_purchases.py
@@ -0,0 +1,151 @@
+import csv
+import tempfile
+import unittest
+from pathlib import Path
+
+import analyze_purchases
+
+
+class AnalyzePurchasesTests(unittest.TestCase):
+    def test_analysis_outputs_cover_required_views(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            purchases_csv = Path(tmpdir) / "purchases.csv"
+            output_dir = Path(tmpdir) / "analysis"
+
+            fieldnames = [
+                "purchase_date",
+                "retailer",
+                "order_id",
+                "catalog_id",
+                "catalog_name",
+                "category",
+                "product_type",
+                "net_line_total",
+                "line_total",
+                "normalized_quantity",
+                "normalized_quantity_unit",
+                "effective_price",
+                "effective_price_unit",
+                "store_name",
+                "store_number",
+                "store_city",
+                "store_state",
+                "is_fee",
+                "is_discount_line",
+                "is_coupon_line",
+            ]
+            with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
+                writer = csv.DictWriter(handle, fieldnames=fieldnames)
+                writer.writeheader()
+                writer.writerows(
+                    [
+                        {
+                            "purchase_date": "2026-03-01",
+                            "retailer": "giant",
+                            "order_id": "g1",
+                            "catalog_id": "cat_banana",
+                            "catalog_name": "BANANA",
+                            "category": "produce",
+                            "product_type": "banana",
+                            "net_line_total": "1.29",
+                            "line_total": "1.29",
+                            "normalized_quantity": "2.19",
+                            "normalized_quantity_unit": "lb",
+                            "effective_price": "0.589",
+                            "effective_price_unit": "lb",
+                            "store_name": "Giant",
+                            "store_number": "42",
+                            "store_city": "Springfield",
+                            "store_state": "VA",
+                            "is_fee": "false",
+                            "is_discount_line": "false",
+                            "is_coupon_line": "false",
+                        },
+                        {
+                            "purchase_date": "2026-03-01",
+                            "retailer": "giant",
+                            "order_id": "g1",
+                            "catalog_id": "cat_ice",
+                            "catalog_name": "ICE",
+                            "category": "frozen",
+                            "product_type": "ice",
+                            "net_line_total": "3.50",
+                            "line_total": "3.50",
+                            "normalized_quantity": "20",
+                            "normalized_quantity_unit": "lb",
+                            "effective_price": "0.175",
+                            "effective_price_unit": "lb",
+                            "store_name": "Giant",
+                            "store_number": "42",
+                            "store_city": "Springfield",
+                            "store_state": "VA",
+                            "is_fee": "false",
+                            "is_discount_line": "false",
+                            "is_coupon_line": "false",
+                        },
+                        {
+                            "purchase_date": "2026-03-02",
+                            "retailer": "costco",
+                            "order_id": "c1",
+                            "catalog_id": "cat_banana",
+                            "catalog_name": "BANANA",
+                            "category": "produce",
+                            "product_type": "banana",
+                            "net_line_total": "1.49",
+                            "line_total": "2.98",
+                            "normalized_quantity": "3",
+                            "normalized_quantity_unit": "lb",
+                            "effective_price": "0.4967",
+                            "effective_price_unit": "lb",
+                            "store_name": "MT VERNON",
+                            "store_number": "1115",
+                            "store_city": "ALEXANDRIA",
+                            "store_state": "VA",
+                            "is_fee": "false",
+                            "is_discount_line": "false",
+                            "is_coupon_line": "false",
+                        },
+                    ]
+                )
+
+            analyze_purchases.main.callback(
+                purchases_csv=str(purchases_csv),
+                output_dir=str(output_dir),
+            )
+
+            expected_files = [
+                "item_price_over_time.csv",
+                "spend_by_visit.csv",
+                "items_per_visit.csv",
+                "category_spend_over_time.csv",
+                "retailer_store_breakdown.csv",
+            ]
+            for name in expected_files:
+                self.assertTrue((output_dir / name).exists(), name)
+
+            with (output_dir / "spend_by_visit.csv").open(newline="", encoding="utf-8") as handle:
+                spend_rows = list(csv.DictReader(handle))
+            self.assertEqual("4.79", spend_rows[0]["visit_spend_total"])
+            # The costco row has net_line_total != line_total; the net value must win.
+            self.assertEqual("1.49", spend_rows[1]["visit_spend_total"])
+
+            with (output_dir / "items_per_visit.csv").open(newline="", encoding="utf-8") as handle:
+                item_rows = list(csv.DictReader(handle))
+            self.assertEqual("2", item_rows[0]["item_row_count"])
+            self.assertEqual("2", item_rows[0]["distinct_catalog_count"])
+
+            with (output_dir / "category_spend_over_time.csv").open(newline="", encoding="utf-8") as handle:
+                category_rows = list(csv.DictReader(handle))
+            produce_row = next(row for row in category_rows if row["purchase_date"] == "2026-03-01" and row["category"] == "produce")
+            self.assertEqual("1.29", produce_row["category_spend_total"])
+
+            with (output_dir / "retailer_store_breakdown.csv").open(newline="", encoding="utf-8") as handle:
+                store_rows = list(csv.DictReader(handle))
+            giant_row = next(row for row in store_rows if row["retailer"] == "giant")
+            self.assertEqual("1", giant_row["visit_count"])
+            self.assertEqual("2", giant_row["item_row_count"])
+            self.assertEqual("4.79", giant_row["store_spend_total"])
+
+
+if __name__ == "__main__":
+    unittest.main()