Compare commits

...

4 Commits

Author SHA1 Message Date
ben
cdb7a15739 Record t1.21 task evidence 2026-03-24 16:49:01 -04:00
ben
46a3b2c639 Add purchase analysis summaries 2026-03-24 16:48:53 -04:00
ben
c35688c87f Record t1.20 task evidence 2026-03-24 08:29:31 -04:00
ben
6940f165fb Document visit-level purchase analysis 2026-03-24 08:29:26 -04:00
5 changed files with 550 additions and 5 deletions

View File

@@ -13,6 +13,7 @@ Run each script step-by-step from the terminal.
5. `build_purchases.py`: combine retailer outputs into one purchase table
6. `review_products.py`: review unresolved product matches in the terminal
7. `report_pipeline_status.py`: show how many rows survive each stage
8. `analyze_purchases.py`: write chart-ready analysis CSVs from the purchase table
Active refactor entrypoints:
- `collect_giant_web.py`
@@ -87,6 +88,7 @@ python review_products.py
python build_purchases.py
python review_products.py --refresh-only
python report_pipeline_status.py
python analyze_purchases.py
```
Why run `build_purchases.py` twice:
@@ -121,6 +123,11 @@ Costco:
Combined:
- `data/review/purchases.csv`
- `data/review/analysis/item_price_over_time.csv`
- `data/review/analysis/spend_by_visit.csv`
- `data/review/analysis/items_per_visit.csv`
- `data/review/analysis/category_spend_over_time.csv`
- `data/review/analysis/retailer_store_breakdown.csv`
- `data/review/review_queue.csv`
- `data/review/review_resolutions.csv`
- `data/review/product_links.csv`
@@ -129,6 +136,19 @@ Combined:
- `data/review/pipeline_status.json`
- `data/catalog.csv`
`data/review/purchases.csv` is the main analysis artifact. It is designed to support both:
- item-level price analysis
- visit-level analysis such as spend by visit, items per visit, category spend by visit, and retailer/store breakdown
The visit fields are carried directly in `purchases.csv`, so you can pivot on them without extra joins:
- `order_id`
- `purchase_date`
- `retailer`
- `store_name`
- `store_number`
- `store_city`
- `store_state`
## Review Workflow
Run `review_products.py` to cleanup unresolved or weakly unified items:

271
analyze_purchases.py Normal file
View File

@@ -0,0 +1,271 @@
from collections import defaultdict
from pathlib import Path
import click
from enrich_giant import format_decimal, to_decimal
from layer_helpers import read_csv_rows, write_csv_rows
ITEM_PRICE_FIELDS = [
"purchase_date",
"retailer",
"store_name",
"store_number",
"store_city",
"store_state",
"order_id",
"catalog_id",
"catalog_name",
"category",
"product_type",
"effective_price",
"effective_price_unit",
"net_line_total",
"normalized_quantity",
]
SPEND_BY_VISIT_FIELDS = [
"purchase_date",
"retailer",
"order_id",
"store_name",
"store_number",
"store_city",
"store_state",
"visit_spend_total",
]
ITEMS_PER_VISIT_FIELDS = [
"purchase_date",
"retailer",
"order_id",
"store_name",
"store_number",
"store_city",
"store_state",
"item_row_count",
"distinct_catalog_count",
]
CATEGORY_SPEND_FIELDS = [
"purchase_date",
"retailer",
"category",
"category_spend_total",
]
RETAILER_STORE_FIELDS = [
"retailer",
"store_name",
"store_number",
"store_city",
"store_state",
"visit_count",
"item_row_count",
"store_spend_total",
]
def effective_total(row):
total = to_decimal(row.get("net_line_total"))
if total is not None:
return total
return to_decimal(row.get("line_total"))
def is_item_row(row):
return (
row.get("is_fee") != "true"
and row.get("is_discount_line") != "true"
and row.get("is_coupon_line") != "true"
)
def build_item_price_rows(purchase_rows):
rows = []
for row in purchase_rows:
if not row.get("catalog_name") or not row.get("effective_price"):
continue
rows.append(
{
"purchase_date": row.get("purchase_date", ""),
"retailer": row.get("retailer", ""),
"store_name": row.get("store_name", ""),
"store_number": row.get("store_number", ""),
"store_city": row.get("store_city", ""),
"store_state": row.get("store_state", ""),
"order_id": row.get("order_id", ""),
"catalog_id": row.get("catalog_id", ""),
"catalog_name": row.get("catalog_name", ""),
"category": row.get("category", ""),
"product_type": row.get("product_type", ""),
"effective_price": row.get("effective_price", ""),
"effective_price_unit": row.get("effective_price_unit", ""),
"net_line_total": row.get("net_line_total", ""),
"normalized_quantity": row.get("normalized_quantity", ""),
}
)
return rows
def build_spend_by_visit_rows(purchase_rows):
grouped = defaultdict(lambda: {"total": to_decimal("0")})
for row in purchase_rows:
total = effective_total(row)
if total is None:
continue
key = (
row.get("purchase_date", ""),
row.get("retailer", ""),
row.get("order_id", ""),
row.get("store_name", ""),
row.get("store_number", ""),
row.get("store_city", ""),
row.get("store_state", ""),
)
grouped[key]["total"] += total
rows = []
for key, values in sorted(grouped.items()):
rows.append(
{
"purchase_date": key[0],
"retailer": key[1],
"order_id": key[2],
"store_name": key[3],
"store_number": key[4],
"store_city": key[5],
"store_state": key[6],
"visit_spend_total": format_decimal(values["total"]),
}
)
return rows
def build_items_per_visit_rows(purchase_rows):
grouped = defaultdict(lambda: {"item_rows": 0, "catalog_ids": set()})
for row in purchase_rows:
if not is_item_row(row):
continue
key = (
row.get("purchase_date", ""),
row.get("retailer", ""),
row.get("order_id", ""),
row.get("store_name", ""),
row.get("store_number", ""),
row.get("store_city", ""),
row.get("store_state", ""),
)
grouped[key]["item_rows"] += 1
if row.get("catalog_id"):
grouped[key]["catalog_ids"].add(row["catalog_id"])
rows = []
for key, values in sorted(grouped.items()):
rows.append(
{
"purchase_date": key[0],
"retailer": key[1],
"order_id": key[2],
"store_name": key[3],
"store_number": key[4],
"store_city": key[5],
"store_state": key[6],
"item_row_count": str(values["item_rows"]),
"distinct_catalog_count": str(len(values["catalog_ids"])),
}
)
return rows
def build_category_spend_rows(purchase_rows):
grouped = defaultdict(lambda: to_decimal("0"))
for row in purchase_rows:
category = row.get("category", "")
total = effective_total(row)
if not category or total is None:
continue
key = (
row.get("purchase_date", ""),
row.get("retailer", ""),
category,
)
grouped[key] += total
rows = []
for key, total in sorted(grouped.items()):
rows.append(
{
"purchase_date": key[0],
"retailer": key[1],
"category": key[2],
"category_spend_total": format_decimal(total),
}
)
return rows
def build_retailer_store_rows(purchase_rows):
grouped = defaultdict(lambda: {"visit_ids": set(), "item_rows": 0, "total": to_decimal("0")})
for row in purchase_rows:
total = effective_total(row)
key = (
row.get("retailer", ""),
row.get("store_name", ""),
row.get("store_number", ""),
row.get("store_city", ""),
row.get("store_state", ""),
)
grouped[key]["visit_ids"].add((row.get("purchase_date", ""), row.get("order_id", "")))
if is_item_row(row):
grouped[key]["item_rows"] += 1
if total is not None:
grouped[key]["total"] += total
rows = []
for key, values in sorted(grouped.items()):
rows.append(
{
"retailer": key[0],
"store_name": key[1],
"store_number": key[2],
"store_city": key[3],
"store_state": key[4],
"visit_count": str(len(values["visit_ids"])),
"item_row_count": str(values["item_rows"]),
"store_spend_total": format_decimal(values["total"]),
}
)
return rows
@click.command()
@click.option("--purchases-csv", default="data/review/purchases.csv", show_default=True)
@click.option("--output-dir", default="data/review/analysis", show_default=True)
def main(purchases_csv, output_dir):
purchase_rows = read_csv_rows(purchases_csv)
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
item_price_rows = build_item_price_rows(purchase_rows)
spend_by_visit_rows = build_spend_by_visit_rows(purchase_rows)
items_per_visit_rows = build_items_per_visit_rows(purchase_rows)
category_spend_rows = build_category_spend_rows(purchase_rows)
retailer_store_rows = build_retailer_store_rows(purchase_rows)
outputs = [
("item_price_over_time.csv", item_price_rows, ITEM_PRICE_FIELDS),
("spend_by_visit.csv", spend_by_visit_rows, SPEND_BY_VISIT_FIELDS),
("items_per_visit.csv", items_per_visit_rows, ITEMS_PER_VISIT_FIELDS),
("category_spend_over_time.csv", category_spend_rows, CATEGORY_SPEND_FIELDS),
("retailer_store_breakdown.csv", retailer_store_rows, RETAILER_STORE_FIELDS),
]
for filename, rows, fieldnames in outputs:
write_csv_rows(output_path / filename, rows, fieldnames)
click.echo(f"wrote analysis outputs to {output_path}")
if __name__ == "__main__":
main()

View File

@@ -1021,7 +1021,7 @@ refresh review state from the current normalized universe so missing or broken l
** notes
- `review_products.py` now rebuilds its queue from the current normalized files and order files instead of trusting stale `purchases.csv` state.
- Missing catalog rows and incomplete catalog rows now re-enter review explicitly as `orphaned_catalog_link` or `incomplete_catalog_link`, and excluded rows no longer inflate unresolved-not-in-review accounting.
- Missing catalog rows and incomplete catalog rows now re-enter review explicitly as `orphaned_catalog_link` or `incomplete_catalog_link`, and excluded rows no longer inflate unresolved-not-in-review accounting.
* [X] t1.20: add visit-level fields and outputs for spend analysis (2-4 commits)
ensure purchases retains enough visit/order context to support spend-by-visit and store-level analysis
@@ -1042,13 +1042,15 @@ ensure purchases retains enough visit/order context to support spend-by-visit an
3. documentation or task notes make clear that `purchases.csv` is the primary analysis artifact for both item-level and visit-level reporting
- pm note: do not build dash/plotly here; this task is only about carrying the right data through
** evidence
- commit:
- tests:
** evidence
- commit: `6940f16` `Document visit-level purchase analysis`
- tests: `./venv/bin/python -m unittest tests.test_purchases`; `./venv/bin/python build_purchases.py`
- datetime: 2026-03-24 08:29:13 EDT
** notes
- The needed visit fields were already flowing through `build_purchases.py`; this task locked them in with explicit tests and documentation instead of adding a new visit layer.
- `data/review/purchases.csv` is now documented as the primary analysis artifact for both item-level and visit-level work.
* [X] t1.21: add lightweight charting/analysis surface on top of purchases.csv (2-4 commits)
build a minimal analysis layer for common price and visit charts without changing the csv pipeline
@@ -1061,6 +1063,16 @@ build a minimal analysis layer for common price and visit charts without changin
- retailer/store comparison
2. use `data/purchases.csv` as the source of truth
3. keep excel/pivot compatibility intact
- pm note: thin reader layer only; do not move business logic out of the pipeline
** evidence
- commit: `46a3b2c` `Add purchase analysis summaries`
- tests: `./venv/bin/python -m unittest tests.test_analyze_purchases tests.test_purchases`; `./venv/bin/python analyze_purchases.py`
- datetime: 2026-03-24 16:48:41 EDT
** notes
- The new layer is file-based, not notebook- or dashboard-based: `analyze_purchases.py` reads `data/review/purchases.csv` and writes chart-ready CSVs under `data/review/analysis/`.
- This keeps Excel/pivot workflows intact while still giving a repeatable CLI path for common price, visit, category, and retailer/store summaries.
* [ ] t1.10: add optional llm-assisted suggestion workflow for unresolved normalized retailer items (2-4 commits)

View File

@@ -0,0 +1,149 @@
import csv
import tempfile
import unittest
from pathlib import Path
import analyze_purchases
class AnalyzePurchasesTests(unittest.TestCase):
def test_analysis_outputs_cover_required_views(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
output_dir = Path(tmpdir) / "analysis"
fieldnames = [
"purchase_date",
"retailer",
"order_id",
"catalog_id",
"catalog_name",
"category",
"product_type",
"net_line_total",
"line_total",
"normalized_quantity",
"normalized_quantity_unit",
"effective_price",
"effective_price_unit",
"store_name",
"store_number",
"store_city",
"store_state",
"is_fee",
"is_discount_line",
"is_coupon_line",
]
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(
[
{
"purchase_date": "2026-03-01",
"retailer": "giant",
"order_id": "g1",
"catalog_id": "cat_banana",
"catalog_name": "BANANA",
"category": "produce",
"product_type": "banana",
"net_line_total": "1.29",
"line_total": "1.29",
"normalized_quantity": "2.19",
"normalized_quantity_unit": "lb",
"effective_price": "0.589",
"effective_price_unit": "lb",
"store_name": "Giant",
"store_number": "42",
"store_city": "Springfield",
"store_state": "VA",
"is_fee": "false",
"is_discount_line": "false",
"is_coupon_line": "false",
},
{
"purchase_date": "2026-03-01",
"retailer": "giant",
"order_id": "g1",
"catalog_id": "cat_ice",
"catalog_name": "ICE",
"category": "frozen",
"product_type": "ice",
"net_line_total": "3.50",
"line_total": "3.50",
"normalized_quantity": "20",
"normalized_quantity_unit": "lb",
"effective_price": "0.175",
"effective_price_unit": "lb",
"store_name": "Giant",
"store_number": "42",
"store_city": "Springfield",
"store_state": "VA",
"is_fee": "false",
"is_discount_line": "false",
"is_coupon_line": "false",
},
{
"purchase_date": "2026-03-02",
"retailer": "costco",
"order_id": "c1",
"catalog_id": "cat_banana",
"catalog_name": "BANANA",
"category": "produce",
"product_type": "banana",
"net_line_total": "1.49",
"line_total": "2.98",
"normalized_quantity": "3",
"normalized_quantity_unit": "lb",
"effective_price": "0.4967",
"effective_price_unit": "lb",
"store_name": "MT VERNON",
"store_number": "1115",
"store_city": "ALEXANDRIA",
"store_state": "VA",
"is_fee": "false",
"is_discount_line": "false",
"is_coupon_line": "false",
},
]
)
analyze_purchases.main.callback(
purchases_csv=str(purchases_csv),
output_dir=str(output_dir),
)
expected_files = [
"item_price_over_time.csv",
"spend_by_visit.csv",
"items_per_visit.csv",
"category_spend_over_time.csv",
"retailer_store_breakdown.csv",
]
for name in expected_files:
self.assertTrue((output_dir / name).exists(), name)
with (output_dir / "spend_by_visit.csv").open(newline="", encoding="utf-8") as handle:
spend_rows = list(csv.DictReader(handle))
self.assertEqual("4.79", spend_rows[0]["visit_spend_total"])
with (output_dir / "items_per_visit.csv").open(newline="", encoding="utf-8") as handle:
item_rows = list(csv.DictReader(handle))
self.assertEqual("2", item_rows[0]["item_row_count"])
self.assertEqual("2", item_rows[0]["distinct_catalog_count"])
with (output_dir / "category_spend_over_time.csv").open(newline="", encoding="utf-8") as handle:
category_rows = list(csv.DictReader(handle))
produce_row = next(row for row in category_rows if row["purchase_date"] == "2026-03-01" and row["category"] == "produce")
self.assertEqual("1.29", produce_row["category_spend_total"])
with (output_dir / "retailer_store_breakdown.csv").open(newline="", encoding="utf-8") as handle:
store_rows = list(csv.DictReader(handle))
giant_row = next(row for row in store_rows if row["retailer"] == "giant")
self.assertEqual("1", giant_row["visit_count"])
self.assertEqual("2", giant_row["item_row_count"])
self.assertEqual("4.79", giant_row["store_spend_total"])
if __name__ == "__main__":
unittest.main()

View File

@@ -167,6 +167,11 @@ class PurchaseLogTests(unittest.TestCase):
self.assertEqual("1", rows[0]["normalized_quantity"])
self.assertEqual("lb", rows[0]["normalized_quantity_unit"])
self.assertEqual("lb", rows[0]["effective_price_unit"])
self.assertEqual("g1", rows[0]["order_id"])
self.assertEqual("Giant", rows[0]["store_name"])
self.assertEqual("42", rows[0]["store_number"])
self.assertEqual("Springfield", rows[0]["store_city"])
self.assertEqual("VA", rows[0]["store_state"])
def test_main_writes_purchase_and_example_csvs(self):
with tempfile.TemporaryDirectory() as tmpdir:
@@ -624,6 +629,94 @@ class PurchaseLogTests(unittest.TestCase):
self.assertEqual("", rows[0]["effective_price"])
self.assertEqual("", rows[0]["effective_price_unit"])
def test_purchase_rows_support_visit_level_grouping_without_extra_joins(self):
fieldnames = enrich_costco.OUTPUT_FIELDS
def base_row():
return {field: "" for field in fieldnames}
row_one = base_row()
row_one.update(
{
"retailer": "giant",
"order_id": "g1",
"line_no": "1",
"normalized_row_id": "giant:g1:1",
"normalized_item_id": "gnorm:first",
"order_date": "2026-03-01",
"item_name": "FIRST ITEM",
"item_name_norm": "FIRST ITEM",
"qty": "1",
"unit": "EA",
"normalized_quantity": "1",
"normalized_quantity_unit": "each",
"line_total": "3.50",
"measure_type": "each",
"raw_order_path": "data/giant-web/raw/g1.json",
"is_discount_line": "false",
"is_coupon_line": "false",
"is_fee": "false",
}
)
row_two = base_row()
row_two.update(
{
"retailer": "giant",
"order_id": "g1",
"line_no": "2",
"normalized_row_id": "giant:g1:2",
"normalized_item_id": "gnorm:second",
"order_date": "2026-03-01",
"item_name": "SECOND ITEM",
"item_name_norm": "SECOND ITEM",
"qty": "1",
"unit": "EA",
"normalized_quantity": "1",
"normalized_quantity_unit": "each",
"line_total": "2.00",
"measure_type": "each",
"raw_order_path": "data/giant-web/raw/g1.json",
"is_discount_line": "false",
"is_coupon_line": "false",
"is_fee": "false",
}
)
rows, _links = build_purchases.build_purchase_rows(
[row_one, row_two],
[],
[
{
"order_id": "g1",
"store_name": "Giant",
"store_number": "42",
"store_city": "Springfield",
"store_state": "VA",
}
],
[],
[],
[],
[],
)
visit_key = {
(
row["retailer"],
row["order_id"],
row["purchase_date"],
row["store_name"],
row["store_number"],
row["store_city"],
row["store_state"],
)
for row in rows
}
visit_total = sum(float(row["net_line_total"]) for row in rows)
self.assertEqual(1, len(visit_key))
self.assertEqual(5.5, visit_total)
if __name__ == "__main__":
unittest.main()