Compare commits

...

13 Commits

Author SHA1 Message Date
ben
eddef7de2b updated readme and prep for next phase 2026-03-17 13:59:57 -04:00
ben
83bc6c4a7c Update t1.12 task evidence 2026-03-17 13:25:21 -04:00
ben
d39497c298 Refine product review prompt flow 2026-03-17 13:25:12 -04:00
ben
7b8141cd42 Improve product review display workflow 2026-03-17 12:25:47 -04:00
ben
e494386e64 build_purchases rev1 2026-03-17 12:21:44 -04:00
ben
7527fe37eb added git notes 2026-03-17 12:21:24 -04:00
ben
a1fafa3885 added t1.12 scope to simplify review process 2026-03-17 12:20:48 -04:00
ben
37b2196023 added git notes 2026-03-17 09:23:00 -04:00
ben
7f8c3ed8eb updated readme with Review steps 2026-03-17 09:14:14 -04:00
ben
91bfd3597e Record t1.11 task evidence 2026-03-16 20:45:57 -04:00
ben
c7dad5489e Add terminal review resolution workflow 2026-03-16 20:45:37 -04:00
ben
34eedff9c5 Record t1.8.7 and t1.9 task evidence 2026-03-16 18:01:16 -04:00
ben
be1bf6328e Build pivot-ready purchase log 2026-03-16 18:01:09 -04:00
9 changed files with 1846 additions and 202 deletions

274
README.md
View File

@@ -1,227 +1,113 @@
# scrape-giant
Small grocery-history pipeline for Giant and Costco receipt data.
CLI to pull purchase history from Giant and Costco websites and refine into a single product catalog for external analysis.
This repo is still a manual, stepwise pipeline. There is no single orchestrator
script yet. Each stage is run directly, and later stages depend on files
produced by earlier stages.
Run each script step-by-step from the terminal.
## What The Project Does
## What It Does
The current flow is:
1. `scrape_giant.py`: download Giant orders and items
2. `enrich_giant.py`: normalize Giant line items
3. `scrape_costco.py`: download Costco orders and items
4. `enrich_costco.py`: normalize Costco line items
5. `build_purchases.py`: combine retailer outputs into one purchase table
6. `review_products.py`: review unresolved product matches in the terminal
1. acquire raw Giant receipt/history data
2. enrich Giant line items into a shared enriched-item schema
3. acquire raw Costco receipt data
4. enrich Costco line items into the same shared enriched-item schema
5. build observed-product, review, and canonical-product layers
6. validate that Giant and Costco can flow through the same downstream model
## Requirements
Raw retailer JSON remains the source of truth.
- Python 3.10+
- Firefox installed with active Giant and Costco sessions
## Current Scripts
- `scrape_giant.py`
Fetch Giant in-store history and order detail payloads from an active Firefox
session.
- `scrape_costco.py`
Fetch Costco receipt summary/detail payloads from an active Firefox session.
Costco currently prefers `.env` header values first, then falls back to exact
Firefox local-storage values for session auth.
- `enrich_giant.py`
Parse Giant raw order JSON into `giant_output/items_enriched.csv`.
- `enrich_costco.py`
Parse Costco raw receipt JSON into `costco_output/items_enriched.csv`.
- `build_observed_products.py`
Build retailer-facing observed products from enriched rows.
- `build_review_queue.py`
Build a manual review queue for low-confidence or unresolved observed
products.
- `build_canonical_layer.py`
Build shared canonical products and observed-to-canonical links.
- `validate_cross_retailer_flow.py`
Write a proof/check output showing that Giant and Costco can meet in the same
downstream model.
## Manual Pipeline
Run these from the repo root with the venv active, or call them through
`./venv/bin/python`.
### 1. Acquire Giant raw data
## Install
```bash
./venv/bin/python scrape_giant.py
python -m venv venv
./venv/scripts/activate
pip install -r requirements.txt
```
Inputs:
- active Firefox session for `giantfood.com`
- `GIANT_USER_ID` and `GIANT_LOYALTY_NUMBER` from `.env`, shell env, or prompt
## Optional `.env`
Outputs:
- `giant_output/raw/history.json`
- `giant_output/raw/<order_id>.json`
Current version works best with `.env` in the project root. The scraper will prompt for these values if they are not found in the current browser session.
- `scrape_giant` prompts if `GIANT_USER_ID` or `GIANT_LOYALTY_NUMBER` is missing.
- `scrape_costco` tries `.env` first, then Firefox local storage for session-backed values; `COSTCO_CLIENT_IDENTIFIER` should still be set explicitly.
```env
GIANT_USER_ID=...
GIANT_LOYALTY_NUMBER=...
COSTCO_X_AUTHORIZATION=...
COSTCO_X_WCS_CLIENTID=...
COSTCO_CLIENT_IDENTIFIER=...
```
## Run Order
Run the pipeline in this order:
```bash
python scrape_giant.py
python enrich_giant.py
python scrape_costco.py
python enrich_costco.py
python build_purchases.py
python review_products.py
python build_purchases.py
```
Why run `build_purchases.py` twice:
- first pass builds the current combined dataset and review queue inputs
- `review_products.py` writes durable review decisions
- second pass reapplies those decisions into the purchase output
If you only want to refresh the queue without reviewing interactively:
```bash
python review_products.py --refresh-only
```
## Key Outputs
Giant:
- `giant_output/orders.csv`
- `giant_output/items.csv`
### 2. Enrich Giant data
```bash
./venv/bin/python enrich_giant.py
```
Input:
- `giant_output/raw/*.json`
Output:
- `giant_output/items_enriched.csv`
### 3. Acquire Costco raw data
```bash
./venv/bin/python scrape_costco.py
```
Optional useful flags:
```bash
./venv/bin/python scrape_costco.py --months-back 36
./venv/bin/python scrape_costco.py --firefox-profile-dir "C:\\Users\\you\\AppData\\Roaming\\Mozilla\\Firefox\\Profiles\\xxxx.default-release"
```
Inputs:
- active Firefox session for `costco.com`
- optional `.env` values:
- `COSTCO_X_AUTHORIZATION`
- `COSTCO_X_WCS_CLIENTID`
- `COSTCO_CLIENT_IDENTIFIER`
- if `COSTCO_X_AUTHORIZATION` is absent, the script falls back to exact Firefox
local-storage values:
- `idToken` -> sent as `Bearer <idToken>`
- `clientID` -> used as `costco-x-wcs-clientId` when env is blank
Outputs:
- `costco_output/raw/summary.json`
- `costco_output/raw/summary_requests.json`
- `costco_output/raw/<receipt_id>-<timestamp>.json`
Costco:
- `costco_output/orders.csv`
- `costco_output/items.csv`
### 4. Enrich Costco data
```bash
./venv/bin/python enrich_costco.py
```
Input:
- `costco_output/raw/*.json`
Output:
- `costco_output/items_enriched.csv`
### 5. Build shared downstream layers
Combined:
- `combined_output/purchases.csv`
- `combined_output/review_queue.csv`
- `combined_output/review_resolutions.csv`
- `combined_output/canonical_catalog.csv`
- `combined_output/product_links.csv`
- `combined_output/comparison_examples.csv`
```bash
./venv/bin/python build_observed_products.py
./venv/bin/python build_review_queue.py
./venv/bin/python build_canonical_layer.py
```
## Review Workflow
These scripts consume the enriched item files and generate the downstream
product-model outputs.
Current outputs on disk:
- retailer-facing:
- `giant_output/products_observed.csv`
- `giant_output/review_queue.csv`
- `giant_output/products_canonical.csv`
- `giant_output/product_links.csv`
- cross-retailer proof/check output:
- `combined_output/products_observed.csv`
- `combined_output/products_canonical.csv`
- `combined_output/product_links.csv`
- `combined_output/proof_examples.csv`
### 6. Validate cross-retailer flow
```bash
./venv/bin/python validate_cross_retailer_flow.py
```
This is a proof/check step, not the main acquisition path.
## Inputs And Outputs By Directory
### `giant_output/`
Inputs to this layer:
- Firefox session data for Giant
- Giant raw JSON payloads
Generated files:
- `raw/history.json`
- `raw/<order_id>.json`
- `orders.csv`
- `items.csv`
- `items_enriched.csv`
- `products_observed.csv`
- `review_queue.csv`
- `products_canonical.csv`
- `product_links.csv`
### `costco_output/`
Inputs to this layer:
- Firefox session data for Costco
- Costco raw GraphQL receipt payloads
Generated files:
- `raw/summary.json`
- `raw/summary_requests.json`
- `raw/<receipt_id>-<timestamp>.json`
- `orders.csv`
- `items.csv`
- `items_enriched.csv`
### `combined_output/`
Generated by cross-retailer proof/build scripts:
- `products_observed.csv`
- `products_canonical.csv`
- `product_links.csv`
- `proof_examples.csv`
Run `review_products.py` to cleanup unresolved or weakly unified items:
- link an item to an existing canonical product
- create a new canonical product
- exclude an item
- skip it for later
Decisions are saved and reused on later runs.
## Notes
- This project is designed around fragile retailer scraping flows, so the code favors explicit retailer-specific steps over heavy abstraction.
- `scrape_giant.py` and `scrape_costco.py` are meant to work as standalone acquisition scripts.
- `validate_cross_retailer_flow.py` is a proof/check script, not a required production step.
- The pipeline is intentionally simple and currently manual.
- Scraping is retailer-specific and fragile; downstream modeling is shared only
after enrichment.
- `summary_requests.json` is diagnostic metadata from Costco summary enumeration
and is not a receipt payload.
- `enrich_costco.py` skips that file and only parses receipt payloads.
- The repo may contain archived or sample output files under `archive/`; they
are not part of the active scrape path.
## Verification
Run the full test suite with:
## Test
```bash
./venv/bin/python -m unittest discover -s tests
```
Useful one-off checks:
```bash
./venv/bin/python scrape_giant.py --help
./venv/bin/python scrape_costco.py --help
./venv/bin/python enrich_giant.py
./venv/bin/python enrich_costco.py
```
## Project Docs
- `pm/tasks.org`
- `pm/data-model.org`
- `pm/scrape-giant.org`
- `pm/tasks.org`: task tracking
- `pm/data-model.org`: current data model notes
- `pm/review-workflow.org`: review and resolution workflow

414
build_purchases.py Normal file
View File

@@ -0,0 +1,414 @@
from decimal import Decimal
from pathlib import Path
import click
import build_canonical_layer
import build_observed_products
import validate_cross_retailer_flow
from enrich_giant import format_decimal, to_decimal
from layer_helpers import read_csv_rows, stable_id, write_csv_rows
PURCHASE_FIELDS = [
"purchase_date",
"retailer",
"order_id",
"line_no",
"observed_item_key",
"observed_product_id",
"canonical_product_id",
"review_status",
"resolution_action",
"raw_item_name",
"normalized_item_name",
"image_url",
"retailer_item_id",
"upc",
"qty",
"unit",
"pack_qty",
"size_value",
"size_unit",
"measure_type",
"line_total",
"unit_price",
"store_name",
"store_number",
"store_city",
"store_state",
"price_per_each",
"price_per_each_basis",
"price_per_count",
"price_per_count_basis",
"price_per_lb",
"price_per_lb_basis",
"price_per_oz",
"price_per_oz_basis",
"is_discount_line",
"is_coupon_line",
"is_fee",
"raw_order_path",
]
EXAMPLE_FIELDS = [
"example_name",
"canonical_product_id",
"giant_purchase_date",
"giant_raw_item_name",
"giant_price_per_lb",
"costco_purchase_date",
"costco_raw_item_name",
"costco_price_per_lb",
"notes",
]
CATALOG_FIELDS = [
"canonical_product_id",
"canonical_name",
"category",
"product_type",
"brand",
"variant",
"size_value",
"size_unit",
"pack_qty",
"measure_type",
"notes",
"created_at",
"updated_at",
]
RESOLUTION_FIELDS = [
"observed_product_id",
"canonical_product_id",
"resolution_action",
"status",
"resolution_notes",
"reviewed_at",
]
def decimal_or_zero(value):
return to_decimal(value) or Decimal("0")
def derive_metrics(row):
line_total = to_decimal(row.get("line_total"))
qty = to_decimal(row.get("qty"))
pack_qty = to_decimal(row.get("pack_qty"))
size_value = to_decimal(row.get("size_value"))
picked_weight = to_decimal(row.get("picked_weight"))
size_unit = row.get("size_unit", "")
price_per_each = row.get("price_per_each", "")
price_per_lb = row.get("price_per_lb", "")
price_per_oz = row.get("price_per_oz", "")
price_per_count = ""
basis_each = ""
basis_count = ""
basis_lb = ""
basis_oz = ""
if price_per_each:
basis_each = "line_total_over_qty"
elif line_total is not None and qty not in (None, 0):
price_per_each = format_decimal(line_total / qty)
basis_each = "line_total_over_qty"
if line_total is not None and pack_qty not in (None, 0):
total_count = pack_qty * (qty or Decimal("1"))
if total_count not in (None, 0):
price_per_count = format_decimal(line_total / total_count)
basis_count = "line_total_over_pack_qty"
if picked_weight not in (None, 0):
price_per_lb = format_decimal(line_total / picked_weight) if line_total is not None else ""
price_per_oz = (
format_decimal((line_total / picked_weight) / Decimal("16"))
if line_total is not None
else ""
)
basis_lb = "picked_weight_lb"
basis_oz = "picked_weight_lb_to_oz"
elif line_total is not None and size_value not in (None, 0):
total_units = size_value * (pack_qty or Decimal("1")) * (qty or Decimal("1"))
if size_unit == "lb" and total_units not in (None, 0):
per_lb = line_total / total_units
price_per_lb = format_decimal(per_lb)
price_per_oz = format_decimal(per_lb / Decimal("16"))
basis_lb = "parsed_size_lb"
basis_oz = "parsed_size_lb_to_oz"
elif size_unit == "oz" and total_units not in (None, 0):
per_oz = line_total / total_units
price_per_oz = format_decimal(per_oz)
price_per_lb = format_decimal(per_oz * Decimal("16"))
basis_lb = "parsed_size_oz_to_lb"
basis_oz = "parsed_size_oz"
return {
"price_per_each": price_per_each,
"price_per_each_basis": basis_each,
"price_per_count": price_per_count,
"price_per_count_basis": basis_count,
"price_per_lb": price_per_lb,
"price_per_lb_basis": basis_lb,
"price_per_oz": price_per_oz,
"price_per_oz_basis": basis_oz,
}
def order_lookup(rows, retailer):
return {
(retailer, row["order_id"]): row
for row in rows
}
def read_optional_csv_rows(path):
path = Path(path)
if not path.exists():
return []
return read_csv_rows(path)
def load_resolution_lookup(resolution_rows):
lookup = {}
for row in resolution_rows:
if not row.get("observed_product_id"):
continue
lookup[row["observed_product_id"]] = row
return lookup
def merge_catalog_rows(existing_rows, auto_rows):
merged = {}
for row in auto_rows + existing_rows:
canonical_product_id = row.get("canonical_product_id", "")
if canonical_product_id:
merged[canonical_product_id] = row
return sorted(merged.values(), key=lambda row: row["canonical_product_id"])
def catalog_row_from_canonical(row):
return {
"canonical_product_id": row.get("canonical_product_id", ""),
"canonical_name": row.get("canonical_name", ""),
"category": row.get("category", ""),
"product_type": row.get("product_type", ""),
"brand": row.get("brand", ""),
"variant": row.get("variant", ""),
"size_value": row.get("size_value", ""),
"size_unit": row.get("size_unit", ""),
"pack_qty": row.get("pack_qty", ""),
"measure_type": row.get("measure_type", ""),
"notes": row.get("notes", ""),
"created_at": row.get("created_at", ""),
"updated_at": row.get("updated_at", ""),
}
def build_link_state(enriched_rows):
observed_rows = build_observed_products.build_observed_products(enriched_rows)
canonical_rows, link_rows = build_canonical_layer.build_canonical_layer(observed_rows)
giant_row, costco_row = validate_cross_retailer_flow.find_proof_pair(observed_rows)
canonical_rows, link_rows, _proof_rows = validate_cross_retailer_flow.merge_proof_pair(
canonical_rows,
link_rows,
giant_row,
costco_row,
)
observed_id_by_key = {
row["observed_key"]: row["observed_product_id"] for row in observed_rows
}
canonical_id_by_observed = {
row["observed_product_id"]: row["canonical_product_id"] for row in link_rows
}
return observed_rows, canonical_rows, link_rows, observed_id_by_key, canonical_id_by_observed
def build_purchase_rows(
giant_enriched_rows,
costco_enriched_rows,
giant_orders,
costco_orders,
resolution_rows,
):
all_enriched_rows = giant_enriched_rows + costco_enriched_rows
(
observed_rows,
canonical_rows,
link_rows,
observed_id_by_key,
canonical_id_by_observed,
) = build_link_state(all_enriched_rows)
resolution_lookup = load_resolution_lookup(resolution_rows)
for observed_product_id, resolution in resolution_lookup.items():
action = resolution.get("resolution_action", "")
status = resolution.get("status", "")
if status != "approved":
continue
if action in {"link", "create"} and resolution.get("canonical_product_id"):
canonical_id_by_observed[observed_product_id] = resolution["canonical_product_id"]
elif action == "exclude":
canonical_id_by_observed[observed_product_id] = ""
orders_by_id = {}
orders_by_id.update(order_lookup(giant_orders, "giant"))
orders_by_id.update(order_lookup(costco_orders, "costco"))
purchase_rows = []
for row in sorted(
all_enriched_rows,
key=lambda item: (item["order_date"], item["retailer"], item["order_id"], int(item["line_no"])),
):
observed_key = build_observed_products.build_observed_key(row)
observed_product_id = observed_id_by_key.get(observed_key, "")
order_row = orders_by_id.get((row["retailer"], row["order_id"]), {})
metrics = derive_metrics(row)
resolution = resolution_lookup.get(observed_product_id, {})
purchase_rows.append(
{
"purchase_date": row["order_date"],
"retailer": row["retailer"],
"order_id": row["order_id"],
"line_no": row["line_no"],
"observed_item_key": row["observed_item_key"],
"observed_product_id": observed_product_id,
"canonical_product_id": canonical_id_by_observed.get(observed_product_id, ""),
"review_status": resolution.get("status", ""),
"resolution_action": resolution.get("resolution_action", ""),
"raw_item_name": row["item_name"],
"normalized_item_name": row["item_name_norm"],
"image_url": row.get("image_url", ""),
"retailer_item_id": row["retailer_item_id"],
"upc": row["upc"],
"qty": row["qty"],
"unit": row["unit"],
"pack_qty": row["pack_qty"],
"size_value": row["size_value"],
"size_unit": row["size_unit"],
"measure_type": row["measure_type"],
"line_total": row["line_total"],
"unit_price": row["unit_price"],
"store_name": order_row.get("store_name", ""),
"store_number": order_row.get("store_number", ""),
"store_city": order_row.get("store_city", ""),
"store_state": order_row.get("store_state", ""),
"is_discount_line": row["is_discount_line"],
"is_coupon_line": row["is_coupon_line"],
"is_fee": row["is_fee"],
"raw_order_path": row["raw_order_path"],
**metrics,
}
)
return purchase_rows, observed_rows, canonical_rows, link_rows
def apply_manual_resolutions_to_links(link_rows, resolution_rows):
link_by_observed = {row["observed_product_id"]: dict(row) for row in link_rows}
for resolution in resolution_rows:
if resolution.get("status") != "approved":
continue
observed_product_id = resolution.get("observed_product_id", "")
action = resolution.get("resolution_action", "")
if not observed_product_id:
continue
if action == "exclude":
link_by_observed.pop(observed_product_id, None)
continue
if action in {"link", "create"} and resolution.get("canonical_product_id"):
link_by_observed[observed_product_id] = {
"observed_product_id": observed_product_id,
"canonical_product_id": resolution["canonical_product_id"],
"link_method": f"manual_{action}",
"link_confidence": "high",
"review_status": resolution.get("status", ""),
"reviewed_by": "",
"reviewed_at": resolution.get("reviewed_at", ""),
"link_notes": resolution.get("resolution_notes", ""),
}
return sorted(link_by_observed.values(), key=lambda row: row["observed_product_id"])
def build_comparison_examples(purchase_rows):
giant_banana = None
costco_banana = None
for row in purchase_rows:
if row.get("normalized_item_name") != "BANANA":
continue
if not row.get("canonical_product_id"):
continue
if row["retailer"] == "giant" and row.get("price_per_lb"):
giant_banana = row
if row["retailer"] == "costco" and row.get("price_per_lb"):
costco_banana = row
if not giant_banana or not costco_banana:
return []
return [
{
"example_name": "banana_price_per_lb",
"canonical_product_id": giant_banana["canonical_product_id"],
"giant_purchase_date": giant_banana["purchase_date"],
"giant_raw_item_name": giant_banana["raw_item_name"],
"giant_price_per_lb": giant_banana["price_per_lb"],
"costco_purchase_date": costco_banana["purchase_date"],
"costco_raw_item_name": costco_banana["raw_item_name"],
"costco_price_per_lb": costco_banana["price_per_lb"],
"notes": "Example comparison using normalized price_per_lb across Giant and Costco",
}
]
@click.command()
@click.option("--giant-items-enriched-csv", default="giant_output/items_enriched.csv", show_default=True)
@click.option("--costco-items-enriched-csv", default="costco_output/items_enriched.csv", show_default=True)
@click.option("--giant-orders-csv", default="giant_output/orders.csv", show_default=True)
@click.option("--costco-orders-csv", default="costco_output/orders.csv", show_default=True)
@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True)
@click.option("--catalog-csv", default="combined_output/canonical_catalog.csv", show_default=True)
@click.option("--links-csv", default="combined_output/product_links.csv", show_default=True)
@click.option("--output-csv", default="combined_output/purchases.csv", show_default=True)
@click.option("--examples-csv", default="combined_output/comparison_examples.csv", show_default=True)
def main(
giant_items_enriched_csv,
costco_items_enriched_csv,
giant_orders_csv,
costco_orders_csv,
resolutions_csv,
catalog_csv,
links_csv,
output_csv,
examples_csv,
):
resolution_rows = read_optional_csv_rows(resolutions_csv)
purchase_rows, _observed_rows, canonical_rows, link_rows = build_purchase_rows(
read_csv_rows(giant_items_enriched_csv),
read_csv_rows(costco_items_enriched_csv),
read_csv_rows(giant_orders_csv),
read_csv_rows(costco_orders_csv),
resolution_rows,
)
existing_catalog_rows = read_optional_csv_rows(catalog_csv)
merged_catalog_rows = merge_catalog_rows(
existing_catalog_rows,
[catalog_row_from_canonical(row) for row in canonical_rows],
)
link_rows = apply_manual_resolutions_to_links(link_rows, resolution_rows)
example_rows = build_comparison_examples(purchase_rows)
write_csv_rows(catalog_csv, merged_catalog_rows, CATALOG_FIELDS)
write_csv_rows(links_csv, link_rows, build_canonical_layer.LINK_FIELDS)
write_csv_rows(output_csv, purchase_rows, PURCHASE_FIELDS)
write_csv_rows(examples_csv, example_rows, EXAMPLE_FIELDS)
click.echo(
f"wrote {len(purchase_rows)} purchase rows to {output_csv}, "
f"{len(merged_catalog_rows)} catalog rows to {catalog_csv}, "
f"and {len(example_rows)} comparison examples to {examples_csv}"
)
if __name__ == "__main__":
main()

73
pm/review-workflow.org Normal file
View File

@@ -0,0 +1,73 @@
* review and item-resolution workflow
This document defines the durable review workflow for unresolved observed
products.
** persistent files
- `combined_output/purchases.csv`
Flat normalized purchase log. This is the review input because it retains:
- raw item name
- normalized item name
- observed product id
- canonical product id when resolved
- retailer/order/date/price context
- `combined_output/review_queue.csv`
Current unresolved observed products grouped for review.
- `combined_output/review_resolutions.csv`
Durable mapping decisions from observed products to canonical products.
- `combined_output/canonical_catalog.csv`
Durable canonical item catalog used by manual review and later purchase-log
rebuilds.
There is no separate alias file in v1. `review_resolutions.csv` is the mapping
layer from observed products to canonical product ids.
** workflow
1. Run `build_purchases.py`
This refreshes the purchase log and seeds/updates the canonical catalog from
current auto-linked canonical rows.
2. Run `review_products.py`
This rebuilds `review_queue.csv` from unresolved purchase rows and prompts in
the terminal for one observed product at a time.
3. Choose one of:
- link to existing canonical
- create new canonical
- exclude
- skip
4. `review_products.py` writes decisions immediately to:
- `review_resolutions.csv`
- `canonical_catalog.csv` when a new canonical item is created
5. Rerun `build_purchases.py`
This reapplies approved resolutions so the final normalized purchase log now
carries the reviewed `canonical_product_id`.
** what the human edits
The primary interface is terminal prompts in `review_products.py`.
The human provides:
- existing canonical id when linking
- canonical name/category/product type when creating a new canonical item
- optional resolution notes
The generated CSVs remain editable by hand if needed, but the intended workflow
is terminal-first.
** durability
- Resolutions are keyed by `observed_product_id`, not by one-off text
substitution.
- Canonical products are keyed by stable `canonical_product_id`.
- Future runs reuse approved mappings through `review_resolutions.csv`.
** retention of audit fields
The final `purchases.csv` retains:
- `raw_item_name`
- `normalized_item_name`
- `canonical_product_id`
This preserves the raw receipt description, the deterministic parser output, and
the human-approved canonical identity in one flat purchase log.

View File

@@ -27,6 +27,8 @@ carry forward image url
3. build observed-product atble from enriched items
* git issues
** ssh / access to gitea
ssh://git@192.168.1.207:2020/ben/scrape-giant.git
https://git.hgsky.me/ben/scrape-giant.git
@@ -44,6 +46,31 @@ git remote set-url gitea git@gitea:ben/scrape-giant.git
on local network: use ssh to 192.168.1.207:2020
from elsewhere/public: use https to git.hgsky.me/... unless you later expose ssh properly
** stash
z z to stash local work only
take care not to add ignored files which will add the venv and `__pycache__`
z p to pop the stash back
** creating remote branches
P p, magit will suggest upstream (gitea), select and Enter and it will be created
** cherry-picking
b b : switch to desired branch (review)
l B : open reflog for local branches
(my changes were committed to local cx but not pushed to gitea/cx)
put point on the commit you want; did this in sequence
A A : cherry pick commit to current branch
minibuffer will show the commit and all branches, leave it on that commit
the final commit was not shown by hash, just the branch cx
since (local) cx was caught up with that branch
** reverting a branch
b l : switch to local branch (cx)
l l : open local reflog
put point on the commit; highlighted remote gitea/cx
X : reset branch; prompts you, selected cx
* giant requests
** item:
get:
@@ -223,3 +250,18 @@ python build_observed_products.py
python build_review_queue.py
python build_canonical_layer.py
python validate_cross_retailer_flow.py
* t1.11 tasks [2026-03-17 Tue 13:49]
ok i ran a few. time to run some cleanups here - i'm wondering if we shouldn't be less aggressive with canonical names and encourage a better manual process to start.
1. auto-created canonical_names lack category, product_type - ok with filling these in manually in the catalog once the queue is empty
2. canonical_names feel too specific, e.g., "5DZ egg"
3. some canonical_names need consolidation, eg "LIME" and "LIME . / ." ; poss cleanup issue. there are 5 entries for ergg but but they are all regular large grade A white eggs, just different amounts in dozens.
Eggs are actually a great candidate for the kind of analysis we want to do - the pipeline should have caught and properly sorted these into size/qty:
```canonical_product_id canonical_name category product_type brand variant size_value size_unit pack_qty measure_type notes created_at updated_at
gcan_0e350505fd22 5DZ EGG / / KS each auto-linked via exact_name
gcan_47279a80f5f3 EGG 5 DOZ. BBS each auto-linked via exact_name
gcan_7d099130c1bf LRG WHITE EGG SB 30 count auto-linked via exact_upc
gcan_849c2817e667 GDA LRG WHITE EGG SB 18 count auto-linked via exact_upc
gcan_cb0c6c8cf480 LG EGG CONVENTIONAL 18 count count auto-linked via exact_name_size ```
4. Build costco mechanism for matching discount to line item.
1. Discounts appear as their own line items with a number like /123456, this matches the UPC of the discounted item
2. must be date-matched to the UPC

View File

@@ -276,7 +276,7 @@
- commit: `7789c2e` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scrape_giant.py --help`; `./venv/bin/python scrape_costco.py --help`; verified Firefox storage token extraction and locked-db copy behavior in unit tests
- date: 2026-03-16
* [ ] t1.8.7: simplify costco session bootstrap and remove over-abstraction (2-4 commits)
* [X] t1.8.7: simplify costco session bootstrap and remove over-abstraction (2-4 commits)
** acceptance criteria
- make `scrape_costco.py` readable end-to-end without tracing through multiple partial bootstrap layers
@@ -302,12 +302,23 @@
- no new heuristics in this task
** evidence
- commit:
- tests:
- date:
* [ ] t1.9: compute normalized comparison metrics (2-4 commits)
- commit: `d7a0329` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python scrape_costco.py --help`; verified explicit Costco session bootstrap flow in `scrape_costco.py` and low-level-only browser access in `browser_session.py`
- date: 2026-03-16
* [X] t1.9: build pivot-ready normalized purchase log and comparison metrics (2-4 commits)
** acceptance criteria
- produce a flat `purchases.csv` suitable for excel pivot tables and pivot charts
- each purchase row preserves:
- purchase date
- retailer
- order id
- raw item name
- normalized item name
- canonical item id when resolved
- quantity / unit
- line total
- store/location info where available
- derive normalized comparison fields where possible on enriched or observed product rows:
- `price_per_lb`
- `price_per_oz`
@@ -318,17 +329,92 @@
- receipt weight
- explicit count/pack
- emit nulls when basis is unknown, conflicting, or ambiguous
- support pivot-friendly analysis of purchase frequency and item cost over time
- document at least one Giant vs Costco comparison example using the normalized metrics
** notes
- compute metrics as close to the raw observation as possible
- canonical layer can aggregate later, but should not invent missing unit economics
- unit discipline matters more than coverage
- raw item name must be retained for audit/debugging
** evidence
- commit:
- tests:
- date:
- commit: `be1bf63` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_purchases.py`; verified `combined_output/purchases.csv` and `combined_output/comparison_examples.csv` on the current Giant + Costco dataset
- date: 2026-03-16
* [X] t1.11: define review and item-resolution workflow for unresolved products (2-3 commits)
** acceptance criteria
- define the persistent files used to resolve unknown items, including:
- review queue
- canonical item catalog
- alias / mapping layer if separate
- specify how unresolved items move from `review_queue.csv` into the final normalized purchase log
- define the manual resolution workflow, including:
- what the human edits
- what script is rerun afterward
- how resolved mappings are persisted for future runs
- ensure resolved items are positively identified into stable canonical item ids rather than one-off text substitutions
- document how raw item name, normalized item name, and canonical item id are all retained
** notes
- goal is “approve once, reuse forever”
- keep the workflow simple and auditable
- manual review is fine; the important part is making it durable and rerunnable
** evidence
- commit: `c7dad54` on branch `cx`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python build_purchases.py`; `./venv/bin/python review_products.py --refresh-only`; verified `combined_output/review_queue.csv`, `combined_output/review_resolutions.csv` workflow, and `combined_output/canonical_catalog.csv`
- date: 2026-03-16
* [X] t1.12: simplify review process display
Clearly show current state separate from proposed future state.
** acceptance criteria
1. Display position in review queue, e.g., (1/22)
2. Display compact header with observed_product under review, queue position, and canonical decision, e.g.: "Resolve [n] observed product group [name] and associated items to canonical_name [name]? (\n [n] matched items)"
3. color-code outputs based on info, input/prompt, warning/error
1. color action menu/requests for input differently from display text; do not color individual options separately
2. "no canonical_name suggestions found" is informational, not a warning/error.
4. update action menu `[x]exclude` to `e[x]clude`
5. on each review item, display a list of all matched items to be linked, sorted by descending date:
1. YYYY-mm-dd, price, raw item name, normalized item name, upc, retailer
2. image URL, if exists
3. Sample:
6. on each review item, suggest (but do not auto-apply) up to 3 likely existing canonicals using determinstic rules, e.g:
1. exact normalized name match
2. prefix/contains match on canonical name
3. exact UPC
7. Sample Entry:
#+begin_comment
Review 7/22: Resolve observed_product MIXED PEPPER to canonical_name [__]?
2 matched items:
[1] 2026-03-12 | 7.49 | MIXED PEPPER 6-PACK | MIXED PEPPER | [upc] | costco | [img_url]
[2] [YYYY-mm-dd] | [price] | [raw_name] | [observed_name] | [upc] | [retailer] | [img_url]
2 canonical suggestions found:
[1] BELL PEPPERS, PRODUCE
[2] PEPPER, SPICES
#+end_comment
8. When link is selected, users should be able to select the number of the item in the list, e.g.:
#+begin_comment
Select the canonical_name to associate [n] items with:
[1] GRB GRADU PCH PUF1. | gcan_01b0d623aa02
[2] BTB CHICKEN | gcan_0201f0feb749
[3] LIME | gcan_02074d9e7359
#+end_comment
9. Add confirmation to link selection with instructions, "[n] [observed_name] and future observed_name matches will be associated with [canonical_name], is this ok?
actions: [Y]es [n]o [b]ack [s]kip [q]uit
- reinforce project terminology such as raw_name, observed_name, canonical_name
** evidence
- commit: `7b8141c`, `d39497c`
- tests: `./venv/bin/python -m unittest discover -s tests`; `./venv/bin/python -m unittest tests.test_review_workflow tests.test_purchases`; `./venv/bin/python review_products.py --help`; verified compact review header, numbered matched-item display, informational no-suggestion state, numbered canonical selection, and confirmation flow
- date: 2026-03-17
** notes
- The key improvement was shifting the prompt from system metadata to reviewer intent: one observed_product, its matched retailer rows, and one canonical_name decision.
- Numbered canonical selection plus confirmation worked better than free-text id entry and should reduce accidental links.
- Deterministic suggestions remain intentionally conservative; they speed up common cases, but unresolved items still depend on human review by design.
* [ ] t1.10: add optional llm-assisted suggestion workflow for unresolved products (2-4 commits)

426
review_products.py Normal file
View File

@@ -0,0 +1,426 @@
from collections import defaultdict
from datetime import date
import click
import build_purchases
from layer_helpers import compact_join, stable_id, write_csv_rows
QUEUE_FIELDS = [
"review_id",
"retailer",
"observed_product_id",
"canonical_product_id",
"reason_code",
"priority",
"raw_item_names",
"normalized_names",
"upc_values",
"example_prices",
"seen_count",
"status",
"resolution_action",
"resolution_notes",
"created_at",
"updated_at",
]
def build_review_queue(purchase_rows, resolution_rows):
by_observed = defaultdict(list)
resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)
for row in purchase_rows:
observed_product_id = row.get("observed_product_id", "")
if not observed_product_id:
continue
by_observed[observed_product_id].append(row)
today_text = str(date.today())
queue_rows = []
for observed_product_id, rows in sorted(by_observed.items()):
current_resolution = resolution_lookup.get(observed_product_id, {})
if current_resolution.get("status") == "approved":
continue
unresolved_rows = [row for row in rows if not row.get("canonical_product_id")]
if not unresolved_rows:
continue
retailers = sorted({row["retailer"] for row in rows})
review_id = stable_id("rvw", observed_product_id)
queue_rows.append(
{
"review_id": review_id,
"retailer": " | ".join(retailers),
"observed_product_id": observed_product_id,
"canonical_product_id": current_resolution.get("canonical_product_id", ""),
"reason_code": "missing_canonical_link",
"priority": "high",
"raw_item_names": compact_join(
sorted({row["raw_item_name"] for row in rows if row["raw_item_name"]}),
limit=8,
),
"normalized_names": compact_join(
sorted(
{
row["normalized_item_name"]
for row in rows
if row["normalized_item_name"]
}
),
limit=8,
),
"upc_values": compact_join(
sorted({row["upc"] for row in rows if row["upc"]}),
limit=8,
),
"example_prices": compact_join(
sorted({row["line_total"] for row in rows if row["line_total"]}),
limit=8,
),
"seen_count": str(len(rows)),
"status": current_resolution.get("status", "pending"),
"resolution_action": current_resolution.get("resolution_action", ""),
"resolution_notes": current_resolution.get("resolution_notes", ""),
"created_at": current_resolution.get("reviewed_at", today_text),
"updated_at": today_text,
}
)
return queue_rows
def save_resolution_rows(path, rows):
write_csv_rows(path, rows, build_purchases.RESOLUTION_FIELDS)
def save_catalog_rows(path, rows):
write_csv_rows(path, rows, build_purchases.CATALOG_FIELDS)
INFO_COLOR = "cyan"
PROMPT_COLOR = "bright_yellow"
WARNING_COLOR = "magenta"
def sort_related_items(rows):
return sorted(
rows,
key=lambda row: (
row.get("purchase_date", ""),
row.get("order_id", ""),
int(row.get("line_no", "0") or "0"),
),
reverse=True,
)
def build_canonical_suggestions(related_rows, catalog_rows, limit=3):
normalized_names = {
row.get("normalized_item_name", "").strip().upper()
for row in related_rows
if row.get("normalized_item_name", "").strip()
}
upcs = {
row.get("upc", "").strip()
for row in related_rows
if row.get("upc", "").strip()
}
suggestions = []
seen_ids = set()
def add_matches(rows, reason):
for row in rows:
canonical_product_id = row.get("canonical_product_id", "")
if not canonical_product_id or canonical_product_id in seen_ids:
continue
seen_ids.add(canonical_product_id)
suggestions.append(
{
"canonical_product_id": canonical_product_id,
"canonical_name": row.get("canonical_name", ""),
"reason": reason,
}
)
if len(suggestions) >= limit:
return True
return False
exact_upc_rows = [
row
for row in catalog_rows
if row.get("upc", "").strip() and row.get("upc", "").strip() in upcs
]
if add_matches(exact_upc_rows, "exact upc"):
return suggestions
exact_name_rows = [
row
for row in catalog_rows
if row.get("canonical_name", "").strip().upper() in normalized_names
]
if add_matches(exact_name_rows, "exact normalized name"):
return suggestions
contains_rows = []
for row in catalog_rows:
canonical_name = row.get("canonical_name", "").strip().upper()
if not canonical_name:
continue
for normalized_name in normalized_names:
if normalized_name in canonical_name or canonical_name in normalized_name:
contains_rows.append(row)
break
add_matches(contains_rows, "canonical name contains match")
return suggestions
def build_display_lines(queue_row, related_rows):
lines = []
for index, row in enumerate(sort_related_items(related_rows), start=1):
lines.append(
" [{index}] {purchase_date} | {line_total} | {raw_item_name} | {normalized_item_name} | "
"{upc} | {retailer}".format(
index=index,
purchase_date=row.get("purchase_date", ""),
line_total=row.get("line_total", ""),
raw_item_name=row.get("raw_item_name", ""),
normalized_item_name=row.get("normalized_item_name", ""),
upc=row.get("upc", ""),
retailer=row.get("retailer", ""),
)
)
if row.get("image_url"):
lines.append(f" {row['image_url']}")
if not lines:
lines.append(" [1] no matched item rows found")
return lines
def observed_name(queue_row, related_rows):
if queue_row.get("normalized_names"):
return queue_row["normalized_names"].split(" | ")[0]
for row in related_rows:
if row.get("normalized_item_name"):
return row["normalized_item_name"]
return queue_row.get("observed_product_id", "")
def choose_existing_canonical(display_rows, observed_label, matched_count):
click.secho(
f"Select the canonical_name to associate {matched_count} items with:",
fg=INFO_COLOR,
)
for index, row in enumerate(display_rows, start=1):
click.echo(f" [{index}] {row['canonical_name']} | {row['canonical_product_id']}")
choice = click.prompt(
click.style("selection", fg=PROMPT_COLOR),
type=click.IntRange(1, len(display_rows)),
)
chosen_row = display_rows[choice - 1]
click.echo(
f'{matched_count} "{observed_label}" items and future matches will be associated '
f'with "{chosen_row["canonical_name"]}".'
)
click.secho(
"actions: [y]es [n]o [b]ack [s]kip [q]uit",
fg=PROMPT_COLOR,
)
confirm = click.prompt(
click.style("confirm", fg=PROMPT_COLOR),
type=click.Choice(["y", "n", "b", "s", "q"]),
)
if confirm == "y":
return chosen_row["canonical_product_id"], ""
if confirm == "s":
return "", "skip"
if confirm == "q":
return "", "quit"
return "", "back"
def prompt_resolution(queue_row, related_rows, catalog_rows, queue_index, queue_total):
suggestions = build_canonical_suggestions(related_rows, catalog_rows)
observed_label = observed_name(queue_row, related_rows)
matched_count = len(related_rows)
click.echo("")
click.secho(
f"Review {queue_index}/{queue_total}: Resolve observed_product {observed_label} "
"to canonical_name [__]?",
fg=INFO_COLOR,
)
click.echo(f"{matched_count} matched items:")
for line in build_display_lines(queue_row, related_rows):
click.echo(line)
if suggestions:
click.echo(f"{len(suggestions)} canonical suggestions found:")
for index, suggestion in enumerate(suggestions, start=1):
click.echo(f" [{index}] {suggestion['canonical_name']}")
else:
click.echo("no canonical_name suggestions found")
click.secho(
"[l]ink existing [n]ew canonical e[x]clude [s]kip [q]uit:",
fg=PROMPT_COLOR,
)
action = click.prompt(
"",
type=click.Choice(["l", "n", "x", "s", "q"]),
prompt_suffix=" ",
)
if action == "q":
return None, None
if action == "s":
return {
"observed_product_id": queue_row["observed_product_id"],
"canonical_product_id": "",
"resolution_action": "skip",
"status": "pending",
"resolution_notes": queue_row.get("resolution_notes", ""),
"reviewed_at": str(date.today()),
}, None
if action == "x":
notes = click.prompt(
click.style("exclude notes", fg=PROMPT_COLOR),
default="",
show_default=False,
)
return {
"observed_product_id": queue_row["observed_product_id"],
"canonical_product_id": "",
"resolution_action": "exclude",
"status": "approved",
"resolution_notes": notes,
"reviewed_at": str(date.today()),
}, None
if action == "l":
display_rows = suggestions or [
{
"canonical_product_id": row["canonical_product_id"],
"canonical_name": row["canonical_name"],
"reason": "catalog sample",
}
for row in catalog_rows[:10]
]
while True:
canonical_product_id, outcome = choose_existing_canonical(
display_rows,
observed_label,
matched_count,
)
if outcome == "skip":
return {
"observed_product_id": queue_row["observed_product_id"],
"canonical_product_id": "",
"resolution_action": "skip",
"status": "pending",
"resolution_notes": queue_row.get("resolution_notes", ""),
"reviewed_at": str(date.today()),
}, None
if outcome == "quit":
return None, None
if outcome == "back":
continue
break
notes = click.prompt(click.style("link notes", fg=PROMPT_COLOR), default="", show_default=False)
return {
"observed_product_id": queue_row["observed_product_id"],
"canonical_product_id": canonical_product_id,
"resolution_action": "link",
"status": "approved",
"resolution_notes": notes,
"reviewed_at": str(date.today()),
}, None
canonical_name = click.prompt(click.style("canonical name", fg=PROMPT_COLOR), type=str)
category = click.prompt(
click.style("category", fg=PROMPT_COLOR),
default="",
show_default=False,
)
product_type = click.prompt(
click.style("product type", fg=PROMPT_COLOR),
default="",
show_default=False,
)
notes = click.prompt(
click.style("notes", fg=PROMPT_COLOR),
default="",
show_default=False,
)
canonical_product_id = stable_id("gcan", f"manual|{canonical_name}|{category}|{product_type}")
canonical_row = {
"canonical_product_id": canonical_product_id,
"canonical_name": canonical_name,
"category": category,
"product_type": product_type,
"brand": "",
"variant": "",
"size_value": "",
"size_unit": "",
"pack_qty": "",
"measure_type": "",
"notes": notes,
"created_at": str(date.today()),
"updated_at": str(date.today()),
}
resolution_row = {
"observed_product_id": queue_row["observed_product_id"],
"canonical_product_id": canonical_product_id,
"resolution_action": "create",
"status": "approved",
"resolution_notes": notes,
"reviewed_at": str(date.today()),
}
return resolution_row, canonical_row
@click.command()
@click.option("--purchases-csv", default="combined_output/purchases.csv", show_default=True)
@click.option("--queue-csv", default="combined_output/review_queue.csv", show_default=True)
@click.option("--resolutions-csv", default="combined_output/review_resolutions.csv", show_default=True)
@click.option("--catalog-csv", default="combined_output/canonical_catalog.csv", show_default=True)
@click.option("--limit", default=0, show_default=True, type=int)
@click.option("--refresh-only", is_flag=True, help="Only rebuild review_queue.csv without prompting.")
def main(purchases_csv, queue_csv, resolutions_csv, catalog_csv, limit, refresh_only):
purchase_rows = build_purchases.read_optional_csv_rows(purchases_csv)
resolution_rows = build_purchases.read_optional_csv_rows(resolutions_csv)
catalog_rows = build_purchases.read_optional_csv_rows(catalog_csv)
queue_rows = build_review_queue(purchase_rows, resolution_rows)
write_csv_rows(queue_csv, queue_rows, QUEUE_FIELDS)
click.echo(f"wrote {len(queue_rows)} rows to {queue_csv}")
if refresh_only:
return
resolution_lookup = build_purchases.load_resolution_lookup(resolution_rows)
catalog_by_id = {row["canonical_product_id"]: row for row in catalog_rows if row.get("canonical_product_id")}
rows_by_observed = defaultdict(list)
for row in purchase_rows:
observed_product_id = row.get("observed_product_id", "")
if observed_product_id:
rows_by_observed[observed_product_id].append(row)
reviewed = 0
for index, queue_row in enumerate(queue_rows, start=1):
if limit and reviewed >= limit:
break
related_rows = rows_by_observed.get(queue_row["observed_product_id"], [])
result = prompt_resolution(queue_row, related_rows, catalog_rows, index, len(queue_rows))
if result == (None, None):
break
resolution_row, canonical_row = result
resolution_lookup[resolution_row["observed_product_id"]] = resolution_row
if canonical_row and canonical_row["canonical_product_id"] not in catalog_by_id:
catalog_by_id[canonical_row["canonical_product_id"]] = canonical_row
catalog_rows.append(canonical_row)
reviewed += 1
save_resolution_rows(resolutions_csv, sorted(resolution_lookup.values(), key=lambda row: row["observed_product_id"]))
save_catalog_rows(catalog_csv, sorted(catalog_by_id.values(), key=lambda row: row["canonical_product_id"]))
click.echo(
f"saved {len(resolution_lookup)} resolution rows to {resolutions_csv} "
f"and {len(catalog_by_id)} catalog rows to {catalog_csv}"
)
if __name__ == "__main__":
main()

View File

@@ -670,6 +670,13 @@ def main(
client_identifier=config["client_identifier"],
)
session = build_session(profile_dir, auth_headers)
click.echo(
"session bootstrap: "
f"cookies={True} "
f"authorization={bool(auth_headers.get('costco-x-authorization'))} "
f"client_id={bool(auth_headers.get('costco-x-wcs-clientId'))} "
f"client_identifier={bool(auth_headers.get('client-identifier'))}"
)
start_date, end_date = resolve_date_range(months_back)

301
tests/test_purchases.py Normal file
View File

@@ -0,0 +1,301 @@
import csv
import tempfile
import unittest
from pathlib import Path
import build_purchases
import enrich_costco
class PurchaseLogTests(unittest.TestCase):
def test_derive_metrics_prefers_picked_weight_and_pack_count(self):
metrics = build_purchases.derive_metrics(
{
"line_total": "4.00",
"qty": "1",
"pack_qty": "4",
"size_value": "",
"size_unit": "",
"picked_weight": "2",
"price_per_each": "",
"price_per_lb": "",
"price_per_oz": "",
}
)
self.assertEqual("4", metrics["price_per_each"])
self.assertEqual("1", metrics["price_per_count"])
self.assertEqual("2", metrics["price_per_lb"])
self.assertEqual("0.125", metrics["price_per_oz"])
self.assertEqual("picked_weight_lb", metrics["price_per_lb_basis"])
def test_build_purchase_rows_maps_canonical_ids(self):
fieldnames = enrich_costco.OUTPUT_FIELDS
giant_row = {field: "" for field in fieldnames}
giant_row.update(
{
"retailer": "giant",
"order_id": "g1",
"line_no": "1",
"observed_item_key": "giant:g1:1",
"order_date": "2026-03-01",
"item_name": "FRESH BANANA",
"item_name_norm": "BANANA",
"image_url": "https://example.test/banana.jpg",
"retailer_item_id": "100",
"upc": "4011",
"qty": "1",
"unit": "LB",
"line_total": "1.29",
"unit_price": "1.29",
"measure_type": "weight",
"price_per_lb": "1.29",
"raw_order_path": "giant_output/raw/g1.json",
"is_discount_line": "false",
"is_coupon_line": "false",
"is_fee": "false",
}
)
costco_row = {field: "" for field in fieldnames}
costco_row.update(
{
"retailer": "costco",
"order_id": "c1",
"line_no": "1",
"observed_item_key": "costco:c1:1",
"order_date": "2026-03-12",
"item_name": "BANANAS 3 LB / 1.36 KG",
"item_name_norm": "BANANA",
"retailer_item_id": "30669",
"qty": "1",
"unit": "E",
"line_total": "2.98",
"unit_price": "2.98",
"size_value": "3",
"size_unit": "lb",
"measure_type": "weight",
"price_per_lb": "0.9933",
"raw_order_path": "costco_output/raw/c1.json",
"is_discount_line": "false",
"is_coupon_line": "false",
"is_fee": "false",
}
)
giant_orders = [
{
"order_id": "g1",
"store_name": "Giant",
"store_number": "42",
"store_city": "Springfield",
"store_state": "VA",
}
]
costco_orders = [
{
"order_id": "c1",
"store_name": "MT VERNON",
"store_number": "1115",
"store_city": "ALEXANDRIA",
"store_state": "VA",
}
]
rows, _observed, _canon, _links = build_purchases.build_purchase_rows(
[giant_row],
[costco_row],
giant_orders,
costco_orders,
[],
)
self.assertEqual(2, len(rows))
self.assertTrue(all(row["canonical_product_id"] for row in rows))
self.assertEqual({"giant", "costco"}, {row["retailer"] for row in rows})
self.assertEqual("https://example.test/banana.jpg", rows[0]["image_url"])
def test_main_writes_purchase_and_example_csvs(self):
with tempfile.TemporaryDirectory() as tmpdir:
giant_items = Path(tmpdir) / "giant_items.csv"
costco_items = Path(tmpdir) / "costco_items.csv"
giant_orders = Path(tmpdir) / "giant_orders.csv"
costco_orders = Path(tmpdir) / "costco_orders.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
links_csv = Path(tmpdir) / "product_links.csv"
purchases_csv = Path(tmpdir) / "combined" / "purchases.csv"
examples_csv = Path(tmpdir) / "combined" / "comparison_examples.csv"
fieldnames = enrich_costco.OUTPUT_FIELDS
giant_row = {field: "" for field in fieldnames}
giant_row.update(
{
"retailer": "giant",
"order_id": "g1",
"line_no": "1",
"observed_item_key": "giant:g1:1",
"order_date": "2026-03-01",
"item_name": "FRESH BANANA",
"item_name_norm": "BANANA",
"retailer_item_id": "100",
"upc": "4011",
"qty": "1",
"unit": "LB",
"line_total": "1.29",
"unit_price": "1.29",
"measure_type": "weight",
"price_per_lb": "1.29",
"raw_order_path": "giant_output/raw/g1.json",
"is_discount_line": "false",
"is_coupon_line": "false",
"is_fee": "false",
}
)
costco_row = {field: "" for field in fieldnames}
costco_row.update(
{
"retailer": "costco",
"order_id": "c1",
"line_no": "1",
"observed_item_key": "costco:c1:1",
"order_date": "2026-03-12",
"item_name": "BANANAS 3 LB / 1.36 KG",
"item_name_norm": "BANANA",
"retailer_item_id": "30669",
"qty": "1",
"unit": "E",
"line_total": "2.98",
"unit_price": "2.98",
"size_value": "3",
"size_unit": "lb",
"measure_type": "weight",
"price_per_lb": "0.9933",
"raw_order_path": "costco_output/raw/c1.json",
"is_discount_line": "false",
"is_coupon_line": "false",
"is_fee": "false",
}
)
for path, source_rows in [
(giant_items, [giant_row]),
(costco_items, [costco_row]),
]:
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(source_rows)
order_fields = ["order_id", "store_name", "store_number", "store_city", "store_state"]
for path, source_rows in [
(
giant_orders,
[
{
"order_id": "g1",
"store_name": "Giant",
"store_number": "42",
"store_city": "Springfield",
"store_state": "VA",
}
],
),
(
costco_orders,
[
{
"order_id": "c1",
"store_name": "MT VERNON",
"store_number": "1115",
"store_city": "ALEXANDRIA",
"store_state": "VA",
}
],
),
]:
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=order_fields)
writer.writeheader()
writer.writerows(source_rows)
build_purchases.main.callback(
giant_items_enriched_csv=str(giant_items),
costco_items_enriched_csv=str(costco_items),
giant_orders_csv=str(giant_orders),
costco_orders_csv=str(costco_orders),
resolutions_csv=str(resolutions_csv),
catalog_csv=str(catalog_csv),
links_csv=str(links_csv),
output_csv=str(purchases_csv),
examples_csv=str(examples_csv),
)
self.assertTrue(purchases_csv.exists())
self.assertTrue(examples_csv.exists())
with purchases_csv.open(newline="", encoding="utf-8") as handle:
purchase_rows = list(csv.DictReader(handle))
with examples_csv.open(newline="", encoding="utf-8") as handle:
example_rows = list(csv.DictReader(handle))
self.assertEqual(2, len(purchase_rows))
self.assertEqual(1, len(example_rows))
def test_build_purchase_rows_applies_manual_resolution(self):
fieldnames = enrich_costco.OUTPUT_FIELDS
giant_row = {field: "" for field in fieldnames}
giant_row.update(
{
"retailer": "giant",
"order_id": "g1",
"line_no": "1",
"observed_item_key": "giant:g1:1",
"order_date": "2026-03-01",
"item_name": "SB BAGGED ICE 20LB",
"item_name_norm": "BAGGED ICE",
"retailer_item_id": "100",
"upc": "",
"qty": "1",
"unit": "EA",
"line_total": "3.50",
"unit_price": "3.50",
"measure_type": "each",
"raw_order_path": "giant_output/raw/g1.json",
"is_discount_line": "false",
"is_coupon_line": "false",
"is_fee": "false",
}
)
observed_rows, _canonical_rows, _link_rows, _observed_id_by_key, _canonical_by_observed = (
build_purchases.build_link_state([giant_row])
)
observed_product_id = observed_rows[0]["observed_product_id"]
rows, _observed, _canon, _links = build_purchases.build_purchase_rows(
[giant_row],
[],
[
{
"order_id": "g1",
"store_name": "Giant",
"store_number": "42",
"store_city": "Springfield",
"store_state": "VA",
}
],
[],
[
{
"observed_product_id": observed_product_id,
"canonical_product_id": "gcan_manual_ice",
"resolution_action": "create",
"status": "approved",
"resolution_notes": "manual ice merge",
"reviewed_at": "2026-03-16",
}
],
)
self.assertEqual("gcan_manual_ice", rows[0]["canonical_product_id"])
self.assertEqual("approved", rows[0]["review_status"])
self.assertEqual("create", rows[0]["resolution_action"])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,409 @@
import csv
import tempfile
import unittest
from pathlib import Path
from unittest import mock
from click.testing import CliRunner
import review_products
class ReviewWorkflowTests(unittest.TestCase):
def test_build_review_queue_groups_unresolved_purchases(self):
queue_rows = review_products.build_review_queue(
[
{
"observed_product_id": "gobs_1",
"canonical_product_id": "",
"retailer": "giant",
"raw_item_name": "SB BAGGED ICE 20LB",
"normalized_item_name": "BAGGED ICE",
"upc": "",
"line_total": "3.50",
},
{
"observed_product_id": "gobs_1",
"canonical_product_id": "",
"retailer": "giant",
"raw_item_name": "SB BAG ICE CUBED 10LB",
"normalized_item_name": "BAG ICE",
"upc": "",
"line_total": "2.50",
},
],
[],
)
self.assertEqual(1, len(queue_rows))
self.assertEqual("gobs_1", queue_rows[0]["observed_product_id"])
self.assertIn("SB BAGGED ICE 20LB", queue_rows[0]["raw_item_names"])
def test_build_canonical_suggestions_prefers_upc_then_name(self):
suggestions = review_products.build_canonical_suggestions(
[
{
"normalized_item_name": "MIXED PEPPER",
"upc": "12345",
}
],
[
{
"canonical_product_id": "gcan_1",
"canonical_name": "MIXED PEPPER",
"upc": "",
},
{
"canonical_product_id": "gcan_2",
"canonical_name": "MIXED PEPPER 6 PACK",
"upc": "12345",
},
],
)
self.assertEqual("gcan_2", suggestions[0]["canonical_product_id"])
self.assertEqual("exact upc", suggestions[0]["reason"])
self.assertEqual("gcan_1", suggestions[1]["canonical_product_id"])
def test_review_products_displays_position_items_and_suggestions(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
purchase_fields = [
"purchase_date",
"retailer",
"order_id",
"line_no",
"observed_product_id",
"canonical_product_id",
"raw_item_name",
"normalized_item_name",
"image_url",
"upc",
"line_total",
]
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=purchase_fields)
writer.writeheader()
writer.writerows(
[
{
"purchase_date": "2026-03-14",
"retailer": "costco",
"order_id": "c2",
"line_no": "2",
"observed_product_id": "gobs_mix",
"canonical_product_id": "",
"raw_item_name": "MIXED PEPPER 6-PACK",
"normalized_item_name": "MIXED PEPPER",
"image_url": "",
"upc": "",
"line_total": "7.49",
},
{
"purchase_date": "2026-03-12",
"retailer": "costco",
"order_id": "c1",
"line_no": "1",
"observed_product_id": "gobs_mix",
"canonical_product_id": "",
"raw_item_name": "MIXED PEPPER 6-PACK",
"normalized_item_name": "MIXED PEPPER",
"image_url": "https://example.test/mixed-pepper.jpg",
"upc": "",
"line_total": "6.99",
},
]
)
with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS)
writer.writeheader()
writer.writerow(
{
"canonical_product_id": "gcan_mix",
"canonical_name": "MIXED PEPPER",
"category": "produce",
"product_type": "pepper",
"brand": "",
"variant": "",
"size_value": "",
"size_unit": "",
"pack_qty": "",
"measure_type": "",
"notes": "",
"created_at": "",
"updated_at": "",
}
)
runner = CliRunner()
result = runner.invoke(
review_products.main,
[
"--purchases-csv",
str(purchases_csv),
"--queue-csv",
str(queue_csv),
"--resolutions-csv",
str(resolutions_csv),
"--catalog-csv",
str(catalog_csv),
],
input="q\n",
color=True,
)
self.assertEqual(0, result.exit_code)
self.assertIn("Review 1/1: Resolve observed_product MIXED PEPPER to canonical_name [__]?", result.output)
self.assertIn("2 matched items:", result.output)
self.assertIn("[l]ink existing [n]ew canonical e[x]clude [s]kip [q]uit:", result.output)
first_item = result.output.index("[1] 2026-03-14 | 7.49")
second_item = result.output.index("[2] 2026-03-12 | 6.99")
self.assertLess(first_item, second_item)
self.assertIn("https://example.test/mixed-pepper.jpg", result.output)
self.assertIn("1 canonical suggestions found:", result.output)
self.assertIn("[1] MIXED PEPPER", result.output)
self.assertIn("\x1b[", result.output)
def test_review_products_no_suggestions_is_informational(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(
handle,
fieldnames=[
"purchase_date",
"retailer",
"order_id",
"line_no",
"observed_product_id",
"canonical_product_id",
"raw_item_name",
"normalized_item_name",
"image_url",
"upc",
"line_total",
],
)
writer.writeheader()
writer.writerow(
{
"purchase_date": "2026-03-14",
"retailer": "giant",
"order_id": "g1",
"line_no": "1",
"observed_product_id": "gobs_ice",
"canonical_product_id": "",
"raw_item_name": "SB BAGGED ICE 20LB",
"normalized_item_name": "BAGGED ICE",
"image_url": "",
"upc": "",
"line_total": "3.50",
}
)
with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS)
writer.writeheader()
result = CliRunner().invoke(
review_products.main,
[
"--purchases-csv",
str(purchases_csv),
"--queue-csv",
str(queue_csv),
"--resolutions-csv",
str(resolutions_csv),
"--catalog-csv",
str(catalog_csv),
],
input="q\n",
color=True,
)
self.assertEqual(0, result.exit_code)
self.assertIn("no canonical_name suggestions found", result.output)
def test_link_existing_uses_numbered_selection_and_confirmation(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(
handle,
fieldnames=[
"purchase_date",
"retailer",
"order_id",
"line_no",
"observed_product_id",
"canonical_product_id",
"raw_item_name",
"normalized_item_name",
"image_url",
"upc",
"line_total",
],
)
writer.writeheader()
writer.writerows(
[
{
"purchase_date": "2026-03-14",
"retailer": "costco",
"order_id": "c2",
"line_no": "2",
"observed_product_id": "gobs_mix",
"canonical_product_id": "",
"raw_item_name": "MIXED PEPPER 6-PACK",
"normalized_item_name": "MIXED PEPPER",
"image_url": "",
"upc": "",
"line_total": "7.49",
},
{
"purchase_date": "2026-03-12",
"retailer": "costco",
"order_id": "c1",
"line_no": "1",
"observed_product_id": "gobs_mix",
"canonical_product_id": "",
"raw_item_name": "MIXED PEPPER 6-PACK",
"normalized_item_name": "MIXED PEPPER",
"image_url": "",
"upc": "",
"line_total": "6.99",
},
]
)
with catalog_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=review_products.build_purchases.CATALOG_FIELDS)
writer.writeheader()
writer.writerow(
{
"canonical_product_id": "gcan_mix",
"canonical_name": "MIXED PEPPER",
"category": "",
"product_type": "",
"brand": "",
"variant": "",
"size_value": "",
"size_unit": "",
"pack_qty": "",
"measure_type": "",
"notes": "",
"created_at": "",
"updated_at": "",
}
)
result = CliRunner().invoke(
review_products.main,
[
"--purchases-csv",
str(purchases_csv),
"--queue-csv",
str(queue_csv),
"--resolutions-csv",
str(resolutions_csv),
"--catalog-csv",
str(catalog_csv),
"--limit",
"1",
],
input="l\n1\ny\nlinked by test\n",
color=True,
)
self.assertEqual(0, result.exit_code)
self.assertIn("Select the canonical_name to associate 2 items with:", result.output)
self.assertIn('[1] MIXED PEPPER | gcan_mix', result.output)
self.assertIn('2 "MIXED PEPPER" items and future matches will be associated with "MIXED PEPPER".', result.output)
self.assertIn("actions: [y]es [n]o [b]ack [s]kip [q]uit", result.output)
with resolutions_csv.open(newline="", encoding="utf-8") as handle:
rows = list(csv.DictReader(handle))
self.assertEqual("gcan_mix", rows[0]["canonical_product_id"])
self.assertEqual("link", rows[0]["resolution_action"])
def test_review_products_creates_canonical_and_resolution(self):
with tempfile.TemporaryDirectory() as tmpdir:
purchases_csv = Path(tmpdir) / "purchases.csv"
queue_csv = Path(tmpdir) / "review_queue.csv"
resolutions_csv = Path(tmpdir) / "review_resolutions.csv"
catalog_csv = Path(tmpdir) / "canonical_catalog.csv"
with purchases_csv.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(
handle,
fieldnames=[
"purchase_date",
"observed_product_id",
"canonical_product_id",
"retailer",
"raw_item_name",
"normalized_item_name",
"image_url",
"upc",
"line_total",
"order_id",
"line_no",
],
)
writer.writeheader()
writer.writerow(
{
"purchase_date": "2026-03-15",
"observed_product_id": "gobs_ice",
"canonical_product_id": "",
"retailer": "giant",
"raw_item_name": "SB BAGGED ICE 20LB",
"normalized_item_name": "BAGGED ICE",
"image_url": "",
"upc": "",
"line_total": "3.50",
"order_id": "g1",
"line_no": "1",
}
)
with mock.patch.object(
review_products.click,
"prompt",
side_effect=["n", "ICE", "frozen", "ice", "manual merge", "q"],
):
review_products.main.callback(
purchases_csv=str(purchases_csv),
queue_csv=str(queue_csv),
resolutions_csv=str(resolutions_csv),
catalog_csv=str(catalog_csv),
limit=1,
refresh_only=False,
)
self.assertTrue(queue_csv.exists())
self.assertTrue(resolutions_csv.exists())
self.assertTrue(catalog_csv.exists())
with resolutions_csv.open(newline="", encoding="utf-8") as handle:
resolution_rows = list(csv.DictReader(handle))
with catalog_csv.open(newline="", encoding="utf-8") as handle:
catalog_rows = list(csv.DictReader(handle))
self.assertEqual("create", resolution_rows[0]["resolution_action"])
self.assertEqual("approved", resolution_rows[0]["status"])
self.assertEqual("ICE", catalog_rows[0]["canonical_name"])
if __name__ == "__main__":
unittest.main()